List:Commits« Previous MessageNext Message »
From:Andrei Elkin Date:January 20 2011 1:39pm
Subject:bzr commit into mysql-next-mr-wl5569 branch (andrei.elkin:3270) WL#5754
View as plain text  
#At file:///home/andrei/MySQL/BZR/2a-23May/WL/mysql-next-mr-wl5569/ based on revid:andrei.elkin@stripped

 3270 Andrei Elkin	2011-01-20
      WL#5754 Query event parallel execution
      
      This is a patch skeletoning parallel applying of Queries containing a temporary table.
      A new specialized test proves stability and data consistency.
     @ mysql-test/suite/rpl/r/rpl_parallel_query.result
        a new results file
     @ sql/field.cc
        relaxing an assert (todo: add to it more specific claim field->table is temp).
     @ sql/handler.cc
        relaxing an assert (todo: add to it more specific claim table is temp).
     @ sql/rpl_rli.cc
        a new temp table mutex init, destroy and a set of helper functions providing access
        to C,W's thd:s in arbitrary place of the server code are added.
     @ sql/rpl_rli.h
        a new temp table mutex is added to RLI class.
     @ sql/rpl_slave.cc
        SLAVE_THD_WORKER appeared to be redundant. Worker's thd->system_thread is set to the same
        as the Coordinator thread constant.
     @ sql/sql_base.cc
        replacing refs to thd->temporary with an appropriate one corresponding to the Coord's thd->t_t:s.
        Also surrounding critical sections of codes dealing with opening, finding, closing or changing
        temproray_tables' list with a specific mutex lock/unlock.

    added:
      mysql-test/suite/rpl/r/rpl_parallel_query.result
    modified:
      sql/field.cc
      sql/handler.cc
      sql/rpl_rli.cc
      sql/rpl_rli.h
      sql/rpl_slave.cc
      sql/sql_base.cc
=== added file 'mysql-test/suite/rpl/r/rpl_parallel_query.result'
--- a/mysql-test/suite/rpl/r/rpl_parallel_query.result	1970-01-01 00:00:00 +0000
+++ b/mysql-test/suite/rpl/r/rpl_parallel_query.result	2011-01-20 13:39:00 +0000
@@ -0,0 +1,30 @@
+include/master-slave.inc
+[connection master]
+call mtr.add_suppression('Unsafe statement written to the binary log using statement format since BINLOG_FORMAT = STATEMENT. Statement accesses nontransactional table as well as transactional or temporary table.*');
+include/stop_slave.inc
+set @save.mts_slave_parallel_workers= @@global.mts_slave_parallel_workers;
+set @@global.mts_slave_parallel_workers= 2;
+include/start_slave.inc
+Warnings:
+Note	1726	Temporary failed transaction retry is not supported in Parallel Slave. Such failure will force the slave to stop.
+create database d1;
+use d1;
+create table d1.t1 (a int auto_increment primary key, b int) engine=innodb;
+create temporary table tt_1 (a int auto_increment primary key);
+insert into tt_1 values (null);
+insert into tt_1 values (null);
+insert into d1.t1 (b) select count(*) from tt_1;
+create database d2;
+use d2;
+create table d2.t1 (a int auto_increment primary key, b int) engine=innodb;
+create temporary table tt_1 (a int auto_increment primary key);
+insert into tt_1 values (null);
+insert into tt_1 values (null);
+insert into d2.t1 (b) select count(*) from tt_1;
+include/diff_tables.inc [master:d2.t1, slave:d2.t1]
+include/diff_tables.inc [master:d1.t1, slave:d1.t1]
+drop temporary table tt_1;
+drop database d1;
+drop temporary table tt_1;
+include/stop_slave.inc
+set @@global.mts_slave_parallel_workers= @save.mts_slave_parallel_workers;

=== modified file 'sql/field.cc'
--- a/sql/field.cc	2010-12-29 00:38:59 +0000
+++ b/sql/field.cc	2011-01-20 13:39:00 +0000
@@ -3736,7 +3736,7 @@ longlong Field_long::val_int(void)
   ASSERT_COLUMN_MARKED_FOR_READ;
   int32 j;
   /* See the comment in Field_long::store(long long) */
-  DBUG_ASSERT(table->in_use == current_thd);
+  DBUG_ASSERT(table->in_use == current_thd || (current_thd)->slave_thread);
 #ifdef WORDS_BIGENDIAN
   if (table->s->db_low_byte_first)
     j=sint4korr(ptr);
@@ -6309,7 +6309,7 @@ int Field_string::store(const char *from
   const char *from_end_pos;
 
   /* See the comment for Field_long::store(long long) */
-  DBUG_ASSERT(table->in_use == current_thd);
+  DBUG_ASSERT(table->in_use == current_thd || (current_thd)->slave_thread);
 
   copy_length= well_formed_copy_nchars(field_charset,
                                        (char*) ptr, field_length,
@@ -6459,7 +6459,7 @@ String *Field_string::val_str(String *va
 {
   ASSERT_COLUMN_MARKED_FOR_READ;
   /* See the comment for Field_long::store(long long) */
-  DBUG_ASSERT(table->in_use == current_thd);
+  DBUG_ASSERT(table->in_use == current_thd || (current_thd)->slave_thread);
   uint length;
   if (table->in_use->variables.sql_mode &
       MODE_PAD_CHAR_TO_FULL_LENGTH)

=== modified file 'sql/handler.cc'
--- a/sql/handler.cc	2011-01-11 11:45:02 +0000
+++ b/sql/handler.cc	2011-01-20 13:39:00 +0000
@@ -2127,7 +2127,8 @@ void **handler::ha_data(THD *thd) const
 
 THD *handler::ha_thd(void) const
 {
-  DBUG_ASSERT(!table || !table->in_use || table->in_use == current_thd);
+  DBUG_ASSERT(!table || !table->in_use || table->in_use == current_thd ||
+              current_thd->slave_thread /* mts worker */);
   return (table && table->in_use) ? table->in_use : current_thd;
 }
 

=== modified file 'sql/rpl_rli.cc'
--- a/sql/rpl_rli.cc	2010-12-27 18:54:41 +0000
+++ b/sql/rpl_rli.cc	2011-01-20 13:39:00 +0000
@@ -52,6 +52,7 @@ static PSI_cond_info *worker_conds= NULL
 
 PSI_mutex_key *key_mutex_slave_parallel_worker= NULL;
 PSI_mutex_key key_mutex_slave_parallel_pend_jobs;
+PSI_mutex_key key_mutex_mts_temp_tables_lock;
 
 PSI_cond_key *key_cond_slave_parallel_worker= NULL;
 PSI_cond_key key_cond_slave_parallel_pend_jobs;
@@ -130,6 +131,8 @@ void Relay_log_info::init_workers(ulong 
   mysql_mutex_init(key_mutex_slave_parallel_pend_jobs, &pending_jobs_lock,
                    MY_MUTEX_INIT_FAST);
   mysql_cond_init(key_cond_slave_parallel_pend_jobs, &pending_jobs_cond, NULL);
+  mysql_mutex_init(key_mutex_mts_temp_tables_lock, &mts_temp_tables_lock,
+                   MY_MUTEX_INIT_FAST);
   my_init_dynamic_array(&workers, sizeof(Slave_worker *), slave_parallel_workers, 4);
 }
 
@@ -140,6 +143,7 @@ void Relay_log_info::deinit_workers()
 {
   mysql_mutex_destroy(&pending_jobs_lock);
   mysql_cond_destroy(&pending_jobs_cond);
+  mysql_mutex_destroy(&mts_temp_tables_lock);
 
   if (!this_worker)
     delete_dynamic(&workers);
@@ -198,6 +202,36 @@ void Relay_log_info::reset_notified_chec
   checkpoint_seqno= 0;
 }
 
+mysql_mutex_t* mts_get_temp_table_mutex()
+{
+  return &active_mi->rli->mts_temp_tables_lock;
+}
+
+THD* mts_get_coordinator_thd()
+{
+  Slave_worker *w;
+  return (!active_mi || !active_mi->rli || !active_mi->rli->is_parallel_exec()) ?
+    NULL : !(w= active_mi->rli->get_current_worker()) ?
+    NULL : w->c_rli->info_thd;
+}
+
+THD* mts_get_worker_thd()
+{
+  Slave_worker *w;
+  return (!active_mi || !active_mi->rli || !active_mi->rli->is_parallel_exec()) ?
+    NULL : !(w= active_mi->rli->get_current_worker()) ?
+    NULL : w->w_rli->info_thd;
+}
+
+bool mts_is_coord_or_worker(THD *thd)
+{
+  return
+    thd->slave_thread &&
+    thd->system_thread != SYSTEM_THREAD_SLAVE_IO &&
+    (mts_get_worker_thd() != NULL);
+}
+
+
 /**
    The method can be run both by C having the Main (coord) rli context and
    by W having both the main and the Local (worker) rli context.

=== modified file 'sql/rpl_rli.h'
--- a/sql/rpl_rli.h	2011-01-11 22:54:12 +0000
+++ b/sql/rpl_rli.h	2011-01-20 13:39:00 +0000
@@ -476,6 +476,15 @@ public:
   ulong mts_recovery_group_cnt; // number of groups to execute at recovery
   ulong mts_recovery_index;     // running index of recoverable groups
   bool curr_event_is_not_in_group; // a special case of group split apart by FD
+  /*
+    temporary tables are held by Coordinator though are created and dropped
+    if explicilty by Workers. The following lock has to be taken by either party
+    in order to conduct any operation in the temp tables placeholder, incl.
+    find, drop, create, open.
+  */
+  mysql_mutex_t mts_temp_tables_lock;
+
+
   /* most of allocation in the coordinator rli is there */
   void init_workers(ulong);
 

=== modified file 'sql/rpl_slave.cc'
--- a/sql/rpl_slave.cc	2011-01-11 22:54:12 +0000
+++ b/sql/rpl_slave.cc	2011-01-20 13:39:00 +0000
@@ -143,7 +143,7 @@ failed read"
 };
 
 
-typedef enum { SLAVE_THD_IO, SLAVE_THD_SQL, SLAVE_THD_WORKER, SLAVE_THD_CHECKPOINT } SLAVE_THD_TYPE;
+typedef enum { SLAVE_THD_IO, SLAVE_THD_SQL, SLAVE_THD_CHECKPOINT } SLAVE_THD_TYPE;
 
 static int process_io_rotate(Master_info* mi, Rotate_log_event* rev);
 static int process_io_create_file(Master_info* mi, Create_file_log_event* cev);
@@ -3730,14 +3730,13 @@ pthread_handler_t handle_slave_worker(vo
     sql_print_error("Failed during slave worker initialization");
     goto err;
   }
-
   w->info_thd= thd;
   w->w_rli->info_thd= thd;
 
   thd->thread_stack = (char*)&thd;
   
   pthread_detach_this_thread();
-  if (init_slave_thread(thd, SLAVE_THD_WORKER))
+  if (init_slave_thread(thd, SLAVE_THD_SQL))
   {
     // todo make SQL thread killed
     sql_print_error("Failed during slave worker initialization");

=== modified file 'sql/sql_base.cc'
--- a/sql/sql_base.cc	2011-01-11 11:45:02 +0000
+++ b/sql/sql_base.cc	2011-01-20 13:39:00 +0000
@@ -58,6 +58,11 @@
 #include <io.h>
 #endif
 
+// MTS temp table support
+extern THD* mts_get_coordinator_thd();
+extern THD* mts_get_worker_thd();
+extern mysql_mutex_t* mts_get_temp_table_mutex();
+extern bool mts_is_coord_or_worker(THD *thd);
 
 bool
 No_such_table_error_handler::handle_condition(THD *,
@@ -1192,11 +1197,26 @@ bool close_cached_connection_tables(THD 
 
 static void mark_temp_tables_as_free_for_reuse(THD *thd)
 {
-  for (TABLE *table= thd->temporary_tables ; table ; table= table->next)
+  TABLE *temporary_tables;
+  bool mts_slave= mts_is_coord_or_worker(thd);
+
+  if (mts_slave)
+  {
+    temporary_tables= mts_get_coordinator_thd()->temporary_tables;
+    mysql_mutex_lock(mts_get_temp_table_mutex());
+  }
+  else
+  {
+    temporary_tables= thd->temporary_tables;
+  }
+
+  for (TABLE *table= temporary_tables; table ; table=table->next)
   {
     if ((table->query_id == thd->query_id) && ! table->open_by_handler)
       mark_tmp_table_for_reuse(table);
   }
+  if (mts_slave)
+  mysql_mutex_unlock(mts_get_temp_table_mutex());
 }
 
 
@@ -1587,19 +1607,38 @@ bool close_temporary_tables(THD *thd)
   /* Assume thd->variables.option_bits has OPTION_QUOTE_SHOW_CREATE */
   bool was_quote_show= TRUE;
   bool error= 0;
+  bool mts_slave= mts_is_coord_or_worker(thd);
+  TABLE *temporary_tables, **ptr_temporary_tables;
+
+  if (mts_slave)
+  {
+    temporary_tables= mts_get_coordinator_thd()->temporary_tables;
+    mysql_mutex_lock(mts_get_temp_table_mutex());
+  }
+  else
+  {
+    temporary_tables= thd->temporary_tables;
+  }
+  ptr_temporary_tables= &temporary_tables;
 
-  if (!thd->temporary_tables)
+  if (!temporary_tables)
+  {
+    if (mts_slave)
+      mysql_mutex_unlock(mts_get_temp_table_mutex());
     DBUG_RETURN(FALSE);
+  }
 
   if (!mysql_bin_log.is_open())
   {
     TABLE *tmp_next;
-    for (table= thd->temporary_tables; table; table= tmp_next)
+    for (table= temporary_tables; table; table= tmp_next)
     {
       tmp_next= table->next;
       close_temporary(table, 1, 1);
     }
-    thd->temporary_tables= 0;
+    *ptr_temporary_tables= 0;
+    if (mts_slave)
+      mysql_mutex_unlock(mts_get_temp_table_mutex());
     DBUG_RETURN(FALSE);
   }
 
@@ -1617,7 +1656,7 @@ bool close_temporary_tables(THD *thd)
     of sublists of equal pseudo_thread_id
   */
 
-  for (prev_table= thd->temporary_tables, table= prev_table->next;
+  for (prev_table= temporary_tables, table= prev_table->next;
        table;
        prev_table= table, table= table->next)
   {
@@ -1626,7 +1665,7 @@ bool close_temporary_tables(THD *thd)
     {
       if (!found_user_tables)
         found_user_tables= true;
-      for (prev_sorted= NULL, sorted= thd->temporary_tables; sorted != table;
+      for (prev_sorted= NULL, sorted= temporary_tables; sorted != table;
            prev_sorted= sorted, sorted= sorted->next)
       {
         if (!is_user_table(sorted) ||
@@ -1641,7 +1680,7 @@ bool close_temporary_tables(THD *thd)
           }
           else
           {
-            thd->temporary_tables= table;
+            *ptr_temporary_tables= table;
           }
           table= prev_table;
           break;
@@ -1658,7 +1697,7 @@ bool close_temporary_tables(THD *thd)
   }
 
   /* scan sorted tmps to generate sequence of DROP */
-  for (table= thd->temporary_tables; table; table= next)
+  for (table= temporary_tables; table; table= next)
   {
     if (is_user_table(table))
     {
@@ -1731,7 +1770,10 @@ bool close_temporary_tables(THD *thd)
   }
   if (!was_quote_show)
     thd->variables.option_bits&= ~OPTION_QUOTE_SHOW_CREATE; /* restore option */
-  thd->temporary_tables=0;
+  *ptr_temporary_tables= 0;
+
+  if (mts_slave)
+    mysql_mutex_unlock(mts_get_temp_table_mutex());
 
   DBUG_RETURN(error);
 }
@@ -2025,16 +2067,31 @@ TABLE *find_temporary_table(THD *thd,
                             const char *table_key,
                             uint table_key_length)
 {
-  for (TABLE *table= thd->temporary_tables; table; table= table->next)
+  TABLE *table= NULL, *temporary_tables;
+  bool mts_slave= mts_is_coord_or_worker(thd);
+  if (mts_slave)
+  {
+    temporary_tables= mts_get_coordinator_thd()->temporary_tables;
+    mysql_mutex_lock(mts_get_temp_table_mutex());
+  }
+  else
+  {
+    temporary_tables= thd->temporary_tables;
+  }
+
+  for (table= temporary_tables; table; table= table->next)
   {
     if (table->s->table_cache_key.length == table_key_length &&
         !memcmp(table->s->table_cache_key.str, table_key, table_key_length))
     {
-      return table;
+      break;
     }
   }
 
-  return NULL;
+  if (mts_slave)
+    mysql_mutex_unlock(mts_get_temp_table_mutex());
+
+  return table;
 }
 
 
@@ -2072,6 +2129,9 @@ TABLE *find_temporary_table(THD *thd,
 int drop_temporary_table(THD *thd, TABLE_LIST *table_list, bool *is_trans)
 {
   TABLE *table;
+  bool mts_slave= mts_is_coord_or_worker(thd);
+  THD *thd_temp;
+
   DBUG_ENTER("drop_temporary_table");
   DBUG_PRINT("tmptable", ("closing table: '%s'.'%s'",
                           table_list->db, table_list->table_name));
@@ -2094,7 +2154,22 @@ int drop_temporary_table(THD *thd, TABLE
     unlock the table and remove the table from this list.
   */
   mysql_lock_remove(thd, thd->lock, table);
-  close_temporary_table(thd, table, 1, 1);
+
+  if (mts_slave)
+  {
+    thd_temp= mts_get_coordinator_thd();
+    mysql_mutex_lock(mts_get_temp_table_mutex());
+  }
+  else
+  {
+    thd_temp= thd;
+  }
+
+  close_temporary_table(thd_temp, table, 1, 1);
+
+  if (mts_slave)
+     mysql_mutex_unlock(mts_get_temp_table_mutex());
+
   DBUG_RETURN(0);
 }
 
@@ -2125,7 +2200,7 @@ void close_temporary_table(THD *thd, TAB
       passing non-zero value to end_slave via rli->save_temporary_tables
       when no temp tables opened, see an invariant below.
     */
-    thd->temporary_tables= table->next;
+    thd->temporary_tables= table->next; // mts: see drop_temporary_table()
     if (thd->temporary_tables)
       table->next->prev= 0;
   }
@@ -2631,7 +2706,19 @@ bool open_table(THD *thd, TABLE_LIST *ta
   if (table_list->open_type != OT_BASE_ONLY &&
       ! (flags & MYSQL_OPEN_SKIP_TEMPORARY))
   {
-    for (table= thd->temporary_tables; table ; table=table->next)
+    bool mts_slave= mts_is_coord_or_worker(thd);
+    TABLE *temporary_tables;
+    if (mts_slave)
+    {
+      temporary_tables= mts_get_coordinator_thd()->temporary_tables;
+      mysql_mutex_lock(mts_get_temp_table_mutex());
+    }
+    else
+    {
+      temporary_tables= thd->temporary_tables;
+    }
+
+    for (table= temporary_tables; table ; table=table->next)
     {
       if (table->s->table_cache_key.length == key_length +
           TMP_TABLE_KEY_EXTRA &&
@@ -2651,14 +2738,20 @@ bool open_table(THD *thd, TABLE_LIST *ta
                       (ulong) table->query_id, (uint) thd->server_id,
                       (ulong) thd->variables.pseudo_thread_id));
 	  my_error(ER_CANT_REOPEN_TABLE, MYF(0), table->alias);
+          if (mts_slave)
+            mysql_mutex_unlock(mts_get_temp_table_mutex());
 	  DBUG_RETURN(TRUE);
 	}
 	table->query_id= thd->query_id;
 	thd->thread_specific_used= TRUE;
         DBUG_PRINT("info",("Using temporary table"));
+        if (mts_slave)
+          mysql_mutex_unlock(mts_get_temp_table_mutex());
         goto reset;
       }
     }
+    if (mts_slave)
+      mysql_mutex_unlock(mts_get_temp_table_mutex());
   }
 
   if (table_list->open_type == OT_TEMPORARY_ONLY ||
@@ -5851,14 +5944,29 @@ TABLE *open_table_uncached(THD *thd, con
 
   if (add_to_temporary_tables_list)
   {
+    TABLE **ptr_temporary_tables;
+    bool mts_slave= mts_is_coord_or_worker(thd);
+
+    if (mts_slave)
+    {
+      ptr_temporary_tables= &mts_get_coordinator_thd()->temporary_tables;
+      mysql_mutex_lock(mts_get_temp_table_mutex());
+    }
+    else
+    {
+      ptr_temporary_tables= &thd->temporary_tables;
+    }
+
     /* growing temp list at the head */
-    tmp_table->next= thd->temporary_tables;
+    tmp_table->next= *ptr_temporary_tables;
     if (tmp_table->next)
       tmp_table->next->prev= tmp_table;
-    thd->temporary_tables= tmp_table;
-    thd->temporary_tables->prev= 0;
+    *ptr_temporary_tables= tmp_table;
+    (*ptr_temporary_tables)->prev= 0;
     if (thd->slave_thread)
       slave_open_temp_tables++;
+    if (mts_slave)
+       mysql_mutex_unlock(mts_get_temp_table_mutex());
   }
   tmp_table->pos_in_table_list= 0;
   DBUG_PRINT("tmptable", ("opened table: '%s'.'%s' 0x%lx", tmp_table->s->db.str,


Attachment: [text/bzr-bundle] bzr/andrei.elkin@oracle.com-20110120133900-ygoyseep8798hy84.bundle
Thread
bzr commit into mysql-next-mr-wl5569 branch (andrei.elkin:3270) WL#5754Andrei Elkin20 Jan