5368 Rohit Kalhans 2013-01-18
WL#6314: Intra-schema multi-threaded slave
- Implemented group id assignment on the slave side
- Associated BGC based group with workers threads.
modified:
sql/binlog.cc
sql/handler.cc
sql/log_event.cc
sql/log_event.h
5367 Rohit Kalhans 2013-01-18
WL#6314: Intra-schema multi-threaded slave
- Implemented state clock for prepare & commit sequencing
- Added new fields in status var section of Query log events
to store the prepare and commit logical timestamp.
- update to these timestamps will be done in do_write_cache().
modified:
sql/binlog.cc
sql/binlog.h
sql/handler.cc
sql/log_event.cc
sql/log_event.h
sql/rpl_rli.cc
sql/rpl_rli.h
sql/rpl_rli_pdb.cc
sql/rpl_rli_pdb.h
sql/sql_class.cc
sql/sql_class.h
5366 Rohit Kalhans 2013-01-14
WL#6314: Intra-schema multi-threaded slave
Updated the branch name.
Added variable to choose the type of mts
@ .bzr-mysql/default.conf
updated tree name
added:
mysql-test/suite/sys_vars/r/mts_parallel_type_basic.result
mysql-test/suite/sys_vars/t/mts_parallel_type_basic.test
modified:
.bzr-mysql/default.conf
sql/log.h
sql/mysqld.cc
sql/mysqld.h
sql/sys_vars.cc
=== modified file 'sql/binlog.cc'
--- a/sql/binlog.cc revid:rohit.kalhans@stripped
+++ b/sql/binlog.cc revid:rohit.kalhans@stripped
@@ -1200,10 +1200,9 @@ binlog_trx_cache_data::truncate(THD *thd
static int binlog_prepare(handlerton *hton, THD *thd, bool all)
{
/*
- do nothing.
just pretend we can do 2pc, so that MySQL won't
- switch to 1pc.
- real work will be done in MYSQL_BIN_LOG::commit()
+ switch to 1pc. Real work will be done in MYSQL_BIN_LOG::commit()
+ We will however step the prepare clock here.
*/
return 0;
}
@@ -4998,7 +4997,7 @@ uint MYSQL_BIN_LOG::next_file_id()
events prior to fill in the binlog cache.
*/
-int MYSQL_BIN_LOG::do_write_cache(IO_CACHE *cache)
+int MYSQL_BIN_LOG::do_write_cache(THD* thd, IO_CACHE *cache)
{
DBUG_ENTER("MYSQL_BIN_LOG::do_write_cache(IO_CACHE *)");
@@ -5022,6 +5021,7 @@ int MYSQL_BIN_LOG::do_write_cache(IO_CAC
ha_checksum crc= 0, crc_0= 0; // assignments to keep compiler happy
my_bool do_checksum= (binlog_checksum_options != BINLOG_CHECKSUM_ALG_OFF);
uchar buf[BINLOG_CHECKSUM_LEN];
+ bool pc_fixed= false;
// while there is just one alg the following must hold:
DBUG_ASSERT(!do_checksum ||
@@ -5173,6 +5173,18 @@ int MYSQL_BIN_LOG::do_write_cache(IO_CAC
uint event_len= uint4korr(ev + EVENT_LEN_OFFSET); // netto len
uchar *log_pos= ev + LOG_POS_OFFSET;
+          /* fix prepare & commit (pc) timestamp; each value follows its code byte */
+          if (!pc_fixed)
+          {
+            uchar* pc_ptr= ev + LOG_EVENT_HEADER_LEN + QUERY_HEADER_LEN +
+              thd->prepare_commit_offset + 1;  // +1: skip Q_PREPARE_TS code
+            // fix prepare ts
+            int8store(pc_ptr, thd->prepare_seq_no);
+            pc_ptr+= 8 + 1;                    // value, then Q_COMMIT_TS code
+            int8store(pc_ptr, thd->commit_seq_no);  // fix commit ts
+            pc_fixed= true;
+          }
+
/* fix end_log_pos */
val= uint4korr(log_pos) + group +
(end_log_pos_inc += (do_checksum ? BINLOG_CHECKSUM_LEN : 0));
@@ -5215,6 +5227,7 @@ int MYSQL_BIN_LOG::do_write_cache(IO_CAC
hdr_offs -= length;
}
+
/* Write the entire buf to the binary log file */
if (!do_checksum)
if (my_b_write(&log_file, cache->read_pos, length))
@@ -5342,7 +5355,7 @@ bool MYSQL_BIN_LOG::write_cache(THD *thd
{
DBUG_EXECUTE_IF("crash_before_writing_xid",
{
- if ((write_error= do_write_cache(cache)))
+ if ((write_error= do_write_cache(thd, cache)))
DBUG_PRINT("info", ("error writing binlog cache: %d",
write_error));
flush_and_sync(true);
@@ -5350,7 +5363,7 @@ bool MYSQL_BIN_LOG::write_cache(THD *thd
DBUG_SUICIDE();
});
- if ((write_error= do_write_cache(cache)))
+ if ((write_error= do_write_cache(thd, cache)))
goto err;
if (incident && write_incident(thd, false/*need_lock_log=false*/,
@@ -6744,7 +6757,9 @@ static int binlog_start_trans_and_stmt(T
Query_log_event qinfo(thd, STRING_WITH_LEN("BEGIN"),
is_transactional, FALSE, TRUE, 0, TRUE);
if (cache_data->write_event(thd, &qinfo))
+ {
DBUG_RETURN(1);
+ }
}
DBUG_RETURN(0);
@@ -8214,6 +8229,38 @@ void THD::issue_unsafe_warnings()
}
/**
+SYNOPSIS:
+  Atomically steps the state clock.
+  @param: NONE
+  @return: the current timestamp
+ */
+int64
+Logical_clock_state::step_clock()
+{
+  int64 retval;
+  DBUG_ENTER("Logical_clock_state::step_clock");
+  my_atomic_rwlock_wrlock(&state_LOCK);
+  /* my_atomic_add64() returns the pre-add value; a later load could race. */
+  retval= my_atomic_add64(&state, step) + step;
+  my_atomic_rwlock_wrunlock(&state_LOCK);
+  DBUG_RETURN(retval);
+}
+
+/**
+SYNOPSIS:
+ Atomically fetch the current state
+ */
+int64
+Logical_clock_state::get_timestamp()
+{
+  my_atomic_rwlock_rdlock(&state_LOCK);
+  /* Snapshot the counter while the read lock is held. */
+  const int64 current= my_atomic_load64(&state);
+  my_atomic_rwlock_rdunlock(&state_LOCK);
+  return current;
+}
+
+/**
Log the current query.
The query will be logged in either row format or statement format
=== modified file 'sql/binlog.h'
--- a/sql/binlog.h revid:rohit.kalhans@stripped
+++ b/sql/binlog.h revid:rohit.kalhans@stripped
@@ -26,6 +26,23 @@ class Master_info;
class Format_description_log_event;
/**
+ Logical timestamp generator for binlog Prepare stage.
+ */
+class Logical_clock_state
+{
+  my_atomic_rwlock_t state_LOCK;  // guards state; private by default
+  volatile int64 state;           // current logical timestamp
+  int64 step;                     // increment applied by step_clock()
+protected:
+  inline void init(){ state= 1; step= 1;}
+public:
+  Logical_clock_state(){ my_atomic_rwlock_init(&state_LOCK); init();}
+  ~Logical_clock_state(){ my_atomic_rwlock_destroy(&state_LOCK);}
+  int64 step_clock();             // advance the clock, return new value
+  int64 get_timestamp();          // read the clock without advancing it
+};
+
+/**
Class for maintaining the commit stages for binary log group commit.
*/
class Stage_manager {
@@ -265,7 +282,6 @@ class MYSQL_BIN_LOG: public TC_LOG, priv
mysql_cond_t update_cond;
ulonglong bytes_written;
IO_CACHE index_file;
- char index_file_name[FN_REFLEN];
/*
crash_safe_index_file is temp file used for guaranteeing
index file crash safe when master server restarts.
@@ -374,6 +390,15 @@ public:
bool is_relay_log;
ulong signal_cnt; // update of the counter is checked by heartbeat
uint8 checksum_alg_reset; // to contain a new value when binlog is rotated
+
+ /* clock to timestamp the binlog prepares */
+ Logical_clock_state prepare_clock;
+
+ /* Clock to timestamp the commits */
+ Logical_clock_state commit_clock;
+
+ char index_file_name[FN_REFLEN];
+
/*
Holds the last seen in Relay-Log FD's checksum alg value.
The initial value comes from the slave's local FD that heads
@@ -560,7 +585,7 @@ public:
bool write_event(Log_event* event_info);
bool write_cache(THD *thd, class binlog_cache_data *binlog_cache_data);
- int do_write_cache(IO_CACHE *cache);
+ int do_write_cache(THD* thd, IO_CACHE *cache);
void set_write_error(THD *thd, bool is_transactional);
bool check_write_error(THD *thd);
=== modified file 'sql/handler.cc'
--- a/sql/handler.cc revid:rohit.kalhans@stripped
+++ b/sql/handler.cc revid:rohit.kalhans@stripped
@@ -1414,6 +1414,12 @@ int ha_commit_trans(THD *thd, bool all)
goto end;
}
+ if (!thd->prepare_seq_written)
+ {
+ thd->prepare_seq_no= mysql_bin_log.prepare_clock.step_clock();
+ thd->prepare_seq_written= true;
+ }
+
if (!trans->no_2pc && (rw_ha_count > 1))
error= tc_log->prepare(thd, all);
}
@@ -1423,6 +1429,17 @@ int ha_commit_trans(THD *thd, bool all)
error= 1;
goto end;
}
+
+ /*
+ Here we step the commit clock to keep a track of all committed
+ transactions
+ */
+ if (!error && (all || trans->has_modified_non_trans_table()))
+ {
+ thd->commit_seq_no= mysql_bin_log.commit_clock.step_clock();
+ thd->prepare_seq_written= false;
+ }
+
DBUG_EXECUTE_IF("crash_commit_after", DBUG_SUICIDE(););
end:
if (release_mdl && mdl_request.ticket)
=== modified file 'sql/log_event.cc'
--- a/sql/log_event.cc revid:rohit.kalhans@stripped
+++ b/sql/log_event.cc revid:rohit.kalhans@stripped
@@ -2656,8 +2656,59 @@ bool Log_event::contains_partition_info(
return res;
}
+/*
+  The function is called right after the descriptor of the group being
+  assigned has been allocated. It fills the descriptor struct with an
+  identifier of the parent group using the prepare_seq_no and commit_seq_no.
+*/
+inline void mts_assign_parent_group_id(Log_event *ev, Relay_log_info *rli)
+{
+  /* Nothing to record unless the BGC-based parallelization is selected. */
+  if (rli->mts_parallel_type != MTS_PARALLEL_TYPE_BGC)
+    return;
+  Slave_committed_queue *queue= rli->gaq;
+  /*
+    A group id updater must satisfy the following:
+    - A query log event ("BEGIN" )
+    - ev->prepare_seq_no > 0
+    - ev->commit_seq_no > 0
+    TODO: The code below is an outline marker; later the two sequence
+    numbers received from the master during BGC will be used to assign
+    the slave side group id.
+  */
+  if (ev->get_type_code() == QUERY_EVENT)
+  {
+    /* Store (groups assigned - 1) in the descriptor and remember it in rli. */
+    const ulonglong parent_id= rli->mts_groups_assigned - 1;
+    queue->get_job_group(queue->assigned_group_index)->parent_seqno= parent_id;
+    rli->mts_last_known_parent_group_id= parent_id;
+  }
+  else
+  {
+    queue->get_job_group(queue->assigned_group_index)->parent_seqno=
+      rli->mts_last_known_parent_group_id;
+  }
+}
+
/**
The method maps the event to a Worker and return a pointer to it.
+ The rest of sending the event to the Worker is done by the caller.
+
+  Irrespective of the type of group marking (DB partitioned or BGC) the
+  following holds true:
+
+ - to recognize the beginning of a group to allocate the group descriptor
+ and queue it;
+ - to associate an event with a Worker (which also handles possible conflicts
+ detection and waiting for their termination);
+  - to finalize the group assignment when the group closing event is met.
+
+ When parallelization mode is BGC-based the partitioning info in the event
+ is simply ignored. Thereby association with a Worker does not require
+ Assigned Partition Hash of the partitioned method.
+ This method is not interested in all the taxonomy of the event group
+ property, what we care about is the boundaries of the group.
+
As a part of the group, an event belongs to one of the following types:
B - beginning of a group of events (BEGIN query_log_event)
@@ -2691,6 +2742,7 @@ bool Log_event::contains_partition_info(
There's few memory allocations commented where to be freed.
@return a pointer to the Worker struct or NULL.
+ TODO:The BGC-based parallelization still has to address lookup for temp tables.
*/
Slave_worker *Log_event::get_slave_worker(Relay_log_info *rli)
@@ -2752,8 +2804,9 @@ Slave_worker *Log_event::get_slave_worke
// mark the current group as started with explicit B-event
rli->mts_end_group_sets_max_dbs= true;
rli->curr_group_seen_begin= true;
+ mts_assign_parent_group_id(this, rli);
}
-
+
if (is_gtid_event(this))
// mark the current group as started with explicit Gtid-event
rli->curr_group_seen_gtid= true;
@@ -2763,20 +2816,47 @@ Slave_worker *Log_event::get_slave_worke
}
else
{
+        // The block is a result of not making GTID event a group starter.
+        // TODO: Make GTID event a B-event, that is, make starts_group()
+        // return true for it.
+
Log_event *ptr_curr_ev= this;
// B-event is appended to the Deferred Array associated with GCAP
insert_dynamic(&rli->curr_group_da, (uchar*) &ptr_curr_ev);
rli->curr_group_seen_begin= true;
rli->mts_end_group_sets_max_dbs= true;
+ mts_assign_parent_group_id(this, rli);
DBUG_ASSERT(rli->curr_group_da.elements == 2);
DBUG_ASSERT(starts_group());
return ret_worker;
}
}
- // mini-group representative
-
- if (contains_partition_info(rli->mts_end_group_sets_max_dbs))
+ if (rli->mts_parallel_type == MTS_PARALLEL_TYPE_BGC)
+ {
+ ptr_group= gaq->get_job_group(rli->gaq->assigned_group_index);
+
+ /*
+      data_lock is held for a short time while updating exec coordinates.
+      An alternative could be to run a lighter version of the function
+      that would defer exec coordinates updating, e.g. raising a
+      special flag in rli->gam.lwm.
+ TODO: check once per group
+ */
+ while (ptr_group->parent_seqno > rli->gaq->lwm.total_seqno)
+ (void) mts_checkpoint_routine(rli, 0, true, true /*need_data_lock=true*/);
+
+ // compute worker, todo: consider to generelize get_least_occupied_worker()
+ ret_worker= (rli->last_assigned_worker) ? rli->last_assigned_worker :
+ *(Slave_worker **) dynamic_array_ptr(&rli->least_occupied_workers, 0);
+ if (!ret_worker)
+ ret_worker= get_least_occupied_worker(&rli->workers);
+ ptr_group->worker_id= ret_worker->id;
+ // "fixing" temp tables:
+ if (get_type_code() == QUERY_EVENT)
+ static_cast<Query_log_event*>(this)->mts_accessed_dbs= 0;
+ }
+ else if (contains_partition_info(rli->mts_end_group_sets_max_dbs))
{
int i= 0;
num_dbs= mts_number_dbs();
@@ -3458,6 +3538,22 @@ bool Query_log_event::write(IO_CACHE* fi
}
/*
+    We store 0 in the following two status vars since we don't have the prepare
+    and the commit timestamps yet. The logical timestamps will be updated in
+    do_write_cache().
+ */
+ if (thd)
+ {
+ thd->prepare_commit_offset= (int)(start-start_of_status);
+ *start++= Q_PREPARE_TS;
+ memset(start, 0, 8);
+ start+= 8;
+
+ *start++= Q_COMMIT_TS;
+ memset(start,0,8);
+ start+= 8;
+ }
+ /*
NOTE: When adding new status vars, please don't forget to update
the MAX_SIZE_LOG_EVENT_STATUS in log_event.h and update the function
code_name() in this file.
@@ -3832,6 +3928,8 @@ code_name(int code)
case Q_MASTER_DATA_WRITTEN_CODE: return "Q_MASTER_DATA_WRITTEN_CODE";
case Q_UPDATED_DB_NAMES: return "Q_UPDATED_DB_NAMES";
case Q_MICROSECONDS: return "Q_MICROSECONDS";
+ case Q_PREPARE_TS: return "Q_PREPARE_TS";
+ case Q_COMMIT_TS: return "Q_COMMIT_TS";
}
sprintf(buf, "CODE#%d", code);
return buf;
@@ -4094,6 +4192,17 @@ Query_log_event::Query_log_event(const c
DBUG_VOID_RETURN;
break;
}
+
+    case Q_PREPARE_TS:
+      CHECK_SPACE(pos, end, 8);
+      prepare_seq_no= (int64)uint8korr(pos);
+      pos+= 8;
+      break;  /* do not fall through: next byte is the Q_COMMIT_TS code */
+    case Q_COMMIT_TS:
+      CHECK_SPACE(pos, end, 8);
+      commit_seq_no= (int64)uint8korr(pos);
+      pos+= 8;
+      break;  /* do not fall through into the unknown-status-var default */
default:
/* That's why you must write status vars in growing order of code */
DBUG_PRINT("info",("Query_log_event has unknown status vars (first has\
=== modified file 'sql/log_event.h'
--- a/sql/log_event.h revid:rohit.kalhans@stripped
+++ b/sql/log_event.h revid:rohit.kalhans@stripped
@@ -303,6 +303,7 @@ struct sql_ex_info
/* type, db_1, db_2, ... */ \
1U + (MAX_DBS_IN_EVENT_MTS * (1 + NAME_LEN)) + \
3U + /* type, microseconds */ + \
+ 1U+ 8U + 8U /* type, prepare & commit timestamp */ + \
1U + 16 + 1 + 60/* type, user_len, user, host_len, host */)
#define MAX_LOG_EVENT_HEADER ( /* in order of Query_log_event::write */ \
LOG_EVENT_HEADER_LEN + /* write_header */ \
@@ -390,6 +391,15 @@ struct sql_ex_info
#define Q_MICROSECONDS 13
+/*
+  Q_PREPARE_TS and Q_COMMIT_TS status variables store the logical
+  timestamps taken when the transaction entered the binlog prepare and commit
+  phases respectively. These will be used to apply transactions in parallel
+  on the slave.
+ */
+#define Q_PREPARE_TS 14
+#define Q_COMMIT_TS 15
+
/* Intvar event post-header */
/* Intvar event data */
@@ -2185,6 +2195,12 @@ public: /* !!! Public in this pat
!strncasecmp(query, "SAVEPOINT", 9) ||
!strncasecmp(query, "ROLLBACK", 8);
}
+ /*
+ Prepare and commit sequence number. will be set to 0 if the event is not a
+ transaction starter.
+ */
+ int64 prepare_seq_no;
+ int64 commit_seq_no;
/**
Notice, DDL queries are logged without BEGIN/COMMIT parentheses
and identification of such single-query group
=== modified file 'sql/rpl_rli.cc'
--- a/sql/rpl_rli.cc revid:rohit.kalhans@stripped
+++ b/sql/rpl_rli.cc revid:rohit.kalhans@stripped
@@ -98,8 +98,9 @@ Relay_log_info::Relay_log_info(bool is_s
checkpoint_group(opt_mts_checkpoint_group),
recovery_groups_inited(false), mts_recovery_group_cnt(0),
mts_recovery_index(0), mts_recovery_group_seen_begin(0),
- mts_group_status(MTS_NOT_IN_GROUP), reported_unsafe_warning(false),
- rli_description_event(NULL),
+ mts_group_status(MTS_NOT_IN_GROUP),
+ mts_parallel_type(MTS_PARALLEL_TYPE_DB_NAME),
+ reported_unsafe_warning(false), rli_description_event(NULL),
sql_delay(0), sql_delay_end(0), m_flags(0), row_stmt_start_timestamp(0),
long_find_row_note_printed(false), error_on_rli_init_info(false)
{
@@ -151,7 +152,9 @@ void Relay_log_info::init_workers(ulong
Parallel slave parameters initialization is done regardless
whether the feature is or going to be active or not.
*/
- mts_groups_assigned= mts_events_assigned= pending_jobs= wq_size_waits_cnt= 0;
+ mts_last_known_parent_group_id=
+ mts_groups_assigned= mts_events_assigned=
+ pending_jobs= wq_size_waits_cnt= 0;
mts_wq_excess_cnt= mts_wq_no_underrun_cnt= mts_wq_overfill_cnt= 0;
mts_last_online_stat= 0;
my_init_dynamic_array(&workers, sizeof(Slave_worker *), n_workers, 4);
=== modified file 'sql/rpl_rli.h'
--- a/sql/rpl_rli.h revid:rohit.kalhans@stripped
+++ b/sql/rpl_rli.h revid:rohit.kalhans@stripped
@@ -582,7 +582,7 @@ public:
MTS statistics:
*/
ulonglong mts_events_assigned; // number of events (statements) scheduled
- ulong mts_groups_assigned; // number of groups (transactions) scheduled
+ ulonglong mts_groups_assigned; // number of groups (transactions) scheduled
volatile ulong mts_wq_overrun_cnt; // counter of all mts_wq_excess_cnt increments
ulong wq_size_waits_cnt; // number of times C slept due to WQ:s oversize
/*
@@ -601,6 +601,14 @@ public:
time_t mts_last_online_stat;
/* end of MTS statistics */
+ /* MTS type */
+ enum_mts_parallel_type mts_parallel_type;
+ /*
+    Slave side local seq_no identifying the parent group on which the
+    transaction being scheduled is considered to be dependent
+ */
+ ulonglong mts_last_known_parent_group_id;
+
/* most of allocation in the coordinator rli is there */
void init_workers(ulong);
=== modified file 'sql/rpl_rli_pdb.cc'
--- a/sql/rpl_rli_pdb.cc revid:rohit.kalhans@stripped
+++ b/sql/rpl_rli_pdb.cc revid:rohit.kalhans@stripped
@@ -1843,7 +1843,9 @@ int slave_worker_exec_job(Slave_worker *
worker->curr_group_seen_begin= true; // The current group is started with B-event
worker->end_group_sets_max_dbs= true;
}
- else if (!is_gtid_event(ev))
+ else if (!is_gtid_event(ev) &&
+           // no need to address partitioning in BGC mode
+ (rli->mts_parallel_type != MTS_PARALLEL_TYPE_BGC))
{
if ((part_event=
ev->contains_partition_info(worker->end_group_sets_max_dbs)))
=== modified file 'sql/rpl_rli_pdb.h'
--- a/sql/rpl_rli_pdb.h revid:rohit.kalhans@stripped
+++ b/sql/rpl_rli_pdb.h revid:rohit.kalhans@stripped
@@ -152,7 +152,6 @@ typedef struct st_slave_job_group
my_off_t group_relay_log_pos; // filled by W
ulong worker_id;
Slave_worker *worker;
- ulonglong total_seqno;
my_off_t master_log_pos; // B-event log_pos
/* checkpoint coord are reset by periodical and special (Rotate event) CP:s */
@@ -164,7 +163,11 @@ typedef struct st_slave_job_group
volatile uchar done; // Flag raised by W, read and reset by Coordinator
ulong shifted; // shift the last CP bitmap at receiving a new CP
time_t ts; // Group's timestampt to update Seconds_behind_master
-
+ /**
+ BGC-based parallelization
+ */
+ ulonglong parent_seqno; // parent group id
+ ulonglong total_seqno; // current group id
/*
Coordinator fills the struct with defaults and options at starting of
a group distribution.
@@ -183,6 +186,7 @@ typedef struct st_slave_job_group
checkpoint_relay_log_pos= 0;
checkpoint_seqno= (uint) -1;
done= 0;
+ parent_seqno= 0;
}
} Slave_job_group;
=== modified file 'sql/sql_class.cc'
--- a/sql/sql_class.cc revid:rohit.kalhans@stripped
+++ b/sql/sql_class.cc revid:rohit.kalhans@stripped
@@ -1043,6 +1043,9 @@ THD::THD(bool enable_plugins)
#ifndef DBUG_OFF
gis_debug= 0;
#endif
+#ifndef MYSQL_CLIENT
+ prepare_seq_written= false;
+#endif
}
=== modified file 'sql/sql_class.h'
--- a/sql/sql_class.h revid:rohit.kalhans@stripped
+++ b/sql/sql_class.h revid:rohit.kalhans@stripped
@@ -2296,6 +2296,16 @@ public:
/* MTS: method inserts a new unique name into binlog_updated_dbs */
void add_to_binlog_accessed_dbs(const char *db);
+ /* MTS: Trans prepare timestamp */
+ int64 prepare_seq_no;
+ bool prepare_seq_written;
+
+ /* MTS: Trans commit timestamp */
+ int64 commit_seq_no;
+
+ /* MTS: offset to the prepare & commit seq. no. in the status vars */
+ int prepare_commit_offset;
+
#endif /* MYSQL_CLIENT */
public:
No bundle (reason: useless for push emails).
| Thread |
|---|
| • bzr push into mysql-trunk-wl6314 branch (rohit.kalhans:5366 to 5368) WL#6314 | Rohit Kalhans | 21 Feb 2013 |