List:Commits« Previous MessageNext Message »
From:He Zhenxing Date:July 13 2008 1:54am
Subject:bzr push into mysql-6.0 branch (hezx:2672)
View as plain text  
 2672 He Zhenxing	2008-07-13 [merge]
      auto merge
modified:
  mysql-test/r/information_schema.result
  mysql-test/suite/rpl/r/rpl_circular_for_4_hosts.result
  mysql-test/suite/rpl/t/rpl_circular_for_4_hosts.test
  mysql-test/suite/rpl/t/rpl_row_stop_middle_update.test
  mysql-test/suite/rpl/t/rpl_server_id1.test
  mysql-test/t/information_schema.test
  sql/sql_show.cc

=== modified file 'include/mysql/plugin.h'
--- a/include/mysql/plugin.h	2008-06-26 14:22:39 +0000
+++ b/include/mysql/plugin.h	2008-07-11 09:58:52 +0000
@@ -630,86 +630,6 @@ void mysql_query_cache_invalidate4(MYSQL
                                    int using_trx);
 
 /**
-   Read a packet from the current thread connection.
-
-   @note The packet buffer will be allocated and freed automatically,
-   the memory pointed to by @a packet will be overwritten or freed by
-   the next read or write using current thread connection.
-
-   @param packet   return the pointer to the packet read
-   @param len      return the length of packet read
-
-   @retval 0 Success
-   @retval 1 Failure
-*/
-int thd_net_read(const unsigned char **packet, size_t *len);
-
-/**
-   Write a packet to the current thread connection.
-
-   @note The packet buffer will be allocated and freed automatically,
-   the memory pointed to by @a packet will be overwritten or freed by
-   the next read or write using current thread connection.
-
-   @param packet   packet to write to the connection
-   @param len      length of the packet
-
-   @retval 0 Success
-   @retval 1 Failure
-*/
-int thd_net_write(const unsigned char *packet, size_t len);
-
-/**
-   Flush write buffer of current thread connection.
-
-   @retval 0 Success
-   @retval 1 Failure
-*/
-int thd_net_flush();
-
-/**
-   Read a packet from the connection.
-
-   @note The packet buffer will be allocated and freed automatically,
-   the memory pointed to by @a packet will be overwritten or freed by
-   the next read or write using the same @a mysql connection.
-
-   @param mysql    mysql client connection
-   @param packet   return the pointer to the packet read
-   @param len      return the length of packet read
-
-   @retval 0 Success
-   @retval 1 Failure
-*/
-int mysql_net_read(MYSQL *mysql, const unsigned char **packet, size_t *len);
-
-/**
-   Write a packet to the connection.
-
-   @note The packet buffer will be allocated and freed automatically,
-   the memory pointed to by @a packet will be overwritten or freed by
-   the next read or write using the same @a mysql connection.
-
-   @param mysql    mysql client connection
-   @param packet   packet to write to the connection
-   @param len      length of the packet
-
-   @retval 0 Success
-   @retval 1 Failure
-*/
-int mysql_net_write(MYSQL *mysql, const unsigned char *packet, size_t len);
-
-/**
-   Flush write buffer of connection.
-
-   @param mysql    mysql client connection
-
-   @retval 0 Success
-   @retval 1 Failure
-*/
-int mysql_net_flush(MYSQL *mysql);
-
-/**
    Get the value of user variable as an integer.
 
    This function will return the value of variable @a name as an

=== modified file 'sql/replication.h'
--- a/sql/replication.h	2008-06-26 14:22:39 +0000
+++ b/sql/replication.h	2008-07-11 09:58:52 +0000
@@ -34,6 +34,16 @@ enum Trans_flags {
 typedef struct Trans_param {
   uint32 server_id;
   uint32 flags;
+
+  /*
+    The latest binary log file name and position written by current
+    transaction, if binary log is disabled or no log event has been
+    written into binary log file by current transaction (events
+    written into transaction log cache are not counted), these two
+    member will be zero.
+  */
+  const char *log_file;
+  my_off_t log_pos;
 } Trans_param;
 
 /**
@@ -244,7 +254,7 @@ typedef struct Binlog_relay_IO_param {
   /* Master host, user and port */
   char *host;
   char *user;
-  uint port;
+  unsigned int port;
 
   char *master_log_name;
   my_off_t master_log_pos;
@@ -423,6 +433,28 @@ int register_binlog_relay_io_observer(Bi
 */
 int unregister_binlog_relay_io_observer(Binlog_relay_IO_observer *observer, void *p);
 
+/**
+   Connect to master
+
+   This function can only used in the slave I/O thread context, and
+   will use the same master information to do the connection.
+
+   @code
+   MYSQL *mysql = mysql_init(NULL);
+   if (rpl_connect_master(mysql))
+   {
+     // do stuff with the connection
+   }
+   mysql_close(mysql); // close the connection
+   @endcode
+   
+   @param mysql address of MYSQL structure to use, pass NULL will
+   create a new one
+
+   @return address of MYSQL structure on success, NULL on failure
+*/
+MYSQL *rpl_connect_master(MYSQL *mysql);
+
 #ifdef __cplusplus
 }
 #endif

=== modified file 'sql/rpl_handler.cc'
--- a/sql/rpl_handler.cc	2008-06-26 14:22:39 +0000
+++ b/sql/rpl_handler.cc	2008-07-11 09:58:52 +0000
@@ -29,46 +29,15 @@ Binlog_transmit_delegate *binlog_transmi
 Binlog_relay_IO_delegate *binlog_relay_io_delegate;
 #endif /* HAVE_REPLICATION */
 
-int thd_net_read(const unsigned char **packet, size_t *len)
-{
-  THD *thd= current_thd;
-  ulong ret= my_net_read(&thd->net);
-  if (ret == packet_error)
-    return 1;
-  *len= ret;
-  *packet= thd->net.read_pos;
-  return 0;
-}
-
-int thd_net_write(const unsigned char *packet, size_t len)
-{
-  return my_net_write(&current_thd->net, packet, len);
-}
-
-int thd_net_flush()
-{
-  return net_flush(&current_thd->net);
-}
-
-int mysql_net_read(MYSQL *mysql, const unsigned char **packet, size_t *len)
-{
-  ulong ret= my_net_read(&mysql->net);
-  if (ret == packet_error)
-    return 1;
-  *len= ret;
-  *packet= mysql->net.read_pos;
-  return 0;
-}
-
-int mysql_net_write(MYSQL *mysql, const unsigned char *packet, size_t len)
-{
-  return my_net_write(&mysql->net, packet, len);
-}
+/*
+  structure to save transaction log filename and position
+*/
+typedef struct Trans_binlog_info {
+  my_off_t log_pos;
+  char log_file[FN_REFLEN];
+} Trans_binlog_info;
 
-int mysql_net_flush(MYSQL *mysql)
-{
-  return net_flush(&mysql->net);
-}
+static pthread_key(Trans_binlog_info*, RPL_TRANS_BINLOG_INFO);
 
 int get_user_var_int(const char *name,
                      long long int *value, int *null_value)
@@ -138,6 +107,9 @@ int delegates_init()
 #endif /* HAVE_REPLICATION */
       )
     return 1;
+
+  if (pthread_key_create(&RPL_TRANS_BINLOG_INFO, NULL))
+    return 1;
   return 0;
 }
 
@@ -159,7 +131,7 @@ void delegates_destroy()
   Add observer plugins to the thd->lex list, after each statement, all
   plugins add to thd->lex will be automatically unlocked.
  */
-#define FOREACH_OBSERVER(f, thd, args)                                  \
+#define FOREACH_OBSERVER(r, f, thd, args)                               \
   param.server_id= thd->server_id;                                      \
   read_lock();                                                          \
   Observer_info_iterator iter= observer_info_iter();                    \
@@ -170,36 +142,73 @@ void delegates_destroy()
       my_plugin_lock(thd, &info->plugin);                               \
     if (!plugin)                                                        \
     {                                                                   \
-      unlock();                                                         \
-      return 1;                                                         \
+      r= 1;                                                             \
+      break;                                                            \
     }                                                                   \
     if (((Observer *)info->observer)->f                                 \
         && ((Observer *)info->observer)->f args)                        \
     {                                                                   \
-      unlock();                                                         \
-      return 1;                                                         \
+      r= 1;                                                             \
+      break;                                                            \
     }                                                                   \
   }                                                                     \
-  unlock();                                                             \
-  return 0
+  unlock()
 
 
 int Trans_delegate::after_commit(THD *thd, bool all)
 {
   Trans_param param;
-  if (all || thd->transaction.all.ha_list == 0)
+  bool is_real_trans= (all || thd->transaction.all.ha_list == 0);
+  if (is_real_trans)
     param.flags |= TRANS_IS_REAL_TRANS;
 
-  FOREACH_OBSERVER(after_commit, thd, (&param));
+  Trans_binlog_info *log_info=
+    my_pthread_getspecific_ptr(Trans_binlog_info*, RPL_TRANS_BINLOG_INFO);
+
+  param.log_file= log_info ? log_info->log_file : 0;
+  param.log_pos= log_info ? log_info->log_pos : 0;
+
+  int ret= 0;
+  FOREACH_OBSERVER(ret, after_commit, thd, (&param));
+
+  /*
+    This is the end of a real transaction or autocommit statement, we
+    can free the memory allocated for binlog file and position.
+  */
+  if (is_real_trans && log_info)
+  {
+    my_pthread_setspecific_ptr(RPL_TRANS_BINLOG_INFO, NULL);
+    my_free(log_info, MYF(0));
+  }
+  return ret;
 }
 
 int Trans_delegate::after_rollback(THD *thd, bool all)
 {
   Trans_param param;
-  if (all || thd->transaction.all.ha_list == 0)
+  bool is_real_trans= (all || thd->transaction.all.ha_list == 0);
+  if (is_real_trans)
     param.flags |= TRANS_IS_REAL_TRANS;
 
-  FOREACH_OBSERVER(after_rollback, thd, (&param));
+  Trans_binlog_info *log_info=
+    my_pthread_getspecific_ptr(Trans_binlog_info*, RPL_TRANS_BINLOG_INFO);
+    
+  param.log_file= log_info ? log_info->log_file : 0;
+  param.log_pos= log_info ? log_info->log_pos : 0;
+
+  int ret= 0;
+  FOREACH_OBSERVER(ret, after_commit, thd, (&param));
+
+  /*
+    This is the end of a real transaction or autocommit statement, we
+    can free the memory allocated for binlog file and position.
+  */
+  if (is_real_trans && log_info)
+  {
+    my_pthread_setspecific_ptr(RPL_TRANS_BINLOG_INFO, NULL);
+    my_free(log_info, MYF(0));
+  }
+  return ret;
 }
 
 int Binlog_storage_delegate::after_flush(THD *thd,
@@ -212,7 +221,24 @@ int Binlog_storage_delegate::after_flush
   if (synced)
     flags |= BINLOG_STORAGE_IS_SYNCED;
 
-  FOREACH_OBSERVER(after_flush, thd, (&param, log_file, log_pos, flags));
+  Trans_binlog_info *log_info=
+    my_pthread_getspecific_ptr(Trans_binlog_info*, RPL_TRANS_BINLOG_INFO);
+    
+  if (!log_info)
+  {
+    if(!(log_info=
+         (Trans_binlog_info *)my_malloc(sizeof(Trans_binlog_info), MYF(0))))
+      return 1;
+    my_pthread_setspecific_ptr(RPL_TRANS_BINLOG_INFO, log_info);
+  }
+    
+  strcpy(log_info->log_file, log_file+dirname_length(log_file));
+  log_info->log_pos = log_pos;
+  
+  int ret= 0;
+  FOREACH_OBSERVER(ret, after_flush, thd,
+                   (&param, log_info->log_file, log_info->log_pos, flags));
+  return ret;
 }
 
 #ifdef HAVE_REPLICATION
@@ -223,7 +249,9 @@ int Binlog_transmit_delegate::transmit_s
   Binlog_transmit_param param;
   param.flags= flags;
 
-  FOREACH_OBSERVER(transmit_start, thd, (&param, log_file, log_pos));
+  int ret= 0;
+  FOREACH_OBSERVER(ret, transmit_start, thd, (&param, log_file, log_pos));
+  return ret;
 }
 
 int Binlog_transmit_delegate::transmit_stop(THD *thd, ushort flags)
@@ -231,7 +259,9 @@ int Binlog_transmit_delegate::transmit_s
   Binlog_transmit_param param;
   param.flags= flags;
 
-  FOREACH_OBSERVER(transmit_stop, thd, (&param));
+  int ret= 0;
+  FOREACH_OBSERVER(ret, transmit_stop, thd, (&param));
+  return ret;
 }
 
 int Binlog_transmit_delegate::reserve_header(THD *thd, ushort flags,
@@ -248,7 +278,8 @@ int Binlog_transmit_delegate::reserve_he
   Binlog_transmit_param param;
   param.flags= flags;
   param.server_id= thd->server_id;
-  
+
+  int ret= 0;
   read_lock();
   Observer_info_iterator iter= observer_info_iter();
   Observer_info *info= iter++;
@@ -258,8 +289,8 @@ int Binlog_transmit_delegate::reserve_he
       my_plugin_lock(thd, &info->plugin);
     if (!plugin)
     {
-      unlock();
-      return 1;
+      ret= 1;
+      break;
     }
     hlen= 0;
     if (((Observer *)info->observer)->reserve_header
@@ -268,19 +299,19 @@ int Binlog_transmit_delegate::reserve_he
                                                         RESERVE_HEADER_SIZE,
                                                         &hlen))
     {
-      unlock();
-      return 1;
+      ret= 1;
+      break;
     }
     if (hlen == 0)
       continue;
     if (hlen > RESERVE_HEADER_SIZE || packet->append((char *)header, hlen))
     {
-      unlock();
-      return 1;
+      ret= 1;
+      break;
     }
   }
   unlock();
-  return 0;
+  return ret;
 }
 
 int Binlog_transmit_delegate::before_send_event(THD *thd, ushort flags,
@@ -291,9 +322,12 @@ int Binlog_transmit_delegate::before_sen
   Binlog_transmit_param param;
   param.flags= flags;
 
-  FOREACH_OBSERVER(before_send_event, thd,
+  int ret= 0;
+  FOREACH_OBSERVER(ret, before_send_event, thd,
                    (&param, (uchar *)packet->c_ptr(),
-                    packet->length(), log_file, log_pos));
+                    packet->length(),
+                    log_file+dirname_length(log_file), log_pos));
+  return ret;
 }
 
 int Binlog_transmit_delegate::after_send_event(THD *thd, ushort flags,
@@ -302,8 +336,10 @@ int Binlog_transmit_delegate::after_send
   Binlog_transmit_param param;
   param.flags= flags;
 
-  FOREACH_OBSERVER(after_send_event, thd,
+  int ret= 0;
+  FOREACH_OBSERVER(ret, after_send_event, thd,
                    (&param, packet->c_ptr(), packet->length()));
+  return ret;
 }
 
 int Binlog_transmit_delegate::after_reset_master(THD *thd, ushort flags)
@@ -312,34 +348,42 @@ int Binlog_transmit_delegate::after_rese
   Binlog_transmit_param param;
   param.flags= flags;
 
-  FOREACH_OBSERVER(after_reset_master, thd, (&param));
+  int ret= 0;
+  FOREACH_OBSERVER(ret, after_reset_master, thd, (&param));
+  return ret;
+}
+
+void Binlog_relay_IO_delegate::init_param(Binlog_relay_IO_param *param,
+                                          Master_info *mi)
+{
+  param->mysql= mi->mysql;
+  param->user= mi->user;
+  param->host= mi->host;
+  param->port= mi->port;
+  param->master_log_name= mi->master_log_name;
+  param->master_log_pos= mi->master_log_pos;
 }
 
 int Binlog_relay_IO_delegate::thread_start(THD *thd, Master_info *mi)
 {
   Binlog_relay_IO_param param;
-  param.mysql= mi->mysql;
-  param.user= mi->user;
-  param.host= mi->host;
-  param.port= mi->port;
-  param.master_log_name= mi->master_log_name;
-  param.master_log_pos= mi->master_log_pos;
+  init_param(&param, mi);
 
-  FOREACH_OBSERVER(thread_start, thd, (&param));
+  int ret= 0;
+  FOREACH_OBSERVER(ret, thread_start, thd, (&param));
+  return ret;
 }
 
+
 int Binlog_relay_IO_delegate::thread_stop(THD *thd, Master_info *mi)
 {
 
   Binlog_relay_IO_param param;
-  param.mysql= mi->mysql;
-  param.user= mi->user;
-  param.host= mi->host;
-  param.port= mi->port;
-  param.master_log_name= mi->master_log_name;
-  param.master_log_pos= mi->master_log_pos;
+  init_param(&param, mi);
 
-  FOREACH_OBSERVER(thread_stop, thd, (&param));
+  int ret= 0;
+  FOREACH_OBSERVER(ret, thread_stop, thd, (&param));
+  return ret;
 }
 
 int Binlog_relay_IO_delegate::before_request_transmit(THD *thd,
@@ -347,14 +391,11 @@ int Binlog_relay_IO_delegate::before_req
                                                       ushort flags)
 {
   Binlog_relay_IO_param param;
-  param.mysql= mi->mysql;
-  param.user= mi->user;
-  param.host= mi->host;
-  param.port= mi->port;
-  param.master_log_name= mi->master_log_name;
-  param.master_log_pos= mi->master_log_pos;
+  init_param(&param, mi);
 
-  FOREACH_OBSERVER(before_request_transmit, thd, (&param, (uint32)flags));
+  int ret= 0;
+  FOREACH_OBSERVER(ret, before_request_transmit, thd, (&param, (uint32)flags));
+  return ret;
 }
 
 int Binlog_relay_IO_delegate::after_read_event(THD *thd, Master_info *mi,
@@ -363,15 +404,12 @@ int Binlog_relay_IO_delegate::after_read
                                                ulong *event_len)
 {
   Binlog_relay_IO_param param;
-  param.mysql= mi->mysql;
-  param.user= mi->user;
-  param.host= mi->host;
-  param.port= mi->port;
-  param.master_log_name= mi->master_log_name;
-  param.master_log_pos= mi->master_log_pos;
+  init_param(&param, mi);
 
-  FOREACH_OBSERVER(after_read_event, thd,
+  int ret= 0;
+  FOREACH_OBSERVER(ret, after_read_event, thd,
                    (&param, packet, len, event_buf, event_len));
+  return ret;
 }
 
 int Binlog_relay_IO_delegate::after_queue_event(THD *thd, Master_info *mi,
@@ -380,33 +418,27 @@ int Binlog_relay_IO_delegate::after_queu
                                                 bool synced)
 {
   Binlog_relay_IO_param param;
-  param.mysql= mi->mysql;
-  param.user= mi->user;
-  param.host= mi->host;
-  param.port= mi->port;
-  param.master_log_name= mi->master_log_name;
-  param.master_log_pos= mi->master_log_pos;
+  init_param(&param, mi);
 
   uint32 flags=0;
   if (synced)
     flags |= BINLOG_STORAGE_IS_SYNCED;
 
-  FOREACH_OBSERVER(after_queue_event, thd,
+  int ret= 0;
+  FOREACH_OBSERVER(ret, after_queue_event, thd,
                    (&param, event_buf, event_len, flags));
+  return ret;
 }
 
 int Binlog_relay_IO_delegate::after_reset_slave(THD *thd, Master_info *mi)
 
 {
   Binlog_relay_IO_param param;
-  param.mysql= mi->mysql;
-  param.user= mi->user;
-  param.host= mi->host;
-  param.port= mi->port;
-  param.master_log_name= mi->master_log_name;
-  param.master_log_pos= mi->master_log_pos;
+  init_param(&param, mi);
 
-  FOREACH_OBSERVER(after_reset_slave, thd, (&param));
+  int ret= 0;
+  FOREACH_OBSERVER(ret, after_reset_slave, thd, (&param));
+  return ret;
 }
 #endif /* HAVE_REPLICATION */
 

=== modified file 'sql/rpl_handler.h'
--- a/sql/rpl_handler.h	2008-06-26 14:22:39 +0000
+++ b/sql/rpl_handler.h	2008-07-11 09:58:52 +0000
@@ -187,6 +187,8 @@ public:
                         const char *event_buf, ulong event_len,
                         bool synced);
   int after_reset_slave(THD *thd, Master_info *mi);
+private:
+  void init_param(Binlog_relay_IO_param *param, Master_info *mi);
 };
 #endif /* HAVE_REPLICATION */
 
@@ -200,7 +202,12 @@ extern Binlog_transmit_delegate *binlog_
 extern Binlog_relay_IO_delegate *binlog_relay_io_delegate;
 #endif /* HAVE_REPLICATION */
 
+/*
+  if there is no observers in the delegate, we can return 0
+  immediately.
+*/
 #define RUN_HOOK(group, hook, args)             \
-  group ##_delegate->hook args
+  (group ##_delegate->is_empty() ?              \
+   0 : group ##_delegate->hook args)
 
 #endif /* RPL_HANDLER_H */

=== modified file 'sql/slave.cc'
--- a/sql/slave.cc	2008-06-28 02:49:04 +0000
+++ b/sql/slave.cc	2008-07-12 09:21:02 +0000
@@ -68,6 +68,8 @@ ulonglong relay_log_space_limit = 0;
 int disconnect_slave_event_count = 0, abort_slave_event_count = 0;
 int events_till_abort = -1;
 
+static pthread_key(Master_info*, RPL_MASTER_INFO);
+
 enum enum_slave_reconnect_actions
 {
   SLAVE_RECON_ACT_REG= 0,
@@ -2140,6 +2142,8 @@ pthread_handler_t handle_slave_io(void *
                             mi->master_log_name,
                             llstr(mi->master_log_pos,llbuff)));
 
+  /* This must be called before run any binlog_relay_io hooks */
+  my_pthread_setspecific_ptr(RPL_MASTER_INFO, mi);
 
   if (RUN_HOOK(binlog_relay_io, thread_start, (thd, mi)))
     goto err;
@@ -3483,6 +3487,64 @@ static int safe_reconnect(THD* thd, MYSQ
 }
 
 
+MYSQL *rpl_connect_master(MYSQL *mysql)
+{
+  THD *thd= current_thd;
+  Master_info *mi= my_pthread_getspecific_ptr(Master_info*, RPL_MASTER_INFO);
+  if (!mi)
+  {
+    sql_print_error("'rpl_connect_master' must be called in slave I/O thread context.");
+    return NULL;
+  }
+
+  bool allocated= false;
+  
+  if (!mysql)
+  {
+    if(!(mysql= mysql_init(NULL)))
+      return NULL;
+    allocated= true;
+  }
+
+  /*
+    XXX: copied from connect_to_master, this function should not
+    change the slave status, so we cannot use connect_to_master
+    directly
+    
+    TODO: make this part a seperate function to eliminate duplication
+  */
+  mysql_options(mysql, MYSQL_OPT_CONNECT_TIMEOUT, (char *) &slave_net_timeout);
+  mysql_options(mysql, MYSQL_OPT_READ_TIMEOUT, (char *) &slave_net_timeout);
+
+#ifdef HAVE_OPENSSL
+  if (mi->ssl)
+  {
+    mysql_ssl_set(mysql,
+                  mi->ssl_key[0]?mi->ssl_key:0,
+                  mi->ssl_cert[0]?mi->ssl_cert:0,
+                  mi->ssl_ca[0]?mi->ssl_ca:0,
+                  mi->ssl_capath[0]?mi->ssl_capath:0,
+                  mi->ssl_cipher[0]?mi->ssl_cipher:0);
+    mysql_options(mysql, MYSQL_OPT_SSL_VERIFY_SERVER_CERT,
+                  &mi->ssl_verify_server_cert);
+  }
+#endif
+
+  mysql_options(mysql, MYSQL_SET_CHARSET_NAME, default_charset_info->csname);
+  /* This one is not strictly needed but we have it here for completeness */
+  mysql_options(mysql, MYSQL_SET_CHARSET_DIR, (char *) charsets_dir);
+
+  if (io_slave_killed(thd, mi)
+      || !mysql_real_connect(mysql, mi->host, mi->user, mi->password, 0,
+                             mi->port, 0, 0))
+  {
+    if (allocated)
+      mysql_close(mysql);                       // this will free the object
+    return NULL;
+  }
+  return mysql;
+}
+
 /*
   Store the file and position where the execute-slave thread are in the
   relay log.

Thread
bzr push into mysql-6.0 branch (hezx:2672) He Zhenxing13 Jul