3620 magnus.blaudd@stripped 2011-10-31 [merge]
Merge in latest ndbcluster with schema dist refactorings
added:
sql/ndb_event_data.cc
sql/ndb_event_data.h
modified:
sql/ha_ndbcluster.cc
sql/ha_ndbcluster.h
sql/ha_ndbcluster_binlog.cc
sql/ha_ndbcluster_binlog.h
sql/ha_ndbcluster_glue.h
sql/ndb_thd.cc
sql/ndb_thd.h
storage/ndb/CMakeLists.txt
3619 magnus.blaudd@stripped 2011-10-27 [merge]
Merge
added:
sql/ndb_anyvalue.cc
sql/ndb_anyvalue.h
sql/ndb_binlog_extra_row_info.cc
sql/ndb_binlog_extra_row_info.h
sql/ndb_ndbapi_util.cc
sql/ndb_ndbapi_util.h
modified:
sql/ha_ndbcluster.cc
sql/ha_ndbcluster.h
sql/ha_ndbcluster_binlog.cc
sql/ha_ndbcluster_binlog.h
sql/ha_ndbcluster_connection.cc
sql/ndb_share.h
storage/ndb/CMakeLists.txt
=== modified file 'sql/ha_ndbcluster.cc'
--- a/sql/ha_ndbcluster.cc 2011-10-27 11:57:04 +0000
+++ b/sql/ha_ndbcluster.cc 2011-10-29 09:02:21 +0000
@@ -53,6 +53,7 @@
#include "ndb_conflict_trans.h"
#include "ndb_anyvalue.h"
#include "ndb_binlog_extra_row_info.h"
+#include "ndb_event_data.h"
// ndb interface initialization/cleanup
extern "C" void ndb_init_internal();
@@ -8760,6 +8761,31 @@ ha_ndbcluster::start_transaction_part_id
/**
+ Static error print function called from static handler method
+ ndbcluster_commit and ndbcluster_rollback.
+*/
+static void
+ndbcluster_print_error(int error, const NdbOperation *error_op)
+{
+ DBUG_ENTER("ndbcluster_print_error");
+ TABLE_SHARE share;
+ const char *tab_name= (error_op) ? error_op->getTableName() : "";
+ if (tab_name == NULL)
+ {
+ DBUG_ASSERT(tab_name != NULL);
+ tab_name= "";
+ }
+ share.db.str= (char*) "";
+ share.db.length= 0;
+ share.table_name.str= (char *) tab_name;
+ share.table_name.length= strlen(tab_name);
+ ha_ndbcluster error_handler(ndbcluster_hton, &share);
+ error_handler.print_error(error, MYF(0));
+ DBUG_VOID_RETURN;
+}
+
+
+/**
Commit a transaction started in NDB.
*/
@@ -11841,7 +11867,7 @@ static int ndbcluster_close_connection(h
/**
Try to discover one table from NDB.
*/
-
+static
int ndbcluster_discover(handlerton *hton, THD* thd, const char *db,
const char *name,
uchar **frmblob,
@@ -11962,7 +11988,7 @@ err:
/**
Check if a table exists in NDB.
*/
-
+static
int ndbcluster_table_exists_in_engine(handlerton *hton, THD* thd,
const char *db,
const char *name)
@@ -12751,30 +12777,6 @@ void ha_ndbcluster::print_error(int erro
/**
- Static error print function called from static handler method
- ndbcluster_commit and ndbcluster_rollback.
-*/
-
-void ndbcluster_print_error(int error, const NdbOperation *error_op)
-{
- DBUG_ENTER("ndbcluster_print_error");
- TABLE_SHARE share;
- const char *tab_name= (error_op) ? error_op->getTableName() : "";
- if (tab_name == NULL)
- {
- DBUG_ASSERT(tab_name != NULL);
- tab_name= "";
- }
- share.db.str= (char*) "";
- share.db.length= 0;
- share.table_name.str= (char *) tab_name;
- share.table_name.length= strlen(tab_name);
- ha_ndbcluster error_handler(ndbcluster_hton, &share);
- error_handler.print_error(error, MYF(0));
- DBUG_VOID_RETURN;
-}
-
-/**
Set a given location from full pathname to database name.
*/
=== modified file 'sql/ha_ndbcluster.h'
--- a/sql/ha_ndbcluster.h 2011-10-27 11:48:43 +0000
+++ b/sql/ha_ndbcluster.h 2011-10-28 07:14:45 +0000
@@ -42,7 +42,6 @@ class NdbBlob;
class NdbIndexStat;
class NdbEventOperation;
class ha_ndbcluster_cond;
-class Ndb_event_data;
class NdbQuery;
class NdbQueryOperation;
class NdbQueryOperationTypeWrapper;
@@ -497,9 +496,6 @@ private:
const uchar *record,
bool use_active_index);
friend int ndbcluster_drop_database_impl(THD *thd, const char *path);
- friend int ndb_handle_schema_change(THD *thd,
- Ndb *ndb, NdbEventOperation *pOp,
- NDB_SHARE *share);
void check_read_before_write_removal();
static int drop_table_impl(THD *thd, ha_ndbcluster *h, Ndb *ndb,
@@ -814,12 +810,6 @@ private:
int add_handler_to_open_tables(THD*, Thd_ndb*, ha_ndbcluster* handler);
};
-int ndbcluster_discover(THD* thd, const char* dbname, const char* name,
- const void** frmblob, uint* frmlen);
-int ndbcluster_table_exists_in_engine(THD* thd,
- const char *db, const char *name);
-void ndbcluster_print_error(int error, const NdbOperation *error_op);
-
static const char ndbcluster_hton_name[]= "ndbcluster";
static const int ndbcluster_hton_name_length=sizeof(ndbcluster_hton_name)-1;
extern int ndbcluster_terminating;
=== modified file 'sql/ha_ndbcluster_binlog.cc'
--- a/sql/ha_ndbcluster_binlog.cc 2011-10-27 09:34:06 +0000
+++ b/sql/ha_ndbcluster_binlog.cc 2011-10-29 14:21:55 +0000
@@ -58,6 +58,7 @@ bool ndb_log_empty_epochs(void);
#include "ndb_dist_priv_util.h"
#include "ndb_anyvalue.h"
#include "ndb_binlog_extra_row_info.h"
+#include "ndb_event_data.h"
/*
Timeout for syncing schema events between
@@ -141,9 +142,9 @@ static int ndbcluster_binlog_terminating
Mutex and condition used for interacting between client sql thread
and injector thread
*/
-pthread_t ndb_binlog_thread;
-pthread_mutex_t injector_mutex;
-pthread_cond_t injector_cond;
+static pthread_t ndb_binlog_thread;
+static pthread_mutex_t injector_mutex;
+static pthread_cond_t injector_cond;
/* NDB Injector thread (used for binlog creation) */
static ulonglong ndb_latest_applied_binlog_epoch= 0;
@@ -152,7 +153,7 @@ static ulonglong ndb_latest_received_bin
NDB_SHARE *ndb_apply_status_share= 0;
NDB_SHARE *ndb_schema_share= 0;
-pthread_mutex_t ndb_schema_share_mutex;
+static pthread_mutex_t ndb_schema_share_mutex;
extern my_bool opt_log_slave_updates;
static my_bool g_ndb_log_slave_updates;
@@ -260,20 +261,6 @@ static void dbug_print_table(const char
#define dbug_print_table(a,b)
#endif
-static inline void
-print_warning_list(const char* prefix, List<MYSQL_ERROR>& list)
-{
- List_iterator_fast<MYSQL_ERROR> it(list);
- MYSQL_ERROR *err;
- while ((err= it++))
- {
- sql_print_warning("%s: (%d)%s",
- prefix,
- MYSQL_ERROR_get_sql_errno(err),
- MYSQL_ERROR_get_message_text(err));
- }
-}
-
static void run_query(THD *thd, char *buf, char *end,
const int *no_print_error)
@@ -2087,7 +2074,7 @@ end:
static
int
ndb_handle_schema_change(THD *thd, Ndb *is_ndb, NdbEventOperation *pOp,
- Ndb_event_data *event_data)
+ const Ndb_event_data *event_data)
{
DBUG_ENTER("ndb_handle_schema_change");
NDB_SHARE *share= event_data->share;
@@ -2207,14 +2194,87 @@ private:
class Ndb_schema_event_handler {
- struct Cluster_schema
+ class Ndb_schema_op
{
+ // Unpack Ndb_schema_op from event_data pointer
+ void unpack_event(const Ndb_event_data *event_data)
+ {
+ TABLE *table= event_data->shadow_table;
+ Field **field;
+ /* unpack blob values */
+ uchar* blobs_buffer= 0;
+ uint blobs_buffer_size= 0;
+ my_bitmap_map *old_map= dbug_tmp_use_all_columns(table, table->read_set);
+ {
+ ptrdiff_t ptrdiff= 0;
+ int ret= get_ndb_blobs_value(table, event_data->ndb_value[0],
+ blobs_buffer, blobs_buffer_size,
+ ptrdiff);
+ if (ret != 0)
+ {
+ my_free(blobs_buffer, MYF(MY_ALLOW_ZERO_PTR));
+ DBUG_PRINT("info", ("blob read error"));
+ DBUG_ASSERT(FALSE);
+ }
+ }
+ /* db varchar 1 length uchar */
+ field= table->field;
+ db_length= *(uint8*)(*field)->ptr;
+ DBUG_ASSERT(db_length <= (*field)->field_length);
+ DBUG_ASSERT((*field)->field_length + 1 == sizeof(db));
+ memcpy(db, (*field)->ptr + 1, db_length);
+ db[db_length]= 0;
+ /* name varchar 1 length uchar */
+ field++;
+ name_length= *(uint8*)(*field)->ptr;
+ DBUG_ASSERT(name_length <= (*field)->field_length);
+ DBUG_ASSERT((*field)->field_length + 1 == sizeof(name));
+ memcpy(name, (*field)->ptr + 1, name_length);
+ name[name_length]= 0;
+ /* slock fixed length */
+ field++;
+ slock_length= (*field)->field_length;
+ DBUG_ASSERT((*field)->field_length == sizeof(slock_buf));
+ memcpy(slock_buf, (*field)->ptr, slock_length);
+ /* query blob */
+ field++;
+ {
+ Field_blob *field_blob= (Field_blob*)(*field);
+ uint blob_len= field_blob->get_length((*field)->ptr);
+ uchar *blob_ptr= 0;
+ field_blob->get_ptr(&blob_ptr);
+ DBUG_ASSERT(blob_len == 0 || blob_ptr != 0);
+ query_length= blob_len;
+ query= sql_strmake((char*) blob_ptr, blob_len);
+ }
+ /* node_id */
+ field++;
+ node_id= (Uint32)((Field_long *)*field)->val_int();
+ /* epoch */
+ field++;
+ epoch= ((Field_long *)*field)->val_int();
+ /* id */
+ field++;
+ id= (Uint32)((Field_long *)*field)->val_int();
+ /* version */
+ field++;
+ version= (Uint32)((Field_long *)*field)->val_int();
+ /* type */
+ field++;
+ type= (Uint32)((Field_long *)*field)->val_int();
+ /* free blobs buffer */
+ my_free(blobs_buffer, MYF(MY_ALLOW_ZERO_PTR));
+ dbug_tmp_restore_column_map(table->read_set, old_map);
+ }
+
+ public:
uchar db_length;
char db[64];
uchar name_length;
char name[64];
uchar slock_length;
- uint32 slock[SCHEMA_SLOCK_SIZE/4];
+ uint32 slock_buf[SCHEMA_SLOCK_SIZE/4];
+ MY_BITMAP slock;
unsigned short query_length;
char *query;
Uint64 epoch;
@@ -2223,99 +2283,52 @@ class Ndb_schema_event_handler {
uint32 version;
uint32 type;
uint32 any_value;
- };
+ /**
+ Create a Ndb_schema_op from event_data
+ */
+ static Ndb_schema_op*
+ create(const Ndb_event_data* event_data,
+ Uint32 any_value)
+ {
+ DBUG_ENTER("Ndb_schema_op::create");
+ Ndb_schema_op* schema_op=
+ (Ndb_schema_op*)sql_alloc(sizeof(Ndb_schema_op));
+ bitmap_init(&schema_op->slock,
+ schema_op->slock_buf, 8*SCHEMA_SLOCK_SIZE, FALSE);
+ schema_op->unpack_event(event_data);
+ schema_op->any_value= any_value;
+ DBUG_PRINT("exit", ("%s.%s: query: '%s' type: %d",
+ schema_op->db, schema_op->name,
+ schema_op->query,
+ schema_op->type));
+ DBUG_RETURN(schema_op);
+ }
+ };
static void
print_could_not_discover_error(THD *thd,
- const Cluster_schema *schema)
+ const Ndb_schema_op *schema)
{
sql_print_error("NDB Binlog: Could not discover table '%s.%s' from "
"binlog schema event '%s' from node %d. "
"my_errno: %d",
schema->db, schema->name, schema->query,
schema->node_id, my_errno);
- print_warning_list("NDB Binlog", thd_warn_list(thd));
+ thd_print_warning_list(thd, "NDB Binlog");
}
- /*
- Transfer schema table event data into Cluster_schema struct
- */
- static void ndbcluster_get_schema(const Ndb_event_data *event_data,
- Cluster_schema *s)
+ static void
+ write_schema_op_to_binlog(THD *thd, Ndb_schema_op *schema)
{
- TABLE *table= event_data->shadow_table;
- Field **field;
- /* unpack blob values */
- uchar* blobs_buffer= 0;
- uint blobs_buffer_size= 0;
- my_bitmap_map *old_map= dbug_tmp_use_all_columns(table, table->read_set);
- {
- ptrdiff_t ptrdiff= 0;
- int ret= get_ndb_blobs_value(table, event_data->ndb_value[0],
- blobs_buffer, blobs_buffer_size,
- ptrdiff);
- if (ret != 0)
- {
- my_free(blobs_buffer, MYF(MY_ALLOW_ZERO_PTR));
- DBUG_PRINT("info", ("blob read error"));
- DBUG_ASSERT(FALSE);
- }
- }
- /* db varchar 1 length uchar */
- field= table->field;
- s->db_length= *(uint8*)(*field)->ptr;
- DBUG_ASSERT(s->db_length <= (*field)->field_length);
- DBUG_ASSERT((*field)->field_length + 1 == sizeof(s->db));
- memcpy(s->db, (*field)->ptr + 1, s->db_length);
- s->db[s->db_length]= 0;
- /* name varchar 1 length uchar */
- field++;
- s->name_length= *(uint8*)(*field)->ptr;
- DBUG_ASSERT(s->name_length <= (*field)->field_length);
- DBUG_ASSERT((*field)->field_length + 1 == sizeof(s->name));
- memcpy(s->name, (*field)->ptr + 1, s->name_length);
- s->name[s->name_length]= 0;
- /* slock fixed length */
- field++;
- s->slock_length= (*field)->field_length;
- DBUG_ASSERT((*field)->field_length == sizeof(s->slock));
- memcpy(s->slock, (*field)->ptr, s->slock_length);
- /* query blob */
- field++;
- {
- Field_blob *field_blob= (Field_blob*)(*field);
- uint blob_len= field_blob->get_length((*field)->ptr);
- uchar *blob_ptr= 0;
- field_blob->get_ptr(&blob_ptr);
- DBUG_ASSERT(blob_len == 0 || blob_ptr != 0);
- s->query_length= blob_len;
- s->query= sql_strmake((char*) blob_ptr, blob_len);
- }
- /* node_id */
- field++;
- s->node_id= (Uint32)((Field_long *)*field)->val_int();
- /* epoch */
- field++;
- s->epoch= ((Field_long *)*field)->val_int();
- /* id */
- field++;
- s->id= (Uint32)((Field_long *)*field)->val_int();
- /* version */
- field++;
- s->version= (Uint32)((Field_long *)*field)->val_int();
- /* type */
- field++;
- s->type= (Uint32)((Field_long *)*field)->val_int();
- /* free blobs buffer */
- my_free(blobs_buffer, MYF(MY_ALLOW_ZERO_PTR));
- dbug_tmp_restore_column_map(table->read_set, old_map);
- }
+ if (!ndb_binlog_running)
+ {
+ // This mysqld is not writing a binlog
+ return;
+ }
- static void ndb_binlog_query(THD *thd, Cluster_schema *schema)
- {
/* any_value == 0 means local cluster sourced change that
* should be logged
*/
@@ -2379,24 +2392,23 @@ class Ndb_schema_event_handler {
/*
- acknowledge handling of schema operation
+ Acknowledge handling of schema operation
+ - Inform the other nodes that schema op has
+ been completed by this node (by updating the
+ row for this op in ndb_schema table)
*/
- static int
- ndbcluster_update_slock(THD *thd,
- const char *db,
- const char *table_name,
- uint32 table_id,
- uint32 table_version)
+ int
+ ack_schema_op(const char *db, const char *table_name,
+ uint32 table_id, uint32 table_version)
{
- DBUG_ENTER("ndbcluster_update_slock");
+ DBUG_ENTER("ack_schema_op");
if (!ndb_schema_share)
{
DBUG_RETURN(0);
}
const NdbError *ndb_error= 0;
- uint32 node_id= g_ndb_cluster_connection->node_id();
- Ndb *ndb= check_ndb_in_thd(thd);
+ Ndb *ndb= check_ndb_in_thd(m_thd);
char save_db[FN_HEADLEN];
strcpy(save_db, ndb->getDatabaseName());
@@ -2469,7 +2481,7 @@ class Ndb_schema_event_handler {
{
uint32 copy[SCHEMA_SLOCK_SIZE/4];
memcpy(copy, bitbuf, sizeof(copy));
- bitmap_clear_bit(&slock, node_id);
+ bitmap_clear_bit(&slock, own_nodeid());
sql_print_information("NDB: reply to %s.%s(%u/%u) from %x%x to %x%x",
db, table_name,
table_id, table_version,
@@ -2479,7 +2491,7 @@ class Ndb_schema_event_handler {
}
else
{
- bitmap_clear_bit(&slock, node_id);
+ bitmap_clear_bit(&slock, own_nodeid());
}
{
@@ -2505,7 +2517,7 @@ class Ndb_schema_event_handler {
r|= op->setValue(SCHEMA_SLOCK_I, (char*)slock.bitmap);
DBUG_ASSERT(r == 0);
/* node_id */
- r|= op->setValue(SCHEMA_NODE_ID_I, node_id);
+ r|= op->setValue(SCHEMA_NODE_ID_I, own_nodeid());
DBUG_ASSERT(r == 0);
/* type */
r|= op->setValue(SCHEMA_TYPE_I, (uint32)SOT_CLEAR_SLOCK);
@@ -2515,14 +2527,15 @@ class Ndb_schema_event_handler {
NdbOperation::DefaultAbortOption, 1 /*force send*/) == 0)
{
DBUG_PRINT("info", ("node %d cleared lock on '%s.%s'",
- node_id, db, table_name));
+ own_nodeid(), db, table_name));
dict->forceGCPWait(1);
break;
}
err:
const NdbError *this_error= trans ?
&trans->getNdbError() : &ndb->getNdbError();
- if (this_error->status == NdbError::TemporaryError && !thd->killed)
+ if (this_error->status == NdbError::TemporaryError &&
+ !thd_killed(m_thd))
{
if (retries--)
{
@@ -2538,12 +2551,10 @@ class Ndb_schema_event_handler {
if (ndb_error)
{
- char buf[1024];
- my_snprintf(buf, sizeof(buf), "Could not release lock on '%s.%s'",
- db, table_name);
- push_warning_printf(thd, MYSQL_ERROR::WARN_LEVEL_WARN,
- ER_GET_ERRMSG, ER(ER_GET_ERRMSG),
- ndb_error->code, ndb_error->message, buf);
+ sql_print_warning("NDB: Could not release slock on '%s.%s', "
+ "Error code: %d Message: %s",
+ db, table_name,
+ ndb_error->code, ndb_error->message);
}
if (trans)
ndb->closeTransaction(trans);
@@ -2552,43 +2563,100 @@ class Ndb_schema_event_handler {
}
-static int
-handle_schema_event(THD *thd, Ndb *s_ndb,
- NdbEventOperation *pOp,
- List<Cluster_schema> *post_epoch_log_list,
- List<Cluster_schema> *post_epoch_unlock_list,
- MEM_ROOT *mem_root)
-{
- DBUG_ENTER("handle_schema_event");
- Ndb_event_data *event_data= (Ndb_event_data *) pOp->getCustomData();
- NDB_SHARE *tmp_share= event_data->share;
- if (tmp_share && ndb_schema_share == tmp_share)
+ bool check_is_ndb_schema_event(const Ndb_event_data* event_data) const
+ {
+ if (!event_data)
+ {
+ // Received event without event data pointer
+ assert(false);
+ return false;
+ }
+
+ NDB_SHARE *share= event_data->share;
+ if (!share)
+ {
+ // Received event where the event_data is not properly initialized
+ assert(false);
+ return false;
+ }
+ assert(event_data->shadow_table);
+ assert(event_data->ndb_value[0]);
+ assert(event_data->ndb_value[1]);
+
+ pthread_mutex_lock(&ndb_schema_share_mutex);
+ if (share != ndb_schema_share)
+ {
+ // Received event from s_ndb not pointing at the ndb_schema_share
+ pthread_mutex_unlock(&ndb_schema_share_mutex);
+ assert(false);
+ return false;
+ }
+ assert(!strncmp(share->db, STRING_WITH_LEN(NDB_REP_DB)));
+ assert(!strncmp(share->table_name, STRING_WITH_LEN(NDB_SCHEMA_TABLE)));
+ pthread_mutex_unlock(&ndb_schema_share_mutex);
+ return true;
+ }
+
+
+ void
+ log_after_epoch(Ndb_schema_op* schema)
+ {
+ DBUG_ENTER("log_after_epoch");
+ m_post_epoch_log_list.push_back(schema, m_mem_root);
+ DBUG_VOID_RETURN;
+ }
+
+
+ void
+ unlock_after_epoch(Ndb_schema_op* schema)
+ {
+ DBUG_ENTER("unlock_after_epoch");
+ m_post_epoch_unlock_list.push_back(schema, m_mem_root);
+ DBUG_VOID_RETURN;
+ }
+
+
+ uint own_nodeid(void) const
+ {
+ return m_own_nodeid;
+ }
+
+
+ bool
+ check_if_local_tables_in_db(const char *dbname) const
{
- NDBEVENT::TableEvent ev_type= pOp->getEventType();
- DBUG_PRINT("enter", ("%s.%s ev_type: %d",
- tmp_share->db, tmp_share->table_name, ev_type));
- if (ev_type == NDBEVENT::TE_UPDATE ||
- ev_type == NDBEVENT::TE_INSERT)
- {
- Thd_ndb *thd_ndb= get_thd_ndb(thd);
- Ndb *ndb= thd_ndb->ndb;
- NDBDICT *dict= ndb->getDictionary();
- Thd_ndb_options_guard thd_ndb_options(thd_ndb);
- Cluster_schema *schema= (Cluster_schema *)
- sql_alloc(sizeof(Cluster_schema));
- MY_BITMAP slock;
- bitmap_init(&slock, schema->slock, 8*SCHEMA_SLOCK_SIZE, FALSE);
- uint node_id= g_ndb_cluster_connection->node_id();
- {
- ndbcluster_get_schema(event_data, schema);
- schema->any_value= pOp->getAnyValue();
- }
- enum SCHEMA_OP_TYPE schema_type= (enum SCHEMA_OP_TYPE)schema->type;
- DBUG_PRINT("info",
- ("%s.%s: log query_length: %d query: '%s' type: %d",
- schema->db, schema->name,
- schema->query_length, schema->query,
- schema_type));
+ DBUG_ENTER("check_if_local_tables_in_db");
+ DBUG_PRINT("info", ("Looking for files in directory %s", dbname));
+ List<LEX_STRING> files;
+ char path[FN_REFLEN + 1];
+
+ build_table_filename(path, sizeof(path) - 1, dbname, "", "", 0);
+ if (find_files(m_thd, &files, dbname, path, NullS, 0) != FIND_FILES_OK)
+ {
+ m_thd->clear_error();
+ DBUG_PRINT("info", ("Failed to find files"));
+ DBUG_RETURN(true);
+ }
+ DBUG_PRINT("info",("found: %d files", files.elements));
+
+ LEX_STRING *tabname;
+ while ((tabname= files.pop()))
+ {
+ DBUG_PRINT("info", ("Found table %s", tabname->str));
+ if (ndbcluster_check_if_local_table(dbname, tabname->str))
+ DBUG_RETURN(true);
+ }
+
+ DBUG_RETURN(false);
+ }
+
+
+ int
+ handle_schema_op(Ndb_schema_op* schema)
+ {
+ DBUG_ENTER("handle_schema_op");
+ {
+ const SCHEMA_OP_TYPE schema_type= (SCHEMA_OP_TYPE)schema->type;
if (opt_ndb_extra_logging > 19)
{
@@ -2599,7 +2667,8 @@ handle_schema_event(THD *thd, Ndb *s_ndb
get_schema_type_name(schema_type),
schema_type,
schema->node_id,
- slock.bitmap[0], slock.bitmap[1]);
+ schema->slock.bitmap[0],
+ schema->slock.bitmap[1]);
}
if ((schema->db[0] == 0) && (schema->name[0] == 0))
@@ -2611,6 +2680,7 @@ handle_schema_event(THD *thd, Ndb *s_ndb
*/
DBUG_RETURN(0);
}
+
switch (schema_type)
{
case SOT_CLEAR_SLOCK:
@@ -2619,24 +2689,29 @@ handle_schema_event(THD *thd, Ndb *s_ndb
schema events get inserted in the binlog after any data
events
*/
- post_epoch_log_list->push_back(schema, mem_root);
+ log_after_epoch(schema);
DBUG_RETURN(0);
case SOT_ALTER_TABLE_COMMIT:
case SOT_RENAME_TABLE_PREPARE:
case SOT_ONLINE_ALTER_TABLE_PREPARE:
case SOT_ONLINE_ALTER_TABLE_COMMIT:
- post_epoch_log_list->push_back(schema, mem_root);
- post_epoch_unlock_list->push_back(schema, mem_root);
+ log_after_epoch(schema);
+ unlock_after_epoch(schema);
DBUG_RETURN(0);
default:
break;
}
- if (schema->node_id != node_id)
+ if (schema->node_id != own_nodeid())
{
- int log_query= 0, post_epoch_unlock= 0;
+ THD* thd= m_thd; // Code compatibility
+ Thd_ndb *thd_ndb= get_thd_ndb(thd);
+ Ndb *ndb= thd_ndb->ndb;
+ Thd_ndb_options_guard thd_ndb_options(thd_ndb);
+
+ int post_epoch_unlock= 0;
switch (schema_type)
{
@@ -2652,7 +2727,7 @@ handle_schema_event(THD *thd, Ndb *s_ndb
schema->query + schema->query_length,
no_print_error);
/* binlog dropping table after any table operations */
- post_epoch_log_list->push_back(schema, mem_root);
+ log_after_epoch(schema);
/* acknowledge this query _after_ epoch completion */
post_epoch_unlock= 1;
}
@@ -2667,7 +2742,7 @@ handle_schema_event(THD *thd, Ndb *s_ndb
(schema_type == SOT_DROP_TABLE ? "dropp" : "renam"),
schema->db, schema->name, schema->query,
schema->node_id);
- log_query= 1;
+ write_schema_op_to_binlog(thd, schema);
}
// Fall through
case SOT_TRUNCATE_TABLE:
@@ -2687,7 +2762,7 @@ handle_schema_event(THD *thd, Ndb *s_ndb
{
{
ndb->setDatabaseName(schema->db);
- Ndb_table_guard ndbtab_g(dict, schema->name);
+ Ndb_table_guard ndbtab_g(ndb->getDictionary(), schema->name);
ndbtab_g.invalidate();
}
TABLE_LIST table_list;
@@ -2722,20 +2797,20 @@ handle_schema_event(THD *thd, Ndb *s_ndb
{
print_could_not_discover_error(thd, schema);
}
- log_query= 1;
+ write_schema_op_to_binlog(thd, schema);
break;
case SOT_DROP_DB:
/* Drop the database locally if it only contains ndb tables */
thd_ndb_options.set(TNO_NO_LOCK_SCHEMA_OP);
- if (! ndbcluster_check_if_local_tables_in_db(thd, schema->db))
+ if (!check_if_local_tables_in_db(schema->db))
{
const int no_print_error[1]= {0};
run_query(thd, schema->query,
schema->query + schema->query_length,
no_print_error);
/* binlog dropping database after any table operations */
- post_epoch_log_list->push_back(schema, mem_root);
+ log_after_epoch(schema);
/* acknowledge this query _after_ epoch completion */
post_epoch_unlock= 1;
}
@@ -2746,7 +2821,7 @@ handle_schema_event(THD *thd, Ndb *s_ndb
"binlog schema event '%s' from node %d. ",
schema->db, schema->query,
schema->node_id);
- log_query= 1;
+ write_schema_op_to_binlog(thd, schema);
}
break;
@@ -2758,7 +2833,7 @@ handle_schema_event(THD *thd, Ndb *s_ndb
run_query(thd, schema->query,
schema->query + schema->query_length,
no_print_error);
- log_query= 1;
+ write_schema_op_to_binlog(thd, schema);
break;
}
@@ -2779,13 +2854,13 @@ handle_schema_event(THD *thd, Ndb *s_ndb
run_query(thd, cmd,
cmd + strlen(cmd),
no_print_error);
- log_query= 1;
+ write_schema_op_to_binlog(thd, schema);
break;
}
case SOT_TABLESPACE:
case SOT_LOGFILE_GROUP:
- log_query= 1;
+ write_schema_op_to_binlog(thd, schema);
break;
case SOT_ALTER_TABLE_COMMIT:
@@ -2799,159 +2874,44 @@ handle_schema_event(THD *thd, Ndb *s_ndb
break;
}
- if (log_query && ndb_binlog_running)
- ndb_binlog_query(thd, schema);
+
/* signal that schema operation has been handled */
- DBUG_DUMP("slock", (uchar*) schema->slock, schema->slock_length);
- if (bitmap_is_set(&slock, node_id))
+ DBUG_DUMP("slock", (uchar*) schema->slock_buf, schema->slock_length);
+ if (bitmap_is_set(&schema->slock, own_nodeid()))
{
if (post_epoch_unlock)
- post_epoch_unlock_list->push_back(schema, mem_root);
+ unlock_after_epoch(schema);
else
- ndbcluster_update_slock(thd, schema->db, schema->name,
- schema->id, schema->version);
+ ack_schema_op(schema->db, schema->name,
+ schema->id, schema->version);
}
}
- DBUG_RETURN(0);
- }
- /*
- the normal case of UPDATE/INSERT has already been handled
- */
- switch (ev_type)
- {
- case NDBEVENT::TE_DELETE:
- // skip
- break;
- case NDBEVENT::TE_CLUSTER_FAILURE:
- if (opt_ndb_extra_logging)
- sql_print_information("NDB Binlog: cluster failure for %s at epoch %u/%u.",
- ndb_schema_share->key,
- (uint)(pOp->getGCI() >> 32),
- (uint)(pOp->getGCI()));
- // fall through
- case NDBEVENT::TE_DROP:
- if (opt_ndb_extra_logging &&
- ndb_binlog_tables_inited && ndb_binlog_running)
- sql_print_information("NDB Binlog: ndb tables initially "
- "read only on reconnect.");
-
- /* begin protect ndb_schema_share */
- pthread_mutex_lock(&ndb_schema_share_mutex);
- /* ndb_share reference binlog extra free */
- DBUG_PRINT("NDB_SHARE", ("%s binlog extra free use_count: %u",
- ndb_schema_share->key,
- ndb_schema_share->use_count));
- free_share(&ndb_schema_share);
- ndb_schema_share= 0;
- ndb_binlog_tables_inited= FALSE;
- ndb_binlog_is_ready= FALSE;
- pthread_mutex_unlock(&ndb_schema_share_mutex);
- /* end protect ndb_schema_share */
-
- close_cached_tables(NULL, NULL, FALSE, FALSE, FALSE);
- // fall through
- case NDBEVENT::TE_ALTER:
- ndb_handle_schema_change(thd, s_ndb, pOp, event_data);
- break;
- case NDBEVENT::TE_NODE_FAILURE:
- {
- uint8 node_id= g_node_id_map[pOp->getNdbdNodeId()];
- DBUG_ASSERT(node_id != 0xFF);
- pthread_mutex_lock(&tmp_share->mutex);
- bitmap_clear_all(&tmp_share->subscriber_bitmap[node_id]);
- DBUG_PRINT("info",("NODE_FAILURE UNSUBSCRIBE[%d]", node_id));
- if (opt_ndb_extra_logging)
- {
- sql_print_information("NDB Binlog: Node: %d, down,"
- " Subscriber bitmask %x%x",
- pOp->getNdbdNodeId(),
- tmp_share->subscriber_bitmap[node_id].bitmap[1],
- tmp_share->subscriber_bitmap[node_id].bitmap[0]);
- }
- pthread_mutex_unlock(&tmp_share->mutex);
- (void) pthread_cond_signal(&injector_cond);
- break;
- }
- case NDBEVENT::TE_SUBSCRIBE:
- {
- uint8 node_id= g_node_id_map[pOp->getNdbdNodeId()];
- uint8 req_id= pOp->getReqNodeId();
- DBUG_ASSERT(req_id != 0 && node_id != 0xFF);
- pthread_mutex_lock(&tmp_share->mutex);
- bitmap_set_bit(&tmp_share->subscriber_bitmap[node_id], req_id);
- DBUG_PRINT("info",("SUBSCRIBE[%d] %d", node_id, req_id));
- if (opt_ndb_extra_logging)
- {
- sql_print_information("NDB Binlog: Node: %d, subscribe from node %d,"
- " Subscriber bitmask %x%x",
- pOp->getNdbdNodeId(),
- req_id,
- tmp_share->subscriber_bitmap[node_id].bitmap[1],
- tmp_share->subscriber_bitmap[node_id].bitmap[0]);
- }
- pthread_mutex_unlock(&tmp_share->mutex);
- (void) pthread_cond_signal(&injector_cond);
- break;
- }
- case NDBEVENT::TE_UNSUBSCRIBE:
- {
- uint8 node_id= g_node_id_map[pOp->getNdbdNodeId()];
- uint8 req_id= pOp->getReqNodeId();
- DBUG_ASSERT(req_id != 0 && node_id != 0xFF);
- pthread_mutex_lock(&tmp_share->mutex);
- bitmap_clear_bit(&tmp_share->subscriber_bitmap[node_id], req_id);
- DBUG_PRINT("info",("UNSUBSCRIBE[%d] %d", node_id, req_id));
- if (opt_ndb_extra_logging)
- {
- sql_print_information("NDB Binlog: Node: %d, unsubscribe from node %d,"
- " Subscriber bitmask %x%x",
- pOp->getNdbdNodeId(),
- req_id,
- tmp_share->subscriber_bitmap[node_id].bitmap[1],
- tmp_share->subscriber_bitmap[node_id].bitmap[0]);
- }
- pthread_mutex_unlock(&tmp_share->mutex);
- (void) pthread_cond_signal(&injector_cond);
- break;
- }
- default:
- sql_print_error("NDB Binlog: unknown non data event %d for %s. "
- "Ignoring...", (unsigned) ev_type, tmp_share->key);
}
+ DBUG_RETURN(0);
}
- DBUG_RETURN(0);
-}
-
-/*
- process any operations that should be done after
- the epoch is complete
-*/
-static void
-handle_schema_log_post_epoch(THD *thd,
- List<Cluster_schema> *log_list)
-{
- DBUG_ENTER("handle_schema_log_post_epoch");
- Thd_ndb *thd_ndb= get_thd_ndb(thd);
- Ndb *ndb= thd_ndb->ndb;
- NDBDICT *dict= ndb->getDictionary();
-
- Cluster_schema *schema;
- while ((schema= log_list->pop()))
+ void
+ handle_schema_op_post_epoch(Ndb_schema_op* schema)
{
+ DBUG_ENTER("handle_schema_op_post_epoch");
+ THD* thd = m_thd; // Code compatibility
+ Thd_ndb *thd_ndb= get_thd_ndb(thd);
+ Ndb *ndb= thd_ndb->ndb;
+ NDBDICT *dict= ndb->getDictionary();
Thd_ndb_options_guard thd_ndb_options(thd_ndb);
DBUG_PRINT("info",
("%s.%s: log query_length: %d query: '%s' type: %d",
schema->db, schema->name,
schema->query_length, schema->query,
schema->type));
- int log_query= 0;
+
{
- enum SCHEMA_OP_TYPE schema_type= (enum SCHEMA_OP_TYPE)schema->type;
+ const SCHEMA_OP_TYPE schema_type= (SCHEMA_OP_TYPE)schema->type;
char key[FN_REFLEN + 1];
build_table_filename(key, sizeof(key) - 1, schema->db, schema->name, "", 0);
if (schema_type == SOT_CLEAR_SLOCK)
{
+ /* Ack to any SQL thread waiting for schema op to complete */
pthread_mutex_lock(&ndbcluster_mutex);
NDB_SCHEMA_OBJECT *ndb_schema_object=
(NDB_SCHEMA_OBJECT*) my_hash_search(&ndb_schema_objects,
@@ -2968,10 +2928,10 @@ handle_schema_log_post_epoch(THD *thd,
key, schema->id, schema->version,
ndb_schema_object->slock[0],
ndb_schema_object->slock[1],
- schema->slock[0],
- schema->slock[1]);
+ schema->slock_buf[0],
+ schema->slock_buf[1]);
}
- memcpy(ndb_schema_object->slock, schema->slock,
+ memcpy(ndb_schema_object->slock, schema->slock_buf,
sizeof(ndb_schema_object->slock));
DBUG_DUMP("ndb_schema_object->slock_bitmap.bitmap",
(uchar*)ndb_schema_object->slock_bitmap.bitmap,
@@ -2998,8 +2958,15 @@ handle_schema_log_post_epoch(THD *thd,
}
}
pthread_mutex_unlock(&ndbcluster_mutex);
- continue;
+ DBUG_VOID_RETURN;
}
+
+ if (opt_ndb_extra_logging > 9)
+ sql_print_information("%s - %s.%s",
+ get_schema_type_name(schema_type),
+ schema->db ? schema->db : "(null)",
+ schema->name ? schema->name : "(null)");
+
/* ndb_share reference temporary, free below */
NDB_SHARE *share= get_share(key, 0, FALSE, FALSE);
if (share)
@@ -3010,12 +2977,11 @@ handle_schema_log_post_epoch(THD *thd,
switch (schema_type)
{
case SOT_DROP_DB:
- log_query= 1;
+ write_schema_op_to_binlog(thd, schema);
break;
+
case SOT_DROP_TABLE:
- if (opt_ndb_extra_logging > 9)
- sql_print_information("SOT_DROP_TABLE %s.%s", schema->db, schema->name);
- log_query= 1;
+ write_schema_op_to_binlog(thd, schema);
{
ndb->setDatabaseName(schema->db);
Ndb_table_guard ndbtab_g(dict, schema->name);
@@ -3029,29 +2995,25 @@ handle_schema_log_post_epoch(THD *thd,
close_cached_tables(thd, &table_list, FALSE, FALSE, FALSE);
}
break;
+
case SOT_RENAME_TABLE:
- if (opt_ndb_extra_logging > 9)
- sql_print_information("SOT_RENAME_TABLE %s.%s", schema->db, schema->name);
- log_query= 1;
+ write_schema_op_to_binlog(thd, schema);
if (share)
{
ndbcluster_rename_share(thd, share);
}
break;
+
case SOT_RENAME_TABLE_PREPARE:
- if (opt_ndb_extra_logging > 9)
- sql_print_information("SOT_RENAME_TABLE_PREPARE %s.%s -> %s",
- schema->db, schema->name, schema->query);
if (share &&
schema->node_id != g_ndb_cluster_connection->node_id())
ndbcluster_prepare_rename_share(share, schema->query);
break;
+
case SOT_ALTER_TABLE_COMMIT:
- if (opt_ndb_extra_logging > 9)
- sql_print_information("SOT_ALTER_TABLE_COMMIT %s.%s", schema->db, schema->name);
if (schema->node_id == g_ndb_cluster_connection->node_id())
break;
- log_query= 1;
+ write_schema_op_to_binlog(thd, schema);
{
ndb->setDatabaseName(schema->db);
Ndb_table_guard ndbtab_g(dict, schema->name);
@@ -3112,8 +3074,6 @@ handle_schema_log_post_epoch(THD *thd,
case SOT_ONLINE_ALTER_TABLE_PREPARE:
{
- if (opt_ndb_extra_logging > 9)
- sql_print_information("SOT_ONLINE_ALTER_TABLE_PREPARE %s.%s", schema->db, schema->name);
int error= 0;
ndb->setDatabaseName(schema->db);
{
@@ -3141,7 +3101,7 @@ handle_schema_log_post_epoch(THD *thd,
DBUG_PRINT("info", ("Detected frm change of table %s.%s",
schema->db, schema->name));
- log_query= 1;
+ write_schema_op_to_binlog(thd, schema);
build_table_filename(key, FN_LEN-1, schema->db, schema->name, NullS, 0);
/*
If the there is no local table shadowing the altered table and
@@ -3220,10 +3180,9 @@ handle_schema_log_post_epoch(THD *thd,
}
break;
}
+
case SOT_ONLINE_ALTER_TABLE_COMMIT:
{
- if (opt_ndb_extra_logging > 9)
- sql_print_information("SOT_ONLINE_ALTER_TABLE_COMMIT %s.%s", schema->db, schema->name);
if (share)
{
pthread_mutex_lock(&share->mutex);
@@ -3245,10 +3204,9 @@ handle_schema_log_post_epoch(THD *thd,
}
break;
}
+
case SOT_RENAME_TABLE_NEW:
- if (opt_ndb_extra_logging > 9)
- sql_print_information("SOT_RENAME_TABLE_NEW %s.%s", schema->db, schema->name);
- log_query= 1;
+ write_schema_op_to_binlog(thd, schema);
if (ndb_binlog_running && (!share || !share->op))
{
/*
@@ -3279,9 +3237,11 @@ handle_schema_log_post_epoch(THD *thd,
}
}
break;
+
default:
DBUG_ASSERT(FALSE);
}
+
if (share)
{
/* ndb_share reference temporary free */
@@ -3291,40 +3251,55 @@ handle_schema_log_post_epoch(THD *thd,
share= 0;
}
}
- if (ndb_binlog_running && log_query)
- ndb_binlog_query(thd, schema);
+
+ DBUG_VOID_RETURN;
}
- DBUG_VOID_RETURN;
-}
+ /*
+ process any operations that should be done after
+ the epoch is complete
+ */
+ void
+ handle_schema_log_post_epoch(List<Ndb_schema_op> *log_list)
+ {
+ DBUG_ENTER("handle_schema_log_post_epoch");
+
+ Ndb_schema_op* schema;
+ while ((schema= log_list->pop()))
+ {
+ handle_schema_op_post_epoch(schema);
+ }
+ DBUG_VOID_RETURN;
+ }
-static void
-handle_schema_unlock_post_epoch(THD *thd,
- List<Cluster_schema> *unlock_list)
-{
- DBUG_ENTER("handle_schema_unlock_post_epoch");
- Cluster_schema *schema;
- while ((schema= unlock_list->pop()))
+ void
+ handle_schema_unlock_post_epoch(List<Ndb_schema_op> *unlock_list)
{
- ndbcluster_update_slock(thd, schema->db, schema->name,
- schema->id, schema->version);
+ DBUG_ENTER("handle_schema_unlock_post_epoch");
+
+ Ndb_schema_op *schema;
+ while ((schema= unlock_list->pop()))
+ {
+ ack_schema_op(schema->db, schema->name,
+ schema->id, schema->version);
+ }
+ DBUG_VOID_RETURN;
}
- DBUG_VOID_RETURN;
-}
THD* m_thd;
MEM_ROOT* m_mem_root;
+ uint m_own_nodeid;
- List<Cluster_schema> m_post_epoch_log_list;
- List<Cluster_schema> m_post_epoch_unlock_list;
+ List<Ndb_schema_op> m_post_epoch_log_list;
+ List<Ndb_schema_op> m_post_epoch_unlock_list;
public:
Ndb_schema_event_handler(); // Not implemented
Ndb_schema_event_handler(const Ndb_schema_event_handler&); // Not implemented
- Ndb_schema_event_handler(THD* thd, MEM_ROOT* mem_root):
- m_thd(thd), m_mem_root(mem_root)
+ Ndb_schema_event_handler(THD* thd, MEM_ROOT* mem_root, uint own_nodeid):
+ m_thd(thd), m_mem_root(mem_root), m_own_nodeid(own_nodeid)
{
}
@@ -3337,19 +3312,148 @@ public:
void handle_event(Ndb* s_ndb, NdbEventOperation *pOp)
{
- handle_schema_event(m_thd, s_ndb, pOp,
- &m_post_epoch_log_list,
- &m_post_epoch_unlock_list,
- m_mem_root);
+ DBUG_ENTER("handle_event");
+
+ const Ndb_event_data *event_data=
+ static_cast<const Ndb_event_data*>(pOp->getCustomData());
+
+ if (!check_is_ndb_schema_event(event_data))
+ DBUG_VOID_RETURN;
+
+ const NDBEVENT::TableEvent ev_type= pOp->getEventType();
+ switch (ev_type)
+ {
+ case NDBEVENT::TE_INSERT:
+ case NDBEVENT::TE_UPDATE:
+ {
+ /* ndb_schema table, row INSERTed or UPDATEed*/
+ Ndb_schema_op* schema_op=
+ Ndb_schema_op::create(event_data, pOp->getAnyValue());
+ handle_schema_op(schema_op);
+ break;
+ }
+
+ case NDBEVENT::TE_DELETE:
+ /* ndb_schema table, row DELETEd */
+ break;
+
+ case NDBEVENT::TE_CLUSTER_FAILURE:
+ if (opt_ndb_extra_logging)
+ sql_print_information("NDB Binlog: cluster failure for %s at epoch %u/%u.",
+ ndb_schema_share->key,
+ (uint)(pOp->getGCI() >> 32),
+ (uint)(pOp->getGCI()));
+ // fall through
+ case NDBEVENT::TE_DROP:
+ /* ndb_schema table DROPped */
+ if (opt_ndb_extra_logging &&
+ ndb_binlog_tables_inited && ndb_binlog_running)
+ sql_print_information("NDB Binlog: ndb tables initially "
+ "read only on reconnect.");
+
+ /* release the ndb_schema_share */
+ pthread_mutex_lock(&ndb_schema_share_mutex);
+ free_share(&ndb_schema_share);
+ ndb_schema_share= 0;
+ ndb_binlog_tables_inited= FALSE;
+ ndb_binlog_is_ready= FALSE;
+ pthread_mutex_unlock(&ndb_schema_share_mutex);
+
+ close_cached_tables(NULL, NULL, FALSE, FALSE, FALSE);
+ // fall through
+ case NDBEVENT::TE_ALTER:
+ /* ndb_schema table ALTERed */
+ ndb_handle_schema_change(m_thd, s_ndb, pOp, event_data);
+ break;
+
+ case NDBEVENT::TE_NODE_FAILURE:
+ {
+ /* Remove all subscribers for node from bitmap in ndb_schema_share */
+ NDB_SHARE *tmp_share= event_data->share;
+ uint8 node_id= g_node_id_map[pOp->getNdbdNodeId()];
+ DBUG_ASSERT(node_id != 0xFF);
+ pthread_mutex_lock(&tmp_share->mutex);
+ bitmap_clear_all(&tmp_share->subscriber_bitmap[node_id]);
+ DBUG_PRINT("info",("NODE_FAILURE UNSUBSCRIBE[%d]", node_id));
+ if (opt_ndb_extra_logging)
+ {
+ sql_print_information("NDB Binlog: Node: %d, down,"
+ " Subscriber bitmask %x%x",
+ pOp->getNdbdNodeId(),
+ tmp_share->subscriber_bitmap[node_id].bitmap[1],
+ tmp_share->subscriber_bitmap[node_id].bitmap[0]);
+ }
+ pthread_mutex_unlock(&tmp_share->mutex);
+ (void) pthread_cond_signal(&injector_cond);
+ break;
+ }
+
+ case NDBEVENT::TE_SUBSCRIBE:
+ {
+ /* Add node as subscriber from bitmap in ndb_schema_share */
+ NDB_SHARE *tmp_share= event_data->share;
+ uint8 node_id= g_node_id_map[pOp->getNdbdNodeId()];
+ uint8 req_id= pOp->getReqNodeId();
+ DBUG_ASSERT(req_id != 0 && node_id != 0xFF);
+ pthread_mutex_lock(&tmp_share->mutex);
+ bitmap_set_bit(&tmp_share->subscriber_bitmap[node_id], req_id);
+ DBUG_PRINT("info",("SUBSCRIBE[%d] %d", node_id, req_id));
+ if (opt_ndb_extra_logging)
+ {
+ sql_print_information("NDB Binlog: Node: %d, subscribe from node %d,"
+ " Subscriber bitmask %x%x",
+ pOp->getNdbdNodeId(),
+ req_id,
+ tmp_share->subscriber_bitmap[node_id].bitmap[1],
+ tmp_share->subscriber_bitmap[node_id].bitmap[0]);
+ }
+ pthread_mutex_unlock(&tmp_share->mutex);
+ (void) pthread_cond_signal(&injector_cond);
+ break;
+ }
+
+ case NDBEVENT::TE_UNSUBSCRIBE:
+ {
+ /* Remove node as subscriber from bitmap in ndb_schema_share */
+ NDB_SHARE *tmp_share= event_data->share;
+ uint8 node_id= g_node_id_map[pOp->getNdbdNodeId()];
+ uint8 req_id= pOp->getReqNodeId();
+ DBUG_ASSERT(req_id != 0 && node_id != 0xFF);
+ pthread_mutex_lock(&tmp_share->mutex);
+ bitmap_clear_bit(&tmp_share->subscriber_bitmap[node_id], req_id);
+ DBUG_PRINT("info",("UNSUBSCRIBE[%d] %d", node_id, req_id));
+ if (opt_ndb_extra_logging)
+ {
+ sql_print_information("NDB Binlog: Node: %d, unsubscribe from node %d,"
+ " Subscriber bitmask %x%x",
+ pOp->getNdbdNodeId(),
+ req_id,
+ tmp_share->subscriber_bitmap[node_id].bitmap[1],
+ tmp_share->subscriber_bitmap[node_id].bitmap[0]);
+ }
+ pthread_mutex_unlock(&tmp_share->mutex);
+ (void) pthread_cond_signal(&injector_cond);
+ break;
+ }
+
+ default:
+ {
+ NDB_SHARE *tmp_share= event_data->share;
+ sql_print_error("NDB Binlog: unknown non data event %d for %s. "
+ "Ignoring...", (unsigned) ev_type, tmp_share->key);
+ }
+ }
+
+ DBUG_VOID_RETURN;
}
void post_epoch()
{
if (m_post_epoch_log_list.elements > 0)
{
- handle_schema_log_post_epoch(m_thd, &m_post_epoch_log_list);
+ handle_schema_log_post_epoch(&m_post_epoch_log_list);
// NOTE post_epoch_unlock_list may not be handled!
- handle_schema_unlock_post_epoch(m_thd, &m_post_epoch_unlock_list);
+ handle_schema_unlock_post_epoch(&m_post_epoch_unlock_list);
}
// There should be no work left todo...
DBUG_ASSERT(m_post_epoch_log_list.elements == 0);
@@ -4784,7 +4888,7 @@ err:
*conflict_fn_spec= NULL;
if (ndberror.code && opt_ndb_extra_logging)
- print_warning_list("NDB", thd_warn_list(thd));
+ thd_print_warning_list(thd, "NDB");
DBUG_RETURN(ndberror.code);
}
@@ -4990,32 +5094,6 @@ ndbcluster_check_if_local_table(const ch
DBUG_RETURN(false);
}
-bool
-ndbcluster_check_if_local_tables_in_db(THD *thd, const char *dbname)
-{
- DBUG_ENTER("ndbcluster_check_if_local_tables_in_db");
- DBUG_PRINT("info", ("Looking for files in directory %s", dbname));
- LEX_STRING *tabname;
- List<LEX_STRING> files;
- char path[FN_REFLEN + 1];
-
- build_table_filename(path, sizeof(path) - 1, dbname, "", "", 0);
- if (find_files(thd, &files, dbname, path, NullS, 0) != FIND_FILES_OK)
- {
- thd->clear_error();
- DBUG_PRINT("info", ("Failed to find files"));
- DBUG_RETURN(true);
- }
- DBUG_PRINT("info",("found: %d files", files.elements));
- while ((tabname= files.pop()))
- {
- DBUG_PRINT("info", ("Found table %s", tabname->str));
- if (ndbcluster_check_if_local_table(dbname, tabname->str))
- DBUG_RETURN(true);
- }
-
- DBUG_RETURN(false);
-}
/*
Common function for setting up everything for logging a table at
@@ -5930,7 +6008,8 @@ handle_non_data_event(THD *thd,
NdbEventOperation *pOp,
ndb_binlog_index_row &row)
{
- Ndb_event_data *event_data= (Ndb_event_data *) pOp->getCustomData();
+ const Ndb_event_data* event_data=
+ static_cast<const Ndb_event_data*>(pOp->getCustomData());
NDB_SHARE *share= event_data->share;
NDBEVENT::TableEvent type= pOp->getEventType();
@@ -6939,7 +7018,9 @@ restart_cluster_failure:
// The Ndb_schema_event_handler does not necessarily need
// to use the same memroot(or vice versa)
- Ndb_schema_event_handler schema_event_handler(thd, &mem_root);
+ Ndb_schema_event_handler
+ schema_event_handler(thd, &mem_root,
+ g_ndb_cluster_connection->node_id());
*root_ptr= &mem_root;
=== modified file 'sql/ha_ndbcluster_binlog.h'
--- a/sql/ha_ndbcluster_binlog.h 2011-10-27 09:34:06 +0000
+++ b/sql/ha_ndbcluster_binlog.h 2011-10-29 14:21:55 +0000
@@ -31,35 +31,6 @@ typedef NdbDictionary::Event NDBEVENT;
extern handlerton *ndbcluster_hton;
-class Ndb_event_data
-{
-public:
- Ndb_event_data(NDB_SHARE *the_share) :
- shadow_table(0),
- share(the_share)
- {
- ndb_value[0]= 0;
- ndb_value[1]= 0;
- }
- ~Ndb_event_data()
- {
- if (shadow_table)
- closefrm(shadow_table, 1);
- shadow_table= 0;
- free_root(&mem_root, MYF(0));
- share= 0;
- /*
- ndbvalue[] allocated with my_multi_malloc
- so only first pointer should be freed
- */
- my_free(ndb_value[0], MYF(MY_WME|MY_ALLOW_ZERO_PTR));
- }
- MEM_ROOT mem_root;
- TABLE *shadow_table;
- NDB_SHARE *share;
- NdbValue *ndb_value[2];
-};
-
/*
The numbers below must not change as they
are passed between mysql servers, and if changed
@@ -218,6 +189,3 @@ int cmp_frm(const NDBTAB *ndbtab, const
*/
bool
ndbcluster_check_if_local_table(const char *dbname, const char *tabname);
-bool
-ndbcluster_check_if_local_tables_in_db(THD *thd, const char *dbname);
-
=== modified file 'sql/ha_ndbcluster_glue.h'
--- a/sql/ha_ndbcluster_glue.h 2011-10-05 07:16:59 +0000
+++ b/sql/ha_ndbcluster_glue.h 2011-10-29 09:18:21 +0000
@@ -129,40 +129,6 @@ Diagnostics_area* thd_stmt_da(THD* thd)
#endif
}
-/* extract the list of warnings from THD */
-static inline
-List<MYSQL_ERROR>& thd_warn_list(const THD * thd)
-{
-#if MYSQL_VERSION_ID < 50500
- return const_cast<THD*>(thd)->warn_list;
-#else
- /* "options" has moved to "variables.option_bits" */
- return thd->warning_info->warn_list();
-#endif
-}
-
-static inline
-const char* MYSQL_ERROR_get_message_text(const MYSQL_ERROR* err)
-{
-#if MYSQL_VERSION_ID < 50500
- return err->msg;
-#else
- /* "msg" is gone, use accessor */
- return err->get_message_text();
-#endif
-}
-
-static inline
-uint MYSQL_ERROR_get_sql_errno(const MYSQL_ERROR* err)
-{
-#if MYSQL_VERSION_ID < 50500
- return err->code;
-#else
- /* "code" is gone, use accessor */
- return err->get_sql_errno();
-#endif
-}
-
#if MYSQL_VERSION_ID < 50500
/*
=== added file 'sql/ndb_event_data.cc'
--- a/sql/ndb_event_data.cc 1970-01-01 00:00:00 +0000
+++ b/sql/ndb_event_data.cc 2011-10-29 09:02:21 +0000
@@ -0,0 +1,44 @@
+/*
+ Copyright (c) 2011, Oracle and/or its affiliates. All rights reserved.
+
+ This program is free software; you can redistribute it and/or modify
+ it under the terms of the GNU General Public License as published by
+ the Free Software Foundation; version 2 of the License.
+
+ This program is distributed in the hope that it will be useful,
+ but WITHOUT ANY WARRANTY; without even the implied warranty of
+ MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ GNU General Public License for more details.
+
+ You should have received a copy of the GNU General Public License
+ along with this program; if not, write to the Free Software
+ Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA
+*/
+
+#include "ndb_event_data.h"
+
+#include <table.h>
+
+
+Ndb_event_data::Ndb_event_data(NDB_SHARE *the_share) :
+ shadow_table(NULL),
+ share(the_share)
+{
+ ndb_value[0]= NULL;
+ ndb_value[1]= NULL;
+}
+
+
+Ndb_event_data::~Ndb_event_data()
+{
+ if (shadow_table)
+ closefrm(shadow_table, 1);
+ shadow_table= NULL;
+ free_root(&mem_root, MYF(0));
+ share= NULL;
+ /*
+ ndbvalue[] allocated with my_multi_malloc -> only
+ first pointer need to be freed
+ */
+ my_free(ndb_value[0]);
+}
=== added file 'sql/ndb_event_data.h'
--- a/sql/ndb_event_data.h 1970-01-01 00:00:00 +0000
+++ b/sql/ndb_event_data.h 2011-10-29 09:02:21 +0000
@@ -0,0 +1,39 @@
+/*
+ Copyright (c) 2011, Oracle and/or its affiliates. All rights reserved.
+
+ This program is free software; you can redistribute it and/or modify
+ it under the terms of the GNU General Public License as published by
+ the Free Software Foundation; version 2 of the License.
+
+ This program is distributed in the hope that it will be useful,
+ but WITHOUT ANY WARRANTY; without even the implied warranty of
+ MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ GNU General Public License for more details.
+
+ You should have received a copy of the GNU General Public License
+ along with this program; if not, write to the Free Software
+ Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA
+*/
+
+#ifndef NDB_EVENT_DATA_H
+#define NDB_EVENT_DATA_H
+
+#include <my_global.h> // my_alloc.h
+#include <my_alloc.h> // MEM_ROOT
+
+class Ndb_event_data
+{
+public:
+ Ndb_event_data(); // Not implemented
+ Ndb_event_data(const Ndb_event_data&); // Not implemented
+ Ndb_event_data(struct NDB_SHARE *the_share);
+
+ ~Ndb_event_data();
+
+ MEM_ROOT mem_root;
+ struct TABLE *shadow_table;
+ struct NDB_SHARE *share;
+ union NdbValue *ndb_value[2];
+};
+
+#endif
=== modified file 'sql/ndb_thd.cc'
--- a/sql/ndb_thd.cc 2011-03-08 22:08:44 +0000
+++ b/sql/ndb_thd.cc 2011-10-29 09:18:21 +0000
@@ -43,3 +43,23 @@ Ndb* check_ndb_in_thd(THD* thd, bool val
return thd_ndb->ndb;
}
+
+#ifndef MYSQL_SERVER
+#define MYSQL_SERVER
+#endif
+
+#include <sql_class.h>
+
+void
+thd_print_warning_list(THD* thd, const char* prefix)
+{
+ List_iterator_fast<MYSQL_ERROR> it(thd->warning_info->warn_list());
+ MYSQL_ERROR *err;
+ while ((err= it++))
+ {
+ sql_print_warning("%s: (%d)%s",
+ prefix,
+ err->get_sql_errno(),
+ err->get_message_text());
+ }
+}
=== modified file 'sql/ndb_thd.h'
--- a/sql/ndb_thd.h 2011-03-08 12:59:56 +0000
+++ b/sql/ndb_thd.h 2011-10-29 09:18:21 +0000
@@ -51,4 +51,9 @@ thd_set_thd_ndb(THD *thd, class Thd_ndb
/* Make sure THD has a Thd_ndb struct assigned */
class Ndb* check_ndb_in_thd(THD* thd, bool validate_ndb= false);
+
+/* Print thd's list of warnings to error log */
+void
+thd_print_warning_list(THD* thd, const char* prefix);
+
#endif
=== modified file 'storage/ndb/CMakeLists.txt'
--- a/storage/ndb/CMakeLists.txt 2011-10-27 09:34:06 +0000
+++ b/storage/ndb/CMakeLists.txt 2011-10-29 09:02:21 +0000
@@ -83,6 +83,7 @@ SET(NDBCLUSTER_SOURCES
../../sql/ndb_anyvalue.cc
../../sql/ndb_ndbapi_util.cc
../../sql/ndb_binlog_extra_row_info.cc
+ ../../sql/ndb_event_data.cc
)
# Include directories used when building ha_ndbcluster
No bundle (reason: useless for push emails).
| Thread |
|---|
| • bzr push into mysql-5.5-cluster branch (magnus.blaudd:3619 to 3620) | magnus.blaudd | 1 Nov |