From: Frazer Clement Date: January 17 2012 2:10pm Subject: bzr push into mysql-5.1-telco-7.1 branch (frazer.clement:4414 to 4415) List-Archive: http://lists.mysql.com/commits/142429 Message-Id: <201201171411.q0HEB4xT001831@acsmt357.oracle.com> MIME-Version: 1.0 Content-Type: text/plain; charset="us-ascii" Content-Transfer-Encoding: 7bit 4415 Frazer Clement 2012-01-17 [merge] Merge 7.0->7.1 added: mysql-test/suite/ndb_rpl/t/show_mysqld_warnings.inc modified: mysql-test/suite/ndb_rpl/r/ndb_rpl_conflict.result mysql-test/suite/ndb_rpl/r/ndb_rpl_rep_error.result mysql-test/suite/ndb_rpl/t/ndb_rpl_conflict.test mysql-test/suite/ndb_rpl/t/ndb_rpl_conflict_epoch.test mysql-test/suite/ndb_rpl/t/ndb_rpl_rep_error.test sql/ha_ndbcluster.cc sql/ha_ndbcluster.h sql/ha_ndbcluster_binlog.cc sql/ha_ndbcluster_binlog.h 4414 jonas oreland 2012-01-17 [merge] ndb - merge 70 to 71 modified: storage/ndb/include/debugger/SignalLoggerManager.hpp storage/ndb/include/kernel/signaldata/SignalDroppedRep.hpp storage/ndb/include/ndb_types.h.in storage/ndb/include/transporter/TransporterCallback.hpp storage/ndb/include/transporter/TransporterDefinitions.hpp storage/ndb/include/transporter/TransporterRegistry.hpp storage/ndb/src/common/debugger/EventLogger.cpp storage/ndb/src/common/debugger/SignalLoggerManager.cpp storage/ndb/src/common/transporter/Packer.cpp storage/ndb/src/common/transporter/TCP_Transporter.cpp storage/ndb/src/common/transporter/TCP_Transporter.hpp storage/ndb/src/common/transporter/TransporterRegistry.cpp storage/ndb/src/kernel/blocks/cmvmi/Cmvmi.cpp storage/ndb/src/kernel/blocks/dblqh/DblqhMain.cpp storage/ndb/src/kernel/blocks/dbtc/DbtcMain.cpp storage/ndb/src/kernel/blocks/qmgr/QmgrMain.cpp storage/ndb/src/kernel/blocks/trpman.cpp storage/ndb/src/kernel/ndbd.cpp storage/ndb/src/kernel/vm/Configuration.cpp storage/ndb/src/kernel/vm/Emulator.hpp storage/ndb/src/kernel/vm/SectionReader.cpp storage/ndb/src/kernel/vm/SimplePropertiesSection.cpp storage/ndb/src/kernel/vm/TransporterCallback.cpp storage/ndb/src/kernel/vm/TransporterCallbackKernel.hpp storage/ndb/src/kernel/vm/mt.cpp storage/ndb/src/kernel/vm/mt.hpp storage/ndb/src/ndbapi/NdbEventOperationImpl.hpp storage/ndb/src/ndbapi/TransporterFacade.cpp storage/ndb/src/ndbapi/TransporterFacade.hpp === modified file 'mysql-test/suite/ndb_rpl/r/ndb_rpl_conflict.result' --- a/mysql-test/suite/ndb_rpl/r/ndb_rpl_conflict.result 2012-01-12 02:26:21 +0000 +++ b/mysql-test/suite/ndb_rpl/r/ndb_rpl_conflict.result 2012-01-17 13:51:35 +0000 @@ -229,7 +229,51 @@ select server_id, master_server_id, coun server_id master_server_id count a 2 1 1 3 drop table t2_max, t2_max$EX; +drop table t1_old, `t1_old$EX`, t1_max, `t1_max$EX`, t1_max_delete_win, `t1_max_delete_win$EX`; +delete from mysql.ndb_replication; +Test that online table distribution sets up conflict functions and exceptions tables +insert into mysql.ndb_replication values ("test", "t1allsame", 0, 7, "NDB$MAX(X)"); +insert into mysql.ndb_replication values ("test", "t2diffex", 1, 7, "NDB$OLD(X)"); +insert into mysql.ndb_replication values ("test", "t2diffex", 3, 7, "NDB$MAX(X)"); +insert into mysql.ndb_replication values ("test", "t3oneex", 3, 7, "NDB$EPOCH()"); +create table t2diffex$EX ( +server_id int unsigned, +master_server_id int unsigned, +master_epoch bigint unsigned, +count int unsigned, +a int not null, +primary key(server_id, master_server_id, master_epoch, count)) engine ndb; +create table t3oneex$EX ( +server_id int unsigned, +master_server_id int unsigned, +master_epoch bigint unsigned, +count int unsigned, +a int not null, +primary key(server_id, master_server_id, master_epoch, count)) engine ndb; +create table t1allsame(a int primary key, b varchar(200), X int unsigned) engine=ndb; +create table t2diffex(a int primary key, b varchar(200), X int unsigned) engine=ndb; +create table t3oneex(a int primary key, b varchar(200)) engine=ndb; +show variables like 'server_id'; +Variable_name Value +server_id 1 +MySQLD error output for server 1.1 matching pattern %NDB Slave% +relevant +[note] ndb slave: table test.t2diffex using conflict_fn ndb$old on attribute x. +[note] ndb slave: table test.t2diffex logging exceptions to test.t2diffex$ex +[note] ndb slave: table test.t1allsame using conflict_fn ndb$max on attribute x. +[note] ndb slave: table test.t2_max using conflict_fn ndb$max on attribute x. +show variables like 'server_id'; +Variable_name Value +server_id 3 +MySQLD error output for server 2.1 matching pattern %NDB Slave% +relevant +[warning] ndb slave: table test.t3oneex : no extra row author bits in table. +[note] ndb slave: table test.t3oneex : cft_ndb_epoch[_trans], low epoch resolution +[note] ndb slave: table test.t2diffex using conflict_fn ndb$max on attribute x. +[note] ndb slave: table test.t2diffex logging exceptions to test.t2diffex$ex +[note] ndb slave: table test.t1allsame using conflict_fn ndb$max on attribute x. +[note] ndb slave: table test.t2_max using conflict_fn ndb$max on attribute x. +drop table t3oneex, t2diffex, t1allsame, t3oneex$EX, t2diffex$EX; "Cleanup" drop table mysql.ndb_replication; -drop table t1_old, `t1_old$EX`, t1_max, `t1_max$EX`, t1_max_delete_win, `t1_max_delete_win$EX`; include/rpl_end.inc === modified file 'mysql-test/suite/ndb_rpl/r/ndb_rpl_rep_error.result' --- a/mysql-test/suite/ndb_rpl/r/ndb_rpl_rep_error.result 2012-01-12 02:26:21 +0000 +++ b/mysql-test/suite/ndb_rpl/r/ndb_rpl_rep_error.result 2012-01-17 13:51:35 +0000 @@ -15,7 +15,7 @@ delete from mysql.ndb_replication; insert into mysql.ndb_replication values ("test", "t1", 0, NULL, "NDB$MAX(X)"); create table t1 (a int key, X int) engine ndb; Warnings: -Error 1627 Error in parsing conflict function. Message: column 'X' has wrong datatype +Error 1627 Error in parsing conflict function. Message: Column 'X' has wrong datatype drop table t1; delete from mysql.ndb_replication; insert into mysql.ndb_replication values ("test", "t1", 0, NULL, "NDB$MAX()"); === modified file 'mysql-test/suite/ndb_rpl/t/ndb_rpl_conflict.test' --- a/mysql-test/suite/ndb_rpl/t/ndb_rpl_conflict.test 2012-01-12 02:26:21 +0000 +++ b/mysql-test/suite/ndb_rpl/t/ndb_rpl_conflict.test 2012-01-17 13:51:35 +0000 @@ -2,7 +2,7 @@ # Test engine native conflict resolution for ndb # # ---source include/have_ndb.inc +--source include/have_multi_ndb.inc --source include/have_binlog_format_mixed_or_row.inc --source suite/ndb_rpl/ndb_master-slave.inc @@ -307,14 +307,77 @@ select server_id, master_server_id, coun --connection master drop table t2_max, t2_max$EX; +drop table t1_old, `t1_old$EX`, t1_max, `t1_max$EX`, t1_max_delete_win, `t1_max_delete_win$EX`; +delete from mysql.ndb_replication; + +--echo Test that online table distribution sets up conflict functions and exceptions tables + +# t1allsame - Same on all servers, no exceptions table +insert into mysql.ndb_replication values ("test", "t1allsame", 0, 7, "NDB$MAX(X)"); + +# t2diffex - Different on each server with an exceptions table +# Not a recommended configuration! +insert into mysql.ndb_replication values ("test", "t2diffex", 1, 7, "NDB$OLD(X)"); +insert into mysql.ndb_replication values ("test", "t2diffex", 3, 7, "NDB$MAX(X)"); + +# t3oneex - Only on one server with an exceptions table +# Note that it's not defined on the server where it's created (not recommended) +# so on the server where it's defined, we get an error due to having no extra +# author bits +# +insert into mysql.ndb_replication values ("test", "t3oneex", 3, 7, "NDB$EPOCH()"); + +# Create exception tables +create table t2diffex$EX ( + server_id int unsigned, + master_server_id int unsigned, + master_epoch bigint unsigned, + count int unsigned, + a int not null, + primary key(server_id, master_server_id, master_epoch, count)) engine ndb; + +create table t3oneex$EX ( + server_id int unsigned, + master_server_id int unsigned, + master_epoch bigint unsigned, + count int unsigned, + a int not null, + primary key(server_id, master_server_id, master_epoch, count)) engine ndb; +# Now create actual tables on this server (id 1) +create table t1allsame(a int primary key, b varchar(200), X int unsigned) engine=ndb; +create table t2diffex(a int primary key, b varchar(200), X int unsigned) engine=ndb; +create table t3oneex(a int primary key, b varchar(200)) engine=ndb; + +# Now examine server logs to see that conflict detection and +# exceptions tables were setup as expected +show variables like 'server_id'; +--let $server_num=1.1 +--let $pattern=%NDB Slave% +--let $limit=4 +--source suite/ndb_rpl/t/show_mysqld_warnings.inc + + +--connection server2 +--disable_query_log +call mtr.add_suppression("NDB Slave: .* No extra row author bits in table.*"); +--enable_query_log + +show variables like 'server_id'; +--let $server_num=2.1 +--let $pattern=%NDB Slave% +--let $limit=6 + +--source suite/ndb_rpl/t/show_mysqld_warnings.inc + +--connection master +drop table t3oneex, t2diffex, t1allsame, t3oneex$EX, t2diffex$EX; ############### --echo "Cleanup" --connection master drop table mysql.ndb_replication; -drop table t1_old, `t1_old$EX`, t1_max, `t1_max$EX`, t1_max_delete_win, `t1_max_delete_win$EX`; --sync_slave_with_master --source include/rpl_end.inc === modified file 'mysql-test/suite/ndb_rpl/t/ndb_rpl_conflict_epoch.test' --- a/mysql-test/suite/ndb_rpl/t/ndb_rpl_conflict_epoch.test 2012-01-12 02:26:21 +0000 +++ b/mysql-test/suite/ndb_rpl/t/ndb_rpl_conflict_epoch.test 2012-01-17 13:51:35 +0000 @@ -231,6 +231,11 @@ insert into mysql.ndb_replication values ("test", "t3", 0, 7, "NDB\$EPOCH(32)"), ("test", "t4", 0, 7, "NDB\$EPOCH(-1)"); +--disable_query_log +# Only need suppress here, as table creation fails due to this. +call mtr.add_suppression("NDB Slave: .* Too many extra Gci bits at .*"); +--enable_query_log + --error 1005 create table test.t3 (a int primary key) engine=ndb; show warnings; === modified file 'mysql-test/suite/ndb_rpl/t/ndb_rpl_rep_error.test' --- a/mysql-test/suite/ndb_rpl/t/ndb_rpl_rep_error.test 2012-01-12 02:26:21 +0000 +++ b/mysql-test/suite/ndb_rpl/t/ndb_rpl_rep_error.test 2012-01-17 13:51:35 +0000 @@ -2,7 +2,7 @@ # Some negative tests of the ndb_replication table # # ---source include/have_ndb.inc +--source include/have_multi_ndb.inc --source include/have_binlog_format_mixed_or_row.inc --source suite/ndb_rpl/ndb_master-slave.inc @@ -46,8 +46,25 @@ CREATE TABLE mysql.ndb_replication --enable_warnings --enable_query_log +# Need suppressions on all servers where warnings/errors can be seen. +--disable_query_log +--connection server1 +call mtr.add_suppression("NDB Slave: .* unknown conflict resolution function .*"); +call mtr.add_suppression("NDB Slave: .* has wrong datatype.*"); +call mtr.add_suppression("NDB Slave: .* missing function argument .*"); +call mtr.add_suppression("NDB Slave: .* missing \')\' .*"); +--connection server2 +call mtr.add_suppression("NDB Slave: .* unknown conflict resolution function .*"); +call mtr.add_suppression("NDB Slave: .* has wrong datatype.*"); +call mtr.add_suppression("NDB Slave: .* missing function argument .*"); +call mtr.add_suppression("NDB Slave: .* missing \')\' .*"); +--connection default +--enable_query_log + # Non existant conflict_fn # gives error when creating table +#call mtr.add_suppression("NDB Slave: .* unknown conflict resolution function .*"); + insert into mysql.ndb_replication values ("test", "t1", 0, NULL, "NDB$X(X)"); --error 1005 create table t1 (a int key, X int) engine ndb; @@ -56,6 +73,8 @@ delete from mysql.ndb_replication; # Column type cannot be used for this function # gives warning when creating table +#call mtr.add_suppression("NDB Slave: .* has wrong datatype.*"); + insert into mysql.ndb_replication values ("test", "t1", 0, NULL, "NDB$MAX(X)"); create table t1 (a int key, X int) engine ndb; drop table t1; @@ -63,6 +82,8 @@ delete from mysql.ndb_replication; # Too few arguments # gives error when creating table +#call mtr.add_suppression("NDB Slave: .* missing function argument .*"); + insert into mysql.ndb_replication values ("test", "t1", 0, NULL, "NDB$MAX()"); --error 1005 create table t1 (a int key, X int) engine ndb; @@ -71,6 +92,7 @@ delete from mysql.ndb_replication; # Too many arguments # gives error when creating table +#call mtr.add_suppression("NDB Slave: .* missing \')\' .*"); insert into mysql.ndb_replication values ("test", "t1", 0, NULL, "NDB$MAX(X Y)"); --error 1005 create table t1 (a int key, X int) engine ndb; === added file 'mysql-test/suite/ndb_rpl/t/show_mysqld_warnings.inc' --- a/mysql-test/suite/ndb_rpl/t/show_mysqld_warnings.inc 1970-01-01 00:00:00 +0000 +++ b/mysql-test/suite/ndb_rpl/t/show_mysqld_warnings.inc 2012-01-17 13:51:35 +0000 @@ -0,0 +1,10 @@ +--disable_query_log +--echo MySQLD error output for server $server_num matching pattern $pattern +create table errlog (a int auto_increment primary key, txt text) engine=myisam; + +--eval load data local infile "$MYSQLTEST_VARDIR/log/mysqld.$server_num.err" into table errlog (txt); + +--eval select replace( lower( right(txt, length(txt) - locate('[',txt) + 1)), '\r', '') as relevant from errlog where txt like '$pattern' order by a desc limit $limit; + +drop table errlog; +--enable_query_log === modified file 'sql/ha_ndbcluster.cc' --- a/sql/ha_ndbcluster.cc 2012-01-12 02:28:03 +0000 +++ b/sql/ha_ndbcluster.cc 2012-01-17 13:57:01 +0000 @@ -2199,7 +2199,7 @@ int ha_ndbcluster::get_metadata(THD *thd #ifdef HAVE_NDB_BINLOG ndbcluster_read_binlog_replication(thd, ndb, m_share, m_table, - ::server_id, table, FALSE); + ::server_id, FALSE); #endif DBUG_RETURN(0); @@ -4805,6 +4805,7 @@ ha_ndbcluster::prepare_conflict_detectio { res = conflict_fn->prep_func(m_share->m_cfn_share, op_type, + m_ndb_record, old_data, new_data, table->write_set, @@ -5655,7 +5656,10 @@ handle_row_conflict(NDB_CONFLICT_FN_SHAR for (k= 0; k < nkey; k++) { DBUG_ASSERT(pk_row != NULL); - const uchar* data= pk_row + cfn_share->m_offset[k]; + const uchar* data= + (const uchar*) NdbDictionary::getValuePtr(key_rec, + (const char*) pk_row, + cfn_share->m_key_attrids[k]); if (ex_op->setValue((Uint32)(fixed_cols + k), (const char*)data) == -1) { err= ex_op->getNdbError(); @@ -9599,7 +9603,6 @@ int ha_ndbcluster::create(const char *na m_dbname, m_tabname, ::server_id, - form, &binlog_flags, &conflict_fn, args, @@ -10078,7 +10081,6 @@ cleanup_failed: ndbcluster_apply_binlog_replication_info(thd, share, m_table, - form, conflict_fn, args, num_args, @@ -10518,7 +10520,7 @@ int ha_ndbcluster::rename_table(const ch #ifdef HAVE_NDB_BINLOG if (share) ndbcluster_read_binlog_replication(thd, ndb, share, ndbtab, - ::server_id, NULL, TRUE); + ::server_id, TRUE); #endif /* always create an event for the table */ String event_name(INJECTOR_EVENT_LEN); === modified file 'sql/ha_ndbcluster.h' --- a/sql/ha_ndbcluster.h 2012-01-12 02:28:03 +0000 +++ b/sql/ha_ndbcluster.h 2012-01-17 13:57:01 +0000 @@ -144,11 +144,9 @@ enum enum_conflict_fn_arg_type struct st_conflict_fn_arg { enum_conflict_fn_arg_type type; - const char *ptr; - uint32 len; union { - uint32 fieldno; // CFAT_COLUMN_NAME + char resolveColNameBuff[ NAME_CHAR_LEN + 1 ]; // CFAT_COLUMN_NAME uint32 extraGciBits; // CFAT_EXTRA_GCI_BITS }; }; @@ -176,6 +174,7 @@ enum enum_conflicting_op_type */ typedef int (* prepare_detect_func) (struct st_ndbcluster_conflict_fn_share* cfn_share, enum_conflicting_op_type op_type, + const NdbRecord* data_record, const uchar* old_data, const uchar* new_data, const MY_BITMAP* write_set, @@ -219,16 +218,21 @@ enum enum_conflict_fn_table_flags CFF_REFRESH_ROWS = 1 }; +/* + Maximum supported key parts (16) + (Ndb supports 32, but MySQL has a lower limit) +*/ +static const int NDB_MAX_KEY_PARTS = MAX_REF_PARTS; + typedef struct st_ndbcluster_conflict_fn_share { const st_conflict_fn_def* m_conflict_fn; /* info about original table */ uint8 m_pk_cols; - uint8 m_resolve_column; + uint16 m_resolve_column; uint8 m_resolve_size; uint8 m_flags; - uint16 m_offset[16]; - uint16 m_resolve_offset; + uint16 m_key_attrids[ NDB_MAX_KEY_PARTS ]; const NdbDictionary::Table *m_ex_tab; uint32 m_count; === modified file 'sql/ha_ndbcluster_binlog.cc' --- a/sql/ha_ndbcluster_binlog.cc 2012-01-12 02:28:03 +0000 +++ b/sql/ha_ndbcluster_binlog.cc 2012-01-17 13:57:01 +0000 @@ -3847,7 +3847,6 @@ slave_set_resolve_fn(THD *thd, NDB_SHARE const NDBTAB *ndbtab, uint field_index, uint resolve_col_sz, const st_conflict_fn_def* conflict_fn, - TABLE *table, uint8 flags) { DBUG_ENTER("slave_set_resolve_fn"); @@ -3867,8 +3866,6 @@ slave_set_resolve_fn(THD *thd, NDB_SHARE /* Calculate resolve col stuff (if relevant) */ cfn_share->m_resolve_size= resolve_col_sz; cfn_share->m_resolve_column= field_index; - cfn_share->m_resolve_offset= (uint16)(table->field[field_index]->ptr - - table->record[0]); cfn_share->m_flags = flags; { @@ -3917,8 +3914,11 @@ slave_set_resolve_fn(THD *thd, NDB_SHARE col->getNullable() == ex_col->getNullable(); if (!ok) break; - cfn_share->m_offset[k]= - (uint16)(table->field[i]->ptr - table->record[0]); + /* + Store mapping of Exception table key# to + orig table attrid + */ + cfn_share->m_key_attrids[k]= i; k++; } } @@ -3929,9 +3929,9 @@ slave_set_resolve_fn(THD *thd, NDB_SHARE ndbtab_g.release(); if (opt_ndb_extra_logging) sql_print_information("NDB Slave: Table %s.%s logging exceptions to %s.%s", - table->s->db.str, - table->s->table_name.str, - table->s->db.str, + share->db, + share->table_name, + share->db, ex_tab_name); } else @@ -3964,6 +3964,7 @@ slave_set_resolve_fn(THD *thd, NDB_SHARE int row_conflict_fn_old(st_ndbcluster_conflict_fn_share* cfn_share, enum_conflicting_op_type op_type, + const NdbRecord* data_record, const uchar* old_data, const uchar* new_data, const MY_BITMAP* write_set, @@ -3972,7 +3973,10 @@ row_conflict_fn_old(st_ndbcluster_confli DBUG_ENTER("row_conflict_fn_old"); uint32 resolve_column= cfn_share->m_resolve_column; uint32 resolve_size= cfn_share->m_resolve_size; - const uchar* field_ptr = old_data + cfn_share->m_resolve_offset; + const uchar* field_ptr = (const uchar*) + NdbDictionary::getValuePtr(data_record, + (const char*) old_data, + cfn_share->m_resolve_column); assert((resolve_size == 4) || (resolve_size == 8)); @@ -4040,6 +4044,7 @@ row_conflict_fn_old(st_ndbcluster_confli int row_conflict_fn_max_update_only(st_ndbcluster_conflict_fn_share* cfn_share, enum_conflicting_op_type op_type, + const NdbRecord* data_record, const uchar* old_data, const uchar* new_data, const MY_BITMAP* write_set, @@ -4048,7 +4053,10 @@ row_conflict_fn_max_update_only(st_ndbcl DBUG_ENTER("row_conflict_fn_max_update_only"); uint32 resolve_column= cfn_share->m_resolve_column; uint32 resolve_size= cfn_share->m_resolve_size; - const uchar* field_ptr = new_data + cfn_share->m_resolve_offset; + const uchar* field_ptr = (const uchar*) + NdbDictionary::getValuePtr(data_record, + (const char*) new_data, + cfn_share->m_resolve_column); assert((resolve_size == 4) || (resolve_size == 8)); @@ -4127,6 +4135,7 @@ row_conflict_fn_max_update_only(st_ndbcl int row_conflict_fn_max(st_ndbcluster_conflict_fn_share* cfn_share, enum_conflicting_op_type op_type, + const NdbRecord* data_record, const uchar* old_data, const uchar* new_data, const MY_BITMAP* write_set, @@ -4140,6 +4149,7 @@ row_conflict_fn_max(st_ndbcluster_confli case UPDATE_ROW: return row_conflict_fn_max_update_only(cfn_share, op_type, + data_record, old_data, new_data, write_set, @@ -4151,6 +4161,7 @@ row_conflict_fn_max(st_ndbcluster_confli */ return row_conflict_fn_old(cfn_share, op_type, + data_record, old_data, new_data, write_set, @@ -4179,6 +4190,7 @@ row_conflict_fn_max(st_ndbcluster_confli int row_conflict_fn_max_del_win(st_ndbcluster_conflict_fn_share* cfn_share, enum_conflicting_op_type op_type, + const NdbRecord* data_record, const uchar* old_data, const uchar* new_data, const MY_BITMAP* write_set, @@ -4192,6 +4204,7 @@ row_conflict_fn_max_del_win(st_ndbcluste case UPDATE_ROW: return row_conflict_fn_max_update_only(cfn_share, op_type, + data_record, old_data, new_data, write_set, @@ -4216,6 +4229,7 @@ row_conflict_fn_max_del_win(st_ndbcluste int row_conflict_fn_epoch(st_ndbcluster_conflict_fn_share* cfn_share, enum_conflicting_op_type op_type, + const NdbRecord* data_record, const uchar* old_data, const uchar* new_data, const MY_BITMAP* write_set, @@ -4311,7 +4325,6 @@ parse_conflict_fn_spec(const char* confl const st_conflict_fn_def** conflict_fn, st_conflict_fn_arg* args, Uint32* max_args, - const TABLE* table, char *msg, uint msg_len) { DBUG_ENTER("parse_conflict_fn_spec"); @@ -4402,9 +4415,6 @@ parse_conflict_fn_spec(const char* confl uint len= (uint)(end_arg - start_arg); args[no_args].type= type; - args[no_args].ptr= start_arg; - args[no_args].len= len; - args[no_args].fieldno= (uint32)-1; DBUG_PRINT("info", ("found argument %s %u", start_arg, len)); @@ -4413,20 +4423,11 @@ parse_conflict_fn_spec(const char* confl { case CFAT_COLUMN_NAME: { - /* find column in table */ - DBUG_PRINT("info", ("searching for %s %u", start_arg, len)); - TABLE_SHARE *table_s= table->s; - for (uint j= 0; j < table_s->fields; j++) - { - Field *field= table_s->field[j]; - if (strncmp(start_arg, field->field_name, len) == 0 && - field->field_name[len] == '\0') - { - DBUG_PRINT("info", ("found %s", field->field_name)); - args[no_args].fieldno= j; - break; - } - } + /* Copy column name out into argument's buffer */ + char* dest= &args[no_args].resolveColNameBuff[0]; + + memcpy(dest, start_arg, MIN(len, (uint)NAME_CHAR_LEN)); + dest[len]= '\0'; break; } case CFAT_EXTRA_GCI_BITS: @@ -4488,7 +4489,7 @@ parse_conflict_fn_spec(const char* confl } /* parse error */ my_snprintf(msg, msg_len, "%s, %s at '%s'", - conflict_fn_spec, error_str, ptr); + conflict_fn_spec, error_str, ptr); DBUG_PRINT("info", ("%s", msg)); DBUG_RETURN(-1); } @@ -4497,7 +4498,6 @@ static int setup_conflict_fn(THD *thd, NDB_SHARE *share, const NDBTAB *ndbtab, char *msg, uint msg_len, - TABLE *table, const st_conflict_fn_def* conflict_fn, const st_conflict_fn_arg* args, const Uint32 num_args) @@ -4519,38 +4519,65 @@ setup_conflict_fn(THD *thd, NDB_SHARE *s DBUG_RETURN(-1); } + /* Now try to find the column in the table */ + int colNum = -1; + const char* resolveColName = args[0].resolveColNameBuff; + int resolveColNameLen = (int)strlen(resolveColName); + + for (int j=0; j< ndbtab->getNoOfColumns(); j++) + { + const char* colName = ndbtab->getColumn(j)->getName(); + + if (strncmp(colName, + resolveColName, + resolveColNameLen) == 0 && + colName[resolveColNameLen] == '\0') + { + colNum = j; + break; + } + } + if (colNum == -1) + { + my_snprintf(msg, msg_len, + "Could not find resolve column %s.", + resolveColName); + DBUG_PRINT("info", ("%s", msg)); + DBUG_RETURN(-1); + } + uint resolve_col_sz= 0; if (0 == (resolve_col_sz = - slave_check_resolve_col_type(ndbtab, args[0].fieldno))) + slave_check_resolve_col_type(ndbtab, colNum))) { /* wrong data type */ slave_reset_conflict_fn(share); my_snprintf(msg, msg_len, - "column '%s' has wrong datatype", - table->s->field[args[0].fieldno]->field_name); + "Column '%s' has wrong datatype", + resolveColName); DBUG_PRINT("info", ("%s", msg)); DBUG_RETURN(-1); } if (slave_set_resolve_fn(thd, share, ndbtab, - args[0].fieldno, resolve_col_sz, - conflict_fn, table, CFF_NONE)) + colNum, resolve_col_sz, + conflict_fn, CFF_NONE)) { my_snprintf(msg, msg_len, - "unable to setup conflict resolution using column '%s'", - table->s->field[args[0].fieldno]->field_name); + "Unable to setup conflict resolution using column '%s'", + resolveColName); DBUG_PRINT("info", ("%s", msg)); DBUG_RETURN(-1); } - if (opt_ndb_extra_logging) - { - sql_print_information("NDB Slave: Table %s.%s using conflict_fn %s on attribute %s.", - table->s->db.str, - table->s->table_name.str, - conflict_fn->name, - table->s->field[args[0].fieldno]->field_name); - } + + /* Success, update message */ + my_snprintf(msg, msg_len, + "NDB Slave: Table %s.%s using conflict_fn %s on attribute %s.", + share->db, + share->table_name, + conflict_fn->name, + resolveColName); break; } case CFT_NDB_EPOCH: @@ -4577,7 +4604,9 @@ setup_conflict_fn(THD *thd, NDB_SHARE *s * represent SavePeriod/EpochPeriod */ if (ndbtab->getExtraRowGciBits() == 0) - sql_print_information("Ndb Slave : CFT_NDB_EPOCH[_TRANS], low epoch resolution"); + sql_print_information("NDB Slave: Table %s.%s : CFT_NDB_EPOCH[_TRANS], low epoch resolution", + share->db, + share->table_name); if (ndbtab->getExtraRowAuthorBits() == 0) { @@ -4589,20 +4618,20 @@ setup_conflict_fn(THD *thd, NDB_SHARE *s if (slave_set_resolve_fn(thd, share, ndbtab, 0, // field_no 0, // resolve_col_sz - conflict_fn, table, CFF_REFRESH_ROWS)) + conflict_fn, CFF_REFRESH_ROWS)) { my_snprintf(msg, msg_len, "unable to setup conflict resolution"); DBUG_PRINT("info", ("%s", msg)); DBUG_RETURN(-1); } - if (opt_ndb_extra_logging) - { - sql_print_information("NDB Slave: Table %s.%s using conflict_fn %s.", - table->s->db.str, - table->s->table_name.str, - conflict_fn->name); - } + /* Success, update message */ + my_snprintf(msg, msg_len, + "NDB Slave: Table %s.%s using conflict_fn %s.", + share->db, + share->table_name, + conflict_fn->name); + break; } case CFT_NUMBER_OF_CFTS: @@ -4908,7 +4937,6 @@ ndbcluster_get_binlog_replication_info(T const char* db, const char* table_name, uint server_id, - const TABLE *table, Uint32* binlog_flags, const st_conflict_fn_def** conflict_fn, st_conflict_fn_arg* args, @@ -4956,34 +4984,42 @@ ndbcluster_get_binlog_replication_info(T DBUG_RETURN(ER_NDB_REPLICATION_SCHEMA_ERROR); } - if (table != NULL) + if (conflict_fn_spec != NULL) { - if (conflict_fn_spec != NULL) + char tmp_buf[FN_REFLEN]; + + if (parse_conflict_fn_spec(conflict_fn_spec, + conflict_fn, + args, + num_args, + tmp_buf, + sizeof(tmp_buf)) != 0) { - char tmp_buf[FN_REFLEN]; + push_warning_printf(thd, MYSQL_ERROR::WARN_LEVEL_ERROR, + ER_CONFLICT_FN_PARSE_ERROR, + ER(ER_CONFLICT_FN_PARSE_ERROR), + tmp_buf); - if (parse_conflict_fn_spec(conflict_fn_spec, - conflict_fn, - args, - num_args, - table, - tmp_buf, - sizeof(tmp_buf)) != 0) + /* + Log as well, useful for contexts where the thd's stack of + warnings are ignored + */ + if (opt_ndb_extra_logging) { - push_warning_printf(thd, MYSQL_ERROR::WARN_LEVEL_ERROR, - ER_CONFLICT_FN_PARSE_ERROR, - ER(ER_CONFLICT_FN_PARSE_ERROR), - tmp_buf); - DBUG_RETURN(ER_CONFLICT_FN_PARSE_ERROR); + sql_print_warning("NDB Slave: Table %s.%s : Parse error on conflict fn : %s", + db, table_name, + tmp_buf); } - } - else - { - /* No conflict function specified */ - conflict_fn= NULL; - num_args= 0; + + DBUG_RETURN(ER_CONFLICT_FN_PARSE_ERROR); } } + else + { + /* No conflict function specified */ + conflict_fn= NULL; + num_args= 0; + } DBUG_RETURN(0); } @@ -4992,7 +5028,6 @@ int ndbcluster_apply_binlog_replication_info(THD *thd, NDB_SHARE *share, const NDBTAB* ndbtab, - TABLE* table, const st_conflict_fn_def* conflict_fn, const st_conflict_fn_arg* args, Uint32 num_args, @@ -5013,15 +5048,32 @@ ndbcluster_apply_binlog_replication_info if (setup_conflict_fn(thd, share, ndbtab, tmp_buf, sizeof(tmp_buf), - table, conflict_fn, args, - num_args) != 0) + num_args) == 0) { + if (opt_ndb_extra_logging) + { + sql_print_information("%s", tmp_buf); + } + } + else + { + /* + Dump setup failure message to error log + for cases where thd warning stack is + ignored + */ + sql_print_warning("NDB Slave: Table %s.%s : %s", + share->db, + share->table_name, + tmp_buf); + push_warning_printf(thd, MYSQL_ERROR::WARN_LEVEL_ERROR, ER_CONFLICT_FN_PARSE_ERROR, ER(ER_CONFLICT_FN_PARSE_ERROR), tmp_buf); + DBUG_RETURN(-1); } } @@ -5039,7 +5091,6 @@ ndbcluster_read_binlog_replication(THD * NDB_SHARE *share, const NDBTAB *ndbtab, uint server_id, - TABLE *table, bool do_set_binlog_flags) { DBUG_ENTER("ndbcluster_read_binlog_replication"); @@ -5052,7 +5103,6 @@ ndbcluster_read_binlog_replication(THD * share->db, share->table_name, server_id, - table, &binlog_flags, &conflict_fn, args, @@ -5060,7 +5110,6 @@ ndbcluster_read_binlog_replication(THD * (ndbcluster_apply_binlog_replication_info(thd, share, ndbtab, - table, conflict_fn, args, num_args, @@ -5236,7 +5285,7 @@ int ndbcluster_create_binlog_setup(THD * /* */ ndbcluster_read_binlog_replication(thd, ndb, share, ndbtab, - ::server_id, NULL, TRUE); + ::server_id, TRUE); #endif /* check if logging turned off for this table @@ -6708,6 +6757,71 @@ void updateInjectorStats(Ndb* schemaNdb, dataNdb->getClientStat(Ndb::EventBytesRecvdCount); } +/** + injectApplyStatusWriteRow + + Inject a WRITE_ROW event on the ndb_apply_status table into + the Binlog. + This contains our server_id and the supplied epoch number. + When applied on the Slave it gives a transactional position + marker +*/ +static +bool +injectApplyStatusWriteRow(injector::transaction& trans, + ulonglong gci) +{ + DBUG_ENTER("injectApplyStatusWriteRow"); + if (ndb_apply_status_share == NULL) + { + sql_print_error("NDB: Could not get apply status share"); + DBUG_ASSERT(ndb_apply_status_share != NULL); + DBUG_RETURN(false); + } + + /* Build row buffer for generated ndb_apply_status + WRITE_ROW event + First get the relevant table structure. + */ + DBUG_ASSERT(!ndb_apply_status_share->event_data); + DBUG_ASSERT(ndb_apply_status_share->op); + Ndb_event_data* event_data= + (Ndb_event_data *) ndb_apply_status_share->op->getCustomData(); + DBUG_ASSERT(event_data); + DBUG_ASSERT(event_data->table); + TABLE* apply_status_table= event_data->table; + + /* + Intialize apply_status_table->record[0] + */ + empty_record(apply_status_table); + + apply_status_table->field[0]->store((longlong)::server_id, true); + apply_status_table->field[1]->store((longlong)gci, true); + apply_status_table->field[2]->store("", 0, &my_charset_bin); + apply_status_table->field[3]->store((longlong)0, true); + apply_status_table->field[4]->store((longlong)0, true); +#ifndef DBUG_OFF + const LEX_STRING& name= apply_status_table->s->table_name; + DBUG_PRINT("info", ("use_table: %.*s", + (int) name.length, name.str)); +#endif + injector::transaction::table tbl(apply_status_table, true); + int ret = trans.use_table(::server_id, tbl); + assert(ret == 0); NDB_IGNORE_VALUE(ret); + + ret= trans.write_row(::server_id, + injector::transaction::table(apply_status_table, + true), + &apply_status_table->s->all_set, + apply_status_table->s->fields, + apply_status_table->record[0]); + + assert(ret == 0); + + DBUG_RETURN(true); +} + enum Binlog_thread_state { BCCC_running= 0, @@ -7191,44 +7305,6 @@ restart_cluster_failure: { DBUG_PRINT("info", ("pollEvents res: %d", res)); thd->proc_info= "Processing events"; - uchar apply_status_buf[512]; - TABLE *apply_status_table= NULL; - if (ndb_apply_status_share) - { - /* - We construct the buffer to write the apply status binlog - event here, as the table->record[0] buffer is referenced - by the apply status event operation, and will be filled - with data at the nextEvent call if the first event should - happen to be from the apply status table - */ - Ndb_event_data *event_data= ndb_apply_status_share->event_data; - if (!event_data) - { - DBUG_ASSERT(ndb_apply_status_share->op); - event_data= - (Ndb_event_data *) ndb_apply_status_share->op->getCustomData(); - DBUG_ASSERT(event_data); - } - apply_status_table= event_data->table; - - /* - Intialize apply_status_table->record[0] - */ - empty_record(apply_status_table); - - apply_status_table->field[0]->store((longlong)::server_id, true); - /* - gci is added later, just before writing to binlog as gci - is unknown here - */ - apply_status_table->field[2]->store("", 0, &my_charset_bin); - apply_status_table->field[3]->store((longlong)0, true); - apply_status_table->field[4]->store((longlong)0, true); - DBUG_ASSERT(sizeof(apply_status_buf) >= apply_status_table->s->reclength); - memcpy(apply_status_buf, apply_status_table->record[0], - apply_status_table->s->reclength); - } NdbEventOperation *pOp= i_ndb->nextEvent(); ndb_binlog_index_row _row; ndb_binlog_index_row *rows= &_row; @@ -7356,35 +7432,11 @@ restart_cluster_failure: } if (trans.good()) { - if (apply_status_table) - { -#ifndef DBUG_OFF - const LEX_STRING& name= apply_status_table->s->table_name; - DBUG_PRINT("info", ("use_table: %.*s", - (int) name.length, name.str)); -#endif - injector::transaction::table tbl(apply_status_table, true); - int ret = trans.use_table(::server_id, tbl); - assert(ret == 0); NDB_IGNORE_VALUE(ret); - - /* add the gci to the record */ - Field *field= apply_status_table->field[1]; - my_ptrdiff_t row_offset= - (my_ptrdiff_t) (apply_status_buf - apply_status_table->record[0]); - field->move_field_offset(row_offset); - field->store((longlong)gci, true); - field->move_field_offset(-row_offset); - - trans.write_row(::server_id, - injector::transaction::table(apply_status_table, - true), - &apply_status_table->s->all_set, - apply_status_table->s->fields, - apply_status_buf); - } - else + /* Inject ndb_apply_status WRITE_ROW event */ + if (!injectApplyStatusWriteRow(trans, + gci)) { - sql_print_error("NDB: Could not get apply status share"); + sql_print_error("NDB Binlog: Failed to inject apply status write row"); } } #ifdef RUN_NDB_BINLOG_TIMER === modified file 'sql/ha_ndbcluster_binlog.h' --- a/sql/ha_ndbcluster_binlog.h 2012-01-12 02:28:03 +0000 +++ b/sql/ha_ndbcluster_binlog.h 2012-01-17 13:57:01 +0000 @@ -237,7 +237,6 @@ ndbcluster_get_binlog_replication_info(T const char* db, const char* table_name, uint server_id, - const TABLE *table, Uint32* binlog_flags, const st_conflict_fn_def** conflict_fn, st_conflict_fn_arg* args, @@ -246,7 +245,6 @@ int ndbcluster_apply_binlog_replication_info(THD *thd, NDB_SHARE *share, const NDBTAB* ndbtab, - TABLE* table, const st_conflict_fn_def* conflict_fn, const st_conflict_fn_arg* args, Uint32 num_args, @@ -257,7 +255,6 @@ ndbcluster_read_binlog_replication(THD * NDB_SHARE *share, const NDBTAB *ndbtab, uint server_id, - TABLE *table, bool do_set_binlog_flags); #endif int ndb_create_table_from_engine(THD *thd, const char *db, No bundle (reason: useless for push emails).