3255 Andrei Elkin 2010-12-21 [merge]
merging with repo
modified:
mysql-test/suite/funcs_1/r/is_columns_mysql.result
scripts/mysql_system_tables.sql
sql/rpl_info_table.cc
sql/rpl_info_table_access.cc
sql/rpl_rli.cc
sql/rpl_rli.h
sql/rpl_slave.cc
=== modified file 'sql/log_event.h'
--- a/sql/log_event.h 2010-12-16 21:41:45 +0000
+++ b/sql/log_event.h 2010-12-21 19:31:29 +0000
@@ -757,11 +757,11 @@ typedef struct st_print_event_info
Such identifier is not yet unique generally as the event originating master
is resetable. Also the crashed master can be replaced with some other.
*/
-struct event_coordinates
+typedef struct event_coordinates
{
char * file_name; // binlog file name (directories stripped)
my_off_t pos; // event's position in the binlog file
-};
+} LOG_POS_COORD;
/**
@class Log_event
=== modified file 'sql/rpl_rli_pdb.cc'
--- a/sql/rpl_rli_pdb.cc 2010-12-20 22:18:33 +0000
+++ b/sql/rpl_rli_pdb.cc 2010-12-21 19:31:29 +0000
@@ -15,10 +15,14 @@ const char *info_slave_worker_fields []=
"relay_log_pos",
"master_log_name",
"master_log_pos",
+
+ // todo: remove the next four
"checkpoint_relay_log_name",
"checkpoint_relay_log_pos",
"checkpoint_master_log_name",
"checkpoint_master_log_pos",
+
+ "checkpoint_seqno", // index of the last committed group in the bitmap
"checkpoint_group_size",
"checkpoint_group_bitmap"
};
@@ -27,7 +31,7 @@ Slave_worker::Slave_worker(const char* t
Relay_log_info *rli)
: Rpl_info_worker(type, pfs), c_rli(rli), curr_group_exec_parts(0),
group_relay_log_pos(0), group_master_log_pos(0),
- checkpoint_relay_log_pos(0), checkpoint_master_log_pos(0),
+ checkpoint_relay_log_pos(0), checkpoint_master_log_pos(0), checkpoint_seqno(0),
inited_group_execed(0)
{
group_relay_log_name[0]= 0;
@@ -133,6 +137,7 @@ bool Slave_worker::read_info(Rpl_info_ha
ulong temp_group_master_log_pos= 0;
ulong temp_checkpoint_relay_log_pos= 0;
ulong temp_checkpoint_master_log_pos= 0;
+ ulong temp_checkpoint_seqno= 0;
ulong nbytes= 0;
uchar *buffer= (uchar *) group_execed.bitmap;
@@ -159,6 +164,8 @@ bool Slave_worker::read_info(Rpl_info_ha
(char *) "") ||
from->get_info((ulong *) &temp_checkpoint_master_log_pos,
(ulong) 0) ||
+ from->get_info((ulong *) &temp_checkpoint_seqno,
+ (ulong) 0) ||
from->get_info(&nbytes, (ulong) 0) ||
from->get_info(buffer, (size_t) nbytes,
(uchar *) 0))
@@ -168,6 +175,7 @@ bool Slave_worker::read_info(Rpl_info_ha
group_master_log_pos= temp_group_master_log_pos;
checkpoint_relay_log_pos= temp_checkpoint_relay_log_pos;
checkpoint_master_log_pos= temp_checkpoint_master_log_pos;
+ checkpoint_seqno= temp_checkpoint_seqno;
DBUG_RETURN(FALSE);
}
@@ -188,6 +196,7 @@ bool Slave_worker::write_info(Rpl_info_h
to->set_info((ulong) checkpoint_relay_log_pos) ||
to->set_info(checkpoint_master_log_name) ||
to->set_info((ulong) checkpoint_master_log_pos) ||
+ to->set_info((ulong) checkpoint_seqno) ||
to->set_info(nbytes) ||
to->set_info(buffer, (size_t) nbytes))
DBUG_RETURN(TRUE);
@@ -224,7 +233,7 @@ bool Slave_worker::commit_positions(Log_
}
bitmap_set_bit(&group_execed, ptr_g->checkpoint_seqno);
-
+ checkpoint_seqno= ptr_g->checkpoint_seqno;
group_relay_log_pos= ev->future_event_relay_log_pos;
group_master_log_pos= ev->log_pos;
strmake(group_master_log_name, c_rli->get_group_master_log_name(),
=== modified file 'sql/rpl_rli_pdb.h'
--- a/sql/rpl_rli_pdb.h 2010-12-20 22:18:33 +0000
+++ b/sql/rpl_rli_pdb.h 2010-12-21 19:31:29 +0000
@@ -139,6 +139,7 @@ typedef struct st_slave_job_group
do \
{ \
to.worker_id= from->id; \
+ to.checkpoint_seqno= from->checkpoint_seqno; \
to.group_master_log_pos= from->checkpoint_master_log_pos; \
to.group_master_log_name= from->checkpoint_master_log_name; \
to.group_relay_log_pos= from->checkpoint_relay_log_pos; \
@@ -253,10 +254,14 @@ public:
ulonglong group_relay_log_pos;
char group_master_log_name[FN_REFLEN];
ulonglong group_master_log_pos;
+
+ // todo: remove
char checkpoint_relay_log_name[FN_REFLEN];
ulonglong checkpoint_relay_log_pos;
+
char checkpoint_master_log_name[FN_REFLEN];
ulonglong checkpoint_master_log_pos;
+ ulong checkpoint_seqno;
int init_info();
void end_info();
=== modified file 'sql/rpl_slave.cc'
--- a/sql/rpl_slave.cc 2010-12-21 14:11:05 +0000
+++ b/sql/rpl_slave.cc 2010-12-21 19:34:58 +0000
@@ -3791,69 +3791,185 @@ err:
/**
Orders jobs by comparing relay log information.
*/
+#if 0
int mts_recovery_cmp(Slave_job_group *id1, Slave_job_group *id2)
+#endif
+
+int mts_event_coord_cmp(LOG_POS_COORD *id1, LOG_POS_COORD *id2)
{
- longlong filecmp= strcmp(id1->group_relay_log_name, id2->group_relay_log_name);
- longlong poscmp= id1->group_relay_log_pos - id2->group_relay_log_pos;
+ longlong filecmp= strcmp(id1->file_name, id2->file_name);
+ longlong poscmp= id1->pos - id2->pos;
return (filecmp < 0 ? -1 : (filecmp > 0 ? 1 :
(poscmp < 0 ? -1 : (poscmp > 0 ? 1 : 0))));
}
bool mts_recovery_groups(Relay_log_info *rli, MY_BITMAP *groups)
{
- Log_event *ev= NULL, *desc= NULL;
- char *log_name= NULL;
+ Log_event *ev= NULL; // , *desc= NULL;
+ const char *log_name= NULL;
const char *errmsg= NULL;
bool error= FALSE;
- DYNAMIC_ARRAY jobs;
- uint group_worker_counter;
- uint group_lwm_counter;
+ DYNAMIC_ARRAY above_lwm_jobs;
bool curr_group_seen_begin= FALSE;
Slave_job_group job_worker;
Slave_job_group job_file;
IO_CACHE log;
File file;
MY_STAT s;
- longlong filecmp= 0;
- longlong poscmp= 0;
+
+ LOG_POS_COORD cp=
+ {
+ (char *) rli->get_group_master_log_name(),
+ rli->get_group_master_log_pos()
+ };
DBUG_ENTER("mts_recovery_groups");
DBUG_ASSERT(rli->recovery_parallel_workers > 0);
/*
- Gathers information on workers and stores it in jobs ordered by
- relay log information.
+ Gathers information on valuable workers and stores it in
+ above_lwm_jobs in asc ordered by the master binlog coordinates.
*/
- my_init_dynamic_array(&jobs, sizeof(Slave_job_group),
+ my_init_dynamic_array(&above_lwm_jobs, sizeof(Slave_job_group),
rli->recovery_parallel_workers, rli->recovery_parallel_workers);
+
for (uint id= 0; id < rli->recovery_parallel_workers; id++)
{
Slave_worker *worker=
Rpl_info_factory::create_worker(opt_worker_repository_id, id, rli);
worker->init_info();
retrieve_job(worker, job_file);
+ LOG_POS_COORD w_last= {worker->group_master_log_name, worker->group_master_log_pos};
+ if (mts_event_coord_cmp(&w_last, &cp) > 0)
+ insert_dynamic(&above_lwm_jobs, (uchar*) &job_file);
+ else
+ delete worker;
+ };
+
+#if 0
+ for (uint id= 0; id < rli->slave_parallel_workers; id++)
+ {
+ Slave_worker *worker=
+ Rpl_info_factory::create_worker(opt_worker_repository_id, id, rli);
+ worker->init_info();
+ retrieve_job(worker, job_file);
/*
This avoids gathering information on workers that haven't
processed anything.
*/
+
+ // TODO: disregard W_i | W_i->coord < LWM
+
if (job_file.group_relay_log_name != NULL && strcmp(job_file.group_relay_log_name, "") &&
job_file.group_master_log_name != NULL && strcmp(job_file.group_master_log_name, ""))
- insert_dynamic(&jobs, (uchar*) &job_file);
+ insert_dynamic(&above_lwm_jobs, (uchar*) &job_file);
}
- sort_dynamic(&jobs, (qsort_cmp) mts_recovery_cmp);
+#endif
+ sort_dynamic(&above_lwm_jobs, (qsort_cmp) mts_event_coord_cmp);
/*
- In what follows, the group bitmap is constructed.
+ In what follows, the group Recovery Bitmap is constructed.
+
+ seek(lwm);
+
+ while(w= next(above_lwm_w))
+ do
+ read G
+ if G == w->last_comm
+ w.B << group_cnt++;
+ RB |= w.B;
+ break;
+ else
+ group_cnt++;
+ while(!eof);
+ continue;
*/
Format_description_log_event fdle(BINLOG_VERSION);
if (!fdle.is_valid())
- goto end;
+ {
+ error= TRUE;
+ goto err;
+ }
- for (uint it_job= 0; it_job < jobs.elements; it_job++)
+ log_name= const_cast<Relay_log_info*>(rli)->get_group_relay_log_name();
+ if ((file= open_binlog(&log, log_name, &errmsg)) < 0)
+ {
+ error= TRUE;
+ sql_print_error("%s", errmsg);
+ goto err;
+ }
+ DBUG_ASSERT(my_stat(log_name, &s, MYF(0))); // TODO: Alfranio, why my_stat?
+
+ my_b_seek(&log, (my_off_t) rli->get_group_relay_log_pos());
+
+ bitmap_clear_all(groups);
+
+ for (uint it_job= 0, group_worker_counter= 0; it_job < above_lwm_jobs.elements; it_job++)
+ {
+ Slave_worker *w= ((Slave_job_group *)
+ dynamic_array_ptr(&above_lwm_jobs, it_job))->worker;
+ LOG_POS_COORD w_last= { w->group_master_log_name, w->group_master_log_pos };
+
+ sql_print_information("Recoverying relay log info based on Worker-Id %lu, "
+ "group_relay_log_name %s, group_relay_log_pos %lu "
+ "group_master_log_name %s, group_master_log_pos %lu",
+ w->id,
+ w->group_relay_log_name,
+ (ulong) w->group_relay_log_pos,
+ w->group_master_log_name,
+ (ulong) w->group_master_log_pos);
+
+ // TODO: extend to handle sequence of relay logs (read(ev) -> EOF)
+
+ while ((ev= Log_event::read_log_event(&log, 0, &fdle,
+ opt_master_verify_checksum)))
+ {
+ DBUG_ASSERT(ev->is_valid());
+ DBUG_ASSERT(group_worker_counter < rli->checkpoint_group);
+
+ // TODO: relax condition to allow --mts_exp_run_query_in_parallel= 1
+ if (ev->starts_group())
+ curr_group_seen_begin= TRUE;
+ else
+ if (ev->ends_group())
+ {
+ int ret;
+ LOG_POS_COORD ev_coord= { (char *) rli->get_group_master_log_name(),
+ ev->log_pos };
+ if ((ret= mts_event_coord_cmp(&ev_coord, &w_last)) == 0)
+ {
+ // hit it
+ // w.B << group_cnt++;
+ // RB |= w.B;
+ for (uint i= w->checkpoint_seqno - group_worker_counter, j= 0;
+ i <= w->checkpoint_seqno; i++, j++)
+ {
+ //bitmap_intersect(&rli->groups, &w->group_execed);
+ if (_bitmap_is_set(&w->group_execed, i))
+ bitmap_fast_test_and_set(groups, j);
+ }
+ group_worker_counter++;
+ delete ev;
+ ev= NULL;
+ break;
+ }
+ else
+ {
+ DBUG_ASSERT(ret < 0);
+ group_worker_counter++;
+ }
+ }
+ delete ev;
+ ev= NULL;
+ }
+ }
+
+#if 0
+ for (uint it_job= 0; it_job < above_lwm_jobs.elements; it_job++)
{
group_worker_counter= 0;
group_lwm_counter= 0;
- get_dynamic(&jobs, (uchar *) &job_worker, it_job);
+ get_dynamic(&above_lwm_jobs, (uchar *) &job_worker, it_job);
sql_print_information("Recoverying relay log info based on Worker-Id %lu, "
"group_relay_log_name %s, group_relay_log_pos %lu "
@@ -3864,9 +3980,9 @@ bool mts_recovery_groups(Relay_log_info
job_worker.group_master_log_name,
(ulong) job_worker.group_master_log_pos);
- for (uint it_file= 0; it_file < jobs.elements; it_file++)
+ for (uint it_file= 0; it_file < above_lwm_jobs.elements; it_file++)
{
- get_dynamic(&jobs, (uchar *) &job_file, it_file);
+ get_dynamic(&above_lwm_jobs, (uchar *) &job_file, it_file);
/*
Either the current relay log file was already processed by the
@@ -3959,21 +4075,29 @@ end:
desc= NULL;
}
+
if (log_name)
{
end_io_cache(&log);
mysql_file_close(file, MYF(MY_WME));
log_name= NULL;
}
+#endif
- for (uint it_job= 0; it_job < jobs.elements; it_job++)
+ end_io_cache(&log);
+ mysql_file_close(file, MYF(MY_WME));
+ log_name= NULL;
+
+err:
+
+ for (uint it_job= 0; it_job < above_lwm_jobs.elements; it_job++)
{
- get_dynamic(&jobs, (uchar *) &job_worker, it_job);
+ get_dynamic(&above_lwm_jobs, (uchar *) &job_worker, it_job);
job_worker.worker->end_info();
delete job_worker.worker;
}
- delete_dynamic(&jobs);
+ delete_dynamic(&above_lwm_jobs);
DBUG_RETURN(error);
}
Attachment: [text/bzr-bundle] bzr/andrei.elkin@oracle.com-20101221193458-dha63d1348f3trsz.bundle
| Thread |
|---|
| • bzr push into mysql-next-mr-wl5569 branch (andrei.elkin:3255) | Andrei Elkin | 21 Dec |