3759 Frazer Clement 2012-01-11 [merge]
Merge 7.1->7.2
added:
mysql-test/suite/ndb_rpl/t/show_mysqld_warnings.inc
modified:
mysql-test/suite/ndb_rpl/r/ndb_rpl_conflict.result
mysql-test/suite/ndb_rpl/r/ndb_rpl_rep_error.result
mysql-test/suite/ndb_rpl/t/ndb_rpl_conflict.test
mysql-test/suite/ndb_rpl/t/ndb_rpl_conflict_epoch.test
mysql-test/suite/ndb_rpl/t/ndb_rpl_rep_error.test
sql/ha_ndbcluster.cc
sql/ha_ndbcluster_binlog.cc
sql/ha_ndbcluster_binlog.h
sql/ndb_share.h
storage/ndb/include/kernel/NodeInfo.hpp
storage/ndb/include/ndb_global.h
storage/ndb/src/kernel/blocks/dbdih/Dbdih.hpp
storage/ndb/src/kernel/blocks/qmgr/Qmgr.hpp
storage/ndb/src/kernel/blocks/qmgr/QmgrInit.cpp
storage/ndb/src/kernel/blocks/qmgr/QmgrMain.cpp
storage/ndb/src/kernel/blocks/trpman.cpp
storage/ndb/src/kernel/vm/ArrayPool.hpp
storage/ndb/src/kernel/vm/Configuration.cpp
storage/ndb/src/kernel/vm/GlobalData.hpp
storage/ndb/src/kernel/vm/SimulatedBlock.cpp
storage/ndb/src/kernel/vm/TransporterCallback.cpp
storage/ndb/src/kernel/vm/mt.cpp
3758 jonas oreland 2012-01-10 [merge]
ndb - merge 71 to 72
modified:
storage/ndb/include/kernel/signaldata/CloseComReqConf.hpp
storage/ndb/include/kernel/signaldata/EnableCom.hpp
storage/ndb/src/kernel/blocks/trpman.cpp
storage/ndb/src/kernel/blocks/trpman.hpp
storage/ndb/src/kernel/vm/Configuration.cpp
storage/ndb/src/kernel/vm/GlobalData.hpp
=== modified file 'mysql-test/suite/ndb_rpl/r/ndb_rpl_conflict.result'
--- a/mysql-test/suite/ndb_rpl/r/ndb_rpl_conflict.result 2011-09-01 15:12:11 +0000
+++ b/mysql-test/suite/ndb_rpl/r/ndb_rpl_conflict.result 2012-01-11 16:08:31 +0000
@@ -229,7 +229,51 @@ select server_id, master_server_id, coun
server_id master_server_id count a
2 1 1 3
drop table t2_max, t2_max$EX;
+drop table t1_old, `t1_old$EX`, t1_max, `t1_max$EX`, t1_max_delete_win, `t1_max_delete_win$EX`;
+delete from mysql.ndb_replication;
+Test that online table distribution sets up conflict functions and exceptions tables
+insert into mysql.ndb_replication values ("test", "t1allsame", 0, 7, "NDB$MAX(X)");
+insert into mysql.ndb_replication values ("test", "t2diffex", 1, 7, "NDB$OLD(X)");
+insert into mysql.ndb_replication values ("test", "t2diffex", 3, 7, "NDB$MAX(X)");
+insert into mysql.ndb_replication values ("test", "t3oneex", 3, 7, "NDB$EPOCH()");
+create table t2diffex$EX (
+server_id int unsigned,
+master_server_id int unsigned,
+master_epoch bigint unsigned,
+count int unsigned,
+a int not null,
+primary key(server_id, master_server_id, master_epoch, count)) engine ndb;
+create table t3oneex$EX (
+server_id int unsigned,
+master_server_id int unsigned,
+master_epoch bigint unsigned,
+count int unsigned,
+a int not null,
+primary key(server_id, master_server_id, master_epoch, count)) engine ndb;
+create table t1allsame(a int primary key, b varchar(200), X int unsigned) engine=ndb;
+create table t2diffex(a int primary key, b varchar(200), X int unsigned) engine=ndb;
+create table t3oneex(a int primary key, b varchar(200)) engine=ndb;
+show variables like 'server_id';
+Variable_name Value
+server_id 1
+MySQLD error output for server 1.1 matching pattern %NDB Slave%
+relevant
+[note] ndb slave: table test.t2diffex using conflict_fn ndb$old on attribute x.
+[note] ndb slave: table test.t2diffex logging exceptions to test.t2diffex$ex
+[note] ndb slave: table test.t1allsame using conflict_fn ndb$max on attribute x.
+[note] ndb slave: table test.t2_max using conflict_fn ndb$max on attribute x.
+show variables like 'server_id';
+Variable_name Value
+server_id 3
+MySQLD error output for server 2.1 matching pattern %NDB Slave%
+relevant
+[warning] ndb slave: table test.t3oneex : no extra row author bits in table.
+[note] ndb slave: table test.t3oneex : cft_ndb_epoch[_trans], low epoch resolution
+[note] ndb slave: table test.t2diffex using conflict_fn ndb$max on attribute x.
+[note] ndb slave: table test.t2diffex logging exceptions to test.t2diffex$ex
+[note] ndb slave: table test.t1allsame using conflict_fn ndb$max on attribute x.
+[note] ndb slave: table test.t2_max using conflict_fn ndb$max on attribute x.
+drop table t3oneex, t2diffex, t1allsame, t3oneex$EX, t2diffex$EX;
"Cleanup"
drop table mysql.ndb_replication;
-drop table t1_old, `t1_old$EX`, t1_max, `t1_max$EX`, t1_max_delete_win, `t1_max_delete_win$EX`;
include/rpl_end.inc
=== modified file 'mysql-test/suite/ndb_rpl/r/ndb_rpl_rep_error.result'
--- a/mysql-test/suite/ndb_rpl/r/ndb_rpl_rep_error.result 2011-06-21 09:44:52 +0000
+++ b/mysql-test/suite/ndb_rpl/r/ndb_rpl_rep_error.result 2012-01-11 18:28:28 +0000
@@ -15,7 +15,7 @@ delete from mysql.ndb_replication;
insert into mysql.ndb_replication values ("test", "t1", 0, NULL, "NDB$MAX(X)");
create table t1 (a int key, X int) engine ndb;
Warnings:
-Warning 1626 Error in parsing conflict function. Message: column 'X' has wrong datatype
+Warning 1626 Error in parsing conflict function. Message: Column 'X' has wrong datatype
drop table t1;
delete from mysql.ndb_replication;
insert into mysql.ndb_replication values ("test", "t1", 0, NULL, "NDB$MAX()");
=== modified file 'mysql-test/suite/ndb_rpl/t/ndb_rpl_conflict.test'
--- a/mysql-test/suite/ndb_rpl/t/ndb_rpl_conflict.test 2011-09-01 15:12:11 +0000
+++ b/mysql-test/suite/ndb_rpl/t/ndb_rpl_conflict.test 2012-01-11 11:07:09 +0000
@@ -2,7 +2,7 @@
# Test engine native conflict resolution for ndb
#
#
---source include/have_ndb.inc
+--source include/have_multi_ndb.inc
--source include/have_binlog_format_mixed_or_row.inc
--source suite/ndb_rpl/ndb_master-slave.inc
@@ -307,14 +307,77 @@ select server_id, master_server_id, coun
--connection master
drop table t2_max, t2_max$EX;
+drop table t1_old, `t1_old$EX`, t1_max, `t1_max$EX`, t1_max_delete_win, `t1_max_delete_win$EX`;
+delete from mysql.ndb_replication;
+
+--echo Test that online table distribution sets up conflict functions and exceptions tables
+
+# t1allsame - Same on all servers, no exceptions table
+insert into mysql.ndb_replication values ("test", "t1allsame", 0, 7, "NDB$MAX(X)");
+
+# t2diffex - Different on each server with an exceptions table
+# Not a recommended configuration!
+insert into mysql.ndb_replication values ("test", "t2diffex", 1, 7, "NDB$OLD(X)");
+insert into mysql.ndb_replication values ("test", "t2diffex", 3, 7, "NDB$MAX(X)");
+
+# t3oneex - Only on one server with an exceptions table
+# Note that it's not defined on the server where it's created (not recommended)
+# so on the server where it's defined, we get an error due to having no extra
+# author bits
+#
+insert into mysql.ndb_replication values ("test", "t3oneex", 3, 7, "NDB$EPOCH()");
+
+# Create exception tables
+create table t2diffex$EX (
+ server_id int unsigned,
+ master_server_id int unsigned,
+ master_epoch bigint unsigned,
+ count int unsigned,
+ a int not null,
+ primary key(server_id, master_server_id, master_epoch, count)) engine ndb;
+
+create table t3oneex$EX (
+ server_id int unsigned,
+ master_server_id int unsigned,
+ master_epoch bigint unsigned,
+ count int unsigned,
+ a int not null,
+ primary key(server_id, master_server_id, master_epoch, count)) engine ndb;
+# Now create actual tables on this server (id 1)
+create table t1allsame(a int primary key, b varchar(200), X int unsigned) engine=ndb;
+create table t2diffex(a int primary key, b varchar(200), X int unsigned) engine=ndb;
+create table t3oneex(a int primary key, b varchar(200)) engine=ndb;
+
+# Now examine server logs to see that conflict detection and
+# exceptions tables were setup as expected
+show variables like 'server_id';
+--let $server_num=1.1
+--let $pattern=%NDB Slave%
+--let $limit=4
+--source suite/ndb_rpl/t/show_mysqld_warnings.inc
+
+
+--connection server2
+--disable_query_log
+call mtr.add_suppression("NDB Slave: .* No extra row author bits in table.*");
+--enable_query_log
+
+show variables like 'server_id';
+--let $server_num=2.1
+--let $pattern=%NDB Slave%
+--let $limit=6
+
+--source suite/ndb_rpl/t/show_mysqld_warnings.inc
+
+--connection master
+drop table t3oneex, t2diffex, t1allsame, t3oneex$EX, t2diffex$EX;
###############
--echo "Cleanup"
--connection master
drop table mysql.ndb_replication;
-drop table t1_old, `t1_old$EX`, t1_max, `t1_max$EX`, t1_max_delete_win, `t1_max_delete_win$EX`;
--sync_slave_with_master
--source include/rpl_end.inc
=== modified file 'mysql-test/suite/ndb_rpl/t/ndb_rpl_conflict_epoch.test'
--- a/mysql-test/suite/ndb_rpl/t/ndb_rpl_conflict_epoch.test 2011-07-11 11:09:58 +0000
+++ b/mysql-test/suite/ndb_rpl/t/ndb_rpl_conflict_epoch.test 2012-01-11 18:28:28 +0000
@@ -231,6 +231,11 @@ insert into mysql.ndb_replication values
("test", "t3", 0, 7, "NDB\$EPOCH(32)"),
("test", "t4", 0, 7, "NDB\$EPOCH(-1)");
+--disable_query_log
+# Only need suppress here, as table creation fails due to this.
+call mtr.add_suppression("NDB Slave: .* Too many extra Gci bits at .*");
+--enable_query_log
+
--error 1005
create table test.t3 (a int primary key) engine=ndb;
show warnings;
=== modified file 'mysql-test/suite/ndb_rpl/t/ndb_rpl_rep_error.test'
--- a/mysql-test/suite/ndb_rpl/t/ndb_rpl_rep_error.test 2011-06-16 14:34:56 +0000
+++ b/mysql-test/suite/ndb_rpl/t/ndb_rpl_rep_error.test 2012-01-11 12:37:11 +0000
@@ -2,7 +2,7 @@
# Some negative tests of the ndb_replication table
#
#
---source include/have_ndb.inc
+--source include/have_multi_ndb.inc
--source include/have_binlog_format_mixed_or_row.inc
--source suite/ndb_rpl/ndb_master-slave.inc
@@ -46,8 +46,25 @@ CREATE TABLE mysql.ndb_replication
--enable_warnings
--enable_query_log
+# Need suppressions on all servers where warnings/errors can be seen.
+--disable_query_log
+--connection server1
+call mtr.add_suppression("NDB Slave: .* unknown conflict resolution function .*");
+call mtr.add_suppression("NDB Slave: .* has wrong datatype.*");
+call mtr.add_suppression("NDB Slave: .* missing function argument .*");
+call mtr.add_suppression("NDB Slave: .* missing \')\' .*");
+--connection server2
+call mtr.add_suppression("NDB Slave: .* unknown conflict resolution function .*");
+call mtr.add_suppression("NDB Slave: .* has wrong datatype.*");
+call mtr.add_suppression("NDB Slave: .* missing function argument .*");
+call mtr.add_suppression("NDB Slave: .* missing \')\' .*");
+--connection default
+--enable_query_log
+
# Non existant conflict_fn
# gives error when creating table
+#call mtr.add_suppression("NDB Slave: .* unknown conflict resolution function .*");
+
insert into mysql.ndb_replication values ("test", "t1", 0, NULL, "NDB$X(X)");
--error 1005
create table t1 (a int key, X int) engine ndb;
@@ -56,6 +73,8 @@ delete from mysql.ndb_replication;
# Column type cannot be used for this function
# gives warning when creating table
+#call mtr.add_suppression("NDB Slave: .* has wrong datatype.*");
+
insert into mysql.ndb_replication values ("test", "t1", 0, NULL, "NDB$MAX(X)");
create table t1 (a int key, X int) engine ndb;
drop table t1;
@@ -63,6 +82,8 @@ delete from mysql.ndb_replication;
# Too few arguments
# gives error when creating table
+#call mtr.add_suppression("NDB Slave: .* missing function argument .*");
+
insert into mysql.ndb_replication values ("test", "t1", 0, NULL, "NDB$MAX()");
--error 1005
create table t1 (a int key, X int) engine ndb;
@@ -71,6 +92,7 @@ delete from mysql.ndb_replication;
# Too many arguments
# gives error when creating table
+#call mtr.add_suppression("NDB Slave: .* missing \')\' .*");
insert into mysql.ndb_replication values ("test", "t1", 0, NULL, "NDB$MAX(X Y)");
--error 1005
create table t1 (a int key, X int) engine ndb;
=== added file 'mysql-test/suite/ndb_rpl/t/show_mysqld_warnings.inc'
--- a/mysql-test/suite/ndb_rpl/t/show_mysqld_warnings.inc 1970-01-01 00:00:00 +0000
+++ b/mysql-test/suite/ndb_rpl/t/show_mysqld_warnings.inc 2012-01-11 16:08:31 +0000
@@ -0,0 +1,10 @@
+--disable_query_log
+--echo MySQLD error output for server $server_num matching pattern $pattern
+create table errlog (a int auto_increment primary key, txt text) engine=myisam;
+
+--eval load data local infile "$MYSQLTEST_VARDIR/log/mysqld.$server_num.err" into table errlog (txt);
+
+--eval select lower(right(txt, length(txt) - locate('[',txt) + 1)) as relevant from errlog where txt like '$pattern' order by a desc limit $limit;
+
+drop table errlog;
+--enable_query_log
=== modified file 'sql/ha_ndbcluster.cc'
--- a/sql/ha_ndbcluster.cc 2011-12-20 13:26:37 +0000
+++ b/sql/ha_ndbcluster.cc 2012-01-11 18:28:28 +0000
@@ -2130,7 +2130,7 @@ int ha_ndbcluster::get_metadata(THD *thd
#ifdef HAVE_NDB_BINLOG
ndbcluster_read_binlog_replication(thd, ndb, m_share, m_table,
- ::server_id, table, FALSE);
+ ::server_id, FALSE);
#endif
DBUG_RETURN(0);
@@ -5143,6 +5143,7 @@ ha_ndbcluster::prepare_conflict_detectio
{
res = conflict_fn->prep_func(m_share->m_cfn_share,
op_type,
+ m_ndb_record,
old_data,
new_data,
table->write_set,
@@ -5993,7 +5994,10 @@ handle_row_conflict(NDB_CONFLICT_FN_SHAR
for (k= 0; k < nkey; k++)
{
DBUG_ASSERT(pk_row != NULL);
- const uchar* data= pk_row + cfn_share->m_offset[k];
+ const uchar* data=
+ (const uchar*) NdbDictionary::getValuePtr(key_rec,
+ (const char*) pk_row,
+ cfn_share->m_key_attrids[k]);
if (ex_op->setValue((Uint32)(fixed_cols + k), (const char*)data) == -1)
{
err= ex_op->getNdbError();
@@ -10020,7 +10024,6 @@ int ha_ndbcluster::create(const char *na
m_dbname,
m_tabname,
::server_id,
- form,
&binlog_flags,
&conflict_fn,
args,
@@ -10484,7 +10487,6 @@ cleanup_failed:
ndbcluster_apply_binlog_replication_info(thd,
share,
m_table,
- form,
conflict_fn,
args,
num_args,
@@ -10929,7 +10931,7 @@ int ha_ndbcluster::rename_table(const ch
#ifdef HAVE_NDB_BINLOG
if (share)
ndbcluster_read_binlog_replication(thd, ndb, share, ndbtab,
- ::server_id, NULL, TRUE);
+ ::server_id, TRUE);
#endif
/* always create an event for the table */
String event_name(INJECTOR_EVENT_LEN);
=== modified file 'sql/ha_ndbcluster_binlog.cc'
--- a/sql/ha_ndbcluster_binlog.cc 2012-01-09 18:41:24 +0000
+++ b/sql/ha_ndbcluster_binlog.cc 2012-01-11 18:28:28 +0000
@@ -4004,7 +4004,6 @@ slave_set_resolve_fn(THD *thd, NDB_SHARE
const NDBTAB *ndbtab, uint field_index,
uint resolve_col_sz,
const st_conflict_fn_def* conflict_fn,
- TABLE *table,
uint8 flags)
{
DBUG_ENTER("slave_set_resolve_fn");
@@ -4024,8 +4023,6 @@ slave_set_resolve_fn(THD *thd, NDB_SHARE
/* Calculate resolve col stuff (if relevant) */
cfn_share->m_resolve_size= resolve_col_sz;
cfn_share->m_resolve_column= field_index;
- cfn_share->m_resolve_offset= (uint16)(table->field[field_index]->ptr -
- table->record[0]);
cfn_share->m_flags = flags;
{
@@ -4074,8 +4071,11 @@ slave_set_resolve_fn(THD *thd, NDB_SHARE
col->getNullable() == ex_col->getNullable();
if (!ok)
break;
- cfn_share->m_offset[k]=
- (uint16)(table->field[i]->ptr - table->record[0]);
+ /*
+ Store mapping of Exception table key# to
+ orig table attrid
+ */
+ cfn_share->m_key_attrids[k]= i;
k++;
}
}
@@ -4086,9 +4086,9 @@ slave_set_resolve_fn(THD *thd, NDB_SHARE
ndbtab_g.release();
if (opt_ndb_extra_logging)
sql_print_information("NDB Slave: Table %s.%s logging exceptions to %s.%s",
- table->s->db.str,
- table->s->table_name.str,
- table->s->db.str,
+ share->db,
+ share->table_name,
+ share->db,
ex_tab_name);
}
else
@@ -4121,6 +4121,7 @@ slave_set_resolve_fn(THD *thd, NDB_SHARE
int
row_conflict_fn_old(NDB_CONFLICT_FN_SHARE* cfn_share,
enum_conflicting_op_type op_type,
+ const NdbRecord* data_record,
const uchar* old_data,
const uchar* new_data,
const MY_BITMAP* write_set,
@@ -4129,7 +4130,10 @@ row_conflict_fn_old(NDB_CONFLICT_FN_SHAR
DBUG_ENTER("row_conflict_fn_old");
uint32 resolve_column= cfn_share->m_resolve_column;
uint32 resolve_size= cfn_share->m_resolve_size;
- const uchar* field_ptr = old_data + cfn_share->m_resolve_offset;
+ const uchar* field_ptr = (const uchar*)
+ NdbDictionary::getValuePtr(data_record,
+ (const char*) old_data,
+ cfn_share->m_resolve_column);
assert((resolve_size == 4) || (resolve_size == 8));
@@ -4197,6 +4201,7 @@ row_conflict_fn_old(NDB_CONFLICT_FN_SHAR
int
row_conflict_fn_max_update_only(NDB_CONFLICT_FN_SHARE* cfn_share,
enum_conflicting_op_type op_type,
+ const NdbRecord* data_record,
const uchar* old_data,
const uchar* new_data,
const MY_BITMAP* write_set,
@@ -4205,7 +4210,10 @@ row_conflict_fn_max_update_only(NDB_CONF
DBUG_ENTER("row_conflict_fn_max_update_only");
uint32 resolve_column= cfn_share->m_resolve_column;
uint32 resolve_size= cfn_share->m_resolve_size;
- const uchar* field_ptr = new_data + cfn_share->m_resolve_offset;
+ const uchar* field_ptr = (const uchar*)
+ NdbDictionary::getValuePtr(data_record,
+ (const char*) new_data,
+ cfn_share->m_resolve_column);
assert((resolve_size == 4) || (resolve_size == 8));
@@ -4284,6 +4292,7 @@ row_conflict_fn_max_update_only(NDB_CONF
int
row_conflict_fn_max(NDB_CONFLICT_FN_SHARE* cfn_share,
enum_conflicting_op_type op_type,
+ const NdbRecord* data_record,
const uchar* old_data,
const uchar* new_data,
const MY_BITMAP* write_set,
@@ -4297,6 +4306,7 @@ row_conflict_fn_max(NDB_CONFLICT_FN_SHAR
case UPDATE_ROW:
return row_conflict_fn_max_update_only(cfn_share,
op_type,
+ data_record,
old_data,
new_data,
write_set,
@@ -4308,6 +4318,7 @@ row_conflict_fn_max(NDB_CONFLICT_FN_SHAR
*/
return row_conflict_fn_old(cfn_share,
op_type,
+ data_record,
old_data,
new_data,
write_set,
@@ -4336,6 +4347,7 @@ row_conflict_fn_max(NDB_CONFLICT_FN_SHAR
int
row_conflict_fn_max_del_win(NDB_CONFLICT_FN_SHARE* cfn_share,
enum_conflicting_op_type op_type,
+ const NdbRecord* data_record,
const uchar* old_data,
const uchar* new_data,
const MY_BITMAP* write_set,
@@ -4349,6 +4361,7 @@ row_conflict_fn_max_del_win(NDB_CONFLICT
case UPDATE_ROW:
return row_conflict_fn_max_update_only(cfn_share,
op_type,
+ data_record,
old_data,
new_data,
write_set,
@@ -4373,6 +4386,7 @@ row_conflict_fn_max_del_win(NDB_CONFLICT
int
row_conflict_fn_epoch(NDB_CONFLICT_FN_SHARE* cfn_share,
enum_conflicting_op_type op_type,
+ const NdbRecord* data_record,
const uchar* old_data,
const uchar* new_data,
const MY_BITMAP* write_set,
@@ -4468,7 +4482,6 @@ parse_conflict_fn_spec(const char* confl
const st_conflict_fn_def** conflict_fn,
st_conflict_fn_arg* args,
Uint32* max_args,
- const TABLE* table,
char *msg, uint msg_len)
{
DBUG_ENTER("parse_conflict_fn_spec");
@@ -4559,9 +4572,6 @@ parse_conflict_fn_spec(const char* confl
uint len= (uint)(end_arg - start_arg);
args[no_args].type= type;
- args[no_args].ptr= start_arg;
- args[no_args].len= len;
- args[no_args].fieldno= (uint32)-1;
DBUG_PRINT("info", ("found argument %s %u", start_arg, len));
@@ -4570,20 +4580,13 @@ parse_conflict_fn_spec(const char* confl
{
case CFAT_COLUMN_NAME:
{
- /* find column in table */
- DBUG_PRINT("info", ("searching for %s %u", start_arg, len));
- TABLE_SHARE *table_s= table->s;
- for (uint j= 0; j < table_s->fields; j++)
- {
- Field *field= table_s->field[j];
- if (strncmp(start_arg, field->field_name, len) == 0 &&
- field->field_name[len] == '\0')
- {
- DBUG_PRINT("info", ("found %s", field->field_name));
- args[no_args].fieldno= j;
- break;
- }
- }
+ /* Copy column name out into argument's buffer */
+ char* dest= &args[no_args].resolveColNameBuff[0];
+
+ memcpy(dest, start_arg, (len < (uint) NAME_CHAR_LEN ?
+ len :
+ NAME_CHAR_LEN));
+ dest[len]= '\0';
break;
}
case CFAT_EXTRA_GCI_BITS:
@@ -4645,7 +4648,7 @@ parse_conflict_fn_spec(const char* confl
}
/* parse error */
my_snprintf(msg, msg_len, "%s, %s at '%s'",
- conflict_fn_spec, error_str, ptr);
+ conflict_fn_spec, error_str, ptr);
DBUG_PRINT("info", ("%s", msg));
DBUG_RETURN(-1);
}
@@ -4654,7 +4657,6 @@ static int
setup_conflict_fn(THD *thd, NDB_SHARE *share,
const NDBTAB *ndbtab,
char *msg, uint msg_len,
- TABLE *table,
const st_conflict_fn_def* conflict_fn,
const st_conflict_fn_arg* args,
const Uint32 num_args)
@@ -4676,38 +4678,65 @@ setup_conflict_fn(THD *thd, NDB_SHARE *s
DBUG_RETURN(-1);
}
+ /* Now try to find the column in the table */
+ int colNum = -1;
+ const char* resolveColName = args[0].resolveColNameBuff;
+ int resolveColNameLen = (int)strlen(resolveColName);
+
+ for (int j=0; j< ndbtab->getNoOfColumns(); j++)
+ {
+ const char* colName = ndbtab->getColumn(j)->getName();
+
+ if (strncmp(colName,
+ resolveColName,
+ resolveColNameLen) == 0 &&
+ colName[resolveColNameLen] == '\0')
+ {
+ colNum = j;
+ break;
+ }
+ }
+ if (colNum == -1)
+ {
+ my_snprintf(msg, msg_len,
+ "Could not find resolve column %s.",
+ resolveColName);
+ DBUG_PRINT("info", ("%s", msg));
+ DBUG_RETURN(-1);
+ }
+
uint resolve_col_sz= 0;
if (0 == (resolve_col_sz =
- slave_check_resolve_col_type(ndbtab, args[0].fieldno)))
+ slave_check_resolve_col_type(ndbtab, colNum)))
{
/* wrong data type */
slave_reset_conflict_fn(share);
my_snprintf(msg, msg_len,
- "column '%s' has wrong datatype",
- table->s->field[args[0].fieldno]->field_name);
+ "Column '%s' has wrong datatype",
+ resolveColName);
DBUG_PRINT("info", ("%s", msg));
DBUG_RETURN(-1);
}
if (slave_set_resolve_fn(thd, share, ndbtab,
- args[0].fieldno, resolve_col_sz,
- conflict_fn, table, CFF_NONE))
+ colNum, resolve_col_sz,
+ conflict_fn, CFF_NONE))
{
my_snprintf(msg, msg_len,
- "unable to setup conflict resolution using column '%s'",
- table->s->field[args[0].fieldno]->field_name);
+ "Unable to setup conflict resolution using column '%s'",
+ resolveColName);
DBUG_PRINT("info", ("%s", msg));
DBUG_RETURN(-1);
}
- if (opt_ndb_extra_logging)
- {
- sql_print_information("NDB Slave: Table %s.%s using conflict_fn %s on attribute %s.",
- table->s->db.str,
- table->s->table_name.str,
- conflict_fn->name,
- table->s->field[args[0].fieldno]->field_name);
- }
+
+ /* Success, update message */
+ my_snprintf(msg, msg_len,
+ "NDB Slave: Table %s.%s using conflict_fn %s on attribute %s.",
+ share->db,
+ share->table_name,
+ conflict_fn->name,
+ resolveColName);
break;
}
case CFT_NDB_EPOCH:
@@ -4734,7 +4763,9 @@ setup_conflict_fn(THD *thd, NDB_SHARE *s
* represent SavePeriod/EpochPeriod
*/
if (ndbtab->getExtraRowGciBits() == 0)
- sql_print_information("Ndb Slave : CFT_NDB_EPOCH[_TRANS], low epoch resolution");
+ sql_print_information("NDB Slave: Table %s.%s : CFT_NDB_EPOCH[_TRANS], low epoch resolution",
+ share->db,
+ share->table_name);
if (ndbtab->getExtraRowAuthorBits() == 0)
{
@@ -4746,20 +4777,20 @@ setup_conflict_fn(THD *thd, NDB_SHARE *s
if (slave_set_resolve_fn(thd, share, ndbtab,
0, // field_no
0, // resolve_col_sz
- conflict_fn, table, CFF_REFRESH_ROWS))
+ conflict_fn, CFF_REFRESH_ROWS))
{
my_snprintf(msg, msg_len,
"unable to setup conflict resolution");
DBUG_PRINT("info", ("%s", msg));
DBUG_RETURN(-1);
}
- if (opt_ndb_extra_logging)
- {
- sql_print_information("NDB Slave: Table %s.%s using conflict_fn %s.",
- table->s->db.str,
- table->s->table_name.str,
- conflict_fn->name);
- }
+ /* Success, update message */
+ my_snprintf(msg, msg_len,
+ "NDB Slave: Table %s.%s using conflict_fn %s.",
+ share->db,
+ share->table_name,
+ conflict_fn->name);
+
break;
}
case CFT_NUMBER_OF_CFTS:
@@ -5065,7 +5096,6 @@ ndbcluster_get_binlog_replication_info(T
const char* db,
const char* table_name,
uint server_id,
- const TABLE *table,
Uint32* binlog_flags,
const st_conflict_fn_def** conflict_fn,
st_conflict_fn_arg* args,
@@ -5113,34 +5143,42 @@ ndbcluster_get_binlog_replication_info(T
DBUG_RETURN(ER_NDB_REPLICATION_SCHEMA_ERROR);
}
- if (table != NULL)
+ if (conflict_fn_spec != NULL)
{
- if (conflict_fn_spec != NULL)
+ char tmp_buf[FN_REFLEN];
+
+ if (parse_conflict_fn_spec(conflict_fn_spec,
+ conflict_fn,
+ args,
+ num_args,
+ tmp_buf,
+ sizeof(tmp_buf)) != 0)
{
- char tmp_buf[FN_REFLEN];
+ push_warning_printf(thd, MYSQL_ERROR::WARN_LEVEL_WARN,
+ ER_CONFLICT_FN_PARSE_ERROR,
+ ER(ER_CONFLICT_FN_PARSE_ERROR),
+ tmp_buf);
- if (parse_conflict_fn_spec(conflict_fn_spec,
- conflict_fn,
- args,
- num_args,
- table,
- tmp_buf,
- sizeof(tmp_buf)) != 0)
+ /*
+ Log as well, useful for contexts where the thd's stack of
+ warnings are ignored
+ */
+ if (opt_ndb_extra_logging)
{
- push_warning_printf(thd, MYSQL_ERROR::WARN_LEVEL_WARN,
- ER_CONFLICT_FN_PARSE_ERROR,
- ER(ER_CONFLICT_FN_PARSE_ERROR),
- tmp_buf);
- DBUG_RETURN(ER_CONFLICT_FN_PARSE_ERROR);
+ sql_print_warning("NDB Slave: Table %s.%s : Parse error on conflict fn : %s",
+ db, table_name,
+ tmp_buf);
}
- }
- else
- {
- /* No conflict function specified */
- conflict_fn= NULL;
- num_args= 0;
+
+ DBUG_RETURN(ER_CONFLICT_FN_PARSE_ERROR);
}
}
+ else
+ {
+ /* No conflict function specified */
+ conflict_fn= NULL;
+ num_args= 0;
+ }
DBUG_RETURN(0);
}
@@ -5149,7 +5187,6 @@ int
ndbcluster_apply_binlog_replication_info(THD *thd,
NDB_SHARE *share,
const NDBTAB* ndbtab,
- TABLE* table,
const st_conflict_fn_def* conflict_fn,
const st_conflict_fn_arg* args,
Uint32 num_args,
@@ -5170,15 +5207,32 @@ ndbcluster_apply_binlog_replication_info
if (setup_conflict_fn(thd, share,
ndbtab,
tmp_buf, sizeof(tmp_buf),
- table,
conflict_fn,
args,
- num_args) != 0)
+ num_args) == 0)
{
+ if (opt_ndb_extra_logging)
+ {
+ sql_print_information("%s", tmp_buf);
+ }
+ }
+ else
+ {
+ /*
+ Dump setup failure message to error log
+ for cases where thd warning stack is
+ ignored
+ */
+ sql_print_warning("NDB Slave: Table %s.%s : %s",
+ share->db,
+ share->table_name,
+ tmp_buf);
+
push_warning_printf(thd, MYSQL_ERROR::WARN_LEVEL_WARN,
ER_CONFLICT_FN_PARSE_ERROR,
ER(ER_CONFLICT_FN_PARSE_ERROR),
tmp_buf);
+
DBUG_RETURN(-1);
}
}
@@ -5196,7 +5250,6 @@ ndbcluster_read_binlog_replication(THD *
NDB_SHARE *share,
const NDBTAB *ndbtab,
uint server_id,
- TABLE *table,
bool do_set_binlog_flags)
{
DBUG_ENTER("ndbcluster_read_binlog_replication");
@@ -5209,7 +5262,6 @@ ndbcluster_read_binlog_replication(THD *
share->db,
share->table_name,
server_id,
- table,
&binlog_flags,
&conflict_fn,
args,
@@ -5217,7 +5269,6 @@ ndbcluster_read_binlog_replication(THD *
(ndbcluster_apply_binlog_replication_info(thd,
share,
ndbtab,
- table,
conflict_fn,
args,
num_args,
@@ -5322,7 +5373,7 @@ int ndbcluster_create_binlog_setup(THD *
/*
*/
ndbcluster_read_binlog_replication(thd, ndb, share, ndbtab,
- ::server_id, NULL, TRUE);
+ ::server_id, TRUE);
#endif
/*
check if logging turned off for this table
@@ -6690,6 +6741,71 @@ void updateInjectorStats(Ndb* schemaNdb,
dataNdb->getClientStat(Ndb::EventBytesRecvdCount);
}
+/**
+ injectApplyStatusWriteRow
+
+ Inject a WRITE_ROW event on the ndb_apply_status table into
+ the Binlog.
+ This contains our server_id and the supplied epoch number.
+ When applied on the Slave it gives a transactional position
+ marker
+*/
+static
+bool
+injectApplyStatusWriteRow(injector::transaction& trans,
+ ulonglong gci)
+{
+ DBUG_ENTER("injectApplyStatusWriteRow");
+ if (ndb_apply_status_share == NULL)
+ {
+ sql_print_error("NDB: Could not get apply status share");
+ DBUG_ASSERT(ndb_apply_status_share != NULL);
+ DBUG_RETURN(false);
+ }
+
+ /* Build row buffer for generated ndb_apply_status
+ WRITE_ROW event
+ First get the relevant table structure.
+ */
+ DBUG_ASSERT(!ndb_apply_status_share->event_data);
+ DBUG_ASSERT(ndb_apply_status_share->op);
+ Ndb_event_data* event_data=
+ (Ndb_event_data *) ndb_apply_status_share->op->getCustomData();
+ DBUG_ASSERT(event_data);
+ DBUG_ASSERT(event_data->shadow_table);
+ TABLE* apply_status_table= event_data->shadow_table;
+
+ /*
+ Intialize apply_status_table->record[0]
+ */
+ empty_record(apply_status_table);
+
+ apply_status_table->field[0]->store((longlong)::server_id, true);
+ apply_status_table->field[1]->store((longlong)gci, true);
+ apply_status_table->field[2]->store("", 0, &my_charset_bin);
+ apply_status_table->field[3]->store((longlong)0, true);
+ apply_status_table->field[4]->store((longlong)0, true);
+#ifndef DBUG_OFF
+ const LEX_STRING& name= apply_status_table->s->table_name;
+ DBUG_PRINT("info", ("use_table: %.*s",
+ (int) name.length, name.str));
+#endif
+ injector::transaction::table tbl(apply_status_table, true);
+ int ret = trans.use_table(::server_id, tbl);
+ assert(ret == 0); NDB_IGNORE_VALUE(ret);
+
+ ret= trans.write_row(::server_id,
+ injector::transaction::table(apply_status_table,
+ true),
+ &apply_status_table->s->all_set,
+ apply_status_table->s->fields,
+ apply_status_table->record[0]);
+
+ assert(ret == 0);
+
+ DBUG_RETURN(true);
+}
+
extern ulong opt_ndb_report_thresh_binlog_epoch_slip;
extern ulong opt_ndb_report_thresh_binlog_mem_usage;
@@ -7161,44 +7277,6 @@ restart_cluster_failure:
{
DBUG_PRINT("info", ("pollEvents res: %d", res));
thd->proc_info= "Processing events";
- uchar apply_status_buf[512];
- TABLE *apply_status_table= NULL;
- if (ndb_apply_status_share)
- {
- /*
- We construct the buffer to write the apply status binlog
- event here, as the table->record[0] buffer is referenced
- by the apply status event operation, and will be filled
- with data at the nextEvent call if the first event should
- happen to be from the apply status table
- */
- Ndb_event_data *event_data= ndb_apply_status_share->event_data;
- if (!event_data)
- {
- DBUG_ASSERT(ndb_apply_status_share->op);
- event_data=
- (Ndb_event_data *) ndb_apply_status_share->op->getCustomData();
- DBUG_ASSERT(event_data);
- }
- apply_status_table= event_data->shadow_table;
-
- /*
- Intialize apply_status_table->record[0]
- */
- empty_record(apply_status_table);
-
- apply_status_table->field[0]->store((longlong)::server_id, true);
- /*
- gci is added later, just before writing to binlog as gci
- is unknown here
- */
- apply_status_table->field[2]->store("", 0, &my_charset_bin);
- apply_status_table->field[3]->store((longlong)0, true);
- apply_status_table->field[4]->store((longlong)0, true);
- DBUG_ASSERT(sizeof(apply_status_buf) >= apply_status_table->s->reclength);
- memcpy(apply_status_buf, apply_status_table->record[0],
- apply_status_table->s->reclength);
- }
NdbEventOperation *pOp= i_ndb->nextEvent();
ndb_binlog_index_row _row;
ndb_binlog_index_row *rows= &_row;
@@ -7321,35 +7399,11 @@ restart_cluster_failure:
}
if (trans.good())
{
- if (apply_status_table)
- {
-#ifndef DBUG_OFF
- const LEX_STRING& name= apply_status_table->s->table_name;
- DBUG_PRINT("info", ("use_table: %.*s",
- (int) name.length, name.str));
-#endif
- injector::transaction::table tbl(apply_status_table, true);
- int ret = trans.use_table(::server_id, tbl);
- assert(ret == 0); NDB_IGNORE_VALUE(ret);
-
- /* add the gci to the record */
- Field *field= apply_status_table->field[1];
- my_ptrdiff_t row_offset=
- (my_ptrdiff_t) (apply_status_buf - apply_status_table->record[0]);
- field->move_field_offset(row_offset);
- field->store((longlong)gci, true);
- field->move_field_offset(-row_offset);
-
- trans.write_row(::server_id,
- injector::transaction::table(apply_status_table,
- true),
- &apply_status_table->s->all_set,
- apply_status_table->s->fields,
- apply_status_buf);
- }
- else
+ /* Inject ndb_apply_status WRITE_ROW event */
+ if (!injectApplyStatusWriteRow(trans,
+ gci))
{
- sql_print_error("NDB: Could not get apply status share");
+ sql_print_error("NDB Binlog: Failed to inject apply status write row");
}
}
=== modified file 'sql/ha_ndbcluster_binlog.h'
--- a/sql/ha_ndbcluster_binlog.h 2011-11-10 20:35:28 +0000
+++ b/sql/ha_ndbcluster_binlog.h 2012-01-11 18:28:28 +0000
@@ -82,7 +82,6 @@ ndbcluster_get_binlog_replication_info(T
const char* db,
const char* table_name,
uint server_id,
- const TABLE *table,
Uint32* binlog_flags,
const st_conflict_fn_def** conflict_fn,
st_conflict_fn_arg* args,
@@ -91,7 +90,6 @@ int
ndbcluster_apply_binlog_replication_info(THD *thd,
NDB_SHARE *share,
const NDBTAB* ndbtab,
- TABLE* table,
const st_conflict_fn_def* conflict_fn,
const st_conflict_fn_arg* args,
Uint32 num_args,
@@ -102,7 +100,6 @@ ndbcluster_read_binlog_replication(THD *
NDB_SHARE *share,
const NDBTAB *ndbtab,
uint server_id,
- TABLE *table,
bool do_set_binlog_flags);
#endif
int ndb_create_table_from_engine(THD *thd, const char *db,
=== modified file 'sql/ndb_share.h'
--- a/sql/ndb_share.h 2011-11-10 08:21:36 +0000
+++ b/sql/ndb_share.h 2012-01-11 18:28:28 +0000
@@ -22,6 +22,8 @@
#include <my_alloc.h> // MEM_ROOT
#include <thr_lock.h> // THR_LOCK
#include <my_bitmap.h> // MY_BITMAP
+#include <mysql_com.h> // NAME_CHAR_LEN
+#include <sql_const.h> // MAX_REF_PARTS
#include <ndbapi/Ndb.hpp> // Ndb::TupleIdRange
@@ -56,11 +58,9 @@ enum enum_conflict_fn_arg_type
struct st_conflict_fn_arg
{
enum_conflict_fn_arg_type type;
- const char *ptr;
- uint32 len;
union
{
- uint32 fieldno; // CFAT_COLUMN_NAME
+ char resolveColNameBuff[ NAME_CHAR_LEN + 1 ]; // CFAT_COLUMN_NAME
uint32 extraGciBits; // CFAT_EXTRA_GCI_BITS
};
};
@@ -88,6 +88,7 @@ enum enum_conflicting_op_type
*/
typedef int (* prepare_detect_func) (struct st_ndbcluster_conflict_fn_share* cfn_share,
enum_conflicting_op_type op_type,
+ const NdbRecord* data_record,
const uchar* old_data,
const uchar* new_data,
const MY_BITMAP* write_set,
@@ -131,16 +132,21 @@ enum enum_conflict_fn_table_flags
CFF_REFRESH_ROWS = 1
};
+/*
+ Maximum supported key parts (16)
+ (Ndb supports 32, but MySQL has a lower limit)
+*/
+static const int NDB_MAX_KEY_PARTS = MAX_REF_PARTS;
+
typedef struct st_ndbcluster_conflict_fn_share {
const st_conflict_fn_def* m_conflict_fn;
/* info about original table */
uint8 m_pk_cols;
- uint8 m_resolve_column;
+ uint16 m_resolve_column;
uint8 m_resolve_size;
uint8 m_flags;
- uint16 m_offset[16];
- uint16 m_resolve_offset;
+ uint16 m_key_attrids[ NDB_MAX_KEY_PARTS ];
const NdbDictionary::Table *m_ex_tab;
uint32 m_count;
=== modified file 'storage/ndb/include/kernel/NodeInfo.hpp'
--- a/storage/ndb/include/kernel/NodeInfo.hpp 2011-07-05 12:46:07 +0000
+++ b/storage/ndb/include/kernel/NodeInfo.hpp 2012-01-11 18:28:28 +0000
@@ -37,15 +37,14 @@ public:
INVALID = 255 ///< Invalid type
};
NodeType getType() const;
-
+
Uint32 m_version; ///< Ndb version
Uint32 m_mysql_version; ///< MySQL version
Uint32 m_lqh_workers; ///< LQH workers
Uint32 m_type; ///< Node type
Uint32 m_connectCount; ///< No of times connected
- bool m_connected; ///< Node is connected
- Uint32 m_heartbeat_cnt; ///< Missed heartbeats
-
+ Uint32 m_connected; ///< Node is connected
+
friend NdbOut & operator<<(NdbOut&, const NodeInfo&);
};
@@ -57,7 +56,6 @@ NodeInfo::NodeInfo(){
m_lqh_workers = 0;
m_type = INVALID;
m_connectCount = 0;
- m_heartbeat_cnt= 0;
}
inline
=== modified file 'storage/ndb/include/ndb_global.h'
--- a/storage/ndb/include/ndb_global.h 2011-12-10 19:02:03 +0000
+++ b/storage/ndb/include/ndb_global.h 2012-01-11 18:28:28 +0000
@@ -249,6 +249,11 @@ extern "C" {
#define ATTRIBUTE_NOINLINE
#endif
+/**
+ * Pad to NDB_CL size
+ */
+#define NDB_CL_PADSZ(x) (NDB_CL - ((x) % NDB_CL))
+
/*
* require is like a normal assert, only it's always on (eg. in release)
*/
=== modified file 'storage/ndb/src/kernel/blocks/dbdih/Dbdih.hpp'
--- a/storage/ndb/src/kernel/blocks/dbdih/Dbdih.hpp 2012-01-10 13:01:14 +0000
+++ b/storage/ndb/src/kernel/blocks/dbdih/Dbdih.hpp 2012-01-11 18:28:28 +0000
@@ -1326,7 +1326,7 @@ private:
Uint32 clastVerifyQueue;
Uint32 m_empty_done;
Uint32 m_ref;
- char pad[NDB_CL - (sizeof(void*) + 4 * sizeof(Uint32))];
+ char pad[NDB_CL_PADSZ(sizeof(void*) + 4 * sizeof(Uint32))];
};
bool isEmpty(const DIVERIFY_queue&);
=== modified file 'storage/ndb/src/kernel/blocks/qmgr/Qmgr.hpp'
--- a/storage/ndb/src/kernel/blocks/qmgr/Qmgr.hpp 2011-09-02 09:16:56 +0000
+++ b/storage/ndb/src/kernel/blocks/qmgr/Qmgr.hpp 2012-01-11 18:28:28 +0000
@@ -575,6 +575,14 @@ private:
#ifdef ERROR_INSERT
Uint32 nodeFailCount;
#endif
+
+ Uint32 get_hb_count(Uint32 nodeId) const {
+ return globalData.get_hb_count(nodeId);
+ }
+
+ Uint32& set_hb_count(Uint32 nodeId) {
+ return globalData.set_hb_count(nodeId);
+ }
};
#endif
=== modified file 'storage/ndb/src/kernel/blocks/qmgr/QmgrInit.cpp'
--- a/storage/ndb/src/kernel/blocks/qmgr/QmgrInit.cpp 2011-11-22 20:11:29 +0000
+++ b/storage/ndb/src/kernel/blocks/qmgr/QmgrInit.cpp 2012-01-11 18:28:28 +0000
@@ -122,7 +122,7 @@ void Qmgr::initData()
nodePtr.p->phase = ZAPI_INACTIVE;
}
- setNodeInfo(nodePtr.i).m_heartbeat_cnt = cnt;
+ set_hb_count(nodePtr.i) = cnt;
nodePtr.p->sendPrepFailReqStatus = Q_NOT_ACTIVE;
nodePtr.p->sendCommitFailReqStatus = Q_NOT_ACTIVE;
nodePtr.p->sendPresToStatus = Q_NOT_ACTIVE;
=== modified file 'storage/ndb/src/kernel/blocks/qmgr/QmgrMain.cpp'
--- a/storage/ndb/src/kernel/blocks/qmgr/QmgrMain.cpp 2012-01-10 13:01:14 +0000
+++ b/storage/ndb/src/kernel/blocks/qmgr/QmgrMain.cpp 2012-01-11 18:28:28 +0000
@@ -97,7 +97,7 @@ void Qmgr::execCM_HEARTBEAT(Signal* sign
jamEntry();
hbNodePtr.i = signal->theData[0];
ptrCheckGuard(hbNodePtr, MAX_NDB_NODES, nodeRec);
- setNodeInfo(hbNodePtr.i).m_heartbeat_cnt= 0;
+ set_hb_count(hbNodePtr.i) = 0;
return;
}//Qmgr::execCM_HEARTBEAT()
@@ -324,7 +324,7 @@ void Qmgr::execSTTOR(Signal* signal)
if (nodePtr.p->phase == ZAPI_INACTIVE)
{
jam();
- setNodeInfo(nodePtr.i).m_heartbeat_cnt = 3;
+ set_hb_count(nodePtr.i) = 3;
nodePtr.p->phase = ZFAIL_CLOSING;
nodePtr.p->failState = NORMAL;
}
@@ -2093,7 +2093,7 @@ void Qmgr::execCM_ADD(Signal* signal)
ndbrequire(addNodePtr.p->phase == ZSTARTING);
addNodePtr.p->phase = ZRUNNING;
m_connectivity_check.reportNodeConnect(addNodePtr.i);
- setNodeInfo(addNodePtr.i).m_heartbeat_cnt= 0;
+ set_hb_count(addNodePtr.i) = 0;
c_clusterNodes.set(addNodePtr.i);
findNeighbours(signal, __LINE__);
@@ -2182,7 +2182,7 @@ Qmgr::joinedCluster(Signal* signal, Node
* NODES IN THE CLUSTER.
*/
nodePtr.p->phase = ZRUNNING;
- setNodeInfo(nodePtr.i).m_heartbeat_cnt= 0;
+ set_hb_count(nodePtr.i) = 0;
findNeighbours(signal, __LINE__);
c_clusterNodes.set(nodePtr.i);
c_start.reset();
@@ -2429,7 +2429,7 @@ void Qmgr::findNeighbours(Signal* signal
*---------------------------------------------------------------------*/
fnNodePtr.i = cneighbourl;
ptrCheckGuard(fnNodePtr, MAX_NDB_NODES, nodeRec);
- setNodeInfo(fnNodePtr.i).m_heartbeat_cnt= 0;
+ set_hb_count(fnNodePtr.i) = 0;
}//if
}//if
@@ -2737,18 +2737,20 @@ void Qmgr::checkHeartbeat(Signal* signal
}//if
ptrCheckGuard(nodePtr, MAX_NDB_NODES, nodeRec);
- setNodeInfo(nodePtr.i).m_heartbeat_cnt++;
+ set_hb_count(nodePtr.i)++;
ndbrequire(nodePtr.p->phase == ZRUNNING);
ndbrequire(getNodeInfo(nodePtr.i).m_type == NodeInfo::DB);
- if(getNodeInfo(nodePtr.i).m_heartbeat_cnt > 2){
+ if (get_hb_count(nodePtr.i) > 2)
+ {
signal->theData[0] = NDB_LE_MissedHeartbeat;
signal->theData[1] = nodePtr.i;
- signal->theData[2] = getNodeInfo(nodePtr.i).m_heartbeat_cnt - 1;
+ signal->theData[2] = get_hb_count(nodePtr.i) - 1;
sendSignal(CMVMI_REF, GSN_EVENT_REP, signal, 3, JBB);
}
- if (getNodeInfo(nodePtr.i).m_heartbeat_cnt > 4) {
+ if (get_hb_count(nodePtr.i) > 4)
+ {
jam();
if (m_connectivity_check.getEnabled())
{
@@ -2791,21 +2793,21 @@ void Qmgr::apiHbHandlingLab(Signal* sign
if (c_connectedNodes.get(nodeId))
{
jam();
- setNodeInfo(TnodePtr.i).m_heartbeat_cnt++;
-
- if(getNodeInfo(TnodePtr.i).m_heartbeat_cnt > 2)
+ set_hb_count(TnodePtr.i)++;
+
+ if (get_hb_count(TnodePtr.i) > 2)
{
signal->theData[0] = NDB_LE_MissedHeartbeat;
signal->theData[1] = nodeId;
- signal->theData[2] = getNodeInfo(TnodePtr.i).m_heartbeat_cnt - 1;
+ signal->theData[2] = get_hb_count(TnodePtr.i) - 1;
sendSignal(CMVMI_REF, GSN_EVENT_REP, signal, 3, JBB);
}
-
- if (getNodeInfo(TnodePtr.i).m_heartbeat_cnt > 4)
+
+ if (get_hb_count(TnodePtr.i) > 4)
{
jam();
/*------------------------------------------------------------------*/
- /* THE API NODE HAS NOT SENT ANY HEARTBEAT FOR THREE SECONDS.
+ /* THE API NODE HAS NOT SENT ANY HEARTBEAT FOR THREE SECONDS.
* WE WILL DISCONNECT FROM IT NOW.
*------------------------------------------------------------------*/
/*------------------------------------------------------------------*/
@@ -2814,7 +2816,7 @@ void Qmgr::apiHbHandlingLab(Signal* sign
signal->theData[0] = NDB_LE_DeadDueToHeartbeat;
signal->theData[1] = nodeId;
sendSignal(CMVMI_REF, GSN_EVENT_REP, signal, 2, JBB);
-
+
api_failed(signal, nodeId);
}//if
}//if
@@ -2843,16 +2845,16 @@ void Qmgr::checkStartInterface(Signal* s
Uint32 type = getNodeInfo(nodePtr.i).m_type;
if (nodePtr.p->phase == ZFAIL_CLOSING) {
jam();
- setNodeInfo(nodePtr.i).m_heartbeat_cnt++;
+ set_hb_count(nodePtr.i)++;
if (c_connectedNodes.get(nodePtr.i)){
jam();
/*-------------------------------------------------------------------*/
// We need to ensure that the connection is not restored until it has
// been disconnected for at least three seconds.
/*-------------------------------------------------------------------*/
- setNodeInfo(nodePtr.i).m_heartbeat_cnt= 0;
+ set_hb_count(nodePtr.i) = 0;
}//if
- if ((getNodeInfo(nodePtr.i).m_heartbeat_cnt > 3)
+ if ((get_hb_count(nodePtr.i) > 3)
&& (nodePtr.p->failState == NORMAL)) {
/**------------------------------------------------------------------
* WE HAVE DISCONNECTED THREE SECONDS AGO. WE ARE NOW READY TO
@@ -2885,12 +2887,12 @@ void Qmgr::checkStartInterface(Signal* s
* Dont allow API node to connect before c_allow_api_connect
*/
jam();
- setNodeInfo(nodePtr.i).m_heartbeat_cnt = 3;
+ set_hb_count(nodePtr.i) = 3;
continue;
}
}
- setNodeInfo(nodePtr.i).m_heartbeat_cnt= 0;
+ set_hb_count(nodePtr.i) = 0;
signal->theData[0] = 0;
signal->theData[1] = nodePtr.i;
sendSignal(TRPMAN_REF, GSN_OPEN_COMORD, signal, 2, JBB);
@@ -2898,7 +2900,7 @@ void Qmgr::checkStartInterface(Signal* s
else
{
jam();
- if(((getNodeInfo(nodePtr.i).m_heartbeat_cnt + 1) % 60) == 0)
+ if(((get_hb_count(nodePtr.i) + 1) % 60) == 0)
{
jam();
char buf[256];
@@ -2909,10 +2911,10 @@ void Qmgr::checkStartInterface(Signal* s
"Failure handling of node %d has not completed"
" in %d min - state = %d",
nodePtr.i,
- (getNodeInfo(nodePtr.i).m_heartbeat_cnt+1)/60,
+ (get_hb_count(nodePtr.i)+1)/60,
nodePtr.p->failState);
warningEvent("%s", buf);
- if (((getNodeInfo(nodePtr.i).m_heartbeat_cnt + 1) % 300) == 0)
+ if (((get_hb_count(nodePtr.i) + 1) % 300) == 0)
{
jam();
/**
@@ -2930,7 +2932,7 @@ void Qmgr::checkStartInterface(Signal* s
"Failure handling of api %u has not completed"
" in %d min - state = %d",
nodePtr.i,
- (getNodeInfo(nodePtr.i).m_heartbeat_cnt+1)/60,
+ (get_hb_count(nodePtr.i)+1)/60,
nodePtr.p->failState);
warningEvent("%s", buf);
if (nodePtr.p->failState == WAITING_FOR_API_FAILCONF)
@@ -3376,7 +3378,7 @@ void Qmgr::node_failed(Signal* signal, U
/*---------------------------------------------------------------------*/
failedNodePtr.p->failState = NORMAL;
failedNodePtr.p->phase = ZFAIL_CLOSING;
- setNodeInfo(failedNodePtr.i).m_heartbeat_cnt= 0;
+ set_hb_count(failedNodePtr.i) = 0;
CloseComReqConf * const closeCom =
(CloseComReqConf *)&signal->theData[0];
@@ -3445,7 +3447,7 @@ Qmgr::api_failed(Signal* signal, Uint32
failedNodePtr.p->failState = initialState;
failedNodePtr.p->phase = ZFAIL_CLOSING;
- setNodeInfo(failedNodePtr.i).m_heartbeat_cnt= 0;
+ set_hb_count(failedNodePtr.i) = 0;
setNodeInfo(failedNodePtr.i).m_version = 0;
recompute_version_info(getNodeInfo(failedNodePtr.i).m_type);
@@ -3554,7 +3556,7 @@ void Qmgr::execAPI_REGREQ(Signal* signal
setNodeInfo(apiNodePtr.i).m_version = version;
setNodeInfo(apiNodePtr.i).m_mysql_version = mysql_version;
- setNodeInfo(apiNodePtr.i).m_heartbeat_cnt= 0;
+ set_hb_count(apiNodePtr.i) = 0;
NodeState state = getNodeState();
if (apiNodePtr.p->phase == ZAPI_INACTIVE)
@@ -4123,7 +4125,7 @@ void Qmgr::handleApiCloseComConf(Signal*
* Allow MGM do reconnect "directly"
*/
jam();
- setNodeInfo(failedNodePtr.i).m_heartbeat_cnt = 3;
+ set_hb_count(failedNodePtr.i) = 3;
}
/* Handled the single API node failure */
@@ -4524,7 +4526,7 @@ void Qmgr::execCOMMIT_FAILREQ(Signal* si
ptrCheckGuard(nodePtr, MAX_NDB_NODES, nodeRec);
nodePtr.p->phase = ZFAIL_CLOSING;
nodePtr.p->failState = WAITING_FOR_NDB_FAILCONF;
- setNodeInfo(nodePtr.i).m_heartbeat_cnt= 0;
+ set_hb_count(nodePtr.i) = 0;
setNodeInfo(nodePtr.i).m_version = 0;
c_clusterNodes.clear(nodePtr.i);
}//for
@@ -4816,7 +4818,7 @@ void Qmgr::failReport(Signal* signal,
failedNodePtr.p->sendPrepFailReqStatus = Q_NOT_ACTIVE;
failedNodePtr.p->sendCommitFailReqStatus = Q_NOT_ACTIVE;
failedNodePtr.p->sendPresToStatus = Q_NOT_ACTIVE;
- setNodeInfo(failedNodePtr.i).m_heartbeat_cnt= 0;
+ set_hb_count(failedNodePtr.i) = 0;
if (aSendFailRep == ZTRUE) {
jam();
if (failedNodePtr.i != getOwnNodeId()) {
=== modified file 'storage/ndb/src/kernel/blocks/trpman.cpp'
--- a/storage/ndb/src/kernel/blocks/trpman.cpp 2012-01-10 13:54:55 +0000
+++ b/storage/ndb/src/kernel/blocks/trpman.cpp 2012-01-11 15:43:32 +0000
@@ -702,7 +702,7 @@ TrpmanProxy::sendCLOSE_COMCONF(Signal *s
CloseComReqConf* conf = (CloseComReqConf*)signal->getDataPtrSend();
*conf = ss.m_req;
- sendSignal(conf->xxxBlockRef, GSN_CLOSE_COMCONF, signal,
+ sendSignal(QMGR_REF, GSN_CLOSE_COMCONF, signal,
CloseComReqConf::SignalLength, JBB);
ssRelease<Ss_CLOSE_COMREQ>(ssId);
}
=== modified file 'storage/ndb/src/kernel/vm/ArrayPool.hpp'
--- a/storage/ndb/src/kernel/vm/ArrayPool.hpp 2012-01-10 13:01:14 +0000
+++ b/storage/ndb/src/kernel/vm/ArrayPool.hpp 2012-01-11 18:28:28 +0000
@@ -261,7 +261,7 @@ protected:
* Protect here means to have them on separate CPU cache lines to
* avoid false CPU cache line sharing.
*/
- char protect_read_var[NDB_CL - (sizeof(Uint32) + sizeof(void*))];
+ char protect_read_var[NDB_CL_PADSZ(sizeof(Uint32) + sizeof(void*))];
Uint32 firstFree;
Uint32 noOfFree;
Uint32 noOfFreeMin;
=== modified file 'storage/ndb/src/kernel/vm/Configuration.cpp'
--- a/storage/ndb/src/kernel/vm/Configuration.cpp 2012-01-10 18:21:38 +0000
+++ b/storage/ndb/src/kernel/vm/Configuration.cpp 2012-01-11 18:28:28 +0000
@@ -682,6 +682,12 @@ Configuration::calcSizeAlt(ConfigValues
lqhInstances = globalData.ndbMtLqhWorkers;
}
+ Uint32 tcInstances = 1;
+ if (globalData.ndbMtTcThreads > 1)
+ {
+ tcInstances = globalData.ndbMtTcThreads;
+ }
+
Uint64 indexMem = 0, dataMem = 0;
ndb_mgm_get_int64_parameter(&db, CFG_DB_DATA_MEM, &dataMem);
ndb_mgm_get_int64_parameter(&db, CFG_DB_INDEX_MEM, &indexMem);
@@ -808,7 +814,8 @@ Configuration::calcSizeAlt(ConfigValues
#if NDB_VERSION_D < NDB_MAKE_VERSION(7,2,0)
noOfLocalScanRecords = (noOfDBNodes * noOfScanRecords) +
#else
- noOfLocalScanRecords = 4 * (noOfDBNodes * noOfScanRecords) +
+ noOfLocalScanRecords = tcInstances * lqhInstances *
+ (noOfDBNodes * noOfScanRecords) +
#endif
1 /* NR */ +
1 /* LCP */;
=== modified file 'storage/ndb/src/kernel/vm/GlobalData.hpp'
--- a/storage/ndb/src/kernel/vm/GlobalData.hpp 2012-01-10 13:54:55 +0000
+++ b/storage/ndb/src/kernel/vm/GlobalData.hpp 2012-01-11 13:16:31 +0000
@@ -41,9 +41,10 @@ enum restartStates {initial_state,
perform_stop};
struct GlobalData {
+ Uint32 m_hb_count[MAX_NODES]; // hb counters
NodeInfo m_nodeInfo[MAX_NODES]; // At top to ensure cache alignment
Signal VMSignals[1]; // Owned by FastScheduler::
- Uint32 m_restart_seq; //
+ Uint32 m_restart_seq; //
NodeVersionInfo m_versionInfo;
Uint64 internalMillisecCounter; // Owned by ThreadConfig::
@@ -91,6 +92,7 @@ struct GlobalData {
ndbMtSendThreads = 0;
ndbMtReceiveThreads = 0;
ndbLogParts = 0;
+ bzero(m_hb_count, sizeof(m_hb_count));
#ifdef GCP_TIMER_HACK
gcp_timer_limit = 0;
#endif
@@ -107,7 +109,18 @@ struct GlobalData {
void incrementWatchDogCounter(Uint32 place);
Uint32 * getWatchDogPtr();
-
+
+ Uint32 getBlockThreads() const {
+ return ndbMtLqhThreads + ndbMtTcThreads + ndbMtReceiveThreads;
+ }
+
+ Uint32 get_hb_count(Uint32 nodeId) const {
+ return m_hb_count[nodeId];
+ }
+
+ Uint32& set_hb_count(Uint32 nodeId) {
+ return m_hb_count[nodeId];
+ }
private:
Uint32 watchDog;
SimulatedBlock* blockTable[NO_OF_BLOCKS]; // Owned by Dispatcher::
=== modified file 'storage/ndb/src/kernel/vm/SimulatedBlock.cpp'
--- a/storage/ndb/src/kernel/vm/SimulatedBlock.cpp 2011-11-16 08:17:17 +0000
+++ b/storage/ndb/src/kernel/vm/SimulatedBlock.cpp 2012-01-11 18:28:28 +0000
@@ -161,15 +161,20 @@ SimulatedBlock::initCommon()
this->getParam("FragmentInfoHash", &count);
c_fragmentInfoHash.setSize(count);
- count = 5;
+ Uint32 def = 5;
+#ifdef NDBD_MULTITHREADED
+ def += globalData.getBlockThreads();
+#endif
+
+ count = def;
this->getParam("ActiveMutexes", &count);
c_mutexMgr.setSize(count);
-
- count = 5;
+
+ count = def;
this->getParam("ActiveCounters", &count);
c_counterMgr.setSize(count);
- count = 5;
+ count = def;
this->getParam("ActiveThreadSync", &count);
c_syncThreadPool.setSize(count);
}
=== modified file 'storage/ndb/src/kernel/vm/TransporterCallback.cpp'
--- a/storage/ndb/src/kernel/vm/TransporterCallback.cpp 2012-01-04 14:25:32 +0000
+++ b/storage/ndb/src/kernel/vm/TransporterCallback.cpp 2012-01-11 18:28:28 +0000
@@ -469,6 +469,8 @@ SignalLoggerManager::printSegmentedSecti
void
TransporterCallbackKernel::transporter_recv_from(NodeId nodeId)
{
- globalData.m_nodeInfo[nodeId].m_heartbeat_cnt= 0;
- return;
+ if (globalData.get_hb_count(nodeId) != 0)
+ {
+ globalData.set_hb_count(nodeId) = 0;
+ }
}
=== modified file 'storage/ndb/src/kernel/vm/mt.cpp'
--- a/storage/ndb/src/kernel/vm/mt.cpp 2012-01-10 13:01:14 +0000
+++ b/storage/ndb/src/kernel/vm/mt.cpp 2012-01-11 18:28:28 +0000
@@ -3474,7 +3474,8 @@ rep_init(struct thr_repository* rep, uns
Uint32
compute_jb_pages(struct EmulatorData * ed)
{
- Uint32 cnt = NUM_MAIN_THREADS + globalData.ndbMtLqhThreads + 1;
+ Uint32 cnt = NUM_MAIN_THREADS +
+ globalData.ndbMtReceiveThreads + globalData.ndbMtTcThreads + globalData.ndbMtLqhThreads + 1;
Uint32 perthread = 0;
No bundle (reason: useless for push emails).
| Thread |
|---|
| • bzr push into mysql-5.5-cluster-7.2 branch (frazer.clement:3758 to 3759) | Frazer Clement | 12 Jan |