Below is the list of changes that have just been committed into a local
5.1 repository of tomas. When tomas does a push these changes will
be propagated to the main repository and, within 24 hours after the
push, to the public repository.
For information on how to access the public repository
see http://dev.mysql.com/doc/mysql/en/installing-source-tree.html
ChangeSet
1.1979 05/12/15 09:21:13 tomas@stripped +26 -0
intermediate commit, will be recommitted later
storage/ndb/test/run-test/make-config.sh
1.5 05/12/15 09:21:05 tomas@stripped +1 -1
intermediate commit, will be recommitted later
storage/ndb/test/ndbapi/bank/BankLoad.cpp
1.11 05/12/15 09:21:05 tomas@stripped +1 -1
intermediate commit, will be recommitted later
storage/ndb/src/ndbapi/ndb_cluster_connection_impl.hpp
1.5 05/12/15 09:21:05 tomas@stripped +0 -13
intermediate commit, will be recommitted later
storage/ndb/src/ndbapi/ndb_cluster_connection.cpp
1.35 05/12/15 09:21:05 tomas@stripped +2 -2
intermediate commit, will be recommitted later
storage/ndb/src/ndbapi/Ndbif.cpp
1.35 05/12/15 09:21:05 tomas@stripped +9 -4
intermediate commit, will be recommitted later
storage/ndb/src/ndbapi/NdbEventOperationImpl.hpp
1.8 05/12/15 09:21:05 tomas@stripped +2 -2
intermediate commit, will be recommitted later
storage/ndb/src/ndbapi/NdbEventOperationImpl.cpp
1.24 05/12/15 09:21:05 tomas@stripped +84 -17
intermediate commit, will be recommitted later
storage/ndb/src/ndbapi/NdbEventOperation.cpp
1.7 05/12/15 09:21:05 tomas@stripped +5 -0
intermediate commit, will be recommitted later
storage/ndb/src/ndbapi/Ndb.cpp
1.60 05/12/15 09:21:05 tomas@stripped +6 -1
intermediate commit, will be recommitted later
storage/ndb/src/kernel/blocks/suma/Suma.hpp
1.8 05/12/15 09:21:05 tomas@stripped +11 -1
intermediate commit, will be recommitted later
storage/ndb/src/kernel/blocks/suma/Suma.cpp
1.29 05/12/15 09:21:05 tomas@stripped +49 -0
intermediate commit, will be recommitted later
storage/ndb/include/ndbapi/ndb_cluster_connection.hpp
1.13 05/12/15 09:21:05 tomas@stripped +16 -0
intermediate commit, will be recommitted later
storage/ndb/include/ndbapi/NdbEventOperation.hpp
1.15 05/12/15 09:21:05 tomas@stripped +1 -0
intermediate commit, will be recommitted later
storage/ndb/include/ndbapi/NdbDictionary.hpp
1.58 05/12/15 09:21:05 tomas@stripped +7 -1
intermediate commit, will be recommitted later
sql/ha_ndbcluster_binlog.h
1.2 05/12/15 09:21:05 tomas@stripped +8 -2
intermediate commit, will be recommitted later
sql/ha_ndbcluster_binlog.cc
1.2 05/12/15 09:21:05 tomas@stripped +133 -86
intermediate commit, will be recommitted later
storage/ndb/include/kernel/signaldata/SumaImpl.hpp
1.6 05/12/15 09:21:04 tomas@stripped +1 -1
intermediate commit, will be recommitted later
sql/sql_db.cc
1.121 05/12/15 09:21:04 tomas@stripped +9 -3
intermediate commit, will be recommitted later
sql/handler.h
1.170 05/12/15 09:21:04 tomas@stripped +15 -0
intermediate commit, will be recommitted later
sql/handler.cc
1.195 05/12/15 09:21:04 tomas@stripped +89 -18
intermediate commit, will be recommitted later
sql/ha_ndbcluster.h
1.100 05/12/15 09:21:04 tomas@stripped +9 -15
intermediate commit, will be recommitted later
sql/ha_ndbcluster.cc
1.218 05/12/15 09:21:04 tomas@stripped +113 -1799
intermediate commit, will be recommitted later
sql/Makefile.am
1.124 05/12/15 09:21:04 tomas@stripped +4 -0
intermediate commit, will be recommitted later
mysql-test/t/disabled.def
1.16 05/12/15 09:21:04 tomas@stripped +0 -1
intermediate commit, will be recommitted later
libmysqld/Makefile.am
1.73 05/12/15 09:21:04 tomas@stripped +4 -0
intermediate commit, will be recommitted later
config/ac-macros/ha_ndbcluster.m4
1.11 05/12/15 09:21:04 tomas@stripped +1 -0
intermediate commit, will be recommitted later
sql/ha_ndbcluster_binlog.h
1.1 05/12/14 22:28:30 tomas@stripped +151 -0
sql/ha_ndbcluster_binlog.h
1.0 05/12/14 22:28:30 tomas@stripped +0 -0
BitKeeper file /home/tomas/mysql-5.1-wl2325-repl/sql/ha_ndbcluster_binlog.h
sql/ha_ndbcluster_binlog.cc
1.1 05/12/14 22:28:26 tomas@stripped +2626 -0
sql/ha_ndbcluster_binlog.cc
1.0 05/12/14 22:28:26 tomas@stripped +0 -0
BitKeeper file /home/tomas/mysql-5.1-wl2325-repl/sql/ha_ndbcluster_binlog.cc
# This is a BitKeeper patch. What follows are the unified diffs for the
# set of deltas contained in the patch. The rest of the patch, the part
# that BitKeeper cares about, is below these diffs.
# User: tomas
# Host: poseidon.ndb.mysql.com
# Root: /home/tomas/mysql-5.1-wl2325-repl
--- 1.123/sql/Makefile.am 2005-11-24 09:59:02 +01:00
+++ 1.124/sql/Makefile.am 2005-12-15 09:21:04 +01:00
@@ -104,6 +104,7 @@
ha_blackhole.cc ha_federated.cc ha_ndbcluster.cc \
ha_blackhole.h ha_federated.h ha_ndbcluster.h \
ha_partition.cc ha_partition.h \
+ ha_ndbcluster_binlog.cc ha_ndbcluster_binlog.h \
examples/ha_tina.cc examples/ha_example.cc \
examples/ha_tina.h examples/ha_example.h
mysqld_DEPENDENCIES = @mysql_se_objs@
@@ -159,6 +160,9 @@
$(CXXCOMPILE) @bdb_includes@ $(LM_CFLAGS) -c $<
ha_ndbcluster.o:ha_ndbcluster.cc ha_ndbcluster.h
+ $(CXXCOMPILE) @ndbcluster_includes@ $(LM_CFLAGS) -c $<
+
+ha_ndbcluster_binlog.o:ha_ndbcluster_binlog.cc ha_ndbcluster_binlog.h
$(CXXCOMPILE) @ndbcluster_includes@ $(LM_CFLAGS) -c $<
#Until we can get rid of dependencies on ha_ndbcluster.h
--- 1.194/sql/handler.cc 2005-11-24 09:59:02 +01:00
+++ 1.195/sql/handler.cc 2005-12-15 09:21:04 +01:00
@@ -2285,40 +2285,111 @@
#ifdef HAVE_ROW_BASED_REPLICATION
int ha_reset_logs(THD *thd)
{
-#ifdef HAVE_NDB_BINLOG
- ndbcluster_reset_logs(thd);
-#endif
- return 0;
+ bool result= 0;
+ handlerton **types;
+ for (types= sys_table_types; *types; types++)
+ {
+ if ((*types)->state == SHOW_OPTION_YES &&
+ (*types)->reset_logs)
+ {
+ if ((*types)->reset_logs(thd))
+ result= 1;
+ }
+ }
+ return result;
}
int ha_binlog_index_purge_file(THD *thd, const char *file)
{
-#ifdef HAVE_NDB_BINLOG
- ndbcluster_binlog_index_purge_file(thd, file);
-#endif
- return 0;
+ bool result= 0;
+ handlerton **types;
+ for (types= sys_table_types; *types; types++)
+ {
+ if ((*types)->state == SHOW_OPTION_YES &&
+ (*types)->binlog_index_purge_file)
+ {
+ if ((*types)->binlog_index_purge_file(thd, file))
+ result= 1;
+ }
+ }
+ return result;
}
void ha_reset_slave(THD* thd)
{
-#ifdef HAVE_NDB_BINLOG
- ndbcluster_reset_slave(thd);
-#endif
+ handlerton **types;
+ for (types= sys_table_types; *types; types++)
+ {
+ if ((*types)->state == SHOW_OPTION_YES &&
+ (*types)->reset_slave)
+ {
+ (*types)->reset_slave(thd);
+ }
+ }
}
void ha_binlog_wait(THD* thd)
{
-#ifdef HAVE_NDB_BINLOG
- ndbcluster_binlog_wait(thd);
-#endif
+ handlerton **types;
+ for (types= sys_table_types; *types; types++)
+ {
+ if ((*types)->state == SHOW_OPTION_YES &&
+ (*types)->binlog_wait)
+ {
+ (*types)->binlog_wait(thd);
+ }
+ }
}
int ha_binlog_end(THD* thd)
{
-#ifdef HAVE_NDB_BINLOG
- ndbcluster_binlog_end();
-#endif
- return 0;
+ bool result= 0;
+ handlerton **types;
+ for (types= sys_table_types; *types; types++)
+ {
+ if ((*types)->state == SHOW_OPTION_YES &&
+ (*types)->binlog_end)
+ {
+ if ((*types)->binlog_end(thd))
+ result= 1;
+ }
+ }
+ return result;
+}
+
+int ha_create_database(THD *thd, const char *db, const char *query,
+ int query_length)
+{
+ bool result= 0;
+ handlerton **types;
+ for (types= sys_table_types; *types; types++)
+ {
+ if ((*types)->state == SHOW_OPTION_YES &&
+ (*types)->create_database)
+ {
+ fprintf(stderr, "%s\n", (*types)->name);
+ if ((*types)->create_database(thd, db, query, query_length))
+ result= 1;
+ }
+ }
+ return result;
+}
+
+int ha_alter_database(THD *thd, const char *db, const char *query,
+ int query_length)
+{
+ bool result= 0;
+ handlerton **types;
+ for (types= sys_table_types; *types; types++)
+ {
+ if ((*types)->state == SHOW_OPTION_YES &&
+ (*types)->alter_database)
+ {
+ if ((*types)->alter_database(thd, db, query, query_length))
+ result = 1;
+ }
+ }
+ return result;
}
#endif
--- 1.169/sql/handler.h 2005-11-24 09:59:02 +01:00
+++ 1.170/sql/handler.h 2005-12-15 09:21:04 +01:00
@@ -429,6 +429,15 @@
int (*repl_report_sent_binlog)(THD *thd, char *log_file_name,
my_off_t end_offset);
uint32 flags; /* global handler flags */
+ int (*reset_logs)(THD *thd);
+ int (*binlog_index_purge_file)(THD *thd, const char *file);
+ void (*reset_slave)(THD *thd);
+ void (*binlog_wait)(THD *thd);
+ int (*binlog_end)(THD *thd);
+ int (*create_database)(THD *thd, const char *db, const char *query,
+ int query_length);
+ int (*alter_database)(THD *thd, const char *db, const char *query,
+ int query_length);
} handlerton;
struct show_table_alias_st {
@@ -1523,10 +1532,16 @@
void ha_reset_slave(THD *thd);
void ha_binlog_wait(THD *thd);
int ha_binlog_end(THD *thd);
+int ha_create_database(THD *thd, const char *db, const char *query,
+ int query_length);
+int ha_alter_database(THD *thd, const char *db, const char *query,
+ int query_length);
#else
#define ha_reset_logs(a) 0
#define ha_binlog_index_purge_file(a,b) 0
#define ha_reset_slave(a)
#define ha_binlog_wait(a)
#define ha_binlog_end(a) 0
+#define ha_create_database(a,b,c,d) 0
+#define ha_alter_database(a,b,c,d) 0
#endif
--- 1.120/sql/sql_db.cc 2005-11-11 19:01:38 +01:00
+++ 1.121/sql/sql_db.cc 2005-12-15 09:21:04 +01:00
@@ -401,6 +401,7 @@
bool silent)
{
char path[FN_REFLEN+16];
+ char tmp_query[FN_REFLEN+16];
long result= 1;
int error= 0;
MY_STAT stat_info;
@@ -487,15 +488,18 @@
if (!thd->query) // Only in replication
{
- query= path;
- query_length= (uint) (strxmov(path,"create database `", db, "`", NullS) -
- path);
+ query= tmp_query;
+ query_length= (uint) (strxmov(tmp_query,"create database `",
+ db, "`", NullS) - tmp_query);
}
else
{
query= thd->query;
query_length= thd->query_length;
}
+
+ ha_create_database(thd, db, query, query_length);
+
if (mysql_bin_log.is_open())
{
Query_log_event qinfo(thd, query, query_length, 0,
@@ -566,6 +570,8 @@
thd->variables.collation_server;
thd->variables.collation_database= thd->db_charset;
}
+
+ ha_alter_database(thd, db, thd->query, thd->query_length);
if (mysql_bin_log.is_open())
{
--- 1.10/config/ac-macros/ha_ndbcluster.m4 2005-11-07 16:24:33 +01:00
+++ 1.11/config/ac-macros/ha_ndbcluster.m4 2005-12-15 09:21:04 +01:00
@@ -185,6 +185,7 @@
ndbcluster_libs="\$(top_builddir)/storage/ndb/src/.libs/libndbclient.a"
ndbcluster_system_libs=""
ndb_mgmclient_libs="\$(top_builddir)/storage/ndb/src/mgmclient/libndbmgmclient.la"
+ mysql_se_objs="$mysql_se_objs ha_ndbcluster_binlog.o"
MYSQL_CHECK_NDB_OPTIONS
NDBCLUSTER_WORKAROUNDS
--- 1.15/mysql-test/t/disabled.def 2005-11-24 09:59:01 +01:00
+++ 1.16/mysql-test/t/disabled.def 2005-12-15 09:21:04 +01:00
@@ -23,4 +23,3 @@
rpl_row_NOW:Bug 12574
rpl_bit_npk:Bug 13418
compress : Magnus will fix
-rpl_server_id2:fails in wl2325
--- 1.5/storage/ndb/include/kernel/signaldata/SumaImpl.hpp 2005-09-15 10:42:02 +02:00
+++ 1.6/storage/ndb/include/kernel/signaldata/SumaImpl.hpp 2005-12-15 09:21:04 +01:00
@@ -304,7 +304,7 @@
Uint32 tableId;
Uint8 operation;
Uint8 req_nodeid;
- Uint8 not_used2;
+ Uint8 ndbd_nodeid;
Uint8 not_used3;
Uint32 logType;
};
--- 1.57/storage/ndb/include/ndbapi/NdbDictionary.hpp 2005-11-07 12:19:05 +01:00
+++ 1.58/storage/ndb/include/ndbapi/NdbDictionary.hpp 2005-12-15 09:21:05 +01:00
@@ -1012,6 +1012,9 @@
TE_GCP_COMPLETE=1<<7, ///< GCP is complete
TE_CLUSTER_FAILURE=1<<8, ///< Cluster is unavailable
TE_STOP =1<<9, ///< Stop of event operation
+ TE_NODE_FAILURE=1<<10, ///< Node failed
+ TE_SUBSCRIBE =1<<11, ///< Node subscribes
+ TE_UNSUBSCRIBE =1<<12, ///< Node unsubscribes
TE_ALL=0xFFFF ///< Any/all event on table (not relevant when
///< events are received)
};
@@ -1027,7 +1030,10 @@
_TE_CREATE=6,
_TE_GCP_COMPLETE=7,
_TE_CLUSTER_FAILURE=8,
- _TE_STOP=9
+ _TE_STOP=9,
+ _TE_NODE_FAILURE=10,
+ _TE_SUBSCRIBE=11,
+ _TE_UNSUBSCRIBE=12
};
#endif
/**
--- 1.14/storage/ndb/include/ndbapi/NdbEventOperation.hpp 2005-09-15 10:42:02 +02:00
+++ 1.15/storage/ndb/include/ndbapi/NdbEventOperation.hpp 2005-12-15 09:21:05 +01:00
@@ -204,6 +204,7 @@
int hasError() const;
int getReqNodeId() const;
+ int getNdbdNodeId() const;
#endif
#ifndef DOXYGEN_SHOULD_SKIP_INTERNAL
--- 1.28/storage/ndb/src/kernel/blocks/suma/Suma.cpp 2005-11-07 12:19:08 +01:00
+++ 1.29/storage/ndb/src/kernel/blocks/suma/Suma.cpp 2005-12-15 09:21:05 +01:00
@@ -1070,6 +1070,7 @@
subPtr.p->m_subscriptionId = subId;
subPtr.p->m_subscriptionKey = subKey;
subPtr.p->m_subscriptionType = type;
+ subPtr.p->m_options = Subscription::REPORT_SUBSCRIBE;
subPtr.p->m_tableId = tableId;
subPtr.p->m_table_ptrI = RNIL;
subPtr.p->m_state = Subscription::DEFINED;
@@ -2197,6 +2198,10 @@
subPtr.p->m_subscriptionId,subPtr.p->m_subscriptionKey));
sendSignal(subPtr.p->m_senderRef, GSN_SUB_START_CONF, signal,
SubStartConf::SignalLength, JBB);
+
+ reportAllSubscribers(signal, NdbDictionary::Event::_TE_SUBSCRIBE,
+ subPtr, subbPtr);
+
DBUG_VOID_RETURN;
}
@@ -2469,7 +2474,51 @@
DBUG_PRINT("info",("c_subscriberPool size: %d free: %d",
c_subscriberPool.getSize(),
c_subscriberPool.getNoOfFree()));
+
+ reportAllSubscribers(signal, NdbDictionary::Event::_TE_UNSUBSCRIBE,
+ subPtr, subbPtr);
+
DBUG_VOID_RETURN;
+}
+
+// report new started subscriber to all other subscribers
+void
+Suma::reportAllSubscribers(Signal *signal,
+ NdbDictionary::Event::_TableEvent table_event,
+ SubscriptionPtr subPtr,
+ SubscriberPtr subbPtr)
+{
+ if (subPtr.p->n_subscribers == 0)
+ {
+ ndbrequire(table_event != NdbDictionary::Event::_TE_SUBSCRIBE);
+ return;
+ }
+ if (!(subPtr.p->m_options & Subscription::REPORT_SUBSCRIBE))
+ {
+ return;
+ }
+
+ SubTableData * data = (SubTableData*)signal->getDataPtrSend();
+ data->gci = m_last_complete_gci + 1;
+ data->tableId = subPtr.p->m_tableId;
+ data->operation = table_event;
+ data->logType = 0;
+ data->req_nodeid = refToNode(subbPtr.p->m_senderRef);
+ data->ndbd_nodeid = refToNode(reference());
+
+ TablePtr tabPtr;
+ c_tables.getPtr(tabPtr, subPtr.p->m_table_ptrI);
+ LocalDLList<Subscriber> subbs(c_subscriberPool, tabPtr.p->c_subscribers);
+ SubscriberPtr i_subbPtr;
+ for(subbs.first(i_subbPtr); !i_subbPtr.isNull(); subbs.next(i_subbPtr))
+ {
+ if (i_subbPtr.p->m_subPtrI == subPtr.i)
+ {
+ data->senderData = i_subbPtr.p->m_senderData;
+ sendSignal(i_subbPtr.p->m_senderRef, GSN_SUB_TABLE_DATA, signal,
+ SubTableData::SignalLength, JBB);
+ }
+ }
}
void
--- 1.7/storage/ndb/src/kernel/blocks/suma/Suma.hpp 2005-11-06 13:26:48 +01:00
+++ 1.8/storage/ndb/src/kernel/blocks/suma/Suma.hpp 2005-12-15 09:21:05 +01:00
@@ -33,6 +33,7 @@
#include <signaldata/UtilSequence.hpp>
#include <signaldata/SumaImpl.hpp>
+#include <ndbapi/NdbDictionary.hpp>
class Suma : public SimulatedBlock {
BLOCK_DEFINES(Suma);
@@ -288,8 +289,12 @@
Uint32 m_senderData;
Uint32 m_subscriptionId;
Uint32 m_subscriptionKey;
- Uint32 m_subscriptionType;
+ Uint16 m_subscriptionType;
+ Uint16 m_options;
+ enum Options {
+ REPORT_SUBSCRIBE= 1
+ };
enum State {
UNDEFINED,
LOCKED,
@@ -367,6 +372,11 @@
void sendSubStopReq(Signal* signal, bool unlock= false);
void completeSubRemove(SubscriptionPtr subPtr);
+
+ void reportAllSubscribers(Signal *signal,
+ NdbDictionary::Event::_TableEvent table_event,
+ SubscriptionPtr subPtr,
+ SubscriberPtr subbPtr);
Uint32 getFirstGCI(Signal* signal);
--- 1.59/storage/ndb/src/ndbapi/Ndb.cpp 2005-11-07 12:19:08 +01:00
+++ 1.60/storage/ndb/src/ndbapi/Ndb.cpp 2005-12-15 09:21:05 +01:00
@@ -1305,7 +1305,12 @@
void Ndb::setReportThreshEventFreeMem(unsigned thresh)
{
- theEventBuffer->m_free_thresh= thresh;
+ if (theEventBuffer->m_free_thresh != thresh)
+ {
+ theEventBuffer->m_free_thresh= thresh;
+ theEventBuffer->m_min_free_thresh= thresh;
+ theEventBuffer->m_max_free_thresh= 100;
+ }
}
#ifdef VM_TRACE
--- 1.6/storage/ndb/src/ndbapi/NdbEventOperation.cpp 2005-09-15 10:42:04 +02:00
+++ 1.7/storage/ndb/src/ndbapi/NdbEventOperation.cpp 2005-12-15 09:21:05 +01:00
@@ -157,6 +157,11 @@
return m_impl.m_data_item->sdata->req_nodeid;
}
+int NdbEventOperation::getNdbdNodeId() const
+{
+ return m_impl.m_data_item->sdata->ndbd_nodeid;
+}
+
/*
* Private members
*/
--- 1.23/storage/ndb/src/ndbapi/NdbEventOperationImpl.cpp 2005-11-10 09:24:20 +01:00
+++ 1.24/storage/ndb/src/ndbapi/NdbEventOperationImpl.cpp 2005-12-15 09:21:05 +01:00
@@ -268,6 +268,7 @@
m_state= EO_ERROR;
mi_type= 0;
m_magic_number= 0;
+ m_error.code= myDict->getNdbError().code;
m_ndb->theEventBuffer->remove_op();
m_ndb->theEventBuffer->add_drop_unlock();
DBUG_RETURN(r);
@@ -539,6 +540,8 @@
m_latestGCI(0),
m_total_alloc(0),
m_free_thresh(10),
+ m_min_free_thresh(10),
+ m_max_free_thresh(100),
m_gci_slip_thresh(3),
m_dropped_ev_op(0),
m_active_op_count(0)
@@ -635,7 +638,6 @@
EventBufData_chunk *chunk_data=
(EventBufData_chunk *)NdbMem_Allocate(alloc_size);
- m_total_alloc+= alloc_size;
chunk_data->sz= sz;
m_allocated_data.push_back(chunk_data);
@@ -670,7 +672,7 @@
NdbMutex_Lock(m_mutex);
NdbEventOperationImpl *ev_op= move_data();
- if (unlikely(ev_op == 0))
+ if (unlikely(ev_op == 0 && aMillisecondNumber))
{
NdbCondition_WaitTimeout(p_cond, m_mutex, aMillisecondNumber);
ev_op= move_data();
@@ -902,8 +904,8 @@
assert(bucket->m_data.m_count);
#endif
m_complete_data.m_data.append(bucket->m_data);
- reportStatus();
}
+ reportStatus();
bzero(bucket, sizeof(Gci_container));
bucket->m_gci = gci + ACTIVE_GCI_DIRECTORY_SIZE;
bucket->m_gcp_complete_rep_count = m_system_nodes;
@@ -994,6 +996,33 @@
}
void
+NdbEventBuffer::report_node_failure(Uint32 node_id)
+{
+ DBUG_ENTER("NdbEventBuffer::report_node_failure");
+ SubTableData data;
+ LinearSectionPtr ptr[3];
+ bzero(&data, sizeof(data));
+ bzero(ptr, sizeof(ptr));
+
+ data.tableId = ~0;
+ data.operation = NdbDictionary::Event::_TE_NODE_FAILURE;
+ data.req_nodeid = (Uint8)node_id;
+ data.ndbd_nodeid = (Uint8)node_id;
+ data.logType = SubTableData::LOG;
+ /**
+ * Insert this event for each operation
+ */
+ NdbEventOperation* op= 0;
+ while((op = m_ndb->getEventOperation(op)))
+ {
+ NdbEventOperationImpl* impl= &op->m_impl;
+ data.senderData = impl->m_oid;
+ insertDataL(impl, &data, ptr);
+ }
+ DBUG_VOID_RETURN;
+}
+
+void
NdbEventBuffer::completeClusterFailed()
{
DBUG_ENTER("NdbEventBuffer::completeClusterFailed");
@@ -1356,23 +1385,61 @@
else
apply_gci= latest_gci;
- if (100*m_free_data_sz < m_free_thresh*m_total_alloc ||
- latest_gci-apply_gci >= m_gci_slip_thresh)
+ if (100*m_free_data_sz < m_min_free_thresh*m_total_alloc &&
+ m_total_alloc > 1024*1024)
{
- Uint32 data[8];
- data[0]= NDB_LE_EventBufferStatus;
- data[1]= m_total_alloc-m_free_data_sz;
- data[2]= m_total_alloc;
- data[3]= 0;
- data[4]= apply_gci & ~(Uint32)0;
- data[5]= apply_gci >> 32;
- data[6]= latest_gci & ~(Uint32)0;
- data[7]= latest_gci >> 32;
- m_ndb->theImpl->send_event_report(data,8);
+ /* report less free buffer than m_free_thresh,
+ next report when more free than 2 * m_free_thresh
+ */
+ fprintf(stderr, "line: %d, m_free_data_sz %d, m_min_free_thresh %d, "
+ "m_max_free_thresh %d, m_total_alloc %d, m_free_thresh %d\n",
+ __LINE__, m_free_data_sz, m_min_free_thresh,
+ m_max_free_thresh, m_total_alloc, m_free_thresh);
+ m_min_free_thresh= 0;
+ m_max_free_thresh= 2 * m_free_thresh;
+ goto send_report;
+ }
+
+ if (100*m_free_data_sz > m_max_free_thresh*m_total_alloc &&
+ m_total_alloc > 1024*1024)
+ {
+ /* report more free than 2 * m_free_thresh
+ next report when less free than m_free_thresh
+ */
+ fprintf(stderr, "line: %d, m_free_data_sz %d, m_min_free_thresh %d, "
+ "m_max_free_thresh %d, m_total_alloc %d, m_free_thresh %d\n",
+ __LINE__, m_free_data_sz, m_min_free_thresh,
+ m_max_free_thresh, m_total_alloc, m_free_thresh);
+ m_min_free_thresh= m_free_thresh;
+ m_max_free_thresh= 100;
+ goto send_report;
+ }
+
+ if (latest_gci-apply_gci >= m_gci_slip_thresh)
+ {
+ fprintf(stderr, "line: %d, m_free_data_sz %d, m_min_free_thresh %d, "
+ "m_max_free_thresh %d, m_total_alloc %d, m_free_thresh %d\n",
+ __LINE__, m_free_data_sz, m_min_free_thresh,
+ m_max_free_thresh, m_total_alloc, m_free_thresh);
+ fprintf(stderr, "%lld %lld %d\n", latest_gci, apply_gci, m_gci_slip_thresh);
+ goto send_report;
+ }
+ return;
+
+send_report:
+ Uint32 data[8];
+ data[0]= NDB_LE_EventBufferStatus;
+ data[1]= m_total_alloc-m_free_data_sz;
+ data[2]= m_total_alloc;
+ data[3]= 0;
+ data[4]= apply_gci & ~(Uint32)0;
+ data[5]= apply_gci >> 32;
+ data[6]= latest_gci & ~(Uint32)0;
+ data[7]= latest_gci >> 32;
+ m_ndb->theImpl->send_event_report(data,8);
#ifdef VM_TRACE
- assert(m_total_alloc >= m_free_data_sz);
+ assert(m_total_alloc >= m_free_data_sz);
#endif
- }
}
template class Vector<Gci_container>;
--- 1.7/storage/ndb/src/ndbapi/NdbEventOperationImpl.hpp 2005-09-15 10:42:04 +02:00
+++ 1.8/storage/ndb/src/ndbapi/NdbEventOperationImpl.hpp 2005-12-15 09:21:05 +01:00
@@ -221,7 +221,7 @@
void execSUB_GCP_COMPLETE_REP(const SubGcpCompleteRep * const rep);
void complete_outof_order_gcis();
- void reportClusterFailed(NdbEventOperationImpl *op);
+ void report_node_failure(Uint32 node_id);
void completeClusterFailed();
// used by user thread
@@ -271,7 +271,7 @@
unsigned m_total_alloc; // total allocated memory
// threshholds to report status
- unsigned m_free_thresh;
+ unsigned m_free_thresh, m_min_free_thresh, m_max_free_thresh;
unsigned m_gci_slip_thresh;
NdbError m_error;
--- 1.34/storage/ndb/src/ndbapi/Ndbif.cpp 2005-11-07 12:19:09 +01:00
+++ 1.35/storage/ndb/src/ndbapi/Ndbif.cpp 2005-12-15 09:21:05 +01:00
@@ -253,12 +253,17 @@
void
Ndb::report_node_failure_completed(Uint32 node_id)
{
- if (theEventBuffer &&
- !TransporterFacade::instance()->theClusterMgr->isClusterAlive())
+ if (theEventBuffer)
{
- // cluster is unavailable,
+ // node failed
// eventOperations in the ndb object should be notified
- theEventBuffer->completeClusterFailed();
+ theEventBuffer->report_node_failure(node_id);
+ if(!TransporterFacade::instance()->theClusterMgr->isClusterAlive())
+ {
+ // cluster is unavailable,
+ // eventOperations in the ndb object should be notified
+ theEventBuffer->completeClusterFailed();
+ }
}
abortTransactionsAfterNodeFailure(node_id);
--- 1.10/storage/ndb/test/ndbapi/bank/BankLoad.cpp 2005-10-06 10:26:09 +02:00
+++ 1.11/storage/ndb/test/ndbapi/bank/BankLoad.cpp 2005-12-15 09:21:05 +01:00
@@ -23,7 +23,7 @@
*/
struct AccountTypesStruct {
int id;
- const char* descr;
+ const char descr[64];
};
const AccountTypesStruct accountTypes[] = {
{ 0, "KASSA"},
--- 1.4/storage/ndb/test/run-test/make-config.sh 2005-06-13 08:04:07 +02:00
+++ 1.5/storage/ndb/test/run-test/make-config.sh 2005-12-15 09:21:05 +01:00
@@ -44,7 +44,7 @@
;;
mysqld)
echo "$proc_no.mysqld" >> $dir_file
- echo "[ndb_mgmd]" >> $config_file
+ echo "[mysqld]" >> $config_file
echo "Id: $node_id" >> $config_file
echo "HostName: $2" >> $config_file
node_id=`expr $node_id + 1`
--- 1.217/sql/ha_ndbcluster.cc 2005-11-24 09:59:02 +01:00
+++ 1.218/sql/ha_ndbcluster.cc 2005-12-15 09:21:04 +01:00
@@ -33,10 +33,7 @@
#include <../util/Bitmask.hpp>
#include <ndbapi/NdbIndexStat.hpp>
-#ifdef HAVE_NDB_BINLOG
-#include "rpl_injector.h"
-#include "slave.h"
-#endif
+#include "ha_ndbcluster_binlog.h"
// options from from mysqld.cc
extern my_bool opt_ndb_optimized_node_selection;
@@ -55,13 +52,9 @@
// createable against NDB from this handler
static const int max_transactions= 3; // should really be 2 but there is a transaction to much allocated when loch table is used
-static const char *ha_ndb_ext=".ndb";
-static const char share_prefix[]= "./";
-
-static int ndbcluster_close_connection(THD *thd);
-static int ndbcluster_commit(THD *thd, bool all);
-static int ndbcluster_rollback(THD *thd, bool all);
-static handler* ndbcluster_create_handler(TABLE *table);
+static bool ndbcluster_init(void);
+static int ndbcluster_end(ha_panic_function flag);
+static bool ndbcluster_show_status(THD*,stat_print_fn *,enum ha_stat_type);
handlerton ndbcluster_hton = {
"ndbcluster",
@@ -69,31 +62,7 @@
"Clustered, fault-tolerant, memory-based tables",
DB_TYPE_NDBCLUSTER,
ndbcluster_init,
- 0, /* slot */
- 0, /* savepoint size */
- ndbcluster_close_connection,
- NULL, /* savepoint_set */
- NULL, /* savepoint_rollback */
- NULL, /* savepoint_release */
- ndbcluster_commit,
- ndbcluster_rollback,
- NULL, /* prepare */
- NULL, /* recover */
- NULL, /* commit_by_xid */
- NULL, /* rollback_by_xid */
- NULL, /* create_cursor_read_view */
- NULL, /* set_cursor_read_view */
- NULL, /* close_cursor_read_view */
- ndbcluster_create_handler, /* Create a new handler */
- ndbcluster_drop_database, /* Drop a database */
- ndbcluster_end, /* Panic call */
- NULL, /* Release temporary latches */
- NULL, /* Update Statistics */
- NULL, /* Start Consistent Snapshot */
- NULL, /* Flush logs */
- ndbcluster_show_status, /* Show status */
- NULL, /* Replication Report Sent Binlog */
- HTON_NO_FLAGS
+ ~(uint)0, /* slot */
};
static handler *ndbcluster_create_handler(TABLE *table)
@@ -124,38 +93,24 @@
break; \
}
-// Typedefs for long names
-typedef NdbDictionary::Object NDBOBJ;
-typedef NdbDictionary::Column NDBCOL;
-typedef NdbDictionary::Table NDBTAB;
-typedef NdbDictionary::Index NDBINDEX;
-typedef NdbDictionary::Dictionary NDBDICT;
-typedef NdbDictionary::Event NDBEVENT;
-
static int ndbcluster_inited= 0;
-static int ndbcluster_util_inited= 0;
+int ndbcluster_util_inited= 0;
static Ndb* g_ndb= NULL;
-static Ndb_cluster_connection* g_ndb_cluster_connection= NULL;
+Ndb_cluster_connection* g_ndb_cluster_connection= NULL;
+unsigned char g_node_id_map[max_ndb_nodes];
// Handler synchronization
pthread_mutex_t ndbcluster_mutex;
// Table lock handling
-static HASH ndbcluster_open_tables;
+HASH ndbcluster_open_tables;
static byte *ndbcluster_get_key(NDB_SHARE *share,uint *length,
my_bool not_used __attribute__((unused)));
-static NDB_SHARE *get_share(const char *key,
- bool create_if_not_exists= TRUE,
- bool have_lock= FALSE);
#ifdef HAVE_NDB_BINLOG
-/* you should have lock on ndbcluster_mutex when calling */
-static int handle_trailing_share(NDB_SHARE *share);
static int rename_share(NDB_SHARE *share, const char *new_key);
#endif
-static void free_share(NDB_SHARE **share, bool have_lock= FALSE);
-static void real_free_share(NDB_SHARE **share);
static void ndb_set_fragmentation(NDBTAB &tab, TABLE *table, uint pk_len);
static int packfrm(const void *data, uint len, const void **pack_data, uint *pack_len);
@@ -165,35 +120,9 @@
static int ndb_get_table_statistics(Ndb*, const char *,
struct Ndb_statistics *);
-#ifndef DBUG_OFF
-void print_records(TABLE *table, const char *record)
-{
- if (_db_on_)
- {
- for (uint j= 0; j < table->s->fields; j++)
- {
- char buf[40];
- int pos= 0;
- Field *field= table->field[j];
- const byte* field_ptr= field->ptr - table->record[0] + record;
- int pack_len= field->pack_length();
- int n= pack_len < 10 ? pack_len : 10;
-
- for (int i= 0; i < n && pos < 20; i++)
- {
- pos+= sprintf(&buf[pos]," %x", (int) (unsigned char) field_ptr[i]);
- }
- buf[pos]= 0;
- DBUG_PRINT("info",("[%u]field_ptr[0->%d]: %s", j, n, buf));
- }
- }
-}
-#else
-#define print_records(a,b)
-#endif
// Util thread variables
-static pthread_t ndb_util_thread;
+pthread_t ndb_util_thread;
pthread_mutex_t LOCK_ndb_util_thread;
pthread_cond_t COND_ndb_util_thread;
pthread_handler_t ndb_util_thread_func(void *arg);
@@ -205,75 +134,6 @@
*/
static uint32 dummy_buf;
-#ifdef HAVE_NDB_BINLOG
-#define INJECTOR_EVENT_LEN 200
-/* NDB Injector thread (used for binlog creation) */
-ulong ndb_report_thresh_binlog_epoch_slip;
-ulong ndb_report_thresh_binlog_mem_usage;
-static ulonglong ndb_latest_applied_binlog_epoch= 0;
-static ulonglong ndb_latest_handled_binlog_epoch= 0;
-static ulonglong ndb_latest_received_binlog_epoch= 0;
-static pthread_t ndb_binlog_thread;
-static int ndbcluster_create_binlog_setup(Ndb *ndb, const char *key,
- const char *db,
- const char *table_name,
- bool do_binlog,
- NDB_SHARE *share= 0);
-static int ndbcluster_create_event(Ndb *ndb, const NDBTAB *table,
- const char *event_name);
-static int ndbcluster_create_event_ops(NDB_SHARE *share,
- const NDBTAB *ndbtab,
- const char *event_name);
-static int ndbcluster_handle_drop_table(Ndb *ndb, const char *event_name,
- NDB_SHARE *share);
-static void ndb_rep_event_name(String *event_name,
- const char *db, const char *tbl);
-#ifndef DBUG_OFF
-static void dbug_print_table(const char *info, TABLE *table);
-#endif
-static int ndbcluster_binlog_start();
-pthread_handler_t ndb_binlog_thread_func(void *arg);
-
-/*
- Mutex and condition used for interacting between client sql thread
- and injector thread
-*/
-pthread_mutex_t injector_mutex;
-pthread_cond_t injector_cond;
-/*
- Flag showing if the ndb injector thread is running, if so == 1
-*/
-static int ndb_binlog_thread_running= 0;
-
-/*
- table cluster_replication.apply_status
-*/
-static int ndbcluster_create_apply_status_table(THD *thd);
-static NDB_SHARE *ndbcluster_check_apply_status_share();
-static NDB_SHARE *ndbcluster_get_apply_status_share();
-static NDB_SHARE *apply_status_share= 0;
-
-/*
- Global reference to the ndb injector thread THD oject
-
- Has one sole purpose, for setting the in_use table member variable
- in get_share(...)
-*/
-static THD *injector_thd= 0;
-
-/*
- Global reference to ndb injector thd object.
-
- Used mainly by the binlog index thread, but exposed to the client sql
- thread for one reason; to setup the events operations for a table
- to enable ndb injector thread receiving events.
-
- Must therefore always be used with a surrounding
- pthread_mutex_lock(&injector_mutex), when doing create/dropEventOperation
-*/
-static Ndb *injector_ndb= 0;
-#endif /* HAVE_NDB_BINLOG */
-
/*
Stats that can be retrieved from ndb
*/
@@ -291,7 +151,7 @@
static const char * ndb_connected_host= 0;
static long ndb_connected_port= 0;
static long ndb_number_of_replicas= 0;
-static long ndb_number_of_storage_nodes= 0;
+long ndb_number_of_storage_nodes= 0;
static int update_status_variables(Ndb_cluster_connection *c)
{
@@ -312,9 +172,6 @@
{NullS, NullS, SHOW_LONG}
};
-/* instantiated in storage/ndb/src/ndbapi/Ndbif.cpp */
-extern Uint64 g_latest_trans_gci;
-
/*
Error handling functions
*/
@@ -442,6 +299,7 @@
all= NULL;
stmt= NULL;
error= 0;
+ options= 0;
}
Thd_ndb::~Thd_ndb()
@@ -453,14 +311,6 @@
}
inline
-Thd_ndb *
-get_thd_ndb(THD *thd) { return (Thd_ndb *) thd->ha_data[ndbcluster_hton.slot]; }
-
-inline
-void
-set_thd_ndb(THD *thd, Thd_ndb *thd_ndb) { thd->ha_data[ndbcluster_hton.slot]= thd_ndb; }
-
-inline
Ndb *ha_ndbcluster::get_ndb()
{
return get_thd_ndb(current_thd)->ndb;
@@ -2575,8 +2425,8 @@
set to null.
*/
-static void ndb_unpack_record(TABLE *table, NdbValue *value,
- MY_BITMAP *defined, byte *buf)
+void ndb_unpack_record(TABLE *table, NdbValue *value,
+ MY_BITMAP *defined, byte *buf)
{
Field **p_field= table->field, *field= *p_field;
uint row_offset= (uint) (buf - table->record[0]);
@@ -3624,7 +3474,7 @@
Commit a transaction started in NDB
*/
-int ndbcluster_commit(THD *thd, bool all)
+static int ndbcluster_commit(THD *thd, bool all)
{
int res= 0;
Thd_ndb *thd_ndb= get_thd_ndb(thd);
@@ -3675,7 +3525,7 @@
Rollback a transaction started in NDB
*/
-int ndbcluster_rollback(THD *thd, bool all)
+static int ndbcluster_rollback(THD *thd, bool all)
{
int res= 0;
Thd_ndb *thd_ndb= get_thd_ndb(thd);
@@ -4221,6 +4071,12 @@
sql_print_information("NDB Binlog: CREATE TABLE Event: %s",
event_name.c_ptr());
+ ndbcluster_log_schema_op(current_thd, share,
+ current_thd->query, current_thd->query_length,
+ share->db, share->table_name,
+ 0, 0,
+ SOT_CREATE_TABLE);
+
if (share && ndb_binlog_thread_running > 0 &&
ndbcluster_create_event_ops(share, t, event_name.c_ptr()) < 0)
{
@@ -4367,12 +4223,14 @@
}
#ifdef HAVE_NDB_BINLOG
+ int is_old_table_tmpfile= 1;
if (share && share->op)
dict->forceGCPWait();
/* handle old table */
if (!is_prefix(m_tabname, tmp_file_prefix))
{
+ is_old_table_tmpfile= 0;
String event_name(INJECTOR_EVENT_LEN);
ndb_rep_event_name(&event_name, from + sizeof(share_prefix) - 1, 0);
ndbcluster_handle_drop_table(ndb, event_name.c_ptr(), share);
@@ -4380,6 +4238,19 @@
if (!result && !is_prefix(new_tabname, tmp_file_prefix))
{
+ if (is_old_table_tmpfile)
+ ndbcluster_log_schema_op(current_thd, share,
+ current_thd->query, current_thd->query_length,
+ share->db, share->table_name,
+ 0, 0,
+ SOT_ALTER_TABLE);
+ else
+ ndbcluster_log_schema_op(current_thd, share,
+ current_thd->query, current_thd->query_length,
+ share->db, share->table_name,
+ 0, 0,
+ SOT_RENAME_TABLE);
+
/* always create an event for the table */
String event_name(INJECTOR_EVENT_LEN);
ndb_rep_event_name(&event_name, to + sizeof(share_prefix) - 1, 0);
@@ -4506,6 +4377,15 @@
*/
int table_dropped= dict->getNdbError().code != 709;
+ if (!is_prefix(table_name, tmp_file_prefix))
+ {
+ ndbcluster_log_schema_op(current_thd, share,
+ current_thd->query, current_thd->query_length,
+ share->db, share->table_name,
+ 0, 0,
+ SOT_DROP_TABLE);
+ }
+
if (table_dropped && share && share->op)
dict->forceGCPWait();
@@ -4853,7 +4733,7 @@
}
-int ndbcluster_close_connection(THD *thd)
+static int ndbcluster_close_connection(THD *thd)
{
Thd_ndb *thd_ndb= get_thd_ndb(thd);
DBUG_ENTER("ndbcluster_close_connection");
@@ -5015,14 +4895,21 @@
DBUG_RETURN(ret);
}
-void ndbcluster_drop_database(char *path)
+static void ndbcluster_drop_database(char *path)
{
ndbcluster_drop_database_impl(path);
+#ifdef HAVE_NDB_BINLOG
+ char db[FN_REFLEN];
+ ha_ndbcluster::set_dbname(path, db);
+ ndbcluster_log_schema_op(current_thd, 0,
+ current_thd->query, current_thd->query_length,
+ db, "", 0, 0, SOT_DROP_DB);
+#endif
}
/*
find all tables in ndb and discover those needed
*/
-static int ndbcluster_find_all_files(THD *thd)
+int ndbcluster_find_all_files(THD *thd)
{
DBUG_ENTER("ndbcluster_find_all_files");
Ndb* ndb;
@@ -5057,10 +4944,11 @@
if (!(ndbtab= dict->getTable(elmt.name)))
{
- sql_print_error("NDB: failed to setup table %s.%s, error: %d, %s",
- elmt.database, elmt.name,
- dict->getNdbError().code,
- dict->getNdbError().message);
+ if (elmt.state == NDBOBJ::StateOnline)
+ sql_print_error("NDB: failed to setup table %s.%s, error: %d, %s",
+ elmt.database, elmt.name,
+ dict->getNdbError().code,
+ dict->getNdbError().message);
unhandled++;
continue;
}
@@ -5337,11 +5225,18 @@
static int connect_callback()
{
update_status_variables(g_ndb_cluster_connection);
+
+ uint node_id, i= 0;
+ Ndb_cluster_connection_node_iter node_iter;
+ memset((void *)g_node_id_map, 0xFFFF, sizeof(g_node_id_map));
+ while ((node_id= g_ndb_cluster_connection->get_next_node(node_iter)))
+ g_node_id_map[node_id]= i++;
+
pthread_cond_signal(&COND_ndb_util_thread);
return 0;
}
-bool ndbcluster_init()
+static bool ndbcluster_init()
{
int res;
DBUG_ENTER("ndbcluster_init");
@@ -5349,6 +5244,21 @@
if (have_ndbcluster != SHOW_OPTION_YES)
goto ndbcluster_init_error;
+ {
+ handlerton &h= ndbcluster_hton;
+ h.close_connection= ndbcluster_close_connection;
+ h.commit= ndbcluster_commit;
+ h.rollback= ndbcluster_rollback;
+ h.create= ndbcluster_create_handler; /* Create a new handler */
+ h.drop_database= ndbcluster_drop_database; /* Drop a database */
+ h.panic= ndbcluster_end; /* Panic call */
+ h.show_status= ndbcluster_show_status; /* Show status */
+#ifdef HAVE_NDB_BINLOG
+ ndbcluster_binlog_init_handlerton();
+#endif
+ h.flags= HTON_NO_FLAGS;
+ }
+
// Set connectstring if specified
if (opt_ndbcluster_connectstring != 0)
DBUG_PRINT("connectstring", ("%s", opt_ndbcluster_connectstring));
@@ -5458,72 +5368,7 @@
DBUG_RETURN(TRUE);
}
-
-/*
- End use of the NDB Cluster table handler
- - free all global variables allocated by
- ndbcluster_init()
-*/
-
-int ndbcluster_binlog_end()
-{
- DBUG_ENTER("ndb_binlog_end");
-
- if (!ndbcluster_util_inited)
- DBUG_RETURN(0);
-
- // Kill ndb utility thread
- (void) pthread_mutex_lock(&LOCK_ndb_util_thread);
- DBUG_PRINT("exit",("killing ndb util thread: %lx", ndb_util_thread));
- (void) pthread_cond_signal(&COND_ndb_util_thread);
- (void) pthread_mutex_unlock(&LOCK_ndb_util_thread);
-
-#ifdef HAVE_NDB_BINLOG
- /* wait for injector thread to finish */
- if (ndb_binlog_thread_running > 0)
- {
- pthread_mutex_lock(&injector_mutex);
- while (ndb_binlog_thread_running > 0)
- {
- struct timespec abstime;
- set_timespec(abstime, 1);
- pthread_cond_timedwait(&injector_cond, &injector_mutex, &abstime);
- }
- pthread_mutex_unlock(&injector_mutex);
- }
-
- /* remove all shares */
- {
- pthread_mutex_lock(&ndbcluster_mutex);
- for (uint i= 0; i < ndbcluster_open_tables.records; i++)
- {
- NDB_SHARE *share=
- (NDB_SHARE*) hash_element(&ndbcluster_open_tables, i);
- if (share->table)
- DBUG_PRINT("share",
- ("table->s->db.table_name: %s.%s",
- share->table->s->db, share->table->s->table_name));
- if (share->state != NSS_DROPPED && !--share->use_count)
- real_free_share(&share);
- else
- {
- DBUG_PRINT("share",
- ("[%d] 0x%lx key: %s key_length: %d",
- i, share, share->key, share->key_length));
- DBUG_PRINT("share",
- ("db.tablename: %s.%s use_count: %d commit_count: %d",
- share->db, share->table_name,
- share->use_count, share->commit_count));
- }
- }
- pthread_mutex_unlock(&ndbcluster_mutex);
- }
-#endif
- ndbcluster_util_inited= 0;
- DBUG_RETURN(0);
-}
-
-int ndbcluster_end(ha_panic_function type)
+static int ndbcluster_end(ha_panic_function type)
{
DBUG_ENTER("ndbcluster_end");
@@ -6006,60 +5851,6 @@
}
-#ifndef DBUG_OFF
-static void dbug_print_table(const char *info, TABLE *table)
-{
- if (table == 0)
- {
- DBUG_PRINT("info",("%s: (null)", info));
- return;
- }
- DBUG_PRINT("info",
- ("%s: %s.%s s->fields: %d "
- "reclength: %d rec_buff_length: %d record[0]: %lx "
- "record[1]: %lx",
- info,
- table->s->db,
- table->s->table_name,
- table->s->fields,
- table->s->reclength,
- table->s->rec_buff_length,
- table->record[0],
- table->record[1]));
-
- for (unsigned int i= 0; i < table->s->fields; i++)
- {
- Field *f= table->field[i];
- DBUG_PRINT("info",
- ("[%d] \"%s\"(0x%lx:%s%s%s%s%s%s) type: %d pack_length: %d "
- "ptr: 0x%lx[+%d] null_bit: %u null_ptr: 0x%lx[+%d]",
- i,
- f->field_name,
- f->flags,
- (f->flags & PRI_KEY_FLAG) ? "pri" : "attr",
- (f->flags & NOT_NULL_FLAG) ? "" : ",nullable",
- (f->flags & UNSIGNED_FLAG) ? ",unsigned" : ",signed",
- (f->flags & ZEROFILL_FLAG) ? ",zerofill" : "",
- (f->flags & BLOB_FLAG) ? ",blob" : "",
- (f->flags & BINARY_FLAG) ? ",binary" : "",
- f->real_type(),
- f->pack_length(),
- f->ptr, f->ptr - table->record[0],
- f->null_bit,
- f->null_ptr, (byte*) f->null_ptr - table->record[0]));
- if (f->type() == MYSQL_TYPE_BIT)
- {
- Field_bit *g= (Field_bit*) f;
- DBUG_PRINT("MYSQL_TYPE_BIT",("field_length: %d bit_ptr: 0x%lx[+%d] "
- "bit_ofs: %u bit_len: %u",
- g->field_length, g->bit_ptr,
- (byte*) g->bit_ptr-table->record[0],
- g->bit_ofs, g->bit_len));
- }
- }
-}
-#endif
-
/*
Handling the shared NDB_SHARE structure that is needed to
provide table locking.
@@ -6113,7 +5904,7 @@
Must be called with previous pthread_mutex_lock(&ndbcluster_mutex)
*/
-static int handle_trailing_share(NDB_SHARE *share)
+int handle_trailing_share(NDB_SHARE *share)
{
static ulong trailing_share_id= 0;
DBUG_ENTER("handle_trailing_share");
@@ -6263,7 +6054,7 @@
Increase refcount on existing share.
Always returns share and cannot fail.
*/
-static NDB_SHARE *get_share(NDB_SHARE *share)
+NDB_SHARE *ndbcluster_get_share(NDB_SHARE *share)
{
pthread_mutex_lock(&ndbcluster_mutex);
share->use_count++;
@@ -6295,8 +6086,8 @@
have_lock == TRUE, pthread_mutex_lock(&ndbcluster_mutex) already taken
*/
-static NDB_SHARE *get_share(const char *key, bool create_if_not_exists,
- bool have_lock)
+NDB_SHARE *ndbcluster_get_share(const char *key, bool create_if_not_exists,
+ bool have_lock)
{
NDB_SHARE *share;
if (!have_lock)
@@ -6344,48 +6135,7 @@
share->table_name= share->db + strlen(share->db) + 1;
ha_ndbcluster::set_tabname(key, share->table_name);
#ifdef HAVE_NDB_BINLOG
- share->op= 0;
- share->table= 0;
- while (ndb_binlog_thread_running > 0)
- {
- TABLE *table= (TABLE*) my_malloc(sizeof(*table), MYF(MY_WME));
- int r;
- if ((r= openfrm(current_thd, share->key, "", 0, (uint) READ_ALL,
- 0, table)))
- {
- sql_print_error("Unable to open frm for %s, frmerror=%d",
- share->key, r);
- DBUG_PRINT("error", ("openfrm failed %d", r));
- my_free((gptr) table, MYF(0));
- table= 0;
- break;
- }
- if (!table->record[1] || table->record[1] == table->record[0])
- {
- table->record[1]= alloc_root(&table->mem_root,
- table->s->rec_buff_length);
- }
- table->in_use= injector_thd;
-
- table->s->db= share->db;
- table->s->table_name= share->table_name;
-
- share->table= table;
-#ifndef DBUG_OFF
- dbug_print_table("table", table);
-#endif
- /*
- ! do not touch the contents of the table
- it may be in use by the injector thread
- */
- share->ndb_value[0]= (NdbValue*)
- alloc_root(*root_ptr, sizeof(NdbValue) * table->s->fields
- + 1 /*extra for hidden key*/);
- share->ndb_value[1]= (NdbValue*)
- alloc_root(*root_ptr, sizeof(NdbValue) * table->s->fields
- +1 /*extra for hidden key*/);
- break;
- }
+ ndbcluster_binlog_init_share(share);
#endif
*root_ptr= old_root;
}
@@ -6414,7 +6164,7 @@
return share;
}
-static void real_free_share(NDB_SHARE **share)
+void ndbcluster_real_free_share(NDB_SHARE **share)
{
DBUG_PRINT("real_free_share",
("0x%lx key: %s key_length: %d",
@@ -6458,7 +6208,7 @@
have_lock == TRUE, pthread_mutex_lock(&ndbcluster_mutex) already taken
*/
-static void free_share(NDB_SHARE **share, bool have_lock)
+void ndbcluster_free_share(NDB_SHARE **share, bool have_lock)
{
if (!have_lock)
pthread_mutex_lock(&ndbcluster_mutex);
@@ -6484,7 +6234,6 @@
}
-
/*
Internal representation of the frm blob
@@ -7158,7 +6907,7 @@
Wait for cluster to start
*/
pthread_mutex_lock(&LOCK_ndb_util_thread);
- while (!ndb_cluster_node_id)
+ while (!ndb_cluster_node_id && (ndbcluster_hton.slot != ~(uint)0))
{
/* ndb not connected yet */
set_timespec(abstime, 1);
@@ -7173,14 +6922,25 @@
}
pthread_mutex_unlock(&LOCK_ndb_util_thread);
+ {
+ Thd_ndb *thd_ndb;
+ if (!(thd_ndb= ha_ndbcluster::seize_thd_ndb()))
+ {
+ sql_print_error("Could not allocate Thd_ndb object");
+ goto ndb_util_thread_end;
+ }
+ set_thd_ndb(thd, thd_ndb);
+ thd_ndb->options|= TNO_NO_LOG_SCHEMA_OP;
+ }
+
+#ifdef HAVE_NDB_BINLOG
+ /* create tables needed by the replication */
+ ndbcluster_setup_binlog_table_shares(thd);
+#else
/*
Get all table definitions from the storage node
*/
ndbcluster_find_all_files(thd);
-
-#ifdef HAVE_NDB_BINLOG
- /* create tables needed by the replication */
- ndbcluster_create_apply_status_table(thd);
#endif
ndbcluster_util_inited= 1;
@@ -7209,15 +6969,11 @@
#ifdef HAVE_NDB_BINLOG
/*
- Check that the apply_status_share has been created.
+ Check that the apply_status_share and schema_share has been created.
If not try to create it
*/
- if (!apply_status_share &&
- ndbcluster_check_apply_status_share() == 0)
- {
- ndbcluster_find_all_files(thd);
- ndbcluster_create_apply_status_table(thd);
- }
+ if (!apply_status_share || !schema_share)
+ ndbcluster_setup_binlog_table_shares(thd);
#endif
if (ndb_cache_check_time == 0)
@@ -7325,6 +7081,7 @@
}
}
ndb_util_thread_end:
+ sql_print_information("Stopping Cluster Utility thread");
net_end(&thd->net);
thd->cleanup();
delete thd;
@@ -8670,7 +8427,6 @@
enum ha_stat_type stat_type)
{
char buf[IO_SIZE];
- ulonglong ndb_latest_epoch= 0;
DBUG_ENTER("ndbcluster_show_status");
if (have_ndbcluster != SHOW_OPTION_YES)
@@ -8709,1453 +8465,11 @@
}
}
#ifdef HAVE_NDB_BINLOG
- pthread_mutex_lock(&injector_mutex);
- if (injector_ndb)
- {
- ndb_latest_epoch= injector_ndb->getLatestGCI();
- pthread_mutex_unlock(&injector_mutex);
-
- snprintf(buf, sizeof(buf),
- "latest_epoch=%llu, "
- "latest_trans_epoch=%llu, "
- "latest_received_binlog_epoch=%llu, "
- "latest_handled_binlog_epoch=%llu, "
- "latest_applied_binlog_epoch=%llu",
- ndb_latest_epoch,
- g_latest_trans_gci,
- ndb_latest_received_binlog_epoch,
- ndb_latest_handled_binlog_epoch,
- ndb_latest_applied_binlog_epoch);
- if (stat_print(thd, ndbcluster_hton.name, "binlog", buf))
- DBUG_RETURN(TRUE);
- }
- else
- pthread_mutex_unlock(&injector_mutex);
+ ndbcluster_show_status_binlog(thd, stat_print, stat_type);
#endif
DBUG_RETURN(FALSE);
}
-
-#ifdef HAVE_NDB_BINLOG
-
-/*
- Run a query through mysql_parse
-
- Used to:
- - purging the cluster_replication.binlog_index
- - creating the cluster_replication.apply_status table
-*/
-static void run_query(THD *thd, char *buf, char *end, my_bool print_error)
-{
- ulong save_query_length= thd->query_length;
- char *save_query= thd->query;
- ulong save_thread_id= thd->variables.pseudo_thread_id;
- NET save_net= thd->net;
-
- bzero((char*) &thd->net, sizeof(NET));
- thd->query_length= end - buf;
- thd->query= buf;
- thd->variables.pseudo_thread_id= thread_id;
- DBUG_PRINT("query", ("%s", thd->query));
-
- mysql_parse(thd, thd->query, thd->query_length);
-
- if (print_error && thd->query_error)
- {
- sql_print_error("NDB: %s: error %s %d %d %d",
- buf, thd->net.last_error, thd->net.last_errno,
- thd->net.report_error, thd->query_error);
- }
-
- thd->query_length= save_query_length;
- thd->query= save_query;
- thd->variables.pseudo_thread_id= save_thread_id;
- thd->net= save_net;
-}
-
-
-/*********************************************************************
- Internal helper functions for handeling of the cluster replication tables
- - cluster_replication.binlog_index
- - cluster_replication.apply_status
-*********************************************************************/
-
-/*
- defines for cluster replication table names
-*/
-#define NDB_REP_DB "cluster_replication"
-#define NDB_REP_TABLE "binlog_index"
-#define NDB_APPLY_TABLE "apply_status"
-#define NDB_APPLY_TABLE_FILE "./" NDB_REP_DB "/" NDB_APPLY_TABLE
-
-/*
- Global variables for holding the binlog_index table reference
-*/
-TABLE *binlog_index= 0;
-TABLE_LIST binlog_tables;
-
-/*
- struct to hold the data to be inserted into the
- cluster_replication.binlog_index table
-*/
-struct Binlog_index_row {
- longlong gci;
- const char *master_log_file;
- longlong master_log_pos;
- longlong n_inserts;
- longlong n_updates;
- longlong n_deletes;
- longlong n_schemaops;
-};
-
-/*
- Open the cluster_replication.binlog_index table
-*/
-static int open_binlog_index(THD *thd, TABLE_LIST *tables,
- TABLE **binlog_index)
-{
- static char repdb[]= NDB_REP_DB;
- static char reptable[]= NDB_REP_TABLE;
- const char *save_proc_info= thd->proc_info;
-
- bzero((char*) tables, sizeof(*tables));
- tables->db= repdb;
- tables->alias= tables->table_name= reptable;
- tables->lock_type= TL_WRITE;
- thd->proc_info= "Opening " NDB_REP_DB "." NDB_REP_TABLE;
- tables->required_type= FRMTYPE_TABLE;
- uint counter;
- thd->clear_error();
- if (open_tables(thd, &tables, &counter, MYSQL_LOCK_IGNORE_FLUSH))
- {
- sql_print_error("NDB Binlog: Opening binlog_index: %d, '%s'",
- thd->net.last_errno,
- thd->net.last_error ? thd->net.last_error : "");
- thd->proc_info= save_proc_info;
- return -1;
- }
- *binlog_index= tables->table;
- thd->proc_info= save_proc_info;
- return 0;
-}
-
-/*
- Insert one row in the cluster_replication.binlog_index
-
- declared friend in handler.h to be able to call write_row directly
- so that this insert is not replicated
-*/
-int ndb_add_binlog_index(THD *thd, void *_row)
-{
- Binlog_index_row &row= *(Binlog_index_row *) _row;
- int error= 0;
- bool need_reopen;
- for ( ; ; ) /* loop for need_reopen */
- {
- if (!binlog_index && open_binlog_index(thd, &binlog_tables, &binlog_index))
- {
- error= -1;
- goto add_binlog_index_err;
- }
-
- if (lock_tables(thd, &binlog_tables, 1, &need_reopen))
- {
- if (need_reopen)
- {
- close_tables_for_reopen(thd, &binlog_tables);
- binlog_index= 0;
- continue;
- }
- sql_print_error("NDB Binlog: Unable to lock table binlog_index");
- error= -1;
- goto add_binlog_index_err;
- }
- break;
- }
-
- binlog_index->field[0]->store(row.master_log_pos);
- binlog_index->field[1]->store(row.master_log_file,
- strlen(row.master_log_file),
- &my_charset_bin);
- binlog_index->field[2]->store(row.gci);
- binlog_index->field[3]->store(row.n_inserts);
- binlog_index->field[4]->store(row.n_updates);
- binlog_index->field[5]->store(row.n_deletes);
- binlog_index->field[6]->store(row.n_schemaops);
-
- int r;
- if ((r= binlog_index->file->write_row(binlog_index->record[0])))
- {
- sql_print_error("NDB Binlog: Writing row to binlog_index: %d", r);
- error= -1;
- goto add_binlog_index_err;
- }
-
- mysql_unlock_tables(thd, thd->lock);
- thd->lock= 0;
- return 0;
-add_binlog_index_err:
- close_thread_tables(thd);
- binlog_index= 0;
- return error;
-}
-
-/*
- check the availability af the cluster_replication.apply_status share
- - return share, but do not increase refcount
- - return 0 if there is no share
-*/
-static NDB_SHARE *ndbcluster_check_apply_status_share()
-{
- pthread_mutex_lock(&ndbcluster_mutex);
-
- void *share= hash_search(&ndbcluster_open_tables,
- NDB_APPLY_TABLE_FILE,
- sizeof(NDB_APPLY_TABLE_FILE) - 1);
- DBUG_PRINT("info",("ndbcluster_check_apply_status_share %s %p",
- NDB_APPLY_TABLE_FILE, share));
- pthread_mutex_unlock(&ndbcluster_mutex);
- return (NDB_SHARE*) share;
-}
-
-/*
- Get the share for the cluster_replication.apply_status share
-
- - return 0 if share does not exist
-*/
-static NDB_SHARE *ndbcluster_get_apply_status_share()
-{
- return get_share(NDB_APPLY_TABLE_FILE, false);
-}
-
-/*
- Create the cluster_replication.apply_status table
-*/
-static int ndbcluster_create_apply_status_table(THD *thd)
-{
- DBUG_ENTER("ndbcluster_create_apply_status_table");
-
- /*
- Check if we already have the apply status table.
- If so it should have been discovered at startup
- and thus have a share
- */
-
- if (ndbcluster_check_apply_status_share())
- DBUG_RETURN(0);
-
- if (g_ndb_cluster_connection->get_no_ready() <= 0)
- DBUG_RETURN(0);
-
- char buf[1024], *end;
-
- sql_print_information("NDB: Creating " NDB_REP_DB "." NDB_APPLY_TABLE);
-
- /*
- Check if apply status table exists in MySQL "dictionary"
- if so, remove it since there is none in Ndb
- */
- {
- strxnmov(buf, sizeof(buf),
- mysql_data_home,
- "/" NDB_REP_DB "/" NDB_APPLY_TABLE,
- reg_ext, NullS);
- unpack_filename(buf,buf);
- my_delete(buf, MYF(0));
- }
-
- /*
- Note, updating this table schema must be reflected in ndb_restore
- */
- end= strmov(buf, "CREATE TABLE IF NOT EXISTS "
- NDB_REP_DB "." NDB_APPLY_TABLE
- " ( server_id INT UNSIGNED NOT NULL,"
- " epoch BIGINT UNSIGNED NOT NULL, "
- " PRIMARY KEY USING HASH (server_id) ) ENGINE=NDB");
-
- run_query(thd, buf, end, TRUE);
-
- DBUG_RETURN(0);
-}
-
-
-/*********************************************************************
- Functions for start, stop, wait for ndbcluster binlog thread
-*********************************************************************/
-
-static int do_ndbcluster_binlog_close_connection= 0;
-
-static int ndbcluster_binlog_start()
-{
- DBUG_ENTER("ndbcluster_binlog_start");
-
- pthread_mutex_init(&injector_mutex, MY_MUTEX_INIT_FAST);
- pthread_cond_init(&injector_cond, NULL);
-
- /* Create injector thread */
- if (pthread_create(&ndb_binlog_thread, &connection_attrib,
- ndb_binlog_thread_func, 0))
- {
- DBUG_PRINT("error", ("Could not create ndb injector thread"));
- pthread_cond_destroy(&injector_cond);
- pthread_mutex_destroy(&injector_mutex);
- DBUG_RETURN(-1);
- }
-
- /*
- Wait for the ndb injector thread to finish starting up.
- */
- pthread_mutex_lock(&injector_mutex);
- while (!ndb_binlog_thread_running)
- pthread_cond_wait(&injector_cond, &injector_mutex);
- pthread_mutex_unlock(&injector_mutex);
-
- if (ndb_binlog_thread_running < 0)
- DBUG_RETURN(-1);
-
- DBUG_RETURN(0);
-}
-
-static void ndbcluster_binlog_close_connection(THD *thd)
-{
- DBUG_ENTER("ndbcluster_binlog_close_connection");
- const char *save_info= thd->proc_info;
- thd->proc_info= "ndbcluster_binlog_close_connection";
- do_ndbcluster_binlog_close_connection= 1;
- while (ndb_binlog_thread_running > 0)
- sleep(1);
- thd->proc_info= save_info;
- DBUG_VOID_RETURN;
-}
-
-/*
- called in mysql_show_binlog_events and reset_logs to make sure we wait for
- all events originating from this mysql server to arrive in the binlog
-
- Wait for the last epoch in which the last transaction is a part of.
-
- Wait a maximum of 30 seconds.
-*/
-void ndbcluster_binlog_wait(THD *thd)
-{
- if (ndb_binlog_thread_running > 0)
- {
- DBUG_ENTER("ndbcluster_binlog_wait");
- const char *save_info= thd ? thd->proc_info : 0;
- ulonglong wait_epoch= g_latest_trans_gci;
- int count= 30;
- if (thd)
- thd->proc_info= "Waiting for ndbcluster binlog update to "
- "reach current position";
- while (count && ndb_binlog_thread_running > 0 &&
- ndb_latest_handled_binlog_epoch < wait_epoch)
- {
- count--;
- sleep(1);
- }
- if (thd)
- thd->proc_info= save_info;
- DBUG_VOID_RETURN;
- }
-}
-
-/*****************************************************************
- functions called from master sql client threads
-****************************************************************/
-
-/*
- Called from MYSQL_LOG::reset_logs in log.cc when binlog is emptied
-*/
-int ndbcluster_reset_logs(THD *thd)
-{
- if (ndb_binlog_thread_running <= 0)
- return 0;
-
- DBUG_ENTER("ndbcluster_reset_logs");
-
- /*
- Wait for all events orifinating from this mysql server has
- reached the binlog before continuing to reset
- */
- ndbcluster_binlog_wait(thd);
-
- char buf[1024];
- char *end= strmov(buf, "DELETE FROM " NDB_REP_DB "." NDB_REP_TABLE);
-
- run_query(thd, buf, end, FALSE);
-
- DBUG_RETURN(0);
-}
-
-/*
- Called from MYSQL_LOG::purge_logs in log.cc when the binlog "file"
- is removed
-*/
-
-int ndbcluster_binlog_index_purge_file(THD *thd, const char *file)
-{
- if (ndb_binlog_thread_running <= 0)
- return 0;
-
- DBUG_ENTER("ndbcluster_binlog_index_purge_file");
- DBUG_PRINT("enter", ("file: %s", file));
-
- char buf[1024];
- char *end= strmov(strmov(strmov(buf,
- "DELETE FROM "
- NDB_REP_DB "." NDB_REP_TABLE
- " WHERE File='"), file), "'");
-
- run_query(thd, buf, end, FALSE);
-
- DBUG_RETURN(0);
-}
-
-/*****************************************************************
- functions called from slave sql client threads
-****************************************************************/
-void ndbcluster_reset_slave(THD *thd)
-{
- if (ndb_binlog_thread_running <= 0)
- return;
-
- DBUG_ENTER("ndbcluster_reset_slave");
- char buf[1024];
- char *end= strmov(buf, "DELETE FROM " NDB_REP_DB "." NDB_APPLY_TABLE);
- run_query(thd, buf, end, FALSE);
- DBUG_VOID_RETURN;
-}
-
-
-/**************************************************************
- Internal helper functions for creating/dropping ndb events
- used by the client sql threads
-**************************************************************/
-static void
-ndb_rep_event_name(String *event_name,const char *db, const char *tbl)
-{
- event_name->set_ascii("REPL$", 5);
- event_name->append(db);
- if (tbl)
- {
- event_name->append('/');
- event_name->append(tbl);
- }
-}
-
-/*
- Common function for setting up everything for logging a table at
- create/discover.
-*/
-static int ndbcluster_create_binlog_setup(Ndb *ndb, const char *key,
- const char *db,
- const char *table_name,
- bool do_binlog,
- NDB_SHARE *share)
-{
- DBUG_ENTER("ndbcluster_create_binlog_setup");
-
- pthread_mutex_lock(&ndbcluster_mutex);
-
- /* Handle any trailing share */
- if (share == 0)
- {
- share= (NDB_SHARE*) hash_search(&ndbcluster_open_tables,
- (byte*) key, strlen(key));
- if (share)
- handle_trailing_share(share);
- }
- else
- handle_trailing_share(share);
-
- /* Create share which is needed to hold replication information */
- if (!(share= get_share(key, true, true)))
- {
- sql_print_error("NDB Binlog: "
- "allocating table share for %s failed", key);
- }
- pthread_mutex_unlock(&ndbcluster_mutex);
-
- while (share && do_binlog)
- {
- /*
- ToDo make sanity check of share so that the table is actually the same
- I.e. we need to do openfrm in this case
- Currently awaiting this to be fixed in the 4.1 tree in the general
- case
- */
-
- /* Create the event in NDB */
- ndb->setDatabaseName(db);
-
- NDBDICT *dict= ndb->getDictionary();
- const NDBTAB *ndbtab= dict->getTable(table_name);
- if (ndbtab == 0)
- {
- sql_print_information("NDB Binlog: Failed to get table %s from ndb: "
- "%s, %d", key, dict->getNdbError().message,
- dict->getNdbError().code);
- break; // error
- }
- String event_name(INJECTOR_EVENT_LEN);
- ndb_rep_event_name(&event_name, db, table_name);
- /*
- event should have been created by someone else,
- but let's make sure, and create if it doesn't exist
- */
- if (!dict->getEvent(event_name.c_ptr()))
- {
- if (ndbcluster_create_event(ndb, ndbtab, event_name.c_ptr()))
- {
- sql_print_error("NDB Binlog: "
- "FAILED CREATE (DISCOVER) TABLE Event: %s",
- event_name.c_ptr());
- break; // error
- }
- sql_print_information("NDB Binlog: "
- "CREATE (DISCOVER) TABLE Event: %s",
- event_name.c_ptr());
- }
- else
- sql_print_information("NDB Binlog: DISCOVER TABLE Event: %s",
- event_name.c_ptr());
-
- /*
- create the event operations for receiving logging events
- */
- if (ndbcluster_create_event_ops(share, ndbtab,
- event_name.c_ptr()) < 0)
- {
- sql_print_error("NDB Binlog:"
- "FAILED CREATE (DISCOVER) EVENT OPERATIONS Event: %s",
- event_name.c_ptr());
- /* a warning has been issued to the client */
- DBUG_RETURN(0);
- }
- DBUG_RETURN(0);
- }
- DBUG_RETURN(-1);
-}
-
-static int
-ndbcluster_create_event(Ndb *ndb, const NDBTAB *ndbtab,
- const char *event_name)
-{
- DBUG_ENTER("ndbcluster_create_event");
- NDBDICT *dict= ndb->getDictionary();
-
- if (!dict)
- {
- sql_print_error("NDB Binlog: could not setup binlog, "
- "Invalid NdbDictionary");
- DBUG_RETURN(-1);
- }
-
- NDBEVENT my_event(event_name);
- my_event.setTable(*ndbtab);
- my_event.addTableEvent(NDBEVENT::TE_ALL);
-
- /* add all columns to the event */
- int n_cols= ndbtab->getNoOfColumns();
- for(int a= 0; a < n_cols; a++)
- my_event.addEventColumn(a);
-
- if (dict->createEvent(my_event)) // Add event to database
- {
-#ifdef NDB_BINLOG_EXTRA_WARNINGS
- /*
- failed, print a warning
- */
- push_warning_printf(current_thd, MYSQL_ERROR::WARN_LEVEL_ERROR,
- ER_GET_ERRMSG, ER(ER_GET_ERRMSG),
- dict->getNdbError().code,
- dict->getNdbError().message, "NDB");
-#endif
- if (dict->getNdbError().classification != NdbError::SchemaObjectExists)
- {
- sql_print_error("NDB Binlog: Unable to create event in database. "
- "Event: %s Error Code: %d Message: %s", event_name,
- dict->getNdbError().code, dict->getNdbError().message);
- DBUG_RETURN(-1);
- }
-
- /*
- trailing event from before; an error, but try to correct it
- */
- if (dict->dropEvent(my_event.getName()))
- {
- sql_print_error("NDB Binlog: Unable to create event in database. "
- " Attempt to correct with drop failed. "
- "Event: %s Error Code: %d Message: %s",
- event_name,
- dict->getNdbError().code,
- dict->getNdbError().message);
- DBUG_RETURN(-1);
- }
-
- /*
- try to add the event again
- */
- if (dict->createEvent(my_event))
- {
- sql_print_error("NDB Binlog: Unable to create event in database. "
- " Attempt to correct with drop ok, but create failed. "
- "Event: %s Error Code: %d Message: %s",
- event_name,
- dict->getNdbError().code,
- dict->getNdbError().message);
- DBUG_RETURN(-1);
- }
-#ifdef NDB_BINLOG_EXTRA_WARNINGS
- push_warning_printf(current_thd, MYSQL_ERROR::WARN_LEVEL_ERROR,
- ER_GET_ERRMSG, ER(ER_GET_ERRMSG),
- 0, "NDB Binlog: Removed trailing event",
- "NDB");
-#endif
- }
-
- DBUG_RETURN(0);
-}
-
-inline int is_ndb_compatible_type(Field *field)
-{
- return
- !(field->flags & BLOB_FLAG) &&
- field->type() != MYSQL_TYPE_BIT &&
- field->pack_length() != 0;
-}
-
-/*
- - create eventOperations for receiving log events
- - setup ndb recattrs for reception of log event data
- - "start" the event operation
-
- used at create/discover of tables
-*/
-static int
-ndbcluster_create_event_ops(NDB_SHARE *share, const NDBTAB *ndbtab,
- const char *event_name)
-{
- /*
- we are in either create table or rename table so table should be
- locked, hence we can work with the share without locks
- */
-
- DBUG_ENTER("ndbcluster_create_event_ops");
-
- DBUG_ASSERT(share != 0);
-
- if (share->op)
- {
- assert(share->op->getCustomData() == (void *) share);
-
- DBUG_ASSERT(share->use_count > 1);
- sql_print_error("NDB Binlog: discover reusing old ev op");
- free_share(&share); // old event op already has reference
- DBUG_RETURN(0);
- }
-
- TABLE *table= share->table;
- if (table)
- {
- /*
- Logging of blob tables is not yet implemented, it would require:
- 1. setup of events also on the blob attribute tables
- 2. collect the pieces of the blob into one from an epoch to
- provide a full blob to binlog
- */
- if (table->s->blob_fields)
- {
- sql_print_error("NDB Binlog: logging of blob table %s "
- "is not supported", share->key);
- DBUG_RETURN(0);
- }
- /*
- Logging of a table without primary key is not possible since the event
- api does not provide a "full" before image, only updated attributes
- are returned in the before image.
- */
- if (table->s->primary_key == MAX_KEY)
- {
- sql_print_error("NDB Binlog: logging of table %s without a "
- "primary key is not supported", share->key);
- DBUG_RETURN(0);
- }
- }
-
- pthread_mutex_lock(&injector_mutex);
- if (injector_ndb == 0)
- {
- pthread_mutex_unlock(&injector_mutex);
- DBUG_RETURN(-1);
- }
-
- NdbEventOperation *op= injector_ndb->createEventOperation(event_name);
- if (!op)
- {
- pthread_mutex_unlock(&injector_mutex);
- sql_print_error("NDB Binlog: Creating NdbEventOperation failed for"
- " %s",event_name);
- push_warning_printf(current_thd, MYSQL_ERROR::WARN_LEVEL_ERROR,
- ER_GET_ERRMSG, ER(ER_GET_ERRMSG),
- injector_ndb->getNdbError().code,
- injector_ndb->getNdbError().message,
- "NDB");
- DBUG_RETURN(-1);
- }
-
- int n_columns= ndbtab->getNoOfColumns();
- int n_fields= table ? table->s->fields : 0;
- for (int j= 0; j < n_columns; j++)
- {
- const char *col_name= ndbtab->getColumn(j)->getName();
- NdbRecAttr *attr0, *attr1;
- if (j < n_fields)
- {
- Field *f= share->table->field[j];
- if (is_ndb_compatible_type(f))
- {
- DBUG_PRINT("info", ("%s compatible", col_name));
- attr0= op->getValue(col_name, f->ptr);
- attr1= op->getPreValue(col_name, (f->ptr-share->table->record[0]) +
- share->table->record[1]);
- }
- else
- {
- DBUG_PRINT("info", ("%s non compatible", col_name));
- attr0= op->getValue(col_name);
- attr1= op->getPreValue(col_name);
- }
- }
- else
- {
- DBUG_PRINT("info", ("%s hidden key", col_name));
- attr0= op->getValue(col_name);
- attr1= op->getPreValue(col_name);
- }
- share->ndb_value[0][j].rec= attr0;
- share->ndb_value[1][j].rec= attr1;
- }
- op->setCustomData((void *) share); // set before execute
- share->op= op; // assign op in NDB_SHARE
- if (op->execute())
- {
- share->op= NULL;
- push_warning_printf(current_thd, MYSQL_ERROR::WARN_LEVEL_ERROR,
- ER_GET_ERRMSG, ER(ER_GET_ERRMSG),
- op->getNdbError().code, op->getNdbError().message,
- "NDB");
- sql_print_error("NDB Binlog: ndbevent->execute failed for %s; %d %s",
- event_name,
- op->getNdbError().code, op->getNdbError().message);
- injector_ndb->dropEventOperation(op);
- pthread_mutex_unlock(&injector_mutex);
- DBUG_RETURN(-1);
- }
- pthread_mutex_unlock(&injector_mutex);
-
- get_share(share);
-
- DBUG_PRINT("info",("%s share->op: 0x%lx, share->use_count: %u",
- share->key, share->op, share->use_count));
-
- sql_print_information("NDB Binlog: logging %s", share->key);
- DBUG_RETURN(0);
-}
-
-/*
- when entering the calling thread should have a share lock id share != 0
- then the injector thread will have one as well, i.e. share->use_count == 0
- (unless it has already dropped... then share->op == 0)
-*/
-static int
-ndbcluster_handle_drop_table(Ndb *ndb, const char *event_name,
- NDB_SHARE *share)
-{
- DBUG_ENTER("ndbcluster_handle_drop_table");
-
- NDBDICT *dict= ndb->getDictionary();
- if (event_name && dict->dropEvent(event_name))
- {
- /* drop event failed for some reason, issue a warning */
- push_warning_printf(current_thd, MYSQL_ERROR::WARN_LEVEL_ERROR,
- ER_GET_ERRMSG, ER(ER_GET_ERRMSG),
- dict->getNdbError().code,
- dict->getNdbError().message, "NDB");
- if (dict->getNdbError().code != 4710)
- {
- /* error is not that the event did not exist */
- sql_print_error("NDB Binlog: Unable to drop event in database. "
- "Event: %s Error Code: %d Message: %s",
- event_name,
- dict->getNdbError().code,
- dict->getNdbError().message);
- /* ToDo; handle error? */
- if (share && share->op &&
- share->op->getState() == NdbEventOperation::EO_EXECUTING &&
- dict->getNdbError().code != 4009)
- {
- DBUG_ASSERT(false);
- DBUG_RETURN(-1);
- }
- }
- }
-
- if (share == 0 || share->op == 0)
- {
- DBUG_RETURN(0);
- }
-
-/*
- Syncronized drop between client thread and injector thread is
- neccessary in order to maintain ordering in the binlog,
- such that the drop occurs _after_ any inserts/updates/deletes.
-
- The penalty for this is that the drop table becomes slow.
-
- This wait is however not strictly neccessary to produce a binlog
- that is usable. However the slave does not currently handle
- these out of order, thus we are keeping the SYNC_DROP_ defined
- for now.
-*/
-#define SYNC_DROP_
-#ifdef SYNC_DROP_
- (void) pthread_mutex_lock(&share->mutex);
- int max_timeout= 10;
- while (share->op)
- {
- struct timespec abstime;
- set_timespec(abstime, 1);
- (void) pthread_cond_timedwait(&injector_cond,
- &share->mutex,
- &abstime);
- max_timeout--;
- if (share->op == 0)
- break;
- if (max_timeout == 0)
- {
- sql_print_error("NDB delete table: timed out. Ignoring...");
- break;
- }
- sql_print_information("NDB delete table: "
- "waiting max %u sec for drop table %s.",
- max_timeout, share->key);
- }
- (void) pthread_mutex_unlock(&share->mutex);
-#else
- (void) pthread_mutex_lock(&share->mutex);
- share->op_old= share->op;
- share->op= 0;
- (void) pthread_mutex_unlock(&share->mutex);
-#endif
-
- DBUG_RETURN(0);
-}
-
-
-/********************************************************************
- Internal helper functions for differentd events from the stoarage nodes
- used by the ndb injector thread
-********************************************************************/
-
-/*
- Handle error states on events from the storage nodes
-*/
-static int ndb_binlog_thread_handle_error(Ndb *ndb, NdbEventOperation *pOp,
- Binlog_index_row &row)
-{
- NDB_SHARE *share= (NDB_SHARE *)pOp->getCustomData();
- DBUG_ENTER("ndb_binlog_thread_handle_error");
-
- int overrun= pOp->isOverrun();
- if (overrun)
- {
- /*
- ToDo: this error should rather clear the binlog_index...
- and continue
- */
- sql_print_error("NDB Binlog: Overrun in event buffer, "
- "this means we have dropped events. Cannot "
- "continue binlog for %s", share->key);
- pOp->clearError();
- DBUG_RETURN(-1);
- }
-
- if (!pOp->isConsistent())
- {
- /*
- ToDo: this error should rather clear the binlog_index...
- and continue
- */
- sql_print_error("NDB Binlog: Not Consistent. Cannot "
- "continue binlog for %s. Error code: %d"
- " Message: %s", share->key,
- pOp->getNdbError().code,
- pOp->getNdbError().message);
- pOp->clearError();
- DBUG_RETURN(-1);
- }
- sql_print_error("NDB Binlog: unhandled error %d for table %s",
- pOp->hasError(), share->key);
- pOp->clearError();
- DBUG_RETURN(0);
-}
-
-/*
- Handle _non_ data events from the storage nodes
-*/
-static int
-ndb_binlog_thread_handle_non_data_event(Ndb *ndb, NdbEventOperation *pOp,
- Binlog_index_row &row)
-{
- NDB_SHARE *share= (NDB_SHARE *)pOp->getCustomData();
- NDBEVENT::TableEvent type= pOp->getEventType();
- int remote_drop_table= 0, do_close_cached_tables= 0;
-
- /* make sure to flush any pending events as they can be dependent
- on one of the tables being changed below
- */
- injector_thd->binlog_flush_pending_rows_event(true);
-
- switch (type)
- {
- case NDBEVENT::TE_CLUSTER_FAILURE:
- sql_print_information("NDB Binlog: cluster failure for %s.", share->key);
-
- DBUG_PRINT("info", ("CLUSTER FAILURE EVENT: "
- "%s received share: 0x%lx op: %lx share op: %lx "
- "op_old: %lx",
- share->key, share, pOp, share->op, share->op_old));
- if (apply_status_share)
- {
- free_share(&apply_status_share);
- apply_status_share= 0;
- }
- break;
- case NDBEVENT::TE_ALTER:
- /* ToDo: remove printout */
- sql_print_information("NDB Binlog: rename table %s%s/%s -> %s.",
- share_prefix, share->table->s->db,
- share->table->s->table_name,
- share->key);
- /* do the rename of the table in the share */
- share->table->s->db= share->db;
- share->table->s->table_name= share->table_name;
- goto drop_alter_common;
- case NDBEVENT::TE_DROP:
- /* ToDo: remove printout */
- sql_print_information("NDB Binlog: drop table %s.",
- share->key);
-drop_alter_common:
- row.n_schemaops++;
- DBUG_PRINT("info", ("TABLE %s EVENT: %s received share: 0x%lx op: %lx "
- "share op: %lx op_old: %lx",
- type == NDBEVENT::TE_DROP ? "DROP" : "ALTER",
- share->key, share, pOp, share->op, share->op_old));
- if (pOp->getReqNodeId() != ndb_cluster_node_id)
- {
- ndb->setDatabaseName(share->table->s->db);
- ha_ndbcluster::invalidate_dictionary_cache(share->table,
- ndb,
- share->table->s->table_name,
- TRUE);
- remote_drop_table= 1;
- }
- break;
- default:
- sql_print_error("NDB Binlog: unknown non data event %d for %s. "
- "Ignoring...", (unsigned) type, share->key);
- return 0;
- }
-
- (void) pthread_mutex_lock(&share->mutex);
- DBUG_ASSERT(share->op == pOp || share->op_old == pOp);
- if (share->op_old == pOp)
- share->op_old= 0;
- else
- share->op= 0;
- // either just us or drop table handling as well
-
- /* Signal ha_ndbcluster::delete/rename_table that drop is done */
- (void) pthread_mutex_unlock(&share->mutex);
- (void) pthread_cond_signal(&injector_cond);
-
- pthread_mutex_lock(&ndbcluster_mutex);
- free_share(&share, TRUE);
- if (remote_drop_table && share && share->state != NSS_DROPPED)
- {
- DBUG_PRINT("info", ("remote drop table"));
- if (share->use_count != 1)
- do_close_cached_tables= 1;
- share->state= NSS_DROPPED;
- free_share(&share, TRUE);
- }
- pthread_mutex_unlock(&ndbcluster_mutex);
-
- share= 0;
- pOp->setCustomData(0);
-
- pthread_mutex_lock(&injector_mutex);
- injector_ndb->dropEventOperation(pOp);
- pOp= 0;
- pthread_mutex_unlock(&injector_mutex);
-
- if (do_close_cached_tables)
- close_cached_tables((THD*) 0, 0, (TABLE_LIST*) 0);
-
- return 0;
-}
-
-/*
- Handle data events from the storage nodes
-*/
-static int
-ndb_binlog_thread_handle_data_event(Ndb *ndb, NdbEventOperation *pOp,
- Binlog_index_row &row,
- injector::transaction &trans)
-{
- NDB_SHARE *share= (NDB_SHARE*) pOp->getCustomData();
- TABLE *table= share->table;
-
- assert(table != 0);
-#ifndef DBUG_OFF
- dbug_print_table("table", table);
-#endif
- TABLE_SHARE *table_s= table->s;
- uint n_fields= table_s->fields;
- MY_BITMAP b;
- /* Potential buffer for the bitmap */
- uint32 bitbuf[128 / (sizeof(uint32) * 8)];
- bitmap_init(&b, n_fields <= sizeof(bitbuf) * 8 ? bitbuf : NULL,
- n_fields, false);
- bitmap_set_all(&b);
-
- /*
- row data is already in table->record[0]
- As we told the NdbEventOperation to do this
- (saves moving data about many times)
- */
-
- switch(pOp->getEventType())
- {
- case NDBEVENT::TE_INSERT:
- row.n_inserts++;
- DBUG_PRINT("info", ("INSERT INTO %s", share->key));
- {
- ndb_unpack_record(table, share->ndb_value[0], &b, table->record[0]);
- trans.write_row(::server_id, injector::transaction::table(table, true),
- &b, n_fields, table->record[0]);
- }
- break;
- case NDBEVENT::TE_DELETE:
- row.n_deletes++;
- DBUG_PRINT("info",("DELETE FROM %s", share->key));
- {
- /*
- table->record[0] contains only the primary key in this case
- since we do not have an after image
- */
- int n;
- if (table->s->primary_key != MAX_KEY)
- n= 0; /*
- use the primary key only as it save time and space and
- it is the only thing needed to log the delete
- */
- else
- n= 1; /*
- we use the before values since we don't have a primary key
- since the mysql server does not handle the hidden primary
- key
- */
-
- ndb_unpack_record(table, share->ndb_value[n], &b, table->record[n]);
- print_records(table, table->record[n]);
- trans.delete_row(::server_id, injector::transaction::table(table, true),
- &b, n_fields, table->record[n]);
- }
- break;
- case NDBEVENT::TE_UPDATE:
- row.n_updates++;
- DBUG_PRINT("info", ("UPDATE %s", share->key));
- {
- ndb_unpack_record(table, share->ndb_value[0],
- &b, table->record[0]);
- print_records(table, table->record[0]);
- if (table->s->primary_key != MAX_KEY)
- {
- /*
- since table has a primary key, we can to a write
- using only after values
- */
- trans.write_row(::server_id, injector::transaction::table(table, true),
- &b, n_fields, table->record[0]);// after values
- }
- else
- {
- /*
- mysql server cannot handle the ndb hidden key and
- therefore needs the before image as well
- */
- ndb_unpack_record(table, share->ndb_value[1], &b, table->record[1]);
- print_records(table, table->record[1]);
- trans.update_row(::server_id,
- injector::transaction::table(table, true),
- &b, n_fields,
- table->record[1], // before values
- table->record[0]);// after values
- }
- }
- break;
- default:
- /* We should REALLY never get here. */
- DBUG_PRINT("info", ("default - uh oh, a brain exploded."));
- break;
- }
-
- return 0;
-}
-
-/*
- Timer class for doing performance measurements
-*/
-//#define RUN_NDB_BINLOG_TIMER
-#ifdef RUN_NDB_BINLOG_TIMER
-class Timer
-{
-public:
- Timer() { start(); }
- void start() { gettimeofday(&m_start, 0); }
- void stop() { gettimeofday(&m_stop, 0); }
- ulong elapsed_ms()
- {
- return (ulong)
- (((longlong) m_stop.tv_sec - (longlong) m_start.tv_sec) * 1000 +
- ((longlong) m_stop.tv_usec -
- (longlong) m_start.tv_usec + 999) / 1000);
- }
-private:
- struct timeval m_start,m_stop;
-};
-#endif
-
-
-/****************************************************************
- Injector thread main loop
-****************************************************************/
-
-pthread_handler_t ndb_binlog_thread_func(void *arg)
-{
- THD *thd; /* needs to be first for thread_stack */
- Ndb *ndb= 0;
- int ndb_update_binlog_index= 1;
- injector *inj= injector::instance();
-
- pthread_mutex_lock(&injector_mutex);
- /*
- Set up the Thread
- */
- my_thread_init();
- DBUG_ENTER("ndb_binlog_thread");
-
- thd= new THD; /* note that contructor of THD uses DBUG_ */
- THD_CHECK_SENTRY(thd);
-
- thd->thread_stack= (char*) &thd; /* remember where our stack is */
- if (thd->store_globals())
- {
- thd->cleanup();
- delete thd;
- ndb_binlog_thread_running= -1;
- pthread_mutex_unlock(&injector_mutex);
- pthread_cond_signal(&injector_cond);
- my_thread_end();
- pthread_exit(0);
- DBUG_RETURN(NULL);
- }
-
- thd->init_for_queries();
- thd->command= COM_DAEMON;
- injector_thd= thd;
-
- /*
- Set up ndb binlog
- */
- sql_print_information("Starting Cluster Binlog");
-
- pthread_detach_this_thread();
- thd->real_id= pthread_self();
- pthread_mutex_lock(&LOCK_thread_count);
- thd->thread_id= thread_id++;
- threads.append(thd);
- pthread_mutex_unlock(&LOCK_thread_count);
- thd->lex->start_transaction_opt= 0;
-
- if (!(ndb= new Ndb(g_ndb_cluster_connection, "")) ||
- ndb->init())
- {
- sql_print_error("NDB Binlog: Getting Ndb object failed");
- ndb_binlog_thread_running= -1;
- pthread_mutex_unlock(&injector_mutex);
- pthread_cond_signal(&injector_cond);
- goto err;
- }
-
- /*
- Expose global reference to our ndb object.
-
- Used by both sql client thread and binlog thread to interact
- with the storage
- pthread_mutex_lock(&injector_mutex);
- */
- injector_ndb= ndb;
- ndb_binlog_thread_running= 1;
-
- /*
- We signal the thread that started us that we've finished
- starting up.
- */
- pthread_mutex_unlock(&injector_mutex);
- pthread_cond_signal(&injector_cond);
-
- thd->system_thread= SYSTEM_THREAD_NDBCLUSTER_BINLOG;
- thd->version= refresh_version;
- thd->set_time();
- thd->main_security_ctx.host_or_ip= "";
- thd->client_capabilities= 0;
- my_net_init(&thd->net, 0);
- thd->main_security_ctx.master_access= ~0;
- thd->main_security_ctx.priv_user= 0;
-
- thd->proc_info= "Waiting for ndbcluster to start";
-
- pthread_mutex_lock(&injector_mutex);
- while (!ndbcluster_util_inited)
- {
- /* ndb not connected yet */
- struct timespec abstime;
- set_timespec(abstime, 1);
- pthread_cond_timedwait(&injector_cond, &injector_mutex, &abstime);
- if (abort_loop)
- {
- pthread_mutex_unlock(&injector_mutex);
- goto err;
- }
- }
- pthread_mutex_unlock(&injector_mutex);
-
- /*
- Main NDB Injector loop
- */
-
- thd->query_id= 0; // to keep valgrind quiet
- {
- static char db[]= "";
- thd->db= db;
- open_binlog_index(thd, &binlog_tables, &binlog_index);
- if (!(apply_status_share= ndbcluster_get_apply_status_share()))
- {
- sql_print_error("NDB: Could not get apply status share");
- }
- thd->db= db;
- }
-
-#ifdef RUN_NDB_BINLOG_TIMER
- Timer main_timer;
-#endif
- for ( ; !((abort_loop || do_ndbcluster_binlog_close_connection) &&
- ndb_latest_handled_binlog_epoch >= g_latest_trans_gci); )
- {
-
-#ifdef RUN_NDB_BINLOG_TIMER
- main_timer.stop();
- sql_print_information("main_timer %ld ms", main_timer.elapsed_ms());
- main_timer.start();
-#endif
-
- /*
- now we don't want any events before next gci is complete
- */
- thd->proc_info= "Waiting for event from ndbcluster";
- thd->set_time();
-
- /* wait for event or 1000 ms */
- Uint64 gci;
- int res= ndb->pollEvents(1000, &gci);
- ndb_latest_received_binlog_epoch= gci;
-
- if ((abort_loop || do_ndbcluster_binlog_close_connection) &&
- ndb_latest_handled_binlog_epoch >= g_latest_trans_gci)
- break; /* Shutting down server */
-
- if (binlog_index && binlog_index->s->version < refresh_version)
- {
- if (binlog_index->s->version < refresh_version)
- {
- close_thread_tables(thd);
- binlog_index= 0;
- }
- }
-
- if (res > 0)
- {
- DBUG_PRINT("info", ("pollEvents res: %d", res));
-#ifdef RUN_NDB_BINLOG_TIMER
- Timer gci_timer, write_timer;
- int event_count= 0;
-#endif
- thd->proc_info= "Processing events";
- NdbEventOperation *pOp= ndb->nextEvent();
- Binlog_index_row row;
- while (pOp != NULL)
- {
- ndb->
- setReportThreshEventGCISlip(ndb_report_thresh_binlog_epoch_slip);
- ndb->setReportThreshEventFreeMem(ndb_report_thresh_binlog_mem_usage);
-
- assert(pOp->getGCI() <= ndb_latest_received_binlog_epoch);
- if (!apply_status_share)
- {
- if (!(apply_status_share= ndbcluster_get_apply_status_share()))
- sql_print_error("NDB: Could not get apply status share");
- }
- bzero((char*) &row, sizeof(row));
- injector::transaction trans= inj->new_trans(thd);
- gci= pOp->getGCI();
- if (apply_status_share)
- {
- TABLE *table= apply_status_share->table;
- MY_BITMAP b;
- uint32 bitbuf;
- DBUG_ASSERT(table->s->fields <= sizeof(bitbuf) * 8);
- bitmap_init(&b, &bitbuf, table->s->fields, false);
- bitmap_set_all(&b);
- table->field[0]->store((longlong)::server_id);
- table->field[1]->store((longlong)gci);
- trans.write_row(::server_id,
- injector::transaction::table(table, true),
- &b, table->s->fields,
- table->record[0]);
- }
-#ifdef RUN_NDB_BINLOG_TIMER
- write_timer.start();
-#endif
- do
- {
-#ifdef RUN_NDB_BINLOG_TIMER
- event_count++;
-#endif
- if (pOp->hasError() &&
- ndb_binlog_thread_handle_error(ndb, pOp, row) < 0)
- goto err;
-
-#ifndef DBUG_OFF
- {
- NDB_SHARE *share= (NDB_SHARE*) pOp->getCustomData();
- DBUG_PRINT("info",
- ("EVENT TYPE:%d GCI:%lld last applied: %lld "
- "share: 0x%lx", pOp->getEventType(), gci,
- ndb_latest_applied_binlog_epoch, share));
- DBUG_ASSERT(share != 0);
- }
-#endif
- if ((unsigned) pOp->getEventType() <
- (unsigned) NDBEVENT::TE_FIRST_NON_DATA_EVENT)
- ndb_binlog_thread_handle_data_event(ndb, pOp, row, trans);
- else
- ndb_binlog_thread_handle_non_data_event(ndb, pOp, row);
-
- pOp= ndb->nextEvent();
- } while (pOp && pOp->getGCI() == gci);
-
- /*
- note! pOp is not referring to an event in the next epoch
- or is == 0
- */
-#ifdef RUN_NDB_BINLOG_TIMER
- write_timer.stop();
-#endif
-
- if (row.n_inserts || row.n_updates
- || row.n_deletes || row.n_schemaops)
- {
- injector::transaction::binlog_pos start= trans.start_pos();
- if (int r= trans.commit())
- {
- sql_print_error("NDB binlog:"
- "Error during COMMIT of GCI. Error: %d",
- r);
- /* TODO: Further handling? */
- }
- row.gci= gci;
- row.master_log_file= start.file_name();
- row.master_log_pos= start.file_pos();
-
- DBUG_PRINT("info",("COMMIT gci %lld",gci));
- if (ndb_update_binlog_index)
- ndb_add_binlog_index(thd, &row);
- ndb_latest_applied_binlog_epoch= gci;
- }
- else
- trans.commit();
- ndb_latest_handled_binlog_epoch= gci;
-#ifdef RUN_NDB_BINLOG_TIMER
- gci_timer.stop();
- sql_print_information("gci %ld event_count %d write time "
- "%ld(%d e/s), total time %ld(%d e/s)",
- (ulong)gci, event_count,
- write_timer.elapsed_ms(),
- event_count / write_timer.elapsed_ms(),
- gci_timer.elapsed_ms(),
- event_count / gci_timer.elapsed_ms());
-#endif
- }
- }
- ndb_latest_handled_binlog_epoch= ndb_latest_received_binlog_epoch;
- }
-err:
- DBUG_PRINT("info",("Shutting down cluster binlog thread"));
- close_thread_tables(thd);
- pthread_mutex_lock(&injector_mutex);
- /* don't mess with the injector_ndb anymore from other threads */
- injector_ndb= 0;
- pthread_mutex_unlock(&injector_mutex);
- thd->db= 0; // as not to try to free memory
- sql_print_information("Stopping Cluster Binlog");
-
- if (apply_status_share)
- free_share(&apply_status_share);
-
- /* remove all event operations */
- if (ndb)
- {
- NdbEventOperation *op;
- DBUG_PRINT("info",("removing all event operations"));
- while ((op= ndb->getEventOperation()))
- {
- DBUG_PRINT("info",("removing event operation on %s",
- op->getEvent()->getName()));
- NDB_SHARE *share= (NDB_SHARE*) op->getCustomData();
- free_share(&share);
- ndb->dropEventOperation(op);
- }
- delete ndb;
- ndb= 0;
- }
-
- net_end(&thd->net);
- thd->cleanup();
- delete thd;
-
- ndb_binlog_thread_running= -1;
- (void) pthread_cond_signal(&injector_cond);
-
- DBUG_PRINT("exit", ("ndb_binlog_thread"));
- my_thread_end();
-
- pthread_exit(0);
- DBUG_RETURN(NULL);
-}
-#endif /* HAVE_NDB_BINLOG */
/*
--- 1.99/sql/ha_ndbcluster.h 2005-11-24 09:59:02 +01:00
+++ 1.100/sql/ha_ndbcluster.h 2005-12-15 09:21:04 +01:00
@@ -97,6 +97,9 @@
char *old_names; // for rename table
TABLE *table;
NdbValue *ndb_value[2];
+ MY_BITMAP *subscriber_bitmap;
+ MY_BITMAP slock_bitmap;
+ uint32 slock[256/32]; // 256 bits for lock status of table
#endif
} NDB_SHARE;
@@ -473,6 +476,11 @@
Place holder for ha_ndbcluster thread specific data
*/
+enum THD_NDB_OPTIONS
+{
+ TNO_NO_LOG_SCHEMA_OP= 1 << 0
+};
+
class Thd_ndb
{
public:
@@ -484,6 +492,7 @@
NdbTransaction *all;
NdbTransaction *stmt;
int error;
+ uint32 options;
List<NDB_SHARE> changed_tables;
};
@@ -785,25 +794,10 @@
extern struct show_var_st ndb_status_variables[];
-bool ndbcluster_init(void);
-int ndbcluster_end(ha_panic_function flag);
-
int ndbcluster_discover(THD* thd, const char* dbname, const char* name,
const void** frmblob, uint* frmlen);
int ndbcluster_find_files(THD *thd,const char *db,const char *path,
const char *wild, bool dir, List<char> *files);
int ndbcluster_table_exists_in_engine(THD* thd,
const char *db, const char *name);
-void ndbcluster_drop_database(char* path);
-
void ndbcluster_print_error(int error, const NdbOperation *error_op);
-
-bool ndbcluster_show_status(THD*,stat_print_fn *,enum ha_stat_type);
-
-#ifdef HAVE_NDB_BINLOG
-int ndbcluster_reset_logs(THD *thd);
-int ndbcluster_binlog_index_purge_file(THD *thd, const char *file);
-void ndbcluster_reset_slave(THD *thd);
-void ndbcluster_binlog_wait(THD *thd);
-int ndbcluster_binlog_end();
-#endif
--- 1.72/libmysqld/Makefile.am 2005-11-24 09:59:00 +01:00
+++ 1.73/libmysqld/Makefile.am 2005-12-15 09:21:04 +01:00
@@ -70,6 +70,7 @@
libmysqld_int_a_SOURCES= $(libmysqld_sources) $(libmysqlsources) $(sqlsources)
EXTRA_libmysqld_a_SOURCES = ha_innodb.cc ha_berkeley.cc ha_archive.cc \
ha_blackhole.cc ha_federated.cc ha_ndbcluster.cc \
+ ha_ndbcluster_binlog.cc \
ha_tina.cc ha_example.cc ha_partition.cc
libmysqld_a_DEPENDENCIES= @mysql_se_objs@
libmysqld_a_SOURCES=
@@ -98,6 +99,9 @@
$(CXXCOMPILE) @bdb_includes@ $(LM_CFLAGS) -c $<
ha_ndbcluster.o:ha_ndbcluster.cc
+ $(CXXCOMPILE) @ndbcluster_includes@ $(LM_CFLAGS) -c $<
+
+ha_ndbcluster_binlog.o: ha_ndbcluster_binlog.cc
$(CXXCOMPILE) @ndbcluster_includes@ $(LM_CFLAGS) -c $<
# Until we can remove dependency on ha_ndbcluster.h
--- 1.12/storage/ndb/include/ndbapi/ndb_cluster_connection.hpp 2005-09-15 10:42:02 +02:00
+++ 1.13/storage/ndb/include/ndbapi/ndb_cluster_connection.hpp 2005-12-15 09:21:05 +01:00
@@ -17,6 +17,7 @@
#ifndef CLUSTER_CONNECTION_HPP
#define CLUSTER_CONNECTION_HPP
+#include <ndb_types.h>
/**
* @class Ndb_cluster_connection
@@ -29,6 +30,18 @@
* By using the wait_until_ready() method it is possible to wait
* for the connection to reach one or more storage nodes.
*/
+
+#ifndef DOXYGEN_SHOULD_SKIP_INTERNAL
+struct Ndb_cluster_connection_node_iter {
+ Ndb_cluster_connection_node_iter() : scan_state(~0),
+ init_pos(0),
+ cur_pos(0) {};
+ Uint8 scan_state;
+ Uint8 init_pos;
+ Uint8 cur_pos;
+};
+#endif
+
class Ndb_cluster_connection {
public:
/**
@@ -88,6 +101,9 @@
unsigned no_db_nodes();
unsigned node_id();
+
+ void init_get_next_node(Ndb_cluster_connection_node_iter &iter);
+ Uint32 get_next_node(Ndb_cluster_connection_node_iter &iter);
#endif
private:
--- 1.34/storage/ndb/src/ndbapi/ndb_cluster_connection.cpp 2005-11-18 16:06:35 +01:00
+++ 1.35/storage/ndb/src/ndbapi/ndb_cluster_connection.cpp 2005-12-15 09:21:05 +01:00
@@ -122,7 +122,7 @@
}
void
-Ndb_cluster_connection_impl::init_get_next_node
+Ndb_cluster_connection::init_get_next_node
(Ndb_cluster_connection_node_iter &iter)
{
if (iter.scan_state != (Uint8)~0)
@@ -136,7 +136,7 @@
}
Uint32
-Ndb_cluster_connection_impl::get_next_node(Ndb_cluster_connection_node_iter &iter)
+Ndb_cluster_connection::get_next_node(Ndb_cluster_connection_node_iter &iter)
{
Uint32 cur_pos= iter.cur_pos;
if (cur_pos >= no_db_nodes())
--- 1.4/storage/ndb/src/ndbapi/ndb_cluster_connection_impl.hpp 2005-09-15 10:42:04 +02:00
+++ 1.5/storage/ndb/src/ndbapi/ndb_cluster_connection_impl.hpp 2005-12-15 09:21:05 +01:00
@@ -26,15 +26,6 @@
class NdbThread;
class ndb_mgm_configuration;
-struct Ndb_cluster_connection_node_iter {
- Ndb_cluster_connection_node_iter() : scan_state(~0),
- init_pos(0),
- cur_pos(0) {};
- Uint8 scan_state;
- Uint8 init_pos;
- Uint8 cur_pos;
-};
-
extern "C" {
void* run_ndb_cluster_connection_connect_thread(void*);
}
@@ -45,10 +36,6 @@
~Ndb_cluster_connection_impl();
void do_test();
-
- void init_get_next_node(Ndb_cluster_connection_node_iter &iter);
- Uint32 get_next_node(Ndb_cluster_connection_node_iter &iter);
-
private:
friend class Ndb;
friend class NdbImpl;
--- New file ---
+++ sql/ha_ndbcluster_binlog.cc 05/12/14 22:28:26
/* Copyright (C) 2000-2003 MySQL AB
This program is free software; you can redistribute it and/or modify
it under the terms of the GNU General Public License as published by
the Free Software Foundation; either version 2 of the License, or
(at your option) any later version.
This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU General Public License for more details.
You should have received a copy of the GNU General Public License
along with this program; if not, write to the Free Software
Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
*/
#include "mysql_priv.h"
#include "ha_ndbcluster.h"
#ifdef HAVE_NDB_BINLOG
#include "rpl_injector.h"
#include "slave.h"
#include "ha_ndbcluster_binlog.h"
/*
defines for cluster replication table names
*/
#define NDB_REP_DB "cluster_replication"
#define NDB_REP_TABLE "binlog_index"
#define NDB_APPLY_TABLE "apply_status"
#define NDB_SCHEMA_TABLE "schema"
#define NDB_APPLY_TABLE_FILE "./" NDB_REP_DB "/" NDB_APPLY_TABLE
#define NDB_SCHEMA_TABLE_FILE "./" NDB_REP_DB "/" NDB_SCHEMA_TABLE
/*
Flag showing if the ndb injector thread is running, if so == 1
*/
int ndb_binlog_thread_running= 0;
/*
Global reference to the ndb injector thread THD oject
Has one sole purpose, for setting the in_use table member variable
in get_share(...)
*/
THD *injector_thd= 0;
/*
Global reference to ndb injector thd object.
Used mainly by the binlog index thread, but exposed to the client sql
thread for one reason; to setup the events operations for a table
to enable ndb injector thread receiving events.
Must therefore always be used with a surrounding
pthread_mutex_lock(&injector_mutex), when doing create/dropEventOperation
*/
static Ndb *injector_ndb= 0;
static Ndb *schema_ndb= 0;
/*
Mutex and condition used for interacting between client sql thread
and injector thread
*/
pthread_t ndb_binlog_thread;
pthread_mutex_t injector_mutex;
pthread_cond_t injector_cond;
/* NDB Injector thread (used for binlog creation) */
ulong ndb_report_thresh_binlog_epoch_slip;
ulong ndb_report_thresh_binlog_mem_usage;
static ulonglong ndb_latest_applied_binlog_epoch= 0;
static ulonglong ndb_latest_handled_binlog_epoch= 0;
static ulonglong ndb_latest_received_binlog_epoch= 0;
NDB_SHARE *apply_status_share= 0;
NDB_SHARE *schema_share= 0;
/* instantiated in storage/ndb/src/ndbapi/Ndbif.cpp */
extern Uint64 g_latest_trans_gci;
/*
Helper functions
*/
#ifndef DBUG_OFF
static void print_records(TABLE *table, const char *record)
{
if (_db_on_)
{
for (uint j= 0; j < table->s->fields; j++)
{
char buf[40];
int pos= 0;
Field *field= table->field[j];
const byte* field_ptr= field->ptr - table->record[0] + record;
int pack_len= field->pack_length();
int n= pack_len < 10 ? pack_len : 10;
for (int i= 0; i < n && pos < 20; i++)
{
pos+= sprintf(&buf[pos]," %x", (int) (unsigned char) field_ptr[i]);
}
buf[pos]= 0;
DBUG_PRINT("info",("[%u]field_ptr[0->%d]: %s", j, n, buf));
}
}
}
#else
#define print_records(a,b)
#endif
#ifndef DBUG_OFF
static void dbug_print_table(const char *info, TABLE *table)
{
if (table == 0)
{
DBUG_PRINT("info",("%s: (null)", info));
return;
}
DBUG_PRINT("info",
("%s: %s.%s s->fields: %d "
"reclength: %d rec_buff_length: %d record[0]: %lx "
"record[1]: %lx",
info,
table->s->db,
table->s->table_name,
table->s->fields,
table->s->reclength,
table->s->rec_buff_length,
table->record[0],
table->record[1]));
for (unsigned int i= 0; i < table->s->fields; i++)
{
Field *f= table->field[i];
DBUG_PRINT("info",
("[%d] \"%s\"(0x%lx:%s%s%s%s%s%s) type: %d pack_length: %d "
"ptr: 0x%lx[+%d] null_bit: %u null_ptr: 0x%lx[+%d]",
i,
f->field_name,
f->flags,
(f->flags & PRI_KEY_FLAG) ? "pri" : "attr",
(f->flags & NOT_NULL_FLAG) ? "" : ",nullable",
(f->flags & UNSIGNED_FLAG) ? ",unsigned" : ",signed",
(f->flags & ZEROFILL_FLAG) ? ",zerofill" : "",
(f->flags & BLOB_FLAG) ? ",blob" : "",
(f->flags & BINARY_FLAG) ? ",binary" : "",
f->real_type(),
f->pack_length(),
f->ptr, f->ptr - table->record[0],
f->null_bit,
f->null_ptr, (byte*) f->null_ptr - table->record[0]));
if (f->type() == MYSQL_TYPE_BIT)
{
Field_bit *g= (Field_bit*) f;
DBUG_PRINT("MYSQL_TYPE_BIT",("field_length: %d bit_ptr: 0x%lx[+%d] "
"bit_ofs: %u bit_len: %u",
g->field_length, g->bit_ptr,
(byte*) g->bit_ptr-table->record[0],
g->bit_ofs, g->bit_len));
}
}
}
#else
#define dbug_print_table(a,b)
#endif
/*
Run a query through mysql_parse
Used to:
- purging the cluster_replication.binlog_index
- creating the cluster_replication.apply_status table
*/
static void run_query(THD *thd, char *buf, char *end,
my_bool print_error, my_bool disable_binlog)
{
ulong save_query_length= thd->query_length;
char *save_query= thd->query;
ulong save_thread_id= thd->variables.pseudo_thread_id;
ulonglong save_thd_options= thd->options;
DBUG_ASSERT(sizeof(save_thd_options) == sizeof(thd->options));
NET save_net= thd->net;
bzero((char*) &thd->net, sizeof(NET));
thd->query_length= end - buf;
thd->query= buf;
thd->variables.pseudo_thread_id= thread_id;
if (disable_binlog)
thd->options&= ~OPTION_BIN_LOG;
DBUG_PRINT("query", ("%s", thd->query));
mysql_parse(thd, thd->query, thd->query_length);
if (print_error && thd->query_error)
{
sql_print_error("NDB: %s: error %s %d %d %d",
buf, thd->net.last_error, thd->net.last_errno,
thd->net.report_error, thd->query_error);
}
thd->options= thd->options;
thd->query_length= save_query_length;
thd->query= save_query;
thd->variables.pseudo_thread_id= save_thread_id;
thd->net= save_net;
}
/*
Initialize the binlog part of the NDB_SHARE
*/
void ndbcluster_binlog_init_share(NDB_SHARE *share)
{
MEM_ROOT *mem_root= &share->mem_root;
share->op= 0;
share->table= 0;
while (ndb_binlog_thread_running > 0)
{
TABLE *table= (TABLE*) my_malloc(sizeof(*table), MYF(MY_WME));
int r;
if ((r= openfrm(current_thd, share->key, "", 0, (uint) READ_ALL,
0, table)))
{
sql_print_error("Unable to open frm for %s, frmerror=%d",
share->key, r);
DBUG_PRINT("error", ("openfrm failed %d", r));
my_free((gptr) table, MYF(0));
table= 0;
break;
}
if (!table->record[1] || table->record[1] == table->record[0])
{
table->record[1]= alloc_root(&table->mem_root,
table->s->rec_buff_length);
}
table->in_use= injector_thd;
table->s->db= share->db;
table->s->table_name= share->table_name;
share->table= table;
dbug_print_table("table", table);
/*
! do not touch the contents of the table
it may be in use by the injector thread
*/
share->ndb_value[0]= (NdbValue*)
alloc_root(mem_root, sizeof(NdbValue) * table->s->fields
+ 1 /*extra for hidden key*/);
share->ndb_value[1]= (NdbValue*)
alloc_root(mem_root, sizeof(NdbValue) * table->s->fields
+1 /*extra for hidden key*/);
{
int i, no_nodes= g_ndb_cluster_connection->no_db_nodes();
share->subscriber_bitmap= (MY_BITMAP*)
alloc_root(mem_root, no_nodes * sizeof(MY_BITMAP));
for (i= 0; i < no_nodes; i++)
{
bitmap_init(&share->subscriber_bitmap[i],
(Uint32*)alloc_root(mem_root, max_ndb_nodes/8),
max_ndb_nodes, false);
bitmap_clear_all(&share->subscriber_bitmap[i]);
}
bitmap_init(&share->slock_bitmap, share->slock,
sizeof(share->slock)*8, false);
bitmap_clear_all(&share->slock_bitmap);
}
break;
}
}
/*****************************************************************
functions called from master sql client threads
****************************************************************/
/*
called in mysql_show_binlog_events and reset_logs to make sure we wait for
all events originating from this mysql server to arrive in the binlog
Wait for the last epoch in which the last transaction is a part of.
Wait a maximum of 30 seconds.
*/
static void ndbcluster_binlog_wait(THD *thd)
{
if (ndb_binlog_thread_running > 0)
{
DBUG_ENTER("ndbcluster_binlog_wait");
const char *save_info= thd ? thd->proc_info : 0;
ulonglong wait_epoch= g_latest_trans_gci;
int count= 30;
if (thd)
thd->proc_info= "Waiting for ndbcluster binlog update to "
"reach current position";
while (count && ndb_binlog_thread_running > 0 &&
ndb_latest_handled_binlog_epoch < wait_epoch)
{
count--;
sleep(1);
}
if (thd)
thd->proc_info= save_info;
DBUG_VOID_RETURN;
}
}
/*
Called from MYSQL_LOG::reset_logs in log.cc when binlog is emptied
*/
static int ndbcluster_reset_logs(THD *thd)
{
if (ndb_binlog_thread_running <= 0)
return 0;
DBUG_ENTER("ndbcluster_reset_logs");
/*
Wait for all events orifinating from this mysql server has
reached the binlog before continuing to reset
*/
ndbcluster_binlog_wait(thd);
char buf[1024];
char *end= strmov(buf, "DELETE FROM " NDB_REP_DB "." NDB_REP_TABLE);
run_query(thd, buf, end, FALSE, TRUE);
DBUG_RETURN(0);
}
/*
Called from MYSQL_LOG::purge_logs in log.cc when the binlog "file"
is removed
*/
static int
ndbcluster_binlog_index_purge_file(THD *thd, const char *file)
{
if (ndb_binlog_thread_running <= 0)
return 0;
DBUG_ENTER("ndbcluster_binlog_index_purge_file");
DBUG_PRINT("enter", ("file: %s", file));
char buf[1024];
char *end= strmov(strmov(strmov(buf,
"DELETE FROM "
NDB_REP_DB "." NDB_REP_TABLE
" WHERE File='"), file), "'");
run_query(thd, buf, end, FALSE, TRUE);
DBUG_RETURN(0);
}
static int
ndbcluster_create_database(THD *thd, const char *db, const char *query,
int query_length)
{
DBUG_ENTER("ndbcluster_create_database");
DBUG_PRINT("enter", ("db: %s query: %s", db, query));
ndbcluster_log_schema_op(thd, 0,
query, query_length,
db, "", 0, 0, SOT_CREATE_DB);
DBUG_RETURN(0);
}
static int
ndbcluster_alter_database(THD *thd, const char *db, const char *query,
int query_length)
{
DBUG_ENTER("ndbcluster_alter_database");
DBUG_PRINT("enter", ("db: %s query: %s", db, query));
ndbcluster_log_schema_op(thd, 0,
query, query_length,
db, "", 0, 0, SOT_ALTER_DB);
DBUG_RETURN(0);
}
/*
End use of the NDB Cluster table handler
- free all global variables allocated by
ndbcluster_init()
*/
static int ndbcluster_binlog_end(THD *thd)
{
DBUG_ENTER("ndb_binlog_end");
if (!ndbcluster_util_inited)
DBUG_RETURN(0);
// Kill ndb utility thread
(void) pthread_mutex_lock(&LOCK_ndb_util_thread);
DBUG_PRINT("exit",("killing ndb util thread: %lx", ndb_util_thread));
(void) pthread_cond_signal(&COND_ndb_util_thread);
(void) pthread_mutex_unlock(&LOCK_ndb_util_thread);
#ifdef HAVE_NDB_BINLOG
/* wait for injector thread to finish */
if (ndb_binlog_thread_running > 0)
{
pthread_mutex_lock(&injector_mutex);
while (ndb_binlog_thread_running > 0)
{
struct timespec abstime;
set_timespec(abstime, 1);
pthread_cond_timedwait(&injector_cond, &injector_mutex, &abstime);
}
pthread_mutex_unlock(&injector_mutex);
}
/* remove all shares */
{
pthread_mutex_lock(&ndbcluster_mutex);
for (uint i= 0; i < ndbcluster_open_tables.records; i++)
{
NDB_SHARE *share=
(NDB_SHARE*) hash_element(&ndbcluster_open_tables, i);
if (share->table)
DBUG_PRINT("share",
("table->s->db.table_name: %s.%s",
share->table->s->db, share->table->s->table_name));
if (share->state != NSS_DROPPED && !--share->use_count)
real_free_share(&share);
else
{
DBUG_PRINT("share",
("[%d] 0x%lx key: %s key_length: %d",
i, share, share->key, share->key_length));
DBUG_PRINT("share",
("db.tablename: %s.%s use_count: %d commit_count: %d",
share->db, share->table_name,
share->use_count, share->commit_count));
}
}
pthread_mutex_unlock(&ndbcluster_mutex);
}
#endif
ndbcluster_util_inited= 0;
DBUG_RETURN(0);
}
/*****************************************************************
functions called from slave sql client threads
****************************************************************/
static void ndbcluster_reset_slave(THD *thd)
{
if (ndb_binlog_thread_running <= 0)
return;
DBUG_ENTER("ndbcluster_reset_slave");
char buf[1024];
char *end= strmov(buf, "DELETE FROM " NDB_REP_DB "." NDB_APPLY_TABLE);
run_query(thd, buf, end, FALSE, TRUE);
DBUG_VOID_RETURN;
}
/*
Initialize the binlog part of the ndb handlerton
*/
void ndbcluster_binlog_init_handlerton()
{
handlerton &h= ndbcluster_hton;
h.reset_logs= ndbcluster_reset_logs;
h.binlog_index_purge_file=
ndbcluster_binlog_index_purge_file;
h.reset_slave= ndbcluster_reset_slave;
h.binlog_wait= ndbcluster_binlog_wait;
h.binlog_end= ndbcluster_binlog_end;
h.create_database= ndbcluster_create_database;
h.alter_database= ndbcluster_alter_database;
}
/*
check the availability af the cluster_replication.apply_status share
- return share, but do not increase refcount
- return 0 if there is no share
*/
static NDB_SHARE *ndbcluster_check_apply_status_share()
{
pthread_mutex_lock(&ndbcluster_mutex);
void *share= hash_search(&ndbcluster_open_tables,
NDB_APPLY_TABLE_FILE,
sizeof(NDB_APPLY_TABLE_FILE) - 1);
DBUG_PRINT("info",("ndbcluster_check_apply_status_share %s %p",
NDB_APPLY_TABLE_FILE, share));
pthread_mutex_unlock(&ndbcluster_mutex);
return (NDB_SHARE*) share;
}
/*
check the availability af the cluster_replication.schema share
- return share, but do not increase refcount
- return 0 if there is no share
*/
static NDB_SHARE *ndbcluster_check_schema_share()
{
pthread_mutex_lock(&ndbcluster_mutex);
void *share= hash_search(&ndbcluster_open_tables,
NDB_SCHEMA_TABLE_FILE,
sizeof(NDB_SCHEMA_TABLE_FILE) - 1);
DBUG_PRINT("info",("ndbcluster_check_schema_share %s %p",
NDB_SCHEMA_TABLE_FILE, share));
pthread_mutex_unlock(&ndbcluster_mutex);
return (NDB_SHARE*) share;
}
/*
Create the cluster_replication.apply_status table
*/
static int ndbcluster_create_apply_status_table(THD *thd)
{
DBUG_ENTER("ndbcluster_create_apply_status_table");
/*
Check if we already have the apply status table.
If so it should have been discovered at startup
and thus have a share
*/
if (ndbcluster_check_apply_status_share())
DBUG_RETURN(0);
if (g_ndb_cluster_connection->get_no_ready() <= 0)
DBUG_RETURN(0);
char buf[1024], *end;
sql_print_information("NDB: Creating " NDB_REP_DB "." NDB_APPLY_TABLE);
/*
Check if apply status table exists in MySQL "dictionary"
if so, remove it since there is none in Ndb
*/
{
strxnmov(buf, sizeof(buf),
mysql_data_home,
"/" NDB_REP_DB "/" NDB_APPLY_TABLE,
reg_ext, NullS);
unpack_filename(buf,buf);
my_delete(buf, MYF(0));
}
/*
Note, updating this table schema must be reflected in ndb_restore
*/
end= strmov(buf, "CREATE TABLE IF NOT EXISTS "
NDB_REP_DB "." NDB_APPLY_TABLE
" ( server_id INT UNSIGNED NOT NULL,"
" epoch BIGINT UNSIGNED NOT NULL, "
" PRIMARY KEY USING HASH (server_id) ) ENGINE=NDB");
run_query(thd, buf, end, TRUE, TRUE);
DBUG_RETURN(0);
}
/*
Create the cluster_replication.schema table
*/
static int ndbcluster_create_schema_table(THD *thd)
{
DBUG_ENTER("ndbcluster_create_schema_table");
/*
Check if we already have the schema table.
If so it should have been discovered at startup
and thus have a share
*/
if (ndbcluster_check_schema_share())
DBUG_RETURN(0);
if (g_ndb_cluster_connection->get_no_ready() <= 0)
DBUG_RETURN(0);
char buf[1024], *end;
sql_print_information("NDB: Creating " NDB_REP_DB "." NDB_SCHEMA_TABLE);
/*
Check if schema table exists in MySQL "dictionary"
if so, remove it since there is none in Ndb
*/
{
strxnmov(buf, sizeof(buf),
mysql_data_home,
"/" NDB_REP_DB "/" NDB_SCHEMA_TABLE,
reg_ext, NullS);
unpack_filename(buf,buf);
my_delete(buf, MYF(0));
}
/*
Update the defines below to reflect the table schema
*/
end= strmov(buf, "CREATE TABLE IF NOT EXISTS "
NDB_REP_DB "." NDB_SCHEMA_TABLE
" ( db VARCHAR(63) NOT NULL,"
" name VARCHAR(63) NOT NULL,"
" slock BINARY(32) NOT NULL,"
" query VARCHAR(2046) NOT NULL,"
" node_id INT UNSIGNED NOT NULL,"
" epoch BIGINT UNSIGNED NOT NULL,"
" id INT UNSIGNED NOT NULL,"
" version INT UNSIGNED NOT NULL,"
" type INT UNSIGNED NOT NULL,"
" PRIMARY KEY USING HASH (db,name) ) ENGINE=NDB");
run_query(thd, buf, end, TRUE, TRUE);
DBUG_RETURN(0);
}
void ndbcluster_setup_binlog_table_shares(THD *thd)
{
int done_find_all_files= 0;
if (!apply_status_share &&
ndbcluster_check_apply_status_share() == 0)
{
if (!done_find_all_files)
{
ndbcluster_find_all_files(thd);
done_find_all_files= 1;
}
ndbcluster_create_apply_status_table(thd);
}
if (!schema_share &&
ndbcluster_check_schema_share() == 0)
{
if (!done_find_all_files)
{
ndbcluster_find_all_files(thd);
done_find_all_files= 1;
}
ndbcluster_create_schema_table(thd);
}
}
/*
Defines and struct for schema table.
Should reflect table definition above.
*/
#define SCHEMA_DB_I 0u
#define SCHEMA_NAME_I 1u
#define SCHEMA_SLOCK_I 2u
#define SCHEMA_QUERY_I 3u
#define SCHEMA_NODE_ID_I 4u
#define SCHEMA_EPOCH_I 5u
#define SCHEMA_ID_I 6u
#define SCHEMA_VERSION_I 7u
#define SCHEMA_TYPE_I 8u
#define SCHEMA_SIZE 9u
#define SCHEMA_SLOCK_SIZE 32u
#define SCHEMA_QUERY_SIZE 2048u
struct Cluster_replication_schema
{
unsigned char db_length;
char db[64];
unsigned char name_length;
char name[64];
unsigned char slock_length;
uint32 slock[SCHEMA_SLOCK_SIZE/4];
unsigned short query_length;
char query[SCHEMA_QUERY_SIZE];
Uint64 epoch;
uint32 node_id;
uint32 id;
uint32 version;
uint32 type;
};
/*
Transfer schema table data into corresponding struct
*/
static void ndbcluster_get_schema(TABLE *table,
Cluster_replication_schema *s)
{
Field **field;
/* db varchar 1 length byte */
field= table->field;
s->db_length= *(uint8*)(*field)->ptr;
DBUG_ASSERT(s->db_length <= (*field)->field_length);
DBUG_ASSERT((*field)->field_length + 1 == sizeof(s->db));
memcpy(s->db, (*field)->ptr + 1, s->db_length);
s->db[s->db_length]= 0;
/* name varchar 1 length byte */
field++;
s->name_length= *(uint8*)(*field)->ptr;
DBUG_ASSERT(s->name_length <= (*field)->field_length);
DBUG_ASSERT((*field)->field_length + 1 == sizeof(s->name));
memcpy(s->name, (*field)->ptr + 1, s->name_length);
s->name[s->name_length]= 0;
/* slock fixed length */
field++;
s->slock_length= (*field)->field_length;
DBUG_ASSERT((*field)->field_length == sizeof(s->slock));
memcpy(s->slock, (*field)->ptr, s->slock_length);
/* query varchar 2 length bytes */
field++;
s->query_length= uint2korr((*field)->ptr);
DBUG_ASSERT(s->query_length <= (*field)->field_length);
DBUG_ASSERT((*field)->field_length + 2 == sizeof(s->query));
memcpy(s->query, (*field)->ptr + 2, s->query_length);
s->query[s->query_length]= 0;
/* node_id */
field++;
s->node_id= ((Field_long *)*field)->val_int();
/* epoch */
field++;
s->epoch= ((Field_long *)*field)->val_int();
/* id */
field++;
s->id= ((Field_long *)*field)->val_int();
/* version */
field++;
s->version= ((Field_long *)*field)->val_int();
/* type */
field++;
s->type= ((Field_long *)*field)->val_int();
}
/*
helper function to pack a ndb varchar
*/
static char *ndb_pack_varchar(const NDBCOL *col, char *buf,
const char *str, int sz)
{
switch (col->getArrayType())
{
case NDBCOL::ArrayTypeFixed:
memcpy(buf, str, sz);
break;
case NDBCOL::ArrayTypeShortVar:
*(unsigned char*)buf= (unsigned char)sz;
memcpy(buf + 1, str, sz);
break;
case NDBCOL::ArrayTypeMediumVar:
int2store(buf, sz);
memcpy(buf + 2, str, sz);
break;
}
return buf;
}
/*
log query in schema table
*/
int ndbcluster_log_schema_op(THD *thd, NDB_SHARE *share,
const char *query, int query_length,
const char *db, const char *table_name,
uint32 ndb_table_id,
uint32 ndb_table_version,
enum SCHEMA_OP_TYPE type)
{
DBUG_ENTER("ndbcluster_log_schema_op");
Thd_ndb *thd_ndb= get_thd_ndb(thd);
if (!thd_ndb)
{
if (!(thd_ndb= ha_ndbcluster::seize_thd_ndb()))
{
sql_print_error("Could not allocate Thd_ndb object");
DBUG_RETURN(1);
}
set_thd_ndb(thd, thd_ndb);
}
DBUG_PRINT("enter",
("query: %s db: %s table_name: %s thd_ndb->options: %d",
query, db, table_name, thd_ndb->options));
if (!schema_share || thd_ndb->options & TNO_NO_LOG_SCHEMA_OP)
{
DBUG_RETURN(0);
}
char tmp_buf2[FN_REFLEN];
switch (type)
{
case SOT_DROP_TABLE:
/* drop database command, do not log at drop table */
if (thd->lex->sql_command == SQLCOM_DROP_DB)
DBUG_RETURN(0);
/* redo the drop table query as is may contain several tables */
query= tmp_buf2;
query_length= (uint) (strxmov(tmp_buf2, "drop table `",
table_name, "`", NullS) - tmp_buf2);
break;
case SOT_CREATE_TABLE:
break;
case SOT_RENAME_TABLE:
break;
case SOT_ALTER_TABLE:
break;
case SOT_DROP_DB:
break;
case SOT_CREATE_DB:
break;
case SOT_ALTER_DB:
break;
default:
abort(); /* should not happen, programming error */
}
const NdbError *ndb_error= 0;
uint32 node_id= g_ndb_cluster_connection->node_id();
Uint64 epoch= 0;
MY_BITMAP schema_subscribers;
uint32 bitbuf[sizeof(schema_share->slock)/4];
{
int i;
bitmap_init(&schema_subscribers, bitbuf, sizeof(bitbuf)*8, false);
bitmap_set_all(&schema_subscribers);
(void) pthread_mutex_lock(&schema_share->mutex);
for (i= 0; i < ndb_number_of_storage_nodes; i++)
{
MY_BITMAP *table_subscribers= &schema_share->subscriber_bitmap[i];
if (!bitmap_is_clear_all(table_subscribers))
bitmap_intersect(&schema_subscribers,
table_subscribers);
}
(void) pthread_mutex_unlock(&schema_share->mutex);
bitmap_clear_bit(&schema_subscribers, node_id);
if (share)
{
(void) pthread_mutex_lock(&share->mutex);
memcpy(share->slock, schema_subscribers.bitmap, sizeof(share->slock));
(void) pthread_mutex_unlock(&share->mutex);
}
DBUG_DUMP("schema_subscribers", (char*)schema_subscribers.bitmap,
no_bytes_in_map(&schema_subscribers));
DBUG_PRINT("info", ("bitmap_is_clear_all(&schema_subscribers): %d",
bitmap_is_clear_all(&schema_subscribers)));
}
Ndb *ndb= thd_ndb->ndb;
char old_db[128];
strcpy(old_db, ndb->getDatabaseName());
char tmp_buf[SCHEMA_QUERY_SIZE];
NDBDICT *dict= ndb->getDictionary();
ndb->setDatabaseName(NDB_REP_DB);
const NDBTAB *ndbtab= dict->getTable(NDB_SCHEMA_TABLE);
NdbTransaction *trans= 0;
int retries= 100;
const NDBCOL *col[SCHEMA_SIZE];
unsigned sz[SCHEMA_SIZE];
if (ndbtab == 0)
{
if (strcmp(NDB_REP_DB, db) != 0 ||
strcmp(NDB_SCHEMA_TABLE, table_name))
{
ndb_error= &dict->getNdbError();
goto end;
}
DBUG_RETURN(0);
}
{
uint i;
for (i= 0; i < SCHEMA_SIZE; i++)
{
col[i]= ndbtab->getColumn(i);
sz[i]= col[i]->getLength();
DBUG_ASSERT(sz[i] <= sizeof(tmp_buf));
}
}
while (1)
{
if ((trans= ndb->startTransaction()) == 0)
goto err;
{
NdbOperation *op= 0;
int r= 0;
r|= (op= trans->getNdbOperation(ndbtab)) == 0;
DBUG_ASSERT(r == 0);
r|= op->writeTuple();
DBUG_ASSERT(r == 0);
/* db */
ndb_pack_varchar(col[SCHEMA_DB_I], tmp_buf, db, strlen(db));
r|= op->equal(SCHEMA_DB_I, tmp_buf);
DBUG_ASSERT(r == 0);
/* name */
ndb_pack_varchar(col[SCHEMA_NAME_I], tmp_buf, table_name,
strlen(table_name));
r|= op->equal(SCHEMA_NAME_I, tmp_buf);
DBUG_ASSERT(r == 0);
/* slock */
DBUG_ASSERT(sz[SCHEMA_SLOCK_I] == sizeof(bitbuf));
r|= op->setValue(SCHEMA_SLOCK_I, (char*)schema_subscribers.bitmap);
DBUG_ASSERT(r == 0);
/* query */
ndb_pack_varchar(col[SCHEMA_QUERY_I], tmp_buf, query, query_length);
r|= op->setValue(SCHEMA_QUERY_I, tmp_buf);
DBUG_ASSERT(r == 0);
/* node_id */
r|= op->setValue(SCHEMA_NODE_ID_I, node_id);
DBUG_ASSERT(r == 0);
/* epoch */
r|= op->setValue(SCHEMA_EPOCH_I, epoch);
DBUG_ASSERT(r == 0);
/* id */
r|= op->setValue(SCHEMA_ID_I, ndb_table_id);
DBUG_ASSERT(r == 0);
/* version */
r|= op->setValue(SCHEMA_VERSION_I, ndb_table_version);
DBUG_ASSERT(r == 0);
/* type */
r|= op->setValue(SCHEMA_TYPE_I, (uint32)type);
DBUG_ASSERT(r == 0);
}
if (trans->execute(NdbTransaction::Commit) == 0)
{
dict->forceGCPWait();
DBUG_PRINT("info", ("logged: %s", query));
break;
}
err:
if (trans->getNdbError().status == NdbError::TemporaryError)
{
if (retries--)
{
ndb->closeTransaction(trans);
continue; // retry
}
}
ndb_error= &trans->getNdbError();
break;
}
end:
if (ndb_error)
push_warning_printf(current_thd, MYSQL_ERROR::WARN_LEVEL_ERROR,
ER_GET_ERRMSG, ER(ER_GET_ERRMSG),
ndb_error->code,
ndb_error->message,
"Could not log query '%s' on other mysqld's");
if (trans)
ndb->closeTransaction(trans);
ndb->setDatabaseName(old_db);
/*
Wait for other mysqld's to acknowledge the table operation
*/
if (ndb_error == 0 &&
(type == SOT_CREATE_TABLE ||
type == SOT_RENAME_TABLE ||
type == SOT_ALTER_TABLE) &&
!bitmap_is_clear_all(&schema_subscribers))
{
int max_timeout= 10;
(void) pthread_mutex_lock(&share->mutex);
while (1)
{
struct timespec abstime;
int i;
set_timespec(abstime, 1);
(void) pthread_cond_timedwait(&injector_cond,
&share->mutex,
&abstime);
(void) pthread_mutex_lock(&schema_share->mutex);
for (i= 0; i < ndb_number_of_storage_nodes; i++)
{
/* remove any unsubscribed from schema_subscribers */
MY_BITMAP *tmp= &schema_share->subscriber_bitmap[i];
if (!bitmap_is_clear_all(tmp))
bitmap_intersect(&schema_subscribers, tmp);
}
(void) pthread_mutex_unlock(&schema_share->mutex);
/* remove any unsubscribed from share->slock */
bitmap_intersect(&share->slock_bitmap, &schema_subscribers);
DBUG_DUMP("share->slock_bitmap.bitmap", (char*)share->slock_bitmap.bitmap,
no_bytes_in_map(&share->slock_bitmap));
if (bitmap_is_clear_all(&share->slock_bitmap))
break;
max_timeout--;
if (max_timeout == 0)
{
sql_print_error("NDB create table: timed out. Ignoring...");
break;
}
sql_print_information("NDB create table: "
"waiting max %u sec for create table %s.",
max_timeout, share->key);
}
(void) pthread_mutex_unlock(&share->mutex);
}
DBUG_RETURN(0);
}
/*
acknowledge handling of schema operation
*/
static int
ndbcluster_update_slock(THD *thd,
const char *db,
const char *table_name)
{
DBUG_ENTER("ndbcluster_update_slock");
if (!schema_share)
{
DBUG_RETURN(0);
}
const NdbError *ndb_error= 0;
uint32 node_id= g_ndb_cluster_connection->node_id();
Ndb *ndb= check_ndb_in_thd(thd);
char old_db[128];
strcpy(old_db, ndb->getDatabaseName());
char tmp_buf[SCHEMA_QUERY_SIZE];
NDBDICT *dict= ndb->getDictionary();
ndb->setDatabaseName(NDB_REP_DB);
const NDBTAB *ndbtab= dict->getTable(NDB_SCHEMA_TABLE);
NdbTransaction *trans= 0;
int retries= 100;
const NDBCOL *col[SCHEMA_SIZE];
unsigned sz[SCHEMA_SIZE];
MY_BITMAP slock;
uint32 bitbuf[SCHEMA_SLOCK_SIZE/4];
bitmap_init(&slock, bitbuf, sizeof(bitbuf)*8, false);
if (ndbtab == 0)
{
abort();
DBUG_RETURN(0);
}
{
uint i;
for (i= 0; i < SCHEMA_SIZE; i++)
{
col[i]= ndbtab->getColumn(i);
sz[i]= col[i]->getLength();
DBUG_ASSERT(sz[i] <= sizeof(tmp_buf));
}
}
while (1)
{
if ((trans= ndb->startTransaction()) == 0)
goto err;
{
NdbOperation *op= 0;
int r= 0;
/* read the bitmap exlusive */
r|= (op= trans->getNdbOperation(ndbtab)) == 0;
DBUG_ASSERT(r == 0);
r|= op->readTupleExclusive();
DBUG_ASSERT(r == 0);
/* db */
ndb_pack_varchar(col[SCHEMA_DB_I], tmp_buf, db, strlen(db));
r|= op->equal(SCHEMA_DB_I, tmp_buf);
DBUG_ASSERT(r == 0);
/* name */
ndb_pack_varchar(col[SCHEMA_NAME_I], tmp_buf, table_name,
strlen(table_name));
r|= op->equal(SCHEMA_NAME_I, tmp_buf);
DBUG_ASSERT(r == 0);
/* slock */
r|= op->getValue(SCHEMA_SLOCK_I, (char*)slock.bitmap) == 0;
DBUG_ASSERT(r == 0);
}
if (trans->execute(NdbTransaction::NoCommit))
goto err;
bitmap_clear_bit(&slock, node_id);
{
NdbOperation *op= 0;
int r= 0;
/* now update the tuple */
r|= (op= trans->getNdbOperation(ndbtab)) == 0;
DBUG_ASSERT(r == 0);
r|= op->updateTuple();
DBUG_ASSERT(r == 0);
/* db */
ndb_pack_varchar(col[SCHEMA_DB_I], tmp_buf, db, strlen(db));
r|= op->equal(SCHEMA_DB_I, tmp_buf);
DBUG_ASSERT(r == 0);
/* name */
ndb_pack_varchar(col[SCHEMA_NAME_I], tmp_buf, table_name,
strlen(table_name));
r|= op->equal(SCHEMA_NAME_I, tmp_buf);
DBUG_ASSERT(r == 0);
/* slock */
r|= op->setValue(SCHEMA_SLOCK_I, (char*)slock.bitmap);
DBUG_ASSERT(r == 0);
/* node_id */
r|= op->setValue(SCHEMA_NODE_ID_I, node_id);
DBUG_ASSERT(r == 0);
/* type */
r|= op->setValue(SCHEMA_TYPE_I, (uint32)SOT_CLEAR_SLOCK);
DBUG_ASSERT(r == 0);
}
if (trans->execute(NdbTransaction::Commit) == 0)
{
dict->forceGCPWait();
DBUG_PRINT("info", ("node %d cleared lock on '%s.%s'",
node_id, db, table_name));
break;
}
err:
if (trans->getNdbError().status == NdbError::TemporaryError)
{
if (retries--)
{
ndb->closeTransaction(trans);
continue; // retry
}
}
ndb_error= &trans->getNdbError();
break;
}
end:
if (ndb_error)
push_warning_printf(current_thd, MYSQL_ERROR::WARN_LEVEL_ERROR,
ER_GET_ERRMSG, ER(ER_GET_ERRMSG),
ndb_error->code,
ndb_error->message,
"Could not release lock on '%s.%s'",
db, table_name);
if (trans)
ndb->closeTransaction(trans);
ndb->setDatabaseName(old_db);
DBUG_RETURN(0);
}
/*
Handle _non_ data events from the storage nodes
*/
static int
ndb_handle_schema_change(Ndb *ndb, NdbEventOperation *pOp, NDB_SHARE *share)
{
int remote_drop_table= 0, do_close_cached_tables= 0;
if (pOp->getEventType() != NDBEVENT::TE_CLUSTER_FAILURE &&
pOp->getReqNodeId() != g_ndb_cluster_connection->node_id())
{
ndb->setDatabaseName(share->table->s->db);
ha_ndbcluster::invalidate_dictionary_cache(share->table,
ndb,
share->table->s->table_name,
TRUE);
remote_drop_table= 1;
}
(void) pthread_mutex_lock(&share->mutex);
DBUG_ASSERT(share->op == pOp || share->op_old == pOp);
if (share->op_old == pOp)
share->op_old= 0;
else
share->op= 0;
// either just us or drop table handling as well
/* Signal ha_ndbcluster::delete/rename_table that drop is done */
(void) pthread_mutex_unlock(&share->mutex);
(void) pthread_cond_signal(&injector_cond);
pthread_mutex_lock(&ndbcluster_mutex);
free_share(&share, TRUE);
if (remote_drop_table && share && share->state != NSS_DROPPED)
{
DBUG_PRINT("info", ("remote drop table"));
if (share->use_count != 1)
do_close_cached_tables= 1;
share->state= NSS_DROPPED;
free_share(&share, TRUE);
}
pthread_mutex_unlock(&ndbcluster_mutex);
share= 0;
pOp->setCustomData(0);
pthread_mutex_lock(&injector_mutex);
injector_ndb->dropEventOperation(pOp);
pOp= 0;
pthread_mutex_unlock(&injector_mutex);
if (do_close_cached_tables)
close_cached_tables((THD*) 0, 0, (TABLE_LIST*) 0);
return 0;
}
static int
ndb_binlog_thread_handle_schema_event(THD *thd, Ndb *ndb,
NdbEventOperation *pOp,
List<Cluster_replication_schema>
*schema_list, MEM_ROOT *mem_root)
{
DBUG_ENTER("ndb_binlog_thread_handle_schema_event");
NDB_SHARE *share= (NDB_SHARE *)pOp->getCustomData();
if (share && schema_share == share)
{
NDBEVENT::TableEvent ev_type= pOp->getEventType();
DBUG_PRINT("enter", ("%s.%s ev_type: %d",
share->db, share->table_name, ev_type));
switch (ev_type)
{
case NDBEVENT::TE_UPDATE:
case NDBEVENT::TE_INSERT:
{
Cluster_replication_schema *schema= (Cluster_replication_schema *)
sql_alloc(sizeof(Cluster_replication_schema));
uint node_id= g_ndb_cluster_connection->node_id();
ndbcluster_get_schema(share->table, schema);
if (schema->node_id != node_id)
{
int log_query= 0;
DBUG_PRINT("info", ("log query_length: %d query: '%s'",
schema->query_length, schema->query));
switch ((enum SCHEMA_OP_TYPE)schema->type)
{
case SOT_DROP_TABLE:
/* binlog dropping table after any table operations */
schema_list->push_back(schema, mem_root);
log_query= 0;
break;
case SOT_CREATE_TABLE:
/* fall through */
case SOT_RENAME_TABLE:
/* fall through */
case SOT_ALTER_TABLE:
pthread_mutex_lock(&LOCK_open);
if (ha_create_table_from_engine(thd, schema->db, schema->name))
{
sql_print_error("Could not discover table '%s.%s' from "
"binlog schema event '%s' from node %d",
schema->db, schema->name, schema->query,
schema->node_id);
}
pthread_mutex_unlock(&LOCK_open);
{
/* signal that schema operation has been handled */
MY_BITMAP slock;
bitmap_init(&slock, schema->slock, 8*schema->slock_length, false);
if (bitmap_is_set(&slock, node_id))
ndbcluster_update_slock(thd, schema->db, schema->name);
}
log_query= 1;
break;
case SOT_DROP_DB:
#if 0
run_query(thd, schema->query,
schema->query + schema->query_length,
TRUE, /* print error */
TRUE); /* don't binlog the query */
#endif
/* binlog dropping database after any table operations */
schema_list->push_back(schema, mem_root);
log_query= 0;
break;
case SOT_CREATE_DB:
/* fall through */
case SOT_ALTER_DB:
#if 0
run_query(thd, schema->query,
schema->query + schema->query_length,
TRUE, /* print error */
FALSE); /* binlog the query */
log_query= 0;
#else
log_query= 1;
#endif
break;
case SOT_CLEAR_SLOCK:
{
char key[FN_REFLEN];
(void)strxnmov(key, FN_REFLEN, share_prefix, schema->db,
"/", schema->name, NullS);
NDB_SHARE *share= get_share(key, false, false);
if (share)
{
pthread_mutex_lock(&share->mutex);
memcpy(share->slock, schema->slock, sizeof(share->slock));
DBUG_DUMP("share->slock_bitmap.bitmap",
(char*)share->slock_bitmap.bitmap,
no_bytes_in_map(&share->slock_bitmap));
pthread_mutex_unlock(&share->mutex);
pthread_cond_signal(&injector_cond);
free_share(&share);
}
DBUG_RETURN(0);
}
}
if (log_query)
{
char *thd_db_save= thd->db;
thd->db= schema->db;
thd->binlog_query(THD::STMT_QUERY_TYPE, schema->query,
schema->query_length, FALSE,
schema->name[0] == 0);
thd->db= thd_db_save;
}
}
}
break;
case NDBEVENT::TE_DELETE:
// skip
break;
case NDBEVENT::TE_ALTER:
/* do the rename of the table in the share */
share->table->s->db= share->db;
share->table->s->table_name= share->table_name;
ndb_handle_schema_change(ndb, pOp, share);
break;
case NDBEVENT::TE_CLUSTER_FAILURE:
case NDBEVENT::TE_DROP:
free_share(&schema_share);
schema_share= 0;
ndb_handle_schema_change(ndb, pOp, share);
break;
case NDBEVENT::TE_NODE_FAILURE:
{
uint8 node_id= g_node_id_map[pOp->getNdbdNodeId()];
DBUG_ASSERT(node_id != 0xFF);
(void) pthread_mutex_lock(&share->mutex);
bitmap_clear_all(&share->subscriber_bitmap[node_id]);
DBUG_PRINT("info",("NODE_FAILURE UNSUBSCRIBE[%d]", node_id));
(void) pthread_mutex_unlock(&share->mutex);
(void) pthread_cond_signal(&injector_cond);
break;
}
case NDBEVENT::TE_SUBSCRIBE:
{
uint8 node_id= g_node_id_map[pOp->getNdbdNodeId()];
uint8 req_id= pOp->getReqNodeId();
DBUG_ASSERT(req_id != 0 && node_id != 0xFF);
(void) pthread_mutex_lock(&share->mutex);
bitmap_set_bit(&share->subscriber_bitmap[node_id], req_id);
DBUG_PRINT("info",("SUBSCRIBE[%d] %d", node_id, req_id));
(void) pthread_mutex_unlock(&share->mutex);
(void) pthread_cond_signal(&injector_cond);
break;
}
case NDBEVENT::TE_UNSUBSCRIBE:
{
uint8 node_id= g_node_id_map[pOp->getNdbdNodeId()];
uint8 req_id= pOp->getReqNodeId();
DBUG_ASSERT(req_id != 0 && node_id != 0xFF);
(void) pthread_mutex_lock(&share->mutex);
bitmap_clear_bit(&share->subscriber_bitmap[node_id], req_id);
DBUG_PRINT("info",("UNSUBSCRIBE[%d] %d", node_id, req_id));
(void) pthread_mutex_unlock(&share->mutex);
(void) pthread_cond_signal(&injector_cond);
break;
}
default:
sql_print_error("NDB Binlog: unknown non data event %d for %s. "
"Ignoring...", (unsigned) ev_type, share->key);
}
}
DBUG_RETURN(0);
}
/*
Timer class for doing performance measurements
*/
/*********************************************************************
Internal helper functions for handeling of the cluster replication tables
- cluster_replication.binlog_index
- cluster_replication.apply_status
*********************************************************************/
/*
Global variables for holding the binlog_index table reference
*/
TABLE *binlog_index= 0;
TABLE_LIST binlog_tables;
/*
struct to hold the data to be inserted into the
cluster_replication.binlog_index table
*/
struct Binlog_index_row {
longlong gci;
const char *master_log_file;
longlong master_log_pos;
longlong n_inserts;
longlong n_updates;
longlong n_deletes;
longlong n_schemaops;
};
/*
Open the cluster_replication.binlog_index table
*/
static int open_binlog_index(THD *thd, TABLE_LIST *tables,
TABLE **binlog_index)
{
static char repdb[]= NDB_REP_DB;
static char reptable[]= NDB_REP_TABLE;
const char *save_proc_info= thd->proc_info;
bzero((char*) tables, sizeof(*tables));
tables->db= repdb;
tables->alias= tables->table_name= reptable;
tables->lock_type= TL_WRITE;
thd->proc_info= "Opening " NDB_REP_DB "." NDB_REP_TABLE;
tables->required_type= FRMTYPE_TABLE;
uint counter;
thd->clear_error();
if (open_tables(thd, &tables, &counter, MYSQL_LOCK_IGNORE_FLUSH))
{
sql_print_error("NDB Binlog: Opening binlog_index: %d, '%s'",
thd->net.last_errno,
thd->net.last_error ? thd->net.last_error : "");
thd->proc_info= save_proc_info;
return -1;
}
*binlog_index= tables->table;
thd->proc_info= save_proc_info;
return 0;
}
/*
Insert one row in the cluster_replication.binlog_index
declared friend in handler.h to be able to call write_row directly
so that this insert is not replicated
*/
int ndb_add_binlog_index(THD *thd, void *_row)
{
Binlog_index_row &row= *(Binlog_index_row *) _row;
int error= 0;
bool need_reopen;
for ( ; ; ) /* loop for need_reopen */
{
if (!binlog_index && open_binlog_index(thd, &binlog_tables, &binlog_index))
{
error= -1;
goto add_binlog_index_err;
}
if (lock_tables(thd, &binlog_tables, 1, &need_reopen))
{
if (need_reopen)
{
close_tables_for_reopen(thd, &binlog_tables);
binlog_index= 0;
continue;
}
sql_print_error("NDB Binlog: Unable to lock table binlog_index");
error= -1;
goto add_binlog_index_err;
}
break;
}
binlog_index->field[0]->store(row.master_log_pos);
binlog_index->field[1]->store(row.master_log_file,
strlen(row.master_log_file),
&my_charset_bin);
binlog_index->field[2]->store(row.gci);
binlog_index->field[3]->store(row.n_inserts);
binlog_index->field[4]->store(row.n_updates);
binlog_index->field[5]->store(row.n_deletes);
binlog_index->field[6]->store(row.n_schemaops);
int r;
if ((r= binlog_index->file->write_row(binlog_index->record[0])))
{
sql_print_error("NDB Binlog: Writing row to binlog_index: %d", r);
error= -1;
goto add_binlog_index_err;
}
mysql_unlock_tables(thd, thd->lock);
thd->lock= 0;
return 0;
add_binlog_index_err:
close_thread_tables(thd);
binlog_index= 0;
return error;
}
/*********************************************************************
Functions for start, stop, wait for ndbcluster binlog thread
*********************************************************************/
static int do_ndbcluster_binlog_close_connection= 0;
int ndbcluster_binlog_start()
{
DBUG_ENTER("ndbcluster_binlog_start");
pthread_mutex_init(&injector_mutex, MY_MUTEX_INIT_FAST);
pthread_cond_init(&injector_cond, NULL);
/* Create injector thread */
if (pthread_create(&ndb_binlog_thread, &connection_attrib,
ndb_binlog_thread_func, 0))
{
DBUG_PRINT("error", ("Could not create ndb injector thread"));
pthread_cond_destroy(&injector_cond);
pthread_mutex_destroy(&injector_mutex);
DBUG_RETURN(-1);
}
/*
Wait for the ndb injector thread to finish starting up.
*/
pthread_mutex_lock(&injector_mutex);
while (!ndb_binlog_thread_running)
pthread_cond_wait(&injector_cond, &injector_mutex);
pthread_mutex_unlock(&injector_mutex);
if (ndb_binlog_thread_running < 0)
DBUG_RETURN(-1);
DBUG_RETURN(0);
}
static void ndbcluster_binlog_close_connection(THD *thd)
{
DBUG_ENTER("ndbcluster_binlog_close_connection");
const char *save_info= thd->proc_info;
thd->proc_info= "ndbcluster_binlog_close_connection";
do_ndbcluster_binlog_close_connection= 1;
while (ndb_binlog_thread_running > 0)
sleep(1);
thd->proc_info= save_info;
DBUG_VOID_RETURN;
}
/**************************************************************
Internal helper functions for creating/dropping ndb events
used by the client sql threads
**************************************************************/
void
ndb_rep_event_name(String *event_name,const char *db, const char *tbl)
{
event_name->set_ascii("REPL$", 5);
event_name->append(db);
if (tbl)
{
event_name->append('/');
event_name->append(tbl);
}
}
/*
Common function for setting up everything for logging a table at
create/discover.
*/
int ndbcluster_create_binlog_setup(Ndb *ndb, const char *key,
const char *db,
const char *table_name,
bool do_binlog,
NDB_SHARE *share)
{
DBUG_ENTER("ndbcluster_create_binlog_setup");
pthread_mutex_lock(&ndbcluster_mutex);
/* Handle any trailing share */
if (share == 0)
{
share= (NDB_SHARE*) hash_search(&ndbcluster_open_tables,
(byte*) key, strlen(key));
if (share)
handle_trailing_share(share);
}
else
handle_trailing_share(share);
/* Create share which is needed to hold replication information */
if (!(share= get_share(key, true, true)))
{
sql_print_error("NDB Binlog: "
"allocating table share for %s failed", key);
}
pthread_mutex_unlock(&ndbcluster_mutex);
while (share && do_binlog)
{
/*
ToDo make sanity check of share so that the table is actually the same
I.e. we need to do openfrm in this case
Currently awaiting this to be fixed in the 4.1 tree in the general
case
*/
/* Create the event in NDB */
ndb->setDatabaseName(db);
NDBDICT *dict= ndb->getDictionary();
const NDBTAB *ndbtab= dict->getTable(table_name);
if (ndbtab == 0)
{
sql_print_information("NDB Binlog: Failed to get table %s from ndb: "
"%s, %d", key, dict->getNdbError().message,
dict->getNdbError().code);
break; // error
}
String event_name(INJECTOR_EVENT_LEN);
ndb_rep_event_name(&event_name, db, table_name);
/*
event should have been created by someone else,
but let's make sure, and create if it doesn't exist
*/
if (!dict->getEvent(event_name.c_ptr()))
{
if (ndbcluster_create_event(ndb, ndbtab, event_name.c_ptr()))
{
sql_print_error("NDB Binlog: "
"FAILED CREATE (DISCOVER) TABLE Event: %s",
event_name.c_ptr());
break; // error
}
sql_print_information("NDB Binlog: "
"CREATE (DISCOVER) TABLE Event: %s",
event_name.c_ptr());
}
else
sql_print_information("NDB Binlog: DISCOVER TABLE Event: %s",
event_name.c_ptr());
/*
create the event operations for receiving logging events
*/
if (ndbcluster_create_event_ops(share, ndbtab,
event_name.c_ptr()) < 0)
{
sql_print_error("NDB Binlog:"
"FAILED CREATE (DISCOVER) EVENT OPERATIONS Event: %s",
event_name.c_ptr());
/* a warning has been issued to the client */
DBUG_RETURN(0);
}
DBUG_RETURN(0);
}
DBUG_RETURN(-1);
}
int
ndbcluster_create_event(Ndb *ndb, const NDBTAB *ndbtab,
const char *event_name)
{
DBUG_ENTER("ndbcluster_create_event");
NDBDICT *dict= ndb->getDictionary();
if (!dict)
{
sql_print_error("NDB Binlog: could not setup binlog, "
"Invalid NdbDictionary");
DBUG_RETURN(-1);
}
NDBEVENT my_event(event_name);
my_event.setTable(*ndbtab);
my_event.addTableEvent(NDBEVENT::TE_ALL);
/* add all columns to the event */
int n_cols= ndbtab->getNoOfColumns();
for(int a= 0; a < n_cols; a++)
my_event.addEventColumn(a);
if (dict->createEvent(my_event)) // Add event to database
{
#ifdef NDB_BINLOG_EXTRA_WARNINGS
/*
failed, print a warning
*/
push_warning_printf(current_thd, MYSQL_ERROR::WARN_LEVEL_ERROR,
ER_GET_ERRMSG, ER(ER_GET_ERRMSG),
dict->getNdbError().code,
dict->getNdbError().message, "NDB");
#endif
if (dict->getNdbError().classification != NdbError::SchemaObjectExists)
{
sql_print_error("NDB Binlog: Unable to create event in database. "
"Event: %s Error Code: %d Message: %s", event_name,
dict->getNdbError().code, dict->getNdbError().message);
DBUG_RETURN(-1);
}
/*
trailing event from before; an error, but try to correct it
*/
if (dict->dropEvent(my_event.getName()))
{
sql_print_error("NDB Binlog: Unable to create event in database. "
" Attempt to correct with drop failed. "
"Event: %s Error Code: %d Message: %s",
event_name,
dict->getNdbError().code,
dict->getNdbError().message);
DBUG_RETURN(-1);
}
/*
try to add the event again
*/
if (dict->createEvent(my_event))
{
sql_print_error("NDB Binlog: Unable to create event in database. "
" Attempt to correct with drop ok, but create failed. "
"Event: %s Error Code: %d Message: %s",
event_name,
dict->getNdbError().code,
dict->getNdbError().message);
DBUG_RETURN(-1);
}
#ifdef NDB_BINLOG_EXTRA_WARNINGS
push_warning_printf(current_thd, MYSQL_ERROR::WARN_LEVEL_ERROR,
ER_GET_ERRMSG, ER(ER_GET_ERRMSG),
0, "NDB Binlog: Removed trailing event",
"NDB");
#endif
}
DBUG_RETURN(0);
}
inline int is_ndb_compatible_type(Field *field)
{
return
!(field->flags & BLOB_FLAG) &&
field->type() != MYSQL_TYPE_BIT &&
field->pack_length() != 0;
}
/*
- create eventOperations for receiving log events
- setup ndb recattrs for reception of log event data
- "start" the event operation
used at create/discover of tables
*/
int
ndbcluster_create_event_ops(NDB_SHARE *share, const NDBTAB *ndbtab,
const char *event_name)
{
/*
we are in either create table or rename table so table should be
locked, hence we can work with the share without locks
*/
DBUG_ENTER("ndbcluster_create_event_ops");
DBUG_ASSERT(share != 0);
if (share->op)
{
assert(share->op->getCustomData() == (void *) share);
DBUG_ASSERT(share->use_count > 1);
sql_print_error("NDB Binlog: discover reusing old ev op");
free_share(&share); // old event op already has reference
DBUG_RETURN(0);
}
TABLE *table= share->table;
if (table)
{
/*
Logging of blob tables is not yet implemented, it would require:
1. setup of events also on the blob attribute tables
2. collect the pieces of the blob into one from an epoch to
provide a full blob to binlog
*/
if (table->s->blob_fields)
{
sql_print_error("NDB Binlog: logging of blob table %s "
"is not supported", share->key);
DBUG_RETURN(0);
}
/*
Logging of a table without primary key is not possible since the event
api does not provide a "full" before image, only updated attributes
are returned in the before image.
*/
if (table->s->primary_key == MAX_KEY)
{
sql_print_error("NDB Binlog: logging of table %s without a "
"primary key is not supported", share->key);
DBUG_RETURN(0);
}
}
int do_schema_share= 0, do_apply_status_share= 0;
int retries= 100;
if (!schema_share && strcmp(share->db, NDB_REP_DB) == 0 &&
strcmp(share->table_name, NDB_SCHEMA_TABLE) == 0)
do_schema_share= 1;
else if (!apply_status_share && strcmp(share->db, NDB_REP_DB) == 0 &&
strcmp(share->table_name, NDB_APPLY_TABLE) == 0)
do_apply_status_share= 1;
while (1)
{
pthread_mutex_lock(&injector_mutex);
Ndb *ndb= injector_ndb;
if (do_schema_share)
ndb= schema_ndb;
if (ndb == 0)
{
pthread_mutex_unlock(&injector_mutex);
DBUG_RETURN(-1);
}
NdbEventOperation *op= ndb->createEventOperation(event_name);
if (!op)
{
pthread_mutex_unlock(&injector_mutex);
sql_print_error("NDB Binlog: Creating NdbEventOperation failed for"
" %s",event_name);
push_warning_printf(current_thd, MYSQL_ERROR::WARN_LEVEL_ERROR,
ER_GET_ERRMSG, ER(ER_GET_ERRMSG),
ndb->getNdbError().code,
ndb->getNdbError().message,
"NDB");
DBUG_RETURN(-1);
}
int n_columns= ndbtab->getNoOfColumns();
int n_fields= table ? table->s->fields : 0;
for (int j= 0; j < n_columns; j++)
{
const char *col_name= ndbtab->getColumn(j)->getName();
NdbRecAttr *attr0, *attr1;
if (j < n_fields)
{
Field *f= share->table->field[j];
if (is_ndb_compatible_type(f))
{
DBUG_PRINT("info", ("%s compatible", col_name));
attr0= op->getValue(col_name, f->ptr);
attr1= op->getPreValue(col_name, (f->ptr-share->table->record[0]) +
share->table->record[1]);
}
else
{
DBUG_PRINT("info", ("%s non compatible", col_name));
attr0= op->getValue(col_name);
attr1= op->getPreValue(col_name);
}
}
else
{
DBUG_PRINT("info", ("%s hidden key", col_name));
attr0= op->getValue(col_name);
attr1= op->getPreValue(col_name);
}
share->ndb_value[0][j].rec= attr0;
share->ndb_value[1][j].rec= attr1;
}
op->setCustomData((void *) share); // set before execute
share->op= op; // assign op in NDB_SHARE
if (op->execute())
{
share->op= NULL;
retries--;
if (op->getNdbError().status != NdbError::TemporaryError &&
op->getNdbError().code != 1407)
retries= 0;
if (retries == 0)
{
push_warning_printf(current_thd, MYSQL_ERROR::WARN_LEVEL_ERROR,
ER_GET_ERRMSG, ER(ER_GET_ERRMSG),
op->getNdbError().code, op->getNdbError().message,
"NDB");
sql_print_error("NDB Binlog: ndbevent->execute failed for %s; %d %s",
event_name,
op->getNdbError().code, op->getNdbError().message);
}
ndb->dropEventOperation(op);
pthread_mutex_unlock(&injector_mutex);
if (retries)
continue;
DBUG_RETURN(-1);
}
pthread_mutex_unlock(&injector_mutex);
break;
}
get_share(share);
if (do_apply_status_share)
apply_status_share= get_share(share);
else if (do_schema_share)
schema_share= get_share(share);
DBUG_PRINT("info",("%s share->op: 0x%lx, share->use_count: %u",
share->key, share->op, share->use_count));
sql_print_information("NDB Binlog: logging %s", share->key);
DBUG_RETURN(0);
}
/*
when entering the calling thread should have a share lock id share != 0
then the injector thread will have one as well, i.e. share->use_count == 0
(unless it has already dropped... then share->op == 0)
*/
int
ndbcluster_handle_drop_table(Ndb *ndb, const char *event_name,
NDB_SHARE *share)
{
DBUG_ENTER("ndbcluster_handle_drop_table");
NDBDICT *dict= ndb->getDictionary();
if (event_name && dict->dropEvent(event_name))
{
/* drop event failed for some reason, issue a warning */
push_warning_printf(current_thd, MYSQL_ERROR::WARN_LEVEL_ERROR,
ER_GET_ERRMSG, ER(ER_GET_ERRMSG),
dict->getNdbError().code,
dict->getNdbError().message, "NDB");
if (dict->getNdbError().code != 4710)
{
/* error is not that the event did not exist */
sql_print_error("NDB Binlog: Unable to drop event in database. "
"Event: %s Error Code: %d Message: %s",
event_name,
dict->getNdbError().code,
dict->getNdbError().message);
/* ToDo; handle error? */
if (share && share->op &&
share->op->getState() == NdbEventOperation::EO_EXECUTING &&
dict->getNdbError().code != 4009)
{
DBUG_ASSERT(false);
DBUG_RETURN(-1);
}
}
}
if (share == 0 || share->op == 0)
{
DBUG_RETURN(0);
}
/*
Syncronized drop between client thread and injector thread is
neccessary in order to maintain ordering in the binlog,
such that the drop occurs _after_ any inserts/updates/deletes.
The penalty for this is that the drop table becomes slow.
This wait is however not strictly neccessary to produce a binlog
that is usable. However the slave does not currently handle
these out of order, thus we are keeping the SYNC_DROP_ defined
for now.
*/
#define SYNC_DROP_
#ifdef SYNC_DROP_
(void) pthread_mutex_lock(&share->mutex);
int max_timeout= 10;
while (share->op)
{
struct timespec abstime;
set_timespec(abstime, 1);
(void) pthread_cond_timedwait(&injector_cond,
&share->mutex,
&abstime);
max_timeout--;
if (share->op == 0)
break;
if (max_timeout == 0)
{
sql_print_error("NDB delete table: timed out. Ignoring...");
break;
}
sql_print_information("NDB delete table: "
"waiting max %u sec for drop table %s.",
max_timeout, share->key);
}
(void) pthread_mutex_unlock(&share->mutex);
#else
(void) pthread_mutex_lock(&share->mutex);
share->op_old= share->op;
share->op= 0;
(void) pthread_mutex_unlock(&share->mutex);
#endif
DBUG_RETURN(0);
}
/********************************************************************
Internal helper functions for differentd events from the stoarage nodes
used by the ndb injector thread
********************************************************************/
/*
Handle error states on events from the storage nodes
*/
static int ndb_binlog_thread_handle_error(Ndb *ndb, NdbEventOperation *pOp,
Binlog_index_row &row)
{
NDB_SHARE *share= (NDB_SHARE *)pOp->getCustomData();
DBUG_ENTER("ndb_binlog_thread_handle_error");
int overrun= pOp->isOverrun();
if (overrun)
{
/*
ToDo: this error should rather clear the binlog_index...
and continue
*/
sql_print_error("NDB Binlog: Overrun in event buffer, "
"this means we have dropped events. Cannot "
"continue binlog for %s", share->key);
pOp->clearError();
DBUG_RETURN(-1);
}
if (!pOp->isConsistent())
{
/*
ToDo: this error should rather clear the binlog_index...
and continue
*/
sql_print_error("NDB Binlog: Not Consistent. Cannot "
"continue binlog for %s. Error code: %d"
" Message: %s", share->key,
pOp->getNdbError().code,
pOp->getNdbError().message);
pOp->clearError();
DBUG_RETURN(-1);
}
sql_print_error("NDB Binlog: unhandled error %d for table %s",
pOp->hasError(), share->key);
pOp->clearError();
DBUG_RETURN(0);
}
static int
ndb_binlog_thread_handle_non_data_event(Ndb *ndb, NdbEventOperation *pOp,
Binlog_index_row &row)
{
NDB_SHARE *share= (NDB_SHARE *)pOp->getCustomData();
NDBEVENT::TableEvent type= pOp->getEventType();
int remote_drop_table= 0, do_close_cached_tables= 0;
/* make sure to flush any pending events as they can be dependent
on one of the tables being changed below
*/
injector_thd->binlog_flush_pending_rows_event(true);
switch (type)
{
case NDBEVENT::TE_CLUSTER_FAILURE:
if (apply_status_share == share)
{
free_share(&apply_status_share);
apply_status_share= 0;
}
sql_print_information("NDB Binlog: cluster failure for %s.", share->key);
DBUG_PRINT("info", ("CLUSTER FAILURE EVENT: "
"%s received share: 0x%lx op: %lx share op: %lx "
"op_old: %lx",
share->key, share, pOp, share->op, share->op_old));
break;
case NDBEVENT::TE_ALTER:
/* ToDo: remove printout */
sql_print_information("NDB Binlog: rename table %s%s/%s -> %s.",
share_prefix, share->table->s->db,
share->table->s->table_name,
share->key);
/* do the rename of the table in the share */
share->table->s->db= share->db;
share->table->s->table_name= share->table_name;
goto drop_alter_common;
case NDBEVENT::TE_DROP:
if (apply_status_share == share)
{
free_share(&apply_status_share);
apply_status_share= 0;
}
/* ToDo: remove printout */
sql_print_information("NDB Binlog: drop table %s.",
share->key);
drop_alter_common:
row.n_schemaops++;
DBUG_PRINT("info", ("TABLE %s EVENT: %s received share: 0x%lx op: %lx "
"share op: %lx op_old: %lx",
type == NDBEVENT::TE_DROP ? "DROP" : "ALTER",
share->key, share, pOp, share->op, share->op_old));
break;
case NDBEVENT::TE_NODE_FAILURE:
/* fall through */
case NDBEVENT::TE_SUBSCRIBE:
/* fall through */
case NDBEVENT::TE_UNSUBSCRIBE:
/* ignore */
return 0;
default:
sql_print_error("NDB Binlog: unknown non data event %d for %s. "
"Ignoring...", (unsigned) type, share->key);
return 0;
}
ndb_handle_schema_change(ndb, pOp, share);
return 0;
}
/*
Handle data events from the storage nodes
*/
static int
ndb_binlog_thread_handle_data_event(Ndb *ndb, NdbEventOperation *pOp,
Binlog_index_row &row,
injector::transaction &trans)
{
NDB_SHARE *share= (NDB_SHARE*) pOp->getCustomData();
if (share == apply_status_share)
return 0;
TABLE *table= share->table;
assert(table != 0);
dbug_print_table("table", table);
TABLE_SHARE *table_s= table->s;
uint n_fields= table_s->fields;
MY_BITMAP b;
/* Potential buffer for the bitmap */
uint32 bitbuf[128 / (sizeof(uint32) * 8)];
bitmap_init(&b, n_fields <= sizeof(bitbuf) * 8 ? bitbuf : NULL,
n_fields, false);
bitmap_set_all(&b);
/*
row data is already in table->record[0]
As we told the NdbEventOperation to do this
(saves moving data about many times)
*/
switch(pOp->getEventType())
{
case NDBEVENT::TE_INSERT:
row.n_inserts++;
DBUG_PRINT("info", ("INSERT INTO %s", share->key));
{
ndb_unpack_record(table, share->ndb_value[0], &b, table->record[0]);
trans.write_row(::server_id, injector::transaction::table(table, true),
&b, n_fields, table->record[0]);
}
break;
case NDBEVENT::TE_DELETE:
row.n_deletes++;
DBUG_PRINT("info",("DELETE FROM %s", share->key));
{
/*
table->record[0] contains only the primary key in this case
since we do not have an after image
*/
int n;
if (table->s->primary_key != MAX_KEY)
n= 0; /*
use the primary key only as it save time and space and
it is the only thing needed to log the delete
*/
else
n= 1; /*
we use the before values since we don't have a primary key
since the mysql server does not handle the hidden primary
key
*/
ndb_unpack_record(table, share->ndb_value[n], &b, table->record[n]);
print_records(table, table->record[n]);
trans.delete_row(::server_id, injector::transaction::table(table, true),
&b, n_fields, table->record[n]);
}
break;
case NDBEVENT::TE_UPDATE:
row.n_updates++;
DBUG_PRINT("info", ("UPDATE %s", share->key));
{
ndb_unpack_record(table, share->ndb_value[0],
&b, table->record[0]);
print_records(table, table->record[0]);
if (table->s->primary_key != MAX_KEY)
{
/*
since table has a primary key, we can to a write
using only after values
*/
trans.write_row(::server_id, injector::transaction::table(table, true),
&b, n_fields, table->record[0]);// after values
}
else
{
/*
mysql server cannot handle the ndb hidden key and
therefore needs the before image as well
*/
ndb_unpack_record(table, share->ndb_value[1], &b, table->record[1]);
print_records(table, table->record[1]);
trans.update_row(::server_id,
injector::transaction::table(table, true),
&b, n_fields,
table->record[1], // before values
table->record[0]);// after values
}
}
break;
default:
/* We should REALLY never get here. */
DBUG_PRINT("info", ("default - uh oh, a brain exploded."));
break;
}
return 0;
}
//#define RUN_NDB_BINLOG_TIMER
#ifdef RUN_NDB_BINLOG_TIMER
class Timer
{
public:
Timer() { start(); }
void start() { gettimeofday(&m_start, 0); }
void stop() { gettimeofday(&m_stop, 0); }
ulong elapsed_ms()
{
return (ulong)
(((longlong) m_stop.tv_sec - (longlong) m_start.tv_sec) * 1000 +
((longlong) m_stop.tv_usec -
(longlong) m_start.tv_usec + 999) / 1000);
}
private:
struct timeval m_start,m_stop;
};
#endif
/****************************************************************
Injector thread main loop
****************************************************************/
pthread_handler_t ndb_binlog_thread_func(void *arg)
{
THD *thd; /* needs to be first for thread_stack */
Ndb *ndb= 0;
Thd_ndb *thd_ndb=0;
int ndb_update_binlog_index= 1;
injector *inj= injector::instance();
pthread_mutex_lock(&injector_mutex);
/*
Set up the Thread
*/
my_thread_init();
DBUG_ENTER("ndb_binlog_thread");
thd= new THD; /* note that contructor of THD uses DBUG_ */
THD_CHECK_SENTRY(thd);
thd->thread_stack= (char*) &thd; /* remember where our stack is */
if (thd->store_globals())
{
thd->cleanup();
delete thd;
ndb_binlog_thread_running= -1;
pthread_mutex_unlock(&injector_mutex);
pthread_cond_signal(&injector_cond);
my_thread_end();
pthread_exit(0);
DBUG_RETURN(NULL);
}
thd->init_for_queries();
thd->command= COM_DAEMON;
injector_thd= thd;
/*
Set up ndb binlog
*/
sql_print_information("Starting Cluster Binlog");
pthread_detach_this_thread();
thd->real_id= pthread_self();
pthread_mutex_lock(&LOCK_thread_count);
thd->thread_id= thread_id++;
threads.append(thd);
pthread_mutex_unlock(&LOCK_thread_count);
thd->lex->start_transaction_opt= 0;
if (!(schema_ndb= new Ndb(g_ndb_cluster_connection, "")) ||
schema_ndb->init())
{
sql_print_error("NDB Binlog: Getting Schema Ndb object failed");
goto err;
}
if (!(ndb= new Ndb(g_ndb_cluster_connection, "")) ||
ndb->init())
{
sql_print_error("NDB Binlog: Getting Ndb object failed");
ndb_binlog_thread_running= -1;
pthread_mutex_unlock(&injector_mutex);
pthread_cond_signal(&injector_cond);
goto err;
}
/*
Expose global reference to our ndb object.
Used by both sql client thread and binlog thread to interact
with the storage
pthread_mutex_lock(&injector_mutex);
*/
injector_ndb= ndb;
ndb_binlog_thread_running= 1;
/*
We signal the thread that started us that we've finished
starting up.
*/
pthread_mutex_unlock(&injector_mutex);
pthread_cond_signal(&injector_cond);
thd->system_thread= SYSTEM_THREAD_NDBCLUSTER_BINLOG;
thd->version= refresh_version;
thd->set_time();
thd->main_security_ctx.host_or_ip= "";
thd->client_capabilities= 0;
my_net_init(&thd->net, 0);
thd->main_security_ctx.master_access= ~0;
thd->main_security_ctx.priv_user= 0;
thd->proc_info= "Waiting for ndbcluster to start";
pthread_mutex_lock(&injector_mutex);
while (!ndbcluster_util_inited)
{
/* ndb not connected yet */
struct timespec abstime;
set_timespec(abstime, 1);
pthread_cond_timedwait(&injector_cond, &injector_mutex, &abstime);
if (abort_loop)
{
pthread_mutex_unlock(&injector_mutex);
goto err;
}
}
pthread_mutex_unlock(&injector_mutex);
/*
Main NDB Injector loop
*/
DBUG_ASSERT(ndbcluster_hton.slot != ~(uint)0);
if (!(thd_ndb= ha_ndbcluster::seize_thd_ndb()))
{
sql_print_error("Could not allocate Thd_ndb object");
goto err;
}
set_thd_ndb(thd, thd_ndb);
thd_ndb->options|= TNO_NO_LOG_SCHEMA_OP;
thd->query_id= 0; // to keep valgrind quiet
{
static char db[]= "";
thd->db= db;
open_binlog_index(thd, &binlog_tables, &binlog_index);
if (!apply_status_share)
{
sql_print_error("NDB: Could not get apply status share");
}
thd->db= db;
}
#ifdef RUN_NDB_BINLOG_TIMER
Timer main_timer;
#endif
for ( ; !((abort_loop || do_ndbcluster_binlog_close_connection) &&
ndb_latest_handled_binlog_epoch >= g_latest_trans_gci); )
{
#ifdef RUN_NDB_BINLOG_TIMER
main_timer.stop();
sql_print_information("main_timer %ld ms", main_timer.elapsed_ms());
main_timer.start();
#endif
/*
now we don't want any events before next gci is complete
*/
thd->proc_info= "Waiting for event from ndbcluster";
thd->set_time();
/* wait for event or 1000 ms */
Uint64 gci, schema_gci;
int res= ndb->pollEvents(1000, &gci);
int schema_res= schema_ndb->pollEvents(0, &schema_gci);
ndb_latest_received_binlog_epoch= gci;
while (gci > schema_gci && schema_res >= 0)
schema_res= schema_ndb->pollEvents(10, &schema_gci);
if ((abort_loop || do_ndbcluster_binlog_close_connection) &&
ndb_latest_handled_binlog_epoch >= g_latest_trans_gci)
break; /* Shutting down server */
if (binlog_index && binlog_index->s->version < refresh_version)
{
if (binlog_index->s->version < refresh_version)
{
close_thread_tables(thd);
binlog_index= 0;
}
}
MEM_ROOT **root_ptr=
my_pthread_getspecific_ptr(MEM_ROOT**, THR_MALLOC);
MEM_ROOT *old_root= *root_ptr;
MEM_ROOT mem_root;
init_sql_alloc(&mem_root, 4096, 0);
List<Cluster_replication_schema> schema_list;
*root_ptr= &mem_root;
if (unlikely(schema_res > 0))
{
NdbEventOperation *pOp= schema_ndb->nextEvent();
while (pOp != NULL)
{
if (!pOp->hasError())
ndb_binlog_thread_handle_schema_event(thd, schema_ndb, pOp,
&schema_list, &mem_root);
else
sql_print_error("NDB: error %lu (%s) on handling "
"binlog schema event",
(ulong) pOp->getNdbError().code,
pOp->getNdbError().message);
pOp= schema_ndb->nextEvent();
}
}
if (res > 0)
{
DBUG_PRINT("info", ("pollEvents res: %d", res));
#ifdef RUN_NDB_BINLOG_TIMER
Timer gci_timer, write_timer;
int event_count= 0;
#endif
thd->proc_info= "Processing events";
NdbEventOperation *pOp= ndb->nextEvent();
Binlog_index_row row;
while (pOp != NULL)
{
ndb->
setReportThreshEventGCISlip(ndb_report_thresh_binlog_epoch_slip);
ndb->setReportThreshEventFreeMem(ndb_report_thresh_binlog_mem_usage);
assert(pOp->getGCI() <= ndb_latest_received_binlog_epoch);
bzero((char*) &row, sizeof(row));
injector::transaction trans= inj->new_trans(thd);
gci= pOp->getGCI();
if (apply_status_share)
{
TABLE *table= apply_status_share->table;
MY_BITMAP b;
uint32 bitbuf;
DBUG_ASSERT(table->s->fields <= sizeof(bitbuf) * 8);
bitmap_init(&b, &bitbuf, table->s->fields, false);
bitmap_set_all(&b);
table->field[0]->store((longlong)::server_id);
table->field[1]->store((longlong)gci);
trans.write_row(::server_id,
injector::transaction::table(table, true),
&b, table->s->fields,
table->record[0]);
}
else
{
sql_print_error("NDB: Could not get apply status share");
}
#ifdef RUN_NDB_BINLOG_TIMER
write_timer.start();
#endif
do
{
#ifdef RUN_NDB_BINLOG_TIMER
event_count++;
#endif
if (pOp->hasError() &&
ndb_binlog_thread_handle_error(ndb, pOp, row) < 0)
goto err;
#ifndef DBUG_OFF
{
NDB_SHARE *share= (NDB_SHARE*) pOp->getCustomData();
DBUG_PRINT("info",
("EVENT TYPE:%d GCI:%lld last applied: %lld "
"share: 0x%lx", pOp->getEventType(), gci,
ndb_latest_applied_binlog_epoch, share));
DBUG_ASSERT(share != 0);
}
#endif
if ((unsigned) pOp->getEventType() <
(unsigned) NDBEVENT::TE_FIRST_NON_DATA_EVENT)
ndb_binlog_thread_handle_data_event(ndb, pOp, row, trans);
else
ndb_binlog_thread_handle_non_data_event(ndb, pOp, row);
pOp= ndb->nextEvent();
} while (pOp && pOp->getGCI() == gci);
/*
note! pOp is not referring to an event in the next epoch
or is == 0
*/
#ifdef RUN_NDB_BINLOG_TIMER
write_timer.stop();
#endif
if (row.n_inserts || row.n_updates
|| row.n_deletes || row.n_schemaops)
{
injector::transaction::binlog_pos start= trans.start_pos();
if (int r= trans.commit())
{
sql_print_error("NDB binlog:"
"Error during COMMIT of GCI. Error: %d",
r);
/* TODO: Further handling? */
}
row.gci= gci;
row.master_log_file= start.file_name();
row.master_log_pos= start.file_pos();
DBUG_PRINT("info",("COMMIT gci %lld",gci));
if (ndb_update_binlog_index)
ndb_add_binlog_index(thd, &row);
ndb_latest_applied_binlog_epoch= gci;
}
else
trans.commit();
ndb_latest_handled_binlog_epoch= gci;
#ifdef RUN_NDB_BINLOG_TIMER
gci_timer.stop();
sql_print_information("gci %ld event_count %d write time "
"%ld(%d e/s), total time %ld(%d e/s)",
(ulong)gci, event_count,
write_timer.elapsed_ms(),
event_count / write_timer.elapsed_ms(),
gci_timer.elapsed_ms(),
event_count / gci_timer.elapsed_ms());
#endif
}
}
{
Cluster_replication_schema *schema;
while ((schema= schema_list.pop()))
{
char *thd_db_save= thd->db;
thd->db= schema->db;
thd->binlog_query(THD::STMT_QUERY_TYPE, schema->query,
schema->query_length, FALSE,
schema->name[0] == 0);
thd->db= thd_db_save;
}
}
free_root(&mem_root, MYF(0));
*root_ptr= old_root;
ndb_latest_handled_binlog_epoch= ndb_latest_received_binlog_epoch;
}
err:
DBUG_PRINT("info",("Shutting down cluster binlog thread"));
close_thread_tables(thd);
pthread_mutex_lock(&injector_mutex);
/* don't mess with the injector_ndb anymore from other threads */
injector_ndb= 0;
pthread_mutex_unlock(&injector_mutex);
thd->db= 0; // as not to try to free memory
sql_print_information("Stopping Cluster Binlog");
if (apply_status_share)
free_share(&apply_status_share);
if (schema_share)
free_share(&schema_share);
/* remove all event operations */
if (ndb)
{
NdbEventOperation *op;
DBUG_PRINT("info",("removing all event operations"));
while ((op= ndb->getEventOperation()))
{
DBUG_PRINT("info",("removing event operation on %s",
op->getEvent()->getName()));
NDB_SHARE *share= (NDB_SHARE*) op->getCustomData();
free_share(&share);
ndb->dropEventOperation(op);
}
delete ndb;
ndb= 0;
}
net_end(&thd->net);
thd->cleanup();
delete thd;
ndb_binlog_thread_running= -1;
(void) pthread_cond_signal(&injector_cond);
DBUG_PRINT("exit", ("ndb_binlog_thread"));
my_thread_end();
pthread_exit(0);
DBUG_RETURN(NULL);
}
bool
ndbcluster_show_status_binlog(THD* thd, stat_print_fn *stat_print,
enum ha_stat_type stat_type)
{
char buf[IO_SIZE];
ulonglong ndb_latest_epoch= 0;
DBUG_ENTER("ndbcluster_show_status_binlog");
ndbcluster_show_status_binlog(thd, stat_print, stat_type);
pthread_mutex_lock(&injector_mutex);
if (injector_ndb)
{
ndb_latest_epoch= injector_ndb->getLatestGCI();
pthread_mutex_unlock(&injector_mutex);
snprintf(buf, sizeof(buf),
"latest_epoch=%llu, "
"latest_trans_epoch=%llu, "
"latest_received_binlog_epoch=%llu, "
"latest_handled_binlog_epoch=%llu, "
"latest_applied_binlog_epoch=%llu",
ndb_latest_epoch,
g_latest_trans_gci,
ndb_latest_received_binlog_epoch,
ndb_latest_handled_binlog_epoch,
ndb_latest_applied_binlog_epoch);
if (stat_print(thd, ndbcluster_hton.name, "binlog", buf))
DBUG_RETURN(TRUE);
}
else
pthread_mutex_unlock(&injector_mutex);
DBUG_RETURN(FALSE);
}
#endif /* HAVE_NDB_BINLOG */
--- New file ---
+++ sql/ha_ndbcluster_binlog.h 05/12/14 22:28:30
/* Copyright (C) 2000-2003 MySQL AB
This program is free software; you can redistribute it and/or modify
it under the terms of the GNU General Public License as published by
the Free Software Foundation; either version 2 of the License, or
(at your option) any later version.
This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU General Public License for more details.
You should have received a copy of the GNU General Public License
along with this program; if not, write to the Free Software
Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
*/
// Typedefs for long names
typedef NdbDictionary::Object NDBOBJ;
typedef NdbDictionary::Column NDBCOL;
typedef NdbDictionary::Table NDBTAB;
typedef NdbDictionary::Index NDBINDEX;
typedef NdbDictionary::Dictionary NDBDICT;
typedef NdbDictionary::Event NDBEVENT;
#ifdef HAVE_NDB_BINLOG
#define INJECTOR_EVENT_LEN 200
enum SCHEMA_OP_TYPE
{
SOT_DROP_TABLE,
SOT_CREATE_TABLE,
SOT_RENAME_TABLE,
SOT_ALTER_TABLE,
SOT_DROP_DB,
SOT_CREATE_DB,
SOT_ALTER_DB,
SOT_CLEAR_SLOCK
};
const uint max_ndb_nodes= 64; /* multiple of 32 */
extern pthread_t ndb_binlog_thread;
extern pthread_mutex_t injector_mutex;
extern pthread_cond_t injector_cond;
static const char *ha_ndb_ext=".ndb";
static const char share_prefix[]= "./";
extern unsigned char g_node_id_map[max_ndb_nodes];
extern handlerton ndbcluster_hton;
extern pthread_t ndb_util_thread;
extern pthread_mutex_t LOCK_ndb_util_thread;
extern pthread_cond_t COND_ndb_util_thread;
extern int ndbcluster_util_inited;
extern pthread_mutex_t ndbcluster_mutex;
extern HASH ndbcluster_open_tables;
extern Ndb_cluster_connection* g_ndb_cluster_connection;
extern long ndb_number_of_storage_nodes;
/*
Initialize the binlog part of the ndb handlerton
*/
void ndbcluster_binlog_init_handlerton();
/*
Initialize the binlog part of the NDB_SHARE
*/
void ndbcluster_binlog_init_share(NDB_SHARE *share);
int ndbcluster_create_binlog_setup(Ndb *ndb, const char *key,
const char *db,
const char *table_name,
bool do_binlog,
NDB_SHARE *share= 0);
int ndbcluster_create_event(Ndb *ndb, const NDBTAB *table,
const char *event_name);
int ndbcluster_create_event_ops(NDB_SHARE *share,
const NDBTAB *ndbtab,
const char *event_name);
int ndbcluster_log_schema_op(THD *thd, NDB_SHARE *share,
const char *query, int query_length,
const char *db, const char *table_name,
uint32 ndb_table_id,
uint32 ndb_table_version,
enum SCHEMA_OP_TYPE type);
int ndbcluster_handle_drop_table(Ndb *ndb, const char *event_name,
NDB_SHARE *share);
void ndb_rep_event_name(String *event_name,
const char *db, const char *tbl);
int ndbcluster_binlog_start();
pthread_handler_t ndb_binlog_thread_func(void *arg);
/*
table cluster_replication.apply_status
*/
void ndbcluster_setup_binlog_table_shares(THD *thd);
extern NDB_SHARE *apply_status_share;
extern NDB_SHARE *schema_share;
extern THD *injector_thd;
extern int ndb_binlog_thread_running;
bool
ndbcluster_show_status_binlog(THD* thd, stat_print_fn *stat_print,
enum ha_stat_type stat_type);
/*
prototypes for ndb handler utility function also needed by
the ndb binlog code
*/
int ndbcluster_find_all_files(THD *thd);
void ndb_unpack_record(TABLE *table, NdbValue *value,
MY_BITMAP *defined, byte *buf);
NDB_SHARE *ndbcluster_get_share(const char *key,
bool create_if_not_exists,
bool have_lock);
NDB_SHARE *ndbcluster_get_share(NDB_SHARE *share);
void ndbcluster_free_share(NDB_SHARE **share, bool have_lock);
void ndbcluster_real_free_share(NDB_SHARE **share);
int handle_trailing_share(NDB_SHARE *share);
inline NDB_SHARE *get_share(const char *key,
bool create_if_not_exists= TRUE,
bool have_lock= FALSE)
{
return ndbcluster_get_share(key, create_if_not_exists, have_lock);
}
inline NDB_SHARE *get_share(NDB_SHARE *share)
{
return ndbcluster_get_share(share);
}
inline void free_share(NDB_SHARE **share, bool have_lock= FALSE)
{
ndbcluster_free_share(share, have_lock);
}
inline void real_free_share(NDB_SHARE **share)
{
ndbcluster_real_free_share(share);
}
inline
Thd_ndb *
get_thd_ndb(THD *thd) { return (Thd_ndb *) thd->ha_data[ndbcluster_hton.slot]; }
inline
void
set_thd_ndb(THD *thd, Thd_ndb *thd_ndb) { thd->ha_data[ndbcluster_hton.slot]= thd_ndb; }
Ndb* check_ndb_in_thd(THD* thd);
#endif /* HAVE_NDB_BINLOG */
| Thread |
|---|
| • bk commit into 5.1 tree (tomas:1.1979) | tomas | 15 Dec |