List:Commits« Previous MessageNext Message »
From:pekka Date:January 19 2006 1:01pm
Subject:bk commit into 5.1 tree (pekka:1.2074)
View as plain text  
Below is the list of changes that have just been committed into a local
5.1 repository of pekka. When pekka does a push these changes will
be propagated to the main repository and, within 24 hours after the
push, to the public repository.
For information on how to access the public repository
see http://dev.mysql.com/doc/mysql/en/installing-source-tree.html

ChangeSet
  1.2074 06/01/19 14:01:32 pekka@stripped +13 -0
  ndb - wl#2972  rbr blobs ndb api support

  storage/ndb/test/ndbapi/test_event_merge.cpp
    1.9 06/01/19 14:00:03 pekka@stripped +77 -67
    rbr blobs ndb api support

  storage/ndb/src/ndbapi/NdbEventOperationImpl.hpp
    1.12 06/01/19 14:00:03 pekka@stripped +37 -2
    rbr blobs ndb api support

  storage/ndb/src/ndbapi/NdbEventOperationImpl.cpp
    1.33 06/01/19 14:00:03 pekka@stripped +429 -49
    rbr blobs ndb api support

  storage/ndb/src/ndbapi/NdbEventOperation.cpp
    1.10 06/01/19 14:00:03 pekka@stripped +12 -0
    rbr blobs ndb api support

  storage/ndb/src/ndbapi/NdbDictionaryImpl.hpp
    1.42 06/01/19 14:00:03 pekka@stripped +4 -1
    rbr blobs ndb api support

  storage/ndb/src/ndbapi/NdbDictionaryImpl.cpp
    1.98 06/01/19 14:00:03 pekka@stripped +82 -8
    rbr blobs ndb api support

  storage/ndb/src/ndbapi/NdbDictionary.cpp
    1.46 06/01/19 14:00:03 pekka@stripped +5 -0
    rbr blobs ndb api support

  storage/ndb/src/ndbapi/NdbBlob.cpp
    1.31 06/01/19 14:00:03 pekka@stripped +227 -40
    rbr blobs ndb api support

  storage/ndb/ndbapi-examples/ndbapi_event/ndbapi_event.cpp
    1.4 06/01/19 14:00:03 pekka@stripped +117 -52
    rbr blobs ndb api support

  storage/ndb/ndbapi-examples/ndbapi_event/Makefile
    1.3 06/01/19 14:00:03 pekka@stripped +3 -3
    rbr blobs ndb api support

  storage/ndb/include/ndbapi/NdbEventOperation.hpp
    1.18 06/01/19 14:00:03 pekka@stripped +8 -0
    rbr blobs ndb api support

  storage/ndb/include/ndbapi/NdbDictionary.hpp
    1.63 06/01/19 14:00:03 pekka@stripped +19 -1
    rbr blobs ndb api support

  storage/ndb/include/ndbapi/NdbBlob.hpp
    1.18 06/01/19 14:00:03 pekka@stripped +37 -1
    rbr blobs ndb api support

# This is a BitKeeper patch.  What follows are the unified diffs for the
# set of deltas contained in the patch.  The rest of the patch, the part
# that BitKeeper cares about, is below these diffs.
# User:	pekka
# Host:	orca.ndb.mysql.com
# Root:	/space/pekka/ndb/version/my51-rbr

--- 1.62/storage/ndb/include/ndbapi/NdbDictionary.hpp	2006-01-17 09:24:52 +01:00
+++ 1.63/storage/ndb/include/ndbapi/NdbDictionary.hpp	2006-01-19 14:00:03 +01:00
@@ -1124,7 +1124,7 @@
       _TE_NODE_FAILURE=10,
       _TE_SUBSCRIBE=11,
       _TE_UNSUBSCRIBE=12,
-      _TE_NUL=13 // internal (INS o DEL within same GCI)
+      _TE_NUL=13 // internal (e.g. INS o DEL within same GCI)
     };
 #endif
     /**
@@ -1260,6 +1260,24 @@
      * @return Number of columns, -1 on error
      */
     int getNoOfEventColumns() const;
+
+    /**
+     * The merge events flag is false by default.  Setting it true
+     * implies that events are merged in following ways:
+     *
+     * - for given NdbEventOperation associated with this event,
+     *   events on same PK within same GCI are merged into single event
+     *
+     * - a blob table event is created for each blob attribute
+     *   and blob events are handled as part of main table events
+     *
+     * - blob post/pre data from the blob part events can be read
+     *   via NdbBlob methods as a single value
+     *
+     * NOTE: Currently this flag is not inherited by NdbEventOperation
+     * and must be set on NdbEventOperation explicitly.
+     */
+    void mergeEvents(bool flag);
 
     /**
      * Get object status

--- 1.17/storage/ndb/include/ndbapi/NdbEventOperation.hpp	2006-01-13 18:01:27 +01:00
+++ 1.18/storage/ndb/include/ndbapi/NdbEventOperation.hpp	2006-01-19 14:00:03 +01:00
@@ -150,6 +150,14 @@
    */
   NdbRecAttr *getPreValue(const char *anAttrName, char *aValue = 0);
 
+  /**
+   * These methods replace getValue/getPreValue for blobs.  Each
+   * method creates a blob handle NdbBlob.  The handle supports only
+   * read operations.  See NdbBlob.
+   */
+  NdbBlob* getBlobHandle(const char *anAttrName);
+  NdbBlob* getPreBlobHandle(const char *anAttrName);
+
   int isOverrun() const;
 
   /**

--- 1.45/storage/ndb/src/ndbapi/NdbDictionary.cpp	2006-01-17 09:24:54 +01:00
+++ 1.46/storage/ndb/src/ndbapi/NdbDictionary.cpp	2006-01-19 14:00:03 +01:00
@@ -901,6 +901,11 @@
   return m_impl.getNoOfEventColumns();
 }
 
+void NdbDictionary::Event::mergeEvents(bool flag)
+{
+  m_impl.m_mergeEvents = flag;
+}
+
 NdbDictionary::Object::Status
 NdbDictionary::Event::getObjectStatus() const
 {

--- 1.97/storage/ndb/src/ndbapi/NdbDictionaryImpl.cpp	2006-01-17 09:24:54 +01:00
+++ 1.98/storage/ndb/src/ndbapi/NdbDictionaryImpl.cpp	2006-01-19 14:00:03 +01:00
@@ -1072,6 +1072,7 @@
   m_tableId= RNIL;
   mi_type= 0;
   m_dur= NdbDictionary::Event::ED_UNDEFINED;
+  m_mergeEvents = false;
   m_tableImpl= NULL;
   m_rep= NdbDictionary::Event::ER_UPDATED;
 }
@@ -2036,7 +2037,7 @@
 NdbDictionaryImpl::addBlobTables(NdbTableImpl &t)
 {
   unsigned n= t.m_noOfBlobs;
-  DBUG_ENTER("NdbDictioanryImpl::addBlobTables");
+  DBUG_ENTER("NdbDictionaryImpl::addBlobTables");
   // optimized for blob column being the last one
   // and not looking for more than one if not neccessary
   for (unsigned i = t.m_columns.size(); i > 0 && n > 0;) {
@@ -3151,7 +3152,37 @@
 #endif
 
   // NdbDictInterface m_receiver;
-  DBUG_RETURN(m_receiver.createEvent(m_ndb, evnt, 0 /* getFlag unset */));
+  if (m_receiver.createEvent(m_ndb, evnt, 0 /* getFlag unset */) != 0)
+    DBUG_RETURN(-1);
+
+  // Create blob events
+  if (evnt.m_mergeEvents && createBlobEvents(evnt) != 0) {
+    int save_code = m_error.code;
+    (void)dropEvent(evnt.m_name.c_str());
+    m_error.code = save_code;
+    DBUG_RETURN(-1);
+  }
+  DBUG_RETURN(0);
+}
+
+int
+NdbDictionaryImpl::createBlobEvents(NdbEventImpl& evnt)
+{
+  DBUG_ENTER("NdbDictionaryImpl::createBlobEvents");
+  NdbTableImpl& t = *evnt.m_tableImpl;
+  Uint32 n = t.m_noOfBlobs;
+  Uint32 i;
+  for (i = 0; i < evnt.m_columns.size() && n > 0; i++) {
+    NdbColumnImpl & c = *evnt.m_columns[i];
+    if (! c.getBlobType() || c.getPartSize() == 0)
+      continue;
+    n--;
+    NdbEventImpl blob_evnt;
+    NdbBlob::getBlobEvent(blob_evnt, &evnt, &c);
+    if (createEvent(blob_evnt) != 0)
+      DBUG_RETURN(-1);
+  }
+  DBUG_RETURN(0);
 }
 
 int
@@ -3400,6 +3431,7 @@
   
   if ( attributeList_sz > table.getNoOfColumns() )
   {
+    m_error.code = 241;
     DBUG_PRINT("error",("Invalid version, too many columns"));
     delete ev;
     DBUG_RETURN(NULL);
@@ -3409,6 +3441,7 @@
   for(unsigned id= 0; ev->m_columns.size() < attributeList_sz; id++) {
     if ( id >= table.getNoOfColumns())
     {
+      m_error.code = 241;
       DBUG_PRINT("error",("Invalid version, column %d out of range", id));
       delete ev;
       DBUG_RETURN(NULL);
@@ -3566,13 +3599,54 @@
 int 
 NdbDictionaryImpl::dropEvent(const char * eventName)
 {
-  NdbEventImpl *ev= new NdbEventImpl();
-  ev->setName(eventName);
-  int ret= m_receiver.dropEvent(*ev);
-  delete ev;  
+  DBUG_ENTER("NdbDictionaryImpl::dropEvent");
+  DBUG_PRINT("info", ("name=%s", eventName));
 
-  //  printf("__________________RET %u\n", ret);
-  return ret;
+  NdbEventImpl *evnt = getEvent(eventName); // allocated
+  if (evnt == NULL) {
+    if (m_error.code != 723 && // no such table
+        m_error.code != 241)   // invalid table
+      DBUG_RETURN(-1);
+    DBUG_PRINT("info", ("no table, drop by name alone"));
+    evnt = new NdbEventImpl();
+    evnt->setName(eventName);
+  }
+  int ret = dropEvent(*evnt);
+  delete evnt;  
+  DBUG_RETURN(ret);
+}
+
+int
+NdbDictionaryImpl::dropEvent(const NdbEventImpl& evnt)
+{
+  if (dropBlobEvents(evnt) != 0)
+    return -1;
+  if (m_receiver.dropEvent(evnt) != 0)
+    return -1;
+  return 0;
+}
+
+int
+NdbDictionaryImpl::dropBlobEvents(const NdbEventImpl& evnt)
+{
+  DBUG_ENTER("NdbDictionaryImpl::dropBlobEvents");
+  if (evnt.m_tableImpl != 0) {
+    const NdbTableImpl& t = *evnt.m_tableImpl;
+    Uint32 n = t.m_noOfBlobs;
+    Uint32 i;
+    for (i = 0; i < evnt.m_columns.size() && n > 0; i++) {
+      const NdbColumnImpl& c = *evnt.m_columns[i];
+      if (! c.getBlobType() || c.getPartSize() == 0)
+        continue;
+      n--;
+      char bename[MAX_TAB_NAME_SIZE];
+      NdbBlob::getBlobEventName(bename, &evnt, &c);
+      (void)dropEvent(bename);
+    }
+  } else {
+    // could loop over MAX_ATTRIBUTES_IN_TABLE ...
+  }
+  DBUG_RETURN(0);
 }
 
 int

--- 1.41/storage/ndb/src/ndbapi/NdbDictionaryImpl.hpp	2006-01-17 09:24:54 +01:00
+++ 1.42/storage/ndb/src/ndbapi/NdbDictionaryImpl.hpp	2006-01-19 14:00:03 +01:00
@@ -277,7 +277,6 @@
   NdbDictionary::Event::EventDurability  getDurability() const;
   void setReport(NdbDictionary::Event::EventReport r);
   NdbDictionary::Event::EventReport  getReport() const;
-  void addEventColumn(const NdbColumnImpl &c);
   int getNoOfEventColumns() const;
 
   void print() {
@@ -295,6 +294,7 @@
   Uint32 mi_type;
   NdbDictionary::Event::EventDurability m_dur;
   NdbDictionary::Event::EventReport m_rep;
+  bool m_mergeEvents;
 
   NdbTableImpl *m_tableImpl;
   BaseString m_tableName;
@@ -547,7 +547,10 @@
 			       NdbTableImpl * table);
 
   int createEvent(NdbEventImpl &);
+  int createBlobEvents(NdbEventImpl &);
   int dropEvent(const char * eventName);
+  int dropEvent(const NdbEventImpl &);
+  int dropBlobEvents(const NdbEventImpl &);
 
   int executeSubscribeEvent(NdbEventOperationImpl &);
   int stopSubscribeEvent(NdbEventOperationImpl &);

--- 1.9/storage/ndb/src/ndbapi/NdbEventOperation.cpp	2006-01-13 18:01:27 +01:00
+++ 1.10/storage/ndb/src/ndbapi/NdbEventOperation.cpp	2006-01-19 14:00:03 +01:00
@@ -55,6 +55,18 @@
   return m_impl.getValue(colName, aValue, 1);
 }
 
+NdbBlob *
+NdbEventOperation::getBlobHandle(const char *colName)
+{
+  return m_impl.getBlobHandle(colName, 0);
+}
+
+NdbBlob *
+NdbEventOperation::getPreBlobHandle(const char *colName)
+{
+  return m_impl.getBlobHandle(colName, 1);
+}
+
 int
 NdbEventOperation::execute()
 {

--- 1.32/storage/ndb/src/ndbapi/NdbEventOperationImpl.cpp	2006-01-17 07:36:41 +01:00
+++ 1.33/storage/ndb/src/ndbapi/NdbEventOperationImpl.cpp	2006-01-19 14:00:03 +01:00
@@ -38,6 +38,7 @@
 #include "DictCache.hpp"
 #include <portlib/NdbMem.h>
 #include <NdbRecAttr.hpp>
+#include <NdbBlob.hpp>
 #include <NdbEventOperation.hpp>
 #include "NdbEventOperationImpl.hpp"
 
@@ -48,6 +49,20 @@
 static const Uint32 ACTIVE_GCI_DIRECTORY_SIZE = 4;
 static const Uint32 ACTIVE_GCI_MASK = ACTIVE_GCI_DIRECTORY_SIZE - 1;
 
+#ifdef VM_TRACE
+static void
+print_std(const SubTableData * sdata, LinearSectionPtr ptr[3])
+{
+  printf("addr=%p gci=%d op=%d\n", (void*)sdata, sdata->gci, sdata->operation);
+  for (int i = 0; i <= 2; i++) {
+    printf("sec=%d addr=%p sz=%d\n", i, (void*)ptr[i].p, ptr[i].sz);
+    for (int j = 0; j < ptr[i].sz; j++)
+      printf("%08x ", ptr[i].p[j]);
+    printf("\n");
+  }
+}
+#endif
+
 /*
  * Class NdbEventOperationImpl
  *
@@ -60,7 +75,7 @@
 #define DBUG_RETURN_EVENT(A) DBUG_RETURN(A)
 #define DBUG_VOID_RETURN_EVENT DBUG_VOID_RETURN
 #define DBUG_PRINT_EVENT(A,B) DBUG_PRINT(A,B)
-#define DBUG_DUMP_EVENT(A,B,C) DBUG_SUMP(A,B,C)
+#define DBUG_DUMP_EVENT(A,B,C) DBUG_DUMP(A,B,C)
 #else
 #define DBUG_ENTER_EVENT(A)
 #define DBUG_RETURN_EVENT(A) return(A)
@@ -92,6 +107,11 @@
   theCurrentDataAttrs[0] = NULL;
   theFirstDataAttrs[1] = NULL;
   theCurrentDataAttrs[1] = NULL;
+
+  theBlobList = NULL;
+  theBlobOpList = NULL;
+  theMainOp = NULL;
+
   m_data_item= NULL;
   m_eventImpl = NULL;
 
@@ -117,7 +137,11 @@
 
   m_state= EO_CREATED;
 
-  m_mergeEvents = false;
+#ifdef ndb_event_stores_merge_events_flag
+  m_mergeEvents = m_eventImpl->m_mergeEvents;
+#else
+   m_mergeEvents = false;
+#endif
 
   m_has_error= 0;
 
@@ -254,10 +278,183 @@
   DBUG_RETURN(tAttr);
 }
 
+NdbBlob*
+NdbEventOperationImpl::getBlobHandle(const char *colName, int n)
+{
+  DBUG_ENTER("NdbEventOperationImpl::getBlobHandle (colName)");
+
+  assert(m_mergeEvents);
+
+  if (m_state != EO_CREATED) {
+    ndbout_c("NdbEventOperationImpl::getBlobHandle may only be called between "
+	     "instantiation and execute()");
+    DBUG_RETURN(NULL);
+  }
+
+  NdbColumnImpl *tAttrInfo = m_eventImpl->m_tableImpl->getColumn(colName);
+
+  if (tAttrInfo == NULL) {
+    ndbout_c("NdbEventOperationImpl::getBlobHandle attribute %s not found",colName);
+    DBUG_RETURN(NULL);
+  }
+
+  NdbBlob* bh = getBlobHandle(tAttrInfo, n);
+  DBUG_RETURN(bh);
+}
+
+NdbBlob*
+NdbEventOperationImpl::getBlobHandle(const NdbColumnImpl *tAttrInfo, int n)
+{
+  DBUG_ENTER("NdbEventOperationImpl::getBlobHandle");
+  DBUG_PRINT("info", ("attr=%s post/pre=%d", tAttrInfo->m_name.c_str(), n));
+  
+  // as in NdbOperation, create only one instance
+  NdbBlob* tBlob = theBlobList;
+  NdbBlob* tLastBlob = NULL;
+  while (tBlob != NULL) {
+    if (tBlob->theColumn == tAttrInfo && tBlob->theEventBlobVersion == n)
+      DBUG_RETURN(tBlob);
+    tLastBlob = tBlob;
+    tBlob = tBlob->theNext;
+  }
+
+  // blob event name
+  char bename[MAX_TAB_NAME_SIZE];
+  NdbBlob::getBlobEventName(bename, m_eventImpl, tAttrInfo);
+
+  // find blob event op if any (it serves both post and pre handles)
+  assert(tAttrInfo->m_blobTable != NULL);
+  NdbEventOperationImpl* tBlobOp = theBlobOpList;
+  NdbEventOperationImpl* tLastBlopOp = NULL;
+  while (tBlobOp != NULL) {
+    if (strcmp(tBlobOp->m_eventImpl->m_name.c_str(), bename) == 0) {
+      assert(tBlobOp->m_eventImpl->m_tableImpl == tAttrInfo->m_blobTable);
+      break;
+    }
+    tLastBlopOp = tBlobOp;
+    tBlobOp = tBlobOp->theNextBlobOp;
+  }
+
+  DBUG_PRINT("info", ("%s op %s", tBlobOp ? " reuse" : " create", bename));
+
+  // create blob event op if not found
+  if (tBlobOp == NULL) {
+    NdbEventOperation* tmp = m_ndb->createEventOperation(bename);
+    if (tmp == NULL)
+      DBUG_RETURN(NULL);
+    tBlobOp = &tmp->m_impl;
+
+    // pointer to main table op
+    tBlobOp->theMainOp = this;
+    tBlobOp->m_mergeEvents = m_mergeEvents;
+
+    // add to list end
+    if (tLastBlopOp == NULL)
+      theBlobOpList = tBlobOp;
+    else
+      tLastBlopOp->theNextBlobOp = tBlobOp;
+    tBlobOp->theNextBlobOp = NULL;
+  }
+
+  tBlob = m_ndb->getNdbBlob();
+  if (tBlob == NULL)
+    DBUG_RETURN(NULL);
+
+  // calls getValue on inline and blob part
+  if (tBlob->atPrepare(this, tBlobOp, tAttrInfo, n) == -1) {
+    m_ndb->releaseNdbBlob(tBlob);
+    DBUG_RETURN(NULL);
+  }
+
+  // add to list end
+  if (tLastBlob == NULL)
+    theBlobList = tBlob;
+  else
+    tLastBlob->theNext = tBlob;
+  tBlob->theNext = NULL;
+  DBUG_RETURN(tBlob);
+}
+
+int
+NdbEventOperationImpl::readBlobParts(char* buf, NdbBlob* blob,
+                                     Uint32 part, Uint32 count)
+{
+  DBUG_ENTER_EVENT("NdbEventOperationImpl::readBlobParts");
+  DBUG_PRINT_EVENT("info", ("part=%u count=%u post/pre=%d",
+                      part, count, blob->theEventBlobVersion));
+
+  NdbEventOperationImpl* blob_op = blob->theBlobEventOp;
+
+  EventBufData* main_data = m_data_item;
+  DBUG_PRINT_EVENT("info", ("main_data=%p", main_data));
+  assert(main_data != NULL);
+
+  // search for blob parts list head
+  EventBufData* head;
+  assert(m_data_item != NULL);
+  head = m_data_item->m_next_blob;
+  while (head != NULL)
+  {
+    if (head->m_event_op == blob_op)
+    {
+      DBUG_PRINT_EVENT("info", ("found blob parts head %p", head));
+      break;
+    }
+    head = head->m_next_blob;
+  }
+
+  Uint32 nparts = 0;
+  EventBufData* data = head;
+  // XXX optimize using part no ordering
+  while (data != NULL)
+  {
+    /*
+     * Hack part no directly out of buffer since it is not returned
+     * in pre data (PK buglet).  For part data use receive_event().
+     * This means extra copy.
+     */
+    blob_op->m_data_item = data;
+    int r = blob_op->receive_event();
+    assert(r > 0);
+    Uint32 no = data->get_blob_part_no();
+    Uint32 sz = blob->thePartSize;
+    const char* src = blob->theBlobEventDataBuf.data;
+
+    DBUG_PRINT_EVENT("info", ("part_data=%p part no=%u part sz=%u", data, no, sz));
+
+    if (part <= no && no < part + count)
+    {
+      DBUG_PRINT_EVENT("info", ("part within read range"));
+      memcpy(buf + (no - part) * sz, src, sz);
+      nparts++;
+    }
+    else
+    {
+      DBUG_PRINT_EVENT("info", ("part outside read range"));
+    }
+    data = data->m_next;
+  }
+  assert(nparts == count);
+
+  DBUG_RETURN_EVENT(0);
+}
+
 int
 NdbEventOperationImpl::execute()
 {
   DBUG_ENTER("NdbEventOperationImpl::execute");
+  m_ndb->theEventBuffer->add_drop_lock();
+  int r = execute_nolock();
+  m_ndb->theEventBuffer->add_drop_unlock();
+  DBUG_RETURN(r);
+}
+
+int
+NdbEventOperationImpl::execute_nolock()
+{
+  DBUG_ENTER("NdbEventOperationImpl::execute_nolock");
+  DBUG_PRINT("info", ("this=%p type=%s", this, !theMainOp ? "main" : "blob"));
+
   NdbDictionary::Dictionary *myDict = m_ndb->getDictionary();
   if (!myDict) {
     m_error.code= m_ndb->getNdbError().code;
@@ -266,18 +463,26 @@
 
   if (theFirstPkAttrs[0] == NULL && 
       theFirstDataAttrs[0] == NULL) { // defaults to get all
-    
   }
 
-  m_ndb->theEventBuffer->add_drop_lock();
   m_magic_number= NDB_EVENT_OP_MAGIC_NUMBER;
   m_state= EO_EXECUTING;
   mi_type= m_eventImpl->mi_type;
   m_ndb->theEventBuffer->add_op();
   int r= NdbDictionaryImpl::getImpl(*myDict).executeSubscribeEvent(*this);
   if (r == 0) {
-    m_ndb->theEventBuffer->add_drop_unlock();
-    DBUG_RETURN(0);
+    if (theMainOp == NULL) {
+      DBUG_PRINT("info", ("execute blob ops"));
+      NdbEventOperationImpl* blob_op = theBlobOpList;
+      while (blob_op != NULL) {
+        r = blob_op->execute_nolock();
+        if (r != 0)
+          break;
+        blob_op = blob_op->theNextBlobOp;
+      }
+    }
+    if (r == 0)
+      DBUG_RETURN(0);
   }
   //Error
   m_state= EO_ERROR;
@@ -285,7 +490,6 @@
   m_magic_number= 0;
   m_error.code= myDict->getNdbError().code;
   m_ndb->theEventBuffer->remove_op();
-  m_ndb->theEventBuffer->add_drop_unlock();
   DBUG_RETURN(r);
 }
 
@@ -709,21 +913,6 @@
   return ret;
 }
 
-#ifdef VM_TRACE
-static void
-print_std(const char* tag, const SubTableData * sdata, LinearSectionPtr ptr[3])
-{
-  printf("%s\n", tag);
-  printf("addr=%p gci=%d op=%d\n", (void*)sdata, sdata->gci, sdata->operation);
-  for (int i = 0; i <= 2; i++) {
-    printf("sec=%d addr=%p sz=%d\n", i, (void*)ptr[i].p, ptr[i].sz);
-    for (int j = 0; j < ptr[i].sz; j++)
-      printf("%08x ", ptr[i].p[j]);
-    printf("\n");
-  }
-}
-#endif
-
 NdbEventOperation *
 NdbEventBuffer::nextEvent()
 {
@@ -751,6 +940,7 @@
   while ((data= m_available_data.m_head))
   {
     NdbEventOperationImpl *op= data->m_event_op;
+    DBUG_PRINT_EVENT("info", ("available data=%p op=%p", data, op));
 
     // set NdbEventOperation data
     op->m_data_item= data;
@@ -767,7 +957,10 @@
 
     // NUL event is not returned
     if (data->sdata->operation == NdbDictionary::Event::_TE_NUL)
+    {
+      DBUG_PRINT_EVENT("info", ("skip _TE_NUL"));
       continue;
+    }
 
     int r= op->receive_event();
     if (r > 0)
@@ -777,6 +970,12 @@
 #ifdef VM_TRACE
 	m_latest_command= m_latest_command_save;
 #endif
+        NdbBlob* tBlob = op->theBlobList;
+        while (tBlob != NULL)
+        {
+          (void)tBlob->atNextEvent();
+          tBlob = tBlob->theNext;
+        }
 	DBUG_RETURN_EVENT(op->m_facade);
       }
       // the next event belonged to an event op that is no
@@ -1161,7 +1360,7 @@
   DBUG_ENTER_EVENT("NdbEventBuffer::insertDataL");
   Uint64 gci= sdata->gci;
 
-  if ( likely((Uint32)op->mi_type & 1 << (Uint32)sdata->operation) )
+  if ( likely((Uint32)op->mi_type & (1 << (Uint32)sdata->operation)) )
   {
     Gci_container* bucket= find_bucket(&m_active_gci, gci);
       
@@ -1179,9 +1378,9 @@
       DBUG_RETURN_EVENT(0);
     }
 
-    bool use_hash =
-      op->m_mergeEvents &&
+    const bool is_data_event =
       sdata->operation < NdbDictionary::Event::_TE_FIRST_NON_DATA_EVENT;
+    const bool use_hash =  op->m_mergeEvents && is_data_event;
 
     // find position in bucket hash table
     EventBufData* data = 0;
@@ -1207,10 +1406,38 @@
         op->m_has_error = 3;
         DBUG_RETURN_EVENT(-1);
       }
-      // add it to list and hash table
-      bucket->m_data.append(data);
+      data->m_event_op = op;
+      if (op->theMainOp == NULL || ! is_data_event)
+      {
+        bucket->m_data.append(data);
+      }
+      else
+      {
+        // find or create main event for this blob event
+        EventBufData_hash::Pos main_hpos;
+        int ret = get_main_data(bucket, main_hpos, data);
+        if (ret == -1)
+        {
+          op->m_has_error = 4;
+          DBUG_RETURN_EVENT(-1);
+        }
+        EventBufData* main_data = main_hpos.data;
+        if (ret != 0) // main event was created
+        {
+          main_data->m_event_op = op->theMainOp;
+          bucket->m_data.append(main_data);
+          if (use_hash)
+          {
+            main_data->m_pkhash = main_hpos.pkhash;
+            bucket->m_data_hash.append(main_hpos, main_data);
+          }
+        }
+        // link blob event under main event
+        add_blob_data(main_data, data);
+      }
       if (use_hash)
       {
+        data->m_pkhash = hpos.pkhash;
         bucket->m_data_hash.append(hpos, data);
       }
 #ifdef VM_TRACE
@@ -1226,18 +1453,12 @@
         DBUG_RETURN_EVENT(-1);
       }
     }
-    data->m_event_op = op;
-    if (use_hash)
-    {
-      data->m_pkhash = hpos.pkhash;
-    }
     DBUG_RETURN_EVENT(0);
   }
 
 #ifdef VM_TRACE
-  if ((Uint32)op->m_eventImpl->mi_type & 1 << (Uint32)sdata->operation)
+  if ((Uint32)op->m_eventImpl->mi_type & (1 << (Uint32)sdata->operation))
   {
-    // XXX never reached
     DBUG_PRINT_EVENT("info",("Data arrived before ready eventId", op->m_eventId));
     DBUG_RETURN_EVENT(0);
   }
@@ -1300,6 +1521,8 @@
 int
 NdbEventBuffer::alloc_mem(EventBufData* data, LinearSectionPtr ptr[3])
 {
+  DBUG_ENTER("NdbEventBuffer::alloc_mem");
+  DBUG_PRINT("info", ("ptr sz %u + %u + %u", ptr[0].sz, ptr[1].sz, ptr[2].sz));
   const Uint32 min_alloc_size = 128;
 
   Uint32 sz4 = (sizeof(SubTableData) + 3) >> 2;
@@ -1317,7 +1540,7 @@
 
     data->memory = (Uint32*)NdbMem_Allocate(alloc_size);
     if (data->memory == 0)
-      return -1;
+      DBUG_RETURN(-1);
     data->sz = alloc_size;
     m_total_alloc += data->sz;
   }
@@ -1332,7 +1555,7 @@
     memptr += ptr[i].sz;
   }
 
-  return 0;
+  DBUG_RETURN(0);
 }
 
 int 
@@ -1404,13 +1627,10 @@
   {
     Uint32 k;
     for (k = 0; k < n; k++)
-      p1[j1++] = p2[j2++];
-  }
-  else
-  {
-    j1 += n;
-    j2 += n;
+      p1[j1 + k] = p2[j2 + k];
   }
+  j1 += n;
+  j2 += n;
 }
 
 int 
@@ -1443,8 +1663,8 @@
   data->sz = 0;
 
   // compose ptr1 o ptr2 = ptr
-  LinearSectionPtr (&ptr1) [3] = olddata.ptr;
-  LinearSectionPtr (&ptr) [3] = data->ptr;
+  LinearSectionPtr (&ptr1)[3] = olddata.ptr;
+  LinearSectionPtr (&ptr)[3] = data->ptr;
 
   // loop twice where first loop only sets sizes
   int loop;
@@ -1458,7 +1678,7 @@
       data->sdata->operation = tp->t3;
     }
 
-    ptr[0].sz = ptr[1].sz = ptr[3].sz = 0;
+    ptr[0].sz = ptr[1].sz = ptr[2].sz = 0;
 
     // copy pk from new version
     {
@@ -1572,6 +1792,113 @@
 
   DBUG_RETURN_EVENT(0);
 }
+ 
+/*
+ * Given blob part event, find main table event on inline part.  It
+ * should exist (force in TUP) but may arrive later.  If so, create
+ * NUL event on main table.  The real event replaces it later.
+ */
+
+// write attribute headers for concatened PK
+static void
+split_concatenated_pk(const NdbTableImpl* t, Uint32* ah_buffer,
+                      const Uint32* pk_buffer, Uint32 pk_sz)
+{
+  Uint32 sz = 0; // words parsed so far
+  Uint32 n;  // pk attr count
+  Uint32 i;
+  for (i = n = 0; i < t->m_columns.size() && n < t->m_noOfKeys; i++)
+  {
+    const NdbColumnImpl* c = t->getColumn(i);
+    assert(c != NULL);
+    if (! c->m_pk)
+      continue;
+
+    assert(sz < pk_sz);
+    Uint32 bytesize = c->m_attrSize * c->m_arraySize;
+    Uint32 lb, len;
+    bool ok = NdbSqlUtil::get_var_length(c->m_type, &pk_buffer[sz], bytesize,
+                                         lb, len);
+    assert(ok);
+
+    AttributeHeader ah(i, lb + len);
+    ah_buffer[n++] = ah.m_value;
+    sz += ah.getDataSize();
+  }
+  assert(n == t->m_noOfKeys && sz == pk_sz);
+}
+
+int
+NdbEventBuffer::get_main_data(Gci_container* bucket,
+                              EventBufData_hash::Pos& hpos,
+                              EventBufData* blob_data)
+{
+  DBUG_ENTER_EVENT("NdbEventBuffer::get_main_data");
+
+  NdbEventOperationImpl* main_op = blob_data->m_event_op->theMainOp;
+  assert(main_op != NULL);
+  const NdbTableImpl* mainTable = main_op->m_eventImpl->m_tableImpl;
+
+  // create LinearSectionPtr for main table key
+  LinearSectionPtr ptr[3];
+  Uint32 ah_buffer[NDB_MAX_NO_OF_ATTRIBUTES_IN_KEY];
+  ptr[0].sz = mainTable->m_noOfKeys;
+  ptr[0].p = ah_buffer;
+  ptr[1].sz = AttributeHeader(blob_data->ptr[0].p[0]).getDataSize();
+  ptr[1].p = blob_data->ptr[1].p;
+  ptr[2].sz = 0;
+  ptr[2].p = 0;
+  split_concatenated_pk(mainTable, ptr[0].p, ptr[1].p, ptr[1].sz);
+
+  DBUG_DUMP_EVENT("ah", (char*)ptr[0].p, ptr[0].sz << 2);
+  DBUG_DUMP_EVENT("pk", (char*)ptr[1].p, ptr[1].sz << 2);
+
+  // search for main event buffer
+  bucket->m_data_hash.search(hpos, main_op, ptr);
+  if (hpos.data != NULL)
+    DBUG_RETURN_EVENT(0);
+
+  // not found, create a place-holder
+  EventBufData* main_data = alloc_data();
+  if (main_data == NULL)
+    DBUG_RETURN_EVENT(-1);
+  SubTableData sdata = *blob_data->sdata;
+  sdata.tableId = main_op->m_eventImpl->m_tableImpl->m_id;
+  sdata.operation = NdbDictionary::Event::_TE_NUL;
+  if (copy_data(&sdata, ptr, main_data) != 0)
+    DBUG_RETURN_EVENT(-1);
+  hpos.data = main_data;
+
+  DBUG_RETURN_EVENT(1);
+}
+
+void
+NdbEventBuffer::add_blob_data(EventBufData* main_data,
+                              EventBufData* blob_data)
+{
+  DBUG_ENTER_EVENT("NdbEventBuffer::add_blob_data");
+  DBUG_PRINT_EVENT("info", ("main_data=%p blob_data=%p", main_data, blob_data));
+  EventBufData* head;
+  head = main_data->m_next_blob;
+  while (head != NULL)
+  {
+    if (head->m_event_op == blob_data->m_event_op)
+      break;
+    head = head->m_next_blob;
+  }
+  if (head == NULL)
+  {
+    head = blob_data;
+    head->m_next_blob = main_data->m_next_blob;
+    main_data->m_next_blob = head;
+  }
+  else
+  {
+    blob_data->m_next = head->m_next;
+    head->m_next = blob_data;
+  }
+  DBUG_VOID_RETURN_EVENT;
+}
 
 NdbEventOperationImpl *
 NdbEventBuffer::move_data()
@@ -1613,6 +1940,31 @@
 #endif
   m_free_data_sz+= list.m_sz;
 
+  // free blobs XXX unacceptable performance, fix later
+  {
+    EventBufData* data = list.m_head;
+    while (1) {
+      while (data->m_next_blob != NULL) {
+        EventBufData* blob_head = data->m_next_blob;
+        data->m_next_blob = blob_head->m_next_blob;
+        blob_head->m_next_blob = NULL;
+        while (blob_head != NULL) {
+          EventBufData* blob_part = blob_head;
+          blob_head = blob_head->m_next;
+          blob_part->m_next = m_free_data;
+          m_free_data = blob_part;
+#ifdef VM_TRACE
+          m_free_data_count++;
+#endif
+          m_free_data_sz += blob_part->sz;
+        }
+      }
+      if (data == list.m_tail)
+        break;
+      data = data->m_next;
+    }
+  }
+
   // list returned to m_free_data
   new (&list) EventBufData_list;
 }
@@ -1648,6 +2000,14 @@
   if (m_dropped_ev_op)
     m_dropped_ev_op->m_prev= op;
   m_dropped_ev_op= op;
+ 
+  // drop blob ops
+  while (op->theBlobOpList != NULL)
+  {
+    NdbEventOperationImpl* tBlobOp = op->theBlobOpList;
+    op->theBlobOpList = op->theBlobOpList->theNextBlobOp;
+    (void)m_ndb->dropEventOperation(tBlobOp);
+  }
 
   // ToDo, take care of these to be deleted at the
   // appropriate time, after we are sure that there
@@ -1717,6 +2077,10 @@
 Uint32
 EventBufData_hash::getpkhash(NdbEventOperationImpl* op, LinearSectionPtr ptr[3])
 {
+  DBUG_ENTER_EVENT("EventBufData_hash::getpkhash");
+  DBUG_DUMP_EVENT("ah", (char*)ptr[0].p, ptr[0].sz << 2);
+  DBUG_DUMP_EVENT("pk", (char*)ptr[1].p, ptr[1].sz << 2);
+
   const NdbTableImpl* tab = op->m_eventImpl->m_tableImpl;
 
   // in all cases ptr[0] = pk ah.. ptr[1] = pk ad..
@@ -1747,13 +2111,19 @@
     (*cs->coll->hash_sort)(cs, dptr + lb, len, &nr1, &nr2);
     dptr += ((bytesize + 3) / 4) * 4;
   }
-  return nr1;
+  DBUG_PRINT_EVENT("info", ("hash result=%08x", nr1));
+  DBUG_RETURN_EVENT(nr1);
 }
 
-// this is seldom invoked
 bool
 EventBufData_hash::getpkequal(NdbEventOperationImpl* op, LinearSectionPtr ptr1[3], LinearSectionPtr ptr2[3])
 {
+  DBUG_ENTER_EVENT("EventBufData_hash::getpkequal");
+  DBUG_DUMP_EVENT("ah1", (char*)ptr1[0].p, ptr1[0].sz << 2);
+  DBUG_DUMP_EVENT("pk1", (char*)ptr1[1].p, ptr1[1].sz << 2);
+  DBUG_DUMP_EVENT("ah2", (char*)ptr2[0].p, ptr2[0].sz << 2);
+  DBUG_DUMP_EVENT("pk2", (char*)ptr2[1].p, ptr2[1].sz << 2);
+
   const NdbTableImpl* tab = op->m_eventImpl->m_tableImpl;
 
   Uint32 nkey = tab->m_noOfKeys;
@@ -1763,6 +2133,8 @@
   const uchar* dptr1 = (uchar*)ptr1[1].p;
   const uchar* dptr2 = (uchar*)ptr2[1].p;
 
+  bool equal = true;
+
   while (nkey-- != 0)
   {
     AttributeHeader ah1(*hptr1++);
@@ -1787,16 +2159,22 @@
     CHARSET_INFO* cs = col->m_cs ? col->m_cs : &my_charset_bin;
     int res = (cs->coll->strnncollsp)(cs, dptr1 + lb1, len1, dptr2 + lb2, len2, false);
     if (res != 0)
-      return false;
+    {
+      equal = false;
+      break;
+    }
     dptr1 += ((bytesize1 + 3) / 4) * 4;
     dptr2 += ((bytesize2 + 3) / 4) * 4;
   }
-  return true;
+
+  DBUG_PRINT_EVENT("info", ("equal=%s", equal ? "true" : "false"));
+  DBUG_RETURN_EVENT(equal);
 }
 
 void
 EventBufData_hash::search(Pos& hpos, NdbEventOperationImpl* op, LinearSectionPtr ptr[3])
 {
+  DBUG_ENTER_EVENT("EventBufData_hash::search");
   Uint32 pkhash = getpkhash(op, ptr);
   Uint32 index = (op->m_oid ^ pkhash) % GCI_EVENT_HASH_SIZE;
   EventBufData* data = m_hash[index];
@@ -1811,6 +2189,8 @@
   hpos.index = index;
   hpos.data = data;
   hpos.pkhash = pkhash;
+  DBUG_PRINT_EVENT("info", ("search result=%p", data));
+  DBUG_VOID_RETURN_EVENT;
 }
 
 template class Vector<Gci_container>;

--- 1.11/storage/ndb/src/ndbapi/NdbEventOperationImpl.hpp	2006-01-13 18:01:27 +01:00
+++ 1.12/storage/ndb/src/ndbapi/NdbEventOperationImpl.hpp	2006-01-19 14:00:03 +01:00
@@ -21,6 +21,7 @@
 #include <signaldata/SumaImpl.hpp>
 #include <transporter/TransporterDefinitions.hpp>
 #include <NdbRecAttr.hpp>
+#include <AttributeHeader.hpp>
 
 #define NDB_EVENT_OP_MAGIC_NUMBER 0xA9F301B4
 
@@ -35,9 +36,28 @@
   LinearSectionPtr ptr[3];
   unsigned sz;
   NdbEventOperationImpl *m_event_op;
-  EventBufData *m_next; // Next wrt to global order
+
+  /*
+   * Blobs are stored in blob list (m_next_blob) where each entry
+   * is list of parts (m_next) in part number order.
+   *
+   * TODO order by part no and link for fast read and free_list
+   */
+
+  EventBufData *m_next; // Next wrt to global order or Next blob part
+  EventBufData *m_next_blob; // First part in next blob
+
   EventBufData *m_next_hash; // Next in per-GCI hash
   Uint32 m_pkhash; // PK hash (without op) for fast compare
+
+  // Get blob part number from blob data
+  Uint32 get_blob_part_no() {
+    assert(ptr[0].sz > 2);
+    Uint32 pos = AttributeHeader(ptr[0].p[0]).getDataSize() +
+                AttributeHeader(ptr[0].p[1]).getDataSize();
+    Uint32 no = ptr[1].p[pos];
+    return no;
+  }
 };
 
 class EventBufData_list
@@ -70,7 +90,6 @@
 {
 }
 
-
 inline
 int EventBufData_list::is_empty()
 {
@@ -173,9 +192,13 @@
   NdbEventOperation::State getState();
 
   int execute();
+  int execute_nolock();
   int stop();
   NdbRecAttr *getValue(const char *colName, char *aValue, int n);
   NdbRecAttr *getValue(const NdbColumnImpl *, char *aValue, int n);
+  NdbBlob *getBlobHandle(const char *colName, int n);
+  NdbBlob *getBlobHandle(const NdbColumnImpl *, int n);
+  int readBlobParts(char* buf, NdbBlob* blob, Uint32 part, Uint32 count);
   int receive_event();
   Uint64 getGCI();
   Uint64 getLatestGCI();
@@ -199,6 +222,13 @@
   NdbRecAttr *theFirstDataAttrs[2];
   NdbRecAttr *theCurrentDataAttrs[2];
 
+  NdbBlob* theBlobList;
+  union {
+  NdbEventOperationImpl* theBlobOpList;
+  NdbEventOperationImpl* theNextBlobOp;
+  };
+  NdbEventOperationImpl* theMainOp; // blob op pointer to main op
+
   NdbEventOperation::State m_state; /* note connection to mi_type */
   Uint32 mi_type; /* should be == 0 if m_state != EO_EXECUTING
 		   * else same as in EventImpl
@@ -275,6 +305,11 @@
   int merge_data(const SubTableData * const sdata,
                  LinearSectionPtr ptr[3],
                  EventBufData* data);
+  int get_main_data(Gci_container* bucket,
+                    EventBufData_hash::Pos& hpos,
+                    EventBufData* blob_data);
+  void add_blob_data(EventBufData* main_data,
+                     EventBufData* blob_data);
 
   void free_list(EventBufData_list &list);
 

--- 1.17/storage/ndb/include/ndbapi/NdbBlob.hpp	2005-10-06 10:26:06 +02:00
+++ 1.18/storage/ndb/include/ndbapi/NdbBlob.hpp	2006-01-19 14:00:03 +01:00
@@ -28,6 +28,7 @@
 class NdbRecAttr;
 class NdbTableImpl;
 class NdbColumnImpl;
+class NdbEventOperationImpl;
 
 /**
  * @class NdbBlob
@@ -71,6 +72,10 @@
  * writes.  It avoids execute penalty if nothing is pending.  It is not
  * needed after execute (obviously) or after next scan result.
  *
+ * NdbBlob also supports reading post or pre blob data from events.  The
+ * handle can be read after next event on main table has been retrieved.
+ * The data is available immediately.  See NdbEventOperation.
+ *
  * NdbBlob methods return -1 on error and 0 on success, and use output
  * parameters when necessary.
  *
@@ -146,6 +151,12 @@
    */
   int setActiveHook(ActiveHook* activeHook, void* arg);
   /**
+   * Check if blob value is defined (NULL or not).  Used as first call
+   * on event based blob.  The argument is set to -1 for not defined.
+   * Unlike getNull() this does not cause error on the handle.
+   */
+  int getDefined(int& isNull);
+  /**
    * Check if blob is null.
    */
   int getNull(bool& isNull);
@@ -192,6 +203,11 @@
    */
   static int getBlobTableName(char* btname, Ndb* anNdb, const char* tableName, const char* columnName);
   /**
+   * Get blob event name.  The blob event is created if the main event
+   * monitors the blob column.  The name includes main event name.
+   */
+  static int getBlobEventName(char* bename, Ndb* anNdb, const char* eventName, const char* columnName);
+  /**
    * Return error object.  The error may be blob specific (below) or may
    * be copied from a failed implicit operation.
    */
@@ -217,17 +233,29 @@
   friend class NdbScanOperation;
   friend class NdbDictionaryImpl;
   friend class NdbResultSet; // atNextResult
+  friend class NdbEventBuffer;
+  friend class NdbEventOperationImpl;
 #endif
   // state
   State theState;
   void setState(State newState);
+  // quick and dirty support for events (consider subclassing)
+  int theEventBlobVersion; // -1=normal blob 0=post event 1=pre event
   // define blob table
   static void getBlobTableName(char* btname, const NdbTableImpl* t, const NdbColumnImpl* c);
   static void getBlobTable(NdbTableImpl& bt, const NdbTableImpl* t, const NdbColumnImpl* c);
+  static void getBlobEventName(char* bename, const NdbEventImpl* e, const NdbColumnImpl* c);
+  static void getBlobEvent(NdbEventImpl& be, const NdbEventImpl* e, const NdbColumnImpl* c);
   // ndb api stuff
   Ndb* theNdb;
   NdbTransaction* theNdbCon;
   NdbOperation* theNdbOp;
+  NdbEventOperationImpl* theEventOp;
+  NdbEventOperationImpl* theBlobEventOp;
+  NdbRecAttr* theBlobEventPkRecAttr;
+  NdbRecAttr* theBlobEventDistRecAttr;
+  NdbRecAttr* theBlobEventPartRecAttr;
+  NdbRecAttr* theBlobEventDataRecAttr;
   const NdbTableImpl* theTable;
   const NdbTableImpl* theAccessTable;
   const NdbTableImpl* theBlobTable;
@@ -263,6 +291,8 @@
   Buf theHeadInlineBuf;
   Buf theHeadInlineCopyBuf;     // for writeTuple
   Buf thePartBuf;
+  Buf theBlobEventDataBuf;
+  Uint32 thePartNumber;         // for event
   Head* theHead;
   char* theInlineData;
   NdbRecAttr* theHeadInlineRecAttr;
@@ -306,6 +336,8 @@
   int readDataPrivate(char* buf, Uint32& bytes);
   int writeDataPrivate(const char* buf, Uint32 bytes);
   int readParts(char* buf, Uint32 part, Uint32 count);
+  int readTableParts(char* buf, Uint32 part, Uint32 count);
+  int readEventParts(char* buf, Uint32 part, Uint32 count);
   int insertParts(const char* buf, Uint32 part, Uint32 count);
   int updateParts(const char* buf, Uint32 part, Uint32 count);
   int deleteParts(Uint32 part, Uint32 count);
@@ -317,19 +349,23 @@
   int invokeActiveHook();
   // blob handle maintenance
   int atPrepare(NdbTransaction* aCon, NdbOperation* anOp, const NdbColumnImpl* aColumn);
+  int atPrepare(NdbEventOperationImpl* anOp, NdbEventOperationImpl* aBlobOp, const NdbColumnImpl* aColumn, int version);
+  int prepareColumn();
   int preExecute(NdbTransaction::ExecType anExecType, bool& batch);
   int postExecute(NdbTransaction::ExecType anExecType);
   int preCommit();
   int atNextResult();
+  int atNextEvent();
   // errors
   void setErrorCode(int anErrorCode, bool invalidFlag = true);
   void setErrorCode(NdbOperation* anOp, bool invalidFlag = true);
   void setErrorCode(NdbTransaction* aCon, bool invalidFlag = true);
+  void setErrorCode(NdbEventOperationImpl* anOp, bool invalidFlag = true);
 #ifdef VM_TRACE
   int getOperationType() const;
   friend class NdbOut& operator<<(NdbOut&, const NdbBlob&);
 #endif
-
+  // list stuff
   void next(NdbBlob* obj) { theNext= obj;}
   NdbBlob* next() { return theNext;}
   friend struct Ndb_free_list_t<NdbBlob>;

--- 1.30/storage/ndb/src/ndbapi/NdbBlob.cpp	2006-01-17 08:37:33 +01:00
+++ 1.31/storage/ndb/src/ndbapi/NdbBlob.cpp	2006-01-19 14:00:03 +01:00
@@ -23,6 +23,7 @@
 #include <NdbBlob.hpp>
 #include "NdbBlobImpl.hpp"
 #include <NdbScanOperation.hpp>
+#include <NdbEventOperationImpl.hpp>
 
 /*
  * Reading index table directly (as a table) is faster but there are
@@ -147,6 +148,61 @@
   DBUG_VOID_RETURN;
 }
 
+int
+NdbBlob::getBlobEventName(char* bename, Ndb* anNdb, const char* eventName, const char* columnName)
+{
+  NdbEventImpl* e = anNdb->theDictionary->m_impl.getEvent(eventName);
+  if (e == NULL)
+    return -1;
+  NdbColumnImpl* c = e->m_tableImpl->getColumn(columnName);
+  if (c == NULL)
+    return -1;
+  getBlobEventName(bename, e, c);
+  return 0;
+}
+
+void
+NdbBlob::getBlobEventName(char* bename, const NdbEventImpl* e, const NdbColumnImpl* c)
+{
+  // XXX events should have object id
+  snprintf(bename, MAX_TAB_NAME_SIZE, "NDB$BLOBEVENT_%s_%d", e->m_name.c_str(), (int)c->m_column_no);
+}
+
+void
+NdbBlob::getBlobEvent(NdbEventImpl& be, const NdbEventImpl* e, const NdbColumnImpl* c)
+{
+  DBUG_ENTER("NdbBlob::getBlobEvent");
+  // blob table
+  assert(c->m_blobTable != NULL);
+  const NdbTableImpl& bt = *c->m_blobTable;
+  // blob event name
+  char bename[NdbBlobImpl::BlobTableNameSize];
+  getBlobEventName(bename, e, c);
+  be.setName(bename);
+  be.setTable(bt);
+  // simple assigments
+  be.mi_type = e->mi_type;
+  be.m_dur = e->m_dur;
+  be.m_mergeEvents = e->m_mergeEvents;
+  // report unchanged data
+  // not really needed now since UPD is DEL o INS and we subscribe to all
+  be.setReport(NdbDictionary::Event::ER_ALL);
+  // columns PK - DIST - PART - DATA
+  { const NdbColumnImpl* bc = bt.getColumn((Uint32)0);
+    be.addColumn(*bc);
+  }
+  { const NdbColumnImpl* bc = bt.getColumn((Uint32)1);
+    be.addColumn(*bc);
+  }
+  { const NdbColumnImpl* bc = bt.getColumn((Uint32)2);
+    be.addColumn(*bc);
+  }
+  { const NdbColumnImpl* bc = bt.getColumn((Uint32)3);
+    be.addColumn(*bc);
+  }
+  DBUG_VOID_RETURN;
+}
+
 // initialization
 
 NdbBlob::NdbBlob(Ndb*)
@@ -158,9 +214,16 @@
 NdbBlob::init()
 {
   theState = Idle;
+  theEventBlobVersion = -1;
   theNdb = NULL;
   theNdbCon = NULL;
   theNdbOp = NULL;
+  theEventOp = NULL;
+  theBlobEventOp = NULL;
+  theBlobEventPkRecAttr = NULL;
+  theBlobEventDistRecAttr = NULL;
+  theBlobEventPartRecAttr = NULL;
+  theBlobEventDataRecAttr = NULL;
   theTable = NULL;
   theAccessTable = NULL;
   theBlobTable = NULL;
@@ -439,7 +502,7 @@
   DBUG_ENTER("NdbBlob::getHeadFromRecAttr");
   assert(theHeadInlineRecAttr != NULL);
   theNullFlag = theHeadInlineRecAttr->isNULL();
-  assert(theNullFlag != -1);
+  assert(theEventBlobVersion >= 0 || theNullFlag != -1);
   theLength = ! theNullFlag ? theHead->length : 0;
   DBUG_VOID_RETURN;
 }
@@ -544,6 +607,18 @@
 // misc operations
 
 int
+NdbBlob::getDefined(int& isNull)
+{
+  DBUG_ENTER("NdbBlob::getDefined");
+  if (theState == Prepared && theSetFlag) {
+    isNull = (theSetBuf == NULL);
+    DBUG_RETURN(0);
+  }
+  isNull = theNullFlag;
+  DBUG_RETURN(0);
+}
+
+int
 NdbBlob::getNull(bool& isNull)
 {
   DBUG_ENTER("NdbBlob::getNull");
@@ -887,6 +962,18 @@
 {
   DBUG_ENTER("NdbBlob::readParts");
   DBUG_PRINT("info", ("part=%u count=%u", part, count));
+  int ret;
+  if (theEventBlobVersion == -1)
+    ret = readTableParts(buf, part, count);
+  else
+    ret = readEventParts(buf, part, count);
+  DBUG_RETURN(ret);
+}
+
+int
+NdbBlob::readTableParts(char* buf, Uint32 part, Uint32 count)
+{
+  DBUG_ENTER("NdbBlob::readTableParts");
   Uint32 n = 0;
   while (n < count) {
     NdbOperation* tOp = theNdbCon->getNdbOperation(theBlobTable);
@@ -907,6 +994,18 @@
 }
 
 int
+NdbBlob::readEventParts(char* buf, Uint32 part, Uint32 count)
+{
+  DBUG_ENTER("NdbBlob::readEventParts");
+  int ret = theEventOp->readBlobParts(buf, this, part, count);
+  if (ret != 0) {
+    setErrorCode(theEventOp);
+    DBUG_RETURN(-1);
+  }
+  DBUG_RETURN(0);
+}
+
+int
 NdbBlob::insertParts(const char* buf, Uint32 part, Uint32 count)
 {
   DBUG_ENTER("NdbBlob::insertParts");
@@ -1094,48 +1193,12 @@
   theTable = anOp->m_currentTable;
   theAccessTable = anOp->m_accessTable;
   theColumn = aColumn;
-  NdbDictionary::Column::Type partType = NdbDictionary::Column::Undefined;
-  switch (theColumn->getType()) {
-  case NdbDictionary::Column::Blob:
-    partType = NdbDictionary::Column::Binary;
-    theFillChar = 0x0;
-    break;
-  case NdbDictionary::Column::Text:
-    partType = NdbDictionary::Column::Char;
-    theFillChar = 0x20;
-    break;
-  default:
-    setErrorCode(NdbBlobImpl::ErrUsage);
+  // prepare blob column and table
+  if (prepareColumn() == -1)
     DBUG_RETURN(-1);
-  }
-  // sizes
-  theInlineSize = theColumn->getInlineSize();
-  thePartSize = theColumn->getPartSize();
-  theStripeSize = theColumn->getStripeSize();
-  // sanity check
-  assert((NDB_BLOB_HEAD_SIZE << 2) == sizeof(Head));
-  assert(theColumn->m_attrSize * theColumn->m_arraySize == sizeof(Head) + theInlineSize);
-  if (thePartSize > 0) {
-    const NdbDictionary::Table* bt = NULL;
-    const NdbDictionary::Column* bc = NULL;
-    if (theStripeSize == 0 ||
-        (bt = theColumn->getBlobTable()) == NULL ||
-        (bc = bt->getColumn("DATA")) == NULL ||
-        bc->getType() != partType ||
-        bc->getLength() != (int)thePartSize) {
-      setErrorCode(NdbBlobImpl::ErrTable);
-      DBUG_RETURN(-1);
-    }
-    theBlobTable = &NdbTableImpl::getImpl(*bt);
-  }
-  // buffers
-  theKeyBuf.alloc(theTable->m_keyLenInWords << 2);
+  // extra buffers
   theAccessKeyBuf.alloc(theAccessTable->m_keyLenInWords << 2);
-  theHeadInlineBuf.alloc(sizeof(Head) + theInlineSize);
   theHeadInlineCopyBuf.alloc(sizeof(Head) + theInlineSize);
-  thePartBuf.alloc(thePartSize);
-  theHead = (Head*)theHeadInlineBuf.data;
-  theInlineData = theHeadInlineBuf.data + sizeof(Head);
   // handle different operation types
   bool supportedOp = false;
   if (isKeyOp()) {
@@ -1189,6 +1252,99 @@
   DBUG_RETURN(0);
 }
 
+int
+NdbBlob::atPrepare(NdbEventOperationImpl* anOp, NdbEventOperationImpl* aBlobOp, const NdbColumnImpl* aColumn, int version)
+{
+  DBUG_ENTER("NdbBlob::atPrepare [event]");
+  DBUG_PRINT("info", ("this=%p op=%p", this, anOp));
+  assert(theState == Idle);
+  assert(version == 0 || version == 1);
+  theEventBlobVersion = version;
+  // ndb api stuff
+  theNdb = anOp->m_ndb;
+  theEventOp = anOp;
+  theBlobEventOp = aBlobOp;
+  theTable = anOp->m_eventImpl->m_tableImpl;
+  theColumn = aColumn;
+  // prepare blob column and table
+  if (prepareColumn() == -1)
+    DBUG_RETURN(-1);
+  // extra buffers
+  theBlobEventDataBuf.alloc(thePartSize);
+  // prepare receive of head+inline
+  theHeadInlineRecAttr = theEventOp->getValue(aColumn, theHeadInlineBuf.data, version);
+  if (theHeadInlineRecAttr == NULL) {
+    setErrorCode(theEventOp);
+    DBUG_RETURN(-1);
+  }
+  // prepare receive of blob part
+  if ((theBlobEventPkRecAttr =
+         theBlobEventOp->getValue(theBlobTable->getColumn((Uint32)0),
+                                  theKeyBuf.data, version)) == NULL ||
+      (theBlobEventDistRecAttr =
+         theBlobEventOp->getValue(theBlobTable->getColumn((Uint32)1),
+                                  (char*)0, version)) == NULL ||
+      (theBlobEventPartRecAttr =
+         theBlobEventOp->getValue(theBlobTable->getColumn((Uint32)2),
+                                  (char*)&thePartNumber, version)) == NULL ||
+      (theBlobEventDataRecAttr =
+         theBlobEventOp->getValue(theBlobTable->getColumn((Uint32)3),
+                                  theBlobEventDataBuf.data, version)) == NULL) {
+    setErrorCode(theBlobEventOp);
+    DBUG_RETURN(-1);
+  }
+  setState(Prepared);
+  DBUG_RETURN(0);
+}
+
+int
+NdbBlob::prepareColumn()
+{
+  DBUG_ENTER("prepareColumn");
+  NdbDictionary::Column::Type partType = NdbDictionary::Column::Undefined;
+  switch (theColumn->getType()) {
+  case NdbDictionary::Column::Blob:
+    partType = NdbDictionary::Column::Binary;
+    theFillChar = 0x0;
+    break;
+  case NdbDictionary::Column::Text:
+    partType = NdbDictionary::Column::Char;
+    theFillChar = 0x20;
+    break;
+  default:
+    setErrorCode(NdbBlobImpl::ErrUsage);
+    DBUG_RETURN(-1);
+  }
+  // sizes
+  theInlineSize = theColumn->getInlineSize();
+  thePartSize = theColumn->getPartSize();
+  theStripeSize = theColumn->getStripeSize();
+  // sanity check
+  assert((NDB_BLOB_HEAD_SIZE << 2) == sizeof(Head));
+  assert(theColumn->m_attrSize * theColumn->m_arraySize == sizeof(Head) + theInlineSize);
+  if (thePartSize > 0) {
+    const NdbDictionary::Table* bt = NULL;
+    const NdbDictionary::Column* bc = NULL;
+    if (theStripeSize == 0 ||
+        (bt = theColumn->getBlobTable()) == NULL ||
+        (bc = bt->getColumn("DATA")) == NULL ||
+        bc->getType() != partType ||
+        bc->getLength() != (int)thePartSize) {
+      setErrorCode(NdbBlobImpl::ErrTable);
+      DBUG_RETURN(-1);
+    }
+    // blob table
+    theBlobTable = &NdbTableImpl::getImpl(*bt);
+  }
+  // these buffers are always used
+  theKeyBuf.alloc(theTable->m_keyLenInWords << 2);
+  theHeadInlineBuf.alloc(sizeof(Head) + theInlineSize);
+  theHead = (Head*)theHeadInlineBuf.data;
+  theInlineData = theHeadInlineBuf.data + sizeof(Head);
+  thePartBuf.alloc(thePartSize);
+  DBUG_RETURN(0);
+}
+
 /*
  * Before execute of prepared operation.  May add new operations before
  * this one.  May ask that this operation and all before it (a "batch")
@@ -1537,6 +1693,26 @@
   DBUG_RETURN(0);
 }
 
+/*
+ * After next event on main table.
+ */
+int
+NdbBlob::atNextEvent()
+{
+  DBUG_ENTER("NdbBlob::atNextEvent");
+  DBUG_PRINT("info", ("this=%p op=%p blob op=%p version=%d", this, theEventOp, theBlobEventOp, theEventBlobVersion));
+  if (theState == Invalid)
+    DBUG_RETURN(-1);
+  assert(theEventBlobVersion >= 0);
+  getHeadFromRecAttr();
+  if (theNullFlag == -1) // value not defined
+    DBUG_RETURN(0);
+  if (setPos(0) == -1)
+    DBUG_RETURN(-1);
+  setState(Active);
+  DBUG_RETURN(0);
+}
+
 // misc
 
 const NdbDictionary::Column*
@@ -1583,6 +1759,17 @@
   if (theNdbCon != NULL && (code = theNdbCon->theError.code) != 0)
     ;
   else if ((code = theNdb->theError.code) != 0)
+    ;
+  else
+    code = NdbBlobImpl::ErrUnknown;
+  setErrorCode(code, invalidFlag);
+}
+
+void
+NdbBlob::setErrorCode(NdbEventOperationImpl* anOp, bool invalidFlag)
+{
+  int code = 0;
+  if ((code = anOp->m_error.code) != 0)
     ;
   else
     code = NdbBlobImpl::ErrUnknown;

--- 1.8/storage/ndb/test/ndbapi/test_event_merge.cpp	2006-01-13 18:01:27 +01:00
+++ 1.9/storage/ndb/test/ndbapi/test_event_merge.cpp	2006-01-19 14:00:03 +01:00
@@ -21,14 +21,7 @@
 #include <my_sys.h>
 #include <ndb_version.h>
 
-#if NDB_VERSION_D < MAKE_VERSION(5, 1, 0)
-#define version50
-#else
-#undef version50
-#endif
-
-// until rbr in 5.1
-#undef version51rbr
+// version >= 5.1 required
 
 #if !defined(min) || !defined(max)
 #define min(x, y) ((x) < (y) ? (x) : (y))
@@ -57,11 +50,11 @@
  * There are other -no-* options, each added to isolate a specific bug.
  *
  * There are 5 ways (ignoring NUL operand) to compose 2 ops:
- *                      5.0 bugs        5.1 bugs
+ *
  * INS o DEL = NUL
- * INS o UPD = INS                      type=INS
- * DEL o INS = UPD      type=INS        type=INS
- * UPD o DEL = DEL      no event
+ * INS o UPD = INS
+ * DEL o INS = UPD
+ * UPD o DEL = DEL
  * UPD o UPD = UPD
  */
 
@@ -73,17 +66,19 @@
   uint maxpk;
   my_bool no_blobs;
   my_bool no_implicit_nulls;
+  my_bool no_missing_update;
   my_bool no_multiops;
   my_bool no_nulls;
   my_bool one_blob;
   const char* opstring;
   uint seed;
   my_bool separate_events;
+  uint tweak; // whatever's useful
   my_bool use_table;
 };
 
 static Opts g_opts;
-static const uint g_maxpk = 100;
+static const uint g_maxpk = 1000;
 static const uint g_maxopstringpart = 100;
 static const char* g_opstringpart[g_maxopstringpart];
 static uint g_opstringparts = 0;
@@ -712,6 +707,20 @@
     if (! c.nullable) {
       chkrc(ind0 <= 0 && ind1 <= 0);
     }
+    if (c.isblob()) {
+      // blob values must be from allowed chars
+      int j;
+      for (j = 0; j < 2; j++) {
+        const Data& d = op->data[j];
+        if (d.ind[i] == 0) {
+          const Data::Txt& t = *d.ptr[i].txt;
+          int k;
+          for (k = 0; k < t.len; k++) {
+            chkrc(strchr(g_charval, t.val[k]) != 0);
+          }
+        }
+      }
+    }
   }
   return 0;
 }
@@ -849,9 +858,8 @@
     const Col& c = g_col[i];
     evt.addEventColumn(c.name);
   }
-#ifdef version51rbr
+  evt.setReport(NdbDictionary::Event::ER_UPDATED);
   evt.mergeEvents(! g_opts.separate_events);
-#endif
   if (g_dic->getEvent(evt.getName()) != 0)
     chkdb(g_dic->dropEvent(evt.getName()) == 0);
   chkdb(g_dic->createEvent(evt) == 0);
@@ -875,14 +883,8 @@
 createeventop()
 {
   ll1("createeventop");
-#ifdef version50
-  uint bsz = 10 * g_opts.maxops;
-  chkdb((g_evt_op = g_ndb->createEventOperation(g_evt->getName(), bsz)) != 0);
-#else
   chkdb((g_evt_op = g_ndb->createEventOperation(g_evt->getName())) != 0);
-  // available in gci merge changeset
   g_evt_op->mergeEvents(! g_opts.separate_events); // not yet inherited
-#endif
   uint i;
   for (i = 0; i < ncol(); i++) {
     const Col& c = g_col[i];
@@ -891,10 +893,8 @@
       chkdb((g_ev_ra[0][i] = g_evt_op->getValue(c.name, (char*)d[0].ptr[i].v)) != 0);
       chkdb((g_ev_ra[1][i] = g_evt_op->getPreValue(c.name, (char*)d[1].ptr[i].v)) != 0);
     } else {
-#ifdef version51rbr
       chkdb((g_ev_bh[0][i] = g_evt_op->getBlobHandle(c.name)) != 0);
       chkdb((g_ev_bh[1][i] = g_evt_op->getPreBlobHandle(c.name)) != 0);
-#endif
     }
   }
   return 0;
@@ -909,10 +909,10 @@
   return 0;
 }
 
+// wait for event to be installed and for GCIs to pass
 static int
-waitgci() // wait for event to be installed and for at least 1 GCI to pass
+waitgci(uint ngci)
 {
-  const uint ngci = 3;
   ll1("waitgci " << ngci);
   Uint32 gci[2];
   uint i = 0;
@@ -976,7 +976,6 @@
       if (! c.isblob()) {
         ind = ra[i]->isNULL();
       } else {
-#ifdef version51rbr
         int ret;
         ret = bh[i]->getDefined(ind);
         assert(ret == 0);
@@ -992,8 +991,10 @@
           Uint32 len = t.len;
           ret = bh[i]->readData(t.val, len);
           assert(ret == 0 && len == t.len);
+          // to see the data, have to execute...
+          chkdb(g_con->execute(NoCommit) == 0);
+          assert(memchr(t.val, 'X', t.len) == 0);
         }
-#endif
       }
       assert(ind >= 0);
       d0.ind[i] = ind;
@@ -1042,7 +1043,7 @@
   } else if (t == Op::INS && ! g_opts.no_implicit_nulls && c.nullable && urandom(10, 100)) {
     d.noop |= (1 << i);
     d.ind[i] = 1; // implicit NULL value is known
-  } else if (t == Op::UPD && urandom(10, 100)) {
+  } else if (t == Op::UPD && ! g_opts.no_missing_update && urandom(10, 100)) {
     d.noop |= (1 << i);
     d.ind[i] = -1; // fixed up in caller
   } else if (! g_opts.no_nulls && c.nullable && urandom(10, 100)) {
@@ -1060,6 +1061,8 @@
       {
         char* p = d.ptr[i].ch;
         uint u = urandom(g_charlen);
+        if (u == 0)
+          u = urandom(g_charlen); // 2x bias for non-empty
         uint j;
         for (j = 0; j < g_charlen; j++) {
           uint v = urandom(strlen(g_charval));
@@ -1070,10 +1073,19 @@
     case NdbDictionary::Column::Text:
       {
         Data::Txt& t = *d.ptr[i].txt;
+        delete [] t.val;
+        t.val = 0;
+        if (g_opts.tweak & 1) {
+          uint u = 256 + 2000;
+          uint v = (g_opts.tweak & 2) ? 0 : urandom(strlen(g_charval));
+          t.val = new char [u];
+          t.len = u;
+          memset(t.val, g_charval[v], u);
+          break;
+        }
         uint u = urandom(g_maxblobsize);
         u = urandom(u); // 4x bias for smaller blobs
         u = urandom(u);
-        delete [] t.val;
         t.val = new char [u];
         t.len = u;
         uint j = 0;
@@ -1134,9 +1146,15 @@
 {
   ll1("makeops");
   Uint32 pk1 = 0;
-  while (g_usedops < g_opts.maxops && pk1 < g_opts.maxpk) {
-    if (g_opts.opstring == 0)
+  while (1) {
+    if (g_opts.opstring == 0) {
+      if (g_usedops >= g_opts.maxops) // use up ops
+        break;
       pk1 = urandom(g_opts.maxpk);
+    } else {
+      if (pk1 >= g_opts.maxpk) // use up pks
+        break;
+    }
     ll2("makeops: pk1=" << pk1);
     // total op on the pk so far
     // optype either NUL=initial/deleted or INS=created
@@ -1465,7 +1483,7 @@
           }
           if (tmpok) {
             ok = gci_op->match = true;
-            ll2("===: match");
+            ll2("match");
           }
         }
         pos++;
@@ -1555,7 +1573,6 @@
         NdbRecAttr* ra = g_ev_ra[j][i];
         ind = ra->isNULL();
       } else {
-#ifdef version51rbr
         NdbBlob* bh = g_ev_bh[j][i];
         ret = bh->getDefined(ind);
         assert(ret == 0);
@@ -1572,7 +1589,6 @@
           ret = bh->readData(t.val, len);
           assert(ret == 0 && len == t.len);
         }
-#endif
       }
       d[j].ind[i] = ind;
     }
@@ -1585,38 +1601,22 @@
   ll1("runevents");
   uint mspoll = 1000;
   uint npoll = 6; // strangely long delay
+  ll1("poll " << npoll);
   while (npoll != 0) {
     npoll--;
     int ret;
-    ll1("poll");
     ret = g_ndb->pollEvents(mspoll);
     if (ret <= 0)
       continue;
     while (1) {
       g_rec_ev->init(Op::EV);
-#ifdef version50
-      int overrun = g_opts.maxops;
-      chkdb((ret = g_evt_op->next(&overrun)) >= 0);
-      chkrc(overrun == 0);
-      if (ret == 0)
-        break;
-#else
       NdbEventOperation* tmp_op = g_ndb->nextEvent();
       if (tmp_op == 0)
         break;
       reqrc(g_evt_op == tmp_op);
-#endif
       chkrc(seteventtype(g_rec_ev, g_evt_op->getEventType()) == 0);
       geteventdata();
       g_rec_ev->gci = g_evt_op->getGCI();
-#ifdef version50
-      // fix to match 5.1
-      if (g_rec_ev->type == Op::UPD) {
-        Uint32 pk1 = g_rec_ev->data[0].pk1;
-        makedata(getcol("pk1"), g_rec_ev->data[1], pk1, Op::UPD);
-        makedata(getcol("pk2"), g_rec_ev->data[1], pk1, Op::UPD);
-      }
-#endif
       // get indicators and blob value
       ll2("runevents: EVT: " << *g_rec_ev);
       // check basic sanity
@@ -1667,7 +1667,7 @@
   chkrc(createtable() == 0);
   chkrc(createevent() == 0);
   for (g_loop = 0; g_opts.loop == 0 || g_loop < g_opts.loop; g_loop++) {
-    ll0("loop " << g_loop);
+    ll0("=== loop " << g_loop << " ===");
     setseed(g_loop);
     resetmem();
     chkrc(scantab() == 0); // alternative: save tot_op for loop > 0
@@ -1675,7 +1675,7 @@
     g_rec_ev = getop(Op::EV);
     chkrc(createeventop() == 0);
     chkdb(g_evt_op->execute() == 0);
-    chkrc(waitgci() == 0);
+    chkrc(waitgci(3) == 0);
     chkrc(runops() == 0);
     if (! g_opts.separate_events)
       chkrc(mergeops() == 0);
@@ -1685,6 +1685,8 @@
     chkrc(matchevents() == 0);
     chkrc(matchops() == 0);
     chkrc(dropeventop() == 0);
+    // time erases everything..
+    chkrc(waitgci(1) == 0);
   }
   chkrc(dropevent() == 0);
   chkrc(droptable() == 0);
@@ -1703,41 +1705,48 @@
   { "loglevel", 1002, "Logging level in this program (default 0)",
     (gptr*)&g_opts.loglevel, (gptr*)&g_opts.loglevel, 0,
     GET_INT, REQUIRED_ARG, 0, 0, 0, 0, 0, 0 },
-  { "loop", 1003, "Number of test loops (default 2, 0=forever)",
+  { "loop", 1003, "Number of test loops (default 3, 0=forever)",
     (gptr*)&g_opts.loop, (gptr*)&g_opts.loop, 0,
-    GET_INT, REQUIRED_ARG, 2, 0, 0, 0, 0, 0 },
+    GET_INT, REQUIRED_ARG, 3, 0, 0, 0, 0, 0 },
   { "maxops", 1004, "Approx number of PK operations (default 1000)",
     (gptr*)&g_opts.maxops, (gptr*)&g_opts.maxops, 0,
     GET_UINT, REQUIRED_ARG, 1000, 0, 0, 0, 0, 0 },
   { "maxpk", 1005, "Number of different PK values (default 10)",
     (gptr*)&g_opts.maxpk, (gptr*)&g_opts.maxpk, 0,
-    GET_UINT, REQUIRED_ARG, 10, 1, g_maxpk, 0, 0, 0 },
+    GET_UINT, REQUIRED_ARG, 10, 0, 0, 0, 0, 0 },
   { "no-blobs", 1006, "Omit blob attributes (5.0: true)",
     (gptr*)&g_opts.no_blobs, (gptr*)&g_opts.no_blobs, 0,
     GET_BOOL, NO_ARG, 0, 0, 0, 0, 0, 0 },
-  { "no-implicit-nulls", 1007, "Insert must include NULL values explicitly",
+  { "no-implicit-nulls", 1007, "Insert must include all attrs"
+                               " i.e. no implicit NULLs",
     (gptr*)&g_opts.no_implicit_nulls, (gptr*)&g_opts.no_implicit_nulls, 0,
     GET_BOOL, NO_ARG, 0, 0, 0, 0, 0, 0 },
-  { "no-multiops", 1008, "Allow only 1 operation per commit",
+  { "no-missing-update", 1008, "Update must include all non-PK attrs",
+    (gptr*)&g_opts.no_missing_update, (gptr*)&g_opts.no_missing_update, 0,
+    GET_BOOL, NO_ARG, 0, 0, 0, 0, 0, 0 },
+  { "no-multiops", 1009, "Allow only 1 operation per commit",
     (gptr*)&g_opts.no_multiops, (gptr*)&g_opts.no_multiops, 0,
     GET_BOOL, NO_ARG, 0, 0, 0, 0, 0, 0 },
-  { "no-nulls", 1009, "Create no NULL values",
+  { "no-nulls", 1010, "Create no NULL values",
     (gptr*)&g_opts.no_nulls, (gptr*)&g_opts.no_nulls, 0,
     GET_BOOL, NO_ARG, 0, 0, 0, 0, 0, 0 },
-  { "one-blob", 1010, "Only one blob attribute (defautt 2)",
+  { "one-blob", 1011, "Only one blob attribute (default 2)",
     (gptr*)&g_opts.one_blob, (gptr*)&g_opts.one_blob, 0,
     GET_BOOL, NO_ARG, 0, 0, 0, 0, 0, 0 },
-  { "opstring", 1011, "Operations to run e.g. idiucdc (c is commit) or"
+  { "opstring", 1012, "Operations to run e.g. idiucdc (c is commit) or"
                       " iuuc:uudc (the : separates loops)",
     (gptr*)&g_opts.opstring, (gptr*)&g_opts.opstring, 0,
     GET_STR_ALLOC, REQUIRED_ARG, 0, 0, 0, 0, 0, 0 },
-  { "seed", 1012, "Random seed (0=loop number, default -1=random)",
+  { "seed", 1013, "Random seed (0=loop number, default -1=random)",
     (gptr*)&g_opts.seed, (gptr*)&g_opts.seed, 0,
     GET_INT, REQUIRED_ARG, -1, 0, 0, 0, 0, 0 },
-  { "separate-events", 1013, "Do not combine events per GCI (5.0: true)",
+  { "separate-events", 1014, "Do not combine events per GCI (5.0: true)",
     (gptr*)&g_opts.separate_events, (gptr*)&g_opts.separate_events, 0,
     GET_BOOL, NO_ARG, 0, 0, 0, 0, 0, 0 },
-  { "use-table", 1014, "Use existing table 'tem1'",
+  { "tweak", 1015, "Whatever the source says",
+    (gptr*)&g_opts.tweak, (gptr*)&g_opts.tweak, 0,
+    GET_UINT, REQUIRED_ARG, 0, 0, 0, 0, 0, 0 },
+  { "use-table", 1016, "Use existing table 'tem1'",
     (gptr*)&g_opts.use_table, (gptr*)&g_opts.use_table, 0,
     GET_BOOL, NO_ARG, 0, 0, 0, 0, 0, 0 },
   { 0, 0, 0,
@@ -1754,9 +1763,10 @@
 static int
 checkopts()
 {
-#ifdef version50
-  g_opts.separate_events = true;
-#endif
+  if (g_opts.maxpk > g_maxpk) {
+    ll0("setting maxpk to " << g_maxpk);
+    g_opts.maxpk = g_maxpk;
+  }
   if (g_opts.separate_events) {
     g_opts.no_blobs = true;
   }

--- 1.2/storage/ndb/ndbapi-examples/ndbapi_event/Makefile	2005-12-05 15:19:27 +01:00
+++ 1.3/storage/ndb/ndbapi-examples/ndbapi_event/Makefile	2006-01-19 14:00:03 +01:00
@@ -4,7 +4,7 @@
 CXX = g++ -g
 CFLAGS = -c -Wall -fno-rtti -fno-exceptions
 CXXFLAGS = 
-DEBUG = 
+DEBUG =# -DVM_TRACE
 LFLAGS = -Wall
 TOP_SRCDIR = ../../../..
 INCLUDE_DIR = $(TOP_SRCDIR)/storage/ndb/include
@@ -16,8 +16,8 @@
 $(TARGET): $(OBJS)
 	$(CXX) $(CXXFLAGS) $(LFLAGS) $(LIB_DIR) $(OBJS) -lndbclient -lmysqlclient_r -lmysys -lmystrings -lz $(SYS_LIB) -o $(TARGET)
 
-$(TARGET).o: $(SRCS)
-	$(CXX) $(CFLAGS) -I$(INCLUDE_DIR) -I$(INCLUDE_DIR)/ndbapi -I$(TOP_SRCDIR)/include $(SRCS)
+$(TARGET).o: $(SRCS) Makefile
+	$(CXX) $(CFLAGS) $(DEBUG) -I$(INCLUDE_DIR) -I$(INCLUDE_DIR)/ndbapi -I$(TOP_SRCDIR)/include $(SRCS)
 
 clean:
 	rm -f *.o $(TARGET)

--- 1.3/storage/ndb/ndbapi-examples/ndbapi_event/ndbapi_event.cpp	2006-01-13 18:01:27 +01:00
+++ 1.4/storage/ndb/ndbapi-examples/ndbapi_event/ndbapi_event.cpp	2006-01-19 14:00:03 +01:00
@@ -54,26 +54,32 @@
 #include <stdio.h>
 #include <iostream>
 #include <unistd.h>
+#ifdef VM_TRACE
+#include <my_global.h>
+#endif
+#ifndef assert
+#include <assert.h>
+#endif
 
 
 /**
- *
- * Assume that there is a table t0 which is being updated by 
+ * Assume that there is a table which is being updated by 
  * another process (e.g. flexBench -l 0 -stdtables).
- * We want to monitor what happens with columns c0,c1,c2,c3.
+ * We want to monitor what happens with column values.
  *
- * or together with the mysql client;
+ * Or using the mysql client:
  *
  * shell> mysql -u root
  * mysql> create database TEST_DB;
  * mysql> use TEST_DB;
- * mysql> create table t0 (c0 int, c1 int, c2 char(4), c3 char(4),
+ * mysql> create table t0
+ *        (c0 int, c1 int, c2 char(4), c3 char(4), c4 text,
  *        primary key(c0, c2)) engine ndb charset latin1;
  *
  * In another window start ndbapi_event, wait until properly started
- *
-   insert into t0 values (1, 2, 'a', 'b');
-   insert into t0 values (3, 4, 'c', 'd');
+ 
+   insert into t0 values (1, 2, 'a', 'b', null);
+   insert into t0 values (3, 4, 'c', 'd', null);
    update t0 set c3 = 'e' where c0 = 1 and c2 = 'a'; -- use pk
    update t0 set c3 = 'f'; -- use scan
    update t0 set c3 = 'F'; -- use scan update to 'same'
@@ -81,7 +87,18 @@
    update t0 set c2 = 'G' where c0 = 1; -- update pk part to 'same'
    update t0 set c0 = 5, c2 = 'H' where c0 = 3; -- update full PK
    delete from t0;
- *
+
+   insert ...; update ...; -- see events w/ same pk merged (if -m option)
+   delete ...; insert ...; -- there are 5 combinations ID IU DI UD UU
+   update ...; update ...;
+
+   -- text requires -m flag
+   set @a = repeat('a',256); -- inline size
+   set @b = repeat('b',2000); -- part size
+   set @c = repeat('c',2000*30); -- 30 parts
+
+   -- update the text field using combinations of @a, @b, @c ...
+ 
  * you should see the data popping up in the example window
  *
  */
@@ -95,12 +112,18 @@
 		  const char *eventName,
 		  const char *eventTableName,
 		  const char **eventColumnName,
-		  const int noEventColumnName);
+		  const int noEventColumnName,
+                  bool merge_events);
 
 int main(int argc, char** argv)
 {
   ndb_init();
-  bool merge_events = argc > 1 && strcmp(argv[1], "-m") == 0;
+  bool merge_events = argc > 1 && strchr(argv[1], 'm') != 0;
+#ifdef VM_TRACE
+  bool dbug = argc > 1 && strchr(argv[1], 'd') != 0;
+  if (dbug) DBUG_PUSH("d:t:");
+  if (dbug) putenv("API_SIGNAL_LOG=-");
+#endif
 
   Ndb_cluster_connection *cluster_connection=
     new Ndb_cluster_connection(); // Object representing the cluster
@@ -134,12 +157,13 @@
 
   const char *eventName= "CHNG_IN_t0";
   const char *eventTableName= "t0";
-  const int noEventColumnName= 4;
+  const int noEventColumnName= 5;
   const char *eventColumnName[noEventColumnName]=
     {"c0",
      "c1",
      "c2",
-     "c3"
+     "c3",
+     "c4"
     };
   
   // Create events
@@ -147,9 +171,14 @@
 		eventName,
 		eventTableName,
 		eventColumnName,
-		noEventColumnName);
+		noEventColumnName,
+                merge_events);
+
+  // Normal values and blobs are unfortunately handled differently..
+  typedef union { NdbRecAttr* ra; NdbBlob* bh; } RA_BH;
 
-  int j= 0;
+  int i, j, k, l;
+  j = 0;
   while (j < 99) {
 
     // Start "transaction" for handling events
@@ -160,12 +189,17 @@
     op->mergeEvents(merge_events);
 
     printf("get values\n");
-    NdbRecAttr* recAttr[noEventColumnName];
-    NdbRecAttr* recAttrPre[noEventColumnName];
+    RA_BH recAttr[noEventColumnName];
+    RA_BH recAttrPre[noEventColumnName];
     // primary keys should always be a part of the result
-    for (int i = 0; i < noEventColumnName; i++) {
-      recAttr[i]    = op->getValue(eventColumnName[i]);
-      recAttrPre[i] = op->getPreValue(eventColumnName[i]);
+    for (i = 0; i < noEventColumnName; i++) {
+      if (i < 4) {
+        recAttr[i].ra    = op->getValue(eventColumnName[i]);
+        recAttrPre[i].ra = op->getPreValue(eventColumnName[i]);
+      } else if (merge_events) {
+        recAttr[i].bh    = op->getBlobHandle(eventColumnName[i]);
+        recAttrPre[i].bh = op->getPreBlobHandle(eventColumnName[i]);
+      }
     }
 
     // set up the callbacks
@@ -174,13 +208,16 @@
     if (op->execute())
       APIERROR(op->getNdbError());
 
-    int i= 0;
-    while(i < 40) {
+    NdbEventOperation* the_op = op;
+
+    i= 0;
+    while (i < 40) {
       // printf("now waiting for event...\n");
-      int r= myNdb->pollEvents(1000); // wait for event or 1000 ms
+      int r = myNdb->pollEvents(1000); // wait for event or 1000 ms
       if (r > 0) {
 	// printf("got data! %d\n", r);
 	while ((op= myNdb->nextEvent())) {
+          assert(the_op == op);
 	  i++;
 	  switch (op->getEventType()) {
 	  case NdbDictionary::Event::TE_INSERT:
@@ -195,40 +232,66 @@
 	  default:
 	    abort(); // should not happen
 	  }
-          printf(" gci=%d\n", op->getGCI());
-          printf("post:  ");
-	  for (int i = 0; i < noEventColumnName; i++) {
-	    if (recAttr[i]->isNULL() >= 0) { // we have a value
-	      if (recAttr[i]->isNULL() == 0) { // we have a non-null value
-                if (i < 2)
-                  printf("%-5u", recAttr[i]->u_32_value());
-                else
-                  printf("%-5.4s", recAttr[i]->aRef());
-              } else                           // we have a null value
-		printf("%-5s", "NULL");
-	    } else
-              printf("%-5s", "-");
+          printf(" gci=%d\n", (int)op->getGCI());
+          for (k = 0; k <= 1; k++) {
+            printf(k == 0 ? "post: " : "pre : ");
+            for (l = 0; l < noEventColumnName; l++) {
+              if (l < 4) {
+                NdbRecAttr* ra = k == 0 ? recAttr[l].ra : recAttrPre[l].ra;
+                if (ra->isNULL() >= 0) { // we have a value
+                  if (ra->isNULL() == 0) { // we have a non-null value
+                    if (l < 2)
+                      printf("%-5u", ra->u_32_value());
+                    else
+                      printf("%-5.4s", ra->aRef());
+                  } else
+                    printf("%-5s", "NULL");
+                } else
+                  printf("%-5s", "-"); // no value
+              } else if (merge_events) {
+                int isNull;
+                NdbBlob* bh = k == 0 ? recAttr[l].bh : recAttrPre[l].bh;
+                bh->getDefined(isNull);
+                if (isNull >= 0) { // we have a value
+                  if (! isNull) { // we have a non-null value
+                    Uint64 length = 0;
+                    bh->getLength(length);
+                    // read into buffer
+                    unsigned char* buf = new unsigned char [length];
+                    memset(buf, 'X', length);
+                    Uint32 n = length;
+                    bh->readData(buf, n); // n is in/out
+                    assert(n == length);
+                    // pretty-print
+                    bool first = true;
+                    Uint32 i = 0;
+                    while (i < n) {
+                      unsigned char c = buf[i++];
+                      Uint32 m = 1;
+                      while (i < n && buf[i] == c)
+                        i++, m++;
+                      if (! first)
+                        printf("+");
+                      printf("%u%c", m, c);
+                      first = false;
+                    }
+                    printf("[%u]", n);
+                    delete [] buf;
+                  } else
+                    printf("%-5s", "NULL");
+                } else
+                  printf("%-5s", "-"); // no value
+              }
+            }
+            printf("\n");
           }
-          printf("\npre :  ");
-	  for (int i = 0; i < noEventColumnName; i++) {
-	    if (recAttrPre[i]->isNULL() >= 0) { // we have a value
-	      if (recAttrPre[i]->isNULL() == 0) { // we have a non-null value
-                if (i < 2)
-                  printf("%-5u", recAttrPre[i]->u_32_value());
-                else
-                  printf("%-5.4s", recAttrPre[i]->aRef());
-              } else                              // we have a null value
-		printf("%-5s", "NULL");
-	    } else
-              printf("%-5s", "-");
-	  }
-          printf("\n");
 	}
       } else
 	;//printf("timed out\n");
     }
     // don't want to listen to events anymore
-    if (myNdb->dropEventOperation(op)) APIERROR(myNdb->getNdbError());
+    if (myNdb->dropEventOperation(the_op)) APIERROR(myNdb->getNdbError());
+    the_op = 0;
 
     j++;
   }
@@ -250,7 +313,8 @@
 		  const char *eventName,
 		  const char *eventTableName,
 		  const char **eventColumnNames,
-		  const int noEventColumnNames)
+		  const int noEventColumnNames,
+                  bool merge_events)
 {
   NdbDictionary::Dictionary *myDict= myNdb->getDictionary();
   if (!myDict) APIERROR(myNdb->getNdbError());
@@ -265,6 +329,7 @@
   //  myEvent.addTableEvent(NdbDictionary::Event::TE_DELETE);
 
   myEvent.addEventColumns(noEventColumnNames, eventColumnNames);
+  myEvent.mergeEvents(merge_events);
 
   // Add event to database
   if (myDict->createEvent(myEvent) == 0)
Thread
bk commit into 5.1 tree (pekka:1.2074)pekka19 Jan