List:Commits« Previous MessageNext Message »
From:Andrei Elkin Date:May 25 2011 3:58pm
Subject:bzr commit into mysql-next-mr-wl5569 branch (andrei.elkin:3281) WL#5569
WL#5599 WL#5754
View as plain text  
#At file:///home/andrei/MySQL/BZR/2a-23May/WL/mysql-next-mr-wl5569/ based on revid:andrei.elkin@stripped

 3281 Andrei Elkin	2011-05-25
      wl#5569 MTS
      Wl#5754 Query event parallel appying
      wl#5599 MTS recovery
      
      The patch includes
      some cleanup, including one for temp tables support, realization of few todo:s.
     @ sql/field.cc
        Restoring asserts that were before changes to sql_base.cc.
     @ sql/handler.cc
        Restoring assert that were before changes to sql_base.cc.
     @ sql/log_event.cc
        cleanup, incl restoration of the trunk version of some pieces of code.
        passing future_event_relay_log_pos to Worker to stike out a todo in rpl_rli.cc.
     @ sql/rpl_rli.cc
        passing future_event_relay_log_pos is done via an assignment to 
        Worker's member in slave_worker_exec_job().
     @ sql/rpl_rli.h
        restoring the original version of get_table_data() though
         no real changes.
     @ sql/rpl_rli_pdb.cc
        cleanup.
     @ sql/rpl_slave.cc
        Restoring original sequential mode version of assert
        in sql_slave_killed.
        Worker is not supposed to run this function.
        Testing of skipping logics is left to the rpl suite
         be run in the parallel mode.
        Cleanup.
        Marking recovery related todo items explicitly.
        Setting up guards to guarantee sequential mode in requested
          points of the code.
     @ sql/sql_base.cc
        removing unnecessary return value in incr_slave_open_temp_tables def.
     @ sql/sql_parse.cc
        cleanup.

    modified:
      sql/field.cc
      sql/handler.cc
      sql/log_event.cc
      sql/rpl_rli.cc
      sql/rpl_rli.h
      sql/rpl_rli_pdb.cc
      sql/rpl_slave.cc
      sql/sql_base.cc
      sql/sql_parse.cc
=== modified file 'sql/field.cc'
--- a/sql/field.cc	2011-02-27 17:35:25 +0000
+++ b/sql/field.cc	2011-05-25 15:58:07 +0000
@@ -3736,12 +3736,7 @@ longlong Field_long::val_int(void)
   ASSERT_COLUMN_MARKED_FOR_READ;
   int32 j;
   /* See the comment in Field_long::store(long long) */
-  /* 
-     In case the method is executed not by the table's owner
-     that one must be a Slave worker thread.
-  */
-  DBUG_ASSERT(table->in_use == current_thd || (current_thd)->slave_thread);
-
+  DBUG_ASSERT(table->in_use == current_thd);
 #ifdef WORDS_BIGENDIAN
   if (table->s->db_low_byte_first)
     j=sint4korr(ptr);
@@ -6313,8 +6308,8 @@ int Field_string::store(const char *from
   const char *cannot_convert_error_pos;
   const char *from_end_pos;
 
-  /* See the comment for Field_long::store(long long) and Field_long::val_int */
-  DBUG_ASSERT(table->in_use == current_thd || (current_thd)->slave_thread);
+  /* See the comment for Field_long::store(long long) */
+  DBUG_ASSERT(table->in_use == current_thd);
 
   copy_length= well_formed_copy_nchars(field_charset,
                                        (char*) ptr, field_length,
@@ -6463,8 +6458,8 @@ String *Field_string::val_str(String *va
 			      String *val_ptr)
 {
   ASSERT_COLUMN_MARKED_FOR_READ;
-  /* See the comment for Field_long::store(long long) and Field_long::val_int */
-  DBUG_ASSERT(table->in_use == current_thd || (current_thd)->slave_thread);
+  /* See the comment for Field_long::store(long long) */
+  DBUG_ASSERT(table->in_use == current_thd);
   uint length;
   if (table->in_use->variables.sql_mode &
       MODE_PAD_CHAR_TO_FULL_LENGTH)

=== modified file 'sql/handler.cc'
--- a/sql/handler.cc	2011-02-27 17:35:25 +0000
+++ b/sql/handler.cc	2011-05-25 15:58:07 +0000
@@ -2127,13 +2127,7 @@ void **handler::ha_data(THD *thd) const
 
 THD *handler::ha_thd(void) const
 {
-  /* 
-     About current_thd->slave_thread alternative,
-     MTS coordinator open/closes a temp table while the rest of operation
-     is done by Workers.
-  */
-  DBUG_ASSERT(!table || !table->in_use || table->in_use == current_thd ||
-              current_thd->slave_thread);
+  DBUG_ASSERT(!table || !table->in_use || table->in_use == current_thd);
   return (table && table->in_use) ? table->in_use : current_thd;
 }
 

=== modified file 'sql/log_event.cc'
--- a/sql/log_event.cc	2011-05-24 14:29:35 +0000
+++ b/sql/log_event.cc	2011-05-25 15:58:07 +0000
@@ -2932,14 +2932,14 @@ int Log_event::apply_event(Relay_log_inf
 struct slave_job_item* pop_jobs_item(Slave_worker *w, Slave_job_item *job_item)
 {
   THD *thd= w->info_thd;
+
   mysql_mutex_lock(&w->jobs_lock);
+
   while (!job_item->data && !thd->killed)
   {
     const char *old_msg;
 
-    //job_item= w->jobs.pop(); // LABS-TODO de_queue()
     head_queue(&w->jobs, job_item);
-
     if (job_item->data == NULL)
     {
       w->wait_jobs++;
@@ -2952,6 +2952,7 @@ struct slave_job_item* pop_jobs_item(Sla
   }
   if (job_item->data)
     w->curr_jobs--;
+
   mysql_mutex_unlock(&w->jobs_lock);
 
   thd_proc_info(w->info_thd, "Executing event");
@@ -2960,7 +2961,7 @@ struct slave_job_item* pop_jobs_item(Sla
 
 
 /**
-  mts-II worker main routine.
+  MTS worker main routine.
   The worker thread waits for an event, execute it, fixes statistics counters.
 
   @note the function maintains CGEP and modifies APH, and causes
@@ -3030,7 +3031,7 @@ int slave_worker_exec_job(Slave_worker *
       }
     }
   }
-
+  w->set_future_event_relay_log_pos(ev->future_event_relay_log_pos);
   error= ev->do_apply_event_worker(w);
   
   if (ev->ends_group() || !w->curr_group_seen_begin)
@@ -6920,7 +6921,7 @@ int Xid_log_event::do_apply_event(Relay_
     {
       rli_ptr->set_group_master_log_pos(log_pos);
     }
-
+  
     if ((error= rli_ptr->flush_info(TRUE)))
       goto err;
   }
@@ -8923,7 +8924,7 @@ int Rows_log_event::do_apply_event(Relay
       Rows_log_event, we can invalidate the query cache for the
       associated table.
      */
-    for (TABLE_LIST *ptr= rli->tables_to_lock ; ptr; ptr= ptr->next_global)
+    for (TABLE_LIST *ptr= rli->tables_to_lock ; ptr ; ptr= ptr->next_global)
     {
       const_cast<Relay_log_info*>(rli)->m_table_map.set_table(ptr->table_id, ptr->table);
     }
@@ -8932,9 +8933,10 @@ int Rows_log_event::do_apply_event(Relay
 #endif
   }
 
-  TABLE* table= 
+  TABLE* 
+    table= 
     m_table= const_cast<Relay_log_info*>(rli)->m_table_map.get_table(m_table_id);
-  
+
   DBUG_PRINT("debug", ("m_table: 0x%lx, m_table_id: %lu", (ulong) m_table, m_table_id));
 
   if (table)
@@ -9717,7 +9719,6 @@ int Table_map_log_event::do_apply_event(
     table_list->next_global= table_list->next_local= rli->tables_to_lock;
     const_cast<Relay_log_info*>(rli)->tables_to_lock= table_list;
     const_cast<Relay_log_info*>(rli)->tables_to_lock_count++;
-
     /* 'memory' is freed in clear_tables_to_lock */
   }
 

=== modified file 'sql/rpl_rli.cc'
--- a/sql/rpl_rli.cc	2011-05-25 07:36:36 +0000
+++ b/sql/rpl_rli.cc	2011-05-25 15:58:07 +0000
@@ -1017,7 +1017,7 @@ void Relay_log_info::stmt_done(my_off_t 
     while the MyISAM table has already been updated.
   */
   if ((info_thd->variables.option_bits & OPTION_BEGIN) && opt_using_transactions)
-    inc_event_relay_log_pos(); // todo: ev-> future_event_relay_log_pos
+    inc_event_relay_log_pos();
   else
   {
     inc_group_relay_log_pos(event_master_log_pos);

=== modified file 'sql/rpl_rli.h'
--- a/sql/rpl_rli.h	2011-05-24 14:29:35 +0000
+++ b/sql/rpl_rli.h	2011-05-25 15:58:07 +0000
@@ -392,10 +392,8 @@ public:
 
   bool get_table_data(TABLE *table_arg, table_def **tabledef_var, TABLE **conv_table_var) const
   {
-    TABLE_LIST *tables= tables_to_lock;
-
     DBUG_ASSERT(tabledef_var && conv_table_var);
-    for (TABLE_LIST *ptr= tables ; ptr != NULL ; ptr= ptr->next_global)
+    for (TABLE_LIST *ptr= tables_to_lock ; ptr != NULL ; ptr= ptr->next_global)
       if (ptr->table == table_arg)
       {
         *tabledef_var= &static_cast<RPL_TABLE_LIST*>(ptr)->m_tabledef;

=== modified file 'sql/rpl_rli_pdb.cc'
--- a/sql/rpl_rli_pdb.cc	2011-05-24 14:29:35 +0000
+++ b/sql/rpl_rli_pdb.cc	2011-05-25 15:58:07 +0000
@@ -63,7 +63,7 @@ int Slave_worker::init_info()
     goto err;
   
   inited_group_execed= 1;
-  
+
   /*
     The init_info() is used to either create or read information
     from the repository, in order to initialize the Slave_worker.

=== modified file 'sql/rpl_slave.cc'
--- a/sql/rpl_slave.cc	2011-05-25 07:36:36 +0000
+++ b/sql/rpl_slave.cc	2011-05-25 15:58:07 +0000
@@ -1075,8 +1075,8 @@ static bool sql_slave_killed(THD* thd, R
   bool ret= FALSE;
   DBUG_ENTER("sql_slave_killed");
 
-  DBUG_ASSERT(rli->info_thd == thd || thd->slave_thread);
-  DBUG_ASSERT(rli->slave_running == 1 || thd->slave_thread);// tracking buffer overrun
+  DBUG_ASSERT(rli->info_thd == thd);
+  DBUG_ASSERT(rli->slave_running == 1);// tracking buffer overrun
   if (abort_loop || thd->killed || rli->abort_slave)
   {
     /*
@@ -2819,13 +2819,7 @@ int apply_event_and_update_pos(Log_event
   }
   if (reason == Log_event::EVENT_SKIP_NOT)
   {
-    /* 
-       MTS-todo: to test neither skipping nor delayed-exec logics
-       are affected by parallel exec mode.
-    */
-
     // Sleeps if needed, and unlocks rli->data_lock.
-
     if (sql_delay_event(ev, thd, rli))
       DBUG_RETURN(0);
     exec_res= ev->apply_event(rli);
@@ -2923,6 +2917,7 @@ int apply_event_and_update_pos(Log_event
   DBUG_RETURN(exec_res ? 1 : 0);
 }
 
+
 /**
   Top-level function for executing the next event in the relay log.
   This is called from the SQL thread.
@@ -3798,9 +3793,6 @@ err:
 /**
    Orders jobs by comparing relay log information.
 */
-#if 0
-int mts_recovery_cmp(Slave_job_group *id1, Slave_job_group *id2)
-#endif
 
 int mts_event_coord_cmp(LOG_POS_COORD *id1, LOG_POS_COORD *id2)
 {
@@ -3954,126 +3946,6 @@ bool mts_recovery_groups(Relay_log_info 
 
   DBUG_ASSERT(rli->mts_recovery_group_cnt < groups->n_bits);
 
-#if 0
-  for (uint it_job= 0; it_job < above_lwm_jobs.elements; it_job++)
-  {
-    group_worker_counter= 0;
-    group_lwm_counter= 0;
-    get_dynamic(&above_lwm_jobs, (uchar *) &job_worker, it_job);
-
-    sql_print_information("Recoverying relay log info based on Worker-Id %lu, "
-                          "group_relay_log_name %s, group_relay_log_pos %lu "
-                          "group_master_log_name %s, group_master_log_pos %lu",
-                          job_worker.worker_id,
-                          job_worker.group_relay_log_name,
-                          (ulong) job_worker.group_relay_log_pos,
-                          job_worker.group_master_log_name,
-                          (ulong) job_worker.group_master_log_pos);
-
-    for (uint it_file= 0; it_file < above_lwm_jobs.elements; it_file++)
-    {
-      get_dynamic(&above_lwm_jobs, (uchar *) &job_file, it_file);
-
-      /*
-        Either the current relay log file was already processed by the
-        current worker or all groups were analyzed. So, the next file
-        is checked.
-      */
-      if  ((strcmp(job_worker.group_relay_log_name,
-                  job_file.group_relay_log_name) > 0) ||
-           (group_worker_counter > (rli->checkpoint_group - 1)))
-        continue;
-
-      if (desc)
-      {
-        delete desc;
-        desc= NULL;
-      }
-
-      if (log_name)
-      {
-        end_io_cache(&log);
-        mysql_file_close(file, MYF(MY_WME));
-        log_name= NULL;
-      }
-
-      if ((file= open_binlog(&log, job_file.group_relay_log_name, &errmsg)) < 0)
-      {
-        sql_print_error("%s", errmsg);
-        goto end;
-      }
-      log_name= job_file.group_relay_log_name;
-      my_stat(log_name, &s, MYF(0));
-
-      if (!((desc= Log_event::read_log_event(&log, 0, &fdle,
-                                             opt_master_verify_checksum)) &&
-           desc->get_type_code() == FORMAT_DESCRIPTION_EVENT))
-        goto end;
-    
-      my_b_seek(&log, (my_off_t) 0);
-      while ((ev= Log_event::read_log_event(&log, 0, &fdle,
-              opt_master_verify_checksum)))
-      {
-        DBUG_ASSERT(ev->is_valid());
-
-        /*
-          All groups were analyzed. So, the next worker needs to
-          be checked.
-        */
-        if (group_worker_counter > (rli->checkpoint_group - 1))
-          break;
-
-        filecmp= strcmp(job_file.group_relay_log_name,
-                        job_worker.group_relay_log_name);
-        poscmp= ev->log_pos -
-                job_worker.group_master_log_pos;
-
-        if (filecmp > 0 || (filecmp == 0 && poscmp > 0))
-        {
-          bool unhandled= !bitmap_is_set(&job_worker.worker->group_execed,
-                                         group_worker_counter);
-          if (ev->starts_group())
-            curr_group_seen_begin= TRUE;
-
-          if (ev->ends_group() || !curr_group_seen_begin)
-          {
-            filecmp= strcmp(job_file.group_relay_log_name,
-                            rli->get_group_master_log_name());
-            poscmp= ev->log_pos -
-                    rli->get_group_master_log_pos();
-            if (filecmp > 0 || (filecmp == 0 && poscmp > 0))
-            {
-              if (unhandled)
-                bitmap_is_set(groups, group_lwm_counter);
-              group_lwm_counter++;
-            }
-            curr_group_seen_begin= FALSE;
-            group_worker_counter++;
-          }
-        }
-
-        delete ev;
-        ev= NULL;
-      }
-    }
-  }
-
-end:
-  if (desc)
-  {
-    delete desc;
-    desc= NULL;
-  }
-
-
-  if (log_name)
-  {
-    end_io_cache(&log);
-    mysql_file_close(file, MYF(MY_WME));
-    log_name= NULL;
-  }
-#endif
-
   end_io_cache(&log);
   mysql_file_close(file, MYF(MY_WME));
   log_name= NULL;
@@ -4471,7 +4343,7 @@ pthread_handler_t handle_slave_sql(void 
 
   pthread_detach_this_thread();
 
-  /* mts-II: starting the worker pool */
+  /* MTS: starting the worker pool */
   if (slave_start_workers(rli, rli->opt_slave_parallel_workers) != 0)
   {
       mysql_mutex_unlock(&rli->run_lock);
@@ -4539,7 +4411,6 @@ pthread_handler_t handle_slave_sql(void 
     goto err;
   }
   THD_CHECK_SENTRY(thd);
-
 #ifndef DBUG_OFF
   {
     char llbuf1[22], llbuf2[22];
@@ -5839,7 +5710,6 @@ static Log_event* next_event(Relay_log_i
   mysql_mutex_t *log_lock = rli->relay_log.get_log_lock();
   const char* errmsg=0;
   THD* thd = rli->info_thd;
-
   DBUG_ENTER("next_event");
 
   DBUG_ASSERT(thd != 0);
@@ -5954,7 +5824,7 @@ static Log_event* next_event(Relay_log_i
       if (rli->is_parallel_exec() && (mts_checkpoint_period != 0 || force))
       {
         ulonglong period= static_cast<ulonglong>(mts_checkpoint_period * 1000000ULL);
-        mts_checkpoint_routine(rli, period, force, TRUE); // ALFRANIO ERROR
+        mts_checkpoint_routine(rli, period, force, TRUE); // TODO: ALFRANIO ERROR
       }
 
       if (hot_log)
@@ -6085,7 +5955,7 @@ static Log_event* next_event(Relay_log_i
 
           do
           {
-            mts_checkpoint_routine(rli, period, FALSE, FALSE); // ALFRANIO ERROR
+            mts_checkpoint_routine(rli, period, FALSE, FALSE); // TODO: ALFRANIO ERROR
             set_timespec_nsec(waittime, period);
             thd->enter_cond(log_cond, log_lock,
                             "Slave has read all relay log; "
@@ -6751,9 +6621,8 @@ int reset_slave(THD *thd, Master_info* m
 
   /* 
     Clear master's log coordinates 
-
-    Andrei needs to guarantee that this done in sequential mode.
   */
+  DBUG_ASSERT(!mi->rli || !mi->rli->slave_running); // none writes in rli table
   mi->init_master_log_pos();
 
   if (remove_info(mi))
@@ -7074,9 +6943,9 @@ bool change_master(THD* thd, Master_info
     info and relay log info are prepared to handle events from all
     masters. In such case, we need to execute the code below for each
     master and correctly set the key_info_idx. /Alfranio
-
-    Andrei needs to guarantee that this done in sequential mode.
   */
+
+  DBUG_ASSERT(!mi->rli->slave_running); // none writes in rli table
   ret= mi->rli->flush_info(TRUE);
   mysql_cond_broadcast(&mi->data_cond);
   mysql_mutex_unlock(&mi->rli->data_lock);

=== modified file 'sql/sql_base.cc'
--- a/sql/sql_base.cc	2011-05-25 07:36:36 +0000
+++ b/sql/sql_base.cc	2011-05-25 15:58:07 +0000
@@ -125,7 +125,7 @@ static void init_tdc_psi_keys(void)
 }
 #endif /* HAVE_PSI_INTERFACE */
 
-static int32 incr_slave_open_temp_tables(THD *thd, int inc)
+static void incr_slave_open_temp_tables(THD *thd, int inc)
 {
   int32 ret;
   

=== modified file 'sql/sql_parse.cc'
--- a/sql/sql_parse.cc	2011-01-11 23:01:02 +0000
+++ b/sql/sql_parse.cc	2011-05-25 15:58:07 +0000
@@ -5164,7 +5164,7 @@ bool check_stack_overrun(THD *thd, long 
 			 uchar *buf __attribute__((unused)))
 {
   long stack_used;
-  DBUG_ASSERT(thd == current_thd);  // mts-II: be prepared to hit it
+  DBUG_ASSERT(thd == current_thd);
   if ((stack_used=used_stack(thd->thread_stack,(char*) &stack_used)) >=
       (long) (my_thread_stack_size - margin))
   {


Attachment: [text/bzr-bundle] bzr/andrei.elkin@oracle.com-20110525155807-sg2fqz4ijci7ubje.bundle
Thread
bzr commit into mysql-next-mr-wl5569 branch (andrei.elkin:3281) WL#5569WL#5599 WL#5754Andrei Elkin25 May
  • Re: bzr commit into mysql-next-mr-wl5569 branch (andrei.elkin:3281)WL#5569 WL#5599 WL#5754Davi Arnaut25 May