List:Commits« Previous MessageNext Message »
From:He Zhenxing Date:October 12 2009 12:55pm
Subject:bzr commit into mysql-5.1-rep-semisync branch (zhenxing.he:3114) Bug#45848
View as plain text  
#At file:///media/sdb2/hezx/work/mysql/bzrwork/semisync/5.1-rep-semisync/ based on revid:zhenxing.he@stripped

 3114 He Zhenxing	2009-10-12
      Backport BUG#45848 Semisynchronous replication internals are visible in SHOW PROCESSLIST and logs
      
      Semi-sync uses an extra connection from slave to master to send
      replies, this is a normal client connection, and used a normal
      SET query to set the reply information on master, which is visible
      to user and may cause some confusion and complaining.
      
      This problem is fixed by using the method of sending reply by
      using the same connection that is used by master dump thread to
      send binlog to slave. Since now the semi-sync plugins are integrated
      with the server code, it is not a problem to use the internal net
      interfaces to do this.
      
      The master dump thread will mark the event requires a reply and
      wait for the reply when the event just sent is the last event
      of a transaction and semi-sync status is ON; And the slave will
      send a reply to master when it received such an event that requires
      a reply.

    M  mysql-test/suite/rpl/r/rpl_semi_sync.result
    M  mysql-test/suite/rpl/t/rpl_semi_sync.test
    M  plugin/semisync/Makefile.am
    M  plugin/semisync/semisync.h
    M  plugin/semisync/semisync_master.cc
    M  plugin/semisync/semisync_master.h
    M  plugin/semisync/semisync_master_plugin.cc
    M  plugin/semisync/semisync_slave.cc
    M  plugin/semisync/semisync_slave.h
    M  plugin/semisync/semisync_slave_plugin.cc
=== modified file 'mysql-test/suite/rpl/r/rpl_semi_sync.result'
--- a/mysql-test/suite/rpl/r/rpl_semi_sync.result	2009-09-30 08:09:31 +0000
+++ b/mysql-test/suite/rpl/r/rpl_semi_sync.result	2009-10-12 12:55:01 +0000
@@ -81,7 +81,7 @@ Rpl_semi_sync_master_no_tx	0
 show status like 'Rpl_semi_sync_master_yes_tx';
 Variable_name	Value
 Rpl_semi_sync_master_yes_tx	0
-create table t1(n int) engine = ENGINE_TYPE;
+create table t1(a int) engine = ENGINE_TYPE;
 [ master state after CREATE TABLE statement ]
 show status like 'Rpl_semi_sync_master_status';
 Variable_name	Value
@@ -92,6 +92,9 @@ Rpl_semi_sync_master_no_tx	0
 show status like 'Rpl_semi_sync_master_yes_tx';
 Variable_name	Value
 Rpl_semi_sync_master_yes_tx	1
+select CONNECTIONS_NORMAL_SLAVE - CONNECTIONS_NORMAL_SLAVE as 'Should be 0';
+Should be 0
+0
 [ insert records to table ]
 [ master status after inserts ]
 show status like 'Rpl_semi_sync_master_status';
@@ -108,15 +111,19 @@ Rpl_semi_sync_master_yes_tx	301
 show status like 'Rpl_semi_sync_slave_status';
 Variable_name	Value
 Rpl_semi_sync_slave_status	ON
-select count(distinct n) from t1;
-count(distinct n)
+select count(distinct a) from t1;
+count(distinct a)
 300
-select min(n) from t1;
-min(n)
+select min(a) from t1;
+min(a)
 1
-select max(n) from t1;
-max(n)
+select max(a) from t1;
+max(a)
 300
+#
+# Test semi-sync master will switch OFF after one transacton
+# timeout waiting for slave reply.
+#
 include/stop_slave.inc
 [ on master ]
 [ master status should be ON ]
@@ -134,7 +141,16 @@ Variable_name	Value
 Rpl_semi_sync_master_clients	1
 [ semi-sync replication of these transactions will fail ]
 insert into t1 values (500);
-delete from t1 where n < 500;
+[ master status should be OFF ]
+show status like 'Rpl_semi_sync_master_status';
+Variable_name	Value
+Rpl_semi_sync_master_status	OFF
+show status like 'Rpl_semi_sync_master_no_tx';
+Variable_name	Value
+Rpl_semi_sync_master_no_tx	1
+show status like 'Rpl_semi_sync_master_yes_tx';
+Variable_name	Value
+Rpl_semi_sync_master_yes_tx	301
 insert into t1 values (100);
 [ master status should be OFF ]
 show status like 'Rpl_semi_sync_master_status';
@@ -142,10 +158,13 @@ Variable_name	Value
 Rpl_semi_sync_master_status	OFF
 show status like 'Rpl_semi_sync_master_no_tx';
 Variable_name	Value
-Rpl_semi_sync_master_no_tx	3
+Rpl_semi_sync_master_no_tx	302
 show status like 'Rpl_semi_sync_master_yes_tx';
 Variable_name	Value
 Rpl_semi_sync_master_yes_tx	301
+#
+# Test semi-sync status on master will be ON again when slave catches up
+#
 [ on slave ]
 [ slave status should be OFF ]
 show status like 'Rpl_semi_sync_slave_status';
@@ -156,31 +175,33 @@ include/start_slave.inc
 show status like 'Rpl_semi_sync_slave_status';
 Variable_name	Value
 Rpl_semi_sync_slave_status	ON
-select count(distinct n) from t1;
-count(distinct n)
+select count(distinct a) from t1;
+count(distinct a)
 2
-select min(n) from t1;
-min(n)
+select min(a) from t1;
+min(a)
 100
-select max(n) from t1;
-max(n)
+select max(a) from t1;
+max(a)
 500
 [ on master ]
-[ do something to activate semi-sync ]
-drop table t1;
-[ master status should be ON again ]
+[ master status should be ON again after slave catches up ]
 show status like 'Rpl_semi_sync_master_status';
 Variable_name	Value
 Rpl_semi_sync_master_status	ON
 show status like 'Rpl_semi_sync_master_no_tx';
 Variable_name	Value
-Rpl_semi_sync_master_no_tx	3
+Rpl_semi_sync_master_no_tx	302
 show status like 'Rpl_semi_sync_master_yes_tx';
 Variable_name	Value
-Rpl_semi_sync_master_yes_tx	302
+Rpl_semi_sync_master_yes_tx	301
 show status like 'Rpl_semi_sync_master_clients';
 Variable_name	Value
 Rpl_semi_sync_master_clients	1
+#
+# Test disable/enable master semi-sync on the fly.
+#
+drop table t1;
 [ on slave ]
 include/stop_slave.inc
 [ on master ]
@@ -206,6 +227,9 @@ rpl_semi_sync_master_enabled	ON
 show status like 'Rpl_semi_sync_master_status';
 Variable_name	Value
 Rpl_semi_sync_master_status	ON
+#
+# Test RESET MASTER/SLAVE
+#
 [ on slave ]
 include/start_slave.inc
 [ on master ]

=== modified file 'mysql-test/suite/rpl/t/rpl_semi_sync.test'
--- a/mysql-test/suite/rpl/t/rpl_semi_sync.test	2009-09-30 08:09:31 +0000
+++ b/mysql-test/suite/rpl/t/rpl_semi_sync.test	2009-10-12 12:55:01 +0000
@@ -7,6 +7,11 @@ source include/master-slave.inc;
 let $engine_type= InnoDB;
 #let $engine_type= MyISAM;
 
+# After fix of BUG#45848, semi-sync slave should not create any extra
+# connections on master, save the count of connections before start
+# semi-sync slave for comparison below.
+let $_connections_normal_slave= query_get_value(SHOW STATUS LIKE 'Threads_connected', Value, 1);
+
 # Suppress warnings that might be generated during the test
 disable_query_log;
 connection master;
@@ -150,13 +155,20 @@ show status like 'Rpl_semi_sync_master_n
 show status like 'Rpl_semi_sync_master_yes_tx';
 
 replace_result $engine_type ENGINE_TYPE;
-eval create table t1(n int) engine = $engine_type;
+eval create table t1(a int) engine = $engine_type;
 
 echo [ master state after CREATE TABLE statement ];
 show status like 'Rpl_semi_sync_master_status';
 show status like 'Rpl_semi_sync_master_no_tx';
 show status like 'Rpl_semi_sync_master_yes_tx';
 
+# After fix of BUG#45848, semi-sync slave should not create any extra
+# connections on master.
+let $_connections_semisync_slave= query_get_value(SHOW STATUS LIKE 'Threads_connected', Value, 1);
+replace_result $_connections_semisync_slave CONNECTIONS_SEMISYNC_SLAVE;
+replace_result $_connections_normal_slave CONNECTIONS_NORMAL_SLAVE;
+eval select $_connections_semisync_slave - $_connections_normal_slave as 'Should be 0';
+
 let $i=300;
 echo [ insert records to table ];
 disable_query_log;
@@ -178,10 +190,15 @@ echo [ on slave ];
 echo [ slave status after replicated inserts ];
 show status like 'Rpl_semi_sync_slave_status';
 
-select count(distinct n) from t1;
-select min(n) from t1;
-select max(n) from t1;
+select count(distinct a) from t1;
+select min(a) from t1;
+select max(a) from t1;
 
+--echo #
+--echo # Test semi-sync master will switch OFF after one transacton
+--echo # timeout waiting for slave reply.
+--echo #
+connection slave;
 source include/stop_slave.inc;
 
 connection master;
@@ -197,8 +214,11 @@ show status like 'Rpl_semi_sync_master_c
 
 echo [ semi-sync replication of these transactions will fail ];
 insert into t1 values (500);
-delete from t1 where n < 500;
-insert into t1 values (100);
+
+# Wait for the semi-sync replication of this transaction to timeout
+let $status_var= Rpl_semi_sync_master_status;
+let $status_var_value= OFF;
+source include/wait_for_status_var.inc;
 
 # The second semi-sync check should be off because one transaction
 # times out during waiting.
@@ -207,6 +227,28 @@ show status like 'Rpl_semi_sync_master_s
 show status like 'Rpl_semi_sync_master_no_tx';
 show status like 'Rpl_semi_sync_master_yes_tx';
 
+# Semi-sync status on master is now OFF, so all these transactions
+# will be replicated asynchronously.
+let $i=300;
+disable_query_log;
+while ($i)
+{
+  eval delete from t1 where a=$i;
+  dec $i;
+}
+enable_query_log;
+
+insert into t1 values (100);
+
+echo [ master status should be OFF ];
+show status like 'Rpl_semi_sync_master_status';
+show status like 'Rpl_semi_sync_master_no_tx';
+show status like 'Rpl_semi_sync_master_yes_tx';
+
+--echo #
+--echo # Test semi-sync status on master will be ON again when slave catches up
+--echo #
+
 # Save the master position for later use.
 save_master_pos;
 
@@ -221,23 +263,25 @@ sync_with_master;
 echo [ slave status should be ON ];
 show status like 'Rpl_semi_sync_slave_status';
 
-select count(distinct n) from t1;
-select min(n) from t1;
-select max(n) from t1;
+select count(distinct a) from t1;
+select min(a) from t1;
+select max(a) from t1;
 
 connection master;
 echo [ on master ];
 
-echo [ do something to activate semi-sync ];
-drop table t1;
-
-# The third semi-sync check should be on again.
-echo [ master status should be ON again ];
+# The master semi-sync status should be on again after slave catches up.
+echo [ master status should be ON again after slave catches up ];
 show status like 'Rpl_semi_sync_master_status';
 show status like 'Rpl_semi_sync_master_no_tx';
 show status like 'Rpl_semi_sync_master_yes_tx';
 show status like 'Rpl_semi_sync_master_clients';
 
+--echo #
+--echo # Test disable/enable master semi-sync on the fly.
+--echo #
+
+drop table t1;
 sync_slave_with_master;
 echo [ on slave ];
 
@@ -259,6 +303,10 @@ set global rpl_semi_sync_master_enabled=
 show variables like 'rpl_semi_sync_master_enabled';
 show status like 'Rpl_semi_sync_master_status';
 
+--echo #
+--echo # Test RESET MASTER/SLAVE
+--echo #
+
 connection slave;
 echo [ on slave ];
 
@@ -512,6 +560,8 @@ source include/start_slave.inc;
 
 connection master;
 drop table t1;
+sync_slave_with_master;
+
+connection master;
 drop user rpl@stripped;
 flush privileges;
-sync_slave_with_master;

=== modified file 'plugin/semisync/Makefile.am'
--- a/plugin/semisync/Makefile.am	2009-09-26 04:49:49 +0000
+++ b/plugin/semisync/Makefile.am	2009-10-12 12:55:01 +0000
@@ -18,6 +18,7 @@
 pkgplugindir =		$(pkglibdir)/plugin
 INCLUDES =              -I$(top_srcdir)/include \
 			-I$(top_srcdir)/sql \
+			-I$(top_srcdir)/regex \
 			-I$(srcdir)
 
 noinst_HEADERS = semisync.h semisync_master.h semisync_slave.h

=== modified file 'plugin/semisync/semisync.h'
--- a/plugin/semisync/semisync.h	2009-09-26 04:49:49 +0000
+++ b/plugin/semisync/semisync.h	2009-10-12 12:55:01 +0000
@@ -18,25 +18,9 @@
 #ifndef SEMISYNC_H
 #define SEMISYNC_H
 
-#include <stdint.h>
-#include <string.h>
-#include <assert.h>
-#include <sys/time.h>
-#include <time.h>
-#include <stdio.h>
-#include <pthread.h>
-#include <mysql.h>
-
-typedef uint32_t uint32;
-typedef unsigned long long my_off_t;
-#define FN_REFLEN	512	/* Max length of full path-name */
-void sql_print_error(const char *format, ...);
-void sql_print_warning(const char *format, ...);
-void sql_print_information(const char *format, ...);
-extern unsigned long max_connections;
-
 #define MYSQL_SERVER
 #define HAVE_REPLICATION
+#include <mysql_priv.h>
 #include <my_global.h>
 #include <my_pthread.h>
 #include <mysql/plugin.h>
@@ -92,4 +76,16 @@ public:
   static const unsigned char kPacketFlagSync;
 };
 
+/* The layout of a semisync slave reply packet:
+   1 byte for the magic num
+   8 bytes for the binlog positon
+   n bytes for the binlog filename, terminated with a '\0'
+*/
+#define REPLY_MAGIC_NUM_LEN 1
+#define REPLY_BINLOG_POS_LEN 8
+#define REPLY_BINLOG_NAME_LEN (FN_REFLEN + 1)
+#define REPLY_MAGIC_NUM_OFFSET 0
+#define REPLY_BINLOG_POS_OFFSET (REPLY_MAGIC_NUM_OFFSET + REPLY_MAGIC_NUM_LEN)
+#define REPLY_BINLOG_NAME_OFFSET (REPLY_BINLOG_POS_OFFSET + REPLY_BINLOG_POS_LEN)
+
 #endif /* SEMISYNC_H */

=== modified file 'plugin/semisync/semisync_master.cc'
--- a/plugin/semisync/semisync_master.cc	2009-10-03 05:00:05 +0000
+++ b/plugin/semisync/semisync_master.cc	2009-10-12 12:55:01 +0000
@@ -546,19 +546,6 @@ bool ReplSemiSyncMaster::is_semi_sync_sl
   return val;
 }
 
-int ReplSemiSyncMaster::reportReplyBinlog(const char *log_file_pos)
-{
-  char log_name[FN_REFLEN];
-  char *endptr;
-  my_off_t log_pos= strtoull(log_file_pos, &endptr, 10);
-  if (!log_pos || !endptr || *endptr != ':' )
-    return 1;
-  endptr++;                                     // skip the ':' seperator
-  strncpy(log_name, endptr, FN_REFLEN);
-  uint32 server_id= 0;
-  return reportReplyBinlog(server_id, log_name, log_pos);
-}
-
 int ReplSemiSyncMaster::reportReplyBinlog(uint32 server_id,
 					  const char *log_file_name,
 					  my_off_t log_file_pos)
@@ -679,7 +666,7 @@ int ReplSemiSyncMaster::commitTrx(const 
                             "Waiting for semi-sync ACK from slave");
 
     /* This is the real check inside the mutex. */
-    if (!getMasterEnabled() || !is_on() || !rpl_semi_sync_master_clients)
+    if (!getMasterEnabled() || !is_on())
       goto l_end;
 
     if (trace_level_ & kTraceDetail)
@@ -691,17 +678,20 @@ int ReplSemiSyncMaster::commitTrx(const 
 
     while (is_on())
     {
-      int cmp = ActiveTranx::compare(reply_file_name_, reply_file_pos_,
-                                     trx_wait_binlog_name, trx_wait_binlog_pos);
-      if (cmp >= 0)
+      if (reply_file_name_inited_)
       {
-        /* We have already sent the relevant binlog to the slave: no need to
-         * wait here.
-         */
-        if (trace_level_ & kTraceDetail)
-          sql_print_information("%s: Binlog reply is ahead (%s, %lu),",
-                                kWho, reply_file_name_, (unsigned long)reply_file_pos_);
-        break;
+        int cmp = ActiveTranx::compare(reply_file_name_, reply_file_pos_,
+                                       trx_wait_binlog_name, trx_wait_binlog_pos);
+        if (cmp >= 0)
+        {
+          /* We have already sent the relevant binlog to the slave: no need to
+           * wait here.
+           */
+          if (trace_level_ & kTraceDetail)
+            sql_print_information("%s: Binlog reply is ahead (%s, %lu),",
+                                  kWho, reply_file_name_, (unsigned long)reply_file_pos_);
+          break;
+        }
       }
 
       /* Let us update the info about the minimum binlog position of waiting
@@ -709,8 +699,8 @@ int ReplSemiSyncMaster::commitTrx(const 
        */
       if (wait_file_name_inited_)
       {
-        cmp = ActiveTranx::compare(trx_wait_binlog_name, trx_wait_binlog_pos,
-                                   wait_file_name_, wait_file_pos_);
+        int cmp = ActiveTranx::compare(trx_wait_binlog_name, trx_wait_binlog_pos,
+                                       wait_file_name_, wait_file_pos_);
         if (cmp <= 0)
 	{
           /* This thd has a lower position, let's update the minimum info. */
@@ -824,6 +814,13 @@ int ReplSemiSyncMaster::commitTrx(const 
     }
 
   l_end:
+    /*
+      At this point, the binlog file and position of this transaction
+      must have been removed from ActiveTranx.
+    */
+    assert(!active_tranxs_->is_tranx_end_pos(trx_wait_binlog_name,
+                                             trx_wait_binlog_pos));
+    
     /* Update the status counter. */
     if (is_on() && rpl_semi_sync_master_clients)
       enabled_transactions_++;
@@ -1045,7 +1042,9 @@ int ReplSemiSyncMaster::updateSyncHeader
    * reserve the packet header.
    */
   if (sync)
+  {
     (packet)[2] = kPacketFlagSync;
+  }
 
   return function_exit(kWho, 0);
 }
@@ -1098,8 +1097,8 @@ int ReplSemiSyncMaster::writeTranxInBinl
         if insert tranx_node failed, print a warning message
         and turn off semi-sync
       */
-      sql_print_warning("Semi-sync failed to insert tranx_node for binlog file: %s, position: %ul",
-                        log_file_name, log_file_pos);
+      sql_print_warning("Semi-sync failed to insert tranx_node for binlog file: %s, position: %lu",
+                        log_file_name, (ulong)log_file_pos);
       switch_off();
     }
   }
@@ -1110,6 +1109,113 @@ int ReplSemiSyncMaster::writeTranxInBinl
   return function_exit(kWho, result);
 }
 
+int ReplSemiSyncMaster::readSlaveReply(NET *net, uint32 server_id,
+                                       const char *event_buf)
+{
+  const char *kWho = "ReplSemiSyncMaster::readSlaveReply";
+  const unsigned char *packet;
+  char     log_file_name[FN_REFLEN];
+  my_off_t log_file_pos;
+  ulong    packet_len;
+  int      result = -1;
+
+  struct timeval start_tv;
+  int   start_time_err= 0;
+  ulong trc_level = trace_level_;
+
+  function_enter(kWho);
+
+  assert((unsigned char)event_buf[1] == kPacketMagicNum);
+  if ((unsigned char)event_buf[2] != kPacketFlagSync)
+  {
+    /* current event does not require reply */
+    result = 0;
+    goto l_end;
+  }
+
+  if (trc_level & kTraceNetWait)
+    start_time_err = gettimeofday(&start_tv, 0);
+
+  /* We flush to make sure that the current event is sent to the network,
+   * instead of being buffered in the TCP/IP stack.
+   */
+  if (net_flush(net))
+  {
+    sql_print_error("Semi-sync master failed on net_flush() "
+                    "before waiting for slave reply");
+    goto l_end;
+  }
+
+  net_clear(net, 0);
+  if (trc_level & kTraceDetail)
+    sql_print_information("%s: Wait for replica's reply", kWho);
+
+  /* Wait for the network here.  Though binlog dump thread can indefinitely wait
+   * here, transactions would not wait indefintely.
+   * Transactions wait on binlog replies detected by binlog dump threads.  If
+   * binlog dump threads wait too long, transactions will timeout and continue.
+   */
+  packet_len = my_net_read(net);
+
+  if (trc_level & kTraceNetWait)
+  {
+    if (start_time_err != 0)
+    {
+      sql_print_error("Semi-sync master wait for reply "
+                      "gettimeofday fail to get start time");
+      timefunc_fails_++;
+    }
+    else
+    {
+      int wait_time;
+
+      wait_time = getWaitTime(start_tv);
+      if (wait_time < 0)
+      {
+        sql_print_error("Semi-sync master wait for reply "
+                        "gettimeofday fail to get wait time.");
+        timefunc_fails_++;
+      }
+      else
+      {
+        total_net_wait_num_++;
+        total_net_wait_time_ += wait_time;
+      }
+    }
+  }
+
+  if (packet_len == packet_error || packet_len < REPLY_BINLOG_NAME_OFFSET)
+  {
+    if (packet_len == packet_error)
+      sql_print_error("Read semi-sync reply network error: %s (errno: %d)",
+                      net->last_error, net->last_errno);
+    else
+      sql_print_error("Read semi-sync reply length error: %s (errno: %d)",
+                      net->last_error, net->last_errno);
+    goto l_end;
+  }
+
+  packet = net->read_pos;
+  if (packet[REPLY_MAGIC_NUM_OFFSET] != ReplSemiSyncMaster::kPacketMagicNum)
+  {
+    sql_print_error("Read semi-sync reply magic number error");
+    goto l_end;
+  }
+
+  log_file_pos = uint8korr(packet + REPLY_BINLOG_POS_OFFSET);
+  strcpy(log_file_name, (const char*)packet + REPLY_BINLOG_NAME_OFFSET);
+
+  if (trc_level & kTraceDetail)
+    sql_print_information("%s: Got reply (%s, %lu)",
+                          kWho, log_file_name, (ulong)log_file_pos);
+
+  result = reportReplyBinlog(server_id, log_file_name, log_file_pos);
+
+ l_end:
+  return function_exit(kWho, result);
+}
+
+
 int ReplSemiSyncMaster::resetMaster()
 {
   const char *kWho = "ReplSemiSyncMaster::resetMaster";

=== modified file 'plugin/semisync/semisync_master.h'
--- a/plugin/semisync/semisync_master.h	2009-10-03 05:00:05 +0000
+++ b/plugin/semisync/semisync_master.h	2009-10-12 12:55:01 +0000
@@ -81,7 +81,7 @@ public:
   /* Insert an active transaction node with the specified position.
    *
    * Return:
-   *  0: success;  -1 or otherwise: error
+   *  0: success;  non-zero: error
    */
   int insert_tranx_node(const char *log_file_name, my_off_t log_file_pos);
 
@@ -91,7 +91,7 @@ public:
    * list and the hash table will be reset to empty.
    * 
    * Return:
-   *  0: success;  -1 or otherwise: error
+   *  0: success;  non-zero: error
    */
   int clear_active_tranx_nodes(const char *log_file_name,
 			       my_off_t    log_file_pos);
@@ -253,8 +253,6 @@ class ReplSemiSyncMaster
   /* Is the slave servered by the thread requested semi-sync */
   bool is_semi_sync_slave();
 
-  int reportReplyBinlog(const char *log_file_pos);
-  
   /* In semi-sync replication, reports up to which binlog position we have
    * received replies from the slave indicating that it already get the events.
    *
@@ -265,7 +263,7 @@ class ReplSemiSyncMaster
    *                        the replies from the slave
    *
    * Return:
-   *  0: success;  -1 or otherwise: error
+   *  0: success;  non-zero: error
    */
   int reportReplyBinlog(uint32 server_id,
                         const char* log_file_name,
@@ -284,7 +282,7 @@ class ReplSemiSyncMaster
    *  trx_wait_binlog_pos  - (IN)  ending position's file offset
    *
    * Return:
-   *  0: success;  -1 or otherwise: error
+   *  0: success;  non-zero: error
    */
   int commitTrx(const char* trx_wait_binlog_name,
                 my_off_t trx_wait_binlog_pos);
@@ -313,7 +311,7 @@ class ReplSemiSyncMaster
    *  server_id     - (IN)  master server id number
    *
    * Return:
-   *  0: success;  -1 or otherwise: error
+   *  0: success;  non-zero: error
    */
   int updateSyncHeader(unsigned char *packet,
                        const char *log_file_name,
@@ -330,10 +328,23 @@ class ReplSemiSyncMaster
    *  log_file_pos  - (IN)  transaction ending position's file offset
    *
    * Return:
-   *  0: success;  -1 or otherwise: error
+   *  0: success;  non-zero: error
    */
   int writeTranxInBinlog(const char* log_file_name, my_off_t log_file_pos);
 
+  /* Read the slave's reply so that we know how much progress the slave makes
+   * on receive replication events.
+   * 
+   * Input:
+   *  net          - (IN)  the connection to master
+   *  server_id    - (IN)  master server id number
+   *  event_buf    - (IN)  pointer to the event packet
+   *
+   * Return:
+   *  0: success;  non-zero: error
+   */
+  int readSlaveReply(NET *net, uint32 server_id, const char *event_buf);
+
   /* Export internal statistics for semi-sync replication. */
   void setExportStats();
 

=== modified file 'plugin/semisync/semisync_master_plugin.cc'
--- a/plugin/semisync/semisync_master_plugin.cc	2009-09-26 04:49:49 +0000
+++ b/plugin/semisync/semisync_master_plugin.cc	2009-10-12 12:55:01 +0000
@@ -69,8 +69,16 @@ int repl_semi_binlog_dump_start(Binlog_t
   bool semi_sync_slave= repl_semisync.is_semi_sync_slave();
   
   if (semi_sync_slave)
+  {
     /* One more semi-sync slave */
     repl_semisync.add_slave();
+    
+    /*
+      Let's assume this semi-sync slave has already received all
+      binlog events before the filename and position it requests.
+  */
+    repl_semisync.reportReplyBinlog(param->server_id, log_file, log_pos);
+  }
   sql_print_information("Start %s binlog_dump to slave (server_id: %d), pos(%s, %lu)",
 			semi_sync_slave ? "semi-sync" : "asynchronous",
 			param->server_id, log_file, (unsigned long)log_pos);
@@ -114,6 +122,18 @@ int repl_semi_before_send_event(Binlog_t
 int repl_semi_after_send_event(Binlog_transmit_param *param,
                                const char *event_buf, unsigned long len)
 {
+  if (repl_semisync.is_semi_sync_slave())
+  {
+    THD *thd= current_thd;
+    /*
+      Possible errors in reading slave reply are ignored deliberately
+      because we do not want dump thread to quit on this. Error
+      messages are already reported.
+    */
+    (void) repl_semisync.readSlaveReply(&thd->net,
+                                        param->server_id, event_buf);
+    thd->clear_error();
+  }
   return 0;
 }
 
@@ -142,11 +162,6 @@ static void fix_rpl_semi_sync_master_ena
 				      void *ptr,
 				      const void *val);
 
-static void fix_rpl_semi_sync_master_reply_log_file_pos(MYSQL_THD thd,
-                                                 SYS_VAR *var,
-                                                 void *ptr,
-                                                 const void *val);
-
 static MYSQL_SYSVAR_BOOL(enabled, rpl_semi_sync_master_enabled,
   PLUGIN_VAR_OPCMDARG,
  "Enable semi-synchronous replication master (disabled by default). ",
@@ -168,22 +183,10 @@ static MYSQL_SYSVAR_ULONG(trace_level, r
   &fix_rpl_semi_sync_master_trace_level, // update
   32, 0, ~0L, 1);
 
-/*
-  Use a SESSION instead of GLOBAL variable for slave to send reply to
-  avoid requiring SUPER privilege.
-*/
-static MYSQL_THDVAR_STR(reply_log_file_pos,
-  PLUGIN_VAR_NOCMDOPT,
-  "The log filename and position slave has queued to relay log.",
-  NULL,             // check
-  &fix_rpl_semi_sync_master_reply_log_file_pos,
-  "");
-
 static SYS_VAR* semi_sync_master_system_vars[]= {
   MYSQL_SYSVAR(enabled),
   MYSQL_SYSVAR(timeout),
   MYSQL_SYSVAR(trace_level),
-  MYSQL_SYSVAR(reply_log_file_pos),
   NULL,
 };
 
@@ -228,19 +231,6 @@ static void fix_rpl_semi_sync_master_ena
   return;
 }
 
-static void fix_rpl_semi_sync_master_reply_log_file_pos(MYSQL_THD thd,
-                                                 SYS_VAR *var,
-                                                 void *ptr,
-                                                 const void *val)
-{
-  const char *log_file_pos= *(char **)val;
-  
-  if (repl_semisync.reportReplyBinlog(log_file_pos))
-    sql_print_error("report slave binlog reply failed.");
-
-  return;
-}
-
 Trans_observer trans_observer = {
   sizeof(Trans_observer),		// len
 

=== modified file 'plugin/semisync/semisync_slave.cc'
--- a/plugin/semisync/semisync_slave.cc	2009-10-03 05:00:05 +0000
+++ b/plugin/semisync/semisync_slave.cc	2009-10-12 12:55:01 +0000
@@ -104,19 +104,45 @@ int ReplSemiSyncSlave::slaveStop(Binlog_
   return 0;
 }
 
-int ReplSemiSyncSlave::slaveReply(const char *log_name, my_off_t log_pos)
+int ReplSemiSyncSlave::slaveReply(MYSQL *mysql,
+                                 const char *binlog_filename,
+                                 my_off_t binlog_filepos)
 {
-  char query[FN_REFLEN + 100];
-  sprintf(query, "SET SESSION rpl_semi_sync_master_reply_log_file_pos='%llu:%s'",
-          (unsigned long long)log_pos, log_name);
-  if (mysql_real_query(mysql_reply, query, strlen(query)))
+  const char *kWho = "ReplSemiSyncSlave::slaveReply";
+  NET *net= &mysql->net;
+  uchar reply_buffer[REPLY_MAGIC_NUM_LEN
+                     + REPLY_BINLOG_POS_LEN
+                     + REPLY_BINLOG_NAME_LEN];
+  int  reply_res, name_len = strlen(binlog_filename);
+
+  function_enter(kWho);
+
+  /* Prepare the buffer of the reply. */
+  reply_buffer[REPLY_MAGIC_NUM_OFFSET] = kPacketMagicNum;
+  int8store(reply_buffer + REPLY_BINLOG_POS_OFFSET, binlog_filepos);
+  memcpy(reply_buffer + REPLY_BINLOG_NAME_OFFSET,
+         binlog_filename,
+         name_len + 1 /* including trailing '\0' */);
+
+  if (trace_level_ & kTraceDetail)
+    sql_print_information("%s: reply (%s, %lu)", kWho,
+                          binlog_filename, (ulong)binlog_filepos);
+
+  net_clear(net, 0);
+  /* Send the reply. */
+  reply_res = my_net_write(net, reply_buffer,
+                           name_len + REPLY_BINLOG_NAME_OFFSET);
+  if (!reply_res)
   {
-    sql_print_error("Set 'rpl_semi_sync_master_reply_log_file_pos' on master failed");
-    mysql_free_result(mysql_store_result(mysql_reply));
-    mysql_close(mysql_reply);
-    mysql_reply= 0;
-    return 1;
+    reply_res = net_flush(net);
+    if (reply_res)
+      sql_print_error("Semi-sync slave net_flush() reply failed");
   }
-  mysql_free_result(mysql_store_result(mysql_reply));
-  return 0;
+  else
+  {
+    sql_print_error("Semi-sync slave send reply failed: %s (%d)",
+                    net->last_error, net->last_errno);
+  }
+
+  return function_exit(kWho, reply_res);
 }

=== modified file 'plugin/semisync/semisync_slave.h'
--- a/plugin/semisync/semisync_slave.h	2009-10-03 05:00:05 +0000
+++ b/plugin/semisync/semisync_slave.h	2009-10-12 12:55:01 +0000
@@ -57,7 +57,7 @@ public:
    *  payload_len - (IN)  payload length
    *
    * Return:
-   *  0: success;  -1 or otherwise: error
+   *  0: success;  non-zero: error
    */
   int slaveReadSyncHeader(const char *header, unsigned long total_len, bool *need_reply,
                           const char **payload, unsigned long *payload_len);
@@ -67,13 +67,15 @@ public:
    * binlog position.
    * 
    * Input:
-   *  log_name         - (IN)  the reply point's binlog file name
-   *  log_pos   - (IN)  the reply point's binlog file offset
+   *  mysql            - (IN)  the mysql network connection
+   *  binlog_filename  - (IN)  the reply point's binlog file name
+   *  binlog_filepos   - (IN)  the reply point's binlog file offset
    *
    * Return:
-   *  0: success;  -1 or otherwise: error
+   *  0: success;  non-zero: error
    */
-  int slaveReply(const char *log_name, my_off_t log_pos);
+  int slaveReply(MYSQL *mysql, const char *binlog_filename,
+                 my_off_t binlog_filepos);
 
   /*
     Connect to master for sending reply

=== modified file 'plugin/semisync/semisync_slave_plugin.cc'
--- a/plugin/semisync/semisync_slave_plugin.cc	2009-09-26 04:49:49 +0000
+++ b/plugin/semisync/semisync_slave_plugin.cc	2009-10-12 12:55:01 +0000
@@ -45,13 +45,6 @@ int repl_semi_slave_request_dump(Binlog_
   if (!repl_semisync.getSlaveEnabled())
     return 0;
 
-  /*
-    Create the connection that is used to send slave ACK replies to
-    master
-  */
-  if (repl_semisync.slaveReplyConnect())
-    return 1;
-
   /* Check if master server has semi-sync plugin installed */
   query= "SHOW VARIABLES LIKE 'rpl_semi_sync_master_enabled'";
   if (mysql_real_query(mysql, query, strlen(query)) ||
@@ -106,7 +99,8 @@ int repl_semi_slave_queue_event(Binlog_r
 				uint32 flags)
 {
   if (rpl_semi_sync_slave_status && semi_sync_need_reply)
-    return repl_semisync.slaveReply(param->master_log_name,
+    return repl_semisync.slaveReply(param->mysql,
+                                    param->master_log_name,
                                     param->master_log_pos);
   return 0;
 }


Attachment: [text/bzr-bundle] bzr/zhenxing.he@sun.com-20091012125501-j370003q5re10kot.bundle
Thread
bzr commit into mysql-5.1-rep-semisync branch (zhenxing.he:3114) Bug#45848He Zhenxing12 Oct