List:Commits« Previous MessageNext Message »
From:Andrei Elkin Date:December 20 2010 3:59pm
Subject:bzr commit into mysql-next-mr-wl5569 branch (andrei.elkin:3251) WL#5569
View as plain text  
#At file:///home/andrei/MySQL/BZR/2a-23May/WL/mysql-next-mr-wl5569/ based on revid:andrei.elkin@stripped

 3251 Andrei Elkin	2010-12-20 [merge]
      wl#5569 MTS
      
      manual merging from the repo and correcting GAQ processing with introducing a volatile byte to indicate whether an item is busy or released.

    added:
      mysql-test/suite/sys_vars/r/mts_checkpoint_group_basic.result
      mysql-test/suite/sys_vars/t/mts_checkpoint_group_basic.test
    modified:
      mysql-test/extra/rpl_tests/rpl_parallel_load.test
      mysql-test/r/mysqld--help-notwin.result
      mysql-test/r/mysqld--help-win.result
      mysql-test/suite/funcs_1/r/is_columns_mysql.result
      mysql-test/suite/rpl/r/rpl_parallel_conf_limits.result
      mysql-test/suite/rpl/t/disabled.def
      mysql-test/suite/sys_vars/r/all_vars.result
      scripts/mysql_system_tables.sql
      sql/log_event.cc
      sql/mysqld.cc
      sql/mysqld.h
      sql/rpl_info_factory.cc
      sql/rpl_info_factory.h
      sql/rpl_rli.cc
      sql/rpl_rli.h
      sql/rpl_rli_pdb.cc
      sql/rpl_rli_pdb.h
      sql/rpl_slave.cc
      sql/rpl_slave.h
      sql/sys_vars.cc
=== modified file 'mysql-test/extra/rpl_tests/rpl_parallel_load.test'
--- a/mysql-test/extra/rpl_tests/rpl_parallel_load.test	2010-12-17 12:46:15 +0000
+++ b/mysql-test/extra/rpl_tests/rpl_parallel_load.test	2010-12-20 15:58:58 +0000
@@ -6,7 +6,7 @@
 # load volume parameter
 #
 
-let $iter = 1000;
+let $iter = 500;
 
 # windows run on PB2 is too slow to time out
 disable_query_log;

=== modified file 'mysql-test/r/mysqld--help-notwin.result'
--- a/mysql-test/r/mysqld--help-notwin.result	2010-12-14 14:46:20 +0000
+++ b/mysql-test/r/mysqld--help-notwin.result	2010-12-18 00:33:02 +0000
@@ -341,6 +341,9 @@ The following options may be given as th
  --min-examined-row-limit=# 
  Don't write queries to slow log that examine fewer rows
  than that
+ --mts-checkpoint-group=# 
+ Define the number of transactions before a checkpoint
+ operation is called.
  --mts-checkpoint-period=# 
  Gather workers' activities to flush the relay log info to
  disk after every #th milli-seconds. The zero value
@@ -895,6 +898,7 @@ max-user-connections 0
 max-write-lock-count 18446744073709551615
 memlock FALSE
 min-examined-row-limit 0
+mts-checkpoint-group 512
 mts-checkpoint-period 300
 mts-coordinator-basic-nap 5
 mts-exp-slave-local-timestamp FALSE

=== modified file 'mysql-test/r/mysqld--help-win.result'
--- a/mysql-test/r/mysqld--help-win.result	2010-12-16 21:41:45 +0000
+++ b/mysql-test/r/mysqld--help-win.result	2010-12-18 00:33:02 +0000
@@ -340,6 +340,9 @@ The following options may be given as th
  --min-examined-row-limit=# 
  Don't write queries to slow log that examine fewer rows
  than that
+ --mts-checkpoint-group=# 
+ Define the number of transactions before a checkpoint
+ operation is called.
  --mts-checkpoint-period=# 
  Gather workers' activities to flush the relay log info to
  disk after every #th milli-seconds. The zero value
@@ -898,6 +901,7 @@ max-user-connections 0
 max-write-lock-count 18446744073709551615
 memlock FALSE
 min-examined-row-limit 0
+mts-checkpoint-group 512
 mts-checkpoint-period 300
 mts-coordinator-basic-nap 5
 mts-exp-slave-local-timestamp FALSE

=== modified file 'mysql-test/suite/funcs_1/r/is_columns_mysql.result'
--- a/mysql-test/suite/funcs_1/r/is_columns_mysql.result	2010-12-15 17:46:05 +0000
+++ b/mysql-test/suite/funcs_1/r/is_columns_mysql.result	2010-12-18 00:33:02 +0000
@@ -179,9 +179,8 @@ def	mysql	slave_relay_log_info	Number_of
 def	mysql	slave_relay_log_info	Relay_log_name	3	NULL	NO	text	65535	65535	NULL	NULL	utf8	utf8_bin	text			select,insert,update,references	
 def	mysql	slave_relay_log_info	Relay_log_pos	4	NULL	NO	bigint	NULL	NULL	20	0	NULL	NULL	bigint(20) unsigned			select,insert,update,references	
 def	mysql	slave_relay_log_info	Sql_delay	7	NULL	NO	int	NULL	NULL	10	0	NULL	NULL	int(11)			select,insert,update,references	
-def	mysql	slave_worker_info	Checkpoint_log_pos	7	NULL	NO	bigint	NULL	NULL	20	0	NULL	NULL	bigint(20) unsigned			select,insert,update,references	
-def	mysql	slave_worker_info	Group_bitmap	9	NULL	YES	text	65535	65535	NULL	NULL	utf8	utf8_bin	text			select,insert,update,references	
-def	mysql	slave_worker_info	Group_count	8	NULL	NO	int	NULL	NULL	10	0	NULL	NULL	int(10) unsigned			select,insert,update,references	
+def	mysql	slave_worker_info	Group_bitmap	8	NULL	YES	text	65535	65535	NULL	NULL	utf8	utf8_bin	text			select,insert,update,references	
+def	mysql	slave_worker_info	Group_count	7	NULL	NO	int	NULL	NULL	10	0	NULL	NULL	int(10) unsigned			select,insert,update,references	
 def	mysql	slave_worker_info	Master_id	1	NULL	NO	int	NULL	NULL	10	0	NULL	NULL	int(10) unsigned	PRI		select,insert,update,references	
 def	mysql	slave_worker_info	Master_log_name	5	NULL	NO	text	65535	65535	NULL	NULL	utf8	utf8_bin	text			select,insert,update,references	
 def	mysql	slave_worker_info	Master_log_pos	6	NULL	NO	bigint	NULL	NULL	20	0	NULL	NULL	bigint(20) unsigned			select,insert,update,references	
@@ -517,7 +516,6 @@ NULL	mysql	slave_worker_info	Worker_id	i
 NULL	mysql	slave_worker_info	Relay_log_pos	bigint	NULL	NULL	NULL	NULL	bigint(20) unsigned
 1.0000	mysql	slave_worker_info	Master_log_name	text	65535	65535	utf8	utf8_bin	text
 NULL	mysql	slave_worker_info	Master_log_pos	bigint	NULL	NULL	NULL	NULL	bigint(20) unsigned
-NULL	mysql	slave_worker_info	Checkpoint_log_pos	bigint	NULL	NULL	NULL	NULL	bigint(20) unsigned
 NULL	mysql	slave_worker_info	Group_count	int	NULL	NULL	NULL	NULL	int(10) unsigned
 1.0000	mysql	slave_worker_info	Group_bitmap	text	65535	65535	utf8	utf8_bin	text
 NULL	mysql	slow_log	start_time	timestamp	NULL	NULL	NULL	NULL	timestamp

=== modified file 'mysql-test/suite/rpl/r/rpl_parallel_conf_limits.result'
--- a/mysql-test/suite/rpl/r/rpl_parallel_conf_limits.result	2010-12-07 17:35:16 +0000
+++ b/mysql-test/suite/rpl/r/rpl_parallel_conf_limits.result	2010-12-18 00:33:02 +0000
@@ -8,10 +8,12 @@ create view coord_wait_list as
 SELECT id from Information_Schema.processlist
 where state like 'Waiting for Slave Worker%';
 include/stop_slave.inc
-set @save.slave_parallel_workers= @@global.slave_parallel_workers;
-set @@global.slave_parallel_workers= 4;
+set @save.mts_slave_parallel_workers= @@global.mts_slave_parallel_workers;
+set @@global.mts_slave_parallel_workers= 4;
 set @save_mts_slave_worker_queue_len_max= @@global.mts_slave_worker_queue_len_max;
 set @@global.mts_slave_worker_queue_len_max= 5;
+Warnings:
+Note	1724	Temporary failed transaction retry is not supported in Parallel Slave. Such failure will force the slave to stop.
 include/start_slave.inc
 create database d0;
 create table d0.t1 (a int auto_increment primary key) engine=innodb;
@@ -32,6 +34,8 @@ set @@global.mts_slave_worker_queue_len_
 include/stop_slave.inc
 set @save_mts_pending_jobs_size_max = @@global.mts_pending_jobs_size_max;
 set @@global.mts_pending_jobs_size_max= 1024;
+Warnings:
+Note	1724	Temporary failed transaction retry is not supported in Parallel Slave. Such failure will force the slave to stop.
 include/start_slave.inc
 create table d0.t2 (a int auto_increment primary key, b text null) engine=innodb;
 begin;
@@ -190,4 +194,4 @@ drop database d2;
 drop database d1;
 drop database d0;
 drop view coord_wait_list;
-set @@global.slave_parallel_workers= @save.slave_parallel_workers;
+set @@global.mts_slave_parallel_workers= @save.mts_slave_parallel_workers;

=== modified file 'mysql-test/suite/rpl/t/disabled.def'
--- a/mysql-test/suite/rpl/t/disabled.def	2010-12-14 12:51:30 +0000
+++ b/mysql-test/suite/rpl/t/disabled.def	2010-12-18 00:33:02 +0000
@@ -16,4 +16,3 @@ rpl_row_event_max_size    : Bug#55675 20
 rpl_delayed_slave         : Bug#57514 2010-11-09 andrei rpl_delayed_slave fails sporadically in pb
 rpl_log_pos               : BUG#55675 2010-09-10 alfranio rpl.rpl_log_pos fails sporadically with error binlog truncated in the middle
 rpl_trigger               : BUG#58258 2010-11-17 VasilDimov Valgrind: possibly lost from ib_bh_create()
-rpl_parallel_conf_limits  : wl#5599   9-12-2010 Andrei Waiting for the recovery wl

=== modified file 'mysql-test/suite/sys_vars/r/all_vars.result'
--- a/mysql-test/suite/sys_vars/r/all_vars.result	2010-12-16 21:41:45 +0000
+++ b/mysql-test/suite/sys_vars/r/all_vars.result	2010-12-18 00:33:02 +0000
@@ -13,6 +13,7 @@ left join t1 on variable_name=test_name 
 There should be *no* variables listed below:
 INNODB_STATS_TRANSIENT_SAMPLE_PAGES
 MTS_PARTITION_HASH_SOFT_MAX
+MTS_PENDING_JOBS_SIZE_MAX
 MTS_EXP_SLAVE_LOCAL_TIMESTAMP
 MTS_EXP_SLAVE_RUN_QUERY_IN_PARALLEL
 INNODB_STATS_PERSISTENT_SAMPLE_PAGES
@@ -22,16 +23,16 @@ INNODB_ANALYZE_IS_PERSISTENT
 INNODB_RESET_MONITOR_COUNTER
 MTS_SLAVE_PARALLEL_WORKERS
 MTS_WORKER_UNDERRUN_LEVEL
+MTS_SLAVE_WORKER_QUEUE_LEN_MAX
 INNODB_RESET_ALL_MONITOR_COUNTER
 LOG_BIN_INDEX
 INNODB_DISABLE_MONITOR_COUNTER
 INNODB_ENABLE_MONITOR_COUNTER
-MTS_SLAVE_WORKER_QUEUE_LEN_MAX
 INNODB_FILE_FORMAT_MAX
-MTS_PENDING_JOBS_SIZE_MAX
 MTS_COORDINATOR_BASIC_NAP
 INNODB_STATS_TRANSIENT_SAMPLE_PAGES
 MTS_PARTITION_HASH_SOFT_MAX
+MTS_PENDING_JOBS_SIZE_MAX
 MTS_EXP_SLAVE_LOCAL_TIMESTAMP
 MTS_EXP_SLAVE_RUN_QUERY_IN_PARALLEL
 INNODB_STATS_PERSISTENT_SAMPLE_PAGES
@@ -41,13 +42,12 @@ INNODB_ANALYZE_IS_PERSISTENT
 INNODB_RESET_MONITOR_COUNTER
 MTS_SLAVE_PARALLEL_WORKERS
 MTS_WORKER_UNDERRUN_LEVEL
+MTS_SLAVE_WORKER_QUEUE_LEN_MAX
 INNODB_RESET_ALL_MONITOR_COUNTER
 LOG_BIN_INDEX
 INNODB_DISABLE_MONITOR_COUNTER
 INNODB_ENABLE_MONITOR_COUNTER
-MTS_SLAVE_WORKER_QUEUE_LEN_MAX
 INNODB_FILE_FORMAT_MAX
-MTS_PENDING_JOBS_SIZE_MAX
 MTS_COORDINATOR_BASIC_NAP
 drop table t1;
 drop table t2;

=== added file 'mysql-test/suite/sys_vars/r/mts_checkpoint_group_basic.result'
--- a/mysql-test/suite/sys_vars/r/mts_checkpoint_group_basic.result	1970-01-01 00:00:00 +0000
+++ b/mysql-test/suite/sys_vars/r/mts_checkpoint_group_basic.result	2010-12-18 00:33:02 +0000
@@ -0,0 +1,51 @@
+SET @start_global_value = @@global.mts_checkpoint_group;
+SELECT @start_global_value;
+@start_global_value
+512
+select @@global.mts_checkpoint_group;
+@@global.mts_checkpoint_group
+512
+select @@session.mts_checkpoint_group;
+ERROR HY000: Variable 'mts_checkpoint_group' is a GLOBAL variable
+show global variables like 'mts_checkpoint_group';
+Variable_name	Value
+mts_checkpoint_group	512
+show session variables like 'mts_checkpoint_group';
+Variable_name	Value
+mts_checkpoint_group	512
+select * from information_schema.global_variables where variable_name='mts_checkpoint_group';
+VARIABLE_NAME	VARIABLE_VALUE
+MTS_CHECKPOINT_GROUP	512
+select * from information_schema.session_variables where variable_name='mts_checkpoint_group';
+VARIABLE_NAME	VARIABLE_VALUE
+MTS_CHECKPOINT_GROUP	512
+set global mts_checkpoint_group=1;
+Warnings:
+Warning	1292	Truncated incorrect mts_checkpoint_group value: '1'
+select @@global.mts_checkpoint_group;
+@@global.mts_checkpoint_group
+512
+set session mts_checkpoint_group=1;
+ERROR HY000: Variable 'mts_checkpoint_group' is a GLOBAL variable and should be set with SET GLOBAL
+set global mts_checkpoint_group=1.1;
+ERROR 42000: Incorrect argument type to variable 'mts_checkpoint_group'
+set global mts_checkpoint_group=1e1;
+ERROR 42000: Incorrect argument type to variable 'mts_checkpoint_group'
+set global mts_checkpoint_group="foo";
+ERROR 42000: Incorrect argument type to variable 'mts_checkpoint_group'
+set global mts_checkpoint_group=0;
+Warnings:
+Warning	1292	Truncated incorrect mts_checkpoint_group value: '0'
+select @@global.mts_checkpoint_group;
+@@global.mts_checkpoint_group
+512
+set global mts_checkpoint_group=cast(-1 as unsigned int);
+Warnings:
+Warning	1292	Truncated incorrect mts_checkpoint_group value: '18446744073709551615'
+select @@global.mts_checkpoint_group;
+@@global.mts_checkpoint_group
+4294967288
+SET @@global.mts_checkpoint_group = @start_global_value;
+SELECT @@global.mts_checkpoint_group;
+@@global.mts_checkpoint_group
+512

=== added file 'mysql-test/suite/sys_vars/t/mts_checkpoint_group_basic.test'
--- a/mysql-test/suite/sys_vars/t/mts_checkpoint_group_basic.test	1970-01-01 00:00:00 +0000
+++ b/mysql-test/suite/sys_vars/t/mts_checkpoint_group_basic.test	2010-12-18 00:33:02 +0000
@@ -0,0 +1,44 @@
+--source include/not_embedded.inc
+
+SET @start_global_value = @@global.mts_checkpoint_group;
+SELECT @start_global_value;
+
+#
+# exists as global only
+#
+select @@global.mts_checkpoint_group;
+--error ER_INCORRECT_GLOBAL_LOCAL_VAR
+select @@session.mts_checkpoint_group;
+show global variables like 'mts_checkpoint_group';
+show session variables like 'mts_checkpoint_group';
+select * from information_schema.global_variables where variable_name='mts_checkpoint_group';
+select * from information_schema.session_variables where variable_name='mts_checkpoint_group';
+
+#
+# show that it's writable
+#
+set global mts_checkpoint_group=1;
+select @@global.mts_checkpoint_group;
+--error ER_GLOBAL_VARIABLE
+set session mts_checkpoint_group=1;
+
+#
+# incorrect types
+#
+--error ER_WRONG_TYPE_FOR_VAR
+set global mts_checkpoint_group=1.1;
+--error ER_WRONG_TYPE_FOR_VAR
+set global mts_checkpoint_group=1e1;
+--error ER_WRONG_TYPE_FOR_VAR
+set global mts_checkpoint_group="foo";
+
+#
+# min/max values
+#
+set global mts_checkpoint_group=0;
+select @@global.mts_checkpoint_group;
+set global mts_checkpoint_group=cast(-1 as unsigned int);
+select @@global.mts_checkpoint_group;
+
+SET @@global.mts_checkpoint_group = @start_global_value;
+SELECT @@global.mts_checkpoint_group;

=== modified file 'scripts/mysql_system_tables.sql'
--- a/scripts/mysql_system_tables.sql	2010-12-15 17:46:05 +0000
+++ b/scripts/mysql_system_tables.sql	2010-12-18 00:33:02 +0000
@@ -104,7 +104,7 @@ CREATE TABLE IF NOT EXISTS slave_relay_l
 
 CREATE TABLE IF NOT EXISTS slave_master_info (Master_id INTEGER UNSIGNED NOT NULL, Number_of_lines INTEGER UNSIGNED NOT NULL, Master_log_name TEXT CHARACTER SET utf8 COLLATE utf8_bin NOT NULL, Master_log_pos BIGINT UNSIGNED NOT NULL, Host TEXT CHARACTER SET utf8 COLLATE utf8_bin, User_name TEXT CHARACTER SET utf8 COLLATE utf8_bin, User_password TEXT CHARACTER SET utf8 COLLATE utf8_bin, Port INTEGER UNSIGNED NOT NULL, Connect_retry INTEGER UNSIGNED NOT NULL, Enabled_ssl BOOLEAN NOT NULL, Ssl_ca TEXT CHARACTER SET utf8 COLLATE utf8_bin, Ssl_capath TEXT CHARACTER SET utf8 COLLATE utf8_bin, Ssl_cert TEXT CHARACTER SET utf8 COLLATE utf8_bin, Ssl_cipher TEXT CHARACTER SET utf8 COLLATE utf8_bin, Ssl_key TEXT CHARACTER SET utf8 COLLATE utf8_bin, Ssl_verify_servert_cert BOOLEAN NOT NULL, Heartbeat FLOAT NOT NULL, Bind TEXT CHARACTER SET utf8 COLLATE utf8_bin, Ignored_server_ids TEXT CHARACTER SET utf8 COLLATE utf8_bin, Uuid TEXT CHARACTER SET utf8 COLLATE utf8_bin, Retry_count BIGIN!
 T UNSIGNED NOT NULL, PRIMARY KEY(Master_id)) ENGINE=MYISAM DEFAULT CHARSET=utf8 COMMENT 'Master Information';
 
-CREATE TABLE IF NOT EXISTS slave_worker_info (Master_id INTEGER UNSIGNED NOT NULL, Worker_id INTEGER UNSIGNED NOT NULL, Relay_log_name TEXT CHARACTER SET utf8 COLLATE utf8_bin NOT NULL, Relay_log_pos BIGINT UNSIGNED NOT NULL, Master_log_name TEXT CHARACTER SET utf8 COLLATE utf8_bin NOT NULL, Master_log_pos BIGINT UNSIGNED NOT NULL, Checkpoint_log_pos BIGINT UNSIGNED NOT NULL, Group_count INTEGER UNSIGNED NOT NULL, Group_bitmap TEXT CHARACTER SET utf8 COLLATE utf8_bin, PRIMARY KEY(Master_id, Worker_id)) ENGINE=MYISAM DEFAULT CHARSET=utf8 COMMENT 'Worker Information';
+CREATE TABLE IF NOT EXISTS slave_worker_info (Master_id INTEGER UNSIGNED NOT NULL, Worker_id INTEGER UNSIGNED NOT NULL, Relay_log_name TEXT CHARACTER SET utf8 COLLATE utf8_bin NOT NULL, Relay_log_pos BIGINT UNSIGNED NOT NULL, Master_log_name TEXT CHARACTER SET utf8 COLLATE utf8_bin NOT NULL, Master_log_pos BIGINT UNSIGNED NOT NULL, Group_count INTEGER UNSIGNED NOT NULL, Group_bitmap TEXT CHARACTER SET utf8 COLLATE utf8_bin, PRIMARY KEY(Master_id, Worker_id)) ENGINE=MYISAM DEFAULT CHARSET=utf8 COMMENT 'Worker Information';
 
 --
 -- PERFORMANCE SCHEMA INSTALLATION

=== modified file 'sql/log_event.cc'
--- a/sql/log_event.cc	2010-12-18 19:00:23 +0000
+++ b/sql/log_event.cc	2010-12-20 15:58:58 +0000
@@ -2429,6 +2429,7 @@ Slave_worker *Log_event::get_slave_worke
     g.checkpoint_log_name= NULL;
     g.checkpoint_log_pos= 0;
     g.checkpoint_seqno= (uint) -1;
+    g.done= 0;
 
     // the last occupied GAQ's array index
     gaq_idx= rli->gaq->assigned_group_index= rli->gaq->en_queue((void *) &g);
@@ -2775,6 +2776,10 @@ int Log_event::apply_event(Relay_log_inf
     }
     DBUG_RETURN(do_apply_event(rli));
   }
+
+  uint critical= (rli->checkpoint_group / rli->slave_parallel_workers);
+  if (rli->is_parallel_exec() && rli->checkpoint_seqno > critical)
+    mts_checkpoint_routine(c_rli, 0, TRUE, TRUE);
   
   DBUG_ASSERT(!(rli->curr_group_seen_begin && ends_group()) ||
               rli->last_assigned_worker);

=== modified file 'sql/mysqld.cc'
--- a/sql/mysqld.cc	2010-12-14 14:46:20 +0000
+++ b/sql/mysqld.cc	2010-12-18 00:33:02 +0000
@@ -507,7 +507,7 @@ ulong thread_id=1L,current_pid;
 ulong slow_launch_threads = 0;
 uint sync_binlog_period= 0, sync_relaylog_period= 0,
      sync_relayloginfo_period= 0, sync_masterinfo_period= 0,
-     mts_checkpoint_period= 0;
+     mts_checkpoint_period= 0, mts_checkpoint_group;
 ulong expire_logs_days = 0;
 
 const double log_10[] = {

=== modified file 'sql/mysqld.h'
--- a/sql/mysqld.h	2010-12-14 14:46:20 +0000
+++ b/sql/mysqld.h	2010-12-18 00:33:02 +0000
@@ -133,7 +133,7 @@ extern ulong expire_logs_days;
 extern my_bool relay_log_recovery;
 extern uint sync_binlog_period, sync_relaylog_period, 
             sync_relayloginfo_period, sync_masterinfo_period,
-            mts_checkpoint_period;
+            mts_checkpoint_period, mts_checkpoint_group;
 extern ulong opt_tc_log_size, tc_log_max_pages_used, tc_log_page_size;
 extern ulong tc_log_page_waits;
 extern my_bool relay_log_purge, opt_innodb_safe_binlog, opt_innodb;

=== modified file 'sql/rpl_info_factory.cc'
--- a/sql/rpl_info_factory.cc	2010-12-15 17:46:05 +0000
+++ b/sql/rpl_info_factory.cc	2010-12-18 00:33:02 +0000
@@ -408,7 +408,8 @@ bool Rpl_info_factory::change_engine(Rpl
   DBUG_RETURN(FALSE);
 }
 
-Slave_worker *Rpl_info_factory::create_worker(uint worker_option, uint worker_id)
+Slave_worker *Rpl_info_factory::create_worker(uint worker_option, uint worker_id,
+                                              Relay_log_info *rli)
 {
   DBUG_ENTER("Rpl_info_factory::create_worker");
 
@@ -429,7 +430,7 @@ Slave_worker *Rpl_info_factory::create_w
   pos= strmov(info_name, "worker");
   sprintf(pos, ".%u", worker_id);
 
-  if (!(worker= new Slave_worker(info_name, "worker")))
+  if (!(worker= new Slave_worker(info_name, "worker", rli)))
     goto err;
 
   if (!(key_info_idx= new ulong[NUMBER_OF_FIELDS_TO_IDENTIFY_WORKER]))

=== modified file 'sql/rpl_info_factory.h'
--- a/sql/rpl_info_factory.h	2010-12-09 16:17:32 +0000
+++ b/sql/rpl_info_factory.h	2010-12-18 00:33:02 +0000
@@ -64,7 +64,8 @@ class Rpl_info_factory
 
   static bool create_coordinators(uint mi_option, Master_info **mi,
                                   uint rli_option, Relay_log_info **rli);
-  static Slave_worker *create_worker(uint rli_option, uint worker_id);
+  static Slave_worker *create_worker(uint rli_option, uint worker_id,
+                                     Relay_log_info *rli);
   static Master_info *create_mi(uint rli_option);
   static Relay_log_info *create_rli(uint rli_option, bool is_slave_recovery);
   static bool decide_repository(Rpl_info *info,

=== modified file 'sql/rpl_rli.cc'
--- a/sql/rpl_rli.cc	2010-12-16 22:00:47 +0000
+++ b/sql/rpl_rli.cc	2010-12-18 00:33:02 +0000
@@ -70,8 +70,8 @@ Relay_log_info::Relay_log_info(bool is_s
    tables_to_lock(0), tables_to_lock_count(0),
    rows_query_ev(NULL), last_event_start_time(0),
    this_worker(NULL), slave_parallel_workers(0),
-   sql_delay(0), sql_delay_end(0),
-   m_flags(0)
+   checkpoint_group(mts_checkpoint_group),
+   sql_delay(0), sql_delay_end(0), m_flags(0)
 {
   DBUG_ENTER("Relay_log_info::Relay_log_info");
 

=== modified file 'sql/rpl_rli.h'
--- a/sql/rpl_rli.h	2010-12-16 22:00:47 +0000
+++ b/sql/rpl_rli.h	2010-12-18 00:33:02 +0000
@@ -469,6 +469,7 @@ public:
   */
   DYNAMIC_ARRAY least_occupied_workers;
   uint checkpoint_seqno;  // counter of groups executed after the most recent CP
+  uint checkpoint_group;  // counter of groups after which a checkpoint is called.
 
   /* most of allocation in the coordinator rli is there */
   void init_workers(ulong);

=== modified file 'sql/rpl_rli_pdb.cc'
--- a/sql/rpl_rli_pdb.cc	2010-12-17 12:46:15 +0000
+++ b/sql/rpl_rli_pdb.cc	2010-12-20 15:58:58 +0000
@@ -15,16 +15,15 @@ const char *info_slave_worker_fields []=
   "relay_log_pos",
   "master_log_name",
   "master_log_pos",
-  "checkpoint_log_pos",
   "group_count",
   "group_bitmap"
 };
 
-Slave_worker::Slave_worker(const char* type, const char* pfs)
-  : Rpl_info_worker(type, pfs), curr_group_exec_parts(0),
+Slave_worker::Slave_worker(const char* type, const char* pfs,
+                           Relay_log_info *rli)
+  : Rpl_info_worker(type, pfs), c_rli(rli), curr_group_exec_parts(0),
   group_relay_log_pos(0), group_master_log_pos(0),
-  checkpoint_log_pos(0), inited_group_execed(0),
-  critical(0)
+  checkpoint_log_pos(0), inited_group_execed(0)
 {
   group_relay_log_name[0]= 0;
   group_master_log_name[0]= 0;
@@ -52,7 +51,8 @@ int Slave_worker::init_info()
   if (!(curr_group_exec_parts= new Database_ids(NAME_LEN)))
     goto err;
 
-  if (bitmap_init(&group_execed, NULL, 32768, FALSE))
+  if (bitmap_init(&group_execed, NULL,
+                  c_rli->checkpoint_group, FALSE))
     goto err;
   
   inited_group_execed= 1;
@@ -125,7 +125,6 @@ bool Slave_worker::read_info(Rpl_info_ha
 
   ulong temp_group_relay_log_pos= 0;
   ulong temp_group_master_log_pos= 0;
-  ulong temp_checkpoint_log_pos= 0;
   ulong nbytes= 0;
   uchar *buffer= (uchar *) group_execed.bitmap;
 
@@ -142,8 +141,6 @@ bool Slave_worker::read_info(Rpl_info_ha
                      (char *) "") ||
       from->get_info((ulong *) &temp_group_master_log_pos,
                      (ulong) 0) ||
-      from->get_info((ulong *) &temp_checkpoint_log_pos,
-                     (ulong) 0) ||
       from->get_info(&nbytes, (ulong) 0) ||
       from->get_info(buffer, (size_t) nbytes,
                      (uchar *) 0))
@@ -151,7 +148,6 @@ bool Slave_worker::read_info(Rpl_info_ha
 
   group_relay_log_pos=  temp_group_relay_log_pos;
   group_master_log_pos= temp_group_master_log_pos;
-  checkpoint_log_pos= temp_checkpoint_log_pos;
 
   DBUG_RETURN(FALSE);
 }
@@ -168,7 +164,6 @@ bool Slave_worker::write_info(Rpl_info_h
       to->set_info((ulong) group_relay_log_pos) ||
       to->set_info(group_master_log_name) ||
       to->set_info((ulong) group_master_log_pos) ||
-      to->set_info((ulong) checkpoint_log_pos) ||
       to->set_info(nbytes) ||
       to->set_info(buffer, (size_t) nbytes))
     DBUG_RETURN(TRUE);
@@ -196,16 +191,10 @@ bool Slave_worker::commit_positions(Log_
     my_free(ptr_g->checkpoint_log_name);
     ptr_g->checkpoint_log_name= NULL;
 
-    critical= FALSE;
     bitmap_clear_all(&group_execed);
   }
   
-  if (ptr_g->checkpoint_seqno)
-  {
-    if (ptr_g->checkpoint_seqno > (32768 / 8))
-      critical= TRUE;
-    bitmap_fast_test_and_set(&group_execed, ptr_g->checkpoint_seqno);
-  }
+  bitmap_set_bit(&group_execed, ptr_g->checkpoint_seqno);
 
   group_relay_log_pos= ev->future_event_relay_log_pos;
   group_master_log_pos= ev->log_pos;
@@ -537,6 +526,7 @@ void Slave_worker::slave_worker_ends_gro
     // first ever group must have relay log name
     DBUG_ASSERT(last_group_done_index != c_rli->gaq->s ||
                 ptr_g->group_relay_log_name != NULL);
+    DBUG_ASSERT(ptr_g->worker_id == id);
 
     if (ptr_g->group_relay_log_name != NULL)
     {
@@ -555,8 +545,9 @@ void Slave_worker::slave_worker_ends_gro
 
     ptr_g->group_master_log_pos= group_master_log_pos;
     ptr_g->group_relay_log_pos= group_relay_log_pos;
+    ptr_g->done= 1;    // GAQ index is available to C now
 
-    last_group_done_index = gaq_idx;   // GAQ index is available to C now
+    last_group_done_index= gaq_idx;
   }
 
   // cleanup relating to the last executed group regardless of error
@@ -629,6 +620,7 @@ ulong circular_buffer_queue::de_queue(uc
   DBUG_ASSERT(e == s ||
               (len == (a >= e)? (a - e) :
                (s + a - e)));
+  DBUG_ASSERT(a != e);
 
   return ret;
 }
@@ -655,6 +647,7 @@ ulong circular_buffer_queue::de_tail(uch
   DBUG_ASSERT(e == s ||
               (len == (a >= e)? (a - e) :
                (s + a - e)));
+  DBUG_ASSERT(a != e);
 
   return a;
 }
@@ -686,9 +679,12 @@ ulong circular_buffer_queue::en_queue(vo
   // post-boundary cond
   if (a == e)
     a= s;
+
   DBUG_ASSERT(a == e || 
               len == (a >= e) ?
               (a - e) : (s + a - e));
+  DBUG_ASSERT(a != e);
+
   return ret;
 }
 
@@ -714,12 +710,13 @@ void* circular_buffer_queue::head_queue(
            range, incl cases the queue is empty or full.
 
    @return TRUE  if the first arg identifies a queue entity ordered
-                 before one defined by the 2nd arg,
+                 after one defined by the 2nd arg,
            FALSE otherwise.
 */
 bool circular_buffer_queue::gt(ulong i, ulong k)
 {
   DBUG_ASSERT(i < s && k < s);
+  DBUG_ASSERT(a != e);
 
   if (i >= e)
     if (k >= e)
@@ -770,8 +767,12 @@ ulong Slave_committed_queue::move_queue_
     if (ptr_g->worker_id == (ulong) -1)
       break; /* the head is not even assigned */
     get_dynamic(ws, (uchar *) &w_i, ptr_g->worker_id);
-    
-    if (w_i->last_group_done_index == s || gt(i, w_i->last_group_done_index))
+
+    // no stale last_group_done_index value
+    // DBUG_ASSERT(w_i->checkpoint_notified);
+
+    //if (in(w_i->last_group_done_index) && gt(i, w_i->last_group_done_index))
+    if (!ptr_g->done)
       break; /* gap at i'th */
 
     // memorize the last met group_relay_log_name
@@ -869,25 +870,3 @@ int wait_for_workers_to_finish(Relay_log
   }
   return ret;
 }
-
-bool critical_worker(Relay_log_info *rli)
-{
-  bool critical= FALSE;
-  DYNAMIC_ARRAY *ws= &rli->workers;
-  Slave_worker *current_worker= NULL;
-
-  if (rli->is_parallel_exec() && ws)
-  {
-    for (uint i= 0; ws && i < ws->elements; i++)
-    {
-      get_dynamic(ws, (uchar*) &current_worker, i);
-      if (current_worker->critical)
-      {
-        critical= current_worker->critical;
-        break;
-      }
-    }
-  }
-  return(critical);
-}
-

=== modified file 'sql/rpl_rli_pdb.h'
--- a/sql/rpl_rli_pdb.h	2010-12-16 22:00:47 +0000
+++ b/sql/rpl_rli_pdb.h	2010-12-20 15:58:58 +0000
@@ -98,6 +98,9 @@ public:
   */
   void* head_queue();
   bool   gt(ulong i, ulong k); // comparision of ordering of two entities
+  /* index is within the valid range */
+  bool in(ulong k) { return !empty() && 
+      (e > a ? (k >= e || k < a) : (k >= e && k < a)); }
   bool empty() { return e == s; }
   bool full() { return a == s; }
 };
@@ -127,6 +130,7 @@ typedef struct st_slave_job_group
   uint  checkpoint_seqno;
   my_off_t checkpoint_log_pos; // T-event lop_pos filled by W for CheckPoint
   char*    checkpoint_log_name;
+  volatile uchar done;  // Flag raised by W,  read and reset by C
 } Slave_job_group;
 
 #define get_job(from, to) \
@@ -196,7 +200,8 @@ public:
 class Slave_worker : public Rpl_info_worker
 {
 public:
-  Slave_worker(const char *type, const char *pfs);
+  Slave_worker(const char *type, const char *pfs,
+               Relay_log_info *rli);
   virtual ~Slave_worker();
 
   mysql_mutex_t jobs_lock;
@@ -261,8 +266,6 @@ public:
   
   bool inited_group_execed;
 
-  bool critical;
-
 private:
   bool read_info(Rpl_info_handler *from);
   bool write_info(Rpl_info_handler *to);

=== modified file 'sql/rpl_slave.cc'
--- a/sql/rpl_slave.cc	2010-12-17 12:46:15 +0000
+++ b/sql/rpl_slave.cc	2010-12-18 00:33:02 +0000
@@ -171,9 +171,6 @@ static int terminate_slave_thread(THD *t
                                   bool skip_lock);
 static bool check_io_slave_killed(THD *thd, Master_info *mi, const char *info);
 int slave_worker_exec_job(Slave_worker * w, Relay_log_info *rli);
-static bool mts_checkpoint_routine(Relay_log_info *rli, ulonglong period,
-                                   bool force, bool locked);
-bool mts_recovery_routine(Relay_log_info *rli);
 
 /*
   Find out which replications threads are running
@@ -3796,13 +3793,11 @@ int mts_recovery_cmp(Slave_job_group *id
 {
   longlong filecmp= strcmp(id1->group_relay_log_name, id2->group_relay_log_name);
   longlong poscmp= id1->group_relay_log_pos - id2->group_relay_log_pos;
-  return (filecmp < 0  ? -1 :
-         (filecmp > 0  ?  1 :
-         (poscmp  < 0  ? -1 :
-         (poscmp  > 0  ?  1 : 0))));
+  return (filecmp < 0  ? -1 : (filecmp > 0  ?  1 :
+         (poscmp  < 0  ? -1 : (poscmp  > 0  ?  1 : 0))));
 }
 
-bool mts_recovery_groups(Relay_log_info *rli, DYNAMIC_ARRAY *jobs)
+bool mts_recovery_groups(Relay_log_info *rli, DYNAMIC_ARRAY *jobs, MY_BITMAP *groups)
 {
   DBUG_ENTER("mts_recovery_groups");
   DBUG_ASSERT(rli->slave_parallel_workers > 0);
@@ -3812,15 +3807,20 @@ bool mts_recovery_groups(Relay_log_info 
   for (uint id= 0; id < rli->slave_parallel_workers; id++)
   {
     Slave_worker *worker=
-      Rpl_info_factory::create_worker(opt_worker_repository_id, id);
+      Rpl_info_factory::create_worker(opt_worker_repository_id, id, rli);
     worker->init_info();
-    get_job(worker, job);
-    insert_dynamic(jobs, (uchar*) &job);
+    longlong filecmp= strcmp(rli->get_group_relay_log_name(),
+                             worker->group_relay_log_name);
+    longlong poscmp= rli->get_group_relay_log_pos() -
+                     worker->group_relay_log_pos;
+    if (filecmp < 0 || (filecmp == 0 && poscmp < 0))
+    {
+      get_job(worker, job);
+      insert_dynamic(jobs, (uchar*) &job);
+    }
   }
   sort_dynamic(jobs, (qsort_cmp) mts_recovery_cmp);
 
-  DBUG_ASSERT(rli->slave_parallel_workers == jobs->elements);
-
   DBUG_RETURN(FALSE);
 }
 
@@ -3831,22 +3831,27 @@ bool mts_recovery_routine(Relay_log_info
   const char *errmsg= NULL;
   bool error= FALSE;
   DYNAMIC_ARRAY jobs;
+  MY_BITMAP groups;
+  uint group_counter;
+  bool curr_group_seen_begin= FALSE;
   Slave_job_group job;
   IO_CACHE log;
   File file;
   MY_STAT s;
 
   DBUG_ENTER("mts_recovery_routine");
- 
+
+  bitmap_init(&groups, NULL, rli->checkpoint_group, FALSE);
   my_init_dynamic_array(&jobs, sizeof(Slave_job_group),
                         rli->slave_parallel_workers, rli->slave_parallel_workers);
-
-  mts_recovery_groups(rli, &jobs); 
+  mts_recovery_groups(rli, &jobs, &groups); 
+  ulong save_slave_parallel_workers= rli->slave_parallel_workers;
 
   Format_description_log_event fdle(BINLOG_VERSION);
   if (!fdle.is_valid())
     goto end;
 
+  rli->slave_parallel_workers= 0;
   for (uint pos= 0; pos < jobs.elements; pos++)
   {
     get_dynamic(&jobs, (uchar *) &job, pos);
@@ -3857,10 +3862,6 @@ bool mts_recovery_routine(Relay_log_info
                           job.group_relay_log_name,
                           (ulong) job.group_relay_log_pos);
 
-    if (job.group_relay_log_name == NULL || job.group_relay_log_pos == 0 ||
-        rli->get_group_relay_log_pos() >= job.group_relay_log_pos)
-      continue;
-
     if (log_name == NULL || strcmp(log_name, job.group_relay_log_name))
     {
       if (ev)
@@ -3908,12 +3909,43 @@ bool mts_recovery_routine(Relay_log_info
     {
       DBUG_ASSERT(ev->is_valid());
 
+      /* Andrei --> This was not supposed to happen. */
+      if (group_counter > groups.n_bits)
+      {
+        sql_print_information("Invalid condition, we need to investigate this.");
+        break;
+      }
+        
+      bool unhandled= !bitmap_is_set(&groups, group_counter);
+        
+      if (ev->starts_group())
+        curr_group_seen_begin= TRUE;
+
+      if (unhandled)
+        ev->apply_event(rli);
+
+      if (ev->ends_group() || !curr_group_seen_begin)
+      {
+        curr_group_seen_begin= FALSE;
+        group_counter++;
+      }
+
       delete ev;
+
       ev= NULL;
     }
+
+    rli->set_group_relay_log_name(job.group_relay_log_name);
+    rli->set_event_relay_log_name(job.group_relay_log_name);
+    rli->set_group_master_log_name(job.group_master_log_name);
+
+    rli->set_group_relay_log_pos(job.group_relay_log_pos);
+    rli->set_event_relay_log_pos(job.group_relay_log_pos);
+    rli->set_group_master_log_pos(job.group_master_log_pos);
   }
 
 end:
+  rli->slave_parallel_workers= save_slave_parallel_workers;
   if (desc)
   {
     delete desc;
@@ -3935,6 +3967,7 @@ end:
   }
 
   delete_dynamic(&jobs);
+  bitmap_free(&groups);
 
   DBUG_RETURN(error ? error : rli->flush_info(TRUE));
 }
@@ -4039,7 +4072,7 @@ int slave_start_single_worker(Relay_log_
   uint k;
   pthread_t th;
   Slave_worker *w=
-    Rpl_info_factory::create_worker(opt_worker_repository_id, i);
+    Rpl_info_factory::create_worker(opt_worker_repository_id, i, rli);
   Slave_job_item empty= {NULL};
 
   w->c_rli= rli;
@@ -5764,12 +5797,10 @@ static Log_event* next_event(Relay_log_i
       /* 
          MTS checkpoint in the successful read branch 
       */
-      bool critical= critical_worker(rli);
-      if (rli->is_parallel_exec() && (mts_checkpoint_period != 0 ||
-                                      critical))
+      if (rli->is_parallel_exec() && mts_checkpoint_period != 0)
       {
         ulonglong period= static_cast<ulonglong>(mts_checkpoint_period * 1000000ULL);
-        mts_checkpoint_routine(rli, period, critical, TRUE); // ALFRANIO ERROR
+        mts_checkpoint_routine(rli, period, FALSE, TRUE); // ALFRANIO ERROR
       }
 
       if (hot_log)
@@ -5891,17 +5922,14 @@ static Log_event* next_event(Relay_log_i
 
         const char* old_msg= thd->proc_info;
 
-        bool critical= critical_worker(rli);
-        if (rli->is_parallel_exec() && (mts_checkpoint_period != 0 ||
-                                        critical))
+        if (rli->is_parallel_exec() && mts_checkpoint_period != 0)
         {
           int ret= 0;
           struct timespec waittime;
           ulonglong period= static_cast<ulonglong>(mts_checkpoint_period * 1000000ULL);
           do
           {
-            critical= critical_worker(rli);
-            mts_checkpoint_routine(rli, period, critical, FALSE); // ALFRANIO ERROR
+            mts_checkpoint_routine(rli, period, FALSE, FALSE); // ALFRANIO ERROR
             set_timespec_nsec(waittime, period);
             thd->enter_cond(log_cond, log_lock,
                             "Slave has read all relay log; "

=== modified file 'sql/rpl_slave.h'
--- a/sql/rpl_slave.h	2010-12-08 00:33:48 +0000
+++ b/sql/rpl_slave.h	2010-12-18 00:33:02 +0000
@@ -240,6 +240,8 @@ extern char *master_ssl_cipher, *master_
 extern I_List<THD> threads;
 
 bool mts_recovery_routine(Relay_log_info *rli);
+bool mts_checkpoint_routine(Relay_log_info *rli, ulonglong period,
+                            bool force, bool locked);
 
 #endif /* HAVE_REPLICATION */
 

=== modified file 'sql/sys_vars.cc'
--- a/sql/sys_vars.cc	2010-12-14 08:57:16 +0000
+++ b/sql/sys_vars.cc	2010-12-18 00:33:02 +0000
@@ -3111,6 +3111,12 @@ static Sys_var_uint Sys_checkpoint_mts_p
        "The zero value disables the checkpoint routine (makes sense for debugging).",
        GLOBAL_VAR(mts_checkpoint_period), CMD_LINE(REQUIRED_ARG),
        VALID_RANGE(0, UINT_MAX), DEFAULT(300), BLOCK_SIZE(1));
+
+static Sys_var_uint Sys_checkpoint_mts_group(
+       "mts_checkpoint_group", "Define the number of transactions "
+       "before a checkpoint operation is called.",
+       GLOBAL_VAR(mts_checkpoint_group), CMD_LINE(REQUIRED_ARG),
+       VALID_RANGE(512, UINT_MAX), DEFAULT(512), BLOCK_SIZE(8));
 #endif
 
 static Sys_var_uint Sys_sync_binlog_period(


Attachment: [text/bzr-bundle] bzr/andrei.elkin@oracle.com-20101220155858-wqeicq3db3m9qdor.bundle
Thread
bzr commit into mysql-next-mr-wl5569 branch (andrei.elkin:3251) WL#5569Andrei Elkin20 Dec