#At file:///home/andrei/MySQL/BZR/2a-23May/WL/mysql-next-mr-wl5569/ based on revid:andrei.elkin@stripped
3251 Andrei Elkin 2010-12-20 [merge]
wl#5569 MTS
manual merging from the repo and correcting GAQ processing with introducing a volatile byte to indicate whether an item is busy or released.
added:
mysql-test/suite/sys_vars/r/mts_checkpoint_group_basic.result
mysql-test/suite/sys_vars/t/mts_checkpoint_group_basic.test
modified:
mysql-test/extra/rpl_tests/rpl_parallel_load.test
mysql-test/r/mysqld--help-notwin.result
mysql-test/r/mysqld--help-win.result
mysql-test/suite/funcs_1/r/is_columns_mysql.result
mysql-test/suite/rpl/r/rpl_parallel_conf_limits.result
mysql-test/suite/rpl/t/disabled.def
mysql-test/suite/sys_vars/r/all_vars.result
scripts/mysql_system_tables.sql
sql/log_event.cc
sql/mysqld.cc
sql/mysqld.h
sql/rpl_info_factory.cc
sql/rpl_info_factory.h
sql/rpl_rli.cc
sql/rpl_rli.h
sql/rpl_rli_pdb.cc
sql/rpl_rli_pdb.h
sql/rpl_slave.cc
sql/rpl_slave.h
sql/sys_vars.cc
=== modified file 'mysql-test/extra/rpl_tests/rpl_parallel_load.test'
--- a/mysql-test/extra/rpl_tests/rpl_parallel_load.test 2010-12-17 12:46:15 +0000
+++ b/mysql-test/extra/rpl_tests/rpl_parallel_load.test 2010-12-20 15:58:58 +0000
@@ -6,7 +6,7 @@
# load volume parameter
#
-let $iter = 1000;
+let $iter = 500;
# windows run on PB2 is too slow to time out
disable_query_log;
=== modified file 'mysql-test/r/mysqld--help-notwin.result'
--- a/mysql-test/r/mysqld--help-notwin.result 2010-12-14 14:46:20 +0000
+++ b/mysql-test/r/mysqld--help-notwin.result 2010-12-18 00:33:02 +0000
@@ -341,6 +341,9 @@ The following options may be given as th
--min-examined-row-limit=#
Don't write queries to slow log that examine fewer rows
than that
+ --mts-checkpoint-group=#
+ Define the number of transactions before a checkpoint
+ operation is called.
--mts-checkpoint-period=#
Gather workers' activities to flush the relay log info to
disk after every #th milli-seconds. The zero value
@@ -895,6 +898,7 @@ max-user-connections 0
max-write-lock-count 18446744073709551615
memlock FALSE
min-examined-row-limit 0
+mts-checkpoint-group 512
mts-checkpoint-period 300
mts-coordinator-basic-nap 5
mts-exp-slave-local-timestamp FALSE
=== modified file 'mysql-test/r/mysqld--help-win.result'
--- a/mysql-test/r/mysqld--help-win.result 2010-12-16 21:41:45 +0000
+++ b/mysql-test/r/mysqld--help-win.result 2010-12-18 00:33:02 +0000
@@ -340,6 +340,9 @@ The following options may be given as th
--min-examined-row-limit=#
Don't write queries to slow log that examine fewer rows
than that
+ --mts-checkpoint-group=#
+ Define the number of transactions before a checkpoint
+ operation is called.
--mts-checkpoint-period=#
Gather workers' activities to flush the relay log info to
disk after every #th milli-seconds. The zero value
@@ -898,6 +901,7 @@ max-user-connections 0
max-write-lock-count 18446744073709551615
memlock FALSE
min-examined-row-limit 0
+mts-checkpoint-group 512
mts-checkpoint-period 300
mts-coordinator-basic-nap 5
mts-exp-slave-local-timestamp FALSE
=== modified file 'mysql-test/suite/funcs_1/r/is_columns_mysql.result'
--- a/mysql-test/suite/funcs_1/r/is_columns_mysql.result 2010-12-15 17:46:05 +0000
+++ b/mysql-test/suite/funcs_1/r/is_columns_mysql.result 2010-12-18 00:33:02 +0000
@@ -179,9 +179,8 @@ def mysql slave_relay_log_info Number_of
def mysql slave_relay_log_info Relay_log_name 3 NULL NO text 65535 65535 NULL NULL utf8 utf8_bin text select,insert,update,references
def mysql slave_relay_log_info Relay_log_pos 4 NULL NO bigint NULL NULL 20 0 NULL NULL bigint(20) unsigned select,insert,update,references
def mysql slave_relay_log_info Sql_delay 7 NULL NO int NULL NULL 10 0 NULL NULL int(11) select,insert,update,references
-def mysql slave_worker_info Checkpoint_log_pos 7 NULL NO bigint NULL NULL 20 0 NULL NULL bigint(20) unsigned select,insert,update,references
-def mysql slave_worker_info Group_bitmap 9 NULL YES text 65535 65535 NULL NULL utf8 utf8_bin text select,insert,update,references
-def mysql slave_worker_info Group_count 8 NULL NO int NULL NULL 10 0 NULL NULL int(10) unsigned select,insert,update,references
+def mysql slave_worker_info Group_bitmap 8 NULL YES text 65535 65535 NULL NULL utf8 utf8_bin text select,insert,update,references
+def mysql slave_worker_info Group_count 7 NULL NO int NULL NULL 10 0 NULL NULL int(10) unsigned select,insert,update,references
def mysql slave_worker_info Master_id 1 NULL NO int NULL NULL 10 0 NULL NULL int(10) unsigned PRI select,insert,update,references
def mysql slave_worker_info Master_log_name 5 NULL NO text 65535 65535 NULL NULL utf8 utf8_bin text select,insert,update,references
def mysql slave_worker_info Master_log_pos 6 NULL NO bigint NULL NULL 20 0 NULL NULL bigint(20) unsigned select,insert,update,references
@@ -517,7 +516,6 @@ NULL mysql slave_worker_info Worker_id i
NULL mysql slave_worker_info Relay_log_pos bigint NULL NULL NULL NULL bigint(20) unsigned
1.0000 mysql slave_worker_info Master_log_name text 65535 65535 utf8 utf8_bin text
NULL mysql slave_worker_info Master_log_pos bigint NULL NULL NULL NULL bigint(20) unsigned
-NULL mysql slave_worker_info Checkpoint_log_pos bigint NULL NULL NULL NULL bigint(20) unsigned
NULL mysql slave_worker_info Group_count int NULL NULL NULL NULL int(10) unsigned
1.0000 mysql slave_worker_info Group_bitmap text 65535 65535 utf8 utf8_bin text
NULL mysql slow_log start_time timestamp NULL NULL NULL NULL timestamp
=== modified file 'mysql-test/suite/rpl/r/rpl_parallel_conf_limits.result'
--- a/mysql-test/suite/rpl/r/rpl_parallel_conf_limits.result 2010-12-07 17:35:16 +0000
+++ b/mysql-test/suite/rpl/r/rpl_parallel_conf_limits.result 2010-12-18 00:33:02 +0000
@@ -8,10 +8,12 @@ create view coord_wait_list as
SELECT id from Information_Schema.processlist
where state like 'Waiting for Slave Worker%';
include/stop_slave.inc
-set @save.slave_parallel_workers= @@global.slave_parallel_workers;
-set @@global.slave_parallel_workers= 4;
+set @save.mts_slave_parallel_workers= @@global.mts_slave_parallel_workers;
+set @@global.mts_slave_parallel_workers= 4;
set @save_mts_slave_worker_queue_len_max= @@global.mts_slave_worker_queue_len_max;
set @@global.mts_slave_worker_queue_len_max= 5;
+Warnings:
+Note 1724 Temporary failed transaction retry is not supported in Parallel Slave. Such failure will force the slave to stop.
include/start_slave.inc
create database d0;
create table d0.t1 (a int auto_increment primary key) engine=innodb;
@@ -32,6 +34,8 @@ set @@global.mts_slave_worker_queue_len_
include/stop_slave.inc
set @save_mts_pending_jobs_size_max = @@global.mts_pending_jobs_size_max;
set @@global.mts_pending_jobs_size_max= 1024;
+Warnings:
+Note 1724 Temporary failed transaction retry is not supported in Parallel Slave. Such failure will force the slave to stop.
include/start_slave.inc
create table d0.t2 (a int auto_increment primary key, b text null) engine=innodb;
begin;
@@ -190,4 +194,4 @@ drop database d2;
drop database d1;
drop database d0;
drop view coord_wait_list;
-set @@global.slave_parallel_workers= @save.slave_parallel_workers;
+set @@global.mts_slave_parallel_workers= @save.mts_slave_parallel_workers;
=== modified file 'mysql-test/suite/rpl/t/disabled.def'
--- a/mysql-test/suite/rpl/t/disabled.def 2010-12-14 12:51:30 +0000
+++ b/mysql-test/suite/rpl/t/disabled.def 2010-12-18 00:33:02 +0000
@@ -16,4 +16,3 @@ rpl_row_event_max_size : Bug#55675 20
rpl_delayed_slave : Bug#57514 2010-11-09 andrei rpl_delayed_slave fails sporadically in pb
rpl_log_pos : BUG#55675 2010-09-10 alfranio rpl.rpl_log_pos fails sporadically with error binlog truncated in the middle
rpl_trigger : BUG#58258 2010-11-17 VasilDimov Valgrind: possibly lost from ib_bh_create()
-rpl_parallel_conf_limits : wl#5599 9-12-2010 Andrei Waiting for the recovery wl
=== modified file 'mysql-test/suite/sys_vars/r/all_vars.result'
--- a/mysql-test/suite/sys_vars/r/all_vars.result 2010-12-16 21:41:45 +0000
+++ b/mysql-test/suite/sys_vars/r/all_vars.result 2010-12-18 00:33:02 +0000
@@ -13,6 +13,7 @@ left join t1 on variable_name=test_name
There should be *no* variables listed below:
INNODB_STATS_TRANSIENT_SAMPLE_PAGES
MTS_PARTITION_HASH_SOFT_MAX
+MTS_PENDING_JOBS_SIZE_MAX
MTS_EXP_SLAVE_LOCAL_TIMESTAMP
MTS_EXP_SLAVE_RUN_QUERY_IN_PARALLEL
INNODB_STATS_PERSISTENT_SAMPLE_PAGES
@@ -22,16 +23,16 @@ INNODB_ANALYZE_IS_PERSISTENT
INNODB_RESET_MONITOR_COUNTER
MTS_SLAVE_PARALLEL_WORKERS
MTS_WORKER_UNDERRUN_LEVEL
+MTS_SLAVE_WORKER_QUEUE_LEN_MAX
INNODB_RESET_ALL_MONITOR_COUNTER
LOG_BIN_INDEX
INNODB_DISABLE_MONITOR_COUNTER
INNODB_ENABLE_MONITOR_COUNTER
-MTS_SLAVE_WORKER_QUEUE_LEN_MAX
INNODB_FILE_FORMAT_MAX
-MTS_PENDING_JOBS_SIZE_MAX
MTS_COORDINATOR_BASIC_NAP
INNODB_STATS_TRANSIENT_SAMPLE_PAGES
MTS_PARTITION_HASH_SOFT_MAX
+MTS_PENDING_JOBS_SIZE_MAX
MTS_EXP_SLAVE_LOCAL_TIMESTAMP
MTS_EXP_SLAVE_RUN_QUERY_IN_PARALLEL
INNODB_STATS_PERSISTENT_SAMPLE_PAGES
@@ -41,13 +42,12 @@ INNODB_ANALYZE_IS_PERSISTENT
INNODB_RESET_MONITOR_COUNTER
MTS_SLAVE_PARALLEL_WORKERS
MTS_WORKER_UNDERRUN_LEVEL
+MTS_SLAVE_WORKER_QUEUE_LEN_MAX
INNODB_RESET_ALL_MONITOR_COUNTER
LOG_BIN_INDEX
INNODB_DISABLE_MONITOR_COUNTER
INNODB_ENABLE_MONITOR_COUNTER
-MTS_SLAVE_WORKER_QUEUE_LEN_MAX
INNODB_FILE_FORMAT_MAX
-MTS_PENDING_JOBS_SIZE_MAX
MTS_COORDINATOR_BASIC_NAP
drop table t1;
drop table t2;
=== added file 'mysql-test/suite/sys_vars/r/mts_checkpoint_group_basic.result'
--- a/mysql-test/suite/sys_vars/r/mts_checkpoint_group_basic.result 1970-01-01 00:00:00 +0000
+++ b/mysql-test/suite/sys_vars/r/mts_checkpoint_group_basic.result 2010-12-18 00:33:02 +0000
@@ -0,0 +1,51 @@
+SET @start_global_value = @@global.mts_checkpoint_group;
+SELECT @start_global_value;
+@start_global_value
+512
+select @@global.mts_checkpoint_group;
+@@global.mts_checkpoint_group
+512
+select @@session.mts_checkpoint_group;
+ERROR HY000: Variable 'mts_checkpoint_group' is a GLOBAL variable
+show global variables like 'mts_checkpoint_group';
+Variable_name Value
+mts_checkpoint_group 512
+show session variables like 'mts_checkpoint_group';
+Variable_name Value
+mts_checkpoint_group 512
+select * from information_schema.global_variables where variable_name='mts_checkpoint_group';
+VARIABLE_NAME VARIABLE_VALUE
+MTS_CHECKPOINT_GROUP 512
+select * from information_schema.session_variables where variable_name='mts_checkpoint_group';
+VARIABLE_NAME VARIABLE_VALUE
+MTS_CHECKPOINT_GROUP 512
+set global mts_checkpoint_group=1;
+Warnings:
+Warning 1292 Truncated incorrect mts_checkpoint_group value: '1'
+select @@global.mts_checkpoint_group;
+@@global.mts_checkpoint_group
+512
+set session mts_checkpoint_group=1;
+ERROR HY000: Variable 'mts_checkpoint_group' is a GLOBAL variable and should be set with SET GLOBAL
+set global mts_checkpoint_group=1.1;
+ERROR 42000: Incorrect argument type to variable 'mts_checkpoint_group'
+set global mts_checkpoint_group=1e1;
+ERROR 42000: Incorrect argument type to variable 'mts_checkpoint_group'
+set global mts_checkpoint_group="foo";
+ERROR 42000: Incorrect argument type to variable 'mts_checkpoint_group'
+set global mts_checkpoint_group=0;
+Warnings:
+Warning 1292 Truncated incorrect mts_checkpoint_group value: '0'
+select @@global.mts_checkpoint_group;
+@@global.mts_checkpoint_group
+512
+set global mts_checkpoint_group=cast(-1 as unsigned int);
+Warnings:
+Warning 1292 Truncated incorrect mts_checkpoint_group value: '18446744073709551615'
+select @@global.mts_checkpoint_group;
+@@global.mts_checkpoint_group
+4294967288
+SET @@global.mts_checkpoint_group = @start_global_value;
+SELECT @@global.mts_checkpoint_group;
+@@global.mts_checkpoint_group
+512
=== added file 'mysql-test/suite/sys_vars/t/mts_checkpoint_group_basic.test'
--- a/mysql-test/suite/sys_vars/t/mts_checkpoint_group_basic.test 1970-01-01 00:00:00 +0000
+++ b/mysql-test/suite/sys_vars/t/mts_checkpoint_group_basic.test 2010-12-18 00:33:02 +0000
@@ -0,0 +1,44 @@
+--source include/not_embedded.inc
+
+SET @start_global_value = @@global.mts_checkpoint_group;
+SELECT @start_global_value;
+
+#
+# exists as global only
+#
+select @@global.mts_checkpoint_group;
+--error ER_INCORRECT_GLOBAL_LOCAL_VAR
+select @@session.mts_checkpoint_group;
+show global variables like 'mts_checkpoint_group';
+show session variables like 'mts_checkpoint_group';
+select * from information_schema.global_variables where variable_name='mts_checkpoint_group';
+select * from information_schema.session_variables where variable_name='mts_checkpoint_group';
+
+#
+# show that it's writable
+#
+set global mts_checkpoint_group=1;
+select @@global.mts_checkpoint_group;
+--error ER_GLOBAL_VARIABLE
+set session mts_checkpoint_group=1;
+
+#
+# incorrect types
+#
+--error ER_WRONG_TYPE_FOR_VAR
+set global mts_checkpoint_group=1.1;
+--error ER_WRONG_TYPE_FOR_VAR
+set global mts_checkpoint_group=1e1;
+--error ER_WRONG_TYPE_FOR_VAR
+set global mts_checkpoint_group="foo";
+
+#
+# min/max values
+#
+set global mts_checkpoint_group=0;
+select @@global.mts_checkpoint_group;
+set global mts_checkpoint_group=cast(-1 as unsigned int);
+select @@global.mts_checkpoint_group;
+
+SET @@global.mts_checkpoint_group = @start_global_value;
+SELECT @@global.mts_checkpoint_group;
=== modified file 'scripts/mysql_system_tables.sql'
--- a/scripts/mysql_system_tables.sql 2010-12-15 17:46:05 +0000
+++ b/scripts/mysql_system_tables.sql 2010-12-18 00:33:02 +0000
@@ -104,7 +104,7 @@ CREATE TABLE IF NOT EXISTS slave_relay_l
CREATE TABLE IF NOT EXISTS slave_master_info (Master_id INTEGER UNSIGNED NOT NULL, Number_of_lines INTEGER UNSIGNED NOT NULL, Master_log_name TEXT CHARACTER SET utf8 COLLATE utf8_bin NOT NULL, Master_log_pos BIGINT UNSIGNED NOT NULL, Host TEXT CHARACTER SET utf8 COLLATE utf8_bin, User_name TEXT CHARACTER SET utf8 COLLATE utf8_bin, User_password TEXT CHARACTER SET utf8 COLLATE utf8_bin, Port INTEGER UNSIGNED NOT NULL, Connect_retry INTEGER UNSIGNED NOT NULL, Enabled_ssl BOOLEAN NOT NULL, Ssl_ca TEXT CHARACTER SET utf8 COLLATE utf8_bin, Ssl_capath TEXT CHARACTER SET utf8 COLLATE utf8_bin, Ssl_cert TEXT CHARACTER SET utf8 COLLATE utf8_bin, Ssl_cipher TEXT CHARACTER SET utf8 COLLATE utf8_bin, Ssl_key TEXT CHARACTER SET utf8 COLLATE utf8_bin, Ssl_verify_servert_cert BOOLEAN NOT NULL, Heartbeat FLOAT NOT NULL, Bind TEXT CHARACTER SET utf8 COLLATE utf8_bin, Ignored_server_ids TEXT CHARACTER SET utf8 COLLATE utf8_bin, Uuid TEXT CHARACTER SET utf8 COLLATE utf8_bin, Retry_count BIGIN!
T UNSIGNED NOT NULL, PRIMARY KEY(Master_id)) ENGINE=MYISAM DEFAULT CHARSET=utf8 COMMENT 'Master Information';
-CREATE TABLE IF NOT EXISTS slave_worker_info (Master_id INTEGER UNSIGNED NOT NULL, Worker_id INTEGER UNSIGNED NOT NULL, Relay_log_name TEXT CHARACTER SET utf8 COLLATE utf8_bin NOT NULL, Relay_log_pos BIGINT UNSIGNED NOT NULL, Master_log_name TEXT CHARACTER SET utf8 COLLATE utf8_bin NOT NULL, Master_log_pos BIGINT UNSIGNED NOT NULL, Checkpoint_log_pos BIGINT UNSIGNED NOT NULL, Group_count INTEGER UNSIGNED NOT NULL, Group_bitmap TEXT CHARACTER SET utf8 COLLATE utf8_bin, PRIMARY KEY(Master_id, Worker_id)) ENGINE=MYISAM DEFAULT CHARSET=utf8 COMMENT 'Worker Information';
+CREATE TABLE IF NOT EXISTS slave_worker_info (Master_id INTEGER UNSIGNED NOT NULL, Worker_id INTEGER UNSIGNED NOT NULL, Relay_log_name TEXT CHARACTER SET utf8 COLLATE utf8_bin NOT NULL, Relay_log_pos BIGINT UNSIGNED NOT NULL, Master_log_name TEXT CHARACTER SET utf8 COLLATE utf8_bin NOT NULL, Master_log_pos BIGINT UNSIGNED NOT NULL, Group_count INTEGER UNSIGNED NOT NULL, Group_bitmap TEXT CHARACTER SET utf8 COLLATE utf8_bin, PRIMARY KEY(Master_id, Worker_id)) ENGINE=MYISAM DEFAULT CHARSET=utf8 COMMENT 'Worker Information';
--
-- PERFORMANCE SCHEMA INSTALLATION
=== modified file 'sql/log_event.cc'
--- a/sql/log_event.cc 2010-12-18 19:00:23 +0000
+++ b/sql/log_event.cc 2010-12-20 15:58:58 +0000
@@ -2429,6 +2429,7 @@ Slave_worker *Log_event::get_slave_worke
g.checkpoint_log_name= NULL;
g.checkpoint_log_pos= 0;
g.checkpoint_seqno= (uint) -1;
+ g.done= 0;
// the last occupied GAQ's array index
gaq_idx= rli->gaq->assigned_group_index= rli->gaq->en_queue((void *) &g);
@@ -2775,6 +2776,10 @@ int Log_event::apply_event(Relay_log_inf
}
DBUG_RETURN(do_apply_event(rli));
}
+
+ uint critical= (rli->checkpoint_group / rli->slave_parallel_workers);
+ if (rli->is_parallel_exec() && rli->checkpoint_seqno > critical)
+ mts_checkpoint_routine(c_rli, 0, TRUE, TRUE);
DBUG_ASSERT(!(rli->curr_group_seen_begin && ends_group()) ||
rli->last_assigned_worker);
=== modified file 'sql/mysqld.cc'
--- a/sql/mysqld.cc 2010-12-14 14:46:20 +0000
+++ b/sql/mysqld.cc 2010-12-18 00:33:02 +0000
@@ -507,7 +507,7 @@ ulong thread_id=1L,current_pid;
ulong slow_launch_threads = 0;
uint sync_binlog_period= 0, sync_relaylog_period= 0,
sync_relayloginfo_period= 0, sync_masterinfo_period= 0,
- mts_checkpoint_period= 0;
+ mts_checkpoint_period= 0, mts_checkpoint_group;
ulong expire_logs_days = 0;
const double log_10[] = {
=== modified file 'sql/mysqld.h'
--- a/sql/mysqld.h 2010-12-14 14:46:20 +0000
+++ b/sql/mysqld.h 2010-12-18 00:33:02 +0000
@@ -133,7 +133,7 @@ extern ulong expire_logs_days;
extern my_bool relay_log_recovery;
extern uint sync_binlog_period, sync_relaylog_period,
sync_relayloginfo_period, sync_masterinfo_period,
- mts_checkpoint_period;
+ mts_checkpoint_period, mts_checkpoint_group;
extern ulong opt_tc_log_size, tc_log_max_pages_used, tc_log_page_size;
extern ulong tc_log_page_waits;
extern my_bool relay_log_purge, opt_innodb_safe_binlog, opt_innodb;
=== modified file 'sql/rpl_info_factory.cc'
--- a/sql/rpl_info_factory.cc 2010-12-15 17:46:05 +0000
+++ b/sql/rpl_info_factory.cc 2010-12-18 00:33:02 +0000
@@ -408,7 +408,8 @@ bool Rpl_info_factory::change_engine(Rpl
DBUG_RETURN(FALSE);
}
-Slave_worker *Rpl_info_factory::create_worker(uint worker_option, uint worker_id)
+Slave_worker *Rpl_info_factory::create_worker(uint worker_option, uint worker_id,
+ Relay_log_info *rli)
{
DBUG_ENTER("Rpl_info_factory::create_worker");
@@ -429,7 +430,7 @@ Slave_worker *Rpl_info_factory::create_w
pos= strmov(info_name, "worker");
sprintf(pos, ".%u", worker_id);
- if (!(worker= new Slave_worker(info_name, "worker")))
+ if (!(worker= new Slave_worker(info_name, "worker", rli)))
goto err;
if (!(key_info_idx= new ulong[NUMBER_OF_FIELDS_TO_IDENTIFY_WORKER]))
=== modified file 'sql/rpl_info_factory.h'
--- a/sql/rpl_info_factory.h 2010-12-09 16:17:32 +0000
+++ b/sql/rpl_info_factory.h 2010-12-18 00:33:02 +0000
@@ -64,7 +64,8 @@ class Rpl_info_factory
static bool create_coordinators(uint mi_option, Master_info **mi,
uint rli_option, Relay_log_info **rli);
- static Slave_worker *create_worker(uint rli_option, uint worker_id);
+ static Slave_worker *create_worker(uint rli_option, uint worker_id,
+ Relay_log_info *rli);
static Master_info *create_mi(uint rli_option);
static Relay_log_info *create_rli(uint rli_option, bool is_slave_recovery);
static bool decide_repository(Rpl_info *info,
=== modified file 'sql/rpl_rli.cc'
--- a/sql/rpl_rli.cc 2010-12-16 22:00:47 +0000
+++ b/sql/rpl_rli.cc 2010-12-18 00:33:02 +0000
@@ -70,8 +70,8 @@ Relay_log_info::Relay_log_info(bool is_s
tables_to_lock(0), tables_to_lock_count(0),
rows_query_ev(NULL), last_event_start_time(0),
this_worker(NULL), slave_parallel_workers(0),
- sql_delay(0), sql_delay_end(0),
- m_flags(0)
+ checkpoint_group(mts_checkpoint_group),
+ sql_delay(0), sql_delay_end(0), m_flags(0)
{
DBUG_ENTER("Relay_log_info::Relay_log_info");
=== modified file 'sql/rpl_rli.h'
--- a/sql/rpl_rli.h 2010-12-16 22:00:47 +0000
+++ b/sql/rpl_rli.h 2010-12-18 00:33:02 +0000
@@ -469,6 +469,7 @@ public:
*/
DYNAMIC_ARRAY least_occupied_workers;
uint checkpoint_seqno; // counter of groups executed after the most recent CP
+ uint checkpoint_group; // counter of groups after which a checkpoint is called.
/* most of allocation in the coordinator rli is there */
void init_workers(ulong);
=== modified file 'sql/rpl_rli_pdb.cc'
--- a/sql/rpl_rli_pdb.cc 2010-12-17 12:46:15 +0000
+++ b/sql/rpl_rli_pdb.cc 2010-12-20 15:58:58 +0000
@@ -15,16 +15,15 @@ const char *info_slave_worker_fields []=
"relay_log_pos",
"master_log_name",
"master_log_pos",
- "checkpoint_log_pos",
"group_count",
"group_bitmap"
};
-Slave_worker::Slave_worker(const char* type, const char* pfs)
- : Rpl_info_worker(type, pfs), curr_group_exec_parts(0),
+Slave_worker::Slave_worker(const char* type, const char* pfs,
+ Relay_log_info *rli)
+ : Rpl_info_worker(type, pfs), c_rli(rli), curr_group_exec_parts(0),
group_relay_log_pos(0), group_master_log_pos(0),
- checkpoint_log_pos(0), inited_group_execed(0),
- critical(0)
+ checkpoint_log_pos(0), inited_group_execed(0)
{
group_relay_log_name[0]= 0;
group_master_log_name[0]= 0;
@@ -52,7 +51,8 @@ int Slave_worker::init_info()
if (!(curr_group_exec_parts= new Database_ids(NAME_LEN)))
goto err;
- if (bitmap_init(&group_execed, NULL, 32768, FALSE))
+ if (bitmap_init(&group_execed, NULL,
+ c_rli->checkpoint_group, FALSE))
goto err;
inited_group_execed= 1;
@@ -125,7 +125,6 @@ bool Slave_worker::read_info(Rpl_info_ha
ulong temp_group_relay_log_pos= 0;
ulong temp_group_master_log_pos= 0;
- ulong temp_checkpoint_log_pos= 0;
ulong nbytes= 0;
uchar *buffer= (uchar *) group_execed.bitmap;
@@ -142,8 +141,6 @@ bool Slave_worker::read_info(Rpl_info_ha
(char *) "") ||
from->get_info((ulong *) &temp_group_master_log_pos,
(ulong) 0) ||
- from->get_info((ulong *) &temp_checkpoint_log_pos,
- (ulong) 0) ||
from->get_info(&nbytes, (ulong) 0) ||
from->get_info(buffer, (size_t) nbytes,
(uchar *) 0))
@@ -151,7 +148,6 @@ bool Slave_worker::read_info(Rpl_info_ha
group_relay_log_pos= temp_group_relay_log_pos;
group_master_log_pos= temp_group_master_log_pos;
- checkpoint_log_pos= temp_checkpoint_log_pos;
DBUG_RETURN(FALSE);
}
@@ -168,7 +164,6 @@ bool Slave_worker::write_info(Rpl_info_h
to->set_info((ulong) group_relay_log_pos) ||
to->set_info(group_master_log_name) ||
to->set_info((ulong) group_master_log_pos) ||
- to->set_info((ulong) checkpoint_log_pos) ||
to->set_info(nbytes) ||
to->set_info(buffer, (size_t) nbytes))
DBUG_RETURN(TRUE);
@@ -196,16 +191,10 @@ bool Slave_worker::commit_positions(Log_
my_free(ptr_g->checkpoint_log_name);
ptr_g->checkpoint_log_name= NULL;
- critical= FALSE;
bitmap_clear_all(&group_execed);
}
- if (ptr_g->checkpoint_seqno)
- {
- if (ptr_g->checkpoint_seqno > (32768 / 8))
- critical= TRUE;
- bitmap_fast_test_and_set(&group_execed, ptr_g->checkpoint_seqno);
- }
+ bitmap_set_bit(&group_execed, ptr_g->checkpoint_seqno);
group_relay_log_pos= ev->future_event_relay_log_pos;
group_master_log_pos= ev->log_pos;
@@ -537,6 +526,7 @@ void Slave_worker::slave_worker_ends_gro
// first ever group must have relay log name
DBUG_ASSERT(last_group_done_index != c_rli->gaq->s ||
ptr_g->group_relay_log_name != NULL);
+ DBUG_ASSERT(ptr_g->worker_id == id);
if (ptr_g->group_relay_log_name != NULL)
{
@@ -555,8 +545,9 @@ void Slave_worker::slave_worker_ends_gro
ptr_g->group_master_log_pos= group_master_log_pos;
ptr_g->group_relay_log_pos= group_relay_log_pos;
+ ptr_g->done= 1; // GAQ index is available to C now
- last_group_done_index = gaq_idx; // GAQ index is available to C now
+ last_group_done_index= gaq_idx;
}
// cleanup relating to the last executed group regardless of error
@@ -629,6 +620,7 @@ ulong circular_buffer_queue::de_queue(uc
DBUG_ASSERT(e == s ||
(len == (a >= e)? (a - e) :
(s + a - e)));
+ DBUG_ASSERT(a != e);
return ret;
}
@@ -655,6 +647,7 @@ ulong circular_buffer_queue::de_tail(uch
DBUG_ASSERT(e == s ||
(len == (a >= e)? (a - e) :
(s + a - e)));
+ DBUG_ASSERT(a != e);
return a;
}
@@ -686,9 +679,12 @@ ulong circular_buffer_queue::en_queue(vo
// post-boundary cond
if (a == e)
a= s;
+
DBUG_ASSERT(a == e ||
len == (a >= e) ?
(a - e) : (s + a - e));
+ DBUG_ASSERT(a != e);
+
return ret;
}
@@ -714,12 +710,13 @@ void* circular_buffer_queue::head_queue(
range, incl cases the queue is empty or full.
@return TRUE if the first arg identifies a queue entity ordered
- before one defined by the 2nd arg,
+ after one defined by the 2nd arg,
FALSE otherwise.
*/
bool circular_buffer_queue::gt(ulong i, ulong k)
{
DBUG_ASSERT(i < s && k < s);
+ DBUG_ASSERT(a != e);
if (i >= e)
if (k >= e)
@@ -770,8 +767,12 @@ ulong Slave_committed_queue::move_queue_
if (ptr_g->worker_id == (ulong) -1)
break; /* the head is not even assigned */
get_dynamic(ws, (uchar *) &w_i, ptr_g->worker_id);
-
- if (w_i->last_group_done_index == s || gt(i, w_i->last_group_done_index))
+
+ // no stale last_group_done_index value
+ // DBUG_ASSERT(w_i->checkpoint_notified);
+
+ //if (in(w_i->last_group_done_index) && gt(i, w_i->last_group_done_index))
+ if (!ptr_g->done)
break; /* gap at i'th */
// memorize the last met group_relay_log_name
@@ -869,25 +870,3 @@ int wait_for_workers_to_finish(Relay_log
}
return ret;
}
-
-bool critical_worker(Relay_log_info *rli)
-{
- bool critical= FALSE;
- DYNAMIC_ARRAY *ws= &rli->workers;
- Slave_worker *current_worker= NULL;
-
- if (rli->is_parallel_exec() && ws)
- {
- for (uint i= 0; ws && i < ws->elements; i++)
- {
- get_dynamic(ws, (uchar*) ¤t_worker, i);
- if (current_worker->critical)
- {
- critical= current_worker->critical;
- break;
- }
- }
- }
- return(critical);
-}
-
=== modified file 'sql/rpl_rli_pdb.h'
--- a/sql/rpl_rli_pdb.h 2010-12-16 22:00:47 +0000
+++ b/sql/rpl_rli_pdb.h 2010-12-20 15:58:58 +0000
@@ -98,6 +98,9 @@ public:
*/
void* head_queue();
bool gt(ulong i, ulong k); // comparision of ordering of two entities
+ /* index is within the valid range */
+ bool in(ulong k) { return !empty() &&
+ (e > a ? (k >= e || k < a) : (k >= e && k < a)); }
bool empty() { return e == s; }
bool full() { return a == s; }
};
@@ -127,6 +130,7 @@ typedef struct st_slave_job_group
uint checkpoint_seqno;
my_off_t checkpoint_log_pos; // T-event lop_pos filled by W for CheckPoint
char* checkpoint_log_name;
+ volatile uchar done; // Flag raised by W, read and reset by C
} Slave_job_group;
#define get_job(from, to) \
@@ -196,7 +200,8 @@ public:
class Slave_worker : public Rpl_info_worker
{
public:
- Slave_worker(const char *type, const char *pfs);
+ Slave_worker(const char *type, const char *pfs,
+ Relay_log_info *rli);
virtual ~Slave_worker();
mysql_mutex_t jobs_lock;
@@ -261,8 +266,6 @@ public:
bool inited_group_execed;
- bool critical;
-
private:
bool read_info(Rpl_info_handler *from);
bool write_info(Rpl_info_handler *to);
=== modified file 'sql/rpl_slave.cc'
--- a/sql/rpl_slave.cc 2010-12-17 12:46:15 +0000
+++ b/sql/rpl_slave.cc 2010-12-18 00:33:02 +0000
@@ -171,9 +171,6 @@ static int terminate_slave_thread(THD *t
bool skip_lock);
static bool check_io_slave_killed(THD *thd, Master_info *mi, const char *info);
int slave_worker_exec_job(Slave_worker * w, Relay_log_info *rli);
-static bool mts_checkpoint_routine(Relay_log_info *rli, ulonglong period,
- bool force, bool locked);
-bool mts_recovery_routine(Relay_log_info *rli);
/*
Find out which replications threads are running
@@ -3796,13 +3793,11 @@ int mts_recovery_cmp(Slave_job_group *id
{
longlong filecmp= strcmp(id1->group_relay_log_name, id2->group_relay_log_name);
longlong poscmp= id1->group_relay_log_pos - id2->group_relay_log_pos;
- return (filecmp < 0 ? -1 :
- (filecmp > 0 ? 1 :
- (poscmp < 0 ? -1 :
- (poscmp > 0 ? 1 : 0))));
+ return (filecmp < 0 ? -1 : (filecmp > 0 ? 1 :
+ (poscmp < 0 ? -1 : (poscmp > 0 ? 1 : 0))));
}
-bool mts_recovery_groups(Relay_log_info *rli, DYNAMIC_ARRAY *jobs)
+bool mts_recovery_groups(Relay_log_info *rli, DYNAMIC_ARRAY *jobs, MY_BITMAP *groups)
{
DBUG_ENTER("mts_recovery_groups");
DBUG_ASSERT(rli->slave_parallel_workers > 0);
@@ -3812,15 +3807,20 @@ bool mts_recovery_groups(Relay_log_info
for (uint id= 0; id < rli->slave_parallel_workers; id++)
{
Slave_worker *worker=
- Rpl_info_factory::create_worker(opt_worker_repository_id, id);
+ Rpl_info_factory::create_worker(opt_worker_repository_id, id, rli);
worker->init_info();
- get_job(worker, job);
- insert_dynamic(jobs, (uchar*) &job);
+ longlong filecmp= strcmp(rli->get_group_relay_log_name(),
+ worker->group_relay_log_name);
+ longlong poscmp= rli->get_group_relay_log_pos() -
+ worker->group_relay_log_pos;
+ if (filecmp < 0 || (filecmp == 0 && poscmp < 0))
+ {
+ get_job(worker, job);
+ insert_dynamic(jobs, (uchar*) &job);
+ }
}
sort_dynamic(jobs, (qsort_cmp) mts_recovery_cmp);
- DBUG_ASSERT(rli->slave_parallel_workers == jobs->elements);
-
DBUG_RETURN(FALSE);
}
@@ -3831,22 +3831,27 @@ bool mts_recovery_routine(Relay_log_info
const char *errmsg= NULL;
bool error= FALSE;
DYNAMIC_ARRAY jobs;
+ MY_BITMAP groups;
+ uint group_counter;
+ bool curr_group_seen_begin= FALSE;
Slave_job_group job;
IO_CACHE log;
File file;
MY_STAT s;
DBUG_ENTER("mts_recovery_routine");
-
+
+ bitmap_init(&groups, NULL, rli->checkpoint_group, FALSE);
my_init_dynamic_array(&jobs, sizeof(Slave_job_group),
rli->slave_parallel_workers, rli->slave_parallel_workers);
-
- mts_recovery_groups(rli, &jobs);
+ mts_recovery_groups(rli, &jobs, &groups);
+ ulong save_slave_parallel_workers= rli->slave_parallel_workers;
Format_description_log_event fdle(BINLOG_VERSION);
if (!fdle.is_valid())
goto end;
+ rli->slave_parallel_workers= 0;
for (uint pos= 0; pos < jobs.elements; pos++)
{
get_dynamic(&jobs, (uchar *) &job, pos);
@@ -3857,10 +3862,6 @@ bool mts_recovery_routine(Relay_log_info
job.group_relay_log_name,
(ulong) job.group_relay_log_pos);
- if (job.group_relay_log_name == NULL || job.group_relay_log_pos == 0 ||
- rli->get_group_relay_log_pos() >= job.group_relay_log_pos)
- continue;
-
if (log_name == NULL || strcmp(log_name, job.group_relay_log_name))
{
if (ev)
@@ -3908,12 +3909,43 @@ bool mts_recovery_routine(Relay_log_info
{
DBUG_ASSERT(ev->is_valid());
+ /* Andrei --> This was not supposed to happen. */
+ if (group_counter > groups.n_bits)
+ {
+ sql_print_information("Invalid condition, we need to investigate this.");
+ break;
+ }
+
+ bool unhandled= !bitmap_is_set(&groups, group_counter);
+
+ if (ev->starts_group())
+ curr_group_seen_begin= TRUE;
+
+ if (unhandled)
+ ev->apply_event(rli);
+
+ if (ev->ends_group() || !curr_group_seen_begin)
+ {
+ curr_group_seen_begin= FALSE;
+ group_counter++;
+ }
+
delete ev;
+
ev= NULL;
}
+
+ rli->set_group_relay_log_name(job.group_relay_log_name);
+ rli->set_event_relay_log_name(job.group_relay_log_name);
+ rli->set_group_master_log_name(job.group_master_log_name);
+
+ rli->set_group_relay_log_pos(job.group_relay_log_pos);
+ rli->set_event_relay_log_pos(job.group_relay_log_pos);
+ rli->set_group_master_log_pos(job.group_master_log_pos);
}
end:
+ rli->slave_parallel_workers= save_slave_parallel_workers;
if (desc)
{
delete desc;
@@ -3935,6 +3967,7 @@ end:
}
delete_dynamic(&jobs);
+ bitmap_free(&groups);
DBUG_RETURN(error ? error : rli->flush_info(TRUE));
}
@@ -4039,7 +4072,7 @@ int slave_start_single_worker(Relay_log_
uint k;
pthread_t th;
Slave_worker *w=
- Rpl_info_factory::create_worker(opt_worker_repository_id, i);
+ Rpl_info_factory::create_worker(opt_worker_repository_id, i, rli);
Slave_job_item empty= {NULL};
w->c_rli= rli;
@@ -5764,12 +5797,10 @@ static Log_event* next_event(Relay_log_i
/*
MTS checkpoint in the successful read branch
*/
- bool critical= critical_worker(rli);
- if (rli->is_parallel_exec() && (mts_checkpoint_period != 0 ||
- critical))
+ if (rli->is_parallel_exec() && mts_checkpoint_period != 0)
{
ulonglong period= static_cast<ulonglong>(mts_checkpoint_period * 1000000ULL);
- mts_checkpoint_routine(rli, period, critical, TRUE); // ALFRANIO ERROR
+ mts_checkpoint_routine(rli, period, FALSE, TRUE); // ALFRANIO ERROR
}
if (hot_log)
@@ -5891,17 +5922,14 @@ static Log_event* next_event(Relay_log_i
const char* old_msg= thd->proc_info;
- bool critical= critical_worker(rli);
- if (rli->is_parallel_exec() && (mts_checkpoint_period != 0 ||
- critical))
+ if (rli->is_parallel_exec() && mts_checkpoint_period != 0)
{
int ret= 0;
struct timespec waittime;
ulonglong period= static_cast<ulonglong>(mts_checkpoint_period * 1000000ULL);
do
{
- critical= critical_worker(rli);
- mts_checkpoint_routine(rli, period, critical, FALSE); // ALFRANIO ERROR
+ mts_checkpoint_routine(rli, period, FALSE, FALSE); // ALFRANIO ERROR
set_timespec_nsec(waittime, period);
thd->enter_cond(log_cond, log_lock,
"Slave has read all relay log; "
=== modified file 'sql/rpl_slave.h'
--- a/sql/rpl_slave.h 2010-12-08 00:33:48 +0000
+++ b/sql/rpl_slave.h 2010-12-18 00:33:02 +0000
@@ -240,6 +240,8 @@ extern char *master_ssl_cipher, *master_
extern I_List<THD> threads;
bool mts_recovery_routine(Relay_log_info *rli);
+bool mts_checkpoint_routine(Relay_log_info *rli, ulonglong period,
+ bool force, bool locked);
#endif /* HAVE_REPLICATION */
=== modified file 'sql/sys_vars.cc'
--- a/sql/sys_vars.cc 2010-12-14 08:57:16 +0000
+++ b/sql/sys_vars.cc 2010-12-18 00:33:02 +0000
@@ -3111,6 +3111,12 @@ static Sys_var_uint Sys_checkpoint_mts_p
"The zero value disables the checkpoint routine (makes sense for debugging).",
GLOBAL_VAR(mts_checkpoint_period), CMD_LINE(REQUIRED_ARG),
VALID_RANGE(0, UINT_MAX), DEFAULT(300), BLOCK_SIZE(1));
+
+static Sys_var_uint Sys_checkpoint_mts_group(
+ "mts_checkpoint_group", "Define the number of transactions "
+ "before a checkpoint operation is called.",
+ GLOBAL_VAR(mts_checkpoint_group), CMD_LINE(REQUIRED_ARG),
+ VALID_RANGE(512, UINT_MAX), DEFAULT(512), BLOCK_SIZE(8));
#endif
static Sys_var_uint Sys_sync_binlog_period(
Attachment: [text/bzr-bundle] bzr/andrei.elkin@oracle.com-20101220155858-wqeicq3db3m9qdor.bundle
| Thread |
|---|
| • bzr commit into mysql-next-mr-wl5569 branch (andrei.elkin:3251) WL#5569 | Andrei Elkin | 20 Dec |