List:Commits« Previous MessageNext Message »
From:Andrei Elkin Date:September 17 2010 8:49am
Subject:bzr commit into mysql-next-mr branch (aelkin:3200) WL#5563
View as plain text  
#At file:///home/andrei/MySQL/BZR/2a-23May/WL/wl5563-paraslave_part_db/ based on revid:aelkin@stripped

 3200 Andrei Elkin	2010-09-17
      wl#5563
      
      improved test;
      fixed a delete issue that was used to crash;
      added @@global.slave_local_timestamp to fill in timestamp col slave clock value.
      Performance growth can be seen through the test.
      
      todo: merge with Alfranio work on hashing and dyn alloc of PFS obj:s.

    modified:
      mysql-test/suite/rpl/r/rpl_parallel.result
      mysql-test/suite/rpl/t/rpl_parallel.test
      sql/log_event.cc
      sql/mysqld.cc
      sql/mysqld.h
      sql/rpl_rli.h
      sql/rpl_slave.cc
      sql/sys_vars.cc
=== modified file 'mysql-test/suite/rpl/r/rpl_parallel.result'
--- a/mysql-test/suite/rpl/r/rpl_parallel.result	2010-09-15 11:51:49 +0000
+++ b/mysql-test/suite/rpl/r/rpl_parallel.result	2010-09-17 08:49:00 +0000
@@ -4,37 +4,16 @@ reset master;
 reset slave;
 drop table if exists t1,t2,t3,t4,t5,t6,t7,t8,t9;
 start slave;
+call mtr.add_suppression('Parallel Slave: pending jobs get to the max');
 include/stop_slave.inc
 set @save.slave_exec_mode= @@global.slave_exec_mode;
 set @@global.slave_exec_mode = 'Parallel';
-start slave io_thread;
-create database test3;
-use test3;
-create table tm_nk (a int, b int) engine=myisam;
-create table tm_wk (a int auto_increment primary key, b int) engine=myisam;
-create table ti_nk (a int, b int) engine=innodb;
-create table ti_wk (a int auto_increment primary key, b int) engine=innodb;
-create database test2;
-use test2;
-create table tm_nk (a int, b int) engine=myisam;
-create table tm_wk (a int auto_increment primary key, b int) engine=myisam;
-create table ti_nk (a int, b int) engine=innodb;
-create table ti_wk (a int auto_increment primary key, b int) engine=innodb;
-create database test1;
-use test1;
-create table tm_nk (a int, b int) engine=myisam;
-create table tm_wk (a int auto_increment primary key, b int) engine=myisam;
-create table ti_nk (a int, b int) engine=innodb;
-create table ti_wk (a int auto_increment primary key, b int) engine=innodb;
-create database test0;
-use test0;
-create table tm_nk (a int, b int) engine=myisam;
-create table tm_wk (a int auto_increment primary key, b int) engine=myisam;
-create table ti_nk (a int, b int) engine=innodb;
-create table ti_wk (a int auto_increment primary key, b int) engine=innodb;
-start slave sql_thread;
+start slave;
+stop slave sql_thread;
+*** you can connect and change the exec mode as well now ***
+*** and select * from benchmark before to run consistency check ***
+Comparing tables master:test3.tm_nk and slave:test3.tm_nk
+Comparing tables master:test2.tm_nk and slave:test2.tm_nk
+Comparing tables master:test1.tm_nk and slave:test1.tm_nk
 Comparing tables master:test0.tm_nk and slave:test0.tm_nk
-Comparing tables master:test0.tm_wk and slave:test0.tm_wk
-Comparing tables master:test0.ti_nk and slave:test0.ti_nk
-Comparing tables master:test0.ti_wk and slave:test0.ti_wk
 set @@global.slave_exec_mode= @save.slave_exec_mode;

=== modified file 'mysql-test/suite/rpl/t/rpl_parallel.test'
--- a/mysql-test/suite/rpl/t/rpl_parallel.test	2010-09-15 11:51:49 +0000
+++ b/mysql-test/suite/rpl/t/rpl_parallel.test	2010-09-17 08:49:00 +0000
@@ -1,13 +1,16 @@
 source include/master-slave.inc;
 
 let $workers = 4;
-let $iter = 50;
+let $iter = 5000;
 
 connection slave;
+
+call mtr.add_suppression('Parallel Slave: pending jobs get to the max');
+
 source include/stop_slave.inc;
 set @save.slave_exec_mode= @@global.slave_exec_mode;
 set @@global.slave_exec_mode = 'Parallel';
-start slave io_thread;
+start slave;
 
 connection master;
 
@@ -28,9 +31,6 @@ begin
 end|
 delimiter ;|
 
---enable_result_log
---enable_query_log
-
 ## let $i = $workers + 1;
 ##eval 
 # delimiter |;
@@ -72,17 +72,77 @@ while($i)
   create table ti_nk (a int, b int) engine=innodb;
   create table ti_wk (a int auto_increment primary key, b int) engine=innodb;
 
+  # this table is special - just for timing. It's more special on test0 db
+  # where it contains master timing of the load as well.
+  create table benchmark (state text) engine=myisam; # timestamp keep on the slave side
+
   dec $i;
 }
 
+--enable_result_log
+--enable_query_log
+
+
+#connection slave;
+sync_slave_with_master;
+
+--disable_query_log
+--disable_result_log
+
+let $i = $workers + 1;
+while($i)
+{
+  let $i1 = $i;
+  dec $i1;
+  eval use test$i1;
+  alter table benchmark add ts timestamp not null default current_timestamp;
+
+  dec $i;
+}
+--enable_result_log
+--enable_query_log
+
+
+# not gather events into relay log w/o executing yet
+stop slave sql_thread;
+
 ##call p1(1);
 
+connection master;
+
 --disable_query_log
 --disable_result_log
 
 #
 # Load producer
 #
+
+# initial timestamp to record
+
+# the extra ts col on slave is effective only with the STMT format (todo: bug-report)
+set @save.binlog_format= @@session.binlog_format;
+set @@session.binlog_format=STATEMENT;
+let $i = $workers + 1;
+while($i)
+{
+  let $i1 = $i;
+  dec $i1;
+  eval use test$i1;
+
+  insert into benchmark set state='slave takes on load';
+
+  dec $i;
+}
+set @@session.binlog_format= @save.binlog_format;
+
+connection slave;
+
+use test0;
+insert into benchmark set state='master started load';
+
+
+connection master;
+
 while ($iter)
 {
     let $i = $workers + 1;
@@ -105,6 +165,30 @@ while ($iter)
     dec $iter;
 }
 
+connection slave;
+
+use test0;
+insert into benchmark set state='master ends load';
+
+connection master;
+
+# terminal timestamp to record
+
+let $i = $workers + 1;
+set @save.binlog_format= @@session.binlog_format;
+set @@session.binlog_format=STATEMENT;
+while($i)
+{
+  let $i1 = $i;
+  dec $i1;
+  eval use test$i1;
+
+  insert into benchmark set state='slave ends load';
+
+  dec $i;
+}
+set @@session.binlog_format= @save.binlog_format;
+
 --enable_result_log
 --enable_query_log
 
@@ -113,26 +197,35 @@ connection slave;
 ## todo: record start and end time of appying to compare times of
 #  parallel and serial execution.
 
-start slave sql_thread;
+--disable_query_log
+--disable_result_log
+
+### --sleep 15  # todo: convert to wait for the last event has been applied
+
+--echo *** you can connect and change the exec mode as well now ***
+--echo *** and select * from benchmark before to run consistency check ***
+
+insert into test0.benchmark set state='slave is processing load';
 
---sleep 5  # todo: convert to wait for the last event has been applied
+select sleep(10000);
+### start slave sql_thread;
 
-let $diff_table_1=master:test0.tm_nk;
-let $diff_table_2=slave:test0.tm_nk;
-source include/diff_tables.inc;
 
-let $diff_table_1=master:test0.tm_wk;
-let $diff_table_2=slave:test0.tm_wk;
-source include/diff_tables.inc;
+let $i = $workers + 1;
+while($i)
+{
+  let $i1 = $i;
+  dec $i1;
 
-let $diff_table_1=master:test0.ti_nk;
-let $diff_table_2=slave:test0.ti_nk;
-source include/diff_tables.inc;
+  let $diff_table_1=master:test$i1.tm_nk;
+  let $diff_table_2=slave:test$i1.tm_nk;
+  source include/diff_tables.inc;
 
-let $diff_table_1=master:test0.ti_wk;
-let $diff_table_2=slave:test0.ti_wk;
-source include/diff_tables.inc;
+  dec $i;
+}
 
+--enable_result_log
+--enable_query_log
 
 connection master;
 
@@ -156,6 +249,7 @@ connection slave;
 set @@global.slave_exec_mode= @save.slave_exec_mode;
 
 connection master;
-### select sleep(10000);
+
+###select sleep(10000);
 
 # End of 4.1 tests

=== modified file 'sql/log_event.cc'
--- a/sql/log_event.cc	2010-09-15 11:51:49 +0000
+++ b/sql/log_event.cc	2010-09-17 08:49:00 +0000
@@ -2167,7 +2167,10 @@ void append_item_to_jobs(struct slave_jo
   mysql_mutex_lock(&w->jobs_lock);
   if (!w->thd->is_slave_error)  // error check exists at stmt commit as well
   {
-    w->jobs.push_back(job_item, &w->mem_root);
+    int err;
+    err= w->jobs.push_back(job_item, &w->mem_root);
+    DBUG_ASSERT(!err);
+
     w->curr_jobs++;
     if (w->jobs.elements == 1)
       mysql_cond_signal(&w->jobs_cond);
@@ -2185,11 +2188,6 @@ void append_item_to_jobs(struct slave_jo
    scheduling event execution either serially or in parallel
 */
 int Log_event::apply_event(Relay_log_info const *rli)
-#if 0
-{
-   return do_apply_event(rli);
-}
-#else
 {
   uint i_w;
   DBUG_ENTER("LOG_EVENT:apply_event");
@@ -2208,7 +2206,6 @@ int Log_event::apply_event(Relay_log_inf
   mysql_mutex_lock(&const_cast<Relay_log_info*>(rli)->pending_jobs_lock);
   job_item= const_cast<Relay_log_info*>(rli)->free_jobs.pop();
   mysql_mutex_unlock(&const_cast<Relay_log_info*>(rli)->pending_jobs_lock);
-  DBUG_PRINT("Log_event::apply_event:", ("-> job item: %p data %p to W_%d", job_item, this, i_w));
   if (!job_item)
   { 
     /*
@@ -2219,13 +2216,14 @@ int Log_event::apply_event(Relay_log_inf
     job_item= (struct slave_job_item *)
       alloc_root(&const_cast<Relay_log_info*>(rli)->job_list_mem_root,
                  sizeof(struct slave_job_item));
+    DBUG_PRINT("Log_event::apply_event:", ("allocated: %p item", job_item));
   }
+  DBUG_PRINT("Log_event::apply_event:", ("-> job item: %p data %p to W_%d", job_item, this, i_w));
   job_item->data= this;
   append_item_to_jobs(job_item, w, const_cast<Relay_log_info*>(rli));
   DBUG_RETURN(0);
 }
 
-#endif
 
 /**
    Worker's routine to wait for a new assignement in its
@@ -2274,6 +2272,7 @@ int slave_worker_exec_job(struct slave_w
   struct slave_job_item *job_item;
   THD *thd= w->thd;
   Log_event *ev= NULL;
+  int err;
 
   DBUG_ENTER("slave_worker_exec_job");
 
@@ -2284,6 +2283,7 @@ int slave_worker_exec_job(struct slave_w
     goto err;
   }
   ev= static_cast<Log_event*>(job_item->data);
+  DBUG_PRINT("slave_worker_exec_job:", ("W_%lu <- job item: %p data %p", w->id, job_item, ev));
 
   thd->server_id = ev->server_id;
   thd->set_time();
@@ -2300,9 +2300,15 @@ int slave_worker_exec_job(struct slave_w
   rli->pending_jobs--;
   DBUG_ASSERT(rli->pending_jobs < rli->slave_pending_jobs_max &&
               rli->pending_jobs >= 0);
-  rli->free_jobs.push_back(job_item, &rli->job_list_mem_root);
-  DBUG_PRINT("slave_worker_exec_job:", ("W_%lu <- job item: %p data %p", w->id, job_item, ev));
-  
+
+  // todo: move afront of applying
+  DBUG_PRINT("slave_worker_exec_job:", ("W_%lu -> job item: %p data %p", w->id, job_item, ev));
+#ifndef DBUG_OFF
+  job_item->data= (char*) w->id; // mark as used by worker
+#endif
+  err= rli->free_jobs.push_back(job_item, &rli->job_list_mem_root);
+  DBUG_ASSERT(!err);
+
   /* coordinator can be waiting */
 
   if (rli->pending_jobs == rli->slave_pending_jobs_max - 1 ||
@@ -2318,17 +2324,31 @@ int slave_worker_exec_job(struct slave_w
   }
   else
   {
-    // Accumulating events is supposed to be rare.
-    // todo: add a private memroot, max length param, overrun guard, statistics
-    Log_event *used_ev;
-    w->data_in_use.push_back(ev);
-    while (w->data_in_use.is_empty() &&
-           (used_ev=
-            static_cast<Log_event *>(w->data_in_use.first_node()->info))->soiled)
-    {
-      DBUG_PRINT("slave_worker_exec_job GC:", ("W_%lu, event %p", w->id, used_ev));
-      delete used_ev;
-      w->data_in_use.pop();
+    // Accumulating to be-deleted events is supposed to be pretty rare, 
+    // still necessary.
+    // todo: convert ev->update_pos() into rli->update_pos() and get rid of this
+    //       workaround.
+
+    mysql_mutex_lock(&w->jobs_lock);
+    w->data_in_use.push_back(ev, &w->mem_root);
+    mysql_mutex_unlock(&w->jobs_lock);
+
+    while (!w->data_in_use.is_empty())
+    {
+      struct slave_job_item *item=
+        static_cast<struct slave_job_item *>(w->data_in_use.first_node()->info);
+      Log_event *used_ev= static_cast<Log_event *>(item->data);
+
+      if (used_ev->soiled)
+      {
+        DBUG_PRINT("GC:", ("W_%lu, event %p", w->id, used_ev));
+        delete used_ev;
+        w->data_in_use.pop();
+      }
+      else
+      {
+        break;
+      }
     }
   }
 
@@ -3481,7 +3501,7 @@ int Query_log_event::do_apply_event(Rela
   */
   if (is_trans_keyword() || rpl_filter->db_ok(thd->db))
   {
-    thd->set_time((time_t)when);
+    thd->set_time(!slave_local_timestamp_opt ? (time_t)when : my_time(0));
     thd->set_query_and_id((char*)query_arg, q_len_arg, next_query_id());
     thd->variables.pseudo_thread_id= thread_id;		// for temp tables
     DBUG_PRINT("query",("%s", thd->query()));
@@ -5040,7 +5060,7 @@ int Load_log_event::do_apply_event(NET* 
   */
   if (rpl_filter->db_ok(thd->db))
   {
-    thd->set_time((time_t)when);
+    thd->set_time(!slave_local_timestamp_opt ? (time_t)when : my_time(0));
     thd->set_query_id(next_query_id());
     thd->warning_info->opt_clear_warning_info(thd->query_id);
 
@@ -7931,7 +7951,7 @@ int Rows_log_event::do_apply_event(Relay
       TIMESTAMP column to a table with one.
       So we call set_time(), like in SBR. Presently it changes nothing.
     */
-    thd->set_time((time_t)when);
+    thd->set_time(!slave_local_timestamp_opt ? (time_t)when : my_time(0));
 
     /*
       Now we are in a statement and will stay in a statement until we

=== modified file 'sql/mysqld.cc'
--- a/sql/mysqld.cc	2010-09-09 18:43:16 +0000
+++ b/sql/mysqld.cc	2010-09-17 08:49:00 +0000
@@ -461,6 +461,7 @@ uint  slave_net_timeout;
 ulong slave_exec_mode_options;
 ulonglong slave_type_conversions_options;
 ulong slave_parallel_workers;
+my_bool slave_local_timestamp_opt;
 ulong thread_cache_size=0;
 ulong binlog_cache_size=0;
 ulonglong  max_binlog_cache_size=0;

=== modified file 'sql/mysqld.h'
--- a/sql/mysqld.h	2010-09-09 18:43:16 +0000
+++ b/sql/mysqld.h	2010-09-17 08:49:00 +0000
@@ -173,6 +173,7 @@ extern LEX_CSTRING reason_slave_blocked;
 extern ulong slave_trans_retries;
 extern uint  slave_net_timeout;
 extern ulong slave_parallel_workers;
+extern my_bool slave_local_timestamp_opt;
 extern uint max_user_connections;
 extern ulong what_to_log,flush_time;
 extern ulong max_prepared_stmt_count, prepared_stmt_count;

=== modified file 'sql/rpl_rli.h'
--- a/sql/rpl_rli.h	2010-09-13 10:15:38 +0000
+++ b/sql/rpl_rli.h	2010-09-17 08:49:00 +0000
@@ -28,7 +28,6 @@ class Master_info;
 extern uint sql_slave_skip_counter;
 
 #define SLAVE_WORKER_QUEUE_SIZE 8096
-#define PARA_SLAVE_JOB_PARAM_QUEUE_SIZE 1024
 
 struct slave_job_item
 {

=== modified file 'sql/rpl_slave.cc'
--- a/sql/rpl_slave.cc	2010-09-13 10:15:38 +0000
+++ b/sql/rpl_slave.cc	2010-09-17 08:49:00 +0000
@@ -5016,7 +5016,8 @@ static Log_event* next_event(Relay_log_i
       DBUG_ASSERT(my_b_tell(cur_log) == rli->event_relay_log_pos);
       // mts-II: consider the following if the above fails:
       DBUG_ASSERT(my_b_tell(cur_log) == rli->event_relay_log_pos ||
-                  bit_is_set(slave_exec_mode_options, SLAVE_EXEC_MODE_PARALLEL) == 1);
+                  //bit_is_set(slave_exec_mode_options, SLAVE_EXEC_MODE_PARALLEL) == 1);
+                  slave_exec_mode_options == SLAVE_EXEC_MODE_PARALLEL); // todo: fix slave_exec_mode enum->set
 
     }
 #endif

=== modified file 'sql/sys_vars.cc'
--- a/sql/sys_vars.cc	2010-09-09 18:43:16 +0000
+++ b/sql/sys_vars.cc	2010-09-17 08:49:00 +0000
@@ -3049,6 +3049,11 @@ static Sys_var_ulong Sys_slave_parallel_
        "Number of worker threads for executing events in parallel ",
        GLOBAL_VAR(slave_parallel_workers), CMD_LINE(REQUIRED_ARG),
        VALID_RANGE(0, ULONG_MAX), DEFAULT(4), BLOCK_SIZE(1));
+static Sys_var_mybool Sys_slave_local_timestamp(
+       "slave_local_timestamp", "if enabled slave computes the event appying "
+       "time value to implicitly affected timestamp columms. Otherwise (default) "
+       "installs prescribed by the master value.",
+       GLOBAL_VAR(slave_local_timestamp_opt), CMD_LINE(OPT_ARG), DEFAULT(FALSE));
 #endif
 
 static bool check_locale(sys_var *self, THD *thd, set_var *var)


Attachment: [text/bzr-bundle] bzr/aelkin@mysql.com-20100917084900-n4pohdldt6st9rws.bundle
Thread
bzr commit into mysql-next-mr branch (aelkin:3200) WL#5563Andrei Elkin17 Sep