List:Commits« Previous MessageNext Message »
From:jonas Date:March 21 2007 3:35pm
Subject:bk commit into 5.1 tree (jonas:1.2445)
View as plain text  
Below is the list of changes that have just been committed into a local
5.1 repository of jonas. When jonas does a push these changes will
be propagated to the main repository and, within 24 hours after the
push, to the public repository.
For information on how to access the public repository
see http://dev.mysql.com/doc/mysql/en/installing-source-tree.html

ChangeSet@stripped, 2007-03-21 15:35:22+01:00, jonas@stripped +4 -0
  Merge perch.ndb.mysql.com:/home/jonas/src/50-work
  into  perch.ndb.mysql.com:/home/jonas/src/51-telco-gca
  MERGE: 1.1810.2124.56

  storage/ndb/src/kernel/blocks/dbdih/Dbdih.hpp@stripped, 2007-03-21 15:35:18+01:00,
jonas@stripped +0 -0
    Auto merged
    MERGE: 1.8.10.2

  storage/ndb/src/kernel/blocks/dbdih/Dbdih.hpp@stripped, 2007-03-21 15:35:18+01:00,
jonas@stripped +0 -0
    Merge rename: ndb/src/kernel/blocks/dbdih/Dbdih.hpp ->
storage/ndb/src/kernel/blocks/dbdih/Dbdih.hpp

  storage/ndb/src/kernel/blocks/dbdih/DbdihMain.cpp@stripped, 2007-03-21 15:35:19+01:00,
jonas@stripped +0 -0
    Auto merged
    MERGE: 1.24.69.2

  storage/ndb/src/kernel/blocks/dbdih/DbdihMain.cpp@stripped, 2007-03-21 15:35:18+01:00,
jonas@stripped +0 -0
    Merge rename: ndb/src/kernel/blocks/dbdih/DbdihMain.cpp ->
storage/ndb/src/kernel/blocks/dbdih/DbdihMain.cpp

  storage/ndb/src/kernel/blocks/dbtc/Dbtc.hpp@stripped, 2007-03-21 15:35:19+01:00,
jonas@stripped +0 -0
    Auto merged
    MERGE: 1.26.13.2

  storage/ndb/src/kernel/blocks/dbtc/Dbtc.hpp@stripped, 2007-03-21 15:35:18+01:00,
jonas@stripped +0 -0
    Merge rename: ndb/src/kernel/blocks/dbtc/Dbtc.hpp ->
storage/ndb/src/kernel/blocks/dbtc/Dbtc.hpp

  storage/ndb/src/kernel/blocks/dbtc/DbtcMain.cpp@stripped, 2007-03-21 15:35:19+01:00,
jonas@stripped +0 -0
    Auto merged
    MERGE: 1.73.35.2

  storage/ndb/src/kernel/blocks/dbtc/DbtcMain.cpp@stripped, 2007-03-21 15:35:18+01:00,
jonas@stripped +0 -0
    Merge rename: ndb/src/kernel/blocks/dbtc/DbtcMain.cpp ->
storage/ndb/src/kernel/blocks/dbtc/DbtcMain.cpp

# This is a BitKeeper patch.  What follows are the unified diffs for the
# set of deltas contained in the patch.  The rest of the patch, the part
# that BitKeeper cares about, is below these diffs.
# User:	jonas
# Host:	perch.ndb.mysql.com
# Root:	/home/jonas/src/51-telco-gca/RESYNC

--- 1.8.10.1/ndb/src/kernel/blocks/dbdih/Dbdih.hpp	2007-03-21 15:35:27 +01:00
+++ 1.27/storage/ndb/src/kernel/blocks/dbdih/Dbdih.hpp	2007-03-21 15:35:27 +01:00
@@ -2,8 +2,7 @@
 
    This program is free software; you can redistribute it and/or modify
    it under the terms of the GNU General Public License as published by
-   the Free Software Foundation; either version 2 of the License, or
-   (at your option) any later version.
+   the Free Software Foundation; version 2 of the License.
 
    This program is distributed in the hope that it will be useful,
    but WITHOUT ANY WARRANTY; without even the implied warranty of
@@ -21,7 +20,6 @@
 #include <pc.hpp>
 #include <SimulatedBlock.hpp>
 #include "Sysfile.hpp"
-#include <ArrayList.hpp>
 #include <SignalCounter.hpp>
 
 #include <signaldata/MasterLCP.hpp>
@@ -61,17 +59,12 @@
 // ------------------------------------------
 // Error Codes for Transactions (None sofar)
 // ------------------------------------------
+#define ZUNDEFINED_FRAGMENT_ERROR 311
 
 // --------------------------------------
 // Error Codes for Add Table
 // --------------------------------------
 #define ZREPLERROR1 306
-#define ZNOTIMPLEMENTED 307
-#define ZTABLEINSTALLED 310
-// --------------------------------------
-// Error Codes for Scan Table
-// --------------------------------------
-#define ZERRONOUSSTATE 308
 
 // --------------------------------------
 // Crash Codes
@@ -236,6 +229,8 @@
     Uint32 storedReplicas;       /* "ALIVE" STORED REPLICAS */
     Uint32 nextFragmentChunk;
     
+    Uint32 m_log_part_id;
+    
     Uint8 distributionKey;
     Uint8 fragReplicas;
     Uint8 noOldStoredReplicas;  /* NUMBER OF "DEAD" STORED REPLICAS */
@@ -468,14 +463,22 @@
       TS_DROPPING = 3
     };
     enum Method {
-      HASH = 0,
-      NOTDEFINED = 1
+      LINEAR_HASH = 0,
+      NOTDEFINED = 1,
+      NORMAL_HASH = 2,
+      USER_DEFINED = 3
+    };
+    enum Storage {
+      ST_NOLOGGING = 0,         // Table is not logged, but survives SR
+      ST_NORMAL = 1,            // Normal table, logged and durable
+      ST_TEMPORARY = 2          // Table is lost after SR, not logged
     };
     CopyStatus tabCopyStatus;
     UpdateState tabUpdateState;
     TabLcpStatus tabLcpStatus;
     TabStatus tabStatus;
     Method method;
+    Storage tabStorage;
 
     Uint32 pageRef[8];
 //-----------------------------------------------------------------------------
@@ -508,7 +511,6 @@
     Uint8 kvalue;
     Uint8 noOfBackups;
     Uint8 noPages;
-    Uint8 storedTable;              /* 0 IF THE TABLE IS A TEMPORARY TABLE */
     Uint16 tableType;
     Uint16 primaryTableId;
   };
@@ -541,7 +543,9 @@
       TO_END_COPY = 19,
       TO_END_COPY_ONGOING = 20,
       TO_WAIT_ENDING = 21,
-      ENDING = 22
+      ENDING = 22,
+      
+      STARTING_LOCAL_FRAGMENTS = 24
     };
     enum ToSlaveStatus {
       TO_SLAVE_IDLE = 0,
@@ -568,7 +572,7 @@
   typedef Ptr<TakeOverRecord> TakeOverRecordPtr;
   
 public:
-  Dbdih(const class Configuration &);
+  Dbdih(Block_context& ctx);
   virtual ~Dbdih();
 
   struct RWFragment {
@@ -632,6 +636,7 @@
   void execTCGETOPSIZECONF(Signal *);
   void execTC_CLOPSIZECONF(Signal *);
   
+  int handle_invalid_lcp_no(const class LcpFragRep*, ReplicaRecordPtr);
   void execLCP_FRAG_REP(Signal *);
   void execLCP_COMPLETE_REP(Signal *);
   void execSTART_LCP_REQ(Signal *);
@@ -683,6 +688,7 @@
   void execGETGCIREQ(Signal *);
   void execDIH_RESTARTREQ(Signal *);
   void execSTART_RECCONF(Signal *);
+  void execSTART_FRAGREF(Signal *);
   void execSTART_FRAGCONF(Signal *);
   void execADD_FRAGCONF(Signal *);
   void execADD_FRAGREF(Signal *);
@@ -974,7 +980,9 @@
   void initialiseRecordsLab(Signal *, Uint32 stepNo, Uint32, Uint32);
 
   void findReplica(ReplicaRecordPtr& regReplicaPtr,
-                   Fragmentstore* fragPtrP, Uint32 nodeId);
+                   Fragmentstore* fragPtrP, 
+		   Uint32 nodeId,
+		   bool oldStoredReplicas = false);
 //------------------------------------
 // Node failure handling methods
 //------------------------------------
@@ -1135,6 +1143,10 @@
   void setNodeCopyCompleted(Uint32 nodeId, bool newState);
   bool checkNodeAlive(Uint32 nodeId);
 
+  void nr_start_fragments(Signal*, TakeOverRecordPtr);
+  void nr_start_fragment(Signal*, TakeOverRecordPtr, ReplicaRecordPtr);
+  void nr_run_redo(Signal*, TakeOverRecordPtr);
+  
   // Initialisation
   void initData();
   void initRecords();
@@ -1161,7 +1173,8 @@
 
   Uint32 c_nextNodeGroup;
   NodeGroupRecord *nodeGroupRecord;
-
+  Uint32 c_nextLogPart;
+  
   NodeRecord *nodeRecord;
 
   PageRecord *pageRecord;
@@ -1553,13 +1566,13 @@
    * Pool/list of WaitGCPProxyRecord record
    */
   ArrayPool<WaitGCPProxyRecord> waitGCPProxyPool;
-  ArrayList<WaitGCPProxyRecord> c_waitGCPProxyList;
+  DLList<WaitGCPProxyRecord> c_waitGCPProxyList;
 
   /**
    * Pool/list of WaitGCPMasterRecord record
    */
   ArrayPool<WaitGCPMasterRecord> waitGCPMasterPool;
-  ArrayList<WaitGCPMasterRecord> c_waitGCPMasterList;
+  DLList<WaitGCPMasterRecord> c_waitGCPMasterList;
 
   void checkWaitGCPProxy(Signal*, NodeId failedNodeId);
   void checkWaitGCPMaster(Signal*, NodeId failedNodeId);
@@ -1601,6 +1614,8 @@
    * Reply from nodeId
    */
   void startInfoReply(Signal *, Uint32 nodeId);
+
+  void dump_replica_info();
 
   // DIH specifics for execNODE_START_REP (sendDictUnlockOrd)
   void execNODE_START_REP(Signal* signal);

--- 1.24.69.1/ndb/src/kernel/blocks/dbdih/DbdihMain.cpp	2007-03-21 15:35:27 +01:00
+++ 1.106/storage/ndb/src/kernel/blocks/dbdih/DbdihMain.cpp	2007-03-21 15:35:27 +01:00
@@ -2,8 +2,7 @@
 
    This program is free software; you can redistribute it and/or modify
    it under the terms of the GNU General Public License as published by
-   the Free Software Foundation; either version 2 of the License, or
-   (at your option) any later version.
+   the Free Software Foundation; version 2 of the License.
 
    This program is distributed in the hope that it will be useful,
    but WITHOUT ANY WARRANTY; without even the implied warranty of
@@ -67,6 +66,7 @@
 #include <signaldata/CreateFragmentation.hpp>
 #include <signaldata/LqhFrag.hpp>
 #include <signaldata/FsOpenReq.hpp>
+#include <signaldata/DihFragCount.hpp>
 #include <signaldata/DictLock.hpp>
 #include <DebuggerNames.hpp>
 
@@ -610,6 +610,14 @@
     checkWaitDropTabFailedLqh(signal, nodeId, tableId);
     return;
   }
+  case DihContinueB::ZTO_START_FRAGMENTS:
+  {
+    TakeOverRecordPtr takeOverPtr;
+    takeOverPtr.i = signal->theData[1];
+    ptrCheckGuard(takeOverPtr, MAX_NDB_NODES, takeOverRecord);
+    nr_start_fragments(signal, takeOverPtr);
+    return;
+  }
   }//switch
   
   ndbrequire(false);
@@ -666,7 +674,9 @@
   if (cmasterdihref != reference())
   {
     jam();
+    Uint32 tmp= SYSFILE->m_restart_seq;
     memcpy(sysfileData, cdata, sizeof(sysfileData));
+    SYSFILE->m_restart_seq = tmp;
 
     if (c_set_initial_start_flag)
     {
@@ -1085,7 +1095,7 @@
   jamEntry();
 
   const ndb_mgm_configuration_iterator * p = 
-    theConfiguration.getOwnConfigIterator();
+    m_ctx.m_config.getOwnConfigIterator();
   ndbrequireErr(p != 0, NDBD_EXIT_INVALID_CONFIG);
 
   initData();
@@ -1128,6 +1138,26 @@
   return;
 }//Dbdih::execSTART_FRAGCONF()
 
+void Dbdih::execSTART_FRAGREF(Signal* signal) 
+{
+  jamEntry();
+ 
+  /**
+   * Kill starting node
+   */
+  Uint32 errCode = signal->theData[1];
+  Uint32 nodeId = signal->theData[2];
+  
+  SystemError * const sysErr = (SystemError*)&signal->theData[0];
+  sysErr->errorCode = SystemError::StartFragRefError;
+  sysErr->errorRef = reference();
+  sysErr->data1 = errCode;
+  sysErr->data2 = 0;
+  sendSignal(calcNdbCntrBlockRef(nodeId), GSN_SYSTEM_ERROR, signal, 
+	     SystemError::SignalLength, JBB);
+  return;
+}//Dbdih::execSTART_FRAGCONF()
+
 void Dbdih::execSTART_MEREF(Signal* signal) 
 {
   jamEntry();
@@ -1171,7 +1201,7 @@
 {
   jamEntry();
   cntrlblockref = signal->theData[0];
-  if(theConfiguration.getInitialStart()){
+  if(m_ctx.m_config.getInitialStart()){
     sendSignal(cntrlblockref, GSN_DIH_RESTARTREF, signal, 1, JBB);
   } else {
     readGciFileLab(signal);
@@ -1745,12 +1775,15 @@
    *
    * But dont copy lastCompletedGCI:s
    */
+  Uint32 key = SYSFILE->m_restart_seq;
   Uint32 tempGCP[MAX_NDB_NODES];
   for(i = 0; i < MAX_NDB_NODES; i++)
     tempGCP[i] = SYSFILE->lastCompletedGCI[i];
 
   for(i = 0; i < Sysfile::SYSFILE_SIZE32; i++)
     sysfileData[i] = cdata[i];
+
+  SYSFILE->m_restart_seq = key;
   for(i = 0; i < MAX_NDB_NODES; i++)
     SYSFILE->lastCompletedGCI[i] = tempGCP[i];
 
@@ -1798,8 +1831,8 @@
     return;
   }//if
   if (getNodeStatus(nodeId) != NodeRecord::DEAD){
-    ndbout << "nodeStatus in START_PERMREQ = " 
-	   << (Uint32) getNodeStatus(nodeId) << endl;
+    g_eventLogger.error("nodeStatus in START_PERMREQ = %u",
+                        (Uint32) getNodeStatus(nodeId));
     ndbrequire(false);
   }//if
 
@@ -1908,11 +1941,6 @@
   ndbrequire(c_nodeStartMaster.startNode == Tnodeid);
   ndbrequire(getNodeStatus(Tnodeid) == NodeRecord::STARTING);
   
-  sendSTART_RECREQ(signal, Tnodeid);
-}//Dbdih::execSTART_MEREQ()
-
-void Dbdih::nodeRestartStartRecConfLab(Signal* signal) 
-{
   c_nodeStartMaster.blockLcp = true;
   if ((c_lcpState.lcpStatus != LCP_STATUS_IDLE) &&
       (c_lcpState.lcpStatus != LCP_TCGET)) {
@@ -2059,6 +2087,9 @@
     signal->theData[0] = reference();
     signal->theData[1] = c_nodeStartSlave.nodeId;
     sendSignal(BACKUP_REF, GSN_INCL_NODEREQ, signal, 2, JBB);
+    
+    // Suma will not send response to this for now, later...
+    sendSignal(SUMA_REF, GSN_INCL_NODEREQ, signal, 2, JBB);
     return;
   }//if
   if (TstartNode_or_blockref == numberToRef(BACKUP, getOwnNodeId())){
@@ -2735,13 +2766,14 @@
     return;
   }//if
   c_startToLock = takeOverPtrI;
+
+  takeOverPtr.p->toMasterStatus = TakeOverRecord::STARTING;
   StartToReq * const req = (StartToReq *)&signal->theData[0];
   req->userPtr = takeOverPtr.i;
   req->userRef = reference();
   req->startingNodeId = takeOverPtr.p->toStartingNode;
   req->nodeTakenOver = takeOverPtr.p->toFailedNode;
   req->nodeRestart = takeOverPtr.p->toNodeRestart;
-  takeOverPtr.p->toMasterStatus = TakeOverRecord::STARTING;
   sendLoopMacro(START_TOREQ, sendSTART_TOREQ);
 }//Dbdih::sendStartTo()
 
@@ -2785,9 +2817,166 @@
   CRASH_INSERTION(7134);
   c_startToLock = RNIL;
 
+  if (takeOverPtr.p->toNodeRestart)
+  {
+    jam();
+    takeOverPtr.p->toMasterStatus = TakeOverRecord::STARTING_LOCAL_FRAGMENTS;
+    nr_start_fragments(signal, takeOverPtr);
+    return;
+  }
+
   startNextCopyFragment(signal, takeOverPtr.i);
 }//Dbdih::execSTART_TOCONF()
 
+void
+Dbdih::nr_start_fragments(Signal* signal, 
+			  TakeOverRecordPtr takeOverPtr)
+{
+  Uint32 loopCount = 0 ;
+  TabRecordPtr tabPtr;
+  while (loopCount++ < 100) {
+    tabPtr.i = takeOverPtr.p->toCurrentTabref;
+    if (tabPtr.i >= ctabFileSize) {
+      jam();
+      nr_run_redo(signal, takeOverPtr);
+      return;
+    }//if
+    ptrAss(tabPtr, tabRecord);
+    if (tabPtr.p->tabStatus != TabRecord::TS_ACTIVE ||
+	tabPtr.p->tabStorage != TabRecord::ST_NORMAL)
+    {
+      jam();
+      takeOverPtr.p->toCurrentFragid = 0;
+      takeOverPtr.p->toCurrentTabref++;
+      continue;
+    }//if
+    Uint32 fragId = takeOverPtr.p->toCurrentFragid;
+    if (fragId >= tabPtr.p->totalfragments) {
+      jam();
+      takeOverPtr.p->toCurrentFragid = 0;
+      takeOverPtr.p->toCurrentTabref++;
+      continue;
+    }//if
+    FragmentstorePtr fragPtr;
+    getFragstore(tabPtr.p, fragId, fragPtr);
+    ReplicaRecordPtr loopReplicaPtr;
+    loopReplicaPtr.i = fragPtr.p->oldStoredReplicas;
+    while (loopReplicaPtr.i != RNIL) {
+      ptrCheckGuard(loopReplicaPtr, creplicaFileSize, replicaRecord);
+      if (loopReplicaPtr.p->procNode == takeOverPtr.p->toStartingNode) {
+        jam();
+	nr_start_fragment(signal, takeOverPtr, loopReplicaPtr);
+	break;
+      } else {
+        jam();
+        loopReplicaPtr.i = loopReplicaPtr.p->nextReplica;
+      }//if
+    }//while
+    takeOverPtr.p->toCurrentFragid++;
+  }//while
+  signal->theData[0] = DihContinueB::ZTO_START_FRAGMENTS;
+  signal->theData[1] = takeOverPtr.i;
+  sendSignal(reference(), GSN_CONTINUEB, signal, 2, JBB);
+}
+
+void
+Dbdih::nr_start_fragment(Signal* signal, 
+			 TakeOverRecordPtr takeOverPtr,
+			 ReplicaRecordPtr replicaPtr)
+{
+  Uint32 i, j = 0;
+  Uint32 maxLcpId = 0;
+  Uint32 maxLcpIndex = ~0;
+  
+  Uint32 restorableGCI = 0;
+  
+  ndbout_c("tab: %d frag: %d replicaP->nextLcp: %d",
+	   takeOverPtr.p->toCurrentTabref,
+	   takeOverPtr.p->toCurrentFragid,
+	   replicaPtr.p->nextLcp);
+  
+  Uint32 idx = replicaPtr.p->nextLcp;
+  for(i = 0; i<MAX_LCP_STORED; i++, idx = nextLcpNo(idx))
+  {
+    ndbout_c("scanning idx: %d lcpId: %d", idx, replicaPtr.p->lcpId[idx]);
+    if (replicaPtr.p->lcpStatus[idx] == ZVALID) 
+    {
+      ndbrequire(replicaPtr.p->lcpId[idx] > maxLcpId);
+      Uint32 startGci = replicaPtr.p->maxGciCompleted[idx];
+      Uint32 stopGci = replicaPtr.p->maxGciStarted[idx];
+      for (;j < replicaPtr.p->noCrashedReplicas; j++)
+      {
+	ndbout_c("crashed replica: %d(%d) replicaLastGci: %d",
+		 j, 
+		 replicaPtr.p->noCrashedReplicas,
+		 replicaPtr.p->replicaLastGci[j]);
+	if (replicaPtr.p->replicaLastGci[j] > stopGci)
+	{
+	  maxLcpId = replicaPtr.p->lcpId[idx];
+	  maxLcpIndex = idx;
+	  restorableGCI = replicaPtr.p->replicaLastGci[j];
+	  break;
+	}
+      }
+    }
+  }
+  
+  if (maxLcpIndex == ~ (Uint32) 0)
+  {
+    ndbout_c("Didnt find any LCP for node: %d tab: %d frag: %d",
+	     takeOverPtr.p->toStartingNode,
+	     takeOverPtr.p->toCurrentTabref,
+	     takeOverPtr.p->toCurrentFragid);
+    replicaPtr.p->lcpIdStarted = 0;
+    BlockReference ref = calcLqhBlockRef(takeOverPtr.p->toStartingNode);
+    StartFragReq *req = (StartFragReq *)signal->getDataPtrSend();
+    req->userPtr = 0;
+    req->userRef = reference();
+    req->lcpNo = ZNIL;
+    req->lcpId = 0;
+    req->tableId = takeOverPtr.p->toCurrentTabref;
+    req->fragId = takeOverPtr.p->toCurrentFragid;
+    req->noOfLogNodes = 0;
+    sendSignal(ref, GSN_START_FRAGREQ, signal, 
+	       StartFragReq::SignalLength, JBB);
+  }
+  else
+  {
+    ndbout_c("Found LCP: %d(%d) maxGciStarted: %d maxGciCompleted: %d restorable: %d(%d)
newestRestorableGCI: %d",
+	     maxLcpId,
+	     maxLcpIndex,
+	     replicaPtr.p->maxGciStarted[maxLcpIndex],
+	     replicaPtr.p->maxGciCompleted[maxLcpIndex],	     
+	     restorableGCI,
+	     SYSFILE->lastCompletedGCI[takeOverPtr.p->toStartingNode],
+	     SYSFILE->newestRestorableGCI);
+
+    replicaPtr.p->lcpIdStarted = restorableGCI;
+    BlockReference ref = calcLqhBlockRef(takeOverPtr.p->toStartingNode);
+    StartFragReq *req = (StartFragReq *)signal->getDataPtrSend();
+    req->userPtr = 0;
+    req->userRef = reference();
+    req->lcpNo = maxLcpIndex;
+    req->lcpId = maxLcpId;
+    req->tableId = takeOverPtr.p->toCurrentTabref;
+    req->fragId = takeOverPtr.p->toCurrentFragid;
+    req->noOfLogNodes = 1;
+    req->lqhLogNode[0] = takeOverPtr.p->toStartingNode;
+    req->startGci[0] = replicaPtr.p->maxGciCompleted[maxLcpIndex];
+    req->lastGci[0] = restorableGCI;
+    sendSignal(ref, GSN_START_FRAGREQ, signal, 
+	       StartFragReq::SignalLength, JBB);
+  }
+}
+
+void
+Dbdih::nr_run_redo(Signal* signal, TakeOverRecordPtr takeOverPtr)
+{
+  takeOverPtr.p->toCurrentTabref = 0;
+  takeOverPtr.p->toCurrentFragid = 0;
+  sendSTART_RECREQ(signal, takeOverPtr.p->toStartingNode);
+}
+
 void Dbdih::initStartTakeOver(const StartToReq * req, 
 			      TakeOverRecordPtr takeOverPtr)
 {
@@ -3120,6 +3309,14 @@
     /*---------------------------------------------------------------------- */
     FragmentstorePtr fragPtr;
     getFragstore(tabPtr.p, fragId, fragPtr);
+    Uint32 gci = 0;
+    if (takeOverPtr.p->toNodeRestart)
+    {
+      ReplicaRecordPtr replicaPtr;
+      findReplica(replicaPtr, fragPtr.p, takeOverPtr.p->toStartingNode, true);
+      gci = replicaPtr.p->lcpIdStarted;
+      replicaPtr.p->lcpIdStarted = 0;
+    }
     takeOverPtr.p->toMasterStatus = TakeOverRecord::COPY_FRAG;
     BlockReference ref = calcLqhBlockRef(takeOverPtr.p->toCopyNode);
     CopyFragReq * const copyFragReq = (CopyFragReq *)&signal->theData[0];
@@ -3130,6 +3327,7 @@
     copyFragReq->nodeId = takeOverPtr.p->toStartingNode;
     copyFragReq->schemaVersion = tabPtr.p->schemaVersion;
     copyFragReq->distributionKey = fragPtr.p->distributionKey;
+    copyFragReq->gci = gci;
     copyFragReq->nodeCount = extractNodeInfo(fragPtr.p, 
 					     copyFragReq->nodeList);
     sendSignal(ref, GSN_COPY_FRAGREQ, signal, 
@@ -3662,6 +3860,7 @@
   /*     WE ALSO COPY TO OUR OWN NODE. TO ENABLE US TO DO THIS PROPERLY WE   */
   /*     START BY CLOSING THIS FILE.                                         */
   /* ----------------------------------------------------------------------- */
+  globalData.m_restart_seq = ++SYSFILE->m_restart_seq;
   closeFile(signal, filePtr);
   filePtr.p->reqStatus = FileRecord::CLOSING_GCP;
 }//Dbdih::readingGcpLab()
@@ -3870,6 +4069,11 @@
   Uint32 newMasterId = nodeFail->masterNodeId;
   const Uint32 noOfFailedNodes = nodeFail->noOfNodes;
 
+  if (ERROR_INSERTED(7179))
+  {
+    CLEAR_ERROR_INSERT_VALUE;
+  }
+
   /*-------------------------------------------------------------------------*/
   // The first step is to convert from a bit mask to an array of failed nodes.
   /*-------------------------------------------------------------------------*/
@@ -4041,9 +4245,9 @@
     jam();
     break;
   default:
-    ndbout_c("outstanding gsn: %s(%d)", 
-	     getSignalName(c_nodeStartMaster.m_outstandingGsn), 
-	     c_nodeStartMaster.m_outstandingGsn);
+    g_eventLogger.error("outstanding gsn: %s(%d)", 
+                        getSignalName(c_nodeStartMaster.m_outstandingGsn), 
+                        c_nodeStartMaster.m_outstandingGsn);
     ndbrequire(false);
   }
   
@@ -4180,6 +4384,8 @@
 						  Uint32 takeOverPtrI)
 {
   jam();
+  ndbout_c("checkTakeOverInMasterStartNodeFailure %x",
+	   takeOverPtrI);
   if (takeOverPtrI == RNIL) {
     jam();
     return;
@@ -4193,6 +4399,9 @@
   takeOverPtr.i = takeOverPtrI;
   ptrCheckGuard(takeOverPtr, MAX_NDB_NODES, takeOverRecord);
 
+  ndbout_c("takeOverPtr.p->toMasterStatus: %x", 
+	   takeOverPtr.p->toMasterStatus);
+  
   bool ok = false;
   switch (takeOverPtr.p->toMasterStatus) {
   case TakeOverRecord::IDLE:
@@ -4301,6 +4510,13 @@
     //-----------------------------------------------------------------------
     endTakeOver(takeOverPtr.i);
     break;
+
+  case TakeOverRecord::STARTING_LOCAL_FRAGMENTS:
+    ok = true;
+    jam();
+    endTakeOver(takeOverPtr.i);
+    break;
+    
     /**
      * The following are states that it should not be possible to "be" in
      */
@@ -4484,9 +4700,10 @@
       failedNodePtr.p->activeStatus = Sysfile::NS_NotActive_NotTakenOver;
       break;
     default:
-      ndbout << "activeStatus = " << (Uint32)
failedNodePtr.p->activeStatus;
-      ndbout << " at failure after NODE_FAILREP of node = ";
-      ndbout << failedNodePtr.i << endl;
+      g_eventLogger.error("activeStatus = %u "
+                          "at failure after NODE_FAILREP of node = %u",
+                          (Uint32) failedNodePtr.p->activeStatus,
+                          failedNodePtr.i);
       ndbrequire(false);
       break;
     }//switch
@@ -4643,7 +4860,7 @@
     /**
      * Node failure during master take over...
      */
-    ndbout_c("Nodefail during master take over (old: %d)", oldNode);
+    g_eventLogger.info("Nodefail during master take over (old: %d)", oldNode);
   }
   
   NodeRecordPtr nodePtr;
@@ -4915,7 +5132,8 @@
   if (latestLcpId > SYSFILE->latestLCP_ID) {
     jam();
 #if 0
-    ndbout_c("Dbdih: Setting SYSFILE->latestLCP_ID to %d", latestLcpId);
+    g_eventLogger.info("Dbdih: Setting SYSFILE->latestLCP_ID to %d",
+                       latestLcpId);
     SYSFILE->latestLCP_ID = latestLcpId;
 #endif
     SYSFILE->keepGCI = oldestKeepGci;
@@ -5406,7 +5624,7 @@
 
   //const Uint32 lcpId = SYSFILE->latestLCP_ID;
   const bool lcpOngoingFlag = (tabPtr.p->tabLcpStatus== TabRecord::TLS_ACTIVE);
-  const bool temporary = !tabPtr.p->storedTable;
+  const bool unlogged = (tabPtr.p->tabStorage != TabRecord::ST_NORMAL);
   
   FragmentstorePtr fragPtr;
   for(Uint32 fragNo = 0; fragNo < tabPtr.p->totalfragments; fragNo++){
@@ -5427,7 +5645,7 @@
         jam();
 	found = true;
 	noOfRemovedReplicas++;
-	removeNodeFromStored(nodeId, fragPtr, replicaPtr, temporary);
+	removeNodeFromStored(nodeId, fragPtr, replicaPtr, unlogged);
 	if(replicaPtr.p->lcpOngoingFlag){
 	  jam();
 	  /**
@@ -5574,7 +5792,7 @@
 
   if (ERROR_INSERTED(7030))
   {
-    ndbout_c("Reenable GCP_PREPARE");
+    g_eventLogger.info("Reenable GCP_PREPARE");
     CLEAR_ERROR_INSERT_VALUE;
   }
   
@@ -5755,18 +5973,16 @@
     c_lcpState.setLcpStatus(LCP_STATUS_IDLE, __LINE__);
 #if 0
     if(c_copyGCISlave.m_copyReason == CopyGCIReq::LOCAL_CHECKPOINT){
-      ndbout_c("Dbdih: Also resetting c_copyGCISlave");
+      g_eventLogger.info("Dbdih: Also resetting c_copyGCISlave");
       c_copyGCISlave.m_copyReason = CopyGCIReq::IDLE;
       c_copyGCISlave.m_expectedNextWord = 0;
     }
 #endif
   }
 
-  bool ok = false;
   MasterLCPConf::State lcpState;
   switch (c_lcpState.lcpStatus) {
   case LCP_STATUS_IDLE:
-    ok = true;
     jam();
     /*------------------------------------------------*/
     /*       LOCAL CHECKPOINT IS CURRENTLY NOT ACTIVE */
@@ -5777,7 +5993,6 @@
     lcpState = MasterLCPConf::LCP_STATUS_IDLE;
     break;
   case LCP_STATUS_ACTIVE:
-    ok = true;
     jam();
     /*--------------------------------------------------*/
     /*       COPY OF RESTART INFORMATION HAS BEEN       */
@@ -5786,7 +6001,6 @@
     lcpState = MasterLCPConf::LCP_STATUS_ACTIVE;
     break;
   case LCP_TAB_COMPLETED:
-    ok = true;
     jam();
     /*--------------------------------------------------------*/
     /*       ALL LCP_REPORT'S HAVE BEEN COMPLETED FOR         */
@@ -5796,7 +6010,6 @@
     lcpState = MasterLCPConf::LCP_TAB_COMPLETED;
     break;
   case LCP_TAB_SAVED:
-    ok = true;
     jam();
     /*--------------------------------------------------------*/
     /*       ALL LCP_REPORT'S HAVE BEEN COMPLETED FOR         */
@@ -5820,15 +6033,16 @@
     break;
   case LCP_COPY_GCI:
   case LCP_INIT_TABLES:
-    ok = true;
     /**
      * These two states are handled by if statements above
      */
     ndbrequire(false);
     lcpState= MasterLCPConf::LCP_STATUS_IDLE; // remove warning
     break;
+  default:
+    ndbrequire(false);
+    lcpState= MasterLCPConf::LCP_STATUS_IDLE; // remove warning
   }//switch
-  ndbrequire(ok);
 
   Uint32 failedNodeId = c_lcpState.m_MASTER_LCPREQ_FailedNodeId;
   MasterLCPConf * const conf = (MasterLCPConf *)&signal->theData[0];
@@ -5844,7 +6058,7 @@
 
   if(c_lcpState.lcpStatus == LCP_TAB_SAVED){
 #ifdef VM_TRACE
-    ndbout_c("Sending extra GSN_LCP_COMPLETE_REP to new master");    
+    g_eventLogger.info("Sending extra GSN_LCP_COMPLETE_REP to new master");    
 #endif
     sendLCP_COMPLETE_REP(signal);
   }
@@ -6002,7 +6216,7 @@
   CRASH_INSERTION(7180);
   
 #ifdef VM_TRACE
-  ndbout_c("MASTER_LCPCONF");
+  g_eventLogger.info("MASTER_LCPCONF");
   printMASTER_LCP_CONF(stdout, &signal->theData[0], 0, 0);
 #endif  
 
@@ -6079,7 +6293,7 @@
     // protocol.
     /* --------------------------------------------------------------------- */
 #ifdef VM_TRACE
-    ndbout_c("MASTER_LCPhandling:: LMTOS_ALL_IDLE -> checkLcpStart");
+    g_eventLogger.info("MASTER_LCPhandling:: LMTOS_ALL_IDLE -> checkLcpStart");
 #endif
     checkLcpStart(signal, __LINE__);
     break;
@@ -6090,7 +6304,7 @@
     // protocol by calculating the keep gci and storing the new lcp id.
     /* --------------------------------------------------------------------- */
 #ifdef VM_TRACE
-    ndbout_c("MASTER_LCPhandling:: LMTOS_COPY_ONGOING -> storeNewLcpId");
+    g_eventLogger.info("MASTER_LCPhandling:: LMTOS_COPY_ONGOING -> storeNewLcpId");
 #endif
     if (c_lcpState.lcpStatus == LCP_STATUS_ACTIVE) {
       jam();
@@ -6101,7 +6315,7 @@
       /*---------------------------------------------------------------------*/
       Uint32 lcpId = SYSFILE->latestLCP_ID;
 #ifdef VM_TRACE
-      ndbout_c("Decreasing latestLCP_ID from %d to %d", lcpId, lcpId - 1);
+      g_eventLogger.info("Decreasing latestLCP_ID from %d to %d", lcpId, lcpId - 1);
 #endif
       SYSFILE->latestLCP_ID--;
     }//if
@@ -6118,10 +6332,10 @@
        * complete before finalising the LCP process.
        * ------------------------------------------------------------------ */
 #ifdef VM_TRACE
-      ndbout_c("MASTER_LCPhandling:: LMTOS_ALL_ACTIVE -> "
-	       "startLcpRoundLoopLab(table=%u, fragment=%u)",
-	       c_lcpMasterTakeOverState.minTableId, 
-	       c_lcpMasterTakeOverState.minFragId);
+      g_eventLogger.info("MASTER_LCPhandling:: LMTOS_ALL_ACTIVE -> "
+                         "startLcpRoundLoopLab(table=%u, fragment=%u)",
+                         c_lcpMasterTakeOverState.minTableId, 
+                         c_lcpMasterTakeOverState.minFragId);
 #endif
     
       c_lcpState.keepGci = SYSFILE->keepGCI;
@@ -6428,96 +6642,147 @@
   3.7.1   A D D   T A B L E   M A I N L Y
   ***************************************
   */
-void Dbdih::execCREATE_FRAGMENTATION_REQ(Signal * signal){
+
+static inline void inc_node_or_group(Uint32 &node, Uint32 max_node)
+{
+  Uint32 next = node + 1;
+  node = (next == max_node ? 0 : next);
+}
+
+/*
+  Spread fragments in backwards compatible mode
+*/
+static void set_default_node_groups(Signal *signal, Uint32 noFrags)
+{
+  Uint16 *node_group_array = (Uint16*)&signal->theData[25];
+  Uint32 i;
+  node_group_array[0] = 0;
+  for (i = 1; i < noFrags; i++)
+    node_group_array[i] = UNDEF_NODEGROUP;
+}
+void Dbdih::execCREATE_FRAGMENTATION_REQ(Signal * signal)
+{
+  Uint16 node_group_id[MAX_NDB_PARTITIONS];
   jamEntry();
   CreateFragmentationReq * const req = 
     (CreateFragmentationReq*)signal->getDataPtr();
   
   const Uint32 senderRef = req->senderRef;
   const Uint32 senderData = req->senderData;
-  const Uint32 fragmentNode = req->fragmentNode;
-  const Uint32 fragmentType = req->fragmentationType;
-  //const Uint32 fragmentCount = req->noOfFragments;
+  Uint32 noOfFragments = req->noOfFragments;
+  const Uint32 fragType = req->fragmentationType;
   const Uint32 primaryTableId = req->primaryTableId;
 
   Uint32 err = 0;
   
   do {
-    Uint32 noOfFragments = 0;
-    Uint32 noOfReplicas = cnoReplicas;
-    switch(fragmentType){
-    case DictTabInfo::AllNodesSmallTable:
-      jam();
-      noOfFragments = csystemnodes;
-      break;
-    case DictTabInfo::AllNodesMediumTable:
-      jam();
-      noOfFragments = 2 * csystemnodes;
-      break;
-    case DictTabInfo::AllNodesLargeTable:
-      jam();
-      noOfFragments = 4 * csystemnodes;
-      break;
-    case DictTabInfo::SingleFragment:
-      jam();
-      noOfFragments = 1;
-      break;
-#if 0
-    case DictTabInfo::SpecifiedFragmentCount:
-      noOfFragments = (fragmentCount == 0 ? 1 : (fragmentCount + 1)/ 2);
-      break;
-#endif
-    default:
-      jam();
-      err = CreateFragmentationRef::InvalidFragmentationType;
-      break;
-    }
-    if(err)
-      break;
-   
     NodeGroupRecordPtr NGPtr;
     TabRecordPtr primTabPtr;
+    Uint32 count = 2;
+    Uint16 noOfReplicas = cnoReplicas;
+    Uint16 *fragments = (Uint16*)(signal->theData+25);
     if (primaryTableId == RNIL) {
-      if(fragmentNode == 0){
-        jam();
-        NGPtr.i = 0; 
-	if(noOfFragments < csystemnodes)
-	{
-	  NGPtr.i = c_nextNodeGroup; 
-	  c_nextNodeGroup = (NGPtr.i + 1 == cnoOfNodeGroups ? 0 : NGPtr.i + 1);
-	}
-      } else if(! (fragmentNode < MAX_NDB_NODES)) {
-        jam();
-        err = CreateFragmentationRef::InvalidNodeId;
-      } else {
-        jam();
-        const Uint32 stat = Sysfile::getNodeStatus(fragmentNode,
-                                                   SYSFILE->nodeStatus);
-        switch (stat) {
-        case Sysfile::NS_Active:
-        case Sysfile::NS_ActiveMissed_1:
-        case Sysfile::NS_ActiveMissed_2:
-        case Sysfile::NS_TakeOver:
+      jam();
+      switch ((DictTabInfo::FragmentType)fragType)
+      {
+        /*
+          Backward compatability and for all places in code not changed.
+        */
+        case DictTabInfo::AllNodesSmallTable:
+          jam();
+          noOfFragments = csystemnodes;
+          set_default_node_groups(signal, noOfFragments);
+          break;
+        case DictTabInfo::AllNodesMediumTable:
           jam();
+          noOfFragments = 2 * csystemnodes;
+          set_default_node_groups(signal, noOfFragments);
           break;
-        case Sysfile::NS_NotActive_NotTakenOver:
+        case DictTabInfo::AllNodesLargeTable:
           jam();
+          noOfFragments = 4 * csystemnodes;
+          set_default_node_groups(signal, noOfFragments);
           break;
-        case Sysfile::NS_HotSpare:
+        case DictTabInfo::SingleFragment:
           jam();
-        case Sysfile::NS_NotDefined:
+          noOfFragments = 1;
+          set_default_node_groups(signal, noOfFragments);
+          break;
+        case DictTabInfo::DistrKeyHash:
+          jam();
+        case DictTabInfo::DistrKeyLin:
           jam();
+          if (noOfFragments == 0)
+          {
+            jam();
+            noOfFragments = csystemnodes;
+            set_default_node_groups(signal, noOfFragments);
+          }
+          break;
         default:
           jam();
-          err = CreateFragmentationRef::InvalidNodeType;
+          if (noOfFragments == 0)
+          {
+            jam();
+            err = CreateFragmentationRef::InvalidFragmentationType;
+          }
           break;
+      }
+      if (err)
+        break;
+      /*
+        When we come here the the exact partition is specified
+        and there is an array of node groups sent along as well.
+      */
+      memcpy(&node_group_id[0], &signal->theData[25], 2 * noOfFragments);
+      Uint16 next_replica_node[MAX_NDB_NODES];
+      memset(next_replica_node,0,sizeof(next_replica_node));
+      Uint32 default_node_group= c_nextNodeGroup;
+      for(Uint32 fragNo = 0; fragNo < noOfFragments; fragNo++)
+      {
+        jam();
+        NGPtr.i = node_group_id[fragNo];
+        if (NGPtr.i == UNDEF_NODEGROUP)
+        {
+          jam();
+	  NGPtr.i = default_node_group; 
         }
-        if(err)
+        if (NGPtr.i > cnoOfNodeGroups)
+        {
+          jam();
+          err = CreateFragmentationRef::InvalidNodeGroup;
           break;
-        NGPtr.i = Sysfile::getNodeGroup(fragmentNode,
-                                        SYSFILE->nodeGroups);
+        }
+        ptrCheckGuard(NGPtr, MAX_NDB_NODES, nodeGroupRecord);
+        const Uint32 max = NGPtr.p->nodeCount;
+	
+	fragments[count++] = c_nextLogPart++; // Store logpart first
+	Uint32 tmp= next_replica_node[NGPtr.i];
+        for(Uint32 replicaNo = 0; replicaNo < noOfReplicas; replicaNo++)
+        {
+          jam();
+          const Uint16 nodeId = NGPtr.p->nodesInGroup[tmp];
+          fragments[count++]= nodeId;
+          inc_node_or_group(tmp, max);
+        }
+        inc_node_or_group(tmp, max);
+	next_replica_node[NGPtr.i]= tmp;
+	
+        /**
+         * Next node group for next fragment
+         */
+        inc_node_or_group(default_node_group, cnoOfNodeGroups);
+      }
+      if (err)
+      {
+        jam();
         break;
       }
+      else
+      {
+        jam();
+        c_nextNodeGroup = default_node_group;
+      }
     } else {
       if (primaryTableId >= ctabFileSize) {
         jam();
@@ -6531,48 +6796,14 @@
         err = CreateFragmentationRef::InvalidPrimaryTable;
         break;
       }
-      if (noOfFragments != primTabPtr.p->totalfragments) {
-        jam();
-        err = CreateFragmentationRef::InvalidFragmentationType;
-        break;
-      }
-    }
-    
-    Uint32 count = 2;
-    Uint16 *fragments = (Uint16*)(signal->theData+25);
-    if (primaryTableId == RNIL) {
-      jam();
-      Uint8 next_replica_node[MAX_NDB_NODES];
-      memset(next_replica_node,0,sizeof(next_replica_node));
-      for(Uint32 fragNo = 0; fragNo<noOfFragments; fragNo++){
-        jam();
-        ptrCheckGuard(NGPtr, MAX_NDB_NODES, nodeGroupRecord);      
-        const Uint32 max = NGPtr.p->nodeCount;
-	
-	Uint32 tmp= next_replica_node[NGPtr.i];
-        for(Uint32 replicaNo = 0; replicaNo<noOfReplicas; replicaNo++)
-        {
-          jam();
-          const Uint32 nodeId = NGPtr.p->nodesInGroup[tmp++];
-          fragments[count++] = nodeId;
-          tmp = (tmp >= max ? 0 : tmp);
-        }
-	tmp++;
-	next_replica_node[NGPtr.i]= (tmp >= max ? 0 : tmp);
-	
-        /**
-         * Next node group for next fragment
-         */
-        NGPtr.i++;
-        NGPtr.i = (NGPtr.i == cnoOfNodeGroups ? 0 : NGPtr.i);
-      }
-    } else {
+      noOfFragments= primTabPtr.p->totalfragments;
       for (Uint32 fragNo = 0;
-           fragNo < primTabPtr.p->totalfragments; fragNo++) {
+           fragNo < noOfFragments; fragNo++) {
         jam();
         FragmentstorePtr fragPtr;
         ReplicaRecordPtr replicaPtr;
         getFragstore(primTabPtr.p, fragNo, fragPtr);
+	fragments[count++] = fragPtr.p->m_log_part_id;
         fragments[count++] = fragPtr.p->preferredPrimary;
         for (replicaPtr.i = fragPtr.p->storedReplicas;
              replicaPtr.i != RNIL;
@@ -6581,9 +6812,9 @@
           ptrCheckGuard(replicaPtr, creplicaFileSize, replicaRecord);
           if (replicaPtr.p->procNode != fragPtr.p->preferredPrimary) {
             jam();
-            fragments[count++] = replicaPtr.p->procNode;
-          }//if
-        }//for
+            fragments[count++]= replicaPtr.p->procNode;
+          }
+        }
         for (replicaPtr.i = fragPtr.p->oldStoredReplicas;
              replicaPtr.i != RNIL;
              replicaPtr.i = replicaPtr.p->nextReplica) {
@@ -6591,25 +6822,32 @@
           ptrCheckGuard(replicaPtr, creplicaFileSize, replicaRecord);
           if (replicaPtr.p->procNode != fragPtr.p->preferredPrimary) {
             jam();
-            fragments[count++] = replicaPtr.p->procNode;
-          }//if
-        }//for
+            fragments[count++]= replicaPtr.p->procNode;
+          }
+        }
       }
     }
-    ndbrequire(count == (2 + noOfReplicas * noOfFragments)); 
+    if(count != (2U + (1 + noOfReplicas) * noOfFragments)){
+        char buf[255];
+        BaseString::snprintf(buf, sizeof(buf),
+                           "Illegal configuration change: NoOfReplicas."
+                           " Can't be applied online ");
+        progError(__LINE__, NDBD_EXIT_INVALID_CONFIG, buf);
+    }
     
     CreateFragmentationConf * const conf = 
       (CreateFragmentationConf*)signal->getDataPtrSend();
     conf->senderRef = reference();
     conf->senderData = senderData;
-    conf->noOfReplicas = noOfReplicas;
-    conf->noOfFragments = noOfFragments;
+    conf->noOfReplicas = (Uint32)noOfReplicas;
+    conf->noOfFragments = (Uint32)noOfFragments;
 
-    fragments[0] = noOfReplicas;
-    fragments[1] = noOfFragments;
+    fragments[0]= noOfReplicas;
+    fragments[1]= noOfFragments;
     
     if(senderRef != 0)
     {
+      jam();
       LinearSectionPtr ptr[3];
       ptr[0].p = (Uint32*)&fragments[0];
       ptr[0].sz = (count + 1) / 2;
@@ -6621,33 +6859,17 @@
 		 ptr,
 		 1);
     }
-    else
-    {
-      // Execute direct
-      signal->theData[0] = 0;
-    }
+    // Always ACK/NACK (here ACK)
+    signal->theData[0] = 0;
     return;
   } while(false);
-
-  if(senderRef != 0)
-  {
-    CreateFragmentationRef * const ref = 
-      (CreateFragmentationRef*)signal->getDataPtrSend();
-    ref->senderRef = reference();
-    ref->senderData = senderData;
-    ref->errorCode = err;
-    sendSignal(senderRef, GSN_CREATE_FRAGMENTATION_REF, signal, 
-	       CreateFragmentationRef::SignalLength, JBB);
-  }
-  else
-  {
-    // Execute direct
-    signal->theData[0] = err;
-  }
+  // Always ACK/NACK (here NACK)
+  signal->theData[0] = err;
 }
 
 void Dbdih::execDIADDTABREQ(Signal* signal) 
 {
+  Uint32 fragType;
   jamEntry();
 
   DiAddTabReq * const req = (DiAddTabReq*)signal->getDataPtr();
@@ -6672,6 +6894,7 @@
   ptrCheckGuard(tabPtr, ctabFileSize, tabRecord);
   tabPtr.p->connectrec = connectPtr.i;
   tabPtr.p->tableType = req->tableType;
+  fragType= req->fragType;
   tabPtr.p->schemaVersion = req->schemaVersion;
   tabPtr.p->primaryTableId = req->primaryTableId;
 
@@ -6707,15 +6930,46 @@
   /* BUT THEY DO NOT HAVE ANY INFORMATION ABOUT ANY TABLE*/
   /*%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%*/
   tabPtr.p->tabStatus = TabRecord::TS_CREATING;
-  tabPtr.p->storedTable = req->storedTable;
-  tabPtr.p->method = TabRecord::HASH;
+  if(req->loggedTable)
+    tabPtr.p->tabStorage= TabRecord::ST_NORMAL;
+  else if(req->temporaryTable)
+    tabPtr.p->tabStorage= TabRecord::ST_TEMPORARY;
+  else
+    tabPtr.p->tabStorage= TabRecord::ST_NOLOGGING;
   tabPtr.p->kvalue = req->kValue;
 
+  switch ((DictTabInfo::FragmentType)fragType)
+  {
+    case DictTabInfo::AllNodesSmallTable:
+    case DictTabInfo::AllNodesMediumTable:
+    case DictTabInfo::AllNodesLargeTable:
+    case DictTabInfo::SingleFragment:
+      jam();
+    case DictTabInfo::DistrKeyLin:
+      jam();
+      tabPtr.p->method= TabRecord::LINEAR_HASH;
+      break;
+    case DictTabInfo::DistrKeyHash:
+    case DictTabInfo::DistrKeyUniqueHashIndex:
+    case DictTabInfo::DistrKeyOrderedIndex:
+      jam();
+      tabPtr.p->method= TabRecord::NORMAL_HASH;
+      break;
+    case DictTabInfo::UserDefined:
+      jam();
+      tabPtr.p->method= TabRecord::USER_DEFINED;
+      break;
+    default:
+      ndbrequire(false);
+  }
+
   union {
     Uint16 fragments[2 + MAX_FRAG_PER_NODE*MAX_REPLICAS*MAX_NDB_NODES];
     Uint32 align;
   };
   SegmentedSectionPtr fragDataPtr;
+  LINT_INIT(fragDataPtr.i);
+  LINT_INIT(fragDataPtr.sz);
   signal->getSection(fragDataPtr, DiAddTabReq::FRAGMENTATION);
   copy((Uint32*)fragments, fragDataPtr);
   releaseSections(signal);
@@ -6759,7 +7013,9 @@
     FragmentstorePtr fragPtr;
     Uint32 activeIndex = 0;
     getFragstore(tabPtr.p, fragId, fragPtr);
+    fragPtr.p->m_log_part_id = fragments[index++];
     fragPtr.p->preferredPrimary = fragments[index];
+    
     for (Uint32 i = 0; i<noReplicas; i++) {
       const Uint32 nodeId = fragments[index++];
       ReplicaRecordPtr replicaPtr;
@@ -6803,10 +7059,12 @@
 		      TabRecordPtr tabPtr, Uint32 fragId){
   jam();
   const Uint32 fragCount = tabPtr.p->totalfragments;
-  ReplicaRecordPtr replicaPtr; replicaPtr.i = RNIL;
+  ReplicaRecordPtr replicaPtr;
+  LINT_INIT(replicaPtr.p);
+  replicaPtr.i = RNIL;
+  FragmentstorePtr fragPtr;
   for(; fragId<fragCount; fragId++){
     jam();
-    FragmentstorePtr fragPtr;
     getFragstore(tabPtr.p, fragId, fragPtr);    
     
     replicaPtr.i = fragPtr.p->storedReplicas;
@@ -6846,7 +7104,7 @@
     ndbrequire(replicaPtr.p->procNode == getOwnNodeId());
 
     Uint32 requestInfo = 0;
-    if(!tabPtr.p->storedTable){
+    if(tabPtr.p->tabStorage != TabRecord::ST_NORMAL){
       requestInfo |= LqhFragReq::TemporaryTable;
     }
     
@@ -6864,6 +7122,7 @@
     req->nodeId = getOwnNodeId();
     req->totalFragments = fragCount;
     req->startGci = SYSFILE->newestRestorableGCI;
+    req->logPartId = fragPtr.p->m_log_part_id;
     sendSignal(DBDICT_REF, GSN_ADD_FRAGREQ, signal, 
 	       AddFragReq::SignalLength, JBB);
     return;
@@ -7145,17 +7404,40 @@
   tabPtr.i = req->tableId;
   Uint32 hashValue = req->hashValue;
   Uint32 ttabFileSize = ctabFileSize;
+  Uint32 fragId;
+  DiGetNodesConf * const conf = (DiGetNodesConf *)&signal->theData[0];
   TabRecord* regTabDesc = tabRecord;
   jamEntry();
   ptrCheckGuard(tabPtr, ttabFileSize, regTabDesc);
-  Uint32 fragId = hashValue & tabPtr.p->mask;
-  ndbrequire(tabPtr.p->tabStatus == TabRecord::TS_ACTIVE);
-  if (fragId < tabPtr.p->hashpointer) {
+  if (tabPtr.p->method == TabRecord::LINEAR_HASH)
+  {
     jam();
-    fragId = hashValue & ((tabPtr.p->mask << 1) + 1);
-  }//if
+    fragId = hashValue & tabPtr.p->mask;
+    ndbrequire(tabPtr.p->tabStatus == TabRecord::TS_ACTIVE);
+    if (fragId < tabPtr.p->hashpointer) {
+      jam();
+      fragId = hashValue & ((tabPtr.p->mask << 1) + 1);
+    }//if
+  }
+  else if (tabPtr.p->method == TabRecord::NORMAL_HASH)
+  {
+    jam();
+    fragId= hashValue % tabPtr.p->totalfragments;
+  }
+  else
+  {
+    jam();
+    ndbassert(tabPtr.p->method == TabRecord::USER_DEFINED);
+    fragId= hashValue;
+    if (fragId >= tabPtr.p->totalfragments)
+    {
+      jam();
+      conf->zero= 1; //Indicate error;
+      signal->theData[1]= ZUNDEFINED_FRAGMENT_ERROR;
+      return;
+    }
+  }
   getFragstore(tabPtr.p, fragId, fragPtr);
-  DiGetNodesConf * const conf = (DiGetNodesConf *)&signal->theData[0];
   Uint32 nodeCount = extractNodeInfo(fragPtr.p, conf->nodes);
   Uint32 sig2 = (nodeCount - 1) + 
     (fragPtr.p->distributionKey << 16);
@@ -7322,39 +7604,70 @@
 
 void Dbdih::execDI_FCOUNTREQ(Signal* signal) 
 {
+  DihFragCountReq * const req = (DihFragCountReq*)signal->getDataPtr();
   ConnectRecordPtr connectPtr;
   TabRecordPtr tabPtr;
+  const BlockReference senderRef = signal->senderBlockRef();
+  const Uint32 senderData = req->m_senderData;
   jamEntry();
-  connectPtr.i = signal->theData[0];
-  tabPtr.i = signal->theData[1];
+  connectPtr.i = req->m_connectionData;
+  tabPtr.i = req->m_tableRef;
   ptrCheckGuard(tabPtr, ctabFileSize, tabRecord);
 
-  ndbrequire(tabPtr.p->tabStatus == TabRecord::TS_ACTIVE);
+  if (tabPtr.p->tabStatus != TabRecord::TS_ACTIVE)
+  {
+    DihFragCountRef* ref = (DihFragCountRef*)signal->getDataPtrSend();
+    //connectPtr.i == RNIL -> question without connect record
+    if(connectPtr.i == RNIL)
+      ref->m_connectionData = RNIL;
+    else
+    {
+      jam();
+      ptrCheckGuard(connectPtr, cconnectFileSize, connectRecord);
+      ref->m_connectionData = connectPtr.p->userpointer;
+    }
+    ref->m_tableRef = tabPtr.i;
+    ref->m_senderData = senderData;
+    ref->m_error = DihFragCountRef::ErroneousTableState;
+    ref->m_tableStatus = tabPtr.p->tabStatus;
+    sendSignal(senderRef, GSN_DI_FCOUNTREF, signal, 
+               DihFragCountRef::SignalLength, JBB);
+    return;
+  }
 
   if(connectPtr.i != RNIL){
     ptrCheckGuard(connectPtr, cconnectFileSize, connectRecord);
     if (connectPtr.p->connectState == ConnectRecord::INUSE) {
       jam();
-      signal->theData[0] = connectPtr.p->userpointer;
-      signal->theData[1] = tabPtr.p->totalfragments;
-      sendSignal(connectPtr.p->userblockref, GSN_DI_FCOUNTCONF, signal,2, JBB);
-      return;
-    }//if
-    signal->theData[0] = connectPtr.p->userpointer;
-    signal->theData[1] = ZERRONOUSSTATE;
-    sendSignal(connectPtr.p->userblockref, GSN_DI_FCOUNTREF, signal, 2, JBB);
+      DihFragCountConf* conf = (DihFragCountConf*)signal->getDataPtrSend();
+      conf->m_connectionData = connectPtr.p->userpointer;
+      conf->m_tableRef = tabPtr.i;
+      conf->m_senderData = senderData;
+      conf->m_fragmentCount = tabPtr.p->totalfragments;
+      conf->m_noOfBackups = tabPtr.p->noOfBackups;
+      sendSignal(connectPtr.p->userblockref, GSN_DI_FCOUNTCONF, signal,
+                 DihFragCountConf::SignalLength, JBB);
+      return;
+    }//if
+    DihFragCountRef* ref = (DihFragCountRef*)signal->getDataPtrSend();
+    ref->m_connectionData = connectPtr.p->userpointer;
+    ref->m_tableRef = tabPtr.i;
+    ref->m_senderData = senderData;
+    ref->m_error = DihFragCountRef::ErroneousTableState;
+    ref->m_tableStatus = tabPtr.p->tabStatus;
+    sendSignal(connectPtr.p->userblockref, GSN_DI_FCOUNTREF, signal, 
+               DihFragCountRef::SignalLength, JBB);
     return;
   }//if
-
+  DihFragCountConf* conf = (DihFragCountConf*)signal->getDataPtrSend();
   //connectPtr.i == RNIL -> question without connect record
-  const Uint32 senderData = signal->theData[2];
-  const BlockReference senderRef = signal->senderBlockRef();
-  signal->theData[0] = RNIL;
-  signal->theData[1] = tabPtr.p->totalfragments;
-  signal->theData[2] = tabPtr.i;
-  signal->theData[3] = senderData;
-  signal->theData[4] = tabPtr.p->noOfBackups;
-  sendSignal(senderRef, GSN_DI_FCOUNTCONF, signal, 5, JBB);
+  conf->m_connectionData = RNIL;
+  conf->m_tableRef = tabPtr.i;
+  conf->m_senderData = senderData;
+  conf->m_fragmentCount = tabPtr.p->totalfragments;
+  conf->m_noOfBackups = tabPtr.p->noOfBackups;
+  sendSignal(senderRef, GSN_DI_FCOUNTCONF, signal, 
+             DihFragCountConf::SignalLength, JBB);
 }//Dbdih::execDI_FCOUNTREQ()
 
 void Dbdih::execDIGETPRIMREQ(Signal* signal) 
@@ -7426,8 +7739,8 @@
         if (cgcpSameCounter == 1200) {
           jam();
 #ifdef VM_TRACE
-          ndbout << "System crash due to GCP Stop in state = ";
-          ndbout << (Uint32) cgcpStatus << endl;
+          g_eventLogger.error("System crash due to GCP Stop in state = %u",
+                              (Uint32) cgcpStatus);
 #endif
           crashSystemAtGcpStop(signal);
           return;
@@ -7440,8 +7753,8 @@
           if (cgcpSameCounter == 1200) {
             jam();
 #ifdef VM_TRACE
-            ndbout << "System crash due to GCP Stop in state = ";
-            ndbout << (Uint32) cgcpStatus << endl;
+            g_eventLogger.error("System crash due to GCP Stop in state = %u",
+                                (Uint32) cgcpStatus);
 #endif
 	    crashSystemAtGcpStop(signal);
             return;
@@ -7632,7 +7945,7 @@
      getNodeState().startLevel == NodeState::SL_STARTED){
     jam();
 #if 0
-    ndbout_c("Dbdih: Clearing initial start ongoing");
+    g_eventLogger.info("Dbdih: Clearing initial start ongoing");
 #endif
     Sysfile::clearInitialStartOngoing(SYSFILE->systemRestartBits);
   }
@@ -7651,7 +7964,7 @@
   if (ERROR_INSERTED(7030))
   {
     cgckptflag = true;
-    ndbout_c("Delayed GCP_PREPARE 5s");
+    g_eventLogger.info("Delayed GCP_PREPARE 5s");
     sendSignalWithDelay(reference(), GSN_GCP_PREPARE, signal, 5000,
 			signal->getLength());
     return;
@@ -7671,7 +7984,7 @@
 
   if (ERROR_INSERTED(7031))
   {
-    ndbout_c("Crashing delayed in GCP_PREPARE 3s");
+    g_eventLogger.info("Crashing delayed in GCP_PREPARE 3s");
     signal->theData[0] = 9999;
     sendSignalWithDelay(CMVMI_REF, GSN_NDB_TAMPER, signal, 3000, 1);
     return;
@@ -8140,6 +8453,15 @@
   if (reason == CopyGCIReq::GLOBAL_CHECKPOINT) {
     jam();
     cgcpParticipantState = GCP_PARTICIPANT_READY;
+    
+    SubGcpCompleteRep * const rep = (SubGcpCompleteRep*)signal->getDataPtr();
+    rep->gci = coldgcp;
+    sendSignal(SUMA_REF, GSN_SUB_GCP_COMPLETE_REP, signal, 
+	       SubGcpCompleteRep::SignalLength, JBB);
+
+    EXECUTE_DIRECT(LGMAN, GSN_SUB_GCP_COMPLETE_REP, signal, 
+		   SubGcpCompleteRep::SignalLength);
+    jamEntry();
   }
   
   jam();
@@ -8198,7 +8520,7 @@
      * This is LCP master takeover
      */
 #ifdef VM_TRACE
-    ndbout_c("initLcpLab aborted due to LCP master takeover - 1");
+    g_eventLogger.info("initLcpLab aborted due to LCP master takeover - 1");
 #endif
     c_lcpState.setLcpStatus(LCP_STATUS_IDLE, __LINE__);
     sendMASTER_LCPCONF(signal);
@@ -8211,7 +8533,7 @@
      * Master take over but has not yet received MASTER_LCPREQ
      */
 #ifdef VM_TRACE
-    ndbout_c("initLcpLab aborted due to LCP master takeover - 2");    
+    g_eventLogger.info("initLcpLab aborted due to LCP master takeover - 2");
 #endif
     return;
   }
@@ -8228,9 +8550,9 @@
       continue;
     }
 
-    if (tabPtr.p->storedTable == 0) {
+    if (tabPtr.p->tabStorage != TabRecord::ST_NORMAL) {
       /**
-       * Temporary table
+       * Table is not logged
        */
       jam();
       tabPtr.p->tabLcpStatus = TabRecord::TLS_COMPLETED;
@@ -8543,7 +8865,7 @@
 	    ConstPtr<ReplicaRecord> constReplicaPtr;
 	    constReplicaPtr.i = replicaPtr.i;
 	    constReplicaPtr.p = replicaPtr.p;
-	    if (tabPtr.p->storedTable == 0 ||
+	    if (tabPtr.p->tabStorage != TabRecord::ST_NORMAL ||
 		setup_create_replica(fragPtr,
 				     &createReplica, constReplicaPtr))
 	    {
@@ -8736,12 +9058,11 @@
   rf.rwfTabPtr.p->hashpointer = readPageWord(&rf);
   rf.rwfTabPtr.p->kvalue = readPageWord(&rf);
   rf.rwfTabPtr.p->mask = readPageWord(&rf);
-  ndbrequire(readPageWord(&rf) == TabRecord::HASH);
-  rf.rwfTabPtr.p->method = TabRecord::HASH;
-  /* ---------------------------------- */
-  /* Type of table, 2 = temporary table */
-  /* ---------------------------------- */
-  rf.rwfTabPtr.p->storedTable = readPageWord(&rf); 
+  rf.rwfTabPtr.p->method = (TabRecord::Method)readPageWord(&rf);
+  /* ------------- */
+  /* Type of table */
+  /* ------------- */
+  rf.rwfTabPtr.p->tabStorage = (TabRecord::Storage)(readPageWord(&rf)); 
 
   Uint32 noOfFrags = rf.rwfTabPtr.p->totalfragments;
   ndbrequire(noOfFrags > 0);
@@ -8831,8 +9152,8 @@
   writePageWord(&wf, tabPtr.p->hashpointer);
   writePageWord(&wf, tabPtr.p->kvalue);
   writePageWord(&wf, tabPtr.p->mask);
-  writePageWord(&wf, TabRecord::HASH);
-  writePageWord(&wf, tabPtr.p->storedTable);
+  writePageWord(&wf, tabPtr.p->method);
+  writePageWord(&wf, tabPtr.p->tabStorage);
 
   signal->theData[0] = DihContinueB::ZPACK_FRAG_INTO_PAGES;
   signal->theData[1] = tabPtr.i;
@@ -8932,6 +9253,80 @@
 /*****************************************************************************/
 /* **********     START FRAGMENT MODULE                          *************/
 /*****************************************************************************/
+void
+Dbdih::dump_replica_info()
+{
+  TabRecordPtr tabPtr;
+  FragmentstorePtr fragPtr;
+
+  for(tabPtr.i = 0; tabPtr.i < ctabFileSize; tabPtr.i++)
+  {
+    ptrCheckGuard(tabPtr, ctabFileSize, tabRecord);
+    if (tabPtr.p->tabStatus != TabRecord::TS_ACTIVE)
+      continue;
+    
+    for(Uint32 fid = 0; fid<tabPtr.p->totalfragments; fid++)
+    {
+      getFragstore(tabPtr.p, fid, fragPtr);
+      ndbout_c("tab: %d frag: %d gci: %d\n  -- storedReplicas:", 
+	       tabPtr.i, fid, SYSFILE->newestRestorableGCI);
+      
+      Uint32 i;
+      ReplicaRecordPtr replicaPtr;
+      replicaPtr.i = fragPtr.p->storedReplicas;
+      for(; replicaPtr.i != RNIL; replicaPtr.i = replicaPtr.p->nextReplica)
+      {
+	ptrCheckGuard(replicaPtr, creplicaFileSize, replicaRecord);
+	ndbout_c("  node: %d initialGci: %d nextLcp: %d noCrashedReplicas: %d",
+		 replicaPtr.p->procNode,
+		 replicaPtr.p->initialGci,
+		 replicaPtr.p->nextLcp,
+		 replicaPtr.p->noCrashedReplicas);
+	for(i = 0; i<MAX_LCP_STORED; i++)
+	{
+	  ndbout_c("    i: %d %s : lcpId: %d maxGci Completed: %d Started: %d",
+		   i, 
+		   (replicaPtr.p->lcpStatus[i] == ZVALID ?"VALID":"INVALID"),
+		   replicaPtr.p->lcpId[i],
+		   replicaPtr.p->maxGciCompleted[i],
+		   replicaPtr.p->maxGciStarted[i]);
+	}
+	
+	for (i = 0; i < 8; i++)
+	{
+	  ndbout_c("    crashed replica: %d replicaLastGci: %d createGci: %d",
+		   i, 
+		   replicaPtr.p->replicaLastGci[i],
+		   replicaPtr.p->createGci[i]);
+	}
+      }
+      ndbout_c("  -- oldStoredReplicas");
+      replicaPtr.i = fragPtr.p->oldStoredReplicas;
+      for(; replicaPtr.i != RNIL; replicaPtr.i = replicaPtr.p->nextReplica)
+      {
+	ptrCheckGuard(replicaPtr, creplicaFileSize, replicaRecord);
+	for(i = 0; i<MAX_LCP_STORED; i++)
+	{
+	  ndbout_c("    i: %d %s : lcpId: %d maxGci Completed: %d Started: %d",
+		   i, 
+		   (replicaPtr.p->lcpStatus[i] == ZVALID ?"VALID":"INVALID"),
+		   replicaPtr.p->lcpId[i],
+		   replicaPtr.p->maxGciCompleted[i],
+		   replicaPtr.p->maxGciStarted[i]);
+	}
+	
+	for (i = 0; i < 8; i++)
+	{
+	  ndbout_c("    crashed replica: %d replicaLastGci: %d createGci: %d",
+		   i, 
+		   replicaPtr.p->replicaLastGci[i],
+		   replicaPtr.p->createGci[i]);
+	}
+      }
+    }
+  }
+}
+
 void Dbdih::startFragment(Signal* signal, Uint32 tableId, Uint32 fragId) 
 {
   Uint32 TloopCount = 0;
@@ -8963,7 +9358,7 @@
       continue;
     }
     
-    if(tabPtr.p->storedTable == 0){
+    if(tabPtr.p->tabStorage != TabRecord::ST_NORMAL){
       jam();
       TloopCount++;
       tableId++;
@@ -8993,6 +9388,7 @@
   /*     SEARCH FOR STORED REPLICAS THAT CAN BE USED TO RESTART THE SYSTEM.  */
   /* ----------------------------------------------------------------------- */
   searchStoredReplicas(fragPtr);
+
   if (cnoOfCreateReplicas == 0) {
     /* --------------------------------------------------------------------- */
     /*   THERE WERE NO STORED REPLICAS AVAILABLE THAT CAN SERVE AS REPLICA TO*/
@@ -9005,6 +9401,10 @@
     char buf[64];
     BaseString::snprintf(buf, sizeof(buf), "table: %d fragment: %d gci: %d",
 			 tableId, fragId, SYSFILE->newestRestorableGCI);
+
+    ndbout_c(buf);
+    dump_replica_info();
+    
     progError(__LINE__, NDBD_EXIT_NO_RESTORABLE_REPLICA, buf);
     ndbrequire(false);
     return;
@@ -9081,8 +9481,8 @@
     // otherwise we have a problem.
     /* --------------------------------------------------------------------- */
     jam();
-    ndbrequire(senderNodeId == c_nodeStartMaster.startNode);
-    nodeRestartStartRecConfLab(signal);
+    ndbout_c("startNextCopyFragment");
+    startNextCopyFragment(signal, findTakeOver(senderNodeId));
     return;
   } else {
     /* --------------------------------------------------------------------- */
@@ -9442,9 +9842,10 @@
 {
   CRASH_INSERTION(7009);
   if (c_lcpState.lcpStatus != LCP_STATUS_IDLE) {
-    ndbout << "lcpStatus = " << (Uint32) c_lcpState.lcpStatus;
-    ndbout << "lcpStatusUpdatedPlace = " << 
-      c_lcpState.lcpStatusUpdatedPlace << endl;
+    g_eventLogger.error("lcpStatus = %u"
+                        "lcpStatusUpdatedPlace = %d",
+                        (Uint32) c_lcpState.lcpStatus,
+                        c_lcpState.lcpStatusUpdatedPlace);
     ndbrequire(false);
     return;
   }//if
@@ -9583,7 +9984,7 @@
     }//if
     ptrCheckGuard(tabPtr, ctabFileSize, tabRecord);
     if (tabPtr.p->tabStatus != TabRecord::TS_ACTIVE || 
-	tabPtr.p->storedTable == 0) {
+	tabPtr.p->tabStorage != TabRecord::ST_NORMAL) {
       if (TloopCount > 100) {
         jam();
         signal->theData[0] = DihContinueB::ZCALCULATE_KEEP_GCI;
@@ -9943,12 +10344,42 @@
   Uint32 fragId = lcpReport->fragId;
   
   jamEntry();
+
+  if (ERROR_INSERTED(7178) && nodeId != getOwnNodeId())
+  {
+    jam();
+    Uint32 owng =Sysfile::getNodeGroup(getOwnNodeId(), SYSFILE->nodeGroups);
+    Uint32 nodeg = Sysfile::getNodeGroup(nodeId, SYSFILE->nodeGroups);
+    if (owng == nodeg)
+    {
+      jam();
+      ndbout_c("throwing away LCP_FRAG_REP from  (and killing) %d", nodeId);
+      SET_ERROR_INSERT_VALUE(7179);
+      signal->theData[0] = 9999;
+      sendSignal(numberToRef(CMVMI, nodeId), 
+		 GSN_NDB_TAMPER, signal, 1, JBA);  
+      return;
+    }
+  }
  
+  if (ERROR_INSERTED(7179) && nodeId != getOwnNodeId())
+  {
+    jam();
+    Uint32 owng =Sysfile::getNodeGroup(getOwnNodeId(), SYSFILE->nodeGroups);
+    Uint32 nodeg = Sysfile::getNodeGroup(nodeId, SYSFILE->nodeGroups);
+    if (owng == nodeg)
+    {
+      jam();
+      ndbout_c("throwing away LCP_FRAG_REP from %d", nodeId);
+      return;
+    }
+  }    
+
   CRASH_INSERTION2(7025, isMaster());
   CRASH_INSERTION2(7016, !isMaster());
-
+  
   bool fromTimeQueue = (signal->senderBlockRef() == reference());
-
+  
   TabRecordPtr tabPtr;
   tabPtr.i = tableId;
   ptrCheckGuard(tabPtr, ctabFileSize, tabRecord);
@@ -9997,9 +10428,8 @@
 
     if(tabPtr.p->tabStatus == TabRecord::TS_DROPPING){
       jam();
-      ndbout_c("TS_DROPPING - Neglecting to save Table: %d Frag: %d - ",
-	       tableId,
-	       fragId);
+      g_eventLogger.info("TS_DROPPING - Neglecting to save Table: %d Frag: %d - ",
+                         tableId, fragId);
     } else {
       jam();
       /**
@@ -10112,9 +10542,11 @@
 }
 
 void Dbdih::findReplica(ReplicaRecordPtr& replicaPtr, 
-			Fragmentstore* fragPtrP, Uint32 nodeId)
+			Fragmentstore* fragPtrP, 
+			Uint32 nodeId,
+			bool old)
 {
-  replicaPtr.i = fragPtrP->storedReplicas;
+  replicaPtr.i = old ? fragPtrP->oldStoredReplicas : fragPtrP->storedReplicas;
   while(replicaPtr.i != RNIL){
     ptrCheckGuard(replicaPtr, creplicaFileSize, replicaRecord);
     if (replicaPtr.p->procNode == nodeId) {
@@ -10127,7 +10559,7 @@
   };
 
 #ifdef VM_TRACE
-  ndbout_c("Fragment Replica(node=%d) not found", nodeId);
+  g_eventLogger.info("Fragment Replica(node=%d) not found", nodeId);
   replicaPtr.i = fragPtrP->oldStoredReplicas;
   while(replicaPtr.i != RNIL){
     ptrCheckGuard(replicaPtr, creplicaFileSize, replicaRecord);
@@ -10140,14 +10572,45 @@
     }//if
   };
   if(replicaPtr.i != RNIL){
-    ndbout_c("...But was found in oldStoredReplicas");
+    g_eventLogger.info("...But was found in oldStoredReplicas");
   } else {
-    ndbout_c("...And wasn't found in oldStoredReplicas");
+    g_eventLogger.info("...And wasn't found in oldStoredReplicas");
   }
 #endif
   ndbrequire(false);
 }//Dbdih::findReplica()
 
+
+int
+Dbdih::handle_invalid_lcp_no(const LcpFragRep* rep, 
+			     ReplicaRecordPtr replicaPtr)
+{
+  ndbrequire(!isMaster());
+  Uint32 lcpNo = rep->lcpNo;
+  Uint32 lcpId = rep->lcpId;
+  Uint32 replicaLcpNo = replicaPtr.p->nextLcp;
+  Uint32 prevReplicaLcpNo = prevLcpNo(replicaLcpNo);
+
+  warningEvent("Detected previous node failure of %d during lcp",
+	       rep->nodeId);
+  replicaPtr.p->nextLcp = lcpNo;
+  replicaPtr.p->lcpId[lcpNo] = 0;
+  replicaPtr.p->lcpStatus[lcpNo] = ZINVALID;
+  
+  for (Uint32 i = lcpNo; i != lcpNo; i = nextLcpNo(i))
+  {
+    jam();
+    if (replicaPtr.p->lcpStatus[i] == ZVALID &&
+	replicaPtr.p->lcpId[i] >= lcpId)
+    {
+      ndbout_c("i: %d lcpId: %d", i, replicaPtr.p->lcpId[i]);
+      ndbrequire(false);
+    }
+  }
+
+  return 0;
+}
+
 /**
  * Return true  if table is all fragment replicas have been checkpointed
  *                 to disk (in all LQHs)
@@ -10176,9 +10639,12 @@
   
   ndbrequire(replicaPtr.p->lcpOngoingFlag == true);
   if(lcpNo != replicaPtr.p->nextLcp){
-    ndbout_c("lcpNo = %d replicaPtr.p->nextLcp = %d", 
-	     lcpNo, replicaPtr.p->nextLcp);
-    ndbrequire(false);
+    if (handle_invalid_lcp_no(lcpReport, replicaPtr))
+    {
+      g_eventLogger.error("lcpNo = %d replicaPtr.p->nextLcp = %d",
+                          lcpNo, replicaPtr.p->nextLcp);
+      ndbrequire(false);
+    }
   }
   ndbrequire(lcpNo == replicaPtr.p->nextLcp);
   ndbrequire(lcpNo < MAX_LCP_STORED);
@@ -10212,7 +10678,7 @@
       // Not all fragments in table have been checkpointed.
       /* ----------------------------------------------------------------- */
       if(0)
-	ndbout_c("reportLcpCompletion: fragment %d not ready", fid);
+	g_eventLogger.info("reportLcpCompletion: fragment %d not ready", fid);
       return false;
     }//if
   }//for
@@ -10340,7 +10806,7 @@
   jamEntry();
 
 #if 0
-  ndbout_c("LCP_COMPLETE_REP"); 
+  g_eventLogger.info("LCP_COMPLETE_REP"); 
   printLCP_COMPLETE_REP(stdout, 
 			signal->getDataPtr(),
 			signal->length(), number());
@@ -10426,7 +10892,7 @@
   if(c_lcpMasterTakeOverState.state != LMTOS_IDLE){
     jam();
 #ifdef VM_TRACE
-    ndbout_c("Exiting from allNodesLcpCompletedLab");
+    g_eventLogger.info("Exiting from allNodesLcpCompletedLab");
 #endif
     return;
   }
@@ -10522,6 +10988,14 @@
 /* ------------------------------------------------------------------------- */
 void Dbdih::tableUpdateLab(Signal* signal, TabRecordPtr tabPtr) {
   FileRecordPtr filePtr;
+  if(tabPtr.p->tabStorage == TabRecord::ST_TEMPORARY) {
+    // For temporary tables we do not write to disk. Mark both copies 0 and 1
+    // as done, and go straight to the after-close code.
+    filePtr.i = tabPtr.p->tabFile[1];
+    ptrCheckGuard(filePtr, cfileFileSize, fileRecord);
+    tableCloseLab(signal, filePtr);
+    return;
+  }
   filePtr.i = tabPtr.p->tabFile[0];
   ptrCheckGuard(filePtr, cfileFileSize, fileRecord);
   createFileRw(signal, filePtr);
@@ -10655,14 +11129,14 @@
     
     infoEvent("Detected GCP stop...sending kill to %s", 
 	      c_GCP_SAVEREQ_Counter.getText());
-    ndbout_c("Detected GCP stop...sending kill to %s", 
-	     c_GCP_SAVEREQ_Counter.getText());
+    g_eventLogger.error("Detected GCP stop...sending kill to %s", 
+                        c_GCP_SAVEREQ_Counter.getText());
     return;
   }
   case GCP_SAVE_LQH_FINISHED:
-    ndbout_c("m_copyReason: %d m_waiting: %d",
-	     c_copyGCIMaster.m_copyReason,
-	     c_copyGCIMaster.m_waiting);
+    g_eventLogger.error("m_copyReason: %d m_waiting: %d",
+                        c_copyGCIMaster.m_copyReason,
+                        c_copyGCIMaster.m_waiting);
     break;
   case GCP_READY: // shut up lint
   case GCP_PREPARE_SENT:
@@ -10670,11 +11144,11 @@
     break;
   }
   
-  ndbout_c("c_copyGCISlave: sender{Data, Ref} %d %x reason: %d nextWord: %d",
-	   c_copyGCISlave.m_senderData,
-	   c_copyGCISlave.m_senderRef,
-	   c_copyGCISlave.m_copyReason,
-	   c_copyGCISlave.m_expectedNextWord);
+  g_eventLogger.error("c_copyGCISlave: sender{Data, Ref} %d %x reason: %d nextWord: %d",
+                      c_copyGCISlave.m_senderData,
+                      c_copyGCISlave.m_senderRef,
+                      c_copyGCISlave.m_copyReason,
+                      c_copyGCISlave.m_expectedNextWord);
 
   FileRecordPtr file0Ptr;
   file0Ptr.i = crestartInfoFile[0];
@@ -11140,6 +11614,7 @@
 {
   ConstPtr<ReplicaRecord> fblFoundReplicaPtr;
   ConstPtr<ReplicaRecord> fblReplicaPtr;
+  LINT_INIT(fblFoundReplicaPtr.p);
   
   /* --------------------------------------------------------------------- */
   /*       WE START WITH ZERO AS FOUND TO ENSURE THAT FIRST HIT WILL BE    */
@@ -11382,6 +11857,7 @@
   cnoHotSpare = 0;
   cnoOfActiveTables = 0;
   cnoOfNodeGroups = 0;
+  c_nextNodeGroup = 0;
   cnoReplicas = 0;
   coldgcp = 0;
   coldGcpId = 0;
@@ -11401,6 +11877,7 @@
   c_newest_restorable_gci = 0;
   cverifyQueueCounter = 0;
   cwaitLcpSr = false;
+  c_nextLogPart = 0;
 
   nodeResetStart();
   c_nodeStartMaster.wait = ZFALSE;
@@ -11408,7 +11885,7 @@
   memset(&sysfileData[0], 0, sizeof(sysfileData));
 
   const ndb_mgm_configuration_iterator * p = 
-    theConfiguration.getOwnConfigIterator();
+    m_ctx.m_config.getOwnConfigIterator();
   ndbrequire(p != 0);
   
   c_lcpState.clcpDelay = 20;
@@ -11487,6 +11964,8 @@
     SYSFILE->takeOver[i] = 0;
   }//for
   Sysfile::setInitialStartOngoing(SYSFILE->systemRestartBits);
+  srand(time(0));
+  globalData.m_restart_seq = SYSFILE->m_restart_seq = 0;
 }//Dbdih::initRestartInfo()
 
 /*--------------------------------------------------------------------*/
@@ -11553,7 +12032,7 @@
   tabPtr.p->kvalue = 0;
   tabPtr.p->hashpointer = (Uint32)-1;
   tabPtr.p->mask = 0;
-  tabPtr.p->storedTable = 1;
+  tabPtr.p->tabStorage = TabRecord::ST_NORMAL;
   tabPtr.p->tabErrorCode = 0;
   tabPtr.p->schemaVersion = (Uint32)-1;
   tabPtr.p->tabRemoveNode = RNIL;
@@ -12237,6 +12716,8 @@
     jam();
     fragPtr.p->distributionKey = TdistKey;
   }//if
+
+  fragPtr.p->m_log_part_id = readPageWord(rf);
 }//Dbdih::readFragment()
 
 Uint32 Dbdih::readPageWord(RWFragment* rf) 
@@ -12877,9 +13358,9 @@
       nodePtr.i = getOwnNodeId();
       ptrAss(nodePtr, nodeRecord);
       ndbrequire(nodePtr.p->activeStatus == Sysfile::NS_Active);
-      ndbout_c("NR: setLcpActiveStatusEnd - m_participatingLQH");
+      g_eventLogger.info("NR: setLcpActiveStatusEnd - m_participatingLQH");
     } else {
-      ndbout_c("NR: setLcpActiveStatusEnd - !m_participatingLQH");
+      g_eventLogger.info("NR: setLcpActiveStatusEnd - !m_participatingLQH");
     }
   }
   
@@ -13333,6 +13814,7 @@
   writePageWord(wf, fragPtr.p->noStoredReplicas);
   writePageWord(wf, fragPtr.p->noOldStoredReplicas);
   writePageWord(wf, fragPtr.p->distributionKey);
+  writePageWord(wf, fragPtr.p->m_log_part_id);
 }//Dbdih::writeFragment()
 
 void Dbdih::writePageWord(RWFragment* wf, Uint32 dataWord)
@@ -13399,7 +13881,7 @@
   signal->theData[0] = filePtr.p->fileRef;
   signal->theData[1] = reference();
   signal->theData[2] = filePtr.i;
-  signal->theData[3] = ZLIST_OF_PAIRS;
+  signal->theData[3] = ZLIST_OF_PAIRS_SYNCH;
   signal->theData[4] = ZVAR_NO_WORD;
   signal->theData[5] = tab->noPages;
   for (Uint32 i = 0; i < tab->noPages; i++) {
@@ -13710,8 +14192,8 @@
   }
 
   if(arg == DumpStateOrd::EnableUndoDelayDataWrite){
-    ndbout << "Dbdih:: delay write of datapages for table = " 
-	   << dumpState->args[1]<< endl;
+    g_eventLogger.info("Dbdih:: delay write of datapages for table = %d", 
+                       dumpState->args[1]);
     // Send this dump to ACC and TUP
     EXECUTE_DIRECT(DBACC, GSN_DUMP_STATE_ORD, signal, 2);
     EXECUTE_DIRECT(DBTUP, GSN_DUMP_STATE_ORD, signal, 2);
@@ -13728,13 +14210,13 @@
   }//if
   if (signal->theData[0] == DumpStateOrd::DihMinTimeBetweenLCP) {
     // Set time between LCP to min value
-    ndbout << "Set time between LCP to min value" << endl;
+    g_eventLogger.info("Set time between LCP to min value");
     c_lcpState.clcpDelay = 0; // TimeBetweenLocalCheckpoints.min
     return;
   }
   if (signal->theData[0] == DumpStateOrd::DihMaxTimeBetweenLCP) {
     // Set time between LCP to max value
-    ndbout << "Set time between LCP to max value" << endl;
+    g_eventLogger.info("Set time between LCP to max value");
     c_lcpState.clcpDelay = 31; // TimeBetweenLocalCheckpoints.max
     return;
   }
@@ -13762,7 +14244,7 @@
     if (signal->getLength() == 1)
     {
       const ndb_mgm_configuration_iterator * p = 
-	theConfiguration.getOwnConfigIterator();
+	m_ctx.m_config.getOwnConfigIterator();
       ndbrequire(p != 0);
       ndb_mgm_get_int_parameter(p, CFG_DB_GCP_INTERVAL, &cgcpDelay);
     }
@@ -13770,7 +14252,7 @@
     {
       cgcpDelay = signal->theData[1];
     }
-    ndbout_c("Setting time between gcp : %d", cgcpDelay);
+    g_eventLogger.info("Setting time between gcp : %d", cgcpDelay);
   }
 
   if (arg == 7021 && signal->getLength() == 2)
@@ -13893,7 +14375,7 @@
 	while(index < count){
 	  if(nodePtr.p->queuedChkpt[index].tableId == tabPtr.i){
 	    jam();
-	    //	    ndbout_c("Unqueuing %d", index);
+	    //	    g_eventLogger.info("Unqueuing %d", index);
 	    
 	    count--;
 	    for(Uint32 i = index; i<count; i++){
@@ -13933,7 +14415,7 @@
       if(checkLcpAllTablesDoneInLqh()){
 	jam();
 	
-	ndbout_c("This is the last table");
+	g_eventLogger.info("This is the last table");
 	
 	/**
 	 * Then check if saving of tab info is done for all tables
@@ -13942,7 +14424,7 @@
 	checkLcpCompletedLab(signal);
 	
 	if(a != c_lcpState.lcpStatus){
-	  ndbout_c("And all tables are written to already written disk");
+	  g_eventLogger.info("And all tables are written to already written disk");
 	}
       }
       break;
@@ -14887,13 +15369,14 @@
   {
     Uint32 masterVersion = getNodeInfo(cmasterNodeId).m_version;
 
-    unsigned int get_major = getMajor(masterVersion);
-    unsigned int get_minor = getMinor(masterVersion);
-    unsigned int get_build = getBuild(masterVersion);
-
-    ndbrequire(get_major == 4 || get_major == 5);
+    const unsigned int get_major = getMajor(masterVersion);
+    const unsigned int get_minor = getMinor(masterVersion);
+    const unsigned int get_build = getBuild(masterVersion);
+    ndbrequire(get_major >= 4);
 
     if (masterVersion < NDBD_DICT_LOCK_VERSION_5 ||
+        masterVersion < NDBD_DICT_LOCK_VERSION_5_1 &&
+          get_major == 5 && get_minor == 1 ||
         ERROR_INSERTED(7176)) {
       jam();
 
@@ -14964,10 +15447,13 @@
   {
     Uint32 masterVersion = getNodeInfo(cmasterNodeId).m_version;
 
-    unsigned int get_major = getMajor(masterVersion);
-    ndbrequire(get_major == 4 || get_major == 5);
+    const unsigned int get_major = getMajor(masterVersion);
+    const unsigned int get_minor = getMinor(masterVersion);
+    ndbrequire(get_major >= 4);
 
     if (masterVersion < NDBD_DICT_LOCK_VERSION_5 ||
+        masterVersion < NDBD_DICT_LOCK_VERSION_5_1 &&
+          get_major == 5 && get_minor == 1 ||
         ERROR_INSERTED(7176)) {
       return;
     }

--- 1.26.13.1/ndb/src/kernel/blocks/dbtc/Dbtc.hpp	2007-03-21 15:35:27 +01:00
+++ 1.47/storage/ndb/src/kernel/blocks/dbtc/Dbtc.hpp	2007-03-21 15:35:27 +01:00
@@ -2,8 +2,7 @@
 
    This program is free software; you can redistribute it and/or modify
    it under the terms of the GNU General Public License as published by
-   the Free Software Foundation; either version 2 of the License, or
-   (at your option) any later version.
+   the Free Software Foundation; version 2 of the License.
 
    This program is distributed in the hope that it will be useful,
    but WITHOUT ANY WARRANTY; without even the implied warranty of
@@ -139,7 +138,6 @@
 
 #define ZNOT_FOUND 626
 #define ZALREADYEXIST 630
-#define ZINCONSISTENTHASHINDEX 892
 #define ZNOTUNIQUE 893
 
 #define ZINVALID_KEY 290
@@ -389,6 +387,13 @@
     Uint32 fireingOperation;
 
     /**
+     * The fragment id of the firing operation. This will be appended
+     * to the Primary Key such that the record can be found even in the
+     * case of user defined partitioning.
+     */
+    Uint32 fragId;
+
+    /**
      * Used for scrapping in case of node failure
      */
     Uint32 nodeId;
@@ -526,7 +531,7 @@
   /**
    * The list of defined indexes
    */  
-  ArrayList<TcIndexData> c_theIndexes;
+  DLList<TcIndexData> c_theIndexes;
   UintR c_maxNumberOfIndexes;
 
   struct TcIndexOperation {
@@ -732,7 +737,7 @@
     UintR accumulatingIndexOp;
     UintR executingIndexOp;
     UintR tcIndxSendArray[6];
-    ArrayList<TcIndexOperation> theSeizedIndexOperations;
+    DLList<TcIndexOperation> theSeizedIndexOperations;
   };
   
   typedef Ptr<ApiConnectRecord> ApiConnectRecordPtr;
@@ -867,7 +872,7 @@
     
     Uint8  distributionKeyIndicator;
     Uint8  m_special_hash; // collation or distribution key
-    Uint8  unused2;
+    Uint8  m_no_disk_flag;
     Uint8  lenAiInTckeyreq;  /* LENGTH OF ATTRIBUTE INFORMATION IN TCKEYREQ */
 
     Uint8  fragmentDistributionKey;  /* DIH generation no */
@@ -878,11 +883,7 @@
      */
     Uint8  opExec;     
 
-    /** 
-     * LOCK TYPE OF OPERATION IF READ OPERATION
-     * 0 = READ LOCK, 1 = WRITE LOCK                    
-     */
-    Uint8  opLock;     
+    Uint8  unused;
 
     /** 
      * IS THE OPERATION A SIMPLE TRANSACTION            
@@ -942,8 +943,7 @@
       NF_CHECK_SCAN        = 0x2,
       NF_CHECK_TRANSACTION = 0x4,
       NF_CHECK_DROP_TAB    = 0x8,
-      NF_NODE_FAIL_BITS    = 0xF, // All bits...
-      NF_STARTED           = 0x10
+      NF_NODE_FAIL_BITS    = 0xF // All bits...
     };
     Uint32 m_nf_bits;
     NdbNodeBitmask m_lqh_trans_conf;
@@ -967,7 +967,8 @@
     Uint8 noOfKeyAttr;
     Uint8 hasCharAttr;
     Uint8 noOfDistrKeys;
-    
+    Uint8 hasVarKeys;
+
     bool checkTable(Uint32 schemaVersion) const {
       return enabled && !dropping && 
 	(table_version_major(schemaVersion) == table_version_major(currentSchemaVersion));
@@ -1262,7 +1263,7 @@
   typedef Ptr<TcFailRecord> TcFailRecordPtr;
 
 public:
-  Dbtc(const class Configuration &);
+  Dbtc(Block_context&);
   virtual ~Dbtc();
 
 private:
@@ -1282,7 +1283,7 @@
   void execLQHKEYREF(Signal* signal);
   void execTRANSID_AI_R(Signal* signal);
   void execKEYINFO20_R(Signal* signal);
-
+  void execROUTE_ORD(Signal* signal);
   // Received signals
   void execDUMP_STATE_ORD(Signal* signal);
   void execSEND_PACKED(Signal* signal);
@@ -1320,7 +1321,6 @@
   void execCOMMITCONF(Signal* signal);
   void execABORTCONF(Signal* signal);
   void execNODE_FAILREP(Signal* signal);
-  void execNODE_START_REP(Signal* signal);
   void execINCL_NODEREQ(Signal* signal);
   void execTIME_SIGNAL(Signal* signal);
   void execAPI_FAILREQ(Signal* signal);

--- 1.73.35.1/ndb/src/kernel/blocks/dbtc/DbtcMain.cpp	2007-03-21 15:35:27 +01:00
+++ 1.133/storage/ndb/src/kernel/blocks/dbtc/DbtcMain.cpp	2007-03-21 15:35:27 +01:00
@@ -2,8 +2,7 @@
 
    This program is free software; you can redistribute it and/or modify
    it under the terms of the GNU General Public License as published by
-   the Free Software Foundation; either version 2 of the License, or
-   (at your option) any later version.
+   the Free Software Foundation; version 2 of the License.
 
    This program is distributed in the hope that it will be useful,
    but WITHOUT ANY WARRANTY; without even the implied warranty of
@@ -38,6 +37,7 @@
 #include <signaldata/TcContinueB.hpp>
 #include <signaldata/TcKeyFailConf.hpp>
 #include <signaldata/AbortAll.hpp>
+#include <signaldata/DihFragCount.hpp>
 #include <signaldata/ScanFrag.hpp>
 #include <signaldata/ScanTab.hpp>
 #include <signaldata/PrepDropTab.hpp>
@@ -71,6 +71,8 @@
 #include <NdbOut.hpp>
 #include <DebuggerNames.hpp>
 
+#include <signaldata/RouteOrd.hpp>
+
 // Use DEBUG to print messages that should be
 // seen only when we debug the product
 #ifdef VM_TRACE
@@ -344,7 +346,7 @@
   tabptr.p->noOfKeyAttr = desc->noOfKeyAttr;
   tabptr.p->hasCharAttr = desc->hasCharAttr;
   tabptr.p->noOfDistrKeys = desc->noOfDistrKeys;
-  
+  tabptr.p->hasVarKeys = desc->noOfVarKeys > 0;
   signal->theData[0] = tabptr.i;
   signal->theData[1] = retPtr;
   sendSignal(retRef, GSN_TC_SCHVERCONF, signal, 2, JBB);
@@ -610,7 +612,7 @@
   jamEntry();
   
   const ndb_mgm_configuration_iterator * p = 
-    theConfiguration.getOwnConfigIterator();
+    m_ctx.m_config.getOwnConfigIterator();
   ndbrequire(p != 0);
   
   initData();
@@ -1294,6 +1296,7 @@
 	   apiConnectptr.p->firstTcConnect == RNIL))
       {
         jam();                                   /* JUST REPLY OK */
+	apiConnectptr.p->m_transaction_nodes.clear();
         releaseApiCon(signal, apiConnectptr.i);
         signal->theData[0] = tuserpointer;
         sendSignal(tapiBlockref,
@@ -2296,14 +2299,15 @@
 {
   Uint64 Tmp[MAX_KEY_SIZE_IN_WORDS * MAX_XFRM_MULTIPLY];
   const TableRecord* tabPtrP = &tableRecord[tabPtrI];
+  const bool hasVarKeys = tabPtrP->hasVarKeys;
   const bool hasCharAttr = tabPtrP->hasCharAttr;
-  const bool hasDistKeys = tabPtrP->noOfDistrKeys > 0;
+  const bool compute_distkey = distr && (tabPtrP->noOfDistrKeys > 0);
   
   Uint32 *dst = (Uint32*)Tmp;
   Uint32 dstPos = 0;
   Uint32 keyPartLen[MAX_ATTRIBUTES_IN_INDEX];
   Uint32 * keyPartLenPtr;
-  if(hasCharAttr)
+  if(hasCharAttr || (compute_distkey && hasVarKeys))
   {
     keyPartLenPtr = keyPartLen;
     dstPos = xfrm_key(tabPtrI, src, dst, sizeof(Tmp) >> 2, keyPartLenPtr);
@@ -2321,7 +2325,7 @@
   
   md5_hash(dstHash, (Uint64*)dst, dstPos);
   
-  if(distr && hasDistKeys)
+  if(compute_distkey)
   {
     jam();
     
@@ -2730,12 +2734,14 @@
   Uint8 TDirtyFlag          = tcKeyReq->getDirtyFlag(Treqinfo);
   Uint8 TInterpretedFlag    = tcKeyReq->getInterpretedFlag(Treqinfo);
   Uint8 TDistrKeyFlag       = tcKeyReq->getDistributionKeyFlag(Treqinfo);
+  Uint8 TNoDiskFlag         = TcKeyReq::getNoDiskFlag(Treqinfo);
   Uint8 TexecuteFlag        = TexecFlag;
   
   regCachePtr->opSimple = TSimpleFlag;
   regCachePtr->opExec   = TInterpretedFlag;
   regTcPtr->dirtyOp  = TDirtyFlag;
   regCachePtr->distributionKeyIndicator = TDistrKeyFlag;
+  regCachePtr->m_no_disk_flag = TNoDiskFlag;
 
   //-------------------------------------------------------------
   // The next step is to read the upto three conditional words.
@@ -2799,17 +2805,9 @@
   regCachePtr->attrinfo15[2] = Tdata4;
   regCachePtr->attrinfo15[3] = Tdata5;
 
-  if (TOperationType == ZREAD) {
+  if (TOperationType == ZREAD || TOperationType == ZREAD_EX) {
     Uint32 TreadCount = c_counters.creadCount;
     jam();
-    regCachePtr->opLock = 0;
-    c_counters.creadCount = TreadCount + 1;
-  } else if(TOperationType == ZREAD_EX){
-    Uint32 TreadCount = c_counters.creadCount;
-    jam();
-    TOperationType = ZREAD;
-    regTcPtr->operation = ZREAD;
-    regCachePtr->opLock = ZUPDATE;
     c_counters.creadCount = TreadCount + 1;
   } else {
     if(regApiPtr->commitAckMarker == RNIL){
@@ -2843,24 +2841,10 @@
     c_counters.cwriteCount = TwriteCount + 1;
     switch (TOperationType) {
     case ZUPDATE:
-      jam();
-      if (TattrLen == 0) {
-        //TCKEY_abort(signal, 5);
-        //return;
-      }//if
-      /*---------------------------------------------------------------------*/
-      // The missing break is intentional since we also want to set the opLock 
-      // variable also for updates
-      /*---------------------------------------------------------------------*/
     case ZINSERT:
     case ZDELETE:
-      jam();      
-      regCachePtr->opLock = TOperationType;
-      break;
     case ZWRITE:
       jam();
-      // A write operation is originally an insert operation.
-      regCachePtr->opLock = ZINSERT;  
       break;
     default:
       TCKEY_abort(signal, 9);
@@ -3035,7 +3019,7 @@
   tnoOfStandby = (tnodeinfo >> 8) & 3;
  
   regCachePtr->fragmentDistributionKey = (tnodeinfo >> 16) & 255;
-  if (Toperation == ZREAD) {
+  if (Toperation == ZREAD || Toperation == ZREAD_EX) {
     if (Tdirty == 1) {
       jam();
       /*-------------------------------------------------------------*/
@@ -3068,28 +3052,7 @@
 	  }//if
 	}//for
       }
-
-      if (regTcPtr->tcNodedata[0] != getOwnNodeId())
-      {
-	jam();
-	for (Uint32 i = 0; i < tnoOfBackup + 1; i++)
-	{
-	  HostRecordPtr hostPtr;
-	  hostPtr.i = regTcPtr->tcNodedata[i];
-	  ptrCheckGuard(hostPtr, chostFilesize, hostRecord);
-	  if (hostPtr.p->m_nf_bits & HostRecord::NF_STARTED)
-	  {
-	    jam();
-	    if (i != 0)
-	    {
-	      jam();
-	      regTcPtr->tcNodedata[0] = hostPtr.i;
-	    }
-	    break;
-	  }
-	}
-      }//if
-    }
+    }//if
     jam();
     regTcPtr->lastReplicaNo = 0;
     regTcPtr->noOfNodes = 1;
@@ -3185,6 +3148,7 @@
   TcConnectRecord * const regTcPtr = tcConnectptr.p;
   ApiConnectRecord * const regApiPtr = apiConnectptr.p;
   CacheRecord * const regCachePtr = cachePtr.p;
+  Uint32 version = getNodeInfo(refToNode(TBRef)).m_version;
   UintR sig0, sig1, sig2, sig3, sig4, sig5, sig6;
 #ifdef ERROR_INSERT
   if (ERROR_INSERTED(8002)) {
@@ -3228,7 +3192,12 @@
   bool simpleRead = (sig1 == ZREAD && sig0 == ZTRUE);
   LqhKeyReq::setKeyLen(Tdata10, regCachePtr->keylen);
   LqhKeyReq::setLastReplicaNo(Tdata10, regTcPtr->lastReplicaNo);
-  LqhKeyReq::setLockType(Tdata10, regCachePtr->opLock);
+  if (unlikely(version < NDBD_ROWID_VERSION))
+  {
+    Uint32 op = regTcPtr->operation;
+    Uint32 lock = (Operation_t) op == ZREAD_EX ? ZUPDATE : (Operation_t) op == ZWRITE ?
ZINSERT : (Operation_t) op;
+    LqhKeyReq::setLockType(Tdata10, lock);
+  }
   /* ---------------------------------------------------------------------- */
   // Indicate Application Reference is present in bit 15
   /* ---------------------------------------------------------------------- */
@@ -3237,6 +3206,8 @@
   LqhKeyReq::setInterpretedFlag(Tdata10, regCachePtr->opExec);
   LqhKeyReq::setSimpleFlag(Tdata10, sig0);
   LqhKeyReq::setOperation(Tdata10, sig1);
+  LqhKeyReq::setNoDiskFlag(Tdata10, regCachePtr->m_no_disk_flag);
+
   /* ----------------------------------------------------------------------- 
    * Sequential Number of first LQH = 0, bit 22-23                           
    * IF ATTRIBUTE INFORMATION IS SENT IN TCKEYREQ,
@@ -3954,7 +3925,7 @@
   const UintR TopWords = (UintR)regApiPtr->tckeyrec;
   localHostptr.i = refToNode(regApiPtr->ndbapiBlockref);
   const Uint32 type = getNodeInfo(localHostptr.i).m_type;
-  const bool is_api = (type >= NodeInfo::API && type <= NodeInfo::REP);
+  const bool is_api = (type >= NodeInfo::API && type <= NodeInfo::MGM);
   const BlockNumber TblockNum = refToBlock(regApiPtr->ndbapiBlockref);
   const Uint32 Tmarker = (regApiPtr->commitAckMarker == RNIL) ? 0 : 1;
   ptrAss(localHostptr, hostRecord);
@@ -4636,7 +4607,8 @@
     commitConf->transId1 = regApiPtr->transid[0];
     commitConf->transId2 = regApiPtr->transid[1];
     commitConf->gci = regApiPtr->globalcheckpointid;
-    sendSignal(regApiPtr->ndbapiBlockref, GSN_TC_COMMITCONF, signal, 
+
+    sendSignal(regApiPtr->ndbapiBlockref, GSN_TC_COMMITCONF, signal,
 	       TcCommitConf::SignalLength, JBB);
   } else if (regApiPtr->returnsignal == RS_NO_RETURN) {
     jam();
@@ -4826,13 +4798,14 @@
   key.transid2 = signal->theData[1];
 
   CommitAckMarkerPtr removedMarker;
-  m_commitAckMarkerHash.release(removedMarker, key);
+  m_commitAckMarkerHash.remove(removedMarker, key);
   if (removedMarker.i == RNIL) {
     jam();
     warningHandlerLab(signal, __LINE__);
     return;
   }//if
   sendRemoveMarkers(signal, removedMarker.p);
+  m_commitAckMarkerPool.release(removedMarker);
 }
 
 void
@@ -5177,6 +5150,19 @@
 	return;
       }
       
+      /* Only ref in certain situations */
+      {
+	const Uint32 opType = regTcPtr->operation;
+	if (   (opType == ZDELETE && errCode != ZNOT_FOUND)
+	    || (opType == ZINSERT && errCode != ZALREADYEXIST)
+	    || (opType == ZUPDATE && errCode != ZNOT_FOUND)
+	    || (opType == ZWRITE  && errCode != 839 && errCode != 840))
+	{
+	  TCKEY_abort(signal, 49);
+	  return;
+	}
+      }
+
       /* *************** */
       /*    TCKEYREF   < */
       /* *************** */
@@ -7027,19 +7013,6 @@
 }//Dbtc::execNODE_FAILREP()
 
 void
-Dbtc::execNODE_START_REP(Signal* signal)
-{
-  Uint32 nodeId = signal->theData[0];
-  hostptr.i = nodeId;
-  ptrCheckGuard(hostptr, chostFilesize, hostRecord);
-  if (hostptr.p->m_nf_bits == 0)
-  {
-    jam();
-    hostptr.p->m_nf_bits |= HostRecord::NF_STARTED;
-  }
-}
-
-void
 Dbtc::checkNodeFailComplete(Signal* signal, 
 			    Uint32 failedNodeId,
 			    Uint32 bit)
@@ -7130,7 +7103,7 @@
     if (transPtr.p->m_transaction_nodes.get(failedNodeId))
     {
       jam();
-
+      
       // Force timeout regardless of state      
       c_appl_timeout_value = 1;
       setApiConTimer(transPtr.i, TtcTimer - 2, __LINE__);
@@ -8829,6 +8802,7 @@
   ScanFragReq::setDescendingFlag(tmp, ScanTabReq::getDescendingFlag(ri));
   ScanFragReq::setTupScanFlag(tmp, ScanTabReq::getTupScanFlag(ri));
   ScanFragReq::setAttrLen(tmp, scanTabReq->attrLenKeyLen & 0xFFFF);
+  ScanFragReq::setNoDiskFlag(tmp, ScanTabReq::getNoDiskFlag(ri));
   
   scanptr.p->scanRequestInfo = tmp;
   scanptr.p->scanStoredProcId = scanTabReq->storedProcId;
@@ -8950,9 +8924,11 @@
      * THE FIRST STEP TO RECEIVE IS SUCCESSFULLY COMPLETED. 
      * WE MUST FIRST GET THE NUMBER OF  FRAGMENTS IN THE TABLE.
      ***************************************************/
-    signal->theData[0] = tcConnectptr.p->dihConnectptr;
-    signal->theData[1] = scanptr.p->scanTableref;
-    sendSignal(cdihblockref, GSN_DI_FCOUNTREQ, signal, 2, JBB);
+    DihFragCountReq * const req = (DihFragCountReq*)signal->getDataPtrSend();
+    req->m_connectionData = tcConnectptr.p->dihConnectptr;
+    req->m_tableRef = scanptr.p->scanTableref;
+    sendSignal(cdihblockref, GSN_DI_FCOUNTREQ, signal, 
+               DihFragCountReq::SignalLength, JBB);
   }
   else 
   {
@@ -8963,17 +8939,18 @@
     UintR TerrorIndicator = signal->theData[0];
     jamEntry();
     if (TerrorIndicator != 0) {
-      signal->theData[0] = tcConnectptr.i;
-      //signal->theData[1] Contains error
+      DihFragCountRef * const ref = (DihFragCountRef*)signal->getDataPtr();
+      ref->m_connectionData = tcConnectptr.i;
+      ref->m_error = signal->theData[1];
       execDI_FCOUNTREF(signal);
       return;
     }
     
     UintR Tdata1 = signal->theData[1];
     scanptr.p->scanNextFragId = Tdata1;
-
-    signal->theData[0] = tcConnectptr.i;
-    signal->theData[1] = 1; // Frag count
+    DihFragCountConf * const conf = (DihFragCountConf*)signal->getDataPtr();
+    conf->m_connectionData = tcConnectptr.i;
+    conf->m_fragmentCount = 1; // Frag count
     execDI_FCOUNTCONF(signal);
   }
   return;
@@ -8991,8 +8968,9 @@
 void Dbtc::execDI_FCOUNTCONF(Signal* signal) 
 {
   jamEntry();
-  tcConnectptr.i = signal->theData[0];
-  Uint32 tfragCount = signal->theData[1];
+  DihFragCountConf * const conf = (DihFragCountConf*)signal->getDataPtr();
+  tcConnectptr.i = conf->m_connectionData;
+  Uint32 tfragCount = conf->m_fragmentCount;
   ptrCheckGuard(tcConnectptr, ctcConnectFilesize, tcConnectRecord);
   apiConnectptr.i = tcConnectptr.p->apiConnect;
   ptrCheckGuard(apiConnectptr, capiConnectFilesize, apiConnectRecord);
@@ -9076,9 +9054,10 @@
 void Dbtc::execDI_FCOUNTREF(Signal* signal) 
 {
   jamEntry();
-  tcConnectptr.i = signal->theData[0];
+  DihFragCountRef * const ref = (DihFragCountRef*)signal->getDataPtr();
+  tcConnectptr.i = ref->m_connectionData;
   ptrCheckGuard(tcConnectptr, ctcConnectFilesize, tcConnectRecord);
-  const Uint32 errCode = signal->theData[1];
+  const Uint32 errCode = ref->m_error;
   apiConnectptr.i = tcConnectptr.p->apiConnect;
   ptrCheckGuard(apiConnectptr, capiConnectFilesize, apiConnectRecord);
   ScanRecordPtr scanptr;
@@ -10234,6 +10213,7 @@
     tabptr.p->noOfKeyAttr = 0;
     tabptr.p->hasCharAttr = 0;
     tabptr.p->noOfDistrKeys = 0;
+    tabptr.p->hasVarKeys = 0;
   }//for
 }//Dbtc::initTable()
 
@@ -11298,7 +11278,6 @@
   ApiConnectRecordPtr transPtr;
   TcConnectRecord *localTcConnectRecord = tcConnectRecord;
   TcConnectRecordPtr opPtr;
-
   /**
    * TODO
    * Check transid,
@@ -11312,6 +11291,7 @@
     
     c_firedTriggerHash.remove(trigPtr);
 
+    trigPtr.p->fragId= fireOrd->fragId;
     bool ok = trigPtr.p->keyValues.getSize() == fireOrd->m_noPrimKeyWords;
     ok &= trigPtr.p->afterValues.getSize() == fireOrd->m_noAfterValueWords;
     ok &= trigPtr.p->beforeValues.getSize() == fireOrd->m_noBeforeValueWords;
@@ -11523,7 +11503,7 @@
   const UintR TopWords = (UintR)regApiPtr->tcindxrec;
   localHostptr.i = refToNode(regApiPtr->ndbapiBlockref);
   const Uint32 type = getNodeInfo(localHostptr.i).m_type;
-  const bool is_api = (type >= NodeInfo::API && type <= NodeInfo::REP);
+  const bool is_api = (type >= NodeInfo::API && type <= NodeInfo::MGM);
   const BlockNumber TblockNum = refToBlock(regApiPtr->ndbapiBlockref);
   const Uint32 Tmarker = (regApiPtr->commitAckMarker == RNIL ? 0 : 1);
   ptrAss(localHostptr, hostRecord);
@@ -12222,7 +12202,11 @@
   Uint32 dataPos = 0;
   TcKeyReq * const tcIndxReq = &indexOp->tcIndxReq;
   TcKeyReq * const tcKeyReq = (TcKeyReq *)signal->getDataPtrSend();
-  Uint32 * dataPtr = &tcKeyReq->scanInfo;
+  /*
+    Data points to distrGroupHashValue since scanInfo is used to send
+    fragment id of receiving fragment
+  */
+  Uint32 * dataPtr = &tcKeyReq->distrGroupHashValue;
   Uint32 tcKeyLength = TcKeyReq::StaticLength;
   Uint32 tcKeyRequestInfo = tcIndxReq->requestInfo;
   TcIndexData* indexData;
@@ -12261,11 +12245,16 @@
   regApiPtr->executingIndexOp = indexOp->indexOpId;;
   regApiPtr->noIndexOp++; // Increase count
 
-  // Filter out AttributeHeader:s since this should not be in key
+  /*
+    Filter out AttributeHeader:s since this should not be in key.
+    Also filter out fragment id from primary key and handle that
+    separately by setting it as Distribution Key and set indicator.
+  */
+
   AttributeHeader* attrHeader = (AttributeHeader *) aiIter.data;
     
   Uint32 headerSize = attrHeader->getHeaderSize();
-  Uint32 keySize = attrHeader->getDataSize();
+  Uint32 keySize = attrHeader->getDataSize() - 1;
   TcKeyReq::setKeyLength(tcKeyRequestInfo, keySize);
   // Skip header
   if (headerSize == 1) {
@@ -12275,6 +12264,9 @@
     jam();
     moreKeyData = indexOp->transIdAI.next(aiIter, headerSize - 1);
   }//if
+  tcKeyReq->scanInfo = *aiIter.data; //Fragment Id
+  moreKeyData = indexOp->transIdAI.next(aiIter);
+  TcKeyReq::setDistributionKeyFlag(tcKeyRequestInfo, 1U);
   while(// If we have not read complete key
 	(keySize != 0) &&
 	(dataPos < keyBufSize)) {
@@ -12540,7 +12532,7 @@
 	  tmp2.release();
 	  LocalDataBuffer<11> tmp3(pool, trigPtr.p->afterValues);
 	  tmp3.release();
-          regApiPtr->theFiredTriggers.release(trigPtr.i);
+          regApiPtr->theFiredTriggers.release(trigPtr);
         }
 	trigPtr = nextTrigPtr;
       }
@@ -12657,7 +12649,7 @@
   AttributeBuffer::DataBufferIterator iter;
   Uint32 attrId = 0;
   Uint32 keyLength = 0;
-  Uint32 totalPrimaryKeyLength = 0;
+  Uint32 totalPrimaryKeyLength = 1; // fragment length
   Uint32 hops;
 
   indexTabPtr.i = indexData->indexId;
@@ -12710,11 +12702,12 @@
     hops = attrHeader->getHeaderSize() + attrHeader->getDataSize();
     moreAttrData = keyValues.next(iter, hops);
   }
-  AttributeHeader pkAttrHeader(attrId, totalPrimaryKeyLength);
+  AttributeHeader pkAttrHeader(attrId, totalPrimaryKeyLength << 2);
+  Uint32 attributesLength = afterValues.getSize() + 
+    pkAttrHeader.getHeaderSize() + pkAttrHeader.getDataSize();
   
   TcKeyReq::setKeyLength(tcKeyRequestInfo, keyLength);
-  tcKeyReq->attrLen = afterValues.getSize() + 
-    pkAttrHeader.getHeaderSize() + pkAttrHeader.getDataSize();
+  tcKeyReq->attrLen = attributesLength;
   tcKeyReq->tableId = indexData->indexId;
   TcKeyReq::setOperationType(tcKeyRequestInfo, ZINSERT);
   TcKeyReq::setExecutingTrigger(tcKeyRequestInfo, true);
@@ -12764,8 +12757,11 @@
   }
 
   tcKeyLength += dataPos;
-  Uint32 attributesLength = afterValues.getSize() + 
-    pkAttrHeader.getHeaderSize() + pkAttrHeader.getDataSize();
+  /*
+    Size of attrinfo is unique index attributes one by one, header for each
+    of them (all contained in the afterValues data structure), plus a header,
+    the primary key (compacted) and the fragment id before the primary key
+  */
   if (attributesLength <= attrBufSize) {
     jam();
     // ATTRINFO fits in TCKEYREQ
@@ -12782,6 +12778,10 @@
     // as one attribute
     pkAttrHeader.insertHeader(dataPtr);
     dataPtr += pkAttrHeader.getHeaderSize();
+    /*
+      Insert fragment id before primary key as part of reference to tuple
+    */
+    *dataPtr++ = firedTriggerData->fragId;
     moreAttrData = keyValues.first(iter);
     while(moreAttrData) {
       jam();
@@ -12977,6 +12977,29 @@
     pkAttrHeader.insertHeader(dataPtr);
     dataPtr += pkAttrHeader.getHeaderSize();
     attrInfoPos += pkAttrHeader.getHeaderSize();
+    /*
+      Add fragment id before primary key
+      TODO: This code really needs to be made into a long signal
+      to remove this messy code.
+    */
+    if (attrInfoPos == AttrInfo::DataLength)
+    {
+      jam();
+      // Flush ATTRINFO
+#if INTERNAL_TRIGGER_TCKEYREQ_JBA
+      sendSignal(reference(), GSN_ATTRINFO, signal, 
+                 AttrInfo::HeaderLength + AttrInfo::DataLength, JBA);
+#else
+      EXECUTE_DIRECT(DBTC, GSN_ATTRINFO, signal,
+                     AttrInfo::HeaderLength + AttrInfo::DataLength);
+      jamEntry();
+#endif
+      dataPtr = (Uint32 *) &attrInfo->attrData;	  
+      attrInfoPos = 0;
+    }
+    attrInfoPos++;
+    *dataPtr++ = firedTriggerData->fragId;
+
     moreAttrData = keyValues.first(iter);
     while(moreAttrData) {
       jam();
@@ -13277,3 +13300,56 @@
   return 0;
 }
 
+void
+Dbtc::execROUTE_ORD(Signal* signal)
+{
+  jamEntry();
+  if(!assembleFragments(signal)){
+    jam();
+    return;
+  }
+
+  RouteOrd* ord = (RouteOrd*)signal->getDataPtr();
+  Uint32 dstRef = ord->dstRef;
+  Uint32 srcRef = ord->srcRef;
+  Uint32 gsn = ord->gsn;
+  Uint32 cnt = ord->cnt;
+
+  if (likely(getNodeInfo(refToNode(dstRef)).m_connected))
+  {
+    jam();
+    Uint32 secCount = signal->getNoOfSections();
+    SegmentedSectionPtr ptr[3];
+    ndbrequire(secCount >= 1 && secCount <= 3);
+
+    jamLine(secCount);
+    for (Uint32 i = 0; i<secCount; i++)
+      signal->getSection(ptr[i], i);
+
+    /**
+     * Put section 0 in signal->theData
+     */
+    ndbrequire(ptr[0].sz <= 25);
+    copy(signal->theData, ptr[0]);
+
+    signal->header.m_noOfSections = 0;
+    
+    /**
+     * Shift rest of sections
+     */
+    for(Uint32 i = 1; i<secCount; i++)
+    {
+      signal->setSection(ptr[i], i - 1);
+    }
+
+    sendSignal(dstRef, gsn, signal, ptr[0].sz, JBB);
+
+    signal->header.m_noOfSections = 0;
+    signal->setSection(ptr[0], 0);
+    releaseSections(signal);
+    return ;
+  }
+
+  warningEvent("Unable to route GSN: %d from %x to %x",
+	       gsn, srcRef, dstRef);
+}
Thread
bk commit into 5.1 tree (jonas:1.2445)jonas21 Mar