List:Commits« Previous MessageNext Message »
From:tomas Date:April 11 2007 6:30pm
Subject:bk commit into 5.1 tree (tomas:1.2576)
View as plain text  
Below is the list of changes that have just been committed into a local
5.1 repository of tomas. When tomas does a push these changes will
be propagated to the main repository and, within 24 hours after the
push, to the public repository.
For information on how to access the public repository
see http://dev.mysql.com/doc/mysql/en/installing-source-tree.html

ChangeSet@stripped, 2007-04-11 20:30:23+02:00, tomas@stripped +3 -0
  Merge whalegate.ndb.mysql.com:/home/tomas/mysql-5.0-ndb
  into  whalegate.ndb.mysql.com:/home/tomas/mysql-5.1-single-user
  MERGE: 1.1810.2374.122

  storage/ndb/src/kernel/blocks/dbtup/DbtupCommit.cpp@stripped, 2007-04-11 20:30:11+02:00, tomas@stripped +0 -6
    null merge
    MERGE: 1.2.7.2

  storage/ndb/src/kernel/blocks/dbtup/DbtupCommit.cpp@stripped, 2007-04-11 20:30:02+02:00, tomas@stripped +0 -0
    Merge rename: ndb/src/kernel/blocks/dbtup/DbtupCommit.cpp -> storage/ndb/src/kernel/blocks/dbtup/DbtupCommit.cpp

  storage/ndb/src/ndbapi/Ndb.cpp@stripped, 2007-04-11 20:30:02+02:00, tomas@stripped +0 -0
    Auto merged
    MERGE: 1.49.26.2

  storage/ndb/src/ndbapi/Ndb.cpp@stripped, 2007-04-11 20:30:02+02:00, tomas@stripped +0 -0
    Merge rename: ndb/src/ndbapi/Ndb.cpp -> storage/ndb/src/ndbapi/Ndb.cpp

  storage/ndb/src/ndbapi/Ndbif.cpp@stripped, 2007-04-11 20:30:02+02:00, tomas@stripped +0 -0
    Auto merged
    MERGE: 1.27.12.2

  storage/ndb/src/ndbapi/Ndbif.cpp@stripped, 2007-04-11 20:30:02+02:00, tomas@stripped +0 -0
    Merge rename: ndb/src/ndbapi/Ndbif.cpp -> storage/ndb/src/ndbapi/Ndbif.cpp

# This is a BitKeeper patch.  What follows are the unified diffs for the
# set of deltas contained in the patch.  The rest of the patch, the part
# that BitKeeper cares about, is below these diffs.
# User:	tomas
# Host:	whalegate.ndb.mysql.com
# Root:	/home/tomas/mysql-5.1-single-user/RESYNC

--- 1.2.7.1/ndb/src/kernel/blocks/dbtup/DbtupCommit.cpp	2007-04-11 08:28:07 +02:00
+++ 1.25/storage/ndb/src/kernel/blocks/dbtup/DbtupCommit.cpp	2007-04-11 20:30:11 +02:00
@@ -14,252 +14,422 @@
    Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA */
 
 #define DBTUP_C
+#define DBTUP_COMMIT_CPP
 #include "Dbtup.hpp"
 #include <RefConvert.hpp>
 #include <ndb_limits.h>
 #include <pc.hpp>
 #include <signaldata/TupCommit.hpp>
+#include "../dblqh/Dblqh.hpp"
 
-#define ljam() { jamLine(5000 + __LINE__); }
-#define ljamEntry() { jamEntryLine(5000 + __LINE__); }
+void Dbtup::execTUP_DEALLOCREQ(Signal* signal)
+{
+  TablerecPtr regTabPtr;
+  FragrecordPtr regFragPtr;
+  Uint32 frag_page_id, frag_id;
+
+  jamEntry();
+
+  frag_id= signal->theData[0];
+  regTabPtr.i= signal->theData[1];
+  frag_page_id= signal->theData[2];
+  Uint32 page_index= signal->theData[3];
+
+  ptrCheckGuard(regTabPtr, cnoOfTablerec, tablerec);
+  
+  getFragmentrec(regFragPtr, frag_id, regTabPtr.p);
+  ndbassert(regFragPtr.p != NULL);
+  
+  if (! (((frag_page_id << MAX_TUPLES_BITS) + page_index) == ~ (Uint32) 0))
+  {
+    Local_key tmp;
+    tmp.m_page_no= getRealpid(regFragPtr.p, frag_page_id); 
+    tmp.m_page_idx= page_index;
+    
+    PagePtr pagePtr;
+    Tuple_header* ptr= (Tuple_header*)get_ptr(&pagePtr, &tmp, regTabPtr.p);
+
+    ndbassert(ptr->m_header_bits & Tuple_header::FREE);
+
+    if (ptr->m_header_bits & Tuple_header::LCP_KEEP)
+    {
+      ndbassert(! (ptr->m_header_bits & Tuple_header::FREED));
+      ptr->m_header_bits |= Tuple_header::FREED;
+      return;
+    }
+    
+    if (regTabPtr.p->m_attributes[MM].m_no_of_varsize)
+    {
+      jam();
+      free_var_rec(regFragPtr.p, regTabPtr.p, &tmp, pagePtr);
+    } else {
+      free_fix_rec(regFragPtr.p, regTabPtr.p, &tmp, (Fix_page*)pagePtr.p);
+    }
+  }
+}
 
 void Dbtup::execTUP_WRITELOG_REQ(Signal* signal)
 {
   jamEntry();
   OperationrecPtr loopOpPtr;
-  loopOpPtr.i = signal->theData[0];
-  Uint32 gci = signal->theData[1];
-  ptrCheckGuard(loopOpPtr, cnoOfOprec, operationrec);
-  while (loopOpPtr.p->nextActiveOp != RNIL) {
-    ljam();
-    loopOpPtr.i = loopOpPtr.p->nextActiveOp;
-    ptrCheckGuard(loopOpPtr, cnoOfOprec, operationrec);
-  }//while
+  loopOpPtr.i= signal->theData[0];
+  Uint32 gci= signal->theData[1];
+  c_operation_pool.getPtr(loopOpPtr);
+  while (loopOpPtr.p->prevActiveOp != RNIL) {
+    jam();
+    loopOpPtr.i= loopOpPtr.p->prevActiveOp;
+    c_operation_pool.getPtr(loopOpPtr);
+  }
   do {
-    Uint32 blockNo = refToBlock(loopOpPtr.p->userblockref);
-    ndbrequire(loopOpPtr.p->transstate == STARTED);
-    signal->theData[0] = loopOpPtr.p->userpointer;
-    signal->theData[1] = gci;
-    if (loopOpPtr.p->prevActiveOp == RNIL) {
-      ljam();
-      EXECUTE_DIRECT(blockNo, GSN_LQH_WRITELOG_REQ, signal, 2);
+    ndbrequire(get_trans_state(loopOpPtr.p) == TRANS_STARTED);
+    signal->theData[0]= loopOpPtr.p->userpointer;
+    signal->theData[1]= gci;
+    if (loopOpPtr.p->nextActiveOp == RNIL) {
+      jam();
+      EXECUTE_DIRECT(DBLQH, GSN_LQH_WRITELOG_REQ, signal, 2);
       return;
-    }//if
-    ljam();
-    EXECUTE_DIRECT(blockNo, GSN_LQH_WRITELOG_REQ, signal, 2);
+    }
+    jam();
+    EXECUTE_DIRECT(DBLQH, GSN_LQH_WRITELOG_REQ, signal, 2);
     jamEntry();
-    loopOpPtr.i = loopOpPtr.p->prevActiveOp;
-    ptrCheckGuard(loopOpPtr, cnoOfOprec, operationrec);
+    loopOpPtr.i= loopOpPtr.p->nextActiveOp;
+    c_operation_pool.getPtr(loopOpPtr);
   } while (true);
-}//Dbtup::execTUP_WRITELOG_REQ()
+}
 
-void Dbtup::execTUP_DEALLOCREQ(Signal* signal)
+void Dbtup::removeActiveOpList(Operationrec*  const regOperPtr,
+                               Tuple_header *tuple_ptr)
 {
-  TablerecPtr regTabPtr;
-  FragrecordPtr regFragPtr;
+  OperationrecPtr raoOperPtr;
 
-  jamEntry();
+  /**
+   * Release copy tuple
+   */
+  if(regOperPtr->op_struct.op_type != ZDELETE && 
+     !regOperPtr->m_copy_tuple_location.isNull())
+    c_undo_buffer.free_copy_tuple(&regOperPtr->m_copy_tuple_location);
+  
+  if (regOperPtr->op_struct.in_active_list) {
+    regOperPtr->op_struct.in_active_list= false;
+    if (regOperPtr->nextActiveOp != RNIL) {
+      jam();
+      raoOperPtr.i= regOperPtr->nextActiveOp;
+      c_operation_pool.getPtr(raoOperPtr);
+      raoOperPtr.p->prevActiveOp= regOperPtr->prevActiveOp;
+    } else {
+      jam();
+      tuple_ptr->m_operation_ptr_i = regOperPtr->prevActiveOp;
+    }
+    if (regOperPtr->prevActiveOp != RNIL) {
+      jam();
+      raoOperPtr.i= regOperPtr->prevActiveOp;
+      c_operation_pool.getPtr(raoOperPtr);
+      raoOperPtr.p->nextActiveOp= regOperPtr->nextActiveOp;
+    }
+    regOperPtr->prevActiveOp= RNIL;
+    regOperPtr->nextActiveOp= RNIL;
+  }
+}
 
-  Uint32 fragId = signal->theData[0];
-  regTabPtr.i = signal->theData[1];
-  Uint32 fragPageId = signal->theData[2];
-  Uint32 pageIndex = signal->theData[3];
+/* ---------------------------------------------------------------- */
+/* INITIALIZATION OF ONE CONNECTION RECORD TO PREPARE FOR NEXT OP.  */
+/* ---------------------------------------------------------------- */
+void Dbtup::initOpConnection(Operationrec* regOperPtr)
+{
+  set_tuple_state(regOperPtr, TUPLE_ALREADY_ABORTED);
+  set_trans_state(regOperPtr, TRANS_IDLE);
+  regOperPtr->currentAttrinbufLen= 0;
+  regOperPtr->op_struct.op_type= ZREAD;
+  regOperPtr->op_struct.m_disk_preallocated= 0;
+  regOperPtr->op_struct.m_load_diskpage_on_commit= 0;
+  regOperPtr->op_struct.m_wait_log_buffer= 0;
+  regOperPtr->m_undo_buffer_space= 0;
+}
 
-  ptrCheckGuard(regTabPtr, cnoOfTablerec, tablerec);
-  getFragmentrec(regFragPtr, fragId, regTabPtr.p);
-  ndbrequire(regFragPtr.p != NULL);
+static
+inline
+bool
+operator>(const Local_key& key1, const Local_key& key2)
+{
+  return key1.m_page_no > key2.m_page_no ||
+    (key1.m_page_no == key2.m_page_no && key1.m_page_idx > key2.m_page_idx);
+}
 
-  PagePtr pagePtr;
-  pagePtr.i = getRealpid(regFragPtr.p, fragPageId);
-  ptrCheckGuard(pagePtr, cnoOfPage, page);
-  Uint32 pageIndexScaled = pageIndex >> 1;
-  ndbrequire((pageIndex & 1) == 0);
-  Uint32 pageOffset = ZPAGE_HEADER_SIZE + 
-                     (regTabPtr.p->tupheadsize * pageIndexScaled);
-//---------------------------------------------------
-/* --- Deallocate a tuple as requested by ACC  --- */
-//---------------------------------------------------
-  if (isUndoLoggingNeeded(regFragPtr.p, fragPageId)) {
-    ljam();
-    cprAddUndoLogRecord(signal,
-                        ZLCPR_TYPE_INSERT_TH,
-                        fragPageId,
-                        pageIndex,
-                        regTabPtr.i,
-                        fragId,
-                        regFragPtr.p->checkpointVersion);
-    cprAddData(signal,
-               regFragPtr.p,
-               pagePtr.i,
-               regTabPtr.p->tupheadsize,
-               pageOffset);
-  }//if
-  {
-    freeTh(regFragPtr.p,
-           regTabPtr.p,
-           signal,
-           pagePtr.p,
-           pageOffset);
+void
+Dbtup::dealloc_tuple(Signal* signal,
+		     Uint32 gci,
+		     Page* page,
+		     Tuple_header* ptr, 
+		     Operationrec* regOperPtr, 
+		     Fragrecord* regFragPtr, 
+		     Tablerec* regTabPtr)
+{
+  Uint32 lcpScan_ptr_i= regFragPtr->m_lcp_scan_op;
+  Uint32 lcp_keep_list = regFragPtr->m_lcp_keep_list;
+
+  Uint32 bits = ptr->m_header_bits;
+  Uint32 extra_bits = Tuple_header::FREED;
+  if (bits & Tuple_header::DISK_PART)
+  {
+    Local_key disk;
+    memcpy(&disk, ptr->get_disk_ref_ptr(regTabPtr), sizeof(disk));
+    PagePtr tmpptr;
+    tmpptr.i = m_pgman.m_ptr.i;
+    tmpptr.p = reinterpret_cast<Page*>(m_pgman.m_ptr.p);
+    disk_page_free(signal, regTabPtr, regFragPtr, 
+		   &disk, tmpptr, gci);
+  }
+  
+  if (! (bits & Tuple_header::LCP_SKIP) && lcpScan_ptr_i != RNIL)
+  {
+    ScanOpPtr scanOp;
+    c_scanOpPool.getPtr(scanOp, lcpScan_ptr_i);
+    Local_key rowid = regOperPtr->m_tuple_location;
+    Local_key scanpos = scanOp.p->m_scanPos.m_key;
+    rowid.m_page_no = page->frag_page_id;
+    if (rowid > scanpos)
+    {
+      extra_bits = Tuple_header::LCP_KEEP; // Note REMOVE FREE
+      ptr->m_operation_ptr_i = lcp_keep_list;
+      regFragPtr->m_lcp_keep_list = rowid.ref();
+    }
+  }
+  
+  ptr->m_header_bits = bits | extra_bits;
+  
+  if (regTabPtr->m_bits & Tablerec::TR_RowGCI)
+  {
+    jam();
+    * ptr->get_mm_gci(regTabPtr) = gci;
   }
 }
 
-/* ---------------------------------------------------------------- */
-/* ------------ PERFORM A COMMIT ON AN UPDATE OPERATION  ---------- */
-/* ---------------------------------------------------------------- */
-void Dbtup::commitUpdate(Signal* signal,
-                         Operationrec*  const regOperPtr,
-                         Fragrecord* const regFragPtr,
-                         Tablerec* const regTabPtr)
-{
-  if (regOperPtr->realPageIdC != RNIL) {
-    if (isUndoLoggingNeeded(regFragPtr, regOperPtr->fragPageIdC)) {
-/* ------------------------------------------------------------------------ */
-/* IF THE COPY WAS CREATED WITHIN THIS CHECKPOINT WE ONLY HAVE              */
-/* TO LOG THE CREATION OF THE COPY. IF HOWEVER IT WAS CREATED BEFORE  SAVE  */
-/* THIS CHECKPOINT, WE HAVE TO THE DATA AS WELL.                            */
-/* ------------------------------------------------------------------------ */
-      if (regOperPtr->undoLogged) {
-        ljam();
-        cprAddUndoLogRecord(signal,
-                            ZLCPR_TYPE_INSERT_TH_NO_DATA,
-                            regOperPtr->fragPageIdC,
-                            regOperPtr->pageIndexC,
-                            regOperPtr->tableRef,
-                            regOperPtr->fragId,
-                            regFragPtr->checkpointVersion);
-      } else {
-        ljam();
-        cprAddUndoLogRecord(signal,
-                            ZLCPR_TYPE_INSERT_TH,
-                            regOperPtr->fragPageIdC,
-                            regOperPtr->pageIndexC,
-                            regOperPtr->tableRef,
-                            regOperPtr->fragId,
-                            regFragPtr->checkpointVersion);
-        cprAddData(signal,
-                   regFragPtr,
-                   regOperPtr->realPageIdC,
-                   regTabPtr->tupheadsize,
-                   regOperPtr->pageOffsetC);
-      }//if
-    }//if
-
-    PagePtr copyPagePtr;
-    copyPagePtr.i = regOperPtr->realPageIdC;
-    ptrCheckGuard(copyPagePtr, cnoOfPage, page);
-    freeTh(regFragPtr,
-           regTabPtr,
-           signal,
-           copyPagePtr.p,
-           (Uint32)regOperPtr->pageOffsetC);
-    regOperPtr->realPageIdC = RNIL;
-    regOperPtr->fragPageIdC = RNIL;
-    regOperPtr->pageOffsetC = ZNIL;
-    regOperPtr->pageIndexC = ZNIL;
-  }//if
-}//Dbtup::commitUpdate()
+void
+Dbtup::commit_operation(Signal* signal,
+			Uint32 gci,
+			Tuple_header* tuple_ptr, 
+			PagePtr pagePtr,
+			Operationrec* regOperPtr, 
+			Fragrecord* regFragPtr, 
+			Tablerec* regTabPtr)
+{
+  ndbassert(regOperPtr->op_struct.op_type != ZDELETE);
+  
+  Uint32 lcpScan_ptr_i= regFragPtr->m_lcp_scan_op;
+  Uint32 save= tuple_ptr->m_operation_ptr_i;
+  Uint32 bits= tuple_ptr->m_header_bits;
+
+  Tuple_header *disk_ptr= 0;
+  Tuple_header *copy= (Tuple_header*)
+    c_undo_buffer.get_ptr(&regOperPtr->m_copy_tuple_location);
+  
+  Uint32 copy_bits= copy->m_header_bits;
+
+  Uint32 fixsize= regTabPtr->m_offsets[MM].m_fix_header_size;
+  Uint32 mm_vars= regTabPtr->m_attributes[MM].m_no_of_varsize;
+  if(mm_vars == 0)
+  {
+    memcpy(tuple_ptr, copy, 4*fixsize);
+    disk_ptr= (Tuple_header*)(((Uint32*)copy)+fixsize);
+  }
+  else
+  {
+    Var_part_ref *ref= (Var_part_ref*)tuple_ptr->get_var_part_ptr(regTabPtr);
+    memcpy(tuple_ptr, copy, 4*(Tuple_header::HeaderSize+fixsize));
+    
+    Local_key tmp; 
+    ref->copyout(&tmp);
+
+    PagePtr vpagePtr;
+    Uint32 *dst= get_ptr(&vpagePtr, *ref);
+    Var_page* vpagePtrP = (Var_page*)vpagePtr.p;
+    Uint32 *src= copy->get_var_part_ptr(regTabPtr);
+    Uint32 sz= ((mm_vars + 1) << 1) + (((Uint16*)src)[mm_vars]);
+    ndbassert(4*vpagePtrP->get_entry_len(tmp.m_page_idx) >= sz);
+    memcpy(dst, src, sz);
+
+    copy_bits |= Tuple_header::CHAINED_ROW;
+    
+    if(copy_bits & Tuple_header::MM_SHRINK)
+    {
+      vpagePtrP->shrink_entry(tmp.m_page_idx, (sz + 3) >> 2);
+      update_free_page_list(regFragPtr, vpagePtr);
+    } 
+    
+    disk_ptr = (Tuple_header*)
+      (((Uint32*)copy)+Tuple_header::HeaderSize+fixsize+((sz + 3) >> 2));
+  } 
+  
+  if (regTabPtr->m_no_of_disk_attributes &&
+      (copy_bits & Tuple_header::DISK_INLINE))
+  {
+    Local_key key;
+    memcpy(&key, copy->get_disk_ref_ptr(regTabPtr), sizeof(Local_key));
+    Uint32 logfile_group_id= regFragPtr->m_logfile_group_id;
+
+    PagePtr diskPagePtr = *(PagePtr*)&m_pgman.m_ptr;
+    ndbassert(diskPagePtr.p->m_page_no == key.m_page_no);
+    ndbassert(diskPagePtr.p->m_file_no == key.m_file_no);
+    Uint32 sz, *dst;
+    if(copy_bits & Tuple_header::DISK_ALLOC)
+    {
+      disk_page_alloc(signal, regTabPtr, regFragPtr, &key, diskPagePtr, gci);
+    }
+    
+    if(regTabPtr->m_attributes[DD].m_no_of_varsize == 0)
+    {
+      sz= regTabPtr->m_offsets[DD].m_fix_header_size;
+      dst= ((Fix_page*)diskPagePtr.p)->get_ptr(key.m_page_idx, sz);
+    }
+    else
+    {
+      dst= ((Var_page*)diskPagePtr.p)->get_ptr(key.m_page_idx);
+      sz= ((Var_page*)diskPagePtr.p)->get_entry_len(key.m_page_idx);
+    }
+    
+    if(! (copy_bits & Tuple_header::DISK_ALLOC))
+    {
+      disk_page_undo_update(diskPagePtr.p, 
+			    &key, dst, sz, gci, logfile_group_id);
+    }
+    
+    memcpy(dst, disk_ptr, 4*sz);
+    memcpy(tuple_ptr->get_disk_ref_ptr(regTabPtr), &key, sizeof(Local_key));
+    
+    ndbassert(! (disk_ptr->m_header_bits & Tuple_header::FREE));
+    copy_bits |= Tuple_header::DISK_PART;
+  }
+  
+  if(lcpScan_ptr_i != RNIL && (bits & Tuple_header::ALLOC))
+  {
+    ScanOpPtr scanOp;
+    c_scanOpPool.getPtr(scanOp, lcpScan_ptr_i);
+    Local_key rowid = regOperPtr->m_tuple_location;
+    Local_key scanpos = scanOp.p->m_scanPos.m_key;
+    rowid.m_page_no = pagePtr.p->frag_page_id;
+    if(rowid > scanpos)
+    {
+       copy_bits |= Tuple_header::LCP_SKIP;
+    }
+  }
+  
+  Uint32 clear= 
+    Tuple_header::ALLOC | Tuple_header::FREE |
+    Tuple_header::DISK_ALLOC | Tuple_header::DISK_INLINE | 
+    Tuple_header::MM_SHRINK | Tuple_header::MM_GROWN;
+  copy_bits &= ~(Uint32)clear;
+  
+  tuple_ptr->m_header_bits= copy_bits;
+  tuple_ptr->m_operation_ptr_i= save;
+  
+  if (regTabPtr->m_bits & Tablerec::TR_RowGCI)
+  {
+    jam();
+    * tuple_ptr->get_mm_gci(regTabPtr) = gci;
+  }
+  
+  if (regTabPtr->m_bits & Tablerec::TR_Checksum) {
+    jam();
+    setChecksum(tuple_ptr, regTabPtr);
+  }
+}
 
 void
-Dbtup::commitSimple(Signal* signal,
-                    Operationrec* const regOperPtr,
-                    Fragrecord* const regFragPtr,
-                    Tablerec* const regTabPtr)
-{
-  operPtr.p = regOperPtr;
-  fragptr.p = regFragPtr;
-  tabptr.p = regTabPtr;
-
-  // Checking detached triggers
-  checkDetachedTriggers(signal,
-                        regOperPtr,
-                        regTabPtr);
-
-  removeActiveOpList(regOperPtr);
-  if (regOperPtr->optype == ZUPDATE) {
-    ljam();
-    commitUpdate(signal, regOperPtr, regFragPtr, regTabPtr);
-    if (regTabPtr->GCPIndicator) {
-      updateGcpId(signal, regOperPtr, regFragPtr, regTabPtr);
-    }//if
-  } else if (regOperPtr->optype == ZINSERT) {
-    ljam();
-    if (regTabPtr->GCPIndicator) {
-      updateGcpId(signal, regOperPtr, regFragPtr, regTabPtr);
-    }//if
-  } else {
-    ndbrequire(regOperPtr->optype == ZDELETE);
-  }//if
-}//Dbtup::commitSimple()
-
-void Dbtup::removeActiveOpList(Operationrec*  const regOperPtr)
-{
-  if (regOperPtr->inActiveOpList == ZTRUE) {
-    OperationrecPtr raoOperPtr;
-    regOperPtr->inActiveOpList = ZFALSE;
-    if (regOperPtr->prevActiveOp != RNIL) {
-      ljam();
-      raoOperPtr.i = regOperPtr->prevActiveOp;
-      ptrCheckGuard(raoOperPtr, cnoOfOprec, operationrec);
-      raoOperPtr.p->nextActiveOp = regOperPtr->nextActiveOp;
-    } else {
-      ljam();
-      PagePtr pagePtr;
-      pagePtr.i = regOperPtr->realPageId;
-      ptrCheckGuard(pagePtr, cnoOfPage, page);
-      ndbrequire(regOperPtr->pageOffset < ZWORDS_ON_PAGE);
-      pagePtr.p->pageWord[regOperPtr->pageOffset] = regOperPtr->nextActiveOp;
-    }//if
-    if (regOperPtr->nextActiveOp != RNIL) {
-      ljam();
-      raoOperPtr.i = regOperPtr->nextActiveOp;
-      ptrCheckGuard(raoOperPtr, cnoOfOprec, operationrec);
-      raoOperPtr.p->prevActiveOp = regOperPtr->prevActiveOp;
-    }//if
-    regOperPtr->prevActiveOp = RNIL;
-    regOperPtr->nextActiveOp = RNIL;
-  }//if
-}//Dbtup::removeActiveOpList()
+Dbtup::disk_page_commit_callback(Signal* signal, 
+				 Uint32 opPtrI, Uint32 page_id)
+{
+  Uint32 hash_value;
+  Uint32 gci;
+  OperationrecPtr regOperPtr;
 
-/* ---------------------------------------------------------------- */
-/* INITIALIZATION OF ONE CONNECTION RECORD TO PREPARE FOR NEXT OP.  */
-/* ---------------------------------------------------------------- */
-void Dbtup::initOpConnection(Operationrec* regOperPtr,
-			     Fragrecord * fragPtrP)
+  jamEntry();
+  
+  c_operation_pool.getPtr(regOperPtr, opPtrI);
+  c_lqh->get_op_info(regOperPtr.p->userpointer, &hash_value, &gci);
+
+  TupCommitReq * const tupCommitReq= (TupCommitReq *)signal->getDataPtr();
+  
+  tupCommitReq->opPtr= opPtrI;
+  tupCommitReq->hashValue= hash_value;
+  tupCommitReq->gci= gci;
+
+  regOperPtr.p->op_struct.m_load_diskpage_on_commit= 0;
+  regOperPtr.p->m_commit_disk_callback_page= page_id;
+  m_global_page_pool.getPtr(m_pgman.m_ptr, page_id);
+  
+  {
+    PagePtr tmp;
+    tmp.i = m_pgman.m_ptr.i;
+    tmp.p = reinterpret_cast<Page*>(m_pgman.m_ptr.p);
+    disk_page_set_dirty(tmp);
+  }
+  
+  execTUP_COMMITREQ(signal);
+  if(signal->theData[0] == 0)
+    c_lqh->tupcommit_conf_callback(signal, regOperPtr.p->userpointer);
+}
+
+void
+Dbtup::disk_page_log_buffer_callback(Signal* signal, 
+				     Uint32 opPtrI,
+				     Uint32 unused)
 {
-  Uint32 RinFragList = regOperPtr->inFragList;
-  regOperPtr->transstate = IDLE;
-  regOperPtr->currentAttrinbufLen = 0;
-  regOperPtr->optype = ZREAD;
-  if (RinFragList == ZTRUE) {
-    OperationrecPtr tropNextLinkPtr;
-    OperationrecPtr tropPrevLinkPtr;
-/*----------------------------------------------------------------- */
-/*       TO ENSURE THAT WE HAVE SUCCESSFUL ABORTS OF FOLLOWING      */
-/*       OPERATIONS WHICH NEVER STARTED WE SET THE OPTYPE TO READ.  */
-/*----------------------------------------------------------------- */
-/*       REMOVE IT FROM THE DOUBLY LINKED LIST ON THE FRAGMENT      */
-/*----------------------------------------------------------------- */
-    tropPrevLinkPtr.i = regOperPtr->prevOprecInList;
-    tropNextLinkPtr.i = regOperPtr->nextOprecInList;
-    regOperPtr->inFragList = ZFALSE;
-    if (tropPrevLinkPtr.i == RNIL) {
-      ljam();
-      fragPtrP->firstusedOprec = tropNextLinkPtr.i;
-    } else {
-      ljam();
-      ptrCheckGuard(tropPrevLinkPtr, cnoOfOprec, operationrec);
-      tropPrevLinkPtr.p->nextOprecInList = tropNextLinkPtr.i;
-    }//if
-    if (tropNextLinkPtr.i == RNIL) {
-      fragPtrP->lastusedOprec = tropPrevLinkPtr.i;
-    } else {
-      ptrCheckGuard(tropNextLinkPtr, cnoOfOprec, operationrec);
-      tropNextLinkPtr.p->prevOprecInList = tropPrevLinkPtr.i;
-    }
-    regOperPtr->prevOprecInList = RNIL;
-    regOperPtr->nextOprecInList = RNIL;
-  }//if
-}//Dbtup::initOpConnection()
+  Uint32 hash_value;
+  Uint32 gci;
+  OperationrecPtr regOperPtr;
+
+  jamEntry();
+  
+  c_operation_pool.getPtr(regOperPtr, opPtrI);
+  c_lqh->get_op_info(regOperPtr.p->userpointer, &hash_value, &gci);
+
+  TupCommitReq * const tupCommitReq= (TupCommitReq *)signal->getDataPtr();
+  
+  tupCommitReq->opPtr= opPtrI;
+  tupCommitReq->hashValue= hash_value;
+  tupCommitReq->gci= gci;
+
+  Uint32 page= regOperPtr.p->m_commit_disk_callback_page;
+  ndbassert(regOperPtr.p->op_struct.m_load_diskpage_on_commit == 0);
+  regOperPtr.p->op_struct.m_wait_log_buffer= 0;
+  m_global_page_pool.getPtr(m_pgman.m_ptr, page);
+  
+  execTUP_COMMITREQ(signal);
+  ndbassert(signal->theData[0] == 0);
+  
+  c_lqh->tupcommit_conf_callback(signal, regOperPtr.p->userpointer);
+}
+
+void
+Dbtup::fix_commit_order(OperationrecPtr opPtr)
+{
+  ndbassert(!opPtr.p->is_first_operation());
+  OperationrecPtr firstPtr = opPtr;
+  while(firstPtr.p->prevActiveOp != RNIL)
+  {
+    firstPtr.i = firstPtr.p->prevActiveOp;
+    c_operation_pool.getPtr(firstPtr);    
+  }
+
+  ndbout_c("fix_commit_order (swapping %d and %d)",
+	   opPtr.i, firstPtr.i);
+  
+  /**
+   * Swap data between first and curr
+   */
+  Uint32 prev= opPtr.p->prevActiveOp;
+  Uint32 next= opPtr.p->nextActiveOp;
+  Uint32 seco= firstPtr.p->nextActiveOp;
+
+  Operationrec tmp = *opPtr.p;
+  * opPtr.p = * firstPtr.p;
+  * firstPtr.p = tmp;
+
+  c_operation_pool.getPtr(seco)->prevActiveOp = opPtr.i;
+  c_operation_pool.getPtr(prev)->nextActiveOp = firstPtr.i;
+  if(next != RNIL)
+    c_operation_pool.getPtr(next)->prevActiveOp = firstPtr.i;
+}
 
 /* ----------------------------------------------------------------- */
 /* --------------- COMMIT THIS PART OF A TRANSACTION --------------- */
@@ -269,327 +439,269 @@
   FragrecordPtr regFragPtr;
   OperationrecPtr regOperPtr;
   TablerecPtr regTabPtr;
+  KeyReqStruct req_struct;
+  TransState trans_state;
+  Uint32 no_of_fragrec, no_of_tablerec, hash_value, gci;
 
-  TupCommitReq * const tupCommitReq = (TupCommitReq *)signal->getDataPtr();
-
-  ljamEntry();
-  regOperPtr.i = tupCommitReq->opPtr;
-  ptrCheckGuard(regOperPtr, cnoOfOprec, operationrec);
-
-  ndbrequire(regOperPtr.p->transstate == STARTED);
-  regOperPtr.p->gci = tupCommitReq->gci;
-  regOperPtr.p->hashValue = tupCommitReq->hashValue;
+  TupCommitReq * const tupCommitReq= (TupCommitReq *)signal->getDataPtr();
 
-  regFragPtr.i = regOperPtr.p->fragmentPtr;
-  ptrCheckGuard(regFragPtr, cnoOfFragrec, fragrecord);
-
-  regTabPtr.i = regOperPtr.p->tableRef;
-  ptrCheckGuard(regTabPtr, cnoOfTablerec, tablerec);
-
-  if (!regTabPtr.p->tuxCustomTriggers.isEmpty()) {
-    ljam();
-    executeTuxCommitTriggers(signal,
-                             regOperPtr.p,
-                             regTabPtr.p);
-  }
-
-  if (regOperPtr.p->tupleState == NO_OTHER_OP) {
-    if ((regOperPtr.p->prevActiveOp == RNIL) &&
-        (regOperPtr.p->nextActiveOp == RNIL)) {
-      ljam();
-/* ---------------------------------------------------------- */
-// We handle the simple case separately as an optimisation
-/* ---------------------------------------------------------- */
-      commitSimple(signal,
-                   regOperPtr.p,
-                   regFragPtr.p,
-                   regTabPtr.p);
-    } else {
-/* ---------------------------------------------------------- */
-// This is the first commit message of this record in this
-// transaction. We will commit this record completely for this
-// transaction. If there are other operations they will be
-// responsible to release their own resources. Also commit of
-// a delete is postponed until the last operation is committed
-// on the tuple.
-//
-// As part of this commitRecord we will also handle detached
-// triggers and release of resources for this operation.
-/* ---------------------------------------------------------- */
-      ljam();
-      commitRecord(signal,
-                   regOperPtr.p,
-                   regFragPtr.p,
-                   regTabPtr.p);
-      removeActiveOpList(regOperPtr.p);
-    }//if
-  } else {
-    ljam();
-/* ---------------------------------------------------------- */
-// Release any copy tuples
-/* ---------------------------------------------------------- */
-    ndbrequire(regOperPtr.p->tupleState == TO_BE_COMMITTED);
-    commitUpdate(signal, regOperPtr.p, regFragPtr.p, regTabPtr.p);
-    removeActiveOpList(regOperPtr.p);
-  }//if
-  initOpConnection(regOperPtr.p, regFragPtr.p);
-}//execTUP_COMMITREQ()
-
-void
-Dbtup::updateGcpId(Signal* signal,
-                   Operationrec* const regOperPtr,
-                   Fragrecord* const regFragPtr,
-                   Tablerec* const regTabPtr)
-{
-  PagePtr pagePtr;
-  ljam();
-//--------------------------------------------------------------------
-// Is this code safe for UNDO logging. Not sure currently. RONM
-//--------------------------------------------------------------------
-  pagePtr.i = regOperPtr->realPageId;
-  ptrCheckGuard(pagePtr, cnoOfPage, page);
-  Uint32 temp = regOperPtr->pageOffset + regTabPtr->tupGCPIndex;
-  ndbrequire((temp < ZWORDS_ON_PAGE) &&
-             (regTabPtr->tupGCPIndex < regTabPtr->tupheadsize));
-  if (isUndoLoggingNeeded(regFragPtr, regOperPtr->fragPageId)) {
-    Uint32 prevGCI = pagePtr.p->pageWord[temp];
-    ljam();
-    cprAddUndoLogRecord(signal,
-                        ZLCPR_TYPE_UPDATE_GCI,
-                        regOperPtr->fragPageId,
-                        regOperPtr->pageIndex,
-                        regOperPtr->tableRef,
-                        regOperPtr->fragId,
-                        regFragPtr->checkpointVersion);
-    cprAddGCIUpdate(signal,
-                    prevGCI,
-                    regFragPtr);
-  }//if
-  pagePtr.p->pageWord[temp] = regOperPtr->gci;
-  if (regTabPtr->checksumIndicator) {
-    ljam();
-    setChecksum(pagePtr.p, regOperPtr->pageOffset, regTabPtr->tupheadsize);
-  }//if
-}//Dbtup::updateGcpId()
+  regOperPtr.i= tupCommitReq->opPtr;
+  jamEntry();
 
-void
-Dbtup::commitRecord(Signal* signal,
-                    Operationrec* const regOperPtr,
-                    Fragrecord* const regFragPtr,
-                    Tablerec* const regTabPtr)
-{
-  Uint32 opType;
-  OperationrecPtr firstOpPtr;
-  PagePtr pagePtr;
-
-  pagePtr.i = regOperPtr->realPageId;
-  ptrCheckGuard(pagePtr, cnoOfPage, page);
-
-  setTupleStatesSetOpType(regOperPtr, pagePtr.p, opType, firstOpPtr);
-
-  fragptr.p = regFragPtr;
-  tabptr.p = regTabPtr;
-  Uint32 hashValue = firstOpPtr.p->hashValue;
-
-  if (opType == ZINSERT_DELETE) {
-    ljam();
-//--------------------------------------------------------------------
-// We started by inserting the tuple and ended by deleting. Seen from
-// transactions point of view no changes were made.
-//--------------------------------------------------------------------
-    commitUpdate(signal, regOperPtr, regFragPtr, regTabPtr);
-    return;
-  } else if (opType == ZINSERT) {
-    ljam();
-//--------------------------------------------------------------------
-// We started by inserting whereafter we made several changes to the
-// tuple that could include updates, deletes and new inserts. The final
-// state of the tuple is the original tuple. This is reached from this
-// operation. We change the optype on this operation to ZINSERT to
-// ensure proper operation of the detached trigger.
-// We restore the optype after executing triggers although not really
-// needed.
-//--------------------------------------------------------------------
-    Uint32 saveOpType = regOperPtr->optype;
-    regOperPtr->optype = ZINSERT;
-    regOperPtr->hashValue = hashValue;
-    operPtr.p = regOperPtr;
-
-    checkDetachedTriggers(signal,
-                          regOperPtr,
-                          regTabPtr);
-
-    regOperPtr->optype = saveOpType;
-  } else if (opType == ZUPDATE) {
-    ljam();
-//--------------------------------------------------------------------
-// We want to use the first operation which contains a copy tuple
-// reference. This operation contains the before value of this record
-// for this transaction. Then this operation is used for executing
-// triggers with optype set to update.
-//--------------------------------------------------------------------
-    OperationrecPtr befOpPtr;
-    findBeforeValueOperation(befOpPtr, firstOpPtr);
-
-    Uint32 saveOpType = befOpPtr.p->optype;
-    Bitmask<MAXNROFATTRIBUTESINWORDS> attributeMask;
-    Bitmask<MAXNROFATTRIBUTESINWORDS> saveAttributeMask;
-
-    calculateChangeMask(pagePtr.p,
-                        regTabPtr,
-                        befOpPtr.p->pageOffset,
-                        attributeMask);
-
-    saveAttributeMask.clear();
-    saveAttributeMask.bitOR(befOpPtr.p->changeMask);
-    befOpPtr.p->changeMask.clear();
-    befOpPtr.p->changeMask.bitOR(attributeMask);
-    befOpPtr.p->gci = regOperPtr->gci;
-    befOpPtr.p->optype = ZUPDATE;
-    befOpPtr.p->hashValue = hashValue;
-    
-    befOpPtr.p->optype = opType;
-    operPtr.p = befOpPtr.p;
-    checkDetachedTriggers(signal,
-                          befOpPtr.p,
-                          regTabPtr);
-
-    befOpPtr.p->changeMask.clear();
-    befOpPtr.p->changeMask.bitOR(saveAttributeMask);
-
-    befOpPtr.p->optype = saveOpType;
-  } else if (opType == ZDELETE) {
-    ljam();
-//--------------------------------------------------------------------
-// We want to use the first operation which contains a copy tuple.
-// We benefit from the fact that we know that it cannot be a simple
-// delete and it cannot be an insert followed by a delete. Thus there
-// must either be an update or a insert following a delete. In both
-// cases we will find a before value in a copy tuple.
-//
-// An added complexity is that the trigger handling assumes that the
-// before value is located in the original tuple so we have to move the
-// copy tuple reference to the original tuple reference and afterwards
-// restore it again.
-//--------------------------------------------------------------------
-    OperationrecPtr befOpPtr;
-    findBeforeValueOperation(befOpPtr, firstOpPtr);
-    Uint32 saveOpType = befOpPtr.p->optype;
-
-    Uint32 realPageId = befOpPtr.p->realPageId;
-    Uint32 pageOffset = befOpPtr.p->pageOffset;
-    Uint32 fragPageId = befOpPtr.p->fragPageId;
-    Uint32 pageIndex  = befOpPtr.p->pageIndex;
-
-    befOpPtr.p->optype = ZDELETE;
-    befOpPtr.p->realPageId = befOpPtr.p->realPageIdC;
-    befOpPtr.p->pageOffset = befOpPtr.p->pageOffsetC;
-    befOpPtr.p->fragPageId = befOpPtr.p->fragPageIdC;
-    befOpPtr.p->pageIndex  = befOpPtr.p->pageIndexC;
-    befOpPtr.p->gci = regOperPtr->gci;
-    befOpPtr.p->hashValue = hashValue;
-
-    befOpPtr.p->optype = opType;
-    operPtr.p = befOpPtr.p;
-    checkDetachedTriggers(signal,
-                          befOpPtr.p,
-                          regTabPtr);
-
-    befOpPtr.p->realPageId = realPageId;
-    befOpPtr.p->pageOffset = pageOffset;
-    befOpPtr.p->fragPageId = fragPageId;
-    befOpPtr.p->pageIndex  = pageIndex;
-    befOpPtr.p->optype     = saveOpType;
-  } else {
-    ndbrequire(false);
-  }//if
+  c_operation_pool.getPtr(regOperPtr);
+  if(!regOperPtr.p->is_first_operation())
+  {
+    /**
+     * Out of order commit   XXX check effect on triggers
+     */
+    fix_commit_order(regOperPtr);
+  }
+  ndbassert(regOperPtr.p->is_first_operation());
+  
+  regFragPtr.i= regOperPtr.p->fragmentPtr;
+  trans_state= get_trans_state(regOperPtr.p);
+
+  no_of_fragrec= cnoOfFragrec;
+
+  ndbrequire(trans_state == TRANS_STARTED);
+  ptrCheckGuard(regFragPtr, no_of_fragrec, fragrecord);
+
+  no_of_tablerec= cnoOfTablerec;
+  regTabPtr.i= regFragPtr.p->fragTableId;
+  hash_value= tupCommitReq->hashValue;
+  gci= tupCommitReq->gci;
+
+  req_struct.signal= signal;
+  req_struct.hash_value= hash_value;
+  req_struct.gci= gci;
+
+  ptrCheckGuard(regTabPtr, no_of_tablerec, tablerec);
+
+  PagePtr page;
+  Tuple_header* tuple_ptr= (Tuple_header*)
+    get_ptr(&page, &regOperPtr.p->m_tuple_location, regTabPtr.p);
+  
+  bool get_page = false;
+  if(regOperPtr.p->op_struct.m_load_diskpage_on_commit)
+  {
+    Page_cache_client::Request req;
+    ndbassert(regOperPtr.p->is_first_operation() && 
+	      regOperPtr.p->is_last_operation());
+
+    /**
+     * Check for page
+     */
+    if(!regOperPtr.p->m_copy_tuple_location.isNull())
+    {
+      Tuple_header* tmp= (Tuple_header*)
+	c_undo_buffer.get_ptr(&regOperPtr.p->m_copy_tuple_location);
+      
+      memcpy(&req.m_page, 
+	     tmp->get_disk_ref_ptr(regTabPtr.p), sizeof(Local_key));
+
+      if (unlikely(regOperPtr.p->op_struct.op_type == ZDELETE &&
+		   tmp->m_header_bits & Tuple_header::DISK_ALLOC))
+      {
+	jam();
+	/**
+	 * Insert+Delete
+	 */
+	regOperPtr.p->op_struct.m_load_diskpage_on_commit = 0;
+	regOperPtr.p->op_struct.m_wait_log_buffer = 0;	
+	disk_page_abort_prealloc(signal, regFragPtr.p, 
+				 &req.m_page, req.m_page.m_page_idx);
+	
+	c_lgman->free_log_space(regFragPtr.p->m_logfile_group_id, 
+				regOperPtr.p->m_undo_buffer_space);
+	if (0) ndbout_c("insert+delete");
+	goto skip_disk;
+      }
+    } 
+    else
+    {
+      // initial delete
+      ndbassert(regOperPtr.p->op_struct.op_type == ZDELETE);
+      memcpy(&req.m_page, 
+	     tuple_ptr->get_disk_ref_ptr(regTabPtr.p), sizeof(Local_key));
+      
+      ndbassert(tuple_ptr->m_header_bits & Tuple_header::DISK_PART);
+    }
+    req.m_callback.m_callbackData= regOperPtr.i;
+    req.m_callback.m_callbackFunction = 
+      safe_cast(&Dbtup::disk_page_commit_callback);
+
+    /*
+     * Consider commit to be correlated.  Otherwise pk op + commit makes
+     * the page hot.   XXX move to TUP which knows better.
+     */
+    int flags= regOperPtr.p->op_struct.op_type |
+      Page_cache_client::COMMIT_REQ | Page_cache_client::CORR_REQ;
+    int res= m_pgman.get_page(signal, req, flags);
+    switch(res){
+    case 0:
+      /**
+       * Timeslice
+       */
+      signal->theData[0] = 1;
+      return;
+    case -1:
+      ndbrequire("NOT YET IMPLEMENTED" == 0);
+      break;
+    }
+    get_page = true;
 
-  commitUpdate(signal, regOperPtr, regFragPtr, regTabPtr);
-  if (regTabPtr->GCPIndicator) {
-    updateGcpId(signal, regOperPtr, regFragPtr, regTabPtr);
-  }//if
-}//Dbtup::commitRecord()
+    {
+      PagePtr tmpptr;
+      tmpptr.i = m_pgman.m_ptr.i;
+      tmpptr.p = reinterpret_cast<Page*>(m_pgman.m_ptr.p);
+      disk_page_set_dirty(tmpptr);
+    }
+    
+    regOperPtr.p->m_commit_disk_callback_page= res;
+    regOperPtr.p->op_struct.m_load_diskpage_on_commit= 0;
+  } 
+  
+  if(regOperPtr.p->op_struct.m_wait_log_buffer)
+  {
+    ndbassert(regOperPtr.p->is_first_operation() && 
+	      regOperPtr.p->is_last_operation());
+    
+    Callback cb;
+    cb.m_callbackData= regOperPtr.i;
+    cb.m_callbackFunction = 
+      safe_cast(&Dbtup::disk_page_log_buffer_callback);
+    Uint32 sz= regOperPtr.p->m_undo_buffer_space;
+    
+    Logfile_client lgman(this, c_lgman, regFragPtr.p->m_logfile_group_id);
+    int res= lgman.get_log_buffer(signal, sz, &cb);
+    switch(res){
+    case 0:
+      signal->theData[0] = 1;
+      return;
+    case -1:
+      ndbrequire("NOT YET IMPLEMENTED" == 0);
+      break;
+    }
+  }
+  
+  if(!tuple_ptr)
+  {
+    tuple_ptr = (Tuple_header*)
+      get_ptr(&page, &regOperPtr.p->m_tuple_location,regTabPtr.p);
+  }
+skip_disk:
+  req_struct.m_tuple_ptr = tuple_ptr;
+  
+  if(get_tuple_state(regOperPtr.p) == TUPLE_PREPARED)
+  {
+    /**
+     * Execute all tux triggers at first commit
+     *   since previous tuple is otherwise removed...
+     *   btw...is this a "good" solution??
+     *   
+     *   why can't we instead remove "own version" (when approriate ofcourse)
+     */
+    if (!regTabPtr.p->tuxCustomTriggers.isEmpty()) {
+      jam();
+      OperationrecPtr loopPtr= regOperPtr;
+      while(loopPtr.i != RNIL)
+      {
+	c_operation_pool.getPtr(loopPtr);
+	executeTuxCommitTriggers(signal,
+				 loopPtr.p,
+				 regFragPtr.p,
+				 regTabPtr.p);
+	set_tuple_state(loopPtr.p, TUPLE_TO_BE_COMMITTED);
+	loopPtr.i = loopPtr.p->nextActiveOp;
+      }
+    }
+  }
+  
+  if(regOperPtr.p->is_last_operation())
+  {
+    /**
+     * Perform "real" commit
+     */
+    set_change_mask_info(&req_struct, regOperPtr.p);
+    checkDetachedTriggers(&req_struct, regOperPtr.p, regTabPtr.p);
+    
+    if(regOperPtr.p->op_struct.op_type != ZDELETE)
+    {
+      commit_operation(signal, gci, tuple_ptr, page,
+		       regOperPtr.p, regFragPtr.p, regTabPtr.p); 
+      removeActiveOpList(regOperPtr.p, tuple_ptr);
+    }
+    else
+    {
+      removeActiveOpList(regOperPtr.p, tuple_ptr);
+      if (get_page)
+	ndbassert(tuple_ptr->m_header_bits & Tuple_header::DISK_PART);
+      dealloc_tuple(signal, gci, page.p, tuple_ptr, 
+		    regOperPtr.p, regFragPtr.p, regTabPtr.p); 
+    }
+  } 
+  else
+  {
+    removeActiveOpList(regOperPtr.p, tuple_ptr);   
+  }
+  
+  initOpConnection(regOperPtr.p);
+  signal->theData[0] = 0;
+}
 
 void
-Dbtup::setTupleStatesSetOpType(Operationrec* const regOperPtr,
-                               Page* const pagePtr,
-                               Uint32& opType,
-                               OperationrecPtr& firstOpPtr)
+Dbtup::set_change_mask_info(KeyReqStruct * const req_struct,
+                            Operationrec * const regOperPtr)
 {
-  OperationrecPtr loopOpPtr;
-  OperationrecPtr lastOpPtr;
-
-  ndbrequire(regOperPtr->pageOffset < ZWORDS_ON_PAGE);
-  loopOpPtr.i = pagePtr->pageWord[regOperPtr->pageOffset];
-  ptrCheckGuard(loopOpPtr, cnoOfOprec, operationrec);
-  lastOpPtr = loopOpPtr;
-  if (loopOpPtr.p->optype == ZDELETE) {
-    ljam();
-    opType = ZDELETE;
+  ChangeMaskState state = get_change_mask_state(regOperPtr);
+  if (state == USE_SAVED_CHANGE_MASK) {
+    jam();
+    req_struct->changeMask.setWord(0, regOperPtr->saved_change_mask[0]);
+    req_struct->changeMask.setWord(1, regOperPtr->saved_change_mask[1]);
+  } else if (state == RECALCULATE_CHANGE_MASK) {
+    jam();
+    // Recompute change mask, for now set all bits
+    req_struct->changeMask.set();
+  } else if (state == SET_ALL_MASK) {
+    jam();
+    req_struct->changeMask.set();
   } else {
-    ljam();
-    opType = ZUPDATE;
-  }//if
-  do {
-    ljam();
-    ptrCheckGuard(loopOpPtr, cnoOfOprec, operationrec);
-    firstOpPtr = loopOpPtr;
-    loopOpPtr.p->tupleState = TO_BE_COMMITTED;
-    loopOpPtr.i = loopOpPtr.p->nextActiveOp;
-  } while (loopOpPtr.i != RNIL);
-  if (opType == ZDELETE) {
-    ljam();
-    if (firstOpPtr.p->optype == ZINSERT) {
-      ljam();
-      opType = ZINSERT_DELETE;
-    }//if
-  } else {
-    ljam();
-    if (firstOpPtr.p->optype == ZINSERT) {
-      ljam();
-      opType = ZINSERT;
-    }//if
-  }///if
-}//Dbtup::setTupleStatesSetOpType()
-
-void Dbtup::findBeforeValueOperation(OperationrecPtr& befOpPtr,
-                                     OperationrecPtr firstOpPtr)
-{
-  befOpPtr = firstOpPtr;
-  if (befOpPtr.p->realPageIdC != RNIL) {
-    ljam();
-    return;
-  } else {
-    ljam();
-    befOpPtr.i = befOpPtr.p->prevActiveOp;
-    ptrCheckGuard(befOpPtr, cnoOfOprec, operationrec);
-    ndbrequire(befOpPtr.p->realPageIdC != RNIL);
-  }//if
-}//Dbtup::findBeforeValueOperation()
+    jam();
+    ndbrequire(state == DELETE_CHANGES);
+    req_struct->changeMask.set();
+  }
+}
 
 void
 Dbtup::calculateChangeMask(Page* const pagePtr,
                            Tablerec* const regTabPtr,
-                           Uint32 pageOffset,
-                           Bitmask<MAXNROFATTRIBUTESINWORDS>& attributeMask)
+                           KeyReqStruct * const req_struct)
 {
   OperationrecPtr loopOpPtr;
-
-  attributeMask.clear();
-  ndbrequire(pageOffset < ZWORDS_ON_PAGE);
-  loopOpPtr.i = pagePtr->pageWord[pageOffset];
+  Uint32 saved_word1= 0;
+  Uint32 saved_word2= 0;
+  loopOpPtr.i= req_struct->m_tuple_ptr->m_operation_ptr_i;
   do {
-    ptrCheckGuard(loopOpPtr, cnoOfOprec, operationrec);
-    if (loopOpPtr.p->optype == ZUPDATE) {
-      ljam();
-      attributeMask.bitOR(loopOpPtr.p->changeMask);
-    } else if (loopOpPtr.p->optype == ZINSERT) {
-      ljam();
-      attributeMask.set();
+    c_operation_pool.getPtr(loopOpPtr);
+    ndbrequire(loopOpPtr.p->op_struct.op_type == ZUPDATE);
+    ChangeMaskState change_mask= get_change_mask_state(loopOpPtr.p);
+    if (change_mask == USE_SAVED_CHANGE_MASK) {
+      jam();
+      saved_word1|= loopOpPtr.p->saved_change_mask[0];
+      saved_word2|= loopOpPtr.p->saved_change_mask[1];
+    } else if (change_mask == RECALCULATE_CHANGE_MASK) {
+      jam();
+      //Recompute change mask, for now set all bits
+      req_struct->changeMask.set();
       return;
     } else {
-      ndbrequire(loopOpPtr.p->optype == ZDELETE);
-    }//if
-    loopOpPtr.i = loopOpPtr.p->nextActiveOp;
+      ndbrequire(change_mask == SET_ALL_MASK);
+      jam();
+      req_struct->changeMask.set();
+      return;
+    }
+    loopOpPtr.i= loopOpPtr.p->prevActiveOp;
   } while (loopOpPtr.i != RNIL);
-}//Dbtup::calculateChangeMask()
+  req_struct->changeMask.setWord(0, saved_word1);
+  req_struct->changeMask.setWord(1, saved_word2);
+}

--- 1.49.26.1/ndb/src/ndbapi/Ndb.cpp	2007-04-11 19:29:30 +02:00
+++ 1.92/storage/ndb/src/ndbapi/Ndb.cpp	2007-04-11 20:30:02 +02:00
@@ -27,6 +27,8 @@
 #include "NdbImpl.hpp"
 #include <NdbOperation.hpp>
 #include <NdbTransaction.hpp>
+#include <NdbEventOperation.hpp>
+#include <NdbEventOperationImpl.hpp>
 #include <NdbRecAttr.hpp>
 #include <md5_hash.hpp>
 #include <NdbSleep.h>
@@ -142,7 +144,7 @@
 //***************************************************************************
   
   int	         tReturnCode;
-  TransporterFacade *tp = TransporterFacade::instance();
+  TransporterFacade *tp = theImpl->m_transporter_facade;
 
   DBUG_ENTER("Ndb::NDB_connect");
 
@@ -177,24 +179,9 @@
   tSignal->setData(theMyRef, 2);	// Set my block reference
   tNdbCon->Status(NdbTransaction::Connecting); // Set status to connecting
   Uint32 nodeSequence;
-  { // send and receive signal
-    Guard guard(tp->theMutexPtr);
-    nodeSequence = tp->getNodeSequence(tNode);
-    bool node_is_alive = tp->get_node_alive(tNode);
-    if (node_is_alive) { 
-      DBUG_PRINT("info",("Sending signal to node %u", tNode));
-      tReturnCode = tp->sendSignal(tSignal, tNode);  
-      releaseSignal(tSignal); 
-      if (tReturnCode != -1) {
-        theImpl->theWaiter.m_node = tNode;  
-        theImpl->theWaiter.m_state = WAIT_TC_SEIZE;  
-        tReturnCode = receiveResponse(); 
-      }//if
-    } else {
-      releaseSignal(tSignal);
-      tReturnCode = -1;
-    }//if
-  }
+  tReturnCode= sendRecSignal(tNode, WAIT_TC_SEIZE, tSignal,
+                             0, &nodeSequence);
+  releaseSignal(tSignal); 
   if ((tReturnCode == 0) && (tNdbCon->Status() == NdbTransaction::Connected)) {
     //************************************************
     // Send and receive was successful
@@ -240,10 +227,9 @@
 void 
 Ndb::doDisconnect()
 {
+  DBUG_ENTER("Ndb::doDisconnect");
   NdbTransaction* tNdbCon;
   CHECK_STATUS_MACRO_VOID;
-  /* DBUG_ENTER must be after CHECK_STATUS_MACRO_VOID because of 'return' */
-  DBUG_ENTER("Ndb::doDisconnect");
 
   Uint32 tNoOfDbNodes = theImpl->theNoOfDBnodes;
   Uint8 *theDBnodes= theImpl->theDBnodes;
@@ -603,6 +589,7 @@
 #ifdef CUSTOMER_RELEASE
   return -1;
 #else
+  DBUG_ENTER("Ndb::NdbTamper");
   CHECK_STATUS_MACRO;
   checkFailedNode();
 
@@ -624,20 +611,20 @@
 	break;
      default:
         theError.code = 4102;
-        return -1;
+        DBUG_RETURN(-1);
   }
 
   tNdbConn = getNdbCon();	// Get free connection object
   if (tNdbConn == NULL) {
     theError.code = 4000;
-    return -1;
+    DBUG_RETURN(-1);
   }
   tSignal.setSignal(GSN_DIHNDBTAMPER);
   tSignal.setData (tAction, 1);
   tSignal.setData(tNdbConn->ptr2int(),2);
   tSignal.setData(theMyRef,3);		// Set return block reference
   tNdbConn->Status(NdbTransaction::Connecting); // Set status to connecting
-  TransporterFacade *tp = TransporterFacade::instance();
+  TransporterFacade *tp = theImpl->m_transporter_facade;
   if (tAction == 3) {
     tp->lock_mutex();
     tp->sendSignal(&tSignal, aNode);
@@ -649,12 +636,12 @@
     if (tNode == 0) {
       theError.code = 4002;
       releaseNdbCon(tNdbConn);
-      return -1;
+      DBUG_RETURN(-1);
     }//if
     ret_code = tp->sendSignal(&tSignal,aNode);
     tp->unlock_mutex();
     releaseNdbCon(tNdbConn);
-    return ret_code;
+    DBUG_RETURN(ret_code);
   } else {
     do {
       tp->lock_mutex();
@@ -665,7 +652,7 @@
       if (tNode == 0) {
         theError.code = 4009;
         releaseNdbCon(tNdbConn);
-        return -1;
+        DBUG_RETURN(-1);
       }//if
       ret_code = sendRecSignal(tNode, WAIT_NDB_TAMPER, &tSignal, 0);
       if (ret_code == 0) {  
@@ -673,15 +660,15 @@
           theRestartGCI = 0;
         }//if
         releaseNdbCon(tNdbConn);
-        return theRestartGCI;
+        DBUG_RETURN(theRestartGCI);
       } else if ((ret_code == -5) || (ret_code == -2)) {
         TRACE_DEBUG("Continue DIHNDBTAMPER when node failed/stopping");
       } else {
-        return -1;
+        DBUG_RETURN(-1);
       }//if
     } while (1);
   }
-  return 0;
+  DBUG_RETURN(0);
 #endif
 }
 #if 0
@@ -780,15 +767,18 @@
                            Uint64 & tupleId, Uint32 cacheSize)
 {
   DBUG_ENTER("Ndb::getAutoIncrementValue");
+  ASSERT_NOT_MYSQLD;
   BaseString internal_tabname(internalize_table_name(aTableName));
 
   Ndb_local_table_info *info=
-    theDictionary->get_local_table_info(internal_tabname, false);
+    theDictionary->get_local_table_info(internal_tabname);
   if (info == 0) {
     theError.code = theDictionary->getNdbError().code;
     DBUG_RETURN(-1);
   }
-  if (getTupleIdFromNdb(info, tupleId, cacheSize) == -1)
+  const NdbTableImpl* table = info->m_table_impl;
+  TupleIdRange & range = info->m_tuple_id_range;
+  if (getTupleIdFromNdb(table, range, tupleId, cacheSize) == -1)
     DBUG_RETURN(-1);
   DBUG_PRINT("info", ("value %lu", (ulong) tupleId));
   DBUG_RETURN(0);
@@ -799,31 +789,48 @@
                            Uint64 & tupleId, Uint32 cacheSize)
 {
   DBUG_ENTER("Ndb::getAutoIncrementValue");
+  ASSERT_NOT_MYSQLD;
   assert(aTable != 0);
   const NdbTableImpl* table = & NdbTableImpl::getImpl(*aTable);
   const BaseString& internal_tabname = table->m_internalName;
 
   Ndb_local_table_info *info=
-    theDictionary->get_local_table_info(internal_tabname, false);
+    theDictionary->get_local_table_info(internal_tabname);
   if (info == 0) {
     theError.code = theDictionary->getNdbError().code;
     DBUG_RETURN(-1);
   }
-  if (getTupleIdFromNdb(info, tupleId, cacheSize) == -1)
+  TupleIdRange & range = info->m_tuple_id_range;
+  if (getTupleIdFromNdb(table, range, tupleId, cacheSize) == -1)
+    DBUG_RETURN(-1);
+  DBUG_PRINT("info", ("value %lu", (ulong)tupleId));
+  DBUG_RETURN(0);
+}
+
+int
+Ndb::getAutoIncrementValue(const NdbDictionary::Table * aTable,
+                           TupleIdRange & range, Uint64 & tupleId,
+                           Uint32 cacheSize)
+{
+  DBUG_ENTER("Ndb::getAutoIncrementValue");
+  assert(aTable != 0);
+  const NdbTableImpl* table = & NdbTableImpl::getImpl(*aTable);
+
+  if (getTupleIdFromNdb(table, range, tupleId, cacheSize) == -1)
     DBUG_RETURN(-1);
   DBUG_PRINT("info", ("value %lu", (ulong)tupleId));
   DBUG_RETURN(0);
 }
 
 int
-Ndb::getTupleIdFromNdb(Ndb_local_table_info* info,
-                       Uint64 & tupleId, Uint32 cacheSize)
+Ndb::getTupleIdFromNdb(const NdbTableImpl* table,
+                       TupleIdRange & range, Uint64 & tupleId, Uint32 cacheSize)
 {
   DBUG_ENTER("Ndb::getTupleIdFromNdb");
-  if (info->m_first_tuple_id != info->m_last_tuple_id)
+  if (range.m_first_tuple_id != range.m_last_tuple_id)
   {
-    assert(info->m_first_tuple_id < info->m_last_tuple_id);
-    tupleId = ++info->m_first_tuple_id;
+    assert(range.m_first_tuple_id < range.m_last_tuple_id);
+    tupleId = ++range.m_first_tuple_id;
     DBUG_PRINT("info", ("next cached value %lu", (ulong)tupleId));
   }
   else
@@ -836,7 +843,7 @@
      * and returns first tupleId in the new range.
      */
     Uint64 opValue = cacheSize;
-    if (opTupleIdOnNdb(info, opValue, 0) == -1)
+    if (opTupleIdOnNdb(table, range, opValue, 0) == -1)
       DBUG_RETURN(-1);
     tupleId = opValue;
   }
@@ -848,15 +855,18 @@
                             Uint64 & tupleId)
 {
   DBUG_ENTER("Ndb::readAutoIncrementValue");
+  ASSERT_NOT_MYSQLD;
   BaseString internal_tabname(internalize_table_name(aTableName));
 
   Ndb_local_table_info *info=
-    theDictionary->get_local_table_info(internal_tabname, false);
+    theDictionary->get_local_table_info(internal_tabname);
   if (info == 0) {
     theError.code = theDictionary->getNdbError().code;
     DBUG_RETURN(-1);
   }
-  if (readTupleIdFromNdb(info, tupleId) == -1)
+  const NdbTableImpl* table = info->m_table_impl;
+  TupleIdRange & range = info->m_tuple_id_range;
+  if (readTupleIdFromNdb(table, range, tupleId) == -1)
     DBUG_RETURN(-1);
   DBUG_PRINT("info", ("value %lu", (ulong)tupleId));
   DBUG_RETURN(0);
@@ -867,31 +877,47 @@
                             Uint64 & tupleId)
 {
   DBUG_ENTER("Ndb::readAutoIncrementValue");
+  ASSERT_NOT_MYSQLD;
   assert(aTable != 0);
   const NdbTableImpl* table = & NdbTableImpl::getImpl(*aTable);
   const BaseString& internal_tabname = table->m_internalName;
 
   Ndb_local_table_info *info=
-    theDictionary->get_local_table_info(internal_tabname, false);
+    theDictionary->get_local_table_info(internal_tabname);
   if (info == 0) {
     theError.code = theDictionary->getNdbError().code;
     DBUG_RETURN(-1);
   }
-  if (readTupleIdFromNdb(info, tupleId) == -1)
+  TupleIdRange & range = info->m_tuple_id_range;
+  if (readTupleIdFromNdb(table, range, tupleId) == -1)
     DBUG_RETURN(-1);
   DBUG_PRINT("info", ("value %lu", (ulong)tupleId));
   DBUG_RETURN(0);
 }
 
 int
-Ndb::readTupleIdFromNdb(Ndb_local_table_info* info,
-                        Uint64 & tupleId)
+Ndb::readAutoIncrementValue(const NdbDictionary::Table * aTable,
+                            TupleIdRange & range, Uint64 & tupleId)
+{
+  DBUG_ENTER("Ndb::readAutoIncrementValue");
+  assert(aTable != 0);
+  const NdbTableImpl* table = & NdbTableImpl::getImpl(*aTable);
+
+  if (readTupleIdFromNdb(table, range, tupleId) == -1)
+    DBUG_RETURN(-1);
+  DBUG_PRINT("info", ("value %lu", (ulong)tupleId));
+  DBUG_RETURN(0);
+}
+
+int
+Ndb::readTupleIdFromNdb(const NdbTableImpl* table,
+                        TupleIdRange & range, Uint64 & tupleId)
 {
   DBUG_ENTER("Ndb::readTupleIdFromNdb");
-  if (info->m_first_tuple_id != info->m_last_tuple_id)
+  if (range.m_first_tuple_id != range.m_last_tuple_id)
   {
-    assert(info->m_first_tuple_id < info->m_last_tuple_id);
-    tupleId = info->m_first_tuple_id + 1;
+    assert(range.m_first_tuple_id < range.m_last_tuple_id);
+    tupleId = range.m_first_tuple_id + 1;
   }
   else
   {
@@ -900,7 +926,7 @@
      * only if no other transactions are allowed.
      */
     Uint64 opValue = 0;
-    if (opTupleIdOnNdb(info, opValue, 3) == -1)
+    if (opTupleIdOnNdb(table, range, opValue, 3) == -1)
       DBUG_RETURN(-1);
     tupleId = opValue;
   }
@@ -912,15 +938,18 @@
                            Uint64 tupleId, bool increase)
 {
   DBUG_ENTER("Ndb::setAutoIncrementValue");
+  ASSERT_NOT_MYSQLD;
   BaseString internal_tabname(internalize_table_name(aTableName));
 
   Ndb_local_table_info *info=
-    theDictionary->get_local_table_info(internal_tabname, false);
+    theDictionary->get_local_table_info(internal_tabname);
   if (info == 0) {
     theError.code = theDictionary->getNdbError().code;
     DBUG_RETURN(-1);
   }
-  if (setTupleIdInNdb(info, tupleId, increase) == -1)
+  const NdbTableImpl* table = info->m_table_impl;
+  TupleIdRange & range = info->m_tuple_id_range;
+  if (setTupleIdInNdb(table, range, tupleId, increase) == -1)
     DBUG_RETURN(-1);
   DBUG_RETURN(0);
 }
@@ -930,36 +959,52 @@
                            Uint64 tupleId, bool increase)
 {
   DBUG_ENTER("Ndb::setAutoIncrementValue");
+  ASSERT_NOT_MYSQLD;
   assert(aTable != 0);
   const NdbTableImpl* table = & NdbTableImpl::getImpl(*aTable);
   const BaseString& internal_tabname = table->m_internalName;
 
   Ndb_local_table_info *info=
-    theDictionary->get_local_table_info(internal_tabname, false);
+    theDictionary->get_local_table_info(internal_tabname);
   if (info == 0) {
     theError.code = theDictionary->getNdbError().code;
     DBUG_RETURN(-1);
   }
-  if (setTupleIdInNdb(info, tupleId, increase) == -1)
+  TupleIdRange & range = info->m_tuple_id_range;
+  if (setTupleIdInNdb(table, range, tupleId, increase) == -1)
+    DBUG_RETURN(-1);
+  DBUG_RETURN(0);
+}
+
+int
+Ndb::setAutoIncrementValue(const NdbDictionary::Table * aTable,
+                           TupleIdRange & range, Uint64 tupleId,
+                           bool increase)
+{
+  DBUG_ENTER("Ndb::setAutoIncrementValue");
+  assert(aTable != 0);
+  const NdbTableImpl* table = & NdbTableImpl::getImpl(*aTable);
+
+  if (setTupleIdInNdb(table, range, tupleId, increase) == -1)
     DBUG_RETURN(-1);
   DBUG_RETURN(0);
 }
 
 int
-Ndb::setTupleIdInNdb(Ndb_local_table_info* info,
-                     Uint64 tupleId, bool increase)
+Ndb::setTupleIdInNdb(const NdbTableImpl* table,
+                     TupleIdRange & range, Uint64 tupleId, bool increase)
 {
   DBUG_ENTER("Ndb::setTupleIdInNdb");
   if (increase)
   {
-    if (info->m_first_tuple_id != info->m_last_tuple_id)
+    if (range.m_first_tuple_id != range.m_last_tuple_id)
     {
-      assert(info->m_first_tuple_id < info->m_last_tuple_id);
-      if (tupleId <= info->m_first_tuple_id + 1)
+      assert(range.m_first_tuple_id < range.m_last_tuple_id);
+      if (tupleId <= range.m_first_tuple_id + 1)
 	DBUG_RETURN(0);
-      if (tupleId <= info->m_last_tuple_id)
+      if (tupleId <= range.m_last_tuple_id)
       {
-	info->m_first_tuple_id = tupleId - 1;
+	range.m_first_tuple_id = tupleId - 1;
         DBUG_PRINT("info", 
                    ("Setting next auto increment cached value to %lu",
                     (ulong)tupleId));  
@@ -970,7 +1015,7 @@
      * if tupleId <= NEXTID, do nothing.  otherwise update NEXTID to
      * tupleId and set cached range to first = last = tupleId - 1.
      */
-    if (opTupleIdOnNdb(info, tupleId, 2) == -1)
+    if (opTupleIdOnNdb(table, range, tupleId, 2) == -1)
       DBUG_RETURN(-1);
   }
   else
@@ -978,42 +1023,62 @@
     /*
      * update NEXTID to given value.  reset cached range.
      */
-    if (opTupleIdOnNdb(info, tupleId, 1) == -1)
+    if (opTupleIdOnNdb(table, range, tupleId, 1) == -1)
       DBUG_RETURN(-1);
   }
   DBUG_RETURN(0);
 }
 
+int Ndb::initAutoIncrement()
+{
+  if (m_sys_tab_0)
+    return 0;
+
+  BaseString currentDb(getDatabaseName());
+  BaseString currentSchema(getDatabaseSchemaName());
+
+  setDatabaseName("sys");
+  setDatabaseSchemaName("def");
+
+  m_sys_tab_0 = theDictionary->getTableGlobal("SYSTAB_0");
+
+  // Restore current name space
+  setDatabaseName(currentDb.c_str());
+  setDatabaseSchemaName(currentSchema.c_str());
+
+  if (m_sys_tab_0 == NULL) {
+    assert(theDictionary->m_error.code != 0);
+    theError.code = theDictionary->m_error.code;
+    return -1;
+  }
+
+  return 0;
+}
+
 int
-Ndb::opTupleIdOnNdb(Ndb_local_table_info* info, Uint64 & opValue, Uint32 op)
+Ndb::opTupleIdOnNdb(const NdbTableImpl* table,
+                    TupleIdRange & range, Uint64 & opValue, Uint32 op)
 {
   DBUG_ENTER("Ndb::opTupleIdOnNdb");
-  Uint32 aTableId = info->m_table_impl->m_tableId;
+  Uint32 aTableId = table->m_id;
   DBUG_PRINT("enter", ("table: %u  value: %lu  op: %u",
                        aTableId, (ulong) opValue, op));
 
-  NdbTransaction*     tConnection;
-  NdbOperation*      tOperation= 0; // Compiler warning if not initialized
+  NdbTransaction*    tConnection = NULL;
+  NdbOperation*      tOperation = NULL;
   Uint64             tValue;
   NdbRecAttr*        tRecAttrResult;
 
-  NdbError savedError;
-
-  CHECK_STATUS_MACRO_ZERO;
+  CHECK_STATUS_MACRO;
 
-  BaseString currentDb(getDatabaseName());
-  BaseString currentSchema(getDatabaseSchemaName());
+  if (initAutoIncrement() == -1)
+    goto error_handler;
 
-  setDatabaseName("sys");
-  setDatabaseSchemaName("def");
   tConnection = this->startTransaction();
   if (tConnection == NULL)
-    goto error_return;
+    goto error_handler;
 
-  if (usingFullyQualifiedNames())
-    tOperation = tConnection->getNdbOperation("SYSTAB_0");
-  else
-    tOperation = tConnection->getNdbOperation("sys/def/SYSTAB_0");
+  tOperation = tConnection->getNdbOperation(m_sys_tab_0);
   if (tOperation == NULL)
     goto error_handler;
 
@@ -1021,18 +1086,18 @@
     {
     case 0:
       tOperation->interpretedUpdateTuple();
-      tOperation->equal("SYSKEY_0", aTableId );
+      tOperation->equal("SYSKEY_0", aTableId);
       tOperation->incValue("NEXTID", opValue);
       tRecAttrResult = tOperation->getValue("NEXTID");
 
-      if (tConnection->execute( Commit ) == -1 )
+      if (tConnection->execute( NdbTransaction::Commit ) == -1 )
         goto error_handler;
 
       tValue = tRecAttrResult->u_64_value();
 
-      info->m_first_tuple_id = tValue - opValue;
-      info->m_last_tuple_id  = tValue - 1;
-      opValue = info->m_first_tuple_id; // out
+      range.m_first_tuple_id = tValue - opValue;
+      range.m_last_tuple_id  = tValue - 1;
+      opValue = range.m_first_tuple_id; // out
       break;
     case 1:
       // create on first use
@@ -1040,11 +1105,10 @@
       tOperation->equal("SYSKEY_0", aTableId );
       tOperation->setValue("NEXTID", opValue);
 
-      if (tConnection->execute( Commit ) == -1 )
+      if (tConnection->execute( NdbTransaction::Commit ) == -1 )
         goto error_handler;
 
-      info->m_first_tuple_id = ~(Uint64)0;
-      info->m_last_tuple_id  = ~(Uint64)0;
+      range.reset();
       break;
     case 2:
       tOperation->interpretedUpdateTuple();
@@ -1058,7 +1122,7 @@
       tOperation->def_label(0);
       tOperation->interpret_exit_nok(9999);
       
-      if (tConnection->execute( Commit ) == -1)
+      if (tConnection->execute( NdbTransaction::Commit ) == -1)
       {
         if (tConnection->theError.code != 9999)
           goto error_handler;
@@ -1067,15 +1131,15 @@
       {
         DBUG_PRINT("info", 
                    ("Setting next auto increment value (db) to %lu",
-                    (ulong)opValue));  
-        info->m_first_tuple_id = info->m_last_tuple_id = opValue - 1;
+                    (ulong) opValue));  
+        range.m_first_tuple_id = range.m_last_tuple_id = opValue - 1;
       }
       break;
     case 3:
       tOperation->readTuple();
       tOperation->equal("SYSKEY_0", aTableId );
       tRecAttrResult = tOperation->getValue("NEXTID");
-      if (tConnection->execute( Commit ) == -1 )
+      if (tConnection->execute( NdbTransaction::Commit ) == -1 )
         goto error_handler;
       opValue = tRecAttrResult->u_64_value(); // out
       break;
@@ -1085,29 +1149,28 @@
 
   this->closeTransaction(tConnection);
 
-  // Restore current name space
-  setDatabaseName(currentDb.c_str());
-  setDatabaseSchemaName(currentSchema.c_str());
-
   DBUG_RETURN(0);
 
-  error_handler:
+error_handler:
+  DBUG_PRINT("error", ("ndb=%d con=%d op=%d",
+             theError.code,
+             tConnection != NULL ? tConnection->theError.code : -1,
+             tOperation != NULL ? tOperation->theError.code : -1));
+
+  if (theError.code == 0 && tConnection != NULL)
     theError.code = tConnection->theError.code;
+  if (theError.code == 0 && tOperation != NULL)
+    theError.code = tOperation->theError.code;
+  DBUG_ASSERT(theError.code != 0);
 
-    savedError = theError;
+  NdbError savedError;
+  savedError = theError;
 
+  if (tConnection != NULL)
     this->closeTransaction(tConnection);
-    theError = savedError;
 
-  error_return:
-    // Restore current name space
-    setDatabaseName(currentDb.c_str());
-    setDatabaseSchemaName(currentSchema.c_str());
+  theError = savedError;
 
-  DBUG_PRINT("error", ("ndb=%d con=%d op=%d",
-             theError.code,
-             tConnection ? tConnection->theError.code : -1,
-             tOperation ? tOperation->theError.code : -1));
   DBUG_RETURN(-1);
 }
 
@@ -1128,16 +1191,23 @@
   return Data;
 #endif
 }
+
+// <internal>
+Ndb_cluster_connection &
+Ndb::get_ndb_cluster_connection()
+{
+  return theImpl->m_ndb_cluster_connection;
+}
+
 const char * Ndb::getCatalogName() const
 {
   return theImpl->m_dbname.c_str();
 }
 
-
 int Ndb::setCatalogName(const char * a_catalog_name)
 {
-  if (a_catalog_name)
-  {
+  // TODO can table_name_separator be escaped?
+  if (a_catalog_name && ! strchr(a_catalog_name, table_name_separator)) {
     if (!theImpl->m_dbname.assign(a_catalog_name) ||
         theImpl->update_prefix())
     {
@@ -1153,10 +1223,10 @@
   return theImpl->m_schemaname.c_str();
 }
 
-
 int Ndb::setSchemaName(const char * a_schema_name)
 {
-  if (a_schema_name) {
+  // TODO can table_name_separator be escaped?
+  if (a_schema_name && ! strchr(a_schema_name, table_name_separator)) {
     if (!theImpl->m_schemaname.assign(a_schema_name) ||
         theImpl->update_prefix())
     {
@@ -1166,10 +1236,8 @@
   }
   return 0;
 }
+// </internal>
  
-/*
-Deprecated functions
-*/
 const char * Ndb::getDatabaseName() const
 {
   return getCatalogName();
@@ -1189,6 +1257,26 @@
 {
   return setSchemaName(a_schema_name);
 }
+
+int Ndb::setDatabaseAndSchemaName(const NdbDictionary::Table* t)
+{
+  const char* s0 = t->m_impl.m_internalName.c_str();
+  const char* s1 = strchr(s0, table_name_separator);
+  if (s1 && s1 != s0) {
+    const char* s2 = strchr(s1 + 1, table_name_separator);
+    if (s2 && s2 != s1 + 1) {
+      char buf[NAME_LEN + 1];
+      if (s1 - s0 <= NAME_LEN && s2 - (s1 + 1) <= NAME_LEN) {
+        sprintf(buf, "%.*s", (int) (s1 - s0), s0);
+        setDatabaseName(buf);
+        sprintf(buf, "%.*s", (int) (s2 - (s1 + 1)), s1 + 1);
+        setDatabaseSchemaName(buf);
+        return 0;
+      }
+    }
+  }
+  return -1;
+}
  
 bool Ndb::usingFullyQualifiedNames()
 {
@@ -1251,9 +1339,16 @@
   if (fullyQualifiedNames)
   {
     /* Internal table name format <db>/<schema>/<table>
-       <db>/<schema> is already available in m_prefix
+       <db>/<schema>/ is already available in m_prefix
        so just concat the two strings
      */
+#ifdef VM_TRACE
+    // verify that m_prefix looks like abc/def/
+    const char* s0 = theImpl->m_prefix.c_str();
+    const char* s1 = s0 ? strchr(s0, table_name_separator) : 0;
+    const char* s2 = s1 ? strchr(s1 + 1, table_name_separator) : 0;
+    assert(s1 && s1 != s0 && s2 && s2 != s1 + 1 && *(s2 + 1) == 0);
+#endif
     ret.assfmt("%s%s",
                theImpl->m_prefix.c_str(),
                external_name);
@@ -1265,6 +1360,35 @@
   DBUG_RETURN(ret);
 }
 
+const BaseString
+Ndb::old_internalize_index_name(const NdbTableImpl * table,
+				const char * external_name) const
+{
+  BaseString ret;
+  DBUG_ENTER("old_internalize_index_name");
+  DBUG_PRINT("enter", ("external_name: %s, table_id: %d",
+                       external_name, table ? table->m_id : ~0));
+  if (!table)
+  {
+    DBUG_PRINT("error", ("!table"));
+    DBUG_RETURN(ret);
+  }
+
+  if (fullyQualifiedNames)
+  {
+    /* Internal index name format <db>/<schema>/<tabid>/<table> */
+    ret.assfmt("%s%d%c%s",
+               theImpl->m_prefix.c_str(),
+               table->m_id,
+               table_name_separator,
+               external_name);
+  }
+  else
+    ret.assign(external_name);
+
+  DBUG_PRINT("exit", ("internal_name: %s", ret.c_str()));
+  DBUG_RETURN(ret);
+}
 
 const BaseString
 Ndb::internalize_index_name(const NdbTableImpl * table,
@@ -1273,7 +1397,7 @@
   BaseString ret;
   DBUG_ENTER("internalize_index_name");
   DBUG_PRINT("enter", ("external_name: %s, table_id: %d",
-                       external_name, table ? table->m_tableId : ~0));
+                       external_name, table ? table->m_id : ~0));
   if (!table)
   {
     DBUG_PRINT("error", ("!table"));
@@ -1282,10 +1406,10 @@
 
   if (fullyQualifiedNames)
   {
-    /* Internal index name format <db>/<schema>/<tabid>/<table> */
+    /* Internal index name format sys/def/<tabid>/<table> */
     ret.assfmt("%s%d%c%s",
-               theImpl->m_prefix.c_str(),
-               table->m_tableId,
+               theImpl->m_systemPrefix.c_str(),
+               table->m_id,
                table_name_separator,
                external_name);
   }
@@ -1340,6 +1464,113 @@
   BaseString ret = BaseString(schemaName);
   delete [] schemaName;
   return ret;
+}
+
+// ToDo set event buffer size
+NdbEventOperation* Ndb::createEventOperation(const char* eventName)
+{
+  DBUG_ENTER("Ndb::createEventOperation");
+  NdbEventOperation* tOp= theEventBuffer->createEventOperation(eventName,
+							       theError);
+  if (tOp)
+  {
+    // keep track of all event operations
+    NdbEventOperationImpl *op=
+      NdbEventBuffer::getEventOperationImpl(tOp);
+    op->m_next= theImpl->m_ev_op;
+    op->m_prev= 0;
+    theImpl->m_ev_op= op;
+    if (op->m_next)
+      op->m_next->m_prev= op;
+  }
+
+  DBUG_RETURN(tOp);
+}
+
+int Ndb::dropEventOperation(NdbEventOperation* tOp)
+{
+  DBUG_ENTER("Ndb::dropEventOperation");
+  DBUG_PRINT("info", ("name: %s", tOp->getEvent()->getTable()->getName()));
+  // remove it from list
+  NdbEventOperationImpl *op=
+    NdbEventBuffer::getEventOperationImpl(tOp);
+  if (op->m_next)
+    op->m_next->m_prev= op->m_prev;
+  if (op->m_prev)
+    op->m_prev->m_next= op->m_next;
+  else
+    theImpl->m_ev_op= op->m_next;
+
+  DBUG_PRINT("info", ("first: %s",
+                      theImpl->m_ev_op ? theImpl->m_ev_op->getEvent()->getTable()->getName() : "<empty>"));
+  assert(theImpl->m_ev_op == 0 || theImpl->m_ev_op->m_prev == 0);
+
+  theEventBuffer->dropEventOperation(tOp);
+  DBUG_RETURN(0);
+}
+
+NdbEventOperation *Ndb::getEventOperation(NdbEventOperation* tOp)
+{
+  NdbEventOperationImpl *op;
+  if (tOp)
+    op= NdbEventBuffer::getEventOperationImpl(tOp)->m_next;
+  else
+    op= theImpl->m_ev_op;
+  if (op)
+    return op->m_facade;
+  return 0;
+}
+
+int
+Ndb::pollEvents(int aMillisecondNumber, Uint64 *latestGCI)
+{
+  return theEventBuffer->pollEvents(aMillisecondNumber, latestGCI);
+}
+
+int
+Ndb::flushIncompleteEvents(Uint64 gci)
+{
+  return theEventBuffer->flushIncompleteEvents(gci);
+}
+
+NdbEventOperation *Ndb::nextEvent()
+{
+  return theEventBuffer->nextEvent();
+}
+
+const NdbEventOperation*
+Ndb::getGCIEventOperations(Uint32* iter, Uint32* event_types)
+{
+  NdbEventOperationImpl* op =
+    theEventBuffer->getGCIEventOperations(iter, event_types);
+  if (op != NULL)
+    return op->m_facade;
+  return NULL;
+}
+
+Uint64 Ndb::getLatestGCI()
+{
+  return theEventBuffer->getLatestGCI();
+}
+
+void Ndb::setReportThreshEventGCISlip(unsigned thresh)
+{
+ if (theEventBuffer->m_free_thresh != thresh)
+ {
+   theEventBuffer->m_free_thresh= thresh;
+   theEventBuffer->m_min_free_thresh= thresh;
+   theEventBuffer->m_max_free_thresh= 100;
+ }
+}
+
+void Ndb::setReportThreshEventFreeMem(unsigned thresh)
+{
+  if (theEventBuffer->m_free_thresh != thresh)
+  {
+    theEventBuffer->m_free_thresh= thresh;
+    theEventBuffer->m_min_free_thresh= thresh;
+    theEventBuffer->m_max_free_thresh= 100;
+  }
 }
 
 #ifdef VM_TRACE

--- 1.27.12.1/ndb/src/ndbapi/Ndbif.cpp	2007-04-11 20:22:29 +02:00
+++ 1.52/storage/ndb/src/ndbapi/Ndbif.cpp	2007-04-11 20:30:02 +02:00
@@ -25,6 +25,7 @@
 #include <NdbRecAttr.hpp>
 #include <NdbReceiver.hpp>
 #include "API.hpp"
+#include "NdbEventOperationImpl.hpp"
 
 #include <signaldata/TcCommit.hpp>
 #include <signaldata/TcKeyFailConf.hpp>
@@ -36,11 +37,14 @@
 #include <signaldata/TransIdAI.hpp>
 #include <signaldata/ScanFrag.hpp>
 #include <signaldata/ScanTab.hpp>
+#include <signaldata/SumaImpl.hpp>
 
 #include <ndb_limits.h>
 #include <NdbOut.hpp>
 #include <NdbTick.h>
 
+#include <EventLogger.hpp>
+extern EventLogger g_eventLogger;
 
 /******************************************************************************
  * int init( int aNrOfCon, int aNrOfOp );
@@ -73,7 +77,7 @@
     DBUG_RETURN(-1);
   }//if
   theInitState = StartingInit;
-  TransporterFacade * theFacade =  TransporterFacade::instance();
+  TransporterFacade * theFacade =  theImpl->m_transporter_facade;
   theFacade->lock_mutex();
   
   const int tBlockNo = theFacade->open(this,
@@ -141,7 +145,7 @@
   ndbout << "error_handler" << endl;
   releaseTransactionArrays();
   delete theDictionary;
-  TransporterFacade::instance()->close(theNdbBlockNumber, 0);
+  theImpl->m_transporter_facade->close(theNdbBlockNumber, 0);
   DBUG_RETURN(-1);
 }
 
@@ -172,6 +176,7 @@
 
 void Ndb::connected(Uint32 ref)
 {
+// cluster connect, a_node == own reference
   theMyRef= ref;
   Uint32 tmpTheNode= refToNode(ref);
   Uint64 tBlockNo= refToBlock(ref);
@@ -179,7 +184,7 @@
     assert(theMyRef == numberToRef(theNdbBlockNumber, tmpTheNode));
   }
   
-  TransporterFacade * theFacade =  TransporterFacade::instance();
+  TransporterFacade * theFacade =  theImpl->m_transporter_facade;
   int i, n= 0;
   for (i = 1; i < MAX_NDB_NODES; i++){
     if (theFacade->getIsDbNode(i)){
@@ -187,7 +192,8 @@
       n++;
     }
   }
-  theImpl->theNoOfDBnodes= n;
+  theImpl->theNoOfDBnodes = n;
+  
   theFirstTransId = ((Uint64)tBlockNo << 52)+
     ((Uint64)tmpTheNode << 40);
   theFirstTransId += theFacade->m_max_trans_id;
@@ -203,16 +209,30 @@
   theNode= tmpTheNode; // flag that Ndb object is initialized
 }
 
+void Ndb::report_node_connected(Uint32 nodeId)
+{
+  if (theEventBuffer)
+  {
+    // node connected
+    // eventOperations in the ndb object should be notified
+    theEventBuffer->report_node_connected(nodeId);
+  }
+}
+
 void
 Ndb::statusMessage(void* NdbObject, Uint32 a_node, bool alive, bool nfComplete)
 {
   DBUG_ENTER("Ndb::statusMessage");
+  DBUG_PRINT("info", ("a_node: %u  alive: %u  nfComplete: %u",
+                      a_node, alive, nfComplete));
   Ndb* tNdb = (Ndb*)NdbObject;
   if (alive) {
     if (nfComplete) {
+      // cluster connect, a_node == own reference
       tNdb->connected(a_node);
       DBUG_VOID_RETURN;
     }//if
+    tNdb->report_node_connected(a_node);
   } else {
     if (nfComplete) {
       tNdb->report_node_failure_completed(a_node);
@@ -234,6 +254,7 @@
    * 
    * This method is only called by ClusterMgr (via lots of methods)
    */
+
   theImpl->the_release_ind[node_id] = 1;
   // must come after
   theImpl->the_release_ind[0] = 1;
@@ -245,6 +266,19 @@
 void
 Ndb::report_node_failure_completed(Uint32 node_id)
 {
+  if (theEventBuffer)
+  {
+    // node failed
+    // eventOperations in the ndb object should be notified
+    theEventBuffer->report_node_failure(node_id);
+    if(!theImpl->m_transporter_facade->theClusterMgr->isClusterAlive())
+    {
+      // cluster is unavailable, 
+      // eventOperations in the ndb object should be notified
+      theEventBuffer->completeClusterFailed();
+    }
+  }
+  
   abortTransactionsAfterNodeFailure(node_id);
 
 }//Ndb::report_node_failure_completed()
@@ -322,6 +356,7 @@
   const Uint32 tFirstData = *tDataPtr;
   const Uint32 tLen = aSignal->getLength();
   void * tFirstDataPtr;
+  NdbWaiter *t_waiter;
 
   /*
     In order to support 64 bit processes in the application we need to use
@@ -351,10 +386,11 @@
         }//if
 
 	if(TcKeyConf::getMarkerFlag(keyConf->confInfo)){
-	  NdbTransaction::sendTC_COMMIT_ACK(theCommitAckSignal,
-					   keyConf->transId1, 
-					   keyConf->transId2,
-					   aTCRef);
+	  NdbTransaction::sendTC_COMMIT_ACK(theImpl->m_transporter_facade,
+                                            theCommitAckSignal,
+                                            keyConf->transId1, 
+                                            keyConf->transId2,
+                                            aTCRef);
 	}
       
 	return;
@@ -430,10 +466,11 @@
 #endif
       }
       if(tFirstData & 1){
-	NdbTransaction::sendTC_COMMIT_ACK(theCommitAckSignal,
-					 failConf->transId1, 
-					 failConf->transId2,
-					 aTCRef);
+	NdbTransaction::sendTC_COMMIT_ACK(theImpl->m_transporter_facade,
+                                          theCommitAckSignal,
+                                          failConf->transId1, 
+                                          failConf->transId2,
+                                          aTCRef);
       }
       return;
     }
@@ -460,7 +497,7 @@
       ndbout_c("Recevied TCKEY_FAILREF wo/ operation");
 #endif
       return;
-      break;
+      return;
     }
   case GSN_TCKEYREF:
     {
@@ -501,10 +538,11 @@
 	}//if
 
 	if(tFirstData & 1){
-	  NdbTransaction::sendTC_COMMIT_ACK(theCommitAckSignal,
-					   commitConf->transId1, 
-					   commitConf->transId2,
-					   aTCRef);
+	  NdbTransaction::sendTC_COMMIT_ACK(theImpl->m_transporter_facade,
+                                            theCommitAckSignal,
+                                            commitConf->transId1, 
+                                            commitConf->transId2,
+                                            aTCRef);
 	}
 	return;
       }
@@ -660,11 +698,75 @@
   case GSN_CREATE_INDX_REF:
   case GSN_DROP_INDX_CONF:
   case GSN_DROP_INDX_REF:
+  case GSN_CREATE_EVNT_CONF:
+  case GSN_CREATE_EVNT_REF:
+  case GSN_DROP_EVNT_CONF:
+  case GSN_DROP_EVNT_REF:
   case GSN_LIST_TABLES_CONF:
+  case GSN_CREATE_FILE_REF:
+  case GSN_CREATE_FILE_CONF:
+  case GSN_CREATE_FILEGROUP_REF:
+  case GSN_CREATE_FILEGROUP_CONF:
+  case GSN_DROP_FILE_REF:
+  case GSN_DROP_FILE_CONF:
+  case GSN_DROP_FILEGROUP_REF:
+  case GSN_DROP_FILEGROUP_CONF:
+  case GSN_WAIT_GCP_CONF:
+  case GSN_WAIT_GCP_REF:
     NdbDictInterface::execSignal(&theDictionary->m_receiver,
 				 aSignal, ptr);
-    break;
+    return;
+    
+  case GSN_SUB_REMOVE_CONF:
+  case GSN_SUB_REMOVE_REF:
+    return; // ignore these signals
+  case GSN_SUB_START_CONF:
+  case GSN_SUB_START_REF:
+  case GSN_SUB_STOP_CONF:
+  case GSN_SUB_STOP_REF:
+    NdbDictInterface::execSignal(&theDictionary->m_receiver,
+				 aSignal, ptr);
+    return;
+  case GSN_SUB_GCP_COMPLETE_REP:
+  {
+    const SubGcpCompleteRep * const rep=
+      CAST_CONSTPTR(SubGcpCompleteRep, aSignal->getDataPtr());
+    theEventBuffer->execSUB_GCP_COMPLETE_REP(rep);
+    return;
+  }
+  case GSN_SUB_TABLE_DATA:
+  {
+    const SubTableData * const sdata=
+      CAST_CONSTPTR(SubTableData, aSignal->getDataPtr());
+    const Uint32 oid = sdata->senderData;
+    NdbEventOperationImpl *op= (NdbEventOperationImpl*)int2void(oid);
+
+    if (unlikely(op == 0 || op->m_magic_number != NDB_EVENT_OP_MAGIC_NUMBER))
+    {
+      g_eventLogger.error("dropped GSN_SUB_TABLE_DATA due to wrong magic "
+			  "number");
+      return ;
+    }
+
+    // Accumulate DIC_TAB_INFO for TE_ALTER events
+    if (SubTableData::getOperation(sdata->requestInfo) == 
+	NdbDictionary::Event::_TE_ALTER &&
+        !op->execSUB_TABLE_DATA(aSignal, ptr))
+      return;
     
+    for (int i= aSignal->m_noOfSections;i < 3; i++) {
+      ptr[i].p = NULL;
+      ptr[i].sz = 0;
+    }
+    DBUG_PRINT("info",("oid=senderData: %d, gci: %d, operation: %d, "
+		       "tableId: %d",
+		       sdata->senderData, sdata->gci, 
+		       SubTableData::getOperation(sdata->requestInfo),
+		       sdata->tableId));
+
+    theEventBuffer->insertDataL(op,sdata, ptr);
+    return;
+  }
   case GSN_DIHNDBTAMPER:
     {
       tFirstDataPtr = int2void(tFirstData);
@@ -776,10 +878,11 @@
     }//if
     
     if(TcIndxConf::getMarkerFlag(indxConf->confInfo)){
-      NdbTransaction::sendTC_COMMIT_ACK(theCommitAckSignal,
-				       indxConf->transId1, 
-				       indxConf->transId2,
-				       aTCRef);
+      NdbTransaction::sendTC_COMMIT_ACK(theImpl->m_transporter_facade,
+                                        theCommitAckSignal,
+                                        indxConf->transId1, 
+                                        indxConf->transId2,
+                                        aTCRef);
     }
     return;
   }
@@ -805,11 +908,32 @@
   } 
   default:
     goto InvalidSignal;
-  }//switch
-  
-  if (theImpl->theWaiter.m_state == NO_WAIT) {
-    // Wake up the thread waiting for response
-    NdbCondition_Signal(theImpl->theWaiter.m_condition);
+  }//swich
+
+  t_waiter= &theImpl->theWaiter;
+  if (t_waiter->get_state() == NO_WAIT && tWaitState != NO_WAIT)
+  {
+    /*
+      If our waiter object is the owner of the "poll rights", then we
+      can simply return, we will return from this routine to the
+      place where external_poll was called. From there it will move
+      the "poll ownership" to a new thread if available.
+
+      If our waiter object doesn't own the "poll rights", then we must
+      signal the thread from where this waiter object called
+      its conditional wait. This will wake up this thread so that it
+      can continue its work.
+    */
+    TransporterFacade *tp= theImpl->m_transporter_facade;
+    if (tp->get_poll_owner() != t_waiter)
+    {
+      /*
+         Wake up the thread waiting for response and remove it from queue
+         of objects waiting for receive completion
+      */
+      tp->remove_from_cond_wait_queue(t_waiter);
+      t_waiter->cond_signal();
+    }
   }//if
   return;
 
@@ -865,7 +989,19 @@
     if ((theMinNoOfEventsToWakeUp != 0) &&
         (theNoOfCompletedTransactions >= theMinNoOfEventsToWakeUp)) {
       theMinNoOfEventsToWakeUp = 0;
-      NdbCondition_Signal(theImpl->theWaiter.m_condition);
+      TransporterFacade *tp = theImpl->m_transporter_facade;
+      NdbWaiter *t_waiter= &theImpl->theWaiter;
+      if (tp->get_poll_owner() != t_waiter) {
+        /*
+          When we come here, this is executed by the thread owning the "poll
+          rights". This thread is not where our waiter object belongs.
+          Thus we wake up the thread owning this waiter object but first
+          we must remove it from the conditional wait queue so that we
+          don't assign it as poll owner later on.
+        */
+        tp->remove_from_cond_wait_queue(t_waiter);
+        t_waiter->cond_signal();
+      }
       return;
     }//if
   } else {
@@ -935,8 +1071,9 @@
 void
 Ndb::check_send_timeout()
 {
-  Uint32 timeout = TransporterFacade::instance()->m_waitfor_timeout;
+  Uint32 timeout = theImpl->m_transporter_facade->m_waitfor_timeout;
   NDB_TICKS current_time = NdbTick_CurrentMillisecond();
+  assert(current_time >= the_last_check_time);
   if (current_time - the_last_check_time > 1000) {
     the_last_check_time = current_time;
     Uint32 no_of_sent = theNoOfSentTransactions;
@@ -1024,7 +1161,7 @@
      and we keep a small space for messages like that.
   */
   Uint32 i;
-  TransporterFacade* tp = TransporterFacade::instance();
+  TransporterFacade* tp = theImpl->m_transporter_facade;
   Uint32 no_of_prep_trans = theNoOfPreparedTransactions;
   for (i = 0; i < no_of_prep_trans; i++) {
     NdbTransaction * a_con = thePreparedTransactionsArray[i];
@@ -1127,7 +1264,8 @@
 ******************************************************************************/
 void	
 Ndb::waitCompletedTransactions(int aMilliSecondsToWait, 
-			       int noOfEventsToWaitFor)
+			       int noOfEventsToWaitFor,
+                               PollGuard *poll_guard)
 {
   theImpl->theWaiter.m_state = NO_WAIT; 
   /**
@@ -1136,22 +1274,24 @@
    * (see ReportFailure)
    */
   int waitTime = aMilliSecondsToWait;
-  NDB_TICKS maxTime = NdbTick_CurrentMillisecond() + (NDB_TICKS)waitTime;
+  NDB_TICKS currTime = NdbTick_CurrentMillisecond();
+  NDB_TICKS maxTime = currTime + (NDB_TICKS)waitTime;
   theMinNoOfEventsToWakeUp = noOfEventsToWaitFor;
+  const int maxsleep = aMilliSecondsToWait > 10 ? 10 : aMilliSecondsToWait;
   do {
-    if (waitTime < 1000) waitTime = 1000;
-    NdbCondition_WaitTimeout(theImpl->theWaiter.m_condition,
-			     (NdbMutex*)theImpl->theWaiter.m_mutex,
-			     waitTime);
+    poll_guard->wait_for_input(maxsleep);
     if (theNoOfCompletedTransactions >= (Uint32)noOfEventsToWaitFor) {
       break;
     }//if
     theMinNoOfEventsToWakeUp = noOfEventsToWaitFor;
     waitTime = (int)(maxTime - NdbTick_CurrentMillisecond());
   } while (waitTime > 0);
-  return;
 }//Ndb::waitCompletedTransactions()
 
+void Ndb::cond_signal()
+{
+  NdbCondition_Signal(theImpl->theWaiter.m_condition);
+}
 /*****************************************************************************
 void sendPreparedTransactions(int forceSend = 0);
 
@@ -1162,9 +1302,9 @@
 void	
 Ndb::sendPreparedTransactions(int forceSend)
 {
-  TransporterFacade::instance()->lock_mutex();
+  theImpl->m_transporter_facade->lock_mutex();
   sendPrepTrans(forceSend);
-  TransporterFacade::instance()->unlock_mutex();
+  theImpl->m_transporter_facade->unlock_mutex();
   return;
 }//Ndb::sendPreparedTransactions()
 
@@ -1179,28 +1319,39 @@
 int	
 Ndb::sendPollNdb(int aMillisecondNumber, int minNoOfEventsToWakeup, int forceSend)
 {
+  /*
+    The PollGuard has an implicit call of unlock_and_signal through the
+    ~PollGuard method. This method is called implicitly by the compiler
+    in all places where the object is out of context due to a return,
+    break, continue or simply end of statement block
+  */
+  PollGuard pg(theImpl->m_transporter_facade, &theImpl->theWaiter,
+               theNdbBlockNumber);
+  sendPrepTrans(forceSend);
+  return poll_trans(aMillisecondNumber, minNoOfEventsToWakeup, &pg);
+}
+
+int
+Ndb::poll_trans(int aMillisecondNumber, int minNoOfEventsToWakeup,
+                PollGuard *pg)
+{
   NdbTransaction* tConArray[1024];
   Uint32         tNoCompletedTransactions;
-
-  //theCurrentConnectCounter = 0;
-  //theCurrentConnectIndex++;
-  TransporterFacade::instance()->lock_mutex();
-  sendPrepTrans(forceSend);
   if ((minNoOfEventsToWakeup <= 0) ||
       ((Uint32)minNoOfEventsToWakeup > theNoOfSentTransactions)) {
     minNoOfEventsToWakeup = theNoOfSentTransactions;
   }//if
   if ((theNoOfCompletedTransactions < (Uint32)minNoOfEventsToWakeup) &&
       (aMillisecondNumber > 0)) {
-    waitCompletedTransactions(aMillisecondNumber, minNoOfEventsToWakeup);
+    waitCompletedTransactions(aMillisecondNumber, minNoOfEventsToWakeup, pg);
     tNoCompletedTransactions = pollCompleted(tConArray);
   } else {
     tNoCompletedTransactions = pollCompleted(tConArray);
   }//if
-  TransporterFacade::instance()->unlock_mutex();
+  pg->unlock_and_signal();
   reportCallback(tConArray, tNoCompletedTransactions);
   return tNoCompletedTransactions;
-}//Ndb::sendPollNdb()
+}
 
 /*****************************************************************************
 int pollNdb(int aMillisecondNumber, int minNoOfEventsToWakeup);
@@ -1212,67 +1363,23 @@
 int	
 Ndb::pollNdb(int aMillisecondNumber, int minNoOfEventsToWakeup)
 {
-  NdbTransaction* tConArray[1024];
-  Uint32         tNoCompletedTransactions;
-
-  //theCurrentConnectCounter = 0;
-  //theCurrentConnectIndex++;
-  TransporterFacade::instance()->lock_mutex();
-  if ((minNoOfEventsToWakeup == 0) ||
-      ((Uint32)minNoOfEventsToWakeup > theNoOfSentTransactions)) {
-    minNoOfEventsToWakeup = theNoOfSentTransactions;
-  }//if
-  if ((theNoOfCompletedTransactions < (Uint32)minNoOfEventsToWakeup) &&
-      (aMillisecondNumber > 0)) {
-    waitCompletedTransactions(aMillisecondNumber, minNoOfEventsToWakeup);
-    tNoCompletedTransactions = pollCompleted(tConArray);
-  } else {
-    tNoCompletedTransactions = pollCompleted(tConArray);
-  }//if
-  TransporterFacade::instance()->unlock_mutex();
-  reportCallback(tConArray, tNoCompletedTransactions);
-  return tNoCompletedTransactions;
-}//Ndb::sendPollNdbWithoutWait()
-
-/*****************************************************************************
-int receiveOptimisedResponse();
-
-Return:  0 - Response received
-        -1 - Timeout occured waiting for response
-        -2 - Node failure interupted wait for response
-
-******************************************************************************/
-int	
-Ndb::receiveResponse(int waitTime){
-  int tResultCode;
-  TransporterFacade::instance()->checkForceSend(theNdbBlockNumber);
-  
-  theImpl->theWaiter.wait(waitTime);
-  
-  if(theImpl->theWaiter.m_state == NO_WAIT) {
-    tResultCode = 0;
-  } else {
-
-#ifdef VM_TRACE
-    ndbout << "ERR: receiveResponse - theImpl->theWaiter.m_state = ";
-    ndbout << theImpl->theWaiter.m_state << endl;
-#endif
-
-    if (theImpl->theWaiter.m_state == WAIT_NODE_FAILURE){
-      tResultCode = -2;
-    } else {
-      tResultCode = -1;
-    }
-    theImpl->theWaiter.m_state = NO_WAIT;
-  }
-  return tResultCode;
-}//Ndb::receiveResponse()
+  /*
+    The PollGuard has an implicit call of unlock_and_signal through the
+    ~PollGuard method. This method is called implicitly by the compiler
+    in all places where the object is out of context due to a return,
+    break, continue or simply end of statement block
+  */
+  PollGuard pg(theImpl->m_transporter_facade, &theImpl->theWaiter,
+               theNdbBlockNumber);
+  return poll_trans(aMillisecondNumber, minNoOfEventsToWakeup, &pg);
+}
 
 int
 Ndb::sendRecSignal(Uint16 node_id,
 		   Uint32 aWaitState,
 		   NdbApiSignal* aSignal,
-                   Uint32 conn_seq)
+                   Uint32 conn_seq,
+                   Uint32 *ret_conn_seq)
 {
   /*
   In most situations 0 is returned.
@@ -1285,19 +1392,28 @@
   */
 
   int return_code;
-  TransporterFacade* tp = TransporterFacade::instance();
-  Uint32 send_size = 1; // Always sends one signal only 
-  tp->lock_mutex();
+  Uint32 read_conn_seq;
+  TransporterFacade* tp = theImpl->m_transporter_facade;
+  Uint32 send_size = 1; // Always sends one signal only
   // Protected area
+  /*
+    The PollGuard has an implicit call of unlock_and_signal through the
+    ~PollGuard method. This method is called implicitly by the compiler
+    in all places where the object is out of context due to a return,
+    break, continue or simply end of statement block
+  */
+  PollGuard poll_guard(tp,&theImpl->theWaiter,theNdbBlockNumber);
+  read_conn_seq= tp->getNodeSequence(node_id);
+  if (ret_conn_seq)
+    *ret_conn_seq= read_conn_seq;
   if ((tp->get_node_alive(node_id)) &&
-      ((tp->getNodeSequence(node_id) == conn_seq) ||
+      ((read_conn_seq == conn_seq) ||
        (conn_seq == 0))) {
     if (tp->check_send_size(node_id, send_size)) {
       return_code = tp->sendSignal(aSignal, node_id);
       if (return_code != -1) {
-        theImpl->theWaiter.m_node = node_id;
-        theImpl->theWaiter.m_state = aWaitState;
-        return_code = receiveResponse();
+        return poll_guard.wait_n_unlock(WAITFOR_RESPONSE_TIMEOUT,node_id,
+                                         aWaitState, false);
       } else {
 	return_code = -3;
       }
@@ -1306,29 +1422,28 @@
     }//if
   } else {
     if ((tp->get_node_stopping(node_id)) &&
-        ((tp->getNodeSequence(node_id) == conn_seq) ||
+        ((read_conn_seq == conn_seq) ||
          (conn_seq == 0))) {
       return_code = -5;
     } else {
       return_code = -2;
     }//if
   }//if
-  tp->unlock_mutex();
-  // End of protected area
   return return_code;
+  // End of protected area
 }//Ndb::sendRecSignal()
 
 void
-NdbTransaction::sendTC_COMMIT_ACK(NdbApiSignal * aSignal,
-				 Uint32 transId1, Uint32 transId2, 
-				 Uint32 aTCRef){
+NdbTransaction::sendTC_COMMIT_ACK(TransporterFacade *tp,
+                                  NdbApiSignal * aSignal,
+                                  Uint32 transId1, Uint32 transId2, 
+                                  Uint32 aTCRef){
 #ifdef MARKER_TRACE
   ndbout_c("Sending TC_COMMIT_ACK(0x%.8x, 0x%.8x) to -> %d",
 	   transId1,
 	   transId2,
 	   refToNode(aTCRef));
 #endif  
-  TransporterFacade *tp = TransporterFacade::instance();
   aSignal->theTrace                = TestOrd::TraceAPI;
   aSignal->theReceiversBlockNumber = DBTC;
   aSignal->theVerId_signalNumber   = GSN_TC_COMMIT_ACK;
@@ -1337,6 +1452,29 @@
   Uint32 * dataPtr = aSignal->getDataPtrSend();
   dataPtr[0] = transId1;
   dataPtr[1] = transId2;
-  
   tp->sendSignalUnCond(aSignal, refToNode(aTCRef));
+}
+
+int
+NdbImpl::send_event_report(Uint32 *data, Uint32 length)
+{
+  NdbApiSignal aSignal(m_ndb.theMyRef);
+  TransporterFacade *tp = m_transporter_facade;
+  aSignal.theTrace                = TestOrd::TraceAPI;
+  aSignal.theReceiversBlockNumber = CMVMI;
+  aSignal.theVerId_signalNumber   = GSN_EVENT_REP;
+  aSignal.theLength               = length;
+  memcpy((char *)aSignal.getDataPtrSend(), (char *)data, length*4);
+
+  Uint32 tNode;
+  Ndb_cluster_connection_node_iter node_iter;
+  m_ndb_cluster_connection.init_get_next_node(node_iter);
+  while ((tNode= m_ndb_cluster_connection.get_next_node(node_iter)))
+  {
+    if(tp->get_node_alive(tNode)){
+      tp->sendSignal(&aSignal, tNode);
+      return 0;
+    }
+  }
+  return 1;
 }
Thread
bk commit into 5.1 tree (tomas:1.2576)tomas11 Apr