#At file:///home/andrei/MySQL/BZR/2a-23May/WL/mysql-next-mr-wl5569/ based on revid:andrei.elkin@stripped
3225 Andrei Elkin 2010-12-02 [merge]
manual merge to wl#5569 tree
added:
sql/dynamic_ids.cc
renamed:
sql/server_ids.h => sql/dynamic_ids.h
modified:
.bzr-mysql/default.conf
mysql-test/suite/rpl/t/rpl_parallel.test
sql/CMakeLists.txt
sql/Makefile.am
sql/log_event.cc
sql/rpl_info_dummy.cc
sql/rpl_info_dummy.h
sql/rpl_info_file.cc
sql/rpl_info_file.h
sql/rpl_info_handler.h
sql/rpl_info_table.cc
sql/rpl_info_table.h
sql/rpl_mi.cc
sql/rpl_mi.h
sql/rpl_rli.cc
sql/rpl_rli.h
sql/rpl_rli_pdb.cc
sql/rpl_rli_pdb.h
sql/rpl_slave.cc
sql/dynamic_ids.h
=== modified file '.bzr-mysql/default.conf'
--- a/.bzr-mysql/default.conf 2010-11-03 15:35:37 +0000
+++ b/.bzr-mysql/default.conf 2010-12-02 13:21:23 +0000
@@ -1,4 +1,4 @@
[MYSQL]
post_commit_to = "commits@stripped"
post_push_to = "commits@stripped"
-tree_name = "mysql-next-mr.crash-safe"
+tree_name = "mysql-next-mr-wl5569"
=== modified file 'mysql-test/suite/rpl/t/rpl_parallel.test'
--- a/mysql-test/suite/rpl/t/rpl_parallel.test 2010-12-02 17:46:46 +0000
+++ b/mysql-test/suite/rpl/t/rpl_parallel.test 2010-12-02 18:13:12 +0000
@@ -34,6 +34,7 @@
#
source include/master-slave.inc;
+source include/have_binlog_format_row.inc;
connection slave;
set @save.slave_parallel_workers= @@global.slave_parallel_workers;
=== modified file 'sql/CMakeLists.txt'
--- a/sql/CMakeLists.txt 2010-11-23 09:03:37 +0000
+++ b/sql/CMakeLists.txt 2010-12-01 19:15:08 +0000
@@ -106,7 +106,7 @@ ADD_DEPENDENCIES(master GenError)
SET (SLAVE_SOURCE rpl_slave.cc rpl_reporting.cc rpl_mi.cc rpl_rli.cc
rpl_info_handler.cc rpl_info_file.cc rpl_info_table.cc
rpl_info_values.cc rpl_info.cc rpl_info_factory.cc
- rpl_info_table_access.cc server_ids.h rpl_rli_pdb.cc
+ rpl_info_table_access.cc dynamic_ids.cc rpl_rli_pdb.cc
rpl_info_dummy.cc)
ADD_LIBRARY(slave ${SLAVE_SOURCE})
ADD_DEPENDENCIES(slave GenError)
=== modified file 'sql/Makefile.am'
--- a/sql/Makefile.am 2010-11-23 09:03:37 +0000
+++ b/sql/Makefile.am 2010-12-01 19:15:08 +0000
@@ -111,7 +111,7 @@ noinst_HEADERS = item.h item_func.h item
log.h sql_show.h rpl_info.h rpl_info_file.h \
rpl_info_table.h rpl_rli.h rpl_mi.h rpl_info_values.h \
rpl_info_table_access.h rpl_info_dummy.h \
- rpl_info_factory.h server_ids.h rpl_rli_pdb.h \
+ rpl_info_factory.h dynamic_ids.h rpl_rli_pdb.h \
sql_select.h structs.h table.h sql_udf.h hash_filo.h \
lex.h lex_symbol.h sql_acl.h sql_crypt.h sql_base.h \
sql_table.h key.h lock.h thr_malloc.h strfunc.h \
@@ -195,7 +195,7 @@ libbinlog_la_SOURCES = log_event.cc log_
librpl_la_SOURCES = rpl_handler.cc rpl_tblmap.cc
libmaster_la_SOURCES = rpl_master.cc
libslave_la_SOURCES = rpl_slave.cc rpl_reporting.cc rpl_rli.cc rpl_mi.cc \
- rpl_info.cc rpl_info_factory.cc rpl_info_file.cc \
+ rpl_info.cc rpl_info_factory.cc dynamic_ids.cc rpl_info_file.cc \
rpl_info_handler.cc rpl_info_table.cc rpl_info_dummy.cc \
rpl_info_table_access.cc rpl_info_values.cc rpl_rli_pdb.cc
libndb_la_CPPFLAGS= @ndbcluster_includes@
=== added file 'sql/dynamic_ids.cc'
--- a/sql/dynamic_ids.cc 1970-01-01 00:00:00 +0000
+++ b/sql/dynamic_ids.cc 2010-12-01 19:15:08 +0000
@@ -0,0 +1,106 @@
+#include "dynamic_ids.h"
+
+Dynamic_ids::Dynamic_ids()
+{
+ my_init_dynamic_array(&dynamic_ids, 64, 16, 16);
+}
+
+Dynamic_ids::~Dynamic_ids()
+{
+ delete_dynamic(&dynamic_ids);
+}
+
+bool Server_ids::do_unpack_dynamic_ids(char *param_dynamic_ids)
+{
+ char *token= NULL, *last= NULL;
+ uint num_items= 0;
+
+ DBUG_ENTER("Server_ids::unpack_dynamic_ids");
+
+ token= strtok_r((char *)const_cast<const char*>(param_dynamic_ids),
+ " ", &last);
+
+ if (token == NULL)
+ DBUG_RETURN(TRUE);
+
+ num_items= atoi(token);
+ for (uint i=0; i < num_items; i++)
+ {
+ token= strtok_r(NULL, " ", &last);
+ if (token == NULL)
+ DBUG_RETURN(TRUE);
+ else
+ {
+ ulong val= atol(token);
+ insert_dynamic(&dynamic_ids, (uchar *) &val);
+ }
+ }
+ DBUG_RETURN(FALSE);
+}
+
+
+bool Server_ids::do_pack_dynamic_ids(String *buffer)
+{
+ DBUG_ENTER("Server_ids::pack_dynamic_ids");
+
+ if (buffer->set_int(dynamic_ids.elements, FALSE, &my_charset_bin))
+ DBUG_RETURN(TRUE);
+
+ for (ulong i= 0;
+ i < dynamic_ids.elements; i++)
+ {
+ ulong s_id;
+ get_dynamic(&dynamic_ids, (uchar*) &s_id, i);
+ if (buffer->append(" ") ||
+ buffer->append_ulonglong(s_id))
+ DBUG_RETURN(TRUE);
+ }
+
+ DBUG_RETURN(FALSE);
+}
+
+bool Database_ids::do_unpack_dynamic_ids(char *param_dynamic_ids)
+{
+ char *token= NULL, *last= NULL;
+ uint num_items= 0;
+
+ DBUG_ENTER("Server_ids::unpack_dynamic_ids");
+
+ token= strtok_r((char *)const_cast<const char*>(param_dynamic_ids),
+ " ", &last);
+
+ if (token == NULL)
+ DBUG_RETURN(TRUE);
+
+ num_items= atoi(token);
+ for (uint i=0; i < num_items; i++)
+ {
+ token= strtok_r(NULL, " ", &last);
+ if (token == NULL)
+ DBUG_RETURN(TRUE);
+ else
+ insert_dynamic(&dynamic_ids, (uchar *) token);
+ }
+ DBUG_RETURN(FALSE);
+}
+
+bool Database_ids::do_pack_dynamic_ids(String *buffer)
+{
+ char token[2000];
+
+ DBUG_ENTER("Server_ids::pack_dynamic_ids");
+
+ if (buffer->set_int(dynamic_ids.elements, FALSE, &my_charset_bin))
+ DBUG_RETURN(TRUE);
+
+ for (ulong i= 0;
+ i < dynamic_ids.elements; i++)
+ {
+ get_dynamic(&dynamic_ids, (uchar*) token, i);
+ if (buffer->append(" ") ||
+ buffer->append(token))
+ DBUG_RETURN(TRUE);
+ }
+
+ DBUG_RETURN(FALSE);
+}
=== renamed file 'sql/server_ids.h' => 'sql/dynamic_ids.h'
--- a/sql/server_ids.h 2010-10-25 10:39:01 +0000
+++ b/sql/dynamic_ids.h 2010-12-01 19:15:08 +0000
@@ -1,20 +1,67 @@
-#ifndef SERVER_ID_H
+/* Copyright (c) 2000, 2010 Oracle and/or its affiliates. All rights reserved.
-#define SERVER_ID_H
+ This program is free software; you can redistribute it and/or modify
+ it under the terms of the GNU General Public License as published by
+ the Free Software Foundation; version 2 of the License.
+
+ This program is distributed in the hope that it will be useful,
+ but WITHOUT ANY WARRANTY; without even the implied warranty of
+ MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ GNU General Public License for more details.
+
+ You should have received a copy of the GNU General Public License
+ along with this program; if not, write to the Free Software
+ Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA */
+
+#ifndef DYNAMIC_ID_H
+
+#define DYNAMIC_ID_H
#include <my_sys.h>
#include <sql_string.h>
-class Server_ids
+class Dynamic_ids
{
- public:
- DYNAMIC_ARRAY server_ids;
+public:
+ DYNAMIC_ARRAY dynamic_ids;
- Server_ids();
- ~Server_ids();
+ Dynamic_ids();
+ virtual ~Dynamic_ids();
- bool pack_server_ids(String *buffer);
- bool unpack_server_ids(char *param_server_ids);
+ bool pack_dynamic_ids(String *buffer)
+ {
+ return(do_pack_dynamic_ids(buffer));
+ }
+
+ bool unpack_dynamic_ids(char *param_dynamic_ids)
+ {
+ return(do_unpack_dynamic_ids(param_dynamic_ids));
+ }
+
+private:
+ virtual bool do_pack_dynamic_ids(String *buffer)= 0;
+ virtual bool do_unpack_dynamic_ids(char *param_dynamic_ids)= 0;
};
+class Server_ids : public Dynamic_ids
+{
+public:
+ Server_ids() { };
+ virtual ~Server_ids() { };
+
+private:
+ bool do_pack_dynamic_ids(String *buffer);
+ bool do_unpack_dynamic_ids(char *param_dynamic_ids);
+};
+
+class Database_ids : public Dynamic_ids
+{
+public:
+ Database_ids() { };
+ virtual ~Database_ids() { };
+
+private:
+ bool do_pack_dynamic_ids(String *buffer);
+ bool do_unpack_dynamic_ids(char *param_dynamic_ids);
+};
#endif
=== modified file 'sql/log_event.cc'
--- a/sql/log_event.cc 2010-12-02 17:46:46 +0000
+++ b/sql/log_event.cc 2010-12-02 18:13:12 +0000
@@ -2667,14 +2667,12 @@ int slave_worker_exec_job(Slave_worker *
if (ev->starts_group())
{
w->curr_group_seen_begin= TRUE; // The current group is started with B-event
- // ANDREI ----
- // w->configure_position(rli);
}
else
{
if (ev->contains_partition_info())
{
- DYNAMIC_ARRAY *ep= &w->curr_group_exec_parts;
+ DYNAMIC_ARRAY *ep= &(w->curr_group_exec_parts->dynamic_ids);
uint i;
char key[NAME_LEN + 2];
bool found= FALSE;
@@ -2704,7 +2702,8 @@ int slave_worker_exec_job(Slave_worker *
DBUG_PRINT("slave_worker_exec_job:", (" commits GAQ index %lu, last committed %lu", ev->mts_group_cnt, w->last_group_done_index));
w->slave_worker_ends_group(ev->mts_group_cnt, error); /* last done sets post exec */
- w->checkpoint(w->w_rli);
+ if (!(ev->get_type_code() == XID_EVENT && w->is_transactional()))
+ w->commit_positions(ev);
}
mysql_mutex_lock(&w->jobs_lock);
@@ -6238,13 +6237,15 @@ int Xid_log_event::do_apply_event(Relay_
the context of the current transaction in order to provide
data integrity. See sql/rpl_rli.h for further details.
*/
- bool is_trans_repo= rli_ptr->is_transactional();
+ Slave_worker *w= rli_ptr->get_current_worker();
+ bool is_parallel= (w != NULL);
+ bool is_trans_repo= (is_parallel ? w->is_transactional() : rli_ptr->is_transactional());
/* For a slave Xid_log_event is COMMIT */
general_log_print(thd, COM_QUERY,
"COMMIT /* implicit, from Xid_log_event */");
- if (is_trans_repo)
+ if (is_trans_repo && !is_parallel)
{
mysql_mutex_lock(&rli_ptr->data_lock);
}
@@ -6262,7 +6263,7 @@ int Xid_log_event::do_apply_event(Relay_
/*
We need to update the positions in here to make it transactional.
*/
- if (is_trans_repo)
+ if (is_trans_repo && !is_parallel)
{
rli_ptr->inc_event_relay_log_pos();
rli_ptr->set_group_relay_log_pos(rli_ptr->get_event_relay_log_pos());
@@ -6278,6 +6279,11 @@ int Xid_log_event::do_apply_event(Relay_
if ((error= rli_ptr->flush_info(TRUE)))
goto err;
}
+ else if (is_trans_repo && is_parallel)
+ {
+ if ((error= w->commit_positions(this)))
+ goto err;
+ }
DBUG_PRINT("info", ("do_apply group master %s %lu group relay %s %lu event %s %lu\n",
rli_ptr->get_group_master_log_name(),
@@ -6294,7 +6300,7 @@ int Xid_log_event::do_apply_event(Relay_
thd->mdl_context.release_transactional_locks();
err:
- if (is_trans_repo)
+ if (is_trans_repo && !is_parallel)
{
mysql_cond_broadcast(&rli_ptr->data_cond);
mysql_mutex_unlock(&rli_ptr->data_lock);
=== modified file 'sql/rpl_info_dummy.cc'
--- a/sql/rpl_info_dummy.cc 2010-11-30 02:08:01 +0000
+++ b/sql/rpl_info_dummy.cc 2010-12-01 19:15:08 +0000
@@ -96,7 +96,7 @@ bool Rpl_info_dummy::do_set_info(const i
}
bool Rpl_info_dummy::do_set_info(const int pos __attribute__((unused)),
- const Server_ids *value __attribute__((unused)))
+ const Dynamic_ids *value __attribute__((unused)))
{
if (abort) DBUG_ASSERT(0);
return FALSE;
@@ -136,8 +136,8 @@ bool Rpl_info_dummy::do_get_info(const i
}
bool Rpl_info_dummy::do_get_info(const int pos __attribute__((unused)),
- Server_ids *value __attribute__((unused)),
- const Server_ids *default_value __attribute__((unused)))
+ Dynamic_ids *value __attribute__((unused)),
+ const Dynamic_ids *default_value __attribute__((unused)))
{
if (abort) DBUG_ASSERT(0);
return FALSE;
=== modified file 'sql/rpl_info_dummy.h'
--- a/sql/rpl_info_dummy.h 2010-11-30 02:08:01 +0000
+++ b/sql/rpl_info_dummy.h 2010-12-01 19:15:08 +0000
@@ -47,7 +47,7 @@ private:
bool do_set_info(const int pos, const int value);
bool do_set_info(const int pos, const ulong value);
bool do_set_info(const int pos, const float value);
- bool do_set_info(const int pos, const Server_ids *value);
+ bool do_set_info(const int pos, const Dynamic_ids *value);
bool do_get_info(const int pos, char *value, const size_t size,
const char *default_value);
bool do_get_info(const int pos, int *value,
@@ -56,8 +56,8 @@ private:
const ulong default_value);
bool do_get_info(const int pos, float *value,
const float default_value);
- bool do_get_info(const int pos, Server_ids *value,
- const Server_ids *default_value);
+ bool do_get_info(const int pos, Dynamic_ids *value,
+ const Dynamic_ids *default_value);
char* do_get_description_info();
bool do_is_transactional();
=== modified file 'sql/rpl_info_file.cc'
--- a/sql/rpl_info_file.cc 2010-11-11 11:53:01 +0000
+++ b/sql/rpl_info_file.cc 2010-12-01 19:15:08 +0000
@@ -217,7 +217,7 @@ bool Rpl_info_file::do_set_info(const in
FALSE : TRUE);
}
-bool Rpl_info_file::do_set_info(const int pos, const Server_ids *value)
+bool Rpl_info_file::do_set_info(const int pos, const Dynamic_ids *value)
{
bool error= TRUE;
String buffer;
@@ -225,7 +225,7 @@ bool Rpl_info_file::do_set_info(const in
/*
This produces a line listing the total number and all the server_ids.
*/
- if (const_cast<Server_ids *>(value)->pack_server_ids(&buffer))
+ if (const_cast<Dynamic_ids *>(value)->pack_dynamic_ids(&buffer))
goto err;
error= (my_b_printf(&info_file, "%s\n", buffer.c_ptr_safe()) >
@@ -262,8 +262,8 @@ bool Rpl_info_file::do_get_info(const in
default_value));
}
-bool Rpl_info_file::do_get_info(const int pos, Server_ids *value,
- const Server_ids *default_value __attribute__((unused)))
+bool Rpl_info_file::do_get_info(const int pos, Dynamic_ids *value,
+ const Dynamic_ids *default_value __attribute__((unused)))
{
/*
Static buffer to use most of the times. However, if it is not big
@@ -277,7 +277,7 @@ bool Rpl_info_file::do_get_info(const in
&buffer_act,
&info_file);
if (!error)
- value->unpack_server_ids(buffer_act);
+ value->unpack_dynamic_ids(buffer_act);
if (buffer != buffer_act)
{
=== modified file 'sql/rpl_info_file.h'
--- a/sql/rpl_info_file.h 2010-11-11 11:53:01 +0000
+++ b/sql/rpl_info_file.h 2010-12-01 19:15:08 +0000
@@ -54,7 +54,7 @@ private:
bool do_set_info(const int pos, const int value);
bool do_set_info(const int pos, const ulong value);
bool do_set_info(const int pos, const float value);
- bool do_set_info(const int pos, const Server_ids *value);
+ bool do_set_info(const int pos, const Dynamic_ids *value);
bool do_get_info(const int pos, char *value, const size_t size,
const char *default_value);
bool do_get_info(const int pos, int *value,
@@ -63,8 +63,8 @@ private:
const ulong default_value);
bool do_get_info(const int pos, float *value,
const float default_value);
- bool do_get_info(const int pos, Server_ids *value,
- const Server_ids *default_value);
+ bool do_get_info(const int pos, Dynamic_ids *value,
+ const Dynamic_ids *default_value);
char* do_get_description_info();
bool do_is_transactional();
=== modified file 'sql/rpl_info_handler.h'
--- a/sql/rpl_info_handler.h 2010-11-11 11:53:01 +0000
+++ b/sql/rpl_info_handler.h 2010-12-01 19:15:08 +0000
@@ -17,7 +17,7 @@
#define RPL_INFO_HANDLER_H
#include <my_global.h>
-#include <server_ids.h>
+#include <dynamic_ids.h>
#include "rpl_info_values.h"
class Rpl_info_handler
@@ -224,8 +224,8 @@ public:
@retval FALSE No error
@retval TRUE Failure
*/
- bool get_info(Server_ids *value,
- const Server_ids *default_value)
+ bool get_info(Dynamic_ids *value,
+ const Dynamic_ids *default_value)
{
if (cursor >= ninfo || prv_error)
return TRUE;
@@ -317,7 +317,7 @@ private:
virtual bool do_set_info(const int pos, const ulong value)= 0;
virtual bool do_set_info(const int pos, const int value)= 0;
virtual bool do_set_info(const int pos, const float value)= 0;
- virtual bool do_set_info(const int pos, const Server_ids *value)= 0;
+ virtual bool do_set_info(const int pos, const Dynamic_ids *value)= 0;
virtual bool do_get_info(const int pos, char *value, const size_t size,
const char *default_value)= 0;
virtual bool do_get_info(const int pos, ulong *value,
@@ -326,8 +326,8 @@ private:
const int default_value)= 0;
virtual bool do_get_info(const int pos, float *value,
const float default_value)= 0;
- virtual bool do_get_info(const int pos, Server_ids *value,
- const Server_ids *default_value)= 0;
+ virtual bool do_get_info(const int pos, Dynamic_ids *value,
+ const Dynamic_ids *default_value)= 0;
virtual char* do_get_description_info()= 0;
virtual bool do_is_transactional()= 0;
=== modified file 'sql/rpl_info_table.cc'
--- a/sql/rpl_info_table.cc 2010-11-11 11:53:01 +0000
+++ b/sql/rpl_info_table.cc 2010-12-01 19:15:08 +0000
@@ -354,9 +354,9 @@ bool Rpl_info_table::do_set_info(const i
&my_charset_bin));
}
-bool Rpl_info_table::do_set_info(const int pos, const Server_ids *value)
+bool Rpl_info_table::do_set_info(const int pos, const Dynamic_ids *value)
{
- if (const_cast<Server_ids *>(value)->pack_server_ids(&field_values->value[pos]))
+ if (const_cast<Dynamic_ids *>(value)->pack_dynamic_ids(&field_values->value[pos]))
return TRUE;
return FALSE;
@@ -428,10 +428,10 @@ bool Rpl_info_table::do_get_info(const i
return TRUE;
}
-bool Rpl_info_table::do_get_info(const int pos, Server_ids *value,
- const Server_ids *default_value __attribute__((unused)))
+bool Rpl_info_table::do_get_info(const int pos, Dynamic_ids *value,
+ const Dynamic_ids *default_value __attribute__((unused)))
{
- if (value->unpack_server_ids(field_values->value[pos].c_ptr_safe()))
+ if (value->unpack_dynamic_ids(field_values->value[pos].c_ptr_safe()))
return TRUE;
return FALSE;
=== modified file 'sql/rpl_info_table.h'
--- a/sql/rpl_info_table.h 2010-11-11 11:53:01 +0000
+++ b/sql/rpl_info_table.h 2010-12-01 19:15:08 +0000
@@ -73,7 +73,7 @@ private:
bool do_set_info(const int pos, const int value);
bool do_set_info(const int pos, const ulong value);
bool do_set_info(const int pos, const float value);
- bool do_set_info(const int pos, const Server_ids *value);
+ bool do_set_info(const int pos, const Dynamic_ids *value);
bool do_get_info(const int pos, char *value, const size_t size,
const char *default_value);
bool do_get_info(const int pos, int *value,
@@ -82,8 +82,8 @@ private:
const ulong default_value);
bool do_get_info(const int pos, float *value,
const float default_value);
- bool do_get_info(const int pos, Server_ids *value,
- const Server_ids *default_value);
+ bool do_get_info(const int pos, Dynamic_ids *value,
+ const Dynamic_ids *default_value);
char* do_get_description_info();
bool do_is_transactional();
=== modified file 'sql/rpl_mi.cc'
--- a/sql/rpl_mi.cc 2010-11-30 02:08:01 +0000
+++ b/sql/rpl_mi.cc 2010-12-01 19:15:08 +0000
@@ -122,13 +122,13 @@ int change_master_server_id_cmp(ulong *i
*/
bool Master_info::shall_ignore_server_id(ulong s_id)
{
- if (likely(ignore_server_ids->server_ids.elements == 1))
+ if (likely(ignore_server_ids->dynamic_ids.elements == 1))
return (* (ulong*)
- dynamic_array_ptr(&(ignore_server_ids->server_ids), 0)) == s_id;
+ dynamic_array_ptr(&(ignore_server_ids->dynamic_ids), 0)) == s_id;
else
return bsearch((const ulong *) &s_id,
- ignore_server_ids->server_ids.buffer,
- ignore_server_ids->server_ids.elements, sizeof(ulong),
+ ignore_server_ids->dynamic_ids.buffer,
+ ignore_server_ids->dynamic_ids.elements, sizeof(ulong),
(int (*) (const void*, const void*)) change_master_server_id_cmp)
!= NULL;
}
@@ -384,7 +384,7 @@ bool Master_info::read_info(Rpl_info_han
*/
if (lines >= LINE_FOR_REPLICATE_IGNORE_SERVER_IDS)
{
- if (from->get_info(ignore_server_ids, (Server_ids *) NULL))
+ if (from->get_info(ignore_server_ids, (Dynamic_ids *) NULL))
DBUG_RETURN(TRUE);
}
=== modified file 'sql/rpl_mi.h'
--- a/sql/rpl_mi.h 2010-11-30 02:08:01 +0000
+++ b/sql/rpl_mi.h 2010-12-01 19:15:08 +0000
@@ -92,7 +92,7 @@ class Master_info : public Rpl_info_coor
long clock_diff_with_master;
float heartbeat_period; // interface with CHANGE MASTER or master.info
ulonglong received_heartbeats; // counter of received heartbeat events
- Server_ids *ignore_server_ids;
+ Dynamic_ids *ignore_server_ids;
ulong master_id;
ulong retry_count;
char master_uuid[UUID_LENGTH+1];
=== modified file 'sql/rpl_rli.cc'
--- a/sql/rpl_rli.cc 2010-12-02 17:46:46 +0000
+++ b/sql/rpl_rli.cc 2010-12-02 18:13:12 +0000
@@ -56,7 +56,7 @@ PSI_cond_key *key_cond_slave_parallel_wo
PSI_cond_key key_cond_slave_parallel_pend_jobs;
Relay_log_info::Relay_log_info(bool is_slave_recovery)
- :Rpl_info_coordinator("SQL"),
+ :Rpl_info_coordinator("SQL"), checkpoint_thd(0), checkpoint_running(0),
replicate_same_server_id(::replicate_same_server_id),
cur_log_fd(-1), relay_log(&sync_relaylog_period),
is_relay_log_recovery(is_slave_recovery),
@@ -82,6 +82,8 @@ Relay_log_info::Relay_log_info(bool is_s
mysql_mutex_init(key_relay_log_info_log_space_lock,
&log_space_lock, MY_MUTEX_INIT_FAST);
mysql_cond_init(key_relay_log_info_log_space_cond, &log_space_cond, NULL);
+ mysql_cond_init(key_checkpoint_start_cond, &checkpoint_start_cond, NULL);
+ mysql_cond_init(key_checkpoint_stop_cond, &checkpoint_stop_cond, NULL);
relay_log.init_pthread_objects();
#if 0
@@ -190,6 +192,8 @@ Relay_log_info::~Relay_log_info()
{
DBUG_ENTER("Relay_log_info::~Relay_log_info");
+ mysql_cond_destroy(&checkpoint_start_cond);
+ mysql_cond_destroy(&checkpoint_stop_cond);
mysql_mutex_destroy(&log_space_lock);
mysql_cond_destroy(&log_space_cond);
relay_log.cleanup();
=== modified file 'sql/rpl_rli.h'
--- a/sql/rpl_rli.h 2010-12-02 17:46:46 +0000
+++ b/sql/rpl_rli.h 2010-12-02 18:13:12 +0000
@@ -105,6 +105,12 @@ tables along with the --relay-log-recove
class Relay_log_info : public Rpl_info_coordinator
{
public:
+ THD* checkpoint_thd;
+ bool checkpoint_running;
+
+ PSI_cond_key key_checkpoint_start_cond, key_checkpoint_stop_cond;
+ mysql_cond_t checkpoint_start_cond, checkpoint_stop_cond;
+
/**
Flags for the state of the replication.
*/
=== modified file 'sql/rpl_rli_pdb.cc'
--- a/sql/rpl_rli_pdb.cc 2010-12-02 17:46:46 +0000
+++ b/sql/rpl_rli_pdb.cc 2010-12-02 18:13:12 +0000
@@ -19,13 +19,14 @@ const char *info_slave_worker_fields []=
};
Slave_worker::Slave_worker(const char* type): Rpl_info_worker(type)
-{
-
+{
+ curr_group_exec_parts= new Database_ids();
}
Slave_worker::~Slave_worker()
{
-
+ if (curr_group_exec_parts)
+ delete curr_group_exec_parts;
}
int Slave_worker::init_info()
@@ -107,8 +108,7 @@ bool Slave_worker::read_info(Rpl_info_ha
if (from->prepare_info_for_read())
DBUG_RETURN(TRUE);
- if (from->get_info(partitions,
- sizeof(partitions), "") ||
+ if (from->get_info(curr_group_exec_parts, (Dynamic_ids *) NULL) ||
from->get_info(group_relay_log_name,
sizeof(group_relay_log_name), "") ||
from->get_info((ulong *) &temp_group_relay_log_pos,
@@ -138,7 +138,7 @@ bool Slave_worker::write_info(Rpl_info_h
*/
if (to->prepare_info_for_write() ||
- to->set_info(partitions) ||
+ to->set_info(curr_group_exec_parts) ||
to->set_info(group_relay_log_name) ||
to->set_info((ulong)group_relay_log_pos) ||
to->set_info(group_master_log_name) ||
@@ -153,18 +153,15 @@ size_t Slave_worker::get_number_worker_f
return sizeof(info_slave_worker_fields)/sizeof(info_slave_worker_fields[0]);
}
-bool Slave_worker::checkpoint(Relay_log_info *info)
+bool Slave_worker::commit_positions(Log_event *ev)
{
- DBUG_ENTER("Relay_coordinator::checkpoint_worker");
+ DBUG_ENTER("Slave_worker::checkpoint_positions");
bool error= FALSE;
- group_relay_log_pos= info->get_group_relay_log_pos();
- strmake(group_relay_log_name, info->get_group_relay_log_name(),
- sizeof(group_relay_log_name)-1);
-
- group_master_log_pos= info->get_group_master_log_pos();
- strmake(group_master_log_name, info->get_group_master_log_name(),
+ group_relay_log_pos= ev->future_event_relay_log_pos;
+ group_master_log_pos= ev->mts_group_cnt;
+ strmake(group_master_log_name, c_rli->get_group_master_log_name(),
sizeof(group_master_log_name)-1);
error= flush_info(TRUE);
@@ -172,7 +169,6 @@ bool Slave_worker::checkpoint(Relay_log_
DBUG_RETURN(error);
}
-
static HASH mapping_db_to_worker;
static bool inited_hash_workers= FALSE;
@@ -200,7 +196,7 @@ static void free_entry(db_worker *entry)
{
DBUG_ENTER("free_entry");
- DBUG_PRINT("info", ("free_entry %s, %d", entry->db, strlen(entry->db)));
+ DBUG_PRINT("info", ("free_entry %s, %d", entry->db, (int) strlen(entry->db)));
my_free((void *) entry->db);
my_free(entry);
@@ -480,7 +476,7 @@ Slave_worker *get_least_occupied_worker(
void Slave_worker::slave_worker_ends_group(ulong gaq_idx, int error)
{
- uint i;
+ int i;
if (!error)
{
@@ -505,15 +501,15 @@ void Slave_worker::slave_worker_ends_gro
last_group_done_index = gaq_idx;
}
- for (i= curr_group_exec_parts.elements; i > 0; i--)
+ for (i= curr_group_exec_parts->dynamic_ids.elements; i > 0; i--)
{
db_worker *entry= NULL;
my_hash_value_type hash_value;
char key[NAME_LEN + 2];
- get_dynamic(&curr_group_exec_parts, (uchar*) key, i - 1);
+ get_dynamic(&(curr_group_exec_parts->dynamic_ids), (uchar*) key, i - 1);
hash_value= my_calc_hash(&mapping_db_to_worker, (uchar*) key + 1, key[0]);
-
+
mysql_mutex_lock(&slave_worker_hash_lock);
entry= (db_worker *)
@@ -521,7 +517,6 @@ void Slave_worker::slave_worker_ends_gro
(uchar*) key + 1, key[0]);
DBUG_ASSERT(entry && entry->usage != 0);
-
DBUG_ASSERT(strlen(key + 1) == (uchar) key[0]);
entry->usage--;
@@ -544,8 +539,8 @@ void Slave_worker::slave_worker_ends_gro
*/
mysql_mutex_unlock(&slave_worker_hash_lock);
-
- delete_dynamic_element(&curr_group_exec_parts, i - 1);
+
+ delete_dynamic_element(&(curr_group_exec_parts->dynamic_ids), i - 1);
}
curr_group_seen_begin= FALSE;
}
=== modified file 'sql/rpl_rli_pdb.h'
--- a/sql/rpl_rli_pdb.h 2010-11-30 14:39:40 +0000
+++ b/sql/rpl_rli_pdb.h 2010-12-01 19:15:08 +0000
@@ -173,7 +173,7 @@ public:
// fixme: experimental
Relay_log_info *w_rli;
- DYNAMIC_ARRAY curr_group_exec_parts; // CGEP
+ Dynamic_ids *curr_group_exec_parts; // CGEP
bool curr_group_seen_begin; // is set to TRUE with B-event at Worker exec
// @c last_group_done_index is for recovery, although can be viewed
// as statistics as well.
@@ -218,7 +218,7 @@ public:
void slave_worker_ends_group(ulong, int); // CGEP walk through to upd APH
- bool checkpoint(Relay_log_info *rli);
+ bool commit_positions(Log_event *evt);
private:
bool read_info(Rpl_info_handler *from);
=== modified file 'sql/rpl_slave.cc'
--- a/sql/rpl_slave.cc 2010-12-02 17:46:46 +0000
+++ b/sql/rpl_slave.cc 2010-12-02 18:13:12 +0000
@@ -51,7 +51,7 @@
#include "log_event.h" // Rotate_log_event,
// Create_file_log_event,
// Format_description_log_event
-#include "server_ids.h"
+#include "dynamic_ids.h"
#include "rpl_rli_pdb.h"
#ifdef HAVE_REPLICATION
@@ -140,7 +140,7 @@ failed read"
};
-typedef enum { SLAVE_THD_IO, SLAVE_THD_SQL, SLAVE_THD_WORKER} SLAVE_THD_TYPE;
+typedef enum { SLAVE_THD_IO, SLAVE_THD_SQL, SLAVE_THD_WORKER, SLAVE_THD_CHECKPOINT } SLAVE_THD_TYPE;
static int process_io_rotate(Master_info* mi, Rotate_log_event* rev);
static int process_io_create_file(Master_info* mi, Create_file_log_event* cev);
@@ -168,6 +168,7 @@ static int terminate_slave_thread(THD *t
bool skip_lock);
static bool check_io_slave_killed(THD *thd, Master_info *mi, const char *info);
int slave_worker_exec_job(Slave_worker * w, Relay_log_info *rli);
+bool checkpoint_routine(Relay_log_info *rli);
/*
Find out which replications threads are running
@@ -1504,7 +1505,7 @@ maybe it is a *VERY OLD MASTER*.");
mysql_free_result(master_res);
master_res= NULL;
}
- if (mi->master_id == 0 && mi->ignore_server_ids->server_ids.elements > 0)
+ if (mi->master_id == 0 && mi->ignore_server_ids->dynamic_ids.elements > 0)
{
errmsg= "Slave configured with server id filtering could not detect the master server id.";
err_code= ER_SLAVE_FATAL_ERROR;
@@ -2076,11 +2077,11 @@ bool show_master_info(THD* thd, Master_i
char buff[FN_REFLEN];
ulong i, cur_len;
for (i= 0, buff[0]= 0, cur_len= 0;
- i < mi->ignore_server_ids->server_ids.elements; i++)
+ i < mi->ignore_server_ids->dynamic_ids.elements; i++)
{
ulong s_id, slen;
char sbuff[FN_REFLEN];
- get_dynamic(&(mi->ignore_server_ids->server_ids), (uchar*) &s_id, i);
+ get_dynamic(&(mi->ignore_server_ids->dynamic_ids), (uchar*) &s_id, i);
slen= sprintf(sbuff, (i == 0 ? "%lu" : ", %lu"), s_id);
if (cur_len + slen + 4 > FN_REFLEN)
{
@@ -2668,41 +2669,15 @@ int apply_event_and_update_pos(Log_event
See sql/rpl_rli.h for further details.
*/
int error= 0;
- if (!(ev->get_type_code() == XID_EVENT && rli->is_transactional()) ||
+ if ((rli->curr_group_is_parallel == FALSE &&
+ !(ev->get_type_code() == XID_EVENT && rli->is_transactional())) ||
skip_event)
-// TODO: Alfranio to fix/restore the condition to update the main RLI
-// It is kept the prototype time way in order to process rpl_parallel.
-// This is parallel exec and event required the sequential mode
-// that also includes all Workers finished their assignements.
-// It was served so inside apply_event() above.
-// The main RLI table is safe to update now.
-
-// if (!rli->is_parallel_exec() || ev->only_sequential_exec())
- error= ev->update_pos(rli);
-#if 1
+ error= ev->update_pos(rli);
- // experimental checkpoint per each scheduling attempt
- // logics of next_event()
+ /* Temporarily placed here /Alfranio */
if (rli->is_parallel_exec())
- {
- uint i;
- rli->gaq->move_queue_head(&rli->workers);
-
- /* TODO:
- the least occupied sorting out needs moving to the actual
- checkpoint location - next_event()
- */
- for (i= 0; i < rli->workers.elements; i++)
- {
- Slave_worker *w_i;
- get_dynamic(&rli->workers, (uchar *) &w_i, i);
- set_dynamic(&rli->least_occupied_workers, (uchar*) &w_i->jobs.len, w_i->id);
- };
- sort_dynamic(&rli->least_occupied_workers, (qsort_cmp) ulong_cmp);
- }
-
-#endif
+ checkpoint_routine(rli);
#ifndef DBUG_OFF
DBUG_PRINT("info", ("update_pos error = %d", error));
@@ -3623,6 +3598,151 @@ err:
DBUG_RETURN(0);
}
+bool checkpoint_routine(Relay_log_info *rli)
+{
+ bool error= FALSE;
+
+ DBUG_ENTER("checkpoint_routine");
+
+ // ANDREI LOCKS?
+
+ uint i;
+ rli->gaq->move_queue_head(&rli->workers);
+
+ /* TODO:
+ the least occupied sorting out needs moving to the actual
+ checkpoint location - next_event()
+ */
+ for (i= 0; i < rli->workers.elements; i++)
+ {
+ Slave_worker *w_i;
+ get_dynamic(&rli->workers, (uchar *) &w_i, i);
+ set_dynamic(&rli->least_occupied_workers, (uchar*) &w_i->jobs.len, w_i->id);
+ };
+ sort_dynamic(&rli->least_occupied_workers, (qsort_cmp) ulong_cmp);
+
+ error= rli->flush_info(TRUE);
+
+ // ANDREI NOTIFICATIONS?
+ DBUG_RETURN(error);
+}
+
+pthread_handler_t handle_slave_checkpoint(void *arg)
+{
+ THD *thd; /* needs to be first for thread_stack */
+ int error= 0;
+ Relay_log_info *rli= (Relay_log_info *) arg;
+
+ my_thread_init();
+ DBUG_ENTER("handle_slave_worker");
+
+ rli->checkpoint_running= 1;
+
+ thd= new THD;
+ if (!thd)
+ {
+ sql_print_error("Failed during slave checkpoint initialization");
+
+ mysql_mutex_lock(&rli->data_lock);
+ mysql_cond_broadcast(&rli->checkpoint_start_cond);
+ mysql_mutex_unlock(&rli->data_lock);
+
+ goto err;
+ }
+
+ thd->thread_stack = (char*)&thd;
+
+ pthread_detach_this_thread();
+
+ mysql_mutex_lock(&rli->data_lock);
+ rli->checkpoint_thd= thd;
+ mysql_cond_broadcast(&rli->checkpoint_start_cond);
+ mysql_mutex_unlock(&rli->data_lock);
+
+ if (init_slave_thread(thd, SLAVE_THD_CHECKPOINT))
+ {
+ // todo make SQL thread killed
+ sql_print_error("Failed during slave worker initialization");
+ goto err;
+ }
+ thd->init_for_queries();
+ mysql_mutex_lock(&LOCK_thread_count);
+ threads.append(thd);
+ mysql_mutex_unlock(&LOCK_thread_count);
+
+ DBUG_ASSERT(thd->is_slave_error == 0);
+
+ while (!thd->killed && !error)
+ {
+ checkpoint_routine(rli);
+ sleep(1);
+ }
+
+ mysql_mutex_lock(&rli->data_lock);
+ rli->checkpoint_running= 0;
+ rli->checkpoint_thd= NULL;
+ mysql_cond_broadcast(&rli->checkpoint_stop_cond);
+ mysql_mutex_unlock(&rli->data_lock);
+
+ if (error)
+ {
+ mysql_mutex_lock(&rli->info_thd->LOCK_thd_data);
+ rli->info_thd->awake(THD::KILL_QUERY); // notify Crdn
+ mysql_mutex_unlock(&rli->info_thd->LOCK_thd_data);
+ }
+
+err:
+
+ if (thd)
+ {
+ mysql_mutex_lock(&LOCK_thread_count);
+ THD_CHECK_SENTRY(thd);
+ delete thd;
+ mysql_mutex_unlock(&LOCK_thread_count);
+ }
+
+ my_thread_end();
+ pthread_exit(0);
+ DBUG_RETURN(0);
+}
+
+int slave_start_checkpoint(Relay_log_info *rli)
+{
+ pthread_t th;
+ int error= 0;
+
+ if (pthread_create(&th, &connection_attrib, handle_slave_checkpoint,
+ (void*) rli))
+ {
+ sql_print_error("Failed during slave checkpoint thread create");
+ error= 1;
+ }
+
+ while (error == 0 && rli->checkpoint_running == 0)
+ mysql_cond_wait(&rli->checkpoint_start_cond, &rli->data_lock);
+
+ mysql_mutex_unlock(&rli->data_lock);
+
+ return (error);
+}
+
+int slave_stop_checkpoint(Relay_log_info *rli)
+{
+ mysql_mutex_lock(&rli->info_thd->LOCK_thd_data);
+ mysql_mutex_lock(&rli->data_lock);
+ if (rli->checkpoint_thd)
+ rli->checkpoint_thd->awake(THD::KILL_QUERY);
+ mysql_mutex_unlock(&rli->data_lock);
+ mysql_mutex_unlock(&rli->info_thd->LOCK_thd_data);
+
+ mysql_mutex_lock(&rli->data_lock);
+ while (rli->checkpoint_running == 1)
+ mysql_cond_wait(&rli->checkpoint_stop_cond, &rli->data_lock);
+ mysql_mutex_unlock(&rli->data_lock);
+
+ return 0;
+}
+
/**
A single Worker thread is forked out.
@@ -3679,11 +3799,7 @@ int slave_start_single_worker(Relay_log_
MY_MUTEX_INIT_FAST);
mysql_cond_init(key_cond_slave_parallel_worker[i], &w->jobs_cond, NULL);
- // CGEP dynarray holds id:s of partitions of the Current being executed Group
- my_init_dynamic_array(&w->curr_group_exec_parts, NAME_LEN,
- SLAVE_INIT_DBS_IN_GROUP, 1);
w->curr_group_seen_begin= FALSE;
-
if (pthread_create(&th, &connection_attrib, handle_slave_worker,
(void*) w))
{
@@ -3757,6 +3873,12 @@ int slave_start_workers(Relay_log_info *
goto err;
}
+ /* if (rli->is_parallel_exec())
+ {
+ slave_start_checkpoint(rli);
+ }
+ */
+
err:
return error;
}
@@ -3816,7 +3938,6 @@ void slave_stop_workers(Relay_log_info *
DBUG_ASSERT(w->jobs.Q.elements == w->jobs.s);
delete_dynamic(&w->jobs.Q);
- delete_dynamic(&w->curr_group_exec_parts); // GCEP
delete_dynamic_element(&rli->workers, i);
delete w->w_rli;
@@ -4742,7 +4863,7 @@ static int queue_event(Master_info* mi,c
If the master is on the ignore list, execution of
format description log events and rotate events is necessary.
*/
- (mi->ignore_server_ids->server_ids.elements > 0 &&
+ (mi->ignore_server_ids->dynamic_ids.elements > 0 &&
mi->shall_ignore_server_id(s_id) &&
/* everything is filtered out from non-master */
(s_id != mi->master_id ||
@@ -5159,7 +5280,10 @@ static Log_event* next_event(Relay_log_i
llstr(my_b_tell(cur_log),llbuf1),
llstr(rli->get_event_relay_log_pos(),llbuf2)));
DBUG_ASSERT(my_b_tell(cur_log) >= BIN_LOG_HEADER_SIZE);
- DBUG_ASSERT(my_b_tell(cur_log) == rli->get_event_relay_log_pos());
+
+ // TODO: sort out with Alfranio
+
+ DBUG_ASSERT(my_b_tell(cur_log) == rli->get_event_relay_log_pos() || rli->is_parallel_exec());
DBUG_PRINT("info", ("next_event group master %s %lu group relay %s %lu event %s %lu\n",
rli->get_group_master_log_name(),
@@ -6027,7 +6151,7 @@ bool change_master(THD* thd, Master_info
is mentioning IGNORE_SERVER_IDS= (...)
*/
if (lex_mi->repl_ignore_server_ids_opt == LEX_MASTER_INFO::LEX_MI_ENABLE)
- reset_dynamic(&mi->ignore_server_ids->server_ids);
+ reset_dynamic(&(mi->ignore_server_ids->dynamic_ids));
for (uint i= 0; i < lex_mi->repl_ignore_server_ids.elements; i++)
{
ulong s_id;
@@ -6041,14 +6165,14 @@ bool change_master(THD* thd, Master_info
else
{
if (bsearch((const ulong *) &s_id,
- mi->ignore_server_ids->server_ids.buffer,
- mi->ignore_server_ids->server_ids.elements, sizeof(ulong),
+ mi->ignore_server_ids->dynamic_ids.buffer,
+ mi->ignore_server_ids->dynamic_ids.elements, sizeof(ulong),
(int (*) (const void*, const void*))
change_master_server_id_cmp) == NULL)
- insert_dynamic(&mi->ignore_server_ids->server_ids, (uchar*) &s_id);
+ insert_dynamic(&(mi->ignore_server_ids->dynamic_ids), (uchar*) &s_id);
}
}
- sort_dynamic(&mi->ignore_server_ids->server_ids, (qsort_cmp) change_master_server_id_cmp);
+ sort_dynamic(&(mi->ignore_server_ids->dynamic_ids), (qsort_cmp) change_master_server_id_cmp);
if (lex_mi->ssl != LEX_MASTER_INFO::LEX_MI_UNCHANGED)
mi->ssl= (lex_mi->ssl == LEX_MASTER_INFO::LEX_MI_ENABLE);
@@ -6225,61 +6349,3 @@ err:
@} (end of group Replication)
*/
#endif /* HAVE_REPLICATION */
-
-Server_ids::Server_ids()
-{
- my_init_dynamic_array(&server_ids, sizeof(::server_id), 16, 16);
-}
-
-Server_ids::~Server_ids()
-{
- delete_dynamic(&server_ids);
-}
-
-bool Server_ids::unpack_server_ids(char *param_server_ids)
-{
- char *token= NULL, *last= NULL;
- uint num_items= 0;
-
- DBUG_ENTER("Server_ids::unpack_server_ids");
-
- token= strtok_r((char *)const_cast<const char*>(param_server_ids),
- " ", &last);
-
- if (token == NULL)
- DBUG_RETURN(TRUE);
-
- num_items= atoi(token);
- for (uint i=0; i < num_items; i++)
- {
- token= strtok_r(NULL, " ", &last);
- if (token == NULL)
- DBUG_RETURN(TRUE);
- else
- {
- ulong val= atol(token);
- insert_dynamic(&server_ids, (uchar *) &val);
- }
- }
- DBUG_RETURN(FALSE);
-}
-
-bool Server_ids::pack_server_ids(String *buffer)
-{
- DBUG_ENTER("Server_ids::pack_server_ids");
-
- if (buffer->set_int(server_ids.elements, FALSE, &my_charset_bin))
- DBUG_RETURN(TRUE);
-
- for (ulong i= 0;
- i < server_ids.elements; i++)
- {
- ulong s_id;
- get_dynamic(&server_ids, (uchar*) &s_id, i);
- if (buffer->append(" ") ||
- buffer->append_ulonglong(s_id))
- DBUG_RETURN(TRUE);
- }
-
- DBUG_RETURN(FALSE);
-}
Attachment: [text/bzr-bundle] bzr/andrei.elkin@oracle.com-20101202181312-vpfjgnnpbu4fher6.bundle
| Thread |
|---|
| • bzr commit into mysql-next-mr-wl5569 branch (andrei.elkin:3225) WL#5569 | Andrei Elkin | 2 Dec |