List:Commits« Previous MessageNext Message »
From:Andrei Elkin Date:September 12 2010 11:04am
Subject:bzr commit into mysql-next-mr branch (aelkin:3197) WL#5563
View as plain text  
#At file:///home/andrei/MySQL/BZR/2a-23May/WL/wl5563-paraslave_part_db/ based on revid:aelkin@stripped

 3197 Andrei Elkin	2010-09-12
      wl#5563
      
      Rows-event parallelization basically is implemented although tested shallowly. Write access to rli central stuct by workers may be be eliminated entirely at this phase. E.g that relates to errors. (todo: leave rli out of Worker scope

    modified:
      sql/log_event.cc
      sql/rpl_rli.cc
      sql/rpl_rli.h
      sql/rpl_slave.cc
=== modified file 'sql/log_event.cc'
--- a/sql/log_event.cc	2010-09-11 14:00:08 +0000
+++ b/sql/log_event.cc	2010-09-12 11:04:38 +0000
@@ -3436,7 +3436,7 @@ int Query_log_event::do_apply_event(Rela
   DBUG_PRINT("info", ("log_pos: %lu", (ulong) log_pos));
 
   clear_all_errors(thd, const_cast<Relay_log_info*>(rli));
-  if (strcmp("COMMIT", query) == 0 && rli->tables_to_lock)
+  if (strcmp("COMMIT", query) == 0 && *(rli->get_tables_to_lock()) != NULL)
   {
     /*
       Cleaning-up the last statement context:
@@ -7770,7 +7770,7 @@ int Rows_log_event::do_apply_event(Relay
     do_apply_event(). We still check here to prevent future coding
     errors.
   */
-  DBUG_ASSERT(rli->sql_thd == thd);
+  DBUG_ASSERT(rli->sql_thd == thd || rli->is_parallel_exec());
 
   /*
     If there is no locks taken, this is the first binrow event seen
@@ -7822,7 +7822,7 @@ int Rows_log_event::do_apply_event(Relay
     /* A small test to verify that objects have consistent types */
     DBUG_ASSERT(sizeof(thd->variables.option_bits) == sizeof(OPTION_RELAXED_UNIQUE_CHECKS));
 
-    if (open_and_lock_tables(thd, rli->tables_to_lock, FALSE, 0))
+    if (open_and_lock_tables(thd, *rli->get_tables_to_lock(), FALSE, 0))
     {
       uint actual_error= thd->stmt_da->sql_errno();
       if (thd->is_slave_error || thd->is_fatal_error)
@@ -7853,8 +7853,8 @@ int Rows_log_event::do_apply_event(Relay
 
     {
       DBUG_PRINT("debug", ("Checking compability of tables to lock - tables_to_lock: %p",
-                           rli->tables_to_lock));
-      RPL_TABLE_LIST *ptr= rli->tables_to_lock;
+                           *rli->get_tables_to_lock()));
+      RPL_TABLE_LIST *ptr= *rli->get_tables_to_lock();
       for ( ; ptr ; ptr= static_cast<RPL_TABLE_LIST*>(ptr->next_global))
       {
         TABLE *conv_table;
@@ -7894,19 +7894,23 @@ int Rows_log_event::do_apply_event(Relay
       Rows_log_event, we can invalidate the query cache for the
       associated table.
      */
-    for (TABLE_LIST *ptr= rli->tables_to_lock ; ptr ; ptr= ptr->next_global)
+    for (TABLE_LIST *ptr= *rli->get_tables_to_lock() ; ptr ; ptr= ptr->next_global)
     {
-      const_cast<Relay_log_info*>(rli)->m_table_map.set_table(ptr->table_id, ptr->table);
+      if (!rli->is_parallel_exec())
+        const_cast<Relay_log_info*>(rli)->m_table_map.set_table(ptr->table_id, ptr->table);
+      else
+        rli->get_current_worker()->m_table_map.set_table(ptr->table_id, ptr->table);
     }
 #ifdef HAVE_QUERY_CACHE
-    query_cache.invalidate_locked_for_write(rli->tables_to_lock);
+    query_cache.invalidate_locked_for_write(*rli->get_tables_to_lock());
 #endif
   }
 
   TABLE* 
-    table= 
-    m_table= const_cast<Relay_log_info*>(rli)->m_table_map.get_table(m_table_id);
-
+    table= m_table= (!rli->is_parallel_exec()) ?
+    const_cast<Relay_log_info*>(rli)->m_table_map.get_table(m_table_id) :
+    rli->get_current_worker()->m_table_map.get_table(m_table_id);
+  
   DBUG_PRINT("debug", ("m_table: 0x%lx, m_table_id: %lu", (ulong) m_table, m_table_id));
 
   if (table)
@@ -8626,6 +8630,8 @@ int Table_map_log_event::do_apply_event(
 
   int error= 0;
 
+  // mts-II todo: consider filtering
+
   if (rli->sql_thd->slave_thread /* filtering is for slave only */ &&
       (!rpl_filter->db_ok(table_list->db) ||
        (rpl_filter->is_on() && !rpl_filter->tables_ok("", table_list))))
@@ -8655,9 +8661,12 @@ int Table_map_log_event::do_apply_event(
       We record in the slave's information that the table should be
       locked by linking the table into the list of tables to lock.
     */
-    table_list->next_global= table_list->next_local= rli->tables_to_lock;
-    const_cast<Relay_log_info*>(rli)->tables_to_lock= table_list;
-    const_cast<Relay_log_info*>(rli)->tables_to_lock_count++;
+    table_list->next_global= table_list->next_local= *rli->get_tables_to_lock();
+
+    //const_cast<Relay_log_info*>(rli)->tables_to_lock= table_list;
+    //const_cast<Relay_log_info*>(rli)->tables_to_lock_count++;
+    const_cast<Relay_log_info*>(rli)->add_table_to_lock(table_list);
+
     /* 'memory' is freed in clear_tables_to_lock */
   }
 

=== modified file 'sql/rpl_rli.cc'
--- a/sql/rpl_rli.cc	2010-09-09 18:43:16 +0000
+++ b/sql/rpl_rli.cc	2010-09-12 11:04:38 +0000
@@ -110,7 +110,42 @@ Relay_log_info::~Relay_log_info()
   DBUG_VOID_RETURN;
 }
 
+Slave_worker* Relay_log_info::get_current_worker() const
+{ 
+  uint i;
+  Slave_worker* w_i;
+  if (!is_parallel_exec())
+    return NULL;
+  for (i= 0; i< workers.elements; i++)
+  {
+    // convert to use hashing
+    get_dynamic(const_cast<DYNAMIC_ARRAY*>(&workers), (uchar*) &w_i, i);
+    if (w_i->thd == current_thd)
+      return w_i;
+  }
+  DBUG_ASSERT(0);;
+}
+
+RPL_TABLE_LIST** Relay_log_info::get_tables_to_lock() const
+{
+  return
+    is_parallel_exec() ?
+    &get_current_worker()->tables_to_lock :
+    const_cast<RPL_TABLE_LIST**>(&tables_to_lock);
+}
 
+uint Relay_log_info::add_table_to_lock(RPL_TABLE_LIST *table_list)
+{
+  if (!is_parallel_exec())
+    tables_to_lock= table_list;
+  else
+    get_current_worker()->tables_to_lock= table_list;
+  return 
+    (!is_parallel_exec()) ? 
+    tables_to_lock_count++ : 
+    get_current_worker()->tables_to_lock_count++;
+}
+             
 /**
   Wrapper around Relay_log_info::init(const char *).
 
@@ -1296,7 +1331,7 @@ void Relay_log_info::cleanup_context(THD
 {
   DBUG_ENTER("Relay_log_info::cleanup_context");
 
-  DBUG_ASSERT(sql_thd == thd);
+  DBUG_ASSERT(sql_thd == thd || is_parallel_exec());
   /*
     1) Instances of Table_map_log_event, if ::do_apply_event() was called on them,
     may have opened tables, which we cannot be sure have been closed (because
@@ -1314,7 +1349,11 @@ void Relay_log_info::cleanup_context(THD
     trans_rollback_stmt(thd); // if a "statement transaction"
     trans_rollback(thd);      // if a "real transaction"
   }
-  m_table_map.clear_tables();
+  if (!is_parallel_exec())
+    m_table_map.clear_tables();
+  else
+    get_current_worker()->m_table_map.clear_tables();
+
   slave_close_thread_tables(thd);
   if (error)
     thd->mdl_context.release_transactional_locks();
@@ -1329,20 +1368,26 @@ void Relay_log_info::cleanup_context(THD
 
 void Relay_log_info::clear_tables_to_lock()
 {
-  while (tables_to_lock)
+  RPL_TABLE_LIST **p_tables= get_tables_to_lock();
+  Slave_worker* w_c= get_current_worker();
+  while ((*p_tables))
   {
-    uchar* to_free= reinterpret_cast<uchar*>(tables_to_lock);
-    if (tables_to_lock->m_tabledef_valid)
+    uchar* to_free= reinterpret_cast<uchar*>((*p_tables));
+    if ((*p_tables)->m_tabledef_valid)
     {
-      tables_to_lock->m_tabledef.table_def::~table_def();
-      tables_to_lock->m_tabledef_valid= FALSE;
+      (*p_tables)->m_tabledef.table_def::~table_def();
+      (*p_tables)->m_tabledef_valid= FALSE;
     }
-    tables_to_lock=
-      static_cast<RPL_TABLE_LIST*>(tables_to_lock->next_global);
-    tables_to_lock_count--;
+    (*p_tables)=
+      static_cast<RPL_TABLE_LIST*>((*p_tables)->next_global);
+    if (w_c)
+      w_c->tables_to_lock_count--;
+    else
+      tables_to_lock_count--;
     my_free(to_free);
   }
-  DBUG_ASSERT(tables_to_lock == NULL && tables_to_lock_count == 0);
+  DBUG_ASSERT((*p_tables) == NULL &&
+              ((!w_c && tables_to_lock_count == 0) || w_c->tables_to_lock_count == 0));
 }
 
 void Relay_log_info::slave_close_thread_tables(THD *thd)

=== modified file 'sql/rpl_rli.h'
--- a/sql/rpl_rli.h	2010-09-10 17:32:39 +0000
+++ b/sql/rpl_rli.h	2010-09-12 11:04:38 +0000
@@ -35,7 +35,7 @@ struct slave_job_item
   void *data;
 };
 
-struct slave_worker
+typedef struct slave_worker
 {
   // operational
   THD *thd;
@@ -47,12 +47,17 @@ struct slave_worker
   ulong id;
   TABLE *current_table;
 
+  // rbr
+  RPL_TABLE_LIST *tables_to_lock;           /* RBR: Tables to lock  */
+  uint tables_to_lock_count;        /* RBR: Count of tables to lock */
+  table_mapping m_table_map;      /* RBR: Mapping table-id to table */
+
   // statictics
   ulong wait_jobs;  // to gather statistics how many times got idle
   ulong stmt_jobs;  // how many jobs per stmt
   ulong trans_jobs;  // how many jobs per trns
   volatile int curr_jobs; // the current assignments
-};
+} Slave_worker;
 
 class List_jobs : public List<struct slave_job_item>
 {
@@ -282,7 +287,7 @@ public:
   ulong slave_exec_mode;
 
   /*
-    TODO/WL4648: the Worker pool as a dyn array of THD
+    WL#5563 mts-II
   */
   DYNAMIC_ARRAY workers; // number's is determined by global slave_parallel_workers
   volatile int pending_jobs;
@@ -309,6 +314,10 @@ public:
   List<slave_job_item> free_jobs;
   // MEM_ROOT jobs_mem_root;
   MEM_ROOT job_list_mem_root; // similar to worker's mem_root
+  Slave_worker* get_current_worker() const;
+  RPL_TABLE_LIST** get_tables_to_lock() const;
+  uint add_table_to_lock(RPL_TABLE_LIST *table_list);
+
   THD * sql_thd;
 
 #ifndef DBUG_OFF
@@ -429,8 +438,9 @@ public:
 
   bool get_table_data(TABLE *table_arg, table_def **tabledef_var, TABLE **conv_table_var) const
   {
+    TABLE_LIST *tables= *get_tables_to_lock();
     DBUG_ASSERT(tabledef_var && conv_table_var);
-    for (TABLE_LIST *ptr= tables_to_lock ; ptr != NULL ; ptr= ptr->next_global)
+    for (TABLE_LIST *ptr= tables ; ptr != NULL ; ptr= ptr->next_global)
       if (ptr->table == table_arg)
       {
         *tabledef_var= &static_cast<RPL_TABLE_LIST*>(ptr)->m_tabledef;

=== modified file 'sql/rpl_slave.cc'
--- a/sql/rpl_slave.cc	2010-09-10 17:32:39 +0000
+++ b/sql/rpl_slave.cc	2010-09-12 11:04:38 +0000
@@ -3454,6 +3454,8 @@ pthread_handler_t handle_slave_worker(vo
   mysql_mutex_unlock(&rli->pending_jobs_lock);
   mysql_cond_signal(&rli->pending_jobs_cond); // informing the parent
   thd= (w->thd= new THD);
+  w->tables_to_lock= NULL;
+  w->tables_to_lock_count= 0;
 
   thd->thread_stack = (char*)&thd;
   


Attachment: [text/bzr-bundle] bzr/aelkin@mysql.com-20100912110438-cp4ojww0ut1ydmxz.bundle
Thread
bzr commit into mysql-next-mr branch (aelkin:3197) WL#5563Andrei Elkin12 Sep