Below is the list of changes that have just been committed into a
4.0 repository of sasha. When sasha does a push, they will be propagated to
the main repository and within 24 hours after the push to the public repository.
For information on how to access the public repository
see http://www.mysql.com/doc/I/n/Installing_source_tree.html
ChangeSet@stripped, 2001-10-20 07:07:22-06:00, sasha@stripped
temporary commit needed to fix the mess I've made when I did bk rm on an edited file
will not be pushed
BitKeeper/etc/ignore
1.103 01/10/20 07:07:21 sasha@stripped +0 -1
remove my_global.h
libmysql/libmysql.c
1.79 01/10/20 07:07:21 sasha@stripped +13 -5
fixes for new format of SHOW SLAVE HOSTS
mysql-test/r/rpl000002.result
1.13 01/10/20 07:07:21 sasha@stripped +2 -2
updated result
mysys/mf_iocache.c
1.10 01/10/20 07:07:21 sasha@stripped +119 -46
merge with mf_iocache.cc
sql/Makefile.am
1.63 01/10/20 07:07:21 sasha@stripped +1 -1
remove mf_iocache.cc
sql/repl_failsafe.cc
1.4 01/10/20 07:07:21 sasha@stripped +176 -0
some new code, not finished
sql/repl_failsafe.h
1.4 01/10/20 07:07:21 sasha@stripped +6 -0
a couple of new routines
sql/slave.cc
1.134 01/10/20 07:07:21 sasha@stripped +36 -30
fail-safe replication updates, clean-up
sql/sql_repl.cc
1.61 01/10/20 07:07:21 sasha@stripped +10 -0
updated SHOW SLAVE HOSTS format
sql/sql_repl.h
1.14 01/10/20 07:07:21 sasha@stripped +1 -0
SHOW SLAVE HOSTS updates
BitKeeper/deleted/.del-mf_iocache.cc~51294525f4fced3d
1.19 01/10/19 22:07:52 sasha@stripped +0 -0
Delete: sql/mf_iocache.cc
# This is a BitKeeper patch. What follows are the unified diffs for the
# set of deltas contained in the patch. The rest of the patch, the part
# that BitKeeper cares about, is below these diffs.
# User: sasha
# Host: mysql.sashanet.com
# Root: /home/sasha/src/bk/mysql-4.0
--- 1.78/libmysql/libmysql.c Wed Oct 17 10:39:38 2001
+++ 1.79/libmysql/libmysql.c Sat Oct 20 07:07:21 2001
@@ -1147,6 +1147,8 @@
MYSQL_ROW row;
int error = 1;
int has_auth_info;
+ int port_ind;
+
if (!mysql->net.vio && !mysql_real_connect(mysql,0,0,0,0,0,0,0))
{
expand_error(mysql, CR_PROBE_MASTER_CONNECT);
@@ -1162,8 +1164,14 @@
switch (mysql_num_fields(res))
{
- case 3: has_auth_info = 0; break;
- case 5: has_auth_info = 1; break;
+ case 5:
+ has_auth_info = 0;
+ port_ind=2;
+ break;
+ case 7:
+ has_auth_info = 1;
+ port_ind=4;
+ break;
default:
goto err;
}
@@ -1175,8 +1183,8 @@
if (has_auth_info)
{
- tmp_user = row[3];
- tmp_pass = row[4];
+ tmp_user = row[2];
+ tmp_pass = row[3];
}
else
{
@@ -1184,7 +1192,7 @@
tmp_pass = mysql->passwd;
}
- if (!(slave = spawn_init(mysql, row[1], atoi(row[2]),
+ if (!(slave = spawn_init(mysql, row[1], atoi(row[port_ind]),
tmp_user, tmp_pass)))
goto err;
--- 1.9/mysys/mf_iocache.c Fri Aug 3 15:57:52 2001
+++ 1.10/mysys/mf_iocache.c Sat Oct 20 07:07:21 2001
@@ -39,7 +39,7 @@
#include <errno.h>
static void my_aiowait(my_aio_result *result);
#endif
-
+#include <assert.h>
/*
** if cachesize == 0 then use default cachesize (from s-file)
@@ -55,7 +55,8 @@
DBUG_ENTER("init_io_cache");
DBUG_PRINT("enter",("type: %d pos: %ld",(int) type, (ulong) seek_offset));
- info->file=file;
+ /* There is no file in net_reading */
+ info->file= file;
info->pre_close = info->pre_read = info->post_read = 0;
info->arg = 0;
if (!cachesize)
@@ -80,33 +81,39 @@
if ((my_off_t) cachesize > end_of_file-seek_offset+IO_SIZE*2-1)
{
cachesize=(uint) (end_of_file-seek_offset)+IO_SIZE*2-1;
- use_async_io=0; /* No nead to use async */
+ use_async_io=0; /* No need to use async */
}
}
}
-
- for (;;)
+ if ((int) type < (int) READ_NET)
{
- cachesize=(uint) ((ulong) (cachesize + min_cache-1) &
- (ulong) ~(min_cache-1));
- if (cachesize < min_cache)
- cachesize = min_cache;
- if ((info->buffer=
- (byte*) my_malloc(cachesize,
- MYF((cache_myflags & ~ MY_WME) |
- (cachesize == min_cache ? MY_WME : 0)))) != 0)
- break; /* Enough memory found */
- if (cachesize == min_cache)
- DBUG_RETURN(2); /* Can't alloc cache */
- cachesize= (uint) ((long) cachesize*3/4); /* Try with less memory */
+ for (;;)
+ {
+ cachesize=(uint) ((ulong) (cachesize + min_cache-1) &
+ (ulong) ~(min_cache-1));
+ if (cachesize < min_cache)
+ cachesize = min_cache;
+ if ((info->buffer=
+ (byte*) my_malloc(cachesize,
+ MYF((cache_myflags & ~ MY_WME) |
+ (cachesize == min_cache ? MY_WME : 0)))) != 0)
+ break; /* Enough memory found */
+ if (cachesize == min_cache)
+ DBUG_RETURN(2); /* Can't alloc cache */
+ cachesize= (uint) ((long) cachesize*3/4); /* Try with less memory */
+ }
}
- info->pos_in_file=seek_offset;
+ else
+ info->buffer=0;
+ DBUG_PRINT("info",("init_io_cache: cachesize = %u",cachesize));
+ info->pos_in_file= seek_offset;
info->read_length=info->buffer_length=cachesize;
- info->seek_not_done= test(file >= 0); /* Seek not done */
+ info->seek_not_done= test(file >= 0 && type != READ_FIFO &&
+ type != READ_NET);
info->myflags=cache_myflags & ~(MY_NABP | MY_FNABP);
info->rc_request_pos=info->rc_pos=info->buffer;
- if (type == READ_CACHE)
+ if (type == READ_CACHE || type == READ_NET || type == READ_FIFO)
{
info->rc_end=info->buffer; /* Nothing in cache */
}
@@ -114,10 +121,12 @@
{
info->rc_end=info->buffer+info->buffer_length- (seek_offset & (IO_SIZE-1));
}
- info->end_of_file=MY_FILEPOS_ERROR; /* May be changed by user */
+ /* end_of_file may be changed by user later */
+ info->end_of_file= ((type == READ_NET || type == READ_FIFO ) ? 0
+ : ~(my_off_t) 0);
info->type=type;
info->error=0;
- info->read_function=_my_b_read;
+ info->read_function=(type == READ_NET) ? _my_b_net_read : _my_b_read; /* net | file */
#ifdef HAVE_AIOWAIT
if (use_async_io && ! my_disable_async_io)
{
@@ -169,6 +178,8 @@
DBUG_ENTER("reinit_io_cache");
info->seek_not_done= test(info->file >= 0); /* Seek not done */
+
+ /* If the whole file is in memory, avoid flushing to disk */
if (! clear_cache &&
seek_offset >= info->pos_in_file &&
seek_offset <= info->pos_in_file +
@@ -179,8 +190,12 @@
info->rc_end=info->rc_pos;
info->end_of_file=my_b_tell(info);
}
- else if (info->type == READ_CACHE && type == WRITE_CACHE)
- info->rc_end=info->buffer+info->buffer_length;
+ else if (type == WRITE_CACHE)
+ {
+ if (info->type == READ_CACHE)
+ info->rc_end=info->buffer+info->buffer_length;
+ info->end_of_file = ~(my_off_t) 0;
+ }
info->rc_pos=info->rc_request_pos+(seek_offset-info->pos_in_file);
#ifdef HAVE_AIOWAIT
my_aiowait(&info->aio_result); /* Wait for outstanding req */
@@ -188,13 +203,22 @@
}
else
{
+ /*
+ If we change from WRITE_CACHE to READ_CACHE, assume that everything
+ after the current positions should be ignored
+ */
if (info->type == WRITE_CACHE && type == READ_CACHE)
info->end_of_file=my_b_tell(info);
- if (flush_io_cache(info))
+ /* No need to flush cache if we want to reuse it */
+ if ((type != WRITE_CACHE || !clear_cache) && flush_io_cache(info))
DBUG_RETURN(1);
- info->pos_in_file=seek_offset;
+ if (info->pos_in_file != seek_offset)
+ {
+ info->pos_in_file=seek_offset;
+ info->seek_not_done=1;
+ }
info->rc_request_pos=info->rc_pos=info->buffer;
- if (type == READ_CACHE)
+ if (type == READ_CACHE || type == READ_NET || type == READ_FIFO)
{
info->rc_end=info->buffer; /* Nothing in cache */
}
@@ -202,19 +226,23 @@
{
info->rc_end=info->buffer+info->buffer_length-
(seek_offset & (IO_SIZE-1));
- info->end_of_file=MY_FILEPOS_ERROR; /* May be changed by user */
+ info->end_of_file= ((type == READ_NET || type == READ_FIFO) ? 0 :
+ ~(my_off_t) 0);
}
}
info->type=type;
info->error=0;
- info->read_function=_my_b_read;
+ info->read_function=(type == READ_NET) ? _my_b_net_read : _my_b_read;
#ifdef HAVE_AIOWAIT
- if (use_async_io && ! my_disable_async_io &&
- ((ulong) info->buffer_length <
- (ulong) (info->end_of_file - seek_offset)))
+ if (type != READ_NET)
{
- info->read_length=info->buffer_length/2;
- info->read_function=_my_b_async_read;
+ if (use_async_io && ! my_disable_async_io &&
+ ((ulong) info->buffer_length <
+ (ulong) (info->end_of_file - seek_offset)))
+ {
+ info->read_length=info->buffer_length/2;
+ info->read_function=_my_b_async_read;
+ }
}
info->inited=0;
#endif
@@ -223,18 +251,27 @@
- /* Read buffered. Returns 1 if can't read requested characters */
- /* Returns 0 if record read */
+ /*
+ Read buffered. Returns 1 if can't read requested characters
+ This function is only called from the my_b_read() macro
+ when there isn't enough characters in the buffer to
+ satisfy the request.
+ Returns 0 we succeeded in reading all data
+ */
int _my_b_read(register IO_CACHE *info, byte *Buffer, uint Count)
{
uint length,diff_length,left_length;
my_off_t max_length, pos_in_file;
-
- memcpy(Buffer,info->rc_pos,
- (size_t) (left_length=(uint) (info->rc_end-info->rc_pos)));
- Buffer+=left_length;
- Count-=left_length;
+
+ if ((left_length=(uint) (info->rc_end-info->rc_pos)))
+ {
+ dbug_assert(Count >= left_length); /* User is not using my_b_read() */
+ memcpy(Buffer,info->rc_pos, (size_t) (left_length));
+ Buffer+=left_length;
+ Count-=left_length;
+ }
+ /* pos_in_file always point on where info->buffer was read */
pos_in_file=info->pos_in_file+(uint) (info->rc_end - info->buffer);
if (info->seek_not_done)
{ /* File touched, do seek */
@@ -264,10 +301,10 @@
left_length+=length;
diff_length=0;
}
- max_length=info->end_of_file - pos_in_file;
- if (max_length > info->read_length-diff_length)
- max_length=info->read_length-diff_length;
-
+ max_length=info->read_length-diff_length;
+ if (info->type != READ_FIFO &&
+ (info->end_of_file - pos_in_file) < max_length)
+ max_length = info->end_of_file - pos_in_file;
if (!max_length)
{
if (Count)
@@ -293,6 +330,37 @@
return 0;
}
+ /*
+ ** Read buffered from the net.
+ ** Returns 1 if can't read requested characters
+ ** Returns 0 if record read
+ */
+
+int _my_b_net_read(register IO_CACHE *info, byte *Buffer,
+ uint Count __attribute__((unused)))
+{
+ int read_length;
+ NET *net= &(current_thd)->net;
+
+ if (info->end_of_file)
+ return 1; /* because my_b_get (no _) takes 1 byte at a time */
+ read_length=my_net_read(net);
+ if (read_length == (int) packet_error)
+ {
+ info->error= -1;
+ return 1;
+ }
+ if (read_length == 0)
+ {
+ /* End of file from client */
+ info->end_of_file = 1; return 1;
+ }
+ /* to set up stuff for my_b_get (no _) */
+ info->rc_end = (info->rc_pos = (byte*) net->read_pos) + read_length;
+ Buffer[0] = info->rc_pos[0]; /* length is always 1 */
+ info->rc_pos++;
+ return 0;
+}
#ifdef HAVE_AIOWAIT
@@ -490,6 +558,11 @@
Buffer+=rest_length;
Count-=rest_length;
info->rc_pos+=rest_length;
+ if (info->pos_in_file+info->buffer_length > info->end_of_file)
+ {
+ my_errno=errno=EFBIG;
+ return info->error = -1;
+ }
if (flush_io_cache(info))
return 1;
if (Count >= IO_SIZE)
@@ -596,7 +669,7 @@
}
}
#ifdef HAVE_AIOWAIT
- else
+ else if (info->type != READ_NET)
{
my_aiowait(&info->aio_result); /* Wait for outstanding req */
info->inited=0;
--- 1.62/sql/Makefile.am Tue Oct 9 20:57:29 2001
+++ 1.63/sql/Makefile.am Sat Oct 20 07:07:21 2001
@@ -77,7 +77,7 @@
ha_berkeley.cc ha_innobase.cc \
ha_isam.cc ha_isammrg.cc \
sql_db.cc sql_table.cc sql_rename.cc sql_crypt.cc \
- sql_load.cc mf_iocache.cc field_conv.cc sql_show.cc \
+ sql_load.cc field_conv.cc sql_show.cc \
sql_udf.cc sql_analyse.cc sql_analyse.h sql_cache.cc \
slave.cc sql_repl.cc sql_union.cc \
mini_client.cc mini_client_errors.c \
--- 1.133/sql/slave.cc Wed Oct 17 14:04:26 2001
+++ 1.134/sql/slave.cc Sat Oct 20 07:07:21 2001
@@ -55,6 +55,8 @@
static int init_slave_thread(THD* thd);
static int safe_connect(THD* thd, MYSQL* mysql, MASTER_INFO* mi);
static int safe_reconnect(THD* thd, MYSQL* mysql, MASTER_INFO* mi);
+static int connect_to_master(THD* thd, MYSQL* mysql, MASTER_INFO* mi,
+ bool reconnect);
static int safe_sleep(THD* thd, int sec);
static int request_table_dump(MYSQL* mysql, const char* db, const char* table);
static int create_table_from_dump(THD* thd, NET* net, const char* db,
@@ -615,6 +617,10 @@
int2store(buf, (uint16)report_port);
packet.append(buf, 2);
+ int4store(buf, rpl_recovery_rank);
+ packet.append(buf, 4);
+ int4store(buf, 0); /* tell the master will fill in master_id */
+ packet.append(buf, 4);
if(mc_simple_command(mysql, COM_REGISTER_SLAVE, (char*)packet.ptr(),
packet.length(), 0))
@@ -1017,7 +1023,6 @@
// needs to call my_thread_init(), otherwise we get a coredump in DBUG_ stuff
my_thread_init();
slave_thd = thd = new THD; // note that contructor of THD uses DBUG_ !
- thd->set_time();
DBUG_ENTER("handle_slave");
pthread_detach_this_thread();
@@ -1067,6 +1072,7 @@
// on with life
thd->proc_info = "Registering slave on master";
register_slave_on_master(mysql);
+ update_slave_list(mysql);
while (!slave_killed(thd))
{
@@ -1244,30 +1250,7 @@
static int safe_connect(THD* thd, MYSQL* mysql, MASTER_INFO* mi)
{
- int slave_was_killed;
-#ifndef DBUG_OFF
- events_till_disconnect = disconnect_slave_event_count;
-#endif
- while(!(slave_was_killed = slave_killed(thd)) &&
- !mc_mysql_connect(mysql, mi->host, mi->user, mi->password, 0,
- mi->port, 0, 0))
- {
- sql_print_error("Slave thread: error connecting to master: %s (%d),\
- retry in %d sec", mc_mysql_error(mysql), errno, mi->connect_retry);
- safe_sleep(thd, mi->connect_retry);
- }
-
- if(!slave_was_killed)
- {
- change_rpl_status(RPL_IDLE_SLAVE,RPL_ACTIVE_SLAVE);
- mysql_log.write(thd, COM_CONNECT_OUT, "%s@%s:%d",
- mi->user, mi->host, mi->port);
-#ifdef SIGNAL_WITH_VIO_CLOSE
- thd->set_active_vio(mysql->net.vio);
-#endif
- }
-
- return slave_was_killed;
+ return connect_to_master(thd, mysql, mi, 0);
}
/*
@@ -1275,7 +1258,8 @@
master_retry_count times
*/
-static int safe_reconnect(THD* thd, MYSQL* mysql, MASTER_INFO* mi)
+static int connect_to_master(THD* thd, MYSQL* mysql, MASTER_INFO* mi,
+ bool reconnect)
{
int slave_was_killed;
int last_errno= -2; // impossible error
@@ -1290,12 +1274,15 @@
#ifndef DBUG_OFF
events_till_disconnect = disconnect_slave_event_count;
#endif
- while (!(slave_was_killed = slave_killed(thd)) && mc_mysql_reconnect(mysql))
+ while (!(slave_was_killed = slave_killed(thd)) &&
+ (reconnect ? mc_mysql_reconnect(mysql) :
+ !mc_mysql_connect(mysql, mi->host, mi->user, mi->password, 0,
+ mi->port, 0, 0)))
{
/* Don't repeat last error */
if (mc_mysql_errno(mysql) != last_errno)
{
- sql_print_error("Slave thread: error re-connecting to master: \
+ sql_print_error("Slave thread: error connecting to master: \
%s, last_errno=%d, retry in %d sec",
mc_mysql_error(mysql), last_errno=mc_mysql_errno(mysql),
mi->connect_retry);
@@ -1309,18 +1296,26 @@
if (master_retry_count && err_count++ == master_retry_count)
{
slave_was_killed=1;
- change_rpl_status(RPL_ACTIVE_SLAVE,RPL_LOST_SOLDIER);
+ if (reconnect)
+ change_rpl_status(RPL_ACTIVE_SLAVE,RPL_LOST_SOLDIER);
break;
}
}
if (!slave_was_killed)
{
- sql_print_error("Slave: reconnected to master '%s@%s:%d',\
+ if (reconnect)
+ sql_print_error("Slave: connected to master '%s@%s:%d',\
replication resumed in log '%s' at position %s", glob_mi.user,
glob_mi.host, glob_mi.port,
RPL_LOG_NAME,
llstr(glob_mi.pos,llbuff));
+ else
+ {
+ change_rpl_status(RPL_IDLE_SLAVE,RPL_ACTIVE_SLAVE);
+ mysql_log.write(thd, COM_CONNECT_OUT, "%s@%s:%d",
+ mi->user, mi->host, mi->port);
+ }
#ifdef SIGNAL_WITH_VIO_CLOSE
thd->set_active_vio(mysql->net.vio);
#endif
@@ -1328,6 +1323,17 @@
return slave_was_killed;
}
+
+/*
+ Try to connect until successful or slave killed or we have retried
+ master_retry_count times
+*/
+
+static int safe_reconnect(THD* thd, MYSQL* mysql, MASTER_INFO* mi)
+{
+ return connect_to_master(thd, mysql, mi, 1);
+}
+
#ifdef __GNUC__
template class I_List_iterator<i_string>;
--- 1.102/BitKeeper/etc/ignore Wed Oct 17 14:08:29 2001
+++ 1.103/BitKeeper/etc/ignore Sat Oct 20 07:07:21 2001
@@ -182,7 +182,6 @@
heap/hp_test1
heap/hp_test2
include/my_config.h
-include/my_global.h
include/mysql_version.h
include/widec.h
innobase/conftest.s1
--- 1.12/mysql-test/r/rpl000002.result Wed Oct 10 18:59:44 2001
+++ 1.13/mysql-test/r/rpl000002.result Sat Oct 20 07:07:21 2001
@@ -15,8 +15,8 @@
2001
2002
show slave hosts;
-Server_id Host Port
-2 127.0.0.1 $SLAVE_MYPORT
+Server_id Host Port Rpl_recovery_rank Master_id
+2 127.0.0.1 9307 2 1
drop table t1;
slave stop;
drop table if exists t2;
--- 1.3/sql/repl_failsafe.cc Thu Oct 11 13:54:04 2001
+++ 1.4/sql/repl_failsafe.cc Sat Oct 20 07:07:21 2001
@@ -18,6 +18,9 @@
#include "mysql_priv.h"
#include "repl_failsafe.h"
+#include "sql_repl.h"
+#include "slave.h"
+#include "mini_client.h"
RPL_STATUS rpl_status=RPL_NULL;
pthread_mutex_t LOCK_rpl_status;
@@ -33,11 +36,184 @@
TYPELIB rpl_status_typelib= {array_elements(rpl_status_type)-1,"",
rpl_status_type};
+static int init_failsafe_rpl_thread(THD* thd)
+{
+ DBUG_ENTER("init_failsafe_rpl_thread");
+ thd->system_thread = thd->bootstrap = 1;
+ thd->client_capabilities = 0;
+ my_net_init(&thd->net, 0);
+ thd->net.timeout = slave_net_timeout;
+ thd->max_packet_length=thd->net.max_packet;
+ thd->master_access= ~0;
+ thd->priv_user = 0;
+ thd->system_thread = 1;
+ pthread_mutex_lock(&LOCK_thread_count);
+ thd->thread_id = thread_id++;
+ pthread_mutex_unlock(&LOCK_thread_count);
+
+ if (init_thr_lock() ||
+ my_pthread_setspecific_ptr(THR_THD, thd) ||
+ my_pthread_setspecific_ptr(THR_MALLOC, &thd->mem_root) ||
+ my_pthread_setspecific_ptr(THR_NET, &thd->net))
+ {
+ close_connection(&thd->net,ER_OUT_OF_RESOURCES); // is this needed?
+ end_thread(thd,0);
+ DBUG_RETURN(-1);
+ }
+
+ thd->mysys_var=my_thread_var;
+ thd->dbug_thread_id=my_thread_id();
+#if !defined(__WIN__) && !defined(OS2)
+ sigset_t set;
+ VOID(sigemptyset(&set)); // Get mask in use
+ VOID(pthread_sigmask(SIG_UNBLOCK,&set,&thd->block_signals));
+#endif
+
+ thd->mem_root.free=thd->mem_root.used=0;
+ if (thd->max_join_size == (ulong) ~0L)
+ thd->options |= OPTION_BIG_SELECTS;
+
+ thd->proc_info="Thread initialized";
+ thd->version=refresh_version;
+ thd->set_time();
+ DBUG_RETURN(0);
+}
+
void change_rpl_status(RPL_STATUS from_status, RPL_STATUS to_status)
{
pthread_mutex_lock(&LOCK_rpl_status);
if (rpl_status == from_status || rpl_status == RPL_ANY)
rpl_status = to_status;
+ pthread_cond_signal(&COND_rpl_status);
+ pthread_mutex_unlock(&LOCK_rpl_status);
+}
+
+int update_slave_list(MYSQL* mysql)
+{
+ MYSQL_RESULT* res=0;
+ MYSQL_ROW row;
+ const char* error=0;
+ bool have_auth_info;
+ int port_ind;
+
+ if (mc_mysql_query(mysql,"SHOW SLAVE HOSTS",0) ||
+ !(res = mc_mysql_store_result(mysql)))
+ {
+ error = "Query error";
+ goto err;
+ }
+
+ switch (mc_mysql_num_fields(res))
+ {
+ case 5:
+ have_auth_info = 0;
+ port_ind=2;
+ break;
+ case 7:
+ have_auth_info = 1;
+ port_ind=4;
+ break;
+ default:
+ error = "Invalid number of fields in SHOW SLAVE HOSTS";
+ goto err;
+ }
+
+ pthread_mutex_lock(&LOCK_slave_list);
+
+ while ((row = mc_mysql_fetch_row(res)))
+ {
+ uint32 server_id;
+ SLAVE_INFO* si;
+ server_id = atoi(row[0]);
+ if ((old_si = (SLAVE_INFO*)hash_search(&slave_list,
+ (byte*)&server_id,4)))
+ si = old_si;
+ else
+ {
+ if (!(si = (SLAVE_INFO*)my_malloc(sizeof(SLAVE_INFO), MYF(MY_WME))))
+ {
+ error = "Out of memory";
+ pthread_mutex_unlock(&LOCK_slave_list);
+ goto err;
+ }
+ si->server_id = server_id;
+ }
+ strnmov(si->host, row[1], sizeof(si->host));
+ si->port = atoi(row[port_ind]);
+ si->rpl_recovery_rank = atoi(row[port_ind+1]);
+ si->master_id = atoi(row[port_ind+2]);
+ if (have_auth_info)
+ {
+ strnmov(si->user, row[2], sizeof(si->user));
+ strnmov(si->password, row[3], sizeof(si->password));
+ }
+ }
+ pthread_mutex_unlock(&LOCK_slave_list);
+err:
+ if (res)
+ mc_mysql_free_result(res);
+ if (error)
+ {
+ sql_print_error("Error updating slave list:",error);
+ return 1;
+ }
+ return 0;
+}
+
+int find_recovery_captain(THD* thd, MYSQL* mysql)
+{
+
+ return 0;
+}
+
+pthread_handler_decl(handle_failsafe_rpl,arg)
+{
+ DBUG_ENTER("handle_failsafe_rpl");
+ THD *thd = new THD;
+ thd->thread_stack = (char*)&thd;
+ MYSQL* recovery_captain = 0;
+ pthread_detach_this_thread();
+ if (init_failsafe_rpl_thread(thd) || !(recovery_captain=mc_mysql_init(0)))
+ {
+ sql_print_error("Could not initialize failsafe replication thread");
+ goto err;
+ }
+ pthread_mutex_lock(&LOCK_rpl_status);
+ while (!thd->killed && !abort_loop)
+ {
+ bool break_req_chain = 0;
+ const char* msg = thd->enter_cond(&COND_rpl_status,
+ &LOCK_rpl_status, "Waiting for request");
+ pthread_cond_wait(&COND_rpl_status, &LOCK_rpl_status);
+ thd->proc_info="Processling request";
+ while (!break_req_chain)
+ {
+ switch (rpl_status)
+ {
+ case RPL_LOST_SOLDIER:
+ if (find_recovery_captain(thd, recovery_captain))
+ rpl_status=RPL_TROOP_SOLDIER;
+ else
+ rpl_status=RPL_RECOVERY_CAPTAIN;
+ break_req_chain=1; /* for now until other states are implemented */
+ break;
+ default:
+ break_req_chain=1;
+ break;
+ }
+ }
+ thd->exit_cond(msg);
+ }
pthread_mutex_unlock(&LOCK_rpl_status);
+err:
+ if (recovery_captain)
+ mc_mysql_close(recovery_captain);
+ delete thd;
+ my_thread_end();
+ pthread_exit(0);
+ DBUG_RETURN(0);
}
+
+
+
--- 1.3/sql/repl_failsafe.h Thu Oct 11 13:54:05 2001
+++ 1.4/sql/repl_failsafe.h Sat Oct 20 07:07:21 2001
@@ -1,6 +1,8 @@
#ifndef REPL_FAILSAFE_H
#define REPL_FAILSAFE_H
+#include "mysql.h"
+
typedef enum {RPL_AUTH_MASTER=0,RPL_ACTIVE_SLAVE,RPL_IDLE_SLAVE,
RPL_LOST_SOLDIER,RPL_TROOP_SOLDIER,
RPL_RECOVERY_CAPTAIN,RPL_NULL /* inactive */,
@@ -10,7 +12,11 @@
extern pthread_mutex_t LOCK_rpl_status;
extern pthread_cond_t COND_rpl_status;
extern TYPELIB rpl_role_typelib, rpl_status_typelib;
+extern uint rpl_recovery_rank;
extern const char* rpl_role_type[], *rpl_status_type[];
+pthread_handler_decl(handle_failsafe_rpl,arg);
void change_rpl_status(RPL_STATUS from_status, RPL_STATUS to_status);
+int find_recovery_captain(THD* thd, MYSQL* mysql);
+int update_slave_list(MYSQL* mysql);
#endif
--- 1.60/sql/sql_repl.cc Fri Oct 12 09:37:24 2001
+++ 1.61/sql/sql_repl.cc Sat Oct 20 07:07:21 2001
@@ -140,6 +140,11 @@
get_object(p,si->user);
get_object(p,si->password);
si->port = uint2korr(p);
+ p += 2;
+ si->rpl_recovery_rank = uint4korr(p);
+ p += 4;
+ if (!(si->master_id = uint4korr(p)))
+ si->master_id = server_id;
si->thd = thd;
pthread_mutex_lock(&LOCK_slave_list);
@@ -534,6 +539,7 @@
DBUG_PRINT("wait",("waiting for data on binary log"));
if (!thd->killed)
pthread_cond_wait(&COND_binlog_update, log_lock);
+ DBUG_PRINT("wait",("binary log received update"));
break;
default:
@@ -1253,6 +1259,8 @@
field_list.push_back(new Item_empty_string("Password",20));
}
field_list.push_back(new Item_empty_string("Port",20));
+ field_list.push_back(new Item_empty_string("Rpl_recovery_rank", 20));
+ field_list.push_back(new Item_empty_string("Master_id", 20));
if (send_fields(thd, field_list, 1))
DBUG_RETURN(-1);
@@ -1271,6 +1279,8 @@
net_store_data(packet, si->password);
}
net_store_data(packet, (uint32) si->port);
+ net_store_data(packet, si->rpl_recovery_rank);
+ net_store_data(packet, si->master_id);
if (my_net_write(net, (char*)packet->ptr(), packet->length()))
{
pthread_mutex_unlock(&LOCK_slave_list);
--- 1.13/sql/sql_repl.h Fri Oct 12 09:37:24 2001
+++ 1.14/sql/sql_repl.h Sat Oct 20 07:07:21 2001
@@ -6,6 +6,7 @@
typedef struct st_slave_info
{
uint32 server_id;
+ uint32 rpl_recovery_rank, master_id;
char host[HOSTNAME_LENGTH+1];
char user[USERNAME_LENGTH+1];
char password[HASH_PASSWORD_LENGTH+1];
| Thread | Author | Date |
|---|---|---|
| • bk commit into 4.0 tree | sasha | 20 Oct |