List:Commits« Previous MessageNext Message »
From:jonas Date:January 12 2006 9:12am
Subject:bk commit into 5.1 tree (jonas:1.2042)
View as plain text  
Below is the list of changes that have just been committed into a local
5.1 repository of jonas. When jonas does a push these changes will
be propagated to the main repository and, within 24 hours after the
push, to the public repository.
For information on how to access the public repository
see http://dev.mysql.com/doc/mysql/en/installing-source-tree.html

ChangeSet
  1.2042 06/01/12 10:11:58 jonas@stripped +3 -0
  Merge perch.ndb.mysql.com:/home/jonas/src/mysql-5.1-new
  into  perch.ndb.mysql.com:/home/jonas/src/51-new

  storage/ndb/src/ndbapi/ndb_cluster_connection.cpp
    1.37 06/01/12 10:11:54 jonas@stripped +0 -0
    Auto merged

  storage/ndb/src/kernel/blocks/dblqh/DblqhMain.cpp
    1.84 06/01/12 10:11:54 jonas@stripped +0 -9
    Auto merged

  storage/ndb/src/kernel/blocks/ERROR_codes.txt
    1.17 06/01/12 10:11:54 jonas@stripped +0 -3
    Auto merged

# This is a BitKeeper patch.  What follows are the unified diffs for the
# set of deltas contained in the patch.  The rest of the patch, the part
# that BitKeeper cares about, is below these diffs.
# User:	jonas
# Host:	perch.ndb.mysql.com
# Root:	/home/jonas/src/51-new/RESYNC

--- 1.16/storage/ndb/src/kernel/blocks/ERROR_codes.txt	2005-12-21 16:45:03 +01:00
+++ 1.17/storage/ndb/src/kernel/blocks/ERROR_codes.txt	2006-01-12 10:11:54 +01:00
@@ -463,3 +463,13 @@
 6003 Crash in participant @ CreateTabReq::Prepare
 6004 Crash in participant @ CreateTabReq::Commit
 6005 Crash in participant @ CreateTabReq::CreateDrop
+
+Dbtup:
+4014 - handleInsert - Out of undo buffer
+4015 - handleInsert - Out of log space
+4016 - handleInsert - AI Inconsistency
+4017 - handleInsert - Out of memory
+4018 - handleInsert - Null check error
+4019 - handleInsert - Alloc rowid error
+4020 - handleInsert - Size change error
+4021 - handleInsert - Out of disk space

--- 1.83/storage/ndb/src/kernel/blocks/dblqh/DblqhMain.cpp	2005-12-21 16:45:03 +01:00
+++ 1.84/storage/ndb/src/kernel/blocks/dblqh/DblqhMain.cpp	2006-01-12 10:11:54 +01:00
@@ -118,57 +118,19 @@
 
 const Uint32 NR_ScanNo = 0;
 
-void Dblqh::execACC_COM_BLOCK(Signal* signal)
-{
-  jamEntry();
-/* ------------------------------------------------------------------------- */
-// Undo log buffer in ACC is in critical sector of being full.
-/* ------------------------------------------------------------------------- */
-  cCounterAccCommitBlocked++;
-  caccCommitBlocked = true;
-  cCommitBlocked = true;
-  return;
-}//Dblqh::execACC_COM_BLOCK()
-
-void Dblqh::execACC_COM_UNBLOCK(Signal* signal)
-{
-  jamEntry();
-/* ------------------------------------------------------------------------- */
-// Undo log buffer in ACC ok again.
-/* ------------------------------------------------------------------------- */
-  caccCommitBlocked = false;
-  if (ctupCommitBlocked == false) {
-    jam();
-    cCommitBlocked = false;
-  }//if
-  return;
-}//Dblqh::execACC_COM_UNBLOCK()
-
-void Dblqh::execTUP_COM_BLOCK(Signal* signal)
-{
-  jamEntry();
-/* ------------------------------------------------------------------------- */
-// Undo log buffer in TUP is in critical sector of being full.
-/* ------------------------------------------------------------------------- */
-  cCounterTupCommitBlocked++;
-  ctupCommitBlocked = true;
-  cCommitBlocked = true;
-  return;
-}//Dblqh::execTUP_COM_BLOCK()
-
-void Dblqh::execTUP_COM_UNBLOCK(Signal* signal)
-{
-  jamEntry();
-/* ------------------------------------------------------------------------- */
-// Undo log buffer in TUP ok again.
-/* ------------------------------------------------------------------------- */
-  ctupCommitBlocked = false;
-  if (caccCommitBlocked == false) {
-    jam();
-    cCommitBlocked = false;
-  }//if
-  return;
-}//Dblqh::execTUP_COM_UNBLOCK()
+#if defined VM_TRACE || defined ERROR_INSERT || defined NDBD_TRACENR
+static FileOutputStream tracenr_fos(fopen("tracenr.log", "w+"));
+NdbOut tracenrout(tracenr_fos);
+static int TRACENR_FLAG = 0;
+#define TRACENR(x) tracenrout << x
+#define SET_TRACENR_FLAG TRACENR_FLAG = 1
+#define CLEAR_TRACENR_FLAG TRACENR_FLAG = 0
+#else
+#define TRACENR_FLAG 0
+#define TRACENR(x)
+#define SET_TRACENR_FLAG
+#define CLEAR_TRACENR_FLAG
+#endif
 
 /* ------------------------------------------------------------------------- */
 /* -------               SEND SYSTEM ERROR                           ------- */
@@ -242,12 +204,6 @@
     ptrCheckGuard(tcConnectptr, ctcConnectrecFileSize, tcConnectionrec);
     fragptr.i = tcConnectptr.p->fragmentptr;
     c_fragment_pool.getPtr(fragptr);
-    if ((cCommitBlocked == true) &&
-        (fragptr.p->fragActiveStatus == ZTRUE)) {
-      jam();
-      sendSignalWithDelay(cownref, GSN_CONTINUEB, signal, 10, 2);
-      return;
-    }//if
     logPartPtr.p->LogLqhKeyReqSent = ZFALSE;
     getFirstInLogQueue(signal);
 
@@ -289,12 +245,13 @@
       writeCommitLog(signal, logPartPtr);
       logNextStart(signal);
       if (tcConnectptr.p->transactionState == TcConnectionrec::LOG_COMMIT_QUEUED) {
-        if (tcConnectptr.p->seqNoReplica != 0) {
+        if (tcConnectptr.p->seqNoReplica == 0 ||
+	    tcConnectptr.p->activeCreat == Fragrecord::AC_NR_COPY) {
           jam();
-          commitReplyLab(signal);
+          localCommitLab(signal);
         } else {
           jam();
-          localCommitLab(signal);
+          commitReplyLab(signal);
         }//if
         return;
       } else {
@@ -556,12 +513,13 @@
     preComputedRequestInfoMask = 0;
     LqhKeyReq::setKeyLen(preComputedRequestInfoMask, RI_KEYLEN_MASK);
     LqhKeyReq::setLastReplicaNo(preComputedRequestInfoMask, RI_LAST_REPL_MASK);
-    LqhKeyReq::setLockType(preComputedRequestInfoMask, RI_LOCK_TYPE_MASK);
     // Dont LqhKeyReq::setApplicationAddressFlag
     LqhKeyReq::setDirtyFlag(preComputedRequestInfoMask, 1);
     // Dont LqhKeyReq::setInterpretedFlag
     LqhKeyReq::setSimpleFlag(preComputedRequestInfoMask, 1);
     LqhKeyReq::setOperation(preComputedRequestInfoMask, RI_OPERATION_MASK);
+    LqhKeyReq::setGCIFlag(preComputedRequestInfoMask, 1);
+    LqhKeyReq::setNrCopyFlag(preComputedRequestInfoMask, 1);
     // Dont setAIInLqhKeyReq
     // Dont setSeqNoReplica
     // Dont setSameClientAndTcFlag
@@ -813,6 +771,7 @@
     jam();
     if (NodeBitmask::get(readNodes->allNodes, i)) {
       jam();
+      m_sr_nodes.set(i);
       cnodeData[ind]    = i;
       cnodeStatus[ind]  = NodeBitmask::get(readNodes->inactiveNodes, i);
       //readNodes->getVersionId(i, readNodes->theVersionIds) not used
@@ -824,11 +783,23 @@
   ndbrequire(!(cnoOfNodes == 1 && cstartType == NodeState::ST_NODE_RESTART));
   
   caddNodeState = ZFALSE;
-  if (cstartType == NodeState::ST_SYSTEM_RESTART) {
+  if (cstartType == NodeState::ST_SYSTEM_RESTART) 
+  {
     jam();
     sendNdbSttorryLab(signal);
     return;
-  }//if
+  } 
+  else if (cstartType == NodeState::ST_NODE_RESTART)
+  {
+    jam();
+    SET_TRACENR_FLAG;
+    m_sr_nodes.clear();
+    m_sr_nodes.set(getOwnNodeId());
+    sendNdbSttorryLab(signal);
+    return;
+  }
+  SET_TRACENR_FLAG;
+  
   checkStartCompletedLab(signal);
   return;
 }//Dblqh::execREAD_NODESCONF()
@@ -858,6 +829,7 @@
 {
   cstartPhase = ZNIL;
   cstartType = ZNIL;
+  CLEAR_TRACENR_FLAG;
   sendNdbSttorryLab(signal);
   return;
 }//Dblqh::startphase6Lab()
@@ -979,6 +951,13 @@
   Uint32 tableType = req->tableType;
   Uint32 primaryTableId = req->primaryTableId;
   Uint32 tablespace= req->tablespace_id;
+  Uint32 logPart = req->logPartId;
+
+  if (signal->getLength() < 20)
+  {
+    logPart = (fragId & 1) + 2 * (tabptr.i & 1);
+  }
+  logPart &= 3;
 
   ptrCheckGuard(tabptr, ctabrecFileSize, tablerec);
   bool tempTable = ((reqinfo & LqhFragReq::TemporaryTable) != 0);
@@ -1021,7 +1000,8 @@
   fragptr.p->startGci = startGci;
   fragptr.p->newestGci = startGci;
   fragptr.p->tableType = tableType;
-
+  fragptr.p->m_log_part_ptr_i = logPart; // assumes array
+  
   if (DictTabInfo::isOrderedIndex(tableType)) {
     jam();
     // find corresponding primary table fragment
@@ -1385,9 +1365,14 @@
       if (addfragptr.p->fragCopyCreation == 1) {
         jam();
         if (! DictTabInfo::isOrderedIndex(addfragptr.p->tableType))
+	{
+	  fragptr.p->m_copy_started_state = Fragrecord::AC_IGNORED;
           fragptr.p->fragStatus = Fragrecord::ACTIVE_CREATION;
+	}
         else
+	{
           fragptr.p->fragStatus = Fragrecord::FSACTIVE;
+	}
         fragptr.p->logFlag = Fragrecord::STATE_FALSE;
       } else {
         jam();
@@ -2019,17 +2004,6 @@
   jamEntry();
   cLqhTimeOutCount++;
   cLqhTimeOutCheckCount++;
-  if ((cCounterAccCommitBlocked > 0) ||
-     (cCounterTupCommitBlocked > 0)) {
-    jam();
-    signal->theData[0] = NDB_LE_UndoLogBlocked;
-    signal->theData[1] = cCounterTupCommitBlocked;
-    signal->theData[2] = cCounterAccCommitBlocked;
-    sendSignal(CMVMI_REF, GSN_EVENT_REP, signal, 3, JBB);
-
-    cCounterTupCommitBlocked = 0;
-    cCounterAccCommitBlocked = 0;
-  }//if
   if (cLqhTimeOutCheckCount < 10) {
     jam();
     return;
@@ -2527,6 +2501,7 @@
   c_fragment_pool.getPtr(regFragptr);
   fragptr = regFragptr;
   
+  TcConnectionrec* regTcPtr = tcConnectptr.p;
   switch (tcConnectptr.p->transactionState) {
   case TcConnectionrec::WAIT_TUP:
     jam();
@@ -3191,7 +3166,7 @@
   c_Counters.operations++;
 
   TcConnectionrec * const regTcPtr = tcConnectptr.p;
-  regTcPtr->clientBlockref = signal->senderBlockRef();
+  Uint32 senderRef = regTcPtr->clientBlockref = signal->senderBlockRef();
   regTcPtr->clientConnectrec = sig0;
   regTcPtr->tcOprec = sig0;
   regTcPtr->storedProcId = ZNIL;
@@ -3210,10 +3185,12 @@
   regTcPtr->tcBlockref = sig5;
 
   const Uint8 op = LqhKeyReq::getOperation(Treqinfo);
-  if (op == ZREAD && !getAllowRead()){
+  if ((op == ZREAD || op == ZREAD_EX) && !getAllowRead()){
     noFreeRecordLab(signal, lqhKeyReq, ZNODE_SHUTDOWN_IN_PROGESS);
     return;
   }
+  
+  Uint32 senderVersion = getNodeInfo(refToNode(senderRef)).m_version;
 
   regTcPtr->totReclenAi = LqhKeyReq::getAttrLen(TtotReclenAi);
   regTcPtr->tcScanInfo  = lqhKeyReq->scanInfo;
@@ -3268,15 +3245,26 @@
   
   regTcPtr->reqinfo = Treqinfo;
   regTcPtr->lastReplicaNo = LqhKeyReq::getLastReplicaNo(Treqinfo);
-  regTcPtr->lockType      = LqhKeyReq::getLockType(Treqinfo);
   regTcPtr->dirtyOp       = LqhKeyReq::getDirtyFlag(Treqinfo);
   regTcPtr->opExec        = LqhKeyReq::getInterpretedFlag(Treqinfo);
   regTcPtr->opSimple      = LqhKeyReq::getSimpleFlag(Treqinfo);
-  regTcPtr->operation     = LqhKeyReq::getOperation(Treqinfo);
-  regTcPtr->simpleRead    = regTcPtr->operation == ZREAD && regTcPtr->opSimple;
+  regTcPtr->simpleRead    = op == ZREAD && regTcPtr->opSimple;
   regTcPtr->seqNoReplica  = LqhKeyReq::getSeqNoReplica(Treqinfo);
   UintR TreclenAiLqhkey   = LqhKeyReq::getAIInLqhKeyReq(Treqinfo);
   regTcPtr->apiVersionNo  = 0; 
+  regTcPtr->m_use_rowid   = LqhKeyReq::getRowidFlag(Treqinfo);
+  regTcPtr->m_dealloc     = 0;
+  if (unlikely(senderVersion < NDBD_ROWID_VERSION))
+  {
+    regTcPtr->operation = op;
+    regTcPtr->lockType = LqhKeyReq::getLockType(Treqinfo);
+  }
+  else
+  {
+    regTcPtr->operation = op == ZREAD_EX ? ZREAD : op;
+    regTcPtr->lockType = 
+      op == ZREAD_EX ? ZUPDATE : op == ZWRITE ? ZINSERT : op;
+  }
   
   CRASH_INSERTION2(5041, regTcPtr->simpleRead && 
 		   refToNode(signal->senderBlockRef()) != cownNodeid);
@@ -3327,11 +3315,33 @@
     } else {
       nextPos += 4;
     }//if
-  } else {
+  } 
+  else if (! (LqhKeyReq::getNrCopyFlag(Treqinfo)))
+  {
     LQHKEY_error(signal, 3);
     return;
   }//if
-
+  
+  sig0 = lqhKeyReq->variableData[nextPos + 0];
+  sig1 = lqhKeyReq->variableData[nextPos + 1];
+  regTcPtr->m_row_id.m_page_no = sig0;
+  regTcPtr->m_row_id.m_page_idx = sig1;
+  nextPos += 2 * LqhKeyReq::getRowidFlag(Treqinfo);
+
+  sig2 = lqhKeyReq->variableData[nextPos + 0];
+  sig3 = cnewestGci;
+  regTcPtr->gci = LqhKeyReq::getGCIFlag(Treqinfo) ? sig2 : sig3;
+  nextPos += LqhKeyReq::getGCIFlag(Treqinfo);
+  
+  if (LqhKeyReq::getRowidFlag(Treqinfo))
+  {
+    ndbassert(refToBlock(senderRef) != DBTC);
+  }
+  else if(op == ZINSERT)
+  {
+    ndbassert(refToBlock(senderRef) == DBTC);
+  }
+  
   if ((LqhKeyReq::FixedSignalLength + nextPos + TreclenAiLqhkey) != 
       signal->length()) {
     LQHKEY_error(signal, 2);
@@ -3398,18 +3408,34 @@
     LQHKEY_error(signal, 6);
     return;
   }//if
+
+  if (LqhKeyReq::getNrCopyFlag(Treqinfo))
+  {
+    ndbassert(refToBlock(senderRef) == DBLQH);
+    ndbassert(LqhKeyReq::getRowidFlag(Treqinfo));
+    if (! (fragptr.p->fragStatus == Fragrecord::ACTIVE_CREATION))
+    {
+      ndbout_c("fragptr.p->fragStatus: %d",
+	       fragptr.p->fragStatus);
+    }
+    ndbassert(fragptr.p->fragStatus == Fragrecord::ACTIVE_CREATION);
+    fragptr.p->m_copy_started_state = Fragrecord::AC_NR_COPY;
+  }
+  
   Uint8 TcopyType = fragptr.p->fragCopy;
+  Uint32 logPart = fragptr.p->m_log_part_ptr_i;
   tfragDistKey = fragptr.p->fragDistributionKey;
   if (fragptr.p->fragStatus == Fragrecord::ACTIVE_CREATION) {
     jam();
-    regTcPtr->activeCreat = ZTRUE;
+    regTcPtr->activeCreat = fragptr.p->m_copy_started_state;
     CRASH_INSERTION(5002);
     CRASH_INSERTION2(5042, tabptr.i == c_error_insert_table_id);
   } else {
-    regTcPtr->activeCreat = ZFALSE;
+    regTcPtr->activeCreat = Fragrecord::AC_NORMAL;
   }//if
   regTcPtr->replicaType = TcopyType;
   regTcPtr->fragmentptr = fragptr.i;
+  regTcPtr->m_log_part_ptr_i = logPart;
   Uint8 TdistKey = LqhKeyReq::getDistributionKey(TtotReclenAi);
   if ((tfragDistKey != TdistKey) &&
       (regTcPtr->seqNoReplica == 0) &&
@@ -3607,7 +3633,6 @@
 void Dblqh::prepareContinueAfterBlockedLab(Signal* signal) 
 {
   UintR ttcScanOp;
-  UintR taccreq;
 
 /* -------------------------------------------------------------------------- */
 /*       INPUT:          TC_CONNECTPTR           ACTIVE CONNECTION RECORD     */
@@ -3620,6 +3645,7 @@
 /* -------------------------------------------------------------------------- */
   Uint32 tc_ptr_i = tcConnectptr.i;
   TcConnectionrec * const regTcPtr = tcConnectptr.p;
+  Uint32 activeCreat = regTcPtr->activeCreat;
   if (regTcPtr->indTakeOver == ZTRUE) {
     jam();
     ttcScanOp = KeyInfo20::getScanOp(regTcPtr->tcScanInfo);
@@ -3668,36 +3694,75 @@
 /*       BEFORE SENDING THE MESSAGE THE REQUEST INFORMATION IS SET   */
 /*       PROPERLY.                                                   */
 /* ----------------------------------------------------------------- */
-#if 0
-  if (regTcPtr->tableref != 0) {
+  if (TRACENR_FLAG)
+  {
     switch (regTcPtr->operation) {
-    case ZREAD: ndbout << "Läsning "; break;
-    case ZUPDATE: ndbout << " Uppdatering "; break;
-    case ZWRITE: ndbout << "Write "; break;
-    case ZINSERT: ndbout << "Inläggning "; break;
-    case ZDELETE: ndbout << "Borttagning "; break;
-    default: ndbout << "????"; break;
+    case ZREAD: TRACENR("READ"); break;
+    case ZUPDATE: TRACENR("UPDATE"); break;
+    case ZWRITE: TRACENR("WRITE"); break;
+    case ZINSERT: TRACENR("INSERT"); break;
+    case ZDELETE: TRACENR("DELETE"); break;
+    default: TRACENR("<Unknown: " << regTcPtr->operation << ">"); break;
     }
-    ndbout << "med nyckel = " << regTcPtr->tupkeyData[0] << endl;
+    
+    TRACENR(" tab: " << regTcPtr->tableref 
+	   << " frag: " << regTcPtr->fragmentid
+	   << " activeCreat: " << (Uint32)activeCreat);
+    if (LqhKeyReq::getNrCopyFlag(regTcPtr->reqinfo))
+      TRACENR(" NrCopy");
+    if (LqhKeyReq::getRowidFlag(regTcPtr->reqinfo))
+      TRACENR(" rowid: " << regTcPtr->m_row_id);
+    TRACENR(" key: " << regTcPtr->tupkeyData[0]);
   }
-#endif
   
-  regTcPtr->transactionState = TcConnectionrec::WAIT_ACC;
-  taccreq = regTcPtr->operation;
-  taccreq = taccreq + (regTcPtr->opSimple << 3);
-  taccreq = taccreq + (regTcPtr->lockType << 4);
-  taccreq = taccreq + (regTcPtr->dirtyOp << 6);
-  taccreq = taccreq + (regTcPtr->replicaType << 7);
-  taccreq = taccreq + (regTcPtr->apiVersionNo << 9);
+  if (likely(activeCreat == Fragrecord::AC_NORMAL))
+  {
+    if (TRACENR_FLAG)
+      TRACENR(endl);
+    ndbassert(!LqhKeyReq::getNrCopyFlag(regTcPtr->reqinfo));
+    exec_acckeyreq(signal, tcConnectptr);
+  } 
+  else if (activeCreat == Fragrecord::AC_NR_COPY)
+  {
+    handle_nr_copy(signal, tcConnectptr);
+  }
+  else
+  {
+    ndbassert(activeCreat == Fragrecord::AC_IGNORED);
+    if (TRACENR_FLAG)
+      TRACENR(" IGNORING (activeCreat == 2)" << endl);
+    
+    signal->theData[0] = tc_ptr_i;
+    regTcPtr->transactionState = TcConnectionrec::WAIT_ACC_ABORT;
+    
+    signal->theData[0] = regTcPtr->tupConnectrec;
+    EXECUTE_DIRECT(DBTUP, GSN_TUP_ABORTREQ, signal, 1);
+    jamEntry();
+    
+    execACC_ABORTCONF(signal);
+  }
+}
+
+void
+Dblqh::exec_acckeyreq(Signal* signal, TcConnectionrecPtr regTcPtr)
+{
+  Uint32 taccreq;
+  regTcPtr.p->transactionState = TcConnectionrec::WAIT_ACC;
+  taccreq = regTcPtr.p->operation;
+  taccreq = taccreq + (regTcPtr.p->opSimple << 3);
+  taccreq = taccreq + (regTcPtr.p->lockType << 4);
+  taccreq = taccreq + (regTcPtr.p->dirtyOp << 6);
+  taccreq = taccreq + (regTcPtr.p->replicaType << 7);
+  taccreq = taccreq + (regTcPtr.p->apiVersionNo << 9);
 /* ************ */
 /*  ACCKEYREQ < */
 /* ************ */
   Uint32 sig0, sig1, sig2, sig3, sig4;
-  sig0 = regTcPtr->accConnectrec;
+  sig0 = regTcPtr.p->accConnectrec;
   sig1 = fragptr.p->accFragptr;
-  sig2 = regTcPtr->hashValue;
-  sig3 = regTcPtr->primKeyLen;
-  sig4 = regTcPtr->transid[0];
+  sig2 = regTcPtr.p->hashValue;
+  sig3 = regTcPtr.p->primKeyLen;
+  sig4 = regTcPtr.p->transid[0];
   signal->theData[0] = sig0;
   signal->theData[1] = sig1;
   signal->theData[2] = taccreq;
@@ -3705,39 +3770,430 @@
   signal->theData[4] = sig3;
   signal->theData[5] = sig4;
 
-  sig0 = regTcPtr->transid[1];
-  sig1 = regTcPtr->tupkeyData[0];
-  sig2 = regTcPtr->tupkeyData[1];
-  sig3 = regTcPtr->tupkeyData[2];
-  sig4 = regTcPtr->tupkeyData[3];
+  sig0 = regTcPtr.p->transid[1];
+  sig1 = regTcPtr.p->tupkeyData[0];
+  sig2 = regTcPtr.p->tupkeyData[1];
+  sig3 = regTcPtr.p->tupkeyData[2];
+  sig4 = regTcPtr.p->tupkeyData[3];
   signal->theData[6] = sig0;
   signal->theData[7] = sig1;
   signal->theData[8] = sig2;
   signal->theData[9] = sig3;
   signal->theData[10] = sig4;
-  if (regTcPtr->primKeyLen > 4) {
+  if (regTcPtr.p->primKeyLen > 4) {
     sendKeyinfoAcc(signal, 11);
   }//if
-  EXECUTE_DIRECT(refToBlock(regTcPtr->tcAccBlockref), GSN_ACCKEYREQ, 
-		 signal, 7 + regTcPtr->primKeyLen);
+  EXECUTE_DIRECT(refToBlock(regTcPtr.p->tcAccBlockref), GSN_ACCKEYREQ, 
+		 signal, 7 + regTcPtr.p->primKeyLen);
   if (signal->theData[0] < RNIL) {
-    signal->theData[0] = tc_ptr_i;
+    signal->theData[0] = regTcPtr.i;
     execACCKEYCONF(signal);
     return;
   } else if (signal->theData[0] == RNIL) {
     ;
   } else {
     ndbrequire(signal->theData[0] == (UintR)-1);
-    signal->theData[0] = tc_ptr_i;
+    signal->theData[0] = regTcPtr.i;
     execACCKEYREF(signal);
   }//if
   return;
 }//Dblqh::prepareContinueAfterBlockedLab()
 
-/* ========================================================================== */
-/* =======                  SEND KEYINFO TO ACC                       ======= */
-/*                                                                            */
-/* ========================================================================== */
+void
+Dblqh::handle_nr_copy(Signal* signal, Ptr<TcConnectionrec> regTcPtr)
+{
+  jam();
+  Uint32 tableId = regTcPtr.p->tableref;
+  Uint32 fragPtr = fragptr.p->tupFragptr;
+  Uint32 op = regTcPtr.p->operation;
+
+  const bool copy = LqhKeyReq::getNrCopyFlag(regTcPtr.p->reqinfo);
+
+  if (!LqhKeyReq::getRowidFlag(regTcPtr.p->reqinfo))
+  {
+    /**
+     * Rowid not set, that mean that primary has finished copying...
+     */
+    jam();
+    if (TRACENR_FLAG)
+      TRACENR(" Waiting for COPY_ACTIVEREQ" << endl);
+    ndbassert(!LqhKeyReq::getNrCopyFlag(regTcPtr.p->reqinfo));
+    regTcPtr.p->activeCreat = Fragrecord::AC_NORMAL;
+    exec_acckeyreq(signal, regTcPtr);
+    return;
+  }
+
+  regTcPtr.p->m_nr_delete.m_cnt = 1; // Wait for real op aswell
+  Uint32* dst = signal->theData+24;
+  bool uncommitted;
+  const int len = c_tup->nr_read_pk(fragPtr, &regTcPtr.p->m_row_id, dst, 
+				    uncommitted);
+  const bool match = (len>0) ? compare_key(regTcPtr.p, dst, len) == 0 : false;
+  
+  if (TRACENR_FLAG)
+    TRACENR(" len: " << len << " match: " << match 
+	   << " uncommitted: " << uncommitted);
+
+  if (copy)
+  {
+    ndbassert(LqhKeyReq::getGCIFlag(regTcPtr.p->reqinfo));
+    if (match)
+    {
+      /**
+       * Case 1
+       */
+      jam();
+      ndbassert(op == ZINSERT);
+      if (TRACENR_FLAG)
+	TRACENR(" Changing from INSERT to ZUPDATE" << endl);
+      regTcPtr.p->operation = ZUPDATE;
+      goto run;
+    }
+    else if (len > 0 && op == ZDELETE)
+    {
+      /**
+       * Case 4
+       *   Perform delete using rowid
+       *     primKeyLen == 0
+       *     tupkeyData[0] == rowid
+       */
+      jam();
+      ndbassert(regTcPtr.p->primKeyLen == 0);
+      if (TRACENR_FLAG)
+	TRACENR(" performing DELETE key: " 
+	       << dst[0] << endl); 
+      regTcPtr.p->tupkeyData[0] = regTcPtr.p->m_row_id.ref();
+      if (g_key_descriptor_pool.getPtr(tableId)->hasCharAttr)
+      {
+	regTcPtr.p->hashValue = calculateHash(tableId, dst);
+      }
+      else
+      {
+	regTcPtr.p->hashValue = md5_hash((Uint64*)dst, len);
+      }
+      goto run;
+    }
+    else if (len == 0 && op == ZDELETE)
+    {
+      /**
+       * Case 7
+       */
+      jam();
+      if (TRACENR_FLAG)
+	TRACENR(" UPDATE_GCI" << endl); 
+      c_tup->nr_update_gci(fragPtr, &regTcPtr.p->m_row_id, regTcPtr.p->gci);
+      goto update_gci_ignore;
+    }
+    
+    /**
+     * 1) Delete row at specified rowid (if len > 0)
+     * 2) Delete specified row at different rowid (if exists)
+     * 3) Run insert
+     */
+    if (len > 0)
+    {
+      /**
+       * 1) Delete row at specified rowid (if len > 0)
+       */
+      jam();
+      nr_copy_delete_row(signal, regTcPtr, &regTcPtr.p->m_row_id, len);
+    }
+    /**
+     * 2) Delete specified row at different rowid (if exists)    
+     */
+    jam();
+    nr_copy_delete_row(signal, regTcPtr, 0, 0);
+    if (TRACENR_FLAG)
+      TRACENR(" RUN INSERT" << endl); 
+    goto run;
+  }
+  else
+  {
+    if (!match && op != ZINSERT)
+    {
+      jam();
+      if (TRACENR_FLAG)
+	TRACENR(" IGNORE " << endl); 
+      goto ignore;
+    }
+    if (match)
+    {
+      jam();
+      if (op != ZDELETE)
+      {
+	if (TRACENR_FLAG)
+	  TRACENR(" Changing from to ZWRITE" << endl);
+	regTcPtr.p->operation = ZWRITE;
+      }
+      goto run;
+    }
+    
+    /**
+     * 1) Delete row at specified rowid (if len > 0)
+     * 2) Delete specified row at different rowid (if exists)
+     * 3) Run insert
+     */
+    if (len > 0)
+    {
+      /**
+       * 1) Delete row at specified rowid (if len > 0)
+       */
+      jam();
+      nr_copy_delete_row(signal, regTcPtr, &regTcPtr.p->m_row_id, len);
+    }
+
+    /**
+     * 2) Delete specified row at different rowid (if exists)    
+     */
+    jam();
+    nr_copy_delete_row(signal, regTcPtr, 0, 0);
+    if (TRACENR_FLAG)
+      TRACENR(" RUN op: " << op << endl); 
+    goto run;
+  }
+  
+run:
+  jam();
+  exec_acckeyreq(signal, regTcPtr);
+  return;
+  
+ignore:
+  jam();
+  ndbassert(!LqhKeyReq::getNrCopyFlag(regTcPtr.p->reqinfo));
+update_gci_ignore:
+  regTcPtr.p->activeCreat = Fragrecord::AC_IGNORED;
+  signal->theData[0] = regTcPtr.p->tupConnectrec;
+  EXECUTE_DIRECT(DBTUP, GSN_TUP_ABORTREQ, signal, 1);
+
+  regTcPtr.p->transactionState = TcConnectionrec::WAIT_ACC_ABORT;
+  signal->theData[0] = regTcPtr.i;
+  execACC_ABORTCONF(signal);
+}
+
+int
+Dblqh::compare_key(const TcConnectionrec* regTcPtr, 
+		   const Uint32 * ptr, Uint32 len)
+{
+  if (regTcPtr->primKeyLen != len)
+    return 1;
+  
+  if (len <= 4)
+    return memcmp(ptr, regTcPtr->tupkeyData, 4*len);
+  
+  if (memcmp(ptr, regTcPtr->tupkeyData, sizeof(regTcPtr->tupkeyData)))
+    return 1;
+  
+  len -= (sizeof(regTcPtr->tupkeyData) >> 2);
+  ptr += (sizeof(regTcPtr->tupkeyData) >> 2);
+
+  DatabufPtr regDatabufptr;
+  regDatabufptr.i = tcConnectptr.p->firstTupkeybuf;
+  ptrCheckGuard(regDatabufptr, cdatabufFileSize, databuf);
+  while(len > 4)
+  {
+    if (memcmp(ptr, regDatabufptr.p, 4*4))
+      return 1;
+
+    ptr += 4;
+    len -= 4;
+    regDatabufptr.i = regDatabufptr.p->nextDatabuf;
+    ptrCheckGuard(regDatabufptr, cdatabufFileSize, databuf);    
+  }
+
+  if (memcmp(ptr, regDatabufptr.p, 4*len))
+    return 1;
+
+  return 0;
+}
+
+void
+Dblqh::nr_copy_delete_row(Signal* signal, 
+			  Ptr<TcConnectionrec> regTcPtr,
+			  Local_key* rowid, Uint32 len)
+{
+  Ptr<Fragrecord> fragPtr = fragptr;
+
+  Uint32 keylen;
+  Uint32 tableId = regTcPtr.p->tableref;
+  Uint32 accPtr = regTcPtr.p->accConnectrec;
+  
+  signal->theData[0] = accPtr;
+  signal->theData[1] = fragptr.p->accFragptr;
+  signal->theData[2] = ZDELETE + (ZDELETE << 4);
+  signal->theData[5] = regTcPtr.p->transid[0];
+  signal->theData[6] = regTcPtr.p->transid[1];
+  
+  if (rowid)
+  {
+    jam();
+    keylen = 1;
+    if (g_key_descriptor_pool.getPtr(tableId)->hasCharAttr)
+    {
+      signal->theData[3] = calculateHash(tableId, signal->theData+24);
+    }
+    else
+    {
+      signal->theData[3] = md5_hash((Uint64*)(signal->theData+24), len);
+    }
+    signal->theData[4] = 0; // seach by local key
+    signal->theData[7] = rowid->ref();
+  }
+  else
+  {
+    jam();
+    keylen = regTcPtr.p->primKeyLen;
+    signal->theData[3] = regTcPtr.p->hashValue;
+    signal->theData[4] = keylen;
+    signal->theData[7] = regTcPtr.p->tupkeyData[0];
+    signal->theData[8] = regTcPtr.p->tupkeyData[1];
+    signal->theData[9] = regTcPtr.p->tupkeyData[2];
+    signal->theData[10] = regTcPtr.p->tupkeyData[3];
+    if (keylen > 4)
+      sendKeyinfoAcc(signal, 11);
+  }
+  const Uint32 ref = refToBlock(regTcPtr.p->tcAccBlockref);
+  EXECUTE_DIRECT(ref, GSN_ACCKEYREQ, signal, 7 + keylen);
+  jamEntry();
+
+  Uint32 retValue = signal->theData[0];
+  ndbrequire(retValue != RNIL); // This should never block...
+  ndbrequire(retValue != (Uint32)-1 || rowid == 0); // rowid should never fail
+
+  if (retValue == (Uint32)-1)
+  {
+    /**
+     * Only delete by pk, may fail
+     */
+    jam();
+    ndbrequire(rowid == 0);
+    signal->theData[0] = accPtr;
+    signal->theData[1] = false;
+    EXECUTE_DIRECT(ref, GSN_ACC_ABORTREQ, signal, 2);
+    jamEntry();
+    return;
+  }
+
+  /**
+   * We found row (and have it locked in ACC)
+   */
+  ndbrequire(regTcPtr.p->m_dealloc == 0);
+  Local_key save = regTcPtr.p->m_row_id;
+  signal->theData[0] = regTcPtr.p->accConnectrec;
+  EXECUTE_DIRECT(ref, GSN_ACC_COMMITREQ, signal, 1);
+  jamEntry();
+
+  ndbrequire(regTcPtr.p->m_dealloc == 1);  
+  int ret = c_tup->nr_delete(signal, regTcPtr.i, 
+			     fragPtr.p->tupFragptr, &regTcPtr.p->m_row_id, 
+			     regTcPtr.p->gci);
+  jamEntry();
+
+  if (ret)
+  {
+    ndbassert(ret == 1);
+    Uint32 pos = regTcPtr.p->m_nr_delete.m_cnt - 1;
+    memcpy(regTcPtr.p->m_nr_delete.m_disk_ref + pos, 
+	   signal->theData, sizeof(Local_key));
+    regTcPtr.p->m_nr_delete.m_page_id[pos] = RNIL;
+    regTcPtr.p->m_nr_delete.m_cnt = pos + 2;
+    ndbout << "PENDING DISK DELETE: " << 
+      regTcPtr.p->m_nr_delete.m_disk_ref[pos] << endl;
+  }
+  
+  TRACENR("DELETED: " << regTcPtr.p->m_row_id << endl);
+
+  regTcPtr.p->m_dealloc = 0;
+  regTcPtr.p->m_row_id = save;
+  fragptr = fragPtr;
+  tcConnectptr = regTcPtr;
+}
+
+void
+Dblqh::get_nr_op_info(Nr_op_info* op, Uint32 page_id)
+{
+  Ptr<TcConnectionrec> tcPtr;
+  tcPtr.i = op->m_ptr_i;
+  ptrCheckGuard(tcPtr, ctcConnectrecFileSize, tcConnectionrec);
+  
+  Ptr<Fragrecord> fragPtr;
+  c_fragment_pool.getPtr(fragPtr, tcPtr.p->fragmentptr);  
+
+  op->m_gci = tcPtr.p->gci;
+  op->m_tup_frag_ptr_i = fragPtr.p->tupFragptr;
+
+  ndbrequire(tcPtr.p->transactionState == TcConnectionrec::WAIT_TUP_COMMIT);
+  ndbrequire(tcPtr.p->activeCreat == Fragrecord::AC_NR_COPY);
+  ndbrequire(tcPtr.p->m_nr_delete.m_cnt);
+  
+  
+  if (page_id == RNIL)
+  {
+    // get log buffer callback
+    for (Uint32 i = 0; i<2; i++)
+    {
+      if (tcPtr.p->m_nr_delete.m_page_id[i] != RNIL)
+      {
+	op->m_page_id = tcPtr.p->m_nr_delete.m_page_id[i];
+	op->m_disk_ref = tcPtr.p->m_nr_delete.m_disk_ref[i];
+	return;
+      }
+    }
+  }
+  else
+  {
+    // get page callback
+    for (Uint32 i = 0; i<2; i++)
+    {
+      Local_key key = tcPtr.p->m_nr_delete.m_disk_ref[i];
+      if (op->m_disk_ref.m_page_no == key.m_page_no &&
+	  op->m_disk_ref.m_file_no == key.m_file_no &&
+	  tcPtr.p->m_nr_delete.m_page_id[i] == RNIL)
+      {
+	op->m_disk_ref = key;
+	tcPtr.p->m_nr_delete.m_page_id[i] = page_id;
+	return;
+      }
+    }
+  }
+  ndbrequire(false);
+}
+
+void 
+Dblqh::nr_delete_complete(Signal* signal, Nr_op_info* op)
+{
+  jamEntry();
+  Ptr<TcConnectionrec> tcPtr;
+  tcPtr.i = op->m_ptr_i;
+  ptrCheckGuard(tcPtr, ctcConnectrecFileSize, tcConnectionrec);
+
+  ndbrequire(tcPtr.p->transactionState == TcConnectionrec::WAIT_TUP_COMMIT);
+  ndbrequire(tcPtr.p->activeCreat == Fragrecord::AC_NR_COPY);
+  ndbrequire(tcPtr.p->m_nr_delete.m_cnt);
+  
+  tcPtr.p->m_nr_delete.m_cnt--;
+  if (tcPtr.p->m_nr_delete.m_cnt == 0)
+  {
+    tcConnectptr = tcPtr;
+    c_fragment_pool.getPtr(fragptr, tcPtr.p->fragmentptr);
+    packLqhkeyreqLab(signal);
+    return;
+  }
+
+  if (memcmp(&tcPtr.p->m_nr_delete.m_disk_ref[0], 
+	     &op->m_disk_ref, sizeof(Local_key)) == 0)
+  {
+    jam();
+    ndbassert(tcPtr.p->m_nr_delete.m_page_id[0] != RNIL);
+    tcPtr.p->m_nr_delete.m_page_id[0] = tcPtr.p->m_nr_delete.m_page_id[1];
+    tcPtr.p->m_nr_delete.m_disk_ref[0] = tcPtr.p->m_nr_delete.m_disk_ref[1];
+  }
+}
+
+
+/* =*======================================================================= */
+/* =======                 SEND KEYINFO TO ACC                       ======= */
+/*                                                                           */
+/* ========================================================================= */
 void Dblqh::sendKeyinfoAcc(Signal* signal, Uint32 Ti) 
 {
   DatabufPtr regDatabufptr;
@@ -3778,6 +4234,43 @@
   EXECUTE_DIRECT(tup, GSN_TUP_ALLOCREQ, signal, 3);
 }//Dblqh::execTUP_ALLOCREQ()
 
+void Dblqh::execTUP_DEALLOCREQ(Signal* signal)
+{
+  TcConnectionrecPtr regTcPtr;  
+  
+  jamEntry();
+  regTcPtr.i = signal->theData[4];
+  
+  if (TRACENR_FLAG)
+  {
+    Local_key tmp;
+    tmp.m_page_no = signal->theData[2];
+    tmp.m_page_idx = signal->theData[3];
+    TRACENR("TUP_DEALLOC: " << tmp << 
+      (signal->theData[5] ? " DIRECT " : " DELAYED") << endl);
+  }
+  
+  if (signal->theData[5])
+  {
+    jam();
+    Local_key tmp;
+    tmp.m_page_no = signal->theData[2];
+    tmp.m_page_idx = signal->theData[3];
+    EXECUTE_DIRECT(DBTUP, GSN_TUP_DEALLOCREQ, signal, signal->getLength());
+    return;
+  }
+  else
+  {
+    jam();
+    ptrCheckGuard(regTcPtr, ctcConnectrecFileSize, tcConnectionrec);
+    regTcPtr.p->m_row_id.m_page_no = signal->theData[2];
+    regTcPtr.p->m_row_id.m_page_idx = signal->theData[3];
+    
+    ndbrequire(regTcPtr.p->m_dealloc == 0);
+    regTcPtr.p->m_dealloc = 1;
+  }
+}//Dblqh::execTUP_ALLOCREQ()
+
 /* ************>> */
 /*  ACCKEYCONF  > */
 /* ************>> */
@@ -3799,7 +4292,6 @@
   }//if
 
   // reset the activeCreat since that is only valid in cases where the record was not present.
-  regTcPtr->activeCreat = ZFALSE;
   /* ------------------------------------------------------------------------
    * IT IS NOW TIME TO CONTACT THE TUPLE MANAGER. THE TUPLE MANAGER NEEDS THE
    * INFORMATION ON WHICH TABLE AND FRAGMENT, THE LOCAL KEY AND IT NEEDS TO
@@ -3810,18 +4302,30 @@
    * ----------------------------------------------------------------------- */
   if (regTcPtr->operation == ZWRITE) 
   {
+    ndbassert(regTcPtr->seqNoReplica == 0 || 
+	      regTcPtr->activeCreat == Fragrecord::AC_NR_COPY);
     Uint32 op= signal->theData[1];
+    Uint32 requestInfo = regTcPtr->reqinfo;
     if(likely(op == ZINSERT || op == ZUPDATE))
     {
+      jam();
       regTcPtr->operation = op;
     }
     else
     {
+      jam();
       warningEvent("Convering %d to ZUPDATE", op);
       op = regTcPtr->operation = ZUPDATE;
     }
+    if (regTcPtr->seqNoReplica == 0)
+    {
+      jam();
+      requestInfo &= ~(RI_OPERATION_MASK <<  RI_OPERATION_SHIFT);
+      LqhKeyReq::setOperation(requestInfo, op);
+      regTcPtr->reqinfo = requestInfo;
+    }
   }//if
-
+  
   /* ------------------------------------------------------------------------
    * IT IS NOW TIME TO CONTACT THE TUPLE MANAGER. THE TUPLE MANAGER NEEDS THE
    * INFORMATION ON WHICH TABLE AND FRAGMENT, THE LOCAL KEY AND IT NEEDS TO
@@ -3847,6 +4351,7 @@
 			    Uint32 local_key,
 			    Uint32 disk_page)
 {
+  Uint32 op = regTcPtr->operation;
   regTcPtr->transactionState = TcConnectionrec::WAIT_TUP;
   /* ------------------------------------------------------------------------
    * IT IS NOW TIME TO CONTACT THE TUPLE MANAGER. THE TUPLE MANAGER NEEDS THE
@@ -3856,17 +4361,18 @@
    * IS NEEDED SINCE TWO SCHEMA VERSIONS CAN BE ACTIVE SIMULTANEOUSLY ON A 
    * TABLE.
    * ----------------------------------------------------------------------- */
-  Uint32 localKey2 = local_key & MAX_TUPLES_PER_PAGE;
-  Uint32 localKey1 = local_key >> MAX_TUPLES_BITS;
+  Uint32 page_idx = local_key & MAX_TUPLES_PER_PAGE;
+  Uint32 page_no = local_key >> MAX_TUPLES_BITS;
 #ifdef TRACE_LQHKEYREQ
-  ndbout << "localkey: [ " << hex << localKey1 << " " << localKey2 << "]" 
+  ndbout << "localkey: [ " << hex << page_no << " " << page_idx << "]" 
 	 << endl;
 #endif
   Uint32 Ttupreq = regTcPtr->dirtyOp;
   Ttupreq = Ttupreq + (regTcPtr->opSimple << 1);
-  Ttupreq = Ttupreq + (regTcPtr->operation << 6);
+  Ttupreq = Ttupreq + (op << 6);
   Ttupreq = Ttupreq + (regTcPtr->opExec << 10);
   Ttupreq = Ttupreq + (regTcPtr->apiVersionNo << 11);
+  Ttupreq = Ttupreq + (regTcPtr->m_use_rowid << 11);
 
   /* --------------------------------------------------------------------- 
    * Clear interpreted mode bit since we do not want the next replica to
@@ -3882,8 +4388,8 @@
   TupKeyReq * const tupKeyReq = (TupKeyReq *)signal->getDataPtrSend();
   tupKeyReq->connectPtr = sig0;
   tupKeyReq->request = Ttupreq;
-  tupKeyReq->keyRef1 = localKey1;
-  tupKeyReq->keyRef2 = localKey2;
+  tupKeyReq->keyRef1 = page_no;
+  tupKeyReq->keyRef2 = page_idx;
 
   sig0 = regTcPtr->totReclenAi;
   sig1 = regTcPtr->applOprec;
@@ -3903,13 +4409,83 @@
   tupKeyReq->transId1 = sig1;
   tupKeyReq->transId2 = sig2;
   tupKeyReq->fragPtr = sig3;
+
+  sig0 = regTcPtr->m_row_id.m_page_no;
+  sig1 = regTcPtr->m_row_id.m_page_idx;
+  
   tupKeyReq->primaryReplica = (tcConnectptr.p->seqNoReplica == 0)?true:false;
   tupKeyReq->coordinatorTC = tcConnectptr.p->tcBlockref;
   tupKeyReq->tcOpIndex = tcConnectptr.p->tcOprec;
   tupKeyReq->savePointId = tcConnectptr.p->savePointId;
   tupKeyReq->disk_page= disk_page;
 
+  tupKeyReq->m_row_id_page_no = sig0;
+  tupKeyReq->m_row_id_page_idx = sig1;
+  
+  if (ERROR_INSERTED(5712) && regTcPtr->operation == ZINSERT)
+  {
+    ndbout << "INSERT " << regFragptrP->tabRef 
+	   << "(" << regFragptrP->fragId << ")";
+
+    {
+      ndbout << "key=[" << hex;
+      Uint32 i;
+      for(i = 0; i<regTcPtr->primKeyLen && i < 4; i++){
+	ndbout << hex << regTcPtr->tupkeyData[i] << " ";
+      }
+      
+      DatabufPtr regDatabufptr;
+      regDatabufptr.i = regTcPtr->firstTupkeybuf;
+      while(i < regTcPtr->primKeyLen)
+      {
+	ptrCheckGuard(regDatabufptr, cdatabufFileSize, databuf);
+	for(Uint32 j = 0; j<4 && i<regTcPtr->primKeyLen; j++, i++)
+	  ndbout << hex << regDatabufptr.p->data[j] << " ";
+      }
+      ndbout << "] ";
+    }
+
+    if(regTcPtr->m_use_rowid)
+      ndbout << " " << regTcPtr->m_row_id;
+  }
+
+  if (ERROR_INSERTED(5712) && regTcPtr->operation == ZDELETE)
+  {
+    Local_key lk; lk.assref(local_key);
+    
+    ndbout << "DELETE " << regFragptrP->tabRef 
+	   << "(" << regFragptrP->fragId << ") " << lk;
+
+    {
+      ndbout << "key=[" << hex;
+      Uint32 i;
+      for(i = 0; i<regTcPtr->primKeyLen && i < 4; i++){
+	ndbout << hex << regTcPtr->tupkeyData[i] << " ";
+      }
+      
+      DatabufPtr regDatabufptr;
+      regDatabufptr.i = regTcPtr->firstTupkeybuf;
+      while(i < regTcPtr->primKeyLen)
+      {
+	ptrCheckGuard(regDatabufptr, cdatabufFileSize, databuf);
+	for(Uint32 j = 0; j<4 && i<regTcPtr->primKeyLen; j++, i++)
+	  ndbout << hex << regDatabufptr.p->data[j] << " ";
+      }
+      ndbout << "]" << endl;
+    }
+
+  }
+
+  regTcPtr->m_use_rowid |= (op == ZINSERT);
+  regTcPtr->m_row_id.m_page_no = page_no;
+  regTcPtr->m_row_id.m_page_idx = page_idx;
+  
   EXECUTE_DIRECT(tup, GSN_TUPKEYREQ, signal, TupKeyReq::SignalLength);
+
+  if (ERROR_INSERTED(5712) && regTcPtr->operation == ZINSERT)
+  {
+    ndbout << endl;
+  }
 }//Dblqh::execACCKEYCONF()
 
 void
@@ -3928,7 +4504,7 @@
   else if(res == 0)
   {
     regTcPtr.p->transactionState = TcConnectionrec::WAIT_TUP;
-    regTcPtr.p->m_local_key = local_key;
+    regTcPtr.p->m_row_id.assref(local_key);
   }
   else 
   {
@@ -3956,7 +4532,7 @@
     c_fragment_pool.getPtr(fragPtr, regTcPtr->fragmentptr);
     
     acckeyconf_tupkeyreq(signal, regTcPtr, fragPtr.p, 
-			 regTcPtr->m_local_key,
+			 regTcPtr->m_row_id.ref(),
 			 disk_page);
   }
   else if (state != TcConnectionrec::WAIT_TUP)
@@ -3999,7 +4575,6 @@
      * WE CAN GO IMMEDIATELY TO COMMIT_CONTINUE_AFTER_BLOCKED.
      * WE HAVE ALREADY SENT THE RESPONSE SO WE ARE NOT INTERESTED IN READ LENGTH
      * ---------------------------------------------------------------------- */
-    regTcPtr->gci = cnewestGci;
     commitContinueAfterBlockedLab(signal);
     return;
   }//if
@@ -4045,7 +4620,6 @@
 	 * THIS OPERATION WAS A WRITE OPERATION THAT DO NOT NEED LOGGING AND 
 	 * THAT CAN CAN  BE COMMITTED IMMEDIATELY.                     
 	 * ----------------------------------------------------------------- */
-        regTcPtr->gci = cnewestGci;
         commitContinueAfterBlockedLab(signal);
         return;
       } else {
@@ -4092,7 +4666,6 @@
        * THE OPERATION IS A SIMPLE READ. WE WILL IMMEDIATELY COMMIT THE 
        * OPERATION.   
        * -------------------------------------------------------------------- */
-      regTcPtr->gci = cnewestGci;
       localCommitLab(signal);
       return;
     } else {
@@ -4113,12 +4686,11 @@
 	/* ------------------------------------------------------------------
 	 * THIS OPERATION WAS A WRITE OPERATION THAT DO NOT NEED LOGGING AND 
 	 * THAT CAN CAN  BE COMMITTED IMMEDIATELY. 
-	 * ------------------------------------------------------------------ */
+	 * ----------------------------------------------------------------- */
         jam();
-	  /* ----------------------------------------------------------------
-	   * IT MUST BE ACTIVE CREATION OF A FRAGMENT.
-	   * ---------------------------------------------------------------- */
-        regTcPtr->gci = cnewestGci;
+	/* ----------------------------------------------------------------
+	 * IT MUST BE ACTIVE CREATION OF A FRAGMENT.
+	 * ---------------------------------------------------------------- */
         localCommitLab(signal);
         return;
       } else {
@@ -4179,7 +4751,7 @@
     return;
   }//if
   TcConnectionrec * const regTcPtr = tcConnectptr.p;
-  logPartPtr.i = regTcPtr->hashValue & 3;
+  logPartPtr.i = regTcPtr->m_log_part_ptr_i;
   ptrCheckGuard(logPartPtr, clogPartFileSize, logPartRecord);
 /* -------------------------------------------------- */
 /*       THIS PART IS USED TO WRITE THE LOG           */
@@ -4344,7 +4916,6 @@
     /* ----------------------------------------------------------------------
      * DIRTY OPERATIONS SHOULD COMMIT BEFORE THEY PACK THE REQUEST/RESPONSE.
      * ---------------------------------------------------------------------- */
-    regTcPtr->gci = cnewestGci;
     localCommitLab(signal);
   }//if
 }//Dblqh::logLqhkeyreqLab()
@@ -4415,11 +4986,35 @@
   UintR sig0, sig1, sig2, sig3, sig4, sig5, sig6;
   Treqinfo = preComputedRequestInfoMask & regTcPtr->reqinfo;
 
+  Uint32 nextNodeId = regTcPtr->nextReplica;
+  Uint32 nextVersion = getNodeInfo(nextNodeId).m_version;
+
   UintR TapplAddressIndicator = (regTcPtr->nextSeqNoReplica == 0 ? 0 : 1);
   LqhKeyReq::setApplicationAddressFlag(Treqinfo, TapplAddressIndicator);
   LqhKeyReq::setInterpretedFlag(Treqinfo, regTcPtr->opExec);
   LqhKeyReq::setSeqNoReplica(Treqinfo, regTcPtr->nextSeqNoReplica);
   LqhKeyReq::setAIInLqhKeyReq(Treqinfo, regTcPtr->reclenAiLqhkey);
+  
+  if (unlikely(nextVersion < NDBD_ROWID_VERSION))
+  {
+    LqhKeyReq::setLockType(Treqinfo, regTcPtr->lockType);
+  }
+  else
+  {
+    regTcPtr->m_use_rowid |= 
+      fragptr.p->m_copy_started_state == Fragrecord::AC_NR_COPY;
+    LqhKeyReq::setRowidFlag(Treqinfo, regTcPtr->m_use_rowid);
+  }
+
+  if (LqhKeyReq::getRowidFlag(Treqinfo))
+  {
+    //ndbassert(LqhKeyReq::getOperation(Treqinfo) == ZINSERT);
+  }
+  else
+  {
+    ndbassert(LqhKeyReq::getOperation(Treqinfo) != ZINSERT);
+  }
+  
   UintR TreadLenAiInd = (regTcPtr->readlenAi == 0 ? 0 : 1);
   UintR TsameLqhAndClient = (tcConnectptr.i == 
                              regTcPtr->tcOprec ? 0 : 1);
@@ -4491,6 +5086,16 @@
     nextPos += 4;
   }//if
 
+  sig0 = regTcPtr->gci;
+  Local_key tmp = regTcPtr->m_row_id;
+  
+  lqhKeyReq->variableData[nextPos + 0] = tmp.m_page_no;
+  lqhKeyReq->variableData[nextPos + 1] = tmp.m_page_idx;
+  nextPos += 2*LqhKeyReq::getRowidFlag(Treqinfo);
+
+  lqhKeyReq->variableData[nextPos + 0] = sig0;
+  nextPos += LqhKeyReq::getGCIFlag(Treqinfo);
+
   sig0 = regTcPtr->firstAttrinfo[0];
   sig1 = regTcPtr->firstAttrinfo[1];
   sig2 = regTcPtr->firstAttrinfo[2];
@@ -4636,7 +5241,9 @@
   Uint32 operation = tcConnectptr.p->operation;
   Uint32 keyLen = tcConnectptr.p->primKeyLen;
   Uint32 aiLen = tcConnectptr.p->currTupAiLen;
-  Uint32 totLogLen = aiLen + keyLen + ZLOG_HEAD_SIZE;
+  Local_key rowid = tcConnectptr.p->m_row_id;
+  Uint32 totLogLen = ZLOG_HEAD_SIZE + aiLen + keyLen;
+  
   if ((logPos + ZLOG_HEAD_SIZE) < ZPAGE_SIZE) {
     Uint32* dataPtr = &logPagePtr.p->logPageWord[logPos];
     logPagePtr.p->logPageWord[ZCURR_PAGE_INDEX] = logPos + ZLOG_HEAD_SIZE;
@@ -4646,6 +5253,8 @@
     dataPtr[3] = operation;
     dataPtr[4] = aiLen;
     dataPtr[5] = keyLen;
+    dataPtr[6] = rowid.m_page_no;
+    dataPtr[7] = rowid.m_page_idx;
   } else {
     writeLogWord(signal, ZPREP_OP_TYPE);
     writeLogWord(signal, totLogLen);
@@ -4653,6 +5262,8 @@
     writeLogWord(signal, operation);
     writeLogWord(signal, aiLen);
     writeLogWord(signal, keyLen);
+    writeLogWord(signal, rowid.m_page_no);
+    writeLogWord(signal, rowid.m_page_idx);
   }//if
 }//Dblqh::writeLogHeader()
 
@@ -4840,6 +5451,22 @@
   regTcPtr->lastAttrinbuf = RNIL;
   regTcPtr->firstTupkeybuf = RNIL;
   regTcPtr->lastTupkeybuf = RNIL;
+
+  if (regTcPtr->m_dealloc)
+  {
+    jam();
+    regTcPtr->m_dealloc = 0;
+
+    if (TRACENR_FLAG)
+      TRACENR("DELETED: " << regTcPtr->m_row_id << endl);
+    
+    signal->theData[0] = regTcPtr->fragmentid;
+    signal->theData[1] = regTcPtr->tableref;
+    signal->theData[2] = regTcPtr->m_row_id.m_page_no;
+    signal->theData[3] = regTcPtr->m_row_id.m_page_idx;
+    signal->theData[4] = RNIL;
+    EXECUTE_DIRECT(DBTUP, GSN_TUP_DEALLOCREQ, signal, 5);
+  }
 }//Dblqh::releaseOprec()
 
 /* ------------------------------------------------------------------------- */
@@ -5169,15 +5796,24 @@
   if ((tcConnectptr.p->transactionState == TcConnectionrec::COMMITTED) &&
       (tcConnectptr.p->transid[0] == transid1) &&
       (tcConnectptr.p->transid[1] == transid2)) {
-    if (tcConnectptr.p->seqNoReplica != 0) {
+    if (tcConnectptr.p->seqNoReplica != 0 && 
+	tcConnectptr.p->activeCreat == Fragrecord::AC_NORMAL) {
       jam();
       localCommitLab(signal);
       return;
-    } else {
+    } 
+    else if (tcConnectptr.p->seqNoReplica == 0)
+    {
       jam();
       completeTransLastLab(signal);
       return;
-    }//if
+    }
+    else
+    {
+      jam();
+      completeTransNotLastLab(signal);
+      return;
+    }
   }//if
   if (tcConnectptr.p->transactionState != TcConnectionrec::COMMITTED) {
     warningReport(signal, 2);
@@ -5255,15 +5891,21 @@
     return;
     break;
   }//switch
-  if (regTcPtr->seqNoReplica != 0) {
+  if (regTcPtr->seqNoReplica != 0 && 
+      regTcPtr->activeCreat != Fragrecord::AC_NR_COPY) {
     jam();
     localCommitLab(signal);
-    return;
-  } else {
+  } 
+  else if (regTcPtr->seqNoReplica == 0)
+  {
     jam();
     completeTransLastLab(signal);
-    return;
-  }//if
+  }
+  else
+  {
+    jam();
+    completeTransNotLastLab(signal);
+  }
 }//Dblqh::execCOMPLETEREQ()
 
 /* ************> */
@@ -5310,7 +5952,6 @@
   TcConnectionrec * const regTcPtr = tcConnectptr.p;
   TcConnectionrec::LogWriteState logWriteState = regTcPtr->logWriteState;
   TcConnectionrec::TransactionState transState = regTcPtr->transactionState;
-  ndbrequire(regTcPtr->gci == gci || regTcPtr->gci == 0);
   regTcPtr->gci = gci; 
   if (transState == TcConnectionrec::PREPARED) {
     if (logWriteState == TcConnectionrec::WRITTEN) {
@@ -5361,12 +6002,13 @@
     warningReport(signal, 0);
     return;
   }//if
-  if (regTcPtr->seqNoReplica != 0) {
+  if (regTcPtr->seqNoReplica == 0 ||
+      regTcPtr->activeCreat == Fragrecord::AC_NR_COPY) {
     jam();
-    commitReplyLab(signal);
+    localCommitLab(signal);
     return;
   }//if
-  localCommitLab(signal);
+  commitReplyLab(signal);
   return;
 }//Dblqh::commitReqLab()
 
@@ -5380,7 +6022,6 @@
   Uint32 newestGci = cnewestGci;
   TcConnectionrec::LogWriteState logWriteState = regTcPtr->logWriteState;
   TcConnectionrec::TransactionState transState = regTcPtr->transactionState;
-  ndbrequire(regTcPtr->gci == gci || regTcPtr->gci == 0);
   regTcPtr->gci = gci;
   if (gci > newestGci) {
     jam();
@@ -5398,7 +6039,7 @@
     LogPartRecordPtr regLogPartPtr;
     Uint32 noOfLogPages = cnoOfLogPages;
     jam();
-    regLogPartPtr.i = regTcPtr->hashValue & 3;
+    regLogPartPtr.i = regTcPtr->m_log_part_ptr_i;
     ptrCheckGuard(regLogPartPtr, clogPartFileSize, logPartRecord);
     if ((regLogPartPtr.p->logPartState == LogPartRecord::ACTIVE) ||
         (noOfLogPages == 0)) {
@@ -5485,62 +6126,60 @@
 /*WE MUST COMMIT TUP BEFORE ACC TO ENSURE THAT NO ONE RACES IN AND SEES A    */
 /*DIRTY STATE IN TUP.                                                        */
 /* ------------------------------------------------------------------------- */
-  TcConnectionrec * const regTcPtr = tcConnectptr.p;
-  Fragrecord * const regFragptr = fragptr.p;
-  Uint32 operation = regTcPtr->operation;
-  Uint32 simpleRead = regTcPtr->simpleRead;
-  Uint32 dirtyOp = regTcPtr->dirtyOp;
-  if (regTcPtr->activeCreat == ZFALSE) {
-    if ((cCommitBlocked == true) &&
-        (regFragptr->fragActiveStatus == ZTRUE)) {
-      jam();
-/* ------------------------------------------------------------------------- */
-// TUP and/or ACC have problems in writing the undo log to disk fast enough.
-// We must avoid the commit at this time and try later instead. The fragment
-// is also active with a local checkpoint and this commit can generate UNDO
-// log records that overflow the UNDO log buffer.
-/* ------------------------------------------------------------------------- */
-/*---------------------------------------------------------------------------*/
-// We must delay the write of commit info to the log to safe-guard against
-// a crash due to lack of log pages. We temporary stop all log writes to this
-// log part to ensure that we don't get a buffer explosion in the delayed
-// signal buffer instead.
-/*---------------------------------------------------------------------------*/
-      logPartPtr.i = regTcPtr->hashValue & 3;
-      ptrCheckGuard(logPartPtr, clogPartFileSize, logPartRecord);
-      linkWaitLog(signal, logPartPtr);
-      regTcPtr->transactionState = TcConnectionrec::COMMIT_QUEUED;
-      if (logPartPtr.p->logPartState == LogPartRecord::IDLE) {
-        jam();
-        logPartPtr.p->logPartState = LogPartRecord::ACTIVE;
-      }//if
-      return;
-    }//if
+  Ptr<TcConnectionrec> regTcPtr = tcConnectptr;
+  Ptr<Fragrecord> regFragptr = fragptr;
+  Uint32 operation = regTcPtr.p->operation;
+  Uint32 simpleRead = regTcPtr.p->simpleRead;
+  Uint32 dirtyOp = regTcPtr.p->dirtyOp;
+  if (regTcPtr.p->activeCreat != Fragrecord::AC_IGNORED) {
     if (operation != ZREAD) {
       TupCommitReq * const tupCommitReq = 
         (TupCommitReq *)signal->getDataPtrSend();
-      Uint32 sig0 = regTcPtr->tupConnectrec;
-      Uint32 tup = refToBlock(regTcPtr->tcTupBlockref);
+      Uint32 sig0 = regTcPtr.p->tupConnectrec;
+      Uint32 tup = refToBlock(regTcPtr.p->tcTupBlockref);
       jam();
       tupCommitReq->opPtr = sig0;
-      tupCommitReq->gci = regTcPtr->gci;
-      tupCommitReq->hashValue = regTcPtr->hashValue;
+      tupCommitReq->gci = regTcPtr.p->gci;
+      tupCommitReq->hashValue = regTcPtr.p->hashValue;
       EXECUTE_DIRECT(tup, GSN_TUP_COMMITREQ, signal, 
 		     TupCommitReq::SignalLength);
 
       if(signal->theData[0] != 0)
       {
-	regTcPtr->transactionState = TcConnectionrec::WAIT_TUP_COMMIT;
+	regTcPtr.p->transactionState = TcConnectionrec::WAIT_TUP_COMMIT;
 	return; // TUP_COMMIT was timesliced
       }
       
-      Uint32 acc = refToBlock(regTcPtr->tcAccBlockref);
-      signal->theData[0] = regTcPtr->accConnectrec;
+      if (TRACENR_FLAG)
+      {
+	TRACENR("COMMIT: ");
+	switch (regTcPtr.p->operation) {
+	case ZREAD: TRACENR("READ"); break;
+	case ZUPDATE: TRACENR("UPDATE"); break;
+	case ZWRITE: TRACENR("WRITE"); break;
+	case ZINSERT: TRACENR("INSERT"); break;
+	case ZDELETE: TRACENR("DELETE"); break;
+	}
+
+	TRACENR(" tab: " << regTcPtr.p->tableref 
+	       << " frag: " << regTcPtr.p->fragmentid
+	       << " activeCreat: " << (Uint32)regTcPtr.p->activeCreat);
+	if (LqhKeyReq::getNrCopyFlag(regTcPtr.p->reqinfo))
+	  TRACENR(" NrCopy");
+	if (LqhKeyReq::getRowidFlag(regTcPtr.p->reqinfo))
+	  TRACENR(" rowid: " << regTcPtr.p->m_row_id);
+	TRACENR(" key: " << regTcPtr.p->tupkeyData[0]);
+	TRACENR(endl);
+      }
+
+      Uint32 acc = refToBlock(regTcPtr.p->tcAccBlockref);
+      signal->theData[0] = regTcPtr.p->accConnectrec;
       EXECUTE_DIRECT(acc, GSN_ACC_COMMITREQ, signal, 1);
+      
     } else {
       if(!dirtyOp){
-	Uint32 acc = refToBlock(regTcPtr->tcAccBlockref);
-	signal->theData[0] = regTcPtr->accConnectrec;
+	Uint32 acc = refToBlock(regTcPtr.p->tcAccBlockref);
+	signal->theData[0] = regTcPtr.p->accConnectrec;
 	EXECUTE_DIRECT(acc, GSN_ACC_COMMITREQ, signal, 1);
       }
       
@@ -5553,13 +6192,17 @@
 /*RESOURCES BELONGING TO THIS OPERATION SINCE NO MORE WORK WILL BE           */
 /*PERFORMED.                                                                 */
 /* ------------------------------------------------------------------------- */
+	fragptr = regFragptr;
+	tcConnectptr = regTcPtr;
 	cleanUp(signal);
 	return;
       }//if
     }
   }//if
   jamEntry();
-  tupcommit_conf(signal, regTcPtr, regFragptr);
+  fragptr = regFragptr;
+  tcConnectptr = regTcPtr;
+  tupcommit_conf(signal, regTcPtr.p, regFragptr.p);
 }
 
 void
@@ -5569,7 +6212,7 @@
 
   tcConnectptr.i = tcPtrI;
   ptrCheckGuard(tcConnectptr, ctcConnectrecFileSize, tcConnectionrec);
-  TcConnectionrec * const tcPtr = tcConnectptr.p;
+  TcConnectionrec * tcPtr = tcConnectptr.p;
 
   ndbrequire(tcPtr->transactionState == TcConnectionrec::WAIT_TUP_COMMIT);
 
@@ -5583,31 +6226,45 @@
   EXECUTE_DIRECT(acc, GSN_ACC_COMMITREQ, signal, 1);
   jamEntry();
 
+  tcConnectptr.i = tcPtrI;
+  tcConnectptr.p = tcPtr;
   tupcommit_conf(signal, tcPtr, regFragptr.p);
 }
 
 void
 Dblqh::tupcommit_conf(Signal* signal, 
-		      TcConnectionrec * regTcPtr,
+		      TcConnectionrec * tcPtrP,
 		      Fragrecord * regFragptr)
 {
-  Uint32 dirtyOp = regTcPtr->dirtyOp;
-  Uint32 seqNoReplica = regTcPtr->seqNoReplica;
-  if (regTcPtr->gci > regFragptr->newestGci) {
+  Uint32 dirtyOp = tcPtrP->dirtyOp;
+  Uint32 seqNoReplica = tcPtrP->seqNoReplica;
+  Uint32 activeCreat = tcPtrP->activeCreat;
+  if (tcPtrP->gci > regFragptr->newestGci) {
     jam();
 /* ------------------------------------------------------------------------- */
 /*IT IS THE FIRST TIME THIS GLOBAL CHECKPOINT IS INVOLVED IN UPDATING THIS   */
 /*FRAGMENT. UPDATE THE VARIABLE THAT KEEPS TRACK OF NEWEST GCI IN FRAGMENT   */
 /* ------------------------------------------------------------------------- */
-    regFragptr->newestGci = regTcPtr->gci;
+    regFragptr->newestGci = tcPtrP->gci;
   }//if
-  if (dirtyOp != ZTRUE) {
-    if (seqNoReplica != 0) {
+  if (dirtyOp != ZTRUE) 
+  {
+    if (seqNoReplica == 0 || activeCreat == Fragrecord::AC_NR_COPY)
+    {
       jam();
-      completeTransNotLastLab(signal);
+      commitReplyLab(signal);
       return;
     }//if
-    commitReplyLab(signal);
+    if (seqNoReplica == 0)
+    {
+      jam();
+      completeTransLastLab(signal);
+    }
+    else
+    {      
+      jam();
+      completeTransNotLastLab(signal);
+    }
     return;
   } else {
 /* ------------------------------------------------------------------------- */
@@ -5615,11 +6272,28 @@
 /*SEND ANY COMMIT OR COMPLETE MESSAGES TO OTHER NODES. THEY WILL MERELY SEND */
 /*THOSE SIGNALS INTERNALLY.                                                  */
 /* ------------------------------------------------------------------------- */
-    if (regTcPtr->abortState == TcConnectionrec::ABORT_IDLE) {
+    if (tcPtrP->abortState == TcConnectionrec::ABORT_IDLE) {
       jam();
+      if (activeCreat == Fragrecord::AC_NR_COPY && 
+	  tcPtrP->m_nr_delete.m_cnt > 1)
+      {
+	jam();
+	/**
+	 * Nr delete waiting for disk delete to complete...
+	 */
+#ifdef VM_TRACE
+	TablerecPtr tablePtr;
+	tablePtr.i = tcPtrP->tableref;
+	ptrCheckGuard(tablePtr, ctabrecFileSize, tablerec);
+	ndbrequire(tablePtr.p->m_disk_table);
+#endif
+	tcPtrP->m_nr_delete.m_cnt--;
+	tcPtrP->transactionState = TcConnectionrec::WAIT_TUP_COMMIT;
+	return;
+      }
       packLqhkeyreqLab(signal);
     } else {
-      ndbrequire(regTcPtr->abortState != TcConnectionrec::NEW_FROM_TC);
+      ndbrequire(tcPtrP->abortState != TcConnectionrec::NEW_FROM_TC);
       jam();
       sendLqhTransconf(signal, LqhTransConf::Committed);
       cleanUp(signal);
@@ -5847,7 +6521,7 @@
     sendSignal(TLqhRef, GSN_ABORT, signal, 4, JBB);
   }//if
   regTcPtr->abortState = TcConnectionrec::ABORT_FROM_TC;
-  regTcPtr->activeCreat = ZFALSE;
+  regTcPtr->activeCreat = Fragrecord::AC_NORMAL;
 
   const Uint32 commitAckMarker = regTcPtr->commitAckMarker;
   if(commitAckMarker != RNIL){
@@ -5907,7 +6581,7 @@
   regTcPtr->reqBlockref = reqBlockref;
   regTcPtr->reqRef = reqPtr;
   regTcPtr->abortState = TcConnectionrec::REQ_FROM_TC;
-  regTcPtr->activeCreat = ZFALSE;
+  regTcPtr->activeCreat = Fragrecord::AC_NORMAL;
   abortCommonLab(signal);
   return;
 }//Dblqh::execABORTREQ()
@@ -5952,51 +6626,49 @@
   }//switch
   const Uint32 errCode = terrorCode; 
   tcPtr->errorCode = errCode;
-/* ------------------------------------------------------------------------- */
-/*WHEN AN ABORT FROM TC ARRIVES IT COULD ACTUALLY BE A CORRECT BEHAVIOUR     */
-/*SINCE THE TUPLE MIGHT NOT HAVE ARRIVED YET OR ALREADY HAVE BEEN INSERTED.  */
-/* ------------------------------------------------------------------------- */
-  if (tcPtr->activeCreat == ZTRUE) {
-    jam();
-/* ------------------------------------------------------------------------- */
-/*THIS IS A NORMAL EVENT DURING CREATION OF A FRAGMENT. PERFORM ABORT IN     */
-/*TUP AND ACC AND THEN CONTINUE WITH NORMAL COMMIT PROCESSING. IF THE ERROR  */
-/*HAPPENS TO BE A SERIOUS ERROR THEN PERFORM ABORT PROCESSING AS NORMAL.     */
-/* ------------------------------------------------------------------------- */
+
+  if (TRACENR_FLAG)
+  {
+    TRACENR("ACCKEYREF: " << errCode << " ");
     switch (tcPtr->operation) {
-    case ZUPDATE:
-    case ZDELETE:
-      jam();
-      if (errCode != ZNO_TUPLE_FOUND) {
-        jam();
-/* ------------------------------------------------------------------------- */
-/*A NORMAL ERROR WILL BE TREATED AS A NORMAL ABORT AND WILL ABORT THE        */
-/*TRANSACTION. NO SPECIAL HANDLING IS NEEDED.                                */
-/* ------------------------------------------------------------------------- */
-        tcPtr->activeCreat = ZFALSE;
-      }//if
+    case ZREAD: TRACENR("READ"); break;
+    case ZUPDATE: TRACENR("UPDATE"); break;
+    case ZWRITE: TRACENR("WRITE"); break;
+    case ZINSERT: TRACENR("INSERT"); break;
+    case ZDELETE: TRACENR("DELETE"); break;
+    default: TRACENR("<Unknown: " << tcPtr->operation << ">"); break;
+    }
+    
+    TRACENR(" tab: " << tcPtr->tableref 
+	   << " frag: " << tcPtr->fragmentid
+	   << " activeCreat: " << (Uint32)tcPtr->activeCreat);
+    if (LqhKeyReq::getNrCopyFlag(tcPtr->reqinfo))
+      TRACENR(" NrCopy");
+    if (LqhKeyReq::getRowidFlag(tcPtr->reqinfo))
+      TRACENR(" rowid: " << tcPtr->m_row_id);
+    TRACENR(" key: " << tcPtr->tupkeyData[0]);
+    TRACENR(endl);
+    
+  }
+
+  if (tcPtr->activeCreat == Fragrecord::AC_NR_COPY)
+  {
+    jam();
+    Uint32 op = tcPtr->operation;
+    switch(errCode){
+    case ZNO_TUPLE_FOUND:
+      ndbrequire(op == ZDELETE);
       break;
-    case ZINSERT:
-      jam();
-      if (errCode != ZTUPLE_ALREADY_EXIST) {
-        jam();
-/* ------------------------------------------------------------------------- */
-/*A NORMAL ERROR WILL BE TREATED AS A NORMAL ABORT AND WILL ABORT THE        */
-/*TRANSACTION. NO SPECIAL HANDLING IS NEEDED.                                */
-/* ------------------------------------------------------------------------- */
-        tcPtr->activeCreat = ZFALSE;
-      }//if
       break;
     default:
-      jam();
-/* ------------------------------------------------------------------------- */
-/*A NORMAL ERROR WILL BE TREATED AS A NORMAL ABORT AND WILL ABORT THE        */
-/*TRANSACTION. NO SPECIAL HANDLING IS NEEDED.                                */
-/* ------------------------------------------------------------------------- */
-      tcPtr->activeCreat = ZFALSE;
-      break;
-    }//switch
-  } else {
+      ndbrequire(false);
+    }
+    tcPtr->activeCreat = Fragrecord::AC_IGNORED;
+  }
+  else 
+  {
+    ndbrequire(!LqhKeyReq::getNrCopyFlag(tcPtr->reqinfo));
+    
     /**
      * Only primary replica can get ZTUPLE_ALREADY_EXIST || ZNO_TUPLE_FOUND
      *
@@ -6010,11 +6682,16 @@
      *
      * -> ZNO_TUPLE_FOUND is possible
      */
+    ndbrequire(tcPtr->operation == ZREAD 
+	       || tcPtr->operation == ZREAD_EX
+	       || tcPtr->seqNoReplica == 0);
+    
     ndbrequire
       (tcPtr->seqNoReplica == 0 ||
        errCode != ZTUPLE_ALREADY_EXIST ||
        (tcPtr->operation == ZREAD && (tcPtr->dirtyOp || tcPtr->opSimple)));
   }
+  
   tcPtr->abortState = TcConnectionrec::ABORT_FROM_LQH;
   abortCommonLab(signal);
   return;
@@ -6027,7 +6704,7 @@
     jam();
     return;
   }//if
-  regTcPtr->activeCreat = ZFALSE;
+  regTcPtr->activeCreat = Fragrecord::AC_NORMAL;
   regTcPtr->abortState = TcConnectionrec::ABORT_FROM_LQH;
   regTcPtr->errorCode = terrorCode;
   abortStateHandlerLab(signal);
@@ -6207,7 +6884,7 @@
    *       ACTIVE CREATION IS RESET FOR ALL ERRORS WHICH SHOULD BE HANDLED 
    *       WITH NORMAL ABORT HANDLING.
    * ----------------------------------------------------------------------- */
-  regTcPtr->activeCreat = ZFALSE;
+  regTcPtr->activeCreat = Fragrecord::AC_NORMAL;
   abortCommonLab(signal);
   return;
 }//Dblqh::abortErrorLab()
@@ -6216,7 +6893,8 @@
 {
   TcConnectionrec * const regTcPtr = tcConnectptr.p;
   const Uint32 commitAckMarker = regTcPtr->commitAckMarker;
-  if(regTcPtr->activeCreat != ZTRUE && commitAckMarker != RNIL){
+  if(regTcPtr->activeCreat != Fragrecord::AC_IGNORED && 
+     commitAckMarker != RNIL){
     /**
      * There is no NR ongoing and we have a marker
      */
@@ -6281,42 +6959,12 @@
   TcConnectionrec * const regTcPtr = tcConnectptr.p;
   fragptr.i = regTcPtr->fragmentptr;
   c_fragment_pool.getPtr(fragptr);
-  if ((cCommitBlocked == true) &&
-      (fragptr.p->fragActiveStatus == ZTRUE) &&
-      (canBlock == true) &&
-      (regTcPtr->operation != ZREAD)) {
-    jam();
-/* ------------------------------------------------------------------------- */
-// TUP and/or ACC have problems in writing the undo log to disk fast enough.
-// We must avoid the abort at this time and try later instead. The fragment
-// is also active with a local checkpoint and this commit can generate UNDO
-// log records that overflow the UNDO log buffer.
-//
-// In certain situations it is simply too complex to insert a wait state here
-// since ACC is active and we cannot release the operation from the active
-// list without causing great complexity.
-/* ------------------------------------------------------------------------- */
-/*---------------------------------------------------------------------------*/
-// We must delay the write of abort info to the log to safe-guard against
-// a crash due to lack of log pages. We temporary stop all log writes to this
-// log part to ensure that we don't get a buffer explosion in the delayed
-// signal buffer instead.
-/*---------------------------------------------------------------------------*/
-    logPartPtr.i = regTcPtr->hashValue & 3;
-    ptrCheckGuard(logPartPtr, clogPartFileSize, logPartRecord);
-    linkWaitLog(signal, logPartPtr);
-    regTcPtr->transactionState = TcConnectionrec::ABORT_QUEUED;
-    if (logPartPtr.p->logPartState == LogPartRecord::IDLE) {
-      jam();
-      logPartPtr.p->logPartState = LogPartRecord::ACTIVE;
-    }//if
-    return;
-  }//if
   signal->theData[0] = regTcPtr->tupConnectrec;
   EXECUTE_DIRECT(DBTUP, GSN_TUP_ABORTREQ, signal, 1);
   regTcPtr->transactionState = TcConnectionrec::WAIT_ACC_ABORT;
   signal->theData[0] = regTcPtr->accConnectrec;
-  EXECUTE_DIRECT(DBACC, GSN_ACC_ABORTREQ, signal, 1);
+  signal->theData[1] = true;
+  EXECUTE_DIRECT(DBACC, GSN_ACC_ABORTREQ, signal, 2);
   /* ------------------------------------------------------------------------
    * We need to insert a real-time break by sending ACC_ABORTCONF through the
    * job buffer to ensure that we catch any ACCKEYCONF or TUPKEYCONF or
@@ -6337,11 +6985,11 @@
   ptrCheckGuard(tcConnectptr, ctcConnectrecFileSize, tcConnectionrec);
   TcConnectionrec * const regTcPtr = tcConnectptr.p;
   ndbrequire(regTcPtr->transactionState == TcConnectionrec::WAIT_ACC_ABORT);
-  if (regTcPtr->activeCreat == ZTRUE) {
+  if (regTcPtr->activeCreat == Fragrecord::AC_IGNORED) {
     /* ----------------------------------------------------------------------
      * A NORMAL EVENT DURING CREATION OF A FRAGMENT. WE NOW NEED TO CONTINUE
      * WITH NORMAL COMMIT PROCESSING.
-     * ---------------------------------------------------------------------- */
+     * --------------------------------------------------------------------- */
     if (regTcPtr->currTupAiLen == regTcPtr->totReclenAi) {
       jam();
       regTcPtr->abortState = TcConnectionrec::ABORT_IDLE;
@@ -6657,6 +7305,7 @@
 	       * THE RECEIVER OF THE COPY HAVE FAILED. 
 	       * WE HAVE TO CLOSE THE COPY PROCESS. 
 	       * ----------------------------------------------------------- */
+	      ndbout_c("close copy");
               tcConnectptr.p->tcNodeFailrec = tcNodeFailptr.i;
               tcConnectptr.p->abortState = TcConnectionrec::NEW_FROM_TC;
               closeCopyRequestLab(signal);
@@ -6814,13 +7463,28 @@
   jamEntry();
   scanptr.i = nextScanConf->scanPtr;
   c_scanRecordPool.getPtr(scanptr);
-  if (nextScanConf->localKeyLength == 1) {
+  if (likely(nextScanConf->localKeyLength == 1)) 
+  {
     jam();
-    nextScanConf->localKey[1] = 
-                  nextScanConf->localKey[0] & MAX_TUPLES_PER_PAGE;
-    nextScanConf->localKey[0] = nextScanConf->localKey[0] >> MAX_TUPLES_BITS;
-  }//if
-
+    scanptr.p->m_row_id.assref(nextScanConf->localKey[0]);
+  }
+  else
+  {
+    jam();
+    scanptr.p->m_row_id.m_page_no = nextScanConf->localKey[0];
+    scanptr.p->m_row_id.m_page_idx = nextScanConf->localKey[1]; 
+  }
+  
+#ifdef VM_TRACE
+  if (signal->getLength() > 2 && nextScanConf->accOperationPtr != RNIL)
+  {
+    Ptr<TcConnectionrec> regTcPtr;
+    regTcPtr.i = scanptr.p->scanTcrec;
+    ptrCheckGuard(regTcPtr, ctcConnectrecFileSize, tcConnectionrec);
+    ndbassert(regTcPtr.p->fragmentid == nextScanConf->fragId);
+  }
+#endif
+  
   fragptr.i = scanptr.p->fragPtrI;
   c_fragment_pool.getPtr(fragptr);
   switch (scanptr.p->scanState) {
@@ -7572,6 +8236,8 @@
   AccScanReq::setLockMode(req->requestInfo, scanptr.p->scanLockMode);
   AccScanReq::setReadCommittedFlag(req->requestInfo, scanptr.p->readCommitted);
   AccScanReq::setDescendingFlag(req->requestInfo, scanptr.p->descending);
+  AccScanReq::setNoDiskScanFlag(req->requestInfo, 
+				!tcConnectptr.p->m_disk_table);
   req->transId1 = tcConnectptr.p->transid[0];
   req->transId2 = tcConnectptr.p->transid[1];
   req->savePointId = tcConnectptr.p->savePointId;
@@ -8094,9 +8760,6 @@
                              scanptr.p->m_curr_batch_size_rows,
                              nextScanConf->accOperationPtr);
   jam();
-  scanptr.p->scanLocalref[0] = nextScanConf->localKey[0];
-  scanptr.p->scanLocalref[1] = nextScanConf->localKey[1];
-  scanptr.p->scanLocalFragid = nextScanConf->fragId;
   nextScanConfLoopLab(signal);
 }//Dblqh::nextScanConfScanLab()
 
@@ -8145,9 +8808,7 @@
   jam();
   
   int res;
-  Uint32 local_key;
-  local_key = scanptr.p->scanLocalref[0] << MAX_TUPLES_BITS;
-  local_key += scanptr.p->scanLocalref[1];
+  Uint32 local_key = scanPtr.p->m_row_id.ref();
   
   if((res= c_tup->load_diskpage_scan(signal, 
 				     regTcPtr.p->tupConnectrec,
@@ -8214,13 +8875,13 @@
   Uint32 reqinfo = (scanPtr.p->scanLockHold == ZFALSE);
   reqinfo = reqinfo + (regTcPtr->operation << 6);
   reqinfo = reqinfo + (regTcPtr->opExec << 10);
-  
+
   TupKeyReq * const tupKeyReq = (TupKeyReq *)signal->getDataPtrSend(); 
   
   tupKeyReq->connectPtr = regTcPtr->tupConnectrec;
   tupKeyReq->request = reqinfo;
-  tupKeyReq->keyRef1 = scanPtr.p->scanLocalref[0];
-  tupKeyReq->keyRef2 = scanPtr.p->scanLocalref[1];
+  tupKeyReq->keyRef1 = scanPtr.p->m_row_id.m_page_no;
+  tupKeyReq->keyRef2 = scanPtr.p->m_row_id.m_page_idx;
   tupKeyReq->attrBufLen = 0;
   tupKeyReq->opRef = scanPtr.p->scanApiOpPtr; 
   tupKeyReq->applRef = scanPtr.p->scanApiBlockref;
@@ -8260,9 +8921,9 @@
 Dblqh::readPrimaryKeys(ScanRecord *scanP, TcConnectionrec *tcConP, Uint32 *dst)
 {
   Uint32 tableId = tcConP->tableref;
-  Uint32 fragId = scanP->scanLocalFragid;
-  Uint32 fragPageId = scanP->scanLocalref[0];
-  Uint32 pageIndex = scanP->scanLocalref[1];
+  Uint32 fragId = tcConP->fragmentid;
+  Uint32 fragPageId = scanP->m_row_id.m_page_no;
+  Uint32 pageIndex = scanP->m_row_id.m_page_idx;
 
   if(scanP->rangeScan)
   {
@@ -8636,9 +9297,7 @@
   scanptr.p->tupScan = tupScan;
   scanptr.p->scanState = ScanRecord::SCAN_FREE;
   scanptr.p->scanFlag = ZFALSE;
-  scanptr.p->scanLocalref[0] = 0;
-  scanptr.p->scanLocalref[1] = 0;
-  scanptr.p->scanLocalFragid = 0;
+  scanptr.p->m_row_id.setNull();
   scanptr.p->scanTcWaiting = ZTRUE;
   scanptr.p->scanNumber = ~0;
   scanptr.p->scanApiOpPtr = scanFragReq->clientOpPtr;
@@ -8741,8 +9400,8 @@
   tcConnectptr.p->commitAckMarker = RNIL;
   tcConnectptr.p->m_offset_current_keybuf = 0;
   tcConnectptr.p->m_scan_curr_range_no = 0;
-  tcConnectptr.p->m_disk_table = tabptr.p->m_disk_table;
-
+  tcConnectptr.p->m_dealloc = 0;
+  
   TablerecPtr tTablePtr;
   tTablePtr.i = tabptr.p->primaryTableId;
   ptrCheckGuard(tTablePtr, ctabrecFileSize, tablerec);
@@ -9040,7 +9699,8 @@
   const Uint32 copyPtr = copyFragReq->userPtr;
   const Uint32 userRef = copyFragReq->userRef;
   const Uint32 nodeId = copyFragReq->nodeId;
-
+  const Uint32 gci = copyFragReq->gci;
+  
   ndbrequire(cnoActiveCopy < 3);
   ndbrequire(getFragmentrec(signal, fragId));
   ndbrequire(fragptr.p->copyFragState == ZIDLE);
@@ -9097,7 +9757,7 @@
   scanptr.p->scanApiOpPtr = tcConnectptr.i;
   scanptr.p->scanApiBlockref = reference();
   fragptr.p->m_scanNumberMask.clear(NR_ScanNo);
-  scanptr.p->scanBlockref = DBACC_REF;
+  scanptr.p->scanBlockref = DBTUP_REF;
   scanptr.p->scanLockHold = ZFALSE;
   
   initScanTc(0,
@@ -9111,6 +9771,7 @@
   tcConnectptr.p->copyCountWords = 0;
   tcConnectptr.p->tcOprec = tcConnectptr.i;
   tcConnectptr.p->schemaVersion = scanptr.p->scanSchemaVersion;
+  tcConnectptr.p->savePointId = gci;
   scanptr.p->scanState = ScanRecord::WAIT_ACC_COPY;
   AccScanReq * req = (AccScanReq*)&signal->theData[0];
   req->senderData = scanptr.i;
@@ -9120,10 +9781,13 @@
   req->requestInfo = 0;
   AccScanReq::setLockMode(req->requestInfo, 0);
   AccScanReq::setReadCommittedFlag(req->requestInfo, 0);
+  AccScanReq::setNRScanFlag(req->requestInfo, gci ? 1 : 0);
+  AccScanReq::setNoDiskScanFlag(req->requestInfo, 1);
+
   req->transId1 = tcConnectptr.p->transid[0];
   req->transId2 = tcConnectptr.p->transid[1];
   req->savePointId = tcConnectptr.p->savePointId;
-  sendSignal(tcConnectptr.p->tcAccBlockref, GSN_ACC_SCANREQ, signal, 
+  sendSignal(scanptr.p->scanBlockref, GSN_ACC_SCANREQ, signal, 
 	     AccScanReq::SignalLength, JBB);
   return;
 }//Dblqh::execCOPY_FRAGREQ()
@@ -9152,7 +9816,7 @@
   signal->theData[2] = scanptr.p->scanSchemaVersion;
   signal->theData[3] = ZSTORED_PROC_COPY;
 // theData[4] is not used in TUP with ZSTORED_PROC_COPY
-  sendSignal(tcConnectptr.p->tcTupBlockref, GSN_STORED_PROCREQ, signal, 5, JBB);
+  sendSignal(scanptr.p->scanBlockref, GSN_STORED_PROCREQ, signal, 5, JBB);
   return;
 }//Dblqh::accScanConfCopyLab()
 
@@ -9211,12 +9875,25 @@
 
 void Dblqh::continueFirstCopyAfterBlockedLab(Signal* signal) 
 {
+  /**
+   * Start sending ROWID for all operations from now on
+   */
+  fragptr.p->m_copy_started_state = Fragrecord::AC_NR_COPY;
+
+  if (0)
+  {
+    ndbout_c("STOPPING COPY (%d -> %d %d %d)",
+	     scanptr.p->scanBlockref,
+	     scanptr.p->scanAccPtr, RNIL, NextScanReq::ZSCAN_NEXT);
+    return;
+  }
+  
   scanptr.i = tcConnectptr.p->tcScanRec;
   c_scanRecordPool.getPtr(scanptr);
   signal->theData[0] = scanptr.p->scanAccPtr;
   signal->theData[1] = RNIL;
   signal->theData[2] = NextScanReq::ZSCAN_NEXT;
-  sendSignal(tcConnectptr.p->tcAccBlockref, GSN_NEXT_SCANREQ, signal, 3, JBB);
+  sendSignal(scanptr.p->scanBlockref, GSN_NEXT_SCANREQ, signal, 3, JBB);
   return;
 }//Dblqh::continueFirstCopyAfterBlockedLab()
 
@@ -9267,30 +9944,87 @@
     return;
   }//if
 
-  // If accOperationPtr == RNIL no record was returned by ACC
-  if (nextScanConf->accOperationPtr == RNIL) {
+  TcConnectionrec * tcConP = tcConnectptr.p;
+  
+  tcConP->m_use_rowid = true;
+  tcConP->m_row_id = scanptr.p->m_row_id;
+  
+  if (signal->getLength() == 7)
+  {
     jam();
-    signal->theData[0] = scanptr.p->scanAccPtr;
-    signal->theData[1] = AccCheckScan::ZCHECK_LCP_STOP;
-    sendSignal(tcConnectptr.p->tcAccBlockref, GSN_ACC_CHECK_SCAN, signal, 2, JBB);
-    return;      
-  }
-
-  set_acc_ptr_in_scan_record(scanptr.p, 0, nextScanConf->accOperationPtr);
-  initCopyTc(signal);
+    ndbrequire(nextScanConf->accOperationPtr == RNIL);
+    initCopyTc(signal, ZDELETE);
+    set_acc_ptr_in_scan_record(scanptr.p, 0, RNIL);
+    tcConP->gci = nextScanConf->gci;
+
+    tcConP->primKeyLen = 0;
+    tcConP->totSendlenAi = 0;
+    tcConP->connectState = TcConnectionrec::COPY_CONNECTED;
 
-  Fragrecord* fragPtrP= fragptr.p;
-  scanptr.p->scanState = ScanRecord::WAIT_TUPKEY_COPY;
-  tcConnectptr.p->transactionState = TcConnectionrec::COPY_TUPKEY;
-  if(tcConnectptr.p->m_disk_table)
-  {
-    next_scanconf_load_diskpage(signal, scanptr, tcConnectptr,fragPtrP);
+/*---------------------------------------------------------------------------*/
+// To avoid using up to many operation records in ACC we will increase the
+// constant to ensure that we never send more than 40 records at a time.
+// This is where the constant 56 comes from. For long records this constant
+// will not matter that much. The current maximum is 6000 words outstanding
+// (including a number of those 56 words not really sent). We also have to
+// ensure that there are never more simultaneous usage of these operation
+// records to ensure that node recovery does not fail because of simultaneous
+// scanning.
+/*---------------------------------------------------------------------------*/
+    UintR TnoOfWords = 8;
+    TnoOfWords = TnoOfWords + MAGIC_CONSTANT;
+    TnoOfWords = TnoOfWords + (TnoOfWords >> 2);
+    
+    /*-----------------------------------------------------------------
+     * NOTE for transid1!
+     * Transid1 in the tcConnection record is used load regulate the 
+     * copy(node recovery) process.
+     * The number of outstanding words are written in the transid1 
+     * variable. This will be sent to the starting node in the 
+     * LQHKEYREQ signal and when the answer is returned in the LQHKEYCONF
+     * we can reduce the number of outstanding words and check to see
+     * if more LQHKEYREQ signals should be sent.
+     * 
+     * However efficient this method is rather unsafe in such way that
+     * it overwrites the transid1 original data.
+     *
+     * Also see TR 587.
+     *----------------------------------------------------------------*/
+    tcConP->transid[0] = TnoOfWords; // Data overload, see note!
+    packLqhkeyreqLab(signal);
+    tcConP->copyCountWords += TnoOfWords;
+    scanptr.p->scanState = ScanRecord::WAIT_LQHKEY_COPY;
+    if (tcConP->copyCountWords < cmaxWordsAtNodeRec) {
+      nextRecordCopy(signal);
+    }
+    return;
   }
   else
   {
-    next_scanconf_tupkeyreq(signal, scanptr, tcConnectptr.p, fragPtrP, RNIL);
+    // If accOperationPtr == RNIL no record was returned by ACC
+    if (nextScanConf->accOperationPtr == RNIL) {
+      jam();
+      signal->theData[0] = scanptr.p->scanAccPtr;
+      signal->theData[1] = AccCheckScan::ZCHECK_LCP_STOP;
+      sendSignal(scanptr.p->scanBlockref, GSN_ACC_CHECK_SCAN, signal, 2, JBB);
+      return;      
+    }
+    
+    initCopyTc(signal, ZINSERT);
+    set_acc_ptr_in_scan_record(scanptr.p, 0, nextScanConf->accOperationPtr);
+    
+    Fragrecord* fragPtrP= fragptr.p;
+    scanptr.p->scanState = ScanRecord::WAIT_TUPKEY_COPY;
+    tcConP->transactionState = TcConnectionrec::COPY_TUPKEY;
+    if(tcConP->m_disk_table)
+    {
+      next_scanconf_load_diskpage(signal, scanptr, tcConnectptr,fragPtrP);
+    }
+    else
+    {
+      next_scanconf_tupkeyreq(signal, scanptr, tcConP, fragPtrP, RNIL);
+    }
   }
-  return;
 }//Dblqh::nextScanConfCopyLab()
 
 
@@ -9366,6 +10100,7 @@
   Uint32* tmp = signal->getDataPtrSend()+24;
   Uint32 len= tcConnectptr.p->primKeyLen = readPrimaryKeys(scanP, tcConP, tmp);
   
+  tcConP->gci = tmp[len];
   // Calculate hash (no need to linearies key)
   if (g_key_descriptor_pool.getPtr(tableId)->hasCharAttr)
   {
@@ -9532,10 +10267,23 @@
   c_scanRecordPool.getPtr(scanptr);
   tcConnectptr.p->errorCode = 0;
   Uint32 acc_op_ptr= get_acc_ptr_from_scan_record(scanptr.p, 0, false);
-  signal->theData[0] = scanptr.p->scanAccPtr;
-  signal->theData[1] = acc_op_ptr;
-  signal->theData[2] = NextScanReq::ZSCAN_NEXT_COMMIT;
-  sendSignal(tcConnectptr.p->tcAccBlockref, GSN_NEXT_SCANREQ, signal, 3, JBB);
+  if (acc_op_ptr != RNIL)
+  {
+    signal->theData[0] = scanptr.p->scanAccPtr;
+    signal->theData[1] = acc_op_ptr;
+    signal->theData[2] = NextScanReq::ZSCAN_NEXT_COMMIT;
+    sendSignal(scanptr.p->scanBlockref, GSN_NEXT_SCANREQ, signal, 3, JBB);
+  }
+  else
+  {
+    /**
+     * No need to commit (unlock)
+     */
+    signal->theData[0] = scanptr.p->scanAccPtr;
+    signal->theData[1] = RNIL;
+    signal->theData[2] = NextScanReq::ZSCAN_NEXT;
+    sendSignal(scanptr.p->scanBlockref, GSN_NEXT_SCANREQ, signal, 3, JBB);
+  }
   return;
 }//Dblqh::continueCopyAfterBlockedLab()
 
@@ -9566,6 +10314,12 @@
   tcConnectptr.p->transid[1] = 0;
   fragptr.i = tcConnectptr.p->fragmentptr;
   c_fragment_pool.getPtr(fragptr);
+
+  /**
+   * Stop sending ROWID for all operations from now on
+   */
+  fragptr.p->m_copy_started_state = Fragrecord::AC_NORMAL;
+  
   scanptr.i = tcConnectptr.p->tcScanRec;
   c_scanRecordPool.getPtr(scanptr);
   scanptr.p->scanState = ScanRecord::WAIT_CLOSE_COPY;
@@ -9605,8 +10359,8 @@
   c_scanRecordPool.getPtr(scanptr);
   signal->theData[0] = scanptr.p->scanAccPtr;
   signal->theData[1] = RNIL;
-  signal->theData[2] = ZCOPY_CLOSE;
-  sendSignal(tcConnectptr.p->tcAccBlockref, GSN_NEXT_SCANREQ, signal, 3, JBB);
+  signal->theData[2] = NextScanReq::ZSCAN_CLOSE;
+  sendSignal(scanptr.p->scanBlockref, GSN_NEXT_SCANREQ, signal, 3, JBB);
   return;
 }//Dblqh::continueCloseCopyAfterBlockedLab()
 
@@ -9691,7 +10445,7 @@
       conf->tableId = tcConnectptr.p->tableref;
       conf->fragId = tcConnectptr.p->fragmentid;
       sendSignal(tcConnectptr.p->clientBlockref, GSN_COPY_FRAGCONF, signal,
-                 CopyFragConf::SignalLength, JBB);
+		 CopyFragConf::SignalLength, JBB);
     }//if
   }//if
   releaseActiveCopy(signal);
@@ -9710,6 +10464,7 @@
 void Dblqh::closeCopyRequestLab(Signal* signal) 
 {
   scanptr.p->scanErrorCounter++;
+  ndbout_c("closeCopyRequestLab: scanState: %d", scanptr.p->scanState);
   switch (scanptr.p->scanState) {
   case ScanRecord::WAIT_TUPKEY_COPY:
   case ScanRecord::WAIT_NEXT_SCAN_COPY:
@@ -9793,7 +10548,13 @@
     }//if
     return;
   }//if
+  
   fragptr.p->fragStatus = Fragrecord::FSACTIVE;
+  if (TRACENR_FLAG)
+    TRACENR("tab: " << tabptr.i 
+	    << " frag: " << fragId 
+	    << " COPY ACTIVE" << endl);
+  
   if (fragptr.p->lcpFlag == Fragrecord::LCP_STATE_TRUE) {
     jam();
     fragptr.p->logFlag = Fragrecord::STATE_TRUE;
@@ -9939,21 +10700,18 @@
 /*                                                                           */
 /*       SUBROUTINE SHORT NAME = ICT                                         */
 /* ========================================================================= */
-void Dblqh::initCopyTc(Signal* signal) 
+void Dblqh::initCopyTc(Signal* signal, Operation_t op) 
 {
-  const NextScanConf * const nextScanConf = (NextScanConf *)&signal->theData[0];
-  scanptr.p->scanLocalref[0] = nextScanConf->localKey[0];
-  scanptr.p->scanLocalref[1] = nextScanConf->localKey[1];
-  scanptr.p->scanLocalFragid = nextScanConf->fragId;
   tcConnectptr.p->operation = ZREAD;
   tcConnectptr.p->apiVersionNo = 0;
   tcConnectptr.p->opExec = 0;	/* NOT INTERPRETED MODE */
   tcConnectptr.p->schemaVersion = scanptr.p->scanSchemaVersion;
   Uint32 reqinfo = 0;
-  LqhKeyReq::setLockType(reqinfo, ZINSERT);
   LqhKeyReq::setDirtyFlag(reqinfo, 1);
   LqhKeyReq::setSimpleFlag(reqinfo, 1);
-  LqhKeyReq::setOperation(reqinfo, ZWRITE);
+  LqhKeyReq::setOperation(reqinfo, op);
+  LqhKeyReq::setGCIFlag(reqinfo, 1);
+  LqhKeyReq::setNrCopyFlag(reqinfo, 1);
                                         /* AILen in LQHKEYREQ  IS ZERO */
   tcConnectptr.p->reqinfo = reqinfo;
 /* ------------------------------------------------------------------------ */
@@ -10273,7 +11031,6 @@
    * ----------------------------------------------------------------------- */
   fragptr.i = lcpPtr.p->currentFragment.fragPtrI;
   c_fragment_pool.getPtr(fragptr);
-  fragptr.p->fragActiveStatus = ZFALSE;
   
   contChkpNextFragLab(signal);
   return;
@@ -10727,6 +11484,29 @@
   const Uint32 dihBlockRef = saveReq->dihBlockRef;
   const Uint32 dihPtr = saveReq->dihPtr;
   const Uint32 gci = saveReq->gci;
+
+  if(getNodeState().startLevel >= NodeState::SL_STOPPING_4){
+    GCPSaveRef * const saveRef = (GCPSaveRef*)&signal->theData[0];
+    saveRef->dihPtr = dihPtr;
+    saveRef->nodeId = getOwnNodeId();
+    saveRef->gci    = gci;
+    saveRef->errorCode = GCPSaveRef::NodeShutdownInProgress;
+    sendSignal(dihBlockRef, GSN_GCP_SAVEREF, signal, 
+	       GCPSaveRef::SignalLength, JBB);
+    return;
+  }
+
+  if (getNodeState().getNodeRestartInProgress())
+  {
+    GCPSaveRef * const saveRef = (GCPSaveRef*)&signal->theData[0];
+    saveRef->dihPtr = dihPtr;
+    saveRef->nodeId = getOwnNodeId();
+    saveRef->gci    = gci;
+    saveRef->errorCode = GCPSaveRef::NodeRestartInProgress;
+    sendSignal(dihBlockRef, GSN_GCP_SAVEREF, signal, 
+	       GCPSaveRef::SignalLength, JBB);
+    return;
+  }
   
   ndbrequire(gci >= cnewestCompletedGci);
   
@@ -10763,30 +11543,7 @@
   }//if
   
   ndbrequire(ccurrentGcprec == RNIL);
-  
-  
-  if(getNodeState().startLevel >= NodeState::SL_STOPPING_4){
-    GCPSaveRef * const saveRef = (GCPSaveRef*)&signal->theData[0];
-    saveRef->dihPtr = dihPtr;
-    saveRef->nodeId = getOwnNodeId();
-    saveRef->gci    = gci;
-    saveRef->errorCode = GCPSaveRef::NodeShutdownInProgress;
-    sendSignal(dihBlockRef, GSN_GCP_SAVEREF, signal, 
-	       GCPSaveRef::SignalLength, JBB);
-    return;
-  }
-
-  if(getNodeState().getNodeRestartInProgress()){
-    GCPSaveRef * const saveRef = (GCPSaveRef*)&signal->theData[0];
-    saveRef->dihPtr = dihPtr;
-    saveRef->nodeId = getOwnNodeId();
-    saveRef->gci    = gci;
-    saveRef->errorCode = GCPSaveRef::NodeRestartInProgress;
-    sendSignal(dihBlockRef, GSN_GCP_SAVEREF, signal, 
-	       GCPSaveRef::SignalLength, JBB);
-    return;
-  }
-  
+    
   ccurrentGcprec = 0;
   gcpPtr.i = ccurrentGcprec;
   ptrCheckGuard(gcpPtr, cgcprecFileSize, gcpRecord);
@@ -11705,7 +12462,7 @@
 /* WE WILL CLOSE THE FILE.                                                   */
 /*---------------------------------------------------------------------------*/
         logFilePtr.p->logFileStatus = LogFileRecord::CLOSING_WRITE_LOG;
-        closeFile(signal, logFilePtr);
+        closeFile(signal, logFilePtr, __LINE__);
       }//if
     }//if
   }//if
@@ -11875,7 +12632,7 @@
     jam();
     releaseLogpage(signal);
     logFilePtr.p->logFileStatus = LogFileRecord::CLOSING_INIT;
-    closeFile(signal, logFilePtr);
+    closeFile(signal, logFilePtr, __LINE__);
     return;
   }//if
   writeInitMbyte(signal);
@@ -12433,7 +13190,7 @@
    * ------------------------------------------------------------------------ */
   releaseLogpage(signal);
   logFilePtr.p->logFileStatus = LogFileRecord::CLOSING_SR;
-  closeFile(signal, logFilePtr);
+  closeFile(signal, logFilePtr, __LINE__);
   LogFileRecordPtr locLogFilePtr;
   findLogfile(signal, fileNo, logPartPtr, &locLogFilePtr);
   locLogFilePtr.p->logFileStatus = LogFileRecord::OPEN_SR_LAST_FILE;
@@ -12504,7 +13261,7 @@
     }//if
   }//if
   logFilePtr.p->logFileStatus = LogFileRecord::CLOSING_SR;
-  closeFile(signal, logFilePtr);
+  closeFile(signal, logFilePtr, __LINE__);
   if (logPartPtr.p->noLogFiles > ZMAX_LOG_FILES_IN_PAGE_ZERO) {
     Uint32 fileNo;
     if (logFilePtr.p->fileNo >= ZMAX_LOG_FILES_IN_PAGE_ZERO) {
@@ -12562,7 +13319,7 @@
   }//if
   releaseLogpage(signal);
   logFilePtr.p->logFileStatus = LogFileRecord::CLOSING_SR;
-  closeFile(signal, logFilePtr);
+  closeFile(signal, logFilePtr, __LINE__);
   if (logPartPtr.p->srRemainingFiles > ZMAX_LOG_FILES_IN_PAGE_ZERO) {
     Uint32 fileNo;
     if (logFilePtr.p->fileNo >= ZMAX_LOG_FILES_IN_PAGE_ZERO) {
@@ -12871,13 +13628,6 @@
    *   WE ALSO NEED TO SET CNEWEST_GCI TO ENSURE THAT LOG RECORDS ARE EXECUTED
    *   WITH A PROPER GCI.
    *------------------------------------------------------------------------ */
-  if (cstartType == NodeState::ST_NODE_RESTART) {
-    jam();
-    signal->theData[0] = ZSR_PHASE3_START;
-    signal->theData[1] = ZSR_PHASE2_COMPLETED;
-    sendSignal(cownref, GSN_CONTINUEB, signal, 2, JBB);
-    return;
-  }//if
   if(cstartType == NodeState::ST_INITIAL_NODE_RESTART){
     jam();
     StartRecConf * conf = (StartRecConf*)signal->getDataPtrSend();
@@ -13025,16 +13775,9 @@
      *    WE NEED TO SEND THOSE SIGNALS EVEN IF WE HAVE NOT REQUESTED 
      *    ANY FRAGMENTS PARTICIPATE IN THIS PHASE.
      * --------------------------------------------------------------------- */
-    for (Uint32 i = 0; i < cnoOfNodes; i++) {
-      jam();
-      if (cnodeStatus[i] == ZNODE_UP) {
-        jam();
-        ndbrequire(cnodeData[i] < MAX_NDB_NODES);
-        BlockReference ref = calcLqhBlockRef(cnodeData[i]);
-        signal->theData[0] = cownNodeid;
-        sendSignal(ref, GSN_EXEC_SRREQ, signal, 1, JBB);
-      }//if
-    }//for
+    NodeReceiverGroup rg(DBLQH, m_sr_nodes);
+    signal->theData[0] = cownNodeid;
+    sendSignal(rg, GSN_EXEC_SRREQ, signal, 1, JBB);
     return;
   } else {
     jam();
@@ -13075,8 +13818,15 @@
       c_lcp_complete_fragments.remove(fragptr);
       c_redo_complete_fragments.add(fragptr);
       
-      fragptr.p->fragStatus = Fragrecord::FSACTIVE;
-      fragptr.p->logFlag = Fragrecord::STATE_TRUE;
+      if (!getNodeState().getNodeRestartInProgress())
+      {
+	fragptr.p->logFlag = Fragrecord::STATE_TRUE;
+	fragptr.p->fragStatus = Fragrecord::FSACTIVE;
+      } 
+      else
+      {	
+	fragptr.p->fragStatus = Fragrecord::ACTIVE_CREATION;	
+      }
       signal->theData[0] = fragptr.p->srUserptr;
       signal->theData[1] = cownNodeid;
       sendSignal(fragptr.p->srBlockref, GSN_START_FRAGCONF, signal, 2, JBB);
@@ -13174,32 +13924,22 @@
   jamEntry();
   Uint32 nodeId = signal->theData[0];
   arrGuard(nodeId, MAX_NDB_NODES);
-  cnodeExecSrState[nodeId] = ZEXEC_SR_COMPLETED;
-  ndbrequire(cnoOfNodes < MAX_NDB_NODES);
-  for (Uint32 i = 0; i < cnoOfNodes; i++) {
+  m_sr_exec_sr_conf.set(nodeId);
+  if (!m_sr_nodes.equal(m_sr_exec_sr_conf))
+  {
     jam();
-    if (cnodeStatus[i] == ZNODE_UP) {
-      jam();
-      nodeId = cnodeData[i];
-      arrGuard(nodeId, MAX_NDB_NODES);
-      if (cnodeExecSrState[nodeId] != ZEXEC_SR_COMPLETED) {
-        jam();
-	/* ------------------------------------------------------------------
-	 *  ALL NODES HAVE NOT REPORTED COMPLETION OF EXECUTING FRAGMENT 
-	 *  LOGS YET.
-	 * ----------------------------------------------------------------- */
-        return;
-      }//if
-    }//if
-  }//for
-
+    /* ------------------------------------------------------------------
+     *  ALL NODES HAVE NOT REPORTED COMPLETION OF EXECUTING FRAGMENT 
+     *  LOGS YET.
+     * ----------------------------------------------------------------- */
+    return;
+  }
+  
   /* ------------------------------------------------------------------------
    *  CLEAR NODE SYSTEM RESTART EXECUTION STATE TO PREPARE FOR NEXT PHASE OF
    *  LOG EXECUTION.
    * ----------------------------------------------------------------------- */
-  for (nodeId = 0; nodeId < MAX_NDB_NODES; nodeId++) {
-    cnodeExecSrState[nodeId] = ZSTART_SR;
-  }//for
+  m_sr_exec_sr_conf.clear();
 
   /* ------------------------------------------------------------------------
    *  NOW CHECK IF ALL FRAGMENTS IN THIS PHASE HAVE COMPLETED. IF SO START THE
@@ -13283,29 +14023,19 @@
   jamEntry();
   Uint32 nodeId = signal->theData[0];
   ndbrequire(nodeId < MAX_NDB_NODES);
-  cnodeSrState[nodeId] = ZEXEC_SR_COMPLETED;
-  ndbrequire(cnoOfNodes < MAX_NDB_NODES);
-  for (Uint32 i = 0; i < cnoOfNodes; i++) {
+  m_sr_exec_sr_req.set(nodeId);
+  if (!m_sr_exec_sr_req.equal(m_sr_nodes))
+  {
     jam();
-    if (cnodeStatus[i] == ZNODE_UP) {
-      jam();
-      nodeId = cnodeData[i];
-      if (cnodeSrState[nodeId] != ZEXEC_SR_COMPLETED) {
-        jam();
-	/* ------------------------------------------------------------------
-	 *  ALL NODES HAVE NOT REPORTED COMPLETION OF SENDING EXEC_FRAGREQ YET.
-	 * ----------------------------------------------------------------- */
-        return;
-      }//if
-    }//if
-  }//for
+    return;
+  }
+
   /* ------------------------------------------------------------------------
    *  CLEAR NODE SYSTEM RESTART STATE TO PREPARE FOR NEXT PHASE OF LOG 
    *  EXECUTION
    * ----------------------------------------------------------------------- */
-  for (nodeId = 0; nodeId < MAX_NDB_NODES; nodeId++) {
-    cnodeSrState[nodeId] = ZSTART_SR;
-  }//for
+  m_sr_exec_sr_req.clear();
+
   if (csrPhasesCompleted != 0) {
     /* ----------------------------------------------------------------------
      *       THE FIRST PHASE MUST ALWAYS EXECUTE THE LOG.
@@ -13313,7 +14043,7 @@
     if (cnoFragmentsExecSr == 0) {
       jam();
       /* --------------------------------------------------------------------
-       *   THERE WERE NO FRAGMENTS THAT NEEDED TO EXECUTE THE LOG IN THIS PHASE.
+       *  THERE WERE NO FRAGMENTS THAT NEEDED TO EXECUTE THE LOG IN THIS PHASE.
        * ------------------------------------------------------------------- */
       srPhase3Comp(signal);
       return;
@@ -13357,11 +14087,6 @@
   if (csrPhaseStarted == ZSR_NO_PHASE_STARTED) {
     jam();
     csrPhaseStarted = tsrPhaseStarted;
-    if (cstartType == NodeState::ST_NODE_RESTART) {
-      ndbrequire(cinitialStartOngoing == ZTRUE);
-      cinitialStartOngoing = ZFALSE;
-      checkStartCompletedLab(signal);
-    }//if
     return;
   }//if  
   ndbrequire(csrPhaseStarted != tsrPhaseStarted);
@@ -13384,22 +14109,12 @@
       logPartPtr.p->logLastGci = 2;
     }//if
   }//for
-  if (cstartType == NodeState::ST_NODE_RESTART) {
-    jam();
-    /* ----------------------------------------------------------------------
-     *  FOR A NODE RESTART WE HAVE NO FRAGMENTS DEFINED YET. 
-     *  THUS WE CAN SKIP THAT PART
-     * --------------------------------------------------------------------- */
-    signal->theData[0] = ZSR_GCI_LIMITS;
-    signal->theData[1] = RNIL;
-    sendSignal(cownref, GSN_CONTINUEB, signal, 2, JBB);
-  } else {
-    jam();
-    c_lcp_complete_fragments.first(fragptr);
-    signal->theData[0] = ZSR_GCI_LIMITS;
-    signal->theData[1] = fragptr.i;
-    sendSignal(cownref, GSN_CONTINUEB, signal, 2, JBB);
-  }//if
+  
+  jam();
+  c_lcp_complete_fragments.first(fragptr);
+  signal->theData[0] = ZSR_GCI_LIMITS;
+  signal->theData[1] = fragptr.i;
+  sendSignal(cownref, GSN_CONTINUEB, signal, 2, JBB);
   return;
 }//Dblqh::srPhase3Start()
 
@@ -13759,7 +14474,7 @@
       jam();
       releaseMmPages(signal);
       logFilePtr.p->logFileStatus = LogFileRecord::CLOSING_EXEC_SR_COMPLETED;
-      closeFile(signal, logFilePtr);
+      closeFile(signal, logFilePtr, __LINE__);
       return;
       break;
     case LogPartRecord::LES_EXEC_LOG_NEW_MBYTE:
@@ -13775,7 +14490,7 @@
       ptrCheckGuard(nextLogFilePtr, clogFileFileSize, logFileRecord);
       nextLogFilePtr.p->currentMbyte = 0;
       logFilePtr.p->logFileStatus = LogFileRecord::CLOSING_EXEC_SR;
-      closeFile(signal, logFilePtr);
+      closeFile(signal, logFilePtr, __LINE__);
       return;
       break;
     case LogPartRecord::LES_EXEC_LOG:
@@ -14110,6 +14825,8 @@
   ptrCheckGuard(tcConnectptr, ctcConnectrecFileSize, tcConnectionrec);
   fragptr.i = tcConnectptr.p->fragmentptr;
   c_fragment_pool.getPtr(fragptr);
+  tcConnectptr.p->m_log_part_ptr_i = fragptr.p->m_log_part_ptr_i;
+
   // Read a log record and prepare it for execution
   readLogHeader(signal);
   readKey(signal);
@@ -14226,7 +14943,7 @@
     if (logFilePtr.i != nextAfterCurrentLogFilePtr.i) {
       // This file should be closed.
       logFilePtr.p->logFileStatus = LogFileRecord::CLOSE_SR_INVALIDATE_PAGES;
-      closeFile(signal, logFilePtr);  
+      closeFile(signal, logFilePtr, __LINE__);  
       // Return from this function and wait for close confirm. Then come back 
       // and test the previous file for closing.
       return;
@@ -14283,17 +15000,19 @@
   case ZUPDATE:
   case ZDELETE:
     jam();
-    ndbrequire(terrorCode == ZNO_TUPLE_FOUND);
+    if (unlikely(terrorCode != ZNO_TUPLE_FOUND))
+      goto error;
     break;
   case ZINSERT:
     jam();
-    ndbrequire(terrorCode == ZTUPLE_ALREADY_EXIST);
+    if (unlikely(terrorCode != ZTUPLE_ALREADY_EXIST && terrorCode != 899))
+      goto error;
+    
     break;
   default:
-    ndbrequire(false);
-    return;
-    break;
-  }//switch
+    goto error;
+  }
+
   if (result == ZOK) {
     jam();
     execLogRecord(signal);
@@ -14312,6 +15031,19 @@
    *  PROCEEDING IN RARE CASES.
    * ----------------------------------------------------------------------- */
   return;
+error:
+  BaseString tmp;
+  tmp.appfmt("You have found a bug!"
+	     " Failed op (%s) during REDO table: %d fragment: %d err: %d",
+	     tcConnectptr.p->operation == ZINSERT ? "INSERT" :
+	     tcConnectptr.p->operation == ZUPDATE ? "UPDATE" :
+	     tcConnectptr.p->operation == ZDELETE ? "DELETE" :
+	     tcConnectptr.p->operation == ZWRITE ? "WRITE" : "<unknown>",
+	     tcConnectptr.p->tableref,
+	     tcConnectptr.p->fragmentid,
+	     terrorCode);
+  progError(__LINE__, NDBD_EXIT_SYSTEM_ERROR, 
+	    tmp.c_str());
 }//Dblqh::logLqhkeyrefLab()
 
 void Dblqh::closeExecSrCompletedLab(Signal* signal) 
@@ -14359,24 +15091,11 @@
    *   ALL LOG PARTS HAVE COMPLETED THE EXECUTION OF THE LOG. WE CAN NOW START
    *   SENDING THE EXEC_FRAGCONF SIGNALS TO ALL INVOLVED FRAGMENTS.
    * ----------------------------------------------------------------------- */
-  if (cstartType != NodeState::ST_NODE_RESTART) {
-    jam();
-    c_lcp_complete_fragments.first(fragptr);
-    signal->theData[0] = ZSEND_EXEC_CONF;
-    signal->theData[1] = fragptr.i;
-    sendSignal(cownref, GSN_CONTINUEB, signal, 2, JBB);
-  } else {
-    jam();
-    /* ----------------------------------------------------------------------
-     *  FOR NODE RESTART WE CAN SKIP A NUMBER OF STEPS SINCE WE HAVE NO 
-     *  FRAGMENTS DEFINED AT THIS POINT. OBVIOUSLY WE WILL NOT NEED TO 
-     *  EXECUTE ANY MORE LOG STEPS EITHER AND THUS WE CAN IMMEDIATELY 
-     *  START FINDING THE END AND THE START OF THE LOG.
-     * --------------------------------------------------------------------- */
-    csrPhasesCompleted = 3;
-    execSrCompletedLab(signal);
-    return;
-  }//if
+  jam();
+  c_lcp_complete_fragments.first(fragptr);
+  signal->theData[0] = ZSEND_EXEC_CONF;
+  signal->theData[1] = fragptr.i;
+  sendSignal(cownref, GSN_CONTINUEB, signal, 2, JBB);
   return;
 }//Dblqh::execLogComp()
 
@@ -14765,7 +15484,7 @@
 /*       YET. THIS IS POSSIBLE IF ACTIVE CREATION OF THE FRAGMENT IS       */
 /*       ONGOING.                                                          */
 /*************************************************************************>*/
-    if (tcConnectptr.p->activeCreat == ZTRUE) {
+    if (tcConnectptr.p->activeCreat == Fragrecord::AC_IGNORED) {
         jam();
 /*************************************************************************>*/
 /*       ONGOING ABORTS DURING ACTIVE CREATION MUST SAVE THE ATTRIBUTE INFO*/
@@ -14810,7 +15529,7 @@
 /*       COMPLETED AND THAT THE ERROR CODE IS PROPERLY SET                 */
 /*************************************************************************>*/
         tcConnectptr.p->errorCode = terrorCode;
-        tcConnectptr.p->activeCreat = ZFALSE;
+        tcConnectptr.p->activeCreat = Fragrecord::AC_NORMAL;
         if (tcConnectptr.p->transactionState == 
 	    TcConnectionrec::WAIT_AI_AFTER_ABORT) {
           jam();
@@ -15082,13 +15801,15 @@
 /* ------       CLOSE A FILE DURING EXECUTION OF FRAGMENT LOG        ------- */
 /*                                                                           */
 /* ------------------------------------------------------------------------- */
-void Dblqh::closeFile(Signal* signal, LogFileRecordPtr clfLogFilePtr) 
+void Dblqh::closeFile(Signal* signal, 
+		      LogFileRecordPtr clfLogFilePtr, Uint32 line) 
 {
   signal->theData[0] = clfLogFilePtr.p->fileRef;
   signal->theData[1] = cownref;
   signal->theData[2] = clfLogFilePtr.i;
   signal->theData[3] = ZCLOSE_NO_DELETE;
-  sendSignal(NDBFS_REF, GSN_FSCLOSEREQ, signal, 4, JBA);
+  signal->theData[4] = line;
+  sendSignal(NDBFS_REF, GSN_FSCLOSEREQ, signal, 5, JBA);
 }//Dblqh::closeFile()
 
 
@@ -15420,7 +16141,6 @@
     refresh_watch_dog();
     new (fragptr.p) Fragrecord();
     fragptr.p->fragStatus = Fragrecord::FREE;
-    fragptr.p->fragActiveStatus = ZFALSE;
     fragptr.p->execSrStatus = Fragrecord::IDLE;
     fragptr.p->srStatus = Fragrecord::SS_IDLE;
   }
@@ -15579,10 +16299,9 @@
   switch (data) {
   case 0:
     jam();
-    for (i = 0; i < MAX_NDB_NODES; i++) {
-      cnodeSrState[i] = ZSTART_SR;
-      cnodeExecSrState[i] = ZSTART_SR;
-    }//for
+    m_sr_nodes.clear();
+    m_sr_exec_sr_req.clear();
+    m_sr_exec_sr_conf.clear();
     for (i = 0; i < 1024; i++) {
       ctransidHash[i] = RNIL;
     }//for
@@ -15590,16 +16309,11 @@
       cactiveCopy[i] = RNIL;
     }//for
     cnoActiveCopy = 0;
-    cCounterAccCommitBlocked = 0;
-    cCounterTupCommitBlocked = 0;
-    caccCommitBlocked = false;
-    ctupCommitBlocked = false;
-    cCommitBlocked = false;
     ccurrentGcprec = RNIL;
     caddNodeState = ZFALSE;
     cstartRecReq = ZFALSE;
-    cnewestGci = (UintR)-1;
-    cnewestCompletedGci = (UintR)-1;
+    cnewestGci = ~0;
+    cnewestCompletedGci = ~0;
     crestartOldestGci = 0;
     crestartNewestGci = 0;
     csrPhaseStarted = ZSR_NO_PHASE_STARTED;
@@ -15924,7 +16638,7 @@
  * ========================================================================= */
 void Dblqh::initLogPointers(Signal* signal) 
 {
-  logPartPtr.i = tcConnectptr.p->hashValue & 3;
+  logPartPtr.i = tcConnectptr.p->m_log_part_ptr_i;
   ptrCheckGuard(logPartPtr, clogPartFileSize, logPartRecord);
   logFilePtr.i = logPartPtr.p->currentLogfile;
   ptrCheckGuard(logFilePtr, clogFileFileSize, logFileRecord);
@@ -15956,10 +16670,10 @@
 /*       SET SIMPLE TRANSACTION                                              */
 /* ------------------------------------------------------------------------- */
   LqhKeyReq::setSimpleFlag(Treqinfo, 1);
+  LqhKeyReq::setGCIFlag(Treqinfo, 1);
 /* ------------------------------------------------------------------------- */
 /* SET OPERATION TYPE AND LOCK MODE (NEVER READ OPERATION OR SCAN IN LOG)    */
 /* ------------------------------------------------------------------------- */
-  LqhKeyReq::setLockType(Treqinfo, regTcPtr->operation);
   LqhKeyReq::setOperation(Treqinfo, regTcPtr->operation);
   regTcPtr->reqinfo = Treqinfo;
 /* ------------------------------------------------------------------------ */
@@ -16446,6 +17160,8 @@
     tcConnectptr.p->operation = logPagePtr.p->logPageWord[logPos + 3];
     tcConnectptr.p->totSendlenAi = logPagePtr.p->logPageWord[logPos + 4];
     tcConnectptr.p->primKeyLen = logPagePtr.p->logPageWord[logPos + 5];
+    tcConnectptr.p->m_row_id.m_page_no = logPagePtr.p->logPageWord[logPos + 6];
+    tcConnectptr.p->m_row_id.m_page_idx = logPagePtr.p->logPageWord[logPos+ 7];
     logPagePtr.p->logPageWord[ZCURR_PAGE_INDEX] = logPos + ZLOG_HEAD_SIZE;
   } else {
     jam();
@@ -16455,7 +17171,11 @@
     tcConnectptr.p->operation = readLogwordExec(signal);
     tcConnectptr.p->totSendlenAi = readLogwordExec(signal);
     tcConnectptr.p->primKeyLen = readLogwordExec(signal);
+    tcConnectptr.p->m_row_id.m_page_no = readLogwordExec(signal);
+    tcConnectptr.p->m_row_id.m_page_idx = readLogwordExec(signal);
   }//if
+
+  tcConnectptr.p->m_use_rowid = (tcConnectptr.p->operation == ZINSERT);
 }//Dblqh::readLogHeader()
 
 /* ------------------------------------------------------------------------- */
@@ -16762,7 +17482,7 @@
         clfLogFilePtr.i = logPartPtr.p->execSrExecLogFile;
         ptrCheckGuard(clfLogFilePtr, clogFileFileSize, logFileRecord);
         clfLogFilePtr.p->logFileStatus = LogFileRecord::CLOSING_EXEC_LOG;
-        closeFile(signal, clfLogFilePtr);
+        closeFile(signal, clfLogFilePtr, __LINE__);
         result = ZCLOSE_FILE;
       }//if
     }//if
@@ -17361,9 +18081,8 @@
 	      sp.p->scanAiLength,
 	      sp.p->m_curr_batch_size_rows,
 	      sp.p->m_max_batch_size_rows);
-    infoEvent(" errCnt=%d, localFid=%d, schV=%d",
+    infoEvent(" errCnt=%d, schV=%d",
 	      sp.p->scanErrorCounter,
-	      sp.p->scanLocalFragid,
 	      sp.p->scanSchemaVersion);
     infoEvent(" stpid=%d, flag=%d, lhold=%d, lmode=%d, num=%d",
 	      sp.p->scanStoredProcId,
@@ -17544,7 +18263,6 @@
 	       << " m_max_batch_size_rows="<<
 	  TscanPtr.p->m_max_batch_size_rows
 	       << " scanErrorCounter="<<TscanPtr.p->scanErrorCounter
-	       << " scanLocalFragid="<<TscanPtr.p->scanLocalFragid
 	       << endl;
 	ndbout << " scanSchemaVersion="<<TscanPtr.p->scanSchemaVersion
 	       << "  scanStoredProcId="<<TscanPtr.p->scanStoredProcId
@@ -17696,60 +18414,54 @@
 Dblqh::execCREATE_TRIG_REQ(Signal* signal)
 {
   jamEntry();
-  NodeId myNodeId = getOwnNodeId();
-  BlockReference tupref = calcTupBlockRef(myNodeId);
 
-  sendSignal(tupref, GSN_CREATE_TRIG_REQ, signal, CreateTrigReq::SignalLength, JBB);
+  sendSignal(DBTUP_REF, GSN_CREATE_TRIG_REQ, signal,
+             CreateTrigReq::SignalLength, JBB);
 }
 
 void
 Dblqh::execCREATE_TRIG_CONF(Signal* signal)
 {
   jamEntry();
-  NodeId myNodeId = getOwnNodeId();
-  BlockReference dictref = calcDictBlockRef(myNodeId);
 
-  sendSignal(dictref, GSN_CREATE_TRIG_CONF, signal, CreateTrigConf::SignalLength, JBB);
+  sendSignal(DBDICT_REF, GSN_CREATE_TRIG_CONF, signal,
+             CreateTrigConf::SignalLength, JBB);
 }
 
 void
 Dblqh::execCREATE_TRIG_REF(Signal* signal)
 {
   jamEntry();
-  NodeId myNodeId = getOwnNodeId();
-  BlockReference dictref = calcDictBlockRef(myNodeId);
 
-  sendSignal(dictref, GSN_CREATE_TRIG_REF, signal, CreateTrigRef::SignalLength, JBB);
+  sendSignal(DBDICT_REF, GSN_CREATE_TRIG_REF, signal,
+             CreateTrigRef::SignalLength, JBB);
 }
 
 void
 Dblqh::execDROP_TRIG_REQ(Signal* signal)
 {
   jamEntry();
-  NodeId myNodeId = getOwnNodeId();
-  BlockReference tupref = calcTupBlockRef(myNodeId);
 
-  sendSignal(tupref, GSN_DROP_TRIG_REQ, signal, DropTrigReq::SignalLength, JBB);
+  sendSignal(DBTUP_REF, GSN_DROP_TRIG_REQ, signal,
+             DropTrigReq::SignalLength, JBB);
 }
 
 void
 Dblqh::execDROP_TRIG_CONF(Signal* signal)
 {
   jamEntry();
-  NodeId myNodeId = getOwnNodeId();
-  BlockReference dictref = calcDictBlockRef(myNodeId);
 
-  sendSignal(dictref, GSN_DROP_TRIG_CONF, signal, DropTrigConf::SignalLength, JBB);
+  sendSignal(DBDICT_REF, GSN_DROP_TRIG_CONF, signal,
+             DropTrigConf::SignalLength, JBB);
 }
 
 void
 Dblqh::execDROP_TRIG_REF(Signal* signal)
 {
   jamEntry();
-  NodeId myNodeId = getOwnNodeId();
-  BlockReference dictref = calcDictBlockRef(myNodeId);
 
-  sendSignal(dictref, GSN_DROP_TRIG_REF, signal, DropTrigRef::SignalLength, JBB);
+  sendSignal(DBDICT_REF, GSN_DROP_TRIG_REF, signal,
+             DropTrigRef::SignalLength, JBB);
 }
 
 Uint32 Dblqh::calcPageCheckSum(LogPageRecordPtr logP){

--- 1.36/storage/ndb/src/ndbapi/ndb_cluster_connection.cpp	2005-12-21 16:45:03 +01:00
+++ 1.37/storage/ndb/src/ndbapi/ndb_cluster_connection.cpp	2006-01-12 10:11:54 +01:00
@@ -556,5 +556,17 @@
   DBUG_VOID_RETURN;
 }
 
+void
+Ndb_cluster_connection::init_get_next_node(Ndb_cluster_connection_node_iter &iter)
+{
+  m_impl.init_get_next_node(iter);
+}
+
+Uint32
+Ndb_cluster_connection::get_next_node(Ndb_cluster_connection_node_iter &iter)
+{
+  return m_impl.get_next_node(iter);
+}
+
 template class Vector<Ndb_cluster_connection_impl::Node>;
 
Thread
bk commit into 5.1 tree (jonas:1.2042)jonas12 Jan