From: Andrei Elkin Date: May 30 2011 9:38am Subject: bzr commit into mysql-next-mr-wl5569 branch (andrei.elkin:3286) WL#5569 List-Archive: http://lists.mysql.com/commits/138380 Message-Id: <201105300938.p4U9cb9A025936@mysql1000.dsl.inet.fi> MIME-Version: 1.0 Content-Type: multipart/mixed; boundary="===============1763042862==" --===============1763042862== MIME-Version: 1.0 Content-Type: text/plain; charset="us-ascii" Content-Transfer-Encoding: 7bit Content-Disposition: inline #At file:///home/andrei/MySQL/BZR/2a-23May/WL/mysql-next-mr-wl5569/ based on revid:andrei.elkin@stripped 3286 Andrei Elkin 2011-05-30 wl#5569 MTS An intermediate patch to address few issues raised by reviewers. To sum up, it's about cleanup and logics simplification of event distribution to Worker and consequent actions. Some efforts were paid to support Old Master Begin-less group of events. @ sql/log_event.cc Cleanup and logics simplification in Log_event::get_slave_worker_id(), Log_event::apply_event(). The essense is: a. to return back to apply_event_and_update_pos() event associated either with the single-threaded sql-thread rli, or one of Coord or Worker. b. while the beginning of a group and corresponding actions are left to Log_event::get_slave_worker_id(), other actions including passing the event to a Worker and the final closure of the current group is moved into apply_event_and_update_pos(). Correcting Query_log_event::at-,de-tach_temp_tables() to expect the magic "-empty string name db partition through which the applier thread receives temp tables. @ sql/log_event.h More members are added to Log_event a. to associate the event with applier. b. to provide marking a B-less group of events (old master, select sf()). @ sql/rpl_rli.h Removing rli members that aren't necessary any longer. @ sql/rpl_rli_pdb.cc cleanup, a new assert, and init of an debug-related member. @ sql/rpl_rli_pdb.h Memorizing last deleted event for debugging purpose. @ sql/rpl_slave.cc Cleanup and, moving append_item_to_jobs() invocation into apply_event_and_update_pos() as well as other actions mentioned in log_event.cc comments; changing signature of apply_event_and_update_pos() to return NULL in place of referrenced pointer in case the event is handed over to a Worker; checking of the pointer value is done in places dealing with update-pos and event's destruction. modified: sql/log_event.cc sql/log_event.h sql/rpl_rli.h sql/rpl_rli_pdb.cc sql/rpl_rli_pdb.h sql/rpl_slave.cc === modified file 'sql/log_event.cc' --- a/sql/log_event.cc 2011-05-28 10:42:40 +0000 +++ b/sql/log_event.cc 2011-05-30 09:38:24 +0000 @@ -2368,11 +2368,10 @@ bool Log_event::contains_partition_info( /** General hashing function to compute the id of an applier for the current event. - The event is marked as belonging to a Worker. At computing the id few rules apply depending on partitioning properties that the event instance can feature. - Let's call the properties. + Let's call the properties through the following legends: B - beginning of a group of events (BEGIN query_log_event) g - mini-group representative event containing the partition info @@ -2380,34 +2379,37 @@ bool Log_event::contains_partition_info( p - a mini-group internal event that *p*receeding its g-parent (int_, rand_, user_ var:s) r - a mini-group internal "regular" event that follows its g-parent - (Write, Update, Delete -rows) + (Delete, Update, Write -rows) S - sequentially applied event (may not be a part of any group). Events of this type are determined via @c mts_sequential_exec() earlier and don't cause calling this method . T - terminator of the group (XID, COMMIT, ROLLBACK) - Only `g' case requires to compute the assigned Worker id. - In `T, r' cases it's @c last_assigned_worker that is one that was - assigned at the last `g' processing. - In `B' case it's NULL to indicate the Coordinator will skip doing anything - more with the event. Its scheduling gets deffered until the following - `g' event names a Worker. + Only `g' case really computes the assigned Worker id which must + be memorized by the caller and is available through @c rli argument. + For instance DUW-rows events are mapped to a Worker previously chosen + at assigning of their Table-map parent g-event. + In `B' case the assigned Worker is NULL to indicate the Coordinator will + postpone scheduling until a following `g' event decides on a Worker. - @note `p' and g-Query-log-event is not supported yet. - - @note The function can update APH, CGAP objects. + A group can consist of multiple events still without explict B + event. This is a case of old master binlog or few corner-cases of + the current master version (todo: to fix). Such group structure is + supposed to be {{p_i},g} that is it ends with the first not p-event. + Such g-event is marked with set_mts_event_ends_group(). + @note The function can update APH, CGAP, GAQ objects. + @return a pointer to the Worker stuct or NULL. */ Slave_worker *Log_event::get_slave_worker_id(Relay_log_info *rli) { - Slave_worker *worker= NULL; Slave_job_group g, *ptr_g; bool is_b_event; + int num_dbs= 0; + Slave_worker *ret_worker= NULL; - rli->curr_event_is_parallel= TRUE; // event belongs to a Worker - /* checking partioning properties and perform corresponding actions */ // Beginning of a group designated explicitly with BEGIN @@ -2457,7 +2459,8 @@ Slave_worker *Log_event::get_slave_worke // mark the current grup as started with B-event rli->curr_group_seen_begin= TRUE; - return NULL; + + return ret_worker; } } @@ -2466,27 +2469,24 @@ Slave_worker *Log_event::get_slave_worke if (contains_partition_info()) { int i= 0; - int num_dbs= mts_number_dbs(); + num_dbs= mts_number_dbs(); List_iterator it(*mts_get_dbs(rli->info_thd->mem_root)); it++; if (num_dbs == OVER_MAX_DBS_IN_EVENT_MTS) { - // Temporary tables of Coordinator are relocated by Worker + // provide a hint - Worker with id 0 - to the following assign if (!rli->last_assigned_worker) rli->last_assigned_worker= *(Slave_worker**) dynamic_array_ptr(&rli->workers, 0); - - if (!rli->curr_group_isolated) - (void) wait_for_workers_to_finish(rli, rli->last_assigned_worker); - rli->curr_group_isolated= TRUE; + (void) wait_for_workers_to_finish(rli, rli->last_assigned_worker); } do { char **ref_cur_db= it.ref(); - if (!(rli->last_assigned_worker= + if (!(ret_worker= get_slave_worker(*ref_cur_db, rli, &mts_assigned_partitions[i], get_type_code() == QUERY_EVENT))) @@ -2496,24 +2496,23 @@ Slave_worker *Log_event::get_slave_worke { delete *(Log_event**) dynamic_array_ptr(&rli->curr_group_da, k); } - return NULL; + + return ret_worker; } DBUG_ASSERT(!strcmp(mts_assigned_partitions[i]->db, *ref_cur_db)); - DBUG_ASSERT(rli->last_assigned_worker == - mts_assigned_partitions[i]->worker); + DBUG_ASSERT(ret_worker == mts_assigned_partitions[i]->worker); DBUG_ASSERT(mts_assigned_partitions[i]->usage > 0); i++; } while (it++); - worker= rli->last_assigned_worker; if ((ptr_g= ((Slave_job_group *) dynamic_array_ptr(&rli->gaq->Q, - rli->gaq->assigned_group_index)))-> worker_id + rli->gaq->assigned_group_index)))->worker_id == (ulong) -1) { - ptr_g->worker_id= worker->id; + ptr_g->worker_id= ret_worker->id; DBUG_ASSERT(ptr_g->group_relay_log_name == NULL); } @@ -2533,9 +2532,10 @@ Slave_worker *Log_event::get_slave_worke // a mini-group internal "regular" event if (rli->last_assigned_worker) { - worker= rli->last_assigned_worker; + ret_worker= rli->last_assigned_worker; - DBUG_ASSERT(rli->curr_group_assigned_parts.elements > 0 || worker->id == 0); + DBUG_ASSERT(rli->curr_group_assigned_parts.elements > 0 || + ret_worker->id == 0); } else // int_, rand_, user_ var:s { @@ -2564,24 +2564,31 @@ Slave_worker *Log_event::get_slave_worke { DBUG_ASSERT(rli->curr_group_da.elements > 1); } - return NULL; + + DBUG_ASSERT(!ret_worker); + + return ret_worker; } } // the group terminal event (Commit, Xid or a DDL query) if (ends_group() || !rli->curr_group_seen_begin) { - uint i; - // index of GAQ that this terminal event belongs to mts_group_cnt= rli->gaq->assigned_group_index; + if (num_dbs == OVER_MAX_DBS_IN_EVENT_MTS) + set_mts_event_ends_group(); + ptr_g= (Slave_job_group *) dynamic_array_ptr(&rli->gaq->Q, rli->gaq->assigned_group_index); - // TODO: throw an error when relay-log reading starts from inside of a group!! + DBUG_ASSERT(ret_worker != NULL); + + // TODO: UNTIL option, throw an error when relay-log reading + // starts from inside of a group!! - if (!worker->relay_log_change_notified) + if (!ret_worker->relay_log_change_notified) { /* Prior this event, C rotated the relay log to drop each @@ -2599,10 +2606,10 @@ Slave_worker *Log_event::get_slave_worke DBUG_ASSERT(ptr_g->group_relay_log_name != NULL); - worker->relay_log_change_notified= TRUE; + ret_worker->relay_log_change_notified= TRUE; } - if (!worker->checkpoint_notified) + if (!ret_worker->checkpoint_notified) { // Worker to dealloc // master binlog checkpoint @@ -2619,24 +2626,13 @@ Slave_worker *Log_event::get_slave_worke strcpy(ptr_g->checkpoint_relay_log_name, rli->get_group_relay_log_name()); ptr_g->checkpoint_relay_log_pos= rli->get_group_relay_log_pos(); - worker->checkpoint_notified= TRUE; + ret_worker->checkpoint_notified= TRUE; } ptr_g->checkpoint_seqno= rli->checkpoint_seqno; rli->checkpoint_seqno++; - - DBUG_ASSERT(worker != NULL && worker == rli->last_assigned_worker); - - // CGAP cleanup - for (i= rli->curr_group_assigned_parts.elements; i > 0; i--) - delete_dynamic_element(&rli-> - curr_group_assigned_parts, i - 1); - rli->last_assigned_worker= NULL; - - // reset the B-group marker - rli->curr_group_seen_begin= FALSE; } - return worker; + return ret_worker; } // returns the next available! (TODO: incompatible to circurla_buff method!!!) @@ -2813,16 +2809,22 @@ void append_item_to_jobs(slave_job_item } /** - scheduling event execution either serially or in parallel + Scheduling event to execute in parallel or execute it directly. + In MTS case the event gets associated with either Coordinator or a + Worker. A special case of the association is NULL when the Worker + can't be decided yet. In the single threaded sequential mode the + event maps to SQL thread rli. + + @return 0 as success, otherwise a failure. */ int Log_event::apply_event(Relay_log_info const *rli) { - uint i; DBUG_ENTER("LOG_EVENT:apply_event"); Slave_worker *w= NULL; - Slave_job_item item= {NULL}, *job_item= &item; Relay_log_info *c_rli= const_cast(rli); // constless alias - bool parallel= FALSE, async_event= FALSE, seq_event= FALSE, term_event= FALSE; + bool parallel= FALSE, async_event= FALSE, seq_event= FALSE; + + worker= c_rli; if (rli->is_mts_recovery()) { @@ -2849,7 +2851,6 @@ int Log_event::apply_event(Relay_log_inf { if (parallel) { - c_rli->curr_event_is_parallel= FALSE; // mark event as belonging to Coordinator /* There are two classes of events that Coordinator executes itself. One e.g the master Rotate requires all Workers to finish up @@ -2910,51 +2911,12 @@ int Log_event::apply_event(Relay_log_inf DBUG_ASSERT(!(rli->curr_group_seen_begin && ends_group()) || rli->last_assigned_worker); - // getting Worker's id - if ((!(w= get_slave_worker_id(c_rli)) || - DBUG_EVALUATE_IF("fault_injection_get_slave_worker", 1, 0))) + worker= NULL; + c_rli->last_assigned_worker= w= get_slave_worker_id(c_rli); + if (!w || DBUG_EVALUATE_IF("fault_injection_get_slave_worker", 1, 0)) DBUG_RETURN(rli->curr_group_assigned_parts.elements == 0 ? FALSE : TRUE); - job_item->data= this; - - DBUG_PRINT("Log_event::apply_event:", ("-> job item data %p to W_%lu", job_item->data, w->id)); - - if (rli->curr_group_da.elements > 0) - { - /* - the current event sorted out which partion the current group belongs to. - It's time now to processed deferred array events. - */ - for (i= 0; i < rli->curr_group_da.elements; i++) - { - Slave_job_item da_item; - get_dynamic(&c_rli->curr_group_da, (uchar*) &da_item.data, i); - append_item_to_jobs(&da_item, w, c_rli); - } - if (rli->curr_group_da.elements > rli->curr_group_da.max_element) - { - // reallocate to less mem - - DBUG_ASSERT(rli->curr_group_da.max_element < rli->curr_group_da.elements); - - c_rli->curr_group_da.elements= rli->curr_group_da.max_element; - c_rli->curr_group_da.max_element= 0; - freeze_size(&c_rli->curr_group_da); // restores max_element - } - c_rli->curr_group_da.elements= 0; - } - - if (c_rli->curr_group_isolated) - term_event= ends_group(); - - append_item_to_jobs(job_item, w, c_rli); - - if (c_rli->curr_group_isolated && term_event) - { - // to make sure the isolated group terminates in isolation as well - (void) wait_for_workers_to_finish(rli, w); - c_rli->curr_group_isolated= FALSE; - } + worker= (Relay_log_info*) w; DBUG_RETURN(FALSE); } @@ -3145,8 +3107,15 @@ err: w->slave_worker_ends_group(ev, error); // rows_query_log_event is deleted as a part of the statement cleanup + + // todo: sync_slave_with_master fails when my_sleep(1000) is put here + if (ev && ev->get_type_code() != ROWS_QUERY_LOG_EVENT) + { + w->last_event= ev; delete ev; + } + DBUG_RETURN(error); } @@ -3768,7 +3737,7 @@ Query_log_event::Query_log_event(const c auto_increment_increment(1), auto_increment_offset(1), time_zone_len(0), lc_time_names_number(0), charset_database_number(0), table_map_for_update(0), master_data_written(0), - mts_accessed_dbs(OVER_MAX_DBS_IN_EVENT_MTS) + mts_accessed_dbs(OVER_MAX_DBS_IN_EVENT_MTS), m_mts_query_ends_group(FALSE) { ulong data_len; uint32 tmp; @@ -4241,22 +4210,17 @@ void Query_log_event::attach_temp_tables if (!mts_is_worker(thd) || !contains_partition_info()) return; + // in over max-db:s case just one special parttion is locked + int parts= ((mts_accessed_dbs == OVER_MAX_DBS_IN_EVENT_MTS) ? + 1 : mts_accessed_dbs); + DBUG_ASSERT(!thd->temporary_tables); - if (mts_accessed_dbs == OVER_MAX_DBS_IN_EVENT_MTS) - { - THD *c_thd= mts_get_coordinator_thd(); - mts_move_temp_tables_to_thd(thd, c_thd->temporary_tables); - c_thd->temporary_tables= NULL; - } - else + for (int i= 0; i < parts; i++) { - for (int i= 0; i < mts_accessed_dbs; i++) - { - mts_move_temp_tables_to_thd(thd, - mts_assigned_partitions[i]->temporary_tables); - mts_assigned_partitions[i]->temporary_tables= NULL; - } + mts_move_temp_tables_to_thd(thd, + mts_assigned_partitions[i]->temporary_tables); + mts_assigned_partitions[i]->temporary_tables= NULL; } } @@ -4272,15 +4236,8 @@ void Query_log_event::detach_temp_tables if (!mts_is_worker(thd)) return; - if (mts_accessed_dbs == OVER_MAX_DBS_IN_EVENT_MTS) - { - THD *c_thd= mts_get_coordinator_thd(); - /* back to coordinator */ - mts_move_temp_tables_to_thd(c_thd, thd->temporary_tables); - thd->temporary_tables= NULL; - return; - } - + int parts= ((mts_accessed_dbs == OVER_MAX_DBS_IN_EVENT_MTS) ? + 1 : mts_accessed_dbs); /* todo: optimize for a case of @@ -4293,7 +4250,7 @@ void Query_log_event::detach_temp_tables unmodified lists provided that the attach_ method does not destroy references to them. */ - for (int i= 0; i < mts_accessed_dbs; i++) + for (int i= 0; i < parts; i++) { mts_assigned_partitions[i]->temporary_tables= NULL; } @@ -4303,7 +4260,7 @@ void Query_log_event::detach_temp_tables int i; // find which entry to go - for (i= 0; i < mts_accessed_dbs; i++) + for (i= 0; i < parts; i++) if (strcmp(table->s->db.str, mts_accessed_db_names[i]) < 0) continue; else @@ -4317,7 +4274,7 @@ void Query_log_event::detach_temp_tables DBUG_ASSERT(!thd->temporary_tables); #ifndef DBUG_OFF - for (int i= 0; i < mts_accessed_dbs; i++) + for (int i= 0; i < parts; i++) { DBUG_ASSERT(!mts_assigned_partitions[i]->temporary_tables || !mts_assigned_partitions[i]->temporary_tables->prev); === modified file 'sql/log_event.h' --- a/sql/log_event.h 2011-05-27 21:29:14 +0000 +++ b/sql/log_event.h 2011-05-30 09:38:24 +0000 @@ -1038,6 +1038,10 @@ public: the event execution. The indexed data represent the Worker progress status. */ ulong mts_group_cnt; + /** + MTS: associating the event with either an assigned Worker or Coordinator. + */ + Relay_log_info *worker; /* a copy of the main rli value stored into event to pass to MTS worker rli */ ulonglong future_event_relay_log_pos; @@ -1113,6 +1117,9 @@ public: */ virtual uint8 mts_number_dbs() { return 1; } + virtual void set_mts_event_ends_group() { DBUG_ASSERT(0); } + virtual bool get_mts_event_ends_group() { DBUG_ASSERT(0); } + #else Log_event() : temp_buf(0) {} /* avoid having to link mysqlbinlog against libpthread */ @@ -1886,6 +1893,13 @@ public: uchar mts_accessed_dbs; char mts_accessed_db_names[MAX_DBS_IN_EVENT_MTS][NAME_LEN]; + /* + Event can be indentified as a group terminator and such fact + is memoried by the function. + */ + virtual void set_mts_event_ends_group() { m_mts_query_ends_group= TRUE; } + virtual bool get_mts_event_ends_group() { return m_mts_query_ends_group; } + #ifdef MYSQL_SERVER Query_log_event(THD* thd_arg, const char* query_arg, ulong query_length, @@ -1988,13 +2002,16 @@ public: /* !!! Public in this pat its self-group. */ bool starts_group() { return !strncmp(query, "BEGIN", q_len); } - bool ends_group() + virtual bool ends_group() { return !strncmp(query, "COMMIT", q_len) || (!strncasecmp(query, STRING_WITH_LEN("ROLLBACK")) && strncasecmp(query, STRING_WITH_LEN("ROLLBACK TO "))); } +private: + + bool m_mts_query_ends_group; }; @@ -2662,7 +2679,7 @@ class Xid_log_event: public Log_event bool write(IO_CACHE* file); #endif bool is_valid() const { return 1; } - bool ends_group() { return TRUE; } + virtual bool ends_group() { return TRUE; } private: #if defined(MYSQL_SERVER) && defined(HAVE_REPLICATION) virtual int do_apply_event(Relay_log_info const *rli); === modified file 'sql/rpl_rli.h' --- a/sql/rpl_rli.h 2011-05-28 10:42:40 +0000 +++ b/sql/rpl_rli.h 2011-05-30 09:38:24 +0000 @@ -445,7 +445,6 @@ public: DYNAMIC_ARRAY curr_group_assigned_parts; // CGAP DYNAMIC_ARRAY curr_group_da; // deferred array to hold partition-info-free events bool curr_group_seen_begin; // current group started with B-event or not - bool curr_group_isolated; // Trans is exec:d by Worker but in exclusive env volatile ulong mts_wqs_underrun_w_id; // Id of a Worker whose queue is getting empty volatile long mts_wqs_overrun; // W to incr and decr ulong mts_wqs_underrun_cnt; // Coord goes to sleep when senses Workers are content @@ -454,8 +453,6 @@ public: ulong mts_coordinator_basic_nap; // C sleeps to avoid WQs overrun Slave_worker* this_worker; // used by w_rli. The cental rli has it as NULL. ulonglong mts_total_groups; // total event groups distributed in current session - - bool curr_event_is_parallel; // if FALSE the current event is processed by C ulong opt_slave_parallel_workers; // auxiliary cache for ::opt_slave_parallel_workers ulong slave_parallel_workers; // the one slave session time number of workers ulong recovery_parallel_workers; // number of workers while recovering. === modified file 'sql/rpl_rli_pdb.cc' --- a/sql/rpl_rli_pdb.cc 2011-05-28 08:19:43 +0000 +++ b/sql/rpl_rli_pdb.cc 2011-05-30 09:38:24 +0000 @@ -31,7 +31,7 @@ Slave_worker::Slave_worker(const char* t Relay_log_info *rli) : Relay_log_info(FALSE), c_rli(rli), checkpoint_relay_log_pos(0), checkpoint_master_log_pos(0), checkpoint_seqno(0), - inited_group_execed(0), running_status(FALSE) + inited_group_execed(0), running_status(FALSE), last_event(NULL) { checkpoint_relay_log_name[0]= 0; checkpoint_master_log_name[0]= 0; @@ -462,6 +462,7 @@ static void move_temp_tables_to_entry(TH @note modifies CGAP, APH and unlinks @c dbname -keyd temp tables from C's thd->temporary_tables to move them into the entry record. + Caller can opt for a Worker via setting rli->last_assigned_worker. @return the pointer to a Worker struct */ @@ -649,6 +650,9 @@ Slave_worker *get_slave_worker(const cha #ifndef DBUG_OFF else { + // all entries must have been emptied from temps by the caller + DBUG_ASSERT(entry->db_len != 0); + for (TABLE *table= thd->temporary_tables; table; table= table->next) { DBUG_ASSERT(0 != strcmp(table->s->db.str, entry->db)); === modified file 'sql/rpl_rli_pdb.h' --- a/sql/rpl_rli_pdb.h 2011-05-28 10:42:40 +0000 +++ b/sql/rpl_rli_pdb.h 2011-05-30 09:38:24 +0000 @@ -280,6 +280,7 @@ public: MY_BITMAP group_execed; bool inited_group_execed; volatile bool running_status; // TRUE when Worker is read-exec loop + Log_event *last_event; int init_info(); void end_info(); === modified file 'sql/rpl_slave.cc' --- a/sql/rpl_slave.cc 2011-05-28 10:42:40 +0000 +++ b/sql/rpl_slave.cc 2011-05-30 09:38:24 +0000 @@ -80,6 +80,9 @@ ulonglong relay_log_space_limit = 0; const char *relay_log_index= 0; const char *relay_log_basename= 0; +void append_item_to_jobs(slave_job_item *job_item, + Slave_worker *w, Relay_log_info *rli); + /* When slave thread exits, we need to remember the temporary tables so we can re-use them on slave start. @@ -2748,7 +2751,7 @@ int ulong_cmp(ulong *id1, ulong *id2) - Reports errors as needed. - @param ev The event to apply. + @param ptr_ev a pointer to a reference to the event to apply. @param thd The client thread that executes the event (i.e., the slave sql thread if called from a replication slave, or the client @@ -2758,6 +2761,9 @@ int ulong_cmp(ulong *id1, ulong *id2) a replication slave, or the client's thd->rli_fake if called to execute a BINLOG statement). + @note MTS can store NULL to @c ptr_ev location to indicate + the event is taken over by a Worker. + @retval 0 OK. @retval 1 Error calling ev->apply_event(). @@ -2765,10 +2771,11 @@ int ulong_cmp(ulong *id1, ulong *id2) @retval 2 No error calling ev->apply_event(), but error calling ev->update_pos(). */ -int apply_event_and_update_pos(Log_event* ev, THD* thd, Relay_log_info* rli) +int apply_event_and_update_pos(Log_event** ptr_ev, THD* thd, Relay_log_info* rli) { int exec_res= 0; bool skip_event= FALSE; + Log_event *ev= *ptr_ev; DBUG_ENTER("apply_event_and_update_pos"); @@ -2823,6 +2830,75 @@ int apply_event_and_update_pos(Log_event if (sql_delay_event(ev, thd, rli)) DBUG_RETURN(0); exec_res= ev->apply_event(rli); + + if (!exec_res && (ev->worker != rli)) + { + if (ev->worker) + { + Slave_job_item item= {ev}, *job_item= &item; + Slave_worker *w= (Slave_worker *) ev->worker; + bool need_sync= + (ev->mts_number_dbs() == OVER_MAX_DBS_IN_EVENT_MTS) && + ev->get_mts_event_ends_group(); + + DBUG_ASSERT(!(ev->ends_group() || !rli->curr_group_seen_begin) || + ((Slave_worker*) ev->worker) == rli->last_assigned_worker); + + DBUG_PRINT("Log_event::apply_event:", ("-> job item data %p to W_%lu", job_item->data, w->id)); + + // Reset mts in-group state + if (ev->ends_group() || !rli->curr_group_seen_begin) + { + // CGAP cleanup + for (uint i= rli->curr_group_assigned_parts.elements; i > 0; i--) + delete_dynamic_element(&rli-> + curr_group_assigned_parts, i - 1); + // reset the B-group marker + rli->curr_group_seen_begin= FALSE; + rli->last_assigned_worker= NULL; + } + + if (rli->curr_group_da.elements > 0) + { + /* + the current event sorted out which partion the current group belongs to. + It's time now to processed deferred array events. + */ + for (uint i= 0; i < rli->curr_group_da.elements; i++) + { + Slave_job_item da_item; + get_dynamic(&rli->curr_group_da, (uchar*) &da_item.data, i); + append_item_to_jobs(&da_item, w, rli); + } + if (rli->curr_group_da.elements > rli->curr_group_da.max_element) + { + // reallocate to less mem + + DBUG_ASSERT(rli->curr_group_da.max_element < rli->curr_group_da.elements); + + rli->curr_group_da.elements= rli->curr_group_da.max_element; + rli->curr_group_da.max_element= 0; + freeze_size(&rli->curr_group_da); // restores max_element + } + rli->curr_group_da.elements= 0; + } + + //job_item->data= ev; + /* Notice `ev' instance can be destoyed after `append()' */ + append_item_to_jobs(job_item, w, rli); + + if (need_sync) + { + /* + combination of over-max db:s and end of the current group + forces to wait for the group completion by the assigned worker. + */ + (void) wait_for_workers_to_finish(rli, w); + } + + } + *ptr_ev= NULL; // announcing the event is passed to w-worker + } } else mysql_mutex_unlock(&rli->data_lock); @@ -2842,7 +2918,9 @@ int apply_event_and_update_pos(Log_event See sql/rpl_rli.h for further details. */ int error= 0; - if (skip_event || !rli->is_parallel_exec() || !rli->curr_event_is_parallel) + if (*ptr_ev && + (!(ev->get_type_code() == XID_EVENT && rli->is_transactional()) || + skip_event)) { #ifndef DBUG_OFF /* @@ -2958,7 +3036,7 @@ static int exec_relay_log_event(THD* thd */ mysql_mutex_lock(&rli->data_lock); - Log_event * ev = next_event(rli); + Log_event *ev = next_event(rli), **ptr_ev= &ev; DBUG_ASSERT(rli->info_thd==thd); @@ -3032,12 +3110,13 @@ static int exec_relay_log_event(THD* thd };); } - exec_res= apply_event_and_update_pos(ev, thd, rli); + /* ptr_ev can change to NULL indicating MTS coorinator passed to a Worker */ + exec_res= apply_event_and_update_pos(ptr_ev, thd, rli); - if ((!rli->is_parallel_exec() || !rli->curr_event_is_parallel)) + if (*ptr_ev) { - DBUG_ASSERT(!rli->is_parallel_exec() || !rli->curr_event_is_parallel || - ev->shall_skip(rli) != Log_event::EVENT_SKIP_NOT); + DBUG_ASSERT(*ptr_ev == ev); // event remains to belong to Coordinator + /* Format_description_log_event should not be deleted because it will be used to read info about the relay log's format; it will be deleted when @@ -4176,8 +4255,6 @@ int slave_start_workers(Relay_log_info * rli->mts_worker_underrun_level= ::opt_mts_worker_underrun_level; rli->mts_total_groups= 0; rli->curr_group_seen_begin= FALSE; - rli->curr_event_is_parallel= FALSE; - rli->curr_group_isolated= FALSE; rli->checkpoint_seqno= 0; /* dyn memory to consume by Coordinator per event --===============1763042862== MIME-Version: 1.0 Content-Type: text/bzr-bundle; charset="us-ascii"; name="bzr/andrei.elkin@stripped" Content-Transfer-Encoding: 7bit Content-Disposition: inline # Bazaar merge directive format 2 (Bazaar 0.90) # revision_id: andrei.elkin@stripped # target_branch: file:///home/andrei/MySQL/BZR/2a-23May/WL/mysql-next-\ # mr-wl5569/ # testament_sha1: 09f6158186c2664e439fbe9af0da5dd2fb51c482 # timestamp: 2011-05-30 12:38:37 +0300 # source_branch: file:///home/andrei/MySQL/BZR/2a-23May/mysql-trunk/ # base_revision_id: andrei.elkin@stripped\ # ofiubd6s1y8b9m6e # # Begin bundle IyBCYXphYXIgcmV2aXNpb24gYnVuZGxlIHY0CiMKQlpoOTFBWSZTWUQcxNYADAn/gHdRQgT7//// /+f/yv////5gGE1r5nmvRe8B6BtzufW+189706vXuZVJS+xs276NF8jW2p27qPXb3Xm7e6Wxq8td Ft9waKvY1XPcAO2Vr3higmiZNMnpJ6Cnqfqm0aKfip7UyelPap6npNH6oDQaBoeppoD1A00TTQIB CaTJqanptTU9MjSGhoAAAAAA0aBpoEIqZP1TxRoeoPU9I9TRtTQAAAADQAAACTUkGiJiNTU2JHhT 0jRNkNNT1PU0xBkDQAGgGgYRSagmTQE00p5Mp6TaeqYMQTQ2o9TQybUaaGgAaDTagkSEAJoAmk9A Rqn6BMmSaYm0QeU9IDTQDQBkvgAfYiCCP+GrP7HtOrTytgkimQGlzPjWsa9UNe2W0cq6VxMnxPLI 2yBTPRiRiwh3Yh2CoXC2YMwVIa8Lv7+qcVfgPXkswRvjBL4vXm8u7BhQzpL23vBqt45nCqVk1MjW GhAkKA/FZQHXB88kM0ZI/8vhHAqkrS/xoM3uDt3V1z3SWSztGtO+AVjeNFnEJ6C3PzN/SPMsWX0T Mv+3m/n2TUC/oFfZgHTw2TiZsjsyc11FVkot5VFxips4E3i7KeWPN3133zr3XrDvqkUBfQ1reDSh oxYr2A8IUmkYMWxVQcgUAtyoE1rcNSq3N5mUYk9KUaQJEG6yoHgTkoik5wrzetKUkzHd3MLbZej/ N5ZckCIE8SB2kJCaKwkIiAwkAVOLFqKrQ+ZloxkcGUghM+w7jzez/zxdv8K8pxh4KKEfK7A3QWMx rom/ozNJ56RfVrY/8z5d9p2jb9gvDvFUJtgsgCgCwFIu/2zduLIFtpJOPj7u7x95z0x6tu9q1cCt 9mNjFTIgtPGcUCB3km/QXWVNTombZMULNnNsryJk3VNW7fGjBvnlYtlVZ3tMB0QvWdWyQ0Zlehop yaNQNLljXS+Rgauml60wDVNc6vBwxa1RZky6ujZ1u2SHWKO9pNEWt1zQJqbpZgyzMioGocM2Pg8M gtlVhImDNuyg+KdaJtWPjmdIucaW0+MCiZASorDigzzWMe6IqE9pA3tW6narAM0xY63n64pLhTmh HmIpRYrZ6+vJjdFZ21n8HRi1u2b2sN4fXd7f9q30VwzAgz0xW/kxwcjnvmi9ajfzb5ZDkaXyv2Py lKIIaJg8csgHC5xVRLVqpbnav1LVp2Y9YnMdn78T9WH1inNtUHsB+bSNpZQVdQV7hKZfNGfl5wQd ++jgc/LZh3ESnk3Dduppq3U0oAJyUVSE/5aMpi09xsosfzTWghwrBe/2hCQjIH0zxzZeL/uOAdwz 9Y6WSwfsrOVnlkJHuMGXkMbfLl8TP3LEaeqzHI42cZ99xos0ytm9VCJ+GUO11bO05HTz1Za/hFbd 0VbGP52YVLm2wZmOtJjOm1KeVqHOnK1VnQG1q3e2NQm3k/QBUTrlpe4y4bWO1vcXv9XvG+AZxe+M 3HVgMdxOnfzRNMtK9UzXkPBfjLws23IVOvHEchAb0Ltz8k+irfoJ+DuOq9AEb5Om+Why3huvlVQr nOQ4GqrtWRp3Xr33mz+FxlPNZZZZw+8sjYYb3vkeJePZpnjxrM1l7FsPJt5L6apuE5jkB2aeVVGE jfPlfd2kVWa6YlkhIsgxuvsKcvUkyPRk+suKAQUN+9b9+rMvebjLUhtBmiDLh6NXY/ky+GGOQoql CBsQcf+U84MKNNeDl7H6+sO4+f22kMH/DxVa2XyUGvIggKiZMe0oZ7KFpeYUP4jAkyMdKNoHfVEm QqLAimcNAMVYC61ogZ9YbvZ9vqR+1m1tGXHTnHZwt0VFmDBhUhdQqDF1S89W+3NYIlpaJuCYO8ox N1axgc19a6V0ONmuA68KAW78qBma+xpyhE1FS5mEZCKSOrHiC7a29tjvz4Y5becabPHmpdPSKnzz prsk1Fz8Io444izPZ0vE7GGhRow3IM9kfNZq6dBKBaePQTBVqiSWNaP9DCli6foOwyMOvzg0yDPu zC27tc7D1Avvwvm3J0FLgARl8vhdMFPOEnY5dT07dlrN/dIPwcORKDE4ZWNC42e/5F48cC1wqUDU hOvTOLOQhMnIGUUmNA2A7SDZtHZ6mta3yuK9T2uT0DGyoGa040kEH3/c7O/w7ABVkPhbJKSBST8C B1XqZIHkdGBKcJRXFT9bGTSj9c6VIHWHwECwALsMCoViCpBCBRCgtIpS/MmkZFy7rCJjrgC3FN+M hrQa2w3i0vbCwtwLr6xS5DEtkFd+J0QBxcZh3aBj0C7wI9tgiJcoxRHOcSXbQwVea+0TNF9+yxQJ PkHCGAOIDBElVAg6MYsi2mUgHilJB5BVqUpWUuW1ayZFFystOlufUXjfjHU4O1wAsBPWichwns5Z QSIqiZwMaRE+KXgTTqrR+DntA1j1MIwgtuQQWk76jfrFkUN4ZmwWZrOEgVshVHAuHpRNBpqoA6ED fZcyG0WUjIuobZdy8oYOZUWKSEA8vJWvdfdpu6ZZTWNiakzUAV7aRoyRmNSALgVI+KpDHZiWgYoJ FYCwsykOGFZMjEy5porYasYvqAgFRhm82iTwOFKILCwzHlaGU0G463aYGQCg8g9LgOYCIM430DLC gyUUEzec2aZ2lNZAgAQjlozQDnJAOgNJ7c17QQ9gRlhLg1WjNgpK00YwcLaKB0gAAr2DbgXX5m6y y6qYyQtpBdqVsC1reLiqhcy1Iuwt0X6CydJ6q5GYBRN0cJFcwKySi3I2Qq0mTEwhRH4ZLyE5zdKp zGTDII6kg1qWZKbu4EhGalNxZXkUg7BbUtORCsMDwK3q0mn4haOC0GjMG3psUHVW4IPEouZ6UlDk FKQoZM1oEINYdAFkqIDCNPIWF1Ba8cD+fZa0Gn4FTT4S7M5o4tGsx2ElmPMdSByLEi+yhzHDaWsZ mTO9epXLxq+/fk4eUs5AcAybteu4ShmlakdhqdBGgYN8YSrEpsnJQ4oGCO4kuVRCyy55uJmQmZDQ s65zasDrYAbCKmGlZGJ8KXIOY2F6TFjoNDyxjhfsc9VhZo1xGpk7EAVxK8CB5lGMk9IRaYHAuxDS WuVRgSfDuwJHRUdlFgdO9HgWxcEZiAu4R+fZ8/HTYwAIaoYaC7E4Fhb0rmCpdoBuji9m4nxtiqlU ozlpMolXpiicSHpQKiVBEG6Ky5o0OAwABDGwZQ2A0ZqxNVGNg4Vspht2e44D5CGlIc4gajwRKCBU C3j5uOT7Z0VTkowJxJb60zZ5sgFJnINSA4qEIWgX1dyRMACrqgHPn4SGYxCAUvh5DPlBxjGgVcxQ XGQ7pJePof+vp2nYdluvMUkYRjse2TEZqUjYcBbrKiLpgFRJCgBgi0l8QeESx3FKvfazygDXmOgg HW05+2rgniKpgHYx6dppNVReRRskKLENFlh8tGYOgcIVxRT2onwolPTjFhLKX3vxvodUHoYfDOsy KUA4Gyu608cJUzsuxnS0yu7RckaBXRIIbbEXoPF6jyMJKHCpdALQcQjAfQPr9RMfUJ6/8Tqdqzpg kM8CJSXpIAuce+AYQ/0zkfDwDwviQ4Nd1s/vh2ERCQr2xkSEwX0Ysl1xKf6ReD6LfZdT2USxz98x /tQS8H81sTJr4pL0gIT1U3TqYX0SpLoGpI389Aqgwb41ztXCWhCS246VCZhtEnEJ8yU0710/3Mtc z7tw2BbKTR1iIRFGPCoLwVAVNTzS5I25EMacjyv3bBOBIvxpj20yprSy9OQYfeaD2nzGyoHj5TvE NC2v0nihlfpC1S4KSXSDMJNnnpUOC2dBUYslatFchC4DFVCsYC3GE9CPmYx2BSpXA0E70vfhalmF UNyLRcRExSDgKfNBSF9suTJJgY0Gu7UzsN78mptBNLkxpBWiqJSKCtTgewTBIG76cjA7yhFZ7yKY CUjWaz3Hu+Hz0ofjVx7veT9/1F5prXOafqLD7KzEuMDo7cEcMD40UJ9GHOIXN4LQ0r85BAaDRKiV aDou99xKfvVix/WHX4mQRlLHXaM+ihjOtBbXWDIChVKmoetwodB7iwNP4qCy3BKaTo+SeeAuIFTy 8buZUBgDonCM2I44k149YFCQvt2Aok4dhD5dgWdtO+3thnu2W8VLR1JLbHImeVkXqVcXDhhcIWuJ dEr8fJk2aignGTuGwyl4BJ9h2yZIkEheVd51nwd5eliaAHkWF1ZtuI70pI3UjoYdaZzOb7NWWteN lOKJ7z1dLbEQQQHddZwcrcLuqnySSDs+4nSSp2wg82PEzGdJ6S26DpLr3mfTmSrCBBqqNbG3+5Ct NJC6M0crl2GFzo7CuMpJ8r+Vzr7CLguQYB1zmg+5fZRHkGeC2Df0SVishCe8J3OMiNZ7cbrbk3RH lrshdp1OlyGOMHHpilSQT/2lA1hgZ+MovY81gphYRQVliq6amscTAoIbYVC8DnMm/JBCiTO0VBeU 06Tlz8cPMYLvGG8fGom8Duk5YcdgHeaGWeQkjJQICRi5As9Z9pvOjmPQgkxD6pWCNi1FpzraTXFb jSus/xxMkU2OyudnZFVVJEq9sLUliFcuqsxHJIWDQF07TSO3rOqIIOmJOUum6k0ehqjR6OscIOtJ olc3prdy0zczrarvANKBnUF5SsQqj31pb3fbRxWOIvtyRIANhZnh4DoF5pUHQWXNGSmHh8hmdJTS OscWFUDTfnQDpcFJHFKmBl1JuRlguTWSPfSXXISEJDP/EmBP+FE+vP3O/VV2C5xtIWkRPIiV4eSU LAUwQIhITEIHFAN22hEadkA+d9rVbQbbKwVpfbMqPWy6iQsM61jZbj1OffFLQ+cNHiF5APpvIED3 VjHkBAvMM4JQ30PBr56YYI2oTtKEM5C5FEpeAZNd/ZB92poqxgoFmKVVQgiAUS3PhhL/bq0RRhkB tvN29IcvBZVm17VZbJ1xJSKMCGLnnAad1cjR2WSAKDRbw7saiDVWUgYjoK8kG/edJ6PTN43AoW7W RklzRCiarKwuPxePMWFEmIBoFdswwgdX1IfedqeufUhYt6KKyagITqwJ2JSSaCBCXKf11eE3NX6e m64LTeR4ub1lyEwDb5gW8x8B+IdovkO49Wuhal6D0nxEyZWbTz5SoMl65Gw+DSs9saL8o8/CMDau 8RuLGf3XIPhd8a3gh2ENxzUMenjdDXWadq2TqYK6IIhDvMKHn8SpUkJ+r4VF4AzB42lV2JKTvCy4 tDzXQnUEOYlye6izbIBj6pw+Mlp8no2+9OuKK01FJ+F7hCFeiwa9FKiwXLRJKaKInkgGIXXW3sE+ 0wlsb7ITbXX3GELmviRWBw4isYWYhIYhRkRhIksZq8EB0siYX2xr5aJ3GdvulFyK9yq5qy/wZdXF dMpqRPgpHo6oCbEtZy2FNOjTAZSZLfx7Lmg8kxSO1rW4xJkBO/zPjateYd2FDXVMasNFjS4DZAhV 8+PO9jK6kD3tdUBvpgZVy3tj3MnzwUKpZmZcLZxBowirJVG9Y8DFZcdQcLiEW9PPI6vAgCkESkiB HEzp022SAyk4EbERoyxtECA+m9BSkQcZcCbS+RkMLUGSeYdGaVtIwSlzuz5VivA5eDsua15t+fLN QTnfE9Z2nr0z1jmoKvG6VG9CVWtly7ACqt2CkQ1D3OJMe+PyCVAlagpJcn7kA0Tl6yPGsbk+cUFZ Un6tx7WmsnueKmuJBSED9zbtuyMyQluhWo4FKK/Yt4CUI8ZvPfUIWhyI8cC1qqO3LA68syXskuqI UTRMxb2WJrK5YgGNH99WVeW2vV1/KoAvxBu2MHJNAmvSlEA7TeFgXmonKnrS1HqDHITPR4US+ZMQ ySkOvAcC2lpbXzOsbyg3XOSm7JdBjW3GSYxhnUtAW9w19/ijFKPDKQNd4nj24YN45L7IHLOTIJwo LbZR4hhCuAXAZXkhg0BiSU4qKoQhAjJRTrJ5S0mtBmZD0YpQFkW0CgqSoZ7LEJhALBe8pJOa1ttc XSEmCdbpLNFc8y7NXEnkd19f0HOGdrk9M3E5JqhpwqylRYopBgkRJJEibGhBZIcNOjaXhkD64eyi ST5WFC9hPOWk9UDVDw+h5eiFX8lzG5A6ZMg9yGkMbGL13I70UAN6Ve3NVAHx3jxQJTzoc5dgTZxt Tbq3ypMjuk88K2TM47iVGmj+jYthRR2kJUDaW5oW/O+Gyen9rFBASRVRkWSZTbyOnVhWkYHx+eVx 9qcs3fssHNFAFt23V6hhQgdVqgw8SF2MqfrYLuatnqEHWmAYEDcn0sxMg6nzJEBPIRkKJj+5kwIZ Jee44cBcD5Rc62oDEzussIGg9hRXbqFDvyFdKFpAFRgC3rQig19bF3BxS1sPB7grjcZbgcVxtP5u yp6ZJ3x6STbOcmR4Q40RIgOxpGCgiooryNKRGBSQyQskJ0hOggVC0IHNlObLAy7gCjdEL9NXpKqz 4Iahfa3t0+RIXPL4x4X2EMGktOLpOyOc/Uc/3zlKAnyl4ZhHtaOnt5rSds4cvTy81L72tjJUJI3D fvqXbHztUUQe36pSI1k/rmiK4YBcsjQLkgl2C0afAWSOPQvmAJYrrPoZ9FL1VS1VPJIHRu4FoVUn dVKYCKUAJYJuRih4chkDCFfXQoyP/JvwyUFMKTcr8/j0IZaqBLqxZL1C7Al3e8i19WyUrydGGBJo oXEtCIzwVWmeCmPLL9q915Rw01iX3EUuYVrVgC0pVS4DbOc80spVEMXFVylFT3SrsJruZaca0rhW 21MQMbL60E6JfbiBlK5PtYtvnW9Yox5C5POIiIiIiYBM1Xi3yoKYlsGJiFUAWgGrasaPZOJIK9hc ZzRF6B0LbO+U1gyp1SVhZPouKdUCo54wSOmMnXlDcumJNE5wKpoyrpUzGiudUQrGVTj8zSvujk1R guK9KgFRiwFSEoqZaSFsolBQFCEVBqNTN0Win9SZc9JmQFrOlO+UJN5utILsD0EYbmaEzVE8WC02 gXCci7lAl42l52AU1DG42NtHaK2mD46iUgedfOV8dUats5KbLsowoF7Ca11B3KdMEuTAhpBtSfPs JwkMYm0JlVlRJu2d1qXZRQjFiWIlKAVOWJWu6UI2N9+bzOkew7Q4Lm2RzmnOXnhF/WrrOKrLTp17 696M7KjHhSY00kOISQzYKsNRm0G3gkiK8D8sFHQjTkLzC4gjgJpatHCikgvgSK088KLpYSQtt2ON morgigwhZwScDuAKVNP009TXte0TkxVblCXBOxcYA4vt8RLvSIxQMNahZsiJTFMz1g2c9E/DaHsS pzqR4RNJZ9PNpgtjmmiJOY2vrSgIAkQw2wU10nQOICjyGW4G9UYk1o1lmiU3xl82s7g4HzrwBQAd DJmQo33g1P+LuSKcKEgiDmJrAA== --===============1763042862==--