List:Commits« Previous MessageNext Message »
From:Alfranio Correia Date:September 15 2010 5:27pm
Subject:bzr push into mysql-next-mr-rpl-merge branch (alfranio.correia:3034 to 3036)
View as plain text  
 3036 Alfranio Correia	2010-09-15
      Created a sketch of a hash map from database names to workers.

    added:
      sql/rpl_rli_pdb.cc
      sql/rpl_rli_pdb.h
    modified:
      sql/CMakeLists.txt
      sql/Makefile.am
      sql/log_event.cc
      sql/rpl_rli.cc
      sql/rpl_rli.h
      sql/rpl_slave.cc
 3035 Alfranio Correia	2010-09-15 [merge]
      merge mysql-next-mr-wl5563 --> mysql-next-mr.crash-safe

    added:
      mysql-test/suite/rpl/r/rpl_parallel.result
      mysql-test/suite/rpl/t/rpl_parallel.test
    modified:
      sql/log_event.cc
 3034 Alfranio Correia	2010-09-15
      Post-merge fix.

    modified:
      mysql-test/r/mysqld--help-notwin.result
      mysql-test/suite/sys_vars/r/all_vars.result
=== added file 'mysql-test/suite/rpl/r/rpl_parallel.result'
--- a/mysql-test/suite/rpl/r/rpl_parallel.result	1970-01-01 00:00:00 +0000
+++ b/mysql-test/suite/rpl/r/rpl_parallel.result	2010-09-15 11:51:49 +0000
@@ -0,0 +1,40 @@
+stop slave;
+drop table if exists t1,t2,t3,t4,t5,t6,t7,t8,t9;
+reset master;
+reset slave;
+drop table if exists t1,t2,t3,t4,t5,t6,t7,t8,t9;
+start slave;
+include/stop_slave.inc
+set @save.slave_exec_mode= @@global.slave_exec_mode;
+set @@global.slave_exec_mode = 'Parallel';
+start slave io_thread;
+create database test3;
+use test3;
+create table tm_nk (a int, b int) engine=myisam;
+create table tm_wk (a int auto_increment primary key, b int) engine=myisam;
+create table ti_nk (a int, b int) engine=innodb;
+create table ti_wk (a int auto_increment primary key, b int) engine=innodb;
+create database test2;
+use test2;
+create table tm_nk (a int, b int) engine=myisam;
+create table tm_wk (a int auto_increment primary key, b int) engine=myisam;
+create table ti_nk (a int, b int) engine=innodb;
+create table ti_wk (a int auto_increment primary key, b int) engine=innodb;
+create database test1;
+use test1;
+create table tm_nk (a int, b int) engine=myisam;
+create table tm_wk (a int auto_increment primary key, b int) engine=myisam;
+create table ti_nk (a int, b int) engine=innodb;
+create table ti_wk (a int auto_increment primary key, b int) engine=innodb;
+create database test0;
+use test0;
+create table tm_nk (a int, b int) engine=myisam;
+create table tm_wk (a int auto_increment primary key, b int) engine=myisam;
+create table ti_nk (a int, b int) engine=innodb;
+create table ti_wk (a int auto_increment primary key, b int) engine=innodb;
+start slave sql_thread;
+Comparing tables master:test0.tm_nk and slave:test0.tm_nk
+Comparing tables master:test0.tm_wk and slave:test0.tm_wk
+Comparing tables master:test0.ti_nk and slave:test0.ti_nk
+Comparing tables master:test0.ti_wk and slave:test0.ti_wk
+set @@global.slave_exec_mode= @save.slave_exec_mode;

=== added file 'mysql-test/suite/rpl/t/rpl_parallel.test'
--- a/mysql-test/suite/rpl/t/rpl_parallel.test	1970-01-01 00:00:00 +0000
+++ b/mysql-test/suite/rpl/t/rpl_parallel.test	2010-09-15 11:51:49 +0000
@@ -0,0 +1,161 @@
+source include/master-slave.inc;
+
+let $workers = 4;
+let $iter = 50;
+
+connection slave;
+source include/stop_slave.inc;
+set @save.slave_exec_mode= @@global.slave_exec_mode;
+set @@global.slave_exec_mode = 'Parallel';
+start slave io_thread;
+
+connection master;
+
+--disable_query_log
+--disable_result_log
+
+use test;
+delimiter |;
+create procedure one_session(k int)
+begin
+  while k > 0 do
+    insert into tm_nk values(k, 0);
+    insert into tm_wk values(null, 0);
+    insert into ti_nk values(k, 0);
+    insert into ti_wk values(null, 0);
+    set k = k - 1;
+  end while;
+end|
+delimiter ;|
+
+--enable_result_log
+--enable_query_log
+
+## let $i = $workers + 1;
+##eval 
+# delimiter |;
+# create procedure p1(i int)
+# begin
+#     while i > 0
+#       ##while ($i)
+#       ##{
+#       ## let $i1=$i;
+#       ## dec $i1;
+#       ## use test$i1;
+#       ## call on_session();
+#       ## dec $i;
+#       ##}
+#       use test0;
+#       call one_session();
+#       use test1;
+#       call one_session();
+#       use test2;
+#       call one_session();
+#       use test3;
+#       call one_session();
+#       i= i-1;
+#     end while;
+# end|
+# delimiter ;|
+
+
+let $i = $workers + 1;
+while($i)
+{
+  let $i1 = $i;
+  dec $i1;
+  
+  eval create database test$i1;
+  eval use test$i1;
+  create table tm_nk (a int, b int) engine=myisam;
+  create table tm_wk (a int auto_increment primary key, b int) engine=myisam;
+  create table ti_nk (a int, b int) engine=innodb;
+  create table ti_wk (a int auto_increment primary key, b int) engine=innodb;
+
+  dec $i;
+}
+
+##call p1(1);
+
+--disable_query_log
+--disable_result_log
+
+#
+# Load producer
+#
+while ($iter)
+{
+    let $i = $workers + 1;
+
+    while ($i)
+    {
+	let $i1 = $i;
+	dec $i1;
+
+	eval use test$i1;
+	##call test.one_session(1);
+	eval insert into tm_nk values($iter, $i1);
+	eval insert into tm_wk values(null, $i1);
+	eval insert into ti_nk values($iter, $i1);
+	eval insert into ti_wk values(null, $i1);
+    
+	dec $i;
+    }
+
+    dec $iter;
+}
+
+--enable_result_log
+--enable_query_log
+
+connection slave;
+
+## todo: record start and end time of appying to compare times of
+#  parallel and serial execution.
+
+start slave sql_thread;
+
+--sleep 5  # todo: convert to wait for the last event has been applied
+
+let $diff_table_1=master:test0.tm_nk;
+let $diff_table_2=slave:test0.tm_nk;
+source include/diff_tables.inc;
+
+let $diff_table_1=master:test0.tm_wk;
+let $diff_table_2=slave:test0.tm_wk;
+source include/diff_tables.inc;
+
+let $diff_table_1=master:test0.ti_nk;
+let $diff_table_2=slave:test0.ti_nk;
+source include/diff_tables.inc;
+
+let $diff_table_1=master:test0.ti_wk;
+let $diff_table_2=slave:test0.ti_wk;
+source include/diff_tables.inc;
+
+
+connection master;
+
+--disable_query_log
+--disable_result_log
+
+let $i = $workers + 1;
+while($i)
+{
+  let $i1 = $i;
+  dec $i1;
+
+  #eval drop database test$i1;
+  dec $i;
+}
+
+--enable_result_log
+--enable_query_log
+
+connection slave;
+set @@global.slave_exec_mode= @save.slave_exec_mode;
+
+connection master;
+### select sleep(10000);
+
+# End of 4.1 tests

=== modified file 'sql/CMakeLists.txt'
--- a/sql/CMakeLists.txt	2010-09-13 23:57:59 +0000
+++ b/sql/CMakeLists.txt	2010-09-15 17:26:44 +0000
@@ -105,7 +105,7 @@ ADD_DEPENDENCIES(master GenError)
 SET (SLAVE_SOURCE rpl_slave.cc rpl_reporting.cc rpl_mi.cc rpl_rli.cc
 		  rpl_info_handler.cc rpl_info_file.cc rpl_info_table.cc
 		  rpl_info_fields.cc rpl_info.cc rpl_info_factory.cc
-		  rpl_info_table_access.cc server_ids.h)
+		  rpl_info_table_access.cc server_ids.h rpl_rli_pdb.cc)
 ADD_LIBRARY(slave ${SLAVE_SOURCE})
 ADD_DEPENDENCIES(slave GenError)
 ADD_LIBRARY(sqlgunitlib mdl.cc sql_list.cc sql_string.cc thr_malloc.cc)

=== modified file 'sql/Makefile.am'
--- a/sql/Makefile.am	2010-09-06 11:10:01 +0000
+++ b/sql/Makefile.am	2010-09-15 17:26:44 +0000
@@ -111,7 +111,7 @@ noinst_HEADERS =	item.h item_func.h item
 			log.h sql_show.h rpl_info.h rpl_info_file.h \
 			rpl_info_table.h rpl_rli.h rpl_mi.h rpl_info_fields.h \
 			rpl_info_table_access.h \
-			rpl_info_factory.h server_ids.h \
+			rpl_info_factory.h server_ids.h rpl_rli_pdb.h \
 			sql_select.h structs.h table.h sql_udf.h hash_filo.h \
 			lex.h lex_symbol.h sql_acl.h sql_crypt.h sql_base.h \
 			sql_table.h key.h lock.h thr_malloc.h strfunc.h \
@@ -195,7 +195,7 @@ libmaster_la_SOURCES =	rpl_master.cc
 libslave_la_SOURCES = 	rpl_slave.cc rpl_reporting.cc rpl_rli.cc rpl_mi.cc \
 			rpl_info.cc rpl_info_factory.cc rpl_info_file.cc \
 			rpl_info_handler.cc rpl_info_table.cc \
-			rpl_info_table_access.cc rpl_info_fields.cc
+			rpl_info_table_access.cc rpl_info_fields.cc rpl_rli_pdb.cc
 libndb_la_CPPFLAGS=	@ndbcluster_includes@
 libndb_la_SOURCES=	ha_ndbcluster.cc \
 			ha_ndbcluster_binlog.cc \

=== modified file 'sql/log_event.cc'
--- a/sql/log_event.cc	2010-09-15 01:39:03 +0000
+++ b/sql/log_event.cc	2010-09-15 17:26:44 +0000
@@ -44,6 +44,7 @@
 #include "rpl_record.h"
 #include "transaction.h"
 #include <my_dir.h>
+#include "rpl_rli_pdb.h"
 
 #endif /* MYSQL_CLIENT */
 
@@ -2108,22 +2109,23 @@ Log_event::continue_group(Relay_log_info
 
 uint Log_event::get_slave_worker_id(Relay_log_info const *rli)
 {
-  uint ret= (get_type_code() == XID_EVENT  ||
-             get_type_code() == INTVAR_EVENT ||
-             get_type_code() == USER_VAR_EVENT ||
-             get_type_code() == RAND_EVENT ||
-             get_type_code() == DELETE_ROWS_EVENT ||
-             get_type_code() == UPDATE_ROWS_EVENT ||
-             get_type_code() == WRITE_ROWS_EVENT) ?
-    rli->last_assigned_worker :
-    (const_cast<Relay_log_info *>(rli)->last_assigned_worker=
-     atoi(get_db() + 4) % rli->workers.elements);
+  if (!(get_type_code() == XID_EVENT  ||
+    get_type_code() == INTVAR_EVENT ||
+    get_type_code() == USER_VAR_EVENT ||
+    get_type_code() == RAND_EVENT ||
+    get_type_code() == DELETE_ROWS_EVENT ||
+    get_type_code() == UPDATE_ROWS_EVENT ||
+    get_type_code() == WRITE_ROWS_EVENT))
+  {
+    // TODO --- ANDREI, please you need to take care of errors at this point
+    // worker == NULL means that it was not possible to get an worker.
+    Slave_worker *worker= get_slave_worker(get_db(), rli->workers);
+    const_cast<Relay_log_info *>(rli)->last_assigned_worker= worker->id;
+  }
 
-  return ret;
-  //rand() % rli->workers.elements;
+  return (rli->last_assigned_worker);
 }
 
-
 void append_item_to_jobs(struct slave_job_item *job_item,
                           struct slave_worker *w, Relay_log_info *rli)
 {
@@ -2275,7 +2277,7 @@ int slave_worker_exec_job(struct slave_w
   THD *thd= w->thd;
   Log_event *ev= NULL;
 
-  DBUG_ENTER("slave_worker_exec_job()");
+  DBUG_ENTER("slave_worker_exec_job");
 
   if (!(job_item= pop_jobs_item(w)))
   {
@@ -2326,6 +2328,7 @@ int slave_worker_exec_job(struct slave_w
            (used_ev=
             static_cast<Log_event *>(w->data_in_use.first_node()->info))->soiled)
     {
+      DBUG_PRINT("slave_worker_exec_job GC:", ("W_%lu, event %p", w->id, used_ev));
       delete used_ev;
       w->data_in_use.pop();
     }

=== modified file 'sql/rpl_rli.cc'
--- a/sql/rpl_rli.cc	2010-09-15 01:39:03 +0000
+++ b/sql/rpl_rli.cc	2010-09-15 17:26:44 +0000
@@ -27,6 +27,7 @@
 #include "transaction.h"
 #include "sql_parse.h"                          // end_trans, ROLLBACK
 #include "rpl_slave.h"
+#include "rpl_rli_pdb.h"
 
 /*
   Please every time you add a new field to the relay log info, update
@@ -96,7 +97,10 @@ Relay_log_info::Relay_log_info(bool is_s
   */
   slave_pending_jobs_max= 50000; // TODO: convert into a param
 
+  // TODO -- ANDREI --- You need to move this to a init routine because it may fail.
   my_init_dynamic_array(&workers, sizeof(struct slave_worker *), slave_parallel_workers, 4);
+  init_hash_workers(slave_parallel_workers);
+
   DBUG_VOID_RETURN;
 }
 
@@ -110,7 +114,10 @@ Relay_log_info::~Relay_log_info()
 
   mysql_mutex_destroy(&pending_jobs_lock);
   mysql_cond_destroy(&pending_jobs_cond);
+
+  // ANDREI ---- See comment on the init routine in the constructor.
   //free_root(&jobs_mem_root, MYF(0));
+  destroy_hash_workers();
   free_root(&job_list_mem_root, MYF(0));
   delete_dynamic(&workers);
 

=== modified file 'sql/rpl_rli.h'
--- a/sql/rpl_rli.h	2010-09-15 01:39:03 +0000
+++ b/sql/rpl_rli.h	2010-09-15 17:26:44 +0000
@@ -61,6 +61,7 @@ typedef struct slave_worker
   ulong stmt_jobs;  // how many jobs per stmt
   ulong trans_jobs;  // how many jobs per trns
   volatile int curr_jobs; // the current assignments
+  ulong usage_partition; // number of different partitions handled by this worker
 } Slave_worker;
 
 class List_jobs : public List<struct slave_job_item>
@@ -319,7 +320,7 @@ public:
   mysql_mutex_t pending_jobs_lock;
   mysql_cond_t pending_jobs_cond;
   int   slave_pending_jobs_max;
-  uint  last_assigned_worker; // a hint to partitioning func for some events 
+  ulong  last_assigned_worker; // a hint to partitioning func for some events
   /*
     Keeper of `free_jobs'. Items in `free_jobs' migrate to workers' assignement
     private queues, processed and become back free. At the end of event execution

=== added file 'sql/rpl_rli_pdb.cc'
--- a/sql/rpl_rli_pdb.cc	1970-01-01 00:00:00 +0000
+++ b/sql/rpl_rli_pdb.cc	2010-09-15 17:26:44 +0000
@@ -0,0 +1,133 @@
+#include "my_global.h"                          /* NO_EMBEDDED_ACCESS_CHECKS */
+#include "sql_priv.h"
+#include "unireg.h"
+#include "rpl_rli_pdb.h"
+#include "sql_string.h"
+#include <hash.h>
+
+HASH mapping_db_to_worker;
+
+extern "C" uchar *get_key(const uchar *record, size_t *length,
+                          my_bool not_used __attribute__((unused)))
+{
+  DBUG_ENTER("get_key");
+
+  db_worker *entry=(db_worker *) record;
+  *length= strlen(entry->db);
+
+  DBUG_PRINT("info", ("get_key  %s, %d", entry->db, (int) *length));
+
+  DBUG_RETURN((uchar*) entry->db);
+}
+
+
+static void free_entry(db_worker *entry)
+{
+  DBUG_ENTER("free_entry");
+
+  DBUG_PRINT("info", ("free_entry %s, %d", entry->db, strlen(entry->db)));
+
+  entry->worker->usage_partition--;
+  my_free((void *) entry->db);
+  my_free(entry);
+
+  DBUG_VOID_RETURN;
+}
+
+
+int init_hash_workers(ulong slave_parallel_workers)
+{
+  DBUG_ENTER("init_hash_workers");
+  DBUG_RETURN (my_hash_init(&mapping_db_to_worker, &my_charset_bin,
+                            0, 0, 0, get_key,
+                            (my_hash_free_key) free_entry, 0) != 0);
+}
+
+void destroy_hash_workers()
+{
+  DBUG_ENTER("destroy_hash_workers");
+  my_hash_free(&mapping_db_to_worker);
+  DBUG_VOID_RETURN;
+}
+
+Slave_worker *get_slave_worker(const char *dbname, DYNAMIC_ARRAY workers)
+{
+  db_worker *entry= NULL;
+  my_hash_value_type hash_value;
+  uint dblength= (uint) strlen(dbname);
+
+  DBUG_ENTER("get_slave_worker");
+
+  DBUG_PRINT("info", ("Searching for %s, %d", dbname, dblength));
+  /*
+    The database name was not found which means that a worker never
+    processed events from that database. In such case, we need to
+    map the database to a worker my inserting an entry into the
+    hash map.
+  */
+
+  hash_value= my_calc_hash(&mapping_db_to_worker, (uchar*) dbname,
+                           dblength);
+  if (!(entry= (db_worker *)
+      my_hash_search_using_hash_value(&mapping_db_to_worker, hash_value,
+      (uchar*) dbname, dblength)))
+  {
+    DBUG_PRINT("info", ("Inserting %s, %d", dbname, dblength));
+    /*
+      Allocate an entry to be inserted and if the operation fails
+      an error is returned.
+    */
+    char *db= NULL;
+    if (!(db= (char *) my_malloc((size_t)dblength, MYF(0))))
+      goto err;
+    if (!(entry= (db_worker *) my_malloc(sizeof(db_worker), MYF(0))))
+    {
+      my_free(db);
+      goto err;
+    }
+    strmov(db, dbname);
+    entry->db= db;
+    /*
+      Get a free worker based on a policy described in the function
+      get_free_worker().
+    */
+    Slave_worker *worker= get_free_worker(workers);
+    entry->worker= worker;
+    worker->usage_partition++;
+
+    if (my_hash_insert(&mapping_db_to_worker, (uchar*) entry))
+    {
+      my_free(db);
+      my_free(entry);
+      entry= NULL;
+      goto err;
+    }
+    DBUG_PRINT("info", ("Inserted %s, %d", entry->db, (int) strlen(entry->db)));
+  }
+
+err:
+  if (entry)
+    DBUG_PRINT("info", ("Updating %s with worker %lu", entry->db, entry->worker->id));
+    
+  DBUG_RETURN(entry ? entry->worker : NULL);
+}
+
+Slave_worker *get_free_worker(DYNAMIC_ARRAY workers)
+{
+  ulong usage= ULONG_MAX;
+  Slave_worker *current_worker= NULL, *worker= NULL;
+  ulong i= 0;
+
+  for (i= 0; i< workers.elements; i++)
+  {
+    get_dynamic(const_cast<DYNAMIC_ARRAY*>(&workers), (uchar*) &current_worker, i);
+    if (current_worker->usage_partition <= usage)
+    {
+      worker= current_worker;
+      usage= current_worker->usage_partition;
+    }
+  }
+  
+  DBUG_ASSERT(worker != NULL);
+  return(worker);
+}

=== added file 'sql/rpl_rli_pdb.h'
--- a/sql/rpl_rli_pdb.h	1970-01-01 00:00:00 +0000
+++ b/sql/rpl_rli_pdb.h	2010-09-15 17:26:44 +0000
@@ -0,0 +1,17 @@
+#ifndef RPL_RLI_PDB_H
+  #include "sql_string.h"
+  #include "rpl_rli.h"
+  #include <my_sys.h>
+
+  struct db_worker
+  {
+    const char *db;
+    Slave_worker *worker;
+  } typedef db_worker;
+
+  int init_hash_workers(ulong slave_parallel_workers);
+  void destroy_hash_workers();
+  Slave_worker *get_slave_worker(const char *dbname, DYNAMIC_ARRAY workers);
+  Slave_worker *get_free_worker(DYNAMIC_ARRAY workers);
+
+#endif

=== modified file 'sql/rpl_slave.cc'
--- a/sql/rpl_slave.cc	2010-09-15 01:39:03 +0000
+++ b/sql/rpl_slave.cc	2010-09-15 17:26:44 +0000
@@ -3467,6 +3467,7 @@ int slave_start_workers(Relay_log_info *
     w->wait_jobs= w->trans_jobs= w->stmt_jobs= w->curr_jobs= 0;
     w->id= i;
     w->current_table= NULL;
+    w->usage_partition= 0;
     init_sql_alloc(&w->mem_root, SLAVE_WORKER_QUEUE_SIZE, 0);
     set_dynamic(&rli->workers, (uchar*) &w, i);
     mysql_mutex_init(key_mutex_slave_parallel_worker[i], &w->jobs_lock,


Attachment: [text/bzr-bundle] bzr/alfranio.correia@oracle.com-20100915172644-557zdkqb0ockf0r5.bundle
Thread
bzr push into mysql-next-mr-rpl-merge branch (alfranio.correia:3034 to 3036)Alfranio Correia15 Sep