#At file:///home/andrei/MySQL/BZR/2a-23May/WL/mysql-next-mr-wl5569/ based on revid:andrei.elkin@stripped
3248 Andrei Elkin 2010-12-17 [merge]
merge from wl#5569 repo to local branch
rpl_sequential opt files are added to avoid mtr give up to process a bulk of unsafe warnings.
added:
mysql-test/suite/rpl/t/rpl_sequential-master.opt
mysql-test/suite/rpl/t/rpl_sequential-slave.opt
modified:
mysql-test/suite/funcs_1/r/is_columns_mysql.result
mysql-test/suite/rpl/r/rpl_mixed_crash_safe.result
mysql-test/suite/rpl/r/rpl_row_crash_safe.result
mysql-test/suite/rpl/r/rpl_stm_crash_safe.result
scripts/mysql_system_tables.sql
sql/rpl_info_dummy.cc
sql/rpl_info_dummy.h
sql/rpl_info_factory.cc
sql/rpl_info_file.cc
sql/rpl_info_file.h
sql/rpl_info_handler.h
sql/rpl_info_table.cc
sql/rpl_info_table.h
sql/rpl_info_table_access.cc
sql/rpl_mi.cc
sql/rpl_rli.cc
sql/rpl_rli.h
sql/rpl_rli_pdb.cc
sql/rpl_rli_pdb.h
sql/rpl_slave.cc
=== modified file 'mysql-test/suite/funcs_1/r/is_columns_mysql.result'
--- a/mysql-test/suite/funcs_1/r/is_columns_mysql.result 2010-12-08 00:33:48 +0000
+++ b/mysql-test/suite/funcs_1/r/is_columns_mysql.result 2010-12-15 17:46:05 +0000
@@ -175,15 +175,18 @@ def mysql slave_relay_log_info Master_id
def mysql slave_relay_log_info Master_log_name 5 NULL NO text 65535 65535 NULL NULL utf8 utf8_bin text select,insert,update,references
def mysql slave_relay_log_info Master_log_pos 6 NULL NO bigint NULL NULL 20 0 NULL NULL bigint(20) unsigned select,insert,update,references
def mysql slave_relay_log_info Number_of_lines 2 NULL NO int NULL NULL 10 0 NULL NULL int(10) unsigned select,insert,update,references
+def mysql slave_relay_log_info Number_of_workers 8 NULL NO int NULL NULL 10 0 NULL NULL int(10) unsigned select,insert,update,references
def mysql slave_relay_log_info Relay_log_name 3 NULL NO text 65535 65535 NULL NULL utf8 utf8_bin text select,insert,update,references
def mysql slave_relay_log_info Relay_log_pos 4 NULL NO bigint NULL NULL 20 0 NULL NULL bigint(20) unsigned select,insert,update,references
def mysql slave_relay_log_info Sql_delay 7 NULL NO int NULL NULL 10 0 NULL NULL int(11) select,insert,update,references
+def mysql slave_worker_info Checkpoint_log_pos 7 NULL NO bigint NULL NULL 20 0 NULL NULL bigint(20) unsigned select,insert,update,references
+def mysql slave_worker_info Group_bitmap 9 NULL YES text 65535 65535 NULL NULL utf8 utf8_bin text select,insert,update,references
+def mysql slave_worker_info Group_count 8 NULL NO int NULL NULL 10 0 NULL NULL int(10) unsigned select,insert,update,references
def mysql slave_worker_info Master_id 1 NULL NO int NULL NULL 10 0 NULL NULL int(10) unsigned PRI select,insert,update,references
-def mysql slave_worker_info Master_log_name 6 NULL NO text 65535 65535 NULL NULL utf8 utf8_bin text select,insert,update,references
-def mysql slave_worker_info Master_log_pos 7 NULL NO bigint NULL NULL 20 0 NULL NULL bigint(20) unsigned select,insert,update,references
-def mysql slave_worker_info Partitions 3 NULL NO text 65535 65535 NULL NULL utf8 utf8_bin text select,insert,update,references
-def mysql slave_worker_info Relay_log_name 4 NULL NO text 65535 65535 NULL NULL utf8 utf8_bin text select,insert,update,references
-def mysql slave_worker_info Relay_log_pos 5 NULL NO bigint NULL NULL 20 0 NULL NULL bigint(20) unsigned select,insert,update,references
+def mysql slave_worker_info Master_log_name 5 NULL NO text 65535 65535 NULL NULL utf8 utf8_bin text select,insert,update,references
+def mysql slave_worker_info Master_log_pos 6 NULL NO bigint NULL NULL 20 0 NULL NULL bigint(20) unsigned select,insert,update,references
+def mysql slave_worker_info Relay_log_name 3 NULL NO text 65535 65535 NULL NULL utf8 utf8_bin text select,insert,update,references
+def mysql slave_worker_info Relay_log_pos 4 NULL NO bigint NULL NULL 20 0 NULL NULL bigint(20) unsigned select,insert,update,references
def mysql slave_worker_info Worker_id 2 NULL NO int NULL NULL 10 0 NULL NULL int(10) unsigned PRI select,insert,update,references
def mysql slow_log db 7 NULL NO varchar 512 1536 NULL NULL utf8 utf8_general_ci varchar(512) select,insert,update,references
def mysql slow_log insert_id 9 NULL NO int NULL NULL 10 0 NULL NULL int(11) select,insert,update,references
@@ -507,13 +510,16 @@ NULL mysql slave_relay_log_info Relay_lo
1.0000 mysql slave_relay_log_info Master_log_name text 65535 65535 utf8 utf8_bin text
NULL mysql slave_relay_log_info Master_log_pos bigint NULL NULL NULL NULL bigint(20) unsigned
NULL mysql slave_relay_log_info Sql_delay int NULL NULL NULL NULL int(11)
+NULL mysql slave_relay_log_info Number_of_workers int NULL NULL NULL NULL int(10) unsigned
NULL mysql slave_worker_info Master_id int NULL NULL NULL NULL int(10) unsigned
NULL mysql slave_worker_info Worker_id int NULL NULL NULL NULL int(10) unsigned
-1.0000 mysql slave_worker_info Partitions text 65535 65535 utf8 utf8_bin text
1.0000 mysql slave_worker_info Relay_log_name text 65535 65535 utf8 utf8_bin text
NULL mysql slave_worker_info Relay_log_pos bigint NULL NULL NULL NULL bigint(20) unsigned
1.0000 mysql slave_worker_info Master_log_name text 65535 65535 utf8 utf8_bin text
NULL mysql slave_worker_info Master_log_pos bigint NULL NULL NULL NULL bigint(20) unsigned
+NULL mysql slave_worker_info Checkpoint_log_pos bigint NULL NULL NULL NULL bigint(20) unsigned
+NULL mysql slave_worker_info Group_count int NULL NULL NULL NULL int(10) unsigned
+1.0000 mysql slave_worker_info Group_bitmap text 65535 65535 utf8 utf8_bin text
NULL mysql slow_log start_time timestamp NULL NULL NULL NULL timestamp
1.0000 mysql slow_log user_host mediumtext 16777215 16777215 utf8 utf8_general_ci mediumtext
NULL mysql slow_log query_time time NULL NULL NULL NULL time
=== modified file 'mysql-test/suite/rpl/r/rpl_mixed_crash_safe.result'
--- a/mysql-test/suite/rpl/r/rpl_mixed_crash_safe.result 2010-12-13 21:16:31 +0000
+++ b/mysql-test/suite/rpl/r/rpl_mixed_crash_safe.result 2010-12-15 17:46:05 +0000
@@ -24,6 +24,7 @@ slave_relay_log_info CREATE TABLE `slave
`Master_log_name` text CHARACTER SET utf8 COLLATE utf8_bin NOT NULL,
`Master_log_pos` bigint(20) unsigned NOT NULL,
`Sql_delay` int(11) NOT NULL,
+ `Number_of_workers` int(10) unsigned NOT NULL,
PRIMARY KEY (`Master_id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8 COMMENT='Relay Log Information'
SET SQL_LOG_BIN=0;
=== modified file 'mysql-test/suite/rpl/r/rpl_row_crash_safe.result'
--- a/mysql-test/suite/rpl/r/rpl_row_crash_safe.result 2010-12-13 21:16:31 +0000
+++ b/mysql-test/suite/rpl/r/rpl_row_crash_safe.result 2010-12-15 17:46:05 +0000
@@ -24,6 +24,7 @@ slave_relay_log_info CREATE TABLE `slave
`Master_log_name` text CHARACTER SET utf8 COLLATE utf8_bin NOT NULL,
`Master_log_pos` bigint(20) unsigned NOT NULL,
`Sql_delay` int(11) NOT NULL,
+ `Number_of_workers` int(10) unsigned NOT NULL,
PRIMARY KEY (`Master_id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8 COMMENT='Relay Log Information'
SET SQL_LOG_BIN=0;
=== modified file 'mysql-test/suite/rpl/r/rpl_stm_crash_safe.result'
--- a/mysql-test/suite/rpl/r/rpl_stm_crash_safe.result 2010-12-13 21:16:31 +0000
+++ b/mysql-test/suite/rpl/r/rpl_stm_crash_safe.result 2010-12-15 17:46:05 +0000
@@ -24,6 +24,7 @@ slave_relay_log_info CREATE TABLE `slave
`Master_log_name` text CHARACTER SET utf8 COLLATE utf8_bin NOT NULL,
`Master_log_pos` bigint(20) unsigned NOT NULL,
`Sql_delay` int(11) NOT NULL,
+ `Number_of_workers` int(10) unsigned NOT NULL,
PRIMARY KEY (`Master_id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8 COMMENT='Relay Log Information'
SET SQL_LOG_BIN=0;
=== added file 'mysql-test/suite/rpl/t/rpl_sequential-master.opt'
--- a/mysql-test/suite/rpl/t/rpl_sequential-master.opt 1970-01-01 00:00:00 +0000
+++ b/mysql-test/suite/rpl/t/rpl_sequential-master.opt 2010-12-16 22:00:47 +0000
@@ -0,0 +1 @@
+--log-warnings=0
=== added file 'mysql-test/suite/rpl/t/rpl_sequential-slave.opt'
--- a/mysql-test/suite/rpl/t/rpl_sequential-slave.opt 1970-01-01 00:00:00 +0000
+++ b/mysql-test/suite/rpl/t/rpl_sequential-slave.opt 2010-12-16 22:00:47 +0000
@@ -0,0 +1,2 @@
+--log-warnings=0
+
=== modified file 'scripts/mysql_system_tables.sql'
--- a/scripts/mysql_system_tables.sql 2010-12-08 00:33:48 +0000
+++ b/scripts/mysql_system_tables.sql 2010-12-15 17:46:05 +0000
@@ -100,11 +100,11 @@ CREATE TABLE IF NOT EXISTS event ( db ch
CREATE TABLE IF NOT EXISTS ndb_binlog_index (Position BIGINT UNSIGNED NOT NULL, File VARCHAR(255) NOT NULL, epoch BIGINT UNSIGNED NOT NULL, inserts BIGINT UNSIGNED NOT NULL, updates BIGINT UNSIGNED NOT NULL, deletes BIGINT UNSIGNED NOT NULL, schemaops BIGINT UNSIGNED NOT NULL, PRIMARY KEY(epoch)) ENGINE=MYISAM;
-CREATE TABLE IF NOT EXISTS slave_relay_log_info (Master_id INTEGER UNSIGNED NOT NULL, Number_of_lines INTEGER UNSIGNED NOT NULL, Relay_log_name TEXT CHARACTER SET utf8 COLLATE utf8_bin NOT NULL, Relay_log_pos BIGINT UNSIGNED NOT NULL, Master_log_name TEXT CHARACTER SET utf8 COLLATE utf8_bin NOT NULL, Master_log_pos BIGINT UNSIGNED NOT NULL, Sql_delay INTEGER NOT NULL, PRIMARY KEY(Master_id)) ENGINE=MYISAM DEFAULT CHARSET=utf8 COMMENT 'Relay Log Information';
+CREATE TABLE IF NOT EXISTS slave_relay_log_info (Master_id INTEGER UNSIGNED NOT NULL, Number_of_lines INTEGER UNSIGNED NOT NULL, Relay_log_name TEXT CHARACTER SET utf8 COLLATE utf8_bin NOT NULL, Relay_log_pos BIGINT UNSIGNED NOT NULL, Master_log_name TEXT CHARACTER SET utf8 COLLATE utf8_bin NOT NULL, Master_log_pos BIGINT UNSIGNED NOT NULL, Sql_delay INTEGER NOT NULL, Number_of_workers INTEGER UNSIGNED NOT NULL, PRIMARY KEY(Master_id)) ENGINE=MYISAM DEFAULT CHARSET=utf8 COMMENT 'Relay Log Information';
CREATE TABLE IF NOT EXISTS slave_master_info (Master_id INTEGER UNSIGNED NOT NULL, Number_of_lines INTEGER UNSIGNED NOT NULL, Master_log_name TEXT CHARACTER SET utf8 COLLATE utf8_bin NOT NULL, Master_log_pos BIGINT UNSIGNED NOT NULL, Host TEXT CHARACTER SET utf8 COLLATE utf8_bin, User_name TEXT CHARACTER SET utf8 COLLATE utf8_bin, User_password TEXT CHARACTER SET utf8 COLLATE utf8_bin, Port INTEGER UNSIGNED NOT NULL, Connect_retry INTEGER UNSIGNED NOT NULL, Enabled_ssl BOOLEAN NOT NULL, Ssl_ca TEXT CHARACTER SET utf8 COLLATE utf8_bin, Ssl_capath TEXT CHARACTER SET utf8 COLLATE utf8_bin, Ssl_cert TEXT CHARACTER SET utf8 COLLATE utf8_bin, Ssl_cipher TEXT CHARACTER SET utf8 COLLATE utf8_bin, Ssl_key TEXT CHARACTER SET utf8 COLLATE utf8_bin, Ssl_verify_servert_cert BOOLEAN NOT NULL, Heartbeat FLOAT NOT NULL, Bind TEXT CHARACTER SET utf8 COLLATE utf8_bin, Ignored_server_ids TEXT CHARACTER SET utf8 COLLATE utf8_bin, Uuid TEXT CHARACTER SET utf8 COLLATE utf8_bin, Retry_count BIGIN!
T UNSIGNED NOT NULL, PRIMARY KEY(Master_id)) ENGINE=MYISAM DEFAULT CHARSET=utf8 COMMENT 'Master Information';
-CREATE TABLE IF NOT EXISTS slave_worker_info (Master_id INTEGER UNSIGNED NOT NULL, Worker_id INTEGER UNSIGNED NOT NULL, Partitions TEXT CHARACTER SET utf8 COLLATE utf8_bin NOT NULL, Relay_log_name TEXT CHARACTER SET utf8 COLLATE utf8_bin NOT NULL, Relay_log_pos BIGINT UNSIGNED NOT NULL, Master_log_name TEXT CHARACTER SET utf8 COLLATE utf8_bin NOT NULL, Master_log_pos BIGINT UNSIGNED NOT NULL, PRIMARY KEY(Master_id, Worker_id)) ENGINE=MYISAM DEFAULT CHARSET=utf8 COMMENT 'Worker Information';
+CREATE TABLE IF NOT EXISTS slave_worker_info (Master_id INTEGER UNSIGNED NOT NULL, Worker_id INTEGER UNSIGNED NOT NULL, Relay_log_name TEXT CHARACTER SET utf8 COLLATE utf8_bin NOT NULL, Relay_log_pos BIGINT UNSIGNED NOT NULL, Master_log_name TEXT CHARACTER SET utf8 COLLATE utf8_bin NOT NULL, Master_log_pos BIGINT UNSIGNED NOT NULL, Checkpoint_log_pos BIGINT UNSIGNED NOT NULL, Group_count INTEGER UNSIGNED NOT NULL, Group_bitmap TEXT CHARACTER SET utf8 COLLATE utf8_bin, PRIMARY KEY(Master_id, Worker_id)) ENGINE=MYISAM DEFAULT CHARSET=utf8 COMMENT 'Worker Information';
--
-- PERFORMANCE SCHEMA INSTALLATION
=== modified file 'sql/rpl_info_dummy.cc'
--- a/sql/rpl_info_dummy.cc 2010-12-09 16:17:32 +0000
+++ b/sql/rpl_info_dummy.cc 2010-12-15 17:46:05 +0000
@@ -79,6 +79,14 @@ bool Rpl_info_dummy::do_set_info(const i
}
bool Rpl_info_dummy::do_set_info(const int pos __attribute__((unused)),
+ const uchar *value __attribute__((unused)),
+ const size_t size __attribute__((unused)))
+{
+ if (abort) DBUG_ASSERT(0);
+ return FALSE;
+}
+
+bool Rpl_info_dummy::do_set_info(const int pos __attribute__((unused)),
const ulong value __attribute__((unused)))
{
if (abort) DBUG_ASSERT(0);
@@ -116,6 +124,15 @@ bool Rpl_info_dummy::do_get_info(const i
}
bool Rpl_info_dummy::do_get_info(const int pos __attribute__((unused)),
+ uchar *value __attribute__((unused)),
+ const size_t size __attribute__((unused)),
+ const uchar *default_value __attribute__((unused)))
+{
+ if (abort) DBUG_ASSERT(0);
+ return FALSE;
+}
+
+bool Rpl_info_dummy::do_get_info(const int pos __attribute__((unused)),
ulong *value __attribute__((unused)),
const ulong default_value __attribute__((unused)))
{
=== modified file 'sql/rpl_info_dummy.h'
--- a/sql/rpl_info_dummy.h 2010-12-09 16:17:32 +0000
+++ b/sql/rpl_info_dummy.h 2010-12-15 17:46:05 +0000
@@ -44,12 +44,16 @@ private:
int do_prepare_info_for_read(const uint nidx);
int do_prepare_info_for_write(const uint nidx);
bool do_set_info(const int pos, const char *value);
+ bool do_set_info(const int pos, const uchar *value,
+ const size_t size);
bool do_set_info(const int pos, const int value);
bool do_set_info(const int pos, const ulong value);
bool do_set_info(const int pos, const float value);
bool do_set_info(const int pos, const Dynamic_ids *value);
bool do_get_info(const int pos, char *value, const size_t size,
const char *default_value);
+ bool do_get_info(const int pos, uchar *value, const size_t size,
+ const uchar *default_value);
bool do_get_info(const int pos, int *value,
const int default_value);
bool do_get_info(const int pos, ulong *value,
=== modified file 'sql/rpl_info_factory.cc'
--- a/sql/rpl_info_factory.cc 2010-12-13 21:16:31 +0000
+++ b/sql/rpl_info_factory.cc 2010-12-15 17:46:05 +0000
@@ -437,6 +437,7 @@ Slave_worker *Rpl_info_factory::create_w
key_info_idx[0]= server_id;
key_info_idx[1]= worker_id;
worker->set_idx_info(key_info_idx, NUMBER_OF_FIELDS_TO_IDENTIFY_WORKER);
+ worker->id= worker_id;
DBUG_ASSERT(worker_option == WORKER_REPOSITORY_FILE ||
worker_option == WORKER_REPOSITORY_TABLE ||
=== modified file 'sql/rpl_info_file.cc'
--- a/sql/rpl_info_file.cc 2010-12-07 03:05:41 +0000
+++ b/sql/rpl_info_file.cc 2010-12-15 17:46:05 +0000
@@ -186,6 +186,12 @@ bool Rpl_info_file::do_set_info(const in
FALSE : TRUE);
}
+bool Rpl_info_file::do_set_info(const int pos, const uchar *value,
+ const size_t size)
+{
+ return (my_b_write(&info_file, value, size));
+}
+
bool Rpl_info_file::do_set_info(const int pos, const ulong value)
{
return (my_b_printf(&info_file, "%lu\n", value) > (size_t) 0 ?
@@ -243,6 +249,12 @@ bool Rpl_info_file::do_get_info(const in
default_value));
}
+bool Rpl_info_file::do_get_info(const int pos, uchar *value, const size_t size,
+ const uchar *default_value)
+{
+ return(my_b_read(&info_file, value, size));
+}
+
bool Rpl_info_file::do_get_info(const int pos, ulong *value,
const ulong default_value)
{
=== modified file 'sql/rpl_info_file.h'
--- a/sql/rpl_info_file.h 2010-12-07 03:05:41 +0000
+++ b/sql/rpl_info_file.h 2010-12-15 17:46:05 +0000
@@ -51,12 +51,16 @@ private:
int do_prepare_info_for_read(const uint nidx);
int do_prepare_info_for_write(const uint nidx);
bool do_set_info(const int pos, const char *value);
+ bool do_set_info(const int pos, const uchar *value,
+ const size_t size);
bool do_set_info(const int pos, const int value);
bool do_set_info(const int pos, const ulong value);
bool do_set_info(const int pos, const float value);
bool do_set_info(const int pos, const Dynamic_ids *value);
bool do_get_info(const int pos, char *value, const size_t size,
const char *default_value);
+ bool do_get_info(const int pos, uchar *value, const size_t size,
+ const uchar *default_value);
bool do_get_info(const int pos, int *value,
const int default_value);
bool do_get_info(const int pos, ulong *value,
=== modified file 'sql/rpl_info_handler.h'
--- a/sql/rpl_info_handler.h 2010-12-07 03:05:41 +0000
+++ b/sql/rpl_info_handler.h 2010-12-15 17:46:05 +0000
@@ -158,6 +158,18 @@ public:
return(prv_error);
}
+ template <class TypeHandler>
+ bool set_info(TypeHandler const value, const size_t size)
+ {
+ if (cursor >= ninfo || prv_error)
+ return TRUE;
+
+ if (!(prv_error= do_set_info(cursor, value, size)))
+ cursor++;
+
+ return(prv_error);
+ }
+
/**
Returns the value of a field.
Any call must be done in the right order which
@@ -199,8 +211,9 @@ public:
@retval FALSE No error
@retval TRUE Failure
*/
- bool get_info(char *value, const size_t size,
- const char *default_value)
+ template <class TypeHandler>
+ bool get_info(TypeHandler value, const size_t size,
+ TypeHandler const default_value)
{
if (cursor >= ninfo || prv_error)
return TRUE;
@@ -314,12 +327,18 @@ private:
virtual int do_prepare_info_for_write(const uint nidx)= 0;
virtual bool do_set_info(const int pos, const char *value)= 0;
+ virtual bool do_set_info(const int pos, const uchar *value,
+ const size_t size)= 0;
virtual bool do_set_info(const int pos, const ulong value)= 0;
virtual bool do_set_info(const int pos, const int value)= 0;
virtual bool do_set_info(const int pos, const float value)= 0;
virtual bool do_set_info(const int pos, const Dynamic_ids *value)= 0;
- virtual bool do_get_info(const int pos, char *value, const size_t size,
+ virtual bool do_get_info(const int pos, char *value,
+ const size_t size,
const char *default_value)= 0;
+ virtual bool do_get_info(const int pos, uchar *value,
+ const size_t size,
+ const uchar *default_value)= 0;
virtual bool do_get_info(const int pos, ulong *value,
const ulong default_value)= 0;
virtual bool do_get_info(const int pos, int *value,
=== modified file 'sql/rpl_info_table.cc'
--- a/sql/rpl_info_table.cc 2010-12-13 21:16:31 +0000
+++ b/sql/rpl_info_table.cc 2010-12-15 17:46:05 +0000
@@ -336,6 +336,14 @@ bool Rpl_info_table::do_set_info(const i
&my_charset_bin));
}
+bool Rpl_info_table::do_set_info(const int pos, const uchar *value,
+ const size_t size)
+{
+ // TODO -- ALFRANIO - ANDREI
+ return (field_values->value[pos].copy((char *) "", 0,
+ &my_charset_bin));
+}
+
bool Rpl_info_table::do_set_info(const int pos, const ulong value)
{
return (field_values->value[pos].set_int(value, TRUE,
@@ -376,6 +384,13 @@ bool Rpl_info_table::do_get_info(const i
return FALSE;
}
+bool Rpl_info_table::do_get_info(const int pos, uchar *value, const size_t size,
+ const uchar *default_value)
+{
+ // TODO -- ALFRANIO - ANDREI
+ return FALSE;
+}
+
bool Rpl_info_table::do_get_info(const int pos, ulong *value,
const ulong default_value)
{
=== modified file 'sql/rpl_info_table.h'
--- a/sql/rpl_info_table.h 2010-12-13 21:16:31 +0000
+++ b/sql/rpl_info_table.h 2010-12-15 17:46:05 +0000
@@ -76,12 +76,16 @@ private:
int do_prepare_info_for_read(const uint nidx);
int do_prepare_info_for_write(const uint nidx);
bool do_set_info(const int pos, const char *value);
+ bool do_set_info(const int pos, const uchar *value,
+ const size_t size);
bool do_set_info(const int pos, const int value);
bool do_set_info(const int pos, const ulong value);
bool do_set_info(const int pos, const float value);
bool do_set_info(const int pos, const Dynamic_ids *value);
bool do_get_info(const int pos, char *value, const size_t size,
const char *default_value);
+ bool do_get_info(const int pos, uchar *value, const size_t size,
+ const uchar *default_value);
bool do_get_info(const int pos, int *value,
const int default_value);
bool do_get_info(const int pos, ulong *value,
=== modified file 'sql/rpl_info_table_access.cc'
--- a/sql/rpl_info_table_access.cc 2010-12-13 21:16:31 +0000
+++ b/sql/rpl_info_table_access.cc 2010-12-15 17:46:05 +0000
@@ -275,6 +275,7 @@ bool Rpl_info_table_access::store_info_v
while (field_idx < max_num_field)
{
fields[field_idx]->set_notnull();
+
if (fields[field_idx]->store(field_values->value[field_idx].c_ptr_safe(),
field_values->value[field_idx].length(),
&my_charset_bin))
=== modified file 'sql/rpl_mi.cc'
--- a/sql/rpl_mi.cc 2010-12-08 12:59:07 +0000
+++ b/sql/rpl_mi.cc 2010-12-15 17:46:05 +0000
@@ -305,7 +305,8 @@ bool Master_info::read_info(Rpl_info_han
*/
if (from->prepare_info_for_read(nidx) ||
- from->get_info(master_log_name, sizeof(master_log_name), ""))
+ from->get_info(master_log_name, (size_t) sizeof(master_log_name),
+ (char *) ""))
DBUG_RETURN(TRUE);
lines= strtoul(master_log_name, &first_non_digit, 10);
@@ -314,17 +315,18 @@ bool Master_info::read_info(Rpl_info_han
*first_non_digit=='\0' && lines >= LINES_IN_MASTER_INFO_WITH_SSL)
{
/* Seems to be new format => read master log name */
- if (from->get_info(master_log_name, sizeof(master_log_name), ""))
+ if (from->get_info(master_log_name, (size_t) sizeof(master_log_name),
+ (char *) ""))
DBUG_RETURN(TRUE);
}
else
lines= 7;
if (from->get_info(&temp_master_log_pos,
- (ulong) BIN_LOG_HEADER_SIZE) ||
- from->get_info(host, sizeof(host), 0) ||
- from->get_info(user, sizeof(user), "test") ||
- from->get_info(password, sizeof(password), 0) ||
+ (ulong) BIN_LOG_HEADER_SIZE) ||
+ from->get_info(host, (size_t) sizeof(host), (char *) 0) ||
+ from->get_info(user, (size_t) sizeof(user), (char *) "test") ||
+ from->get_info(password, (size_t) sizeof(password), (char *) 0) ||
from->get_info((int *) &port, (int) MYSQL_PORT) ||
from->get_info((int *) &connect_retry,
(int) DEFAULT_CONNECT_RETRY))
@@ -339,11 +341,11 @@ bool Master_info::read_info(Rpl_info_han
if (lines >= LINES_IN_MASTER_INFO_WITH_SSL)
{
if (from->get_info(&temp_ssl, 0) ||
- from->get_info(ssl_ca, sizeof(ssl_ca), 0) ||
- from->get_info(ssl_capath, sizeof(ssl_capath), 0) ||
- from->get_info(ssl_cert, sizeof(ssl_cert), 0) ||
- from->get_info(ssl_cipher, sizeof(ssl_cipher), 0) ||
- from->get_info(ssl_key, sizeof(ssl_key), 0))
+ from->get_info(ssl_ca, (size_t) sizeof(ssl_ca), (char *) 0) ||
+ from->get_info(ssl_capath, (size_t) sizeof(ssl_capath), (char *) 0) ||
+ from->get_info(ssl_cert, (size_t) sizeof(ssl_cert), (char *) 0) ||
+ from->get_info(ssl_cipher, (size_t) sizeof(ssl_cipher), (char *) 0) ||
+ from->get_info(ssl_key, (size_t) sizeof(ssl_key), (char *) 0))
DBUG_RETURN(TRUE);
}
@@ -372,7 +374,7 @@ bool Master_info::read_info(Rpl_info_han
*/
if (lines >= LINE_FOR_MASTER_BIND)
{
- if (from->get_info(bind_addr, sizeof(bind_addr), ""))
+ if (from->get_info(bind_addr, (size_t) sizeof(bind_addr), (char *) ""))
DBUG_RETURN(TRUE);
}
@@ -389,7 +391,8 @@ bool Master_info::read_info(Rpl_info_han
/* Starting from 5.5 the master_uuid may be in the repository. */
if (lines >= LINE_FOR_MASTER_UUID)
{
- if (from->get_info(master_uuid, sizeof(master_uuid), 0))
+ if (from->get_info(master_uuid, (size_t) sizeof(master_uuid),
+ (char *) 0))
DBUG_RETURN(TRUE);
}
=== modified file 'sql/rpl_rli.cc'
--- a/sql/rpl_rli.cc 2010-12-16 21:41:45 +0000
+++ b/sql/rpl_rli.cc 2010-12-16 22:00:47 +0000
@@ -1,4 +1,4 @@
-/* Copyright (c) 2000, 2010, Oracle and/or its affiliates. All rights reserved.
+/* /opyright (c) 2000, 2010, Oracle and/or its affiliates. All rights reserved.
This program is free software; you can redistribute it and/or modify
it under the terms of the GNU General Public License as published by
@@ -41,7 +41,8 @@ const char* info_rli_fields[]=
"group_relay_log_pos",
"group_master_log_name",
"group_master_log_pos",
- "sql_delay"
+ "sql_delay",
+ "number_of_workers"
};
const char *const Relay_log_info::state_delaying_string = "Waiting until MASTER_DELAY seconds after master executed event";
@@ -139,7 +140,6 @@ void Relay_log_info::deinit_workers()
if (!this_worker)
delete_dynamic(&workers);
- slave_parallel_workers= 0;
}
Relay_log_info::~Relay_log_info()
@@ -1258,10 +1258,7 @@ int Relay_log_info::init_info()
if (hot_log)
mysql_mutex_unlock(log_lock);
- // The correct place should be here. // ANDREI
- // mts_recovery_routine(this);
-
- DBUG_RETURN(0);
+ DBUG_RETURN(slave_parallel_workers ? mts_recovery_routine(this) : 0);
}
cur_log_fd = -1;
@@ -1627,7 +1624,8 @@ bool Relay_log_info::read_info(Rpl_info_
overwritten by the second row later.
*/
if (from->prepare_info_for_read(nidx) ||
- from->get_info(group_relay_log_name, sizeof(group_relay_log_name), ""))
+ from->get_info(group_relay_log_name, (size_t) sizeof(group_relay_log_name),
+ (char *) ""))
DBUG_RETURN(TRUE);
lines= strtoul(group_relay_log_name, &first_non_digit, 10);
@@ -1636,18 +1634,20 @@ bool Relay_log_info::read_info(Rpl_info_
*first_non_digit=='\0' && lines >= LINES_IN_RELAY_LOG_INFO_WITH_DELAY)
{
/* Seems to be new format => read group relay log name */
- if (from->get_info(group_relay_log_name, sizeof(group_relay_log_name), ""))
+ if (from->get_info(group_relay_log_name, (size_t) sizeof(group_relay_log_name),
+ (char *) ""))
DBUG_RETURN(TRUE);
}
else
DBUG_PRINT("info", ("relay_log_info file is in old format."));
if (from->get_info((ulong *) &temp_group_relay_log_pos,
- (ulong) BIN_LOG_HEADER_SIZE) ||
+ (ulong) BIN_LOG_HEADER_SIZE) ||
from->get_info(group_master_log_name,
- sizeof(group_relay_log_name), "") ||
+ (size_t) sizeof(group_relay_log_name),
+ (char *) "") ||
from->get_info((ulong *) &temp_group_master_log_pos,
- (ulong) 0))
+ (ulong) 0))
DBUG_RETURN(TRUE);
if (lines >= LINES_IN_RELAY_LOG_INFO_WITH_DELAY)
@@ -1656,6 +1656,12 @@ bool Relay_log_info::read_info(Rpl_info_
DBUG_RETURN(TRUE);
}
+ if (lines >= LINES_IN_RELAY_LOG_INFO_WITH_WORKERS)
+ {
+ if (from->get_info(&slave_parallel_workers,(ulong) 0))
+ DBUG_RETURN(TRUE);
+ }
+
group_relay_log_pos= temp_group_relay_log_pos;
group_master_log_pos= temp_group_master_log_pos;
sql_delay= (int32) temp_sql_delay;
@@ -1674,12 +1680,13 @@ bool Relay_log_info::write_info(Rpl_info
//DBUG_ASSERT(!belongs_to_client());
if (to->prepare_info_for_write(nidx) ||
- to->set_info((int) LINES_IN_RELAY_LOG_INFO_WITH_DELAY) ||
+ to->set_info((int) LINES_IN_RELAY_LOG_INFO_WITH_WORKERS) ||
to->set_info(group_relay_log_name) ||
to->set_info((ulong) group_relay_log_pos) ||
to->set_info(group_master_log_name) ||
to->set_info((ulong) group_master_log_pos) ||
- to->set_info((int) sql_delay))
+ to->set_info((int) sql_delay) ||
+ to->set_info(slave_parallel_workers))
DBUG_RETURN(TRUE);
DBUG_RETURN(FALSE);
=== modified file 'sql/rpl_rli.h'
--- a/sql/rpl_rli.h 2010-12-16 21:41:45 +0000
+++ b/sql/rpl_rli.h 2010-12-16 22:00:47 +0000
@@ -694,6 +694,11 @@ private:
*/
static const int LINES_IN_RELAY_LOG_INFO_WITH_DELAY= 5;
+ /*
+ Before the WL#5599, relay_log.info had 5 lines. Now it has 6 lines.
+ */
+ static const int LINES_IN_RELAY_LOG_INFO_WITH_WORKERS= 6;
+
bool read_info(Rpl_info_handler *from);
bool write_info(Rpl_info_handler *to);
=== modified file 'sql/rpl_rli_pdb.cc'
--- a/sql/rpl_rli_pdb.cc 2010-12-16 21:41:45 +0000
+++ b/sql/rpl_rli_pdb.cc 2010-12-16 22:00:47 +0000
@@ -11,41 +11,57 @@
*/
const char *info_slave_worker_fields []=
{
- "partitions",
"relay_log_name",
"relay_log_pos",
"master_log_name",
- "master_log_pos"
+ "master_log_pos",
+ "checkpoint_log_pos",
+ "group_count",
+ "group_bitmap"
};
Slave_worker::Slave_worker(const char* type, const char* pfs)
- : Rpl_info_worker(type, pfs), group_relay_log_pos(0),
- group_master_log_pos(0), checkpoint_log_pos(0)
+ : Rpl_info_worker(type, pfs), curr_group_exec_parts(0),
+ group_relay_log_pos(0), group_master_log_pos(0),
+ checkpoint_log_pos(0), inited_group_execed(0),
+ critical(0)
{
group_relay_log_name[0]= 0;
group_master_log_name[0]= 0;
checkpoint_log_name[0]= 0;
- curr_group_exec_parts= new Database_ids(NAME_LEN);
}
Slave_worker::~Slave_worker()
{
if (curr_group_exec_parts)
delete curr_group_exec_parts;
+
+ if (inited_group_execed)
+ bitmap_free(&group_execed);
}
int Slave_worker::init_info()
{
+ int necessary_to_configure= 0;
+
DBUG_ENTER("Slave_worker::init_info");
if (inited)
DBUG_RETURN(0);
+ if (!(curr_group_exec_parts= new Database_ids(NAME_LEN)))
+ goto err;
+
+ if (bitmap_init(&group_execed, NULL, 32768, FALSE))
+ goto err;
+
+ inited_group_execed= 1;
+
/*
The init_info() is used to either create or read information
from the repository, in order to initialize the Slave_worker.
*/
- int necessary_to_configure= check_info();
+ necessary_to_configure= check_info();
if (handler->init_info(uidx, nidx))
goto err;
@@ -109,23 +125,33 @@ bool Slave_worker::read_info(Rpl_info_ha
ulong temp_group_relay_log_pos= 0;
ulong temp_group_master_log_pos= 0;
+ ulong temp_checkpoint_log_pos= 0;
+ ulong nbytes= 0;
+ uchar *buffer= (uchar *) group_execed.bitmap;
if (from->prepare_info_for_read(nidx))
DBUG_RETURN(TRUE);
- if (from->get_info(curr_group_exec_parts, (Dynamic_ids *) NULL) ||
- from->get_info(group_relay_log_name,
- sizeof(group_relay_log_name), "") ||
+ if (from->get_info(group_relay_log_name,
+ (size_t) sizeof(group_relay_log_name),
+ (char *) "") ||
from->get_info((ulong *) &temp_group_relay_log_pos,
(ulong) 0) ||
from->get_info(group_master_log_name,
- sizeof(group_master_log_name), "") ||
+ (size_t) sizeof(group_master_log_name),
+ (char *) "") ||
from->get_info((ulong *) &temp_group_master_log_pos,
- (ulong) 0))
+ (ulong) 0) ||
+ from->get_info((ulong *) &temp_checkpoint_log_pos,
+ (ulong) 0) ||
+ from->get_info(&nbytes, (ulong) 0) ||
+ from->get_info(buffer, (size_t) nbytes,
+ (uchar *) 0))
DBUG_RETURN(TRUE);
group_relay_log_pos= temp_group_relay_log_pos;
group_master_log_pos= temp_group_master_log_pos;
+ checkpoint_log_pos= temp_checkpoint_log_pos;
DBUG_RETURN(FALSE);
}
@@ -134,20 +160,17 @@ bool Slave_worker::write_info(Rpl_info_h
{
DBUG_ENTER("Master_info::write_info");
- /*
- In certain cases this code may create master.info files that seems
- corrupted, because of extra lines filled with garbage in the end
- file (this happens if new contents take less space than previous
- contents of file). But because of number of lines in the first line
- of file we don't care about this garbage.
- */
+ ulong nbytes= (ulong) no_bytes_in_map(&group_execed);
+ uchar *buffer= (uchar*) group_execed.bitmap;
if (to->prepare_info_for_write(nidx) ||
- to->set_info(curr_group_exec_parts) ||
to->set_info(group_relay_log_name) ||
- to->set_info((ulong)group_relay_log_pos) ||
+ to->set_info((ulong) group_relay_log_pos) ||
to->set_info(group_master_log_name) ||
- to->set_info((ulong)group_master_log_pos))
+ to->set_info((ulong) group_master_log_pos) ||
+ to->set_info((ulong) checkpoint_log_pos) ||
+ to->set_info(nbytes) ||
+ to->set_info(buffer, (size_t) nbytes))
DBUG_RETURN(TRUE);
DBUG_RETURN(FALSE);
@@ -173,11 +196,16 @@ bool Slave_worker::commit_positions(Log_
my_free(ptr_g->checkpoint_log_name);
ptr_g->checkpoint_log_name= NULL;
- // TODO: reset the current bitmap
-
+ critical= FALSE;
+ bitmap_clear_all(&group_execed);
+ }
+
+ if (ptr_g->checkpoint_seqno)
+ {
+ if (ptr_g->checkpoint_seqno > (32768 / 8))
+ critical= TRUE;
+ bitmap_fast_test_and_set(&group_execed, ptr_g->checkpoint_seqno);
}
-
- // TODO: update the group bitmap ptr_g->checkpoint_seqno 'th bit
group_relay_log_pos= ev->future_event_relay_log_pos;
group_master_log_pos= ev->log_pos;
@@ -839,3 +867,25 @@ int wait_for_workers_to_finish(Relay_log
}
return ret;
}
+
+bool critical_worker(Relay_log_info *rli)
+{
+ bool critical= FALSE;
+ DYNAMIC_ARRAY *ws= &rli->workers;
+ Slave_worker *current_worker= NULL;
+
+ if (rli->is_parallel_exec() && ws)
+ {
+ for (uint i= 0; ws && i < ws->elements; i++)
+ {
+ get_dynamic(ws, (uchar*) ¤t_worker, i);
+ if (current_worker->critical)
+ {
+ critical= current_worker->critical;
+ break;
+ }
+ }
+ }
+ return(critical);
+}
+
=== modified file 'sql/rpl_rli_pdb.h'
--- a/sql/rpl_rli_pdb.h 2010-12-16 21:41:45 +0000
+++ b/sql/rpl_rli_pdb.h 2010-12-16 22:00:47 +0000
@@ -5,6 +5,7 @@
#include "sql_string.h"
#include "rpl_rli.h"
#include <my_sys.h>
+#include <my_bitmap.h>
/* APH entry */
struct db_worker
@@ -26,6 +27,7 @@ Slave_worker *get_slave_worker(const cha
Slave_worker *get_least_occupied_worker(DYNAMIC_ARRAY *workers);
int wait_for_workers_to_finish(Relay_log_info const *rli,
Slave_worker *ignore= NULL);
+bool critical_worker(Relay_log_info *rli);
#define SLAVE_WORKER_QUEUE_SIZE 8096
#define SLAVE_INIT_DBS_IN_GROUP 4 // initial allocation for CGEP dynarray
@@ -53,15 +55,21 @@ public:
ulong a; // first Available index to append at (next to tail)
ulong e; // the head index
volatile ulong len; // it is also queried to compute least occupied
+ bool inited_queue;
circular_buffer_queue(uint el_size, ulong max, uint alloc_inc= 0) :
- s(max), a(0), e(max), len(0)
+ s(max), a(0), e(max), len(0), inited_queue(FALSE)
{
DBUG_ASSERT(s < ULONG_MAX);
- my_init_dynamic_array(&Q, el_size, s, alloc_inc);
+ if (!my_init_dynamic_array(&Q, el_size, s, alloc_inc))
+ inited_queue= TRUE;
+ }
+ circular_buffer_queue () : inited_queue(FALSE) {}
+ ~circular_buffer_queue ()
+ {
+ if (inited_queue)
+ delete_dynamic(&Q);
}
- circular_buffer_queue () {}
- ~circular_buffer_queue () { delete_dynamic(&Q); }
/**
Content of the being dequeued item is copied to the arg-pointer
@@ -96,9 +104,7 @@ public:
typedef struct st_slave_job_group
{
- char *group_master_log_name; // (actually redundant)
- Dynamic_ids *db_ids; // This is used upon recovery.
-
+ char *group_master_log_name; // (actually redundant)
my_off_t master_log_pos; // B-event log_pos
my_off_t group_master_log_pos; // T-event lop_pos filled by W for CheckPoint
my_off_t group_relay_log_pos; // filled by W
@@ -114,6 +120,7 @@ typedef struct st_slave_job_group
*/
char *group_relay_log_name; // The value is last seen relay-log
ulong worker_id;
+ Slave_worker *worker;
ulonglong total_seqno;
/* checkpoint coord are reset by CP and rotate:s */
@@ -128,9 +135,7 @@ typedef struct st_slave_job_group
to.worker_id= from->id; \
to.group_relay_log_pos= from->group_relay_log_pos; \
to.group_relay_log_name= from->group_relay_log_name; \
- to.group_master_log_pos= from->group_master_log_pos; \
- to.group_master_log_name= from->group_master_log_name; \
- to.db_ids= from->curr_group_exec_parts; \
+ to.worker= from; \
} while (0)
/**
@@ -252,6 +257,12 @@ public:
bool commit_positions(Log_event *evt, Slave_job_group *ptr_g);
+ MY_BITMAP group_execed;
+
+ bool inited_group_execed;
+
+ bool critical;
+
private:
bool read_info(Rpl_info_handler *from);
bool write_info(Rpl_info_handler *to);
=== modified file 'sql/rpl_slave.cc'
--- a/sql/rpl_slave.cc 2010-12-16 21:41:45 +0000
+++ b/sql/rpl_slave.cc 2010-12-16 22:00:47 +0000
@@ -171,7 +171,8 @@ static int terminate_slave_thread(THD *t
bool skip_lock);
static bool check_io_slave_killed(THD *thd, Master_info *mi, const char *info);
int slave_worker_exec_job(Slave_worker * w, Relay_log_info *rli);
-static bool mts_checkpoint_routine(Relay_log_info *rli, ulonglong period, bool locked);
+static bool mts_checkpoint_routine(Relay_log_info *rli, ulonglong period,
+ bool force, bool locked);
bool mts_recovery_routine(Relay_log_info *rli);
/*
@@ -372,9 +373,8 @@ int init_recovery(Master_info* mi, const
Relay_log_info *rli= mi->rli;
char *group_master_log_name= NULL;
- // ANDREI The correct place should be here.
- // if (rli->is_parallel_exec() && !(error= mts_recovery_routine(rli)))
- // goto err;
+ if (rli->slave_parallel_workers && !(error= mts_recovery_routine(rli)))
+ goto err;
group_master_log_name= const_cast<char *>(rli->get_group_master_log_name());
if (group_master_log_name[0])
@@ -392,7 +392,7 @@ int init_recovery(Master_info* mi, const
rli->set_event_relay_log_pos(BIN_LOG_HEADER_SIZE);
}
-// err:
+err:
DBUG_RETURN(error);
}
@@ -3794,16 +3794,42 @@ err:
int mts_recovery_cmp(Slave_job_group *id1, Slave_job_group *id2)
{
- return id1->group_relay_log_pos < id2->group_relay_log_pos ? -1 :
- (id1->group_relay_log_pos > id2->group_relay_log_pos ? 1 : 0);
+ longlong filecmp= strcmp(id1->group_relay_log_name, id2->group_relay_log_name);
+ longlong poscmp= id1->group_relay_log_pos - id2->group_relay_log_pos;
+ return (filecmp < 0 ? -1 :
+ (filecmp > 0 ? 1 :
+ (poscmp < 0 ? -1 :
+ (poscmp > 0 ? 1 : 0))));
}
-bool mts_recovery_routine(Relay_log_info *rli)
+bool mts_recovery_groups(Relay_log_info *rli, DYNAMIC_ARRAY *jobs)
{
+ DBUG_ENTER("mts_recovery_groups");
+ DBUG_ASSERT(rli->slave_parallel_workers > 0);
+
+ Slave_job_group job;
+
+ for (uint id= 0; id < rli->slave_parallel_workers; id++)
+ {
+ Slave_worker *worker=
+ Rpl_info_factory::create_worker(opt_worker_repository_id, id);
+ worker->init_info();
+ get_job(worker, job);
+ insert_dynamic(jobs, (uchar*) &job);
+ }
+ sort_dynamic(jobs, (qsort_cmp) mts_recovery_cmp);
+
+ DBUG_ASSERT(rli->slave_parallel_workers == jobs->elements);
+
+ DBUG_RETURN(FALSE);
+}
+
+bool mts_recovery_routine(Relay_log_info *rli)
+{
Log_event *ev= NULL, *desc= NULL;
char *log_name= NULL;
const char *errmsg= NULL;
- bool error= TRUE;
+ bool error= FALSE;
DYNAMIC_ARRAY jobs;
Slave_job_group job;
IO_CACHE log;
@@ -3811,21 +3837,11 @@ bool mts_recovery_routine(Relay_log_info
MY_STAT s;
DBUG_ENTER("mts_recovery_routine");
- DBUG_ASSERT(rli->workers.elements > 0);
-
+
my_init_dynamic_array(&jobs, sizeof(Slave_job_group),
- rli->workers.elements, rli->workers.elements);
+ rli->slave_parallel_workers, rli->slave_parallel_workers);
- for (uint pos= 0; pos < rli->workers.elements; pos++)
- {
- Slave_worker *worker;
- get_dynamic(&rli->workers, (uchar *) &worker, pos);
- get_job(worker, job);
- insert_dynamic(&jobs, (uchar*) &job);
- }
- sort_dynamic(&jobs, (qsort_cmp) mts_recovery_cmp);
-
- DBUG_ASSERT(rli->workers.elements == jobs.elements);
+ mts_recovery_groups(rli, &jobs);
Format_description_log_event fdle(BINLOG_VERSION);
if (!fdle.is_valid())
@@ -3833,19 +3849,13 @@ bool mts_recovery_routine(Relay_log_info
for (uint pos= 0; pos < jobs.elements; pos++)
{
- String buffer;
get_dynamic(&jobs, (uchar *) &job, pos);
- job.db_ids->pack_dynamic_ids(&buffer);
- sql_print_information("Recoverying relay log info based on Worker-Id %lu, partitions %s, "
- "group_relay_log_name %s, group_relay_log_pos %lu, "
- "group_master_log_name %s, group_master_lo_pos %lu",
+ sql_print_information("Recoverying relay log info based on Worker-Id %lu, "
+ "group_relay_log_name %s, group_relay_log_pos %lu",
job.worker_id,
- buffer.c_ptr_safe(),
job.group_relay_log_name,
- (ulong) job.group_relay_log_pos,
- job.group_master_log_name,
- (ulong) job.group_master_log_pos);
+ (ulong) job.group_relay_log_pos);
if (job.group_relay_log_name == NULL || job.group_relay_log_pos == 0 ||
rli->get_group_relay_log_pos() >= job.group_relay_log_pos)
@@ -3916,6 +3926,14 @@ end:
mysql_file_close(file, MYF(MY_WME));
log_name= NULL;
}
+
+ for (uint pos= 0; pos < jobs.elements; pos++)
+ {
+ get_dynamic(&jobs, (uchar *) &job, pos);
+ job.worker->end_info();
+ delete job.worker;
+ }
+
delete_dynamic(&jobs);
DBUG_RETURN(error ? error : rli->flush_info(TRUE));
@@ -3928,7 +3946,8 @@ end:
@return FALSE success, TRUE otherwise
*/
-bool mts_checkpoint_routine(Relay_log_info *rli, ulonglong period, bool locked)
+bool mts_checkpoint_routine(Relay_log_info *rli, ulonglong period,
+ bool force, bool locked)
{
ulong cnt;
bool error= FALSE;
@@ -3938,7 +3957,7 @@ bool mts_checkpoint_routine(Relay_log_in
set_timespec_nsec(curr_clock, 0);
ulong diff= diff_timespec(curr_clock, rli->last_clock);
- if (diff < period && !rli->gaq->full())
+ if (!force && diff < period && !rli->gaq->full())
{
/*
We do not need to execute the checkpoint now because
@@ -4333,13 +4352,6 @@ pthread_handler_t handle_slave_sql(void
}
THD_CHECK_SENTRY(thd);
- /*
- TODO: Alfranio, to settle invocation point. It's moved to possibly
- please an assert in the following block
- */
- //if (rli->is_parallel_exec() && mts_recovery_routine(rli))
- // goto err;
-
#ifndef DBUG_OFF
{
char llbuf1[22], llbuf2[22];
@@ -5713,8 +5725,6 @@ static Log_event* next_event(Relay_log_i
llstr(rli->get_event_relay_log_pos(),llbuf2)));
DBUG_ASSERT(my_b_tell(cur_log) >= BIN_LOG_HEADER_SIZE);
- // TODO: sort out with Alfranio
-
DBUG_ASSERT(my_b_tell(cur_log) == rli->get_event_relay_log_pos() || rli->is_parallel_exec());
DBUG_PRINT("info", ("next_event group master %s %lu group relay %s %lu event %s %lu\n",
@@ -5753,10 +5763,12 @@ static Log_event* next_event(Relay_log_i
/*
MTS checkpoint in the successful read branch
*/
- if (rli->is_parallel_exec() && mts_checkpoint_period != 0)
+ bool critical= critical_worker(rli);
+ if (rli->is_parallel_exec() && (mts_checkpoint_period != 0 ||
+ critical))
{
ulonglong period= static_cast<ulonglong>(mts_checkpoint_period * 1000000ULL);
- mts_checkpoint_routine(rli, period, TRUE); // ALFRANIO --- WHAT TO DO with ERRORS?
+ mts_checkpoint_routine(rli, period, critical, TRUE); // ALFRANIO ERROR
}
if (hot_log)
@@ -5878,14 +5890,17 @@ static Log_event* next_event(Relay_log_i
const char* old_msg= thd->proc_info;
- if (rli->is_parallel_exec() && mts_checkpoint_period != 0)
+ bool critical= critical_worker(rli);
+ if (rli->is_parallel_exec() && (mts_checkpoint_period != 0 ||
+ critical))
{
int ret= 0;
struct timespec waittime;
ulonglong period= static_cast<ulonglong>(mts_checkpoint_period * 1000000ULL);
do
{
- mts_checkpoint_routine(rli, period, FALSE); // ALFRANIO ERROR
+ critical= critical_worker(rli);
+ mts_checkpoint_routine(rli, period, critical, FALSE); // ALFRANIO ERROR
set_timespec_nsec(waittime, period);
thd->enter_cond(log_cond, log_lock,
"Slave has read all relay log; "
@@ -6330,6 +6345,9 @@ int start_slave(THD* thd , Master_info*
/*
To cache the system var value and used it in the following.
The system var can change but not the cached.
+
+ We cannot change this variable in other places, otherwise,
+ recovery will break.
*/
mi->rli->slave_parallel_workers= opt_mts_slave_parallel_workers;
Attachment: [text/bzr-bundle] bzr/andrei.elkin@oracle.com-20101216220047-m2jph3ne5zaz3vge.bundle
| Thread |
|---|
| • bzr commit into mysql-next-mr-wl5569 branch (andrei.elkin:3248) WL#5569 | Andrei Elkin | 16 Dec |