List:Commits« Previous MessageNext Message »
From:Alfranio Correia Date:December 1 2010 7:15pm
Subject:bzr commit into mysql-next-mr.crash-safe branch (alfranio.correia:3224)
View as plain text  
#At file:///home/acorreia/workspace.sun/repository.mysql.new/bzrwork/wl-5569/mysql-next-mr-wl5569/ based on revid:andrei.elkin@stripped

 3224 Alfranio Correia	2010-12-01
      Introduced some code to implement a checkpoint routine.

    added:
      sql/dynamic_ids.cc
    renamed:
      sql/server_ids.h => sql/dynamic_ids.h
    modified:
      sql/CMakeLists.txt
      sql/Makefile.am
      sql/log_event.cc
      sql/rpl_info_dummy.cc
      sql/rpl_info_dummy.h
      sql/rpl_info_file.cc
      sql/rpl_info_file.h
      sql/rpl_info_handler.h
      sql/rpl_info_table.cc
      sql/rpl_info_table.h
      sql/rpl_mi.cc
      sql/rpl_mi.h
      sql/rpl_rli.cc
      sql/rpl_rli.h
      sql/rpl_rli_pdb.cc
      sql/rpl_rli_pdb.h
      sql/rpl_slave.cc
      sql/dynamic_ids.h
=== modified file 'sql/CMakeLists.txt'
--- a/sql/CMakeLists.txt	2010-11-23 09:03:37 +0000
+++ b/sql/CMakeLists.txt	2010-12-01 19:15:08 +0000
@@ -106,7 +106,7 @@ ADD_DEPENDENCIES(master GenError)
 SET (SLAVE_SOURCE rpl_slave.cc rpl_reporting.cc rpl_mi.cc rpl_rli.cc
 		  rpl_info_handler.cc rpl_info_file.cc rpl_info_table.cc
 		  rpl_info_values.cc rpl_info.cc rpl_info_factory.cc
-		  rpl_info_table_access.cc server_ids.h rpl_rli_pdb.cc
+		  rpl_info_table_access.cc dynamic_ids.cc rpl_rli_pdb.cc
 		  rpl_info_dummy.cc)
 ADD_LIBRARY(slave ${SLAVE_SOURCE})
 ADD_DEPENDENCIES(slave GenError)

=== modified file 'sql/Makefile.am'
--- a/sql/Makefile.am	2010-11-23 09:03:37 +0000
+++ b/sql/Makefile.am	2010-12-01 19:15:08 +0000
@@ -111,7 +111,7 @@ noinst_HEADERS =	item.h item_func.h item
 			log.h sql_show.h rpl_info.h rpl_info_file.h \
 			rpl_info_table.h rpl_rli.h rpl_mi.h rpl_info_values.h \
 			rpl_info_table_access.h rpl_info_dummy.h \
-			rpl_info_factory.h server_ids.h rpl_rli_pdb.h \
+			rpl_info_factory.h dynamic_ids.h rpl_rli_pdb.h \
 			sql_select.h structs.h table.h sql_udf.h hash_filo.h \
 			lex.h lex_symbol.h sql_acl.h sql_crypt.h sql_base.h \
 			sql_table.h key.h lock.h thr_malloc.h strfunc.h \
@@ -195,7 +195,7 @@ libbinlog_la_SOURCES =	log_event.cc log_
 librpl_la_SOURCES    =  rpl_handler.cc rpl_tblmap.cc
 libmaster_la_SOURCES =	rpl_master.cc
 libslave_la_SOURCES = 	rpl_slave.cc rpl_reporting.cc rpl_rli.cc rpl_mi.cc \
-			rpl_info.cc rpl_info_factory.cc rpl_info_file.cc \
+			rpl_info.cc rpl_info_factory.cc dynamic_ids.cc rpl_info_file.cc \
 			rpl_info_handler.cc rpl_info_table.cc rpl_info_dummy.cc \
 			rpl_info_table_access.cc rpl_info_values.cc rpl_rli_pdb.cc
 libndb_la_CPPFLAGS=	@ndbcluster_includes@

=== added file 'sql/dynamic_ids.cc'
--- a/sql/dynamic_ids.cc	1970-01-01 00:00:00 +0000
+++ b/sql/dynamic_ids.cc	2010-12-01 19:15:08 +0000
@@ -0,0 +1,106 @@
+#include "dynamic_ids.h"
+
+Dynamic_ids::Dynamic_ids()
+{
+  my_init_dynamic_array(&dynamic_ids, 64, 16, 16);
+}
+
+Dynamic_ids::~Dynamic_ids()
+{
+  delete_dynamic(&dynamic_ids);
+}
+
+bool Server_ids::do_unpack_dynamic_ids(char *param_dynamic_ids)
+{
+  char *token= NULL, *last= NULL;
+  uint num_items= 0;
+ 
+  DBUG_ENTER("Server_ids::unpack_dynamic_ids");
+
+  token= strtok_r((char *)const_cast<const char*>(param_dynamic_ids),
+                  " ", &last);
+
+  if (token == NULL)
+    DBUG_RETURN(TRUE);
+
+  num_items= atoi(token);
+  for (uint i=0; i < num_items; i++)
+  {
+    token= strtok_r(NULL, " ", &last);
+    if (token == NULL)
+      DBUG_RETURN(TRUE);
+    else
+    {
+      ulong val= atol(token);
+      insert_dynamic(&dynamic_ids, (uchar *) &val);
+    }
+  }
+  DBUG_RETURN(FALSE);
+}
+
+
+bool Server_ids::do_pack_dynamic_ids(String *buffer)
+{
+  DBUG_ENTER("Server_ids::pack_dynamic_ids");
+
+  if (buffer->set_int(dynamic_ids.elements, FALSE, &my_charset_bin))
+    DBUG_RETURN(TRUE);
+
+  for (ulong i= 0;
+       i < dynamic_ids.elements; i++)
+  {
+    ulong s_id;
+    get_dynamic(&dynamic_ids, (uchar*) &s_id, i);
+    if (buffer->append(" ") ||
+        buffer->append_ulonglong(s_id))
+      DBUG_RETURN(TRUE);
+  }
+
+  DBUG_RETURN(FALSE);
+}
+
+bool Database_ids::do_unpack_dynamic_ids(char *param_dynamic_ids)
+{
+  char *token= NULL, *last= NULL;
+  uint num_items= 0;
+ 
+  DBUG_ENTER("Server_ids::unpack_dynamic_ids");
+
+  token= strtok_r((char *)const_cast<const char*>(param_dynamic_ids),
+                  " ", &last);
+
+  if (token == NULL)
+    DBUG_RETURN(TRUE);
+
+  num_items= atoi(token);
+  for (uint i=0; i < num_items; i++)
+  {
+    token= strtok_r(NULL, " ", &last);
+    if (token == NULL)
+      DBUG_RETURN(TRUE);
+    else
+      insert_dynamic(&dynamic_ids, (uchar *) token);
+  }
+  DBUG_RETURN(FALSE);
+}
+
+bool Database_ids::do_pack_dynamic_ids(String *buffer)
+{
+  char token[2000];
+
+  DBUG_ENTER("Server_ids::pack_dynamic_ids");
+
+  if (buffer->set_int(dynamic_ids.elements, FALSE, &my_charset_bin))
+    DBUG_RETURN(TRUE);
+
+  for (ulong i= 0;
+       i < dynamic_ids.elements; i++)
+  {
+    get_dynamic(&dynamic_ids, (uchar*) token, i);
+    if (buffer->append(" ") ||
+        buffer->append(token))
+      DBUG_RETURN(TRUE);
+  }
+
+  DBUG_RETURN(FALSE);
+}

=== renamed file 'sql/server_ids.h' => 'sql/dynamic_ids.h'
--- a/sql/server_ids.h	2010-10-25 10:39:01 +0000
+++ b/sql/dynamic_ids.h	2010-12-01 19:15:08 +0000
@@ -1,20 +1,67 @@
-#ifndef SERVER_ID_H
+/* Copyright (c) 2000, 2010 Oracle and/or its affiliates. All rights reserved.
 
-#define SERVER_ID_H
+   This program is free software; you can redistribute it and/or modify
+   it under the terms of the GNU General Public License as published by
+   the Free Software Foundation; version 2 of the License.
+
+   This program is distributed in the hope that it will be useful,
+   but WITHOUT ANY WARRANTY; without even the implied warranty of
+   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+   GNU General Public License for more details.
+
+   You should have received a copy of the GNU General Public License
+   along with this program; if not, write to the Free Software
+   Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA */
+
+#ifndef DYNAMIC_ID_H
+
+#define DYNAMIC_ID_H
 
 #include <my_sys.h>
 #include <sql_string.h>
 
-class Server_ids
+class Dynamic_ids
 {
-  public:
-    DYNAMIC_ARRAY server_ids;
+public:
+    DYNAMIC_ARRAY dynamic_ids;
 
-    Server_ids();
-    ~Server_ids();
+    Dynamic_ids();
+    virtual ~Dynamic_ids();
 
-    bool pack_server_ids(String *buffer);
-    bool unpack_server_ids(char *param_server_ids);
+    bool pack_dynamic_ids(String *buffer)
+    {
+      return(do_pack_dynamic_ids(buffer));
+    }
+
+    bool unpack_dynamic_ids(char *param_dynamic_ids)
+    {
+      return(do_unpack_dynamic_ids(param_dynamic_ids));
+    }
+
+private:
+    virtual bool do_pack_dynamic_ids(String *buffer)= 0;
+    virtual bool do_unpack_dynamic_ids(char *param_dynamic_ids)= 0;
 };
 
+class Server_ids : public Dynamic_ids
+{
+public:
+    Server_ids() { };
+    virtual ~Server_ids() { };
+
+private:
+    bool do_pack_dynamic_ids(String *buffer);
+    bool do_unpack_dynamic_ids(char *param_dynamic_ids);
+};
+
+class Database_ids : public Dynamic_ids
+{
+public:
+    Database_ids() { };
+    virtual ~Database_ids() { };
+
+private:
+    bool do_pack_dynamic_ids(String *buffer);
+    bool do_unpack_dynamic_ids(char *param_dynamic_ids);
+};
 #endif

=== modified file 'sql/log_event.cc'
--- a/sql/log_event.cc	2010-12-01 17:08:21 +0000
+++ b/sql/log_event.cc	2010-12-01 19:15:08 +0000
@@ -2656,14 +2656,12 @@ int slave_worker_exec_job(Slave_worker *
   if (ev->starts_group())
   {
     w->curr_group_seen_begin= TRUE; // The current group is started with B-event
-    // ANDREI ----
-    // w->configure_position(rli);
   } 
   else
   {
     if (ev->contains_partition_info())
     {
-      DYNAMIC_ARRAY *ep= &w->curr_group_exec_parts;
+      DYNAMIC_ARRAY *ep= &(w->curr_group_exec_parts->dynamic_ids);
       uint i;
       char key[NAME_LEN + 2];
       bool found= FALSE;
@@ -2691,7 +2689,8 @@ int slave_worker_exec_job(Slave_worker *
   if (ev->ends_group() || !w->curr_group_seen_begin)
   {
     w->slave_worker_ends_group(ev->mts_group_cnt, error); /* last done sets post exec */
-    w->checkpoint(w->w_rli);
+    if (!(ev->get_type_code() == XID_EVENT && w->is_transactional()))
+      w->commit_positions(ev);
   }
 
   mysql_mutex_lock(&w->jobs_lock);
@@ -6225,13 +6224,15 @@ int Xid_log_event::do_apply_event(Relay_
     the context of the current transaction in order to provide
     data integrity. See sql/rpl_rli.h for further details.
   */
-  bool is_trans_repo= rli_ptr->is_transactional();
+  Slave_worker *w= rli_ptr->get_current_worker();
+  bool is_parallel= (w != NULL);
+  bool is_trans_repo= (is_parallel ? w->is_transactional() : rli_ptr->is_transactional());
 
   /* For a slave Xid_log_event is COMMIT */
   general_log_print(thd, COM_QUERY,
                     "COMMIT /* implicit, from Xid_log_event */");
 
-  if (is_trans_repo)
+  if (is_trans_repo && !is_parallel)
   {
     mysql_mutex_lock(&rli_ptr->data_lock);
   }
@@ -6249,7 +6250,7 @@ int Xid_log_event::do_apply_event(Relay_
   /*
     We need to update the positions in here to make it transactional.  
   */
-  if (is_trans_repo)
+  if (is_trans_repo && !is_parallel)
   {
     rli_ptr->inc_event_relay_log_pos();
     rli_ptr->set_group_relay_log_pos(rli_ptr->get_event_relay_log_pos());
@@ -6265,6 +6266,11 @@ int Xid_log_event::do_apply_event(Relay_
     if ((error= rli_ptr->flush_info(TRUE)))
       goto err;
   }
+  else if (is_trans_repo && is_parallel)
+  {
+    if ((error= w->commit_positions(this)))
+      goto err;
+  }
 
   DBUG_PRINT("info", ("do_apply group master %s %lu  group relay %s %lu event %s %lu\n",
     rli_ptr->get_group_master_log_name(),
@@ -6281,7 +6287,7 @@ int Xid_log_event::do_apply_event(Relay_
   thd->mdl_context.release_transactional_locks();
 
 err:
-  if (is_trans_repo)
+  if (is_trans_repo && !is_parallel)
   {
     mysql_cond_broadcast(&rli_ptr->data_cond);
     mysql_mutex_unlock(&rli_ptr->data_lock);

=== modified file 'sql/rpl_info_dummy.cc'
--- a/sql/rpl_info_dummy.cc	2010-11-30 02:08:01 +0000
+++ b/sql/rpl_info_dummy.cc	2010-12-01 19:15:08 +0000
@@ -96,7 +96,7 @@ bool Rpl_info_dummy::do_set_info(const i
 }
 
 bool Rpl_info_dummy::do_set_info(const int pos __attribute__((unused)),
-                                const Server_ids *value __attribute__((unused)))
+                                const Dynamic_ids *value __attribute__((unused)))
 {
   if (abort) DBUG_ASSERT(0);
   return FALSE;
@@ -136,8 +136,8 @@ bool Rpl_info_dummy::do_get_info(const i
 }
 
 bool Rpl_info_dummy::do_get_info(const int pos __attribute__((unused)),
-                                Server_ids *value __attribute__((unused)),
-                                const Server_ids *default_value __attribute__((unused)))
+                                Dynamic_ids *value __attribute__((unused)),
+                                const Dynamic_ids *default_value __attribute__((unused)))
 {
   if (abort) DBUG_ASSERT(0);
   return FALSE;

=== modified file 'sql/rpl_info_dummy.h'
--- a/sql/rpl_info_dummy.h	2010-11-30 02:08:01 +0000
+++ b/sql/rpl_info_dummy.h	2010-12-01 19:15:08 +0000
@@ -47,7 +47,7 @@ private:
   bool do_set_info(const int pos, const int value);
   bool do_set_info(const int pos, const ulong value);
   bool do_set_info(const int pos, const float value);
-  bool do_set_info(const int pos, const Server_ids *value);
+  bool do_set_info(const int pos, const Dynamic_ids *value);
   bool do_get_info(const int pos, char *value, const size_t size,
                    const char *default_value);
   bool do_get_info(const int pos, int *value,
@@ -56,8 +56,8 @@ private:
                    const ulong default_value);
   bool do_get_info(const int pos, float *value,
                    const float default_value);
-  bool do_get_info(const int pos, Server_ids *value,
-                   const Server_ids *default_value);
+  bool do_get_info(const int pos, Dynamic_ids *value,
+                   const Dynamic_ids *default_value);
   char* do_get_description_info();
   bool do_is_transactional();
 

=== modified file 'sql/rpl_info_file.cc'
--- a/sql/rpl_info_file.cc	2010-11-11 11:53:01 +0000
+++ b/sql/rpl_info_file.cc	2010-12-01 19:15:08 +0000
@@ -217,7 +217,7 @@ bool Rpl_info_file::do_set_info(const in
           FALSE : TRUE);
 }
 
-bool Rpl_info_file::do_set_info(const int pos, const Server_ids *value)
+bool Rpl_info_file::do_set_info(const int pos, const Dynamic_ids *value)
 {
   bool error= TRUE;
   String buffer;
@@ -225,7 +225,7 @@ bool Rpl_info_file::do_set_info(const in
   /*
     This produces a line listing the total number and all the server_ids.
   */
-  if (const_cast<Server_ids *>(value)->pack_server_ids(&buffer))
+  if (const_cast<Dynamic_ids *>(value)->pack_dynamic_ids(&buffer))
     goto err;
 
   error= (my_b_printf(&info_file, "%s\n", buffer.c_ptr_safe()) >
@@ -262,8 +262,8 @@ bool Rpl_info_file::do_get_info(const in
                                   default_value));
 }
 
-bool Rpl_info_file::do_get_info(const int pos, Server_ids *value,
-                                const Server_ids *default_value __attribute__((unused)))
+bool Rpl_info_file::do_get_info(const int pos, Dynamic_ids *value,
+                                const Dynamic_ids *default_value __attribute__((unused)))
 {
   /*
     Static buffer to use most of the times. However, if it is not big
@@ -277,7 +277,7 @@ bool Rpl_info_file::do_get_info(const in
                                              &buffer_act,
                                              &info_file);
   if (!error)
-    value->unpack_server_ids(buffer_act);
+    value->unpack_dynamic_ids(buffer_act);
 
   if (buffer != buffer_act)
   {

=== modified file 'sql/rpl_info_file.h'
--- a/sql/rpl_info_file.h	2010-11-11 11:53:01 +0000
+++ b/sql/rpl_info_file.h	2010-12-01 19:15:08 +0000
@@ -54,7 +54,7 @@ private:
   bool do_set_info(const int pos, const int value);
   bool do_set_info(const int pos, const ulong value);
   bool do_set_info(const int pos, const float value);
-  bool do_set_info(const int pos, const Server_ids *value);
+  bool do_set_info(const int pos, const Dynamic_ids *value);
   bool do_get_info(const int pos, char *value, const size_t size,
                    const char *default_value);
   bool do_get_info(const int pos, int *value,
@@ -63,8 +63,8 @@ private:
                    const ulong default_value);
   bool do_get_info(const int pos, float *value,
                    const float default_value);
-  bool do_get_info(const int pos, Server_ids *value,
-                   const Server_ids *default_value);
+  bool do_get_info(const int pos, Dynamic_ids *value,
+                   const Dynamic_ids *default_value);
   char* do_get_description_info();
   bool do_is_transactional();
 

=== modified file 'sql/rpl_info_handler.h'
--- a/sql/rpl_info_handler.h	2010-11-11 11:53:01 +0000
+++ b/sql/rpl_info_handler.h	2010-12-01 19:15:08 +0000
@@ -17,7 +17,7 @@
 #define RPL_INFO_HANDLER_H
 
 #include <my_global.h>
-#include <server_ids.h>
+#include <dynamic_ids.h>
 #include "rpl_info_values.h"
 
 class Rpl_info_handler
@@ -224,8 +224,8 @@ public:
     @retval FALSE No error
     @retval TRUE Failure
   */
-  bool get_info(Server_ids *value,
-                const Server_ids *default_value)
+  bool get_info(Dynamic_ids *value,
+                const Dynamic_ids *default_value)
   {
     if (cursor >= ninfo || prv_error)
       return TRUE;
@@ -317,7 +317,7 @@ private:
   virtual bool do_set_info(const int pos, const ulong value)= 0;
   virtual bool do_set_info(const int pos, const int value)= 0;
   virtual bool do_set_info(const int pos, const float value)= 0;
-  virtual bool do_set_info(const int pos, const Server_ids *value)= 0;
+  virtual bool do_set_info(const int pos, const Dynamic_ids *value)= 0;
   virtual bool do_get_info(const int pos, char *value, const size_t size,
                            const char *default_value)= 0;
   virtual bool do_get_info(const int pos, ulong *value,
@@ -326,8 +326,8 @@ private:
                            const int default_value)= 0;
   virtual bool do_get_info(const int pos, float *value,
                            const float default_value)= 0;
-  virtual bool do_get_info(const int pos, Server_ids *value,
-                           const Server_ids *default_value)= 0;
+  virtual bool do_get_info(const int pos, Dynamic_ids *value,
+                           const Dynamic_ids *default_value)= 0;
   virtual char* do_get_description_info()= 0;
   virtual bool do_is_transactional()= 0;
 

=== modified file 'sql/rpl_info_table.cc'
--- a/sql/rpl_info_table.cc	2010-11-11 11:53:01 +0000
+++ b/sql/rpl_info_table.cc	2010-12-01 19:15:08 +0000
@@ -354,9 +354,9 @@ bool Rpl_info_table::do_set_info(const i
                                             &my_charset_bin));
 }
 
-bool Rpl_info_table::do_set_info(const int pos, const Server_ids *value)
+bool Rpl_info_table::do_set_info(const int pos, const Dynamic_ids *value)
 {
-  if (const_cast<Server_ids *>(value)->pack_server_ids(&field_values->value[pos]))
+  if (const_cast<Dynamic_ids *>(value)->pack_dynamic_ids(&field_values->value[pos]))
     return TRUE;
 
   return FALSE;
@@ -428,10 +428,10 @@ bool Rpl_info_table::do_get_info(const i
   return TRUE;
 }
 
-bool Rpl_info_table::do_get_info(const int pos, Server_ids *value,
-                                 const Server_ids *default_value __attribute__((unused)))
+bool Rpl_info_table::do_get_info(const int pos, Dynamic_ids *value,
+                                 const Dynamic_ids *default_value __attribute__((unused)))
 {
-  if (value->unpack_server_ids(field_values->value[pos].c_ptr_safe()))
+  if (value->unpack_dynamic_ids(field_values->value[pos].c_ptr_safe()))
     return TRUE;
 
   return FALSE;

=== modified file 'sql/rpl_info_table.h'
--- a/sql/rpl_info_table.h	2010-11-11 11:53:01 +0000
+++ b/sql/rpl_info_table.h	2010-12-01 19:15:08 +0000
@@ -73,7 +73,7 @@ private:
   bool do_set_info(const int pos, const int value);
   bool do_set_info(const int pos, const ulong value);
   bool do_set_info(const int pos, const float value);
-  bool do_set_info(const int pos, const Server_ids *value);
+  bool do_set_info(const int pos, const Dynamic_ids *value);
   bool do_get_info(const int pos, char *value, const size_t size,
                    const char *default_value);
   bool do_get_info(const int pos, int *value,
@@ -82,8 +82,8 @@ private:
                    const ulong default_value);
   bool do_get_info(const int pos, float *value,
                    const float default_value);
-  bool do_get_info(const int pos, Server_ids *value,
-                   const Server_ids *default_value);
+  bool do_get_info(const int pos, Dynamic_ids *value,
+                   const Dynamic_ids *default_value);
   char* do_get_description_info();
   bool do_is_transactional();
 

=== modified file 'sql/rpl_mi.cc'
--- a/sql/rpl_mi.cc	2010-11-30 02:08:01 +0000
+++ b/sql/rpl_mi.cc	2010-12-01 19:15:08 +0000
@@ -122,13 +122,13 @@ int change_master_server_id_cmp(ulong *i
  */
 bool Master_info::shall_ignore_server_id(ulong s_id)
 {
-  if (likely(ignore_server_ids->server_ids.elements == 1))
+  if (likely(ignore_server_ids->dynamic_ids.elements == 1))
     return (* (ulong*)
-      dynamic_array_ptr(&(ignore_server_ids->server_ids), 0)) == s_id;
+      dynamic_array_ptr(&(ignore_server_ids->dynamic_ids), 0)) == s_id;
   else      
     return bsearch((const ulong *) &s_id,
-                   ignore_server_ids->server_ids.buffer,
-                   ignore_server_ids->server_ids.elements, sizeof(ulong),
+                   ignore_server_ids->dynamic_ids.buffer,
+                   ignore_server_ids->dynamic_ids.elements, sizeof(ulong),
                    (int (*) (const void*, const void*)) change_master_server_id_cmp)
       != NULL;
 }
@@ -384,7 +384,7 @@ bool Master_info::read_info(Rpl_info_han
   */
   if (lines >= LINE_FOR_REPLICATE_IGNORE_SERVER_IDS)
   {
-     if (from->get_info(ignore_server_ids, (Server_ids *) NULL))
+     if (from->get_info(ignore_server_ids, (Dynamic_ids *) NULL))
       DBUG_RETURN(TRUE);
   }
 

=== modified file 'sql/rpl_mi.h'
--- a/sql/rpl_mi.h	2010-11-30 02:08:01 +0000
+++ b/sql/rpl_mi.h	2010-12-01 19:15:08 +0000
@@ -92,7 +92,7 @@ class Master_info : public Rpl_info_coor
   long clock_diff_with_master;
   float heartbeat_period;         // interface with CHANGE MASTER or master.info
   ulonglong received_heartbeats;  // counter of received heartbeat events
-  Server_ids *ignore_server_ids;
+  Dynamic_ids *ignore_server_ids;
   ulong master_id;
   ulong retry_count;
   char master_uuid[UUID_LENGTH+1];

=== modified file 'sql/rpl_rli.cc'
--- a/sql/rpl_rli.cc	2010-12-01 17:08:21 +0000
+++ b/sql/rpl_rli.cc	2010-12-01 19:15:08 +0000
@@ -56,7 +56,7 @@ PSI_cond_key *key_cond_slave_parallel_wo
 PSI_cond_key key_cond_slave_parallel_pend_jobs;
 
 Relay_log_info::Relay_log_info(bool is_slave_recovery)
-   :Rpl_info_coordinator("SQL"),
+   :Rpl_info_coordinator("SQL"), checkpoint_thd(0), checkpoint_running(0),
    replicate_same_server_id(::replicate_same_server_id),
    cur_log_fd(-1), relay_log(&sync_relaylog_period),
    is_relay_log_recovery(is_slave_recovery),
@@ -82,6 +82,8 @@ Relay_log_info::Relay_log_info(bool is_s
   mysql_mutex_init(key_relay_log_info_log_space_lock,
                    &log_space_lock, MY_MUTEX_INIT_FAST);
   mysql_cond_init(key_relay_log_info_log_space_cond, &log_space_cond, NULL);
+  mysql_cond_init(key_checkpoint_start_cond, &checkpoint_start_cond, NULL);
+  mysql_cond_init(key_checkpoint_stop_cond, &checkpoint_stop_cond, NULL);
   relay_log.init_pthread_objects();
  /*
    Parallel slave parameters initialization is done regardless
@@ -131,6 +133,8 @@ Relay_log_info::~Relay_log_info()
 {
   DBUG_ENTER("Relay_log_info::~Relay_log_info");
 
+  mysql_cond_destroy(&checkpoint_start_cond);
+  mysql_cond_destroy(&checkpoint_stop_cond);
   mysql_mutex_destroy(&log_space_lock);
   mysql_cond_destroy(&log_space_cond);
   relay_log.cleanup();

=== modified file 'sql/rpl_rli.h'
--- a/sql/rpl_rli.h	2010-12-01 17:08:21 +0000
+++ b/sql/rpl_rli.h	2010-12-01 19:15:08 +0000
@@ -105,6 +105,12 @@ tables along with the --relay-log-recove
 class Relay_log_info : public Rpl_info_coordinator
 {
 public:
+  THD* checkpoint_thd;
+  bool checkpoint_running;
+
+  PSI_cond_key key_checkpoint_start_cond, key_checkpoint_stop_cond; 
+  mysql_cond_t checkpoint_start_cond, checkpoint_stop_cond; 
+
   /**
      Flags for the state of the replication.
    */

=== modified file 'sql/rpl_rli_pdb.cc'
--- a/sql/rpl_rli_pdb.cc	2010-12-01 17:08:21 +0000
+++ b/sql/rpl_rli_pdb.cc	2010-12-01 19:15:08 +0000
@@ -19,13 +19,14 @@ const char *info_slave_worker_fields []=
 };
 
 Slave_worker::Slave_worker(const char* type): Rpl_info_worker(type)
-{ 
-
+{
+  curr_group_exec_parts= new Database_ids();
 }
 
 Slave_worker::~Slave_worker() 
 {
-
+  if (curr_group_exec_parts)
+    delete curr_group_exec_parts;
 }
 
 int Slave_worker::init_info()
@@ -107,8 +108,7 @@ bool Slave_worker::read_info(Rpl_info_ha
   if (from->prepare_info_for_read())
     DBUG_RETURN(TRUE);
 
-  if (from->get_info(partitions,
-                     sizeof(partitions), "") ||
+  if (from->get_info(curr_group_exec_parts, (Dynamic_ids *) NULL) ||
       from->get_info(group_relay_log_name,
                      sizeof(group_relay_log_name), "") ||
       from->get_info((ulong *) &temp_group_relay_log_pos,
@@ -138,7 +138,7 @@ bool Slave_worker::write_info(Rpl_info_h
   */
 
   if (to->prepare_info_for_write() ||
-      to->set_info(partitions) ||
+      to->set_info(curr_group_exec_parts) ||
       to->set_info(group_relay_log_name) ||
       to->set_info((ulong)group_relay_log_pos) ||
       to->set_info(group_master_log_name) ||
@@ -153,18 +153,18 @@ size_t Slave_worker::get_number_worker_f
   return sizeof(info_slave_worker_fields)/sizeof(info_slave_worker_fields[0]);
 }
 
-bool Slave_worker::checkpoint(Relay_log_info *info)
+bool Slave_worker::commit_positions(Log_event *ev)
 {
-  DBUG_ENTER("Relay_coordinator::checkpoint_worker");
+  DBUG_ENTER("Slave_worker::checkpoint_positions");
 
   bool error= FALSE;
 
-  group_relay_log_pos= info->get_group_relay_log_pos();
-  strmake(group_relay_log_name, info->get_group_relay_log_name(),
+  group_relay_log_pos= ev->future_event_relay_log_pos;
+  strmake(group_relay_log_name, "",
           sizeof(group_relay_log_name)-1);
 
-  group_master_log_pos= info->get_group_master_log_pos();
-  strmake(group_master_log_name, info->get_group_master_log_name(),
+  group_master_log_pos= ev->mts_group_cnt;
+  strmake(group_master_log_name, c_rli->get_group_master_log_name(),
           sizeof(group_master_log_name)-1);
 
   error= flush_info(TRUE);
@@ -172,7 +172,6 @@ bool Slave_worker::checkpoint(Relay_log_
   DBUG_RETURN(error);
 }
 
-
 static HASH mapping_db_to_worker;
 static bool inited_hash_workers= FALSE;
 
@@ -200,7 +199,7 @@ static void free_entry(db_worker *entry)
 {
   DBUG_ENTER("free_entry");
 
-  DBUG_PRINT("info", ("free_entry %s, %d", entry->db, strlen(entry->db)));
+  DBUG_PRINT("info", ("free_entry %s, %d", entry->db, (int) strlen(entry->db)));
 
   my_free((void *) entry->db);
   my_free(entry);
@@ -480,7 +479,7 @@ Slave_worker *get_least_occupied_worker(
 
 void Slave_worker::slave_worker_ends_group(ulong gaq_idx, int error)
 {
-  uint i;
+  int i;
 
   if (!error)
   {
@@ -501,15 +500,15 @@ void Slave_worker::slave_worker_ends_gro
     last_group_done_index = gaq_idx;
   }
 
-  for (i= curr_group_exec_parts.elements; i > 0; i--)
+  for (i= curr_group_exec_parts->dynamic_ids.elements; i > 0; i--)
   {
     db_worker *entry= NULL;
     my_hash_value_type hash_value;
     char key[NAME_LEN + 2];
 
-    get_dynamic(&curr_group_exec_parts, (uchar*) key, i - 1);
+    get_dynamic(&(curr_group_exec_parts->dynamic_ids), (uchar*) key, i - 1);
     hash_value= my_calc_hash(&mapping_db_to_worker, (uchar*) key + 1, key[0]);
-    
+
     mysql_mutex_lock(&slave_worker_hash_lock);
 
     entry= (db_worker *)
@@ -517,7 +516,6 @@ void Slave_worker::slave_worker_ends_gro
                                       (uchar*) key + 1, key[0]);
 
     DBUG_ASSERT(entry && entry->usage != 0);
-
     DBUG_ASSERT(strlen(key + 1) == (uchar) key[0]);
 
     entry->usage--;
@@ -540,8 +538,8 @@ void Slave_worker::slave_worker_ends_gro
     */  
 
     mysql_mutex_unlock(&slave_worker_hash_lock);
-    
-    delete_dynamic_element(&curr_group_exec_parts, i - 1);
+
+    delete_dynamic_element(&(curr_group_exec_parts->dynamic_ids), i - 1);
   }
   curr_group_seen_begin= FALSE;
 }

=== modified file 'sql/rpl_rli_pdb.h'
--- a/sql/rpl_rli_pdb.h	2010-11-30 14:39:40 +0000
+++ b/sql/rpl_rli_pdb.h	2010-12-01 19:15:08 +0000
@@ -173,7 +173,7 @@ public:
   // fixme: experimental
   Relay_log_info *w_rli;
 
-  DYNAMIC_ARRAY curr_group_exec_parts; // CGEP
+  Dynamic_ids *curr_group_exec_parts; // CGEP
   bool curr_group_seen_begin; // is set to TRUE with B-event at Worker exec
   // @c last_group_done_index is for recovery, although can be viewed
   //    as statistics as well.
@@ -218,7 +218,7 @@ public:
 
   void slave_worker_ends_group(ulong, int);  // CGEP walk through to upd APH
 
-  bool checkpoint(Relay_log_info *rli);
+  bool commit_positions(Log_event *evt);
 
 private:
   bool read_info(Rpl_info_handler *from);

=== modified file 'sql/rpl_slave.cc'
--- a/sql/rpl_slave.cc	2010-12-01 17:08:21 +0000
+++ b/sql/rpl_slave.cc	2010-12-01 19:15:08 +0000
@@ -51,7 +51,7 @@
 #include "log_event.h"                          // Rotate_log_event,
                                                 // Create_file_log_event,
                                                 // Format_description_log_event
-#include "server_ids.h"
+#include "dynamic_ids.h"
 #include "rpl_rli_pdb.h"
 
 #ifdef HAVE_REPLICATION
@@ -140,7 +140,7 @@ failed read"
 };
 
 
-typedef enum { SLAVE_THD_IO, SLAVE_THD_SQL, SLAVE_THD_WORKER} SLAVE_THD_TYPE;
+typedef enum { SLAVE_THD_IO, SLAVE_THD_SQL, SLAVE_THD_WORKER, SLAVE_THD_CHECKPOINT } SLAVE_THD_TYPE;
 
 static int process_io_rotate(Master_info* mi, Rotate_log_event* rev);
 static int process_io_create_file(Master_info* mi, Create_file_log_event* cev);
@@ -168,6 +168,7 @@ static int terminate_slave_thread(THD *t
                                   bool skip_lock);
 static bool check_io_slave_killed(THD *thd, Master_info *mi, const char *info);
 int slave_worker_exec_job(Slave_worker * w, Relay_log_info *rli);
+bool checkpoint_routine(Relay_log_info *rli);
 
 /*
   Find out which replications threads are running
@@ -1504,7 +1505,7 @@ maybe it is a *VERY OLD MASTER*.");
     mysql_free_result(master_res);
     master_res= NULL;
   }
-  if (mi->master_id == 0 && mi->ignore_server_ids->server_ids.elements > 0)
+  if (mi->master_id == 0 && mi->ignore_server_ids->dynamic_ids.elements > 0)
   {
     errmsg= "Slave configured with server id filtering could not detect the master server id.";
     err_code= ER_SLAVE_FATAL_ERROR;
@@ -2076,11 +2077,11 @@ bool show_master_info(THD* thd, Master_i
       char buff[FN_REFLEN];
       ulong i, cur_len;
       for (i= 0, buff[0]= 0, cur_len= 0;
-           i < mi->ignore_server_ids->server_ids.elements; i++)
+           i < mi->ignore_server_ids->dynamic_ids.elements; i++)
       {
         ulong s_id, slen;
         char sbuff[FN_REFLEN];
-        get_dynamic(&(mi->ignore_server_ids->server_ids), (uchar*) &s_id, i);
+        get_dynamic(&(mi->ignore_server_ids->dynamic_ids), (uchar*) &s_id, i);
         slen= sprintf(sbuff, (i == 0 ? "%lu" : ", %lu"), s_id);
         if (cur_len + slen + 4 > FN_REFLEN)
         {
@@ -2668,40 +2669,13 @@ int apply_event_and_update_pos(Log_event
       See sql/rpl_rli.h for further details.
     */
     int error= 0;
-    if (!(ev->get_type_code() == XID_EVENT && rli->is_transactional()) ||
+    if ((rli->curr_group_is_parallel == FALSE && 
+        !(ev->get_type_code() == XID_EVENT && rli->is_transactional())) ||
         skip_event)
+      error= ev->update_pos(rli);
 
-//  TODO: Alfranio to fix/restore the condition to update the main RLI
-//      It is kept the prototype time way in order to process rpl_parallel.
-//      This is parallel exec and event required the sequential mode
-//      that also includes all Workers finished their assignements.
-//      It was served so inside apply_event() above.
-//      The main RLI table is safe to update now.
-
-//      if (!rli->is_parallel_exec() || ev->only_sequential_exec())
-           error= ev->update_pos(rli);
-#if 1
-
-    // experimental checkpoint per each scheduling attempt
-    // logics of next_event()
-    {
-      uint i;
-      rli->gaq->move_queue_head(&rli->workers);
-
-      /* TODO: 
-         the least occupied sorting out needs moving to the actual
-         checkpoint location - next_event()
-      */
-      for (i= 0; i < rli->workers.elements; i++)
-      {
-        Slave_worker *w_i;
-        get_dynamic(&rli->workers, (uchar *) &w_i, i);
-        set_dynamic(&rli->least_occupied_workers, (uchar*) &w_i->jobs.len, w_i->id);
-      };
-      sort_dynamic(&rli->least_occupied_workers, (qsort_cmp) ulong_cmp);
-    }           
-
-#endif
+    /* Temporarily placed here /Alfranio */
+    checkpoint_routine(rli);
 
 #ifndef DBUG_OFF
     DBUG_PRINT("info", ("update_pos error = %d", error));
@@ -3623,6 +3597,151 @@ err:
   DBUG_RETURN(0); 
 }
 
+bool checkpoint_routine(Relay_log_info *rli)
+{
+  bool error= FALSE;
+
+  DBUG_ENTER("checkpoint_routine");
+ 
+  // ANDREI LOCKS?
+
+  uint i;
+  rli->gaq->move_queue_head(&rli->workers);
+
+  /* TODO: 
+     the least occupied sorting out needs moving to the actual
+     checkpoint location - next_event()
+  */
+  for (i= 0; i < rli->workers.elements; i++)
+  {
+    Slave_worker *w_i;
+    get_dynamic(&rli->workers, (uchar *) &w_i, i);
+    set_dynamic(&rli->least_occupied_workers, (uchar*) &w_i->jobs.len, w_i->id);
+  };
+  sort_dynamic(&rli->least_occupied_workers, (qsort_cmp) ulong_cmp);
+
+  error= rli->flush_info(TRUE);
+
+  // ANDREI NOTIFICATIONS?
+  DBUG_RETURN(error);
+}
+
+pthread_handler_t handle_slave_checkpoint(void *arg)
+{
+  THD *thd;                     /* needs to be first for thread_stack */
+  int error= 0;
+  Relay_log_info *rli= (Relay_log_info *) arg;
+
+  my_thread_init();
+  DBUG_ENTER("handle_slave_worker");
+
+  rli->checkpoint_running= 1;
+
+  thd= new THD;
+  if (!thd)
+  {
+    sql_print_error("Failed during slave checkpoint initialization");
+
+    mysql_mutex_lock(&rli->data_lock);
+    mysql_cond_broadcast(&rli->checkpoint_start_cond);
+    mysql_mutex_unlock(&rli->data_lock);
+
+    goto err;
+  }
+
+  thd->thread_stack = (char*)&thd;
+  
+  pthread_detach_this_thread();
+
+  mysql_mutex_lock(&rli->data_lock);
+  rli->checkpoint_thd= thd;
+  mysql_cond_broadcast(&rli->checkpoint_start_cond);
+  mysql_mutex_unlock(&rli->data_lock);
+
+  if (init_slave_thread(thd, SLAVE_THD_CHECKPOINT))
+  {
+    // todo make SQL thread killed
+    sql_print_error("Failed during slave worker initialization");
+    goto err;
+  }
+  thd->init_for_queries();
+  mysql_mutex_lock(&LOCK_thread_count);
+  threads.append(thd);
+  mysql_mutex_unlock(&LOCK_thread_count);
+
+  DBUG_ASSERT(thd->is_slave_error == 0);
+
+  while (!thd->killed && !error)
+  {
+    checkpoint_routine(rli);
+    sleep(1);
+  }
+
+  mysql_mutex_lock(&rli->data_lock);
+  rli->checkpoint_running= 0;
+  rli->checkpoint_thd= NULL;
+  mysql_cond_broadcast(&rli->checkpoint_stop_cond);
+  mysql_mutex_unlock(&rli->data_lock);
+
+  if (error)
+  {
+    mysql_mutex_lock(&rli->info_thd->LOCK_thd_data);
+    rli->info_thd->awake(THD::KILL_QUERY);          // notify Crdn
+    mysql_mutex_unlock(&rli->info_thd->LOCK_thd_data);
+  }
+
+err:
+
+  if (thd)
+  {
+    mysql_mutex_lock(&LOCK_thread_count);
+    THD_CHECK_SENTRY(thd);
+    delete thd;
+    mysql_mutex_unlock(&LOCK_thread_count);
+  }
+
+  my_thread_end();
+  pthread_exit(0);
+  DBUG_RETURN(0); 
+}
+
+int slave_start_checkpoint(Relay_log_info *rli)
+{
+  pthread_t th;
+  int error= 0;
+
+  if (pthread_create(&th, &connection_attrib, handle_slave_checkpoint,
+                     (void*) rli))
+  {
+    sql_print_error("Failed during slave checkpoint thread create");
+    error= 1;
+  }
+
+  while (error == 0 && rli->checkpoint_running == 0)
+    mysql_cond_wait(&rli->checkpoint_start_cond, &rli->data_lock);
+
+  mysql_mutex_unlock(&rli->data_lock);
+
+  return (error);
+}
+
+int slave_stop_checkpoint(Relay_log_info *rli)
+{
+  mysql_mutex_lock(&rli->info_thd->LOCK_thd_data);
+  mysql_mutex_lock(&rli->data_lock);
+  if (rli->checkpoint_thd)
+    rli->checkpoint_thd->awake(THD::KILL_QUERY);
+  mysql_mutex_unlock(&rli->data_lock);
+  mysql_mutex_unlock(&rli->info_thd->LOCK_thd_data);
+ 
+  mysql_mutex_lock(&rli->data_lock);
+  while (rli->checkpoint_running == 1)
+    mysql_cond_wait(&rli->checkpoint_stop_cond, &rli->data_lock);
+  mysql_mutex_unlock(&rli->data_lock);
+
+  return 0;
+}
+
 /**
    A single Worker thread is forked out.
    
@@ -3679,11 +3798,7 @@ int slave_start_single_worker(Relay_log_
                    MY_MUTEX_INIT_FAST);
   mysql_cond_init(key_cond_slave_parallel_worker[i], &w->jobs_cond, NULL);
 
-  // CGEP dynarray holds id:s of partitions of the Current being executed Group
-  my_init_dynamic_array(&w->curr_group_exec_parts, NAME_LEN,
-                        SLAVE_INIT_DBS_IN_GROUP, 1);
   w->curr_group_seen_begin= FALSE;
-
   if (pthread_create(&th, &connection_attrib, handle_slave_worker,
                      (void*) w))
   {
@@ -3752,6 +3867,12 @@ int slave_start_workers(Relay_log_info *
     goto err;
   }
 
+  /* if (rli->is_parallel_exec())
+  {
+    slave_start_checkpoint(rli);
+  }
+  */
+
 err:
   return error;
 }
@@ -3763,6 +3884,11 @@ void slave_stop_workers(Relay_log_info *
 {
   int i;
   THD *thd= rli->info_thd;
+
+  /*if (rli->is_parallel_exec())
+  {
+    slave_stop_checkpoint(rli);
+  }*/
   
   for (i= rli->workers.elements - 1; i >= 0; i--)
   {
@@ -3808,7 +3934,6 @@ void slave_stop_workers(Relay_log_info *
 
     DBUG_ASSERT(w->jobs.Q.elements == w->jobs.s);
     delete_dynamic(&w->jobs.Q);
-    delete_dynamic(&w->curr_group_exec_parts);   // GCEP
     delete_dynamic_element(&rli->workers, i);
     delete w->w_rli;
 
@@ -4732,7 +4857,7 @@ static int queue_event(Master_info* mi,c
         If the master is on the ignore list, execution of
         format description log events and rotate events is necessary.
       */
-      (mi->ignore_server_ids->server_ids.elements > 0 &&
+      (mi->ignore_server_ids->dynamic_ids.elements > 0 &&
        mi->shall_ignore_server_id(s_id) &&
        /* everything is filtered out from non-master */
        (s_id != mi->master_id ||
@@ -6017,7 +6142,7 @@ bool change_master(THD* thd, Master_info
     is mentioning IGNORE_SERVER_IDS= (...)
   */
   if (lex_mi->repl_ignore_server_ids_opt == LEX_MASTER_INFO::LEX_MI_ENABLE)
-    reset_dynamic(&mi->ignore_server_ids->server_ids);
+    reset_dynamic(&(mi->ignore_server_ids->dynamic_ids));
   for (uint i= 0; i < lex_mi->repl_ignore_server_ids.elements; i++)
   {
     ulong s_id;
@@ -6031,14 +6156,14 @@ bool change_master(THD* thd, Master_info
     else
     {
       if (bsearch((const ulong *) &s_id,
-                  mi->ignore_server_ids->server_ids.buffer,
-                  mi->ignore_server_ids->server_ids.elements, sizeof(ulong),
+                  mi->ignore_server_ids->dynamic_ids.buffer,
+                  mi->ignore_server_ids->dynamic_ids.elements, sizeof(ulong),
                   (int (*) (const void*, const void*))
                   change_master_server_id_cmp) == NULL)
-        insert_dynamic(&mi->ignore_server_ids->server_ids, (uchar*) &s_id);
+        insert_dynamic(&(mi->ignore_server_ids->dynamic_ids), (uchar*) &s_id);
     }
   }
-  sort_dynamic(&mi->ignore_server_ids->server_ids, (qsort_cmp) change_master_server_id_cmp);
+  sort_dynamic(&(mi->ignore_server_ids->dynamic_ids), (qsort_cmp) change_master_server_id_cmp);
 
   if (lex_mi->ssl != LEX_MASTER_INFO::LEX_MI_UNCHANGED)
     mi->ssl= (lex_mi->ssl == LEX_MASTER_INFO::LEX_MI_ENABLE);
@@ -6215,61 +6340,3 @@ err:
   @} (end of group Replication)
 */
 #endif /* HAVE_REPLICATION */
-
-Server_ids::Server_ids()
-{
-  my_init_dynamic_array(&server_ids, sizeof(::server_id), 16, 16);
-}
-
-Server_ids::~Server_ids()
-{
-  delete_dynamic(&server_ids);
-}
-
-bool Server_ids::unpack_server_ids(char *param_server_ids)
-{
-  char *token= NULL, *last= NULL;
-  uint num_items= 0;
- 
-  DBUG_ENTER("Server_ids::unpack_server_ids");
-
-  token= strtok_r((char *)const_cast<const char*>(param_server_ids),
-                  " ", &last);
-
-  if (token == NULL)
-    DBUG_RETURN(TRUE);
-
-  num_items= atoi(token);
-  for (uint i=0; i < num_items; i++)
-  {
-    token= strtok_r(NULL, " ", &last);
-    if (token == NULL)
-      DBUG_RETURN(TRUE);
-    else
-    {
-      ulong val= atol(token);
-      insert_dynamic(&server_ids, (uchar *) &val);
-    }
-  }
-  DBUG_RETURN(FALSE);
-}
-
-bool Server_ids::pack_server_ids(String *buffer)
-{
-  DBUG_ENTER("Server_ids::pack_server_ids");
-
-  if (buffer->set_int(server_ids.elements, FALSE, &my_charset_bin))
-    DBUG_RETURN(TRUE);
-
-  for (ulong i= 0;
-       i < server_ids.elements; i++)
-  {
-    ulong s_id;
-    get_dynamic(&server_ids, (uchar*) &s_id, i);
-    if (buffer->append(" ") ||
-        buffer->append_ulonglong(s_id))
-      DBUG_RETURN(TRUE);
-  }
-
-  DBUG_RETURN(FALSE);
-}


Attachment: [text/bzr-bundle] bzr/alfranio.correia@oracle.com-20101201191508-owdwp6ayr5sj2obl.bundle
Thread
bzr commit into mysql-next-mr.crash-safe branch (alfranio.correia:3224) Alfranio Correia1 Dec