List:Commits« Previous MessageNext Message »
From:Pekka Nousiainen Date:August 11 2011 5:13pm
Subject:bzr push into mysql-5.1-telco-7.0-wl4124-new2 branch (pekka.nousiainen:4422
to 4423) WL#4124
View as plain text  
 4423 Pekka Nousiainen	2011-08-11
      wl#4124 f01_event.diff
      listen to data events on head table

    modified:
      sql/ha_ndb_index_stat.cc
      sql/ha_ndb_index_stat.h
      storage/ndb/include/ndb_constants.h
      storage/ndb/include/ndbapi/NdbIndexStat.hpp
      storage/ndb/src/ndbapi/NdbIndexStat.cpp
      storage/ndb/src/ndbapi/NdbIndexStatImpl.cpp
      storage/ndb/src/ndbapi/NdbIndexStatImpl.hpp
      storage/ndb/test/ndbapi/testIndexStat.cpp
      storage/ndb/tools/ndb_index_stat.cpp
 4422 Pekka Nousiainen	2011-08-09
      treename is wl4124-new2 from x17_fix and up

    modified:
      .bzr-mysql/default.conf
=== modified file 'sql/ha_ndb_index_stat.cc'
--- a/sql/ha_ndb_index_stat.cc	2011-07-23 17:22:32 +0000
+++ b/sql/ha_ndb_index_stat.cc	2011-08-11 17:11:30 +0000
@@ -498,8 +498,8 @@ struct Ndb_index_stat_glob {
   uint list_count[Ndb_index_stat::LT_Count]; /* Temporary use */
   uint total_count;
   uint force_update;
-  uint no_stats;
   uint wait_update;
+  uint no_stats;
   uint cache_query_bytes; /* In use */
   uint cache_clean_bytes; /* Obsolete versions not yet removed */
   Ndb_index_stat_glob() :
@@ -890,6 +890,39 @@ ndb_index_stat_free(NDB_SHARE *share)
   pthread_mutex_unlock(&ndb_index_stat_list_mutex);
 }
 
+/* Find entry across shares */
+/* wl4124_todo mutex overkill, hash table, can we find table share */
+Ndb_index_stat*
+ndb_index_stat_find_entry(int index_id, int index_version, int table_id)
+{
+  DBUG_ENTER("ndb_index_stat_find_entry");
+  pthread_mutex_lock(&ndbcluster_mutex);
+  pthread_mutex_lock(&ndb_index_stat_list_mutex);
+  DBUG_PRINT("index_stat", ("find index:%d version:%d table:%d",
+                            index_id, index_version, table_id));
+
+  int lt;
+  for (lt=1; lt < Ndb_index_stat::LT_Count; lt++)
+  {
+    Ndb_index_stat *st=ndb_index_stat_list[lt].head;
+    while (st != 0)
+    {
+      if (st->index_id == index_id &&
+          st->index_version == index_version)
+      {
+        pthread_mutex_unlock(&ndb_index_stat_list_mutex);
+        pthread_mutex_unlock(&ndbcluster_mutex);
+        DBUG_RETURN(st);
+      }
+      st= st->list_next;
+    }
+  }
+
+  pthread_mutex_unlock(&ndb_index_stat_list_mutex);
+  pthread_mutex_unlock(&ndbcluster_mutex);
+  DBUG_RETURN(0);
+}
+
 /* Statistics thread sub-routines */
 
 void
@@ -931,12 +964,14 @@ ndb_index_stat_cache_clean(Ndb_index_sta
 
 /* Misc in/out parameters for process steps */
 struct Ndb_index_stat_proc {
+  NdbIndexStat* is_util; // For metadata and polling
   Ndb *ndb;
   time_t now;
   int lt;
   bool busy;
   bool end;
   Ndb_index_stat_proc() :
+    is_util(0),
     ndb(0),
     now(0),
     lt(0),
@@ -1383,6 +1418,75 @@ ndb_index_stat_proc_error(Ndb_index_stat
     pr.busy= true;
 }
 
+void
+ndb_index_stat_proc_event(Ndb_index_stat_proc &pr, Ndb_index_stat *st)
+{
+  /*
+    Put on Check list if idle.
+    We get event also for our own analyze but this should not matter.
+   */
+  pr.lt= st->lt;
+  if (st->lt == Ndb_index_stat::LT_Idle ||
+      st->lt == Ndb_index_stat::LT_Error)
+    pr.lt= Ndb_index_stat::LT_Check;
+}
+
+void
+ndb_index_stat_proc_event(Ndb_index_stat_proc &pr)
+{
+  NdbIndexStat *is= pr.is_util;
+  Ndb *ndb= pr.ndb;
+  int ret;
+  ret= is->poll_listener(ndb, 0);
+  DBUG_PRINT("index_stat", ("poll_listener ret: %d", ret));
+  if (ret == -1)
+  {
+    // wl4124_todo report error
+    DBUG_ASSERT(false);
+    return;
+  }
+  if (ret == 0)
+    return;
+
+  while (1)
+  {
+    ret= is->next_listener(ndb);
+    DBUG_PRINT("index_stat", ("next_listener ret: %d", ret));
+    if (ret == -1)
+    {
+      // wl4124_todo report error
+      DBUG_ASSERT(false);
+      return;
+    }
+    if (ret == 0)
+      break;
+
+    NdbIndexStat::Head head;
+    is->get_head(head);
+    DBUG_PRINT("index_stat", ("next_listener eventType: %d indexId: %u",
+                              head.m_eventType, head.m_indexId));
+
+    Ndb_index_stat *st= ndb_index_stat_find_entry(head.m_indexId,
+                                                  head.m_indexVersion,
+                                                  head.m_tableId);
+    /*
+      Another process can update stats for an index which is not found
+      in this mysqld.  Ignore it.
+     */
+    if (st != 0)
+    {
+      DBUG_PRINT("index_stat", ("st %s proc %s", st->id, "Event"));
+      ndb_index_stat_proc_event(pr, st);
+      if (pr.lt != st->lt)
+        ndb_index_stat_list_move(st, pr.lt);
+    }
+    else
+    {
+      DBUG_PRINT("index_stat", ("entry not found in this mysqld"));
+    }
+  }
+}
+
 #ifndef DBUG_OFF
 void
 ndb_index_stat_report(const Ndb_index_stat_glob& old_glob)
@@ -1466,6 +1570,7 @@ ndb_index_stat_proc(Ndb_index_stat_proc
   ndb_index_stat_proc_evict(pr);
   ndb_index_stat_proc_delete(pr);
   ndb_index_stat_proc_error(pr);
+  ndb_index_stat_proc_event(pr);
 
 #ifndef DBUG_OFF
   ndb_index_stat_report(old_glob);
@@ -1511,11 +1616,14 @@ ndb_index_stat_end()
 
 /* Index stats thread */
 
-static int
-ndb_index_stat_check_or_create_systables(NdbIndexStat* is, Ndb* ndb)
+int
+ndb_index_stat_check_or_create_systables(Ndb_index_stat_proc &pr)
 {
   DBUG_ENTER("ndb_index_stat_check_or_create_systables");
 
+  NdbIndexStat *is= pr.is_util;
+  Ndb *ndb= pr.ndb;
+
   if (is->check_systables(ndb) == 0)
   {
     DBUG_PRINT("index_stat", ("using existing index stats tables"));
@@ -1542,6 +1650,82 @@ ndb_index_stat_check_or_create_systables
   DBUG_RETURN(-1);
 }
 
+int
+ndb_index_stat_check_or_create_sysevents(Ndb_index_stat_proc &pr)
+{
+  DBUG_ENTER("ndb_index_stat_check_or_create_sysevents");
+
+  NdbIndexStat *is= pr.is_util;
+  Ndb *ndb= pr.ndb;
+
+  if (is->check_sysevents(ndb) == 0)
+  {
+    DBUG_PRINT("index_stat", ("using existing index stats events"));
+    DBUG_RETURN(0);
+  }
+
+  if (is->create_sysevents(ndb) == 0)
+  {
+    DBUG_PRINT("index_stat", ("created index stats events"));
+    DBUG_RETURN(0);
+  }
+
+  if (is->getNdbError().code == 746)
+  {
+    // race between mysqlds, maybe
+    DBUG_PRINT("index_stat", ("create index stats events failed: error %d line %d",
+                              is->getNdbError().code, is->getNdbError().line));
+    DBUG_RETURN(-1);
+  }
+
+  sql_print_warning("create index stats events failed: error %d line %d",
+                    is->getNdbError().code, is->getNdbError().line);
+  DBUG_RETURN(-1);
+}
+
+int
+ndb_index_stat_start_listener(Ndb_index_stat_proc &pr)
+{
+  DBUG_ENTER("ndb_index_stat_start_listener");
+
+  NdbIndexStat *is= pr.is_util;
+  Ndb *ndb= pr.ndb;
+
+  if (is->create_listener(ndb) == -1)
+  {
+    sql_print_warning("create index stats listener failed: error %d line %d",
+                      is->getNdbError().code, is->getNdbError().line);
+    DBUG_RETURN(-1);
+  }
+
+  if (is->execute_listener(ndb) == -1)
+  {
+    sql_print_warning("execute index stats listener failed: error %d line %d",
+                      is->getNdbError().code, is->getNdbError().line);
+    DBUG_RETURN(-1);
+  }
+
+  DBUG_RETURN(0);
+}
+
+int
+ndb_index_stat_stop_listener(Ndb_index_stat_proc &pr)
+{
+  DBUG_ENTER("ndb_index_stat_stop_listener");
+
+  NdbIndexStat *is= pr.is_util;
+  Ndb *ndb= pr.ndb;
+
+  if (is->drop_listener(ndb) == -1)
+  {
+    sql_print_warning("drop index stats listener failed: error %d line %d",
+                      is->getNdbError().code, is->getNdbError().line);
+    DBUG_RETURN(-1);
+  }
+
+  DBUG_RETURN(0);
+}
+
 pthread_handler_t
 ndb_index_stat_thread_func(void *arg __attribute__((unused)))
 {
@@ -1552,6 +1736,10 @@ ndb_index_stat_thread_func(void *arg __a
   my_thread_init();
   DBUG_ENTER("ndb_index_stat_thread_func");
 
+  Ndb_index_stat_proc pr;
+  NdbIndexStat is_util;
+  pr.is_util= &is_util;
+
   // wl4124_todo remove useless stuff copied from utility thread
  
   pthread_mutex_lock(&LOCK_ndb_index_stat_thread);
@@ -1638,10 +1826,13 @@ ndb_index_stat_thread_func(void *arg __a
     pthread_mutex_lock(&LOCK_ndb_index_stat_thread);
     goto ndb_index_stat_thread_end;
   }
+  pr.ndb= thd_ndb->ndb;
 
   ndb_index_stat_allow(1);
   bool enable_ok;
   enable_ok= false;
+  bool have_listener;
+  have_listener= false;
 
   set_timespec(abstime, 0);
   for (;;)
@@ -1662,9 +1853,6 @@ ndb_index_stat_thread_func(void *arg __a
     /* const bool enable_ok_new= THDVAR(NULL, index_stat_enable); */
     const bool enable_ok_new= ndb_index_stat_get_enable(NULL);
 
-    Ndb_index_stat_proc pr;
-    pr.ndb= thd_ndb->ndb;
-
     do
     {
       if (enable_ok != enable_ok_new)
@@ -1674,13 +1862,24 @@ ndb_index_stat_thread_func(void *arg __a
 
         if (enable_ok_new)
         {
-          // at enable check or create stats tables
-          NdbIndexStat is;
-          if (ndb_index_stat_check_or_create_systables(&is, thd_ndb->ndb) == -1)
+          // at enable check or create stats tables and events
+          if (ndb_index_stat_check_or_create_systables(pr) == -1 ||
+              ndb_index_stat_check_or_create_sysevents(pr) == -1 ||
+              ndb_index_stat_start_listener(pr) == -1)
           {
             // try again in next loop
             break;
           }
+          have_listener= true;
+        }
+        else
+        {
+          // not a normal use-case
+          if (have_listener)
+          {
+            if (ndb_index_stat_stop_listener(pr) == 0)
+              have_listener= false;
+          }
         }
         enable_ok= enable_ok_new;
       }
@@ -1711,6 +1910,11 @@ ndb_index_stat_thread_end:
   net_end(&thd->net);
 
 ndb_index_stat_thread_fail:
+  if (have_listener)
+  {
+    if (ndb_index_stat_stop_listener(pr) == 0)
+      have_listener= false;
+  }
   if (thd_ndb)
   {
     ha_ndbcluster::release_thd_ndb(thd_ndb);

=== modified file 'sql/ha_ndb_index_stat.h'
--- a/sql/ha_ndb_index_stat.h	2011-07-23 14:35:37 +0000
+++ b/sql/ha_ndb_index_stat.h	2011-08-11 17:11:30 +0000
@@ -19,6 +19,8 @@
 
 extern struct st_ndb_status g_ndb_status;
 
+extern pthread_mutex_t ndbcluster_mutex;
+
 extern pthread_t ndb_index_stat_thread;
 extern pthread_cond_t COND_ndb_index_stat_thread;
 extern pthread_mutex_t LOCK_ndb_index_stat_thread;

=== modified file 'storage/ndb/include/ndb_constants.h'
--- a/storage/ndb/include/ndb_constants.h	2011-07-04 13:37:56 +0000
+++ b/storage/ndb/include/ndb_constants.h	2011-08-11 17:11:30 +0000
@@ -120,6 +120,7 @@
 #define NDB_INDEX_STAT_HEAD_TABLE    "ndb_index_stat_head"
 #define NDB_INDEX_STAT_SAMPLE_TABLE  "ndb_index_stat_sample"
 #define NDB_INDEX_STAT_SAMPLE_INDEX1 "ndb_index_stat_sample_x1"
+#define NDB_INDEX_STAT_HEAD_EVENT    "ndb_index_stat_head_event"
 
 #define NDB_INDEX_STAT_PREFIX        "ndb_index_stat"
 

=== modified file 'storage/ndb/include/ndbapi/NdbIndexStat.hpp'
--- a/storage/ndb/include/ndbapi/NdbIndexStat.hpp	2011-07-31 17:40:01 +0000
+++ b/storage/ndb/include/ndbapi/NdbIndexStat.hpp	2011-08-11 17:11:30 +0000
@@ -100,7 +100,10 @@ public:
     InvalidCache = 4718,
     InternalError = 4719,
     BadSysTables = 4720,  // sys tables partly missing or invalid
-    HaveSysTables = 4244  // create error if all sys tables exist
+    HaveSysTables = 4244, // create error if all sys tables exist
+    NoSysEvents = 4710,
+    BadSysEvents = BadSysTables,
+    HaveSysEvents = 746
   };
 
   /*
@@ -191,6 +194,7 @@ public:
    */
   struct Head {
     Int32 m_found;        // -1 no read done, 0 = no record, 1 = exists
+    Int32 m_eventType;    // if polling, NdbDictionary::Event::TE_INSERT etc
     Uint32 m_indexId;
     Uint32 m_indexVersion;
     Uint32 m_tableId;
@@ -327,6 +331,42 @@ public:
   static void get_rule(const Stat& stat, char* buffer);
 
   /*
+   * Events (there is 1) for polling.  These are dictionary objects.
+   * Correct sys tables must exist.  Drop ignores non-existing events.
+   */
+  int create_sysevents(Ndb* ndb);
+  int drop_sysevents(Ndb* ndb);
+  int check_sysevents(Ndb* ndb);
+
+  /*
+   * Create listener for stats updates.  Only 1 is allowed.
+   */
+  int create_listener(Ndb* ndb);
+
+  /*
+   * Start listening for events (call NdbEventOperation::execute).
+   */
+  int execute_listener(Ndb* ndb);
+
+  /*
+   * Poll the listener (call Ndb::pollEvents).  Returns 1 if there are
+   * events available and 0 otherwise, or -1 on failure as usual.
+   */
+  int poll_listener(Ndb* ndb, int max_wait_ms);
+
+  /*
+   * Get next available event.  Returns 1 if a new event was returned
+   * and 0 otherwise, or -1 on failure as usual.  Use get_heed() to
+   * retrieve event type and data.
+   */
+  int next_listener(Ndb* ndb);
+
+  /*
+   * Drop the listener.
+   */
+  int drop_listener(Ndb* ndb);
+
+  /*
    * Memory allocator for stats cache data (key and value byte arrays).
    * Implementation default uses malloc/free.  The memory in use is the
    * sum of CacheInfo::m_totalBytes from all cache types.

=== modified file 'storage/ndb/src/ndbapi/NdbIndexStat.cpp'
--- a/storage/ndb/src/ndbapi/NdbIndexStat.cpp	2011-07-04 13:37:56 +0000
+++ b/storage/ndb/src/ndbapi/NdbIndexStat.cpp	2011-08-11 17:11:30 +0000
@@ -656,6 +656,82 @@ NdbIndexStat::get_rule(const Stat& stat_
   DBUG_VOID_RETURN;
 }
 
+// events and polling
+
+int
+NdbIndexStat::create_sysevents(Ndb* ndb)
+{
+  DBUG_ENTER("NdbIndexStat::create_sysevents");
+  if (m_impl.create_sysevents(ndb) == -1)
+    DBUG_RETURN(-1);
+  DBUG_RETURN(0);
+}
+
+int
+NdbIndexStat::drop_sysevents(Ndb* ndb)
+{
+  DBUG_ENTER("NdbIndexStat::drop_sysevents");
+  if (m_impl.drop_sysevents(ndb) == -1)
+    DBUG_RETURN(-1);
+  DBUG_RETURN(0);
+}
+
+int
+NdbIndexStat::check_sysevents(Ndb* ndb)
+{
+  DBUG_ENTER("NdbIndexStat::check_sysevents");
+  if (m_impl.check_sysevents(ndb) == -1)
+    DBUG_RETURN(-1);
+  DBUG_RETURN(0);
+}
+
+int
+NdbIndexStat::create_listener(Ndb* ndb)
+{
+  DBUG_ENTER("NdbIndexStat::create_listener");
+  if (m_impl.create_listener(ndb) == -1)
+    DBUG_RETURN(-1);
+  DBUG_RETURN(0);
+}
+
+int
+NdbIndexStat::execute_listener(Ndb* ndb)
+{
+  DBUG_ENTER("NdbIndexStat::execute_listener");
+  if (m_impl.execute_listener(ndb) == -1)
+    DBUG_RETURN(-1);
+  DBUG_RETURN(0);
+}
+
+int
+NdbIndexStat::poll_listener(Ndb* ndb, int max_wait_ms)
+{
+  DBUG_ENTER("NdbIndexStat::poll_listener");
+  int ret = m_impl.poll_listener(ndb, max_wait_ms);
+  if (ret == -1)
+    DBUG_RETURN(-1);
+  DBUG_RETURN(ret);
+}
+
+int
+NdbIndexStat::next_listener(Ndb* ndb)
+{
+  DBUG_ENTER("NdbIndexStat::next_listener");
+  int ret = m_impl.next_listener(ndb);
+  if (ret == -1)
+    DBUG_RETURN(-1);
+  DBUG_RETURN(ret);
+}
+
+int
+NdbIndexStat::drop_listener(Ndb* ndb)
+{
+  DBUG_ENTER("NdbIndexStat::drop_listener");
+  if (m_impl.drop_listener(ndb) == -1)
+    DBUG_RETURN(-1);
+  DBUG_RETURN(0);
+}
+
 // mem
 
 NdbIndexStat::Mem::Mem()

=== modified file 'storage/ndb/src/ndbapi/NdbIndexStatImpl.cpp'
--- a/storage/ndb/src/ndbapi/NdbIndexStatImpl.cpp	2011-08-09 15:37:45 +0000
+++ b/storage/ndb/src/ndbapi/NdbIndexStatImpl.cpp	2011-08-11 17:11:30 +0000
@@ -22,6 +22,7 @@
 #include <Bitmask.hpp>
 #include <NdbSqlUtil.hpp>
 #include <NdbRecord.hpp>
+#include <NdbEventOperation.hpp>
 #include "NdbIndexStatImpl.hpp"
 
 #undef min
@@ -33,8 +34,8 @@ static const char* const g_headtable_nam
 static const char* const g_sampletable_name = NDB_INDEX_STAT_SAMPLE_TABLE;
 static const char* const g_sampleindex1_name = NDB_INDEX_STAT_SAMPLE_INDEX1;
 
-const int ERR_NoSuchObject[] = { 709, 723, 4243, 0 };
-const int ERR_TupleNotFound[] = { 626, 0 };
+static const int ERR_NoSuchObject[] = { 709, 723, 4243, 0 };
+static const int ERR_TupleNotFound[] = { 626, 0 };
 
 NdbIndexStatImpl::NdbIndexStatImpl(NdbIndexStat& facade) :
   NdbIndexStat(*this),
@@ -45,6 +46,7 @@ NdbIndexStatImpl::NdbIndexStatImpl(NdbIn
   init();
   m_query_mutex = NdbMutex_Create();
   assert(m_query_mutex != 0);
+  m_eventOp = 0;
   m_mem_handler = &c_mem_default_handler;
 }
 
@@ -544,10 +546,8 @@ NdbIndexStatImpl::drop_systables(Ndb* nd
 }
 
 int
-NdbIndexStatImpl::check_systables(Ndb* ndb)
+NdbIndexStatImpl::check_systables(Sys& sys)
 {
-  Sys sys(this, ndb);
-
   if (get_systables(sys) == -1)
     return -1;
 
@@ -566,6 +566,17 @@ NdbIndexStatImpl::check_systables(Ndb* n
   return 0;
 }
 
+int
+NdbIndexStatImpl::check_systables(Ndb* ndb)
+{
+  Sys sys(this, ndb);
+  
+  if (check_systables(sys) == -1)
+    return -1;
+
+  return 0;
+}
+
 // operation context
 
 NdbIndexStatImpl::Con::Con(NdbIndexStatImpl* impl, Head& head, Ndb* ndb) :
@@ -781,6 +792,7 @@ void
 NdbIndexStatImpl::init_head(Head& head)
 {
   head.m_found = -1;
+  head.m_eventType = -1;
   head.m_indexId = 0;
   head.m_indexVersion = 0;
   head.m_tableId = 0;
@@ -2259,6 +2271,183 @@ NdbIndexStatImpl::query_keycmp(const Cac
   return res;
 }
 
+// events and polling
+
+int
+NdbIndexStatImpl::create_sysevents(Ndb* ndb)
+{
+  Sys sys(this, ndb);
+  NdbDictionary::Dictionary* const dic = ndb->getDictionary();
+
+  if (check_systables(sys) == -1)
+    return -1;
+  const NdbDictionary::Table* tab = sys.m_headtable;
+  require(tab != 0);
+
+  const char* const evname = NDB_INDEX_STAT_HEAD_EVENT;
+  NdbDictionary::Event ev(evname, *tab);
+  ev.addTableEvent(NdbDictionary::Event::TE_INSERT);
+  ev.addTableEvent(NdbDictionary::Event::TE_DELETE);
+  ev.addTableEvent(NdbDictionary::Event::TE_UPDATE);
+  for (int i = 0; i < tab->getNoOfColumns(); i++)
+    ev.addEventColumn(i);
+  ev.setReport(NdbDictionary::Event::ER_UPDATED);
+
+  if (dic->createEvent(ev) == -1)
+  {
+    setError(dic->getNdbError().code, __LINE__);
+    return -1;
+  }
+  return 0;
+}
+
+int
+NdbIndexStatImpl::drop_sysevents(Ndb* ndb)
+{
+  Sys sys(this, ndb);
+  NdbDictionary::Dictionary* const dic = ndb->getDictionary();
+
+  if (check_systables(sys) == -1)
+    return -1;
+
+  const char* const evname = NDB_INDEX_STAT_HEAD_EVENT;
+  if (dic->dropEvent(evname) == -1)
+  {
+    int code = dic->getNdbError().code;
+    if (code != 4710)
+    {
+      setError(dic->getNdbError().code, __LINE__);
+      return -1;
+    }
+  }
+  return 0;
+}
+
+int
+NdbIndexStatImpl::check_sysevents(Ndb* ndb)
+{
+  Sys sys(this, ndb);
+  NdbDictionary::Dictionary* const dic = ndb->getDictionary();
+
+  if (check_systables(sys) == -1)
+    return -1;
+
+  const char* const evname = NDB_INDEX_STAT_HEAD_EVENT;
+  const NdbDictionary::Event* ev = dic->getEvent(evname);
+  if (ev == 0)
+  {
+    setError(dic->getNdbError().code, __LINE__);
+    return -1;
+  }
+  return 0;
+}
+
+int
+NdbIndexStatImpl::create_listener(Ndb* ndb)
+{
+  if (m_eventOp != 0)
+  {
+    setError(UsageError, __LINE__);
+    return -1;
+  }
+  const char* const evname = NDB_INDEX_STAT_HEAD_EVENT;
+  m_eventOp = ndb->createEventOperation(evname);
+  if (m_eventOp == 0)
+  {
+    setError(ndb->getNdbError().code, __LINE__);
+    return -1;
+  }
+
+  // all columns are non-nullable
+  Head& head = m_facadeHead;
+  if (m_eventOp->getValue("index_id", (char*)&head.m_indexId) == 0 ||
+      m_eventOp->getValue("index_version", (char*)&head.m_indexVersion) == 0 ||
+      m_eventOp->getValue("table_id", (char*)&head.m_tableId) == 0 ||
+      m_eventOp->getValue("frag_count", (char*)&head.m_fragCount) == 0 ||
+      m_eventOp->getValue("value_format", (char*)&head.m_valueFormat) == 0 ||
+      m_eventOp->getValue("sample_version", (char*)&head.m_sampleVersion) == 0 ||
+      m_eventOp->getValue("load_time", (char*)&head.m_loadTime) == 0 ||
+      m_eventOp->getValue("sample_count", (char*)&head.m_sampleCount) == 0 ||
+      m_eventOp->getValue("key_bytes", (char*)&head.m_keyBytes) == 0)
+  {
+    setError(m_eventOp->getNdbError().code, __LINE__);
+    return -1;
+  }
+  // wl4124_todo why this
+  static Head xxx;
+  if (m_eventOp->getPreValue("index_id", (char*)&xxx.m_indexId) == 0 ||
+      m_eventOp->getPreValue("index_version", (char*)&xxx.m_indexVersion) == 0 ||
+      m_eventOp->getPreValue("table_id", (char*)&xxx.m_tableId) == 0 ||
+      m_eventOp->getPreValue("frag_count", (char*)&xxx.m_fragCount) == 0 ||
+      m_eventOp->getPreValue("value_format", (char*)&xxx.m_valueFormat) == 0 ||
+      m_eventOp->getPreValue("sample_version", (char*)&xxx.m_sampleVersion) == 0 ||
+      m_eventOp->getPreValue("load_time", (char*)&xxx.m_loadTime) == 0 ||
+      m_eventOp->getPreValue("sample_count", (char*)&xxx.m_sampleCount) == 0 ||
+      m_eventOp->getPreValue("key_bytes", (char*)&xxx.m_keyBytes) == 0)
+  {
+    setError(m_eventOp->getNdbError().code, __LINE__);
+    return -1;
+  }
+  return 0;
+}
+
+int
+NdbIndexStatImpl::execute_listener(Ndb* ndb)
+{
+  if (m_eventOp == 0)
+  {
+    setError(UsageError, __LINE__);
+    return -1;
+  }
+  if (m_eventOp->execute() == -1)
+  {
+    setError(m_eventOp->getNdbError().code, __LINE__);
+    return -1;
+  }
+  return 0;
+}
+
+int
+NdbIndexStatImpl::poll_listener(Ndb* ndb, int max_wait_ms)
+{
+  int ret;
+  if ((ret = ndb->pollEvents(max_wait_ms)) < 0)
+  {
+    setError(ndb->getNdbError().code, __LINE__);
+    return -1;
+  }
+  return (ret == 0 ? 0 : 1);
+}
+
+int
+NdbIndexStatImpl::next_listener(Ndb* ndb)
+{
+  NdbEventOperation* op = ndb->nextEvent();
+  if (op == 0)
+    return 0;
+
+  Head& head = m_facadeHead;
+  head.m_eventType = (int)op->getEventType();
+  return 1;
+}
+
+int
+NdbIndexStatImpl::drop_listener(Ndb* ndb)
+{
+  if (m_eventOp == 0)
+  {
+    setError(UsageError, __LINE__);
+    return -1;
+  }
+  if (ndb->dropEventOperation(m_eventOp) != 0)
+  {
+    setError(ndb->getNdbError().code, __LINE__);
+    return -1;
+  }
+  m_eventOp = 0;
+  return 0;
+}
+
 // mem alloc - default impl
 
 NdbIndexStatImpl::MemDefault::MemDefault()

=== modified file 'storage/ndb/src/ndbapi/NdbIndexStatImpl.hpp'
--- a/storage/ndb/src/ndbapi/NdbIndexStatImpl.hpp	2011-07-31 17:40:01 +0000
+++ b/storage/ndb/src/ndbapi/NdbIndexStatImpl.hpp	2011-08-11 17:11:30 +0000
@@ -29,6 +29,7 @@ class NdbTransaction;
 class NdbIndexScanOperation;
 class NdbRecAttr;
 class NdbOperation;
+class NdbEventOperation;
 
 extern const uint g_ndb_index_stat_head_frm_len;
 extern const uint8 g_ndb_index_stat_head_frm_data[];
@@ -71,6 +72,7 @@ public:
   Cache* m_cacheClean;
   // mutex for query cache switch, memory barrier would do
   NdbMutex* m_query_mutex;
+  NdbEventOperation* m_eventOp;
   Mem* m_mem_handler;
   NdbIndexStat::Error m_error;
 
@@ -98,6 +100,7 @@ public:
   int get_systables(Sys& sys);
   int create_systables(Ndb* ndb);
   int drop_systables(Ndb* ndb);
+  int check_systables(Sys& sys);
   int check_systables(Ndb* ndb);
 
   // operation context
@@ -279,6 +282,17 @@ public:
   void query_search(const Cache&, const Bound&, StatBound&);
   int query_keycmp(const Cache&, const Bound&, uint pos, Uint32& numEq);
 
+  // events and polling
+  int create_sysevents(Ndb* ndb);
+  int drop_sysevents(Ndb* ndb);
+  int check_sysevents(Ndb* ndb);
+  //
+  int create_listener(Ndb* ndb);
+  int execute_listener(Ndb* ndb);
+  int poll_listener(Ndb* ndb, int max_wait_ms);
+  int next_listener(Ndb* ndb);
+  int drop_listener(Ndb* ndb);
+
   // default memory allocator
   struct MemDefault : public Mem {
     virtual void* mem_alloc(UintPtr bytes);

=== modified file 'storage/ndb/test/ndbapi/testIndexStat.cpp'
--- a/storage/ndb/test/ndbapi/testIndexStat.cpp	2011-08-09 15:36:25 +0000
+++ b/storage/ndb/test/ndbapi/testIndexStat.cpp	2011-08-11 17:11:30 +0000
@@ -134,6 +134,7 @@ static NdbIndexScanOperation* g_rangesca
 
 static NdbIndexStat* g_is = 0;
 static bool g_has_created_stat_tables = false;
+static bool g_has_created_stat_events = false;
 
 static uint
 urandom()
@@ -213,7 +214,7 @@ errdb()
       ll0(++any << " rangescan_op: error " << e);
   }
   if (g_is != 0) {
-    const NdbError& e = g_is->getNdbError();
+    const NdbIndexStat::Error& e = g_is->getNdbError();
     if (e.code != 0)
       ll0(++any << " stat: error " << e);
   }
@@ -1518,6 +1519,40 @@ readstat()
   return 0;
 }
 
+// test polling after updatestat
+
+static int
+startlistener()
+{
+  ll1("startlistener");
+  chkdb(g_is->create_listener(g_ndb_sys) == 0);
+  chkdb(g_is->execute_listener(g_ndb_sys) == 0);
+  return 0;
+}
+
+static int
+runlistener()
+{
+  ll1("runlistener");
+  int ret;
+  chkdb((ret = g_is->poll_listener(g_ndb_sys, 10000)) != -1);
+  chkrc(ret == 1);
+  // one event is expected
+  chkdb((ret = g_is->next_listener(g_ndb_sys)) != -1);
+  chkrc(ret == 1);
+  chkdb((ret = g_is->next_listener(g_ndb_sys)) != -1);
+  chkrc(ret == 0);
+  return 0;
+}
+
+static int
+stoplistener()
+{
+  ll1("stoplistener");
+  chkdb(g_is->drop_listener(g_ndb_sys) != -1);
+  return 0;
+}
+
 // stats queries
 
 // exact stats from scan results
@@ -2081,6 +2116,7 @@ runtest()
   chkrc(createindex() == 0);
   chkrc(createNdbRecords() == 0);
   chkrc(definestat() == 0);
+  chkrc(startlistener() == 0);
 
   for (g_loop = 0; g_opts.loops == 0 || g_loop < g_opts.loops; g_loop++) {
     ll0("=== loop " << g_loop << " ===");
@@ -2094,6 +2130,7 @@ runtest()
     makeranges();
     chkrc(scanranges() == 0);
     chkrc(updatestat() == 0);
+    chkrc(runlistener() == 0);
     chkrc(readstat() == 0);
     chkrc(queryranges() == 0);
     loopstats();
@@ -2101,6 +2138,7 @@ runtest()
   }
   finalstats();
 
+  chkrc(stoplistener() == 0);
   if (!g_opts.keeptable)
     chkrc(droptable() == 0);
   freeranges();
@@ -2234,22 +2272,66 @@ docreate_stat_tables()
 {
   if (g_is->check_systables(g_ndb_sys) == 0)
     return 0;
+  ll1("check_systables: " << g_is->getNdbError());
 
-  if (g_is->create_systables(g_ndb_sys) == 0)
-  {
-    g_has_created_stat_tables = true;
-    return 0;
-  }
-  return -1;
+  ll0("create stat tables");
+  chkdb(g_is->create_systables(g_ndb_sys) == 0);
+  g_has_created_stat_tables = true;
+  return 0;
 }
 
 static
-void
+int
 dodrop_stat_tables()
 {
   if (g_has_created_stat_tables == false)
-    return;
-  g_is->drop_systables(g_ndb_sys);
+    return 0;
+
+  ll0("drop stat tables");
+  chkdb(g_is->drop_systables(g_ndb_sys) == 0);
+  return 0;
+}
+
+static int
+docreate_stat_events()
+{
+  if (g_is->check_sysevents(g_ndb_sys) == 0)
+    return 0;
+  ll1("check_sysevents: " << g_is->getNdbError());
+
+  ll0("create stat events");
+  chkdb(g_is->create_sysevents(g_ndb_sys) == 0);
+  g_has_created_stat_events = true;
+  return 0;
+}
+
+static int
+dodrop_stat_events()
+{
+  if (g_has_created_stat_events == false)
+    return 0;
+
+  ll0("drop stat events");
+  chkdb(g_is->drop_sysevents(g_ndb_sys) == 0);
+  return 0;
+}
+
+static int
+docreate_sys_objects()
+{
+  require(g_is != 0 && g_ndb_sys != 0);
+  chkrc(docreate_stat_tables() == 0);
+  chkrc(docreate_stat_events() == 0);
+  return 0;
+}
+
+static int
+dodrop_sys_objects()
+{
+  require(g_is != 0 && g_ndb_sys != 0);
+  chkrc(dodrop_stat_events() == 0);
+  chkrc(dodrop_stat_tables() == 0);
+  return 0;
 }
 
 int
@@ -2277,17 +2359,22 @@ main(int argc, char** argv)
     ll0("connect failed");
     return NDBT_ProgramExit(NDBT_FAILED);
   }
-  if (docreate_stat_tables() == -1){
-    ll0("failed to create stat tables");
-    return NDBT_ProgramExit(NDBT_FAILED);
+  if (docreate_sys_objects() == -1) {
+    ll0("failed to check or create stat tables and events");
+    goto failed;
   }
   if (runtest() == -1) {
     ll0("test failed");
-    dodrop_stat_tables();
-    dodisconnect();
-    return NDBT_ProgramExit(NDBT_FAILED);
+    goto failed;
+  }
+  if (dodrop_sys_objects() == -1) {
+    ll0("failed to drop created stat tables or events");
+    goto failed;
   }
-  dodrop_stat_tables();
   dodisconnect();
   return NDBT_ProgramExit(NDBT_OK);
+failed:
+  (void)dodrop_sys_objects();
+  dodisconnect();
+  return NDBT_ProgramExit(NDBT_FAILED);
 }

=== modified file 'storage/ndb/tools/ndb_index_stat.cpp'
--- a/storage/ndb/tools/ndb_index_stat.cpp	2011-07-31 17:40:01 +0000
+++ b/storage/ndb/tools/ndb_index_stat.cpp	2011-08-11 17:11:30 +0000
@@ -35,6 +35,8 @@ static my_bool _sys_create = false;
 static my_bool _sys_create_if_not_exist = false;
 static my_bool _sys_create_if_not_valid = false;
 static my_bool _sys_check = false;
+static my_bool _sys_skip_tables = false;
+static my_bool _sys_skip_events = false;
 static int _sys_any = 0;
 // other
 static my_bool _verbose = false;
@@ -382,69 +384,146 @@ dosys()
   {
     if (_sys_drop)
     {
-      g_info << "dropping any sys tables" << endl;
-      CHK2(g_is->drop_systables(g_ndb_sys) == 0, g_is->getNdbError());
-      CHK2(g_is->check_systables(g_ndb_sys) == -1, "unexpected success");
-      CHK2(g_is->getNdbError().code == NdbIndexStat::NoSysTables,
-           "unexpected error: " << g_is->getNdbError());
+      if (!_sys_skip_events)
+      {
+        g_info << "dropping sys events" << endl;
+        CHK2(g_is->drop_sysevents(g_ndb_sys) == 0, g_is->getNdbError());
+        CHK2(g_is->check_sysevents(g_ndb_sys) == -1, "unexpected success");
+        CHK2(g_is->getNdbError().code == NdbIndexStat::NoSysEvents,
+             "unexpected error: " << g_is->getNdbError());
+      }
+
+      if (!_sys_skip_tables)
+      {
+        g_info << "dropping all sys tables" << endl;
+        CHK2(g_is->drop_systables(g_ndb_sys) == 0, g_is->getNdbError());
+        CHK2(g_is->check_systables(g_ndb_sys) == -1, "unexpected success");
+        CHK2(g_is->getNdbError().code == NdbIndexStat::NoSysTables,
+             "unexpected error: " << g_is->getNdbError());
+      }
       g_info << "drop done" << endl;
     }
 
     if (_sys_create)
     {
-      g_info << "creating all sys tables" << endl;
-      CHK2(g_is->create_systables(g_ndb_sys) == 0, g_is->getNdbError());
-      CHK2(g_is->check_systables(g_ndb_sys) == 0, g_is->getNdbError());
-      g_info << "create done" << endl;
-    }
-
-    if (_sys_create_if_not_exist)
-    {
-      if (g_is->check_systables(g_ndb_sys) == -1)
+      if (!_sys_skip_tables)
       {
-        CHK2(g_is->getNdbError().code == NdbIndexStat::NoSysTables,
-             g_is->getNdbError());
         g_info << "creating all sys tables" << endl;
         CHK2(g_is->create_systables(g_ndb_sys) == 0, g_is->getNdbError());
         CHK2(g_is->check_systables(g_ndb_sys) == 0, g_is->getNdbError());
+      }
+
+      if (!_sys_skip_events)
+      {
+        g_info << "creating sys events" << endl;
+        CHK2(g_is->create_sysevents(g_ndb_sys) == 0, g_is->getNdbError());
+        CHK2(g_is->check_sysevents(g_ndb_sys) == 0, g_is->getNdbError());
         g_info << "create done" << endl;
       }
-      else
+    }
+
+    if (_sys_create_if_not_exist)
+    {
+      if (!_sys_skip_tables)
       {
-        g_info << "using existing sys tables" << endl;
+        if (g_is->check_systables(g_ndb_sys) == -1)
+        {
+          CHK2(g_is->getNdbError().code == NdbIndexStat::NoSysTables,
+               g_is->getNdbError());
+          g_info << "creating all sys tables" << endl;
+          CHK2(g_is->create_systables(g_ndb_sys) == 0, g_is->getNdbError());
+          CHK2(g_is->check_systables(g_ndb_sys) == 0, g_is->getNdbError());
+          g_info << "create done" << endl;
+        }
+        else
+        {
+          g_info << "using existing sys tables" << endl;
+        }
+      }
+
+      if (!_sys_skip_events)
+      {
+        if (g_is->check_sysevents(g_ndb_sys) == -1)
+        {
+          CHK2(g_is->getNdbError().code == NdbIndexStat::NoSysEvents,
+               g_is->getNdbError());
+          g_info << "creating sys events" << endl;
+          CHK2(g_is->create_sysevents(g_ndb_sys) == 0, g_is->getNdbError());
+          g_info << "create done" << endl;
+        }
+        else
+        {
+          g_info << "using existing sys events" << endl;
+        }
       }
     }
 
     if (_sys_create_if_not_valid)
     {
-      if (g_is->check_systables(g_ndb_sys) == -1)
+      if (!_sys_skip_tables)
       {
-        if (g_is->getNdbError().code != NdbIndexStat::NoSysTables)
+        if (g_is->check_systables(g_ndb_sys) == -1)
         {
-          CHK2(g_is->getNdbError().code == NdbIndexStat::BadSysTables,
-               g_is->getNdbError());
-          g_info << "dropping invalid sys tables" << endl;
-          CHK2(g_is->drop_systables(g_ndb_sys) == 0, g_is->getNdbError());
-          CHK2(g_is->check_systables(g_ndb_sys) == -1, "unexpected success");
-          CHK2(g_is->getNdbError().code == NdbIndexStat::NoSysTables,
-               "unexpected error: " << g_is->getNdbError());
-          g_info << "drop done" << endl;
+          if (g_is->getNdbError().code != NdbIndexStat::NoSysTables)
+          {
+            CHK2(g_is->getNdbError().code == NdbIndexStat::BadSysTables,
+                 g_is->getNdbError());
+            g_info << "dropping invalid sys tables" << endl;
+            CHK2(g_is->drop_systables(g_ndb_sys) == 0, g_is->getNdbError());
+            CHK2(g_is->check_systables(g_ndb_sys) == -1, "unexpected success");
+            CHK2(g_is->getNdbError().code == NdbIndexStat::NoSysTables,
+                 "unexpected error: " << g_is->getNdbError());
+            g_info << "drop done" << endl;
+          }
+          g_info << "creating all sys tables" << endl;
+          CHK2(g_is->create_systables(g_ndb_sys) == 0, g_is->getNdbError());
+          CHK2(g_is->check_systables(g_ndb_sys) == 0, g_is->getNdbError());
+          g_info << "create done" << endl;
+        }
+        else
+        {
+          g_info << "using existing sys tables" << endl;
         }
-        g_info << "creating all sys tables" << endl;
-        CHK2(g_is->create_systables(g_ndb_sys) == 0, g_is->getNdbError());
-        CHK2(g_is->check_systables(g_ndb_sys) == 0, g_is->getNdbError());
-        g_info << "create done" << endl;
       }
-      else
+      if (!_sys_skip_events)
       {
-        g_info << "using existing sys tables" << endl;
+        if (g_is->check_sysevents(g_ndb_sys) == -1)
+        {
+          if (g_is->getNdbError().code != NdbIndexStat::NoSysEvents)
+          {
+            CHK2(g_is->getNdbError().code == NdbIndexStat::BadSysEvents,
+                 g_is->getNdbError());
+            g_info << "dropping invalid sys events" << endl;
+            CHK2(g_is->drop_sysevents(g_ndb_sys) == 0, g_is->getNdbError());
+            CHK2(g_is->check_sysevents(g_ndb_sys) == -1, "unexpected success");
+            CHK2(g_is->getNdbError().code == NdbIndexStat::NoSysEvents,
+                 "unexpected error: " << g_is->getNdbError());
+            g_info << "drop done" << endl;
+          }
+          g_info << "creating sys events" << endl;
+          CHK2(g_is->create_sysevents(g_ndb_sys) == 0, g_is->getNdbError());
+          CHK2(g_is->check_sysevents(g_ndb_sys) == 0, g_is->getNdbError());
+          g_info << "create done" << endl;
+        }
+        else
+        {
+          g_info << "using existing sys events" << endl;
+        }
       }
     }
 
     if (_sys_check)
     {
-      CHK2(g_is->check_systables(g_ndb_sys) == 0, g_is->getNdbError());
-      g_info << "sys tables ok" << endl;
+      if (!_sys_skip_tables)
+      {
+        CHK2(g_is->check_systables(g_ndb_sys) == 0, g_is->getNdbError());
+        g_info << "sys tables ok" << endl;
+      }
+      if (!_sys_skip_events)
+      {
+        CHK2(g_is->check_sysevents(g_ndb_sys) == 0, g_is->getNdbError());
+        g_info << "sys events ok" << endl;
+      }
     }
   }
   while (0);
@@ -514,25 +593,33 @@ my_long_options[] =
     GET_INT, REQUIRED_ARG, 0, 0, 0, 0, 0, 0 },
   // sys options
   { "sys-drop", ++oi,
-    "Drop any stats tables in NDB kernel (all stats is lost)",
+    "Drop any stats tables and events in NDB kernel (all stats is lost)",
     (uchar **)&_sys_drop, (uchar **)&_sys_drop, 0,
     GET_BOOL, NO_ARG, 0, 0, 0, 0, 0, 0 },
   { "sys-create", ++oi,
-    "Create stats tables in NDB kernel (must not exist)",
+    "Create stats tables and events in NDB kernel (must not exist)",
     (uchar **)&_sys_create, (uchar **)&_sys_create, 0,
     GET_BOOL, NO_ARG, 0, 0, 0, 0, 0, 0 },
   { "sys-create-if-not-exist", ++oi,
-    "Like --sys-create but do nothing if correct stats tables exist",
+    "Like --sys-create but do nothing if correct objects exist",
     (uchar **)&_sys_create_if_not_exist, (uchar **)&_sys_create_if_not_exist, 0,
     GET_BOOL, NO_ARG, 0, 0, 0, 0, 0, 0 },
   { "sys-create-if-not-valid", ++oi,
-    "Like --sys-create-if-not-exist but first drop any invalid tables",
+    "Like --sys-create-if-not-exist but first drop any invalid objects",
     (uchar **)&_sys_create_if_not_valid, (uchar **)&_sys_create_if_not_valid, 0,
     GET_BOOL, NO_ARG, 0, 0, 0, 0, 0, 0 },
   { "sys-check", ++oi,
-    "Check that correct stats tables exist in NDB kernel",
+    "Check that correct stats tables and events exist in NDB kernel",
     (uchar **)&_sys_check, (uchar **)&_sys_check, 0,
     GET_BOOL, NO_ARG, 0, 0, 0, 0, 0, 0 },
+  { "sys-skip-tables", ++oi,
+    "Do not apply sys options to tables",
+    (uchar **)&_sys_skip_tables, (uchar **)&_sys_skip_tables, 0,
+    GET_BOOL, NO_ARG, 0, 0, 0, 0, 0, 0 },
+  { "sys-skip-events", ++oi,
+    "Do not apply sys options to events",
+    (uchar **)&_sys_skip_events, (uchar **)&_sys_skip_events, 0,
+    GET_BOOL, NO_ARG, 0, 0, 0, 0, 0, 0 },
   // other
   { "verbose", 'v',
     "Verbose messages",
@@ -581,7 +668,9 @@ checkopts(int argc, char** argv)
       (_sys_create_if_not_exist != 0) +
       (_sys_create_if_not_valid != 0) +
       (_sys_drop != 0) +
-      ( _sys_check != 0);
+      ( _sys_check != 0) +
+      (_sys_skip_tables != 0) +
+      (_sys_skip_events != 0);
     if (!_sys_any)
     {
       if (_dbname == 0)

No bundle (reason: useless for push emails).
Thread
bzr push into mysql-5.1-telco-7.0-wl4124-new2 branch (pekka.nousiainen:4422to 4423) WL#4124Pekka Nousiainen16 Aug