#At file:///home/andrei/MySQL/BZR/2a-23May/WL/mysql-next-mr-wl5569/ based on revid:andrei.elkin@stripped
3240 Andrei Elkin 2010-12-10 [merge]
merge from wl5569 repo to a local branch
modified:
mysql-test/r/mysqld--help-notwin.result
mysql-test/r/mysqld--help-win.result
mysql-test/suite/rpl/t/rpl_parallel_start_stop.test
sql/dynamic_ids.cc
sql/dynamic_ids.h
sql/mysqld.cc
sql/mysqld.h
sql/rpl_rli.cc
sql/rpl_rli.h
sql/rpl_rli_pdb.h
sql/rpl_slave.cc
sql/sys_vars.cc
=== modified file 'mysql-test/r/mysqld--help-notwin.result'
--- a/mysql-test/r/mysqld--help-notwin.result 2010-12-09 13:23:19 +0000
+++ b/mysql-test/r/mysqld--help-notwin.result 2010-12-10 12:10:20 +0000
@@ -339,6 +339,10 @@ The following options may be given as th
--min-examined-row-limit=#
Don't write queries to slow log that examine fewer rows
than that
+ --mts-checkpoint-period=#
+ Gather workers' activities and synchronously flush relay
+ log info to disk after every #th mili-seconds. Use 0
+ (default) to disable checkpoint
--mts-partition-hash-soft-max=#
Number of records in the mts partition hash below which
entries with zero usage are tolerated
@@ -883,6 +887,7 @@ max-user-connections 0
max-write-lock-count 18446744073709551615
memlock FALSE
min-examined-row-limit 0
+mts-checkpoint-period 0
mts-partition-hash-soft-max 16
mts-pending-jobs-size-max 16777216
mts-slave-worker-queue-len-max 40000
=== modified file 'mysql-test/r/mysqld--help-win.result'
--- a/mysql-test/r/mysqld--help-win.result 2010-12-09 13:23:19 +0000
+++ b/mysql-test/r/mysqld--help-win.result 2010-12-10 12:10:20 +0000
@@ -338,6 +338,10 @@ The following options may be given as th
--min-examined-row-limit=#
Don't write queries to slow log that examine fewer rows
than that
+ --mts-checkpoint-period=#
+ Gather workers' activities and synchronously flush relay
+ log info to disk after every #th mili-seconds. Use 0
+ (default) to disable checkpoint
--mts-partition-hash-soft-max=#
Number of records in the mts partition hash below which
entries with zero usage are tolerated
@@ -886,6 +890,7 @@ max-user-connections 0
max-write-lock-count 18446744073709551615
memlock FALSE
min-examined-row-limit 0
+mts-checkpoint-period 0
mts-partition-hash-soft-max 16
mts-pending-jobs-size-max 16777216
mts-slave-worker-queue-len-max 40000
=== modified file 'mysql-test/suite/rpl/t/rpl_parallel_start_stop.test'
--- a/mysql-test/suite/rpl/t/rpl_parallel_start_stop.test 2010-12-07 17:35:16 +0000
+++ b/mysql-test/suite/rpl/t/rpl_parallel_start_stop.test 2010-12-10 12:10:20 +0000
@@ -6,6 +6,7 @@
#
source include/master-slave.inc;
+source include/have_binlog_format_row.inc;
connection slave;
=== modified file 'sql/dynamic_ids.cc'
--- a/sql/dynamic_ids.cc 2010-12-08 12:59:07 +0000
+++ b/sql/dynamic_ids.cc 2010-12-10 12:10:20 +0000
@@ -1,6 +1,16 @@
#include "dynamic_ids.h"
-Dynamic_ids::Dynamic_ids(size_t size)
+int cmp_string(const void *id1, const void *id2)
+{
+ return strcmp((char *) id1, (char *) id2);
+}
+
+int cmp_ulong(const void *id1, const void *id2)
+{
+ return ((*(ulong *) id1) - (* (ulong *)id2));
+}
+
+Dynamic_ids::Dynamic_ids(size_t param_size): size(param_size)
{
my_init_dynamic_array(&dynamic_ids, size, 16, 16);
}
@@ -58,6 +68,15 @@ bool Server_ids::do_pack_dynamic_ids(Str
DBUG_RETURN(FALSE);
}
+bool Server_ids::do_search_id(const void *id)
+{
+ return (bsearch((ulong *) id, dynamic_ids.buffer,
+ dynamic_ids.elements, size,
+ (int (*) (const void*, const void*))
+ cmp_ulong) != NULL);
+}
+
+
bool Database_ids::do_unpack_dynamic_ids(char *param_dynamic_ids)
{
char *token= NULL, *last= NULL;
@@ -115,3 +134,11 @@ bool Database_ids::do_pack_dynamic_ids(S
DBUG_RETURN(FALSE);
}
+
+bool Database_ids::do_search_id(const void *id)
+{
+ return (bsearch((const char *) id, dynamic_ids.buffer,
+ dynamic_ids.elements, size,
+ (int (*) (const void*, const void*))
+ cmp_string) != NULL);
+}
=== modified file 'sql/dynamic_ids.h'
--- a/sql/dynamic_ids.h 2010-12-08 12:59:07 +0000
+++ b/sql/dynamic_ids.h 2010-12-10 12:10:20 +0000
@@ -25,7 +25,7 @@ class Dynamic_ids
public:
DYNAMIC_ARRAY dynamic_ids;
- Dynamic_ids(size_t size);
+ Dynamic_ids(size_t param_size);
virtual ~Dynamic_ids();
bool pack_dynamic_ids(String *buffer)
@@ -38,9 +38,18 @@ public:
return(do_unpack_dynamic_ids(param_dynamic_ids));
}
+ bool search_id(const void *id)
+ {
+ return (do_search_id(id));
+ }
+
+protected:
+ size_t size;
+
private:
virtual bool do_pack_dynamic_ids(String *buffer)= 0;
virtual bool do_unpack_dynamic_ids(char *param_dynamic_ids)= 0;
+ virtual bool do_search_id(const void *id)= 0;
};
class Server_ids : public Dynamic_ids
@@ -52,6 +61,7 @@ public:
private:
bool do_pack_dynamic_ids(String *buffer);
bool do_unpack_dynamic_ids(char *param_dynamic_ids);
+ bool do_search_id(const void *id);
};
class Database_ids : public Dynamic_ids
@@ -63,5 +73,6 @@ public:
private:
bool do_pack_dynamic_ids(String *buffer);
bool do_unpack_dynamic_ids(char *param_dynamic_ids);
+ bool do_search_id(const void *id);
};
#endif
=== modified file 'sql/mysqld.cc'
--- a/sql/mysqld.cc 2010-12-08 00:33:48 +0000
+++ b/sql/mysqld.cc 2010-12-10 12:10:20 +0000
@@ -503,7 +503,8 @@ ulong prepared_stmt_count=0;
ulong thread_id=1L,current_pid;
ulong slow_launch_threads = 0;
uint sync_binlog_period= 0, sync_relaylog_period= 0,
- sync_relayloginfo_period= 0, sync_masterinfo_period= 0;
+ sync_relayloginfo_period= 0, sync_masterinfo_period= 0,
+ mts_checkpoint_period= 0;
ulong expire_logs_days = 0;
const double log_10[] = {
=== modified file 'sql/mysqld.h'
--- a/sql/mysqld.h 2010-12-08 00:33:48 +0000
+++ b/sql/mysqld.h 2010-12-10 12:10:20 +0000
@@ -129,7 +129,8 @@ extern ulong current_pid;
extern ulong expire_logs_days;
extern my_bool relay_log_recovery;
extern uint sync_binlog_period, sync_relaylog_period,
- sync_relayloginfo_period, sync_masterinfo_period;
+ sync_relayloginfo_period, sync_masterinfo_period,
+ mts_checkpoint_period;
extern ulong opt_tc_log_size, tc_log_max_pages_used, tc_log_page_size;
extern ulong tc_log_page_waits;
extern my_bool relay_log_purge, opt_innodb_safe_binlog, opt_innodb;
=== modified file 'sql/rpl_rli.cc'
--- a/sql/rpl_rli.cc 2010-12-09 17:45:02 +0000
+++ b/sql/rpl_rli.cc 2010-12-10 16:25:27 +0000
@@ -78,13 +78,6 @@ Relay_log_info::Relay_log_info(bool is_s
group_master_log_name[0]= 0;
until_log_name[0]= ign_master_log_name_end[0]= 0;
- /*
- We need to decide if you are going to use an
- option and store it in a storage.
-
- 500 msec.
- */
- lwm_period= 0.500;
set_timespec_nsec(last_clock, 0);
bzero((char*) &cache_buf, sizeof(cache_buf));
@@ -96,52 +89,6 @@ Relay_log_info::Relay_log_info(bool is_s
mysql_cond_init(key_checkpoint_stop_cond, &checkpoint_stop_cond, NULL);
relay_log.init_pthread_objects();
-#if 0
-
- /*
- Parallel slave parameters initialization is done regardless
- whether the feature is or going to be active or not.
- */
- trans_jobs= stmt_jobs= pending_jobs= wait_jobs= 0;
- /*
- parallel slave parameter to pospone Crdr reading when the number
- of non-processed yet jobs becomes bigger than the limit's value
- MHS_todo: consider a memory-size based param
- */
- mts_slave_worker_queue_len_max= ::opt_mts_slave_worker_queue_len_max;
-
- //
- // TODO -- ANDREI --- You need to take care of possible failures related to
- // allocation.
- //
- uint wi= 0;
- key_mutex_slave_parallel_worker= new PSI_mutex_key[slave_parallel_workers];
- key_cond_slave_parallel_worker= new PSI_cond_key[slave_parallel_workers];
- worker_mutexes= new PSI_mutex_info[slave_parallel_workers];
- worker_conds= new PSI_cond_info[slave_parallel_workers];
- for (wi= 0; wi < slave_parallel_workers; wi++)
- {
- worker_mutexes[wi].m_key= (PSI_mutex_key *) &(key_mutex_slave_parallel_worker[wi]);
- worker_mutexes[wi].m_name= "Slave_worker::jobs_lock";
- worker_mutexes[wi].m_flags= 0;
- worker_conds[wi].m_key= (PSI_cond_key *) &(key_cond_slave_parallel_worker[wi]);
- worker_conds[wi].m_name= "Slave_worker::jobs_cond";
- worker_conds[wi].m_flags= 0;
- }
- if (PSI_server)
- {
- PSI_server->register_mutex("worker", worker_mutexes,
- slave_parallel_workers);
- PSI_server->register_cond("worker", worker_conds,
- slave_parallel_workers);
- }
- mysql_mutex_init(key_mutex_slave_parallel_pend_jobs, &pending_jobs_lock,
- MY_MUTEX_INIT_FAST);
- mysql_cond_init(key_cond_slave_parallel_pend_jobs, &pending_jobs_cond, NULL);
- my_init_dynamic_array(&workers, sizeof(Slave_worker *), slave_parallel_workers, 4);
-
-#endif
-
DBUG_VOID_RETURN;
}
@@ -207,16 +154,6 @@ Relay_log_info::~Relay_log_info()
mysql_cond_destroy(&log_space_cond);
relay_log.cleanup();
-#if 0
-
- mysql_mutex_destroy(&pending_jobs_lock);
- mysql_cond_destroy(&pending_jobs_cond);
-
- if (!this_worker)
- delete_dynamic(&workers);
-
-#endif
-
DBUG_VOID_RETURN;
}
=== modified file 'sql/rpl_rli.h'
--- a/sql/rpl_rli.h 2010-12-09 17:45:02 +0000
+++ b/sql/rpl_rli.h 2010-12-10 16:25:27 +0000
@@ -344,7 +344,6 @@ public:
size_t slave_patternload_file_size;
struct timespec last_clock;
- float lwm_period;
Relay_log_info(bool is_slave_recovery);
virtual ~Relay_log_info();
=== modified file 'sql/rpl_rli_pdb.h'
--- a/sql/rpl_rli_pdb.h 2010-12-10 15:50:03 +0000
+++ b/sql/rpl_rli_pdb.h 2010-12-10 16:25:27 +0000
@@ -125,41 +125,13 @@ typedef struct st_slave_job_group
do \
{ \
to.worker_id= from->id; \
- fprintf(stderr, "DEBUGGING Worker-Id %lu Worker-Id %lu\n", \
- to.worker_id, from->id); \
to.group_relay_log_pos= from->group_relay_log_pos; \
- fprintf(stderr, "DEBUGGING group_relay_log_pos %lu group_relay_log_pos %lu\n", \
- (ulong) to.group_relay_log_pos, (ulong) from->group_relay_log_pos); \
to.group_relay_log_name= from->group_relay_log_name; \
- fprintf(stderr, "DEBUGGING group_relay_log_name %s group_relay_log_name %s\n", \
- to.group_relay_log_name, from->group_relay_log_name); \
to.group_master_log_pos= from->group_master_log_pos; \
- fprintf(stderr, "DEBUGGING to group_master_log_pos %lu group_master_log_pos %lu\n", \
- (ulong) to.group_master_log_pos, (ulong) from->group_master_log_pos); \
to.group_master_log_name= from->group_master_log_name; \
- fprintf(stderr, "DEBUGGING to group_master_log_name %s group_master_log_pos %s\n", \
- to.group_master_log_name, from->group_master_log_name); \
to.db_ids= from->curr_group_exec_parts; \
} while (0)
-#define debug_jobs(jobs) \
- do \
- { \
- Slave_job_group job; \
- for (uint pos= 0; pos < jobs.elements; pos++) \
- { \
- get_dynamic(&jobs, (uchar *) &job, pos); \
- fprintf(stderr, "DEBUGGING Worker-Id %lu, " \
- "group_relay_log_name %s, group_relay_log_pos %lu, " \
- "group_master_log_name %s, group_master_lo_pos %lu\n", \
- job.worker_id, \
- job.group_relay_log_name, \
- (ulong) job.group_relay_log_pos, \
- job.group_master_log_name, \
- (ulong) job.group_master_log_pos); \
- } \
- } while (0)
-
/**
Group Assigned Queue whose first element identifies first gap
in committed sequence. The head of the queue is therefore next to
=== modified file 'sql/rpl_slave.cc'
--- a/sql/rpl_slave.cc 2010-12-10 15:50:03 +0000
+++ b/sql/rpl_slave.cc 2010-12-10 16:25:27 +0000
@@ -171,7 +171,7 @@ static int terminate_slave_thread(THD *t
bool skip_lock);
static bool check_io_slave_killed(THD *thd, Master_info *mi, const char *info);
int slave_worker_exec_job(Slave_worker * w, Relay_log_info *rli);
-static bool mts_checkpoint_routine(Relay_log_info *rli, ulong period, bool locked);
+static bool mts_checkpoint_routine(Relay_log_info *rli, ulonglong period, bool locked);
bool mts_recovery_routine(Relay_log_info *rli);
/*
@@ -3796,17 +3796,23 @@ err:
DBUG_RETURN(0);
}
-int mts_jobs_cmp(Slave_job_group *id1, Slave_job_group *id2)
+int mts_recovery_cmp(Slave_job_group *id1, Slave_job_group *id2)
{
return id1->group_relay_log_pos < id2->group_relay_log_pos ? -1 :
(id1->group_relay_log_pos > id2->group_relay_log_pos ? 1 : 0);
}
-
bool mts_recovery_routine(Relay_log_info *rli)
{
+ Log_event *ev= NULL, *desc= NULL;
+ char *log_name= NULL;
+ const char *errmsg= NULL;
+ bool error= TRUE;
DYNAMIC_ARRAY jobs;
Slave_job_group job;
+ IO_CACHE log;
+ File file;
+ MY_STAT s;
DBUG_ENTER("mts_recovery_routine");
DBUG_ASSERT(rli->workers.elements > 0);
@@ -3821,16 +3827,157 @@ bool mts_recovery_routine(Relay_log_info
get_job(worker, job);
insert_dynamic(&jobs, (uchar*) &job);
}
- sort_dynamic(&jobs, (qsort_cmp) change_master_server_id_cmp);
+ sort_dynamic(&jobs, (qsort_cmp) mts_recovery_cmp);
DBUG_ASSERT(rli->workers.elements == jobs.elements);
- debug_jobs(jobs);
- // TODO -- ALFRANIO CORE OF RECOVERY
+ Format_description_log_event fdle(BINLOG_VERSION);
+ if (!fdle.is_valid())
+ goto end;
+
+ for (uint pos= 0; pos < jobs.elements; pos++)
+ {
+ String buffer;
+ get_dynamic(&jobs, (uchar *) &job, pos);
+
+ job.db_ids->pack_dynamic_ids(&buffer);
+ sql_print_information("Recoverying relay log info based on Worker-Id %lu, partitions %s, "
+ "group_relay_log_name %s, group_relay_log_pos %lu, "
+ "group_master_log_name %s, group_master_lo_pos %lu",
+ job.worker_id,
+ buffer.c_ptr_safe(),
+ job.group_relay_log_name,
+ (ulong) job.group_relay_log_pos,
+ job.group_master_log_name,
+ (ulong) job.group_master_log_pos);
+
+ if (job.group_relay_log_name == NULL || job.group_relay_log_pos == 0 ||
+ rli->get_group_relay_log_pos() >= job.group_relay_log_pos)
+ continue;
+
+ if (log_name == NULL || strcmp(log_name, job.group_relay_log_name))
+ {
+ if (ev)
+ {
+ delete ev;
+ ev= NULL;
+ }
+
+ if (desc)
+ {
+ delete desc;
+ desc= NULL;
+ }
+
+ if (log_name)
+ {
+ end_io_cache(&log);
+ mysql_file_close(file, MYF(MY_WME));
+ log_name= NULL;
+ }
+
+ if ((file= open_binlog(&log, job.group_relay_log_name, &errmsg)) < 0)
+ {
+ sql_print_error("%s", errmsg);
+ goto end;
+ }
+ log_name= job.group_relay_log_name;
+
+ my_stat(log_name, &s, MYF(0));
+
+ if (!((desc= Log_event::read_log_event(&log, 0, &fdle,
+ opt_master_verify_checksum)) &&
+ desc->get_type_code() == FORMAT_DESCRIPTION_EVENT))
+ {
+ goto end;
+ }
+ }
+ my_b_seek(&log, job.group_relay_log_pos);
+ sql_print_information("Recoverying relay log info. Checking relay log name "
+ "%s from pos %lu, maxsize %lu.", log_name,
+ (ulong) job.group_relay_log_pos, (ulong) s.st_size);
+
+ bool found= FALSE;
+ int res= 0;
+ while ((ev= Log_event::read_log_event(&log, 0, &fdle,
+ opt_master_verify_checksum)))
+ {
+ DBUG_ASSERT(ev->is_valid());
+
+ String buffer;
+ const char *db= ev->get_db();
+
+ if (db != NULL)
+ {
+ buffer.set_int(strlen(db), FALSE, &my_charset_bin);
+ buffer.append(db);
+ found= job.db_ids->search_id(buffer.c_ptr_safe());
+ }
+ sql_print_information("Recovery relay log info. Event %s on db %s "
+ "sets cursor to pos %lu and was handled by worker(%d).",
+ ev->get_type_str(), db, (ulong) my_b_tell(&log),
+ found);
+ if (!found)
+ {
+ delete ev;
+ ev= NULL;
+ break;
+ }
+
+ res= ev->apply_event(rli);
+ delete ev;
+ ev= NULL;
+
+ if (res)
+ goto end;
+ }
+ if (!found)
+ {
+ sql_print_information("Before updating RLI "
+ "group_relay_log_name %s, "
+ "group_relay_log_pos %lu, "
+ "group_master_log_name %s, "
+ "group_master_lo_pos %lu.",
+ rli->get_group_relay_log_name(),
+ (ulong) rli->get_group_relay_log_pos(),
+ rli->get_group_master_log_name(),
+ (ulong) rli->get_group_master_log_pos());
+
+ rli->set_group_relay_log_pos(job.group_relay_log_pos);
+ rli->set_group_relay_log_name(job.group_relay_log_name);
+ rli->set_group_master_log_pos(job.group_master_log_pos);
+ rli->set_group_master_log_name(job.group_master_log_name);
+
+ sql_print_information("After updating RLI "
+ "group_relay_log_name %s, "
+ "group_relay_log_pos %lu, "
+ "group_master_log_name %s, "
+ "group_master_lo_pos %lu.",
+ rli->get_group_relay_log_name(),
+ (ulong) rli->get_group_relay_log_pos(),
+ rli->get_group_master_log_name(),
+ (ulong) rli->get_group_master_log_pos());
+ }
+ }
+ error= FALSE;
+
+end:
+ if (desc)
+ {
+ delete desc;
+ desc= NULL;
+ }
+
+ if (log_name)
+ {
+ end_io_cache(&log);
+ mysql_file_close(file, MYF(MY_WME));
+ log_name= NULL;
+ }
delete_dynamic(&jobs);
-
- DBUG_RETURN(rli->flush_info(TRUE));
+
+ DBUG_RETURN(error ? error : rli->flush_info(TRUE));
}
/**
@@ -3840,7 +3987,7 @@ bool mts_recovery_routine(Relay_log_info
@return FALSE success, TRUE otherwise
*/
-bool mts_checkpoint_routine(Relay_log_info *rli, ulong period, bool locked)
+bool mts_checkpoint_routine(Relay_log_info *rli, ulonglong period, bool locked)
{
ulong cnt;
bool error= FALSE;
@@ -5659,9 +5806,9 @@ static Log_event* next_event(Relay_log_i
/*
MTS checkpoint in the successful read branch
*/
- if (rli->is_parallel_exec() && rli->lwm_period != 0.0)
+ if (rli->is_parallel_exec() && mts_checkpoint_period != 0)
{
- ulong period= static_cast<ulong>(rli->lwm_period * 1000000000UL);
+ ulonglong period= static_cast<ulonglong>(mts_checkpoint_period * 1000000ULL);
mts_checkpoint_routine(rli, period, TRUE); // ALFRANIO --- WHAT TO DO with ERRORS?
}
@@ -5784,11 +5931,11 @@ static Log_event* next_event(Relay_log_i
const char* old_msg= thd->proc_info;
- if (rli->is_parallel_exec() && rli->lwm_period != 0.0)
+ if (rli->is_parallel_exec() && mts_checkpoint_period != 0)
{
int ret= 0;
struct timespec waittime;
- ulong period= static_cast<ulong>(rli->lwm_period * 1000000000UL);
+ ulonglong period= static_cast<ulonglong>(mts_checkpoint_period * 1000000ULL);
do
{
mts_checkpoint_routine(rli, period, FALSE); // ALFRANIO ERROR
=== modified file 'sql/sys_vars.cc'
--- a/sql/sys_vars.cc 2010-12-10 15:50:03 +0000
+++ b/sql/sys_vars.cc 2010-12-10 16:25:27 +0000
@@ -3104,6 +3104,13 @@ static Sys_var_uint Sys_sync_relayloginf
"synchronous flushing",
GLOBAL_VAR(sync_relayloginfo_period), CMD_LINE(REQUIRED_ARG),
VALID_RANGE(0, UINT_MAX), DEFAULT(0), BLOCK_SIZE(1));
+
+static Sys_var_uint Sys_checkpoint_mts_period(
+ "mts_checkpoint_period", "Gather workers' activities and synchronously "
+ "flush relay log info to disk after every #th mili-seconds. Use 0 "
+ "(default) to disable checkpoint",
+ GLOBAL_VAR(mts_checkpoint_period), CMD_LINE(REQUIRED_ARG),
+ VALID_RANGE(0, UINT_MAX), DEFAULT(0), BLOCK_SIZE(1));
#endif
static Sys_var_uint Sys_sync_binlog_period(
Attachment: [text/bzr-bundle] bzr/andrei.elkin@oracle.com-20101210162527-3rrmlpigndb4fu6i.bundle
| Thread |
|---|
| • bzr commit into mysql-next-mr-wl5569 branch (andrei.elkin:3240) | Andrei Elkin | 10 Dec |