List: Commits
From: Andrei Elkin
Date: November 30 2010 2:40pm
Subject: bzr push into mysql-next-mr.crash-safe branch (andrei.elkin:3222) WL#5569 WL#5599
 3222 Andrei Elkin	2010-11-30 [merge]
      merging from the wl#5569 repo containing the wl#5599 integration

    modified:
      sql/binlog.cc
      sql/log_event.cc
      sql/mysqld.cc
      sql/mysqld.h
      sql/rpl_constants.h
      sql/rpl_info.cc
      sql/rpl_info.h
      sql/rpl_info_dummy.cc
      sql/rpl_info_dummy.h
      sql/rpl_info_factory.cc
      sql/rpl_info_factory.h
      sql/rpl_mi.cc
      sql/rpl_mi.h
      sql/rpl_rli.cc
      sql/rpl_rli.h
      sql/rpl_rli_pdb.cc
      sql/rpl_rli_pdb.h
      sql/rpl_slave.cc
      sql/sys_vars.cc
=== modified file 'sql/log_event.cc'
--- a/sql/log_event.cc	2010-11-30 02:08:01 +0000
+++ b/sql/log_event.cc	2010-11-30 14:39:40 +0000
@@ -2411,13 +2411,32 @@ void append_item_to_jobs(slave_job_item 
                          Slave_worker *w, Relay_log_info *rli)
 {
   THD *thd= rli->info_thd;
+
   DBUG_ASSERT(thd == current_thd);
   thd_proc_info(thd, "Feeding an event to a worker thread");
 
   mysql_mutex_lock(&rli->pending_jobs_lock);
 
-  // todo: modify condition for waiting basing on sizes of worker' queues
+  // C waits based on the *data* size held in the Worker queues
+  while (rli->mts_pending_jobs_size +
+         ((Log_event*) (job_item->data))->data_written
+         > rli->mts_pending_jobs_size_max)
+  {
+    const char *old_msg;
+    rli->mts_wqs_oversize= TRUE;
+    rli->wait_jobs++;
+    old_msg= thd->enter_cond(&rli->pending_jobs_cond,
+                             &rli->pending_jobs_lock,
+                             "Waiting for Workers to unload queues");
+    mysql_cond_wait(&rli->pending_jobs_cond, &rli->pending_jobs_lock);
+    thd->exit_cond(old_msg);
+    mysql_mutex_lock(&rli->pending_jobs_lock);
+    if (thd->killed)
+      return;
+  }
+  rli->mts_pending_jobs_size += ((Log_event*) (job_item->data))->data_written;
 
+#if 0  
   while (rli->pending_jobs >= rli->slave_pending_jobs_max)
   {
     const char *old_msg;
@@ -2432,25 +2451,47 @@ void append_item_to_jobs(slave_job_item 
     if (thd->killed)
       return;
   }
+#endif
+
   rli->stmt_jobs++;
   rli->pending_jobs++;
   
   mysql_mutex_unlock(&rli->pending_jobs_lock);
 
+  // Sleep while all queue lengths are above the underrun level;
+  // the sleep lasts longer the further the WQ:s shift toward overrun.
+  // Workers report their underrun/overrun status.
+
+  if (rli->mts_wqs_underrun_w_id == (ulong) -1)
+  {
+    // todo: experiment with weight to get a good approximation formula
+    ulong nap_weight= rli->mts_wqs_overrun + 1;
+    my_sleep(nap_weight * rli->mts_coordinator_basic_nap);
+  }
+
   if (!w->info_thd->killed)
   {
     int ret;
 
     mysql_mutex_lock(&w->jobs_lock);
 
-    ret= en_queue(&w->jobs, job_item);
-
-    DBUG_ASSERT(ret >= 0);
+    // possible WQ overfill
+    while (!thd->killed && (ret= en_queue(&w->jobs, job_item)) == -1)
+    {
+      const char *old_msg;
+      old_msg= thd->enter_cond(&w->jobs_cond, &w->jobs_lock,
+                               "Waiting for Slave Worker queue room");
+      w->jobs.overfill= TRUE;
+      w->jobs.waited_overfill++;
+      mysql_cond_wait(&w->jobs_cond, &w->jobs_lock);
+      thd->exit_cond(old_msg);
 
+      mysql_mutex_lock(&w->jobs_lock);
+    }
     w->curr_jobs++;
     if (w->jobs.len == 1)
       mysql_cond_signal(&w->jobs_cond);
-    
+
     mysql_mutex_unlock(&w->jobs_lock);
   }
   else
@@ -2664,6 +2705,14 @@ int slave_worker_exec_job(Slave_worker *
 
   mysql_mutex_lock(&w->jobs_lock);
   de_queue(&w->jobs, job_item);
+
+  /* possible overfill */
+  if (w->jobs.len == w->jobs.s - 1 && w->jobs.overfill == TRUE)
+  {
+    w->jobs.overfill= FALSE;
+    mysql_cond_signal(&w->jobs_cond);
+  }
+
   /*
     preserving signatures of existing methods.
     todo: convert update_pos(w->w_rli) -> update_pos(w)
@@ -2672,7 +2721,6 @@ int slave_worker_exec_job(Slave_worker *
   if (!error)
     ev->update_pos(w->w_rli);
 
-  // delete ev;  // after ev->update_pos() event is garbage
 
   mysql_mutex_unlock(&w->jobs_lock);
 
@@ -2680,18 +2728,57 @@ int slave_worker_exec_job(Slave_worker *
 
   mysql_mutex_lock(&rli->pending_jobs_lock);
   rli->pending_jobs--;
-  DBUG_ASSERT(rli->pending_jobs < rli->slave_pending_jobs_max);
+  rli->mts_pending_jobs_size -= ev->data_written;
+  DBUG_ASSERT(rli->mts_pending_jobs_size < rli->mts_pending_jobs_size_max);
+  
+  // underrun
+  if ((rli->mts_worker_underrun_level * w->jobs.s) / 100 > w->jobs.len)
+  {
+    rli->mts_wqs_underrun_w_id= w->id;
+    // todo:
+    // w->underrun_cnt++;
+  }
+  else if (rli->mts_wqs_underrun_w_id == w->id)
+  {
+    rli->mts_wqs_underrun_w_id= (ulong) -1;
+  }
 
-  /* coordinator can be waiting */
+  // overrun detection reuses the underrun level parameter
+  if (((100 - rli->mts_worker_underrun_level) * w->jobs.s) / 100 < w->jobs.len)
+  {
+    rli->mts_wqs_overrun++;
+    w->wq_overrun_set= TRUE;
+    // todo:
+    // w->overrun_cnt++;
+  }
+  else if (w->wq_overrun_set == TRUE)
+  {
+    rli->mts_wqs_overrun--;
+    w->wq_overrun_set= FALSE;
+  }
+
+  DBUG_ASSERT(rli->mts_wqs_overrun >= 0);
 
-  if (rli->pending_jobs == rli->slave_pending_jobs_max - 1 ||
-      rli->pending_jobs == 0)
+  /* coordinator can be waiting */
+  if (rli->mts_pending_jobs_size < rli->mts_pending_jobs_size_max &&
+      rli->mts_wqs_oversize)  // TODO: unit/general test wqs_oversize
+  {
+    rli->mts_wqs_oversize= FALSE;
     mysql_cond_signal(&rli->pending_jobs_cond);
+  }
+  
+  //DBUG_ASSERT(rli->pending_jobs < rli->slave_pending_jobs_max);
+  // if (rli->pending_jobs == rli->slave_pending_jobs_max - 1 ||
+  //     rli->pending_jobs == 0)
+  //   mysql_cond_signal(&rli->pending_jobs_cond);
+  
   mysql_mutex_unlock(&rli->pending_jobs_lock);
 
   w->stmt_jobs++;
 err:
 
+  // if (ev)
+  //    delete ev;  // after ev->update_pos() the event is garbage
+
   DBUG_RETURN(error);
 }
 

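To make the new flow control in append_item_to_jobs()/slave_worker_exec_job()
easier to follow, here is a minimal self-contained sketch of the same
producer/consumer throttling pattern: the producer blocks while the byte size
of pending events would exceed a watermark, and a consumer signals after
freeing space. Standard C++ primitives stand in for mysql_mutex_t,
mysql_cond_t and THD::enter_cond(); all names are illustrative, not the
server's.

#include <condition_variable>
#include <cstdint>
#include <mutex>

struct PendingJobs
{
  std::mutex lock;                 // role of rli->pending_jobs_lock
  std::condition_variable cond;    // role of rli->pending_jobs_cond
  uint64_t size= 0;                // role of rli->mts_pending_jobs_size
  uint64_t size_max= 16 << 20;     // role of rli->mts_pending_jobs_size_max
  bool oversize= false;            // role of rli->mts_wqs_oversize

  // Coordinator side: wait until the event fits, then account for it.
  void acquire(uint64_t event_size)
  {
    std::unique_lock<std::mutex> guard(lock);
    while (size + event_size > size_max)
    {
      oversize= true;              // tell Workers the Coordinator is waiting
      cond.wait(guard);            // "Waiting for Workers to unload queues"
    }
    size+= event_size;
  }

  // Worker side: release the event's bytes and wake a waiting Coordinator.
  void release(uint64_t event_size)
  {
    std::lock_guard<std::mutex> guard(lock);
    size-= event_size;
    if (size < size_max && oversize) // Coordinator can be waiting
    {
      oversize= false;
      cond.notify_one();
    }
  }
};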
=== modified file 'sql/mysqld.cc'
--- a/sql/mysqld.cc	2010-11-26 21:08:30 +0000
+++ b/sql/mysqld.cc	2010-11-30 14:02:15 +0000
@@ -464,6 +464,10 @@ ulong slave_parallel_workers;
 ulong slave_max_pending_jobs;
 my_bool slave_local_timestamp_opt;
 my_bool opt_slave_run_query_in_parallel;
+ulong opt_mts_partition_hash_soft_max;
+ulonglong opt_mts_pending_jobs_size_max;
+ulong opt_mts_coordinator_basic_nap;
+ulong opt_mts_worker_underrun_level;
 ulong thread_cache_size=0;
 ulong binlog_cache_size=0;
 ulonglong  max_binlog_cache_size=0;

=== modified file 'sql/mysqld.h'
--- a/sql/mysqld.h	2010-11-26 21:08:30 +0000
+++ b/sql/mysqld.h	2010-11-30 14:02:15 +0000
@@ -176,6 +176,11 @@ extern ulong slave_parallel_workers;
 extern ulong slave_max_pending_jobs;
 extern my_bool slave_local_timestamp_opt;
 extern my_bool opt_slave_run_query_in_parallel;
+extern ulong opt_mts_partition_hash_soft_max;
+extern ulonglong opt_mts_pending_jobs_size_max;
+extern ulong opt_mts_coordinator_basic_nap;
+extern ulong opt_mts_worker_underrun_level;
+
 extern uint max_user_connections;
 extern ulong what_to_log,flush_time;
 extern ulong max_prepared_stmt_count, prepared_stmt_count;

=== modified file 'sql/rpl_rli.cc'
--- a/sql/rpl_rli.cc	2010-11-30 02:08:01 +0000
+++ b/sql/rpl_rli.cc	2010-11-30 14:39:40 +0000
@@ -162,9 +162,8 @@ Slave_worker* Relay_log_info::get_curren
     return this_worker; //  can be asserted:  !this_worker => C
   for (i= 0; i< workers.elements; i++)
   {
-    Slave_worker* w_i;
-    // todo: optimaze/replace the loop
-    get_dynamic(const_cast<DYNAMIC_ARRAY*>(&workers), (uchar*) &w_i, i);
+    Slave_worker* w_i= *(Slave_worker**)
+      dynamic_array_ptr(const_cast<DYNAMIC_ARRAY*>(&workers), i);
     if (w_i->info_thd == current_thd)
     {
       return w_i;

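The rewrite of get_current_worker() above swaps get_dynamic(), which copies
the element out, for dynamic_array_ptr(), which returns the address of the
slot so that a pointer-sized element is read with a single dereference. A toy
stand-in for a DYNAMIC_ARRAY of Slave_worker* (illustrative names only):

#include <cassert>
#include <cstring>
#include <vector>

struct Worker { int id; };

int main()
{
  Worker w0= {0}, w1= {1};
  std::vector<Worker*> workers= {&w0, &w1};

  // get_dynamic() style: memcpy the i-th element into a local variable.
  Worker *copy_out;
  std::memcpy(&copy_out, &workers[1], sizeof(copy_out));

  // dynamic_array_ptr() style: take the slot's address and dereference.
  Worker *in_place= *(Worker **) &workers[1];

  assert(copy_out == in_place && in_place->id == 1);
  return 0;
}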
=== modified file 'sql/rpl_rli.h'
--- a/sql/rpl_rli.h	2010-11-30 02:08:01 +0000
+++ b/sql/rpl_rli.h	2010-11-30 14:39:40 +0000
@@ -425,13 +425,19 @@ public:
   mysql_mutex_t pending_jobs_lock;
   mysql_cond_t pending_jobs_cond;
   ulong   slave_pending_jobs_max;
+  ulonglong   mts_pending_jobs_size;           // actual memory in use by the WQ:s
+  ulonglong   mts_pending_jobs_size_max;       // the max size; reaching it forces C to wait
+  bool    mts_wqs_oversize;                    // C raises the flag and waits till some memory is released
   Slave_worker  *last_assigned_worker; // a hint to partitioning func for some events
   Slave_committed_queue *gaq;
   DYNAMIC_ARRAY curr_group_assigned_parts; // CGAP
   DYNAMIC_ARRAY curr_group_da;  // deferred array to hold part-info-free events
   bool curr_group_seen_begin;   // current group started with B-event or not
   bool run_query_in_parallel;   // Query's default db not the actual db as part
-
+  volatile ulong mts_wqs_underrun_w_id;  // id of a Worker whose queue is getting empty
+  volatile long mts_wqs_overrun;    // Workers increment and decrement it
+  long  mts_worker_underrun_level; // percent of WQ size at which a Worker claims to be hungry
+  ulong mts_coordinator_basic_nap; // C sleeps this long to avoid WQ overrun
   Slave_worker* get_current_worker() const;
   Slave_worker* set_this_worker(Slave_worker *w) { return this_worker= w; }
   Slave_worker* this_worker; // used by w_rli. The cental rli has it as NULL.

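The four fields added above form a feedback loop: each Worker classifies its
queue as underrun or overrun using integer percent thresholds, and the
Coordinator naps in proportion to how many queues are overrun, but only while
no Worker is starving. A hedged sketch of that arithmetic, matching the
expressions in slave_worker_exec_job() (struct and function names are
illustrative):

#include <cstddef>
#include <cstdio>

static const unsigned long WORKER_UNDEF= (unsigned long) -1;

struct WorkerQueue
{
  size_t capacity;  // role of w->jobs.s
  size_t len;       // role of w->jobs.len
};

// queue shorter than underrun_level percent of capacity => Worker is hungry
static bool is_underrun(const WorkerQueue &q, long underrun_level)
{
  return (underrun_level * (long) q.capacity) / 100 > (long) q.len;
}

// overrun mirrors the same parameter around 100 percent
static bool is_overrun(const WorkerQueue &q, long underrun_level)
{
  return ((100 - underrun_level) * (long) q.capacity) / 100 < (long) q.len;
}

// Coordinator side: the nap grows linearly with the overrun count
static unsigned long coordinator_nap(unsigned long underrun_w_id,
                                     long overrun_cnt,
                                     unsigned long basic_nap)
{
  if (underrun_w_id != WORKER_UNDEF)
    return 0;                      // a Worker is starving: do not sleep
  return (overrun_cnt + 1) * basic_nap;
}

int main()
{
  WorkerQueue q= {100, 95};
  printf("underrun=%d overrun=%d nap=%lu\n",
         is_underrun(q, 10), is_overrun(q, 10),
         coordinator_nap(WORKER_UNDEF, 3, 500));
  return 0;
}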
=== modified file 'sql/rpl_rli_pdb.cc'
--- a/sql/rpl_rli_pdb.cc	2010-11-30 02:08:01 +0000
+++ b/sql/rpl_rli_pdb.cc	2010-11-30 14:39:40 +0000
@@ -323,6 +323,19 @@ Slave_worker *get_slave_worker(const cha
       hash map.
     */
     my_bool ret;
+    char *db= NULL;
+
+    if (mapping_db_to_worker.records > opt_mts_partition_hash_soft_max)
+    {
+      /* remove zero-usage records (todo: schedule this relatively rarely) */
+      for (uint i= 0; i < mapping_db_to_worker.records; i++)
+      {
+        db_worker *entry= (db_worker*) my_hash_element(&mapping_db_to_worker, i);
+        if (entry->usage == 0)
+        {
+          my_hash_delete(&mapping_db_to_worker, (uchar*) entry);
+          i--;  // the last record was moved into slot i; re-examine it
+        }
+      }
+    }
+
     mysql_mutex_unlock(&slave_worker_hash_lock);
 
     DBUG_PRINT("info", ("Inserting %s, %d", dbname, dblength));
@@ -330,7 +343,6 @@ Slave_worker *get_slave_worker(const cha
       Allocate an entry to be inserted and if the operation fails
       an error is returned.
     */
-    char *db= NULL;
     if (!(db= (char *) my_malloc((size_t)dblength, MYF(0))))
       goto err;
     if (!(entry= (db_worker *) my_malloc(sizeof(db_worker), MYF(0))))

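One subtlety in the pruning loop added above: MySQL's my_hash_delete() keeps
the records array packed by moving the last record into the freed slot, so
after a deletion the element now sitting at index i has not been examined yet
and must be re-checked. A toy container with the same move-last-into-hole
delete semantics demonstrates the pattern (illustrative names only):

#include <cassert>
#include <cstddef>
#include <vector>

struct Entry { int usage; };

// delete with move-last-into-hole, like my_hash_delete's record packing
static void erase_at(std::vector<Entry> &h, size_t i)
{
  h[i]= h.back();
  h.pop_back();
}

static void prune_zero_usage(std::vector<Entry> &h)
{
  for (size_t i= 0; i < h.size(); /* advanced in the body */)
  {
    if (h[i].usage == 0)
      erase_at(h, i);  // slot i now holds an unseen record: re-check it
    else
      i++;
  }
}

int main()
{
  std::vector<Entry> h= {{0}, {2}, {0}, {0}, {5}};
  prune_zero_usage(h);
  assert(h.size() == 2);
  return 0;
}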
=== modified file 'sql/rpl_rli_pdb.h'
--- a/sql/rpl_rli_pdb.h	2010-11-30 02:08:01 +0000
+++ b/sql/rpl_rli_pdb.h	2010-11-30 14:39:40 +0000
@@ -152,6 +152,11 @@ public:
 
 class Slave_jobs_queue : public circular_buffer_queue
 {
+public:
+
+  /* C sets it to true; W signals back once the queue is available again */
+  bool overfill;
+  ulonglong waited_overfill;
 };
 
 class Slave_worker : public Rpl_info_worker
@@ -195,7 +200,7 @@ public:
   volatile int curr_jobs; // the current assignments
   ulong usage_partition; // number of different partitions handled by this worker
   volatile bool relay_log_change_notified; // Coord sets and resets, W can read
-
+  bool wq_overrun_set;  // W monitors its queue usage to incr/decr rli->mts_wqs_overrun
   /*
     We need to make this a dynamic field. /Alfranio
   */

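The two members added to Slave_jobs_queue support an overfill handshake on
top of the ring buffer: en_queue() returns -1 on a full ring, the producer
sets overfill and waits, and the consumer signals once a slot frees up. A
plain fixed-size ring stands in for circular_buffer_queue below (illustrative
names, not the server's):

#include <cstddef>

template <typename T, size_t N>
struct Ring
{
  T buf[N];
  size_t head= 0, len= 0;
  bool overfill= false;                   // producer marks; consumer clears
  unsigned long long waited_overfill= 0;  // times the producer had to block

  int en_queue(const T &item)             // -1 mirrors the full-queue code
  {
    if (len == N)
      return -1;
    buf[(head + len) % N]= item;
    return (int) len++;
  }

  bool de_queue(T *out)
  {
    if (len == 0)
      return false;
    *out= buf[head];
    head= (head + 1) % N;
    len--;
    return true;
  }
};

int main()
{
  Ring<int, 4> q;
  while (q.en_queue(1) != -1) {}          // fill up: producer would now wait
  q.overfill= true;
  q.waited_overfill++;
  int v;
  q.de_queue(&v);                         // consumer frees a slot ...
  q.overfill= false;                      // ... clears the flag and signals
  return 0;
}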
=== modified file 'sql/rpl_slave.cc'
--- a/sql/rpl_slave.cc	2010-11-30 02:08:01 +0000
+++ b/sql/rpl_slave.cc	2010-11-30 14:39:40 +0000
@@ -2607,15 +2607,10 @@ int apply_event_and_update_pos(Log_event
   if (!ev->when)
     ev->when= my_time(0);
   ev->thd = thd; // because up to this point, ev->thd == 0
-  /*
-    mts-II:
-    exec mode can change dynamicaly e.g SEQUENTIAL default -> PARALLEL
-    but only when the last group has ended
-  */
   if (!rli->is_in_group() && rli->slave_exec_mode != slave_exec_mode_options)
     rli->slave_exec_mode= slave_exec_mode_options;
 
-  int reason= ev->shall_skip(rli);      // TODO: MTS skip handling
+  int reason= ev->shall_skip(rli);
   if (reason == Log_event::EVENT_SKIP_COUNT)
   {
     sql_slave_skip_counter= --rli->slave_skip_counter;
@@ -2623,7 +2618,13 @@ int apply_event_and_update_pos(Log_event
   }
   if (reason == Log_event::EVENT_SKIP_NOT)
   {
+    /*
+       MTS-todo: test that neither the skipping nor the delayed-exec logic
+       is affected by parallel execution mode.
+    */
+
     // Sleeps if needed, and unlocks rli->data_lock.
+
     if (sql_delay_event(ev, thd, rli))
       DBUG_RETURN(0);
     exec_res= ev->apply_event(rli);
@@ -2812,11 +2813,17 @@ static int exec_relay_log_event(THD* thd
     /*
       This tests if the position of the beginning of the current event
       hits the UNTIL barrier.
+      MTS: since master and relay-group coordinates change only per checkpoint,
+      UNTIL can be left far behind at the end of the checkpoint interval.
+      Hence, UNTIL forces sequential applying.
+
+      TODO: do not let the slave start with UNTIL whenever @@global.max_slave_workers > 0.
     */
     if (rli->until_condition != Relay_log_info::UNTIL_NONE &&
         rli->is_until_satisfied(thd, ev))
     {
       char buf[22];
+
       sql_print_information("Slave SQL thread stopped because it reached its"
                             " UNTIL position %s", llstr(rli->until_pos(), buf));
       /*
@@ -2875,6 +2882,13 @@ static int exec_relay_log_event(THD* thd
       else
       */
  
+      /* MTS: Observation/todo.
+
+         ROWS_QUERY_LOG_EVENT could be supported more easily if the
+         destructing part of handle_rows_query_log_event were merged
+         into rli->cleanup_context() and the rest moved into
+         ROWS...::do_apply_event.
+      */
       if (thd->variables.binlog_rows_query_log_events)
         handle_rows_query_log_event(ev, rli);
 
@@ -3517,6 +3531,7 @@ pthread_handler_t handle_slave_worker(vo
   Slave_worker *w= (Slave_worker *) arg;
   Relay_log_info* rli= w->c_rli;
   ulong purge_cnt= 0;
+  ulonglong purge_size= 0;
   struct slave_job_item _item, *job_item= &_item;
 
   my_thread_init();
@@ -3574,6 +3589,7 @@ pthread_handler_t handle_slave_worker(vo
   while(de_queue(&w->jobs, job_item))
   {
     purge_cnt++;
+    purge_size += ((Log_event*) (job_item->data))->data_written;
     DBUG_ASSERT(job_item->data);
     delete static_cast<Log_event*>(job_item->data);
   }
@@ -3584,6 +3600,7 @@ pthread_handler_t handle_slave_worker(vo
 
   mysql_mutex_lock(&rli->pending_jobs_lock);
   rli->pending_jobs -= purge_cnt;
+  rli->mts_pending_jobs_size -= purge_size;
   mysql_mutex_unlock(&rli->pending_jobs_lock);
 
   mysql_mutex_lock(&w->jobs_lock);
@@ -3644,8 +3661,6 @@ int slave_start_single_worker(Relay_log_
   w->usage_partition= 0;
   w->last_group_done_index= rli->gaq->s; // out of range
 
-  // Queue initialization
-  rli->slave_pending_jobs_max= ::slave_max_pending_jobs; // may change while offline
   w->jobs.s= rli->slave_pending_jobs_max + 1;
   my_init_dynamic_array(&w->jobs.Q, sizeof(Slave_job_item), w->jobs.s, 0); // todo: implement increment e.g  n * 10;
   for (k= 0; k < w->jobs.s; k++)
@@ -3656,7 +3671,9 @@ int slave_start_single_worker(Relay_log_
   w->jobs.e= w->jobs.s;
   w->jobs.a= 0;
   w->jobs.len= rli->slave_pending_jobs_max + 1; // to first handshake
-
+  w->jobs.overfill= FALSE;    //  todo: move into Slave_jobs_queue constructor
+  w->jobs.waited_overfill= 0;
+  w->wq_overrun_set= FALSE;
   set_dynamic(&rli->workers, (uchar*) &w, i);
   mysql_mutex_init(key_mutex_slave_parallel_worker[i], &w->jobs_lock,
                    MY_MUTEX_INIT_FAST);
@@ -3709,6 +3726,15 @@ int slave_start_workers(Relay_log_info *
   rli->gaq= new Slave_committed_queue(rli->get_group_master_log_name(),
                                       sizeof(Slave_job_group),
                                       ::slave_max_pending_jobs, n);
+
+  // size of WQ stays fixed in one slave session
+  rli->slave_pending_jobs_max= ::slave_max_pending_jobs;
+  rli->mts_pending_jobs_size= 0;
+  rli->mts_pending_jobs_size_max= ::opt_mts_pending_jobs_size_max;
+  rli->mts_wqs_underrun_w_id= (ulong) -1;
+  rli->mts_wqs_overrun= 0;
+  rli->mts_coordinator_basic_nap= ::opt_mts_coordinator_basic_nap;
+  rli->mts_worker_underrun_level= ::opt_mts_worker_underrun_level;
   rli->mts_total_groups= 0;
   rli->slave_worker_is_error= NULL;
   rli->curr_group_seen_begin= NULL;
@@ -3790,6 +3816,7 @@ void slave_stop_workers(Relay_log_info *
   }
 
   DBUG_ASSERT(rli->pending_jobs == 0);
+  DBUG_ASSERT(rli->mts_pending_jobs_size == 0);
 
   destroy_hash_workers();
   delete rli->gaq;
@@ -5353,9 +5380,9 @@ static Log_event* next_event(Relay_log_i
       }
 
       /* Reset the relay-log-change-notified status of  Slave Workers */
-      for (uint i; i < rli->workers.elements; i++)
+      for (uint i= 0; i < rli->workers.elements; i++)
       {
-        Slave_worker *w= (Slave_worker *) dynamic_array_ptr(&rli->workers, i);
+        Slave_worker *w= *(Slave_worker **) dynamic_array_ptr(&rli->workers, i);
         w->relay_log_change_notified= FALSE;
       }
 

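Among the rpl_slave.cc changes, handle_slave_worker() now tallies both the
number and the byte size of events purged at shutdown and subtracts them from
the shared counters under the Coordinator's lock, keeping the size watermark
exact. A minimal sketch of that accounting, with standard C++ types standing
in for the server's (illustrative names):

#include <cstdint>
#include <deque>
#include <mutex>

struct Event { uint64_t data_written; };

struct Shared
{
  std::mutex pending_jobs_lock;
  long pending_jobs= 0;
  uint64_t mts_pending_jobs_size= 0;
};

static void purge_on_stop(std::deque<Event*> &jobs, Shared &rli)
{
  unsigned long purge_cnt= 0;
  uint64_t purge_size= 0;
  while (!jobs.empty())
  {
    Event *ev= jobs.front();
    jobs.pop_front();
    purge_cnt++;
    purge_size+= ev->data_written;        // per-event size tally
    delete ev;
  }
  std::lock_guard<std::mutex> guard(rli.pending_jobs_lock);
  rli.pending_jobs-= purge_cnt;
  rli.mts_pending_jobs_size-= purge_size;
}

int main()
{
  Shared rli;
  rli.pending_jobs= 2;
  rli.mts_pending_jobs_size= 30;
  std::deque<Event*> q= {new Event{10}, new Event{20}};
  purge_on_stop(q, rli);
  return (int) rli.pending_jobs;          // 0 when the accounting balances
}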
=== modified file 'sql/sys_vars.cc'
--- a/sql/sys_vars.cc	2010-11-26 21:08:30 +0000
+++ b/sql/sys_vars.cc	2010-11-30 14:02:15 +0000
@@ -3109,6 +3109,9 @@ static Sys_var_ulong Sys_slave_parallel_
        "Number of worker threads for executing events in parallel ",
        GLOBAL_VAR(slave_parallel_workers), CMD_LINE(REQUIRED_ARG),
        VALID_RANGE(0, ULONG_MAX), DEFAULT(4), BLOCK_SIZE(1));
+
+// TODO: redefine slave_max_pending_jobs
+
 static Sys_var_ulong Sys_slave_max_pending_jobs(
        "slave_max_pending_jobs",
        "Number of replication events read out of Relay log and still not applied. "
@@ -3119,7 +3122,7 @@ static Sys_var_ulong Sys_slave_max_pendi
 static Sys_var_mybool Sys_slave_local_timestamp(
        "slave_local_timestamp", "if enabled slave computes the event appying "
        "time value to implicitly affected timestamp columms. Otherwise (default) "
-       "installs prescribed by the master value.",
+       "installs the value prescribed by the master",
        GLOBAL_VAR(slave_local_timestamp_opt), CMD_LINE(OPT_ARG), DEFAULT(FALSE));
 static Sys_var_mybool Sys_slave_run_query_in_parallel(
        "slave_run_query_in_parallel",
@@ -3127,6 +3130,32 @@ static Sys_var_mybool Sys_slave_run_quer
        "for parallel execution of Query_log_event ",
        GLOBAL_VAR(opt_slave_run_query_in_parallel), CMD_LINE(OPT_ARG),
        DEFAULT(FALSE));
+static Sys_var_ulong Sys_mts_partition_hash_soft_max(
+       "opt_mts_partition_hash_soft_max",
+       "Number of records in the mts partition hash below which "
+       "entries with zero usage are tolerated",
+       GLOBAL_VAR(opt_mts_partition_hash_soft_max), CMD_LINE(REQUIRED_ARG),
+       VALID_RANGE(0, ULONG_MAX), DEFAULT(16), BLOCK_SIZE(1));
+static Sys_var_ulonglong Sys_mts_pending_jobs_size_max(
+       "opt_mts_pending_jobs_size_max",
+       "Max size of Slave Worker queues holding not yet applied events. "
+       "The minimum value must not be less than the master's "
+       "max_allowed_packet.",
+       GLOBAL_VAR(opt_mts_pending_jobs_size_max), CMD_LINE(REQUIRED_ARG),
+       VALID_RANGE(1024, (ulonglong)~(intptr)0), DEFAULT(16 * 1024*1024),
+       BLOCK_SIZE(1024), ON_CHECK(0));
+static Sys_var_ulong Sys_mts_coordinator_basic_nap(
+       "opt_mts_coordinator_basic_nap",
+       "Time in msec for the MTS Coordinator to sleep in order to avoid "
+       "overrunning the Worker queues",
+       GLOBAL_VAR(opt_mts_coordinator_basic_nap), CMD_LINE(REQUIRED_ARG),
+       VALID_RANGE(0, ULONG_MAX), DEFAULT(0), BLOCK_SIZE(1));
+static Sys_var_ulong Sys_mts_worker_underrun_level(
+       "opt_mts_worker_underrun_level",
+       "Percent of the Worker queue size at which a Worker is considered "
+       "to be hungry",
+       GLOBAL_VAR(opt_mts_worker_underrun_level), CMD_LINE(REQUIRED_ARG),
+       VALID_RANGE(0, 100), DEFAULT(0), BLOCK_SIZE(1));
 #endif
 
 static bool check_locale(sys_var *self, THD *thd, set_var *var)


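The help text for Sys_mts_pending_jobs_size_max says the value must not be
less than the master's max_allowed_packet, but the definition passes
ON_CHECK(0), so nothing enforces that yet. A hedged sketch of the shape such
a check could take (hypothetical names and a simplified signature; the real
on-check hook receives sys_var/THD/set_var arguments):

#include <cassert>
#include <cstdint>

static uint64_t master_max_allowed_packet= 1ULL << 20;  // stand-in value

// sys_var check hooks return true on error; this one would reject a cap
// smaller than the master's max_allowed_packet
static bool check_mts_pending_jobs_size_max(uint64_t new_value)
{
  return new_value < master_max_allowed_packet;
}

int main()
{
  assert(!check_mts_pending_jobs_size_max(16ULL << 20)); // default 16M: ok
  assert(check_mts_pending_jobs_size_max(1024));         // too small: error
  return 0;
}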
Attachment: [text/bzr-bundle] bzr/andrei.elkin@oracle.com-20101130143940-k9kypuuu6ysyau6w.bundle