List:Commits« Previous MessageNext Message »
From:He Zhenxing Date:June 27 2008 1:06pm
Subject:bzr commit into mysql-6.0 branch (hezx:2685) WL#4398
View as plain text  
#At file:///media/sda3/work/mysql/bzrwork/semisync/6.0/

 2685 He Zhenxing	2008-06-27
      WL#4398 Replication Interface for semi-synchronous replication
      
      Add missing files from previous push
added:
  sql/replication.h
  sql/rpl_handler.cc
  sql/rpl_handler.h

=== added file 'sql/replication.h'
--- a/sql/replication.h	1970-01-01 00:00:00 +0000
+++ b/sql/replication.h	2008-06-27 13:06:03 +0000
@@ -0,0 +1,429 @@
+/* Copyright (C) 2008 MySQL AB
+
+   This program is free software; you can redistribute it and/or modify
+   it under the terms of the GNU General Public License as published by
+   the Free Software Foundation; version 2 of the License.
+
+   This program is distributed in the hope that it will be useful,
+   but WITHOUT ANY WARRANTY; without even the implied warranty of
+   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+   GNU General Public License for more details.
+
+   You should have received a copy of the GNU General Public License
+   along with this program; if not, write to the Free Software
+   Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA */
+
+#ifndef REPLICATION_H
+#define REPLICATION_H
+
+#ifdef __cplusplus
+extern "C" {
+#endif
+
+/**
+   Transaction observer flags.
+*/
+enum Trans_flags {
+  /** Transaction is a real transaction */
+  TRANS_IS_REAL_TRANS = 1
+};
+
+/**
+   Transaction observer parameter
+*/
+typedef struct Trans_param {
+  uint32 server_id;
+  uint32 flags;
+} Trans_param;
+
+/**
+   Observes and extends transaction execution
+*/
+typedef struct Trans_observer {
+  uint32 len;
+
+  /**
+     This callback is called after transaction commit
+     
+     This callback is called right after commit to storage engines for
+     transactional tables.
+
+     For non-transactional tables, this is called at the end of the
+     statement, before sending statement status, if the statement
+     succeeded.
+
+     @note The return value is currently ignored by the server.
+
+     @param param The parameter for transaction observers
+
+     @retval 0 Sucess
+     @retval 1 Failure
+  */
+  int (*after_commit)(Trans_param *param);
+
+  /**
+     This callback is called after transaction rollback
+
+     This callback is called right after rollback to storage engines
+     for transactional tables.
+
+     For non-transactional tables, this is called at the end of the
+     statement, before sending statement status, if the statement
+     failed.
+
+     @note The return value is currently ignored by the server.
+
+     @param param The parameter for transaction observers
+
+     @retval 0 Sucess
+     @retval 1 Failure
+  */
+  int (*after_rollback)(Trans_param *param);
+} Trans_observer;
+
+/**
+   Binlog storage flags
+*/
+enum Binlog_storage_flags {
+  /** Binary log was sync:ed */
+  BINLOG_STORAGE_IS_SYNCED = 1
+};
+
+/**
+   Binlog storage observer parameters
+ */
+typedef struct Binlog_storage_param {
+  uint32 server_id;
+} Binlog_storage_param;
+
+/**
+   Observe binlog logging storage
+*/
+typedef struct Binlog_storage_observer {
+  uint32 len;
+
+  /**
+     This callback is called after binlog has been flushed
+
+     This callback is called after cached events have been flushed to
+     binary log file. Whether the binary log file is synchronized to
+     disk is indicated by the bit BINLOG_STORAGE_IS_SYNCED in @a flags.
+
+     @param param Observer common parameter
+     @param log_file Binlog file name been updated
+     @param log_pos Binlog position after update
+     @param flags flags for binlog storage
+
+     @retval 0 Sucess
+     @retval 1 Failure
+  */
+  int (*after_flush)(Binlog_storage_param *param,
+                     const char *log_file, my_off_t log_pos,
+                     uint32 flags);
+} Binlog_storage_observer;
+
+/**
+   Replication binlog transmitter (binlog dump) observer parameter.
+*/
+typedef struct Binlog_transmit_param {
+  uint32 server_id;
+  uint32 flags;
+} Binlog_transmit_param;
+
+/**
+   Observe and extends the binlog dumping thread.
+*/
+typedef struct Binlog_transmit_observer {
+  uint32 len;
+  
+  /**
+     This callback is called when binlog dumping starts
+
+
+     @param param Observer common parameter
+     @param log_file Binlog file name to transmit from
+     @param log_pos Binlog position to transmit from
+
+     @retval 0 Sucess
+     @retval 1 Failure
+  */
+  int (*transmit_start)(Binlog_transmit_param *param,
+                        const char *log_file, my_off_t log_pos);
+
+  /**
+     This callback is called when binlog dumping stops
+
+     @param param Observer common parameter
+     
+     @retval 0 Sucess
+     @retval 1 Failure
+  */
+  int (*transmit_stop)(Binlog_transmit_param *param);
+
+  /**
+     This callback is called to reserve bytes in packet header for event transmission
+
+     This callback is called when resetting transmit packet header to
+     reserve bytes for this observer in packet header.
+
+     The @a header buffer is allocated by the server code, and @a size
+     is the size of the header buffer. Each observer can only reserve
+     a maximum size of @a size in the header.
+
+     @param param Observer common parameter
+     @param header Pointer of the header buffer
+     @param size Size of the header buffer
+     @param len Header length reserved by this observer
+
+     @retval 0 Sucess
+     @retval 1 Failure
+  */
+  int (*reserve_header)(Binlog_transmit_param *param,
+                        unsigned char *header,
+                        unsigned long size,
+                        unsigned long *len);
+
+  /**
+     This callback is called before sending an event packet to slave
+
+     @param param Observer common parameter
+     @param packet Binlog event packet to send
+     @param len Length of the event packet
+     @param log_file Binlog file name of the event packet to send
+     @param log_pos Binlog position of the event packet to send
+
+     @retval 0 Sucess
+     @retval 1 Failure
+  */
+  int (*before_send_event)(Binlog_transmit_param *param,
+                           unsigned char *packet, unsigned long len,
+                           const char *log_file, my_off_t log_pos );
+
+  /**
+     This callback is called after sending an event packet to slave
+
+     @param param Observer common parameter
+     @param event_buf Binlog event packet buffer sent
+     @param len length of the event packet buffer
+
+     @retval 0 Sucess
+     @retval 1 Failure
+   */
+  int (*after_send_event)(Binlog_transmit_param *param,
+                          const char *event_buf, unsigned long len);
+
+  /**
+     This callback is called after resetting master status
+
+     This is called when executing the command RESET MASTER, and is
+     used to reset status variables added by observers.
+
+     @param param Observer common parameter
+
+     @retval 0 Sucess
+     @retval 1 Failure
+  */
+  int (*after_reset_master)(Binlog_transmit_param *param);
+} Binlog_transmit_observer;
+
+/**
+   Binlog relay IO flags
+*/
+enum Binlog_relay_IO_flags {
+  /** Binary relay log was sync:ed */
+  BINLOG_RELAY_IS_SYNCED = 1
+};
+
+
+/**
+  Replication binlog relay IO observer parameter
+*/
+typedef struct Binlog_relay_IO_param {
+  uint32 server_id;
+
+  /* Master host, user and port */
+  char *host;
+  char *user;
+  uint port;
+
+  char *master_log_name;
+  my_off_t master_log_pos;
+
+  MYSQL *mysql;                        /* the connection to master */
+} Binlog_relay_IO_param;
+
+/**
+   Observes and extends the service of slave IO thread.
+*/
+typedef struct Binlog_relay_IO_observer {
+  uint32 len;
+
+  /**
+     This callback is called when slave IO thread starts
+
+     @param param Observer common parameter
+
+     @retval 0 Sucess
+     @retval 1 Failure
+  */
+  int (*thread_start)(Binlog_relay_IO_param *param);
+
+  /**
+     This callback is called when slave IO thread stops
+
+     @param param Observer common parameter
+
+     @retval 0 Sucess
+     @retval 1 Failure
+  */
+  int (*thread_stop)(Binlog_relay_IO_param *param);
+
+  /**
+     This callback is called before slave requesting binlog transmission from master
+
+     This is called before slave issuing BINLOG_DUMP command to master
+     to request binlog.
+
+     @param param Observer common parameter
+     @param flags binlog dump flags
+
+     @retval 0 Sucess
+     @retval 1 Failure
+  */
+  int (*before_request_transmit)(Binlog_relay_IO_param *param, uint32 flags);
+
+  /**
+     This callback is called after read an event packet from master
+
+     @param param Observer common parameter
+     @param packet The event packet read from master
+     @param len Length of the event packet read from master
+     @param event_buf The event packet return after process
+     @param event_len The length of event packet return after process
+
+     @retval 0 Sucess
+     @retval 1 Failure
+  */
+  int (*after_read_event)(Binlog_relay_IO_param *param,
+                          const char *packet, unsigned long len,
+                          const char **event_buf, unsigned long *event_len);
+
+  /**
+     This callback is called after written an event packet to relay log
+
+     @param param Observer common parameter
+     @param event_buf Event packet written to relay log
+     @param event_len Length of the event packet written to relay log
+     @param flags flags for relay log
+
+     @retval 0 Sucess
+     @retval 1 Failure
+  */
+  int (*after_queue_event)(Binlog_relay_IO_param *param,
+                           const char *event_buf, unsigned long event_len,
+                           uint32 flags);
+
+  /**
+     This callback is called after reset slave relay log IO status
+     
+     @param param Observer common parameter
+
+     @retval 0 Sucess
+     @retval 1 Failure
+  */
+  int (*after_reset_slave)(Binlog_relay_IO_param *param);
+} Binlog_relay_IO_observer;
+
+
+/**
+   Register a transaction observer
+
+   @param observer The transaction observer to register
+   @param p pointer to the internal plugin structure
+
+   @retval 0 Sucess
+   @retval 1 Observer already exists
+*/
+int register_trans_observer(Trans_observer *observer, void *p);
+
+/**
+   Unregister a transaction observer
+
+   @param observer The transaction observer to unregister
+   @param p pointer to the internal plugin structure
+
+   @retval 0 Sucess
+   @retval 1 Observer not exists
+*/
+int unregister_trans_observer(Trans_observer *observer, void *p);
+
+/**
+   Register a binlog storage observer
+
+   @param observer The binlog storage observer to register
+   @param p pointer to the internal plugin structure
+
+   @retval 0 Sucess
+   @retval 1 Observer already exists
+*/
+int register_binlog_storage_observer(Binlog_storage_observer *observer, void *p);
+
+/**
+   Unregister a binlog storage observer
+
+   @param observer The binlog storage observer to unregister
+   @param p pointer to the internal plugin structure
+
+   @retval 0 Sucess
+   @retval 1 Observer not exists
+*/
+int unregister_binlog_storage_observer(Binlog_storage_observer *observer, void *p);
+
+/**
+   Register a binlog transmit observer
+
+   @param observer The binlog transmit observer to register
+   @param p pointer to the internal plugin structure
+
+   @retval 0 Sucess
+   @retval 1 Observer already exists
+*/
+int register_binlog_transmit_observer(Binlog_transmit_observer *observer, void *p);
+
+/**
+   Unregister a binlog transmit observer
+
+   @param observer The binlog transmit observer to unregister
+   @param p pointer to the internal plugin structure
+
+   @retval 0 Sucess
+   @retval 1 Observer not exists
+*/
+int unregister_binlog_transmit_observer(Binlog_transmit_observer *observer, void *p);
+
+/**
+   Register a binlog relay IO (slave IO thread) observer
+
+   @param observer The binlog relay IO observer to register
+   @param p pointer to the internal plugin structure
+
+   @retval 0 Sucess
+   @retval 1 Observer already exists
+*/
+int register_binlog_relay_io_observer(Binlog_relay_IO_observer *observer, void *p);
+
+/**
+   Unregister a binlog relay IO (slave IO thread) observer
+
+   @param observer The binlog relay IO observer to unregister
+   @param p pointer to the internal plugin structure
+
+   @retval 0 Sucess
+   @retval 1 Observer not exists
+*/
+int unregister_binlog_relay_io_observer(Binlog_relay_IO_observer *observer, void *p);
+
+#ifdef __cplusplus
+}
+#endif
+#endif /* REPLICATION_H */

=== added file 'sql/rpl_handler.cc'
--- a/sql/rpl_handler.cc	1970-01-01 00:00:00 +0000
+++ b/sql/rpl_handler.cc	2008-06-27 13:06:03 +0000
@@ -0,0 +1,453 @@
+/* Copyright (C) 2008 MySQL AB
+
+   This program is free software; you can redistribute it and/or modify
+   it under the terms of the GNU General Public License as published by
+   the Free Software Foundation; version 2 of the License.
+
+   This program is distributed in the hope that it will be useful,
+   but WITHOUT ANY WARRANTY; without even the implied warranty of
+   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+   GNU General Public License for more details.
+
+   You should have received a copy of the GNU General Public License
+   along with this program; if not, write to the Free Software
+   Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA */
+
+#include "mysql_priv.h"
+
+#include "rpl_mi.h"
+#include "sql_repl.h"
+#include "log_event.h"
+#include "rpl_filter.h"
+#include <my_dir.h>
+#include "rpl_handler.h"
+
+Trans_delegate *transaction_delegate;
+Binlog_storage_delegate *binlog_storage_delegate;
+#ifdef HAVE_REPLICATION
+Binlog_transmit_delegate *binlog_transmit_delegate;
+Binlog_relay_IO_delegate *binlog_relay_io_delegate;
+#endif /* HAVE_REPLICATION */
+
+int thd_net_read(const unsigned char **packet, size_t *len)
+{
+  THD *thd= current_thd;
+  ulong ret= my_net_read(&thd->net);
+  if (ret == packet_error)
+    return 1;
+  *len= ret;
+  *packet= thd->net.read_pos;
+  return 0;
+}
+
+int thd_net_write(const unsigned char *packet, size_t len)
+{
+  return my_net_write(&current_thd->net, packet, len);
+}
+
+int thd_net_flush()
+{
+  return net_flush(&current_thd->net);
+}
+
+int mysql_net_read(MYSQL *mysql, const unsigned char **packet, size_t *len)
+{
+  ulong ret= my_net_read(&mysql->net);
+  if (ret == packet_error)
+    return 1;
+  *len= ret;
+  *packet= mysql->net.read_pos;
+  return 0;
+}
+
+int mysql_net_write(MYSQL *mysql, const unsigned char *packet, size_t len)
+{
+  return my_net_write(&mysql->net, packet, len);
+}
+
+int mysql_net_flush(MYSQL *mysql)
+{
+  return net_flush(&mysql->net);
+}
+
+int get_user_var_int(const char *name,
+                     long long int *value, int *null_value)
+{
+  my_bool null_val;
+  user_var_entry *entry= 
+    (user_var_entry*) hash_search(&current_thd->user_vars,
+                                  (uchar*) name, strlen(name));
+  if (!entry)
+    return 1;
+  *value= entry->val_int(&null_val);
+  if (null_value)
+    *null_value= null_val;
+  return 0;
+}
+
+int get_user_var_real(const char *name,
+                      double *value, int *null_value)
+{
+  my_bool null_val;
+  user_var_entry *entry= 
+    (user_var_entry*) hash_search(&current_thd->user_vars,
+                                  (uchar*) name, strlen(name));
+  if (!entry)
+    return 1;
+  *value= entry->val_real(&null_val);
+  if (null_value)
+    *null_value= null_val;
+  return 0;
+}
+
+int get_user_var_str(const char *name, char *value,
+                     size_t len, unsigned int precision, int *null_value)
+{
+  String str;
+  my_bool null_val;
+  user_var_entry *entry= 
+    (user_var_entry*) hash_search(&current_thd->user_vars,
+                                  (uchar*) name, strlen(name));
+  if (!entry)
+    return 1;
+  entry->val_str(&null_val, &str, precision);
+  strncpy(value, str.c_ptr(), len);
+  if (null_value)
+    *null_value= null_val;
+  return 0;
+}
+
+int delegates_init()
+{
+  static unsigned char trans_mem[sizeof(Trans_delegate)];
+  static unsigned char storage_mem[sizeof(Binlog_storage_delegate)];
+#ifdef HAVE_REPLICATION
+  static unsigned char transmit_mem[sizeof(Binlog_transmit_delegate)];
+  static unsigned char relay_io_mem[sizeof(Binlog_relay_IO_delegate)];
+#endif
+  
+  if (!(transaction_delegate= new (trans_mem) Trans_delegate)
+      || (!transaction_delegate->is_inited())
+      || !(binlog_storage_delegate= new (storage_mem) Binlog_storage_delegate)
+      || (!binlog_storage_delegate->is_inited())
+#ifdef HAVE_REPLICATION
+      || !(binlog_transmit_delegate= new (transmit_mem) Binlog_transmit_delegate)
+      || (!binlog_transmit_delegate->is_inited())
+      || !(binlog_relay_io_delegate= new (relay_io_mem) Binlog_relay_IO_delegate)
+      || (!binlog_relay_io_delegate->is_inited())
+#endif /* HAVE_REPLICATION */
+      )
+    return 1;
+  return 0;
+}
+
+void delegates_destroy()
+{
+  transaction_delegate->~Trans_delegate();
+  binlog_storage_delegate->~Binlog_storage_delegate();
+#ifdef HAVE_REPLICATION
+  binlog_transmit_delegate->~Binlog_transmit_delegate();
+  binlog_relay_io_delegate->~Binlog_relay_IO_delegate();
+#endif /* HAVE_REPLICATION */
+}
+
+/*
+  This macro is used by almost all the Delegate methods to iterate
+  over all the observers running given callback function of the
+  delegate .
+  
+  Add observer plugins to the thd->lex list, after each statement, all
+  plugins add to thd->lex will be automatically unlocked.
+ */
+#define FOREACH_OBSERVER(f, thd, args)                                  \
+  param.server_id= thd->server_id;                                      \
+  read_lock();                                                          \
+  Observer_info_iterator iter= observer_info_iter();                    \
+  Observer_info *info= iter++;                                          \
+  for (; info; info= iter++)                                            \
+  {                                                                     \
+    plugin_ref plugin=                                                  \
+      my_plugin_lock(thd, &info->plugin);                               \
+    if (!plugin)                                                        \
+    {                                                                   \
+      unlock();                                                         \
+      return 1;                                                         \
+    }                                                                   \
+    if (((Observer *)info->observer)->f                                 \
+        && ((Observer *)info->observer)->f args)                        \
+    {                                                                   \
+      unlock();                                                         \
+      return 1;                                                         \
+    }                                                                   \
+  }                                                                     \
+  unlock();                                                             \
+  return 0
+
+
+int Trans_delegate::after_commit(THD *thd, bool all)
+{
+  Trans_param param;
+  if (all || thd->transaction.all.ha_list == 0)
+    param.flags |= TRANS_IS_REAL_TRANS;
+
+  FOREACH_OBSERVER(after_commit, thd, (&param));
+}
+
+int Trans_delegate::after_rollback(THD *thd, bool all)
+{
+  Trans_param param;
+  if (all || thd->transaction.all.ha_list == 0)
+    param.flags |= TRANS_IS_REAL_TRANS;
+
+  FOREACH_OBSERVER(after_rollback, thd, (&param));
+}
+
+int Binlog_storage_delegate::after_flush(THD *thd,
+                                         const char *log_file,
+                                         my_off_t log_pos,
+                                         bool synced)
+{
+  Binlog_storage_param param;
+  uint32 flags=0;
+  if (synced)
+    flags |= BINLOG_STORAGE_IS_SYNCED;
+
+  FOREACH_OBSERVER(after_flush, thd, (&param, log_file, log_pos, flags));
+}
+
+#ifdef HAVE_REPLICATION
+int Binlog_transmit_delegate::transmit_start(THD *thd, ushort flags,
+                                             const char *log_file,
+                                             my_off_t log_pos)
+{
+  Binlog_transmit_param param;
+  param.flags= flags;
+
+  FOREACH_OBSERVER(transmit_start, thd, (&param, log_file, log_pos));
+}
+
+int Binlog_transmit_delegate::transmit_stop(THD *thd, ushort flags)
+{
+  Binlog_transmit_param param;
+  param.flags= flags;
+
+  FOREACH_OBSERVER(transmit_stop, thd, (&param));
+}
+
+int Binlog_transmit_delegate::reserve_header(THD *thd, ushort flags,
+                                             String *packet)
+{
+  /* NOTE2ME: Maximum extra header size for each observer, I hope 32
+     bytes should be enough for each Observer to reserve their extra
+     header. If later found this is not enough, we can increase this
+     /HEZX
+  */
+#define RESERVE_HEADER_SIZE 32
+  unsigned char header[RESERVE_HEADER_SIZE];
+  ulong hlen;
+  Binlog_transmit_param param;
+  param.flags= flags;
+  param.server_id= thd->server_id;
+  
+  read_lock();
+  Observer_info_iterator iter= observer_info_iter();
+  Observer_info *info= iter++;
+  for (; info; info= iter++)
+  {
+    plugin_ref plugin=
+      my_plugin_lock(thd, &info->plugin);
+    if (!plugin)
+    {
+      unlock();
+      return 1;
+    }
+    hlen= 0;
+    if (((Observer *)info->observer)->reserve_header
+        && ((Observer *)info->observer)->reserve_header(&param,
+                                                        header,
+                                                        RESERVE_HEADER_SIZE,
+                                                        &hlen))
+    {
+      unlock();
+      return 1;
+    }
+    if (hlen == 0)
+      continue;
+    if (hlen > RESERVE_HEADER_SIZE || packet->append((char *)header, hlen))
+    {
+      unlock();
+      return 1;
+    }
+  }
+  unlock();
+  return 0;
+}
+
+int Binlog_transmit_delegate::before_send_event(THD *thd, ushort flags,
+                                                String *packet,
+                                                const char *log_file,
+                                                my_off_t log_pos)
+{
+  Binlog_transmit_param param;
+  param.flags= flags;
+
+  FOREACH_OBSERVER(before_send_event, thd,
+                   (&param, (uchar *)packet->c_ptr(),
+                    packet->length(), log_file, log_pos));
+}
+
+int Binlog_transmit_delegate::after_send_event(THD *thd, ushort flags,
+                                               String *packet)
+{
+  Binlog_transmit_param param;
+  param.flags= flags;
+
+  FOREACH_OBSERVER(after_send_event, thd,
+                   (&param, packet->c_ptr(), packet->length()));
+}
+
+int Binlog_transmit_delegate::after_reset_master(THD *thd, ushort flags)
+
+{
+  Binlog_transmit_param param;
+  param.flags= flags;
+
+  FOREACH_OBSERVER(after_reset_master, thd, (&param));
+}
+
+int Binlog_relay_IO_delegate::thread_start(THD *thd, Master_info *mi)
+{
+  Binlog_relay_IO_param param;
+  param.mysql= mi->mysql;
+  param.user= mi->user;
+  param.host= mi->host;
+  param.port= mi->port;
+  param.master_log_name= mi->master_log_name;
+  param.master_log_pos= mi->master_log_pos;
+
+  FOREACH_OBSERVER(thread_start, thd, (&param));
+}
+
+int Binlog_relay_IO_delegate::thread_stop(THD *thd, Master_info *mi)
+{
+
+  Binlog_relay_IO_param param;
+  param.mysql= mi->mysql;
+  param.user= mi->user;
+  param.host= mi->host;
+  param.port= mi->port;
+  param.master_log_name= mi->master_log_name;
+  param.master_log_pos= mi->master_log_pos;
+
+  FOREACH_OBSERVER(thread_stop, thd, (&param));
+}
+
+int Binlog_relay_IO_delegate::before_request_transmit(THD *thd,
+                                                      Master_info *mi,
+                                                      ushort flags)
+{
+  Binlog_relay_IO_param param;
+  param.mysql= mi->mysql;
+  param.user= mi->user;
+  param.host= mi->host;
+  param.port= mi->port;
+  param.master_log_name= mi->master_log_name;
+  param.master_log_pos= mi->master_log_pos;
+
+  FOREACH_OBSERVER(before_request_transmit, thd, (&param, (uint32)flags));
+}
+
+int Binlog_relay_IO_delegate::after_read_event(THD *thd, Master_info *mi,
+                                               const char *packet, ulong len,
+                                               const char **event_buf,
+                                               ulong *event_len)
+{
+  Binlog_relay_IO_param param;
+  param.mysql= mi->mysql;
+  param.user= mi->user;
+  param.host= mi->host;
+  param.port= mi->port;
+  param.master_log_name= mi->master_log_name;
+  param.master_log_pos= mi->master_log_pos;
+
+  FOREACH_OBSERVER(after_read_event, thd,
+                   (&param, packet, len, event_buf, event_len));
+}
+
+int Binlog_relay_IO_delegate::after_queue_event(THD *thd, Master_info *mi,
+                                                const char *event_buf,
+                                                ulong event_len,
+                                                bool synced)
+{
+  Binlog_relay_IO_param param;
+  param.mysql= mi->mysql;
+  param.user= mi->user;
+  param.host= mi->host;
+  param.port= mi->port;
+  param.master_log_name= mi->master_log_name;
+  param.master_log_pos= mi->master_log_pos;
+
+  uint32 flags=0;
+  if (synced)
+    flags |= BINLOG_STORAGE_IS_SYNCED;
+
+  FOREACH_OBSERVER(after_queue_event, thd,
+                   (&param, event_buf, event_len, flags));
+}
+
+int Binlog_relay_IO_delegate::after_reset_slave(THD *thd, Master_info *mi)
+
+{
+  Binlog_relay_IO_param param;
+  param.mysql= mi->mysql;
+  param.user= mi->user;
+  param.host= mi->host;
+  param.port= mi->port;
+  param.master_log_name= mi->master_log_name;
+  param.master_log_pos= mi->master_log_pos;
+
+  FOREACH_OBSERVER(after_reset_slave, thd, (&param));
+}
+#endif /* HAVE_REPLICATION */
+
+int register_trans_observer(Trans_observer *observer, void *p)
+{
+  return transaction_delegate->add_observer(observer, (st_plugin_int *)p);
+}
+
+int unregister_trans_observer(Trans_observer *observer, void *p)
+{
+  return transaction_delegate->remove_observer(observer, (st_plugin_int *)p);
+}
+
+int register_binlog_storage_observer(Binlog_storage_observer *observer, void *p)
+{
+  return binlog_storage_delegate->add_observer(observer, (st_plugin_int *)p);
+}
+
+int unregister_binlog_storage_observer(Binlog_storage_observer *observer, void *p)
+{
+  return binlog_storage_delegate->remove_observer(observer, (st_plugin_int *)p);
+}
+
+#ifdef HAVE_REPLICATION
+int register_binlog_transmit_observer(Binlog_transmit_observer *observer, void *p)
+{
+  return binlog_transmit_delegate->add_observer(observer, (st_plugin_int *)p);
+}
+
+int unregister_binlog_transmit_observer(Binlog_transmit_observer *observer, void *p)
+{
+  return binlog_transmit_delegate->remove_observer(observer, (st_plugin_int *)p);
+}
+
+int register_binlog_relay_io_observer(Binlog_relay_IO_observer *observer, void *p)
+{
+  return binlog_relay_io_delegate->add_observer(observer, (st_plugin_int *)p);
+}
+
+int unregister_binlog_relay_io_observer(Binlog_relay_IO_observer *observer, void *p)
+{
+  return binlog_relay_io_delegate->remove_observer(observer, (st_plugin_int *)p);
+}
+#endif /* HAVE_REPLICATION */

=== added file 'sql/rpl_handler.h'
--- a/sql/rpl_handler.h	1970-01-01 00:00:00 +0000
+++ b/sql/rpl_handler.h	2008-06-27 13:06:03 +0000
@@ -0,0 +1,206 @@
+/* Copyright (C) 2008 MySQL AB
+
+   This program is free software; you can redistribute it and/or modify
+   it under the terms of the GNU General Public License as published by
+   the Free Software Foundation; version 2 of the License.
+
+   This program is distributed in the hope that it will be useful,
+   but WITHOUT ANY WARRANTY; without even the implied warranty of
+   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+   GNU General Public License for more details.
+
+   You should have received a copy of the GNU General Public License
+   along with this program; if not, write to the Free Software
+   Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA */
+
+#ifndef RPL_HANDLER_H
+#define RPL_HANDLER_H
+
+#include "mysql_priv.h"
+#include "rpl_mi.h"
+#include "rpl_rli.h"
+#include "sql_plugin.h"
+#include "replication.h"
+
+class Observer_info {
+public:
+  void *observer;
+  st_plugin_int *plugin_int;
+  plugin_ref plugin;
+
+  Observer_info(void *ob, st_plugin_int *p)
+    :observer(ob), plugin_int(p)
+  {
+    plugin= plugin_int_to_ref(plugin_int);
+  }
+};
+
+class Delegate {
+public:
+  typedef List<Observer_info> Observer_info_list;
+  typedef List_iterator<Observer_info> Observer_info_iterator;
+  
+  int add_observer(void *observer, st_plugin_int *plugin)
+  {
+    int ret= FALSE;
+    if (!inited)
+      return TRUE;
+    write_lock();
+    Observer_info_iterator iter(observer_info_list);
+    Observer_info *info= iter++;
+    while (info && info->observer != observer)
+      info= iter++;
+    if (!info)
+    {
+      info= new Observer_info(observer, plugin);
+      if (!info || observer_info_list.push_back(info, &memroot))
+        ret= TRUE;
+    }
+    else
+      ret= TRUE;
+    unlock();
+    return ret;
+  }
+  
+  int remove_observer(void *observer, st_plugin_int *plugin)
+  {
+    int ret= FALSE;
+    if (!inited)
+      return TRUE;
+    write_lock();
+    Observer_info_iterator iter(observer_info_list);
+    Observer_info *info= iter++;
+    while (info && info->observer != observer)
+      info= iter++;
+    if (info)
+      iter.remove();
+    else
+      ret= TRUE;
+    unlock();
+    return ret;
+  }
+
+  inline Observer_info_iterator observer_info_iter()
+  {
+    return Observer_info_iterator(observer_info_list);
+  }
+
+  inline bool is_empty()
+  {
+    return observer_info_list.is_empty();
+  }
+
+  inline int read_lock()
+  {
+    if (!inited)
+      return TRUE;
+    return rw_rdlock(&lock);
+  }
+
+  inline int write_lock()
+  {
+    if (!inited)
+      return TRUE;
+    return rw_wrlock(&lock);
+  }
+
+  inline int unlock()
+  {
+    if (!inited)
+      return TRUE;
+    return rw_unlock(&lock);
+  }
+
+  inline bool is_inited()
+  {
+    return inited;
+  }
+  
+  Delegate()
+  {
+    inited= FALSE;
+    if (my_rwlock_init(&lock, NULL))
+      return;
+    init_sql_alloc(&memroot, 1024, 0);
+    inited= TRUE;
+  }
+  ~Delegate()
+  {
+    inited= FALSE;
+    rwlock_destroy(&lock);
+    free_root(&memroot, MYF(0));
+  }
+
+private:
+  Observer_info_list observer_info_list;
+  rw_lock_t lock;
+  MEM_ROOT memroot;
+  bool inited;
+};
+
+class Trans_delegate
+  :public Delegate {
+public:
+  typedef Trans_observer Observer;
+  int before_commit(THD *thd, bool all);
+  int before_rollback(THD *thd, bool all);
+  int after_commit(THD *thd, bool all);
+  int after_rollback(THD *thd, bool all);
+};
+
+class Binlog_storage_delegate
+  :public Delegate {
+public:
+  typedef Binlog_storage_observer Observer;
+  int after_flush(THD *thd, const char *log_file,
+                  my_off_t log_pos, bool synced);
+};
+
+#ifdef HAVE_REPLICATION
+class Binlog_transmit_delegate
+  :public Delegate {
+public:
+  typedef Binlog_transmit_observer Observer;
+  int transmit_start(THD *thd, ushort flags,
+                     const char *log_file, my_off_t log_pos);
+  int transmit_stop(THD *thd, ushort flags);
+  int reserve_header(THD *thd, ushort flags, String *packet);
+  int before_send_event(THD *thd, ushort flags,
+                        String *packet, const
+                        char *log_file, my_off_t log_pos );
+  int after_send_event(THD *thd, ushort flags,
+                       String *packet);
+  int after_reset_master(THD *thd, ushort flags);
+};
+
+class Binlog_relay_IO_delegate
+  :public Delegate {
+public:
+  typedef Binlog_relay_IO_observer Observer;
+  int thread_start(THD *thd, Master_info *mi);
+  int thread_stop(THD *thd, Master_info *mi);
+  int before_request_transmit(THD *thd, Master_info *mi, ushort flags);
+  int after_read_event(THD *thd, Master_info *mi,
+                       const char *packet, ulong len,
+                       const char **event_buf, ulong *event_len);
+  int after_queue_event(THD *thd, Master_info *mi,
+                        const char *event_buf, ulong event_len,
+                        bool synced);
+  int after_reset_slave(THD *thd, Master_info *mi);
+};
+#endif /* HAVE_REPLICATION */
+
+int delegates_init();
+void delegates_destroy();
+
+extern Trans_delegate *transaction_delegate;
+extern Binlog_storage_delegate *binlog_storage_delegate;
+#ifdef HAVE_REPLICATION
+extern Binlog_transmit_delegate *binlog_transmit_delegate;
+extern Binlog_relay_IO_delegate *binlog_relay_io_delegate;
+#endif /* HAVE_REPLICATION */
+
+#define RUN_HOOK(group, hook, args)             \
+  group ##_delegate->hook args
+
+#endif /* RPL_HANDLER_H */

Thread
bzr commit into mysql-6.0 branch (hezx:2685) WL#4398He Zhenxing27 Jun
  • Re: bzr commit into mysql-6.0 branch (hezx:2685) WL#4398Konstantin Osipov27 Jun
    • Re: bzr commit into mysql-6.0 branch (hezx:2685) WL#4398He Zhenxing28 Jun