List: Internals    « Previous Message | Next Message »
From: jonas.oreland    Date: May 18 2005 12:10pm
Subject: bk commit into 5.1 tree (joreland:1.1865)
View as plain text  
Below is the list of changes that have just been committed into a local
5.1 repository of jonas. When jonas does a push these changes will
be propagated to the main repository and, within 24 hours after the
push, to the public repository.
For information on how to access the public repository
see http://dev.mysql.com/doc/mysql/en/installing-source-tree.html

ChangeSet
  1.1865 05/05/18 14:10:28 joreland@stripped +13 -0
  wl1445 - ndb - event during NF
    SUMA part

  storage/ndb/src/kernel/vm/SimulatedBlock.hpp
    1.14 05/05/18 14:10:24 joreland@stripped +1 -0
    API_START_REP

  storage/ndb/src/kernel/vm/SimulatedBlock.cpp
    1.20 05/05/18 14:10:24 joreland@stripped +6 -0
    API_START_REP

  storage/ndb/src/kernel/blocks/suma/SumaInit.cpp
    1.15 05/05/18 14:10:24 joreland@stripped +22 -16
    Impl. resend during NF
    Change handover protocol
    Add buffer management wrt NF

  storage/ndb/src/kernel/blocks/suma/Suma.hpp
    1.15 05/05/18 14:10:24 joreland@stripped +112 -61
    Impl. resend during NF
    Change handover protocol
    Add buffer management wrt NF

  storage/ndb/src/kernel/blocks/suma/Suma.cpp
    1.35 05/05/18 14:10:24 joreland@stripped +792 -447
    Impl. resend during NF
    Change handover protocol
    Add buffer management wrt NF

  storage/ndb/src/kernel/blocks/qmgr/QmgrMain.cpp
    1.17 05/05/18 14:10:24 joreland@stripped +3 -0
    Let Cntr redistribute API_START_REP

  storage/ndb/src/kernel/blocks/ndbcntr/NdbcntrMain.cpp
    1.19 05/05/18 14:10:24 joreland@stripped +10 -0
    Let Cntr redistribute API_START_REP

  storage/ndb/src/kernel/blocks/ndbcntr/NdbcntrInit.cpp
    1.7 05/05/18 14:10:23 joreland@stripped +1 -0
    Let Cntr redistribute API_START_REP

  storage/ndb/src/kernel/blocks/ndbcntr/Ndbcntr.hpp
    1.4 05/05/18 14:10:23 joreland@stripped +1 -0
    Let Cntr redistribute API_START_REP

  storage/ndb/src/kernel/blocks/dbtup/Dbtup.hpp
    1.28 05/05/18 14:10:23 joreland@stripped +1 -0
    Allow suma to use TUP pages

  storage/ndb/src/common/debugger/signaldata/SignalNames.cpp
    1.11 05/05/18 14:10:23 joreland@stripped +5 -2
    Add signal names for suma signals

  storage/ndb/include/kernel/signaldata/SumaImpl.hpp
    1.10 05/05/18 14:10:23 joreland@stripped +16 -32
    Add bucket mask to HandoverReq
    Add ContinueB

  storage/ndb/include/kernel/GlobalSignalNumbers.h
    1.13 05/05/18 14:10:23 joreland@stripped +2 -1
    Add API_START_REP service

# This is a BitKeeper patch.  What follows are the unified diffs for the
# set of deltas contained in the patch.  The rest of the patch, the part
# that BitKeeper cares about, is below these diffs.
# User:	joreland
# Host:	eel.hemma.oreland.se.ndb.mysql.com.ndb.mysql.com.ndb.mysql.com.ndb.mysql.com
# Root:	/home/jonas/src/mysql-5.1-wl2325

--- 1.12/storage/ndb/include/kernel/GlobalSignalNumbers.h	Fri May 13 12:58:12 2005
+++ 1.13/storage/ndb/include/kernel/GlobalSignalNumbers.h	Wed May 18 14:10:23 2005
@@ -169,6 +169,7 @@
 #define GSN_ADD_FRAGREF                 110
 #define GSN_ADD_FRAGREQ                 111
 
+#define GSN_API_START_REP               120
 #define GSN_API_FAILCONF                113
 #define GSN_API_FAILREQ                 114
 #define GSN_CNTR_START_REQ              115
@@ -176,7 +177,7 @@
 #define GSN_CNTR_START_REF              117
 #define GSN_CNTR_START_CONF             118
 #define GSN_CNTR_START_REP              119
-/* 120 unused */
+/* 120 not unused */
 /* 121 unused */
 /* 122 unused */
 /* 123 unused */

--- 1.9/storage/ndb/include/kernel/signaldata/SumaImpl.hpp	Fri May 13 12:58:12 2005
+++ 1.10/storage/ndb/include/kernel/signaldata/SumaImpl.hpp	Wed May 18 14:10:23 2005
@@ -29,7 +29,6 @@
   friend struct SumaParticipant;
   
   friend bool printSUB_CREATE_REQ(FILE *, const Uint32 *, Uint32, Uint16);
-public:
   STATIC_CONST( SignalLength = 6 );
   
   enum SubscriptionType {
@@ -59,7 +58,6 @@
   friend struct SumaParticipant;
   
   friend bool printSUB_CREATE_REF(FILE *, const Uint32 *, Uint32, Uint16);
-public:
   STATIC_CONST( SignalLength = 3 );
 
   Uint32 senderRef;
@@ -75,7 +73,6 @@
   friend struct SumaParticipant;
   
   friend bool printSUB_CREATE_CONF(FILE *, const Uint32 *, Uint32, Uint16);
-public:
   STATIC_CONST( SignalLength = 2 );
   
   Uint32 senderRef;
@@ -83,7 +80,6 @@
 };
 
 struct SubscriptionData {
-public:
   enum Part {
     MetaData = 1,
     TableData = 2
@@ -97,7 +93,6 @@
   friend struct Suma;
   
   friend bool printSUB_START_REQ(FILE *, const Uint32 *, Uint32, Uint16);
-public:
   STATIC_CONST( SignalLength = 6 );
   STATIC_CONST( SignalLength2 = SignalLength+1 );
 
@@ -117,7 +112,6 @@
   friend struct Suma;
   
   friend bool printSUB_START_REF(FILE *, const Uint32 *, Uint32, Uint16);
-public:
   enum ErrorCode {
     Undefined = 1,
     NF_FakeErrorREF = 11,
@@ -146,7 +140,6 @@
   friend struct Grep;
   
   friend bool printSUB_START_CONF(FILE *, const Uint32 *, Uint32, Uint16);
-public:
   STATIC_CONST( SignalLength = 7 );
   STATIC_CONST( SignalLength2 = SignalLength+1 );
 
@@ -168,7 +161,6 @@
   friend struct Suma;
   
   friend bool printSUB_STOP_REQ(FILE *, const Uint32 *, Uint32, Uint16);
-public:
   STATIC_CONST( SignalLength = 7 );
   Uint32 senderRef;
   Uint32 senderData;
@@ -186,7 +178,6 @@
   friend struct Suma;
   
   friend bool printSUB_STOP_REF(FILE *, const Uint32 *, Uint32, Uint16);
-public:
   enum ErrorCode {
     Undefined = 1,
     NF_FakeErrorREF = 11,
@@ -212,7 +203,6 @@
   friend struct Grep;
   
   friend bool printSUB_STOP_CONF(FILE *, const Uint32 *, Uint32, Uint16);
-public:
   STATIC_CONST( SignalLength = 7 );
 
   Uint32 senderRef;
@@ -232,10 +222,8 @@
   friend struct Grep;
   
   friend bool printSUB_SYNC_REQ(FILE *, const Uint32 *, Uint32, Uint16);
-public:
   STATIC_CONST( SignalLength = 5 );
   
-public:
   Uint32 senderRef;
   Uint32 senderData;
   Uint32 subscriptionId;
@@ -254,7 +242,6 @@
   friend struct Grep;
   
   friend bool printSUB_SYNC_REF(FILE *, const Uint32 *, Uint32, Uint16);
-public:
   enum ErrorCode {
     Undefined = 1
   };
@@ -274,7 +261,6 @@
   friend struct Grep;
   
   friend bool printSUB_SYNC_CONF(FILE *, const Uint32 *, Uint32, Uint16);
-public:
   STATIC_CONST( SignalLength = 2 );
   
   Uint32 senderRef;
@@ -289,7 +275,6 @@
   friend struct Grep;
   
   friend bool printSUB_META_DATA(FILE *, const Uint32 *, Uint32, Uint16);
-public:
   STATIC_CONST( SignalLength = 3 );
   SECTION( DICT_TAB_INFO = 0 );
   
@@ -306,7 +291,6 @@
   friend struct Grep;
   
   friend bool printSUB_TABLE_DATA(FILE *, const Uint32 *, Uint32, Uint16);
-public:
   STATIC_CONST( SignalLength = 7 );
   
   enum LogType {
@@ -338,7 +322,6 @@
   friend struct Trix;
   
   friend bool printSUB_SYNC_CONTINUE_REQ(FILE *, const Uint32 *, Uint32, Uint16);
-public:
   STATIC_CONST( SignalLength = 2 );
 
   Uint32 subscriberData;
@@ -354,7 +337,6 @@
   friend struct Trix;
   
   friend bool printSUB_SYNC_CONTINUE_REF(FILE *, const Uint32 *, Uint32, Uint16);
-public:
   STATIC_CONST( SignalLength = 2 );
 
   Uint32 subscriptionId;
@@ -370,7 +352,6 @@
   friend struct Trix;
   
   friend bool printSUB_SYNC_CONTINUE_CONF(FILE *, const Uint32 *, Uint32, Uint16);
-public:
   STATIC_CONST( SignalLength = 2 );
 
   Uint32 subscriptionId;
@@ -388,7 +369,6 @@
   friend struct Trix;
   
   friend bool printSUB_GCP_COMPLETE_REP(FILE *, const Uint32 *, Uint32, Uint16);
-public:
   STATIC_CONST( SignalLength = 3 );
   
   Uint32 gci;
@@ -400,7 +380,6 @@
   /**
    * Sender(s)/Reciver(s)
    */
-public:
   STATIC_CONST( SignalLength = SubGcpCompleteRep::SignalLength );
 
   SubGcpCompleteRep rep;
@@ -414,7 +393,6 @@
   friend struct SumaParticipant;
   
   friend bool printSUB_REMOVE_REQ(FILE *, const Uint32 *, Uint32, Uint16);
-public:
   STATIC_CONST( SignalLength = 4 );
   
   Uint32 senderRef;
@@ -431,7 +409,6 @@
   friend struct SumaParticipant;
   
   friend bool printSUB_REMOVE_REF(FILE *, const Uint32 *, Uint32, Uint16);
-public:
   STATIC_CONST( SignalLength = 5 );
   enum ErrorCode {
     Undefined = 1,
@@ -454,7 +431,6 @@
   friend struct SumaParticipant;
   
   friend bool printSUB_REMOVE_CONF(FILE *, const Uint32 *, Uint32, Uint16);
-public:
   STATIC_CONST( SignalLength = 5 );
   
   Uint32 senderRef;
@@ -471,7 +447,6 @@
   
   friend bool printCREATE_SUBSCRIPTION_ID_REQ(FILE *, const Uint32 *, 
 					       Uint32, Uint16);
-public:
   STATIC_CONST( SignalLength = 2 );
   
   Uint32 senderRef;
@@ -485,7 +460,6 @@
   
   friend bool printCREATE_SUBSCRIPTION_ID_CONF(FILE *, const Uint32 *, 
 					       Uint32, Uint16);
-public:
   STATIC_CONST( SignalLength = 4 );
   
   Uint32 senderRef;
@@ -501,7 +475,6 @@
   
   friend bool printCREATE_SUBSCRIPTION_ID_REF(FILE *, const Uint32 *, 
 					       Uint32, Uint16);
-public:
   STATIC_CONST( SignalLength = 3 );
   
   Uint32 senderRef;
@@ -510,7 +483,6 @@
 };
 
 struct SumaStartMeReq {
-public:
   STATIC_CONST( SignalLength = 1 );
   Uint32 unused;
 };
@@ -529,14 +501,26 @@
 };
 
 struct SumaHandoverReq {
-public:
-  STATIC_CONST( SignalLength = 1 );
+  STATIC_CONST( SignalLength = 3 );
   Uint32 gci;
+  Uint32 nodeId;
+  Uint32 theBucketMask[1];
 };
 
 struct SumaHandoverConf {
-public:
-  STATIC_CONST( SignalLength = 1 );
+  STATIC_CONST( SignalLength = 3 );
   Uint32 gci;
+  Uint32 nodeId;
+  Uint32 theBucketMask[1];
 };
+
+struct SumaContinueB
+{
+  enum 
+  {
+    RESEND_BUCKET = 1
+    ,RELEASE_GCI = 2
+  };
+};
+
 #endif

--- 1.10/storage/ndb/src/common/debugger/signaldata/SignalNames.cpp	Wed Apr 27 23:32:57 2005
+++ 1.11/storage/ndb/src/common/debugger/signaldata/SignalNames.cpp	Wed May 18 14:10:23 2005
@@ -506,10 +506,13 @@
   ,{ GSN_CREATE_EVNT_CONF,       "CREATE_EVNT_CONF" }
   ,{ GSN_CREATE_EVNT_REF,        "CREATE_EVNT_REF" }
 
-  ,{ GSN_SUMA_START_ME,          "SUMA_START_ME" }  
+  ,{ GSN_SUMA_START_ME_REQ,      "SUMA_START_ME_REQ" }  
+  ,{ GSN_SUMA_START_ME_REF,      "SUMA_START_ME_REF" }  
+  ,{ GSN_SUMA_START_ME_CONF,     "SUMA_START_ME_CONF" }  
   ,{ GSN_SUMA_HANDOVER_REQ,      "SUMA_HANDOVER_REQ"}
+  ,{ GSN_SUMA_HANDOVER_REF,      "SUMA_HANDOVER_REF"}
   ,{ GSN_SUMA_HANDOVER_CONF,     "SUMA_HANDOVER_CONF"} 
-
+  
   ,{ GSN_DROP_EVNT_REQ,          "DROP_EVNT_REQ" }
   ,{ GSN_DROP_EVNT_CONF,         "DROP_EVNT_CONF" }
   ,{ GSN_DROP_EVNT_REF,          "DROP_EVNT_REF" }

--- 1.27/storage/ndb/src/kernel/blocks/dbtup/Dbtup.hpp	Mon May  2 17:06:36 2005
+++ 1.28/storage/ndb/src/kernel/blocks/dbtup/Dbtup.hpp	Wed May 18 14:10:23 2005
@@ -345,6 +345,7 @@
 #endif
 
 class Dbtup: public SimulatedBlock {
+  friend class Suma; 
 public:
 
   typedef bool (Dbtup::* ReadFunction)(Uint32*,

--- 1.3/storage/ndb/src/kernel/blocks/ndbcntr/Ndbcntr.hpp	Fri Apr  8 02:44:05 2005
+++ 1.4/storage/ndb/src/kernel/blocks/ndbcntr/Ndbcntr.hpp	Wed May 18 14:10:23 2005
@@ -159,6 +159,7 @@
   BLOCK_DEFINES(Ndbcntr);
 
   // Transit signals
+  void execAPI_START_REP(Signal*);
   void execCONTINUEB(Signal* signal);
   void execREAD_NODESCONF(Signal* signal);
   void execREAD_NODESREF(Signal* signal);

--- 1.6/storage/ndb/src/kernel/blocks/ndbcntr/NdbcntrInit.cpp	Fri Apr  8 02:44:05 2005
+++ 1.7/storage/ndb/src/kernel/blocks/ndbcntr/NdbcntrInit.cpp	Wed May 18 14:10:23 2005
@@ -55,6 +55,7 @@
   addRecSignal(GSN_CNTR_START_CONF, &Ndbcntr::execCNTR_START_CONF);
   addRecSignal(GSN_CNTR_WAITREP, &Ndbcntr::execCNTR_WAITREP);
   addRecSignal(GSN_CNTR_START_REP, &Ndbcntr::execCNTR_START_REP);
+  addRecSignal(GSN_API_START_REP, &Ndbcntr::execAPI_START_REP, true);
   addRecSignal(GSN_NODE_FAILREP, &Ndbcntr::execNODE_FAILREP);
   addRecSignal(GSN_SYSTEM_ERROR , &Ndbcntr::execSYSTEM_ERROR);
   

--- 1.18/storage/ndb/src/kernel/blocks/ndbcntr/NdbcntrMain.cpp	Fri May 13 12:58:12 2005
+++ 1.19/storage/ndb/src/kernel/blocks/ndbcntr/NdbcntrMain.cpp	Wed May 18 14:10:24 2005
@@ -136,6 +136,16 @@
   }//switch
 }//Ndbcntr::execCONTINUEB()
 
+void
+Ndbcntr::execAPI_START_REP(Signal* signal)
+{
+  if(refToBlock(signal->getSendersBlockRef()) == QMGR)
+  {
+    for(Uint32 i = 0; i<ALL_BLOCKS_SZ; i++){
+      sendSignal(ALL_BLOCKS[i].Ref, GSN_API_START_REP, signal, 1, JBB);
+    }
+  }
+}
 /*******************************/
 /*  SYSTEM_ERROR               */
 /*******************************/

--- 1.16/storage/ndb/src/kernel/blocks/qmgr/QmgrMain.cpp	Wed Apr 27 23:32:58 2005
+++ 1.17/storage/ndb/src/kernel/blocks/qmgr/QmgrMain.cpp	Wed May 18 14:10:24 2005
@@ -1993,6 +1993,9 @@
     apiNodePtr.p->blockRef = ref;
     signal->theData[0] = apiNodePtr.i;
     sendSignal(CMVMI_REF, GSN_ENABLE_COMORD, signal, 1, JBA);
+
+    signal->theData[0] = apiNodePtr.i;
+    EXECUTE_DIRECT(NDBCNTR, GSN_API_START_REP, signal, 1);
   }
   return;
 }//Qmgr::execAPI_REGREQ()

--- 1.34/storage/ndb/src/kernel/blocks/suma/Suma.cpp	Fri May 13 12:58:12 2005
+++ 1.35/storage/ndb/src/kernel/blocks/suma/Suma.cpp	Wed May 18 14:10:24 2005
@@ -46,6 +46,7 @@
 #include <ndbapi/NdbDictionary.hpp>
 
 #include <DebuggerNames.hpp>
+#include <../dbtup/Dbtup.hpp>
 
 //#define HANDOVER_DEBUG
 //#define NODEFAIL_DEBUG
@@ -76,6 +77,7 @@
 Uint32 g_subPtrI = RNIL;
 static const Uint32 SUMA_SEQUENCE = 0xBABEBABE;
 
+static const Uint32 MAX_CONCURRENT_GCP = 2;
 
 /**************************************************************
  *
@@ -118,7 +120,7 @@
   const Uint32 replicas= c_noNodesInGroup;
 
   Uint32 buckets= 1;
-  for(i = 0; i<replicas; i++)
+  for(i = 1; i <= replicas; i++)
     buckets *= i;
   
   for(i = 0; i<buckets; i++)
@@ -131,7 +133,6 @@
   }
   
   c_no_of_buckets= buckets;
-  //  ndbout_c("c_noNodesInGroup=%d", c_noNodesInGroup);
   ndbrequire(c_noNodesInGroup > 0); // at least 1 node in the nodegroup
 
 #ifndef DBUG_OFF
@@ -160,6 +161,7 @@
   if(startphase == 3)
   {
     jam();
+    ndbrequire((m_tup = (Dbtup*)globalData.getBlock(DBTUP)) != 0);
     signal->theData[0] = reference();
     sendSignal(NDBCNTR_REF, GSN_READ_NODESREQ, signal, 1, JBB);
     DBUG_VOID_RETURN;
@@ -183,17 +185,18 @@
     if (typeOfStart != NodeState::ST_NODE_RESTART &&
 	typeOfStart != NodeState::ST_INITIAL_NODE_RESTART)
     {
-      for( int i = 0; i < NO_OF_BUCKETS; i++)
+      for( Uint32 i = 0; i < c_no_of_buckets; i++)
       {
-	if (getResponsibleSumaNodeId(i) == refToNode(reference()))
+	if (get_responsible_node(i) == getOwnNodeId())
 	{
 	  // I'm running this bucket
 	  DBUG_PRINT("info",("bucket %u set to true", i));
-	  c_buckets[i].active = true;
+	  m_active_buckets.set(i);
+	  ndbout_c("m_active_buckets.set(%d)", i);
 	}
       }
     }
-
+    
     if(typeOfStart == NodeState::ST_INITIAL_START &&
        c_masterNodeId == getOwnNodeId())
     {
@@ -202,7 +205,7 @@
       DBUG_VOID_RETURN;
     }//if
   }//if
-
+  
   if(startphase == 9)
   {
     if (typeOfStart == NodeState::ST_NODE_RESTART ||
@@ -211,21 +214,22 @@
       /**
        * Handover code here
        */
-      DBUG_VOID_RETURN;
+      c_startup.m_wait_handover= true;
+      check_start_handover(signal);
     }
   }
   sendSTTORRY(signal);
-
+  
   DBUG_VOID_RETURN;
 }
 
 void
 Suma::send_start_me_req(Signal* signal)
 {
-  Uint32 nodeId= c_restart_server_node_id;
+  Uint32 nodeId= c_startup.m_restart_server_node_id;
   do {
-    nodeId = c_started_nodes.find(nodeId);
-
+    nodeId = c_alive_nodes.find(nodeId + 1);
+    
     if(nodeId == getOwnNodeId())
       continue;
     if(nodeId == NdbNodeBitmask::NotFound)
@@ -236,7 +240,9 @@
     break;
   } while(true);
   
-  c_restart_server_node_id= nodeId;
+
+  infoEvent("Suma: asking node %d to recreate subscriptions on me", nodeId);
+  c_startup.m_restart_server_node_id= nodeId;
   sendSignal(calcSumaBlockRef(nodeId), 
 	     GSN_SUMA_START_ME_REQ, signal, 1, JBB);
 }
@@ -246,13 +252,19 @@
 {
   const SumaStartMeRef* ref= (SumaStartMeRef*)signal->getDataPtr();
   ndbrequire(ref->errorCode == SumaStartMeRef::Busy);
+
+  infoEvent("Suma: node %d refused %d", 
+	    c_startup.m_restart_server_node_id, ref->errorCode);
   send_start_me_req(signal);
 }
 
 void
 Suma::execSUMA_START_ME_CONF(Signal* signal)
 {
+  infoEvent("Suma: node %d has completed restoring me", 
+	    c_startup.m_restart_server_node_id);
   sendSTTORRY(signal);  
+  c_startup.m_restart_server_node_id= 0;
 }
 
 void
@@ -290,31 +302,72 @@
   jamEntry();
   ReadNodesConf * const conf = (ReadNodesConf *)signal->getDataPtr();
  
-  c_started_nodes.assign(NdbNodeBitmask::Size, conf->startedNodes);
-  c_starting_nodes.assign(NdbNodeBitmask::Size, conf->startingNodes);
-
-  c_alive_nodes.assign(c_started_nodes);
-  c_alive_nodes.bitOR(c_starting_nodes);
-
+  if(getNodeState().getNodeRestartInProgress())
+  {
+    c_alive_nodes.assign(NdbNodeBitmask::Size, conf->startedNodes);
+    c_alive_nodes.set(getOwnNodeId()); 
+  }
+  else
+  {
+    c_alive_nodes.assign(NdbNodeBitmask::Size, conf->startingNodes);
+    NdbNodeBitmask tmp;
+    tmp.assign(NdbNodeBitmask::Size, conf->startedNodes);
+    ndbrequire(tmp.isclear()); // No nodes can be started during SR
+  }
+  
   c_masterNodeId = conf->masterNodeId;
   
   sendSTTORRY(signal);
 }
 
 void
-Suma::execNODE_START_REP(Signal* signal)
+Suma::execAPI_START_REP(Signal* signal)
 {
   Uint32 nodeId = signal->theData[0];
-  if(c_nodes_in_nodegroup_mask.get(nodeId))
+  c_connected_nodes.set(nodeId);
+  
+  if(c_startup.m_wait_handover)
   {
-    /**
-     * A handover should have happened for all nodes in my node group
-     */
-    ndbrequire(c_started_nodes.get(nodeId));
+    check_start_handover(signal);
+  }
+}
+
+void
+Suma::check_start_handover(Signal* signal)
+{
+  NodeBitmask tmp;
+  tmp.assign(c_connected_nodes);
+  tmp.bitAND(c_subscriber_nodes);
+  if(!c_subscriber_nodes.equal(tmp))
+  {
+    return;
   }
   
-  c_started_nodes.set(nodeId);
-  c_starting_nodes.clear(nodeId);
+  c_startup.m_wait_handover= false;
+  send_handover_req(signal);
+}
+
+void
+Suma::send_handover_req(Signal* signal)
+{
+  NdbNodeBitmask tmp; 
+  tmp.assign(c_alive_nodes);
+  tmp.bitAND(c_nodes_in_nodegroup_mask);
+  tmp.clear(getOwnNodeId());
+  Uint32 gci= m_last_complete_gci + 3;
+  
+  SumaHandoverReq* req= (SumaHandoverReq*)signal->getDataPtrSend();
+  char buf[255];
+  tmp.getText(buf);
+  infoEvent("Suma: initiate handover with nodes %s GCI: %d",
+	    buf, gci);
+
+  req->gci = gci;
+  req->nodeId = getOwnNodeId();
+  
+  NodeReceiverGroup rg(SUMA, tmp);
+  sendSignal(rg, GSN_SUMA_HANDOVER_REQ, signal, 
+	     SumaHandoverReq::SignalLength, JBB);
 }
 
 #if 0
@@ -372,6 +425,19 @@
 void
 Suma::execCONTINUEB(Signal* signal){
   jamEntry();
+  Uint32 type= signal->theData[0];
+  switch(type){
+  case SumaContinueB::RELEASE_GCI:
+    release_gci(signal, signal->theData[1], signal->theData[2]);
+    return;
+  case SumaContinueB::RESEND_BUCKET:
+    resend_bucket(signal, 
+		  signal->theData[1], 
+		  signal->theData[2],
+		  signal->theData[3],
+		  signal->theData[4]);
+    return;
+  }
 }
 
 
@@ -389,12 +455,27 @@
   //BlockReference retRef = signal->theData[1];
 
   c_failedApiNodes.set(failedApiNode);
+  c_connected_nodes.clear(failedApiNode);
   bool found = removeSubscribersOnNode(signal, failedApiNode);
 
   if(!found){
     jam();
     c_failedApiNodes.clear(failedApiNode);
   }
+
+  SubGcpCompleteAck * const ack = (SubGcpCompleteAck*)signal->getDataPtr();
+  Ptr<Gcp_record> gcp;
+  for(c_gcp_list.first(gcp); !gcp.isNull(); c_gcp_list.next(gcp))
+  {
+    ack->rep.gci = gcp.p->m_gci;
+    if(gcp.p->m_subscribers.get(failedApiNode))
+    {
+      ack->rep.senderRef = numberToRef(0, failedApiNode);
+      sendSignal(SUMA_REF, GSN_SUB_GCP_COMPLETE_ACK, signal, 
+		 SubGcpCompleteAck::SignalLength, JBB);
+    }
+  }
+  
   DBUG_VOID_RETURN;
 }//execAPI_FAILREQ()
 
@@ -527,58 +608,48 @@
   const NodeFailRep * rep = (NodeFailRep*)signal->getDataPtr();
   NdbNodeBitmask failed; failed.assign(NdbNodeBitmask::Size, rep->theNodes);
   
-  bool changed = false;
-
-  c_nodeFailGCI = getFirstGCI(signal);
   if(failed.get(Restart.nodeId))
   {
     Restart.nodeId = 0;
   }
-  
+
+  signal->theData[0] = SumaContinueB::RESEND_BUCKET;
+
+  NdbNodeBitmask tmp;
+  tmp.assign(c_alive_nodes);
+  tmp.bitANDC(failed);
+
   if(c_nodes_in_nodegroup_mask.overlaps(failed))
   {
-    Uint32 nodeId= failed.find(0);
-    for(; nodeId != NdbNodeBitmask::NotFound; nodeId= failed.find(nodeId))
+    for( Uint32 i = 0; i < c_no_of_buckets; i++) 
     {
-      if (c_handoverToDo) {
-	jam();
-	// TODO what if I'm a SUMA that is currently restarting and the SUMA
-	// responsible for restarting me is the one that died?
-	
-	// a node has failed whilst handover is going on
-	// let's check if we're in the process of handover with that node
-	c_handoverToDo = false;
-	for( int i = 0; i < NO_OF_BUCKETS; i++) {
-	  if (c_buckets[i].handover) {
-	    // I'm doing handover, but is it with the dead node?
-	    if (getResponsibleSumaNodeId(i) == nodeId) {
-	      // so it was the dead node, has handover started?
-	      if (c_buckets[i].handover_started) {
-		jam();
-		// we're not ok and will have lost data!
-		// set not active to indicate this -
-		// this will generate takeover behaviour
-		c_buckets[i].active = false;
-		c_buckets[i].handover_started = false;
-	      } // else we're ok to revert back to state before 
-	      c_buckets[i].handover = false;
-	    } else {
-	      jam();
-	      // ok, we're doing handover with a different node
-	      c_handoverToDo = true;
-	      DBUG_PRINT("info",("Handover to do"));
-	    }
-	  }
+      if(m_active_buckets.get(i))
+	continue;
+      else if(m_switchover_buckets.get(i))
+      {
+	Uint32 state= c_buckets[i].m_state;
+	if((state & Bucket::BUCKET_HANDOVER) && 
+	   failed.get(get_responsible_node(i)))
+	{
+	  m_active_buckets.set(i);
+	  m_switchover_buckets.clear(i);
+	  ndbout_c("aborting handover");
+	} 
+	else if(state & Bucket::BUCKET_STARTING)
+	{
+	  progError(__LINE__, ERR_SYSTEM_ERROR, 
+		    "Nodefailure during SUMA takeover");
 	}
       }
+      else if(get_responsible_node(i, tmp) == getOwnNodeId())
+      {
+	start_resend(signal, i);
+      }
     }
   }
-     
-  c_failoverBuffer.nodeFailRep();
   
-  c_alive_nodes.bitAND(failed);  // this has to be done after the loop above
-  c_started_nodes.bitAND(failed);
-  c_starting_nodes.bitAND(failed);
+  c_alive_nodes.assign(tmp);
+  
   DBUG_VOID_RETURN;
 }
 
@@ -591,7 +662,6 @@
 
   ndbrequire(!c_alive_nodes.get(nodeId));
   c_alive_nodes.set(nodeId);
-  c_starting_nodes.set(nodeId); // set as being prepared
   
 #if 0 // if we include this DIH's got to be prepared, later if needed...
   signal->theData[0] = reference();
@@ -692,6 +762,24 @@
     infoEvent("Suma: c_removeDataSubscribers count: %d",
 	      count_subscribers(c_removeDataSubscribers));
   }
+
+  if(tCase == 8005)
+  {
+    for(Uint32 i = 0; i<c_no_of_buckets; i++)
+    {
+      Bucket* ptr= c_buckets + i;
+      infoEvent("Bucket %d %d%d-%x switch gci: %d max_acked_gci: %d max_gci: %d tail: %d head: %d",
+		i, 
+		m_active_buckets.get(i),
+		m_switchover_buckets.get(i),
+		ptr->m_state,
+		ptr->m_switchover_gci,
+		ptr->m_max_acked_gci,
+		ptr->m_buffer_head.m_max_gci,
+		ptr->m_buffer_tail,
+		ptr->m_buffer_head.m_page_id);
+    }
+  }  
 }
 
 /*************************************************************
@@ -932,15 +1020,6 @@
   return;
 }
 
-Uint32
-Suma::getFirstGCI(Signal* signal) {
-  if (c_lastCompleteGCI == RNIL) {
-    ndbout_c("WARNING: c_lastCompleteGCI == RNIL");
-    return 0;
-  }
-  return c_lastCompleteGCI+3;
-}
-
 /**********************************************************
  *
  * Setting upp trigger for subscription
@@ -1273,10 +1352,7 @@
     SubscriptionPtr subPtr;
     c_subscriptions.getPtr(subPtr, subbPtr.p->m_subPtrI);
     subPtr.p->m_table_ptrI= tabPtr.i;
-
-    Uint32 gci= getFirstGCI(signal);
-    subbPtr.p->m_firstGCI = gci;
-    sendSubStartComplete(signal,subbPtr,gci,
+    sendSubStartComplete(signal,subbPtr, m_last_complete_gci + 3,
 			 SubscriptionData::TableData);
   }
   DBUG_VOID_RETURN;
@@ -1743,7 +1819,7 @@
   req->transId1 = 0;
   req->transId2 = (SUMA << 20) + (suma.getOwnNodeId() << 8);
   req->clientOpPtr = (ptrI << 16);
-  req->batch_size_rows= 16;
+  req->batch_size_rows= parallelism;
   req->batch_size_bytes= 0;
   suma.sendSignal(DBLQH_REF, GSN_SCAN_FRAGREQ, signal, 
 		  ScanFragReq::SignalLength, JBB);
@@ -1933,8 +2009,8 @@
   key.m_subscriptionId        = req->subscriptionId;
   key.m_subscriptionKey       = req->subscriptionKey;
   
-  if (c_restart_server_node_id && 
-      refToNode(senderRef) != c_restart_server_node_id)
+  if (c_startup.m_restart_server_node_id && 
+      refToNode(senderRef) != c_startup.m_restart_server_node_id)
   {
     /**
      * only allow "restart_server" Suma's to come through 
@@ -1969,6 +2045,8 @@
 		     c_subscriberPool.getSize(),
 		     c_subscriberPool.getNoOfFree()));
 
+  c_subscriber_nodes.set(refToNode(subscriberRef));
+
   // setup subscription record
   subPtr.p->m_state = Subscription::LOCKED;
   // store these here for later use
@@ -1978,11 +2056,6 @@
   // setup subscriber record
   subbPtr.p->m_senderRef  = subscriberRef;
   subbPtr.p->m_senderData = subscriberData;
-  subbPtr.p->m_firstGCI = RNIL;
-  if (subPtr.p->m_subscriptionType == SubCreateReq::TableEvent)
-    subbPtr.p->m_lastGCI = 0;
-  else
-    subbPtr.p->m_lastGCI = RNIL; // disable usage of m_lastGCI
   subbPtr.p->m_subPtrI= subPtr.i;
 
   DBUG_PRINT("info",("subscriber: %u[%u,%u] subscription: %u[%u,%u] "
@@ -2012,9 +2085,9 @@
 
 void
 Suma::sendSubStartComplete(Signal* signal,
-				      SubscriberPtr subbPtr, 
-				      Uint32 firstGCI,
-				      SubscriptionData::Part part)
+			   SubscriberPtr subbPtr, 
+			   Uint32 firstGCI,
+			   SubscriptionData::Part part)
 {
   jam();
   DBUG_ENTER("Suma::sendSubStartComplete");
@@ -2735,6 +2808,9 @@
 
     ndbrequire(b_bufferLock == trigId);
 
+    memcpy(b_buffer + b_trigBufferSize, trg->getData(), 4 * dataLen);
+    b_trigBufferSize += dataLen;
+
     // printf("before values %u %u %u\n",trigId, dataLen,  b_trigBufferSize);
   } else {
     jam();
@@ -2747,10 +2823,11 @@
     } else {
       ndbrequire(f_bufferLock == trigId);
     }
+
+    memcpy(f_buffer + f_trigBufferSize, trg->getData(), 4 * dataLen);
+    f_trigBufferSize += dataLen;
   }
 
-  memcpy(b_buffer + b_trigBufferSize, trg->getData(), 4 * dataLen);
-  b_trigBufferSize += dataLen;
   
   DBUG_VOID_RETURN;
 }
@@ -2760,29 +2837,18 @@
 #endif
 
 Uint32 
-Suma::getStoreBucket(Uint32 v)
-{
-  // id will contain id to responsible suma or 
-  // RNIL if we don't have nodegroup info yet
-
-  const Uint32 N = NO_OF_BUCKETS;
-  const Uint32 D = v % N;            // Distibution key
-  return D;
-}
-
-Uint32 
-Suma::getResponsibleSumaNodeId(Uint32 bucket)
+Suma::get_responsible_node(Uint32 bucket) const
 {
   // id will contain id to responsible suma or 
   // RNIL if we don't have nodegroup info yet
 
   jam();
   Uint32 node;
-  Bucket* ptr= c_buckets + bucket;
+  const Bucket* ptr= c_buckets + bucket;
   for(Uint32 i = 0; i<MAX_REPLICAS; i++)
   {
     node= ptr->m_nodes[i];
-    if(c_started_nodes.get(node))
+    if(c_alive_nodes.get(node))
     {
       break;
     }
@@ -2800,57 +2866,35 @@
   return node;
 }
 
-Uint32
-Suma::decideWhoToSend(Uint32 nBucket, Uint32 gci){
-  bool replicaFlag = true;
-  Uint32 nId = RNIL;
-
-  // bucket active/not active set by GCP_COMPLETE
-  if (c_buckets[nBucket].active) {
-    if (c_buckets[nBucket].handover && c_buckets[nBucket].handoverGCI <= gci) {
-      jam();
-      replicaFlag = true; // let the other node send this
-      nId = RNIL;
-      // mark this as started, if we get a node failiure now we have some lost stuff
-      c_buckets[nBucket].handover_started = true;
-    } else {
-      jam();
-      replicaFlag = false;
-      nId = refToNode(reference());
-    }
-  } else {
-    nId  = getResponsibleSumaNodeId(nBucket);
-    replicaFlag = !(nId == refToNode(reference()));
-    
-    if (!replicaFlag) {
-      if (!c_buckets[nBucket].handover) {
-	jam();
-	// appearently a node has failed and we are taking over sending
-	// from that bucket.  Now we need to go back to latest completed
-	// GCI.  Handling will depend on Subscriber and Subscription
-	
-	// TODO, for now we make an easy takeover
-	if (gci < c_nodeFailGCI)
-	  c_lastInconsistentGCI = gci;
-	
-	// we now have responsability for this bucket and we're actively
-	// sending from that
-	c_buckets[nBucket].active = true;
-	DBUG_PRINT("info",("Takeover Bucket %u", nBucket));
-      } else if (c_buckets[nBucket].handoverGCI > gci) {
-	jam();
-	replicaFlag = true; // handover going on, but don't start sending yet
-	nId = RNIL;
-      } else {
-	jam();
-	DBUG_PRINT("info",("Possible error: Will send from GCI = %u", gci));
-      }
+Uint32 
+Suma::get_responsible_node(Uint32 bucket, const NdbNodeBitmask& mask) const
+{
+  jam();
+  Uint32 node;
+  const Bucket* ptr= c_buckets + bucket;
+  for(Uint32 i = 0; i<MAX_REPLICAS; i++)
+  {
+    node= ptr->m_nodes[i];
+    if(mask.get(node))
+    {
+      return node;
     }
   }
   
-  DBUG_PRINT("info",("Suma:bucket %u, responsible id = %u, replicaFlag = %u",
-		     nBucket, nId, (Uint32)replicaFlag));
-  return replicaFlag;
+  return 0;
+}
+
+bool
+Suma::check_switchover(Uint32 bucket, Uint32 gci)
+{
+  const Uint32 send_mask = (Bucket::BUCKET_STARTING | Bucket::BUCKET_TAKEOVER);
+  bool send = c_buckets[bucket].m_state & send_mask;
+  ndbassert(m_switchover_buckets.get(bucket));
+  if(unlikely(gci >= c_buckets[bucket].m_switchover_gci))
+  {
+    return send;
+  }
+  return !send;
 }
 
 void
@@ -2877,99 +2921,97 @@
   f_bufferLock = 0;
   b_bufferLock = 0;
 
-  bool replicaFlag = decideWhoToSend(getStoreBucket(hashValue), gci);
-  if (replicaFlag) {
-    jam();
-    DBUG_PRINT("info",("Other node sending"));
-    c_failoverBuffer.subTableData(gci,NULL,0);
-    DBUG_VOID_RETURN;
-  }
 
-  Uint32 sz = trg->getNoOfPrimaryKeyWords()+trg->getNoOfAfterValueWords();
-  ndbrequire(sz == f_trigBufferSize);
-
-  /**
-   * Reformat as "all headers" + "all data"
-   */
-  Uint32 dataLen   = 0;
-  Uint32 noOfAttrs = 0;
-  Uint32 * src     = f_buffer;
-  Uint32 * headers = signal->theData + 25;
-  Uint32 * dst     = signal->theData + 25 + MAX_ATTRIBUTES_IN_TABLE;
-
-  LinearSectionPtr ptr[3];
-  int nptr;
-
-  ptr[0].p  = headers;
-  ptr[1].p  = dst;
-
-  while(sz > 0){
-    jam();
-    Uint32 tmp = * src ++;
-    * headers ++ = tmp;
-    Uint32 len = AttributeHeader::getDataSize(tmp);
-    memcpy(dst, src, 4 * len);
-    dst += len;
-    src += len;
+  Uint32 bucket= hashValue % c_no_of_buckets;
+  m_max_seen_gci = (gci > m_max_seen_gci ? gci : m_max_seen_gci);
+  if(m_active_buckets.get(bucket) || 
+     (m_switchover_buckets.get(bucket) && (check_switchover(bucket, gci))))
+  {
+    m_max_sent_gci = (gci > m_max_sent_gci ? gci : m_max_sent_gci);
+    Uint32 sz = trg->getNoOfPrimaryKeyWords()+trg->getNoOfAfterValueWords();
+    ndbrequire(sz == f_trigBufferSize);
+    
+    /**
+     * Reformat as "all headers" + "all data"
+     */
+    Uint32 dataLen   = 0;
+    Uint32 noOfAttrs = 0;
+    Uint32 * src     = f_buffer;
+    Uint32 * headers = signal->theData + 25;
+    Uint32 * dst     = signal->theData + 25 + MAX_ATTRIBUTES_IN_TABLE;
+    
+    LinearSectionPtr ptr[3];
+    int nptr;
+    
+    ptr[0].p  = headers;
+    ptr[1].p  = dst;
+    
+    while(sz > 0){
+      jam();
+      Uint32 tmp = * src ++;
+      * headers ++ = tmp;
+      Uint32 len = AttributeHeader::getDataSize(tmp);
+      memcpy(dst, src, 4 * len);
+      dst += len;
+      src += len;
+      
+      noOfAttrs++;
+      dataLen += len;
+      sz -= (1 + len);
+    }
+    ndbrequire(sz == 0);
+    
+    ptr[0].sz = noOfAttrs;
+    ptr[1].sz = dataLen;
+    
+    if (b_trigBufferSize > 0) {
+      jam();
+      ptr[2].p  = b_buffer;
+      ptr[2].sz = b_trigBufferSize;
+      nptr = 3;
+    } else {
+      jam();
+      nptr = 2;
+    }
+    
+    /**
+     * Signal to subscriber(s)
+     */
+    ndbrequire(tabPtr.p = c_tablePool.getPtr(tabPtr.i));
+    
+    SubTableData * data = (SubTableData*)signal->getDataPtrSend();//trg;
+    data->gci            = gci;
+    data->tableId        = tabPtr.p->m_tableId;
+    data->operation      = event;
+    data->noOfAttributes = noOfAttrs;
+    data->dataSize       = dataLen;
+    data->logType        = 0;
     
-    noOfAttrs++;
-    dataLen += len;
-    sz -= (1 + len);
-  }
-  ndbrequire(sz == 0);
-
-  ptr[0].sz = noOfAttrs;
-  ptr[1].sz = dataLen;
-
-  if (b_trigBufferSize > 0) {
-    jam();
-    ptr[2].p  = b_buffer;
-    ptr[2].sz = b_trigBufferSize;
-    nptr = 3;
-  } else {
-    jam();
-    nptr = 2;
-  }
-
-  /**
-   * Signal to subscriber(s)
-   */
-  ndbrequire(tabPtr.p = c_tablePool.getPtr(tabPtr.i));
-  
-  SubTableData * data = (SubTableData*)signal->getDataPtrSend();//trg;
-  data->gci            = gci;
-  data->tableId        = tabPtr.p->m_tableId;
-  data->operation      = event;
-  data->noOfAttributes = noOfAttrs;
-  data->dataSize       = dataLen;
-  data->logType        = 0;
-  if (c_lastInconsistentGCI == data->gci) {
-    data->setGCINotConsistent();
-  }
-
-  {
-    LocalDLList<Subscriber>
-      subscribers(c_subscriberPool,tabPtr.p->c_subscribers);
-    SubscriberPtr subbPtr;
-    for(subscribers.first(subbPtr);!subbPtr.isNull();subscribers.next(subbPtr))
     {
-      if (c_handoverToDo && subbPtr.p->m_firstGCI > gci)
+      LocalDLList<Subscriber> list(c_subscriberPool,tabPtr.p->c_subscribers);
+      SubscriberPtr subbPtr;
+      for(list.first(subbPtr); !subbPtr.isNull(); list.next(subbPtr))
       {
-	DBUG_PRINT("info",("c_handoverToDo: %d, m_firstGCI = %d, gci = %d",
-			   c_handoverToDo, subbPtr.p->m_firstGCI, gci ));
-	jam();
-	// we're restarting and waiting for the right gci
-	continue;
+	DBUG_PRINT("info",("GSN_SUB_TABLE_DATA to node %d",
+			   refToNode(subbPtr.p->m_senderRef)));
+	data->senderData = subbPtr.p->m_senderData;
+	sendSignal(subbPtr.p->m_senderRef, GSN_SUB_TABLE_DATA, signal,
+		   SubTableData::SignalLength, JBB, ptr, nptr);
       }
-    
-      DBUG_PRINT("info",("GSN_SUB_TABLE_DATA to node %d",
-			 refToNode(subbPtr.p->m_senderRef)));
-      data->senderData = subbPtr.p->m_senderData;
-      sendSignal(subbPtr.p->m_senderRef, GSN_SUB_TABLE_DATA, signal,
-		 SubTableData::SignalLength, JBB, ptr, nptr);
     }
   }
-
+  else 
+  {
+    Uint32* dst;
+    Uint32 sz = f_trigBufferSize + b_trigBufferSize;
+    if((dst = get_buffer_ptr(bucket, gci, sz)))
+    {
+      memcpy(dst, f_buffer, f_trigBufferSize << 2);
+      dst += f_trigBufferSize;
+      memcpy(dst, b_buffer, b_trigBufferSize << 2);
+    }
+  }
+  
   DBUG_VOID_RETURN;
 }
 
@@ -2983,7 +3025,7 @@
 
   rep->senderRef  = reference();
   Uint32 gci = rep->gci;
-  c_lastCompleteGCI = gci;
+  m_last_complete_gci = gci;
 
   /**
    * Signal to subscriber(s)
@@ -2995,18 +3037,10 @@
     if (it.curr.p->m_state == Table::DEFINING)
       continue;
 
-    LocalDLList<Subscriber>
-      subbs(c_subscriberPool,it.curr.p->c_subscribers);
+    LocalDLList<Subscriber>list(c_subscriberPool,it.curr.p->c_subscribers);
     SubscriberPtr subbPtr;
-    for(subbs.first(subbPtr);!subbPtr.isNull();subbs.next(subbPtr))
+    for(list.first(subbPtr);!subbPtr.isNull();list.next(subbPtr))
     {
-#if 0
-      if (subbPtr.p->m_firstGCI > gci) {
-	jam();
-	// we don't send SUB_GCP_COMPLETE_REP for incomplete GCI's
-	continue;
-      }
-#endif 
       rep->senderData = subbPtr.p->m_senderData;
       
 #if PRINT_ONLY
@@ -3019,38 +3053,87 @@
 #endif    
     }
   }
+
+  Ptr<Gcp_record> gcp;
+  if(c_gcp_list.seize(gcp))
+  {
+    gcp.p->m_gci = gci;
+    gcp.p->m_subscribers = c_subscriber_nodes;
+  }
   
-  if (c_handoverToDo) {
-    jam();
-    c_handoverToDo = false;
-    for( int i = 0; i < NO_OF_BUCKETS; i++) {
-      if (c_buckets[i].handover) {
-	if (c_buckets[i].handoverGCI > gci) {
-	  jam();
-	  c_handoverToDo = true; // still waiting for the right GCI
-	  break; /* since all handover should happen at the same time
-		  * we can break here
-		  */
-	} else {
-	  c_buckets[i].handover = false;
-#ifdef HANDOVER_DEBUG
-	  ndbout_c("Handover Bucket %u", i);
-#endif
-	  if (getResponsibleSumaNodeId(i) == refToNode(reference())) {
-	    // my bucket to be handed over to me
-	    ndbrequire(!c_buckets[i].active);
-	    jam();
-	    c_buckets[i].active = true;
-	  } else {
-	    // someone else's bucket to handover to
-	    ndbrequire(c_buckets[i].active);
-	    jam();
-	    c_buckets[i].active = false;
-	  }
+  /**
+   * 
+   */
+  if(!m_switchover_buckets.isclear())
+  {
+    Uint32 i = m_switchover_buckets.find(0);
+    for(; i != Bucket_mask::NotFound; i = m_switchover_buckets.find(i + 1))
+    {
+      if(c_buckets[i].m_switchover_gci == gci)
+      {
+	Uint32 state = c_buckets[i].m_state;
+	m_switchover_buckets.clear(i);
+	printf("switchover complete bucket %d state: %x", i, state);
+	if(state & Bucket::BUCKET_STARTING)
+	{
+	  /**
+	   * NR case
+	   */
+	  m_active_buckets.set(i);
+	  c_buckets[i].m_state &= ~(Uint32)Bucket::BUCKET_STARTING;
+	  ndbout_c("starting");
+	}
+	else if(state & Bucket::BUCKET_TAKEOVER)
+	{
+	  /**
+	   * NF case
+	   */
+	  Bucket* bucket= c_buckets + i;
+	  Page_pos pos= bucket->m_buffer_head;
+	  ndbrequire(pos.m_max_gci < gci);
+
+	  Buffer_page* page= (Buffer_page*)(m_tup->page+pos.m_page_id);
+	  ndbout_c("takeover %d", pos.m_page_id);
+	  page->m_max_gci = pos.m_max_gci;
+	  page->m_words_used = pos.m_page_pos;
+	  page->m_next_page = RNIL;
+	  memset(&bucket->m_buffer_head, 0, sizeof(bucket->m_buffer_head));
+	  bucket->m_buffer_head.m_page_id = RNIL;
+	  bucket->m_buffer_head.m_page_pos = Buffer_page::DATA_WORDS;
+
+	  m_active_buckets.set(i);
+	  c_buckets[i].m_state &= ~(Uint32)Bucket::BUCKET_TAKEOVER;
+	}
+	else
+	{
+	  /**
+	   * NR, living node
+	   */
+	  ndbrequire(state & Bucket::BUCKET_HANDOVER);
+	  c_buckets[i].m_state &= ~(Uint32)Bucket::BUCKET_HANDOVER;
+	  ndbout_c("handover");
 	}
       }
     }
   }
+
+  /**
+   * Add GCP COMPLETE REP to buffer
+   */
+  for(Uint32 i = 0; i<c_no_of_buckets; i++)
+  {
+    if(m_active_buckets.get(i))
+      continue;
+
+    if(c_buckets[i].m_buffer_tail != RNIL)
+    {
+      Uint32* dst;
+      if((dst= get_buffer_ptr(i, gci, 0)) == 0)
+      {
+	ndbrequire(false);
+      }
+    }
+  }
 }
 
 void
@@ -3059,9 +3142,10 @@
   jamEntry();
   DBUG_ENTER("Suma::execCREATE_TAB_CONF");
 
+#if 0
   CreateTabConf * const conf = (CreateTabConf*)signal->getDataPtr();
   Uint32 tableId = conf->senderData;
-#if 0
+
   TablePtr tabPtr;
   initTable(signal,tableId,tabPtr);
 #endif
@@ -3096,14 +3180,14 @@
   tabPtr.p->m_triggerIds[1] = ILLEGAL_TRIGGER_ID;
   tabPtr.p->m_triggerIds[2] = ILLEGAL_TRIGGER_ID;
 
-  if (getResponsibleSumaNodeId(0) != refToNode(reference()))
+  if (get_responsible_node(0) != refToNode(reference()))
   {
     DBUG_VOID_RETURN;
   }
   // responsible for bucket 0 sends info to API
   
   SubTableData * data = (SubTableData*)signal->getDataPtrSend();
-  data->gci            = c_lastCompleteGCI+1;
+  data->gci            = m_last_complete_gci+1;
   data->tableId        = tableId;
   data->operation      = NdbDictionary::Event::_TE_DROP;
   data->noOfAttributes = 0;
@@ -3159,14 +3243,14 @@
   tabPtr.p->m_state = Table::ALTERED;
   // triggers must be removed, waiting for sub stop req for that
 
-  if (getResponsibleSumaNodeId(0) != refToNode(reference()))
+  if (get_responsible_node(0) != refToNode(reference()))
   {
     DBUG_VOID_RETURN;
   }
   // responsible for bucket 0 sends info to API
   
   SubTableData * data = (SubTableData*)signal->getDataPtrSend();
-  data->gci            = c_lastCompleteGCI+1;
+  data->gci            = m_last_complete_gci+1;
   data->tableId        = tableId;
   data->operation      = NdbDictionary::Event::_TE_ALTER;
   data->noOfAttributes = 0;
@@ -3211,15 +3295,6 @@
 {
   jam();
 
-  SubGcpCompleteAck * const ack = (SubGcpCompleteAck*)signal->getDataPtr();
-
-  Uint32 gci = ack->rep.gci;
-
-#ifdef EVENT_DEBUG
-  ndbout_c("Suma::runSUB_GCP_COMPLETE_ACK gci = %u", gci);
-#endif
-
-  c_failoverBuffer.subGcpCompleteRep(gci);
 }
 
 void
@@ -3232,94 +3307,51 @@
   Uint32 gci = ack->rep.gci;
   Uint32 senderRef  = ack->rep.senderRef;
   Uint32 senderData = ack->rep.senderData;
-
+  
   if (refToBlock(senderRef) == SUMA) {
     jam();
     // Ack from other SUMA
-    runSUB_GCP_COMPLETE_ACK(signal);
+
+    for(Uint32 i = 0; i<c_no_of_buckets; i++)
+    {
+      release_gci(signal, i, gci);
+    }
     return;
   }
 
-  jam();
   // Ack from User and not an ack from other SUMA, redistribute in nodegroup
   
-  static Uint32 last_gci= 0;
-  if (last_gci < gci)
+  Uint32 nodeId = refToNode(senderRef);
+  
+  jam();
+  Ptr<Gcp_record> gcp;
+  for(c_gcp_list.first(gcp); !gcp.isNull(); c_gcp_list.next(gcp))
   {
-    last_gci= gci;
-
-    // tell the other SUMA's that I'm done with this GCI
-    jam();
-    NdbNodeBitmask tmp= c_alive_nodes;
-    tmp.bitAND(c_nodes_in_nodegroup_mask);
-    tmp.clear(getOwnNodeId());
-    NodeReceiverGroup rg(SUMA, tmp);
-    sendSignal(rg, GSN_SUB_GCP_COMPLETE_ACK, signal,
-	       SubGcpCompleteAck::SignalLength, JBB);
-  }
-}
-
-static Uint32 tmpFailoverBuffer[512];
-//Suma::FailoverBuffer::FailoverBuffer(DataBuffer<15>::DataBufferPool & p)
-//  :  m_dataList(p), 
-Suma::FailoverBuffer::FailoverBuffer()
-  :
-     c_gcis(tmpFailoverBuffer), c_sz(512), c_first(0), c_next(0), c_full(false)
-{
-}
-
-bool Suma::FailoverBuffer::subTableData(Uint32 gci, Uint32 *src, int sz)
-{
-  bool ok = true;
-
-  if (c_full) {
-    ok = false;
-#ifdef EVENT_DEBUG
-    ndbout_c("Suma::FailoverBuffer::SubTableData buffer full gci=%u");
-#endif
-  } else {
-    c_gcis[c_next] = gci;
-    c_next++;
-    if (c_next == c_sz) c_next = 0;
-    if (c_next == c_first)
-      c_full = true;
-    //    ndbout_c("%u %u %u",c_first,c_next,c_sz);
-  }
-  return ok;
-}
-bool Suma::FailoverBuffer::subGcpCompleteRep(Uint32 gci)
-{
-  bool ok = true;
-
-  //  ndbout_c("Empty");
-  while (true) {
-    if (c_first == c_next && !c_full)
-      break;
-    if (c_gcis[c_first] > gci)
+    if(gcp.p->m_gci == gci)
+    {
+      gcp.p->m_subscribers.clear(nodeId);
+      if(!gcp.p->m_subscribers.isclear())
+      {
+	jam();
+	return;
+      }
       break;
-    c_full = false;
-    c_first++;
-    if (c_first == c_sz) c_first = 0;
-    //    ndbout_c("%u %u %u : ",c_first,c_next,c_sz);
+    }
   }
-
-  return ok;
-}
-bool Suma::FailoverBuffer::nodeFailRep()
-{
-  bool ok = true;
-  while (true) {
-    if (c_first == c_next && !c_full)
-      break;
-
-#ifdef EVENT_DEBUG
-    ndbout_c("Suma::FailoverBuffer::NodeFailRep resending gci=%u", c_gcis[c_first]);
-#endif
-    c_full = false;
-    c_first++;
-    if (c_first == c_sz) c_first = 0;
+  
+  if(gcp.isNull())
+  {
+    ndbout_c("ACK wo/ gcp record");
+  }
+  else
+  {
+    c_gcp_list.release(gcp);
   }
-  return ok;
+  
+  ack->rep.senderRef = reference();  
+  NodeReceiverGroup rg(SUMA, c_nodes_in_nodegroup_mask);
+  sendSignal(rg, GSN_SUB_GCP_COMPLETE_ACK, signal,
+	     SubGcpCompleteAck::SignalLength, JBB);
 }
 
 /**************************************************************
@@ -3842,7 +3874,7 @@
 {
   jam();
   DBUG_ENTER("Suma::Restart::completeRestartingNode");
-  SumaStartMeConf *conf= (SumaStartMeConf*)signal->getDataPtrSend();
+  //SumaStartMeConf *conf= (SumaStartMeConf*)signal->getDataPtrSend();
   suma.sendSignal(sumaRef, GSN_SUMA_START_ME_CONF, signal,
 		  SumaStartMeConf::SignalLength, JBB);
   nodeId = 0;
@@ -3860,69 +3892,41 @@
   SumaHandoverReq const * req = (SumaHandoverReq *)signal->getDataPtr();
 
   Uint32 gci = req->gci;
-  Uint32 new_gci = getFirstGCI(signal);
+  Uint32 nodeId = req->nodeId;
+  Uint32 new_gci = m_last_complete_gci + MAX_CONCURRENT_GCP + 1;
+  
+  Uint32 start_gci = (gci > new_gci ? gci : new_gci);
+  // mark all active buckets really belonging to restarting SUMA
 
-  if (new_gci > gci)
+  Bucket_mask tmp;
+  for( Uint32 i = 0; i < c_no_of_buckets; i++) 
   {
-    gci = new_gci;
-  }
-
-  { // all recreated subscribers at restarting SUMA start at same GCI
-    KeyTable<Table>::Iterator it;
-    for(c_tables.first(it);!it.isNull();c_tables.next(it))
+    if(get_responsible_node(i) == nodeId)
     {
-      LocalDLList<Subscriber>
-	subscribers(c_subscriberPool,it.curr.p->c_subscribers);
-      SubscriberPtr subbPtr;
-      for(subscribers.first(subbPtr);
-	  !subbPtr.isNull();
-	  subscribers.next(subbPtr))
+      if (m_active_buckets.get(i))
       {
-	subbPtr.p->m_firstGCI = gci;
+	// I'm running this bucket but it should really be the restarted node
+	tmp.set(i);
+	m_active_buckets.clear(i);
+	m_switchover_buckets.set(i);
+	c_buckets[i].m_switchover_gci = start_gci;
+	c_buckets[i].m_state |= Bucket::BUCKET_HANDOVER;
+	ndbout_c("prepare to handover bucket: %d", i);
       }
-    }
-  }
-
-#ifdef NODEFAIL_DEBUG
-  ndbout_c("Suma::execSUMA_HANDOVER_REQ, gci = %u", gci);
-#endif
-
-  c_handoverToDo = false;
-  c_restart_server_node_id = 0;
-  {
-#ifdef HANDOVER_DEBUG
-    int c = 0;
-#endif
-    for( int i = 0; i < NO_OF_BUCKETS; i++) {
-      jam();
-      if (getResponsibleSumaNodeId(i) == refToNode(reference())) {
-#ifdef HANDOVER_DEBUG
-	c++;
-#endif
-        jam();
-	c_buckets[i].active = false;
-	c_buckets[i].handoverGCI = gci;
-	c_buckets[i].handover = true;
-	c_buckets[i].handover_started = false;
-	c_handoverToDo = true;
+      else if(m_switchover_buckets.get(i))
+      {
+	ndbout_c("dont handover bucket: %d %d", i, nodeId);
       }
     }
-#ifdef HANDOVER_DEBUG
-    ndbout_c("prepared handover of bucket %u buckets", c);
-#endif
-  }
-
-  for (Uint32 i = 0; i < c_noNodesInGroup; i++) {
-    jam();
-    Uint32 ref = calcSumaBlockRef(c_nodesInGroup[i]);
-    if (ref != reference()) {
-      jam();
-      sendSignal(ref, GSN_SUMA_HANDOVER_CONF, signal,
-		 SumaHandoverConf::SignalLength, JBB);
-    }//if
   }
   
-  sendSTTORRY(signal);
+  SumaHandoverConf* conf= (SumaHandoverConf*)signal->getDataPtrSend();
+  tmp.copyto(BUCKET_MASK_SIZE, conf->theBucketMask);
+  conf->gci = start_gci;
+  conf->nodeId = getOwnNodeId();
+  sendSignal(calcSumaBlockRef(nodeId), GSN_SUMA_HANDOVER_CONF, signal,
+	     SumaHandoverConf::SignalLength, JBB);
+  
   DBUG_VOID_RETURN;
 }
 
@@ -3938,40 +3942,381 @@
   jamEntry();
   DBUG_ENTER("Suma::execSUMA_HANDOVER_CONF");
 
-  Uint32 sumaRef = signal->getSendersBlockRef();
   SumaHandoverConf const * conf = (SumaHandoverConf *)signal->getDataPtr();
 
   Uint32 gci = conf->gci;
-
+  Uint32 nodeId = conf->nodeId;
+  Bucket_mask tmp;
+  tmp.assign(BUCKET_MASK_SIZE, conf->theBucketMask);
 #ifdef HANDOVER_DEBUG
   ndbout_c("Suma::execSUMA_HANDOVER_CONF, gci = %u", gci);
 #endif
 
-  /* TODO, if we are restarting several SUMA's (>2 in a nodegroup)
-   * we have to collect all these conf's before proceding
+  for( Uint32 i = 0; i < c_no_of_buckets; i++) 
+  {
+    if (tmp.get(i))
+    {
+      ndbrequire(get_responsible_node(i) == getOwnNodeId());
+      // We should run this bucket, but _nodeId_ is
+      c_buckets[i].m_switchover_gci = gci;
+      c_buckets[i].m_state |= Bucket::BUCKET_STARTING;
+    }
+  }
+  
+  char buf[255];
+  tmp.getText(buf);
+  infoEvent("Suma: handover from node %d gci: %d buckets: %s (%d)",
+	    nodeId, gci, buf, c_no_of_buckets);
+  m_switchover_buckets.bitOR(tmp);
+  DBUG_VOID_RETURN;
+}
+
+/**
+ * Print a Page_pos as "[ Page_pos: m_page_id: .. m_page_pos: .. m_max_gci: .. ]".
+ * m_last_gci is not included in the output.
+ */
+static
+NdbOut&
+operator<<(NdbOut & out, const Suma::Page_pos & pos)
+{
+  out << "[ Page_pos:"
+      << " m_page_id: " << pos.m_page_id
+      << " m_page_pos: " << pos.m_page_pos
+      << " m_max_gci: " << pos.m_max_gci
+      << " ]";
+  return out;
+}
+
+/**
+ * Reserve room for one record in a bucket's page buffer and write the
+ * record header.
+ *
+ * Record layout written here:
+ *  - same gci as the previous record and it fits on the page:
+ *      1 header word: high bit set, low bits = sz + 1 (header + payload)
+ *  - otherwise:
+ *      2 header words: (sz + 2) followed by the gci
+ * When the record does not fit on the current page (or there is no page
+ * yet) a new page is seized, the old page's header is filled in and linked
+ * to the new one, and the record is written with the 2-word header.
+ *
+ * @param buck  index into c_buckets
+ * @param gci   gci the record belongs to
+ * @param sz    payload size in words (0 is used by callers for a
+ *              header-only record)
+ * @return pointer to where the caller should copy `sz` payload words; no
+ *         path visible in this function returns 0
+ *
+ * NOTE(review): sz is not checked against Buffer_page::DATA_WORDS, so a
+ * record larger than a page would run past m_data - confirm callers bound it.
+ */
+Uint32*
+Suma::get_buffer_ptr(Uint32 buck, Uint32 gci, Uint32 sz)
+{
+  sz += 1; // len
+  Bucket* bucket= c_buckets+buck;
+  Page_pos pos= bucket->m_buffer_head;
+
+  // When m_page_id is RNIL these pointers are only computed, not used:
+  // init sets m_page_pos to DATA_WORDS in that case, which forces the
+  // "new page" branch below where page/ptr are re-assigned.
+  Buffer_page* page= (Buffer_page*)(m_tup->page+pos.m_page_id);
+  Uint32* ptr= page->m_data + pos.m_page_pos;
+
+  const bool same_gci = gci == pos.m_last_gci;
+
+  // Tentatively advance by the 1-word-header size; the 2-word-header
+  // branch adds one more word below.
+  pos.m_page_pos += sz;
+  pos.m_last_gci = gci;
+  Uint32 max = pos.m_max_gci > gci ? pos.m_max_gci : gci;
+  
+  if(likely(same_gci && pos.m_page_pos <= Buffer_page::DATA_WORDS))
+  {
+    pos.m_max_gci = max;
+    bucket->m_buffer_head = pos;
+    // NOTE(review): 0x8000 << 16 is a shift on a signed int into the sign
+    // bit; resend_bucket tests the flag with the same expression.
+    * ptr++ = (0x8000 << 16) | sz; // Same gci
+    return ptr;
+  }
+  else if(pos.m_page_pos + 1 <= Buffer_page::DATA_WORDS)
+  {
+    // Also entered via "goto loop" from the new-page branch below
+loop:
+    pos.m_max_gci = max;
+    pos.m_page_pos += 1;
+    bucket->m_buffer_head = pos;
+    * ptr++ = (sz + 1); 
+    * ptr++ = gci;
+    return ptr;
+  }
+  else
+  {
+    /**
+     * new page
+     * 1) save header on last page
+     * 2) seize new page
+     */
+    Uint32 next;
+    if(unlikely((next= seize_page()) == RNIL))
+    {
+      ndbrequire(false);
+    }
+    
+    if(likely(pos.m_page_id != RNIL))
+    {
+      // pos.m_page_pos was already advanced by sz above; undo that to get
+      // the number of words really used on the old page.
+      page->m_max_gci = pos.m_max_gci;
+      page->m_words_used = pos.m_page_pos - sz;
+      page->m_next_page= next;
+    }
+    else
+    {
+      // First page of this bucket: it is also the tail of the page list
+      bucket->m_buffer_tail = next;
+    }
+    
+    memset(&pos, 0, sizeof(pos));
+    pos.m_page_id = next;
+    pos.m_page_pos = sz;
+    pos.m_last_gci = gci;
+    
+    page= (Buffer_page*)(m_tup->page+pos.m_page_id);
+    page->m_next_page= RNIL;
+    ptr= page->m_data;
+    goto loop; //
+  }
+}
+
+/**
+ * Take one page off the free-page list, refilling the list with a new
+ * chunk of pages from m_tup when it is empty.
+ *
+ * Each page records the Page_chunk it belongs to (m_page_chunk_ptr_i) and
+ * the chunk's m_free counter is decremented for every page handed out.
+ *
+ * @return page id of the seized page, or RNIL when no Page_chunk record
+ *         could be seized from c_page_chunk_pool
+ */
+Uint32
+Suma::seize_page()
+{
+loop:
+  Ptr<Page_chunk> ptr;
+  Uint32 ref= m_first_free_page;
+  if(likely(ref != RNIL))
+  {
+    // Index the page array by page id first and cast afterwards, as the
+    // other page lookups in this change do; casting first is only
+    // equivalent while both page types have the same size.
+    Buffer_page* first= (Buffer_page*)(m_tup->page+ref);
+    m_first_free_page = first->m_next_page;
+    Uint32 chunk = first->m_page_chunk_ptr_i;
+    c_page_chunk_pool.getPtr(ptr, chunk);
+    ndbassert(ptr.p->m_free);
+    ptr.p->m_free--;
+    return ref;
+  }
+
+  // Free list is empty: a chunk record is needed to track the new pages
+  if(!c_page_chunk_pool.seize(ptr))
+    return RNIL;
+
+  // Ask for 16 pages; count and ref are filled in by the call
+  // (presumably number of pages obtained and id of the first one).
+  Uint32 count;
+  m_tup->allocConsPages(16, count, ref);
+  ndbrequire(count > 0);
+
+  ndbout_c("alloc_chunk(%d %d) - ", ref, count);
+
+  m_first_free_page = ptr.p->m_page_id = ref;
+  ptr.p->m_size = count;
+  ptr.p->m_free = count;
+
+  // Chain the new pages into a free list: page i points at page i + 1
+  Buffer_page* page;
+  for(Uint32 i = 0; i<count; i++)
+  {
+    page = (Buffer_page*)(m_tup->page+ref);
+    page->m_page_state= SUMA_SEQUENCE;
+    page->m_page_chunk_ptr_i = ptr.i;
+    page->m_next_page = ++ref;
+  }
+  // ...and the last page terminates the list
+  page->m_next_page = RNIL;
+  
+  // The free list is non-empty now; go back and pop its first page
+  goto loop;
+}
+
+/**
+ * Return a page to the front of the free-page list and bump the free
+ * counter of the chunk it belongs to.
+ *
+ * @param page_id  id of the page being freed
+ * @param page     the same page as a pointer; must be in state
+ *                 SUMA_SEQUENCE (checked with ndbrequire)
+ */
+void
+Suma::free_page(Uint32 page_id, Buffer_page* page)
+{
+  Ptr<Page_chunk> ptr;
+  ndbrequire(page->m_page_state == SUMA_SEQUENCE);
+
+  Uint32 chunk= page->m_page_chunk_ptr_i;
+
+  c_page_chunk_pool.getPtr(ptr, chunk);  
+  
+  ptr.p->m_free ++;
+  page->m_next_page = m_first_free_page;
+  // A chunk can never have more free pages than it has pages
+  ndbrequire(ptr.p->m_free <= ptr.p->m_size);
+  
+  m_first_free_page = page_id;
+}
+
+/**
+ * Release buffered data of one bucket that is covered by an acknowledged
+ * gci.
+ *
+ * At most one page is freed per call; when a page was freed the function
+ * sends CONTINUEB (SumaContinueB::RELEASE_GCI) to SUMA_REF with the same
+ * bucket and gci so that the next page is examined in a later signal.
+ *
+ * @param signal  signal object used for the CONTINUEB
+ * @param buck    index into c_buckets
+ * @param gci     acknowledged gci
+ */
+void
+Suma::release_gci(Signal* signal, Uint32 buck, Uint32 gci)
+{
+  Bucket* bucket= c_buckets+buck;
+  Uint32 tail= bucket->m_buffer_tail;
+  Page_pos head= bucket->m_buffer_head;
+  Uint32 max_acked = bucket->m_max_acked_gci;
+
+  // While the bucket is being taken over / resent nothing is released and
+  // m_max_acked_gci is left untouched.
+  const Uint32 mask = Bucket::BUCKET_TAKEOVER | Bucket::BUCKET_RESEND;
+  if(unlikely(bucket->m_state & mask))
+  {
+    jam();
+    ndbout_c("release_gci(%d, %d) -> node failure -> abort", buck, gci);
+    return;
+  }
+  
+  bucket->m_max_acked_gci = (max_acked > gci ? max_acked : gci);
+  if(unlikely(tail == RNIL))
+  {
+    // No pages buffered for this bucket
+    return;
+  }
+  
+  if(tail == head.m_page_id)
+  {
+    // Only one page, and it is the one being written to: keep the page
+    // but rewind the write position when everything on it is acked.
+    if(gci >= head.m_max_gci)
+    {
+      jam();
+      head.m_page_pos = 0;
+      head.m_max_gci = gci;
+      bucket->m_buffer_head = head;
+    }
+    return;
+  }
+  else
+  {
+    jam();
+    Buffer_page* page= (Buffer_page*)(m_tup->page+tail);
+    Uint32 max_gci = page->m_max_gci;
+    Uint32 next_page = page->m_next_page;
+
+    ndbassert(max_gci);
+    
+    if(gci >= max_gci)
+    {
+      jam();
+      // Whole tail page is acked: free it and continue with the next
+      // page in a new signal.
+      free_page(tail, page);
+      
+      bucket->m_buffer_tail = next_page;
+      signal->theData[0] = SumaContinueB::RELEASE_GCI;
+      signal->theData[1] = buck;
+      signal->theData[2] = gci;
+      sendSignal(SUMA_REF, GSN_CONTINUEB, signal, 3, JBB);
+      return;
+    }
+    else
+    {
+      ndbout_c("do nothing...");
+    }
+  }
+}
+
+/**
+ * Start resending buffered data of a bucket this node takes over.
+ *
+ * If the bucket has nothing buffered beyond m_max_acked_gci it is simply
+ * marked active.  Otherwise the bucket is put in switchover with state
+ * BUCKET_TAKEOVER | BUCKET_RESEND and switchover gci max + 1, and a
+ * CONTINUEB (SumaContinueB::RESEND_BUCKET) is sent to this block to walk
+ * the buffered pages starting at gci m_max_acked_gci + 1.
+ *
+ * @param signal  signal object used for the CONTINUEB
+ * @param buck    index into c_buckets
+ */
+void
+Suma::start_resend(Signal* signal, Uint32 buck)
+{
+  printf("start_resend(%d, ", buck);
+  
+  /**
+   * Resend from m_max_acked_gci + 1 until max_gci + 1
    */
+  Bucket* bucket= c_buckets + buck;
+  Page_pos pos= bucket->m_buffer_head;
+
+  if(pos.m_page_id == RNIL)
+  {
+    jam();
+    m_active_buckets.set(buck);
+    ndbout_c("empty bucket(RNIL) -> active");
+    return;
+  }
 
-  // restarting node is now prepared and ready
-  c_started_nodes.set(refToNode(sumaRef)); /* !! important to do before
-					       * below since it affects
-					       * getResponsibleSumaNodeId()
-					       */
+  Uint32 min= bucket->m_max_acked_gci + 1;
+  Uint32 max = pos.m_max_gci;
 
-  c_handoverToDo = false;
-  // mark all active buckets really belonging to restarting SUMA
-  for( int i = 0; i < NO_OF_BUCKETS; i++) {
-    if (c_buckets[i].active) {
-      // I'm running this bucket
-      if (getResponsibleSumaNodeId(i) == refToNode(sumaRef)) {
-	// but it should really be the restarted node
-	c_buckets[i].handoverGCI = gci;
-	c_buckets[i].handover = true;
-	c_buckets[i].handover_started = false;
-	c_handoverToDo = true;
-      }
+  if(min > max)
+  {
+    // Everything buffered has already been acked
+    ndbrequire(pos.m_page_pos == 0);
+    ndbrequire(pos.m_page_id == bucket->m_buffer_tail);
+    m_active_buckets.set(buck);
+    ndbout_c("empty bucket -> active");
+    return;
+  }
+  
+  bucket->m_switchover_gci = max + 1;
+  bucket->m_state |= (Bucket::BUCKET_TAKEOVER | Bucket::BUCKET_RESEND);
+
+
+  m_switchover_buckets.set(buck);
+  
+  // theData[0] selects the CONTINUEB action; without it the receiver
+  // would dispatch on whatever the signal buffer happened to contain.
+  signal->theData[0] = SumaContinueB::RESEND_BUCKET;
+  signal->theData[1] = buck;
+  signal->theData[2] = min;
+  signal->theData[3] = 0;
+  signal->theData[4] = 0;
+  sendSignal(reference(), GSN_CONTINUEB, signal, 5, JBB);	
+  
+  ndbout_c("min: %d - max: %d)", min, max);
+  ndbrequire(max >= min);
+}
+
+/**
+ * One step of walking a bucket's buffered pages during resend; driven by
+ * CONTINUEB (SumaContinueB::RESEND_BUCKET) signals that this function
+ * re-sends to SUMA_REF until the page list is exhausted.
+ *
+ * Per call it looks at the tail page only:
+ *  - if the tail page is the page currently being written, it retries
+ *    later using a delayed signal;
+ *  - if the whole page is older than min_gci, the page is freed;
+ *  - otherwise records are scanned from `pos`, skipping those whose gci is
+ *    below min_gci and stopping after the first one that is not.
+ * In the code visible here no record data is sent anywhere; the loop only
+ * advances the position and pages are freed once fully scanned.
+ *
+ * @param signal    signal object used for the next CONTINUEB
+ * @param buck      index into c_buckets
+ * @param min_gci   lowest gci of interest
+ * @param pos       word offset into the tail page's m_data to resume from
+ * @param last_gci  gci of the last record seen with an explicit gci word
+ */
+void
+Suma::resend_bucket(Signal* signal, Uint32 buck, Uint32 min_gci, 
+		    Uint32 pos, Uint32 last_gci)
+{
+  Bucket* bucket= c_buckets+buck;
+  Uint32 tail= bucket->m_buffer_tail;
+
+  // Check before the page header is read through `tail` below
+  ndbrequire(tail != RNIL);
+
+  Buffer_page* page= (Buffer_page*)(m_tup->page+tail);
+  Uint32 max_gci = page->m_max_gci;
+  Uint32 next_page = page->m_next_page;
+  Uint32 *ptr = page->m_data + pos;
+  Uint32 *end = page->m_data + page->m_words_used;
+  bool delay = false;
+
+  if(tail == bucket->m_buffer_head.m_page_id)
+  {
+    /**
+     * Don't dare to resend page while writing to it...
+     */
+    delay = true;
+    goto next;
+  }
+  
+  if(pos == 0 && min_gci > max_gci)
+  {
+    // Nothing on this page is new enough: drop the whole page
+    free_page(tail, page);
+    tail = bucket->m_buffer_tail = next_page;
+    goto next;
+  }
+  
+#if 0
+  for(Uint32 i = 0; i<page->m_words_used; i++)
+  {
+    printf("%.8x ", page->m_data[i]);
+    if(((i + 1) % 8) == 0)
+      printf("\n");
+  }
+  printf("\n");
+#endif
+
+  // The first record on a page must carry an explicit gci word
+  if(pos == 0)
+    ndbrequire(((* ptr) >> 16) == 0);
+
+  while(ptr < end)
+  {
+    Uint32 *src = ptr;
+    Uint32 tmp = * src++;
+    Uint32 sz = tmp & 0xFFFF;
+
+    // High bit clear: the header is followed by the record's gci
+    if(! (tmp & (0x8000 << 16)))
+    {
+      last_gci = * src ++;
+    }
+
+    ptr += sz;
+    if(last_gci < min_gci)
+    {
+      continue;
     }
+    
+    break;
   }
-  DBUG_VOID_RETURN;
+  
+  if(ptr == end)
+  {
+    /**
+     * release...
+     */
+    free_page(tail, page);
+    tail = bucket->m_buffer_tail = next_page;
+    pos = 0;
+    last_gci = 0;
+  }
+  else
+  {
+    pos = (ptr - page->m_data);
+  }
+  
+next:
+  if(tail == RNIL)
+  {
+    bucket->m_state &= ~(Uint32)Bucket::BUCKET_RESEND;
+    ndbassert(! (bucket->m_state & Bucket::BUCKET_TAKEOVER));
+    ndbout_c("resend done...");
+    return;
+  }
+  
+  signal->theData[0] = SumaContinueB::RESEND_BUCKET;
+  signal->theData[1] = buck;
+  signal->theData[2] = min_gci;
+  signal->theData[3] = pos;
+  signal->theData[4] = last_gci;
+  if(!delay)
+    sendSignal(SUMA_REF, GSN_CONTINUEB, signal, 5, JBB);
+  else
+    sendSignalWithDelay(SUMA_REF, GSN_CONTINUEB, signal, 50, 5);   
 }
 
 template void append(DataBuffer<11>&,SegmentedSectionPtr,SectionSegmentPool&);

--- 1.14/storage/ndb/src/kernel/blocks/suma/Suma.hpp	Fri May 13 12:58:12 2005
+++ 1.15/storage/ndb/src/kernel/blocks/suma/Suma.hpp	Wed May 18 14:10:24 2005
@@ -24,6 +24,7 @@
 
 #include <SLList.hpp>
 #include <DLList.hpp>
+#include <DLFifoList.hpp>
 #include <KeyTable.hpp>
 #include <DataBuffer.hpp>
 #include <SignalCounter.hpp>
@@ -138,8 +139,6 @@
     Uint32 m_senderRef;
     Uint32 m_senderData;
     Uint32 m_subPtrI; //reference to subscription
-    Uint32 m_firstGCI; // first GCI to send
-    Uint32 m_lastGCI; // last acnowledged GCI
     Uint32 nextList;
 
     union { Uint32 nextPool; Uint32 prevList; };
@@ -320,21 +319,7 @@
     Uint32 m_table_ptrI;
   };
   typedef Ptr<Subscription> SubscriptionPtr;
-  
-  struct Bucket {
-    bool active;
-    bool handover;
-    bool handover_started;
-    Uint32 m_nodes[MAX_REPLICAS];
-    Uint32 handoverGCI;
-  };
-  STATIC_CONST( NO_OF_BUCKETS = 24 ); // 24 = 4! 
-  struct Bucket c_buckets[NO_OF_BUCKETS];
-  Uint32 c_no_of_buckets;
-  
-  bool c_handoverToDo;
-  Uint32 c_lastCompleteGCI;
-
+    
   /**
    * 
    */
@@ -356,19 +341,6 @@
   ArrayPool<SyncRecord> c_syncPool;
   DataBuffer<15>::DataBufferPool c_dataBufferPool;
 
-  /**
-   * for restarting Suma not to start sending data too early
-   */
-  Uint32 c_restart_server_node_id;
-
-  /**
-   * for flagging that a GCI containg inconsistent data
-   * typically due to node failiure
-   */
-
-  Uint32 c_lastInconsistentGCI;
-  Uint32 c_nodeFailGCI;
-
   NodeBitmask c_failedApiNodes;
   
   /**
@@ -397,30 +369,6 @@
   void completeSubRemove(SubscriptionPtr subPtr);
 
   Uint32 getFirstGCI(Signal* signal);
-  Uint32 decideWhoToSend(Uint32 nBucket, Uint32 gci);
-
-  struct FailoverBuffer {
-    //    FailoverBuffer(DataBuffer<15>::DataBufferPool & p);
-    FailoverBuffer();
-
-    bool subTableData(Uint32 gci, Uint32 *src, int sz);
-    bool subGcpCompleteRep(Uint32 gci);
-    bool nodeFailRep();
-
-    //    typedef DataBuffer<15> GCIDataBuffer;
-    //    GCIDataBuffer                      m_GCIDataBuffer;
-    //    GCIDataBuffer::DataBufferIterator  m_GCIDataBuffer_it;
-
-    Uint32 *c_gcis;
-    int c_sz;
-
-    //    Uint32 *c_buf;
-    //    int c_buf_sz;
-
-    int c_first;
-    int c_next;
-    bool c_full;
-  } c_failoverBuffer;
 
   /**
    * Table admin
@@ -444,7 +392,6 @@
    */
 
   void getNodeGroupMembers(Signal* signal);
-  void send_start_me_req(Signal* signal);
 
   void execSTTOR(Signal* signal);
   void sendSTTORRY(Signal*);
@@ -453,8 +400,8 @@
   void execREAD_NODESCONF(Signal* signal);
   void execNODE_FAILREP(Signal* signal);
   void execINCL_NODEREQ(Signal* signal);
-  void execNODE_START_REP(Signal* signal);
   void execSIGNAL_DROPPED_REP(Signal* signal);
+  void execAPI_START_REP(Signal* signal);
   void execAPI_FAILREQ(Signal* signal) ;
 
   void execSUB_GCP_COMPLETE_ACK(Signal* signal);
@@ -492,9 +439,6 @@
   void execUTIL_SEQUENCE_REF(Signal* signal);
   void execCREATE_SUBID_REQ(Signal* signal);
   
-  Uint32 getStoreBucket(Uint32 v);
-  Uint32 getResponsibleSumaNodeId(Uint32 D);
-
   /**
    * for Suma that is restarting another
    */
@@ -538,16 +482,123 @@
    */
   NodeId c_masterNodeId;
   NdbNodeBitmask c_alive_nodes;
-  NdbNodeBitmask c_started_nodes;
-  NdbNodeBitmask c_starting_nodes;
   
   /**
+   * for restarting Suma not to start sending data too early
+   */
+  struct Startup
+  {
+    bool m_wait_handover;            // presumably set while waiting for handover - TODO confirm
+    Uint32 m_restart_server_node_id; // presumably the node helping this one restart - TODO confirm
+  } c_startup;
+  
+  NodeBitmask c_connected_nodes;  // (NODE/API) START REP / (API/NODE) FAIL REQ
+  NodeBitmask c_subscriber_nodes; // 
+
+  /**
    * for all Suma's to keep track of other Suma's in Node group
    */
   Uint32 c_nodeGroup;
   Uint32 c_noNodesInGroup;
   Uint32 c_nodesInGroup[MAX_REPLICAS];
   NdbNodeBitmask c_nodes_in_nodegroup_mask;  // NodeId's of nodes in nodegroup
+
+  void send_start_me_req(Signal* signal);
+  void check_start_handover(Signal* signal);
+  void send_handover_req(Signal* signal);
+
+  Uint32 get_responsible_node(Uint32 B) const;
+  Uint32 get_responsible_node(Uint32 B, const NdbNodeBitmask& mask) const;
+  bool check_switchover(Uint32 bucket, Uint32 gci);
+
+public:  
+  // Write position of a bucket's buffer: which page is being filled and
+  // how far, plus gci bookkeeping for the records written so far.
+  struct Page_pos
+  {
+    Uint32 m_page_id;   // page being written, RNIL if none
+    Uint32 m_page_pos;  // next free word in the page's m_data
+    Uint32 m_max_gci;   // max gci on page
+    Uint32 m_last_gci;  // last gci on page
+  };
+private:
+  
+  // Per-bucket state: responsible nodes, switchover progress and the
+  // list of buffer pages (tail = oldest page, head = write position).
+  struct Bucket 
+  {
+    // Bit flags stored in m_state
+    enum {
+      BUCKET_STARTING = 0x1  // On starting node
+      ,BUCKET_HANDOVER = 0x2 // On running node
+      ,BUCKET_TAKEOVER = 0x4 // On taking over node
+      ,BUCKET_RESEND   = 0x8 // On taking over node
+    };
+    Uint32 m_state;
+    Uint16 m_nodes[MAX_REPLICAS]; 
+    Uint32 m_switchover_gci; // gci at which the switchover takes effect
+    Uint32 m_max_acked_gci;  // highest gci passed to release_gci
+    Uint32 m_buffer_tail;   // Page
+    Page_pos m_buffer_head;
+  };
+  
+  // Layout of one buffer page: 5 header words followed by record data,
+  // 8192 words in total.
+  struct Buffer_page 
+  {
+    STATIC_CONST( DATA_WORDS = 8192 - 5);
+    Uint32 m_page_state;     // Used by TUP buddy algorithm
+    Uint32 m_page_chunk_ptr_i; // i-value of the owning Page_chunk
+    Uint32 m_next_page;      // next page in the bucket or free list, RNIL at end
+    Uint32 m_words_used;     // words of m_data in use, set when the page is closed
+    Uint32 m_max_gci;        // max gci on the page, set when the page is closed
+    Uint32 m_data[DATA_WORDS];
+  };
+  
+  STATIC_CONST( NO_OF_BUCKETS = 24 ); // 24 = 4*3*2*1! 
+  Uint32 c_no_of_buckets;
+  struct Bucket c_buckets[NO_OF_BUCKETS];
+  
+  STATIC_CONST( BUCKET_MASK_SIZE = (((NO_OF_BUCKETS+31)>> 5)) );
+  typedef Bitmask<BUCKET_MASK_SIZE> Bucket_mask;
+  Bucket_mask m_active_buckets;
+  Bucket_mask m_switchover_buckets;  
+  
+  class Dbtup* m_tup;
+  void init_buffers();
+  Uint32* get_buffer_ptr(Uint32 buck, Uint32 gci, Uint32 sz);
+  Uint32 seize_page();
+  void free_page(Uint32 page_id, Buffer_page* page);
+
+  void start_resend(Signal*, Uint32 bucket);
+  void resend_bucket(Signal*, Uint32 bucket, Uint32 gci, 
+		     Uint32 page_pos, Uint32 last_gci);
+  void release_gci(Signal*, Uint32 bucket, Uint32 gci);
+
+  Uint32 m_max_seen_gci;      // FIRE_TRIG_ORD
+  Uint32 m_max_sent_gci;      // FIRE_TRIG_ORD -> send
+  Uint32 m_last_complete_gci; // SUB_GCP_COMPLETE_REP
+
+  // One completed gci that is waiting for acks from subscriber nodes
+  struct Gcp_record 
+  {
+    Uint32 m_gci;
+    NodeBitmask m_subscribers; // nodes that have not acked m_gci yet
+    union {
+      Uint32 nextPool;
+      Uint32 nextList;
+    };
+    Uint32 prevList;
+  };
+  ArrayPool<Gcp_record> c_gcp_pool;
+  DLFifoList<Gcp_record> c_gcp_list;
+
+  // Bookkeeping for one chunk of buffer pages allocated in seize_page
+  struct Page_chunk
+  {
+    Uint32 m_page_id; // id of the first page in the chunk
+    Uint32 m_size;    // number of pages in the chunk
+    Uint32 m_free;    // pages of the chunk currently on the free list
+    union {
+      Uint32 nextPool;
+      Uint32 nextList;
+    };
+    Uint32 prevList;
+  };
+  Uint32 m_first_free_page;
+  ArrayPool<Page_chunk> c_page_chunk_pool;
 };
 
 #endif

--- 1.14/storage/ndb/src/kernel/blocks/suma/SumaInit.cpp	Fri May 13 12:58:12 2005
+++ 1.15/storage/ndb/src/kernel/blocks/suma/SumaInit.cpp	Wed May 18 14:10:24 2005
@@ -25,23 +25,24 @@
   c_removeDataSubscribers(c_subscriberPool),
   c_tables(c_tablePool),
   c_subscriptions(c_subscriptionPool),
-  Restart(*this)
+  Restart(*this),
+  c_gcp_list(c_gcp_pool)
 {
   c_masterNodeId = getOwnNodeId();
 
-  c_nodeGroup = c_noNodesInGroup = 0;
+  c_no_of_buckets = c_nodeGroup = c_noNodesInGroup = 0;
   for (int i = 0; i < MAX_REPLICAS; i++) {
     c_nodesInGroup[i] = 0;
   }
-
+  
   // Add received signals
   addRecSignal(GSN_STTOR, &Suma::execSTTOR);
   addRecSignal(GSN_NDB_STTOR, &Suma::execNDB_STTOR);
   addRecSignal(GSN_DUMP_STATE_ORD, &Suma::execDUMP_STATE_ORD);
   addRecSignal(GSN_READ_NODESCONF, &Suma::execREAD_NODESCONF);
+  addRecSignal(GSN_API_START_REP, &Suma::execAPI_START_REP, true);
   addRecSignal(GSN_API_FAILREQ,  &Suma::execAPI_FAILREQ);
   addRecSignal(GSN_NODE_FAILREP, &Suma::execNODE_FAILREP);
-  addRecSignal(GSN_NODE_START_REP, &Suma::execNODE_START_REP);
   addRecSignal(GSN_INCL_NODEREQ, &Suma::execINCL_NODEREQ);
   addRecSignal(GSN_CONTINUEB, &Suma::execCONTINUEB);
   addRecSignal(GSN_SIGNAL_DROPPED_REP, &Suma::execSIGNAL_DROPPED_REP, true);
@@ -148,6 +149,10 @@
   c_subscriptionPool.setSize(noTables);
   c_syncPool.setSize(2);
   c_dataBufferPool.setSize(noAttrs);
+  c_gcp_pool.setSize(5);
+
+  m_first_free_page= RNIL;
+  c_page_chunk_pool.setSize(5);
   
   {
     SLList<SyncRecord> tmp(c_syncPool);
@@ -157,20 +162,21 @@
     tmp.release();
   }
 
-  for( int i = 0; i < NO_OF_BUCKETS; i++) {
-    c_buckets[i].active = false;
-    c_buckets[i].handover = false;
-    c_buckets[i].handover_started = false;
-    c_buckets[i].handoverGCI = 0;
-    memset(c_buckets[i].m_nodes, 0, sizeof(c_buckets[i].m_nodes));
+  memset(c_buckets, 0, sizeof(c_buckets));
+  for(Uint32 i = 0; i<NO_OF_BUCKETS; i++)
+  {
+    Bucket* bucket= c_buckets+i;
+    bucket->m_buffer_tail = RNIL;
+    bucket->m_buffer_head.m_page_id = RNIL;
+    bucket->m_buffer_head.m_page_pos = Buffer_page::DATA_WORDS;
   }
-  c_handoverToDo = false;
-  c_lastInconsistentGCI = RNIL;
-  c_lastCompleteGCI = RNIL;
-  c_nodeFailGCI = 0;
-
-  c_restart_server_node_id = 0; // Server for my NR
+  
+  m_max_seen_gci = 0;      // FIRE_TRIG_ORD
+  m_max_sent_gci = 0;      // FIRE_TRIG_ORD -> send
+  m_last_complete_gci = 0; // SUB_GCP_COMPLETE_REP
+  
   c_failedApiNodes.clear();
+  c_startup.m_restart_server_node_id = 0; // Server for my NR
 }
 
 Suma::~Suma()

--- 1.19/storage/ndb/src/kernel/vm/SimulatedBlock.cpp	Fri May 13 12:58:12 2005
+++ 1.20/storage/ndb/src/kernel/vm/SimulatedBlock.cpp	Wed May 18 14:10:24 2005
@@ -141,6 +141,7 @@
   a[GSN_UTIL_UNLOCK_CONF] = &SimulatedBlock::execUTIL_UNLOCK_CONF;
   a[GSN_READ_CONFIG_REQ] = &SimulatedBlock::execREAD_CONFIG_REQ;
   a[GSN_NODE_START_REP] = &SimulatedBlock::execNODE_START_REP;
+  a[GSN_API_START_REP] = &SimulatedBlock::execAPI_START_REP;
 }
 
 void
@@ -908,6 +909,11 @@
 
 void
 SimulatedBlock::execNODE_START_REP(Signal* signal)
+{
+}
+
+void
+SimulatedBlock::execAPI_START_REP(Signal* signal)
 {
 }
 

--- 1.13/storage/ndb/src/kernel/vm/SimulatedBlock.hpp	Fri May 13 12:58:12 2005
+++ 1.14/storage/ndb/src/kernel/vm/SimulatedBlock.hpp	Wed May 18 14:10:24 2005
@@ -400,6 +400,7 @@
 
   void execSIGNAL_DROPPED_REP(Signal* signal);
   void execCONTINUE_FRAGMENTED(Signal* signal);
+  void execAPI_START_REP(Signal* signal);
   void execNODE_START_REP(Signal* signal);
   
   Uint32 c_fragmentIdCounter;
Thread
bk commit into 5.1 tree (joreland:1.1865)jonas.oreland18 May