From: Andrei Elkin
Date: July 16 2011 5:09pm
Subject: bzr push into mysql-next-mr-wl5569 branch (andrei.elkin:3348 to 3349) WL#5569
List-Archive: http://lists.mysql.com/commits/140329
Message-Id: <201107161709.p6GH9EFk013093@mysql1000.dsl.inet.fi>
MIME-Version: 1.0
Content-Type: text/plain; charset="us-ascii"
Content-Transfer-Encoding: 7bit

 3349 Andrei Elkin 2011-07-16
      wl#5569 MTS

      The patch contains improvements after code review. Changes are mostly cosmetic.
     @ mysql-test/suite/rpl/r/rpl_parallel_start_stop.result
        results updated.
     @ sql/binlog.cc
        correcting comments.
     @ sql/field.cc
        renaming.
     @ sql/log_event.cc
        renaming and separating out a block of code in Log_event::get_slave_worker()
        into a new method of Slave_job_group class; some cleanup.
     @ sql/log_event.h
        Extending and improving comments;
        renaming to follow is_, get_, set_ pattern;
        restoring the private access to do_apply_event() in Rows_log_event.
     @ sql/mysqld.cc
        removing extra declaration.
     @ sql/rpl_info_factory.cc
        A minor comment is added.
     @ sql/rpl_rli.cc
        renaming to add the _cnt suffix to all entities that serve as counters in MTS;
        improving comments.
     @ sql/rpl_rli.h
        Renaming, and improving comments for the new members of Relay_log_info.
     @ sql/rpl_rli_pdb.cc
        renaming.
     @ sql/rpl_rli_pdb.h
        Improving comment readability by adding legends that define MTS-specific abbreviations.
     @ sql/rpl_slave.cc
        Renaming;
        minor cleanup in sql_slave_killed();
        adding comments on Seconds_behind_master update policy with MTS.
     @ sql/share/errmsg-utf8.txt
        Improving the text of a few errors.

    modified:
      mysql-test/suite/rpl/r/rpl_parallel_start_stop.result
      sql/binlog.cc
      sql/field.cc
      sql/log_event.cc
      sql/log_event.h
      sql/mysqld.cc
      sql/rpl_info_factory.cc
      sql/rpl_rli.cc
      sql/rpl_rli.h
      sql/rpl_rli_pdb.cc
      sql/rpl_rli_pdb.h
      sql/rpl_slave.cc
      sql/share/errmsg-utf8.txt
 3348 Andrei Elkin 2011-07-16
      bug#12755663 MTS: RPL_CIRCULAR_FOR_4_HOSTS FAILS: CANT EXECUTE THE CURRENT EVENT GROUP

      MTS stopped with an error in the middle of the test.
      The reason is that a group of events originating from the slave itself was
      partly processed, which modified the group position.
      At the following restart the wrong group boundary made the slave either
      error out or assert.

      Fixed by locating a possible race condition that allowed the Coordinator to
      ignore the actual failed status of a Worker. So in the case of the test, the
      slave server group can't be started.
      Note that this is a trial patch, since I can't reproduce the failure on any
      of the hosts available to me.
     @ sql/rpl_rli_pdb.cc
        Changing the running status of the Worker before it releases assigned entries.
        That ensures that the Coordinator waiting in wait_for_workers_to_finish()
        exits the function with a negative result and therefore stops without
        attempting to apply the event for which it attempted synchronization.

        A couple of diagnostics are added to the error log. They may be removed in a
        short while but currently might help provide details if the failure does not
        disappear after this push.

    modified:
      mysql-test/suite/rpl/r/rpl_circular_for_4_hosts.result
      mysql-test/suite/rpl/t/rpl_circular_for_4_hosts.test
      sql/rpl_rli_pdb.cc
=== modified file 'mysql-test/suite/rpl/r/rpl_parallel_start_stop.result'
--- a/mysql-test/suite/rpl/r/rpl_parallel_start_stop.result 2011-06-29 17:10:36 +0000
+++ b/mysql-test/suite/rpl/r/rpl_parallel_start_stop.result 2011-07-16 17:08:31 +0000
@@ -59,17 +59,17 @@ set @save.slave_transaction_retries= @@g
 set @@global.slave_transaction_retries= 1;
 start slave sql_thread;
 Warnings:
-Note 1739 Temporary failed transaction retry is not supported in Parallel Slave. Such failure will force the slave to stop.
+Note 1739 Temporary failed transaction retry is not supported in multi-threaded slave mode. Such failure will force the slave to stop.
 include/stop_slave.inc
 create table t2 (a int);
 insert into t2 values (1);
 start slave until master_log_file='master-bin.000001', master_log_pos=MASTER_LOG_POS;
 Warnings:
-Note 1739 UNTIL condtion is not supported in Parallel Slave. Slave is started in the sequential execution mode.
+Note 1739 UNTIL condtion is not supported in multi-threaded slave mode. Slave is started in the sequential execution mode.
 include/wait_for_slave_sql_to_stop.inc
 include/start_slave.inc
 Warnings:
-Note 1739 Temporary failed transaction retry is not supported in Parallel Slave. Such failure will force the slave to stop.
+Note 1739 Temporary failed transaction retry is not supported in multi-threaded slave mode. Such failure will force the slave to stop.
 drop table t1;
 drop table t2m;
 drop table t2;
=== modified file 'sql/binlog.cc'
--- a/sql/binlog.cc 2011-07-13 15:23:57 +0000
+++ b/sql/binlog.cc 2011-07-16 17:08:31 +0000
@@ -4948,13 +4948,13 @@ THD::binlog_set_pending_rows_event(Rows_
 }
 /**
-  @param db db name c-string to be inserted into abc-sorted
+  @param db db name c-string to be inserted into alphabetically sorted
             THD::binlog_accessed_db_names list.
             Note, as the list node data so the node
             struct itself are allocated in THD::main_mem_root.
-            The list lasts for the top-level query time and resets
-            in @c THD::cleanup_after_query() and Query_log_event::write().
+            The list lasts for the top-level query time and is reset
+            in @c THD::cleanup_after_query().
 */
 void THD::add_to_binlog_accessed_dbs(const char *db)
=== modified file 'sql/field.cc'
--- a/sql/field.cc 2011-07-08 06:44:35 +0000
+++ b/sql/field.cc 2011-07-16 17:08:31 +0000
@@ -6502,7 +6502,7 @@ Field_string::compatible_field_size(uint
 {
 #ifdef HAVE_REPLICATION
   const Check_field_param check_param = { this };
-  if (!mts_is_worker(rli_arg->info_thd) && rpl_master_has_bug(rli_arg, 37426, TRUE,
+  if (!is_mts_worker(rli_arg->info_thd) && rpl_master_has_bug(rli_arg, 37426, TRUE,
                          check_field_for_37426, &check_param))
     return FALSE; // Not compatible field sizes
 #endif
=== modified file 'sql/log_event.cc'
--- a/sql/log_event.cc 2011-07-14 09:40:06 +0000
+++ b/sql/log_event.cc 2011-07-16 17:08:31 +0000
@@ -783,7 +783,7 @@ int Log_event::do_update_pos(Relay_log_i
     Matz: I don't think we will need this check with this refactoring.
   */
-  DBUG_ASSERT(!mts_is_worker(rli->info_thd));
+  DBUG_ASSERT(!is_mts_worker(rli->info_thd));
   if (rli)
     rli->stmt_done(log_pos);
@@ -2401,7 +2401,7 @@ bool Log_event::contains_partition_info(
 Slave_worker *Log_event::get_slave_worker(Relay_log_info *rli)
 {
-  Slave_job_group g, *ptr_g;
+  Slave_job_group group, *ptr_group;
   bool is_b_event;
   int num_dbs= 0;
   Slave_worker *ret_worker= NULL;
@@ -2430,21 +2430,9 @@ Slave_worker *Log_event::get_slave_worke
     rli->mts_groups_assigned++;
     rli->curr_group_isolated= FALSE;
-    g.master_log_pos= log_pos;
-    g.group_master_log_pos= g.group_relay_log_pos= 0;
-    g.group_master_log_name= NULL; // todo: remove
-    g.group_relay_log_name= NULL;
-    g.worker_id= MTS_WORKER_UNDEF;
-    g.total_seqno= rli->mts_groups_assigned;
-    g.checkpoint_log_name= NULL;
-    g.checkpoint_log_pos= 0;
-    g.checkpoint_relay_log_name= NULL;
-    g.checkpoint_relay_log_pos= 0;
-    g.checkpoint_seqno= (uint) -1;
-    g.done= 0;
-
+    group.reset(log_pos, rli->mts_groups_assigned);
     // the last occupied GAQ's array index
-    gaq_idx= gaq->assigned_group_index= gaq->en_queue((void *) &g);
+    gaq_idx= gaq->assigned_group_index= gaq->en_queue((void *) &group);
     DBUG_ASSERT(gaq_idx != MTS_WORKER_UNDEF && gaq_idx < gaq->size);
     DBUG_ASSERT(gaq->get_job_group(rli->gaq->assigned_group_index)->
@@ -2474,7 +2462,7 @@ Slave_worker *Log_event::get_slave_worke
   {
     int i= 0;
     num_dbs= mts_number_dbs();
-    List_iterator it(*mts_get_dbs(&rli->mts_coor_mem_root));
+    List_iterator it(*get_mts_dbs(&rli->mts_coor_mem_root));
     it++;
     ret_worker= rli->last_assigned_worker;
@@ -2520,12 +2508,12 @@ Slave_worker *Log_event::get_slave_worke
       i++;
     } while (it++);
-    if ((ptr_g= gaq->get_job_group(rli->gaq->assigned_group_index))->
+    if ((ptr_group= gaq->get_job_group(rli->gaq->assigned_group_index))->
         worker_id == MTS_WORKER_UNDEF)
     {
-      ptr_g->worker_id= ret_worker->id;
+      ptr_group->worker_id= ret_worker->id;
-      DBUG_ASSERT(ptr_g->group_relay_log_name == NULL);
+      DBUG_ASSERT(ptr_group->group_relay_log_name == NULL);
     }
     DBUG_ASSERT(i == num_dbs || num_dbs == OVER_MAX_DBS_IN_EVENT_MTS);
@@ -2590,11 +2578,11 @@ Slave_worker *Log_event::get_slave_worke
   if (ends_group() || !rli->curr_group_seen_begin)
   {
     // index of GAQ that this terminal event belongs to
-    mts_group_cnt= gaq->assigned_group_index;
+    mts_group_idx= gaq->assigned_group_index;
     rli->mts_group_status= Relay_log_info::MTS_END_GROUP;
     if (rli->curr_group_isolated)
-      mts_do_isolate_group();
-    ptr_g= gaq->get_job_group(rli->gaq->assigned_group_index);
+      set_mts_isolate_group();
+    ptr_group= gaq->get_job_group(rli->gaq->assigned_group_index);
     DBUG_ASSERT(ret_worker != NULL);
@@ -2614,39 +2602,39 @@ Slave_worker *Log_event::get_slave_worke
         the new relay-log (where the current event is from) name
         delivery to Worker that will receive it in commit_positions().
      */
-      DBUG_ASSERT(ptr_g->group_relay_log_name == NULL);
+      DBUG_ASSERT(ptr_group->group_relay_log_name == NULL);
-      ptr_g->group_relay_log_name= (char *)
+      ptr_group->group_relay_log_name= (char *)
         my_malloc(strlen(rli->
                          get_group_relay_log_name()) + 1, MYF(MY_WME));
-      strcpy(ptr_g->group_relay_log_name,
+      strcpy(ptr_group->group_relay_log_name,
              rli->get_event_relay_log_name());
-      DBUG_ASSERT(ptr_g->group_relay_log_name != NULL);
+      DBUG_ASSERT(ptr_group->group_relay_log_name != NULL);
       ret_worker->relay_log_change_notified= TRUE;
     }
     if (!ret_worker->checkpoint_notified)
     {
-      ptr_g->checkpoint_log_name= (char *)
+      ptr_group->checkpoint_log_name= (char *)
         my_malloc(strlen(rli->
                          get_group_master_log_name()) + 1, MYF(MY_WME));
-      strcpy(ptr_g->checkpoint_log_name,
+      strcpy(ptr_group->checkpoint_log_name,
              rli->get_group_master_log_name());
-      ptr_g->checkpoint_log_pos= rli->get_group_master_log_pos();
-      ptr_g->checkpoint_relay_log_name= (char *)
+      ptr_group->checkpoint_log_pos= rli->get_group_master_log_pos();
+      ptr_group->checkpoint_relay_log_name= (char *)
         my_malloc(strlen(rli->
                          get_group_relay_log_name()) + 1, MYF(MY_WME));
-      strcpy(ptr_g->checkpoint_relay_log_name,
+      strcpy(ptr_group->checkpoint_relay_log_name,
              rli->get_group_relay_log_name());
-      ptr_g->checkpoint_relay_log_pos= rli->get_group_relay_log_pos();
-      ptr_g->shifted= ret_worker->bitmap_shifted;
+      ptr_group->checkpoint_relay_log_pos= rli->get_group_relay_log_pos();
+      ptr_group->shifted= ret_worker->bitmap_shifted;
       ret_worker->bitmap_shifted= 0;
       ret_worker->checkpoint_notified= TRUE;
     }
-    ptr_g->checkpoint_seqno= rli->checkpoint_seqno;
-    ptr_g->ts= when + (time_t) exec_time; // Seconds_behind_master related
+    ptr_group->checkpoint_seqno= rli->checkpoint_seqno;
+    ptr_group->ts= when + (time_t) exec_time; // Seconds_behind_master related
     rli->checkpoint_seqno++;
     // reclaiming resources allocated during the group scheduling
@@ -2686,7 +2674,7 @@ int Log_event::apply_event(Relay_log_inf
   {
     bool skip=
       bitmap_is_set(&rli->recovery_groups, rli->mts_recovery_index) &&
-      (mts_execution_mode(::server_id,
+      (get_mts_execution_mode(::server_id,
                           rli->mts_group_status == Relay_log_info::MTS_IN_GROUP)
        == EVENT_EXEC_PARALLEL);
     if (skip)
@@ -2701,7 +2689,7 @@ int Log_event::apply_event(Relay_log_inf
   if (!(parallel= rli->is_parallel_exec()) ||
       ((actual_exec_mode=
-        mts_execution_mode(::server_id,
+        get_mts_execution_mode(::server_id,
                            rli->mts_group_status == Relay_log_info::MTS_IN_GROUP))
        != EVENT_EXEC_PARALLEL))
   {
@@ -2747,8 +2735,8 @@ int Log_event::apply_event(Relay_log_inf
     {
       // handle synchronization error
       rli->report(WARNING_LEVEL, 0,
-                  "Coordinator thread of multi-threaded slave is exiting "
-                  "seeing a failed Worker to apply an event.");
+                  "Coordinator thread of multi-threaded slave found "
+                  "a failed to apply an event Worker.");
       DBUG_RETURN(-1);
     }
     /*
@@ -3983,7 +3971,7 @@ void Query_log_event::print(FILE* file,
 */
 void Query_log_event::attach_temp_tables_worker(THD *thd)
 {
-  if (!mts_is_worker(thd) || !contains_partition_info())
+  if (!is_mts_worker(thd) || !contains_partition_info())
     return;
   // in over max-db:s case just one special parttion is locked
@@ -4009,7 +3997,7 @@ void Query_log_event::attach_temp_tables
 */
 void Query_log_event::detach_temp_tables_worker(THD *thd)
 {
-  if (!mts_is_worker(thd))
+  if (!is_mts_worker(thd))
     return;
   int parts= ((mts_accessed_dbs == OVER_MAX_DBS_IN_EVENT_MTS) ?
@@ -6657,12 +6645,18 @@ void Xid_log_event::print(FILE* file, PR
 #if defined(HAVE_REPLICATION) && !defined(MYSQL_CLIENT)
+/**
+   Worker commits Xid transaction and in case of its transactional
+   info table marks the current group as done in the Coordnator's
+   Group Assigned Queue.
+   @return zero as success or non-zero as an error
+*/
 int Xid_log_event::do_apply_event_worker(Slave_worker *w)
 {
   int error= 0;
   bool is_trans_repo= w->is_transactional();
-  Slave_committed_queue *gaq= w->c_rli->gaq;
+  Slave_committed_queue *coordinator_gaq= w->c_rli->gaq;
   /* For a slave Xid_log_event is COMMIT */
   general_log_print(thd, COM_QUERY,
@@ -6682,10 +6676,10 @@ int Xid_log_event::do_apply_event_worker
   if (is_trans_repo)
   {
-    ulong gaq_idx= mts_group_cnt;
-    Slave_job_group *ptr_g= gaq->get_job_group(gaq_idx);
+    ulong gaq_idx= mts_group_idx;
+    Slave_job_group *ptr_group= coordinator_gaq->get_job_group(gaq_idx);
-    if ((error= w->commit_positions(this, ptr_g, true)))
+    if ((error= w->commit_positions(this, ptr_group, true)))
       goto err;
   }
@@ -9099,7 +9093,7 @@ Rows_log_event::do_update_pos(Relay_log_
                        get_flags(STMT_END_F) ? "STMT_END_F " : ""));
   /* Worker does not execute binlog update position logics */
-  DBUG_ASSERT(!mts_is_worker(rli->info_thd));
+  DBUG_ASSERT(!is_mts_worker(rli->info_thd));
   if (get_flags(STMT_END_F))
   {
=== modified file 'sql/log_event.h'
--- a/sql/log_event.h 2011-07-08 06:44:35 +0000
+++ b/sql/log_event.h 2011-07-16 17:08:31 +0000
@@ -257,13 +257,16 @@ struct sql_ex_info
 #define IGNORABLE_HEADER_LEN 0
 /*
-  The maximum number of updated databases that a status of Query-log-event can carry.
-  In can redefined still to not be bigger than OVER_MAX_DBS_IN_EVENT_MTS.
+  The maximum number of updated databases that a status of
+  Query-log-event can carry. It can redefined within a range
+  [1.. OVER_MAX_DBS_IN_EVENT_MTS].
 */
 #define MAX_DBS_IN_EVENT_MTS 16
+
 /*
   When the actual number of databases exceeds MAX_DBS_IN_EVENT_MTS
-  the value of OVER_MAX_DBS_IN_EVENT_MTS is is put into the mts_accessed_dbs status.
+  the value of OVER_MAX_DBS_IN_EVENT_MTS is is put into the
+  mts_accessed_dbs status.
 */
 #define OVER_MAX_DBS_IN_EVENT_MTS 254
@@ -541,7 +544,7 @@ struct sql_ex_info
    in isolation from any other Workers.
    Typically that is done for a transaction that contains
    a query accessing more than OVER_MAX_DBS_IN_EVENT_MTS databases.
-   The flag is set ON for an event that terminates its group.
+   The flag can be set ON only for an event that terminates its group.
 */
 #define LOG_EVENT_MTS_ISOLATE_F 0x200
@@ -1080,11 +1083,12 @@ public:
   ha_checksum crc;
   /**
-     Index in @c rli->gaq array to indicate a group that this event is purging.
-     The index is set by C:r to a group terminator event is checked by W at
-     the event execution. The indexed data represent the Worker progress status.
+     Index in @c rli->gaq array to indicate a group that this event is
+     purging. The index is set by Coordinator to a group terminator
+     event is checked by Worker at the event execution. The indexed
+     data represent the Worker progress status.
   */
-  ulong mts_group_cnt;
+  ulong mts_group_idx;
   /**
      MTS: associating the event with either an assigned Worker or Coordinator.
@@ -1274,7 +1278,10 @@ public:
 private:
   /*
-    possible decisions by mts_execution_mode()
+    possible decisions by get_mts_execution_mode().
+    The execution mode can be PARALLEL or not (thereby sequential
+    unless impossible at all). When it's sequential it further breaks into
+    ASYNChronous and SYNChronous.
   */
   enum enum_mts_event_exec_mode
   {
@@ -1287,7 +1294,7 @@ private:
     */
     EVENT_EXEC_ASYNC,
     /*
-      Event is run by Coordinator and requires W:s synchronization.
+      Event is run by Coordinator and requires synchronization with Workers.
     */
     EVENT_EXEC_SYNC,
     /*
@@ -1297,7 +1304,7 @@ private:
   };
   /**
-     Is called from mts_execution_mode() to
+     Is called from get_mts_execution_mode() to
      @return TRUE if the event needs applying with synchronization
                   agaist Workers, otherwise
@@ -1309,7 +1316,7 @@ private:
      todo: to mts-support Old master Load-data related events
   */
-  bool mts_sequential_exec()
+  bool is_mts_sequential_exec()
   {
     return
       get_type_code() == START_EVENT_V3 ||
@@ -1338,7 +1345,7 @@ private:
      @retval EVENT_EXEC_ASYNC if event is executed by Coordinator
                               with synchronization against the Workers
   */
-  enum enum_mts_event_exec_mode mts_execution_mode(ulong slave_server_id,
+  enum enum_mts_event_exec_mode get_mts_execution_mode(ulong slave_server_id,
                                                    bool mts_in_group)
   {
     if ((get_type_code() == FORMAT_DESCRIPTION_EVENT &&
@@ -1349,7 +1356,7 @@ private:
          (log_pos == 0 /* very first fake Rotate */ &&
           mts_in_group /* ignored events, R_f at slave restart */))))
       return EVENT_EXEC_ASYNC;
-    else if (mts_sequential_exec())
+    else if (is_mts_sequential_exec())
       return EVENT_EXEC_SYNC;
     else
       return EVENT_EXEC_PARALLEL;
@@ -1366,7 +1373,7 @@ private:
     The method returns a list of updated by the event databases.
     Other than in the case of Query-log-event the list is just one item.
   */
-  virtual List* mts_get_dbs(MEM_ROOT *mem_root)
+  virtual List* get_mts_dbs(MEM_ROOT *mem_root)
   {
     List *res= new List;
     res->push_back(strdup_root(mem_root, get_db()));
@@ -1379,7 +1386,7 @@ private:
     Typically that is done for a transaction that contains
     a query accessing more than OVER_MAX_DBS_IN_EVENT_MTS databases.
   */
-  virtual void mts_do_isolate_group()
+  virtual void set_mts_isolate_group()
   {
     DBUG_ASSERT(ends_group() ||
                 get_type_code() == QUERY_EVENT ||
@@ -1408,10 +1415,10 @@ public:
              execute in isolation from other Workers,
              FASE otherwise
   */
-  bool mts_is_group_isolated() { return flags & LOG_EVENT_MTS_ISOLATE_F; }
+  bool is_mts_group_isolated() { return flags & LOG_EVENT_MTS_ISOLATE_F; }
   /**
-     Events of a cetain type can start or end a group of events treated
+     Events of a certain type can start or end a group of events treated
      transactionally wrt binlog.
      Public access is required by implementation of recovery + skip.
@@ -1427,8 +1434,6 @@ public:
   */
   virtual bool ends_group() { return FALSE; }
-public:
-
   /**
      Apply the event to the database.
@@ -2030,7 +2035,7 @@ public:
      Returns a list of updated databases or the default db single item list
      in case of the number of databases exceeds MAX_DBS_IN_EVENT_MTS.
   */
-  virtual List* mts_get_dbs(MEM_ROOT *mem_root)
+  virtual List* get_mts_dbs(MEM_ROOT *mem_root)
   {
     List *res= new (mem_root) List;
     if (mts_accessed_dbs == OVER_MAX_DBS_IN_EVENT_MTS)
@@ -3211,7 +3216,7 @@ public:
 #endif
 #if defined(MYSQL_SERVER) && defined(HAVE_REPLICATION)
   virtual uint8 mts_number_dbs() { return OVER_MAX_DBS_IN_EVENT_MTS; }
-  virtual List* mts_get_dbs(MEM_ROOT *mem_root)
+  virtual List* get_mts_dbs(MEM_ROOT *mem_root)
   {
     List *res= new List;
     res->push_back(strdup_root(mem_root, ""));
@@ -4463,11 +4468,11 @@ public:
   {
     return IGNORABLE_HEADER_LEN + 1 + (uint) strlen(m_rows_query);
   }
-#if defined(MYSQL_SERVER) && defined(HAVE_REPLICATION)
-  virtual int do_apply_event(Relay_log_info const *rli);
-#endif
 private:
+#if !defined(MYSQL_CLIENT)
+  virtual int do_apply_event(Relay_log_info const* rli);
+#endif
   char * m_rows_query;
 };
=== modified file 'sql/mysqld.cc'
--- a/sql/mysqld.cc 2011-07-08 06:44:35 +0000
+++ b/sql/mysqld.cc 2011-07-16 17:08:31 +0000
@@ -465,7 +465,6 @@ ulong slave_exec_mode_options;
 ulonglong slave_type_conversions_options;
 ulong opt_mts_slave_parallel_workers;
 ulonglong opt_mts_pending_jobs_size_max;
-ulong opt_mts_coordinator_basic_nap;
 ulong thread_cache_size=0;
 ulong binlog_cache_size=0;
 ulonglong max_binlog_cache_size=0;
=== modified file 'sql/rpl_info_factory.cc'
--- a/sql/rpl_info_factory.cc 2011-07-08 06:44:35 +0000
+++ b/sql/rpl_info_factory.cc 2011-07-16 17:08:31 +0000
@@ -23,7 +23,7 @@
 /**
   Creates both a Master info and a Relay log info repository whose types are
-  defined as parameters.
+  defined as parameters. Nothing is done for Workers here.
   @todo Make the repository a pluggable component.
=== modified file 'sql/rpl_rli.cc'
--- a/sql/rpl_rli.cc 2011-07-13 15:23:57 +0000
+++ b/sql/rpl_rli.cc 2011-07-16 17:08:31 +0000
@@ -123,8 +123,8 @@ void Relay_log_info::init_workers(ulong
     Parallel slave parameters initialization is done regardless
     whether the feature is or going to be active or not.
   */
-  mts_groups_assigned= mts_events_assigned= pending_jobs= wq_size_waits= 0;
-  mts_wq_excess= mts_wq_no_underrun_cnt= mts_wq_overfill_cnt= 0;
+  mts_groups_assigned= mts_events_assigned= pending_jobs= wq_size_waits_cnt= 0;
+  mts_wq_excess_cnt= mts_wq_no_underrun_cnt= mts_wq_overfill_cnt= 0;
   my_init_dynamic_array(&workers, sizeof(Slave_worker *), n_workers, 4);
 }
@@ -1038,7 +1038,7 @@ void Relay_log_info::stmt_done(my_off_t
   DBUG_ASSERT(!belongs_to_client());
   /* Worker does not execute binlog update position logics */
-  DBUG_ASSERT(!mts_is_worker(info_thd));
+  DBUG_ASSERT(!is_mts_worker(info_thd));
   /*
     If in a transaction, and if the slave supports transactions, just
@@ -1073,12 +1073,13 @@ void Relay_log_info::stmt_done(my_off_t
     if (is_parallel_exec())
     {
-      DBUG_ASSERT(!mts_is_worker(info_thd));
+      DBUG_ASSERT(!is_mts_worker(info_thd));
       /*
-        Format Description events are special events that are handled as a
-        synchronization points. For that reason, the checkpoint routine is
-        being called here.
+        Format Description events only can drive MTS execution to this
+        point. It is a special event group that is handled with
+        synchronization. For that reason, the checkpoint routine is
+        called here.
       */
       (void) mts_checkpoint_routine(this, 0, FALSE, FALSE); // TODO: ALFRANIO ERROR
     }
=== modified file 'sql/rpl_rli.h'
--- a/sql/rpl_rli.h 2011-07-08 19:40:52 +0000
+++ b/sql/rpl_rli.h 2011-07-16 17:08:31 +0000
@@ -426,7 +426,7 @@ public:
     legends:
     C - Coordinator;
     W - Worker;
-    WQ - Worker Query containing event assignments
+    WQ - Worker Queue containing event assignments
   */
   DYNAMIC_ARRAY workers; // number's is determined by global slave_parallel_workers
   volatile ulong pending_jobs;
@@ -451,10 +451,11 @@ public:
   bool curr_group_isolated; // current group requires execution in isolation
   volatile ulong mts_wq_underrun_w_id; // Id of a Worker whose queue is getting empty
   /*
-    excessive overrun counter; when W increments and decrements it it
-    also marks updates its own wq_overrun_set
+    Ongoing excessive overrun counter to correspond to number of events that
+    are being scheduled while a WQ is close to be filled up.
+    The counter describes level of saturation that Workers are experiencing.
   */
-  volatile long mts_wq_excess;
+  volatile long mts_wq_excess_cnt;
   long mts_worker_underrun_level; // % of WQ size at which W is considered hungry
   ulong mts_coordinator_basic_nap; // C sleeps to avoid WQs overrun
   ulong opt_slave_parallel_workers; // cache for ::opt_slave_parallel_workers
@@ -498,7 +499,7 @@ public:
     MTS_IN_GROUP, /* at least one not-terminal event scheduled to a Worker */
     MTS_END_GROUP, /* the last scheduled event is a terminal event */
-    MTS_KILLED_GROUP /* Coordinator gave out to reach MTS_END_GROUP */
+    MTS_KILLED_GROUP /* Coordinator gave up to reach MTS_END_GROUP */
   } mts_group_status;
   /*
@@ -506,10 +507,14 @@ public:
   */
   ulong mts_events_assigned; // number of events (statements) scheduled
   ulong mts_groups_assigned; // number of groups (transactions) scheduled
-  volatile ulong mts_wq_overrun_cnt; // counter of all mts_wq_excess increments
-  ulong wq_size_waits; // number of times C slept due to WQ:s oversize
-  ulong mts_wq_no_underrun_cnt;// number of times of C slept when W:s were filled
-  ulong mts_wq_overfill_cnt; // counter of C waited when a W's queue was full
+  volatile ulong mts_wq_overrun_cnt; // counter of all mts_wq_excess_cnt increments
+  ulong wq_size_waits_cnt; // number of times C slept due to WQ:s oversize
+  /*
+    a counter for sleeps due to Coordinator
+    experienced waiting when Workers get hungry again
+  */
+  ulong mts_wq_no_underrun_cnt;
+  ulong mts_wq_overfill_cnt; // counter of C waited due to a WQ queue was full
   /*
     A sorted array of the Workers' current assignement numbers to provide
     approximate view on Workers loading.
@@ -794,7 +799,7 @@ THD* mts_get_coordinator_thd();
   @param thd a reference to THD
   @return TRUE if thd belongs to a Worker thread and FALSE otherwise.
 */
-inline bool mts_is_worker(const THD *thd)
+inline bool is_mts_worker(const THD *thd)
 {
   return thd->system_thread == SYSTEM_THREAD_SLAVE_WORKER;
 }
=== modified file 'sql/rpl_rli_pdb.cc'
--- a/sql/rpl_rli_pdb.cc 2011-07-15 23:11:11 +0000
+++ b/sql/rpl_rli_pdb.cc 2011-07-16 17:08:31 +0000
@@ -126,7 +126,7 @@ int Slave_worker::init_worker(Relay_log_
   checkpoint_notified= FALSE;
   bitmap_shifted= 0;
   workers= c_rli->workers; // shallow copying is sufficient
-  wq_size_waits= groups_done= events_done= curr_jobs= 0;
+  wq_size_waits_cnt= groups_done= events_done= curr_jobs= 0;
   usage_partition= 0;
   last_group_done_index= c_rli->gaq->size; // out of range
@@ -885,7 +885,7 @@ void Slave_worker::slave_worker_ends_gro
   if (!error)
   {
     Slave_committed_queue *gaq= c_rli->gaq;
-    ulong gaq_idx= ev->mts_group_cnt;
+    ulong gaq_idx= ev->mts_group_idx;
     Slave_job_group *ptr_g= gaq->get_job_group(gaq_idx);
     // first ever group must have relay log name
@@ -1523,7 +1523,7 @@ void append_item_to_jobs(slave_job_item
     sprintf(wait_info, info_format, new_pend_size);
     rli->mts_wq_oversize= TRUE;
-    rli->wq_size_waits++; // waiting due to the total size
+    rli->wq_size_waits_cnt++; // waiting due to the total size
     old_msg= thd->enter_cond(&rli->pending_jobs_cond, &rli->pending_jobs_lock,
                              wait_info);
     mysql_cond_wait(&rli->pending_jobs_cond, &rli->pending_jobs_lock);
@@ -1548,7 +1548,7 @@ void append_item_to_jobs(slave_job_item
   {
     // todo: experiment with weight to get a good approximation formula
     // The longer Sleep lasts the bigger is excessive overrun counter.
-    ulong nap_weight= rli->mts_wq_excess + 1;
+    ulong nap_weight= rli->mts_wq_excess_cnt + 1;
     my_sleep(nap_weight * rli->mts_coordinator_basic_nap);
     rli->mts_wq_no_underrun_cnt++;
   }
@@ -1720,7 +1720,7 @@ int slave_worker_exec_job(Slave_worker *
     }
   }
   worker->set_future_event_relay_log_pos(ev->future_event_relay_log_pos);
-  error= ev->do_apply_event_worker(worker);
+  error= ev->do_apply_event(worker);
   if (ev->ends_group() || (!worker->curr_group_seen_begin &&
                            /*
                               p-events of B/T-less {p,g} group (see
@@ -1731,7 +1731,7 @@ int slave_worker_exec_job(Slave_worker *
   {
     DBUG_PRINT("slave_worker_exec_job:",
                (" commits GAQ index %lu, last committed %lu",
-                ev->mts_group_cnt, worker->last_group_done_index));
+                ev->mts_group_idx, worker->last_group_done_index));
     worker->slave_worker_ends_group(ev, error); /* last done sets post exec */
 #ifndef DBUG_OFF
@@ -1784,17 +1784,17 @@ int slave_worker_exec_job(Slave_worker *
   if (((100 - rli->mts_worker_underrun_level) * worker->jobs.size) / 100.0 <
       worker->jobs.len)
   {
-    rli->mts_wq_excess++;
+    rli->mts_wq_excess_cnt++;
     worker->wq_overrun_set= TRUE;
     rli->mts_wq_overrun_cnt++;
   }
   else if (worker->wq_overrun_set == TRUE)
   {
-    rli->mts_wq_excess--;
+    rli->mts_wq_excess_cnt--;
     worker->wq_overrun_set= FALSE;
   }
-  DBUG_ASSERT(rli->mts_wq_excess >= 0);
+  DBUG_ASSERT(rli->mts_wq_excess_cnt >= 0);
   /* coordinator can be waiting */
   if (rli->mts_pending_jobs_size < rli->mts_pending_jobs_size_max &&
=== modified file 'sql/rpl_rli_pdb.h'
--- a/sql/rpl_rli_pdb.h 2011-07-13 15:23:57 +0000
+++ b/sql/rpl_rli_pdb.h 2011-07-16 17:08:31 +0000
@@ -6,6 +6,18 @@
 #include "rpl_rli.h"
 #include
 #include
+#include "rpl_slave.h"
+
+/**
+  Legends running throughout the module:
+
+  C - Coordinator
+  CP - checkpoint
+  W - Worker
+
+  B-event event that Begins a group (a transaction)
+  T-event event that Terminates a group (a transaction)
+*/
 /* Assigned Partition Hash (APH) entry */
 typedef struct st_db_worker_hash_entry
@@ -117,17 +129,20 @@ public:
 typedef struct st_slave_job_group
 {
-  char *group_master_log_name; // (actually redundant)
-  my_off_t group_master_log_pos; // T-event lop_pos filled by W for CheckPoint
+  char *group_master_log_name; // (actually redundant)
+  /*
+    T-event lop_pos filled by Worker for CheckPoint (CP)
+  */
+  my_off_t group_master_log_pos;
   /*
-     When RL name changes C allocates and fill in a new name of RL,
+     When relay-log name changes allocates and fill in a new name of relay-log,
      otherwise it fills in NULL.
-     C keeps track of each Worker has been notified on the updating
+     Coordinator keeps track of each Worker has been notified on the updating
      to make sure the routine runs once per change.
      W checks the value at commit and memoriezes a not-NULL.
-     Freeing unless NULL is left to C at CheckPoint.
+     Freeing unless NULL is left to Coordinator at CP.
   */
   char *group_relay_log_name; // The value is last seen relay-log
   my_off_t group_relay_log_pos; // filled by W
@@ -142,9 +157,29 @@ typedef struct st_slave_job_group
   char* checkpoint_log_name;
   my_off_t checkpoint_relay_log_pos; // T-event lop_pos filled by W for CheckPoint
   char* checkpoint_relay_log_name;
-  volatile uchar done; // Flag raised by W, read and reset by C
+  volatile uchar done; // Flag raised by W, read and reset by Coordinator
   ulong shifted; // shift the last CP bitmap at receiving a new CP
   time_t ts; // Group's timestampt to update Seconds_behind_master
+
+  /*
+    Coordinator fills the struct with defaults and options at starting of
+    a group distribution.
+  void reset(my_off_t master_pos, ulonglong seqno)
+  {
+    master_log_pos= master_pos;
+    group_master_log_pos= group_relay_log_pos= 0;
+    group_master_log_name= NULL; // todo: remove
+    group_relay_log_name= NULL;
+    worker_id= MTS_WORKER_UNDEF;
+    total_seqno= seqno;
+    checkpoint_log_name= NULL;
+    checkpoint_log_pos= 0;
+    checkpoint_relay_log_name= NULL;
+    checkpoint_relay_log_pos= 0;
+    checkpoint_seqno= (uint) -1;
+    done= 0;
+  }
 } Slave_job_group;
 /**
@@ -217,13 +252,25 @@ public:
     return (Slave_job_group*) dynamic_array_ptr(&Q, ind);
   }
+  /**
+     Assignes @c assigned_group_index to an index of enqueued item
+     and returns it.
+  */
+  ulong en_queue(void *item)
+  {
+    return assigned_group_index= circular_buffer_queue::en_queue(item);
+  }
+
 };
 class Slave_jobs_queue : public circular_buffer_queue
 {
 public:
-  /* C marks with true, W signals back at queue back to available */
+  /*
+     Coordinator marks with true, Worker signals back at queue back to
+     available
+  */
   bool overfill;
   ulonglong waited_overfill;
 };
@@ -267,10 +314,10 @@ public:
   volatile bool relay_log_change_notified; // Coord sets and resets, W can read
   volatile bool checkpoint_notified; // Coord sets and resets, W can read
   ulong bitmap_shifted; // shift the last bitmap at receiving new CP
-  bool wq_overrun_set; // W marks inself as incrementer of rli->mts_wq_excess
+  bool wq_overrun_set; // W marks inself as incrementer of rli->mts_wq_excess_cnt
   /*
-    Coordinatates of the last CheckPoint (CP) this Worker has
+    Coordinates of the last CheckPoint (CP) this Worker has
     acknowledged; part of is persisent data
   */
   char checkpoint_relay_log_name[FN_REFLEN];
=== modified file 'sql/rpl_slave.cc'
--- a/sql/rpl_slave.cc 2011-07-14 07:03:55 +0000
+++ b/sql/rpl_slave.cc 2011-07-16 17:08:31 +0000
@@ -1106,6 +1106,8 @@ static bool sql_slave_killed(THD* thd, R
   DBUG_ASSERT(rli->slave_running == 1);
   if (abort_loop || thd->killed || rli->abort_slave)
   {
+    is_parallel_warn= (rli->is_parallel_exec() &&
+                       (rli->is_mts_in_group() || thd->killed));
     /*
       Slave can execute stop being in one of two MTS or Single-Threaded mode.
       The modes define different criteria to accept the stop.
@@ -1113,9 +1115,7 @@ static bool sql_slave_killed(THD* thd, R
       Killed Coordinator thread expects the worst so it warns on
       possible consistency issue.
     */
-    if ((is_parallel_warn= (rli->is_parallel_exec() &&
-                            (rli->is_mts_in_group() || thd->killed)))
-        ||
+    if (is_parallel_warn ||
         (!rli->is_parallel_exec() &&
          thd->transaction.all.cannot_safely_rollback() && rli->is_in_group()))
     {
@@ -2822,7 +2822,7 @@ int apply_event_and_update_pos(Log_event
       Slave_job_item item= {ev}, *job_item= &item;
       Slave_worker *w= (Slave_worker *) ev->worker;
       // specially marked group typically with OVER_MAX_DBS_IN_EVENT_MTS db:s
-      bool need_sync= ev->mts_is_group_isolated();
+      bool need_sync= ev->is_mts_group_isolated();
       // all events except BEGIN-query must be marked with a non-NULL Worker
       DBUG_ASSERT(((Slave_worker*) ev->worker) == rli->last_assigned_worker);
@@ -3853,7 +3853,7 @@ pthread_handler_t handle_slave_worker(vo
                         "events processed = %lu "
                         "hungry waits = %lu "
                         "priv queue overfills = %llu ",
-                        w->id, w->events_done, w->wq_size_waits,
+                        w->id, w->events_done, w->wq_size_waits_cnt,
                         w->jobs.waited_overfill);
   mysql_cond_signal(&w->jobs_cond); // famous last goodbye
@@ -4364,7 +4364,7 @@ int slave_start_workers(Relay_log_info *
   rli->mts_pending_jobs_size= 0;
   rli->mts_pending_jobs_size_max= ::opt_mts_pending_jobs_size_max;
   rli->mts_wq_underrun_w_id= MTS_WORKER_UNDEF;
-  rli->mts_wq_excess= 0;
+  rli->mts_wq_excess_cnt= 0;
   rli->mts_wq_overrun_cnt= 0;
   rli->mts_wq_oversize= FALSE;
   rli->mts_coordinator_basic_nap= mts_coordinator_basic_nap;
@@ -4507,7 +4507,7 @@ void slave_stop_workers(Relay_log_info *
                         "waited due the total size = %lu "
                         "sleept when Workers occupied = %lu ",
                         rli->mts_events_assigned, rli->mts_wq_overrun_cnt,
-                        rli->mts_wq_overfill_cnt, rli->wq_size_waits,
+                        rli->mts_wq_overfill_cnt, rli->wq_size_waits_cnt,
                         rli->mts_wq_no_underrun_cnt);
   DBUG_ASSERT(rli->pending_jobs == 0);
@@ -6123,10 +6123,18 @@ static Log_event* next_event(Relay_log_i
           reached.
         */
-        /* shows zero while it is sleeping (and until the next event
-           is about to be executed).
-           Note, in MTS case Seconds_Behind_Master resetting follows
-           slightly different schema where reaching EOF is not enough.
+        /* shows zero while it is sleeping (and until the next event
+           is about to be executed). Note, in MTS case
+           Seconds_Behind_Master resetting follows slightly different
+           schema where reaching EOF is not enough. The status
+           parameter is updated per some number of processed group of
+           events. The number can't be greater than
+           @@global.slave_checkpoint_group and anyway SBM updating
+           rate does not exceed @@global.slave_checkpoint_period.
+           Notice that SBM is set to a new value after processing the
+           terminal event (e.g Commit) of a group. Coordinator resets
+           SBM when notices no more groups left neither to read from
+           Relay-log nor to process by Workers.
         */
         if (!rli->is_parallel_exec())
           rli->last_master_timestamp= 0;
=== modified file 'sql/share/errmsg-utf8.txt'
--- a/sql/share/errmsg-utf8.txt 2011-07-14 09:40:06 +0000
+++ b/sql/share/errmsg-utf8.txt 2011-07-16 17:08:31 +0000
@@ -6513,10 +6513,10 @@ ER_ERROR_IN_UNKNOWN_TRIGGER_BODY
   eng "Unknown trigger has an error in its body: '%-.256s'"
 ER_MTS_FEATURE_IS_NOT_SUPPORTED
-  eng "%s is not supported in Parallel Slave. %s"
+  eng "%s is not supported in multi-threaded slave mode. %s"
 ER_MTS_UPDATED_DBS_GREATER_MAX
-  eng "Modified database names number exceeds the maximum %d; the names are not written into the replication event."
+  eng "The number of modified databases exceeds the maximum %d; the database names will not be included in the replication event metadata."
 ER_MTS_CANT_PARALLEL
-  eng "Can't execute the current event group in parallel mode. Encountered event %s, relay-log name %s, position %s which prevents execution of this event group in parallel mode. Reason: %s."
+  eng "Cannot execute the current event group in the parallel mode. Encountered event %s, relay-log name %s, position %s which prevents execution of this event group in parallel mode. Reason: %s."
 ER_MTS_PARALLEL_INCONSISTENT_DATA
   eng "%s"

No bundle (reason: useless for push emails).
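
Reviewer's sketch (not part of the push): revision 3349 moves the field-by-field initialization of a job group out of Log_event::get_slave_worker() into Slave_job_group::reset(), and adds an en_queue() wrapper that records assigned_group_index. The standalone C++ below illustrates that pattern only. JobGroup, GroupQueue, WORKER_UNDEF and demo() are hypothetical stand-ins invented for this note, the field set is reduced, and the fixed-capacity vector is an assumption in place of the real circular_buffer_queue.

```cpp
#include <cassert>
#include <cstddef>
#include <vector>

// Hypothetical stand-in for MTS_WORKER_UNDEF; the real value is not shown in the diff.
const unsigned long WORKER_UNDEF = static_cast<unsigned long>(-1);

// Simplified stand-in for Slave_job_group with only a few of its fields.
struct JobGroup
{
  unsigned long long master_log_pos;
  unsigned long long total_seqno;
  unsigned long worker_id;
  const char *checkpoint_log_name;
  unsigned char done;

  // All defaults are assigned in one place instead of at the call site.
  void reset(unsigned long long master_pos, unsigned long long seqno)
  {
    master_log_pos = master_pos;
    total_seqno = seqno;
    worker_id = WORKER_UNDEF;       // no Worker assigned yet
    checkpoint_log_name = nullptr;
    done = 0;
  }
};

// Simplified stand-in for the group queue (assumption: append-only, fixed capacity).
class GroupQueue
{
public:
  unsigned long assigned_group_index;

  explicit GroupQueue(std::size_t capacity)
    : assigned_group_index(WORKER_UNDEF), capacity_(capacity) {}

  // Stores the index of the enqueued item and returns it, like the new wrapper.
  unsigned long en_queue(const JobGroup &group)
  {
    return assigned_group_index = push(group);
  }

  const JobGroup &get_job_group(unsigned long idx) const { return slots_[idx]; }

private:
  // Returns the slot index, or WORKER_UNDEF when the queue is full.
  unsigned long push(const JobGroup &group)
  {
    if (slots_.size() == capacity_)
      return WORKER_UNDEF;
    slots_.push_back(group);
    return static_cast<unsigned long>(slots_.size() - 1);
  }

  std::size_t capacity_;
  std::vector<JobGroup> slots_;
};

// Mirrors the call sequence in get_slave_worker(): reset the group, then enqueue it.
bool demo()
{
  GroupQueue gaq(2);
  JobGroup group;
  group.reset(120, 1);
  unsigned long idx = gaq.en_queue(group);
  return idx == 0 &&
         gaq.assigned_group_index == 0 &&
         gaq.get_job_group(idx).worker_id == WORKER_UNDEF &&
         gaq.get_job_group(idx).total_seqno == 1;
}
```

The point of the change, as far as the diff shows, is that the caller shrinks to a single group.reset(log_pos, rli->mts_groups_assigned) line and any new field default only has to be added in one place.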