3036 Alfranio Correia 2010-09-15
Created a sketch of a hash map from database names to workers.
added:
sql/rpl_rli_pdb.cc
sql/rpl_rli_pdb.h
modified:
sql/CMakeLists.txt
sql/Makefile.am
sql/log_event.cc
sql/rpl_rli.cc
sql/rpl_rli.h
sql/rpl_slave.cc
3035 Alfranio Correia 2010-09-15 [merge]
merge mysql-next-mr-wl5563 --> mysql-next-mr.crash-safe
added:
mysql-test/suite/rpl/r/rpl_parallel.result
mysql-test/suite/rpl/t/rpl_parallel.test
modified:
sql/log_event.cc
3034 Alfranio Correia 2010-09-15
Post-merge fix.
modified:
mysql-test/r/mysqld--help-notwin.result
mysql-test/suite/sys_vars/r/all_vars.result
=== added file 'mysql-test/suite/rpl/r/rpl_parallel.result'
--- a/mysql-test/suite/rpl/r/rpl_parallel.result 1970-01-01 00:00:00 +0000
+++ b/mysql-test/suite/rpl/r/rpl_parallel.result 2010-09-15 11:51:49 +0000
@@ -0,0 +1,40 @@
+stop slave;
+drop table if exists t1,t2,t3,t4,t5,t6,t7,t8,t9;
+reset master;
+reset slave;
+drop table if exists t1,t2,t3,t4,t5,t6,t7,t8,t9;
+start slave;
+include/stop_slave.inc
+set @save.slave_exec_mode= @@global.slave_exec_mode;
+set @@global.slave_exec_mode = 'Parallel';
+start slave io_thread;
+create database test3;
+use test3;
+create table tm_nk (a int, b int) engine=myisam;
+create table tm_wk (a int auto_increment primary key, b int) engine=myisam;
+create table ti_nk (a int, b int) engine=innodb;
+create table ti_wk (a int auto_increment primary key, b int) engine=innodb;
+create database test2;
+use test2;
+create table tm_nk (a int, b int) engine=myisam;
+create table tm_wk (a int auto_increment primary key, b int) engine=myisam;
+create table ti_nk (a int, b int) engine=innodb;
+create table ti_wk (a int auto_increment primary key, b int) engine=innodb;
+create database test1;
+use test1;
+create table tm_nk (a int, b int) engine=myisam;
+create table tm_wk (a int auto_increment primary key, b int) engine=myisam;
+create table ti_nk (a int, b int) engine=innodb;
+create table ti_wk (a int auto_increment primary key, b int) engine=innodb;
+create database test0;
+use test0;
+create table tm_nk (a int, b int) engine=myisam;
+create table tm_wk (a int auto_increment primary key, b int) engine=myisam;
+create table ti_nk (a int, b int) engine=innodb;
+create table ti_wk (a int auto_increment primary key, b int) engine=innodb;
+start slave sql_thread;
+Comparing tables master:test0.tm_nk and slave:test0.tm_nk
+Comparing tables master:test0.tm_wk and slave:test0.tm_wk
+Comparing tables master:test0.ti_nk and slave:test0.ti_nk
+Comparing tables master:test0.ti_wk and slave:test0.ti_wk
+set @@global.slave_exec_mode= @save.slave_exec_mode;
=== added file 'mysql-test/suite/rpl/t/rpl_parallel.test'
--- a/mysql-test/suite/rpl/t/rpl_parallel.test 1970-01-01 00:00:00 +0000
+++ b/mysql-test/suite/rpl/t/rpl_parallel.test 2010-09-15 11:51:49 +0000
@@ -0,0 +1,161 @@
+source include/master-slave.inc;
+
+let $workers = 4;
+let $iter = 50;
+
+connection slave;
+source include/stop_slave.inc;
+set @save.slave_exec_mode= @@global.slave_exec_mode;
+set @@global.slave_exec_mode = 'Parallel';
+start slave io_thread;
+
+connection master;
+
+--disable_query_log
+--disable_result_log
+
+use test;
+delimiter |;
+create procedure one_session(k int)
+begin
+ while k > 0 do
+ insert into tm_nk values(k, 0);
+ insert into tm_wk values(null, 0);
+ insert into ti_nk values(k, 0);
+ insert into ti_wk values(null, 0);
+ set k = k - 1;
+ end while;
+end|
+delimiter ;|
+
+--enable_result_log
+--enable_query_log
+
+## let $i = $workers + 1;
+##eval
+# delimiter |;
+# create procedure p1(i int)
+# begin
+# while i > 0
+# ##while ($i)
+# ##{
+# ## let $i1=$i;
+# ## dec $i1;
+# ## use test$i1;
+# ## call on_session();
+# ## dec $i;
+# ##}
+# use test0;
+# call one_session();
+# use test1;
+# call one_session();
+# use test2;
+# call one_session();
+# use test3;
+# call one_session();
+# i= i-1;
+# end while;
+# end|
+# delimiter ;|
+
+
+let $i = $workers + 1;
+while($i)
+{
+ let $i1 = $i;
+ dec $i1;
+
+ eval create database test$i1;
+ eval use test$i1;
+ create table tm_nk (a int, b int) engine=myisam;
+ create table tm_wk (a int auto_increment primary key, b int) engine=myisam;
+ create table ti_nk (a int, b int) engine=innodb;
+ create table ti_wk (a int auto_increment primary key, b int) engine=innodb;
+
+ dec $i;
+}
+
+##call p1(1);
+
+--disable_query_log
+--disable_result_log
+
+#
+# Load producer
+#
+while ($iter)
+{
+ let $i = $workers + 1;
+
+ while ($i)
+ {
+ let $i1 = $i;
+ dec $i1;
+
+ eval use test$i1;
+ ##call test.one_session(1);
+ eval insert into tm_nk values($iter, $i1);
+ eval insert into tm_wk values(null, $i1);
+ eval insert into ti_nk values($iter, $i1);
+ eval insert into ti_wk values(null, $i1);
+
+ dec $i;
+ }
+
+ dec $iter;
+}
+
+--enable_result_log
+--enable_query_log
+
+connection slave;
+
+## todo: record start and end time of appying to compare times of
+# parallel and serial execution.
+
+start slave sql_thread;
+
+--sleep 5 # todo: convert to wait for the last event has been applied
+
+let $diff_table_1=master:test0.tm_nk;
+let $diff_table_2=slave:test0.tm_nk;
+source include/diff_tables.inc;
+
+let $diff_table_1=master:test0.tm_wk;
+let $diff_table_2=slave:test0.tm_wk;
+source include/diff_tables.inc;
+
+let $diff_table_1=master:test0.ti_nk;
+let $diff_table_2=slave:test0.ti_nk;
+source include/diff_tables.inc;
+
+let $diff_table_1=master:test0.ti_wk;
+let $diff_table_2=slave:test0.ti_wk;
+source include/diff_tables.inc;
+
+
+connection master;
+
+--disable_query_log
+--disable_result_log
+
+let $i = $workers + 1;
+while($i)
+{
+ let $i1 = $i;
+ dec $i1;
+
+ #eval drop database test$i1;
+ dec $i;
+}
+
+--enable_result_log
+--enable_query_log
+
+connection slave;
+set @@global.slave_exec_mode= @save.slave_exec_mode;
+
+connection master;
+### select sleep(10000);
+
+# End of 4.1 tests
=== modified file 'sql/CMakeLists.txt'
--- a/sql/CMakeLists.txt 2010-09-13 23:57:59 +0000
+++ b/sql/CMakeLists.txt 2010-09-15 17:26:44 +0000
@@ -105,7 +105,7 @@ ADD_DEPENDENCIES(master GenError)
SET (SLAVE_SOURCE rpl_slave.cc rpl_reporting.cc rpl_mi.cc rpl_rli.cc
rpl_info_handler.cc rpl_info_file.cc rpl_info_table.cc
rpl_info_fields.cc rpl_info.cc rpl_info_factory.cc
- rpl_info_table_access.cc server_ids.h)
+ rpl_info_table_access.cc server_ids.h rpl_rli_pdb.cc)
ADD_LIBRARY(slave ${SLAVE_SOURCE})
ADD_DEPENDENCIES(slave GenError)
ADD_LIBRARY(sqlgunitlib mdl.cc sql_list.cc sql_string.cc thr_malloc.cc)
=== modified file 'sql/Makefile.am'
--- a/sql/Makefile.am 2010-09-06 11:10:01 +0000
+++ b/sql/Makefile.am 2010-09-15 17:26:44 +0000
@@ -111,7 +111,7 @@ noinst_HEADERS = item.h item_func.h item
log.h sql_show.h rpl_info.h rpl_info_file.h \
rpl_info_table.h rpl_rli.h rpl_mi.h rpl_info_fields.h \
rpl_info_table_access.h \
- rpl_info_factory.h server_ids.h \
+ rpl_info_factory.h server_ids.h rpl_rli_pdb.h \
sql_select.h structs.h table.h sql_udf.h hash_filo.h \
lex.h lex_symbol.h sql_acl.h sql_crypt.h sql_base.h \
sql_table.h key.h lock.h thr_malloc.h strfunc.h \
@@ -195,7 +195,7 @@ libmaster_la_SOURCES = rpl_master.cc
libslave_la_SOURCES = rpl_slave.cc rpl_reporting.cc rpl_rli.cc rpl_mi.cc \
rpl_info.cc rpl_info_factory.cc rpl_info_file.cc \
rpl_info_handler.cc rpl_info_table.cc \
- rpl_info_table_access.cc rpl_info_fields.cc
+ rpl_info_table_access.cc rpl_info_fields.cc rpl_rli_pdb.cc
libndb_la_CPPFLAGS= @ndbcluster_includes@
libndb_la_SOURCES= ha_ndbcluster.cc \
ha_ndbcluster_binlog.cc \
=== modified file 'sql/log_event.cc'
--- a/sql/log_event.cc 2010-09-15 01:39:03 +0000
+++ b/sql/log_event.cc 2010-09-15 17:26:44 +0000
@@ -44,6 +44,7 @@
#include "rpl_record.h"
#include "transaction.h"
#include <my_dir.h>
+#include "rpl_rli_pdb.h"
#endif /* MYSQL_CLIENT */
@@ -2108,22 +2109,23 @@ Log_event::continue_group(Relay_log_info
uint Log_event::get_slave_worker_id(Relay_log_info const *rli)
{
- uint ret= (get_type_code() == XID_EVENT ||
- get_type_code() == INTVAR_EVENT ||
- get_type_code() == USER_VAR_EVENT ||
- get_type_code() == RAND_EVENT ||
- get_type_code() == DELETE_ROWS_EVENT ||
- get_type_code() == UPDATE_ROWS_EVENT ||
- get_type_code() == WRITE_ROWS_EVENT) ?
- rli->last_assigned_worker :
- (const_cast<Relay_log_info *>(rli)->last_assigned_worker=
- atoi(get_db() + 4) % rli->workers.elements);
+ if (!(get_type_code() == XID_EVENT ||
+ get_type_code() == INTVAR_EVENT ||
+ get_type_code() == USER_VAR_EVENT ||
+ get_type_code() == RAND_EVENT ||
+ get_type_code() == DELETE_ROWS_EVENT ||
+ get_type_code() == UPDATE_ROWS_EVENT ||
+ get_type_code() == WRITE_ROWS_EVENT))
+ {
+ // TODO --- ANDREI, please you need to take care of errors at this point
+ // worker == NULL means that it was not possible to get an worker.
+ Slave_worker *worker= get_slave_worker(get_db(), rli->workers);
+ const_cast<Relay_log_info *>(rli)->last_assigned_worker= worker->id;
+ }
- return ret;
- //rand() % rli->workers.elements;
+ return (rli->last_assigned_worker);
}
-
void append_item_to_jobs(struct slave_job_item *job_item,
struct slave_worker *w, Relay_log_info *rli)
{
@@ -2275,7 +2277,7 @@ int slave_worker_exec_job(struct slave_w
THD *thd= w->thd;
Log_event *ev= NULL;
- DBUG_ENTER("slave_worker_exec_job()");
+ DBUG_ENTER("slave_worker_exec_job");
if (!(job_item= pop_jobs_item(w)))
{
@@ -2326,6 +2328,7 @@ int slave_worker_exec_job(struct slave_w
(used_ev=
static_cast<Log_event *>(w->data_in_use.first_node()->info))->soiled)
{
+ DBUG_PRINT("slave_worker_exec_job GC:", ("W_%lu, event %p", w->id, used_ev));
delete used_ev;
w->data_in_use.pop();
}
=== modified file 'sql/rpl_rli.cc'
--- a/sql/rpl_rli.cc 2010-09-15 01:39:03 +0000
+++ b/sql/rpl_rli.cc 2010-09-15 17:26:44 +0000
@@ -27,6 +27,7 @@
#include "transaction.h"
#include "sql_parse.h" // end_trans, ROLLBACK
#include "rpl_slave.h"
+#include "rpl_rli_pdb.h"
/*
Please every time you add a new field to the relay log info, update
@@ -96,7 +97,10 @@ Relay_log_info::Relay_log_info(bool is_s
*/
slave_pending_jobs_max= 50000; // TODO: convert into a param
+ // TODO -- ANDREI --- You need to move this to a init routine because it may fail.
my_init_dynamic_array(&workers, sizeof(struct slave_worker *), slave_parallel_workers, 4);
+ init_hash_workers(slave_parallel_workers);
+
DBUG_VOID_RETURN;
}
@@ -110,7 +114,10 @@ Relay_log_info::~Relay_log_info()
mysql_mutex_destroy(&pending_jobs_lock);
mysql_cond_destroy(&pending_jobs_cond);
+
+ // ANDREI ---- See comment on the init routine in the constructor.
//free_root(&jobs_mem_root, MYF(0));
+ destroy_hash_workers();
free_root(&job_list_mem_root, MYF(0));
delete_dynamic(&workers);
=== modified file 'sql/rpl_rli.h'
--- a/sql/rpl_rli.h 2010-09-15 01:39:03 +0000
+++ b/sql/rpl_rli.h 2010-09-15 17:26:44 +0000
@@ -61,6 +61,7 @@ typedef struct slave_worker
ulong stmt_jobs; // how many jobs per stmt
ulong trans_jobs; // how many jobs per trns
volatile int curr_jobs; // the current assignments
+ ulong usage_partition; // number of different partitions handled by this worker
} Slave_worker;
class List_jobs : public List<struct slave_job_item>
@@ -319,7 +320,7 @@ public:
mysql_mutex_t pending_jobs_lock;
mysql_cond_t pending_jobs_cond;
int slave_pending_jobs_max;
- uint last_assigned_worker; // a hint to partitioning func for some events
+ ulong last_assigned_worker; // a hint to partitioning func for some events
/*
Keeper of `free_jobs'. Items in `free_jobs' migrate to workers' assignement
private queues, processed and become back free. At the end of event execution
=== added file 'sql/rpl_rli_pdb.cc'
--- a/sql/rpl_rli_pdb.cc 1970-01-01 00:00:00 +0000
+++ b/sql/rpl_rli_pdb.cc 2010-09-15 17:26:44 +0000
@@ -0,0 +1,133 @@
+#include "my_global.h" /* NO_EMBEDDED_ACCESS_CHECKS */
+#include "sql_priv.h"
+#include "unireg.h"
+#include "rpl_rli_pdb.h"
+#include "sql_string.h"
+#include <hash.h>
+
+HASH mapping_db_to_worker;
+
+extern "C" uchar *get_key(const uchar *record, size_t *length,
+ my_bool not_used __attribute__((unused)))
+{
+ DBUG_ENTER("get_key");
+
+ db_worker *entry=(db_worker *) record;
+ *length= strlen(entry->db);
+
+ DBUG_PRINT("info", ("get_key %s, %d", entry->db, (int) *length));
+
+ DBUG_RETURN((uchar*) entry->db);
+}
+
+
+static void free_entry(db_worker *entry)
+{
+ DBUG_ENTER("free_entry");
+
+ DBUG_PRINT("info", ("free_entry %s, %d", entry->db, strlen(entry->db)));
+
+ entry->worker->usage_partition--;
+ my_free((void *) entry->db);
+ my_free(entry);
+
+ DBUG_VOID_RETURN;
+}
+
+
+int init_hash_workers(ulong slave_parallel_workers)
+{
+ DBUG_ENTER("init_hash_workers");
+ DBUG_RETURN (my_hash_init(&mapping_db_to_worker, &my_charset_bin,
+ 0, 0, 0, get_key,
+ (my_hash_free_key) free_entry, 0) != 0);
+}
+
+void destroy_hash_workers()
+{
+ DBUG_ENTER("destroy_hash_workers");
+ my_hash_free(&mapping_db_to_worker);
+ DBUG_VOID_RETURN;
+}
+
+Slave_worker *get_slave_worker(const char *dbname, DYNAMIC_ARRAY workers)
+{
+ db_worker *entry= NULL;
+ my_hash_value_type hash_value;
+ uint dblength= (uint) strlen(dbname);
+
+ DBUG_ENTER("get_slave_worker");
+
+ DBUG_PRINT("info", ("Searching for %s, %d", dbname, dblength));
+ /*
+ The database name was not found which means that a worker never
+ processed events from that database. In such case, we need to
+ map the database to a worker my inserting an entry into the
+ hash map.
+ */
+
+ hash_value= my_calc_hash(&mapping_db_to_worker, (uchar*) dbname,
+ dblength);
+ if (!(entry= (db_worker *)
+ my_hash_search_using_hash_value(&mapping_db_to_worker, hash_value,
+ (uchar*) dbname, dblength)))
+ {
+ DBUG_PRINT("info", ("Inserting %s, %d", dbname, dblength));
+ /*
+ Allocate an entry to be inserted and if the operation fails
+ an error is returned.
+ */
+ char *db= NULL;
+ if (!(db= (char *) my_malloc((size_t)dblength, MYF(0))))
+ goto err;
+ if (!(entry= (db_worker *) my_malloc(sizeof(db_worker), MYF(0))))
+ {
+ my_free(db);
+ goto err;
+ }
+ strmov(db, dbname);
+ entry->db= db;
+ /*
+ Get a free worker based on a policy described in the function
+ get_free_worker().
+ */
+ Slave_worker *worker= get_free_worker(workers);
+ entry->worker= worker;
+ worker->usage_partition++;
+
+ if (my_hash_insert(&mapping_db_to_worker, (uchar*) entry))
+ {
+ my_free(db);
+ my_free(entry);
+ entry= NULL;
+ goto err;
+ }
+ DBUG_PRINT("info", ("Inserted %s, %d", entry->db, (int) strlen(entry->db)));
+ }
+
+err:
+ if (entry)
+ DBUG_PRINT("info", ("Updating %s with worker %lu", entry->db, entry->worker->id));
+
+ DBUG_RETURN(entry ? entry->worker : NULL);
+}
+
+Slave_worker *get_free_worker(DYNAMIC_ARRAY workers)
+{
+ ulong usage= ULONG_MAX;
+ Slave_worker *current_worker= NULL, *worker= NULL;
+ ulong i= 0;
+
+ for (i= 0; i< workers.elements; i++)
+ {
+ get_dynamic(const_cast<DYNAMIC_ARRAY*>(&workers), (uchar*) ¤t_worker, i);
+ if (current_worker->usage_partition <= usage)
+ {
+ worker= current_worker;
+ usage= current_worker->usage_partition;
+ }
+ }
+
+ DBUG_ASSERT(worker != NULL);
+ return(worker);
+}
=== added file 'sql/rpl_rli_pdb.h'
--- a/sql/rpl_rli_pdb.h 1970-01-01 00:00:00 +0000
+++ b/sql/rpl_rli_pdb.h 2010-09-15 17:26:44 +0000
@@ -0,0 +1,17 @@
+#ifndef RPL_RLI_PDB_H
+ #include "sql_string.h"
+ #include "rpl_rli.h"
+ #include <my_sys.h>
+
+ struct db_worker
+ {
+ const char *db;
+ Slave_worker *worker;
+ } typedef db_worker;
+
+ int init_hash_workers(ulong slave_parallel_workers);
+ void destroy_hash_workers();
+ Slave_worker *get_slave_worker(const char *dbname, DYNAMIC_ARRAY workers);
+ Slave_worker *get_free_worker(DYNAMIC_ARRAY workers);
+
+#endif
=== modified file 'sql/rpl_slave.cc'
--- a/sql/rpl_slave.cc 2010-09-15 01:39:03 +0000
+++ b/sql/rpl_slave.cc 2010-09-15 17:26:44 +0000
@@ -3467,6 +3467,7 @@ int slave_start_workers(Relay_log_info *
w->wait_jobs= w->trans_jobs= w->stmt_jobs= w->curr_jobs= 0;
w->id= i;
w->current_table= NULL;
+ w->usage_partition= 0;
init_sql_alloc(&w->mem_root, SLAVE_WORKER_QUEUE_SIZE, 0);
set_dynamic(&rli->workers, (uchar*) &w, i);
mysql_mutex_init(key_mutex_slave_parallel_worker[i], &w->jobs_lock,
Attachment: [text/bzr-bundle] bzr/alfranio.correia@oracle.com-20100915172644-557zdkqb0ockf0r5.bundle
| Thread |
|---|
| • bzr push into mysql-next-mr-rpl-merge branch (alfranio.correia:3034 to 3036) | Alfranio Correia | 15 Sep |