List:Commits« Previous MessageNext Message »
From:Stewart Smith Date:January 28 2006 4:41am
Subject:bk commit into 5.2 tree (stewart:1.2078)
View as plain text  
Below is the list of changes that have just been committed into a local
5.2 repository of stewart. When stewart does a push these changes will
be propagated to the main repository and, within 24 hours after the
push, to the public repository.
For information on how to access the public repository
see http://dev.mysql.com/doc/mysql/en/installing-source-tree.html

ChangeSet
  1.2078 06/01/28 17:41:44 stewart@stripped +2 -0
  Merge mysql.com:/home/stewart/Documents/MySQL/5.1/wl1504
  into  mysql.com:/home/stewart/Documents/MySQL/5.2/wl1504-add-node

  storage/ndb/src/kernel/blocks/suma/Suma.cpp
    1.30 06/01/28 17:41:40 stewart@stripped +0 -0
    Auto merged

  storage/ndb/src/kernel/blocks/dbdih/DbdihMain.cpp
    1.42 06/01/28 17:41:39 stewart@stripped +0 -0
    Auto merged

# This is a BitKeeper patch.  What follows are the unified diffs for the
# set of deltas contained in the patch.  The rest of the patch, the part
# that BitKeeper cares about, is below these diffs.
# User:	stewart
# Host:	willster.(none)
# Root:	/home/stewart/Documents/MySQL/5.2/wl1504-add-node/RESYNC

--- 1.41/storage/ndb/src/kernel/blocks/dbdih/DbdihMain.cpp	2005-12-03 23:29:58 +13:00
+++ 1.42/storage/ndb/src/kernel/blocks/dbdih/DbdihMain.cpp	2006-01-28 17:41:39 +13:00
@@ -67,6 +67,7 @@
 #include <signaldata/CreateFragmentation.hpp>
 #include <signaldata/LqhFrag.hpp>
 #include <signaldata/FsOpenReq.hpp>
+#include <signaldata/DihFragCount.hpp>
 #include <DebuggerNames.hpp>
 
 #include <EventLogger.hpp>
@@ -215,7 +216,7 @@
   signal->theData[2] = c_nodeStartMaster.failNr;
   signal->theData[3] = 0;
   signal->theData[4] = currentgcp;  
-  sendSignal(nodeDihRef, GSN_INCL_NODEREQ, signal, 5, JBB);
+  sendSignal(nodeDihRef, GSN_INCL_NODEREQ, signal, 5, JBA);
 }//Dbdih::sendINCL_NODEREQ()
 
 void Dbdih::sendMASTER_GCPREQ(Signal* signal, Uint32 nodeId)
@@ -609,6 +610,14 @@
     checkWaitDropTabFailedLqh(signal, nodeId, tableId);
     return;
   }
+  case DihContinueB::ZTO_START_FRAGMENTS:
+  {
+    TakeOverRecordPtr takeOverPtr;
+    takeOverPtr.i = signal->theData[1];
+    ptrCheckGuard(takeOverPtr, MAX_NDB_NODES, takeOverRecord);
+    nr_start_fragments(signal, takeOverPtr);
+    return;
+  }
   }//switch
   
   ndbrequire(false);
@@ -1817,11 +1826,6 @@
   ndbrequire(c_nodeStartMaster.startNode == Tnodeid);
   ndbrequire(getNodeStatus(Tnodeid) == NodeRecord::STARTING);
   
-  sendSTART_RECREQ(signal, Tnodeid);
-}//Dbdih::execSTART_MEREQ()
-
-void Dbdih::nodeRestartStartRecConfLab(Signal* signal) 
-{
   c_nodeStartMaster.blockLcp = true;
   if ((c_lcpState.lcpStatus != LCP_STATUS_IDLE) &&
       (c_lcpState.lcpStatus != LCP_TCGET)) {
@@ -1914,6 +1918,14 @@
   // global checkpoint id and the correct state. We do not wait for any reply
   // since the starting node will not send any.
   /*-------------------------------------------------------------------------*/
+  Uint32 startVersion = getNodeInfo(c_nodeStartMaster.startNode).m_version;
+  
+  if ((getMajor(startVersion) == 4 && startVersion >= NDBD_INCL_NODECONF_VERSION_4) ||
+      (getMajor(startVersion) == 5 && startVersion >= NDBD_INCL_NODECONF_VERSION_5))
+  {
+    c_INCL_NODEREQ_Counter.setWaitingFor(c_nodeStartMaster.startNode);
+  }
+  
   sendINCL_NODEREQ(signal, c_nodeStartMaster.startNode);
 }//Dbdih::gcpBlockedLab()
 
@@ -2116,6 +2128,13 @@
   jamEntry();
   Uint32 retRef = signal->theData[0];
   Uint32 nodeId = signal->theData[1];
+  if (nodeId == getOwnNodeId() && ERROR_INSERTED(7165))
+  {
+    CLEAR_ERROR_INSERT_VALUE;
+    sendSignalWithDelay(reference(), GSN_INCL_NODEREQ, signal, 5000, signal->getLength());
+    return;
+  }
+  
   Uint32 tnodeStartFailNr = signal->theData[2];
   currentgcp = signal->theData[4];
   CRASH_INSERTION(7127);
@@ -2143,6 +2162,15 @@
     // id's and the lcp status.
     /*-----------------------------------------------------------------------*/
     CRASH_INSERTION(7171);
+    Uint32 masterVersion = getNodeInfo(refToNode(cmasterdihref)).m_version;
+    
+    if ((NDB_VERSION_MAJOR == 4 && masterVersion >= NDBD_INCL_NODECONF_VERSION_4) ||
+	(NDB_VERSION_MAJOR == 5 && masterVersion >= NDBD_INCL_NODECONF_VERSION_5))
+    {
+      signal->theData[0] = getOwnNodeId();
+      signal->theData[1] = getOwnNodeId();
+      sendSignal(cmasterdihref, GSN_INCL_NODECONF, signal, 2, JBB);
+    }
     return;
   }//if
   if (getNodeStatus(nodeId) != NodeRecord::STARTING) {
@@ -2619,13 +2647,14 @@
     return;
   }//if
   c_startToLock = takeOverPtrI;
+
+  takeOverPtr.p->toMasterStatus = TakeOverRecord::STARTING;
   StartToReq * const req = (StartToReq *)&signal->theData[0];
   req->userPtr = takeOverPtr.i;
   req->userRef = reference();
   req->startingNodeId = takeOverPtr.p->toStartingNode;
   req->nodeTakenOver = takeOverPtr.p->toFailedNode;
   req->nodeRestart = takeOverPtr.p->toNodeRestart;
-  takeOverPtr.p->toMasterStatus = TakeOverRecord::STARTING;
   sendLoopMacro(START_TOREQ, sendSTART_TOREQ);
 }//Dbdih::sendStartTo()
 
@@ -2669,9 +2698,153 @@
   CRASH_INSERTION(7134);
   c_startToLock = RNIL;
 
+  if (takeOverPtr.p->toNodeRestart)
+  {
+    jam();
+    takeOverPtr.p->toMasterStatus = TakeOverRecord::STARTING_LOCAL_FRAGMENTS;
+    nr_start_fragments(signal, takeOverPtr);
+    return;
+  }
+
   startNextCopyFragment(signal, takeOverPtr.i);
 }//Dbdih::execSTART_TOCONF()
 
+void
+Dbdih::nr_start_fragments(Signal* signal, 
+			  TakeOverRecordPtr takeOverPtr)
+{
+  Uint32 loopCount = 0 ;
+  TabRecordPtr tabPtr;
+  while (loopCount++ < 100) {
+    tabPtr.i = takeOverPtr.p->toCurrentTabref;
+    if (tabPtr.i >= ctabFileSize) {
+      jam();
+      nr_run_redo(signal, takeOverPtr);
+      return;
+    }//if
+    ptrAss(tabPtr, tabRecord);
+    if (tabPtr.p->tabStatus != TabRecord::TS_ACTIVE){
+      jam();
+      takeOverPtr.p->toCurrentFragid = 0;
+      takeOverPtr.p->toCurrentTabref++;
+      continue;
+    }//if
+    Uint32 fragId = takeOverPtr.p->toCurrentFragid;
+    if (fragId >= tabPtr.p->totalfragments) {
+      jam();
+      takeOverPtr.p->toCurrentFragid = 0;
+      takeOverPtr.p->toCurrentTabref++;
+      continue;
+    }//if
+    FragmentstorePtr fragPtr;
+    getFragstore(tabPtr.p, fragId, fragPtr);
+    ReplicaRecordPtr loopReplicaPtr;
+    loopReplicaPtr.i = fragPtr.p->oldStoredReplicas;
+    while (loopReplicaPtr.i != RNIL) {
+      ptrCheckGuard(loopReplicaPtr, creplicaFileSize, replicaRecord);
+      if (loopReplicaPtr.p->procNode == takeOverPtr.p->toStartingNode) {
+        jam();
+	nr_start_fragment(signal, takeOverPtr, loopReplicaPtr);
+	break;
+      } else {
+        jam();
+        loopReplicaPtr.i = loopReplicaPtr.p->nextReplica;
+      }//if
+    }//while
+    takeOverPtr.p->toCurrentFragid++;
+  }//while
+  signal->theData[0] = DihContinueB::ZTO_START_FRAGMENTS;
+  signal->theData[1] = takeOverPtr.i;
+  sendSignal(reference(), GSN_CONTINUEB, signal, 2, JBB);
+}
+
+void
+Dbdih::nr_start_fragment(Signal* signal, 
+			 TakeOverRecordPtr takeOverPtr,
+			 ReplicaRecordPtr replicaPtr)
+{
+  Uint32 i, j = 0;
+  Uint32 maxLcpId = 0;
+  Uint32 maxLcpIndex = ~0;
+  
+  Uint32 restorableGCI = 0;
+  
+  ndbout_c("tab: %d frag: %d replicaP->nextLcp: %d",
+	   takeOverPtr.p->toCurrentTabref,
+	   takeOverPtr.p->toCurrentFragid,
+	   replicaPtr.p->nextLcp);
+  
+  Uint32 idx = replicaPtr.p->nextLcp;
+  for(i = 0; i<MAX_LCP_STORED; i++, idx = nextLcpNo(idx))
+  {
+    ndbout_c("scanning idx: %d lcpId: %d", idx, replicaPtr.p->lcpId[idx]);
+    if (replicaPtr.p->lcpStatus[idx] == ZVALID) 
+    {
+      ndbrequire(replicaPtr.p->lcpId[idx] > maxLcpId);
+      Uint32 startGci = replicaPtr.p->maxGciCompleted[idx];
+      Uint32 stopGci = replicaPtr.p->maxGciStarted[idx];
+      for (;j < replicaPtr.p->noCrashedReplicas; j++)
+      {
+	ndbout_c("crashed replica: %d(%d) replicaLastGci: %d",
+		 j, 
+		 replicaPtr.p->noCrashedReplicas,
+		 replicaPtr.p->replicaLastGci[j]);
+	if (replicaPtr.p->replicaLastGci[j] > stopGci)
+	{
+	  maxLcpId = replicaPtr.p->lcpId[idx];
+	  maxLcpIndex = idx;
+	  restorableGCI = replicaPtr.p->replicaLastGci[j];
+	  break;
+	}
+      }
+    }
+  }
+  
+  if (maxLcpIndex == ~0)
+  {
+    ndbout_c("Didnt find any LCP for node: %d tab: %d frag: %d",
+	     takeOverPtr.p->toStartingNode,
+	     takeOverPtr.p->toCurrentTabref,
+	     takeOverPtr.p->toCurrentFragid);
+    replicaPtr.p->lcpIdStarted = 0;
+  }
+  else
+  {
+    ndbout_c("Found LCP: %d(%d) maxGciStarted: %d maxGciCompleted: %d restorable: %d(%d) newestRestorableGCI: %d",
+	     maxLcpId,
+	     maxLcpIndex,
+	     replicaPtr.p->maxGciStarted[maxLcpIndex],
+	     replicaPtr.p->maxGciCompleted[maxLcpIndex],	     
+	     restorableGCI,
+	     SYSFILE->lastCompletedGCI[takeOverPtr.p->toStartingNode],
+	     SYSFILE->newestRestorableGCI);
+
+    replicaPtr.p->lcpIdStarted = restorableGCI;
+    BlockReference ref = calcLqhBlockRef(takeOverPtr.p->toStartingNode);
+    StartFragReq *req = (StartFragReq *)signal->getDataPtrSend();
+    req->userPtr = 0;
+    req->userRef = reference();
+    req->lcpNo = maxLcpIndex;
+    req->lcpId = maxLcpId;
+    req->tableId = takeOverPtr.p->toCurrentTabref;
+    req->fragId = takeOverPtr.p->toCurrentFragid;
+    req->noOfLogNodes = 1;
+    req->lqhLogNode[0] = takeOverPtr.p->toStartingNode;
+    req->startGci[0] = replicaPtr.p->maxGciCompleted[maxLcpIndex];
+    req->lastGci[0] = restorableGCI;
+    sendSignal(ref, GSN_START_FRAGREQ, signal, 
+	       StartFragReq::SignalLength, JBB);
+  }
+}
+
+void
+Dbdih::nr_run_redo(Signal* signal, TakeOverRecordPtr takeOverPtr)
+{
+  takeOverPtr.p->toCurrentTabref = 0;
+  takeOverPtr.p->toCurrentFragid = 0;
+  sendSTART_RECREQ(signal, takeOverPtr.p->toStartingNode);
+}
+
 void Dbdih::initStartTakeOver(const StartToReq * req, 
 			      TakeOverRecordPtr takeOverPtr)
 {
@@ -3004,6 +3177,14 @@
     /*---------------------------------------------------------------------- */
     FragmentstorePtr fragPtr;
     getFragstore(tabPtr.p, fragId, fragPtr);
+    Uint32 gci = 0;
+    if (takeOverPtr.p->toNodeRestart)
+    {
+      ReplicaRecordPtr replicaPtr;
+      findReplica(replicaPtr, fragPtr.p, takeOverPtr.p->toStartingNode, true);
+      gci = replicaPtr.p->lcpIdStarted;
+      replicaPtr.p->lcpIdStarted = 0;
+    }
     takeOverPtr.p->toMasterStatus = TakeOverRecord::COPY_FRAG;
     BlockReference ref = calcLqhBlockRef(takeOverPtr.p->toCopyNode);
     CopyFragReq * const copyFragReq = (CopyFragReq *)&signal->theData[0];
@@ -3014,6 +3195,7 @@
     copyFragReq->nodeId = takeOverPtr.p->toStartingNode;
     copyFragReq->schemaVersion = tabPtr.p->schemaVersion;
     copyFragReq->distributionKey = fragPtr.p->distributionKey;
+    copyFragReq->gci = gci;
     sendSignal(ref, GSN_COPY_FRAGREQ, signal, CopyFragReq::SignalLength, JBB);
   } else {
     ndbrequire(takeOverPtr.p->toMasterStatus == TakeOverRecord::COMMIT_CREATE);
@@ -3804,9 +3986,23 @@
   /*------------------------------------------------------------------------*/
   // Verify that a starting node has also crashed. Reset the node start record.
   /*-------------------------------------------------------------------------*/
-  if (c_nodeStartMaster.startNode != RNIL) {
-    ndbrequire(getNodeStatus(c_nodeStartMaster.startNode)!= NodeRecord::ALIVE);
+#if 0
+  /**
+   * Node will crash by itself...
+   *   nodeRestart is run then...
+   */
+  if (false && c_nodeStartMaster.startNode != RNIL && getNodeStatus(c_nodeStartMaster.startNode) == NodeRecord::ALIVE)
+  {
+    BlockReference cntrRef = calcNdbCntrBlockRef(c_nodeStartMaster.startNode);
+    SystemError * const sysErr = (SystemError*)&signal->theData[0];
+    sysErr->errorCode = SystemError::StartInProgressError;
+    sysErr->errorRef = reference();
+    sysErr->data1= 0;
+    sysErr->data2= __LINE__;
+    sendSignal(cntrRef, GSN_SYSTEM_ERROR, signal,  SystemError::SignalLength, JBA);
+    nodeResetStart();  
   }//if
+#endif
 
   /*--------------------------------------------------*/
   /*                                                  */
@@ -4052,6 +4248,8 @@
 						  Uint32 takeOverPtrI)
 {
   jam();
+  ndbout_c("checkTakeOverInMasterStartNodeFailure %x",
+	   takeOverPtrI);
   if (takeOverPtrI == RNIL) {
     jam();
     return;
@@ -4065,6 +4263,9 @@
   takeOverPtr.i = takeOverPtrI;
   ptrCheckGuard(takeOverPtr, MAX_NDB_NODES, takeOverRecord);
 
+  ndbout_c("takeOverPtr.p->toMasterStatus: %x", 
+	   takeOverPtr.p->toMasterStatus);
+  
   bool ok = false;
   switch (takeOverPtr.p->toMasterStatus) {
   case TakeOverRecord::IDLE:
@@ -4173,6 +4374,13 @@
     //-----------------------------------------------------------------------
     endTakeOver(takeOverPtr.i);
     break;
+
+  case TakeOverRecord::STARTING_LOCAL_FRAGMENTS:
+    ok = true;
+    jam();
+    endTakeOver(takeOverPtr.i);
+    break;
+    
     /**
      * The following are states that it should not be possible to "be" in
      */
@@ -5252,15 +5460,16 @@
     /**
      * For each of replica record
      */
-    Uint32 replicaNo = 0;
+    bool found = false;
     ReplicaRecordPtr replicaPtr;
     for(replicaPtr.i = fragPtr.p->storedReplicas; replicaPtr.i != RNIL;
-        replicaPtr.i = replicaPtr.p->nextReplica, replicaNo++) {
+        replicaPtr.i = replicaPtr.p->nextReplica) {
       jam();
 
       ptrCheckGuard(replicaPtr, creplicaFileSize, replicaRecord);
       if(replicaPtr.p->procNode == nodeId){
         jam();
+	found = true;
 	noOfRemovedReplicas++;
 	removeNodeFromStored(nodeId, fragPtr, replicaPtr);
 	if(replicaPtr.p->lcpOngoingFlag){
@@ -5276,6 +5485,15 @@
 	}
       }
     }
+    if (!found)
+    {
+      jam();
+      /**
+       * Run updateNodeInfo to remove any dead nodes from list of activeNodes
+       *  see bug#15587
+       */
+      updateNodeInfo(fragPtr);
+    }
     noOfRemainingLcpReplicas += fragPtr.p->noLcpReplicas;
   }
   
@@ -6237,7 +6455,6 @@
   ***************************************
   */
 
-#define UNDEF_NODEGROUP 65535
 static inline void inc_node_or_group(Uint32 &node, Uint32 max_node)
 {
   Uint32 next = node + 1;
@@ -6594,6 +6811,8 @@
     Uint32 activeIndex = 0;
     getFragstore(tabPtr.p, fragId, fragPtr);
     fragPtr.p->preferredPrimary = fragments[index];
+    fragPtr.p->m_log_part_id = c_nextLogPart++;
+    
     for (Uint32 i = 0; i<noReplicas; i++) {
       const Uint32 nodeId = fragments[index++];
       ReplicaRecordPtr replicaPtr;
@@ -6638,9 +6857,9 @@
   jam();
   const Uint32 fragCount = tabPtr.p->totalfragments;
   ReplicaRecordPtr replicaPtr; replicaPtr.i = RNIL;
+  FragmentstorePtr fragPtr;
   for(; fragId<fragCount; fragId++){
     jam();
-    FragmentstorePtr fragPtr;
     getFragstore(tabPtr.p, fragId, fragPtr);    
     
     replicaPtr.i = fragPtr.p->storedReplicas;
@@ -6698,6 +6917,7 @@
     req->nodeId = getOwnNodeId();
     req->totalFragments = fragCount;
     req->startGci = SYSFILE->newestRestorableGCI;
+    req->logPartId = fragPtr.p->m_log_part_id;
     sendSignal(DBDICT_REF, GSN_ADD_FRAGREQ, signal, 
 	       AddFragReq::SignalLength, JBB);
     return;
@@ -7179,39 +7399,66 @@
 
 void Dbdih::execDI_FCOUNTREQ(Signal* signal) 
 {
+  DihFragCountReq * const req = (DihFragCountReq*)signal->getDataPtr();
   ConnectRecordPtr connectPtr;
   TabRecordPtr tabPtr;
+  const BlockReference senderRef = signal->senderBlockRef();
+  const Uint32 senderData = req->m_senderData;
   jamEntry();
-  connectPtr.i = signal->theData[0];
-  tabPtr.i = signal->theData[1];
+  connectPtr.i = req->m_connectionData;
+  tabPtr.i = req->m_tableRef;
   ptrCheckGuard(tabPtr, ctabFileSize, tabRecord);
 
-  ndbrequire(tabPtr.p->tabStatus == TabRecord::TS_ACTIVE);
+  if (tabPtr.p->tabStatus != TabRecord::TS_ACTIVE)
+  {
+    DihFragCountRef* ref = (DihFragCountRef*)signal->getDataPtrSend();
+    //connectPtr.i == RNIL -> question without connect record
+    if(connectPtr.i == RNIL)
+      ref->m_connectionData = RNIL;
+    else
+      ref->m_connectionData = connectPtr.p->userpointer;
+    ref->m_tableRef = tabPtr.i;
+    ref->m_senderData = senderData;
+    ref->m_error = DihFragCountRef::ErroneousTableState;
+    ref->m_tableStatus = tabPtr.p->tabStatus;
+    sendSignal(senderRef, GSN_DI_FCOUNTREF, signal, 
+               DihFragCountRef::SignalLength, JBB);
+    return;
+  }
 
   if(connectPtr.i != RNIL){
     ptrCheckGuard(connectPtr, cconnectFileSize, connectRecord);
     if (connectPtr.p->connectState == ConnectRecord::INUSE) {
       jam();
-      signal->theData[0] = connectPtr.p->userpointer;
-      signal->theData[1] = tabPtr.p->totalfragments;
-      sendSignal(connectPtr.p->userblockref, GSN_DI_FCOUNTCONF, signal,2, JBB);
-      return;
-    }//if
-    signal->theData[0] = connectPtr.p->userpointer;
-    signal->theData[1] = ZERRONOUSSTATE;
-    sendSignal(connectPtr.p->userblockref, GSN_DI_FCOUNTREF, signal, 2, JBB);
+      DihFragCountConf* conf = (DihFragCountConf*)signal->getDataPtrSend();
+      conf->m_connectionData = connectPtr.p->userpointer;
+      conf->m_tableRef = tabPtr.i;
+      conf->m_senderData = senderData;
+      conf->m_fragmentCount = tabPtr.p->totalfragments;
+      conf->m_noOfBackups = tabPtr.p->noOfBackups;
+      sendSignal(connectPtr.p->userblockref, GSN_DI_FCOUNTCONF, signal,
+                 DihFragCountConf::SignalLength, JBB);
+      return;
+    }//if
+    DihFragCountRef* ref = (DihFragCountRef*)signal->getDataPtrSend();
+    ref->m_connectionData = connectPtr.p->userpointer;
+    ref->m_tableRef = tabPtr.i;
+    ref->m_senderData = senderData;
+    ref->m_error = DihFragCountRef::ErroneousTableState;
+    ref->m_tableStatus = tabPtr.p->tabStatus;
+    sendSignal(connectPtr.p->userblockref, GSN_DI_FCOUNTREF, signal, 
+               DihFragCountRef::SignalLength, JBB);
     return;
   }//if
-
+  DihFragCountConf* conf = (DihFragCountConf*)signal->getDataPtrSend();
   //connectPtr.i == RNIL -> question without connect record
-  const Uint32 senderData = signal->theData[2];
-  const BlockReference senderRef = signal->senderBlockRef();
-  signal->theData[0] = RNIL;
-  signal->theData[1] = tabPtr.p->totalfragments;
-  signal->theData[2] = tabPtr.i;
-  signal->theData[3] = senderData;
-  signal->theData[4] = tabPtr.p->noOfBackups;
-  sendSignal(senderRef, GSN_DI_FCOUNTCONF, signal, 5, JBB);
+  conf->m_connectionData = RNIL;
+  conf->m_tableRef = tabPtr.i;
+  conf->m_senderData = senderData;
+  conf->m_fragmentCount = tabPtr.p->totalfragments;
+  conf->m_noOfBackups = tabPtr.p->noOfBackups;
+  sendSignal(senderRef, GSN_DI_FCOUNTCONF, signal, 
+             DihFragCountConf::SignalLength, JBB);
 }//Dbdih::execDI_FCOUNTREQ()
 
 void Dbdih::execDIGETPRIMREQ(Signal* signal) 
@@ -8884,8 +9131,8 @@
     // otherwise we have a problem.
     /* --------------------------------------------------------------------- */
     jam();
-    ndbrequire(senderNodeId == c_nodeStartMaster.startNode);
-    nodeRestartStartRecConfLab(signal);
+    ndbout_c("startNextCopyFragment");
+    startNextCopyFragment(signal, findTakeOver(senderNodeId));
     return;
   } else {
     /* --------------------------------------------------------------------- */
@@ -9904,9 +10151,11 @@
 }
 
 void Dbdih::findReplica(ReplicaRecordPtr& replicaPtr, 
-			Fragmentstore* fragPtrP, Uint32 nodeId)
+			Fragmentstore* fragPtrP, 
+			Uint32 nodeId,
+			bool old)
 {
-  replicaPtr.i = fragPtrP->storedReplicas;
+  replicaPtr.i = old ? fragPtrP->oldStoredReplicas : fragPtrP->storedReplicas;
   while(replicaPtr.i != RNIL){
     ptrCheckGuard(replicaPtr, creplicaFileSize, replicaRecord);
     if (replicaPtr.p->procNode == nodeId) {
@@ -11177,6 +11426,7 @@
   currentgcp = 0;
   cverifyQueueCounter = 0;
   cwaitLcpSr = false;
+  c_nextLogPart = 0;
 
   nodeResetStart();
   c_nodeStartMaster.wait = ZFALSE;
@@ -11197,7 +11447,11 @@
 
   cnoReplicas = 1;
   ndb_mgm_get_int_parameter(p, CFG_DB_NO_REPLICAS, &cnoReplicas);
-  cnoReplicas = cnoReplicas > 4 ? 4 : cnoReplicas;
+  if (cnoReplicas > 4)
+  {
+    progError(__LINE__, NDBD_EXIT_INVALID_CONFIG,
+	      "Only up to four replicas are supported. Check NoOfReplicas.");
+  }
 
   cgcpDelay = 2000;
   ndb_mgm_get_int_parameter(p, CFG_DB_GCP_INTERVAL, &cgcpDelay);
@@ -11787,14 +12041,14 @@
     break;
   case CheckNodeGroups::GetNodeGroupMembers: {
     ok = true;
-    Uint32 ownNodeGoup =
+    Uint32 ownNodeGroup =
       Sysfile::getNodeGroup(sd->nodeId, SYSFILE->nodeGroups);
 
-    sd->output = ownNodeGoup;
+    sd->output = ownNodeGroup;
     sd->mask.clear();
 
     NodeGroupRecordPtr ngPtr;
-    ngPtr.i = ownNodeGoup;
+    ngPtr.i = ownNodeGroup;
     if (ownNodeGoup!=NO_NODE_GROUP_ID)
     {
       ptrCheckGuard(ngPtr, MAX_NDB_NODES, nodeGroupRecord);
@@ -11805,7 +12059,7 @@
     }
 #if 0
     for (int i = 0; i < MAX_NDB_NODES; i++) {
-      if (ownNodeGoup == 
+      if (ownNodeGroup == 
 	  Sysfile::getNodeGroup(i, SYSFILE->nodeGroups)) {
 	sd->mask.set(i);
       }
@@ -11980,6 +12234,8 @@
     jam();
     fragPtr.p->distributionKey = TdistKey;
   }//if
+
+  fragPtr.p->m_log_part_id = readPageWord(rf);
 }//Dbdih::readFragment()
 
 Uint32 Dbdih::readPageWord(RWFragment* rf) 
@@ -13086,6 +13342,7 @@
   writePageWord(wf, fragPtr.p->noStoredReplicas);
   writePageWord(wf, fragPtr.p->noOldStoredReplicas);
   writePageWord(wf, fragPtr.p->distributionKey);
+  writePageWord(wf, fragPtr.p->m_log_part_id);
 }//Dbdih::writeFragment()
 
 void Dbdih::writePageWord(RWFragment* wf, Uint32 dataWord)

--- 1.29/storage/ndb/src/kernel/blocks/suma/Suma.cpp	2005-11-30 02:14:49 +13:00
+++ 1.30/storage/ndb/src/kernel/blocks/suma/Suma.cpp	2006-01-28 17:41:40 +13:00
@@ -14,6 +14,7 @@
    along with this program; if not, write to the Free Software
    Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA */
 
+#include <my_config.h>
 #include "Suma.hpp"
 
 #include <ndb_version.h>
@@ -42,11 +43,14 @@
 #include <signaldata/CreateTab.hpp>
 #include <signaldata/DropTab.hpp>
 #include <signaldata/AlterTab.hpp>
+#include <signaldata/DihFragCount.hpp>
+#include <signaldata/SystemError.hpp>
 
 #include <ndbapi/NdbDictionary.hpp>
 
 #include <DebuggerNames.hpp>
 #include <../dbtup/Dbtup.hpp>
+#include <../dbdih/Dbdih.hpp>
 
 //#define HANDOVER_DEBUG
 //#define NODEFAIL_DEBUG
@@ -694,7 +698,7 @@
   
   if(failed.get(Restart.nodeId))
   {
-    Restart.nodeId = 0;
+    Restart.resetRestart(signal);
   }
 
   signal->theData[0] = SumaContinueB::RESEND_BUCKET;
@@ -1029,6 +1033,10 @@
   const Uint32 flags   = req.subscriptionType & SubCreateReq::GetFlags;
   const bool addTableFlag = (flags & SubCreateReq::AddTableFlag) != 0;
   const bool restartFlag  = (flags & SubCreateReq::RestartFlag)  != 0;
+  const Uint32 reportAll = (flags & SubCreateReq::ReportAll) ?
+    Subscription::REPORT_ALL : 0;
+  const Uint32 reportSubscribe = (flags & SubCreateReq::ReportSubscribe) ?
+    Subscription::REPORT_SUBSCRIBE : 0;
   const Uint32 tableId = req.tableId;
 
   Subscription key;
@@ -1077,11 +1085,13 @@
     subPtr.p->m_subscriptionId   = subId;
     subPtr.p->m_subscriptionKey  = subKey;
     subPtr.p->m_subscriptionType = type;
+    subPtr.p->m_options          = reportSubscribe | reportAll;
     subPtr.p->m_tableId          = tableId;
     subPtr.p->m_table_ptrI       = RNIL;
     subPtr.p->m_state            = Subscription::DEFINED;
     subPtr.p->n_subscribers      = 0;
 
+    fprintf(stderr, "table %d options %x\n", subPtr.p->m_tableId, subPtr.p->m_options);
     DBUG_PRINT("info",("Added: key.m_subscriptionId: %u, key.m_subscriptionKey: %u",
 		       key.m_subscriptionId, key.m_subscriptionKey));
 
@@ -1173,6 +1183,8 @@
   TablePtr tabPtr;
   initTable(signal,subPtr.p->m_tableId,tabPtr,syncPtr);
   tabPtr.p->n_subscribers++;
+  if (subPtr.p->m_options & Subscription::REPORT_ALL)
+    tabPtr.p->m_reportAll = true;
   DBUG_PRINT("info",("Suma::Table[%u]::n_subscribers: %u",
 		     tabPtr.p->m_tableId, tabPtr.p->n_subscribers));
   DBUG_VOID_RETURN;
@@ -1388,6 +1400,8 @@
     DBUG_PRINT("info",("Suma::Table[%u,i=%u]::n_subscribers: %u",
 		       tabPtr.p->m_tableId, tabPtr.i, tabPtr.p->n_subscribers));
 
+    tabPtr.p->m_reportAll = false;
+
     tabPtr.p->m_error         = 0;
     tabPtr.p->m_schemaVersion = RNIL;
     tabPtr.p->m_state = Table::DEFINING;
@@ -1538,10 +1552,12 @@
    * We need to gather fragment info
    */
   jam();
-  signal->theData[0] = RNIL;
-  signal->theData[1] = tableId;
-  signal->theData[2] = tabPtr.i;
-  sendSignal(DBDIH_REF, GSN_DI_FCOUNTREQ, signal, 3, JBB);
+  DihFragCountReq* req = (DihFragCountReq*)signal->getDataPtrSend();
+  req->m_connectionData = RNIL;
+  req->m_tableRef = tableId;
+  req->m_senderData = tabPtr.i;
+  sendSignal(DBDIH_REF, GSN_DI_FCOUNTREQ, signal, 
+             DihFragCountReq::SignalLength, JBB);
 }
 
 bool
@@ -1687,20 +1703,52 @@
 }
 
 void 
+Suma::execDI_FCOUNTREF(Signal* signal)
+{
+  jamEntry();
+  DBUG_ENTER("Suma::execDI_FCOUNTREF");
+  DihFragCountRef * const ref = (DihFragCountRef*)signal->getDataPtr();
+  switch ((DihFragCountRef::ErrorCode) ref->m_error)
+  {
+  case DihFragCountRef::ErroneousTableState:
+    jam();
+    if (ref->m_tableStatus == Dbdih::TabRecord::TS_CREATING)
+    {
+      const Uint32 tableId = ref->m_senderData;
+      const Uint32 tabPtr_i = ref->m_tableRef;      
+      DihFragCountReq * const req = (DihFragCountReq*)signal->getDataPtrSend();
+
+      req->m_connectionData = RNIL;
+      req->m_tableRef = tabPtr_i;
+      req->m_senderData = tableId;
+      sendSignalWithDelay(DBDIH_REF, GSN_DI_FCOUNTREQ, signal, 
+                          DihFragCountReq::SignalLength, 
+                          DihFragCountReq::RetryInterval);
+      DBUG_VOID_RETURN;
+    }
+    ndbrequire(false);
+  default:
+    ndbrequire(false);
+  }
+
+  DBUG_VOID_RETURN;
+}
+
+void 
 Suma::execDI_FCOUNTCONF(Signal* signal)
 {
   jamEntry();
   DBUG_ENTER("Suma::execDI_FCOUNTCONF");
   ndbassert(signal->getNoOfSections() == 0);
-
-  const Uint32 userPtr = signal->theData[0];
-  const Uint32 fragCount = signal->theData[1];
-  const Uint32 tableId = signal->theData[2];
+  DihFragCountConf * const conf = (DihFragCountConf*)signal->getDataPtr();
+  const Uint32 userPtr = conf->m_connectionData;
+  const Uint32 fragCount = conf->m_fragmentCount;
+  const Uint32 tableId = conf->m_tableRef;
 
   ndbrequire(userPtr == RNIL && signal->length() == 5);
 
   TablePtr tabPtr;
-  tabPtr.i= signal->theData[3];
+  tabPtr.i= conf->m_senderData;
   ndbrequire((tabPtr.p= c_tablePool.getPtr(tabPtr.i)) != 0);
   ndbrequire(tabPtr.p->m_tableId == tableId);
 
@@ -2162,6 +2210,8 @@
     jam();
     initTable(signal,subPtr.p->m_tableId,tabPtr,subbPtr);
     tabPtr.p->n_subscribers++;
+    if (subPtr.p->m_options & Subscription::REPORT_ALL)
+      tabPtr.p->m_reportAll = true;
     DBUG_PRINT("info",("Suma::Table[%u]::n_subscribers: %u",
 		       tabPtr.p->m_tableId, tabPtr.p->n_subscribers));
     DBUG_VOID_RETURN;
@@ -2204,6 +2254,10 @@
 		     subPtr.p->m_subscriptionId,subPtr.p->m_subscriptionKey));
   sendSignal(subPtr.p->m_senderRef, GSN_SUB_START_CONF, signal,
 	     SubStartConf::SignalLength, JBB);
+
+  reportAllSubscribers(signal, NdbDictionary::Event::_TE_SUBSCRIBE,
+                       subPtr, subbPtr);
+
   DBUG_VOID_RETURN;
 }
 
@@ -2476,9 +2530,60 @@
   DBUG_PRINT("info",("c_subscriberPool  size: %d free: %d",
 		     c_subscriberPool.getSize(),
 		     c_subscriberPool.getNoOfFree()));
+
+  reportAllSubscribers(signal, NdbDictionary::Event::_TE_UNSUBSCRIBE,
+                       subPtr, subbPtr);
+
   DBUG_VOID_RETURN;
 }
 
+// report new started subscriber to all other subscribers
+void
+Suma::reportAllSubscribers(Signal *signal,
+                           NdbDictionary::Event::_TableEvent table_event,
+                           SubscriptionPtr subPtr,
+                           SubscriberPtr subbPtr)
+{
+  if (!(subPtr.p->m_options & Subscription::REPORT_SUBSCRIBE))
+  {
+    return;
+  }
+  if (subPtr.p->n_subscribers == 0)
+  {
+    ndbrequire(table_event != NdbDictionary::Event::_TE_SUBSCRIBE);
+    return;
+  }
+ 
+  SubTableData * data  = (SubTableData*)signal->getDataPtrSend();
+  data->gci            = m_last_complete_gci + 1;
+  data->tableId        = subPtr.p->m_tableId;
+  data->operation      = table_event;
+  data->logType        = 0;
+  data->ndbd_nodeid    = refToNode(reference());
+  
+  TablePtr tabPtr;
+  c_tables.getPtr(tabPtr, subPtr.p->m_table_ptrI);
+  LocalDLList<Subscriber> subbs(c_subscriberPool, tabPtr.p->c_subscribers);
+  SubscriberPtr i_subbPtr;
+  for(subbs.first(i_subbPtr); !i_subbPtr.isNull(); subbs.next(i_subbPtr))
+  {
+    if (i_subbPtr.p->m_subPtrI == subPtr.i)
+    {
+      data->req_nodeid = refToNode(subbPtr.p->m_senderRef);
+      data->senderData = i_subbPtr.p->m_senderData;
+      sendSignal(i_subbPtr.p->m_senderRef, GSN_SUB_TABLE_DATA, signal,
+                 SubTableData::SignalLength, JBB);
+      if (i_subbPtr.i != subbPtr.i)
+      {
+        data->req_nodeid = refToNode(i_subbPtr.p->m_senderRef);
+        data->senderData = subbPtr.p->m_senderData;
+        sendSignal(subbPtr.p->m_senderRef, GSN_SUB_TABLE_DATA, signal,
+                   SubTableData::SignalLength, JBB);
+      }
+    }
+  }
+}
+
 void
 Suma::sendSubStopRef(Signal* signal, Uint32 errCode)
 {
@@ -2532,6 +2637,7 @@
       req->setTriggerEvent((TriggerEvent::Value)j);
       req->setTableId(m_tableId);
       req->setAttributeMask(attrMask);
+      req->setReportAllMonitoredAttributes(m_reportAll);
       suma.sendSignal(DBTUP_REF, GSN_CREATE_TRIG_REQ, 
 		      signal, CreateTrigReq::SignalLength, JBB);
       ret= 1;
@@ -3156,7 +3262,8 @@
 	  Page_pos pos= bucket->m_buffer_head;
 	  ndbrequire(pos.m_max_gci < gci);
 
-	  Buffer_page* page= (Buffer_page*)(m_tup->cpage+pos.m_page_id);
+	  Buffer_page* page= (Buffer_page*)
+	    m_tup->c_page_pool.getPtr(pos.m_page_id);
 	  ndbout_c("takeover %d", pos.m_page_id);
 	  page->m_max_gci = pos.m_max_gci;
 	  page->m_words_used = pos.m_page_pos;
@@ -3659,7 +3766,33 @@
   jamEntry();
   DBUG_ENTER("Suma::execSUB_CREATE_REF");
   ndbassert(signal->getNoOfSections() == 0);
-  ndbrequire(false);
+  SubCreateRef *const ref= (SubCreateRef *)signal->getDataPtr();
+  Uint32 error= ref->errorCode;
+  if (error != 1415)
+  {
+    /*
+     * This will happen if an api node connects during while other node
+     * is restarting, and in this case the subscription will already
+     * have been created.
+     * ToDo: more complete handling of api nodes joining during
+     * node restart
+     */
+    Uint32 senderRef = signal->getSendersBlockRef();
+    BlockReference cntrRef = calcNdbCntrBlockRef(refToNode(senderRef));
+    // for some reason we did not manage to create a subscription
+    // on the starting node
+    SystemError * const sysErr = (SystemError*)&signal->theData[0];
+    sysErr->errorCode = SystemError::CopySubscriptionRef;
+    sysErr->errorRef = reference();
+    sysErr->data1 = error;
+    sysErr->data2 = 0;
+    sendSignal(cntrRef, GSN_SYSTEM_ERROR, signal,
+               SystemError::SignalLength, JBB);
+    Restart.resetRestart(signal);
+    DBUG_VOID_RETURN;
+  }
+  // SubCreateConf has same signaldata as SubCreateRef
+  Restart.runSUB_CREATE_CONF(signal);
   DBUG_VOID_RETURN;
 }
 
@@ -3688,7 +3821,22 @@
   jamEntry();
   DBUG_ENTER("Suma::execSUB_START_REF");
   ndbassert(signal->getNoOfSections() == 0);
-  ndbrequire(false);
+  SubStartRef *const ref= (SubStartRef *)signal->getDataPtr();
+  Uint32 error= ref->errorCode;
+  {
+    Uint32 senderRef = signal->getSendersBlockRef();
+    BlockReference cntrRef = calcNdbCntrBlockRef(refToNode(senderRef));
+    // for some reason we did not manage to start a subscriber
+    // on the starting node
+    SystemError * const sysErr = (SystemError*)&signal->theData[0];
+    sysErr->errorCode = SystemError::CopySubscriberRef;
+    sysErr->errorRef = reference();
+    sysErr->data1 = error;
+    sysErr->data2 = 0;
+    sendSignal(cntrRef, GSN_SYSTEM_ERROR, signal,
+               SystemError::SignalLength, JBB);
+    Restart.resetRestart(signal);
+  }
   DBUG_VOID_RETURN;
 }
 
@@ -3983,6 +4131,15 @@
   //SumaStartMeConf *conf= (SumaStartMeConf*)signal->getDataPtrSend();
   suma.sendSignal(sumaRef, GSN_SUMA_START_ME_CONF, signal,
 		  SumaStartMeConf::SignalLength, JBB);
+  resetRestart(signal);
+  DBUG_VOID_RETURN;
+}
+
+void
+Suma::Restart::resetRestart(Signal* signal)
+{
+  jam();
+  DBUG_ENTER("Suma::Restart::resetRestart");
   nodeId = 0;
   DBUG_VOID_RETURN;
 }
@@ -4098,8 +4255,14 @@
   Bucket* bucket= c_buckets+buck;
   Page_pos pos= bucket->m_buffer_head;
 
-  Buffer_page* page= (Buffer_page*)(m_tup->cpage+pos.m_page_id);
-  Uint32* ptr= page->m_data + pos.m_page_pos;
+  Buffer_page* page = 0;
+  Uint32 *ptr = 0;
+  
+  if (likely(pos.m_page_id != RNIL))
+  {
+    page= (Buffer_page*)m_tup->c_page_pool.getPtr(pos.m_page_id);
+    ptr= page->m_data + pos.m_page_pos;
+  }
 
   const bool same_gci = (gci == pos.m_last_gci) && (!ERROR_INSERTED(13022));
   
@@ -4157,7 +4320,7 @@
     pos.m_page_pos = sz;
     pos.m_last_gci = gci;
     
-    page= (Buffer_page*)(m_tup->cpage+pos.m_page_id);
+    page= (Buffer_page*)m_tup->c_page_pool.getPtr(pos.m_page_id);
     page->m_next_page= RNIL;
     ptr= page->m_data;
     goto loop; //
@@ -4188,7 +4351,7 @@
   
   if(tail != RNIL)
   {
-    Buffer_page* page= (Buffer_page*)(m_tup->cpage+tail);
+    Buffer_page* page= (Buffer_page*)m_tup->c_page_pool.getPtr(tail);
     bucket->m_buffer_tail = page->m_next_page;
     free_page(tail, page);
     signal->theData[0] = SumaContinueB::OUT_OF_BUFFER_RELEASE;
@@ -4232,8 +4395,8 @@
   Uint32 ref= m_first_free_page;
   if(likely(ref != RNIL))
   {
-    m_first_free_page = ((Buffer_page*)m_tup->cpage+ref)->m_next_page;
-    Uint32 chunk = ((Buffer_page*)m_tup->cpage+ref)->m_page_chunk_ptr_i;
+    m_first_free_page = ((Buffer_page*)m_tup->c_page_pool.getPtr(ref))->m_next_page;
+    Uint32 chunk = ((Buffer_page*)m_tup->c_page_pool.getPtr(ref))->m_page_chunk_ptr_i;
     c_page_chunk_pool.getPtr(ptr, chunk);
     ndbassert(ptr.p->m_free);
     ptr.p->m_free--;
@@ -4256,7 +4419,7 @@
   Buffer_page* page;
   for(Uint32 i = 0; i<count; i++)
   {
-    page = (Buffer_page*)(m_tup->cpage+ref);
+    page = (Buffer_page*)m_tup->c_page_pool.getPtr(ref);
     page->m_page_state= SUMA_SEQUENCE;
     page->m_page_chunk_ptr_i = ptr.i;
     page->m_next_page = ++ref;
@@ -4320,7 +4483,7 @@
   else
   {
     jam();
-    Buffer_page* page= (Buffer_page*)(m_tup->cpage+tail);
+    Buffer_page* page= (Buffer_page*)m_tup->c_page_pool.getPtr(tail);
     Uint32 max_gci = page->m_max_gci;
     Uint32 next_page = page->m_next_page;
 
@@ -4413,7 +4576,7 @@
   Bucket* bucket= c_buckets+buck;
   Uint32 tail= bucket->m_buffer_tail;
 
-  Buffer_page* page= (Buffer_page*)(m_tup->cpage+tail);
+  Buffer_page* page= (Buffer_page*)m_tup->c_page_pool.getPtr(tail);
   Uint32 max_gci = page->m_max_gci;
   Uint32 next_page = page->m_next_page;
   Uint32 *ptr = page->m_data + pos;
Thread
bk commit into 5.2 tree (stewart:1.2078)Stewart Smith30 Jan