List: Internals | « Previous Message | Next Message »
From:pekka Date:December 5 2005 2:20pm
Subject:bk commit into 5.1 tree (pekka:1.1950)
View as plain text  
Below is the list of changes that have just been committed into a local
5.1 repository of pekka. When pekka does a push these changes will
be propagated to the main repository and, within 24 hours after the
push, to the public repository.
For information on how to access the public repository
see http://dev.mysql.com/doc/mysql/en/installing-source-tree.html

ChangeSet
  1.1950 05/12/05 15:20:15 pekka@stripped +8 -0
  ndb - wl#2972 part 1: merge events per event op, gci

  storage/ndb/test/ndbapi/test_event.cpp
    1.13 05/12/05 15:19:28 pekka@stripped +4 -0
    wl#2972 part 1: merge events per event op, gci

  storage/ndb/src/ndbapi/NdbEventOperationImpl.hpp
    1.9 05/12/05 15:19:28 pekka@stripped +46 -6
    wl#2972 part 1: merge events per event op, gci

  storage/ndb/src/ndbapi/NdbEventOperationImpl.cpp
    1.25 05/12/05 15:19:27 pekka@stripped +472 -99
    wl#2972 part 1: merge events per event op, gci

  storage/ndb/src/ndbapi/NdbEventOperation.cpp
    1.7 05/12/05 15:19:27 pekka@stripped +5 -0
    wl#2972 part 1: merge events per event op, gci

  storage/ndb/ndbapi-examples/ndbapi_event/ndbapi_event.cpp
    1.2 05/12/05 15:19:27 pekka@stripped +53 -33
    wl#2972 part 1: merge events per event op, gci

  storage/ndb/ndbapi-examples/ndbapi_event/Makefile
    1.2 05/12/05 15:19:27 pekka@stripped +2 -2
    wl#2972 part 1: merge events per event op, gci

  storage/ndb/include/ndbapi/NdbEventOperation.hpp
    1.15 05/12/05 15:19:27 pekka@stripped +6 -0
    wl#2972 part 1: merge events per event op, gci

  storage/ndb/include/ndbapi/NdbDictionary.hpp
    1.58 05/12/05 15:19:27 pekka@stripped +2 -1
    wl#2972 part 1: merge events per event op, gci

# This is a BitKeeper patch.  What follows are the unified diffs for the
# set of deltas contained in the patch.  The rest of the patch, the part
# that BitKeeper cares about, is below these diffs.
# User:	pekka
# Host:	orca.ndb.mysql.com
# Root:	/space/pekka/ndb/version/my51-rbr

--- 1.57/storage/ndb/include/ndbapi/NdbDictionary.hpp	2005-11-07 12:19:05 +01:00
+++ 1.58/storage/ndb/include/ndbapi/NdbDictionary.hpp	2005-12-05 15:19:27 +01:00
@@ -1027,7 +1027,8 @@
       _TE_CREATE=6,
       _TE_GCP_COMPLETE=7,
       _TE_CLUSTER_FAILURE=8,
-      _TE_STOP=9
+      _TE_STOP=9,
+      _TE_NUL=10 // internal (INS o DEL within same GCI)
     };
 #endif
     /**

--- 1.14/storage/ndb/include/ndbapi/NdbEventOperation.hpp	2005-09-15 10:42:02 +02:00
+++ 1.15/storage/ndb/include/ndbapi/NdbEventOperation.hpp	2005-12-05 15:19:27 +01:00
@@ -93,6 +93,12 @@
    * Retrieve current state of the NdbEventOperation object
    */
   State getState();
+  /**
+   * By default events on same NdbEventOperation within same GCI
+   * are merged into a single event.  This can be changed with
+   * separateEvents(true).
+   */
+  void separateEvents(bool flag);
 
   /**
    * Activates the NdbEventOperation to start receiving events. The

--- 1.6/storage/ndb/src/ndbapi/NdbEventOperation.cpp	2005-09-15 10:42:04 +02:00
+++ 1.7/storage/ndb/src/ndbapi/NdbEventOperation.cpp	2005-12-05 15:19:27 +01:00
@@ -38,6 +38,11 @@
   return m_impl.getState();
 }
 
+void NdbEventOperation::separateEvents(bool flag)
+{ // true: no merging; false (default, set in ctor): same-PK data events within a GCI are merged
+  m_impl.m_separateEvents = flag;
+}
+
 NdbRecAttr *
 NdbEventOperation::getValue(const char *colName, char *aValue)
 {

--- 1.24/storage/ndb/src/ndbapi/NdbEventOperationImpl.cpp	2005-11-22 18:04:55 +01:00
+++ 1.25/storage/ndb/src/ndbapi/NdbEventOperationImpl.cpp	2005-12-05 15:19:27 +01:00
@@ -104,6 +104,8 @@
 
   m_state= EO_CREATED;
 
+  m_separateEvents = false;
+
   m_has_error= 0;
 
   DBUG_PRINT("exit",("this: 0x%x oid: %u", this, m_oid));
@@ -693,6 +695,21 @@
   return ret;
 }
 
+#ifdef VM_TRACE
+// Debug dump of an event: header fields, then sections 0..2 as hex words.
+static void print_std(const char* tag, const SubTableData * sdata, LinearSectionPtr ptr[3])
+{
+  printf("%s\n", tag);
+  printf("addr=%p gci=%llu op=%u\n", (void*)sdata, (unsigned long long)sdata->gci, (unsigned)sdata->operation);
+  for (int i = 0; i <= 2; i++) {
+    printf("sec=%d addr=%p sz=%u\n", i, (void*)ptr[i].p, (unsigned)ptr[i].sz);
+    for (Uint32 j = 0; j < ptr[i].sz; j++) // Uint32 index: avoid signed/unsigned mix (sz assumed Uint32)
+      printf("%08x ", (unsigned)ptr[i].p[j]);
+    printf("\n");
+  }
+}
+#endif
+
 NdbEventOperation *
 NdbEventBuffer::nextEvent()
 {
@@ -734,6 +751,10 @@
     op->m_data_done_count++;
 #endif
 
+    // NUL event is not returned
+    if (data->sdata->operation == NdbDictionary::Event::_TE_NUL)
+      continue;
+
     int r= op->receive_event();
     if (r > 0)
     {
@@ -1099,13 +1120,15 @@
   DBUG_ENTER("NdbEventBuffer::insertDataL");
 
   Uint64 gci= sdata->gci;
-  EventBufData *data= m_free_data;
 
   if ( likely((Uint32)op->mi_type & 1 << (Uint32)sdata->operation) )
   {
     Gci_container* bucket= find_bucket(&m_active_gci, gci);
       
     DBUG_PRINT("info", ("data insertion in eventId %d", op->m_eventId));
+    DBUG_PRINT("info", ("gci=%d tab=%d op=%d node=%d",
+                        sdata->gci, sdata->tableId, sdata->operation,
+                        sdata->req_nodeid));
 
     if (unlikely(bucket == 0))
     {
@@ -1116,61 +1139,65 @@
       DBUG_RETURN(0);
     }
 
-    if (unlikely(data == 0))
+    bool use_hash =
+      ! op->m_separateEvents &&
+      sdata->operation < NdbDictionary::Event::_TE_FIRST_NON_DATA_EVENT;
+
+    // find position in bucket hash table
+    EventBufData* data = 0;
+    EventBufData_hash::Pos hpos;
+    if (use_hash)
     {
-#ifdef VM_TRACE
-      assert(m_free_data_count == 0);
-      assert(m_free_data_sz == 0);
-#endif
-      expand(4000);
-      reportStatus();
+      bucket->m_data_hash.search(hpos, op, ptr);
+      data = hpos.data;
+    }
 
-      data= m_free_data;
+    if (data == 0)
+    {
+      // allocate new result buffer
+      data = alloc_data();
       if (unlikely(data == 0))
       {
-#ifdef VM_TRACE
-	printf("m_latest_command: %s\n", m_latest_command);
-	printf("no free data, m_latestGCI %lld\n",
-	       m_latestGCI);
-	printf("m_free_data_count %d\n", m_free_data_count);
-	printf("m_available_data_count %d first gci %d last gci %d\n",
-	       m_available_data.m_count,
-	       m_available_data.m_head ? m_available_data.m_head->sdata->gci : 0,
-	       m_available_data.m_tail ? m_available_data.m_tail->sdata->gci : 0);
-	printf("m_used_data_count %d\n", m_used_data.m_count);
-#endif
-	op->m_has_error= 2;
-	DBUG_RETURN(-1); // TODO handle this, overrun, or, skip?
+        op->m_has_error = 2;
+        DBUG_RETURN(-1);
       }
-    }
 
-    // remove data from free list
-    m_free_data= data->m_next;
+      if (unlikely(copy_data(sdata, ptr, data)))
+      {
+        op->m_has_error = 3;
+        DBUG_RETURN(-1);
+      }
+      // add it to list and hash table
+      bucket->m_data.append(data);
+      if (use_hash)
+      {
+        bucket->m_data_hash.append(hpos, data);
+      }
 #ifdef VM_TRACE
-    m_free_data_count--;
-    assert(m_free_data_sz >= data->sz);
+      op->m_data_count++;
 #endif
-    m_free_data_sz-= data->sz;
-
-    if (unlikely(copy_data_alloc(sdata, ptr, data)))
+    }
+    else
     {
-      op->m_has_error= 3;
-      DBUG_RETURN(-1);
+      // event with same op, PK found, merge into old buffer
+      if (unlikely(merge_data(sdata, ptr, data)))
+      {
+        op->m_has_error = 3;
+        DBUG_RETURN(-1);
+      }
+    }
+    data->m_event_op = op;
+    if (use_hash)
+    {
+      data->m_pkhash = hpos.pkhash;
     }
-
-    // add it to received data
-    bucket->m_data.append(data);
-
-    data->m_event_op= op;
-#ifdef VM_TRACE
-    op->m_data_count++;
-#endif
     DBUG_RETURN(0);
   }
 
 #ifdef VM_TRACE
   if ((Uint32)op->m_eventImpl->mi_type & 1 << (Uint32)sdata->operation)
   {
+    // XXX never reached
     DBUG_PRINT("info",("Data arrived before ready eventId", op->m_eventId));
     DBUG_RETURN(0);
   }
@@ -1183,80 +1210,324 @@
 #endif
 }
 
-int 
-NdbEventBuffer::copy_data_alloc(const SubTableData * const f_sdata,
-				LinearSectionPtr f_ptr[3],
-				EventBufData *ev_buf)
-{
-  DBUG_ENTER("NdbEventBuffer::copy_data_alloc");
-  const unsigned min_alloc_size= 128;
-  const unsigned sz4= (sizeof(SubTableData)+3)>>2;
-  Uint32 f_ptr_sz_0= f_ptr[0].sz;
-  Uint32 f_ptr_sz_1= f_ptr[1].sz;
-  Uint32 f_ptr_sz_2= f_ptr[2].sz;
-  LinearSectionPtr *t_ptr= ev_buf->ptr;
-  SubTableData *sdata= ev_buf->sdata;
-  const unsigned alloc_size= (sz4 +
-			      f_ptr_sz_0 +
-			      f_ptr_sz_1 +
-			      f_ptr_sz_2) * sizeof(Uint32);
-  Uint32 *ptr;
-  if (alloc_size > min_alloc_size)
-  {
-    if (sdata)
-    {
-      NdbMem_Free((char*)sdata);
-#ifdef VM_TRACE
-      assert(m_total_alloc >= ev_buf->sz);
-#endif
-      m_total_alloc-= ev_buf->sz;
-    }
-    ptr= (Uint32*)NdbMem_Allocate(alloc_size);
-    ev_buf->sdata= (SubTableData *)ptr;
-    ev_buf->sz= alloc_size;
-    m_total_alloc+= alloc_size;
-  }
-  else /* alloc_size <= min_alloc_size */
+// allocate EventBufData
+EventBufData*
+NdbEventBuffer::alloc_data()
+{
+  DBUG_ENTER("alloc_data");
+  EventBufData* data = m_free_data;
+
+  if (unlikely(data == 0))
   {
-    if (sdata)
-      ptr= (Uint32*)sdata;
-    else
+#ifdef VM_TRACE
+    assert(m_free_data_count == 0);
+    assert(m_free_data_sz == 0);
+#endif
+    expand(4000);
+    reportStatus();
+
+    data = m_free_data;
+    if (unlikely(data == 0))
     {
-      ptr= (Uint32*)NdbMem_Allocate(min_alloc_size);
-      ev_buf->sdata= (SubTableData *)ptr;
-      ev_buf->sz= min_alloc_size;
-      m_total_alloc+= min_alloc_size;
+#ifdef VM_TRACE
+      printf("m_latest_command: %s\n", m_latest_command);
+      printf("no free data, m_latestGCI %lld\n",
+             m_latestGCI);
+      printf("m_free_data_count %d\n", m_free_data_count);
+      printf("m_available_data_count %d first gci %d last gci %d\n",
+             m_available_data.m_count,
+             m_available_data.m_head ? m_available_data.m_head->sdata->gci : 0,
+             m_available_data.m_tail ? m_available_data.m_tail->sdata->gci : 0);
+      printf("m_used_data_count %d\n", m_used_data.m_count);
+#endif
+      DBUG_RETURN(0); // TODO handle this, overrun, or, skip?
     }
   }
 
-  memcpy(ptr,f_sdata,sizeof(SubTableData));
-  ptr+= sz4;
+  // remove data from free list
+  m_free_data = data->m_next;
+  data->m_next = 0;
+#ifdef VM_TRACE
+  m_free_data_count--;
+  assert(m_free_data_sz >= data->sz);
+#endif
+  m_free_data_sz -= data->sz;
+  DBUG_RETURN(data);
+}
 
-  t_ptr->p= ptr;
-  t_ptr->sz= f_ptr_sz_0;
+// allocate initial or bigger memory area in EventBufData
+// takes sizes from given ptr and sets up data->ptr
+int
+NdbEventBuffer::alloc_mem(EventBufData* data, LinearSectionPtr ptr[3])
+{
+  const Uint32 min_alloc_size = 128;
 
-  memcpy(ptr, f_ptr[0].p, sizeof(Uint32)*f_ptr_sz_0);
-  ptr+= f_ptr_sz_0;
-  t_ptr++;
+  Uint32 sz4 = (sizeof(SubTableData) + 3) >> 2;
+  Uint32 alloc_size = (sz4 + ptr[0].sz + ptr[1].sz + ptr[2].sz) << 2;
+  if (alloc_size < min_alloc_size)
+    alloc_size = min_alloc_size;
 
-  t_ptr->p= ptr;
-  t_ptr->sz= f_ptr_sz_1;
+  if (data->sz < alloc_size)
+  {
+    NdbMem_Free((char*)data->memory);
+    assert(m_total_alloc >= data->sz);
+    m_total_alloc -= data->sz;
+    data->memory = 0;
+    data->sz = 0;
 
-  memcpy(ptr, f_ptr[1].p, sizeof(Uint32)*f_ptr_sz_1);
-  ptr+= f_ptr_sz_1;
-  t_ptr++;
+    data->memory = (Uint32*)NdbMem_Allocate(alloc_size);
+    if (data->memory == 0)
+      return -1;
+    data->sz = alloc_size;
+    m_total_alloc += data->sz;
+  }
 
-  if (f_ptr_sz_2)
+  Uint32* memptr = data->memory;
+  memptr += sz4;
+  int i;
+  for (i = 0; i <= 2; i++)
   {
-    t_ptr->p= ptr;
-    t_ptr->sz= f_ptr_sz_2;
-    memcpy(ptr, f_ptr[2].p, sizeof(Uint32)*f_ptr_sz_2);
+    data->ptr[i].p = memptr;
+    data->ptr[i].sz = ptr[i].sz;
+    memptr += ptr[i].sz;
+  }
+
+  return 0;
+}
+
+int 
+NdbEventBuffer::copy_data(const SubTableData * const sdata,
+                          LinearSectionPtr ptr[3],
+                          EventBufData* data)
+{
+  DBUG_ENTER("NdbEventBuffer::copy_data");
+  // Copy header + 3 sections into data (memory sized by alloc_mem); 0 ok, -1 if alloc_mem fails.
+  if (alloc_mem(data, ptr) != 0)
+    DBUG_RETURN(-1);
+  memcpy(data->sdata, sdata, sizeof(SubTableData));
+  int i;
+  for (i = 0; i <= 2; i++)
+    memcpy(data->ptr[i].p, ptr[i].p, ptr[i].sz << 2); // sz counts 32-bit words
+  DBUG_RETURN(0);
+}
+
+static struct Ev_t { // merge rule: event t1 then event t2 (same op, PK, GCI) becomes t3
+  enum {
+    INS = NdbDictionary::Event::_TE_INSERT,
+    DEL = NdbDictionary::Event::_TE_DELETE,
+    UPD = NdbDictionary::Event::_TE_UPDATE,
+    NUL = NdbDictionary::Event::_TE_NUL, // INS then DEL: nothing to report (nextEvent skips it)
+    ERR = 255 // sequence not expected; merge_data asserts on it
+  };
+  int t1, t2, t3;
+} ev_t[] = {
+  { Ev_t::INS, Ev_t::INS, Ev_t::ERR }, // not expected
+  { Ev_t::INS, Ev_t::DEL, Ev_t::NUL }, //ok
+  { Ev_t::INS, Ev_t::UPD, Ev_t::INS }, //ok
+  { Ev_t::DEL, Ev_t::INS, Ev_t::UPD }, //ok
+  { Ev_t::DEL, Ev_t::DEL, Ev_t::ERR }, // not expected
+  { Ev_t::DEL, Ev_t::UPD, Ev_t::ERR }, // not expected
+  { Ev_t::UPD, Ev_t::INS, Ev_t::ERR }, // not expected
+  { Ev_t::UPD, Ev_t::DEL, Ev_t::DEL }, //ok
+  { Ev_t::UPD, Ev_t::UPD, Ev_t::UPD }  //ok
+};
+// Contents of sections ptr[0..2] per event type (ah = attribute headers, ad = attribute data):
+/*
+ *   | INS            | DEL              | UPD
+ * 0 | pk ah + all ah | pk ah            | pk ah + new ah 
+ * 1 | pk ad + all ad | old pk ad        | new pk ad + new ad 
+ * 2 | empty          | old non-pk ah+ad | old ah+ad
+ */
+
+static AttributeHeader // returns header p2[i2]; copies it to p1[i1] only if (flags & 1)
+copy_head(Uint32& i1, Uint32* p1, Uint32& i2, const Uint32* p2,
+          Uint32 flags)
+{
+  AttributeHeader ah(p2[i2]);
+  bool do_copy = (flags & 1);
+  if (do_copy)
+    p1[i1] = p2[i2];
+  i1++; // both indexes advance even when not copying, so the sizing pass
+  i2++; // (flags == 0) computes the same positions as the copying pass
+  return ah;
+}
+
+static void
+copy_attr(AttributeHeader ah,
+          Uint32& j1, Uint32* p1, Uint32& j2, const Uint32* p2,
+          Uint32 flags)
+{
+  bool do_copy = (flags & 1);
+  bool with_head = (flags & 2);
+  Uint32 n = with_head + ah.getDataSize();
+  if (do_copy)
+  {
+    Uint32 k;
+    for (k = 0; k < n; k++)
+      p1[j1++] = p2[j2++];
   }
   else
   {
-    t_ptr->p= 0;
-    t_ptr->sz= 0;
+    j1 += n;
+    j2 += n;
   }
+}
+// Merge a new event (sdata, ptr2) into the buffered event of the same op + PK.
+int 
+NdbEventBuffer::merge_data(const SubTableData * const sdata,
+                           LinearSectionPtr ptr2[3],
+                           EventBufData* data)
+{
+  DBUG_ENTER("NdbEventBuffer::merge_data");
+  // Result event type comes from ev_t. Returns 0 on success, -1 on failure.
+  Uint32 nkey = data->m_event_op->m_eventImpl->m_tableImpl->m_noOfKeys;
+  int t1 = data->sdata->operation;
+  int t2 = sdata->operation;
+  if (t1 == Ev_t::NUL) // earlier INS o DEL cancelled out: take new event as is
+    DBUG_RETURN(copy_data(sdata, ptr2, data));
+
+  Ev_t* tp = 0;
+  for (size_t i = 0; i < sizeof(ev_t)/sizeof(ev_t[0]); i++) {
+    if (ev_t[i].t1 == t1 && ev_t[i].t2 == t2) {
+      tp = &ev_t[i];
+      break;
+    }
+  }
+  assert(tp != 0 && tp->t3 != Ev_t::ERR);
+  if (tp == 0 || tp->t3 == Ev_t::ERR)
+    DBUG_RETURN(-1); // unexpected sequence (asserts off): keep old event unchanged
+
+  // save old data
+  EventBufData olddata = *data;
+  data->memory = 0;
+  data->sz = 0;
+
+  // compose ptr1 o ptr2 = ptr
+  LinearSectionPtr (&ptr1) [3] = olddata.ptr;
+  LinearSectionPtr (&ptr) [3] = data->ptr;
+
+  // loop twice where first loop only sets sizes
+  for (int loop = 0; loop <= 1; loop++)
+  {
+    if (loop == 1)
+    {
+      if (alloc_mem(data, ptr) != 0)
+      {
+        *data = olddata; // restore old event instead of leaving it without memory
+        DBUG_RETURN(-1);
+      }
+      *data->sdata = *sdata;
+      data->sdata->operation = tp->t3;
+    }
+
+    ptr[0].sz = ptr[1].sz = ptr[2].sz = 0; // there are only 3 sections
+
+    // copy pk from new version
+    {
+      Uint32 i = 0;
+      Uint32 j = 0;
+      Uint32 i2 = 0;
+      Uint32 j2 = 0;
+      while (i < nkey)
+      {
+        AttributeHeader ah = copy_head(i, ptr[0].p, i2, ptr2[0].p, loop);
+        copy_attr(ah, j, ptr[1].p, j2, ptr2[1].p, loop);
+      }
+      ptr[0].sz = i;
+      ptr[1].sz = j;
+    }
+
+    // merge after values, new version overrides
+    if (tp->t3 != Ev_t::DEL)
+    {
+      Uint32 i = ptr[0].sz;
+      Uint32 j = ptr[1].sz;
+      Uint32 i1 = 0;
+      Uint32 j1 = 0;
+      Uint32 i2 = nkey;
+      Uint32 j2 = ptr[1].sz;
+      while (i1 < nkey) // skip old pk, already taken from new version
+        j1 += AttributeHeader(ptr1[0].p[i1++]).getDataSize();
+      while (1)
+      {
+        bool b1 = (i1 < ptr1[0].sz);
+        bool b2 = (i2 < ptr2[0].sz);
+        if (b1 && b2)
+        {
+          Uint32 id1 = AttributeHeader(ptr1[0].p[i1]).getAttributeId();
+          Uint32 id2 = AttributeHeader(ptr2[0].p[i2]).getAttributeId();
+          if (id1 < id2)
+            b2 = false;
+          else if (id1 > id2)
+            b1 = false;
+          else
+          {
+            j1 += AttributeHeader(ptr1[0].p[i1++]).getDataSize();
+            b1 = false;
+          }
+        }
+        if (b1)
+        {
+          AttributeHeader ah = copy_head(i, ptr[0].p, i1, ptr1[0].p, loop);
+          copy_attr(ah, j, ptr[1].p, j1, ptr1[1].p, loop);
+        }
+        else if (b2)
+        {
+          AttributeHeader ah = copy_head(i, ptr[0].p, i2, ptr2[0].p, loop);
+          copy_attr(ah, j, ptr[1].p, j2, ptr2[1].p, loop);
+        }
+        else
+          break;
+      }
+      ptr[0].sz = i;
+      ptr[1].sz = j;
+    }
+
+    // merge before values, old version overrides
+    if (tp->t3 != Ev_t::INS)
+    {
+      Uint32 k = 0;
+      Uint32 k1 = 0;
+      Uint32 k2 = 0;
+      while (1)
+      {
+        bool b1 = (k1 < ptr1[2].sz);
+        bool b2 = (k2 < ptr2[2].sz);
+        if (b1 && b2)
+        {
+          Uint32 id1 = AttributeHeader(ptr1[2].p[k1]).getAttributeId();
+          Uint32 id2 = AttributeHeader(ptr2[2].p[k2]).getAttributeId();
+          if (id1 < id2)
+            b2 = false;
+          else if (id1 > id2)
+            b1 = false;
+          else
+          {
+            k2 += 1 + AttributeHeader(ptr2[2].p[k2]).getDataSize();
+            b2 = false;
+          }
+        }
+        if (b1)
+        {
+          AttributeHeader ah(ptr1[2].p[k1]);
+          copy_attr(ah, k, ptr[2].p, k1, ptr1[2].p, loop | 2);
+        }
+        else if (b2)
+        {
+          AttributeHeader ah(ptr2[2].p[k2]);
+          copy_attr(ah, k, ptr[2].p, k2, ptr2[2].p, loop | 2);
+        }
+        else
+          break;
+      }
+      ptr[2].sz = k; // size of merged before values (used by alloc_mem in loop 1)
+    }
+  }
+
+  // free old data, which alloc_mem counted in m_total_alloc when allocating it
+  assert(m_total_alloc >= olddata.sz);
+  m_total_alloc -= olddata.sz;
+  NdbMem_Free((char*)olddata.memory);

   DBUG_RETURN(0);
 }
@@ -1397,6 +1668,108 @@
 #ifdef VM_TRACE
   assert(m_total_alloc >= m_free_data_sz);
 #endif
+}
+
+// hash table routines
+
+// PK hash of an event: hash_sort of each key column value under its collation (binary if none)
+Uint32
+EventBufData_hash::getpkhash(NdbEventOperationImpl* op, LinearSectionPtr ptr[3])
+{
+  const NdbTableImpl* tab = op->m_eventImpl->m_tableImpl;
+  // could optimize the all-fixed case
+  // in all cases ptr[0] = pk ah.. ptr[1] = pk ad..
+  // for pk update (to equivalent pk) post/pre values give same hash
+  Uint32 nkey = tab->m_noOfKeys;
+  assert(nkey != 0 && nkey <= ptr[0].sz);
+  const Uint32* hptr = ptr[0].p;
+  const uchar* dptr = (uchar*)ptr[1].p;
+
+  // hash registers
+  ulong nr1 = 0;
+  ulong nr2 = 0;
+  while (nkey-- != 0)
+  {
+    AttributeHeader ah(*hptr++);
+    Uint32 bytesize = ah.getByteSize();
+    assert(dptr + bytesize <= (uchar*)(ptr[1].p + ptr[1].sz));
+
+    Uint32 i = ah.getAttributeId();
+    const NdbColumnImpl* col = tab->getColumn(i);
+    assert(col != 0);
+
+    Uint32 lb, len;
+    bool ok = NdbSqlUtil::get_var_length(col->m_type, dptr, bytesize, lb, len);
+    assert(ok);
+
+    CHARSET_INFO* cs = col->m_cs ? col->m_cs : &my_charset_bin;
+    (*cs->coll->hash_sort)(cs, dptr + lb, len, &nr1, &nr2);
+    dptr += ah.getDataSize() << 2; // values occupy whole words (cf. copy_attr), not bytesize
+  }
+  return nr1;
+}
+
+// Compare PKs of two events column by column under each column's collation (seldom invoked)
+bool
+EventBufData_hash::getpkequal(NdbEventOperationImpl* op, LinearSectionPtr ptr1[3], LinearSectionPtr ptr2[3])
+{
+  const NdbTableImpl* tab = op->m_eventImpl->m_tableImpl;
+  // as in getpkhash: section [0] starts with the pk headers, section [1] with their data
+  Uint32 nkey = tab->m_noOfKeys;
+  assert(nkey != 0 && nkey <= ptr1[0].sz && nkey <= ptr2[0].sz);
+  const Uint32* hptr1 = ptr1[0].p;
+  const Uint32* hptr2 = ptr2[0].p;
+  const uchar* dptr1 = (uchar*)ptr1[1].p;
+  const uchar* dptr2 = (uchar*)ptr2[1].p;
+
+  while (nkey-- != 0)
+  {
+    AttributeHeader ah1(*hptr1++);
+    AttributeHeader ah2(*hptr2++);
+    // sizes can differ on update of varchar endspace
+    Uint32 bytesize1 = ah1.getByteSize();
+    Uint32 bytesize2 = ah2.getByteSize(); // must come from ah2: the two sizes may differ
+    assert(dptr1 + bytesize1 <= (uchar*)(ptr1[1].p + ptr1[1].sz));
+    assert(dptr2 + bytesize2 <= (uchar*)(ptr2[1].p + ptr2[1].sz));
+
+    assert(ah1.getAttributeId() == ah2.getAttributeId());
+    Uint32 i = ah1.getAttributeId();
+    const NdbColumnImpl* col = tab->getColumn(i);
+    assert(col != 0);
+
+    Uint32 lb1, len1;
+    bool ok1 = NdbSqlUtil::get_var_length(col->m_type, dptr1, bytesize1, lb1, len1);
+    Uint32 lb2, len2;
+    bool ok2 = NdbSqlUtil::get_var_length(col->m_type, dptr2, bytesize2, lb2, len2);
+    assert(ok1 && ok2 && lb1 == lb2);
+
+    CHARSET_INFO* cs = col->m_cs ? col->m_cs : &my_charset_bin;
+    int res = (cs->coll->strnncollsp)(cs, dptr1 + lb1, len1, dptr2 + lb2, len2, false);
+    if (res != 0)
+      return false;
+    dptr1 += ah1.getDataSize() << 2; // values occupy whole words (cf. copy_attr),
+    dptr2 += ah2.getDataSize() << 2; // so step by data size, not byte size
+  }
+  return true;
+}
+
+void // find the buffered event of op with the same PK as ptr; hpos is later passed to append()
+EventBufData_hash::search(Pos& hpos, NdbEventOperationImpl* op, LinearSectionPtr ptr[3])
+{
+  Uint32 pkhash = getpkhash(op, ptr);
+  Uint32 index = (op->m_oid ^ pkhash) % GCI_EVENT_HASH_SIZE; // chain chosen by op id and PK hash
+  EventBufData* data = m_hash[index];
+  while (data != 0) // cheap op/hash checks before the collation-aware compare
+  {
+    if (data->m_event_op == op &&
+        data->m_pkhash == pkhash &&
+        getpkequal(op, data->ptr, ptr))
+      break;
+    data = data->m_next_hash;
+  }
+  hpos.index = index;
+  hpos.data = data; // 0 if no match
+  hpos.pkhash = pkhash;
 }
 
 template class Vector<Gci_container>;

--- 1.8/storage/ndb/src/ndbapi/NdbEventOperationImpl.hpp	2005-11-22 18:04:55 +01:00
+++ 1.9/storage/ndb/src/ndbapi/NdbEventOperationImpl.hpp	2005-12-05 15:19:28 +01:00
@@ -25,16 +25,19 @@
 #define NDB_EVENT_OP_MAGIC_NUMBER 0xA9F301B4
 
 class NdbEventOperationImpl;
+
 struct EventBufData
 {
   union {
     SubTableData *sdata;
-    char *memory;
+    Uint32 *memory;
   };
   LinearSectionPtr ptr[3];
   unsigned sz;
   NdbEventOperationImpl *m_event_op;
   EventBufData *m_next; // Next wrt to global order
+  EventBufData *m_next_hash; // Next in per-GCI hash
+  Uint32 m_pkhash; // PK hash (without op) for fast compare
 };
 
 class EventBufData_list
@@ -116,6 +119,34 @@
   m_sz+= list.m_sz;
 }
 
+// Per-GCI-bucket hash over the bucket's data, keyed by event op and table PK; used to
+// merge events in insertDataL. Append-only, and invalid after remove_first().
+class EventBufData_hash
+{
+public:
+  struct Pos { // search result
+    Uint32 index;       // index into hash array
+    EventBufData* data; // non-zero if found
+    Uint32 pkhash;      // PK hash
+  };
+  // PK hash / PK equality over section 0 (pk headers) and section 1 (pk data)
+  static Uint32 getpkhash(NdbEventOperationImpl* op, LinearSectionPtr ptr[3]);
+  static bool getpkequal(NdbEventOperationImpl* op, LinearSectionPtr ptr1[3], LinearSectionPtr ptr2[3]);
+
+  void search(Pos& hpos, NdbEventOperationImpl* op, LinearSectionPtr ptr[3]);
+  void append(Pos& hpos, EventBufData* data); // hpos from a preceding search()
+  // m_hash[] holds chain heads, linked through EventBufData::m_next_hash
+  enum { GCI_EVENT_HASH_SIZE = 101 };
+  EventBufData* m_hash[GCI_EVENT_HASH_SIZE]; // NOTE(review): no constructor zeroes this; confirm the owner does
+};
+
+inline // push data at the head of the chain selected by hpos.index
+void EventBufData_hash::append(Pos& hpos, EventBufData* data)
+{
+  data->m_next_hash = m_hash[hpos.index];
+  m_hash[hpos.index] = data;
+}
+
 struct Gci_container
 {
   enum State 
@@ -127,6 +158,7 @@
   Uint32 m_gcp_complete_rep_count; // Remaining SUB_GCP_COMPLETE_REP until done
   Uint64 m_gci;                    // GCI
   EventBufData_list m_data;
+  EventBufData_hash m_data_hash;
 };
 
 class NdbEventOperationImpl : public NdbEventOperation {
@@ -173,6 +205,8 @@
 		   */
   Uint32 m_eventId;
   Uint32 m_oid;
+
+  bool m_separateEvents;
   
   EventBufData *m_data_item;
 
@@ -212,7 +246,6 @@
   void add_op();
   void remove_op();
   void init_gci_containers();
-  Uint32 m_active_op_count;
 
   // accessed from the "receive thread"
   int insertDataL(NdbEventOperationImpl *op,
@@ -233,10 +266,15 @@
 
   NdbEventOperationImpl *move_data();
 
-  // used by both user thread and receive thread
-  int copy_data_alloc(const SubTableData * const f_sdata,
-		      LinearSectionPtr f_ptr[3],
-		      EventBufData *ev_buf);
+  // routines to copy/merge events
+  EventBufData* alloc_data();
+  int alloc_mem(EventBufData* data, LinearSectionPtr ptr[3]);
+  int copy_data(const SubTableData * const sdata,
+                LinearSectionPtr ptr[3],
+                EventBufData* data);
+  int merge_data(const SubTableData * const sdata,
+                 LinearSectionPtr ptr[3],
+                 EventBufData* data);
 
   void free_list(EventBufData_list &list);
 
@@ -290,6 +328,8 @@
   // dropped event operations that have not yet
   // been deleted
   NdbEventOperationImpl *m_dropped_ev_op;
+
+  Uint32 m_active_op_count;
 };
 
 inline

--- 1.12/storage/ndb/test/ndbapi/test_event.cpp	2005-11-05 22:55:37 +01:00
+++ 1.13/storage/ndb/test/ndbapi/test_event.cpp	2005-12-05 15:19:28 +01:00
@@ -169,6 +169,7 @@
     g_err << function << "Event operation creation failed\n";
     return NDBT_FAILED;
   }
+  pOp->separateEvents(true);
 
   g_info << function << "get values\n";
   NdbRecAttr* recAttr[1024];
@@ -370,6 +371,7 @@
       g_err << "Event operation creation failed\n";
       return NDBT_FAILED;
     }
+    pOp->separateEvents(true);
 
     g_info << "dropping event operation" << endl;
     int res = pNdb->dropEventOperation(pOp);
@@ -540,6 +542,7 @@
     g_err << "Event operation creation failed on %s" << buf << endl;
     DBUG_RETURN(NDBT_FAILED);
   }
+  pOp->separateEvents(true);
 
   int i;
   int n_columns= table->getNoOfColumns();
@@ -1185,6 +1188,7 @@
     {
       DBUG_RETURN(NDBT_FAILED);
     }
+    pOp->separateEvents(true);
 
     int n_columns= pTabs[i]->getNoOfColumns();
     for (int j = 0; j < n_columns; j++)

--- 1.1/storage/ndb/ndbapi-examples/ndbapi_event/Makefile	2005-09-15 14:19:43 +02:00
+++ 1.2/storage/ndb/ndbapi-examples/ndbapi_event/Makefile	2005-12-05 15:19:27 +01:00
@@ -1,7 +1,7 @@
 TARGET = ndbapi_event
 SRCS = ndbapi_event.cpp
 OBJS = ndbapi_event.o
-CXX = g++
+CXX = g++ -g
 CFLAGS = -c -Wall -fno-rtti -fno-exceptions
 CXXFLAGS = 
 DEBUG = 
@@ -17,7 +17,7 @@
 	$(CXX) $(CXXFLAGS) $(LFLAGS) $(LIB_DIR) $(OBJS) -lndbclient -lmysqlclient_r -lmysys -lmystrings -lz $(SYS_LIB) -o $(TARGET)
 
 $(TARGET).o: $(SRCS)
-	$(CXX) $(CFLAGS) -I$(INCLUDE_DIR) -I$(INCLUDE_DIR)/ndbapi $(SRCS)
+	$(CXX) $(CFLAGS) -I$(INCLUDE_DIR) -I$(INCLUDE_DIR)/ndbapi -I$(TOP_SRCDIR)/include $(SRCS)
 
 clean:
 	rm -f *.o $(TARGET)

--- 1.1/storage/ndb/ndbapi-examples/ndbapi_event/ndbapi_event.cpp	2005-09-15 12:53:48 +02:00
+++ 1.2/storage/ndb/ndbapi-examples/ndbapi_event/ndbapi_event.cpp	2005-12-05 15:19:27 +01:00
@@ -58,24 +58,29 @@
 
 /**
  *
- * Assume that there is a table TAB0 which is being updated by 
+ * Assume that there is a table t0 which is being updated by 
  * another process (e.g. flexBench -l 0 -stdtables).
- * We want to monitor what happens with columns COL0, COL2, COL11
+ * We want to monitor what happens with columns c0,c1,c2,c3.
  *
  * or together with the mysql client;
  *
  * shell> mysql -u root
  * mysql> create database TEST_DB;
  * mysql> use TEST_DB;
- * mysql> create table TAB0 (COL0 int primary key, COL1 int, COL11 int) engine=ndb;
+ * mysql> create table t0 (c0 int, c1 int, c2 char(4), c3 char(4),
+ *        primary key(c0, c2)) engine ndb charset latin1;
  *
  * In another window start ndbapi_event, wait until properly started
  *
-   insert into TAB0 values (1,2,3);
-   insert into TAB0 values (2,2,3);
-   insert into TAB0 values (3,2,9);
-   update TAB0 set COL1=10 where COL0=1;
-   delete from TAB0 where COL0=1;
+   insert into t0 values (1, 2, 'a', 'b');
+   insert into t0 values (3, 4, 'c', 'd');
+   update t0 set c3 = 'e' where c0 = 1 and c2 = 'a'; -- use pk
+   update t0 set c3 = 'f'; -- use scan
+   update t0 set c3 = 'F'; -- use scan update to 'same'
+   update t0 set c2 = 'g' where c0 = 1; -- update pk part
+   update t0 set c2 = 'G' where c0 = 1; -- update pk part to 'same'
+   update t0 set c0 = 5, c2 = 'H' where c0 = 3; -- update full PK
+   delete from t0;
  *
  * you should see the data popping up in the example window
  *
@@ -92,9 +97,10 @@
 		  const char **eventColumnName,
 		  const int noEventColumnName);
 
-int main()
+int main(int argc, char** argv)
 {
   ndb_init();
+  bool sep = argc > 1 && strcmp(argv[1], "-s") == 0;
 
   Ndb_cluster_connection *cluster_connection=
     new Ndb_cluster_connection(); // Object representing the cluster
@@ -126,13 +132,15 @@
 
   if (myNdb->init() == -1) APIERROR(myNdb->getNdbError());
 
-  const char *eventName= "CHNG_IN_TAB0";
-  const char *eventTableName= "TAB0";
-  const int noEventColumnName= 3;
+  const char *eventName= "CHNG_IN_t0";
+  const char *eventTableName= "t0";
+  const int noEventColumnName= 4;
   const char *eventColumnName[noEventColumnName]=
-    {"COL0",
-     "COL1",
-     "COL11"};
+    {"c0",
+     "c1",
+     "c2",
+     "c3"
+    };
   
   // Create events
   myCreateEvent(myNdb,
@@ -142,13 +150,14 @@
 		noEventColumnName);
 
   int j= 0;
-  while (j < 5) {
+  while (j < 99) {
 
     // Start "transaction" for handling events
     NdbEventOperation* op;
     printf("create EventOperation\n");
     if ((op = myNdb->createEventOperation(eventName)) == NULL)
       APIERROR(myNdb->getNdbError());
+    op->separateEvents(sep);
 
     printf("get values\n");
     NdbRecAttr* recAttr[noEventColumnName];
@@ -175,34 +184,45 @@
 	  i++;
 	  switch (op->getEventType()) {
 	  case NdbDictionary::Event::TE_INSERT:
-	    printf("%u INSERT: ", i);
+	    printf("%u INSERT", i);
 	    break;
 	  case NdbDictionary::Event::TE_DELETE:
-	    printf("%u DELETE: ", i);
+	    printf("%u DELETE", i);
 	    break;
 	  case NdbDictionary::Event::TE_UPDATE:
-	    printf("%u UPDATE: ", i);
+	    printf("%u UPDATE", i);
 	    break;
 	  default:
 	    abort(); // should not happen
 	  }
-	  for (int i = 1; i < noEventColumnName; i++) {
+          printf(" gci=%d\n", op->getGCI());
+          printf("post:  ");
+	  for (int i = 0; i < noEventColumnName; i++) {
 	    if (recAttr[i]->isNULL() >= 0) { // we have a value
-	      printf(" post[%u]=", i);
-	      if (recAttr[i]->isNULL() == 0) // we have a non-null value
-		printf("%u", recAttr[i]->u_32_value());
-	      else                           // we have a null value
-		printf("NULL");
-	    }
+	      if (recAttr[i]->isNULL() == 0) { // we have a non-null value
+                if (i < 2)
+                  printf("%-5u", recAttr[i]->u_32_value());
+                else
+                  printf("%-5.4s", recAttr[i]->aRef());
+              } else                           // we have a null value
+		printf("%-5s", "NULL");
+	    } else
+              printf("%-5s", "-");
+          }
+          printf("\npre :  ");
+	  for (int i = 0; i < noEventColumnName; i++) {
 	    if (recAttrPre[i]->isNULL() >= 0) { // we have a value
-	      printf(" pre[%u]=", i);
-	      if (recAttrPre[i]->isNULL() == 0) // we have a non-null value
-		printf("%u", recAttrPre[i]->u_32_value());
-	      else                              // we have a null value
-		printf("NULL");
-	    }
+	      if (recAttrPre[i]->isNULL() == 0) { // we have a non-null value
+                if (i < 2)
+                  printf("%-5u", recAttrPre[i]->u_32_value());
+                else
+                  printf("%-5.4s", recAttrPre[i]->aRef());
+              } else                              // we have a null value
+		printf("%-5s", "NULL");
+	    } else
+              printf("%-5s", "-");
 	  }
-	  printf("\n");
+          printf("\n");
 	}
       } else
 	;//printf("timed out\n");
Thread:
bk commit into 5.1 tree (pekka:1.1950) - pekka, 5 Dec