From: Frazer Clement Date: January 11 2012 6:31pm Subject: bzr push into mysql-5.5-cluster-7.2 branch (frazer.clement:3758 to 3759) List-Archive: http://lists.mysql.com/commits/142374 Message-Id: <201201111831.q0BIVI4K026633@acsmt358.oracle.com> MIME-Version: 1.0 Content-Type: text/plain; charset="us-ascii" Content-Transfer-Encoding: 7bit 3759 Frazer Clement 2012-01-11 [merge] Merge 7.1->7.2 added: mysql-test/suite/ndb_rpl/t/show_mysqld_warnings.inc modified: mysql-test/suite/ndb_rpl/r/ndb_rpl_conflict.result mysql-test/suite/ndb_rpl/r/ndb_rpl_rep_error.result mysql-test/suite/ndb_rpl/t/ndb_rpl_conflict.test mysql-test/suite/ndb_rpl/t/ndb_rpl_conflict_epoch.test mysql-test/suite/ndb_rpl/t/ndb_rpl_rep_error.test sql/ha_ndbcluster.cc sql/ha_ndbcluster_binlog.cc sql/ha_ndbcluster_binlog.h sql/ndb_share.h storage/ndb/include/kernel/NodeInfo.hpp storage/ndb/include/ndb_global.h storage/ndb/src/kernel/blocks/dbdih/Dbdih.hpp storage/ndb/src/kernel/blocks/qmgr/Qmgr.hpp storage/ndb/src/kernel/blocks/qmgr/QmgrInit.cpp storage/ndb/src/kernel/blocks/qmgr/QmgrMain.cpp storage/ndb/src/kernel/blocks/trpman.cpp storage/ndb/src/kernel/vm/ArrayPool.hpp storage/ndb/src/kernel/vm/Configuration.cpp storage/ndb/src/kernel/vm/GlobalData.hpp storage/ndb/src/kernel/vm/SimulatedBlock.cpp storage/ndb/src/kernel/vm/TransporterCallback.cpp storage/ndb/src/kernel/vm/mt.cpp 3758 jonas oreland 2012-01-10 [merge] ndb - merge 71 to 72 modified: storage/ndb/include/kernel/signaldata/CloseComReqConf.hpp storage/ndb/include/kernel/signaldata/EnableCom.hpp storage/ndb/src/kernel/blocks/trpman.cpp storage/ndb/src/kernel/blocks/trpman.hpp storage/ndb/src/kernel/vm/Configuration.cpp storage/ndb/src/kernel/vm/GlobalData.hpp === modified file 'mysql-test/suite/ndb_rpl/r/ndb_rpl_conflict.result' --- a/mysql-test/suite/ndb_rpl/r/ndb_rpl_conflict.result 2011-09-01 15:12:11 +0000 +++ b/mysql-test/suite/ndb_rpl/r/ndb_rpl_conflict.result 2012-01-11 16:08:31 +0000 @@ -229,7 +229,51 @@ select server_id, master_server_id, coun server_id master_server_id count a 2 1 1 3 drop table t2_max, t2_max$EX; +drop table t1_old, `t1_old$EX`, t1_max, `t1_max$EX`, t1_max_delete_win, `t1_max_delete_win$EX`; +delete from mysql.ndb_replication; +Test that online table distribution sets up conflict functions and exceptions tables +insert into mysql.ndb_replication values ("test", "t1allsame", 0, 7, "NDB$MAX(X)"); +insert into mysql.ndb_replication values ("test", "t2diffex", 1, 7, "NDB$OLD(X)"); +insert into mysql.ndb_replication values ("test", "t2diffex", 3, 7, "NDB$MAX(X)"); +insert into mysql.ndb_replication values ("test", "t3oneex", 3, 7, "NDB$EPOCH()"); +create table t2diffex$EX ( +server_id int unsigned, +master_server_id int unsigned, +master_epoch bigint unsigned, +count int unsigned, +a int not null, +primary key(server_id, master_server_id, master_epoch, count)) engine ndb; +create table t3oneex$EX ( +server_id int unsigned, +master_server_id int unsigned, +master_epoch bigint unsigned, +count int unsigned, +a int not null, +primary key(server_id, master_server_id, master_epoch, count)) engine ndb; +create table t1allsame(a int primary key, b varchar(200), X int unsigned) engine=ndb; +create table t2diffex(a int primary key, b varchar(200), X int unsigned) engine=ndb; +create table t3oneex(a int primary key, b varchar(200)) engine=ndb; +show variables like 'server_id'; +Variable_name Value +server_id 1 +MySQLD error output for server 1.1 matching pattern %NDB Slave% +relevant +[note] ndb slave: table test.t2diffex using conflict_fn ndb$old on attribute x. +[note] ndb slave: table test.t2diffex logging exceptions to test.t2diffex$ex +[note] ndb slave: table test.t1allsame using conflict_fn ndb$max on attribute x. +[note] ndb slave: table test.t2_max using conflict_fn ndb$max on attribute x. +show variables like 'server_id'; +Variable_name Value +server_id 3 +MySQLD error output for server 2.1 matching pattern %NDB Slave% +relevant +[warning] ndb slave: table test.t3oneex : no extra row author bits in table. +[note] ndb slave: table test.t3oneex : cft_ndb_epoch[_trans], low epoch resolution +[note] ndb slave: table test.t2diffex using conflict_fn ndb$max on attribute x. +[note] ndb slave: table test.t2diffex logging exceptions to test.t2diffex$ex +[note] ndb slave: table test.t1allsame using conflict_fn ndb$max on attribute x. +[note] ndb slave: table test.t2_max using conflict_fn ndb$max on attribute x. +drop table t3oneex, t2diffex, t1allsame, t3oneex$EX, t2diffex$EX; "Cleanup" drop table mysql.ndb_replication; -drop table t1_old, `t1_old$EX`, t1_max, `t1_max$EX`, t1_max_delete_win, `t1_max_delete_win$EX`; include/rpl_end.inc === modified file 'mysql-test/suite/ndb_rpl/r/ndb_rpl_rep_error.result' --- a/mysql-test/suite/ndb_rpl/r/ndb_rpl_rep_error.result 2011-06-21 09:44:52 +0000 +++ b/mysql-test/suite/ndb_rpl/r/ndb_rpl_rep_error.result 2012-01-11 18:28:28 +0000 @@ -15,7 +15,7 @@ delete from mysql.ndb_replication; insert into mysql.ndb_replication values ("test", "t1", 0, NULL, "NDB$MAX(X)"); create table t1 (a int key, X int) engine ndb; Warnings: -Warning 1626 Error in parsing conflict function. Message: column 'X' has wrong datatype +Warning 1626 Error in parsing conflict function. Message: Column 'X' has wrong datatype drop table t1; delete from mysql.ndb_replication; insert into mysql.ndb_replication values ("test", "t1", 0, NULL, "NDB$MAX()"); === modified file 'mysql-test/suite/ndb_rpl/t/ndb_rpl_conflict.test' --- a/mysql-test/suite/ndb_rpl/t/ndb_rpl_conflict.test 2011-09-01 15:12:11 +0000 +++ b/mysql-test/suite/ndb_rpl/t/ndb_rpl_conflict.test 2012-01-11 11:07:09 +0000 @@ -2,7 +2,7 @@ # Test engine native conflict resolution for ndb # # ---source include/have_ndb.inc +--source include/have_multi_ndb.inc --source include/have_binlog_format_mixed_or_row.inc --source suite/ndb_rpl/ndb_master-slave.inc @@ -307,14 +307,77 @@ select server_id, master_server_id, coun --connection master drop table t2_max, t2_max$EX; +drop table t1_old, `t1_old$EX`, t1_max, `t1_max$EX`, t1_max_delete_win, `t1_max_delete_win$EX`; +delete from mysql.ndb_replication; + +--echo Test that online table distribution sets up conflict functions and exceptions tables + +# t1allsame - Same on all servers, no exceptions table +insert into mysql.ndb_replication values ("test", "t1allsame", 0, 7, "NDB$MAX(X)"); + +# t2diffex - Different on each server with an exceptions table +# Not a recommended configuration! +insert into mysql.ndb_replication values ("test", "t2diffex", 1, 7, "NDB$OLD(X)"); +insert into mysql.ndb_replication values ("test", "t2diffex", 3, 7, "NDB$MAX(X)"); + +# t3oneex - Only on one server with an exceptions table +# Note that it's not defined on the server where it's created (not recommended) +# so on the server where it's defined, we get an error due to having no extra +# author bits +# +insert into mysql.ndb_replication values ("test", "t3oneex", 3, 7, "NDB$EPOCH()"); + +# Create exception tables +create table t2diffex$EX ( + server_id int unsigned, + master_server_id int unsigned, + master_epoch bigint unsigned, + count int unsigned, + a int not null, + primary key(server_id, master_server_id, master_epoch, count)) engine ndb; + +create table t3oneex$EX ( + server_id int unsigned, + master_server_id int unsigned, + master_epoch bigint unsigned, + count int unsigned, + a int not null, + primary key(server_id, master_server_id, master_epoch, count)) engine ndb; +# Now create actual tables on this server (id 1) +create table t1allsame(a int primary key, b varchar(200), X int unsigned) engine=ndb; +create table t2diffex(a int primary key, b varchar(200), X int unsigned) engine=ndb; +create table t3oneex(a int primary key, b varchar(200)) engine=ndb; + +# Now examine server logs to see that conflict detection and +# exceptions tables were setup as expected +show variables like 'server_id'; +--let $server_num=1.1 +--let $pattern=%NDB Slave% +--let $limit=4 +--source suite/ndb_rpl/t/show_mysqld_warnings.inc + + +--connection server2 +--disable_query_log +call mtr.add_suppression("NDB Slave: .* No extra row author bits in table.*"); +--enable_query_log + +show variables like 'server_id'; +--let $server_num=2.1 +--let $pattern=%NDB Slave% +--let $limit=6 + +--source suite/ndb_rpl/t/show_mysqld_warnings.inc + +--connection master +drop table t3oneex, t2diffex, t1allsame, t3oneex$EX, t2diffex$EX; ############### --echo "Cleanup" --connection master drop table mysql.ndb_replication; -drop table t1_old, `t1_old$EX`, t1_max, `t1_max$EX`, t1_max_delete_win, `t1_max_delete_win$EX`; --sync_slave_with_master --source include/rpl_end.inc === modified file 'mysql-test/suite/ndb_rpl/t/ndb_rpl_conflict_epoch.test' --- a/mysql-test/suite/ndb_rpl/t/ndb_rpl_conflict_epoch.test 2011-07-11 11:09:58 +0000 +++ b/mysql-test/suite/ndb_rpl/t/ndb_rpl_conflict_epoch.test 2012-01-11 18:28:28 +0000 @@ -231,6 +231,11 @@ insert into mysql.ndb_replication values ("test", "t3", 0, 7, "NDB\$EPOCH(32)"), ("test", "t4", 0, 7, "NDB\$EPOCH(-1)"); +--disable_query_log +# Only need suppress here, as table creation fails due to this. +call mtr.add_suppression("NDB Slave: .* Too many extra Gci bits at .*"); +--enable_query_log + --error 1005 create table test.t3 (a int primary key) engine=ndb; show warnings; === modified file 'mysql-test/suite/ndb_rpl/t/ndb_rpl_rep_error.test' --- a/mysql-test/suite/ndb_rpl/t/ndb_rpl_rep_error.test 2011-06-16 14:34:56 +0000 +++ b/mysql-test/suite/ndb_rpl/t/ndb_rpl_rep_error.test 2012-01-11 12:37:11 +0000 @@ -2,7 +2,7 @@ # Some negative tests of the ndb_replication table # # ---source include/have_ndb.inc +--source include/have_multi_ndb.inc --source include/have_binlog_format_mixed_or_row.inc --source suite/ndb_rpl/ndb_master-slave.inc @@ -46,8 +46,25 @@ CREATE TABLE mysql.ndb_replication --enable_warnings --enable_query_log +# Need suppressions on all servers where warnings/errors can be seen. +--disable_query_log +--connection server1 +call mtr.add_suppression("NDB Slave: .* unknown conflict resolution function .*"); +call mtr.add_suppression("NDB Slave: .* has wrong datatype.*"); +call mtr.add_suppression("NDB Slave: .* missing function argument .*"); +call mtr.add_suppression("NDB Slave: .* missing \')\' .*"); +--connection server2 +call mtr.add_suppression("NDB Slave: .* unknown conflict resolution function .*"); +call mtr.add_suppression("NDB Slave: .* has wrong datatype.*"); +call mtr.add_suppression("NDB Slave: .* missing function argument .*"); +call mtr.add_suppression("NDB Slave: .* missing \')\' .*"); +--connection default +--enable_query_log + # Non existant conflict_fn # gives error when creating table +#call mtr.add_suppression("NDB Slave: .* unknown conflict resolution function .*"); + insert into mysql.ndb_replication values ("test", "t1", 0, NULL, "NDB$X(X)"); --error 1005 create table t1 (a int key, X int) engine ndb; @@ -56,6 +73,8 @@ delete from mysql.ndb_replication; # Column type cannot be used for this function # gives warning when creating table +#call mtr.add_suppression("NDB Slave: .* has wrong datatype.*"); + insert into mysql.ndb_replication values ("test", "t1", 0, NULL, "NDB$MAX(X)"); create table t1 (a int key, X int) engine ndb; drop table t1; @@ -63,6 +82,8 @@ delete from mysql.ndb_replication; # Too few arguments # gives error when creating table +#call mtr.add_suppression("NDB Slave: .* missing function argument .*"); + insert into mysql.ndb_replication values ("test", "t1", 0, NULL, "NDB$MAX()"); --error 1005 create table t1 (a int key, X int) engine ndb; @@ -71,6 +92,7 @@ delete from mysql.ndb_replication; # Too many arguments # gives error when creating table +#call mtr.add_suppression("NDB Slave: .* missing \')\' .*"); insert into mysql.ndb_replication values ("test", "t1", 0, NULL, "NDB$MAX(X Y)"); --error 1005 create table t1 (a int key, X int) engine ndb; === added file 'mysql-test/suite/ndb_rpl/t/show_mysqld_warnings.inc' --- a/mysql-test/suite/ndb_rpl/t/show_mysqld_warnings.inc 1970-01-01 00:00:00 +0000 +++ b/mysql-test/suite/ndb_rpl/t/show_mysqld_warnings.inc 2012-01-11 16:08:31 +0000 @@ -0,0 +1,10 @@ +--disable_query_log +--echo MySQLD error output for server $server_num matching pattern $pattern +create table errlog (a int auto_increment primary key, txt text) engine=myisam; + +--eval load data local infile "$MYSQLTEST_VARDIR/log/mysqld.$server_num.err" into table errlog (txt); + +--eval select lower(right(txt, length(txt) - locate('[',txt) + 1)) as relevant from errlog where txt like '$pattern' order by a desc limit $limit; + +drop table errlog; +--enable_query_log === modified file 'sql/ha_ndbcluster.cc' --- a/sql/ha_ndbcluster.cc 2011-12-20 13:26:37 +0000 +++ b/sql/ha_ndbcluster.cc 2012-01-11 18:28:28 +0000 @@ -2130,7 +2130,7 @@ int ha_ndbcluster::get_metadata(THD *thd #ifdef HAVE_NDB_BINLOG ndbcluster_read_binlog_replication(thd, ndb, m_share, m_table, - ::server_id, table, FALSE); + ::server_id, FALSE); #endif DBUG_RETURN(0); @@ -5143,6 +5143,7 @@ ha_ndbcluster::prepare_conflict_detectio { res = conflict_fn->prep_func(m_share->m_cfn_share, op_type, + m_ndb_record, old_data, new_data, table->write_set, @@ -5993,7 +5994,10 @@ handle_row_conflict(NDB_CONFLICT_FN_SHAR for (k= 0; k < nkey; k++) { DBUG_ASSERT(pk_row != NULL); - const uchar* data= pk_row + cfn_share->m_offset[k]; + const uchar* data= + (const uchar*) NdbDictionary::getValuePtr(key_rec, + (const char*) pk_row, + cfn_share->m_key_attrids[k]); if (ex_op->setValue((Uint32)(fixed_cols + k), (const char*)data) == -1) { err= ex_op->getNdbError(); @@ -10020,7 +10024,6 @@ int ha_ndbcluster::create(const char *na m_dbname, m_tabname, ::server_id, - form, &binlog_flags, &conflict_fn, args, @@ -10484,7 +10487,6 @@ cleanup_failed: ndbcluster_apply_binlog_replication_info(thd, share, m_table, - form, conflict_fn, args, num_args, @@ -10929,7 +10931,7 @@ int ha_ndbcluster::rename_table(const ch #ifdef HAVE_NDB_BINLOG if (share) ndbcluster_read_binlog_replication(thd, ndb, share, ndbtab, - ::server_id, NULL, TRUE); + ::server_id, TRUE); #endif /* always create an event for the table */ String event_name(INJECTOR_EVENT_LEN); === modified file 'sql/ha_ndbcluster_binlog.cc' --- a/sql/ha_ndbcluster_binlog.cc 2012-01-09 18:41:24 +0000 +++ b/sql/ha_ndbcluster_binlog.cc 2012-01-11 18:28:28 +0000 @@ -4004,7 +4004,6 @@ slave_set_resolve_fn(THD *thd, NDB_SHARE const NDBTAB *ndbtab, uint field_index, uint resolve_col_sz, const st_conflict_fn_def* conflict_fn, - TABLE *table, uint8 flags) { DBUG_ENTER("slave_set_resolve_fn"); @@ -4024,8 +4023,6 @@ slave_set_resolve_fn(THD *thd, NDB_SHARE /* Calculate resolve col stuff (if relevant) */ cfn_share->m_resolve_size= resolve_col_sz; cfn_share->m_resolve_column= field_index; - cfn_share->m_resolve_offset= (uint16)(table->field[field_index]->ptr - - table->record[0]); cfn_share->m_flags = flags; { @@ -4074,8 +4071,11 @@ slave_set_resolve_fn(THD *thd, NDB_SHARE col->getNullable() == ex_col->getNullable(); if (!ok) break; - cfn_share->m_offset[k]= - (uint16)(table->field[i]->ptr - table->record[0]); + /* + Store mapping of Exception table key# to + orig table attrid + */ + cfn_share->m_key_attrids[k]= i; k++; } } @@ -4086,9 +4086,9 @@ slave_set_resolve_fn(THD *thd, NDB_SHARE ndbtab_g.release(); if (opt_ndb_extra_logging) sql_print_information("NDB Slave: Table %s.%s logging exceptions to %s.%s", - table->s->db.str, - table->s->table_name.str, - table->s->db.str, + share->db, + share->table_name, + share->db, ex_tab_name); } else @@ -4121,6 +4121,7 @@ slave_set_resolve_fn(THD *thd, NDB_SHARE int row_conflict_fn_old(NDB_CONFLICT_FN_SHARE* cfn_share, enum_conflicting_op_type op_type, + const NdbRecord* data_record, const uchar* old_data, const uchar* new_data, const MY_BITMAP* write_set, @@ -4129,7 +4130,10 @@ row_conflict_fn_old(NDB_CONFLICT_FN_SHAR DBUG_ENTER("row_conflict_fn_old"); uint32 resolve_column= cfn_share->m_resolve_column; uint32 resolve_size= cfn_share->m_resolve_size; - const uchar* field_ptr = old_data + cfn_share->m_resolve_offset; + const uchar* field_ptr = (const uchar*) + NdbDictionary::getValuePtr(data_record, + (const char*) old_data, + cfn_share->m_resolve_column); assert((resolve_size == 4) || (resolve_size == 8)); @@ -4197,6 +4201,7 @@ row_conflict_fn_old(NDB_CONFLICT_FN_SHAR int row_conflict_fn_max_update_only(NDB_CONFLICT_FN_SHARE* cfn_share, enum_conflicting_op_type op_type, + const NdbRecord* data_record, const uchar* old_data, const uchar* new_data, const MY_BITMAP* write_set, @@ -4205,7 +4210,10 @@ row_conflict_fn_max_update_only(NDB_CONF DBUG_ENTER("row_conflict_fn_max_update_only"); uint32 resolve_column= cfn_share->m_resolve_column; uint32 resolve_size= cfn_share->m_resolve_size; - const uchar* field_ptr = new_data + cfn_share->m_resolve_offset; + const uchar* field_ptr = (const uchar*) + NdbDictionary::getValuePtr(data_record, + (const char*) new_data, + cfn_share->m_resolve_column); assert((resolve_size == 4) || (resolve_size == 8)); @@ -4284,6 +4292,7 @@ row_conflict_fn_max_update_only(NDB_CONF int row_conflict_fn_max(NDB_CONFLICT_FN_SHARE* cfn_share, enum_conflicting_op_type op_type, + const NdbRecord* data_record, const uchar* old_data, const uchar* new_data, const MY_BITMAP* write_set, @@ -4297,6 +4306,7 @@ row_conflict_fn_max(NDB_CONFLICT_FN_SHAR case UPDATE_ROW: return row_conflict_fn_max_update_only(cfn_share, op_type, + data_record, old_data, new_data, write_set, @@ -4308,6 +4318,7 @@ row_conflict_fn_max(NDB_CONFLICT_FN_SHAR */ return row_conflict_fn_old(cfn_share, op_type, + data_record, old_data, new_data, write_set, @@ -4336,6 +4347,7 @@ row_conflict_fn_max(NDB_CONFLICT_FN_SHAR int row_conflict_fn_max_del_win(NDB_CONFLICT_FN_SHARE* cfn_share, enum_conflicting_op_type op_type, + const NdbRecord* data_record, const uchar* old_data, const uchar* new_data, const MY_BITMAP* write_set, @@ -4349,6 +4361,7 @@ row_conflict_fn_max_del_win(NDB_CONFLICT case UPDATE_ROW: return row_conflict_fn_max_update_only(cfn_share, op_type, + data_record, old_data, new_data, write_set, @@ -4373,6 +4386,7 @@ row_conflict_fn_max_del_win(NDB_CONFLICT int row_conflict_fn_epoch(NDB_CONFLICT_FN_SHARE* cfn_share, enum_conflicting_op_type op_type, + const NdbRecord* data_record, const uchar* old_data, const uchar* new_data, const MY_BITMAP* write_set, @@ -4468,7 +4482,6 @@ parse_conflict_fn_spec(const char* confl const st_conflict_fn_def** conflict_fn, st_conflict_fn_arg* args, Uint32* max_args, - const TABLE* table, char *msg, uint msg_len) { DBUG_ENTER("parse_conflict_fn_spec"); @@ -4559,9 +4572,6 @@ parse_conflict_fn_spec(const char* confl uint len= (uint)(end_arg - start_arg); args[no_args].type= type; - args[no_args].ptr= start_arg; - args[no_args].len= len; - args[no_args].fieldno= (uint32)-1; DBUG_PRINT("info", ("found argument %s %u", start_arg, len)); @@ -4570,20 +4580,13 @@ parse_conflict_fn_spec(const char* confl { case CFAT_COLUMN_NAME: { - /* find column in table */ - DBUG_PRINT("info", ("searching for %s %u", start_arg, len)); - TABLE_SHARE *table_s= table->s; - for (uint j= 0; j < table_s->fields; j++) - { - Field *field= table_s->field[j]; - if (strncmp(start_arg, field->field_name, len) == 0 && - field->field_name[len] == '\0') - { - DBUG_PRINT("info", ("found %s", field->field_name)); - args[no_args].fieldno= j; - break; - } - } + /* Copy column name out into argument's buffer */ + char* dest= &args[no_args].resolveColNameBuff[0]; + + memcpy(dest, start_arg, (len < (uint) NAME_CHAR_LEN ? + len : + NAME_CHAR_LEN)); + dest[len]= '\0'; break; } case CFAT_EXTRA_GCI_BITS: @@ -4645,7 +4648,7 @@ parse_conflict_fn_spec(const char* confl } /* parse error */ my_snprintf(msg, msg_len, "%s, %s at '%s'", - conflict_fn_spec, error_str, ptr); + conflict_fn_spec, error_str, ptr); DBUG_PRINT("info", ("%s", msg)); DBUG_RETURN(-1); } @@ -4654,7 +4657,6 @@ static int setup_conflict_fn(THD *thd, NDB_SHARE *share, const NDBTAB *ndbtab, char *msg, uint msg_len, - TABLE *table, const st_conflict_fn_def* conflict_fn, const st_conflict_fn_arg* args, const Uint32 num_args) @@ -4676,38 +4678,65 @@ setup_conflict_fn(THD *thd, NDB_SHARE *s DBUG_RETURN(-1); } + /* Now try to find the column in the table */ + int colNum = -1; + const char* resolveColName = args[0].resolveColNameBuff; + int resolveColNameLen = (int)strlen(resolveColName); + + for (int j=0; j< ndbtab->getNoOfColumns(); j++) + { + const char* colName = ndbtab->getColumn(j)->getName(); + + if (strncmp(colName, + resolveColName, + resolveColNameLen) == 0 && + colName[resolveColNameLen] == '\0') + { + colNum = j; + break; + } + } + if (colNum == -1) + { + my_snprintf(msg, msg_len, + "Could not find resolve column %s.", + resolveColName); + DBUG_PRINT("info", ("%s", msg)); + DBUG_RETURN(-1); + } + uint resolve_col_sz= 0; if (0 == (resolve_col_sz = - slave_check_resolve_col_type(ndbtab, args[0].fieldno))) + slave_check_resolve_col_type(ndbtab, colNum))) { /* wrong data type */ slave_reset_conflict_fn(share); my_snprintf(msg, msg_len, - "column '%s' has wrong datatype", - table->s->field[args[0].fieldno]->field_name); + "Column '%s' has wrong datatype", + resolveColName); DBUG_PRINT("info", ("%s", msg)); DBUG_RETURN(-1); } if (slave_set_resolve_fn(thd, share, ndbtab, - args[0].fieldno, resolve_col_sz, - conflict_fn, table, CFF_NONE)) + colNum, resolve_col_sz, + conflict_fn, CFF_NONE)) { my_snprintf(msg, msg_len, - "unable to setup conflict resolution using column '%s'", - table->s->field[args[0].fieldno]->field_name); + "Unable to setup conflict resolution using column '%s'", + resolveColName); DBUG_PRINT("info", ("%s", msg)); DBUG_RETURN(-1); } - if (opt_ndb_extra_logging) - { - sql_print_information("NDB Slave: Table %s.%s using conflict_fn %s on attribute %s.", - table->s->db.str, - table->s->table_name.str, - conflict_fn->name, - table->s->field[args[0].fieldno]->field_name); - } + + /* Success, update message */ + my_snprintf(msg, msg_len, + "NDB Slave: Table %s.%s using conflict_fn %s on attribute %s.", + share->db, + share->table_name, + conflict_fn->name, + resolveColName); break; } case CFT_NDB_EPOCH: @@ -4734,7 +4763,9 @@ setup_conflict_fn(THD *thd, NDB_SHARE *s * represent SavePeriod/EpochPeriod */ if (ndbtab->getExtraRowGciBits() == 0) - sql_print_information("Ndb Slave : CFT_NDB_EPOCH[_TRANS], low epoch resolution"); + sql_print_information("NDB Slave: Table %s.%s : CFT_NDB_EPOCH[_TRANS], low epoch resolution", + share->db, + share->table_name); if (ndbtab->getExtraRowAuthorBits() == 0) { @@ -4746,20 +4777,20 @@ setup_conflict_fn(THD *thd, NDB_SHARE *s if (slave_set_resolve_fn(thd, share, ndbtab, 0, // field_no 0, // resolve_col_sz - conflict_fn, table, CFF_REFRESH_ROWS)) + conflict_fn, CFF_REFRESH_ROWS)) { my_snprintf(msg, msg_len, "unable to setup conflict resolution"); DBUG_PRINT("info", ("%s", msg)); DBUG_RETURN(-1); } - if (opt_ndb_extra_logging) - { - sql_print_information("NDB Slave: Table %s.%s using conflict_fn %s.", - table->s->db.str, - table->s->table_name.str, - conflict_fn->name); - } + /* Success, update message */ + my_snprintf(msg, msg_len, + "NDB Slave: Table %s.%s using conflict_fn %s.", + share->db, + share->table_name, + conflict_fn->name); + break; } case CFT_NUMBER_OF_CFTS: @@ -5065,7 +5096,6 @@ ndbcluster_get_binlog_replication_info(T const char* db, const char* table_name, uint server_id, - const TABLE *table, Uint32* binlog_flags, const st_conflict_fn_def** conflict_fn, st_conflict_fn_arg* args, @@ -5113,34 +5143,42 @@ ndbcluster_get_binlog_replication_info(T DBUG_RETURN(ER_NDB_REPLICATION_SCHEMA_ERROR); } - if (table != NULL) + if (conflict_fn_spec != NULL) { - if (conflict_fn_spec != NULL) + char tmp_buf[FN_REFLEN]; + + if (parse_conflict_fn_spec(conflict_fn_spec, + conflict_fn, + args, + num_args, + tmp_buf, + sizeof(tmp_buf)) != 0) { - char tmp_buf[FN_REFLEN]; + push_warning_printf(thd, MYSQL_ERROR::WARN_LEVEL_WARN, + ER_CONFLICT_FN_PARSE_ERROR, + ER(ER_CONFLICT_FN_PARSE_ERROR), + tmp_buf); - if (parse_conflict_fn_spec(conflict_fn_spec, - conflict_fn, - args, - num_args, - table, - tmp_buf, - sizeof(tmp_buf)) != 0) + /* + Log as well, useful for contexts where the thd's stack of + warnings are ignored + */ + if (opt_ndb_extra_logging) { - push_warning_printf(thd, MYSQL_ERROR::WARN_LEVEL_WARN, - ER_CONFLICT_FN_PARSE_ERROR, - ER(ER_CONFLICT_FN_PARSE_ERROR), - tmp_buf); - DBUG_RETURN(ER_CONFLICT_FN_PARSE_ERROR); + sql_print_warning("NDB Slave: Table %s.%s : Parse error on conflict fn : %s", + db, table_name, + tmp_buf); } - } - else - { - /* No conflict function specified */ - conflict_fn= NULL; - num_args= 0; + + DBUG_RETURN(ER_CONFLICT_FN_PARSE_ERROR); } } + else + { + /* No conflict function specified */ + conflict_fn= NULL; + num_args= 0; + } DBUG_RETURN(0); } @@ -5149,7 +5187,6 @@ int ndbcluster_apply_binlog_replication_info(THD *thd, NDB_SHARE *share, const NDBTAB* ndbtab, - TABLE* table, const st_conflict_fn_def* conflict_fn, const st_conflict_fn_arg* args, Uint32 num_args, @@ -5170,15 +5207,32 @@ ndbcluster_apply_binlog_replication_info if (setup_conflict_fn(thd, share, ndbtab, tmp_buf, sizeof(tmp_buf), - table, conflict_fn, args, - num_args) != 0) + num_args) == 0) { + if (opt_ndb_extra_logging) + { + sql_print_information("%s", tmp_buf); + } + } + else + { + /* + Dump setup failure message to error log + for cases where thd warning stack is + ignored + */ + sql_print_warning("NDB Slave: Table %s.%s : %s", + share->db, + share->table_name, + tmp_buf); + push_warning_printf(thd, MYSQL_ERROR::WARN_LEVEL_WARN, ER_CONFLICT_FN_PARSE_ERROR, ER(ER_CONFLICT_FN_PARSE_ERROR), tmp_buf); + DBUG_RETURN(-1); } } @@ -5196,7 +5250,6 @@ ndbcluster_read_binlog_replication(THD * NDB_SHARE *share, const NDBTAB *ndbtab, uint server_id, - TABLE *table, bool do_set_binlog_flags) { DBUG_ENTER("ndbcluster_read_binlog_replication"); @@ -5209,7 +5262,6 @@ ndbcluster_read_binlog_replication(THD * share->db, share->table_name, server_id, - table, &binlog_flags, &conflict_fn, args, @@ -5217,7 +5269,6 @@ ndbcluster_read_binlog_replication(THD * (ndbcluster_apply_binlog_replication_info(thd, share, ndbtab, - table, conflict_fn, args, num_args, @@ -5322,7 +5373,7 @@ int ndbcluster_create_binlog_setup(THD * /* */ ndbcluster_read_binlog_replication(thd, ndb, share, ndbtab, - ::server_id, NULL, TRUE); + ::server_id, TRUE); #endif /* check if logging turned off for this table @@ -6690,6 +6741,71 @@ void updateInjectorStats(Ndb* schemaNdb, dataNdb->getClientStat(Ndb::EventBytesRecvdCount); } +/** + injectApplyStatusWriteRow + + Inject a WRITE_ROW event on the ndb_apply_status table into + the Binlog. + This contains our server_id and the supplied epoch number. + When applied on the Slave it gives a transactional position + marker +*/ +static +bool +injectApplyStatusWriteRow(injector::transaction& trans, + ulonglong gci) +{ + DBUG_ENTER("injectApplyStatusWriteRow"); + if (ndb_apply_status_share == NULL) + { + sql_print_error("NDB: Could not get apply status share"); + DBUG_ASSERT(ndb_apply_status_share != NULL); + DBUG_RETURN(false); + } + + /* Build row buffer for generated ndb_apply_status + WRITE_ROW event + First get the relevant table structure. + */ + DBUG_ASSERT(!ndb_apply_status_share->event_data); + DBUG_ASSERT(ndb_apply_status_share->op); + Ndb_event_data* event_data= + (Ndb_event_data *) ndb_apply_status_share->op->getCustomData(); + DBUG_ASSERT(event_data); + DBUG_ASSERT(event_data->shadow_table); + TABLE* apply_status_table= event_data->shadow_table; + + /* + Intialize apply_status_table->record[0] + */ + empty_record(apply_status_table); + + apply_status_table->field[0]->store((longlong)::server_id, true); + apply_status_table->field[1]->store((longlong)gci, true); + apply_status_table->field[2]->store("", 0, &my_charset_bin); + apply_status_table->field[3]->store((longlong)0, true); + apply_status_table->field[4]->store((longlong)0, true); +#ifndef DBUG_OFF + const LEX_STRING& name= apply_status_table->s->table_name; + DBUG_PRINT("info", ("use_table: %.*s", + (int) name.length, name.str)); +#endif + injector::transaction::table tbl(apply_status_table, true); + int ret = trans.use_table(::server_id, tbl); + assert(ret == 0); NDB_IGNORE_VALUE(ret); + + ret= trans.write_row(::server_id, + injector::transaction::table(apply_status_table, + true), + &apply_status_table->s->all_set, + apply_status_table->s->fields, + apply_status_table->record[0]); + + assert(ret == 0); + + DBUG_RETURN(true); +} + extern ulong opt_ndb_report_thresh_binlog_epoch_slip; extern ulong opt_ndb_report_thresh_binlog_mem_usage; @@ -7161,44 +7277,6 @@ restart_cluster_failure: { DBUG_PRINT("info", ("pollEvents res: %d", res)); thd->proc_info= "Processing events"; - uchar apply_status_buf[512]; - TABLE *apply_status_table= NULL; - if (ndb_apply_status_share) - { - /* - We construct the buffer to write the apply status binlog - event here, as the table->record[0] buffer is referenced - by the apply status event operation, and will be filled - with data at the nextEvent call if the first event should - happen to be from the apply status table - */ - Ndb_event_data *event_data= ndb_apply_status_share->event_data; - if (!event_data) - { - DBUG_ASSERT(ndb_apply_status_share->op); - event_data= - (Ndb_event_data *) ndb_apply_status_share->op->getCustomData(); - DBUG_ASSERT(event_data); - } - apply_status_table= event_data->shadow_table; - - /* - Intialize apply_status_table->record[0] - */ - empty_record(apply_status_table); - - apply_status_table->field[0]->store((longlong)::server_id, true); - /* - gci is added later, just before writing to binlog as gci - is unknown here - */ - apply_status_table->field[2]->store("", 0, &my_charset_bin); - apply_status_table->field[3]->store((longlong)0, true); - apply_status_table->field[4]->store((longlong)0, true); - DBUG_ASSERT(sizeof(apply_status_buf) >= apply_status_table->s->reclength); - memcpy(apply_status_buf, apply_status_table->record[0], - apply_status_table->s->reclength); - } NdbEventOperation *pOp= i_ndb->nextEvent(); ndb_binlog_index_row _row; ndb_binlog_index_row *rows= &_row; @@ -7321,35 +7399,11 @@ restart_cluster_failure: } if (trans.good()) { - if (apply_status_table) - { -#ifndef DBUG_OFF - const LEX_STRING& name= apply_status_table->s->table_name; - DBUG_PRINT("info", ("use_table: %.*s", - (int) name.length, name.str)); -#endif - injector::transaction::table tbl(apply_status_table, true); - int ret = trans.use_table(::server_id, tbl); - assert(ret == 0); NDB_IGNORE_VALUE(ret); - - /* add the gci to the record */ - Field *field= apply_status_table->field[1]; - my_ptrdiff_t row_offset= - (my_ptrdiff_t) (apply_status_buf - apply_status_table->record[0]); - field->move_field_offset(row_offset); - field->store((longlong)gci, true); - field->move_field_offset(-row_offset); - - trans.write_row(::server_id, - injector::transaction::table(apply_status_table, - true), - &apply_status_table->s->all_set, - apply_status_table->s->fields, - apply_status_buf); - } - else + /* Inject ndb_apply_status WRITE_ROW event */ + if (!injectApplyStatusWriteRow(trans, + gci)) { - sql_print_error("NDB: Could not get apply status share"); + sql_print_error("NDB Binlog: Failed to inject apply status write row"); } } === modified file 'sql/ha_ndbcluster_binlog.h' --- a/sql/ha_ndbcluster_binlog.h 2011-11-10 20:35:28 +0000 +++ b/sql/ha_ndbcluster_binlog.h 2012-01-11 18:28:28 +0000 @@ -82,7 +82,6 @@ ndbcluster_get_binlog_replication_info(T const char* db, const char* table_name, uint server_id, - const TABLE *table, Uint32* binlog_flags, const st_conflict_fn_def** conflict_fn, st_conflict_fn_arg* args, @@ -91,7 +90,6 @@ int ndbcluster_apply_binlog_replication_info(THD *thd, NDB_SHARE *share, const NDBTAB* ndbtab, - TABLE* table, const st_conflict_fn_def* conflict_fn, const st_conflict_fn_arg* args, Uint32 num_args, @@ -102,7 +100,6 @@ ndbcluster_read_binlog_replication(THD * NDB_SHARE *share, const NDBTAB *ndbtab, uint server_id, - TABLE *table, bool do_set_binlog_flags); #endif int ndb_create_table_from_engine(THD *thd, const char *db, === modified file 'sql/ndb_share.h' --- a/sql/ndb_share.h 2011-11-10 08:21:36 +0000 +++ b/sql/ndb_share.h 2012-01-11 18:28:28 +0000 @@ -22,6 +22,8 @@ #include // MEM_ROOT #include // THR_LOCK #include // MY_BITMAP +#include // NAME_CHAR_LEN +#include // MAX_REF_PARTS #include // Ndb::TupleIdRange @@ -56,11 +58,9 @@ enum enum_conflict_fn_arg_type struct st_conflict_fn_arg { enum_conflict_fn_arg_type type; - const char *ptr; - uint32 len; union { - uint32 fieldno; // CFAT_COLUMN_NAME + char resolveColNameBuff[ NAME_CHAR_LEN + 1 ]; // CFAT_COLUMN_NAME uint32 extraGciBits; // CFAT_EXTRA_GCI_BITS }; }; @@ -88,6 +88,7 @@ enum enum_conflicting_op_type */ typedef int (* prepare_detect_func) (struct st_ndbcluster_conflict_fn_share* cfn_share, enum_conflicting_op_type op_type, + const NdbRecord* data_record, const uchar* old_data, const uchar* new_data, const MY_BITMAP* write_set, @@ -131,16 +132,21 @@ enum enum_conflict_fn_table_flags CFF_REFRESH_ROWS = 1 }; +/* + Maximum supported key parts (16) + (Ndb supports 32, but MySQL has a lower limit) +*/ +static const int NDB_MAX_KEY_PARTS = MAX_REF_PARTS; + typedef struct st_ndbcluster_conflict_fn_share { const st_conflict_fn_def* m_conflict_fn; /* info about original table */ uint8 m_pk_cols; - uint8 m_resolve_column; + uint16 m_resolve_column; uint8 m_resolve_size; uint8 m_flags; - uint16 m_offset[16]; - uint16 m_resolve_offset; + uint16 m_key_attrids[ NDB_MAX_KEY_PARTS ]; const NdbDictionary::Table *m_ex_tab; uint32 m_count; === modified file 'storage/ndb/include/kernel/NodeInfo.hpp' --- a/storage/ndb/include/kernel/NodeInfo.hpp 2011-07-05 12:46:07 +0000 +++ b/storage/ndb/include/kernel/NodeInfo.hpp 2012-01-11 18:28:28 +0000 @@ -37,15 +37,14 @@ public: INVALID = 255 ///< Invalid type }; NodeType getType() const; - + Uint32 m_version; ///< Ndb version Uint32 m_mysql_version; ///< MySQL version Uint32 m_lqh_workers; ///< LQH workers Uint32 m_type; ///< Node type Uint32 m_connectCount; ///< No of times connected - bool m_connected; ///< Node is connected - Uint32 m_heartbeat_cnt; ///< Missed heartbeats - + Uint32 m_connected; ///< Node is connected + friend NdbOut & operator<<(NdbOut&, const NodeInfo&); }; @@ -57,7 +56,6 @@ NodeInfo::NodeInfo(){ m_lqh_workers = 0; m_type = INVALID; m_connectCount = 0; - m_heartbeat_cnt= 0; } inline === modified file 'storage/ndb/include/ndb_global.h' --- a/storage/ndb/include/ndb_global.h 2011-12-10 19:02:03 +0000 +++ b/storage/ndb/include/ndb_global.h 2012-01-11 18:28:28 +0000 @@ -249,6 +249,11 @@ extern "C" { #define ATTRIBUTE_NOINLINE #endif +/** + * Pad to NDB_CL size + */ +#define NDB_CL_PADSZ(x) (NDB_CL - ((x) % NDB_CL)) + /* * require is like a normal assert, only it's always on (eg. in release) */ === modified file 'storage/ndb/src/kernel/blocks/dbdih/Dbdih.hpp' --- a/storage/ndb/src/kernel/blocks/dbdih/Dbdih.hpp 2012-01-10 13:01:14 +0000 +++ b/storage/ndb/src/kernel/blocks/dbdih/Dbdih.hpp 2012-01-11 18:28:28 +0000 @@ -1326,7 +1326,7 @@ private: Uint32 clastVerifyQueue; Uint32 m_empty_done; Uint32 m_ref; - char pad[NDB_CL - (sizeof(void*) + 4 * sizeof(Uint32))]; + char pad[NDB_CL_PADSZ(sizeof(void*) + 4 * sizeof(Uint32))]; }; bool isEmpty(const DIVERIFY_queue&); === modified file 'storage/ndb/src/kernel/blocks/qmgr/Qmgr.hpp' --- a/storage/ndb/src/kernel/blocks/qmgr/Qmgr.hpp 2011-09-02 09:16:56 +0000 +++ b/storage/ndb/src/kernel/blocks/qmgr/Qmgr.hpp 2012-01-11 18:28:28 +0000 @@ -575,6 +575,14 @@ private: #ifdef ERROR_INSERT Uint32 nodeFailCount; #endif + + Uint32 get_hb_count(Uint32 nodeId) const { + return globalData.get_hb_count(nodeId); + } + + Uint32& set_hb_count(Uint32 nodeId) { + return globalData.set_hb_count(nodeId); + } }; #endif === modified file 'storage/ndb/src/kernel/blocks/qmgr/QmgrInit.cpp' --- a/storage/ndb/src/kernel/blocks/qmgr/QmgrInit.cpp 2011-11-22 20:11:29 +0000 +++ b/storage/ndb/src/kernel/blocks/qmgr/QmgrInit.cpp 2012-01-11 18:28:28 +0000 @@ -122,7 +122,7 @@ void Qmgr::initData() nodePtr.p->phase = ZAPI_INACTIVE; } - setNodeInfo(nodePtr.i).m_heartbeat_cnt = cnt; + set_hb_count(nodePtr.i) = cnt; nodePtr.p->sendPrepFailReqStatus = Q_NOT_ACTIVE; nodePtr.p->sendCommitFailReqStatus = Q_NOT_ACTIVE; nodePtr.p->sendPresToStatus = Q_NOT_ACTIVE; === modified file 'storage/ndb/src/kernel/blocks/qmgr/QmgrMain.cpp' --- a/storage/ndb/src/kernel/blocks/qmgr/QmgrMain.cpp 2012-01-10 13:01:14 +0000 +++ b/storage/ndb/src/kernel/blocks/qmgr/QmgrMain.cpp 2012-01-11 18:28:28 +0000 @@ -97,7 +97,7 @@ void Qmgr::execCM_HEARTBEAT(Signal* sign jamEntry(); hbNodePtr.i = signal->theData[0]; ptrCheckGuard(hbNodePtr, MAX_NDB_NODES, nodeRec); - setNodeInfo(hbNodePtr.i).m_heartbeat_cnt= 0; + set_hb_count(hbNodePtr.i) = 0; return; }//Qmgr::execCM_HEARTBEAT() @@ -324,7 +324,7 @@ void Qmgr::execSTTOR(Signal* signal) if (nodePtr.p->phase == ZAPI_INACTIVE) { jam(); - setNodeInfo(nodePtr.i).m_heartbeat_cnt = 3; + set_hb_count(nodePtr.i) = 3; nodePtr.p->phase = ZFAIL_CLOSING; nodePtr.p->failState = NORMAL; } @@ -2093,7 +2093,7 @@ void Qmgr::execCM_ADD(Signal* signal) ndbrequire(addNodePtr.p->phase == ZSTARTING); addNodePtr.p->phase = ZRUNNING; m_connectivity_check.reportNodeConnect(addNodePtr.i); - setNodeInfo(addNodePtr.i).m_heartbeat_cnt= 0; + set_hb_count(addNodePtr.i) = 0; c_clusterNodes.set(addNodePtr.i); findNeighbours(signal, __LINE__); @@ -2182,7 +2182,7 @@ Qmgr::joinedCluster(Signal* signal, Node * NODES IN THE CLUSTER. */ nodePtr.p->phase = ZRUNNING; - setNodeInfo(nodePtr.i).m_heartbeat_cnt= 0; + set_hb_count(nodePtr.i) = 0; findNeighbours(signal, __LINE__); c_clusterNodes.set(nodePtr.i); c_start.reset(); @@ -2429,7 +2429,7 @@ void Qmgr::findNeighbours(Signal* signal *---------------------------------------------------------------------*/ fnNodePtr.i = cneighbourl; ptrCheckGuard(fnNodePtr, MAX_NDB_NODES, nodeRec); - setNodeInfo(fnNodePtr.i).m_heartbeat_cnt= 0; + set_hb_count(fnNodePtr.i) = 0; }//if }//if @@ -2737,18 +2737,20 @@ void Qmgr::checkHeartbeat(Signal* signal }//if ptrCheckGuard(nodePtr, MAX_NDB_NODES, nodeRec); - setNodeInfo(nodePtr.i).m_heartbeat_cnt++; + set_hb_count(nodePtr.i)++; ndbrequire(nodePtr.p->phase == ZRUNNING); ndbrequire(getNodeInfo(nodePtr.i).m_type == NodeInfo::DB); - if(getNodeInfo(nodePtr.i).m_heartbeat_cnt > 2){ + if (get_hb_count(nodePtr.i) > 2) + { signal->theData[0] = NDB_LE_MissedHeartbeat; signal->theData[1] = nodePtr.i; - signal->theData[2] = getNodeInfo(nodePtr.i).m_heartbeat_cnt - 1; + signal->theData[2] = get_hb_count(nodePtr.i) - 1; sendSignal(CMVMI_REF, GSN_EVENT_REP, signal, 3, JBB); } - if (getNodeInfo(nodePtr.i).m_heartbeat_cnt > 4) { + if (get_hb_count(nodePtr.i) > 4) + { jam(); if (m_connectivity_check.getEnabled()) { @@ -2791,21 +2793,21 @@ void Qmgr::apiHbHandlingLab(Signal* sign if (c_connectedNodes.get(nodeId)) { jam(); - setNodeInfo(TnodePtr.i).m_heartbeat_cnt++; - - if(getNodeInfo(TnodePtr.i).m_heartbeat_cnt > 2) + set_hb_count(TnodePtr.i)++; + + if (get_hb_count(TnodePtr.i) > 2) { signal->theData[0] = NDB_LE_MissedHeartbeat; signal->theData[1] = nodeId; - signal->theData[2] = getNodeInfo(TnodePtr.i).m_heartbeat_cnt - 1; + signal->theData[2] = get_hb_count(TnodePtr.i) - 1; sendSignal(CMVMI_REF, GSN_EVENT_REP, signal, 3, JBB); } - - if (getNodeInfo(TnodePtr.i).m_heartbeat_cnt > 4) + + if (get_hb_count(TnodePtr.i) > 4) { jam(); /*------------------------------------------------------------------*/ - /* THE API NODE HAS NOT SENT ANY HEARTBEAT FOR THREE SECONDS. + /* THE API NODE HAS NOT SENT ANY HEARTBEAT FOR THREE SECONDS. * WE WILL DISCONNECT FROM IT NOW. *------------------------------------------------------------------*/ /*------------------------------------------------------------------*/ @@ -2814,7 +2816,7 @@ void Qmgr::apiHbHandlingLab(Signal* sign signal->theData[0] = NDB_LE_DeadDueToHeartbeat; signal->theData[1] = nodeId; sendSignal(CMVMI_REF, GSN_EVENT_REP, signal, 2, JBB); - + api_failed(signal, nodeId); }//if }//if @@ -2843,16 +2845,16 @@ void Qmgr::checkStartInterface(Signal* s Uint32 type = getNodeInfo(nodePtr.i).m_type; if (nodePtr.p->phase == ZFAIL_CLOSING) { jam(); - setNodeInfo(nodePtr.i).m_heartbeat_cnt++; + set_hb_count(nodePtr.i)++; if (c_connectedNodes.get(nodePtr.i)){ jam(); /*-------------------------------------------------------------------*/ // We need to ensure that the connection is not restored until it has // been disconnected for at least three seconds. /*-------------------------------------------------------------------*/ - setNodeInfo(nodePtr.i).m_heartbeat_cnt= 0; + set_hb_count(nodePtr.i) = 0; }//if - if ((getNodeInfo(nodePtr.i).m_heartbeat_cnt > 3) + if ((get_hb_count(nodePtr.i) > 3) && (nodePtr.p->failState == NORMAL)) { /**------------------------------------------------------------------ * WE HAVE DISCONNECTED THREE SECONDS AGO. WE ARE NOW READY TO @@ -2885,12 +2887,12 @@ void Qmgr::checkStartInterface(Signal* s * Dont allow API node to connect before c_allow_api_connect */ jam(); - setNodeInfo(nodePtr.i).m_heartbeat_cnt = 3; + set_hb_count(nodePtr.i) = 3; continue; } } - setNodeInfo(nodePtr.i).m_heartbeat_cnt= 0; + set_hb_count(nodePtr.i) = 0; signal->theData[0] = 0; signal->theData[1] = nodePtr.i; sendSignal(TRPMAN_REF, GSN_OPEN_COMORD, signal, 2, JBB); @@ -2898,7 +2900,7 @@ void Qmgr::checkStartInterface(Signal* s else { jam(); - if(((getNodeInfo(nodePtr.i).m_heartbeat_cnt + 1) % 60) == 0) + if(((get_hb_count(nodePtr.i) + 1) % 60) == 0) { jam(); char buf[256]; @@ -2909,10 +2911,10 @@ void Qmgr::checkStartInterface(Signal* s "Failure handling of node %d has not completed" " in %d min - state = %d", nodePtr.i, - (getNodeInfo(nodePtr.i).m_heartbeat_cnt+1)/60, + (get_hb_count(nodePtr.i)+1)/60, nodePtr.p->failState); warningEvent("%s", buf); - if (((getNodeInfo(nodePtr.i).m_heartbeat_cnt + 1) % 300) == 0) + if (((get_hb_count(nodePtr.i) + 1) % 300) == 0) { jam(); /** @@ -2930,7 +2932,7 @@ void Qmgr::checkStartInterface(Signal* s "Failure handling of api %u has not completed" " in %d min - state = %d", nodePtr.i, - (getNodeInfo(nodePtr.i).m_heartbeat_cnt+1)/60, + (get_hb_count(nodePtr.i)+1)/60, nodePtr.p->failState); warningEvent("%s", buf); if (nodePtr.p->failState == WAITING_FOR_API_FAILCONF) @@ -3376,7 +3378,7 @@ void Qmgr::node_failed(Signal* signal, U /*---------------------------------------------------------------------*/ failedNodePtr.p->failState = NORMAL; failedNodePtr.p->phase = ZFAIL_CLOSING; - setNodeInfo(failedNodePtr.i).m_heartbeat_cnt= 0; + set_hb_count(failedNodePtr.i) = 0; CloseComReqConf * const closeCom = (CloseComReqConf *)&signal->theData[0]; @@ -3445,7 +3447,7 @@ Qmgr::api_failed(Signal* signal, Uint32 failedNodePtr.p->failState = initialState; failedNodePtr.p->phase = ZFAIL_CLOSING; - setNodeInfo(failedNodePtr.i).m_heartbeat_cnt= 0; + set_hb_count(failedNodePtr.i) = 0; setNodeInfo(failedNodePtr.i).m_version = 0; recompute_version_info(getNodeInfo(failedNodePtr.i).m_type); @@ -3554,7 +3556,7 @@ void Qmgr::execAPI_REGREQ(Signal* signal setNodeInfo(apiNodePtr.i).m_version = version; setNodeInfo(apiNodePtr.i).m_mysql_version = mysql_version; - setNodeInfo(apiNodePtr.i).m_heartbeat_cnt= 0; + set_hb_count(apiNodePtr.i) = 0; NodeState state = getNodeState(); if (apiNodePtr.p->phase == ZAPI_INACTIVE) @@ -4123,7 +4125,7 @@ void Qmgr::handleApiCloseComConf(Signal* * Allow MGM do reconnect "directly" */ jam(); - setNodeInfo(failedNodePtr.i).m_heartbeat_cnt = 3; + set_hb_count(failedNodePtr.i) = 3; } /* Handled the single API node failure */ @@ -4524,7 +4526,7 @@ void Qmgr::execCOMMIT_FAILREQ(Signal* si ptrCheckGuard(nodePtr, MAX_NDB_NODES, nodeRec); nodePtr.p->phase = ZFAIL_CLOSING; nodePtr.p->failState = WAITING_FOR_NDB_FAILCONF; - setNodeInfo(nodePtr.i).m_heartbeat_cnt= 0; + set_hb_count(nodePtr.i) = 0; setNodeInfo(nodePtr.i).m_version = 0; c_clusterNodes.clear(nodePtr.i); }//for @@ -4816,7 +4818,7 @@ void Qmgr::failReport(Signal* signal, failedNodePtr.p->sendPrepFailReqStatus = Q_NOT_ACTIVE; failedNodePtr.p->sendCommitFailReqStatus = Q_NOT_ACTIVE; failedNodePtr.p->sendPresToStatus = Q_NOT_ACTIVE; - setNodeInfo(failedNodePtr.i).m_heartbeat_cnt= 0; + set_hb_count(failedNodePtr.i) = 0; if (aSendFailRep == ZTRUE) { jam(); if (failedNodePtr.i != getOwnNodeId()) { === modified file 'storage/ndb/src/kernel/blocks/trpman.cpp' --- a/storage/ndb/src/kernel/blocks/trpman.cpp 2012-01-10 13:54:55 +0000 +++ b/storage/ndb/src/kernel/blocks/trpman.cpp 2012-01-11 15:43:32 +0000 @@ -702,7 +702,7 @@ TrpmanProxy::sendCLOSE_COMCONF(Signal *s CloseComReqConf* conf = (CloseComReqConf*)signal->getDataPtrSend(); *conf = ss.m_req; - sendSignal(conf->xxxBlockRef, GSN_CLOSE_COMCONF, signal, + sendSignal(QMGR_REF, GSN_CLOSE_COMCONF, signal, CloseComReqConf::SignalLength, JBB); ssRelease(ssId); } === modified file 'storage/ndb/src/kernel/vm/ArrayPool.hpp' --- a/storage/ndb/src/kernel/vm/ArrayPool.hpp 2012-01-10 13:01:14 +0000 +++ b/storage/ndb/src/kernel/vm/ArrayPool.hpp 2012-01-11 18:28:28 +0000 @@ -261,7 +261,7 @@ protected: * Protect here means to have them on separate CPU cache lines to * avoid false CPU cache line sharing. */ - char protect_read_var[NDB_CL - (sizeof(Uint32) + sizeof(void*))]; + char protect_read_var[NDB_CL_PADSZ(sizeof(Uint32) + sizeof(void*))]; Uint32 firstFree; Uint32 noOfFree; Uint32 noOfFreeMin; === modified file 'storage/ndb/src/kernel/vm/Configuration.cpp' --- a/storage/ndb/src/kernel/vm/Configuration.cpp 2012-01-10 18:21:38 +0000 +++ b/storage/ndb/src/kernel/vm/Configuration.cpp 2012-01-11 18:28:28 +0000 @@ -682,6 +682,12 @@ Configuration::calcSizeAlt(ConfigValues lqhInstances = globalData.ndbMtLqhWorkers; } + Uint32 tcInstances = 1; + if (globalData.ndbMtTcThreads > 1) + { + tcInstances = globalData.ndbMtTcThreads; + } + Uint64 indexMem = 0, dataMem = 0; ndb_mgm_get_int64_parameter(&db, CFG_DB_DATA_MEM, &dataMem); ndb_mgm_get_int64_parameter(&db, CFG_DB_INDEX_MEM, &indexMem); @@ -808,7 +814,8 @@ Configuration::calcSizeAlt(ConfigValues #if NDB_VERSION_D < NDB_MAKE_VERSION(7,2,0) noOfLocalScanRecords = (noOfDBNodes * noOfScanRecords) + #else - noOfLocalScanRecords = 4 * (noOfDBNodes * noOfScanRecords) + + noOfLocalScanRecords = tcInstances * lqhInstances * + (noOfDBNodes * noOfScanRecords) + #endif 1 /* NR */ + 1 /* LCP */; === modified file 'storage/ndb/src/kernel/vm/GlobalData.hpp' --- a/storage/ndb/src/kernel/vm/GlobalData.hpp 2012-01-10 13:54:55 +0000 +++ b/storage/ndb/src/kernel/vm/GlobalData.hpp 2012-01-11 13:16:31 +0000 @@ -41,9 +41,10 @@ enum restartStates {initial_state, perform_stop}; struct GlobalData { + Uint32 m_hb_count[MAX_NODES]; // hb counters NodeInfo m_nodeInfo[MAX_NODES]; // At top to ensure cache alignment Signal VMSignals[1]; // Owned by FastScheduler:: - Uint32 m_restart_seq; // + Uint32 m_restart_seq; // NodeVersionInfo m_versionInfo; Uint64 internalMillisecCounter; // Owned by ThreadConfig:: @@ -91,6 +92,7 @@ struct GlobalData { ndbMtSendThreads = 0; ndbMtReceiveThreads = 0; ndbLogParts = 0; + bzero(m_hb_count, sizeof(m_hb_count)); #ifdef GCP_TIMER_HACK gcp_timer_limit = 0; #endif @@ -107,7 +109,18 @@ struct GlobalData { void incrementWatchDogCounter(Uint32 place); Uint32 * getWatchDogPtr(); - + + Uint32 getBlockThreads() const { + return ndbMtLqhThreads + ndbMtTcThreads + ndbMtReceiveThreads; + } + + Uint32 get_hb_count(Uint32 nodeId) const { + return m_hb_count[nodeId]; + } + + Uint32& set_hb_count(Uint32 nodeId) { + return m_hb_count[nodeId]; + } private: Uint32 watchDog; SimulatedBlock* blockTable[NO_OF_BLOCKS]; // Owned by Dispatcher:: === modified file 'storage/ndb/src/kernel/vm/SimulatedBlock.cpp' --- a/storage/ndb/src/kernel/vm/SimulatedBlock.cpp 2011-11-16 08:17:17 +0000 +++ b/storage/ndb/src/kernel/vm/SimulatedBlock.cpp 2012-01-11 18:28:28 +0000 @@ -161,15 +161,20 @@ SimulatedBlock::initCommon() this->getParam("FragmentInfoHash", &count); c_fragmentInfoHash.setSize(count); - count = 5; + Uint32 def = 5; +#ifdef NDBD_MULTITHREADED + def += globalData.getBlockThreads(); +#endif + + count = def; this->getParam("ActiveMutexes", &count); c_mutexMgr.setSize(count); - - count = 5; + + count = def; this->getParam("ActiveCounters", &count); c_counterMgr.setSize(count); - count = 5; + count = def; this->getParam("ActiveThreadSync", &count); c_syncThreadPool.setSize(count); } === modified file 'storage/ndb/src/kernel/vm/TransporterCallback.cpp' --- a/storage/ndb/src/kernel/vm/TransporterCallback.cpp 2012-01-04 14:25:32 +0000 +++ b/storage/ndb/src/kernel/vm/TransporterCallback.cpp 2012-01-11 18:28:28 +0000 @@ -469,6 +469,8 @@ SignalLoggerManager::printSegmentedSecti void TransporterCallbackKernel::transporter_recv_from(NodeId nodeId) { - globalData.m_nodeInfo[nodeId].m_heartbeat_cnt= 0; - return; + if (globalData.get_hb_count(nodeId) != 0) + { + globalData.set_hb_count(nodeId) = 0; + } } === modified file 'storage/ndb/src/kernel/vm/mt.cpp' --- a/storage/ndb/src/kernel/vm/mt.cpp 2012-01-10 13:01:14 +0000 +++ b/storage/ndb/src/kernel/vm/mt.cpp 2012-01-11 18:28:28 +0000 @@ -3474,7 +3474,8 @@ rep_init(struct thr_repository* rep, uns Uint32 compute_jb_pages(struct EmulatorData * ed) { - Uint32 cnt = NUM_MAIN_THREADS + globalData.ndbMtLqhThreads + 1; + Uint32 cnt = NUM_MAIN_THREADS + + globalData.ndbMtReceiveThreads + globalData.ndbMtTcThreads + globalData.ndbMtLqhThreads + 1; Uint32 perthread = 0; No bundle (reason: useless for push emails).