#At file:///home/andrei/MySQL/BZR/2a-23May/WL/mysql-next-mr-wl5569/ based on revid:andrei.elkin@stripped
3216 Andrei Elkin 2010-11-25
wl#5569
Converting the prototype time db2w hash to be concurrent;
Necessary inruduction of the least occupied Worker notion. It's currently computed
as Worker having the least number of distributed partitions.
Adding parallel support for Query_log_event;
caution: 1. the session/default not the actual db as the key
2. may not have been tested against all use cases (e.g int vars)
Fixing slave stop issues.
@ sql/log_event.cc
dding parallel support for Query_log_event that forces
changes in both Coord and Worker scopes; a query can have both B and g parallel properties.
@ sql/rpl_rli.h
Changes necessary for the concurrent hash.
Although east occupied defined as one having the least number of partitions atm, that may be too coarse so a method basing on distributed jobs can be deployed in a while.
@ sql/rpl_rli_pdb.cc
Least occupied defined as one having the least number of partitions atm (may be too coarse so a method basing on distributed jobs can be deployed in a while).
@ sql/rpl_rli_pdb.h
Changes necessary for the concurrent hash and the parallelizable query-log-event.
@ sql/rpl_slave.cc
rli->least_occupied_workers is prepared to be used in the least occupied calc
as a finer option.
Improving Workers stop.
modified:
mysql-test/suite/rpl/t/rpl_parallel_start_stop.test
sql/log_event.cc
sql/rpl_rli.cc
sql/rpl_rli.h
sql/rpl_rli_pdb.cc
sql/rpl_rli_pdb.h
sql/rpl_slave.cc
=== modified file 'mysql-test/suite/rpl/t/rpl_parallel_start_stop.test'
--- a/mysql-test/suite/rpl/t/rpl_parallel_start_stop.test 2010-11-20 17:23:42 +0000
+++ b/mysql-test/suite/rpl/t/rpl_parallel_start_stop.test 2010-11-25 08:47:39 +0000
@@ -79,8 +79,24 @@ CREATE TABLE t1 (a int primary key);
insert into t1 values (1),(2);
-#connection slave;
-sync_slave_with_master;
+#
+# todo: remove when recovery recovers `sync_slave_with_master'
+#
+
+--sleep 3
+
+--disable_result_log
+--disable_query_log
+#select sleep(600);
+--enable_result_log
+--enable_query_log
+
+connection slave;
+# sync_slave_with_master;
+let $count= 2;
+let $table= t1;
+source include/wait_until_rows_count.inc;
+
# create an offending record
insert into t1 values (3);
@@ -95,6 +111,12 @@ let $count= 0;
let $table= worker_proc_list;
source include/wait_until_rows_count.inc;
+--disable_result_log
+--disable_query_log
+#select sleep(600);
+--enable_result_log
+--enable_query_log
+
source include/wait_for_slave_sql_to_stop.inc;
delete from t1;
@@ -105,8 +127,14 @@ source include/start_slave.inc;
connection master;
drop table t1;
-#connection slave;
-sync_slave_with_master;
+#
+# todo: remove when recovery recovers `sync_slave_with_master'
+#
+
+--sleep 3
+
+connection slave;
+#sync_slave_with_master;
drop view worker_proc_list;
drop view coord_proc_list;
=== modified file 'sql/log_event.cc'
--- a/sql/log_event.cc 2010-11-22 18:57:13 +0000
+++ b/sql/log_event.cc 2010-11-25 08:47:39 +0000
@@ -2202,52 +2202,107 @@ bool Log_event::contains_partition_info(
Slave_worker *Log_event::get_slave_worker_id(Relay_log_info const *rli)
{
Slave_worker *worker= NULL;
- /* checking properties and perform corresponding actions */
+ Slave_job_group g;
+ bool is_b_event;
- // g
- if (contains_partition_info())
- {
- // a lot of things inside `get_slave_worker_id'
- worker= get_slave_worker(get_db(), rli->workers /*, W_c */);
- const_cast<Relay_log_info *>(rli)->last_assigned_worker= worker;
- }
+ /* checking properties and perform corresponding actions */
- // B
- if (starts_group())
+ // B or a DDL
+ if ((is_b_event= starts_group()) || !rli->curr_group_seen_begin)
{
// insert {NULL, W_s} to yield a new Group_cnt indexed element in GAQ
// Group_cnt= rli->gaq->e;
// rli->gaq->en_queue({NULL, W_s});
- Slave_job_group gaq_item=
- {
- //{NULL, log_pos},
- log_pos,
+ g= {
+ log_pos,
+ (ulong) -1,
+ const_cast<Relay_log_info*>(rli)->mts_total_groups++
+ };
- worker->id, // todo: -> NULL and implement set_dynamic in DA
-
- const_cast<Relay_log_info*>(rli)->mts_total_groups++};
-
- rli->gaq->assigned_group_index= rli->gaq->en_queue((void *) &gaq_item);
+ // the last occupied GAQ's array index
+ rli->gaq->assigned_group_index= rli->gaq->en_queue((void *) &g);
DBUG_ASSERT(rli->gaq->assigned_group_index != (ulong) -1); // gaq must have room
+ DBUG_ASSERT(rli->last_assigned_worker == NULL);
+ if (is_b_event)
+ {
+ Log_event *ptr_curr_ev= this;
+ // B-event is appended to the Deferred Array associated with GCAP
+ insert_dynamic(&const_cast<Relay_log_info*>(rli)->curr_group_da,
+ (uchar*) &ptr_curr_ev);
+
+ DBUG_ASSERT(rli->curr_group_da.elements == 1);
+
+ // mark the current grup as started with B-event
+ const_cast<Relay_log_info*>(rli)->curr_group_seen_begin= is_b_event;
+ return NULL;
+ }
+ else
+ {
+ DBUG_ASSERT(!rli->curr_group_seen_begin);
+ }
+ }
+
+ //else // g
- // B-event is appended to the Deferred Array associated with GCAP
- // TODO: refine contains_partition_info() to not include BEGIN
+ if (contains_partition_info())
+ {
+ // a lot of things inside `get_slave_worker_id'
+ const_cast<Relay_log_info *>(rli)->last_assigned_worker=
+ worker= get_slave_worker(get_db(), const_cast<Relay_log_info *>(rli));
+ get_dynamic(&rli->gaq->Q, (uchar*) &g, rli->gaq->assigned_group_index);
+ if (g.worker_id == (ulong) -1) // assign "offically" the current group
+ {
+ g.worker_id= worker->id; // todo/fixme: think of Slave_worker* here
+ set_dynamic(&rli->gaq->Q, (uchar*) &g, rli->gaq->assigned_group_index);
+ }
}
+ else // r
+ if (rli->last_assigned_worker)
+ {
+ worker= rli->last_assigned_worker;
+
+ DBUG_ASSERT(rli->curr_group_assigned_parts.elements > 0); // g must've done
+ }
+ else // p
+ {
+ Log_event *ptr_curr_ev= this;
+
+ // TODO: assert possible event types
+
+ insert_dynamic(&const_cast<Relay_log_info*>(rli)->curr_group_da,
+ (uchar*) &ptr_curr_ev);
+
+ DBUG_ASSERT(rli->curr_group_da.elements > 0);
+ }
// T
- if (ends_group())
+ if (ends_group() || !rli->curr_group_seen_begin)
{
+ uint i;
// assert (\exists P_k . W_s \in CGAP) if P_k is present in ev
mts_group_cnt= rli->gaq->assigned_group_index;
+
+ DBUG_ASSERT(worker == rli->last_assigned_worker);
+ if (!worker)
+ {
+ // a very special case of the empty group: {B, T}
+ DBUG_ASSERT(rli->curr_group_assigned_parts.elements == 0
+ && rli->curr_group_da.elements == 1);
+ worker= get_slave_worker("", const_cast<Relay_log_info *>(rli));
+ }
+
+ // CGAP cleanup
+ for (i= rli->curr_group_assigned_parts.elements; i > 0; i--)
+ delete_dynamic_element(&const_cast<Relay_log_info*>(rli)->
+ curr_group_assigned_parts, i - 1);
+ const_cast<Relay_log_info*>(rli)->last_assigned_worker= NULL;
- // *TODO* cleanup: CGAP := nil
+ // reset the B-group marker
+ const_cast<Relay_log_info*>(rli)->curr_group_seen_begin= FALSE;
}
-
- // todo: p. the first p-event clears CGAP. Each p-event is appened to DA.
-
- // r
- return (rli->last_assigned_worker);
+
+ return worker;
}
static int en_queue(Slave_jobs_queue *jobs, Slave_job_item *item)
@@ -2385,29 +2440,45 @@ void append_item_to_jobs(slave_job_item
*/
int Log_event::apply_event(Relay_log_info const *rli)
{
+ uint i;
DBUG_ENTER("LOG_EVENT:apply_event");
Slave_worker *w= NULL;
Slave_job_item item= {NULL}, *job_item= &item;
+ Relay_log_info *c_rli= const_cast<Relay_log_info*>(rli); // constless alias
if (!rli->is_parallel_exec() || only_serial_exec() /* || wait(APH.N == 0) */)
DBUG_RETURN(do_apply_event(rli));
- if (!(w= get_slave_worker_id(rli)) ||
- DBUG_EVALUATE_IF("fault_injection_get_slave_worker", 1, 0))
- DBUG_RETURN(TRUE);
+ if ((!(w= get_slave_worker_id(rli)) ||
+ DBUG_EVALUATE_IF("fault_injection_get_slave_worker", 1, 0)))
+ DBUG_RETURN(rli->curr_group_assigned_parts.elements == 0 ? FALSE : TRUE);
job_item->data= this;
DBUG_PRINT("Log_event::apply_event:", ("-> job item data %p to W_%lu", job_item->data, w->id));
- // TODO: MTS-APH DA (BEGIN)
- // for (i= 0; i < CGDA.elements; i++)
- // {
- // append_item_to_jobs(job_item, CGDA[i], const_cast<Relay_log_info*>(rli));
- // }
- // todo: resize if elements > min
- // CGDA.elements= 0;
- append_item_to_jobs(job_item, w, const_cast<Relay_log_info*>(rli));
+ if (rli->curr_group_da.elements > 0)
+ {
+ // the current event sorted out which partion the current group belongs to.
+ // It's time now to processed deferred array events.
+ for (i= 0; i < rli->curr_group_da.elements; i++)
+ {
+ Slave_job_item da_item;
+ get_dynamic(&c_rli->curr_group_da, (uchar*) &da_item.data, i);
+ append_item_to_jobs(&da_item, w, c_rli);
+ }
+
+ if (rli->curr_group_da.elements > rli->curr_group_da.max_element)
+ {
+ DBUG_ASSERT(rli->curr_group_da.max_element < rli->curr_group_da.elements);
+ c_rli->curr_group_da.elements= rli->curr_group_da.max_element;
+ c_rli->curr_group_da.max_element= 0;
+ freeze_size(&c_rli->curr_group_da); // restores max_element
+ }
+ c_rli->curr_group_da.elements= 0; // to keep max_element-based allocation
+ }
+
+ append_item_to_jobs(job_item, w, c_rli);
DBUG_RETURN(FALSE);
}
@@ -2486,20 +2557,42 @@ int slave_worker_exec_job(Slave_worker *
if (ev->starts_group())
{
- // ... ? do nothing ?
- }
- if (ev->contains_partition_info())
- {
- // TODO: w->CGEP .= ev->get_db()
- }
- if (ev->ends_group())
+ w->curr_group_seen_begin= TRUE; // The current group is started with B-event
+ }
+ else
{
- w->slave_worker_ends_group(ev->mts_group_cnt);
+ if (ev->contains_partition_info())
+ {
+ DYNAMIC_ARRAY *ep= &w->curr_group_exec_parts;
+ uint i;
+ char key[NAME_LEN + 2];
+ bool found= FALSE;
+ const char *dbname= ev->get_db();
+ uchar dblength= (uint) strlen(dbname);
+
+ for (i= 0; i < ep->elements && !found; i++)
+ {
+ get_dynamic(ep, (uchar*) key, i);
+ found=
+ (key[0] == dblength) &&
+ (strncmp(key + 1, const_cast<char*>(dbname), dblength) == 0);
+ }
+ if (!found)
+ {
+ key[0]= dblength;
+ memcpy(key + 1, dbname, dblength + 1);
+ insert_dynamic(ep, (uchar*) key);
+ }
+ }
}
-
w->w_rli->set_future_event_relay_log_pos(ev->future_event_relay_log_pos);
error= ev->do_apply_event(rli);
+ if (ev->ends_group() || !w->curr_group_seen_begin)
+ {
+ w->slave_worker_ends_group(ev->mts_group_cnt, error); /* last done sets post exec */
+ }
+
mysql_mutex_lock(&w->jobs_lock);
de_queue(&w->jobs, job_item);
/*
@@ -2507,7 +2600,8 @@ int slave_worker_exec_job(Slave_worker *
todo: convert update_pos(w->w_rli) -> update_pos(w)
to remove w_rli w/a
*/
- ev->update_pos(w->w_rli);
+ if (!error)
+ ev->update_pos(w->w_rli);
// delete ev; // after ev->update_pos() event is garbage
=== modified file 'sql/rpl_rli.cc'
--- a/sql/rpl_rli.cc 2010-11-18 14:00:52 +0000
+++ b/sql/rpl_rli.cc 2010-11-25 08:47:39 +0000
@@ -76,7 +76,7 @@ Relay_log_info::Relay_log_info(bool is_s
until_log_pos(0), retried_trans(0),
tables_to_lock(0), tables_to_lock_count(0),
rows_query_ev(NULL), last_event_start_time(0),
- this_worker(NULL),
+ this_worker(NULL), slave_worker_is_error(NULL),
sql_delay(0), sql_delay_end(0),
m_flags(0)
{
@@ -155,7 +155,7 @@ Relay_log_info::~Relay_log_info()
/**
The method can be run both by C having the Main (coord) rli context and
by W having both the main and the Local (worker) rli context.
- Decistion matrix:
+ Decision matrix:
Main Local
-+-------------
=== modified file 'sql/rpl_rli.h'
--- a/sql/rpl_rli.h 2010-11-20 17:23:42 +0000
+++ b/sql/rpl_rli.h 2010-11-25 08:47:39 +0000
@@ -433,12 +433,22 @@ public:
Slave_worker *last_assigned_worker; // a hint to partitioning func for some events
Slave_committed_queue *gaq;
DYNAMIC_ARRAY curr_group_assigned_parts; // CGAP
+ DYNAMIC_ARRAY curr_group_da; // deferred array to hold part-info-free events
+ bool curr_group_seen_begin; // current group started with B-event or not
+ volatile Slave_worker* slave_worker_is_error;
Slave_worker* get_current_worker() const;
Slave_worker* set_this_worker(Slave_worker *w) { return this_worker= w; }
Slave_worker* this_worker; // used by w_rli. The cental rli has it as NULL.
ulonglong mts_total_groups; // total event groups distributed in current session
- ulong least_occupied_worker; // ... todo: APH ...
+
+ /*
+ A sorted array of Worker current assignements number to provide
+ approximate view on Workers loading.
+ The first row of the least occupied Worker is queried at assigning
+ a new partition. Is updated at checkpoint commit to the main RLI.
+ */
+ DYNAMIC_ARRAY least_occupied_workers;
/**
Helper function to do after statement completion.
=== modified file 'sql/rpl_rli_pdb.cc'
--- a/sql/rpl_rli_pdb.cc 2010-11-19 14:51:58 +0000
+++ b/sql/rpl_rli_pdb.cc 2010-11-25 08:47:39 +0000
@@ -155,6 +155,12 @@ size_t Slave_worker::get_number_worker_f
static HASH mapping_db_to_worker;
static bool inited_hash_workers= FALSE;
+PSI_mutex_key key_mutex_slave_worker_hash;
+PSI_cond_key key_cond_slave_worker_hash;
+static mysql_mutex_t slave_worker_hash_lock;
+static mysql_cond_t slave_worker_hash_cond;
+
+
extern "C" uchar *get_key(const uchar *record, size_t *length,
my_bool not_used __attribute__((unused)))
{
@@ -175,17 +181,20 @@ static void free_entry(db_worker *entry)
DBUG_PRINT("info", ("free_entry %s, %d", entry->db, strlen(entry->db)));
- entry->worker->usage_partition--;
my_free((void *) entry->db);
my_free(entry);
DBUG_VOID_RETURN;
}
-
bool init_hash_workers(ulong slave_parallel_workers)
{
DBUG_ENTER("init_hash_workers");
+
+ mysql_mutex_init(key_mutex_slave_worker_hash, &slave_worker_hash_lock,
+ MY_MUTEX_INIT_FAST);
+ mysql_cond_init(key_cond_slave_worker_hash, &slave_worker_hash_cond, NULL);
+
inited_hash_workers=
(my_hash_init(&mapping_db_to_worker, &my_charset_bin,
0, 0, 0, get_key,
@@ -198,6 +207,9 @@ void destroy_hash_workers()
DBUG_ENTER("destroy_hash_workers");
if (inited_hash_workers)
my_hash_free(&mapping_db_to_worker);
+ mysql_mutex_destroy(&slave_worker_hash_lock);
+ mysql_cond_destroy(&slave_worker_hash_cond);
+
DBUG_VOID_RETURN;
}
@@ -217,7 +229,8 @@ void destroy_hash_workers()
W_d := W_c unless W_c is NULL.
- When W_c is NULL it is assigned to a least occupied,
+ When W_c is NULL it is assigned to a least occupied as defined by
+ @c get_least_occupied_worker().
W_d := W_c := W_{least_occupied}
@@ -241,8 +254,12 @@ void destroy_hash_workers()
@return the pointer to a Worker struct
*/
-Slave_worker *get_slave_worker(const char *dbname, DYNAMIC_ARRAY workers /*, W_c */)
+Slave_worker *get_slave_worker(const char *dbname, Relay_log_info *rli)
{
+ uint i;
+ char key[NAME_LEN + 2];
+ DYNAMIC_ARRAY *workers= &rli->workers;
+
DBUG_ENTER("get_slave_worker");
if (!inited_hash_workers)
@@ -250,7 +267,21 @@ Slave_worker *get_slave_worker(const cha
db_worker *entry= NULL;
my_hash_value_type hash_value;
- uint dblength= (uint) strlen(dbname);
+ uchar dblength= (uint) strlen(dbname);
+
+ // Search in CGAP
+ for (i= 0; i < rli->curr_group_assigned_parts.elements; i++)
+ {
+ get_dynamic(&rli->curr_group_assigned_parts, (uchar*) key, i);
+ if ((uchar) key[0] != dblength)
+ continue;
+ else
+ if (strncmp(key + 1, const_cast<char*>(dbname), dblength) == 0)
+ DBUG_RETURN(rli->last_assigned_worker);
+ }
+ key[0]= dblength;
+ memcpy(key + 1, dbname, dblength + 1);
+ insert_dynamic(&rli->curr_group_assigned_parts, (uchar*) key);
DBUG_PRINT("info", ("Searching for %s, %d", dbname, dblength));
/*
@@ -262,10 +293,17 @@ Slave_worker *get_slave_worker(const cha
hash_value= my_calc_hash(&mapping_db_to_worker, (uchar*) dbname,
dblength);
- if (!(entry= (db_worker *)
- my_hash_search_using_hash_value(&mapping_db_to_worker, hash_value,
- (uchar*) dbname, dblength)))
+
+ mysql_mutex_lock(&slave_worker_hash_lock);
+
+ entry= (db_worker *)
+ my_hash_search_using_hash_value(&mapping_db_to_worker, hash_value,
+ (uchar*) dbname, dblength);
+ if (!entry)
{
+ my_bool ret;
+ mysql_mutex_unlock(&slave_worker_hash_lock);
+
DBUG_PRINT("info", ("Inserting %s, %d", dbname, dblength));
/*
Allocate an entry to be inserted and if the operation fails
@@ -281,15 +319,18 @@ Slave_worker *get_slave_worker(const cha
}
strmov(db, dbname);
entry->db= db;
+ entry->usage= 1;
/*
- Get a free worker based on a policy described in the function
- get_free_worker().
+ Unless \exists the last assigned Worker, get a free worker based
+ on a policy described in the function get_least_occupied_worker().
*/
- Slave_worker *worker= get_free_worker(workers);
- entry->worker= worker;
- worker->usage_partition++;
+ entry->worker= !rli->last_assigned_worker ?
+ get_least_occupied_worker(workers) : rli->last_assigned_worker;
- if (my_hash_insert(&mapping_db_to_worker, (uchar*) entry))
+ mysql_mutex_lock(&slave_worker_hash_lock);
+ ret= my_hash_insert(&mapping_db_to_worker, (uchar*) entry);
+ mysql_mutex_unlock(&slave_worker_hash_lock);
+ if (ret)
{
my_free(db);
my_free(entry);
@@ -298,6 +339,45 @@ Slave_worker *get_slave_worker(const cha
}
DBUG_PRINT("info", ("Inserted %s, %d", entry->db, (int) strlen(entry->db)));
}
+ else
+ {
+ /* There is a record. Either */
+ if (entry->usage == 0)
+ {
+ entry->worker= !rli->last_assigned_worker ?
+ get_least_occupied_worker(workers) : rli->last_assigned_worker;
+ entry->usage++;
+ my_hash_update(&mapping_db_to_worker, (uchar*) entry,
+ (uchar*) dbname, dblength);
+ }
+ else if (entry->worker == rli->last_assigned_worker ||
+ !rli->last_assigned_worker)
+ {
+
+ DBUG_ASSERT(entry->worker);
+
+ entry->usage++;
+ my_hash_update(&mapping_db_to_worker, (uchar*) entry,
+ (uchar*) dbname, dblength);
+ }
+ else // may be the hashing conflict
+ {
+ DBUG_ASSERT(rli->last_assigned_worker == NULL ||
+ rli->curr_group_assigned_parts.elements > 1);
+
+ DBUG_ASSERT(0); // ... TODO ... *not* ready yet
+
+ // future assignenment and marking at the same time
+ entry->worker= rli->last_assigned_worker;
+
+ wait();
+
+ DBUG_ASSERT(entry->usage == 0);
+ entry->usage= 1;
+
+ }
+ mysql_mutex_unlock(&slave_worker_hash_lock);
+ }
err:
if (entry)
@@ -306,15 +386,18 @@ err:
DBUG_RETURN(entry ? entry->worker : NULL);
}
-Slave_worker *get_free_worker(DYNAMIC_ARRAY workers)
+/**
+ least_occupied in partition number sense.
+*/
+Slave_worker *get_least_occupied_worker(DYNAMIC_ARRAY *ws)
{
ulong usage= ULONG_MAX;
Slave_worker *current_worker= NULL, *worker= NULL;
ulong i= 0;
- for (i= 0; i< workers.elements; i++)
+ for (i= 0; i< ws->elements; i++)
{
- get_dynamic(const_cast<DYNAMIC_ARRAY*>(&workers), (uchar*) ¤t_worker, i);
+ get_dynamic(ws, (uchar*) ¤t_worker, i);
if (current_worker->usage_partition <= usage)
{
worker= current_worker;
@@ -323,6 +406,11 @@ Slave_worker *get_free_worker(DYNAMIC_AR
}
DBUG_ASSERT(worker != NULL);
+
+ worker->usage_partition++;
+
+ DBUG_ASSERT(worker->usage_partition != 0);
+
return(worker);
}
@@ -332,18 +420,58 @@ Slave_worker *get_free_worker(DYNAMIC_AR
Affected by the being committed group APH tuples are updated.
@c last_group_done_index member is set to the arg value.
- for each D_i in CGEP
- assert (W_id == APH.W_id|P_d == D_i)
- update APH set U-- where P_d = D_i
- if U == 0 \and count(APH) > max
- delete from APH where U = 0
-
- CGEP the Worker partition cache is cleaned up.
+
+ CGEP the Worker partition cache is cleaned up.
+
+ TODO: reclaim space if the actual size exceeds the limit.
*/
-void Slave_worker::slave_worker_ends_group(ulong gaq_idx)
+void Slave_worker::slave_worker_ends_group(ulong gaq_idx, int error)
{
- last_group_done_index = gaq_idx;
+ uint i;
+
+ if (!error)
+ last_group_done_index = gaq_idx;
+
+ for (i= curr_group_exec_parts.elements; i > 0; i--)
+ {
+ db_worker *entry= NULL;
+ my_hash_value_type hash_value;
+ char key[NAME_LEN + 2];
+
+ get_dynamic(&curr_group_exec_parts, (uchar*) key, i - 1);
+ hash_value= my_calc_hash(&mapping_db_to_worker, (uchar*) key + 1, key[0]);
+
+ mysql_mutex_lock(&slave_worker_hash_lock);
+
+ entry= (db_worker *)
+ my_hash_search_using_hash_value(&mapping_db_to_worker, hash_value,
+ (uchar*) key + 1, key[0]);
+
+ DBUG_ASSERT(entry && entry->usage != 0 && entry->worker == this);
+
+ DBUG_ASSERT(strlen(key + 1) == (uchar) key[0]);
+
+ entry->usage--;
+ my_hash_update(&mapping_db_to_worker, (uchar*) entry,
+ (uchar*) key + 1, key[0]);
+
+ if (entry->usage == 0)
+ usage_partition--;
+ else
+ DBUG_ASSERT(usage_partition != 0);
+ /*
+ TODO:
+ if U == 0 \and count(APH) > max
+ delete from APH where U = 0;
+ delete entry;
+ */
+
+ mysql_mutex_unlock(&slave_worker_hash_lock);
+
+ delete_dynamic_element(&curr_group_exec_parts, i - 1);
+ }
+ curr_group_seen_begin= FALSE;
}
@@ -476,6 +604,8 @@ ulong Slave_committed_queue::move_queue_
Slave_job_group g;
ulong l;
get_dynamic(&Q, (uchar *) &g, i);
+ if (g.worker_id == (ulong) -1)
+ break; /* the head is not even assigned */
get_dynamic(ws, (uchar *) &w_i, g.worker_id);
get_dynamic(&last_done, (uchar *) &l, w_i->id);
@@ -491,6 +621,16 @@ ulong Slave_committed_queue::move_queue_
{
ulong ind= de_queue((uchar*) &lwm);
+ /* todo/fixme: the least occupied sorting out can be triggered here */
+ /* e.g
+ set_dynamic(&w_id->c_rli->least_occupied_worker, &w_i->Q.len, w_i->id);
+ sort_dynamic(&w_id->c_rli->least_occupied_worker, (qsort_cmp) ulong_cmp);
+
+ int ulong_cmp(ulong *id1, ulong *id2)
+ {
+ return *id1 < *id2? -1 : (*id1 > *id2? 1 : 0);
+ }
+ */
DBUG_ASSERT(ind == i);
DBUG_ASSERT(g.total_seqno == lwm.total_seqno);
=== modified file 'sql/rpl_rli_pdb.h'
--- a/sql/rpl_rli_pdb.h 2010-11-20 17:23:42 +0000
+++ b/sql/rpl_rli_pdb.h 2010-11-25 08:47:39 +0000
@@ -6,16 +6,24 @@
#include "rpl_rli.h"
#include <my_sys.h>
+/* APH entry */
struct db_worker
{
const char *db;
Slave_worker *worker;
+ ulong usage;
+
+ // todo: relax concurrency after making APH mutex/cond pair has worked
+ // pthread_mutex_t
+ // pthread_cond_t
+ // timestamp updated_at;
+
} typedef db_worker;
bool init_hash_workers(ulong slave_parallel_workers);
void destroy_hash_workers();
-Slave_worker *get_slave_worker(const char *dbname, DYNAMIC_ARRAY workers);
-Slave_worker *get_free_worker(DYNAMIC_ARRAY workers);
+Slave_worker *get_slave_worker(const char *dbname, Relay_log_info *rli);
+Slave_worker *get_least_occupied_worker(DYNAMIC_ARRAY *workers);
#define SLAVE_WORKER_QUEUE_SIZE 8096
#define SLAVE_INIT_DBS_IN_GROUP 4 // initial allocation for CGEP dynarray
@@ -152,7 +160,7 @@ public:
Relay_log_info *w_rli;
DYNAMIC_ARRAY curr_group_exec_parts; // CGEP
-
+ bool curr_group_seen_begin; // is set to TRUE with B-event at Worker exec
// @c last_group_done_index is for recovery, although can be viewed
// as statistics as well.
// C marks a T-event with the incremented group_cnt that is
@@ -193,7 +201,7 @@ public:
size_t get_number_worker_fields();
- void slave_worker_ends_group(ulong); // CGEP walk through to upd APH
+ void slave_worker_ends_group(ulong, int); // CGEP walk through to upd APH
private:
bool read_info(Rpl_info_handler *from);
=== modified file 'sql/rpl_slave.cc'
--- a/sql/rpl_slave.cc 2010-11-22 18:57:13 +0000
+++ b/sql/rpl_slave.cc 2010-11-25 08:47:39 +0000
@@ -1040,7 +1040,7 @@ static bool sql_slave_killed(THD* thd, R
DBUG_ASSERT(rli->info_thd == thd || thd->slave_thread);
DBUG_ASSERT(rli->slave_running == 1 || thd->slave_thread);// tracking buffer overrun
- if (abort_loop || thd->killed || rli->abort_slave)
+ if (abort_loop || thd->killed || rli->abort_slave || rli->slave_worker_is_error)
{
if (thd->transaction.all.modified_non_trans_table && rli->is_in_group())
{
@@ -2508,6 +2508,14 @@ static int sql_delay_event(Log_event *ev
DBUG_RETURN(0);
}
+/**
+ a sort_dynamic function on ulong type
+ returns as specified by @c qsort_cmp
+*/
+int ulong_cmp(ulong *id1, ulong *id2)
+{
+ return *id1 < *id2? -1 : (*id1 > *id2? 1 : 0);
+}
/**
Applies the given event and advances the relay log position.
@@ -2672,10 +2680,24 @@ int apply_event_and_update_pos(Log_event
error= ev->update_pos(rli);
#if 1
- // experimental checkpoint per each scheduling attempt
- // logics of next_event()
+ // experimental checkpoint per each scheduling attempt
+ // logics of next_event()
+ {
+ uint i;
+ rli->gaq->move_queue_head(&rli->workers);
- rli->gaq->move_queue_head(&rli->workers);
+ /* TODO:
+ the least occupied sorting out needs moving to the actual
+ checkpoint location - next_event()
+ */
+ for (i= 0; i < rli->workers.elements; i++)
+ {
+ Slave_worker *w_i;
+ get_dynamic(&rli->workers, (uchar *) &w_i, i);
+ set_dynamic(&rli->least_occupied_workers, (uchar*) &w_i->jobs.len, w_i->id);
+ };
+ sort_dynamic(&rli->least_occupied_workers, (qsort_cmp) ulong_cmp);
+ }
#endif
@@ -3537,8 +3559,9 @@ pthread_handler_t handle_slave_worker(vo
error= slave_worker_exec_job(w, rli);
}
- if (!rli->info_thd->killed)
+ if (error)
{
+ rli->slave_worker_is_error= w;
mysql_mutex_lock(&rli->info_thd->LOCK_thd_data);
rli->info_thd->awake(THD::KILL_QUERY); // notify Crdn
mysql_mutex_unlock(&rli->info_thd->LOCK_thd_data);
@@ -3639,7 +3662,7 @@ int slave_start_single_worker(Relay_log_
// CGEP dynarray holds id:s of partitions of the Current being executed Group
my_init_dynamic_array(&w->curr_group_exec_parts, NAME_LEN,
SLAVE_INIT_DBS_IN_GROUP, 1);
-
+ w->curr_group_seen_begin= FALSE;
if (pthread_create(&th, &connection_attrib, handle_slave_worker,
(void*) w))
{
@@ -3652,6 +3675,8 @@ int slave_start_single_worker(Relay_log_
if (w->jobs.len != 0)
mysql_cond_wait(&w->jobs_cond, &w->jobs_lock);
mysql_mutex_unlock(&w->jobs_lock);
+ // Least occupied inited with zero
+ insert_dynamic(&rli->least_occupied_workers, (uchar*) &w->jobs.len);
err:
return error;
@@ -3664,7 +3689,12 @@ int slave_start_workers(Relay_log_info *
int error= 0;
// CGAP dynarray holds id:s of partitions of the Current being executed Group
- my_init_dynamic_array(&rli->curr_group_assigned_parts, NAME_LEN, SLAVE_INIT_DBS_IN_GROUP, 1);
+ my_init_dynamic_array(&rli->curr_group_assigned_parts, 1 + NAME_LEN + 1, SLAVE_INIT_DBS_IN_GROUP, 1);
+ rli->last_assigned_worker= NULL; /* associated with curr_group_assigned */
+ my_init_dynamic_array(&rli->curr_group_da, sizeof(Log_event*), 8, 2);
+ // Least_occupied_workers array
+ my_init_dynamic_array(&rli->least_occupied_workers,
+ sizeof(Slave_jobs_queue::len), n, 0);
// GAQ queue holds seqno:s of scheduled groups. C polls workers in
// @c lwm_checkpoint_period to update GAQ (see @c next_event())
@@ -3676,15 +3706,16 @@ int slave_start_workers(Relay_log_info *
sizeof(Slave_job_group),
::slave_max_pending_jobs, n);
rli->mts_total_groups= 0;
+ rli->slave_worker_is_error= NULL;
+ rli->curr_group_seen_begin= NULL;
+
for (i= 0; i < n; i++)
{
if ((error= slave_start_single_worker(rli, i)))
- {
goto err;
- }
}
- if (init_hash_workers(n)) // MTS TODO: mapping_db_to_worker -> APH
+ if (init_hash_workers(n)) // MTS: mapping_db_to_worker
{
sql_print_error("Failed to init partitions hash");
error= 1;
@@ -3701,7 +3732,8 @@ err:
void slave_stop_workers(Relay_log_info *rli)
{
int i;
-
+ THD *thd= rli->info_thd;
+
for (i= rli->workers.elements - 1; i >= 0; i--)
{
Slave_worker *w;
@@ -3721,7 +3753,7 @@ void slave_stop_workers(Relay_log_info *
mysql_mutex_unlock(&w->info_thd->LOCK_thd_data);
}
- thd_proc_info(rli->info_thd, "Waiting for workers to exit");
+ thd_proc_info(thd, "Waiting for workers to exit");
for (i= rli->workers.elements - 1; i >= 0; i--)
{
@@ -3729,16 +3761,22 @@ void slave_stop_workers(Relay_log_info *
get_dynamic((DYNAMIC_ARRAY*)&rli->workers, (uchar*) &w, i);
mysql_mutex_lock(&w->jobs_lock);
- if (w->jobs.len != rli->slave_pending_jobs_max + 1)
+ while (w->jobs.len != rli->slave_pending_jobs_max + 1)
+ {
+ const char *save_proc_info;
+ save_proc_info= thd->enter_cond(&w->jobs_cond, &w->jobs_lock,
+ "Waiting for workers to exit");
mysql_cond_wait(&w->jobs_cond, &w->jobs_lock);
+ thd->exit_cond(save_proc_info);
+ mysql_mutex_lock(&w->jobs_lock);
+ }
mysql_mutex_unlock(&w->jobs_lock);
-
mysql_mutex_destroy(&w->jobs_lock);
mysql_cond_destroy(&w->jobs_cond);
DBUG_ASSERT(w->jobs.Q.elements == w->jobs.s);
delete_dynamic(&w->jobs.Q);
-
+ delete_dynamic(&w->curr_group_exec_parts); // GCEP
delete_dynamic_element(&rli->workers, i);
delete w;
}
@@ -3747,6 +3785,9 @@ void slave_stop_workers(Relay_log_info *
destroy_hash_workers();
delete rli->gaq;
+ delete_dynamic(&rli->least_occupied_workers); // least occupied
+ delete_dynamic(&rli->curr_group_da); // GCDA
+ delete_dynamic(&rli->curr_group_assigned_parts); // GCAP
}
/**
Attachment: [text/bzr-bundle] bzr/andrei.elkin@oracle.com-20101125084739-m413cpf2n9kzfntl.bundle
| Thread |
|---|
| • bzr commit into mysql-next-mr.crash-safe branch (andrei.elkin:3216) WL#5569 | Andrei Elkin | 25 Nov |