3762 Frazer Clement 2012-01-17
Recommit reverted patches
added:
mysql-test/suite/ndb_rpl/t/show_mysqld_warnings.inc
modified:
mysql-test/suite/ndb_rpl/r/ndb_rpl_conflict.result
mysql-test/suite/ndb_rpl/r/ndb_rpl_rep_error.result
mysql-test/suite/ndb_rpl/t/ndb_rpl_conflict.test
mysql-test/suite/ndb_rpl/t/ndb_rpl_conflict_epoch.test
mysql-test/suite/ndb_rpl/t/ndb_rpl_rep_error.test
sql/ha_ndbcluster.cc
sql/ha_ndbcluster_binlog.cc
sql/ha_ndbcluster_binlog.h
sql/ndb_share.h
3761 Frazer Clement 2012-01-17
Fix late definition problem
ndb_thd.cc #defines MYSQL_SERVER just before it #includes sql_class.h.
Type definitions with sql_class.h change size and offsets depending on
the definition of MYSQL_SERVER (Particularly the NET type embedded in
THD).
Unfortunately sql_class.h #includes other headers, which also depend on
MYSQL_SERVER and which have already been included, without it defined,
prior to this point.
This results in an invalid definition of THD for code after the #include
of sql_class.h, which can result in data corruption during execution.
The fix is to move the MYSQL_SERVER #define up to the earliest point.
Testing is difficult as it is not easy to hit the problem (execution
of thd_print_warning_list()) without error injection. However, a separate
feature to follow adds testing of this path.
modified:
sql/ndb_thd.cc
3760 Frazer Clement 2012-01-12 [merge]
Merge 7.1->7.2
removed:
mysql-test/suite/ndb_rpl/t/show_mysqld_warnings.inc
modified:
mysql-test/suite/ndb_rpl/r/ndb_rpl_conflict.result
mysql-test/suite/ndb_rpl/r/ndb_rpl_rep_error.result
mysql-test/suite/ndb_rpl/t/ndb_rpl_conflict.test
mysql-test/suite/ndb_rpl/t/ndb_rpl_conflict_epoch.test
mysql-test/suite/ndb_rpl/t/ndb_rpl_rep_error.test
sql/ha_ndbcluster.cc
sql/ha_ndbcluster_binlog.cc
sql/ha_ndbcluster_binlog.h
sql/ndb_share.h
=== modified file 'mysql-test/suite/ndb_rpl/r/ndb_rpl_conflict.result'
--- a/mysql-test/suite/ndb_rpl/r/ndb_rpl_conflict.result 2012-01-12 02:26:21 +0000
+++ b/mysql-test/suite/ndb_rpl/r/ndb_rpl_conflict.result 2012-01-17 12:21:37 +0000
@@ -229,7 +229,51 @@ select server_id, master_server_id, coun
server_id master_server_id count a
2 1 1 3
drop table t2_max, t2_max$EX;
+drop table t1_old, `t1_old$EX`, t1_max, `t1_max$EX`, t1_max_delete_win, `t1_max_delete_win$EX`;
+delete from mysql.ndb_replication;
+Test that online table distribution sets up conflict functions and exceptions tables
+insert into mysql.ndb_replication values ("test", "t1allsame", 0, 7, "NDB$MAX(X)");
+insert into mysql.ndb_replication values ("test", "t2diffex", 1, 7, "NDB$OLD(X)");
+insert into mysql.ndb_replication values ("test", "t2diffex", 3, 7, "NDB$MAX(X)");
+insert into mysql.ndb_replication values ("test", "t3oneex", 3, 7, "NDB$EPOCH()");
+create table t2diffex$EX (
+server_id int unsigned,
+master_server_id int unsigned,
+master_epoch bigint unsigned,
+count int unsigned,
+a int not null,
+primary key(server_id, master_server_id, master_epoch, count)) engine ndb;
+create table t3oneex$EX (
+server_id int unsigned,
+master_server_id int unsigned,
+master_epoch bigint unsigned,
+count int unsigned,
+a int not null,
+primary key(server_id, master_server_id, master_epoch, count)) engine ndb;
+create table t1allsame(a int primary key, b varchar(200), X int unsigned) engine=ndb;
+create table t2diffex(a int primary key, b varchar(200), X int unsigned) engine=ndb;
+create table t3oneex(a int primary key, b varchar(200)) engine=ndb;
+show variables like 'server_id';
+Variable_name Value
+server_id 1
+MySQLD error output for server 1.1 matching pattern %NDB Slave%
+relevant
+[note] ndb slave: table test.t2diffex using conflict_fn ndb$old on attribute x.
+[note] ndb slave: table test.t2diffex logging exceptions to test.t2diffex$ex
+[note] ndb slave: table test.t1allsame using conflict_fn ndb$max on attribute x.
+[note] ndb slave: table test.t2_max using conflict_fn ndb$max on attribute x.
+show variables like 'server_id';
+Variable_name Value
+server_id 3
+MySQLD error output for server 2.1 matching pattern %NDB Slave%
+relevant
+[warning] ndb slave: table test.t3oneex : no extra row author bits in table.
+[note] ndb slave: table test.t3oneex : cft_ndb_epoch[_trans], low epoch resolution
+[note] ndb slave: table test.t2diffex using conflict_fn ndb$max on attribute x.
+[note] ndb slave: table test.t2diffex logging exceptions to test.t2diffex$ex
+[note] ndb slave: table test.t1allsame using conflict_fn ndb$max on attribute x.
+[note] ndb slave: table test.t2_max using conflict_fn ndb$max on attribute x.
+drop table t3oneex, t2diffex, t1allsame, t3oneex$EX, t2diffex$EX;
"Cleanup"
drop table mysql.ndb_replication;
-drop table t1_old, `t1_old$EX`, t1_max, `t1_max$EX`, t1_max_delete_win, `t1_max_delete_win$EX`;
include/rpl_end.inc
=== modified file 'mysql-test/suite/ndb_rpl/r/ndb_rpl_rep_error.result'
--- a/mysql-test/suite/ndb_rpl/r/ndb_rpl_rep_error.result 2012-01-12 02:44:08 +0000
+++ b/mysql-test/suite/ndb_rpl/r/ndb_rpl_rep_error.result 2012-01-17 12:21:37 +0000
@@ -15,7 +15,7 @@ delete from mysql.ndb_replication;
insert into mysql.ndb_replication values ("test", "t1", 0, NULL, "NDB$MAX(X)");
create table t1 (a int key, X int) engine ndb;
Warnings:
-Warning 1626 Error in parsing conflict function. Message: column 'X' has wrong datatype
+Warning 1626 Error in parsing conflict function. Message: Column 'X' has wrong datatype
drop table t1;
delete from mysql.ndb_replication;
insert into mysql.ndb_replication values ("test", "t1", 0, NULL, "NDB$MAX()");
=== modified file 'mysql-test/suite/ndb_rpl/t/ndb_rpl_conflict.test'
--- a/mysql-test/suite/ndb_rpl/t/ndb_rpl_conflict.test 2012-01-12 02:26:21 +0000
+++ b/mysql-test/suite/ndb_rpl/t/ndb_rpl_conflict.test 2012-01-17 12:21:37 +0000
@@ -2,7 +2,7 @@
# Test engine native conflict resolution for ndb
#
#
---source include/have_ndb.inc
+--source include/have_multi_ndb.inc
--source include/have_binlog_format_mixed_or_row.inc
--source suite/ndb_rpl/ndb_master-slave.inc
@@ -307,14 +307,77 @@ select server_id, master_server_id, coun
--connection master
drop table t2_max, t2_max$EX;
+drop table t1_old, `t1_old$EX`, t1_max, `t1_max$EX`, t1_max_delete_win, `t1_max_delete_win$EX`;
+delete from mysql.ndb_replication;
+
+--echo Test that online table distribution sets up conflict functions and exceptions tables
+
+# t1allsame - Same on all servers, no exceptions table
+insert into mysql.ndb_replication values ("test", "t1allsame", 0, 7, "NDB$MAX(X)");
+
+# t2diffex - Different on each server with an exceptions table
+# Not a recommended configuration!
+insert into mysql.ndb_replication values ("test", "t2diffex", 1, 7, "NDB$OLD(X)");
+insert into mysql.ndb_replication values ("test", "t2diffex", 3, 7, "NDB$MAX(X)");
+
+# t3oneex - Only on one server with an exceptions table
+# Note that it's not defined on the server where it's created (not recommended)
+# so on the server where it's defined, we get an error due to having no extra
+# author bits
+#
+insert into mysql.ndb_replication values ("test", "t3oneex", 3, 7, "NDB$EPOCH()");
+
+# Create exception tables
+create table t2diffex$EX (
+ server_id int unsigned,
+ master_server_id int unsigned,
+ master_epoch bigint unsigned,
+ count int unsigned,
+ a int not null,
+ primary key(server_id, master_server_id, master_epoch, count)) engine ndb;
+
+create table t3oneex$EX (
+ server_id int unsigned,
+ master_server_id int unsigned,
+ master_epoch bigint unsigned,
+ count int unsigned,
+ a int not null,
+ primary key(server_id, master_server_id, master_epoch, count)) engine ndb;
+# Now create actual tables on this server (id 1)
+create table t1allsame(a int primary key, b varchar(200), X int unsigned) engine=ndb;
+create table t2diffex(a int primary key, b varchar(200), X int unsigned) engine=ndb;
+create table t3oneex(a int primary key, b varchar(200)) engine=ndb;
+
+# Now examine server logs to see that conflict detection and
+# exceptions tables were setup as expected
+show variables like 'server_id';
+--let $server_num=1.1
+--let $pattern=%NDB Slave%
+--let $limit=4
+--source suite/ndb_rpl/t/show_mysqld_warnings.inc
+
+
+--connection server2
+--disable_query_log
+call mtr.add_suppression("NDB Slave: .* No extra row author bits in table.*");
+--enable_query_log
+
+show variables like 'server_id';
+--let $server_num=2.1
+--let $pattern=%NDB Slave%
+--let $limit=6
+
+--source suite/ndb_rpl/t/show_mysqld_warnings.inc
+
+--connection master
+drop table t3oneex, t2diffex, t1allsame, t3oneex$EX, t2diffex$EX;
###############
--echo "Cleanup"
--connection master
drop table mysql.ndb_replication;
-drop table t1_old, `t1_old$EX`, t1_max, `t1_max$EX`, t1_max_delete_win, `t1_max_delete_win$EX`;
--sync_slave_with_master
--source include/rpl_end.inc
=== modified file 'mysql-test/suite/ndb_rpl/t/ndb_rpl_conflict_epoch.test'
--- a/mysql-test/suite/ndb_rpl/t/ndb_rpl_conflict_epoch.test 2012-01-12 02:44:08 +0000
+++ b/mysql-test/suite/ndb_rpl/t/ndb_rpl_conflict_epoch.test 2012-01-17 12:21:37 +0000
@@ -231,6 +231,11 @@ insert into mysql.ndb_replication values
("test", "t3", 0, 7, "NDB\$EPOCH(32)"),
("test", "t4", 0, 7, "NDB\$EPOCH(-1)");
+--disable_query_log
+# Only need suppress here, as table creation fails due to this.
+call mtr.add_suppression("NDB Slave: .* Too many extra Gci bits at .*");
+--enable_query_log
+
--error 1005
create table test.t3 (a int primary key) engine=ndb;
show warnings;
=== modified file 'mysql-test/suite/ndb_rpl/t/ndb_rpl_rep_error.test'
--- a/mysql-test/suite/ndb_rpl/t/ndb_rpl_rep_error.test 2012-01-12 02:26:21 +0000
+++ b/mysql-test/suite/ndb_rpl/t/ndb_rpl_rep_error.test 2012-01-17 12:21:37 +0000
@@ -2,7 +2,7 @@
# Some negative tests of the ndb_replication table
#
#
---source include/have_ndb.inc
+--source include/have_multi_ndb.inc
--source include/have_binlog_format_mixed_or_row.inc
--source suite/ndb_rpl/ndb_master-slave.inc
@@ -46,8 +46,25 @@ CREATE TABLE mysql.ndb_replication
--enable_warnings
--enable_query_log
+# Need suppressions on all servers where warnings/errors can be seen.
+--disable_query_log
+--connection server1
+call mtr.add_suppression("NDB Slave: .* unknown conflict resolution function .*");
+call mtr.add_suppression("NDB Slave: .* has wrong datatype.*");
+call mtr.add_suppression("NDB Slave: .* missing function argument .*");
+call mtr.add_suppression("NDB Slave: .* missing \')\' .*");
+--connection server2
+call mtr.add_suppression("NDB Slave: .* unknown conflict resolution function .*");
+call mtr.add_suppression("NDB Slave: .* has wrong datatype.*");
+call mtr.add_suppression("NDB Slave: .* missing function argument .*");
+call mtr.add_suppression("NDB Slave: .* missing \')\' .*");
+--connection default
+--enable_query_log
+
# Non existant conflict_fn
# gives error when creating table
+#call mtr.add_suppression("NDB Slave: .* unknown conflict resolution function .*");
+
insert into mysql.ndb_replication values ("test", "t1", 0, NULL, "NDB$X(X)");
--error 1005
create table t1 (a int key, X int) engine ndb;
@@ -56,6 +73,8 @@ delete from mysql.ndb_replication;
# Column type cannot be used for this function
# gives warning when creating table
+#call mtr.add_suppression("NDB Slave: .* has wrong datatype.*");
+
insert into mysql.ndb_replication values ("test", "t1", 0, NULL, "NDB$MAX(X)");
create table t1 (a int key, X int) engine ndb;
drop table t1;
@@ -63,6 +82,8 @@ delete from mysql.ndb_replication;
# Too few arguments
# gives error when creating table
+#call mtr.add_suppression("NDB Slave: .* missing function argument .*");
+
insert into mysql.ndb_replication values ("test", "t1", 0, NULL, "NDB$MAX()");
--error 1005
create table t1 (a int key, X int) engine ndb;
@@ -71,6 +92,7 @@ delete from mysql.ndb_replication;
# Too many arguments
# gives error when creating table
+#call mtr.add_suppression("NDB Slave: .* missing \')\' .*");
insert into mysql.ndb_replication values ("test", "t1", 0, NULL, "NDB$MAX(X Y)");
--error 1005
create table t1 (a int key, X int) engine ndb;
=== added file 'mysql-test/suite/ndb_rpl/t/show_mysqld_warnings.inc'
--- a/mysql-test/suite/ndb_rpl/t/show_mysqld_warnings.inc 1970-01-01 00:00:00 +0000
+++ b/mysql-test/suite/ndb_rpl/t/show_mysqld_warnings.inc 2012-01-17 12:21:37 +0000
@@ -0,0 +1,10 @@
+--disable_query_log
+--echo MySQLD error output for server $server_num matching pattern $pattern
+create table errlog (a int auto_increment primary key, txt text) engine=myisam;
+
+--eval load data local infile "$MYSQLTEST_VARDIR/log/mysqld.$server_num.err" into table errlog (txt);
+
+--eval select lower(right(txt, length(txt) - locate('[',txt) + 1)) as relevant from errlog where txt like '$pattern' order by a desc limit $limit;
+
+drop table errlog;
+--enable_query_log
=== modified file 'sql/ha_ndbcluster.cc'
--- a/sql/ha_ndbcluster.cc 2012-01-12 02:44:08 +0000
+++ b/sql/ha_ndbcluster.cc 2012-01-17 12:21:37 +0000
@@ -2130,7 +2130,7 @@ int ha_ndbcluster::get_metadata(THD *thd
#ifdef HAVE_NDB_BINLOG
ndbcluster_read_binlog_replication(thd, ndb, m_share, m_table,
- ::server_id, table, FALSE);
+ ::server_id, FALSE);
#endif
DBUG_RETURN(0);
@@ -5143,6 +5143,7 @@ ha_ndbcluster::prepare_conflict_detectio
{
res = conflict_fn->prep_func(m_share->m_cfn_share,
op_type,
+ m_ndb_record,
old_data,
new_data,
table->write_set,
@@ -5993,7 +5994,10 @@ handle_row_conflict(NDB_CONFLICT_FN_SHAR
for (k= 0; k < nkey; k++)
{
DBUG_ASSERT(pk_row != NULL);
- const uchar* data= pk_row + cfn_share->m_offset[k];
+ const uchar* data=
+ (const uchar*) NdbDictionary::getValuePtr(key_rec,
+ (const char*) pk_row,
+ cfn_share->m_key_attrids[k]);
if (ex_op->setValue((Uint32)(fixed_cols + k), (const char*)data) == -1)
{
err= ex_op->getNdbError();
@@ -10020,7 +10024,6 @@ int ha_ndbcluster::create(const char *na
m_dbname,
m_tabname,
::server_id,
- form,
&binlog_flags,
&conflict_fn,
args,
@@ -10484,7 +10487,6 @@ cleanup_failed:
ndbcluster_apply_binlog_replication_info(thd,
share,
m_table,
- form,
conflict_fn,
args,
num_args,
@@ -10929,7 +10931,7 @@ int ha_ndbcluster::rename_table(const ch
#ifdef HAVE_NDB_BINLOG
if (share)
ndbcluster_read_binlog_replication(thd, ndb, share, ndbtab,
- ::server_id, NULL, TRUE);
+ ::server_id, TRUE);
#endif
/* always create an event for the table */
String event_name(INJECTOR_EVENT_LEN);
=== modified file 'sql/ha_ndbcluster_binlog.cc'
--- a/sql/ha_ndbcluster_binlog.cc 2012-01-12 02:44:08 +0000
+++ b/sql/ha_ndbcluster_binlog.cc 2012-01-17 12:21:37 +0000
@@ -4004,7 +4004,6 @@ slave_set_resolve_fn(THD *thd, NDB_SHARE
const NDBTAB *ndbtab, uint field_index,
uint resolve_col_sz,
const st_conflict_fn_def* conflict_fn,
- TABLE *table,
uint8 flags)
{
DBUG_ENTER("slave_set_resolve_fn");
@@ -4024,8 +4023,6 @@ slave_set_resolve_fn(THD *thd, NDB_SHARE
/* Calculate resolve col stuff (if relevant) */
cfn_share->m_resolve_size= resolve_col_sz;
cfn_share->m_resolve_column= field_index;
- cfn_share->m_resolve_offset= (uint16)(table->field[field_index]->ptr -
- table->record[0]);
cfn_share->m_flags = flags;
{
@@ -4074,8 +4071,11 @@ slave_set_resolve_fn(THD *thd, NDB_SHARE
col->getNullable() == ex_col->getNullable();
if (!ok)
break;
- cfn_share->m_offset[k]=
- (uint16)(table->field[i]->ptr - table->record[0]);
+ /*
+ Store mapping of Exception table key# to
+ orig table attrid
+ */
+ cfn_share->m_key_attrids[k]= i;
k++;
}
}
@@ -4086,9 +4086,9 @@ slave_set_resolve_fn(THD *thd, NDB_SHARE
ndbtab_g.release();
if (opt_ndb_extra_logging)
sql_print_information("NDB Slave: Table %s.%s logging exceptions to %s.%s",
- table->s->db.str,
- table->s->table_name.str,
- table->s->db.str,
+ share->db,
+ share->table_name,
+ share->db,
ex_tab_name);
}
else
@@ -4121,6 +4121,7 @@ slave_set_resolve_fn(THD *thd, NDB_SHARE
int
row_conflict_fn_old(NDB_CONFLICT_FN_SHARE* cfn_share,
enum_conflicting_op_type op_type,
+ const NdbRecord* data_record,
const uchar* old_data,
const uchar* new_data,
const MY_BITMAP* write_set,
@@ -4129,7 +4130,10 @@ row_conflict_fn_old(NDB_CONFLICT_FN_SHAR
DBUG_ENTER("row_conflict_fn_old");
uint32 resolve_column= cfn_share->m_resolve_column;
uint32 resolve_size= cfn_share->m_resolve_size;
- const uchar* field_ptr = old_data + cfn_share->m_resolve_offset;
+ const uchar* field_ptr = (const uchar*)
+ NdbDictionary::getValuePtr(data_record,
+ (const char*) old_data,
+ cfn_share->m_resolve_column);
assert((resolve_size == 4) || (resolve_size == 8));
@@ -4197,6 +4201,7 @@ row_conflict_fn_old(NDB_CONFLICT_FN_SHAR
int
row_conflict_fn_max_update_only(NDB_CONFLICT_FN_SHARE* cfn_share,
enum_conflicting_op_type op_type,
+ const NdbRecord* data_record,
const uchar* old_data,
const uchar* new_data,
const MY_BITMAP* write_set,
@@ -4205,7 +4210,10 @@ row_conflict_fn_max_update_only(NDB_CONF
DBUG_ENTER("row_conflict_fn_max_update_only");
uint32 resolve_column= cfn_share->m_resolve_column;
uint32 resolve_size= cfn_share->m_resolve_size;
- const uchar* field_ptr = new_data + cfn_share->m_resolve_offset;
+ const uchar* field_ptr = (const uchar*)
+ NdbDictionary::getValuePtr(data_record,
+ (const char*) new_data,
+ cfn_share->m_resolve_column);
assert((resolve_size == 4) || (resolve_size == 8));
@@ -4284,6 +4292,7 @@ row_conflict_fn_max_update_only(NDB_CONF
int
row_conflict_fn_max(NDB_CONFLICT_FN_SHARE* cfn_share,
enum_conflicting_op_type op_type,
+ const NdbRecord* data_record,
const uchar* old_data,
const uchar* new_data,
const MY_BITMAP* write_set,
@@ -4297,6 +4306,7 @@ row_conflict_fn_max(NDB_CONFLICT_FN_SHAR
case UPDATE_ROW:
return row_conflict_fn_max_update_only(cfn_share,
op_type,
+ data_record,
old_data,
new_data,
write_set,
@@ -4308,6 +4318,7 @@ row_conflict_fn_max(NDB_CONFLICT_FN_SHAR
*/
return row_conflict_fn_old(cfn_share,
op_type,
+ data_record,
old_data,
new_data,
write_set,
@@ -4336,6 +4347,7 @@ row_conflict_fn_max(NDB_CONFLICT_FN_SHAR
int
row_conflict_fn_max_del_win(NDB_CONFLICT_FN_SHARE* cfn_share,
enum_conflicting_op_type op_type,
+ const NdbRecord* data_record,
const uchar* old_data,
const uchar* new_data,
const MY_BITMAP* write_set,
@@ -4349,6 +4361,7 @@ row_conflict_fn_max_del_win(NDB_CONFLICT
case UPDATE_ROW:
return row_conflict_fn_max_update_only(cfn_share,
op_type,
+ data_record,
old_data,
new_data,
write_set,
@@ -4373,6 +4386,7 @@ row_conflict_fn_max_del_win(NDB_CONFLICT
int
row_conflict_fn_epoch(NDB_CONFLICT_FN_SHARE* cfn_share,
enum_conflicting_op_type op_type,
+ const NdbRecord* data_record,
const uchar* old_data,
const uchar* new_data,
const MY_BITMAP* write_set,
@@ -4468,7 +4482,6 @@ parse_conflict_fn_spec(const char* confl
const st_conflict_fn_def** conflict_fn,
st_conflict_fn_arg* args,
Uint32* max_args,
- const TABLE* table,
char *msg, uint msg_len)
{
DBUG_ENTER("parse_conflict_fn_spec");
@@ -4559,9 +4572,6 @@ parse_conflict_fn_spec(const char* confl
uint len= (uint)(end_arg - start_arg);
args[no_args].type= type;
- args[no_args].ptr= start_arg;
- args[no_args].len= len;
- args[no_args].fieldno= (uint32)-1;
DBUG_PRINT("info", ("found argument %s %u", start_arg, len));
@@ -4570,20 +4580,13 @@ parse_conflict_fn_spec(const char* confl
{
case CFAT_COLUMN_NAME:
{
- /* find column in table */
- DBUG_PRINT("info", ("searching for %s %u", start_arg, len));
- TABLE_SHARE *table_s= table->s;
- for (uint j= 0; j < table_s->fields; j++)
- {
- Field *field= table_s->field[j];
- if (strncmp(start_arg, field->field_name, len) == 0 &&
- field->field_name[len] == '\0')
- {
- DBUG_PRINT("info", ("found %s", field->field_name));
- args[no_args].fieldno= j;
- break;
- }
- }
+ /* Copy column name out into argument's buffer */
+ char* dest= &args[no_args].resolveColNameBuff[0];
+
+ memcpy(dest, start_arg, (len < (uint) NAME_CHAR_LEN ?
+ len :
+ NAME_CHAR_LEN));
+ dest[len]= '\0';
break;
}
case CFAT_EXTRA_GCI_BITS:
@@ -4645,7 +4648,7 @@ parse_conflict_fn_spec(const char* confl
}
/* parse error */
my_snprintf(msg, msg_len, "%s, %s at '%s'",
- conflict_fn_spec, error_str, ptr);
+ conflict_fn_spec, error_str, ptr);
DBUG_PRINT("info", ("%s", msg));
DBUG_RETURN(-1);
}
@@ -4654,7 +4657,6 @@ static int
setup_conflict_fn(THD *thd, NDB_SHARE *share,
const NDBTAB *ndbtab,
char *msg, uint msg_len,
- TABLE *table,
const st_conflict_fn_def* conflict_fn,
const st_conflict_fn_arg* args,
const Uint32 num_args)
@@ -4676,38 +4678,65 @@ setup_conflict_fn(THD *thd, NDB_SHARE *s
DBUG_RETURN(-1);
}
+ /* Now try to find the column in the table */
+ int colNum = -1;
+ const char* resolveColName = args[0].resolveColNameBuff;
+ int resolveColNameLen = (int)strlen(resolveColName);
+
+ for (int j=0; j< ndbtab->getNoOfColumns(); j++)
+ {
+ const char* colName = ndbtab->getColumn(j)->getName();
+
+ if (strncmp(colName,
+ resolveColName,
+ resolveColNameLen) == 0 &&
+ colName[resolveColNameLen] == '\0')
+ {
+ colNum = j;
+ break;
+ }
+ }
+ if (colNum == -1)
+ {
+ my_snprintf(msg, msg_len,
+ "Could not find resolve column %s.",
+ resolveColName);
+ DBUG_PRINT("info", ("%s", msg));
+ DBUG_RETURN(-1);
+ }
+
uint resolve_col_sz= 0;
if (0 == (resolve_col_sz =
- slave_check_resolve_col_type(ndbtab, args[0].fieldno)))
+ slave_check_resolve_col_type(ndbtab, colNum)))
{
/* wrong data type */
slave_reset_conflict_fn(share);
my_snprintf(msg, msg_len,
- "column '%s' has wrong datatype",
- table->s->field[args[0].fieldno]->field_name);
+ "Column '%s' has wrong datatype",
+ resolveColName);
DBUG_PRINT("info", ("%s", msg));
DBUG_RETURN(-1);
}
if (slave_set_resolve_fn(thd, share, ndbtab,
- args[0].fieldno, resolve_col_sz,
- conflict_fn, table, CFF_NONE))
+ colNum, resolve_col_sz,
+ conflict_fn, CFF_NONE))
{
my_snprintf(msg, msg_len,
- "unable to setup conflict resolution using column '%s'",
- table->s->field[args[0].fieldno]->field_name);
+ "Unable to setup conflict resolution using column '%s'",
+ resolveColName);
DBUG_PRINT("info", ("%s", msg));
DBUG_RETURN(-1);
}
- if (opt_ndb_extra_logging)
- {
- sql_print_information("NDB Slave: Table %s.%s using conflict_fn %s on attribute %s.",
- table->s->db.str,
- table->s->table_name.str,
- conflict_fn->name,
- table->s->field[args[0].fieldno]->field_name);
- }
+
+ /* Success, update message */
+ my_snprintf(msg, msg_len,
+ "NDB Slave: Table %s.%s using conflict_fn %s on attribute %s.",
+ share->db,
+ share->table_name,
+ conflict_fn->name,
+ resolveColName);
break;
}
case CFT_NDB_EPOCH:
@@ -4734,7 +4763,9 @@ setup_conflict_fn(THD *thd, NDB_SHARE *s
* represent SavePeriod/EpochPeriod
*/
if (ndbtab->getExtraRowGciBits() == 0)
- sql_print_information("Ndb Slave : CFT_NDB_EPOCH[_TRANS], low epoch resolution");
+ sql_print_information("NDB Slave: Table %s.%s : CFT_NDB_EPOCH[_TRANS], low epoch resolution",
+ share->db,
+ share->table_name);
if (ndbtab->getExtraRowAuthorBits() == 0)
{
@@ -4746,20 +4777,20 @@ setup_conflict_fn(THD *thd, NDB_SHARE *s
if (slave_set_resolve_fn(thd, share, ndbtab,
0, // field_no
0, // resolve_col_sz
- conflict_fn, table, CFF_REFRESH_ROWS))
+ conflict_fn, CFF_REFRESH_ROWS))
{
my_snprintf(msg, msg_len,
"unable to setup conflict resolution");
DBUG_PRINT("info", ("%s", msg));
DBUG_RETURN(-1);
}
- if (opt_ndb_extra_logging)
- {
- sql_print_information("NDB Slave: Table %s.%s using conflict_fn %s.",
- table->s->db.str,
- table->s->table_name.str,
- conflict_fn->name);
- }
+ /* Success, update message */
+ my_snprintf(msg, msg_len,
+ "NDB Slave: Table %s.%s using conflict_fn %s.",
+ share->db,
+ share->table_name,
+ conflict_fn->name);
+
break;
}
case CFT_NUMBER_OF_CFTS:
@@ -5065,7 +5096,6 @@ ndbcluster_get_binlog_replication_info(T
const char* db,
const char* table_name,
uint server_id,
- const TABLE *table,
Uint32* binlog_flags,
const st_conflict_fn_def** conflict_fn,
st_conflict_fn_arg* args,
@@ -5113,34 +5143,42 @@ ndbcluster_get_binlog_replication_info(T
DBUG_RETURN(ER_NDB_REPLICATION_SCHEMA_ERROR);
}
- if (table != NULL)
+ if (conflict_fn_spec != NULL)
{
- if (conflict_fn_spec != NULL)
+ char tmp_buf[FN_REFLEN];
+
+ if (parse_conflict_fn_spec(conflict_fn_spec,
+ conflict_fn,
+ args,
+ num_args,
+ tmp_buf,
+ sizeof(tmp_buf)) != 0)
{
- char tmp_buf[FN_REFLEN];
+ push_warning_printf(thd, MYSQL_ERROR::WARN_LEVEL_WARN,
+ ER_CONFLICT_FN_PARSE_ERROR,
+ ER(ER_CONFLICT_FN_PARSE_ERROR),
+ tmp_buf);
- if (parse_conflict_fn_spec(conflict_fn_spec,
- conflict_fn,
- args,
- num_args,
- table,
- tmp_buf,
- sizeof(tmp_buf)) != 0)
+ /*
+ Log as well, useful for contexts where the thd's stack of
+ warnings are ignored
+ */
+ if (opt_ndb_extra_logging)
{
- push_warning_printf(thd, MYSQL_ERROR::WARN_LEVEL_WARN,
- ER_CONFLICT_FN_PARSE_ERROR,
- ER(ER_CONFLICT_FN_PARSE_ERROR),
- tmp_buf);
- DBUG_RETURN(ER_CONFLICT_FN_PARSE_ERROR);
+ sql_print_warning("NDB Slave: Table %s.%s : Parse error on conflict fn : %s",
+ db, table_name,
+ tmp_buf);
}
- }
- else
- {
- /* No conflict function specified */
- conflict_fn= NULL;
- num_args= 0;
+
+ DBUG_RETURN(ER_CONFLICT_FN_PARSE_ERROR);
}
}
+ else
+ {
+ /* No conflict function specified */
+ conflict_fn= NULL;
+ num_args= 0;
+ }
DBUG_RETURN(0);
}
@@ -5149,7 +5187,6 @@ int
ndbcluster_apply_binlog_replication_info(THD *thd,
NDB_SHARE *share,
const NDBTAB* ndbtab,
- TABLE* table,
const st_conflict_fn_def* conflict_fn,
const st_conflict_fn_arg* args,
Uint32 num_args,
@@ -5170,15 +5207,32 @@ ndbcluster_apply_binlog_replication_info
if (setup_conflict_fn(thd, share,
ndbtab,
tmp_buf, sizeof(tmp_buf),
- table,
conflict_fn,
args,
- num_args) != 0)
+ num_args) == 0)
{
+ if (opt_ndb_extra_logging)
+ {
+ sql_print_information("%s", tmp_buf);
+ }
+ }
+ else
+ {
+ /*
+ Dump setup failure message to error log
+ for cases where thd warning stack is
+ ignored
+ */
+ sql_print_warning("NDB Slave: Table %s.%s : %s",
+ share->db,
+ share->table_name,
+ tmp_buf);
+
push_warning_printf(thd, MYSQL_ERROR::WARN_LEVEL_WARN,
ER_CONFLICT_FN_PARSE_ERROR,
ER(ER_CONFLICT_FN_PARSE_ERROR),
tmp_buf);
+
DBUG_RETURN(-1);
}
}
@@ -5196,7 +5250,6 @@ ndbcluster_read_binlog_replication(THD *
NDB_SHARE *share,
const NDBTAB *ndbtab,
uint server_id,
- TABLE *table,
bool do_set_binlog_flags)
{
DBUG_ENTER("ndbcluster_read_binlog_replication");
@@ -5209,7 +5262,6 @@ ndbcluster_read_binlog_replication(THD *
share->db,
share->table_name,
server_id,
- table,
&binlog_flags,
&conflict_fn,
args,
@@ -5217,7 +5269,6 @@ ndbcluster_read_binlog_replication(THD *
(ndbcluster_apply_binlog_replication_info(thd,
share,
ndbtab,
- table,
conflict_fn,
args,
num_args,
@@ -5322,7 +5373,7 @@ int ndbcluster_create_binlog_setup(THD *
/*
*/
ndbcluster_read_binlog_replication(thd, ndb, share, ndbtab,
- ::server_id, NULL, TRUE);
+ ::server_id, TRUE);
#endif
/*
check if logging turned off for this table
@@ -6690,6 +6741,71 @@ void updateInjectorStats(Ndb* schemaNdb,
dataNdb->getClientStat(Ndb::EventBytesRecvdCount);
}
+/**
+ injectApplyStatusWriteRow
+
+ Inject a WRITE_ROW event on the ndb_apply_status table into
+ the Binlog.
+ This contains our server_id and the supplied epoch number.
+ When applied on the Slave it gives a transactional position
+ marker
+*/
+static
+bool
+injectApplyStatusWriteRow(injector::transaction& trans,
+ ulonglong gci)
+{
+ DBUG_ENTER("injectApplyStatusWriteRow");
+ if (ndb_apply_status_share == NULL)
+ {
+ sql_print_error("NDB: Could not get apply status share");
+ DBUG_ASSERT(ndb_apply_status_share != NULL);
+ DBUG_RETURN(false);
+ }
+
+ /* Build row buffer for generated ndb_apply_status
+ WRITE_ROW event
+ First get the relevant table structure.
+ */
+ DBUG_ASSERT(!ndb_apply_status_share->event_data);
+ DBUG_ASSERT(ndb_apply_status_share->op);
+ Ndb_event_data* event_data=
+ (Ndb_event_data *) ndb_apply_status_share->op->getCustomData();
+ DBUG_ASSERT(event_data);
+ DBUG_ASSERT(event_data->shadow_table);
+ TABLE* apply_status_table= event_data->shadow_table;
+
+ /*
+ Intialize apply_status_table->record[0]
+ */
+ empty_record(apply_status_table);
+
+ apply_status_table->field[0]->store((longlong)::server_id, true);
+ apply_status_table->field[1]->store((longlong)gci, true);
+ apply_status_table->field[2]->store("", 0, &my_charset_bin);
+ apply_status_table->field[3]->store((longlong)0, true);
+ apply_status_table->field[4]->store((longlong)0, true);
+#ifndef DBUG_OFF
+ const LEX_STRING& name= apply_status_table->s->table_name;
+ DBUG_PRINT("info", ("use_table: %.*s",
+ (int) name.length, name.str));
+#endif
+ injector::transaction::table tbl(apply_status_table, true);
+ int ret = trans.use_table(::server_id, tbl);
+ assert(ret == 0); NDB_IGNORE_VALUE(ret);
+
+ ret= trans.write_row(::server_id,
+ injector::transaction::table(apply_status_table,
+ true),
+ &apply_status_table->s->all_set,
+ apply_status_table->s->fields,
+ apply_status_table->record[0]);
+
+ assert(ret == 0);
+
+ DBUG_RETURN(true);
+}
+
extern ulong opt_ndb_report_thresh_binlog_epoch_slip;
extern ulong opt_ndb_report_thresh_binlog_mem_usage;
@@ -7161,44 +7277,6 @@ restart_cluster_failure:
{
DBUG_PRINT("info", ("pollEvents res: %d", res));
thd->proc_info= "Processing events";
- uchar apply_status_buf[512];
- TABLE *apply_status_table= NULL;
- if (ndb_apply_status_share)
- {
- /*
- We construct the buffer to write the apply status binlog
- event here, as the table->record[0] buffer is referenced
- by the apply status event operation, and will be filled
- with data at the nextEvent call if the first event should
- happen to be from the apply status table
- */
- Ndb_event_data *event_data= ndb_apply_status_share->event_data;
- if (!event_data)
- {
- DBUG_ASSERT(ndb_apply_status_share->op);
- event_data=
- (Ndb_event_data *) ndb_apply_status_share->op->getCustomData();
- DBUG_ASSERT(event_data);
- }
- apply_status_table= event_data->shadow_table;
-
- /*
- Intialize apply_status_table->record[0]
- */
- empty_record(apply_status_table);
-
- apply_status_table->field[0]->store((longlong)::server_id, true);
- /*
- gci is added later, just before writing to binlog as gci
- is unknown here
- */
- apply_status_table->field[2]->store("", 0, &my_charset_bin);
- apply_status_table->field[3]->store((longlong)0, true);
- apply_status_table->field[4]->store((longlong)0, true);
- DBUG_ASSERT(sizeof(apply_status_buf) >= apply_status_table->s->reclength);
- memcpy(apply_status_buf, apply_status_table->record[0],
- apply_status_table->s->reclength);
- }
NdbEventOperation *pOp= i_ndb->nextEvent();
ndb_binlog_index_row _row;
ndb_binlog_index_row *rows= &_row;
@@ -7321,35 +7399,11 @@ restart_cluster_failure:
}
if (trans.good())
{
- if (apply_status_table)
- {
-#ifndef DBUG_OFF
- const LEX_STRING& name= apply_status_table->s->table_name;
- DBUG_PRINT("info", ("use_table: %.*s",
- (int) name.length, name.str));
-#endif
- injector::transaction::table tbl(apply_status_table, true);
- int ret = trans.use_table(::server_id, tbl);
- assert(ret == 0); NDB_IGNORE_VALUE(ret);
-
- /* add the gci to the record */
- Field *field= apply_status_table->field[1];
- my_ptrdiff_t row_offset=
- (my_ptrdiff_t) (apply_status_buf - apply_status_table->record[0]);
- field->move_field_offset(row_offset);
- field->store((longlong)gci, true);
- field->move_field_offset(-row_offset);
-
- trans.write_row(::server_id,
- injector::transaction::table(apply_status_table,
- true),
- &apply_status_table->s->all_set,
- apply_status_table->s->fields,
- apply_status_buf);
- }
- else
+ /* Inject ndb_apply_status WRITE_ROW event */
+ if (!injectApplyStatusWriteRow(trans,
+ gci))
{
- sql_print_error("NDB: Could not get apply status share");
+ sql_print_error("NDB Binlog: Failed to inject apply status write row");
}
}
=== modified file 'sql/ha_ndbcluster_binlog.h'
--- a/sql/ha_ndbcluster_binlog.h 2012-01-12 02:44:08 +0000
+++ b/sql/ha_ndbcluster_binlog.h 2012-01-17 12:21:37 +0000
@@ -82,7 +82,6 @@ ndbcluster_get_binlog_replication_info(T
const char* db,
const char* table_name,
uint server_id,
- const TABLE *table,
Uint32* binlog_flags,
const st_conflict_fn_def** conflict_fn,
st_conflict_fn_arg* args,
@@ -91,7 +90,6 @@ int
ndbcluster_apply_binlog_replication_info(THD *thd,
NDB_SHARE *share,
const NDBTAB* ndbtab,
- TABLE* table,
const st_conflict_fn_def* conflict_fn,
const st_conflict_fn_arg* args,
Uint32 num_args,
@@ -102,7 +100,6 @@ ndbcluster_read_binlog_replication(THD *
NDB_SHARE *share,
const NDBTAB *ndbtab,
uint server_id,
- TABLE *table,
bool do_set_binlog_flags);
#endif
int ndb_create_table_from_engine(THD *thd, const char *db,
=== modified file 'sql/ndb_share.h'
--- a/sql/ndb_share.h 2012-01-12 02:44:08 +0000
+++ b/sql/ndb_share.h 2012-01-17 12:21:37 +0000
@@ -22,6 +22,8 @@
#include <my_alloc.h> // MEM_ROOT
#include <thr_lock.h> // THR_LOCK
#include <my_bitmap.h> // MY_BITMAP
+#include <mysql_com.h> // NAME_CHAR_LEN
+#include <sql_const.h> // MAX_REF_PARTS
#include <ndbapi/Ndb.hpp> // Ndb::TupleIdRange
@@ -56,11 +58,9 @@ enum enum_conflict_fn_arg_type
struct st_conflict_fn_arg
{
enum_conflict_fn_arg_type type;
- const char *ptr;
- uint32 len;
union
{
- uint32 fieldno; // CFAT_COLUMN_NAME
+ char resolveColNameBuff[ NAME_CHAR_LEN + 1 ]; // CFAT_COLUMN_NAME
uint32 extraGciBits; // CFAT_EXTRA_GCI_BITS
};
};
@@ -88,6 +88,7 @@ enum enum_conflicting_op_type
*/
typedef int (* prepare_detect_func) (struct st_ndbcluster_conflict_fn_share* cfn_share,
enum_conflicting_op_type op_type,
+ const NdbRecord* data_record,
const uchar* old_data,
const uchar* new_data,
const MY_BITMAP* write_set,
@@ -131,16 +132,21 @@ enum enum_conflict_fn_table_flags
CFF_REFRESH_ROWS = 1
};
+/*
+ Maximum supported key parts (16)
+ (Ndb supports 32, but MySQL has a lower limit)
+*/
+static const int NDB_MAX_KEY_PARTS = MAX_REF_PARTS;
+
typedef struct st_ndbcluster_conflict_fn_share {
const st_conflict_fn_def* m_conflict_fn;
/* info about original table */
uint8 m_pk_cols;
- uint8 m_resolve_column;
+ uint16 m_resolve_column;
uint8 m_resolve_size;
uint8 m_flags;
- uint16 m_offset[16];
- uint16 m_resolve_offset;
+ uint16 m_key_attrids[ NDB_MAX_KEY_PARTS ];
const NdbDictionary::Table *m_ex_tab;
uint32 m_count;
=== modified file 'sql/ndb_thd.cc'
--- a/sql/ndb_thd.cc 2011-10-31 09:22:55 +0000
+++ b/sql/ndb_thd.cc 2012-01-17 12:15:00 +0000
@@ -15,6 +15,10 @@
Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA
*/
+#ifndef MYSQL_SERVER
+#define MYSQL_SERVER
+#endif
+
#include "ndb_thd.h"
#include "ndb_thd_ndb.h"
@@ -44,10 +48,6 @@ Ndb* check_ndb_in_thd(THD* thd, bool val
return thd_ndb->ndb;
}
-#ifndef MYSQL_SERVER
-#define MYSQL_SERVER
-#endif
-
#include <sql_class.h>
void
No bundle (reason: useless for push emails).
| Thread |
|---|
| • bzr push into mysql-5.5-cluster-7.2 branch (frazer.clement:3760 to 3762) | Frazer Clement | 17 Jan |