From: Rohit Kalhans Date: January 18 2013 1:33pm Subject: bzr push into mysql-trunk-wl6314 branch (rohit.kalhans:5366 to 5368) WL#6314 List-Archive: http://lists.mysql.com/commits/145617 Message-Id: <201301181333.r0IDXopC003058@acsmt356.oracle.com> MIME-Version: 1.0 Content-Type: text/plain; charset="us-ascii" Content-Transfer-Encoding: 7bit 5368 Rohit Kalhans 2013-01-18 WL#6314: Intra-schema multi-threaded slave - Implemented group id assignment on the slave side - Associated BGC based group with workers threads. modified: sql/binlog.cc sql/handler.cc sql/log_event.cc sql/log_event.h 5367 Rohit Kalhans 2013-01-18 WL#6314: Intra-schema multi-threaded slave - Implemented state clock for prepare & commit sequencing - Added new fields in status var section of Query log events to store the prepare and commit logical timestamp. - update to these timestamps will be done in do_write_cache(). modified: sql/binlog.cc sql/binlog.h sql/handler.cc sql/log_event.cc sql/log_event.h sql/rpl_rli.cc sql/rpl_rli.h sql/rpl_rli_pdb.cc sql/rpl_rli_pdb.h sql/sql_class.cc sql/sql_class.h 5366 Rohit Kalhans 2013-01-14 WL#6314: Intra-schema multi-threaded slave Updated the branch name. Added variable to choose the type of mts @ .bzr-mysql/default.conf updated tree name added: mysql-test/suite/sys_vars/r/mts_parallel_type_basic.result mysql-test/suite/sys_vars/t/mts_parallel_type_basic.test modified: .bzr-mysql/default.conf sql/log.h sql/mysqld.cc sql/mysqld.h sql/sys_vars.cc === modified file 'sql/binlog.cc' --- a/sql/binlog.cc revid:rohit.kalhans@stripped +++ b/sql/binlog.cc revid:rohit.kalhans@stripped @@ -1200,10 +1200,9 @@ binlog_trx_cache_data::truncate(THD *thd static int binlog_prepare(handlerton *hton, THD *thd, bool all) { /* - do nothing. just pretend we can do 2pc, so that MySQL won't - switch to 1pc. - real work will be done in MYSQL_BIN_LOG::commit() + switch to 1pc. Real work will be done in MYSQL_BIN_LOG::commit() + We will however step the prepare clock here. 
*/ return 0; } @@ -4998,7 +4997,7 @@ uint MYSQL_BIN_LOG::next_file_id() events prior to fill in the binlog cache. */ -int MYSQL_BIN_LOG::do_write_cache(IO_CACHE *cache) +int MYSQL_BIN_LOG::do_write_cache(THD* thd, IO_CACHE *cache) { DBUG_ENTER("MYSQL_BIN_LOG::do_write_cache(IO_CACHE *)"); @@ -5022,6 +5021,7 @@ int MYSQL_BIN_LOG::do_write_cache(IO_CAC ha_checksum crc= 0, crc_0= 0; // assignments to keep compiler happy my_bool do_checksum= (binlog_checksum_options != BINLOG_CHECKSUM_ALG_OFF); uchar buf[BINLOG_CHECKSUM_LEN]; + bool pc_fixed= false; // while there is just one alg the following must hold: DBUG_ASSERT(!do_checksum || @@ -5173,6 +5173,18 @@ int MYSQL_BIN_LOG::do_write_cache(IO_CAC uint event_len= uint4korr(ev + EVENT_LEN_OFFSET); // netto len uchar *log_pos= ev + LOG_POS_OFFSET; + /* fix prepare & commit (pc) timestamp */ + if (!pc_fixed) + { + uchar* pc_ptr= ev + QUERY_HEADER_LEN + thd->prepare_commit_offset; + // fix prepare ts + int8store(pc_ptr, thd->prepare_seq_no); + pc_ptr+= 8; + //fix commit ts + int8store(pc_ptr, thd->commit_seq_no); + pc_fixed= true; + } + /* fix end_log_pos */ val= uint4korr(log_pos) + group + (end_log_pos_inc += (do_checksum ? 
BINLOG_CHECKSUM_LEN : 0)); @@ -5215,6 +5227,7 @@ int MYSQL_BIN_LOG::do_write_cache(IO_CAC hdr_offs -= length; } + /* Write the entire buf to the binary log file */ if (!do_checksum) if (my_b_write(&log_file, cache->read_pos, length)) @@ -5342,7 +5355,7 @@ bool MYSQL_BIN_LOG::write_cache(THD *thd { DBUG_EXECUTE_IF("crash_before_writing_xid", { - if ((write_error= do_write_cache(cache))) + if ((write_error= do_write_cache(thd, cache))) DBUG_PRINT("info", ("error writing binlog cache: %d", write_error)); flush_and_sync(true); @@ -5350,7 +5363,7 @@ bool MYSQL_BIN_LOG::write_cache(THD *thd DBUG_SUICIDE(); }); - if ((write_error= do_write_cache(cache))) + if ((write_error= do_write_cache(thd, cache))) goto err; if (incident && write_incident(thd, false/*need_lock_log=false*/, @@ -6744,7 +6757,9 @@ static int binlog_start_trans_and_stmt(T Query_log_event qinfo(thd, STRING_WITH_LEN("BEGIN"), is_transactional, FALSE, TRUE, 0, TRUE); if (cache_data->write_event(thd, &qinfo)) + { DBUG_RETURN(1); + } } DBUG_RETURN(0); @@ -8214,6 +8229,38 @@ void THD::issue_unsafe_warnings() } /** +SYNOPSIS: + Atomically steps state clock. + @parms: NONE + @ret_val: current timestamp + */ +int64 +Logical_clock_state::step_clock() +{ + int64 retval; + DBUG_ENTER("Logical_clock_state::step_clock"); + my_atomic_rwlock_wrlock(&state_LOCK); + (void) my_atomic_add64(&state, step); + retval= my_atomic_load64(&state); + my_atomic_rwlock_wrunlock(&state_LOCK); + DBUG_RETURN(retval); +} + +/** +SYNOPSIS: + Atomically fetch the current state + */ +int64 +Logical_clock_state::get_timestamp() +{ + int64 retval= 0; + my_atomic_rwlock_rdlock(&state_LOCK); + retval= my_atomic_load64(&state); + my_atomic_rwlock_rdunlock(&state_LOCK); + return retval; +} + +/** Log the current query. 
The query will be logged in either row format or statement format === modified file 'sql/binlog.h' --- a/sql/binlog.h revid:rohit.kalhans@stripped +++ b/sql/binlog.h revid:rohit.kalhans@stripped @@ -26,6 +26,23 @@ class Master_info; class Format_description_log_event; /** + Logical timestamp generator for binlog Prepare stage. + */ +class Logical_clock_state +{ +private: + my_atomic_rwlock_t state_LOCK; + volatile int64 state; + int64 step; +protected: + inline void init(){ state= 1; step= 1;} +public: + Logical_clock_state(){init();} + int64 step_clock(); + int64 get_timestamp(); +}; + +/** Class for maintaining the commit stages for binary log group commit. */ class Stage_manager { @@ -265,7 +282,6 @@ class MYSQL_BIN_LOG: public TC_LOG, priv mysql_cond_t update_cond; ulonglong bytes_written; IO_CACHE index_file; - char index_file_name[FN_REFLEN]; /* crash_safe_index_file is temp file used for guaranteeing index file crash safe when master server restarts. @@ -374,6 +390,15 @@ public: bool is_relay_log; ulong signal_cnt; // update of the counter is checked by heartbeat uint8 checksum_alg_reset; // to contain a new value when binlog is rotated + + /* clock to timestamp the binlog prepares */ + Logical_clock_state prepare_clock; + + /* Clock to timestamp the commits */ + Logical_clock_state commit_clock; + + char index_file_name[FN_REFLEN]; + /* Holds the last seen in Relay-Log FD's checksum alg value. 
The initial value comes from the slave's local FD that heads @@ -560,7 +585,7 @@ public: bool write_event(Log_event* event_info); bool write_cache(THD *thd, class binlog_cache_data *binlog_cache_data); - int do_write_cache(IO_CACHE *cache); + int do_write_cache(THD* thd, IO_CACHE *cache); void set_write_error(THD *thd, bool is_transactional); bool check_write_error(THD *thd); === modified file 'sql/handler.cc' --- a/sql/handler.cc revid:rohit.kalhans@stripped +++ b/sql/handler.cc revid:rohit.kalhans@stripped @@ -1414,6 +1414,12 @@ int ha_commit_trans(THD *thd, bool all) goto end; } + if (!thd->prepare_seq_written) + { + thd->prepare_seq_no= mysql_bin_log.prepare_clock.step_clock(); + thd->prepare_seq_written= true; + } + if (!trans->no_2pc && (rw_ha_count > 1)) error= tc_log->prepare(thd, all); } @@ -1423,6 +1429,17 @@ int ha_commit_trans(THD *thd, bool all) error= 1; goto end; } + + /* + Here we step the commit clock to keep a track of all committed + transactions + */ + if (!error && (all || trans->has_modified_non_trans_table())) + { + thd->commit_seq_no= mysql_bin_log.commit_clock.step_clock(); + thd->prepare_seq_written= false; + } + DBUG_EXECUTE_IF("crash_commit_after", DBUG_SUICIDE();); end: if (release_mdl && mdl_request.ticket) === modified file 'sql/log_event.cc' --- a/sql/log_event.cc revid:rohit.kalhans@stripped +++ b/sql/log_event.cc revid:rohit.kalhans@stripped @@ -2656,8 +2656,59 @@ bool Log_event::contains_partition_info( return res; } +/* + The function is called when being assigned group's descriptor + is just allocated. It fills in the descriptor struct an identifier + of the parent group using the prepare_seq_no and commit_seq_no. 
+*/ +inline void mts_assign_parent_group_id(Log_event *ev, Relay_log_info *rli) +{ + Slave_committed_queue *gaq= rli->gaq; + + if (rli->mts_parallel_type == MTS_PARALLEL_TYPE_BGC) + { + /* + A group id updater must satisfy the following: + - A query log event ("BEGIN" ) + - ev->prepare_seq_no > 0 + - ev->commit_seq_no > 0 +TODO: The code below is an outline marker and later we will use the +taxonomical inferences from the two seq_numbers that we got from the master +during BGC to assign the slave side group id. + */ + if (ev->get_type_code() == QUERY_EVENT) + { + rli->mts_last_known_parent_group_id= + gaq->get_job_group(rli->gaq->assigned_group_index)->parent_seqno= + rli->mts_groups_assigned - 1; + } + else + { + gaq->get_job_group(rli->gaq->assigned_group_index)->parent_seqno= + rli->mts_last_known_parent_group_id; + } + } +} + /** The method maps the event to a Worker and return a pointer to it. + The rest of sending the event to the Worker is done by the caller. + + Irrespective of the type of Group marking (DB partitioned or BGC) the + following holds true: + + - to recognize the beginning of a group to allocate the group descriptor + and queue it; + - to associate an event with a Worker (which also handles possible conflicts + detection and waiting for their termination); + - to finalize the group assignment when the group closing event is met. + + When parallelization mode is BGC-based the partitioning info in the event + is simply ignored. Thereby association with a Worker does not require + Assigned Partition Hash of the partitioned method. + This method is not interested in all the taxonomy of the event group + property, what we care about is the boundaries of the group. + As a part of the group, an event belongs to one of the following types: B - beginning of a group of events (BEGIN query_log_event) @@ -2691,6 +2742,7 @@ bool Log_event::contains_partition_info( There's few memory allocations commented where to be freed. 
@return a pointer to the Worker struct or NULL. + TODO: The BGC-based parallelization still has to address lookup for temp tables. */ Slave_worker *Log_event::get_slave_worker(Relay_log_info *rli) @@ -2752,8 +2804,9 @@ Slave_worker *Log_event::get_slave_worke // mark the current group as started with explicit B-event rli->mts_end_group_sets_max_dbs= true; rli->curr_group_seen_begin= true; + mts_assign_parent_group_id(this, rli); } - + if (is_gtid_event(this)) // mark the current group as started with explicit Gtid-event rli->curr_group_seen_gtid= true; @@ -2763,20 +2816,47 @@ Slave_worker *Log_event::get_slave_worke } else { + // The block is a result of not making GTID event as group starter. + // TODO: Make GTID event as B-event that is starts_group() to + // return true. + Log_event *ptr_curr_ev= this; // B-event is appended to the Deferred Array associated with GCAP insert_dynamic(&rli->curr_group_da, (uchar*) &ptr_curr_ev); rli->curr_group_seen_begin= true; rli->mts_end_group_sets_max_dbs= true; + mts_assign_parent_group_id(this, rli); DBUG_ASSERT(rli->curr_group_da.elements == 2); DBUG_ASSERT(starts_group()); return ret_worker; } } - // mini-group representative - - if (contains_partition_info(rli->mts_end_group_sets_max_dbs)) + if (rli->mts_parallel_type == MTS_PARALLEL_TYPE_BGC) + { + ptr_group= gaq->get_job_group(rli->gaq->assigned_group_index); + + /* + data_lock is held for short time of updating exec coordinates. + An alternative could be run a lighter version of the function + that would defer exec coordinates updating, e.g. raising a + special flag in rli->gam.lwm. + TODO: check once per group + */ + while (ptr_group->parent_seqno > rli->gaq->lwm.total_seqno) + (void) mts_checkpoint_routine(rli, 0, true, true /*need_data_lock=true*/); + + // compute worker, todo: consider to generalize get_least_occupied_worker() + ret_worker= (rli->last_assigned_worker) ? 
rli->last_assigned_worker : + *(Slave_worker **) dynamic_array_ptr(&rli->least_occupied_workers, 0); + if (!ret_worker) + ret_worker= get_least_occupied_worker(&rli->workers); + ptr_group->worker_id= ret_worker->id; + // "fixing" temp tables: + if (get_type_code() == QUERY_EVENT) + static_cast<Query_log_event*>(this)->mts_accessed_dbs= 0; + } + else if (contains_partition_info(rli->mts_end_group_sets_max_dbs)) { int i= 0; num_dbs= mts_number_dbs(); @@ -3458,6 +3538,22 @@ bool Query_log_event::write(IO_CACHE* fi } /* + We store 0 in the following two status vars since we don't have the prepare + and the commit timestamps. The logical timestamps will be updated in the + do_write_cache. + */ + if (thd) + { + thd->prepare_commit_offset= (int)(start-start_of_status); + *start++= Q_PREPARE_TS; + memset(start, 0, 8); + start+= 8; + + *start++= Q_COMMIT_TS; + memset(start,0,8); + start+= 8; + } + /* NOTE: When adding new status vars, please don't forget to update the MAX_SIZE_LOG_EVENT_STATUS in log_event.h and update the function code_name() in this file. 
@@ -3832,6 +3928,8 @@ code_name(int code) case Q_MASTER_DATA_WRITTEN_CODE: return "Q_MASTER_DATA_WRITTEN_CODE"; case Q_UPDATED_DB_NAMES: return "Q_UPDATED_DB_NAMES"; case Q_MICROSECONDS: return "Q_MICROSECONDS"; + case Q_PREPARE_TS: return "Q_PREPARE_TS"; + case Q_COMMIT_TS: return "Q_COMMIT_TS"; } sprintf(buf, "CODE#%d", code); return buf; @@ -4094,6 +4192,17 @@ Query_log_event::Query_log_event(const c DBUG_VOID_RETURN; break; } + + case Q_PREPARE_TS: + CHECK_SPACE(pos, end, 8); + prepare_seq_no= (int64)uint8korr(pos); + pos+= 8; + + case Q_COMMIT_TS: + CHECK_SPACE(pos, end, 8); + commit_seq_no= (int64)uint8korr(pos); + pos+= 8; + default: /* That's why you must write status vars in growing order of code */ DBUG_PRINT("info",("Query_log_event has unknown status vars (first has\ === modified file 'sql/log_event.h' --- a/sql/log_event.h revid:rohit.kalhans@stripped +++ b/sql/log_event.h revid:rohit.kalhans@stripped @@ -303,6 +303,7 @@ struct sql_ex_info /* type, db_1, db_2, ... */ \ 1U + (MAX_DBS_IN_EVENT_MTS * (1 + NAME_LEN)) + \ 3U + /* type, microseconds */ + \ + 1U+ 8U + 8U /* type, prepare & commit timestamp */ + \ 1U + 16 + 1 + 60/* type, user_len, user, host_len, host */) #define MAX_LOG_EVENT_HEADER ( /* in order of Query_log_event::write */ \ LOG_EVENT_HEADER_LEN + /* write_header */ \ @@ -390,6 +391,15 @@ struct sql_ex_info #define Q_MICROSECONDS 13 +/* + Q_PREPARE_TS and Q_COMMIT_TS status variables store the logical + timestamps at which the transaction entered the binlog prepare and commit + phases respectively. These will be used to apply transactions in parallel + on the slave. + */ +#define Q_PREPARE_TS 14 +#define Q_COMMIT_TS 15 + /* Intvar event post-header */ /* Intvar event data */ @@ -2185,6 +2195,12 @@ public: /* !!! Public in this pat !strncasecmp(query, "SAVEPOINT", 9) || !strncasecmp(query, "ROLLBACK", 8); } + /* + Prepare and commit sequence numbers. Will be set to 0 if the event is not a + transaction starter. 
+ */ + int64 prepare_seq_no; + int64 commit_seq_no; /** Notice, DDL queries are logged without BEGIN/COMMIT parentheses and identification of such single-query group === modified file 'sql/rpl_rli.cc' --- a/sql/rpl_rli.cc revid:rohit.kalhans@stripped +++ b/sql/rpl_rli.cc revid:rohit.kalhans@stripped @@ -98,8 +98,9 @@ Relay_log_info::Relay_log_info(bool is_s checkpoint_group(opt_mts_checkpoint_group), recovery_groups_inited(false), mts_recovery_group_cnt(0), mts_recovery_index(0), mts_recovery_group_seen_begin(0), - mts_group_status(MTS_NOT_IN_GROUP), reported_unsafe_warning(false), - rli_description_event(NULL), + mts_group_status(MTS_NOT_IN_GROUP), + mts_parallel_type(MTS_PARALLEL_TYPE_DB_NAME), + reported_unsafe_warning(false), rli_description_event(NULL), sql_delay(0), sql_delay_end(0), m_flags(0), row_stmt_start_timestamp(0), long_find_row_note_printed(false), error_on_rli_init_info(false) { @@ -151,7 +152,9 @@ void Relay_log_info::init_workers(ulong Parallel slave parameters initialization is done regardless whether the feature is or going to be active or not. 
*/ - mts_groups_assigned= mts_events_assigned= pending_jobs= wq_size_waits_cnt= 0; + mts_last_known_parent_group_id= + mts_groups_assigned= mts_events_assigned= + pending_jobs= wq_size_waits_cnt= 0; mts_wq_excess_cnt= mts_wq_no_underrun_cnt= mts_wq_overfill_cnt= 0; mts_last_online_stat= 0; my_init_dynamic_array(&workers, sizeof(Slave_worker *), n_workers, 4); === modified file 'sql/rpl_rli.h' --- a/sql/rpl_rli.h revid:rohit.kalhans@stripped +++ b/sql/rpl_rli.h revid:rohit.kalhans@stripped @@ -582,7 +582,7 @@ public: MTS statistics: */ ulonglong mts_events_assigned; // number of events (statements) scheduled - ulong mts_groups_assigned; // number of groups (transactions) scheduled + ulonglong mts_groups_assigned; // number of groups (transactions) scheduled volatile ulong mts_wq_overrun_cnt; // counter of all mts_wq_excess_cnt increments ulong wq_size_waits_cnt; // number of times C slept due to WQ:s oversize /* @@ -601,6 +601,14 @@ public: time_t mts_last_online_stat; /* end of MTS statistics */ + /* MTS type */ + enum_mts_parallel_type mts_parallel_type; + /* + Slave side local seq_no identifying the parent group on which + the transaction being scheduled is considered to be dependent + */ + ulonglong mts_last_known_parent_group_id; + /* most of allocation in the coordinator rli is there */ void init_workers(ulong); === modified file 'sql/rpl_rli_pdb.cc' --- a/sql/rpl_rli_pdb.cc revid:rohit.kalhans@stripped +++ b/sql/rpl_rli_pdb.cc revid:rohit.kalhans@stripped @@ -1843,7 +1843,9 @@ int slave_worker_exec_job(Slave_worker * worker->curr_group_seen_begin= true; // The current group is started with B-event worker->end_group_sets_max_dbs= true; } - else if (!is_gtid_event(ev)) + else if (!is_gtid_event(ev) && + // no need to address partitioning in BGC mode + (rli->mts_parallel_type != MTS_PARALLEL_TYPE_BGC)) { if ((part_event= ev->contains_partition_info(worker->end_group_sets_max_dbs))) === modified file 'sql/rpl_rli_pdb.h' --- a/sql/rpl_rli_pdb.h 
revid:rohit.kalhans@stripped +++ b/sql/rpl_rli_pdb.h revid:rohit.kalhans@stripped @@ -152,7 +152,6 @@ typedef struct st_slave_job_group my_off_t group_relay_log_pos; // filled by W ulong worker_id; Slave_worker *worker; - ulonglong total_seqno; my_off_t master_log_pos; // B-event log_pos /* checkpoint coord are reset by periodical and special (Rotate event) CP:s */ @@ -164,7 +163,11 @@ typedef struct st_slave_job_group volatile uchar done; // Flag raised by W, read and reset by Coordinator ulong shifted; // shift the last CP bitmap at receiving a new CP time_t ts; // Group's timestampt to update Seconds_behind_master - + /** + BGC-based parallelization + */ + ulonglong parent_seqno; // parent group id + ulonglong total_seqno; // current group id /* Coordinator fills the struct with defaults and options at starting of a group distribution. @@ -183,6 +186,7 @@ typedef struct st_slave_job_group checkpoint_relay_log_pos= 0; checkpoint_seqno= (uint) -1; done= 0; + parent_seqno= 0; } } Slave_job_group; === modified file 'sql/sql_class.cc' --- a/sql/sql_class.cc revid:rohit.kalhans@stripped +++ b/sql/sql_class.cc revid:rohit.kalhans@stripped @@ -1043,6 +1043,9 @@ THD::THD(bool enable_plugins) #ifndef DBUG_OFF gis_debug= 0; #endif +#ifndef MYSQL_CLIENT + prepare_seq_written= false; +#endif } === modified file 'sql/sql_class.h' --- a/sql/sql_class.h revid:rohit.kalhans@stripped +++ b/sql/sql_class.h revid:rohit.kalhans@stripped @@ -2296,6 +2296,16 @@ public: /* MTS: method inserts a new unique name into binlog_updated_dbs */ void add_to_binlog_accessed_dbs(const char *db); + /* MTS: Trans prepare timestamp */ + int64 prepare_seq_no; + bool prepare_seq_written; + + /* MTS: Trans commit timestamp */ + int64 commit_seq_no; + + /* MTS: offset to the prepare & commit seq. no. in the status vars */ + int prepare_commit_offset; + #endif /* MYSQL_CLIENT */ public: No bundle (reason: useless for push emails).