#At file:///home/andrei/MySQL/BZR/2a-23May/WL/wl5563-paraslave_part_db/ based on revid:aelkin@stripped
3197 Andrei Elkin 2010-09-12
wl#5563
Rows-event parallelization basically is implemented although tested shallowly. Write access to rli central stuct by workers may be be eliminated entirely at this phase. E.g that relates to errors. (todo: leave rli out of Worker scope
modified:
sql/log_event.cc
sql/rpl_rli.cc
sql/rpl_rli.h
sql/rpl_slave.cc
=== modified file 'sql/log_event.cc'
--- a/sql/log_event.cc 2010-09-11 14:00:08 +0000
+++ b/sql/log_event.cc 2010-09-12 11:04:38 +0000
@@ -3436,7 +3436,7 @@ int Query_log_event::do_apply_event(Rela
DBUG_PRINT("info", ("log_pos: %lu", (ulong) log_pos));
clear_all_errors(thd, const_cast<Relay_log_info*>(rli));
- if (strcmp("COMMIT", query) == 0 && rli->tables_to_lock)
+ if (strcmp("COMMIT", query) == 0 && *(rli->get_tables_to_lock()) != NULL)
{
/*
Cleaning-up the last statement context:
@@ -7770,7 +7770,7 @@ int Rows_log_event::do_apply_event(Relay
do_apply_event(). We still check here to prevent future coding
errors.
*/
- DBUG_ASSERT(rli->sql_thd == thd);
+ DBUG_ASSERT(rli->sql_thd == thd || rli->is_parallel_exec());
/*
If there is no locks taken, this is the first binrow event seen
@@ -7822,7 +7822,7 @@ int Rows_log_event::do_apply_event(Relay
/* A small test to verify that objects have consistent types */
DBUG_ASSERT(sizeof(thd->variables.option_bits) == sizeof(OPTION_RELAXED_UNIQUE_CHECKS));
- if (open_and_lock_tables(thd, rli->tables_to_lock, FALSE, 0))
+ if (open_and_lock_tables(thd, *rli->get_tables_to_lock(), FALSE, 0))
{
uint actual_error= thd->stmt_da->sql_errno();
if (thd->is_slave_error || thd->is_fatal_error)
@@ -7853,8 +7853,8 @@ int Rows_log_event::do_apply_event(Relay
{
DBUG_PRINT("debug", ("Checking compability of tables to lock - tables_to_lock: %p",
- rli->tables_to_lock));
- RPL_TABLE_LIST *ptr= rli->tables_to_lock;
+ *rli->get_tables_to_lock()));
+ RPL_TABLE_LIST *ptr= *rli->get_tables_to_lock();
for ( ; ptr ; ptr= static_cast<RPL_TABLE_LIST*>(ptr->next_global))
{
TABLE *conv_table;
@@ -7894,19 +7894,23 @@ int Rows_log_event::do_apply_event(Relay
Rows_log_event, we can invalidate the query cache for the
associated table.
*/
- for (TABLE_LIST *ptr= rli->tables_to_lock ; ptr ; ptr= ptr->next_global)
+ for (TABLE_LIST *ptr= *rli->get_tables_to_lock() ; ptr ; ptr= ptr->next_global)
{
- const_cast<Relay_log_info*>(rli)->m_table_map.set_table(ptr->table_id, ptr->table);
+ if (!rli->is_parallel_exec())
+ const_cast<Relay_log_info*>(rli)->m_table_map.set_table(ptr->table_id, ptr->table);
+ else
+ rli->get_current_worker()->m_table_map.set_table(ptr->table_id, ptr->table);
}
#ifdef HAVE_QUERY_CACHE
- query_cache.invalidate_locked_for_write(rli->tables_to_lock);
+ query_cache.invalidate_locked_for_write(*rli->get_tables_to_lock());
#endif
}
TABLE*
- table=
- m_table= const_cast<Relay_log_info*>(rli)->m_table_map.get_table(m_table_id);
-
+ table= m_table= (!rli->is_parallel_exec()) ?
+ const_cast<Relay_log_info*>(rli)->m_table_map.get_table(m_table_id) :
+ rli->get_current_worker()->m_table_map.get_table(m_table_id);
+
DBUG_PRINT("debug", ("m_table: 0x%lx, m_table_id: %lu", (ulong) m_table, m_table_id));
if (table)
@@ -8626,6 +8630,8 @@ int Table_map_log_event::do_apply_event(
int error= 0;
+ // mts-II todo: consider filtering
+
if (rli->sql_thd->slave_thread /* filtering is for slave only */ &&
(!rpl_filter->db_ok(table_list->db) ||
(rpl_filter->is_on() && !rpl_filter->tables_ok("", table_list))))
@@ -8655,9 +8661,12 @@ int Table_map_log_event::do_apply_event(
We record in the slave's information that the table should be
locked by linking the table into the list of tables to lock.
*/
- table_list->next_global= table_list->next_local= rli->tables_to_lock;
- const_cast<Relay_log_info*>(rli)->tables_to_lock= table_list;
- const_cast<Relay_log_info*>(rli)->tables_to_lock_count++;
+ table_list->next_global= table_list->next_local= *rli->get_tables_to_lock();
+
+ //const_cast<Relay_log_info*>(rli)->tables_to_lock= table_list;
+ //const_cast<Relay_log_info*>(rli)->tables_to_lock_count++;
+ const_cast<Relay_log_info*>(rli)->add_table_to_lock(table_list);
+
/* 'memory' is freed in clear_tables_to_lock */
}
=== modified file 'sql/rpl_rli.cc'
--- a/sql/rpl_rli.cc 2010-09-09 18:43:16 +0000
+++ b/sql/rpl_rli.cc 2010-09-12 11:04:38 +0000
@@ -110,7 +110,42 @@ Relay_log_info::~Relay_log_info()
DBUG_VOID_RETURN;
}
+Slave_worker* Relay_log_info::get_current_worker() const
+{
+ uint i;
+ Slave_worker* w_i;
+ if (!is_parallel_exec())
+ return NULL;
+ for (i= 0; i< workers.elements; i++)
+ {
+ // convert to use hashing
+ get_dynamic(const_cast<DYNAMIC_ARRAY*>(&workers), (uchar*) &w_i, i);
+ if (w_i->thd == current_thd)
+ return w_i;
+ }
+ DBUG_ASSERT(0);;
+}
+
+RPL_TABLE_LIST** Relay_log_info::get_tables_to_lock() const
+{
+ return
+ is_parallel_exec() ?
+ &get_current_worker()->tables_to_lock :
+ const_cast<RPL_TABLE_LIST**>(&tables_to_lock);
+}
+uint Relay_log_info::add_table_to_lock(RPL_TABLE_LIST *table_list)
+{
+ if (!is_parallel_exec())
+ tables_to_lock= table_list;
+ else
+ get_current_worker()->tables_to_lock= table_list;
+ return
+ (!is_parallel_exec()) ?
+ tables_to_lock_count++ :
+ get_current_worker()->tables_to_lock_count++;
+}
+
/**
Wrapper around Relay_log_info::init(const char *).
@@ -1296,7 +1331,7 @@ void Relay_log_info::cleanup_context(THD
{
DBUG_ENTER("Relay_log_info::cleanup_context");
- DBUG_ASSERT(sql_thd == thd);
+ DBUG_ASSERT(sql_thd == thd || is_parallel_exec());
/*
1) Instances of Table_map_log_event, if ::do_apply_event() was called on them,
may have opened tables, which we cannot be sure have been closed (because
@@ -1314,7 +1349,11 @@ void Relay_log_info::cleanup_context(THD
trans_rollback_stmt(thd); // if a "statement transaction"
trans_rollback(thd); // if a "real transaction"
}
- m_table_map.clear_tables();
+ if (!is_parallel_exec())
+ m_table_map.clear_tables();
+ else
+ get_current_worker()->m_table_map.clear_tables();
+
slave_close_thread_tables(thd);
if (error)
thd->mdl_context.release_transactional_locks();
@@ -1329,20 +1368,26 @@ void Relay_log_info::cleanup_context(THD
void Relay_log_info::clear_tables_to_lock()
{
- while (tables_to_lock)
+ RPL_TABLE_LIST **p_tables= get_tables_to_lock();
+ Slave_worker* w_c= get_current_worker();
+ while ((*p_tables))
{
- uchar* to_free= reinterpret_cast<uchar*>(tables_to_lock);
- if (tables_to_lock->m_tabledef_valid)
+ uchar* to_free= reinterpret_cast<uchar*>((*p_tables));
+ if ((*p_tables)->m_tabledef_valid)
{
- tables_to_lock->m_tabledef.table_def::~table_def();
- tables_to_lock->m_tabledef_valid= FALSE;
+ (*p_tables)->m_tabledef.table_def::~table_def();
+ (*p_tables)->m_tabledef_valid= FALSE;
}
- tables_to_lock=
- static_cast<RPL_TABLE_LIST*>(tables_to_lock->next_global);
- tables_to_lock_count--;
+ (*p_tables)=
+ static_cast<RPL_TABLE_LIST*>((*p_tables)->next_global);
+ if (w_c)
+ w_c->tables_to_lock_count--;
+ else
+ tables_to_lock_count--;
my_free(to_free);
}
- DBUG_ASSERT(tables_to_lock == NULL && tables_to_lock_count == 0);
+ DBUG_ASSERT((*p_tables) == NULL &&
+ ((!w_c && tables_to_lock_count == 0) || w_c->tables_to_lock_count == 0));
}
void Relay_log_info::slave_close_thread_tables(THD *thd)
=== modified file 'sql/rpl_rli.h'
--- a/sql/rpl_rli.h 2010-09-10 17:32:39 +0000
+++ b/sql/rpl_rli.h 2010-09-12 11:04:38 +0000
@@ -35,7 +35,7 @@ struct slave_job_item
void *data;
};
-struct slave_worker
+typedef struct slave_worker
{
// operational
THD *thd;
@@ -47,12 +47,17 @@ struct slave_worker
ulong id;
TABLE *current_table;
+ // rbr
+ RPL_TABLE_LIST *tables_to_lock; /* RBR: Tables to lock */
+ uint tables_to_lock_count; /* RBR: Count of tables to lock */
+ table_mapping m_table_map; /* RBR: Mapping table-id to table */
+
// statictics
ulong wait_jobs; // to gather statistics how many times got idle
ulong stmt_jobs; // how many jobs per stmt
ulong trans_jobs; // how many jobs per trns
volatile int curr_jobs; // the current assignments
-};
+} Slave_worker;
class List_jobs : public List<struct slave_job_item>
{
@@ -282,7 +287,7 @@ public:
ulong slave_exec_mode;
/*
- TODO/WL4648: the Worker pool as a dyn array of THD
+ WL#5563 mts-II
*/
DYNAMIC_ARRAY workers; // number's is determined by global slave_parallel_workers
volatile int pending_jobs;
@@ -309,6 +314,10 @@ public:
List<slave_job_item> free_jobs;
// MEM_ROOT jobs_mem_root;
MEM_ROOT job_list_mem_root; // similar to worker's mem_root
+ Slave_worker* get_current_worker() const;
+ RPL_TABLE_LIST** get_tables_to_lock() const;
+ uint add_table_to_lock(RPL_TABLE_LIST *table_list);
+
THD * sql_thd;
#ifndef DBUG_OFF
@@ -429,8 +438,9 @@ public:
bool get_table_data(TABLE *table_arg, table_def **tabledef_var, TABLE **conv_table_var) const
{
+ TABLE_LIST *tables= *get_tables_to_lock();
DBUG_ASSERT(tabledef_var && conv_table_var);
- for (TABLE_LIST *ptr= tables_to_lock ; ptr != NULL ; ptr= ptr->next_global)
+ for (TABLE_LIST *ptr= tables ; ptr != NULL ; ptr= ptr->next_global)
if (ptr->table == table_arg)
{
*tabledef_var= &static_cast<RPL_TABLE_LIST*>(ptr)->m_tabledef;
=== modified file 'sql/rpl_slave.cc'
--- a/sql/rpl_slave.cc 2010-09-10 17:32:39 +0000
+++ b/sql/rpl_slave.cc 2010-09-12 11:04:38 +0000
@@ -3454,6 +3454,8 @@ pthread_handler_t handle_slave_worker(vo
mysql_mutex_unlock(&rli->pending_jobs_lock);
mysql_cond_signal(&rli->pending_jobs_cond); // informing the parent
thd= (w->thd= new THD);
+ w->tables_to_lock= NULL;
+ w->tables_to_lock_count= 0;
thd->thread_stack = (char*)&thd;
Attachment: [text/bzr-bundle] bzr/aelkin@mysql.com-20100912110438-cp4ojww0ut1ydmxz.bundle
| Thread |
|---|
| • bzr commit into mysql-next-mr branch (aelkin:3197) WL#5563 | Andrei Elkin | 12 Sep |