From: Pekka Nousiainen Date: August 11 2011 5:13pm Subject: bzr push into mysql-5.1-telco-7.0-wl4124-new2 branch (pekka.nousiainen:4422 to 4423) WL#4124 List-Archive: http://lists.mysql.com/commits/140594 Message-Id: <20110811171302.9AB0055875@sama.localdomain> MIME-Version: 1.0 Content-Type: text/plain; charset="us-ascii" Content-Transfer-Encoding: 7bit 4423 Pekka Nousiainen 2011-08-11 wl#4124 f01_event.diff listen to data events on head table modified: sql/ha_ndb_index_stat.cc sql/ha_ndb_index_stat.h storage/ndb/include/ndb_constants.h storage/ndb/include/ndbapi/NdbIndexStat.hpp storage/ndb/src/ndbapi/NdbIndexStat.cpp storage/ndb/src/ndbapi/NdbIndexStatImpl.cpp storage/ndb/src/ndbapi/NdbIndexStatImpl.hpp storage/ndb/test/ndbapi/testIndexStat.cpp storage/ndb/tools/ndb_index_stat.cpp 4422 Pekka Nousiainen 2011-08-09 treename is wl4124-new2 from x17_fix and up modified: .bzr-mysql/default.conf === modified file 'sql/ha_ndb_index_stat.cc' --- a/sql/ha_ndb_index_stat.cc 2011-07-23 17:22:32 +0000 +++ b/sql/ha_ndb_index_stat.cc 2011-08-11 17:11:30 +0000 @@ -498,8 +498,8 @@ struct Ndb_index_stat_glob { uint list_count[Ndb_index_stat::LT_Count]; /* Temporary use */ uint total_count; uint force_update; - uint no_stats; uint wait_update; + uint no_stats; uint cache_query_bytes; /* In use */ uint cache_clean_bytes; /* Obsolete versions not yet removed */ Ndb_index_stat_glob() : @@ -890,6 +890,39 @@ ndb_index_stat_free(NDB_SHARE *share) pthread_mutex_unlock(&ndb_index_stat_list_mutex); } +/* Find entry across shares */ +/* wl4124_todo mutex overkill, hash table, can we find table share */ +Ndb_index_stat* +ndb_index_stat_find_entry(int index_id, int index_version, int table_id) +{ + DBUG_ENTER("ndb_index_stat_find_entry"); + pthread_mutex_lock(&ndbcluster_mutex); + pthread_mutex_lock(&ndb_index_stat_list_mutex); + DBUG_PRINT("index_stat", ("find index:%d version:%d table:%d", + index_id, index_version, table_id)); + + int lt; + for (lt=1; lt < Ndb_index_stat::LT_Count; lt++) + { + Ndb_index_stat *st=ndb_index_stat_list[lt].head; + while (st != 0) + { + if (st->index_id == index_id && + st->index_version == index_version) + { + pthread_mutex_unlock(&ndb_index_stat_list_mutex); + pthread_mutex_unlock(&ndbcluster_mutex); + DBUG_RETURN(st); + } + st= st->list_next; + } + } + + pthread_mutex_unlock(&ndb_index_stat_list_mutex); + pthread_mutex_unlock(&ndbcluster_mutex); + DBUG_RETURN(0); +} + /* Statistics thread sub-routines */ void @@ -931,12 +964,14 @@ ndb_index_stat_cache_clean(Ndb_index_sta /* Misc in/out parameters for process steps */ struct Ndb_index_stat_proc { + NdbIndexStat* is_util; // For metadata and polling Ndb *ndb; time_t now; int lt; bool busy; bool end; Ndb_index_stat_proc() : + is_util(0), ndb(0), now(0), lt(0), @@ -1383,6 +1418,75 @@ ndb_index_stat_proc_error(Ndb_index_stat pr.busy= true; } +void +ndb_index_stat_proc_event(Ndb_index_stat_proc &pr, Ndb_index_stat *st) +{ + /* + Put on Check list if idle. + We get event also for our own analyze but this should not matter. + */ + pr.lt= st->lt; + if (st->lt == Ndb_index_stat::LT_Idle || + st->lt == Ndb_index_stat::LT_Error) + pr.lt= Ndb_index_stat::LT_Check; +} + +void +ndb_index_stat_proc_event(Ndb_index_stat_proc &pr) +{ + NdbIndexStat *is= pr.is_util; + Ndb *ndb= pr.ndb; + int ret; + ret= is->poll_listener(ndb, 0); + DBUG_PRINT("index_stat", ("poll_listener ret: %d", ret)); + if (ret == -1) + { + // wl4124_todo report error + DBUG_ASSERT(false); + return; + } + if (ret == 0) + return; + + while (1) + { + ret= is->next_listener(ndb); + DBUG_PRINT("index_stat", ("next_listener ret: %d", ret)); + if (ret == -1) + { + // wl4124_todo report error + DBUG_ASSERT(false); + return; + } + if (ret == 0) + break; + + NdbIndexStat::Head head; + is->get_head(head); + DBUG_PRINT("index_stat", ("next_listener eventType: %d indexId: %u", + head.m_eventType, head.m_indexId)); + + Ndb_index_stat *st= ndb_index_stat_find_entry(head.m_indexId, + head.m_indexVersion, + head.m_tableId); + /* + Another process can update stats for an index which is not found + in this mysqld. Ignore it. + */ + if (st != 0) + { + DBUG_PRINT("index_stat", ("st %s proc %s", st->id, "Event")); + ndb_index_stat_proc_event(pr, st); + if (pr.lt != st->lt) + ndb_index_stat_list_move(st, pr.lt); + } + else + { + DBUG_PRINT("index_stat", ("entry not found in this mysqld")); + } + } +} + #ifndef DBUG_OFF void ndb_index_stat_report(const Ndb_index_stat_glob& old_glob) @@ -1466,6 +1570,7 @@ ndb_index_stat_proc(Ndb_index_stat_proc ndb_index_stat_proc_evict(pr); ndb_index_stat_proc_delete(pr); ndb_index_stat_proc_error(pr); + ndb_index_stat_proc_event(pr); #ifndef DBUG_OFF ndb_index_stat_report(old_glob); @@ -1511,11 +1616,14 @@ ndb_index_stat_end() /* Index stats thread */ -static int -ndb_index_stat_check_or_create_systables(NdbIndexStat* is, Ndb* ndb) +int +ndb_index_stat_check_or_create_systables(Ndb_index_stat_proc &pr) { DBUG_ENTER("ndb_index_stat_check_or_create_systables"); + NdbIndexStat *is= pr.is_util; + Ndb *ndb= pr.ndb; + if (is->check_systables(ndb) == 0) { DBUG_PRINT("index_stat", ("using existing index stats tables")); @@ -1542,6 +1650,82 @@ ndb_index_stat_check_or_create_systables DBUG_RETURN(-1); } +int +ndb_index_stat_check_or_create_sysevents(Ndb_index_stat_proc &pr) +{ + DBUG_ENTER("ndb_index_stat_check_or_create_sysevents"); + + NdbIndexStat *is= pr.is_util; + Ndb *ndb= pr.ndb; + + if (is->check_sysevents(ndb) == 0) + { + DBUG_PRINT("index_stat", ("using existing index stats events")); + DBUG_RETURN(0); + } + + if (is->create_sysevents(ndb) == 0) + { + DBUG_PRINT("index_stat", ("created index stats events")); + DBUG_RETURN(0); + } + + if (is->getNdbError().code == 746) + { + // race between mysqlds, maybe + DBUG_PRINT("index_stat", ("create index stats events failed: error %d line %d", + is->getNdbError().code, is->getNdbError().line)); + DBUG_RETURN(-1); + } + + sql_print_warning("create index stats events failed: error %d line %d", + is->getNdbError().code, is->getNdbError().line); + DBUG_RETURN(-1); +} + +int +ndb_index_stat_start_listener(Ndb_index_stat_proc &pr) +{ + DBUG_ENTER("ndb_index_stat_start_listener"); + + NdbIndexStat *is= pr.is_util; + Ndb *ndb= pr.ndb; + + if (is->create_listener(ndb) == -1) + { + sql_print_warning("create index stats listener failed: error %d line %d", + is->getNdbError().code, is->getNdbError().line); + DBUG_RETURN(-1); + } + + if (is->execute_listener(ndb) == -1) + { + sql_print_warning("execute index stats listener failed: error %d line %d", + is->getNdbError().code, is->getNdbError().line); + DBUG_RETURN(-1); + } + + DBUG_RETURN(0); +} + +int +ndb_index_stat_stop_listener(Ndb_index_stat_proc &pr) +{ + DBUG_ENTER("ndb_index_stat_stop_listener"); + + NdbIndexStat *is= pr.is_util; + Ndb *ndb= pr.ndb; + + if (is->drop_listener(ndb) == -1) + { + sql_print_warning("drop index stats listener failed: error %d line %d", + is->getNdbError().code, is->getNdbError().line); + DBUG_RETURN(-1); + } + + DBUG_RETURN(0); +} + pthread_handler_t ndb_index_stat_thread_func(void *arg __attribute__((unused))) { @@ -1552,6 +1736,10 @@ ndb_index_stat_thread_func(void *arg __a my_thread_init(); DBUG_ENTER("ndb_index_stat_thread_func"); + Ndb_index_stat_proc pr; + NdbIndexStat is_util; + pr.is_util= &is_util; + // wl4124_todo remove useless stuff copied from utility thread pthread_mutex_lock(&LOCK_ndb_index_stat_thread); @@ -1638,10 +1826,13 @@ ndb_index_stat_thread_func(void *arg __a pthread_mutex_lock(&LOCK_ndb_index_stat_thread); goto ndb_index_stat_thread_end; } + pr.ndb= thd_ndb->ndb; ndb_index_stat_allow(1); bool enable_ok; enable_ok= false; + bool have_listener; + have_listener= false; set_timespec(abstime, 0); for (;;) @@ -1662,9 +1853,6 @@ ndb_index_stat_thread_func(void *arg __a /* const bool enable_ok_new= THDVAR(NULL, index_stat_enable); */ const bool enable_ok_new= ndb_index_stat_get_enable(NULL); - Ndb_index_stat_proc pr; - pr.ndb= thd_ndb->ndb; - do { if (enable_ok != enable_ok_new) @@ -1674,13 +1862,24 @@ ndb_index_stat_thread_func(void *arg __a if (enable_ok_new) { - // at enable check or create stats tables - NdbIndexStat is; - if (ndb_index_stat_check_or_create_systables(&is, thd_ndb->ndb) == -1) + // at enable check or create stats tables and events + if (ndb_index_stat_check_or_create_systables(pr) == -1 || + ndb_index_stat_check_or_create_sysevents(pr) == -1 || + ndb_index_stat_start_listener(pr) == -1) { // try again in next loop break; } + have_listener= true; + } + else + { + // not a normal use-case + if (have_listener) + { + if (ndb_index_stat_stop_listener(pr) == 0) + have_listener= false; + } } enable_ok= enable_ok_new; } @@ -1711,6 +1910,11 @@ ndb_index_stat_thread_end: net_end(&thd->net); ndb_index_stat_thread_fail: + if (have_listener) + { + if (ndb_index_stat_stop_listener(pr) == 0) + have_listener= false; + } if (thd_ndb) { ha_ndbcluster::release_thd_ndb(thd_ndb); === modified file 'sql/ha_ndb_index_stat.h' --- a/sql/ha_ndb_index_stat.h 2011-07-23 14:35:37 +0000 +++ b/sql/ha_ndb_index_stat.h 2011-08-11 17:11:30 +0000 @@ -19,6 +19,8 @@ extern struct st_ndb_status g_ndb_status; +extern pthread_mutex_t ndbcluster_mutex; + extern pthread_t ndb_index_stat_thread; extern pthread_cond_t COND_ndb_index_stat_thread; extern pthread_mutex_t LOCK_ndb_index_stat_thread; === modified file 'storage/ndb/include/ndb_constants.h' --- a/storage/ndb/include/ndb_constants.h 2011-07-04 13:37:56 +0000 +++ b/storage/ndb/include/ndb_constants.h 2011-08-11 17:11:30 +0000 @@ -120,6 +120,7 @@ #define NDB_INDEX_STAT_HEAD_TABLE "ndb_index_stat_head" #define NDB_INDEX_STAT_SAMPLE_TABLE "ndb_index_stat_sample" #define NDB_INDEX_STAT_SAMPLE_INDEX1 "ndb_index_stat_sample_x1" +#define NDB_INDEX_STAT_HEAD_EVENT "ndb_index_stat_head_event" #define NDB_INDEX_STAT_PREFIX "ndb_index_stat" === modified file 'storage/ndb/include/ndbapi/NdbIndexStat.hpp' --- a/storage/ndb/include/ndbapi/NdbIndexStat.hpp 2011-07-31 17:40:01 +0000 +++ b/storage/ndb/include/ndbapi/NdbIndexStat.hpp 2011-08-11 17:11:30 +0000 @@ -100,7 +100,10 @@ public: InvalidCache = 4718, InternalError = 4719, BadSysTables = 4720, // sys tables partly missing or invalid - HaveSysTables = 4244 // create error if all sys tables exist + HaveSysTables = 4244, // create error if all sys tables exist + NoSysEvents = 4710, + BadSysEvents = BadSysTables, + HaveSysEvents = 746 }; /* @@ -191,6 +194,7 @@ public: */ struct Head { Int32 m_found; // -1 no read done, 0 = no record, 1 = exists + Int32 m_eventType; // if polling, NdbDictionary::Event::TE_INSERT etc Uint32 m_indexId; Uint32 m_indexVersion; Uint32 m_tableId; @@ -327,6 +331,42 @@ public: static void get_rule(const Stat& stat, char* buffer); /* + * Events (there is 1) for polling. These are dictionary objects. + * Correct sys tables must exist. Drop ignores non-existing events. + */ + int create_sysevents(Ndb* ndb); + int drop_sysevents(Ndb* ndb); + int check_sysevents(Ndb* ndb); + + /* + * Create listener for stats updates. Only 1 is allowed. + */ + int create_listener(Ndb* ndb); + + /* + * Start listening for events (call NdbEventOperation::execute). + */ + int execute_listener(Ndb* ndb); + + /* + * Poll the listener (call Ndb::pollEvents). Returns 1 if there are + * events available and 0 otherwise, or -1 on failure as usual. + */ + int poll_listener(Ndb* ndb, int max_wait_ms); + + /* + * Get next available event. Returns 1 if a new event was returned + * and 0 otherwise, or -1 on failure as usual. Use get_heed() to + * retrieve event type and data. + */ + int next_listener(Ndb* ndb); + + /* + * Drop the listener. + */ + int drop_listener(Ndb* ndb); + + /* * Memory allocator for stats cache data (key and value byte arrays). * Implementation default uses malloc/free. The memory in use is the * sum of CacheInfo::m_totalBytes from all cache types. === modified file 'storage/ndb/src/ndbapi/NdbIndexStat.cpp' --- a/storage/ndb/src/ndbapi/NdbIndexStat.cpp 2011-07-04 13:37:56 +0000 +++ b/storage/ndb/src/ndbapi/NdbIndexStat.cpp 2011-08-11 17:11:30 +0000 @@ -656,6 +656,82 @@ NdbIndexStat::get_rule(const Stat& stat_ DBUG_VOID_RETURN; } +// events and polling + +int +NdbIndexStat::create_sysevents(Ndb* ndb) +{ + DBUG_ENTER("NdbIndexStat::create_sysevents"); + if (m_impl.create_sysevents(ndb) == -1) + DBUG_RETURN(-1); + DBUG_RETURN(0); +} + +int +NdbIndexStat::drop_sysevents(Ndb* ndb) +{ + DBUG_ENTER("NdbIndexStat::drop_sysevents"); + if (m_impl.drop_sysevents(ndb) == -1) + DBUG_RETURN(-1); + DBUG_RETURN(0); +} + +int +NdbIndexStat::check_sysevents(Ndb* ndb) +{ + DBUG_ENTER("NdbIndexStat::check_sysevents"); + if (m_impl.check_sysevents(ndb) == -1) + DBUG_RETURN(-1); + DBUG_RETURN(0); +} + +int +NdbIndexStat::create_listener(Ndb* ndb) +{ + DBUG_ENTER("NdbIndexStat::create_listener"); + if (m_impl.create_listener(ndb) == -1) + DBUG_RETURN(-1); + DBUG_RETURN(0); +} + +int +NdbIndexStat::execute_listener(Ndb* ndb) +{ + DBUG_ENTER("NdbIndexStat::execute_listener"); + if (m_impl.execute_listener(ndb) == -1) + DBUG_RETURN(-1); + DBUG_RETURN(0); +} + +int +NdbIndexStat::poll_listener(Ndb* ndb, int max_wait_ms) +{ + DBUG_ENTER("NdbIndexStat::poll_listener"); + int ret = m_impl.poll_listener(ndb, max_wait_ms); + if (ret == -1) + DBUG_RETURN(-1); + DBUG_RETURN(ret); +} + +int +NdbIndexStat::next_listener(Ndb* ndb) +{ + DBUG_ENTER("NdbIndexStat::next_listener"); + int ret = m_impl.next_listener(ndb); + if (ret == -1) + DBUG_RETURN(-1); + DBUG_RETURN(ret); +} + +int +NdbIndexStat::drop_listener(Ndb* ndb) +{ + DBUG_ENTER("NdbIndexStat::drop_listener"); + if (m_impl.drop_listener(ndb) == -1) + DBUG_RETURN(-1); + DBUG_RETURN(0); +} + // mem NdbIndexStat::Mem::Mem() === modified file 'storage/ndb/src/ndbapi/NdbIndexStatImpl.cpp' --- a/storage/ndb/src/ndbapi/NdbIndexStatImpl.cpp 2011-08-09 15:37:45 +0000 +++ b/storage/ndb/src/ndbapi/NdbIndexStatImpl.cpp 2011-08-11 17:11:30 +0000 @@ -22,6 +22,7 @@ #include #include #include +#include #include "NdbIndexStatImpl.hpp" #undef min @@ -33,8 +34,8 @@ static const char* const g_headtable_nam static const char* const g_sampletable_name = NDB_INDEX_STAT_SAMPLE_TABLE; static const char* const g_sampleindex1_name = NDB_INDEX_STAT_SAMPLE_INDEX1; -const int ERR_NoSuchObject[] = { 709, 723, 4243, 0 }; -const int ERR_TupleNotFound[] = { 626, 0 }; +static const int ERR_NoSuchObject[] = { 709, 723, 4243, 0 }; +static const int ERR_TupleNotFound[] = { 626, 0 }; NdbIndexStatImpl::NdbIndexStatImpl(NdbIndexStat& facade) : NdbIndexStat(*this), @@ -45,6 +46,7 @@ NdbIndexStatImpl::NdbIndexStatImpl(NdbIn init(); m_query_mutex = NdbMutex_Create(); assert(m_query_mutex != 0); + m_eventOp = 0; m_mem_handler = &c_mem_default_handler; } @@ -544,10 +546,8 @@ NdbIndexStatImpl::drop_systables(Ndb* nd } int -NdbIndexStatImpl::check_systables(Ndb* ndb) +NdbIndexStatImpl::check_systables(Sys& sys) { - Sys sys(this, ndb); - if (get_systables(sys) == -1) return -1; @@ -566,6 +566,17 @@ NdbIndexStatImpl::check_systables(Ndb* n return 0; } +int +NdbIndexStatImpl::check_systables(Ndb* ndb) +{ + Sys sys(this, ndb); + + if (check_systables(sys) == -1) + return -1; + + return 0; +} + // operation context NdbIndexStatImpl::Con::Con(NdbIndexStatImpl* impl, Head& head, Ndb* ndb) : @@ -781,6 +792,7 @@ void NdbIndexStatImpl::init_head(Head& head) { head.m_found = -1; + head.m_eventType = -1; head.m_indexId = 0; head.m_indexVersion = 0; head.m_tableId = 0; @@ -2259,6 +2271,183 @@ NdbIndexStatImpl::query_keycmp(const Cac return res; } +// events and polling + +int +NdbIndexStatImpl::create_sysevents(Ndb* ndb) +{ + Sys sys(this, ndb); + NdbDictionary::Dictionary* const dic = ndb->getDictionary(); + + if (check_systables(sys) == -1) + return -1; + const NdbDictionary::Table* tab = sys.m_headtable; + require(tab != 0); + + const char* const evname = NDB_INDEX_STAT_HEAD_EVENT; + NdbDictionary::Event ev(evname, *tab); + ev.addTableEvent(NdbDictionary::Event::TE_INSERT); + ev.addTableEvent(NdbDictionary::Event::TE_DELETE); + ev.addTableEvent(NdbDictionary::Event::TE_UPDATE); + for (int i = 0; i < tab->getNoOfColumns(); i++) + ev.addEventColumn(i); + ev.setReport(NdbDictionary::Event::ER_UPDATED); + + if (dic->createEvent(ev) == -1) + { + setError(dic->getNdbError().code, __LINE__); + return -1; + } + return 0; +} + +int +NdbIndexStatImpl::drop_sysevents(Ndb* ndb) +{ + Sys sys(this, ndb); + NdbDictionary::Dictionary* const dic = ndb->getDictionary(); + + if (check_systables(sys) == -1) + return -1; + + const char* const evname = NDB_INDEX_STAT_HEAD_EVENT; + if (dic->dropEvent(evname) == -1) + { + int code = dic->getNdbError().code; + if (code != 4710) + { + setError(dic->getNdbError().code, __LINE__); + return -1; + } + } + return 0; +} + +int +NdbIndexStatImpl::check_sysevents(Ndb* ndb) +{ + Sys sys(this, ndb); + NdbDictionary::Dictionary* const dic = ndb->getDictionary(); + + if (check_systables(sys) == -1) + return -1; + + const char* const evname = NDB_INDEX_STAT_HEAD_EVENT; + const NdbDictionary::Event* ev = dic->getEvent(evname); + if (ev == 0) + { + setError(dic->getNdbError().code, __LINE__); + return -1; + } + return 0; +} + +int +NdbIndexStatImpl::create_listener(Ndb* ndb) +{ + if (m_eventOp != 0) + { + setError(UsageError, __LINE__); + return -1; + } + const char* const evname = NDB_INDEX_STAT_HEAD_EVENT; + m_eventOp = ndb->createEventOperation(evname); + if (m_eventOp == 0) + { + setError(ndb->getNdbError().code, __LINE__); + return -1; + } + + // all columns are non-nullable + Head& head = m_facadeHead; + if (m_eventOp->getValue("index_id", (char*)&head.m_indexId) == 0 || + m_eventOp->getValue("index_version", (char*)&head.m_indexVersion) == 0 || + m_eventOp->getValue("table_id", (char*)&head.m_tableId) == 0 || + m_eventOp->getValue("frag_count", (char*)&head.m_fragCount) == 0 || + m_eventOp->getValue("value_format", (char*)&head.m_valueFormat) == 0 || + m_eventOp->getValue("sample_version", (char*)&head.m_sampleVersion) == 0 || + m_eventOp->getValue("load_time", (char*)&head.m_loadTime) == 0 || + m_eventOp->getValue("sample_count", (char*)&head.m_sampleCount) == 0 || + m_eventOp->getValue("key_bytes", (char*)&head.m_keyBytes) == 0) + { + setError(m_eventOp->getNdbError().code, __LINE__); + return -1; + } + // wl4124_todo why this + static Head xxx; + if (m_eventOp->getPreValue("index_id", (char*)&xxx.m_indexId) == 0 || + m_eventOp->getPreValue("index_version", (char*)&xxx.m_indexVersion) == 0 || + m_eventOp->getPreValue("table_id", (char*)&xxx.m_tableId) == 0 || + m_eventOp->getPreValue("frag_count", (char*)&xxx.m_fragCount) == 0 || + m_eventOp->getPreValue("value_format", (char*)&xxx.m_valueFormat) == 0 || + m_eventOp->getPreValue("sample_version", (char*)&xxx.m_sampleVersion) == 0 || + m_eventOp->getPreValue("load_time", (char*)&xxx.m_loadTime) == 0 || + m_eventOp->getPreValue("sample_count", (char*)&xxx.m_sampleCount) == 0 || + m_eventOp->getPreValue("key_bytes", (char*)&xxx.m_keyBytes) == 0) + { + setError(m_eventOp->getNdbError().code, __LINE__); + return -1; + } + return 0; +} + +int +NdbIndexStatImpl::execute_listener(Ndb* ndb) +{ + if (m_eventOp == 0) + { + setError(UsageError, __LINE__); + return -1; + } + if (m_eventOp->execute() == -1) + { + setError(m_eventOp->getNdbError().code, __LINE__); + return -1; + } + return 0; +} + +int +NdbIndexStatImpl::poll_listener(Ndb* ndb, int max_wait_ms) +{ + int ret; + if ((ret = ndb->pollEvents(max_wait_ms)) < 0) + { + setError(ndb->getNdbError().code, __LINE__); + return -1; + } + return (ret == 0 ? 0 : 1); +} + +int +NdbIndexStatImpl::next_listener(Ndb* ndb) +{ + NdbEventOperation* op = ndb->nextEvent(); + if (op == 0) + return 0; + + Head& head = m_facadeHead; + head.m_eventType = (int)op->getEventType(); + return 1; +} + +int +NdbIndexStatImpl::drop_listener(Ndb* ndb) +{ + if (m_eventOp == 0) + { + setError(UsageError, __LINE__); + return -1; + } + if (ndb->dropEventOperation(m_eventOp) != 0) + { + setError(ndb->getNdbError().code, __LINE__); + return -1; + } + m_eventOp = 0; + return 0; +} + // mem alloc - default impl NdbIndexStatImpl::MemDefault::MemDefault() === modified file 'storage/ndb/src/ndbapi/NdbIndexStatImpl.hpp' --- a/storage/ndb/src/ndbapi/NdbIndexStatImpl.hpp 2011-07-31 17:40:01 +0000 +++ b/storage/ndb/src/ndbapi/NdbIndexStatImpl.hpp 2011-08-11 17:11:30 +0000 @@ -29,6 +29,7 @@ class NdbTransaction; class NdbIndexScanOperation; class NdbRecAttr; class NdbOperation; +class NdbEventOperation; extern const uint g_ndb_index_stat_head_frm_len; extern const uint8 g_ndb_index_stat_head_frm_data[]; @@ -71,6 +72,7 @@ public: Cache* m_cacheClean; // mutex for query cache switch, memory barrier would do NdbMutex* m_query_mutex; + NdbEventOperation* m_eventOp; Mem* m_mem_handler; NdbIndexStat::Error m_error; @@ -98,6 +100,7 @@ public: int get_systables(Sys& sys); int create_systables(Ndb* ndb); int drop_systables(Ndb* ndb); + int check_systables(Sys& sys); int check_systables(Ndb* ndb); // operation context @@ -279,6 +282,17 @@ public: void query_search(const Cache&, const Bound&, StatBound&); int query_keycmp(const Cache&, const Bound&, uint pos, Uint32& numEq); + // events and polling + int create_sysevents(Ndb* ndb); + int drop_sysevents(Ndb* ndb); + int check_sysevents(Ndb* ndb); + // + int create_listener(Ndb* ndb); + int execute_listener(Ndb* ndb); + int poll_listener(Ndb* ndb, int max_wait_ms); + int next_listener(Ndb* ndb); + int drop_listener(Ndb* ndb); + // default memory allocator struct MemDefault : public Mem { virtual void* mem_alloc(UintPtr bytes); === modified file 'storage/ndb/test/ndbapi/testIndexStat.cpp' --- a/storage/ndb/test/ndbapi/testIndexStat.cpp 2011-08-09 15:36:25 +0000 +++ b/storage/ndb/test/ndbapi/testIndexStat.cpp 2011-08-11 17:11:30 +0000 @@ -134,6 +134,7 @@ static NdbIndexScanOperation* g_rangesca static NdbIndexStat* g_is = 0; static bool g_has_created_stat_tables = false; +static bool g_has_created_stat_events = false; static uint urandom() @@ -213,7 +214,7 @@ errdb() ll0(++any << " rangescan_op: error " << e); } if (g_is != 0) { - const NdbError& e = g_is->getNdbError(); + const NdbIndexStat::Error& e = g_is->getNdbError(); if (e.code != 0) ll0(++any << " stat: error " << e); } @@ -1518,6 +1519,40 @@ readstat() return 0; } +// test polling after updatestat + +static int +startlistener() +{ + ll1("startlistener"); + chkdb(g_is->create_listener(g_ndb_sys) == 0); + chkdb(g_is->execute_listener(g_ndb_sys) == 0); + return 0; +} + +static int +runlistener() +{ + ll1("runlistener"); + int ret; + chkdb((ret = g_is->poll_listener(g_ndb_sys, 10000)) != -1); + chkrc(ret == 1); + // one event is expected + chkdb((ret = g_is->next_listener(g_ndb_sys)) != -1); + chkrc(ret == 1); + chkdb((ret = g_is->next_listener(g_ndb_sys)) != -1); + chkrc(ret == 0); + return 0; +} + +static int +stoplistener() +{ + ll1("stoplistener"); + chkdb(g_is->drop_listener(g_ndb_sys) != -1); + return 0; +} + // stats queries // exact stats from scan results @@ -2081,6 +2116,7 @@ runtest() chkrc(createindex() == 0); chkrc(createNdbRecords() == 0); chkrc(definestat() == 0); + chkrc(startlistener() == 0); for (g_loop = 0; g_opts.loops == 0 || g_loop < g_opts.loops; g_loop++) { ll0("=== loop " << g_loop << " ==="); @@ -2094,6 +2130,7 @@ runtest() makeranges(); chkrc(scanranges() == 0); chkrc(updatestat() == 0); + chkrc(runlistener() == 0); chkrc(readstat() == 0); chkrc(queryranges() == 0); loopstats(); @@ -2101,6 +2138,7 @@ runtest() } finalstats(); + chkrc(stoplistener() == 0); if (!g_opts.keeptable) chkrc(droptable() == 0); freeranges(); @@ -2234,22 +2272,66 @@ docreate_stat_tables() { if (g_is->check_systables(g_ndb_sys) == 0) return 0; + ll1("check_systables: " << g_is->getNdbError()); - if (g_is->create_systables(g_ndb_sys) == 0) - { - g_has_created_stat_tables = true; - return 0; - } - return -1; + ll0("create stat tables"); + chkdb(g_is->create_systables(g_ndb_sys) == 0); + g_has_created_stat_tables = true; + return 0; } static -void +int dodrop_stat_tables() { if (g_has_created_stat_tables == false) - return; - g_is->drop_systables(g_ndb_sys); + return 0; + + ll0("drop stat tables"); + chkdb(g_is->drop_systables(g_ndb_sys) == 0); + return 0; +} + +static int +docreate_stat_events() +{ + if (g_is->check_sysevents(g_ndb_sys) == 0) + return 0; + ll1("check_sysevents: " << g_is->getNdbError()); + + ll0("create stat events"); + chkdb(g_is->create_sysevents(g_ndb_sys) == 0); + g_has_created_stat_events = true; + return 0; +} + +static int +dodrop_stat_events() +{ + if (g_has_created_stat_events == false) + return 0; + + ll0("drop stat events"); + chkdb(g_is->drop_sysevents(g_ndb_sys) == 0); + return 0; +} + +static int +docreate_sys_objects() +{ + require(g_is != 0 && g_ndb_sys != 0); + chkrc(docreate_stat_tables() == 0); + chkrc(docreate_stat_events() == 0); + return 0; +} + +static int +dodrop_sys_objects() +{ + require(g_is != 0 && g_ndb_sys != 0); + chkrc(dodrop_stat_events() == 0); + chkrc(dodrop_stat_tables() == 0); + return 0; } int @@ -2277,17 +2359,22 @@ main(int argc, char** argv) ll0("connect failed"); return NDBT_ProgramExit(NDBT_FAILED); } - if (docreate_stat_tables() == -1){ - ll0("failed to create stat tables"); - return NDBT_ProgramExit(NDBT_FAILED); + if (docreate_sys_objects() == -1) { + ll0("failed to check or create stat tables and events"); + goto failed; } if (runtest() == -1) { ll0("test failed"); - dodrop_stat_tables(); - dodisconnect(); - return NDBT_ProgramExit(NDBT_FAILED); + goto failed; + } + if (dodrop_sys_objects() == -1) { + ll0("failed to drop created stat tables or events"); + goto failed; } - dodrop_stat_tables(); dodisconnect(); return NDBT_ProgramExit(NDBT_OK); +failed: + (void)dodrop_sys_objects(); + dodisconnect(); + return NDBT_ProgramExit(NDBT_FAILED); } === modified file 'storage/ndb/tools/ndb_index_stat.cpp' --- a/storage/ndb/tools/ndb_index_stat.cpp 2011-07-31 17:40:01 +0000 +++ b/storage/ndb/tools/ndb_index_stat.cpp 2011-08-11 17:11:30 +0000 @@ -35,6 +35,8 @@ static my_bool _sys_create = false; static my_bool _sys_create_if_not_exist = false; static my_bool _sys_create_if_not_valid = false; static my_bool _sys_check = false; +static my_bool _sys_skip_tables = false; +static my_bool _sys_skip_events = false; static int _sys_any = 0; // other static my_bool _verbose = false; @@ -382,69 +384,146 @@ dosys() { if (_sys_drop) { - g_info << "dropping any sys tables" << endl; - CHK2(g_is->drop_systables(g_ndb_sys) == 0, g_is->getNdbError()); - CHK2(g_is->check_systables(g_ndb_sys) == -1, "unexpected success"); - CHK2(g_is->getNdbError().code == NdbIndexStat::NoSysTables, - "unexpected error: " << g_is->getNdbError()); + if (!_sys_skip_events) + { + g_info << "dropping sys events" << endl; + CHK2(g_is->drop_sysevents(g_ndb_sys) == 0, g_is->getNdbError()); + CHK2(g_is->check_sysevents(g_ndb_sys) == -1, "unexpected success"); + CHK2(g_is->getNdbError().code == NdbIndexStat::NoSysEvents, + "unexpected error: " << g_is->getNdbError()); + } + + if (!_sys_skip_tables) + { + g_info << "dropping all sys tables" << endl; + CHK2(g_is->drop_systables(g_ndb_sys) == 0, g_is->getNdbError()); + CHK2(g_is->check_systables(g_ndb_sys) == -1, "unexpected success"); + CHK2(g_is->getNdbError().code == NdbIndexStat::NoSysTables, + "unexpected error: " << g_is->getNdbError()); + } g_info << "drop done" << endl; } if (_sys_create) { - g_info << "creating all sys tables" << endl; - CHK2(g_is->create_systables(g_ndb_sys) == 0, g_is->getNdbError()); - CHK2(g_is->check_systables(g_ndb_sys) == 0, g_is->getNdbError()); - g_info << "create done" << endl; - } - - if (_sys_create_if_not_exist) - { - if (g_is->check_systables(g_ndb_sys) == -1) + if (!_sys_skip_tables) { - CHK2(g_is->getNdbError().code == NdbIndexStat::NoSysTables, - g_is->getNdbError()); g_info << "creating all sys tables" << endl; CHK2(g_is->create_systables(g_ndb_sys) == 0, g_is->getNdbError()); CHK2(g_is->check_systables(g_ndb_sys) == 0, g_is->getNdbError()); + } + + if (!_sys_skip_events) + { + g_info << "creating sys events" << endl; + CHK2(g_is->create_sysevents(g_ndb_sys) == 0, g_is->getNdbError()); + CHK2(g_is->check_sysevents(g_ndb_sys) == 0, g_is->getNdbError()); g_info << "create done" << endl; } - else + } + + if (_sys_create_if_not_exist) + { + if (!_sys_skip_tables) { - g_info << "using existing sys tables" << endl; + if (g_is->check_systables(g_ndb_sys) == -1) + { + CHK2(g_is->getNdbError().code == NdbIndexStat::NoSysTables, + g_is->getNdbError()); + g_info << "creating all sys tables" << endl; + CHK2(g_is->create_systables(g_ndb_sys) == 0, g_is->getNdbError()); + CHK2(g_is->check_systables(g_ndb_sys) == 0, g_is->getNdbError()); + g_info << "create done" << endl; + } + else + { + g_info << "using existing sys tables" << endl; + } + } + + if (!_sys_skip_events) + { + if (g_is->check_sysevents(g_ndb_sys) == -1) + { + CHK2(g_is->getNdbError().code == NdbIndexStat::NoSysEvents, + g_is->getNdbError()); + g_info << "creating sys events" << endl; + CHK2(g_is->create_sysevents(g_ndb_sys) == 0, g_is->getNdbError()); + g_info << "create done" << endl; + } + else + { + g_info << "using existing sys events" << endl; + } } } if (_sys_create_if_not_valid) { - if (g_is->check_systables(g_ndb_sys) == -1) + if (!_sys_skip_tables) { - if (g_is->getNdbError().code != NdbIndexStat::NoSysTables) + if (g_is->check_systables(g_ndb_sys) == -1) { - CHK2(g_is->getNdbError().code == NdbIndexStat::BadSysTables, - g_is->getNdbError()); - g_info << "dropping invalid sys tables" << endl; - CHK2(g_is->drop_systables(g_ndb_sys) == 0, g_is->getNdbError()); - CHK2(g_is->check_systables(g_ndb_sys) == -1, "unexpected success"); - CHK2(g_is->getNdbError().code == NdbIndexStat::NoSysTables, - "unexpected error: " << g_is->getNdbError()); - g_info << "drop done" << endl; + if (g_is->getNdbError().code != NdbIndexStat::NoSysTables) + { + CHK2(g_is->getNdbError().code == NdbIndexStat::BadSysTables, + g_is->getNdbError()); + g_info << "dropping invalid sys tables" << endl; + CHK2(g_is->drop_systables(g_ndb_sys) == 0, g_is->getNdbError()); + CHK2(g_is->check_systables(g_ndb_sys) == -1, "unexpected success"); + CHK2(g_is->getNdbError().code == NdbIndexStat::NoSysTables, + "unexpected error: " << g_is->getNdbError()); + g_info << "drop done" << endl; + } + g_info << "creating all sys tables" << endl; + CHK2(g_is->create_systables(g_ndb_sys) == 0, g_is->getNdbError()); + CHK2(g_is->check_systables(g_ndb_sys) == 0, g_is->getNdbError()); + g_info << "create done" << endl; + } + else + { + g_info << "using existing sys tables" << endl; } - g_info << "creating all sys tables" << endl; - CHK2(g_is->create_systables(g_ndb_sys) == 0, g_is->getNdbError()); - CHK2(g_is->check_systables(g_ndb_sys) == 0, g_is->getNdbError()); - g_info << "create done" << endl; } - else + if (!_sys_skip_events) { - g_info << "using existing sys tables" << endl; + if (g_is->check_sysevents(g_ndb_sys) == -1) + { + if (g_is->getNdbError().code != NdbIndexStat::NoSysEvents) + { + CHK2(g_is->getNdbError().code == NdbIndexStat::BadSysEvents, + g_is->getNdbError()); + g_info << "dropping invalid sys events" << endl; + CHK2(g_is->drop_sysevents(g_ndb_sys) == 0, g_is->getNdbError()); + CHK2(g_is->check_sysevents(g_ndb_sys) == -1, "unexpected success"); + CHK2(g_is->getNdbError().code == NdbIndexStat::NoSysEvents, + "unexpected error: " << g_is->getNdbError()); + g_info << "drop done" << endl; + } + g_info << "creating sys events" << endl; + CHK2(g_is->create_sysevents(g_ndb_sys) == 0, g_is->getNdbError()); + CHK2(g_is->check_sysevents(g_ndb_sys) == 0, g_is->getNdbError()); + g_info << "create done" << endl; + } + else + { + g_info << "using existing sys events" << endl; + } } } if (_sys_check) { - CHK2(g_is->check_systables(g_ndb_sys) == 0, g_is->getNdbError()); - g_info << "sys tables ok" << endl; + if (!_sys_skip_tables) + { + CHK2(g_is->check_systables(g_ndb_sys) == 0, g_is->getNdbError()); + g_info << "sys tables ok" << endl; + } + if (!_sys_skip_events) + { + CHK2(g_is->check_sysevents(g_ndb_sys) == 0, g_is->getNdbError()); + g_info << "sys events ok" << endl; + } } } while (0); @@ -514,25 +593,33 @@ my_long_options[] = GET_INT, REQUIRED_ARG, 0, 0, 0, 0, 0, 0 }, // sys options { "sys-drop", ++oi, - "Drop any stats tables in NDB kernel (all stats is lost)", + "Drop any stats tables and events in NDB kernel (all stats is lost)", (uchar **)&_sys_drop, (uchar **)&_sys_drop, 0, GET_BOOL, NO_ARG, 0, 0, 0, 0, 0, 0 }, { "sys-create", ++oi, - "Create stats tables in NDB kernel (must not exist)", + "Create stats tables and events in NDB kernel (must not exist)", (uchar **)&_sys_create, (uchar **)&_sys_create, 0, GET_BOOL, NO_ARG, 0, 0, 0, 0, 0, 0 }, { "sys-create-if-not-exist", ++oi, - "Like --sys-create but do nothing if correct stats tables exist", + "Like --sys-create but do nothing if correct objects exist", (uchar **)&_sys_create_if_not_exist, (uchar **)&_sys_create_if_not_exist, 0, GET_BOOL, NO_ARG, 0, 0, 0, 0, 0, 0 }, { "sys-create-if-not-valid", ++oi, - "Like --sys-create-if-not-exist but first drop any invalid tables", + "Like --sys-create-if-not-exist but first drop any invalid objects", (uchar **)&_sys_create_if_not_valid, (uchar **)&_sys_create_if_not_valid, 0, GET_BOOL, NO_ARG, 0, 0, 0, 0, 0, 0 }, { "sys-check", ++oi, - "Check that correct stats tables exist in NDB kernel", + "Check that correct stats tables and events exist in NDB kernel", (uchar **)&_sys_check, (uchar **)&_sys_check, 0, GET_BOOL, NO_ARG, 0, 0, 0, 0, 0, 0 }, + { "sys-skip-tables", ++oi, + "Do not apply sys options to tables", + (uchar **)&_sys_skip_tables, (uchar **)&_sys_skip_tables, 0, + GET_BOOL, NO_ARG, 0, 0, 0, 0, 0, 0 }, + { "sys-skip-events", ++oi, + "Do not apply sys options to events", + (uchar **)&_sys_skip_events, (uchar **)&_sys_skip_events, 0, + GET_BOOL, NO_ARG, 0, 0, 0, 0, 0, 0 }, // other { "verbose", 'v', "Verbose messages", @@ -581,7 +668,9 @@ checkopts(int argc, char** argv) (_sys_create_if_not_exist != 0) + (_sys_create_if_not_valid != 0) + (_sys_drop != 0) + - ( _sys_check != 0); + ( _sys_check != 0) + + (_sys_skip_tables != 0) + + (_sys_skip_events != 0); if (!_sys_any) { if (_dbname == 0) No bundle (reason: useless for push emails).