List:Commits« Previous MessageNext Message »
From:jonas oreland Date:February 15 2011 11:41am
Subject:bzr commit into mysql-5.1-telco-7.0 branch (jonas:4194)
View as plain text  
#At file:///home/jonas/src/telco-7.0/ based on revid:jonas@stripped

 4194 jonas oreland	2011-02-15
      ndb - add support for 2-pass initial node restart copy

    modified:
      mysql-test/include/default_ndbd.cnf
      storage/ndb/include/kernel/signaldata/CopyFrag.hpp
      storage/ndb/include/kernel/signaldata/StartFragReq.hpp
      storage/ndb/include/mgmapi/mgmapi_config_parameters.h
      storage/ndb/include/ndb_version.h.in
      storage/ndb/src/kernel/blocks/dbdih/Dbdih.hpp
      storage/ndb/src/kernel/blocks/dbdih/DbdihInit.cpp
      storage/ndb/src/kernel/blocks/dbdih/DbdihMain.cpp
      storage/ndb/src/kernel/blocks/dblqh/Dblqh.hpp
      storage/ndb/src/kernel/blocks/dblqh/DblqhInit.cpp
      storage/ndb/src/kernel/blocks/dblqh/DblqhMain.cpp
      storage/ndb/src/mgmsrv/ConfigInfo.cpp
      storage/ndb/test/run-test/conf-dl145a.cnf
      storage/ndb/test/run-test/conf-ndb07.cnf
=== modified file 'mysql-test/include/default_ndbd.cnf'
--- a/mysql-test/include/default_ndbd.cnf	2010-01-22 14:19:56 +0000
+++ b/mysql-test/include/default_ndbd.cnf	2011-02-15 11:41:27 +0000
@@ -14,6 +14,7 @@ TimeBetweenEpochs=             100
 NoOfFragmentLogFiles=          4
 FragmentLogFileSize=           12M
 DiskPageBufferMemory=          4M
+TwoPassInitialNodeRestartCopy=1
 
 # O_DIRECT has issues on 2.4 whach have not been handled, Bug #29612
 #ODirect= 1

=== modified file 'storage/ndb/include/kernel/signaldata/CopyFrag.hpp'
--- a/storage/ndb/include/kernel/signaldata/CopyFrag.hpp	2011-02-01 23:27:25 +0000
+++ b/storage/ndb/include/kernel/signaldata/CopyFrag.hpp	2011-02-15 11:41:27 +0000
@@ -32,11 +32,23 @@ class CopyFragReq {
    */
   friend class Dblqh;
 public:
-  STATIC_CONST( SignalLength = 10 );
+  STATIC_CONST( SignalLength = 11 );
 
 private:
-  Uint32 userPtr;
-  Uint32 userRef;
+
+  enum
+  {
+    CFR_TRANSACTIONAL = 1,    // Copy rows >= gci in transactional fashion
+    CFR_NON_TRANSACTIONAL = 2 // Copy rows <= gci in non transactional fashion
+  };
+  union {
+    Uint32 userPtr;
+    Uint32 senderData;
+  };
+  union {
+    Uint32 userRef;
+    Uint32 senderRef;
+  };
   Uint32 tableId;
   Uint32 fragId;
   Uint32 nodeId;
@@ -46,6 +58,7 @@ private:
   Uint32 nodeCount;
   Uint32 nodeList[1];
   //Uint32 maxPage; is stored in nodeList[nodeCount]
+  //Uint32 requestInfo is stored after maxPage
 };
 
 class CopyFragConf {
@@ -62,7 +75,10 @@ public:
   STATIC_CONST( SignalLength = 7 );
 
 private:
-  Uint32 userPtr;
+  union {
+    Uint32 userPtr;
+    Uint32 senderData;
+  };
   Uint32 sendingNodeId;
   Uint32 startingNodeId;
   Uint32 tableId;

=== modified file 'storage/ndb/include/kernel/signaldata/StartFragReq.hpp'
--- a/storage/ndb/include/kernel/signaldata/StartFragReq.hpp	2011-02-01 23:27:25 +0000
+++ b/storage/ndb/include/kernel/signaldata/StartFragReq.hpp	2011-02-15 11:41:27 +0000
@@ -32,10 +32,16 @@ class StartFragReq {
    */
   friend class Dblqh;
 public:
-  STATIC_CONST( SignalLength = 19 );
+  STATIC_CONST( SignalLength = 20 );
 
   friend bool printSTART_FRAG_REQ(FILE *, const Uint32 *, Uint32, Uint16);  
   
+  enum
+  {
+    SFR_RESTORE_LCP = 1,
+    SFR_COPY_FRAG = 2
+  };
+
   Uint32 userPtr;
   Uint32 userRef;
   Uint32 lcpNo;
@@ -46,5 +52,6 @@ public:
   Uint32 lqhLogNode[4];
   Uint32 startGci[4];
   Uint32 lastGci[4];
+  Uint32 requestInfo;
 };
 #endif

=== modified file 'storage/ndb/include/mgmapi/mgmapi_config_parameters.h'
--- a/storage/ndb/include/mgmapi/mgmapi_config_parameters.h	2011-02-01 23:27:25 +0000
+++ b/storage/ndb/include/mgmapi/mgmapi_config_parameters.h	2011-02-15 11:41:27 +0000
@@ -179,6 +179,8 @@
 #define CFG_DB_NUMA                   614
 #define CFG_DB_LATE_ALLOC             615
 
+#define CFG_DB_2PASS_INR              616
+
 #define CFG_NODE_ARBIT_RANK           200
 #define CFG_NODE_ARBIT_DELAY          201
 #define CFG_RESERVED_SEND_BUFFER_MEMORY 202

=== modified file 'storage/ndb/include/ndb_version.h.in'
--- a/storage/ndb/include/ndb_version.h.in	2011-02-03 14:20:36 +0000
+++ b/storage/ndb/include/ndb_version.h.in	2011-02-15 11:41:27 +0000
@@ -574,4 +574,24 @@ ndbd_sync_req_support(Uint32 x)
   return x >= NDBD_SYNC_REQ_SUPPORT_71;
 }
 
+/**
+ * Oldest versions supporting CopyFragReq::CFR_NON_TRANSACTIONAL (7.0.x / other)
+ */
+#define NDBD_NON_TRANS_COPY_FRAG_REQ_70 NDB_MAKE_VERSION(7,0,22)
+#define NDBD_NON_TRANS_COPY_FRAG_REQ_71 NDB_MAKE_VERSION(7,1,11)
+
+static
+inline
+int
+ndbd_non_trans_copy_frag_req(Uint32 x) /* non-zero if version x is new enough */
+{
+  const Uint32 major = (x >> 16) & 0xFF; /* bits 16..23 of x */
+  const Uint32 minor = (x >>  8) & 0xFF; /* bits 8..15 of x */
+
+  if (major == 7 && minor == 0) /* the 7.0 series has its own threshold */
+    return x >= NDBD_NON_TRANS_COPY_FRAG_REQ_70;
+
+  return x >= NDBD_NON_TRANS_COPY_FRAG_REQ_71; /* all other versions: 7.1 threshold */
+}
+
 #endif

=== modified file 'storage/ndb/src/kernel/blocks/dbdih/Dbdih.hpp'
--- a/storage/ndb/src/kernel/blocks/dbdih/Dbdih.hpp	2011-02-01 23:27:25 +0000
+++ b/storage/ndb/src/kernel/blocks/dbdih/Dbdih.hpp	2011-02-15 11:41:27 +0000
@@ -1857,6 +1857,8 @@ private:
     return instanceKey;
   }
   Uint32 dihGetInstanceKey(Uint32 tabId, Uint32 fragId);
+
+  bool c_2pass_inr;
 };
 
 #if (DIH_CDATA_SIZE < _SYSFILE_SIZE32)

=== modified file 'storage/ndb/src/kernel/blocks/dbdih/DbdihInit.cpp'
--- a/storage/ndb/src/kernel/blocks/dbdih/DbdihInit.cpp	2011-02-01 23:27:25 +0000
+++ b/storage/ndb/src/kernel/blocks/dbdih/DbdihInit.cpp	2011-02-15 11:41:27 +0000
@@ -70,6 +70,7 @@ void Dbdih::initData() 
   cntrlblockref    = RNIL;
   c_set_initial_start_flag = FALSE;
   c_sr_wait_to = false;
+  c_2pass_inr = false;
 }//Dbdih::initData()
 
 void Dbdih::initRecords() 

=== modified file 'storage/ndb/src/kernel/blocks/dbdih/DbdihMain.cpp'
--- a/storage/ndb/src/kernel/blocks/dbdih/DbdihMain.cpp	2011-02-03 14:20:36 +0000
+++ b/storage/ndb/src/kernel/blocks/dbdih/DbdihMain.cpp	2011-02-15 11:41:27 +0000
@@ -1023,7 +1023,6 @@ void Dbdih::execFSCLOSEREF(Signal* signa
     sprintf(msg, "File system close failed during FileRecord status %d", (Uint32)status);
     fsRefError(signal,__LINE__,msg);
   }
-
   return;
 }//Dbdih::execFSCLOSEREF()
 
@@ -1325,6 +1324,13 @@ void Dbdih::execREAD_CONFIG_REQ(Signal* 
   initRecords();
   initialiseRecordsLab(signal, 0, ref, senderData);
 
+  {
+    Uint32 val = 0;
+    ndb_mgm_get_int_parameter(p, CFG_DB_2PASS_INR,
+                              &val);
+    c_2pass_inr = val ? true : false;
+  }
+
   /**
    * Set API assigned nodegroup(s)
    */
@@ -1368,7 +1374,6 @@ void Dbdih::execREAD_CONFIG_REQ(Signal* 
       }
     }
   }
-
   return;
 }//Dbdih::execSIZEALT_REP()
 
@@ -1980,7 +1985,33 @@ void Dbdih::execREAD_NODESCONF(Signal* s
     }//if
   }//for  
   nodeArray[index] = RNIL; // terminate
-  
+
+  if (c_2pass_inr)
+  {
+    jam();
+    printf("Checking 2-pass initial node restart: ");
+    for (i = 0; i<index; i++)
+    {
+      if (!ndbd_non_trans_copy_frag_req(getNodeInfo(nodeArray[i]).m_version))
+      {
+        jam();
+        c_2pass_inr = false;
+        printf("not ok (node %u) => disabled\n", nodeArray[i]);
+        break;
+      }
+    }
+    if (c_2pass_inr)
+      printf("ok\n");
+
+    /**
+     * Note: In theory it would be enough if just the nodes that we plan to
+     *   copy from supported this...but in e.g. a 3/4-replica scenario,
+     *      if one of the nodes does and the other doesn't, we don't
+     *      have enough infrastructure to easily check this...
+     *      therefore we require all nodes to support it.
+     */
+  }
+
   if(cstarttype == NodeState::ST_SYSTEM_RESTART || 
      cstarttype == NodeState::ST_NODE_RESTART)
   {
@@ -3646,32 +3677,40 @@ Dbdih::nr_start_fragment(Signal* signal,
   
   Uint32 gci = 0;
   Uint32 restorableGCI = takeOverPtr.p->restorableGci;
-  
+
+#if defined VM_TRACE || defined ERROR_INSERT
   ndbout_c("tab: %d frag: %d replicaP->nextLcp: %d",
 	   takeOverPtr.p->toCurrentTabref,
 	   takeOverPtr.p->toCurrentFragid,
 	   replicaPtr.p->nextLcp);
-  
+#endif
+
   Int32 j = replicaPtr.p->noCrashedReplicas - 1;
   Uint32 idx = prevLcpNo(replicaPtr.p->nextLcp);
   for(i = 0; i<MAX_LCP_USED; i++, idx = prevLcpNo(idx))
   {
+#if defined VM_TRACE || defined ERROR_INSERT
     printf("scanning idx: %d lcpId: %d crashed replicas: %u %s", 
            idx, replicaPtr.p->lcpId[idx],
            replicaPtr.p->noCrashedReplicas,
            replicaPtr.p->lcpStatus[idx] == ZVALID ? "VALID" : "NOT VALID");
+#endif
     if (replicaPtr.p->lcpStatus[idx] == ZVALID) 
     {
       Uint32 startGci = replicaPtr.p->maxGciCompleted[idx] + 1;
       Uint32 stopGci = replicaPtr.p->maxGciStarted[idx];
+#if defined VM_TRACE || defined ERROR_INSERT
       ndbout_c(" maxGciCompleted: %u maxGciStarted: %u", startGci - 1, stopGci);
+#endif
       for (; j>= 0; j--)
       {
+#if defined VM_TRACE || defined ERROR_INSERT
 	ndbout_c("crashed replica: %d(%d) replica(createGci: %u lastGci: %d )",
 		 j, 
 		 replicaPtr.p->noCrashedReplicas,
                  replicaPtr.p->createGci[j],
 		 replicaPtr.p->replicaLastGci[j]);
+#endif
 	if (replicaPtr.p->createGci[j] <= startGci &&
             replicaPtr.p->replicaLastGci[j] >= stopGci)
 	{
@@ -3684,23 +3723,29 @@ Dbdih::nr_start_fragment(Signal* signal,
     }
     else
     {
+#if defined VM_TRACE || defined ERROR_INSERT
       printf("\n");
+#endif
     }
   }
   
   idx = 2; // backward compat code
+#if defined VM_TRACE || defined ERROR_INSERT
   ndbout_c("- scanning idx: %d lcpId: %d", idx, replicaPtr.p->lcpId[idx]);
+#endif
   if (replicaPtr.p->lcpStatus[idx] == ZVALID) 
   {
     Uint32 startGci = replicaPtr.p->maxGciCompleted[idx] + 1;
     Uint32 stopGci = replicaPtr.p->maxGciStarted[idx];
     for (;j >= 0; j--)
     {
+#if defined VM_TRACE || defined ERROR_INSERT
       ndbout_c("crashed replica: %d(%d) replica(createGci: %u lastGci: %d )",
                j, 
                replicaPtr.p->noCrashedReplicas,
                replicaPtr.p->createGci[j],
                replicaPtr.p->replicaLastGci[j]);
+#endif
       if (replicaPtr.p->createGci[j] <= startGci &&
           replicaPtr.p->replicaLastGci[j] >= stopGci)
       {
@@ -3714,17 +3759,17 @@ Dbdih::nr_start_fragment(Signal* signal,
   
 done:
   
+  StartFragReq *req = (StartFragReq *)signal->getDataPtrSend();
+  req->requestInfo = StartFragReq::SFR_RESTORE_LCP;
   if (maxLcpIndex == ~ (Uint32) 0)
   {
+    /**
+     * we didn't find a local LCP that we can restore
+     */
     jam();
     ndbassert(gci == 0);
     replicaPtr.p->m_restorable_gci = gci;
-    ndbout_c("Didnt find any LCP for node: %d tab: %d frag: %d",
-	     takeOverPtr.p->toStartingNode,
-	     takeOverPtr.p->toCurrentTabref,
-	     takeOverPtr.p->toCurrentFragid);
 
-    StartFragReq *req = (StartFragReq *)signal->getDataPtrSend();
     req->userPtr = 0;
     req->userRef = reference();
     req->lcpNo = ZNIL;
@@ -3733,6 +3778,60 @@ done:
     req->fragId = takeOverPtr.p->toCurrentFragid;
     req->noOfLogNodes = 0;
 
+    if (c_2pass_inr)
+    {
+      /**
+       * Check if we can make a 2-phase copy
+       *   1) non-transactional (after which we rebuild indexes)
+       *   2) transactional (maintaining indexes during rebuild)
+       *      where the transactional pass copies everything >= startGci
+       *
+       * NOTE: c_2pass_inr is only set if all nodes currently in the cluster
+       *       support this
+       */
+
+      if (takeOverPtr.p->startGci == 0)
+      {
+        jam();
+        /**
+         * Set startGci to the current lastCompletedGCI of the master;
+         *   any value will do...as long as the subsequent transactional copy
+         *   will be using it (scanning >= this value)
+         */
+        takeOverPtr.p->startGci = SYSFILE->lastCompletedGCI[cmasterNodeId];
+      }
+
+      TabRecordPtr tabPtr;
+      tabPtr.i = takeOverPtr.p->toCurrentTabref;
+      ptrCheckGuard(tabPtr, ctabFileSize, tabRecord);
+
+      FragmentstorePtr fragPtr;
+      getFragstore(tabPtr.p, takeOverPtr.p->toCurrentFragid, fragPtr);
+      Uint32 nodes[MAX_REPLICAS];
+      extractNodeInfo(fragPtr.p, nodes);
+
+      req->lqhLogNode[0] = nodes[0]; // Source
+      req->requestInfo = StartFragReq::SFR_COPY_FRAG;
+      replicaPtr.p->m_restorable_gci = takeOverPtr.p->startGci;
+    }
+
+    if (req->requestInfo == StartFragReq::SFR_RESTORE_LCP)
+    {
+      ndbout_c("node: %d tab: %d frag: %d no lcp to restore",
+               takeOverPtr.p->toStartingNode,
+               takeOverPtr.p->toCurrentTabref,
+               takeOverPtr.p->toCurrentFragid);
+    }
+    else
+    {
+      ndbout_c("node: %d tab: %d frag: %d copying data from %u (gci: %u)",
+               takeOverPtr.p->toStartingNode,
+               takeOverPtr.p->toCurrentTabref,
+               takeOverPtr.p->toCurrentFragid,
+               req->lqhLogNode[0],
+               req->lcpId);
+    }
+
     BlockReference ref = numberToRef(DBLQH, takeOverPtr.p->toStartingNode);
     sendSignal(ref, GSN_START_FRAGREQ, signal, 
 	       StartFragReq::SignalLength, JBB);
@@ -3748,17 +3847,19 @@ done:
 
       FragmentstorePtr fragPtr;
       getFragstore(tabPtr.p, takeOverPtr.p->toCurrentFragid, fragPtr);
-      
       dump_replica_info(fragPtr.p);
     }
     ndbassert(gci == restorableGCI);
     replicaPtr.p->m_restorable_gci = gci;
     Uint32 startGci = replicaPtr.p->maxGciCompleted[maxLcpIndex] + 1;
-    ndbout_c("Found LCP: %d(%d) maxGciStarted: %d maxGciCompleted: %d restorable: %d(%d) newestRestorableGCI: %d",
+    ndbout_c("node: %d tab: %d frag: %d restore lcp: %u(idx: %u) maxGciStarted: %u maxGciCompleted: %u (restorable: %u(%u) newestRestorableGCI: %u)",
+             takeOverPtr.p->toStartingNode,
+             takeOverPtr.p->toCurrentTabref,
+             takeOverPtr.p->toCurrentFragid,
 	     maxLcpId,
-	     maxLcpIndex,
+             maxLcpIndex,
 	     replicaPtr.p->maxGciStarted[maxLcpIndex],
-	     replicaPtr.p->maxGciCompleted[maxLcpIndex],	     
+	     replicaPtr.p->maxGciCompleted[maxLcpIndex],
 	     restorableGCI,
 	     SYSFILE->lastCompletedGCI[takeOverPtr.p->toStartingNode],
 	     SYSFILE->newestRestorableGCI);
@@ -4238,7 +4339,8 @@ Dbdih::toStartCopyFrag(Signal* signal, T
     extractNodeInfo(fragPtr.p, 
                     copyFragReq->nodeList);
   copyFragReq->nodeList[len] = takeOverPtr.p->maxPage;
-  sendSignal(ref, GSN_COPY_FRAGREQ, signal, 
+  copyFragReq->nodeList[len+1] = CopyFragReq::CFR_TRANSACTIONAL;
+  sendSignal(ref, GSN_COPY_FRAGREQ, signal,
              CopyFragReq::SignalLength + len, JBB);
 }//Dbdih::toStartCopy()
 

=== modified file 'storage/ndb/src/kernel/blocks/dblqh/Dblqh.hpp'
--- a/storage/ndb/src/kernel/blocks/dblqh/Dblqh.hpp	2011-02-01 23:27:25 +0000
+++ b/storage/ndb/src/kernel/blocks/dblqh/Dblqh.hpp	2011-02-15 11:41:27 +0000
@@ -2246,6 +2246,8 @@ private:
   void execSTORED_PROCCONF(Signal* signal);
   void execSTORED_PROCREF(Signal* signal);
   void execCOPY_FRAGREQ(Signal* signal);
+  void execCOPY_FRAGREF(Signal* signal);
+  void execCOPY_FRAGCONF(Signal* signal);
   void execPREPARE_COPY_FRAG_REQ(Signal* signal);
   void execUPDATE_FRAG_DIST_KEY_ORD(Signal*);
   void execCOPY_ACTIVEREQ(Signal* signal);
@@ -2643,6 +2645,7 @@ private:
   void restartOperationsAfterStopLab(Signal* signal);
   void startphase1Lab(Signal* signal, Uint32 config, Uint32 nodeId);
   void tupkeyConfLab(Signal* signal);
+  void copyTupkeyRefLab(Signal* signal);
   void copyTupkeyConfLab(Signal* signal);
   void scanTupkeyConfLab(Signal* signal);
   void scanTupkeyRefLab(Signal* signal);

=== modified file 'storage/ndb/src/kernel/blocks/dblqh/DblqhInit.cpp'
--- a/storage/ndb/src/kernel/blocks/dblqh/DblqhInit.cpp	2011-02-01 23:27:25 +0000
+++ b/storage/ndb/src/kernel/blocks/dblqh/DblqhInit.cpp	2011-02-15 11:41:27 +0000
@@ -348,6 +348,8 @@ Dblqh::Dblqh(Block_context& ctx, Uint32 
   addRecSignal(GSN_STORED_PROCCONF, &Dblqh::execSTORED_PROCCONF);
   addRecSignal(GSN_STORED_PROCREF, &Dblqh::execSTORED_PROCREF);
   addRecSignal(GSN_COPY_FRAGREQ, &Dblqh::execCOPY_FRAGREQ);
+  addRecSignal(GSN_COPY_FRAGREF, &Dblqh::execCOPY_FRAGREF);
+  addRecSignal(GSN_COPY_FRAGCONF, &Dblqh::execCOPY_FRAGCONF);
   addRecSignal(GSN_COPY_ACTIVEREQ, &Dblqh::execCOPY_ACTIVEREQ);
   addRecSignal(GSN_COPY_STATEREQ, &Dblqh::execCOPY_STATEREQ);
   addRecSignal(GSN_LQH_TRANSREQ, &Dblqh::execLQH_TRANSREQ);

=== modified file 'storage/ndb/src/kernel/blocks/dblqh/DblqhMain.cpp'
--- a/storage/ndb/src/kernel/blocks/dblqh/DblqhMain.cpp	2011-01-31 17:37:07 +0000
+++ b/storage/ndb/src/kernel/blocks/dblqh/DblqhMain.cpp	2011-02-15 11:41:27 +0000
@@ -3520,7 +3520,7 @@ void Dblqh::execTUPKEYREF(Signal* signal
     abortErrorLab(signal);
     break;
   case TcConnectionrec::COPY_TUPKEY:
-    ndbrequire(false);
+    copyTupkeyRefLab(signal);
     break;
   case TcConnectionrec::SCAN_TUPKEY:
     jam();
@@ -11806,8 +11806,6 @@ void Dblqh::execCOPY_FRAGREQ(Signal* sig
   ndbrequire(cfirstfreeTcConrec != RNIL);
   ndbrequire(fragptr.p->m_scanNumberMask.get(NR_ScanNo));
 
-  Uint32 key = fragptr.p->fragDistributionKey = copyFragReq->distributionKey;
-  
   Uint32 checkversion = NDB_VERSION >= MAKE_VERSION(5,1,0) ?
     NDBD_UPDATE_FRAG_DIST_KEY_51 :  NDBD_UPDATE_FRAG_DIST_KEY_50;
   
@@ -11821,12 +11819,29 @@ void Dblqh::execCOPY_FRAGREQ(Signal* sig
   }
   Uint32 maxPage = copyFragReq->nodeList[nodeCount];
   Uint32 version = getNodeInfo(refToNode(userRef)).m_version;
+  Uint32 requestInfo = copyFragReq->nodeList[nodeCount + 1];
   if (ndb_check_prep_copy_frag_version(version) < 2)
   {
     jam();
     maxPage = RNIL;
   }
-    
+
+  if (signal->getLength() < CopyFragReq::SignalLength + nodeCount)
+  {
+    jam();
+    requestInfo = CopyFragReq::CFR_TRANSACTIONAL;
+  }
+
+  if (requestInfo == CopyFragReq::CFR_NON_TRANSACTIONAL)
+  {
+    jam();
+  }
+  else
+  {
+    fragptr.p->fragDistributionKey = copyFragReq->distributionKey;
+  }
+  Uint32 key = fragptr.p->fragDistributionKey;
+
   if (DictTabInfo::isOrderedIndex(tabptr.p->tableType)) {
     jam();
     /**
@@ -11842,7 +11857,7 @@ void Dblqh::execCOPY_FRAGREQ(Signal* sig
 	       CopyFragConf::SignalLength, JBB);
     return;
   }//if
-  
+
   LocalDLList<ScanRecord> scans(c_scanRecordPool, fragptr.p->m_activeScans);
   ndbrequire(m_reserved_scans.first(scanptr));
   m_reserved_scans.remove(scanptr);
@@ -11882,6 +11897,7 @@ void Dblqh::execCOPY_FRAGREQ(Signal* sig
   scanptr.p->scanLockHold = ZFALSE;
   scanptr.p->m_curr_batch_size_rows = 0;
   scanptr.p->m_curr_batch_size_bytes= 0;
+  scanptr.p->readCommitted = 0;
   
   initScanTc(0,
              0,
@@ -11905,10 +11921,30 @@ void Dblqh::execCOPY_FRAGREQ(Signal* sig
   req->tableId = tabptr.i;
   req->fragmentNo = fragId;
   req->requestInfo = 0;
-  AccScanReq::setLockMode(req->requestInfo, 0);
-  AccScanReq::setReadCommittedFlag(req->requestInfo, 0);
-  AccScanReq::setNRScanFlag(req->requestInfo, 1);
-  AccScanReq::setNoDiskScanFlag(req->requestInfo, 1);
+
+  if (requestInfo == CopyFragReq::CFR_TRANSACTIONAL)
+  {
+    jam();
+    /**
+     * A node-recovery scan uses a shared lock
+     *   and may not perform disk-scan (as it then can miss uncommitted inserts)
+     */
+    AccScanReq::setLockMode(req->requestInfo, 0);
+    AccScanReq::setReadCommittedFlag(req->requestInfo, 0);
+    AccScanReq::setNRScanFlag(req->requestInfo, 1);
+    AccScanReq::setNoDiskScanFlag(req->requestInfo, 1);
+  }
+  else
+  {
+    jam();
+    /**
+     * The non-transactional scan is really only a "normal" tup scan:
+     *   committed read, and disk-scan is not disabled
+     */
+    AccScanReq::setLockMode(req->requestInfo, 0);
+    AccScanReq::setReadCommittedFlag(req->requestInfo, 1);
+    scanptr.p->readCommitted = 1;
+  }
 
   req->transId1 = tcConnectptr.p->transid[0];
   req->transId2 = tcConnectptr.p->transid[1];
@@ -12235,6 +12271,51 @@ void Dblqh::execTRANSID_AI(Signal* signa
 /*--------------------------------------------------------------------------*/
 /*  PRECONDITION:   TRANSACTION_STATE = COPY_TUPKEY                         */
 /*--------------------------------------------------------------------------*/
+void Dblqh::copyTupkeyRefLab(Signal* signal) // TUPKEYREF path for state COPY_TUPKEY (called from execTUPKEYREF)
+{
+  //const TupKeyRef * tupKeyRef = (TupKeyRef *)signal->getDataPtr();
+
+  scanptr.i = tcConnectptr.p->tcScanRec; // scan record referenced by the current operation record
+  c_scanRecordPool.getPtr(scanptr);
+  ScanRecord* scanP = scanptr.p;
+
+  if (scanP->readCommitted == 0)
+  {
+    jam();
+    ndbrequire(false); // Should not be possible...we read with lock
+  }
+  else
+  {
+    jam();
+    /**
+     * Any readCommitted scan can get 626 if it finds a candidate record
+     *   that is not visible to the scan (i.e. uncommitted inserts).
+     *   When scanning with locks (shared/exclusive) this is not visible
+     *   to LQH, as the lock is taken earlier
+     */
+    ndbrequire(terrorCode == 626); // any other error code is treated as fatal here
+  }
+
+  ndbrequire(scanptr.p->scanState == ScanRecord::WAIT_TUPKEY_COPY);
+  if (tcConnectptr.p->errorCode != 0) // an error is recorded on the operation: close the copy
+  {
+    jam();
+    closeCopyLab(signal);
+    return;
+  }
+
+  if (scanptr.p->scanCompletedStatus == ZTRUE) // scan already flagged as completed: close the copy
+  {
+    jam();
+    closeCopyLab(signal);
+    return;
+  }
+
+  ndbrequire(tcConnectptr.p->copyCountWords < cmaxWordsAtNodeRec);
+  scanptr.p->scanState = ScanRecord::WAIT_LQHKEY_COPY;
+  nextRecordCopy(signal); // nothing is sent for the refused row; continue via nextRecordCopy()
+}
+
 void Dblqh::copyTupkeyConfLab(Signal* signal) 
 {
   const TupKeyConf * const tupKeyConf = (TupKeyConf *)signal->getDataPtr();
@@ -12245,10 +12326,14 @@ void Dblqh::copyTupkeyConfLab(Signal* si
   c_scanRecordPool.getPtr(scanptr);
   ScanRecord* scanP = scanptr.p;
 
-  Uint32 accOpPtr= get_acc_ptr_from_scan_record(scanP, 0, false);
-  ndbassert(accOpPtr != (Uint32)-1);
-  c_acc->execACCKEY_ORD(signal, accOpPtr);
-  
+  if (scanP->readCommitted == 0)
+  {
+    jam();
+    Uint32 accOpPtr= get_acc_ptr_from_scan_record(scanP, 0, false);
+    ndbassert(accOpPtr != (Uint32)-1);
+    c_acc->execACCKEY_ORD(signal, accOpPtr);
+  }
+
   if (tcConnectptr.p->errorCode != 0) {
     jam();
     closeCopyLab(signal);
@@ -16242,6 +16327,12 @@ void Dblqh::execSTART_FRAGREQ(Signal* si
   Uint32 lcpNo = startFragReq->lcpNo;
   Uint32 noOfLogNodes = startFragReq->noOfLogNodes;
   Uint32 lcpId = startFragReq->lcpId;
+  Uint32 requestInfo = startFragReq->requestInfo;
+  if (signal->getLength() < StartFragReq::SignalLength)
+  {
+    jam();
+    requestInfo = StartFragReq::SFR_RESTORE_LCP;
+  }
 
   bool doprint = false;
 #ifdef ERROR_INSERT
@@ -16275,6 +16366,16 @@ void Dblqh::execSTART_FRAGREQ(Signal* si
   fragptr.p->logFlag = Fragrecord::STATE_FALSE;
   fragptr.p->srStatus = Fragrecord::SS_IDLE;
 
+  if (requestInfo == StartFragReq::SFR_COPY_FRAG)
+  {
+    ndbrequire(lcpNo == ZNIL);
+    Uint32 n = fragptr.p->srLqhLognode[0] = startFragReq->lqhLogNode[0]; // src
+    ndbrequire(ndbd_non_trans_copy_frag_req(getNodeInfo(n).m_version));
+
+    // Magic number, meaning: send COPY_FRAGREQ instead of reading from disk
+    fragptr.p->srChkpnr = Z8NIL;
+  }
+
   if (noOfLogNodes > 0) 
   {
     jam();
@@ -16300,7 +16401,11 @@ void Dblqh::execSTART_FRAGREQ(Signal* si
     }
   }//if
   
-  if (lcpNo == ZNIL)
+  if (requestInfo == StartFragReq::SFR_COPY_FRAG)
+  {
+    jam();
+  }
+  else if (lcpNo == ZNIL)
   {
     jam();
     /**
@@ -16326,15 +16431,19 @@ void Dblqh::execSTART_FRAGREQ(Signal* si
     jam();
     c_tup->disk_restart_lcp_id(tabptr.i, fragId, lcpId);
     jamEntry();
-  }
 
-  if (ERROR_INSERTED(5055))
-  {
-    ndbrequire(c_lcpId == 0 || lcpId == 0 || c_lcpId == lcpId);
+    if (ERROR_INSERTED(5055))
+    {
+      ndbrequire(c_lcpId == 0 || lcpId == 0 || c_lcpId == lcpId);
+    }
+
+    /**
+     * Keep track of minimal lcp-id
+     */
+    c_lcpId = (c_lcpId == 0 ? lcpId : c_lcpId);
+    c_lcpId = (c_lcpId < lcpId ? c_lcpId : lcpId);
   }
 
-  c_lcpId = (c_lcpId == 0 ? lcpId : c_lcpId);
-  c_lcpId = (c_lcpId < lcpId ? c_lcpId : lcpId);
   c_lcp_waiting_fragments.add(fragptr);
   if(c_lcp_restoring_fragments.isEmpty())
     send_restore_lcp(signal);
@@ -16346,18 +16455,85 @@ Dblqh::send_restore_lcp(Signal * signal)
   c_lcp_waiting_fragments.first(fragptr);
   c_lcp_waiting_fragments.remove(fragptr);
   c_lcp_restoring_fragments.add(fragptr);
-  
-  RestoreLcpReq* req= (RestoreLcpReq*)signal->getDataPtrSend();
-  req->senderData = fragptr.i;
-  req->senderRef = reference();
-  req->tableId = fragptr.p->tabRef;
-  req->fragmentId = fragptr.p->fragId;
-  req->lcpNo = fragptr.p->srChkpnr;
-  req->lcpId = fragptr.p->lcpId[fragptr.p->srChkpnr];
-  
-  BlockReference restoreRef = calcInstanceBlockRef(RESTORE);
-  sendSignal(restoreRef, GSN_RESTORE_LCP_REQ, signal, 
-	     RestoreLcpReq::SignalLength, JBB);
+
+  if (fragptr.p->srChkpnr != Z8NIL)
+  {
+    jam();
+    RestoreLcpReq* req= (RestoreLcpReq*)signal->getDataPtrSend();
+    req->senderData = fragptr.i;
+    req->senderRef = reference();
+    req->tableId = fragptr.p->tabRef;
+    req->fragmentId = fragptr.p->fragId;
+    req->lcpNo = fragptr.p->srChkpnr;
+    req->lcpId = fragptr.p->lcpId[fragptr.p->srChkpnr];
+    BlockReference restoreRef = calcInstanceBlockRef(RESTORE);
+    sendSignal(restoreRef, GSN_RESTORE_LCP_REQ, signal,
+               RestoreLcpReq::SignalLength, JBB);
+  }
+  else
+  {
+    jam();
+
+    tabptr.i = fragptr.p->tabRef;
+    ptrCheckGuard(tabptr, ctabrecFileSize, tablerec);
+
+    fragptr.p->fragStatus = Fragrecord::ACTIVE_CREATION;
+    CopyFragReq * req = CAST_PTR(CopyFragReq, signal->getDataPtrSend());
+    req->senderData = fragptr.i;
+    req->senderRef = reference();
+    req->tableId = fragptr.p->tabRef;
+    req->fragId = fragptr.p->fragId;
+    req->nodeId = getOwnNodeId();
+    req->schemaVersion = tabptr.p->schemaVersion;
+    req->distributionKey = 0;
+    req->gci = fragptr.p->lcpId[0];
+    req->nodeCount = 0;
+    req->nodeList[1] = CopyFragReq::CFR_NON_TRANSACTIONAL;
+    Uint32 instanceKey = fragptr.p->lqhInstanceKey;
+    BlockReference ref = numberToRef(DBLQH, instanceKey,
+                                     fragptr.p->srLqhLognode[0]);
+
+    sendSignal(ref, GSN_COPY_FRAGREQ, signal,
+               CopyFragReq::SignalLength, JBB);
+
+  }
+}
+
+void
+Dblqh::execCOPY_FRAGREF(Signal* signal) // receiver registered for GSN_COPY_FRAGREF
+{
+  jamEntry();
+  ndbrequire(false); // no recovery path: receiving COPY_FRAGREF always fails this require
+}
+
+void
+Dblqh::execCOPY_FRAGCONF(Signal* signal) // receiver registered for GSN_COPY_FRAGCONF
+{
+  jamEntry();
+  { // step 1: update the fragment state and report the finished copy as an event
+    const CopyFragConf* conf = CAST_CONSTPTR(CopyFragConf,
+                                             signal->getDataPtr());
+    c_fragment_pool.getPtr(fragptr, conf->senderData); // senderData is the fragptr.i put in the COPY_FRAGREQ by send_restore_lcp
+    fragptr.p->fragStatus = Fragrecord::CRASH_RECOVERING; // send_restore_lcp had set ACTIVE_CREATION before sending the request
+
+    Uint32 rows_lo = conf->rows_lo; // read before theData[] is written below (conf presumably aliases the same signal buffer)
+    Uint32 bytes_lo = conf->bytes_lo;
+    signal->theData[0] = NDB_LE_NR_CopyFragDone;
+    signal->theData[1] = getOwnNodeId();
+    signal->theData[2] = fragptr.p->tabRef;
+    signal->theData[3] = fragptr.p->fragId;
+    signal->theData[4] = rows_lo;
+    signal->theData[5] = 0; // NOTE(review): presumably the high word of the row count - confirm
+    signal->theData[6] = bytes_lo;
+    signal->theData[7] = 0; // NOTE(review): presumably the high word of the byte count - confirm
+    sendSignal(CMVMI_REF, GSN_EVENT_REP, signal, 8, JBB);
+  }
+
+  { // step 2: continue through the same path as a finished LCP restore
+    RestoreLcpConf* conf= (RestoreLcpConf*)signal->getDataPtr();
+    conf->senderData = fragptr.i;
+    execRESTORE_LCP_CONF(signal); // direct call with a hand-built RESTORE_LCP_CONF for this fragment
+  }
+}
 
 void Dblqh::startFragRefLab(Signal* signal) 

=== modified file 'storage/ndb/src/mgmsrv/ConfigInfo.cpp'
--- a/storage/ndb/src/mgmsrv/ConfigInfo.cpp	2011-02-01 23:27:25 +0000
+++ b/storage/ndb/src/mgmsrv/ConfigInfo.cpp	2011-02-15 11:41:27 +0000
@@ -1911,6 +1911,21 @@ const ConfigInfo::ParamInfo ConfigInfo::
     "1"                      /* Max */
   },
 
+
+  {
+    CFG_DB_2PASS_INR,
+    "TwoPassInitialNodeRestartCopy",
+    DB_TOKEN,
+    "Copy data in 2 passes for initial node restart, "
+    "this enables multi-threaded-ordered index build for initial node restart",
+    ConfigInfo::CI_USED,
+    false,
+    ConfigInfo::CI_BOOL,
+    "false",
+    "false",                     /* Min */
+    "true"                       /* Max */
+  },
+
   /***************************************************************************
    * API
    ***************************************************************************/

=== modified file 'storage/ndb/test/run-test/conf-dl145a.cnf'
--- a/storage/ndb/test/run-test/conf-dl145a.cnf	2009-02-17 09:26:44 +0000
+++ b/storage/ndb/test/run-test/conf-dl145a.cnf	2011-02-15 11:41:27 +0000
@@ -31,3 +31,4 @@ ODirect=1
 SharedGlobalMemory=256M
 InitialLogfileGroup=undo_buffer_size=64M;undofile01.dat:256M;undofile02.dat:128M
 InitialTablespace=datafile01.dat:128M;datafile02.dat:64M
+TwoPassInitialNodeRestartCopy=1

=== modified file 'storage/ndb/test/run-test/conf-ndb07.cnf'
--- a/storage/ndb/test/run-test/conf-ndb07.cnf	2009-09-21 08:44:14 +0000
+++ b/storage/ndb/test/run-test/conf-ndb07.cnf	2011-02-15 11:41:27 +0000
@@ -39,3 +39,4 @@ FileSystemPathDataFiles=/data1/autotest
 FileSystemPathUndoFiles=/data2/autotest
 InitialLogfileGroup=undo_buffer_size=64M;undofile01.dat:256M;undofile02.dat:256M
 InitialTablespace=datafile01.dat:256M;datafile02.dat:256M
+TwoPassInitialNodeRestartCopy=1


Attachment: [text/bzr-bundle] bzr/jonas@mysql.com-20110215114127-83183eu4vvqf6ddn.bundle
Thread
bzr commit into mysql-5.1-telco-7.0 branch (jonas:4194) jonas oreland15 Feb