Rafal, hello.
I think that's a good work which you have done.
The code becomes more readable and hopefully optimal.
I am making some notes down the lines.
Thanks for addressing the issues I started to ask before this current
patch. The problem is described pretty good imo. Maybe to add couple
of things.
I have one concern about extending bitmap library with a function that
does not exist and needed. I strongly recommend to that instead of
to stop at one inch from the entry to the perfect code paradise :)
Other suggestions are left to your disposal.
Once more enjoy your vacation!
cheers,
Andrei
> ChangeSet@stripped, 2007-07-06 16:58:18+02:00, rafal@quant.(none) +8 -0
> BUG#21842 (Cluster fails to replicate to innodb or myisam with err 134
> using TPC-B):
>
> Problem: A RBR event can contain incomplete row data (only key value and
> fields which have been changed). In that case, when the row is unpacked
> into record and written to a table, the missing fields get incorrect NULL
do not they get generally defaults? Compare with
" The
+ missing columns are filled with default values".
At any rate that does not relate to ndb to ndb replication. Please
leave a note.
> values leading to master-slave inconsistency.
>
I'd explicitly added that the implemented solution performs the task
belonging to the engine, so it's rather a workaround needed until all
engines will respect table->write_set flags to process only the selected
fields.
> Solution: Use values found in slave's table for columns which are not given
> in the rows event. The code for writing a single row uses the following
> algorithm:
>
> 1. unpack row_data into table->record[0],
> 2. try to insert record,
> 3. if duplicate record found, fetch it into table->record[1],
> 4. unpack row_data into table->record[1],
> 5. write table->record[1] into the table.
>
> Where row_data is the row as stored in the data area of a rows event.
> Thus:
>
> a) unpacking of row_data happens at the time when row is written into
> a table,
>
> b) when unpacking (in step 4), only columns present in row_data are
> overwritten - all other columns remain as they were found in the table.
>
> Since all data needed for the above algorithm is stored inside
> Rows_log_event class, functions which locate and write rows are turned
> into methods of that class.
>
> replace_record() -> Rows_log_event::write_row()
> find_and_fetch_row() -> Rows_log_event::find_and_fetch_row()
That's logical. I suggest to name Rows_log_event::fetch_row() instead. After all
what's needed is what's fetched.
Actually you eliminated do_prepare_row's to hand its duties to
unpack_current_row(), i.e in some sense
do_prepare_row() -> unpack_current_row()
>
> Both methods take row data from event's data buffer - the row being
> processed is pointed by m_curr_row. They unpack the data as needed into
> table's record buffers record[0] or record[1]. When row is unpacked,
> m_curr_row_end is set to point at next row in the data buffer.
>
that's okay. \remark{I was thinking on m_next_row instead (since
m_curr_row_end becomes known only at the end of the current row
unpacking) but it does not really matter}
> Other changes introduced in this changeset:
>
> - Change signature of unpack_row(): don't report errors and don't
> setup table's rw_set here. Errors can happen only when setting default
> values in prepare_record() function and are detected there.
Would you agree with names wearing more info on the func does like
unpack_row_defaults, row_extra_fields_set_defaults() instead?
I wanted to add also "unpack_end" as a candidate but noticed
`prepare_record()' is called before `unpack_current_row()'.
And then there must be some extra work to fill the meaningful fields'
values with defaults. E.g
+ if ((error= prepare_record(log, m_table,m_width,
+ TRUE /* check if columns have def. values */)))
+ goto finish;
+
+ /* unpack row into m_table->record[0] */
+ error= unpack_current_row();
If that observation is correct could we avoid the extra work?
Why can not prepare_record's job be done after unpack_current_row? Or
rather why not to call the func at the end of unpack_current_row
in accord with the existing logic?
>
> - In Rows_log_event and derived classes, don't pass arguments to
> the execution primitives (do_...() member functions) but use class
> members instead.
>
> - The changes seem to fix rpl_ndb_extraCol test. Before it produced
> results different than the same test run on other storage engines.
> Now the results are identical. The result file is updated
> accordingly.
>
> mysql-test/r/rpl_ndb_extraCol.result@stripped, 2007-07-06 16:58:05+02:00,
> rafal@quant.(none) +27 -27
> Correct results. Now the results of the NDB test are the same as for
> other engines.
>
> sql/log_event.cc@stripped, 2007-07-06 16:58:06+02:00, rafal@quant.(none) +534 -572
> - Initialization of new Rows_log_event members.
> - Fixing some typos in documentation.
>
> In Rows_log_event::do_apply_event:
> - Set COMPLETE_ROWS_F flag if all colums are present in the row
> - Move initialization of tables write/read sets outside the rows
> processing loop (and out of unpack_row() function).
Mild suggestion.
There is a thing we could optimize perhaps current work.
m_cols member of Rows_log_event is set at constructing the event. The
only reason for the member is to propagate the bits to the final
table->write_set destination. So that could be done in do_exec_row()
where I have yet another stronger suggestion.
> - Remove calls to do_prepare_row() - no longer needed.
> - Add code managing m_curr_row and m_curr_row_end pointers.
>
> - Change signatures of row processing methods of Rows_log_event and it
> descendants - now most arguments are taken from class members.
> - Remove do_prepare_row() methods which are no longer used.
> - The auto_afree_ptr template is moved to rpl_utility.h (so that it can
> be used in log_event_old.cc).
> - Removed copy_extra_fields() function - no longer used.
> - Change replace_record() function into Rows_log_event::write_row()
> method.
> - Change find_and_fetch_row() function into method of Rows_log_event.
> - Don't allocate memory for m_after_image buffer which is not
> used anymore.
Okay.
> - Change implementation of Update_rows_log_event::do_exec_row() to use
> unpack_current_row() method to get before/after images.
>
> sql/log_event.h@stripped, 2007-07-06 16:58:06+02:00, rafal@quant.(none) +50 -53
> - Add new COMPLETE_ROWS_F flag in Rows_log_event.
New Rows_log_event flag COMPLETE_ROWS_F is set locally on slave. Put
it into the common with RELAXED_UNIQUE_CHECKS_F etc reduces the space
for possible new future replicated flags.
Why not to make a member (a struct groupping slave side members) for
local (slave side) purposes instead?
> - Add Rows_log_event members describing the row being processed.
> - Add a pointer to key buffer which is used in derived classes.
> - Add unpack_current_row() method.
> - Change signatures of do_...() methods (replace method arguments by
> class members).
> - Remove do_prepare_row() method which is no longer used.
> - Update method documentation.
>
> sql/log_event_old.cc@stripped, 2007-07-06 16:58:06+02:00, rafal@quant.(none) +1109 -0
> Move here old implementation of Rows_log_event::do_apply_event() and
> helper methods.
Suggest to call them more precisely like
log_event_pre_canonic_format.cc,h
>
> sql/log_event_old.h@stripped, 2007-07-06 16:58:06+02:00, rafal@quant.(none) +123 -7
> - Define new class Old_rows_log_event encapsulating old version of
> Rows_log_event::do_apply_event() and the helper methods.
> - Add the Old_rows_log_event class as a base for *_old versions of RBR event
> classes, ensure that the old version of do_apply_event() is called.
> - For *_old classes, declare the helper methods used in the old version of
> do_apply_event().
>
> sql/rpl_record.cc@stripped, 2007-07-06 16:58:06+02:00, rafal@quant.(none) +37 -33
> - Make unpack_row non-destructive for columns not present in the row.
> - Don't fill read/write set here as it is done outside these
> functions.
also optimization, okay.
> - Move initialization of a record with default values to a separate
> function prepare_record().
the better name and the order of invocation suggested above.
>
> sql/rpl_record.h@stripped, 2007-07-06 16:58:06+02:00, rafal@quant.(none) +8 -5
> - Change signature of unpack_row().
> - Declare function prepare_record().
>
> sql/rpl_utility.h@stripped, 2007-07-06 16:58:06+02:00, rafal@quant.(none) +31 -0
> Move auto_afree_ptr template here so that it can be re-used (currently
> in log_event.cc and log_event_old.cc).
>
> diff -Nrup a/mysql-test/r/rpl_ndb_extraCol.result
> b/mysql-test/r/rpl_ndb_extraCol.result
> --- a/mysql-test/r/rpl_ndb_extraCol.result 2007-06-11 22:15:19 +02:00
> +++ b/mysql-test/r/rpl_ndb_extraCol.result 2007-07-06 16:58:05 +02:00
> @@ -28,9 +28,9 @@ a b c
> *** Select from slave ***
> SELECT * FROM t1 ORDER BY a;
> a b c d e
> -1 2 TEXAS NULL NULL
> -2 1 AUSTIN NULL NULL
> -3 4 QA NULL NULL
> +1 2 TEXAS 2 TEST
> +2 1 AUSTIN 2 TEST
> +3 4 QA 2 TEST
> *** Drop t1 ***
> DROP TABLE t1;
> *** Create t3 on slave ***
> @@ -309,9 +309,9 @@ a b c
> *** Select from slave ***
> SELECT * FROM t7 ORDER BY a;
> a b c d e
> -1 b1b1 Kyle NULL NULL
> -2 b1b1 JOE NULL NULL
> -3 b1b1 QA NULL NULL
> +1 b1b1 Kyle 0000-00-00 00:00:00 Extra Column Testing
> +2 b1b1 JOE 0000-00-00 00:00:00 Extra Column Testing
> +3 b1b1 QA 0000-00-00 00:00:00 Extra Column Testing
> *** Drop t7 ***
> DROP TABLE t7;
> *** Create t8 on slave ***
> @@ -477,9 +477,9 @@ a b c
> *** Select on Slave ***
> SELECT * FROM t12 ORDER BY a;
> a b f c e
> -1 b1b1b1b1b1b1b1b1 Kyle NULL NULL
> -2 b1b1b1b1b1b1b1b1 JOE NULL NULL
> -3 b1b1b1b1b1b1b1b1 QA NULL NULL
> +1 b1b1b1b1b1b1b1b1 Kyle test 1
> +2 b1b1b1b1b1b1b1b1 JOE test 1
> +3 b1b1b1b1b1b1b1b1 QA test 1
> *** Drop t12 ***
> DROP TABLE t12;
> **** Extra Colums End ****
> @@ -509,9 +509,9 @@ a b c
> *** Select on Slave ****
> SELECT * FROM t13 ORDER BY a;
> a b c d e
> -1 b1b1b1b1b1b1b1b1 Kyle NULL CURRENT_TIMESTAMP
> -2 b1b1b1b1b1b1b1b1 JOE NULL CURRENT_TIMESTAMP
> -3 b1b1b1b1b1b1b1b1 QA NULL CURRENT_TIMESTAMP
> +1 b1b1b1b1b1b1b1b1 Kyle 1 CURRENT_TIMESTAMP
> +2 b1b1b1b1b1b1b1b1 JOE 1 CURRENT_TIMESTAMP
> +3 b1b1b1b1b1b1b1b1 QA 1 CURRENT_TIMESTAMP
> *** Drop t13 ***
> DROP TABLE t13;
> *** 22117 END ***
> @@ -545,9 +545,9 @@ c1 c2 c3 c4 c5
> *** Select on Slave ****
> SELECT * FROM t14 ORDER BY c1;
> c1 c2 c3 c4 c5 c6 c7
> -1 1.00 Replication Testing Extra Col b1b1b1b1b1b1b1b1 Kyle NULL CURRENT_TIMESTAMP
> -2 2.00 This Test Should work b1b1b1b1b1b1b1b1 JOE NULL CURRENT_TIMESTAMP
> -3 3.00 If is does not, I will open a bug b1b1b1b1b1b1b1b1 QA NULL CURRENT_TIMESTAMP
> +1 1.00 Replication Testing Extra Col b1b1b1b1b1b1b1b1 Kyle 1 CURRENT_TIMESTAMP
> +2 2.00 This Test Should work b1b1b1b1b1b1b1b1 JOE 1 CURRENT_TIMESTAMP
> +3 3.00 If is does not, I will open a bug b1b1b1b1b1b1b1b1 QA 1 CURRENT_TIMESTAMP
> *** connect to master and drop columns ***
> ALTER TABLE t14 DROP COLUMN c2;
> ALTER TABLE t14 DROP COLUMN c4;
> @@ -560,9 +560,9 @@ c1 c3 c5
> *** Select from Slave ***
> SELECT * FROM t14 ORDER BY c1;
> c1 c3 c5 c6 c7
> -1 Replication Testing Extra Col Kyle NULL CURRENT_TIMESTAMP
> -2 This Test Should work JOE NULL CURRENT_TIMESTAMP
> -3 If is does not, I will open a bug QA NULL CURRENT_TIMESTAMP
> +1 Replication Testing Extra Col Kyle 1 CURRENT_TIMESTAMP
> +2 This Test Should work JOE 1 CURRENT_TIMESTAMP
> +3 If is does not, I will open a bug QA 1 CURRENT_TIMESTAMP
> *** Drop t14 ***
> DROP TABLE t14;
> *** Create t15 on slave ***
> @@ -593,9 +593,9 @@ c1 c2 c3 c4 c5
> *** Select on Slave ****
> SELECT * FROM t15 ORDER BY c1;
> c1 c2 c3 c4 c5 c6 c7
> -1 1.00 Replication Testing Extra Col b1b1b1b1b1b1b1b1 Kyle NULL CURRENT_TIMESTAMP
> -2 2.00 This Test Should work b1b1b1b1b1b1b1b1 JOE NULL CURRENT_TIMESTAMP
> -3 3.00 If is does not, I will open a bug b1b1b1b1b1b1b1b1 QA NULL CURRENT_TIMESTAMP
> +1 1.00 Replication Testing Extra Col b1b1b1b1b1b1b1b1 Kyle 1 CURRENT_TIMESTAMP
> +2 2.00 This Test Should work b1b1b1b1b1b1b1b1 JOE 1 CURRENT_TIMESTAMP
> +3 3.00 If is does not, I will open a bug b1b1b1b1b1b1b1b1 QA 1 CURRENT_TIMESTAMP
> *** Add column on master that is a Extra on Slave ***
> ALTER TABLE t15 ADD COLUMN c6 INT AFTER c5;
> ********************************************
> @@ -653,9 +653,9 @@ c1 c2 c3 c4 c5 c6
> *** Try to select from slave ****
> SELECT * FROM t15 ORDER BY c1;
> c1 c2 c3 c4 c5 c6 c7
> -1 1.00 Replication Testing Extra Col b1b1b1b1b1b1b1b1 Kyle NULL CURRENT_TIMESTAMP
> -2 2.00 This Test Should work b1b1b1b1b1b1b1b1 JOE NULL CURRENT_TIMESTAMP
> -3 3.00 If is does not, I will open a bug b1b1b1b1b1b1b1b1 QA NULL CURRENT_TIMESTAMP
> +1 1.00 Replication Testing Extra Col b1b1b1b1b1b1b1b1 Kyle 1 CURRENT_TIMESTAMP
> +2 2.00 This Test Should work b1b1b1b1b1b1b1b1 JOE 1 CURRENT_TIMESTAMP
> +3 3.00 If is does not, I will open a bug b1b1b1b1b1b1b1b1 QA 1 CURRENT_TIMESTAMP
> 5 2.00 Replication Testing b1b1b1b1b1b1b1b1 Buda 2 CURRENT_TIMESTAMP
> *** DROP TABLE t15 ***
> DROP TABLE t15;
> @@ -687,9 +687,9 @@ c1 c2 c3 c4 c5
> *** Select on Slave ****
> SELECT * FROM t16 ORDER BY c1;
> c1 c2 c3 c4 c5 c6 c7
> -1 1.00 Replication Testing Extra Col b1b1b1b1b1b1b1b1 Kyle NULL CURRENT_TIMESTAMP
> -2 2.00 This Test Should work b1b1b1b1b1b1b1b1 JOE NULL CURRENT_TIMESTAMP
> -3 3.00 If is does not, I will open a bug b1b1b1b1b1b1b1b1 QA NULL CURRENT_TIMESTAMP
> +1 1.00 Replication Testing Extra Col b1b1b1b1b1b1b1b1 Kyle 1 CURRENT_TIMESTAMP
> +2 2.00 This Test Should work b1b1b1b1b1b1b1b1 JOE 1 CURRENT_TIMESTAMP
> +3 3.00 If is does not, I will open a bug b1b1b1b1b1b1b1b1 QA 1 CURRENT_TIMESTAMP
> *** Add Partition on master ***
> ALTER TABLE t16 PARTITION BY KEY(c1) PARTITIONS 4;
> INSERT INTO t16 () VALUES(4,1.00,'Replication Rocks',@b1,'Omer');
> diff -Nrup a/sql/log_event.cc b/sql/log_event.cc
> --- a/sql/log_event.cc 2007-06-28 15:07:53 +02:00
> +++ b/sql/log_event.cc 2007-07-06 16:58:06 +02:00
> @@ -5635,13 +5635,14 @@ Rows_log_event::Rows_log_event(THD *thd_
> m_table_id(tid),
> m_width(tbl_arg ? tbl_arg->s->fields : 1),
> m_rows_buf(0), m_rows_cur(0), m_rows_end(0),
> - m_flags(0)
> + m_curr_row(NULL), m_curr_row_end(NULL),
> + m_flags(0), m_key(NULL)
> {
> /*
> We allow a special form of dummy event when the table, and cols
> are null and the table id is ~0UL. This is a temporary
> solution, to be able to terminate a started statement in the
> - binary log: the extreneous events will be removed in the future.
> + binary log: the extraneous events will be removed in the future.
> */
> DBUG_ASSERT(tbl_arg && tbl_arg->s && tid != ~0UL ||
> !tbl_arg && !cols && tid == ~0UL);
> @@ -5650,7 +5651,7 @@ Rows_log_event::Rows_log_event(THD *thd_
> set_flags(NO_FOREIGN_KEY_CHECKS_F);
> if (thd_arg->options & OPTION_RELAXED_UNIQUE_CHECKS)
> set_flags(RELAXED_UNIQUE_CHECKS_F);
> - /* if bitmap_init fails, catched in is_valid() */
> + /* if bitmap_init fails, caught in is_valid() */
> if (likely(!bitmap_init(&m_cols,
> m_width <= sizeof(m_bitbuf)*8 ? m_bitbuf : NULL,
> (m_width + 7) & ~7UL,
> @@ -5674,7 +5675,9 @@ Rows_log_event::Rows_log_event(const cha
> *description_event)
> : Log_event(buf, description_event),
> m_row_count(0),
> - m_rows_buf(0), m_rows_cur(0), m_rows_end(0)
> + m_rows_buf(0), m_rows_cur(0), m_rows_end(0),
> + m_curr_row(NULL), m_curr_row_end(NULL),
> + m_key(NULL)
> {
> DBUG_ENTER("Rows_log_event::Rows_log_event(const char*,...)");
> uint8 const common_header_len= description_event->common_header_len;
> @@ -5708,7 +5711,7 @@ Rows_log_event::Rows_log_event(const cha
> DBUG_PRINT("debug", ("Reading from %p", ptr_after_width));
> m_width = net_field_length(&ptr_after_width);
> DBUG_PRINT("debug", ("m_width=%lu", m_width));
> - /* if bitmap_init fails, catched in is_valid() */
> + /* if bitmap_init fails, caught in is_valid() */
> if (likely(!bitmap_init(&m_cols,
> m_width <= sizeof(m_bitbuf)*8 ? m_bitbuf : NULL,
> (m_width + 7) & ~7UL,
> @@ -5732,7 +5735,7 @@ Rows_log_event::Rows_log_event(const cha
> {
> DBUG_PRINT("debug", ("Reading from %p", ptr_after_width));
>
> - /* if bitmap_init fails, catched in is_valid() */
> + /* if bitmap_init fails, caught in is_valid() */
> if (likely(!bitmap_init(&m_cols_ai,
> m_width <= sizeof(m_bitbuf_ai)*8 ? m_bitbuf_ai :
> NULL,
> (m_width + 7) & ~7UL,
> @@ -5761,6 +5764,7 @@ Rows_log_event::Rows_log_event(const cha
> m_rows_buf= (uchar*) my_malloc(data_size, MYF(MY_WME));
> if (likely((bool)m_rows_buf))
> {
> + m_curr_row= m_rows_buf;
> m_rows_end= m_rows_buf + data_size;
> m_rows_cur= m_rows_end;
> memcpy(m_rows_buf, ptr_rows_data, data_size);
> @@ -5865,7 +5869,6 @@ int Rows_log_event::do_apply_event(RELAY
> {
> DBUG_ENTER("Rows_log_event::do_apply_event(st_relay_log_info*)");
> int error= 0;
> - uchar const *row_start= m_rows_buf;
>
> /*
> If m_table_id == ~0UL, then we have a dummy event that does not
> @@ -6028,7 +6031,9 @@ int Rows_log_event::do_apply_event(RELAY
>
> DBUG_ASSERT(rli->tables_to_lock == NULL && rli->tables_to_lock_count
> == 0);
>
> - TABLE* table=
> const_cast<RELAY_LOG_INFO*>(rli)->m_table_map.get_table(m_table_id);
> + TABLE*
> + table=
> + m_table=
> const_cast<RELAY_LOG_INFO*>(rli)->m_table_map.get_table(m_table_id);
But there is no need for the local var 'table'? Let's substitute it
with m_table.
>
> if (table)
> {
> @@ -6075,22 +6080,52 @@ int Rows_log_event::do_apply_event(RELAY
> */
> const_cast<RELAY_LOG_INFO*>(rli)->set_flag(RELAY_LOG_INFO::IN_STMT);
>
> - error= do_before_row_operations(table);
> - while (error == 0 && row_start < m_rows_end)
> - {
> - uchar const *row_end= NULL;
> - if ((error= do_prepare_row(thd, rli, table, row_start, &row_end)))
> - break; // We should perform the after-row operation even in
> - // the case of error
> + if ( m_width == table->s->fields &&
> bitmap_is_set_all(&m_cols))
> + set_flags(COMPLETE_ROWS_F);
> +
> + /*
> + Initialize table's write and read sets. Extra columns are included
> + as they will be filled with default values.
> +
> + bit pos: 0 1 2 3 4 ... N-1 | N N+1 ... M |
> + bit val: X X X X X .... X | 1 1 ... 1 |
> +
> + N = number of columns on master (m_width)
> + M = number of columns on slave
> + X = bits from the row's column set (m_cols)
> +
> + We set all bits in read_set since we want to retrieve all columns even
> + when the row is not complete.
> + */
> +
> + bitmap_set_all(table->write_set);
> + bitmap_set_all(table->read_set);
>
> - DBUG_ASSERT(row_end != NULL); // cannot happen
> - DBUG_ASSERT(row_end <= m_rows_end);
> + /*
> + Note: I (Rafal) couldn't find any bitmap function which would
> + copy bits from m_cols to a prefix of write_set. This is why I use
> + simple loop below.
> + */
> + if (!get_flags(COMPLETE_ROWS_F))
> + for (uint bit=0 ; bit < m_width ; ++bit)
> + if (!bitmap_is_set(&m_cols,bit))
> + bitmap_clear_bit(table->write_set,bit);
> +
We started discussing this loop due to lack of primitive. Personally
I feel very uncomfortable to see that we use suboptimal solution which
can be avoided ralatively easy. I think we agreed previously that if
the patch would have to wait till you are back from vacation then this
issue should be addressed in the newer patch. How I see it can be
done. The current bitmap_intersect() fills the destination bitmap's
extra bits with zeros, which in our case should be with ones or rather
in case dest.n_bits > orig.n_bits the destination bitmap's
[dest.n_bits - orig.n_bits + 1, dest.n_bits] need preserving.
> + // Do event specific preparations
> +
> + error= do_before_row_operations(thd);
>
> + // row processing loop
> +
> + while (error == 0 && m_curr_row < m_rows_end)
> + {
> /* in_use can have been set to NULL in close_tables_for_reopen */
> THD* old_thd= table->in_use;
> if (!table->in_use)
> table->in_use= thd;
> - error= do_exec_row(table);
> +
> + error= do_exec_row(thd,rli);
> +
> table->in_use = old_thd;
> switch (error)
> {
> @@ -6110,20 +6145,39 @@ int Rows_log_event::do_apply_event(RELAY
> break;
> }
>
> - row_start= row_end;
> - }
> + /*
> + If m_curr_row_end was not set during event execution (e.g., because
> + of errors) we can't proceed to the next row. If the error is transient
> + (i.e., error==0 at this point) we must call unpack_row() to set
unpack_current_row() is definetly meant -------------------^
> + m_curr_row_end.
> + */
> +
> + if (!m_curr_row_end && !error)
> + unpack_current_row();
> +
> + // at this moment m_curr_row_end should be set
> + DBUG_ASSERT(error || m_curr_row_end != NULL);
> + DBUG_ASSERT(error || m_curr_row < m_curr_row_end);
> + DBUG_ASSERT(error || m_curr_row_end <= m_rows_end);
> +
> + m_curr_row= m_curr_row_end;
> +
> + } // row processing loop
> +
> DBUG_EXECUTE_IF("STOP_SLAVE_after_first_Rows_event",
> const_cast<RELAY_LOG_INFO*>(rli)->abort_slave= 1;);
> - error= do_after_row_operations(table, error);
> +
> + error= do_after_row_operations(thd, error);
> +
> if (!cache_stmt)
> {
> DBUG_PRINT("info", ("Marked that we need to keep log"));
> thd->options|= OPTION_KEEP_LOG;
> }
> - }
> + } // if (table)
>
> if (error)
> - { /* error has occured during the transaction */
> + { /* error has occurred during the transaction */
> rli->report(ERROR_LEVEL, thd->net.last_errno,
> "Error in %s event: error during transaction execution "
> "on table %s.%s",
> @@ -6169,7 +6223,7 @@ int Rows_log_event::do_apply_event(RELAY
> wait (reached end of last relay log and nothing gets appended
> there), we timeout after one minute, and notify DBA about the
> problem. When WL#2975 is implemented, just remove the member
> - st_relay_log_info::last_event_start_time and all its occurences.
> + st_relay_log_info::last_event_start_time and all its occurrences.
> */
> const_cast<RELAY_LOG_INFO*>(rli)->last_event_start_time= time(0);
> }
> @@ -6407,7 +6461,7 @@ Table_map_log_event::Table_map_log_event
> m_data_size+= m_tbllen + 2; // Include length and terminating \0
> m_data_size+= 1 + m_colcnt; // COLCNT and column types
>
> - /* If malloc fails, catched in is_valid() */
> + /* If malloc fails, caught in is_valid() */
> if ((m_memory= (uchar*) my_malloc(m_colcnt, MYF(MY_WME))))
> {
> m_coltype= reinterpret_cast<uchar*>(m_memory);
> @@ -6488,7 +6542,7 @@ Table_map_log_event::Table_map_log_event
> (ulong) m_tbllen, (long) (ptr_tbllen-(const uchar*)vpart),
> m_colcnt, (long) (ptr_colcnt-(const uchar*)vpart)));
>
> - /* Allocate mem for all fields in one go. If fails, catched in is_valid() */
> + /* Allocate mem for all fields in one go. If fails, caught in is_valid() */
> m_memory= (uchar*) my_multi_malloc(MYF(MY_WME),
> &m_dbnam, (uint) m_dblen + 1,
> &m_tblnam, (uint) m_tbllen + 1,
> @@ -6786,7 +6840,7 @@ Write_rows_log_event::Write_rows_log_eve
> #endif
>
> #if !defined(MYSQL_CLIENT) && defined(HAVE_REPLICATION)
> -int Write_rows_log_event::do_before_row_operations(TABLE *table)
> +int Write_rows_log_event::do_before_row_operations(THD *thd)
> {
> int error= 0;
>
> @@ -6808,26 +6862,26 @@ int Write_rows_log_event::do_before_row_
> /*
> Do not raise the error flag in case of hitting to an unique attribute
> */
> - table->file->extra(HA_EXTRA_IGNORE_DUP_KEY);
> + m_table->file->extra(HA_EXTRA_IGNORE_DUP_KEY);
> /*
> NDB specific: update from ndb master wrapped as Write_rows
> */
> /*
> so that the event should be applied to replace slave's row
> */
> - table->file->extra(HA_EXTRA_WRITE_CAN_REPLACE);
> + m_table->file->extra(HA_EXTRA_WRITE_CAN_REPLACE);
> /*
> NDB specific: if update from ndb master wrapped as Write_rows
> does not find the row it's assumed idempotent binlog applying
> is taking place; don't raise the error.
> */
> - table->file->extra(HA_EXTRA_IGNORE_NO_KEY);
> + m_table->file->extra(HA_EXTRA_IGNORE_NO_KEY);
> /*
> TODO: the cluster team (Tomas?) says that it's better if the engine knows
> how many rows are going to be inserted, then it can allocate needed memory
> from the start.
> */
> - table->file->ha_start_bulk_insert(0);
> + m_table->file->ha_start_bulk_insert(0);
> /*
> We need TIMESTAMP_NO_AUTO_SET otherwise ha_write_row() will not use fill
> any TIMESTAMP column with data from the row but instead will use
> @@ -6843,45 +6897,29 @@ int Write_rows_log_event::do_before_row_
> some cases we won't want TIMESTAMP_NO_AUTO_SET (will require some code to
> analyze if explicit data is provided for slave's TIMESTAMP columns).
> */
> - table->timestamp_field_type= TIMESTAMP_NO_AUTO_SET;
> + m_table->timestamp_field_type= TIMESTAMP_NO_AUTO_SET;
> return error;
> }
>
> -int Write_rows_log_event::do_after_row_operations(TABLE *table, int error)
> +int Write_rows_log_event::do_after_row_operations(THD *thd, int error)
> {
> int local_error= 0;
> - table->file->extra(HA_EXTRA_NO_IGNORE_DUP_KEY);
> - table->file->extra(HA_EXTRA_WRITE_CANNOT_REPLACE);
> + m_table->file->extra(HA_EXTRA_NO_IGNORE_DUP_KEY);
> + m_table->file->extra(HA_EXTRA_WRITE_CANNOT_REPLACE);
> /*
> reseting the extra with
> - table->file->extra(HA_EXTRA_NO_IGNORE_NO_KEY);
> + m_table->file->extra(HA_EXTRA_NO_IGNORE_NO_KEY);
> fires bug#27077
> todo: explain or fix
> */
> - if ((local_error= table->file->ha_end_bulk_insert()))
> + if ((local_error= m_table->file->ha_end_bulk_insert()))
> {
> - table->file->print_error(local_error, MYF(0));
> + m_table->file->print_error(local_error, MYF(0));
> }
> return error? error : local_error;
> }
>
> -int Write_rows_log_event::do_prepare_row(THD *thd, RELAY_LOG_INFO const *rli,
> - TABLE *table,
> - uchar const *const row_start,
> - uchar const **const row_end)
> -{
> - DBUG_ASSERT(table != NULL);
> - DBUG_ASSERT(row_start && row_end);
> -
> - if (int error= unpack_row(rli, table, m_width, row_start, &m_cols, row_end,
> - &m_master_reclength, table->write_set,
> WRITE_ROWS_EVENT))
> - {
> - thd->net.last_errno= error;
> - return error;
> - }
> - bitmap_copy(table->read_set, table->write_set);
> - return 0;
> -}
> +#if !defined(MYSQL_CLIENT) && defined(HAVE_REPLICATION)
>
> /*
> Check if there are more UNIQUE keys after the given key.
> @@ -6895,120 +6933,6 @@ last_uniq_key(TABLE *table, uint keyno)
> return 1;
> }
>
> -/* Anonymous namespace for template functions/classes */
> -namespace {
> -
> - /*
> - Smart pointer that will automatically call my_afree (a macro) when
> - the pointer goes out of scope. This is used so that I do not have
> - to remember to call my_afree() before each return. There is no
> - overhead associated with this, since all functions are inline.
> -
> - I (Matz) would prefer to use the free function as a template
> - parameter, but that is not possible when the "function" is a
> - macro.
> - */
> - template <class Obj>
> - class auto_afree_ptr
> - {
> - Obj* m_ptr;
> - public:
> - auto_afree_ptr(Obj* ptr) : m_ptr(ptr) { }
> - ~auto_afree_ptr() { if (m_ptr) my_afree(m_ptr); }
> - void assign(Obj* ptr) {
> - /* Only to be called if it hasn't been given a value before. */
> - DBUG_ASSERT(m_ptr == NULL);
> - m_ptr= ptr;
> - }
> - Obj* get() { return m_ptr; }
> - };
> -
> -}
> -
> -
> -/*
> - Copy "extra" columns from record[1] to record[0].
> -
> - Copy the extra fields that are not present on the master but are
> - present on the slave from record[1] to record[0]. This is used
> - after fetching a record that are to be updated, either inside
> - replace_record() or as part of executing an update_row().
> - */
> -static int
> -copy_extra_record_fields(TABLE *table,
> - size_t master_reclength,
> - my_ptrdiff_t master_fields)
> -{
> - DBUG_PRINT("info", ("Copying to 0x%lx "
> - "from field %lu at offset %lu "
> - "to field %d at offset %lu",
> - (long) table->record[0],
> - (ulong) master_fields, (ulong) master_reclength,
> - table->s->fields, table->s->reclength));
> - /*
> - Copying the extra fields of the slave that does not exist on
> - master into record[0] (which are basically the default values).
> - */
> - DBUG_ASSERT(master_reclength <= table->s->reclength);
> - if (master_reclength < table->s->reclength)
> - bmove_align(table->record[0] + master_reclength,
> - table->record[1] + master_reclength,
> - table->s->reclength - master_reclength);
> -
> - /*
> - Bit columns are special. We iterate over all the remaining
> - columns and copy the "extra" bits to the new record. This is
> - not a very good solution: it should be refactored on
> - opportunity.
> -
> - REFACTORING SUGGESTION (Matz). Introduce a member function
> - similar to move_field_offset() called copy_field_offset() to
> - copy field values and implement it for all Field subclasses. Use
> - this function to copy data from the found record to the record
> - that are going to be inserted.
> -
> - The copy_field_offset() function need to be a virtual function,
> - which in this case will prevent copying an entire range of
> - fields efficiently.
> - */
> - {
> - Field **field_ptr= table->field + master_fields;
> - for ( ; *field_ptr ; ++field_ptr)
> - {
> - /*
> - Set the null bit according to the values in record[1]
> - */
> - if ((*field_ptr)->maybe_null() &&
> -
> (*field_ptr)->is_null_in_record(reinterpret_cast<uchar*>(table->record[1])))
> - (*field_ptr)->set_null();
> - else
> - (*field_ptr)->set_notnull();
> -
> - /*
> - Do the extra work for special columns.
> - */
> - switch ((*field_ptr)->real_type())
> - {
> - default:
> - /* Nothing to do */
> - break;
> -
> - case MYSQL_TYPE_BIT:
> - Field_bit *f= static_cast<Field_bit*>(*field_ptr);
> - if (f->bit_len > 0)
> - {
> - my_ptrdiff_t const offset= table->record[1] - table->record[0];
> - uchar const bits=
> - get_rec_bits(f->bit_ptr + offset, f->bit_ofs, f->bit_len);
> - set_rec_bits(bits, f->bit_ptr, f->bit_ofs, f->bit_len);
> - }
> - break;
> - }
> - }
> - }
> - return 0; // All OK
> -}
> -
> #define DBUG_PRINT_BITSET(N,FRM,BS) \
> do { \
> char buf[256]; \
> @@ -7044,58 +6968,89 @@ is_duplicate_key_error(int errcode)
> return false;
> }
>
> +/**
> + Write the current row into @c m_table.
>
> -/*
> - Replace the provided record in the database.
> + The row is located in the row buffer, pointed by @c m_curr_row member.
> + Number of columns of the row is stored in @c m_width member (it can be smaller
> + than the number of columns in the table to which we insert). Bitmap @c m_cols
> + indicates which columns are present in the row. It is assumed that event's
> + table is already open and pointed by @c m_table.
> +
> + If the same record already exists in the table it can be either overwritten
> + or an error is reported depending on the value of @c overwrite flag
> + (error reporting not yet implemented). Note that the matching record can be
> + different from the row we insert if we use primary keys to identify records in
> + the table.
> +
> + The row to be inserted can contain values only for selected columns. The
> + missing columns are filled with default values using @c prepare_record()
> + function. If a matching record is found in the table and overwritten, the
> + missing columns are taken from it.
> +
> + @param thd Thread context for writing the record.
> + @param log Logger for error reporting.
> + @param overwrite Shall we overwrite if the row already exists or signal
> + error (currently ignored).
> +
> + @returns Error code on failure, 0 on success.
> +
> + This method, if successful, sets @c m_curr_row_end pointer to point at the
> + next row in the rows buffer. This is done when unpacking the row to be
> + inserted.
>
> - SYNOPSIS
> - replace_record()
> - thd Thread context for writing the record.
> - table Table to which record should be written.
> - master_reclength
> - Offset to first column that is not present on the master,
> - alternatively the length of the record on the master
> - side.
> -
> - RETURN VALUE
> - Error code on failure, 0 on success.
> -
> - DESCRIPTION
> - Similar to how it is done in mysql_insert(), we first try to do
> - a ha_write_row() and of that fails due to duplicated keys (or
> - indices), we do an ha_update_row() or a ha_delete_row() instead.
> - */
> -static int
> -replace_record(THD *thd, TABLE *table,
> - ulong const master_reclength,
> - uint const master_fields)
> + @note If a matching record is found, it is either updated using
> + @c ha_update_row() function or first deleted and then new record written.
> +
> +*/
> +
> +int
> +Rows_log_event::write_row(THD *const thd,
> + const Slave_reporting_capability *const log,
> + const bool overwrite)
> {
> - DBUG_ENTER("replace_record");
> - DBUG_ASSERT(table != NULL && thd != NULL);
> + DBUG_ENTER("write_row");
> + DBUG_ASSERT(m_table != NULL && thd != NULL);
>
> int error;
> int keynum;
> auto_afree_ptr<char> key(NULL);
>
> -#ifndef DBUG_OFF
> - DBUG_DUMP("record[0]", table->record[0], table->s->reclength);
> - DBUG_PRINT_BITSET("debug", "write_set = %s", table->write_set);
> - DBUG_PRINT_BITSET("debug", "read_set = %s", table->read_set);
> -#endif
> + /* fill m_table->record[0] with default values */
> +
> + if ((error= prepare_record(log, m_table,m_width,
> + TRUE /* check if columns have def. values */)))
> + goto finish;
> +
> + /* unpack row into m_table->record[0] */
> + error= unpack_current_row(); // TODO: how to handle errors?
> +
> + DBUG_DUMP("record[0]", m_table->record[0], m_table->s->reclength);
> + DBUG_PRINT_BITSET("debug", "write_set = %s", m_table->write_set);
> + DBUG_PRINT_BITSET("debug", "read_set = %s", m_table->read_set);
> +
> + /*
> + Try to write record. If a corresponding record already exists in the table,
> + we try to change it using ha_update_row() if possible. Otherwise we delete
> + it and jump here to try writing again.
> +
> + TODO: Add safety measures against infinite looping.
> + */
>
> - while ((error= table->file->ha_write_row(table->record[0])))
> + write_row:
> +
> + if ((error= m_table->file->ha_write_row(m_table->record[0])))
> {
> + DBUG_PRINT("info",("ha_write_row() returned error %d",error));
> +
> if (error == HA_ERR_LOCK_DEADLOCK || error == HA_ERR_LOCK_WAIT_TIMEOUT)
> + goto handler_error;
> +
> + if ((keynum= m_table->file->get_dup_key(error)) < 0)
> {
> - table->file->print_error(error, MYF(0)); /* to check at
> exec_relay_log_event */
> - DBUG_RETURN(error);
> - }
> - if ((keynum= table->file->get_dup_key(error)) < 0)
> - {
> - /* We failed to retrieve the duplicate key */
> - DBUG_RETURN(error);
> + DBUG_PRINT("info",("Can't locate duplicate key (get_dup_key returns
> %d)",keynum));
> + goto handler_error;
> }
> -
> /*
> We need to retrieve the old row into record[1] to be able to
> either update or delete the offending record. We either:
> @@ -7106,50 +7061,70 @@ replace_record(THD *thd, TABLE *table,
> - use index_read_idx() with the key that is duplicated, to
> retrieve the offending row.
> */
> - if (table->file->ha_table_flags() & HA_DUPLICATE_POS)
> + if (m_table->file->ha_table_flags() & HA_DUPLICATE_POS)
> {
> - error= table->file->rnd_pos(table->record[1],
> table->file->dup_ref);
> + DBUG_PRINT("info",("Locating offending record using rnd_pos()"));
> +
> + error= m_table->file->rnd_pos(m_table->record[1],
> + m_table->file->dup_ref);
> if (error)
> {
> - table->file->print_error(error, MYF(0));
> - DBUG_RETURN(error);
> + DBUG_PRINT("info",("rnd_pos() returns error %d",error));
> + goto handler_error;
> }
> }
> else
> {
> - if (table->file->extra(HA_EXTRA_FLUSH_CACHE))
> - {
> - DBUG_RETURN(my_errno);
> + DBUG_PRINT("info",("Locating offending record using index_read_idx()"));
> +
> + if (m_table->file->extra(HA_EXTRA_FLUSH_CACHE))
> + {
> + DBUG_PRINT("info",("Error when setting HA_EXTRA_FLUSH_CACHE"));
> + error= my_errno;
> + goto finish;
> }
>
> if (key.get() == NULL)
> {
> -
> key.assign(static_cast<char*>(my_alloca(table->s->max_unique_length)));
> +
> key.assign(static_cast<char*>(my_alloca(m_table->s->max_unique_length)));
> if (key.get() == NULL)
> - DBUG_RETURN(ENOMEM);
> + {
> + DBUG_PRINT("info",("Can't allocate key buffer"));
> + error= ENOMEM;
> + goto finish;
> + }
> }
>
> - key_copy((uchar*)key.get(), table->record[0], table->key_info + keynum,
> 0);
> - error= table->file->index_read_idx(table->record[1], keynum,
> - (const uchar*)key.get(),
> - HA_WHOLE_KEY,
> - HA_READ_KEY_EXACT);
> - if (error)
> - {
> - table->file->print_error(error, MYF(0));
> - DBUG_RETURN(error);
> + key_copy((uchar*)key.get(), m_table->record[0], m_table->key_info +
> keynum, 0);
> + if ((error= m_table->file->index_read_idx(m_table->record[1],
> keynum,
> + (const uchar*)key.get(),
> + HA_WHOLE_KEY,
> + HA_READ_KEY_EXACT)))
> + {
> + DBUG_PRINT("info",("index_read_idx() returns error %d",error));
> + goto handler_error;
> }
> }
>
> /*
> - Now, table->record[1] should contain the offending row. That
> + Now, record[1] should contain the offending row. That
> will enable us to update it or, alternatively, delete it (so
> that we can insert the new row afterwards).
> + */
>
> - First we copy the columns into table->record[0] that are not
> - present on the master from table->record[1], if there are any.
> + /*
> + If row is incomplete we will use the record found to fill
> + missing columns.
> */
> - copy_extra_record_fields(table, master_reclength, master_fields);
> + if (!get_flags(COMPLETE_ROWS_F))
> + {
> + restore_record(m_table,record[1]);
> + error= unpack_current_row();
> + }
> +
> + DBUG_PRINT("debug",("preparing for update: before and after image"));
> + DBUG_DUMP("record[1] (before)", m_table->record[1],
> m_table->s->reclength);
> + DBUG_DUMP("record[0] (after)", m_table->record[0],
> m_table->s->reclength);
>
> /*
> REPLACE is defined as either INSERT or DELETE + INSERT. If
> @@ -7166,37 +7141,71 @@ replace_record(THD *thd, TABLE *table,
> fail, so we're better off just deleting the row and inserting
> the correct row.
> */
> - if (last_uniq_key(table, keynum) &&
> - !table->file->referenced_by_foreign_key())
> + if (last_uniq_key(m_table, keynum) &&
> + !m_table->file->referenced_by_foreign_key())
> {
> - error=table->file->ha_update_row(table->record[1],
> - table->record[0]);
> - if (error && error != HA_ERR_RECORD_IS_THE_SAME)
> - table->file->print_error(error, MYF(0));
> - else
> + DBUG_PRINT("info",("Updating row using ha_update_row()"));
> + error= m_table->file->ha_update_row(m_table->record[1],
> + m_table->record[0]);
> + if (error == HA_ERR_RECORD_IS_THE_SAME)
> + {
> + DBUG_PRINT("info",("ignoring HA_ERR_RECORD_IS_THE_SAME ret. value from"
> + " ha_update_row()"));
> error= 0;
> - DBUG_RETURN(error);
> + }
> +
> + if (error)
> + {
> + DBUG_PRINT("info",("ha_update_row() returns error %d",error));
> + goto handler_error;
> + }
> }
> else
> {
> - if ((error= table->file->ha_delete_row(table->record[1])))
> - {
> - table->file->print_error(error, MYF(0));
> - DBUG_RETURN(error);
> + DBUG_PRINT("info",("Deleting offending row and trying to write new one
> again"));
> + if ((error= m_table->file->ha_delete_row(m_table->record[1])))
> + {
> + DBUG_PRINT("info",("ha_delete_row() returns error %d",error));
> + goto handler_error;
> }
> - /* Will retry ha_write_row() with the offending row removed. */
> +
> + /* try to write the row again */
> + goto write_row;
> }
> - }
>
> + } // if (ha_write_row())
> +
> +#ifndef DBUG_OFF
> + if (error)
> + DBUG_PRINT("info",("Failed to insert row (error=%d)",error));
> +#endif
> +
> + goto finish;
> +
> + handler_error:
> +
> + if (error)
> + m_table->file->print_error(error, MYF(0));
> +
> + finish:
> +
> DBUG_RETURN(error);
> }
>
> -int Write_rows_log_event::do_exec_row(TABLE *table)
> +#endif
> +
> +int Write_rows_log_event::do_exec_row(THD *thd,
> + const Slave_reporting_capability *const log)
> {
> - DBUG_ASSERT(table != NULL);
> - int error= replace_record(thd, table, m_master_reclength, m_width);
> - return error;
> + DBUG_ASSERT(m_table != NULL);
> + int error= write_row(thd, log, TRUE /* overwrite */);
> +
> + if (error && !thd->net.last_errno)
> + thd->net.last_errno= error;
> +
> + return error;
> }
> +
> #endif /* !defined(MYSQL_CLIENT) && defined(HAVE_REPLICATION) */
>
> #ifdef MYSQL_CLIENT
> @@ -7289,50 +7298,79 @@ record_compare_exit:
> return result;
> }
>
> +/**
> + Locate the current row in @c m_table.
>
> -/*
> - Find the row given by 'key', if the table has keys, or else use a table scan
> - to find (and fetch) the row.
> + The current row is stored in event's row buffer, pointed by @c m_curr_row
> + member. Member @c m_width tells how many columns are there in the row (this
> + number can be smaller than the number of columns in the table). It is assumed
> + that event's table is already open and pointed by @c m_table.
> +
> + If a corresponding record is found in the table it is stored in
> + @c m_table->record[0]. Note that when record is located based on a primary
> + key, it is possible that the record found differs from the row being located.
> +
> + If no key is specified or table does not have keys, a table scan is used to
> + find the row. In that case the row should be complete and contain values for
> + all columns. However, it can still be shorter than the table, i.e. the table
> + can contain extra columns not present in the row.
> +
> + @returns Error code on failure, 0 on success. If success @c m_table->record[0]
>
> + contains the record found. Also, the internal "cursor" of the table is
> + positioned at the record found.
>
> - If the engine allows random access of the records, a combination of
> - position() and rnd_pos() will be used.
> + @note If the engine allows random access of the records, a combination of
> + @c position() and @c rnd_pos() will be used.
> + */
>
> - @param table Pointer to table to search
> - @param key Pointer to key to use for search, if table has key
> +int Rows_log_event::find_and_fetch_row()
> +{
> + DBUG_ENTER("find_and_fetch_row");
>
> - @pre <code>table->record[0]</code> shall contain the row to locate
> - and <code>key</code> shall contain a key to use for searching, if
> - the engine has a key.
> -
> - @post If the return value is zero, <code>table->record[1]</code>
> - will contain the fetched row and the internal "cursor" will refer to
> - the row. If the return value is non-zero,
> - <code>table->record[1]</code> is undefined. In either case,
> - <code>table->record[0]</code> is undefined.
> + DBUG_ASSERT(m_table && m_table->in_use != NULL);
>
> - @return Zero if the row was successfully fetched into
> - <code>table->record[1]</code>, error code otherwise.
> - */
> + int error;
>
> -static int find_and_fetch_row(TABLE *table, uchar *key)
> -{
> - DBUG_ENTER("find_and_fetch_row(TABLE *table, uchar *key, uchar *record)");
> - DBUG_PRINT("enter", ("table: 0x%lx, key: 0x%lx record: 0x%lx",
> - (long) table, (long) key, (long) table->record[1]));
> + /* unpack row - missing fields get default values */
> +
> + prepare_record(NULL,m_table,m_width,FALSE /* don't check errors */); // TODO:
> shall we check and report errors here?
> + error= unpack_current_row();
> +
> + DBUG_PRINT("info",("looking for the following record"));
> + DBUG_DUMP("record[0]", m_table->record[0], m_table->s->reclength);
> +
> + /* Select best method for locating the row based on handler capabilities */
> +
> + enum { TABLE_SCAN, INDEX, POSITION } method= TABLE_SCAN;
> +
> + if ((m_table->file->ha_table_flags() &
> HA_PRIMARY_KEY_REQUIRED_FOR_POSITION) &&
> + m_table->s->primary_key < MAX_KEY)
> + method= POSITION;
> + else if (m_table->s->keys > 0)
> + method= INDEX;
> +
> + if (method != POSITION)
> + {
> + m_table->use_all_columns();
> + /*
> + Save unpacked row in record[1] for comparisons during scans if they happen.
> + Note: missing columns are filled with default values and these will be used
> + in comparisons. TODO: better handling of missing columns in comparisons.
> + */
> + store_record(m_table,record[1]);
> + }
>
> - DBUG_ASSERT(table->in_use != NULL);
> + /* Locate row using selected method. The record found is stored in record[0] */
>
> - DBUG_DUMP("record[0]", table->record[0], table->s->reclength);
> + switch (method) {
>
> - if ((table->file->ha_table_flags() &
> HA_PRIMARY_KEY_REQUIRED_FOR_POSITION) &&
> - table->s->primary_key < MAX_KEY)
> + case POSITION:
> {
> + DBUG_PRINT("info",("locating record using primary key (position)"));
> /*
> - Use a more efficient method to fetch the record given by
> - table->record[0] if the engine allows it. We first compute a
> - row reference using the position() member function (it will be
> - stored in table->file->ref) and the use rnd_pos() to position
> - the "cursor" (i.e., record[0] in this case) at the correct row.
> + We compute a row reference using position() member function
> + (it will be stored in table->file->ref) and then use rnd_pos() to
> + position the "cursor" (i.e., record[0] in this case) at the correct row.
>
> TODO: Add a check that the correct record has been fetched by
> comparing with the original record. Take into account that the
> @@ -7343,36 +7381,34 @@ static int find_and_fetch_row(TABLE *tab
> int error= table->file->rnd_pos(table->record[0],
> table->file->ref);
> ADD>>> DBUG_ASSERT(memcmp(table->record[1], table->record[0],
> table->s->reclength) == 0);
> -
> */
> - table->file->position(table->record[0]);
> - int error= table->file->rnd_pos(table->record[0],
> table->file->ref);
> - /*
> - rnd_pos() returns the record in table->record[0], so we have to
> - move it to table->record[1].
> - */
> - bmove_align(table->record[1], table->record[0],
> table->s->reclength);
> - DBUG_RETURN(error);
> + m_table->file->position(m_table->record[0]);
> + if ((error= m_table->file->rnd_pos(m_table->record[0],
> m_table->file->ref)))
> + goto handler_error;
> +
> + break;
> }
>
> - /* We need to retrieve all fields */
> - /* TODO: Move this out from this function to main loop */
> - table->use_all_columns();
> -
> - if (table->s->keys > 0)
> - {
> - int error;
> - /* We have a key: search the table using the index */
> - if (!table->file->inited && (error=
> table->file->ha_index_init(0, FALSE)))
> - DBUG_RETURN(error);
> + case INDEX:
> + {
> + DBUG_PRINT("info",("locating record using primary key (index_read)"));
>
> - /*
> - Don't print debug messages when running valgrind since they can
> - trigger false warnings.
> - */
> + /* Initialize primary key index (key #0) */
> +
> + if (!m_table->file->inited && (error=
> m_table->file->ha_index_init(0, FALSE)))
> + goto handler_error;
> +
> + /* Fill key data for the row */
> +
> + DBUG_ASSERT(m_key);
> + key_copy(m_key, m_table->record[0], m_table->key_info, 0);
> +
> + /*
> + Don't print debug messages when running valgrind since they can
> + trigger false warnings.
> + */
> #ifndef HAVE_purify
> - DBUG_DUMP("table->record[0]", table->record[0],
> table->s->reclength);
> - DBUG_DUMP("table->record[1]", table->record[1],
> table->s->reclength);
> + DBUG_DUMP("key data", m_key, m_table->key_info->key_length);
> #endif
>
> /*
> @@ -7382,14 +7418,16 @@ static int find_and_fetch_row(TABLE *tab
> correctly.
> */
> my_ptrdiff_t const pos=
> - table->s->null_bytes > 0 ? table->s->null_bytes - 1 : 0;
> - table->record[1][pos]= 0xFF;
> - if ((error= table->file->index_read(table->record[1], key,
> HA_WHOLE_KEY,
> - HA_READ_KEY_EXACT)))
> - {
> - table->file->print_error(error, MYF(0));
> - table->file->ha_index_end();
> - DBUG_RETURN(error);
> + m_table->s->null_bytes > 0 ? m_table->s->null_bytes - 1 : 0;
> + m_table->record[0][pos]= 0xFF;
> +
> + if ((error= m_table->file->index_read(m_table->record[0], m_key,
> + HA_WHOLE_KEY,
> + HA_READ_KEY_EXACT)))
> + {
> + DBUG_PRINT("info",("no record matching the key found in the table"));
> + m_table->file->ha_index_end();
> + goto handler_error;
> }
>
> /*
> @@ -7397,8 +7435,8 @@ static int find_and_fetch_row(TABLE *tab
> trigger false warnings.
> */
> #ifndef HAVE_purify
> - DBUG_DUMP("table->record[0]", table->record[0],
> table->s->reclength);
> - DBUG_DUMP("table->record[1]", table->record[1],
> table->s->reclength);
> + DBUG_PRINT("info",("found first matching record"));
> + DBUG_DUMP("record[0]", m_table->record[0], m_table->s->reclength);
> #endif
> /*
> Below is a minor "optimization". If the key (i.e., key number
> @@ -7414,92 +7452,156 @@ static int find_and_fetch_row(TABLE *tab
> found. I can see no scenario where it would be incorrect to
> chose the row to change only using a PK or an UNNI.
> */
> - if (table->key_info->flags & HA_NOSAME)
> + if (!(m_table->key_info->flags & HA_NOSAME))
> {
> - table->file->ha_index_end();
> - DBUG_RETURN(0);
> - }
> + /*
> + In case key is not unique, we still have to iterate over records found
> + and find the one which is identical to the row given. The row is unpacked
> + in record[1] where missing columns are filled with default values.
> + */
> + DBUG_PRINT("info",("non-unique index, scanning it to find matching record"));
>
>
> - while (record_compare(table))
> - {
> - int error;
> + while (record_compare(m_table))
> + {
> + /*
> + We need to set the null bytes to ensure that the filler bit
> + are all set when returning. There are storage engines that
> + just set the necessary bits on the bytes and don't set the
> + filler bits correctly.
> +
> + TODO[record format ndb]: Remove this code once NDB returns the
> + correct record format.
> + */
> + if (m_table->s->null_bytes > 0)
> + {
> + m_table->record[0][m_table->s->null_bytes - 1]|=
> + 256U - (1U << m_table->s->last_null_bit_pos);
> + }
> +
> + if ((error= m_table->file->index_next(m_table->record[0])))
> + {
> + DBUG_PRINT("info",("no record matching the given row found"));
> + m_table->file->ha_index_end();
> + goto handler_error;
> + }
>
> - /*
> - We need to set the null bytes to ensure that the filler bit
> - are all set when returning. There are storage engines that
> - just set the necessary bits on the bytes and don't set the
> - filler bits correctly.
> + /*
> + Don't print debug messages when running valgrind since they can
> + trigger false warnings.
> + */
> +#ifndef HAVE_purify
> + DBUG_PRINT("info",("found next record"));
> + DBUG_DUMP("record[0]", m_table->record[0], m_table->s->reclength);
> +#endif
> + } // index scan loop
>
> - TODO[record format ndb]: Remove this code once NDB returns the
> - correct record format.
> - */
> - if (table->s->null_bytes > 0)
> - {
> - table->record[1][table->s->null_bytes - 1]|=
> - 256U - (1U << table->s->last_null_bit_pos);
> - }
> + } // if !<unique_index>
>
> - if ((error= table->file->index_next(table->record[1])))
> - {
> - table->file->print_error(error, MYF(0));
> - table->file->ha_index_end();
> - DBUG_RETURN(error);
> - }
> - }
> + m_table->file->ha_index_end();
>
> - /*
> - Have to restart the scan to be able to fetch the next row.
> - */
> - table->file->ha_index_end();
> + break;
> }
> - else
> - {
> +
> + case TABLE_SCAN:
> + {
> + DBUG_PRINT("info",("locating record using table scan (rnd_next)"));
> +
> int restart_count= 0; // Number of times scanning has restarted from top
> - int error;
>
> - /* We don't have a key: search the table using rnd_next() */
> - if ((error= table->file->ha_rnd_init(1)))
> - return error;
> + if ((error= m_table->file->ha_rnd_init(1)))
> + {
> + DBUG_PRINT("info",("error initializing table scan (ha_rnd_init)"));
> + goto handler_error;
> + }
>
> /* Continue until we find the right record or have made a full loop */
> + /*
> + TODO: record_compare() below will take into account all fields in
> + table->record[1] - some of them might be not present in the row given and
> are
> + filled with default values. Eventually, only the fields present in the row
> + should be used to determine if a record was found or not.
> +
> + Idea: change signature of record_compare to use row data for comparison.
> + I.e., function should check if columns present in the row have the same
> + value in table->record[0].
> + */
> do
> {
> - error= table->file->rnd_next(table->record[1]);
> -
> - DBUG_DUMP("record[0]", table->record[0], table->s->reclength);
> - DBUG_DUMP("record[1]", table->record[1], table->s->reclength);
> + error= m_table->file->rnd_next(m_table->record[0]);
>
> switch (error) {
> +
> case 0:
> - case HA_ERR_RECORD_DELETED:
> - break;
> + DBUG_DUMP("record found", m_table->record[0],
> m_table->s->reclength);
> +
> + case HA_ERR_RECORD_DELETED:
> + break;
>
> case HA_ERR_END_OF_FILE:
> - if (++restart_count < 2)
> - table->file->ha_rnd_init(1);
> - break;
> + if (++restart_count < 2)
> + m_table->file->ha_rnd_init(1); // TODO: handle error...
> + break;
>
> default:
> - table->file->print_error(error, MYF(0));
> - DBUG_PRINT("info", ("Record not found"));
> - table->file->ha_rnd_end();
> - DBUG_RETURN(error);
> + DBUG_PRINT("info", ("Failed to get next record (rnd_next)"));
> + m_table->file->ha_rnd_end();
> + goto handler_error;
> }
> }
> - while (restart_count < 2 && record_compare(table));
> + while (restart_count < 2 && record_compare(m_table));
>
> - /*
> - Have to restart the scan to be able to fetch the next row.
> - */
> - DBUG_PRINT("info", ("Record %sfound", restart_count == 2 ? "not " : ""));
> - table->file->ha_rnd_end();
> + m_table->file->ha_rnd_end();
>
> DBUG_ASSERT(error == HA_ERR_END_OF_FILE || error == 0);
> - DBUG_RETURN(error);
> +
> + if (error || restart_count == 2)
> + {
> + DBUG_PRINT("info",("Record not found"));
> + error= error? error: HA_ERR_END_OF_FILE;
> + goto finish;
> + }
> +
> + break;
> }
>
> - DBUG_RETURN(0);
> + default:
> + DBUG_ASSERT(FALSE);
> +
> + } // switch (method)
> +
> + if (error)
> + {
> + DBUG_PRINT("info", ("Record not found"));
> + goto finish;
> + }
> +
> + /*
> + We have located the row in the table - it is stored in record[0].
> + */
> +
> + DBUG_PRINT("info", ("Record found"));
> +
> + /*
> + Don't print debug messages when running valgrind since they can
> + trigger false warnings.
> + */
> +#ifndef HAVE_purify
> + DBUG_DUMP("record found", m_table->record[0], m_table->s->reclength);
> +#endif
> +
> + goto finish;
> +
> + handler_error:
> +
> + if (error)
> + m_table->file->print_error(error, MYF(0));
> +
> + finish:
> +
> + DBUG_RETURN(error);
> +
> }
> +
> #endif
>
> /*
> @@ -7511,9 +7613,6 @@ Delete_rows_log_event::Delete_rows_log_e
> ulong tid, MY_BITMAP const *cols,
> bool is_transactional)
> : Rows_log_event(thd_arg, tbl_arg, tid, cols, is_transactional)
> -#ifdef HAVE_REPLICATION
> - ,m_memory(NULL), m_key(NULL), m_after_image(NULL)
> -#endif
> {
> }
> #endif /* #if !defined(MYSQL_CLIENT) */
> @@ -7525,23 +7624,16 @@ Delete_rows_log_event::Delete_rows_log_e
> Delete_rows_log_event::Delete_rows_log_event(const char *buf, uint event_len,
> const Format_description_log_event
> *description_event)
> -#if defined(MYSQL_CLIENT)
> : Rows_log_event(buf, event_len, DELETE_ROWS_EVENT, description_event)
> -#else
> - : Rows_log_event(buf, event_len, DELETE_ROWS_EVENT, description_event),
> - m_memory(NULL), m_key(NULL), m_after_image(NULL)
> -#endif
> {
> }
> #endif
>
> #if !defined(MYSQL_CLIENT) && defined(HAVE_REPLICATION)
> -int Delete_rows_log_event::do_before_row_operations(TABLE *table)
> +int Delete_rows_log_event::do_before_row_operations(THD *thd)
> {
> - DBUG_ASSERT(m_memory == NULL);
> -
> - if ((table->file->ha_table_flags() &
> HA_PRIMARY_KEY_REQUIRED_FOR_POSITION) &&
> - table->s->primary_key < MAX_KEY)
> + if ((m_table->file->ha_table_flags() &
> HA_PRIMARY_KEY_REQUIRED_FOR_POSITION) &&
> + m_table->s->primary_key < MAX_KEY)
> {
> /*
> We don't need to allocate any memory for m_after_image and
> @@ -7550,87 +7642,38 @@ int Delete_rows_log_event::do_before_row
> return 0;
> }
>
> - int error= 0;
> -
> - if (table->s->keys > 0)
> + if (m_table->s->keys > 0)
> {
> - m_memory= (uchar*) my_multi_malloc(MYF(MY_WME),
> - &m_after_image,
> - (uint) table->s->reclength,
> - &m_key,
> - (uint) table->key_info->key_length,
> - NullS);
> + // Allocate buffer for key searches
> + m_key= (uchar*)my_malloc(m_table->key_info->key_length, MYF(MY_WME));
> + if (!m_key)
> + return HA_ERR_OUT_OF_MEM;
> }
> - else
> - {
> - m_after_image= (uchar*) my_malloc(table->s->reclength, MYF(MY_WME));
> - m_memory= (uchar*)m_after_image;
> - m_key= NULL;
> - }
> - if (!m_memory)
> - return HA_ERR_OUT_OF_MEM;
> -
> - return error;
> + return 0;
> }
>
> -int Delete_rows_log_event::do_after_row_operations(TABLE *table, int error)
> +int Delete_rows_log_event::do_after_row_operations(THD *thd, int error)
> {
> /*error= ToDo:find out what this should really be, this triggers close_scan in
> nbd, returning error?*/
> - table->file->ha_index_or_rnd_end();
> - my_free(m_memory, MYF(MY_ALLOW_ZERO_PTR)); // Free for multi_malloc
> - m_memory= NULL;
> - m_after_image= NULL;
> + m_table->file->ha_index_or_rnd_end();
> + my_free(m_key, MYF(MY_ALLOW_ZERO_PTR));
> m_key= NULL;
>
> return error;
> }
>
> -int Delete_rows_log_event::do_prepare_row(THD *thd, RELAY_LOG_INFO const *rli,
> - TABLE *table,
> - uchar const *const row_start,
> - uchar const **const row_end)
> -{
> - DBUG_ASSERT(row_start && row_end);
> - /*
> - This assertion actually checks that there is at least as many
> - columns on the slave as on the master.
> - */
> - DBUG_ASSERT(table->s->fields >= m_width);
> -
> - if (int error= unpack_row(rli, table, m_width, row_start, &m_cols, row_end,
> - &m_master_reclength, table->read_set,
> DELETE_ROWS_EVENT))
> - {
> - thd->net.last_errno= error;
> - return error;
> - }
> -
> - /*
> - If we will access rows using the random access method, m_key will
> - be set to NULL, so we do not need to make a key copy in that case.
> - */
> - if (m_key)
> - {
> - KEY *const key_info= table->key_info;
> -
> - key_copy(m_key, table->record[0], key_info, 0);
> - }
> -
> - return 0;
> -}
> -
> -int Delete_rows_log_event::do_exec_row(TABLE *table)
> +int Delete_rows_log_event::do_exec_row(THD *thd,
> + const Slave_reporting_capability *const)
> {
> int error;
> - DBUG_ASSERT(table != NULL);
> + DBUG_ASSERT(m_table != NULL);
>
> - if (!(error= find_and_fetch_row(table, m_key)))
> + if (!(error= find_and_fetch_row()))
> {
> /*
> - Now we should have the right row to delete. We are using
> - record[0] since it is guaranteed to point to a record with the
> - correct value.
> + Delete the record found, located in record[0]
> */
> - error= table->file->ha_delete_row(table->record[0]);
> + error= m_table->file->ha_delete_row(m_table->record[0]);
> }
> return error;
> }
> @@ -7660,10 +7703,6 @@ Update_rows_log_event::Update_rows_log_e
> MY_BITMAP const *cols_ai,
> bool is_transactional)
> : Rows_log_event(thd_arg, tbl_arg, tid, cols_bi, is_transactional)
> -#ifdef HAVE_REPLICATION
> - , m_memory(NULL), m_key(NULL)
> -
> -#endif
> {
> init(cols_ai);
> }
> @@ -7673,16 +7712,13 @@ Update_rows_log_event::Update_rows_log_e
> MY_BITMAP const *cols,
> bool is_transactional)
> : Rows_log_event(thd_arg, tbl_arg, tid, cols, is_transactional)
> -#ifdef HAVE_REPLICATION
> - , m_memory(NULL), m_key(NULL)
> -#endif
> {
> init(cols);
> }
>
> void Update_rows_log_event::init(MY_BITMAP const *cols)
> {
> - /* if bitmap_init fails, catched in is_valid() */
> + /* if bitmap_init fails, caught in is_valid() */
> if (likely(!bitmap_init(&m_cols_ai,
> m_width <= sizeof(m_bitbuf_ai)*8 ? m_bitbuf_ai : NULL,
> (m_width + 7) & ~7UL,
> @@ -7712,152 +7748,78 @@ Update_rows_log_event::Update_rows_log_e
> const
> Format_description_log_event
> *description_event)
> -#if defined(MYSQL_CLIENT)
> : Rows_log_event(buf, event_len, UPDATE_ROWS_EVENT, description_event)
> -#else
> - : Rows_log_event(buf, event_len, UPDATE_ROWS_EVENT, description_event),
> - m_memory(NULL), m_key(NULL)
> -#endif
> {
> }
> #endif
>
> #if !defined(MYSQL_CLIENT) && defined(HAVE_REPLICATION)
> -int Update_rows_log_event::do_before_row_operations(TABLE *table)
> +int Update_rows_log_event::do_before_row_operations(THD *thd)
> {
> - DBUG_ASSERT(m_memory == NULL);
> -
> - int error= 0;
> -
> - if (table->s->keys > 0)
> + if (m_table->s->keys > 0)
> {
> - m_memory= (uchar*) my_multi_malloc(MYF(MY_WME),
> - &m_after_image,
> - (uint) table->s->reclength,
> - &m_key,
> - (uint) table->key_info->key_length,
> - NullS);
> + // Allocate buffer for key searches
> + m_key= (uchar*)my_malloc(m_table->key_info->key_length, MYF(MY_WME));
> + if (!m_key)
> + return HA_ERR_OUT_OF_MEM;
> }
> - else
> - {
> - m_after_image= (uchar*) my_malloc(table->s->reclength, MYF(MY_WME));
> - m_memory= m_after_image;
> - m_key= NULL;
> - }
> - if (!m_memory)
> - return HA_ERR_OUT_OF_MEM;
>
> - table->timestamp_field_type= TIMESTAMP_NO_AUTO_SET;
> + m_table->timestamp_field_type= TIMESTAMP_NO_AUTO_SET;
>
> - return error;
> + return 0;
> }
>
> -int Update_rows_log_event::do_after_row_operations(TABLE *table, int error)
> +int Update_rows_log_event::do_after_row_operations(THD *thd, int error)
> {
> /*error= ToDo:find out what this should really be, this triggers close_scan in
> nbd, returning error?*/
> - table->file->ha_index_or_rnd_end();
> - my_free(m_memory, MYF(MY_ALLOW_ZERO_PTR));
> - m_memory= NULL;
> - m_after_image= NULL;
> + m_table->file->ha_index_or_rnd_end();
> + my_free(m_key, MYF(MY_ALLOW_ZERO_PTR)); // Free for multi_malloc
> m_key= NULL;
>
> return error;
> }
>
> -int Update_rows_log_event::do_prepare_row(THD *thd, RELAY_LOG_INFO const *rli,
> - TABLE *table,
> - uchar const *const row_start,
> - uchar const **const row_end)
> +int Update_rows_log_event::do_exec_row(THD *thd,
> + const Slave_reporting_capability *const)
> {
> - int error;
> - DBUG_ASSERT(row_start && row_end);
> - /*
> - This assertion actually checks that there is at least as many
> - columns on the slave as on the master.
> - */
> - DBUG_ASSERT(table->s->fields >= m_width);
> -
> - /*
> - We need to perform some juggling below since unpack_row() always
> - unpacks into table->record[0]. For more information, see the
> - comments for unpack_row().
> - */
> -
> - /* record[0] is the before image for the update */
> - if ((error= unpack_row(rli, table, m_width, row_start, &m_cols, row_end,
> - &m_master_reclength, table->read_set,
> UPDATE_ROWS_EVENT)))
> - {
> - thd->net.last_errno= error;
> - return error;
> - }
> + DBUG_ASSERT(m_table != NULL);
> + DBUG_ASSERT(m_table->s->fields >= m_width);
>
> - store_record(table, record[1]);
> - uchar const *next_start = *row_end;
> - /* m_after_image is the after image for the update */
> - if ((error= unpack_row(rli, table, m_width, next_start, &m_cols_ai, row_end,
> - &m_master_reclength, table->write_set,
> UPDATE_ROWS_EVENT)))
> - {
> - thd->net.last_errno= error;
> + int error= find_and_fetch_row();
> + if (error)
> return error;
> - }
> -
> - bmove_align(m_after_image, table->record[0], table->s->reclength);
> - restore_record(table, record[1]);
> -
> - /*
> - Don't print debug messages when running valgrind since they can
> - trigger false warnings.
> - */
> -#ifndef HAVE_purify
> - DBUG_DUMP("record[0]", table->record[0], table->s->reclength);
> - DBUG_DUMP("m_after_image", m_after_image, table->s->reclength);
> -#endif
>
> /*
> - If we will access rows using the random access method, m_key will
> - be set to NULL, so we do not need to make a key copy in that case.
> - */
> - if (m_key)
> - {
> - KEY *const key_info= table->key_info;
> + This is the situation after locating BI:
>
> - key_copy(m_key, table->record[0], key_info, 0);
> - }
> + ===|=== before image ====|=== after image ===|===
> + ^ ^
> + m_curr_row m_curr_row_end
>
> - return error;
> -}
> + BI found in the table is stored in record[0]. We copy it to record[1]
> + and unpack AI to record[0].
> + */
>
> -int Update_rows_log_event::do_exec_row(TABLE *table)
> -{
> - DBUG_ASSERT(table != NULL);
> + store_record(m_table,record[1]);
>
> - int error= find_and_fetch_row(table, m_key);
> - if (error)
> - return error;
> -
> - /*
> - We have to ensure that the new record (i.e., the after image) is
> - in record[0] and the old record (i.e., the before image) is in
> - record[1]. This since some storage engines require this (for
> - example, the partition engine).
> -
> - Since find_and_fetch_row() puts the fetched record (i.e., the old
> - record) in record[1], we can keep it there. We put the new record
> - (i.e., the after image) into record[0], and copy the fields that
> - are on the slave (i.e., in record[1]) into record[0], effectively
> - overwriting the default values that where put there by the
> - unpack_row() function.
> - */
> - bmove_align(table->record[0], m_after_image, table->s->reclength);
> - copy_extra_record_fields(table, m_master_reclength, m_width);
> + m_curr_row= m_curr_row_end;
> + error= unpack_current_row(); // this also updates m_curr_row_end
>
> /*
> Now we have the right row to update. The old row (the one we're
> - looking for) is in record[1] and the new row has is in record[0].
> - We also have copied the original values already in the slave's
> - database into the after image delivered from the master.
> + looking for) is in record[1] and the new row is in record[0].
> */
> - error= table->file->ha_update_row(table->record[1],
> table->record[0]);
> +#ifndef HAVE_purify
> + /*
> + Don't print debug messages when running valgrind since they can
> + trigger false warnings.
> + */
> + DBUG_PRINT("info",("Updating row in table"));
> + DBUG_DUMP("old record", m_table->record[1], m_table->s->reclength);
> + DBUG_DUMP("new values", m_table->record[0], m_table->s->reclength);
> +#endif
> +
> + error= m_table->file->ha_update_row(m_table->record[1],
> m_table->record[0]);
> if (error == HA_ERR_RECORD_IS_THE_SAME)
> error= 0;
>
> diff -Nrup a/sql/log_event.h b/sql/log_event.h
> --- a/sql/log_event.h 2007-06-05 01:15:00 +02:00
> +++ b/sql/log_event.h 2007-07-06 16:58:06 +02:00
> @@ -23,6 +23,10 @@
>
> #include <my_bitmap.h>
> #include "rpl_constants.h"
> +#ifndef MYSQL_CLIENT
> +#include "rpl_record.h"
> +#include "rpl_reporting.h"
> +#endif
>
> #define LOG_READ_EOF -1
> #define LOG_READ_BOGUS -2
> @@ -2135,7 +2139,13 @@ public:
> NO_FOREIGN_KEY_CHECKS_F = (1U << 1),
>
> /* Value of the OPTION_RELAXED_UNIQUE_CHECKS flag in thd->options */
> - RELAXED_UNIQUE_CHECKS_F = (1U << 2)
> + RELAXED_UNIQUE_CHECKS_F = (1U << 2),
> +
> + /*
> + Indicates that rows in this event are complete, that is contain
> + values for all columns of the table.
> + */
> + COMPLETE_ROWS_F = (1U << 3)
> };
>
> typedef uint16 flag_set;
> @@ -2239,7 +2249,25 @@ protected:
> uchar *m_rows_cur; /* One-after the end of the data */
> uchar *m_rows_end; /* One-after the end of the allocated space */
>
> + const uchar *m_curr_row; /* Start of the row being processed */
> + const uchar *m_curr_row_end; /* One-after the end of the current row */
> +
> flag_set m_flags; /* Flags for row-level events */
> + uchar *m_key; /* Buffer to keep key value during searches */
> +
> + /* helper functions */
> +
> +#if !defined(MYSQL_CLIENT) && defined(HAVE_REPLICATION)
> + int find_and_fetch_row();
> + int write_row(THD *const, const Slave_reporting_capability *const, const bool);
> +
> + // Unpack the current row into m_table->record[0]
Suggest for the function to have a self-check
> + int unpack_current_row()
> + {
int ret=
> + ::unpack_row(m_table, m_width, m_curr_row, &m_cols,
> + &m_curr_row_end, &m_master_reclength);
DBUG_ASSERT(m_curr_row_end >= m_curr_row);
return ret;
> + }
> +#endif
>
> private:
>
> @@ -2264,7 +2292,7 @@ private:
> The member function will return 0 if all went OK, or a non-zero
> error code otherwise.
> */
> - virtual int do_before_row_operations(TABLE *table) = 0;
> + virtual int do_before_row_operations(THD*) = 0;
>
> /*
> Primitive to clean up after a sequence of row executions.
> @@ -2274,45 +2302,31 @@ private:
> After doing a sequence of do_prepare_row() and do_exec_row(),
> this member function should be called to clean up and release
> any allocated buffers.
> +
> + The error argument, if non-zero, indicates an error which happened during
> + row processing before this function was called. In this case, even if
> + function is successful, it should return the error code given in the
> argument.
> */
> - virtual int do_after_row_operations(TABLE *table, int error) = 0;
> -
> - /*
> - Primitive to prepare for handling one row in a row-level event.
> -
> - DESCRIPTION
> -
> - The member function prepares for execution of operations needed for one
> - row in a row-level event by reading up data from the buffer containing
> - the row. No specific interpretation of the data is normally done here,
> - since SQL thread specific data is not available: that data is made
> - available for the do_exec function.
> -
> - A pointer to the start of the next row, or NULL if the preparation
> - failed. Currently, preparation cannot fail, but don't rely on this
> - behavior.
> -
> - RETURN VALUE
> - Error code, if something went wrong, 0 otherwise.
> - */
> - virtual int do_prepare_row(THD*, RELAY_LOG_INFO const*, TABLE*,
> - uchar const *row_start,
> - uchar const **row_end) = 0;
> + virtual int do_after_row_operations(THD*,int error) = 0;
>
> /*
> Primitive to do the actual execution necessary for a row.
>
> DESCRIPTION
> The member function will do the actual execution needed to handle a row.
> + The row is located at m_curr_row. When the function returns,
> + m_curr_row_end should point at the next row (one byte after the end
> + of the current row).
>
> RETURN VALUE
> 0 if execution succeeded, 1 if execution failed.
>
> */
> - virtual int do_exec_row(TABLE *table) = 0;
> + virtual int do_exec_row(THD*, const Slave_reporting_capability *const) = 0;
> #endif /* !defined(MYSQL_CLIENT) && defined(HAVE_REPLICATION) */
> -};
>
> + friend class Old_rows_log_event;
> +};
>
> /*****************************************************************************
>
> @@ -2362,14 +2376,9 @@ private:
> #endif
>
> #if !defined(MYSQL_CLIENT) && defined(HAVE_REPLICATION)
> - uchar *m_memory;
> - uchar *m_after_image;
> -
> - virtual int do_before_row_operations(TABLE *table);
> - virtual int do_after_row_operations(TABLE *table, int error);
> - virtual int do_prepare_row(THD*, RELAY_LOG_INFO const*, TABLE*,
> - uchar const *row_start, uchar const **row_end);
> - virtual int do_exec_row(TABLE *table);
> + virtual int do_before_row_operations(THD*);
> + virtual int do_after_row_operations(THD*,int);
> + virtual int do_exec_row(THD*,const Slave_reporting_capability *const);
> #endif
> };
>
The following hunk positioned - before your patch - do_ methods under protected label
which
is differently from the similar code above.
Whould it be the right moment to syncronize them?
> @@ -2441,15 +2450,9 @@ protected:
> #endif
>
> #if !defined(MYSQL_CLIENT) && defined(HAVE_REPLICATION)
> - uchar *m_memory;
> - uchar *m_key;
> - uchar *m_after_image;
> -
> - virtual int do_before_row_operations(TABLE *table);
> - virtual int do_after_row_operations(TABLE *table, int error);
> - virtual int do_prepare_row(THD*, RELAY_LOG_INFO const*, TABLE*,
> - uchar const *row_start, uchar const **row_end);
> - virtual int do_exec_row(TABLE *table);
> + virtual int do_before_row_operations(THD*);
> + virtual int do_after_row_operations(THD*,int);
> + virtual int do_exec_row(THD*,const Slave_reporting_capability *const);
> #endif /* !defined(MYSQL_CLIENT) && defined(HAVE_REPLICATION) */
> };
>
> @@ -2512,15 +2515,9 @@ protected:
> #endif
>
> #if !defined(MYSQL_CLIENT) && defined(HAVE_REPLICATION)
> - uchar *m_memory;
> - uchar *m_key;
> - uchar *m_after_image;
> -
> - virtual int do_before_row_operations(TABLE *table);
> - virtual int do_after_row_operations(TABLE *table, int error);
> - virtual int do_prepare_row(THD*, RELAY_LOG_INFO const*, TABLE*,
> - uchar const *row_start, uchar const **row_end);
> - virtual int do_exec_row(TABLE *table);
> + virtual int do_before_row_operations(THD*);
> + virtual int do_after_row_operations(THD*,int);
> + virtual int do_exec_row(THD*,const Slave_reporting_capability *const);
> #endif
> };
>
> diff -Nrup a/sql/log_event_old.cc b/sql/log_event_old.cc
> --- a/sql/log_event_old.cc 2007-05-10 11:59:28 +02:00
> +++ b/sql/log_event_old.cc 2007-07-06 16:58:06 +02:00
> @@ -1,9 +1,961 @@
>
> #include "mysql_priv.h"
> +#ifndef MYSQL_CLIENT
> +#include "rpl_rli.h"
> +#include "rpl_utility.h"
> +#endif
> #include "log_event_old.h"
> #include "rpl_record_old.h"
>
> +#define DBUG_PRINT_BITSET(N,FRM,BS) \
> + do { \
> + char buf[256]; \
> + for (uint i = 0 ; i < (BS)->n_bits ; ++i) \
> + buf[i] = bitmap_is_set((BS), i) ? '1' : '0'; \
> + buf[(BS)->n_bits] = '\0'; \
> + DBUG_PRINT((N), ((FRM), buf)); \
> + } while (0)
> +
> +#if !defined(MYSQL_CLIENT) && defined(HAVE_REPLICATION)
> +// Old implementation of do_apply_event()
> +int
> +Old_rows_log_event::do_apply_event(Rows_log_event *ev, const RELAY_LOG_INFO *rli)
> +{
> + DBUG_ENTER("Rows_log_event::do_apply_event(st_relay_log_info*)");
> + DBUG_ASSERT(ev);
> +
> + THD *thd= ev->thd;
> + int error= 0;
> + uchar const *row_start= ev->m_rows_buf;
> +
> + /*
> + If m_table_id == ~0UL, then we have a dummy event that does not
> + contain any data. In that case, we just remove all tables in the
> + tables_to_lock list, close the thread tables, and return with
> + success.
> + */
> + if (ev->m_table_id == ~0UL)
> + {
> + /*
> + This one is supposed to be set: just an extra check so that
> + nothing strange has happened.
> + */
> + DBUG_ASSERT(ev->get_flags(Rows_log_event::STMT_END_F));
> +
> + const_cast<RELAY_LOG_INFO*>(rli)->clear_tables_to_lock();
> + close_thread_tables(thd);
> + thd->clear_error();
> + DBUG_RETURN(0);
> + }
> +
> + /*
> + 'thd' has been set by exec_relay_log_event(), just before calling
> + do_apply_event(). We still check here to prevent future coding
> + errors.
> + */
> + DBUG_ASSERT(rli->sql_thd == thd);
> +
> + /*
> + If there is no locks taken, this is the first binrow event seen
> + after the table map events. We should then lock all the tables
> + used in the transaction and proceed with execution of the actual
> + event.
> + */
> + if (!thd->lock)
> + {
> + bool need_reopen= 1; /* To execute the first lap of the loop below */
> +
> + /*
> + lock_tables() reads the contents of thd->lex, so they must be
> + initialized. Contrary to in
> + Table_map_log_event::do_apply_event() we don't call
> + mysql_init_query() as that may reset the binlog format.
> + */
> + lex_start(thd);
> +
> + while ((error= lock_tables(thd, rli->tables_to_lock,
> + rli->tables_to_lock_count, &need_reopen)))
> + {
> + if (!need_reopen)
> + {
> + if (thd->query_error || thd->is_fatal_error)
> + {
> + /*
> + Error reporting borrowed from Query_log_event with many excessive
> + simplifications (we don't honour --slave-skip-errors)
> + */
> + uint actual_error= thd->net.last_errno;
> + rli->report(ERROR_LEVEL, actual_error,
> + "Error '%s' in %s event: when locking tables",
> + (actual_error ? thd->net.last_error :
> + "unexpected success or fatal error"),
> + ev->get_type_str());
> + thd->is_fatal_error= 1;
> + }
> + else
> + {
> + rli->report(ERROR_LEVEL, error,
> + "Error in %s event: when locking tables",
> + ev->get_type_str());
> + }
> + const_cast<RELAY_LOG_INFO*>(rli)->clear_tables_to_lock();
> + DBUG_RETURN(error);
> + }
> +
> + /*
> + So we need to reopen the tables.
> +
> + We need to flush the pending RBR event, since it keeps a
> + pointer to an open table.
> +
> + ALTERNATIVE SOLUTION (not implemented): Extract a pointer to
> + the pending RBR event and reset the table pointer after the
> + tables has been reopened.
> +
> + NOTE: For this new scheme there should be no pending event:
> + need to add code to assert that is the case.
> + */
> + thd->binlog_flush_pending_rows_event(false);
> + TABLE_LIST *tables= rli->tables_to_lock;
> + close_tables_for_reopen(thd, &tables);
> +
> + uint tables_count= rli->tables_to_lock_count;
> + if ((error= open_tables(thd, &tables, &tables_count, 0)))
> + {
> + if (thd->query_error || thd->is_fatal_error)
> + {
> + /*
> + Error reporting borrowed from Query_log_event with many excessive
> + simplifications (we don't honour --slave-skip-errors)
> + */
> + uint actual_error= thd->net.last_errno;
> + rli->report(ERROR_LEVEL, actual_error,
> + "Error '%s' on reopening tables",
> + (actual_error ? thd->net.last_error :
> + "unexpected success or fatal error"));
> + thd->query_error= 1;
> + }
> + const_cast<RELAY_LOG_INFO*>(rli)->clear_tables_to_lock();
> + DBUG_RETURN(error);
> + }
> + }
> +
> + /*
> + When the open and locking succeeded, we check all tables to
> + ensure that they still have the correct type.
> +
> + We can use a down cast here since we know that every table added
> + to the tables_to_lock is a RPL_TABLE_LIST.
> + */
> +
> + {
> + RPL_TABLE_LIST *ptr= rli->tables_to_lock;
> + for ( ; ptr ; ptr= static_cast<RPL_TABLE_LIST*>(ptr->next_global))
> + {
> + if (ptr->m_tabledef.compatible_with(rli, ptr->table))
> + {
> + mysql_unlock_tables(thd, thd->lock);
> + thd->lock= 0;
> + thd->query_error= 1;
> + const_cast<RELAY_LOG_INFO*>(rli)->clear_tables_to_lock();
> + DBUG_RETURN(Rows_log_event::ERR_BAD_TABLE_DEF);
> + }
> + }
> + }
> +
> + /*
> + ... and then we add all the tables to the table map and remove
> + them from tables to lock.
> +
> + We also invalidate the query cache for all the tables, since
> + they will now be changed.
> +
> + TODO [/Matz]: Maybe the query cache should not be invalidated
> + here? It might be that a table is not changed, even though it
> + was locked for the statement. We do know that each
> + Rows_log_event contain at least one row, so after processing one
> + Rows_log_event, we can invalidate the query cache for the
> + associated table.
> + */
> + for (TABLE_LIST *ptr= rli->tables_to_lock ; ptr ; ptr= ptr->next_global)
> + {
> +
> const_cast<RELAY_LOG_INFO*>(rli)->m_table_map.set_table(ptr->table_id,
> ptr->table);
> + }
> +#ifdef HAVE_QUERY_CACHE
> + query_cache.invalidate_locked_for_write(rli->tables_to_lock);
> +#endif
> + const_cast<RELAY_LOG_INFO*>(rli)->clear_tables_to_lock();
> + }
> +
> + DBUG_ASSERT(rli->tables_to_lock == NULL && rli->tables_to_lock_count
> == 0);
> +
> + TABLE* table=
> +
> const_cast<RELAY_LOG_INFO*>(rli)->m_table_map.get_table(ev->m_table_id);
> +
> + if (table)
> + {
> + /*
> + table == NULL means that this table should not be replicated
> + (this was set up by Table_map_log_event::do_apply_event()
> + which tested replicate-* rules).
> + */
> +
> + /*
> + It's not needed to set_time() but
> + 1) it continues the property that "Time" in SHOW PROCESSLIST shows how
> + much slave is behind
> + 2) it will be needed when we allow replication from a table with no
> + TIMESTAMP column to a table with one.
> + So we call set_time(), like in SBR. Presently it changes nothing.
> + */
> + thd->set_time((time_t)ev->when);
> + /*
> + There are a few flags that are replicated with each row event.
> + Make sure to set/clear them before executing the main body of
> + the event.
> + */
> + if (ev->get_flags(Rows_log_event::NO_FOREIGN_KEY_CHECKS_F))
> + thd->options|= OPTION_NO_FOREIGN_KEY_CHECKS;
> + else
> + thd->options&= ~OPTION_NO_FOREIGN_KEY_CHECKS;
> +
> + if (ev->get_flags(Rows_log_event::RELAXED_UNIQUE_CHECKS_F))
> + thd->options|= OPTION_RELAXED_UNIQUE_CHECKS;
> + else
> + thd->options&= ~OPTION_RELAXED_UNIQUE_CHECKS;
> + /* A small test to verify that objects have consistent types */
> + DBUG_ASSERT(sizeof(thd->options) == sizeof(OPTION_RELAXED_UNIQUE_CHECKS));
> +
> + /*
> + Now we are in a statement and will stay in a statement until we
> + see a STMT_END_F.
> +
> + We set this flag here, before actually applying any rows, in
> + case the SQL thread is stopped and we need to detect that we're
> + inside a statement and halting abruptly might cause problems
> + when restarting.
> + */
> + const_cast<RELAY_LOG_INFO*>(rli)->set_flag(RELAY_LOG_INFO::IN_STMT);
> +
> + error= do_before_row_operations(table);
> + while (error == 0 && row_start < ev->m_rows_end)
> + {
> + uchar const *row_end= NULL;
> + if ((error= do_prepare_row(thd, rli, table, row_start, &row_end)))
> + break; // We should perform the after-row operation even in
> + // the case of error
> +
> + DBUG_ASSERT(row_end != NULL); // cannot happen
> + DBUG_ASSERT(row_end <= ev->m_rows_end);
> +
> + /* in_use can have been set to NULL in close_tables_for_reopen */
> + THD* old_thd= table->in_use;
> + if (!table->in_use)
> + table->in_use= thd;
> + error= do_exec_row(table);
> + table->in_use = old_thd;
> + switch (error)
> + {
> + /* Some recoverable errors */
> + case HA_ERR_RECORD_CHANGED:
> + case HA_ERR_KEY_NOT_FOUND: /* Idempotency support: OK if
> + tuple does not exist */
> + error= 0;
> + case 0:
> + break;
> +
> + default:
> + rli->report(ERROR_LEVEL, thd->net.last_errno,
> + "Error in %s event: row application failed",
> + ev->get_type_str());
> + thd->query_error= 1;
> + break;
> + }
> +
> + row_start= row_end;
> + }
> + DBUG_EXECUTE_IF("STOP_SLAVE_after_first_Rows_event",
> + const_cast<RELAY_LOG_INFO*>(rli)->abort_slave= 1;);
> + error= do_after_row_operations(table, error);
> + if (!ev->cache_stmt)
> + {
> + DBUG_PRINT("info", ("Marked that we need to keep log"));
> + thd->options|= OPTION_KEEP_LOG;
> + }
> + }
> +
> + if (error)
> + { /* error has occured during the transaction */
> + rli->report(ERROR_LEVEL, thd->net.last_errno,
> + "Error in %s event: error during transaction execution "
> + "on table %s.%s",
> + ev->get_type_str(), table->s->db.str,
> + table->s->table_name.str);
> +
> + /*
> + If one day we honour --skip-slave-errors in row-based replication, and
> + the error should be skipped, then we would clear mappings, rollback,
> + close tables, but the slave SQL thread would not stop and then may
> + assume the mapping is still available, the tables are still open...
> + So then we should clear mappings/rollback/close here only if this is a
> + STMT_END_F.
> + For now we code, knowing that error is not skippable and so slave SQL
> + thread is certainly going to stop.
> + rollback at the caller along with sbr.
> + */
> + thd->reset_current_stmt_binlog_row_based();
> + const_cast<RELAY_LOG_INFO*>(rli)->cleanup_context(thd, error);
> + thd->query_error= 1;
> + DBUG_RETURN(error);
> + }
> +
> + /*
> + This code would ideally be placed in do_update_pos() instead, but
> + since we have no access to table there, we do the setting of
> + last_event_start_time here instead.
> + */
> + if (table && (table->s->primary_key == MAX_KEY) &&
> + !ev->cache_stmt &&
> + ev->get_flags(Rows_log_event::STMT_END_F) == Rows_log_event::RLE_NO_FLAGS)
> + {
> + /*
> + ------------ Temporary fix until WL#2975 is implemented ---------
> +
> + This event is not the last one (no STMT_END_F). If we stop now
> + (in case of terminate_slave_thread()), how will we restart? We
> + have to restart from Table_map_log_event, but as this table is
> + not transactional, the rows already inserted will still be
> + present, and idempotency is not guaranteed (no PK) so we risk
> + that repeating leads to double insert. So we desperately try to
> + continue, hope we'll eventually leave this buggy situation (by
> + executing the final Rows_log_event). If we are in a hopeless
> + wait (reached end of last relay log and nothing gets appended
> + there), we timeout after one minute, and notify DBA about the
> + problem. When WL#2975 is implemented, just remove the member
> + st_relay_log_info::last_event_start_time and all its occurences.
> + */
> + const_cast<RELAY_LOG_INFO*>(rli)->last_event_start_time= time(0);
> + }
> +
> + DBUG_RETURN(0);
> +}
> +#endif
> +
> #if !defined(MYSQL_CLIENT) && defined(HAVE_REPLICATION)
> +
> +/*
> + Check if there are more UNIQUE keys after the given key.
> +*/
> +static int
> +last_uniq_key(TABLE *table, uint keyno)
> +{
> + while (++keyno < table->s->keys)
> + if (table->key_info[keyno].flags & HA_NOSAME)
> + return 0;
> + return 1;
> +}
> +
> +/*
> + Compares table->record[0] and table->record[1]
> +
> + Returns TRUE if different.
> +*/
> +static bool record_compare(TABLE *table)
> +{
> + /*
> + Need to set the X bit and the filler bits in both records since
> + there are engines that do not set it correctly.
> +
> + In addition, since MyISAM checks that one hasn't tampered with the
> + record, it is necessary to restore the old bytes into the record
> + after doing the comparison.
> +
> + TODO[record format ndb]: Remove it once NDB returns correct
> + records. Check that the other engines also return correct records.
> + */
> +
> + bool result= FALSE;
> + uchar saved_x[2], saved_filler[2];
> +
> + if (table->s->null_bytes > 0)
> + {
> + for (int i = 0 ; i < 2 ; ++i)
> + {
> + saved_x[i]= table->record[i][0];
> + saved_filler[i]= table->record[i][table->s->null_bytes - 1];
> + table->record[i][0]|= 1U;
> + table->record[i][table->s->null_bytes - 1]|=
> + 256U - (1U << table->s->last_null_bit_pos);
> + }
> + }
> +
> + if (table->s->blob_fields + table->s->varchar_fields == 0)
> + {
> + result= cmp_record(table,record[1]);
> + goto record_compare_exit;
> + }
> +
> + /* Compare null bits */
> + if (memcmp(table->null_flags,
> + table->null_flags+table->s->rec_buff_length,
> + table->s->null_bytes))
> + {
> + result= TRUE; // Diff in NULL value
> + goto record_compare_exit;
> + }
> +
> + /* Compare updated fields */
> + for (Field **ptr=table->field ; *ptr ; ptr++)
> + {
> + if ((*ptr)->cmp_binary_offset(table->s->rec_buff_length))
> + {
> + result= TRUE;
> + goto record_compare_exit;
> + }
> + }
> +
> +record_compare_exit:
> + /*
> + Restore the saved bytes.
> +
> + TODO[record format ndb]: Remove this code once NDB returns the
> + correct record format.
> + */
> + if (table->s->null_bytes > 0)
> + {
> + for (int i = 0 ; i < 2 ; ++i)
> + {
> + table->record[i][0]= saved_x[i];
> + table->record[i][table->s->null_bytes - 1]= saved_filler[i];
> + }
> + }
> +
> + return result;
> +}
> +
> +/*
> + Copy "extra" columns from record[1] to record[0].
> +
> + Copy the extra fields that are not present on the master but are
> + present on the slave from record[1] to record[0]. This is used
> + after fetching a record that are to be updated, either inside
> + replace_record() or as part of executing an update_row().
> + */
> +static int
> +copy_extra_record_fields(TABLE *table,
> + size_t master_reclength,
> + my_ptrdiff_t master_fields)
> +{
> + DBUG_PRINT("info", ("Copying to 0x%lx "
> + "from field %lu at offset %lu "
> + "to field %d at offset %lu",
> + (long) table->record[0],
> + (ulong) master_fields, (ulong) master_reclength,
> + table->s->fields, table->s->reclength));
> + /*
> + Copying the extra fields of the slave that does not exist on
> + master into record[0] (which are basically the default values).
> + */
> + DBUG_ASSERT(master_reclength <= table->s->reclength);
> + if (master_reclength < table->s->reclength)
> + bmove_align(table->record[0] + master_reclength,
> + table->record[1] + master_reclength,
> + table->s->reclength - master_reclength);
> +
> + /*
> + Bit columns are special. We iterate over all the remaining
> + columns and copy the "extra" bits to the new record. This is
> + not a very good solution: it should be refactored on
> + opportunity.
> +
> + REFACTORING SUGGESTION (Matz). Introduce a member function
> + similar to move_field_offset() called copy_field_offset() to
> + copy field values and implement it for all Field subclasses. Use
> + this function to copy data from the found record to the record
> + that are going to be inserted.
> +
> + The copy_field_offset() function need to be a virtual function,
> + which in this case will prevent copying an entire range of
> + fields efficiently.
> + */
> + {
> + Field **field_ptr= table->field + master_fields;
> + for ( ; *field_ptr ; ++field_ptr)
> + {
> + /*
> + Set the null bit according to the values in record[1]
> + */
> + if ((*field_ptr)->maybe_null() &&
> +
> (*field_ptr)->is_null_in_record(reinterpret_cast<uchar*>(table->record[1])))
> + (*field_ptr)->set_null();
> + else
> + (*field_ptr)->set_notnull();
> +
> + /*
> + Do the extra work for special columns.
> + */
> + switch ((*field_ptr)->real_type())
> + {
> + default:
> + /* Nothing to do */
> + break;
> +
> + case MYSQL_TYPE_BIT:
> + Field_bit *f= static_cast<Field_bit*>(*field_ptr);
> + if (f->bit_len > 0)
> + {
> + my_ptrdiff_t const offset= table->record[1] - table->record[0];
> + uchar const bits=
> + get_rec_bits(f->bit_ptr + offset, f->bit_ofs, f->bit_len);
> + set_rec_bits(bits, f->bit_ptr, f->bit_ofs, f->bit_len);
> + }
> + break;
> + }
> + }
> + }
> + return 0; // All OK
> +}
> +
> +/*
> + Replace the provided record in the database.
> +
> + SYNOPSIS
> + replace_record()
> + thd Thread context for writing the record.
> + table Table to which record should be written.
> + master_reclength
> + Offset to first column that is not present on the master,
> + alternatively the length of the record on the master
> + side.
> +
> + RETURN VALUE
> + Error code on failure, 0 on success.
> +
> + DESCRIPTION
> + Similar to how it is done in mysql_insert(), we first try to do
> + a ha_write_row() and of that fails due to duplicated keys (or
> + indices), we do an ha_update_row() or a ha_delete_row() instead.
> + */
> +static int
> +replace_record(THD *thd, TABLE *table,
> + ulong const master_reclength,
> + uint const master_fields)
> +{
> + DBUG_ENTER("replace_record");
> + DBUG_ASSERT(table != NULL && thd != NULL);
> +
> + int error;
> + int keynum;
> + auto_afree_ptr<char> key(NULL);
> +
> +#ifndef DBUG_OFF
> + DBUG_DUMP("record[0]", table->record[0], table->s->reclength);
> + DBUG_PRINT_BITSET("debug", "write_set = %s", table->write_set);
> + DBUG_PRINT_BITSET("debug", "read_set = %s", table->read_set);
> +#endif
> +
> + while ((error= table->file->ha_write_row(table->record[0])))
> + {
> + if (error == HA_ERR_LOCK_DEADLOCK || error == HA_ERR_LOCK_WAIT_TIMEOUT)
> + {
> + table->file->print_error(error, MYF(0)); /* to check at
> exec_relay_log_event */
> + DBUG_RETURN(error);
> + }
> + if ((keynum= table->file->get_dup_key(error)) < 0)
> + {
> + /* We failed to retrieve the duplicate key */
> + DBUG_RETURN(error);
> + }
> +
> + /*
> + We need to retrieve the old row into record[1] to be able to
> + either update or delete the offending record. We either:
> +
> + - use rnd_pos() with a row-id (available as dupp_row) to the
> + offending row, if that is possible (MyISAM and Blackhole), or else
> +
> + - use index_read_idx() with the key that is duplicated, to
> + retrieve the offending row.
> + */
> + if (table->file->ha_table_flags() & HA_DUPLICATE_POS)
> + {
> + error= table->file->rnd_pos(table->record[1],
> table->file->dup_ref);
> + if (error)
> + {
> + table->file->print_error(error, MYF(0));
> + DBUG_RETURN(error);
> + }
> + }
> + else
> + {
> + if (table->file->extra(HA_EXTRA_FLUSH_CACHE))
> + {
> + DBUG_RETURN(my_errno);
> + }
> +
> + if (key.get() == NULL)
> + {
> +
> key.assign(static_cast<char*>(my_alloca(table->s->max_unique_length)));
> + if (key.get() == NULL)
> + DBUG_RETURN(ENOMEM);
> + }
> +
> + key_copy((uchar*)key.get(), table->record[0], table->key_info + keynum,
> 0);
> + error= table->file->index_read_idx(table->record[1], keynum,
> + (const uchar*)key.get(),
> + HA_WHOLE_KEY,
> + HA_READ_KEY_EXACT);
> + if (error)
> + {
> + table->file->print_error(error, MYF(0));
> + DBUG_RETURN(error);
> + }
> + }
> +
> + /*
> + Now, table->record[1] should contain the offending row. That
> + will enable us to update it or, alternatively, delete it (so
> + that we can insert the new row afterwards).
> +
> + First we copy the columns into table->record[0] that are not
> + present on the master from table->record[1], if there are any.
> + */
> + copy_extra_record_fields(table, master_reclength, master_fields);
> +
> + /*
> + REPLACE is defined as either INSERT or DELETE + INSERT. If
> + possible, we can replace it with an UPDATE, but that will not
> + work on InnoDB if FOREIGN KEY checks are necessary.
> +
> + I (Matz) am not sure of the reason for the last_uniq_key()
> + check as, but I'm guessing that it's something along the
> + following lines.
> +
> + Suppose that we got the duplicate key to be a key that is not
> + the last unique key for the table and we perform an update:
> + then there might be another key for which the unique check will
> + fail, so we're better off just deleting the row and inserting
> + the correct row.
> + */
> + if (last_uniq_key(table, keynum) &&
> + !table->file->referenced_by_foreign_key())
> + {
> + error=table->file->ha_update_row(table->record[1],
> + table->record[0]);
> + if (error && error != HA_ERR_RECORD_IS_THE_SAME)
> + table->file->print_error(error, MYF(0));
> + else
> + error= 0;
> + DBUG_RETURN(error);
> + }
> + else
> + {
> + if ((error= table->file->ha_delete_row(table->record[1])))
> + {
> + table->file->print_error(error, MYF(0));
> + DBUG_RETURN(error);
> + }
> + /* Will retry ha_write_row() with the offending row removed. */
> + }
> + }
> +
> + DBUG_RETURN(error);
> +}
> +
> +/**
> + Find the row given by 'key', if the table has keys, or else use a table scan
> + to find (and fetch) the row.
> +
> + If the engine allows random access of the records, a combination of
> + position() and rnd_pos() will be used.
> +
> + @param table Pointer to table to search
> + @param key Pointer to key to use for search, if table has key
> +
> + @pre <code>table->record[0]</code> shall contain the row to locate
> + and <code>key</code> shall contain a key to use for searching, if
> + the engine has a key.
> +
> + @post If the return value is zero, <code>table->record[1]</code>
> + will contain the fetched row and the internal "cursor" will refer to
> + the row. If the return value is non-zero,
> + <code>table->record[1]</code> is undefined. In either case,
> + <code>table->record[0]</code> is undefined.
> +
> + @return Zero if the row was successfully fetched into
> + <code>table->record[1]</code>, error code otherwise.
> + */
> +
> +static int find_and_fetch_row(TABLE *table, uchar *key)
> +{
> + DBUG_ENTER("find_and_fetch_row(TABLE *table, uchar *key, uchar *record)");
> + DBUG_PRINT("enter", ("table: 0x%lx, key: 0x%lx record: 0x%lx",
> + (long) table, (long) key, (long) table->record[1]));
> +
> + DBUG_ASSERT(table->in_use != NULL);
> +
> + DBUG_DUMP("record[0]", table->record[0], table->s->reclength);
> +
> + if ((table->file->ha_table_flags() &
> HA_PRIMARY_KEY_REQUIRED_FOR_POSITION) &&
> + table->s->primary_key < MAX_KEY)
> + {
> + /*
> + Use a more efficient method to fetch the record given by
> + table->record[0] if the engine allows it. We first compute a
> + row reference using the position() member function (it will be
> + stored in table->file->ref) and the use rnd_pos() to position
> + the "cursor" (i.e., record[0] in this case) at the correct row.
> +
> + TODO: Add a check that the correct record has been fetched by
> + comparing with the original record. Take into account that the
> + record on the master and slave can be of different
> + length. Something along these lines should work:
> +
> + ADD>>> store_record(table,record[1]);
> + int error= table->file->rnd_pos(table->record[0],
> table->file->ref);
> + ADD>>> DBUG_ASSERT(memcmp(table->record[1], table->record[0],
> + table->s->reclength) == 0);
> +
> + */
> + table->file->position(table->record[0]);
> + int error= table->file->rnd_pos(table->record[0],
> table->file->ref);
> + /*
> + rnd_pos() returns the record in table->record[0], so we have to
> + move it to table->record[1].
> + */
> + bmove_align(table->record[1], table->record[0],
> table->s->reclength);
> + DBUG_RETURN(error);
> + }
> +
> + /* We need to retrieve all fields */
> + /* TODO: Move this out from this function to main loop */
> + table->use_all_columns();
> +
> + if (table->s->keys > 0)
> + {
> + int error;
> + /* We have a key: search the table using the index */
> + if (!table->file->inited && (error=
> table->file->ha_index_init(0, FALSE)))
> + DBUG_RETURN(error);
> +
> + /*
> + Don't print debug messages when running valgrind since they can
> + trigger false warnings.
> + */
> +#ifndef HAVE_purify
> + DBUG_DUMP("table->record[0]", table->record[0],
> table->s->reclength);
> + DBUG_DUMP("table->record[1]", table->record[1],
> table->s->reclength);
> +#endif
> +
> + /*
> + We need to set the null bytes to ensure that the filler bit are
> + all set when returning. There are storage engines that just set
> + the necessary bits on the bytes and don't set the filler bits
> + correctly.
> + */
> + my_ptrdiff_t const pos=
> + table->s->null_bytes > 0 ? table->s->null_bytes - 1 : 0;
> + table->record[1][pos]= 0xFF;
> + if ((error= table->file->index_read(table->record[1], key,
> HA_WHOLE_KEY,
> + HA_READ_KEY_EXACT)))
> + {
> + table->file->print_error(error, MYF(0));
> + table->file->ha_index_end();
> + DBUG_RETURN(error);
> + }
> +
> + /*
> + Don't print debug messages when running valgrind since they can
> + trigger false warnings.
> + */
> +#ifndef HAVE_purify
> + DBUG_DUMP("table->record[0]", table->record[0],
> table->s->reclength);
> + DBUG_DUMP("table->record[1]", table->record[1],
> table->s->reclength);
> +#endif
> + /*
> + Below is a minor "optimization". If the key (i.e., key number
> + 0) has the HA_NOSAME flag set, we know that we have found the
> + correct record (since there can be no duplicates); otherwise, we
> + have to compare the record with the one found to see if it is
> + the correct one.
> +
> + CAVEAT! This behaviour is essential for the replication of,
> + e.g., the mysql.proc table since the correct record *shall* be
> + found using the primary key *only*. There shall be no
> + comparison of non-PK columns to decide if the correct record is
> + found. I can see no scenario where it would be incorrect to
> + chose the row to change only using a PK or an UNNI.
> + */
> + if (table->key_info->flags & HA_NOSAME)
> + {
> + table->file->ha_index_end();
> + DBUG_RETURN(0);
> + }
> +
> + while (record_compare(table))
> + {
> + int error;
> +
> + /*
> + We need to set the null bytes to ensure that the filler bit
> + are all set when returning. There are storage engines that
> + just set the necessary bits on the bytes and don't set the
> + filler bits correctly.
> +
> + TODO[record format ndb]: Remove this code once NDB returns the
> + correct record format.
> + */
> + if (table->s->null_bytes > 0)
> + {
> + table->record[1][table->s->null_bytes - 1]|=
> + 256U - (1U << table->s->last_null_bit_pos);
> + }
> +
> + if ((error= table->file->index_next(table->record[1])))
> + {
> + table->file->print_error(error, MYF(0));
> + table->file->ha_index_end();
> + DBUG_RETURN(error);
> + }
> + }
> +
> + /*
> + Have to restart the scan to be able to fetch the next row.
> + */
> + table->file->ha_index_end();
> + }
> + else
> + {
> + int restart_count= 0; // Number of times scanning has restarted from top
> + int error;
> +
> + /* We don't have a key: search the table using rnd_next() */
> + if ((error= table->file->ha_rnd_init(1)))
> + return error;
> +
> + /* Continue until we find the right record or have made a full loop */
> + do
> + {
> + error= table->file->rnd_next(table->record[1]);
> +
> + DBUG_DUMP("record[0]", table->record[0], table->s->reclength);
> + DBUG_DUMP("record[1]", table->record[1], table->s->reclength);
> +
> + switch (error) {
> + case 0:
> + case HA_ERR_RECORD_DELETED:
> + break;
> +
> + case HA_ERR_END_OF_FILE:
> + if (++restart_count < 2)
> + table->file->ha_rnd_init(1);
> + break;
> +
> + default:
> + table->file->print_error(error, MYF(0));
> + DBUG_PRINT("info", ("Record not found"));
> + table->file->ha_rnd_end();
> + DBUG_RETURN(error);
> + }
> + }
> + while (restart_count < 2 && record_compare(table));
> +
> + /*
> + Have to restart the scan to be able to fetch the next row.
> + */
> + DBUG_PRINT("info", ("Record %sfound", restart_count == 2 ? "not " : ""));
> + table->file->ha_rnd_end();
> +
> + DBUG_ASSERT(error == HA_ERR_END_OF_FILE || error == 0);
> + DBUG_RETURN(error);
> + }
> +
> + DBUG_RETURN(0);
> +}
> +
> +/**********************************************************
> + Row handling primitives for Write_rows_log_event_old
> + **********************************************************/
> +
> +int Write_rows_log_event_old::do_before_row_operations(TABLE *table)
> +{
> + int error= 0;
> +
> + /*
> + We are using REPLACE semantics and not INSERT IGNORE semantics
> + when writing rows, that is: new rows replace old rows. We need to
> + inform the storage engine that it should use this behaviour.
> + */
> +
> + /* Tell the storage engine that we are using REPLACE semantics. */
> + thd->lex->duplicates= DUP_REPLACE;
> +
> + /*
> + Pretend we're executing a REPLACE command: this is needed for
> + InnoDB and NDB Cluster since they are not (properly) checking the
> + lex->duplicates flag.
> + */
> + thd->lex->sql_command= SQLCOM_REPLACE;
> + /*
> + Do not raise the error flag in case of hitting to an unique attribute
> + */
> + table->file->extra(HA_EXTRA_IGNORE_DUP_KEY);
> + /*
> + NDB specific: update from ndb master wrapped as Write_rows
> + */
> + /*
> + so that the event should be applied to replace slave's row
> + */
> + table->file->extra(HA_EXTRA_WRITE_CAN_REPLACE);
> + /*
> + NDB specific: if update from ndb master wrapped as Write_rows
> + does not find the row it's assumed idempotent binlog applying
> + is taking place; don't raise the error.
> + */
> + table->file->extra(HA_EXTRA_IGNORE_NO_KEY);
> + /*
> + TODO: the cluster team (Tomas?) says that it's better if the engine knows
> + how many rows are going to be inserted, then it can allocate needed memory
> + from the start.
> + */
> + table->file->ha_start_bulk_insert(0);
> + /*
> + We need TIMESTAMP_NO_AUTO_SET otherwise ha_write_row() will not use fill
> + any TIMESTAMP column with data from the row but instead will use
> + the event's current time.
> + As we replicate from TIMESTAMP to TIMESTAMP and slave has no extra
> + columns, we know that all TIMESTAMP columns on slave will receive explicit
> + data from the row, so TIMESTAMP_NO_AUTO_SET is ok.
> + When we allow a table without TIMESTAMP to be replicated to a table having
> + more columns including a TIMESTAMP column, or when we allow a TIMESTAMP
> + column to be replicated into a BIGINT column and the slave's table has a
> + TIMESTAMP column, then the slave's TIMESTAMP column will take its value
> + from set_time() which we called earlier (consistent with SBR). And then in
> + some cases we won't want TIMESTAMP_NO_AUTO_SET (will require some code to
> + analyze if explicit data is provided for slave's TIMESTAMP columns).
> + */
> + table->timestamp_field_type= TIMESTAMP_NO_AUTO_SET;
> + return error;
> +}
> +
> +int Write_rows_log_event_old::do_after_row_operations(TABLE *table, int error)
> +{
> + int local_error= 0;
> + table->file->extra(HA_EXTRA_NO_IGNORE_DUP_KEY);
> + table->file->extra(HA_EXTRA_WRITE_CANNOT_REPLACE);
> + /*
> + reseting the extra with
> + table->file->extra(HA_EXTRA_NO_IGNORE_NO_KEY);
> + fires bug#27077
> + todo: explain or fix
> + */
> + if ((local_error= table->file->ha_end_bulk_insert()))
> + {
> + table->file->print_error(local_error, MYF(0));
> + }
> + return error? error : local_error;
> +}
> +
> int
> Write_rows_log_event_old::do_prepare_row(THD *thd,
> RELAY_LOG_INFO const *rli,
> @@ -23,6 +975,65 @@ Write_rows_log_event_old::do_prepare_row
> return error;
> }
>
> +int Write_rows_log_event_old::do_exec_row(TABLE *table)
> +{
> + DBUG_ASSERT(table != NULL);
> + int error= replace_record(thd, table, m_master_reclength, m_width);
> + return error;
> +}
> +
> +/**********************************************************
> + Row handling primitives for Delete_rows_log_event_old
> + **********************************************************/
> +
> +int Delete_rows_log_event_old::do_before_row_operations(TABLE *table)
> +{
> + DBUG_ASSERT(m_memory == NULL);
> +
> + if ((table->file->ha_table_flags() &
> HA_PRIMARY_KEY_REQUIRED_FOR_POSITION) &&
> + table->s->primary_key < MAX_KEY)
> + {
> + /*
> + We don't need to allocate any memory for m_after_image and
> + m_key since they are not used.
> + */
> + return 0;
> + }
> +
> + int error= 0;
> +
> + if (table->s->keys > 0)
> + {
> + m_memory= (uchar*) my_multi_malloc(MYF(MY_WME),
> + &m_after_image,
> + (uint) table->s->reclength,
> + &m_key,
> + (uint) table->key_info->key_length,
> + NullS);
> + }
> + else
> + {
> + m_after_image= (uchar*) my_malloc(table->s->reclength, MYF(MY_WME));
> + m_memory= (uchar*)m_after_image;
> + m_key= NULL;
> + }
> + if (!m_memory)
> + return HA_ERR_OUT_OF_MEM;
> +
> + return error;
> +}
> +
> +int Delete_rows_log_event_old::do_after_row_operations(TABLE *table, int error)
> +{
> + /*error= ToDo:find out what this should really be, this triggers close_scan in
> nbd, returning error?*/
> + table->file->ha_index_or_rnd_end();
> + my_free(m_memory, MYF(MY_ALLOW_ZERO_PTR)); // Free for multi_malloc
> + m_memory= NULL;
> + m_after_image= NULL;
> + m_key= NULL;
> +
> + return error;
> +}
>
> int
> Delete_rows_log_event_old::do_prepare_row(THD *thd,
> @@ -57,6 +1068,67 @@ Delete_rows_log_event_old::do_prepare_ro
> return error;
> }
>
> +int Delete_rows_log_event_old::do_exec_row(TABLE *table)
> +{
> + int error;
> + DBUG_ASSERT(table != NULL);
> +
> + if (!(error= ::find_and_fetch_row(table, m_key)))
> + {
> + /*
> + Now we should have the right row to delete. We are using
> + record[0] since it is guaranteed to point to a record with the
> + correct value.
> + */
> + error= table->file->ha_delete_row(table->record[0]);
> + }
> + return error;
> +}
> +
> +/**********************************************************
> + Row handling primitives for Update_rows_log_event_old
> + **********************************************************/
> +
> +int Update_rows_log_event_old::do_before_row_operations(TABLE *table)
> +{
> + DBUG_ASSERT(m_memory == NULL);
> +
> + int error= 0;
> +
> + if (table->s->keys > 0)
> + {
> + m_memory= (uchar*) my_multi_malloc(MYF(MY_WME),
> + &m_after_image,
> + (uint) table->s->reclength,
> + &m_key,
> + (uint) table->key_info->key_length,
> + NullS);
> + }
> + else
> + {
> + m_after_image= (uchar*) my_malloc(table->s->reclength, MYF(MY_WME));
> + m_memory= m_after_image;
> + m_key= NULL;
> + }
> + if (!m_memory)
> + return HA_ERR_OUT_OF_MEM;
> +
> + table->timestamp_field_type= TIMESTAMP_NO_AUTO_SET;
> +
> + return error;
> +}
> +
> +int Update_rows_log_event_old::do_after_row_operations(TABLE *table, int error)
> +{
> + /*error= ToDo:find out what this should really be, this triggers close_scan in
> nbd, returning error?*/
> + table->file->ha_index_or_rnd_end();
> + my_free(m_memory, MYF(MY_ALLOW_ZERO_PTR));
> + m_memory= NULL;
> + m_after_image= NULL;
> + m_key= NULL;
> +
> + return error;
> +}
>
> int Update_rows_log_event_old::do_prepare_row(THD *thd,
> RELAY_LOG_INFO const *rli,
> @@ -97,6 +1169,43 @@ int Update_rows_log_event_old::do_prepar
>
> key_copy(m_key, table->record[0], key_info, 0);
> }
> +
> + return error;
> +}
> +
> +int Update_rows_log_event_old::do_exec_row(TABLE *table)
> +{
> + DBUG_ASSERT(table != NULL);
> +
> + int error= ::find_and_fetch_row(table, m_key);
> + if (error)
> + return error;
> +
> + /*
> + We have to ensure that the new record (i.e., the after image) is
> + in record[0] and the old record (i.e., the before image) is in
> + record[1]. This since some storage engines require this (for
> + example, the partition engine).
> +
> + Since find_and_fetch_row() puts the fetched record (i.e., the old
> + record) in record[1], we can keep it there. We put the new record
> + (i.e., the after image) into record[0], and copy the fields that
> + are on the slave (i.e., in record[1]) into record[0], effectively
> + overwriting the default values that where put there by the
> + unpack_row() function.
> + */
> + bmove_align(table->record[0], m_after_image, table->s->reclength);
> + copy_extra_record_fields(table, m_master_reclength, m_width);
> +
> + /*
> + Now we have the right row to update. The old row (the one we're
> + looking for) is in record[1] and the new row has is in record[0].
> + We also have copied the original values already in the slave's
> + database into the after image delivered from the master.
> + */
> + error= table->file->ha_update_row(table->record[1],
> table->record[0]);
> + if (error == HA_ERR_RECORD_IS_THE_SAME)
> + error= 0;
>
> return error;
> }
> diff -Nrup a/sql/log_event_old.h b/sql/log_event_old.h
> --- a/sql/log_event_old.h 2007-05-10 11:59:28 +02:00
> +++ b/sql/log_event_old.h 2007-07-06 16:58:06 +02:00
> @@ -20,9 +20,90 @@
> Need to include this file at the proper position of log_event.h
> */
>
> +
> +class Old_rows_log_event
> +{
> + public:
> +
> + virtual ~Old_rows_log_event() {}
> +
> + protected:
> +
> +#if !defined(MYSQL_CLIENT) && defined(HAVE_REPLICATION)
> +
> + virtual int do_apply_event(Rows_log_event*,const RELAY_LOG_INFO*);
> +
> + /*
> + Primitive to prepare for a sequence of row executions.
> +
> + DESCRIPTION
> +
> + Before doing a sequence of do_prepare_row() and do_exec_row()
> + calls, this member function should be called to prepare for the
> + entire sequence. Typically, this member function will allocate
> + space for any buffers that are needed for the two member
> + functions mentioned above.
> +
> + RETURN VALUE
> +
> + The member function will return 0 if all went OK, or a non-zero
> + error code otherwise.
> + */
> + virtual int do_before_row_operations(TABLE *table) = 0;
> +
> + /*
> + Primitive to clean up after a sequence of row executions.
> +
> + DESCRIPTION
> +
> + After doing a sequence of do_prepare_row() and do_exec_row(),
> + this member function should be called to clean up and release
> + any allocated buffers.
> + */
> + virtual int do_after_row_operations(TABLE *table, int error) = 0;
> +
> + /*
> + Primitive to prepare for handling one row in a row-level event.
> +
> + DESCRIPTION
> +
> + The member function prepares for execution of operations needed for one
> + row in a row-level event by reading up data from the buffer containing
> + the row. No specific interpretation of the data is normally done here,
> + since SQL thread specific data is not available: that data is made
> + available for the do_exec function.
> +
> + A pointer to the start of the next row, or NULL if the preparation
> + failed. Currently, preparation cannot fail, but don't rely on this
> + behavior.
> +
> + RETURN VALUE
> + Error code, if something went wrong, 0 otherwise.
> + */
> + virtual int do_prepare_row(THD*, RELAY_LOG_INFO const*, TABLE*,
> + uchar const *row_start,
> + uchar const **row_end) = 0;
> +
> + /*
> + Primitive to do the actual execution necessary for a row.
>
> -class Write_rows_log_event_old : public Write_rows_log_event
> + DESCRIPTION
> + The member function will do the actual execution needed to handle a row.
> +
> + RETURN VALUE
> + 0 if execution succeeded, 1 if execution failed.
> +
> + */
> + virtual int do_exec_row(TABLE *table) = 0;
> +
> +#endif /* !defined(MYSQL_CLIENT) && defined(HAVE_REPLICATION) */
> +};
> +
> +
> +class Write_rows_log_event_old
> + : public Write_rows_log_event, public Old_rows_log_event
> {
> +
> public:
> enum
> {
> @@ -49,14 +130,26 @@ private:
> virtual Log_event_type get_type_code() { return (Log_event_type)TYPE_CODE; }
>
> #if !defined(MYSQL_CLIENT) && defined(HAVE_REPLICATION)
> + // use old definition of do_apply_event()
> + virtual int do_apply_event(const RELAY_LOG_INFO *rli)
> + { return Old_rows_log_event::do_apply_event(this,rli); }
> +
> + // primitives for old version of do_apply_event()
> + virtual int do_before_row_operations(TABLE *table);
> + virtual int do_after_row_operations(TABLE *table, int error);
> virtual int do_prepare_row(THD*, RELAY_LOG_INFO const*, TABLE*,
> uchar const *row_start, uchar const **row_end);
> + virtual int do_exec_row(TABLE *table);
> +
> #endif
> };
>
>
> -class Update_rows_log_event_old : public Update_rows_log_event
> +class Update_rows_log_event_old
> + : public Update_rows_log_event, public Old_rows_log_event
> {
> + uchar *m_after_image, *m_memory;
> +
> public:
> enum
> {
> @@ -67,14 +160,16 @@ public:
> #if !defined(MYSQL_CLIENT)
> Update_rows_log_event_old(THD *thd, TABLE *table, ulong table_id,
> MY_BITMAP const *cols, bool is_transactional)
> - : Update_rows_log_event(thd, table, table_id, cols, is_transactional)
> + : Update_rows_log_event(thd, table, table_id, cols, is_transactional),
> + m_after_image(NULL), m_memory(NULL)
> {
> }
> #endif
> #if defined(HAVE_REPLICATION)
> Update_rows_log_event_old(const char *buf, uint event_len,
> const Format_description_log_event *descr)
> - : Update_rows_log_event(buf, event_len, descr)
> + : Update_rows_log_event(buf, event_len, descr),
> + m_after_image(NULL), m_memory(NULL)
> {
> }
> #endif
> @@ -83,14 +178,25 @@ private:
> virtual Log_event_type get_type_code() { return (Log_event_type)TYPE_CODE; }
>
> #if !defined(MYSQL_CLIENT) && defined(HAVE_REPLICATION)
> + // use old definition of do_apply_event()
> + virtual int do_apply_event(const RELAY_LOG_INFO *rli)
> + { return Old_rows_log_event::do_apply_event(this,rli); }
> +
> + // primitives for old version of do_apply_event()
> + virtual int do_before_row_operations(TABLE *table);
> + virtual int do_after_row_operations(TABLE *table, int error);
> virtual int do_prepare_row(THD*, RELAY_LOG_INFO const*, TABLE*,
> uchar const *row_start, uchar const **row_end);
> + virtual int do_exec_row(TABLE *table);
> #endif /* !defined(MYSQL_CLIENT) && defined(HAVE_REPLICATION) */
> };
>
>
> -class Delete_rows_log_event_old : public Delete_rows_log_event
> +class Delete_rows_log_event_old
> + : public Delete_rows_log_event, public Old_rows_log_event
> {
> + uchar *m_after_image, *m_memory;
> +
> public:
> enum
> {
> @@ -101,14 +207,16 @@ public:
> #if !defined(MYSQL_CLIENT)
> Delete_rows_log_event_old(THD *thd, TABLE *table, ulong table_id,
> MY_BITMAP const *cols, bool is_transactional)
> - : Delete_rows_log_event(thd, table, table_id, cols, is_transactional)
> + : Delete_rows_log_event(thd, table, table_id, cols, is_transactional),
> + m_after_image(NULL), m_memory(NULL)
> {
> }
> #endif
> #if defined(HAVE_REPLICATION)
> Delete_rows_log_event_old(const char *buf, uint event_len,
> const Format_description_log_event *descr)
> - : Delete_rows_log_event(buf, event_len, descr)
> + : Delete_rows_log_event(buf, event_len, descr),
> + m_after_image(NULL), m_memory(NULL)
> {
> }
> #endif
> @@ -117,8 +225,16 @@ private:
> virtual Log_event_type get_type_code() { return (Log_event_type)TYPE_CODE; }
>
> #if !defined(MYSQL_CLIENT) && defined(HAVE_REPLICATION)
> + // use old definition of do_apply_event()
> + virtual int do_apply_event(const RELAY_LOG_INFO *rli)
> + { return Old_rows_log_event::do_apply_event(this,rli); }
> +
> + // primitives for old version of do_apply_event()
> + virtual int do_before_row_operations(TABLE *table);
> + virtual int do_after_row_operations(TABLE *table, int error);
> virtual int do_prepare_row(THD*, RELAY_LOG_INFO const*, TABLE*,
> uchar const *row_start, uchar const **row_end);
> + virtual int do_exec_row(TABLE *table);
> #endif
> };
>
> diff -Nrup a/sql/rpl_record.cc b/sql/rpl_record.cc
> --- a/sql/rpl_record.cc 2007-06-11 22:15:28 +02:00
> +++ b/sql/rpl_record.cc 2007-07-06 16:58:06 +02:00
> @@ -133,26 +133,21 @@ pack_row(TABLE *table, MY_BITMAP const*
> the various member functions of Field and subclasses expect to
> write.
>
> - The row is assumed to only consist of the fields for which the
> - bitset represented by @c arr and @c bits; the other parts of the
> - record are left alone.
> + The row is assumed to only consist of the fields for which the corresponding
> + bit in bitset @c cols is set; the other parts of the record are left alone.
>
> At most @c colcnt columns are read: if the table is larger than
> that, the remaining fields are not filled in.
>
> - @param rli Relay log info
> @param table Table to unpack into
> @param colcnt Number of columns to read from record
> - @param row Packed row data
> - @param cols Pointer to columns data to fill in
> + @param row_data Packed row data
> + @param cols Pointer to bitset describing columns to fill in
> @param row_end Pointer to variable that will hold the value of the
> one-after-end position for the row
> @param master_reclength
> Pointer to variable that will be set to the length of the
> record on the master side
> - @param rw_set Pointer to bitmap that holds either the read_set or the
> - write_set of the table
> -
>
> @retval 0 No error
>
> @@ -163,11 +158,9 @@ pack_row(TABLE *table, MY_BITMAP const*
> */
> #if !defined(MYSQL_CLIENT) && defined(HAVE_REPLICATION)
> int
> -unpack_row(RELAY_LOG_INFO const *rli,
> - TABLE *table, uint const colcnt,
> +unpack_row(TABLE *table, uint const colcnt,
> uchar const *const row_data, MY_BITMAP const *cols,
> - uchar const **const row_end, ulong *const master_reclength,
> - MY_BITMAP* const rw_set, Log_event_type const event_type)
> + uchar const **const row_end, ulong *const master_reclength)
> {
> DBUG_ENTER("unpack_row");
> DBUG_ASSERT(row_data);
> @@ -177,10 +170,6 @@ unpack_row(RELAY_LOG_INFO const *rli,
> uchar const *null_ptr= row_data;
> uchar const *pack_ptr= row_data + master_null_byte_count;
>
> - bitmap_clear_all(rw_set);
> -
> - empty_record(table);
> -
> Field **const begin_ptr = table->field;
> Field **field_ptr;
> Field **const end_ptr= begin_ptr + colcnt;
> @@ -225,7 +214,6 @@ unpack_row(RELAY_LOG_INFO const *rli,
> pack_ptr= f->unpack(f->ptr, pack_ptr);
> }
>
> - bitmap_set_bit(rw_set, f->field_index);
> null_mask <<= 1;
> }
> }
> @@ -244,30 +232,46 @@ unpack_row(RELAY_LOG_INFO const *rli,
> else
> *master_reclength = table->s->reclength;
> }
> +
> + DBUG_RETURN(error);
> +}
> +
> +/**
> + Fills table->record[0] with default values.
> +
> + First @c empty_record() is called and then, additionally, fields are
> + initialized explicitly with a call to @c set_default().
> +
> + For optimization reasons, the explicit initialization can be skipped for
> + first @c skip fields. This is useful if later we are going to fill these
> + fields from other source (e.g. from a Rows replication event).
> +
> + If @c check is true, fields are explicitly initialized only if they have
> + default value or can be NULL. Otherwise error is reported.
> + */
> +int prepare_record(const Slave_reporting_capability *const log,
> + TABLE *const table,
> + const uint skip, const bool check)
> +{
> + DBUG_ENTER("prepare_record");
>
> - /*
> - Set properties for remaining columns, if there are any. We let the
> - corresponding bit in the write_set be set, to write the value if
> - it was not there already. We iterate over all remaining columns,
> - even if there were an error, to get as many error messages as
> - possible. We are still able to return a pointer to the next row,
> - so redo that.
> + int error= 0;
> + empty_record(table);
>
> - This generation of error messages is only relevant when inserting
> - new rows.
> - */
> - for ( ; *field_ptr ; ++field_ptr)
> + /* Explicit initialization of fields */
> +
> + for (Field **field_ptr= table->field+skip ; *field_ptr ; ++field_ptr)
> {
> uint32 const mask= NOT_NULL_FLAG | NO_DEFAULT_VALUE_FLAG;
> Field *const f= *field_ptr;
>
> - if (event_type == WRITE_ROWS_EVENT &&
> - ((*field_ptr)->flags & mask) == mask)
> + if (check && ((f->flags & mask) == mask))
> {
> - rli->report(ERROR_LEVEL, ER_NO_DEFAULT_FOR_FIELD,
> + DBUG_ASSERT(log);
> + log->report(ERROR_LEVEL, ER_NO_DEFAULT_FOR_FIELD,
> "Field `%s` of table `%s`.`%s` "
> "has no default value and cannot be NULL",
> - (*field_ptr)->field_name, table->s->db.str,
> + f->field_name, table->s->db.str,
> table->s->table_name.str);
> error = ER_NO_DEFAULT_FOR_FIELD;
> }
> diff -Nrup a/sql/rpl_record.h b/sql/rpl_record.h
> --- a/sql/rpl_record.h 2007-05-10 11:59:28 +02:00
> +++ b/sql/rpl_record.h 2007-07-06 16:58:06 +02:00
> @@ -16,18 +16,21 @@
> #ifndef RPL_RECORD_H
> #define RPL_RECORD_H
>
> +#include <rpl_reporting.h>
> +
> #if !defined(MYSQL_CLIENT)
> size_t pack_row(TABLE* table, MY_BITMAP const* cols,
> uchar *row_data, const uchar *data);
> #endif
>
> #if !defined(MYSQL_CLIENT) && defined(HAVE_REPLICATION)
> -int unpack_row(RELAY_LOG_INFO const *rli,
> - TABLE *table, uint const colcnt,
> +int unpack_row(TABLE *table, uint const colcnt,
> uchar const *const row_data, MY_BITMAP const *cols,
> - uchar const **const row_end, ulong *const master_reclength,
> - MY_BITMAP* const rw_set,
> - Log_event_type const event_type);
> + uchar const **const row_end, ulong *const master_reclength);
> +
> +// Fill table's record[0] with default values.
> +int prepare_record(const Slave_reporting_capability *const, TABLE *const,
> + const uint =0, const bool =FALSE);
> #endif
>
> #endif
> diff -Nrup a/sql/rpl_utility.h b/sql/rpl_utility.h
> --- a/sql/rpl_utility.h 2007-05-24 00:39:21 +02:00
> +++ b/sql/rpl_utility.h 2007-07-06 16:58:06 +02:00
> @@ -134,4 +134,35 @@ struct RPL_TABLE_LIST
> table_def m_tabledef;
> };
>
> +
> +/* Anonymous namespace for template functions/classes */
> +namespace {
> +
> + /*
> + Smart pointer that will automatically call my_afree (a macro) when
> + the pointer goes out of scope. This is used so that I do not have
> + to remember to call my_afree() before each return. There is no
> + overhead associated with this, since all functions are inline.
> +
> + I (Matz) would prefer to use the free function as a template
> + parameter, but that is not possible when the "function" is a
> + macro.
> + */
> + template <class Obj>
> + class auto_afree_ptr
> + {
> + Obj* m_ptr;
> + public:
> + auto_afree_ptr(Obj* ptr) : m_ptr(ptr) { }
> + ~auto_afree_ptr() { if (m_ptr) my_afree(m_ptr); }
> + void assign(Obj* ptr) {
> + /* Only to be called if it hasn't been given a value before. */
> + DBUG_ASSERT(m_ptr == NULL);
> + m_ptr= ptr;
> + }
> + Obj* get() { return m_ptr; }
> + };
> +
> +}
> +
> #endif /* RPL_UTILITY_H */
>
> --
> MySQL Code Commits Mailing List
> For list archives: http://lists.mysql.com/commits
> To unsubscribe: http://lists.mysql.com/commits?unsub=1
>