List:Commits« Previous MessageNext Message »
From:Frazer Clement Date:March 28 2012 3:58pm
Subject:bzr push into mysql-5.1-telco-7.0 branch (frazer.clement:4907 to 4908)
Bug#54854 Bug#11762277
View as plain text  
 4908 Frazer Clement	2012-03-28
      Bug#54854 / Bug#11762277 CAN'T FIND GOOD POSITION FOR REPLICATION BREAK BETWEEN DDL STATEMENTS 
      
      Problem
      -------
      
      Replication channel cutover uses the last applied epoch on the slave to determine where
      to begin replication from on the new Master.
      
      The last applied epoch is obtained from the Slave's mysql.ndb_apply_status table.
      The new Master's mysql.ndb_binlog_index table is queried to find the first epoch *after* the
      Slave's last applied epoch, the binlog file and offset of this epoch are used to start
      replication from the new master.
      
      Issues : 
       1) There may be *no* epoch after the last applied epoch
          If log-empty-epochs=0, and even in normal cases, where the slave is up-to-date and
          no new epoch has been finalised on the Master.
       2) As epochs are not continuously numbered, there may be a gap between the last applied
          epoch and the next.  It is not possible to determine what the next epoch number will
          be.  If the new Master is missing some epochs, the current cutover mechanism will 
          silently skip over them and jump to the first available epoch 
       3) Where there is DDL between the last applied epoch and the next epoch, the cutover mechanism
          will skip the DDL.  If the DDL has been applied then this is ok, if it has not, then it
          is silently skipped.
      
      Solution
      --------
      
      This series implements a more precise mechanism for performing replication channel cutover.
      This allows us to ensure that a replication channel cutover begins replication precisely
      after the end of the last committed epoch on the Slave.
      
      This involves :
        - Modifications to the MySQL Server Binlog code to record the next position in the Binlog
          after the COMMIT event at the end of an epoch transaction
        - Modifications to the mysql.ndb_binlog_index table schema to include next_file and next_position
          columns
        - Modifications to the Ndb Binlog injector to set the next_file and next_position columns in
          the mysql.ndb_binlog_index table.
      
      The existing replication channel cutover mechanism continues to work, with the same limitations
      as before.
      A new channel cutover mechanism is defined, making use of the new columns.
      
      Old channel cutover mechanism, given a last applied epoch from the slave.
      
        SELECT File, position from mysql.ndb_binlog_index where epoch > <last_applied_epoch>;
      
      New channel cutover mechanism :
        SELECT next_file, next_position from mysql.ndb_binlog_index where epoch = <last_applied_epoch>;
      
      Note that i) This statement uses the last applied epoch directly - there is no dependency on there
      being a following epoch, ii) There is no risk of silently 'jumping' over an epoch gap during 
      replication channel cutover, iii) Any DDL after the last applied epoch will be (re)applied.
      
      Reapplying inter-epoch DDL can result in errors on the Slave.  This is considered better than the
      old channel cutover mechanism which can result in silently skipping DDL.  A separate patch series
      implements 'DDL ignore existance errors' handling.
      
      This series includes a testcase which verifies the correctness of the next_position under 
      multithreaded Binlog inserts etc.

    added:
      mysql-test/suite/ndb_binlog/r/ndb_binlog_index.result
      mysql-test/suite/ndb_binlog/t/ndb_binlog_check_binlog_index.inc
      mysql-test/suite/ndb_binlog/t/ndb_binlog_index.test
      mysql-test/suite/ndb_binlog/t/ndb_binlog_index_test_schema_independent.inc
    modified:
      mysql-test/suite/ndb_binlog/r/ndb_binlog_ignore_db.result
      mysql-test/suite/ndb_binlog/t/ndb_binlog_ignore_db.test
      mysql-test/suite/ndb_rpl/r/ndb_rpl_idempotent.result
      mysql-test/suite/ndb_rpl/r/ndb_rpl_sync.result
      mysql-test/suite/ndb_rpl/t/ndb_rpl_idempotent.test
      scripts/mysql_system_tables.sql
      scripts/mysql_system_tables_fix.sql
      sql/ha_ndbcluster_binlog.cc
      sql/log.cc
      sql/log.h
      sql/log_event.h
      sql/rpl_injector.cc
      sql/rpl_injector.h
      sql/sql_class.cc
      sql/sql_class.h
 4907 Jonas Oreland	2012-03-28
      ndb - bump version to 7.0.33

    modified:
      configure.in
      storage/ndb/ndb_configure.m4
=== modified file 'mysql-test/suite/ndb_binlog/r/ndb_binlog_ignore_db.result'
--- a/mysql-test/suite/ndb_binlog/r/ndb_binlog_ignore_db.result	2011-03-15 10:49:09 +0000
+++ b/mysql-test/suite/ndb_binlog/r/ndb_binlog_ignore_db.result	2012-03-28 15:17:32 +0000
@@ -31,8 +31,8 @@ drop table invisible;
 show binlog events from <binlog_start>;
 Log_name	Pos	Event_type	Server_id	End_log_pos	Info
 -- ndb_binlog_index table (MyISAM) in mysql db 
-insert into ndb_binlog_index values (12, 'F', 23, 0, 0, 0, 0, 3, 44, 55);
-insert into ndb_binlog_index values (20, 'G', 23, 0, 0, 0, 0, 5, 44, 55);
+insert into ndb_binlog_index values (12, 'F', 23, 0, 0, 0, 0, 3, 44, 55, 66, 'H');
+insert into ndb_binlog_index values (20, 'G', 23, 0, 0, 0, 0, 5, 44, 55, 77, 'I');
 delete from ndb_binlog_index;
 -- Should be nothing in binlog
 show binlog events from <binlog_start>;

=== added file 'mysql-test/suite/ndb_binlog/r/ndb_binlog_index.result'
--- a/mysql-test/suite/ndb_binlog/r/ndb_binlog_index.result	1970-01-01 00:00:00 +0000
+++ b/mysql-test/suite/ndb_binlog/r/ndb_binlog_index.result	2012-03-28 15:17:32 +0000
@@ -0,0 +1,255 @@
+-------------------------------------------------
+First run using ndb_binlog_index table containing
+epoch end position information
+-------------------------------------------------
+show create table mysql.ndb_binlog_index;
+Table	Create Table
+ndb_binlog_index	CREATE TABLE `ndb_binlog_index` (
+  `Position` bigint(20) unsigned NOT NULL,
+  `File` varchar(255) NOT NULL,
+  `epoch` bigint(20) unsigned NOT NULL,
+  `inserts` int(10) unsigned NOT NULL,
+  `updates` int(10) unsigned NOT NULL,
+  `deletes` int(10) unsigned NOT NULL,
+  `schemaops` int(10) unsigned NOT NULL,
+  `orig_server_id` int(10) unsigned NOT NULL,
+  `orig_epoch` bigint(20) unsigned NOT NULL,
+  `gci` int(10) unsigned NOT NULL,
+  `next_position` bigint(20) unsigned NOT NULL,
+  `next_file` varchar(255) NOT NULL,
+  PRIMARY KEY (`epoch`,`orig_server_id`,`orig_epoch`)
+) ENGINE=MyISAM DEFAULT CHARSET=latin1
+mysql.ndb_binlog_index has next_file column? 1
+create table t1 (a int, b varchar(400)) engine=ndb;
+Test pure epochs
+----------------
+insert into t1 values(1, repeat('B', 400));
+insert into t1 values(1, repeat('F', 400));
+insert into t1 values(1, repeat('E', 400));
+flush logs;
+---------------------------------------------------------------------------
+Mismatches between Binlog index next_pos and Binlog COMMIT event pos
+---------------------------------------------------------------------------
+epoch	calculated_pos	stored_pos
+Done
+reset master;
+Test interleaved epochs and DDL
+------------------------------
+insert into t1 values(1, repeat('R', 400));
+create table t2 (a int) engine=ndb;
+insert into t1 values(1, repeat('A', 400));
+create table t3 (a int) engine=ndb;
+insert into t1 value(1, repeat('A', 400));
+flush logs;
+---------------------------------------------------------------------------
+Mismatches between Binlog index next_pos and Binlog COMMIT event pos
+---------------------------------------------------------------------------
+epoch	calculated_pos	stored_pos
+Done
+Test multithreaded interleaved epochs and DDL
+---------------------------------------------
+Issue DDL and DML concurrently on server1
+They will interleave in the Binlog according to Binlog mutex
+interactions between DDL executing server thread and binlog injector
+
+Check Binlog on DDL-source MySQLD to ensure that binlog index positions
+'cover' the Binlog
+Check Binlog on other MySQLD to ensure that binlog index positions
+'cover' the Binlog (DDL here is 'fabricated' by Binlog injector thread
+
+reset master;
+reset master;
+set sql_log_bin=0;
+create procedure dmlload (seconds int)
+begin
+set @x=time_to_sec(current_time()) + seconds;
+repeat
+start transaction;
+insert into t1 values (2, repeat('I', 400));
+commit;
+start transaction;
+update t1 set b=repeat('Z', 400) where a=2;
+commit;
+start transaction;
+delete from t1 where a=2;
+commit;
+until @x <= time_to_sec(current_time())
+end repeat;
+end%
+create procedure ddlload(seconds int)
+begin
+set @x=time_to_sec(current_time()) + seconds;
+repeat
+create table fmc (a int) engine=myisam;
+create table bah(a int) engine=ndb;
+drop table bah;
+drop table fmc;
+until @x <= time_to_sec(current_time())
+end repeat;
+end%
+set sql_log_bin=1;
+call ddlload(25);
+call dmlload(25);
+Now check binlog index vs binlog itself on Server1
+flush logs;
+---------------------------------------------------------------------------
+Mismatches between Binlog index next_pos and Binlog COMMIT event pos
+---------------------------------------------------------------------------
+epoch	calculated_pos	stored_pos
+Done
+Now check binlog index vs binlog itself on Server2
+flush logs;
+---------------------------------------------------------------------------
+Mismatches between Binlog index next_pos and Binlog COMMIT event pos
+---------------------------------------------------------------------------
+epoch	calculated_pos	stored_pos
+Done
+drop procedure ddlload;
+drop procedure dmlload;
+Cleanup
+drop table t1;
+drop table t2;
+drop table t3;
+------------------------------------------------
+Second run using ndb_binlog_index table without
+epoch end position information
+------------------------------------------------
+alter table mysql.ndb_binlog_index drop column next_file;
+alter table mysql.ndb_binlog_index drop column next_position;
+reset master;
+show create table mysql.ndb_binlog_index;
+Table	Create Table
+ndb_binlog_index	CREATE TABLE `ndb_binlog_index` (
+  `Position` bigint(20) unsigned NOT NULL,
+  `File` varchar(255) NOT NULL,
+  `epoch` bigint(20) unsigned NOT NULL,
+  `inserts` int(10) unsigned NOT NULL,
+  `updates` int(10) unsigned NOT NULL,
+  `deletes` int(10) unsigned NOT NULL,
+  `schemaops` int(10) unsigned NOT NULL,
+  `orig_server_id` int(10) unsigned NOT NULL,
+  `orig_epoch` bigint(20) unsigned NOT NULL,
+  `gci` int(10) unsigned NOT NULL,
+  PRIMARY KEY (`epoch`,`orig_server_id`,`orig_epoch`)
+) ENGINE=MyISAM DEFAULT CHARSET=latin1
+mysql.ndb_binlog_index has next_file column? 0
+create table t1 (a int, b varchar(400)) engine=ndb;
+Test pure epochs
+----------------
+insert into t1 values(1, repeat('B', 400));
+insert into t1 values(1, repeat('F', 400));
+insert into t1 values(1, repeat('E', 400));
+flush logs;
+Nothing to verify
+reset master;
+Test interleaved epochs and DDL
+------------------------------
+insert into t1 values(1, repeat('R', 400));
+create table t2 (a int) engine=ndb;
+insert into t1 values(1, repeat('A', 400));
+create table t3 (a int) engine=ndb;
+insert into t1 value(1, repeat('A', 400));
+flush logs;
+Nothing to verify
+Test multithreaded interleaved epochs and DDL
+---------------------------------------------
+Issue DDL and DML concurrently on server1
+They will interleave in the Binlog according to Binlog mutex
+interactions between DDL executing server thread and binlog injector
+
+Check Binlog on DDL-source MySQLD to ensure that binlog index positions
+'cover' the Binlog
+Check Binlog on other MySQLD to ensure that binlog index positions
+'cover' the Binlog (DDL here is 'fabricated' by Binlog injector thread
+
+reset master;
+reset master;
+set sql_log_bin=0;
+create procedure dmlload (seconds int)
+begin
+set @x=time_to_sec(current_time()) + seconds;
+repeat
+start transaction;
+insert into t1 values (2, repeat('I', 400));
+commit;
+start transaction;
+update t1 set b=repeat('Z', 400) where a=2;
+commit;
+start transaction;
+delete from t1 where a=2;
+commit;
+until @x <= time_to_sec(current_time())
+end repeat;
+end%
+create procedure ddlload(seconds int)
+begin
+set @x=time_to_sec(current_time()) + seconds;
+repeat
+create table fmc (a int) engine=myisam;
+create table bah(a int) engine=ndb;
+drop table bah;
+drop table fmc;
+until @x <= time_to_sec(current_time())
+end repeat;
+end%
+set sql_log_bin=1;
+call ddlload(25);
+call dmlload(25);
+Now check binlog index vs binlog itself on Server1
+flush logs;
+Nothing to verify
+Now check binlog index vs binlog itself on Server2
+flush logs;
+---------------------------------------------------------------------------
+Mismatches between Binlog index next_pos and Binlog COMMIT event pos
+---------------------------------------------------------------------------
+epoch	calculated_pos	stored_pos
+Done
+drop procedure ddlload;
+drop procedure dmlload;
+Cleanup
+drop table t1;
+drop table t2;
+drop table t3;
+Now restore original schema
+reset master;
+alter table mysql.ndb_binlog_index add column next_position bigint unsigned not null;
+alter table mysql.ndb_binlog_index add column next_file varchar(255) not null;
+reset master;
+--------------------------------------
+Quick test of ndb-log-empty-epochs = 1
+--------------------------------------
+select * from mysql.ndb_binlog_index order by epoch;
+Position	File	epoch	inserts	updates	deletes	schemaops	orig_server_id	orig_epoch	gci	next_position	next_file
+show binlog events from <binlog_start>;
+Log_name	Pos	Event_type	Server_id	End_log_pos	Info
+show variables like 'ndb_log_empty_epochs';
+Variable_name	Value
+ndb_log_empty_epochs	OFF
+set global ndb_log_empty_epochs=1;
+show variables like 'ndb_log_empty_epochs';
+Variable_name	Value
+ndb_log_empty_epochs	ON
+Allow some empty epochs to pass...
+Show that we have entries in ndb_binlog_index
+select count(1) > 0 from mysql.ndb_binlog_index;
+count(1) > 0
+1
+Show that all ndb_binlog_index entries have the same file + position
+select bi1.epoch, bi1.Position, bi1.File, bi1.next_position, bi1.next_file,
+bi2.epoch, bi2.Position, bi2.File, bi2.next_position, bi2.next_file
+from mysql.ndb_binlog_index as bi1,
+mysql.ndb_binlog_index as bi2
+where
+# Start and Next not same
+bi1.Position != bi1.next_position OR
+bi1.File != bi1.next_file OR
+# All epochs not the same
+bi1.Position != bi2.Position OR
+bi1.File != bi2.File;
+epoch	Position	File	next_position	next_file	epoch	Position	File	next_position	next_file
+Show that we have no epochs in the Binlog
+show binlog events from <binlog_start>;
+Log_name	Pos	Event_type	Server_id	End_log_pos	Info
+Disable
+set global ndb_log_empty_epochs=0;

=== added file 'mysql-test/suite/ndb_binlog/t/ndb_binlog_check_binlog_index.inc'
--- a/mysql-test/suite/ndb_binlog/t/ndb_binlog_check_binlog_index.inc	1970-01-01 00:00:00 +0000
+++ b/mysql-test/suite/ndb_binlog/t/ndb_binlog_check_binlog_index.inc	2012-03-28 15:17:32 +0000
@@ -0,0 +1,258 @@
+#
+# ndb_binlog_check_binlog_index
+#
+# This include file determines epoch boundaries from a Binlog using the
+# mysqlbinlog tool
+# It then compares the calculated boundaries with the contents of
+# the ndb_binlog_index table
+# The intention is to prove that :
+#   - for any epoch A
+#     - The mysql.ndb_binlog_index next pos for A is the first binlog location
+#       after the COMMIT event of epoch A
+#   - Therefore the presence of a committed epoch number in a slave cluster's
+#     ndb_apply_status table is taken to mean only that the transaction which
+#     wrote it committed.
+#   - The Slave should resume immediately after the committed transaction
+# Note that :
+#   - The start position (Position, File) is vaguely defined and can refer
+#     to any position after the previously committed epoch's COMMIT event
+#     and before the epoch's BEGIN event.  A test and fix for this
+#     exists, but has been shelved.
+#
+
+--disable_query_log
+set sql_log_bin=0;
+
+let have_next_pos=query_get_value(select count(1) as have_next_file from information_schema.COLUMNS where table_schema='mysql' and table_name='ndb_binlog_index' and column_name='next_file', have_next_file, 1);
+if (!$have_next_pos)
+{
+  --echo Nothing to verify
+}
+if ($have_next_pos)
+{
+
+  let $MYSQLD_DATADIR= `select @@datadir;`;
+  --exec $MYSQL_BINLOG --verbose $MYSQLD_DATADIR/mysqld-bin.000001 > $MYSQLTEST_VARDIR/tmp/ndb_binlog_mysqlbinlog.sql
+
+  create table raw_binlog_rows (txt varchar(1000));
+
+  --eval load data local infile '$MYSQLTEST_VARDIR/tmp/ndb_binlog_mysqlbinlog.sql' into table raw_binlog_rows columns terminated by '\n';
+
+  create table binlog_stmt_parts_unassoc (txt varchar(1000), line_count int, stmt_boundary int, tx_boundary int);
+
+  set @line_count=0;
+  set @stmt_boundary=0;
+
+  # Use replace() here to get rid of any unwanted Windows
+  # CRs
+  insert into binlog_stmt_parts_unassoc
+    select replace(txt, '\r', ''),
+           @line_count:= @line_count + 1,  # So we can preserve order later
+           (txt like '%INSERT%' or         # Identify statement boundaries
+            txt like '%UPDATE%' or
+            txt like '%DELETE%' or
+            txt = 'BEGIN' or
+            txt = 'COMMIT'),
+           txt = 'BEGIN'             # Transaction boundary
+      from raw_binlog_rows
+      where
+        txt like '###%' OR
+        txt = 'BEGIN' OR
+        txt = 'COMMIT' OR
+        txt like '%# at%';   # Discard stuff we don't care about
+
+  #select * from binlog_stmt_parts_unassoc;
+
+  create table binlog_stmt_parts_assoc (txt varchar(1000), line_count int, stmt_num int, tx_num int, key(stmt_num), key(line_count), key(txt));
+
+  set @stmt_count = 0;
+  set @tx_count = 0;
+
+  insert into binlog_stmt_parts_assoc
+    select txt,
+           line_count,
+           @stmt_count:= @stmt_count + stmt_boundary,   # All rows from same stmt will
+                                                        # have same stmt_num
+           @tx_count:= @tx_count + tx_boundary          # Same transaction
+
+      from binlog_stmt_parts_unassoc order by line_count;
+
+  #select * from binlog_stmt_parts_assoc;
+
+  create table apply_status_stmt_nums (stmt_num int primary key);
+
+  insert into apply_status_stmt_nums
+    select stmt_num from binlog_stmt_parts_assoc
+    where txt like '%INSERT INTO mysql.ndb_apply_status%';
+
+  create table relevant_info (txt varchar(1000), tx_num int, line_count int);
+
+  insert into relevant_info
+    # Epoch number to tx_num mapping
+    #  ###  @2= <epoch>, <tx_num>, <line_count>
+    select bspa.txt, bspa.tx_num, bspa.line_count
+    from
+      binlog_stmt_parts_assoc as bspa,
+      apply_status_stmt_nums
+    where
+      (bspa.stmt_num = apply_status_stmt_nums.stmt_num
+           and
+       bspa.txt like '%@2=%')                # Epoch number
+  union
+    # BEGIN and COMMIT event to tx_num and file position mapping
+    # BEGIN # at <offset>, <tx_num>, <line_count>
+    # COMMIT # at <offset>, <tx_num>, <line_count>
+    #
+    select concat(bspa2.txt, " ", bspa1.txt), bspa2.tx_num, bspa1.line_count
+    from
+      binlog_stmt_parts_assoc as bspa1,
+      binlog_stmt_parts_assoc as bspa2
+    where
+      (bspa2.txt = 'BEGIN' and
+       bspa1.line_count = bspa2.line_count - 1) # Line before BEGIN event
+      or
+      (bspa2.txt = 'COMMIT' and
+       bspa1.line_count = bspa2.line_count + 1); # Line after COMMIT event
+
+  #select * from relevant_info order by line_count;
+
+  create table epoch_info (num int, epoch bigint, start_pos bigint, next_pos bigint);
+  set @epoch_num=0;
+
+  insert into epoch_info
+    select @epoch_num:=@epoch_num + 1,
+    (right(ri1.txt, length(ri1.txt) - length('###   @2='))) + 0, # epoch number
+     (right(ri2.txt, length(ri2.txt) - length('BEGIN # at '))) + 0, # start pos
+      (right(ri3.txt, length(ri3.txt) - length('COMMIT # at '))) + 0  # end pos
+    from
+      relevant_info as ri1, relevant_info as ri2, relevant_info as ri3
+    where
+      ri1.tx_num = ri2.tx_num
+      and
+      ri1.tx_num = ri3.tx_num
+      and
+      ri1.txt like '%@2=%'
+      and
+      ri2.txt like '%BEGIN%'
+      and
+      ri3.txt like '%COMMIT%';
+
+  #select * from epoch_info order by num;
+
+  # Insert dummy row 0 to give start pos of first epoch
+  --let $first_event_pos= query_get_value(SHOW BINLOG EVENTS LIMIT 1, End_log_pos, 1)
+  eval insert into epoch_info values (0,0,0,$first_event_pos);
+
+
+  # Get epoch info where following epoch starts at end of previous epoch
+  create table adjusted_epoch_info (epoch bigint, start_pos bigint);
+
+  insert into adjusted_epoch_info
+    select e2.epoch, e1.next_pos as start_pos
+      from
+        epoch_info as e1, epoch_info as e2
+      where
+        e2.num = e1.num + 1;
+
+  #select * from adjusted_epoch_info;
+
+  # Should not return any rows
+  --echo ---------------------------------------------------------------------------
+  --echo Mismatches between Binlog index next_pos and Binlog COMMIT event pos
+  --echo ---------------------------------------------------------------------------
+
+  select binlog.epoch,
+           binlog.next_pos as calculated_pos,
+           binlog_index.next_position as stored_pos
+    from epoch_info as binlog,
+           mysql.ndb_binlog_index as binlog_index
+    where binlog.epoch = binlog_index.epoch AND
+            binlog.next_pos != binlog_index.next_position;
+
+  --echo Done
+
+  # Following commented out as it is an (understandably) non-deterministic
+  # race
+  #
+  #--echo ----------------------------------------------
+  #--echo Any gaps between epoch n next_pos and epoch n+1 start_pos
+  #--echo ----------------------------------------------
+  ## This indicates that other events (e.g. DDL) were inserted between the end of
+  ## one epoch and the recorded start pos of the next epoch
+  ##select binlog.epoch,
+  ##         binlog.start_pos as calculated_start_pos,
+  ##         bi.Position as stored_start_pos
+  #select count(1) > 0
+  #  from
+  #    adjusted_epoch_info as binlog,
+  #    mysql.ndb_binlog_index as bi
+  #  where
+  #    binlog.epoch = bi.epoch
+  #    and
+  #    binlog.start_pos != bi.Position;
+  #
+  #--echo Done
+
+  # Following is commented out as it is an (understandably) non-deterministic
+  # race
+  #
+  #--echo -----------------------------------------------
+  #--echo Any stored start positions different to BEGIN positions
+  #--echo -----------------------------------------------
+  ## This indicates that other events (e.g. DDL) were inserted between the recorded
+  ## start of an epoch, and the actual start of the transaction (BEGIN)
+  ##
+  ##select binlog.epoch,
+  ##         binlog.start_pos as calculated_pos,
+  ##         binlog_index.position as stored_pos
+  #select count(1) > 0
+  #  from epoch_info as binlog,
+  #         mysql.ndb_binlog_index as binlog_index
+  #  where binlog.epoch = binlog_index.epoch AND
+  #          binlog.start_pos != binlog_index.position;
+  #
+  #--echo Done
+
+  # The following test is no longer relevant as epochs are
+  # not guaranteed adjacent.
+  #
+  #create table bi_offsets (count bigint,
+  #                              epoch bigint,
+  #                              start_pos bigint,
+  #                              next_pos bigint);
+
+  #set @epoch_count=0;
+
+  #insert into bi_offsets
+  #  select @epoch_count:=@epoch_count+1,
+  #    epoch, Position, next_position
+  #    from mysql.ndb_binlog_index
+  #    order by epoch asc;
+
+  #select * from bi_offsets order by epoch;
+
+  #--echo ----------------------------------------
+  #--echo Non adjacent epochs in ndb_binlog_index
+  #--echo ----------------------------------------
+
+  #select bio1.count, bio1.epoch, bio1.start_pos, bio1.next_pos,
+  #         bio2.count, bio2.epoch, bio2.start_pos, bio2.next_pos
+  #  from bi_offsets as bio1, bi_offsets as bio2
+  #  where
+  #    bio2.count = bio1.count + 1 AND
+  #    bio2.start_pos != bio1.next_pos;
+
+  #--echo Done.
+
+  #drop table bi_offsets;
+
+  drop table adjusted_epoch_info;
+  drop table epoch_info;
+  drop table relevant_info;
+  drop table apply_status_stmt_nums;
+  drop table binlog_stmt_parts_assoc;
+  drop table binlog_stmt_parts_unassoc;
+  drop table raw_binlog_rows;
+}
+set sql_log_bin=1;
+--enable_query_log

=== modified file 'mysql-test/suite/ndb_binlog/t/ndb_binlog_ignore_db.test'
--- a/mysql-test/suite/ndb_binlog/t/ndb_binlog_ignore_db.test	2011-03-15 10:49:09 +0000
+++ b/mysql-test/suite/ndb_binlog/t/ndb_binlog_ignore_db.test	2012-03-28 15:17:32 +0000
@@ -41,8 +41,8 @@ drop table invisible;
 
 --echo -- ndb_binlog_index table (MyISAM) in mysql db 
 
-insert into ndb_binlog_index values (12, 'F', 23, 0, 0, 0, 0, 3, 44, 55);
-insert into ndb_binlog_index values (20, 'G', 23, 0, 0, 0, 0, 5, 44, 55);
+insert into ndb_binlog_index values (12, 'F', 23, 0, 0, 0, 0, 3, 44, 55, 66, 'H');
+insert into ndb_binlog_index values (20, 'G', 23, 0, 0, 0, 0, 5, 44, 55, 77, 'I');
 
 delete from ndb_binlog_index;
 

=== added file 'mysql-test/suite/ndb_binlog/t/ndb_binlog_index.test'
--- a/mysql-test/suite/ndb_binlog/t/ndb_binlog_index.test	1970-01-01 00:00:00 +0000
+++ b/mysql-test/suite/ndb_binlog/t/ndb_binlog_index.test	2012-03-28 15:17:32 +0000
@@ -0,0 +1,75 @@
+--source include/have_ndb.inc
+--source include/have_log_bin.inc
+
+--echo -------------------------------------------------
+--echo First run using ndb_binlog_index table containing
+--echo  epoch end position information
+--echo -------------------------------------------------
+show create table mysql.ndb_binlog_index;
+
+--source suite/ndb_binlog/t/ndb_binlog_index_test_schema_independent.inc
+
+--echo ------------------------------------------------
+--echo Second run using ndb_binlog_index table without
+--echo  epoch end position information
+--echo ------------------------------------------------
+
+alter table mysql.ndb_binlog_index drop column next_file;
+alter table mysql.ndb_binlog_index drop column next_position;
+reset master;
+show create table mysql.ndb_binlog_index;
+
+--source suite/ndb_binlog/t/ndb_binlog_index_test_schema_independent.inc
+
+--echo Now restore original schema
+reset master;
+
+alter table mysql.ndb_binlog_index add column next_position bigint unsigned not null;
+alter table mysql.ndb_binlog_index add column next_file varchar(255) not null;
+
+reset master;
+
+--echo --------------------------------------
+--echo Quick test of ndb-log-empty-epochs = 1
+--echo --------------------------------------
+
+select * from mysql.ndb_binlog_index order by epoch;
+
+--source include/show_binlog_events.inc
+show variables like 'ndb_log_empty_epochs';
+
+set global ndb_log_empty_epochs=1;
+show variables like 'ndb_log_empty_epochs';
+
+--echo Allow some empty epochs to pass...
+
+let $got_some_epochs=0;
+
+while(!$got_some_epochs)
+{
+  let $got_some_epochs=query_get_value(select (count(1) > 10) as num from mysql.ndb_binlog_index, num, 1);
+  --sleep 1
+}
+
+--echo Show that we have entries in ndb_binlog_index
+select count(1) > 0 from mysql.ndb_binlog_index;
+
+--echo Show that all ndb_binlog_index entries have the same file + position
+select bi1.epoch, bi1.Position, bi1.File, bi1.next_position, bi1.next_file,
+       bi2.epoch, bi2.Position, bi2.File, bi2.next_position, bi2.next_file
+from mysql.ndb_binlog_index as bi1,
+     mysql.ndb_binlog_index as bi2
+where
+     # Start and Next not same
+     bi1.Position != bi1.next_position OR
+     bi1.File != bi1.next_file OR
+     # All epochs not the same
+     bi1.Position != bi2.Position OR
+     bi1.File != bi2.File;
+
+--echo Show that we have no epochs in the Binlog
+--source include/show_binlog_events.inc
+
+--echo Disable
+set global ndb_log_empty_epochs=0;
+

=== added file 'mysql-test/suite/ndb_binlog/t/ndb_binlog_index_test_schema_independent.inc'
--- a/mysql-test/suite/ndb_binlog/t/ndb_binlog_index_test_schema_independent.inc	1970-01-01 00:00:00 +0000
+++ b/mysql-test/suite/ndb_binlog/t/ndb_binlog_index_test_schema_independent.inc	2012-03-28 15:17:32 +0000
@@ -0,0 +1,169 @@
+
+let have_next_pos=query_get_value(select count(1) as have_next_file from information_schema.COLUMNS where table_schema='mysql' and table_name='ndb_binlog_index' and column_name='next_file', have_next_file, 1);
+let next_file_col=next_file;
+let next_pos_col=next_position;
+
+if (!$have_next_pos)
+{
+  let next_file_col="------"; # Just used in right($next_file_col, 6)
+  let next_pos_col=1;         # Just used in ($next_pos_col = 0)
+}
+
+--echo mysql.ndb_binlog_index has next_file column? $have_next_pos
+
+create table t1 (a int, b varchar(400)) engine=ndb;
+
+#
+# Test that entries in the mysql.ndb_binlog_index file correctly align
+# with the offsets in the binlog
+# Most testing is done with the next_position, as the position is
+# subject to concurrency issues.
+#
+
+--echo Test pure epochs
+--echo ----------------
+
+insert into t1 values(1, repeat('B', 400));
+
+--disable_query_log
+--disable_result_log
+show binlog events; # wait for binlog-sync and therefore epoch end
+--enable_result_log
+--enable_query_log
+
+insert into t1 values(1, repeat('F', 400));
+
+--disable_query_log
+--disable_result_log
+show binlog events; # wait for binlog-sync and therefore epoch end
+--enable_result_log
+--enable_query_log
+
+insert into t1 values(1, repeat('E', 400));
+
+--disable_query_log
+--disable_result_log
+show binlog events; # wait for binlog-sync and therefore epoch end
+--enable_result_log
+--enable_query_log
+
+flush logs;
+--source suite/ndb_binlog/t/ndb_binlog_check_binlog_index.inc
+
+reset master;
+
+--echo Test interleaved epochs and DDL
+--echo ------------------------------
+
+insert into t1 values(1, repeat('R', 400));
+
+create table t2 (a int) engine=ndb;
+
+insert into t1 values(1, repeat('A', 400));
+
+create table t3 (a int) engine=ndb;
+
+insert into t1 value(1, repeat('A', 400));
+
+
+flush logs;
+--source suite/ndb_binlog/t/ndb_binlog_check_binlog_index.inc
+
+
+--echo Test multithreaded interleaved epochs and DDL
+--echo ---------------------------------------------
+--echo Issue DDL and DML concurrently on server1
+--echo They will interleave in the Binlog according to Binlog mutex
+--echo interactions between DDL executing server thread and binlog injector
+--echo
+--echo Check Binlog on DDL-source MySQLD to ensure that binlog index positions
+--echo 'cover' the Binlog
+--echo Check Binlog on other MySQLD to ensure that binlog index positions
+--echo 'cover' the Binlog (DDL here is 'fabricated' by Binlog injector thread
+--echo
+
+--connect (server1con1, 127.0.0.1,root,,test,$MASTER_MYPORT,)
+--connect (server1con2, 127.0.0.1,root,,test,$MASTER_MYPORT,)
+--connect (server2con1, 127.0.0.1,root,,test,$MASTER_MYPORT1,)
+
+--connection server1con1
+reset master;
+
+--connection server2con1
+reset master;
+
+--connection server1con1
+
+set sql_log_bin=0;
+delimiter %;
+create procedure dmlload (seconds int)
+begin
+  set @x=time_to_sec(current_time()) + seconds;
+  repeat
+    start transaction;
+      insert into t1 values (2, repeat('I', 400));
+    commit;
+    start transaction;
+      update t1 set b=repeat('Z', 400) where a=2;
+    commit;
+    start transaction;
+      delete from t1 where a=2;
+    commit;
+  until @x <= time_to_sec(current_time())
+  end repeat;
+end%
+
+create procedure ddlload(seconds int)
+begin
+  set @x=time_to_sec(current_time()) + seconds;
+  repeat
+    create table fmc (a int) engine=myisam;
+    create table bah(a int) engine=ndb;
+    drop table bah;
+    drop table fmc;
+  until @x <= time_to_sec(current_time())
+  end repeat;
+end%
+
+delimiter ;%
+
+set sql_log_bin=1;
+
+# Set DDL running in 'background'
+--connection server1con2
+send call ddlload(25);
+
+# And do DML in 'foreground'
+--connection server1con1
+call dmlload(25);
+
+# Retrieve DDL result
+--connection server1con2
+reap;
+
+--connection server1con1
+
+--echo Now check binlog index vs binlog itself on Server1
+flush logs;
+--source suite/ndb_binlog/t/ndb_binlog_check_binlog_index.inc
+
+--connection server2con1
+--echo Now check binlog index vs binlog itself on Server2
+flush logs;
+--source suite/ndb_binlog/t/ndb_binlog_check_binlog_index.inc
+
+--connection server1con1
+
+drop procedure ddlload;
+drop procedure dmlload;
+
+
+--echo Cleanup
+drop table t1;
+drop table t2;
+drop table t3;
+
+--connection default
+--disconnect server1con1
+--disconnect server1con2
+--disconnect server2con1

=== modified file 'mysql-test/suite/ndb_rpl/r/ndb_rpl_idempotent.result'
--- a/mysql-test/suite/ndb_rpl/r/ndb_rpl_idempotent.result	2011-05-13 07:40:50 +0000
+++ b/mysql-test/suite/ndb_rpl/r/ndb_rpl_idempotent.result	2012-03-28 15:17:32 +0000
@@ -1,6 +1,10 @@
 include/master-slave.inc
 [connection master]
 CREATE TABLE t1 (c1 CHAR(15), c2 CHAR(15), c3 INT, PRIMARY KEY (c3)) ENGINE = NDB ;
+STOP SLAVE;
+RESET MASTER;
+RESET SLAVE;
+START SLAVE;
 INSERT INTO t1 VALUES ("row1","will go away",1);
 SELECT * FROM t1 ORDER BY c3;
 c1	c2	c3
@@ -26,9 +30,6 @@ row3	C	3
 row4	D	4
 include/check_slave_is_running.inc
 STOP SLAVE;
-CHANGE MASTER TO
-master_log_file = 'master-bin.000001',
-master_log_pos = <the_pos> ;
 include/check_slave_no_error.inc
 START SLAVE;
 SELECT * FROM t1 ORDER BY c3;
@@ -57,4 +58,47 @@ c1	c2	c3
 row2	new on slave	2
 include/check_slave_is_running.inc
 DROP TABLE IF EXISTS t1;
+STOP SLAVE;
+RESET MASTER;
+RESET SLAVE;
+START SLAVE;
+Let's interleave some DDL and DML in the Binlog
+Some DDL
+create table t1(a int primary key) engine=ndb;
+First epoch transaction
+begin;
+insert into t1 values (1), (2), (3);
+commit;
+Sync slave and retrieve epoch
+SELECT @first_epoch:=MAX(epoch) FROM mysql.ndb_apply_status;
+@first_epoch:=MAX(epoch)
+<first_epoch>
+Slave contents
+select * from t1 ORDER by a;
+a
+1
+2
+3
+Get the next master binlog pos from the epoch
+Now let's do some more DDL and DML
+create table t2 (a int primary key) engine=ndb;
+begin;
+insert into t2 values (1), (2), (3);
+commit;
+include/check_slave_is_running.inc
+Stop slave and reset position to start of the applied epoch
+STOP SLAVE;
+include/check_slave_no_error.inc
+insert into mtr.test_suppressions values
+("Slave: Table \'t2\' already exists .*"),
+("Slave SQL: Error \'Table \'t2\' already exists\' .*");
+START SLAVE;
+include/wait_for_slave_sql_error.inc [errno=1050]
+Last_SQL_Error = 'Error 'Table 't2' already exists' on query. Default database: 'test'. Query: 'create table t2 (a int primary key) engine=ndb''
+STOP SLAVE;
+RESET MASTER;
+RESET SLAVE;
+START SLAVE;
+DROP TABLE t1;
+DROP TABLE t2;
 include/rpl_end.inc

=== modified file 'mysql-test/suite/ndb_rpl/r/ndb_rpl_sync.result'
--- a/mysql-test/suite/ndb_rpl/r/ndb_rpl_sync.result	2011-05-13 07:40:50 +0000
+++ b/mysql-test/suite/ndb_rpl/r/ndb_rpl_sync.result	2012-03-28 15:17:32 +0000
@@ -75,7 +75,7 @@ DROP DATABASE ndbsynctest;
 STOP SLAVE;
 reset master;
 select * from mysql.ndb_binlog_index;
-Position	File	epoch	inserts	updates	deletes	schemaops	orig_server_id	orig_epoch	gci
+Position	File	epoch	inserts	updates	deletes	schemaops	orig_server_id	orig_epoch	gci	next_position	next_file
 reset slave;
 select * from mysql.ndb_apply_status;
 server_id	epoch	log_name	start_pos	end_pos

=== modified file 'mysql-test/suite/ndb_rpl/t/ndb_rpl_idempotent.test'
--- a/mysql-test/suite/ndb_rpl/t/ndb_rpl_idempotent.test	2011-05-13 07:40:50 +0000
+++ b/mysql-test/suite/ndb_rpl/t/ndb_rpl_idempotent.test	2012-03-28 15:17:32 +0000
@@ -9,6 +9,20 @@
 
 # create a table with one row
 CREATE TABLE t1 (c1 CHAR(15), c2 CHAR(15), c3 INT, PRIMARY KEY (c3)) ENGINE = NDB ;
+
+# Replicate the table creation, then reset the master log and
+# slave to avoid the creation being replayed below
+#
+sync_slave_with_master;
+--connection slave
+STOP SLAVE;
+--connection master
+RESET MASTER;
+--connection slave
+RESET SLAVE;
+START SLAVE;
+--connection master
+
 INSERT INTO t1 VALUES ("row1","will go away",1);
 SELECT * FROM t1 ORDER BY c3;
 
@@ -49,10 +63,13 @@ source include/check_slave_is_running.in
 
 # stop slave and reset position to before the last changes
 STOP SLAVE;
---replace_result $the_pos <the_pos>
+--disable_query_log
+--disable_result_log
 eval CHANGE MASTER TO
   master_log_file = '$the_file',
   master_log_pos = $the_pos ;
+--enable_result_log
+--enable_query_log
 
 source include/check_slave_no_error.inc;
 
@@ -109,5 +126,107 @@ source include/check_slave_is_running.in
 connection master;
 DROP TABLE IF EXISTS t1;
 
+#
+# Test that the ndb_binlog_index table records
+# the start position of an epoch transaction
+# as the first position after the previous
+# epoch transaction
+#
+--sync_slave_with_master
+--connection slave
+STOP SLAVE;
+--connection master
+RESET MASTER;
+--connection slave
+RESET SLAVE;
+START SLAVE;
+--connection master
+
+--echo Let's interleave some DDL and DML in the Binlog
+
+--echo   Some DDL
+create table t1(a int primary key) engine=ndb;
+
+--echo   First epoch transaction
+begin;
+insert into t1 values (1), (2), (3);
+commit;
+
+--echo Sync slave and retrieve epoch
+--sync_slave_with_master
+--replace_column 1 <first_epoch>
+SELECT @first_epoch:=MAX(epoch) FROM mysql.ndb_apply_status;
+let $first_epoch= `select @first_epoch` ;
+
+--echo   Slave contents
+select * from t1 ORDER by a;
+
+--echo   Get the next master binlog pos from the epoch
+connection master;
+--disable_query_log
+--disable_result_log
+eval SELECT @the_pos:=next_position,
+   @the_file:=SUBSTRING_INDEX(REPLACE(next_file,'\\\\','/'), '/', -1)
+   FROM mysql.ndb_binlog_index WHERE epoch = $first_epoch ;
+let $the_pos= `SELECT @the_pos` ;
+let $the_file= `SELECT @the_file` ;
+--enable_result_log
+--enable_query_log
+
+--echo  Now let's do some more DDL and DML
+create table t2 (a int primary key) engine=ndb;
+
+begin;
+insert into t2 values (1), (2), (3);
+commit;
+
+--sync_slave_with_master
+
+--connection slave
+source include/check_slave_is_running.inc;
+
+--echo   Stop slave and reset position to start of the applied epoch
+STOP SLAVE;
+--disable_query_log
+--disable_result_log
+
+eval CHANGE MASTER TO
+  master_log_file = '$the_file',
+  master_log_pos = $the_pos;
+
+--enable_result_log
+--enable_query_log
+
+source include/check_slave_no_error.inc;
+
+# Add a suppression for the warning that will appear in the
+# Slave's .err file
+insert into mtr.test_suppressions values
+  ("Slave: Table \'t2\' already exists .*"),
+  ("Slave SQL: Error \'Table \'t2\' already exists\' .*");
+
+
+START SLAVE;
+
+--let $slave_sql_errno= 1050
+--let $show_slave_sql_error=1
+--source include/wait_for_slave_sql_error.inc
+
+STOP SLAVE;
+
+# Cleanup
+--connection master
+RESET MASTER;
+
+--connection slave
+RESET SLAVE;
+START SLAVE;
+
+--connection master
+DROP TABLE t1;
+DROP TABLE t2;
+
+--sync_slave_with_master
+
 # End of 5.1 Test
 --source include/rpl_end.inc

=== modified file 'scripts/mysql_system_tables.sql'
--- a/scripts/mysql_system_tables.sql	2011-06-30 15:59:25 +0000
+++ b/scripts/mysql_system_tables.sql	2012-03-28 15:17:32 +0000
@@ -98,7 +98,7 @@ DROP PREPARE stmt;
 CREATE TABLE IF NOT EXISTS event ( db char(64) CHARACTER SET utf8 COLLATE utf8_bin NOT NULL default '', name char(64) CHARACTER SET utf8 NOT NULL default '', body longblob NOT NULL, definer char(77) CHARACTER SET utf8 COLLATE utf8_bin NOT NULL default '', execute_at DATETIME default NULL, interval_value int(11) default NULL, interval_field ENUM('YEAR','QUARTER','MONTH','DAY','HOUR','MINUTE','WEEK','SECOND','MICROSECOND','YEAR_MONTH','DAY_HOUR','DAY_MINUTE','DAY_SECOND','HOUR_MINUTE','HOUR_SECOND','MINUTE_SECOND','DAY_MICROSECOND','HOUR_MICROSECOND','MINUTE_MICROSECOND','SECOND_MICROSECOND') default NULL, created TIMESTAMP NOT NULL, modified TIMESTAMP NOT NULL, last_executed DATETIME default NULL, starts DATETIME default NULL, ends DATETIME default NULL, status ENUM('ENABLED','DISABLED','SLAVESIDE_DISABLED') NOT NULL default 'ENABLED', on_completion ENUM('DROP','PRESERVE') NOT NULL default 'DROP', sql_mode  set('REAL_AS_FLOAT','PIPES_AS_CONCAT','ANSI_QUOTES','IGNORE_SPACE','!
 NOT_USED','ONLY_FULL_GROUP_BY','NO_UNSIGNED_SUBTRACTION','NO_DIR_IN_CREATE','POSTGRESQL','ORACLE','MSSQL','DB2','MAXDB','NO_KEY_OPTIONS','NO_TABLE_OPTIONS','NO_FIELD_OPTIONS','MYSQL323','MYSQL40','ANSI','NO_AUTO_VALUE_ON_ZERO','NO_BACKSLASH_ESCAPES','STRICT_TRANS_TABLES','STRICT_ALL_TABLES','NO_ZERO_IN_DATE','NO_ZERO_DATE','INVALID_DATES','ERROR_FOR_DIVISION_BY_ZERO','TRADITIONAL','NO_AUTO_CREATE_USER','HIGH_NOT_PRECEDENCE','NO_ENGINE_SUBSTITUTION','PAD_CHAR_TO_FULL_LENGTH') DEFAULT '' NOT NULL, comment char(64) CHARACTER SET utf8 COLLATE utf8_bin NOT NULL default '', originator INTEGER UNSIGNED NOT NULL, time_zone char(64) CHARACTER SET latin1 NOT NULL DEFAULT 'SYSTEM', character_set_client char(32) collate utf8_bin, collation_connection char(32) collate utf8_bin, db_collation char(32) collate utf8_bin, body_utf8 longblob, PRIMARY KEY (db, name) ) ENGINE=MyISAM DEFAULT CHARSET=utf8 COMMENT 'Events';
 
 
-CREATE TABLE IF NOT EXISTS ndb_binlog_index (Position BIGINT UNSIGNED NOT NULL, File VARCHAR(255) NOT NULL, epoch BIGINT UNSIGNED NOT NULL, inserts INT UNSIGNED NOT NULL, updates INT UNSIGNED NOT NULL, deletes INT UNSIGNED NOT NULL, schemaops INT UNSIGNED NOT NULL, orig_server_id INT UNSIGNED NOT NULL, orig_epoch BIGINT UNSIGNED NOT NULL, gci INT UNSIGNED NOT NULL, PRIMARY KEY(epoch, orig_server_id, orig_epoch)) ENGINE=MYISAM;
+CREATE TABLE IF NOT EXISTS ndb_binlog_index (Position BIGINT UNSIGNED NOT NULL, File VARCHAR(255) NOT NULL, epoch BIGINT UNSIGNED NOT NULL, inserts INT UNSIGNED NOT NULL, updates INT UNSIGNED NOT NULL, deletes INT UNSIGNED NOT NULL, schemaops INT UNSIGNED NOT NULL, orig_server_id INT UNSIGNED NOT NULL, orig_epoch BIGINT UNSIGNED NOT NULL, gci INT UNSIGNED NOT NULL, next_position BIGINT UNSIGNED NOT NULL, next_file VARCHAR(255) NOT NULL, PRIMARY KEY(epoch, orig_server_id, orig_epoch)) ENGINE=MYISAM;
 
 #
 # SQL commands for creating the tables in MySQL Server which

=== modified file 'scripts/mysql_system_tables_fix.sql'
--- a/scripts/mysql_system_tables_fix.sql	2011-06-30 15:37:13 +0000
+++ b/scripts/mysql_system_tables_fix.sql	2012-03-28 15:17:32 +0000
@@ -606,3 +606,11 @@ UPDATE user SET Trigger_priv=Super_priv
 # changes was correct
 
 flush privileges;
+
+#
+# ndb_binlog_index table
+#
+ALTER TABLE ndb_binlog_index
+  ADD COLUMN next_position BIGINT UNSIGNED NOT NULL;
+ALTER TABLE ndb_binlog_index
+  ADD COLUMN next_file VARCHAR(255) NOT NULL;
\ No newline at end of file

=== modified file 'sql/ha_ndbcluster_binlog.cc'
--- a/sql/ha_ndbcluster_binlog.cc	2012-02-17 08:03:56 +0000
+++ b/sql/ha_ndbcluster_binlog.cc	2012-03-28 15:17:32 +0000
@@ -70,6 +70,25 @@ void ndb_index_stat_restart();
 */
 static const int DEFAULT_SYNC_TIMEOUT= 120;
 
+/* Column numbers in the ndb_binlog_index table */
+enum Ndb_binlog_index_cols
+{
+  NBICOL_START_POS                 = 0
+  ,NBICOL_START_FILE               = 1
+  ,NBICOL_EPOCH                    = 2
+  ,NBICOL_NUM_INSERTS              = 3
+  ,NBICOL_NUM_UPDATES              = 4
+  ,NBICOL_NUM_DELETES              = 5
+  ,NBICOL_NUM_SCHEMAOPS            = 6
+  /* Following colums in schema 'v2' */
+  ,NBICOL_ORIG_SERVERID            = 7
+  ,NBICOL_ORIG_EPOCH               = 8
+  ,NBICOL_GCI                      = 9
+  /* Following columns in schema 'v3' */
+  ,NBICOL_NEXT_POS                 = 10
+  ,NBICOL_NEXT_FILE                = 11
+};
+
 /*
   Flag showing if the ndb injector thread is running, if so == 1
   -1 if it was started but later stopped for some reason
@@ -3507,8 +3526,8 @@ ndb_binlog_thread_handle_schema_event_po
 */
 struct ndb_binlog_index_row {
   ulonglong epoch;
-  const char *master_log_file;
-  ulonglong master_log_pos;
+  const char *start_master_log_file;
+  ulonglong start_master_log_pos;
   ulong n_inserts;
   ulong n_updates;
   ulong n_deletes;
@@ -3519,6 +3538,9 @@ struct ndb_binlog_index_row {
 
   ulong gci;
 
+  const char *next_master_log_file;
+  ulonglong next_master_log_pos;
+
   struct ndb_binlog_index_row *next;
 };
 
@@ -3595,24 +3617,53 @@ ndb_binlog_index_table__write_rows(THD *
     uint orig_server_id= 0;
     empty_record(ndb_binlog_index);
 
-    ndb_binlog_index->field[0]->store(first->master_log_pos, true);
-    ndb_binlog_index->field[1]->store(first->master_log_file,
-                                      (uint)strlen(first->master_log_file),
-                                      &my_charset_bin);
-    ndb_binlog_index->field[2]->store(epoch= first->epoch, true);
-    if (ndb_binlog_index->s->fields > 7)
-    {
-      ndb_binlog_index->field[3]->store(row->n_inserts, true);
-      ndb_binlog_index->field[4]->store(row->n_updates, true);
-      ndb_binlog_index->field[5]->store(row->n_deletes, true);
-      ndb_binlog_index->field[6]->store(row->n_schemaops, true);
-      ndb_binlog_index->field[7]->store(orig_server_id= row->orig_server_id, true);
-      ndb_binlog_index->field[8]->store(orig_epoch= row->orig_epoch, true);
-      ndb_binlog_index->field[9]->store(first->gci, true);
+    ndb_binlog_index->field[NBICOL_START_POS]
+      ->store(first->start_master_log_pos, true);
+    ndb_binlog_index->field[NBICOL_START_FILE]
+      ->store(first->start_master_log_file,
+              (uint)strlen(first->start_master_log_file),
+              &my_charset_bin);
+    ndb_binlog_index->field[NBICOL_EPOCH]
+      ->store(epoch= first->epoch, true);
+    if (ndb_binlog_index->s->fields > NBICOL_ORIG_SERVERID)
+    {
+      /* Table has ORIG_SERVERID / ORIG_EPOCH columns.
+       * Write rows with different ORIG_SERVERID / ORIG_EPOCH
+       * separately
+       */
+      ndb_binlog_index->field[NBICOL_NUM_INSERTS]
+        ->store(row->n_inserts, true);
+      ndb_binlog_index->field[NBICOL_NUM_UPDATES]
+        ->store(row->n_updates, true);
+      ndb_binlog_index->field[NBICOL_NUM_DELETES]
+        ->store(row->n_deletes, true);
+      ndb_binlog_index->field[NBICOL_NUM_SCHEMAOPS]
+        ->store(row->n_schemaops, true);
+      ndb_binlog_index->field[NBICOL_ORIG_SERVERID]
+        ->store(orig_server_id= row->orig_server_id, true);
+      ndb_binlog_index->field[NBICOL_ORIG_EPOCH]
+        ->store(orig_epoch= row->orig_epoch, true);
+      ndb_binlog_index->field[NBICOL_GCI]
+        ->store(first->gci, true);
+
+      if (ndb_binlog_index->s->fields > NBICOL_NEXT_POS)
+      {
+        /* Table has next log pos fields, fill them in */
+        ndb_binlog_index->field[NBICOL_NEXT_POS]
+          ->store(first->next_master_log_pos, true);
+        ndb_binlog_index->field[NBICOL_NEXT_FILE]
+          ->store(first->next_master_log_file,
+                  (uint)strlen(first->next_master_log_file),
+                  &my_charset_bin);
+      }
       row= row->next;
     }
     else
     {
+      /* Old schema : Table has no separate
+       * ORIG_SERVERID / ORIG_EPOCH columns.
+       * Merge operation counts and write one row
+       */
       while ((row= row->next))
       {
         first->n_inserts+= row->n_inserts;
@@ -3620,16 +3671,20 @@ ndb_binlog_index_table__write_rows(THD *
         first->n_deletes+= row->n_deletes;
         first->n_schemaops+= row->n_schemaops;
       }
-      ndb_binlog_index->field[3]->store((ulonglong)first->n_inserts, true);
-      ndb_binlog_index->field[4]->store((ulonglong)first->n_updates, true);
-      ndb_binlog_index->field[5]->store((ulonglong)first->n_deletes, true);
-      ndb_binlog_index->field[6]->store((ulonglong)first->n_schemaops, true);
+      ndb_binlog_index->field[NBICOL_NUM_INSERTS]
+        ->store((ulonglong)first->n_inserts, true);
+      ndb_binlog_index->field[NBICOL_NUM_UPDATES]
+        ->store((ulonglong)first->n_updates, true);
+      ndb_binlog_index->field[NBICOL_NUM_DELETES]
+        ->store((ulonglong)first->n_deletes, true);
+      ndb_binlog_index->field[NBICOL_NUM_SCHEMAOPS]
+        ->store((ulonglong)first->n_schemaops, true);
     }
 
     if ((error= ndb_binlog_index->file->ha_write_row(ndb_binlog_index->record[0])))
     {
       char tmp[128];
-      if (ndb_binlog_index->s->fields > 7)
+      if (ndb_binlog_index->s->fields > NBICOL_ORIG_SERVERID)
         my_snprintf(tmp, sizeof(tmp), "%u/%u,%u,%u/%u",
                     uint(epoch >> 32), uint(epoch),
                     orig_server_id,
@@ -7252,7 +7307,6 @@ restart_cluster_failure:
           }
       commit_to_binlog:
           thd->proc_info= "Committing events to binlog";
-          injector::transaction::binlog_pos start= trans.start_pos();
           if (int r= trans.commit())
           {
             sql_print_error("NDB Binlog: "
@@ -7260,10 +7314,26 @@ restart_cluster_failure:
                             r);
             /* TODO: Further handling? */
           }
+          injector::transaction::binlog_pos start= trans.start_pos();
+          injector::transaction::binlog_pos next = trans.next_pos();
           rows->gci= (Uint32)(gci >> 32); // Expose gci hi/lo
           rows->epoch= gci;
-          rows->master_log_file= start.file_name();
-          rows->master_log_pos= start.file_pos();
+          rows->start_master_log_file= start.file_name();
+          rows->start_master_log_pos= start.file_pos();
+          if ((next.file_pos() == 0) &&
+              ndb_log_empty_epochs())
+          {
+            /* Empty transaction 'committed' due to log_empty_epochs
+             * therefore no next position
+             */
+            rows->next_master_log_file= start.file_name();
+            rows->next_master_log_pos= start.file_pos();
+          }
+          else
+          {
+            rows->next_master_log_file= next.file_name();
+            rows->next_master_log_pos= next.file_pos();
+          }
 
           DBUG_PRINT("info", ("COMMIT gci: %lu", (ulong) gci));
           if (opt_ndb_log_binlog_index)

=== modified file 'sql/log.cc'
--- a/sql/log.cc	2012-02-13 17:00:22 +0000
+++ b/sql/log.cc	2012-03-28 15:17:32 +0000
@@ -4278,6 +4278,21 @@ MYSQL_BIN_LOG::remove_pending_rows_event
   DBUG_RETURN(0);
 }
 
+#ifndef MCP_BUG54854
+/*
+  Updates thd's position-of-next-event variables
+  after a *real* write a file.
+ */
+void MYSQL_BIN_LOG::update_thd_next_event_pos(THD* thd)
+{
+  if (likely(thd != NULL))
+  {
+    thd->set_next_event_pos(log_file_name,
+                            my_b_tell(&log_file));
+  }
+}
+#endif
+
 /*
   Moves the last bunch of rows from the pending Rows event to the binlog
   (either cached binlog if transaction, or disk binlog). Sets a new pending
@@ -4337,6 +4352,9 @@ MYSQL_BIN_LOG::flush_and_set_pending_row
         signal_update();
         error= rotate_and_purge(RP_LOCK_LOG_IS_ALREADY_LOCKED);
       }
+#ifndef MCP_BUG54854
+      update_thd_next_event_pos(thd);
+#endif
     }
 
     pthread_mutex_unlock(&LOCK_log);
@@ -4535,6 +4553,9 @@ bool MYSQL_BIN_LOG::write(Log_event *eve
       if ((error= rotate_and_purge(RP_LOCK_LOG_IS_ALREADY_LOCKED)))
         goto err;
       
+#ifndef MCP_BUG54854
+      update_thd_next_event_pos(thd);
+#endif
     }
     error=0;
 
@@ -4874,6 +4895,10 @@ bool MYSQL_BIN_LOG::write_incident(THD *
       signal_update();
       error= rotate_and_purge(RP_LOCK_LOG_IS_ALREADY_LOCKED);
     }
+#ifndef MCP_BUG54854
+    update_thd_next_event_pos(thd);
+#endif
+
     pthread_mutex_unlock(&LOCK_log);
   }
   DBUG_RETURN(error);
@@ -4986,6 +5011,10 @@ bool MYSQL_BIN_LOG::write(THD *thd, IO_C
     else
       if (rotate_and_purge(RP_LOCK_LOG_IS_ALREADY_LOCKED))
         goto err;
+
+#ifndef MCP_BUG54854
+    update_thd_next_event_pos(thd);
+#endif
   }
   VOID(pthread_mutex_unlock(&LOCK_log));
 

=== modified file 'sql/log.h'
--- a/sql/log.h	2011-06-30 15:55:35 +0000
+++ b/sql/log.h	2012-03-28 15:17:32 +0000
@@ -27,6 +27,23 @@ bool trans_has_updated_non_trans_table(c
 bool trans_has_no_stmt_committed(const THD* thd, const bool all);
 bool stmt_has_updated_non_trans_table(const THD* thd);
 
+#ifndef MCP_BUG54854
+/**
+  the struct aggregates two paramenters that identify an event
+  uniquely in scope of communication of a particular master and slave couple.
+  I.e there can not be 2 events from the same staying connected master which
+  have the same coordinates.
+  @note
+  Such identifier is not yet unique generally as the event originating master
+  is resetable. Also the crashed master can be replaced with some other.
+*/
+struct event_coordinates
+{
+  char * file_name; // binlog file name (directories stripped)
+  my_off_t  pos;       // event's position in the binlog file
+};
+#endif
+
 /*
   Transaction Coordinator log - a base abstract class
   for two different implementations
@@ -319,6 +336,9 @@ public:
   int unlog(ulong cookie, my_xid xid);
   int recover(IO_CACHE *log, Format_description_log_event *fdle);
 #if !defined(MYSQL_CLIENT)
+#ifndef MCP_BUG54854
+  void update_thd_next_event_pos(THD *thd);
+#endif
   int flush_and_set_pending_rows_event(THD *thd, Rows_log_event* event);
   int remove_pending_rows_event(THD *thd);
 

=== modified file 'sql/log_event.h'
--- a/sql/log_event.h	2011-09-21 10:11:58 +0000
+++ b/sql/log_event.h	2012-03-28 15:17:32 +0000
@@ -754,6 +754,7 @@ typedef struct st_print_event_info
 } PRINT_EVENT_INFO;
 #endif
 
+#ifdef MCP_BUG54854
 /**
   the struct aggregates two paramenters that identify an event
   uniquely in scope of communication of a particular master and slave couple.
@@ -768,6 +769,7 @@ struct event_coordinates
   char * file_name; // binlog file name (directories stripped)
   my_off_t  pos;       // event's position in the binlog file
 };
+#endif
 
 /**
   @class Log_event

=== modified file 'sql/rpl_injector.cc'
--- a/sql/rpl_injector.cc	2011-09-21 10:11:58 +0000
+++ b/sql/rpl_injector.cc	2012-03-28 15:17:32 +0000
@@ -37,6 +37,30 @@ injector::transaction::transaction(MYSQL
   m_start_pos.m_file_name= my_strdup(log_info.log_file_name, MYF(0));
   m_start_pos.m_file_pos= log_info.pos;
 
+#ifndef MCP_BUG54854
+  if (unlikely(m_start_pos.m_file_name == NULL))
+  {
+    m_thd= NULL;
+    return;
+  }
+
+  /*
+     Next pos is unknown until after commit of the Binlog transaction
+  */
+  m_next_pos.m_file_name= 0;
+  m_next_pos.m_file_pos= 0;
+
+  /*
+    Ensure we don't pickup this thd's last written Binlog pos in
+    empty-transaction-commit cases
+    This is not ideal, as it zaps this information for any other
+    usage (e.g. WL4047)
+    Potential improvement : save the 'old' next pos prior to
+    commit, and restore on error.
+  */
+  m_thd->clear_next_event_pos();
+#endif
+
   begin_trans(m_thd);
 
   thd->set_current_stmt_binlog_row_based();
@@ -47,17 +71,21 @@ injector::transaction::~transaction()
   if (!good())
     return;
 
+#ifndef MCP_BUG54854
   /* Needed since my_free expects a 'char*' (instead of 'void*'). */
-  char* const the_memory= const_cast<char*>(m_start_pos.m_file_name);
-
-  /*
-    We set the first character to null just to give all the copies of the
-    start position a (minimal) chance of seening that the memory is lost.
-    All assuming the my_free does not step over the memory, of course.
-  */
-  *the_memory= '\0';
+  char* const start_pos_memory= const_cast<char*>(m_start_pos.m_file_name);
 
-  my_free(the_memory, MYF(0));
+  if (start_pos_memory)
+  {
+    my_free(start_pos_memory, MYF(0));
+  }
+
+  char* const next_pos_memory= const_cast<char*>(m_next_pos.m_file_name);
+  if (next_pos_memory)
+  {
+    my_free(next_pos_memory, MYF(0));
+  }
+#endif
 }
 
 /**
@@ -89,6 +117,24 @@ int injector::transaction::commit()
    */
    error |= ha_autocommit_or_rollback(m_thd, error);
    end_trans(m_thd, error ? ROLLBACK : COMMIT);
+
+#ifndef MCP_BUG54854
+   /* Copy next position out into our next pos member */
+   if ((error == 0) &&
+       (m_thd->binlog_next_event_pos.file_name != NULL) &&
+       ((m_next_pos.m_file_name=
+         my_strdup(m_thd->binlog_next_event_pos.file_name, MYF(0))) != NULL))
+   {
+     m_next_pos.m_file_pos= m_thd->binlog_next_event_pos.pos;
+   }
+   else
+   {
+     /* Error, problem copying etc. */
+     m_next_pos.m_file_name= NULL;
+     m_next_pos.m_file_pos= 0;
+   }
+#endif
+
    DBUG_RETURN(error);
 }
 
@@ -230,6 +276,13 @@ injector::transaction::binlog_pos inject
    return m_start_pos;			
 }
 
+#ifndef MCP_BUG54854
+injector::transaction::binlog_pos injector::transaction::next_pos() const
+{
+   return m_next_pos;
+}
+#endif
+
 
 /*
   injector - member definitions

=== modified file 'sql/rpl_injector.h'
--- a/sql/rpl_injector.h	2011-09-21 10:11:58 +0000
+++ b/sql/rpl_injector.h	2012-03-28 15:17:32 +0000
@@ -241,17 +241,37 @@ public:
       int rollback();
 #endif
 
+#ifndef MCP_BUG54854
       /*
         Get the position for the start of the transaction.
 
-        Returns the position in the binary log of the first event in this
-        transaction. If no event is yet written, the position where the event
-        *will* be written is returned. This position is known, since a
-        new_transaction() will lock the binary log and prevent any other
-        writes to the binary log.
+        This is the current 'tail of Binlog' at the time the transaction
+        was started.  The first event recorded by the transaction may
+        be at this, or some subsequent position.  The first event recorded
+        by the transaction will not be before this position.
       */
+#endif
       binlog_pos start_pos() const;
 
+#ifndef MCP_BUG54854
+      /*
+        Get the next position after the end of the transaction
+
+        This call is only valid after a transaction has been committed.
+        It returns the next Binlog position after the committed transaction.
+        It is guaranteed that no other events will be recorded between the
+        COMMIT event of the Binlog transaction, and this position.
+        Note that this position may be in a different log file to the COMMIT
+        event.
+
+        If the commit had an error, or the transaction was empty and nothing
+        was binlogged then the next_pos will have a NULL file_name(), and
+        0 file_pos().
+
+      */
+      binlog_pos next_pos() const;
+#endif
+
     private:
       /* Only the injector may construct these object */
       transaction(MYSQL_BIN_LOG *, THD *);
@@ -264,6 +284,15 @@ public:
           o.m_start_pos= tmp;
         }
 
+#ifndef MCP_BUG54854
+        /* std::swap(m_end_pos, o.m_end_pos); */
+        {
+          binlog_pos const tmp= m_next_pos;
+          m_next_pos= o.m_next_pos;
+          o.m_next_pos= tmp;
+        }
+#endif
+
         /* std::swap(m_thd, o.m_thd); */
         {
           THD* const tmp= m_thd;
@@ -336,6 +365,9 @@ public:
 
 
       binlog_pos m_start_pos;
+#ifndef MCP_BUG54854
+      binlog_pos m_next_pos;
+#endif
       THD *m_thd;
     };
 

=== modified file 'sql/sql_class.cc'
--- a/sql/sql_class.cc	2012-02-13 14:45:39 +0000
+++ b/sql/sql_class.cc	2012-03-28 15:17:32 +0000
@@ -748,6 +748,11 @@ THD::THD()
   m_binlog_invoker= FALSE;
   memset(&invoker_user, 0, sizeof(invoker_user));
   memset(&invoker_host, 0, sizeof(invoker_host));
+
+#ifndef MCP_BUG54854
+  binlog_next_event_pos.file_name= NULL;
+  binlog_next_event_pos.pos= 0;
+#endif
 }
 
 
@@ -1041,6 +1046,10 @@ THD::~THD()
 
   plugin_thdvar_cleanup(this);
 
+#ifndef MCP_BUG54854
+  clear_next_event_pos();
+#endif
+
   DBUG_PRINT("info", ("freeing security context"));
   main_security_ctx.destroy();
   safeFree(db);
@@ -4172,6 +4181,35 @@ THD::binlog_row_event_extra_data_eq(cons
 }
 #endif
 
+#ifndef MCP_BUG54854
+void THD::set_next_event_pos(const char* _filename, ulonglong _pos)
+{
+  char*& filename= binlog_next_event_pos.file_name;
+  if (filename == NULL)
+  {
+    /* First time, allocate maximal buffer */
+    filename= (char*) my_malloc(FN_REFLEN+1, MYF(MY_WME));
+    if (filename == NULL) return;
+  }
+
+  assert(strlen(_filename) <= FN_REFLEN);
+  strcpy(filename, _filename);
+  filename[ FN_REFLEN ]= 0;
+
+  binlog_next_event_pos.pos= _pos;
+};
+
+void THD::clear_next_event_pos()
+{
+  if (binlog_next_event_pos.file_name != NULL)
+  {
+    my_free(binlog_next_event_pos.file_name, MYF(MY_WME));
+  }
+  binlog_next_event_pos.file_name= NULL;
+  binlog_next_event_pos.pos= 0;
+};
+#endif
+
 bool Discrete_intervals_list::append(ulonglong start, ulonglong val,
                                  ulonglong incr)
 {

=== modified file 'sql/sql_class.h'
--- a/sql/sql_class.h	2012-02-13 14:45:39 +0000
+++ b/sql/sql_class.h	2012-03-28 15:17:32 +0000
@@ -1446,6 +1446,17 @@ public:
                                              const uchar* b);
 #endif
 
+#ifndef MCP_BUG54854
+  /*
+    Position of first event in Binlog
+    *after* last event written by this
+    thread.
+  */
+  event_coordinates binlog_next_event_pos;
+  void set_next_event_pos(const char* _filename, ulonglong _pos);
+  void clear_next_event_pos();
+#endif
+
 #ifndef MYSQL_CLIENT
   int binlog_setup_trx_data();
 

No bundle (reason: useless for push emails).
Thread
bzr push into mysql-5.1-telco-7.0 branch (frazer.clement:4907 to 4908)Bug#54854 Bug#11762277Frazer Clement29 Mar