#At file:///home/andrei/MySQL/BZR/2a-23May/WL/mysql-next-mr-wl5569/ based on revid:alfranio.correia@stripped
3211 Andrei Elkin 2010-11-18
wl#5569 MTS
Extending the wl#5563 prototype gradually.
This commit addresses:
1. recovery interface (a new Worker rli plus rli->gaq and pseudo-code for checkpoint
to update GAQ and the central RLI recovery table).
Wrt rli, both C and W execute do_apply_event(c_rli), where c_rli is the central
instance. C executes update_pos(c_rli), whereas W executes update_pos(w_rli).
others:
- decreased processing time for rpl_parallel, serial.
@ sql/log_event.cc
Enhanced Log_event::get_slave_worker_id() to classify events by a set of parallelization properties;
the presence of a property in an event forces some actions on both the C and W sides.
en_queue etc. are prepared to turn into circular_buffer_queue methods.
Pseudo-coded numerous todo:s wrt the low-level-design implementation.
Deployed changes due to the Worker's private rli.
Annotated on the Deferred Array for B,p,r property events.
`delete ev' is moved from C to W, which is fault-prone, but it could not be kept
any longer as a part of de_queue(), which transitions into the cir_buf_queue class.
@ sql/log_event.h
Removed `soiled', which was used to make `delete ev' run safely.
Added Log_event methods identifying the parallelization properties, incl.
- contains_partition_info() to identify events containing
info to be processed by the partition hash func
- starts_group(), ends_group()
- also updated the list of only_serial_exec().
@ sql/rpl_rli.cc
Only the Coordinator can destroy the Workers dynarray;
Relay_log_info::get_current_worker() turned out to be more complicated, see comments;
Reminder to migrate rli->future... into ev->future_event_relay_log_pos,
which would let a Worker find the value from the event's context;
Prototyped // w->flush_info() in stmt_done;
@ sql/rpl_rli.h
The worker RLI has `this_worker' pointing to the actual worker instance.
@ sql/rpl_rli_pdb.cc
Annotated with fine details of the APH etc. implementation.
@ sql/rpl_rli_pdb.h
Transformed the earlier queue struct into a family of classes.
Recovery interface: last_group_done_index of Slave_worker is to be filled in by W with an index
of the GAQ queue, and polled by C at checkpoint.
Added CGEP to the W context (similar to CGAP of C).
@ sql/rpl_slave.cc
Simplified the Worker pool.
Deployed worker rli initialization.
Recovery: rli->gaq is instantiated by C at worker pool activation.
Recovery: pseudo-code for checkpoint in next_event().
@ sql/sys_vars.cc
Edited help lines for slave_max_pending_jobs.
modified:
mysql-test/extra/rpl_tests/rpl_parallel_load.test
sql/log_event.cc
sql/log_event.h
sql/rpl_rli.cc
sql/rpl_rli.h
sql/rpl_rli_pdb.cc
sql/rpl_rli_pdb.h
sql/rpl_slave.cc
sql/sys_vars.cc
=== modified file 'mysql-test/extra/rpl_tests/rpl_parallel_load.test'
--- a/mysql-test/extra/rpl_tests/rpl_parallel_load.test 2010-09-21 22:19:05 +0000
+++ b/mysql-test/extra/rpl_tests/rpl_parallel_load.test 2010-11-18 14:00:52 +0000
@@ -13,7 +13,7 @@ let $workers = `select @@global.slave_pa
#
# load volume parameter
#
-let $iter = 5000;
+let $iter = 1000;
connection slave;
@@ -230,6 +230,7 @@ start slave sql_thread;
let $wait_timeout= 600;
let $wait_condition= SELECT count(*)+sleep(1) = 5 FROM test0.benchmark;
source include/wait_condition.inc;
+
use test;
select * from test0.benchmark into outfile 'benchmark.out';
select ts from test0.benchmark where state like 'master started load' into @m_0;
@@ -239,6 +240,13 @@ select ts from test0.benchmark where sta
select time_to_sec(@m_1) - time_to_sec(@m_0) as 'delta_m',
time_to_sec(@s_1) - time_to_sec(@s_0) as 'delta_s' into outfile 'delta.out';
+# debug: pre diff check-out
+--disable_result_log
+--disable_query_log
+###select sleep(9999);
+--enable_result_log
+--enable_query_log
+
let $i = $workers + 1;
while($i)
{
@@ -255,6 +263,13 @@ while($i)
--enable_result_log
--enable_query_log
+# debug: pre diff check-out
+--disable_result_log
+--disable_query_log
+###select sleep(9999);
+--enable_result_log
+--enable_query_log
+
connection master;
=== modified file 'sql/log_event.cc'
--- a/sql/log_event.cc 2010-11-11 11:53:01 +0000
+++ b/sql/log_event.cc 2010-11-18 14:00:52 +0000
@@ -2154,47 +2154,100 @@ Log_event::continue_group(Relay_log_info
return Log_event::do_shall_skip(rli);
}
+
+bool Log_event::contains_partition_info()
+{
+ return get_type_code() == TABLE_MAP_EVENT ||
+ // TRUE only for Table_map and Query events.
+ // todo: Query events are only partially parallel-supported (the default
+ // session db not the actual db)
+ // NOTE(review): "the 4 types" wording fits only_serial_exec(); one type follows here.
+ get_type_code() == QUERY_EVENT;
+}
+
/**
- *** a template method to be replaced by the actual hash func ***
- *** Limitted to work for STMT binlog format events ***
- *** Rows-event handling is limitted to one Worker ***
- @return index \in [0, M] range, where M is the max index of the worker pool.
+ General hashing function to compute the id of an applier for
+ the current event.
+ When computing the id, a few rules apply depending on the partitioning
+ properties that the event instance can feature.
+
+ Let's name the properties.
+
+ B - beginning of a group of events (BEGIN query_log_event)
+ g - mini-group representative event containing the partition info
+ (any Table_map, a Query_log_event)
+ p - a mini-group internal event that *p*recedes its g-parent
+ (int_, rand_, user_ var:s)
+ r - a mini-group internal "regular" event that follows its g-parent
+ (Write, Update, Delete -rows)
+ S - sequentially applied event (may not be a part of any group).
+ Events of this type are determined via @c only_serial_exec()
+ earlier and don't cause this method to be called.
+ T - terminator of the group (XID, COMMIT, ROLLBACK)
+
+ Only the `g' case requires computing the assigned Worker id.
+ In the `T, r' cases it's @c last_assigned_worker, i.e. the one that was
+ assigned at the last `g' processing.
+ In the `B' case it's NULL to indicate the Coordinator will skip doing anything
+ more with the event. Its scheduling gets deferred until the following
+ `g' event names a Worker.
+
+ @note `p' and g-Query-log-event are not supported yet.
+
+ @note The function can update APH, CGAP objects.
+
+ @return a pointer to the Worker struct or NULL.
*/
Slave_worker *Log_event::get_slave_worker_id(Relay_log_info const *rli)
{
- if (!(get_type_code() == XID_EVENT ||
- get_type_code() == INTVAR_EVENT ||
- get_type_code() == USER_VAR_EVENT ||
- get_type_code() == RAND_EVENT ||
- get_type_code() == DELETE_ROWS_EVENT ||
- get_type_code() == UPDATE_ROWS_EVENT ||
- get_type_code() == WRITE_ROWS_EVENT))
+ /* checking properties and perform corresponding actions */
+
+ // g
+ if (contains_partition_info())
{
- Slave_worker *worker= get_slave_worker(get_db(), rli->workers);
+ // a lot of things inside `get_slave_worker_id'
+ Slave_worker *worker= get_slave_worker(get_db(), rli->workers /*, W_c */);
const_cast<Relay_log_info *>(rli)->last_assigned_worker= worker;
}
+ // B
+ if (starts_group())
+ {
+ // insert {NULL, W_s} to yield a new Group_cnt indexed element in GAQ
+ // Group_cnt= rli->gaq->e;
+ // rli->gaq->en_queue({NULL, W_s});
+ // B-event is appended to the Deferred Array associated with GCAP
+ }
+
+ // T
+ if (ends_group())
+ {
+ // assert (\exists P_k . W_s \in CGAP) if P_k is present in ev
+
+ // cleanup: CGAP := nil
+
+ // ev->group_cnt := GAQ.Group_cnt, so that
+ // at procesing *Worker*.WQ.last_group_cnt := ev->group_cnt
+ }
+
+ // todo: p. the first p-event clears CGAP. Each p-event is appened to DA.
+
+ // r
return (rli->last_assigned_worker);
}
static int en_queue(Slave_jobs_queue *jobs, Slave_job_item *item)
{
- Slave_job_item free;
-
if (jobs->a == jobs->s)
{
DBUG_ASSERT(jobs->a == jobs->Q.elements);
return -1;
}
- // GC
- get_dynamic(&jobs->Q, (uchar*) &free, jobs->a);
- if (free.data != NULL)
- delete static_cast<Log_event *>(free.data);
// store
- free.data= item->data;
- set_dynamic(&jobs->Q, (uchar*) &free, jobs->a);
+
+ set_dynamic(&jobs->Q, (uchar*) item, jobs->a);
// pre-boundary cond
if (jobs->e == jobs->s)
@@ -2220,12 +2273,12 @@ static void * head_queue(Slave_jobs_queu
if (jobs->e == jobs->s)
{
DBUG_ASSERT(jobs->len == 0);
- ret->data= NULL;
+ ret->data= NULL; // todo: move to caller
return NULL;
}
get_dynamic(&jobs->Q, (uchar*) ret, jobs->e);
- DBUG_ASSERT(ret->data);
+ DBUG_ASSERT(ret->data); // todo: move to caller
return ret;
}
@@ -2325,7 +2378,7 @@ int Log_event::apply_event(Relay_log_inf
Slave_worker *w= NULL;
Slave_job_item item= {NULL}, *job_item= &item;
- if (!rli->is_parallel_exec() || only_serial_exec())
+ if (!rli->is_parallel_exec() || only_serial_exec() /* || wait(APH.N == 0) */)
DBUG_RETURN(do_apply_event(rli));
if (!(w= get_slave_worker_id(rli)) ||
@@ -2335,6 +2388,14 @@ int Log_event::apply_event(Relay_log_inf
job_item->data= this;
DBUG_PRINT("Log_event::apply_event:", ("-> job item data %p to W_%lu", job_item->data, w->id));
+
+ // TODO: MTS-APH DA (BEGIN)
+ // for (i= 0; i < CGDA.elements; i++)
+ // {
+ // append_item_to_jobs(job_item, CGDA[i], const_cast<Relay_log_info*>(rli));
+ // }
+ // todo: resize if elements > min
+ // CGDA.elements= 0;
append_item_to_jobs(job_item, w, const_cast<Relay_log_info*>(rli));
DBUG_RETURN(FALSE);
}
@@ -2380,6 +2441,9 @@ struct slave_job_item* pop_jobs_item(Sla
mts-II worker main routine.
The worker thread waits for an event, execute it, fixes statistics counters.
+ @note the function maintains CGEP and modifies APH, and causes
+ modification of GAQ.
+
@return 0 success
-1 got killed or an error happened during appying
*/
@@ -2409,10 +2473,33 @@ int slave_worker_exec_job(Slave_worker *
DBUG_PRINT("slave_worker_exec_job:", ("W_%lu <- job item: %p data: %p thd: %p", w->id, job_item, ev, thd));
+ if (ev->starts_group())
+ {
+ // ... ? do nothing ?
+ }
+ if (ev->contains_partition_info())
+ {
+ // TODO: w->CGEP .= ev->get_db()
+ }
+ if (ev->ends_group())
+ {
+ w->slave_worker_ends_group();
+
+ // TODO: GAQ related
+ // w->last_group_done_index = ev->group_cnt
+ }
+
error= ev->do_apply_event(rli);
mysql_mutex_lock(&w->jobs_lock);
- de_queue(&w->jobs, job_item); // now event of the item is garbage
+ de_queue(&w->jobs, job_item);
+ /*
+ preserving signatures of existing methods.
+ todo: convert update_pos(w->w_rli) -> update_pos(w)
+ to remove w_rli w/a
+ */
+ ev->update_pos(w->w_rli);
+ delete ev; // after ev->update_pos() event is garbage
mysql_mutex_unlock(&w->jobs_lock);
/* statistics */
=== modified file 'sql/log_event.h'
--- a/sql/log_event.h 2010-11-10 10:57:13 +0000
+++ b/sql/log_event.h 2010-11-18 14:00:52 +0000
@@ -645,6 +645,7 @@ class THD;
class Format_description_log_event;
class Relay_log_info;
class Slave_worker;
+class Slave_committed_queue;
#ifdef MYSQL_CLIENT
enum enum_base64_output_mode {
@@ -1141,13 +1142,6 @@ public:
#if defined(MYSQL_SERVER) && defined(HAVE_REPLICATION)
public:
- /**
- mts-II (todo: may change in favor of relocatin update_pos to rli class)
- Coordinator to mark the instance as available to delete.
- The member is designed for the slave side instance.
- */
- volatile bool soiled;
-
/**
mst-II: to execute serially due to
technical or conceptual limitation
@@ -1157,6 +1151,14 @@ public:
bool only_serial_exec()
{
return
+ // todo: the 4 types below are limitly parallel-supported (the default
+ // session db not the actual db)
+
+ // get_type_code() == QUERY_EVENT ||
+ // get_type_code() == INTVAR_EVENT ||
+ // get_type_code() == USER_VAR_EVENT ||
+ // get_type_code() == RAND_EVENT ||
+
get_type_code() == STOP_EVENT ||
get_type_code() == ROTATE_EVENT ||
get_type_code() == LOAD_EVENT ||
@@ -1174,6 +1176,18 @@ public:
get_type_code() == PRE_GA_DELETE_ROWS_EVENT ||
get_type_code() == INCIDENT_EVENT;
}
+
+ /**
+ Events of a certain type carry partitioning data such as db names.
+ */
+ bool contains_partition_info();
+
+ /**
+ Events of a certain type start or end a group of events treated
+ transactionally wrt binlog.
+ */
+ virtual bool starts_group() { return FALSE; }
+ virtual bool ends_group() { return FALSE; }
/**
@return index in \in [0, M] range to indicate
@@ -1832,6 +1846,19 @@ public: /* !!! Public in this pat
!strncasecmp(query, "SAVEPOINT", 9) ||
!strncasecmp(query, "ROLLBACK", 8);
}
+ /**
+ Group boundary tests based on the text of the query.
+ todo: Parallel support for DDL:s.
+ DDL queries are logged without BEGIN/COMMIT parentheses and can be
+ regarded as both the starting and the ending event of their own group.
+ */
+ bool starts_group() { return !strncmp(query, "BEGIN", q_len); } // NOTE(review): strncmp() over 0 chars reports a match, so q_len == 0 yields TRUE - confirm q_len > 0
+ bool ends_group()
+ {
+ return
+ !strncmp(query, "COMMIT", q_len) || // case-sensitive, unlike the ROLLBACK test below
+ !strncasecmp(query, STRING_WITH_LEN("ROLLBACK")); // NOTE(review): presumably a prefix match, so "ROLLBACK TO ..." would also end the group - verify
+ }
};
@@ -2579,7 +2606,7 @@ class Xid_log_event: public Log_event
bool write(IO_CACHE* file);
#endif
bool is_valid() const { return 1; }
-
+ bool ends_group() { return TRUE; }
private:
#if defined(MYSQL_SERVER) && defined(HAVE_REPLICATION)
virtual int do_apply_event(Relay_log_info const *rli);
=== modified file 'sql/rpl_rli.cc'
--- a/sql/rpl_rli.cc 2010-11-11 11:53:01 +0000
+++ b/sql/rpl_rli.cc 2010-11-18 14:00:52 +0000
@@ -76,6 +76,7 @@ Relay_log_info::Relay_log_info(bool is_s
until_log_pos(0), retried_trans(0),
tables_to_lock(0), tables_to_lock_count(0),
rows_query_ev(NULL), last_event_start_time(0),
+ this_worker(NULL),
sql_delay(0), sql_delay_end(0),
m_flags(0)
{
@@ -145,31 +146,48 @@ Relay_log_info::~Relay_log_info()
mysql_mutex_destroy(&pending_jobs_lock);
mysql_cond_destroy(&pending_jobs_cond);
- delete_dynamic(&workers);
+ if (!this_worker)
+ delete_dynamic(&workers);
DBUG_VOID_RETURN;
}
+/**
+ The method can be run both by C, which has the Main (coordinator) rli context,
+ and by W, which has both the Main and the Local (worker) rli contexts.
+ Decision matrix:
+
+ Main Local
+ -+-------------
+ C| this_w -
+ W| w_i this_w
+
+*/
Slave_worker* Relay_log_info::get_current_worker() const
{
uint i;
- Slave_worker* w_i;
if (!is_parallel_exec() || info_thd == current_thd)
- return NULL;
+ return this_worker; // can be asserted: !this_worker => C
for (i= 0; i< workers.elements; i++)
{
- // convert to use hashing
+ Slave_worker* w_i;
+ // todo: optimaze/replace the loop
get_dynamic(const_cast<DYNAMIC_ARRAY*>(&workers), (uchar*) &w_i, i);
if (w_i->info_thd == current_thd)
+ {
return w_i;
+ }
}
DBUG_ASSERT(0);
}
+/**
+ The method can be run both by C having the Main context.
+*/
RPL_TABLE_LIST** Relay_log_info::get_tables_to_lock() const
{
return
- ((!is_parallel_exec()) || (info_thd == current_thd)) ?
+ ((!is_parallel_exec()) || info_thd == current_thd) ?
const_cast<RPL_TABLE_LIST**>(&tables_to_lock) :
&get_current_worker()->tables_to_lock;
}
@@ -998,12 +1016,22 @@ void Relay_log_info::stmt_done(my_off_t
while the MyISAM table has already been updated.
*/
if ((info_thd->variables.option_bits & OPTION_BEGIN) && opt_using_transactions)
- inc_event_relay_log_pos();
+ inc_event_relay_log_pos(); // todo: ev-> future_event_relay_log_pos
else
{
+ Slave_worker *w;
inc_group_relay_log_pos(event_master_log_pos);
+
/* Alfranio needs to update the coordinator and workers. */
- flush_info(key_info_idx, key_info_size, is_transactional() ? TRUE : FALSE);
+
+ if ((w= get_current_worker()) == NULL)
+ flush_info(key_info_idx, key_info_size, is_transactional() ? TRUE : FALSE);
+
+ // Andrei: tried testing the worker's method but got a segfault
+ // because in to->set_info(group_relay_log_name) the arg is NULL.
+
+ //else
+ // w->flush_info(key_info_idx, key_info_size, is_transactional() ? TRUE : FALSE);
}
}
@@ -1012,7 +1040,7 @@ void Relay_log_info::cleanup_context(THD
{
DBUG_ENTER("Relay_log_info::cleanup_context");
- DBUG_ASSERT(info_thd == thd || is_parallel_exec());
+ DBUG_ASSERT((info_thd == thd) || is_parallel_exec());
/*
1) Instances of Table_map_log_event, if ::do_apply_event() was called on them,
may have opened tables, which we cannot be sure have been closed (because
=== modified file 'sql/rpl_rli.h'
--- a/sql/rpl_rli.h 2010-11-11 11:53:01 +0000
+++ b/sql/rpl_rli.h 2010-11-18 14:00:52 +0000
@@ -271,20 +271,6 @@ public:
*/
ulong slave_exec_mode;
- /*
- WL#5563 mts-II
- */
- DYNAMIC_ARRAY workers; // number's is determined by global slave_parallel_workers
- volatile int pending_jobs;
- ulong trans_jobs, wait_jobs, stmt_jobs; // live time is one trans, statement (ndb epoch)
- mysql_mutex_t pending_jobs_lock;
- mysql_cond_t pending_jobs_cond;
- int slave_pending_jobs_max;
- Slave_worker *last_assigned_worker; // a hint to partitioning func for some events
-
- // List<slave_job_item> free_jobs;
-
- Slave_worker* get_current_worker() const;
RPL_TABLE_LIST** get_tables_to_lock() const;
uint add_table_to_lock(RPL_TABLE_LIST *table_list);
@@ -435,6 +421,22 @@ public:
*/
time_t last_event_start_time;
+ /*
+ WL#5569 MTS-II
+ */
+ DYNAMIC_ARRAY workers; // number's is determined by global slave_parallel_workers
+ volatile int pending_jobs;
+ ulong trans_jobs, wait_jobs, stmt_jobs; // live time is one trans, statement (ndb epoch)
+ mysql_mutex_t pending_jobs_lock;
+ mysql_cond_t pending_jobs_cond;
+ int slave_pending_jobs_max;
+ Slave_worker *last_assigned_worker; // a hint to partitioning func for some events
+ Slave_committed_queue *gaq;
+
+ Slave_worker* get_current_worker() const;
+ Slave_worker* set_this_worker(Slave_worker *w) { return this_worker= w; }
+ Slave_worker* this_worker; // used by w_rli. The cental rli has it as NULL.
+
/**
Helper function to do after statement completion.
=== modified file 'sql/rpl_rli_pdb.cc'
--- a/sql/rpl_rli_pdb.cc 2010-11-11 11:53:01 +0000
+++ b/sql/rpl_rli_pdb.cc 2010-11-18 14:00:52 +0000
@@ -201,7 +201,47 @@ void destroy_hash_workers()
DBUG_VOID_RETURN;
}
-Slave_worker *get_slave_worker(const char *dbname, DYNAMIC_ARRAY workers)
+/**
+ The function produces a reference to the struct of a Worker
+ that has been or will be engaged to process the @c dbname -keyed partition (D).
+ It checks a local to Coordinator CGAP list first and returns
+ @c last_assigned_worker when found (todo: assert).
+
+ Otherwise, the partition is appended to the current group list:
+
+ CGAP .= D
+
+ and a possible D's Worker id is searched in APH that collects tuples
+ (P, W_id, U, mutex, cond).
+ In case not found,
+
+ W_d := W_c unless W_c is NULL.
+
+ When W_c is NULL it is assigned to a least occupied,
+
+ W_d := W_c := W_{least_occupied}
+
+ APH .= a new (D, W_d, 1)
+
+ In a case APH contains W_d == W_c, (assert U >= 1)
+
+ update APH set U++ where APH.P = D
+
+ The case APH contains a W_d != W_c != NULL assigned to D-partition represents
+ the hashing conflict and is handled as the following:
+
+ a. marks the APH record with a flag requesting the other Worker to signal
+ the cond var when `U', the usage counter, drops to zero;
+ b. waits for the other Worker to finish its tasks on that partition and
+ receives the signal;
+ c. updates the APH record to point to the first Worker (naturally, U := 1),
+ schedules the event, and goes back into the parallel mode
+
+ @note modifies CGAP, APH
+
+ @return the pointer to a Worker struct
+*/
+Slave_worker *get_slave_worker(const char *dbname, DYNAMIC_ARRAY workers /*, W_c */)
{
DBUG_ENTER("get_slave_worker");
@@ -285,3 +325,22 @@ Slave_worker *get_free_worker(DYNAMIC_AR
DBUG_ASSERT(worker != NULL);
return(worker);
}
+
+/**
+ Deallocation routine meant to undo what @c get_slave_worker() sets up.
+ Currently an empty stub; the intended algorithm (pseudo-code) follows.
+
+ The APH tuples affected by the group being committed are updated:
+
+ for each D_i in CGEP
+ assert (W_id == APH.W_id|P_d == D_i)
+ update APH set U-- where P_d = D_i
+ if U == 0 \and count(APH) > max
+ delete from APH where U = 0
+
+ Finally CGEP, the Worker's partition cache, is cleaned up.
+*/
+
+void Slave_worker::slave_worker_ends_group()
+{
+}
=== modified file 'sql/rpl_rli_pdb.h'
--- a/sql/rpl_rli_pdb.h 2010-11-11 11:53:01 +0000
+++ b/sql/rpl_rli_pdb.h 2010-11-18 14:00:52 +0000
@@ -18,20 +18,59 @@ Slave_worker *get_slave_worker(const cha
Slave_worker *get_free_worker(DYNAMIC_ARRAY workers);
#define SLAVE_WORKER_QUEUE_SIZE 8096
+#define SLAVE_INIT_DBS_IN_GROUP 4 // initial allocation for CGEP dynarray
typedef struct slave_job_item
{
void *data;
} Slave_job_item;
-typedef struct slave_jobs_queue
+class circular_buffer_queue
{
+public:
+
DYNAMIC_ARRAY Q;
ulong s;
ulong a;
ulong e;
- ulong len;
-} Slave_jobs_queue;
+ volatile ulong len; // it is also queried to compute least occupied
+
+ circular_buffer_queue(uint el_size, ulong max, uint alloc_inc= 0) :
+ s(max), a(0), e(max), len(0)
+ {
+ my_init_dynamic_array(&Q, el_size, s, alloc_inc);
+ };
+ circular_buffer_queue () {};
+};
+
+/**
+ Group Assigned Queue whose first element identifies first gap
+ in committed sequence. The head of the queue is therefore next to
+ the low-water-mark.
+*/
+class Slave_committed_queue : public circular_buffer_queue
+{
+public:
+ // Allocation of file_name that is common for all Slave_assigned_job_group:s
+ char current_binlog[FN_REFLEN];
+ void update_current_binlog(const char *post_rotate); //master's Rotate exec it
+ Slave_committed_queue (const char *log, uint el_size, ulong max, uint inc= 0)
+ : circular_buffer_queue(el_size, max, inc)
+ {
+ strmake(current_binlog, log, sizeof(current_binlog) - 1);
+ };
+};
+
+typedef struct st_slave_assigned_job_group
+{
+ struct event_coordinates coord;
+ ulong worker_id;
+ ulonglong total_seqno;
+} Slave_assigned_job_group;
+
+class Slave_jobs_queue : public circular_buffer_queue
+{
+};
class Slave_worker : public Rpl_info_worker
{
@@ -41,10 +80,21 @@ public:
mysql_mutex_t jobs_lock;
mysql_cond_t jobs_cond;
-
- //List<struct slave_job_item> jobs;
Slave_jobs_queue jobs;
+ // fixme: experimental
+ Relay_log_info *w_rli;
+
+ DYNAMIC_ARRAY curr_group_exec_parts; // CGEP
+
+ // @c last_group_done_index is for recovery, although can be viewed
+ // as statistics as well.
+ // C marks a T-event with the incremented group_cnt that is
+ // an index in GAQ; W stores it at the event execution.
+ // C polls the value periodically to maintain an array
+ // of the indexes in order to progress on GAQ's lwm, see @c next_event().
+ volatile ulong last_group_done_index; // it's index in GAQ
+
List<Log_event> data_in_use; // events are still in use by SQL thread
ulong id;
TABLE *current_table;
@@ -76,6 +126,8 @@ public:
size_t get_number_worker_fields();
+ void slave_worker_ends_group(); // CGEP walk through to upd APH
+
private:
bool read_info(Rpl_info_handler *from);
bool write_info(Rpl_info_handler *to);
=== modified file 'sql/rpl_slave.cc'
--- a/sql/rpl_slave.cc 2010-11-11 11:53:01 +0000
+++ b/sql/rpl_slave.cc 2010-11-18 14:00:52 +0000
@@ -70,6 +70,12 @@ MY_BITMAP slave_error_mask;
char slave_skip_error_names[SHOW_VAR_FUNC_BUFF_SIZE];
typedef bool (*CHECK_KILLED_FUNC)(THD*,void*);
+typedef struct st_slave_worker_init_args
+{
+ Relay_log_info *rli;
+ Slave_worker *w;
+} SLAVE_WORKER_INIT_ARGS;
+
char* slave_load_tmpdir = 0;
Master_info *active_mi= 0;
@@ -2617,8 +2623,6 @@ int apply_event_and_update_pos(Log_event
// Sleeps if needed, and unlocks rli->data_lock.
if (sql_delay_event(ev, thd, rli))
DBUG_RETURN(0);
- if (rli->is_parallel_exec()) // todo: consider adding `ev' as an arg
- ev->soiled= FALSE;
exec_res= ev->apply_event(rli);
}
else
@@ -2662,7 +2666,16 @@ int apply_event_and_update_pos(Log_event
int error= 0;
if (!(ev->get_type_code() == XID_EVENT && rli->is_transactional()) ||
skip_event)
- error= ev->update_pos(rli);
+
+// TODO: Alfranio to fix/restore the condition to update the main RLI
+// It is kept the prototype time way in order to process rpl_parallel.
+// This is parallel exec and event required the sequential mode
+// that also includes all Workers finished their assignements.
+// It was served so inside apply_event() above.
+// The main RLI table is safe to update now.
+
+// if (!rli->is_parallel_exec() || ev->only_serial_exec())
+ error= ev->update_pos(rli);
#ifndef DBUG_OFF
DBUG_PRINT("info", ("update_pos error = %d", error));
if (!rli->belongs_to_client())
@@ -2693,15 +2706,10 @@ int apply_event_and_update_pos(Log_event
" Stopped in %s position %s",
rli->get_group_relay_log_name(),
llstr(rli->get_group_relay_log_pos(), buf));
- if (rli->is_parallel_exec())
- ev->soiled= TRUE;
DBUG_RETURN(2);
}
}
- if (rli->is_parallel_exec())
- ev->soiled= TRUE;
-
DBUG_RETURN(exec_res ? 1 : 0);
}
@@ -2845,7 +2853,7 @@ static int exec_relay_log_event(THD* thd
handle_rows_query_log_event(ev, rli);
if (!rli->is_parallel_exec() &&
- ev->get_type_code() != ROWS_QUERY_LOG_EVENT)
+ ev->get_type_code() != ROWS_QUERY_LOG_EVENT) // mts todo: check this case
{
DBUG_PRINT("info", ("Deleting the event after it has been executed"));
@@ -3481,7 +3489,13 @@ pthread_handler_t handle_slave_worker(vo
Slave_worker *w;
int error= 0;
+ // TODO: remove pending_jobs ref:s to
+ // TODO: ulong id;
+
+ // rli= ((SLAVE_WORKER_INIT_ARGS *) arg)->rli;
+
Relay_log_info* rli = ((Master_info*)arg)->rli;
+
my_thread_init();
DBUG_ENTER("handle_slave_worker");
@@ -3498,13 +3512,23 @@ pthread_handler_t handle_slave_worker(vo
goto err;
}
- /* handshake with each started workers */
+ /* handshake with each started worker */
mysql_mutex_lock(&rli->pending_jobs_lock);
get_dynamic((DYNAMIC_ARRAY*)&rli->workers, (uchar*) &w, rli->pending_jobs - 1);
rli->pending_jobs= 0;
+
+ // TODO: w= ((SLAVE_WORKER_INIT_ARGS *) arg)->w;
+
w->info_thd= thd;
w->tables_to_lock= NULL;
w->tables_to_lock_count= 0;
+
+ // fixme: experimenting to make Workers to run ev->update_pos(w->w_rli)
+ w->w_rli= Rpl_info_factory::create_rli(RLI_REPOSITORY_FILE, FALSE);
+ w->w_rli->info_thd= thd;
+ w->w_rli->workers= rli->workers; // shallow copying is sufficient
+ w->w_rli->this_worker= w;
+
thd->thread_stack = (char*)&thd;
mysql_cond_signal(&rli->pending_jobs_cond); // informing the parent
mysql_mutex_unlock(&rli->pending_jobs_lock);
@@ -3543,21 +3567,92 @@ err:
delete thd;
mysql_mutex_unlock(&LOCK_thread_count);
}
+
+ delete w->w_rli; // fixme: experimenting
+
my_thread_end();
pthread_exit(0);
DBUG_RETURN(0);
}
+/**
+ Sets up the i-th Worker (counters, job queue, lock/cond, CGEP array)
+ and forks out its thread running handle_slave_worker().
+ @return 0 on success or 1 if the thread could not be created
+*/
+int slave_start_single_worker(Relay_log_info *rli, ulong i)
+{
+ int error= 0;
+ uint k;
+ pthread_t th;
+ Slave_worker *w=
+ Rpl_info_factory::create_worker(opt_rli_repository_id, rli, i); // NOTE(review): result is used below without a NULL check - confirm it cannot fail
+ Slave_job_item empty= {NULL};
+ SLAVE_WORKER_INIT_ARGS worker_args= {rli, w}; // NOTE(review): local object; its address is given to the new thread below
+
+ w->wait_jobs= w->trans_jobs= w->stmt_jobs= w->curr_jobs= 0;
+ w->id= i;
+ w->current_table= NULL;
+ w->usage_partition= 0;
+ w->last_group_done_index= rli->gaq->s; // out of range
+
+ // Queue initialization
+ rli->slave_pending_jobs_max= ::slave_max_pending_jobs; // may change while offline
+ w->jobs.s= rli->slave_pending_jobs_max + 1;
+ my_init_dynamic_array(&w->jobs.Q, sizeof(Slave_job_item), w->jobs.s, 0); // todo: implement increment e.g n * 10;
+ for (k= 0; k < w->jobs.s; k++)
+ insert_dynamic(&w->jobs.Q, (uchar*) &empty); // pre-fill every slot with an empty (NULL data) item
+
+ DBUG_ASSERT(w->jobs.Q.elements == w->jobs.s);
+
+ w->jobs.e= w->jobs.s;
+ w->jobs.len= w->jobs.a= 0;
+
+ set_dynamic(&rli->workers, (uchar*) &w, i);
+ mysql_mutex_init(key_mutex_slave_parallel_worker[i], &w->jobs_lock,
+ MY_MUTEX_INIT_FAST);
+ mysql_cond_init(key_cond_slave_parallel_worker[i], &w->jobs_cond, NULL);
+
+ // CGEP dynarray holds id:s of partitions of the Current being executed Group
+ my_init_dynamic_array(&w->curr_group_exec_parts, NAME_LEN,
+ SLAVE_INIT_DBS_IN_GROUP, 1);
+ if (pthread_create(&th, &connection_attrib, handle_slave_worker,
+ (void*) &worker_args)) // NOTE(review): points to a local that dies when this function returns, and handle_slave_worker() currently reads arg as Master_info* - verify
+ {
+ sql_print_error("Failed during slave worker thread create");
+ error= 1;
+ goto err;
+ }
-/*
- Worker threads start one-by-one with synch through rli->pending_jobs
- returns 0 success or 1 if fails
+err:
+ return error;
+}
+
+/**
+ The @c init_slave_workers number of Worker threads start one-by-one with synch
+ through rli->pending_jobs.
+ Also objects are initialized that Coordinator and Workers will maintain during their
+ session life time.
+
+ @return 0 success or 1 if fails
*/
int slave_start_workers(Relay_log_info *rli, ulong n)
{
uint i;
int error= 0;
+
+ // TODO: CGEP dynarray holds id:s of partitions of the Current being executed Group
+
+ // GAQ queue holds seqno:s of scheduled groups. C polls workers in
+ // @c lwm_checkpoint_period to update GAQ (see @c @next_event())
+ // The length of GAQ is derived from @c slave_max_pending_jobs to guarantee
+ // each assigned job being sent to a WQ will be represented by an item in GAQ.
+ // ::slave_max_pending_jobs is the worst case when all jobs contain
+ // one event and map to one worker.
+ rli->gaq= new Slave_committed_queue(rli->get_group_master_log_name(),
+ sizeof(Slave_assigned_job_group),
+ ::slave_max_pending_jobs);
for (i= 0; i < n; i++)
{
uint k;
@@ -3568,9 +3663,9 @@ int slave_start_workers(Relay_log_info *
w->id= i;
w->current_table= NULL;
w->usage_partition= 0;
+ w->last_group_done_index= rli->gaq->s; // out of range
// Queue initialization
- rli->slave_pending_jobs_max= ::slave_max_pending_jobs; // may change while offline
w->jobs.s= rli->slave_pending_jobs_max + 1;
my_init_dynamic_array(&w->jobs.Q, sizeof(Slave_job_item), w->jobs.s, 0); // todo: implement increment e.g n * 10;
for (k= 0; k < w->jobs.s; k++)
@@ -3610,11 +3705,12 @@ int slave_start_workers(Relay_log_info *
mysql_mutex_unlock(&rli->pending_jobs_lock);
}
- /*
- Error checking is done internally when creating, destroying and using the
- associated hash functions.
- */
- init_hash_workers(n);
+ if (init_hash_workers(n)) // MTS TODO: mapping_db_to_worker -> APH
+ {
+ sql_print_error("Failed to init partitions hash");
+ error= 1;
+ goto err;
+ }
err:
return error;
@@ -3627,6 +3723,7 @@ void slave_stop_workers(Relay_log_info *
{
int i;
+
mysql_mutex_lock(&rli->pending_jobs_lock);
rli->pending_jobs += rli->workers.elements;
@@ -3661,6 +3758,7 @@ void slave_stop_workers(Relay_log_info *
delete w;
}
destroy_hash_workers();
+ delete rli->gaq;
}
/**
@@ -5020,7 +5118,7 @@ static Log_event* next_event(Relay_log_i
read it while we have a lock, to avoid a mutex lock in
inc_event_relay_log_pos()
*/
- rli->set_future_event_relay_log_pos(my_b_tell(cur_log));
+ rli->set_future_event_relay_log_pos(my_b_tell(cur_log)); // TODO_MTS: ev->f_rlp
if (hot_log)
mysql_mutex_unlock(log_lock);
DBUG_RETURN(ev);
@@ -5134,6 +5232,21 @@ static Log_event* next_event(Relay_log_i
mysql_mutex_unlock(&rli->log_space_lock);
mysql_cond_broadcast(&rli->log_space_cond);
// Note that wait_for_update_relay_log unlocks lock_log !
+
+ // TODO: MTS LWM
+ // show-slave-status and
+ // if (rli->is_parallel_exec()
+ // {
+ // do
+ // {
+ // if (curr_clock - last_clock > lwm_period)
+ // last_clock= curr_clock
+ // \foreach W poll i:= W->last_group_done_index
+ // update rli->gaq[i > lwm] and RLI.exec_master_log_pos
+ // ret= mysql_bin_log.wait_for_update_relay_log(thd, lwm_period);
+ // } while ((ret == ETIMEDOUT || ret == ETIME) && && !thd->killed);
+ // }
+
rli->relay_log.wait_for_update_relay_log(rli->info_thd);
// re-acquire data lock since we released it earlier
mysql_mutex_lock(&rli->data_lock);
=== modified file 'sql/sys_vars.cc'
--- a/sql/sys_vars.cc 2010-11-09 13:04:14 +0000
+++ b/sql/sys_vars.cc 2010-11-18 14:00:52 +0000
@@ -3111,8 +3111,8 @@ static Sys_var_ulong Sys_slave_parallel_
VALID_RANGE(0, ULONG_MAX), DEFAULT(4), BLOCK_SIZE(1));
static Sys_var_ulong Sys_slave_max_pending_jobs(
"slave_max_pending_jobs",
- "Number of replication events read out of Relay log and still not applied "
- "at one time. The coordinator thread suspends further jobs assigning until "
+ "Number of replication events read out of Relay log and still not applied. "
+ "The coordinator thread suspends further jobs assigning until "
"conditions have been improved ",
GLOBAL_VAR(slave_max_pending_jobs), CMD_LINE(REQUIRED_ARG),
VALID_RANGE(0, ULONG_MAX), DEFAULT(40000), BLOCK_SIZE(1));
Attachment: [text/bzr-bundle] bzr/andrei.elkin@oracle.com-20101118140052-mqha481cp01dqa6i.bundle
| Thread |
|---|
| • bzr commit into mysql-next-mr.crash-safe branch (andrei.elkin:3211) WL#5569 | Andrei Elkin | 19 Nov |