List:Commits« Previous MessageNext Message »
From:Frazer Clement Date:January 17 2012 2:11pm
Subject:bzr push into mysql-5.1-telco-7.0 branch (frazer.clement:4796 to 4798)
View as plain text  
 4798 Frazer Clement	2012-01-17
        Refactor ndb_apply_status write_row generation
        
        Existing code splits row buffer initialisation from setting 
        and writing.  Looks like an optimisation that's been broken.
        
        This patch pulls out the buffer preparation and writing into
        a method, and calls it from one place, chipping some code and 
        locals off the Binlog Injector monolith.

    modified:
      sql/ha_ndbcluster_binlog.cc
 4797 Frazer Clement	2012-01-17
      Bug#13578660 NDB REPLICATION : CONFLICT DETECTION NOT ENABLED ON ALL CONNECTED
       MYSQLDS
        
        Conflict function setup was using a TABLE* object 
        (MySQLD table representation) to determine the offsets
        of the resolve column in the table record for NDB$MAX and
        NDB$OLD, and the primary key column offsets for exceptions
        table inserts.
        
        Conflict function setup is part of binlog setup and occurs
        as a result of :
         - Table creation
         - Table 'discovery' due to observed ndb_schema notifications
         - Table 'discovery' due to SHOW TABLES
         - Table 'discovery' at server startup
        
        In some of these situations, the calling context has no MySQLD
        Table* for the table, and so null is passed.  As the information
        to setup conflict detection was not available, it was not set up!
        
        Fix
        
        Remove the use of a TABLE* when setting up conflict detection.
        Parts :
         1) Store resolve column number in NDB_SHARE rather than a record
            offset
         2) Store exception's table pk attrids in NDB_SHARE rather than
            record offsets
         3) Modify conflict function definition code to use resolve column
            number and NdbRecord to determine the correct offset.
         4) Modify exceptions table insert code to use pk attrids and 
            NdbRecord to determine the correct offsets.
         5) Avoid using TABLE* members for error messages etc
         6) Remove TABLE* parameter from calls
         7) Remove check for TABLE*==NULL
        
        Additional items
        
        Cleanup/improve reporting of conflict function setup.
        Use sufficient bits to encode resolve column attrid.
        Add testcase to verify conflict table setup distribution.

    added:
      mysql-test/suite/ndb_rpl/t/show_mysqld_warnings.inc
    modified:
      mysql-test/suite/ndb_rpl/r/ndb_rpl_conflict.result
      mysql-test/suite/ndb_rpl/r/ndb_rpl_rep_error.result
      mysql-test/suite/ndb_rpl/t/ndb_rpl_conflict.test
      mysql-test/suite/ndb_rpl/t/ndb_rpl_conflict_epoch.test
      mysql-test/suite/ndb_rpl/t/ndb_rpl_rep_error.test
      sql/ha_ndbcluster.cc
      sql/ha_ndbcluster.h
      sql/ha_ndbcluster_binlog.cc
      sql/ha_ndbcluster_binlog.h
 4796 Jonas Oreland	2012-01-16
      ndb - factor out TransporterReceiveHandle from TransporterCallbackKernel and put mt.cpp version on stack of receive-thread. Add getter so that block/unblock-receive continues to work (error insert)

    modified:
      storage/ndb/include/kernel/signaldata/SignalDroppedRep.hpp
      storage/ndb/include/transporter/TransporterRegistry.hpp
      storage/ndb/src/kernel/blocks/trpman.cpp
      storage/ndb/src/kernel/vm/TransporterCallback.cpp
      storage/ndb/src/kernel/vm/TransporterCallbackKernel.hpp
      storage/ndb/src/kernel/vm/mt.cpp
      storage/ndb/src/kernel/vm/mt.hpp
=== modified file 'mysql-test/suite/ndb_rpl/r/ndb_rpl_conflict.result'
--- a/mysql-test/suite/ndb_rpl/r/ndb_rpl_conflict.result	2012-01-12 02:26:21 +0000
+++ b/mysql-test/suite/ndb_rpl/r/ndb_rpl_conflict.result	2012-01-17 13:51:35 +0000
@@ -229,7 +229,51 @@ select server_id, master_server_id, coun
 server_id	master_server_id	count	a
 2	1	1	3
 drop table t2_max, t2_max$EX;
+drop table t1_old, `t1_old$EX`, t1_max, `t1_max$EX`, t1_max_delete_win, `t1_max_delete_win$EX`;
+delete from mysql.ndb_replication;
+Test that online table distribution sets up conflict functions and exceptions tables
+insert into mysql.ndb_replication values ("test", "t1allsame", 0, 7, "NDB$MAX(X)");
+insert into mysql.ndb_replication values ("test", "t2diffex", 1, 7, "NDB$OLD(X)");
+insert into mysql.ndb_replication values ("test", "t2diffex", 3, 7, "NDB$MAX(X)");
+insert into mysql.ndb_replication values ("test", "t3oneex", 3, 7, "NDB$EPOCH()");
+create table t2diffex$EX (
+server_id int unsigned,
+master_server_id int unsigned,
+master_epoch bigint unsigned,
+count int unsigned,
+a int not null,
+primary key(server_id, master_server_id, master_epoch, count)) engine ndb;
+create table t3oneex$EX (
+server_id int unsigned,
+master_server_id int unsigned,
+master_epoch bigint unsigned,
+count int unsigned,
+a int not null,
+primary key(server_id, master_server_id, master_epoch, count)) engine ndb;
+create table t1allsame(a int primary key, b varchar(200), X int unsigned) engine=ndb;
+create table t2diffex(a int primary key, b varchar(200), X int unsigned) engine=ndb;
+create table t3oneex(a int primary key, b varchar(200)) engine=ndb;
+show variables like 'server_id';
+Variable_name	Value
+server_id	1
+MySQLD error output for server 1.1 matching pattern %NDB Slave%
+relevant
+[note] ndb slave: table test.t2diffex using conflict_fn ndb$old on attribute x.
+[note] ndb slave: table test.t2diffex logging exceptions to test.t2diffex$ex
+[note] ndb slave: table test.t1allsame using conflict_fn ndb$max on attribute x.
+[note] ndb slave: table test.t2_max using conflict_fn ndb$max on attribute x.
+show variables like 'server_id';
+Variable_name	Value
+server_id	3
+MySQLD error output for server 2.1 matching pattern %NDB Slave%
+relevant
+[warning] ndb slave: table test.t3oneex : no extra row author bits in table.
+[note] ndb slave: table test.t3oneex : cft_ndb_epoch[_trans], low epoch resolution
+[note] ndb slave: table test.t2diffex using conflict_fn ndb$max on attribute x.
+[note] ndb slave: table test.t2diffex logging exceptions to test.t2diffex$ex
+[note] ndb slave: table test.t1allsame using conflict_fn ndb$max on attribute x.
+[note] ndb slave: table test.t2_max using conflict_fn ndb$max on attribute x.
+drop table t3oneex, t2diffex, t1allsame, t3oneex$EX, t2diffex$EX;
 "Cleanup"
 drop table mysql.ndb_replication;
-drop table t1_old, `t1_old$EX`, t1_max, `t1_max$EX`, t1_max_delete_win, `t1_max_delete_win$EX`;
 include/rpl_end.inc

=== modified file 'mysql-test/suite/ndb_rpl/r/ndb_rpl_rep_error.result'
--- a/mysql-test/suite/ndb_rpl/r/ndb_rpl_rep_error.result	2012-01-12 02:26:21 +0000
+++ b/mysql-test/suite/ndb_rpl/r/ndb_rpl_rep_error.result	2012-01-17 13:51:35 +0000
@@ -15,7 +15,7 @@ delete from mysql.ndb_replication;
 insert into mysql.ndb_replication values ("test", "t1", 0, NULL, "NDB$MAX(X)");
 create table t1 (a int key, X int) engine ndb;
 Warnings:
-Error	1627	Error in parsing conflict function. Message: column 'X' has wrong datatype
+Error	1627	Error in parsing conflict function. Message: Column 'X' has wrong datatype
 drop table t1;
 delete from mysql.ndb_replication;
 insert into mysql.ndb_replication values ("test", "t1", 0, NULL, "NDB$MAX()");

=== modified file 'mysql-test/suite/ndb_rpl/t/ndb_rpl_conflict.test'
--- a/mysql-test/suite/ndb_rpl/t/ndb_rpl_conflict.test	2012-01-12 02:26:21 +0000
+++ b/mysql-test/suite/ndb_rpl/t/ndb_rpl_conflict.test	2012-01-17 13:51:35 +0000
@@ -2,7 +2,7 @@
 # Test engine native conflict resolution for ndb
 #
 #
---source include/have_ndb.inc
+--source include/have_multi_ndb.inc
 --source include/have_binlog_format_mixed_or_row.inc
 --source suite/ndb_rpl/ndb_master-slave.inc
 
@@ -307,14 +307,77 @@ select server_id, master_server_id, coun
 
 --connection master
 drop table t2_max, t2_max$EX;
+drop table t1_old, `t1_old$EX`, t1_max, `t1_max$EX`, t1_max_delete_win, `t1_max_delete_win$EX`;
+delete from mysql.ndb_replication;
+
+--echo Test that online table distribution sets up conflict functions and exceptions tables
+
+# t1allsame - Same on all servers, no exceptions table
+insert into mysql.ndb_replication values ("test", "t1allsame", 0, 7, "NDB$MAX(X)");
+
+# t2diffex - Different on each server with an exceptions table
+# Not a recommended configuration!
+insert into mysql.ndb_replication values ("test", "t2diffex", 1, 7, "NDB$OLD(X)");
+insert into mysql.ndb_replication values ("test", "t2diffex", 3, 7, "NDB$MAX(X)");
+
+# t3oneex - Only on one server with an exceptions table
+# Note that it's not defined on the server where it's created (not recommended)
+# so on the server where it's defined, we get an error due to having no extra
+# author bits
+#
+insert into mysql.ndb_replication values ("test", "t3oneex", 3, 7, "NDB$EPOCH()");
+
+# Create exception tables
+create table t2diffex$EX (
+   server_id int unsigned,
+   master_server_id int unsigned,
+   master_epoch bigint unsigned,
+   count int unsigned,
+   a int not null,
+   primary key(server_id, master_server_id, master_epoch, count)) engine ndb;
+
+create table t3oneex$EX (
+   server_id int unsigned,
+   master_server_id int unsigned,
+   master_epoch bigint unsigned,
+   count int unsigned,
+   a int not null,
+   primary key(server_id, master_server_id, master_epoch, count)) engine ndb;
 
+# Now create actual tables on this server (id 1)
+create table t1allsame(a int primary key, b varchar(200), X int unsigned) engine=ndb;
+create table t2diffex(a int primary key, b varchar(200), X int unsigned) engine=ndb;
+create table t3oneex(a int primary key, b varchar(200)) engine=ndb;
+
+# Now examine server logs to see that conflict detection and
+# exceptions tables were setup as expected
+show variables like 'server_id';
+--let $server_num=1.1
+--let $pattern=%NDB Slave%
+--let $limit=4
+--source suite/ndb_rpl/t/show_mysqld_warnings.inc
+
+
+--connection server2
+--disable_query_log
+call mtr.add_suppression("NDB Slave: .* No extra row author bits in table.*");
+--enable_query_log
+
+show variables like 'server_id';
+--let $server_num=2.1
+--let $pattern=%NDB Slave%
+--let $limit=6
+
+--source suite/ndb_rpl/t/show_mysqld_warnings.inc
+
+--connection master
+drop table t3oneex, t2diffex, t1allsame, t3oneex$EX, t2diffex$EX;
 
 ###############
 --echo "Cleanup"
 
 --connection master
 drop table mysql.ndb_replication;
-drop table t1_old, `t1_old$EX`, t1_max, `t1_max$EX`, t1_max_delete_win, `t1_max_delete_win$EX`;
 --sync_slave_with_master
 
 --source include/rpl_end.inc

=== modified file 'mysql-test/suite/ndb_rpl/t/ndb_rpl_conflict_epoch.test'
--- a/mysql-test/suite/ndb_rpl/t/ndb_rpl_conflict_epoch.test	2012-01-12 02:26:21 +0000
+++ b/mysql-test/suite/ndb_rpl/t/ndb_rpl_conflict_epoch.test	2012-01-17 13:51:35 +0000
@@ -231,6 +231,11 @@ insert into mysql.ndb_replication values
   ("test", "t3", 0, 7, "NDB\$EPOCH(32)"),
   ("test", "t4", 0, 7, "NDB\$EPOCH(-1)");
 
+--disable_query_log
+# Only need suppress here, as table creation fails due to this.
+call mtr.add_suppression("NDB Slave: .* Too many extra Gci bits at .*");
+--enable_query_log
+
 --error 1005
 create table test.t3 (a int primary key) engine=ndb;
 show warnings;

=== modified file 'mysql-test/suite/ndb_rpl/t/ndb_rpl_rep_error.test'
--- a/mysql-test/suite/ndb_rpl/t/ndb_rpl_rep_error.test	2012-01-12 02:26:21 +0000
+++ b/mysql-test/suite/ndb_rpl/t/ndb_rpl_rep_error.test	2012-01-17 13:51:35 +0000
@@ -2,7 +2,7 @@
 # Some negative tests of the ndb_replication table
 #
 #
---source include/have_ndb.inc
+--source include/have_multi_ndb.inc
 --source include/have_binlog_format_mixed_or_row.inc
 --source suite/ndb_rpl/ndb_master-slave.inc
 
@@ -46,8 +46,25 @@ CREATE TABLE mysql.ndb_replication
 --enable_warnings
 --enable_query_log
 
+# Need suppressions on all servers where warnings/errors can be seen.
+--disable_query_log
+--connection server1
+call mtr.add_suppression("NDB Slave: .* unknown conflict resolution function .*");
+call mtr.add_suppression("NDB Slave: .* has wrong datatype.*");
+call mtr.add_suppression("NDB Slave: .* missing function argument .*");
+call mtr.add_suppression("NDB Slave: .* missing \')\' .*");
+--connection server2
+call mtr.add_suppression("NDB Slave: .* unknown conflict resolution function .*");
+call mtr.add_suppression("NDB Slave: .* has wrong datatype.*");
+call mtr.add_suppression("NDB Slave: .* missing function argument .*");
+call mtr.add_suppression("NDB Slave: .* missing \')\' .*");
+--connection default
+--enable_query_log
+
 # Non existant conflict_fn
 # gives error when creating table
+#call mtr.add_suppression("NDB Slave: .* unknown conflict resolution function .*");
+
 insert into mysql.ndb_replication values ("test", "t1", 0, NULL, "NDB$X(X)");
 --error 1005
 create table t1 (a int key, X int) engine ndb;
@@ -56,6 +73,8 @@ delete from mysql.ndb_replication;
 
 # Column type cannot be used for this function
 # gives warning when creating table
+#call mtr.add_suppression("NDB Slave: .* has wrong datatype.*");
+
 insert into mysql.ndb_replication values ("test", "t1", 0, NULL, "NDB$MAX(X)");
 create table t1 (a int key, X int) engine ndb;
 drop table t1;
@@ -63,6 +82,8 @@ delete from mysql.ndb_replication;
 
 # Too few arguments
 # gives error when creating table
+#call mtr.add_suppression("NDB Slave: .* missing function argument .*");
+
 insert into mysql.ndb_replication values ("test", "t1", 0, NULL, "NDB$MAX()");
 --error 1005
 create table t1 (a int key, X int) engine ndb;
@@ -71,6 +92,7 @@ delete from mysql.ndb_replication;
 
 # Too many arguments
 # gives error when creating table
+#call mtr.add_suppression("NDB Slave: .* missing \')\' .*");
 insert into mysql.ndb_replication values ("test", "t1", 0, NULL, "NDB$MAX(X Y)");
 --error 1005
 create table t1 (a int key, X int) engine ndb;

=== added file 'mysql-test/suite/ndb_rpl/t/show_mysqld_warnings.inc'
--- a/mysql-test/suite/ndb_rpl/t/show_mysqld_warnings.inc	1970-01-01 00:00:00 +0000
+++ b/mysql-test/suite/ndb_rpl/t/show_mysqld_warnings.inc	2012-01-17 13:51:35 +0000
@@ -0,0 +1,10 @@
+--disable_query_log
+--echo MySQLD error output for server $server_num matching pattern $pattern
+create table errlog (a int auto_increment primary key, txt text) engine=myisam;
+
+--eval load data local infile "$MYSQLTEST_VARDIR/log/mysqld.$server_num.err" into table errlog (txt);
+
+--eval select replace( lower( right(txt, length(txt) - locate('[',txt) + 1)), '\r', '') as relevant from errlog where txt like '$pattern' order by a desc limit $limit;
+
+drop table errlog;
+--enable_query_log

=== modified file 'sql/ha_ndbcluster.cc'
--- a/sql/ha_ndbcluster.cc	2012-01-12 02:26:21 +0000
+++ b/sql/ha_ndbcluster.cc	2012-01-17 13:51:35 +0000
@@ -2199,7 +2199,7 @@ int ha_ndbcluster::get_metadata(THD *thd
 
 #ifdef HAVE_NDB_BINLOG
   ndbcluster_read_binlog_replication(thd, ndb, m_share, m_table,
-                                     ::server_id, table, FALSE);
+                                     ::server_id, FALSE);
 #endif
 
   DBUG_RETURN(0);
@@ -4805,6 +4805,7 @@ ha_ndbcluster::prepare_conflict_detectio
   {
     res = conflict_fn->prep_func(m_share->m_cfn_share,
                                  op_type,
+                                 m_ndb_record,
                                  old_data,
                                  new_data,
                                  table->write_set,
@@ -5655,7 +5656,10 @@ handle_row_conflict(NDB_CONFLICT_FN_SHAR
         for (k= 0; k < nkey; k++)
         {
           DBUG_ASSERT(pk_row != NULL);
-          const uchar* data= pk_row + cfn_share->m_offset[k];
+          const uchar* data=
+            (const uchar*) NdbDictionary::getValuePtr(key_rec,
+                                                      (const char*) pk_row,
+                                                      cfn_share->m_key_attrids[k]);
           if (ex_op->setValue((Uint32)(fixed_cols + k), (const char*)data) == -1)
           {
             err= ex_op->getNdbError();
@@ -9599,7 +9603,6 @@ int ha_ndbcluster::create(const char *na
                                                           m_dbname,
                                                           m_tabname,
                                                           ::server_id,
-                                                          form,
                                                           &binlog_flags,
                                                           &conflict_fn,
                                                           args,
@@ -10078,7 +10081,6 @@ cleanup_failed:
         ndbcluster_apply_binlog_replication_info(thd,
                                                  share,
                                                  m_table,
-                                                 form,
                                                  conflict_fn,
                                                  args,
                                                  num_args,
@@ -10518,7 +10520,7 @@ int ha_ndbcluster::rename_table(const ch
 #ifdef HAVE_NDB_BINLOG
     if (share)
       ndbcluster_read_binlog_replication(thd, ndb, share, ndbtab,
-                                         ::server_id, NULL, TRUE);
+                                         ::server_id, TRUE);
 #endif
     /* always create an event for the table */
     String event_name(INJECTOR_EVENT_LEN);

=== modified file 'sql/ha_ndbcluster.h'
--- a/sql/ha_ndbcluster.h	2012-01-12 02:26:21 +0000
+++ b/sql/ha_ndbcluster.h	2012-01-17 13:51:35 +0000
@@ -144,11 +144,9 @@ enum enum_conflict_fn_arg_type
 struct st_conflict_fn_arg
 {
   enum_conflict_fn_arg_type type;
-  const char *ptr;
-  uint32 len;
   union
   {
-    uint32 fieldno;      // CFAT_COLUMN_NAME
+    char resolveColNameBuff[ NAME_CHAR_LEN + 1 ]; // CFAT_COLUMN_NAME
     uint32 extraGciBits; // CFAT_EXTRA_GCI_BITS
   };
 };
@@ -176,6 +174,7 @@ enum enum_conflicting_op_type
 */
 typedef int (* prepare_detect_func) (struct st_ndbcluster_conflict_fn_share* cfn_share,
                                      enum_conflicting_op_type op_type,
+                                     const NdbRecord* data_record,
                                      const uchar* old_data,
                                      const uchar* new_data,
                                      const MY_BITMAP* write_set,
@@ -219,16 +218,21 @@ enum enum_conflict_fn_table_flags
   CFF_REFRESH_ROWS = 1
 };
 
+/*
+   Maximum supported key parts (16)
+   (Ndb supports 32, but MySQL has a lower limit)
+*/
+static const int NDB_MAX_KEY_PARTS = MAX_REF_PARTS;
+
 typedef struct st_ndbcluster_conflict_fn_share {
   const st_conflict_fn_def* m_conflict_fn;
 
   /* info about original table */
   uint8 m_pk_cols;
-  uint8 m_resolve_column;
+  uint16 m_resolve_column;
   uint8 m_resolve_size;
   uint8 m_flags;
-  uint16 m_offset[16];
-  uint16 m_resolve_offset;
+  uint16 m_key_attrids[ NDB_MAX_KEY_PARTS ];
 
   const NdbDictionary::Table *m_ex_tab;
   uint32 m_count;

=== modified file 'sql/ha_ndbcluster_binlog.cc'
--- a/sql/ha_ndbcluster_binlog.cc	2012-01-12 02:26:21 +0000
+++ b/sql/ha_ndbcluster_binlog.cc	2012-01-17 13:52:58 +0000
@@ -3847,7 +3847,6 @@ slave_set_resolve_fn(THD *thd, NDB_SHARE
                      const NDBTAB *ndbtab, uint field_index,
                      uint resolve_col_sz,
                      const st_conflict_fn_def* conflict_fn,
-                     TABLE *table,
                      uint8 flags)
 {
   DBUG_ENTER("slave_set_resolve_fn");
@@ -3867,8 +3866,6 @@ slave_set_resolve_fn(THD *thd, NDB_SHARE
   /* Calculate resolve col stuff (if relevant) */
   cfn_share->m_resolve_size= resolve_col_sz;
   cfn_share->m_resolve_column= field_index;
-  cfn_share->m_resolve_offset= (uint16)(table->field[field_index]->ptr -
-                                        table->record[0]);
   cfn_share->m_flags = flags;
 
   {
@@ -3917,8 +3914,11 @@ slave_set_resolve_fn(THD *thd, NDB_SHARE
               col->getNullable() == ex_col->getNullable();
             if (!ok)
               break;
-            cfn_share->m_offset[k]=
-              (uint16)(table->field[i]->ptr - table->record[0]);
+            /*
+               Store mapping of Exception table key# to
+               orig table attrid
+            */
+            cfn_share->m_key_attrids[k]= i;
             k++;
           }
         }
@@ -3929,9 +3929,9 @@ slave_set_resolve_fn(THD *thd, NDB_SHARE
           ndbtab_g.release();
           if (opt_ndb_extra_logging)
             sql_print_information("NDB Slave: Table %s.%s logging exceptions to %s.%s",
-                                  table->s->db.str,
-                                  table->s->table_name.str,
-                                  table->s->db.str,
+                                  share->db,
+                                  share->table_name,
+                                  share->db,
                                   ex_tab_name);
         }
         else
@@ -3964,6 +3964,7 @@ slave_set_resolve_fn(THD *thd, NDB_SHARE
 int
 row_conflict_fn_old(st_ndbcluster_conflict_fn_share* cfn_share,
                     enum_conflicting_op_type op_type,
+                    const NdbRecord* data_record,
                     const uchar* old_data,
                     const uchar* new_data,
                     const MY_BITMAP* write_set,
@@ -3972,7 +3973,10 @@ row_conflict_fn_old(st_ndbcluster_confli
   DBUG_ENTER("row_conflict_fn_old");
   uint32 resolve_column= cfn_share->m_resolve_column;
   uint32 resolve_size= cfn_share->m_resolve_size;
-  const uchar* field_ptr = old_data + cfn_share->m_resolve_offset;
+  const uchar* field_ptr = (const uchar*)
+    NdbDictionary::getValuePtr(data_record,
+                               (const char*) old_data,
+                               cfn_share->m_resolve_column);
 
   assert((resolve_size == 4) || (resolve_size == 8));
 
@@ -4040,6 +4044,7 @@ row_conflict_fn_old(st_ndbcluster_confli
 int
 row_conflict_fn_max_update_only(st_ndbcluster_conflict_fn_share* cfn_share,
                                 enum_conflicting_op_type op_type,
+                                const NdbRecord* data_record,
                                 const uchar* old_data,
                                 const uchar* new_data,
                                 const MY_BITMAP* write_set,
@@ -4048,7 +4053,10 @@ row_conflict_fn_max_update_only(st_ndbcl
   DBUG_ENTER("row_conflict_fn_max_update_only");
   uint32 resolve_column= cfn_share->m_resolve_column;
   uint32 resolve_size= cfn_share->m_resolve_size;
-  const uchar* field_ptr = new_data + cfn_share->m_resolve_offset;
+  const uchar* field_ptr = (const uchar*)
+    NdbDictionary::getValuePtr(data_record,
+                               (const char*) new_data,
+                               cfn_share->m_resolve_column);
 
   assert((resolve_size == 4) || (resolve_size == 8));
 
@@ -4127,6 +4135,7 @@ row_conflict_fn_max_update_only(st_ndbcl
 int
 row_conflict_fn_max(st_ndbcluster_conflict_fn_share* cfn_share,
                     enum_conflicting_op_type op_type,
+                    const NdbRecord* data_record,
                     const uchar* old_data,
                     const uchar* new_data,
                     const MY_BITMAP* write_set,
@@ -4140,6 +4149,7 @@ row_conflict_fn_max(st_ndbcluster_confli
   case UPDATE_ROW:
     return row_conflict_fn_max_update_only(cfn_share,
                                            op_type,
+                                           data_record,
                                            old_data,
                                            new_data,
                                            write_set,
@@ -4151,6 +4161,7 @@ row_conflict_fn_max(st_ndbcluster_confli
      */
     return row_conflict_fn_old(cfn_share,
                                op_type,
+                               data_record,
                                old_data,
                                new_data,
                                write_set,
@@ -4179,6 +4190,7 @@ row_conflict_fn_max(st_ndbcluster_confli
 int
 row_conflict_fn_max_del_win(st_ndbcluster_conflict_fn_share* cfn_share,
                             enum_conflicting_op_type op_type,
+                            const NdbRecord* data_record,
                             const uchar* old_data,
                             const uchar* new_data,
                             const MY_BITMAP* write_set,
@@ -4192,6 +4204,7 @@ row_conflict_fn_max_del_win(st_ndbcluste
   case UPDATE_ROW:
     return row_conflict_fn_max_update_only(cfn_share,
                                            op_type,
+                                           data_record,
                                            old_data,
                                            new_data,
                                            write_set,
@@ -4216,6 +4229,7 @@ row_conflict_fn_max_del_win(st_ndbcluste
 int
 row_conflict_fn_epoch(st_ndbcluster_conflict_fn_share* cfn_share,
                       enum_conflicting_op_type op_type,
+                      const NdbRecord* data_record,
                       const uchar* old_data,
                       const uchar* new_data,
                       const MY_BITMAP* write_set,
@@ -4311,7 +4325,6 @@ parse_conflict_fn_spec(const char* confl
                        const st_conflict_fn_def** conflict_fn,
                        st_conflict_fn_arg* args,
                        Uint32* max_args,
-                       const TABLE* table,
                        char *msg, uint msg_len)
 {
   DBUG_ENTER("parse_conflict_fn_spec");
@@ -4402,9 +4415,6 @@ parse_conflict_fn_spec(const char* confl
 
       uint len= (uint)(end_arg - start_arg);
       args[no_args].type=    type;
-      args[no_args].ptr=     start_arg;
-      args[no_args].len=     len;
-      args[no_args].fieldno= (uint32)-1;
  
       DBUG_PRINT("info", ("found argument %s %u", start_arg, len));
 
@@ -4413,20 +4423,11 @@ parse_conflict_fn_spec(const char* confl
       {
       case CFAT_COLUMN_NAME:
       {
-        /* find column in table */
-        DBUG_PRINT("info", ("searching for %s %u", start_arg, len));
-        TABLE_SHARE *table_s= table->s;
-        for (uint j= 0; j < table_s->fields; j++)
-        {
-          Field *field= table_s->field[j];
-          if (strncmp(start_arg, field->field_name, len) == 0 &&
-              field->field_name[len] == '\0')
-          {
-            DBUG_PRINT("info", ("found %s", field->field_name));
-            args[no_args].fieldno= j;
-            break;
-          }
-        }
+        /* Copy column name out into argument's buffer */
+        char* dest= &args[no_args].resolveColNameBuff[0];
+
+        memcpy(dest, start_arg, MIN(len, (uint)NAME_CHAR_LEN));
+        dest[len]= '\0';
         break;
       }
       case CFAT_EXTRA_GCI_BITS:
@@ -4488,7 +4489,7 @@ parse_conflict_fn_spec(const char* confl
   }
   /* parse error */
   my_snprintf(msg, msg_len, "%s, %s at '%s'",
-           conflict_fn_spec, error_str, ptr);
+              conflict_fn_spec, error_str, ptr);
   DBUG_PRINT("info", ("%s", msg));
   DBUG_RETURN(-1);
 }
@@ -4497,7 +4498,6 @@ static int
 setup_conflict_fn(THD *thd, NDB_SHARE *share,
                   const NDBTAB *ndbtab,
                   char *msg, uint msg_len,
-                  TABLE *table,
                   const st_conflict_fn_def* conflict_fn,
                   const st_conflict_fn_arg* args,
                   const Uint32 num_args)
@@ -4519,38 +4519,65 @@ setup_conflict_fn(THD *thd, NDB_SHARE *s
       DBUG_RETURN(-1);
     }
 
+    /* Now try to find the column in the table */
+    int colNum = -1;
+    const char* resolveColName = args[0].resolveColNameBuff;
+    int resolveColNameLen = (int)strlen(resolveColName);
+
+    for (int j=0; j< ndbtab->getNoOfColumns(); j++)
+    {
+      const char* colName = ndbtab->getColumn(j)->getName();
+
+      if (strncmp(colName,
+                  resolveColName,
+                  resolveColNameLen) == 0 &&
+          colName[resolveColNameLen] == '\0')
+      {
+        colNum = j;
+        break;
+      }
+    }
+    if (colNum == -1)
+    {
+      my_snprintf(msg, msg_len,
+                  "Could not find resolve column %s.",
+                  resolveColName);
+      DBUG_PRINT("info", ("%s", msg));
+      DBUG_RETURN(-1);
+    }
+
     uint resolve_col_sz= 0;
 
     if (0 == (resolve_col_sz =
-              slave_check_resolve_col_type(ndbtab, args[0].fieldno)))
+              slave_check_resolve_col_type(ndbtab, colNum)))
     {
       /* wrong data type */
       slave_reset_conflict_fn(share);
       my_snprintf(msg, msg_len,
-                  "column '%s' has wrong datatype",
-                  table->s->field[args[0].fieldno]->field_name);
+                  "Column '%s' has wrong datatype",
+                  resolveColName);
       DBUG_PRINT("info", ("%s", msg));
       DBUG_RETURN(-1);
     }
 
     if (slave_set_resolve_fn(thd, share, ndbtab,
-                             args[0].fieldno, resolve_col_sz,
-                             conflict_fn, table, CFF_NONE))
+                             colNum, resolve_col_sz,
+                             conflict_fn, CFF_NONE))
     {
       my_snprintf(msg, msg_len,
-                  "unable to setup conflict resolution using column '%s'",
-                  table->s->field[args[0].fieldno]->field_name);
+                  "Unable to setup conflict resolution using column '%s'",
+                  resolveColName);
       DBUG_PRINT("info", ("%s", msg));
       DBUG_RETURN(-1);
     }
-    if (opt_ndb_extra_logging)
-    {
-       sql_print_information("NDB Slave: Table %s.%s using conflict_fn %s on attribute %s.",
-                             table->s->db.str,
-                             table->s->table_name.str,
-                             conflict_fn->name,
-                             table->s->field[args[0].fieldno]->field_name);
-    }
+
+    /* Success, update message */
+    my_snprintf(msg, msg_len,
+                "NDB Slave: Table %s.%s using conflict_fn %s on attribute %s.",
+                share->db,
+                share->table_name,
+                conflict_fn->name,
+                resolveColName);
     break;
   }
   case CFT_NDB_EPOCH:
@@ -4577,7 +4604,9 @@ setup_conflict_fn(THD *thd, NDB_SHARE *s
      * represent SavePeriod/EpochPeriod
      */
     if (ndbtab->getExtraRowGciBits() == 0)
-      sql_print_information("Ndb Slave : CFT_NDB_EPOCH[_TRANS], low epoch resolution");
+      sql_print_information("NDB Slave: Table %s.%s : CFT_NDB_EPOCH[_TRANS], low epoch resolution",
+                            share->db,
+                            share->table_name);
 
     if (ndbtab->getExtraRowAuthorBits() == 0)
     {
@@ -4589,20 +4618,20 @@ setup_conflict_fn(THD *thd, NDB_SHARE *s
     if (slave_set_resolve_fn(thd, share, ndbtab,
                              0, // field_no
                              0, // resolve_col_sz
-                             conflict_fn, table, CFF_REFRESH_ROWS))
+                             conflict_fn, CFF_REFRESH_ROWS))
     {
       my_snprintf(msg, msg_len,
                   "unable to setup conflict resolution");
       DBUG_PRINT("info", ("%s", msg));
       DBUG_RETURN(-1);
     }
-    if (opt_ndb_extra_logging)
-    {
-      sql_print_information("NDB Slave: Table %s.%s using conflict_fn %s.",
-                            table->s->db.str,
-                            table->s->table_name.str,
-                            conflict_fn->name);
-    }
+    /* Success, update message */
+    my_snprintf(msg, msg_len,
+                "NDB Slave: Table %s.%s using conflict_fn %s.",
+                share->db,
+                share->table_name,
+                conflict_fn->name);
+
     break;
   }
   case CFT_NUMBER_OF_CFTS:
@@ -4908,7 +4937,6 @@ ndbcluster_get_binlog_replication_info(T
                                        const char* db,
                                        const char* table_name,
                                        uint server_id,
-                                       const TABLE *table,
                                        Uint32* binlog_flags,
                                        const st_conflict_fn_def** conflict_fn,
                                        st_conflict_fn_arg* args,
@@ -4956,34 +4984,42 @@ ndbcluster_get_binlog_replication_info(T
     DBUG_RETURN(ER_NDB_REPLICATION_SCHEMA_ERROR);
   }
 
-  if (table != NULL)
+  if (conflict_fn_spec != NULL)
   {
-    if (conflict_fn_spec != NULL)
+    char tmp_buf[FN_REFLEN];
+
+    if (parse_conflict_fn_spec(conflict_fn_spec,
+                               conflict_fn,
+                               args,
+                               num_args,
+                               tmp_buf,
+                               sizeof(tmp_buf)) != 0)
     {
-      char tmp_buf[FN_REFLEN];
+      push_warning_printf(thd, MYSQL_ERROR::WARN_LEVEL_ERROR,
+                          ER_CONFLICT_FN_PARSE_ERROR,
+                          ER(ER_CONFLICT_FN_PARSE_ERROR),
+                          tmp_buf);
 
-      if (parse_conflict_fn_spec(conflict_fn_spec,
-                                 conflict_fn,
-                                 args,
-                                 num_args,
-                                 table,
-                                 tmp_buf,
-                                 sizeof(tmp_buf)) != 0)
+      /*
+         Log as well, useful for contexts where the thd's stack of
+         warnings are ignored
+       */
+      if (opt_ndb_extra_logging)
       {
-        push_warning_printf(thd, MYSQL_ERROR::WARN_LEVEL_ERROR,
-                            ER_CONFLICT_FN_PARSE_ERROR,
-                            ER(ER_CONFLICT_FN_PARSE_ERROR),
-                            tmp_buf);
-        DBUG_RETURN(ER_CONFLICT_FN_PARSE_ERROR);
+        sql_print_warning("NDB Slave: Table %s.%s : Parse error on conflict fn : %s",
+                          db, table_name,
+                          tmp_buf);
       }
-    }
-    else
-    {
-      /* No conflict function specified */
-      conflict_fn= NULL;
-      num_args= 0;
+
+      DBUG_RETURN(ER_CONFLICT_FN_PARSE_ERROR);
     }
   }
+  else
+  {
+    /* No conflict function specified */
+    conflict_fn= NULL;
+    num_args= 0;
+  }
 
   DBUG_RETURN(0);
 }
@@ -4992,7 +5028,6 @@ int
 ndbcluster_apply_binlog_replication_info(THD *thd,
                                          NDB_SHARE *share,
                                          const NDBTAB* ndbtab,
-                                         TABLE* table,
                                          const st_conflict_fn_def* conflict_fn,
                                          const st_conflict_fn_arg* args,
                                          Uint32 num_args,
@@ -5013,15 +5048,32 @@ ndbcluster_apply_binlog_replication_info
     if (setup_conflict_fn(thd, share,
                           ndbtab,
                           tmp_buf, sizeof(tmp_buf),
-                          table,
                           conflict_fn,
                           args,
-                          num_args) != 0)
+                          num_args) == 0)
     {
+      if (opt_ndb_extra_logging)
+      {
+        sql_print_information("%s", tmp_buf);
+      }
+    }
+    else
+    {
+      /*
+        Dump setup failure message to error log
+        for cases where thd warning stack is
+        ignored
+      */
+      sql_print_warning("NDB Slave: Table %s.%s : %s",
+                        share->db,
+                        share->table_name,
+                        tmp_buf);
+
       push_warning_printf(thd, MYSQL_ERROR::WARN_LEVEL_ERROR,
                           ER_CONFLICT_FN_PARSE_ERROR,
                           ER(ER_CONFLICT_FN_PARSE_ERROR),
                           tmp_buf);
+
       DBUG_RETURN(-1);
     }
   }
@@ -5039,7 +5091,6 @@ ndbcluster_read_binlog_replication(THD *
                                    NDB_SHARE *share,
                                    const NDBTAB *ndbtab,
                                    uint server_id,
-                                   TABLE *table,
                                    bool do_set_binlog_flags)
 {
   DBUG_ENTER("ndbcluster_read_binlog_replication");
@@ -5052,7 +5103,6 @@ ndbcluster_read_binlog_replication(THD *
                                               share->db,
                                               share->table_name,
                                               server_id,
-                                              table,
                                               &binlog_flags,
                                               &conflict_fn,
                                               args,
@@ -5060,7 +5110,6 @@ ndbcluster_read_binlog_replication(THD *
       (ndbcluster_apply_binlog_replication_info(thd,
                                                 share,
                                                 ndbtab,
-                                                table,
                                                 conflict_fn,
                                                 args,
                                                 num_args,
@@ -5236,7 +5285,7 @@ int ndbcluster_create_binlog_setup(THD *
     /*
      */
     ndbcluster_read_binlog_replication(thd, ndb, share, ndbtab,
-                                       ::server_id, NULL, TRUE);
+                                       ::server_id, TRUE);
 #endif
     /*
       check if logging turned off for this table
@@ -6708,6 +6757,71 @@ void updateInjectorStats(Ndb* schemaNdb,
     dataNdb->getClientStat(Ndb::EventBytesRecvdCount);
 }
 
+/**
+   injectApplyStatusWriteRow
+
+   Inject a WRITE_ROW event on the ndb_apply_status table into
+   the Binlog.
+   This contains our server_id and the supplied epoch number.
+   When applied on the Slave it gives a transactional position
+   marker
+*/
+static
+bool
+injectApplyStatusWriteRow(injector::transaction& trans,
+                          ulonglong gci)
+{
+  DBUG_ENTER("injectApplyStatusWriteRow");
+  if (ndb_apply_status_share == NULL)
+  {
+    sql_print_error("NDB: Could not get apply status share");
+    DBUG_ASSERT(ndb_apply_status_share != NULL);
+    DBUG_RETURN(false);
+  }
+
+  /* Build row buffer for generated ndb_apply_status
+     WRITE_ROW event
+     First get the relevant table structure.
+  */
+  DBUG_ASSERT(!ndb_apply_status_share->event_data);
+  DBUG_ASSERT(ndb_apply_status_share->op);
+  Ndb_event_data* event_data=
+    (Ndb_event_data *) ndb_apply_status_share->op->getCustomData();
+  DBUG_ASSERT(event_data);
+  DBUG_ASSERT(event_data->table);
+  TABLE* apply_status_table= event_data->table;
+
+  /*
+    Intialize apply_status_table->record[0]
+  */
+  empty_record(apply_status_table);
+
+  apply_status_table->field[0]->store((longlong)::server_id, true);
+  apply_status_table->field[1]->store((longlong)gci, true);
+  apply_status_table->field[2]->store("", 0, &my_charset_bin);
+  apply_status_table->field[3]->store((longlong)0, true);
+  apply_status_table->field[4]->store((longlong)0, true);
+#ifndef DBUG_OFF
+  const LEX_STRING& name= apply_status_table->s->table_name;
+  DBUG_PRINT("info", ("use_table: %.*s",
+                      (int) name.length, name.str));
+#endif
+  injector::transaction::table tbl(apply_status_table, true);
+  int ret = trans.use_table(::server_id, tbl);
+  assert(ret == 0); NDB_IGNORE_VALUE(ret);
+
+  ret= trans.write_row(::server_id,
+                       injector::transaction::table(apply_status_table,
+                                                    true),
+                       &apply_status_table->s->all_set,
+                       apply_status_table->s->fields,
+                       apply_status_table->record[0]);
+
+  assert(ret == 0);
+
+  DBUG_RETURN(true);
+}
+
 enum Binlog_thread_state
 {
   BCCC_running= 0,
@@ -7191,44 +7305,6 @@ restart_cluster_failure:
     {
       DBUG_PRINT("info", ("pollEvents res: %d", res));
       thd->proc_info= "Processing events";
-      uchar apply_status_buf[512];
-      TABLE *apply_status_table= NULL;
-      if (ndb_apply_status_share)
-      {
-        /*
-          We construct the buffer to write the apply status binlog
-          event here, as the table->record[0] buffer is referenced
-          by the apply status event operation, and will be filled
-          with data at the nextEvent call if the first event should
-          happen to be from the apply status table
-        */
-        Ndb_event_data *event_data= ndb_apply_status_share->event_data;
-        if (!event_data)
-        {
-          DBUG_ASSERT(ndb_apply_status_share->op);
-          event_data= 
-            (Ndb_event_data *) ndb_apply_status_share->op->getCustomData();
-          DBUG_ASSERT(event_data);
-        }
-        apply_status_table= event_data->table;
-
-        /* 
-           Intialize apply_status_table->record[0] 
-        */
-        empty_record(apply_status_table);
-
-        apply_status_table->field[0]->store((longlong)::server_id, true);
-        /*
-          gci is added later, just before writing to binlog as gci
-          is unknown here
-        */
-        apply_status_table->field[2]->store("", 0, &my_charset_bin);
-        apply_status_table->field[3]->store((longlong)0, true);
-        apply_status_table->field[4]->store((longlong)0, true);
-        DBUG_ASSERT(sizeof(apply_status_buf) >= apply_status_table->s->reclength);
-        memcpy(apply_status_buf, apply_status_table->record[0],
-               apply_status_table->s->reclength);
-      }
       NdbEventOperation *pOp= i_ndb->nextEvent();
       ndb_binlog_index_row _row;
       ndb_binlog_index_row *rows= &_row;
@@ -7356,35 +7432,11 @@ restart_cluster_failure:
         }
         if (trans.good())
         {
-          if (apply_status_table)
-          {
-#ifndef DBUG_OFF
-            const LEX_STRING& name= apply_status_table->s->table_name;
-            DBUG_PRINT("info", ("use_table: %.*s",
-                                (int) name.length, name.str));
-#endif
-            injector::transaction::table tbl(apply_status_table, true);
-            int ret = trans.use_table(::server_id, tbl);
-            assert(ret == 0); NDB_IGNORE_VALUE(ret);
-
-            /* add the gci to the record */
-            Field *field= apply_status_table->field[1];
-            my_ptrdiff_t row_offset=
-              (my_ptrdiff_t) (apply_status_buf - apply_status_table->record[0]);
-            field->move_field_offset(row_offset);
-            field->store((longlong)gci, true);
-            field->move_field_offset(-row_offset);
-
-            trans.write_row(::server_id,
-                            injector::transaction::table(apply_status_table,
-                                                         true),
-                            &apply_status_table->s->all_set,
-                            apply_status_table->s->fields,
-                            apply_status_buf);
-          }
-          else
+          /* Inject ndb_apply_status WRITE_ROW event */
+          if (!injectApplyStatusWriteRow(trans,
+                                         gci))
           {
-            sql_print_error("NDB: Could not get apply status share");
+            sql_print_error("NDB Binlog: Failed to inject apply status write row");
           }
         }
 #ifdef RUN_NDB_BINLOG_TIMER

=== modified file 'sql/ha_ndbcluster_binlog.h'
--- a/sql/ha_ndbcluster_binlog.h	2012-01-12 02:26:21 +0000
+++ b/sql/ha_ndbcluster_binlog.h	2012-01-17 13:51:35 +0000
@@ -237,7 +237,6 @@ ndbcluster_get_binlog_replication_info(T
                                        const char* db,
                                        const char* table_name,
                                        uint server_id,
-                                       const TABLE *table,
                                        Uint32* binlog_flags,
                                        const st_conflict_fn_def** conflict_fn,
                                        st_conflict_fn_arg* args,
@@ -246,7 +245,6 @@ int
 ndbcluster_apply_binlog_replication_info(THD *thd,
                                          NDB_SHARE *share,
                                          const NDBTAB* ndbtab,
-                                         TABLE* table,
                                          const st_conflict_fn_def* conflict_fn,
                                          const st_conflict_fn_arg* args,
                                          Uint32 num_args,
@@ -257,7 +255,6 @@ ndbcluster_read_binlog_replication(THD *
                                    NDB_SHARE *share,
                                    const NDBTAB *ndbtab,
                                    uint server_id,
-                                   TABLE *table,
                                    bool do_set_binlog_flags);
 #endif
 int ndb_create_table_from_engine(THD *thd, const char *db,

No bundle (reason: useless for push emails).
Thread
bzr push into mysql-5.1-telco-7.0 branch (frazer.clement:4796 to 4798) Frazer Clement17 Jan