List:Commits« Previous MessageNext Message »
From:He Zhenxing Date:September 26 2009 4:50am
Subject:bzr commit into mysql-5.1-rep-semisync branch (zhenxing.he:3108) Bug#39012
Bug#42244 Bug#44058 Bug#45672 Bug#45673 Bug#45819 Bug#45973 WL#1720 WL#439...
View as plain text  
#At file:///media/sdb2/hezx/work/mysql/bzrwork/semisync/5.1-rep-semisync/ based on revid:jperkin@stripped

 3108 He Zhenxing	2009-09-26
      Backporting WL#4398 WL#1720
      Backporting BUG#44058 BUG#42244 BUG#45672 BUG#45673
      Backporting BUG#45819 BUG#45973 BUG#39012

    A  plugin/semisync/
    A  plugin/semisync/Makefile.am
    A  plugin/semisync/configure.in
    A  plugin/semisync/plug.in
    A  plugin/semisync/semisync.cc
    A  plugin/semisync/semisync.h
    A  plugin/semisync/semisync_master.cc
    A  plugin/semisync/semisync_master.h
    A  plugin/semisync/semisync_master_plugin.cc
    A  plugin/semisync/semisync_slave.cc
    A  plugin/semisync/semisync_slave.h
    A  plugin/semisync/semisync_slave_plugin.cc
    A  sql/replication.h
    A  sql/rpl_handler.cc
    A  sql/rpl_handler.h
    M  .bzr-mysql/default.conf
    M  include/mysql/plugin.h
    M  include/mysql/plugin.h.pp
    M  libmysqld/CMakeLists.txt
    M  libmysqld/Makefile.am
    M  mysql-test/mysql-test-run.pl
    M  mysql-test/suite/rpl/t/rpl000017.test
    M  sql/CMakeLists.txt
    M  sql/Makefile.am
    M  sql/handler.cc
    M  sql/log.cc
    M  sql/log.h
    M  sql/mysqld.cc
    M  sql/slave.cc
    M  sql/sql_class.cc
    M  sql/sql_class.h
    M  sql/sql_parse.cc
    M  sql/sql_plugin.cc
    M  sql/sql_plugin.h
    M  sql/sql_repl.cc
=== modified file '.bzr-mysql/default.conf'
--- a/.bzr-mysql/default.conf	2009-09-03 17:07:35 +0000
+++ b/.bzr-mysql/default.conf	2009-09-26 04:49:49 +0000
@@ -1,4 +1,4 @@
 [MYSQL]
 post_commit_to = "commits@stripped"
 post_push_to = "commits@stripped"
-tree_name = "mysql-5.1"
+tree_name = "mysql-5.1-rep-semisync"

=== modified file 'include/mysql/plugin.h'
--- a/include/mysql/plugin.h	2009-06-10 08:59:49 +0000
+++ b/include/mysql/plugin.h	2009-09-26 04:49:49 +0000
@@ -16,6 +16,11 @@
 #ifndef _my_plugin_h
 #define _my_plugin_h
 
+/* size_t */
+#include <stdlib.h>
+
+typedef struct st_mysql MYSQL;
+
 
 /*
   On Windows, exports from DLL need to be declared
@@ -75,7 +80,8 @@ typedef struct st_mysql_xid MYSQL_XID;
 #define MYSQL_FTPARSER_PLUGIN        2  /* Full-text parser plugin      */
 #define MYSQL_DAEMON_PLUGIN          3  /* The daemon/raw plugin type */
 #define MYSQL_INFORMATION_SCHEMA_PLUGIN  4  /* The I_S plugin type */
-#define MYSQL_MAX_PLUGIN_TYPE_NUM    5  /* The number of plugin types   */
+#define MYSQL_REPLICATION_PLUGIN     5	/* The replication plugin type */
+#define MYSQL_MAX_PLUGIN_TYPE_NUM    6  /* The number of plugin types   */
 
 /* We use the following strings to define licenses for plugins */
 #define PLUGIN_LICENSE_PROPRIETARY 0
@@ -650,6 +656,17 @@ struct st_mysql_information_schema
   int interface_version;
 };
 
+/*
+  API for Replication plugin. (MYSQL_REPLICATION_PLUGIN)
+*/
+ #define MYSQL_REPLICATION_INTERFACE_VERSION 0x0100
+ 
+ /**
+    Replication plugin descriptor
+ */
+ struct Mysql_replication {
+   int interface_version;
+ };
 
 /*
   st_mysql_value struct for reading values from mysqld.
@@ -801,6 +818,64 @@ void mysql_query_cache_invalidate4(MYSQL
                                    const char *key, unsigned int key_length,
                                    int using_trx);
 
+/**
+   Get the value of user variable as an integer.
+
+   This function will return the value of variable @a name as an
+   integer. If the original value of the variable is not an integer,
+   the value will be converted into an integer.
+
+   @param name     user variable name
+   @param value    pointer to return the value
+   @param null_value if not NULL, the function will set it to true if
+   the value of variable is null, set to false if not
+
+   @retval 0 Success
+   @retval 1 Variable not found
+*/
+int get_user_var_int(const char *name,
+                     long long int *value, int *null_value);
+
+/**
+   Get the value of user variable as a double precision float number.
+
+   This function will return the value of variable @a name as real
+   number. If the original value of the variable is not a real number,
+   the value will be converted into a real number.
+
+   @param name     user variable name
+   @param value    pointer to return the value
+   @param null_value if not NULL, the function will set it to true if
+   the value of variable is null, set to false if not
+
+   @retval 0 Success
+   @retval 1 Variable not found
+*/
+int get_user_var_real(const char *name,
+                      double *value, int *null_value);
+
+/**
+   Get the value of user variable as a string.
+
+   This function will return the value of variable @a name as
+   string. If the original value of the variable is not a string,
+   the value will be converted into a string.
+
+   @param name     user variable name
+   @param value    pointer to the value buffer
+   @param len      length of the value buffer
+   @param precision precision of the value if it is a float number
+   @param null_value if not NULL, the function will set it to true if
+   the value of variable is null, set to false if not
+
+   @retval 0 Success
+   @retval 1 Variable not found
+*/
+int get_user_var_str(const char *name,
+                     char *value, unsigned long len,
+                     unsigned int precision, int *null_value);
+
+  
 #ifdef __cplusplus
 }
 #endif

=== modified file 'include/mysql/plugin.h.pp'
--- a/include/mysql/plugin.h.pp	2008-06-17 12:27:04 +0000
+++ b/include/mysql/plugin.h.pp	2009-09-26 04:49:49 +0000
@@ -1,3 +1,5 @@
+#include <stdlib.h>
+typedef struct st_mysql MYSQL;
 struct st_mysql_lex_string
 {
   char *str;
@@ -105,6 +107,9 @@ struct st_mysql_information_schema
 {
   int interface_version;
 };
+struct Mysql_replication {
+  int interface_version;
+};
 struct st_mysql_value
 {
   int (*value_type)(struct st_mysql_value *);
@@ -137,3 +142,10 @@ void thd_get_xid(const void* thd, MYSQL_
 void mysql_query_cache_invalidate4(void* thd,
                                    const char *key, unsigned int key_length,
                                    int using_trx);
+int get_user_var_int(const char *name,
+                     long long int *value, int *null_value);
+int get_user_var_real(const char *name,
+                      double *value, int *null_value);
+int get_user_var_str(const char *name,
+                     char *value, unsigned long len,
+                     unsigned int precision, int *null_value);

=== modified file 'libmysqld/CMakeLists.txt'
--- a/libmysqld/CMakeLists.txt	2009-06-10 08:59:49 +0000
+++ b/libmysqld/CMakeLists.txt	2009-09-26 04:49:49 +0000
@@ -139,6 +139,7 @@ SET(LIBMYSQLD_SOURCES emb_qcache.cc libm
            ../sql/time.cc ../sql/tztime.cc ../sql/uniques.cc ../sql/unireg.cc
            ../sql/partition_info.cc ../sql/sql_connect.cc 
            ../sql/scheduler.cc ../sql/event_parse_data.cc
+           ../sql/rpl_handler.cc
            ${GEN_SOURCES}
            ${LIB_SOURCES})
 

=== modified file 'libmysqld/Makefile.am'
--- a/libmysqld/Makefile.am	2009-07-31 19:28:15 +0000
+++ b/libmysqld/Makefile.am	2009-09-26 04:49:49 +0000
@@ -76,7 +76,8 @@ sqlsources = derror.cc field.cc field_co
 	rpl_filter.cc sql_partition.cc sql_builtin.cc sql_plugin.cc \
 	sql_tablespace.cc \
 	rpl_injector.cc my_user.c partition_info.cc \
-	sql_servers.cc event_parse_data.cc
+	sql_servers.cc event_parse_data.cc \
+	rpl_handler.cc
 
 libmysqld_int_a_SOURCES= $(libmysqld_sources)
 nodist_libmysqld_int_a_SOURCES= $(libmysqlsources) $(sqlsources)

=== modified file 'mysql-test/mysql-test-run.pl'
--- a/mysql-test/mysql-test-run.pl	2009-09-02 21:29:11 +0000
+++ b/mysql-test/mysql-test-run.pl	2009-09-26 04:49:49 +0000
@@ -1815,6 +1815,26 @@ sub environment_setup {
     $ENV{'EXAMPLE_PLUGIN_LOAD'}="--plugin_load=;EXAMPLE=".$plugin_filename.";";
   }
 
+  # --------------------------------------------------------------------------
+  # Add the path where mysqld will find semisync plugins
+  # --------------------------------------------------------------------------
+  my $lib_semisync_master_plugin=
+      mtr_file_exists("$basedir/plugin/semisync/.libs/libsemisync_master.so");
+  my $lib_semisync_slave_plugin=
+      mtr_file_exists("$basedir/plugin/semisync/.libs/libsemisync_slave.so");
+  if ($lib_semisync_master_plugin && $lib_semisync_slave_plugin)
+  {
+    $ENV{'SEMISYNC_MASTER_PLUGIN'}= basename($lib_semisync_master_plugin);
+    $ENV{'SEMISYNC_SLAVE_PLUGIN'}= basename($lib_semisync_slave_plugin);
+    $ENV{'SEMISYNC_PLUGIN_OPT'}= "--plugin-dir=".dirname($lib_semisync_master_plugin);
+  }
+  else
+  {
+    $ENV{'SEMISYNC_MASTER_PLUGIN'}= "";
+    $ENV{'SEMISYNC_SLAVE_PLUGIN'}= "";
+    $ENV{'SEMISYNC_PLUGIN_OPT'}="";
+  }
+
   # ----------------------------------------------------
   # Add the path where mysqld will find mypluglib.so
   # ----------------------------------------------------

=== modified file 'mysql-test/suite/rpl/t/rpl000017.test'
--- a/mysql-test/suite/rpl/t/rpl000017.test	2007-06-27 12:28:02 +0000
+++ b/mysql-test/suite/rpl/t/rpl000017.test	2009-09-26 04:49:49 +0000
@@ -6,6 +6,7 @@ grant replication slave on *.* to replic
 grant replication slave on *.* to replicate@stripped identified by 'aaaaaaaaaaaaaaab';
 connection slave;
 start slave;
+source include/wait_for_slave_to_start.inc;
 connection master;
 --disable_warnings
 drop table if exists t1;

=== added directory 'plugin/semisync'
=== added file 'plugin/semisync/Makefile.am'
--- a/plugin/semisync/Makefile.am	1970-01-01 00:00:00 +0000
+++ b/plugin/semisync/Makefile.am	2009-09-26 04:49:49 +0000
@@ -0,0 +1,35 @@
+# Copyright (C) 2006 MySQL AB
+#
+# This program is free software; you can redistribute it and/or modify
+# it under the terms of the GNU General Public License as published by
+# the Free Software Foundation; version 2 of the License.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License
+# along with this program; if not, write to the Free Software
+# Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA  02110-1301  USA
+
+## Makefile.am for semi-synchronous replication
+
+pkgplugindir =		$(pkglibdir)/plugin
+INCLUDES =              -I$(top_srcdir)/include \
+			-I$(top_srcdir)/sql \
+			-I$(srcdir)
+
+noinst_HEADERS = semisync.h semisync_master.h semisync_slave.h
+
+pkgplugin_LTLIBRARIES =	libsemisync_master.la libsemisync_slave.la
+
+libsemisync_master_la_LDFLAGS =	-module
+libsemisync_master_la_CXXFLAGS=	$(AM_CFLAGS) -DMYSQL_DYNAMIC_PLUGIN
+libsemisync_master_la_CFLAGS =	$(AM_CFLAGS) -DMYSQL_DYNAMIC_PLUGIN
+libsemisync_master_la_SOURCES = semisync.cc semisync_master.cc semisync_master_plugin.cc
+
+libsemisync_slave_la_LDFLAGS =	-module
+libsemisync_slave_la_CXXFLAGS=	$(AM_CFLAGS) -DMYSQL_DYNAMIC_PLUGIN
+libsemisync_slave_la_CFLAGS =	$(AM_CFLAGS) -DMYSQL_DYNAMIC_PLUGIN
+libsemisync_slave_la_SOURCES = semisync.cc semisync_slave.cc semisync_slave_plugin.cc

=== added file 'plugin/semisync/configure.in'
--- a/plugin/semisync/configure.in	1970-01-01 00:00:00 +0000
+++ b/plugin/semisync/configure.in	2009-09-26 04:49:49 +0000
@@ -0,0 +1,9 @@
+# configure.in for semi-synchronous replication
+
+AC_INIT(mysql-semi-sync-plugin, 0.2)
+AM_INIT_AUTOMAKE
+AC_DISABLE_STATIC
+AC_PROG_LIBTOOL
+AC_CONFIG_FILES([Makefile])
+AC_OUTPUT
+

=== added file 'plugin/semisync/plug.in'
--- a/plugin/semisync/plug.in	1970-01-01 00:00:00 +0000
+++ b/plugin/semisync/plug.in	2009-09-26 04:49:49 +0000
@@ -0,0 +1,3 @@
+MYSQL_PLUGIN(semisync,[Semi-synchronous Replication Plugin],
+        [Semi-synchronous replication plugin.])
+MYSQL_PLUGIN_DYNAMIC(semisync,   [libsemisync_master.la libsemisync_slave.la])

=== added file 'plugin/semisync/semisync.cc'
--- a/plugin/semisync/semisync.cc	1970-01-01 00:00:00 +0000
+++ b/plugin/semisync/semisync.cc	2009-09-26 04:49:49 +0000
@@ -0,0 +1,30 @@
+/* Copyright (C) 2007 Google Inc.
+   Copyright (C) 2008 MySQL AB
+
+   This program is free software; you can redistribute it and/or modify
+   it under the terms of the GNU General Public License as published by
+   the Free Software Foundation; version 2 of the License.
+
+   This program is distributed in the hope that it will be useful,
+   but WITHOUT ANY WARRANTY; without even the implied warranty of
+   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+   GNU General Public License for more details.
+
+   You should have received a copy of the GNU General Public License
+   along with this program; if not, write to the Free Software
+   Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA  02110-1301  USA */
+
+
+#include "semisync.h"
+
+const unsigned char ReplSemiSyncBase::kPacketMagicNum = 0xef;
+const unsigned char ReplSemiSyncBase::kPacketFlagSync = 0x01;
+
+
+const unsigned long Trace::kTraceGeneral  = 0x0001;
+const unsigned long Trace::kTraceDetail   = 0x0010;
+const unsigned long Trace::kTraceNetWait  = 0x0020;
+const unsigned long Trace::kTraceFunction = 0x0040;
+
+const char  ReplSemiSyncBase::kSyncHeader[2] =
+  {ReplSemiSyncBase::kPacketMagicNum, 0};

=== added file 'plugin/semisync/semisync.h'
--- a/plugin/semisync/semisync.h	1970-01-01 00:00:00 +0000
+++ b/plugin/semisync/semisync.h	2009-09-26 04:49:49 +0000
@@ -0,0 +1,95 @@
+/* Copyright (C) 2007 Google Inc.
+   Copyright (C) 2008 MySQL AB
+
+   This program is free software; you can redistribute it and/or modify
+   it under the terms of the GNU General Public License as published by
+   the Free Software Foundation; version 2 of the License.
+
+   This program is distributed in the hope that it will be useful,
+   but WITHOUT ANY WARRANTY; without even the implied warranty of
+   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+   GNU General Public License for more details.
+
+   You should have received a copy of the GNU General Public License
+   along with this program; if not, write to the Free Software
+   Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA  02110-1301  USA */
+
+
+#ifndef SEMISYNC_H
+#define SEMISYNC_H
+
+#include <stdint.h>
+#include <string.h>
+#include <assert.h>
+#include <sys/time.h>
+#include <time.h>
+#include <stdio.h>
+#include <pthread.h>
+#include <mysql.h>
+
+typedef uint32_t uint32;
+typedef unsigned long long my_off_t;
+#define FN_REFLEN	512	/* Max length of full path-name */
+void sql_print_error(const char *format, ...);
+void sql_print_warning(const char *format, ...);
+void sql_print_information(const char *format, ...);
+extern unsigned long max_connections;
+
+#define MYSQL_SERVER
+#define HAVE_REPLICATION
+#include <my_global.h>
+#include <my_pthread.h>
+#include <mysql/plugin.h>
+#include <replication.h>
+
+typedef struct st_mysql_show_var SHOW_VAR;
+typedef struct st_mysql_sys_var SYS_VAR;
+
+
+/**
+   This class is used to trace function calls and other process
+   information
+*/
+class Trace {
+public:
+  static const unsigned long kTraceFunction;
+  static const unsigned long kTraceGeneral;
+  static const unsigned long kTraceDetail;
+  static const unsigned long kTraceNetWait;
+
+  unsigned long           trace_level_;                      /* the level for tracing */
+
+  inline void function_enter(const char *func_name)
+  {
+    if (trace_level_ & kTraceFunction)
+      sql_print_information("---> %s enter", func_name);
+  }
+  inline int  function_exit(const char *func_name, int exit_code)
+  {
+    if (trace_level_ & kTraceFunction)
+      sql_print_information("<--- %s exit (%d)", func_name, exit_code);
+    return exit_code;
+  }
+
+  Trace()
+    :trace_level_(0L)
+  {}
+  Trace(unsigned long trace_level)
+    :trace_level_(trace_level)
+  {}
+};
+
+/**
+   Base class for semi-sync master and slave classes
+*/
+class ReplSemiSyncBase
+  :public Trace {
+public:
+  static const char  kSyncHeader[2];              /* two byte packet header */
+
+  /* Constants in network packet header. */
+  static const unsigned char kPacketMagicNum;
+  static const unsigned char kPacketFlagSync;
+};
+
+#endif /* SEMISYNC_H */

=== added file 'plugin/semisync/semisync_master.cc'
--- a/plugin/semisync/semisync_master.cc	1970-01-01 00:00:00 +0000
+++ b/plugin/semisync/semisync_master.cc	2009-09-26 04:49:49 +0000
@@ -0,0 +1,1199 @@
+/* Copyright (C) 2007 Google Inc.
+   Copyright (C) 2008 MySQL AB
+
+   This program is free software; you can redistribute it and/or modify
+   it under the terms of the GNU General Public License as published by
+   the Free Software Foundation; version 2 of the License.
+
+   This program is distributed in the hope that it will be useful,
+   but WITHOUT ANY WARRANTY; without even the implied warranty of
+   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+   GNU General Public License for more details.
+
+   You should have received a copy of the GNU General Public License
+   along with this program; if not, write to the Free Software
+   Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA  02110-1301  USA */
+
+
+#include "semisync_master.h"
+
+#define TIME_THOUSAND 1000
+#define TIME_MILLION  1000000
+#define TIME_BILLION  1000000000
+
+/* This indicates whether semi-synchronous replication is enabled. */
+char rpl_semi_sync_master_enabled;
+unsigned long rpl_semi_sync_master_timeout;
+unsigned long rpl_semi_sync_master_trace_level;
+unsigned long rpl_semi_sync_master_status           = 0;
+unsigned long rpl_semi_sync_master_yes_transactions = 0;
+unsigned long rpl_semi_sync_master_no_transactions  = 0;
+unsigned long rpl_semi_sync_master_off_times        = 0;
+unsigned long rpl_semi_sync_master_timefunc_fails   = 0;
+unsigned long rpl_semi_sync_master_num_timeouts     = 0;
+unsigned long rpl_semi_sync_master_wait_sessions    = 0;
+unsigned long rpl_semi_sync_master_back_wait_pos    = 0;
+unsigned long rpl_semi_sync_master_trx_wait_time    = 0;
+unsigned long long rpl_semi_sync_master_trx_wait_num = 0;
+unsigned long rpl_semi_sync_master_net_wait_time    = 0;
+unsigned long long rpl_semi_sync_master_net_wait_num = 0;
+unsigned long rpl_semi_sync_master_clients          = 0;
+unsigned long long rpl_semi_sync_master_net_wait_total_time = 0;
+unsigned long long rpl_semi_sync_master_trx_wait_total_time = 0;
+
+
+static int getWaitTime(const struct timeval& start_tv);
+
+/*******************************************************************************
+ *
+ * <ActiveTranx> class : manage all active transaction nodes
+ *
+ ******************************************************************************/
+
+ActiveTranx::ActiveTranx(int max_connections,
+			 pthread_mutex_t *lock,
+			 unsigned long trace_level)
+  : Trace(trace_level), num_transactions_(max_connections),
+    num_entries_(max_connections << 1),
+    lock_(lock)
+{
+  /* Allocate the memory for the array */
+  node_array_ = new TranxNode[num_transactions_];
+  for (int idx = 0; idx < num_transactions_; ++idx)
+  {
+    node_array_[idx].log_pos_     = 0;
+    node_array_[idx].hash_next_   = NULL;
+    node_array_[idx].next_        = node_array_ + idx + 1;
+
+    node_array_[idx].log_name_    = new char[FN_REFLEN];
+    node_array_[idx].log_name_[0] = '\x0';
+  }
+  node_array_[num_transactions_-1].next_ = NULL;
+
+  /* All nodes in the array go to the pool initially. */
+  free_pool_ = node_array_;
+
+  /* No transactions are in the list initially. */
+  trx_front_ = NULL;
+  trx_rear_  = NULL;
+
+  /* Create the hash table to find a transaction's ending event. */
+  trx_htb_ = new TranxNode *[num_entries_];
+  for (int idx = 0; idx < num_entries_; ++idx)
+    trx_htb_[idx] = NULL;
+
+  sql_print_information("Semi-sync replication initialized for %d "
+                        "transactions.", num_transactions_);
+}
+
+ActiveTranx::~ActiveTranx()
+{
+  for (int idx = 0; idx < num_transactions_; ++idx)
+  {
+    delete [] node_array_[idx].log_name_;
+    node_array_[idx].log_name_ = NULL;
+  }
+
+  delete [] node_array_;
+  delete [] trx_htb_;
+
+  node_array_       = NULL;
+  trx_htb_          = NULL;
+  num_transactions_ = 0;
+  num_entries_      = 0;
+}
+
+unsigned int ActiveTranx::calc_hash(const unsigned char *key,
+                                    unsigned int length)
+{
+  unsigned int nr = 1, nr2 = 4;
+
+  /* The hash implementation comes from calc_hashnr() in mysys/hash.c. */
+  while (length--)
+  {
+    nr  ^= (((nr & 63)+nr2)*((unsigned int) (unsigned char) *key++))+ (nr << 8);
+    nr2 += 3;
+  }
+  return((unsigned int) nr);
+}
+
+unsigned int ActiveTranx::get_hash_value(const char *log_file_name,
+				 my_off_t    log_file_pos)
+{
+  unsigned int hash1 = calc_hash((const unsigned char *)log_file_name,
+                                 strlen(log_file_name));
+  unsigned int hash2 = calc_hash((const unsigned char *)(&log_file_pos),
+                                 sizeof(log_file_pos));
+
+  return (hash1 + hash2) % num_entries_;
+}
+
+ActiveTranx::TranxNode* ActiveTranx::alloc_tranx_node()
+{
+  TranxNode *ptr = free_pool_;
+
+  if (free_pool_)
+  {
+    free_pool_ = free_pool_->next_;
+    ptr->next_ = NULL;
+    ptr->hash_next_ = NULL;
+  }
+  else
+  {
+    /*
+      free_pool should never be NULL here, because we have
+      max_connections number of pre-allocated nodes.
+    */
+    sql_print_error("You have encountered a semi-sync bug (free_pool == NULL), "
+                    "please report to http://bugs.mysql.com");
+    assert(free_pool_);
+  }
+
+  return ptr;
+}
+
+int ActiveTranx::compare(const char *log_file_name1, my_off_t log_file_pos1,
+			 const char *log_file_name2, my_off_t log_file_pos2)
+{
+  int cmp = strcmp(log_file_name1, log_file_name2);
+
+  if (cmp != 0)
+    return cmp;
+
+  if (log_file_pos1 > log_file_pos2)
+    return 1;
+  else if (log_file_pos1 < log_file_pos2)
+    return -1;
+  return 0;
+}
+
+/**
+   Append the binlog position (log_file_name, log_file_pos) to the tail of
+   the active transaction list and register it in the position hash table.
+   @retval  0 Success
+   @retval -1 No free node, or the position is not after the current tail
+*/
+int ActiveTranx::insert_tranx_node(const char *log_file_name,
+				   my_off_t log_file_pos)
+{
+  const char *kWho = "ActiveTranx::insert_tranx_node";
+  TranxNode  *ins_node;
+  int         result = 0;
+  unsigned int        hash_val;
+
+  function_enter(kWho);
+  ins_node = alloc_tranx_node();
+  if (!ins_node)
+  {
+    sql_print_error("%s: transaction node allocation failed for: (%s, %lu)",
+                    kWho, log_file_name, (unsigned long)log_file_pos);
+    result = -1;
+    goto l_end;
+  }
+
+  /* insert the binlog position in the active transaction list. */
+  strcpy(ins_node->log_name_, log_file_name);
+  ins_node->log_pos_ = log_file_pos;
+
+  if (!trx_front_)
+  {
+    /* The list is empty. */
+    trx_front_ = trx_rear_ = ins_node;
+  }
+  else if (compare(ins_node, trx_rear_) > 0)
+  {
+    /* The transaction happens later in binlog: make it the new tail. */
+    trx_rear_->next_ = ins_node;
+    trx_rear_        = ins_node;
+  }
+  else
+  {
+    /* Out of order: writers are expected to hold mysql_bin_log.LOCK_log.
+     * The node was never linked in, so hand it back to the free pool. */
+    sql_print_error("%s: binlog write out-of-order, tail (%s, %lu), "
+                    "new node (%s, %lu)", kWho,
+                    trx_rear_->log_name_, (unsigned long)trx_rear_->log_pos_,
+                    ins_node->log_name_, (unsigned long)ins_node->log_pos_);
+    ins_node->next_ = free_pool_;
+    free_pool_      = ins_node;
+    result = -1;
+    goto l_end;
+  }
+
+  hash_val = get_hash_value(ins_node->log_name_, ins_node->log_pos_);
+  ins_node->hash_next_ = trx_htb_[hash_val];
+  trx_htb_[hash_val]   = ins_node;
+
+  if (trace_level_ & kTraceDetail)
+    sql_print_information("%s: insert (%s, %lu) in entry(%u)", kWho,
+                          ins_node->log_name_, (unsigned long)ins_node->log_pos_,
+                          hash_val);
+
+ l_end:
+  return function_exit(kWho, result);
+}
+
+bool ActiveTranx::is_tranx_end_pos(const char *log_file_name,
+				   my_off_t    log_file_pos)
+{
+  const char *kWho = "ActiveTranx::is_tranx_end_pos";
+  function_enter(kWho);
+
+  unsigned int hash_val = get_hash_value(log_file_name, log_file_pos);
+  TranxNode *entry = trx_htb_[hash_val];
+
+  while (entry != NULL)
+  {
+    if (compare(entry, log_file_name, log_file_pos) == 0)
+      break;
+
+    entry = entry->hash_next_;
+  }
+
+  if (trace_level_ & kTraceDetail)
+    sql_print_information("%s: probe (%s, %lu) in entry(%u)", kWho,
+                          log_file_name, (unsigned long)log_file_pos, hash_val);
+
+  function_exit(kWho, (entry != NULL));
+  return (entry != NULL);
+}
+
+int ActiveTranx::clear_active_tranx_nodes(const char *log_file_name,
+					  my_off_t log_file_pos)
+{
+  const char *kWho = "ActiveTranx::::clear_active_tranx_nodes";
+  TranxNode *new_front;
+
+  function_enter(kWho);
+
+  if (log_file_name != NULL)
+  {
+    new_front = trx_front_;
+
+    while (new_front)
+    {
+      if (compare(new_front, log_file_name, log_file_pos) > 0)
+        break;
+      new_front = new_front->next_;
+    }
+  }
+  else
+  {
+    /* If log_file_name is NULL, clear everything. */
+    new_front = NULL;
+  }
+
+  if (new_front == NULL)
+  {
+    /* No active transaction nodes after the call. */
+
+    /* Clear the hash table. */
+    memset(trx_htb_, 0, num_entries_ * sizeof(TranxNode *));
+
+    /* Clear the active transaction list. */
+    if (trx_front_ != NULL)
+    {
+      trx_rear_->next_ = free_pool_;
+      free_pool_ = trx_front_;
+      trx_front_ = NULL;
+      trx_rear_  = NULL;
+    }
+
+    if (trace_level_ & kTraceDetail)
+      sql_print_information("%s: free all nodes back to free list", kWho);
+  }
+  else if (new_front != trx_front_)
+  {
+    TranxNode *curr_node, *next_node;
+
+    /* Delete all transaction nodes before the confirmation point. */
+    int n_frees = 0;
+    curr_node = trx_front_;
+    while (curr_node != new_front)
+    {
+      next_node = curr_node->next_;
+
+      /* Put the node in the memory pool. */
+      curr_node->next_ = free_pool_;
+      free_pool_       = curr_node;
+      n_frees++;
+
+      /* Remove the node from the hash table. */
+      unsigned int hash_val = get_hash_value(curr_node->log_name_, curr_node->log_pos_);
+      TranxNode **hash_ptr = &(trx_htb_[hash_val]);
+      while ((*hash_ptr) != NULL)
+      {
+        if ((*hash_ptr) == curr_node)
+	{
+          (*hash_ptr) = curr_node->hash_next_;
+          break;
+        }
+        hash_ptr = &((*hash_ptr)->hash_next_);
+      }
+
+      curr_node = next_node;
+    }
+
+    trx_front_ = new_front;
+
+    if (trace_level_ & kTraceDetail)
+      sql_print_information("%s: free %d nodes back until pos (%s, %lu)",
+                            kWho, n_frees,
+                            trx_front_->log_name_, (unsigned long)trx_front_->log_pos_);
+  }
+
+  return function_exit(kWho, 0);
+}
+
+
+/*******************************************************************************
+ *
+ * <ReplSemiSyncMaster> class: the basic code layer for sync-replication master.
+ * <ReplSemiSyncSlave>  class: the basic code layer for sync-replication slave.
+ *
+ * The most important functions during semi-sync replication are listed below:
+ *
+ * Master:
+ *  . reportReplyBinlog(): called by the binlog dump thread when it receives
+ *                         the slave's status information.
+ *  . updateSyncHeader():  based on transaction waiting information, decide
+ *                         whether to request the slave to reply.
+ *  . writeTraxInBinlog(): called by the transaction thread when it finishes
+ *                         writing all transaction events in binlog.
+ *  . commitTrx():         transaction thread wait for the slave reply.
+ *
+ * Slave:
+ *  . slaveReadSyncHeader(): read the semi-sync header from the master, get the
+ *                         sync status and get the payload for events.
+ *  . slaveReply():        reply to the master about the replication progress.
+ *
+ ******************************************************************************/
+
+ReplSemiSyncMaster::ReplSemiSyncMaster()
+  : active_tranxs_(NULL),
+    init_done_(false),
+    reply_file_name_inited_(false),
+    reply_file_pos_(0L),
+    wait_file_name_inited_(false),
+    wait_file_pos_(0),
+    master_enabled_(false),
+    wait_timeout_(0L),
+    state_(0),
+    enabled_transactions_(0),
+    disabled_transactions_(0),
+    switched_off_times_(0),
+    timefunc_fails_(0),
+    wait_sessions_(0),
+    wait_backtraverse_(0),
+    total_trx_wait_num_(0),
+    total_trx_wait_time_(0),
+    total_net_wait_num_(0),
+    total_net_wait_time_(0),
+    max_transactions_(0L)
+{
+  strcpy(reply_file_name_, "");
+  strcpy(wait_file_name_, "");
+}
+
+int ReplSemiSyncMaster::initObject()
+{
+  int result;
+  const char *kWho = "ReplSemiSyncMaster::initObject";
+
+  if (init_done_)
+  {
+    fprintf(stderr, "%s called twice\n", kWho);
+    return 1;
+  }
+  init_done_ = true;
+
+  /* References to the parameter works after set_options(). */
+  setWaitTimeout(rpl_semi_sync_master_timeout);
+  setTraceLevel(rpl_semi_sync_master_trace_level);
+  max_transactions_ = (int)max_connections;
+
+  /* Mutex initialization can only be done after MY_INIT(). */
+  pthread_mutex_init(&LOCK_binlog_, MY_MUTEX_INIT_FAST);
+  pthread_cond_init(&COND_binlog_send_, NULL);
+
+  if (rpl_semi_sync_master_enabled)
+    result = enableMaster();
+  else
+    result = disableMaster();
+
+  return result;
+}
+
+int ReplSemiSyncMaster::enableMaster()
+{
+  int result = 0;
+
+  /* Must have the lock when we do enable or disable. */
+  lock();
+
+  if (!getMasterEnabled())
+  {
+    active_tranxs_ = new ActiveTranx(max_connections,
+				     &LOCK_binlog_,
+				     trace_level_);
+    if (active_tranxs_ != NULL)
+    {
+      commit_file_name_inited_ = false;
+      reply_file_name_inited_  = false;
+      wait_file_name_inited_   = false;
+
+      set_master_enabled(true);
+      state_ = true;
+      sql_print_information("Semi-sync replication enabled on the master.");
+    }
+    else
+    {
+      sql_print_error("Cannot allocate memory to enable semi-sync on the master.");
+      result = -1;
+    }
+  }
+
+  unlock();
+
+  return result;
+}
+
+int ReplSemiSyncMaster::disableMaster()
+{
+  /* Must have the lock when we do enable or disable. */
+  lock();
+
+  if (getMasterEnabled())
+  {
+    /* Switch off the semi-sync first so that waiting transactions will be
+     * woken up.
+     */
+    switch_off();
+
+    assert(active_tranxs_ != NULL);
+    delete active_tranxs_;
+    active_tranxs_ = NULL;
+
+    reply_file_name_inited_ = false;
+    wait_file_name_inited_  = false;
+    commit_file_name_inited_ = false;
+
+    set_master_enabled(false);
+    sql_print_information("Semi-sync replication disabled on the master.");
+  }
+
+  unlock();
+
+  return 0;
+}
+
+ReplSemiSyncMaster::~ReplSemiSyncMaster()
+{
+  if (init_done_)
+  {
+    pthread_mutex_destroy(&LOCK_binlog_);
+    pthread_cond_destroy(&COND_binlog_send_);
+  }
+
+  delete active_tranxs_;
+}
+
+void ReplSemiSyncMaster::lock()
+{
+  pthread_mutex_lock(&LOCK_binlog_);
+}
+
+void ReplSemiSyncMaster::unlock()
+{
+  pthread_mutex_unlock(&LOCK_binlog_);
+}
+
+void ReplSemiSyncMaster::cond_broadcast()
+{
+  pthread_cond_broadcast(&COND_binlog_send_);
+}
+
+int ReplSemiSyncMaster::cond_timewait(struct timespec *wait_time)
+{
+  const char *kWho = "ReplSemiSyncMaster::cond_timewait()";
+  int wait_res;
+
+  function_enter(kWho);
+  wait_res = pthread_cond_timedwait(&COND_binlog_send_,
+                                    &LOCK_binlog_, wait_time);
+  return function_exit(kWho, wait_res);
+}
+
+void ReplSemiSyncMaster::add_slave()
+{
+  lock();
+  rpl_semi_sync_master_clients++;
+  unlock();
+}
+
+/* Decrement rpl_semi_sync_master_clients under LOCK_binlog_.
+ * NOTE(review): the counter is unsigned and is not checked for zero here;
+ * presumably callers pair every remove_slave() with an earlier add_slave().
+ */
+void ReplSemiSyncMaster::remove_slave()
+{
+  lock();
+  rpl_semi_sync_master_clients--;
+  unlock();
+}
+
+/* Tell whether the integer user variable "rpl_semi_sync_slave" is
+ * non-zero.  The value starts at 0, so the answer is false whenever
+ * get_user_var_int() leaves it untouched.
+ */
+bool ReplSemiSyncMaster::is_semi_sync_slave()
+{
+  long long flag = 0;
+  int is_null;
+
+  get_user_var_int("rpl_semi_sync_slave", &flag, &is_null);
+  return flag != 0;
+}
+
+/* Parse a reply of the form "<position>:<binlog file name>" and forward
+ * it to reportReplyBinlog(server_id, name, pos) with server_id 0.
+ *
+ * Input:
+ *  log_file_pos - (IN)  textual reply, decimal position followed by ':'
+ *                       and the binlog file name
+ *
+ * Return:
+ *  1 if the input is NULL, the position is zero or missing, the ':'
+ *  separator is absent, or the file name does not fit in FN_REFLEN bytes;
+ *  otherwise the result of the three-argument reportReplyBinlog()
+ */
+int ReplSemiSyncMaster::reportReplyBinlog(const char *log_file_pos)
+{
+  char log_name[FN_REFLEN];
+  char *endptr= NULL;
+
+  if (log_file_pos == NULL)
+    return 1;
+
+  my_off_t log_pos= strtoull(log_file_pos, &endptr, 10);
+  if (!log_pos || !endptr || *endptr != ':' )
+    return 1;
+  endptr++;                                     // skip the ':' separator
+
+  /* The name is later copied with strcpy() into FN_REFLEN-sized buffers,
+   * so it must be NUL-terminated and fit; reject it otherwise instead of
+   * passing on an unterminated or truncated name.
+   */
+  size_t name_len= strlen(endptr);
+  if (name_len >= sizeof(log_name))
+    return 1;
+  memcpy(log_name, endptr, name_len + 1);       // includes the terminator
+
+  uint32 server_id= 0;
+  return reportReplyBinlog(server_id, log_name, log_pos);
+}
+
+/* Record that a slave has acknowledged the binlog up to
+ * (log_file_name, log_file_pos) and release waiting transactions whose
+ * wait position has been reached.
+ *
+ * Does nothing and returns 0 when the master is not enabled.  If
+ * semi-sync is currently off, try_switch_on() is given the chance to
+ * switch it back on first.
+ *
+ * Return:
+ *  always 0
+ */
+int ReplSemiSyncMaster::reportReplyBinlog(uint32 server_id,
+					  const char *log_file_name,
+					  my_off_t log_file_pos)
+{
+  const char *kWho = "ReplSemiSyncMaster::reportReplyBinlog";
+  int   cmp;
+  bool  can_release_threads = false;
+  bool  need_copy_send_pos = true;
+
+  /* Cheap check without the mutex; repeated below with the lock held. */
+  if (!(getMasterEnabled()))
+    return 0;
+
+  function_enter(kWho);
+
+  lock();
+
+  /* This is the real check inside the mutex. */
+  if (!getMasterEnabled())
+    goto l_end;
+
+  if (!is_on())
+    /* We check to see whether we can switch semi-sync ON. */
+    try_switch_on(server_id, log_file_name, log_file_pos);
+
+  /* The position should increase monotonically, if there is only one
+   * thread sending the binlog to the slave.
+   * In reality, to improve the transaction availability, we allow multiple
+   * sync replication slaves.  So, if any one of them get the transaction,
+   * the transaction session in the primary can move forward.
+   */
+  if (reply_file_name_inited_)
+  {
+    cmp = ActiveTranx::compare(log_file_name, log_file_pos,
+                               reply_file_name_, reply_file_pos_);
+
+    /* If the requested position is behind the sending binlog position,
+     * would not adjust sending binlog position.
+     * We based on the assumption that there are multiple semi-sync slave,
+     * and at least one of them should be up to date.
+     * If all semi-sync slaves are behind, at least initially, the primary
+     * can find the situation after the waiting timeout.  After that, some
+     * slaves should catch up quickly.
+     */
+    if (cmp < 0)
+    {
+      /* If the position is behind, do not copy it. */
+      need_copy_send_pos = false;
+    }
+  }
+
+  if (need_copy_send_pos)
+  {
+    /* Remember the newest acknowledged position. */
+    strcpy(reply_file_name_, log_file_name);
+    reply_file_pos_ = log_file_pos;
+    reply_file_name_inited_ = true;
+
+    /* Remove all active transaction nodes before this point. */
+    assert(active_tranxs_ != NULL);
+    active_tranxs_->clear_active_tranx_nodes(log_file_name, log_file_pos);
+
+    if (trace_level_ & kTraceDetail)
+      sql_print_information("%s: Got reply at (%s, %lu)", kWho,
+                            log_file_name, (unsigned long)log_file_pos);
+  }
+
+  if (wait_sessions_ > 0)
+  {
+    /* Let us check if some of the waiting threads doing a trx
+     * commit can now proceed.
+     */
+    cmp = ActiveTranx::compare(reply_file_name_, reply_file_pos_,
+                               wait_file_name_, wait_file_pos_);
+    if (cmp >= 0)
+    {
+      /* Yes, at least one waiting thread can now proceed:
+       * let us release all waiting threads with a broadcast
+       */
+      can_release_threads = true;
+      wait_file_name_inited_ = false;
+    }
+  }
+
+ l_end:
+  unlock();
+
+  /* The broadcast is issued after the mutex has been released. */
+  if (can_release_threads)
+  {
+    if (trace_level_ & kTraceDetail)
+      sql_print_information("%s: signal all waiting threads.", kWho);
+
+    cond_broadcast();
+  }
+
+  return function_exit(kWho, 0);
+}
+
+/* Make the committing session wait until a slave has acknowledged the
+ * binlog position (trx_wait_binlog_name, trx_wait_binlog_pos), semi-sync
+ * gets switched off, or wait_timeout_ milliseconds have elapsed since the
+ * call started.
+ *
+ * Nothing is done when the master is not enabled or no binlog name is
+ * given.  A timeout (any non-zero result of the timed wait) or a
+ * gettimeofday() failure switches semi-sync off, which also releases all
+ * other waiting sessions.  The yes/no transaction counters are updated
+ * before returning.
+ *
+ * Input:  (the transaction events' ending binlog position)
+ *  trx_wait_binlog_name - (IN)  ending position's file name
+ *  trx_wait_binlog_pos  - (IN)  ending position's file offset
+ *
+ * Return:
+ *  always 0
+ */
+int ReplSemiSyncMaster::commitTrx(const char* trx_wait_binlog_name,
+                                  my_off_t trx_wait_binlog_pos)
+{
+  const char *kWho = "ReplSemiSyncMaster::commitTrx";
+
+  function_enter(kWho);
+
+  if (getMasterEnabled() && trx_wait_binlog_name)
+  {
+    struct timeval start_tv;
+    struct timespec abstime;
+    int wait_result, start_time_err;
+    const char *old_msg= 0;
+
+    /* The deadline is measured from here, before the lock is taken. */
+    start_time_err = gettimeofday(&start_tv, 0);
+
+    /* Acquire the mutex. */
+    lock();
+
+    /* This must be called after acquired the lock */
+    old_msg= thd_enter_cond(NULL, &COND_binlog_send_, &LOCK_binlog_,
+                            "Waiting for semi-sync ACK from slave");
+
+    /* This is the real check inside the mutex. */
+    if (!getMasterEnabled() || !is_on() || !rpl_semi_sync_master_clients)
+      goto l_end;
+
+    if (trace_level_ & kTraceDetail)
+    {
+      sql_print_information("%s: wait pos (%s, %lu), repl(%d)\n", kWho,
+                            trx_wait_binlog_name, (unsigned long)trx_wait_binlog_pos,
+                            (int)is_on());
+    }
+
+    while (is_on())
+    {
+      int cmp = ActiveTranx::compare(reply_file_name_, reply_file_pos_,
+                                     trx_wait_binlog_name, trx_wait_binlog_pos);
+      if (cmp >= 0)
+      {
+        /* We have already sent the relevant binlog to the slave: no need to
+         * wait here.
+         */
+        if (trace_level_ & kTraceDetail)
+          sql_print_information("%s: Binlog reply is ahead (%s, %lu),",
+                                kWho, reply_file_name_, (unsigned long)reply_file_pos_);
+        break;
+      }
+
+      /* Let us update the info about the minimum binlog position of waiting
+       * threads.
+       */
+      if (wait_file_name_inited_)
+      {
+        cmp = ActiveTranx::compare(trx_wait_binlog_name, trx_wait_binlog_pos,
+                                   wait_file_name_, wait_file_pos_);
+        if (cmp <= 0)
+        {
+          /* This thd has a lower position, let's update the minimum info. */
+          strcpy(wait_file_name_, trx_wait_binlog_name);
+          wait_file_pos_ = trx_wait_binlog_pos;
+
+          wait_backtraverse_++;
+          if (trace_level_ & kTraceDetail)
+            sql_print_information("%s: move back wait position (%s, %lu),",
+                                  kWho, wait_file_name_, (unsigned long)wait_file_pos_);
+        }
+      }
+      else
+      {
+        strcpy(wait_file_name_, trx_wait_binlog_name);
+        wait_file_pos_ = trx_wait_binlog_pos;
+        wait_file_name_inited_ = true;
+
+        if (trace_level_ & kTraceDetail)
+          sql_print_information("%s: init wait position (%s, %lu),",
+                                kWho, wait_file_name_, (unsigned long)wait_file_pos_);
+      }
+
+      if (start_time_err == 0)
+      {
+        /* Calculate the absolute deadline: start time + wait_timeout_ (ms).
+         * Whole seconds and the sub-second remainder are added separately,
+         * so a large timeout cannot overflow the intermediate microsecond
+         * value (at most 999999 + 999000 here).
+         */
+        unsigned long deadline_usecs =
+          (unsigned long)start_tv.tv_usec +
+          (wait_timeout_ % TIME_THOUSAND) * TIME_THOUSAND;
+
+        abstime.tv_sec = start_tv.tv_sec +
+                         (time_t)(wait_timeout_ / TIME_THOUSAND);
+        if (deadline_usecs >= TIME_MILLION)
+        {
+          abstime.tv_sec++;
+          deadline_usecs -= TIME_MILLION;
+        }
+        abstime.tv_nsec = deadline_usecs * TIME_THOUSAND;
+
+        /* In semi-synchronous replication, we wait until the binlog-dump
+         * thread has received the reply on the relevant binlog segment from the
+         * replication slave.
+         *
+         * Let us suspend this thread to wait on the condition;
+         * when replication has progressed far enough, we will release
+         * these waiting threads.
+         */
+        wait_sessions_++;
+
+        if (trace_level_ & kTraceDetail)
+          sql_print_information("%s: wait %lu ms for binlog sent (%s, %lu)",
+                                kWho, wait_timeout_,
+                                wait_file_name_, (unsigned long)wait_file_pos_);
+
+        wait_result = cond_timewait(&abstime);
+        wait_sessions_--;
+
+        if (wait_result != 0)
+        {
+          /* This is a real wait timeout. */
+          sql_print_warning("Timeout waiting for reply of binlog (file: %s, pos: %lu), "
+                            "semi-sync up to file %s, position %lu.",
+                            trx_wait_binlog_name, (unsigned long)trx_wait_binlog_pos,
+                            reply_file_name_, (unsigned long)reply_file_pos_);
+          total_wait_timeouts_++;
+
+          /* switch semi-sync off; the loop ends because is_on() is false */
+          switch_off();
+        }
+        else
+        {
+          int wait_time;
+
+          wait_time = getWaitTime(start_tv);
+          if (wait_time < 0)
+          {
+            if (trace_level_ & kTraceGeneral)
+            {
+              /* This is a time/gettimeofday function call error. */
+              sql_print_error("Replication semi-sync gettimeofday fail1 at "
+                              "wait position (%s, %lu)",
+                              trx_wait_binlog_name, (unsigned long)trx_wait_binlog_pos);
+            }
+            timefunc_fails_++;
+          }
+          else
+          {
+            total_trx_wait_num_++;
+            total_trx_wait_time_ += wait_time;
+          }
+        }
+      }
+      else
+      {
+        if (trace_level_ & kTraceGeneral)
+        {
+          /* This is a gettimeofday function call error. */
+          sql_print_error("Replication semi-sync gettimeofday fail2 at "
+                          "wait position (%s, %lu)",
+                          trx_wait_binlog_name, (unsigned long)trx_wait_binlog_pos);
+        }
+        timefunc_fails_++;
+
+        /* Without a start time no deadline can be computed:
+         * switch semi-sync off
+         */
+        switch_off();
+      }
+    }
+
+  l_end:
+    /* Update the status counter. */
+    if (is_on() && rpl_semi_sync_master_clients)
+      enabled_transactions_++;
+    else
+      disabled_transactions_++;
+
+    /* The lock held will be released by thd_exit_cond, so no need to
+       call unlock() here */
+    thd_exit_cond(NULL, old_msg);
+  }
+
+  return function_exit(kWho, 0);
+}
+
+/* Indicate that semi-sync replication is OFF now.
+ * 
+ * What should we do when it is disabled?  The problem is that we want
+ * the semi-sync replication enabled again when the slave catches up
+ * later.  But, it is not that easy to detect that the slave has caught
+ * up.  This is caused by the fact that MySQL's replication protocol is
+ * asynchronous, meaning that if the master does not use the semi-sync
+ * protocol, the slave would not send anything to the master.
+ * Still, if the master is sending (N+1)-th event, we assume that it is
+ * an indicator that the slave has received N-th event and earlier ones.
+ *
+ * If semi-sync is disabled, all transactions still update the wait
+ * position with the last position in binlog.  But no transactions will
+ * wait for confirmations and the active transaction list would not be
+ * maintained.  In binlog dump thread, updateSyncHeader() checks whether
+ * the current sending event catches up with last wait position and, if
+ * so, requests a reply from the slave; when that reply is reported,
+ * reportReplyBinlog() calls try_switch_on() to switch semi-sync on again.
+ *
+ * This function clears state_, empties the active transaction list,
+ * invalidates the wait and reply positions, counts the switch-off, logs
+ * it and broadcasts on the condition variable.  It does not take the
+ * lock itself.
+ *
+ * Return:
+ *  the result of clear_active_tranx_nodes(NULL, 0)
+ */
+int ReplSemiSyncMaster::switch_off()
+{
+  const char *kWho = "ReplSemiSyncMaster::switch_off";
+  int result;
+
+  function_enter(kWho);
+  state_ = false;
+
+  /* Clear the active transaction list. */
+  assert(active_tranxs_ != NULL);
+  result = active_tranxs_->clear_active_tranx_nodes(NULL, 0);
+
+  switched_off_times_++;
+  wait_file_name_inited_   = false;
+  reply_file_name_inited_  = false;
+  sql_print_information("Semi-sync replication switched OFF.");
+  cond_broadcast();                            /* wake up all waiting threads */
+
+  return function_exit(kWho, result);
+}
+
+/* Switch semi-sync back on if the slave has caught up.
+ *
+ * The slave counts as caught up when no commit position has been
+ * recorded yet (commit_file_name_inited_ is false), or when the reported
+ * position is at or beyond the 'largest' recorded commit position.
+ *
+ * Input:
+ *  server_id     - (IN)  only used in the log message
+ *  log_file_name - (IN)  reported binlog file name
+ *  log_file_pos  - (IN)  reported binlog file offset
+ *
+ * Return:
+ *  always 0
+ */
+int ReplSemiSyncMaster::try_switch_on(int server_id,
+                                      const char *log_file_name,
+                                      my_off_t log_file_pos)
+{
+  const char *kWho = "ReplSemiSyncMaster::try_switch_on";
+
+  function_enter(kWho);
+
+  /* The comparison is only evaluated when a commit position exists. */
+  const bool caught_up =
+    !commit_file_name_inited_ ||
+    ActiveTranx::compare(log_file_name, log_file_pos,
+                         commit_file_name_, commit_file_pos_) >= 0;
+
+  if (caught_up)
+  {
+    state_ = true;
+
+    sql_print_information("Semi-sync replication switched ON with slave (server_id: %d) "
+                          "at (%s, %lu)",
+                          server_id, log_file_name,
+                          (unsigned long)log_file_pos);
+  }
+
+  return function_exit(kWho, 0);
+}
+
+/* Reserve room for the semi-sync header at the start of an event packet.
+ *
+ * For a slave that is not a semi-sync slave nothing is written.  For a
+ * semi-sync slave kSyncHeader is copied into the buffer; if the buffer
+ * is too small for it, a warning is logged and semi-sync is disabled on
+ * the master.
+ *
+ * Input:
+ *  header   - (IN)  the header buffer
+ *  size     - (IN)  size of the header buffer
+ *
+ * Return:
+ *  number of bytes reserved: sizeof(kSyncHeader), or 0 when nothing was
+ *  written
+ */
+int ReplSemiSyncMaster::reserveSyncHeader(unsigned char *header,
+                                          unsigned long size)
+{
+  const char *kWho = "ReplSemiSyncMaster::reserveSyncHeader";
+  function_enter(kWho);
+
+  int hlen= 0;
+  if (is_semi_sync_slave())
+  {
+    if (sizeof(kSyncHeader) > size)
+    {
+      /* No enough space for the extra header, disable semi-sync master.
+       * hlen stays 0 and the function leaves through function_exit()
+       * like every other path, keeping the enter/exit trace balanced.
+       */
+      sql_print_warning("No enough space in the packet "
+                        "for semi-sync extra header, "
+                        "semi-sync replication disabled");
+      disableMaster();
+    }
+    else
+    {
+      /* Set the magic number and the sync status.  By default, no sync
+       * is required.
+       */
+      memcpy(header, kSyncHeader, sizeof(kSyncHeader));
+      hlen= sizeof(kSyncHeader);
+    }
+  }
+  return function_exit(kWho, hlen);
+}
+
+/* Decide whether the slave must acknowledge the event ending at
+ * (log_file_name, log_file_pos) and, if so, set the sync flag in the
+ * packet header (byte 2 of the packet).
+ *
+ * While semi-sync is on, a reply is requested only for an event that is
+ * an active transaction's ending position, has not been acknowledged
+ * yet, and is not behind the current wait position.  While semi-sync is
+ * off, a reply is requested once the event reaches the 'largest' commit
+ * position (or when no commit position is recorded).
+ *
+ * Return:
+ *  always 0
+ */
+int ReplSemiSyncMaster::updateSyncHeader(unsigned char *packet,
+					 const char *log_file_name,
+					 my_off_t log_file_pos,
+					 uint32 server_id)
+{
+  const char *kWho = "ReplSemiSyncMaster::updateSyncHeader";
+  int  cmp = 0;
+  bool sync = false;
+
+  /* If the semi-sync master is not enabled, or the slave is not a semi-sync
+   * target, do not request replies from the slave.
+   */
+  if (!getMasterEnabled() || !is_semi_sync_slave())
+  {
+    sync = false;
+    return 0;
+  }
+
+  function_enter(kWho);
+
+  lock();
+
+  /* This is the real check inside the mutex. */
+  if (!getMasterEnabled())
+  {
+    sync = false;
+    goto l_end;
+  }
+
+  if (is_on())
+  {
+    /* semi-sync is ON */
+    sync = false;     /* No sync unless a transaction is involved. */
+
+    if (reply_file_name_inited_)
+    {
+      cmp = ActiveTranx::compare(log_file_name, log_file_pos,
+                                 reply_file_name_, reply_file_pos_);
+      if (cmp <= 0)
+      {
+        /* If we have already got the reply for the event, then we do
+         * not need to sync the transaction again.
+         */
+        goto l_end;
+      }
+    }
+
+    if (wait_file_name_inited_)
+    {
+      cmp = ActiveTranx::compare(log_file_name, log_file_pos,
+                                 wait_file_name_, wait_file_pos_);
+    }
+    else
+    {
+      /* No wait position recorded: treat the event as not behind it. */
+      cmp = 1;
+    }
+    
+    /* If we are already waiting for some transaction replies which
+     * are later in binlog, do not wait for this one event.
+     */
+    if (cmp >= 0)
+    {
+      /* 
+       * We only wait if the event is a transaction's ending event.
+       */
+      assert(active_tranxs_ != NULL);
+      sync = active_tranxs_->is_tranx_end_pos(log_file_name,
+                                               log_file_pos);
+    }
+  }
+  else
+  {
+    /* semi-sync is OFF: ask for a reply once the event being sent has
+     * reached the 'largest' commit position.
+     */
+    if (commit_file_name_inited_)
+    {
+      int cmp = ActiveTranx::compare(log_file_name, log_file_pos,
+                                     commit_file_name_, commit_file_pos_);
+      sync = (cmp >= 0);
+    }
+    else
+    {
+      sync = true;
+    }
+  }
+
+  if (trace_level_ & kTraceDetail)
+    sql_print_information("%s: server(%d), (%s, %lu) sync(%d), repl(%d)",
+                          kWho, server_id, log_file_name,
+                          (unsigned long)log_file_pos, sync, (int)is_on());
+
+ l_end:
+  unlock();
+
+  /* We do not need to clear sync flag because we set it to 0 when we
+   * reserve the packet header.
+   */
+  if (sync)
+    (packet)[2] = kPacketFlagSync;
+
+  return function_exit(kWho, 0);
+}
+
+/* Called when a transaction has finished writing its binlog events.
+ *
+ * With LOCK_binlog_ held and the master enabled:
+ *  . the 'largest' commit position is advanced to
+ *    (log_file_name, log_file_pos) when that position is larger, or
+ *    initialised when none is recorded yet;
+ *  . if semi-sync is on and at least one semi-sync slave is connected,
+ *    the position is inserted into the active transaction list.  When
+ *    the insert fails a warning is logged and semi-sync is switched off.
+ *
+ * Input:  (the transaction events' ending binlog position)
+ *  log_file_name - (IN)  transaction ending position's file name
+ *  log_file_pos  - (IN)  transaction ending position's file offset
+ *
+ * Return:
+ *  always 0 (an insert failure only switches semi-sync off)
+ */
+int ReplSemiSyncMaster::writeTranxInBinlog(const char* log_file_name,
+                                           my_off_t log_file_pos)
+{
+  const char *kWho = "ReplSemiSyncMaster::writeTranxInBinlog";
+  int result = 0;
+
+  function_enter(kWho);
+
+  lock();
+
+  /* This is the real check inside the mutex. */
+  if (!getMasterEnabled())
+    goto l_end;
+
+  /* Update the 'largest' transaction commit position seen so far even
+   * though semi-sync is switched off.
+   * It is much better that we update commit_file_* here, instead of
+   * inside commitTrx().  This is mostly because updateSyncHeader()
+   * will watch for commit_file_* to decide whether to switch semi-sync
+   * on. The detailed reason is explained in function updateSyncHeader().
+   */
+  if (commit_file_name_inited_)
+  {
+    int cmp = ActiveTranx::compare(log_file_name, log_file_pos,
+                                   commit_file_name_, commit_file_pos_);
+    if (cmp > 0)
+    {
+      /* This is a larger position, let's update the maximum info. */
+      strcpy(commit_file_name_, log_file_name);
+      commit_file_pos_ = log_file_pos;
+    }
+  }
+  else
+  {
+    strcpy(commit_file_name_, log_file_name);
+    commit_file_pos_ = log_file_pos;
+    commit_file_name_inited_ = true;
+  }
+
+  if (is_on() && rpl_semi_sync_master_clients)
+  {
+    assert(active_tranxs_ != NULL);
+    if(active_tranxs_->insert_tranx_node(log_file_name, log_file_pos))
+    {
+      /*
+        if insert tranx_node failed, print a warning message
+        and turn off semi-sync
+      */
+      /* The position is cast to unsigned long to match %lu, as in the
+         other log messages of this file. */
+      sql_print_warning("Semi-sync failed to insert tranx_node for binlog file: %s, position: %lu",
+                        log_file_name, (unsigned long)log_file_pos);
+      switch_off();
+    }
+  }
+
+ l_end:
+  unlock();
+
+  return function_exit(kWho, result);
+}
+
+/* Reset the semi-sync bookkeeping.
+ *
+ * With LOCK_binlog_ held: state_ is set to the current enabled flag, the
+ * wait/reply/commit positions are invalidated and the counters listed
+ * below are zeroed.
+ *
+ * Return:
+ *  always 0
+ */
+int ReplSemiSyncMaster::resetMaster()
+{
+  const char *kWho = "ReplSemiSyncMaster::resetMaster";
+
+  function_enter(kWho);
+
+  lock();
+
+  /* Semi-sync is considered on exactly when the master is enabled. */
+  state_ = getMasterEnabled();
+
+  /* Forget every cached binlog position. */
+  commit_file_name_inited_ = false;
+  reply_file_name_inited_  = false;
+  wait_file_name_inited_   = false;
+
+  /* Transaction and switch counters. */
+  enabled_transactions_  = 0;
+  disabled_transactions_ = 0;
+  switched_off_times_    = 0;
+  timefunc_fails_        = 0;
+
+  /* Wait bookkeeping. */
+  wait_sessions_       = 0;
+  wait_backtraverse_   = 0;
+  total_trx_wait_num_  = 0;
+  total_trx_wait_time_ = 0;
+  total_net_wait_num_  = 0;
+  total_net_wait_time_ = 0;
+
+  unlock();
+
+  return function_exit(kWho, 0);
+}
+
+/* Copy the internal counters into the exported rpl_semi_sync_master_*
+ * status variables while holding LOCK_binlog_.  The two *_wait_time
+ * variables receive the average (total time / number of waits), or 0
+ * when there were no waits.
+ */
+void ReplSemiSyncMaster::setExportStats()
+{
+  lock();
+
+  unsigned long avg_trx_wait = 0;
+  if (total_trx_wait_num_)
+    avg_trx_wait = (unsigned long)((double)total_trx_wait_time_ /
+                                   ((double)total_trx_wait_num_));
+
+  unsigned long avg_net_wait = 0;
+  if (total_net_wait_num_)
+    avg_net_wait = (unsigned long)((double)total_net_wait_time_ /
+                                   ((double)total_net_wait_num_));
+
+  /* Reported as ON only while at least one semi-sync slave is connected. */
+  rpl_semi_sync_master_status           = state_ && rpl_semi_sync_master_clients;
+
+  rpl_semi_sync_master_yes_transactions = enabled_transactions_;
+  rpl_semi_sync_master_no_transactions  = disabled_transactions_;
+  rpl_semi_sync_master_off_times        = switched_off_times_;
+  rpl_semi_sync_master_timefunc_fails   = timefunc_fails_;
+  rpl_semi_sync_master_num_timeouts     = total_wait_timeouts_;
+  rpl_semi_sync_master_wait_sessions    = wait_sessions_;
+  rpl_semi_sync_master_back_wait_pos    = wait_backtraverse_;
+
+  rpl_semi_sync_master_trx_wait_num        = total_trx_wait_num_;
+  rpl_semi_sync_master_trx_wait_time       = avg_trx_wait;
+  rpl_semi_sync_master_trx_wait_total_time = total_trx_wait_time_;
+
+  rpl_semi_sync_master_net_wait_num        = total_net_wait_num_;
+  rpl_semi_sync_master_net_wait_time       = avg_net_wait;
+  rpl_semi_sync_master_net_wait_total_time = total_net_wait_time_;
+
+  unlock();
+}
+
+/* Get the waiting time given the wait's starting time.
+ *
+ * Input:
+ *  start_tv - (IN)  time at which the wait started
+ *
+ * Return:
+ *  >= 0: the waiting time in microseconds (us)
+ *   < 0: error in gettimeofday or time back traverse
+ */
+static int getWaitTime(const struct timeval& start_tv)
+{
+  unsigned long long start_usecs, end_usecs;
+  struct timeval end_tv;
+
+  /* Take the end time first: end_tv must not be read if the call fails. */
+  if (gettimeofday(&end_tv, 0) != 0)
+    return -1;
+
+  /* Convert both times to microseconds(us).  The seconds are widened
+   * before the multiplication so that it cannot overflow a narrower
+   * time_t.
+   */
+  start_usecs = (unsigned long long)start_tv.tv_sec * TIME_MILLION +
+                start_tv.tv_usec;
+  end_usecs   = (unsigned long long)end_tv.tv_sec * TIME_MILLION +
+                end_tv.tv_usec;
+
+  /* The clock went backwards. */
+  if (end_usecs < start_usecs)
+    return -1;
+
+  return (int)(end_usecs - start_usecs);
+}

=== added file 'plugin/semisync/semisync_master.h'
--- a/plugin/semisync/semisync_master.h	1970-01-01 00:00:00 +0000
+++ b/plugin/semisync/semisync_master.h	2009-09-26 04:49:49 +0000
@@ -0,0 +1,366 @@
+/* Copyright (C) 2007 Google Inc.
+   Copyright (C) 2008 MySQL AB
+
+   This program is free software; you can redistribute it and/or modify
+   it under the terms of the GNU General Public License as published by
+   the Free Software Foundation; version 2 of the License.
+
+   This program is distributed in the hope that it will be useful,
+   but WITHOUT ANY WARRANTY; without even the implied warranty of
+   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+   GNU General Public License for more details.
+
+   You should have received a copy of the GNU General Public License
+   along with this program; if not, write to the Free Software
+   Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA  02110-1301  USA */
+
+
+#ifndef SEMISYNC_MASTER_H
+#define SEMISYNC_MASTER_H
+
+#include "semisync.h"
+
+/**
+   This class manages memory for active transaction list.
+
+   We record each active transaction with a TranxNode.  Because each
+   session can have only one open transaction, the total active
+   transaction nodes can not exceed the maximum sessions.  Currently
+   in MySQL, sessions are the same as connections.
+*/
+class ActiveTranx
+  :public Trace {
+private:
+  /* One active transaction, identified by its ending binlog position. */
+  struct TranxNode {
+    char             *log_name_;
+    my_off_t          log_pos_;
+    struct TranxNode *next_;            /* the next node in the sorted list */
+    struct TranxNode *hash_next_;    /* the next node during hash collision */
+  };
+
+  /* The following data structure maintains an active transaction list. */
+  TranxNode       *node_array_;
+  TranxNode       *free_pool_;
+
+  /* These two record the active transaction list in sort order. */
+  TranxNode       *trx_front_, *trx_rear_;
+
+  TranxNode      **trx_htb_;        /* A hash table on active transactions. */
+
+  int              num_transactions_;               /* maximum transactions */
+  int              num_entries_;              /* maximum hash table entries */
+  pthread_mutex_t *lock_;                                     /* mutex lock */
+
+  inline void assert_lock_owner();
+
+  inline TranxNode* alloc_tranx_node();
+
+  inline unsigned int calc_hash(const unsigned char *key,unsigned int length);
+  unsigned int get_hash_value(const char *log_file_name, my_off_t log_file_pos);
+
+  /* Convenience overloads: compare a position with a node, a node with a
+   * position, or two nodes, by forwarding to the static
+   * four-argument compare() below.
+   */
+  int compare(const char *log_file_name1, my_off_t log_file_pos1,
+	      const TranxNode *node2) {
+    return compare(log_file_name1, log_file_pos1,
+		   node2->log_name_, node2->log_pos_);
+  }
+  int compare(const TranxNode *node1,
+	      const char *log_file_name2, my_off_t log_file_pos2) {
+    return compare(node1->log_name_, node1->log_pos_,
+		   log_file_name2, log_file_pos2);
+  }
+  int compare(const TranxNode *node1, const TranxNode *node2) {
+    return compare(node1->log_name_, node1->log_pos_,
+		   node2->log_name_, node2->log_pos_);
+  }
+
+public:
+  ActiveTranx(int max_connections, pthread_mutex_t *lock,
+	      unsigned long trace_level);
+  ~ActiveTranx();
+
+  /* Insert an active transaction node with the specified position.
+   *
+   * Return:
+   *  0: success;  -1 or otherwise: error
+   */
+  int insert_tranx_node(const char *log_file_name, my_off_t log_file_pos);
+
+  /* Clear the active transaction nodes until(inclusive) the specified
+   * position.
+   * If log_file_name is NULL, everything will be cleared: the sorted
+   * list and the hash table will be reset to empty.
+   * 
+   * Return:
+   *  0: success;  -1 or otherwise: error
+   */
+  int clear_active_tranx_nodes(const char *log_file_name,
+			       my_off_t    log_file_pos);
+
+  /* Given a position, check to see whether the position is an active
+   * transaction's ending position by probing the hash table.
+   */
+  bool is_tranx_end_pos(const char *log_file_name, my_off_t log_file_pos);
+
+  /* Given two binlog positions, compare which one is bigger based on
+   * (file_name, file_position).
+   */
+  static int compare(const char *log_file_name1, my_off_t log_file_pos1,
+		     const char *log_file_name2, my_off_t log_file_pos2);
+
+};
+
+/**
+   The extension class for the master of semi-synchronous replication
+*/
+class ReplSemiSyncMaster
+  :public ReplSemiSyncBase {
+ private:
+  ActiveTranx    *active_tranxs_;  /* active transaction list: the list will
+                                      be cleared when semi-sync switches off. */
+
+  /* True when initObject has been called */
+  bool init_done_;
+
+  /* This cond variable is signaled when enough binlog has been sent to slave,
+   * so that a waiting trx can return the 'ok' to the client for a commit.
+   */
+  pthread_cond_t  COND_binlog_send_;
+
+  /* Mutex that protects the following state variables and the active
+   * transaction list.
+   * Under no circumstances we can acquire mysql_bin_log.LOCK_log if we are
+   * already holding LOCK_binlog_ because it can cause deadlocks.
+   */
+  pthread_mutex_t LOCK_binlog_;
+
+  /* This is set to true when reply_file_name_ contains meaningful data. */
+  bool            reply_file_name_inited_;
+
+  /* The binlog name up to which we have received replies from any slaves. */
+  char            reply_file_name_[FN_REFLEN];
+
+  /* The position in that file up to which we have the reply from any slaves. */
+  my_off_t        reply_file_pos_;
+
+  /* This is set to true when we know the 'smallest' wait position. */
+  bool            wait_file_name_inited_;
+
+  /* The 'smallest' filename that a transaction is waiting for slave
+   * replies; only meaningful while wait_file_name_inited_ is true.
+   */
+  char            wait_file_name_[FN_REFLEN];
+
+  /* The smallest position in that file that a trx is waiting for: the trx
+   * can proceed and send an 'ok' to the client when the master has got the
+   * reply from the slave indicating that it already got the binlog events.
+   */
+  my_off_t        wait_file_pos_;
+
+  /* This is set to true when we know the 'largest' transaction commit
+   * position in the binlog file.
+   * We always maintain the position no matter whether semi-sync is switched
+   * on or switched off.  When a transaction wait timeout occurs, semi-sync
+   * will switch off.  Binlog-dump thread can use the three fields to detect
+   * when slaves catch up on replication so that semi-sync can switch on again.
+   */
+  bool            commit_file_name_inited_;
+
+  /* The 'largest' binlog filename that a commit transaction is seeing.       */
+  char            commit_file_name_[FN_REFLEN];
+
+  /* The 'largest' position in that file that a commit transaction is seeing. */
+  my_off_t        commit_file_pos_;
+
+  /* All global variables which can be set by parameters. */
+  volatile bool            master_enabled_;      /* semi-sync is enabled on the master */
+  unsigned long           wait_timeout_;      /* timeout period(ms) during tranx wait */
+
+  /* All status variables. */
+  bool            state_;                 /* whether semi-sync is switched on */
+  unsigned long           enabled_transactions_;          /* semi-sync'ed transactions */
+  unsigned long           disabled_transactions_;     /* non-semi-sync'ed transactions */
+  unsigned long           switched_off_times_;    /* how many times are switched off? */
+  unsigned long           timefunc_fails_;           /* how many time function fails? */
+  unsigned long           total_wait_timeouts_;      /* total number of wait timeouts */
+  unsigned long           wait_sessions_;      /* how many sessions wait for replies? */
+  unsigned long           wait_backtraverse_;         /* wait position back traverses */
+  unsigned long long       total_trx_wait_num_;   /* total trx waits: non-timeout ones */
+  unsigned long long       total_trx_wait_time_;         /* total trx wait time: in us */
+  unsigned long long       total_net_wait_num_;                 /* total network waits */
+  unsigned long long       total_net_wait_time_;            /* total network wait time */
+
+  /* The number of maximum active transactions.  This should be the same as
+   * maximum connections because MySQL does not do connection sharing now.
+   */
+  int             max_transactions_;
+
+  /* Thin wrappers around LOCK_binlog_ and COND_binlog_send_. */
+  void lock();
+  void unlock();
+  void cond_broadcast();
+  int  cond_timewait(struct timespec *wait_time);
+
+  /* Is semi-sync replication on? */
+  bool is_on() {
+    return (state_);
+  }
+
+  void set_master_enabled(bool enabled) {
+    master_enabled_ = enabled;
+  }
+
+  /* Switch semi-sync off because of timeout in transaction waiting. */
+  int switch_off();
+
+  /* Switch semi-sync on when slaves catch up. */
+  int try_switch_on(int server_id,
+                    const char *log_file_name, my_off_t log_file_pos);
+
+ public:
+  ReplSemiSyncMaster();
+  ~ReplSemiSyncMaster();
+
+  bool getMasterEnabled() {
+    return master_enabled_;
+  }
+  /* Set the trace level of this object and, when it exists, of the
+   * active transaction list.
+   */
+  void setTraceLevel(unsigned long trace_level) {
+    trace_level_ = trace_level;
+    if (active_tranxs_)
+      active_tranxs_->trace_level_ = trace_level;
+  }
+
+  /* Set the transaction wait timeout period, in milliseconds. */
+  void setWaitTimeout(unsigned long wait_timeout) {
+    wait_timeout_ = wait_timeout;
+  }
+
+  /* Initialize this class after MySQL parameters are initialized. this
+   * function should be called once at bootstrap time.
+   */
+  int initObject();
+
+  /* Enable the object to enable semi-sync replication inside the master. */
+  int enableMaster();
+
+  /* Disable semi-sync replication inside the master. */
+  int disableMaster();
+
+  /* Add a semi-sync replication slave */
+  void add_slave();
+    
+  /* Remove a semi-sync replication slave */
+  void remove_slave();
+
+  /* Is the slave served by the thread requested semi-sync */
+  bool is_semi_sync_slave();
+
+  /* Report a reply given as text: "<position>:<binlog file name>". */
+  int reportReplyBinlog(const char *log_file_pos);
+  
+  /* In semi-sync replication, reports up to which binlog position we have
+   * received replies from the slave indicating that it already get the events.
+   *
+   * Input:
+   *  server_id     - (IN)  master server id number
+   *  log_file_name - (IN)  binlog file name
+   *  end_offset    - (IN)  the offset in the binlog file up to which we have
+   *                        the replies from the slave
+   *
+   * Return:
+   *  0: success;  -1 or otherwise: error
+   */
+  int reportReplyBinlog(uint32 server_id,
+                        const char* log_file_name,
+                        my_off_t end_offset);
+
+  /* Commit a transaction in the final step.  This function is called from
+   * InnoDB before returning from the low commit.  If semi-sync is switch on,
+   * the function will wait to see whether binlog-dump thread get the reply for
+   * the events of the transaction.  Remember that this is not a direct wait,
+   * instead, it waits to see whether the binlog-dump thread has reached the
+   * point.  If the wait times out, semi-sync status will be switched off and
+   * all other transaction would not wait either.
+   *
+   * Input:  (the transaction events' ending binlog position)
+   *  trx_wait_binlog_name - (IN)  ending position's file name
+   *  trx_wait_binlog_pos  - (IN)  ending position's file offset
+   *
+   * Return:
+   *  0: success;  -1 or otherwise: error
+   */
+  int commitTrx(const char* trx_wait_binlog_name,
+                my_off_t trx_wait_binlog_pos);
+
+  /* Reserve space in the replication event packet header:
+   *  . slave semi-sync off: 1 byte - (0)
+   *  . slave semi-sync on:  3 byte - (0, 0xef, 0/1)
+   * 
+   * Input:
+   *  header   - (IN)  the header buffer
+   *  size     - (IN)  size of the header buffer
+   *
+   * Return:
+   *  size of the bytes reserved for header
+   */
+  int reserveSyncHeader(unsigned char *header, unsigned long size);
+
+  /* Update the sync bit in the packet header to indicate to the slave whether
+   * the master will wait for the reply of the event.  If semi-sync is switched
+   * off and we detect that the slave is catching up, a reply is requested so
+   * that semi-sync can be switched on again.
+   * 
+   * Input:
+   *  packet        - (IN)  the packet containing the replication event
+   *  log_file_name - (IN)  the event ending position's file name
+   *  log_file_pos  - (IN)  the event ending position's file offset
+   *  server_id     - (IN)  master server id number
+   *
+   * Return:
+   *  0: success;  -1 or otherwise: error
+   */
+  int updateSyncHeader(unsigned char *packet,
+                       const char *log_file_name,
+		       my_off_t log_file_pos,
+		       uint32 server_id);
+
+  /* Called when a transaction finished writing binlog events.
+   *  . update the 'largest' transactions' binlog event position
+   *  . insert the ending position in the active transaction list if
+   *    semi-sync is on
+   * 
+   * Input:  (the transaction events' ending binlog position)
+   *  log_file_name - (IN)  transaction ending position's file name
+   *  log_file_pos  - (IN)  transaction ending position's file offset
+   *
+   * Return:
+   *  0: success;  -1 or otherwise: error
+   */
+  int writeTranxInBinlog(const char* log_file_name, my_off_t log_file_pos);
+
+  /* Export internal statistics for semi-sync replication. */
+  void setExportStats();
+
+  /* 'reset master' command is issued from the user and semi-sync needs to
+   * reset its recorded binlog positions and counters for that.
+   */
+  int resetMaster();
+};
+
+/* System and status variables for the master component.
+ * The status values are filled in by ReplSemiSyncMaster::setExportStats();
+ * rpl_semi_sync_master_clients is adjusted by add_slave()/remove_slave().
+ */
+extern char rpl_semi_sync_master_enabled;
+extern unsigned long rpl_semi_sync_master_timeout;
+extern unsigned long rpl_semi_sync_master_trace_level;
+extern unsigned long rpl_semi_sync_master_status;
+extern unsigned long rpl_semi_sync_master_yes_transactions;
+extern unsigned long rpl_semi_sync_master_no_transactions;
+extern unsigned long rpl_semi_sync_master_off_times;
+extern unsigned long rpl_semi_sync_master_timefunc_fails;
+extern unsigned long rpl_semi_sync_master_num_timeouts;
+extern unsigned long rpl_semi_sync_master_wait_sessions;
+extern unsigned long rpl_semi_sync_master_back_wait_pos;
+extern unsigned long rpl_semi_sync_master_trx_wait_time;
+extern unsigned long rpl_semi_sync_master_net_wait_time;
+extern unsigned long long rpl_semi_sync_master_net_wait_num;
+extern unsigned long long rpl_semi_sync_master_trx_wait_num;
+extern unsigned long long rpl_semi_sync_master_net_wait_total_time;
+extern unsigned long long rpl_semi_sync_master_trx_wait_total_time;
+extern unsigned long rpl_semi_sync_master_clients;
+
+#endif /* SEMISYNC_MASTER_H */

=== added file 'plugin/semisync/semisync_master_plugin.cc'
--- a/plugin/semisync/semisync_master_plugin.cc	1970-01-01 00:00:00 +0000
+++ b/plugin/semisync/semisync_master_plugin.cc	2009-09-26 04:49:49 +0000
@@ -0,0 +1,380 @@
+/* Copyright (C) 2007 Google Inc.
+   Copyright (C) 2008 MySQL AB
+
+   This program is free software; you can redistribute it and/or modify
+   it under the terms of the GNU General Public License as published by
+   the Free Software Foundation; version 2 of the License.
+
+   This program is distributed in the hope that it will be useful,
+   but WITHOUT ANY WARRANTY; without even the implied warranty of
+   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+   GNU General Public License for more details.
+
+   You should have received a copy of the GNU General Public License
+   along with this program; if not, write to the Free Software
+   Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA  02110-1301  USA */
+
+
+#include "semisync_master.h"
+
+ReplSemiSyncMaster repl_semisync;
+
+/*
+  Binlog storage hook: remember where the just-written transaction ends.
+  Returns 0 when semi-sync is disabled, else writeTranxInBinlog()'s result.
+*/
+int repl_semi_report_binlog_update(Binlog_storage_param *param,
+                                   const char *log_file,
+                                   my_off_t log_pos, uint32 flags)
+{
+  /* Nothing to record while the semi-sync master is disabled. */
+  if (!repl_semisync.getMasterEnabled())
+    return 0;
+
+  /*
+    Store the binlog file name and the position, so that we know how
+    long to wait for the binlog to be replicated to the slave in
+    synchronous replication.
+  */
+  return repl_semisync.writeTranxInBinlog(log_file, log_pos);
+}
+
+int repl_semi_request_commit(Trans_param *param)
+{
+  return 0;                     /* always succeeds: no work is done here */
+}
+
+/*
+  Transaction hook: for a real transaction that has a binlog position,
+  hand that position to commitTrx(); everything else returns 0.
+*/
+int repl_semi_report_commit(Trans_param *param)
+{
+  if (!(param->flags & TRANS_IS_REAL_TRANS))
+    return 0;
+  if (!param->log_pos)
+    return 0;
+  return repl_semisync.commitTrx(param->log_file, param->log_pos);
+}
+
+int repl_semi_report_rollback(Trans_param *param)
+{
+  return repl_semi_report_commit(param);  /* rollback shares the commit path */
+}
+
+/* Dump-thread start hook: count a semi-sync slave and log the dump start. */
+int repl_semi_binlog_dump_start(Binlog_transmit_param *param,
+                                const char *log_file, my_off_t log_pos)
+{
+  const char *mode= "asynchronous";
+
+  if (repl_semisync.is_semi_sync_slave())
+  {
+    repl_semisync.add_slave();           /* one more semi-sync slave */
+    mode= "semi-sync";
+  }
+  sql_print_information("Start %s binlog_dump to slave (server_id: %d), pos(%s, %lu)",
+                        mode, param->server_id, log_file, (unsigned long)log_pos);
+  return 0;
+}
+
+/* Dump-thread stop hook: log the stop and un-count a semi-sync slave. */
+int repl_semi_binlog_dump_end(Binlog_transmit_param *param)
+{
+  const bool was_semi_sync= repl_semisync.is_semi_sync_slave();
+  const char *mode= was_semi_sync ? "semi-sync" : "asynchronous";
+
+  sql_print_information("Stop %s binlog_dump to slave (server_id: %d)",
+                        mode, param->server_id);
+
+  /* One less semi-sync slave */
+  if (was_semi_sync)
+    repl_semisync.remove_slave();
+  return 0;
+}
+
+/* Reserve-header hook: grow *len by what reserveSyncHeader() returns. */
+int repl_semi_reserve_header(Binlog_transmit_param *param, unsigned char *header,
+                             unsigned long size, unsigned long *len)
+{
+  (*len)+= repl_semisync.reserveSyncHeader(header, size);
+  return 0;
+}
+
+/* Before-send hook: pass the packet, the binlog position and param's
+   server id to updateSyncHeader() and return its result. */
+int repl_semi_before_send_event(Binlog_transmit_param *param,
+                                unsigned char *packet, unsigned long len,
+                                const char *log_file, my_off_t log_pos)
+{
+  return repl_semisync.updateSyncHeader(packet, log_file, log_pos,
+                                        param->server_id);
+}
+
+int repl_semi_after_send_event(Binlog_transmit_param *param,
+                               const char *event_buf, unsigned long len)
+{
+  return 0;                     /* nothing is done after an event is sent */
+}
+
+/* Reset hook: map resetMaster()'s result onto 1 (it reported a problem)
+   or 0 (it did not). */
+int repl_semi_reset_master(Binlog_transmit_param *param)
+{
+  return repl_semisync.resetMaster() ? 1 : 0;
+}
+
+/*
+  semisync system variables
+ */
+static void fix_rpl_semi_sync_master_timeout(MYSQL_THD thd,
+				      SYS_VAR *var,
+				      void *ptr,
+				      const void *val);
+
+static void fix_rpl_semi_sync_master_trace_level(MYSQL_THD thd,
+					  SYS_VAR *var,
+					  void *ptr,
+					  const void *val);
+
+static void fix_rpl_semi_sync_master_enabled(MYSQL_THD thd,
+				      SYS_VAR *var,
+				      void *ptr,
+				      const void *val);
+
+static void fix_rpl_semi_sync_master_reply_log_file_pos(MYSQL_THD thd,
+                                                 SYS_VAR *var,
+                                                 void *ptr,
+                                                 const void *val);
+
+static MYSQL_SYSVAR_BOOL(enabled, rpl_semi_sync_master_enabled,
+  PLUGIN_VAR_OPCMDARG,
+ "Enable semi-synchronous replication master (disabled by default). ",
+  NULL, 			// check
+  &fix_rpl_semi_sync_master_enabled,	// update
+  0);
+
+static MYSQL_SYSVAR_ULONG(timeout, rpl_semi_sync_master_timeout,
+  PLUGIN_VAR_OPCMDARG,
+ "The timeout value (in ms) for semi-synchronous replication in the master",
+  NULL, 			// check
+  fix_rpl_semi_sync_master_timeout,	// update
+  10000, 0, ~0L, 1);
+
+static MYSQL_SYSVAR_ULONG(trace_level, rpl_semi_sync_master_trace_level,
+  PLUGIN_VAR_OPCMDARG,
+ "The tracing level for semi-sync replication.",
+  NULL,				  // check
+  &fix_rpl_semi_sync_master_trace_level, // update
+  32, 0, ~0L, 1);
+
+/*
+  Use a SESSION instead of GLOBAL variable for slave to send reply to
+  avoid requiring SUPER privilege.
+*/
+static MYSQL_THDVAR_STR(reply_log_file_pos,
+  PLUGIN_VAR_NOCMDOPT,
+  "The log filename and position slave has queued to relay log.",
+  NULL,             // check
+  &fix_rpl_semi_sync_master_reply_log_file_pos,
+  "");
+
+static SYS_VAR* semi_sync_master_system_vars[]= {
+  MYSQL_SYSVAR(enabled),
+  MYSQL_SYSVAR(timeout),
+  MYSQL_SYSVAR(trace_level),
+  MYSQL_SYSVAR(reply_log_file_pos),
+  NULL,
+};
+
+
+/* Update callback for 'timeout': store the new value, then hand the
+   global timeout to the semi-sync master object. */
+static void fix_rpl_semi_sync_master_timeout(MYSQL_THD thd, SYS_VAR *var,
+                                             void *ptr, const void *val)
+{
+  unsigned long *target= (unsigned long *)ptr;
+  *target= *(const unsigned long *)val;
+  repl_semisync.setWaitTimeout(rpl_semi_sync_master_timeout);
+}
+
+/* Update callback for 'trace_level': store the new value, then hand the
+   global trace level to the semi-sync master object. */
+static void fix_rpl_semi_sync_master_trace_level(MYSQL_THD thd, SYS_VAR *var,
+                                                 void *ptr, const void *val)
+{
+  unsigned long *target= (unsigned long *)ptr;
+  *target= *(const unsigned long *)val;
+  repl_semisync.setTraceLevel(rpl_semi_sync_master_trace_level);
+}
+
+/*
+  Update callback for the 'enabled' sysvar.
+
+  Stores the requested value, then enables or disables the semi-sync
+  master to match.  If that call returns non-zero, the global flag is
+  set to the opposite of what was requested.
+*/
+static void fix_rpl_semi_sync_master_enabled(MYSQL_THD thd, SYS_VAR *var,
+                                             void *ptr, const void *val)
+{
+  *(char *)ptr= *(const char *)val;
+
+  const bool want_on= rpl_semi_sync_master_enabled;
+  const bool failed= want_on ? (repl_semisync.enableMaster() != 0)
+                             : (repl_semisync.disableMaster() != 0);
+
+  if (failed)
+    rpl_semi_sync_master_enabled= !want_on;
+}
+
+/*
+  Update callback for 'reply_log_file_pos': pass the new string value to
+  reportReplyBinlog() and log an error when it returns non-zero.
+*/
+static void fix_rpl_semi_sync_master_reply_log_file_pos(MYSQL_THD thd,
+                                                        SYS_VAR *var,
+                                                        void *ptr,
+                                                        const void *val)
+{
+  if (repl_semisync.reportReplyBinlog(*(char **)val) != 0)
+    sql_print_error("report slave binlog reply failed.");
+}
+
+Trans_observer trans_observer = {
+  sizeof(Trans_observer),		// len
+
+  repl_semi_report_commit,	// after_commit
+  repl_semi_report_rollback,	// after_rollback
+};
+
+Binlog_storage_observer storage_observer = {
+  sizeof(Binlog_storage_observer), // len
+
+  repl_semi_report_binlog_update, // report_update
+};
+
+Binlog_transmit_observer transmit_observer = {
+  sizeof(Binlog_transmit_observer), // len
+
+  repl_semi_binlog_dump_start,	// start
+  repl_semi_binlog_dump_end,	// stop
+  repl_semi_reserve_header,	// reserve_header
+  repl_semi_before_send_event,	// before_send_event
+  repl_semi_after_send_event,	// after_send_event
+  repl_semi_reset_master,	// reset
+};
+
+
+#define SHOW_FNAME(name)			\
+  rpl_semi_sync_master_show_##name
+
+#define DEF_SHOW_FUNC(name, show_type)					\
+  static  int SHOW_FNAME(name)(MYSQL_THD thd, SHOW_VAR *var, char *buff) \
+  {									\
+    repl_semisync.setExportStats();					\
+    var->type= show_type;						\
+    var->value= (char *)&rpl_semi_sync_master_##name;				\
+    return 0;								\
+  }
+
+DEF_SHOW_FUNC(clients, SHOW_LONG)
+DEF_SHOW_FUNC(net_wait_time, SHOW_LONG)
+DEF_SHOW_FUNC(net_wait_total_time, SHOW_LONGLONG)
+DEF_SHOW_FUNC(net_wait_num, SHOW_LONGLONG)
+DEF_SHOW_FUNC(off_times, SHOW_LONG)
+DEF_SHOW_FUNC(no_transactions, SHOW_LONG)
+DEF_SHOW_FUNC(status, SHOW_BOOL)
+DEF_SHOW_FUNC(timefunc_fails, SHOW_LONG)
+DEF_SHOW_FUNC(trx_wait_time, SHOW_LONG)
+DEF_SHOW_FUNC(trx_wait_total_time, SHOW_LONGLONG)
+DEF_SHOW_FUNC(trx_wait_num, SHOW_LONGLONG)
+DEF_SHOW_FUNC(back_wait_pos, SHOW_LONG)
+DEF_SHOW_FUNC(wait_sessions, SHOW_LONG)
+DEF_SHOW_FUNC(yes_transactions, SHOW_LONG)
+
+
+/* plugin status variables */
+static SHOW_VAR semi_sync_master_status_vars[]= {
+  {"Rpl_semi_sync_master_clients",    (char*) &SHOW_FNAME(clients),         SHOW_FUNC},
+  {"Rpl_semi_sync_master_net_avg_wait_time",
+                               (char*) &SHOW_FNAME(net_wait_time),   SHOW_FUNC},
+  {"Rpl_semi_sync_master_net_wait_time",
+                               (char*) &SHOW_FNAME(net_wait_total_time),   SHOW_FUNC},
+  {"Rpl_semi_sync_master_net_waits",  (char*) &SHOW_FNAME(net_wait_num),    SHOW_FUNC},
+  {"Rpl_semi_sync_master_no_times",   (char*) &SHOW_FNAME(off_times),       SHOW_FUNC},
+  {"Rpl_semi_sync_master_no_tx",      (char*) &SHOW_FNAME(no_transactions), SHOW_FUNC},
+  {"Rpl_semi_sync_master_status",     (char*) &SHOW_FNAME(status),          SHOW_FUNC},
+  {"Rpl_semi_sync_master_timefunc_failures",
+                               (char*) &SHOW_FNAME(timefunc_fails),  SHOW_FUNC},
+  {"Rpl_semi_sync_master_tx_avg_wait_time",
+                               (char*) &SHOW_FNAME(trx_wait_time),   SHOW_FUNC},
+  {"Rpl_semi_sync_master_tx_wait_time",
+                               (char*) &SHOW_FNAME(trx_wait_total_time),   SHOW_FUNC},
+  {"Rpl_semi_sync_master_tx_waits",   (char*) &SHOW_FNAME(trx_wait_num),    SHOW_FUNC},
+  {"Rpl_semi_sync_master_wait_pos_backtraverse",
+                               (char*) &SHOW_FNAME(back_wait_pos),   SHOW_FUNC},
+  {"Rpl_semi_sync_master_wait_sessions",
+                               (char*) &SHOW_FNAME(wait_sessions),   SHOW_FUNC},
+  {"Rpl_semi_sync_master_yes_tx",     (char*) &SHOW_FNAME(yes_transactions), SHOW_FUNC},
+  {NULL, NULL, SHOW_LONG},
+};
+
+
+/* Plugin init: set up the semi-sync object, then register every observer. */
+static int semi_sync_master_plugin_init(void *p)
+{
+  /* || short-circuits: steps run in order and stop at the first failure. */
+  const bool failed=
+    repl_semisync.initObject() ||
+    register_trans_observer(&trans_observer, p) ||
+    register_binlog_storage_observer(&storage_observer, p) ||
+    register_binlog_transmit_observer(&transmit_observer, p);
+
+  return failed ? 1 : 0;
+}
+
+/* Plugin deinit: unregister each observer; log and return 1 on first failure. */
+static int semi_sync_master_plugin_deinit(void *p)
+{
+  const char *failed_step= NULL;
+
+  if (unregister_trans_observer(&trans_observer, p))
+    failed_step= "unregister_trans_observer failed";
+  else if (unregister_binlog_storage_observer(&storage_observer, p))
+    failed_step= "unregister_binlog_storage_observer failed";
+  else if (unregister_binlog_transmit_observer(&transmit_observer, p))
+    failed_step= "unregister_binlog_transmit_observer failed";
+
+  if (failed_step)
+  {
+    sql_print_error("%s", failed_step);
+    return 1;
+  }
+  sql_print_information("unregister_replicator OK");
+  return 0;
+}
+
+struct Mysql_replication semi_sync_master_plugin= {
+  MYSQL_REPLICATION_INTERFACE_VERSION
+};
+
+/*
+  Plugin library descriptor
+*/
+mysql_declare_plugin(semi_sync_master)
+{
+  MYSQL_REPLICATION_PLUGIN,
+  &semi_sync_master_plugin,
+  "rpl_semi_sync_master",
+  "He Zhenxing",
+  "Semi-synchronous replication master",
+  PLUGIN_LICENSE_GPL,
+  semi_sync_master_plugin_init, /* Plugin Init */
+  semi_sync_master_plugin_deinit, /* Plugin Deinit */
+  0x0100 /* 1.0 */,
+  semi_sync_master_status_vars,	/* status variables */
+  semi_sync_master_system_vars,	/* system variables */
+  NULL                        /* config options                  */
+}
+mysql_declare_plugin_end;

=== added file 'plugin/semisync/semisync_slave.cc'
--- a/plugin/semisync/semisync_slave.cc	1970-01-01 00:00:00 +0000
+++ b/plugin/semisync/semisync_slave.cc	2009-09-26 04:49:49 +0000
@@ -0,0 +1,122 @@
+/* Copyright (C) 2008 MySQL AB
+
+   This program is free software; you can redistribute it and/or modify
+   it under the terms of the GNU General Public License as published by
+   the Free Software Foundation; version 2 of the License.
+
+   This program is distributed in the hope that it will be useful,
+   but WITHOUT ANY WARRANTY; without even the implied warranty of
+   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+   GNU General Public License for more details.
+
+   You should have received a copy of the GNU General Public License
+   along with this program; if not, write to the Free Software
+   Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA  02110-1301  USA */
+
+
+#include "semisync_slave.h"
+
+char rpl_semi_sync_slave_enabled;
+unsigned long rpl_semi_sync_slave_status= 0;
+unsigned long rpl_semi_sync_slave_trace_level;
+
+/*
+  One-time setup: copy the plugin's system variables into the object.
+  A second call prints a complaint to stderr and returns 1.
+*/
+int ReplSemiSyncSlave::initObject()
+{
+  static const char kWho[]= "ReplSemiSyncSlave::initObject";
+  if (!init_done_)
+  {
+    init_done_= true;
+    /* Seed the object from the current values of the system variables. */
+    setSlaveEnabled(rpl_semi_sync_slave_enabled);
+    setTraceLevel(rpl_semi_sync_slave_trace_level);
+    return 0;
+  }
+  fprintf(stderr, "%s called twice\n", kWho);
+  return 1;
+}
+
+/* Open the reply connection unless one already exists; returns 1 (after
+   logging an error) when the connection attempt yields nothing. */
+int ReplSemiSyncSlave::slaveReplyConnect()
+{
+  if (mysql_reply || (mysql_reply= rpl_connect_master(NULL)))
+    return 0;
+  sql_print_error("Semisync slave connect to master for reply failed");
+  return 1;
+}
+
+/* Split a packet into the 2-byte semi-sync header and the event payload;
+   returns -1 if it is shorter than the header or lacks the magic number. */
+int ReplSemiSyncSlave::slaveReadSyncHeader(const char *header,
+                                           unsigned long total_len, bool *need_reply,
+                                           const char **payload, unsigned long *payload_len)
+{
+  const char *kWho = "ReplSemiSyncSlave::slaveReadSyncHeader";
+  int read_res = 0;
+  function_enter(kWho);
+
+  if (total_len >= 2 &&         /* length first: total_len - 2 must not wrap */
+      (unsigned char)(header[0]) == kPacketMagicNum)
+  {
+    *need_reply  = (header[1] & kPacketFlagSync);
+    *payload_len = total_len - 2;
+    *payload     = header + 2;
+    if (trace_level_ & kTraceDetail)
+      sql_print_information("%s: reply - %d", kWho, *need_reply);
+  }
+  else
+  {
+    sql_print_error("Missing magic number for semi-sync packet, packet "
+                    "len: %lu", total_len);
+    read_res = -1;
+  }
+
+  return function_exit(kWho, read_res);
+}
+
+/* I/O thread start: log the replication mode; raise the status flag if enabled. */
+int ReplSemiSyncSlave::slaveStart(Binlog_relay_IO_param *param)
+{
+  const bool semi_sync= getSlaveEnabled();
+  const char *mode= semi_sync ? "semi-sync" : "asynchronous";
+  const char *log_name= param->master_log_name[0] ? param->master_log_name : "FIRST";
+
+  sql_print_information("Slave I/O thread: Start %s replication to"
+                        " master '%s@%s:%d' in log '%s' at position %lu",
+                        mode, param->user, param->host, param->port, log_name,
+                        (unsigned long)param->master_log_pos);
+  if (semi_sync && rpl_semi_sync_slave_status == 0)
+    rpl_semi_sync_slave_status= 1;
+  return 0;
+}
+
+/* I/O thread stop: clear the status flag and drop the reply connection. */
+int ReplSemiSyncSlave::slaveStop(Binlog_relay_IO_param *param)
+{
+  rpl_semi_sync_slave_status= 0;
+  if (mysql_reply != NULL)
+    mysql_close(mysql_reply);
+  mysql_reply= NULL;
+  return 0;
+}
+
+/* ACK (log_name, log_pos) to the master; 1 on failure or missing connection. */
+int ReplSemiSyncSlave::slaveReply(const char *log_name, my_off_t log_pos)
+{
+  char query[FN_REFLEN + 100];
+  sprintf(query, "SET SESSION rpl_semi_sync_master_reply_log_file_pos='%llu:%s'",
+          (unsigned long long)log_pos, log_name);
+  if (!mysql_reply || mysql_real_query(mysql_reply, query, strlen(query)))
+  {
+    sql_print_error("Set 'rpl_semi_sync_master_reply_log_file_pos' on master failed");
+    if (mysql_reply) mysql_close(mysql_reply);  /* NULL: nothing to close */
+    mysql_reply= 0;
+    return 1;
+  }
+  mysql_free_result(mysql_store_result(mysql_reply));
+  return 0;
+}

=== added file 'plugin/semisync/semisync_slave.h'
--- a/plugin/semisync/semisync_slave.h	1970-01-01 00:00:00 +0000
+++ b/plugin/semisync/semisync_slave.h	2009-09-26 04:49:49 +0000
@@ -0,0 +1,99 @@
+/* Copyright (C) 2006 MySQL AB
+
+   This program is free software; you can redistribute it and/or modify
+   it under the terms of the GNU General Public License as published by
+   the Free Software Foundation; version 2 of the License.
+
+   This program is distributed in the hope that it will be useful,
+   but WITHOUT ANY WARRANTY; without even the implied warranty of
+   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+   GNU General Public License for more details.
+
+   You should have received a copy of the GNU General Public License
+   along with this program; if not, write to the Free Software
+   Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA  02110-1301  USA */
+
+
+#ifndef SEMISYNC_SLAVE_H
+#define SEMISYNC_SLAVE_H
+
+#include "semisync.h"
+
+/**
+   The extension class for the slave of semi-synchronous replication
+*/
+class ReplSemiSyncSlave
+  :public ReplSemiSyncBase {
+public:
+  ReplSemiSyncSlave()
+    :init_done_(false), slave_enabled_(false), mysql_reply(0)
+  {}
+  ~ReplSemiSyncSlave() {}
+
+  void setTraceLevel(unsigned long trace_level) {
+    trace_level_ = trace_level;
+  }
+
+  /* Initialize this class after MySQL parameters are initialized. This
+   * function should be called once at bootstrap time.
+   */
+  int initObject();
+
+  bool getSlaveEnabled() {
+    return slave_enabled_;
+  }
+  void setSlaveEnabled(bool enabled) {
+    slave_enabled_ = enabled;
+  }
+
+  /* A slave reads the semi-sync packet header and separates the metadata
+   * from the payload data.
+   *
+   * Input:
+   *  header      - (IN)  packet header pointer
+   *  total_len   - (IN)  total packet length: metadata + payload
+   *  need_reply  - (OUT) whether the master is waiting for the reply
+   *  payload     - (OUT) payload: the replication event
+   *  payload_len - (OUT) payload length
+   *
+   * Return:
+   *  0: success;  -1 or otherwise: error
+   */
+  int slaveReadSyncHeader(const char *header, unsigned long total_len, bool *need_reply,
+                          const char **payload, unsigned long *payload_len);
+
+  /* A slave replies to the master indicating its replication process.  It
+   * indicates that the slave has received all events before the specified
+   * binlog position.
+   *
+   * Input:
+   *  log_name - (IN)  the reply point's binlog file name
+   *  log_pos  - (IN)  the reply point's binlog file offset
+   *
+   * Return:
+   *  0: success;  -1 or otherwise: error
+   */
+  int slaveReply(const char *log_name, my_off_t log_pos);
+
+  /*
+    Connect to master for sending reply
+   */
+  int slaveReplyConnect();
+  /* Called when the slave I/O thread starts / stops */
+  int slaveStart(Binlog_relay_IO_param *param);
+  int slaveStop(Binlog_relay_IO_param *param);
+
+private:
+  /* True when initObject has been called */
+  bool init_done_;
+  bool slave_enabled_;        /* semi-sync is enabled on the slave */
+  MYSQL *mysql_reply;         /* connection to send reply; 0 when closed */
+};
+
+
+/* System and status variables for the slave component */
+extern char rpl_semi_sync_slave_enabled;
+extern unsigned long rpl_semi_sync_slave_trace_level;
+extern unsigned long rpl_semi_sync_slave_status;
+
+#endif /* SEMISYNC_SLAVE_H */

=== added file 'plugin/semisync/semisync_slave_plugin.cc'
--- a/plugin/semisync/semisync_slave_plugin.cc	1970-01-01 00:00:00 +0000
+++ b/plugin/semisync/semisync_slave_plugin.cc	2009-09-26 04:49:49 +0000
@@ -0,0 +1,224 @@
+/* Copyright (C) 2007 Google Inc.
+   Copyright (C) 2008 MySQL AB
+
+   This program is free software; you can redistribute it and/or modify
+   it under the terms of the GNU General Public License as published by
+   the Free Software Foundation; version 2 of the License.
+
+   This program is distributed in the hope that it will be useful,
+   but WITHOUT ANY WARRANTY; without even the implied warranty of
+   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+   GNU General Public License for more details.
+
+   You should have received a copy of the GNU General Public License
+   along with this program; if not, write to the Free Software
+   Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA  02110-1301  USA */
+
+
+#include "semisync_slave.h"
+
+ReplSemiSyncSlave repl_semisync;
+
+/*
+  indicate whether or not the slave should send a reply to the master.
+
+  This is set to true in repl_semi_slave_read_event if the current
+  event read is the last event of a transaction. And the value is
+  checked in repl_semi_slave_queue_event.
+*/
+bool semi_sync_need_reply= false;
+
+int repl_semi_reset_slave(Binlog_relay_IO_param *param)
+{
+  // TODO: reset semi-sync slave status here; until then this hook does nothing
+  return 0;
+}
+
+/*
+  Request-dump hook: open the reply connection, check that the master has
+  semi-sync switched on and register as a semi-sync slave.  1 on error.
+*/
+int repl_semi_slave_request_dump(Binlog_relay_IO_param *param, uint32 flags)
+{
+  MYSQL *mysql= param->mysql;
+  MYSQL_RES *res= 0;
+  MYSQL_ROW row;
+  const char *query;
+  bool master_enabled;
+
+  if (!repl_semisync.getSlaveEnabled())
+    return 0;
+
+  /* Create the connection that is used to send slave ACK replies to master */
+  if (repl_semisync.slaveReplyConnect())
+    return 1;
+
+  /* Check if master server has semi-sync plugin installed */
+  query= "SHOW VARIABLES LIKE 'rpl_semi_sync_master_enabled'";
+  if (mysql_real_query(mysql, query, strlen(query)) ||
+      !(res= mysql_store_result(mysql)))
+  {
+    mysql_free_result(mysql_store_result(mysql));
+    sql_print_error("Execution failed on master: %s", query);
+    return 1;
+  }
+
+  row= mysql_fetch_row(res);
+  master_enabled= row && row[1] && !strcmp(row[1], "ON"); /* row lives in res */
+  mysql_free_result(res);       /* was leaked on both paths before */
+  if (!master_enabled)
+  {
+    /* Master does not support or not configured semi-sync */
+    sql_print_warning("Master server does not support or not configured semi-sync replication, fallback to asynchronous");
+    rpl_semi_sync_slave_status= 0;
+    return 0;
+  }
+
+  /* Tell master dump thread that we want to do semi-sync replication */
+  query= "SET @rpl_semi_sync_slave= 1";
+  if (mysql_real_query(mysql, query, strlen(query)))
+  {
+    sql_print_error("Set 'rpl_semi_sync_slave=1' on master failed");
+    mysql_free_result(mysql_store_result(mysql));
+    return 1;
+  }
+  mysql_free_result(mysql_store_result(mysql));
+  rpl_semi_sync_slave_status= 1;
+  return 0;
+}
+
+/* After-read hook: parse the semi-sync header if active, else pass through. */
+int repl_semi_slave_read_event(Binlog_relay_IO_param *param,
+                               const char *packet, unsigned long len,
+                               const char **event_buf, unsigned long *event_len)
+{
+  if (rpl_semi_sync_slave_status != 0)
+    return repl_semisync.slaveReadSyncHeader(packet, len, &semi_sync_need_reply,
+                                             event_buf, event_len);
+  *event_len= len;
+  *event_buf= packet;
+  return 0;
+}
+
+/* After-queue hook: send the reply to the master when one was requested. */
+int repl_semi_slave_queue_event(Binlog_relay_IO_param *param,
+                                const char *event_buf, unsigned long event_len,
+                                uint32 flags)
+{
+  if (!rpl_semi_sync_slave_status || !semi_sync_need_reply)
+    return 0;
+  return repl_semisync.slaveReply(param->master_log_name,
+                                  param->master_log_pos);
+}
+
+int repl_semi_slave_io_start(Binlog_relay_IO_param *param)
+{
+  return repl_semisync.slaveStart(param);  /* plain forwarder to the slave object */
+}
+
+int repl_semi_slave_io_end(Binlog_relay_IO_param *param)
+{
+  return repl_semisync.slaveStop(param);   /* plain forwarder to the slave object */
+}
+
+
+/* Update callback for 'enabled': store the new value, then tell the
+   semi-sync slave object whether the global flag is non-zero. */
+static void fix_rpl_semi_sync_slave_enabled(MYSQL_THD thd, SYS_VAR *var,
+                                            void *ptr, const void *val)
+{
+  char *target= (char *)ptr;
+  *target= *(const char *)val;
+  repl_semisync.setSlaveEnabled(rpl_semi_sync_slave_enabled != 0);
+}
+
+/* Update callback for 'trace_level': store the new value, then hand the
+   global trace level to the semi-sync slave object. */
+static void fix_rpl_semi_sync_trace_level(MYSQL_THD thd, SYS_VAR *var,
+                                          void *ptr, const void *val)
+{
+  unsigned long *target= (unsigned long *)ptr;
+  *target= *(const unsigned long *)val;
+  repl_semisync.setTraceLevel(rpl_semi_sync_slave_trace_level);
+}
+
+/* plugin system variables */
+static MYSQL_SYSVAR_BOOL(enabled, rpl_semi_sync_slave_enabled,
+  PLUGIN_VAR_OPCMDARG,
+ "Enable semi-synchronous replication slave (disabled by default). ",
+  NULL,				   // check
+  &fix_rpl_semi_sync_slave_enabled, // update
+  0);
+
+static MYSQL_SYSVAR_ULONG(trace_level, rpl_semi_sync_slave_trace_level,
+  PLUGIN_VAR_OPCMDARG,
+ "The tracing level for semi-sync replication.",
+  NULL,				  // check
+  &fix_rpl_semi_sync_trace_level, // update
+  32, 0, ~0L, 1);
+
+static SYS_VAR* semi_sync_slave_system_vars[]= {
+  MYSQL_SYSVAR(enabled),
+  MYSQL_SYSVAR(trace_level),
+  NULL,
+};
+
+
+/* plugin status variables */
+static SHOW_VAR semi_sync_slave_status_vars[]= {
+  {"Rpl_semi_sync_slave_status",
+   (char*) &rpl_semi_sync_slave_status,    SHOW_BOOL},
+  {NULL, NULL, SHOW_BOOL},
+};
+
+Binlog_relay_IO_observer relay_io_observer = {
+  sizeof(Binlog_relay_IO_observer), // len
+
+  repl_semi_slave_io_start,	// start
+  repl_semi_slave_io_end,	// stop
+  repl_semi_slave_request_dump,	// request_transmit
+  repl_semi_slave_read_event,	// after_read_event
+  repl_semi_slave_queue_event,	// after_queue_event
+  repl_semi_reset_slave,	// reset
+};
+
+/* Plugin init: set up the slave object, then register the relay-IO observer. */
+static int semi_sync_slave_plugin_init(void *p)
+{
+  const bool failed=
+    repl_semisync.initObject() ||
+    register_binlog_relay_io_observer(&relay_io_observer, p);
+  return failed ? 1 : 0;
+}
+
+/* Plugin deinit: unregister the relay-IO observer; returns 1 if that
+   fails, 0 otherwise. */
+static int semi_sync_slave_plugin_deinit(void *p)
+{
+  return unregister_binlog_relay_io_observer(&relay_io_observer, p) ? 1 : 0;
+}
+
+
+struct Mysql_replication semi_sync_slave_plugin= {
+  MYSQL_REPLICATION_INTERFACE_VERSION
+};
+
+/*
+  Plugin library descriptor
+*/
+mysql_declare_plugin(semi_sync_slave)
+{
+  MYSQL_REPLICATION_PLUGIN,
+  &semi_sync_slave_plugin,
+  "rpl_semi_sync_slave",
+  "He Zhenxing",
+  "Semi-synchronous replication slave",
+  PLUGIN_LICENSE_GPL,
+  semi_sync_slave_plugin_init, /* Plugin Init */
+  semi_sync_slave_plugin_deinit, /* Plugin Deinit */
+  0x0100 /* 1.0 */,
+  semi_sync_slave_status_vars,	/* status variables */
+  semi_sync_slave_system_vars,	/* system variables */
+  NULL                        /* config options                  */
+}
+mysql_declare_plugin_end;

=== modified file 'sql/CMakeLists.txt'
--- a/sql/CMakeLists.txt	2009-06-10 08:59:49 +0000
+++ b/sql/CMakeLists.txt	2009-09-26 04:49:49 +0000
@@ -75,6 +75,7 @@ SET (SQL_SOURCE
                rpl_rli.cc rpl_mi.cc sql_servers.cc
                sql_connect.cc scheduler.cc 
                sql_profile.cc event_parse_data.cc
+               rpl_handler.cc
                ${PROJECT_SOURCE_DIR}/sql/sql_yacc.cc
                ${PROJECT_SOURCE_DIR}/sql/sql_yacc.h
                ${PROJECT_SOURCE_DIR}/include/mysqld_error.h

=== modified file 'sql/Makefile.am'
--- a/sql/Makefile.am	2009-07-31 19:28:15 +0000
+++ b/sql/Makefile.am	2009-09-26 04:49:49 +0000
@@ -76,7 +76,8 @@ noinst_HEADERS =	item.h item_func.h item
 			sql_plugin.h authors.h event_parse_data.h \
 			event_data_objects.h event_scheduler.h \
 			sql_partition.h partition_info.h partition_element.h \
-			contributors.h sql_servers.h
+			contributors.h sql_servers.h \
+			rpl_handler.h replication.h
 
 mysqld_SOURCES =	sql_lex.cc sql_handler.cc sql_partition.cc \
 			item.cc item_sum.cc item_buff.cc item_func.cc \
@@ -120,7 +121,8 @@ mysqld_SOURCES =	sql_lex.cc sql_handler.
                         event_queue.cc event_db_repository.cc events.cc \
 			sql_plugin.cc sql_binlog.cc \
 			sql_builtin.cc sql_tablespace.cc partition_info.cc \
-			sql_servers.cc event_parse_data.cc
+			sql_servers.cc event_parse_data.cc \
+			rpl_handler.cc
 
 nodist_mysqld_SOURCES =	mini_client_errors.c pack.c client.c my_time.c my_user.c 
 

=== modified file 'sql/handler.cc'
--- a/sql/handler.cc	2009-07-29 08:54:20 +0000
+++ b/sql/handler.cc	2009-09-26 04:49:49 +0000
@@ -24,6 +24,7 @@
 #endif
 
 #include "mysql_priv.h"
+#include "rpl_handler.h"
 #include "rpl_filter.h"
 #include <myisampack.h>
 #include <errno.h>
@@ -221,6 +222,8 @@ handlerton *ha_checktype(THD *thd, enum 
     return NULL;
   }
 
+  RUN_HOOK(transaction, after_rollback, (thd, FALSE));
+
   switch (database_type) {
 #ifndef NO_HASH
   case DB_TYPE_HASH:
@@ -1190,6 +1193,7 @@ int ha_commit_trans(THD *thd, bool all)
     if (cookie)
       tc_log->unlog(cookie, xid);
     DBUG_EXECUTE_IF("crash_commit_after", abort(););
+    RUN_HOOK(transaction, after_commit, (thd, FALSE));
 end:
     if (rw_trans)
       start_waiting_global_read_lock(thd);
@@ -1337,6 +1341,7 @@ int ha_rollback_trans(THD *thd, bool all
     push_warning(thd, MYSQL_ERROR::WARN_LEVEL_WARN,
                  ER_WARNING_NOT_COMPLETE_ROLLBACK,
                  ER(ER_WARNING_NOT_COMPLETE_ROLLBACK));
+  RUN_HOOK(transaction, after_rollback, (thd, FALSE));
   DBUG_RETURN(error);
 }
 
@@ -1371,7 +1376,14 @@ int ha_autocommit_or_rollback(THD *thd, 
 
     thd->variables.tx_isolation=thd->session_tx_isolation;
   }
+  else
 #endif
+  {
+    if (!error)
+      RUN_HOOK(transaction, after_commit, (thd, FALSE));
+    else
+      RUN_HOOK(transaction, after_rollback, (thd, FALSE));
+  }
   DBUG_RETURN(error);
 }
 

=== modified file 'sql/log.cc'
--- a/sql/log.cc	2009-08-28 16:21:54 +0000
+++ b/sql/log.cc	2009-09-26 04:49:49 +0000
@@ -38,6 +38,7 @@
 #endif
 
 #include <mysql/plugin.h>
+#include "rpl_handler.h"
 
 /* max size of the log message */
 #define MAX_LOG_BUFFER_SIZE 1024
@@ -3683,9 +3684,11 @@ err:
 }
 
 
-bool MYSQL_BIN_LOG::flush_and_sync()
+bool MYSQL_BIN_LOG::flush_and_sync(bool *synced)
 {
   int err=0, fd=log_file.file;
+  if (synced)
+    *synced= 0;
   safe_mutex_assert_owner(&LOCK_log);
   if (flush_io_cache(&log_file))
     return 1;
@@ -3693,6 +3696,8 @@ bool MYSQL_BIN_LOG::flush_and_sync()
   {
     sync_binlog_counter= 0;
     err=my_sync(fd, MYF(MY_WME));
+    if (synced)
+      *synced= 1;
   }
   return err;
 }
@@ -3983,7 +3988,7 @@ MYSQL_BIN_LOG::flush_and_set_pending_row
 
     if (file == &log_file)
     {
-      error= flush_and_sync();
+      error= flush_and_sync(0);
       if (!error)
       {
         signal_update();
@@ -4169,8 +4174,16 @@ bool MYSQL_BIN_LOG::write(Log_event *eve
 
     if (file == &log_file) // we are writing to the real log (disk)
     {
-      if (flush_and_sync())
+      bool synced= 0;
+      if (flush_and_sync(&synced))
 	goto err;
+
+      if (RUN_HOOK(binlog_storage, after_flush,
+                   (thd, log_file_name, file->pos_in_file, synced))) {
+        sql_print_error("Failed to run 'after_flush' hooks");
+        goto err;
+      }
+
       signal_update();
       rotate_and_purge(RP_LOCK_LOG_IS_ALREADY_LOCKED);
     }
@@ -4425,7 +4438,7 @@ int MYSQL_BIN_LOG::write_cache(IO_CACHE 
   DBUG_ASSERT(carry == 0);
 
   if (sync_log)
-    flush_and_sync();
+    flush_and_sync(0);
 
   return 0;                                     // All OK
 }
@@ -4472,7 +4485,7 @@ bool MYSQL_BIN_LOG::write_incident(THD *
   ev.write(&log_file);
   if (lock)
   {
-    if (!error && !(error= flush_and_sync()))
+    if (!error && !(error= flush_and_sync(0)))
     {
       signal_update();
       rotate_and_purge(RP_LOCK_LOG_IS_ALREADY_LOCKED);
@@ -4560,7 +4573,8 @@ bool MYSQL_BIN_LOG::write(THD *thd, IO_C
       if (incident && write_incident(thd, FALSE))
         goto err;
 
-      if (flush_and_sync())
+      bool synced= 0;
+      if (flush_and_sync(&synced))
         goto err;
       DBUG_EXECUTE_IF("half_binlogged_transaction", abort(););
       if (cache->error)				// Error on read
@@ -4569,6 +4583,15 @@ bool MYSQL_BIN_LOG::write(THD *thd, IO_C
         write_error=1;				// Don't give more errors
         goto err;
       }
+
+      if (RUN_HOOK(binlog_storage, after_flush,
+                   (thd, log_file_name, log_file.pos_in_file, synced)))
+      {
+        sql_print_error("Failed to run 'after_flush' hooks");
+        write_error=1;
+        goto err;
+      }
+
       signal_update();
     }
 

=== modified file 'sql/log.h'
--- a/sql/log.h	2009-06-18 13:52:46 +0000
+++ b/sql/log.h	2009-09-26 04:49:49 +0000
@@ -378,7 +378,21 @@ public:
   bool is_active(const char* log_file_name);
   int update_log_index(LOG_INFO* linfo, bool need_update_threads);
   void rotate_and_purge(uint flags);
-  bool flush_and_sync();
+
+  /**
+     Flush binlog cache and synchronize to disk.
+
+     This function flushes events in binlog cache to binary log file,
+     it will do synchronizing according to the setting of system
+     variable 'sync_binlog'. If file is synchronized, @c synced will
+     be set to 1, otherwise 0.
+
+     @param[out] synced if not NULL, set to 1 if file is synchronized, otherwise 0
+
+     @retval 0 Success
+     @retval other Failure
+  */
+  bool flush_and_sync(bool *synced);
   int purge_logs(const char *to_log, bool included,
                  bool need_mutex, bool need_update_threads,
                  ulonglong *decrease_log_space);

=== modified file 'sql/mysqld.cc'
--- a/sql/mysqld.cc	2009-09-02 16:58:17 +0000
+++ b/sql/mysqld.cc	2009-09-26 04:49:49 +0000
@@ -31,6 +31,8 @@
 
 #include "rpl_injector.h"
 
+#include "rpl_handler.h"
+
 #ifdef HAVE_SYS_PRCTL_H
 #include <sys/prctl.h>
 #endif
@@ -1284,6 +1286,7 @@ void clean_up(bool print_message)
   ha_end();
   if (tc_log)
     tc_log->close();
+  delegates_destroy();
   xid_cache_free();
   delete_elements(&key_caches, (void (*)(const char*, uchar*)) free_key_cache);
   multi_keycache_free();
@@ -3760,6 +3763,13 @@ static int init_server_components()
     unireg_abort(1);
   }
 
+  /* initialize delegates for extension observers */
+  if (delegates_init())
+  {
+    sql_print_error("Initialize extension delegates failed");
+    unireg_abort(1);
+  }
+
   /* need to configure logging before initializing storage engines */
   if (opt_update_log)
   {

=== added file 'sql/replication.h'
--- a/sql/replication.h	1970-01-01 00:00:00 +0000
+++ b/sql/replication.h	2009-09-26 04:49:49 +0000
@@ -0,0 +1,490 @@
+/* Copyright (C) 2008 MySQL AB
+
+   This program is free software; you can redistribute it and/or modify
+   it under the terms of the GNU General Public License as published by
+   the Free Software Foundation; version 2 of the License.
+
+   This program is distributed in the hope that it will be useful,
+   but WITHOUT ANY WARRANTY; without even the implied warranty of
+   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+   GNU General Public License for more details.
+
+   You should have received a copy of the GNU General Public License
+   along with this program; if not, write to the Free Software
+   Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA */
+
+#ifndef REPLICATION_H
+#define REPLICATION_H
+
+#ifdef __cplusplus
+extern "C" {
+#endif
+
+/**
+   Transaction observer flags.
+
+   These are bit values that the server stores in Trans_param::flags
+   before invoking the transaction observer callbacks.
+*/
+enum Trans_flags {
+  /** Transaction is a real transaction */
+  TRANS_IS_REAL_TRANS = 1
+};
+
+/**
+   Transaction observer parameter.
+
+   An instance of this structure is passed to every Trans_observer
+   callback. @c flags is a bit set built from the Trans_flags values.
+*/
+typedef struct Trans_param {
+  uint32 server_id;
+  uint32 flags;
+
+  /*
+    The latest binary log file name and position written by the
+    current transaction. If the binary log is disabled, or no log
+    event has been written into the binary log file by the current
+    transaction (events written into the transaction log cache are
+    not counted), these two members will be zero.
+  */
+  const char *log_file;
+  my_off_t log_pos;
+} Trans_param;
+
+/**
+   Observes and extends transaction execution.
+
+   A callback pointer may be left NULL, in which case the server skips
+   it when notifying observers.
+*/
+typedef struct Trans_observer {
+  uint32 len;
+
+  /**
+     This callback is called after transaction commit
+     
+     This callback is called right after commit to storage engines for
+     transactional tables.
+
+     For non-transactional tables, this is called at the end of the
+     statement, before sending statement status, if the statement
+     succeeded.
+
+     @note The return value is currently ignored by the server.
+
+     @param param The parameter for transaction observers
+
+     @retval 0 Success
+     @retval 1 Failure
+  */
+  int (*after_commit)(Trans_param *param);
+
+  /**
+     This callback is called after transaction rollback
+
+     This callback is called right after rollback to storage engines
+     for transactional tables.
+
+     For non-transactional tables, this is called at the end of the
+     statement, before sending statement status, if the statement
+     failed.
+
+     @note The return value is currently ignored by the server.
+
+     @param param The parameter for transaction observers
+
+     @retval 0 Success
+     @retval 1 Failure
+  */
+  int (*after_rollback)(Trans_param *param);
+} Trans_observer;
+
+/**
+   Binlog storage flags.
+
+   Bit values passed in the @c flags argument of
+   Binlog_storage_observer::after_flush.
+*/
+enum Binlog_storage_flags {
+  /** Binary log was synced to disk */
+  BINLOG_STORAGE_IS_SYNCED = 1
+};
+
+/**
+   Binlog storage observer parameters.
+
+   Passed as the first argument of the Binlog_storage_observer
+   callbacks; @c server_id is filled in from the calling thread.
+ */
+typedef struct Binlog_storage_param {
+  uint32 server_id;
+} Binlog_storage_param;
+
+/**
+   Observes binlog logging storage.
+
+   A callback pointer may be left NULL, in which case the server skips
+   it when notifying observers.
+*/
+typedef struct Binlog_storage_observer {
+  uint32 len;
+
+  /**
+     This callback is called after binlog has been flushed
+
+     This callback is called after cached events have been flushed to
+     binary log file. Whether the binary log file is synchronized to
+     disk is indicated by the bit BINLOG_STORAGE_IS_SYNCED in @a flags.
+
+     @param param Observer common parameter
+     @param log_file Name of the binlog file that has been updated
+                     (without the directory part)
+     @param log_pos Binlog position after update
+     @param flags flags for binlog storage
+
+     @retval 0 Success
+     @retval 1 Failure
+  */
+  int (*after_flush)(Binlog_storage_param *param,
+                     const char *log_file, my_off_t log_pos,
+                     uint32 flags);
+} Binlog_storage_observer;
+
+/**
+   Replication binlog transmitter (binlog dump) observer parameter.
+
+   Passed as the first argument of every Binlog_transmit_observer
+   callback.
+*/
+typedef struct Binlog_transmit_param {
+  uint32 server_id;
+  uint32 flags;
+} Binlog_transmit_param;
+
+/**
+   Observes and extends the binlog dumping thread.
+
+   A callback pointer may be left NULL, in which case the server skips
+   it when notifying observers.
+*/
+typedef struct Binlog_transmit_observer {
+  uint32 len;
+  
+  /**
+     This callback is called when binlog dumping starts
+
+
+     @param param Observer common parameter
+     @param log_file Binlog file name to transmit from
+     @param log_pos Binlog position to transmit from
+
+     @retval 0 Success
+     @retval 1 Failure
+  */
+  int (*transmit_start)(Binlog_transmit_param *param,
+                        const char *log_file, my_off_t log_pos);
+
+  /**
+     This callback is called when binlog dumping stops
+
+     @param param Observer common parameter
+     
+     @retval 0 Success
+     @retval 1 Failure
+  */
+  int (*transmit_stop)(Binlog_transmit_param *param);
+
+  /**
+     This callback is called to reserve bytes in packet header for event transmission
+
+     This callback is called when resetting transmit packet header to
+     reserve bytes for this observer in packet header.
+
+     The @a header buffer is allocated by the server code, and @a size
+     is the size of the header buffer. Each observer can only reserve
+     a maximum size of @a size in the header; the server treats a
+     larger value stored in @a len as a failure.
+
+     @param param Observer common parameter
+     @param header Pointer of the header buffer
+     @param size Size of the header buffer
+     @param[out] len Header length reserved by this observer; leaving
+                     it at 0 reserves nothing
+
+     @retval 0 Success
+     @retval 1 Failure
+  */
+  int (*reserve_header)(Binlog_transmit_param *param,
+                        unsigned char *header,
+                        unsigned long size,
+                        unsigned long *len);
+
+  /**
+     This callback is called before sending an event packet to slave
+
+     @param param Observer common parameter
+     @param packet Binlog event packet to send
+     @param len Length of the event packet
+     @param log_file Binlog file name of the event packet to send
+                     (without the directory part)
+     @param log_pos Binlog position of the event packet to send
+
+     @retval 0 Success
+     @retval 1 Failure
+  */
+  int (*before_send_event)(Binlog_transmit_param *param,
+                           unsigned char *packet, unsigned long len,
+                           const char *log_file, my_off_t log_pos );
+
+  /**
+     This callback is called after sending an event packet to slave
+
+     @param param Observer common parameter
+     @param event_buf Binlog event packet buffer sent
+     @param len length of the event packet buffer
+
+     @retval 0 Success
+     @retval 1 Failure
+   */
+  int (*after_send_event)(Binlog_transmit_param *param,
+                          const char *event_buf, unsigned long len);
+
+  /**
+     This callback is called after resetting master status
+
+     This is called when executing the command RESET MASTER, and is
+     used to reset status variables added by observers.
+
+     @param param Observer common parameter
+
+     @retval 0 Success
+     @retval 1 Failure
+  */
+  int (*after_reset_master)(Binlog_transmit_param *param);
+} Binlog_transmit_observer;
+
+/**
+   Binlog relay IO flags.
+
+   Bit values intended for the @c flags argument of
+   Binlog_relay_IO_observer::after_queue_event.
+*/
+enum Binlog_relay_IO_flags {
+  /** Binary relay log was synced to disk */
+  BINLOG_RELAY_IS_SYNCED = 1
+};
+
+
+/**
+  Replication binlog relay IO observer parameter.
+
+  Passed as the first argument of every Binlog_relay_IO_observer
+  callback. Apart from @c server_id, the members are copied from the
+  slave's master information before the callbacks are invoked.
+*/
+typedef struct Binlog_relay_IO_param {
+  uint32 server_id;
+
+  /* Master host, user and port */
+  char *host;
+  char *user;
+  unsigned int port;
+
+  char *master_log_name;
+  my_off_t master_log_pos;
+
+  MYSQL *mysql;                        /* the connection to master */
+} Binlog_relay_IO_param;
+
+/**
+   Observes and extends the service of slave IO thread.
+
+   A callback pointer may be left NULL, in which case the server skips
+   it when notifying observers.
+*/
+typedef struct Binlog_relay_IO_observer {
+  uint32 len;
+
+  /**
+     This callback is called when slave IO thread starts
+
+     @param param Observer common parameter
+
+     @retval 0 Success
+     @retval 1 Failure
+  */
+  int (*thread_start)(Binlog_relay_IO_param *param);
+
+  /**
+     This callback is called when slave IO thread stops
+
+     @param param Observer common parameter
+
+     @retval 0 Success
+     @retval 1 Failure
+  */
+  int (*thread_stop)(Binlog_relay_IO_param *param);
+
+  /**
+     This callback is called before the slave requests binlog transmission from master
+
+     This is called before the slave issues the BINLOG_DUMP command to
+     the master to request binlog.
+
+     @param param Observer common parameter
+     @param flags binlog dump flags
+
+     @retval 0 Success
+     @retval 1 Failure
+  */
+  int (*before_request_transmit)(Binlog_relay_IO_param *param, uint32 flags);
+
+  /**
+     This callback is called after reading an event packet from master
+
+     @param param Observer common parameter
+     @param packet The event packet read from master
+     @param len Length of the event packet read from master
+     @param event_buf The event packet returned after processing
+     @param event_len The length of the event packet returned after processing
+
+     @retval 0 Success
+     @retval 1 Failure
+  */
+  int (*after_read_event)(Binlog_relay_IO_param *param,
+                          const char *packet, unsigned long len,
+                          const char **event_buf, unsigned long *event_len);
+
+  /**
+     This callback is called after an event packet has been written to relay log
+
+     @param param Observer common parameter
+     @param event_buf Event packet written to relay log
+     @param event_len Length of the event packet written to relay log
+     @param flags flags for relay log
+
+     @retval 0 Success
+     @retval 1 Failure
+  */
+  int (*after_queue_event)(Binlog_relay_IO_param *param,
+                           const char *event_buf, unsigned long event_len,
+                           uint32 flags);
+
+  /**
+     This callback is called after resetting slave relay log IO status
+     
+     @param param Observer common parameter
+
+     @retval 0 Success
+     @retval 1 Failure
+  */
+  int (*after_reset_slave)(Binlog_relay_IO_param *param);
+} Binlog_relay_IO_observer;
+
+
+/**
+   Register a transaction observer
+
+   @param observer The transaction observer to register
+   @param p pointer to the internal plugin structure
+
+   @retval 0 Success
+   @retval 1 Failure: the observer already exists, the transaction
+             delegate is not initialized, or the observer could not
+             be added to the list
+*/
+int register_trans_observer(Trans_observer *observer, void *p);
+
+/**
+   Unregister a transaction observer
+
+   @param observer The transaction observer to unregister
+   @param p pointer to the internal plugin structure
+
+   @retval 0 Success
+   @retval 1 Observer does not exist
+*/
+int unregister_trans_observer(Trans_observer *observer, void *p);
+
+/**
+   Register a binlog storage observer
+
+   @param observer The binlog storage observer to register
+   @param p pointer to the internal plugin structure
+
+   @retval 0 Success
+   @retval 1 Failure: the observer already exists, the binlog storage
+             delegate is not initialized, or the observer could not
+             be added to the list
+*/
+int register_binlog_storage_observer(Binlog_storage_observer *observer, void *p);
+
+/**
+   Unregister a binlog storage observer
+
+   @param observer The binlog storage observer to unregister
+   @param p pointer to the internal plugin structure
+
+   @retval 0 Success
+   @retval 1 Observer does not exist
+*/
+int unregister_binlog_storage_observer(Binlog_storage_observer *observer, void *p);
+
+/**
+   Register a binlog transmit observer
+
+   @param observer The binlog transmit observer to register
+   @param p pointer to the internal plugin structure
+
+   @retval 0 Success
+   @retval 1 Failure: the observer already exists, the binlog transmit
+             delegate is not initialized, or the observer could not
+             be added to the list
+*/
+int register_binlog_transmit_observer(Binlog_transmit_observer *observer, void *p);
+
+/**
+   Unregister a binlog transmit observer
+
+   @param observer The binlog transmit observer to unregister
+   @param p pointer to the internal plugin structure
+
+   @retval 0 Success
+   @retval 1 Observer does not exist
+*/
+int unregister_binlog_transmit_observer(Binlog_transmit_observer *observer, void *p);
+
+/**
+   Register a binlog relay IO (slave IO thread) observer
+
+   @param observer The binlog relay IO observer to register
+   @param p pointer to the internal plugin structure
+
+   @retval 0 Success
+   @retval 1 Failure: the observer already exists, the binlog relay IO
+             delegate is not initialized, or the observer could not
+             be added to the list
+*/
+int register_binlog_relay_io_observer(Binlog_relay_IO_observer *observer, void *p);
+
+/**
+   Unregister a binlog relay IO (slave IO thread) observer
+
+   @param observer The binlog relay IO observer to unregister
+   @param p pointer to the internal plugin structure
+
+   @retval 0 Success
+   @retval 1 Observer does not exist
+*/
+int unregister_binlog_relay_io_observer(Binlog_relay_IO_observer *observer, void *p);
+
+/**
+   Connect to master
+
+   This function can only be used in the slave I/O thread context, and
+   will use the same master information to do the connection.
+
+   @code
+   MYSQL *mysql = mysql_init(NULL);
+   if (rpl_connect_master(mysql))
+   {
+     // do stuff with the connection
+   }
+   mysql_close(mysql); // close the connection
+   @endcode
+   
+   @param mysql address of the MYSQL structure to use; passing NULL
+   will create a new one
+
+   @return address of MYSQL structure on success, NULL on failure
+*/
+MYSQL *rpl_connect_master(MYSQL *mysql);
+
+/**
+   Set thread entering a condition
+
+   This function should be called before putting a thread to wait for
+   a condition. @a mutex should be held before calling this
+   function. After being woken up, @f thd_exit_cond should be called.
+
+   @param thd      The thread entering the condition, NULL means current thread
+   @param cond     The condition the thread is going to wait for
+   @param mutex    The mutex associated with the condition, this must be
+                   held before calling this function
+   @param msg      The new process message for the thread
+
+   @return NOTE(review): presumably the previous process message, to be
+           handed back to @f thd_exit_cond; confirm against the
+           implementation
+*/
+const char* thd_enter_cond(MYSQL_THD thd, pthread_cond_t *cond,
+                           pthread_mutex_t *mutex, const char *msg);
+
+/**
+   Set thread leaving a condition
+
+   This function should be called after a thread has been woken up for
+   a condition.
+
+   @param thd      The thread leaving the condition, NULL means current thread
+   @param old_msg  The process message, usually this should be the old process
+                   message before calling @f thd_enter_cond
+*/
+void thd_exit_cond(MYSQL_THD thd, const char *old_msg);
+
+
+#ifdef __cplusplus
+}
+#endif
+#endif /* REPLICATION_H */

=== added file 'sql/rpl_handler.cc'
--- a/sql/rpl_handler.cc	1970-01-01 00:00:00 +0000
+++ b/sql/rpl_handler.cc	2009-09-26 04:49:49 +0000
@@ -0,0 +1,493 @@
+/* Copyright (C) 2008 MySQL AB
+
+   This program is free software; you can redistribute it and/or modify
+   it under the terms of the GNU General Public License as published by
+   the Free Software Foundation; version 2 of the License.
+
+   This program is distributed in the hope that it will be useful,
+   but WITHOUT ANY WARRANTY; without even the implied warranty of
+   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+   GNU General Public License for more details.
+
+   You should have received a copy of the GNU General Public License
+   along with this program; if not, write to the Free Software
+   Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA */
+
+#include "mysql_priv.h"
+
+#include "rpl_mi.h"
+#include "sql_repl.h"
+#include "log_event.h"
+#include "rpl_filter.h"
+#include <my_dir.h>
+#include "rpl_handler.h"
+
+Trans_delegate *transaction_delegate;
+Binlog_storage_delegate *binlog_storage_delegate;
+#ifdef HAVE_REPLICATION
+Binlog_transmit_delegate *binlog_transmit_delegate;
+Binlog_relay_IO_delegate *binlog_relay_io_delegate;
+#endif /* HAVE_REPLICATION */
+
+/*
+  Structure to save the binary log file name and position of the
+  current transaction.
+
+  One instance per thread is kept under the RPL_TRANS_BINLOG_INFO
+  thread-specific key. It is allocated and filled in by
+  Binlog_storage_delegate::after_flush (log_file holds the name with
+  the directory part stripped) and released by the Trans_delegate
+  callbacks when a real transaction ends.
+*/
+typedef struct Trans_binlog_info {
+  my_off_t log_pos;
+  char log_file[FN_REFLEN];
+} Trans_binlog_info;
+
+static pthread_key(Trans_binlog_info*, RPL_TRANS_BINLOG_INFO);
+
+/**
+   Read a user variable of the current thread as an integer.
+
+   @param name             Name of the user variable to look up
+   @param[out] value       Receives the integer value of the variable
+   @param[out] null_value  If not NULL, receives the NULL indicator
+                           reported for the variable
+
+   @retval 0 Variable found, @a value has been set
+   @retval 1 The current thread has no user variable with this name
+*/
+int get_user_var_int(const char *name,
+                     long long int *value, int *null_value)
+{
+  user_var_entry *var=
+    (user_var_entry*) hash_search(&current_thd->user_vars,
+                                  (uchar*) name, strlen(name));
+  if (var == NULL)
+    return 1;
+
+  my_bool is_null;
+  *value= var->val_int(&is_null);
+  if (null_value != NULL)
+    *null_value= is_null;
+  return 0;
+}
+
+/**
+   Read a user variable of the current thread as a floating point value.
+
+   @param name             Name of the user variable to look up
+   @param[out] value       Receives the real value of the variable
+   @param[out] null_value  If not NULL, receives the NULL indicator
+                           reported for the variable
+
+   @retval 0 Variable found, @a value has been set
+   @retval 1 The current thread has no user variable with this name
+*/
+int get_user_var_real(const char *name,
+                      double *value, int *null_value)
+{
+  user_var_entry *var=
+    (user_var_entry*) hash_search(&current_thd->user_vars,
+                                  (uchar*) name, strlen(name));
+  if (var == NULL)
+    return 1;
+
+  my_bool is_null;
+  *value= var->val_real(&is_null);
+  if (null_value != NULL)
+    *null_value= is_null;
+  return 0;
+}
+
+/**
+   Read a user variable of the current thread as a string.
+
+   The string form of the variable is copied into @a value. At most
+   @a len - 1 characters are copied and the result is always
+   NUL-terminated, so a longer value is truncated. When @a len is 0
+   nothing is written to @a value.
+
+   @param name             Name of the user variable to look up
+   @param[out] value       Buffer receiving the string value
+   @param len              Size of the @a value buffer in bytes
+   @param precision        Precision passed on to the string conversion
+   @param[out] null_value  If not NULL, receives the NULL indicator
+                           reported for the variable
+
+   @retval 0 Variable found, @a value has been set
+   @retval 1 The current thread has no user variable with this name
+*/
+int get_user_var_str(const char *name, char *value,
+                     size_t len, unsigned int precision, int *null_value)
+{
+  String str;
+  my_bool null_val;
+  user_var_entry *entry= 
+    (user_var_entry*) hash_search(&current_thd->user_vars,
+                                  (uchar*) name, strlen(name));
+  if (!entry)
+    return 1;
+  entry->val_str(&null_val, &str, precision);
+  if (len > 0)
+  {
+    /*
+      strncpy() does not terminate the destination when the source is
+      len bytes or longer, so reserve the last byte for the NUL.
+    */
+    strncpy(value, str.c_ptr(), len - 1);
+    value[len - 1]= '\0';
+  }
+  if (null_value)
+    *null_value= null_val;
+  return 0;
+}
+
+/**
+   Construct the global observer delegates.
+
+   Each delegate is built with placement new inside its own static
+   buffer, so no heap allocation is involved. Construction stops at the
+   first delegate that reports it is not initialized. Finally the
+   thread-specific key used to remember the binlog position of the
+   current transaction is created.
+
+   @retval 0 Success
+   @retval 1 A delegate failed to initialize or the thread-specific
+             key could not be created
+*/
+int delegates_init()
+{
+  static unsigned char trans_mem[sizeof(Trans_delegate)];
+  static unsigned char storage_mem[sizeof(Binlog_storage_delegate)];
+#ifdef HAVE_REPLICATION
+  static unsigned char transmit_mem[sizeof(Binlog_transmit_delegate)];
+  static unsigned char relay_io_mem[sizeof(Binlog_relay_IO_delegate)];
+#endif
+
+  transaction_delegate= new (trans_mem) Trans_delegate;
+  if (!transaction_delegate || !transaction_delegate->is_inited())
+    return 1;
+
+  binlog_storage_delegate= new (storage_mem) Binlog_storage_delegate;
+  if (!binlog_storage_delegate || !binlog_storage_delegate->is_inited())
+    return 1;
+
+#ifdef HAVE_REPLICATION
+  binlog_transmit_delegate= new (transmit_mem) Binlog_transmit_delegate;
+  if (!binlog_transmit_delegate || !binlog_transmit_delegate->is_inited())
+    return 1;
+
+  binlog_relay_io_delegate= new (relay_io_mem) Binlog_relay_IO_delegate;
+  if (!binlog_relay_io_delegate || !binlog_relay_io_delegate->is_inited())
+    return 1;
+#endif /* HAVE_REPLICATION */
+
+  return pthread_key_create(&RPL_TRANS_BINLOG_INFO, NULL) ? 1 : 0;
+}
+
+/**
+   Destroy the global observer delegates.
+
+   The delegates live in static buffers, so only their destructors are
+   run explicitly; nothing is freed. Delegates whose pointer is still
+   NULL are skipped.
+*/
+void delegates_destroy()
+{
+  if (transaction_delegate != NULL)
+  {
+    transaction_delegate->~Trans_delegate();
+  }
+  if (binlog_storage_delegate != NULL)
+  {
+    binlog_storage_delegate->~Binlog_storage_delegate();
+  }
+#ifdef HAVE_REPLICATION
+  if (binlog_transmit_delegate != NULL)
+  {
+    binlog_transmit_delegate->~Binlog_transmit_delegate();
+  }
+  if (binlog_relay_io_delegate != NULL)
+  {
+    binlog_relay_io_delegate->~Binlog_relay_IO_delegate();
+  }
+#endif /* HAVE_REPLICATION */
+}
+
+/*
+  This macro is used by almost all the Delegate methods to iterate
+  over all the observers, running the given callback function of the
+  delegate.
+
+  The macro expects a local variable named 'param' in the calling
+  method and fills in its server_id from thd. It declares the locals
+  'iter' and 'info' in the calling scope, so it can be expanded only
+  once per scope. The observer list is read-locked for the duration
+  of the iteration.
+
+  For every observer the owning plugin is locked through thd before
+  the callback is invoked and unlocked afterwards. A NULL callback
+  pointer is skipped. The iteration stops and r is set to 1 as soon
+  as a plugin cannot be locked or a callback returns non-zero;
+  otherwise r is left untouched.
+
+  r     variable receiving the failure status
+  f     name of the observer callback member to invoke
+  thd   thread used for plugin locking and as source of server_id
+  args  parenthesized argument list passed to the callback
+ */
+#define FOREACH_OBSERVER(r, f, thd, args)                               \
+  param.server_id= thd->server_id;                                      \
+  read_lock();                                                          \
+  Observer_info_iterator iter= observer_info_iter();                    \
+  Observer_info *info= iter++;                                          \
+  for (; info; info= iter++)                                            \
+  {                                                                     \
+    plugin_ref plugin=                                                  \
+      my_plugin_lock(thd, &info->plugin);                               \
+    if (!plugin)                                                        \
+    {                                                                   \
+      r= 1;                                                             \
+      break;                                                            \
+    }                                                                   \
+    if (((Observer *)info->observer)->f                                 \
+        && ((Observer *)info->observer)->f args)                        \
+    {                                                                   \
+      r= 1;                                                             \
+      plugin_unlock(thd, plugin);                                       \
+      break;                                                            \
+    }                                                                   \
+    plugin_unlock(thd, plugin);                                         \
+  }                                                                     \
+  unlock()
+
+
+/**
+   Notify all transaction observers that a commit has finished.
+
+   The binlog file name and position remembered for this thread by
+   Binlog_storage_delegate::after_flush are handed to the observers;
+   both are zero when nothing has been remembered.
+
+   @param thd  The committing thread
+   @param all  TRUE if the whole transaction is committed; a commit is
+               also treated as a real transaction when the thread has
+               no engines registered in its "all" transaction list
+
+   @retval 0 Success
+   @retval 1 A plugin could not be locked or an observer callback
+             returned non-zero
+*/
+int Trans_delegate::after_commit(THD *thd, bool all)
+{
+  Trans_param param;
+  bool is_real_trans= (all || thd->transaction.all.ha_list == 0);
+  /*
+    Trans_param is a plain struct, so flags must be assigned rather
+    than OR-ed into: it holds an indeterminate value at this point.
+  */
+  param.flags= is_real_trans ? TRANS_IS_REAL_TRANS : 0;
+
+  Trans_binlog_info *log_info=
+    my_pthread_getspecific_ptr(Trans_binlog_info*, RPL_TRANS_BINLOG_INFO);
+
+  param.log_file= log_info ? log_info->log_file : 0;
+  param.log_pos= log_info ? log_info->log_pos : 0;
+
+  int ret= 0;
+  FOREACH_OBSERVER(ret, after_commit, thd, (&param));
+
+  /*
+    This is the end of a real transaction or autocommit statement, we
+    can free the memory allocated for binlog file and position.
+  */
+  if (is_real_trans && log_info)
+  {
+    my_pthread_setspecific_ptr(RPL_TRANS_BINLOG_INFO, NULL);
+    my_free(log_info, MYF(0));
+  }
+  return ret;
+}
+
+/**
+   Notify all transaction observers that a rollback has finished.
+
+   The binlog file name and position remembered for this thread by
+   Binlog_storage_delegate::after_flush are handed to the observers;
+   both are zero when nothing has been remembered.
+
+   @param thd  The thread rolling back
+   @param all  TRUE if the whole transaction is rolled back; a rollback
+               is also treated as a real transaction when the thread
+               has no engines registered in its "all" transaction list
+
+   @retval 0 Success
+   @retval 1 A plugin could not be locked or an observer callback
+             returned non-zero
+*/
+int Trans_delegate::after_rollback(THD *thd, bool all)
+{
+  Trans_param param;
+  bool is_real_trans= (all || thd->transaction.all.ha_list == 0);
+  /*
+    Trans_param is a plain struct, so flags must be assigned rather
+    than OR-ed into: it holds an indeterminate value at this point.
+  */
+  param.flags= is_real_trans ? TRANS_IS_REAL_TRANS : 0;
+
+  Trans_binlog_info *log_info=
+    my_pthread_getspecific_ptr(Trans_binlog_info*, RPL_TRANS_BINLOG_INFO);
+
+  param.log_file= log_info ? log_info->log_file : 0;
+  param.log_pos= log_info ? log_info->log_pos : 0;
+
+  int ret= 0;
+  /* Dispatch the rollback callback, not after_commit. */
+  FOREACH_OBSERVER(ret, after_rollback, thd, (&param));
+
+  /*
+    This is the end of a real transaction or autocommit statement, we
+    can free the memory allocated for binlog file and position.
+  */
+  if (is_real_trans && log_info)
+  {
+    my_pthread_setspecific_ptr(RPL_TRANS_BINLOG_INFO, NULL);
+    my_free(log_info, MYF(0));
+  }
+  return ret;
+}
+
+/**
+   Notify all binlog storage observers that the binlog has been flushed.
+
+   The file name (with its directory part stripped) and the position
+   are also remembered in the thread-specific Trans_binlog_info, which
+   is allocated here on first use.
+
+   @param thd       The thread that flushed the binlog
+   @param log_file  Name of the binlog file that was written
+   @param log_pos   Position in the binlog file after the write
+   @param synced    TRUE if the file was synced to disk
+
+   @retval 0 Success
+   @retval 1 Out of memory, a plugin could not be locked, or an
+             observer callback returned non-zero
+*/
+int Binlog_storage_delegate::after_flush(THD *thd,
+                                         const char *log_file,
+                                         my_off_t log_pos,
+                                         bool synced)
+{
+  Binlog_storage_param param;
+  const uint32 flags= synced ? BINLOG_STORAGE_IS_SYNCED : 0;
+
+  Trans_binlog_info *log_info=
+    my_pthread_getspecific_ptr(Trans_binlog_info*, RPL_TRANS_BINLOG_INFO);
+  if (log_info == NULL)
+  {
+    log_info=
+      (Trans_binlog_info *)my_malloc(sizeof(Trans_binlog_info), MYF(0));
+    if (log_info == NULL)
+      return 1;
+    my_pthread_setspecific_ptr(RPL_TRANS_BINLOG_INFO, log_info);
+  }
+
+  /* Remember the position so the transaction observers can report it. */
+  log_info->log_pos = log_pos;
+  strcpy(log_info->log_file, log_file+dirname_length(log_file));
+
+  int ret= 0;
+  FOREACH_OBSERVER(ret, after_flush, thd,
+                   (&param, log_info->log_file, log_info->log_pos, flags));
+  return ret;
+}
+
+#ifdef HAVE_REPLICATION
+/**
+   Notify all binlog transmit observers that binlog dumping starts.
+
+   @param thd       The dump thread
+   @param flags     Flags handed to the observers in the parameter block
+   @param log_file  Binlog file name to transmit from
+   @param log_pos   Binlog position to transmit from
+
+   @retval 0 Success
+   @retval 1 A plugin could not be locked or an observer callback
+             returned non-zero
+*/
+int Binlog_transmit_delegate::transmit_start(THD *thd, ushort flags,
+                                             const char *log_file,
+                                             my_off_t log_pos)
+{
+  int error= 0;
+  Binlog_transmit_param param;          /* name is used by FOREACH_OBSERVER */
+
+  param.flags= flags;
+  FOREACH_OBSERVER(error, transmit_start, thd, (&param, log_file, log_pos));
+  return error;
+}
+
+/**
+   Notify all binlog transmit observers that binlog dumping stops.
+
+   @param thd    The dump thread
+   @param flags  Flags handed to the observers in the parameter block
+
+   @retval 0 Success
+   @retval 1 A plugin could not be locked or an observer callback
+             returned non-zero
+*/
+int Binlog_transmit_delegate::transmit_stop(THD *thd, ushort flags)
+{
+  int error= 0;
+  Binlog_transmit_param param;          /* name is used by FOREACH_OBSERVER */
+
+  param.flags= flags;
+  FOREACH_OBSERVER(error, transmit_stop, thd, (&param));
+  return error;
+}
+
+/**
+   Let every binlog transmit observer reserve bytes in the packet header.
+
+   Each observer that provides a reserve_header callback is handed a
+   scratch buffer of RESERVE_HEADER_SIZE bytes; the number of bytes it
+   reports as reserved is appended from that buffer to @a packet.
+
+   @param thd     Thread used for plugin locking; its server_id is
+                  passed to the observers
+   @param flags   Flags handed to the observers in the parameter block
+   @param packet  Packet the reserved header bytes are appended to
+
+   @retval 0 Success
+   @retval 1 A plugin could not be locked, a callback returned
+             non-zero, an observer reserved more than
+             RESERVE_HEADER_SIZE bytes, or appending to @a packet failed
+*/
+int Binlog_transmit_delegate::reserve_header(THD *thd, ushort flags,
+                                             String *packet)
+{
+  /* NOTE2ME: Maximum extra header size for each observer, I hope 32
+     bytes should be enough for each Observer to reserve their extra
+     header. If later found this is not enough, we can increase this
+     /HEZX
+  */
+#define RESERVE_HEADER_SIZE 32
+  unsigned char header[RESERVE_HEADER_SIZE];
+  Binlog_transmit_param param;
+  param.flags= flags;
+  param.server_id= thd->server_id;
+
+  int ret= 0;
+  read_lock();
+  Observer_info_iterator iter= observer_info_iter();
+  Observer_info *info;
+  while ((info= iter++))
+  {
+    plugin_ref plugin= my_plugin_lock(thd, &info->plugin);
+    if (!plugin)
+    {
+      ret= 1;
+      break;
+    }
+
+    /* Run the callback (if any) while the plugin is locked. */
+    Observer *observer= (Observer *)info->observer;
+    ulong hlen= 0;
+    bool failed= (observer->reserve_header
+                  && observer->reserve_header(&param, header,
+                                              RESERVE_HEADER_SIZE, &hlen));
+    plugin_unlock(thd, plugin);
+    if (failed)
+    {
+      ret= 1;
+      break;
+    }
+
+    if (hlen == 0)
+      continue;                         /* nothing reserved by this observer */
+    if (hlen > RESERVE_HEADER_SIZE || packet->append((char *)header, hlen))
+    {
+      ret= 1;
+      break;
+    }
+  }
+  unlock();
+  return ret;
+}
+
+/**
+   Notify all binlog transmit observers before an event packet is sent.
+
+   The observers receive the packet buffer and length, and the binlog
+   file name with its directory part stripped.
+
+   @param thd       The dump thread
+   @param flags     Flags handed to the observers in the parameter block
+   @param packet    Event packet about to be sent
+   @param log_file  Binlog file name of the event
+   @param log_pos   Binlog position of the event
+
+   @retval 0 Success
+   @retval 1 A plugin could not be locked or an observer callback
+             returned non-zero
+*/
+int Binlog_transmit_delegate::before_send_event(THD *thd, ushort flags,
+                                                String *packet,
+                                                const char *log_file,
+                                                my_off_t log_pos)
+{
+  int error= 0;
+  Binlog_transmit_param param;          /* name is used by FOREACH_OBSERVER */
+
+  param.flags= flags;
+  FOREACH_OBSERVER(error, before_send_event, thd,
+                   (&param, (uchar *)packet->c_ptr(),
+                    packet->length(),
+                    log_file+dirname_length(log_file), log_pos));
+  return error;
+}
+
+/**
+   Notify all binlog transmit observers after an event packet was sent.
+
+   @param thd     The dump thread
+   @param flags   Flags handed to the observers in the parameter block
+   @param packet  Event packet that has been sent
+
+   @retval 0 Success
+   @retval 1 A plugin could not be locked or an observer callback
+             returned non-zero
+*/
+int Binlog_transmit_delegate::after_send_event(THD *thd, ushort flags,
+                                               String *packet)
+{
+  int error= 0;
+  Binlog_transmit_param param;          /* name is used by FOREACH_OBSERVER */
+
+  param.flags= flags;
+  FOREACH_OBSERVER(error, after_send_event, thd,
+                   (&param, packet->c_ptr(), packet->length()));
+  return error;
+}
+
+/**
+   Notify all binlog transmit observers after master status was reset.
+
+   @param thd    The thread that reset the master status
+   @param flags  Flags handed to the observers in the parameter block
+
+   @retval 0 Success
+   @retval 1 A plugin could not be locked or an observer callback
+             returned non-zero
+*/
+int Binlog_transmit_delegate::after_reset_master(THD *thd, ushort flags)
+{
+  int error= 0;
+  Binlog_transmit_param param;          /* name is used by FOREACH_OBSERVER */
+
+  param.flags= flags;
+  FOREACH_OBSERVER(error, after_reset_master, thd, (&param));
+  return error;
+}
+
+/**
+   Fill a relay IO observer parameter block from the master info.
+
+   Only the connection and master log members are set here; server_id
+   is left untouched.
+
+   @param[out] param  Parameter block to fill in
+   @param mi          Master information to copy from
+*/
+void Binlog_relay_IO_delegate::init_param(Binlog_relay_IO_param *param,
+                                          Master_info *mi)
+{
+  /* where and how the slave is connected */
+  param->host= mi->host;
+  param->user= mi->user;
+  param->port= mi->port;
+  param->mysql= mi->mysql;
+
+  /* current coordinates in the master's binlog */
+  param->master_log_name= mi->master_log_name;
+  param->master_log_pos= mi->master_log_pos;
+}
+
+/**
+   Notify all relay IO observers that the slave IO thread starts.
+
+   @param thd  The slave IO thread
+   @param mi   Master information passed on to the observers
+
+   @retval 0 Success
+   @retval 1 A plugin could not be locked or an observer callback
+             returned non-zero
+*/
+int Binlog_relay_IO_delegate::thread_start(THD *thd, Master_info *mi)
+{
+  int error= 0;
+  Binlog_relay_IO_param param;          /* name is used by FOREACH_OBSERVER */
+
+  init_param(&param, mi);
+  FOREACH_OBSERVER(error, thread_start, thd, (&param));
+  return error;
+}
+
+
+/**
+   Notify all relay IO observers that the slave IO thread stops.
+
+   @param thd  The slave IO thread
+   @param mi   Master information passed on to the observers
+
+   @retval 0 Success
+   @retval 1 A plugin could not be locked or an observer callback
+             returned non-zero
+*/
+int Binlog_relay_IO_delegate::thread_stop(THD *thd, Master_info *mi)
+{
+  int error= 0;
+  Binlog_relay_IO_param param;          /* name is used by FOREACH_OBSERVER */
+
+  init_param(&param, mi);
+  FOREACH_OBSERVER(error, thread_stop, thd, (&param));
+  return error;
+}
+
+/**
+   Notify all relay IO observers before binlog transmission is requested.
+
+   @param thd    The slave IO thread
+   @param mi     Master information passed on to the observers
+   @param flags  Binlog dump flags, widened to uint32 for the observers
+
+   @retval 0 Success
+   @retval 1 A plugin could not be locked or an observer callback
+             returned non-zero
+*/
+int Binlog_relay_IO_delegate::before_request_transmit(THD *thd,
+                                                      Master_info *mi,
+                                                      ushort flags)
+{
+  int error= 0;
+  Binlog_relay_IO_param param;          /* name is used by FOREACH_OBSERVER */
+
+  init_param(&param, mi);
+  FOREACH_OBSERVER(error, before_request_transmit, thd,
+                   (&param, (uint32)flags));
+  return error;
+}
+
+/**
+   Notify all relay IO observers after an event packet has been read.
+
+   @param thd             The slave IO thread
+   @param mi              Master information passed on to the observers
+   @param packet          The event packet read from the master
+   @param len             Length of @a packet
+   @param[out] event_buf  Passed to the observers to return the event
+                          packet after processing
+   @param[out] event_len  Passed to the observers to return the length
+                          of the processed event packet
+
+   @retval 0 Success
+   @retval 1 A plugin could not be locked or an observer callback
+             returned non-zero
+*/
+int Binlog_relay_IO_delegate::after_read_event(THD *thd, Master_info *mi,
+                                               const char *packet, ulong len,
+                                               const char **event_buf,
+                                               ulong *event_len)
+{
+  int error= 0;
+  Binlog_relay_IO_param param;          /* name is used by FOREACH_OBSERVER */
+
+  init_param(&param, mi);
+  FOREACH_OBSERVER(error, after_read_event, thd,
+                   (&param, packet, len, event_buf, event_len));
+  return error;
+}
+
+/**
+   Notify all relay IO observers after an event was written to the
+   relay log.
+
+   @param thd        The slave IO thread
+   @param mi         Master information passed on to the observers
+   @param event_buf  Event packet written to the relay log
+   @param event_len  Length of @a event_buf
+   @param synced     TRUE if the relay log was synced; reported to the
+                     observers as BINLOG_RELAY_IS_SYNCED
+
+   @retval 0 Success
+   @retval 1 A plugin could not be locked or an observer callback
+             returned non-zero
+*/
+int Binlog_relay_IO_delegate::after_queue_event(THD *thd, Master_info *mi,
+                                                const char *event_buf,
+                                                ulong event_len,
+                                                bool synced)
+{
+  Binlog_relay_IO_param param;
+  init_param(&param, mi);
+
+  /*
+    Use the relay IO flag rather than the binlog storage one: the two
+    enums are independent even though the values coincide today.
+  */
+  uint32 flags=0;
+  if (synced)
+    flags |= BINLOG_RELAY_IS_SYNCED;
+
+  int ret= 0;
+  FOREACH_OBSERVER(ret, after_queue_event, thd,
+                   (&param, event_buf, event_len, flags));
+  return ret;
+}
+
+/**
+   Notify all relay IO observers after the slave status was reset.
+
+   @param thd  The thread that reset the slave
+   @param mi   Master information passed on to the observers
+
+   @retval 0 Success
+   @retval 1 A plugin could not be locked or an observer callback
+             returned non-zero
+*/
+int Binlog_relay_IO_delegate::after_reset_slave(THD *thd, Master_info *mi)
+{
+  int error= 0;
+  Binlog_relay_IO_param param;          /* name is used by FOREACH_OBSERVER */
+
+  init_param(&param, mi);
+  FOREACH_OBSERVER(error, after_reset_slave, thd, (&param));
+  return error;
+}
+#endif /* HAVE_REPLICATION */
+
+/* Add a transaction observer owned by plugin p to the transaction delegate. */
+int register_trans_observer(Trans_observer *observer, void *p)
+{
+  st_plugin_int *plugin= (st_plugin_int *)p;
+  return transaction_delegate->add_observer(observer, plugin);
+}
+
+/* Remove a transaction observer owned by plugin p from the transaction delegate. */
+int unregister_trans_observer(Trans_observer *observer, void *p)
+{
+  st_plugin_int *plugin= (st_plugin_int *)p;
+  return transaction_delegate->remove_observer(observer, plugin);
+}
+
+/* Add a binlog storage observer owned by plugin p to the binlog storage delegate. */
+int register_binlog_storage_observer(Binlog_storage_observer *observer, void *p)
+{
+  st_plugin_int *plugin= (st_plugin_int *)p;
+  return binlog_storage_delegate->add_observer(observer, plugin);
+}
+
+/* Remove a binlog storage observer owned by plugin p from the binlog storage delegate. */
+int unregister_binlog_storage_observer(Binlog_storage_observer *observer, void *p)
+{
+  st_plugin_int *plugin= (st_plugin_int *)p;
+  return binlog_storage_delegate->remove_observer(observer, plugin);
+}
+
+#ifdef HAVE_REPLICATION
+/* Add a binlog transmit observer owned by plugin p to the binlog transmit delegate. */
+int register_binlog_transmit_observer(Binlog_transmit_observer *observer, void *p)
+{
+  st_plugin_int *plugin= (st_plugin_int *)p;
+  return binlog_transmit_delegate->add_observer(observer, plugin);
+}
+
+/* Remove a binlog transmit observer owned by plugin p from the binlog transmit delegate. */
+int unregister_binlog_transmit_observer(Binlog_transmit_observer *observer, void *p)
+{
+  st_plugin_int *plugin= (st_plugin_int *)p;
+  return binlog_transmit_delegate->remove_observer(observer, plugin);
+}
+
+/* Add a relay IO observer owned by plugin p to the binlog relay IO delegate. */
+int register_binlog_relay_io_observer(Binlog_relay_IO_observer *observer, void *p)
+{
+  st_plugin_int *plugin= (st_plugin_int *)p;
+  return binlog_relay_io_delegate->add_observer(observer, plugin);
+}
+
+/* Remove a relay IO observer owned by plugin p from the binlog relay IO delegate. */
+int unregister_binlog_relay_io_observer(Binlog_relay_IO_observer *observer, void *p)
+{
+  st_plugin_int *plugin= (st_plugin_int *)p;
+  return binlog_relay_io_delegate->remove_observer(observer, plugin);
+}
+#endif /* HAVE_REPLICATION */

=== added file 'sql/rpl_handler.h'
--- a/sql/rpl_handler.h	1970-01-01 00:00:00 +0000
+++ b/sql/rpl_handler.h	2009-09-26 04:49:49 +0000
@@ -0,0 +1,213 @@
+/* Copyright (C) 2008 MySQL AB
+
+   This program is free software; you can redistribute it and/or modify
+   it under the terms of the GNU General Public License as published by
+   the Free Software Foundation; version 2 of the License.
+
+   This program is distributed in the hope that it will be useful,
+   but WITHOUT ANY WARRANTY; without even the implied warranty of
+   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+   GNU General Public License for more details.
+
+   You should have received a copy of the GNU General Public License
+   along with this program; if not, write to the Free Software
+   Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA */
+
+#ifndef RPL_HANDLER_H
+#define RPL_HANDLER_H
+
+#include "mysql_priv.h"
+#include "rpl_mi.h"
+#include "rpl_rli.h"
+#include "sql_plugin.h"
+#include "replication.h"
+
+/*
+  Book-keeping record for one registered observer: the observer
+  pointer itself plus the plugin that registered it.
+*/
+class Observer_info {
+public:
+  void *observer;               /* opaque observer; Delegate matches it by pointer value */
+  st_plugin_int *plugin_int;    /* plugin descriptor given at registration */
+  plugin_ref plugin;            /* plugin_int converted by plugin_int_to_ref() */
+
+  Observer_info(void *ob, st_plugin_int *p)
+    :observer(ob), plugin_int(p)
+  {
+    /*
+      The conversion is applied to the member, not to the parameter 'p':
+      when DBUG_OFF is not defined plugin_int_to_ref(A) expands to &(A),
+      so 'plugin' then points at this object's own plugin_int field.
+    */
+    plugin= plugin_int_to_ref(plugin_int);
+  }
+};
+
+/*
+  Base class of the hook delegates.
+
+  A delegate owns a list of Observer_info records guarded by a
+  read/write lock.  push_back() is given the delegate's MEM_ROOT for
+  the list bookkeeping; the Observer_info objects themselves are
+  created with 'new', so they are deleted explicitly when an observer
+  is removed or could not be linked into the list.
+
+  If the constructor fails to initialise the lock, 'inited' stays
+  FALSE and every operation below reports failure (TRUE).
+*/
+class Delegate {
+public:
+  typedef List<Observer_info> Observer_info_list;
+  typedef List_iterator<Observer_info> Observer_info_iterator;
+
+  /*
+    Register 'observer' on behalf of 'plugin'.
+
+    Returns FALSE on success.  Returns TRUE if the delegate is not
+    initialised, if the same observer pointer is already registered,
+    or if the new list entry could not be allocated.
+  */
+  int add_observer(void *observer, st_plugin_int *plugin)
+  {
+    int ret= FALSE;
+    if (!inited)
+      return TRUE;
+    write_lock();
+    Observer_info_iterator iter(observer_info_list);
+    Observer_info *info= iter++;
+    while (info && info->observer != observer)
+      info= iter++;
+    if (!info)
+    {
+      info= new Observer_info(observer, plugin);
+      if (!info || observer_info_list.push_back(info, &memroot))
+      {
+        /* Never linked into the list, so nothing else would free it. */
+        delete info;
+        ret= TRUE;
+      }
+    }
+    else
+      ret= TRUE;
+    unlock();
+    return ret;
+  }
+
+  /*
+    Unregister 'observer'.  'plugin' is accepted for symmetry with
+    add_observer() but is not used for the lookup.
+
+    Returns FALSE if the observer was found and removed, TRUE if the
+    delegate is not initialised or the observer is not registered.
+  */
+  int remove_observer(void *observer, st_plugin_int *plugin)
+  {
+    int ret= FALSE;
+    if (!inited)
+      return TRUE;
+    write_lock();
+    Observer_info_iterator iter(observer_info_list);
+    Observer_info *info= iter++;
+    while (info && info->observer != observer)
+      info= iter++;
+    if (info)
+    {
+      iter.remove();
+      /* remove() only unlinks the entry; release the record itself. */
+      delete info;
+    }
+    else
+      ret= TRUE;
+    unlock();
+    return ret;
+  }
+
+  /* Return a fresh iterator over the registered observers. */
+  inline Observer_info_iterator observer_info_iter()
+  {
+    return Observer_info_iterator(observer_info_list);
+  }
+
+  /* TRUE when no observer is registered (checked without locking). */
+  inline bool is_empty()
+  {
+    return observer_info_list.is_empty();
+  }
+
+  /*
+    Lock helpers.  Each returns TRUE when the delegate is not
+    initialised, otherwise the result of the underlying rw-lock call.
+  */
+  inline int read_lock()
+  {
+    if (!inited)
+      return TRUE;
+    return rw_rdlock(&lock);
+  }
+
+  inline int write_lock()
+  {
+    if (!inited)
+      return TRUE;
+    return rw_wrlock(&lock);
+  }
+
+  inline int unlock()
+  {
+    if (!inited)
+      return TRUE;
+    return rw_unlock(&lock);
+  }
+
+  /* TRUE when the constructor completed successfully. */
+  inline bool is_inited()
+  {
+    return inited;
+  }
+
+  Delegate()
+  {
+    inited= FALSE;
+    if (my_rwlock_init(&lock, NULL))
+      return;
+    init_sql_alloc(&memroot, 1024, 0);
+    inited= TRUE;
+  }
+  ~Delegate()
+  {
+    /*
+      When the constructor bailed out, the MEM_ROOT was never set up
+      and the lock was not successfully initialised, so there is
+      nothing that may be released.
+    */
+    if (!inited)
+      return;
+    inited= FALSE;
+    rwlock_destroy(&lock);
+    free_root(&memroot, MYF(0));
+  }
+
+private:
+  Observer_info_list observer_info_list;
+  rw_lock_t lock;
+  MEM_ROOT memroot;
+  bool inited;
+};
+
+/*
+  Delegate for transaction observers (Trans_observer).  Declares one
+  dispatch method per transaction hook; the method bodies are not
+  defined in this header.
+*/
+class Trans_delegate
+  :public Delegate {
+public:
+  typedef Trans_observer Observer;
+  int before_commit(THD *thd, bool all);
+  int before_rollback(THD *thd, bool all);
+  int after_commit(THD *thd, bool all);
+  int after_rollback(THD *thd, bool all);
+};
+
+/*
+  Delegate for binlog storage observers (Binlog_storage_observer).
+  Declares the dispatch method for the 'after_flush' hook; the method
+  body is not defined in this header.
+*/
+class Binlog_storage_delegate
+  :public Delegate {
+public:
+  typedef Binlog_storage_observer Observer;
+  int after_flush(THD *thd, const char *log_file,
+                  my_off_t log_pos, bool synced);
+};
+
+#ifdef HAVE_REPLICATION
+/*
+  Delegate for binlog transmit observers (Binlog_transmit_observer).
+
+  In this patch the hooks are run through RUN_HOOK(binlog_transmit, ...)
+  from sql_repl.cc: transmit_start / transmit_stop at the beginning and
+  end of mysql_binlog_send(), reserve_header when the transmit packet
+  is reset, before_send_event / after_send_event around the
+  my_net_write() of each event, and after_reset_master from
+  reset_master().
+*/
+class Binlog_transmit_delegate
+  :public Delegate {
+public:
+  typedef Binlog_transmit_observer Observer;
+  int transmit_start(THD *thd, ushort flags,
+                     const char *log_file, my_off_t log_pos);
+  int transmit_stop(THD *thd, ushort flags);
+  int reserve_header(THD *thd, ushort flags, String *packet);
+  int before_send_event(THD *thd, ushort flags,
+                        String *packet, const
+                        char *log_file, my_off_t log_pos );
+  int after_send_event(THD *thd, ushort flags,
+                       String *packet);
+  int after_reset_master(THD *thd, ushort flags);
+};
+
+/*
+  Delegate for relay-log IO observers (Binlog_relay_IO_observer).
+
+  In this patch the hooks are run through RUN_HOOK(binlog_relay_io, ...):
+  thread_start / thread_stop from handle_slave_io(),
+  before_request_transmit from request_dump(), after_read_event and
+  after_queue_event around queue_event(), and after_reset_slave from
+  reset_slave().
+*/
+class Binlog_relay_IO_delegate
+  :public Delegate {
+public:
+  typedef Binlog_relay_IO_observer Observer;
+  int thread_start(THD *thd, Master_info *mi);
+  int thread_stop(THD *thd, Master_info *mi);
+  int before_request_transmit(THD *thd, Master_info *mi, ushort flags);
+  /* On return *event_buf / *event_len describe the event to queue. */
+  int after_read_event(THD *thd, Master_info *mi,
+                       const char *packet, ulong len,
+                       const char **event_buf, ulong *event_len);
+  /* 'synced' is passed on to observers as the BINLOG_STORAGE_IS_SYNCED flag. */
+  int after_queue_event(THD *thd, Master_info *mi,
+                        const char *event_buf, ulong event_len,
+                        bool synced);
+  int after_reset_slave(THD *thd, Master_info *mi);
+private:
+  /* Fill 'param' from 'mi' before it is handed to the observers. */
+  void init_param(Binlog_relay_IO_param *param, Master_info *mi);
+};
+#endif /* HAVE_REPLICATION */
+
+/*
+  Declared here, defined elsewhere.  NOTE(review): presumably these
+  create and destroy the delegate objects declared below - confirm
+  against the definitions.
+*/
+int delegates_init();
+void delegates_destroy();
+
+/* Delegate instances; RUN_HOOK() refers to them as <group>_delegate. */
+extern Trans_delegate *transaction_delegate;
+extern Binlog_storage_delegate *binlog_storage_delegate;
+#ifdef HAVE_REPLICATION
+extern Binlog_transmit_delegate *binlog_transmit_delegate;
+extern Binlog_relay_IO_delegate *binlog_relay_io_delegate;
+#endif /* HAVE_REPLICATION */
+
+/*
+  Run hook 'hook' of the delegate named '<group>_delegate', passing the
+  parenthesised argument list 'args'.
+
+  If there are no observers in the delegate, the hook is not called
+  and the expression evaluates to 0 immediately.
+*/
+#define RUN_HOOK(group, hook, args)             \
+  (group ##_delegate->is_empty() ?              \
+   0 : group ##_delegate->hook args)
+
+#endif /* RPL_HANDLER_H */

=== modified file 'sql/slave.cc'
--- a/sql/slave.cc	2009-08-24 10:37:44 +0000
+++ b/sql/slave.cc	2009-09-26 04:49:49 +0000
@@ -40,6 +40,7 @@
 #include <errmsg.h>
 #include <mysqld_error.h>
 #include <mysys_err.h>
+#include "rpl_handler.h"
 
 #ifdef HAVE_REPLICATION
 
@@ -69,6 +70,8 @@ ulonglong relay_log_space_limit = 0;
 int disconnect_slave_event_count = 0, abort_slave_event_count = 0;
 int events_till_abort = -1;
 
+static pthread_key(Master_info*, RPL_MASTER_INFO);
+
 enum enum_slave_reconnect_actions
 {
   SLAVE_RECON_ACT_REG= 0,
@@ -231,6 +234,10 @@ int init_slave()
     TODO: re-write this to interate through the list of files
     for multi-master
   */
+
+  if (pthread_key_create(&RPL_MASTER_INFO, NULL))
+    goto err;
+
   active_mi= new Master_info;
 
   /*
@@ -1868,17 +1875,22 @@ static int safe_sleep(THD* thd, int sec,
 }
 
 
-static int request_dump(MYSQL* mysql, Master_info* mi,
-                        bool *suppress_warnings)
+static int request_dump(THD *thd, MYSQL* mysql, Master_info* mi,
+			bool *suppress_warnings)
 {
   uchar buf[FN_REFLEN + 10];
   int len;
-  int binlog_flags = 0; // for now
+  ushort binlog_flags = 0; // for now
   char* logname = mi->master_log_name;
   DBUG_ENTER("request_dump");
   
   *suppress_warnings= FALSE;
 
+  if (RUN_HOOK(binlog_relay_io,
+               before_request_transmit,
+               (thd, mi, binlog_flags)))
+    DBUG_RETURN(1);
+  
   // TODO if big log files: Change next to int8store()
   int4store(buf, (ulong) mi->master_log_pos);
   int2store(buf + 4, binlog_flags);
@@ -2532,6 +2544,16 @@ pthread_handler_t handle_slave_io(void *
                             mi->master_log_name,
                             llstr(mi->master_log_pos,llbuff)));
 
+  /* This must be called before running any binlog_relay_io hooks */
+  my_pthread_setspecific_ptr(RPL_MASTER_INFO, mi);
+
+  if (RUN_HOOK(binlog_relay_io, thread_start, (thd, mi)))
+  {
+    mi->report(ERROR_LEVEL, ER_SLAVE_FATAL_ERROR,
+               ER(ER_SLAVE_FATAL_ERROR), "Failed to run 'thread_start' hook");
+    goto err;
+  }
+
   if (!(mi->mysql = mysql = mysql_init(NULL)))
   {
     mi->report(ERROR_LEVEL, ER_SLAVE_FATAL_ERROR,
@@ -2621,7 +2643,7 @@ connected:
   while (!io_slave_killed(thd,mi))
   {
     thd_proc_info(thd, "Requesting binlog dump");
-    if (request_dump(mysql, mi, &suppress_warnings))
+    if (request_dump(thd, mysql, mi, &suppress_warnings))
     {
       sql_print_error("Failed on request_dump()");
       if (check_io_slave_killed(thd, mi, "Slave I/O thread killed while \
@@ -2641,6 +2663,7 @@ requesting master dump") ||
           goto err;
         goto connected;
       });
+    const char *event_buf;
 
     DBUG_ASSERT(mi->last_error().number == 0);
     while (!io_slave_killed(thd,mi))
@@ -2697,14 +2720,37 @@ Stopping slave I/O thread due to out-of-
 
       retry_count=0;                    // ok event, reset retry counter
       thd_proc_info(thd, "Queueing master event to the relay log");
-      if (queue_event(mi,(const char*)mysql->net.read_pos + 1,
-                      event_len))
+      event_buf= (const char*)mysql->net.read_pos + 1;
+      if (RUN_HOOK(binlog_relay_io, after_read_event,
+                   (thd, mi,(const char*)mysql->net.read_pos + 1,
+                    event_len, &event_buf, &event_len)))
+      {
+        mi->report(ERROR_LEVEL, ER_SLAVE_FATAL_ERROR,
+                   ER(ER_SLAVE_FATAL_ERROR),
+                   "Failed to run 'after_read_event' hook");
+        goto err;
+      }
+
+      /* XXX: 'synced' should be updated by queue_event to indicate
+         whether event has been synced to disk */
+      bool synced= 0;
+      if (queue_event(mi, event_buf, event_len))
       {
         mi->report(ERROR_LEVEL, ER_SLAVE_RELAY_LOG_WRITE_FAILURE,
                    ER(ER_SLAVE_RELAY_LOG_WRITE_FAILURE),
                    "could not queue event from master");
         goto err;
       }
+
+      if (RUN_HOOK(binlog_relay_io, after_queue_event,
+                   (thd, mi, event_buf, event_len, synced)))
+      {
+        mi->report(ERROR_LEVEL, ER_SLAVE_FATAL_ERROR,
+                   ER(ER_SLAVE_FATAL_ERROR),
+                   "Failed to run 'after_queue_event' hook");
+        goto err;
+      }
+
       if (flush_master_info(mi, 1))
       {
         sql_print_error("Failed to flush master info file");
@@ -2750,6 +2796,7 @@ err:
   // print the current replication position
   sql_print_information("Slave I/O thread exiting, read up to log '%s', position %s",
                   IO_RPL_LOG_NAME, llstr(mi->master_log_pos,llbuff));
+  RUN_HOOK(binlog_relay_io, thread_stop, (thd, mi));
   thd->set_query(NULL, 0);
   thd->reset_db(NULL, 0);
   if (mysql)
@@ -3906,6 +3953,71 @@ static int safe_reconnect(THD* thd, MYSQ
 }
 
 
+/*
+  Open a connection to the master, using the Master_info stored for
+  the calling thread under the RPL_MASTER_INFO thread-specific key.
+
+  mysql   connection handle to use, or NULL to have one created with
+          mysql_init().
+
+  Returns the connected handle, or NULL on failure.  A handle created
+  here is closed on failure; a handle supplied by the caller is left
+  for the caller to dispose of.
+
+  An error is written to the log when no Master_info is set for the
+  thread, when mysql_init() fails, or when the connect fails while the
+  I/O thread has not been killed.
+*/
+MYSQL *rpl_connect_master(MYSQL *mysql)
+{
+  THD *thd= current_thd;
+  Master_info *mi= my_pthread_getspecific_ptr(Master_info*, RPL_MASTER_INFO);
+  if (!mi)
+  {
+    sql_print_error("'rpl_connect_master' must be called in slave I/O thread context.");
+    return NULL;
+  }
+
+  /* TRUE only when the handle was created here and so must be freed here */
+  bool allocated= false;
+  
+  if (!mysql)
+  {
+    if(!(mysql= mysql_init(NULL)))
+    {
+      sql_print_error("rpl_connect_master: failed in mysql_init()");
+      return NULL;
+    }
+    allocated= true;
+  }
+
+  /*
+    XXX: copied from connect_to_master, this function should not
+    change the slave status, so we cannot use connect_to_master
+    directly
+    
+    TODO: make this part a separate function to eliminate duplication
+  */
+  mysql_options(mysql, MYSQL_OPT_CONNECT_TIMEOUT, (char *) &slave_net_timeout);
+  mysql_options(mysql, MYSQL_OPT_READ_TIMEOUT, (char *) &slave_net_timeout);
+
+#ifdef HAVE_OPENSSL
+  if (mi->ssl)
+  {
+    /* empty strings in Master_info are passed on as 0 */
+    mysql_ssl_set(mysql,
+                  mi->ssl_key[0]?mi->ssl_key:0,
+                  mi->ssl_cert[0]?mi->ssl_cert:0,
+                  mi->ssl_ca[0]?mi->ssl_ca:0,
+                  mi->ssl_capath[0]?mi->ssl_capath:0,
+                  mi->ssl_cipher[0]?mi->ssl_cipher:0);
+    mysql_options(mysql, MYSQL_OPT_SSL_VERIFY_SERVER_CERT,
+                  &mi->ssl_verify_server_cert);
+  }
+#endif
+
+  mysql_options(mysql, MYSQL_SET_CHARSET_NAME, default_charset_info->csname);
+  /* This one is not strictly needed but we have it here for completeness */
+  mysql_options(mysql, MYSQL_SET_CHARSET_DIR, (char *) charsets_dir);
+
+  /* do not even try to connect if the I/O thread was already killed */
+  if (io_slave_killed(thd, mi)
+      || !mysql_real_connect(mysql, mi->host, mi->user, mi->password, 0,
+                             mi->port, 0, 0))
+  {
+    /* a kill is not reported as a connection error */
+    if (!io_slave_killed(thd, mi))
+      sql_print_error("rpl_connect_master: error connecting to master: %s (server_error: %d)",
+                      mysql_error(mysql), mysql_errno(mysql));
+    
+    if (allocated)
+      mysql_close(mysql);                       // this will free the object
+    return NULL;
+  }
+  return mysql;
+}
+
 /*
   Store the file and position where the execute-slave thread are in the
   relay log.

=== modified file 'sql/sql_class.cc'
--- a/sql/sql_class.cc	2009-08-11 13:05:25 +0000
+++ b/sql/sql_class.cc	2009-09-26 04:49:49 +0000
@@ -278,6 +278,42 @@ const char *set_thd_proc_info(THD *thd, 
 }
 
 extern "C"
+/*
+  Record in 'thd' that it is about to wait on 'cond' protected by
+  'mutex', and set its proc_info to 'msg'.
+
+  If 'thd' is NULL, current_thd is used.  The caller must already own
+  'mutex' (checked by safe_mutex_assert_owner).  Returns the previous
+  proc_info so that it can be handed back to thd_exit_cond().
+*/
+const char* thd_enter_cond(MYSQL_THD thd, pthread_cond_t *cond,
+                           pthread_mutex_t *mutex, const char *msg)
+{
+  if (!thd)
+    thd= current_thd;
+
+  const char* old_msg = thd->proc_info;
+  safe_mutex_assert_owner(mutex);
+  /*
+    NOTE(review): current_mutex is stored before current_cond and
+    without taking mysys_var->mutex; presumably readers such as
+    THD::awake() depend on that order - confirm before reordering.
+  */
+  thd->mysys_var->current_mutex = mutex;
+  thd->mysys_var->current_cond = cond;
+  thd->proc_info = msg;
+  return old_msg;
+}
+
+/*
+  Counterpart of thd_enter_cond(): release the mutex recorded in
+  mysys_var->current_mutex, then, holding mysys_var->mutex, clear
+  current_mutex / current_cond and restore proc_info to 'old_msg'.
+
+  If 'thd' is NULL, current_thd is used.
+*/
+extern "C"
+void thd_exit_cond(MYSQL_THD thd, const char *old_msg)
+{
+  if (!thd)
+    thd= current_thd;
+
+  /*
+    Putting the mutex unlock in thd_exit_cond() ensures that
+    mysys_var->current_mutex is always unlocked _before_ mysys_var->mutex is
+    locked (if that would not be the case, you'll get a deadlock if someone
+    does a THD::awake() on you).
+  */
+  pthread_mutex_unlock(thd->mysys_var->current_mutex);
+  pthread_mutex_lock(&thd->mysys_var->mutex);
+  thd->mysys_var->current_mutex = 0;
+  thd->mysys_var->current_cond = 0;
+  thd->proc_info = old_msg;
+  pthread_mutex_unlock(&thd->mysys_var->mutex);
+  return;
+}
+
+extern "C"
 void **thd_ha_data(const THD *thd, const struct handlerton *hton)
 {
   return (void **) &thd->ha_data[hton->slot].ha_ptr;

=== modified file 'sql/sql_class.h'
--- a/sql/sql_class.h	2009-08-11 13:05:25 +0000
+++ b/sql/sql_class.h	2009-09-26 04:49:49 +0000
@@ -22,6 +22,7 @@
 
 #include "log.h"
 #include "rpl_tblmap.h"
+#include "replication.h"
 
 /**
   An interface that is used to take an action when
@@ -1940,27 +1941,11 @@ public:
   inline const char* enter_cond(pthread_cond_t *cond, pthread_mutex_t* mutex,
 			  const char* msg)
   {
-    const char* old_msg = proc_info;
-    safe_mutex_assert_owner(mutex);
-    mysys_var->current_mutex = mutex;
-    mysys_var->current_cond = cond;
-    proc_info = msg;
-    return old_msg;
+    return thd_enter_cond(this, cond, mutex, msg);
   }
   inline void exit_cond(const char* old_msg)
   {
-    /*
-      Putting the mutex unlock in exit_cond() ensures that
-      mysys_var->current_mutex is always unlocked _before_ mysys_var->mutex is
-      locked (if that would not be the case, you'll get a deadlock if someone
-      does a THD::awake() on you).
-    */
-    pthread_mutex_unlock(mysys_var->current_mutex);
-    pthread_mutex_lock(&mysys_var->mutex);
-    mysys_var->current_mutex = 0;
-    mysys_var->current_cond = 0;
-    proc_info = old_msg;
-    pthread_mutex_unlock(&mysys_var->mutex);
+    thd_exit_cond(this, old_msg);
   }
   inline time_t query_start() { query_start_used=1; return start_time; }
   inline void set_time()

=== modified file 'sql/sql_parse.cc'
--- a/sql/sql_parse.cc	2009-08-28 16:21:54 +0000
+++ b/sql/sql_parse.cc	2009-09-26 04:49:49 +0000
@@ -21,6 +21,7 @@
 #include <m_ctype.h>
 #include <myisam.h>
 #include <my_dir.h>
+#include "rpl_handler.h"
 
 #include "sp_head.h"
 #include "sp.h"

=== modified file 'sql/sql_plugin.cc'
--- a/sql/sql_plugin.cc	2009-05-18 08:10:30 +0000
+++ b/sql/sql_plugin.cc	2009-09-26 04:49:49 +0000
@@ -19,14 +19,6 @@
 #define REPORT_TO_LOG  1
 #define REPORT_TO_USER 2
 
-#ifdef DBUG_OFF
-#define plugin_ref_to_int(A) A
-#define plugin_int_to_ref(A) A
-#else
-#define plugin_ref_to_int(A) (A ? A[0] : NULL)
-#define plugin_int_to_ref(A) &(A)
-#endif
-
 extern struct st_mysql_plugin *mysqld_builtins[];
 
 /**
@@ -54,7 +46,8 @@ const LEX_STRING plugin_type_names[MYSQL
   { C_STRING_WITH_LEN("STORAGE ENGINE") },
   { C_STRING_WITH_LEN("FTPARSER") },
   { C_STRING_WITH_LEN("DAEMON") },
-  { C_STRING_WITH_LEN("INFORMATION SCHEMA") }
+  { C_STRING_WITH_LEN("INFORMATION SCHEMA") },
+  { C_STRING_WITH_LEN("REPLICATION") },
 };
 
 extern int initialize_schema_table(st_plugin_int *plugin);
@@ -93,7 +86,8 @@ static int min_plugin_info_interface_ver
   MYSQL_HANDLERTON_INTERFACE_VERSION,
   MYSQL_FTPARSER_INTERFACE_VERSION,
   MYSQL_DAEMON_INTERFACE_VERSION,
-  MYSQL_INFORMATION_SCHEMA_INTERFACE_VERSION
+  MYSQL_INFORMATION_SCHEMA_INTERFACE_VERSION,
+  MYSQL_REPLICATION_INTERFACE_VERSION,
 };
 static int cur_plugin_info_interface_version[MYSQL_MAX_PLUGIN_TYPE_NUM]=
 {
@@ -101,7 +95,8 @@ static int cur_plugin_info_interface_ver
   MYSQL_HANDLERTON_INTERFACE_VERSION,
   MYSQL_FTPARSER_INTERFACE_VERSION,
   MYSQL_DAEMON_INTERFACE_VERSION,
-  MYSQL_INFORMATION_SCHEMA_INTERFACE_VERSION
+  MYSQL_INFORMATION_SCHEMA_INTERFACE_VERSION,
+  MYSQL_REPLICATION_INTERFACE_VERSION,
 };
 
 static bool initialized= 0;

=== modified file 'sql/sql_plugin.h'
--- a/sql/sql_plugin.h	2009-05-14 12:03:33 +0000
+++ b/sql/sql_plugin.h	2009-09-26 04:49:49 +0000
@@ -18,6 +18,14 @@
 
 class sys_var;
 
+#ifdef DBUG_OFF
+#define plugin_ref_to_int(A) (A)                        /* plugin_ref is used as the pointer itself */
+#define plugin_int_to_ref(A) (A)
+#else
+#define plugin_ref_to_int(A) ((A) ? (A)[0] : NULL)      /* argument parenthesised: safe for expressions */
+#define plugin_int_to_ref(A) (&(A))                     /* A must be an lvalue: its address is taken */
+#endif
+
 /*
   the following flags are valid for plugin_init()
 */

=== modified file 'sql/sql_repl.cc'
--- a/sql/sql_repl.cc	2009-07-24 16:04:55 +0000
+++ b/sql/sql_repl.cc	2009-09-26 04:49:49 +0000
@@ -21,6 +21,7 @@
 #include "log_event.h"
 #include "rpl_filter.h"
 #include <my_dir.h>
+#include "rpl_handler.h"
 
 int max_binlog_dump_events = 0; // unlimited
 my_bool opt_sporadic_binlog_dump_fail = 0;
@@ -80,6 +81,32 @@ static int fake_rotate_event(NET* net, S
   DBUG_RETURN(0);
 }
 
+/*
+  Prepare the thread's transmit packet buffer for sending one event.
+
+  The packet is reset to the default one-byte "\0" header and the
+  binlog_transmit 'reserve_header' hook is then run on it.  On return
+  *ev_offset holds the resulting packet length, i.e. the offset at
+  which event data stored afterwards will start.  Call this before
+  storing event data in the packet buffer.
+
+  Returns 0 on success.  If the hook fails, returns 1 after setting
+  *errmsg and my_errno; *ev_offset is still updated.
+*/
+static int reset_transmit_packet(THD *thd, ushort flags,
+                                 ulong *ev_offset, const char **errmsg)
+{
+  String *pkt= &thd->packet;
+  int failed= 0;
+
+  /* start again from the default header */
+  pkt->length(0);
+  pkt->set("\0", 1, &my_charset_bin);
+
+  if (RUN_HOOK(binlog_transmit, reserve_header, (thd, flags, pkt)))
+  {
+    failed= 1;
+    my_errno= ER_UNKNOWN_ERROR;
+    *errmsg= "Failed to run hook 'reserve_header'";
+  }
+
+  *ev_offset= pkt->length();
+  return failed;
+}
+
 static int send_file(THD *thd)
 {
   NET* net = &thd->net;
@@ -346,6 +373,9 @@ void mysql_binlog_send(THD* thd, char* l
   LOG_INFO linfo;
   char *log_file_name = linfo.log_file_name;
   char search_file_name[FN_REFLEN], *name;
+
+  ulong ev_offset;
+
   IO_CACHE log;
   File file = -1;
   String* packet = &thd->packet;
@@ -361,6 +391,14 @@ void mysql_binlog_send(THD* thd, char* l
   DBUG_PRINT("enter",("log_ident: '%s'  pos: %ld", log_ident, (long) pos));
 
   bzero((char*) &log,sizeof(log));
+  sql_print_information("Start binlog_dump to slave_server(%d), pos(%s, %lu)",
+                        thd->server_id, log_ident, (ulong)pos);
+  if (RUN_HOOK(binlog_transmit, transmit_start, (thd, flags, log_ident, pos)))
+  {
+    errmsg= "Failed to run hook 'transmit_start'";
+    my_errno= ER_UNKNOWN_ERROR;
+    goto err;
+  }
 
 #ifndef DBUG_OFF
   if (opt_sporadic_binlog_dump_fail && (binlog_dump_count++ % 2))
@@ -416,11 +454,9 @@ impossible position";
     goto err;
   }
 
-  /*
-    We need to start a packet with something other than 255
-    to distinguish it from error
-  */
-  packet->set("\0", 1, &my_charset_bin); /* This is the start of a new packet */
+  /* reset transmit packet for the fake rotate event below */
+  if (reset_transmit_packet(thd, flags, &ev_offset, &errmsg))
+    goto err;
 
   /*
     Tell the client about the log name with a fake Rotate event;
@@ -460,7 +496,7 @@ impossible position";
     my_errno= ER_MASTER_FATAL_ERROR_READING_BINLOG;
     goto err;
   }
-  packet->set("\0", 1, &my_charset_bin);
+
   /*
     Adding MAX_LOG_EVENT_HEADER_LEN, since a binlog event can become
     this larger than the corresponding packet (query) sent 
@@ -476,6 +512,11 @@ impossible position";
   log_lock = mysql_bin_log.get_log_lock();
   if (pos > BIN_LOG_HEADER_SIZE)
   {
+    /* reset transmit packet for the event read from binary log
+       file */
+    if (reset_transmit_packet(thd, flags, &ev_offset, &errmsg))
+      goto err;
+
      /*
        Try to find a Format_description_log_event at the beginning of
        the binlog
@@ -483,29 +524,30 @@ impossible position";
      if (!(error = Log_event::read_log_event(&log, packet, log_lock)))
      {
        /*
-         The packet has offsets equal to the normal offsets in a binlog
-         event +1 (the first character is \0).
+         The packet has offsets equal to the normal offsets in a
+         binlog event + ev_offset (the first ev_offset characters are
+         the header (default \0)).
        */
        DBUG_PRINT("info",
                   ("Looked for a Format_description_log_event, found event type %d",
-                   (*packet)[EVENT_TYPE_OFFSET+1]));
-       if ((*packet)[EVENT_TYPE_OFFSET+1] == FORMAT_DESCRIPTION_EVENT)
+                   (*packet)[EVENT_TYPE_OFFSET+ev_offset]));
+       if ((*packet)[EVENT_TYPE_OFFSET+ev_offset] == FORMAT_DESCRIPTION_EVENT)
        {
-         binlog_can_be_corrupted= test((*packet)[FLAGS_OFFSET+1] &
+         binlog_can_be_corrupted= test((*packet)[FLAGS_OFFSET+ev_offset] &
                                        LOG_EVENT_BINLOG_IN_USE_F);
-         (*packet)[FLAGS_OFFSET+1] &= ~LOG_EVENT_BINLOG_IN_USE_F;
+         (*packet)[FLAGS_OFFSET+ev_offset] &= ~LOG_EVENT_BINLOG_IN_USE_F;
          /*
            mark that this event with "log_pos=0", so the slave
            should not increment master's binlog position
            (rli->group_master_log_pos)
          */
-         int4store((char*) packet->ptr()+LOG_POS_OFFSET+1, 0);
+         int4store((char*) packet->ptr()+LOG_POS_OFFSET+ev_offset, 0);
          /*
            if reconnect master sends FD event with `created' as 0
            to avoid destroying temp tables.
           */
          int4store((char*) packet->ptr()+LOG_EVENT_MINIMAL_HEADER_LEN+
-                   ST_CREATED_OFFSET+1, (ulong) 0);
+                   ST_CREATED_OFFSET+ev_offset, (ulong) 0);
          /* send it */
          if (my_net_write(net, (uchar*) packet->ptr(), packet->length()))
          {
@@ -531,8 +573,6 @@ impossible position";
          Format_description_log_event will be found naturally if it is written.
        */
      }
-     /* reset the packet as we wrote to it in any case */
-     packet->set("\0", 1, &my_charset_bin);
   } /* end of if (pos > BIN_LOG_HEADER_SIZE); */
   else
   {
@@ -544,6 +584,12 @@ impossible position";
 
   while (!net->error && net->vio != 0 && !thd->killed)
   {
+    Log_event_type event_type= UNKNOWN_EVENT;
+
+    /* reset the transmit packet for the event read from binary log
+       file */
+    if (reset_transmit_packet(thd, flags, &ev_offset, &errmsg))
+      goto err;
     while (!(error = Log_event::read_log_event(&log, packet, log_lock)))
     {
 #ifndef DBUG_OFF
@@ -556,15 +602,25 @@ impossible position";
       }
 #endif
 
-      if ((*packet)[EVENT_TYPE_OFFSET+1] == FORMAT_DESCRIPTION_EVENT)
+      event_type= (Log_event_type)((*packet)[LOG_EVENT_OFFSET+ev_offset]);
+      if (event_type == FORMAT_DESCRIPTION_EVENT)
       {
-        binlog_can_be_corrupted= test((*packet)[FLAGS_OFFSET+1] &
+        binlog_can_be_corrupted= test((*packet)[FLAGS_OFFSET+ev_offset] &
                                       LOG_EVENT_BINLOG_IN_USE_F);
-        (*packet)[FLAGS_OFFSET+1] &= ~LOG_EVENT_BINLOG_IN_USE_F;
+        (*packet)[FLAGS_OFFSET+ev_offset] &= ~LOG_EVENT_BINLOG_IN_USE_F;
       }
-      else if ((*packet)[EVENT_TYPE_OFFSET+1] == STOP_EVENT)
+      else if (event_type == STOP_EVENT)
         binlog_can_be_corrupted= FALSE;
 
+      pos = my_b_tell(&log);
+      if (RUN_HOOK(binlog_transmit, before_send_event,
+                   (thd, flags, packet, log_file_name, pos)))
+      {
+        my_errno= ER_UNKNOWN_ERROR;
+        errmsg= "run 'before_send_event' hook failed";
+        goto err;
+      }
+
       if (my_net_write(net, (uchar*) packet->ptr(), packet->length()))
       {
 	errmsg = "Failed on my_net_write()";
@@ -572,9 +628,8 @@ impossible position";
 	goto err;
       }
 
-      DBUG_PRINT("info", ("log event code %d",
-			  (*packet)[LOG_EVENT_OFFSET+1] ));
-      if ((*packet)[LOG_EVENT_OFFSET+1] == LOAD_EVENT)
+      DBUG_PRINT("info", ("log event code %d", event_type));
+      if (event_type == LOAD_EVENT)
       {
 	if (send_file(thd))
 	{
@@ -583,7 +638,17 @@ impossible position";
 	  goto err;
 	}
       }
-      packet->set("\0", 1, &my_charset_bin);
+
+      if (RUN_HOOK(binlog_transmit, after_send_event, (thd, flags, packet)))
+      {
+        errmsg= "Failed to run hook 'after_send_event'";
+        my_errno= ER_UNKNOWN_ERROR;
+        goto err;
+      }
+
+      /* reset transmit packet for next loop */
+      if (reset_transmit_packet(thd, flags, &ev_offset, &errmsg))
+        goto err;
     }
 
     /*
@@ -634,6 +699,11 @@ impossible position";
 	}
 #endif
 
+        /* reset the transmit packet for the event read from binary log
+           file */
+        if (reset_transmit_packet(thd, flags, &ev_offset, &errmsg))
+          goto err;
+        
 	/*
 	  No one will update the log while we are reading
 	  now, but we'll be quick and just read one record
@@ -650,6 +720,7 @@ impossible position";
 	  /* we read successfully, so we'll need to send it to the slave */
 	  pthread_mutex_unlock(log_lock);
 	  read_packet = 1;
+          event_type= (Log_event_type)((*packet)[LOG_EVENT_OFFSET+ev_offset]);
 	  break;
 
 	case LOG_READ_EOF:
@@ -676,8 +747,17 @@ impossible position";
 	}
 
 	if (read_packet)
-	{
-	  thd_proc_info(thd, "Sending binlog event to slave");
+        {
+          thd_proc_info(thd, "Sending binlog event to slave");
+          pos = my_b_tell(&log);
+          if (RUN_HOOK(binlog_transmit, before_send_event,
+                       (thd, flags, packet, log_file_name, pos)))
+          {
+            my_errno= ER_UNKNOWN_ERROR;
+            errmsg= "run 'before_send_event' hook failed";
+            goto err;
+          }
+	  
 	  if (my_net_write(net, (uchar*) packet->ptr(), packet->length()) )
 	  {
 	    errmsg = "Failed on my_net_write()";
@@ -685,7 +765,7 @@ impossible position";
 	    goto err;
 	  }
 
-	  if ((*packet)[LOG_EVENT_OFFSET+1] == LOAD_EVENT)
+	  if (event_type == LOAD_EVENT)
 	  {
 	    if (send_file(thd))
 	    {
@@ -694,11 +774,13 @@ impossible position";
 	      goto err;
 	    }
 	  }
-	  packet->set("\0", 1, &my_charset_bin);
-	  /*
-	    No need to net_flush because we will get to flush later when
-	    we hit EOF pretty quick
-	  */
+
+          if (RUN_HOOK(binlog_transmit, after_send_event, (thd, flags, packet)))
+          {
+            my_errno= ER_UNKNOWN_ERROR;
+            errmsg= "Failed to run hook 'after_send_event'";
+            goto err;
+          }
 	}
 
 	if (fatal_error)
@@ -734,6 +816,10 @@ impossible position";
       end_io_cache(&log);
       (void) my_close(file, MYF(MY_WME));
 
+      /* reset transmit packet for the possible fake rotate event */
+      if (reset_transmit_packet(thd, flags, &ev_offset, &errmsg))
+        goto err;
+      
       /*
         Call fake_rotate_event() in case the previous log (the one which
         we have just finished reading) did not contain a Rotate event
@@ -750,9 +836,6 @@ impossible position";
 	my_errno= ER_MASTER_FATAL_ERROR_READING_BINLOG;
 	goto err;
       }
-
-      packet->length(0);
-      packet->append('\0');
     }
   }
 
@@ -760,6 +843,7 @@ end:
   end_io_cache(&log);
   (void)my_close(file, MYF(MY_WME));
 
+  RUN_HOOK(binlog_transmit, transmit_stop, (thd, flags));
   my_eof(thd);
   thd_proc_info(thd, "Waiting to finalize termination");
   pthread_mutex_lock(&LOCK_thread_count);
@@ -770,6 +854,7 @@ end:
 err:
   thd_proc_info(thd, "Waiting to finalize termination");
   end_io_cache(&log);
+  RUN_HOOK(binlog_transmit, transmit_stop, (thd, flags));
   /*
     Exclude  iteration through thread list
     this is needed for purge_logs() - it will iterate through
@@ -1064,6 +1149,7 @@ int reset_slave(THD *thd, Master_info* m
     goto err;
   }
 
+  RUN_HOOK(binlog_relay_io, after_reset_slave, (thd, mi));
 err:
   unlock_slave_threads(mi);
   if (error)
@@ -1363,7 +1449,11 @@ int reset_master(THD* thd)
                ER(ER_FLUSH_MASTER_BINLOG_CLOSED), MYF(ME_BELL+ME_WAITTANG));
     return 1;
   }
-  return mysql_bin_log.reset_logs(thd);
+
+  if (mysql_bin_log.reset_logs(thd))
+    return 1;
+  RUN_HOOK(binlog_transmit, after_reset_master, (thd, 0 /* flags */));
+  return 0;
 }
 
 int cmp_master_pos(const char* log_file_name1, ulonglong log_pos1,
@@ -1836,5 +1926,3 @@ int init_replication_sys_vars()
 }
 
 #endif /* HAVE_REPLICATION */
-
-


Attachment: [text/bzr-bundle] bzr/zhenxing.he@sun.com-20090926044949-zoapse64vqd08imo.bundle
Thread
bzr commit into mysql-5.1-rep-semisync branch (zhenxing.he:3108) Bug#39012 Bug#42244 Bug#44058 Bug#45672 Bug#45673 Bug#45819 Bug#45973 WL#1720 WL#4398 - He Zhenxing, 26 Sep