From: Frazer Clement Date: January 11 2012 11:53am Subject: bzr push into mysql-5.1-telco-7.0 branch (frazer.clement:4779 to 4780) List-Archive: http://lists.mysql.com/commits/142366 Message-Id: <201201111153.q0BBrAse006917@acsmt357.oracle.com> MIME-Version: 1.0 Content-Type: text/plain; charset="us-ascii" Content-Transfer-Encoding: 7bit 4780 Frazer Clement 2012-01-11 Refactor ndb_apply_status write_row generation Existing code splits row buffer initialisation from setting and writing. Looks like an optimisation that's been broken. This patch pulls out the buffer preparation and writing into a method, and calls it from one place, chipping some code and locals off the Binlog Injector monolith. modified: sql/ha_ndbcluster_binlog.cc 4779 Frazer Clement 2012-01-11 Bug#13578660 NDB REPLICATION : CONFLICT DETECTION NOT ENABLED ON ALL CONNECTED MYSQLDS Conflict function setup was using a TABLE* object (MySQLD table representation) to determine the offsets of the resolve column in the table record for NDB$MAX and NDB$OLD, and the primary key column offsets for exceptions table inserts. Conflict function setup is part of binlog setup and occurs as a result of : - Table creation - Table 'discovery' due to observed ndb_schema notifications - Table 'discovery' due to SHOW TABLES - Table 'discovery' at server startup In some of these situations, the calling context has no MySQLD Table* for the table, and so null is passed. As the information to setup conflict detection was not available, it was not set up! Fix Remove the use of a TABLE* when setting up conflict detection. Parts : 1) Store resolve column number in NDB_SHARE rather than a record offset 2) Store exception's table pk attrids in NDB_SHARE rather than record offsets 3) Modify conflict function definition code to use resolve column number and NdbRecord to determine the correct offset. 4) Modify exceptions table insert code to use pk attrids and NdbRecord to determine the correct offsets. 5) Avoid using TABLE* members for error messages etc 6) Remove TABLE* parameter from calls 7) Remove check for TABLE*==NULL Additional items Cleanup/improve reporting of conflict function setup. Use sufficient bits to encode resolve column attrid. Add testcase to verify conflict table setup distribution. added: mysql-test/suite/ndb_rpl/t/show_mysqld_warnings.inc modified: mysql-test/suite/ndb_rpl/r/ndb_rpl_conflict.result mysql-test/suite/ndb_rpl/r/ndb_rpl_rep_error.result mysql-test/suite/ndb_rpl/t/ndb_rpl_conflict.test mysql-test/suite/ndb_rpl/t/ndb_rpl_conflict_epoch.test mysql-test/suite/ndb_rpl/t/ndb_rpl_rep_error.test sql/ha_ndbcluster.cc sql/ha_ndbcluster.h sql/ha_ndbcluster_binlog.cc sql/ha_ndbcluster_binlog.h === modified file 'sql/ha_ndbcluster_binlog.cc' --- a/sql/ha_ndbcluster_binlog.cc 2012-01-11 11:07:09 +0000 +++ b/sql/ha_ndbcluster_binlog.cc 2012-01-11 11:49:05 +0000 @@ -6757,6 +6757,71 @@ void updateInjectorStats(Ndb* schemaNdb, dataNdb->getClientStat(Ndb::EventBytesRecvdCount); } +/** + injectApplyStatusWriteRow + + Inject a WRITE_ROW event on the ndb_apply_status table into + the Binlog. + This contains our server_id and the supplied epoch number. + When applied on the Slave it gives a transactional position + marker +*/ +static +bool +injectApplyStatusWriteRow(injector::transaction& trans, + ulonglong gci) +{ + DBUG_ENTER("injectApplyStatusWriteRow"); + if (ndb_apply_status_share == NULL) + { + sql_print_error("NDB: Could not get apply status share"); + DBUG_ASSERT(ndb_apply_status_share != NULL); + DBUG_RETURN(false); + } + + /* Build row buffer for generated ndb_apply_status + WRITE_ROW event + First get the relevant table structure. + */ + DBUG_ASSERT(!ndb_apply_status_share->event_data); + DBUG_ASSERT(ndb_apply_status_share->op); + Ndb_event_data* event_data= + (Ndb_event_data *) ndb_apply_status_share->op->getCustomData(); + DBUG_ASSERT(event_data); + DBUG_ASSERT(event_data->table); + TABLE* apply_status_table= event_data->table; + + /* + Intialize apply_status_table->record[0] + */ + empty_record(apply_status_table); + + apply_status_table->field[0]->store((longlong)::server_id, true); + apply_status_table->field[1]->store((longlong)gci, true); + apply_status_table->field[2]->store("", 0, &my_charset_bin); + apply_status_table->field[3]->store((longlong)0, true); + apply_status_table->field[4]->store((longlong)0, true); +#ifndef DBUG_OFF + const LEX_STRING& name= apply_status_table->s->table_name; + DBUG_PRINT("info", ("use_table: %.*s", + (int) name.length, name.str)); +#endif + injector::transaction::table tbl(apply_status_table, true); + int ret = trans.use_table(::server_id, tbl); + assert(ret == 0); NDB_IGNORE_VALUE(ret); + + ret= trans.write_row(::server_id, + injector::transaction::table(apply_status_table, + true), + &apply_status_table->s->all_set, + apply_status_table->s->fields, + apply_status_table->record[0]); + + assert(ret == 0); + + DBUG_RETURN(true); +} + enum Binlog_thread_state { BCCC_running= 0, @@ -7240,44 +7305,6 @@ restart_cluster_failure: { DBUG_PRINT("info", ("pollEvents res: %d", res)); thd->proc_info= "Processing events"; - uchar apply_status_buf[512]; - TABLE *apply_status_table= NULL; - if (ndb_apply_status_share) - { - /* - We construct the buffer to write the apply status binlog - event here, as the table->record[0] buffer is referenced - by the apply status event operation, and will be filled - with data at the nextEvent call if the first event should - happen to be from the apply status table - */ - Ndb_event_data *event_data= ndb_apply_status_share->event_data; - if (!event_data) - { - DBUG_ASSERT(ndb_apply_status_share->op); - event_data= - (Ndb_event_data *) ndb_apply_status_share->op->getCustomData(); - DBUG_ASSERT(event_data); - } - apply_status_table= event_data->table; - - /* - Intialize apply_status_table->record[0] - */ - empty_record(apply_status_table); - - apply_status_table->field[0]->store((longlong)::server_id, true); - /* - gci is added later, just before writing to binlog as gci - is unknown here - */ - apply_status_table->field[2]->store("", 0, &my_charset_bin); - apply_status_table->field[3]->store((longlong)0, true); - apply_status_table->field[4]->store((longlong)0, true); - DBUG_ASSERT(sizeof(apply_status_buf) >= apply_status_table->s->reclength); - memcpy(apply_status_buf, apply_status_table->record[0], - apply_status_table->s->reclength); - } NdbEventOperation *pOp= i_ndb->nextEvent(); ndb_binlog_index_row _row; ndb_binlog_index_row *rows= &_row; @@ -7405,35 +7432,11 @@ restart_cluster_failure: } if (trans.good()) { - if (apply_status_table) - { -#ifndef DBUG_OFF - const LEX_STRING& name= apply_status_table->s->table_name; - DBUG_PRINT("info", ("use_table: %.*s", - (int) name.length, name.str)); -#endif - injector::transaction::table tbl(apply_status_table, true); - int ret = trans.use_table(::server_id, tbl); - assert(ret == 0); NDB_IGNORE_VALUE(ret); - - /* add the gci to the record */ - Field *field= apply_status_table->field[1]; - my_ptrdiff_t row_offset= - (my_ptrdiff_t) (apply_status_buf - apply_status_table->record[0]); - field->move_field_offset(row_offset); - field->store((longlong)gci, true); - field->move_field_offset(-row_offset); - - trans.write_row(::server_id, - injector::transaction::table(apply_status_table, - true), - &apply_status_table->s->all_set, - apply_status_table->s->fields, - apply_status_buf); - } - else + /* Inject ndb_apply_status WRITE_ROW event */ + if (!injectApplyStatusWriteRow(trans, + gci)) { - sql_print_error("NDB: Could not get apply status share"); + sql_print_error("NDB Binlog: Failed to inject apply status write row"); } } #ifdef RUN_NDB_BINLOG_TIMER No bundle (reason: useless for push emails).