List:Commits« Previous MessageNext Message »
From:Andrei Elkin Date:July 10 2007 12:50pm
Subject:Re: bk commit into 5.1 tree (rafal:1.2530) BUG#21842
View as plain text  
Rafal, hello.

I think that's a good work which you have done.
The code becomes more readable and hopefully optimal.
I am making some notes down the lines.

Thanks for addressing the issues I started to ask before this current
patch. The problem is described pretty good imo. Maybe to add couple
of things.

I have one concern about extending bitmap library with a function that
does not exist and needed. I strongly recommend to that instead of
to stop at one inch from the entry to the perfect code paradise :)
Other suggestions are left to your disposal.


Once more enjoy your vacation!

cheers,

Andrei

> ChangeSet@stripped, 2007-07-06 16:58:18+02:00, rafal@quant.(none) +8 -0
>   BUG#21842 (Cluster fails to replicate to innodb or myisam with err 134 
>   using TPC-B):
>      
>   Problem: A RBR event can contain incomplete row data (only key value and
>   fields which have been changed). In that case, when the row is unpacked
>   into record and written to a table, the missing fields get incorrect NULL

do not they get generally defaults? Compare with
" The 
+  missing columns are filled with default values".

At any rate that does not relate to ndb to ndb replication. Please
leave a note.

>   values leading to master-slave inconsistency.

>      

I'd explicitly added that the implemented solution performs the task
belonging to the engine, so it's rather a workaround needed until all
engines will respect table->write_set flags to process only the selected
fields.

>   Solution: Use values found in slave's table for columns which are not given
>   in the rows event. The code for writing a single row uses the following 
>   algorithm: 
>   
>   1. unpack row_data into table->record[0],
>   2. try to insert record,
>   3. if duplicate record found, fetch it into table->record[1],
>   4. unpack row_data into table->record[1],
>   5. write table->record[1] into the table.
>     


>   Where row_data is the row as stored in the data area of a rows event. 
>   Thus:
>     
>   a) unpacking of row_data happens at the time when row is written into 
>      a table,
>     
>   b) when unpacking (in step 4), only columns present in row_data are 
>      overwritten - all other columns remain as they were found in the table.
>      
>   Since all data needed for the above algorithm is stored inside 
>   Rows_log_event class, functions which locate and write rows are turned 
>   into methods of that class.
>   
>   replace_record()     -> Rows_log_event::write_row()
>   find_and_fetch_row() -> Rows_log_event::find_and_fetch_row()

That's logical. I suggest to name  Rows_log_event::fetch_row() instead. After all
what's needed is what's fetched.

Actually you eliminated  do_prepare_row's  to hand its duties to
unpack_current_row(), i.e in some sense
     do_prepare_row()    -> unpack_current_row()
>     
>   Both methods take row data from event's data buffer - the row being 
>   processed is pointed by m_curr_row. They unpack the data as needed into 
>   table's record buffers record[0] or record[1]. When row is unpacked, 
>   m_curr_row_end is set to point at next row in the data buffer.
>   

that's okay. \remark{I was thinking on m_next_row instead (since
m_curr_row_end becomes known only at the end of the current row
unpacking) but it does not really matter}

>   Other changes introduced in this changeset:
>   
>   - Change signature of unpack_row(): don't report errors and don't
>     setup table's rw_set here. Errors can happen only when setting default 
>     values in prepare_record() function and are detected there.

Would you agree with names wearing more info on the func does like
unpack_row_defaults, row_extra_fields_set_defaults() instead?

I wanted to add also "unpack_end" as a candidate but noticed
`prepare_record()' is called before `unpack_current_row()'.
And then there must be some extra work to fill the meaningful fields'
values with defaults. E.g


+  if ((error= prepare_record(log, m_table,m_width,
+                             TRUE /* check if columns have def. values */)))
+    goto finish; 
+  
+  /* unpack row into m_table->record[0] */
+  error= unpack_current_row();

If that observation is correct could we avoid the extra work?
Why can not prepare_record's job be done after unpack_current_row? Or
rather why not to call the func at the end of unpack_current_row
in accord with the existing logic?

>    
>   - In Rows_log_event and derived classes, don't pass arguments to
>     the execution primitives (do_...() member functions) but use class
>     members instead.
>   
>   - The changes seem to fix rpl_ndb_extraCol test. Before it produced
>     results different than the same test run on other storage engines.
>     Now the results are identical. The result file is updated
>     accordingly.
>
>   mysql-test/r/rpl_ndb_extraCol.result@stripped, 2007-07-06 16:58:05+02:00,
> rafal@quant.(none) +27 -27
>     Correct results. Now the results of the NDB test are the same as for 
>     other engines.
>
>   sql/log_event.cc@stripped, 2007-07-06 16:58:06+02:00, rafal@quant.(none) +534 -572
>     - Initialization of new Rows_log_event members.
>     - Fixing some typos in documentation.
>     
>     In Rows_log_event::do_apply_event:
>     - Set COMPLETE_ROWS_F flag if all colums are present in the row

>     - Move initialization of tables write/read sets outside the rows
>       processing loop (and out of unpack_row() function).

Mild suggestion.
There is a thing we could optimize perhaps current work.
m_cols member of Rows_log_event is set at constructing the event. The
only reason for the member is to propagate the bits to the final
table->write_set destination. So that could be done in do_exec_row()
where I have yet another stronger suggestion.


>     - Remove calls to do_prepare_row() - no longer needed.
>     - Add code managing m_curr_row and m_curr_row_end pointers.
>     
>     - Change signatures of row processing methods of Rows_log_event and it
>       descendants - now most arguments are taken from class members.
>     - Remove do_prepare_row() methods which are no longer used.
>     - The auto_afree_ptr template is moved to rpl_utility.h (so that it can
>       be used in log_event_old.cc).
>     - Removed copy_extra_fields() function - no longer used.
>     - Change replace_record() function into Rows_log_event::write_row()
>       method.
>     - Change find_and_fetch_row() function into method of Rows_log_event.
>     - Don't allocate memory for m_after_image buffer which is not
>       used anymore.

Okay.

>     - Change implementation of Update_rows_log_event::do_exec_row() to use 
>       unpack_current_row() method to get before/after images.
>
>   sql/log_event.h@stripped, 2007-07-06 16:58:06+02:00, rafal@quant.(none) +50 -53

>     - Add new COMPLETE_ROWS_F flag in Rows_log_event.

New Rows_log_event flag COMPLETE_ROWS_F is set locally on slave.  Put
it into the common with RELAXED_UNIQUE_CHECKS_F etc reduces the space
for possible new future replicated flags.
Why not to make a member (a struct groupping slave side members) for
local (slave side) purposes instead? 

>     - Add Rows_log_event members describing the row being processed.
>     - Add a pointer to key buffer which is used in derived classes.
>     - Add unpack_current_row() method.
>     - Change signatures of do_...() methods (replace method arguments by
>       class members).
>     - Remove do_prepare_row() method which is no longer used.
>     - Update method documentation.
>
>   sql/log_event_old.cc@stripped, 2007-07-06 16:58:06+02:00, rafal@quant.(none) +1109 -0
>     Move here old implementation of Rows_log_event::do_apply_event() and 
>     helper methods.

Suggest to call them more precisely like

log_event_pre_canonic_format.cc,h

>
>   sql/log_event_old.h@stripped, 2007-07-06 16:58:06+02:00, rafal@quant.(none) +123 -7
>     - Define new class Old_rows_log_event encapsulating old version of
>       Rows_log_event::do_apply_event() and the helper methods.
>     - Add the Old_rows_log_event class as a base for *_old versions of RBR event
>       classes, ensure that the old version of do_apply_event() is called.
>     - For *_old classes, declare the helper methods used in the old version of
>       do_apply_event().
>
>   sql/rpl_record.cc@stripped, 2007-07-06 16:58:06+02:00, rafal@quant.(none) +37 -33
>     - Make unpack_row non-destructive for columns not present in the row.
>     - Don't fill read/write set here as it is done outside these
>     functions.

also optimization, okay.

>     - Move initialization of a record with default values to a separate
>       function prepare_record().

the better name and the order of invocation suggested above.


>
>   sql/rpl_record.h@stripped, 2007-07-06 16:58:06+02:00, rafal@quant.(none) +8 -5
>     - Change signature of unpack_row().
>     - Declare function prepare_record().
>
>   sql/rpl_utility.h@stripped, 2007-07-06 16:58:06+02:00, rafal@quant.(none) +31 -0
>     Move auto_afree_ptr template here so that it can be re-used (currently
>     in log_event.cc and log_event_old.cc).
>
> diff -Nrup a/mysql-test/r/rpl_ndb_extraCol.result
> b/mysql-test/r/rpl_ndb_extraCol.result
> --- a/mysql-test/r/rpl_ndb_extraCol.result	2007-06-11 22:15:19 +02:00
> +++ b/mysql-test/r/rpl_ndb_extraCol.result	2007-07-06 16:58:05 +02:00
> @@ -28,9 +28,9 @@ a	b	c
>  *** Select from slave ***
>  SELECT * FROM t1 ORDER BY a;
>  a	b	c	d	e
> -1	2	TEXAS	NULL	NULL
> -2	1	AUSTIN	NULL	NULL
> -3	4	QA	NULL	NULL
> +1	2	TEXAS	2	TEST
> +2	1	AUSTIN	2	TEST
> +3	4	QA	2	TEST
>  *** Drop t1  ***
>  DROP TABLE t1;
>  *** Create t3 on slave  ***
> @@ -309,9 +309,9 @@ a	b	c
>  *** Select from slave ***
>  SELECT * FROM t7 ORDER BY a;
>  a	b	c	d	e
> -1	b1b1	Kyle	NULL	NULL
> -2	b1b1	JOE	NULL	NULL
> -3	b1b1	QA	NULL	NULL
> +1	b1b1	Kyle	0000-00-00 00:00:00	Extra Column Testing
> +2	b1b1	JOE	0000-00-00 00:00:00	Extra Column Testing
> +3	b1b1	QA	0000-00-00 00:00:00	Extra Column Testing
>  *** Drop t7  ***
>  DROP TABLE t7;
>  *** Create t8 on slave  ***
> @@ -477,9 +477,9 @@ a	b	c
>  *** Select on Slave ***
>  SELECT * FROM t12 ORDER BY a;
>  a	b	f	c	e
> -1	b1b1b1b1b1b1b1b1	Kyle	NULL	NULL
> -2	b1b1b1b1b1b1b1b1	JOE	NULL	NULL
> -3	b1b1b1b1b1b1b1b1	QA	NULL	NULL
> +1	b1b1b1b1b1b1b1b1	Kyle	test	1
> +2	b1b1b1b1b1b1b1b1	JOE	test	1
> +3	b1b1b1b1b1b1b1b1	QA	test	1
>  *** Drop t12  ***
>  DROP TABLE t12;
>  **** Extra Colums End ****
> @@ -509,9 +509,9 @@ a	b	c
>  *** Select on Slave ****
>  SELECT * FROM t13 ORDER BY a;
>  a	b	c	d	e
> -1	b1b1b1b1b1b1b1b1	Kyle	NULL	CURRENT_TIMESTAMP
> -2	b1b1b1b1b1b1b1b1	JOE	NULL	CURRENT_TIMESTAMP
> -3	b1b1b1b1b1b1b1b1	QA	NULL	CURRENT_TIMESTAMP
> +1	b1b1b1b1b1b1b1b1	Kyle	1	CURRENT_TIMESTAMP
> +2	b1b1b1b1b1b1b1b1	JOE	1	CURRENT_TIMESTAMP
> +3	b1b1b1b1b1b1b1b1	QA	1	CURRENT_TIMESTAMP
>  *** Drop t13  ***
>  DROP TABLE t13;
>  *** 22117 END *** 
> @@ -545,9 +545,9 @@ c1	c2	c3	c4	c5
>  *** Select on Slave ****
>  SELECT * FROM t14 ORDER BY c1;
>  c1	c2	c3	c4	c5	c6	c7
> -1	1.00	Replication Testing Extra Col	b1b1b1b1b1b1b1b1	Kyle	NULL	CURRENT_TIMESTAMP
> -2	2.00	This Test Should work	b1b1b1b1b1b1b1b1	JOE	NULL	CURRENT_TIMESTAMP
> -3	3.00	If is does not, I will open a bug	b1b1b1b1b1b1b1b1	QA	NULL	CURRENT_TIMESTAMP
> +1	1.00	Replication Testing Extra Col	b1b1b1b1b1b1b1b1	Kyle	1	CURRENT_TIMESTAMP
> +2	2.00	This Test Should work	b1b1b1b1b1b1b1b1	JOE	1	CURRENT_TIMESTAMP
> +3	3.00	If is does not, I will open a bug	b1b1b1b1b1b1b1b1	QA	1	CURRENT_TIMESTAMP
>  *** connect to master and drop columns ***
>  ALTER TABLE t14 DROP COLUMN c2;
>  ALTER TABLE t14 DROP COLUMN c4;
> @@ -560,9 +560,9 @@ c1	c3	c5
>  *** Select from Slave ***
>  SELECT * FROM t14 ORDER BY c1;
>  c1	c3	c5	c6	c7
> -1	Replication Testing Extra Col	Kyle	NULL	CURRENT_TIMESTAMP
> -2	This Test Should work	JOE	NULL	CURRENT_TIMESTAMP
> -3	If is does not, I will open a bug	QA	NULL	CURRENT_TIMESTAMP
> +1	Replication Testing Extra Col	Kyle	1	CURRENT_TIMESTAMP
> +2	This Test Should work	JOE	1	CURRENT_TIMESTAMP
> +3	If is does not, I will open a bug	QA	1	CURRENT_TIMESTAMP
>  *** Drop t14  ***
>  DROP TABLE t14;
>  *** Create t15 on slave  ***
> @@ -593,9 +593,9 @@ c1	c2	c3	c4	c5
>  *** Select on Slave ****
>  SELECT * FROM t15 ORDER BY c1;
>  c1	c2	c3	c4	c5	c6	c7
> -1	1.00	Replication Testing Extra Col	b1b1b1b1b1b1b1b1	Kyle	NULL	CURRENT_TIMESTAMP
> -2	2.00	This Test Should work	b1b1b1b1b1b1b1b1	JOE	NULL	CURRENT_TIMESTAMP
> -3	3.00	If is does not, I will open a bug	b1b1b1b1b1b1b1b1	QA	NULL	CURRENT_TIMESTAMP
> +1	1.00	Replication Testing Extra Col	b1b1b1b1b1b1b1b1	Kyle	1	CURRENT_TIMESTAMP
> +2	2.00	This Test Should work	b1b1b1b1b1b1b1b1	JOE	1	CURRENT_TIMESTAMP
> +3	3.00	If is does not, I will open a bug	b1b1b1b1b1b1b1b1	QA	1	CURRENT_TIMESTAMP
>  *** Add column on master that is a Extra on Slave ***
>  ALTER TABLE t15 ADD COLUMN c6 INT AFTER c5;
>  ********************************************
> @@ -653,9 +653,9 @@ c1	c2	c3	c4	c5	c6
>  *** Try to select from slave ****
>  SELECT * FROM t15 ORDER BY c1;
>  c1	c2	c3	c4	c5	c6	c7
> -1	1.00	Replication Testing Extra Col	b1b1b1b1b1b1b1b1	Kyle	NULL	CURRENT_TIMESTAMP
> -2	2.00	This Test Should work	b1b1b1b1b1b1b1b1	JOE	NULL	CURRENT_TIMESTAMP
> -3	3.00	If is does not, I will open a bug	b1b1b1b1b1b1b1b1	QA	NULL	CURRENT_TIMESTAMP
> +1	1.00	Replication Testing Extra Col	b1b1b1b1b1b1b1b1	Kyle	1	CURRENT_TIMESTAMP
> +2	2.00	This Test Should work	b1b1b1b1b1b1b1b1	JOE	1	CURRENT_TIMESTAMP
> +3	3.00	If is does not, I will open a bug	b1b1b1b1b1b1b1b1	QA	1	CURRENT_TIMESTAMP
>  5	2.00	Replication Testing	b1b1b1b1b1b1b1b1	Buda	2	CURRENT_TIMESTAMP
>  *** DROP TABLE t15 ***
>  DROP TABLE t15;
> @@ -687,9 +687,9 @@ c1	c2	c3	c4	c5
>  *** Select on Slave ****
>  SELECT * FROM t16 ORDER BY c1;
>  c1	c2	c3	c4	c5	c6	c7
> -1	1.00	Replication Testing Extra Col	b1b1b1b1b1b1b1b1	Kyle	NULL	CURRENT_TIMESTAMP
> -2	2.00	This Test Should work	b1b1b1b1b1b1b1b1	JOE	NULL	CURRENT_TIMESTAMP
> -3	3.00	If is does not, I will open a bug	b1b1b1b1b1b1b1b1	QA	NULL	CURRENT_TIMESTAMP
> +1	1.00	Replication Testing Extra Col	b1b1b1b1b1b1b1b1	Kyle	1	CURRENT_TIMESTAMP
> +2	2.00	This Test Should work	b1b1b1b1b1b1b1b1	JOE	1	CURRENT_TIMESTAMP
> +3	3.00	If is does not, I will open a bug	b1b1b1b1b1b1b1b1	QA	1	CURRENT_TIMESTAMP
>  *** Add Partition on master ***
>  ALTER TABLE t16 PARTITION BY KEY(c1) PARTITIONS 4;
>  INSERT INTO t16 () VALUES(4,1.00,'Replication Rocks',@b1,'Omer');
> diff -Nrup a/sql/log_event.cc b/sql/log_event.cc
> --- a/sql/log_event.cc	2007-06-28 15:07:53 +02:00
> +++ b/sql/log_event.cc	2007-07-06 16:58:06 +02:00
> @@ -5635,13 +5635,14 @@ Rows_log_event::Rows_log_event(THD *thd_
>      m_table_id(tid),
>      m_width(tbl_arg ? tbl_arg->s->fields : 1),
>      m_rows_buf(0), m_rows_cur(0), m_rows_end(0),
> -    m_flags(0)
> +    m_curr_row(NULL), m_curr_row_end(NULL),
> +    m_flags(0), m_key(NULL)
>  {
>    /*
>      We allow a special form of dummy event when the table, and cols
>      are null and the table id is ~0UL.  This is a temporary
>      solution, to be able to terminate a started statement in the
> -    binary log: the extreneous events will be removed in the future.
> +    binary log: the extraneous events will be removed in the future.
>     */
>    DBUG_ASSERT(tbl_arg && tbl_arg->s && tid != ~0UL ||
>                !tbl_arg && !cols && tid == ~0UL);
> @@ -5650,7 +5651,7 @@ Rows_log_event::Rows_log_event(THD *thd_
>        set_flags(NO_FOREIGN_KEY_CHECKS_F);
>    if (thd_arg->options & OPTION_RELAXED_UNIQUE_CHECKS)
>        set_flags(RELAXED_UNIQUE_CHECKS_F);
> -  /* if bitmap_init fails, catched in is_valid() */
> +  /* if bitmap_init fails, caught in is_valid() */
>    if (likely(!bitmap_init(&m_cols,
>                            m_width <= sizeof(m_bitbuf)*8 ? m_bitbuf : NULL,
>                            (m_width + 7) & ~7UL,
> @@ -5674,7 +5675,9 @@ Rows_log_event::Rows_log_event(const cha
>                                 *description_event)
>    : Log_event(buf, description_event),
>      m_row_count(0),
> -    m_rows_buf(0), m_rows_cur(0), m_rows_end(0)
> +    m_rows_buf(0), m_rows_cur(0), m_rows_end(0),
> +    m_curr_row(NULL), m_curr_row_end(NULL),
> +    m_key(NULL)
>  {
>    DBUG_ENTER("Rows_log_event::Rows_log_event(const char*,...)");
>    uint8 const common_header_len= description_event->common_header_len;
> @@ -5708,7 +5711,7 @@ Rows_log_event::Rows_log_event(const cha
>    DBUG_PRINT("debug", ("Reading from %p", ptr_after_width));
>    m_width = net_field_length(&ptr_after_width);
>    DBUG_PRINT("debug", ("m_width=%lu", m_width));
> -  /* if bitmap_init fails, catched in is_valid() */
> +  /* if bitmap_init fails, caught in is_valid() */
>    if (likely(!bitmap_init(&m_cols,
>                            m_width <= sizeof(m_bitbuf)*8 ? m_bitbuf : NULL,
>                            (m_width + 7) & ~7UL,
> @@ -5732,7 +5735,7 @@ Rows_log_event::Rows_log_event(const cha
>    {
>      DBUG_PRINT("debug", ("Reading from %p", ptr_after_width));
>  
> -    /* if bitmap_init fails, catched in is_valid() */
> +    /* if bitmap_init fails, caught in is_valid() */
>      if (likely(!bitmap_init(&m_cols_ai,
>                              m_width <= sizeof(m_bitbuf_ai)*8 ? m_bitbuf_ai :
> NULL,
>                              (m_width + 7) & ~7UL,
> @@ -5761,6 +5764,7 @@ Rows_log_event::Rows_log_event(const cha
>    m_rows_buf= (uchar*) my_malloc(data_size, MYF(MY_WME));
>    if (likely((bool)m_rows_buf))
>    {
> +    m_curr_row= m_rows_buf;
>      m_rows_end= m_rows_buf + data_size;
>      m_rows_cur= m_rows_end;
>      memcpy(m_rows_buf, ptr_rows_data, data_size);
> @@ -5865,7 +5869,6 @@ int Rows_log_event::do_apply_event(RELAY
>  {
>    DBUG_ENTER("Rows_log_event::do_apply_event(st_relay_log_info*)");
>    int error= 0;
> -  uchar const *row_start= m_rows_buf;
>  
>    /*
>      If m_table_id == ~0UL, then we have a dummy event that does not
> @@ -6028,7 +6031,9 @@ int Rows_log_event::do_apply_event(RELAY
>  
>    DBUG_ASSERT(rli->tables_to_lock == NULL && rli->tables_to_lock_count
> == 0);
>  
> -  TABLE* table=
> const_cast<RELAY_LOG_INFO*>(rli)->m_table_map.get_table(m_table_id);
> +  TABLE* 
> +    table= 
> +    m_table=
> const_cast<RELAY_LOG_INFO*>(rli)->m_table_map.get_table(m_table_id);

But there is no need for the local var 'table'? Let's substitute it
with m_table.

>  
>    if (table)
>    {
> @@ -6075,22 +6080,52 @@ int Rows_log_event::do_apply_event(RELAY
>       */
>      const_cast<RELAY_LOG_INFO*>(rli)->set_flag(RELAY_LOG_INFO::IN_STMT);
>  
> -    error= do_before_row_operations(table);
> -    while (error == 0 && row_start < m_rows_end)
> -    {
> -      uchar const *row_end= NULL;
> -      if ((error= do_prepare_row(thd, rli, table, row_start, &row_end)))
> -        break; // We should perform the after-row operation even in
> -               // the case of error
> +     if ( m_width == table->s->fields &&
> bitmap_is_set_all(&m_cols))
> +      set_flags(COMPLETE_ROWS_F);
> +
> +    /* 
> +      Initialize table's write and read sets. Extra columns are included
> +      as they will be filled with default values.
> +
> +      bit pos: 0 1 2 3 4 ... N-1 | N N+1 ... M |
> +      bit val: X X X X X .... X  | 1  1  ... 1 |
> +
> +      N = number of columns on master (m_width)
> +      M = number of columns on slave 
> +      X = bits from the row's column set (m_cols)      
> +
> +      We set all bits in read_set since we want to retrieve all columns even
> +      when the row is not complete.
> +     */
> +
> +    bitmap_set_all(table->write_set);
> +    bitmap_set_all(table->read_set);
>  
> -      DBUG_ASSERT(row_end != NULL); // cannot happen
> -      DBUG_ASSERT(row_end <= m_rows_end);
> +    /* 
> +      Note: I (Rafal) couldn't find any bitmap function which would
> +      copy bits from m_cols to a prefix of write_set. This is why I use
> +      simple loop below.
> +     */
> +    if (!get_flags(COMPLETE_ROWS_F)) 
> +      for (uint bit=0 ; bit < m_width ; ++bit)
> +         if (!bitmap_is_set(&m_cols,bit))
> +            bitmap_clear_bit(table->write_set,bit);
> +

We started discussing this loop due to lack of primitive.  Personally
I feel very uncomfortable to see that we use suboptimal solution which
can be avoided ralatively easy.  I think we agreed previously that if
the patch would have to wait till you are back from vacation then this
issue should be addressed in the newer patch.  How I see it can be
done. The current bitmap_intersect() fills the destination bitmap's
extra bits with zeros, which in our case should be with ones or rather
in case dest.n_bits > orig.n_bits the destination bitmap's
[dest.n_bits - orig.n_bits + 1, dest.n_bits] need preserving.



> +    // Do event specific preparations 
> +
> +    error= do_before_row_operations(thd);
>  
> +    // row processing loop
> +
> +    while (error == 0 && m_curr_row < m_rows_end)
> +    {
>        /* in_use can have been set to NULL in close_tables_for_reopen */
>        THD* old_thd= table->in_use;
>        if (!table->in_use)
>          table->in_use= thd;
> -      error= do_exec_row(table);
> +
> +      error= do_exec_row(thd,rli);
> +
>        table->in_use = old_thd;
>        switch (error)
>        {
> @@ -6110,20 +6145,39 @@ int Rows_log_event::do_apply_event(RELAY
>  	break;
>        }
>  
> -      row_start= row_end;
> -    }
> +      /*
> +        If m_curr_row_end  was not set during event execution (e.g., because
> +        of errors) we can't proceed to the next row. If the error is transient
> +        (i.e., error==0 at this point) we must call unpack_row() to set 

   unpack_current_row() is definetly meant -------------------^

> +        m_curr_row_end.
> +       */ 
> +
> +      if (!m_curr_row_end && !error)
> +        unpack_current_row();
> +
> +      // at this moment m_curr_row_end should be set
> +      DBUG_ASSERT(error || m_curr_row_end != NULL); 
> +      DBUG_ASSERT(error || m_curr_row < m_curr_row_end);
> +      DBUG_ASSERT(error || m_curr_row_end <= m_rows_end);
> +
> +      m_curr_row= m_curr_row_end;
> +
> +    } // row processing loop
> +
>      DBUG_EXECUTE_IF("STOP_SLAVE_after_first_Rows_event",
>                      const_cast<RELAY_LOG_INFO*>(rli)->abort_slave= 1;);
> -    error= do_after_row_operations(table, error);

> +
> +    error= do_after_row_operations(thd, error);
> +
>      if (!cache_stmt)
>      {
>        DBUG_PRINT("info", ("Marked that we need to keep log"));
>        thd->options|= OPTION_KEEP_LOG;
>      }
> -  }
> +  } // if (table)
>  
>    if (error)
> -  {                     /* error has occured during the transaction */
> +  {                     /* error has occurred during the transaction */
>      rli->report(ERROR_LEVEL, thd->net.last_errno,
>                  "Error in %s event: error during transaction execution "
>                  "on table %s.%s",
> @@ -6169,7 +6223,7 @@ int Rows_log_event::do_apply_event(RELAY
>        wait (reached end of last relay log and nothing gets appended
>        there), we timeout after one minute, and notify DBA about the
>        problem.  When WL#2975 is implemented, just remove the member
> -      st_relay_log_info::last_event_start_time and all its occurences.
> +      st_relay_log_info::last_event_start_time and all its occurrences.
>      */
>      const_cast<RELAY_LOG_INFO*>(rli)->last_event_start_time= time(0);
>    }
> @@ -6407,7 +6461,7 @@ Table_map_log_event::Table_map_log_event
>    m_data_size+= m_tbllen + 2;	// Include length and terminating \0
>    m_data_size+= 1 + m_colcnt;	// COLCNT and column types
>  
> -  /* If malloc fails, catched in is_valid() */
> +  /* If malloc fails, caught in is_valid() */
>    if ((m_memory= (uchar*) my_malloc(m_colcnt, MYF(MY_WME))))
>    {
>      m_coltype= reinterpret_cast<uchar*>(m_memory);
> @@ -6488,7 +6542,7 @@ Table_map_log_event::Table_map_log_event
>                       (ulong) m_tbllen, (long) (ptr_tbllen-(const uchar*)vpart),
>                       m_colcnt, (long) (ptr_colcnt-(const uchar*)vpart)));
>  
> -  /* Allocate mem for all fields in one go. If fails, catched in is_valid() */
> +  /* Allocate mem for all fields in one go. If fails, caught in is_valid() */
>    m_memory= (uchar*) my_multi_malloc(MYF(MY_WME),
>                                       &m_dbnam, (uint) m_dblen + 1,
>                                       &m_tblnam, (uint) m_tbllen + 1,
> @@ -6786,7 +6840,7 @@ Write_rows_log_event::Write_rows_log_eve
>  #endif
>  
>  #if !defined(MYSQL_CLIENT) && defined(HAVE_REPLICATION)
> -int Write_rows_log_event::do_before_row_operations(TABLE *table)
> +int Write_rows_log_event::do_before_row_operations(THD *thd)
>  {
>    int error= 0;
>  
> @@ -6808,26 +6862,26 @@ int Write_rows_log_event::do_before_row_
>    /* 
>       Do not raise the error flag in case of hitting to an unique attribute
>    */
> -  table->file->extra(HA_EXTRA_IGNORE_DUP_KEY);
> +  m_table->file->extra(HA_EXTRA_IGNORE_DUP_KEY);
>    /* 
>       NDB specific: update from ndb master wrapped as Write_rows
>    */
>    /*
>      so that the event should be applied to replace slave's row
>    */
> -  table->file->extra(HA_EXTRA_WRITE_CAN_REPLACE);
> +  m_table->file->extra(HA_EXTRA_WRITE_CAN_REPLACE);
>    /* 
>       NDB specific: if update from ndb master wrapped as Write_rows
>       does not find the row it's assumed idempotent binlog applying
>       is taking place; don't raise the error.
>    */
> -  table->file->extra(HA_EXTRA_IGNORE_NO_KEY);
> +  m_table->file->extra(HA_EXTRA_IGNORE_NO_KEY);
>    /*
>      TODO: the cluster team (Tomas?) says that it's better if the engine knows
>      how many rows are going to be inserted, then it can allocate needed memory
>      from the start.
>    */
> -  table->file->ha_start_bulk_insert(0);
> +  m_table->file->ha_start_bulk_insert(0);
>    /*
>      We need TIMESTAMP_NO_AUTO_SET otherwise ha_write_row() will not use fill
>      any TIMESTAMP column with data from the row but instead will use
> @@ -6843,45 +6897,29 @@ int Write_rows_log_event::do_before_row_
>      some cases we won't want TIMESTAMP_NO_AUTO_SET (will require some code to
>      analyze if explicit data is provided for slave's TIMESTAMP columns).
>    */
> -  table->timestamp_field_type= TIMESTAMP_NO_AUTO_SET;
> +  m_table->timestamp_field_type= TIMESTAMP_NO_AUTO_SET;
>    return error;
>  }
>  
> -int Write_rows_log_event::do_after_row_operations(TABLE *table, int error)
> +int Write_rows_log_event::do_after_row_operations(THD *thd, int error)
>  {
>    int local_error= 0;
> -  table->file->extra(HA_EXTRA_NO_IGNORE_DUP_KEY);
> -  table->file->extra(HA_EXTRA_WRITE_CANNOT_REPLACE);
> +  m_table->file->extra(HA_EXTRA_NO_IGNORE_DUP_KEY);
> +  m_table->file->extra(HA_EXTRA_WRITE_CANNOT_REPLACE);
>    /*
>      reseting the extra with 
> -    table->file->extra(HA_EXTRA_NO_IGNORE_NO_KEY); 
> +    m_table->file->extra(HA_EXTRA_NO_IGNORE_NO_KEY); 
>      fires bug#27077
>      todo: explain or fix
>    */
> -  if ((local_error= table->file->ha_end_bulk_insert()))
> +  if ((local_error= m_table->file->ha_end_bulk_insert()))
>    {
> -    table->file->print_error(local_error, MYF(0));
> +    m_table->file->print_error(local_error, MYF(0));
>    }
>    return error? error : local_error;
>  }
>  
> -int Write_rows_log_event::do_prepare_row(THD *thd, RELAY_LOG_INFO const *rli,
> -                                         TABLE *table,
> -                                         uchar const *const row_start,
> -                                         uchar const **const row_end)
> -{
> -  DBUG_ASSERT(table != NULL);
> -  DBUG_ASSERT(row_start && row_end);
> -
> -  if (int error= unpack_row(rli, table, m_width, row_start, &m_cols, row_end,
> -                            &m_master_reclength, table->write_set,
> WRITE_ROWS_EVENT))
> -  {
> -    thd->net.last_errno= error;
> -    return error;
> -  }
> -  bitmap_copy(table->read_set, table->write_set);
> -  return 0;
> -}
> +#if !defined(MYSQL_CLIENT) && defined(HAVE_REPLICATION)
>  
>  /*
>    Check if there are more UNIQUE keys after the given key.
> @@ -6895,120 +6933,6 @@ last_uniq_key(TABLE *table, uint keyno)
>    return 1;
>  }
>  
> -/* Anonymous namespace for template functions/classes */
> -namespace {
> -
> -  /*
> -    Smart pointer that will automatically call my_afree (a macro) when
> -    the pointer goes out of scope.  This is used so that I do not have
> -    to remember to call my_afree() before each return.  There is no
> -    overhead associated with this, since all functions are inline.
> -
> -    I (Matz) would prefer to use the free function as a template
> -    parameter, but that is not possible when the "function" is a
> -    macro.
> -  */
> -  template <class Obj>
> -  class auto_afree_ptr
> -  {
> -    Obj* m_ptr;
> -  public:
> -    auto_afree_ptr(Obj* ptr) : m_ptr(ptr) { }
> -    ~auto_afree_ptr() { if (m_ptr) my_afree(m_ptr); }
> -    void assign(Obj* ptr) {
> -      /* Only to be called if it hasn't been given a value before. */
> -      DBUG_ASSERT(m_ptr == NULL);
> -      m_ptr= ptr;
> -    }
> -    Obj* get() { return m_ptr; }
> -  };
> -
> -}
> -
> -
> -/*
> -  Copy "extra" columns from record[1] to record[0].
> -
> -  Copy the extra fields that are not present on the master but are
> -  present on the slave from record[1] to record[0].  This is used
> -  after fetching a record that are to be updated, either inside
> -  replace_record() or as part of executing an update_row().
> - */
> -static int
> -copy_extra_record_fields(TABLE *table,
> -                         size_t master_reclength,
> -                         my_ptrdiff_t master_fields)
> -{
> -  DBUG_PRINT("info", ("Copying to 0x%lx "
> -                      "from field %lu at offset %lu "
> -                      "to field %d at offset %lu",
> -                      (long) table->record[0],
> -                      (ulong) master_fields, (ulong) master_reclength,
> -                      table->s->fields, table->s->reclength));
> -  /*
> -    Copying the extra fields of the slave that does not exist on
> -    master into record[0] (which are basically the default values).
> -  */
> -  DBUG_ASSERT(master_reclength <= table->s->reclength);
> -  if (master_reclength < table->s->reclength)
> -    bmove_align(table->record[0] + master_reclength,
> -                table->record[1] + master_reclength,
> -                table->s->reclength - master_reclength);
> -    
> -  /*
> -    Bit columns are special.  We iterate over all the remaining
> -    columns and copy the "extra" bits to the new record.  This is
> -    not a very good solution: it should be refactored on
> -    opportunity.
> -
> -    REFACTORING SUGGESTION (Matz).  Introduce a member function
> -    similar to move_field_offset() called copy_field_offset() to
> -    copy field values and implement it for all Field subclasses. Use
> -    this function to copy data from the found record to the record
> -    that are going to be inserted.
> -
> -    The copy_field_offset() function need to be a virtual function,
> -    which in this case will prevent copying an entire range of
> -    fields efficiently.
> -  */
> -  {
> -    Field **field_ptr= table->field + master_fields;
> -    for ( ; *field_ptr ; ++field_ptr)
> -    {
> -      /*
> -        Set the null bit according to the values in record[1]
> -       */
> -      if ((*field_ptr)->maybe_null() &&
> -         
> (*field_ptr)->is_null_in_record(reinterpret_cast<uchar*>(table->record[1])))
> -        (*field_ptr)->set_null();
> -      else
> -        (*field_ptr)->set_notnull();
> -
> -      /*
> -        Do the extra work for special columns.
> -       */
> -      switch ((*field_ptr)->real_type())
> -      {
> -      default:
> -        /* Nothing to do */
> -        break;
> -
> -      case MYSQL_TYPE_BIT:
> -        Field_bit *f= static_cast<Field_bit*>(*field_ptr);
> -        if (f->bit_len > 0)
> -        {
> -          my_ptrdiff_t const offset= table->record[1] - table->record[0];
> -          uchar const bits=
> -            get_rec_bits(f->bit_ptr + offset, f->bit_ofs, f->bit_len);
> -          set_rec_bits(bits, f->bit_ptr, f->bit_ofs, f->bit_len);
> -        }
> -        break;
> -      }
> -    }
> -  }
> -  return 0;                                     // All OK
> -}
> -
>  #define DBUG_PRINT_BITSET(N,FRM,BS)                \
>    do {                                             \
>      char buf[256];                                 \
> @@ -7044,58 +6968,89 @@ is_duplicate_key_error(int errcode)
>    return false;
>  }
>  
> +/**
> +  Write the current row into @c m_table.
>  
> -/*
> -  Replace the provided record in the database.
> +  The row is located in the row buffer, pointed by @c m_curr_row member.
> +  Number of columns of the row is stored in @c m_width member (it can be smaller
> +  than the number of columns in the table to which we insert). Bitmap @c m_cols
> +  indicates which columns are present in the row. It is assumed that event's 
> +  table is already open and pointed by @c m_table.
> +
> +  If the same record already exists in the table it can be either overwritten 
> +  or an error is reported depending on the value of @c overwrite flag 
> +  (error reporting not yet implemented). Note that the matching record can be
> +  different from the row we insert if we use primary keys to identify records in
> +  the table.
> +
> +  The row to be inserted can contain values only for selected columns. The 
> +  missing columns are filled with default values using @c prepare_record() 
> +  function. If a matching record is found in the table and overwritten, the 
> +  missing columns are taken from it.
> +
> +  @param  thd   Thread context for writing the record.
> +  @param  log   Logger for error reporting.
> +  @param  overwrite  Shall we overwrite if the row already exists or signal 
> +                error (currently ignored).
> +
> +  @returns Error code on failure, 0 on success.
> +
> +  This method, if successful, sets @c m_curr_row_end pointer to point at the
> +  next row in the rows buffer. This is done when unpacking the row to be 
> +  inserted.
>  
> -  SYNOPSIS
> -      replace_record()
> -      thd    Thread context for writing the record.
> -      table  Table to which record should be written.
> -      master_reclength
> -             Offset to first column that is not present on the master,
> -             alternatively the length of the record on the master
> -             side.
> -
> -  RETURN VALUE
> -      Error code on failure, 0 on success.
> -
> -  DESCRIPTION
> -      Similar to how it is done in mysql_insert(), we first try to do
> -      a ha_write_row() and of that fails due to duplicated keys (or
> -      indices), we do an ha_update_row() or a ha_delete_row() instead.
> - */
> -static int
> -replace_record(THD *thd, TABLE *table,
> -               ulong const master_reclength,
> -               uint const master_fields)
> +  @note If a matching record is found, it is either updated using 
> +  @c ha_update_row() function or first deleted and then new record written.
> +
> +*/ 
> +
> +int
> +Rows_log_event::write_row(THD *const thd, 

> +                          const Slave_reporting_capability *const log,
> +                          const bool overwrite)
>  {
> -  DBUG_ENTER("replace_record");
> -  DBUG_ASSERT(table != NULL && thd != NULL);
> +  DBUG_ENTER("write_row");
> +  DBUG_ASSERT(m_table != NULL && thd != NULL);
>  
>    int error;
>    int keynum;
>    auto_afree_ptr<char> key(NULL);
>  
> -#ifndef DBUG_OFF
> -  DBUG_DUMP("record[0]", table->record[0], table->s->reclength);
> -  DBUG_PRINT_BITSET("debug", "write_set = %s", table->write_set);
> -  DBUG_PRINT_BITSET("debug", "read_set = %s", table->read_set);
> -#endif
> +  /* fill m_table->record[0] with default values */
> +
> +  if ((error= prepare_record(log, m_table,m_width,
> +                             TRUE /* check if columns have def. values */)))
> +    goto finish; 
> +  
> +  /* unpack row into m_table->record[0] */
> +  error= unpack_current_row(); // TODO: how to handle errors?
> +
> +  DBUG_DUMP("record[0]", m_table->record[0], m_table->s->reclength);
> +  DBUG_PRINT_BITSET("debug", "write_set = %s", m_table->write_set);
> +  DBUG_PRINT_BITSET("debug", "read_set = %s", m_table->read_set);
> +
> +  /* 
> +    Try to write record. If a corresponding record already exists in the table,
> +    we try to change it using ha_update_row() if possible. Otherwise we delete
> +    it and jump here to try writing again. 
> +
> +    TODO: Add safety measures against infinite looping. 
> +   */
>  
> -  while ((error= table->file->ha_write_row(table->record[0])))
> + write_row:
> +
> +  if ((error= m_table->file->ha_write_row(m_table->record[0])))
>    {
> +    DBUG_PRINT("info",("ha_write_row() returned error %d",error));
> +
>      if (error == HA_ERR_LOCK_DEADLOCK || error == HA_ERR_LOCK_WAIT_TIMEOUT)
> +      goto handler_error;
> +
> +    if ((keynum= m_table->file->get_dup_key(error)) < 0)
>      {
> -      table->file->print_error(error, MYF(0)); /* to check at
> exec_relay_log_event */
> -      DBUG_RETURN(error);
> -    }
> -    if ((keynum= table->file->get_dup_key(error)) < 0)
> -    {
> -      /* We failed to retrieve the duplicate key */
> -      DBUG_RETURN(error);
> +      DBUG_PRINT("info",("Can't locate duplicate key (get_dup_key returns
> %d)",keynum));
> +      goto handler_error;
>      }
> -
>      /*
>         We need to retrieve the old row into record[1] to be able to
>         either update or delete the offending record.  We either:
> @@ -7106,50 +7061,70 @@ replace_record(THD *thd, TABLE *table,
>         - use index_read_idx() with the key that is duplicated, to
>           retrieve the offending row.
>       */
> -    if (table->file->ha_table_flags() & HA_DUPLICATE_POS)
> +    if (m_table->file->ha_table_flags() & HA_DUPLICATE_POS)
>      {
> -      error= table->file->rnd_pos(table->record[1],
> table->file->dup_ref);
> +      DBUG_PRINT("info",("Locating offending record using rnd_pos()"));
> +
> +      error= m_table->file->rnd_pos(m_table->record[1], 
> +                                    m_table->file->dup_ref);
>        if (error)
>        {
> -        table->file->print_error(error, MYF(0));
> -        DBUG_RETURN(error);
> +        DBUG_PRINT("info",("rnd_pos() returns error %d",error));
> +        goto handler_error;
>        }
>      }
>      else
>      {
> -      if (table->file->extra(HA_EXTRA_FLUSH_CACHE))
> -      {
> -        DBUG_RETURN(my_errno);
> +      DBUG_PRINT("info",("Locating offending record using index_read_idx()"));
> +
> +      if (m_table->file->extra(HA_EXTRA_FLUSH_CACHE))
> +      { 
> +        DBUG_PRINT("info",("Error when setting HA_EXTRA_FLUSH_CACHE"));
> +        error= my_errno;
> +        goto finish;
>        }
>  
>        if (key.get() == NULL)
>        {
> -       
> key.assign(static_cast<char*>(my_alloca(table->s->max_unique_length)));
> +       
> key.assign(static_cast<char*>(my_alloca(m_table->s->max_unique_length)));
>          if (key.get() == NULL)
> -          DBUG_RETURN(ENOMEM);
> +        {
> +          DBUG_PRINT("info",("Can't allocate key buffer"));
> +          error= ENOMEM;
> +          goto finish;
> +        }
>        }
>  
> -      key_copy((uchar*)key.get(), table->record[0], table->key_info + keynum,
> 0);
> -      error= table->file->index_read_idx(table->record[1], keynum,
> -                                         (const uchar*)key.get(),
> -                                         HA_WHOLE_KEY,
> -                                         HA_READ_KEY_EXACT);
> -      if (error)
> -      {
> -        table->file->print_error(error, MYF(0));
> -        DBUG_RETURN(error);
> +      key_copy((uchar*)key.get(), m_table->record[0], m_table->key_info +
> keynum, 0);
> +      if ((error= m_table->file->index_read_idx(m_table->record[1],
> keynum,
> +                                                (const uchar*)key.get(),
> +                                                HA_WHOLE_KEY,
> +                                                HA_READ_KEY_EXACT)))
> +      { 
> +        DBUG_PRINT("info",("index_read_idx() returns error %d",error)); 
> +        goto handler_error;
>        }
>      }
>  
>      /*
> -       Now, table->record[1] should contain the offending row.  That
> +       Now, record[1] should contain the offending row.  That
>         will enable us to update it or, alternatively, delete it (so
>         that we can insert the new row afterwards).
> +     */
>  
> -       First we copy the columns into table->record[0] that are not
> -       present on the master from table->record[1], if there are any.
> +    /*
> +      If row is incomplete we will use the record found to fill 
> +      missing columns.  
>      */
> -    copy_extra_record_fields(table, master_reclength, master_fields);
> +    if (!get_flags(COMPLETE_ROWS_F))
> +    {
> +      restore_record(m_table,record[1]);
> +      error= unpack_current_row();
> +    }
> +
> +    DBUG_PRINT("debug",("preparing for update: before and after image"));
> +    DBUG_DUMP("record[1] (before)", m_table->record[1],
> m_table->s->reclength);
> +    DBUG_DUMP("record[0] (after)", m_table->record[0],
> m_table->s->reclength);
>  
>      /*
>         REPLACE is defined as either INSERT or DELETE + INSERT.  If
> @@ -7166,37 +7141,71 @@ replace_record(THD *thd, TABLE *table,
>         fail, so we're better off just deleting the row and inserting
>         the correct row.
>       */
> -    if (last_uniq_key(table, keynum) &&
> -        !table->file->referenced_by_foreign_key())
> +    if (last_uniq_key(m_table, keynum) &&
> +        !m_table->file->referenced_by_foreign_key())
>      {
> -      error=table->file->ha_update_row(table->record[1],
> -                                       table->record[0]);
> -      if (error && error != HA_ERR_RECORD_IS_THE_SAME)
> -        table->file->print_error(error, MYF(0));
> -      else
> +      DBUG_PRINT("info",("Updating row using ha_update_row()"));
> +      error= m_table->file->ha_update_row(m_table->record[1],
> +                                          m_table->record[0]);
> +      if (error == HA_ERR_RECORD_IS_THE_SAME)
> +      {
> +        DBUG_PRINT("info",("ignoring HA_ERR_RECORD_IS_THE_SAME ret. value from"
> +                           " ha_update_row()"));
>          error= 0;
> -      DBUG_RETURN(error);
> +      }
> +        
> +      if (error)
> +      {
> +        DBUG_PRINT("info",("ha_update_row() returns error %d",error));
> +        goto handler_error;
> +      }
>      }
>      else
>      {
> -      if ((error= table->file->ha_delete_row(table->record[1])))
> -      {
> -        table->file->print_error(error, MYF(0));
> -        DBUG_RETURN(error);
> +      DBUG_PRINT("info",("Deleting offending row and trying to write new one
> again"));
> +      if ((error= m_table->file->ha_delete_row(m_table->record[1])))
> +      { 
> +        DBUG_PRINT("info",("ha_delete_row() returns error %d",error));
> +        goto handler_error;
>        }
> -      /* Will retry ha_write_row() with the offending row removed. */
> +
> +      /* try to write the row again */
> +      goto write_row;
>      }
> -  }
>  
> + } // if (ha_write_row()) 
> +
> +#ifndef DBUG_OFF
> + if (error)
> +   DBUG_PRINT("info",("Failed to insert row (error=%d)",error));
> +#endif
> +
> + goto finish;
> +
> + handler_error:
> +
> + if (error)
> +    m_table->file->print_error(error, MYF(0));
> +
> + finish:
> +  
>    DBUG_RETURN(error);
>  }
>  
> -int Write_rows_log_event::do_exec_row(TABLE *table)
> +#endif
> +
> +int Write_rows_log_event::do_exec_row(THD *thd, 
> +                                      const Slave_reporting_capability *const log)
>  {
> -  DBUG_ASSERT(table != NULL);
> -  int error= replace_record(thd, table, m_master_reclength, m_width);
> -  return error;
> +  DBUG_ASSERT(m_table != NULL);
> +  int error= write_row(thd, log, TRUE /* overwrite */);
> +  
> +  if (error && !thd->net.last_errno)
> +    thd->net.last_errno= error;
> +      
> +  return error; 
>  }
> +
>  #endif /* !defined(MYSQL_CLIENT) && defined(HAVE_REPLICATION) */
>  
>  #ifdef MYSQL_CLIENT
> @@ -7289,50 +7298,79 @@ record_compare_exit:
>    return result;
>  }
>  
> +/**
> +  Locate the current row in @c m_table.
>  
> -/*
> -  Find the row given by 'key', if the table has keys, or else use a table scan
> -  to find (and fetch) the row.
> +  The current row is stored in event's row buffer, pointed by @c m_curr_row 
> +  member. Member @c m_width tells how many columns are there in the row (this 
> +  number can be smaller than the number of columns in the table). It is assumed 
> +  that event's table is already open and pointed by @c m_table.
> +
> +  If a corresponding record is found in the table it is stored in 
> +  @c m_table->record[0]. Note that when record is located based on a primary 
> +  key, it is possible that the record found differs from the row being located.
> +
> +  If no key is specified or table does not have keys, a table scan is used to 
> +  find the row. In that case the row should be complete and contain values for
> +  all columns. However, it can still be shorter than the table, i.e. the table 
> +  can contain extra columns not present in the row. 
> +
> +  @returns Error code on failure, 0 on success. If success @c m_table->record[0]
> 
> +  contains the record found. Also, the internal "cursor" of the table is 
> +  positioned at the record found.
>  
> -  If the engine allows random access of the records, a combination of
> -  position() and rnd_pos() will be used.
> +  @note If the engine allows random access of the records, a combination of
> +  @c position() and @c rnd_pos() will be used. 
> + */
>  
> -  @param table Pointer to table to search
> -  @param key   Pointer to key to use for search, if table has key
> +int Rows_log_event::find_and_fetch_row()
> +{
> +  DBUG_ENTER("find_and_fetch_row");
>  
> -  @pre <code>table->record[0]</code> shall contain the row to locate
> -  and <code>key</code> shall contain a key to use for searching, if
> -  the engine has a key.
> -
> -  @post If the return value is zero, <code>table->record[1]</code>
> -  will contain the fetched row and the internal "cursor" will refer to
> -  the row. If the return value is non-zero,
> -  <code>table->record[1]</code> is undefined.  In either case,
> -  <code>table->record[0]</code> is undefined.
> +  DBUG_ASSERT(m_table && m_table->in_use != NULL);
>  
> -  @return Zero if the row was successfully fetched into
> -  <code>table->record[1]</code>, error code otherwise.
> - */
> +  int error;
>  
> -static int find_and_fetch_row(TABLE *table, uchar *key)
> -{
> -  DBUG_ENTER("find_and_fetch_row(TABLE *table, uchar *key, uchar *record)");
> -  DBUG_PRINT("enter", ("table: 0x%lx, key: 0x%lx  record: 0x%lx",
> -		       (long) table, (long) key, (long) table->record[1]));
> +  /* unpack row - missing fields get default values */
> +
> +  prepare_record(NULL,m_table,m_width,FALSE /* don't check errors */); // TODO:
> shall we check and report errors here?
> +  error= unpack_current_row(); 
> +
> +  DBUG_PRINT("info",("looking for the following record"));
> +  DBUG_DUMP("record[0]", m_table->record[0], m_table->s->reclength);
> +
> +  /* Select best method for locating the row based on handler capabilities */
> +
> +  enum { TABLE_SCAN, INDEX, POSITION } method= TABLE_SCAN;
> +
> +  if ((m_table->file->ha_table_flags() &
> HA_PRIMARY_KEY_REQUIRED_FOR_POSITION) &&
> +      m_table->s->primary_key < MAX_KEY)
> +   method= POSITION;
> +  else if (m_table->s->keys > 0)
> +   method= INDEX;
> +
> +  if (method != POSITION)
> +  { 
> +    m_table->use_all_columns();
> +    /*
> +      Save unpacked row in record[1] for comparisons during scans if they happen.
> +      Note: missing columns are filled with default values and these will be used
> +      in comparisons. TODO: better handling of missing columns in comparisons.
> +     */ 
> +    store_record(m_table,record[1]); 
> +  }
>  
> -  DBUG_ASSERT(table->in_use != NULL);
> +  /* Locate row using selected method. The record found is stored in record[0] */
>  
> -  DBUG_DUMP("record[0]", table->record[0], table->s->reclength);
> +  switch (method) {
>  
> -  if ((table->file->ha_table_flags() &
> HA_PRIMARY_KEY_REQUIRED_FOR_POSITION) &&
> -      table->s->primary_key < MAX_KEY)
> +  case POSITION:
>    {
> +    DBUG_PRINT("info",("locating record using primary key (position)"));
>      /*
> -      Use a more efficient method to fetch the record given by
> -      table->record[0] if the engine allows it.  We first compute a
> -      row reference using the position() member function (it will be
> -      stored in table->file->ref) and the use rnd_pos() to position
> -      the "cursor" (i.e., record[0] in this case) at the correct row.
> +      We compute a row reference using position() member function 
> +      (it will be stored in table->file->ref) and then use rnd_pos() to 
> +      position the "cursor" (i.e., record[0] in this case) at the correct row.
>  
>        TODO: Add a check that the correct record has been fetched by
>        comparing with the original record. Take into account that the
> @@ -7343,36 +7381,34 @@ static int find_and_fetch_row(TABLE *tab
>                int error= table->file->rnd_pos(table->record[0],
> table->file->ref);
>        ADD>>>  DBUG_ASSERT(memcmp(table->record[1], table->record[0],
>                                   table->s->reclength) == 0);
> -
>      */
> -    table->file->position(table->record[0]);
> -    int error= table->file->rnd_pos(table->record[0],
> table->file->ref);
> -    /*
> -      rnd_pos() returns the record in table->record[0], so we have to
> -      move it to table->record[1].
> -     */
> -    bmove_align(table->record[1], table->record[0],
> table->s->reclength);
> -    DBUG_RETURN(error);
> +    m_table->file->position(m_table->record[0]);
> +    if ((error= m_table->file->rnd_pos(m_table->record[0],
> m_table->file->ref)))
> +      goto handler_error;
> +
> +    break;
>    }
>  
> -  /* We need to retrieve all fields */
> -  /* TODO: Move this out from this function to main loop */
> -  table->use_all_columns();
> -
> -  if (table->s->keys > 0)
> -  {
> -    int error;
> -    /* We have a key: search the table using the index */
> -    if (!table->file->inited && (error=
> table->file->ha_index_init(0, FALSE)))
> -      DBUG_RETURN(error);
> +  case INDEX:    
> +  {  
> +    DBUG_PRINT("info",("locating record using primary key (index_read)"));
>  
> -  /*
> -    Don't print debug messages when running valgrind since they can
> -    trigger false warnings.
> -   */
> +    /* Initialize primary key index (key #0) */
> +
> +    if (!m_table->file->inited && (error=
> m_table->file->ha_index_init(0, FALSE)))
> +      goto handler_error;
> +
> +    /* Fill key data for the row */
> +
> +    DBUG_ASSERT(m_key);
> +    key_copy(m_key, m_table->record[0], m_table->key_info, 0);
> +
> +    /*
> +      Don't print debug messages when running valgrind since they can
> +      trigger false warnings.
> +     */
>  #ifndef HAVE_purify
> -    DBUG_DUMP("table->record[0]", table->record[0],
> table->s->reclength);
> -    DBUG_DUMP("table->record[1]", table->record[1],
> table->s->reclength);
> +    DBUG_DUMP("key data", m_key, m_table->key_info->key_length);
>  #endif
>  
>      /*
> @@ -7382,14 +7418,16 @@ static int find_and_fetch_row(TABLE *tab
>        correctly.
>      */
>      my_ptrdiff_t const pos=
> -      table->s->null_bytes > 0 ? table->s->null_bytes - 1 : 0;
> -    table->record[1][pos]= 0xFF;
> -    if ((error= table->file->index_read(table->record[1], key,
> HA_WHOLE_KEY,
> -                                        HA_READ_KEY_EXACT)))
> -    {
> -      table->file->print_error(error, MYF(0));
> -      table->file->ha_index_end();
> -      DBUG_RETURN(error);
> +      m_table->s->null_bytes > 0 ? m_table->s->null_bytes - 1 : 0;
> +    m_table->record[0][pos]= 0xFF;
> +    
> +    if ((error= m_table->file->index_read(m_table->record[0], m_key, 
> +                                          HA_WHOLE_KEY,
> +                                          HA_READ_KEY_EXACT)))
> +    {
> +      DBUG_PRINT("info",("no record matching the key found in the table"));
> +      m_table->file->ha_index_end();
> +      goto handler_error;
>      }
>  
>    /*
> @@ -7397,8 +7435,8 @@ static int find_and_fetch_row(TABLE *tab
>      trigger false warnings.
>     */
>  #ifndef HAVE_purify
> -    DBUG_DUMP("table->record[0]", table->record[0],
> table->s->reclength);
> -    DBUG_DUMP("table->record[1]", table->record[1],
> table->s->reclength);
> +    DBUG_PRINT("info",("found first matching record")); 
> +    DBUG_DUMP("record[0]", m_table->record[0], m_table->s->reclength);
>  #endif
>      /*
>        Below is a minor "optimization".  If the key (i.e., key number
> @@ -7414,92 +7452,156 @@ static int find_and_fetch_row(TABLE *tab
>        found.  I can see no scenario where it would be incorrect to
>        chose the row to change only using a PK or an UNNI.
>      */
> -    if (table->key_info->flags & HA_NOSAME)
> +    if (!(m_table->key_info->flags & HA_NOSAME))
>      {
> -      table->file->ha_index_end();
> -      DBUG_RETURN(0);
> -    }
> +      /*
> +        In case key is not unique, we still have to iterate over records found
> +        and find the one which is identical to the row given. The row is unpacked
> +        in record[1] where missing columns are filled with default values.
> +       */ 
> +      DBUG_PRINT("info",("non-unique index, scanning it to find matching record"));
> 
>  
> -    while (record_compare(table))
> -    {
> -      int error;
> +      while (record_compare(m_table))
> +      {  
> +        /*
> +          We need to set the null bytes to ensure that the filler bit
> +          are all set when returning.  There are storage engines that
> +          just set the necessary bits on the bytes and don't set the
> +          filler bits correctly.
> +  
> +          TODO[record format ndb]: Remove this code once NDB returns the
> +          correct record format.
> +        */
> +        if (m_table->s->null_bytes > 0)
> +        {
> +          m_table->record[0][m_table->s->null_bytes - 1]|=
> +            256U - (1U << m_table->s->last_null_bit_pos);
> +        }
> +  
> +        if ((error= m_table->file->index_next(m_table->record[0])))
> +        {
> +          DBUG_PRINT("info",("no record matching the given row found"));
> +          m_table->file->ha_index_end();
> +          goto handler_error;
> +        }
>  
> -      /*
> -        We need to set the null bytes to ensure that the filler bit
> -        are all set when returning.  There are storage engines that
> -        just set the necessary bits on the bytes and don't set the
> -        filler bits correctly.
> +        /*
> +          Don't print debug messages when running valgrind since they can
> +          trigger false warnings.
> +         */
> +#ifndef HAVE_purify
> +        DBUG_PRINT("info",("found next record")); 
> +        DBUG_DUMP("record[0]", m_table->record[0], m_table->s->reclength);
> +#endif
> +      } // index scan loop
>  
> -        TODO[record format ndb]: Remove this code once NDB returns the
> -        correct record format.
> -      */
> -      if (table->s->null_bytes > 0)
> -      {
> -        table->record[1][table->s->null_bytes - 1]|=
> -          256U - (1U << table->s->last_null_bit_pos);
> -      }
> +    } // if !<unique_index>
>  
> -      if ((error= table->file->index_next(table->record[1])))
> -      {
> -	table->file->print_error(error, MYF(0));
> -        table->file->ha_index_end();
> -	DBUG_RETURN(error);
> -      }
> -    }
> +    m_table->file->ha_index_end();
>  
> -    /*
> -      Have to restart the scan to be able to fetch the next row.
> -    */
> -    table->file->ha_index_end();
> +    break;
>    }
> -  else
> -  {
> +
> +  case TABLE_SCAN:    
> +  {    
> +    DBUG_PRINT("info",("locating record using table scan (rnd_next)"));
> +
>      int restart_count= 0; // Number of times scanning has restarted from top
> -    int error;
>  
> -    /* We don't have a key: search the table using rnd_next() */
> -    if ((error= table->file->ha_rnd_init(1)))
> -      return error;
> +    if ((error= m_table->file->ha_rnd_init(1)))
> +    {
> +      DBUG_PRINT("info",("error initializing table scan (ha_rnd_init)"));
> +      goto handler_error;
> +    }
>  
>      /* Continue until we find the right record or have made a full loop */
> +    /*
> +      TODO: record_compare() below will take into account all fields in 
> +      table->record[1] - some of them might be not present in the row given and
> are
> +      filled with default values. Eventually, only the fields present in the row
> +      should be used to determine if a record was found or not.
> +
> +      Idea: change signature of record_compare to use row data for comparison. 
> +      I.e., function should check if columns present in the row have the same
> +      value in table->record[0].  
> +     */ 
>      do
>      {
> -      error= table->file->rnd_next(table->record[1]);
> -
> -      DBUG_DUMP("record[0]", table->record[0], table->s->reclength);
> -      DBUG_DUMP("record[1]", table->record[1], table->s->reclength);
> +      error= m_table->file->rnd_next(m_table->record[0]);
>  
>        switch (error) {
> +
>        case 0:
> -      case HA_ERR_RECORD_DELETED:
> -	break;
> +        DBUG_DUMP("record found", m_table->record[0],
> m_table->s->reclength);
> +
> +      case HA_ERR_RECORD_DELETED: 
> +        break;
>  
>        case HA_ERR_END_OF_FILE:
> -	if (++restart_count < 2)
> -	  table->file->ha_rnd_init(1);
> -	break;
> +        if (++restart_count < 2)
> +          m_table->file->ha_rnd_init(1); // TODO: handle error...
> +        break;
>  
>        default:
> -	table->file->print_error(error, MYF(0));
> -        DBUG_PRINT("info", ("Record not found"));
> -        table->file->ha_rnd_end();
> -	DBUG_RETURN(error);
> +        DBUG_PRINT("info", ("Failed to get next record (rnd_next)"));
> +        m_table->file->ha_rnd_end();
> +        goto handler_error;
>        }
>      }
> -    while (restart_count < 2 && record_compare(table));
> +    while (restart_count < 2 && record_compare(m_table));
>  
> -    /*
> -      Have to restart the scan to be able to fetch the next row.
> -    */
> -    DBUG_PRINT("info", ("Record %sfound", restart_count == 2 ? "not " : ""));
> -    table->file->ha_rnd_end();
> +    m_table->file->ha_rnd_end();
>  
>      DBUG_ASSERT(error == HA_ERR_END_OF_FILE || error == 0);
> -    DBUG_RETURN(error);
> +
> +    if (error || restart_count == 2)
> +    {
> +      DBUG_PRINT("info",("Record not found"));  
> +      error= error? error: HA_ERR_END_OF_FILE;
> +      goto finish; 
> +    }
> +
> +    break;
>    }
>  
> -  DBUG_RETURN(0);
> +  default: 
> +    DBUG_ASSERT(FALSE);
> +
> +  } // switch (method)
> +
> +  if (error)
> +  {
> +    DBUG_PRINT("info", ("Record not found"));
> +    goto finish;
> +  }
> +
> +  /*
> +    We have located the row in the table - it is stored in record[0].
> +   */ 
> +
> +  DBUG_PRINT("info", ("Record found"));
> +
> +  /*
> +    Don't print debug messages when running valgrind since they can
> +    trigger false warnings.
> +   */
> +#ifndef HAVE_purify
> +  DBUG_DUMP("record found", m_table->record[0], m_table->s->reclength);
> +#endif
> +
> +  goto finish;
> +
> + handler_error:
> +
> +  if (error)
> +    m_table->file->print_error(error, MYF(0));
> +
> + finish:
> +
> +  DBUG_RETURN(error);
> +
>  }
> +
>  #endif
>  
>  /*
> @@ -7511,9 +7613,6 @@ Delete_rows_log_event::Delete_rows_log_e
>                                               ulong tid, MY_BITMAP const *cols,
>                                               bool is_transactional)
>    : Rows_log_event(thd_arg, tbl_arg, tid, cols, is_transactional)
> -#ifdef HAVE_REPLICATION
> -  ,m_memory(NULL), m_key(NULL), m_after_image(NULL)
> -#endif
>  {
>  }
>  #endif /* #if !defined(MYSQL_CLIENT) */
> @@ -7525,23 +7624,16 @@ Delete_rows_log_event::Delete_rows_log_e
>  Delete_rows_log_event::Delete_rows_log_event(const char *buf, uint event_len,
>                                               const Format_description_log_event
>                                               *description_event)
> -#if defined(MYSQL_CLIENT)
>    : Rows_log_event(buf, event_len, DELETE_ROWS_EVENT, description_event)
> -#else
> -  : Rows_log_event(buf, event_len, DELETE_ROWS_EVENT, description_event),
> -    m_memory(NULL), m_key(NULL), m_after_image(NULL)
> -#endif
>  {
>  }
>  #endif
>  
>  #if !defined(MYSQL_CLIENT) && defined(HAVE_REPLICATION)
> -int Delete_rows_log_event::do_before_row_operations(TABLE *table)
> +int Delete_rows_log_event::do_before_row_operations(THD *thd)
>  {
> -  DBUG_ASSERT(m_memory == NULL);
> -
> -  if ((table->file->ha_table_flags() &
> HA_PRIMARY_KEY_REQUIRED_FOR_POSITION) &&
> -      table->s->primary_key < MAX_KEY)
> +  if ((m_table->file->ha_table_flags() &
> HA_PRIMARY_KEY_REQUIRED_FOR_POSITION) &&
> +      m_table->s->primary_key < MAX_KEY)
>    {
>      /*
>        We don't need to allocate any memory for m_after_image and
> @@ -7550,87 +7642,38 @@ int Delete_rows_log_event::do_before_row
>      return 0;
>    }
>  
> -  int error= 0;
> -
> -  if (table->s->keys > 0)
> +  if (m_table->s->keys > 0)
>    {
> -    m_memory= (uchar*) my_multi_malloc(MYF(MY_WME),
> -                                       &m_after_image,
> -                                       (uint) table->s->reclength,
> -                                       &m_key,
> -                                       (uint) table->key_info->key_length,
> -                                       NullS);
> +    // Allocate buffer for key searches
> +    m_key= (uchar*)my_malloc(m_table->key_info->key_length, MYF(MY_WME));
> +    if (!m_key)
> +      return HA_ERR_OUT_OF_MEM;
>    }
> -  else
> -  {
> -    m_after_image= (uchar*) my_malloc(table->s->reclength, MYF(MY_WME));
> -    m_memory= (uchar*)m_after_image;
> -    m_key= NULL;
> -  }
> -  if (!m_memory)
> -    return HA_ERR_OUT_OF_MEM;
> -
> -  return error;
> +  return 0;
>  }
>  
> -int Delete_rows_log_event::do_after_row_operations(TABLE *table, int error)
> +int Delete_rows_log_event::do_after_row_operations(THD *thd, int error)
>  {
>    /*error= ToDo:find out what this should really be, this triggers close_scan in
> nbd, returning error?*/
> -  table->file->ha_index_or_rnd_end();
> -  my_free(m_memory, MYF(MY_ALLOW_ZERO_PTR)); // Free for multi_malloc
> -  m_memory= NULL;
> -  m_after_image= NULL;
> +  m_table->file->ha_index_or_rnd_end();
> +  my_free(m_key, MYF(MY_ALLOW_ZERO_PTR));
>    m_key= NULL;
>  
>    return error;
>  }
>  
> -int Delete_rows_log_event::do_prepare_row(THD *thd, RELAY_LOG_INFO const *rli,
> -                                          TABLE *table,
> -                                          uchar const *const row_start,
> -                                          uchar const **const row_end)
> -{
> -  DBUG_ASSERT(row_start && row_end);
> -  /*
> -    This assertion actually checks that there is at least as many
> -    columns on the slave as on the master.
> -  */
> -  DBUG_ASSERT(table->s->fields >= m_width);
> -
> -  if (int error= unpack_row(rli, table, m_width, row_start, &m_cols, row_end,
> -                            &m_master_reclength, table->read_set,
> DELETE_ROWS_EVENT))
> -  {
> -    thd->net.last_errno= error;
> -    return error;
> -  }
> -
> -  /*
> -    If we will access rows using the random access method, m_key will
> -    be set to NULL, so we do not need to make a key copy in that case.
> -   */
> -  if (m_key)
> -  {
> -    KEY *const key_info= table->key_info;
> -
> -    key_copy(m_key, table->record[0], key_info, 0);
> -  }
> -
> -  return 0;
> -}
> -
> -int Delete_rows_log_event::do_exec_row(TABLE *table)
> +int Delete_rows_log_event::do_exec_row(THD *thd,
> +                                       const Slave_reporting_capability *const)
>  {
>    int error;
> -  DBUG_ASSERT(table != NULL);
> +  DBUG_ASSERT(m_table != NULL);
>  
> -  if (!(error= find_and_fetch_row(table, m_key)))
> +  if (!(error= find_and_fetch_row())) 
>    { 
>      /*
> -      Now we should have the right row to delete.  We are using
> -      record[0] since it is guaranteed to point to a record with the
> -      correct value.
> +      Delete the record found, located in record[0]
>      */
> -    error= table->file->ha_delete_row(table->record[0]);
> +    error= m_table->file->ha_delete_row(m_table->record[0]);
>    }
>    return error;
>  }
> @@ -7660,10 +7703,6 @@ Update_rows_log_event::Update_rows_log_e
>                                               MY_BITMAP const *cols_ai,
>                                               bool is_transactional)
>  : Rows_log_event(thd_arg, tbl_arg, tid, cols_bi, is_transactional)
> -#ifdef HAVE_REPLICATION
> -  , m_memory(NULL), m_key(NULL)
> -
> -#endif
>  {
>    init(cols_ai);
>  }
> @@ -7673,16 +7712,13 @@ Update_rows_log_event::Update_rows_log_e
>                                               MY_BITMAP const *cols,
>                                               bool is_transactional)
>  : Rows_log_event(thd_arg, tbl_arg, tid, cols, is_transactional)
> -#ifdef HAVE_REPLICATION
> -  , m_memory(NULL), m_key(NULL)
> -#endif
>  {
>    init(cols);
>  }
>  
>  void Update_rows_log_event::init(MY_BITMAP const *cols)
>  {
> -  /* if bitmap_init fails, catched in is_valid() */
> +  /* if bitmap_init fails, caught in is_valid() */
>    if (likely(!bitmap_init(&m_cols_ai,
>                            m_width <= sizeof(m_bitbuf_ai)*8 ? m_bitbuf_ai : NULL,
>                            (m_width + 7) & ~7UL,
> @@ -7712,152 +7748,78 @@ Update_rows_log_event::Update_rows_log_e
>                                               const
>                                               Format_description_log_event
>                                               *description_event)
> -#if defined(MYSQL_CLIENT)
>    : Rows_log_event(buf, event_len, UPDATE_ROWS_EVENT, description_event)
> -#else
> -  : Rows_log_event(buf, event_len, UPDATE_ROWS_EVENT, description_event),
> -    m_memory(NULL), m_key(NULL)
> -#endif
>  {
>  }
>  #endif
>  
>  #if !defined(MYSQL_CLIENT) && defined(HAVE_REPLICATION)
> -int Update_rows_log_event::do_before_row_operations(TABLE *table)
> +int Update_rows_log_event::do_before_row_operations(THD *thd)
>  {
> -  DBUG_ASSERT(m_memory == NULL);
> -
> -  int error= 0;
> -
> -  if (table->s->keys > 0)
> +  if (m_table->s->keys > 0)
>    {
> -    m_memory= (uchar*) my_multi_malloc(MYF(MY_WME),
> -                                       &m_after_image,
> -                                       (uint) table->s->reclength,
> -                                       &m_key,
> -                                       (uint) table->key_info->key_length,
> -                                       NullS);
> +    // Allocate buffer for key searches
> +    m_key= (uchar*)my_malloc(m_table->key_info->key_length, MYF(MY_WME));
> +    if (!m_key)
> +      return HA_ERR_OUT_OF_MEM;
>    }
> -  else
> -  {
> -    m_after_image= (uchar*) my_malloc(table->s->reclength, MYF(MY_WME));
> -    m_memory= m_after_image;
> -    m_key= NULL;
> -  }
> -  if (!m_memory)
> -    return HA_ERR_OUT_OF_MEM;
>  
> -  table->timestamp_field_type= TIMESTAMP_NO_AUTO_SET;
> +  m_table->timestamp_field_type= TIMESTAMP_NO_AUTO_SET;
>  
> -  return error;
> +  return 0;
>  }
>  
> -int Update_rows_log_event::do_after_row_operations(TABLE *table, int error)
> +int Update_rows_log_event::do_after_row_operations(THD *thd, int error)
>  {
>    /*error= ToDo:find out what this should really be, this triggers close_scan in
> nbd, returning error?*/
> -  table->file->ha_index_or_rnd_end();
> -  my_free(m_memory, MYF(MY_ALLOW_ZERO_PTR));
> -  m_memory= NULL;
> -  m_after_image= NULL;
> +  m_table->file->ha_index_or_rnd_end();
> +  my_free(m_key, MYF(MY_ALLOW_ZERO_PTR)); // Free for multi_malloc
>    m_key= NULL;
>  
>    return error;
>  }
>  
> -int Update_rows_log_event::do_prepare_row(THD *thd, RELAY_LOG_INFO const *rli,
> -                                          TABLE *table,
> -                                          uchar const *const row_start,
> -                                          uchar const **const row_end)
> +int Update_rows_log_event::do_exec_row(THD *thd,
> +                                       const Slave_reporting_capability *const)
>  {
> -  int error;
> -  DBUG_ASSERT(row_start && row_end);
> -  /*
> -    This assertion actually checks that there is at least as many
> -    columns on the slave as on the master.
> -  */
> -  DBUG_ASSERT(table->s->fields >= m_width);
> -
> -  /*
> -    We need to perform some juggling below since unpack_row() always
> -    unpacks into table->record[0]. For more information, see the
> -    comments for unpack_row().
> -  */
> -
> -  /* record[0] is the before image for the update */
> -  if ((error= unpack_row(rli, table, m_width, row_start, &m_cols, row_end,
> -                         &m_master_reclength, table->read_set,
> UPDATE_ROWS_EVENT)))
> -  {
> -    thd->net.last_errno= error;
> -    return error;
> -  }
> +  DBUG_ASSERT(m_table != NULL);
> +  DBUG_ASSERT(m_table->s->fields >= m_width);
>  
> -  store_record(table, record[1]);
> -  uchar const *next_start = *row_end;
> -  /* m_after_image is the after image for the update */
> -  if ((error= unpack_row(rli, table, m_width, next_start, &m_cols_ai, row_end,
> -                         &m_master_reclength, table->write_set,
> UPDATE_ROWS_EVENT)))
> -  {
> -    thd->net.last_errno= error;
> +  int error= find_and_fetch_row(); 
> +  if (error)
>      return error;
> -  }
> -
> -  bmove_align(m_after_image, table->record[0], table->s->reclength);
> -  restore_record(table, record[1]);
> -
> -  /*
> -    Don't print debug messages when running valgrind since they can
> -    trigger false warnings.
> -   */
> -#ifndef HAVE_purify
> -  DBUG_DUMP("record[0]",     table->record[0], table->s->reclength);
> -  DBUG_DUMP("m_after_image", m_after_image, table->s->reclength);
> -#endif
>  
>    /*
> -    If we will access rows using the random access method, m_key will
> -    be set to NULL, so we do not need to make a key copy in that case.
> -   */
> -  if (m_key)
> -  {
> -    KEY *const key_info= table->key_info;
> +    This is the situation after locating BI:
>  
> -    key_copy(m_key, table->record[0], key_info, 0);
> -  }
> +    ===|=== before image ====|=== after image ===|===
> +       ^                     ^
> +       m_curr_row            m_curr_row_end
>  
> -  return error;
> -}
> +    BI found in the table is stored in record[0]. We copy it to record[1]
> +    and unpack AI to record[0].
> +   */
>  
> -int Update_rows_log_event::do_exec_row(TABLE *table)
> -{
> -  DBUG_ASSERT(table != NULL);
> +  store_record(m_table,record[1]);
>  
> -  int error= find_and_fetch_row(table, m_key);
> -  if (error)
> -    return error;
> -
> -  /*
> -    We have to ensure that the new record (i.e., the after image) is
> -    in record[0] and the old record (i.e., the before image) is in
> -    record[1].  This since some storage engines require this (for
> -    example, the partition engine).
> -
> -    Since find_and_fetch_row() puts the fetched record (i.e., the old
> -    record) in record[1], we can keep it there. We put the new record
> -    (i.e., the after image) into record[0], and copy the fields that
> -    are on the slave (i.e., in record[1]) into record[0], effectively
> -    overwriting the default values that where put there by the
> -    unpack_row() function.
> -  */
> -  bmove_align(table->record[0], m_after_image, table->s->reclength);
> -  copy_extra_record_fields(table, m_master_reclength, m_width);
> +  m_curr_row= m_curr_row_end;
> +  error= unpack_current_row(); // this also updates m_curr_row_end
>  
>    /*
>      Now we have the right row to update.  The old row (the one we're
> -    looking for) is in record[1] and the new row has is in record[0].
> -    We also have copied the original values already in the slave's
> -    database into the after image delivered from the master.
> +    looking for) is in record[1] and the new row is in record[0].
>    */
> -  error= table->file->ha_update_row(table->record[1],
> table->record[0]);
> +#ifndef HAVE_purify
> +  /*
> +    Don't print debug messages when running valgrind since they can
> +    trigger false warnings.
> +   */
> +  DBUG_PRINT("info",("Updating row in table"));
> +  DBUG_DUMP("old record", m_table->record[1], m_table->s->reclength);
> +  DBUG_DUMP("new values", m_table->record[0], m_table->s->reclength);
> +#endif
> +
> +  error= m_table->file->ha_update_row(m_table->record[1],
> m_table->record[0]);
>    if (error == HA_ERR_RECORD_IS_THE_SAME)
>      error= 0;
>  
> diff -Nrup a/sql/log_event.h b/sql/log_event.h
> --- a/sql/log_event.h	2007-06-05 01:15:00 +02:00
> +++ b/sql/log_event.h	2007-07-06 16:58:06 +02:00
> @@ -23,6 +23,10 @@
>  
>  #include <my_bitmap.h>
>  #include "rpl_constants.h"
> +#ifndef MYSQL_CLIENT
> +#include "rpl_record.h"
> +#include "rpl_reporting.h"
> +#endif
>  
>  #define LOG_READ_EOF    -1
>  #define LOG_READ_BOGUS  -2
> @@ -2135,7 +2139,13 @@ public:
>      NO_FOREIGN_KEY_CHECKS_F = (1U << 1),
>  
>      /* Value of the OPTION_RELAXED_UNIQUE_CHECKS flag in thd->options */
> -    RELAXED_UNIQUE_CHECKS_F = (1U << 2)
> +    RELAXED_UNIQUE_CHECKS_F = (1U << 2),
> +
> +    /* 
> +      Indicates that rows in this event are complete, that is contain
> +      values for all columns of the table.
> +     */
> +    COMPLETE_ROWS_F = (1U << 3)
>    };
>  
>    typedef uint16 flag_set;
> @@ -2239,7 +2249,25 @@ protected:
>    uchar    *m_rows_cur;		/* One-after the end of the data */
>    uchar    *m_rows_end;		/* One-after the end of the allocated space */
>  
> +  const uchar *m_curr_row;     /* Start of the row being processed */
> +  const uchar *m_curr_row_end; /* One-after the end of the current row */
> +
>    flag_set m_flags;		/* Flags for row-level events */
> +  uchar    *m_key;      /* Buffer to keep key value during searches */
> +
> +  /* helper functions */
> +
> +#if !defined(MYSQL_CLIENT) && defined(HAVE_REPLICATION)
> +  int find_and_fetch_row();
> +  int write_row(THD *const, const Slave_reporting_capability *const, const bool);
> +
> +  // Unpack the current row into m_table->record[0]

Suggest for the function to have a self-check

> +  int unpack_current_row()
> +  { 
       int ret= 

> +    ::unpack_row(m_table, m_width, m_curr_row, &m_cols, 
> +                        &m_curr_row_end, &m_master_reclength);

       DBUG_ASSERT(m_curr_row_end >= m_curr_row);

       return ret;
> +  }
> +#endif
>  
>  private:
>  
> @@ -2264,7 +2292,7 @@ private:
>        The member function will return 0 if all went OK, or a non-zero
>        error code otherwise.
>    */
> -  virtual int do_before_row_operations(TABLE *table) = 0;
> +  virtual int do_before_row_operations(THD*) = 0;
>  
>    /*
>      Primitive to clean up after a sequence of row executions.
> @@ -2274,45 +2302,31 @@ private:
>        After doing a sequence of do_prepare_row() and do_exec_row(),
>        this member function should be called to clean up and release
>        any allocated buffers.
> +      
> +      The error argument, if non-zero, indicates an error which happened during
> +      row processing before this function was called. In this case, even if 
> +      function is successful, it should return the error code given in the
> argument.
>    */
> -  virtual int do_after_row_operations(TABLE *table, int error) = 0;
> -
> -  /*
> -    Primitive to prepare for handling one row in a row-level event.
> -    
> -    DESCRIPTION 
> -
> -      The member function prepares for execution of operations needed for one
> -      row in a row-level event by reading up data from the buffer containing
> -      the row. No specific interpretation of the data is normally done here,
> -      since SQL thread specific data is not available: that data is made
> -      available for the do_exec function.
> -
> -      A pointer to the start of the next row, or NULL if the preparation
> -      failed. Currently, preparation cannot fail, but don't rely on this
> -      behavior. 
> -
> -    RETURN VALUE
> -      Error code, if something went wrong, 0 otherwise.
> -   */
> -  virtual int do_prepare_row(THD*, RELAY_LOG_INFO const*, TABLE*,
> -                             uchar const *row_start,
> -                             uchar const **row_end) = 0;
> +  virtual int do_after_row_operations(THD*,int error) = 0;
>  
>    /*
>      Primitive to do the actual execution necessary for a row.
>  
>      DESCRIPTION
>        The member function will do the actual execution needed to handle a row.
> +      The row is located at m_curr_row. When the function returns, 
> +      m_curr_row_end should point at the next row (one byte after the end
> +      of the current row).    
>  
>      RETURN VALUE
>        0 if execution succeeded, 1 if execution failed.
>        
>    */
> -  virtual int do_exec_row(TABLE *table) = 0;
> +  virtual int do_exec_row(THD*, const Slave_reporting_capability *const) = 0;
>  #endif /* !defined(MYSQL_CLIENT) && defined(HAVE_REPLICATION) */
> -};
>  
> +  friend class Old_rows_log_event;
> +};
>  
>  /*****************************************************************************
>  
> @@ -2362,14 +2376,9 @@ private:
>  #endif
>  
>  #if !defined(MYSQL_CLIENT) && defined(HAVE_REPLICATION)
> -  uchar *m_memory;
> -  uchar *m_after_image;
> -
> -  virtual int do_before_row_operations(TABLE *table);
> -  virtual int do_after_row_operations(TABLE *table, int error);
> -  virtual int do_prepare_row(THD*, RELAY_LOG_INFO const*, TABLE*,
> -                             uchar const *row_start, uchar const **row_end);
> -  virtual int do_exec_row(TABLE *table);
> +  virtual int do_before_row_operations(THD*);
> +  virtual int do_after_row_operations(THD*,int);
> +  virtual int do_exec_row(THD*,const Slave_reporting_capability *const);
>  #endif
>  };
>  

The following hunk positioned - before your patch - do_ methods under protected label
which
is differently from the similar code above. 
Whould it be the right moment to syncronize them?


> @@ -2441,15 +2450,9 @@ protected:
>  #endif
>  
>  #if !defined(MYSQL_CLIENT) && defined(HAVE_REPLICATION)
> -  uchar *m_memory;
> -  uchar *m_key;
> -  uchar *m_after_image;
> -
> -  virtual int do_before_row_operations(TABLE *table);
> -  virtual int do_after_row_operations(TABLE *table, int error);
> -  virtual int do_prepare_row(THD*, RELAY_LOG_INFO const*, TABLE*,
> -                             uchar const *row_start, uchar const **row_end);
> -  virtual int do_exec_row(TABLE *table);
> +  virtual int do_before_row_operations(THD*);
> +  virtual int do_after_row_operations(THD*,int);
> +  virtual int do_exec_row(THD*,const Slave_reporting_capability *const);
>  #endif /* !defined(MYSQL_CLIENT) && defined(HAVE_REPLICATION) */
>  };
>  
> @@ -2512,15 +2515,9 @@ protected:
>  #endif
>  
>  #if !defined(MYSQL_CLIENT) && defined(HAVE_REPLICATION)
> -  uchar *m_memory;
> -  uchar *m_key;
> -  uchar *m_after_image;
> -
> -  virtual int do_before_row_operations(TABLE *table);
> -  virtual int do_after_row_operations(TABLE *table, int error);
> -  virtual int do_prepare_row(THD*, RELAY_LOG_INFO const*, TABLE*,
> -                             uchar const *row_start, uchar const **row_end);
> -  virtual int do_exec_row(TABLE *table);
> +  virtual int do_before_row_operations(THD*);
> +  virtual int do_after_row_operations(THD*,int);
> +  virtual int do_exec_row(THD*,const Slave_reporting_capability *const);
>  #endif
>  };
>  
> diff -Nrup a/sql/log_event_old.cc b/sql/log_event_old.cc
> --- a/sql/log_event_old.cc	2007-05-10 11:59:28 +02:00
> +++ b/sql/log_event_old.cc	2007-07-06 16:58:06 +02:00
> @@ -1,9 +1,961 @@
>  
>  #include "mysql_priv.h"
> +#ifndef MYSQL_CLIENT
> +#include "rpl_rli.h"
> +#include "rpl_utility.h"
> +#endif
>  #include "log_event_old.h"
>  #include "rpl_record_old.h"
>  
> +#define DBUG_PRINT_BITSET(N,FRM,BS)                \
> +  do {                                             \
> +    char buf[256];                                 \
> +    for (uint i = 0 ; i < (BS)->n_bits ; ++i)      \
> +      buf[i] = bitmap_is_set((BS), i) ? '1' : '0'; \
> +    buf[(BS)->n_bits] = '\0';                      \
> +    DBUG_PRINT((N), ((FRM), buf));                 \
> +  } while (0)
> +
> +#if !defined(MYSQL_CLIENT) && defined(HAVE_REPLICATION)
> +// Old implementation of do_apply_event()
> +int 
> +Old_rows_log_event::do_apply_event(Rows_log_event *ev, const RELAY_LOG_INFO *rli)
> +{
> +  DBUG_ENTER("Rows_log_event::do_apply_event(st_relay_log_info*)");
> +  DBUG_ASSERT(ev);
> +  
> +  THD *thd= ev->thd;
> +  int error= 0;
> +  uchar const *row_start= ev->m_rows_buf;
> +
> +  /*
> +    If m_table_id == ~0UL, then we have a dummy event that does not
> +    contain any data.  In that case, we just remove all tables in the
> +    tables_to_lock list, close the thread tables, and return with
> +    success.
> +   */
> +  if (ev->m_table_id == ~0UL)
> +  {
> +    /*
> +       This one is supposed to be set: just an extra check so that
> +       nothing strange has happened.
> +     */
> +    DBUG_ASSERT(ev->get_flags(Rows_log_event::STMT_END_F));
> +
> +    const_cast<RELAY_LOG_INFO*>(rli)->clear_tables_to_lock();
> +    close_thread_tables(thd);
> +    thd->clear_error();
> +    DBUG_RETURN(0);
> +  }
> +
> +  /*
> +    'thd' has been set by exec_relay_log_event(), just before calling
> +    do_apply_event(). We still check here to prevent future coding
> +    errors.
> +  */
> +  DBUG_ASSERT(rli->sql_thd == thd);
> +
> +  /*
> +    If there is no locks taken, this is the first binrow event seen
> +    after the table map events.  We should then lock all the tables
> +    used in the transaction and proceed with execution of the actual
> +    event.
> +  */
> +  if (!thd->lock)
> +  {
> +    bool need_reopen= 1; /* To execute the first lap of the loop below */
> +
> +    /*
> +      lock_tables() reads the contents of thd->lex, so they must be
> +      initialized. Contrary to in
> +      Table_map_log_event::do_apply_event() we don't call
> +      mysql_init_query() as that may reset the binlog format.
> +    */
> +    lex_start(thd);
> +
> +    while ((error= lock_tables(thd, rli->tables_to_lock,
> +                               rli->tables_to_lock_count, &need_reopen)))
> +    {
> +      if (!need_reopen)
> +      {
> +        if (thd->query_error || thd->is_fatal_error)
> +        {
> +          /*
> +            Error reporting borrowed from Query_log_event with many excessive
> +            simplifications (we don't honour --slave-skip-errors)
> +          */
> +          uint actual_error= thd->net.last_errno;
> +          rli->report(ERROR_LEVEL, actual_error,
> +                      "Error '%s' in %s event: when locking tables",
> +                      (actual_error ? thd->net.last_error :
> +                       "unexpected success or fatal error"),
> +                      ev->get_type_str());
> +          thd->is_fatal_error= 1;
> +        }
> +        else
> +        {
> +          rli->report(ERROR_LEVEL, error,
> +                      "Error in %s event: when locking tables",
> +                      ev->get_type_str());
> +        }
> +        const_cast<RELAY_LOG_INFO*>(rli)->clear_tables_to_lock();
> +        DBUG_RETURN(error);
> +      }
> +
> +      /*
> +        So we need to reopen the tables.
> +
> +        We need to flush the pending RBR event, since it keeps a
> +        pointer to an open table.
> +
> +        ALTERNATIVE SOLUTION (not implemented): Extract a pointer to
> +        the pending RBR event and reset the table pointer after the
> +        tables has been reopened.
> +
> +        NOTE: For this new scheme there should be no pending event:
> +        need to add code to assert that is the case.
> +       */
> +      thd->binlog_flush_pending_rows_event(false);
> +      TABLE_LIST *tables= rli->tables_to_lock;
> +      close_tables_for_reopen(thd, &tables);
> +
> +      uint tables_count= rli->tables_to_lock_count;
> +      if ((error= open_tables(thd, &tables, &tables_count, 0)))
> +      {
> +        if (thd->query_error || thd->is_fatal_error)
> +        {
> +          /*
> +            Error reporting borrowed from Query_log_event with many excessive
> +            simplifications (we don't honour --slave-skip-errors)
> +          */
> +          uint actual_error= thd->net.last_errno;
> +          rli->report(ERROR_LEVEL, actual_error,
> +                      "Error '%s' on reopening tables",
> +                      (actual_error ? thd->net.last_error :
> +                       "unexpected success or fatal error"));
> +          thd->query_error= 1;
> +        }
> +        const_cast<RELAY_LOG_INFO*>(rli)->clear_tables_to_lock();
> +        DBUG_RETURN(error);
> +      }
> +    }
> +
> +    /*
> +      When the open and locking succeeded, we check all tables to
> +      ensure that they still have the correct type.
> +
> +      We can use a down cast here since we know that every table added
> +      to the tables_to_lock is a RPL_TABLE_LIST.
> +    */
> +
> +    {
> +      RPL_TABLE_LIST *ptr= rli->tables_to_lock;
> +      for ( ; ptr ; ptr= static_cast<RPL_TABLE_LIST*>(ptr->next_global))
> +      {
> +        if (ptr->m_tabledef.compatible_with(rli, ptr->table))
> +        {
> +          mysql_unlock_tables(thd, thd->lock);
> +          thd->lock= 0;
> +          thd->query_error= 1;
> +          const_cast<RELAY_LOG_INFO*>(rli)->clear_tables_to_lock();
> +          DBUG_RETURN(Rows_log_event::ERR_BAD_TABLE_DEF);
> +        }
> +      }
> +    }
> +
> +    /*
> +      ... and then we add all the tables to the table map and remove
> +      them from tables to lock.
> +
> +      We also invalidate the query cache for all the tables, since
> +      they will now be changed.
> +
> +      TODO [/Matz]: Maybe the query cache should not be invalidated
> +      here? It might be that a table is not changed, even though it
> +      was locked for the statement.  We do know that each
> +      Rows_log_event contain at least one row, so after processing one
> +      Rows_log_event, we can invalidate the query cache for the
> +      associated table.
> +     */
> +    for (TABLE_LIST *ptr= rli->tables_to_lock ; ptr ; ptr= ptr->next_global)
> +    {
> +     
> const_cast<RELAY_LOG_INFO*>(rli)->m_table_map.set_table(ptr->table_id,
> ptr->table);
> +    }
> +#ifdef HAVE_QUERY_CACHE
> +    query_cache.invalidate_locked_for_write(rli->tables_to_lock);
> +#endif
> +    const_cast<RELAY_LOG_INFO*>(rli)->clear_tables_to_lock();
> +  }
> +
> +  DBUG_ASSERT(rli->tables_to_lock == NULL && rli->tables_to_lock_count
> == 0);
> +
> +  TABLE* table= 
> +   
> const_cast<RELAY_LOG_INFO*>(rli)->m_table_map.get_table(ev->m_table_id);
> +
> +  if (table)
> +  {
> +    /*
> +      table == NULL means that this table should not be replicated
> +      (this was set up by Table_map_log_event::do_apply_event()
> +      which tested replicate-* rules).
> +    */
> +
> +    /*
> +      It's not needed to set_time() but
> +      1) it continues the property that "Time" in SHOW PROCESSLIST shows how
> +      much slave is behind
> +      2) it will be needed when we allow replication from a table with no
> +      TIMESTAMP column to a table with one.
> +      So we call set_time(), like in SBR. Presently it changes nothing.
> +    */
> +    thd->set_time((time_t)ev->when);
> +    /*
> +      There are a few flags that are replicated with each row event.
> +      Make sure to set/clear them before executing the main body of
> +      the event.
> +    */
> +    if (ev->get_flags(Rows_log_event::NO_FOREIGN_KEY_CHECKS_F))
> +        thd->options|= OPTION_NO_FOREIGN_KEY_CHECKS;
> +    else
> +        thd->options&= ~OPTION_NO_FOREIGN_KEY_CHECKS;
> +
> +    if (ev->get_flags(Rows_log_event::RELAXED_UNIQUE_CHECKS_F))
> +        thd->options|= OPTION_RELAXED_UNIQUE_CHECKS;
> +    else
> +        thd->options&= ~OPTION_RELAXED_UNIQUE_CHECKS;
> +    /* A small test to verify that objects have consistent types */
> +    DBUG_ASSERT(sizeof(thd->options) == sizeof(OPTION_RELAXED_UNIQUE_CHECKS));
> +
> +    /*
> +      Now we are in a statement and will stay in a statement until we
> +      see a STMT_END_F.
> +
> +      We set this flag here, before actually applying any rows, in
> +      case the SQL thread is stopped and we need to detect that we're
> +      inside a statement and halting abruptly might cause problems
> +      when restarting.
> +     */
> +    const_cast<RELAY_LOG_INFO*>(rli)->set_flag(RELAY_LOG_INFO::IN_STMT);
> +
> +    error= do_before_row_operations(table);
> +    while (error == 0 && row_start < ev->m_rows_end)
> +    {
> +      uchar const *row_end= NULL;
> +      if ((error= do_prepare_row(thd, rli, table, row_start, &row_end)))
> +        break; // We should perform the after-row operation even in
> +               // the case of error
> +
> +      DBUG_ASSERT(row_end != NULL); // cannot happen
> +      DBUG_ASSERT(row_end <= ev->m_rows_end);
> +
> +      /* in_use can have been set to NULL in close_tables_for_reopen */
> +      THD* old_thd= table->in_use;
> +      if (!table->in_use)
> +        table->in_use= thd;
> +      error= do_exec_row(table);
> +      table->in_use = old_thd;
> +      switch (error)
> +      {
> +        /* Some recoverable errors */
> +      case HA_ERR_RECORD_CHANGED:
> +      case HA_ERR_KEY_NOT_FOUND:  /* Idempotency support: OK if
> +                                           tuple does not exist */
> +  error= 0;
> +      case 0:
> +  break;
> +
> +      default:
> +  rli->report(ERROR_LEVEL, thd->net.last_errno,
> +                    "Error in %s event: row application failed",
> +                    ev->get_type_str());
> +  thd->query_error= 1;
> +  break;
> +      }
> +
> +      row_start= row_end;
> +    }
> +    DBUG_EXECUTE_IF("STOP_SLAVE_after_first_Rows_event",
> +                    const_cast<RELAY_LOG_INFO*>(rli)->abort_slave= 1;);
> +    error= do_after_row_operations(table, error);
> +    if (!ev->cache_stmt)
> +    {
> +      DBUG_PRINT("info", ("Marked that we need to keep log"));
> +      thd->options|= OPTION_KEEP_LOG;
> +    }
> +  }
> +
> +  if (error)
> +  {                     /* error has occured during the transaction */
> +    rli->report(ERROR_LEVEL, thd->net.last_errno,
> +                "Error in %s event: error during transaction execution "
> +                "on table %s.%s",
> +                ev->get_type_str(), table->s->db.str,
> +                table->s->table_name.str);
> +
> +    /*
> +      If one day we honour --skip-slave-errors in row-based replication, and
> +      the error should be skipped, then we would clear mappings, rollback,
> +      close tables, but the slave SQL thread would not stop and then may
> +      assume the mapping is still available, the tables are still open...
> +      So then we should clear mappings/rollback/close here only if this is a
> +      STMT_END_F.
> +      For now we code, knowing that error is not skippable and so slave SQL
> +      thread is certainly going to stop.
> +      rollback at the caller along with sbr.
> +    */
> +    thd->reset_current_stmt_binlog_row_based();
> +    const_cast<RELAY_LOG_INFO*>(rli)->cleanup_context(thd, error);
> +    thd->query_error= 1;
> +    DBUG_RETURN(error);
> +  }
> +
> +  /*
> +    This code would ideally be placed in do_update_pos() instead, but
> +    since we have no access to table there, we do the setting of
> +    last_event_start_time here instead.
> +  */
> +  if (table && (table->s->primary_key == MAX_KEY) &&
> +      !ev->cache_stmt && 
> +      ev->get_flags(Rows_log_event::STMT_END_F) == Rows_log_event::RLE_NO_FLAGS)
> +  {
> +    /*
> +      ------------ Temporary fix until WL#2975 is implemented ---------
> +
> +      This event is not the last one (no STMT_END_F). If we stop now
> +      (in case of terminate_slave_thread()), how will we restart? We
> +      have to restart from Table_map_log_event, but as this table is
> +      not transactional, the rows already inserted will still be
> +      present, and idempotency is not guaranteed (no PK) so we risk
> +      that repeating leads to double insert. So we desperately try to
> +      continue, hope we'll eventually leave this buggy situation (by
> +      executing the final Rows_log_event). If we are in a hopeless
> +      wait (reached end of last relay log and nothing gets appended
> +      there), we timeout after one minute, and notify DBA about the
> +      problem.  When WL#2975 is implemented, just remove the member
> +      st_relay_log_info::last_event_start_time and all its occurences.
> +    */
> +    const_cast<RELAY_LOG_INFO*>(rli)->last_event_start_time= time(0);
> +  }
> +
> +  DBUG_RETURN(0);
> +}
> +#endif
> +
>  #if !defined(MYSQL_CLIENT) && defined(HAVE_REPLICATION)
> +
> +/*
> +  Check if there are more UNIQUE keys after the given key.
> +*/
> +static int
> +last_uniq_key(TABLE *table, uint keyno)
> +{
> +  while (++keyno < table->s->keys)
> +    if (table->key_info[keyno].flags & HA_NOSAME)
> +      return 0;
> +  return 1;
> +}
> +
> +/*
> +  Compares table->record[0] and table->record[1]
> +
> +  Returns TRUE if different.
> +*/
> +static bool record_compare(TABLE *table)
> +{
> +  /*
> +    Need to set the X bit and the filler bits in both records since
> +    there are engines that do not set it correctly.
> +
> +    In addition, since MyISAM checks that one hasn't tampered with the
> +    record, it is necessary to restore the old bytes into the record
> +    after doing the comparison.
> +
> +    TODO[record format ndb]: Remove it once NDB returns correct
> +    records. Check that the other engines also return correct records.
> +   */
> +
> +  bool result= FALSE;
> +  uchar saved_x[2], saved_filler[2];
> +
> +  if (table->s->null_bytes > 0)
> +  {
> +    for (int i = 0 ; i < 2 ; ++i)
> +    {
> +      saved_x[i]= table->record[i][0];
> +      saved_filler[i]= table->record[i][table->s->null_bytes - 1];
> +      table->record[i][0]|= 1U;
> +      table->record[i][table->s->null_bytes - 1]|=
> +        256U - (1U << table->s->last_null_bit_pos);
> +    }
> +  }
> +
> +  if (table->s->blob_fields + table->s->varchar_fields == 0)
> +  {
> +    result= cmp_record(table,record[1]);
> +    goto record_compare_exit;
> +  }
> +
> +  /* Compare null bits */
> +  if (memcmp(table->null_flags,
> +       table->null_flags+table->s->rec_buff_length,
> +       table->s->null_bytes))
> +  {
> +    result= TRUE;       // Diff in NULL value
> +    goto record_compare_exit;
> +  }
> +
> +  /* Compare updated fields */
> +  for (Field **ptr=table->field ; *ptr ; ptr++)
> +  {
> +    if ((*ptr)->cmp_binary_offset(table->s->rec_buff_length))
> +    {
> +      result= TRUE;
> +      goto record_compare_exit;
> +    }
> +  }
> +
> +record_compare_exit:
> +  /*
> +    Restore the saved bytes.
> +
> +    TODO[record format ndb]: Remove this code once NDB returns the
> +    correct record format.
> +  */
> +  if (table->s->null_bytes > 0)
> +  {
> +    for (int i = 0 ; i < 2 ; ++i)
> +    {
> +      table->record[i][0]= saved_x[i];
> +      table->record[i][table->s->null_bytes - 1]= saved_filler[i];
> +    }
> +  }
> +
> +  return result;
> +}
> +
> +/*
> +  Copy "extra" columns from record[1] to record[0].
> +
> +  Copy the extra fields that are not present on the master but are
> +  present on the slave from record[1] to record[0].  This is used
> +  after fetching a record that are to be updated, either inside
> +  replace_record() or as part of executing an update_row().
> + */
> +static int
> +copy_extra_record_fields(TABLE *table,
> +                         size_t master_reclength,
> +                         my_ptrdiff_t master_fields)
> +{
> +  DBUG_PRINT("info", ("Copying to 0x%lx "
> +                      "from field %lu at offset %lu "
> +                      "to field %d at offset %lu",
> +                      (long) table->record[0],
> +                      (ulong) master_fields, (ulong) master_reclength,
> +                      table->s->fields, table->s->reclength));
> +  /*
> +    Copying the extra fields of the slave that does not exist on
> +    master into record[0] (which are basically the default values).
> +  */
> +  DBUG_ASSERT(master_reclength <= table->s->reclength);
> +  if (master_reclength < table->s->reclength)
> +    bmove_align(table->record[0] + master_reclength,
> +                table->record[1] + master_reclength,
> +                table->s->reclength - master_reclength);
> +    
> +  /*
> +    Bit columns are special.  We iterate over all the remaining
> +    columns and copy the "extra" bits to the new record.  This is
> +    not a very good solution: it should be refactored on
> +    opportunity.
> +
> +    REFACTORING SUGGESTION (Matz).  Introduce a member function
> +    similar to move_field_offset() called copy_field_offset() to
> +    copy field values and implement it for all Field subclasses. Use
> +    this function to copy data from the found record to the record
> +    that are going to be inserted.
> +
> +    The copy_field_offset() function need to be a virtual function,
> +    which in this case will prevent copying an entire range of
> +    fields efficiently.
> +  */
> +  {
> +    Field **field_ptr= table->field + master_fields;
> +    for ( ; *field_ptr ; ++field_ptr)
> +    {
> +      /*
> +        Set the null bit according to the values in record[1]
> +       */
> +      if ((*field_ptr)->maybe_null() &&
> +         
> (*field_ptr)->is_null_in_record(reinterpret_cast<uchar*>(table->record[1])))
> +        (*field_ptr)->set_null();
> +      else
> +        (*field_ptr)->set_notnull();
> +
> +      /*
> +        Do the extra work for special columns.
> +       */
> +      switch ((*field_ptr)->real_type())
> +      {
> +      default:
> +        /* Nothing to do */
> +        break;
> +
> +      case MYSQL_TYPE_BIT:
> +        Field_bit *f= static_cast<Field_bit*>(*field_ptr);
> +        if (f->bit_len > 0)
> +        {
> +          my_ptrdiff_t const offset= table->record[1] - table->record[0];
> +          uchar const bits=
> +            get_rec_bits(f->bit_ptr + offset, f->bit_ofs, f->bit_len);
> +          set_rec_bits(bits, f->bit_ptr, f->bit_ofs, f->bit_len);
> +        }
> +        break;
> +      }
> +    }
> +  }
> +  return 0;                                     // All OK
> +}
> +
> +/*
> +  Replace the provided record in the database.
> +
> +  SYNOPSIS
> +      replace_record()
> +      thd    Thread context for writing the record.
> +      table  Table to which record should be written.
> +      master_reclength
> +             Offset to first column that is not present on the master,
> +             alternatively the length of the record on the master
> +             side.
> +
> +  RETURN VALUE
> +      Error code on failure, 0 on success.
> +
> +  DESCRIPTION
> +      Similar to how it is done in mysql_insert(), we first try to do
> +      a ha_write_row() and of that fails due to duplicated keys (or
> +      indices), we do an ha_update_row() or a ha_delete_row() instead.
> + */
> +static int
> +replace_record(THD *thd, TABLE *table,
> +               ulong const master_reclength,
> +               uint const master_fields)
> +{
> +  DBUG_ENTER("replace_record");
> +  DBUG_ASSERT(table != NULL && thd != NULL);
> +
> +  int error;
> +  int keynum;
> +  auto_afree_ptr<char> key(NULL);
> +
> +#ifndef DBUG_OFF
> +  DBUG_DUMP("record[0]", table->record[0], table->s->reclength);
> +  DBUG_PRINT_BITSET("debug", "write_set = %s", table->write_set);
> +  DBUG_PRINT_BITSET("debug", "read_set = %s", table->read_set);
> +#endif
> +
> +  while ((error= table->file->ha_write_row(table->record[0])))
> +  {
> +    if (error == HA_ERR_LOCK_DEADLOCK || error == HA_ERR_LOCK_WAIT_TIMEOUT)
> +    {
> +      table->file->print_error(error, MYF(0)); /* to check at
> exec_relay_log_event */
> +      DBUG_RETURN(error);
> +    }
> +    if ((keynum= table->file->get_dup_key(error)) < 0)
> +    {
> +      /* We failed to retrieve the duplicate key */
> +      DBUG_RETURN(error);
> +    }
> +
> +    /*
> +       We need to retrieve the old row into record[1] to be able to
> +       either update or delete the offending record.  We either:
> +
> +       - use rnd_pos() with a row-id (available as dupp_row) to the
> +         offending row, if that is possible (MyISAM and Blackhole), or else
> +
> +       - use index_read_idx() with the key that is duplicated, to
> +         retrieve the offending row.
> +     */
> +    if (table->file->ha_table_flags() & HA_DUPLICATE_POS)
> +    {
> +      error= table->file->rnd_pos(table->record[1],
> table->file->dup_ref);
> +      if (error)
> +      {
> +        table->file->print_error(error, MYF(0));
> +        DBUG_RETURN(error);
> +      }
> +    }
> +    else
> +    {
> +      if (table->file->extra(HA_EXTRA_FLUSH_CACHE))
> +      {
> +        DBUG_RETURN(my_errno);
> +      }
> +
> +      if (key.get() == NULL)
> +      {
> +       
> key.assign(static_cast<char*>(my_alloca(table->s->max_unique_length)));
> +        if (key.get() == NULL)
> +          DBUG_RETURN(ENOMEM);
> +      }
> +
> +      key_copy((uchar*)key.get(), table->record[0], table->key_info + keynum,
> 0);
> +      error= table->file->index_read_idx(table->record[1], keynum,
> +                                         (const uchar*)key.get(),
> +                                         HA_WHOLE_KEY,
> +                                         HA_READ_KEY_EXACT);
> +      if (error)
> +      {
> +        table->file->print_error(error, MYF(0));
> +        DBUG_RETURN(error);
> +      }
> +    }
> +
> +    /*
> +       Now, table->record[1] should contain the offending row.  That
> +       will enable us to update it or, alternatively, delete it (so
> +       that we can insert the new row afterwards).
> +
> +       First we copy the columns into table->record[0] that are not
> +       present on the master from table->record[1], if there are any.
> +    */
> +    copy_extra_record_fields(table, master_reclength, master_fields);
> +
> +    /*
> +       REPLACE is defined as either INSERT or DELETE + INSERT.  If
> +       possible, we can replace it with an UPDATE, but that will not
> +       work on InnoDB if FOREIGN KEY checks are necessary.
> +
> +       I (Matz) am not sure of the reason for the last_uniq_key()
> +       check as, but I'm guessing that it's something along the
> +       following lines.
> +
> +       Suppose that we got the duplicate key to be a key that is not
> +       the last unique key for the table and we perform an update:
> +       then there might be another key for which the unique check will
> +       fail, so we're better off just deleting the row and inserting
> +       the correct row.
> +     */
> +    if (last_uniq_key(table, keynum) &&
> +        !table->file->referenced_by_foreign_key())
> +    {
> +      error=table->file->ha_update_row(table->record[1],
> +                                       table->record[0]);
> +      if (error && error != HA_ERR_RECORD_IS_THE_SAME)
> +        table->file->print_error(error, MYF(0));
> +      else
> +        error= 0;
> +      DBUG_RETURN(error);
> +    }
> +    else
> +    {
> +      if ((error= table->file->ha_delete_row(table->record[1])))
> +      {
> +        table->file->print_error(error, MYF(0));
> +        DBUG_RETURN(error);
> +      }
> +      /* Will retry ha_write_row() with the offending row removed. */
> +    }
> +  }
> +
> +  DBUG_RETURN(error);
> +}
> +
> +/**
> +  Find the row given by 'key', if the table has keys, or else use a table scan
> +  to find (and fetch) the row.
> +
> +  If the engine allows random access of the records, a combination of
> +  position() and rnd_pos() will be used.
> +
> +  @param table Pointer to table to search
> +  @param key   Pointer to key to use for search, if table has key
> +
> +  @pre <code>table->record[0]</code> shall contain the row to locate
> +  and <code>key</code> shall contain a key to use for searching, if
> +  the engine has a key.
> +
> +  @post If the return value is zero, <code>table->record[1]</code>
> +  will contain the fetched row and the internal "cursor" will refer to
> +  the row. If the return value is non-zero,
> +  <code>table->record[1]</code> is undefined.  In either case,
> +  <code>table->record[0]</code> is undefined.
> +
> +  @return Zero if the row was successfully fetched into
> +  <code>table->record[1]</code>, error code otherwise.
> + */
> +
> +static int find_and_fetch_row(TABLE *table, uchar *key)
> +{
> +  DBUG_ENTER("find_and_fetch_row(TABLE *table, uchar *key, uchar *record)");
> +  DBUG_PRINT("enter", ("table: 0x%lx, key: 0x%lx  record: 0x%lx",
> +           (long) table, (long) key, (long) table->record[1]));
> +
> +  DBUG_ASSERT(table->in_use != NULL);
> +
> +  DBUG_DUMP("record[0]", table->record[0], table->s->reclength);
> +
> +  if ((table->file->ha_table_flags() &
> HA_PRIMARY_KEY_REQUIRED_FOR_POSITION) &&
> +      table->s->primary_key < MAX_KEY)
> +  {
> +    /*
> +      Use a more efficient method to fetch the record given by
> +      table->record[0] if the engine allows it.  We first compute a
> +      row reference using the position() member function (it will be
> +      stored in table->file->ref) and the use rnd_pos() to position
> +      the "cursor" (i.e., record[0] in this case) at the correct row.
> +
> +      TODO: Add a check that the correct record has been fetched by
> +      comparing with the original record. Take into account that the
> +      record on the master and slave can be of different
> +      length. Something along these lines should work:
> +
> +      ADD>>>  store_record(table,record[1]);
> +              int error= table->file->rnd_pos(table->record[0],
> table->file->ref);
> +      ADD>>>  DBUG_ASSERT(memcmp(table->record[1], table->record[0],
> +                                 table->s->reclength) == 0);
> +
> +    */
> +    table->file->position(table->record[0]);
> +    int error= table->file->rnd_pos(table->record[0],
> table->file->ref);
> +    /*
> +      rnd_pos() returns the record in table->record[0], so we have to
> +      move it to table->record[1].
> +     */
> +    bmove_align(table->record[1], table->record[0],
> table->s->reclength);
> +    DBUG_RETURN(error);
> +  }
> +
> +  /* We need to retrieve all fields */
> +  /* TODO: Move this out from this function to main loop */
> +  table->use_all_columns();
> +
> +  if (table->s->keys > 0)
> +  {
> +    int error;
> +    /* We have a key: search the table using the index */
> +    if (!table->file->inited && (error=
> table->file->ha_index_init(0, FALSE)))
> +      DBUG_RETURN(error);
> +
> +  /*
> +    Don't print debug messages when running valgrind since they can
> +    trigger false warnings.
> +   */
> +#ifndef HAVE_purify
> +    DBUG_DUMP("table->record[0]", table->record[0],
> table->s->reclength);
> +    DBUG_DUMP("table->record[1]", table->record[1],
> table->s->reclength);
> +#endif
> +
> +    /*
> +      We need to set the null bytes to ensure that the filler bit are
> +      all set when returning.  There are storage engines that just set
> +      the necessary bits on the bytes and don't set the filler bits
> +      correctly.
> +    */
> +    my_ptrdiff_t const pos=
> +      table->s->null_bytes > 0 ? table->s->null_bytes - 1 : 0;
> +    table->record[1][pos]= 0xFF;
> +    if ((error= table->file->index_read(table->record[1], key,
> HA_WHOLE_KEY,
> +                                        HA_READ_KEY_EXACT)))
> +    {
> +      table->file->print_error(error, MYF(0));
> +      table->file->ha_index_end();
> +      DBUG_RETURN(error);
> +    }
> +
> +  /*
> +    Don't print debug messages when running valgrind since they can
> +    trigger false warnings.
> +   */
> +#ifndef HAVE_purify
> +    DBUG_DUMP("table->record[0]", table->record[0],
> table->s->reclength);
> +    DBUG_DUMP("table->record[1]", table->record[1],
> table->s->reclength);
> +#endif
> +    /*
> +      Below is a minor "optimization".  If the key (i.e., key number
> +      0) has the HA_NOSAME flag set, we know that we have found the
> +      correct record (since there can be no duplicates); otherwise, we
> +      have to compare the record with the one found to see if it is
> +      the correct one.
> +
> +      CAVEAT! This behaviour is essential for the replication of,
> +      e.g., the mysql.proc table since the correct record *shall* be
> +      found using the primary key *only*.  There shall be no
> +      comparison of non-PK columns to decide if the correct record is
> +      found.  I can see no scenario where it would be incorrect to
> +      chose the row to change only using a PK or an UNNI.
> +    */
> +    if (table->key_info->flags & HA_NOSAME)
> +    {
> +      table->file->ha_index_end();
> +      DBUG_RETURN(0);
> +    }
> +
> +    while (record_compare(table))
> +    {
> +      int error;
> +
> +      /*
> +        We need to set the null bytes to ensure that the filler bit
> +        are all set when returning.  There are storage engines that
> +        just set the necessary bits on the bytes and don't set the
> +        filler bits correctly.
> +
> +        TODO[record format ndb]: Remove this code once NDB returns the
> +        correct record format.
> +      */
> +      if (table->s->null_bytes > 0)
> +      {
> +        table->record[1][table->s->null_bytes - 1]|=
> +          256U - (1U << table->s->last_null_bit_pos);
> +      }
> +
> +      if ((error= table->file->index_next(table->record[1])))
> +      {
> +  table->file->print_error(error, MYF(0));
> +        table->file->ha_index_end();
> +  DBUG_RETURN(error);
> +      }
> +    }
> +
> +    /*
> +      Have to restart the scan to be able to fetch the next row.
> +    */
> +    table->file->ha_index_end();
> +  }
> +  else
> +  {
> +    int restart_count= 0; // Number of times scanning has restarted from top
> +    int error;
> +
> +    /* We don't have a key: search the table using rnd_next() */
> +    if ((error= table->file->ha_rnd_init(1)))
> +      return error;
> +
> +    /* Continue until we find the right record or have made a full loop */
> +    do
> +    {
> +      error= table->file->rnd_next(table->record[1]);
> +
> +      DBUG_DUMP("record[0]", table->record[0], table->s->reclength);
> +      DBUG_DUMP("record[1]", table->record[1], table->s->reclength);
> +
> +      switch (error) {
> +      case 0:
> +      case HA_ERR_RECORD_DELETED:
> +  break;
> +
> +      case HA_ERR_END_OF_FILE:
> +  if (++restart_count < 2)
> +    table->file->ha_rnd_init(1);
> +  break;
> +
> +      default:
> +  table->file->print_error(error, MYF(0));
> +        DBUG_PRINT("info", ("Record not found"));
> +        table->file->ha_rnd_end();
> +  DBUG_RETURN(error);
> +      }
> +    }
> +    while (restart_count < 2 && record_compare(table));
> +
> +    /*
> +      Have to restart the scan to be able to fetch the next row.
> +    */
> +    DBUG_PRINT("info", ("Record %sfound", restart_count == 2 ? "not " : ""));
> +    table->file->ha_rnd_end();
> +
> +    DBUG_ASSERT(error == HA_ERR_END_OF_FILE || error == 0);
> +    DBUG_RETURN(error);
> +  }
> +
> +  DBUG_RETURN(0);
> +}
> +
> +/**********************************************************
> +  Row handling primitives for Write_rows_log_event_old
> + **********************************************************/
> +
> +int Write_rows_log_event_old::do_before_row_operations(TABLE *table)
> +{
> +  int error= 0;
> +
> +  /*
> +    We are using REPLACE semantics and not INSERT IGNORE semantics
> +    when writing rows, that is: new rows replace old rows.  We need to
> +    inform the storage engine that it should use this behaviour.
> +  */
> +
> +  /* Tell the storage engine that we are using REPLACE semantics. */
> +  thd->lex->duplicates= DUP_REPLACE;
> +
> +  /*
> +    Pretend we're executing a REPLACE command: this is needed for
> +    InnoDB and NDB Cluster since they are not (properly) checking the
> +    lex->duplicates flag.
> +  */
> +  thd->lex->sql_command= SQLCOM_REPLACE;
> +  /* 
> +     Do not raise the error flag in case of hitting to an unique attribute
> +  */
> +  table->file->extra(HA_EXTRA_IGNORE_DUP_KEY);
> +  /* 
> +     NDB specific: update from ndb master wrapped as Write_rows
> +  */
> +  /*
> +    so that the event should be applied to replace slave's row
> +  */
> +  table->file->extra(HA_EXTRA_WRITE_CAN_REPLACE);
> +  /* 
> +     NDB specific: if update from ndb master wrapped as Write_rows
> +     does not find the row it's assumed idempotent binlog applying
> +     is taking place; don't raise the error.
> +  */
> +  table->file->extra(HA_EXTRA_IGNORE_NO_KEY);
> +  /*
> +    TODO: the cluster team (Tomas?) says that it's better if the engine knows
> +    how many rows are going to be inserted, then it can allocate needed memory
> +    from the start.
> +  */
> +  table->file->ha_start_bulk_insert(0);
> +  /*
> +    We need TIMESTAMP_NO_AUTO_SET otherwise ha_write_row() will not use fill
> +    any TIMESTAMP column with data from the row but instead will use
> +    the event's current time.
> +    As we replicate from TIMESTAMP to TIMESTAMP and slave has no extra
> +    columns, we know that all TIMESTAMP columns on slave will receive explicit
> +    data from the row, so TIMESTAMP_NO_AUTO_SET is ok.
> +    When we allow a table without TIMESTAMP to be replicated to a table having
> +    more columns including a TIMESTAMP column, or when we allow a TIMESTAMP
> +    column to be replicated into a BIGINT column and the slave's table has a
> +    TIMESTAMP column, then the slave's TIMESTAMP column will take its value
> +    from set_time() which we called earlier (consistent with SBR). And then in
> +    some cases we won't want TIMESTAMP_NO_AUTO_SET (will require some code to
> +    analyze if explicit data is provided for slave's TIMESTAMP columns).
> +  */
> +  table->timestamp_field_type= TIMESTAMP_NO_AUTO_SET;
> +  return error;
> +}
> +
> +int Write_rows_log_event_old::do_after_row_operations(TABLE *table, int error)
> +{
> +  int local_error= 0;
> +  table->file->extra(HA_EXTRA_NO_IGNORE_DUP_KEY);
> +  table->file->extra(HA_EXTRA_WRITE_CANNOT_REPLACE);
> +  /*
> +    reseting the extra with 
> +    table->file->extra(HA_EXTRA_NO_IGNORE_NO_KEY); 
> +    fires bug#27077
> +    todo: explain or fix
> +  */
> +  if ((local_error= table->file->ha_end_bulk_insert()))
> +  {
> +    table->file->print_error(local_error, MYF(0));
> +  }
> +  return error? error : local_error;
> +}
> +
>  int
>  Write_rows_log_event_old::do_prepare_row(THD *thd,
>                                           RELAY_LOG_INFO const *rli,
> @@ -23,6 +975,65 @@ Write_rows_log_event_old::do_prepare_row
>    return error;
>  }
>  
> +int Write_rows_log_event_old::do_exec_row(TABLE *table)
> +{
> +  DBUG_ASSERT(table != NULL);
> +  int error= replace_record(thd, table, m_master_reclength, m_width);
> +  return error;
> +}
> +
> +/**********************************************************
> +  Row handling primitives for Delete_rows_log_event_old
> + **********************************************************/
> +
> +int Delete_rows_log_event_old::do_before_row_operations(TABLE *table)
> +{
> +  DBUG_ASSERT(m_memory == NULL);
> +
> +  if ((table->file->ha_table_flags() &
> HA_PRIMARY_KEY_REQUIRED_FOR_POSITION) &&
> +      table->s->primary_key < MAX_KEY)
> +  {
> +    /*
> +      We don't need to allocate any memory for m_after_image and
> +      m_key since they are not used.
> +    */
> +    return 0;
> +  }
> +
> +  int error= 0;
> +
> +  if (table->s->keys > 0)
> +  {
> +    m_memory= (uchar*) my_multi_malloc(MYF(MY_WME),
> +                                       &m_after_image,
> +                                       (uint) table->s->reclength,
> +                                       &m_key,
> +                                       (uint) table->key_info->key_length,
> +                                       NullS);
> +  }
> +  else
> +  {
> +    m_after_image= (uchar*) my_malloc(table->s->reclength, MYF(MY_WME));
> +    m_memory= (uchar*)m_after_image;
> +    m_key= NULL;
> +  }
> +  if (!m_memory)
> +    return HA_ERR_OUT_OF_MEM;
> +
> +  return error;
> +}
> +
> +int Delete_rows_log_event_old::do_after_row_operations(TABLE *table, int error)
> +{
> +  /*error= ToDo:find out what this should really be, this triggers close_scan in
> nbd, returning error?*/
> +  table->file->ha_index_or_rnd_end();
> +  my_free(m_memory, MYF(MY_ALLOW_ZERO_PTR)); // Free for multi_malloc
> +  m_memory= NULL;
> +  m_after_image= NULL;
> +  m_key= NULL;
> +
> +  return error;
> +}
>  
>  int
>  Delete_rows_log_event_old::do_prepare_row(THD *thd,
> @@ -57,6 +1068,67 @@ Delete_rows_log_event_old::do_prepare_ro
>    return error;
>  }
>  
> +int Delete_rows_log_event_old::do_exec_row(TABLE *table)
> +{
> +  int error;
> +  DBUG_ASSERT(table != NULL);
> +
> +  if (!(error= ::find_and_fetch_row(table, m_key)))
> +  { 
> +    /*
> +      Now we should have the right row to delete.  We are using
> +      record[0] since it is guaranteed to point to a record with the
> +      correct value.
> +    */
> +    error= table->file->ha_delete_row(table->record[0]);
> +  }
> +  return error;
> +}
> +
> +/**********************************************************
> +  Row handling primitives for Update_rows_log_event_old
> + **********************************************************/
> +
> +int Update_rows_log_event_old::do_before_row_operations(TABLE *table)
> +{
> +  DBUG_ASSERT(m_memory == NULL);
> +
> +  int error= 0;
> +
> +  if (table->s->keys > 0)
> +  {
> +    m_memory= (uchar*) my_multi_malloc(MYF(MY_WME),
> +                                       &m_after_image,
> +                                       (uint) table->s->reclength,
> +                                       &m_key,
> +                                       (uint) table->key_info->key_length,
> +                                       NullS);
> +  }
> +  else
> +  {
> +    m_after_image= (uchar*) my_malloc(table->s->reclength, MYF(MY_WME));
> +    m_memory= m_after_image;
> +    m_key= NULL;
> +  }
> +  if (!m_memory)
> +    return HA_ERR_OUT_OF_MEM;
> +
> +  table->timestamp_field_type= TIMESTAMP_NO_AUTO_SET;
> +
> +  return error;
> +}
> +
> +int Update_rows_log_event_old::do_after_row_operations(TABLE *table, int error)
> +{
> +  /*error= ToDo:find out what this should really be, this triggers close_scan in
> nbd, returning error?*/
> +  table->file->ha_index_or_rnd_end();
> +  my_free(m_memory, MYF(MY_ALLOW_ZERO_PTR));
> +  m_memory= NULL;
> +  m_after_image= NULL;
> +  m_key= NULL;
> +
> +  return error;
> +}
>  
>  int Update_rows_log_event_old::do_prepare_row(THD *thd,
>                                                RELAY_LOG_INFO const *rli,
> @@ -97,6 +1169,43 @@ int Update_rows_log_event_old::do_prepar
>  
>      key_copy(m_key, table->record[0], key_info, 0);
>    }
> +
> +  return error;
> +}
> +
> +int Update_rows_log_event_old::do_exec_row(TABLE *table)
> +{
> +  DBUG_ASSERT(table != NULL);
> +
> +  int error= ::find_and_fetch_row(table, m_key);
> +  if (error)
> +    return error;
> +
> +  /*
> +    We have to ensure that the new record (i.e., the after image) is
> +    in record[0] and the old record (i.e., the before image) is in
> +    record[1].  This since some storage engines require this (for
> +    example, the partition engine).
> +
> +    Since find_and_fetch_row() puts the fetched record (i.e., the old
> +    record) in record[1], we can keep it there. We put the new record
> +    (i.e., the after image) into record[0], and copy the fields that
> +    are on the slave (i.e., in record[1]) into record[0], effectively
> +    overwriting the default values that where put there by the
> +    unpack_row() function.
> +  */
> +  bmove_align(table->record[0], m_after_image, table->s->reclength);
> +  copy_extra_record_fields(table, m_master_reclength, m_width);
> +
> +  /*
> +    Now we have the right row to update.  The old row (the one we're
> +    looking for) is in record[1] and the new row has is in record[0].
> +    We also have copied the original values already in the slave's
> +    database into the after image delivered from the master.
> +  */
> +  error= table->file->ha_update_row(table->record[1],
> table->record[0]);
> +  if (error == HA_ERR_RECORD_IS_THE_SAME)
> +    error= 0;
>  
>    return error;
>  }
> diff -Nrup a/sql/log_event_old.h b/sql/log_event_old.h
> --- a/sql/log_event_old.h	2007-05-10 11:59:28 +02:00
> +++ b/sql/log_event_old.h	2007-07-06 16:58:06 +02:00
> @@ -20,9 +20,90 @@
>    Need to include this file at the proper position of log_event.h
>   */
>  
> +  
> +class Old_rows_log_event
> +{
> + public:
> + 
> +  virtual ~Old_rows_log_event() {}
> +
> + protected:
> +  
> +#if !defined(MYSQL_CLIENT) && defined(HAVE_REPLICATION)
> +
> +  virtual int do_apply_event(Rows_log_event*,const RELAY_LOG_INFO*);
> +
> +  /*
> +    Primitive to prepare for a sequence of row executions.
> +
> +    DESCRIPTION
> +
> +      Before doing a sequence of do_prepare_row() and do_exec_row()
> +      calls, this member function should be called to prepare for the
> +      entire sequence. Typically, this member function will allocate
> +      space for any buffers that are needed for the two member
> +      functions mentioned above.
> +
> +    RETURN VALUE
> +
> +      The member function will return 0 if all went OK, or a non-zero
> +      error code otherwise.
> +  */
> +  virtual int do_before_row_operations(TABLE *table) = 0;
> +
> +  /*
> +    Primitive to clean up after a sequence of row executions.
> +
> +    DESCRIPTION
> +    
> +      After doing a sequence of do_prepare_row() and do_exec_row(),
> +      this member function should be called to clean up and release
> +      any allocated buffers.
> +  */
> +  virtual int do_after_row_operations(TABLE *table, int error) = 0;
> +
> +  /*
> +    Primitive to prepare for handling one row in a row-level event.
> +    
> +    DESCRIPTION 
> +
> +      The member function prepares for execution of operations needed for one
> +      row in a row-level event by reading up data from the buffer containing
> +      the row. No specific interpretation of the data is normally done here,
> +      since SQL thread specific data is not available: that data is made
> +      available for the do_exec function.
> +
> +      A pointer to the start of the next row, or NULL if the preparation
> +      failed. Currently, preparation cannot fail, but don't rely on this
> +      behavior. 
> +
> +    RETURN VALUE
> +      Error code, if something went wrong, 0 otherwise.
> +   */
> +  virtual int do_prepare_row(THD*, RELAY_LOG_INFO const*, TABLE*,
> +                             uchar const *row_start,
> +                             uchar const **row_end) = 0;
> +
> +  /*
> +    Primitive to do the actual execution necessary for a row.
>  
> -class Write_rows_log_event_old : public Write_rows_log_event
> +    DESCRIPTION
> +      The member function will do the actual execution needed to handle a row.
> +
> +    RETURN VALUE
> +      0 if execution succeeded, 1 if execution failed.
> +      
> +  */
> +  virtual int do_exec_row(TABLE *table) = 0;
> +
> +#endif /* !defined(MYSQL_CLIENT) && defined(HAVE_REPLICATION) */
> +};
> +
> +
> +class Write_rows_log_event_old 
> + : public Write_rows_log_event, public Old_rows_log_event
>  {
> +
>  public:
>    enum
>    {
> @@ -49,14 +130,26 @@ private:
>    virtual Log_event_type get_type_code() { return (Log_event_type)TYPE_CODE; }
>  
>  #if !defined(MYSQL_CLIENT) && defined(HAVE_REPLICATION)
> +  // use old definition of do_apply_event()
> +  virtual int do_apply_event(const RELAY_LOG_INFO *rli)
> +  { return Old_rows_log_event::do_apply_event(this,rli); }
> +
> +  // primitives for old version of do_apply_event()
> +  virtual int do_before_row_operations(TABLE *table);
> +  virtual int do_after_row_operations(TABLE *table, int error);
>    virtual int do_prepare_row(THD*, RELAY_LOG_INFO const*, TABLE*,
>                               uchar const *row_start, uchar const **row_end);
> +  virtual int do_exec_row(TABLE *table);
> +
>  #endif
>  };
>  
>  
> -class Update_rows_log_event_old : public Update_rows_log_event
> +class Update_rows_log_event_old 
> +  : public Update_rows_log_event, public Old_rows_log_event
>  {
> +  uchar *m_after_image, *m_memory;
> +  
>  public:
>    enum 
>    {
> @@ -67,14 +160,16 @@ public:
>  #if !defined(MYSQL_CLIENT)
>    Update_rows_log_event_old(THD *thd, TABLE *table, ulong table_id,
>                             MY_BITMAP const *cols, bool is_transactional)
> -    : Update_rows_log_event(thd, table, table_id, cols, is_transactional)
> +    : Update_rows_log_event(thd, table, table_id, cols, is_transactional),
> +      m_after_image(NULL), m_memory(NULL)
>    {
>    }
>  #endif
>  #if defined(HAVE_REPLICATION)
>    Update_rows_log_event_old(const char *buf, uint event_len, 
>                              const Format_description_log_event *descr)
> -    : Update_rows_log_event(buf, event_len, descr)
> +    : Update_rows_log_event(buf, event_len, descr),
> +      m_after_image(NULL), m_memory(NULL)
>    {
>    }
>  #endif
> @@ -83,14 +178,25 @@ private:
>    virtual Log_event_type get_type_code() { return (Log_event_type)TYPE_CODE; }
>  
>  #if !defined(MYSQL_CLIENT) && defined(HAVE_REPLICATION)
> +  // use old definition of do_apply_event()
> +  virtual int do_apply_event(const RELAY_LOG_INFO *rli)
> +  { return Old_rows_log_event::do_apply_event(this,rli); }
> +
> +  // primitives for old version of do_apply_event()
> +  virtual int do_before_row_operations(TABLE *table);
> +  virtual int do_after_row_operations(TABLE *table, int error);
>    virtual int do_prepare_row(THD*, RELAY_LOG_INFO const*, TABLE*,
>                               uchar const *row_start, uchar const **row_end);
> +  virtual int do_exec_row(TABLE *table);
>  #endif /* !defined(MYSQL_CLIENT) && defined(HAVE_REPLICATION) */
>  };
>  
>  
> -class Delete_rows_log_event_old : public Delete_rows_log_event
> +class Delete_rows_log_event_old 
> +  : public Delete_rows_log_event, public Old_rows_log_event
>  {
> +  uchar *m_after_image, *m_memory;
> + 
>  public:
>    enum 
>    {
> @@ -101,14 +207,16 @@ public:
>  #if !defined(MYSQL_CLIENT)
>    Delete_rows_log_event_old(THD *thd, TABLE *table, ulong table_id,
>                             MY_BITMAP const *cols, bool is_transactional)
> -    : Delete_rows_log_event(thd, table, table_id, cols, is_transactional)
> +    : Delete_rows_log_event(thd, table, table_id, cols, is_transactional),
> +      m_after_image(NULL), m_memory(NULL)
>    {
>    }
>  #endif
>  #if defined(HAVE_REPLICATION)
>    Delete_rows_log_event_old(const char *buf, uint event_len,
>                              const Format_description_log_event *descr)
> -    : Delete_rows_log_event(buf, event_len, descr)
> +    : Delete_rows_log_event(buf, event_len, descr),
> +      m_after_image(NULL), m_memory(NULL)
>    {
>    }
>  #endif
> @@ -117,8 +225,16 @@ private:
>    virtual Log_event_type get_type_code() { return (Log_event_type)TYPE_CODE; }
>  
>  #if !defined(MYSQL_CLIENT) && defined(HAVE_REPLICATION)
> +  // use old definition of do_apply_event()
> +  virtual int do_apply_event(const RELAY_LOG_INFO *rli)
> +  { return Old_rows_log_event::do_apply_event(this,rli); }
> +
> +  // primitives for old version of do_apply_event()
> +  virtual int do_before_row_operations(TABLE *table);
> +  virtual int do_after_row_operations(TABLE *table, int error);
>    virtual int do_prepare_row(THD*, RELAY_LOG_INFO const*, TABLE*,
>                               uchar const *row_start, uchar const **row_end);
> +  virtual int do_exec_row(TABLE *table);
>  #endif
>  };
>  
> diff -Nrup a/sql/rpl_record.cc b/sql/rpl_record.cc
> --- a/sql/rpl_record.cc	2007-06-11 22:15:28 +02:00
> +++ b/sql/rpl_record.cc	2007-07-06 16:58:06 +02:00
> @@ -133,26 +133,21 @@ pack_row(TABLE *table, MY_BITMAP const* 
>     the various member functions of Field and subclasses expect to
>     write.
>  
> -   The row is assumed to only consist of the fields for which the
> -   bitset represented by @c arr and @c bits; the other parts of the
> -   record are left alone.
> +   The row is assumed to only consist of the fields for which the corresponding
> +   bit in bitset @c cols is set; the other parts of the record are left alone.
>  
>     At most @c colcnt columns are read: if the table is larger than
>     that, the remaining fields are not filled in.
>  
> -   @param rli     Relay log info
>     @param table   Table to unpack into
>     @param colcnt  Number of columns to read from record
> -   @param row     Packed row data
> -   @param cols    Pointer to columns data to fill in
> +   @param row_data Packed row data
> +   @param cols    Pointer to bitset describing columns to fill in
>     @param row_end Pointer to variable that will hold the value of the
>                    one-after-end position for the row
>     @param master_reclength
>                    Pointer to variable that will be set to the length of the
>                    record on the master side
> -   @param rw_set  Pointer to bitmap that holds either the read_set or the
> -                  write_set of the table
> -
>  
>     @retval 0 No error
>  
> @@ -163,11 +158,9 @@ pack_row(TABLE *table, MY_BITMAP const* 
>   */
>  #if !defined(MYSQL_CLIENT) && defined(HAVE_REPLICATION)
>  int
> -unpack_row(RELAY_LOG_INFO const *rli,
> -           TABLE *table, uint const colcnt,
> +unpack_row(TABLE *table, uint const colcnt,
>             uchar const *const row_data, MY_BITMAP const *cols,
> -           uchar const **const row_end, ulong *const master_reclength,
> -           MY_BITMAP* const rw_set, Log_event_type const event_type)
> +           uchar const **const row_end, ulong *const master_reclength)
>  {
>    DBUG_ENTER("unpack_row");
>    DBUG_ASSERT(row_data);
> @@ -177,10 +170,6 @@ unpack_row(RELAY_LOG_INFO const *rli,
>    uchar const *null_ptr= row_data;
>    uchar const *pack_ptr= row_data + master_null_byte_count;
>  
> -  bitmap_clear_all(rw_set);
> -
> -  empty_record(table);
> -
>    Field **const begin_ptr = table->field;
>    Field **field_ptr;
>    Field **const end_ptr= begin_ptr + colcnt;
> @@ -225,7 +214,6 @@ unpack_row(RELAY_LOG_INFO const *rli,
>          pack_ptr= f->unpack(f->ptr, pack_ptr);
>        }
>  
> -      bitmap_set_bit(rw_set, f->field_index);
>        null_mask <<= 1;
>      }
>    }
> @@ -244,30 +232,46 @@ unpack_row(RELAY_LOG_INFO const *rli,
>      else
>        *master_reclength = table->s->reclength;
>    }
> +  
> +  DBUG_RETURN(error);
> +}
> +
> +/**
> +  Fills table->record[0] with default values.
> +
> +  First @c empty_record() is called and then, additionally, fields are
> +  initialized explicitly with a call to @c set_default().
> +
> +  For optimization reasons, the explicit initialization can be skipped for
> +  first @c skip fields. This is useful if later we are going to fill these 
> +  fields from other source (e.g. from a Rows replication event).
> +
> +  If @c check is true, fields are explicitly initialized only if they have
> +  default value or can be NULL. Otherwise error is reported.
> + */ 
> +int prepare_record(const Slave_reporting_capability *const log, 
> +                   TABLE *const table, 
> +                   const uint skip, const bool check)
> +{
> +  DBUG_ENTER("prepare_record");
>  
> -  /*
> -    Set properties for remaining columns, if there are any. We let the
> -    corresponding bit in the write_set be set, to write the value if
> -    it was not there already. We iterate over all remaining columns,
> -    even if there were an error, to get as many error messages as
> -    possible.  We are still able to return a pointer to the next row,
> -    so redo that.
> +  int error= 0;
> +  empty_record(table);
>  
> -    This generation of error messages is only relevant when inserting
> -    new rows.
> -   */
> -  for ( ; *field_ptr ; ++field_ptr)
> +  /* Explicit initialization of fields */
> +
> +  for (Field **field_ptr= table->field+skip ; *field_ptr ; ++field_ptr)
>    {
>      uint32 const mask= NOT_NULL_FLAG | NO_DEFAULT_VALUE_FLAG;
>      Field *const f= *field_ptr;
>  
> -    if (event_type == WRITE_ROWS_EVENT &&
> -        ((*field_ptr)->flags & mask) == mask)
> +    if (check && ((f->flags & mask) == mask))
>      {
> -      rli->report(ERROR_LEVEL, ER_NO_DEFAULT_FOR_FIELD,
> +      DBUG_ASSERT(log);
> +      log->report(ERROR_LEVEL, ER_NO_DEFAULT_FOR_FIELD,
>                    "Field `%s` of table `%s`.`%s` "
>                    "has no default value and cannot be NULL",
> -                  (*field_ptr)->field_name, table->s->db.str,
> +                  f->field_name, table->s->db.str,
>                    table->s->table_name.str);
>        error = ER_NO_DEFAULT_FOR_FIELD;
>      }
> diff -Nrup a/sql/rpl_record.h b/sql/rpl_record.h
> --- a/sql/rpl_record.h	2007-05-10 11:59:28 +02:00
> +++ b/sql/rpl_record.h	2007-07-06 16:58:06 +02:00
> @@ -16,18 +16,21 @@
>  #ifndef RPL_RECORD_H
>  #define RPL_RECORD_H
>  
> +#include <rpl_reporting.h>
> +
>  #if !defined(MYSQL_CLIENT)
>  size_t pack_row(TABLE* table, MY_BITMAP const* cols,
>                  uchar *row_data, const uchar *data);
>  #endif
>  
>  #if !defined(MYSQL_CLIENT) && defined(HAVE_REPLICATION)
> -int unpack_row(RELAY_LOG_INFO const *rli,
> -               TABLE *table, uint const colcnt,
> +int unpack_row(TABLE *table, uint const colcnt,
>                 uchar const *const row_data, MY_BITMAP const *cols,
> -               uchar const **const row_end, ulong *const master_reclength,
> -               MY_BITMAP* const rw_set,
> -               Log_event_type const event_type);
> +               uchar const **const row_end, ulong *const master_reclength); 
> +
> +// Fill table's record[0] with default values.
> +int prepare_record(const Slave_reporting_capability *const, TABLE *const, 
> +                   const uint =0, const bool =FALSE);
>  #endif
>  
>  #endif
> diff -Nrup a/sql/rpl_utility.h b/sql/rpl_utility.h
> --- a/sql/rpl_utility.h	2007-05-24 00:39:21 +02:00
> +++ b/sql/rpl_utility.h	2007-07-06 16:58:06 +02:00
> @@ -134,4 +134,35 @@ struct RPL_TABLE_LIST
>    table_def m_tabledef;
>  };
>  
> +
> +/* Anonymous namespace for template functions/classes */
> +namespace {
> +
> +  /*
> +    Smart pointer that will automatically call my_afree (a macro) when
> +    the pointer goes out of scope.  This is used so that I do not have
> +    to remember to call my_afree() before each return.  There is no
> +    overhead associated with this, since all functions are inline.
> +
> +    I (Matz) would prefer to use the free function as a template
> +    parameter, but that is not possible when the "function" is a
> +    macro.
> +  */
> +  template <class Obj>
> +  class auto_afree_ptr
> +  {
> +    Obj* m_ptr;
> +  public:
> +    auto_afree_ptr(Obj* ptr) : m_ptr(ptr) { }
> +    ~auto_afree_ptr() { if (m_ptr) my_afree(m_ptr); }
> +    void assign(Obj* ptr) {
> +      /* Only to be called if it hasn't been given a value before. */
> +      DBUG_ASSERT(m_ptr == NULL);
> +      m_ptr= ptr;
> +    }
> +    Obj* get() { return m_ptr; }
> +  };
> +
> +}
> +
>  #endif /* RPL_UTILITY_H */
>
> -- 
> MySQL Code Commits Mailing List
> For list archives: http://lists.mysql.com/commits
> To unsubscribe:    http://lists.mysql.com/commits?unsub=1
>
Thread
bk commit into 5.1 tree (rafal:1.2530) BUG#21842rsomla6 Jul
  • Re: bk commit into 5.1 tree (rafal:1.2530) BUG#21842Andrei Elkin10 Jul
  • Re: bk commit into 5.1 tree (rafal:1.2530) BUG#21842Mats Kindahl22 Aug
    • Re: bk commit into 5.1 tree (rafal:1.2530) BUG#21842Mats Kindahl23 Aug
    • Re: bk commit into 5.1 tree (rafal:1.2530) BUG#21842 - part 1Rafal Somla23 Aug
      • Re: bk commit into 5.1 tree (rafal:1.2530) BUG#21842 - part 1Mats Kindahl23 Aug
        • Re: bk commit into 5.1 tree (rafal:1.2530) BUG#21842 - part 1Mats Kindahl23 Aug
        • Re: bk commit into 5.1 tree (rafal:1.2530) BUG#21842 - part 1Rafal Somla23 Aug
          • Re: bk commit into 5.1 tree (rafal:1.2530) BUG#21842 - part 1Mats Kindahl23 Aug
    • Re: bk commit into 5.1 tree (rafal:1.2530) BUG#21842 - part2Rafal Somla23 Aug
      • Re: bk commit into 5.1 tree (rafal:1.2530) BUG#21842 - part2Mats Kindahl23 Aug
    • Re: bk commit into 5.1 tree (rafal:1.2530) BUG#21842 - part3Rafal Somla23 Aug
      • Re: bk commit into 5.1 tree (rafal:1.2530) BUG#21842 - part3Mats Kindahl23 Aug