List:Commits« Previous MessageNext Message »
From:hezx Date:March 13 2008 8:23am
Subject:bk commit into 5.1 tree (hezx:1.2559)
View as plain text  
Below is the list of changes that have just been committed into a local
5.1 repository of hezx.  When hezx does a push these changes
will be propagated to the main repository and, within 24 hours after the
push, to the public repository.
For information on how to access the public repository
see http://dev.mysql.com/doc/mysql/en/installing-source-tree.html

ChangeSet@stripped, 2008-03-13 16:22:58+08:00, hezx@stripped +16 -0
  Manual merged with semi-sync patch from google, first shot, not compile yet

  sql/Makefile.am@stripped, 2008-03-13 16:22:05+08:00, hezx@stripped +2 -2
    Manual merged with semi-sync patch from google

  sql/handler.cc@stripped, 2008-03-13 16:22:05+08:00, hezx@stripped +52 -0
    Manual merged with semi-sync patch from google

  sql/handler.h@stripped, 2008-03-13 16:22:06+08:00, hezx@stripped +4 -0
    Manual merged with semi-sync patch from google

  sql/log.cc@stripped, 2008-03-13 16:22:06+08:00, hezx@stripped +42 -11
    Manual merged with semi-sync patch from google

  sql/mysql_priv.h@stripped, 2008-03-13 16:22:06+08:00, hezx@stripped +22 -0
    Manual merged with semi-sync patch from google

  sql/mysqld.cc@stripped, 2008-03-13 16:22:06+08:00, hezx@stripped +79 -1
    Manual merged with semi-sync patch from google

  sql/repl_semi_sync.cc@stripped, 2008-03-13 16:22:07+08:00, hezx@stripped +1224 -0
    Manual merged with semi-sync patch from google

  sql/repl_semi_sync.cc@stripped, 2008-03-13 16:22:07+08:00, hezx@stripped +0 -0

  sql/repl_semi_sync.h@stripped, 2008-03-13 16:22:07+08:00, hezx@stripped +449 -0
    Manual merged with semi-sync patch from google

  sql/repl_semi_sync.h@stripped, 2008-03-13 16:22:07+08:00, hezx@stripped +0 -0

  sql/set_var.cc@stripped, 2008-03-13 16:22:06+08:00, hezx@stripped +59 -0
    Manual merged with semi-sync patch from google

  sql/slave.cc@stripped, 2008-03-13 16:22:06+08:00, hezx@stripped +56 -6
    Manual merged with semi-sync patch from google

  sql/sql_class.cc@stripped, 2008-03-13 16:22:07+08:00, hezx@stripped +1 -0
    Manual merged with semi-sync patch from google

  sql/sql_class.h@stripped, 2008-03-13 16:22:07+08:00, hezx@stripped +1 -0
    Manual merged with semi-sync patch from google

  sql/sql_load.cc@stripped, 2008-03-13 16:22:07+08:00, hezx@stripped +3 -0
    Manual merged with semi-sync patch from google

  sql/sql_parse.cc@stripped, 2008-03-13 16:22:07+08:00, hezx@stripped +9 -0
    Manual merged with semi-sync patch from google

  sql/sql_repl.cc@stripped, 2008-03-13 16:22:07+08:00, hezx@stripped +176 -31
    Manual merged with semi-sync patch from google

  storage/innobase/handler/ha_innodb.h@stripped, 2008-03-13 16:22:07+08:00, hezx@stripped +1 -0
    Manual merged with semi-sync patch from google

diff -Nrup a/sql/Makefile.am b/sql/Makefile.am
--- a/sql/Makefile.am	2007-12-21 04:24:01 +08:00
+++ b/sql/Makefile.am	2008-03-13 16:22:05 +08:00
@@ -75,7 +75,7 @@ noinst_HEADERS =	item.h item_func.h item
 			sql_plugin.h authors.h \
 			event_data_objects.h event_scheduler.h \
 			sql_partition.h partition_info.h partition_element.h \
-			contributors.h sql_servers.h
+			contributors.h sql_servers.h repl_semi_sync.h
 
 mysqld_SOURCES =	sql_lex.cc sql_handler.cc sql_partition.cc \
 			item.cc item_sum.cc item_buff.cc item_func.cc \
@@ -119,7 +119,7 @@ mysqld_SOURCES =	sql_lex.cc sql_handler.
                         event_queue.cc event_db_repository.cc events.cc \
 			sql_plugin.cc sql_binlog.cc \
 			sql_builtin.cc sql_tablespace.cc partition_info.cc \
-			sql_servers.cc
+			sql_servers.cc repl_semi_sync.cc
 
 nodist_mysqld_SOURCES =	mini_client_errors.c pack.c client.c my_time.c my_user.c 
 
diff -Nrup a/sql/handler.cc b/sql/handler.cc
--- a/sql/handler.cc	2008-03-05 17:15:45 +08:00
+++ b/sql/handler.cc	2008-03-13 16:22:05 +08:00
@@ -4249,6 +4249,58 @@ TYPELIB *ha_known_exts(void)
   return &known_extensions;
 }
 
+/*********************************************************************
+This is called when MySQL writes the binlog entry for the current
+transaction. Writes to the InnoDB tablespace info which tells where the
+MySQL binlog entry for the current transaction ended. Also commits the
+transaction inside InnoDB but does NOT flush InnoDB log files to disk.
+To flush you have to call innobase_flush_log_to_disk. We have separated
+flushing to eliminate the bottleneck of LOCK_log in log.cc which disabled
+InnoDB's group commit capability. */
+
+int ha_report_binlog_offset_and_commit(THD *thd,
+               char *log_file_name,
+               my_off_t end_offset)
+{
+  int  error= 0;
+  void *trx = thd->ha_data[innobase_hton.slot];
+
+  if (trx)
+  {
+    if ((error=innobase_report_binlog_offset_and_commit(thd, trx,
+              log_file_name,
+              end_offset)))
+    {
+      my_error(ER_ERROR_DURING_COMMIT, MYF(0), error);
+      error=1;
+    }
+  }
+  return error;
+}
+
+/*
+  Flushes the handler log files (if my.cnf settings do not free us from it)
+  after we have called ha_report_binlog_offset_and_commit(). To eliminate
+  the bottleneck from the group commit, this should be called when
+  LOCK_log has been released in log.cc.
+
+  arguments:
+  thd:           the thread handle of the current connection
+  return value:  always 0
+*/
+
+int ha_commit_complete(THD *thd)
+{
+  void *trx = thd->ha_data[innobase_hton.slot];
+
+  if (trx)
+  {
+    innobase_commit_complete(thd);
+  }
+  return 0;
+}
+
+
 
 static bool stat_print(THD *thd, const char *type, uint type_len,
                        const char *file, uint file_len,
diff -Nrup a/sql/handler.h b/sql/handler.h
--- a/sql/handler.h	2008-02-19 19:42:58 +08:00
+++ b/sql/handler.h	2008-03-13 16:22:06 +08:00
@@ -2031,6 +2031,10 @@ void trans_register_ha(THD *thd, bool al
 #define trans_need_2pc(thd, all)                   ((total_ha_2pc > 1) && \
         !((all ? &thd->transaction.all : &thd->transaction.stmt)->no_2pc))
 
+int ha_report_binlog_offset_and_commit(THD *thd, char *log_file_name,
+                                       my_off_t end_offset);
+int ha_commit_complete(THD *thd);
+
 #ifdef HAVE_NDB_BINLOG
 int ha_reset_logs(THD *thd);
 int ha_binlog_index_purge_file(THD *thd, const char *file);
diff -Nrup a/sql/log.cc b/sql/log.cc
--- a/sql/log.cc	2008-03-05 16:44:37 +08:00
+++ b/sql/log.cc	2008-03-13 16:22:06 +08:00
@@ -3570,6 +3570,7 @@ MYSQL_BIN_LOG::flush_and_set_pending_row
 bool MYSQL_BIN_LOG::write(Log_event *event_info)
 {
   THD *thd= event_info->thd;
+  bool called_handler_commit=0;
   bool error= 1;
   DBUG_ENTER("MYSQL_BIN_LOG::write(Log_event *)");
 
@@ -3734,14 +3735,35 @@ bool MYSQL_BIN_LOG::write(Log_event *eve
     if (event_info->write(file))
       goto err;
 
+    error=0;
     if (file == &log_file) // we are writing to the real log (disk)
     {
       if (flush_and_sync())
 	goto err;
+
+      if (opt_using_transactions &&
+          !(thd->options & (OPTION_NOT_AUTOCOMMIT | OPTION_BEGIN)))
+      {
+        /*
+          LOAD DATA INFILE in AUTOCOMMIT=1 mode writes to the binlog
+          chunks also before it is successfully completed. We only report
+          the binlog write and do the commit inside the transactional table
+          handler if the log event type is appropriate.
+        */
+        
+        if (event_info->get_type_code() == QUERY_EVENT ||
+            event_info->get_type_code() == EXEC_LOAD_EVENT)
+        {
+          error = ha_report_binlog_offset_and_commit(thd, log_file_name,
+                                                     file->pos_in_file);
+          if (error == 0)
+            called_handler_commit = 1;
+        }
+      }
+
       signal_update();
       rotate_and_purge(RP_LOCK_LOG_IS_ALREADY_LOCKED);
     }
-    error=0;
 
 err:
     if (error)
@@ -3758,6 +3780,15 @@ err:
     ++m_table_map_version;
 
   pthread_mutex_unlock(&LOCK_log);
+
+  /*
+    Flush the transactional handler log file now that we have released
+    LOCK_log; the flush is placed here to eliminate the bottleneck on the
+    group commit
+  */
+  if (called_handler_commit)
+    ha_commit_complete(thd);
+
   DBUG_RETURN(error);
 }
 
@@ -4093,6 +4124,11 @@ bool MYSQL_BIN_LOG::write(THD *thd, IO_C
         write_error=1;				// Don't give more errors
         goto err;
       }
+
+    if ((ha_report_binlog_offset_and_commit(thd, log_file_name,
+					    log_file.pos_in_file)))
+      goto err;
+
       signal_update();
     }
 
@@ -4115,6 +4151,8 @@ bool MYSQL_BIN_LOG::write(THD *thd, IO_C
   }
   VOID(pthread_mutex_unlock(&LOCK_log));
 
+  ha_commit_complete(thd);
+
   DBUG_RETURN(0);
 
 err:
@@ -4132,9 +4170,7 @@ err:
   Wait until we get a signal that the binary log has been updated.
 
   @param thd		Thread variable
-  @param is_slave      If 0, the caller is the Binlog_dump thread from master;
-                       if 1, the caller is the SQL thread from the slave. This
-                       influences only thd->proc_info.
+  @param new_msg        the new status of thd->proc_info
 
   @note
     One must have a lock on LOCK_log before calling this function.
@@ -4142,17 +4178,12 @@ err:
     THD::enter_cond() (see NOTES in sql_class.h).
 */
 
-void MYSQL_BIN_LOG::wait_for_update(THD* thd, bool is_slave)
+void MYSQL_BIN_LOG::wait_for_update(THD* thd, const char *new_msg)
 {
   const char *old_msg;
   DBUG_ENTER("wait_for_update");
 
-  old_msg= thd->enter_cond(&update_cond, &LOCK_log,
-                           is_slave ?
-                           "Has read all relay log; waiting for the slave I/O "
-                           "thread to update it" :
-                           "Has sent all binlog to slave; waiting for binlog "
-                           "to be updated");
+  old_msg= thd->enter_cond(&update_cond, &LOCK_log, new_msg);
   pthread_cond_wait(&update_cond, &LOCK_log);
   thd->exit_cond(old_msg);
   DBUG_VOID_RETURN;
diff -Nrup a/sql/mysql_priv.h b/sql/mysql_priv.h
--- a/sql/mysql_priv.h	2008-02-27 00:26:40 +08:00
+++ b/sql/mysql_priv.h	2008-03-13 16:22:06 +08:00
@@ -552,6 +552,7 @@ void debug_sync_point(const char* lock_n
 /* BINLOG_DUMP options */
 
 #define BINLOG_DUMP_NON_BLOCK   1
+#define BINLOG_SEMI_SYNC        0x0002
 
 /* sql_show.cc:show_log_files() */
 #define SHOW_LOG_STATUS_FREE "FREE"
@@ -2351,6 +2352,27 @@ bool check_stack_overrun(THD *thd, long 
 inline void kill_delayed_threads(void) {}
 #define check_stack_overrun(A, B, C) 0
 #endif
+
+extern ulong rpl_semi_sync_enabled;
+extern ulong rpl_semi_sync_slave_enabled;
+extern ulong rpl_semi_sync_timeout;
+extern ulong rpl_semi_sync_trace_level;
+extern ulong rpl_semi_sync_status;
+extern ulong rpl_semi_sync_slave_status;
+extern ulong rpl_semi_sync_yes_transactions;
+extern ulong rpl_semi_sync_no_transactions;
+extern ulong rpl_semi_sync_off_times;
+extern ulong rpl_semi_sync_timefunc_fails;
+extern ulong rpl_semi_sync_num_timeouts;
+extern ulong rpl_semi_sync_wait_sessions;
+extern ulong rpl_semi_sync_back_wait_pos;
+extern ulong rpl_semi_sync_trx_wait_time;
+extern ulong rpl_semi_sync_net_wait_time;
+extern ulonglong rpl_semi_sync_net_wait_num;
+extern ulonglong rpl_semi_sync_trx_wait_num;
+extern ulonglong rpl_semi_sync_net_wait_total_time;
+extern ulonglong rpl_semi_sync_trx_wait_total_time;
+extern ulong rpl_semi_sync_clients;
 
 /* Used by handlers to store things in schema tables */
 #define IS_FILES_FILE_ID              0
diff -Nrup a/sql/mysqld.cc b/sql/mysqld.cc
--- a/sql/mysqld.cc	2008-02-27 00:26:40 +08:00
+++ b/sql/mysqld.cc	2008-03-13 16:22:06 +08:00
@@ -33,6 +33,7 @@
 #ifdef HAVE_SYS_PRCTL_H
 #include <sys/prctl.h>
 #endif
+#include "repl_semi_sync.h"
 
 #ifdef WITH_NDBCLUSTER_STORAGE_ENGINE
 #if defined(NOT_ENOUGH_TESTED) \
@@ -645,6 +646,30 @@ static pthread_t select_thread;
 static uint thr_kill_signal;
 #endif
 
+ReplSemiSync semi_sync_replicator;
+
+/* This indicates whether semi-synchronous replication is enabled. */
+ulong rpl_semi_sync_enabled;
+ulong rpl_semi_sync_slave_enabled;
+ulong rpl_semi_sync_timeout;
+ulong rpl_semi_sync_trace_level;
+ulong rpl_semi_sync_status           = 0;
+ulong rpl_semi_sync_slave_status     = 0;
+ulong rpl_semi_sync_yes_transactions = 0;
+ulong rpl_semi_sync_no_transactions  = 0;
+ulong rpl_semi_sync_off_times        = 0;
+ulong rpl_semi_sync_timefunc_fails   = 0;
+ulong rpl_semi_sync_num_timeouts     = 0;
+ulong rpl_semi_sync_wait_sessions    = 0;
+ulong rpl_semi_sync_back_wait_pos    = 0;
+ulong rpl_semi_sync_trx_wait_time    = 0;
+ulonglong rpl_semi_sync_trx_wait_num = 0;
+ulong rpl_semi_sync_net_wait_time    = 0;
+ulonglong rpl_semi_sync_net_wait_num = 0;
+ulong rpl_semi_sync_clients          = 0;
+ulonglong rpl_semi_sync_net_wait_total_time = 0;
+ulonglong rpl_semi_sync_trx_wait_total_time = 0;
+
 /* OS specific variables */
 
 #ifdef __WIN__
@@ -3195,6 +3220,10 @@ static int init_common_variables(const c
   DBUG_PRINT("info",("%s  Ver %s for %s on %s\n",my_progname,
 		     server_version, SYSTEM_TYPE,MACHINE_TYPE));
 
+  /* Must be called after set_options() and MY_INIT(). */
+  if (semi_sync_replicator.initObject() != 0)
+    unireg_abort(1);
+
 #ifdef HAVE_LARGE_PAGES
   /* Initialize large page size */
   if (opt_large_pages && (opt_large_page_size= my_get_large_page_size()))
@@ -5485,7 +5514,11 @@ enum options_mysqld
   OPT_MIN_EXAMINED_ROW_LIMIT,
   OPT_LOG_SLOW_SLAVE_STATEMENTS,
   OPT_OLD_MODE,
-  OPT_SLAVE_EXEC_MODE
+  OPT_SLAVE_EXEC_MODE,
+  OPT_RPL_SEMI_SYNC,
+  OPT_RPL_SEMI_SYNC_SLAVE,
+  OPT_RPL_SEMI_SYNC_TIMEOUT,
+  OPT_RPL_SEMI_SYNC_TRACE
 };
 
 
@@ -6098,6 +6131,29 @@ Can't be set to 1 if --log-slave-updates
   {"rpl-recovery-rank", OPT_RPL_RECOVERY_RANK, "Undocumented.",
    (uchar**) &rpl_recovery_rank, (uchar**) &rpl_recovery_rank, 0, GET_ULONG,
    REQUIRED_ARG, 0, 0, 0, 0, 0, 0},
+  {"rpl_semi_sync_enabled", OPT_RPL_SEMI_SYNC,
+   "1 = Enable semi-synchronous replication. 0 = Disable it",
+   (gptr*) &rpl_semi_sync_enabled,
+   (gptr*) &rpl_semi_sync_enabled, 0, GET_ULONG, REQUIRED_ARG,
+   0, 0, 1, 0, 1, 0},
+  {"rpl_semi_sync_slave_enabled", OPT_RPL_SEMI_SYNC_SLAVE,
+   "1 = Enable semi-synchronous in the slave database.  The slave will be "
+   "the semi-sync replication target",
+   (gptr*) &rpl_semi_sync_slave_enabled,
+   (gptr*) &rpl_semi_sync_slave_enabled, 0, GET_ULONG, REQUIRED_ARG,
+   0, 0, 1, 0, 1, 0},
+  {"rpl_semi_sync_timeout", OPT_RPL_SEMI_SYNC_TIMEOUT,
+   "The timeout value (in ms) for semi-synchronous replication in the master",
+   (gptr*) &rpl_semi_sync_timeout,
+   (gptr*) &rpl_semi_sync_timeout,
+    0, GET_ULONG, REQUIRED_ARG, 10, 0, ~0L, 0, 1, 0},
+  {"rpl_semi_sync_trace_level", OPT_RPL_SEMI_SYNC_TRACE,
+   "The tracing level for semi-sync replication.",
+   (gptr*) &rpl_semi_sync_trace_level,
+   (gptr*) &rpl_semi_sync_trace_level,
+    0, GET_ULONG, REQUIRED_ARG,
+   32,  /* By default, we trace the network waiting time. */
+   0, ~0L, 0, 1, 0},
   {"safe-mode", OPT_SAFE, "Skip some optimize stages (for testing).",
    0, 0, 0, GET_NO_ARG, NO_ARG, 0, 0, 0, 0, 0, 0},
 #ifndef TO_BE_DELETED
@@ -7215,6 +7271,28 @@ SHOW_VAR status_vars[]= {
   {"Questions",                (char*) &show_question,            SHOW_FUNC},
 #ifdef HAVE_REPLICATION
   {"Rpl_status",               (char*) &show_rpl_status,          SHOW_FUNC},
+  {"Rpl_semi_sync_clients",    (char*) &rpl_semi_sync_clients,         SHOW_LONG},
+  {"Rpl_semi_sync_net_avg_wait_time(us)",
+                               (char*) &rpl_semi_sync_net_wait_time,   SHOW_LONG},
+  {"Rpl_semi_sync_net_wait_time",
+                         (char*) &rpl_semi_sync_net_wait_total_time,   SHOW_LONGLONG},
+  {"Rpl_semi_sync_net_waits",  (char*) &rpl_semi_sync_net_wait_num,    SHOW_LONGLONG},
+  {"Rpl_semi_sync_no_times",   (char*) &rpl_semi_sync_off_times,       SHOW_LONG},
+  {"Rpl_semi_sync_no_tx",      (char*) &rpl_semi_sync_no_transactions, SHOW_LONG},
+  {"Rpl_semi_sync_status",     (char*) &rpl_semi_sync_status,          SHOW_LONG},
+  {"Rpl_semi_sync_slave_status", (char*) &rpl_semi_sync_slave_status,  SHOW_LONG},
+  {"Rpl_semi_sync_timefunc_failures",
+                               (char*) &rpl_semi_sync_timefunc_fails,  SHOW_LONG},
+  {"Rpl_semi_sync_tx_avg_wait_time(us)",
+                               (char*) &rpl_semi_sync_trx_wait_time,   SHOW_LONG},
+  {"Rpl_semi_sync_tx_wait_time",
+                         (char*) &rpl_semi_sync_trx_wait_total_time,   SHOW_LONGLONG},
+  {"Rpl_semi_sync_tx_waits",   (char*) &rpl_semi_sync_trx_wait_num,    SHOW_LONGLONG},
+  {"Rpl_semi_sync_wait_pos_backtraverse",
+                               (char*) &rpl_semi_sync_back_wait_pos,   SHOW_LONG},
+  {"Rpl_semi_sync_wait_sessions",
+                               (char*) &rpl_semi_sync_wait_sessions,   SHOW_LONG},
+  {"Rpl_semi_sync_yes_tx",     (char*) &rpl_semi_sync_yes_transactions, SHOW_LONG},
 #endif
   {"Select_full_join",         (char*) offsetof(STATUS_VAR, select_full_join_count), SHOW_LONG_STATUS},
   {"Select_full_range_join",   (char*) offsetof(STATUS_VAR, select_full_range_join_count), SHOW_LONG_STATUS},
diff -Nrup a/sql/repl_semi_sync.cc b/sql/repl_semi_sync.cc
--- /dev/null	Wed Dec 31 16:00:00 196900
+++ b/sql/repl_semi_sync.cc	2008-03-13 16:22:07 +08:00
@@ -0,0 +1,1224 @@
+/* Copyright (C) 2007 Google Inc.
+
+This program is free software; you can redistribute it and/or
+modify it under the terms of the GNU General Public License
+as published by the Free Software Foundation; either version 2
+of the License, or (at your option) any later version.
+
+This program is distributed in the hope that it will be useful,
+but WITHOUT ANY WARRANTY; without even the implied warranty of
+MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+GNU General Public License for more details.
+
+You should have received a copy of the GNU General Public License
+along with this program; if not, write to the Free Software
+Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA  02110-1301, USA.
+*/
+
+/* The file defines two classes that implement semi-sync replication based on
+ * MySQL's asynchronous replication:
+ *  . ReplSemiSync::ActiveTranx: manage all active transaction nodes
+ *  . ReplSemiSync: the code flow for semi-sync replication
+ * 
+ * By default in semi-sync replication, a transaction waits for 10ms to see
+ * whether the slave has got the transaction.  10ms is based on the assumption
+ * that roundtrip time in one datacenter is less than 1ms and machine
+ * configurations should make the master database and the semi-sync slave
+ * database colocate in one data center.  Otherwise, "rpl_semi_sync_timeout"
+ * should be used to adjust timeout value.
+ *
+ */
+
+#include "mysql_priv.h"
+#include <my_dir.h>
+#include "repl_semi_sync.h"
+
+#define TIME_THOUSAND 1000
+#define TIME_MILLION  1000000
+#define TIME_BILLION  1000000000
+
+const unsigned char ReplSemiSync::kPacketMagicNum = 0xef;
+const unsigned char ReplSemiSync::kPacketFlagSync = 0x01;
+
+const ulong ReplSemiSync::kTraceGeneral  = 0x0001;
+const ulong ReplSemiSync::kTraceDetail   = 0x0010;
+const ulong ReplSemiSync::kTraceNetWait  = 0x0020;
+const ulong ReplSemiSync::kTraceFunction = 0x0040;
+
+const char  ReplSemiSync::kSyncHeader[3] =
+  {0, ReplSemiSync::kPacketMagicNum, 0};
+
+static int getWaitTime(const struct timeval& start_tv);
+
+/*******************************************************************************
+ *
+ * <ReplSemiSync::ActiveTranx> class : manage all active transaction nodes
+ *
+ ******************************************************************************/
+
+ReplSemiSync::ActiveTranx::ActiveTranx(int max_connections,
+                                       pthread_mutex_t *lock,
+                                       ulong *trace_level)
+  : num_transactions_(max_connections),
+    num_entries_(max_connections << 1),
+    lock_(lock), trace_level_(trace_level) {
+  /* Allocate the memory for the array */
+  node_array_ = new TranxNode[num_transactions_];
+  for (int idx = 0; idx < num_transactions_; ++idx) {
+    node_array_[idx].log_pos_     = 0;
+    node_array_[idx].hash_next_   = NULL;
+    node_array_[idx].next_        = node_array_ + idx + 1;
+
+    node_array_[idx].log_name_    = new char[FN_REFLEN];
+    node_array_[idx].log_name_[0] = '\x0';
+  }
+  node_array_[num_transactions_-1].next_ = NULL;
+
+  /* All nodes in the array go to the pool initially. */
+  free_pool_ = node_array_;
+
+  /* No transactions are in the list initially. */
+  trx_front_ = NULL;
+  trx_rear_  = NULL;
+
+  /* Create the hash table to find a transaction's ending event. */
+  trx_htb_ = new TranxNode *[num_entries_];
+  for (int idx = 0; idx < num_entries_; ++idx)
+    trx_htb_[idx] = NULL;
+
+  sql_print_information("Semi-sync replication initialized for %d "
+                        "transactions.", num_transactions_);
+}
+
+ReplSemiSync::ActiveTranx::~ActiveTranx() {  /* release per-node names, then both arrays */
+  for (int idx = 0; idx < num_transactions_; ++idx) {
+    delete [] node_array_[idx].log_name_;  /* allocated with new char[FN_REFLEN] */
+    node_array_[idx].log_name_ = NULL;
+  }
+
+  delete [] node_array_;
+  delete [] trx_htb_;
+
+  node_array_       = NULL;
+  trx_htb_          = NULL;
+  num_transactions_ = 0;
+  num_entries_      = 0;
+}
+
+void ReplSemiSync::ActiveTranx::assert_lock_owner() {
+  safe_mutex_assert_owner(lock_);
+}
+
+void ReplSemiSync::ActiveTranx::function_enter(const char *func_name) {
+  if ((*trace_level_) & kTraceFunction)
+    sql_print_information("---> %s enter", func_name);
+}
+
+int ReplSemiSync::ActiveTranx::function_exit(const char *func_name,
+                                             int exit_code) {
+  if ((*trace_level_) & kTraceFunction)
+    sql_print_information("<--- %s exit (%d)", func_name, exit_code);
+  return exit_code;
+}
+
+uint ReplSemiSync::ActiveTranx::calc_hash(const byte *key, uint length) {
+  uint nr = 1, nr2 = 4;
+
+  /* The hash implementation comes from calc_hashnr() in mysys/hash.c. */
+  while (length--) {
+    nr  ^= (((nr & 63)+nr2)*((uint) (uchar) *key++))+ (nr << 8);
+    nr2 += 3;
+  }
+  return((uint) nr);
+}
+
+uint ReplSemiSync::ActiveTranx::get_hash_value(const char *log_file_name,
+                                               my_off_t    log_file_pos) {
+  uint hash1 = calc_hash((const byte *)log_file_name,
+                         strlen(log_file_name));
+  uint hash2 = calc_hash((const byte *)(&log_file_pos),
+                         sizeof(log_file_pos));
+
+  return (hash1 + hash2) % num_entries_;
+}
+
+ReplSemiSync::ActiveTranx::TranxNode*
+ReplSemiSync::ActiveTranx::alloc_tranx_node() {
+  TranxNode *ptr = free_pool_;
+
+  if (free_pool_) {
+    free_pool_ = free_pool_->next_;
+    ptr->next_ = NULL;
+    ptr->hash_next_ = NULL;
+  }
+
+  return ptr;
+}
+
+int ReplSemiSync::ActiveTranx::compare(
+      const char *log_file_name1, my_off_t log_file_pos1,
+      const char *log_file_name2, my_off_t log_file_pos2) {
+  int cmp = strcmp(log_file_name1, log_file_name2);
+
+  if (cmp != 0)
+    return cmp;
+
+  if (log_file_pos1 > log_file_pos2)
+    return 1;
+  else if (log_file_pos1 < log_file_pos2)
+    return -1;
+  return 0;
+}
+
+int ReplSemiSync::ActiveTranx::insert_tranx_node(
+        const char *log_file_name, my_off_t log_file_pos) {
+  const char *kWho = "ActiveTranx:insert_tranx_node";
+  TranxNode  *ins_node;
+  int         result = 0;  /* 0 on success, -1 on pool exhaustion or bad order */
+  uint        hash_val;
+
+  function_enter(kWho);
+  assert_lock_owner();
+
+  ins_node = alloc_tranx_node();
+  if (!ins_node) {
+    sql_print_error("%s: transaction node allocation failed for: (%s, %lu)",
+                    kWho, log_file_name, (ulong)log_file_pos);
+    result = -1;
+    goto l_end;
+  }
+
+  /* Record the binlog position in the node before linking it anywhere. */
+  strcpy(ins_node->log_name_, log_file_name);
+  ins_node->log_pos_ = log_file_pos;
+
+  if (!trx_front_) {
+    /* The list is empty. */
+    trx_front_ = trx_rear_ = ins_node;
+  } else {
+    int cmp = compare(ins_node, trx_rear_);
+    if (cmp > 0) {
+      /* Compare with the tail first.  If the transaction happens later in
+       * binlog, then make it the new tail.
+       */
+      trx_rear_->next_ = ins_node;
+      trx_rear_        = ins_node;
+    } else {
+      /* Otherwise, it is an error because the transaction should hold the
+       * mysql_bin_log.LOCK_log when appending events.  Positions are cast
+       * to ulong, so they are printed with %lu like the other messages. */
+      sql_print_error("%s: binlog write out-of-order, tail (%s, %lu), "
+                      "new node (%s, %lu)", kWho,
+                      trx_rear_->log_name_, (ulong)trx_rear_->log_pos_,
+                      ins_node->log_name_, (ulong)ins_node->log_pos_);
+      result = -1;
+      goto l_end;
+    }
+  }
+
+  hash_val = get_hash_value(ins_node->log_name_, ins_node->log_pos_);
+  ins_node->hash_next_ = trx_htb_[hash_val];  /* push on the bucket's chain head */
+  trx_htb_[hash_val]   = ins_node;
+
+  if ((*trace_level_) & kTraceDetail)
+    sql_print_information("%s: insert (%s, %lu) in entry(%u)", kWho,
+                          ins_node->log_name_, (ulong)ins_node->log_pos_,
+                          hash_val);
+
+ l_end:
+  return function_exit(kWho, result);
+}
+
+bool ReplSemiSync::ActiveTranx::is_tranx_end_pos(const char *log_file_name,
+                                                 my_off_t    log_file_pos) {
+  const char *kWho = "ReplSemiSync::is_tranx_end_pos";
+  function_enter(kWho);
+
+  uint hash_val = get_hash_value(log_file_name, log_file_pos);
+  TranxNode *entry = trx_htb_[hash_val];
+
+  assert_lock_owner();
+  while (entry != NULL) {
+    if (compare(entry, log_file_name, log_file_pos) == 0) {
+      break;
+    }
+    entry = entry->hash_next_;
+  }
+
+  if ((*trace_level_) & kTraceDetail)
+    sql_print_information("%s: probe (%s, %lu) in entry(%u)", kWho,
+                          log_file_name, (ulong)log_file_pos, hash_val);
+
+  function_exit(kWho, (entry != NULL));
+  return (entry != NULL);
+}
+
+int ReplSemiSync::ActiveTranx::clear_active_tranx_nodes(
+        const char *log_file_name, my_off_t log_file_pos) {
+  const char *kWho = "ActiveTranx::clear_active_tranx_nodes";
+  TranxNode *new_front;
+
+  function_enter(kWho);
+
+  /* Must hold the lock during the call. */
+  assert_lock_owner();
+
+  if (log_file_name != NULL) {
+    new_front = trx_front_;
+
+    while (new_front) {  /* find the first node strictly after the given position */
+      if (compare(new_front, log_file_name, log_file_pos) > 0)
+        break;
+      new_front = new_front->next_;
+    }
+  } else {
+    /* If log_file_name is NULL, clear everything. */
+    new_front = NULL;
+  }
+
+  if (new_front == NULL) {
+    /* No active transaction nodes after the call. */
+
+    /* Clear the hash table. */
+    memset(trx_htb_, 0, num_entries_ * sizeof(TranxNode *));
+
+    /* Splice the whole active list onto the front of the free pool. */
+    if (trx_front_ != NULL) {
+      trx_rear_->next_ = free_pool_;
+      free_pool_ = trx_front_;
+      trx_front_ = NULL;
+      trx_rear_  = NULL;
+    }
+
+    if ((*trace_level_) & kTraceDetail)
+      sql_print_information("%s: free all nodes back to free list", kWho);
+  } else if (new_front != trx_front_) {
+    TranxNode *curr_node, *next_node;
+
+    /* Delete all transaction nodes before the confirmation point. */
+    int n_frees = 0;
+    curr_node = trx_front_;
+    while (curr_node != new_front) {
+      next_node = curr_node->next_;
+
+      /* Put the node in the memory pool. */
+      curr_node->next_ = free_pool_;
+      free_pool_       = curr_node;
+      n_frees++;
+
+      /* Remove the node from the hash table. */
+      uint hash_val = get_hash_value(curr_node->log_name_, curr_node->log_pos_);
+      TranxNode **hash_ptr = &(trx_htb_[hash_val]);
+      while ((*hash_ptr) != NULL) {
+        if ((*hash_ptr) == curr_node) {
+          (*hash_ptr) = curr_node->hash_next_;
+          break;
+        }
+        hash_ptr = &((*hash_ptr)->hash_next_);
+      }
+
+      curr_node = next_node;
+    }
+
+    trx_front_ = new_front;
+
+    if ((*trace_level_) & kTraceDetail)
+      sql_print_information("%s: free %d nodes back until pos (%s, %lu)",
+                            kWho, n_frees,
+                            trx_front_->log_name_, (ulong)trx_front_->log_pos_);
+  }
+
+  return function_exit(kWho, 0);
+}
+
+
+/*******************************************************************************
+ *
+ * <ReplSemiSync> class: the basic code layer for sync-replication.
+ *
+ * The most important functions during semi-sync replication are listed below:
+ *
+ * Master:
+ *  . reportReplyBinlog(): called by the binlog dump thread when it receives
+ *                         the slave's status information.
+ *  . updateSyncHeader():  based on transaction waiting information, decide
+ *                         whether to request the slave to reply.
+ *  . readSlaveReply():    read the slave's sync reply and decide how to
+ *                         resume the waiting transaction threads.
+ *  . writeTraxInBinlog(): called by the transaction thread when it finishes
+ *                         writing all transaction events in binlog.
+ *  . commitTrx():         transaction thread wait for the slave reply.
+ * 
+ * Slave:
+ *  . slaveReadSyncHeader(): read the semi-sync header from the master, get the
+ *                         sync status and get the payload for events.
+ *  . slaveReply():        reply to the master about the replication progress.
+ *
+ ******************************************************************************/
+
+ReplSemiSync::ReplSemiSync()
+  : active_tranxs_(NULL),
+    init_done_(false),
+    reply_file_name_inited_(false),
+    reply_file_pos_(0L),
+    wait_file_name_inited_(false),
+    wait_file_pos_(0),
+    master_enabled_(false),
+    slave_enabled_(false),
+    wait_timeout_(0L),
+    trace_level_(0L),
+    state_(0),
+    enabled_transactions_(0),
+    disabled_transactions_(0),
+    timefunc_fails_(0),
+    switched_off_times_(0),
+    wait_sessions_(0),
+    wait_backtraverse_(0),
+    total_trx_wait_num_(0),
+    total_trx_wait_time_(0),
+    total_net_wait_num_(0),
+    total_net_wait_time_(0),
+    max_transactions_(0L) {
+  strcpy(reply_file_name_, "");
+  strcpy(wait_file_name_, "");
+}
+
+int ReplSemiSync::initObject() {
+  int result;
+  const char *kWho = "ReplSemiSync::initObject";
+
+  if (init_done_) {  /* refuse a second initialization of the same object */
+    fprintf(stderr, "%s called twice\n", kWho);
+    unireg_abort(1);
+  }
+  init_done_ = true;
+
+  /* The option variables read below are valid only after set_options(). */
+  setSlaveEnabled(rpl_semi_sync_slave_enabled);
+  setWaitTimeout(rpl_semi_sync_timeout);
+  setTraceLevel(rpl_semi_sync_trace_level);
+  max_transactions_ = (int)max_connections;
+
+  /* Mutex initialization can only be done after MY_INIT(). */
+  pthread_mutex_init(&LOCK_binlog_, MY_MUTEX_INIT_FAST);
+  pthread_cond_init(&COND_binlog_send_, NULL);
+
+  if (rpl_semi_sync_enabled)
+    result = enableMaster();
+  else
+    result = disableMaster();
+
+  return result;
+}
+
+int ReplSemiSync::enableMaster() {
+  int result = 0;  /* 0 on success or if already enabled, -1 if allocation failed */
+
+  /* Must have the lock when we enable or disable. */
+  lock();
+
+  if (!getMasterEnabled()) {
+    DBUG_ASSERT(active_tranxs_ == NULL);
+    active_tranxs_ = new ReplSemiSync::ActiveTranx(max_connections,
+                                                   &LOCK_binlog_,
+                                                   &trace_level_);
+    if (active_tranxs_ != NULL) {
+      reply_file_name_inited_  = false;
+      wait_file_name_inited_   = false;
+      commit_file_name_inited_ = false;
+
+      set_master_enabled(true);
+      sql_print_information("Semi-sync replication enabled on the master.");
+    } else {
+      sql_print_error("Semi-sync replication not able to allocate memory.");
+      result = -1;
+    }
+  }
+
+  unlock();
+
+  return result;
+}
+
+int ReplSemiSync::disableMaster() {
+  /* Must have the lock when we enable or disable. */
+  lock();
+
+  if (getMasterEnabled()) {
+    /* Switch off semi-sync first so that waiting transactions are woken
+     * up before the active transaction table is destroyed.
+     */
+    switch_off();
+
+    DBUG_ASSERT(active_tranxs_ != NULL);
+    delete active_tranxs_;
+    active_tranxs_ = NULL;
+
+    reply_file_name_inited_ = false;
+    wait_file_name_inited_  = false;
+    commit_file_name_inited_ = false;
+
+    set_master_enabled(false);
+    sql_print_information("Semi-sync replication disabled on the master.");
+  }
+
+  unlock();
+
+  return 0;  /* always succeeds */
+}
+
+/* Destructor: releases the synchronization objects and the
+ * active-transaction container.
+ */
+ReplSemiSync::~ReplSemiSync() {
+  /* The mutex and condition variable only exist once initObject() has run. */
+  if (init_done_) {
+    pthread_mutex_destroy(&LOCK_binlog_);
+    pthread_cond_destroy(&COND_binlog_send_);
+  }
+
+  /* NOTE(review): the container was constructed with a pointer to
+   * LOCK_binlog_, which is already destroyed at this point -- confirm that
+   * the ActiveTranx destructor does not use the mutex.
+   */
+  delete active_tranxs_;
+}
+
+/* Acquire LOCK_binlog_; the return value of the pthread call is ignored. */
+void ReplSemiSync::lock() {
+  (void) pthread_mutex_lock(&LOCK_binlog_);
+}
+
+/* Release LOCK_binlog_; the return value of the pthread call is ignored. */
+void ReplSemiSync::unlock() {
+  (void) pthread_mutex_unlock(&LOCK_binlog_);
+}
+
+/* Wake up every thread waiting on COND_binlog_send_. */
+void ReplSemiSync::cond_broadcast() {
+  (void) pthread_cond_broadcast(&COND_binlog_send_);
+}
+
+/* Wait on COND_binlog_send_ (paired with LOCK_binlog_) until it is
+ * signalled or the absolute time *wait_time is reached.
+ *
+ * Return:
+ *  the result of pthread_cond_timedwait()
+ */
+int ReplSemiSync::cond_timewait(struct timespec *wait_time) {
+  const char *kWho = "ReplSemiSync::cond_timewait()";
+
+  function_enter(kWho);
+  const int rc = pthread_cond_timedwait(&COND_binlog_send_,
+                                        &LOCK_binlog_, wait_time);
+  return function_exit(kWho, rc);
+}
+
+/* Log entry into func_name when function tracing is switched on. */
+void ReplSemiSync::function_enter(const char *func_name) {
+  if (!(trace_level_ & kTraceFunction))
+    return;
+
+  sql_print_information("---> %s enter", func_name);
+}
+
+/* Log exit from func_name when function tracing is switched on.
+ *
+ * Return:
+ *  exit_code, unchanged, so callers can write "return function_exit(...)"
+ */
+int ReplSemiSync::function_exit(const char *func_name, int exit_code) {
+  const bool tracing = (trace_level_ & kTraceFunction) != 0;
+
+  if (tracing)
+    sql_print_information("<--- %s exit (%d)", func_name, exit_code);
+
+  return exit_code;
+}
+
+/* Record a binlog position acknowledged by a semi-sync slave.
+ *
+ * Advances the stored reply position if the new one is not behind it,
+ * prunes the active transaction nodes up to that position, and wakes the
+ * waiting sessions when the reply position has reached the recorded wait
+ * position.
+ *
+ * Input:
+ *  thd           - (IN)  the binlog dump thread that received the reply
+ *  log_file_name - (IN)  binlog file name reported by the slave
+ *  log_file_pos  - (IN)  position inside that file
+ *
+ * Return:
+ *  always 0
+ */
+int ReplSemiSync::reportReplyBinlog(THD      *thd,
+                                    char     *log_file_name,
+                                    my_off_t  log_file_pos) {
+  const char *kWho = "ReplSemiSync::reportReplyBinlog";
+  int   cmp;
+  bool  can_release_threads = false;
+  bool  need_copy_send_pos = true;
+
+  /* If semi-sync replication is not enabled, or this thd is
+   * sending binlog to a slave where we do not need synchronous replication,
+   * then return immediately */
+  if (!(getMasterEnabled() && thd->semi_sync_slave)) {
+    return 0;
+  }
+
+  function_enter(kWho);
+
+  lock();
+
+  /* This is the real check inside the mutex. */
+  if (!getMasterEnabled())
+    goto l_end;
+
+  if (!is_on()) {
+    /* We check to see whether we can switch semi-sync ON. */
+    try_switch_on(thd->server_id, log_file_name, log_file_pos);
+  }
+
+  /* The position should increase monotonically, if there is only one
+   * thread sending the binlog to the slave.
+   * In reality, to improve the transaction availability, we allow multiple
+   * sync replication slaves.  So, if any one of them get the transaction,
+   * the transaction session in the primary can move forward.
+   */
+  if (reply_file_name_inited_) {
+    cmp = ActiveTranx::compare(log_file_name, log_file_pos,
+                               reply_file_name_, reply_file_pos_);
+
+    /* If the reported position is behind the recorded reply position, do
+     * not adjust the recorded position.
+     * This is based on the assumption that there are multiple semi-sync
+     * slaves, and at least one of them should be up to date.
+     * If all semi-sync slaves are behind, at least initially, the primary
+     * can find the situation after the waiting timeout.  After that, some
+     * slaves should catch up quickly.
+     */
+    if (cmp < 0) {
+      /* If the position is behind, do not copy it. */
+      need_copy_send_pos = false;
+    }
+  }
+
+  if (need_copy_send_pos) {
+    /* NOTE(review): unbounded copy -- assumes log_file_name fits into
+     * reply_file_name_; confirm the buffer size in the class declaration.
+     */
+    strcpy(reply_file_name_, log_file_name);
+    reply_file_pos_ = log_file_pos;
+    reply_file_name_inited_ = true;
+
+    /* Remove all active transaction nodes before this point. */
+    DBUG_ASSERT(active_tranxs_ != NULL);
+    active_tranxs_->clear_active_tranx_nodes(log_file_name, log_file_pos);
+
+    if (trace_level_ & kTraceDetail)
+      sql_print_information("%s: Got reply at (%s, %lu)", kWho,
+                            log_file_name, (ulong)log_file_pos);
+  }
+
+  if (wait_sessions_ > 0) {
+    /* Let us check if some of the waiting threads doing a trx
+     * commit can now proceed.
+     */
+    cmp = ActiveTranx::compare(reply_file_name_, reply_file_pos_,
+                               wait_file_name_, wait_file_pos_);
+    if (cmp >= 0) {
+      /* Yes, at least one waiting thread can now proceed:
+       * let us release all waiting threads with a broadcast
+       */
+      can_release_threads = true;
+      wait_file_name_inited_ = false;
+    }
+  }
+
+ l_end:
+  unlock();
+
+  /* The broadcast is issued after the mutex has been released. */
+  if (can_release_threads) {
+    if (trace_level_ & kTraceDetail)
+      sql_print_information("%s: signal all waiting threads.", kWho);
+
+    cond_broadcast();
+  }
+
+  return function_exit(kWho, 0);
+}
+
+/* Hold a committing session until its binlog position has been acknowledged.
+ *
+ * When the master side is enabled and a wait position is supplied, the
+ * caller sleeps on the condition variable until the recorded reply position
+ * reaches (trx_wait_binlog_name, trx_wait_binlog_pos), the wait times out,
+ * or semi-sync is switched off.  A timeout or a gettimeofday() failure
+ * switches semi-sync off.
+ *
+ * Input:
+ *  trx_wait_binlog_name - (IN)  binlog file name to wait for; NULL skips
+ *                               the wait entirely
+ *  trx_wait_binlog_pos  - (IN)  position inside that file
+ *
+ * Return:
+ *  always 0
+ */
+int ReplSemiSync::commitTrx(const char* trx_wait_binlog_name,
+                            my_off_t    trx_wait_binlog_pos) {
+  const char *kWho = "ReplSemiSync::commitTrx";
+
+  function_enter(kWho);
+
+  if (getMasterEnabled() && trx_wait_binlog_name) {
+    struct timeval start_tv;
+    struct timespec abstime;
+    int wait_result, start_time_err;
+
+    start_time_err = gettimeofday(&start_tv, 0);
+
+    /* Acquire the mutex. */
+    lock();
+
+    /* This is the real check inside the mutex. */
+    if (!getMasterEnabled())
+      goto l_end;
+
+    if (trace_level_ & kTraceDetail) {
+      sql_print_information("%s: wait pos (%s, %lu), repl(%d)\n", kWho,
+                            trx_wait_binlog_name, (ulong)trx_wait_binlog_pos,
+                            (int)is_on());
+    }
+
+    while (is_on()) {
+      int cmp = ActiveTranx::compare(reply_file_name_, reply_file_pos_,
+                                     trx_wait_binlog_name, trx_wait_binlog_pos);
+      if (cmp >= 0) {
+        /* We have already sent the relevant binlog to the slave: no need to
+         * wait here.
+         */
+        if (trace_level_ & kTraceDetail)
+          sql_print_information("%s: Binlog reply is ahead (%s, %lu),",
+                                kWho, reply_file_name_, (ulong)reply_file_pos_);
+        break;
+      }
+
+      /* Let us update the info about the minimum binlog position of waiting
+       * threads.
+       */
+      if (wait_file_name_inited_) {
+        cmp = ActiveTranx::compare(trx_wait_binlog_name, trx_wait_binlog_pos,
+                                   wait_file_name_, wait_file_pos_);
+        if (cmp <= 0) {
+          /* This thd has a lower position, let's update the minimum info. */
+          strcpy(wait_file_name_, trx_wait_binlog_name);
+          wait_file_pos_ = trx_wait_binlog_pos;
+
+          wait_backtraverse_++;
+          if (trace_level_ & kTraceDetail)
+            sql_print_information("%s: move back wait position (%s, %lu),",
+                                  kWho, wait_file_name_, (ulong)wait_file_pos_);
+        }
+      } else {
+        strcpy(wait_file_name_, trx_wait_binlog_name);
+        wait_file_pos_ = trx_wait_binlog_pos;
+        wait_file_name_inited_ = true;
+
+        if (trace_level_ & kTraceDetail)
+          sql_print_information("%s: init wait position (%s, %lu),",
+                                kWho, wait_file_name_, (ulong)wait_file_pos_);
+      }
+
+      if (start_time_err == 0) {
+        /* Calculate the absolute deadline: start time plus the timeout (ms).
+         * The arithmetic is done in 64 bits so that a large timeout cannot
+         * overflow; the deadline is the same on every loop iteration because
+         * it is always derived from start_tv.
+         */
+        ulonglong diff_usecs = (ulonglong)start_tv.tv_usec +
+                               (ulonglong)wait_timeout_ * TIME_THOUSAND;
+
+        abstime.tv_sec  = start_tv.tv_sec + (time_t)(diff_usecs / TIME_MILLION);
+        abstime.tv_nsec = (long)((diff_usecs % TIME_MILLION) * TIME_THOUSAND);
+
+        /* In semi-synchronous replication, we wait until the binlog-dump
+         * thread has received the reply on the relevant binlog segment from the
+         * replication slave.
+         *
+         * Let us suspend this thread to wait on the condition;
+         * when replication has progressed far enough, we will release
+         * these waiting threads.
+         */
+        wait_sessions_++;
+
+        if (trace_level_ & kTraceDetail)
+          sql_print_information("%s: wait %lu ms for binlog sent (%s, %lu)",
+                                kWho, wait_timeout_,
+                                wait_file_name_, (ulong)wait_file_pos_);
+
+        DBUG_PRINT("info", ("Waiting for binlog to be sent"));
+        wait_result = cond_timewait(&abstime);
+        wait_sessions_--;
+
+        if (wait_result != 0) {
+          if (trace_level_ & kTraceGeneral) {
+            /* This is a real wait timeout. */
+            sql_print_error("Replication semi-sync not sent binlog to "
+                            "slave within the timeout %lu ms - OFF.",
+                            wait_timeout_);
+            sql_print_error("          semi-sync up to file %s, position %lu",
+                            reply_file_name_, (ulong)reply_file_pos_);
+            /* The argument type must match %lu. */
+            sql_print_error("          transaction needs file %s, position %lu",
+                            trx_wait_binlog_name, (ulong)trx_wait_binlog_pos);
+          }
+          total_wait_timeouts_++;
+
+          /* switch semi-sync off */
+          switch_off();
+        } else {
+          int wait_time;
+
+          wait_time = getWaitTime(start_tv);
+          if (wait_time < 0) {
+            if (trace_level_ & kTraceGeneral) {
+              /* This is a time/gettimeofday function call error. */
+              sql_print_error("Replication semi-sync gettimeofday fail1 at "
+                              "wait position (%s, %lu)",
+                              trx_wait_binlog_name, (ulong)trx_wait_binlog_pos);
+            }
+            timefunc_fails_++;
+          } else {
+            total_trx_wait_num_++;
+            total_trx_wait_time_ += wait_time;
+          }
+        }
+      } else {
+        if (trace_level_ & kTraceGeneral) {
+          /* This is a gettimeofday function call error. */
+          sql_print_error("Replication semi-sync gettimeofday fail2 at "
+                          "wait position (%s, %lu)",
+                          trx_wait_binlog_name, (ulong)trx_wait_binlog_pos);
+        }
+        timefunc_fails_++;
+
+        /* switch semi-sync off */
+        switch_off();
+      }
+    }
+
+  l_end:
+    /* Update the status counter. */
+    if (is_on())
+      enabled_transactions_++;
+    else
+      disabled_transactions_++;
+
+    unlock();
+  }
+
+  return function_exit(kWho, 0);
+}
+
+/* Indicate that semi-sync replication is OFF now.
+ * 
+ * What should we do when it is disabled?  The problem is that we want
+ * the semi-sync replication enabled again when the slave catches up
+ * later.  But, it is not that easy to detect that the slave has caught
+ * up.  This is caused by the fact that MySQL's replication protocol is
+ * asynchronous, meaning that if the master does not use the semi-sync
+ * protocol, the slave would not send anything to the master.
+ * Still, if the master is sending (N+1)-th event, we assume that it is
+ * an indicator that the slave has received N-th event and earlier ones.
+ *
+ * If semi-sync is disabled, all transactions still update the wait
+ * position with the last position in binlog.  But no transactions will
+ * wait for confirmations and the active transaction list would not be
+ * maintained.  In binlog dump thread, updateSyncHeader() checks whether
+ * the current sending event catches up with last wait position.  If it
+ * does match, semi-sync will be switched on again.
+ */
+/* Turn semi-sync OFF: drop all active transaction nodes, forget the
+ * recorded binlog positions and wake every waiting session.
+ *
+ * Return:
+ *  the result of clearing the active transaction list
+ */
+int ReplSemiSync::switch_off() {
+  const char *kWho = "ReplSemiSync::switch_off";
+
+  function_enter(kWho);
+  state_ = false;
+
+  /* Clear the active transaction list. */
+  DBUG_ASSERT(active_tranxs_ != NULL);
+  const int clear_res = active_tranxs_->clear_active_tranx_nodes(NULL, 0);
+
+  switched_off_times_++;
+
+  /* None of the recorded positions is meaningful any more. */
+  wait_file_name_inited_   = false;
+  reply_file_name_inited_  = false;
+  commit_file_name_inited_ = false;
+
+  /* Wake up all waiting threads. */
+  cond_broadcast();
+
+  return function_exit(kWho, clear_res);
+}
+
+/* Switch semi-sync ON if the given event position has caught up.
+ *
+ * Input:
+ *  server_id     - (IN)  id of the slave, used for tracing only
+ *  log_file_name - (IN)  binlog file name of the current event
+ *  log_file_pos  - (IN)  position of the current event
+ *
+ * Return:
+ *  always 0
+ */
+int ReplSemiSync::try_switch_on(int server_id, const char *log_file_name,
+                                my_off_t log_file_pos) {
+  const char *kWho = "ReplSemiSync::try_switch_on";
+
+  function_enter(kWho);
+
+  /* With no recorded commit position there are no recent transactions and
+   * semi-sync can be enabled immediately.  Otherwise the current event must
+   * be at or beyond the 'largest' committed transaction position, which
+   * means the slave is catching up.
+   */
+  bool caught_up = true;
+  if (commit_file_name_inited_) {
+    caught_up = (ActiveTranx::compare(log_file_name, log_file_pos,
+                                      commit_file_name_,
+                                      commit_file_pos_) >= 0);
+  }
+
+  if (caught_up) {
+    /* Switch semi-sync replication on. */
+    state_ = true;
+
+    if (trace_level_ & kTraceGeneral)
+      sql_print_information("%s switch semi-sync ON with server(%d) "
+                            "at (%s, %lu), repl(%d)",
+                            kWho, server_id, log_file_name,
+                            (ulong)log_file_pos, (int)is_on());
+  }
+
+  return function_exit(kWho, 0);
+}
+
+/* Reset the packet and write the leading header bytes for the next event.
+ *
+ * A semi-sync slave gets the kSyncHeader bytes; any other slave gets a
+ * single zero byte.
+ */
+void ReplSemiSync::reserveSyncHeader(String *packet, THD *thd) {
+  const char *kWho = "ReplSemiSync::reserveSyncHeader";
+  function_enter(kWho);
+
+  packet->length(0);
+  if (thd->semi_sync_slave) {
+    /* Set the magic number and the sync status.  By default, no sync
+     * is required.
+     */
+    packet->append(kSyncHeader, sizeof(kSyncHeader));
+  } else {
+    packet->append("\0", 1);
+  }
+
+  function_exit(kWho, 0);
+}
+
+/* Decide whether the event about to be sent needs a reply from the slave
+ * and, if so, set the sync flag in the packet header.
+ *
+ * Input:
+ *  packet        - (IN/OUT) the packet holding the header and the event
+ *  log_file_name - (IN)     binlog file name of the event
+ *  log_file_pos  - (IN)     position of the event
+ *  thd           - (IN)     the binlog dump thread
+ *  sync          - (OUT)    true if a reply is requested for this event
+ *  event_type    - (OUT)    the event type byte read from the packet
+ *
+ * Return:
+ *  always 0
+ */
+int ReplSemiSync::updateSyncHeader(String         *packet,
+                                   const char     *log_file_name,
+                                   my_off_t        log_file_pos,
+                                   THD            *thd,
+                                   bool           *sync,
+                                   Log_event_type *event_type) {
+  const char *kWho = "ReplSemiSync::updateSyncHeader";
+  int  cmp = 0;
+
+  /* If the semi-sync master is not enabled, or the slave is not a semi-sync
+   * target, do not request replies from the slave.
+   *
+   * NOTE(review): this path reads the event type at LOG_EVENT_OFFSET+1 while
+   * the end of the function uses LOG_EVENT_OFFSET+3.  reserveSyncHeader()
+   * writes sizeof(kSyncHeader) header bytes whenever thd->semi_sync_slave is
+   * set, regardless of whether the master is enabled, so +1 looks wrong for
+   * a semi-sync slave on a disabled master -- confirm against the size of
+   * kSyncHeader.
+   */
+  if (!getMasterEnabled() || !thd->semi_sync_slave) {
+    *sync       = false;
+    *event_type = (Log_event_type)((*packet)[LOG_EVENT_OFFSET+1]);
+    return 0;
+  }
+
+  function_enter(kWho);
+
+  lock();
+
+  /* This is the real check inside the mutex. */
+  if (!getMasterEnabled()) {
+    *sync = false;
+    goto l_end;
+  }
+
+  if (is_on()) {                                           /* semi-sync is ON */
+    *sync = false;               /* No sync unless a transaction is involved. */
+
+    if (reply_file_name_inited_) {
+      cmp = ActiveTranx::compare(log_file_name, log_file_pos,
+                                 reply_file_name_, reply_file_pos_);
+      if (cmp <= 0) {
+        /* If we have already got the reply for the event, then we do
+         * not need to sync the transaction again.
+         */
+        cmp = -1;
+      }
+    }
+
+    /* cmp stays 0 when no reply position has been recorded yet. */
+    if (cmp >= 0) {
+      if (wait_file_name_inited_) {
+        cmp = ActiveTranx::compare(log_file_name, log_file_pos,
+                                   wait_file_name_, wait_file_pos_);
+      } else {
+        cmp = 1;
+      }
+
+      if (cmp >= 0) {
+        /* We are going to send an event which has not reached the final
+         * commit point inside InnoDB.
+         * We need the reply from the slave because soon the transaction
+         * should wait for the reply when it reaches the end of the
+         * commit.
+         *
+         * We only wait if the event is a transaction's ending event.
+         */
+        DBUG_ASSERT(active_tranxs_ != NULL);
+        *sync = active_tranxs_->is_tranx_end_pos(log_file_name,
+                                                 log_file_pos);
+      } else {
+        /* If we are already waiting for some transaction replies which
+         * are later in binlog, do not wait for this one event.
+         */
+      }
+    }
+  } else {                                                /* semi-sync is OFF */
+    /* Check to see whether we can switch semi-sync ON. */
+    try_switch_on(thd->server_id, log_file_name, log_file_pos);
+
+    /* We must request sync reply for the current event no matter whether it
+     * is the end of a transaction.
+     * Here is the problematic situation:
+     *  . writeTranxInBinlog(): update commit_file_* base on transaction-A
+     *  . updateSyncHeader(): switch on semi-sync replication
+     *  . commitTrx(): we would wait until timeout for transaction-A for
+     *                 which binlog_dump thread never requests replies
+     *
+     * Also, it is advantageous that we update commit_file_* inside function
+     * writeTranxInBinlog().  Because commit_file_* indicates the last
+     * transaction in binlog and the current event must be equal or behind
+     * the last transaction, a reply to the current event from the slave
+     * can clear all older transactions' syncness.
+     */
+    *sync = is_on();
+  }
+
+  if (trace_level_ & kTraceDetail)
+    sql_print_information("%s: server(%d), (%s, %lu) sync(%d), repl(%d)",
+                          kWho, thd->server_id, log_file_name,
+                          (ulong)log_file_pos, *sync, (int)is_on());
+
+ l_end:
+  unlock();
+
+  /* We do not need to clear sync flag because we set it to 0 when we
+   * reserve the packet header.
+   */
+  if (*sync)
+    (packet->c_ptr())[2] = kPacketFlagSync;
+
+  *event_type = (Log_event_type)((*packet)[LOG_EVENT_OFFSET+3]);
+  return function_exit(kWho, 0);
+}
+
+/* Flush the current event to the slave, read its reply packet and report
+ * the acknowledged binlog position.
+ *
+ * Reply layout, as parsed here: one magic byte, an 8-byte position, a
+ * second magic byte, then the binlog file name filling the rest of the
+ * packet.
+ *
+ * Input:
+ *  thd         - (IN)   the binlog dump thread
+ *  net         - (IN)   the connection to the slave
+ *  read_errmsg - (OUT)  set to an error description on failure
+ *  read_errno  - (OUT)  set to ER_UNKNOWN_ERROR on failure
+ *
+ * Return:
+ *  -1 on a network or packet format error, otherwise the result of
+ *  reportReplyBinlog()
+ */
+int ReplSemiSync::readSlaveReply(THD *thd, NET *net,
+                                 const char **read_errmsg,
+                                 int *read_errno) {
+  const char *kWho = "ReplSemiSync::readSlaveReply";
+  const unsigned char *packet;
+  char     log_file_name[FN_REFLEN];
+  my_off_t log_file_pos;
+  ulong    packet_len;
+  ulong    name_len;
+  int      result = -1;
+
+  struct timeval start_tv;
+  int   start_time_err = 0;
+  ulong trc_level = trace_level_;
+
+  function_enter(kWho);
+
+  if (trc_level & kTraceNetWait)
+    start_time_err = gettimeofday(&start_tv, 0);
+
+  /* We flush to make sure that the current event is sent to the network,
+   * instead of being buffered in the TCP/IP stack.
+   */
+  if (net_flush(net)) {
+    *read_errmsg = "failed on net_flush()";
+    *read_errno  = ER_UNKNOWN_ERROR;
+    goto l_end;
+  }
+
+  if (trc_level & kTraceDetail)
+    sql_print_information("%s: Wait for replica's reply", kWho);
+
+  /* Wait for the network here.  Though binlog dump thread can indefinitely wait
+   * here, transactions would not wait indefinitely.
+   * Transactions wait on binlog replies detected by binlog dump threads.  If
+   * binlog dump threads wait too long, transactions will timeout and continue.
+   */
+  packet_len = my_net_read(net);
+
+  if (trc_level & kTraceNetWait) {
+    if (start_time_err != 0) {
+      sql_print_error("Network wait gettimeofday fail1");
+      timefunc_fails_++;
+    } else {
+      int wait_time;
+
+      wait_time = getWaitTime(start_tv);
+      if (wait_time < 0) {
+        sql_print_error("Network wait gettimeofday fail2");
+        timefunc_fails_++;
+      } else {
+        total_net_wait_num_++;
+        total_net_wait_time_ += wait_time;
+      }
+    }
+  }
+
+  if (packet_len == packet_error || packet_len < 10) {
+    if (packet_len == packet_error)
+      *read_errmsg = "Read semi-sync reply network error";
+    else
+      *read_errmsg = "Read semi-sync reply length error";
+    *read_errno  = ER_UNKNOWN_ERROR;
+    goto l_end;
+  }
+
+  packet = net->read_pos;
+  if (packet[0] != ReplSemiSync::kPacketMagicNum ||
+      packet[9] != ReplSemiSync::kPacketMagicNum) {
+    *read_errmsg = "Read semi-sync reply magic number error";
+    *read_errno  = ER_UNKNOWN_ERROR;
+    goto l_end;
+  }
+
+  /* The file name comes from the network: refuse one that does not fit
+   * into the local buffer instead of overflowing the stack.
+   */
+  name_len = packet_len - 10;
+  if (name_len >= sizeof(log_file_name)) {
+    *read_errmsg = "Read semi-sync reply binlog file name too long";
+    *read_errno  = ER_UNKNOWN_ERROR;
+    goto l_end;
+  }
+
+  log_file_pos = uint8korr(packet + 1);
+  memcpy(log_file_name, packet + 10, name_len);
+  log_file_name[name_len] = '\0';
+
+  if (trc_level & kTraceDetail)
+    sql_print_information("%s: Got reply (%s, %lu)",
+                          kWho, log_file_name, (ulong)log_file_pos);
+
+  result = reportReplyBinlog(thd, log_file_name, log_file_pos);
+
+ l_end:
+  return function_exit(kWho, result);
+}
+
+/* Record that a transaction has been written to the binlog at the given
+ * position.
+ *
+ * Input:
+ *  log_file_name - (IN)  binlog file name of the transaction
+ *  log_file_pos  - (IN)  position of the transaction in that file
+ *
+ * Return:
+ *  0 when the master is disabled or semi-sync is off, otherwise the result
+ *  of inserting the transaction node
+ */
+int ReplSemiSync::writeTranxInBinlog(const char* log_file_name,
+                                     my_off_t    log_file_pos) {
+  const char *kWho = "ReplSemiSync::writeTranxInBinlog";
+  int insert_res = 0;
+
+  function_enter(kWho);
+
+  lock();
+
+  /* This is the real check inside the mutex. */
+  if (getMasterEnabled()) {
+    /* Track the 'largest' transaction commit position seen so far, even
+     * while semi-sync is switched off.  It is updated here rather than in
+     * commitTrx() because updateSyncHeader() consults commit_file_* when
+     * deciding whether to switch semi-sync on; see the explanation in
+     * updateSyncHeader().
+     */
+    if (!commit_file_name_inited_ ||
+        ActiveTranx::compare(log_file_name, log_file_pos,
+                             commit_file_name_, commit_file_pos_) > 0) {
+      /* First position seen, or a larger one: update the maximum info. */
+      strcpy(commit_file_name_, log_file_name);
+      commit_file_pos_ = log_file_pos;
+      commit_file_name_inited_ = true;
+    }
+
+    if (is_on()) {
+      DBUG_ASSERT(active_tranxs_ != NULL);
+      insert_res = active_tranxs_->insert_tranx_node(log_file_name,
+                                                     log_file_pos);
+    }
+  }
+
+  unlock();
+
+  return function_exit(kWho, insert_res);
+}
+
+/* Slave side: parse the two-byte semi-sync header in front of an event.
+ *
+ * Input:
+ *  header      - (IN)   start of the received data
+ *  total_len   - (IN)   number of bytes available at header
+ *  need_reply  - (OUT)  true if the master asked for a reply
+ *  payload     - (OUT)  start of the event after the header
+ *  payload_len - (OUT)  length of the event after the header
+ *
+ * Return:
+ *  0: success
+ * -1: the data is too short to hold the header, or the magic number is
+ *     missing; the OUT parameters are left untouched
+ */
+int ReplSemiSync::slaveReadSyncHeader(const char *header,
+                                      ulong total_len,
+                                      bool  *need_reply,
+                                      const char **payload,
+                                      ulong *payload_len) {
+  const char *kWho = "ReplSemiSync::slaveReadSyncHeader";
+  int read_res = 0;
+  function_enter(kWho);
+
+  /* The header is two bytes long; without the length check header[1] could
+   * be read past the data and total_len - 2 would wrap around.
+   */
+  if (total_len >= 2 && (unsigned char)(header[0]) == kPacketMagicNum) {
+    *need_reply  = (header[1] & kPacketFlagSync);
+    *payload_len = total_len - 2;
+    *payload     = header + 2;
+
+    if (trace_level_ & kTraceDetail)
+      sql_print_information("%s: reply - %d", kWho, *need_reply);
+  } else {
+    /* total_len is an unsigned long, so it is printed with %lu. */
+    sql_print_error("Missing magic number for semi-sync packet, packet "
+                    "len: %lu", total_len);
+    read_res = -1;
+  }
+
+  return function_exit(kWho, read_res);
+}
+
+/* Slave side: send a reply carrying the given binlog position.
+ *
+ * Reply layout: one magic byte, the 8-byte position, a second magic byte,
+ * then the binlog file name without a terminating NUL.
+ *
+ * Input:
+ *  net             - (IN)  the connection to the master
+ *  binlog_filename - (IN)  binlog file name to acknowledge
+ *  binlog_filepos  - (IN)  position inside that file
+ *
+ * Return:
+ *  0: success
+ * -1: the file name is too long for the reply
+ *  otherwise the non-zero result of my_net_write() or net_flush()
+ */
+int ReplSemiSync::slaveReply(NET        *net,
+                             const char *binlog_filename,
+                             my_off_t    binlog_filepos) {
+  const char *kWho = "ReplSemiSync::slaveReply";
+  char reply_buffer[1+8+1+FN_REFLEN];
+  int  reply_res;
+  size_t name_len = strlen(binlog_filename);
+
+  function_enter(kWho);
+
+  /* Never copy more than the name part of the reply buffer can hold. */
+  if (name_len >= FN_REFLEN) {
+    sql_print_error("%s: binlog file name too long (%lu bytes)",
+                    kWho, (ulong)name_len);
+    return function_exit(kWho, -1);
+  }
+
+  /* Prepare the buffer of the reply. */
+  reply_buffer[0] = kPacketMagicNum;
+  int8store(reply_buffer + 1, binlog_filepos);
+  reply_buffer[9] = kPacketMagicNum;
+  memcpy(reply_buffer + 10, binlog_filename, name_len);
+
+  if (trace_level_ & kTraceDetail)
+    sql_print_information("%s: reply (%s, %lu)", kWho,
+                          binlog_filename, (ulong)binlog_filepos);
+
+  /* Send the reply. */
+  reply_res = my_net_write(net, reply_buffer, name_len + 10);
+  if (reply_res == 0)
+    reply_res = net_flush(net);
+
+  return function_exit(kWho, reply_res);
+}
+
+
+/* Switch semi-sync off if it is currently on.
+ *
+ * The binlog reset itself is not performed here; the calls for it are
+ * still commented out below.
+ *
+ * Return:
+ *  0 if nothing had to be switched off, otherwise the result of
+ *  switch_off()
+ */
+int ReplSemiSync::resetMaster(THD *thd) {
+  const char *kWho = "ReplSemiSync::resetMaster";
+  int status = 0;
+
+  function_enter(kWho);
+
+  // mysql_bin_log.lock_log();
+  // mysql_bin_log.lock_index();
+  lock();
+
+  /* If semi-sync is ON, switch it OFF. */
+  const bool must_switch_off = getMasterEnabled() && is_on();
+  if (must_switch_off)
+    status = switch_off();
+
+  // if (result == 0) result = mysql_bin_log.reset_logs(thd);
+  // TODO(mcallaghan): port these changes from MySQL 4
+  // result = mysql_bin_log.reset_logs(thd, true, false);
+
+  unlock();
+  // mysql_bin_log.unlock_index();
+  // mysql_bin_log.unlock_log();
+
+  return function_exit(kWho, status);
+}
+
+/* Copy the internal counters into the exported rpl_semi_sync_* status
+ * variables, holding both LOCK_status and the semi-sync lock.
+ */
+void ReplSemiSync::setExportStats() {
+  VOID(pthread_mutex_lock(&LOCK_status));
+  lock();
+
+  /* Average wait times; zero while nothing has been counted yet. */
+  ulong avg_trx_wait = 0;
+  if (total_trx_wait_num_)
+    avg_trx_wait = (ulong)((double)total_trx_wait_time_ /
+                           ((double)total_trx_wait_num_));
+
+  ulong avg_net_wait = 0;
+  if (total_net_wait_num_)
+    avg_net_wait = (ulong)((double)total_net_wait_time_ /
+                           ((double)total_net_wait_num_));
+
+  /* State and transaction counters. */
+  rpl_semi_sync_status           = state_ ? 1 : 0;
+  rpl_semi_sync_yes_transactions = enabled_transactions_;
+  rpl_semi_sync_no_transactions  = disabled_transactions_;
+  rpl_semi_sync_off_times        = switched_off_times_;
+  rpl_semi_sync_timefunc_fails   = timefunc_fails_;
+  rpl_semi_sync_num_timeouts     = total_wait_timeouts_;
+  rpl_semi_sync_wait_sessions    = wait_sessions_;
+  rpl_semi_sync_back_wait_pos    = wait_backtraverse_;
+
+  /* Transaction wait statistics. */
+  rpl_semi_sync_trx_wait_num        = total_trx_wait_num_;
+  rpl_semi_sync_trx_wait_time       = avg_trx_wait;
+  rpl_semi_sync_trx_wait_total_time = total_trx_wait_time_;
+
+  /* Network wait statistics. */
+  rpl_semi_sync_net_wait_num        = total_net_wait_num_;
+  rpl_semi_sync_net_wait_time       = avg_net_wait;
+  rpl_semi_sync_net_wait_total_time = total_net_wait_time_;
+
+  unlock();
+  VOID(pthread_mutex_unlock(&LOCK_status));
+}
+
+/* Get the waiting time given the wait's starting time.
+ *
+ * Input:
+ *  start_tv - (IN)  the time at which the wait started
+ *
+ * Return:
+ *  >= 0: the waiting time in microseconds (us), capped at the largest
+ *        value an int can hold
+ *   < 0: error in gettimeofday or time back traverse
+ */
+static int getWaitTime(const struct timeval& start_tv) {
+  const ulonglong kMaxWaitUsecs = 0x7FFFFFFF;
+  ulonglong start_usecs, end_usecs, elapsed_usecs;
+  struct timeval end_tv;
+
+  /* Check the result before end_tv is read: on failure it is not set. */
+  if (gettimeofday(&end_tv, 0) != 0)
+    return -1;
+
+  /* Widen tv_sec before multiplying so that the product is computed in
+   * 64 bits even where time_t is narrower.
+   */
+  start_usecs = (ulonglong)start_tv.tv_sec * TIME_MILLION + start_tv.tv_usec;
+  end_usecs   = (ulonglong)end_tv.tv_sec * TIME_MILLION + end_tv.tv_usec;
+
+  /* The clock went backwards. */
+  if (end_usecs < start_usecs)
+    return -1;
+
+  /* Cap the result so that a very long wait cannot wrap into a negative
+   * (error) value.
+   */
+  elapsed_usecs = end_usecs - start_usecs;
+  if (elapsed_usecs > kMaxWaitUsecs)
+    elapsed_usecs = kMaxWaitUsecs;
+
+  return (int)elapsed_usecs;
+}
diff -Nrup a/sql/repl_semi_sync.h b/sql/repl_semi_sync.h
--- /dev/null	Wed Dec 31 16:00:00 196900
+++ b/sql/repl_semi_sync.h	2008-03-13 16:22:07 +08:00
@@ -0,0 +1,449 @@
+/* Copyright (C) 2007 Google Inc.
+
+This program is free software; you can redistribute it and/or
+modify it under the terms of the GNU General Public License
+as published by the Free Software Foundation; either version 2
+of the License, or (at your option) any later version.
+
+This program is distributed in the hope that it will be useful,
+but WITHOUT ANY WARRANTY; without even the implied warranty of
+MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+GNU General Public License for more details.
+
+You should have received a copy of the GNU General Public License
+along with this program; if not, write to the Free Software
+Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA  02110-1301, USA.
+*/
+
+/* ReplSemiSync class is responsible for semi synchronous replication.  The
+ * general idea of semi-sync replication is the master database need to make
+ * sure that the slave database receives its replication events before telling
+ * the client that a transaction has been committed.  The difference between
+ * semi-sync and full-sync is that full-sync replication requires that the
+ * slave database finish replicated transaction before replying the master.
+ *
+ * The current semi-sync implementation defines a transaction wait timeout.
+ * In this way, the master would not wait for the slave/replica indefinitely;
+ * instead after a configurable timeout, the master will continue the current
+ * transaction.  At the same time, semi-sync will be disabled so that no
+ * transactions will wait after this.  Later, semi-sync can be enabled again
+ * when the slave catches up in replication.  The timeout design is to prevent
+ * the master from halting for update, in case of the slave machine issues or
+ * network issues.
+ * 
+ * We export three status variables to track the semi-sync on the master:
+ *  . "Replication_semi_sync_status":           rpl_semi_sync_status
+ *  . "Replication_semi_sync_yes_transactions": rpl_semi_sync_yes_transactions
+ *  . "Replication_semi_sync_no_transactions":  rpl_semi_sync_no_transactions
+ * 
+ * This class has codes for both the master and the slave/replica side.
+ * However, the slave/replica side is relatively lightweight because the slave
+ * is mostly stateless reply of binlog events sent from the master.
+ *
+ * The master side code flow:
+ *  . session/thread with a commit transaction:
+ *    - write all events for the current transaction to binlog
+ *    - call writeTranxInBinlog()
+ *    - call commitTrx() during InnoDB's final commit action
+ *    - InnoDB commits and returns 'ok' to the client
+ *  . binlog-dump session/thread - <mysql_binlog_send>
+ *    - connection from slave to dump binlog events
+ *    - call reportReplyBinlog() to report the slave's position
+ *    - while (true):
+ *    -   call reserveSyncHeader() to create header for the next event
+ *    -   read the event from binlog
+ *    -   call updateSyncHeader() to indicate whether slave should reply
+ *    -   call readSlaveReply() to wait for slave's reply
+ *
+ * The slave side code flow - <handle_slave_io>:
+ *    - sync_status = call slaveReadSyncHeader()
+ *    - if (sync_status):
+ *    -   call slaveReply()
+ */
+
+#ifndef REPL_SEMI_SYNC_H__
+#define REPL_SEMI_SYNC_H__
+
+class THD;
+class String;
+typedef struct st_net NET;
+enum Log_event_type;
+
+class ReplSemiSync {
+ private:
+
+  /* This class manages memory for the active transaction list.  We record
+   * each active transaction with a TranxNode.  Because each session can have
+   * only one open transaction, the total active transaction nodes can not
+   * exceed the maximum sessions.  Currently in MySQL, sessions are the same
+   * as connections.
+   */
+  class ActiveTranx {
+  private:
+    struct TranxNode {
+      char             *log_name_;
+      my_off_t          log_pos_;
+      struct TranxNode *next_;            /* the next node in the sorted list */
+      struct TranxNode *hash_next_;    /* the next node during hash collision */
+    };
+
+    /* The following data structure maintains an active transaction list. */
+    TranxNode       *node_array_;
+    TranxNode       *free_pool_;
+
+    /* These two record the active transaction list in sort order. */
+    TranxNode       *trx_front_, *trx_rear_;
+
+    TranxNode      **trx_htb_;        /* A hash table on active transactions. */
+
+    int              num_transactions_;               /* maximum transactions */
+    int              num_entries_;              /* maximum hash table entries */
+    pthread_mutex_t *lock_;                                     /* mutex lock */
+    ulong           *trace_level_;                             /* trace level */
+
+    inline void assert_lock_owner();
+    inline void function_enter(const char *func_name);
+    inline int  function_exit(const char *func_name, int exit_code);
+
+    inline TranxNode* alloc_tranx_node();
+
+    inline uint calc_hash(const byte *key,uint length);
+    uint get_hash_value(const char *log_file_name, my_off_t log_file_pos);
+
+    int compare(const char *log_file_name1, my_off_t log_file_pos1,
+                const TranxNode *node2) {
+      return compare(log_file_name1, log_file_pos1,
+                     node2->log_name_, node2->log_pos_);
+    }
+    int compare(const TranxNode *node1,
+                const char *log_file_name2, my_off_t log_file_pos2) {
+      return compare(node1->log_name_, node1->log_pos_,
+                     log_file_name2, log_file_pos2);
+    }
+    int compare(const TranxNode *node1, const TranxNode *node2) {
+      return compare(node1->log_name_, node1->log_pos_,
+                     node2->log_name_, node2->log_pos_);
+    }
+
+  public:
+    ActiveTranx(int max_connections, pthread_mutex_t *lock,
+                ulong *trace_level);
+    ~ActiveTranx();
+
+    /* Insert an active transaction node with the specified position.
+     *
+     * Return:
+     *  0: success;  -1 or otherwise: error
+     */
+    int insert_tranx_node(const char *log_file_name, my_off_t log_file_pos);
+
+    /* Clear the active transaction nodes until(inclusive) the specified
+     * position.
+     * If log_file_name is NULL, everything will be cleared: the sorted
+     * list and the hash table will be reset to empty.
+     *
+     * Return:
+     *  0: success;  -1 or otherwise: error
+     */
+    int clear_active_tranx_nodes(const char *log_file_name,
+                                 my_off_t    log_file_pos);
+
+    /* Given a position, check to see whether the position is an active
+     * transaction's ending position by probing the hash table.
+     */
+    bool is_tranx_end_pos(const char *log_file_name, my_off_t log_file_pos);
+
+    /* Given two binlog positions, compare which one is bigger based on
+     * (file_name, file_position).
+     */
+    static int compare(const char *log_file_name1, my_off_t log_file_pos1,
+                       const char *log_file_name2, my_off_t log_file_pos2);
+
+  };
+
+  ActiveTranx    *active_tranxs_;  /* active transaction list: the list will
+                                      be cleared when semi-sync switches off. */
+
+  /* True when initObject has been called */
+  bool init_done_;
+
+  /* This cond variable is signaled when enough binlog has been sent to slave,
+   * so that a waiting trx can return the 'ok' to the client for a commit.
+   */
+  pthread_cond_t  COND_binlog_send_;
+
+  /* Mutex that protects the following state variables and the active
+   * transaction list.
+   * Under no circumstances may we acquire mysql_bin_log.LOCK_log if we are
+   * already holding LOCK_binlog_ because it can cause deadlocks.
+   */
+  pthread_mutex_t LOCK_binlog_;
+
+  /* This is set to true when reply_file_name_ contains meaningful data. */
+  bool            reply_file_name_inited_;
+
+  /* The binlog name up to which we have received replies from any slaves. */
+  char            reply_file_name_[FN_REFLEN];
+
+  /* The position in that file up to which we have the reply from any slaves. */
+  my_off_t        reply_file_pos_;
+
+  /* This is set to true when we know the 'smallest' wait position. */
+  bool            wait_file_name_inited_;
+
+  /* NULL, or the 'smallest' filename that a transaction is waiting for
+   * slave replies.
+   */
+  char            wait_file_name_[FN_REFLEN];
+
+  /* The smallest position in that file that a trx is waiting for: the trx
+   * can proceed and send an 'ok' to the client when the master has got the
+   * reply from the slave indicating that it already got the binlog events.
+   */
+  my_off_t        wait_file_pos_;
+
+  /* This is set to true when we know the 'largest' transaction commit
+   * position in the binlog file.
+   * We always maintain the position no matter whether semi-sync is switched
+   * on or switched off.  When a transaction wait timeout occurs, semi-sync
+   * switches off.  Binlog-dump thread can use the three fields to detect when
+   * slaves catch up on replication so that semi-sync can switch on again.
+   */
+  bool            commit_file_name_inited_;
+
+  /* The 'largest' binlog filename that a commit transaction is seeing.       */
+  char            commit_file_name_[FN_REFLEN];
+
+  /* The 'largest' position in that file that a commit transaction is seeing. */
+  my_off_t        commit_file_pos_;
+
+  /* All global variables which can be set by parameters. */
+  bool            master_enabled_;      /* semi-sync is enabled on the master */
+  bool            slave_enabled_;        /* semi-sync is enabled on the slave */
+  ulong           wait_timeout_;      /* timeout period(ms) during tranx wait */
+  ulong           trace_level_;                      /* the level for tracing */
+
+  /* All status variables. */
+  bool            state_;                 /* whether semi-sync is switched on */
+  ulong           enabled_transactions_;         /* semi-sync'ed transactions */
+  ulong           disabled_transactions_;    /* non-semi-sync'ed transactions */
+  ulong           switched_off_times_;    /* how many times are switched off? */
+  ulong           timefunc_fails_;           /* how many time function fails? */
+  ulong           total_wait_timeouts_;      /* total number of wait timeouts */
+  ulong           wait_sessions_;      /* how many sessions wait for replies? */
+  ulong           wait_backtraverse_;         /* wait position back traverses */
+  ulonglong       total_trx_wait_num_;   /* total trx waits: non-timeout ones */
+  ulonglong       total_trx_wait_time_;         /* total trx wait time: in us */
+  ulonglong       total_net_wait_num_;                 /* total network waits */
+  ulonglong       total_net_wait_time_;            /* total network wait time */
+
+  /* The number of maximum active transactions.  This should be the same as
+   * maximum connections because MySQL does not do connection sharing now.
+   */
+  int             max_transactions_;
+
+  static const ulong kTraceFunction;
+  static const ulong kTraceGeneral;
+  static const ulong kTraceDetail;
+  static const ulong kTraceNetWait;
+
+  static const char  kSyncHeader[3];              /* three byte packet header */
+
+  void lock();
+  void unlock();
+  void cond_broadcast();
+  int  cond_timewait(struct timespec *wait_time);
+
+  inline void function_enter(const char *func_name);
+  inline int  function_exit(const char *func_name, int exit_code);
+
+  /* Is semi-sync replication on? */
+  bool is_on() {
+    return (state_);
+  }
+
+  void set_master_enabled(bool enabled) {
+    master_enabled_ = enabled;
+  }
+
+  /* Switch semi-sync off because of timeout in transaction waiting. */
+  int switch_off();
+
+  /* Switch semi-sync on when slaves catch up. */
+  int try_switch_on(int server_id,
+                    const char *log_file_name, my_off_t log_file_pos);
+
+ public:
+  ReplSemiSync();
+  ~ReplSemiSync();
+
+  /* Constants in network packet header. */
+  static const unsigned char kPacketMagicNum;
+  static const unsigned char kPacketFlagSync;
+
+  bool getMasterEnabled() {
+    return master_enabled_;
+  }
+  bool getSlaveEnabled() {
+    return slave_enabled_;
+  }
+  void setSlaveEnabled(bool enabled) {
+    slave_enabled_ = enabled;
+  }
+
+  void setTraceLevel(ulong trace_level) {
+    trace_level_ = trace_level;
+  }
+
+  /* Set the transaction wait timeout period, in milliseconds. */
+  void setWaitTimeout(ulong wait_timeout) {
+    wait_timeout_ = wait_timeout;
+  }
+
+  /* Initialize this class after MySQL parameters are initialized. This
+   * function should be called once at bootstrap time.
+   */
+  int initObject();
+
+  /* Enable the object to enable semi-sync replication inside the master. */
+  int enableMaster();
+
+  /* Disable semi-sync replication inside the master. */
+  int disableMaster();
+
+  /* In semi-sync replication, reports up to which binlog position we have
+   * received replies from the slave indicating that it already got the events.
+   *
+   * Input:
+   *  thd           - (IN)  binlog-dump thread doing the binlog communication
+   *                        to the slave
+   *  log_file_name - (IN)  binlog file name
+   *  end_offset    - (IN)  the offset in the binlog file up to which we have
+   *                        the replies from the slave
+   *
+   * Return:
+   *  0: success;  -1 or otherwise: error
+   */
+  int reportReplyBinlog(THD* thd, char* log_file_name,
+                        my_off_t end_offset);
+
+  /* Commit a transaction in the final step.  This function is called from
+   * InnoDB before returning from the low commit.  If semi-sync is switched on,
+   * the function will wait to see whether binlog-dump thread get the reply for
+   * the events of the transaction.  Remember that this is not a direct wait,
+   * instead, it waits to see whether the binlog-dump thread has reached the
+   * point.  If the wait times out, semi-sync status will be switched off and
+   * all other transaction would not wait either.
+   *
+   * Input:  (the transaction events' ending binlog position)
+   *  trx_wait_binlog_name - (IN)  ending position's file name
+   *  trx_wait_binlog_pos  - (IN)  ending position's file offset
+   *
+   * Return:
+   *  0: success;  -1 or otherwise: error
+   */
+  int commitTrx(const char* trx_wait_binlog_name,
+                my_off_t trx_wait_binlog_pos);
+
+  /* Reserve space in the replication event packet header:
+   *  . slave semi-sync off: 1 byte - (0)
+   *  . slave semi-sync on:  3 byte - (0, 0xef, 0/1)
+   *
+   * Input:
+   *  packet   - (IN)  the packet containing the replication event
+   *  thd      - (IN)  the binlog dump thread
+   *
+   * Return:
+   *  none (the function is declared void)
+   */
+  void reserveSyncHeader(String *packet, THD *thd);
+
+  /* Update the sync bit in the packet header to indicate to the slave whether
+   * the master will wait for the reply of the event.  If semi-sync is switched
+   * off and we detect that the slave is catching up, we switch semi-sync on.
+   *
+   * Input:
+   *  packet        - (IN)  the packet containing the replication event
+   *  log_file_name - (IN)  the event ending position's file name
+   *  log_file_pos  - (IN)  the event ending position's file offset
+   *  thd           - (IN)  the binlog dump thread
+   *  sync          - (OUT) whether the sync bit is set
+   *  event_type    - (OUT) the sending event's type
+   *
+   * Return:
+   *  0: success;  -1 or otherwise: error
+   */
+  int updateSyncHeader(String *packet,
+                       const char *log_file_name, my_off_t log_file_pos,
+                       THD *thd, bool *sync, Log_event_type *event_type);
+
+  /* Read the slave's reply so that we know how much progress the slave makes
+   * on receiving replication events.
+   *
+   * Input:
+   *  thd          - (IN)  binlog dump replication thread
+   *  net          - (IN)  the network socket
+   *  read_errmsg  - (OUT) error message if an error occurs
+   *  read_errno   - (OUT) error number if an error occurs
+   *
+   * Return:
+   *  0: success;  -1 or otherwise: error
+   */
+  int readSlaveReply(THD *thd, NET *net, const char **read_errmsg,
+                     int *read_errno);
+
+  /* Called when a transaction finished writing binlog events.
+   *  . update the 'largest' transactions' binlog event position
+   *  . insert the ending position in the active transaction list if
+   *    semi-sync is on
+   *
+   * Input:  (the transaction events' ending binlog position)
+   *  log_file_name - (IN)  transaction ending position's file name
+   *  log_file_pos  - (IN)  transaction ending position's file offset
+   *
+   * Return:
+   *  0: success;  -1 or otherwise: error
+   */
+  int writeTranxInBinlog(const char* log_file_name, my_off_t log_file_pos);
+
+  /* A slave reads the semi-sync packet header and separates the metadata
+   * from the payload data.
+   *
+   * Input:
+   *  header      - (IN)  packet header pointer
+   *  total_len   - (IN)  total packet length: metadata + payload
+   *  need_reply  - (OUT) whether the master is waiting for the reply
+   *  payload     - (OUT) payload: the replication event
+   *  payload_len - (OUT) payload length
+   *
+   * Return:
+   *  0: success;  -1 or otherwise: error
+   */
+  int slaveReadSyncHeader(const char *header, ulong total_len, bool *need_reply,
+                          const char **payload, ulong *payload_len);
+
+  /* A slave replies to the master indicating its replication process.  It
+   * indicates that the slave has received all events before the specified
+   * binlog position.
+   *
+   * Input:
+   *  net              - (IN)  the network socket
+   *  binlog_filename  - (IN)  the reply point's binlog file name
+   *  binlog_filepos   - (IN)  the reply point's binlog file offset
+   *
+   * Return:
+   *  0: success;  -1 or otherwise: error
+   */
+  int slaveReply(NET *net, const char *binlog_filename,
+                 my_off_t binlog_filepos);
+
+  /* Export internal statistics for semi-sync replication. */
+  void setExportStats();
+
+  /* 'reset master' command is issued from the user and semi-sync needs to
+   * go off for that.
+   */
+  int resetMaster(THD *thd);
+};
+
+#endif   /* REPL_SEMI_SYNC_H__ */
diff -Nrup a/sql/set_var.cc b/sql/set_var.cc
--- a/sql/set_var.cc	2008-02-27 00:26:41 +08:00
+++ b/sql/set_var.cc	2008-03-13 16:22:06 +08:00
@@ -68,6 +68,9 @@ extern ulong ndb_cache_check_time;
 extern char opt_ndb_constrbuf[];
 extern ulong ndb_extra_logging;
 #endif
+#include "repl_semi_sync.h"
+
+extern ReplSemiSync semi_sync_replicator;
 
 #ifdef HAVE_NDB_BINLOG
 extern ulong ndb_report_thresh_binlog_epoch_slip;
@@ -129,6 +132,9 @@ static void fix_query_cache_size(THD *th
 static void fix_query_cache_min_res_unit(THD *thd, enum_var_type type);
 static void fix_myisam_max_sort_file_size(THD *thd, enum_var_type type);
 static void fix_max_binlog_size(THD *thd, enum_var_type type);
+static void fix_rpl_semi_sync_trace_level(THD *thd, enum_var_type type);
+static void fix_rpl_semi_sync_enabled(THD *thd, enum_var_type type);
+static void fix_rpl_semi_sync_slave_enabled(THD *thd, enum_var_type type);
 static void fix_max_relay_log_size(THD *thd, enum_var_type type);
 static void fix_max_connections(THD *thd, enum_var_type type);
 static int check_max_delayed_threads(THD *thd, set_var *var);
@@ -384,6 +390,20 @@ static sys_var_thd_ulong	sys_div_precinc
                                               &SV::div_precincrement);
 static sys_var_long_ptr	sys_rpl_recovery_rank(&vars, "rpl_recovery_rank",
 					      &rpl_recovery_rank);
+/* Semi-sync variables; registered in 'vars' like their neighbours. */
+sys_var_long_ptr sys_rpl_semi_sync_enabled(&vars, "rpl_semi_sync_enabled",
+                                           &rpl_semi_sync_enabled,
+                                           fix_rpl_semi_sync_enabled);
+sys_var_long_ptr sys_rpl_semi_sync_slave_enabled(
+                                       &vars, "rpl_semi_sync_slave_enabled",
+                                       &rpl_semi_sync_slave_enabled,
+                                       fix_rpl_semi_sync_slave_enabled);
+sys_var_long_ptr sys_rpl_semi_sync_timeout(&vars, "rpl_semi_sync_timeout",
+                                           &rpl_semi_sync_timeout);
+sys_var_long_ptr sys_rpl_semi_sync_trace_level(
+                                       &vars, "rpl_semi_sync_trace_level",
+                                       &rpl_semi_sync_trace_level,
+                                       fix_rpl_semi_sync_trace_level);
 static sys_var_long_ptr	sys_query_cache_size(&vars, "query_cache_size",
 					     &query_cache_size,
 					     fix_query_cache_size);
@@ -1142,6 +1162,45 @@ static void fix_max_binlog_size(THD *thd
   if (!max_relay_log_size)
     active_mi->rli.relay_log.set_max_size(max_binlog_size);
 #endif
+  DBUG_VOID_RETURN;
+}
+
+/* Callback registered with sys_rpl_semi_sync_trace_level. */
+static void fix_rpl_semi_sync_trace_level(THD *thd, enum_var_type type)
+{
+  DBUG_ENTER("fix_rpl_semi_sync_trace_level");
+  const ulong new_level = rpl_semi_sync_trace_level;
+
+  DBUG_PRINT("info",("rpl_semi_sync_trace_level=%lu", new_level));
+  semi_sync_replicator.setTraceLevel(new_level);
+  DBUG_VOID_RETURN;
+}
+
+/* Callback registered with sys_rpl_semi_sync_enabled: apply the variable to
+ * the master side; if that fails, set the variable to the opposite value. */
+static void fix_rpl_semi_sync_enabled(THD *thd, enum_var_type type)
+{
+  DBUG_ENTER("fix_rpl_semi_sync_enabled");
+  DBUG_PRINT("info",("rpl_semi_sync_enabled=%lu",
+                     rpl_semi_sync_enabled));
+
+  const bool turn_on = (rpl_semi_sync_enabled != 0);
+  const int  status  = turn_on ? semi_sync_replicator.enableMaster()
+                               : semi_sync_replicator.disableMaster();
+  if (status != 0)
+    rpl_semi_sync_enabled = !turn_on;
+
+  DBUG_VOID_RETURN;
+}
+
+/* Callback registered with sys_rpl_semi_sync_slave_enabled. */
+static void fix_rpl_semi_sync_slave_enabled(THD *thd, enum_var_type type)
+{
+  DBUG_ENTER("fix_rpl_semi_sync_slave_enabled");
+  DBUG_PRINT("info",("rpl_semi_sync_slave_enabled=%lu",
+                     rpl_semi_sync_slave_enabled));
+  const bool slave_on = (rpl_semi_sync_slave_enabled != 0);
+  semi_sync_replicator.setSlaveEnabled(slave_on);
   DBUG_VOID_RETURN;
 }
 
diff -Nrup a/sql/slave.cc b/sql/slave.cc
--- a/sql/slave.cc	2008-03-05 16:44:43 +08:00
+++ b/sql/slave.cc	2008-03-13 16:22:06 +08:00
@@ -37,9 +37,11 @@
 #include <thr_alarm.h>
 #include <my_dir.h>
 #include <sql_common.h>
+#include "repl_semi_sync.h"
 #include <errmsg.h>
 #include <mysys_err.h>
 
+
 #ifdef HAVE_REPLICATION
 
 #include "rpl_tblmap.h"
@@ -50,6 +52,8 @@
 bool use_slave_mask = 0;
 MY_BITMAP slave_error_mask;
 
+extern ReplSemiSync semi_sync_replicator;
+
 typedef bool (*CHECK_KILLED_FUNC)(THD*,void*);
 
 char* slave_load_tmpdir = 0;
@@ -1611,7 +1615,7 @@ static int safe_sleep(THD* thd, int sec,
 
 
 static int request_dump(MYSQL* mysql, Master_info* mi,
-                        bool *suppress_warnings)
+			bool *suppress_warnings, bool semi_sync_slave)
 {
   uchar buf[FN_REFLEN + 10];
   int len;
@@ -1621,6 +1625,10 @@ static int request_dump(MYSQL* mysql, Ma
   
   *suppress_warnings= FALSE;
 
+  /* If semi-synchronous replication is enabled, set the flag. */
+  if (semi_sync_slave)
+    binlog_flags |= BINLOG_SEMI_SYNC;
+
   // TODO if big log files: Change next to int8store()
   int4store(buf, (ulong) mi->master_log_pos);
   int2store(buf + 4, binlog_flags);
@@ -2299,8 +2307,9 @@ pthread_handler_t handle_slave_io(void *
   if (!safe_connect(thd, mysql, mi))
   {
     sql_print_information("Slave I/O thread: connected to master '%s@%s:%d',"
-                          "replication started in log '%s' at position %s",
+                          "%s replication started in log '%s' at position %s",
                           mi->user, mi->host, mi->port,
+                          semi_sync_slave ? "semi-sync" : "asynchronous",
 			  IO_RPL_LOG_NAME,
 			  llstr(mi->master_log_pos,llbuff));
   /*
@@ -2325,6 +2334,13 @@ connected:
   if (get_master_version_and_clock(mysql, mi))
     goto err;
 
+  /* Export the semi-sync status. */
+  if (semi_sync_slave && !semi_sync_status) {
+    /* Semi-sync status is ON now. */
+    thread_safe_add(rpl_semi_sync_slave_status, 1, &LOCK_status);
+    semi_sync_status = true;
+  }
+
   if (mi->rli.relay_log.description_event_for_queue->binlog_version > 1)
   {
     /*
@@ -2361,7 +2377,7 @@ connected:
   while (!io_slave_killed(thd,mi))
   {
     thd_proc_info(thd, "Requesting binlog dump");
-    if (request_dump(mysql, mi, &suppress_warnings))
+    if (request_dump(mysql, mi, &suppress_warnings, semi_sync_slave))
     {
       sql_print_error("Failed on request_dump()");
       if (check_io_slave_killed(thd, mi, "Slave I/O thread killed while \
@@ -2384,6 +2400,8 @@ requesting master dump") ||
 
     while (!io_slave_killed(thd,mi))
     {
+      const char* event_buf;
+      bool need_reply;
       ulong event_len;
       /*
          We say "waiting" because read_event() will wait if there's nothing to
@@ -2434,10 +2452,21 @@ Stopping slave I/O thread due to out-of-
         goto connected;
       } // if (event_len == packet_error)
 
+      need_reply = false;
+      if (semi_sync_slave) {
+        if (semi_sync_replicator.slaveReadSyncHeader(
+              (const char*)mysql->net.read_pos + 1, event_len,
+              &need_reply, &event_buf, &event_len) != 0) {
+          sql_print_error("Missing magic number in packet header.");
+          goto err;
+        }
+      } else {
+        event_buf = (const char*)mysql->net.read_pos + 1;
+      }
+
       retry_count=0;                    // ok event, reset retry counter
       thd_proc_info(thd, "Queueing master event to the relay log");
-      if (queue_event(mi,(const char*)mysql->net.read_pos + 1,
-                      event_len))
+      if (queue_event(mi, event_buf, event_len))
       {
         mi->report(ERROR_LEVEL, ER_SLAVE_RELAY_LOG_WRITE_FAILURE,
                    ER(ER_SLAVE_RELAY_LOG_WRITE_FAILURE),
@@ -2520,6 +2549,12 @@ err:
   change_rpl_status(RPL_ACTIVE_SLAVE,RPL_IDLE_SLAVE);
   DBUG_ASSERT(thd->net.buff != 0);
   net_end(&thd->net); // destructor will not free it, because net.vio is 0
+
+  if (semi_sync_status) {
+    /* Semi-sync status is OFF now. */
+    thread_safe_sub(rpl_semi_sync_slave_status, 1, &LOCK_status);
+  }
+
   close_thread_tables(thd);
   pthread_mutex_lock(&LOCK_thread_count);
   THD_CHECK_SENTRY(thd);
@@ -2551,6 +2586,9 @@ pthread_handler_t handle_slave_sql(void 
   Relay_log_info* rli = &((Master_info*)arg)->rli;
   const char *errmsg;
 
+  bool semi_sync_slave = semi_sync_replicator.getSlaveEnabled();
+  bool semi_sync_status = false;
+
   // needs to call my_thread_init(), otherwise we get a coredump in DBUG_ stuff
   my_thread_init();
   DBUG_ENTER("handle_slave_sql");
@@ -3828,12 +3866,24 @@ static Log_event* next_event(Relay_log_i
         pthread_mutex_unlock(&rli->log_space_lock);
         pthread_cond_broadcast(&rli->log_space_cond);
         // Note that wait_for_update unlocks lock_log !
-        rli->relay_log.wait_for_update(rli->sql_thd, 1);
+        rli->relay_log.wait_for_update(rli->sql_thd,
+                                       "Has read all relay log; waiting for "
+                                       "the I/O slave thread to update it");
+
         // re-acquire data lock since we released it earlier
         pthread_mutex_lock(&rli->data_lock);
         rli->last_master_timestamp= save_timestamp;
         continue;
       }
+
+      /* We can reply the status now. */
+      if (need_reply &&
+          semi_sync_replicator.slaveReply(&mysql->net, mi->master_log_name,
+                                          mi->master_log_pos)) {
+        sql_print_error("Reply semi-sync packet failed.");
+        goto err;
+      }
+
       /*
         If the log was not hot, we need to move to the next log in
         sequence. The next log could be hot or cold, we deal with both
diff -Nrup a/sql/sql_class.cc b/sql/sql_class.cc
--- a/sql/sql_class.cc	2008-02-27 00:26:41 +08:00
+++ b/sql/sql_class.cc	2008-03-13 16:22:07 +08:00
@@ -500,6 +500,7 @@ THD::THD()
    bootstrap(0),
    derived_tables_processing(FALSE),
    spcont(NULL),
+   semi_sync_slave(0),
    m_lip(NULL)
 {
   ulong tmp;
diff -Nrup a/sql/sql_class.h b/sql/sql_class.h
--- a/sql/sql_class.h	2008-03-05 16:44:43 +08:00
+++ b/sql/sql_class.h	2008-03-13 16:22:07 +08:00
@@ -1600,6 +1600,7 @@ public:
     KILLED_NO_VALUE      /* means neither of the states */
   };
   killed_state volatile killed;
+  bool       semi_sync_slave;
 
   /* scramble - random string sent to client on handshake */
   char	     scramble[SCRAMBLE_LENGTH+1];
diff -Nrup a/sql/sql_load.cc b/sql/sql_load.cc
--- a/sql/sql_load.cc	2008-02-19 20:45:17 +08:00
+++ b/sql/sql_load.cc	2008-03-13 16:22:07 +08:00
@@ -468,6 +468,9 @@ bool mysql_load(THD *thd,sql_exchange *e
 	  }
 	}
       }
+
+      if (transactional_table)
+        ha_autocommit_or_rollback(thd,error);
     }
 #endif /*!EMBEDDED_LIBRARY*/
     error= -1;				// Error on read
diff -Nrup a/sql/sql_parse.cc b/sql/sql_parse.cc
--- a/sql/sql_parse.cc	2008-02-27 00:26:41 +08:00
+++ b/sql/sql_parse.cc	2008-03-13 16:22:07 +08:00
@@ -28,6 +28,9 @@
 #include "events.h"
 #include "sql_trigger.h"
 
+#include "repl_semi_sync.h"
+
+
 /**
   @defgroup Runtime_Environment Runtime Environment
   @{
@@ -43,6 +46,8 @@
    (LP)->sql_command == SQLCOM_DROP_FUNCTION ? \
    "FUNCTION" : "PROCEDURE")
 
+extern ReplSemiSync semi_sync_replicator;
+
 static bool execute_sqlcom_select(THD *thd, TABLE_LIST *all_tables);
 static bool check_show_create_table_access(THD *thd, TABLE_LIST *table);
 
@@ -2037,6 +2042,10 @@ mysql_execute_command(THD *thd)
   case SQLCOM_SHOW_PROFILE:
   case SQLCOM_SELECT:
     thd->status_var.last_query_cost= 0.0;
+
+    if (lex->orig_sql_command == SQLCOM_SHOW_STATUS)
+      semi_sync_replicator.setExportStats();
+
     if (all_tables)
     {
       res= check_table_access(thd,
diff -Nrup a/sql/sql_repl.cc b/sql/sql_repl.cc
--- a/sql/sql_repl.cc	2008-02-19 20:58:00 +08:00
+++ b/sql/sql_repl.cc	2008-03-13 16:22:07 +08:00
@@ -21,6 +21,13 @@
 #include "log_event.h"
 #include "rpl_filter.h"
 #include <my_dir.h>
+#include "repl_semi_sync.h"
+
+#ifdef HAVE_INNOBASE_DB
+#include "ha_innodb.h"
+#endif
+
+extern ReplSemiSync semi_sync_replicator;
 
 int max_binlog_dump_events = 0; // unlimited
 my_bool opt_sporadic_binlog_dump_fail = 0;
@@ -313,6 +320,44 @@ Increase max_allowed_packet on master";
   return error;
 }
 
+/* Format "<input_msg> :<binlog base name>:<position>:" as thd's proc_info.
+ *
+ * Input:
+ *  output_info   -  (OUT) the output proc_info
+ *  output_len    -  (IN)  output proc_info's length
+ *  thd           -  (IN)  the thread
+ *  input_msg     -  (IN)  the input proc_info
+ *  log_file_name -  (IN)  binlog file name
+ *  log_pos       -  (IN)  binlog position
+ */
+static void processlist_show_binlog_state(char *output_info,
+                                          int   output_len,
+                                          THD  *thd,
+                                          const char *input_msg,
+                                          const char *log_file_name,
+                                          my_off_t log_pos) {
+  DBUG_ENTER("processlist_show_binlog_state");
+
+  /* Point to input_msg first, in case "show processlist" reads proc_info
+   * before the copy into output_info is finished. */
+  thd->proc_info= input_msg;
+
+  /* my_off_t is cast explicitly so the argument always matches %lld. */
+  if (snprintf(output_info, output_len, "%s :%s:%lld:", input_msg,
+               log_file_name + dirname_length(log_file_name),
+               (long long) log_pos) > 0) {
+    thd->proc_info= output_info;
+  }
+
+  DBUG_VOID_RETURN;
+}
+
+/* Decrement the semi-sync client counter when BINLOG_SEMI_SYNC is set. */
+static void repl_cleanup(ushort flags) {
+  if (!(flags & BINLOG_SEMI_SYNC))
+    return;                              /* flag not set: nothing to do */
+  thread_safe_sub(rpl_semi_sync_clients, 1, &LOCK_status);
+}
 
 /*
   TODO: Clean up loop to only have one call to send_file()
@@ -324,6 +369,22 @@ void mysql_binlog_send(THD* thd, char* l
   LOG_INFO linfo;
   char *log_file_name = linfo.log_file_name;
   char search_file_name[FN_REFLEN], *name;
+
+  /* This buffer should be enough for "comments + :file_name:file_pos:". */
+  char binlog_state_msg[FN_REFLEN + 100];
+  int  binlog_state_msg_len = FN_REFLEN + 100;
+  bool need_sync = false;
+
+  /* Whether the slave is doing semi-synchronous replication. */
+  thd->semi_sync_slave = (flags & BINLOG_SEMI_SYNC);
+  ReplSemiSync *semi_sync_repl = &semi_sync_replicator;
+
+  int ev_offset;
+  if (!thd->semi_sync_slave)
+    ev_offset = 1;
+  else
+    ev_offset = 3;
+
   IO_CACHE log;
   File file = -1;
   String* packet = &thd->packet;
@@ -339,6 +400,14 @@ void mysql_binlog_send(THD* thd, char* l
   DBUG_PRINT("enter",("log_ident: '%s'  pos: %ld", log_ident, (long) pos));
 
   bzero((char*) &log,sizeof(log));
+  sql_print_information("Start %s binlog_dump to slave_server(%d), pos(%s, %lu)",
+                        thd->semi_sync_slave ? "semi-sync" : "asynchronous",
+                        thd->server_id, log_ident, (ulong)pos);
+
+  if (flags & BINLOG_SEMI_SYNC) {
+    /* One more semi-sync client. */
+    thread_safe_increment(rpl_semi_sync_clients, &LOCK_status);
+  }
 
 #ifndef DBUG_OFF
   if (opt_sporadic_binlog_dump_fail && (binlog_dump_count++ % 2))
@@ -398,7 +467,7 @@ impossible position";
     We need to start a packet with something other than 255
     to distinguish it from error
   */
-  packet->set("\0", 1, &my_charset_bin); /* This is the start of a new packet */
+  semi_sync_repl->reserveSyncHeader(packet, thd);
 
   /*
     Tell the client about the log name with a fake Rotate event;
@@ -438,7 +507,7 @@ impossible position";
     my_errno= ER_MASTER_FATAL_ERROR_READING_BINLOG;
     goto err;
   }
-  packet->set("\0", 1, &my_charset_bin);
+  semi_sync_repl->reserveSyncHeader(packet, thd);
   /*
     Adding MAX_LOG_EVENT_HEADER_LEN, since a binlog event can become
     this larger than the corresponding packet (query) sent 
@@ -462,28 +531,28 @@ impossible position";
      {
        /*
          The packet has offsets equal to the normal offsets in a binlog
-         event +1 (the first character is \0).
+         event + ev_offset (the first character is \0).
        */
        DBUG_PRINT("info",
                   ("Looked for a Format_description_log_event, found event type %d",
-                   (*packet)[EVENT_TYPE_OFFSET+1]));
-       if ((*packet)[EVENT_TYPE_OFFSET+1] == FORMAT_DESCRIPTION_EVENT)
+                   (*packet)[EVENT_TYPE_OFFSET+ev_offset]));
+       if ((*packet)[EVENT_TYPE_OFFSET+ev_offset] == FORMAT_DESCRIPTION_EVENT)
        {
-         binlog_can_be_corrupted= test((*packet)[FLAGS_OFFSET+1] &
+         binlog_can_be_corrupted= test((*packet)[FLAGS_OFFSET+ev_offset] &
                                        LOG_EVENT_BINLOG_IN_USE_F);
-         (*packet)[FLAGS_OFFSET+1] &= ~LOG_EVENT_BINLOG_IN_USE_F;
+         (*packet)[FLAGS_OFFSET+ev_offset] &= ~LOG_EVENT_BINLOG_IN_USE_F;
          /*
            mark that this event with "log_pos=0", so the slave
            should not increment master's binlog position
            (rli->group_master_log_pos)
          */
-         int4store((char*) packet->ptr()+LOG_POS_OFFSET+1, 0);
+         int4store((char*) packet->ptr()+LOG_POS_OFFSET+ev_offset, 0);
          /*
            if reconnect master sends FD event with `created' as 0
            to avoid destroying temp tables.
           */
          int4store((char*) packet->ptr()+LOG_EVENT_MINIMAL_HEADER_LEN+
-                   ST_CREATED_OFFSET+1, (ulong) 0);
+                   ST_CREATED_OFFSET+ev_offset, (ulong) 0);
          /* send it */
          if (my_net_write(net, (uchar*) packet->ptr(), packet->length()))
          {
@@ -510,7 +579,7 @@ impossible position";
        */
      }
      /* reset the packet as we wrote to it in any case */
-     packet->set("\0", 1, &my_charset_bin);
+     semi_sync_repl->reserveSyncHeader(packet, thd);
   } /* end of if (pos > BIN_LOG_HEADER_SIZE); */
   else
   {
@@ -522,6 +591,8 @@ impossible position";
 
   while (!net->error && net->vio != 0 && !thd->killed)
   {
+    Log_event_type event_type;
+
     while (!(error = Log_event::read_log_event(&log, packet, log_lock)))
     {
 #ifndef DBUG_OFF
@@ -533,14 +604,31 @@ impossible position";
 	goto err;
       }
 #endif
+      /* Update the binlog sending state. */
+      processlist_show_binlog_state(
+                binlog_state_msg, binlog_state_msg_len, thd,
+                "Send binlog events to slave",
+                log_file_name, pos);
+
+      DBUG_PRINT("info", ("Send packet: %s: current log position %d",
+                          log_file_name, (ulong)my_b_tell(&log)));
+
+      pos = my_b_tell(&log);
+      if (semi_sync_repl->updateSyncHeader(
+            packet, log_file_name+dirname_length(log_file_name),
+            pos, thd, &need_sync, &event_type) != 0) {
+	errmsg = "Failed on update-1 semi-sync header";
+        my_errno = LOG_READ_MEM;
+	goto err;
+      }
 
-      if ((*packet)[EVENT_TYPE_OFFSET+1] == FORMAT_DESCRIPTION_EVENT)
+      if (event_type == FORMAT_DESCRIPTION_EVENT)
       {
-        binlog_can_be_corrupted= test((*packet)[FLAGS_OFFSET+1] &
+        binlog_can_be_corrupted= test((*packet)[FLAGS_OFFSET+ev_offset] &
                                       LOG_EVENT_BINLOG_IN_USE_F);
-        (*packet)[FLAGS_OFFSET+1] &= ~LOG_EVENT_BINLOG_IN_USE_F;
+        (*packet)[FLAGS_OFFSET+ev_offset] &= ~LOG_EVENT_BINLOG_IN_USE_F;
       }
-      else if ((*packet)[EVENT_TYPE_OFFSET+1] == STOP_EVENT)
+      else if (event_type == STOP_EVENT)
         binlog_can_be_corrupted= FALSE;
 
       if (my_net_write(net, (uchar*) packet->ptr(), packet->length()))
@@ -550,9 +638,8 @@ impossible position";
 	goto err;
       }
 
-      DBUG_PRINT("info", ("log event code %d",
-			  (*packet)[LOG_EVENT_OFFSET+1] ));
-      if ((*packet)[LOG_EVENT_OFFSET+1] == LOAD_EVENT)
+      DBUG_PRINT("info", ("log event code %d", event_type));
+      if (event_type == LOAD_EVENT)
       {
 	if (send_file(thd))
 	{
@@ -561,7 +648,14 @@ impossible position";
 	  goto err;
 	}
       }
-      packet->set("\0", 1, &my_charset_bin);
+
+      if (need_sync &&
+          semi_sync_repl->readSlaveReply(thd, net, &errmsg,
+                                         &my_errno) != 0) {
+        goto err;
+      }
+
+      semi_sync_repl->reserveSyncHeader(packet, thd);
     }
 
     /*
@@ -639,8 +733,15 @@ impossible position";
 	  }
 	  if (!thd->killed)
 	  {
+            /* Update the binlog sending state. */
+            processlist_show_binlog_state(
+                binlog_state_msg, binlog_state_msg_len, thd,
+                "Has sent all binlog to slave; "
+                "waiting for binlog to be updated",
+                log_file_name, pos);
+
 	    /* Note that the following call unlocks lock_log */
-	    mysql_bin_log.wait_for_update(thd, 0);
+	    mysql_bin_log.wait_for_update(thd, binlog_state_msg);
 	  }
 	  else
 	    pthread_mutex_unlock(log_lock);
@@ -655,7 +756,21 @@ impossible position";
 
 	if (read_packet)
 	{
-	  thd_proc_info(thd, "Sending binlog event to slave");
+          /* Update the binlog sending state. */
+          processlist_show_binlog_state(binlog_state_msg,
+                                        binlog_state_msg_len, thd,
+                                        "Sending binlog event to slave",
+                                        log_file_name, pos);
+
+          pos = my_b_tell(&log);
+          if (semi_sync_repl->updateSyncHeader(
+                   packet, log_file_name+dirname_length(log_file_name),
+                   pos, thd, &need_sync, &event_type) != 0) {
+            errmsg = "Failed on update-2 semi-sync header";
+            my_errno = LOG_READ_MEM;
+            goto err;
+          }
+
 	  if (my_net_write(net, (uchar*) packet->ptr(), packet->length()) )
 	  {
 	    errmsg = "Failed on my_net_write()";
@@ -663,7 +778,7 @@ impossible position";
 	    goto err;
 	  }
 
-	  if ((*packet)[LOG_EVENT_OFFSET+1] == LOAD_EVENT)
+	  if (event_type == LOAD_EVENT)
 	  {
 	    if (send_file(thd))
 	    {
@@ -672,11 +787,14 @@ impossible position";
 	      goto err;
 	    }
 	  }
-	  packet->set("\0", 1, &my_charset_bin);
-	  /*
-	    No need to net_flush because we will get to flush later when
-	    we hit EOF pretty quick
-	  */
+
+          if (need_sync &&
+              semi_sync_repl->readSlaveReply(thd, net, &errmsg,
+                                             &my_errno) != 0) {
+            goto err;
+          }
+
+          semi_sync_repl->reserveSyncHeader(packet, thd);
 	}
 
 	if (fatal_error)
@@ -690,10 +808,20 @@ impossible position";
     }
     else
     {
+      char old_log_file_name[FN_REFLEN];
       bool loop_breaker = 0;
       /* need this to break out of the for loop from switch */
 
-      thd_proc_info(thd, "Finished reading one binlog; switching to next binlog");
+      /* Update the binlog sending state. */
+      processlist_show_binlog_state(
+          binlog_state_msg, binlog_state_msg_len, thd,
+          "Finished reading one binlog; switching to next binlog",
+          log_file_name, pos);
+
+      /* Keep the old filename. */
+      strmake(old_log_file_name, log_file_name,
+              sizeof(old_log_file_name) - 1);
+
       switch (mysql_bin_log.find_next_log(&linfo, 1)) {
       case LOG_INFO_EOF:
 	loop_breaker = (flags & BINLOG_DUMP_NON_BLOCK);
@@ -712,6 +840,15 @@ impossible position";
       end_io_cache(&log);
       (void) my_close(file, MYF(MY_WME));
 
+      /* Sanity check: the next binlog name, taken from the .index file, must
+       * compare greater than the old one, so no binlog is served twice.
+       */
+      if (strcmp(old_log_file_name, log_file_name) >= 0) {
+	errmsg = "Re-serving an already served binlog file.";
+	my_errno = ER_MASTER_FATAL_ERROR_READING_BINLOG;
+	goto err;
+      }
+
       /*
         Call fake_rotate_event() in case the previous log (the one which
         we have just finished reading) did not contain a Rotate event
@@ -729,14 +866,20 @@ impossible position";
 	goto err;
       }
 
-      packet->length(0);
-      packet->append('\0');
+      DBUG_PRINT("info", ("Binlog filename: new binlog %s", log_file_name));
+      semi_sync_repl->reserveSyncHeader(packet, thd);
+
+      /* The log position has been reset to the beginning of the new file. */
+      pos = BIN_LOG_HEADER_SIZE;
     }
   }
 
 end:
+  sql_print_information("End binlog_dump successfully: %d", thd->server_id);
+
   end_io_cache(&log);
   (void)my_close(file, MYF(MY_WME));
+  repl_cleanup(flags);
 
   my_eof(thd);
   thd_proc_info(thd, "Waiting to finalize termination");
@@ -746,8 +889,11 @@ end:
   DBUG_VOID_RETURN;
 
 err:
+  sql_print_error(errmsg);
+  sql_print_error("End binlog_dump in error: %d", thd->server_id);
   thd_proc_info(thd, "Waiting to finalize termination");
   end_io_cache(&log);
+  repl_cleanup(flags);
   /*
     Exclude  iteration through thread list
     this is needed for purge_logs() - it will iterate through
@@ -1291,6 +1437,7 @@ int reset_master(THD* thd)
                ER(ER_FLUSH_MASTER_BINLOG_CLOSED), MYF(ME_BELL+ME_WAITTANG));
     return 1;
   }
+
   return mysql_bin_log.reset_logs(thd);
 }
 
@@ -1777,5 +1924,3 @@ int init_replication_sys_vars()
 }
 
 #endif /* HAVE_REPLICATION */
-
-
diff -Nrup a/storage/innobase/handler/ha_innodb.h b/storage/innobase/handler/ha_innodb.h
--- a/storage/innobase/handler/ha_innodb.h	2007-11-07 06:23:45 +08:00
+++ b/storage/innobase/handler/ha_innodb.h	2008-03-13 16:22:07 +08:00
@@ -31,6 +31,7 @@ typedef struct st_innobase_share {
   uint table_name_length,use_count;
 } INNOBASE_SHARE;
 
+typedef struct trx_struct trx_t;
 
 struct dict_index_struct;
 struct row_prebuilt_struct;
Thread
bk commit into 5.1 tree (hezx:1.2559)hezx13 Mar