List:Commits« Previous MessageNext Message »
From:pekka Date:January 9 2006 12:09am
Subject:bk commit into 5.0 tree (pekka:1.2021)
View as plain text  
Below is the list of changes that have just been committed into a local
5.0 repository of pekka. When pekka does a push these changes will
be propagated to the main repository and, within 24 hours after the
push, to the public repository.
For information on how to access the public repository
see http://dev.mysql.com/doc/mysql/en/installing-source-tree.html

ChangeSet
  1.2021 06/01/09 01:09:42 pekka@stripped +1 -0
  ndb - wl#2972 ndb api test_event_merge 5.0+5.1

  ndb/test/ndbapi/test_event_merge.cpp
    1.3 06/01/09 01:07:22 pekka@stripped +877 -368
    5.0 passes. 5.1 has many problems

# This is a BitKeeper patch.  What follows are the unified diffs for the
# set of deltas contained in the patch.  The rest of the patch, the part
# that BitKeeper cares about, is below these diffs.
# User:	pekka
# Host:	orca.ndb.mysql.com
# Root:	/space/pekka/ndb/version/my50

--- 1.2/ndb/test/ndbapi/test_event_merge.cpp	2005-12-27 16:33:52 +01:00
+++ 1.3/ndb/test/ndbapi/test_event_merge.cpp	2006-01-09 01:07:22 +01:00
@@ -27,6 +27,9 @@
 #undef version50
 #endif
 
+// until rbr in 5.1
+#undef version51rbr
+
 #if !defined(min) || !defined(max)
 #define min(x, y) ((x) < (y) ? (x) : (y))
 #define max(x, y) ((x) > (y) ? (x) : (y))
@@ -42,13 +45,20 @@
  * 2) In event API version >= 5.1 separate commits within same GCI are
  * by default merged.  This is required to read blob data via NdbBlob.
  *
- * This test program ignores Blob columns in version 5.0.
+ * Option --separate-events disables GCI merge and implies --no-blobs.
+ * This is used to test basic events functionality.
+ *
+ * Option --no-blobs omits blob attributes.  This is used to test GCI
+ * merge without getting into blob bugs.
+ *
+ * Option --no-multiops allows 1 operation per commit.  This avoids TUP
+ * and blob multi-operation bugs.
  *
  * There are 5 ways (ignoring NUL operand) to compose 2 ops:
  *                      5.0 bugs        5.1 bugs
  * INS o DEL = NUL
- * INS o UPD = INS                       5.1
- * DEL o INS = UPD      type=INS         5.1
+ * INS o UPD = INS                      type=INS
+ * DEL o INS = UPD      type=INS        type=INS
  * UPD o DEL = DEL      no event
  * UPD o UPD = UPD
  */
@@ -59,41 +69,69 @@
   uint loop;
   uint maxops;
   uint maxpk;
-  const char* opstr;
+  my_bool no_blobs;
+  my_bool no_multiops;
+  my_bool one_blob;
+  const char* opstring;
   uint seed;
   my_bool separate_events;
   my_bool use_table;
 };
 
 static Opts g_opts;
-static const uint g_maxops = 10000;
 static const uint g_maxpk = 100;
+static const uint g_maxopstringpart = 100;
+static const char* g_opstringpart[g_maxopstringpart];
+static uint g_opstringparts = 0;
+static uint g_loop = 0;
 
 static Ndb_cluster_connection* g_ncc = 0;
 static Ndb* g_ndb = 0;
 static NdbDictionary::Dictionary* g_dic = 0;
 static NdbTransaction* g_con = 0;
 static NdbOperation* g_op = 0;
+static NdbScanOperation* g_scan_op = 0;
 
 static const char* g_tabname = "tem1";
 static const char* g_evtname = "tem1ev1";
 static const uint g_charlen = 5;
+static const char* g_charval = "abcdefgh";
 static const char* g_csname = "latin1_swedish_ci";
 
+static uint g_blobinlinesize = 256;
+static uint g_blobpartsize = 2000;
+static uint g_blobstripesize = 2;
+static const uint g_maxblobsize = 100000;
+
 static const NdbDictionary::Table* g_tab = 0;
 static const NdbDictionary::Event* g_evt = 0;
 
 static NdbEventOperation* g_evt_op = 0;
+static NdbBlob* g_bh = 0;
 
 static uint
-urandom(uint n)
+urandom()
 {
   uint r = (uint)random();
-  if (n != 0)
-    r = r % n;
   return r;
 }
 
+static uint
+urandom(uint m)
+{
+  if (m == 0)
+    return 0;
+  uint r = urandom();
+  r = r % m;
+  return r;
+}
+
+static bool
+urandom(uint per, uint cent)
+{
+  return urandom(cent) < per;
+}
+
 static int& g_loglevel = g_opts.loglevel; // default log level
 
 #define chkdb(x) \
@@ -138,11 +176,21 @@
     if (e.code != 0)
       ll0(++any << " op: error " << e);
   }
+  if (g_scan_op != 0) {
+    const NdbError& e = g_scan_op->getNdbError();
+    if (e.code != 0)
+      ll0(++any << " scan_op: error " << e);
+  }
   if (g_evt_op != 0) {
     const NdbError& e = g_evt_op->getNdbError();
     if (e.code != 0)
       ll0(++any << " evt_op: error " << e);
   }
+  if (g_bh != 0) {
+    const NdbError& e = g_bh->getNdbError();
+    if (e.code != 0)
+      ll0(++any << " evt_op: error " << e);
+  }
   if (! any)
     ll0("unknown db error");
 }
@@ -155,31 +203,47 @@
   bool nullable;
   uint length;
   uint size;
+  bool isblob() const {
+    return type == NdbDictionary::Column::Text;
+  }
 };
 
 static Col g_col[] = {
   { 0, "pk1", NdbDictionary::Column::Unsigned, true, false, 1, 4 },
   { 1, "pk2", NdbDictionary::Column::Char, true, false,  g_charlen, g_charlen },
   { 2, "seq", NdbDictionary::Column::Unsigned,  false, false, 1, 4 },
-  { 3, "cc1", NdbDictionary::Column::Char, false, true, g_charlen, g_charlen }
+  { 3, "cc1", NdbDictionary::Column::Char, false, true, g_charlen, g_charlen },
+  { 4, "tx1", NdbDictionary::Column::Text, false, true, 0, 0 },
+  { 5, "tx2", NdbDictionary::Column::Text, false, true, 0, 0 }
 };
 
-static const uint g_ncol = sizeof(g_col)/sizeof(g_col[0]);
+static const uint g_maxcol = sizeof(g_col)/sizeof(g_col[0]);
+
+static uint
+ncol()
+{
+  uint n = g_maxcol;
+  if (g_opts.no_blobs)
+    n -= 2;
+  else if (g_opts.one_blob)
+    n -= 1;
+  return n;
+}
 
 static const Col&
 getcol(uint i)
 {
-  if (i < g_ncol)
+  if (i < ncol())
     return g_col[i];
   assert(false);
-  return g_col[g_ncol];
+  return g_col[0];
 }
 
 static const Col&
 getcol(const char* name)
 {
   uint i;
-  for (i = 0; i < g_ncol; i++)
+  for (i = 0; i < ncol(); i++)
     if (strcmp(g_col[i].name, name) == 0)
       break;
   return getcol(i);
@@ -194,7 +258,7 @@
   CHARSET_INFO* cs;
   chkrc((cs = get_charset_by_name(g_csname, MYF(0))) != 0);
   uint i;
-  for (i = 0; i < g_ncol; i++) {
+  for (i = 0; i < ncol(); i++) {
     const Col& c = g_col[i];
     NdbDictionary::Column col(c.name);
     col.setType(c.type);
@@ -209,6 +273,12 @@
       col.setLength(c.length);
       col.setCharset(cs);
       break;
+    case NdbDictionary::Column::Text:
+      col.setInlineSize(g_blobinlinesize);
+      col.setPartSize(g_blobpartsize);
+      col.setStripeSize(g_blobstripesize);
+      col.setCharset(cs);
+      break;
     default:
       assert(false);
       break;
@@ -229,9 +299,9 @@
     chkdb((g_op = g_con->getNdbOperation(g_tabname)) != 0);
     chkdb(g_op->insertTuple() == 0);
     Uint32 pk1;
-    char pk2[g_charlen];
+    char pk2[g_charlen + 1];
     pk1 = g_maxpk;
-    memset(pk2, 0x20, g_charlen);
+    sprintf(pk2, "%-*u", g_charlen, pk1);
     chkdb(g_op->equal("pk1", (char*)&pk1) == 0);
     chkdb(g_op->equal("pk2", (char*)&pk2[0]) == 0);
     chkdb(g_con->execute(Commit) == 0);
@@ -255,34 +325,86 @@
 }
 
 struct Data {
+  struct Txt { char* val; uint len; };
+  union Ptr { Uint32* u32; char* ch; Txt* txt; void* v; };
   Uint32 pk1;
-  char pk2[g_charlen];
+  char pk2[g_charlen + 1];
   Uint32 seq;
-  char cc1[g_charlen];
-  void* ptr[g_ncol];
-  int ind[g_ncol]; // -1 = no data, 1 = NULL, 0 = not NULL
+  char cc1[g_charlen + 1];
+  Txt tx1;
+  Txt tx2;
+  Ptr ptr[g_maxcol];
+  int ind[g_maxcol]; // -1 = no data, 1 = NULL, 0 = not NULL
+  uint noop; // bit: omit in NdbOperation (implicit NULL INS or no UPD)
+  uint ppeq; // bit: post/pre data value equal in GCI data[0]/data[1]
   void init() {
     uint i;
     pk1 = 0;
     memset(pk2, 0, sizeof(pk2));
     seq = 0;
     memset(cc1, 0, sizeof(cc1));
-    ptr[0] = &pk1;
-    ptr[1] = pk2;
-    ptr[2] = &seq;
-    ptr[3] = cc1;
-    for (i = 0; i < g_ncol; i++)
+    tx1.val = tx2.val = 0;
+    tx1.len = tx2.len = 0;
+    ptr[0].u32 = &pk1;
+    ptr[1].ch = pk2;
+    ptr[2].u32 = &seq;
+    ptr[3].ch = cc1;
+    ptr[4].txt = &tx1;
+    ptr[5].txt = &tx2;
+    for (i = 0; i < g_maxcol; i++)
       ind[i] = -1;
+    noop = 0;
+    ppeq = 0;
+  }
+  void free() {
+    delete [] tx1.val;
+    delete [] tx2.val;
+    init();
   }
 };
 
+static int
+cmpcol(const Col& c, const Data& d1, const Data& d2)
+{
+  uint i = c.no;
+  if (d1.ind[i] != d2.ind[i])
+    return 1;
+  if (d1.ind[i] == 0) {
+    switch (c.type) {
+    case NdbDictionary::Column::Unsigned:
+      if (*d1.ptr[i].u32 != *d2.ptr[i].u32)
+        return 1;
+      break;
+    case NdbDictionary::Column::Char:
+      if (memcmp(d1.ptr[i].ch, d2.ptr[i].ch, c.size) != 0)
+        return 1;
+      break;
+    case NdbDictionary::Column::Text:
+      {
+        const Data::Txt& t1 = *d1.ptr[i].txt;
+        const Data::Txt& t2 = *d2.ptr[i].txt;
+        if (t1.len != t2.len)
+          return 1;
+        if (memcmp(t1.val, t2.val, t1.len) != 0)
+          return 1;
+      }
+      break;
+    default:
+      assert(false);
+      break;
+    }
+  }
+  return 0;
+}
+
 static NdbOut&
 operator<<(NdbOut& out, const Data& d)
 {
   uint i;
-  for (i = 0; i < g_ncol; i++) {
+  for (i = 0; i < ncol(); i++) {
     const Col& c = getcol(i);
-    out << (i == 0 ? "" : " ") << c.name << "=";
+    out << (i == 0 ? "" : " ") << c.name;
+    out << (! (d.noop & (1 << i)) ? "=" : ":");
     if (d.ind[i] == -1)
       continue;
     if (d.ind[i] == 1) {
@@ -291,12 +413,12 @@
     }
     switch (c.type) {
     case NdbDictionary::Column::Unsigned:
-      out << *(Uint32*)d.ptr[i];
+      out << *d.ptr[i].u32;
       break;
     case NdbDictionary::Column::Char:
       {
         char buf[g_charlen + 1];
-        memcpy(buf, d.ptr[i], g_charlen);
+        memcpy(buf, d.ptr[i].ch, g_charlen);
         uint n = g_charlen;
         while (1) {
           buf[n] = 0;
@@ -304,11 +426,30 @@
             break;
           n--;
         }
-        out << buf;
+        out << "'" << buf << "'";
+      }
+      break;
+    case NdbDictionary::Column::Text:
+      {
+        Data::Txt& t = *d.ptr[i].txt;
+        bool first = true;
+        uint j = 0;
+        while (j < t.len) {
+          char c[2];
+          c[0] = t.val[j++];
+          c[1] = 0;
+          uint m = 1;
+          while (j < t.len && t.val[j] == c[0])
+            j++, m++;
+          if (! first)
+            out << "+";
+          first = false;
+          out << m << c;
+        }
       }
       break;
     default:
-      out << "?";
+      assert(false);
       break;
     }
   }
@@ -329,18 +470,26 @@
   Type type;
   Op* next_op; // within one commit
   Op* next_com; // next commit chain or next event
+  Op* next_gci; // groups commit chains (unless --separate-events)
+  Op* next_ev;
+  Op* next_free; // free list
+  bool free; // on free list
   uint num_op;
   uint num_com;
   Data data[2]; // 0-post 1-pre
   bool match; // matched to event
-  void init() {
+  Uint32 gci; // defined for com op and event
+  void init(Kind a_kind) {
+    kind = a_kind;
     assert(kind == OP || kind == EV);
     type = NUL;
-    next_op = next_com = 0;
+    next_op = next_com = next_gci = next_ev = next_free = 0;
+    free = false;
     num_op = num_com = 0;
     data[0].init();
     data[1].init();
     match = false;
+    gci = 0;
   }
 };
 
@@ -370,9 +519,11 @@
 static NdbOut&
 operator<<(NdbOut& out, const Op& op)
 {
-  out << "t=" << op.type;
+  out << op.type;
   out << " " << op.data[0];
   out << " [" << op.data[1] << "]";
+  if (op.gci != 0)
+    out << " gci:" << op.gci;
   return out;
 }
 
@@ -398,75 +549,104 @@
   return 0;
 }
 
+static Op* g_opfree = 0;
+static uint g_freeops = 0;
 static uint g_usedops = 0;
-static uint g_usedevs = 0;
-static Op g_oplist[g_maxops];
-static Op g_evlist[g_maxops];
-static uint g_maxcom = 8; // max ops per commit
-
+static uint g_maxcom = 10; // max ops per commit
 static Op* g_pk_op[g_maxpk];
 static Op* g_pk_ev[g_maxpk];
 static uint g_seq = 0;
-
-static NdbRecAttr* g_ra[2][g_ncol]; // 0-post 1-pre
+static NdbRecAttr* g_ev_ra[2][g_maxcol]; // 0-post 1-pre
+static NdbBlob* g_ev_bh[2][g_maxcol]; // 0-post 1-pre
 static Op* g_rec_ev;
-static uint g_ev_cnt[g_maxpk];
-
-static uint
-getfreeops()
-{
-  assert(g_opts.maxops >= g_usedops);
-  return g_opts.maxops - g_usedops;
-}
-
-static uint
-getfreeevs()
-{
-  assert(g_opts.maxops >= g_usedevs);
-  return g_opts.maxops - g_usedevs;
-}
+static uint g_ev_pos[g_maxpk];
 
 static Op*
-getop()
+getop(Op::Kind a_kind)
 {
-  if (g_usedops < g_opts.maxops) {
-    Op* op = &g_oplist[g_usedops++];
-    op->kind = Op::OP;
-    op->init();
-    return op;
-  }
-  assert(false);
-  return 0;
+  if (g_opfree == 0) {
+    assert(g_freeops == 0);
+    Op* op = new Op;
+    assert(op != 0);
+    op->next_free = g_opfree;
+    g_opfree = op;
+    op->free = true;
+    g_freeops++;
+  }
+  Op* op = g_opfree;
+  g_opfree = op->next_free;
+  assert(g_freeops != 0);
+  g_freeops--;
+  g_usedops++;
+  op->init(a_kind);
+  return op;
 }
 
-static Op*
-getev()
+static void
+freeop(Op* op)
 {
-  if (g_usedevs < g_opts.maxops) {
-    Op* ev = &g_evlist[g_usedevs++];
-    ev->kind = Op::EV;
-    ev->init();
-    return ev;
-  }
-  assert(false);
-  return 0;
+  assert(! op->free);
+  op->data[0].free();
+  op->data[1].free();
+  op->free = true;
+  op->next_free = g_opfree;
+  g_opfree = op;
+  g_freeops++;
+  assert(g_usedops != 0);
+  g_usedops--;
 }
 
 static void
 resetmem()
 {
   int i, j;
-  for (j = 0; j < 2; j++)
-    for (i = 0; i < g_ncol; i++)
-      g_ra[j][i] = 0;
-  g_rec_ev = 0;
-  for (i = 0; i < g_opts.maxpk; i++)
-    g_pk_op[i] = 0;
-  for (i = 0; i < g_opts.maxpk; i++)
-    g_ev_cnt[i] = 0;
-  g_seq = 0;
-  g_usedops = 0;
-  g_usedevs = 0;
+  for (j = 0; j < 2; j++) {
+    for (i = 0; i < g_maxcol; i++) {
+      g_ev_ra[j][i] = 0;
+      g_ev_bh[j][i] = 0;
+    }
+  }
+  if (g_rec_ev != 0) {
+    freeop(g_rec_ev);
+    g_rec_ev = 0;
+  }
+  Uint32 pk1;
+  for (pk1 = 0; pk1 < g_opts.maxpk; pk1++)
+    g_ev_pos[pk1] = 0;
+  // leave g_seq
+  for (pk1 = 0; pk1 < g_opts.maxpk; pk1++) {
+    if (g_pk_op[pk1] != 0) {
+      Op* tot_op = g_pk_op[pk1];
+      while (tot_op->next_gci != 0) {
+        Op* gci_op = tot_op->next_gci;
+        while (gci_op->next_com != 0) {
+          Op* com_op = gci_op->next_com;
+          while (com_op->next_op != 0) {
+            Op* op = com_op->next_op;
+            com_op->next_op = op->next_op;
+            freeop(op);
+          }
+          gci_op->next_com = com_op->next_com;
+          freeop(com_op);
+        }
+        tot_op->next_gci = gci_op->next_gci;
+        freeop(gci_op);
+      }
+      freeop(tot_op);
+      g_pk_op[pk1] = 0;
+    }
+    if (g_pk_ev[pk1] != 0) {
+      Op* tot_op = g_pk_ev[pk1];
+      while (tot_op->next_ev != 0) {
+        Op* ev = tot_op->next_ev;
+        tot_op->next_ev = ev->next_ev;
+        freeop(ev);
+      }
+      freeop(tot_op);
+      g_pk_ev[pk1] = 0;
+    }
+  }
+  assert(g_usedops == 0);
 }
 
 struct Comp {
@@ -487,43 +667,43 @@
 static int
 checkop(const Op* op, Uint32& pk1)
 {
-  const Data (&d)[2] = op->data;
   Op::Type t = op->type;
-  chkrc(t == Op::NUL || t == Op::INS || t == Op::DEL || t == Op::UPD);
-  { const Col& c = getcol("pk1");
-    chkrc(d[0].ind[c.no] == 0);
-    pk1 = d[0].pk1;
+  if (t == Op::NUL)
+    return 0;
+  chkrc(t == Op::INS || t == Op::DEL || t == Op::UPD);
+  const Data& d0 = op->data[0];
+  const Data& d1 = op->data[1];
+  {
+    const Col& c = getcol("pk1");
+    chkrc(d0.ind[c.no] == 0);
+    pk1 = d0.pk1;
     chkrc(pk1 < g_opts.maxpk);
   }
   uint i;
-  for (i = 0; i < g_ncol; i++) {
+  for (i = 0; i < ncol(); i++) {
     const Col& c = getcol(i);
-    if (t != Op::NUL) {
-      if (c.pk) {
-        chkrc(d[0].ind[i] == 0); // even DEL has PK in post data
-        if (t == Op::INS) {
-          chkrc(d[1].ind[i] == -1);
-        } else if (t == Op::DEL) {
-#ifdef ndb_event_cares_about_pk_pre_data
-          chkrc(d[1].ind[i] == -1);
-#endif
-        } else {
-#ifdef ndb_event_cares_about_pk_pre_data
-          chkrc(d[1].ind[i] == 0);
-#endif
-        }
-      } else {
-        if (t == Op::INS) {
-          chkrc(d[0].ind[i] >= 0);
-          chkrc(d[1].ind[i] == -1);
-        } else if (t == Op::DEL) {
-          chkrc(d[0].ind[i] == -1);
-          chkrc(d[1].ind[i] >= 0);
-        } else if (op->kind == Op::OP) {
-          chkrc(d[0].ind[i] >= 0);
-          chkrc(d[1].ind[i] >= 0);
-        }
-      }
+    const int ind0 = d0.ind[i];
+    const int ind1 = d1.ind[i];
+    // the rules are the rules..
+    if (c.pk) {
+      chkrc(ind0 == 0); // always PK in post data
+      if (t == Op::INS)
+        chkrc(ind1 == -1);
+      if (t == Op::DEL)
+        chkrc(ind1 == -1); // no PK in pre data
+      if (t == Op::UPD)
+        chkrc(ind1 == 0);
+    }
+    if (! c.pk) {
+      if (t == Op::INS)
+        chkrc(ind0 >= 0 && ind1 == -1);
+      if (t == Op::DEL)
+        chkrc(ind0 == -1 && ind1 >= 0); // always non-PK in pre data
+      if (t == Op::UPD)
+        chkrc(ind0 == -1 || ind1 >= 0); // update must have pre data
+    }
+    if (! c.nullable) {
+      chkrc(ind0 <= 0 && ind1 <= 0);
     }
   }
   return 0;
@@ -542,28 +722,51 @@
 static void
 copycol(const Col& c, const Data& d1, Data& d3)
 {
-  if ((d3.ind[c.no] = d1.ind[c.no]) != -1)
-    memmove(d3.ptr[c.no], d1.ptr[c.no], c.size);
+  uint i = c.no;
+  if ((d3.ind[i] = d1.ind[i]) == 0) {
+    if (! c.isblob()) {
+      memmove(d3.ptr[i].v, d1.ptr[i].v, c.size);
+    } else {
+      Data::Txt& t1 = *d1.ptr[i].txt;
+      Data::Txt& t3 = *d3.ptr[i].txt;
+      delete [] t3.val;
+      t3.val = new char [t1.len];
+      t3.len = t1.len;
+      memcpy(t3.val, t1.val, t1.len);
+    }
+  }
 }
 
 static void
-copykeys(const Data& d1, Data& d3)
+copydata(const Data& d1, Data& d3, bool pk, bool nonpk)
 {
   uint i;
-  for (i = 0; i < g_ncol; i++) {
+  for (i = 0; i < ncol(); i++) {
     const Col& c = g_col[i];
-    if (c.pk)
+    if (c.pk && pk || ! c.pk && nonpk)
       copycol(c, d1, d3);
   }
 }
 
 static void
-copydata(const Data& d1, Data& d3)
+compdata(const Data& d1, const Data& d2, Data& d3, bool pk, bool nonpk)
 {
   uint i;
-  for (i = 0; i < g_ncol; i++) {
+  for (i = 0; i < ncol(); i++) {
     const Col& c = g_col[i];
-    copycol(c, d1, d3);
+    if (c.pk && pk || ! c.pk && nonpk) {
+      const Data* d = 0;
+      if (d1.ind[i] == -1 && d2.ind[i] == -1)
+        d3.ind[i] = -1;
+      else if (d1.ind[i] == -1 && d2.ind[i] != -1)
+        d = &d2;
+      else if (d1.ind[i] != -1 && d2.ind[i] == -1)
+        d = &d1;
+      else
+        d = &d2;
+      if (d != 0)
+        copycol(c, *d, d3);
+    }
   }
 }
 
@@ -571,33 +774,13 @@
 copyop(const Op* op1, Op* op3)
 {
   op3->type = op1->type;
-  copydata(op1->data[0], op3->data[0]);
-  copydata(op1->data[1], op3->data[1]);
+  copydata(op1->data[0], op3->data[0], true, true);
+  copydata(op1->data[1], op3->data[1], true, true);
+  op3->gci = op1->gci;
   Uint32 pk1_tmp;
   reqrc(checkop(op3, pk1_tmp) == 0);
 }
 
-// not needed for ops
-static void
-compdata(const Data& d1, const Data& d2, Data& d3) // d2 overrides d1
-{
-  uint i;
-  for (i = 0; i < g_ncol; i++) {
-    const Col& c = g_col[i];
-    const Data* d = 0;
-    if (d1.ind[i] == -1 && d2.ind[i] == -1)
-      d3.ind[i] = -1;
-    else if (d1.ind[i] == -1 && d2.ind[i] != -1)
-      d = &d2;
-    else if (d1.ind[i] != -1 && d2.ind[i] == -1)
-      d = &d1;
-    else
-      d = &d2;
-    if (d != 0)
-      copycol(c, *d, d3);
-  }
-}
-
 static int
 compop(const Op* op1, const Op* op2, Op* op3) // op1 o op2 = op3
 {
@@ -610,16 +793,38 @@
     copyop(op2, op3);
     return 0;
   }
+  Op::Kind kind =
+    op1->kind == Op::OP && op2->kind == Op::OP ? Op::OP : Op::EV;
+  Op* res_op = getop(kind);
   chkrc((comp = comptype(op1->type, op2->type)) != 0);
-  op3->type = comp->t3;
-  copykeys(op2->data[0], op3->data[0]);
-  if (op3->type != Op::DEL)
-    copydata(op2->data[0], op3->data[0]);
-  if (op3->type != Op::INS)
-    copydata(op1->data[1], op3->data[1]);
+  res_op->type = comp->t3;
+  if (res_op->type == Op::INS) {
+    // INS o UPD
+    compdata(op1->data[0], op2->data[0], res_op->data[0], true, true);
+    // pre = undef
+  }
+  if (res_op->type == Op::DEL) {
+    // UPD o DEL
+    copydata(op2->data[0], res_op->data[0], true, false); // PK
+    copydata(op1->data[1], res_op->data[1], false, true); // non-PK
+  } 
+  if (res_op->type == Op::UPD && op1->type == Op::DEL) {
+    // DEL o INS
+    copydata(op2->data[0], res_op->data[0], true, true);
+    copydata(op1->data[0], res_op->data[1], true, false); // PK
+    copydata(op1->data[1], res_op->data[1], false, true); // non-PK
+  }
+  if (res_op->type == Op::UPD && op1->type == Op::UPD) {
+    // UPD o UPD
+    compdata(op1->data[0], op2->data[0], res_op->data[0], true, true);
+    compdata(op2->data[1], op1->data[1], res_op->data[1], true, true);
+  }
+  assert(op1->gci == op2->gci);
+  res_op->gci = op2->gci;
   Uint32 pk1_tmp;
-  reqrc(checkop(op3, pk1_tmp) == 0);
-  // not eliminating identical post-pre fields
+  reqrc(checkop(res_op, pk1_tmp) == 0);
+  copyop(res_op, op3);
+  freeop(res_op);
   return 0;
 }
 
@@ -632,12 +837,14 @@
   NdbDictionary::Event evt(g_evtname);
   evt.setTable(*g_tab);
   evt.addTableEvent(NdbDictionary::Event::TE_ALL);
-  // pk always
-  evt.addEventColumn("pk1");
-  evt.addEventColumn("pk2");
-  // simple cols
-  evt.addEventColumn("seq");
-  evt.addEventColumn("cc1");
+  uint i;
+  for (i = 0; i < ncol(); i++) {
+    const Col& c = g_col[i];
+    evt.addEventColumn(c.name);
+  }
+#ifdef version51rbr
+  evt.separateEvents(g_opts.separate_events);
+#endif
   if (g_dic->getEvent(evt.getName()) != 0)
     chkdb(g_dic->dropEvent(evt.getName()) == 0);
   chkdb(g_dic->createEvent(evt) == 0);
@@ -666,20 +873,22 @@
   chkdb((g_evt_op = g_ndb->createEventOperation(g_evt->getName(), bsz)) != 0);
 #else
   chkdb((g_evt_op = g_ndb->createEventOperation(g_evt->getName())) != 0);
+#ifdef version51rbr
+  g_evt_op->separateEvents(g_opts.separate_events); // not yet inherited
+#endif
 #endif
   uint i;
-  for (i = 0; i < g_ncol; i++) {
+  for (i = 0; i < ncol(); i++) {
     const Col& c = g_col[i];
     Data (&d)[2] = g_rec_ev->data;
-    switch (c.type) {
-    case NdbDictionary::Column::Unsigned:
-    case NdbDictionary::Column::Char:
-      chkdb((g_ra[0][i] = g_evt_op->getValue(c.name, (char*)d[0].ptr[i])) != 0);
-      chkdb((g_ra[1][i] = g_evt_op->getPreValue(c.name, (char*)d[1].ptr[i])) != 0);
-      break;
-    default:
-      assert(false);
-      break;
+    if (! c.isblob()) {
+      chkdb((g_ev_ra[0][i] = g_evt_op->getValue(c.name, (char*)d[0].ptr[i].v)) != 0);
+      chkdb((g_ev_ra[1][i] = g_evt_op->getPreValue(c.name, (char*)d[1].ptr[i].v)) != 0);
+    } else {
+#ifdef version51rbr
+      chkdb((g_ev_bh[0][i] = g_evt_op->getBlobHandle(c.name)) != 0);
+      chkdb((g_ev_bh[1][i] = g_evt_op->getPreBlobHandle(c.name)) != 0);
+#endif
     }
   }
   return 0;
@@ -705,9 +914,9 @@
     chkdb((g_con = g_ndb->startTransaction()) != 0);
     { // forced to exec a dummy op
       Uint32 pk1;
-      char pk2[g_charlen];
+      char pk2[g_charlen + 1];
       pk1 = g_maxpk;
-      memset(pk2, 0x20, g_charlen);
+      sprintf(pk2, "%-*u", g_charlen, pk1);
       chkdb((g_op = g_con->getNdbOperation(g_tabname)) != 0);
       chkdb(g_op->readTuple() == 0);
       chkdb(g_op->equal("pk1", (char*)&pk1) == 0);
@@ -723,61 +932,153 @@
       break;
     }
     i = 1;
+    sleep(1);
   }
   return 0;
 }
 
+// scan table and set current tot_op for each pk1
 static int
-makeop(Op* op, Uint32 pk1, Op::Type t, const Op* prev_op)
+scantab()
 {
-  op->type = t;
-  if (t != Op::INS)
-    copydata(prev_op->data[0], op->data[1]);
+  NdbRecAttr* ra[g_maxcol];
+  NdbBlob* bh[g_maxcol];
+  Op* rec_op = getop(Op::OP);
+  Data& d0 = rec_op->data[0];
+  chkdb((g_con = g_ndb->startTransaction()) != 0);
+  chkdb((g_scan_op = g_con->getNdbScanOperation(g_tabname)) != 0);
+  chkdb(g_scan_op->readTuples() == 0);
   uint i;
-  for (i = 0; i < g_ncol; i++) {
+  for (i = 0; i < ncol(); i++) {
     const Col& c = getcol(i);
-    Data (&d)[2] = op->data;
-    if (i == getcol("pk1").no) {
-      d[0].pk1 = pk1;
-      d[0].ind[i] = 0;
-      continue;
-    }
-    if (i == getcol("pk2").no) {
-      sprintf(d[0].pk2, "%-*u", g_charlen, d[0].pk1);
-      d[0].ind[i] = 0;
-      continue;
+    if (! c.isblob()) {
+      chkdb((ra[i] = g_scan_op->getValue(c.name, (char*)d0.ptr[i].v)) != 0);
+    } else {
+      chkdb((bh[i] = g_scan_op->getBlobHandle(c.name)) != 0);
     }
-    if (t == Op::DEL) {
-      d[0].ind[i] = -1;
-      continue;
-    }
-    if (i == getcol("seq").no) {
-      d[0].seq = g_seq++;
-      d[0].ind[i] = 0;
+  }
+  chkdb(g_con->execute(NoCommit) == 0);
+  int ret;
+  while ((ret = g_scan_op->nextResult()) == 0) {
+    Uint32 pk1 = d0.pk1;
+    if (pk1 >= g_opts.maxpk)
       continue;
+    rec_op->type = Op::INS;
+    for (i = 0; i < ncol(); i++) {
+      const Col& c = getcol(i);
+      int ind;
+      if (! c.isblob()) {
+        ind = ra[i]->isNULL();
+      } else {
+#ifdef version51rbr
+        int ret;
+        ret = bh[i]->getDefined(ind);
+        assert(ret == 0);
+        if (ind == 0) {
+          Data::Txt& t = *d0.ptr[i].txt;
+          Uint64 len64;
+          ret = bh[i]->getLength(len64);
+          assert(ret == 0);
+          t.len = (uint)len64;
+          delete [] t.val;
+          t.val = new char [t.len];
+          memset(t.val, 'X', t.len);
+          Uint32 len = t.len;
+          ret = bh[i]->readData(t.val, len);
+          assert(ret == 0 && len == t.len);
+        }
+#endif
+      }
+      assert(ind >= 0);
+      d0.ind[i] = ind;
     }
-    uint u;
-    u = urandom(100);
-    if (c.nullable && u < 20) {
-      d[0].ind[i] = 1;
-      continue;
+    assert(g_pk_op[pk1] == 0);
+    Op* tot_op = g_pk_op[pk1] = getop(Op::OP);
+    copyop(rec_op, tot_op);
+    tot_op->type = Op::INS;
+  }
+  chkdb(ret == 1);
+  g_ndb->closeTransaction(g_con);
+  g_scan_op = 0;
+  g_con = 0;
+  freeop(rec_op);
+  return 0;
+}
+
+static void
+makedata(const Col& c, Data& d, Uint32 pk1, Op::Type t)
+{
+  uint i = c.no;
+  if (c.pk) {
+    switch (c.type) {
+    case NdbDictionary::Column::Unsigned:
+      {
+        Uint32* p = d.ptr[i].u32;
+        *p = pk1;
+      }
+      break;
+    case NdbDictionary::Column::Char:
+      {
+        char* p = d.ptr[i].ch;
+        sprintf(p, "%-*u", g_charlen, pk1);
+      }
+      break;
+    default:
+      assert(false);
+      break;
     }
+    d.ind[i] = 0;
+  } else if (t == Op::DEL) {
+    ;
+  } else if (i == getcol("seq").no) {
+    d.seq = g_seq++;
+    d.ind[i] = 0;
+  } else if (t == Op::INS && c.nullable && urandom(10, 100)) {
+    d.noop |= (1 << i);
+    d.ind[i] = 1; // implicit NULL value is known
+  } else if (t == Op::UPD && urandom(10, 100)) {
+    d.noop |= (1 << i);
+    d.ind[i] = -1; // fixed up in caller
+  } else if (c.nullable && urandom(10, 100)) {
+    d.ind[i] = 1;
+  } else {
     switch (c.type) {
     case NdbDictionary::Column::Unsigned:
       {
-        u = urandom(0);
-        Uint32* p = (Uint32*)d[0].ptr[i];
+        Uint32* p = d.ptr[i].u32;
+        uint u = urandom();
         *p = u;
       }
       break;
     case NdbDictionary::Column::Char:
       {
-        u = urandom(g_charlen);
-        char* p = (char*)d[0].ptr[i];
+        char* p = d.ptr[i].ch;
+        uint u = urandom(g_charlen);
         uint j;
         for (j = 0; j < g_charlen; j++) {
-          uint v = urandom(3);
-          p[j] = j < u ? "abcde"[v] : 0x20;
+          uint v = urandom(strlen(g_charval));
+          p[j] = j < u ? g_charval[v] : 0x20;
+        }
+      }
+      break;
+    case NdbDictionary::Column::Text:
+      {
+        Data::Txt& t = *d.ptr[i].txt;
+        uint u = urandom(g_maxblobsize);
+        u = urandom(u); // 4x bias for smaller blobs
+        u = urandom(u);
+        delete [] t.val;
+        t.val = new char [u];
+        t.len = u;
+        uint j = 0;
+        while (j < u) {
+          assert(u > 0);
+          uint k = 1 + urandom(u - 1);
+          if (k > u - j)
+            k = u - j;
+          uint v = urandom(strlen(g_charval));
+          memset(&t.val[j], g_charval[v], k);
+          j += k;
         }
       }
       break;
@@ -785,74 +1086,78 @@
       assert(false);
       break;
     }
-    d[0].ind[i] = 0;
+    d.ind[i] = 0;
   }
-  Uint32 pk1_tmp = ~(Uint32)0;
-  chkrc(checkop(op, pk1_tmp) == 0);
-  reqrc(pk1 == pk1_tmp);
-  return 0;
 }
 
 static void
-makeop(Op* tot_op, Op* com_op, Uint32 pk1, Op::Type t)
+makeop(const Op* prev_op, Op* op, Uint32 pk1, Op::Type t)
 {
-  Op tmp_op;
-  tmp_op.kind = Op::OP;
-  Op* op = getop();
-  reqrc(makeop(op, pk1, t, tot_op) == 0);
-  // add to end
-  Op* last_op = com_op;
-  while (last_op->next_op != 0)
-    last_op = last_op->next_op;
-  last_op->next_op = op;
-  // merge into chain head
-  tmp_op.init();
-  reqrc(compop(com_op, op, &tmp_op) == 0);
-  copyop(&tmp_op, com_op);
-  // merge into total op
-  tmp_op.init();
-  reqrc(compop(tot_op, op, &tmp_op) == 0);
-  copyop(&tmp_op, tot_op);
-  // counts
-  com_op->num_op += 1;
-  tot_op->num_op += 1;
+  op->type = t;
+  const Data& dp = prev_op->data[0];
+  Data& d0 = op->data[0];
+  Data& d1 = op->data[1];
+  uint i;
+  for (i = 0; i < ncol(); i++) {
+    const Col& c = getcol(i);
+    makedata(c, d0, pk1, t);
+    if (t == Op::INS) {
+      d1.ind[i] = -1;
+    } else if (t == Op::DEL) {
+      assert(dp.ind[i] >= 0);
+      if (c.pk)
+        d1.ind[i] = -1;
+      else
+        copycol(c, dp, d1);
+    } else if (t == Op::UPD) {
+      assert(dp.ind[i] >= 0);
+      if (d0.ind[i] == -1) // not updating this col
+        copycol(c, dp, d0); // must keep track of data
+      copycol(c, dp, d1);
+    } else {
+      assert(false);
+    }
+  }
+  Uint32 pk1_tmp = ~(Uint32)0;
+  reqrc(checkop(op, pk1_tmp) == 0);
+  reqrc(pk1 == pk1_tmp);
 }
 
 static void
 makeops()
 {
   ll1("makeops");
-  uint resv = g_opts.opstr == 0 ? 2 * g_opts.maxpk : 0; // for final deletes
-  uint next = g_opts.opstr == 0 ? g_maxcom : strlen(g_opts.opstr);
-  Op tmp_op;
-  tmp_op.kind = Op::OP;
   Uint32 pk1 = 0;
-  while (getfreeops() >= resv + 2 + next && pk1 < g_opts.maxpk) {
-    if (g_opts.opstr == 0)
+  while (g_usedops < g_opts.maxops && pk1 < g_opts.maxpk) {
+    if (g_opts.opstring == 0)
       pk1 = urandom(g_opts.maxpk);
-    ll2("makeops: pk1=" << pk1 << " free=" << getfreeops());
+    ll2("makeops: pk1=" << pk1);
     // total op on the pk so far
     // optype either NUL=initial/deleted or INS=created
     Op* tot_op = g_pk_op[pk1];
     if (tot_op == 0)
-      tot_op = g_pk_op[pk1] = getop(); //1
+      tot_op = g_pk_op[pk1] = getop(Op::OP);
     assert(tot_op->type == Op::NUL || tot_op->type == Op::INS);
     // add new commit chain to end
-    Op* last_com = tot_op;
-    while (last_com->next_com != 0)
-      last_com = last_com->next_com;
-    Op* com_op = getop(); //2
-    last_com->next_com = com_op;
+    Op* last_gci = tot_op;
+    while (last_gci->next_gci != 0)
+      last_gci = last_gci->next_gci;
+    Op* gci_op = getop(Op::OP);
+    last_gci->next_gci = gci_op;
+    Op* com_op = getop(Op::OP);
+    gci_op->next_com = com_op;
     // length of random chain
     uint len = ~0;
-    if (g_opts.opstr == 0)
+    if (g_opts.opstring == 0) {
       len = 1 + urandom(g_maxcom - 1);
+      len = 1 + urandom(len - 1); // 2x bias for short chain
+    }
     ll2("makeops: com chain");
     uint n = 0;
     while (1) {
-      // random or from g_opts.opstr
+      // random or from current g_opts.opstring part
       Op::Type t;
-      if (g_opts.opstr == 0) {
+      if (g_opts.opstring == 0) {
         if (n == len)
           break;
         do {
@@ -860,10 +1165,11 @@
         } while (tot_op->type == Op::NUL && (t == Op::DEL || t == Op::UPD) ||
                  tot_op->type == Op::INS && t == Op::INS);
       } else {
-        uint m = strlen(g_opts.opstr);
+        const char* str = g_opstringpart[g_loop % g_opstringparts];
+        uint m = strlen(str);
         uint k = tot_op->num_com + tot_op->num_op;
         assert(k < m);
-        char c = g_opts.opstr[k];
+        char c = str[k];
         if (c == 'c') {
           if (k + 1 == m)
             pk1 += 1;
@@ -874,30 +1180,27 @@
         assert(q != 0);
         t = (Op::Type)(q - p);
       }
-      makeop(tot_op, com_op, pk1, t);
+      Op* op = getop(Op::OP);
+      makeop(tot_op, op, pk1, t);
+      // add to end
+      Op* last_op = com_op;
+      while (last_op->next_op != 0)
+        last_op = last_op->next_op;
+      last_op->next_op = op;
+      // merge into chain head and total op
+      reqrc(compop(com_op, op, com_op) == 0);
+      reqrc(compop(tot_op, op, tot_op) == 0);
       assert(tot_op->type == Op::NUL || tot_op->type == Op::INS);
+      // counts
+      com_op->num_op += 1;
+      tot_op->num_op += 1;
       n++;
     }
+    // copy to gci level
+    copyop(com_op, gci_op);
     tot_op->num_com += 1;
   }
-  assert(getfreeops() >= resv);
-  // terminate with DEL if necessary
-  for (pk1 = 0; pk1 < g_opts.maxpk; pk1++) {
-    Op* tot_op = g_pk_op[pk1];
-    if (tot_op == 0)
-      continue;
-    if (tot_op->type == Op::NUL)
-      continue;
-    assert(g_opts.opstr == 0);
-    Op* last_com = tot_op;
-    while (last_com->next_com != 0)
-      last_com = last_com->next_com;
-    Op* com_op = getop(); //1
-    last_com->next_com = com_op;
-    makeop(tot_op, com_op, pk1, Op::DEL);
-    assert(tot_op->type == Op::NUL);
-    tot_op->num_com += 1;
-  }
+  ll1("makeops: used ops = " << g_usedops);
 }
 
 static int
@@ -919,23 +1222,36 @@
     break;
   }
   uint i;
-  for (i = 0; i < g_ncol; i++) {
+  for (i = 0; i < ncol(); i++) {
     const Col& c = getcol(i);
     const Data& d = op->data[0];
     if (! c.pk)
       continue;
-    chkdb(g_op->equal(c.name, (char*)d.ptr[i]) == 0);
+    chkdb(g_op->equal(c.name, (const char*)d.ptr[i].v) == 0);
   }
   if (op->type != Op::DEL) {
-    for (i = 0; i < g_ncol; i++) {
+    for (i = 0; i < ncol(); i++) {
       const Col& c = getcol(i);
       const Data& d = op->data[0];
       if (c.pk)
         continue;
-      if (d.ind[i] == -1)
+      if (d.noop & (1 << i))
         continue;
-      const char* ptr = d.ind[i] == 0 ? (char*)d.ptr[i] : 0;
-      chkdb(g_op->setValue(c.name, ptr) == 0);
+      assert(d.ind[i] >= 0);
+      if (! c.isblob()) {
+        if (d.ind[i] == 0)
+          chkdb(g_op->setValue(c.name, (const char*)d.ptr[i].v) == 0);
+        else
+          chkdb(g_op->setValue(c.name, (const char*)0) == 0);
+      } else {
+        const Data::Txt& t = *d.ptr[i].txt;
+        g_bh = g_op->getBlobHandle(c.name);
+        if (d.ind[i] == 0)
+          chkdb(g_bh->setValue(t.val, t.len) == 0);
+        else
+          chkdb(g_bh->setValue(0, 0) == 0);
+        g_bh = 0;
+      }
     }
   }
   g_op = 0;
@@ -947,40 +1263,43 @@
 {
   ll1("runops");
   Uint32 pk1;
-  const Op* com_op[g_maxpk];
-  uint left = 0;
+  Op* gci_op[g_maxpk];
+  uint left = 0; // number of pks with ops
   for (pk1 = 0; pk1 < g_opts.maxpk; pk1++) {
-    com_op[pk1] = 0;
+    gci_op[pk1] = 0;
     // total op on the pk
     Op* tot_op = g_pk_op[pk1];
     if (tot_op == 0)
       continue;
     // first commit chain
-    assert(tot_op->next_com != 0);
-    com_op[pk1] = tot_op->next_com;
+    assert(tot_op->next_gci != 0);
+    gci_op[pk1] = tot_op->next_gci;
     left++;
   }
   while (left != 0) {
     pk1 = urandom(g_opts.maxpk);
-    if (com_op[pk1] == 0)
+    if (gci_op[pk1] == 0)
       continue;
     // do the ops in one transaction
-    ll2("runops: pk1=" << pk1);
     chkdb((g_con = g_ndb->startTransaction()) != 0);
+    Op* com_op = gci_op[pk1]->next_com;
+    assert(com_op != 0);
     // first op in chain
-    Op* op = com_op[pk1]->next_op;
+    Op* op = com_op->next_op;
     assert(op != 0);
     while (op != 0) {
-      ll2("add op:" << *op);
+      ll2("runops:" << *op);
       chkrc(addndbop(op) == 0);
       op = op->next_op;
     }
     chkdb(g_con->execute(Commit) == 0);
+    gci_op[pk1]->gci = com_op->gci = g_con->getGCI();
+    ll2("commit: gci=" << com_op->gci);
     g_ndb->closeTransaction(g_con);
     g_con = 0;
     // next chain
-    com_op[pk1] = com_op[pk1]->next_com;
-    if (com_op[pk1] == 0) {
+    gci_op[pk1] = gci_op[pk1]->next_gci;
+    if (gci_op[pk1] == 0) {
       assert(left != 0);
       left--;
     }
@@ -989,13 +1308,106 @@
   return 0;
 }
 
+// move com chains with same gci under same gci entry
+static int
+mergeops()
+{
+  ll1("mergeops");
+  uint mergecnt = 0;
+  Uint32 pk1;
+  for (pk1 = 0; pk1 < g_opts.maxpk; pk1++) {
+    Op* tot_op = g_pk_op[pk1];
+    if (tot_op == 0)
+      continue;
+    Op* gci_op = tot_op->next_gci;
+    assert(gci_op != 0);
+    while (gci_op != 0) {
+      Op* com_op = gci_op->next_com;
+      assert(com_op != 0 && com_op->next_com == 0);
+      assert(gci_op->gci == com_op->gci);
+      Op* last_com = com_op;
+      Op* gci_op2 = gci_op->next_gci;
+      while (gci_op2 != 0 && gci_op->gci == gci_op2->gci) {
+        // move link to com level
+        last_com = last_com->next_com = gci_op2->next_com;
+        // merge to gci
+        reqrc(compop(gci_op, gci_op2, gci_op) == 0);
+        // move to next and discard
+        Op* tmp_op = gci_op2;
+        gci_op2 = gci_op2->next_gci;
+        freeop(tmp_op);
+        mergecnt++;
+      }
+      gci_op = gci_op->next_gci = gci_op2;
+    }
+  }
+  ll1("mergeops: used ops = " << g_usedops);
+  ll1("mergeops: merged " << mergecnt << " gci entries");
+  return 0;
+}
+
+// set bit for equal post/pre data in UPD, for use in event match
+static void
+cmppostpre()
+{
+  ll1("cmppostpre");
+  Uint32 pk1;
+  for (pk1 = 0; pk1 < g_opts.maxpk; pk1++) {
+    Op* tot_op = g_pk_op[pk1];
+    Op* gci_op = tot_op ? tot_op->next_gci : 0;
+    while (gci_op != 0) {
+      if (gci_op->type == Op::UPD) {
+        Data (&d)[2] = gci_op->data;
+        uint i;
+        for (i = 0; i < ncol(); i++) {
+          const Col& c = getcol(i);
+          bool eq =
+            d[0].ind[i] == 1 && d[1].ind[i] == 1 ||
+            d[0].ind[i] == 0 && d[1].ind[i] == 0 && cmpcol(c, d[0], d[1]) == 0;
+          if (eq) {
+            d[0].ppeq |= (1 << i);
+            d[1].ppeq |= (1 << i);
+          }
+        }
+      }
+      gci_op = gci_op->next_gci;
+    }
+  }
+}
+static int
+cmpopevdata(const Data& d1, const Data& d2)
+{
+  uint i;
+  for (i = 0; i < ncol(); i++) {
+    const Col& c = getcol(i);
+    if (cmpcol(c, d1, d2) != 0) {
+      if ((d1.ppeq & (1 << i)) && d2.ind[i] == -1)
+        ; // post/pre data equal and no event data returned is OK
+      else
+        return 1;
+    }
+  }
+  return 0;
+}
+
+// compare operation to event data
+static int
+cmpopevdata(const Data (&d1)[2], const Data (&d2)[2])
+{
+  if (cmpopevdata(d1[0], d2[0]) != 0)
+    return 1;
+  if (cmpopevdata(d1[1], d2[1]) != 0)
+    return 1;
+  return 0;
+}
+
 static int
 matchevent(Op* ev)
 {
   Op::Type t = ev->type;
-  Data (&d)[2] = ev->data;
+  Data (&d2)[2] = ev->data;
   // get PK
-  Uint32 pk1 = d[0].pk1;
+  Uint32 pk1 = d2[0].pk1;
   chkrc(pk1 < g_opts.maxpk);
   // on error repeat and print details
   uint loop = 0;
@@ -1004,42 +1416,59 @@
     ll1("matchevent: pk1=" << pk1 << " type=" << t);
     ll2("EVT: " << *ev);
     Op* tot_op = g_pk_op[pk1];
-    Op* com_op = tot_op ? tot_op->next_com : 0;
-    uint cnt = 0;
+    Op* gci_op = tot_op ? tot_op->next_gci : 0;
+    uint pos = 0;
     bool ok = false;
-    while (com_op != 0) {
-      ll2("COM: " << *com_op);
-      Op* op = com_op->next_op;
-      assert(op != 0);
-      while (op != 0) {
-        ll2("---: " << *op);
-        op = op->next_op;
+    while (gci_op != 0) {
+      ll2("GCI: " << *gci_op);
+      // print details
+      Op* com_op = gci_op->next_com;
+      assert(com_op != 0);
+      while (com_op != 0) {
+        ll2("COM: " << *com_op);
+        Op* op = com_op->next_op;
+        assert(op != 0);
+        while (op != 0) {
+          ll2("OP : " << *op);
+          op = op->next_op;
+        }
+        com_op = com_op->next_com;
       }
-      if (com_op->type != Op::NUL) {
-        if (com_op->type == t) {
-          const Data (&d2)[2] = com_op->data;
-          if (t == Op::INS && d2[0].seq == d[0].seq ||
-              t == Op::DEL && d2[1].seq == d[1].seq ||
-              t == Op::UPD && d2[0].seq == d[0].seq) {
-            if (cnt == g_ev_cnt[pk1]) {
-              if (! com_op->match) {
-                ll2("match pos " << cnt);
-                ok = com_op->match = true;
-              } else {
-                ll2("duplicate match");
-              }
-            } else {
-              ll2("match bad pos event=" << g_ev_cnt[pk1] << " op=" << cnt);
-            }
+      // match agains GCI op
+      if (gci_op->type != Op::NUL) {
+        const Data (&d1)[2] = gci_op->data;
+        if (cmpopevdata(d1, d2) == 0) {
+          bool tmpok = true;
+          if (gci_op->type != t) {
+            ll2("***: wrong type " << gci_op->type << " != " << t);
+            tmpok = false;
+          }
+          if (gci_op->match) {
+            ll2("***: duplicate match");
+            tmpok = false;
+          }
+          if (pos != g_ev_pos[pk1]) {
+            ll2("***: wrong pos " << pos << " != " << g_ev_pos[pk1]);
+            tmpok = false;
+          }
+          if (gci_op->gci != ev->gci) {
+            ll2("***: wrong gci " << gci_op->gci << " != " << ev->gci);
+            tmpok = false;
+          }
+          if (tmpok) {
+            ok = gci_op->match = true;
+            ll2("===: match");
           }
         }
-        cnt++;
+        pos++;
       }
-      com_op = com_op->next_com;
+      gci_op = gci_op->next_gci;
     }
-    if (ok)
+    if (ok) {
+      ll1("matchevent: match");
       return 0;
-    ll2("no match");
+    }
+    ll1("matchevent: ERROR: no match");
     if (g_loglevel >= 2)
       return -1;
     loop++;
@@ -1056,12 +1485,12 @@
     Op* tot_ev = g_pk_ev[pk1];
     if (tot_ev == 0)
       continue;
-    Op* com_ev = tot_ev->next_com;
-    while (com_ev != 0) {
-      if (matchevent(com_ev) < 0)
+    Op* ev = tot_ev->next_ev;
+    while (ev != 0) {
+      if (matchevent(ev) < 0)
         nomatch++;
-      g_ev_cnt[pk1]++;
-      com_ev = com_ev->next_com;
+      g_ev_pos[pk1]++;
+      ev = ev->next_ev;
     }
   }
   chkrc(nomatch == 0);
@@ -1095,22 +1524,58 @@
   return 0;
 }
 
+static void
+geteventdata()
+{
+  Data (&d)[2] = g_rec_ev->data;
+  int i, j;
+  for (j = 0; j < 2; j++) {
+    for (i = 0; i < ncol(); i++) {
+      const Col& c = getcol(i);
+      int ind, ret;
+      if (! c.isblob()) {
+        NdbRecAttr* ra = g_ev_ra[j][i];
+        ind = ra->isNULL();
+      } else {
+#ifdef version51rbr
+        NdbBlob* bh = g_ev_bh[j][i];
+        ret = bh->getDefined(ind);
+        assert(ret == 0);
+        if (ind == 0) { // value was returned and is not NULL
+          Data::Txt& t = *d[j].ptr[i].txt;
+          Uint64 len64;
+          ret = bh->getLength(len64);
+          assert(ret == 0);
+          t.len = (uint)len64;
+          delete [] t.val;
+          t.val = new char [t.len];
+          memset(t.val, 'X', t.len);
+          Uint32 len = t.len;
+          ret = bh->readData(t.val, len);
+          assert(ret == 0 && len == t.len);
+        }
+#endif
+      }
+      d[j].ind[i] = ind;
+    }
+  }
+}
+
 static int
 runevents()
 {
   ll1("runevents");
-  NdbEventOperation* evt_op;
-  uint npoll = 3;
+  uint mspoll = 1000;
+  uint npoll = 6; // strangely long delay
   while (npoll != 0) {
     npoll--;
     int ret;
     ll1("poll");
-    ret = g_ndb->pollEvents(1000);
+    ret = g_ndb->pollEvents(mspoll);
     if (ret <= 0)
       continue;
     while (1) {
-      g_rec_ev->init();
-      Data (&d)[2] = g_rec_ev->data;
+      g_rec_ev->init(Op::EV);
 #ifdef version50
       int overrun = g_opts.maxops;
       chkdb((ret = g_evt_op->next(&overrun)) >= 0);
@@ -1124,32 +1589,35 @@
       reqrc(g_evt_op == tmp_op);
 #endif
       chkrc(seteventtype(g_rec_ev, g_evt_op->getEventType()) == 0);
-      // get indicators
-      { int i, j;
-        for (j = 0; j < 2; j++)
-          for (i = 0; i < g_ncol; i++)
-            d[j].ind[i] = g_ra[j][i]->isNULL();
+      geteventdata();
+      g_rec_ev->gci = g_evt_op->getGCI();
+#ifdef version50
+      // fix to match 5.1
+      if (g_rec_ev->type == Op::UPD) {
+        Uint32 pk1 = g_rec_ev->data[0].pk1;
+        makedata(getcol("pk1"), g_rec_ev->data[1], pk1, Op::UPD);
+        makedata(getcol("pk2"), g_rec_ev->data[1], pk1, Op::UPD);
       }
+#endif
+      // get indicators and blob value
       ll2("runevents: EVT: " << *g_rec_ev);
       // check basic sanity
       Uint32 pk1 = ~(Uint32)0;
       chkrc(checkop(g_rec_ev, pk1) == 0);
       // add to events
-      chkrc(getfreeevs() >= 2);
       Op* tot_ev = g_pk_ev[pk1];
       if (tot_ev == 0)
-        tot_ev = g_pk_ev[pk1] = getev(); //1
-      Op* last_com = tot_ev;
-      while (last_com->next_com != 0)
-        last_com = last_com->next_com;
+        tot_ev = g_pk_ev[pk1] = getop(Op::EV);
+      Op* last_ev = tot_ev;
+      while (last_ev->next_ev != 0)
+        last_ev = last_ev->next_ev;
       // copy and add
-      Op* ev = getev(); //3
+      Op* ev = getop(Op::EV);
       copyop(g_rec_ev, ev);
-      last_com->next_com = ev;
+      last_ev->next_ev = ev;
     }
   }
-  chkrc(matchevents() == 0);
-  chkrc(matchops() == 0);
+  ll1("runevents: used ops = " << g_usedops);
   return 0;
 }
 
@@ -1179,18 +1647,23 @@
   setseed(-1);
   chkrc(createtable() == 0);
   chkrc(createevent() == 0);
-  uint n;
-  for (n = 0; n < g_opts.loop; n++) {
-    ll0("loop " << n);
-    setseed(n);
+  for (g_loop = 0; g_opts.loop == 0 || g_loop < g_opts.loop; g_loop++) {
+    ll0("loop " << g_loop);
+    setseed(g_loop);
     resetmem();
-    g_rec_ev = getev();
+    chkrc(scantab() == 0); // alternative: save tot_op for loop > 0
+    makeops();
+    g_rec_ev = getop(Op::EV);
     chkrc(createeventop() == 0);
     chkdb(g_evt_op->execute() == 0);
     chkrc(waitgci() == 0);
-    makeops();
     chkrc(runops() == 0);
+    if (! g_opts.separate_events)
+      chkrc(mergeops() == 0);
+    cmppostpre();
     chkrc(runevents() == 0);
+    chkrc(matchevents() == 0);
+    chkrc(matchops() == 0);
     chkrc(dropeventop() == 0);
   }
   chkrc(dropevent() == 0);
@@ -1204,31 +1677,41 @@
 my_long_options[] =
 {
   NDB_STD_OPTS("test_event_merge"),
-  { "abort-on-error", 1008, "Do abort() on any error",
+  { "abort-on-error", 1001, "Do abort() on any error",
     (gptr*)&g_opts.abort_on_error, (gptr*)&g_opts.abort_on_error, 0,
     GET_BOOL, NO_ARG, 0, 0, 0, 0, 0, 0 },
-  { "loglevel", 1001, "Logging level in this program (default 0)",
+  { "loglevel", 1002, "Logging level in this program (default 0)",
     (gptr*)&g_opts.loglevel, (gptr*)&g_opts.loglevel, 0,
     GET_INT, REQUIRED_ARG, 0, 0, 0, 0, 0, 0 },
-  { "loop", 1002, "Number of test loops (default 1, 0=forever)",
+  { "loop", 1003, "Number of test loops (default 2, 0=forever)",
     (gptr*)&g_opts.loop, (gptr*)&g_opts.loop, 0,
-    GET_INT, REQUIRED_ARG, 1, 0, 0, 0, 0, 0 },
-  { "maxops", 1003, "Number of PK operations (default 2000)",
+    GET_INT, REQUIRED_ARG, 2, 0, 0, 0, 0, 0 },
+  { "maxops", 1004, "Approx number of PK operations (default 1000)",
     (gptr*)&g_opts.maxops, (gptr*)&g_opts.maxops, 0,
-    GET_UINT, REQUIRED_ARG, 2000, 0, g_maxops, 0, 0, 0 },
-  { "maxpk", 1004, "Number of different PK values (default 10)",
+    GET_UINT, REQUIRED_ARG, 1000, 0, 0, 0, 0, 0 },
+  { "maxpk", 1005, "Number of different PK values (default 10)",
     (gptr*)&g_opts.maxpk, (gptr*)&g_opts.maxpk, 0,
     GET_UINT, REQUIRED_ARG, 10, 1, g_maxpk, 0, 0, 0 },
-  { "opstr", 1005, "Ops to run e.g. idiucdc (c = commit, default random)",
-    (gptr*)&g_opts.opstr, (gptr*)&g_opts.opstr, 0,
+  { "no-blobs", 1006, "Omit blob attributes (5.0: true)",
+    (gptr*)&g_opts.no_blobs, (gptr*)&g_opts.no_blobs, 0,
+    GET_BOOL, NO_ARG, 0, 0, 0, 0, 0, 0 },
+  { "no-multiops", 1007, "Allow only 1 operation per commit",
+    (gptr*)&g_opts.no_multiops, (gptr*)&g_opts.no_multiops, 0,
+    GET_BOOL, NO_ARG, 0, 0, 0, 0, 0, 0 },
+  { "one-blob", 1008, "Only one blob attribute (defautt 2)",
+    (gptr*)&g_opts.one_blob, (gptr*)&g_opts.one_blob, 0,
+    GET_BOOL, NO_ARG, 0, 0, 0, 0, 0, 0 },
+  { "opstring", 1009, "Operations to run e.g. idiucdc (c is commit) or"
+                      " iuuc:uudc (the : separates loops)",
+    (gptr*)&g_opts.opstring, (gptr*)&g_opts.opstring, 0,
     GET_STR_ALLOC, REQUIRED_ARG, 0, 0, 0, 0, 0, 0 },
-  { "seed", 1006, "Random seed (0=loop number, default -1=random)",
+  { "seed", 1010, "Random seed (0=loop number, default -1=random)",
     (gptr*)&g_opts.seed, (gptr*)&g_opts.seed, 0,
     GET_INT, REQUIRED_ARG, -1, 0, 0, 0, 0, 0 },
-  { "separate-events", 1007, "Do not combine events per GCI >5.0",
+  { "separate-events", 1011, "Do not combine events per GCI (5.0: true)",
     (gptr*)&g_opts.separate_events, (gptr*)&g_opts.separate_events, 0,
     GET_BOOL, NO_ARG, 0, 0, 0, 0, 0, 0 },
-  { "use-table", 1008, "Use existing table 'tem1'",
+  { "use-table", 1012, "Use existing table 'tem1'",
     (gptr*)&g_opts.use_table, (gptr*)&g_opts.use_table, 0,
     GET_BOOL, NO_ARG, 0, 0, 0, 0, 0, 0 },
   { 0, 0, 0,
@@ -1245,14 +1728,36 @@
 static int
 checkopts()
 {
-  if (g_opts.opstr != 0) {
-    const char* s = g_opts.opstr;
-    uint n = strlen(s);
-    if (n < 3 || s[0] != 'i' || s[n-2] != 'd' || s[n-1] != 'c')
-      return -1;
-    while (*s != 0)
-      if (strchr("iduc", *s++) == 0)
+#ifdef version50
+  g_opts.separate_events = true;
+#endif
+  if (g_opts.separate_events) {
+    g_opts.no_blobs = true;
+  }
+  if (g_opts.no_multiops) {
+    g_maxcom = 1;
+  }
+  if (g_opts.opstring != 0) {
+    uint len = strlen(g_opts.opstring);
+    char* str = new char [len + 1];
+    memcpy(str, g_opts.opstring, len + 1);
+    char* s = str;
+    while (1) {
+      g_opstringpart[g_opstringparts++] = s;
+      s = strchr(s, ':');
+      if (s == 0)
+        break;
+      *s++ = 0;
+    }
+    uint i;
+    for (i = 0; i < g_opstringparts; i++) {
+      const char* s = g_opstringpart[i];
+      while (*s != 0)
+        if (strchr("iduc", *s++) == 0)
+          return -1;
+      if (s == g_opstringpart[i] || s[-1] != 'c')
         return -1;
+    }
   }
   return 0;
 }
@@ -1279,6 +1784,10 @@
       if (runtest() == 0)
         return NDBT_ProgramExit(NDBT_OK);
     }
+  }
+  if (g_evt_op != 0) {
+    (void)dropeventop();
+    g_evt_op = 0;
   }
   delete g_ndb;
   delete g_ncc;
Thread
bk commit into 5.0 tree (pekka:1.2021)pekka9 Jan