List:Commits« Previous MessageNext Message »
From:pekka Date:June 21 2006 2:15pm
Subject:bk commit into 5.1 tree (pekka:1.2233) BUG#18102
View as plain text  
Below is the list of changes that have just been committed into a local
5.1 repository of pekka. When pekka does a push these changes will
be propagated to the main repository and, within 24 hours after the
push, to the public repository.
For information on how to access the public repository
see http://dev.mysql.com/doc/mysql/en/installing-source-tree.html

ChangeSet
  1.2233 06/06/21 16:15:04 pekka@stripped +3 -0
  ndb - bug#18102: fixes on NDB API level

  storage/ndb/src/ndbapi/NdbEventOperationImpl.hpp
    1.25 06/06/21 16:13:21 pekka@stripped +1 -3
    fix getGCIEventOperations when merge events is on

  storage/ndb/src/ndbapi/NdbEventOperationImpl.cpp
    1.62 06/06/21 16:13:20 pekka@stripped +21 -23
    fix getGCIEventOperations when merge events is on

  storage/ndb/test/ndbapi/test_event_merge.cpp
    1.12 06/06/21 16:11:52 pekka@stripped +754 -327
    test multiple tables and getGCIEventOperations

# This is a BitKeeper patch.  What follows are the unified diffs for the
# set of deltas contained in the patch.  The rest of the patch, the part
# that BitKeeper cares about, is below these diffs.
# User:	pekka
# Host:	orca.ndb.mysql.com
# Root:	/space/pekka/ndb/version/my51-bug18102

--- 1.61/storage/ndb/src/ndbapi/NdbEventOperationImpl.cpp	2006-06-14 01:19:06 +02:00
+++ 1.62/storage/ndb/src/ndbapi/NdbEventOperationImpl.cpp	2006-06-21 16:13:20 +02:00
@@ -1159,7 +1159,10 @@ NdbEventBuffer::nextEvent()
     NdbEventOperationImpl *op= data->m_event_op;
     DBUG_PRINT_EVENT("info", ("available data=%p op=%p", data, op));
 
-    // blob table ops must not be seen at this level
+    /*
+     * If merge is on, blob part sub-events must not be seen on this level.
+     * If merge is not on, there are no blob part sub-events.
+     */
     assert(op->theMainOp == NULL);
 
     // set NdbEventOperation data
@@ -1175,13 +1178,6 @@ NdbEventBuffer::nextEvent()
     op->m_data_done_count++;
 #endif
 
-    // NUL event is not returned
-    if (data->sdata->operation == NdbDictionary::Event::_TE_NUL)
-    {
-      DBUG_PRINT_EVENT("info", ("skip _TE_NUL"));
-      continue;
-    }
-
     int r= op->receive_event();
     if (r > 0)
     {
@@ -1203,6 +1199,12 @@ NdbEventBuffer::nextEvent()
           gci_ops = m_available_data.next_gci_ops();
         }
         assert(gci_ops && (op->getGCI() == gci_ops->m_gci));
+        // to return TE_NUL it should be made into data event
+        if (data->sdata->operation == NdbDictionary::Event::_TE_NUL)
+        {
+          DBUG_PRINT_EVENT("info", ("skip _TE_NUL"));
+          continue;
+        }
 	DBUG_RETURN_EVENT(op->m_facade);
       }
       // the next event belonged to an event op that is no
@@ -1800,19 +1802,19 @@ NdbEventBuffer::insertDataL(NdbEventOper
     else
     {
       // event with same op, PK found, merge into old buffer
-      Uint32 old_op = data->sdata->operation;
       if (unlikely(merge_data(sdata, ptr, data)))
       {
         op->m_has_error = 3;
         DBUG_RETURN_EVENT(-1);
       }
-      Uint32 new_op = data->sdata->operation;
-
-      // make Gci_ops reflect the merge by delete old and add new
-      EventBufData_list::Gci_op g = { op, (1 << old_op) };
-      // bucket->m_data.del_gci_op(g); // XXX whats wrong? fix later
-      g.event_types = (1 << new_op);
-      bucket->m_data.add_gci_op(g);
+      // merge is on so we do not report blob part events
+      if (! is_blob_event) {
+        // report actual operation, not composite
+        // there is no way to "fix" the flags for a composite op
+        // since the flags represent multiple ops on multiple PKs
+        EventBufData_list::Gci_op g = { op, (1 << sdata->operation) };
+        bucket->m_data.add_gci_op(g);
+      }
     }
     DBUG_RETURN_EVENT(0);
   }
@@ -2381,21 +2383,18 @@ void EventBufData_list::append_list(Even
 }
 
 void
-EventBufData_list::add_gci_op(Gci_op g, bool del)
+EventBufData_list::add_gci_op(Gci_op g)
 {
   DBUG_ENTER_EVENT("EventBufData_list::add_gci_op");
   DBUG_PRINT_EVENT("info", ("p.op: %p  g.event_types: %x", g.op, g.event_types));
-  assert(g.op != NULL);
+  assert(g.op != NULL && g.op->theMainOp == NULL); // as in nextEvent
   Uint32 i;
   for (i = 0; i < m_gci_op_count; i++) {
     if (m_gci_op_list[i].op == g.op)
       break;
   }
   if (i < m_gci_op_count) {
-    if (! del)
-      m_gci_op_list[i].event_types |= g.event_types;
-    else
-      m_gci_op_list[i].event_types &= ~ g.event_types;
+    m_gci_op_list[i].event_types |= g.event_types;
   } else {
     if (m_gci_op_count == m_gci_op_alloc) {
       Uint32 n = 1 + 2 * m_gci_op_alloc;
@@ -2413,7 +2412,6 @@ EventBufData_list::add_gci_op(Gci_op g, 
       m_gci_op_alloc = n;
     }
     assert(m_gci_op_count < m_gci_op_alloc);
-    assert(! del);
 #ifndef DBUG_OFF
     i = m_gci_op_count;
 #endif

--- 1.24/storage/ndb/src/ndbapi/NdbEventOperationImpl.hpp	2006-05-19 15:16:37 +02:00
+++ 1.25/storage/ndb/src/ndbapi/NdbEventOperationImpl.hpp	2006-06-21 16:13:21 +02:00
@@ -145,9 +145,7 @@ public:
   Gci_ops *first_gci_ops();
   Gci_ops *next_gci_ops();
   // case 1 above; add Gci_op to single list
-  void add_gci_op(Gci_op g, bool del = false);
-  // delete bit from existing flags
-  void del_gci_op(Gci_op g) { add_gci_op(g, true); }
+  void add_gci_op(Gci_op g);
 private:
   // case 2 above; move single list or multi list from
   // one list to another

--- 1.11/storage/ndb/test/ndbapi/test_event_merge.cpp	2006-02-01 18:08:08 +01:00
+++ 1.12/storage/ndb/test/ndbapi/test_event_merge.cpp	2006-06-21 16:11:52 +02:00
@@ -36,7 +36,9 @@
  * operation and its post/pre data
  *
  * 2) In event API version >= 5.1 separate commits within same GCI are
- * by default merged.  This is required to read blob data via NdbBlob.
+ * optionally merged.  This is required to read blob data via NdbBlob.
+ *
+ * In this test merge is on by default.
  *
  * Option --separate-events disables GCI merge and implies --no-blobs.
  * This is used to test basic events functionality.
@@ -56,6 +58,10 @@
  * DEL o INS = UPD
  * UPD o DEL = DEL
  * UPD o UPD = UPD
+ *
+ * Event merge in NDB API handles idempotent INS o INS and DEL o DEL
+ * which are possible on NF (node failure).  This test does not handle
+ * them when --separate-events is used.
  */
 
 struct Opts {
@@ -72,6 +78,7 @@ struct Opts {
   my_bool one_blob;
   const char* opstring;
   uint seed;
+  int maxtab;
   my_bool separate_events;
   uint tweak; // whatever's useful
   my_bool use_table;
@@ -79,6 +86,7 @@ struct Opts {
 
 static Opts g_opts;
 static const uint g_maxpk = 1000;
+static const uint g_maxtab = 100;
 static const uint g_maxopstringpart = 100;
 static const char* g_opstringpart[g_maxopstringpart];
 static uint g_opstringparts = 0;
@@ -91,8 +99,6 @@ static NdbTransaction* g_con = 0;
 static NdbOperation* g_op = 0;
 static NdbScanOperation* g_scan_op = 0;
 
-static const char* g_tabname = "tem1";
-static const char* g_evtname = "tem1ev1";
 static const uint g_charlen = 5;
 static const char* g_charval = "abcdefgh";
 static const char* g_csname = "latin1_swedish_ci";
@@ -102,9 +108,6 @@ static uint g_blobpartsize = 2000;
 static uint g_blobstripesize = 2;
 static const uint g_maxblobsize = 100000;
 
-static const NdbDictionary::Table* g_tab = 0;
-static const NdbDictionary::Event* g_evt = 0;
-
 static NdbEventOperation* g_evt_op = 0;
 static NdbBlob* g_bh = 0;
 
@@ -151,6 +154,9 @@ static int& g_loglevel = g_opts.loglevel
 #define ll2(x) \
   do { if (g_loglevel < 2) break; ndbout << x << endl; } while (0)
 
+#define ll3(x) \
+  do { if (g_loglevel < 3) break; ndbout << x << endl; } while (0)
+
 static void
 errdb()
 {
@@ -188,7 +194,7 @@ errdb()
   if (g_bh != 0) {
     const NdbError& e = g_bh->getNdbError();
     if (e.code != 0)
-      ll0(++any << " evt_op: error " << e);
+      ll0(++any << " bh: error " << e);
   }
   if (! any)
     ll0("unknown db error");
@@ -209,7 +215,7 @@ struct Col {
   }
 };
 
-static Col g_col[] = {
+static const Col g_col[] = {
   { 0, "pk1", NdbDictionary::Column::Unsigned, true, false, 1, 4 },
   { 1, "pk2", NdbDictionary::Column::Char, true, false,  g_charlen, g_charlen },
   { 2, "seq", NdbDictionary::Column::Unsigned,  false, false, 1, 4 },
@@ -229,7 +235,7 @@ ncol()
   if (g_opts.no_blobs)
     n -= g_blobcols;
   else if (g_opts.one_blob)
-    n -= (g_blobcols - 1);
+    n -= (g_blobcols - 2);
   return n;
 }
 
@@ -252,17 +258,49 @@ getcol(const char* name)
   return getcol(i);
 }
 
+struct Tab {
+  char tabname[20];
+  const Col* col;
+  const NdbDictionary::Table* tab;
+  char evtname[20];
+  const NdbDictionary::Event* evt;
+  Tab(uint idx) :
+    col(g_col),
+    tab(0),
+    evt(0)
+  {
+    sprintf(tabname, "tem%d", idx);
+    sprintf(evtname, "tem%dev", idx);
+  }
+};
+
+static Tab* g_tablst[g_maxtab];
+
+static uint
+maxtab()
+{
+  return g_opts.maxtab;
+}
+
+static Tab&
+tab(uint i)
+{
+  assert(i < maxtab() && g_tablst[i] != 0);
+  return *g_tablst[i];
+}
+
 static int
-createtable()
+createtable(Tab& t)
 {
-  g_tab = 0;
-  NdbDictionary::Table tab(g_tabname);
+  ll2("createtable: " << t.tabname);
+  t.tab = 0;
+  NdbDictionary::Table tab(t.tabname);
   tab.setLogging(false);
   CHARSET_INFO* cs;
   chkrc((cs = get_charset_by_name(g_csname, MYF(0))) != 0);
   uint i;
   for (i = 0; i < ncol(); i++) {
-    const Col& c = g_col[i];
+    const Col& c = t.col[i];
     NdbDictionary::Column col(c.name);
     col.setType(c.type);
     col.setPrimaryKey(c.pk);
@@ -295,16 +333,16 @@ createtable()
   }
   g_dic = g_ndb->getDictionary();
   if (! g_opts.use_table) {
-    if (g_dic->getTable(g_tabname) != 0)
-      chkdb(g_dic->dropTable(g_tabname) == 0);
+    if (g_dic->getTable(t.tabname) != 0)
+      chkdb(g_dic->dropTable(t.tabname) == 0);
     chkdb(g_dic->createTable(tab) == 0);
   }
-  chkdb((g_tab = g_dic->getTable(g_tabname)) != 0);
+  chkdb((t.tab = g_dic->getTable(t.tabname)) != 0);
   g_dic = 0;
   if (! g_opts.use_table) {
     // extra row for GCI probe
     chkdb((g_con = g_ndb->startTransaction()) != 0);
-    chkdb((g_op = g_con->getNdbOperation(g_tabname)) != 0);
+    chkdb((g_op = g_con->getNdbOperation(t.tabname)) != 0);
     chkdb(g_op->insertTuple() == 0);
     Uint32 pk1;
     char pk2[g_charlen + 1];
@@ -321,17 +359,94 @@ createtable()
 }
 
 static int
-droptable()
+createtable()
+{
+  ll1("createtable");
+  for (uint i = 0; i < maxtab(); i++)
+    chkrc(createtable(tab(i)) == 0);
+  return 0;
+}
+
+static int
+droptable(Tab& t)
 {
+  ll2("droptable: " << t.tabname);
   if (! g_opts.use_table) {
     g_dic = g_ndb->getDictionary();
-    chkdb(g_dic->dropTable(g_tab->getName()) == 0);
-    g_tab = 0;
+    chkdb(g_dic->dropTable(t.tabname) == 0);
+    t.tab = 0;
     g_dic = 0;
   }
   return 0;
 }
 
+static int
+droptable()
+{
+  ll1("droptable");
+  for (uint i = 0; i < maxtab(); i++)
+    chkrc(droptable(tab(i)) == 0);
+  return 0;
+}
+
+static int
+createevent(Tab& t)
+{
+  ll2("createevent: " << t.evtname);
+  t.evt = 0;
+  g_dic = g_ndb->getDictionary();
+  NdbDictionary::Event evt(t.evtname);
+  assert(t.tab != 0);
+  evt.setTable(*t.tab);
+  evt.addTableEvent(NdbDictionary::Event::TE_ALL);
+  uint i;
+  for (i = 0; i < ncol(); i++) {
+    const Col& c = g_col[i];
+    evt.addEventColumn(c.name);
+  }
+  evt.setReport(NdbDictionary::Event::ER_UPDATED);
+  evt.mergeEvents(! g_opts.separate_events);
+#if 0 // XXX random bugs
+  if (g_dic->getEvent(t.evtname) != 0)
+    chkdb(g_dic->dropEvent(t.evtname) == 0);
+#else
+  (void)g_dic->dropEvent(t.evtname);
+  chkdb(g_dic->createEvent(evt) == 0);
+#endif
+  chkdb((t.evt = g_dic->getEvent(t.evtname)) != 0);
+  g_dic = 0;
+  return 0;
+}
+
+static int
+createevent()
+{
+  ll1("createevent");
+  for (uint i = 0; i < maxtab(); i++)
+    chkrc(createevent(tab(i)) == 0);
+  return 0;
+}
+
+static int
+dropevent(Tab& t, bool force = false)
+{
+  ll2("dropevent: " << t.evtname);
+  g_dic = g_ndb->getDictionary();
+  chkdb(g_dic->dropEvent(t.evtname) == 0 || force);
+  t.evt = 0;
+  g_dic = 0;
+  return 0;
+}
+
+static int
+dropevent(bool force = false)
+{
+  ll1("dropevent");
+  for (uint i = 0; i < maxtab(); i++)
+    chkrc(dropevent(tab(i), force) == 0 || force);
+  return 0;
+}
+
 struct Data {
   struct Txt { char* val; uint len; };
   union Ptr { Uint32* u32; char* ch; Txt* txt; void* v; };
@@ -445,15 +560,15 @@ operator<<(NdbOut& out, const Data& d)
     case NdbDictionary::Column::Text:
     case NdbDictionary::Column::Blob:
       {
-        Data::Txt& t = *d.ptr[i].txt;
+        Data::Txt& txt = *d.ptr[i].txt;
         bool first = true;
         uint j = 0;
-        while (j < t.len) {
+        while (j < txt.len) {
           char c[2];
-          c[0] = t.val[j++];
+          c[0] = txt.val[j++];
           c[1] = 0;
           uint m = 1;
-          while (j < t.len && t.val[j] == c[0])
+          while (j < txt.len && txt.val[j] == c[0])
             j++, m++;
           if (! first)
             out << "+";
@@ -471,10 +586,11 @@ operator<<(NdbOut& out, const Data& d)
 }
 
 // some random os may define these
-#undef NUL
+#undef UNDEF
 #undef INS
 #undef DEL
 #undef UPD
+#undef NUL
 
 static const uint g_optypes = 3; // real ops 0-2
 
@@ -485,7 +601,7 @@ static const uint g_optypes = 3; // real
  */
 struct Op { // single or composite
   enum Kind { OP = 1, EV = 2 };
-  enum Type { NUL = -1, INS, DEL, UPD };
+  enum Type { UNDEF = -1, INS, DEL, UPD, NUL };
   Kind kind;
   Type type;
   Op* next_op; // within one commit
@@ -499,10 +615,10 @@ struct Op { // single or composite
   Data data[2]; // 0-post 1-pre
   bool match; // matched to event
   Uint32 gci; // defined for com op and event
-  void init(Kind a_kind) {
+  void init(Kind a_kind, Type a_type = UNDEF) {
     kind = a_kind;
     assert(kind == OP || kind == EV);
-    type = NUL;
+    type = a_type;
     next_op = next_com = next_gci = next_ev = next_free = 0;
     free = false;
     num_op = num_com = 0;
@@ -518,12 +634,9 @@ struct Op { // single or composite
 };
 
 static NdbOut&
-operator<<(NdbOut& out, Op::Type t)
+operator<<(NdbOut& out, Op::Type optype)
 {
-  switch (t) {
-  case Op::NUL:
-    out << "NUL";
-    break;
+  switch (optype) {
   case Op::INS:
     out << "INS";
     break;
@@ -533,8 +646,11 @@ operator<<(NdbOut& out, Op::Type t)
   case Op::UPD:
     out << "UPD";
     break;
+  case Op::NUL:
+    out << "NUL";
+    break;
   default:
-    out << (int)t;
+    out << (int)optype;
     break;
   }
   return out;
@@ -554,41 +670,149 @@ operator<<(NdbOut& out, const Op& op)
 static int
 seteventtype(Op* ev, NdbDictionary::Event::TableEvent te)
 {
-  Op::Type t = Op::NUL;
+  Op::Type optype = Op::UNDEF;
   switch (te) {
   case NdbDictionary::Event::TE_INSERT:
-    t = Op::INS;
+    optype = Op::INS;
     break;
   case NdbDictionary::Event::TE_DELETE:
-    t = Op::DEL;
+    optype = Op::DEL;
     break;
   case NdbDictionary::Event::TE_UPDATE:
-    t = Op::UPD;
+    optype = Op::UPD;
     break;
   default:
-    ll0("EVT: " << *ev << ": bad event type" << (int)te);
+    ll0("EVT: " << *ev << ": bad event type " << hex << (uint)te);
     return -1;
   }
-  ev->type = t;
+  ev->type = optype;
   return 0;
 }
 
+struct Counter { // debug aid
+  const char* name;
+  uint count;
+  Counter(const char* a_name) : name(a_name), count(0) {
+  }
+  friend class NdbOut& operator<<(NdbOut& out, const Counter& counter) {
+    out << counter.name << "(" << counter.count << ")";
+    return out;
+  }
+  operator uint() {
+    return count;
+  }
+  Counter operator ++(int) {
+    ll3(*this << "++");
+    Counter tmp = *this;
+    count++;
+    return tmp;
+  }
+  Counter operator --(int) {
+    ll3(*this << "--");
+    assert(count != 0);
+    Counter tmp = *this;
+    count--;
+    return tmp;
+  }
+};
+
 static Op* g_opfree = 0;
 static uint g_freeops = 0;
 static uint g_usedops = 0;
+static uint g_gciops = 0;
 static uint g_maxcom = 10; // max ops per commit
-static Op* g_pk_op[g_maxpk];
-static Op* g_pk_ev[g_maxpk];
 static uint g_seq = 0;
-static NdbRecAttr* g_ev_ra[2][g_maxcol]; // 0-post 1-pre
-static NdbBlob* g_ev_bh[2][g_maxcol]; // 0-post 1-pre
 static Op* g_rec_ev;
-static uint g_ev_pos[g_maxpk];
-static uint g_num_gci = 0;
 static uint g_num_ev = 0;
 
+static const uint g_maxgcis = 500; // max GCIs seen during 1 loop
+
+// operation data per table and each loop
+struct Run : public Tab {
+  bool skip; // no ops in current loop
+  NdbEventOperation* evt_op;
+  uint gcicnt; // number of CGIs seen in current loop
+  Uint64 gcinum[g_maxgcis];
+  Uint32 gcievtypes[g_maxgcis][2]; // 0-getGCIEventOperations 1-nextEvent
+  uint tableops; // real table ops in this loop
+  uint blobops; // approx blob part ops in this loop
+  uint gciops; // commit chains or (after mergeops) gci chains
+  Op* pk_op[g_maxpk]; // GCI chain of ops per PK
+  Op* pk_ev[g_maxpk]; // events per PK
+  uint ev_pos[g_maxpk]; // counts events
+  NdbRecAttr* ev_ra[2][g_maxcol]; // 0-post 1-pre
+  NdbBlob* ev_bh[2][g_maxcol]; // 0-post 1-pre
+  Run(uint idx) :
+    Tab(idx)
+  {
+    reset();
+  }
+  void reset()
+  {
+    int i, j;
+    skip = false;
+    evt_op = 0;
+    gcicnt = 0;
+    for (i = 0; i < g_maxgcis; i++) {
+      gcinum[i] = (Uint64)0;
+      gcievtypes[i][0] = gcievtypes[i][1] = (Uint32)0;
+    }
+    tableops = 0;
+    blobops = 0;
+    gciops = 0;
+    for (i = 0; i < g_maxpk; i++) {
+      pk_op[i] = 0;
+      pk_ev[i] = 0;
+      ev_pos[i] = 0;
+    }
+    for (j = 0; i < 2; j ++) {
+      for (i = 0; i < g_maxcol; i++) {
+        ev_ra[j][i] = 0;
+        ev_bh[j][i] = 0;
+      }
+    }
+  }
+  int addgci(Uint64 gci)
+  {
+    assert(gcicnt < g_maxgcis);
+    chkrc(gcicnt == 0 || gcinum[gcicnt - 1] < gci);
+    gcinum[gcicnt++] = gci;
+    return 0;
+  }
+  void addevtypes(Uint64 gci, Uint32 evtypes, uint i)
+  {
+    assert(gcicnt != 0 && gci == gcinum[gcicnt - 1]);
+    assert(evtypes != 0);
+    assert(i < 2);
+    gcievtypes[gcicnt - 1][i] |= evtypes;
+  }
+};
+
+static Run* g_runlst[g_maxtab];
+
+static uint
+maxrun()
+{
+  return maxtab();
+}
+
+static Run&
+run(uint i)
+{
+  assert(i < maxrun() && g_runlst[i] != 0);
+  return *g_runlst[i];
+}
+
+static void
+initrun()
+{
+  uint i;
+  for (i = 0; i < maxrun(); i++)
+    g_tablst[i] = g_runlst[i] = new Run(i);
+}
+
 static Op*
-getop(Op::Kind a_kind)
+getop(Op::Kind a_kind, Op::Type a_type = Op::UNDEF)
 {
   if (g_opfree == 0) {
     assert(g_freeops == 0);
@@ -604,14 +828,16 @@ getop(Op::Kind a_kind)
   assert(g_freeops != 0);
   g_freeops--;
   g_usedops++;
-  op->init(a_kind);
+  op->init(a_kind, a_type);
   op->free = false;
+  ll3("getop: " << op);
   return op;
 }
 
 static void
 freeop(Op* op)
 {
+  ll3("freeop: " << op);
   assert(! op->free);
   op->freemem();
   op->free = true;
@@ -623,26 +849,16 @@ freeop(Op* op)
 }
 
 static void
-resetmem()
+resetmem(Run& r)
 {
-  int i, j;
-  for (j = 0; j < 2; j++) {
-    for (i = 0; i < g_maxcol; i++) {
-      g_ev_ra[j][i] = 0;
-      g_ev_bh[j][i] = 0;
-    }
-  }
-  if (g_rec_ev != 0) {
-    freeop(g_rec_ev);
-    g_rec_ev = 0;
-  }
+  ll2("resetmem");
   Uint32 pk1;
   for (pk1 = 0; pk1 < g_opts.maxpk; pk1++)
-    g_ev_pos[pk1] = 0;
+    r.ev_pos[pk1] = 0;
   // leave g_seq
   for (pk1 = 0; pk1 < g_opts.maxpk; pk1++) {
-    if (g_pk_op[pk1] != 0) {
-      Op* tot_op = g_pk_op[pk1];
+    if (r.pk_op[pk1] != 0) {
+      Op* tot_op = r.pk_op[pk1];
       while (tot_op->next_gci != 0) {
         Op* gci_op = tot_op->next_gci;
         while (gci_op->next_com != 0) {
@@ -659,21 +875,33 @@ resetmem()
         freeop(gci_op);
       }
       freeop(tot_op);
-      g_pk_op[pk1] = 0;
+      r.pk_op[pk1] = 0;
     }
-    if (g_pk_ev[pk1] != 0) {
-      Op* tot_op = g_pk_ev[pk1];
+    if (r.pk_ev[pk1] != 0) {
+      Op* tot_op = r.pk_ev[pk1];
       while (tot_op->next_ev != 0) {
         Op* ev = tot_op->next_ev;
         tot_op->next_ev = ev->next_ev;
         freeop(ev);
       }
       freeop(tot_op);
-      g_pk_ev[pk1] = 0;
+      r.pk_ev[pk1] = 0;
     }
   }
+  r.reset();
+}
+
+static void
+resetmem()
+{
+  if (g_rec_ev != 0) {
+    freeop(g_rec_ev);
+    g_rec_ev = 0;
+  }
+  for (uint i = 0; i < maxrun(); i++)
+    resetmem(run(i));
   assert(g_usedops == 0);
-  g_num_gci = g_num_ev = 0;
+  g_gciops = g_num_ev = 0;
 }
 
 static void
@@ -706,10 +934,11 @@ static const uint g_ncomp = sizeof(g_com
 static int
 checkop(const Op* op, Uint32& pk1)
 {
-  Op::Type t = op->type;
-  if (t == Op::NUL)
+  Op::Type optype = op->type;
+  assert(optype != Op::UNDEF);
+  if (optype == Op::NUL)
     return 0;
-  chkrc(t == Op::INS || t == Op::DEL || t == Op::UPD);
+  chkrc(optype == Op::INS || optype == Op::DEL || optype == Op::UPD);
   const Data& d0 = op->data[0];
   const Data& d1 = op->data[1];
   {
@@ -726,19 +955,19 @@ checkop(const Op* op, Uint32& pk1)
     // the rules are the rules..
     if (c.pk) {
       chkrc(ind0 == 0); // always PK in post data
-      if (t == Op::INS)
+      if (optype == Op::INS)
         chkrc(ind1 == -1);
-      if (t == Op::DEL)
+      if (optype == Op::DEL)
         chkrc(ind1 == -1); // no PK in pre data
-      if (t == Op::UPD)
+      if (optype == Op::UPD)
         chkrc(ind1 == 0);
     }
     if (! c.pk) {
-      if (t == Op::INS)
+      if (optype == Op::INS)
         chkrc(ind0 >= 0 && ind1 == -1);
-      if (t == Op::DEL)
+      if (optype == Op::DEL)
         chkrc(ind0 == -1 && ind1 >= 0); // always non-PK in pre data
-      if (t == Op::UPD)
+      if (optype == Op::UPD)
         chkrc(ind0 == -1 || ind1 >= 0); // update must have pre data
     }
     if (! c.nullable) {
@@ -750,10 +979,10 @@ checkop(const Op* op, Uint32& pk1)
       for (j = 0; j < 2; j++) {
         const Data& d = op->data[j];
         if (d.ind[i] == 0) {
-          const Data::Txt& t = *d.ptr[i].txt;
+          const Data::Txt& txt = *d.ptr[i].txt;
           int k;
-          for (k = 0; k < t.len; k++) {
-            chkrc(strchr(g_charval, t.val[k]) != 0);
+          for (k = 0; k < txt.len; k++) {
+            chkrc(strchr(g_charval, txt.val[k]) != 0);
           }
         }
       }
@@ -837,6 +1066,7 @@ copyop(const Op* op1, Op* op3)
 static int
 compop(const Op* op1, const Op* op2, Op* op3) // op1 o op2 = op3
 {
+  assert(op1->type != Op::UNDEF && op2->type != Op::UNDEF);
   Comp* comp;
   if (op2->type == Op::NUL) {
     copyop(op1, op3);
@@ -882,67 +1112,69 @@ compop(const Op* op1, const Op* op2, Op*
 }
 
 static int
-createevent()
+createeventop(Run& r)
 {
-  ll1("createevent");
-  g_evt = 0;
-  g_dic = g_ndb->getDictionary();
-  NdbDictionary::Event evt(g_evtname);
-  evt.setTable(*g_tab);
-  evt.addTableEvent(NdbDictionary::Event::TE_ALL);
+  ll2("createeventop: " << r.tabname);
+  chkdb((r.evt_op = g_ndb->createEventOperation(r.evtname)) != 0);
+  r.evt_op->mergeEvents(! g_opts.separate_events); // not yet inherited
   uint i;
   for (i = 0; i < ncol(); i++) {
     const Col& c = g_col[i];
-    evt.addEventColumn(c.name);
+    Data (&d)[2] = g_rec_ev->data;
+    if (! c.isblob()) {
+      chkdb((r.ev_ra[0][i] = r.evt_op->getValue(c.name, (char*)d[0].ptr[i].v)) != 0);
+      chkdb((r.ev_ra[1][i] = r.evt_op->getPreValue(c.name, (char*)d[1].ptr[i].v)) != 0);
+    } else {
+      chkdb((r.ev_bh[0][i] = r.evt_op->getBlobHandle(c.name)) != 0);
+      chkdb((r.ev_bh[1][i] = r.evt_op->getPreBlobHandle(c.name)) != 0);
+    }
   }
-  evt.setReport(NdbDictionary::Event::ER_UPDATED);
-  evt.mergeEvents(! g_opts.separate_events);
-  if (g_dic->getEvent(evt.getName()) != 0)
-    chkdb(g_dic->dropEvent(evt.getName()) == 0);
-  chkdb(g_dic->createEvent(evt) == 0);
-  chkdb((g_evt = g_dic->getEvent(evt.getName())) != 0);
-  g_dic = 0;
   return 0;
 }
 
 static int
-dropevent()
+createeventop()
 {
-  ll1("dropevent");
-  g_dic = g_ndb->getDictionary();
-  chkdb(g_dic->dropEvent(g_evt->getName()) == 0);
-  g_evt = 0;
-  g_dic = 0;
+  ll1("createeventop");
+  for (uint i = 0; i < maxrun(); i++)
+    chkrc(createeventop(run(i)) == 0);
   return 0;
 }
 
 static int
-createeventop()
+executeeventop(Run& r)
 {
-  ll1("createeventop");
-  chkdb((g_evt_op = g_ndb->createEventOperation(g_evt->getName())) != 0);
-  g_evt_op->mergeEvents(! g_opts.separate_events); // not yet inherited
-  uint i;
-  for (i = 0; i < ncol(); i++) {
-    const Col& c = g_col[i];
-    Data (&d)[2] = g_rec_ev->data;
-    if (! c.isblob()) {
-      chkdb((g_ev_ra[0][i] = g_evt_op->getValue(c.name, (char*)d[0].ptr[i].v)) != 0);
-      chkdb((g_ev_ra[1][i] = g_evt_op->getPreValue(c.name, (char*)d[1].ptr[i].v)) != 0);
-    } else {
-      chkdb((g_ev_bh[0][i] = g_evt_op->getBlobHandle(c.name)) != 0);
-      chkdb((g_ev_bh[1][i] = g_evt_op->getPreBlobHandle(c.name)) != 0);
-    }
+  ll2("executeeventop: " << r.tabname);
+  chkdb(r.evt_op->execute() == 0);
+  return 0;
+}
+
+static int
+executeeventop()
+{
+  ll1("executeeventop");
+  for (uint i = 0; i < maxrun(); i++)
+    chkrc(executeeventop(run(i)) == 0);
+  return 0;
+}
+
+static int
+dropeventop(Run& r, bool force = false)
+{
+  ll2("dropeventop: " << r.tabname);
+  if (r.evt_op != 0) {
+    chkdb(g_ndb->dropEventOperation(r.evt_op) == 0 || force);
+    r.evt_op = 0;
   }
   return 0;
 }
 
 static int
-dropeventop()
+dropeventops(bool force = false)
 {
-  ll1("dropeventop");
-  chkdb(g_ndb->dropEventOperation(g_evt_op) == 0);
-  g_evt_op = 0;
+  ll1("dropeventops");
+  for (uint i = 0; i < maxrun(); i++)
+    chkrc(dropeventop(run(i), force) == 0 || force);
   return 0;
 }
 
@@ -956,11 +1188,12 @@ waitgci(uint ngci)
   while (1) {
     chkdb((g_con = g_ndb->startTransaction()) != 0);
     { // forced to exec a dummy op
+      Tab& t = tab(0); // use first table
       Uint32 pk1;
       char pk2[g_charlen + 1];
       pk1 = g_maxpk;
       sprintf(pk2, "%-*u", g_charlen, pk1);
-      chkdb((g_op = g_con->getNdbOperation(g_tabname)) != 0);
+      chkdb((g_op = g_con->getNdbOperation(t.tabname)) != 0);
       chkdb(g_op->readTuple() == 0);
       chkdb(g_op->equal("pk1", (char*)&pk1) == 0);
       chkdb(g_op->equal("pk2", (char*)&pk2[0]) == 0);
@@ -982,14 +1215,15 @@ waitgci(uint ngci)
 
 // scan table and set current tot_op for each pk1
 static int
-scantab()
+scantable(Run& r)
 {
+  ll2("scantable: " << r.tabname);
   NdbRecAttr* ra[g_maxcol];
   NdbBlob* bh[g_maxcol];
   Op* rec_op = getop(Op::OP);
   Data& d0 = rec_op->data[0];
   chkdb((g_con = g_ndb->startTransaction()) != 0);
-  chkdb((g_scan_op = g_con->getNdbScanOperation(g_tabname)) != 0);
+  chkdb((g_scan_op = g_con->getNdbScanOperation(r.tabname)) != 0);
   chkdb(g_scan_op->readTuples() == 0);
   uint i;
   for (i = 0; i < ncol(); i++) {
@@ -1017,27 +1251,27 @@ scantab()
         ret = bh[i]->getDefined(ind);
         assert(ret == 0);
         if (ind == 0) {
-          Data::Txt& t = *d0.ptr[i].txt;
+          Data::Txt& txt = *d0.ptr[i].txt;
           Uint64 len64;
           ret = bh[i]->getLength(len64);
           assert(ret == 0);
-          t.len = (uint)len64;
-          delete [] t.val;
-          t.val = new char [t.len];
-          memset(t.val, 'X', t.len);
-          Uint32 len = t.len;
-          ret = bh[i]->readData(t.val, len);
-          assert(ret == 0 && len == t.len);
+          txt.len = (uint)len64;
+          delete [] txt.val;
+          txt.val = new char [txt.len];
+          memset(txt.val, 'X', txt.len);
+          Uint32 len = txt.len;
+          ret = bh[i]->readData(txt.val, len);
+          assert(ret == 0 && len == txt.len);
           // to see the data, have to execute...
           chkdb(g_con->execute(NoCommit) == 0);
-          assert(memchr(t.val, 'X', t.len) == 0);
+          assert(memchr(txt.val, 'X', txt.len) == 0);
         }
       }
       assert(ind >= 0);
       d0.ind[i] = ind;
     }
-    assert(g_pk_op[pk1] == 0);
-    Op* tot_op = g_pk_op[pk1] = getop(Op::OP);
+    assert(r.pk_op[pk1] == 0);
+    Op* tot_op = r.pk_op[pk1] = getop(Op::OP);
     copyop(rec_op, tot_op);
     tot_op->type = Op::INS;
   }
@@ -1049,8 +1283,17 @@ scantab()
   return 0;
 }
 
+static int
+scantable()
+{
+  ll1("scantable");
+  for (uint i = 0; i < maxrun(); i++)
+    chkrc(scantable(run(i)) == 0);
+  return 0;
+}
+
 static void
-makedata(const Col& c, Data& d, Uint32 pk1, Op::Type t)
+makedata(const Col& c, Data& d, Uint32 pk1, Op::Type optype)
 {
   uint i = c.no;
   if (c.pk) {
@@ -1072,15 +1315,15 @@ makedata(const Col& c, Data& d, Uint32 p
       break;
     }
     d.ind[i] = 0;
-  } else if (t == Op::DEL) {
+  } else if (optype == Op::DEL) {
     ;
   } else if (i == getcol("seq").no) {
     d.seq = g_seq++;
     d.ind[i] = 0;
-  } else if (t == Op::INS && ! g_opts.no_implicit_nulls && c.nullable && urandom(10, 100)) {
+  } else if (optype == Op::INS && ! g_opts.no_implicit_nulls && c.nullable && urandom(10, 100)) {
     d.noop |= (1 << i);
     d.ind[i] = 1; // implicit NULL value is known
-  } else if (t == Op::UPD && ! g_opts.no_missing_update && urandom(10, 100)) {
+  } else if (optype == Op::UPD && ! g_opts.no_missing_update && urandom(10, 100)) {
     d.noop |= (1 << i);
     d.ind[i] = -1; // fixed up in caller
   } else if (! g_opts.no_nulls && c.nullable && urandom(10, 100)) {
@@ -1111,22 +1354,22 @@ makedata(const Col& c, Data& d, Uint32 p
     case NdbDictionary::Column::Blob:
       {
         const bool tinyblob = (c.type == NdbDictionary::Column::Blob);
-        Data::Txt& t = *d.ptr[i].txt;
-        delete [] t.val;
-        t.val = 0;
+        Data::Txt& txt = *d.ptr[i].txt;
+        delete [] txt.val;
+        txt.val = 0;
         if (g_opts.tweak & 1) {
           uint u = g_blobinlinesize + (tinyblob ? 0 : g_blobpartsize);
           uint v = (g_opts.tweak & 2) ? 0 : urandom(strlen(g_charval));
-          t.val = new char [u];
-          t.len = u;
-          memset(t.val, g_charval[v], u);
+          txt.val = new char [u];
+          txt.len = u;
+          memset(txt.val, g_charval[v], u);
           break;
         }
         uint u = urandom(tinyblob ? g_blobinlinesize : g_maxblobsize);
         u = urandom(u); // 4x bias for smaller blobs
         u = urandom(u);
-        t.val = new char [u];
-        t.len = u;
+        txt.val = new char [u];
+        txt.len = u;
         uint j = 0;
         while (j < u) {
           assert(u > 0);
@@ -1134,7 +1377,7 @@ makedata(const Col& c, Data& d, Uint32 p
           if (k > u - j)
             k = u - j;
           uint v = urandom(strlen(g_charval));
-          memset(&t.val[j], g_charval[v], k);
+          memset(&txt.val[j], g_charval[v], k);
           j += k;
         }
       }
@@ -1148,25 +1391,25 @@ makedata(const Col& c, Data& d, Uint32 p
 }
 
 static void
-makeop(const Op* prev_op, Op* op, Uint32 pk1, Op::Type t)
+makeop(const Op* prev_op, Op* op, Uint32 pk1, Op::Type optype)
 {
-  op->type = t;
+  op->type = optype;
   const Data& dp = prev_op->data[0];
   Data& d0 = op->data[0];
   Data& d1 = op->data[1];
   uint i;
   for (i = 0; i < ncol(); i++) {
     const Col& c = getcol(i);
-    makedata(c, d0, pk1, t);
-    if (t == Op::INS) {
+    makedata(c, d0, pk1, optype);
+    if (optype == Op::INS) {
       d1.ind[i] = -1;
-    } else if (t == Op::DEL) {
+    } else if (optype == Op::DEL) {
       assert(dp.ind[i] >= 0);
       if (c.pk)
         d1.ind[i] = -1;
       else
         copycol(c, dp, d1);
-    } else if (t == Op::UPD) {
+    } else if (optype == Op::UPD) {
       assert(dp.ind[i] >= 0);
       if (d0.ind[i] == -1) // not updating this col
         copycol(c, dp, d0); // must keep track of data
@@ -1180,14 +1423,30 @@ makeop(const Op* prev_op, Op* op, Uint32
   reqrc(pk1 == pk1_tmp);
 }
 
+static uint
+approxblobops(Op* op)
+{
+  uint avg_blob_size = g_maxblobsize / 4; // see makedata()
+  uint avg_blob_ops = avg_blob_size / 2000;
+  uint n = 0;
+  if (! g_opts.no_blobs) {
+    n += avg_blob_ops;
+    if (! g_opts.one_blob)
+      n += avg_blob_ops;
+    if (op->type == Op::UPD)
+      n *= 2;
+  }
+  return n;
+}
+
 static void
-makeops()
+makeops(Run& r)
 {
-  ll1("makeops");
+  ll1("makeops: " << r.tabname);
   Uint32 pk1 = 0;
   while (1) {
     if (g_opts.opstring == 0) {
-      if (g_usedops >= g_opts.maxops) // use up ops
+      if (r.tableops + r.blobops >= g_opts.maxops) // use up ops
         break;
       pk1 = urandom(g_opts.maxpk);
     } else {
@@ -1197,17 +1456,17 @@ makeops()
     ll2("makeops: pk1=" << pk1);
     // total op on the pk so far
     // optype either NUL=initial/deleted or INS=created
-    Op* tot_op = g_pk_op[pk1];
+    Op* tot_op = r.pk_op[pk1];
     if (tot_op == 0)
-      tot_op = g_pk_op[pk1] = getop(Op::OP);
+      tot_op = r.pk_op[pk1] = getop(Op::OP, Op::NUL);
     assert(tot_op->type == Op::NUL || tot_op->type == Op::INS);
     // add new commit chain to end
     Op* last_gci = tot_op;
     while (last_gci->next_gci != 0)
       last_gci = last_gci->next_gci;
-    Op* gci_op = getop(Op::OP);
+    Op* gci_op = getop(Op::OP, Op::NUL);
     last_gci->next_gci = gci_op;
-    Op* com_op = getop(Op::OP);
+    Op* com_op = getop(Op::OP, Op::NUL);
     gci_op->next_com = com_op;
     // length of random chain
     uint len = ~0;
@@ -1215,18 +1474,18 @@ makeops()
       len = 1 + urandom(g_maxcom - 1);
       len = 1 + urandom(len - 1); // 2x bias for short chain
     }
-    ll2("makeops: com chain");
     uint n = 0;
     while (1) {
       // random or from current g_opts.opstring part
-      Op::Type t;
+      Op::Type optype;
       if (g_opts.opstring == 0) {
         if (n == len)
           break;
         do {
-          t = (Op::Type)urandom(g_optypes);
-        } while (tot_op->type == Op::NUL && (t == Op::DEL || t == Op::UPD) ||
-                 tot_op->type == Op::INS && t == Op::INS);
+          optype = (Op::Type)urandom(g_optypes);
+        } while (tot_op->type == Op::NUL &&
+                 (optype == Op::DEL || optype == Op::UPD) ||
+                 tot_op->type == Op::INS && optype == Op::INS);
       } else {
         const char* str = g_opstringpart[g_loop % g_opstringparts];
         uint m = strlen(str);
@@ -1241,10 +1500,12 @@ makeops()
         const char* p = "idu";
         const char* q = strchr(p, c);
         assert(q != 0);
-        t = (Op::Type)(q - p);
+        optype = (Op::Type)(q - p);
       }
       Op* op = getop(Op::OP);
-      makeop(tot_op, op, pk1, t);
+      makeop(tot_op, op, pk1, optype);
+      r.tableops++;
+      r.blobops += approxblobops(op);
       // add to end
       Op* last_op = com_op;
       while (last_op->next_op != 0)
@@ -1262,15 +1523,40 @@ makeops()
     // copy to gci level
     copyop(com_op, gci_op);
     tot_op->num_com += 1;
-    g_num_gci += 1;
+    r.gciops += 1;
+    g_gciops += 1;
   }
-  ll1("makeops: used ops = " << g_usedops << " com ops = " << g_num_gci);
+  ll1("makeops: " << r.tabname << ": com recs = " << r.gciops);
+}
+
+static void
+selecttables()
+{
+  uint i;
+  for (i = 0; i < maxrun(); i++)
+    run(i).skip = false;
+  for (i = 0; i + 1 < maxrun(); i++)
+    run(urandom(maxrun())).skip = true;
+  uint cnt = 0;
+  for (i = 0; i < maxrun(); i++)
+    cnt += ! run(i).skip;
+  ll1("use " << cnt << "/" << maxrun() << " tables in this loop");
+}
+
+static void
+makeops()
+{
+  selecttables();
+  for (uint i = 0; i < maxrun(); i++)
+    if (! run(i).skip)
+      makeops(run(i));
+  ll1("makeops: used recs = " << g_usedops << " com recs = " << g_gciops);
 }
 
 static int
-addndbop(Op* op)
+addndbop(Run& r, Op* op)
 {
-  chkdb((g_op = g_con->getNdbOperation(g_tabname)) != 0);
+  chkdb((g_op = g_con->getNdbOperation(r.tabname)) != 0);
   switch (op->type) {
   case Op::INS:
     chkdb(g_op->insertTuple() == 0);
@@ -1308,10 +1594,10 @@ addndbop(Op* op)
         else
           chkdb(g_op->setValue(c.name, (const char*)0) == 0);
       } else {
-        const Data::Txt& t = *d.ptr[i].txt;
+        const Data::Txt& txt = *d.ptr[i].txt;
         g_bh = g_op->getBlobHandle(c.name);
         if (d.ind[i] == 0)
-          chkdb(g_bh->setValue(t.val, t.len) == 0);
+          chkdb(g_bh->setValue(txt.val, txt.len) == 0);
         else
           chkdb(g_bh->setValue(0, 0) == 0);
         g_bh = 0;
@@ -1326,48 +1612,54 @@ static int
 runops()
 {
   ll1("runops");
+  Op* gci_op[g_maxtab][g_maxpk];
+  uint left = 0; // number of table pks with ops
+  int i;
   Uint32 pk1;
-  Op* gci_op[g_maxpk];
-  uint left = 0; // number of pks with ops
-  for (pk1 = 0; pk1 < g_opts.maxpk; pk1++) {
-    gci_op[pk1] = 0;
-    // total op on the pk
-    Op* tot_op = g_pk_op[pk1];
-    if (tot_op == 0)
-      continue;
-    if (tot_op->next_gci == 0) {
-      assert(g_loop != 0 && tot_op->type == Op::INS);
-      continue;
+  for (i = 0; i < maxrun(); i++) {
+    Run& r = run(i);
+    for (pk1 = 0; pk1 < g_opts.maxpk; pk1++) {
+      gci_op[i][pk1] = 0;
+      // total op on the pk
+      Op* tot_op = r.pk_op[pk1];
+      if (tot_op == 0)
+        continue;
+      if (tot_op->next_gci == 0) {
+        assert(g_loop != 0 && tot_op->type == Op::INS);
+        continue;
+      }
+      // first commit chain
+      assert(tot_op->next_gci != 0);
+      gci_op[i][pk1] = tot_op->next_gci;
+      left++;
     }
-    // first commit chain
-    assert(tot_op->next_gci != 0);
-    gci_op[pk1] = tot_op->next_gci;
-    left++;
   }
   while (left != 0) {
+    i = urandom(maxrun());
     pk1 = urandom(g_opts.maxpk);
-    if (gci_op[pk1] == 0)
+    if (gci_op[i][pk1] == 0)
       continue;
+    Run& r = run(i);
     // do the ops in one transaction
     chkdb((g_con = g_ndb->startTransaction()) != 0);
-    Op* com_op = gci_op[pk1]->next_com;
+    Op* com_op = gci_op[i][pk1]->next_com;
     assert(com_op != 0);
     // first op in chain
     Op* op = com_op->next_op;
     assert(op != 0);
     while (op != 0) {
       ll2("runops:" << *op);
-      chkrc(addndbop(op) == 0);
+      chkrc(addndbop(r, op) == 0);
       op = op->next_op;
     }
     chkdb(g_con->execute(Commit) == 0);
-    gci_op[pk1]->gci = com_op->gci = g_con->getGCI();
-    ll2("commit: gci=" << com_op->gci);
+    gci_op[i][pk1]->gci = com_op->gci = g_con->getGCI();
+    ll2("commit: " << run(i).tabname << " gci=" << com_op->gci);
     g_ndb->closeTransaction(g_con);
     g_con = 0;
     // next chain
-    gci_op[pk1] = gci_op[pk1]->next_gci;
-    if (gci_op[pk1] == 0) {
+    gci_op[i][pk1] = gci_op[i][pk1]->next_gci;
+    if (gci_op[i][pk1] == 0) {
       assert(left != 0);
       left--;
     }
@@ -1377,14 +1669,14 @@ runops()
 }
 
 // move com chains with same gci under same gci entry
-static int
-mergeops()
+static void
+mergeops(Run& r)
 {
-  ll1("mergeops");
+  ll2("mergeops: " << r.tabname);
   uint mergecnt = 0;
   Uint32 pk1;
   for (pk1 = 0; pk1 < g_opts.maxpk; pk1++) {
-    Op* tot_op = g_pk_op[pk1];
+    Op* tot_op = r.pk_op[pk1];
     if (tot_op == 0)
       continue;
     Op* gci_op = tot_op->next_gci;
@@ -1408,24 +1700,32 @@ mergeops()
         gci_op2 = gci_op2->next_gci;
         freeop(tmp_op);
         mergecnt++;
-        assert(g_num_gci != 0);
-        g_num_gci--;
+        assert(r.gciops != 0 && g_gciops != 0);
+        r.gciops--;
+        g_gciops--;
       }
       gci_op = gci_op->next_gci = gci_op2;
     }
   }
-  ll1("mergeops: used ops = " << g_usedops << " gci ops = " << g_num_gci);
-  return 0;
+  ll1("mergeops: " << r.tabname << ": gci recs = " << r.gciops);
+}
+
+static void
+mergeops()
+{
+  for (uint i = 0; i < maxrun(); i++)
+    mergeops(run(i));
+  ll1("mergeops: used recs = " << g_usedops << " gci recs = " << g_gciops);
 }
 
 // set bit for equal post/pre data in UPD, for use in event match
 static void
-cmppostpre()
+cmppostpre(Run& r)
 {
-  ll1("cmppostpre");
+  ll2("cmppostpre: " << r.tabname);
   Uint32 pk1;
   for (pk1 = 0; pk1 < g_opts.maxpk; pk1++) {
-    Op* tot_op = g_pk_op[pk1];
+    Op* tot_op = r.pk_op[pk1];
     Op* gci_op = tot_op ? tot_op->next_gci : 0;
     while (gci_op != 0) {
       if (gci_op->type == Op::UPD) {
@@ -1446,6 +1746,148 @@ cmppostpre()
     }
   }
 }
+
+static void
+cmppostpre()
+{
+  ll1("cmppostpre");
+  for (uint i = 0; i < maxrun(); i++)
+    cmppostpre(run(i));
+}
+
+static int
+findevent(const NdbEventOperation* evt_op)
+{
+  uint i;
+  for (i = 0; i < maxrun(); i++) {
+    if (run(i).evt_op == evt_op)
+      break;
+  }
+  chkrc(i < maxrun());
+  return i;
+}
+
+static void
+geteventdata(Run& r)
+{
+  Data (&d)[2] = g_rec_ev->data;
+  int i, j;
+  for (j = 0; j < 2; j++) {
+    for (i = 0; i < ncol(); i++) {
+      const Col& c = getcol(i);
+      int ind, ret;
+      if (! c.isblob()) {
+        NdbRecAttr* ra = r.ev_ra[j][i];
+        ind = ra->isNULL();
+      } else {
+        NdbBlob* bh = r.ev_bh[j][i];
+        ret = bh->getDefined(ind);
+        assert(ret == 0);
+        if (ind == 0) { // value was returned and is not NULL
+          Data::Txt& txt = *d[j].ptr[i].txt;
+          Uint64 len64;
+          ret = bh->getLength(len64);
+          assert(ret == 0);
+          txt.len = (uint)len64;
+          delete [] txt.val;
+          txt.val = new char [txt.len];
+          memset(txt.val, 'X', txt.len);
+          Uint32 len = txt.len;
+          ret = bh->readData(txt.val, len);
+          assert(ret == 0 && len == txt.len);
+        }
+      }
+      d[j].ind[i] = ind;
+    }
+  }
+}
+
+static int
+addgcievents(Uint64 gci)
+{
+  ll1("getgcieventops");
+  uint count = 0;
+  uint seen_current = 0;
+  Uint32 iter = 0;
+  while (1) {
+    Uint32 evtypes = 0;
+    const NdbEventOperation* evt_op =
+      g_ndb->getGCIEventOperations(&iter, &evtypes);
+    if (evt_op == 0)
+      break;
+    // evt_op->getGCI() is not defined yet
+    int i;
+    chkrc((i = findevent(evt_op)) != -1);
+    run(i).addevtypes(gci, evtypes, 0);
+    seen_current += (g_evt_op == evt_op);
+    count++;
+  }
+  chkrc(seen_current == 1);
+  ll1("addgcievents: " << count);
+  return 0;
+}
+
+static int
+runevents()
+{
+  ll1("runevents");
+  uint mspoll = 1000;
+  uint npoll = 6; // strangely long delay
+  ll1("poll " << npoll);
+  Uint64 gci = (Uint64)0;
+  while (npoll != 0) {
+    npoll--;
+    int ret;
+    ret = g_ndb->pollEvents(mspoll);
+    if (ret <= 0)
+      continue;
+    while (1) {
+      g_rec_ev->init(Op::EV);
+      g_evt_op = g_ndb->nextEvent();
+      if (g_evt_op == 0)
+        break;
+      Uint64 newgci = g_evt_op->getGCI();
+      assert(newgci != 0);
+      g_rec_ev->gci = newgci;
+      if (gci != newgci) {
+        ll1("new gci: " << gci << " -> " << newgci);
+        gci = newgci;
+        // add slot in each tab|e
+        uint i;
+        for (i = 0; i < maxtab(); i++)
+          chkrc(run(i).addgci(gci) == 0);
+        chkrc(addgcievents(gci) == 0);
+      }
+      int i;
+      chkrc((i = findevent(g_evt_op)) != -1);
+      Run& r = run(i);
+      NdbDictionary::Event::TableEvent evtype = g_evt_op->getEventType();
+      chkrc(seteventtype(g_rec_ev, evtype) == 0);
+      r.addevtypes(gci, (Uint32)evtype, 1);
+      geteventdata(r);
+      ll2("runevents: EVT: " << *g_rec_ev);
+      // check basic sanity
+      Uint32 pk1 = ~(Uint32)0;
+      chkrc(checkop(g_rec_ev, pk1) == 0);
+      // add to events
+      Op* tot_ev = r.pk_ev[pk1];
+      if (tot_ev == 0)
+        tot_ev = r.pk_ev[pk1] = getop(Op::EV);
+      Op* last_ev = tot_ev;
+      while (last_ev->next_ev != 0)
+        last_ev = last_ev->next_ev;
+      // copy and add
+      Op* ev = getop(Op::EV);
+      copyop(g_rec_ev, ev);
+      g_rec_ev->freemem();
+      last_ev->next_ev = ev;
+      g_num_ev++;
+    }
+  }
+  ll1("runevents: used ops = " << g_usedops << " events = " << g_num_ev);
+  return 0;
+}
+
 static int
 cmpopevdata(const Data& d1, const Data& d2)
 {
@@ -1474,9 +1916,8 @@ cmpopevdata(const Data (&d1)[2], const D
 }
 
 static int
-matchevent(Op* ev)
+matchevent(Run& r, Op* ev)
 {
-  Op::Type t = ev->type;
   Data (&d2)[2] = ev->data;
   // get PK
   Uint32 pk1 = d2[0].pk1;
@@ -1484,10 +1925,10 @@ matchevent(Op* ev)
   // on error repeat and print details
   uint loop = 0;
   while (loop <= 1) {
-    uint g_loglevel = loop == 0 ? g_opts.loglevel : 2;
-    ll1("matchevent: pk1=" << pk1 << " type=" << t);
+    int g_loglevel = loop == 0 ? g_opts.loglevel : 2;
+    ll1("matchevent: " << r.tabname << ": pk1=" << pk1 << " type=" << ev->type);
     ll2("EVT: " << *ev);
-    Op* tot_op = g_pk_op[pk1];
+    Op* tot_op = r.pk_op[pk1];
     Op* gci_op = tot_op ? tot_op->next_gci : 0;
     uint pos = 0;
     bool ok = false;
@@ -1506,21 +1947,21 @@ matchevent(Op* ev)
         }
         com_op = com_op->next_com;
       }
-      // match agains GCI op
+      // match against GCI op
       if (gci_op->type != Op::NUL) {
         const Data (&d1)[2] = gci_op->data;
         if (cmpopevdata(d1, d2) == 0) {
           bool tmpok = true;
-          if (gci_op->type != t) {
-            ll2("***: wrong type " << gci_op->type << " != " << t);
+          if (gci_op->type != ev->type) {
+            ll2("***: wrong type " << gci_op->type << " != " << ev->type);
             tmpok = false;
           }
           if (gci_op->match) {
             ll2("***: duplicate match");
             tmpok = false;
           }
-          if (pos != g_ev_pos[pk1]) {
-            ll2("***: wrong pos " << pos << " != " << g_ev_pos[pk1]);
+          if (pos != r.ev_pos[pk1]) {
+            ll2("***: wrong pos " << pos << " != " << r.ev_pos[pk1]);
             tmpok = false;
           }
           if (gci_op->gci != ev->gci) {
@@ -1537,10 +1978,10 @@ matchevent(Op* ev)
       gci_op = gci_op->next_gci;
     }
     if (ok) {
-      ll1("matchevent: match");
+      ll2("matchevent: match");
       return 0;
     }
-    ll1("matchevent: ERROR: no match");
+    ll0("matchevent: ERROR: no match");
     if (g_loglevel >= 2)
       return -1;
     loop++;
@@ -1549,19 +1990,20 @@ matchevent(Op* ev)
 }
 
 static int
-matchevents()
+matchevents(Run& r)
 {
+  ll1("matchevents: " << r.tabname);
   uint nomatch = 0;
   Uint32 pk1;
   for (pk1 = 0; pk1 < g_opts.maxpk; pk1++) {
-    Op* tot_ev = g_pk_ev[pk1];
+    Op* tot_ev = r.pk_ev[pk1];
     if (tot_ev == 0)
       continue;
     Op* ev = tot_ev->next_ev;
     while (ev != 0) {
-      if (matchevent(ev) < 0)
+      if (matchevent(r, ev) < 0)
         nomatch++;
-      g_ev_pos[pk1]++;
+      r.ev_pos[pk1]++;
       ev = ev->next_ev;
     }
   }
@@ -1570,13 +2012,22 @@ matchevents()
 }
 
 static int
-matchops()
+matchevents()
 {
-  ll1("matchops");
+  ll1("matchevents");
+  for (uint i = 0; i < maxrun(); i++)
+    chkrc(matchevents(run(i)) == 0);
+  return 0;
+}
+
+static int
+matchops(Run& r)
+{
+  ll1("matchops: " << r.tabname);
   uint nomatch = 0;
   Uint32 pk1;
   for (pk1 = 0; pk1 < g_opts.maxpk; pk1++) {
-    Op* tot_op = g_pk_op[pk1];
+    Op* tot_op = r.pk_op[pk1];
     if (tot_op == 0)
       continue;
     Op* gci_op = tot_op->next_gci;
@@ -1606,84 +2057,52 @@ matchops()
   return 0;
 }
 
-static void
-geteventdata()
+static int
+matchops()
 {
-  Data (&d)[2] = g_rec_ev->data;
-  int i, j;
-  for (j = 0; j < 2; j++) {
-    for (i = 0; i < ncol(); i++) {
-      const Col& c = getcol(i);
-      int ind, ret;
-      if (! c.isblob()) {
-        NdbRecAttr* ra = g_ev_ra[j][i];
-        ind = ra->isNULL();
-      } else {
-        NdbBlob* bh = g_ev_bh[j][i];
-        ret = bh->getDefined(ind);
-        assert(ret == 0);
-        if (ind == 0) { // value was returned and is not NULL
-          Data::Txt& t = *d[j].ptr[i].txt;
-          Uint64 len64;
-          ret = bh->getLength(len64);
-          assert(ret == 0);
-          t.len = (uint)len64;
-          delete [] t.val;
-          t.val = new char [t.len];
-          memset(t.val, 'X', t.len);
-          Uint32 len = t.len;
-          ret = bh->readData(t.val, len);
-          assert(ret == 0 && len == t.len);
-        }
-      }
-      d[j].ind[i] = ind;
-    }
-  }
+  ll1("matchops");
+  for (uint i = 0; i < maxrun(); i++)
+    chkrc(matchops(run(i)) == 0);
+  return 0;
 }
 
 static int
-runevents()
+matchgcievents(Run& r)
 {
-  ll1("runevents");
-  uint mspoll = 1000;
-  uint npoll = 6; // strangely long delay
-  ll1("poll " << npoll);
-  while (npoll != 0) {
-    npoll--;
-    int ret;
-    ret = g_ndb->pollEvents(mspoll);
-    if (ret <= 0)
+  ll1("matchgcievents: " << r.tabname);
+  uint i;
+  for (i = 0; i < r.gcicnt; i++) {
+    Uint32 t0 = r.gcievtypes[i][0];
+    Uint32 t1 = r.gcievtypes[i][1];
+    ll1("gci: " << r.gcinum[i] << hex << " report: " << t0 << " seen: " << t1);
+
+    if (r.skip)
+      chkrc(t0 == 0 && t1 == 0);
+    if (t0 == 0 && t1 == 0)
       continue;
-    while (1) {
-      g_rec_ev->init(Op::EV);
-      NdbEventOperation* tmp_op = g_ndb->nextEvent();
-      if (tmp_op == 0)
-        break;
-      reqrc(g_evt_op == tmp_op);
-      chkrc(seteventtype(g_rec_ev, g_evt_op->getEventType()) == 0);
-      geteventdata();
-      g_rec_ev->gci = g_evt_op->getGCI();
-      // get indicators and blob value
-      ll2("runevents: EVT: " << *g_rec_ev);
-      // check basic sanity
-      Uint32 pk1 = ~(Uint32)0;
-      chkrc(checkop(g_rec_ev, pk1) == 0);
-      // add to events
-      Op* tot_ev = g_pk_ev[pk1];
-      if (tot_ev == 0)
-        tot_ev = g_pk_ev[pk1] = getop(Op::EV);
-      Op* last_ev = tot_ev;
-      while (last_ev->next_ev != 0)
-        last_ev = last_ev->next_ev;
-      // copy and add
-      Op* ev = getop(Op::EV);
-      copyop(g_rec_ev, ev);
-      g_rec_ev->freemem();
-      last_ev->next_ev = ev;
-      g_num_ev++;
+
+    // check if not reported event op seen
+    chkrc(t0 != 0);
+    // check if not reported event type seen
+    chkrc((~t0 & t1) == 0);
+
+    // the other way does not work under merge
+    if (g_opts.separate_events) {
+      // check if reported event op not seen
+      chkrc(t1 != 0);
+      // check if reported event type not seen
+      chkrc((t0 & ~t1) == 0);
     }
   }
-  ll1("runevents: used ops = " << g_usedops << " events = " << g_num_ev);
+  return 0;
+}
+
+static int
+matchgcievents()
+{
+  ll1("matchgcievents");
+  for (uint i = 0; i < maxrun(); i++)
+    chkrc(matchgcievents(run(i)) == 0);
   return 0;
 }
 
@@ -1711,27 +2130,29 @@ static int
 runtest()
 {
   setseed(-1);
+  initrun();
   chkrc(createtable() == 0);
   chkrc(createevent() == 0);
   for (g_loop = 0; g_opts.loop == 0 || g_loop < g_opts.loop; g_loop++) {
     ll0("=== loop " << g_loop << " ===");
     setseed(g_loop);
     resetmem();
-    chkrc(scantab() == 0); // alternative: save tot_op for loop > 0
+    chkrc(scantable() == 0); // alternative: save tot_op for loop > 0
     makeops();
     g_rec_ev = getop(Op::EV);
     chkrc(createeventop() == 0);
-    chkdb(g_evt_op->execute() == 0);
+    chkrc(executeeventop() == 0);
     chkrc(waitgci(3) == 0);
     chkrc(runops() == 0);
     if (! g_opts.separate_events)
-      chkrc(mergeops() == 0);
+      mergeops();
     cmppostpre();
     chkrc(runevents() == 0);
-    ll0("counts: gci = " << g_num_gci << " ev = " << g_num_ev);
+    ll0("counts: gci ops = " << g_gciops << " ev ops = " << g_num_ev);
     chkrc(matchevents() == 0);
     chkrc(matchops() == 0);
-    chkrc(dropeventop() == 0);
+    chkrc(matchgcievents() == 0);
+    chkrc(dropeventops() == 0);
     // time erases everything..
     chkrc(waitgci(1) == 0);
   }
@@ -1751,51 +2172,54 @@ my_long_options[] =
   { "abort-on-error", 1001, "Do abort() on any error",
     (gptr*)&g_opts.abort_on_error, (gptr*)&g_opts.abort_on_error, 0,
     GET_BOOL, NO_ARG, 0, 0, 0, 0, 0, 0 },
-  { "loglevel", 1002, "Logging level in this program (default 0)",
+  { "loglevel", 1002, "Logging level in this program 0-3 (default 0)",
     (gptr*)&g_opts.loglevel, (gptr*)&g_opts.loglevel, 0,
     GET_INT, REQUIRED_ARG, 0, 0, 0, 0, 0, 0 },
-  { "loop", 1003, "Number of test loops (default 3, 0=forever)",
+  { "loop", 1003, "Number of test loops (default 5, 0=forever)",
     (gptr*)&g_opts.loop, (gptr*)&g_opts.loop, 0,
-    GET_INT, REQUIRED_ARG, 3, 0, 0, 0, 0, 0 },
-  { "maxops", 1004, "Approx number of PK operations (default 1000)",
+    GET_INT, REQUIRED_ARG, 5, 0, 0, 0, 0, 0 },
+  { "maxops", 1004, "Approx number of PK operations per table (default 1000)",
     (gptr*)&g_opts.maxops, (gptr*)&g_opts.maxops, 0,
     GET_UINT, REQUIRED_ARG, 1000, 0, 0, 0, 0, 0 },
-  { "maxpk", 1005, "Number of different PK values (default 10)",
+  { "maxpk", 1005, "Number of different PK values (default 10, max 1000)",
     (gptr*)&g_opts.maxpk, (gptr*)&g_opts.maxpk, 0,
     GET_UINT, REQUIRED_ARG, 10, 0, 0, 0, 0, 0 },
-  { "no-blobs", 1006, "Omit blob attributes (5.0: true)",
+  { "maxtab", 1006, "Number of tables (default 10, max 100)",
+    (gptr*)&g_opts.maxtab, (gptr*)&g_opts.maxtab, 0,
+    GET_INT, REQUIRED_ARG, 10, 0, 0, 0, 0, 0 },
+  { "no-blobs", 1007, "Omit blob attributes (5.0: true)",
     (gptr*)&g_opts.no_blobs, (gptr*)&g_opts.no_blobs, 0,
     GET_BOOL, NO_ARG, 0, 0, 0, 0, 0, 0 },
-  { "no-implicit-nulls", 1007, "Insert must include all attrs"
+  { "no-implicit-nulls", 1008, "Insert must include all attrs"
                                " i.e. no implicit NULLs",
     (gptr*)&g_opts.no_implicit_nulls, (gptr*)&g_opts.no_implicit_nulls, 0,
     GET_BOOL, NO_ARG, 0, 0, 0, 0, 0, 0 },
-  { "no-missing-update", 1008, "Update must include all non-PK attrs",
+  { "no-missing-update", 1009, "Update must include all non-PK attrs",
     (gptr*)&g_opts.no_missing_update, (gptr*)&g_opts.no_missing_update, 0,
     GET_BOOL, NO_ARG, 0, 0, 0, 0, 0, 0 },
-  { "no-multiops", 1009, "Allow only 1 operation per commit",
+  { "no-multiops", 1010, "Allow only 1 operation per commit",
     (gptr*)&g_opts.no_multiops, (gptr*)&g_opts.no_multiops, 0,
     GET_BOOL, NO_ARG, 0, 0, 0, 0, 0, 0 },
-  { "no-nulls", 1010, "Create no NULL values",
+  { "no-nulls", 1011, "Create no NULL values",
     (gptr*)&g_opts.no_nulls, (gptr*)&g_opts.no_nulls, 0,
     GET_BOOL, NO_ARG, 0, 0, 0, 0, 0, 0 },
-  { "one-blob", 1011, "Only one blob attribute (default 2)",
+  { "one-blob", 1012, "Only one blob attribute (default 2)",
     (gptr*)&g_opts.one_blob, (gptr*)&g_opts.one_blob, 0,
     GET_BOOL, NO_ARG, 0, 0, 0, 0, 0, 0 },
-  { "opstring", 1012, "Operations to run e.g. idiucdc (c is commit) or"
+  { "opstring", 1013, "Operations to run e.g. idiucdc (c is commit) or"
                       " iuuc:uudc (the : separates loops)",
     (gptr*)&g_opts.opstring, (gptr*)&g_opts.opstring, 0,
     GET_STR_ALLOC, REQUIRED_ARG, 0, 0, 0, 0, 0, 0 },
-  { "seed", 1013, "Random seed (0=loop number, default -1=random)",
+  { "seed", 1014, "Random seed (0=loop number, default -1=random)",
     (gptr*)&g_opts.seed, (gptr*)&g_opts.seed, 0,
     GET_INT, REQUIRED_ARG, -1, 0, 0, 0, 0, 0 },
-  { "separate-events", 1014, "Do not combine events per GCI (5.0: true)",
+  { "separate-events", 1015, "Do not combine events per GCI (5.0: true)",
     (gptr*)&g_opts.separate_events, (gptr*)&g_opts.separate_events, 0,
     GET_BOOL, NO_ARG, 0, 0, 0, 0, 0, 0 },
-  { "tweak", 1015, "Whatever the source says",
+  { "tweak", 1016, "Whatever the source says",
     (gptr*)&g_opts.tweak, (gptr*)&g_opts.tweak, 0,
     GET_UINT, REQUIRED_ARG, 0, 0, 0, 0, 0, 0 },
-  { "use-table", 1016, "Use existing table 'tem1'",
+  { "use-table", 1017, "Use existing tables",
     (gptr*)&g_opts.use_table, (gptr*)&g_opts.use_table, 0,
     GET_BOOL, NO_ARG, 0, 0, 0, 0, 0, 0 },
   { 0, 0, 0,
@@ -1812,10 +2236,6 @@ usage()
 static int
 checkopts()
 {
-  if (g_opts.maxpk > g_maxpk) {
-    ll0("setting maxpk to " << g_maxpk);
-    g_opts.maxpk = g_maxpk;
-  }
   if (g_opts.separate_events) {
     g_opts.no_blobs = true;
   }
@@ -1837,16 +2257,25 @@ checkopts()
     uint i;
     for (i = 0; i < g_opstringparts; i++) {
       const char* s = g_opstringpart[i];
-      while (*s != 0)
-        if (strchr("iduc", *s++) == 0)
+      while (*s != 0) {
+        if (strchr("iduc", *s++) == 0) {
+          ll0("opstring chars are i,d,u,c");
           return -1;
-      if (s == g_opstringpart[i] || s[-1] != 'c')
+        }
+      }
+      if (s == g_opstringpart[i] || s[-1] != 'c') {
+        ll0("opstring chain must end in 'c'");
         return -1;
+      }
     }
   }
   if (g_opts.no_nulls) {
     g_opts.no_implicit_nulls = true;
   }
+  if (g_opts.maxpk > g_maxpk ||
+      g_opts.maxtab > g_maxtab) {
+    return -1;
+  }
   return 0;
 }
 
@@ -1876,10 +2305,8 @@ main(int argc, char** argv)
       }
     }
   }
-  if (g_evt_op != 0) {
-    (void)dropeventop();
-    g_evt_op = 0;
-  }
+  dropeventops(true);
+  dropevent(true);
   delete g_ndb;
   delete g_ncc;
   return NDBT_ProgramExit(NDBT_FAILED);
Thread
bk commit into 5.1 tree (pekka:1.2233) BUG#18102pekka21 Jun