List:Commits« Previous MessageNext Message »
From:pekka Date:December 27 2005 5:29pm
Subject:bk commit into 5.1 tree (pekka:1.1998)
View as plain text  
Below is the list of changes that have just been committed into a local
5.1 repository of pekka. When pekka does a push these changes will
be propagated to the main repository and, within 24 hours after the
push, to the public repository.
For information on how to access the public repository
see http://dev.mysql.com/doc/mysql/en/installing-source-tree.html

ChangeSet
  1.1998 05/12/27 18:28:55 pekka@stripped +3 -0
  Merge mysql.com:/space/pekka/ndb/version/my50
  into  mysql.com:/space/pekka/ndb/version/my51

  storage/ndb/test/ndbapi/Makefile.am
    1.26 05/12/27 18:28:35 pekka@stripped +1 -4
    merge-ul

  storage/ndb/src/ndbapi/NdbEventOperationImpl.cpp
    1.25 05/12/27 18:27:46 pekka@stripped +0 -1
    merge-ul

  storage/ndb/src/kernel/blocks/dbtup/DbtupCommit.cpp
    1.7 05/12/27 18:27:21 pekka@stripped +0 -2
    merge-ul

  storage/ndb/test/ndbapi/Makefile.am
    1.19.6.2 05/12/27 18:25:49 pekka@stripped +0 -0
    Merge rename: ndb/test/ndbapi/Makefile.am -> storage/ndb/test/ndbapi/Makefile.am

  storage/ndb/src/ndbapi/NdbEventOperationImpl.cpp
    1.18.1.2 05/12/27 18:25:49 pekka@stripped +0 -0
    Merge rename: ndb/src/ndbapi/NdbEventOperationImpl.cpp -> storage/ndb/src/ndbapi/NdbEventOperationImpl.cpp

  storage/ndb/src/kernel/blocks/dbtup/DbtupCommit.cpp
    1.2.2.2 05/12/27 18:25:49 pekka@stripped +0 -0
    Merge rename: ndb/src/kernel/blocks/dbtup/DbtupCommit.cpp -> storage/ndb/src/kernel/blocks/dbtup/DbtupCommit.cpp

# This is a BitKeeper patch.  What follows are the unified diffs for the
# set of deltas contained in the patch.  The rest of the patch, the part
# that BitKeeper cares about, is below these diffs.
# User:	pekka
# Host:	orca.ndb.mysql.com
# Root:	/space/pekka/ndb/version/my51/RESYNC

--- 1.2.2.1/ndb/src/kernel/blocks/dbtup/DbtupCommit.cpp	2005-12-27 16:29:00 +01:00
+++ 1.7/storage/ndb/src/kernel/blocks/dbtup/DbtupCommit.cpp	2005-12-27 18:27:21 +01:00
@@ -20,571 +20,646 @@
 #include <ndb_limits.h>
 #include <pc.hpp>
 #include <signaldata/TupCommit.hpp>
+#include "../dblqh/Dblqh.hpp"
 
 #define ljam() { jamLine(5000 + __LINE__); }
 #define ljamEntry() { jamEntryLine(5000 + __LINE__); }
 
-void Dbtup::execTUP_WRITELOG_REQ(Signal* signal)
-{
-  jamEntry();
-  OperationrecPtr loopOpPtr;
-  loopOpPtr.i = signal->theData[0];
-  Uint32 gci = signal->theData[1];
-  ptrCheckGuard(loopOpPtr, cnoOfOprec, operationrec);
-  while (loopOpPtr.p->nextActiveOp != RNIL) {
-    ljam();
-    loopOpPtr.i = loopOpPtr.p->nextActiveOp;
-    ptrCheckGuard(loopOpPtr, cnoOfOprec, operationrec);
-  }//while
-  do {
-    Uint32 blockNo = refToBlock(loopOpPtr.p->userblockref);
-    ndbrequire(loopOpPtr.p->transstate == STARTED);
-    signal->theData[0] = loopOpPtr.p->userpointer;
-    signal->theData[1] = gci;
-    if (loopOpPtr.p->prevActiveOp == RNIL) {
-      ljam();
-      EXECUTE_DIRECT(blockNo, GSN_LQH_WRITELOG_REQ, signal, 2);
-      return;
-    }//if
-    ljam();
-    EXECUTE_DIRECT(blockNo, GSN_LQH_WRITELOG_REQ, signal, 2);
-    jamEntry();
-    loopOpPtr.i = loopOpPtr.p->prevActiveOp;
-    ptrCheckGuard(loopOpPtr, cnoOfOprec, operationrec);
-  } while (true);
-}//Dbtup::execTUP_WRITELOG_REQ()
-
 void Dbtup::execTUP_DEALLOCREQ(Signal* signal)
 {
   TablerecPtr regTabPtr;
   FragrecordPtr regFragPtr;
+  Uint32 frag_page_id, frag_id;
 
-  jamEntry();
+  ljamEntry();
 
-  Uint32 fragId = signal->theData[0];
-  regTabPtr.i = signal->theData[1];
-  Uint32 fragPageId = signal->theData[2];
-  Uint32 pageIndex = signal->theData[3];
+  frag_id= signal->theData[0];
+  regTabPtr.i= signal->theData[1];
+  frag_page_id= signal->theData[2];
+  Uint32 page_index= signal->theData[3];
 
   ptrCheckGuard(regTabPtr, cnoOfTablerec, tablerec);
-  getFragmentrec(regFragPtr, fragId, regTabPtr.p);
-  ndbrequire(regFragPtr.p != NULL);
+  
+  getFragmentrec(regFragPtr, frag_id, regTabPtr.p);
+  ndbassert(regFragPtr.p != NULL);
+  
+  if (! (((frag_page_id << MAX_TUPLES_BITS) + page_index) == ~0))
+  {
+    Local_key tmp;
+    tmp.m_page_no= getRealpid(regFragPtr.p, frag_page_id); 
+    tmp.m_page_idx= page_index;
+    
+    PagePtr pagePtr;
+    Tuple_header* ptr= (Tuple_header*)get_ptr(&pagePtr, &tmp, regTabPtr.p);
+    
+    if (regTabPtr.p->m_attributes[MM].m_no_of_varsize)
+    {
+      ljam();
+      
+      if(ptr->m_header_bits & Tuple_header::CHAINED_ROW)
+      {
+	free_var_part(regFragPtr.p, regTabPtr.p,
+		      *(Var_part_ref*)ptr->get_var_part_ptr(regTabPtr.p),
+		      Var_page::CHAIN);
+      }
+      free_var_part(regFragPtr.p, regTabPtr.p, &tmp, (Var_page*)pagePtr.p, 0);
+    } else {
+      free_fix_rec(regFragPtr.p, regTabPtr.p, &tmp, (Fix_page*)pagePtr.p);
+    }
+  }
+}
 
-  PagePtr pagePtr;
-  pagePtr.i = getRealpid(regFragPtr.p, fragPageId);
-  ptrCheckGuard(pagePtr, cnoOfPage, page);
-  Uint32 pageIndexScaled = pageIndex >> 1;
-  ndbrequire((pageIndex & 1) == 0);
-  Uint32 pageOffset = ZPAGE_HEADER_SIZE + 
-                     (regTabPtr.p->tupheadsize * pageIndexScaled);
-//---------------------------------------------------
-/* --- Deallocate a tuple as requested by ACC  --- */
-//---------------------------------------------------
-  if (isUndoLoggingNeeded(regFragPtr.p, fragPageId)) {
+void Dbtup::execTUP_WRITELOG_REQ(Signal* signal)
+{
+  jamEntry();
+  OperationrecPtr loopOpPtr;
+  loopOpPtr.i= signal->theData[0];
+  Uint32 gci= signal->theData[1];
+  c_operation_pool.getPtr(loopOpPtr);
+  while (loopOpPtr.p->prevActiveOp != RNIL) {
     ljam();
-    cprAddUndoLogRecord(signal,
-                        ZLCPR_TYPE_INSERT_TH,
-                        fragPageId,
-                        pageIndex,
-                        regTabPtr.i,
-                        fragId,
-                        regFragPtr.p->checkpointVersion);
-    cprAddData(signal,
-               regFragPtr.p,
-               pagePtr.i,
-               regTabPtr.p->tupheadsize,
-               pageOffset);
-  }//if
-  {
-    freeTh(regFragPtr.p,
-           regTabPtr.p,
-           signal,
-           pagePtr.p,
-           pageOffset);
+    loopOpPtr.i= loopOpPtr.p->prevActiveOp;
+    c_operation_pool.getPtr(loopOpPtr);
   }
+  do {
+    ndbrequire(get_trans_state(loopOpPtr.p) == TRANS_STARTED);
+    signal->theData[0]= loopOpPtr.p->userpointer;
+    signal->theData[1]= gci;
+    if (loopOpPtr.p->nextActiveOp == RNIL) {
+      ljam();
+      EXECUTE_DIRECT(DBLQH, GSN_LQH_WRITELOG_REQ, signal, 2);
+      return;
+    }
+    ljam();
+    EXECUTE_DIRECT(DBLQH, GSN_LQH_WRITELOG_REQ, signal, 2);
+    jamEntry();
+    loopOpPtr.i= loopOpPtr.p->nextActiveOp;
+    c_operation_pool.getPtr(loopOpPtr);
+  } while (true);
 }
 
-/* ---------------------------------------------------------------- */
-/* ------------ PERFORM A COMMIT ON AN UPDATE OPERATION  ---------- */
-/* ---------------------------------------------------------------- */
-void Dbtup::commitUpdate(Signal* signal,
-                         Operationrec*  const regOperPtr,
-                         Fragrecord* const regFragPtr,
-                         Tablerec* const regTabPtr)
-{
-  if (regOperPtr->realPageIdC != RNIL) {
-    if (isUndoLoggingNeeded(regFragPtr, regOperPtr->fragPageIdC)) {
-/* ------------------------------------------------------------------------ */
-/* IF THE COPY WAS CREATED WITHIN THIS CHECKPOINT WE ONLY HAVE              */
-/* TO LOG THE CREATION OF THE COPY. IF HOWEVER IT WAS CREATED BEFORE  SAVE  */
-/* THIS CHECKPOINT, WE HAVE TO THE DATA AS WELL.                            */
-/* ------------------------------------------------------------------------ */
-      if (regOperPtr->undoLogged) {
-        ljam();
-        cprAddUndoLogRecord(signal,
-                            ZLCPR_TYPE_INSERT_TH_NO_DATA,
-                            regOperPtr->fragPageIdC,
-                            regOperPtr->pageIndexC,
-                            regOperPtr->tableRef,
-                            regOperPtr->fragId,
-                            regFragPtr->checkpointVersion);
-      } else {
-        ljam();
-        cprAddUndoLogRecord(signal,
-                            ZLCPR_TYPE_INSERT_TH,
-                            regOperPtr->fragPageIdC,
-                            regOperPtr->pageIndexC,
-                            regOperPtr->tableRef,
-                            regOperPtr->fragId,
-                            regFragPtr->checkpointVersion);
-        cprAddData(signal,
-                   regFragPtr,
-                   regOperPtr->realPageIdC,
-                   regTabPtr->tupheadsize,
-                   regOperPtr->pageOffsetC);
-      }//if
-    }//if
-
-    PagePtr copyPagePtr;
-    copyPagePtr.i = regOperPtr->realPageIdC;
-    ptrCheckGuard(copyPagePtr, cnoOfPage, page);
-    freeTh(regFragPtr,
-           regTabPtr,
-           signal,
-           copyPagePtr.p,
-           (Uint32)regOperPtr->pageOffsetC);
-    regOperPtr->realPageIdC = RNIL;
-    regOperPtr->fragPageIdC = RNIL;
-    regOperPtr->pageOffsetC = ZNIL;
-    regOperPtr->pageIndexC = ZNIL;
-  }//if
-}//Dbtup::commitUpdate()
-
-void
-Dbtup::commitSimple(Signal* signal,
-                    Operationrec* const regOperPtr,
-                    Fragrecord* const regFragPtr,
-                    Tablerec* const regTabPtr)
-{
-  operPtr.p = regOperPtr;
-  fragptr.p = regFragPtr;
-  tabptr.p = regTabPtr;
-
-  // Checking detached triggers
-  checkDetachedTriggers(signal,
-                        regOperPtr,
-                        regTabPtr);
+void Dbtup::removeActiveOpList(Operationrec*  const regOperPtr,
+                               Tuple_header *tuple_ptr)
+{
+  OperationrecPtr raoOperPtr;
 
-  removeActiveOpList(regOperPtr);
-  if (regOperPtr->optype == ZUPDATE) {
-    ljam();
-    commitUpdate(signal, regOperPtr, regFragPtr, regTabPtr);
-    if (regTabPtr->GCPIndicator) {
-      updateGcpId(signal, regOperPtr, regFragPtr, regTabPtr);
-    }//if
-  } else if (regOperPtr->optype == ZINSERT) {
-    ljam();
-    if (regTabPtr->GCPIndicator) {
-      updateGcpId(signal, regOperPtr, regFragPtr, regTabPtr);
-    }//if
-  } else {
-    ndbrequire(regOperPtr->optype == ZDELETE);
-  }//if
-}//Dbtup::commitSimple()
-
-void Dbtup::removeActiveOpList(Operationrec*  const regOperPtr)
-{
-  if (regOperPtr->inActiveOpList == ZTRUE) {
-    OperationrecPtr raoOperPtr;
-    regOperPtr->inActiveOpList = ZFALSE;
-    if (regOperPtr->prevActiveOp != RNIL) {
+  /**
+   * Release copy tuple
+   */
+  if(regOperPtr->op_struct.op_type != ZDELETE && 
+     !regOperPtr->m_copy_tuple_location.isNull())
+    c_undo_buffer.free_copy_tuple(&regOperPtr->m_copy_tuple_location);
+  
+  if (regOperPtr->op_struct.in_active_list) {
+    regOperPtr->op_struct.in_active_list= false;
+    if (regOperPtr->nextActiveOp != RNIL) {
       ljam();
-      raoOperPtr.i = regOperPtr->prevActiveOp;
-      ptrCheckGuard(raoOperPtr, cnoOfOprec, operationrec);
-      raoOperPtr.p->nextActiveOp = regOperPtr->nextActiveOp;
+      raoOperPtr.i= regOperPtr->nextActiveOp;
+      c_operation_pool.getPtr(raoOperPtr);
+      raoOperPtr.p->prevActiveOp= regOperPtr->prevActiveOp;
     } else {
       ljam();
-      PagePtr pagePtr;
-      pagePtr.i = regOperPtr->realPageId;
-      ptrCheckGuard(pagePtr, cnoOfPage, page);
-      ndbrequire(regOperPtr->pageOffset < ZWORDS_ON_PAGE);
-      pagePtr.p->pageWord[regOperPtr->pageOffset] = regOperPtr->nextActiveOp;
-    }//if
-    if (regOperPtr->nextActiveOp != RNIL) {
+      tuple_ptr->m_operation_ptr_i = regOperPtr->prevActiveOp;
+    }
+    if (regOperPtr->prevActiveOp != RNIL) {
       ljam();
-      raoOperPtr.i = regOperPtr->nextActiveOp;
-      ptrCheckGuard(raoOperPtr, cnoOfOprec, operationrec);
-      raoOperPtr.p->prevActiveOp = regOperPtr->prevActiveOp;
-    }//if
-    regOperPtr->prevActiveOp = RNIL;
-    regOperPtr->nextActiveOp = RNIL;
-  }//if
-}//Dbtup::removeActiveOpList()
+      raoOperPtr.i= regOperPtr->prevActiveOp;
+      c_operation_pool.getPtr(raoOperPtr);
+      raoOperPtr.p->nextActiveOp= regOperPtr->nextActiveOp;
+    }
+    regOperPtr->prevActiveOp= RNIL;
+    regOperPtr->nextActiveOp= RNIL;
+  }
+}
 
 /* ---------------------------------------------------------------- */
 /* INITIALIZATION OF ONE CONNECTION RECORD TO PREPARE FOR NEXT OP.  */
 /* ---------------------------------------------------------------- */
-void Dbtup::initOpConnection(Operationrec* regOperPtr,
-			     Fragrecord * fragPtrP)
+void Dbtup::initOpConnection(Operationrec* regOperPtr)
 {
-  Uint32 RinFragList = regOperPtr->inFragList;
-  regOperPtr->transstate = IDLE;
-  regOperPtr->currentAttrinbufLen = 0;
-  regOperPtr->optype = ZREAD;
-  if (RinFragList == ZTRUE) {
-    OperationrecPtr tropNextLinkPtr;
-    OperationrecPtr tropPrevLinkPtr;
-/*----------------------------------------------------------------- */
-/*       TO ENSURE THAT WE HAVE SUCCESSFUL ABORTS OF FOLLOWING      */
-/*       OPERATIONS WHICH NEVER STARTED WE SET THE OPTYPE TO READ.  */
-/*----------------------------------------------------------------- */
-/*       REMOVE IT FROM THE DOUBLY LINKED LIST ON THE FRAGMENT      */
-/*----------------------------------------------------------------- */
-    tropPrevLinkPtr.i = regOperPtr->prevOprecInList;
-    tropNextLinkPtr.i = regOperPtr->nextOprecInList;
-    regOperPtr->inFragList = ZFALSE;
-    if (tropPrevLinkPtr.i == RNIL) {
-      ljam();
-      fragPtrP->firstusedOprec = tropNextLinkPtr.i;
-    } else {
-      ljam();
-      ptrCheckGuard(tropPrevLinkPtr, cnoOfOprec, operationrec);
-      tropPrevLinkPtr.p->nextOprecInList = tropNextLinkPtr.i;
-    }//if
-    if (tropNextLinkPtr.i == RNIL) {
-      fragPtrP->lastusedOprec = tropPrevLinkPtr.i;
-    } else {
-      ptrCheckGuard(tropNextLinkPtr, cnoOfOprec, operationrec);
-      tropNextLinkPtr.p->prevOprecInList = tropPrevLinkPtr.i;
-    }
-    regOperPtr->prevOprecInList = RNIL;
-    regOperPtr->nextOprecInList = RNIL;
-  }//if
-}//Dbtup::initOpConnection()
+  set_tuple_state(regOperPtr, TUPLE_ALREADY_ABORTED);
+  set_trans_state(regOperPtr, TRANS_IDLE);
+  regOperPtr->currentAttrinbufLen= 0;
+  regOperPtr->op_struct.op_type= ZREAD;
+  regOperPtr->op_struct.m_disk_preallocated= 0;
+  regOperPtr->op_struct.m_load_diskpage_on_commit= 0;
+  regOperPtr->op_struct.m_wait_log_buffer= 0;
+  regOperPtr->m_undo_buffer_space= 0;
+}
 
-/* ----------------------------------------------------------------- */
-/* --------------- COMMIT THIS PART OF A TRANSACTION --------------- */
-/* ----------------------------------------------------------------- */
-void Dbtup::execTUP_COMMITREQ(Signal* signal) 
+void
+Dbtup::dealloc_tuple(Signal* signal,
+		     Uint32 gci,
+		     Page* page,
+		     Tuple_header* ptr, 
+		     Operationrec* regOperPtr, 
+		     Fragrecord* regFragPtr, 
+		     Tablerec* regTabPtr)
 {
-  FragrecordPtr regFragPtr;
-  OperationrecPtr regOperPtr;
-  TablerecPtr regTabPtr;
-
-  TupCommitReq * const tupCommitReq = (TupCommitReq *)signal->getDataPtr();
-
-  ljamEntry();
-  regOperPtr.i = tupCommitReq->opPtr;
-  ptrCheckGuard(regOperPtr, cnoOfOprec, operationrec);
-
-  ndbrequire(regOperPtr.p->transstate == STARTED);
-  regOperPtr.p->gci = tupCommitReq->gci;
-  regOperPtr.p->hashValue = tupCommitReq->hashValue;
+  if (ptr->m_header_bits & Tuple_header::DISK_PART)
+  {
+    Local_key disk;
+    memcpy(&disk, ptr->get_disk_ref_ptr(regTabPtr), sizeof(disk));
+    Ptr<GlobalPage> disk_page;
+    m_global_page_pool.getPtr(disk_page, 
+			      regOperPtr->m_commit_disk_callback_page);
+    disk_page_free(signal, regTabPtr, regFragPtr, 
+		   &disk, *(PagePtr*)&disk_page, gci);
+  }
+}
 
-  regFragPtr.i = regOperPtr.p->fragmentPtr;
-  ptrCheckGuard(regFragPtr, cnoOfFragrec, fragrecord);
+static
+inline
+bool
+operator>=(const Local_key& key1, const Local_key& key2)
+{
+  return key1.m_page_no > key2.m_page_no ||
+    (key1.m_page_no == key2.m_page_no && key1.m_page_idx >= key2.m_page_idx);
+}
 
-  regTabPtr.i = regOperPtr.p->tableRef;
-  ptrCheckGuard(regTabPtr, cnoOfTablerec, tablerec);
+void
+Dbtup::commit_operation(Signal* signal,
+			Uint32 gci,
+			Tuple_header* tuple_ptr, 
+			Page* page,
+			Operationrec* regOperPtr, 
+			Fragrecord* regFragPtr, 
+			Tablerec* regTabPtr)
+{
+  ndbassert(regOperPtr->op_struct.op_type != ZDELETE);
+  
+  Uint32 save= tuple_ptr->m_operation_ptr_i;
+  Uint32 bits= tuple_ptr->m_header_bits;
+
+  Tuple_header *disk_ptr= 0;
+  Tuple_header *copy= (Tuple_header*)
+    c_undo_buffer.get_ptr(&regOperPtr->m_copy_tuple_location);
+  
+  Uint32 copy_bits= copy->m_header_bits;
+
+  Uint32 fix_size= regTabPtr->m_offsets[MM].m_fix_header_size;
+  Uint32 mm_vars= regTabPtr->m_attributes[MM].m_no_of_varsize;
+  if(mm_vars == 0)
+  {
+    memcpy(tuple_ptr, copy, 4*fix_size);
+    //ndbout_c("commit: memcpy %p %p %d", tuple_ptr, copy, 4*fix_size);
+    disk_ptr= (Tuple_header*)(((Uint32*)copy)+fix_size);
+  }
+  else if(bits & Tuple_header::CHAINED_ROW)
+  {
+    Uint32 *ref= tuple_ptr->get_var_part_ptr(regTabPtr);
+    memcpy(tuple_ptr, copy, 4*(Tuple_header::HeaderSize+fix_size));
 
-  if (!regTabPtr.p->tuxCustomTriggers.isEmpty()) {
-    ljam();
-    executeTuxCommitTriggers(signal,
-                             regOperPtr.p,
-                             regTabPtr.p);
+    Local_key tmp; tmp.assref(*ref);
+    if(0) printf("%p %d %d (%d bytes) - ref: %x ", tuple_ptr,
+	   regOperPtr->m_tuple_location.m_page_no,
+	   regOperPtr->m_tuple_location.m_page_idx,
+	   4*(Tuple_header::HeaderSize+fix_size),
+	   *ref);
+    Ptr<Var_page> vpagePtr;
+    Uint32 *dst= get_ptr(&vpagePtr, *(Var_part_ref*)ref);
+    Uint32 *src= copy->get_var_part_ptr(regTabPtr);
+    Uint32 sz= ((mm_vars + 1) << 1) + (((Uint16*)src)[mm_vars]);
+    ndbassert(4*vpagePtr.p->get_entry_len(tmp.m_page_idx) >= sz);
+    memcpy(dst, src, sz);
+    if(0) printf("ptr: %p %d ref: %x - chain commit", dst, sz, *ref);
+    copy_bits |= Tuple_header::CHAINED_ROW;
+    
+    if(0)
+    {
+      for(Uint32 i = 0; i<((sz+3)>>2); i++)
+	printf(" %.8x", src[i]);
+      printf("\n");
+    }
+    
+    if(copy_bits & Tuple_header::MM_SHRINK)
+    {
+      if(0) printf(" - shrink %d -> %d - ", 
+	     vpagePtr.p->get_entry_len(tmp.m_page_idx), (sz + 3) >> 2);
+      vpagePtr.p->shrink_entry(tmp.m_page_idx, (sz + 3) >> 2);
+      if(0)ndbout_c("%p->shrink_entry(%d, %d)", vpagePtr.p, tmp.m_page_idx, 
+	       (sz + 3) >> 2);
+      update_free_page_list(regFragPtr, vpagePtr.p);
+    } 
+    if(0) ndbout_c("");
+    disk_ptr = (Tuple_header*)
+      (((Uint32*)copy)+Tuple_header::HeaderSize+fix_size+((sz + 3) >> 2));
+  } 
+  else 
+  {
+    Uint32 *var_part= copy->get_var_part_ptr(regTabPtr);
+    Uint32 sz= Tuple_header::HeaderSize + fix_size +
+      ((((mm_vars + 1) << 1) + (((Uint16*)var_part)[mm_vars]) + 3)>> 2);
+    ndbassert(((Var_page*)page)->
+	      get_entry_len(regOperPtr->m_tuple_location.m_page_idx) >= sz);
+    memcpy(tuple_ptr, copy, 4*sz);      
+    if(0) ndbout_c("%p %d %d (%d bytes)", tuple_ptr,
+	     regOperPtr->m_tuple_location.m_page_no,
+	     regOperPtr->m_tuple_location.m_page_idx,
+	     4*sz);
+    if(copy_bits & Tuple_header::MM_SHRINK)
+    {
+      ((Var_page*)page)->shrink_entry(regOperPtr->m_tuple_location.m_page_idx, 
+				      sz);
+      if(0)ndbout_c("%p->shrink_entry(%d, %d)", 
+	       page, regOperPtr->m_tuple_location.m_page_idx, sz);
+      update_free_page_list(regFragPtr, (Var_page*)page);
+    } 
+    disk_ptr = (Tuple_header*)(((Uint32*)copy)+sz);
+  }
+  
+  if (regTabPtr->m_no_of_disk_attributes &&
+      (copy_bits & Tuple_header::DISK_INLINE))
+  {
+    Local_key key;
+    memcpy(&key, copy->get_disk_ref_ptr(regTabPtr), sizeof(Local_key));
+    Uint32 logfile_group_id= regFragPtr->m_logfile_group_id;
+    Uint32 lcpScan_ptr_i= regFragPtr->m_lcp_scan_op;
+
+    PagePtr pagePtr = *(PagePtr*)&m_pgman.m_ptr;
+    ndbassert(pagePtr.p->m_page_no == key.m_page_no);
+    ndbassert(pagePtr.p->m_file_no == key.m_file_no);
+    Uint32 sz, *dst;
+    if(copy_bits & Tuple_header::DISK_ALLOC)
+    {
+      disk_page_alloc(signal, regTabPtr, regFragPtr, &key, pagePtr, gci);
+
+      if(lcpScan_ptr_i != RNIL)
+      {
+	ScanOpPtr scanOp;
+	c_scanOpPool.getPtr(scanOp, lcpScan_ptr_i);
+	Local_key rowid = regOperPtr->m_tuple_location;
+	Local_key scanpos = scanOp.p->m_scanPos.m_key;
+	rowid.m_page_no = pagePtr.p->frag_page_id;
+	if(rowid >= scanpos)
+	{
+	  copy_bits |= Tuple_header::LCP_SKIP;
+	}
+      }
+    }
+    
+    if(regTabPtr->m_attributes[DD].m_no_of_varsize == 0)
+    {
+      sz= regTabPtr->m_offsets[DD].m_fix_header_size;
+      dst= ((Fix_page*)pagePtr.p)->get_ptr(key.m_page_idx, sz);
+    }
+    else
+    {
+      dst= ((Var_page*)pagePtr.p)->get_ptr(key.m_page_idx);
+      sz= ((Var_page*)pagePtr.p)->get_entry_len(key.m_page_idx);
+    }
+    
+    if(! (copy_bits & Tuple_header::DISK_ALLOC))
+    {
+      disk_page_undo_update(pagePtr.p, &key, dst, sz, gci, logfile_group_id);
+    }
+    
+    memcpy(dst, disk_ptr, 4*sz);
+    memcpy(tuple_ptr->get_disk_ref_ptr(regTabPtr), &key, sizeof(Local_key));
+    
+    ndbassert(! (disk_ptr->m_header_bits & Tuple_header::FREE));
+    copy_bits |= Tuple_header::DISK_PART;
   }
 
-  if (regOperPtr.p->tupleState == NO_OTHER_OP) {
-    if ((regOperPtr.p->prevActiveOp == RNIL) &&
-        (regOperPtr.p->nextActiveOp == RNIL)) {
-      ljam();
-/* ---------------------------------------------------------- */
-// We handle the simple case separately as an optimisation
-/* ---------------------------------------------------------- */
-      commitSimple(signal,
-                   regOperPtr.p,
-                   regFragPtr.p,
-                   regTabPtr.p);
-    } else {
-/* ---------------------------------------------------------- */
-// This is the first commit message of this record in this
-// transaction. We will commit this record completely for this
-// transaction. If there are other operations they will be
-// responsible to release their own resources. Also commit of
-// a delete is postponed until the last operation is committed
-// on the tuple.
-//
-// As part of this commitRecord we will also handle detached
-// triggers and release of resources for this operation.
-/* ---------------------------------------------------------- */
-      ljam();
-      commitRecord(signal,
-                   regOperPtr.p,
-                   regFragPtr.p,
-                   regTabPtr.p);
-      removeActiveOpList(regOperPtr.p);
-    }//if
-  } else {
-    ljam();
-/* ---------------------------------------------------------- */
-// Release any copy tuples
-/* ---------------------------------------------------------- */
-    ndbrequire(regOperPtr.p->tupleState == TO_BE_COMMITTED);
-    commitUpdate(signal, regOperPtr.p, regFragPtr.p, regTabPtr.p);
-    removeActiveOpList(regOperPtr.p);
-  }//if
-  initOpConnection(regOperPtr.p, regFragPtr.p);
-}//execTUP_COMMITREQ()
+  
+  Uint32 clear= 
+    Tuple_header::ALLOC |
+    Tuple_header::DISK_ALLOC | Tuple_header::DISK_INLINE | 
+    Tuple_header::MM_SHRINK | Tuple_header::MM_GROWN;
+  copy_bits &= ~(Uint32)clear;
+  
+  tuple_ptr->m_header_bits= copy_bits;
+  tuple_ptr->m_operation_ptr_i= save;
+  
+  if (regTabPtr->checksumIndicator) {
+    jam();
+    setChecksum(tuple_ptr, regTabPtr);
+  }
+}
 
 void
-Dbtup::updateGcpId(Signal* signal,
-                   Operationrec* const regOperPtr,
-                   Fragrecord* const regFragPtr,
-                   Tablerec* const regTabPtr)
-{
-  PagePtr pagePtr;
-  ljam();
-//--------------------------------------------------------------------
-// Is this code safe for UNDO logging. Not sure currently. RONM
-//--------------------------------------------------------------------
-  pagePtr.i = regOperPtr->realPageId;
-  ptrCheckGuard(pagePtr, cnoOfPage, page);
-  Uint32 temp = regOperPtr->pageOffset + regTabPtr->tupGCPIndex;
-  ndbrequire((temp < ZWORDS_ON_PAGE) &&
-             (regTabPtr->tupGCPIndex < regTabPtr->tupheadsize));
-  if (isUndoLoggingNeeded(regFragPtr, regOperPtr->fragPageId)) {
-    Uint32 prevGCI = pagePtr.p->pageWord[temp];
-    ljam();
-    cprAddUndoLogRecord(signal,
-                        ZLCPR_TYPE_UPDATE_GCI,
-                        regOperPtr->fragPageId,
-                        regOperPtr->pageIndex,
-                        regOperPtr->tableRef,
-                        regOperPtr->fragId,
-                        regFragPtr->checkpointVersion);
-    cprAddGCIUpdate(signal,
-                    prevGCI,
-                    regFragPtr);
-  }//if
-  pagePtr.p->pageWord[temp] = regOperPtr->gci;
-  if (regTabPtr->checksumIndicator) {
-    ljam();
-    setChecksum(pagePtr.p, regOperPtr->pageOffset, regTabPtr->tupheadsize);
-  }//if
-}//Dbtup::updateGcpId()
+Dbtup::disk_page_commit_callback(Signal* signal, 
+				 Uint32 opPtrI, Uint32 page_id)
+{
+  Uint32 hash_value;
+  Uint32 gci;
+  OperationrecPtr regOperPtr;
+
+  ljamEntry();
+  
+  c_operation_pool.getPtr(regOperPtr, opPtrI);
+  c_lqh->get_op_info(regOperPtr.p->userpointer, &hash_value, &gci);
+
+  TupCommitReq * const tupCommitReq= (TupCommitReq *)signal->getDataPtr();
+  
+  tupCommitReq->opPtr= opPtrI;
+  tupCommitReq->hashValue= hash_value;
+  tupCommitReq->gci= gci;
+
+  regOperPtr.p->op_struct.m_load_diskpage_on_commit= 0;
+  regOperPtr.p->m_commit_disk_callback_page= page_id;
+  m_global_page_pool.getPtr(m_pgman.m_ptr, page_id);
+  
+  execTUP_COMMITREQ(signal);
+  if(signal->theData[0] == 0)
+    c_lqh->tupcommit_conf_callback(signal, regOperPtr.p->userpointer);
+}
 
 void
-Dbtup::commitRecord(Signal* signal,
-                    Operationrec* const regOperPtr,
-                    Fragrecord* const regFragPtr,
-                    Tablerec* const regTabPtr)
+Dbtup::disk_page_log_buffer_callback(Signal* signal, 
+				     Uint32 opPtrI,
+				     Uint32 unused)
 {
-  Uint32 opType;
-  OperationrecPtr firstOpPtr;
-  PagePtr pagePtr;
+  Uint32 hash_value;
+  Uint32 gci;
+  OperationrecPtr regOperPtr;
 
-  pagePtr.i = regOperPtr->realPageId;
-  ptrCheckGuard(pagePtr, cnoOfPage, page);
+  ljamEntry();
+  
+  c_operation_pool.getPtr(regOperPtr, opPtrI);
+  c_lqh->get_op_info(regOperPtr.p->userpointer, &hash_value, &gci);
+
+  TupCommitReq * const tupCommitReq= (TupCommitReq *)signal->getDataPtr();
+  
+  tupCommitReq->opPtr= opPtrI;
+  tupCommitReq->hashValue= hash_value;
+  tupCommitReq->gci= gci;
+
+  Uint32 page= regOperPtr.p->m_commit_disk_callback_page;
+  ndbassert(regOperPtr.p->op_struct.m_load_diskpage_on_commit == 0);
+  regOperPtr.p->op_struct.m_wait_log_buffer= 0;
+  m_global_page_pool.getPtr(m_pgman.m_ptr, page);
+  
+  execTUP_COMMITREQ(signal);
+  ndbassert(signal->theData[0] == 0);
+  
+  c_lqh->tupcommit_conf_callback(signal, regOperPtr.p->userpointer);
+}
 
-  setTupleStatesSetOpType(regOperPtr, pagePtr.p, opType, firstOpPtr);
+void
+Dbtup::fix_commit_order(OperationrecPtr opPtr)
+{
+  ndbassert(!opPtr.p->is_first_operation());
+  OperationrecPtr firstPtr = opPtr;
+  while(firstPtr.p->prevActiveOp != RNIL)
+  {
+    firstPtr.i = firstPtr.p->prevActiveOp;
+    c_operation_pool.getPtr(firstPtr);    
+  }
 
-  fragptr.p = regFragPtr;
-  tabptr.p = regTabPtr;
+  ndbout_c("fix_commit_order (swapping %d and %d)",
+	   opPtr.i, firstPtr.i);
+  
+  /**
+   * Swap data between first and curr
+   */
+  Uint32 prev= opPtr.p->prevActiveOp;
+  Uint32 next= opPtr.p->nextActiveOp;
+  Uint32 seco= firstPtr.p->nextActiveOp;
+
+  Operationrec tmp = *opPtr.p;
+  * opPtr.p = * firstPtr.p;
+  * firstPtr.p = tmp;
+
+  c_operation_pool.getPtr(seco)->prevActiveOp = opPtr.i;
+  c_operation_pool.getPtr(prev)->nextActiveOp = firstPtr.i;
+  if(next != RNIL)
+    c_operation_pool.getPtr(next)->prevActiveOp = firstPtr.i;
+}
 
-  if (opType == ZINSERT_DELETE) {
-    ljam();
-//--------------------------------------------------------------------
-// We started by inserting the tuple and ended by deleting. Seen from
-// transactions point of view no changes were made.
-//--------------------------------------------------------------------
-    commitUpdate(signal, regOperPtr, regFragPtr, regTabPtr);
-    return;
-  } else if (opType == ZINSERT) {
-    ljam();
-//--------------------------------------------------------------------
-// We started by inserting whereafter we made several changes to the
-// tuple that could include updates, deletes and new inserts. The final
-// state of the tuple is the original tuple. This is reached from this
-// operation. We change the optype on this operation to ZINSERT to
-// ensure proper operation of the detached trigger.
-// We restore the optype after executing triggers although not really
-// needed.
-//--------------------------------------------------------------------
-    Uint32 saveOpType = regOperPtr->optype;
-    regOperPtr->optype = ZINSERT;
-    operPtr.p = regOperPtr;
-
-    checkDetachedTriggers(signal,
-                          regOperPtr,
-                          regTabPtr);
+/* ----------------------------------------------------------------- */
+/* --------------- COMMIT THIS PART OF A TRANSACTION --------------- */
+/* ----------------------------------------------------------------- */
+void Dbtup::execTUP_COMMITREQ(Signal* signal) 
+{
+  FragrecordPtr regFragPtr;
+  OperationrecPtr regOperPtr;
+  TablerecPtr regTabPtr;
+  KeyReqStruct req_struct;
+  TransState trans_state;
+  Uint32 no_of_fragrec, no_of_tablerec, hash_value, gci;
 
-    regOperPtr->optype = saveOpType;
-  } else if (opType == ZUPDATE) {
-    ljam();
-//--------------------------------------------------------------------
-// We want to use the first operation which contains a copy tuple
-// reference. This operation contains the before value of this record
-// for this transaction. Then this operation is used for executing
-// triggers with optype set to update.
-//--------------------------------------------------------------------
-    OperationrecPtr befOpPtr;
-    findBeforeValueOperation(befOpPtr, firstOpPtr);
-
-    Uint32 saveOpType = befOpPtr.p->optype;
-    Bitmask<MAXNROFATTRIBUTESINWORDS> attributeMask;
-    Bitmask<MAXNROFATTRIBUTESINWORDS> saveAttributeMask;
-
-    calculateChangeMask(pagePtr.p,
-                        regTabPtr,
-                        befOpPtr.p->pageOffset,
-                        attributeMask);
-
-    saveAttributeMask.clear();
-    saveAttributeMask.bitOR(befOpPtr.p->changeMask);
-    befOpPtr.p->changeMask.clear();
-    befOpPtr.p->changeMask.bitOR(attributeMask);
-    befOpPtr.p->gci = regOperPtr->gci;
-    
-    befOpPtr.p->optype = opType;
-    operPtr.p = befOpPtr.p;
-    checkDetachedTriggers(signal,
-                          befOpPtr.p,
-                          regTabPtr);
+  TupCommitReq * const tupCommitReq= (TupCommitReq *)signal->getDataPtr();
 
-    befOpPtr.p->changeMask.clear();
-    befOpPtr.p->changeMask.bitOR(saveAttributeMask);
+  regOperPtr.i= tupCommitReq->opPtr;
+  ljamEntry();
 
-    befOpPtr.p->optype = saveOpType;
-  } else if (opType == ZDELETE) {
-    ljam();
-//--------------------------------------------------------------------
-// We want to use the first operation which contains a copy tuple.
-// We benefit from the fact that we know that it cannot be a simple
-// delete and it cannot be an insert followed by a delete. Thus there
-// must either be an update or a insert following a delete. In both
-// cases we will find a before value in a copy tuple.
-//
-// An added complexity is that the trigger handling assumes that the
-// before value is located in the original tuple so we have to move the
-// copy tuple reference to the original tuple reference and afterwards
-// restore it again.
-//--------------------------------------------------------------------
-    OperationrecPtr befOpPtr;
-    findBeforeValueOperation(befOpPtr, firstOpPtr);
-    Uint32 saveOpType = befOpPtr.p->optype;
-
-    Uint32 realPageId = befOpPtr.p->realPageId;
-    Uint32 pageOffset = befOpPtr.p->pageOffset;
-    Uint32 fragPageId = befOpPtr.p->fragPageId;
-    Uint32 pageIndex  = befOpPtr.p->pageIndex;
-
-    befOpPtr.p->realPageId = befOpPtr.p->realPageIdC;
-    befOpPtr.p->pageOffset = befOpPtr.p->pageOffsetC;
-    befOpPtr.p->fragPageId = befOpPtr.p->fragPageIdC;
-    befOpPtr.p->pageIndex  = befOpPtr.p->pageIndexC;
-    befOpPtr.p->gci = regOperPtr->gci;
-
-    befOpPtr.p->optype = opType;
-    operPtr.p = befOpPtr.p;
-    checkDetachedTriggers(signal,
-                          befOpPtr.p,
-                          regTabPtr);
-
-    befOpPtr.p->realPageId = realPageId;
-    befOpPtr.p->pageOffset = pageOffset;
-    befOpPtr.p->fragPageId = fragPageId;
-    befOpPtr.p->pageIndex  = pageIndex;
-    befOpPtr.p->optype     = saveOpType;
-  } else {
-    ndbrequire(false);
-  }//if
+  c_operation_pool.getPtr(regOperPtr);
+  if(!regOperPtr.p->is_first_operation())
+  {
+    /**
+     * Out of order commit 
+     */
+    fix_commit_order(regOperPtr);
+  }
+  ndbassert(regOperPtr.p->is_first_operation());
+  
+  regFragPtr.i= regOperPtr.p->fragmentPtr;
+  trans_state= get_trans_state(regOperPtr.p);
+
+  no_of_fragrec= cnoOfFragrec;
+
+  ndbrequire(trans_state == TRANS_STARTED);
+  ptrCheckGuard(regFragPtr, no_of_fragrec, fragrecord);
+
+  no_of_tablerec= cnoOfTablerec;
+  regTabPtr.i= regFragPtr.p->fragTableId;
+  hash_value= tupCommitReq->hashValue;
+  gci= tupCommitReq->gci;
+
+  req_struct.signal= signal;
+  req_struct.hash_value= hash_value;
+  req_struct.gci= gci;
+
+  ptrCheckGuard(regTabPtr, no_of_tablerec, tablerec);
+
+  PagePtr page;
+  Tuple_header* tuple_ptr= 0;
+  if(regOperPtr.p->op_struct.m_load_diskpage_on_commit)
+  {
+    ndbassert(regOperPtr.p->is_first_operation() && 
+	      regOperPtr.p->is_last_operation());
 
-  commitUpdate(signal, regOperPtr, regFragPtr, regTabPtr);
-  if (regTabPtr->GCPIndicator) {
-    updateGcpId(signal, regOperPtr, regFragPtr, regTabPtr);
-  }//if
-}//Dbtup::commitRecord()
+    Page_cache_client::Request req;
+    /**
+     * Check for page
+     */
+    if(!regOperPtr.p->m_copy_tuple_location.isNull())
+    {
+      Tuple_header* tmp= (Tuple_header*)
+	c_undo_buffer.get_ptr(&regOperPtr.p->m_copy_tuple_location);
+      
+      memcpy(&req.m_page, 
+	     tmp->get_disk_ref_ptr(regTabPtr.p), sizeof(Local_key));
+    } 
+    else
+    {
+      // initial delete
+      ndbassert(regOperPtr.p->op_struct.op_type == ZDELETE);
+      tuple_ptr= (Tuple_header*)
+	get_ptr(&page, &regOperPtr.p->m_tuple_location, regTabPtr.p);
+      memcpy(&req.m_page, 
+	     tuple_ptr->get_disk_ref_ptr(regTabPtr.p), sizeof(Local_key));
+    }
+    req.m_callback.m_callbackData= regOperPtr.i;
+    req.m_callback.m_callbackFunction = 
+      safe_cast(&Dbtup::disk_page_commit_callback);
+
+    int flags= regOperPtr.p->op_struct.op_type |
+      Page_cache_client::COMMIT_REQ | Page_cache_client::STRICT_ORDER;
+    int res= m_pgman.get_page(signal, req, flags);
+    switch(res){
+    case 0:
+      /**
+       * Timeslice
+       */
+      signal->theData[0] = 1;
+      return;
+    case -1:
+      ndbrequire("NOT YET IMPLEMENTED" == 0);
+      break;
+    }
+    regOperPtr.p->m_commit_disk_callback_page= res;
+    regOperPtr.p->op_struct.m_load_diskpage_on_commit= 0;
+  } 
+  
+  if(regOperPtr.p->op_struct.m_wait_log_buffer)
+  {
+    ndbassert(regOperPtr.p->is_first_operation() && 
+	      regOperPtr.p->is_last_operation());
+    
+    Callback cb;
+    cb.m_callbackData= regOperPtr.i;
+    cb.m_callbackFunction = 
+      safe_cast(&Dbtup::disk_page_log_buffer_callback);
+    Uint32 sz= regOperPtr.p->m_undo_buffer_space;
+    
+    Logfile_client lgman(this, c_lgman, regFragPtr.p->m_logfile_group_id);
+    int res= lgman.get_log_buffer(signal, sz, &cb);
+    switch(res){
+    case 0:
+      signal->theData[0] = 1;
+      return;
+    case -1:
+      ndbrequire("NOT YET IMPLEMENTED" == 0);
+      break;
+    }
+  }
+  
+  if(!tuple_ptr)
+  {
+    req_struct.m_tuple_ptr= tuple_ptr = (Tuple_header*)
+      get_ptr(&page, &regOperPtr.p->m_tuple_location,regTabPtr.p);
+  }
+  
+  if(get_tuple_state(regOperPtr.p) == TUPLE_PREPARED)
+  {
+    /**
+     * Execute all tux triggers at first commit
+     *   since previous tuple is otherwise removed...
+     *   btw...is this a "good" solution??
+     *   
+     *   why can't we instead remove "own version" (when approriate ofcourse)
+     */
+    if (!regTabPtr.p->tuxCustomTriggers.isEmpty()) {
+      ljam();
+      OperationrecPtr loopPtr= regOperPtr;
+      while(loopPtr.i != RNIL)
+      {
+	c_operation_pool.getPtr(loopPtr);
+	executeTuxCommitTriggers(signal,
+				 loopPtr.p,
+				 regFragPtr.p,
+				 regTabPtr.p);
+	set_tuple_state(loopPtr.p, TUPLE_TO_BE_COMMITTED);
+	loopPtr.i = loopPtr.p->nextActiveOp;
+      }
+    }
+  }
+  
+  if(regOperPtr.p->is_last_operation())
+  {
+    /**
+     * Perform "real" commit
+     */
+    set_change_mask_info(&req_struct, regOperPtr.p);
+    checkDetachedTriggers(&req_struct, regOperPtr.p, regTabPtr.p);
+    
+    if(regOperPtr.p->op_struct.op_type != ZDELETE)
+    {
+      commit_operation(signal, gci, tuple_ptr, page.p,
+		       regOperPtr.p, regFragPtr.p, regTabPtr.p); 
+      removeActiveOpList(regOperPtr.p, tuple_ptr);
+    }
+    else
+    {
+      removeActiveOpList(regOperPtr.p, tuple_ptr);
+      dealloc_tuple(signal, gci, page.p, tuple_ptr, 
+		    regOperPtr.p, regFragPtr.p, regTabPtr.p); 
+    }
+  } 
+  else
+  {
+    removeActiveOpList(regOperPtr.p, tuple_ptr);   
+  }
+  
+  initOpConnection(regOperPtr.p);
+  signal->theData[0] = 0;
+}
 
 void
-Dbtup::setTupleStatesSetOpType(Operationrec* const regOperPtr,
-                               Page* const pagePtr,
-                               Uint32& opType,
-                               OperationrecPtr& firstOpPtr)
+Dbtup::set_change_mask_info(KeyReqStruct * const req_struct,
+                            Operationrec * const regOperPtr)
 {
-  OperationrecPtr loopOpPtr;
-  OperationrecPtr lastOpPtr;
-
-  ndbrequire(regOperPtr->pageOffset < ZWORDS_ON_PAGE);
-  loopOpPtr.i = pagePtr->pageWord[regOperPtr->pageOffset];
-  ptrCheckGuard(loopOpPtr, cnoOfOprec, operationrec);
-  lastOpPtr = loopOpPtr;
-  if (loopOpPtr.p->optype == ZDELETE) {
-    ljam();
-    opType = ZDELETE;
-  } else {
-    ljam();
-    opType = ZUPDATE;
-  }//if
-  do {
-    ljam();
-    ptrCheckGuard(loopOpPtr, cnoOfOprec, operationrec);
-    firstOpPtr = loopOpPtr;
-    loopOpPtr.p->tupleState = TO_BE_COMMITTED;
-    loopOpPtr.i = loopOpPtr.p->nextActiveOp;
-  } while (loopOpPtr.i != RNIL);
-  if (opType == ZDELETE) {
-    ljam();
-    if (firstOpPtr.p->optype == ZINSERT) {
-      ljam();
-      opType = ZINSERT_DELETE;
-    }//if
-  } else {
+  ChangeMaskState change_mask= get_change_mask_state(regOperPtr);
+  if (change_mask == USE_SAVED_CHANGE_MASK) {
     ljam();
-    if (firstOpPtr.p->optype == ZINSERT) {
-      ljam();
-      opType = ZINSERT;
-    }//if
-  }///if
-}//Dbtup::setTupleStatesSetOpType()
-
-void Dbtup::findBeforeValueOperation(OperationrecPtr& befOpPtr,
-                                     OperationrecPtr firstOpPtr)
-{
-  befOpPtr = firstOpPtr;
-  if (befOpPtr.p->realPageIdC != RNIL) {
+    req_struct->changeMask.setWord(0, regOperPtr->saved_change_mask[0]);
+    req_struct->changeMask.setWord(1, regOperPtr->saved_change_mask[1]);
+    //get saved state
+  } else if (change_mask == RECALCULATE_CHANGE_MASK) {
+    ljam();
+    //Recompute change mask, for now set all bits
+    req_struct->changeMask.set();
+  } else if (change_mask == SET_ALL_MASK) {
     ljam();
-    return;
+    req_struct->changeMask.set();
   } else {
     ljam();
-    befOpPtr.i = befOpPtr.p->prevActiveOp;
-    ptrCheckGuard(befOpPtr, cnoOfOprec, operationrec);
-    ndbrequire(befOpPtr.p->realPageIdC != RNIL);
-  }//if
-}//Dbtup::findBeforeValueOperation()
+    ndbrequire(change_mask == DELETE_CHANGES);
+  }
+}
 
 void
 Dbtup::calculateChangeMask(Page* const pagePtr,
                            Tablerec* const regTabPtr,
-                           Uint32 pageOffset,
-                           Bitmask<MAXNROFATTRIBUTESINWORDS>& attributeMask)
+                           KeyReqStruct * const req_struct)
 {
   OperationrecPtr loopOpPtr;
-
-  attributeMask.clear();
-  ndbrequire(pageOffset < ZWORDS_ON_PAGE);
-  loopOpPtr.i = pagePtr->pageWord[pageOffset];
+  Uint32 saved_word1= 0;
+  Uint32 saved_word2= 0;
+  loopOpPtr.i= req_struct->m_tuple_ptr->m_operation_ptr_i;
   do {
-    ptrCheckGuard(loopOpPtr, cnoOfOprec, operationrec);
-    if (loopOpPtr.p->optype == ZUPDATE) {
+    c_operation_pool.getPtr(loopOpPtr);
+    ndbrequire(loopOpPtr.p->op_struct.op_type == ZUPDATE);
+    ChangeMaskState change_mask= get_change_mask_state(loopOpPtr.p);
+    if (change_mask == USE_SAVED_CHANGE_MASK) {
+      ljam();
+      saved_word1|= loopOpPtr.p->saved_change_mask[0];
+      saved_word2|= loopOpPtr.p->saved_change_mask[1];
+    } else if (change_mask == RECALCULATE_CHANGE_MASK) {
       ljam();
-      attributeMask.bitOR(loopOpPtr.p->changeMask);
-    } else if (loopOpPtr.p->optype == ZINSERT) {
-      ljam();
-      attributeMask.set();
+      //Recompute change mask, for now set all bits
+      req_struct->changeMask.set();
       return;
     } else {
-      ndbrequire(loopOpPtr.p->optype == ZDELETE);
-    }//if
-    loopOpPtr.i = loopOpPtr.p->nextActiveOp;
+      ndbrequire(change_mask == SET_ALL_MASK);
+      ljam();
+      req_struct->changeMask.set();
+      return;
+    }
+    loopOpPtr.i= loopOpPtr.p->prevActiveOp;
   } while (loopOpPtr.i != RNIL);
-}//Dbtup::calculateChangeMask()
+  req_struct->changeMask.setWord(0, saved_word1);
+  req_struct->changeMask.setWord(1, saved_word2);
+}

--- 1.18.1.1/ndb/src/ndbapi/NdbEventOperationImpl.cpp	2005-12-27 16:02:20 +01:00
+++ 1.25/storage/ndb/src/ndbapi/NdbEventOperationImpl.cpp	2005-12-27 18:27:46 +01:00
@@ -41,6 +41,13 @@
 #include <NdbEventOperation.hpp>
 #include "NdbEventOperationImpl.hpp"
 
+#include <EventLogger.hpp>
+extern EventLogger g_eventLogger;
+
+static Gci_container g_empty_gci_container;
+static const Uint32 ACTIVE_GCI_DIRECTORY_SIZE = 4;
+static const Uint32 ACTIVE_GCI_MASK = ACTIVE_GCI_DIRECTORY_SIZE - 1;
+
 /*
  * Class NdbEventOperationImpl
  *
@@ -49,14 +56,20 @@
 
 //#define EVENT_DEBUG
 
+// todo handle several ndb objects
+// todo free allocated data when closing NdbEventBuffer
 
 NdbEventOperationImpl::NdbEventOperationImpl(NdbEventOperation &N,
 					     Ndb *theNdb, 
-					     const char* eventName, 
-					     const int bufferLength) 
-  : NdbEventOperation(*this), m_ndb(theNdb),
-    m_state(EO_ERROR), m_bufferL(bufferLength)
+					     const char* eventName) 
+  : NdbEventOperation(*this), m_facade(&N), m_magic_number(0),
+    m_ndb(theNdb), m_state(EO_ERROR), mi_type(0), m_oid(~(Uint32)0),
+#ifdef VM_TRACE
+    m_data_done_count(0), m_data_count(0),
+#endif
+    m_next(0), m_prev(0)
 {
+  DBUG_ENTER("NdbEventOperationImpl::NdbEventOperationImpl");
   m_eventId = 0;
   theFirstPkAttrs[0] = NULL;
   theCurrentPkAttrs[0] = NULL;
@@ -66,10 +79,11 @@
   theCurrentDataAttrs[0] = NULL;
   theFirstDataAttrs[1] = NULL;
   theCurrentDataAttrs[1] = NULL;
-  sdata = NULL;
-  ptr[0].p = NULL;
-  ptr[1].p = NULL;
-  ptr[2].p = NULL;
+  m_data_item= NULL;
+  m_eventImpl = NULL;
+
+  m_custom_data= 0;
+  m_has_error= 1;
 
   // we should lookup id in Dictionary, TODO
   // also make sure we only have one listener on each event
@@ -77,49 +91,44 @@
   if (!m_ndb) abort();
 
   NdbDictionary::Dictionary *myDict = m_ndb->getDictionary();
-  if (!myDict) { m_error.code= m_ndb->getNdbError().code; return; }
+  if (!myDict) { m_error.code= m_ndb->getNdbError().code; DBUG_VOID_RETURN; }
 
   const NdbDictionary::Event *myEvnt = myDict->getEvent(eventName);
-  if (!myEvnt) { m_error.code= myDict->getNdbError().code; return; }
+  if (!myEvnt) { m_error.code= myDict->getNdbError().code; DBUG_VOID_RETURN; }
 
   m_eventImpl = &myEvnt->m_impl;
 
   m_eventId = m_eventImpl->m_eventId;
 
-  m_bufferHandle = m_ndb->getGlobalEventBufferHandle();
-  if (m_bufferHandle->m_bufferL > 0) 
-    m_bufferL =m_bufferHandle->m_bufferL;
-  else
-    m_bufferHandle->m_bufferL = m_bufferL;
+  m_oid= m_ndb->theImpl->theNdbObjectIdMap.map(this);
+
+  m_state= EO_CREATED;
 
-  m_state = EO_CREATED;
+  m_has_error= 0;
+
+  DBUG_PRINT("exit",("this: 0x%x oid: %u", this, m_oid));
+  DBUG_VOID_RETURN;
 }
 
 NdbEventOperationImpl::~NdbEventOperationImpl()
 {
-  int i;
-  if (sdata) NdbMem_Free((char*)sdata);
-  for (i=0 ; i<2; i++) {
-    NdbRecAttr *p = theFirstPkAttrs[i];
-    while (p) {
-      NdbRecAttr *p_next = p->next();
-      m_ndb->releaseRecAttr(p);
-      p = p_next;
-    }
-  }
-  for (i=0 ; i<2; i++) {
-    NdbRecAttr *p = theFirstDataAttrs[i];
-    while (p) {
-      NdbRecAttr *p_next = p->next();
-      m_ndb->releaseRecAttr(p);
-      p = p_next;
-    }
-  }
-  if (m_state == EO_EXECUTING) {
-    stop();
-    // m_bufferHandle->dropSubscribeEvent(m_bufferId);
-    ; // We should send stop signal here
+  DBUG_ENTER("NdbEventOperationImpl::~NdbEventOperationImpl");
+  m_magic_number= 0;
+
+  stop();
+  // m_bufferHandle->dropSubscribeEvent(m_bufferId);
+  ; // ToDo? We should send stop signal here
+  
+  m_ndb->theImpl->theNdbObjectIdMap.unmap(m_oid, this);
+  DBUG_PRINT("exit",("this: 0x%x/0x%x oid: %u", this, m_facade, m_oid));
+
+  if (m_eventImpl)
+  {
+    delete m_eventImpl->m_facade;
+    m_eventImpl= 0;
   }
+
+  DBUG_VOID_RETURN;
 }
 
 NdbEventOperation::State
@@ -133,7 +142,8 @@
 {
   DBUG_ENTER("NdbEventOperationImpl::getValue");
   if (m_state != EO_CREATED) {
-    ndbout_c("NdbEventOperationImpl::getValue may only be called between instantiation and execute()");
+    ndbout_c("NdbEventOperationImpl::getValue may only be called between "
+	     "instantiation and execute()");
     DBUG_RETURN(NULL);
   }
 
@@ -244,42 +254,22 @@
     
   }
 
-  NdbDictionaryImpl & myDictImpl = NdbDictionaryImpl::getImpl(*myDict);
-
-
-  int hasSubscriber;
-  int r= m_bufferHandle->prepareAddSubscribeEvent(this,
-						  hasSubscriber /*return value*/);
-
-  if (r < 0)
-  {
-    m_error.code= 4709;
-    DBUG_RETURN(-1);
-  }
-
-  m_eventImpl->m_bufferId = m_bufferId = (Uint32)r;
-
-  r = -1;
-  if (m_bufferId >= 0) {
-    // now we check if there's already a subscriber
-
-    if (hasSubscriber == 0) { // only excute if there's no other subscribers 
-      r = myDictImpl.executeSubscribeEvent(*m_eventImpl);
-    } else {
-      r = 0;
-    }
-    if (r) {
-      //Error
-      m_bufferHandle->unprepareAddSubscribeEvent(m_bufferId);
-      m_state = EO_ERROR;
-    } else {
-      m_bufferHandle->addSubscribeEvent(m_bufferId, this);
-      m_state = EO_EXECUTING;
-    }
-  } else {
-    //Error
-    m_state = EO_ERROR;
+  m_ndb->theEventBuffer->add_drop_lock();
+  m_magic_number= NDB_EVENT_OP_MAGIC_NUMBER;
+  m_state= EO_EXECUTING;
+  mi_type= m_eventImpl->mi_type;
+  m_ndb->theEventBuffer->add_op();
+  int r= NdbDictionaryImpl::getImpl(*myDict).executeSubscribeEvent(*this);
+  if (r == 0) {
+    m_ndb->theEventBuffer->add_drop_unlock();
+    DBUG_RETURN(0);
   }
+  //Error
+  m_state= EO_ERROR;
+  mi_type= 0;
+  m_magic_number= 0;
+  m_ndb->theEventBuffer->remove_op();
+  m_ndb->theEventBuffer->add_drop_unlock();
   DBUG_RETURN(r);
 }
 
@@ -287,249 +277,204 @@
 NdbEventOperationImpl::stop()
 {
   DBUG_ENTER("NdbEventOperationImpl::stop");
+  int i;
+
+  for (i=0 ; i<2; i++) {
+    NdbRecAttr *p = theFirstPkAttrs[i];
+    while (p) {
+      NdbRecAttr *p_next = p->next();
+      m_ndb->releaseRecAttr(p);
+      p = p_next;
+    }
+    theFirstPkAttrs[i]= 0;
+  }
+  for (i=0 ; i<2; i++) {
+    NdbRecAttr *p = theFirstDataAttrs[i];
+    while (p) {
+      NdbRecAttr *p_next = p->next();
+      m_ndb->releaseRecAttr(p);
+      p = p_next;
+    }
+    theFirstDataAttrs[i]= 0;
+  }
+
   if (m_state != EO_EXECUTING)
   {
     DBUG_RETURN(-1);
   }
 
-  //  ndbout_c("NdbEventOperation::stopping()");
-
   NdbDictionary::Dictionary *myDict = m_ndb->getDictionary();
   if (!myDict) {
     m_error.code= m_ndb->getNdbError().code;
     DBUG_RETURN(-1);
   }
 
-  NdbDictionaryImpl & myDictImpl = NdbDictionaryImpl::getImpl(*myDict);
-
-  int hasSubscriber;
-  int ret = 
-    m_bufferHandle->prepareDropSubscribeEvent(m_bufferId,
-					      hasSubscriber /* return value */);
-
-  if (ret < 0) {
-    m_error.code= 4712;
-    DBUG_RETURN(-1);
-  }
-  //  m_eventImpl->m_bufferId = m_bufferId;
-
-  int r = -1;
-
-  if (hasSubscriber == 0) { // only excute if there's no other subscribers
-    r = myDictImpl.stopSubscribeEvent(*m_eventImpl);
-#ifdef EVENT_DEBUG
-    ndbout_c("NdbEventOperation::stopping() done");
-#endif
-  } else
-    r = 0;
-
-  if (r) {
-    //Error
-    m_bufferHandle->unprepareDropSubscribeEvent(m_bufferId);
-    m_error.code= myDictImpl.m_error.code;
-    m_state = EO_ERROR;
-  } else {
-#ifdef EVENT_DEBUG
-    ndbout_c("NdbEventOperation::dropping()");
-#endif
-    m_bufferHandle->dropSubscribeEvent(m_bufferId);
-    m_state = EO_CREATED;
+  m_ndb->theEventBuffer->add_drop_lock();
+  int r= NdbDictionaryImpl::getImpl(*myDict).stopSubscribeEvent(*this);
+  m_ndb->theEventBuffer->remove_op();
+  m_state= EO_DROPPED;
+  mi_type= 0;
+  if (r == 0) {
+    m_ndb->theEventBuffer->add_drop_unlock();
+    DBUG_RETURN(0);
   }
-
+  //Error
+  m_error.code= NdbDictionaryImpl::getImpl(*myDict).m_error.code;
+  m_state= EO_ERROR;
+  m_ndb->theEventBuffer->add_drop_unlock();
   DBUG_RETURN(r);
 }
 
-bool
-NdbEventOperationImpl::isConsistent()
-{
-  return sdata->isGCIConsistent();
-}
-
-Uint32
+Uint64
 NdbEventOperationImpl::getGCI()
 {
-  return sdata->gci;
+  return m_data_item->sdata->gci;
 }
 
-Uint32
+Uint64
 NdbEventOperationImpl::getLatestGCI()
 {
-  return NdbGlobalEventBufferHandle::getLatestGCI();
+  return m_ndb->theEventBuffer->getLatestGCI();
 }
 
 int
-NdbEventOperationImpl::next(int *pOverrun)
+NdbEventOperationImpl::receive_event()
 {
-  DBUG_ENTER("NdbEventOperationImpl::next");
-  int nr = 10000; // a high value
-  int tmpOverrun = 0;
-  int *ptmpOverrun;
-  if (pOverrun) {
-    ptmpOverrun = &tmpOverrun;
-  } else
-    ptmpOverrun = NULL;
-
-  while (nr > 0) {
-    int r=NdbGlobalEventBufferHandle::getDataL(m_bufferId, sdata,
-					       ptr, pOverrun);
-    if (pOverrun) {
-      tmpOverrun += *pOverrun;
-      *pOverrun = tmpOverrun;
-    }
-
-    if (r <= 0) 
-    {
-      DBUG_RETURN(r); // no data
-    }
+  DBUG_ENTER("NdbEventOperationImpl::receive_event");
 
-    if (r < nr) r = nr; else nr--; // we don't want to be stuck here forever
-  
-#ifdef EVENT_DEBUG
-    ndbout_c("!!!!!!!sdata->operation %u", (Uint32)sdata->operation);
-#endif
+  Uint32 operation= (Uint32)m_data_item->sdata->operation;
+  DBUG_PRINT("info",("sdata->operation %u",operation));
 
-    // now move the data into the RecAttrs
-    if ((theFirstPkAttrs[0] == NULL) && 
-	(theFirstPkAttrs[1] == NULL) &&
-	(theFirstDataAttrs[0] == NULL) && 
-	(theFirstDataAttrs[1] == NULL)) 
-    {
-      DBUG_RETURN(r);
-    }
-    // no copying since no RecAttr's
+  if (unlikely(operation >= NdbDictionary::Event::_TE_FIRST_NON_DATA_EVENT))
+  {
+    DBUG_RETURN(1);
+  }
 
+  // now move the data into the RecAttrs
+    
+  int is_update= operation == NdbDictionary::Event::_TE_UPDATE;
 
-    Uint32 *aAttrPtr = ptr[0].p;
-    Uint32 *aAttrEndPtr = aAttrPtr + ptr[0].sz;
-    Uint32 *aDataPtr = ptr[1].p;
+  Uint32 *aAttrPtr = m_data_item->ptr[0].p;
+  Uint32 *aAttrEndPtr = aAttrPtr + m_data_item->ptr[0].sz;
+  Uint32 *aDataPtr = m_data_item->ptr[1].p;
 
-#ifdef EVENT_DEBUG
-    int i;
-    printf("after values sz=%u\n", ptr[1].sz);
-    for(i=0; i < (int)ptr[1].sz; i++)
-      printf ("H'%.8X ",ptr[1].p[i]);
-    printf("\n");
-    printf("before values sz=%u\n", ptr[2].sz);
-    for(i=0; i < (int)ptr[2].sz; i++)
-      printf ("H'%.8X ",ptr[2].p[i]);
-    printf("\n");
-#endif
+  DBUG_DUMP("after",(char*)m_data_item->ptr[1].p, m_data_item->ptr[1].sz*4);
+  DBUG_DUMP("before",(char*)m_data_item->ptr[2].p, m_data_item->ptr[2].sz*4);
 
-    // copy data into the RecAttr's
-    // we assume that the respective attribute lists are sorted
+  // copy data into the RecAttr's
+  // we assume that the respective attribute lists are sorted
 
-    // first the pk's
+  // first the pk's
+  {
+    NdbRecAttr *tAttr= theFirstPkAttrs[0];
+    NdbRecAttr *tAttr1= theFirstPkAttrs[1];
+    while(tAttr)
     {
-      NdbRecAttr *tAttr= theFirstPkAttrs[0];
-      while(tAttr)
+      assert(aAttrPtr < aAttrEndPtr);
+      unsigned tDataSz= AttributeHeader(*aAttrPtr).getByteSize();
+      assert(tAttr->attrId() ==
+	     AttributeHeader(*aAttrPtr).getAttributeId());
+      receive_data(tAttr, aDataPtr, tDataSz);
+      if (is_update)
       {
-	assert(aAttrPtr < aAttrEndPtr);
-	unsigned tDataSz= AttributeHeader(*aAttrPtr).getDataSize();
-	assert(tAttr->attrId() ==
-	       AttributeHeader(*aAttrPtr).getAttributeId());
-	assert(tAttr->receive_data(aDataPtr, tDataSz));
-        // next
-	aAttrPtr++;
-	aDataPtr+= tDataSz;
-	tAttr= tAttr->next();
+	receive_data(tAttr1, aDataPtr, tDataSz);
+	tAttr1= tAttr1->next();
       }
+      // next
+      aAttrPtr++;
+      aDataPtr+= (tDataSz + 3) >> 2;
+      tAttr= tAttr->next();
     }
-
-    NdbRecAttr *tWorkingRecAttr = theFirstDataAttrs[0];
-
-    Uint32 tRecAttrId;
-    Uint32 tAttrId;
-    Uint32 tDataSz;
-    int hasSomeData=0;
-    while ((aAttrPtr < aAttrEndPtr) && (tWorkingRecAttr != NULL)) {
-      tRecAttrId = tWorkingRecAttr->attrId();
-      tAttrId = AttributeHeader(*aAttrPtr).getAttributeId();
-      tDataSz = AttributeHeader(*aAttrPtr).getDataSize();
-      
-      while (tAttrId > tRecAttrId) {
-	//printf("[%u] %u %u [%u]\n", tAttrId, tDataSz, *aDataPtr, tRecAttrId);
-	tWorkingRecAttr->setUNDEFINED();
-	tWorkingRecAttr = tWorkingRecAttr->next();
-	if (tWorkingRecAttr == NULL)
-	  break;
-	tRecAttrId = tWorkingRecAttr->attrId();
-      }
+  }
+  
+  NdbRecAttr *tWorkingRecAttr = theFirstDataAttrs[0];
+  
+  Uint32 tRecAttrId;
+  Uint32 tAttrId;
+  Uint32 tDataSz;
+  int hasSomeData=0;
+  while ((aAttrPtr < aAttrEndPtr) && (tWorkingRecAttr != NULL)) {
+    tRecAttrId = tWorkingRecAttr->attrId();
+    tAttrId = AttributeHeader(*aAttrPtr).getAttributeId();
+    tDataSz = AttributeHeader(*aAttrPtr).getByteSize();
+    
+    while (tAttrId > tRecAttrId) {
+      DBUG_PRINT("info",("undef [%u] %u 0x%x [%u] 0x%x",
+			 tAttrId, tDataSz, *aDataPtr, tRecAttrId, aDataPtr));
+      tWorkingRecAttr->setUNDEFINED();
+      tWorkingRecAttr = tWorkingRecAttr->next();
       if (tWorkingRecAttr == NULL)
 	break;
+      tRecAttrId = tWorkingRecAttr->attrId();
+    }
+    if (tWorkingRecAttr == NULL)
+      break;
+    
+    if (tAttrId == tRecAttrId) {
+      hasSomeData++;
       
-      //printf("[%u] %u %u [%u]\n", tAttrId, tDataSz, *aDataPtr, tRecAttrId);
+      DBUG_PRINT("info",("set [%u] %u 0x%x [%u] 0x%x",
+			 tAttrId, tDataSz, *aDataPtr, tRecAttrId, aDataPtr));
       
-      if (tAttrId == tRecAttrId) {
-	hasSomeData++;
-	
-	//printf("set!\n");
-	
-	assert(tWorkingRecAttr->receive_data(aDataPtr, tDataSz));
-	tWorkingRecAttr = tWorkingRecAttr->next();
-      }
-      aAttrPtr++;
-      aDataPtr += tDataSz;
+      receive_data(tWorkingRecAttr, aDataPtr, tDataSz);
+      tWorkingRecAttr = tWorkingRecAttr->next();
     }
+    aAttrPtr++;
+    aDataPtr += (tDataSz + 3) >> 2;
+  }
     
-    while (tWorkingRecAttr != NULL) {
-      tRecAttrId = tWorkingRecAttr->attrId();
-      //printf("set undefined [%u] %u %u [%u]\n", tAttrId, tDataSz, *aDataPtr, tRecAttrId);
+  while (tWorkingRecAttr != NULL) {
+    tRecAttrId = tWorkingRecAttr->attrId();
+    //printf("set undefined [%u] %u %u [%u]\n",
+    //       tAttrId, tDataSz, *aDataPtr, tRecAttrId);
+    tWorkingRecAttr->setUNDEFINED();
+    tWorkingRecAttr = tWorkingRecAttr->next();
+  }
+  
+  tWorkingRecAttr = theFirstDataAttrs[1];
+  aDataPtr = m_data_item->ptr[2].p;
+  Uint32 *aDataEndPtr = aDataPtr + m_data_item->ptr[2].sz;
+  while ((aDataPtr < aDataEndPtr) && (tWorkingRecAttr != NULL)) {
+    tRecAttrId = tWorkingRecAttr->attrId();
+    tAttrId = AttributeHeader(*aDataPtr).getAttributeId();
+    tDataSz = AttributeHeader(*aDataPtr).getByteSize();
+    aDataPtr++;
+    while (tAttrId > tRecAttrId) {
       tWorkingRecAttr->setUNDEFINED();
       tWorkingRecAttr = tWorkingRecAttr->next();
-    }
-    
-    tWorkingRecAttr = theFirstDataAttrs[1];
-    aDataPtr = ptr[2].p;
-    Uint32 *aDataEndPtr = aDataPtr + ptr[2].sz;
-    while ((aDataPtr < aDataEndPtr) && (tWorkingRecAttr != NULL)) {
-      tRecAttrId = tWorkingRecAttr->attrId();
-      tAttrId = AttributeHeader(*aDataPtr).getAttributeId();
-      tDataSz = AttributeHeader(*aDataPtr).getDataSize();
-      aDataPtr++;
-      while (tAttrId > tRecAttrId) {
-	tWorkingRecAttr->setUNDEFINED();
-	tWorkingRecAttr = tWorkingRecAttr->next();
-	if (tWorkingRecAttr == NULL)
-	  break;
-	tRecAttrId = tWorkingRecAttr->attrId();
-      }
       if (tWorkingRecAttr == NULL)
 	break;
-      if (tAttrId == tRecAttrId) {
-	assert(!m_eventImpl->m_tableImpl->getColumn(tRecAttrId)->getPrimaryKey());
-	hasSomeData++;
-	
-	assert(tWorkingRecAttr->receive_data(aDataPtr, tDataSz));
-	tWorkingRecAttr = tWorkingRecAttr->next();
-      }
-      aDataPtr += tDataSz;
+      tRecAttrId = tWorkingRecAttr->attrId();
     }
-    while (tWorkingRecAttr != NULL) {
-      tWorkingRecAttr->setUNDEFINED();
+    if (tWorkingRecAttr == NULL)
+      break;
+    if (tAttrId == tRecAttrId) {
+      assert(!m_eventImpl->m_tableImpl->getColumn(tRecAttrId)->getPrimaryKey());
+      hasSomeData++;
+      
+      receive_data(tWorkingRecAttr, aDataPtr, tDataSz);
       tWorkingRecAttr = tWorkingRecAttr->next();
     }
-    
-    if (hasSomeData)
-    {
-      DBUG_RETURN(r);
-    }
+    aDataPtr += (tDataSz + 3) >> 2;
+  }
+  while (tWorkingRecAttr != NULL) {
+    tWorkingRecAttr->setUNDEFINED();
+    tWorkingRecAttr = tWorkingRecAttr->next();
   }
+  
+  if (hasSomeData || !is_update)
+  {
+    DBUG_RETURN(1);
+  }
+
   DBUG_RETURN(0);
 }
 
 NdbDictionary::Event::TableEvent 
 NdbEventOperationImpl::getEventType()
 {
-  switch (sdata->operation) {
-  case TriggerEvent::TE_INSERT:
-    return NdbDictionary::Event::TE_INSERT;
-  case TriggerEvent::TE_DELETE:
-    return NdbDictionary::Event::TE_DELETE;
-  case TriggerEvent::TE_UPDATE:
-    return NdbDictionary::Event::TE_UPDATE;
-  default:
-    return NdbDictionary::Event::TE_ALL;
-  }
+  return (NdbDictionary::Event::TableEvent)
+    (1 << (unsigned)m_data_item->sdata->operation);
 }
 
 
@@ -563,9 +508,9 @@
 void
 NdbEventOperationImpl::printAll()
 {
-  Uint32 *aAttrPtr = ptr[0].p;
-  Uint32 *aAttrEndPtr = aAttrPtr + ptr[0].sz;
-  Uint32 *aDataPtr = ptr[1].p;
+  Uint32 *aAttrPtr = m_data_item->ptr[0].p;
+  Uint32 *aAttrEndPtr = aAttrPtr + m_data_item->ptr[0].sz;
+  Uint32 *aDataPtr = m_data_item->ptr[1].p;
 
   //tRecAttr->setup(tAttrInfo, aValue)) {
 
@@ -580,787 +525,879 @@
   }
 }
 
-
-int NdbEventOperationImpl::wait(void *p, int aMillisecondNumber)
-{
-  return ((NdbGlobalEventBufferHandle*)p)->wait(aMillisecondNumber);
-}
-
 /*
- * Global variable ndbGlobalEventBuffer
- * Class NdbGlobalEventBufferHandle
- * Class NdbGlobalEventBuffer
- *
+ * Class NdbEventBuffer
+ * Each Ndb object has a Object.
  */
 
-#define ADD_DROP_LOCK_GUARDR(TYPE, FN) \
-{ \
-  ndbGlobalEventBuffer->add_drop_lock(); \
-  ndbGlobalEventBuffer->lock(); \
-  TYPE r = ndbGlobalEventBuffer->FN; \
-  ndbGlobalEventBuffer->unlock(); \
-  if (r < 0) { \
-    ndbGlobalEventBuffer->add_drop_unlock(); \
-  } \
-  return r;\
-}
-#define GUARDR(TYPE, FN) \
-{ \
-  ndbGlobalEventBuffer->lock(); \
-  TYPE r = ndbGlobalEventBuffer->FN; \
-  ndbGlobalEventBuffer->unlock(); \
-  return r;\
-}
-#define GUARD(FN) \
-{ \
-  ndbGlobalEventBuffer->lock(); \
-  ndbGlobalEventBuffer->FN; \
-  ndbGlobalEventBuffer->unlock(); \
-}
-#define ADD_DROP_UNLOCK_GUARD(FN) \
-{ \
-  GUARD(FN); \
-  ndbGlobalEventBuffer->add_drop_unlock(); \
-}
-#define GUARDBLOCK(BLOCK) \
-{ \
-  ndbGlobalEventBuffer->lock(); \
-  BLOCK \
-  ndbGlobalEventBuffer->unlock(); \
-}
+// ToDo ref count this so it get's destroyed
+NdbMutex *NdbEventBuffer::p_add_drop_mutex= 0;
 
-/*
- * Global variable ndbGlobalEventBuffer
- *
- */
-
-extern NdbMutex * ndb_global_event_buffer_mutex;
-static NdbGlobalEventBuffer *ndbGlobalEventBuffer=NULL;
-
-/*
- * Class NdbGlobalEventBufferHandle
- * Each Ndb object has a Handle.  This Handle is used to access the
- * global NdbGlobalEventBuffer instance ndbGlobalEventBuffer
- */
-
-NdbGlobalEventBufferHandle *
-NdbGlobalEventBuffer_init(int n) 
-{
-  return new NdbGlobalEventBufferHandle(n);
-  // return NdbGlobalEventBufferHandle::init(n);
-}
-
-void
-NdbGlobalEventBuffer_drop(NdbGlobalEventBufferHandle *h) 
+NdbEventBuffer::NdbEventBuffer(Ndb *ndb) :
+  m_system_nodes(ndb->theImpl->theNoOfDBnodes),
+  m_ndb(ndb),
+  m_latestGCI(0),
+  m_total_alloc(0),
+  m_free_thresh(10),
+  m_min_free_thresh(10),
+  m_max_free_thresh(100),
+  m_gci_slip_thresh(3),
+  m_dropped_ev_op(0),
+  m_active_op_count(0)
 {
-  delete h;
-}
+#ifdef VM_TRACE
+  m_latest_command= "NdbEventBuffer::NdbEventBuffer";
+#endif
 
-NdbGlobalEventBufferHandle::NdbGlobalEventBufferHandle
-(int MAX_NUMBER_ACTIVE_EVENTS) : m_bufferL(0), m_nids(0)
-{
   if ((p_cond = NdbCondition_Create()) ==  NULL) {
-    ndbout_c("NdbGlobalEventBufferHandle: NdbCondition_Create() failed");
+    ndbout_c("NdbEventHandle: NdbCondition_Create() failed");
     exit(-1);
   }
-  
-  NdbMutex_Lock(ndb_global_event_buffer_mutex);
-  if (ndbGlobalEventBuffer == NULL) {
-    if (ndbGlobalEventBuffer == NULL) {
-      ndbGlobalEventBuffer = new NdbGlobalEventBuffer();
-      if (!ndbGlobalEventBuffer) {
-	NdbMutex_Unlock(ndb_global_event_buffer_mutex);
-	ndbout_c("NdbGlobalEventBufferHandle:: failed to allocate ndbGlobalEventBuffer");
-	exit(-1);
-      }
+  m_mutex= ndb->theImpl->theWaiter.m_mutex;
+  lock();
+  if (p_add_drop_mutex == 0)
+  {
+    if ((p_add_drop_mutex = NdbMutex_Create()) == NULL) {
+      ndbout_c("NdbEventBuffer: NdbMutex_Create() failed");
+      exit(-1);
     }
   }
-  NdbMutex_Unlock(ndb_global_event_buffer_mutex);
+  unlock();
+
+  // ToDo set event buffer size
+  // pre allocate event data array
+  m_sz= 0;
+#ifdef VM_TRACE
+  m_free_data_count= 0;
+#endif
+  m_free_data= 0;
+  m_free_data_sz= 0;
 
-  GUARD(real_init(this,MAX_NUMBER_ACTIVE_EVENTS));
+  // initialize lists
+  bzero(&g_empty_gci_container, sizeof(Gci_container));
+  init_gci_containers();
 }
 
-NdbGlobalEventBufferHandle::~NdbGlobalEventBufferHandle()
+NdbEventBuffer::~NdbEventBuffer()
 {
+  // todo lock?  what if receive thread writes here?
+  for (unsigned j= 0; j < m_allocated_data.size(); j++)
+  {
+    unsigned sz= m_allocated_data[j]->sz;
+    EventBufData *data= m_allocated_data[j]->data;
+    EventBufData *end_data= data+sz;
+    for (; data < end_data; data++)
+    {
+      if (data->sdata)
+	NdbMem_Free(data->sdata);
+    }
+    NdbMem_Free((char*)m_allocated_data[j]);
+  }
+
   NdbCondition_Destroy(p_cond);
 
-  ndbGlobalEventBuffer->lock();
-  ndbGlobalEventBuffer->real_remove(this);
-  ndbGlobalEventBuffer->unlock();
-
-  NdbMutex_Lock(ndb_global_event_buffer_mutex);
-  if (ndbGlobalEventBuffer->m_handlers.size() == 0) {
-    delete ndbGlobalEventBuffer;
-    ndbGlobalEventBuffer = NULL;
+  lock();
+  if (p_add_drop_mutex)
+  {
+    NdbMutex_Destroy(p_add_drop_mutex);
+    p_add_drop_mutex = 0;
   }
-  NdbMutex_Unlock(ndb_global_event_buffer_mutex);
+  unlock();
 }
 
 void
-NdbGlobalEventBufferHandle::addBufferId(int bufferId)
+NdbEventBuffer::add_op()
 {
-  DBUG_ENTER("NdbGlobalEventBufferHandle::addBufferId");
-  DBUG_PRINT("enter",("bufferId=%d",bufferId));
-  if (m_nids >= NDB_MAX_ACTIVE_EVENTS) {
-    ndbout_c("NdbGlobalEventBufferHandle::addBufferId error in paramerer setting");
-    exit(-1);
+  if(m_active_op_count == 0)
+  {
+    init_gci_containers();
   }
-  m_bufferIds[m_nids] = bufferId;
-  m_nids++;
-  DBUG_VOID_RETURN;
+  m_active_op_count++;
 }
 
 void
-NdbGlobalEventBufferHandle::dropBufferId(int bufferId)
-{
-  DBUG_ENTER("NdbGlobalEventBufferHandle::dropBufferId");
-  DBUG_PRINT("enter",("bufferId=%d",bufferId));
-  for (int i = 0; i < m_nids; i++)
-    if (m_bufferIds[i] == bufferId) {
-      m_nids--;
-      for (; i < m_nids; i++)
-	m_bufferIds[i] = m_bufferIds[i+1];
-      DBUG_VOID_RETURN;
-    }
-  ndbout_c("NdbGlobalEventBufferHandle::dropBufferId %d does not exist",
-	   bufferId);
-  exit(-1);
-}
-/*
-NdbGlobalEventBufferHandle *
-NdbGlobalEventBufferHandle::init (int MAX_NUMBER_ACTIVE_EVENTS)
+NdbEventBuffer::remove_op()
 {
-  return new NdbGlobalEventBufferHandle();
-}
-void
-NdbGlobalEventBufferHandle::drop(NdbGlobalEventBufferHandle *handle)
-{
-  delete handle;
-}
-*/
-int 
-NdbGlobalEventBufferHandle::prepareAddSubscribeEvent
-(NdbEventOperationImpl *eventOp, int& hasSubscriber)
-{
-  ADD_DROP_LOCK_GUARDR(int,real_prepareAddSubscribeEvent(this, eventOp,
-							 hasSubscriber));
-}
-void
-NdbGlobalEventBufferHandle::addSubscribeEvent
-(int bufferId, NdbEventOperationImpl *ndbEventOperationImpl)
-{
-  ADD_DROP_UNLOCK_GUARD(real_addSubscribeEvent(bufferId, ndbEventOperationImpl));
+  m_active_op_count--;
 }
+
 void
-NdbGlobalEventBufferHandle::unprepareAddSubscribeEvent(int bufferId)
+NdbEventBuffer::init_gci_containers()
 {
-  ADD_DROP_UNLOCK_GUARD(real_unprepareAddSubscribeEvent(bufferId));
+  bzero(&m_complete_data, sizeof(m_complete_data));
+  m_latest_complete_GCI = m_latestGCI = 0;
+  m_active_gci.clear();
+  m_active_gci.fill(2 * ACTIVE_GCI_DIRECTORY_SIZE - 1, g_empty_gci_container);
 }
 
-int 
-NdbGlobalEventBufferHandle::prepareDropSubscribeEvent(int bufferId,
-						     int& hasSubscriber)
+int NdbEventBuffer::expand(unsigned sz)
 {
-  ADD_DROP_LOCK_GUARDR(int,real_prepareDropSubscribeEvent(bufferId, hasSubscriber));
-}
+  unsigned alloc_size=
+    sizeof(EventBufData_chunk) +(sz-1)*sizeof(EventBufData);
+  EventBufData_chunk *chunk_data=
+    (EventBufData_chunk *)NdbMem_Allocate(alloc_size);
 
-void
-NdbGlobalEventBufferHandle::unprepareDropSubscribeEvent(int bufferId)
-{
-  ADD_DROP_UNLOCK_GUARD(real_unprepareDropSubscribeEvent(bufferId));
-}
+  chunk_data->sz= sz;
+  m_allocated_data.push_back(chunk_data);
 
-void 
-NdbGlobalEventBufferHandle::dropSubscribeEvent(int bufferId)
-{
-  ADD_DROP_UNLOCK_GUARD(real_dropSubscribeEvent(bufferId));
-}
+  EventBufData *data= chunk_data->data;
+  EventBufData *end_data= data+sz;
+  EventBufData *last_data= m_free_data;
 
-int 
-NdbGlobalEventBufferHandle::insertDataL(int bufferId,
-					const SubTableData * const sdata,
-					LinearSectionPtr ptr[3])
-{
-  GUARDR(int,real_insertDataL(bufferId,sdata,ptr));
-}
- 
-void
-NdbGlobalEventBufferHandle::latestGCI(int bufferId, Uint32 gci)
-{
-  GUARD(real_latestGCI(bufferId,gci));
-}
- 
-Uint32
-NdbGlobalEventBufferHandle::getLatestGCI()
-{
-  GUARDR(Uint32, real_getLatestGCI());
-}
- 
-inline void
-NdbGlobalEventBufferHandle::group_lock()
-{
-  ndbGlobalEventBuffer->group_lock();
-}
+  bzero((void*)data, sz*sizeof(EventBufData));
+  for (; data < end_data; data++)
+  {
+    data->m_next= last_data;
+    last_data= data;
+  }
+  m_free_data= last_data;
 
-inline void
-NdbGlobalEventBufferHandle::group_unlock()
-{
-  ndbGlobalEventBuffer->group_unlock();
+  m_sz+= sz;
+#ifdef VM_TRACE
+  m_free_data_count+= sz;
+#endif
+  return 0;
 }
 
 int
-NdbGlobalEventBufferHandle::wait(int aMillisecondNumber)
+NdbEventBuffer::pollEvents(int aMillisecondNumber, Uint64 *latestGCI)
 {
-  GUARDR(int, real_wait(this, aMillisecondNumber));
+  int ret= 1;
+#ifdef VM_TRACE
+  const char *m_latest_command_save= m_latest_command;
+  m_latest_command= "NdbEventBuffer::pollEvents";
+#endif
+
+  NdbMutex_Lock(m_mutex);
+  NdbEventOperationImpl *ev_op= move_data();
+  if (unlikely(ev_op == 0))
+  {
+    NdbCondition_WaitTimeout(p_cond, m_mutex, aMillisecondNumber);
+    ev_op= move_data();
+    if (unlikely(ev_op == 0))
+      ret= 0;
+  }
+  if (latestGCI)
+    *latestGCI= m_latestGCI;
+#ifdef VM_TRACE
+  if (ev_op)
+  {
+    // m_mutex is locked
+    // update event ops data counters
+    ev_op->m_data_count-= ev_op->m_data_done_count;
+    ev_op->m_data_done_count= 0;
+  }
+  m_latest_command= m_latest_command_save;
+#endif
+  NdbMutex_Unlock(m_mutex); // we have moved the data
+  return ret;
 }
 
-int NdbGlobalEventBufferHandle::getDataL(const int bufferId,
-					 SubTableData * &sdata,
-					 LinearSectionPtr ptr[3],
-					 int *pOverrun)
+NdbEventOperation *
+NdbEventBuffer::nextEvent()
 {
-  GUARDR(int,real_getDataL(bufferId,sdata,ptr,pOverrun));
-}
+  DBUG_ENTER("NdbEventBuffer::nextEvent");
+#ifdef VM_TRACE
+  const char *m_latest_command_save= m_latest_command;
+#endif
 
-/*
- * Class NdbGlobalEventBuffer
- *
- *
- */
+  if (m_used_data.m_count > 1024)
+  {
+#ifdef VM_TRACE
+    m_latest_command= "NdbEventBuffer::nextEvent (lock)";
+#endif
+    NdbMutex_Lock(m_mutex);
+    // return m_used_data to m_free_data
+    free_list(m_used_data);
+
+    NdbMutex_Unlock(m_mutex);
+  }
+#ifdef VM_TRACE
+  m_latest_command= "NdbEventBuffer::nextEvent";
+#endif
+
+  EventBufData *data;
+  while ((data= m_available_data.m_head))
+  {
+    NdbEventOperationImpl *op= data->m_event_op;
+
+    // set NdbEventOperation data
+    op->m_data_item= data;
 
+    // remove item from m_available_data
+    m_available_data.remove_first();
+
+    // add it to used list
+    m_used_data.append(data);
+
+#ifdef VM_TRACE
+    op->m_data_done_count++;
+#endif
+
+    int r= op->receive_event();
+    if (r > 0)
+    {
+      if (op->m_state == NdbEventOperation::EO_EXECUTING)
+      {
+#ifdef VM_TRACE
+	m_latest_command= m_latest_command_save;
+#endif
+	DBUG_RETURN(op->m_facade);
+      }
+      // the next event belonged to an event op that is no
+      // longer valid, skip to next
+      continue;
+    }
+#ifdef VM_TRACE
+    m_latest_command= m_latest_command_save;
+#endif
+  }
+  m_error.code= 0;
+#ifdef VM_TRACE
+  m_latest_command= m_latest_command_save;
+#endif
+  DBUG_RETURN(0);
+}
 
 void
-NdbGlobalEventBuffer::lock()
+NdbEventBuffer::lock()
 {
-  if (!m_group_lock_flag)
-    NdbMutex_Lock(ndb_global_event_buffer_mutex);
+  NdbMutex_Lock(m_mutex);
 }
 void
-NdbGlobalEventBuffer::unlock()
+NdbEventBuffer::unlock()
 {
-  if (!m_group_lock_flag)
-    NdbMutex_Unlock(ndb_global_event_buffer_mutex);
+  NdbMutex_Unlock(m_mutex);
 }
 void
-NdbGlobalEventBuffer::add_drop_lock()
+NdbEventBuffer::add_drop_lock()
 {
   NdbMutex_Lock(p_add_drop_mutex);
 }
 void
-NdbGlobalEventBuffer::add_drop_unlock()
+NdbEventBuffer::add_drop_unlock()
 {
   NdbMutex_Unlock(p_add_drop_mutex);
 }
-inline void
-NdbGlobalEventBuffer::group_lock()
-{
-  lock();
-  m_group_lock_flag = 1;
-}
 
-inline void
-NdbGlobalEventBuffer::group_unlock()
-{
-  m_group_lock_flag = 0;
-  unlock();
-}
-
-void
-NdbGlobalEventBuffer::lockB(int bufferId)
-{
-  NdbMutex_Lock(m_buf[ID(bufferId)].p_buf_mutex);
-}
-void
-NdbGlobalEventBuffer::unlockB(int bufferId)
-{
-  NdbMutex_Lock(m_buf[ID(bufferId)].p_buf_mutex);
+static
+NdbOut&
+operator<<(NdbOut& out, const Gci_container& gci)
+{
+  out << "[ GCI: " << gci.m_gci
+      << "  state: " << hex << gci.m_state 
+      << "  head: " << hex << gci.m_data.m_head
+      << "  tail: " << hex << gci.m_data.m_tail
+#ifdef VM_TRACE
+      << "  cnt: " << dec << gci.m_data.m_count
+#endif
+      << " gcp: " << dec << gci.m_gcp_complete_rep_count 
+      << "]";
+  return out;
 }
 
-// Private methods
-
-NdbGlobalEventBuffer::NdbGlobalEventBuffer() : 
-  m_handlers(),
-  m_group_lock_flag(0),
-  m_latestGCI(0),
-  m_no(0) // must start at ZERO!
+static
+Gci_container*
+find_bucket_chained(Vector<Gci_container> * active, Uint64 gci)
 {
-  if ((p_add_drop_mutex = NdbMutex_Create()) == NULL) {
-    ndbout_c("NdbGlobalEventBuffer: NdbMutex_Create() failed");
-    exit(-1);
-  }
-}
+  Uint32 pos = (gci & ACTIVE_GCI_MASK);
+  Gci_container *bucket= active->getBase() + pos;
 
-NdbGlobalEventBuffer::~NdbGlobalEventBuffer()
-{
-  NdbMutex_Destroy(p_add_drop_mutex);
-  // NdbMem_Deallocate(m_eventBufferIdToEventId);
-}
-void
-NdbGlobalEventBuffer::real_init (NdbGlobalEventBufferHandle *h,
-				 int MAX_NUMBER_ACTIVE_EVENTS)
-{
-  DBUG_ENTER("NdbGlobalEventBuffer::real_init");
-  DBUG_PRINT("enter",("m_handles.size()=%u %u", m_handlers.size(), h));
-  if (m_handlers.size() == 0)
-  { // First init
-    DBUG_PRINT("info",("first to come"));
-    m_max = MAX_NUMBER_ACTIVE_EVENTS;
-    m_buf = new BufItem[m_max];
-    for (int i=0; i<m_max; i++) {
-      m_buf[i].gId= 0;
-    }
-  }
-  assert(m_max == MAX_NUMBER_ACTIVE_EVENTS);
-  // TODO make sure we don't hit roof
-  m_handlers.push_back(h);
-  DBUG_VOID_RETURN;
-}
-void
-NdbGlobalEventBuffer::real_remove(NdbGlobalEventBufferHandle *h)
-{
-  DBUG_ENTER("NdbGlobalEventBuffer::real_remove");
-  DBUG_PRINT("enter",("m_handles.size()=%u %u", m_handlers.size(), h));
-  for (Uint32 i=0 ; i < m_handlers.size(); i++)
+  if(gci > bucket->m_gci)
   {
-    DBUG_PRINT("info",("m_handlers[%u] %u", i, m_handlers[i]));
-    if (m_handlers[i] == h)
+    Gci_container* move;
+    Uint32 move_pos = pos + ACTIVE_GCI_DIRECTORY_SIZE;
+    do 
     {
-      m_handlers.erase(i);
-      if (m_handlers.size() == 0)
+      active->fill(move_pos, g_empty_gci_container);
+      bucket = active->getBase() + pos; // Needs to recomputed after fill
+      move = active->getBase() + move_pos;
+      if(move->m_gcp_complete_rep_count == 0)
       {
-	DBUG_PRINT("info",("last to go"));
-	delete[] m_buf;
-	m_buf = NULL;
+	memcpy(move, bucket, sizeof(Gci_container));
+	bzero(bucket, sizeof(Gci_container));
+	bucket->m_gci = gci;
+	bucket->m_gcp_complete_rep_count = ~(Uint32)0;
+	return bucket;
       }
-      DBUG_VOID_RETURN;
-    }
+      move_pos += ACTIVE_GCI_DIRECTORY_SIZE;
+    } while(true);
+  }
+  else /** gci < bucket->m_gci */
+  {
+    Uint32 size = active->size() - ACTIVE_GCI_DIRECTORY_SIZE;
+    do 
+    {
+      pos += ACTIVE_GCI_DIRECTORY_SIZE;
+      bucket += ACTIVE_GCI_DIRECTORY_SIZE;
+      
+      if(bucket->m_gci == gci)
+	return bucket;
+      
+    } while(pos < size);
+    
+    return 0;
   }
-  ndbout_c("NdbGlobalEventBuffer::real_remove() non-existing handle");
-  DBUG_PRINT("error",("non-existing handle"));
-  abort();
-  DBUG_VOID_RETURN;
 }
 
-int
-NdbGlobalEventBuffer::real_prepareAddSubscribeEvent
-(NdbGlobalEventBufferHandle *aHandle, NdbEventOperationImpl *eventOp,
- int& hasSubscriber)
+inline
+Gci_container*
+find_bucket(Vector<Gci_container> * active, Uint64 gci)
 {
-  DBUG_ENTER("NdbGlobalEventBuffer::real_prepareAddSubscribeEvent");
-  int i;
-  int bufferId= -1;
-  Uint32 eventId= eventOp->m_eventId;
+  Uint32 pos = (gci & ACTIVE_GCI_MASK);
+  Gci_container *bucket= active->getBase() + pos;
+  if(likely(gci == bucket->m_gci))
+    return bucket;
 
-  DBUG_PRINT("enter",("eventId: %u", eventId));
-  //  add_drop_lock(); // only one thread can do add or drop at a time
+  return find_bucket_chained(active,gci);
+}
 
-  // Find place where eventId already set
-  for (i=0; i<m_no; i++) {
-    if (m_buf[i].gId == eventId) {
-      bufferId= i;
-      break;
-    }
-  }
-  if (bufferId < 0) {
-    // find space for new bufferId
-    for (i=0; i<m_no; i++) {
-      if (m_buf[i].gId == 0) {
-	bufferId= i; // we found an empty spot
-	goto found_bufferId;
-      }
-    }
-    if (bufferId < 0 &&
-	m_no < m_max) {
-      // room for more so get that
-      bufferId= m_no;
-      m_buf[m_no].gId= 0;
-      m_no++;
-    } else {
-       //      add_drop_unlock();
-      DBUG_PRINT("error",("Can't accept more subscribers:"
-			  " bufferId=%d, m_no=%d, m_max=%d",
-			  bufferId, m_no, m_max));
-      DBUG_RETURN(-1);
-    }
+void
+NdbEventBuffer::execSUB_GCP_COMPLETE_REP(const SubGcpCompleteRep * const rep)
+{
+  if (unlikely(m_active_op_count == 0))
+  {
+    return;
   }
-found_bufferId:
-
-  BufItem &b= m_buf[ID(bufferId)];
-
-  if (b.gId == 0) { // first subscriber needs some initialization
-
-    bufferId= NO_ID(0, bufferId);
+  
+  DBUG_ENTER("NdbEventBuffer::execSUB_GCP_COMPLETE_REP");
 
-    b.gId= eventId;
-    b.eventType= (Uint32)eventOp->m_eventImpl->mi_type;
+  const Uint64 gci= rep->gci;
+  const Uint32 cnt= rep->gcp_complete_rep_count;
 
-    if ((b.p_buf_mutex= NdbMutex_Create()) == NULL) {
-      ndbout_c("NdbGlobalEventBuffer: NdbMutex_Create() failed");
-      abort();
-    }
+  Gci_container *bucket = find_bucket(&m_active_gci, gci);
 
-    b.subs= 0;
-    b.f= 0;
-    b.sz= 0;
-    b.max_sz= aHandle->m_bufferL;
-    b.data= 
-      (BufItem::Data *)NdbMem_Allocate(b.max_sz*sizeof(BufItem::Data));
-    for (int i = 0; i < b.max_sz; i++) {
-      b.data[i].sdata= NULL;
-      b.data[i].ptr[0].p= NULL;
-      b.data[i].ptr[1].p= NULL;
-      b.data[i].ptr[2].p= NULL;
-    }
-  } else {
-    DBUG_PRINT("info",
-	       ("TRYING handle one subscriber per event b.subs=%u",b.subs));
-    int ni = -1;
-    for(int i=0; i < b.subs;i++) {
-      if (b.ps[i].theHandle == NULL) {
-	ni = i;
-	break;
-      }
-    }
-    if (ni < 0) {
-      if (b.subs < MAX_SUBSCRIBERS_PER_EVENT) {
-	ni = b.subs;
-      } else {
-	DBUG_PRINT("error",
-		   ("Can't accept more subscribers: b.subs=%d",b.subs));
-	//	add_drop_unlock();
-	DBUG_RETURN(-1);
-      }
+  if (unlikely(bucket == 0))
+  {
+    /**
+     * Already completed GCI...
+     *   Possible in case of resend during NF handling
+     */
+    ndbout << "bucket == 0, gci:" << gci
+	   << " complete: " << m_complete_data << endl;
+    for(Uint32 i = 0; i<m_active_gci.size(); i++)
+    {
+      ndbout << i << " - " << m_active_gci[i] << endl;
     }
-    bufferId = NO_ID(ni, bufferId);
+    DBUG_VOID_RETURN;
   }
 
-  // initialize BufItem::Ps
+  Uint32 old_cnt = bucket->m_gcp_complete_rep_count;
+  if(unlikely(old_cnt == ~(Uint32)0))
   {
-    int n = NO(bufferId);
-    NdbGlobalEventBuffer::BufItem::Ps &e = b.ps[n];
-    e.theHandle = aHandle;
-    e.b=0;
-    e.bufferempty = 1;
-    e.overrun=0; // set to -1 to handle first insert
+    old_cnt = m_system_nodes;
   }
+  
+  assert(old_cnt >= cnt);
+  bucket->m_gcp_complete_rep_count = old_cnt - cnt;
 
-  if (b.subs > 0)
-    hasSubscriber = 1;
-  else
-    hasSubscriber = 0;
-
-  DBUG_PRINT("info",("handed out bufferId=%d for eventId=%d hasSubscriber=%d",
-		     bufferId, eventId, hasSubscriber));
+  if(old_cnt == cnt)
+  {
+    if(likely(gci == m_latestGCI + 1 || m_latestGCI == 0))
+    {
+      m_latestGCI = m_complete_data.m_gci = gci; // before reportStatus
+      if(!bucket->m_data.is_empty())
+      {
+#ifdef VM_TRACE
+	assert(bucket->m_data.m_count);
+#endif
+	m_complete_data.m_data.append(bucket->m_data);
+      }
+      reportStatus();
+      bzero(bucket, sizeof(Gci_container));
+      bucket->m_gci = gci + ACTIVE_GCI_DIRECTORY_SIZE;
+      bucket->m_gcp_complete_rep_count = m_system_nodes;
+      if(unlikely(m_latest_complete_GCI > gci))
+      {
+	complete_outof_order_gcis();
+      }
 
-  /* we now have a lock on the prepare so that no one can mess with this
-   * unlock comes in unprepareAddSubscribeEvent or addSubscribeEvent
-   */
-  DBUG_RETURN(bufferId);
-}
+      // signal that somethings happened
 
-void
-NdbGlobalEventBuffer::real_unprepareAddSubscribeEvent(int bufferId)
-{
-  DBUG_ENTER("NdbGlobalEventBuffer::real_unprepareAddSubscribeEvent");
-  BufItem &b = m_buf[ID(bufferId)];
-  int n = NO(bufferId);
-
-  DBUG_PRINT("enter", ("bufferId=%d,ID(bufferId)=%d,NO(bufferId)=%d",
-		       bufferId, ID(bufferId), NO(bufferId)));
-
-  b.ps[n].theHandle = NULL;
-
-  // remove subscribers from the end,
-  // we have to keep gaps since the position
-  // has been handed out in bufferId
-  for (int i = b.subs-1; i >= 0; i--)
-    if (b.ps[i].theHandle == NULL)
-      b.subs--;
+      NdbCondition_Signal(p_cond);
+    }
     else
-      break;
-
-  if (b.subs == 0) {
-    DBUG_PRINT("info",("no more subscribers left on eventId %d", b.gId));
-    b.gId= 0;  // We don't have any subscribers, reuse BufItem
-    if (b.data) {
-      NdbMem_Free((void *)b.data);
-      b.data = NULL;
-    }
-    if (b.p_buf_mutex) {
-      NdbMutex_Destroy(b.p_buf_mutex);
-      b.p_buf_mutex = NULL;
+    {
+      /** out of order something */
+      ndbout_c("out of order bucket: %d gci: %lld m_latestGCI: %lld", 
+	       bucket-m_active_gci.getBase(), gci, m_latestGCI);
+      bucket->m_state = Gci_container::GC_COMPLETE;
+      bucket->m_gcp_complete_rep_count = 1; // Prevent from being reused
+      m_latest_complete_GCI = gci;
     }
   }
-  //  add_drop_unlock();
+  
   DBUG_VOID_RETURN;
 }
 
 void
-NdbGlobalEventBuffer::real_addSubscribeEvent(int bufferId, 
-					     void *ndbEventOperation)
+NdbEventBuffer::complete_outof_order_gcis()
 {
-  DBUG_ENTER("NdbGlobalEventBuffer::real_addSubscribeEvent");
-  BufItem &b = m_buf[ID(bufferId)];
-  int n = NO(bufferId);
-
-  b.subs++;
-  b.ps[n].theHandle->addBufferId(bufferId);
+  Uint64 start_gci = m_latestGCI + 1;
+  Uint64 stop_gci = m_latest_complete_GCI;
+  
+  const Uint32 size = m_active_gci.size();
+  Gci_container* array= m_active_gci.getBase();
+  
+  ndbout_c("complete_outof_order_gcis");
+  for(Uint32 i = 0; i<size; i++)
+  {
+    ndbout << i << " - " << array[i] << endl;
+  }
+  
+  for(; start_gci <= stop_gci; start_gci++)
+  {
+    /**
+     * Find gci
+     */
+    Uint32 i;
+    Gci_container* bucket= 0;
+    for(i = 0; i<size; i++)
+    {
+      Gci_container* tmp = array + i;
+      if(tmp->m_gci == start_gci && tmp->m_state == Gci_container::GC_COMPLETE)
+      {
+	bucket= tmp;
+	break;
+      }
+    }
+    if(bucket == 0)
+    {
+      break;
+    }
 
-  //  add_drop_unlock();
-  DBUG_PRINT("info",("added bufferId %d", bufferId));
-  DBUG_VOID_RETURN;
+    printf("complete_outof_order_gcis - completing %lld", start_gci);
+    if(!bucket->m_data.is_empty())
+    {
+#ifdef VM_TRACE
+      assert(bucket->m_data.m_count);
+#endif
+      m_complete_data.m_data.append(bucket->m_data);
+#ifdef VM_TRACE
+      ndbout_c(" moved %lld rows -> %lld", bucket->m_data.m_count,
+	       m_complete_data.m_data.m_count);
+#else
+      ndbout_c("");
+#endif
+    }
+    bzero(bucket, sizeof(Gci_container));
+    if(i < ACTIVE_GCI_DIRECTORY_SIZE)
+    {
+      bucket->m_gci = start_gci + ACTIVE_GCI_DIRECTORY_SIZE;
+      bucket->m_gcp_complete_rep_count = m_system_nodes;
+    }
+    
+    m_latestGCI = m_complete_data.m_gci = start_gci;
+  }
+  
+  ndbout_c("complete_outof_order_gcis: m_latestGCI: %lld", m_latestGCI);
 }
 
 void
-NdbGlobalEventBuffer::real_unprepareDropSubscribeEvent(int bufferId)
+NdbEventBuffer::completeClusterFailed()
 {
-  //  add_drop_unlock(); // only one thread can do add or drop at a time
-}
+  DBUG_ENTER("NdbEventBuffer::completeClusterFailed");
 
-int 
-NdbGlobalEventBuffer::real_prepareDropSubscribeEvent(int bufferId,
-						     int& hasSubscriber)
-{
-  DBUG_ENTER("NdbGlobalEventBuffer::real_prepareDropSubscribeEvent");
-  //  add_drop_lock(); // only one thread can do add or drop at a time
+  SubTableData data;
+  LinearSectionPtr ptr[3];
+  bzero(&data, sizeof(data));
+  bzero(ptr, sizeof(ptr));
 
-  BufItem &b = m_buf[ID(bufferId)];
+  data.tableId = ~0;
+  data.operation = NdbDictionary::Event::_TE_CLUSTER_FAILURE;
+  data.logType = SubTableData::LOG;
 
-  int n = 0;
-  for(int i=0; i < b.subs;i++) {
-    if (b.ps[i].theHandle != NULL)
-      n++;
+  /**
+   * Find min not completed GCI
+   */
+  Uint32 sz= m_active_gci.size();
+  Uint64 gci= ~0;
+  Gci_container* bucket = 0;
+  Gci_container* array = m_active_gci.getBase();
+  for(Uint32 i = 0; i<sz; i++)
+  {
+    if(array[i].m_gcp_complete_rep_count && array[i].m_gci < gci)
+    {
+      bucket= array + i;
+      gci = bucket->m_gci;
+    }
   }
 
-  if (n > 1)
-    hasSubscriber = 1;
-  else if (n == 1)
-    hasSubscriber = 0;
-  else
+  if(bucket == 0)
   {
-    DBUG_RETURN(-1);
+    /**
+     * Did not find any not completed GCI's
+     *   lets fake one...
+     */
+    gci = m_latestGCI + 1;
+    bucket = array + ( gci & ACTIVE_GCI_MASK );
+    bucket->m_gcp_complete_rep_count = 1;
   }
+  
+  const Uint32 cnt= bucket->m_gcp_complete_rep_count = 1; 
 
-  DBUG_RETURN(0);
-}
-
-void
-NdbGlobalEventBuffer::real_dropSubscribeEvent(int bufferId)
-{
-  DBUG_ENTER("NdbGlobalEventBuffer::real_dropSubscribeEvent");
-  //  add_drop_lock(); // only one thread can do add-drop at a time
-
-  BufItem &b = m_buf[ID(bufferId)];
-  int n = NO(bufferId);
-
-  b.ps[n].overrun=0;
-  b.ps[n].bufferempty=1;
-  b.ps[n].b=0;
-  b.ps[n].theHandle->dropBufferId(bufferId);
-
-  real_unprepareAddSubscribeEvent(bufferId); // does add_drop_unlock();
-
-#ifdef EVENT_DEBUG
-  ndbout_c("dropSubscribeEvent:: dropped bufferId %d", bufferId);
+  /**
+   * Release all GCI's
+   */
+  for(Uint32 i = 0; i<sz; i++)
+  {
+    Gci_container* tmp = array + i;
+    if(!tmp->m_data.is_empty())
+    {
+      free_list(tmp->m_data);
+#if 0
+      m_free_data_count++;
+      EventBufData* loop= tmp->m_head;
+      while(loop != tmp->m_tail)
+      {
+	m_free_data_count++;
+	loop = loop->m_next;
+      }
 #endif
-  DBUG_VOID_RETURN;
-}
+    }
+    bzero(tmp, sizeof(Gci_container));
+  }
+  
+  bucket->m_gci = gci;
+  bucket->m_gcp_complete_rep_count = cnt;
+  
+  data.gci = gci;
+  
+  /**
+   * Insert this event for each operation
+   */
+  NdbEventOperation* op= 0;
+  while((op = m_ndb->getEventOperation(op)))
+  {
+    NdbEventOperationImpl* impl= &op->m_impl;
+    data.senderData = impl->m_oid;
+    insertDataL(impl, &data, ptr); 
+  }
+  
+  /**
+   * And finally complete this GCI
+   */
+  SubGcpCompleteRep rep;
+  rep.gci= gci;
+  rep.gcp_complete_rep_count= cnt;
+  execSUB_GCP_COMPLETE_REP(&rep);
 
-void
-NdbGlobalEventBuffer::real_latestGCI(int bufferId, Uint32 gci)
-{
-  if (gci > m_latestGCI)
-    m_latestGCI = gci;
-  else if ((m_latestGCI-gci) > 0xffff) // If NDB stays up :-)
-    m_latestGCI = gci;
+  DBUG_VOID_RETURN;
 }
 
-Uint32
-NdbGlobalEventBuffer::real_getLatestGCI()
+Uint64
+NdbEventBuffer::getLatestGCI()
 {
   return m_latestGCI;
 }
 
 int
-NdbGlobalEventBuffer::real_insertDataL(int bufferId, 
-				       const SubTableData * const sdata, 
-				       LinearSectionPtr ptr[3])
+NdbEventBuffer::insertDataL(NdbEventOperationImpl *op,
+			    const SubTableData * const sdata, 
+			    LinearSectionPtr ptr[3])
 {
-  DBUG_ENTER("NdbGlobalEventBuffer::real_insertDataL");
-  BufItem &b = m_buf[ID(bufferId)];
-#ifdef EVENT_DEBUG
-  int n = NO(bufferId);
-#endif
+  DBUG_ENTER("NdbEventBuffer::insertDataL");
 
-  if ( b.eventType & (1 << (Uint32)sdata->operation) )
+  Uint64 gci= sdata->gci;
+  EventBufData *data= m_free_data;
+
+  if ( likely((Uint32)op->mi_type & 1 << (Uint32)sdata->operation) )
   {
-    if (b.subs) {
-#ifdef EVENT_DEBUG
-      ndbout_c("data insertion in buffer %d with eventId %d", bufferId, b.gId);
+    Gci_container* bucket= find_bucket(&m_active_gci, gci);
+      
+    DBUG_PRINT("info", ("data insertion in eventId %d", op->m_eventId));
+
+    if (unlikely(bucket == 0))
+    {
+      /**
+       * Already completed GCI...
+       *   Possible in case of resend during NF handling
+       */
+      DBUG_RETURN(0);
+    }
+
+    if (unlikely(data == 0))
+    {
+#ifdef VM_TRACE
+      assert(m_free_data_count == 0);
+      assert(m_free_data_sz == 0);
 #endif
-      // move front forward
-      if (copy_data_alloc(sdata, ptr,
-			  b.data[b.f].sdata, b.data[b.f].ptr))
+      expand(4000);
+      reportStatus();
+
+      data= m_free_data;
+      if (unlikely(data == 0))
       {
-	DBUG_RETURN(-1);
-      }
-      for (int i=0; i < b.subs; i++) {
-	NdbGlobalEventBuffer::BufItem::Ps &e = b.ps[i];
-	if (e.theHandle) { // active subscriber
-	  if (b.f == e.b) { // next-to-read == written
-	    if (e.bufferempty == 0) {
-	      e.overrun++; // another item has been overwritten
-	      e.b++; // move next-to-read next since old item was overwritten
-	      if (e.b == b.max_sz) e.b= 0; // start from beginning
-	    }
-	  }
-	  e.bufferempty = 0;
-	  // signal subscriber that there's more to get
-	  NdbCondition_Signal(e.theHandle->p_cond);
-	}
+#ifdef VM_TRACE
+	printf("m_latest_command: %s\n", m_latest_command);
+	printf("no free data, m_latestGCI %lld\n",
+	       m_latestGCI);
+	printf("m_free_data_count %d\n", m_free_data_count);
+	printf("m_available_data_count %d first gci %d last gci %d\n",
+	       m_available_data.m_count,
+	       m_available_data.m_head ? m_available_data.m_head->sdata->gci : 0,
+	       m_available_data.m_tail ? m_available_data.m_tail->sdata->gci : 0);
+	printf("m_used_data_count %d\n", m_used_data.m_count);
+#endif
+	op->m_has_error= 2;
+	DBUG_RETURN(-1); // TODO handle this, overrun, or, skip?
       }
-      b.f++; // move next-to-write
-      if (b.f == b.max_sz) b.f = 0; // start from beginning
-#ifdef EVENT_DEBUG
-      ndbout_c("Front= %d Back = %d overun = %d", b.f,
-	       b.ps[n].b, b.ps[n].overrun);
-#endif
-    } else {
-#ifdef EVENT_DEBUG
-      ndbout_c("Data arrived before ready eventId", b.gId);
+    }
+
+    // remove data from free list
+    m_free_data= data->m_next;
+#ifdef VM_TRACE
+    m_free_data_count--;
+    assert(m_free_data_sz >= data->sz);
 #endif
+    m_free_data_sz-= data->sz;
+
+    if (unlikely(copy_data_alloc(sdata, ptr, data)))
+    {
+      op->m_has_error= 3;
+      DBUG_RETURN(-1);
     }
-  }
-  else
-  {
-#ifdef EVENT_DEBUG
-    ndbout_c("skipped");
+
+    // add it to received data
+    bucket->m_data.append(data);
+
+    data->m_event_op= op;
+#ifdef VM_TRACE
+    op->m_data_count++;
 #endif
+    DBUG_RETURN(0);
   }
 
-  DBUG_RETURN(0);
+#ifdef VM_TRACE
+  if ((Uint32)op->m_eventImpl->mi_type & 1 << (Uint32)sdata->operation)
+  {
+    DBUG_PRINT("info",("Data arrived before ready eventId", op->m_eventId));
+    DBUG_RETURN(0);
+  }
+  else {
+    DBUG_PRINT("info",("skipped"));
+    DBUG_RETURN(0);
+  }
+#else
+  return 0;
+#endif
 }
 
-int NdbGlobalEventBuffer::hasData(int bufferId) {
-  DBUG_ENTER("NdbGlobalEventBuffer::hasData");
-  BufItem &b = m_buf[ID(bufferId)];
-  int n = NO(bufferId);
-  NdbGlobalEventBuffer::BufItem::Ps &e = b.ps[n];
-
-  if(e.bufferempty)
+int 
+NdbEventBuffer::copy_data_alloc(const SubTableData * const f_sdata,
+				LinearSectionPtr f_ptr[3],
+				EventBufData *ev_buf)
+{
+  DBUG_ENTER("NdbEventBuffer::copy_data_alloc");
+  const unsigned min_alloc_size= 128;
+  const unsigned sz4= (sizeof(SubTableData)+3)>>2;
+  Uint32 f_ptr_sz_0= f_ptr[0].sz;
+  Uint32 f_ptr_sz_1= f_ptr[1].sz;
+  Uint32 f_ptr_sz_2= f_ptr[2].sz;
+  LinearSectionPtr *t_ptr= ev_buf->ptr;
+  SubTableData *sdata= ev_buf->sdata;
+  const unsigned alloc_size= (sz4 +
+			      f_ptr_sz_0 +
+			      f_ptr_sz_1 +
+			      f_ptr_sz_2) * sizeof(Uint32);
+  Uint32 *ptr;
+  if (alloc_size > min_alloc_size)
   {
-    DBUG_RETURN(0);
+    if (sdata)
+    {
+      NdbMem_Free((char*)sdata);
+#ifdef VM_TRACE
+      assert(m_total_alloc >= ev_buf->sz);
+#endif
+      m_total_alloc-= ev_buf->sz;
+    }
+    ptr= (Uint32*)NdbMem_Allocate(alloc_size);
+    ev_buf->sdata= (SubTableData *)ptr;
+    ev_buf->sz= alloc_size;
+    m_total_alloc+= alloc_size;
+  }
+  else /* alloc_size <= min_alloc_size */
+  {
+    if (sdata)
+      ptr= (Uint32*)sdata;
+    else
+    {
+      ptr= (Uint32*)NdbMem_Allocate(min_alloc_size);
+      ev_buf->sdata= (SubTableData *)ptr;
+      ev_buf->sz= min_alloc_size;
+      m_total_alloc+= min_alloc_size;
+    }
   }
 
-  if (b.f <= e.b)
+  memcpy(ptr,f_sdata,sizeof(SubTableData));
+  ptr+= sz4;
+
+  t_ptr->p= ptr;
+  t_ptr->sz= f_ptr_sz_0;
+
+  memcpy(ptr, f_ptr[0].p, sizeof(Uint32)*f_ptr_sz_0);
+  ptr+= f_ptr_sz_0;
+  t_ptr++;
+
+  t_ptr->p= ptr;
+  t_ptr->sz= f_ptr_sz_1;
+
+  memcpy(ptr, f_ptr[1].p, sizeof(Uint32)*f_ptr_sz_1);
+  ptr+= f_ptr_sz_1;
+  t_ptr++;
+
+  if (f_ptr_sz_2)
   {
-    DBUG_RETURN(b.max_sz-e.b + b.f);
+    t_ptr->p= ptr;
+    t_ptr->sz= f_ptr_sz_2;
+    memcpy(ptr, f_ptr[2].p, sizeof(Uint32)*f_ptr_sz_2);
   }
   else
   {
-    DBUG_RETURN(b.f-e.b);
+    t_ptr->p= 0;
+    t_ptr->sz= 0;
   }
+
+  DBUG_RETURN(0);
 }
 
-int NdbGlobalEventBuffer::real_getDataL(const int bufferId,
-					SubTableData * &sdata,
-					LinearSectionPtr ptr[3],
-					int *pOverrun)
+NdbEventOperationImpl *
+NdbEventBuffer::move_data()
 {
-  DBUG_ENTER("NdbGlobalEventBuffer::real_getDataL");
-  BufItem &b = m_buf[ID(bufferId)];
-  int n = NO(bufferId);
-  NdbGlobalEventBuffer::BufItem::Ps &e = b.ps[n];
+  // handle received data
+  if (!m_complete_data.m_data.is_empty())
+  {
+    // move this list to last in m_available_data
+    m_available_data.append(m_complete_data.m_data);
 
-  if (pOverrun) {
-    *pOverrun = e.overrun;
-    e.overrun = 0; // if pOverrun is returned to user reset e.overrun
+    bzero(&m_complete_data, sizeof(m_complete_data));
   }
 
-  if (e.bufferempty)
+  // handle used data
+  if (!m_used_data.is_empty())
+  {
+    // return m_used_data to m_free_data
+    free_list(m_used_data);
+  }
+  if (!m_available_data.is_empty())
   {
-    DBUG_RETURN(0); // nothing to get
+    DBUG_ENTER("NdbEventBuffer::move_data");
+#ifdef VM_TRACE
+    DBUG_PRINT("exit",("m_available_data_count %u", m_available_data.m_count));
+#endif
+    DBUG_RETURN(m_available_data.m_head->m_event_op);
   }
+  return 0;
+}
+
+void
+NdbEventBuffer::free_list(EventBufData_list &list)
+{
+  // return list to m_free_data
+  list.m_tail->m_next= m_free_data;
+  m_free_data= list.m_head;
+#ifdef VM_TRACE
+  m_free_data_count+= list.m_count;
+#endif
+  m_free_data_sz+= list.m_sz;
 
-  DBUG_PRINT("info",("ID(bufferId) %d NO(bufferId) %d e.b %d",
-		     ID(bufferId), NO(bufferId), e.b));
+  // list returned to m_free_data
+  new (&list) EventBufData_list;
+}
 
-  if (copy_data_alloc(b.data[e.b].sdata, b.data[e.b].ptr,
-		      sdata, ptr))
+NdbEventOperation*
+NdbEventBuffer::createEventOperation(const char* eventName,
+				     NdbError &theError)
+{
+  DBUG_ENTER("NdbEventBuffer::createEventOperation");
+  NdbEventOperation* tOp= new NdbEventOperation(m_ndb, eventName);
+  if (tOp == 0)
   {
-    DBUG_RETURN(-1);
+    theError.code= 4000;
+    DBUG_RETURN(NULL);
+  }
+  if (tOp->getState() != NdbEventOperation::EO_CREATED) {
+    theError.code= tOp->getNdbError().code;
+    delete tOp;
+    DBUG_RETURN(NULL);
   }
+  DBUG_RETURN(tOp);
+}
 
-  e.b++; if (e.b == b.max_sz) e.b= 0; // move next-to-read forward
+void
+NdbEventBuffer::dropEventOperation(NdbEventOperation* tOp)
+{
+  NdbEventOperationImpl* op= getEventOperationImpl(tOp);
 
-  if (b.f == e.b) // back has cought up with front
-    e.bufferempty = 1;
+  op->stop();
 
-#ifdef EVENT_DEBUG
-  ndbout_c("getting data from buffer %d with eventId %d", bufferId, b.gId);
-#endif
+  op->m_next= m_dropped_ev_op;
+  op->m_prev= 0;
+  if (m_dropped_ev_op)
+    m_dropped_ev_op->m_prev= op;
+  m_dropped_ev_op= op;
 
-  DBUG_RETURN(hasData(bufferId)+1);
-}
-int 
-NdbGlobalEventBuffer::copy_data_alloc(const SubTableData * const f_sdata,
-				      LinearSectionPtr f_ptr[3],
-				      SubTableData * &t_sdata,
-				      LinearSectionPtr t_ptr[3])
-{
-  DBUG_ENTER("NdbGlobalEventBuffer::copy_data_alloc");
-  unsigned sz4= (sizeof(SubTableData)+3)>>2;
-  Uint32 *ptr= (Uint32*)NdbMem_Allocate((sz4 +
-					 f_ptr[0].sz +
-					 f_ptr[1].sz +
-					 f_ptr[2].sz) * sizeof(Uint32));
-  if (t_sdata)
-    NdbMem_Free((char*)t_sdata);
-  t_sdata= (SubTableData *)ptr;
-  memcpy(t_sdata,f_sdata,sizeof(SubTableData));
-  ptr+= sz4;
+  // ToDo, take care of these to be deleted at the
+  // appropriate time, after we are sure that there
+  // are _no_ more events coming
 
-  for (int i = 0; i < 3; i++) {
-    LinearSectionPtr & f_p = f_ptr[i];
-    LinearSectionPtr & t_p = t_ptr[i];
-    if (f_p.sz > 0) {
-      t_p.p= (Uint32 *)ptr;
-      memcpy(t_p.p, f_p.p, sizeof(Uint32)*f_p.sz);
-      ptr+= f_p.sz;
-      t_p.sz= f_p.sz;
-    } else {
-      t_p.p= NULL;
-      t_p.sz= 0;
-    }
-  }
-  DBUG_RETURN(0);
+  //  delete tOp;
 }
-int
-NdbGlobalEventBuffer::real_wait(NdbGlobalEventBufferHandle *h,
-				int aMillisecondNumber)
+
+void
+NdbEventBuffer::reportStatus()
 {
-  DBUG_ENTER("NdbGlobalEventBuffer::real_wait");
-  // check if there are anything in any of the buffers
-  int i;
-  int n = 0;
-  for (i = 0; i < h->m_nids; i++)
-    n += hasData(h->m_bufferIds[i]);
-  if (n) 
+  EventBufData *apply_buf= m_available_data.m_head;
+  Uint64 apply_gci, latest_gci= m_latestGCI;
+  if (apply_buf == 0)
+    apply_buf= m_complete_data.m_data.m_head;
+  if (apply_buf)
+    apply_gci= apply_buf->sdata->gci;
+  else
+    apply_gci= latest_gci;
+
+  if (100*m_free_data_sz < m_min_free_thresh*m_total_alloc &&
+      m_total_alloc > 1024*1024)
   {
-    DBUG_RETURN(n);
+    /* report less free buffer than m_free_thresh,
+       next report when more free than 2 * m_free_thresh
+    */
+    m_min_free_thresh= 0;
+    m_max_free_thresh= 2 * m_free_thresh;
+    goto send_report;
   }
-
-  int r = NdbCondition_WaitTimeout(h->p_cond, ndb_global_event_buffer_mutex,
-				   aMillisecondNumber);
-  if (r > 0)
+  
+  if (100*m_free_data_sz > m_max_free_thresh*m_total_alloc &&
+      m_total_alloc > 1024*1024)
   {
-    DBUG_RETURN(-1);
+    /* report more free than 2 * m_free_thresh
+       next report when less free than m_free_thresh
+    */
+    m_min_free_thresh= m_free_thresh;
+    m_max_free_thresh= 100;
+    goto send_report;
+ }
+  if (latest_gci-apply_gci >=  m_gci_slip_thresh)
+  {
+    goto send_report;
   }
+  return;
 
-  n = 0;
-  for (i = 0; i < h->m_nids; i++)
-    n += hasData(h->m_bufferIds[i]);
-  DBUG_RETURN(n);
+send_report:
+  Uint32 data[8];
+  data[0]= NDB_LE_EventBufferStatus;
+  data[1]= m_total_alloc-m_free_data_sz;
+  data[2]= m_total_alloc;
+  data[3]= 0;
+  data[4]= apply_gci & ~(Uint32)0;
+  data[5]= apply_gci >> 32;
+  data[6]= latest_gci & ~(Uint32)0;
+  data[7]= latest_gci >> 32;
+  m_ndb->theImpl->send_event_report(data,8);
+#ifdef VM_TRACE
+  assert(m_total_alloc >= m_free_data_sz);
+#endif
 }
 
-template class Vector<NdbGlobalEventBufferHandle*>;
+template class Vector<Gci_container>;
+template class Vector<NdbEventBuffer::EventBufData_chunk*>;

--- 1.19.6.1/ndb/test/ndbapi/Makefile.am	2005-12-27 16:33:52 +01:00
+++ 1.26/storage/ndb/test/ndbapi/Makefile.am	2005-12-27 18:28:35 +01:00
@@ -34,9 +34,7 @@
 testPartitioning \
 testBitfield \
 DbCreate DbAsyncGenerator \
-test_event_multi_table \
-testSRBank \
-test_event_merge
+testSRBank
 
 #flexTimedAsynch
 #testBlobs
@@ -79,14 +77,12 @@
 testBitfield_SOURCES = testBitfield.cpp
 DbCreate_SOURCES = bench/mainPopulate.cpp bench/dbPopulate.cpp bench/userInterface.cpp bench/dbPopulate.h bench/userInterface.h bench/testData.h bench/testDefinitions.h bench/ndb_schema.hpp bench/ndb_error.hpp
 DbAsyncGenerator_SOURCES = bench/mainAsyncGenerator.cpp bench/asyncGenerator.cpp bench/ndb_async2.cpp bench/dbGenerator.h bench/macros.h bench/userInterface.h bench/testData.h bench/testDefinitions.h bench/ndb_schema.hpp bench/ndb_error.hpp
-test_event_multi_table_SOURCES = test_event_multi_table.cpp
 testSRBank_SOURCES = testSRBank.cpp
-test_event_merge_SOURCES = test_event_merge.cpp
 
-INCLUDES_LOC = -I$(top_srcdir)/ndb/include/kernel
+INCLUDES_LOC = -I$(top_srcdir)/storage/ndb/include/kernel
 
-include $(top_srcdir)/ndb/config/common.mk.am
-include $(top_srcdir)/ndb/config/type_ndbapitest.mk.am
+include $(top_srcdir)/storage/ndb/config/common.mk.am
+include $(top_srcdir)/storage/ndb/config/type_ndbapitest.mk.am
 
 ##testDict_INCLUDES = $(INCLUDES) -I$(top_srcdir)/ndb/include/kernel
 ##testIndex_INCLUDES = $(INCLUDES) -I$(top_srcdir)/ndb/include/kernel
@@ -104,62 +100,61 @@
              testScan.dsp
 
 flexBench.dsp: Makefile \
-               $(top_srcdir)/ndb/config/win-prg.am \
-               $(top_srcdir)/ndb/config/win-name \
-               $(top_srcdir)/ndb/config/win-includes \
-               $(top_srcdir)/ndb/config/win-sources \
-               $(top_srcdir)/ndb/config/win-libraries
-	cat $(top_srcdir)/ndb/config/win-prg.am > $@
-	@$(top_srcdir)/ndb/config/win-name $@ flexBench
-	@$(top_srcdir)/ndb/config/win-includes $@ $(INCLUDES)
-	@$(top_srcdir)/ndb/config/win-sources $@ $(flexBench_SOURCES)
-	@$(top_srcdir)/ndb/config/win-libraries $@ LINK $(LDADD)
+               $(top_srcdir)/storage/ndb/config/win-prg.am \
+               $(top_srcdir)/storage/ndb/config/win-name \
+               $(top_srcdir)/storage/ndb/config/win-includes \
+               $(top_srcdir)/storage/ndb/config/win-sources \
+               $(top_srcdir)/storage/ndb/config/win-libraries
+	cat $(top_srcdir)/storage/ndb/config/win-prg.am > $@
+	@$(top_srcdir)/storage/ndb/config/win-name $@ flexBench
+	@$(top_srcdir)/storage/ndb/config/win-includes $@ $(INCLUDES)
+	@$(top_srcdir)/storage/ndb/config/win-sources $@ $(flexBench_SOURCES)
+	@$(top_srcdir)/storage/ndb/config/win-libraries $@ LINK $(LDADD)
 
 testBasic.dsp: Makefile \
-               $(top_srcdir)/ndb/config/win-prg.am \
-               $(top_srcdir)/ndb/config/win-name \
-               $(top_srcdir)/ndb/config/win-includes \
-               $(top_srcdir)/ndb/config/win-sources \
-               $(top_srcdir)/ndb/config/win-libraries
-	cat $(top_srcdir)/ndb/config/win-prg.am > $@
-	@$(top_srcdir)/ndb/config/win-name $@ testBasic
-	@$(top_srcdir)/ndb/config/win-includes $@ $(INCLUDES)
-	@$(top_srcdir)/ndb/config/win-sources $@ $(testBasic_SOURCES)
-	@$(top_srcdir)/ndb/config/win-libraries $@ LINK $(LDADD)
+               $(top_srcdir)/storage/ndb/config/win-prg.am \
+               $(top_srcdir)/storage/ndb/config/win-name \
+               $(top_srcdir)/storage/ndb/config/win-includes \
+               $(top_srcdir)/storage/ndb/config/win-sources \
+               $(top_srcdir)/storage/ndb/config/win-libraries
+	cat $(top_srcdir)/storage/ndb/config/win-prg.am > $@
+	@$(top_srcdir)/storage/ndb/config/win-name $@ testBasic
+	@$(top_srcdir)/storage/ndb/config/win-includes $@ $(INCLUDES)
+	@$(top_srcdir)/storage/ndb/config/win-sources $@ $(testBasic_SOURCES)
+	@$(top_srcdir)/storage/ndb/config/win-libraries $@ LINK $(LDADD)
 
 testOIBasic.dsp: Makefile \
-               $(top_srcdir)/ndb/config/win-prg.am \
-               $(top_srcdir)/ndb/config/win-name \
-               $(top_srcdir)/ndb/config/win-includes \
-               $(top_srcdir)/ndb/config/win-sources \
-               $(top_srcdir)/ndb/config/win-libraries
-	cat $(top_srcdir)/ndb/config/win-prg.am > $@
-	@$(top_srcdir)/ndb/config/win-name $@ testOIBasic
-	@$(top_srcdir)/ndb/config/win-includes $@ $(INCLUDES)
-	@$(top_srcdir)/ndb/config/win-sources $@ $(testOIBasic_SOURCES)
-	@$(top_srcdir)/ndb/config/win-libraries $@ LINK $(LDADD)
+               $(top_srcdir)/storage/ndb/config/win-prg.am \
+               $(top_srcdir)/storage/ndb/config/win-name \
+               $(top_srcdir)/storage/ndb/config/win-includes \
+               $(top_srcdir)/storage/ndb/config/win-sources \
+               $(top_srcdir)/storage/ndb/config/win-libraries
+	cat $(top_srcdir)/storage/ndb/config/win-prg.am > $@
+	@$(top_srcdir)/storage/ndb/config/win-name $@ testOIBasic
+	@$(top_srcdir)/storage/ndb/config/win-includes $@ $(INCLUDES)
+	@$(top_srcdir)/storage/ndb/config/win-sources $@ $(testOIBasic_SOURCES)
+	@$(top_srcdir)/storage/ndb/config/win-libraries $@ LINK $(LDADD)
 
 testBlobs.dsp: Makefile \
-               $(top_srcdir)/ndb/config/win-prg.am \
-               $(top_srcdir)/ndb/config/win-name \
-               $(top_srcdir)/ndb/config/win-includes \
-               $(top_srcdir)/ndb/config/win-sources \
-               $(top_srcdir)/ndb/config/win-libraries
-	cat $(top_srcdir)/ndb/config/win-prg.am > $@
-	@$(top_srcdir)/ndb/config/win-name $@ testBlobs
-	@$(top_srcdir)/ndb/config/win-includes $@ $(INCLUDES)
-	@$(top_srcdir)/ndb/config/win-sources $@ $(testBlobs_SOURCES)
-	@$(top_srcdir)/ndb/config/win-libraries $@ LINK $(LDADD)
+               $(top_srcdir)/storage/ndb/config/win-prg.am \
+               $(top_srcdir)/storage/ndb/config/win-name \
+               $(top_srcdir)/storage/ndb/config/win-includes \
+               $(top_srcdir)/storage/ndb/config/win-sources \
+               $(top_srcdir)/storage/ndb/config/win-libraries
+	cat $(top_srcdir)/storage/ndb/config/win-prg.am > $@
+	@$(top_srcdir)/storage/ndb/config/win-name $@ testBlobs
+	@$(top_srcdir)/storage/ndb/config/win-includes $@ $(INCLUDES)
+	@$(top_srcdir)/storage/ndb/config/win-sources $@ $(testBlobs_SOURCES)
+	@$(top_srcdir)/storage/ndb/config/win-libraries $@ LINK $(LDADD)
 
 testScan.dsp: Makefile \
-               $(top_srcdir)/ndb/config/win-prg.am \
-               $(top_srcdir)/ndb/config/win-name \
-               $(top_srcdir)/ndb/config/win-includes \
-               $(top_srcdir)/ndb/config/win-sources \
-               $(top_srcdir)/ndb/config/win-libraries
-	cat $(top_srcdir)/ndb/config/win-prg.am > $@
-	@$(top_srcdir)/ndb/config/win-name $@ testScan
-	@$(top_srcdir)/ndb/config/win-includes $@ $(INCLUDES)
-	@$(top_srcdir)/ndb/config/win-sources $@ $(testScan_SOURCES)
-	@$(top_srcdir)/ndb/config/win-libraries $@ LINK $(LDADD)
-
+               $(top_srcdir)/storage/ndb/config/win-prg.am \
+               $(top_srcdir)/storage/ndb/config/win-name \
+               $(top_srcdir)/storage/ndb/config/win-includes \
+               $(top_srcdir)/storage/ndb/config/win-sources \
+               $(top_srcdir)/storage/ndb/config/win-libraries
+	cat $(top_srcdir)/storage/ndb/config/win-prg.am > $@
+	@$(top_srcdir)/storage/ndb/config/win-name $@ testScan
+	@$(top_srcdir)/storage/ndb/config/win-includes $@ $(INCLUDES)
+	@$(top_srcdir)/storage/ndb/config/win-sources $@ $(testScan_SOURCES)
+	@$(top_srcdir)/storage/ndb/config/win-libraries $@ LINK $(LDADD)
Thread
bk commit into 5.1 tree (pekka:1.1998)pekka27 Dec