4423 Pekka Nousiainen 2011-08-11
wl#4124 f01_event.diff
listen to data events on head table
modified:
sql/ha_ndb_index_stat.cc
sql/ha_ndb_index_stat.h
storage/ndb/include/ndb_constants.h
storage/ndb/include/ndbapi/NdbIndexStat.hpp
storage/ndb/src/ndbapi/NdbIndexStat.cpp
storage/ndb/src/ndbapi/NdbIndexStatImpl.cpp
storage/ndb/src/ndbapi/NdbIndexStatImpl.hpp
storage/ndb/test/ndbapi/testIndexStat.cpp
storage/ndb/tools/ndb_index_stat.cpp
4422 Pekka Nousiainen 2011-08-09
treename is wl4124-new2 from x17_fix and up
modified:
.bzr-mysql/default.conf
=== modified file 'sql/ha_ndb_index_stat.cc'
--- a/sql/ha_ndb_index_stat.cc 2011-07-23 17:22:32 +0000
+++ b/sql/ha_ndb_index_stat.cc 2011-08-11 17:11:30 +0000
@@ -498,8 +498,8 @@ struct Ndb_index_stat_glob {
uint list_count[Ndb_index_stat::LT_Count]; /* Temporary use */
uint total_count;
uint force_update;
- uint no_stats;
uint wait_update;
+ uint no_stats;
uint cache_query_bytes; /* In use */
uint cache_clean_bytes; /* Obsolete versions not yet removed */
Ndb_index_stat_glob() :
@@ -890,6 +890,39 @@ ndb_index_stat_free(NDB_SHARE *share)
pthread_mutex_unlock(&ndb_index_stat_list_mutex);
}
+/* Find entry across shares */
+/* wl4124_todo mutex overkill, hash table, can we find table share */
+Ndb_index_stat*
+ndb_index_stat_find_entry(int index_id, int index_version, int table_id)
+{
+ DBUG_ENTER("ndb_index_stat_find_entry");
+ pthread_mutex_lock(&ndbcluster_mutex);
+ pthread_mutex_lock(&ndb_index_stat_list_mutex);
+ DBUG_PRINT("index_stat", ("find index:%d version:%d table:%d",
+ index_id, index_version, table_id));
+
+ int lt;
+ for (lt=1; lt < Ndb_index_stat::LT_Count; lt++)
+ {
+ Ndb_index_stat *st=ndb_index_stat_list[lt].head;
+ while (st != 0)
+ {
+ if (st->index_id == index_id &&
+ st->index_version == index_version)
+ {
+ pthread_mutex_unlock(&ndb_index_stat_list_mutex);
+ pthread_mutex_unlock(&ndbcluster_mutex);
+ DBUG_RETURN(st);
+ }
+ st= st->list_next;
+ }
+ }
+
+ pthread_mutex_unlock(&ndb_index_stat_list_mutex);
+ pthread_mutex_unlock(&ndbcluster_mutex);
+ DBUG_RETURN(0);
+}
+
/* Statistics thread sub-routines */
void
@@ -931,12 +964,14 @@ ndb_index_stat_cache_clean(Ndb_index_sta
/* Misc in/out parameters for process steps */
struct Ndb_index_stat_proc {
+ NdbIndexStat* is_util; // For metadata and polling
Ndb *ndb;
time_t now;
int lt;
bool busy;
bool end;
Ndb_index_stat_proc() :
+ is_util(0),
ndb(0),
now(0),
lt(0),
@@ -1383,6 +1418,75 @@ ndb_index_stat_proc_error(Ndb_index_stat
pr.busy= true;
}
+void
+ndb_index_stat_proc_event(Ndb_index_stat_proc &pr, Ndb_index_stat *st)
+{
+ /*
+ Put on Check list if idle.
+ We get event also for our own analyze but this should not matter.
+ */
+ pr.lt= st->lt;
+ if (st->lt == Ndb_index_stat::LT_Idle ||
+ st->lt == Ndb_index_stat::LT_Error)
+ pr.lt= Ndb_index_stat::LT_Check;
+}
+
+void
+ndb_index_stat_proc_event(Ndb_index_stat_proc &pr)
+{
+ NdbIndexStat *is= pr.is_util;
+ Ndb *ndb= pr.ndb;
+ int ret;
+ ret= is->poll_listener(ndb, 0);
+ DBUG_PRINT("index_stat", ("poll_listener ret: %d", ret));
+ if (ret == -1)
+ {
+ // wl4124_todo report error
+ DBUG_ASSERT(false);
+ return;
+ }
+ if (ret == 0)
+ return;
+
+ while (1)
+ {
+ ret= is->next_listener(ndb);
+ DBUG_PRINT("index_stat", ("next_listener ret: %d", ret));
+ if (ret == -1)
+ {
+ // wl4124_todo report error
+ DBUG_ASSERT(false);
+ return;
+ }
+ if (ret == 0)
+ break;
+
+ NdbIndexStat::Head head;
+ is->get_head(head);
+ DBUG_PRINT("index_stat", ("next_listener eventType: %d indexId: %u",
+ head.m_eventType, head.m_indexId));
+
+ Ndb_index_stat *st= ndb_index_stat_find_entry(head.m_indexId,
+ head.m_indexVersion,
+ head.m_tableId);
+ /*
+ Another process can update stats for an index which is not found
+ in this mysqld. Ignore it.
+ */
+ if (st != 0)
+ {
+ DBUG_PRINT("index_stat", ("st %s proc %s", st->id, "Event"));
+ ndb_index_stat_proc_event(pr, st);
+ if (pr.lt != st->lt)
+ ndb_index_stat_list_move(st, pr.lt);
+ }
+ else
+ {
+ DBUG_PRINT("index_stat", ("entry not found in this mysqld"));
+ }
+ }
+}
+
#ifndef DBUG_OFF
void
ndb_index_stat_report(const Ndb_index_stat_glob& old_glob)
@@ -1466,6 +1570,7 @@ ndb_index_stat_proc(Ndb_index_stat_proc
ndb_index_stat_proc_evict(pr);
ndb_index_stat_proc_delete(pr);
ndb_index_stat_proc_error(pr);
+ ndb_index_stat_proc_event(pr);
#ifndef DBUG_OFF
ndb_index_stat_report(old_glob);
@@ -1511,11 +1616,14 @@ ndb_index_stat_end()
/* Index stats thread */
-static int
-ndb_index_stat_check_or_create_systables(NdbIndexStat* is, Ndb* ndb)
+int
+ndb_index_stat_check_or_create_systables(Ndb_index_stat_proc &pr)
{
DBUG_ENTER("ndb_index_stat_check_or_create_systables");
+ NdbIndexStat *is= pr.is_util;
+ Ndb *ndb= pr.ndb;
+
if (is->check_systables(ndb) == 0)
{
DBUG_PRINT("index_stat", ("using existing index stats tables"));
@@ -1542,6 +1650,82 @@ ndb_index_stat_check_or_create_systables
DBUG_RETURN(-1);
}
+int
+ndb_index_stat_check_or_create_sysevents(Ndb_index_stat_proc &pr)
+{
+ DBUG_ENTER("ndb_index_stat_check_or_create_sysevents");
+
+ NdbIndexStat *is= pr.is_util;
+ Ndb *ndb= pr.ndb;
+
+ if (is->check_sysevents(ndb) == 0)
+ {
+ DBUG_PRINT("index_stat", ("using existing index stats events"));
+ DBUG_RETURN(0);
+ }
+
+ if (is->create_sysevents(ndb) == 0)
+ {
+ DBUG_PRINT("index_stat", ("created index stats events"));
+ DBUG_RETURN(0);
+ }
+
+ if (is->getNdbError().code == 746)
+ {
+ // race between mysqlds, maybe
+ DBUG_PRINT("index_stat", ("create index stats events failed: error %d line %d",
+ is->getNdbError().code, is->getNdbError().line));
+ DBUG_RETURN(-1);
+ }
+
+ sql_print_warning("create index stats events failed: error %d line %d",
+ is->getNdbError().code, is->getNdbError().line);
+ DBUG_RETURN(-1);
+}
+
+int
+ndb_index_stat_start_listener(Ndb_index_stat_proc &pr)
+{
+ DBUG_ENTER("ndb_index_stat_start_listener");
+
+ NdbIndexStat *is= pr.is_util;
+ Ndb *ndb= pr.ndb;
+
+ if (is->create_listener(ndb) == -1)
+ {
+ sql_print_warning("create index stats listener failed: error %d line %d",
+ is->getNdbError().code, is->getNdbError().line);
+ DBUG_RETURN(-1);
+ }
+
+ if (is->execute_listener(ndb) == -1)
+ {
+ sql_print_warning("execute index stats listener failed: error %d line %d",
+ is->getNdbError().code, is->getNdbError().line);
+ DBUG_RETURN(-1);
+ }
+
+ DBUG_RETURN(0);
+}
+
+int
+ndb_index_stat_stop_listener(Ndb_index_stat_proc &pr)
+{
+ DBUG_ENTER("ndb_index_stat_stop_listener");
+
+ NdbIndexStat *is= pr.is_util;
+ Ndb *ndb= pr.ndb;
+
+ if (is->drop_listener(ndb) == -1)
+ {
+ sql_print_warning("drop index stats listener failed: error %d line %d",
+ is->getNdbError().code, is->getNdbError().line);
+ DBUG_RETURN(-1);
+ }
+
+ DBUG_RETURN(0);
+}
+
pthread_handler_t
ndb_index_stat_thread_func(void *arg __attribute__((unused)))
{
@@ -1552,6 +1736,10 @@ ndb_index_stat_thread_func(void *arg __a
my_thread_init();
DBUG_ENTER("ndb_index_stat_thread_func");
+ Ndb_index_stat_proc pr;
+ NdbIndexStat is_util;
+ pr.is_util= &is_util;
+
// wl4124_todo remove useless stuff copied from utility thread
pthread_mutex_lock(&LOCK_ndb_index_stat_thread);
@@ -1638,10 +1826,13 @@ ndb_index_stat_thread_func(void *arg __a
pthread_mutex_lock(&LOCK_ndb_index_stat_thread);
goto ndb_index_stat_thread_end;
}
+ pr.ndb= thd_ndb->ndb;
ndb_index_stat_allow(1);
bool enable_ok;
enable_ok= false;
+ bool have_listener;
+ have_listener= false;
set_timespec(abstime, 0);
for (;;)
@@ -1662,9 +1853,6 @@ ndb_index_stat_thread_func(void *arg __a
/* const bool enable_ok_new= THDVAR(NULL, index_stat_enable); */
const bool enable_ok_new= ndb_index_stat_get_enable(NULL);
- Ndb_index_stat_proc pr;
- pr.ndb= thd_ndb->ndb;
-
do
{
if (enable_ok != enable_ok_new)
@@ -1674,13 +1862,24 @@ ndb_index_stat_thread_func(void *arg __a
if (enable_ok_new)
{
- // at enable check or create stats tables
- NdbIndexStat is;
- if (ndb_index_stat_check_or_create_systables(&is, thd_ndb->ndb) == -1)
+ // at enable check or create stats tables and events
+ if (ndb_index_stat_check_or_create_systables(pr) == -1 ||
+ ndb_index_stat_check_or_create_sysevents(pr) == -1 ||
+ ndb_index_stat_start_listener(pr) == -1)
{
// try again in next loop
break;
}
+ have_listener= true;
+ }
+ else
+ {
+ // not a normal use-case
+ if (have_listener)
+ {
+ if (ndb_index_stat_stop_listener(pr) == 0)
+ have_listener= false;
+ }
}
enable_ok= enable_ok_new;
}
@@ -1711,6 +1910,11 @@ ndb_index_stat_thread_end:
net_end(&thd->net);
ndb_index_stat_thread_fail:
+ if (have_listener)
+ {
+ if (ndb_index_stat_stop_listener(pr) == 0)
+ have_listener= false;
+ }
if (thd_ndb)
{
ha_ndbcluster::release_thd_ndb(thd_ndb);
=== modified file 'sql/ha_ndb_index_stat.h'
--- a/sql/ha_ndb_index_stat.h 2011-07-23 14:35:37 +0000
+++ b/sql/ha_ndb_index_stat.h 2011-08-11 17:11:30 +0000
@@ -19,6 +19,8 @@
extern struct st_ndb_status g_ndb_status;
+extern pthread_mutex_t ndbcluster_mutex;
+
extern pthread_t ndb_index_stat_thread;
extern pthread_cond_t COND_ndb_index_stat_thread;
extern pthread_mutex_t LOCK_ndb_index_stat_thread;
=== modified file 'storage/ndb/include/ndb_constants.h'
--- a/storage/ndb/include/ndb_constants.h 2011-07-04 13:37:56 +0000
+++ b/storage/ndb/include/ndb_constants.h 2011-08-11 17:11:30 +0000
@@ -120,6 +120,7 @@
#define NDB_INDEX_STAT_HEAD_TABLE "ndb_index_stat_head"
#define NDB_INDEX_STAT_SAMPLE_TABLE "ndb_index_stat_sample"
#define NDB_INDEX_STAT_SAMPLE_INDEX1 "ndb_index_stat_sample_x1"
+#define NDB_INDEX_STAT_HEAD_EVENT "ndb_index_stat_head_event"
#define NDB_INDEX_STAT_PREFIX "ndb_index_stat"
=== modified file 'storage/ndb/include/ndbapi/NdbIndexStat.hpp'
--- a/storage/ndb/include/ndbapi/NdbIndexStat.hpp 2011-07-31 17:40:01 +0000
+++ b/storage/ndb/include/ndbapi/NdbIndexStat.hpp 2011-08-11 17:11:30 +0000
@@ -100,7 +100,10 @@ public:
InvalidCache = 4718,
InternalError = 4719,
BadSysTables = 4720, // sys tables partly missing or invalid
- HaveSysTables = 4244 // create error if all sys tables exist
+ HaveSysTables = 4244, // create error if all sys tables exist
+ NoSysEvents = 4710,
+ BadSysEvents = BadSysTables,
+ HaveSysEvents = 746
};
/*
@@ -191,6 +194,7 @@ public:
*/
struct Head {
Int32 m_found; // -1 no read done, 0 = no record, 1 = exists
+ Int32 m_eventType; // if polling, NdbDictionary::Event::TE_INSERT etc
Uint32 m_indexId;
Uint32 m_indexVersion;
Uint32 m_tableId;
@@ -327,6 +331,42 @@ public:
static void get_rule(const Stat& stat, char* buffer);
/*
+ * Events (there is 1) for polling. These are dictionary objects.
+ * Correct sys tables must exist. Drop ignores non-existing events.
+ */
+ int create_sysevents(Ndb* ndb);
+ int drop_sysevents(Ndb* ndb);
+ int check_sysevents(Ndb* ndb);
+
+ /*
+ * Create listener for stats updates. Only 1 is allowed.
+ */
+ int create_listener(Ndb* ndb);
+
+ /*
+ * Start listening for events (call NdbEventOperation::execute).
+ */
+ int execute_listener(Ndb* ndb);
+
+ /*
+ * Poll the listener (call Ndb::pollEvents). Returns 1 if there are
+ * events available and 0 otherwise, or -1 on failure as usual.
+ */
+ int poll_listener(Ndb* ndb, int max_wait_ms);
+
+ /*
+ * Get next available event. Returns 1 if a new event was returned
+ * and 0 otherwise, or -1 on failure as usual. Use get_heed() to
+ * retrieve event type and data.
+ */
+ int next_listener(Ndb* ndb);
+
+ /*
+ * Drop the listener.
+ */
+ int drop_listener(Ndb* ndb);
+
+ /*
* Memory allocator for stats cache data (key and value byte arrays).
* Implementation default uses malloc/free. The memory in use is the
* sum of CacheInfo::m_totalBytes from all cache types.
=== modified file 'storage/ndb/src/ndbapi/NdbIndexStat.cpp'
--- a/storage/ndb/src/ndbapi/NdbIndexStat.cpp 2011-07-04 13:37:56 +0000
+++ b/storage/ndb/src/ndbapi/NdbIndexStat.cpp 2011-08-11 17:11:30 +0000
@@ -656,6 +656,82 @@ NdbIndexStat::get_rule(const Stat& stat_
DBUG_VOID_RETURN;
}
+// events and polling
+
+int
+NdbIndexStat::create_sysevents(Ndb* ndb)
+{
+ DBUG_ENTER("NdbIndexStat::create_sysevents");
+ if (m_impl.create_sysevents(ndb) == -1)
+ DBUG_RETURN(-1);
+ DBUG_RETURN(0);
+}
+
+int
+NdbIndexStat::drop_sysevents(Ndb* ndb)
+{
+ DBUG_ENTER("NdbIndexStat::drop_sysevents");
+ if (m_impl.drop_sysevents(ndb) == -1)
+ DBUG_RETURN(-1);
+ DBUG_RETURN(0);
+}
+
+int
+NdbIndexStat::check_sysevents(Ndb* ndb)
+{
+ DBUG_ENTER("NdbIndexStat::check_sysevents");
+ if (m_impl.check_sysevents(ndb) == -1)
+ DBUG_RETURN(-1);
+ DBUG_RETURN(0);
+}
+
+int
+NdbIndexStat::create_listener(Ndb* ndb)
+{
+ DBUG_ENTER("NdbIndexStat::create_listener");
+ if (m_impl.create_listener(ndb) == -1)
+ DBUG_RETURN(-1);
+ DBUG_RETURN(0);
+}
+
+int
+NdbIndexStat::execute_listener(Ndb* ndb)
+{
+ DBUG_ENTER("NdbIndexStat::execute_listener");
+ if (m_impl.execute_listener(ndb) == -1)
+ DBUG_RETURN(-1);
+ DBUG_RETURN(0);
+}
+
+int
+NdbIndexStat::poll_listener(Ndb* ndb, int max_wait_ms)
+{
+ DBUG_ENTER("NdbIndexStat::poll_listener");
+ int ret = m_impl.poll_listener(ndb, max_wait_ms);
+ if (ret == -1)
+ DBUG_RETURN(-1);
+ DBUG_RETURN(ret);
+}
+
+int
+NdbIndexStat::next_listener(Ndb* ndb)
+{
+ DBUG_ENTER("NdbIndexStat::next_listener");
+ int ret = m_impl.next_listener(ndb);
+ if (ret == -1)
+ DBUG_RETURN(-1);
+ DBUG_RETURN(ret);
+}
+
+int
+NdbIndexStat::drop_listener(Ndb* ndb)
+{
+ DBUG_ENTER("NdbIndexStat::drop_listener");
+ if (m_impl.drop_listener(ndb) == -1)
+ DBUG_RETURN(-1);
+ DBUG_RETURN(0);
+}
+
// mem
NdbIndexStat::Mem::Mem()
=== modified file 'storage/ndb/src/ndbapi/NdbIndexStatImpl.cpp'
--- a/storage/ndb/src/ndbapi/NdbIndexStatImpl.cpp 2011-08-09 15:37:45 +0000
+++ b/storage/ndb/src/ndbapi/NdbIndexStatImpl.cpp 2011-08-11 17:11:30 +0000
@@ -22,6 +22,7 @@
#include <Bitmask.hpp>
#include <NdbSqlUtil.hpp>
#include <NdbRecord.hpp>
+#include <NdbEventOperation.hpp>
#include "NdbIndexStatImpl.hpp"
#undef min
@@ -33,8 +34,8 @@ static const char* const g_headtable_nam
static const char* const g_sampletable_name = NDB_INDEX_STAT_SAMPLE_TABLE;
static const char* const g_sampleindex1_name = NDB_INDEX_STAT_SAMPLE_INDEX1;
-const int ERR_NoSuchObject[] = { 709, 723, 4243, 0 };
-const int ERR_TupleNotFound[] = { 626, 0 };
+static const int ERR_NoSuchObject[] = { 709, 723, 4243, 0 };
+static const int ERR_TupleNotFound[] = { 626, 0 };
NdbIndexStatImpl::NdbIndexStatImpl(NdbIndexStat& facade) :
NdbIndexStat(*this),
@@ -45,6 +46,7 @@ NdbIndexStatImpl::NdbIndexStatImpl(NdbIn
init();
m_query_mutex = NdbMutex_Create();
assert(m_query_mutex != 0);
+ m_eventOp = 0;
m_mem_handler = &c_mem_default_handler;
}
@@ -544,10 +546,8 @@ NdbIndexStatImpl::drop_systables(Ndb* nd
}
int
-NdbIndexStatImpl::check_systables(Ndb* ndb)
+NdbIndexStatImpl::check_systables(Sys& sys)
{
- Sys sys(this, ndb);
-
if (get_systables(sys) == -1)
return -1;
@@ -566,6 +566,17 @@ NdbIndexStatImpl::check_systables(Ndb* n
return 0;
}
+int
+NdbIndexStatImpl::check_systables(Ndb* ndb)
+{
+ Sys sys(this, ndb);
+
+ if (check_systables(sys) == -1)
+ return -1;
+
+ return 0;
+}
+
// operation context
NdbIndexStatImpl::Con::Con(NdbIndexStatImpl* impl, Head& head, Ndb* ndb) :
@@ -781,6 +792,7 @@ void
NdbIndexStatImpl::init_head(Head& head)
{
head.m_found = -1;
+ head.m_eventType = -1;
head.m_indexId = 0;
head.m_indexVersion = 0;
head.m_tableId = 0;
@@ -2259,6 +2271,183 @@ NdbIndexStatImpl::query_keycmp(const Cac
return res;
}
+// events and polling
+
+int
+NdbIndexStatImpl::create_sysevents(Ndb* ndb)
+{
+ Sys sys(this, ndb);
+ NdbDictionary::Dictionary* const dic = ndb->getDictionary();
+
+ if (check_systables(sys) == -1)
+ return -1;
+ const NdbDictionary::Table* tab = sys.m_headtable;
+ require(tab != 0);
+
+ const char* const evname = NDB_INDEX_STAT_HEAD_EVENT;
+ NdbDictionary::Event ev(evname, *tab);
+ ev.addTableEvent(NdbDictionary::Event::TE_INSERT);
+ ev.addTableEvent(NdbDictionary::Event::TE_DELETE);
+ ev.addTableEvent(NdbDictionary::Event::TE_UPDATE);
+ for (int i = 0; i < tab->getNoOfColumns(); i++)
+ ev.addEventColumn(i);
+ ev.setReport(NdbDictionary::Event::ER_UPDATED);
+
+ if (dic->createEvent(ev) == -1)
+ {
+ setError(dic->getNdbError().code, __LINE__);
+ return -1;
+ }
+ return 0;
+}
+
+int
+NdbIndexStatImpl::drop_sysevents(Ndb* ndb)
+{
+ Sys sys(this, ndb);
+ NdbDictionary::Dictionary* const dic = ndb->getDictionary();
+
+ if (check_systables(sys) == -1)
+ return -1;
+
+ const char* const evname = NDB_INDEX_STAT_HEAD_EVENT;
+ if (dic->dropEvent(evname) == -1)
+ {
+ int code = dic->getNdbError().code;
+ if (code != 4710)
+ {
+ setError(dic->getNdbError().code, __LINE__);
+ return -1;
+ }
+ }
+ return 0;
+}
+
+int
+NdbIndexStatImpl::check_sysevents(Ndb* ndb)
+{
+ Sys sys(this, ndb);
+ NdbDictionary::Dictionary* const dic = ndb->getDictionary();
+
+ if (check_systables(sys) == -1)
+ return -1;
+
+ const char* const evname = NDB_INDEX_STAT_HEAD_EVENT;
+ const NdbDictionary::Event* ev = dic->getEvent(evname);
+ if (ev == 0)
+ {
+ setError(dic->getNdbError().code, __LINE__);
+ return -1;
+ }
+ return 0;
+}
+
+int
+NdbIndexStatImpl::create_listener(Ndb* ndb)
+{
+ if (m_eventOp != 0)
+ {
+ setError(UsageError, __LINE__);
+ return -1;
+ }
+ const char* const evname = NDB_INDEX_STAT_HEAD_EVENT;
+ m_eventOp = ndb->createEventOperation(evname);
+ if (m_eventOp == 0)
+ {
+ setError(ndb->getNdbError().code, __LINE__);
+ return -1;
+ }
+
+ // all columns are non-nullable
+ Head& head = m_facadeHead;
+ if (m_eventOp->getValue("index_id", (char*)&head.m_indexId) == 0 ||
+ m_eventOp->getValue("index_version", (char*)&head.m_indexVersion) == 0 ||
+ m_eventOp->getValue("table_id", (char*)&head.m_tableId) == 0 ||
+ m_eventOp->getValue("frag_count", (char*)&head.m_fragCount) == 0 ||
+ m_eventOp->getValue("value_format", (char*)&head.m_valueFormat) == 0 ||
+ m_eventOp->getValue("sample_version", (char*)&head.m_sampleVersion) == 0 ||
+ m_eventOp->getValue("load_time", (char*)&head.m_loadTime) == 0 ||
+ m_eventOp->getValue("sample_count", (char*)&head.m_sampleCount) == 0 ||
+ m_eventOp->getValue("key_bytes", (char*)&head.m_keyBytes) == 0)
+ {
+ setError(m_eventOp->getNdbError().code, __LINE__);
+ return -1;
+ }
+ // wl4124_todo why this
+ static Head xxx;
+ if (m_eventOp->getPreValue("index_id", (char*)&xxx.m_indexId) == 0 ||
+ m_eventOp->getPreValue("index_version", (char*)&xxx.m_indexVersion) == 0 ||
+ m_eventOp->getPreValue("table_id", (char*)&xxx.m_tableId) == 0 ||
+ m_eventOp->getPreValue("frag_count", (char*)&xxx.m_fragCount) == 0 ||
+ m_eventOp->getPreValue("value_format", (char*)&xxx.m_valueFormat) == 0 ||
+ m_eventOp->getPreValue("sample_version", (char*)&xxx.m_sampleVersion) == 0 ||
+ m_eventOp->getPreValue("load_time", (char*)&xxx.m_loadTime) == 0 ||
+ m_eventOp->getPreValue("sample_count", (char*)&xxx.m_sampleCount) == 0 ||
+ m_eventOp->getPreValue("key_bytes", (char*)&xxx.m_keyBytes) == 0)
+ {
+ setError(m_eventOp->getNdbError().code, __LINE__);
+ return -1;
+ }
+ return 0;
+}
+
+int
+NdbIndexStatImpl::execute_listener(Ndb* ndb)
+{
+ if (m_eventOp == 0)
+ {
+ setError(UsageError, __LINE__);
+ return -1;
+ }
+ if (m_eventOp->execute() == -1)
+ {
+ setError(m_eventOp->getNdbError().code, __LINE__);
+ return -1;
+ }
+ return 0;
+}
+
+int
+NdbIndexStatImpl::poll_listener(Ndb* ndb, int max_wait_ms)
+{
+ int ret;
+ if ((ret = ndb->pollEvents(max_wait_ms)) < 0)
+ {
+ setError(ndb->getNdbError().code, __LINE__);
+ return -1;
+ }
+ return (ret == 0 ? 0 : 1);
+}
+
+int
+NdbIndexStatImpl::next_listener(Ndb* ndb)
+{
+ NdbEventOperation* op = ndb->nextEvent();
+ if (op == 0)
+ return 0;
+
+ Head& head = m_facadeHead;
+ head.m_eventType = (int)op->getEventType();
+ return 1;
+}
+
+int
+NdbIndexStatImpl::drop_listener(Ndb* ndb)
+{
+ if (m_eventOp == 0)
+ {
+ setError(UsageError, __LINE__);
+ return -1;
+ }
+ if (ndb->dropEventOperation(m_eventOp) != 0)
+ {
+ setError(ndb->getNdbError().code, __LINE__);
+ return -1;
+ }
+ m_eventOp = 0;
+ return 0;
+}
+
// mem alloc - default impl
NdbIndexStatImpl::MemDefault::MemDefault()
=== modified file 'storage/ndb/src/ndbapi/NdbIndexStatImpl.hpp'
--- a/storage/ndb/src/ndbapi/NdbIndexStatImpl.hpp 2011-07-31 17:40:01 +0000
+++ b/storage/ndb/src/ndbapi/NdbIndexStatImpl.hpp 2011-08-11 17:11:30 +0000
@@ -29,6 +29,7 @@ class NdbTransaction;
class NdbIndexScanOperation;
class NdbRecAttr;
class NdbOperation;
+class NdbEventOperation;
extern const uint g_ndb_index_stat_head_frm_len;
extern const uint8 g_ndb_index_stat_head_frm_data[];
@@ -71,6 +72,7 @@ public:
Cache* m_cacheClean;
// mutex for query cache switch, memory barrier would do
NdbMutex* m_query_mutex;
+ NdbEventOperation* m_eventOp;
Mem* m_mem_handler;
NdbIndexStat::Error m_error;
@@ -98,6 +100,7 @@ public:
int get_systables(Sys& sys);
int create_systables(Ndb* ndb);
int drop_systables(Ndb* ndb);
+ int check_systables(Sys& sys);
int check_systables(Ndb* ndb);
// operation context
@@ -279,6 +282,17 @@ public:
void query_search(const Cache&, const Bound&, StatBound&);
int query_keycmp(const Cache&, const Bound&, uint pos, Uint32& numEq);
+ // events and polling
+ int create_sysevents(Ndb* ndb);
+ int drop_sysevents(Ndb* ndb);
+ int check_sysevents(Ndb* ndb);
+ //
+ int create_listener(Ndb* ndb);
+ int execute_listener(Ndb* ndb);
+ int poll_listener(Ndb* ndb, int max_wait_ms);
+ int next_listener(Ndb* ndb);
+ int drop_listener(Ndb* ndb);
+
// default memory allocator
struct MemDefault : public Mem {
virtual void* mem_alloc(UintPtr bytes);
=== modified file 'storage/ndb/test/ndbapi/testIndexStat.cpp'
--- a/storage/ndb/test/ndbapi/testIndexStat.cpp 2011-08-09 15:36:25 +0000
+++ b/storage/ndb/test/ndbapi/testIndexStat.cpp 2011-08-11 17:11:30 +0000
@@ -134,6 +134,7 @@ static NdbIndexScanOperation* g_rangesca
static NdbIndexStat* g_is = 0;
static bool g_has_created_stat_tables = false;
+static bool g_has_created_stat_events = false;
static uint
urandom()
@@ -213,7 +214,7 @@ errdb()
ll0(++any << " rangescan_op: error " << e);
}
if (g_is != 0) {
- const NdbError& e = g_is->getNdbError();
+ const NdbIndexStat::Error& e = g_is->getNdbError();
if (e.code != 0)
ll0(++any << " stat: error " << e);
}
@@ -1518,6 +1519,40 @@ readstat()
return 0;
}
+// test polling after updatestat
+
+static int
+startlistener()
+{
+ ll1("startlistener");
+ chkdb(g_is->create_listener(g_ndb_sys) == 0);
+ chkdb(g_is->execute_listener(g_ndb_sys) == 0);
+ return 0;
+}
+
+static int
+runlistener()
+{
+ ll1("runlistener");
+ int ret;
+ chkdb((ret = g_is->poll_listener(g_ndb_sys, 10000)) != -1);
+ chkrc(ret == 1);
+ // one event is expected
+ chkdb((ret = g_is->next_listener(g_ndb_sys)) != -1);
+ chkrc(ret == 1);
+ chkdb((ret = g_is->next_listener(g_ndb_sys)) != -1);
+ chkrc(ret == 0);
+ return 0;
+}
+
+static int
+stoplistener()
+{
+ ll1("stoplistener");
+ chkdb(g_is->drop_listener(g_ndb_sys) != -1);
+ return 0;
+}
+
// stats queries
// exact stats from scan results
@@ -2081,6 +2116,7 @@ runtest()
chkrc(createindex() == 0);
chkrc(createNdbRecords() == 0);
chkrc(definestat() == 0);
+ chkrc(startlistener() == 0);
for (g_loop = 0; g_opts.loops == 0 || g_loop < g_opts.loops; g_loop++) {
ll0("=== loop " << g_loop << " ===");
@@ -2094,6 +2130,7 @@ runtest()
makeranges();
chkrc(scanranges() == 0);
chkrc(updatestat() == 0);
+ chkrc(runlistener() == 0);
chkrc(readstat() == 0);
chkrc(queryranges() == 0);
loopstats();
@@ -2101,6 +2138,7 @@ runtest()
}
finalstats();
+ chkrc(stoplistener() == 0);
if (!g_opts.keeptable)
chkrc(droptable() == 0);
freeranges();
@@ -2234,22 +2272,66 @@ docreate_stat_tables()
{
if (g_is->check_systables(g_ndb_sys) == 0)
return 0;
+ ll1("check_systables: " << g_is->getNdbError());
- if (g_is->create_systables(g_ndb_sys) == 0)
- {
- g_has_created_stat_tables = true;
- return 0;
- }
- return -1;
+ ll0("create stat tables");
+ chkdb(g_is->create_systables(g_ndb_sys) == 0);
+ g_has_created_stat_tables = true;
+ return 0;
}
static
-void
+int
dodrop_stat_tables()
{
if (g_has_created_stat_tables == false)
- return;
- g_is->drop_systables(g_ndb_sys);
+ return 0;
+
+ ll0("drop stat tables");
+ chkdb(g_is->drop_systables(g_ndb_sys) == 0);
+ return 0;
+}
+
+static int
+docreate_stat_events()
+{
+ if (g_is->check_sysevents(g_ndb_sys) == 0)
+ return 0;
+ ll1("check_sysevents: " << g_is->getNdbError());
+
+ ll0("create stat events");
+ chkdb(g_is->create_sysevents(g_ndb_sys) == 0);
+ g_has_created_stat_events = true;
+ return 0;
+}
+
+static int
+dodrop_stat_events()
+{
+ if (g_has_created_stat_events == false)
+ return 0;
+
+ ll0("drop stat events");
+ chkdb(g_is->drop_sysevents(g_ndb_sys) == 0);
+ return 0;
+}
+
+static int
+docreate_sys_objects()
+{
+ require(g_is != 0 && g_ndb_sys != 0);
+ chkrc(docreate_stat_tables() == 0);
+ chkrc(docreate_stat_events() == 0);
+ return 0;
+}
+
+static int
+dodrop_sys_objects()
+{
+ require(g_is != 0 && g_ndb_sys != 0);
+ chkrc(dodrop_stat_events() == 0);
+ chkrc(dodrop_stat_tables() == 0);
+ return 0;
}
int
@@ -2277,17 +2359,22 @@ main(int argc, char** argv)
ll0("connect failed");
return NDBT_ProgramExit(NDBT_FAILED);
}
- if (docreate_stat_tables() == -1){
- ll0("failed to create stat tables");
- return NDBT_ProgramExit(NDBT_FAILED);
+ if (docreate_sys_objects() == -1) {
+ ll0("failed to check or create stat tables and events");
+ goto failed;
}
if (runtest() == -1) {
ll0("test failed");
- dodrop_stat_tables();
- dodisconnect();
- return NDBT_ProgramExit(NDBT_FAILED);
+ goto failed;
+ }
+ if (dodrop_sys_objects() == -1) {
+ ll0("failed to drop created stat tables or events");
+ goto failed;
}
- dodrop_stat_tables();
dodisconnect();
return NDBT_ProgramExit(NDBT_OK);
+failed:
+ (void)dodrop_sys_objects();
+ dodisconnect();
+ return NDBT_ProgramExit(NDBT_FAILED);
}
=== modified file 'storage/ndb/tools/ndb_index_stat.cpp'
--- a/storage/ndb/tools/ndb_index_stat.cpp 2011-07-31 17:40:01 +0000
+++ b/storage/ndb/tools/ndb_index_stat.cpp 2011-08-11 17:11:30 +0000
@@ -35,6 +35,8 @@ static my_bool _sys_create = false;
static my_bool _sys_create_if_not_exist = false;
static my_bool _sys_create_if_not_valid = false;
static my_bool _sys_check = false;
+static my_bool _sys_skip_tables = false;
+static my_bool _sys_skip_events = false;
static int _sys_any = 0;
// other
static my_bool _verbose = false;
@@ -382,69 +384,146 @@ dosys()
{
if (_sys_drop)
{
- g_info << "dropping any sys tables" << endl;
- CHK2(g_is->drop_systables(g_ndb_sys) == 0, g_is->getNdbError());
- CHK2(g_is->check_systables(g_ndb_sys) == -1, "unexpected success");
- CHK2(g_is->getNdbError().code == NdbIndexStat::NoSysTables,
- "unexpected error: " << g_is->getNdbError());
+ if (!_sys_skip_events)
+ {
+ g_info << "dropping sys events" << endl;
+ CHK2(g_is->drop_sysevents(g_ndb_sys) == 0, g_is->getNdbError());
+ CHK2(g_is->check_sysevents(g_ndb_sys) == -1, "unexpected success");
+ CHK2(g_is->getNdbError().code == NdbIndexStat::NoSysEvents,
+ "unexpected error: " << g_is->getNdbError());
+ }
+
+ if (!_sys_skip_tables)
+ {
+ g_info << "dropping all sys tables" << endl;
+ CHK2(g_is->drop_systables(g_ndb_sys) == 0, g_is->getNdbError());
+ CHK2(g_is->check_systables(g_ndb_sys) == -1, "unexpected success");
+ CHK2(g_is->getNdbError().code == NdbIndexStat::NoSysTables,
+ "unexpected error: " << g_is->getNdbError());
+ }
g_info << "drop done" << endl;
}
if (_sys_create)
{
- g_info << "creating all sys tables" << endl;
- CHK2(g_is->create_systables(g_ndb_sys) == 0, g_is->getNdbError());
- CHK2(g_is->check_systables(g_ndb_sys) == 0, g_is->getNdbError());
- g_info << "create done" << endl;
- }
-
- if (_sys_create_if_not_exist)
- {
- if (g_is->check_systables(g_ndb_sys) == -1)
+ if (!_sys_skip_tables)
{
- CHK2(g_is->getNdbError().code == NdbIndexStat::NoSysTables,
- g_is->getNdbError());
g_info << "creating all sys tables" << endl;
CHK2(g_is->create_systables(g_ndb_sys) == 0, g_is->getNdbError());
CHK2(g_is->check_systables(g_ndb_sys) == 0, g_is->getNdbError());
+ }
+
+ if (!_sys_skip_events)
+ {
+ g_info << "creating sys events" << endl;
+ CHK2(g_is->create_sysevents(g_ndb_sys) == 0, g_is->getNdbError());
+ CHK2(g_is->check_sysevents(g_ndb_sys) == 0, g_is->getNdbError());
g_info << "create done" << endl;
}
- else
+ }
+
+ if (_sys_create_if_not_exist)
+ {
+ if (!_sys_skip_tables)
{
- g_info << "using existing sys tables" << endl;
+ if (g_is->check_systables(g_ndb_sys) == -1)
+ {
+ CHK2(g_is->getNdbError().code == NdbIndexStat::NoSysTables,
+ g_is->getNdbError());
+ g_info << "creating all sys tables" << endl;
+ CHK2(g_is->create_systables(g_ndb_sys) == 0, g_is->getNdbError());
+ CHK2(g_is->check_systables(g_ndb_sys) == 0, g_is->getNdbError());
+ g_info << "create done" << endl;
+ }
+ else
+ {
+ g_info << "using existing sys tables" << endl;
+ }
+ }
+
+ if (!_sys_skip_events)
+ {
+ if (g_is->check_sysevents(g_ndb_sys) == -1)
+ {
+ CHK2(g_is->getNdbError().code == NdbIndexStat::NoSysEvents,
+ g_is->getNdbError());
+ g_info << "creating sys events" << endl;
+ CHK2(g_is->create_sysevents(g_ndb_sys) == 0, g_is->getNdbError());
+ g_info << "create done" << endl;
+ }
+ else
+ {
+ g_info << "using existing sys events" << endl;
+ }
}
}
if (_sys_create_if_not_valid)
{
- if (g_is->check_systables(g_ndb_sys) == -1)
+ if (!_sys_skip_tables)
{
- if (g_is->getNdbError().code != NdbIndexStat::NoSysTables)
+ if (g_is->check_systables(g_ndb_sys) == -1)
{
- CHK2(g_is->getNdbError().code == NdbIndexStat::BadSysTables,
- g_is->getNdbError());
- g_info << "dropping invalid sys tables" << endl;
- CHK2(g_is->drop_systables(g_ndb_sys) == 0, g_is->getNdbError());
- CHK2(g_is->check_systables(g_ndb_sys) == -1, "unexpected success");
- CHK2(g_is->getNdbError().code == NdbIndexStat::NoSysTables,
- "unexpected error: " << g_is->getNdbError());
- g_info << "drop done" << endl;
+ if (g_is->getNdbError().code != NdbIndexStat::NoSysTables)
+ {
+ CHK2(g_is->getNdbError().code == NdbIndexStat::BadSysTables,
+ g_is->getNdbError());
+ g_info << "dropping invalid sys tables" << endl;
+ CHK2(g_is->drop_systables(g_ndb_sys) == 0, g_is->getNdbError());
+ CHK2(g_is->check_systables(g_ndb_sys) == -1, "unexpected success");
+ CHK2(g_is->getNdbError().code == NdbIndexStat::NoSysTables,
+ "unexpected error: " << g_is->getNdbError());
+ g_info << "drop done" << endl;
+ }
+ g_info << "creating all sys tables" << endl;
+ CHK2(g_is->create_systables(g_ndb_sys) == 0, g_is->getNdbError());
+ CHK2(g_is->check_systables(g_ndb_sys) == 0, g_is->getNdbError());
+ g_info << "create done" << endl;
+ }
+ else
+ {
+ g_info << "using existing sys tables" << endl;
}
- g_info << "creating all sys tables" << endl;
- CHK2(g_is->create_systables(g_ndb_sys) == 0, g_is->getNdbError());
- CHK2(g_is->check_systables(g_ndb_sys) == 0, g_is->getNdbError());
- g_info << "create done" << endl;
}
- else
+ if (!_sys_skip_events)
{
- g_info << "using existing sys tables" << endl;
+ if (g_is->check_sysevents(g_ndb_sys) == -1)
+ {
+ if (g_is->getNdbError().code != NdbIndexStat::NoSysEvents)
+ {
+ CHK2(g_is->getNdbError().code == NdbIndexStat::BadSysEvents,
+ g_is->getNdbError());
+ g_info << "dropping invalid sys events" << endl;
+ CHK2(g_is->drop_sysevents(g_ndb_sys) == 0, g_is->getNdbError());
+ CHK2(g_is->check_sysevents(g_ndb_sys) == -1, "unexpected success");
+ CHK2(g_is->getNdbError().code == NdbIndexStat::NoSysEvents,
+ "unexpected error: " << g_is->getNdbError());
+ g_info << "drop done" << endl;
+ }
+ g_info << "creating sys events" << endl;
+ CHK2(g_is->create_sysevents(g_ndb_sys) == 0, g_is->getNdbError());
+ CHK2(g_is->check_sysevents(g_ndb_sys) == 0, g_is->getNdbError());
+ g_info << "create done" << endl;
+ }
+ else
+ {
+ g_info << "using existing sys events" << endl;
+ }
}
}
if (_sys_check)
{
- CHK2(g_is->check_systables(g_ndb_sys) == 0, g_is->getNdbError());
- g_info << "sys tables ok" << endl;
+ if (!_sys_skip_tables)
+ {
+ CHK2(g_is->check_systables(g_ndb_sys) == 0, g_is->getNdbError());
+ g_info << "sys tables ok" << endl;
+ }
+ if (!_sys_skip_events)
+ {
+ CHK2(g_is->check_sysevents(g_ndb_sys) == 0, g_is->getNdbError());
+ g_info << "sys events ok" << endl;
+ }
}
}
while (0);
@@ -514,25 +593,33 @@ my_long_options[] =
GET_INT, REQUIRED_ARG, 0, 0, 0, 0, 0, 0 },
// sys options
{ "sys-drop", ++oi,
- "Drop any stats tables in NDB kernel (all stats is lost)",
+ "Drop any stats tables and events in NDB kernel (all stats is lost)",
(uchar **)&_sys_drop, (uchar **)&_sys_drop, 0,
GET_BOOL, NO_ARG, 0, 0, 0, 0, 0, 0 },
{ "sys-create", ++oi,
- "Create stats tables in NDB kernel (must not exist)",
+ "Create stats tables and events in NDB kernel (must not exist)",
(uchar **)&_sys_create, (uchar **)&_sys_create, 0,
GET_BOOL, NO_ARG, 0, 0, 0, 0, 0, 0 },
{ "sys-create-if-not-exist", ++oi,
- "Like --sys-create but do nothing if correct stats tables exist",
+ "Like --sys-create but do nothing if correct objects exist",
(uchar **)&_sys_create_if_not_exist, (uchar **)&_sys_create_if_not_exist, 0,
GET_BOOL, NO_ARG, 0, 0, 0, 0, 0, 0 },
{ "sys-create-if-not-valid", ++oi,
- "Like --sys-create-if-not-exist but first drop any invalid tables",
+ "Like --sys-create-if-not-exist but first drop any invalid objects",
(uchar **)&_sys_create_if_not_valid, (uchar **)&_sys_create_if_not_valid, 0,
GET_BOOL, NO_ARG, 0, 0, 0, 0, 0, 0 },
{ "sys-check", ++oi,
- "Check that correct stats tables exist in NDB kernel",
+ "Check that correct stats tables and events exist in NDB kernel",
(uchar **)&_sys_check, (uchar **)&_sys_check, 0,
GET_BOOL, NO_ARG, 0, 0, 0, 0, 0, 0 },
+ { "sys-skip-tables", ++oi,
+ "Do not apply sys options to tables",
+ (uchar **)&_sys_skip_tables, (uchar **)&_sys_skip_tables, 0,
+ GET_BOOL, NO_ARG, 0, 0, 0, 0, 0, 0 },
+ { "sys-skip-events", ++oi,
+ "Do not apply sys options to events",
+ (uchar **)&_sys_skip_events, (uchar **)&_sys_skip_events, 0,
+ GET_BOOL, NO_ARG, 0, 0, 0, 0, 0, 0 },
// other
{ "verbose", 'v',
"Verbose messages",
@@ -581,7 +668,9 @@ checkopts(int argc, char** argv)
(_sys_create_if_not_exist != 0) +
(_sys_create_if_not_valid != 0) +
(_sys_drop != 0) +
- ( _sys_check != 0);
+ ( _sys_check != 0) +
+ (_sys_skip_tables != 0) +
+ (_sys_skip_events != 0);
if (!_sys_any)
{
if (_dbname == 0)
No bundle (reason: useless for push emails).
| Thread |
|---|
| • bzr push into mysql-5.1-telco-7.0-wl4124-new2 branch (pekka.nousiainen:4422to 4423) WL#4124 | Pekka Nousiainen | 16 Aug |