Below is the list of changes that have just been committed into a local
5.1 repository of tomas. When tomas does a push these changes will
be propagated to the main repository and, within 24 hours after the
push, to the public repository.
For information on how to access the public repository
see http://dev.mysql.com/doc/mysql/en/installing-source-tree.html
ChangeSet
1.2523 07/06/06 15:59:16 tomas@stripped +13 -0
WL#3867 choosing logging + specifying conflict resolution function
mysql-test/t/rpl_ndb_rep_error.test
1.1 07/06/06 15:59:05 tomas@stripped +79 -0
negative tests for using the ndb replication table
mysql-test/r/rpl_ndb_rep_error.result
1.1 07/06/06 15:59:05 tomas@stripped +34 -0
negative tests for using the ndb replication table
mysql-test/t/rpl_ndb_rep_error.test
1.0 07/06/06 15:59:05 tomas@stripped +0 -0
BitKeeper file /home/tomas/mysql-5.1-wl3867/mysql-test/t/rpl_ndb_rep_error.test
mysql-test/r/rpl_ndb_rep_error.result
1.0 07/06/06 15:59:05 tomas@stripped +0 -0
BitKeeper file /home/tomas/mysql-5.1-wl3867/mysql-test/r/rpl_ndb_rep_error.result
sql/share/errmsg.txt
1.156 07/06/06 15:59:04 tomas@stripped +4 -0
new error messages for ndb replication table
sql/mysqld.cc
1.645 07/06/06 15:59:04 tomas@stripped +10 -1
new option to set default loggin type
sql/log_event.cc
1.280 07/06/06 15:59:04 tomas@stripped +0 -19
remove handler function to set conflict resolution
sql/handler.h
1.260 07/06/06 15:59:04 tomas@stripped +0 -2
remove handler function to set conflict resolution
sql/ha_ndbcluster_tables.h
1.7 07/06/06 15:59:04 tomas@stripped +12 -0
nre binlog table definition
sql/ha_ndbcluster_binlog.h
1.28 07/06/06 15:59:04 tomas@stripped +8 -1
read function for the binlog table
sql/ha_ndbcluster_binlog.cc
1.123 07/06/06 15:59:04 tomas@stripped +501 -9
default logging option "updated only"
event name different for "full row" logging
parsing of conlict resolution function NDB$MAX
reading of the replication table
setting different logging depending op flags on share
sql/ha_ndbcluster.h
1.184 07/06/06 15:59:04 tomas@stripped +19 -5
flags for different logging options
sql/ha_ndbcluster.cc
1.476 07/06/06 15:59:03 tomas@stripped +15 -34
read replication table before setting up logging
use different event names for "full row" and "updated only"
mysql-test/t/rpl_ndb_conflict.test
1.4 07/06/06 15:59:03 tomas@stripped +29 -0
updated test
mysql-test/r/rpl_ndb_conflict.result
1.2 07/06/06 15:59:03 tomas@stripped +1 -0
updated test
# This is a BitKeeper patch. What follows are the unified diffs for the
# set of deltas contained in the patch. The rest of the patch, the part
# that BitKeeper cares about, is below these diffs.
# User: tomas
# Host: poseidon.mysql.com
# Root: /home/tomas/mysql-5.1-wl3867
--- 1.259/sql/handler.h 2007-06-01 16:17:22 +02:00
+++ 1.260/sql/handler.h 2007-06-06 15:59:04 +02:00
@@ -1668,8 +1668,6 @@ public:
but we don't have a primary key
*/
virtual void use_hidden_primary_key();
-
- virtual int slave_set_resolve_highest(uint field_index) { return 0; }
private:
/*
Row-level primitives for storage engines. These should be
--- 1.279/sql/log_event.cc 2007-06-01 16:17:22 +02:00
+++ 1.280/sql/log_event.cc 2007-06-06 15:59:04 +02:00
@@ -5952,25 +5952,6 @@ int Rows_log_event::do_apply_event(RELAY
const_cast<RELAY_LOG_INFO*>(rli)->clear_tables_to_lock();
DBUG_RETURN(ERR_BAD_TABLE_DEF);
}
-#ifndef WL3867_INTERFACE_DONE_PROPERLY
- {
- TABLE *table= ptr->table;
- /*
- Check if table has resolve setup and for which column
- Until we have a proper interface for this we have this
- dummy loop that does it on column name
- */
- for (uint j= 0; j < table->s->fields; j++)
- {
- Field *field= table->s->field[j];
- if (strcmp("X", field->field_name) == 0)
- {
- table->file->slave_set_resolve_highest(j);
- break;
- }
- }
- }
-#endif
}
}
--- 1.644/sql/mysqld.cc 2007-06-01 08:52:12 +02:00
+++ 1.645/sql/mysqld.cc 2007-06-06 15:59:04 +02:00
@@ -380,6 +380,7 @@ ulong ndb_extra_logging;
ulong ndb_report_thresh_binlog_epoch_slip;
ulong ndb_report_thresh_binlog_mem_usage;
my_bool opt_ndb_log_update_as_write;
+my_bool opt_ndb_log_updated_only;
#endif
extern const char *ndb_distribution_names[];
@@ -4928,7 +4929,7 @@ enum options_mysqld
OPT_NDB_REPORT_THRESH_BINLOG_EPOCH_SLIP,
OPT_NDB_REPORT_THRESH_BINLOG_MEM_USAGE,
OPT_NDB_USE_COPYING_ALTER_TABLE,
- OPT_NDB_LOG_UPDATE_AS_WRITE,
+ OPT_NDB_LOG_UPDATE_AS_WRITE, OPT_NDB_LOG_UPDATED_ONLY,
OPT_SKIP_SAFEMALLOC,
OPT_TEMP_POOL, OPT_TX_ISOLATION, OPT_COMPLETION_TYPE,
OPT_SKIP_STACK_TRACE, OPT_SKIP_SYMLINKS,
@@ -5489,6 +5490,14 @@ master-ssl",
"replicating to other storage engines than ndbcluster",
(gptr*) &opt_ndb_log_update_as_write,
(gptr*) &opt_ndb_log_update_as_write,
+ 0, GET_BOOL, OPT_ARG, 1, 0, 0, 0, 0, 0},
+ {"ndb-log-updated-only", OPT_NDB_LOG_UPDATED_ONLY,
+ "For efficiency log only updated columns. Columns are considered "
+ "as \"updated\" even if they are updated with the same value. "
+ "This may cause compatability problems if"
+ "replicating to other storage engines than ndbcluster",
+ (gptr*) &opt_ndb_log_updated_only,
+ (gptr*) &opt_ndb_log_updated_only,
0, GET_BOOL, OPT_ARG, 1, 0, 0, 0, 0, 0},
#endif
{"ndb-use-exact-count", OPT_NDB_USE_EXACT_COUNT,
--- 1.155/sql/share/errmsg.txt 2007-04-19 18:58:29 +02:00
+++ 1.156/sql/share/errmsg.txt 2007-06-06 15:59:04 +02:00
@@ -6059,3 +6059,7 @@ ER_SLAVE_INCIDENT
eng "The incident %s occured on the master. Message: %-.64s"
ER_NO_PARTITION_FOR_GIVEN_VALUE_SILENT
eng "Table has no partition for some existing values"
+ER_NDB_REPLICATION_SCHEMA_ERROR
+ eng "Bad schema for mysql.ndb_replication table. Message: %-.64s"
+ER_CONFLICT_FN_PARSE_ERROR
+ eng "Error in parsing conflict function. Message: %-.64s"
--- 1.475/sql/ha_ndbcluster.cc 2007-06-01 17:10:35 +02:00
+++ 1.476/sql/ha_ndbcluster.cc 2007-06-06 15:59:03 +02:00
@@ -3531,34 +3531,6 @@ int ha_ndbcluster::primary_key_cmp(const
}
#ifdef HAVE_NDB_BINLOG
-int ha_ndbcluster::slave_set_resolve_highest(uint field_index)
-{
- DBUG_ENTER("ha_ndbcluster::slave_set_resolve_highest");
- const NDBCOL *c= m_table->getColumn(field_index);
- switch (c->getType())
- {
- case NDBCOL::Unsigned:
- m_share->m_resolve_size= sizeof(Uint32);
- m_share->m_resolve_column= field_index;
- DBUG_PRINT("info", ("resolve column Uint32%u",
- m_share->m_resolve_column));
- DBUG_RETURN(0);
- break;
- case NDBCOL::Bigunsigned:
- m_share->m_resolve_size= sizeof(Uint64);
- m_share->m_resolve_column= field_index;
- DBUG_PRINT("info", ("resolve column Uint64 %u",
- m_share->m_resolve_column));
- DBUG_RETURN(0);
- break;
- default:
- DBUG_PRINT("info", ("resolve column %u has wrong type",
- m_share->m_resolve_column));
- DBUG_RETURN(0);
- break;
- }
- DBUG_RETURN(-1);
-}
/*
To perform conflict resolution, an interpreted program is used to read
@@ -6079,8 +6051,11 @@ int ha_ndbcluster::create(const char *na
while (!IS_TMP_PREFIX(m_tabname))
{
+ ndbcluster_read_binlog_replication(thd, ndb, share, m_table, ::server_id);
+
String event_name(INJECTOR_EVENT_LEN);
- ndb_rep_event_name(&event_name,m_dbname,m_tabname);
+ ndb_rep_event_name(&event_name, m_dbname, m_tabname,
+ get_binlog_full(share));
int do_event_op= ndb_binlog_running;
if (!ndb_schema_share &&
@@ -6518,19 +6493,24 @@ int ha_ndbcluster::rename_table(const ch
{
is_old_table_tmpfile= 0;
String event_name(INJECTOR_EVENT_LEN);
- ndb_rep_event_name(&event_name, from + sizeof(share_prefix) - 1, 0);
+ ndb_rep_event_name(&event_name, from + sizeof(share_prefix) - 1, 0,
+ get_binlog_full(share));
ndbcluster_handle_drop_table(ndb, event_name.c_ptr(), share,
"rename table");
}
if (!result && !IS_TMP_PREFIX(new_tabname))
{
- /* always create an event for the table */
- String event_name(INJECTOR_EVENT_LEN);
- ndb_rep_event_name(&event_name, to + sizeof(share_prefix) - 1, 0);
Ndb_table_guard ndbtab_g2(dict, new_tabname);
const NDBTAB *ndbtab= ndbtab_g2.get_table();
+ ndbcluster_read_binlog_replication(current_thd, ndb, share, ndbtab, ::server_id);
+
+ /* always create an event for the table */
+ String event_name(INJECTOR_EVENT_LEN);
+ ndb_rep_event_name(&event_name, to + sizeof(share_prefix) - 1, 0,
+ get_binlog_full(share));
+
if (!ndbcluster_create_event(ndb, ndbtab, event_name.c_ptr(), share,
share && ndb_binlog_running ? 2 : 1/* push warning */))
{
@@ -6756,7 +6736,8 @@ retry_temporary_error1:
if (!IS_TMP_PREFIX(table_name))
{
String event_name(INJECTOR_EVENT_LEN);
- ndb_rep_event_name(&event_name, path + sizeof(share_prefix) - 1, 0);
+ ndb_rep_event_name(&event_name, path + sizeof(share_prefix) - 1, 0,
+ get_binlog_full(share));
ndbcluster_handle_drop_table(ndb,
table_dropped ? event_name.c_ptr() : 0,
share, "delete table");
--- 1.183/sql/ha_ndbcluster.h 2007-06-01 17:10:35 +02:00
+++ 1.184/sql/ha_ndbcluster.h 2007-06-06 15:59:04 +02:00
@@ -172,6 +172,21 @@ struct Ndb_tuple_id_range_guard {
#define NSF_HIDDEN_PK 1 /* table has hidden primary key */
#define NSF_BLOB_FLAG 2 /* table has blob attributes */
#define NSF_NO_BINLOG 4 /* table should not be binlogged */
+#define NSF_BINLOG_FULL 8 /* table should be binlogged with full rows */
+#define NSF_BINLOG_USE_UPDATE 16 /* table update should be binlogged using
+ update log event */
+inline void set_binlog_nologging(NDB_SHARE *share)
+{ share->flags|= NSF_NO_BINLOG; }
+inline my_bool get_binlog_nologging(NDB_SHARE *share)
+{ return (share->flags & NSF_NO_BINLOG) != 0; }
+inline void set_binlog_full(NDB_SHARE *share)
+{ share->flags|= NSF_BINLOG_FULL; }
+inline my_bool get_binlog_full(NDB_SHARE *share)
+{ return (share->flags & NSF_BINLOG_FULL) != 0; }
+inline void set_binlog_use_update(NDB_SHARE *share)
+{ share->flags|= NSF_BINLOG_USE_UPDATE; }
+inline my_bool get_binlog_use_update(NDB_SHARE *share)
+{ return (share->flags & NSF_BINLOG_USE_UPDATE) != 0; }
#endif
typedef enum ndb_query_state_bits {
@@ -251,11 +266,6 @@ class ha_ndbcluster: public handler
int write_row(byte *buf);
int update_row(const byte *old_data, byte *new_data);
-#ifdef HAVE_NDB_BINLOG
- int update_row_timestamp_resolve(const byte *old_data, byte *new_data,
- NdbInterpretedCode *);
- int slave_set_resolve_highest(uint field_index);
-#endif
int delete_row(const byte *buf);
int index_init(uint index, bool sorted);
int index_end();
@@ -410,6 +420,10 @@ static void set_tabname(const char *path
uint table_changes);
private:
+#ifdef HAVE_NDB_BINLOG
+ int update_row_timestamp_resolve(const byte *old_data, byte *new_data,
+ NdbInterpretedCode *);
+#endif
friend int ndbcluster_drop_database_impl(const char *path);
friend int ndb_handle_schema_change(THD *thd,
Ndb *ndb, NdbEventOperation *pOp,
--- New file ---
+++ mysql-test/r/rpl_ndb_rep_error.result 07/06/06 15:59:05
stop slave;
drop table if exists t1,t2,t3,t4,t5,t6,t7,t8,t9;
reset master;
reset slave;
drop table if exists t1,t2,t3,t4,t5,t6,t7,t8,t9;
start slave;
create table t1 (a int key, X int) engine ndb;
Warnings:
Error 1588 Bad schema for mysql.ndb_replication table. Message: Missing or wrong type for column 'server_id'
drop table t1;
insert into mysql.ndb_replication values ("test", "t1", 0, NULL, "NDB$X(X)");
create table t1 (a int key, X int) engine ndb;
Warnings:
Error 1589 Error in parsing conflict function. Message: NDB$X(X), unknown conflict resolution function at 'NDB$X(X)'
drop table t1;
delete from mysql.ndb_replication;
insert into mysql.ndb_replication values ("test", "t1", 0, NULL, "NDB$MAX(X)");
create table t1 (a int key, X int) engine ndb;
Warnings:
Error 1589 Error in parsing conflict function. Message: column 'X' has wrong datatype
drop table t1;
delete from mysql.ndb_replication;
insert into mysql.ndb_replication values ("test", "t1", 0, NULL, "NDB$MAX()");
create table t1 (a int key, X int) engine ndb;
Warnings:
Error 1589 Error in parsing conflict function. Message: NDB$MAX(), missing function argument at ')'
drop table t1;
delete from mysql.ndb_replication;
insert into mysql.ndb_replication values ("test", "t1", 0, NULL, "NDB$MAX(X Y)");
create table t1 (a int key, X int) engine ndb;
Warnings:
Error 1589 Error in parsing conflict function. Message: NDB$MAX(X Y), missing ')' at 'Y)'
drop table t1;
delete from mysql.ndb_replication;
--- New file ---
+++ mysql-test/t/rpl_ndb_rep_error.test 07/06/06 15:59:05
#
# Some negative tests of the ndb_replication table
#
#
--source include/have_ndb.inc
--source include/have_binlog_format_row.inc
--source include/master-slave.inc
#
# wrong schema for the table
#
--disable_warnings
--disable_query_log
drop table if exists mysql.ndb_replication;
CREATE TABLE mysql.ndb_replication
(db VARBINARY(63),
table_name VARBINARY(63),
server_id INT UNSIGNED,
binlog_type INT UNSIGNED,
conflict_fn VARBINARY(128),
PRIMARY KEY USING HASH (db,table_name))
ENGINE=NDB PARTITION BY KEY(db,table_name);
--enable_warnings
--enable_query_log
# gives warning when trying to create table as logging
# may not be as intended
create table t1 (a int key, X int) engine ndb;
drop table t1;
#
# correct schema for the table
# but other errors
#
--disable_warnings
--disable_query_log
drop table mysql.ndb_replication;
CREATE TABLE mysql.ndb_replication
(db VARBINARY(63),
table_name VARBINARY(63),
server_id INT UNSIGNED,
binlog_type INT UNSIGNED,
conflict_fn VARBINARY(128),
PRIMARY KEY USING HASH (db,table_name,server_id))
ENGINE=NDB PARTITION BY KEY(db,table_name);
--enable_warnings
--enable_query_log
# Non existant conflict_fn
# gives warning when creating table
insert into mysql.ndb_replication values ("test", "t1", 0, NULL, "NDB$X(X)");
create table t1 (a int key, X int) engine ndb;
drop table t1;
delete from mysql.ndb_replication;
# Column type cannot be used for this function
# gives warning when creating table
insert into mysql.ndb_replication values ("test", "t1", 0, NULL, "NDB$MAX(X)");
create table t1 (a int key, X int) engine ndb;
drop table t1;
delete from mysql.ndb_replication;
# Too few arguments
# gives warning when creating table
insert into mysql.ndb_replication values ("test", "t1", 0, NULL, "NDB$MAX()");
create table t1 (a int key, X int) engine ndb;
drop table t1;
delete from mysql.ndb_replication;
# Too many arguments
# gives warning when creating table
insert into mysql.ndb_replication values ("test", "t1", 0, NULL, "NDB$MAX(X Y)");
create table t1 (a int key, X int) engine ndb;
drop table t1;
delete from mysql.ndb_replication;
--disable_query_log
drop table mysql.ndb_replication;
--enable_query_log
--- 1.122/sql/ha_ndbcluster_binlog.cc 2007-06-01 17:10:35 +02:00
+++ 1.123/sql/ha_ndbcluster_binlog.cc 2007-06-06 15:59:04 +02:00
@@ -34,6 +34,7 @@
#endif
extern my_bool opt_ndb_log_update_as_write;
+extern my_bool opt_ndb_log_updated_only;
/*
defines for cluster replication table names
@@ -2440,15 +2441,490 @@ int ndbcluster_binlog_start()
used by the client sql threads
**************************************************************/
void
-ndb_rep_event_name(String *event_name,const char *db, const char *tbl)
+ndb_rep_event_name(String *event_name,const char *db, const char *tbl,
+ my_bool full)
{
- event_name->set_ascii("REPL$", 5);
+ if (full)
+ event_name->set_ascii("REPLF$", 6);
+ else
+ event_name->set_ascii("REPL$", 5);
event_name->append(db);
if (tbl)
{
event_name->append('/');
event_name->append(tbl);
}
+ DBUG_PRINT("info", ("ndb_rep_event_name: %s", event_name->c_ptr()));
+}
+
+static void
+set_binlog_flags(NDB_SHARE *share,
+ Ndb_binlog_type ndb_binlog_type)
+{
+ /* set NSF_USE_UPDATE flag */
+ if (ndb_binlog_type & NBT_USE_UPDATE ||
+ !opt_ndb_log_update_as_write)
+ set_binlog_use_update(share);
+
+ /* set NSF_FULL and NSF_NO_BINLOG */
+ switch (ndb_binlog_type)
+ {
+ case NBT_UPDATED_USE_UPDATE:
+ case NBT_UPDATED:
+ /* Updates only, is default */
+ break;
+ case NBT_NO_LOGGING:
+ set_binlog_nologging(share);
+ /* fall through */
+ case NBT_USE_UPDATE:
+ /* fall through */
+ case NBT_DEFAULT:
+ if (opt_ndb_log_updated_only)
+ break;
+ /* fall through */
+ case NBT_FULL:
+ case NBT_FULL_USE_UPDATE:
+ set_binlog_full(share);
+ break;
+ }
+}
+static int
+slave_set_resolve_max(NDB_SHARE *share, const NDBTAB *ndbtab, uint field_index)
+{
+ DBUG_ENTER("slave_set_resolve_max");
+ const NDBCOL *c= ndbtab->getColumn(field_index);
+ switch (c->getType())
+ {
+ case NDBCOL::Unsigned:
+ share->m_resolve_size= sizeof(Uint32);
+ share->m_resolve_column= field_index;
+ DBUG_PRINT("info", ("resolve column Uint32 %u",
+ share->m_resolve_column));
+ DBUG_RETURN(0);
+ break;
+ case NDBCOL::Bigunsigned:
+ share->m_resolve_size= sizeof(Uint64);
+ share->m_resolve_column= field_index;
+ DBUG_PRINT("info", ("resolve column Uint64 %u",
+ share->m_resolve_column));
+ DBUG_RETURN(0);
+ break;
+ default:
+ DBUG_PRINT("info", ("resolve column %u has wrong type",
+ share->m_resolve_column));
+ DBUG_RETURN(-1);
+ break;
+ }
+}
+
+enum enum_conflict_fn_type
+{
+ CFT_NDB_MAX
+};
+enum enum_conflict_fn_arg_type
+{
+ CFAT_END
+ ,CFAT_COLUMN_NAME
+};
+struct st_conflict_fn_arg
+{
+ enum_conflict_fn_arg_type type;
+ const char *ptr;
+ uint len;
+ uint fieldno; // CFAT_COLUMN_NAME
+};
+struct st_conflict_fn_def
+{
+ const char *name;
+ enum_conflict_fn_type type;
+ enum enum_conflict_fn_arg_type arg_type;
+};
+static struct st_conflict_fn_def conflict_fns[]=
+{
+ { "NDB$MAX", CFT_NDB_MAX, CFAT_COLUMN_NAME }
+ ,{ NULL, CFT_NDB_MAX, CFAT_END }
+};
+static unsigned n_conflict_fns=
+sizeof(conflict_fns) / sizeof(struct st_conflict_fn_def);
+
+static int
+set_conflict_fn(NDB_SHARE *share,
+ const NDBTAB *ndbtab,
+ const NDBCOL *conflict_col,
+ char *conflict_fn,
+ char *msg, uint msg_len)
+{
+ DBUG_ENTER("set_conflict_fn");
+ uint len= 0;
+ switch (conflict_col->getArrayType())
+ {
+ case NDBCOL::ArrayTypeShortVar:
+ len= *(uchar*)conflict_fn;
+ conflict_fn++;
+ break;
+ case NDBCOL::ArrayTypeMediumVar:
+ len= uint2korr(conflict_fn);
+ conflict_fn+= 2;
+ break;
+ default:
+ break;
+ }
+ conflict_fn[len]= '\0';
+ const char *ptr= conflict_fn;
+ const char *error_str= "unknown conflict resolution function";
+ /* remove whitespace */
+ while (*ptr == ' ' && *ptr != '\0') ptr++;
+
+ const unsigned MAX_ARGS= 8;
+ unsigned no_args= 0;
+ struct st_conflict_fn_arg args[MAX_ARGS];
+
+ for (unsigned i= 0; i < n_conflict_fns; i++)
+ {
+ struct st_conflict_fn_def &fn= conflict_fns[i];
+ if (fn.name == NULL)
+ continue;
+
+ uint len= strlen(fn.name);
+ if (strncmp(ptr, fn.name, len))
+ continue;
+
+ /* skip function name */
+ ptr+= len;
+
+ /* remove whitespace */
+ while (*ptr == ' ' && *ptr != '\0') ptr++;
+
+ /* next '(' */
+ if (*ptr != '(')
+ {
+ error_str= "missing '('";
+ break;
+ }
+ ptr++;
+
+ /* find all arguments */
+ for (;;)
+ {
+ /* expected type */
+ enum enum_conflict_fn_arg_type type=
+ conflict_fns[i+no_args].arg_type;
+
+ /* remove whitespace */
+ while (*ptr == ' ' && *ptr != '\0') ptr++;
+
+ if (type == CFAT_END)
+ {
+ args[no_args].type= type;
+ error_str= NULL;
+ break;
+ }
+
+ /* arg */
+ const char *start_arg= ptr;
+ while (*ptr != ')' && *ptr != ' ' && *ptr != '\0') ptr++;
+ const char *end_arg= ptr;
+
+ /* any arg given? */
+ if (start_arg == end_arg)
+ {
+ error_str= "missing function argument";
+ break;
+ }
+
+ uint len= end_arg - start_arg;
+ args[no_args].type= type;
+ args[no_args].ptr= start_arg;
+ args[no_args].len= len;
+ args[no_args].fieldno= (uint)-1;
+
+ switch (type)
+ {
+ case CFAT_COLUMN_NAME:
+ {
+ /* find column in table */
+ DBUG_PRINT("info", ("serching for %s %u", start_arg, len));
+ TABLE_SHARE *table_s= share->table_share;
+ for (uint j= 0; j < table_s->fields; j++)
+ {
+ Field *field= table_s->field[j];
+ if (strncmp(start_arg, field->field_name, len) == 0 &&
+ field->field_name[len] == '\0')
+ {
+ DBUG_PRINT("info", ("found %s", field->field_name));
+ args[no_args].fieldno= j;
+ break;
+ }
+ }
+ break;
+ }
+ case CFAT_END:
+ abort();
+ }
+
+ no_args++;
+ }
+
+ if (error_str)
+ break;
+
+ /* remove whitespace */
+ while (*ptr == ' ' && *ptr != '\0') ptr++;
+
+ /* next ')' */
+ if (*ptr != ')')
+ {
+ error_str= "missing ')'";
+ break;
+ }
+ ptr++;
+
+ /* remove whitespace */
+ while (*ptr == ' ' && *ptr != '\0') ptr++;
+
+ /* garbage in the end? */
+ if (*ptr != '\0')
+ {
+ error_str= "garbage in the end";
+ break;
+ }
+
+ /* setup the function */
+ switch (fn.type)
+ {
+ case CFT_NDB_MAX:
+ if (args[0].fieldno != (uint)-1 &&
+ slave_set_resolve_max(share, ndbtab, args[0].fieldno))
+ {
+ /* wrong data type */
+ TABLE_SHARE *table_s= share->table_share;
+ snprintf(msg, msg_len,
+ "column '%s' has wrong datatype",
+ table_s->field[args[0].fieldno]->field_name);
+ DBUG_PRINT("info", (msg));
+ DBUG_RETURN(-1);
+ }
+ break;
+ }
+
+ DBUG_RETURN(0);
+ }
+ /* parse error */
+ snprintf(msg, msg_len, "%s, %s at '%s'",
+ conflict_fn, error_str, ptr);
+ DBUG_PRINT("info", (msg));
+ DBUG_RETURN(-1);
+}
+
+static const char *ndb_rep_db= NDB_REP_DB;
+static const char *ndb_replication_table= NDB_REPLICATION_TABLE;
+static const char *nrt_db= "db";
+static const char *nrt_table_name= "table_name";
+static const char *nrt_server_id= "server_id";
+static const char *nrt_binlog_type= "binlog_type";
+static const char *nrt_conflict_fn= "conflict_fn";
+int
+ndbcluster_read_binlog_replication(THD *thd, Ndb *ndb,
+ NDB_SHARE *share,
+ const NDBTAB *ndbtab,
+ uint server_id)
+{
+ DBUG_ENTER("ndbcluster_read_binlog_replication");
+ const char *db= share->db;
+ const char *table_name= share->table_name;
+ NdbError ndberror;
+ int error;
+ const char *error_str= "<none>";
+
+ ndb->setDatabaseName(ndb_rep_db);
+ NDBDICT *dict= ndb->getDictionary();
+ Ndb_table_guard ndbtab_g(dict, ndb_replication_table);
+ const NDBTAB *reptab= ndbtab_g.get_table();
+ if (reptab == NULL &&
+ dict->getNdbError().classification == NdbError::SchemaError)
+ {
+ DBUG_PRINT("info", ("No %s.%s table", ndb_rep_db, ndb_replication_table));
+ set_binlog_flags(share, NBT_DEFAULT);
+ DBUG_RETURN(0);
+ }
+ const NDBCOL
+ *col_db, *col_table_name, *col_server_id, *col_binlog_type, *col_conflict_fn;
+ char tmp_buf[FN_REFLEN];
+ uint retries= 100;
+ int retry_sleep= 30; /* 30 milliseconds, transaction */
+ if (reptab == NULL)
+ {
+ ndberror= dict->getNdbError();
+ goto err;
+ }
+ error= -1;
+ col_db= reptab->getColumn(error_str= nrt_db);
+ if (col_db == NULL ||
+ !col_db->getPrimaryKey() ||
+ col_db->getType() != NDBCOL::Varbinary)
+ goto err;
+ col_table_name= reptab->getColumn(error_str= nrt_table_name);
+ if (col_table_name == NULL ||
+ !col_table_name->getPrimaryKey() ||
+ col_table_name->getType() != NDBCOL::Varbinary)
+ goto err;
+ col_server_id= reptab->getColumn(error_str= nrt_server_id);
+ if (col_server_id == NULL ||
+ !col_server_id->getPrimaryKey() ||
+ col_server_id->getType() != NDBCOL::Unsigned)
+ goto err;
+ col_binlog_type= reptab->getColumn(error_str= nrt_binlog_type);
+ if (col_binlog_type == NULL ||
+ col_binlog_type->getPrimaryKey() ||
+ col_binlog_type->getType() != NDBCOL::Unsigned)
+ goto err;
+ col_conflict_fn= reptab->getColumn(error_str= nrt_conflict_fn);
+ if (col_conflict_fn == NULL)
+ {
+ col_conflict_fn= NULL;
+ }
+ else if (col_conflict_fn->getPrimaryKey() ||
+ col_conflict_fn->getType() != NDBCOL::Varbinary)
+ goto err;
+
+ error= 0;
+ for (;;)
+ {
+ NdbTransaction *trans= ndb->startTransaction();
+ if (trans == NULL)
+ {
+ ndberror= ndb->getNdbError();
+ break;
+ }
+ NdbRecAttr *col_binlog_type_rec_attr[2];
+ NdbRecAttr *col_conflict_fn_rec_attr[2]= {NULL, NULL};
+ uint32 ndb_binlog_type[2];
+ const uint sz= 256;
+ char ndb_conflict_fn_buf[2*sz];
+ char *ndb_conflict_fn[2]= {ndb_conflict_fn_buf, ndb_conflict_fn_buf+sz};
+ NdbOperation *op[2];
+ uint32 i, id= 0;
+ for (i= 0; i < 2; i++)
+ {
+ NdbOperation *_op;
+ DBUG_PRINT("info", ("reading[%u]: %s,%s,%u", i, db, table_name, id));
+ if ((_op= trans->getNdbOperation(reptab)) == NULL) abort();
+ if (_op->readTuple(NdbOperation::LM_CommittedRead)) abort();
+ ndb_pack_varchar(col_db, tmp_buf, db, strlen(db));
+ if (_op->equal(col_db->getColumnNo(), tmp_buf)) abort();
+ ndb_pack_varchar(col_table_name, tmp_buf, table_name, strlen(table_name));
+ if (_op->equal(col_table_name->getColumnNo(), tmp_buf)) abort();
+ if (_op->equal(col_server_id->getColumnNo(), id)) abort();
+ if ((col_binlog_type_rec_attr[i]=
+ _op->getValue(col_binlog_type, (char *)&(ndb_binlog_type[i]))) == 0) abort();
+ /* optional columns */
+ if (col_conflict_fn)
+ {
+ if ((col_conflict_fn_rec_attr[i]=
+ _op->getValue(col_conflict_fn, ndb_conflict_fn[i])) == 0) abort();
+ }
+ id= server_id;
+ op[i]= _op;
+ }
+
+ if (trans->execute(NdbTransaction::Commit,
+ NdbOperation::AO_IgnoreError))
+ {
+ if (ndb->getNdbError().status == NdbError::TemporaryError);
+ {
+ if (retries--)
+ {
+ if (trans)
+ ndb->closeTransaction(trans);
+ my_sleep(retry_sleep);
+ continue;
+ }
+ }
+ ndberror= trans->getNdbError();
+ ndb->closeTransaction(trans);
+ break;
+ }
+ ndb->closeTransaction(trans);
+ for (i= 0; i < 2; i++)
+ {
+ if (op[i]->getNdbError().code)
+ {
+ if (op[i]->getNdbError().classification == NdbError::NoDataFound)
+ {
+ col_binlog_type_rec_attr[i]= NULL;
+ col_conflict_fn_rec_attr[i]= NULL;
+ DBUG_PRINT("info", ("not found row[%u]", i));
+ continue;
+ }
+ ndberror= op[i]->getNdbError();
+ break;
+ }
+ DBUG_PRINT("info", ("found row[%u]", i));
+ }
+ if (col_binlog_type_rec_attr[1] == NULL ||
+ col_binlog_type_rec_attr[1]->isNULL())
+ {
+ col_binlog_type_rec_attr[1]= col_binlog_type_rec_attr[0];
+ ndb_binlog_type[1]= ndb_binlog_type[0];
+ }
+ if (col_conflict_fn_rec_attr[1] == NULL ||
+ col_conflict_fn_rec_attr[1]->isNULL())
+ {
+ col_conflict_fn_rec_attr[1]= col_conflict_fn_rec_attr[0];
+ ndb_conflict_fn[1]= ndb_conflict_fn[0];
+ }
+
+ if (col_binlog_type_rec_attr[1] == NULL ||
+ col_binlog_type_rec_attr[1]->isNULL())
+ set_binlog_flags(share, NBT_DEFAULT);
+ else
+ set_binlog_flags(share, (enum Ndb_binlog_type) ndb_binlog_type[1]);
+ if (col_conflict_fn_rec_attr[1] == NULL ||
+ col_conflict_fn_rec_attr[1]->isNULL())
+ ; /* no conflict_fn */
+ else if (set_conflict_fn(share, ndbtab, col_conflict_fn, ndb_conflict_fn[1],
+ tmp_buf, sizeof(tmp_buf)))
+ {
+ error_str= tmp_buf;
+ error= 1;
+ goto err;
+ }
+
+ DBUG_RETURN(0);
+ }
+
+err:
+ if (error > 0)
+ {
+ push_warning_printf(thd, MYSQL_ERROR::WARN_LEVEL_ERROR,
+ ER_CONFLICT_FN_PARSE_ERROR,
+ ER(ER_CONFLICT_FN_PARSE_ERROR),
+ error_str);
+ }
+ else if (error < 0)
+ {
+ char msg[FN_REFLEN];
+ snprintf(msg, sizeof(msg),
+ "Missing or wrong type for column '%s'", error_str);
+ push_warning_printf(thd, MYSQL_ERROR::WARN_LEVEL_ERROR,
+ ER_NDB_REPLICATION_SCHEMA_ERROR,
+ ER(ER_NDB_REPLICATION_SCHEMA_ERROR),
+ msg);
+ }
+ else
+ {
+ char msg[FN_REFLEN];
+ snprintf(tmp_buf, sizeof(tmp_buf), "ndberror %u", ndberror.code);
+ snprintf(msg, sizeof(msg), "Unable to retrieve %s.%s, logging and "
+ "conflict resolution may not function as intended (%s)",
+ ndb_rep_db, ndb_replication_table, tmp_buf);
+ push_warning_printf(thd, MYSQL_ERROR::WARN_LEVEL_ERROR,
+ ER_ILLEGAL_HA_CREATE_OPTION,
+ ER(ER_ILLEGAL_HA_CREATE_OPTION),
+ ndbcluster_hton_name, msg);
+ }
+ set_binlog_flags(share, NBT_DEFAULT);
+ DBUG_RETURN(ndberror.code);
}
bool
@@ -2524,7 +3000,7 @@ int ndbcluster_create_binlog_setup(Ndb *
if (share && share_may_exist)
{
- if (share->flags & NSF_NO_BINLOG ||
+ if (get_binlog_nologging(share) ||
share->op != 0 ||
share->op_old != 0)
{
@@ -2580,7 +3056,7 @@ int ndbcluster_create_binlog_setup(Ndb *
if (!do_event_op)
{
- share->flags|= NSF_NO_BINLOG;
+ set_binlog_nologging(share);
pthread_mutex_unlock(&ndbcluster_mutex);
DBUG_RETURN(0);
}
@@ -2609,8 +3085,13 @@ int ndbcluster_create_binlog_setup(Ndb *
dict->getNdbError().code);
break; // error
}
+
+ /*
+ */
+ ndbcluster_read_binlog_replication(current_thd, ndb, share, ndbtab, ::server_id);
+
String event_name(INJECTOR_EVENT_LEN);
- ndb_rep_event_name(&event_name, db, table_name);
+ ndb_rep_event_name(&event_name, db, table_name, get_binlog_full(share));
/*
event should have been created by someone else,
but let's make sure, and create if it doesn't exist
@@ -2677,6 +3158,7 @@ ndbcluster_create_event(Ndb *ndb, const
DBUG_RETURN(0);
}
+ ndb->setDatabaseName(share->db);
NDBDICT *dict= ndb->getDictionary();
NDBEVENT my_event(event_name);
my_event.setTable(*ndbtab);
@@ -2707,8 +3189,16 @@ ndbcluster_create_event(Ndb *ndb, const
if (ndb_schema_share || strcmp(share->db, NDB_REP_DB) ||
strcmp(share->table_name, NDB_SCHEMA_TABLE))
{
- my_event.setReport(NDBEVENT::ER_UPDATED);
- DBUG_PRINT("info", ("subscription only updated"));
+ if (get_binlog_full(share))
+ {
+ my_event.setReport(NDBEVENT::ER_ALL);
+ DBUG_PRINT("info", ("subscription all"));
+ }
+ else
+ {
+ my_event.setReport(NDBEVENT::ER_UPDATED);
+ DBUG_PRINT("info", ("subscription only updated"));
+ }
}
else
{
@@ -3038,7 +3528,9 @@ ndbcluster_create_event_ops(NDB_SHARE *s
share->key, (long) share->op, share->use_count));
if (ndb_extra_logging)
- sql_print_information("NDB Binlog: logging %s", share->key);
+ sql_print_information("NDB Binlog: logging %s (%s,%s)", share->key,
+ get_binlog_full(share) ? "FULL" : "UPDATED",
+ get_binlog_use_update(share) ? "USE_UPDATE" : "USE_WRITE");
DBUG_RETURN(0);
}
@@ -3551,7 +4043,7 @@ ndb_binlog_thread_handle_data_event(Ndb
&b, table->record[0]);
DBUG_EXECUTE("info", print_records(table, table->record[0]););
if (table->s->primary_key != MAX_KEY &&
- opt_ndb_log_update_as_write)
+ !get_binlog_use_update(share))
{
/*
since table has a primary key, we can do a write
--- 1.27/sql/ha_ndbcluster_binlog.h 2007-05-14 14:46:20 +02:00
+++ 1.28/sql/ha_ndbcluster_binlog.h 2007-06-06 15:59:04 +02:00
@@ -155,7 +155,14 @@ int ndbcluster_handle_drop_table(Ndb *nd
NDB_SHARE *share,
const char *type_str);
void ndb_rep_event_name(String *event_name,
- const char *db, const char *tbl);
+ const char *db, const char *tbl, my_bool full);
+
+int
+ndbcluster_read_binlog_replication(THD *thd, Ndb *ndb,
+ NDB_SHARE *share,
+ const NDBTAB *ndbtab,
+ uint server_id);
+
int ndb_create_table_from_engine(THD *thd, const char *db,
const char *table_name);
int ndbcluster_binlog_start();
--- 1.6/sql/ha_ndbcluster_tables.h 2007-03-20 17:07:50 +01:00
+++ 1.7/sql/ha_ndbcluster_tables.h 2007-06-06 15:59:04 +02:00
@@ -21,3 +21,15 @@
#define OLD_NDB_APPLY_TABLE "apply_status"
#define NDB_SCHEMA_TABLE "ndb_schema"
#define OLD_NDB_SCHEMA_TABLE "schema"
+#define NDB_REPLICATION_TABLE "ndb_replication"
+
+enum Ndb_binlog_type
+{
+ NBT_DEFAULT = 0
+ ,NBT_NO_LOGGING = 1
+ ,NBT_FULL = 2
+ ,NBT_UPDATED = 3
+ ,NBT_USE_UPDATE = 4 // bit 0x4 indicates USE_UPDATE
+ ,NBT_FULL_USE_UPDATE = 4+NBT_FULL // bit 0x4 indicates USE_UPDATE
+ ,NBT_UPDATED_USE_UPDATE = 4+NBT_UPDATED // bit 0x4 indicates USE_UPDATE
+};
--- 1.1/mysql-test/r/rpl_ndb_conflict.result 2007-06-01 17:10:35 +02:00
+++ 1.2/mysql-test/r/rpl_ndb_conflict.result 2007-06-06 15:59:03 +02:00
@@ -4,6 +4,7 @@ reset master;
reset slave;
drop table if exists t1,t2,t3,t4,t5,t6,t7,t8,t9;
start slave;
+insert into mysql.ndb_replication values ("test", "t1", 0, NULL, "NDB$MAX(X)");
create table t1 (a int key, b varchar(32), X int unsigned) engine ndb;
create table t2 (a int key, b varchar(32), c int unsigned) engine ndb;
begin;
--- 1.3/mysql-test/t/rpl_ndb_conflict.test 2007-06-02 06:29:33 +02:00
+++ 1.4/mysql-test/t/rpl_ndb_conflict.test 2007-06-06 15:59:03 +02:00
@@ -1,8 +1,37 @@
+#
+# Test engine native conflict resolution for ndb
+#
+#
--source include/have_ndb.inc
--source include/have_binlog_format_row.inc
--source include/master-slave.inc
+--disable_warnings
+--disable_query_log
+--connection slave
+drop table if exists mysql.ndb_replication;
+CREATE TABLE mysql.ndb_replication
+ (db VARBINARY(63),
+ table_name VARBINARY(63),
+ server_id INT UNSIGNED,
+ binlog_type INT UNSIGNED,
+ conflict_fn VARBINARY(128),
+ PRIMARY KEY USING HASH (db,table_name,server_id))
+ ENGINE=NDB PARTITION BY KEY(db,table_name);
+--enable_warnings
+--enable_query_log
+
+#
+# test native NDB$MAX() conflict resulution
+#
+--connection slave
+insert into mysql.ndb_replication values ("test", "t1", 0, NULL, "NDB$MAX(X)");
--let col_type = int unsigned
--source extra/rpl_tests/rpl_ndb_conflict_1.test
--let col_type = bigint unsigned
--source extra/rpl_tests/rpl_ndb_conflict_1.test
+
+--disable_query_log
+--connection slave
+drop table mysql.ndb_replication;
+--enable_query_log
| Thread |
|---|
| • bk commit into 5.1 tree (tomas:1.2523) | tomas | 6 Jun |