List:Commits« Previous MessageNext Message »
From:jonas Date:December 14 2007 1:48pm
Subject:bk commit into 5.1 tree (jonas:1.2179)
View as plain text  
Below is the list of changes that have just been committed into a local
5.1 repository of jonas. When jonas does a push these changes will
be propagated to the main repository and, within 24 hours after the
push, to the public repository.
For information on how to access the public repository
see http://dev.mysql.com/doc/mysql/en/installing-source-tree.html

ChangeSet@stripped, 2007-12-14 13:48:05+01:00, jonas@stripped +23 -0
  SUMA V2

  cluster_change_hist.txt@stripped, 2007-12-14 13:48:03+01:00, jonas@stripped +15
-0
    update cluster change hist

  configure.in@stripped, 2007-12-14 13:48:03+01:00, jonas@stripped +3 -3
    add version handling for SUMA v2

  storage/ndb/include/kernel/GlobalSignalNumbers.h@stripped, 2007-12-14 13:48:03+01:00,
jonas@stripped +10 -4
    add DictLock to drop6 (SUMA)

  storage/ndb/include/kernel/signaldata/DictLock.hpp@stripped, 2007-12-14 13:48:04+01:00,
jonas@stripped +93 -0
    New BitKeeper file ``storage/ndb/include/kernel/signaldata/DictLock.hpp''

  storage/ndb/include/kernel/signaldata/DictLock.hpp@stripped, 2007-12-14 13:48:04+01:00,
jonas@stripped +0 -0

  storage/ndb/include/kernel/signaldata/SumaImpl.hpp@stripped, 2007-12-14 13:48:03+01:00,
jonas@stripped +41 -8
    Add lots of new error codes

  storage/ndb/include/mgmapi/mgmapi_config_parameters.h@stripped, 2007-12-14 13:48:03+01:00,
jonas@stripped +4 -0
    add new config parameters

  storage/ndb/include/ndb_version.h.in@stripped, 2007-12-14 13:48:03+01:00,
jonas@stripped +2 -0
    add version handling for SUMA v2

  storage/ndb/src/kernel/blocks/dbdict/Dbdict.cpp@stripped, 2007-12-14 13:48:03+01:00,
jonas@stripped +196 -27
    add dict lock
    make substartreq "almost" atomic

  storage/ndb/src/kernel/blocks/dbdict/Dbdict.hpp@stripped, 2007-12-14 13:48:03+01:00,
jonas@stripped +15 -0
    add dict lock
    make substartreq "almost" atomic

  storage/ndb/src/kernel/blocks/qmgr/Qmgr.hpp@stripped, 2007-12-14 13:48:03+01:00,
jonas@stripped +1 -1
    let QMGR wait for API_FAILCONF from SUMA

  storage/ndb/src/kernel/blocks/qmgr/QmgrMain.cpp@stripped, 2007-12-14 13:48:04+01:00,
jonas@stripped +21 -21
    let QMGR wait for API_FAILCONF from SUMA

  storage/ndb/src/kernel/blocks/suma/Suma.cpp@stripped, 2007-12-14 13:48:04+01:00,
jonas@stripped +1630 -1769
    SUMA v2

  storage/ndb/src/kernel/blocks/suma/Suma.hpp@stripped, 2007-12-14 13:48:04+01:00,
jonas@stripped +128 -129
    SUMA v2

  storage/ndb/src/kernel/blocks/suma/SumaInit.cpp@stripped, 2007-12-14 13:48:04+01:00,
jonas@stripped +8 -12
    SUMA v2

  storage/ndb/src/kernel/vm/DLFifoList.hpp@stripped, 2007-12-14 13:48:04+01:00,
jonas@stripped +2 -0
    utility

  storage/ndb/src/kernel/vm/RequestTracker.hpp@stripped, 2007-12-14 13:48:04+01:00,
jonas@stripped +2 -1
    utility

  storage/ndb/src/kernel/vm/SafeCounter.cpp@stripped, 2007-12-14 13:48:04+01:00,
jonas@stripped +5 -0
    utility

  storage/ndb/src/kernel/vm/SafeCounter.hpp@stripped, 2007-12-14 13:48:04+01:00,
jonas@stripped +1 -0
    utility

  storage/ndb/src/kernel/vm/SimulatedBlock.cpp@stripped, 2007-12-14 13:48:04+01:00,
jonas@stripped +26 -33
    easier tunable pools

  storage/ndb/src/kernel/vm/SimulatedBlock.hpp@stripped, 2007-12-14 13:48:04+01:00,
jonas@stripped +4 -1
    easier tunable pools

  storage/ndb/src/mgmsrv/ConfigInfo.cpp@stripped, 2007-12-14 13:48:04+01:00,
jonas@stripped +36 -0
    Add new config parameters

  storage/ndb/src/ndbapi/NdbDictionaryImpl.cpp@stripped, 2007-12-14 13:48:04+01:00,
jonas@stripped +47 -7
    Add sleeps before retires in dictSignal

  storage/ndb/src/ndbapi/ndberror.c@stripped, 2007-12-14 13:48:04+01:00,
jonas@stripped +7 -1
    make 701 temporary

# This is a BitKeeper patch.  What follows are the unified diffs for the
# set of deltas contained in the patch.  The rest of the patch, the part
# that BitKeeper cares about, is below these diffs.
# User:	jonas
# Host:	perch.ndb.mysql.com
# Root:	/home/jonas/src/drop6

--- 1.343/configure.in	2007-12-14 13:48:09 +01:00
+++ 1.344/configure.in	2007-12-14 13:48:09 +01:00
@@ -7,7 +7,7 @@
 AC_CANONICAL_SYSTEM
 # The Docs Makefile.am parses this line!
 # Don't forget to also update the NDB lines below.
-AM_INIT_AUTOMAKE(mysql, 5.2.1-a_drop6p12)
+AM_INIT_AUTOMAKE(mysql, 5.2.2-a_drop6p13)
 AM_CONFIG_HEADER(config.h)
 
 PROTOCOL_VERSION=10
@@ -18,8 +18,8 @@
 # ndb version
 NDB_VERSION_MAJOR=5
 NDB_VERSION_MINOR=2
-NDB_VERSION_BUILD=1
-NDB_VERSION_STATUS="a_drop6p12"
+NDB_VERSION_BUILD=2
+NDB_VERSION_STATUS="a_drop6p13"
 
 # Set all version vars based on $VERSION. How do we do this more elegant ?
 # Remember that regexps needs to quote [ and ] since this is run through m4

--- 1.32/cluster_change_hist.txt	2007-12-14 13:48:09 +01:00
+++ 1.33/cluster_change_hist.txt	2007-12-14 13:48:09 +01:00
@@ -13,6 +13,21 @@
 
 -------------------------------
 
+Alcatel Drop 6p13 (Fri 14 December 2007)
+
+     *Bug#26457 - Additional fix for 
+      "Incorrect handling of LCP take-over during multi-master-node-failure"
+
+  Suma V2
+    - make all operation(s) wrt subscription(s) o(1)
+    - fix various race conditions
+
+    - new config variable "MaxNoOfSubscriptions" (default = MaxNoOfTables)
+    - new config variable "MaxNoOfSubscribers" (default = 2 * MaxNoOfSubscriptions)
+    - new config variable "MaxNoOfConcurrentSubOperations" (default = 256)
+
+-------------------------------
+
 Alcatel Drop 6p12 (Wed 14 November 2007)
 
      *Bug#32359 Load/slow network can lead to "trashing" 
--- New file ---
+++ storage/ndb/include/kernel/signaldata/DictLock.hpp	07/12/14 13:48:04
/* Copyright (C) 2003 MySQL AB

   This program is free software; you can redistribute it and/or modify
   it under the terms of the GNU General Public License as published by
   the Free Software Foundation; version 2 of the License.

   This program is distributed in the hope that it will be useful,
   but WITHOUT ANY WARRANTY; without even the implied warranty of
   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
   GNU General Public License for more details.

   You should have received a copy of the GNU General Public License
   along with this program; if not, write to the Free Software
   Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA */

#ifndef DICT_LOCK_HPP
#define DICT_LOCK_HPP

#include "SignalData.hpp"

// see comments in Dbdict.hpp

class DictLockReq {
  friend class Dbdict;
  friend class Dbdih;
  friend class Suma;
public:
  STATIC_CONST( SignalLength = 3 );
  enum LockType {
    NoLock = 0
    ,NodeRestartLock = 1 // S-lock
    ,NodeFailureLock = 2 // S-lock
    ,CreateTableLock = 3
    ,AlterTableLock  = 4
    ,DropTableLock   = 5
    ,CreateIndexLock = 6
    ,DropIndexLock   = 7
    ,CreateFileLock  = 8
    ,CreateFilegroupLock = 9
    ,DropFileLock    = 10
    ,DropFilegroupLock = 11
    ,SumaStartMe = 12
  };
private:
  Uint32 userPtr;
  Uint32 lockType;
  Uint32 userRef;
};

class DictLockConf {
  friend class Dbdict;
  friend class Dbdih;
public:
  STATIC_CONST( SignalLength = 3 );
private:
  Uint32 userPtr;
  Uint32 lockType;
  Uint32 lockPtr;
};

class DictLockRef {
  friend class Dbdict;
  friend class Dbdih;
  friend class Suma;
public:
  STATIC_CONST( SignalLength = 3 );
  enum ErrorCode {
    NotMaster = 1,
    InvalidLockType = 2,
    BadUserRef = 3,
    TooLate = 4,
    TooManyRequests = 5
  };
private:
  Uint32 userPtr;
  Uint32 lockType;
  Uint32 errorCode;
};

class DictUnlockOrd {
  friend class Dbdict;
  friend class Dbdih;
  friend class Suma;
public:
  STATIC_CONST( SignalLength = 4 );
private:
  Uint32 lockPtr;
  Uint32 lockType;
  Uint32 senderData;
  Uint32 senderRef;
};

#endif


--- 1.27/storage/ndb/include/mgmapi/mgmapi_config_parameters.h	2007-12-14 13:48:09 +01:00
+++ 1.28/storage/ndb/include/mgmapi/mgmapi_config_parameters.h	2007-12-14 13:48:09 +01:00
@@ -86,6 +86,10 @@
 #define CFG_DB_MAX_OPEN_FILES         159
 #define CFG_DB_INITIAL_OPEN_FILES     162
 
+#define CFG_DB_SUBSCRIPTIONS          174
+#define CFG_DB_SUBSCRIBERS            175
+#define CFG_DB_SUB_OPERATIONS         176
+
 #define CFG_DB_DATA_MEM_2             199
 
 #define CFG_NODE_ARBIT_RANK           200

--- 1.11/storage/ndb/include/ndb_version.h.in	2007-12-14 13:48:09 +01:00
+++ 1.12/storage/ndb/include/ndb_version.h.in	2007-12-14 13:48:09 +01:00
@@ -64,5 +64,7 @@
 #define NDBD_PREPARE_COPY_FRAG_VERSION_DROP5 NDB_MAKE_VERSION(5,1,5)
 #define NDBD_PREPARE_COPY_FRAG_VERSION_DROP6 NDB_MAKE_VERSION(5,2,1)
 
+#define NDBD_SUMA_DICT_LOCK NDB_MAKE_VERSION(5,2,2)
+
 #endif
  

--- 1.18/storage/ndb/include/kernel/GlobalSignalNumbers.h	2007-12-14 13:48:09 +01:00
+++ 1.19/storage/ndb/include/kernel/GlobalSignalNumbers.h	2007-12-14 13:48:09 +01:00
@@ -511,16 +511,16 @@
 #define GSN_TEST_ORD                    407
 #define GSN_TESTSIG                     408
 #define GSN_TIME_SIGNAL                 409
-/* 410 unused  */
-/* 411 unused  */
-/* 412 unused */
+/* 410 not unused  */
+/* 411 not unused  */
+/* 412 not unused */
 #define GSN_TUP_ABORTREQ                414
 #define GSN_TUP_ADD_ATTCONF             415
 #define GSN_TUP_ADD_ATTRREF             416
 #define GSN_TUP_ADD_ATTRREQ             417
 #define GSN_TUP_ATTRINFO                418
 #define GSN_TUP_COMMITREQ               419
-/* 420 unused */
+/* 420 not unused */
 #define GSN_TUP_LCPCONF                 421
 #define GSN_TUP_LCPREF                  422
 #define GSN_TUP_LCPREQ                  423
@@ -936,5 +936,11 @@
 
 #define GSN_ACC_LOCKREQ			711
 #define GSN_READ_PSEUDO_REQ             712
+
+#define GSN_DICT_LOCK_REQ               410
+#define GSN_DICT_LOCK_CONF              411
+#define GSN_DICT_LOCK_REF               412
+#define GSN_DICT_UNLOCK_ORD             420
+
 
 #endif

--- 1.9/storage/ndb/include/kernel/signaldata/SumaImpl.hpp	2007-12-14 13:48:09 +01:00
+++ 1.10/storage/ndb/include/kernel/signaldata/SumaImpl.hpp	2007-12-14 13:48:09 +01:00
@@ -30,7 +30,6 @@
   
   friend bool printSUB_CREATE_REQ(FILE *, const Uint32 *, Uint32, Uint16);
   STATIC_CONST( SignalLength = 6 );
-  STATIC_CONST( SignalLength2 = 7 );
   
   enum SubscriptionType {
     SingleTableScan  = 1,  // 
@@ -39,7 +38,6 @@
     SelectiveTableSnapshot  = 4,  // User defines tables
     RemoveFlags  = 0xff,
     GetFlags     = 0xff << 16,
-    AddTableFlag = 0x1 << 16,
     RestartFlag  = 0x2 << 16
   };
   
@@ -49,7 +47,6 @@
   Uint32 subscriptionKey;
   Uint32 subscriptionType;
   Uint32 tableId;
-  Uint32 state;
 };
 
 struct SubCreateRef {
@@ -65,6 +62,15 @@
   Uint32 senderRef;
   Uint32 senderData;
   Uint32 errorCode;
+
+  enum ErrorCode
+  {
+    SubscriptionAlreadyExist = 1415
+    ,OutOfSubscriptionRecords = 1422
+    ,OutOfTableRecords = 1423
+    ,TableDropped = 1417
+    ,NF_FakeErrorREF = 11
+  };
 };
 
 struct SubCreateConf {
@@ -95,8 +101,7 @@
   friend struct Suma;
   
   friend bool printSUB_START_REQ(FILE *, const Uint32 *, Uint32, Uint16);
-  STATIC_CONST( SignalLength = 6 );
-  STATIC_CONST( SignalLength2 = SignalLength+1 );
+  STATIC_CONST( SignalLength = 7 );
 
   Uint32 senderRef;
   Uint32 senderData;
@@ -118,11 +123,21 @@
     Undefined = 1,
     NF_FakeErrorREF = 11,
     Busy = 701,
-    PartiallyConnected = 1421
+    PartiallyConnected = 1421,
+    NoSuchSubscription = 1407,
+    Locked = 1411,
+    Dropped = 1418,
+    Defining = 1418,
+    OutOfSubscriberRecords = 1412,
+    OutOfSubOpRecords = 1424,
+    NotMaster = 702, // For API/DICT communication
+    BusyWithNR = 1405,
+    NodeDied = 1427
   };
 
   STATIC_CONST( SignalLength = 7 );
   STATIC_CONST( SignalLength2 = SignalLength+1 );
+  STATIC_CONST( SL_MasterNode = 9 );
   
   Uint32 senderRef;
   Uint32 senderData;
@@ -134,6 +149,7 @@
   Uint32 errorCode;
   // with SignalLength2
   Uint32 subscriberRef;
+  Uint32 masterNodeId;
 };
 
 struct SubStartConf {
@@ -184,10 +200,18 @@
   enum ErrorCode {
     Undefined = 1,
     NF_FakeErrorREF = 11,
-    Busy = 701
+    Busy = 701,
+    NoSuchSubscription = 1407,
+    Locked = 1411,
+    Defining = 1425,
+    OutOfSubOpRecords = 1424,
+    NoSuchSubscriber = 1426,
+    NotMaster = 702,
+    BusyWithNR = 1405
   };
 
   STATIC_CONST( SignalLength = 8 );
+  STATIC_CONST( SL_MasterNode = 9 );
   
   Uint32 senderRef;
   Uint32 senderData;
@@ -197,6 +221,7 @@
   Uint32 subscriberData;
   Uint32 subscriberRef;
   Uint32 errorCode;
+  Uint32 masterNodeId;
 };
 
 struct SubStopConf {
@@ -412,7 +437,10 @@
   enum ErrorCode {
     Undefined = 1,
     NF_FakeErrorREF = 11,
-    Busy = 701
+    Busy = 701,
+    NoSuchSubscription = 1407,
+    Locked = 1411,
+    AlreadyDropped = 1419
   };
 
   Uint32 senderRef;
@@ -520,6 +548,11 @@
     RESEND_BUCKET = 1
     ,RELEASE_GCI = 2
     ,OUT_OF_BUFFER_RELEASE = 3
+    ,API_FAIL_GCI_LIST = 4
+    ,API_FAIL_SUBSCRIBER_LIST = 5
+    ,API_FAIL_SUBSCRIPTION = 6
+    ,SUB_STOP_REQ = 7
+    ,RETRY_DICT_LOCK = 8
   };
 };
 

--- 1.80/storage/ndb/src/mgmsrv/ConfigInfo.cpp	2007-12-14 13:48:09 +01:00
+++ 1.81/storage/ndb/src/mgmsrv/ConfigInfo.cpp	2007-12-14 13:48:09 +01:00
@@ -1228,6 +1228,42 @@
     STR_VALUE(MAX_INT_RNIL) },
 
   { 
+    CFG_DB_SUBSCRIPTIONS,
+    "MaxNoOfSubscriptions",
+    DB_TOKEN,
+    "Max no of subscriptions (default 0 == MaxNoOfTables)",
+    ConfigInfo::CI_USED,
+    false,
+    ConfigInfo::CI_INT,
+    "0",
+    "0",
+    STR_VALUE(MAX_INT_RNIL) },
+
+  {
+    CFG_DB_SUBSCRIBERS,
+    "MaxNoOfSubscribers",
+    DB_TOKEN,
+    "Max no of subscribers (default 0 == 2 * MaxNoOfTables)",
+    ConfigInfo::CI_USED,
+    false,
+    ConfigInfo::CI_INT,
+    "0",
+    "0",
+    STR_VALUE(MAX_INT_RNIL) },
+
+  {
+    CFG_DB_SUB_OPERATIONS,
+    "MaxNoOfConcurrentSubOperations",
+    DB_TOKEN,
+    "Max no of concurrent subscriber operations",
+    ConfigInfo::CI_USED,
+    false,
+    ConfigInfo::CI_INT,
+    "256",
+    "0",
+    STR_VALUE(MAX_INT_RNIL) },
+
+  {
     KEY_INTERNAL,
     "TcpBind_INADDR_ANY",
     DB_TOKEN,

--- 1.65/storage/ndb/src/kernel/blocks/dbdict/Dbdict.cpp	2007-12-14 13:48:09 +01:00
+++ 1.66/storage/ndb/src/kernel/blocks/dbdict/Dbdict.cpp	2007-12-14 13:48:09 +01:00
@@ -77,6 +77,10 @@
 #include <signaldata/CreateTab.hpp>
 #include <NdbSleep.h>
 #include <signaldata/ApiBroadcast.hpp>
+#include <signaldata/DictLock.hpp>
+
+#include <EventLogger.hpp>
+extern EventLogger g_eventLogger;
 
 #define ZNOT_FOUND 626
 #define ZALREADYEXIST 630
@@ -177,6 +181,14 @@
 	       DropTableReq::SignalLength, JBB);
   }
 #endif  
+
+  if (signal->theData[0] == 8004)
+  {
+    infoEvent("DICT: c_counterMgr size: %u free: %u",
+              c_counterMgr.getSize(),
+              c_counterMgr.getNoOfFree());
+    c_counterMgr.printNODE_FAILREP();
+  }
   
   return;
 }//Dbdict::execDUMP_STATE_ORD()
@@ -1350,6 +1362,9 @@
   addRecSignal(GSN_DROP_TAB_CONF, &Dbdict::execDROP_TAB_CONF);
 
   addRecSignal(GSN_BACKUP_FRAGMENT_REQ, &Dbdict::execBACKUP_FRAGMENT_REQ);
+
+  addRecSignal(GSN_DICT_LOCK_REQ, &Dbdict::execDICT_LOCK_REQ);
+  addRecSignal(GSN_DICT_UNLOCK_ORD, &Dbdict::execDICT_UNLOCK_ORD);
 }//Dbdict::Dbdict()
 
 Dbdict::~Dbdict() 
@@ -1358,6 +1373,18 @@
 
 BLOCK_FUNCTIONS(Dbdict)
 
+void
+Dbdict::getBlockConfig(const char * name, Uint32 * count)
+{
+  if (name != 0 && count != 0)
+  {
+    if (strcmp(name, "SafeCounter") == 0)
+    {
+      * count = 25;
+    }
+  }
+}
+
 void Dbdict::initCommonData() 
 {
 /* ---------------------------------------------------------------- */
@@ -1385,6 +1412,9 @@
   c_systemRestart = false;
   c_initialNodeRestart = false;
   c_nodeRestart = false;
+
+  c_outstanding_sub_startstop = 0;
+  c_sub_startstop_lock.clear();
 }//Dbdict::initCommonData()
 
 void Dbdict::initRecords() 
@@ -2831,9 +2861,12 @@
   }
   ndbrequire(ok);
   
+  NdbNodeBitmask tmp;
+  tmp.assign(NdbNodeBitmask::Size, theFailedNodes);
+
   for(unsigned i = 1; i < MAX_NDB_NODES; i++) {
     jam();
-    if(NdbNodeBitmask::get(theFailedNodes, i)) {
+    if(tmp.get(i)) {
       jam();
       NodeRecordPtr nodePtr;
       c_nodes.getPtr(nodePtr, i);
@@ -2850,6 +2883,7 @@
     }//if
   }//for
 
+  c_sub_startstop_lock.bitANDC(tmp);
 }//execNODE_FAILREP()
 
 
@@ -7560,7 +7594,7 @@
 {
   jamEntry();
   EVENT_TRACE;
-   ndbrequire(recvSignalUtilReq(signal, 0) == 0);
+  ndbrequire(recvSignalUtilReq(signal, 0) == 0);
 }
 
 void Dbdict::execUTIL_EXECUTE_REF(Signal *signal)
@@ -7816,12 +7850,13 @@
   }
 #endif
 
+  CreateEvntReq *req = (CreateEvntReq*)signal->getDataPtr();
+
   if (! assembleFragments(signal)) {
     jam();
     return;
   }
 
-  CreateEvntReq *req = (CreateEvntReq*)signal->getDataPtr();
   const CreateEvntReq::RequestType requestType = req->getRequestType();
   const Uint32                     requestFlag = req->getRequestFlag();
 
@@ -8062,17 +8097,17 @@
     DBUG_VOID_RETURN;
   case UtilPrepareRef::PREPARE_PAGES_SEIZE_ERROR:
     jam();
-    error = 1;
+    error = 748;
     line = __LINE__;
     DBUG_VOID_RETURN;
   case UtilPrepareRef::PREPARED_OPERATION_SEIZE_ERROR:
     jam();
-    error = 1;
+    error = 748;
     line = __LINE__;
     DBUG_VOID_RETURN;
   case UtilPrepareRef::DICT_TAB_INFO_ERROR:
     jam();
-    error = 1;
+    error = 748;
     line = __LINE__;
     DBUG_VOID_RETURN;
   case UtilPrepareRef::MISSING_PROPERTIES_SECTION:
@@ -8498,6 +8533,7 @@
   }
 
   sendSignal(rg, GSN_CREATE_EVNT_REQ, signal, CreateEvntReq::SignalLength, JBB);
+  return;
 }
 
 void
@@ -8516,7 +8552,6 @@
   OpCreateEventPtr evntRecPtr;
 
   evntRecPtr.i = ref->getUserData();
-
   ndbrequire((evntRecPtr.p = c_opCreateEvent.getPtr(evntRecPtr.i)) != NULL);
 
 #ifdef EVENT_PH2_DEBUG
@@ -8528,6 +8563,7 @@
     evntRecPtr.p->m_reqTracker.ignoreRef(c_counterMgr, refToNode(ref->senderRef));
   } else {
     jam();
+    evntRecPtr.p->m_errorCode = ref->errorCode;
     evntRecPtr.p->m_reqTracker.reportRef(c_counterMgr, refToNode(ref->senderRef));
   }
   createEvent_sendReply(signal, evntRecPtr);
@@ -8613,12 +8649,6 @@
   evntRecPtr.i = ref->senderData;
   ndbrequire((evntRecPtr.p = c_opCreateEvent.getPtr(evntRecPtr.i)) != NULL);
 
-  if (ref->errorCode == 1415) {
-    jam();
-    createEvent_sendReply(signal, evntRecPtr);
-    DBUG_VOID_RETURN;
-  }
-
   if (ref->errorCode)
   {
     evntRecPtr.p->m_errorCode = ref->errorCode;
@@ -8756,6 +8786,66 @@
  *
  *******************************************************************/
 
+void
+Dbdict::execDICT_LOCK_REQ(Signal* signal)
+{
+  jamEntry();
+  DictLockReq req = *(DictLockReq*)signal->getDataPtr();
+
+  Uint32 err = 0;
+  do {
+    if (c_masterNodeId != getOwnNodeId())
+    {
+      jam();
+      err = DictLockRef::NotMaster;
+      break;
+    }
+
+    if (req.lockType != DictLockReq::SumaStartMe)
+    {
+      jam();
+      err = DictLockRef::InvalidLockType;
+      break;
+    }
+
+    if (c_outstanding_sub_startstop)
+    {
+      jam();
+      g_eventLogger.info("refing dict lock to %u", refToNode(req.userRef));
+      err = DictLockRef::TooManyRequests;
+      break;
+    }
+
+    c_sub_startstop_lock.set(refToNode(req.userRef));
+
+    g_eventLogger.info("granting dict lock to %u", refToNode(req.userRef));
+    DictLockConf* conf = (DictLockConf*)signal->getDataPtrSend();
+    conf->userPtr = req.userPtr;
+    conf->lockType = req.lockType;
+    conf->lockPtr = 0;
+    sendSignal(req.userRef, GSN_DICT_LOCK_CONF, signal,
+               DictLockConf::SignalLength, JBB);
+    return;
+  } while(0);
+
+  DictLockRef* ref = (DictLockRef*)signal->getDataPtrSend();
+  ref->userPtr = req.userPtr;
+  ref->lockType = req.lockType;
+  ref->errorCode = err;
+  sendSignal(req.userRef, GSN_DICT_LOCK_REF, signal,
+             DictLockRef::SignalLength, JBB);
+}
+
+void
+Dbdict::execDICT_UNLOCK_ORD(Signal* signal)
+{
+  jamEntry();
+  DictUnlockOrd ord = *(DictUnlockOrd*)signal->getDataPtr();
+
+  g_eventLogger.info("clearing dict lock for %u", refToNode(ord.senderRef));
+  c_sub_startstop_lock.clear(refToNode(ord.senderRef));
+}
+
 void Dbdict::execSUB_START_REQ(Signal* signal)
 {
   jamEntry();
@@ -8779,9 +8869,10 @@
     //      ret->setErrorNode(reference());
     ref->senderRef = reference();
     ref->errorCode = errCode;
+    ref->masterNodeId = c_masterNodeId;
 
     sendSignal(origSenderRef, GSN_SUB_START_REF, signal,
-	       SubStartRef::SignalLength2, JBB);
+	       SubStartRef::SL_MasterNode, JBB);
     return;
   }
 
@@ -8790,6 +8881,11 @@
     subbPtr.p->m_senderRef = req->senderRef;
     subbPtr.p->m_senderData = req->senderData;
     subbPtr.p->m_errorCode = 0;
+    subbPtr.p->m_gsn = GSN_SUB_START_REQ;
+    subbPtr.p->m_subscriptionId = req->subscriptionId;
+    subbPtr.p->m_subscriptionKey = req->subscriptionKey;
+    subbPtr.p->m_subscriberRef = req->subscriberRef;
+    subbPtr.p->m_subscriberData = req->subscriberData;
   }
   
   if (refToBlock(origSenderRef) != DBDICT) {
@@ -8797,6 +8893,22 @@
      * Coordinator
      */
     jam();
+
+    if (c_masterNodeId != getOwnNodeId())
+    {
+      jam();
+      c_opSubEvent.release(subbPtr);
+      errCode = SubStartRef::NotMaster;
+      goto busy;
+    }
+
+    if (!c_sub_startstop_lock.isclear())
+    {
+      jam();
+      c_opSubEvent.release(subbPtr);
+      errCode = SubStartRef::BusyWithNR;
+      goto busy;
+    }
     
     subbPtr.p->m_senderRef = origSenderRef; // not sure if API sets correctly
     NodeReceiverGroup rg(DBDICT, c_aliveNodes);
@@ -8817,7 +8929,8 @@
     ndbout_c("DBDICT(Coordinator) sending GSN_SUB_START_REQ to DBDICT participants
subbPtr.i = (%d)", subbPtr.i);
 #endif
 
-    sendSignal(rg, GSN_SUB_START_REQ, signal, SubStartReq::SignalLength2, JBB);
+    c_outstanding_sub_startstop++;
+    sendSignal(rg, GSN_SUB_START_REQ, signal, SubStartReq::SignalLength, JBB);
     return;
   }
   /*
@@ -8834,7 +8947,7 @@
 #ifdef EVENT_PH3_DEBUG
     ndbout_c("DBDICT(Participant) sending GSN_SUB_START_REQ to SUMA subbPtr.i = (%d)",
subbPtr.i);
 #endif
-    sendSignal(SUMA_REF, GSN_SUB_START_REQ, signal, SubStartReq::SignalLength2, JBB);
+    sendSignal(SUMA_REF, GSN_SUB_START_REQ, signal, SubStartReq::SignalLength, JBB);
   }
 }
 
@@ -8947,25 +9060,35 @@
     return;
   }
 
-  if (subbPtr.p->m_reqTracker.hasRef()) {
+  if (subbPtr.p->m_reqTracker.hasRef())
+  {
     jam();
 #ifdef EVENT_DEBUG
     ndbout_c("SUB_START_REF");
 #endif
-    SubStartRef* ref = (SubStartRef*)signal->getDataPtrSend();
-    ref->errorCode = subbPtr.p->m_errorCode;
-    sendSignal(subbPtr.p->m_senderRef, GSN_SUB_START_REF,
-	       signal, SubStartRef::SignalLength, JBB);
-    if (subbPtr.p->m_reqTracker.hasConf()) {
-      //  stopStartedNodes(signal);
-    }
-    c_opSubEvent.release(subbPtr);
+
+    NodeReceiverGroup rg(DBDICT, subbPtr.p->m_reqTracker.m_confs);
+    RequestTracker & p = subbPtr.p->m_reqTracker;
+    ndbrequire(p.init<SubStopRef>(c_counterMgr, rg, GSN_SUB_STOP_REF,
+                                  subbPtr.i));
+
+    SubStopReq* req = (SubStopReq*) signal->getDataPtrSend();
+
+    req->senderRef  = reference();
+    req->senderData = subbPtr.i;
+    req->subscriptionId = subbPtr.p->m_subscriptionId;
+    req->subscriptionKey = subbPtr.p->m_subscriptionKey;
+    req->subscriberRef = subbPtr.p->m_subscriberRef;
+    req->subscriberData = subbPtr.p->m_subscriberData;
+    sendSignal(rg, GSN_SUB_STOP_REQ, signal, SubStopReq::SignalLength, JBB);
     return;
   }
 #ifdef EVENT_DEBUG
   ndbout_c("SUB_START_CONF");
 #endif
   
+  ndbrequire(c_outstanding_sub_startstop);
+  c_outstanding_sub_startstop--;
   SubStartConf* conf = (SubStartConf*)signal->getDataPtrSend();
   * conf = subbPtr.p->m_sub_start_conf;
   sendSignal(subbPtr.p->m_senderRef, GSN_SUB_START_CONF,
@@ -8997,9 +9120,10 @@
     //      ret->setErrorNode(reference());
     ref->senderRef = reference();
     ref->errorCode = errCode;
+    ref->masterNodeId = c_masterNodeId;
 
     sendSignal(origSenderRef, GSN_SUB_STOP_REF, signal,
-	       SubStopRef::SignalLength, JBB);
+	       SubStopRef::SL_MasterNode, JBB);
     return;
   }
 
@@ -9008,6 +9132,11 @@
     subbPtr.p->m_senderRef = req->senderRef;
     subbPtr.p->m_senderData = req->senderData;
     subbPtr.p->m_errorCode = 0;
+    subbPtr.p->m_gsn = GSN_SUB_STOP_REQ;
+    subbPtr.p->m_subscriptionId = req->subscriptionId;
+    subbPtr.p->m_subscriptionKey = req->subscriptionKey;
+    subbPtr.p->m_subscriberRef = req->subscriberRef;
+    subbPtr.p->m_subscriberData = req->subscriberData;
   }
   
   if (refToBlock(origSenderRef) != DBDICT) {
@@ -9015,6 +9144,23 @@
      * Coordinator
      */
     jam();
+
+    if (c_masterNodeId != getOwnNodeId())
+    {
+      jam();
+      c_opSubEvent.release(subbPtr);
+      errCode = SubStopRef::NotMaster;
+      goto busy;
+    }
+
+    if (!c_sub_startstop_lock.isclear())
+    {
+      jam();
+      c_opSubEvent.release(subbPtr);
+      errCode = SubStopRef::BusyWithNR;
+      goto busy;
+    }
+
 #ifdef EVENT_DEBUG
     ndbout_c("SUB_STOP_REQ 1");
 #endif
@@ -9033,7 +9179,8 @@
     
     req->senderRef  = reference();
     req->senderData = subbPtr.i;
-    
+
+    c_outstanding_sub_startstop++;
     sendSignal(rg, GSN_SUB_STOP_REQ, signal, SubStopReq::SignalLength, JBB);
     return;
   }
@@ -9144,6 +9291,23 @@
     return;
   }
 
+  ndbrequire(c_outstanding_sub_startstop);
+  c_outstanding_sub_startstop--;
+
+  if (subbPtr.p->m_gsn == GSN_SUB_START_REQ)
+  {
+    jam();
+    SubStartRef* ref = (SubStartRef*)signal->getDataPtrSend();
+    ref->senderRef  = reference();
+    ref->senderData = subbPtr.p->m_senderData;
+    ref->errorCode  = subbPtr.p->m_errorCode;
+
+    sendSignal(subbPtr.p->m_senderRef, GSN_SUB_START_REF,
+	       signal, SubStartRef::SignalLength, JBB);
+    c_opSubEvent.release(subbPtr);
+    return;
+  }
+
   if (subbPtr.p->m_reqTracker.hasRef()) {
     jam();
 #ifdef EVENT_DEBUG
@@ -9354,6 +9518,11 @@
     subbPtr.p->m_senderRef = req->senderRef;
     subbPtr.p->m_senderData = req->senderData;
     subbPtr.p->m_errorCode = 0;
+    subbPtr.p->m_gsn = GSN_SUB_REMOVE_REQ;
+    subbPtr.p->m_subscriptionId = req->subscriptionId;
+    subbPtr.p->m_subscriptionKey = req->subscriptionKey;
+    subbPtr.p->m_subscriberRef = RNIL;
+    subbPtr.p->m_subscriberData = RNIL;
   }
 
   SubRemoveReq* req = (SubRemoveReq*) signal->getDataPtrSend();

--- 1.24/storage/ndb/src/kernel/blocks/dbdict/Dbdict.hpp	2007-12-14 13:48:09 +01:00
+++ 1.25/storage/ndb/src/kernel/blocks/dbdict/Dbdict.hpp	2007-12-14 13:48:09 +01:00
@@ -557,6 +557,9 @@
   void execSUB_REMOVE_CONF(Signal* signal);
   void execSUB_REMOVE_REF(Signal* signal);
 
+  void execDICT_LOCK_REQ(Signal* signal);
+  void execDICT_UNLOCK_ORD(Signal* signal);
+
   // Trigger signals
   void execCREATE_TRIG_REQ(Signal* signal);
   void execCREATE_TRIG_CONF(Signal* signal);
@@ -1368,6 +1371,12 @@
     Uint32 m_senderRef;
     Uint32 m_senderData;
     Uint32 m_errorCode;
+
+    Uint32 m_gsn;
+    Uint32 m_subscriptionId;
+    Uint32 m_subscriptionKey;
+    Uint32 m_subscriberRef;
+    Uint32 m_subscriberData;
     union {
       SubStartConf m_sub_start_conf;
       SubStopConf m_sub_stop_conf;
@@ -2165,6 +2174,12 @@
   int getMetaTable(MetaData::Table& table, const char* tableName);
   int getMetaAttribute(MetaData::Attribute& attribute, const MetaData::Table&
table, Uint32 attributeId);
   int getMetaAttribute(MetaData::Attribute& attribute, const MetaData::Table&
table, const char* attributeName);
+
+  Uint32 c_outstanding_sub_startstop;
+  NdbNodeBitmask c_sub_startstop_lock;
+
+protected:
+  virtual void getBlockConfig(const char * name, Uint32*);
 };
 
 #endif

--- 1.20/storage/ndb/src/kernel/blocks/qmgr/Qmgr.hpp	2007-12-14 13:48:09 +01:00
+++ 1.21/storage/ndb/src/kernel/blocks/qmgr/Qmgr.hpp	2007-12-14 13:48:09 +01:00
@@ -92,6 +92,7 @@
     NORMAL = 0,
     WAITING_FOR_FAILCONF1 = 1,
     WAITING_FOR_FAILCONF2 = 2,
+    WAITING_FOR_FAILCONF3 = 3,
     WAITING_FOR_NDB_FAILCONF = 3
   };
 
@@ -154,7 +155,6 @@
     QmgrState sendCommitFailReqStatus;
     QmgrState sendPresToStatus;
     FailState failState;
-    BlockReference rcv[2];        // remember which failconf we have received
     BlockReference blockRef;
 
     NodeRec() { }

--- 1.41/storage/ndb/src/kernel/blocks/qmgr/QmgrMain.cpp	2007-12-14 13:48:09 +01:00
+++ 1.42/storage/ndb/src/kernel/blocks/qmgr/QmgrMain.cpp	2007-12-14 13:48:09 +01:00
@@ -264,8 +264,6 @@
     nodePtr.p->sendCommitFailReqStatus = Q_NOT_ACTIVE;
     nodePtr.p->sendPresToStatus = Q_NOT_ACTIVE;
     nodePtr.p->failState = NORMAL;
-    nodePtr.p->rcv[0] = 0;
-    nodePtr.p->rcv[1] = 0;
   }//for
 }
 
@@ -2486,30 +2484,26 @@
   failedNodePtr.i = signal->theData[0];  
   ptrCheckGuard(failedNodePtr, MAX_NODES, nodeRec);
 
-  if (failedNodePtr.p->failState == WAITING_FOR_FAILCONF1){
+  if (failedNodePtr.p->failState == WAITING_FOR_FAILCONF1)
+  {
     jam();
-
-    failedNodePtr.p->rcv[0] = signal->theData[1];
     failedNodePtr.p->failState = WAITING_FOR_FAILCONF2;
-
-  } else if (failedNodePtr.p->failState == WAITING_FOR_FAILCONF2) {
-    failedNodePtr.p->rcv[1] = signal->theData[1];
+  }
+  else if (failedNodePtr.p->failState == WAITING_FOR_FAILCONF2)
+  {
+    jam();
+    failedNodePtr.p->failState = WAITING_FOR_FAILCONF3;
+  }
+  else if (failedNodePtr.p->failState == WAITING_FOR_FAILCONF3)
+  {
+    jam();
     failedNodePtr.p->failState = NORMAL;
-
-    if (failedNodePtr.p->rcv[0] == failedNodePtr.p->rcv[1]) {
-      jam();
-      systemErrorLab(signal, __LINE__);
-    } else {
-      jam();
-      failedNodePtr.p->rcv[0] = 0;
-      failedNodePtr.p->rcv[1] = 0;
-    }//if
-  } else {
+  }
+  else
+  {
     jam();
-#ifdef VM_TRACE
     ndbout << "failedNodePtr.p->failState = "
 	   << (Uint32)(failedNodePtr.p->failState) << endl;
-#endif   
     systemErrorLab(signal, __LINE__);
   }//if
   return;
@@ -2724,7 +2718,7 @@
     signal->theData[0] = nodeId;
     signal->theData[1] = QMGR_REF;
     sendSignal(SUMA_REF, GSN_API_FAILREQ, signal, 2, JBA);
-    failedNodePtr.p->failState = NORMAL;
+    failedNodePtr.p->failState = WAITING_FOR_FAILCONF3;
   }
 
   failedNodePtr.p->phase = ZFAIL_CLOSING;
@@ -5042,6 +5036,12 @@
     c_error_insert_extra = signal->theData[1];
   }
 #endif
+
+  if (signal->theData[0] == 900 && signal->getLength() == 2)
+  {
+    ndbout_c("disconnecting %u", signal->theData[1]);
+    api_failed(signal, signal->theData[1]);
+  }
 }//Qmgr::execDUMP_STATE_ORD()
 
 void Qmgr::execSET_VAR_REQ(Signal* signal) 

--- 1.43/storage/ndb/src/kernel/blocks/suma/Suma.cpp	2007-12-14 13:48:09 +01:00
+++ 1.44/storage/ndb/src/kernel/blocks/suma/Suma.cpp	2007-12-14 13:48:09 +01:00
@@ -44,11 +44,15 @@
 #include <signaldata/AlterTab.hpp>
 #include <signaldata/SystemError.hpp>
 
+#include <signaldata/DictLock.hpp>
 #include <ndbapi/NdbDictionary.hpp>
 
 #include <DebuggerNames.hpp>
 #include <../dbtup/Dbtup.hpp>
 
+#include <EventLogger.hpp>
+extern EventLogger g_eventLogger;
+
 //#define HANDOVER_DEBUG
 //#define NODEFAIL_DEBUG
 //#define NODEFAIL_DEBUG2
@@ -88,6 +92,16 @@
 
 #define PRINT_ONLY 0
 
+#include <ndb_version.h>
+
+static
+inline
+bool
+ndbd_suma_dictlock(Uint32 x)
+{
+  return x >= NDBD_SUMA_DICT_LOCK;
+}
+
 void
 Suma::getNodeGroupMembers(Signal* signal)
 {
@@ -173,9 +187,28 @@
   c_tables.setSize(noTables);
   
   c_subscriptions.setSize(noTables);
-  c_subscriberPool.setSize(2*noTables);
+
+  Uint32 cnt = 0;
+  cnt = 0;
+  ndb_mgm_get_int_parameter(p, CFG_DB_SUBSCRIPTIONS, &cnt);
+  if (cnt == 0)
+  {
+    jam();
+    cnt = noTables;
+  }
+  c_subscriptionPool.setSize(cnt);
+
+  cnt *= 2;
+  ndb_mgm_get_int_parameter(p, CFG_DB_SUBSCRIBERS, &cnt);
+  c_subscriberPool.setSize(cnt);
+
+  cnt = 0;
+  ndb_mgm_get_int_parameter(p, CFG_DB_SUB_OPERATIONS, &cnt);
+  if (cnt)
+    c_subOpPool.setSize(cnt);
+  else
+    c_subOpPool.setSize(256);
   
-  c_subscriptionPool.setSize(noTables);
   c_syncPool.setSize(2);
   c_dataBufferPool.setSize(noAttrs);
 
@@ -263,16 +296,20 @@
       DBUG_VOID_RETURN;
     }
     
-    c_startup.m_restart_server_node_id = 0;
     getNodeGroupMembers(signal);
     if (typeOfStart == NodeState::ST_NODE_RESTART ||
 	typeOfStart == NodeState::ST_INITIAL_NODE_RESTART)
     {
       jam();
       
-      send_start_me_req(signal);
+      send_dict_lock_req(signal);
       return;
     }
+    else
+    {
+      jam();
+      c_startup.m_restart_server_node_id = 0;
+    }
   }
   
   if(startphase == 7)
@@ -345,6 +382,47 @@
 }
 
 void
+Suma::send_dict_lock_req(Signal* signal)
+{
+  if (ndbd_suma_dictlock(getNodeInfo(c_masterNodeId).m_version))
+  {
+    jam();
+    DictLockReq* req = (DictLockReq*)signal->getDataPtrSend();
+    req->lockType = DictLockReq::SumaStartMe;
+    req->userPtr = 0;
+    req->userRef = reference();
+    sendSignal(calcDictBlockRef(c_masterNodeId),
+               GSN_DICT_LOCK_REQ, signal, DictLockReq::SignalLength, JBB);
+  }
+  else
+  {
+    jam();
+    c_startup.m_restart_server_node_id = 0;
+    send_start_me_req(signal);
+  }
+}
+
+void
+Suma::execDICT_LOCK_CONF(Signal* signal)
+{
+  jamEntry();
+  c_startup.m_restart_server_node_id = 0;
+  send_start_me_req(signal);
+}
+
+void
+Suma::execDICT_LOCK_REF(Signal* signal)
+{
+  jamEntry();
+
+  DictLockRef* ref = (DictLockRef*)signal->getDataPtr();
+
+  ndbrequire(ref->errorCode == DictLockRef::TooManyRequests);
+  signal->theData[0] = SumaContinueB::RETRY_DICT_LOCK;
+  sendSignalWithDelay(reference(), GSN_CONTINUEB, signal, 300, 1);
+}
+
+void
 Suma::send_start_me_req(Signal* signal)
 {
   Uint32 nodeId= c_startup.m_restart_server_node_id;
@@ -387,6 +465,18 @@
   infoEvent("Suma: node %d has completed restoring me", 
 	    c_startup.m_restart_server_node_id);
   sendSTTORRY(signal);  
+
+  if (ndbd_suma_dictlock(getNodeInfo(c_masterNodeId).m_version))
+  {
+    jam();
+    DictUnlockOrd* ord = (DictUnlockOrd*)signal->getDataPtrSend();
+    ord->lockPtr = 0;
+    ord->lockType = DictLockReq::SumaStartMe;
+    ord->senderData = 0;
+    ord->senderRef = reference();
+    sendSignal(calcDictBlockRef(c_masterNodeId),
+               GSN_DICT_UNLOCK_ORD, signal, DictUnlockOrd::SignalLength, JBB);
+  }
   c_startup.m_restart_server_node_id= 0;
 }
 
@@ -529,6 +619,23 @@
   case SumaContinueB::OUT_OF_BUFFER_RELEASE:
     out_of_buffer_release(signal, signal->theData[1]);
     return;
+  case SumaContinueB::API_FAIL_GCI_LIST:
+    api_fail_gci_list(signal, signal->theData[1]);
+    return;
+  case SumaContinueB::API_FAIL_SUBSCRIBER_LIST:
+    api_fail_subscriber_list(signal,
+                             signal->theData[1]);
+    return;
+  case SumaContinueB::API_FAIL_SUBSCRIPTION:
+    api_fail_subscription(signal);
+    return;
+  case SumaContinueB::SUB_STOP_REQ:
+    sub_stop_req(signal);
+    return;
+  case SumaContinueB::RETRY_DICT_LOCK:
+    jam();
+    send_dict_lock_req(signal);
+    return;
   }
 }
 
@@ -543,169 +650,262 @@
   jamEntry();
   DBUG_ENTER("Suma::execAPI_FAILREQ");
   Uint32 failedApiNode = signal->theData[0];
-  //BlockReference retRef = signal->theData[1];
+  BlockReference retRef = signal->theData[1];
+
+  c_connected_nodes.clear(failedApiNode);
 
   if (c_failedApiNodes.get(failedApiNode))
   {
     jam();
-    return;
+    goto CONF;
   }
 
   if (!c_subscriber_nodes.get(failedApiNode))
   {
     jam();
-    return;
+    goto CONF;
   }
 
   c_failedApiNodes.set(failedApiNode);
-  c_connected_nodes.clear(failedApiNode);
-  bool found = removeSubscribersOnNode(signal, failedApiNode);
+  c_subscriber_nodes.clear(failedApiNode);
+  
+  check_start_handover(signal);
 
-  if(!found){
-    jam();
-    c_failedApiNodes.clear(failedApiNode);
-  }
+  signal->theData[0] = SumaContinueB::API_FAIL_GCI_LIST;
+  signal->theData[1] = failedApiNode;
+  sendSignal(SUMA_REF, GSN_CONTINUEB, signal, 2, JBB);
+  return;
+
+CONF:
+  signal->theData[0] = failedApiNode;
+  signal->theData[1] = reference();
+  sendSignal(retRef, GSN_API_FAILCONF, signal, 2, JBB);
+
+  DBUG_VOID_RETURN;
+}//execAPI_FAILREQ()
+
+void
+Suma::api_fail_gci_list(Signal* signal, Uint32 nodeId)
+{
+  jam();
 
-  SubGcpCompleteAck * const ack = (SubGcpCompleteAck*)signal->getDataPtr();
   Ptr<Gcp_record> gcp;
-  for(c_gcp_list.first(gcp); !gcp.isNull(); c_gcp_list.next(gcp))
+  if (c_gcp_list.first(gcp))
   {
     jam();
-    ack->rep.gci = gcp.p->m_gci;
-    if(gcp.p->m_subscribers.get(failedApiNode))
+    gcp.p->m_subscribers.bitAND(c_subscriber_nodes);
+
+    if (gcp.p->m_subscribers.isclear())
     {
       jam();
-      ack->rep.senderRef = numberToRef(0, failedApiNode);
-      sendSignal(SUMA_REF, GSN_SUB_GCP_COMPLETE_ACK, signal, 
-		 SubGcpCompleteAck::SignalLength, JBB);
+
+      SubGcpCompleteAck* ack = (SubGcpCompleteAck*)signal->getDataPtrSend();
+      ack->rep.gci = gcp.p->m_gci;
+      ack->rep.senderRef = reference();
+      NodeReceiverGroup rg(SUMA, c_nodes_in_nodegroup_mask);
+      sendSignal(rg, GSN_SUB_GCP_COMPLETE_ACK, signal,
+                 SubGcpCompleteAck::SignalLength, JBB);
+
+      c_gcp_list.release(gcp);
+
+      signal->theData[0] = SumaContinueB::API_FAIL_GCI_LIST;
+      signal->theData[1] = nodeId;
+      sendSignal(SUMA_REF, GSN_CONTINUEB, signal, 2, JBB);
+      return;
     }
   }
 
-  c_subscriber_nodes.clear(failedApiNode);
-  
-  check_start_handover(signal);
+  if (ERROR_INSERTED(13023))
+  {
+    CLEAR_ERROR_INSERT_VALUE;
+  }
 
-  DBUG_VOID_RETURN;
-}//execAPI_FAILREQ()
+  signal->theData[0] = SumaContinueB::API_FAIL_SUBSCRIBER_LIST;
+  signal->theData[1] = nodeId;
+  signal->theData[2] = RNIL; // SubOpPtr
+  signal->theData[3] = RNIL; // c_subscribers bucket
+  signal->theData[4] = RNIL; // subscriptionId
+  signal->theData[5] = RNIL; // SubscriptionKey
 
-bool
-Suma::removeSubscribersOnNode(Signal *signal, Uint32 nodeId)
+  Ptr<SubOpRecord> subOpPtr;
+  if (c_subOpPool.seize(subOpPtr))
+  {
+    signal->theData[2] = subOpPtr.i;
+    sendSignal(SUMA_REF, GSN_CONTINUEB, signal, 6, JBB);
+  }
+  else
+  {
+    sendSignal(SUMA_REF, GSN_CONTINUEB, signal, 3, JBB);
+  }
+
+  return;
+}
+
+void
+Suma::api_fail_subscriber_list(Signal* signal, Uint32 nodeId)
 {
-  DBUG_ENTER("Suma::removeSubscribersOnNode");
-  bool found = false;
+  jam();
 
-  KeyTable<Table>::Iterator it;
-  for(c_tables.first(it);!it.isNull();c_tables.next(it))
+  Ptr<SubOpRecord> subOpPtr;
+  subOpPtr.i = signal->theData[2];
+  if (subOpPtr.i == RNIL)
   {
-    LocalDLList<Subscriber> subbs(c_subscriberPool,it.curr.p->c_subscribers);
-    SubscriberPtr i_subbPtr;
-    for(subbs.first(i_subbPtr);!i_subbPtr.isNull();)
+    if (c_subOpPool.seize(subOpPtr))
     {
-      SubscriberPtr subbPtr = i_subbPtr;
-      subbs.next(i_subbPtr);
-      jam();
-      if (refToNode(subbPtr.p->m_senderRef) == nodeId) {
-	jam();
-	subbs.remove(subbPtr);
-	c_removeDataSubscribers.add(subbPtr);
-	found = true;
-      }
+      signal->theData[3] = RNIL;
     }
-    if (subbs.isEmpty())
+    else
     {
-      // ToDo handle this
+      jam();
+      sendSignal(SUMA_REF, GSN_CONTINUEB, signal, 3, JBB);
+      return;
     }
   }
-  if(found){
+  else
+  {
     jam();
-    sendSubStopReq(signal);
+    c_subOpPool.getPtr(subOpPtr);
   }
-  DBUG_RETURN(found);
-}
 
-void
-Suma::sendSubStopReq(Signal *signal, bool unlock){
-  static bool remove_lock = false;
-  jam();
-  DBUG_ENTER("Suma::sendSubStopReq");
+  Uint32 bucket = signal->theData[3];
+  Uint32 subscriptionId = signal->theData[4];
+  Uint32 subscriptionKey = signal->theData[5];
 
-  SubscriberPtr subbPtr;
-  c_removeDataSubscribers.first(subbPtr);
-  if (subbPtr.isNull()){
+  DLHashTable<Subscription>::Iterator iter;
+  if (bucket == RNIL)
+  {
+    jam();
+    c_subscriptions.first(iter);
+  }
+  else
+  {
     jam();
-#if 0
-    signal->theData[0] = failedApiNode;
-    signal->theData[1] = reference();
-    sendSignal(retRef, GSN_API_FAILCONF, signal, 2, JBB);
-#endif
-    c_failedApiNodes.clear();
 
-    remove_lock = false;
-    DBUG_VOID_RETURN;
+    Subscription key;
+    key.m_subscriptionId = subscriptionId;
+    key.m_subscriptionKey = subscriptionKey;
+    if (c_subscriptions.find(iter.curr, key) == false)
+    {
+      jam();
+      /**
+       * We restart from this bucket :-(
+       */
+      c_subscriptions.next(bucket, iter);
+    }
+    else
+    {
+      iter.bucket = bucket;
+    }
   }
 
-  if(remove_lock && !unlock) {
+  if (iter.curr.isNull())
+  {
     jam();
-    DBUG_VOID_RETURN;
+    signal->theData[0] = nodeId;
+    signal->theData[1] = reference();
+    sendSignal(QMGR_REF, GSN_API_FAILCONF, signal, 2, JBB);
+    c_failedApiNodes.clear(nodeId);
+    return;
   }
-  remove_lock = true;
 
-  SubscriptionPtr subPtr;
-  c_subscriptions.getPtr(subPtr, subbPtr.p->m_subPtrI);
+  subOpPtr.p->m_opType = SubOpRecord::R_API_FAIL_REQ;
+  subOpPtr.p->m_subPtrI = iter.curr.i;
+  subOpPtr.p->m_senderRef = nodeId;
+  subOpPtr.p->m_senderData = iter.bucket;
 
-  SubStopReq * const req = (SubStopReq*)signal->getDataPtrSend();
-  req->senderRef       = reference();
-  req->senderData      = subbPtr.i;
-  req->subscriberRef   = subbPtr.p->m_senderRef;
-  req->subscriberData  = subbPtr.p->m_senderData;
-  req->subscriptionId  = subPtr.p->m_subscriptionId;
-  req->subscriptionKey = subPtr.p->m_subscriptionKey;
-  req->part = SubscriptionData::TableData;
+  LocalDLFifoList<SubOpRecord> list(c_subOpPool, iter.curr.p->m_stop_req);
+  bool empty = list.isEmpty();
+  list.add(subOpPtr);
 
-  sendSignal(SUMA_REF,GSN_SUB_STOP_REQ,signal,SubStopReq::SignalLength,JBB);
-  DBUG_VOID_RETURN;
+  if (empty)
+  {
+    signal->theData[0] = SumaContinueB::API_FAIL_SUBSCRIPTION;
+    signal->theData[1] = subOpPtr.i;
+    signal->theData[2] = RNIL;
+    sendSignal(SUMA_REF, GSN_CONTINUEB, signal, 3, JBB);
+  }
 }
 
 void
-Suma::execSUB_STOP_CONF(Signal* signal){
-  jamEntry();
-  DBUG_ENTER("Suma::execSUB_STOP_CONF");
-  ndbassert(signal->getNoOfSections() == 0);
-  sendSubStopReq(signal,true);
-  DBUG_VOID_RETURN;
-}
+Suma::api_fail_subscription(Signal* signal)
+{
+  jam();
+  Ptr<SubOpRecord> subOpPtr;
+  c_subOpPool.getPtr(subOpPtr, signal->theData[1]);
 
-void
-Suma::execSUB_STOP_REF(Signal* signal){
-  jamEntry();
-  DBUG_ENTER("Suma::execSUB_STOP_REF");
-  ndbassert(signal->getNoOfSections() == 0);
+  Uint32 nodeId = subOpPtr.p->m_senderRef;
 
-  SubStopRef * const ref = (SubStopRef*)signal->getDataPtr();
+  Ptr<Subscription> subPtr;
+  c_subscriptionPool.getPtr(subPtr, subOpPtr.p->m_subPtrI);
 
-  Uint32 senderData      = ref->senderData;
-  Uint32 subscriptionId  = ref->subscriptionId;
-  Uint32 subscriptionKey = ref->subscriptionKey;
-  Uint32 part            = ref->part;
-  Uint32 subscriberData  = ref->subscriberData;
-  Uint32 subscriberRef   = ref->subscriberRef;
+  Ptr<Subscriber> ptr;
+  {
+    LocalDLList<Subscriber> list(c_subscriberPool, subPtr.p->m_subscribers);
+    if (signal->theData[2] == RNIL)
+    {
+      jam();
+      list.first(ptr);
+    }
+    else
+    {
+      jam();
+      list.getPtr(ptr, signal->theData[2]);
+    }
 
-  if(ref->errorCode != 1411){
-    ndbrequire(false);
+    for (Uint32 i = 0; i<32 && !ptr.isNull(); i++)
+    {
+      jam();
+      if (refToNode(ptr.p->m_senderRef) == nodeId)
+      {
+        jam();
+        Ptr<Subscriber> tmp = ptr;
+        list.next(ptr);
+        list.release(tmp);
+      }
+      else
+      {
+        jam();
+        list.next(ptr);
+      }
+    }
   }
 
-  SubStopReq * const req = (SubStopReq*)signal->getDataPtrSend();
-  req->senderRef       = reference();
-  req->senderData      = senderData;
-  req->subscriberRef   = subscriberRef;
-  req->subscriberData  = subscriberData;
-  req->subscriptionId  = subscriptionId;
-  req->subscriptionKey = subscriptionKey;
-  req->part = part;
+  if (!ptr.isNull())
+  {
+    jam();
+    signal->theData[0] = SumaContinueB::API_FAIL_SUBSCRIPTION;
+    signal->theData[1] = subOpPtr.i;
+    signal->theData[2] = ptr.i;
+    sendSignal(SUMA_REF, GSN_CONTINUEB, signal, 3, JBB);
+    return;
+  }
 
-  sendSignal(SUMA_REF,GSN_SUB_STOP_REQ,signal,SubStopReq::SignalLength,JBB);
+  // Start potential waiter(s)
+  check_remove_queue(signal, subPtr, subOpPtr, true, false);
+  check_release_subscription(signal, subPtr);
+
+  // Continue iterating through subscriptions
+  DLHashTable<Subscription>::Iterator iter;
+  iter.bucket = subOpPtr.p->m_senderData;
+  iter.curr = subPtr;
+
+  if (c_subscriptions.next(iter))
+  {
+    signal->theData[0] = SumaContinueB::API_FAIL_SUBSCRIBER_LIST;
+    signal->theData[1] = nodeId;
+    signal->theData[2] = subOpPtr.i;
+    signal->theData[3] = iter.bucket;
+    signal->theData[4] = iter.curr.p->m_subscriptionId; // subscriptionId
+    signal->theData[5] = iter.curr.p->m_subscriptionKey; // SubscriptionKey
+    sendSignal(SUMA_REF, GSN_CONTINUEB, signal, 6, JBB);
+    return;
+  }
 
-  DBUG_VOID_RETURN;
+  c_subOpPool.release(subOpPtr);
+  signal->theData[0] = nodeId;
+  signal->theData[1] = reference();
+  sendSignal(QMGR_REF, GSN_API_FAILCONF, signal, 2, JBB);
+  c_failedApiNodes.clear(nodeId);
 }
 
 void
@@ -717,9 +917,22 @@
   const NodeFailRep * rep = (NodeFailRep*)signal->getDataPtr();
   NdbNodeBitmask failed; failed.assign(NdbNodeBitmask::Size, rep->theNodes);
   
-  if(failed.get(Restart.nodeId))
+  if(c_restart.m_ref && failed.get(refToNode(c_restart.m_ref)))
   {
-    Restart.resetRestart(signal);
+    jam();
+
+    if (c_restart.m_waiting_on_self)
+    {
+      jam();
+      c_restart.m_abort = 1;
+    }
+    else
+    {
+      jam();
+      Ptr<Subscription> subPtr;
+      c_subscriptionPool.getPtr(subPtr, c_restart.m_subPtrI);
+      abort_start_me(signal, subPtr, false);
+    }
   }
 
   if (ERROR_INSERTED(13032))
@@ -870,16 +1083,16 @@
 	      c_dataBufferPool.getSize(),
 	      c_dataBufferPool.getNoOfFree());
 
-    infoEvent("Suma: c_metaSubscribers count: %d",
-	      count_subscribers(c_metaSubscribers));
+    infoEvent("Suma: c_subOpPool  size: %d free: %d",
+	      c_subOpPool.getSize(),
+	      c_subOpPool.getNoOfFree());
+
 #if 0
     infoEvent("Suma: c_dataSubscribers count: %d",
 	      count_subscribers(c_dataSubscribers));
     infoEvent("Suma: c_prepDataSubscribers count: %d",
 	      count_subscribers(c_prepDataSubscribers));
 #endif
-    infoEvent("Suma: c_removeDataSubscribers count: %d",
-	      count_subscribers(c_removeDataSubscribers));
   }
 
   if(tCase == 8005)
@@ -949,21 +1162,76 @@
         return;
       }
 
-      infoEvent("Table: %u ver: %u #n: %u (ref,data,subscritopn)",
+      infoEvent("Table %u ver %u",
                 it.curr.p->m_tableId,
-                it.curr.p->m_schemaVersion,
-                it.curr.p->n_subscribers);
+                it.curr.p->m_schemaVersion);
 
-      Ptr<Subscriber> ptr;
-      LocalDLList<Subscriber> list(c_subscriberPool, it.curr.p->c_subscribers);
-      for (list.first(ptr); !ptr.isNull(); list.next(ptr), i++)
+      Uint32 cnt = 0;
+      Ptr<Subscription> subPtr;
+      LocalDLList<Subscription> subList(c_subscriptionPool,
+                                        it.curr.p->m_subscriptions);
+      for(subList.first(subPtr); !subPtr.isNull(); subList.next(subPtr))
       {
-        jam();
-        infoEvent(" [ %x %u %u ]", 
-                  ptr.p->m_senderRef,
-                  ptr.p->m_senderData,
-                  ptr.p->m_subPtrI);
+        infoEvent(" Subcription %u", subPtr.i);
+        {
+          Ptr<Subscriber> ptr;
+          LocalDLList<Subscriber> list(c_subscriberPool,
+                                       subPtr.p->m_subscribers);
+          for (list.first(ptr); !ptr.isNull(); list.next(ptr), i++)
+          {
+            jam();
+            cnt++;
+            infoEvent("  Subscriber [ %x %u %u ]",
+                      ptr.p->m_senderRef,
+                      ptr.p->m_senderData,
+                      subPtr.i);
+          }
+        }
+
+        {
+          Ptr<SubOpRecord> ptr;
+          LocalDLFifoList<SubOpRecord> list(c_subOpPool,
+                                       subPtr.p->m_create_req);
+
+          for (list.first(ptr); !ptr.isNull(); list.next(ptr), i++)
+          {
+            jam();
+            infoEvent("  create [ %x %u ]",
+                      ptr.p->m_senderRef,
+                      ptr.p->m_senderData);
+          }
+        }
+
+        {
+          Ptr<SubOpRecord> ptr;
+          LocalDLFifoList<SubOpRecord> list(c_subOpPool,
+                                       subPtr.p->m_start_req);
+
+          for (list.first(ptr); !ptr.isNull(); list.next(ptr), i++)
+          {
+            jam();
+            infoEvent("  start [ %x %u ]",
+                      ptr.p->m_senderRef,
+                      ptr.p->m_senderData);
+          }
+        }
+
+        {
+          Ptr<SubOpRecord> ptr;
+          LocalDLFifoList<SubOpRecord> list(c_subOpPool,
+                                        subPtr.p->m_stop_req);
+
+          for (list.first(ptr); !ptr.isNull(); list.next(ptr), i++)
+          {
+            jam();
+            infoEvent("  stop [ %u %x %u ]",
+                      ptr.p->m_opType,
+                      ptr.p->m_senderRef,
+                      ptr.p->m_senderData);
+          }
+        }
       }
+      infoEvent("Table %u #subscribers %u", it.curr.p->m_tableId, cnt);
       c_tables.next(it);
     }
 
@@ -1104,19 +1372,6 @@
  *
  * Creation of subscriptions
  */
-
-void 
-Suma::addTableId(Uint32 tableId,
-			    SubscriptionPtr subPtr, SyncRecord *psyncRec)
-{
-  DBUG_ENTER("Suma::addTableId");
-  DBUG_PRINT("enter",("tableId: %u subPtr.i: %u", tableId, subPtr.i));
-  subPtr.p->m_tableId= tableId;
-  if(psyncRec != NULL)
-    psyncRec->m_tableList.append(&tableId, 1);
-  DBUG_VOID_RETURN;
-}
-
 void
 Suma::execSUB_CREATE_REQ(Signal* signal)
 {
@@ -1127,24 +1382,13 @@
 
   const SubCreateReq req = *(SubCreateReq*)signal->getDataPtr();    
   
-  const Uint32 subRef  = req.senderRef;
-  const Uint32 subData = req.senderData;
+  const Uint32 senderRef  = req.senderRef;
+  const Uint32 senderData = req.senderData;
   const Uint32 subId   = req.subscriptionId;
   const Uint32 subKey  = req.subscriptionKey;
   const Uint32 type    = req.subscriptionType & SubCreateReq::RemoveFlags;
   const Uint32 flags   = req.subscriptionType & SubCreateReq::GetFlags;
-  const bool addTableFlag = (flags & SubCreateReq::AddTableFlag) != 0;
-  const bool restartFlag  = (flags & SubCreateReq::RestartFlag)  != 0;
   const Uint32 tableId = req.tableId;
-  Subscription::State state = (Subscription::State) req.state;
-  if (signal->getLength() != SubCreateReq::SignalLength2)
-  {
-    /*
-      api or restarted by older version
-      if restarted by old version, do the best we can
-    */
-    state = Subscription::DEFINED;
-  }
 
   Subscription key;
   key.m_subscriptionId  = subId;
@@ -1154,80 +1398,192 @@
 		      key.m_subscriptionId, key.m_subscriptionKey));
 
   SubscriptionPtr subPtr;
+  bool found = c_subscriptions.find(subPtr, key);
 
-  if (addTableFlag) {
-    ndbrequire(restartFlag);  //TODO remove this
+  if (c_startup.m_restart_server_node_id == RNIL)
+  {
+    jam();
 
-    if(!c_subscriptions.find(subPtr, key)) {
-      jam();
-      sendSubCreateRef(signal, 1407);
-      DBUG_VOID_RETURN;
-    }
+    /**
+     * We havent started syncing yet
+     */
+    sendSubCreateRef(signal, senderRef, senderData,
+                     SubCreateRef::NF_FakeErrorREF);
+    return;
+  }
+
+  bool allowDup = true; //c_startup.m_restart_server_node_id;
+
+  if (found && !allowDup)
+  {
+    jam();
+    sendSubCreateRef(signal, senderRef, senderData,
+                     SubCreateRef::SubscriptionAlreadyExist);
+    return;
+  }
+
+  if (found == false)
+  {
     jam();
-    if (restartFlag)
+    if(!c_subscriptions.seize(subPtr))
     {
-      ndbrequire(type != SubCreateReq::SingleTableScan);
-      ndbrequire(req.tableId != subPtr.p->m_tableId);
-      ndbrequire(type != SubCreateReq::TableEvent);
-      addTableId(req.tableId, subPtr, 0);
+      jam();
+      sendSubCreateRef(signal, senderRef, senderData,
+                       SubCreateRef::OutOfSubscriptionRecords);
+      return;
     }
-  } else {
-    if (c_startup.m_restart_server_node_id && 
-        subRef != calcSumaBlockRef(c_startup.m_restart_server_node_id))
+
+    new (subPtr.p) Subscription();
+    subPtr.p->m_seq_no           = c_current_seq;
+    subPtr.p->m_subscriptionId   = subId;
+    subPtr.p->m_subscriptionKey  = subKey;
+    subPtr.p->m_subscriptionType = type;
+    subPtr.p->m_tableId          = tableId;
+    subPtr.p->m_table_ptrI       = RNIL;
+    subPtr.p->m_state            = Subscription::UNDEFINED;
+    subPtr.p->m_trigger_state    =  Subscription::T_UNDEFINED;
+    subPtr.p->m_triggers[0]      = ILLEGAL_TRIGGER_ID;
+    subPtr.p->m_triggers[1]      = ILLEGAL_TRIGGER_ID;
+    subPtr.p->m_triggers[2]      = ILLEGAL_TRIGGER_ID;
+    subPtr.p->m_errorCode        = 0;
+  }
+
+  Ptr<SubOpRecord> subOpPtr;
+  LocalDLFifoList<SubOpRecord> subOpList(c_subOpPool, subPtr.p->m_create_req);
+  if (subOpList.seize(subOpPtr) == false)
+  {
+    jam();
+    if (found == false)
     {
-      /**
-       * only allow "restart_server" Suma's to come through 
-       * for restart purposes
-       */
       jam();
-      sendSubCreateRef(signal, 1415);
-      DBUG_VOID_RETURN;
+      c_subscriptions.release(subPtr);
     }
-    // Check that id/key is unique
-    if(c_subscriptions.find(subPtr, key)) {
+    sendSubCreateRef(signal, senderRef, senderData,
+                     SubCreateRef::OutOfTableRecords);
+    return;
+  }
+
+  subOpPtr.p->m_senderRef = senderRef;
+  subOpPtr.p->m_senderData = senderData;
+
+  TablePtr tabPtr;
+  if (c_tables.find(tabPtr, tableId))
+  {
+    jam();
+  }
+  else
+  {
+    jam();
+    if (c_tablePool.seize(tabPtr) == false)
+    {
       jam();
-      sendSubCreateRef(signal, 1415);
-      DBUG_VOID_RETURN;
+      subOpList.release(subOpPtr);
+      c_subscriptions.release(subPtr);
+      sendSubCreateRef(signal, senderRef, senderData,
+                       SubCreateRef::OutOfTableRecords);
+      return;
     }
-    if(!c_subscriptions.seize(subPtr)) {
+
+    new (tabPtr.p) Table;
+    tabPtr.p->m_tableId= tableId;
+    tabPtr.p->m_ptrI= tabPtr.i;
+    tabPtr.p->m_error = 0;
+    tabPtr.p->m_schemaVersion = RNIL;
+    tabPtr.p->m_state = Table::UNDEFINED;
+    c_tables.add(tabPtr);
+  }
+
+  if (found == false)
+  {
+    jam();
+    c_subscriptions.add(subPtr);
+    LocalDLList<Subscription> list(c_subscriptionPool,
+                                   tabPtr.p->m_subscriptions);
+    list.add(subPtr);
+    subPtr.p->m_table_ptrI = tabPtr.i;
+  }
+
+  switch(tabPtr.p->m_state){
+  case Table::DEFINED:{
+    jam();
+    // Send conf
+    subOpList.release(subOpPtr);
+    subPtr.p->m_state = Subscription::DEFINED;
+    SubCreateConf * const conf = (SubCreateConf*)signal->getDataPtrSend();
+    conf->senderRef  = reference();
+    conf->senderData = senderData;
+    sendSignal(senderRef, GSN_SUB_CREATE_CONF, signal,
+               SubCreateConf::SignalLength, JBB);
+    return;
+  }
+  case Table::UNDEFINED:{
+    jam();
+    tabPtr.p->m_state = Table::DEFINING;
+    subPtr.p->m_state = Subscription::DEFINING;
+
+    if (ERROR_INSERTED(13031))
+    {
       jam();
-      sendSubCreateRef(signal, 1412);
-      DBUG_VOID_RETURN;
+      CLEAR_ERROR_INSERT_VALUE;
+      GetTabInfoRef* ref = (GetTabInfoRef*)signal->getDataPtrSend();
+      ref->tableId = tableId;
+      ref->senderData = tabPtr.i;
+      ref->errorCode = GetTabInfoRef::TableNotDefined;
+      sendSignal(reference(), GSN_GET_TABINFOREF, signal,
+                 GetTabInfoRef::SignalLength, JBB);
+      return;
     }
-    DBUG_PRINT("info",("c_subscriptionPool  size: %d free: %d",
-		       c_subscriptionPool.getSize(),
-		       c_subscriptionPool.getNoOfFree()));
+
+    GetTabInfoReq * req = (GetTabInfoReq *)signal->getDataPtrSend();
+    req->senderRef = reference();
+    req->senderData = tabPtr.i;
+    req->requestType =
+      GetTabInfoReq::RequestById | GetTabInfoReq::LongSignalConf;
+    req->tableId = tableId;
+
+    sendSignal(DBDICT_REF, GSN_GET_TABINFOREQ, signal,
+               GetTabInfoReq::SignalLength, JBB);
+    return;
+  }
+  case Table::DEFINING:
+  {
     jam();
-    subPtr.p->m_senderRef        = subRef;
-    subPtr.p->m_senderData       = subData;
-    subPtr.p->m_subscriptionId   = subId;
-    subPtr.p->m_subscriptionKey  = subKey;
-    subPtr.p->m_subscriptionType = type;
-    subPtr.p->m_tableId          = tableId;
-    subPtr.p->m_table_ptrI       = RNIL;
-    subPtr.p->m_state            = state;
-    subPtr.p->n_subscribers      = 0;
+    /**
+     * just wait for completion
+     */
+    subPtr.p->m_state = Subscription::DEFINING;
+    return;
+  }
+  case Table::DROPPED:
+  case Table::ALTERED:
+  {
+    subOpList.release(subOpPtr);
 
-    DBUG_PRINT("info",("Added: key.m_subscriptionId: %u, key.m_subscriptionKey: %u",
-		       key.m_subscriptionId, key.m_subscriptionKey));
+    {
+      LocalDLList<Subscription> list(c_subscriptionPool,
+                                     tabPtr.p->m_subscriptions);
+      list.remove(subPtr);
+    }
+    c_subscriptions.release(subPtr);
 
-    c_subscriptions.add(subPtr);
+    sendSubCreateRef(signal, senderRef, senderData,
+                     SubCreateRef::TableDropped);
+    return;
+  }
   }
 
-  SubCreateConf * const conf = (SubCreateConf*)signal->getDataPtrSend();
-  conf->senderRef  = reference();
-  conf->senderData = subPtr.p->m_senderData;
-  sendSignal(subRef, GSN_SUB_CREATE_CONF, signal, SubCreateConf::SignalLength, JBB);
-  DBUG_VOID_RETURN;
+  ndbrequire(false);
 }
 
 void
-Suma::sendSubCreateRef(Signal* signal, Uint32 errCode)
+Suma::sendSubCreateRef(Signal* signal, Uint32 retRef, Uint32 data,
+                       Uint32 errCode)
 {
   jam();
   SubCreateRef * ref = (SubCreateRef *)signal->getDataPtrSend();
   ref->errorCode  = errCode;
-  sendSignal(signal->getSendersBlockRef(), GSN_SUB_CREATE_REF, signal, 
+  ref->senderData = data;
+  sendSignal(retRef, GSN_SUB_CREATE_REF, signal,
 	     SubCreateRef::SignalLength, JBB);
   return;
 }
@@ -1242,8 +1598,7 @@
 Suma::execSUB_SYNC_REQ(Signal* signal)
 {
   jamEntry();
-  DBUG_ENTER("Suma::execSUB_SYNC_REQ");
-  ndbassert(signal->getNoOfSections() <= 1);
+
   CRASH_INSERTION(13004);
 
   SubSyncReq * const req = (SubSyncReq*)signal->getDataPtr();
@@ -1253,15 +1608,11 @@
   key.m_subscriptionId = req->subscriptionId;
   key.m_subscriptionKey = req->subscriptionKey;
 
-  DBUG_PRINT("enter",("key.m_subscriptionId: %u, key.m_subscriptionKey: %u",
-		      key.m_subscriptionId, key.m_subscriptionKey));
-
   if(!c_subscriptions.find(subPtr, key))
   {
     jam();
-    DBUG_PRINT("info",("Not found"));
     sendSubSyncRef(signal, 1407);
-    DBUG_VOID_RETURN;
+    return;
   }
 
   bool ok = false;
@@ -1272,11 +1623,9 @@
   {
     jam();
     sendSubSyncRef(signal, 1416);
-    DBUG_VOID_RETURN;
+    return;
   }
-  DBUG_PRINT("info",("c_syncPool  size: %d free: %d",
-		     c_syncPool.getSize(),
-		     c_syncPool.getNoOfFree()));
+
   new (syncPtr.p) Ptr<SyncRecord>;
   syncPtr.p->m_senderRef        = req->senderRef;
   syncPtr.p->m_senderData       = req->senderData;
@@ -1296,53 +1645,7 @@
     }
   }
 
-  TablePtr tabPtr;
-  initTable(signal,subPtr.p->m_tableId,tabPtr,syncPtr);
-  tabPtr.p->n_subscribers++;
-  DBUG_PRINT("info",("Suma::Table[%u]::n_subscribers: %u",
-		     tabPtr.p->m_tableId, tabPtr.p->n_subscribers));
-  DBUG_VOID_RETURN;
-
-  switch(part){
-  case SubscriptionData::MetaData:
-    ndbrequire(false);
-#if 0
-    ok = true;
-    jam();
-    if (subPtr.p->m_subscriptionType == SubCreateReq::DatabaseSnapshot) {
-      TableList::DataBufferIterator it;
-      syncPtr.p->m_tableList.first(it);
-      if(it.isNull()) {
-	/**
-	 * Get all tables from dict
-	 */
-	ListTablesReq * req = (ListTablesReq*)signal->getDataPtrSend();
-	req->senderRef   = reference();
-	req->senderData  = syncPtr.i;
-	req->requestData = 0;
-	/**
-	 * @todo: accomodate scan of index tables?
-	 */
-	req->setTableType(DictTabInfo::UserTable);
-
-	sendSignal(DBDICT_REF, GSN_LIST_TABLES_REQ, signal, 
-		   ListTablesReq::SignalLength, JBB);
-	break;
-      }
-    }
-
-    syncPtr.p->startMeta(signal);
-#endif
-    break;
-  case SubscriptionData::TableData: {
-    ok = true;
-    jam();
-    syncPtr.p->startScan(signal);
-    break;
-  }
-  }
-  ndbrequire(ok);
-  DBUG_VOID_RETURN;
+  syncPtr.p->startScan(signal);
 }
 
 void
@@ -1363,312 +1666,10 @@
  * Dict interface
  */
 
-#if 0
-void
-Suma::execLIST_TABLES_CONF(Signal* signal){
-  jamEntry();
-  CRASH_INSERTION(13005);
-  ListTablesConf* const conf = (ListTablesConf*)signal->getDataPtr();
-  SyncRecord* tmp = c_syncPool.getPtr(conf->senderData);
-  tmp->runLIST_TABLES_CONF(signal);
-}
-#endif
-
-
 /*************************************************************************
  *
  *
  */
-#if 0
-void
-Suma::Table::runLIST_TABLES_CONF(Signal* signal){
-  jam();
-
-  ListTablesConf * const conf = (ListTablesConf*)signal->getDataPtr();
-  const Uint32 len = signal->length() - ListTablesConf::HeaderLength;
-
-  SubscriptionPtr subPtr;
-  suma.c_subscriptions.getPtr(subPtr, m_subscriptionPtrI);
-
-  for (unsigned i = 0; i < len; i++) {
-    subPtr.p->m_maxTables++;
-    suma.addTableId(ListTablesConf::getTableId(conf->tableData[i]), subPtr, this);
-  }
-
-  //  for (unsigned i = 0; i < len; i++)
-  //    conf->tableData[i] = ListTablesConf::getTableId(conf->tableData[i]);
-  //  m_tableList.append(&conf->tableData[0], len);
-
-#if 0 
-  TableList::DataBufferIterator it;
-  int i = 0;
-  for(m_tableList.first(it);!it.isNull();m_tableList.next(it)) {
-    ndbout_c("%u listtableconf tableid %d", i++, *it.data);
-  }
-#endif
-
-  if(len == ListTablesConf::DataLength){
-    jam();
-    // we expect more LIST_TABLE_CONF
-    return;
-  }
-
-#if 0
-  subPtr.p->m_currentTable = 0;
-  subPtr.p->m_maxTables    = 0;
-
-  TableList::DataBufferIterator it;
-  for(m_tableList.first(it); !it.isNull(); m_tableList.next(it)) {
-    subPtr.p->m_maxTables++;
-    suma.addTableId(*it.data, subPtr, NULL);
-#ifdef NODEFAIL_DEBUG
-    ndbout_c(" listtableconf tableid %d",*it.data);
-#endif
-  }
-#endif
-  
-  startMeta(signal);
-}
-#endif
-
-
-int 
-Suma::initTable(Signal *signal, Uint32 tableId, TablePtr &tabPtr,
-			   SubscriberPtr subbPtr)
-{
-  DBUG_ENTER("Suma::initTable SubscriberPtr");
-  DBUG_PRINT("enter",("tableId: %d", tableId));
-
-  int r= initTable(signal,tableId,tabPtr);
-
-  {
-    LocalDLList<Subscriber> subscribers(c_subscriberPool,
-					tabPtr.p->c_subscribers);
-    subscribers.add(subbPtr);
-  }
-
-  DBUG_PRINT("info",("added subscriber: %i", subbPtr.i));
-  
-  if (r)
-  {
-    jam();
-    // we have to wait getting tab info
-    DBUG_RETURN(1);
-  }
-
-  if (tabPtr.p->setupTrigger(signal, *this))
-  {
-    jam();
-    // we have to wait for triggers to be setup
-    DBUG_RETURN(1);
-  }
-
-  int ret = completeOneSubscriber(signal, tabPtr, subbPtr);
-  if (ret == -1)
-  {
-    jam();
-    LocalDLList<Subscriber> subscribers(c_subscriberPool,
-					tabPtr.p->c_subscribers);
-    subscribers.release(subbPtr);
-  }
-  completeInitTable(signal, tabPtr);
-  DBUG_RETURN(0);
-}
-
-int 
-Suma::initTable(Signal *signal, Uint32 tableId, TablePtr &tabPtr,
-			   Ptr<SyncRecord> syncPtr)
-{
-  jam();
-  DBUG_ENTER("Suma::initTable Ptr<SyncRecord>");
-  DBUG_PRINT("enter",("tableId: %d", tableId));
-
-  int r= initTable(signal,tableId,tabPtr);
-
-  {
-    LocalDLList<SyncRecord> syncRecords(c_syncPool,tabPtr.p->c_syncRecords);
-    syncRecords.add(syncPtr);
-  }
-
-  if (r)
-  {
-    // we have to wait getting tab info
-    DBUG_RETURN(1);
-  }
-  completeInitTable(signal, tabPtr);
-  DBUG_RETURN(0);
-}
-
-int
-Suma::initTable(Signal *signal, Uint32 tableId, TablePtr &tabPtr)
-{
-  jam();
-  DBUG_ENTER("Suma::initTable");
-
-  if (!c_tables.find(tabPtr, tableId) ||
-      tabPtr.p->m_state == Table::DROPPED ||
-      tabPtr.p->m_state == Table::ALTERED)
-  {
-    // table not being prepared
-    // seize a new table, initialize and add to c_tables
-    ndbrequire(c_tablePool.seize(tabPtr));
-    DBUG_PRINT("info",("c_tablePool  size: %d free: %d",
-		       c_tablePool.getSize(),
-		       c_tablePool.getNoOfFree()));
-    new (tabPtr.p) Table;
-
-    tabPtr.p->m_tableId= tableId;
-    tabPtr.p->m_ptrI= tabPtr.i;
-    tabPtr.p->n_subscribers = 0;
-    DBUG_PRINT("info",("Suma::Table[%u,i=%u]::n_subscribers: %u",
-		       tabPtr.p->m_tableId, tabPtr.i, tabPtr.p->n_subscribers));
-
-    tabPtr.p->m_error         = 0;
-    tabPtr.p->m_schemaVersion = RNIL;
-    tabPtr.p->m_state = Table::DEFINING;
-    tabPtr.p->m_hasTriggerDefined[0] = 0;
-    tabPtr.p->m_hasTriggerDefined[1] = 0;
-    tabPtr.p->m_hasTriggerDefined[2] = 0;
-    tabPtr.p->m_triggerIds[0] = ILLEGAL_TRIGGER_ID;
-    tabPtr.p->m_triggerIds[1] = ILLEGAL_TRIGGER_ID;
-    tabPtr.p->m_triggerIds[2] = ILLEGAL_TRIGGER_ID;
-
-    c_tables.add(tabPtr);
-
-    GetTabInfoReq * req = (GetTabInfoReq *)signal->getDataPtrSend();
-    req->senderRef = reference();
-    req->senderData = tabPtr.i;
-    req->requestType = 
-      GetTabInfoReq::RequestById | GetTabInfoReq::LongSignalConf;
-    req->tableId = tableId;
-
-    DBUG_PRINT("info",("GET_TABINFOREQ id %d", req->tableId));
-
-    if (ERROR_INSERTED(13031))
-    {
-      jam();
-      CLEAR_ERROR_INSERT_VALUE;
-      GetTabInfoRef* ref = (GetTabInfoRef*)signal->getDataPtrSend();
-      ref->tableId = tableId;
-      ref->senderData = tabPtr.i;
-      ref->errorCode = GetTabInfoRef::TableNotDefined;
-      sendSignal(reference(), GSN_GET_TABINFOREF, signal, 
-		 GetTabInfoRef::SignalLength, JBB);
-      DBUG_RETURN(1);
-    }
-
-    sendSignal(DBDICT_REF, GSN_GET_TABINFOREQ, signal,
-	       GetTabInfoReq::SignalLength, JBB);
-    DBUG_RETURN(1);
-  }
-  if (tabPtr.p->m_state == Table::DEFINING)
-  {
-    DBUG_RETURN(1);
-  }
-  // ToDo should be a ref signal instead
-  ndbrequire(tabPtr.p->m_state == Table::DEFINED);
-  DBUG_RETURN(0);
-}
-
-int
-Suma::completeOneSubscriber(Signal *signal, TablePtr tabPtr, SubscriberPtr subbPtr)
-{
-  jam();
-  DBUG_ENTER("Suma::completeOneSubscriber");
-
-  if (tabPtr.p->m_error &&
-      (c_startup.m_restart_server_node_id == 0 ||
-       tabPtr.p->m_state != Table::DROPPED))
-  {
-    jam();
-    sendSubStartRef(signal,subbPtr,tabPtr.p->m_error,
-		    SubscriptionData::TableData);
-    tabPtr.p->n_subscribers--;
-    DBUG_RETURN(-1);
-  }
-  else
-  {
-    jam();
-    SubscriptionPtr subPtr;
-    c_subscriptions.getPtr(subPtr, subbPtr.p->m_subPtrI);
-    subPtr.p->m_table_ptrI= tabPtr.i;
-    sendSubStartComplete(signal,subbPtr, m_last_complete_gci + 3,
-			 SubscriptionData::TableData);
-  }
-  DBUG_RETURN(0);
-}
-
-void
-Suma::completeAllSubscribers(Signal *signal, TablePtr tabPtr)
-{
-  jam();
-  DBUG_ENTER("Suma::completeAllSubscribers");
-  // handle all subscribers
-  {
-    LocalDLList<Subscriber> subscribers(c_subscriberPool,
-					tabPtr.p->c_subscribers);
-    SubscriberPtr subbPtr;
-    for(subscribers.first(subbPtr); !subbPtr.isNull();)
-    {
-      jam();
-      Ptr<Subscriber> tmp = subbPtr;
-      subscribers.next(subbPtr);
-      int ret = completeOneSubscriber(signal, tabPtr, tmp);
-      if (ret == -1)
-      {
-	jam();
-	subscribers.release(tmp);
-      }
-    }
-  }
-  DBUG_VOID_RETURN;
-}
-
-void
-Suma::completeInitTable(Signal *signal, TablePtr tabPtr)
-{
-  jam();
-  DBUG_ENTER("Suma::completeInitTable");
-
-  // handle all syncRecords
-  while (!tabPtr.p->c_syncRecords.isEmpty())
-  {
-    Ptr<SyncRecord> syncPtr;
-    {
-      LocalDLList<SyncRecord> syncRecords(c_syncPool,
-					tabPtr.p->c_syncRecords);
-      syncRecords.first(syncPtr);
-      syncRecords.remove(syncPtr);
-    }
-    syncPtr.p->ptrI = syncPtr.i;
-    if (tabPtr.p->m_error == 0)
-    {
-      jam();
-      syncPtr.p->startScan(signal);
-    }
-    else
-    {
-      jam();
-      syncPtr.p->completeScan(signal, tabPtr.p->m_error);
-      tabPtr.p->n_subscribers--;
-    }
-  }
-  
-  if (tabPtr.p->m_error)
-  {
-    DBUG_PRINT("info",("Suma::Table[%u]::n_subscribers: %u",
-		       tabPtr.p->m_tableId, tabPtr.p->n_subscribers));
-    tabPtr.p->checkRelease(*this);
-  }
-  else
-  {
-    tabPtr.p->m_state = Table::DEFINED;
-  }
-
-  DBUG_VOID_RETURN;
-}
-
-
 void
 Suma::execGET_TABINFOREF(Signal* signal){
   jamEntry();
@@ -1699,17 +1700,43 @@
     GetTabInfoReq * req = (GetTabInfoReq *)signal->getDataPtrSend();
     req->senderRef = reference();
     req->senderData = senderData;
-    req->requestType = 
+    req->requestType =
       GetTabInfoReq::RequestById | GetTabInfoReq::LongSignalConf;
     req->tableId = tableId;
     sendSignalWithDelay(DBDICT_REF, GSN_GET_TABINFOREQ, signal,
                         30, GetTabInfoReq::SignalLength);
     return;
   }
-  tabPtr.p->m_state = Table::DROPPED;
-  tabPtr.p->m_error = errorCode;
-  completeAllSubscribers(signal, tabPtr);
-  completeInitTable(signal, tabPtr);
+
+  ndbrequire(false);
+  LocalDLList<Subscription> subList(c_subscriptionPool,
+                                    tabPtr.p->m_subscriptions);
+  Ptr<Subscription> subPtr;
+  bool empty = subList.isEmpty();
+  for(subList.first(subPtr); !subPtr.isNull();)
+  {
+    jam();
+    Ptr<SubOpRecord> ptr;
+    LocalDLFifoList<SubOpRecord> list(c_subOpPool, subPtr.p->m_create_req);
+    for (list.first(ptr); !ptr.isNull(); )
+    {
+      jam();
+      sendSubCreateRef(signal,
+                       ptr.p->m_senderRef,
+                       ptr.p->m_senderData,
+                       SubCreateRef::TableDropped);
+
+      Ptr<SubOpRecord> tmp0 = ptr;
+      list.next(ptr);
+      list.release(tmp0);
+    }
+    Ptr<Subscription> tmp1 = subPtr;
+    subList.next(subPtr);
+    subList.release(tmp1);
+  }
+
+  c_tables.release(tabPtr);
+  ndbassert(!empty);
 }
 
 void
@@ -1721,7 +1748,7 @@
   if(!assembleFragments(signal)){
     return;
   }
-  
+
   GetTabInfoConf* conf = (GetTabInfoConf*)signal->getDataPtr();
   Uint32 tableId = conf->tableId;
   TablePtr tabPtr;
@@ -1730,6 +1757,7 @@
   signal->getSection(ptr, GetTabInfoConf::DICT_TAB_INFO);
   ndbrequire(tabPtr.p->parseTable(ptr, *this));
   releaseSections(signal);
+
   /**
    * We need to gather fragment info
    */
@@ -1758,57 +1786,6 @@
   jam();
   suma.suma_ndbrequire(s == SimpleProperties::Break);
 
-#if 0
-ToDo handle this
-  if(m_schemaVersion != tableDesc.TableVersion){
-    jam();
-
-    release(* this);
-
-    // oops wrong schema version in stored tabledesc
-    // we need to find all subscriptions with old table desc
-    // and all subscribers to this
-    // hopefully none
-    c_tables.release(tabPtr);
-    DBUG_PRINT("info",("c_tablePool  size: %d free: %d",
-		       suma.c_tablePool.getSize(),
-		       suma.c_tablePool.getNoOfFree()));
-    tabPtr.setNull();
-    DLHashTable<Suma::Subscription>::Iterator i_subPtr;
-    c_subscriptions.first(i_subPtr);
-    SubscriptionPtr subPtr;
-    for(;!i_subPtr.isNull();c_subscriptions.next(i_subPtr)){
-      jam();
-      c_subscriptions.getPtr(subPtr, i_subPtr.curr.i);
-      SyncRecord* tmp = c_syncPool.getPtr(subPtr.p->m_syncPtrI);
-      if (tmp == syncPtr_p) {
-	jam();
-	continue;
-      }
-      if (subPtr.p->m_tables.get(tableId)) {
-	jam();
-	subPtr.p->m_tables.clear(tableId); // remove this old table reference
-	TableList::DataBufferIterator it;
-	for(tmp->m_tableList.first(it);!it.isNull();tmp->m_tableList.next(it)) {
-	  jam();
-	  if (*it.data == tableId){
-	    jam();
-	    Uint32 *pdata = it.data;
-	    tmp->m_tableList.next(it);
-	    for(;!it.isNull();tmp->m_tableList.next(it)) {
-	      jam();
-	      *pdata = *it.data;
-	      pdata = it.data;
-	    }
-	    *pdata = RNIL; // todo remove this last item...
-	    break;
-	  }
-	}
-      }
-    }
-  }
-#endif
-
   if(m_attributes.getSize() != 0){
     jam();
     DBUG_RETURN(true);
@@ -1949,19 +1926,40 @@
   const Uint32 nextFrag = fragNo + 1;
   if(nextFrag == tabPtr.p->m_fragCount)
   {
-    /**
-     * Complete frag info for table
-     * table is not up to date
-     */
+    jam();
+    tabPtr.p->m_state = Table::DEFINED;
 
-    if (tabPtr.p->c_subscribers.isEmpty())
+    LocalDLList<Subscription> subList(c_subscriptionPool,
+                                      tabPtr.p->m_subscriptions);
+    Ptr<Subscription> subPtr;
+    bool empty = subList.isEmpty();
+    for(subList.first(subPtr); !subPtr.isNull(); subList.next(subPtr))
     {
-      completeInitTable(signal,tabPtr);
-      DBUG_VOID_RETURN;
+      jam();
+      subPtr.p->m_state = Subscription::DEFINED;
+
+      Ptr<SubOpRecord> ptr;
+      LocalDLFifoList<SubOpRecord> list(c_subOpPool, subPtr.p->m_create_req);
+      for (list.first(ptr); !ptr.isNull();)
+      {
+        jam();
+        SubCreateConf * const conf = (SubCreateConf*)signal->getDataPtrSend();
+        conf->senderRef  = reference();
+        conf->senderData = ptr.p->m_senderData;
+        sendSignal(ptr.p->m_senderRef, GSN_SUB_CREATE_CONF, signal,
+                   SubCreateConf::SignalLength, JBB);
+
+        Ptr<SubOpRecord> tmp = ptr;
+        list.next(ptr);
+        list.release(tmp);
+      }
     }
-    tabPtr.p->setupTrigger(signal, *this);
-    DBUG_VOID_RETURN;
+
+    ndbassert(!empty);
+
+    return;
   }
+
   signal->theData[0] = RNIL;
   signal->theData[1] = tabPtr.i;
   signal->theData[2] = tableId;
@@ -1971,38 +1969,6 @@
   DBUG_VOID_RETURN;
 }
 
-#if 0
-void
-Suma::SyncRecord::completeTableInit(Signal* signal)
-{
-  jam();
-  SubscriptionPtr subPtr;
-  suma.c_subscriptions.getPtr(subPtr, m_subscriptionPtrI);
-  
-#if PRINT_ONLY
-  ndbout_c("GSN_SUB_SYNC_CONF (meta)");
-#else
- 
-  suma.releaseSections(signal);
-
-  if (m_error) {
-    SubSyncRef * const ref = (SubSyncRef*)signal->getDataPtrSend();
-    ref->senderRef = suma.reference();
-    ref->senderData = subPtr.p->m_senderData;
-    ref->errorCode = SubSyncRef::Undefined;
-    suma.sendSignal(subPtr.p->m_senderRef, GSN_SUB_SYNC_REF, signal,
-		    SubSyncRef::SignalLength, JBB);
-  } else {
-    SubSyncConf * const conf = (SubSyncConf*)signal->getDataPtrSend();
-    conf->senderRef = suma.reference();
-    conf->senderData = subPtr.p->m_senderData;
-    suma.sendSignal(subPtr.p->m_senderRef, GSN_SUB_SYNC_CONF, signal,
-		    SubSyncConf::SignalLength, JBB);
-  }
-#endif
-}
-#endif
-
 /**********************************************************
  *
  * Scan interface
@@ -2026,7 +1992,7 @@
 
 bool
 Suma::SyncRecord::getNextFragment(TablePtr * tab, 
-					     FragmentDescriptor * fd)
+                                  FragmentDescriptor * fd)
 {
   jam();
   SubscriptionPtr subPtr;
@@ -2053,11 +2019,6 @@
       }
     }
     m_currentFragment = 0;
-
-    tabPtr.p->n_subscribers--;
-    DBUG_PRINT("info",("Suma::Table[%u]::n_subscribers: %u",
-		       tabPtr.p->m_tableId, tabPtr.p->n_subscribers));
-    tabPtr.p->checkRelease(suma);
   }
   return false;
 }
@@ -2291,809 +2252,802 @@
   key.m_subscriptionId        = req->subscriptionId;
   key.m_subscriptionKey       = req->subscriptionKey;
 
-  Uint32 ref = signal->getSendersBlockRef();
-  bool delayed = (ref == reference());
-  if (delayed)
-  {
-    jam();
-    ref = signal->theData[signal->getLength() - 1];
-  }
-  if (c_startup.m_restart_server_node_id && 
-      senderRef != calcSumaBlockRef(c_startup.m_restart_server_node_id))
+  SubscriptionPtr subPtr;
+
+  if (c_startup.m_restart_server_node_id == RNIL)
   {
-    /**
-     * only allow "restart_server" Suma's to come through 
-     * for restart purposes
-     */
     jam();
 
-    if (c_startup.m_restart_server_node_id == RNIL)
-    {
-      jam();
-      sendSubStartRef(signal, ref, SubStartRef::NF_FakeErrorREF);
-      return;
-    }
-
     /**
-     * Delay it...
+     * We havent started syncing yet
      */
-    ndbout_c("delay");
-    if (delayed)
-    {
-      jam();
-      sendSignalWithDelay(SUMA_REF, GSN_SUB_START_REQ, signal, 100, 
-			  signal->getLength());   
-    }
-    else
-    {
-      jam();
-      signal->theData[signal->getLength()] = ref;
-      sendSignalWithDelay(SUMA_REF, GSN_SUB_START_REQ, signal, 100, 
-			  signal->getLength() + 1);   
-    }
-
-    DBUG_VOID_RETURN;
+    sendSubStartRef(signal,
+                    senderRef, senderData, SubStartRef::NF_FakeErrorREF);
+    return;
   }
-  
-  SubscriptionPtr subPtr;
-  if(!c_subscriptions.find(subPtr, key)){
+
+  bool found = c_subscriptions.find(subPtr, key);
+  if (!found)
+  {
     jam();
-    sendSubStartRef(signal, ref, 1407);
-    DBUG_VOID_RETURN;
+    sendSubStartRef(signal,
+                    senderRef, senderData, SubStartRef::NoSuchSubscription);
+    return;
   }
   
-  if (subPtr.p->m_state == Subscription::LOCKED) {
+  switch(subPtr.p->m_state){
+  case Subscription::DROPPED:
     jam();
-    DBUG_PRINT("info",("Locked"));
-    sendSubStartRef(signal, ref, 1411);
-    DBUG_VOID_RETURN;
-  }
-
-  if (subPtr.p->m_state == Subscription::DROPPED) {
+    sendSubStartRef(signal,
+                    senderRef, senderData, SubStartRef::Dropped);
+    return;
+  case Subscription::DEFINING:
     jam();
-    DBUG_PRINT("info",("Dropped"));
-    sendSubStartRef(signal, ref, 1418);
-    DBUG_VOID_RETURN;
+    sendSubStartRef(signal,
+                    senderRef, senderData, SubStartRef::Defining);
+    return;
+  case Subscription::DEFINED:
+    break;
   }
 
-  ndbrequire(subPtr.p->m_state == Subscription::DEFINED);
-
-  if (delayed)
+  if (subPtr.p->m_trigger_state == Subscription::T_ERROR)
   {
-    ndbout_c("handling delayed SUB_START_REQ!");
     jam();
-    /**
-     * Check if it's already there...
-     */
-    TablePtr tabPtr;
-    if (c_tables.find(tabPtr, subPtr.p->m_tableId))
-    {
-      jam();
-      SubscriberPtr tmp;
-      LocalDLList<Subscriber> list(c_subscriberPool, tabPtr.p->c_subscribers);
-      for (list.first(tmp); !tmp.isNull(); list.next(tmp))
-      {
-	if (tmp.p->m_senderRef == subscriberRef &&
-	    tmp.p->m_senderData == subscriberData)
-	{
-	  jam();
-	  /**
-	   * This is ugly but will work short term...
-	   */
-	  sendSubStartRef(signal, ref, SubStartRef::NF_FakeErrorREF);
-	  return;
-	}
-      }
-    }
+    sendSubStartRef(signal,
+                    senderRef, senderData, subPtr.p->m_errorCode);
+    return;
   }
   
   SubscriberPtr subbPtr;
-  if(!c_subscriberPool.seize(subbPtr)){
+  if(!c_subscriberPool.seize(subbPtr))
+  {
     jam();
-    sendSubStartRef(signal, ref, 1412);
-    DBUG_VOID_RETURN;
+    sendSubStartRef(signal,
+                    senderRef, senderData, SubStartRef::OutOfSubscriberRecords);
+    return;
   }
 
-  if (c_startup.m_restart_server_node_id == 0 && 
-      !c_connected_nodes.get(refToNode(subscriberRef)))
-    
+  Ptr<SubOpRecord> subOpPtr;
+  if (!c_subOpPool.seize(subOpPtr))
   {
     jam();
     c_subscriberPool.release(subbPtr);
-    sendSubStartRef(signal, ref, SubStartRef::PartiallyConnected);
+    sendSubStartRef(signal,
+                    senderRef, senderData, SubStartRef::OutOfSubOpRecords);
     return;
   }
   
-  DBUG_PRINT("info",("c_subscriberPool  size: %d free: %d",
-		     c_subscriberPool.getSize(),
-		     c_subscriberPool.getNoOfFree()));
-
-  c_subscriber_nodes.set(refToNode(subscriberRef));
-
-  // setup subscription record
-  if (subPtr.p->m_state == Subscription::DEFINED)
-    subPtr.p->m_state = Subscription::LOCKED;
-  // store these here for later use
-  subPtr.p->m_senderRef  = senderRef;
-  subPtr.p->m_senderData = senderData;
-
   // setup subscriber record
   subbPtr.p->m_senderRef  = subscriberRef;
   subbPtr.p->m_senderData = subscriberData;
-  subbPtr.p->m_subPtrI= subPtr.i;
 
-  DBUG_PRINT("info",("subscriber: %u[%u,%u] subscription: %u[%u,%u] "
-		     "tableId: %u id: %u key: %u",
-		     subbPtr.i, subbPtr.p->m_senderRef, subbPtr.p->m_senderData,
-		     subPtr.i,  subPtr.p->m_senderRef,  subPtr.p->m_senderData,
-		     subPtr.p->m_tableId,
-		     subPtr.p->m_subscriptionId,subPtr.p->m_subscriptionKey));
+  subOpPtr.p->m_opType = SubOpRecord::R_SUB_START_REQ;
+  subOpPtr.p->m_subPtrI = subPtr.i;
+  subOpPtr.p->m_senderRef = senderRef;
+  subOpPtr.p->m_senderData = senderData;
+  subOpPtr.p->m_subscriberRef = subbPtr.i;
 
-  TablePtr tabPtr;
-  switch(part){
-  case SubscriptionData::MetaData:
-    jam();
-    c_metaSubscribers.add(subbPtr);
-    sendSubStartComplete(signal, subbPtr, 0, part);
-    DBUG_VOID_RETURN;
-  case SubscriptionData::TableData: 
-    jam();
-    initTable(signal,subPtr.p->m_tableId,tabPtr,subbPtr);
-    tabPtr.p->n_subscribers++;
-    DBUG_PRINT("info",("Suma::Table[%u]::n_subscribers: %u",
-		       tabPtr.p->m_tableId, tabPtr.p->n_subscribers));
-    DBUG_VOID_RETURN;
+  {
+    LocalDLFifoList<SubOpRecord> subOpList(c_subOpPool, subPtr.p->m_start_req);
+    subOpList.add(subOpPtr);
   }
-  ndbrequire(false);
-}
 
-void
-Suma::sendSubStartComplete(Signal* signal,
-			   SubscriberPtr subbPtr, 
-			   Uint32 firstGCI,
-			   SubscriptionData::Part part)
-{
-  jam();
-  DBUG_ENTER("Suma::sendSubStartComplete");
+  /**
+   * Check triggers
+   */
+  switch(subPtr.p->m_trigger_state){
+  case Subscription::T_UNDEFINED:
+    jam();
+    /**
+     * create triggers
+     */
+    create_triggers(signal, subPtr);
+    break;
+  case Subscription::T_CREATING:
+    jam();
+    /**
+     * Triggers are already being created...wait for completion
+     */
+    return;
+  case Subscription::T_DROPPING:
+    jam();
+    /**
+     * Trigger(s) are being dropped...wait for completion
+     *   (and recreate them when done)
+     */
+    break;
+  case Subscription::T_DEFINED:{
+    jam();
 
-  SubscriptionPtr subPtr;
-  c_subscriptions.getPtr(subPtr, subbPtr.p->m_subPtrI);
-  ndbrequire(subPtr.p->m_state == Subscription::LOCKED ||
-             (subPtr.p->m_state == Subscription::DROPPED &&
-              c_startup.m_restart_server_node_id));
-  if (subPtr.p->m_state == Subscription::LOCKED)
-  {
+    report_sub_start_conf(signal, subPtr);
+    return;
+  }
+  case Subscription::T_ERROR:
     jam();
-    subPtr.p->m_state = Subscription::DEFINED;
+    ndbrequire(false); // Checked above
+    break;
   }
-  subPtr.p->n_subscribers++;
-
-  DBUG_PRINT("info",("subscriber: %u[%u,%u] subscription: %u[%u,%u] "
-		     "tableId: %u[i=%u] id: %u key: %u",
-		     subbPtr.i, subbPtr.p->m_senderRef, subbPtr.p->m_senderData,
-		     subPtr.i,  subPtr.p->m_senderRef,  subPtr.p->m_senderData,
-		     subPtr.p->m_tableId, subPtr.p->m_table_ptrI,
-		     subPtr.p->m_subscriptionId,subPtr.p->m_subscriptionKey));
-
-  SubStartConf * const conf = (SubStartConf*)signal->getDataPtrSend();
-  
-  conf->senderRef       = reference();
-  conf->senderData      = subPtr.p->m_senderData;
-  conf->subscriptionId  = subPtr.p->m_subscriptionId;
-  conf->subscriptionKey = subPtr.p->m_subscriptionKey;
-  conf->firstGCI        = firstGCI;
-  conf->part            = (Uint32) part;
-
-  DBUG_PRINT("info",("subscriber: %u id: %u key: %u", subbPtr.i,
-		     subPtr.p->m_subscriptionId,subPtr.p->m_subscriptionKey));
-  sendSignal(subPtr.p->m_senderRef, GSN_SUB_START_CONF, signal,
-	     SubStartConf::SignalLength, JBB);
-  DBUG_VOID_RETURN;
 }
 
 void
-Suma::sendSubStartRef(Signal* signal, Uint32 dstref, Uint32 errCode)
+Suma::sendSubStartRef(Signal* signal, Uint32 dstref, Uint32 data, Uint32 err)
 {
   jam();
   SubStartRef * ref = (SubStartRef *)signal->getDataPtrSend();
   ref->senderRef = reference();
-  ref->errorCode = errCode;
+  ref->senderData = data;
+  ref->errorCode = err;
   releaseSections(signal);
-  sendSignal(dstref, GSN_SUB_START_REF, signal, 
+  sendSignal(dstref, GSN_SUB_START_REF, signal,
 	     SubStartRef::SignalLength, JBB);
 }
+
 void
-Suma::sendSubStartRef(Signal* signal,
-				 SubscriberPtr subbPtr, Uint32 error,
-				 SubscriptionData::Part part)
+Suma::create_triggers(Signal* signal, SubscriptionPtr subPtr)
 {
   jam();
 
-  SubscriptionPtr subPtr;
-  c_subscriptions.getPtr(subPtr, subbPtr.p->m_subPtrI);
+  ndbrequire(subPtr.p->m_trigger_state == Subscription::T_UNDEFINED);
+  subPtr.p->m_trigger_state = Subscription::T_CREATING;
 
-  ndbrequire(subPtr.p->m_state == Subscription::LOCKED ||
-             (subPtr.p->m_state == Subscription::DROPPED &&
-              c_startup.m_restart_server_node_id));
-  if (subPtr.p->m_state == Subscription::LOCKED)
-  {
-    jam();
-    subPtr.p->m_state = Subscription::DEFINED;
-  }
+  TablePtr tabPtr;
+  c_tablePool.getPtr(tabPtr, subPtr.p->m_table_ptrI);
 
-  SubStartRef * ref= (SubStartRef *)signal->getDataPtrSend();
-  ref->senderRef        = reference();
-  ref->senderData       = subPtr.p->m_senderData;
-  ref->subscriptionId   = subPtr.p->m_subscriptionId;
-  ref->subscriptionKey  = subPtr.p->m_subscriptionKey;
-  ref->part             = (Uint32) part;
-  ref->errorCode        = error;
+  AttributeMask attrMask;
+  tabPtr.p->createAttributeMask(attrMask, *this);
 
-  sendSignal(subPtr.p->m_senderRef, GSN_SUB_START_REF, signal, 
-	     SubStartRef::SignalLength, JBB);
-}
+  subPtr.p->m_outstanding_trigger = 3;
+  for(Uint32 j = 0; j<3; j++)
+  {
+    Uint32 triggerId = (tabPtr.p->m_schemaVersion << 18) | (j << 16) |
subPtr.i;
+    ndbrequire(subPtr.p->m_triggers[j] == ILLEGAL_TRIGGER_ID);
 
-/**********************************************************
- * Suma participant interface
- *
- * Stopping and removing of subscriber
- *
- */
+    CreateTrigReq * const req = (CreateTrigReq*)signal->getDataPtrSend();
+    req->setUserRef(SUMA_REF);
+    req->setConnectionPtr(subPtr.i);
+    req->setTriggerType(TriggerType::SUBSCRIPTION_BEFORE);
+    req->setTriggerActionTime(TriggerActionTime::TA_DETACHED);
+    req->setMonitorReplicas(true);
+    req->setMonitorAllAttributes(false);
+    req->setReceiverRef(SUMA_REF);
+    req->setTriggerId(triggerId);
+    req->setTriggerEvent((TriggerEvent::Value)j);
+    req->setTableId(tabPtr.p->m_tableId);
+    req->setAttributeMask(attrMask);
+    sendSignal(DBTUP_REF, GSN_CREATE_TRIG_REQ,
+               signal, CreateTrigReq::SignalLength, JBB);
+  }
+}
 
 void
-Suma::execSUB_STOP_REQ(Signal* signal){
+Suma::execCREATE_TRIG_CONF(Signal* signal)
+{
   jamEntry();
-  ndbassert(signal->getNoOfSections() == 0);
-  DBUG_ENTER("Suma::execSUB_STOP_REQ");
-  
-  CRASH_INSERTION(13019);
 
-  SubStopReq * const req = (SubStopReq*)signal->getDataPtr();
-  Uint32 senderRef      = req->senderRef;
-  Uint32 senderData     = req->senderData;
-  Uint32 subscriberRef  = req->subscriberRef;
-  Uint32 subscriberData = req->subscriberData;
+  CreateTrigConf * conf = (CreateTrigConf*)signal->getDataPtr();
+  const Uint32 triggerId = conf->getTriggerId();
+  Uint32 type = (triggerId >> 16) & 0x3;
+  Uint32 tableId = conf->getTableId();
+
+  TablePtr tabPtr;
   SubscriptionPtr subPtr;
-  Subscription key; 
-  key.m_subscriptionId  = req->subscriptionId;
-  key.m_subscriptionKey = req->subscriptionKey;
-  Uint32 part = req->part;
-  
-  if (key.m_subscriptionKey == 0 &&
-      key.m_subscriptionId == 0 &&
-      subscriberData == 0)
-  {
-    SubStopConf* conf = (SubStopConf*)signal->getDataPtrSend();
-    
-    conf->senderRef       = reference();
-    conf->senderData      = senderData;
-    conf->subscriptionId  = key.m_subscriptionId;
-    conf->subscriptionKey = key.m_subscriptionKey;
-    conf->subscriberData  = subscriberData;
+  c_subscriptions.getPtr(subPtr, conf->getConnectionPtr());
+  c_tables.getPtr(tabPtr, subPtr.p->m_table_ptrI);
 
-    sendSignal(senderRef, GSN_SUB_STOP_CONF, signal,
-	       SubStopConf::SignalLength, JBB);
+  ndbrequire(tabPtr.p->m_tableId == conf->getTableId());
+  ndbrequire(subPtr.p->m_trigger_state == Subscription::T_CREATING);
 
-    removeSubscribersOnNode(signal, refToNode(senderRef));
-    DBUG_VOID_RETURN;
-  }
+  ndbrequire(type < 3);
+  ndbrequire(subPtr.p->m_triggers[type] == ILLEGAL_TRIGGER_ID);
+  subPtr.p->m_triggers[type] = triggerId;
+
+  ndbrequire(subPtr.p->m_outstanding_trigger);
+  subPtr.p->m_outstanding_trigger--;
 
-  if (c_startup.m_restart_server_node_id && 
-      senderRef != calcSumaBlockRef(c_startup.m_restart_server_node_id))
+  if (subPtr.p->m_outstanding_trigger)
   {
+    jam();
     /**
-     * only allow "restart_server" Suma's to come through 
-     * for restart purposes
+     * Wait for more
      */
-    jam();
-    Uint32 err = c_startup.m_restart_server_node_id != RNIL ? 1405 : 
-      SubStopRef::NF_FakeErrorREF;
-    
-    sendSubStopRef(signal, err);
-    DBUG_VOID_RETURN;
+    return;
   }
 
-  if(!c_subscriptions.find(subPtr, key)){
+  if (subPtr.p->m_errorCode == 0)
+  {
     jam();
-    DBUG_PRINT("error", ("not found"));
-    sendSubStopRef(signal, 1407);
-    DBUG_VOID_RETURN;
+    subPtr.p->m_trigger_state = Subscription::T_DEFINED;
+    report_sub_start_conf(signal, subPtr);
   }
-  
-  if (subPtr.p->m_state == Subscription::LOCKED) {
+  else
+  {
     jam();
-    DBUG_PRINT("error", ("locked"));
-    sendSubStopRef(signal, 1411);
-    DBUG_VOID_RETURN;
+    subPtr.p->m_trigger_state = Subscription::T_ERROR;
+    drop_triggers(signal, subPtr);
   }
+}
+
+void
+Suma::execCREATE_TRIG_REF(Signal* signal)
+{
+  jamEntry();
 
-  ndbrequire(part == SubscriptionData::TableData);
+  CreateTrigRef * const ref = (CreateTrigRef*)signal->getDataPtr();
+  const Uint32 triggerId = ref->getTriggerId();
+  Uint32 type = (triggerId >> 16) & 0x3;
+  Uint32 tableId = ref->getTableId();
 
   TablePtr tabPtr;
-  tabPtr.i = subPtr.p->m_table_ptrI;
-  if (tabPtr.i == RNIL ||
-      !(tabPtr.p = c_tables.getPtr(tabPtr.i)) ||
-      tabPtr.p->m_tableId != subPtr.p->m_tableId)
-  {
-    jam();
-    DBUG_PRINT("error", ("no such table id %u[i=%u]",
-			 subPtr.p->m_tableId, subPtr.p->m_table_ptrI));
-    sendSubStopRef(signal, 1417);
-    DBUG_VOID_RETURN;
-  }
-
-  DBUG_PRINT("info",("subscription: %u tableId: %u[i=%u] id: %u key: %u",
-		     subPtr.i, subPtr.p->m_tableId, tabPtr.i,
-		     subPtr.p->m_subscriptionId,subPtr.p->m_subscriptionKey));
+  SubscriptionPtr subPtr;
+  c_subscriptions.getPtr(subPtr, ref->getConnectionPtr());
+  c_tables.getPtr(tabPtr, subPtr.p->m_table_ptrI);
 
-  SubscriberPtr subbPtr;
-  if (senderRef == reference()){
-    jam();
-    c_subscriberPool.getPtr(subbPtr, senderData);
-    ndbrequire(subbPtr.p->m_subPtrI == subPtr.i && 
-	       subbPtr.p->m_senderRef == subscriberRef &&
-	       subbPtr.p->m_senderData == subscriberData);
-    c_removeDataSubscribers.remove(subbPtr);
-  }
-  else
-  {
-    jam();
-    LocalDLList<Subscriber>
-      subscribers(c_subscriberPool,tabPtr.p->c_subscribers);
+  ndbrequire(tabPtr.p->m_tableId == ref->getTableId());
+  ndbrequire(subPtr.p->m_trigger_state == Subscription::T_CREATING);
 
-    DBUG_PRINT("info",("search: subscription: %u, ref: %u, data: %d",
-		       subPtr.i, subscriberRef, subscriberData));
-    for (subscribers.first(subbPtr);!subbPtr.isNull();subscribers.next(subbPtr))
-    {
-      jam();
-      DBUG_PRINT("info",
-		 ("search: subscription: %u, ref: %u, data: %u, subscriber %u", 
-		  subbPtr.p->m_subPtrI, subbPtr.p->m_senderRef,
-		  subbPtr.p->m_senderData, subbPtr.i));
-      if (subbPtr.p->m_subPtrI == subPtr.i &&
-	  subbPtr.p->m_senderRef == subscriberRef &&
-	  subbPtr.p->m_senderData == subscriberData)
-      {
-	jam();
-	DBUG_PRINT("info",("found"));
-	break;
-      }
-    }
-    /**
-     * If we didn't find anyone, send ref
-     */
-    if (subbPtr.isNull()) {
-      jam();
-      DBUG_PRINT("error", ("subscriber not found"));
-      sendSubStopRef(signal, 1407);
-      DBUG_VOID_RETURN;
-    }
-    subscribers.remove(subbPtr);
-  }
+  ndbrequire(type < 3);
+  ndbrequire(subPtr.p->m_triggers[type] == ILLEGAL_TRIGGER_ID);
 
-  subPtr.p->m_senderRef  = senderRef; // store ref to requestor
-  subPtr.p->m_senderData = senderData; // store ref to requestor
+  subPtr.p->m_errorCode = ref->getErrorCode();
 
-  tabPtr.p->m_drop_subbPtr= subbPtr;
+  ndbrequire(subPtr.p->m_outstanding_trigger);
+  subPtr.p->m_outstanding_trigger--;
 
-  if (subPtr.p->m_state == Subscription::DEFINED)
+  if (subPtr.p->m_outstanding_trigger)
   {
     jam();
-    subPtr.p->m_state = Subscription::LOCKED;
+    /**
+     * Wait for more
+     */
+    return;
   }
 
-  if (tabPtr.p->m_state == Table::DROPPED)
-    // not ALTERED here since trigger must be removed
-  {
-    jam();
-    tabPtr.p->n_subscribers--;
-    DBUG_PRINT("info",("Suma::Table[%u]::n_subscribers: %u",
-		       tabPtr.p->m_tableId, tabPtr.p->n_subscribers));
-    tabPtr.p->checkRelease(*this);
-    sendSubStopComplete(signal, tabPtr.p->m_drop_subbPtr);
-  }
-  else
+  for (Uint32 i = 0; i<3; i++)
   {
     jam();
-    tabPtr.p->dropTrigger(signal,*this);
+    if (subPtr.p->m_triggers[i] == ILLEGAL_TRIGGER_ID)
+    {
+      jam();
+      /**
+       * Wait for more
+       */
+      return;
+    }
   }
-  DBUG_VOID_RETURN;
+
+  subPtr.p->m_trigger_state = Subscription::T_ERROR;
+  drop_triggers(signal, subPtr);
 }
 
 void
-Suma::sendSubStopComplete(Signal* signal, SubscriberPtr subbPtr)
+Suma::report_sub_start_conf(Signal* signal, Ptr<Subscription> subPtr)
 {
-  jam();
-  DBUG_ENTER("Suma::sendSubStopComplete");
-  CRASH_INSERTION(13020);
-
-  DBUG_PRINT("info",("removed subscriber: %i", subbPtr.i));
-
-  SubscriptionPtr subPtr;
-  c_subscriptions.getPtr(subPtr, subbPtr.p->m_subPtrI);
-
-  Uint32 senderRef= subPtr.p->m_senderRef;
-  Uint32 senderData= subPtr.p->m_senderData;
-
-  subPtr.p->n_subscribers--;
-  ndbassert( subPtr.p->m_state == Subscription::LOCKED ||
-	     subPtr.p->m_state == Subscription::DROPPED );
-  if ( subPtr.p->m_state == Subscription::LOCKED )
   {
-    jam();
-    subPtr.p->m_state = Subscription::DEFINED;
-    if (subPtr.p->n_subscribers == 0)
+    LocalDLList<Subscriber> list(c_subscriberPool,
+                                 subPtr.p->m_subscribers);
+    LocalDLFifoList<SubOpRecord> subOpList(c_subOpPool, subPtr.p->m_start_req);
+
+    Ptr<Subscriber> ptr;
+    Ptr<SubOpRecord> subOpPtr;
+    for (subOpList.first(subOpPtr); !subOpPtr.isNull(); )
     {
       jam();
-#if 1
-      subPtr.p->m_table_ptrI = RNIL;
-#else
-      TablePtr tabPtr;
-      tabPtr.i = subPtr.p->m_table_ptrI;
-      if ((tabPtr.p= c_tablePool.getPtr(tabPtr.i)) &&
-	  (tabPtr.p->m_state == Table::DROPPED ||
-	   tabPtr.p->m_state == Table::ALTERED) &&
-	  false)
+
+      Uint32 senderRef = subOpPtr.p->m_senderRef;
+      Uint32 senderData = subOpPtr.p->m_senderData;
+      c_subscriberPool.getPtr(ptr, subOpPtr.p->m_subscriberRef);
+
+      Uint32 nodeId = refToNode(ptr.p->m_senderRef);
+      if (c_startup.m_restart_server_node_id ||
+          (c_failedApiNodes.get(nodeId) == false &&
+           c_connected_nodes.get(nodeId)))
       {
-	// last subscriber, and table is dropped
-	// safe to drop subscription
-	c_subscriptions.release(subPtr);
-	DBUG_PRINT("info",("c_subscriptionPool  size: %d free: %d",
-			   c_subscriptionPool.getSize(),
-			   c_subscriptionPool.getNoOfFree()));
+        SubStartConf* conf = (SubStartConf*)signal->getDataPtrSend();
+        conf->senderRef       = reference();
+        conf->senderData      = senderData;
+        conf->subscriptionId  = subPtr.p->m_subscriptionId;
+        conf->subscriptionKey = subPtr.p->m_subscriptionKey;
+        conf->firstGCI        = m_last_complete_gci + 3;
+        conf->part            = SubscriptionData::TableData;
+
+        sendSignal(senderRef, GSN_SUB_START_CONF, signal,
+                   SubStartConf::SignalLength, JBB);
+
+        list.add(ptr);
+        c_subscriber_nodes.set(refToNode(ptr.p->m_senderRef));
       }
       else
       {
-	subPtr.p->m_table_ptrI = RNIL;
+        jam();
+        g_eventLogger.warning("Node %u failed in report_sub_start_conf",
+                              nodeId);
+        sendSubStartRef(signal,
+                        senderRef, senderData, SubStartRef::NodeDied);
+
+        c_subscriberPool.release(ptr);
       }
-      ndbassert(tabPtr.p != 0);
-#endif
+
+      Ptr<SubOpRecord> tmp = subOpPtr;
+      subOpList.next(subOpPtr);
+      subOpList.release(tmp);
     }
   }
-  else if ( subPtr.p->n_subscribers == 0 )
+
+  check_release_subscription(signal, subPtr);
+}
+
+void
+Suma::report_sub_start_ref(Signal* signal,
+                           Ptr<Subscription> subPtr,
+                           Uint32 errCode)
+{
+  LocalDLList<Subscriber> list(c_subscriberPool,
+                               subPtr.p->m_subscribers);
+  LocalDLFifoList<SubOpRecord> subOpList(c_subOpPool, subPtr.p->m_start_req);
+
+  Ptr<Subscriber> ptr;
+  Ptr<SubOpRecord> subOpPtr;
+  for (subOpList.first(subOpPtr); !subOpPtr.isNull(); )
   {
-    // subscription is marked to be removed
-    // and there are no subscribers left
     jam();
-    ndbassert(subPtr.p->m_state == Subscription::DROPPED);
-    completeSubRemove(subPtr);
-  }
 
-  // let subscriber know that subscrber is stopped
-  {
-    SubTableData * data  = (SubTableData*)signal->getDataPtrSend();
-    data->gci            = m_last_complete_gci + 1; // XXX ???
-    data->tableId        = 0;
-    data->operation      = NdbDictionary::Event::_TE_STOP;
-    data->senderData     = subbPtr.p->m_senderData;
-    sendSignal(subbPtr.p->m_senderRef, GSN_SUB_TABLE_DATA, signal,
-	       SubTableData::SignalLength, JBB);
-  }
+    Uint32 senderRef = subOpPtr.p->m_senderRef;
+    Uint32 senderData = subOpPtr.p->m_senderData;
+    c_subscriberPool.getPtr(ptr, subOpPtr.p->m_subscriberRef);
 
-  SubStopConf * const conf = (SubStopConf*)signal->getDataPtrSend();
-  
-  conf->senderRef= reference();
-  conf->senderData= senderData;
+    SubStartRef* ref = (SubStartRef*)signal->getDataPtrSend();
+    ref->senderRef  = reference();
+    ref->senderData = senderData;
+    ref->errorCode  = errCode;
 
-  sendSignal(senderRef, GSN_SUB_STOP_CONF, signal,
-	     SubStopConf::SignalLength, JBB);
+    sendSignal(senderRef, GSN_SUB_START_REF, signal,
+               SubStartConf::SignalLength, JBB);
 
-  c_subscriberPool.release(subbPtr);
-  DBUG_PRINT("info",("c_subscriberPool  size: %d free: %d",
-		     c_subscriberPool.getSize(),
-		     c_subscriberPool.getNoOfFree()));
-  DBUG_VOID_RETURN;
-}
 
-void
-Suma::sendSubStopRef(Signal* signal, Uint32 errCode)
-{
-  jam();
-  DBUG_ENTER("Suma::sendSubStopRef");
-  SubStopRef  * ref = (SubStopRef *)signal->getDataPtrSend();
-  ref->senderRef = reference();
-  ref->errorCode = errCode;
-  sendSignal(signal->getSendersBlockRef(), 
-	     GSN_SUB_STOP_REF, 
-	     signal, 
-	     SubStopRef::SignalLength,
-	     JBB);
-  DBUG_VOID_RETURN;
+    Ptr<SubOpRecord> tmp = subOpPtr;
+    subOpList.next(subOpPtr);
+    subOpList.release(tmp);
+    c_subscriberPool.release(ptr);
+  }
 }
 
-/**********************************************************
- *
- * Trigger admin interface
- *
- */
-
-int
-Suma::Table::setupTrigger(Signal* signal,
-			  Suma &suma)
+void
+Suma::drop_triggers(Signal* signal, SubscriptionPtr subPtr)
 {
   jam();
-  DBUG_ENTER("Suma::Table::setupTrigger");
-
-  int ret= 0;
-  
-  AttributeMask attrMask;
-  createAttributeMask(attrMask, suma);
 
+  subPtr.p->m_outstanding_trigger = 0;
   for(Uint32 j = 0; j<3; j++)
   {
-    Uint32 triggerId = (m_schemaVersion << 18) | (j << 16) | m_ptrI;
-    if(m_hasTriggerDefined[j] == 0)
+    Uint32 triggerId = subPtr.p->m_triggers[j];
+    if (triggerId != ILLEGAL_TRIGGER_ID)
     {
-      suma.suma_ndbrequire(m_triggerIds[j] == ILLEGAL_TRIGGER_ID);
-      DBUG_PRINT("info",("DEFINING trigger on table %u[%u]", m_tableId, j));
-      CreateTrigReq * const req = (CreateTrigReq*)signal->getDataPtrSend();
+      jam();
+      subPtr.p->m_outstanding_trigger++;
+      DropTrigReq * const req = (DropTrigReq*)signal->getDataPtrSend();
+      req->setConnectionPtr(subPtr.i);
       req->setUserRef(SUMA_REF);
-      req->setConnectionPtr(m_ptrI);
+      req->setRequestType(DropTrigReq::RT_USER);
       req->setTriggerType(TriggerType::SUBSCRIPTION_BEFORE);
       req->setTriggerActionTime(TriggerActionTime::TA_DETACHED);
-      req->setMonitorReplicas(true);
-      req->setMonitorAllAttributes(false);
-      req->setReceiverRef(SUMA_REF);
+      req->setIndexId(RNIL);
+
+      req->setTableId(subPtr.p->m_tableId);
       req->setTriggerId(triggerId);
       req->setTriggerEvent((TriggerEvent::Value)j);
-      req->setTableId(m_tableId);
-      req->setAttributeMask(attrMask);
-      suma.sendSignal(DBTUP_REF, GSN_CREATE_TRIG_REQ, 
-		      signal, CreateTrigReq::SignalLength, JBB);
-      ret= 1;
-    }
-    else
-    {
-      m_hasTriggerDefined[j]++;
-      DBUG_PRINT("info",("REFCOUNT trigger on table %u[%u] %u",
-			 m_tableId, j, m_hasTriggerDefined[j]));
+
+      sendSignal(DBTUP_REF, GSN_DROP_TRIG_REQ,
+                 signal, DropTrigReq::SignalLength, JBB);
     }
   }
-  DBUG_RETURN(ret);
-}
 
-void
-Suma::Table::createAttributeMask(AttributeMask& mask,
-                                            Suma &suma)
-{
-  jam();
-  mask.clear();
-  DataBuffer<15>::DataBufferIterator it;
-  LocalDataBuffer<15> attrBuf(suma.c_dataBufferPool, m_attributes);
-  for(attrBuf.first(it); !it.curr.isNull(); attrBuf.next(it)){
-    mask.set(* it.data);
+  if (subPtr.p->m_outstanding_trigger == 0)
+  {
+    jam();
+    drop_triggers_complete(signal, subPtr);
   }
 }
 
 void
-Suma::execCREATE_TRIG_CONF(Signal* signal){
+Suma::execDROP_TRIG_REF(Signal* signal)
+{
   jamEntry();
-  DBUG_ENTER("Suma::execCREATE_TRIG_CONF");
-  ndbassert(signal->getNoOfSections() == 0);
-  CreateTrigConf * const conf = (CreateTrigConf*)signal->getDataPtr();
-  const Uint32 triggerId = conf->getTriggerId();
+  DropTrigRef * const ref = (DropTrigRef*)signal->getDataPtr();
+  Ptr<Table> tabPtr;
+  Ptr<Subscription> subPtr;
+  const Uint32 triggerId = ref->getTriggerId();
   Uint32 type = (triggerId >> 16) & 0x3;
-  Uint32 tableId = conf->getTableId();
-
 
-  DBUG_PRINT("enter", ("type: %u tableId: %u[i=%u==%u]",
-		       type, tableId,conf->getConnectionPtr(),triggerId & 0xFFFF));
- 
-  TablePtr tabPtr;
-  c_tables.getPtr(tabPtr, conf->getConnectionPtr());
-  ndbrequire(tabPtr.p->m_tableId == tableId);
-  ndbrequire(tabPtr.p->m_state == Table::DEFINING);
+  c_subscriptionPool.getPtr(subPtr, ref->getConnectionPtr());
+  c_tables.getPtr(tabPtr, subPtr.p->m_table_ptrI);
+  ndbrequire(tabPtr.p->m_tableId == ref->getTableId());
 
   ndbrequire(type < 3);
-  tabPtr.p->m_triggerIds[type] = triggerId;
-  ndbrequire(tabPtr.p->m_hasTriggerDefined[type] == 0);
-  tabPtr.p->m_hasTriggerDefined[type] = 1;
+  ndbrequire(subPtr.p->m_triggers[type] != ILLEGAL_TRIGGER_ID);
+  subPtr.p->m_triggers[type] = ILLEGAL_TRIGGER_ID;
 
-  if (type == 2)
+  ndbrequire(subPtr.p->m_outstanding_trigger);
+  subPtr.p->m_outstanding_trigger--;
+
+  if (subPtr.p->m_outstanding_trigger)
   {
-    completeAllSubscribers(signal, tabPtr);
-    completeInitTable(signal,tabPtr);
-    DBUG_VOID_RETURN;
+    jam();
+    /**
+     * Wait for more
+     */
+    return;
   }
-  DBUG_VOID_RETURN;
+
+  drop_triggers_complete(signal, subPtr);
 }
 
 void
-Suma::execCREATE_TRIG_REF(Signal* signal){
+Suma::execDROP_TRIG_CONF(Signal* signal)
+{
   jamEntry();
-  DBUG_ENTER("Suma::execCREATE_TRIG_REF");
-  ndbassert(signal->getNoOfSections() == 0);  
-  CreateTrigRef * const ref = (CreateTrigRef*)signal->getDataPtr();
-  const Uint32 triggerId = ref->getTriggerId();
+
+  DropTrigConf * const conf = (DropTrigConf*)signal->getDataPtr();
+
+  Ptr<Table> tabPtr;
+  Ptr<Subscription> subPtr;
+  const Uint32 triggerId = conf->getTriggerId();
   Uint32 type = (triggerId >> 16) & 0x3;
-  Uint32 tableId = ref->getTableId();
-  
-  DBUG_PRINT("enter", ("type: %u tableId: %u[i=%u==%u]",
-		       type, tableId,ref->getConnectionPtr(),triggerId & 0xFFFF));
- 
-  TablePtr tabPtr;
-  c_tables.getPtr(tabPtr, ref->getConnectionPtr());
-  ndbrequire(tabPtr.p->m_tableId == tableId);
-  ndbrequire(tabPtr.p->m_state == Table::DEFINING);
 
-  tabPtr.p->m_error= ref->getErrorCode();
+  c_subscriptionPool.getPtr(subPtr, conf->getConnectionPtr());
+  c_tables.getPtr(tabPtr, subPtr.p->m_table_ptrI);
+  ndbrequire(tabPtr.p->m_tableId == conf->getTableId());
 
   ndbrequire(type < 3);
+  ndbrequire(subPtr.p->m_triggers[type] != ILLEGAL_TRIGGER_ID);
+  subPtr.p->m_triggers[type] = ILLEGAL_TRIGGER_ID;
 
-  if (type == 2)
+  ndbrequire(subPtr.p->m_outstanding_trigger);
+  subPtr.p->m_outstanding_trigger--;
+
+  if (subPtr.p->m_outstanding_trigger)
   {
-    completeAllSubscribers(signal, tabPtr);
-    completeInitTable(signal,tabPtr);
-    DBUG_VOID_RETURN;
+    jam();
+    /**
+     * Wait for more
+     */
+    return;
   }
 
-  DBUG_VOID_RETURN;
+  drop_triggers_complete(signal, subPtr);
 }
 
 void
-Suma::Table::dropTrigger(Signal* signal,Suma& suma)
+Suma::drop_triggers_complete(Signal* signal, Ptr<Subscription> subPtr)
 {
-  jam();
-  DBUG_ENTER("Suma::dropTrigger");
-  
-  for(Uint32 j = 0; j<3; j++){
+  switch(subPtr.p->m_trigger_state){
+  case Subscription::T_UNDEFINED:
+  case Subscription::T_CREATING:
+  case Subscription::T_DEFINED:
     jam();
-    suma.suma_ndbrequire(m_triggerIds[j] != ILLEGAL_TRIGGER_ID);
-    if(m_hasTriggerDefined[j] == 1) {
-      jam();
-
-      DropTrigReq * const req = (DropTrigReq*)signal->getDataPtrSend();
-      req->setConnectionPtr(m_ptrI);
-      req->setUserRef(SUMA_REF); // Sending to myself
-      req->setRequestType(DropTrigReq::RT_USER);
-      req->setTriggerType(TriggerType::SUBSCRIPTION_BEFORE);
-      req->setTriggerActionTime(TriggerActionTime::TA_DETACHED);
-      req->setIndexId(RNIL);
-
-      req->setTableId(m_tableId);
-      req->setTriggerId(m_triggerIds[j]);
-      req->setTriggerEvent((TriggerEvent::Value)j);
-
-      DBUG_PRINT("info",("DROPPING trigger %u = %u %u %u on table %u[%u]",
-			 m_triggerIds[j],
-			 TriggerType::SUBSCRIPTION_BEFORE,
-			 TriggerActionTime::TA_DETACHED,
-			 j,
-			 m_tableId, j));
-      suma.sendSignal(DBTUP_REF, GSN_DROP_TRIG_REQ,
-		      signal, DropTrigReq::SignalLength, JBB);
-    } else {
+    ndbrequire(false);
+    break;
+  case Subscription::T_DROPPING:
+    jam();
+    /**
+     */
+    subPtr.p->m_trigger_state = Subscription::T_UNDEFINED;
+    if (!subPtr.p->m_start_req.isEmpty())
+    {
       jam();
-      suma.suma_ndbrequire(m_hasTriggerDefined[j] > 1);
-      runDropTrigger(signal,m_triggerIds[j],suma);
+      create_triggers(signal, subPtr);
+      return;
     }
+    break;
+  case Subscription::T_ERROR:
+    jam();
+    Uint32 err = subPtr.p->m_errorCode;
+    subPtr.p->m_trigger_state = Subscription::T_UNDEFINED;
+    subPtr.p->m_errorCode = 0;
+    report_sub_start_ref(signal, subPtr, err);
+    break;
   }
-  DBUG_VOID_RETURN;
+
+  check_release_subscription(signal, subPtr);
 }
 
+/**********************************************************
+ * Suma participant interface
+ *
+ * Stopping and removing of subscriber
+ *
+ */
+
 void
-Suma::execDROP_TRIG_REF(Signal* signal){
+Suma::execSUB_STOP_REQ(Signal* signal){
   jamEntry();
-  DBUG_ENTER("Suma::execDROP_TRIG_REF");
   ndbassert(signal->getNoOfSections() == 0);
-  DropTrigRef * const ref = (DropTrigRef*)signal->getDataPtr();
-  if (ref->getErrorCode() != DropTrigRef::TriggerNotFound)
+  DBUG_ENTER("Suma::execSUB_STOP_REQ");
+  
+  CRASH_INSERTION(13019);
+
+  SubStopReq * const req = (SubStopReq*)signal->getDataPtr();
+  Uint32 senderRef      = req->senderRef;
+  Uint32 senderData     = req->senderData;
+  Uint32 subscriberRef  = req->subscriberRef;
+  Uint32 subscriberData = req->subscriberData;
+  SubscriptionPtr subPtr;
+  Subscription key; 
+  key.m_subscriptionId  = req->subscriptionId;
+  key.m_subscriptionKey = req->subscriptionKey;
+  bool abortStart = false;
+
+  if (c_startup.m_restart_server_node_id == RNIL)
   {
-    ndbrequire(false);
+    jam();
+
+    /**
+     * We havent started syncing yet
+     */
+    sendSubStopRef(signal,
+                   senderRef, senderData, SubStopRef::NF_FakeErrorREF);
+    return;
   }
-  TablePtr tabPtr;
-  c_tables.getPtr(tabPtr, ref->getConnectionPtr());
-  ndbrequire(ref->getTableId() == tabPtr.p->m_tableId);
 
-  tabPtr.p->runDropTrigger(signal, ref->getTriggerId(), *this);
-  DBUG_VOID_RETURN;
-}
+  bool found = c_subscriptions.find(subPtr, key);
 
-void
-Suma::execDROP_TRIG_CONF(Signal* signal){
-  jamEntry();
-  DBUG_ENTER("Suma::execDROP_TRIG_CONF");
-  ndbassert(signal->getNoOfSections() == 0);
+  if (!found)
+  {
+    jam();
+    sendSubStopRef(signal,
+                   senderRef, senderData, SubStopRef::NoSuchSubscription);
+    return;
+  }
+  
+  switch(subPtr.p->m_state){
+  case Subscription::DEFINING:
+    jam();
+    sendSubStopRef(signal,
+                   senderRef, senderData, SubStopRef::Defining);
+    return;
+  case Subscription::DROPPED:
+    jam();
+    break;
+  case Subscription::DEFINED:
+    jam();
+    break;
+  }
 
-  DropTrigConf * const conf = (DropTrigConf*)signal->getDataPtr();
-  TablePtr tabPtr;
-  c_tables.getPtr(tabPtr, conf->getConnectionPtr());
-  ndbrequire(conf->getTableId() == tabPtr.p->m_tableId);
+  if (abortStart)
+  {
+    Ptr<Subscriber> ptr;
+    jam();
+    LocalDLList<Subscriber> list (c_subscriberPool, subPtr.p->m_subscribers);
+    if (list.first(ptr) &&
+        (ptr.p->m_senderRef == subscriberRef &&
+         ptr.p->m_senderData == subscriberData))
+    {
+      jam();
+      sendSubStopConf(signal, senderRef, senderData, ptr);
+      list.release(ptr);
+      return;
+    }
+  }
 
-  tabPtr.p->runDropTrigger(signal, conf->getTriggerId(),*this);
-  DBUG_VOID_RETURN;
+  Ptr<SubOpRecord> subOpPtr;
+  LocalDLFifoList<SubOpRecord> list(c_subOpPool, subPtr.p->m_stop_req);
+  bool empty = list.isEmpty();
+  if (list.seize(subOpPtr) == false)
+  {
+    jam();
+    sendSubStopRef(signal,
+                   senderRef, senderData, SubStopRef::OutOfSubOpRecords);
+    return;
+  }
+
+  subOpPtr.p->m_opType = SubOpRecord::R_SUB_STOP_REQ;
+  subOpPtr.p->m_subPtrI = subPtr.i;
+  subOpPtr.p->m_senderRef = senderRef;
+  subOpPtr.p->m_senderData = senderData;
+  subOpPtr.p->m_subscriberRef = subscriberRef;
+  subOpPtr.p->m_subscriberData = subscriberData;
+
+
+  if (empty)
+  {
+    jam();
+    signal->theData[0] = SumaContinueB::SUB_STOP_REQ;
+    signal->theData[1] = subOpPtr.i;
+    signal->theData[2] = RNIL;
+    sendSignal(SUMA_REF, GSN_CONTINUEB, signal, 3, JBB);
+  }
 }
 
 void
-Suma::Table::runDropTrigger(Signal* signal,
-				       Uint32 triggerId,
-				       Suma &suma)
+Suma::sub_stop_req(Signal* signal)
 {
   jam();
-  Uint32 type = (triggerId >> 16) & 0x3;
 
-  suma.suma_ndbrequire(type < 3);
-  suma.suma_ndbrequire(m_triggerIds[type] == triggerId);
-  m_hasTriggerDefined[type]--;
-  if (m_hasTriggerDefined[type] == 0)
+  Ptr<SubOpRecord> subOpPtr;
+  c_subOpPool.getPtr(subOpPtr, signal->theData[1]);
+
+  Ptr<Subscription> subPtr;
+  c_subscriptionPool.getPtr(subPtr, subOpPtr.p->m_subPtrI);
+
+  Ptr<Subscriber> ptr;
   {
-    jam();
-    m_triggerIds[type] = ILLEGAL_TRIGGER_ID;
+    LocalDLList<Subscriber> list(c_subscriberPool, subPtr.p->m_subscribers);
+    if (signal->theData[2] == RNIL)
+    {
+      jam();
+      list.first(ptr);
+    }
+    else
+    {
+      jam();
+      list.getPtr(ptr, signal->theData[2]);
+    }
+
+    for (Uint32 i = 0; i<32 && !ptr.isNull(); i++, list.next(ptr))
+    {
+      if (ptr.p->m_senderRef == subOpPtr.p->m_subscriberRef &&
+          ptr.p->m_senderData == subOpPtr.p->m_subscriberData)
+      {
+        jam();
+        list.release(ptr);
+        goto found;
+      }
+    }
   }
-  if( m_hasTriggerDefined[0] != m_hasTriggerDefined[1] ||
-      m_hasTriggerDefined[0] != m_hasTriggerDefined[2])
+
+  if (ptr.isNull())
   {
-    // more to come
     jam();
+    sendSubStopRef(signal,
+                   subOpPtr.p->m_senderRef,
+                   subOpPtr.p->m_senderData,
+                   SubStopRef::NoSuchSubscriber);
+    check_remove_queue(signal, subPtr, subOpPtr, true, true);
     return;
   }
 
-#if 0
-  ndbout_c("trigger completed");
-#endif
-
-
-  n_subscribers--;
-  DBUG_PRINT("info",("Suma::Table[%u]::n_subscribers: %u",
-		     m_tableId, n_subscribers));
-  checkRelease(suma);
+  signal->theData[0] = SumaContinueB::SUB_STOP_REQ;
+  signal->theData[1] = subOpPtr.i;
+  signal->theData[2] = ptr.i;
+  sendSignal(SUMA_REF, GSN_CONTINUEB, signal, 3, JBB);
+  return;
 
-  suma.sendSubStopComplete(signal, m_drop_subbPtr);
+found:
+  sendSubStopConf(signal,
+                  subOpPtr.p->m_senderRef, subOpPtr.p->m_senderData, ptr);
+  check_remove_queue(signal, subPtr, subOpPtr, true, true);
+  check_release_subscription(signal, subPtr);
 }
 
-void Suma::suma_ndbrequire(bool v) { ndbrequire(v); }
-
 void
-Suma::Table::checkRelease(Suma &suma)
+Suma::check_remove_queue(Signal* signal,
+                         Ptr<Subscription> subPtr,
+                         Ptr<SubOpRecord> subOpPtr,
+                         bool ishead,
+                         bool dorelease)
 {
-  jam();
-  DBUG_ENTER("Suma::Table::checkRelease");
-  if (n_subscribers == 0)
+  LocalDLFifoList<SubOpRecord> list(c_subOpPool, subPtr.p->m_stop_req);
+
   {
-    jam();
-    suma.suma_ndbrequire(m_hasTriggerDefined[0] == 0);
-    suma.suma_ndbrequire(m_hasTriggerDefined[1] == 0);
-    suma.suma_ndbrequire(m_hasTriggerDefined[2] == 0);
-    if (!c_subscribers.isEmpty())
+    Ptr<SubOpRecord> tmp;
+    list.first(tmp);
+    if (ishead)
     {
-      LocalDLList<Subscriber>
-	subscribers(suma.c_subscriberPool,c_subscribers);
-      SubscriberPtr subbPtr;
-      for (subscribers.first(subbPtr);!subbPtr.isNull();
-	   subscribers.next(subbPtr))
-      {
-	jam();
-	DBUG_PRINT("info",("subscriber: %u", subbPtr.i));
-      }
-      suma.suma_ndbrequire(false);
+      jam();
+      ndbrequire(tmp.i == subOpPtr.i);
     }
-    if (!c_syncRecords.isEmpty())
+    else
     {
-      LocalDLList<SyncRecord>
-	syncRecords(suma.c_syncPool,c_syncRecords);
-      Ptr<SyncRecord> syncPtr;
-      for (syncRecords.first(syncPtr);!syncPtr.isNull();
-	   syncRecords.next(syncPtr))
-      {
-	jam();
-	DBUG_PRINT("info",("syncRecord: %u", syncPtr.i));
-      }
-      suma.suma_ndbrequire(false);
+      jam();
+      ishead = (tmp.i == subOpPtr.i);
     }
-    release(suma);
-    suma.c_tables.remove(m_ptrI);
-    suma.c_tablePool.release(m_ptrI);
-    DBUG_PRINT("info",("c_tablePool  size: %d free: %d",
-		       suma.c_tablePool.getSize(),
-		       suma.c_tablePool.getNoOfFree()));
+  }
+
+  if (dorelease)
+  {
+    jam();
+    list.release(subOpPtr);
   }
   else
   {
-    DBUG_PRINT("info",("n_subscribers: %d", n_subscribers));
+    jam();
+    list.remove(subOpPtr);
+  }
+
+  if (ishead)
+  {
+    jam();
+    if (list.first(subOpPtr) == false)
+    {
+      jam();
+      return;
+    }
+    // Fall through
+  }
+  else
+  {
+    jam();
+    return;
+  }
+
+  switch(subOpPtr.p->m_opType){
+  case SubOpRecord::R_SUB_STOP_REQ:
+    jam();
+    signal->theData[0] = SumaContinueB::SUB_STOP_REQ;
+    signal->theData[1] = subOpPtr.i;
+    signal->theData[2] = RNIL;
+    sendSignal(SUMA_REF, GSN_CONTINUEB, signal, 3, JBB);
+    return;
+  case SubOpRecord::R_API_FAIL_REQ:
+    jam();
+    signal->theData[0] = SumaContinueB::API_FAIL_SUBSCRIPTION;
+    signal->theData[1] = subOpPtr.i;
+    signal->theData[2] = RNIL;
+    sendSignal(SUMA_REF, GSN_CONTINUEB, signal, 3, JBB);
+    return;
+  case SubOpRecord::R_START_ME_REQ:
+    jam();
+    sendSubCreateReq(signal, subPtr);
+    return;
+  }
+}
+
+void
+Suma::sendSubStopConf(Signal* signal,
+                      Uint32 senderRef,
+                      Uint32 senderData,
+                      Ptr<Subscriber> subbPtr)
+{
+  jam();
+  CRASH_INSERTION(13020);
+
+  // let subscriber know that subscrber is stopped
+  {
+    SubTableData * data  = (SubTableData*)signal->getDataPtrSend();
+    data->gci            = m_last_complete_gci + 1; // XXX ???
+    data->tableId        = 0;
+    data->operation      = NdbDictionary::Event::_TE_STOP;
+    data->senderData     = subbPtr.p->m_senderData;
+    sendSignal(subbPtr.p->m_senderRef, GSN_SUB_TABLE_DATA, signal,
+	       SubTableData::SignalLength, JBB);
+  }
+  
+  SubStopConf * const conf = (SubStopConf*)signal->getDataPtrSend();
+  conf->senderRef= reference();
+  conf->senderData= senderData;
+  sendSignal(senderRef, GSN_SUB_STOP_CONF, signal,
+	     SubStopConf::SignalLength, JBB);
+}
+
+void
+Suma::sendSubStopRef(Signal* signal,
+                     Uint32 retref,
+                     Uint32 data,
+                     Uint32 errCode)
+{
+  jam();
+  SubStopRef  * ref = (SubStopRef *)signal->getDataPtrSend();
+  ref->senderRef = reference();
+  ref->errorCode = errCode;
+  ref->senderData = data;
+  sendSignal(retref, GSN_SUB_STOP_REF, signal,  SubStopRef::SignalLength, JBB);
+}
+
+void
+Suma::Table::createAttributeMask(AttributeMask& mask,
+                                 Suma &suma)
+{
+  jam();
+  mask.clear();
+  DataBuffer<15>::DataBufferIterator it;
+  LocalDataBuffer<15> attrBuf(suma.c_dataBufferPool, m_attributes);
+  for(attrBuf.first(it); !it.curr.isNull(); attrBuf.next(it)){
+    mask.set(* it.data);
   }
-  DBUG_VOID_RETURN;
 }
 
+void Suma::suma_ndbrequire(bool v) { ndbrequire(v); }
+
+
 /**********************************************************
  * Scan data interface
  *
@@ -3168,9 +3122,9 @@
    * Initialize signal
    */  
   SubTableData * sdata = (SubTableData*)signal->getDataPtrSend();
-  Uint32 ref = subPtr.p->m_senderRef;
+  Uint32 ref = syncPtr.p->m_senderRef;
   sdata->tableId = syncPtr.p->m_currentTableId;
-  sdata->senderData = subPtr.p->m_senderData;
+  sdata->senderData = syncPtr.p->m_senderData;
   sdata->operation = NdbDictionary::Event::_TE_SCAN; // Scan
   sdata->gci = 0; // Undefined
 #if PRINT_ONLY
@@ -3353,10 +3307,10 @@
   const Uint32 hashValue = trg->getHashValue();
   const Uint32 gci       = trg->getGCI();
   const Uint32 event     = trg->getTriggerEvent();
-  TablePtr tabPtr;
-  tabPtr.i               = trigId & 0xFFFF;
 
-  DBUG_PRINT("enter",("tabPtr.i=%u", tabPtr.i));
+  Ptr<Subscription> subPtr;
+  c_subscriptionPool.getPtr(subPtr, trigId & 0xFFFF);
+
   ndbrequire(f_bufferLock == trigId);
   /**
    * Reset f_bufferLock
@@ -3364,8 +3318,9 @@
   f_bufferLock = 0;
   b_bufferLock = 0;
 
-  ndbrequire(tabPtr.p = c_tablePool.getPtr(tabPtr.i));
-  Uint32 tableId = tabPtr.p->m_tableId;
+  Uint32 tableId = subPtr.p->m_tableId;
+  Uint32 schemaVersion =
+    c_tablePool.getPtr(subPtr.p->m_table_ptrI)->m_schemaVersion;
 
   Uint32 bucket= hashValue % c_no_of_buckets;
   m_max_seen_gci = (gci > m_max_seen_gci ? gci : m_max_seen_gci);
@@ -3390,12 +3345,10 @@
     data->logType        = 0;
     
     {
-      LocalDLList<Subscriber> list(c_subscriberPool,tabPtr.p->c_subscribers);
+      LocalDLList<Subscriber> list(c_subscriberPool, subPtr.p->m_subscribers);
       SubscriberPtr subbPtr;
       for(list.first(subbPtr); !subbPtr.isNull(); list.next(subbPtr))
       {
-	DBUG_PRINT("info",("GSN_SUB_TABLE_DATA to node %d",
-			   refToNode(subbPtr.p->m_senderRef)));
 	data->senderData = subbPtr.p->m_senderData;
 	sendSignal(subbPtr.p->m_senderRef, GSN_SUB_TABLE_DATA, signal,
 		   SubTableData::SignalLength, JBB, ptr, nptr);
@@ -3408,8 +3361,8 @@
     Uint32 sz = f_trigBufferSize + b_trigBufferSize + 3;
     if((dst = get_buffer_ptr(signal, bucket, gci, sz)))
     {
-      * dst++ = tableId;
-      * dst++ = tabPtr.p->m_schemaVersion;
+      * dst++ = subPtr.i;
+      * dst++ = schemaVersion;
       * dst++ = (event << 16) | f_trigBufferSize;
       memcpy(dst, f_buffer, f_trigBufferSize << 2);
       dst += f_trigBufferSize;
@@ -3534,7 +3487,8 @@
     {
       char buf[100];
       c_subscriber_nodes.getText(buf);
-      ndbout_c("c_gcp_list.seize() failed: gci: %d nodes: %s", gci, buf);
+      g_eventLogger.error("c_gcp_list.seize() failed: gci: %d nodes: %s",
+                          gci, buf);
     }
   }
   
@@ -3566,13 +3520,6 @@
   jamEntry();
   DBUG_ENTER("Suma::execCREATE_TAB_CONF");
 
-#if 0
-  CreateTabConf * const conf = (CreateTabConf*)signal->getDataPtr();
-  Uint32 tableId = conf->senderData;
-
-  TablePtr tabPtr;
-  initTable(signal,tableId,tabPtr);
-#endif
   DBUG_VOID_RETURN;
 }
 
@@ -3580,7 +3527,6 @@
 Suma::execDROP_TAB_CONF(Signal *signal)
 {
   jamEntry();
-  DBUG_ENTER("Suma::execDROP_TAB_CONF");
   ndbassert(signal->getNoOfSections() == 0);
 
   DropTabConf * const conf = (DropTabConf*)signal->getDataPtr();
@@ -3588,59 +3534,54 @@
   Uint32 tableId= conf->tableId;
 
   TablePtr tabPtr;
-  if (!c_tables.find(tabPtr, tableId) ||
-      tabPtr.p->m_state == Table::DROPPED ||
-      tabPtr.p->m_state == Table::ALTERED)
+  if (!c_tables.find(tabPtr, tableId))
   {
-    DBUG_VOID_RETURN;
+    jam();
+    return;
   }
 
   DBUG_PRINT("info",("drop table id: %d[i=%u]", tableId, tabPtr.i));
 
   tabPtr.p->m_state = Table::DROPPED;
-  tabPtr.p->m_hasTriggerDefined[0] = 0;
-  tabPtr.p->m_hasTriggerDefined[1] = 0;
-  tabPtr.p->m_hasTriggerDefined[2] = 0;
-  tabPtr.p->m_triggerIds[0] = ILLEGAL_TRIGGER_ID;
-  tabPtr.p->m_triggerIds[1] = ILLEGAL_TRIGGER_ID;
-  tabPtr.p->m_triggerIds[2] = ILLEGAL_TRIGGER_ID;
+  c_tables.remove(tabPtr);
 
   if (senderRef == 0)
   {
-    DBUG_VOID_RETURN;
+    jam();
+    return;
   }
   // dict coordinator sends info to API
-  
+
   SubTableData * data = (SubTableData*)signal->getDataPtrSend();
   data->gci            = m_last_complete_gci+1;
   data->tableId        = tableId;
   data->operation      = NdbDictionary::Event::_TE_DROP;
   data->req_nodeid     = refToNode(senderRef);
   
+  Ptr<Subscription> subPtr;
+  LocalDLList<Subscription> subList(c_subscriptionPool,
+                                    tabPtr.p->m_subscriptions);
+
+  for (subList.first(subPtr); !subPtr.isNull(); subList.next(subPtr))
   {
-    LocalDLList<Subscriber> subbs(c_subscriberPool,tabPtr.p->c_subscribers);
-    SubscriberPtr subbPtr;
-    for(subbs.first(subbPtr);!subbPtr.isNull();subbs.next(subbPtr))
+    if(subPtr.p->m_subscriptionType != SubCreateReq::TableEvent)
     {
       jam();
-      /*
-       * get subscription ptr for this subscriber
-       */
-      SubscriptionPtr subPtr;
-      c_subscriptions.getPtr(subPtr, subbPtr.p->m_subPtrI);
-      if(subPtr.p->m_subscriptionType != SubCreateReq::TableEvent) {
-	jam();
-	continue;
-	//continue in for-loop if the table is not part of 
-	//the subscription. Otherwise, send data to subscriber.
-      }
-      data->senderData= subbPtr.p->m_senderData;
-      sendSignal(subbPtr.p->m_senderRef, GSN_SUB_TABLE_DATA, signal,
+      continue;
+      //continue in for-loop if the table is not part of
+      //the subscription. Otherwise, send data to subscriber.
+    }
+
+    Ptr<Subscriber> ptr;
+    LocalDLList<Subscriber> list(c_subscriberPool, subPtr.p->m_subscribers);
+    for(list.first(ptr); !ptr.isNull(); list.next(ptr))
+    {
+      jam();
+      data->senderData= ptr.p->m_senderData;
+      sendSignal(ptr.p->m_senderRef, GSN_SUB_TABLE_DATA, signal,
 		 SubTableData::SignalLength, JBB);
-      DBUG_PRINT("info",("sent to subscriber %d", subbPtr.i));
     }
   }
-  DBUG_VOID_RETURN;
 }
 
 void
@@ -3655,21 +3596,20 @@
   Uint32 tableId= conf->tableId;
 
   TablePtr tabPtr;
-  if (!c_tables.find(tabPtr, tableId) ||
-      tabPtr.p->m_state == Table::DROPPED ||
-      tabPtr.p->m_state == Table::ALTERED)
+  if (!c_tables.find(tabPtr, tableId))
   {
-    DBUG_VOID_RETURN;
+    jam();
+    return;
   }
 
-  DBUG_PRINT("info",("alter table id: %d[i=%u]", tableId, tabPtr.i));
-
   tabPtr.p->m_state = Table::ALTERED;
+  c_tables.remove(tabPtr);
   // triggers must be removed, waiting for sub stop req for that
 
   if (senderRef == 0)
   {
-    DBUG_VOID_RETURN;
+    jam();
+    return;
   }
   // dict coordinator sends info to API
   
@@ -3679,31 +3619,30 @@
   data->operation      = NdbDictionary::Event::_TE_ALTER;
   data->req_nodeid     = refToNode(senderRef);
  
+  Ptr<Subscription> subPtr;
+  LocalDLList<Subscription> subList(c_subscriptionPool,
+                                    tabPtr.p->m_subscriptions);
+
+  for (subList.first(subPtr); !subPtr.isNull(); subList.next(subPtr))
   {
-    LocalDLList<Subscriber> subbs(c_subscriberPool,tabPtr.p->c_subscribers);
-    SubscriberPtr subbPtr;
-    for(subbs.first(subbPtr);!subbPtr.isNull();subbs.next(subbPtr))
+    if(subPtr.p->m_subscriptionType != SubCreateReq::TableEvent)
     {
       jam();
-      /*
-       * get subscription ptr for this subscriber
-       */
-      SubscriptionPtr subPtr;
-      c_subscriptions.getPtr(subPtr, subbPtr.p->m_subPtrI);
-      if(subPtr.p->m_subscriptionType != SubCreateReq::TableEvent) {
-	jam();
-	continue;
-	//continue in for-loop if the table is not part of 
-	//the subscription. Otherwise, send data to subscriber.
-      }
+      continue;
+      //continue in for-loop if the table is not part of
+      //the subscription. Otherwise, send data to subscriber.
+    }
 
-      data->senderData= subbPtr.p->m_senderData;
-      sendSignal(subbPtr.p->m_senderRef, GSN_SUB_TABLE_DATA, signal,
+    Ptr<Subscriber> ptr;
+    LocalDLList<Subscriber> list(c_subscriberPool, subPtr.p->m_subscribers);
+    for(list.first(ptr); !ptr.isNull(); list.next(ptr))
+    {
+      jam();
+      data->senderData= ptr.p->m_senderData;
+      sendSignal(ptr.p->m_senderRef, GSN_SUB_TABLE_DATA, signal,
 		 SubTableData::SignalLength, JBB);
-      DBUG_PRINT("info",("sent to subscriber %d", subbPtr.i));
     }
   }
-  DBUG_VOID_RETURN;
 }
 
 void
@@ -3736,6 +3675,13 @@
   // Ack from User and not an ack from other SUMA, redistribute in nodegroup
   
   Uint32 nodeId = refToNode(senderRef);
+  if (ERROR_INSERTED(13023))
+  {
+    ndbout_c("Throwing SUB_GCP_COMPLETE_ACK gci: %u from %u",
+             gci, nodeId);
+    return;
+  }
+
   
   jam();
   Ptr<Gcp_record> gcp;
@@ -3744,6 +3690,7 @@
     if(gcp.p->m_gci == gci)
     {
       gcp.p->m_subscribers.clear(nodeId);
+      gcp.p->m_subscribers.bitAND(c_subscriber_nodes);
       if(!gcp.p->m_subscribers.isclear())
       {
 	jam();
@@ -3755,8 +3702,8 @@
   
   if(gcp.isNull())
   {
-    ndbout_c("ACK wo/ gcp record (gci: %d) ref: %.8x from: %.8x", 
-	     gci, senderRef, signal->getSendersBlockRef());
+    g_eventLogger.warning("ACK wo/ gcp record (gci: %d) ref: %.8x from: %.8x",
+                          gci, senderRef, signal->getSendersBlockRef());
   }
   else
   {
@@ -3788,7 +3735,6 @@
 {
   jamEntry();
   DBUG_ENTER("Suma::execSUB_REMOVE_REQ");
-  ndbassert(signal->getNoOfSections() == 0);
 
   CRASH_INSERTION(13021);
 
@@ -3798,54 +3744,43 @@
   key.m_subscriptionId  = req.subscriptionId;
   key.m_subscriptionKey = req.subscriptionKey;
 
-  DBUG_PRINT("enter",("key.m_subscriptionId: %u, key.m_subscriptionKey: %u",
-		      key.m_subscriptionId, key.m_subscriptionKey));
-
-  if(!c_subscriptions.find(subPtr, key))
+  if (c_startup.m_restart_server_node_id == RNIL)
   {
     jam();
-    DBUG_PRINT("info",("Not found"));
-    sendSubRemoveRef(signal, req, 1407);
-    DBUG_VOID_RETURN;
-  }
-  if (subPtr.p->m_state == Subscription::LOCKED)
-  {
-    /**
-     * we are currently setting up triggers etc. for this event
-     */
-    jam();
-    sendSubRemoveRef(signal, req, 1413);
-    DBUG_VOID_RETURN;
-  }
-  if (subPtr.p->m_state == Subscription::DROPPED)
-  {
+
     /**
-     * already dropped
+     * We havent started syncing yet
      */
-    jam();
-    sendSubRemoveRef(signal, req, 1419);
-    DBUG_VOID_RETURN;
+    sendSubRemoveRef(signal,  req, SubRemoveRef::NF_FakeErrorREF);
+    return;
   }
 
-  ndbrequire(subPtr.p->m_state == Subscription::DEFINED);
-  DBUG_PRINT("info",("n_subscribers: %u", subPtr.p->n_subscribers));
+  bool found = c_subscriptions.find(subPtr, key);
 
-  if (subPtr.p->n_subscribers == 0)
+  if(!found)
   {
-    // no subscribers on the subscription
-    // remove it
     jam();
-    completeSubRemove(subPtr);
+    sendSubRemoveRef(signal, req, SubRemoveRef::NoSuchSubscription);
+    return;
   }
-  else
-  {
-    // subscribers left on the subscription
-    // mark it to be removed once all subscribers
-    // are removed
+
+  switch(subPtr.p->m_state){
+  case Subscription::DEFINING:
+    ndbrequire(false);
+  case Subscription::DROPPED:
+    /**
+     * already dropped
+     */
     jam();
-    subPtr.p->m_state = Subscription::DROPPED;
+    sendSubRemoveRef(signal, req, SubRemoveRef::AlreadyDropped);
+    return;
+  case Subscription::DEFINED:
+    break;
   }
 
+  subPtr.p->m_state = Subscription::DROPPED;
+  check_release_subscription(signal, subPtr);
+
   SubRemoveConf * const conf = (SubRemoveConf*)signal->getDataPtrSend();
   conf->senderRef            = reference();
   conf->senderData           = req.senderData;
@@ -3853,73 +3788,111 @@
   conf->subscriptionKey      = req.subscriptionKey;
 
   sendSignal(req.senderRef, GSN_SUB_REMOVE_CONF, signal,
-	     SubRemoveConf::SignalLength, JBB);
-
-  DBUG_VOID_RETURN;
+             SubRemoveConf::SignalLength, JBB);
+  return;
 }
 
 void
-Suma::completeSubRemove(SubscriptionPtr subPtr)
+Suma::check_release_subscription(Signal* signal, Ptr<Subscription> subPtr)
 {
-  DBUG_ENTER("Suma::completeSubRemove");
-  Uint32 subscriptionId  = subPtr.p->m_subscriptionId;
-  Uint32 subscriptionKey = subPtr.p->m_subscriptionKey;
+  if (!subPtr.p->m_subscribers.isEmpty())
+  {
+    jam();
+    return;
+  }
 
-  c_subscriptions.release(subPtr);
-  DBUG_PRINT("info",("c_subscriptionPool  size: %d free: %d",
-		     c_subscriptionPool.getSize(),
-		     c_subscriptionPool.getNoOfFree()));
+  if (!subPtr.p->m_start_req.isEmpty())
+  {
+    jam();
+    return;
+  }
 
-  /**
-   * I was the last subscription to be remove so clear c_tables
-   */
-#if 0
-  ndbout_c("c_subscriptionPool.getSize() %d c_subscriptionPool.getNoOfFree()%d",
-	   c_subscriptionPool.getSize(),c_subscriptionPool.getNoOfFree());
-#endif
+  if (!subPtr.p->m_stop_req.isEmpty())
+  {
+    jam();
+    return;
+  }
 
-  if(c_subscriptionPool.getSize() == c_subscriptionPool.getNoOfFree()) {
+  switch(subPtr.p->m_trigger_state){
+  case Subscription::T_UNDEFINED:
     jam();
-#if 0
-    ndbout_c("SUB_REMOVE_REQ:Clearing c_tables");
-#endif
-    int count= 0;
-    KeyTable<Table>::Iterator it;
-    for(c_tables.first(it); !it.isNull(); )
-    {
-      // ndbrequire(false);
-      
-      DBUG_PRINT("error",("trailing table id: %d[i=%d] n_subscribers: %d m_state: %d",
-			  it.curr.p->m_tableId,
-			  it.curr.p->m_ptrI,
-			  it.curr.p->n_subscribers,
-			  it.curr.p->m_state));
+    goto do_release;
+  case Subscription::T_CREATING:
+    jam();
+    /**
+     * Wait for completion
+     */
+    return;
+  case Subscription::T_DEFINED:
+    jam();
+    subPtr.p->m_trigger_state = Subscription::T_DROPPING;
+    drop_triggers(signal, subPtr);
+    return;
+  case Subscription::T_DROPPING:
+    jam();
+    /**
+     * Wait for completion
+     */
+    return;
+  case Subscription::T_ERROR:
+    jam();
+    /**
+     * Wait for completion
+     */
+    return;
+  }
+  ndbrequire(false);
 
-      LocalDLList<Subscriber> subbs(c_subscriberPool,it.curr.p->c_subscribers);
-      SubscriberPtr subbPtr;
-      for(subbs.first(subbPtr);!subbPtr.isNull();subbs.next(subbPtr))
-      {
-	DBUG_PRINT("error",("subscriber %d, m_subPtrI: %d", subbPtr.i,
subbPtr.p->m_subPtrI));
-      }
+do_release:
+  TablePtr tabPtr;
+  c_tables.getPtr(tabPtr, subPtr.p->m_table_ptrI);
 
-      it.curr.p->release(* this);
-      TablePtr tabPtr = it.curr;
-      c_tables.next(it);
+  if (tabPtr.p->m_state == Table::DROPPED)
+  {
+    jam();
+    subPtr.p->m_state = Subscription::DROPPED;
+  }
+
+  if (subPtr.p->m_state != Subscription::DROPPED)
+  {
+    jam();
+    return;
+  }
+
+  {
+    LocalDLList<Subscription> list(c_subscriptionPool,
+                                   tabPtr.p->m_subscriptions);
+    list.remove(subPtr);
+  }
+
+  if (tabPtr.p->m_subscriptions.isEmpty())
+  {
+    jam();
+    switch(tabPtr.p->m_state){
+    case Table::UNDEFINED:
+      ndbrequire(false);
+    case Table::DEFINING:
+      break;
+    case Table::DEFINED:
+      jam();
       c_tables.remove(tabPtr);
+      // Fall through
+    case Table::DROPPED:
+      jam();
+    case Table::ALTERED:
+      jam();
+      tabPtr.p->release(* this);
       c_tablePool.release(tabPtr);
-      DBUG_PRINT("info",("c_tablePool  size: %d free: %d",
-			 c_tablePool.getSize(),
-			 c_tablePool.getNoOfFree()));
-      count++;
-    }
-    DBUG_ASSERT(count == 0);
+    };
   }
-  DBUG_VOID_RETURN;
+
+  c_subscriptions.release(subPtr);
 }
 
 void
-Suma::sendSubRemoveRef(Signal* signal, const SubRemoveReq& req,
-				  Uint32 errCode)
+Suma::sendSubRemoveRef(Signal* signal,
+                       const SubRemoveReq& req,
+                       Uint32 errCode)
 {
   jam();
   DBUG_ENTER("Suma::sendSubRemoveRef");
@@ -3946,10 +3919,6 @@
   fragBuf.release();
 
   m_state = UNDEFINED;
-#ifndef DBUG_OFF
-  if (n_subscribers != 0)
-    abort();
-#endif
 }
 
 void
@@ -3973,400 +3942,290 @@
  */
 
 void
-Suma::execSUMA_START_ME_REQ(Signal* signal) {
-  jamEntry();
-  DBUG_ENTER("Suma::execSUMA_START_ME");
-  ndbassert(signal->getNoOfSections() == 0);
-  Restart.runSUMA_START_ME_REQ(signal, signal->getSendersBlockRef());
-  DBUG_VOID_RETURN;
-}
-
-void 
-Suma::execSUB_CREATE_REF(Signal* signal) {
-  jamEntry();
-  DBUG_ENTER("Suma::execSUB_CREATE_REF");
-  ndbassert(signal->getNoOfSections() == 0);
-  SubCreateRef *const ref= (SubCreateRef *)signal->getDataPtr();
-  Uint32 error= ref->errorCode;
-  if (error != 1415)
-  {
-    /*
-     * This will happen if an api node connects during while other node
-     * is restarting, and in this case the subscription will already
-     * have been created.
-     * ToDo: more complete handling of api nodes joining during
-     * node restart
-     */
-    Uint32 senderRef = signal->getSendersBlockRef();
-    BlockReference cntrRef = calcNdbCntrBlockRef(refToNode(senderRef));
-    // for some reason we did not manage to create a subscription
-    // on the starting node
-    SystemError * const sysErr = (SystemError*)&signal->theData[0];
-    sysErr->errorCode = SystemError::CopySubscriptionRef;
-    sysErr->errorRef = reference();
-    sysErr->data1 = error;
-    sysErr->data2 = 0;
-    sendSignal(cntrRef, GSN_SYSTEM_ERROR, signal,
-               SystemError::SignalLength, JBB);
-    Restart.resetRestart(signal);
-    DBUG_VOID_RETURN;
-  }
-  // SubCreateConf has same signaldata as SubCreateRef
-  Restart.runSUB_CREATE_CONF(signal);
-  DBUG_VOID_RETURN;
-}
-
-void 
-Suma::execSUB_CREATE_CONF(Signal* signal)
+Suma::execSUMA_START_ME_REQ(Signal* signal)
 {
   jamEntry();
-  DBUG_ENTER("Suma::execSUB_CREATE_CONF");
-  ndbassert(signal->getNoOfSections() == 0);
-  Restart.runSUB_CREATE_CONF(signal);
-  DBUG_VOID_RETURN;
-}
-
-void 
-Suma::execSUB_START_CONF(Signal* signal)
-{
-  jamEntry();
-  DBUG_ENTER("Suma::execSUB_START_CONF");
-  ndbassert(signal->getNoOfSections() == 0);
-  Restart.runSUB_START_CONF(signal);
-  DBUG_VOID_RETURN;
-}
 
-void
-Suma::execSUB_START_REF(Signal* signal) {
-  jamEntry();
-  DBUG_ENTER("Suma::execSUB_START_REF");
-  ndbassert(signal->getNoOfSections() == 0);
-  SubStartRef *const ref= (SubStartRef *)signal->getDataPtr();
-  Uint32 error= ref->errorCode;
+  Uint32 retref = signal->getSendersBlockRef();
+  if (c_restart.m_ref)
   {
-    Uint32 senderRef = signal->getSendersBlockRef();
-    BlockReference cntrRef = calcNdbCntrBlockRef(refToNode(senderRef));
-    // for some reason we did not manage to start a subscriber
-    // on the starting node
-    SystemError * const sysErr = (SystemError*)&signal->theData[0];
-    sysErr->errorCode = SystemError::CopySubscriberRef;
-    sysErr->errorRef = reference();
-    sysErr->data1 = error;
-    sysErr->data2 = 0;
-    sendSignal(cntrRef, GSN_SYSTEM_ERROR, signal,
-               SystemError::SignalLength, JBB);
-    Restart.resetRestart(signal);
+    jam();
+    SumaStartMeRef* ref= (SumaStartMeRef*)signal->getDataPtrSend();
+    ref->errorCode = SumaStartMeRef::Busy;
+    sendSignal(retref, GSN_SUMA_START_ME_REF, signal,
+               SumaStartMeRef::SignalLength, JBB);
+    return;
   }
-  DBUG_VOID_RETURN;
-}
-
-Suma::Restart::Restart(Suma& s) : suma(s)
-{
-  nodeId = 0;
-}
 
-void
-Suma::Restart::runSUMA_START_ME_REQ(Signal* signal, Uint32 sumaRef)
-{
-  jam();
-  DBUG_ENTER("Suma::Restart::runSUMA_START_ME");
-
-  if(nodeId != 0)
+  Ptr<SubOpRecord> subOpPtr;
+  if (c_subOpPool.seize(subOpPtr) == false)
   {
+    jam();
     SumaStartMeRef* ref= (SumaStartMeRef*)signal->getDataPtrSend();
     ref->errorCode = SumaStartMeRef::Busy;
-    suma.sendSignal(sumaRef, GSN_SUMA_START_ME_REF, signal,
-		    SumaStartMeRef::SignalLength, JBB);
+    sendSignal(retref, GSN_SUMA_START_ME_REF, signal,
+               SumaStartMeRef::SignalLength, JBB);
     return;
   }
 
-  nodeId = refToNode(sumaRef);
-  startNode(signal, sumaRef);
-
-  DBUG_VOID_RETURN;
-}
-
-void
-Suma::Restart::startNode(Signal* signal, Uint32 sumaRef)
-{
-  jam();
-  DBUG_ENTER("Suma::Restart::startNode");
-  
-  // right now we can only handle restarting one node
-  // at a time in a node group
-  
-  createSubscription(signal, sumaRef);
-  DBUG_VOID_RETURN;
-}
+  subOpPtr.p->m_opType = SubOpRecord::R_START_ME_REQ;
 
-void 
-Suma::Restart::createSubscription(Signal* signal, Uint32 sumaRef)
-{
-  jam();
-  DBUG_ENTER("Suma::Restart::createSubscription");
-  suma.c_subscriptions.first(c_subIt);
-  nextSubscription(signal, sumaRef);
-  DBUG_VOID_RETURN;
-}
+  c_restart.m_abort = 0;
+  c_restart.m_waiting_on_self = 0;
+  c_restart.m_ref = retref;
+  c_restart.m_max_seq = c_current_seq;
+  c_restart.m_subOpPtrI = subOpPtr.i;
 
-void 
-Suma::Restart::nextSubscription(Signal* signal, Uint32 sumaRef)
-{
-  jam();
-  DBUG_ENTER("Suma::Restart::nextSubscription");
-
-  if (c_subIt.isNull())
+  DLHashTable<Subscription>::Iterator it;
+  if (c_subscriptions.first(it))
   {
     jam();
-    completeSubscription(signal, sumaRef);
-    DBUG_VOID_RETURN;
-  }
-  SubscriptionPtr subPtr;
-  subPtr.i = c_subIt.curr.i;
-  subPtr.p = suma.c_subscriptions.getPtr(subPtr.i);
 
-  suma.c_subscriptions.next(c_subIt);
-
-  SubCreateReq * req = (SubCreateReq *)signal->getDataPtrSend();
-      
-  req->senderRef        = suma.reference();
-  req->senderData       = subPtr.i;
-  req->subscriptionId   = subPtr.p->m_subscriptionId;
-  req->subscriptionKey  = subPtr.p->m_subscriptionKey;
-  req->subscriptionType = subPtr.p->m_subscriptionType |
-    SubCreateReq::RestartFlag;
-
-  switch (subPtr.p->m_subscriptionType) {
-  case SubCreateReq::TableEvent:
-    jam();
-    req->tableId = subPtr.p->m_tableId;
-    req->state = subPtr.p->m_state;
-    suma.sendSignal(sumaRef, GSN_SUB_CREATE_REQ, signal,
-		    SubCreateReq::SignalLength2, JBB);
-    DBUG_VOID_RETURN;
-  case SubCreateReq::SingleTableScan:
-    jam();
-    nextSubscription(signal, sumaRef);
-    DBUG_VOID_RETURN;
-  case SubCreateReq::SelectiveTableSnapshot:
-  case SubCreateReq::DatabaseSnapshot:
-    ndbrequire(false);
+    /**
+     * We only need to handle subscriptions with seq <= c_current_seq
+     *   all subscriptions(s) created after this, will be handled by
+     *   starting suma directly
+     */
+    c_current_seq++;
   }
-  ndbrequire(false);
+
+  copySubscription(signal, it);
 }
 
 void
-Suma::Restart::runSUB_CREATE_CONF(Signal* signal)
+Suma::copySubscription(Signal* signal, DLHashTable<Subscription>::Iterator it)
 {
   jam();
-  DBUG_ENTER("Suma::Restart::runSUB_CREATE_CONF");
 
-  const Uint32 senderRef = signal->senderBlockRef();
-  Uint32 sumaRef = signal->getSendersBlockRef();
+  Ptr<SubOpRecord> subOpPtr;
+  c_subOpPool.getPtr(subOpPtr, c_restart.m_subOpPtrI);
 
-  SubCreateConf * const conf = (SubCreateConf *)signal->getDataPtr();
+  Ptr<Subscription> subPtr = it.curr;
+  if (!subPtr.isNull())
+  {
+    jam();
+    c_restart.m_subPtrI = subPtr.i;
+    c_restart.m_bucket = it.bucket;
 
-  SubscriptionPtr subPtr;
-  suma.c_subscriptions.getPtr(subPtr,conf->senderData);
 
-  switch(subPtr.p->m_subscriptionType) {
-  case SubCreateReq::TableEvent:
-    if (1)
+    LocalDLFifoList<SubOpRecord> list(c_subOpPool, subPtr.p->m_stop_req);
+    bool empty = list.isEmpty();
+    list.add(subOpPtr);
+
+    if (!empty)
     {
+      /**
+       * Wait for lock
+       */
       jam();
-      nextSubscription(signal, sumaRef);
-    } else {
-      jam();
-      SubCreateReq * req = (SubCreateReq *)signal->getDataPtrSend();
-      
-      req->senderRef        = suma.reference();
-      req->senderData       = subPtr.i;
-      req->subscriptionId   = subPtr.p->m_subscriptionId;
-      req->subscriptionKey  = subPtr.p->m_subscriptionKey;
-      req->subscriptionType = subPtr.p->m_subscriptionType |
-	SubCreateReq::RestartFlag |
-	SubCreateReq::AddTableFlag;
+      c_restart.m_waiting_on_self = 1;
+      return;
+    }
 
-      req->tableId = 0;
+    sendSubCreateReq(signal, subPtr);
+  }
+  else
+  {
+    jam();
+    SumaStartMeConf* conf = (SumaStartMeConf*)signal->getDataPtrSend();
+    conf->unused = 0;
+    sendSignal(c_restart.m_ref, GSN_SUMA_START_ME_CONF, signal,
+               SumaStartMeConf::SignalLength, JBB);
 
-      suma.sendSignal(senderRef, GSN_SUB_CREATE_REQ, signal,
-		      SubCreateReq::SignalLength, JBB);
-    }
-    DBUG_VOID_RETURN;
-  case SubCreateReq::SingleTableScan:
-  case SubCreateReq::SelectiveTableSnapshot:
-  case SubCreateReq::DatabaseSnapshot:
-    ndbrequire(false);
+    c_subOpPool.release(subOpPtr);
+    c_restart.m_ref = 0;
+    return;
   }
-  ndbrequire(false);
 }
 
-void 
-Suma::Restart::completeSubscription(Signal* signal, Uint32 sumaRef)
+void
+Suma::sendSubCreateReq(Signal* signal, Ptr<Subscription> subPtr)
 {
   jam();
-  DBUG_ENTER("Suma::Restart::completeSubscription");
-  startSubscriber(signal, sumaRef);
-  DBUG_VOID_RETURN;
-}
 
-void 
-Suma::Restart::startSubscriber(Signal* signal, Uint32 sumaRef)
-{
-  jam();
-  DBUG_ENTER("Suma::Restart::startSubscriber");
-  suma.c_tables.first(c_tabIt);
-  if (c_tabIt.isNull())
+  if (c_restart.m_abort)
   {
-    completeSubscriber(signal, sumaRef);
-    DBUG_VOID_RETURN;
+    jam();
+    abort_start_me(signal, subPtr, true);
+    return;
   }
-  SubscriberPtr subbPtr;
+
+  if (subPtr.p->m_state != Subscription::DROPPED)
   {
-    LocalDLList<Subscriber>
-      subbs(suma.c_subscriberPool,c_tabIt.curr.p->c_subscribers);
-    subbs.first(subbPtr);
+    jam();
+    c_restart.m_waiting_on_self = 0;
+    SubCreateReq * req = (SubCreateReq *)signal->getDataPtrSend();
+    req->senderRef        = reference();
+    req->senderData       = subPtr.i;
+    req->subscriptionId   = subPtr.p->m_subscriptionId;
+    req->subscriptionKey  = subPtr.p->m_subscriptionKey;
+    req->subscriptionType = subPtr.p->m_subscriptionType;
+    req->tableId          = subPtr.p->m_tableId;
+    sendSignal(c_restart.m_ref, GSN_SUB_CREATE_REQ, signal,
+               SubCreateReq::SignalLength, JBB);
+  }
+  else
+  {
+    /**
+     * No need to copy DROPPED subscription...
+     *   but this introduces a real time break
+     */
+    c_restart.m_waiting_on_self = 1;
+    SubCreateConf * conf = (SubCreateConf *)signal->getDataPtrSend();
+    conf->senderRef        = reference();
+    conf->senderData       = subPtr.i;
+    sendSignal(reference(), GSN_SUB_CREATE_CONF, signal,
+               SubCreateConf::SignalLength, JBB);
   }
-  nextSubscriber(signal, sumaRef, subbPtr);
-  DBUG_VOID_RETURN;
 }
 
 void 
-Suma::Restart::nextSubscriber(Signal* signal, Uint32 sumaRef,
-			      SubscriberPtr subbPtr)
+Suma::execSUB_CREATE_REF(Signal* signal)
 {
-  jam();
-  DBUG_ENTER("Suma::Restart::nextSubscriber");
-  while (subbPtr.isNull())
-  {
-    jam();
-    DBUG_PRINT("info",("prev tableId %u",c_tabIt.curr.p->m_tableId));
-    suma.c_tables.next(c_tabIt);
-    if (c_tabIt.isNull())
-    {
-      completeSubscriber(signal, sumaRef);
-      DBUG_VOID_RETURN;
-    }
-    DBUG_PRINT("info",("next tableId %u",c_tabIt.curr.p->m_tableId));
+  jamEntry();
 
-    LocalDLList<Subscriber>
-      subbs(suma.c_subscriberPool,c_tabIt.curr.p->c_subscribers);
-    subbs.first(subbPtr);
+  SubCreateRef *const ref= (SubCreateRef *)signal->getDataPtr();
+  Uint32 error= ref->errorCode;
+
+  {
+    SumaStartMeRef* ref= (SumaStartMeRef*)signal->getDataPtrSend();
+    ref->errorCode = error;
+    sendSignal(c_restart.m_ref, GSN_SUMA_START_ME_REF, signal,
+               SumaStartMeRef::SignalLength, JBB);
   }
 
-  /*
-   * get subscription ptr for this subscriber
+  Ptr<Subscription> subPtr;
+  c_subscriptionPool.getPtr(subPtr, c_restart.m_subPtrI);
+  abort_start_me(signal, subPtr, true);
+}
+
+void 
+Suma::execSUB_CREATE_CONF(Signal* signal)
+{
+  jamEntry();
+
+  /**
+   * We have lock...start all subscriber(s)
    */
+  Ptr<Subscription> subPtr;
+  c_subscriptionPool.getPtr(subPtr, c_restart.m_subPtrI);
 
-  SubscriptionPtr subPtr;
-  suma.c_subscriptions.getPtr(subPtr, subbPtr.p->m_subPtrI);
-  switch (subPtr.p->m_subscriptionType) {
-  case SubCreateReq::TableEvent:
+  c_restart.m_waiting_on_self = 0;
+
+  /**
+   * Check if we were aborted...
+   *  this signal is sent to self in case of DROPPED subscription...
+   */
+  if (c_restart.m_abort)
+  {
     jam();
-    sendSubStartReq(subPtr, subbPtr, signal, sumaRef);
-    DBUG_VOID_RETURN;
-  case SubCreateReq::SelectiveTableSnapshot:
-  case SubCreateReq::DatabaseSnapshot:
-  case SubCreateReq::SingleTableScan:
-    ndbrequire(false);
+    abort_start_me(signal, subPtr, true);
+    return;
   }
-  ndbrequire(false);
+
+  Ptr<Subscriber> ptr;
+  if (subPtr.p->m_state != Subscription::DROPPED)
+  {
+    LocalDLList<Subscriber> list(c_subscriberPool, subPtr.p->m_subscribers);
+    list.first(ptr);
+  }
+  else
+  {
+    ptr.setNull();
+  }
+
+  copySubscriber(signal, subPtr, ptr);
 }
 
 void
-Suma::Restart::sendSubStartReq(SubscriptionPtr subPtr, SubscriberPtr subbPtr,
-			       Signal* signal, Uint32 sumaRef)
+Suma::copySubscriber(Signal* signal,
+                     Ptr<Subscription> subPtr,
+                     Ptr<Subscriber> ptr)
 {
-  jam();
-  DBUG_ENTER("Suma::Restart::sendSubStartReq");
-  SubStartReq * req = (SubStartReq *)signal->getDataPtrSend();
+  if (!ptr.isNull())
+  {
+    jam();
 
-  req->senderRef        = suma.reference();
-  req->senderData       = subbPtr.i;
-  req->subscriptionId   = subPtr.p->m_subscriptionId;
-  req->subscriptionKey  = subPtr.p->m_subscriptionKey;
-  req->part             = SubscriptionData::TableData;
-  req->subscriberData   = subbPtr.p->m_senderData;
-  req->subscriberRef    = subbPtr.p->m_senderRef;
-
-  // restarting suma will not respond to this until startphase 5
-  // since it is not until then data copying has been completed
-  DBUG_PRINT("info",("Restarting subscriber: %u on key: [%u,%u]",
-		     subbPtr.i,
-		     subPtr.p->m_subscriptionId,
-		     subPtr.p->m_subscriptionKey,
-		     subPtr.p->m_tableId));
+    SubStartReq* req = (SubStartReq*)signal->getDataPtrSend();
+    req->senderRef        = reference();
+    req->senderData       = ptr.i;
+    req->subscriptionId   = subPtr.p->m_subscriptionId;
+    req->subscriptionKey  = subPtr.p->m_subscriptionKey;
+    req->part             = SubscriptionData::TableData;
+    req->subscriberData   = ptr.p->m_senderData;
+    req->subscriberRef    = ptr.p->m_senderRef;
 
-  suma.sendSignal(sumaRef, GSN_SUB_START_REQ,
-		  signal, SubStartReq::SignalLength2, JBB);
-  DBUG_VOID_RETURN;
+    sendSignal(c_restart.m_ref, GSN_SUB_START_REQ,
+               signal, SubStartReq::SignalLength, JBB);
+    return;
+  }
+  else
+  {
+    // remove lock from this subscription
+    Ptr<SubOpRecord> subOpPtr;
+    c_subOpPool.getPtr(subOpPtr, c_restart.m_subOpPtrI);
+    check_remove_queue(signal, subPtr, subOpPtr, true, false);
+    check_release_subscription(signal, subPtr);
+
+    DLHashTable<Subscription>::Iterator it;
+    it.curr = subPtr;
+    it.bucket = c_restart.m_bucket;
+    c_subscriptions.next(it);
+    copySubscription(signal, it);
+  }
 }
 
 void 
-Suma::Restart::runSUB_START_CONF(Signal* signal)
+Suma::execSUB_START_CONF(Signal* signal)
 {
-  jam();
-  DBUG_ENTER("Suma::Restart::runSUB_START_CONF");
+  jamEntry();
 
   SubStartConf * const conf = (SubStartConf*)signal->getDataPtr();
 
-  Subscription key;
-  SubscriptionPtr subPtr;
-  key.m_subscriptionId  = conf->subscriptionId;
-  key.m_subscriptionKey = conf->subscriptionKey;
-  ndbrequire(suma.c_subscriptions.find(subPtr, key));
+  Ptr<Subscription> subPtr;
+  c_subscriptionPool.getPtr(subPtr, c_restart.m_subPtrI);
 
-  TablePtr tabPtr;
-  ndbrequire(suma.c_tables.find(tabPtr, subPtr.p->m_tableId));
+  Ptr<Subscriber> ptr;
+  c_subscriberPool.getPtr(ptr, conf->senderData);
+
+  LocalDLList<Subscriber> list(c_subscriberPool, subPtr.p->m_subscribers);
+  list.next(ptr);
+  copySubscriber(signal, subPtr, ptr);
+}
+
+void
+Suma::execSUB_START_REF(Signal* signal)
+{
+  jamEntry();
+
+  SubStartRef * sig = (SubStartRef*)signal->getDataPtr();
+  Uint32 errorCode = sig->errorCode;
 
-  SubscriberPtr subbPtr;
   {
-    LocalDLList<Subscriber>
-      subbs(suma.c_subscriberPool,tabPtr.p->c_subscribers);
-    subbs.getPtr(subbPtr, conf->senderData);
-    DBUG_PRINT("info",("Restarted subscriber: %u on key: [%u,%u] table: %u",
-		       subbPtr.i,key.m_subscriptionId,key.m_subscriptionKey,
-		       subPtr.p->m_tableId));
-    subbs.next(subbPtr);
+    SumaStartMeRef* ref= (SumaStartMeRef*)signal->getDataPtrSend();
+    ref->errorCode = errorCode;
+    sendSignal(c_restart.m_ref, GSN_SUMA_START_ME_REF, signal,
+               SumaStartMeRef::SignalLength, JBB);
   }
 
-  Uint32 sumaRef = signal->getSendersBlockRef();
-  nextSubscriber(signal, sumaRef, subbPtr);
+  Ptr<Subscription> subPtr;
+  c_subscriptionPool.getPtr(subPtr, c_restart.m_subPtrI);
 
-  DBUG_VOID_RETURN;
-}
-
-void 
-Suma::Restart::completeSubscriber(Signal* signal, Uint32 sumaRef)
-{
-  DBUG_ENTER("Suma::Restart::completeSubscriber");
-  completeRestartingNode(signal, sumaRef);
-  DBUG_VOID_RETURN;
+  abort_start_me(signal, subPtr, true);
 }
 
 void
-Suma::Restart::completeRestartingNode(Signal* signal, Uint32 sumaRef)
+Suma::abort_start_me(Signal* signal, Ptr<Subscription> subPtr,
+                     bool lockowner)
 {
-  jam();
-  DBUG_ENTER("Suma::Restart::completeRestartingNode");
-  //SumaStartMeConf *conf= (SumaStartMeConf*)signal->getDataPtrSend();
-  suma.sendSignal(sumaRef, GSN_SUMA_START_ME_CONF, signal,
-		  SumaStartMeConf::SignalLength, JBB);
-  resetRestart(signal);
-  DBUG_VOID_RETURN;
-}
+  Ptr<SubOpRecord> subOpPtr;
+  c_subOpPool.getPtr(subOpPtr, c_restart.m_subOpPtrI);
+  check_remove_queue(signal, subPtr, subOpPtr, lockowner, true);
+  check_release_subscription(signal, subPtr);
 
-void
-Suma::Restart::resetRestart(Signal* signal)
-{
-  jam();
-  DBUG_ENTER("Suma::Restart::resetRestart");
-  nodeId = 0;
-  DBUG_VOID_RETURN;
+  c_restart.m_ref = 0;
 }
 
-// only run on restarting suma
-
 void
 Suma::execSUMA_HANDOVER_REQ(Signal* signal)
 {
@@ -4815,7 +4674,6 @@
   {
     free_page(tail, page);
     tail = bucket->m_buffer_tail = next_page;
-    ndbout_c("pos==0 && min_gci(%d) > max_gci(%d) resend switching page to
%d", min_gci, max_gci, tail);
     goto next;
   }
   
@@ -4864,7 +4722,10 @@
   
       char buf[255];
       c_subscriber_nodes.getText(buf);
-      ndbout_c("resending GCI: %d rows: %d -> %s", last_gci, g_cnt, buf);
+      if (g_cnt)
+      {
+        ndbout_c("resending GCI: %d rows: %d -> %s", last_gci, g_cnt, buf);
+      }
       g_cnt = 0;
       
       NodeReceiverGroup rg(API_CLUSTERMGR, c_subscriber_nodes);
@@ -4874,7 +4735,7 @@
     else
     {
       g_cnt++;
-      Uint32 table = * src++ ;
+      Uint32 subPtrI = * src++ ;
       Uint32 schemaversion = * src ++;
       Uint32 event = * src >> 16;
       Uint32 sz_1 = (* src ++) & 0xFFFF;
@@ -4889,9 +4750,12 @@
       /**
        * Signal to subscriber(s)
        */
+      Ptr<Subscription> subPtr;
+      c_subscriptionPool.getPtr(subPtr, subPtrI);
       Ptr<Table> tabPtr;
-      if (c_tables.find(tabPtr, table) && 
-	  tabPtr.p->m_schemaVersion == schemaversion)
+      c_tablePool.getPtr(tabPtr, subPtr.p->m_table_ptrI);
+      Uint32 table = subPtr.p->m_tableId;
+      if (tabPtr.p->m_schemaVersion == schemaversion)
       {
 	SubTableData * data = (SubTableData*)signal->getDataPtrSend();//trg;
 	data->gci            = last_gci;
@@ -4901,12 +4765,10 @@
 	
 	{
 	  LocalDLList<Subscriber> list(c_subscriberPool,
-				       tabPtr.p->c_subscribers);
+				       subPtr.p->m_subscribers);
 	  SubscriberPtr subbPtr;
 	  for(list.first(subbPtr); !subbPtr.isNull(); list.next(subbPtr))
 	  {
-	    DBUG_PRINT("info",("GSN_SUB_TABLE_DATA to node %d",
-			       refToNode(subbPtr.p->m_senderRef)));
 	    data->senderData = subbPtr.p->m_senderData;
 	    sendSignal(subbPtr.p->m_senderRef, GSN_SUB_TABLE_DATA, signal,
 		       SubTableData::SignalLength, JBB, ptr, nptr);
@@ -4927,7 +4789,6 @@
     tail = bucket->m_buffer_tail = next_page;
     pos = 0;
     last_gci = 0;
-    ndbout_c("ptr == end -> resend switching page to %d", tail);
   }
   else
   {

--- 1.10/storage/ndb/src/kernel/blocks/suma/Suma.hpp	2007-12-14 13:48:09 +01:00
+++ 1.11/storage/ndb/src/kernel/blocks/suma/Suma.hpp	2007-12-14 13:48:09 +01:00
@@ -48,20 +48,13 @@
   
   void execSUB_START_REQ(Signal* signal);
   void execSUB_STOP_REQ(Signal* signal);
-  
+
   void execSUB_SYNC_REQ(Signal* signal);
   void execSUB_ABORT_SYNC_REQ(Signal* signal);
 
-  void execSUB_STOP_CONF(Signal* signal);
-  void execSUB_STOP_REF(Signal* signal);
-
  /**
    * Dict interface
    */
-#if 0
-  void execLIST_TABLES_REF(Signal* signal);
-  void execLIST_TABLES_CONF(Signal* signal);
-#endif
   void execGET_TABINFOREF(Signal* signal);
   void execGET_TABINFO_CONF(Signal* signal);
 
@@ -71,6 +64,10 @@
   void execDROP_TAB_CONF(Signal* signal);
   void execALTER_TAB_CONF(Signal* signal);
   void execCREATE_TAB_CONF(Signal* signal);
+
+  void execDICT_LOCK_REF(Signal*);
+  void execDICT_LOCK_CONF(Signal*);
+
   /**
    * Scan interface
    */
@@ -137,7 +134,6 @@
   struct Subscriber {
     Uint32 m_senderRef;
     Uint32 m_senderData;
-    Uint32 m_subPtrI; //reference to subscription
     Uint32 nextList;
 
     union { Uint32 nextPool; Uint32 prevList; };
@@ -197,22 +193,92 @@
   };
   friend struct SyncRecord;
 
-  int initTable(Signal *signal,Uint32 tableId, TablePtr &tabPtr,
-		Ptr<SyncRecord> syncPtr);
-  int initTable(Signal *signal,Uint32 tableId, TablePtr &tabPtr,
-		SubscriberPtr subbPtr);
-  int initTable(Signal *signal,Uint32 tableId, TablePtr &tabPtr);
-  
-  int completeOneSubscriber(Signal* signal, TablePtr tabPtr, SubscriberPtr subbPtr);
-  void completeAllSubscribers(Signal* signal, TablePtr tabPtr);
-  void completeInitTable(Signal* signal, TablePtr tabPtr);
+  struct SubOpRecord
+  {
+    enum OpType
+    {
+      R_SUB_START_REQ,
+      R_SUB_STOP_REQ,
+      R_START_ME_REQ,
+      R_API_FAIL_REQ
+    };
+
+    Uint32 m_opType;
+    Uint32 m_subPtrI;
+    Uint32 m_senderRef;
+    Uint32 m_senderData;
+    Uint32 m_subscriberRef;
+    Uint32 m_subscriberData;
+
+    Uint32 nextList;
+    union {
+      Uint32 prevList;
+      Uint32 nextPool;
+    };
+  };
+  friend struct SubOpRecord;
+
+  struct Subscription
+  {
+    Uint32 m_seq_no;
+    Uint32 m_subscriptionId;
+    Uint32 m_subscriptionKey;
+    Uint32 m_subscriptionType;
+
+    enum State {
+      UNDEFINED,
+      DEFINED,
+      DROPPED,
+      DEFINING
+    };
+
+    enum TriggerState {
+      T_UNDEFINED,
+      T_CREATING,
+      T_DEFINED,
+      T_DROPPING,
+      T_ERROR
+    };
+
+    State m_state;
+    TriggerState m_trigger_state;
+
+    DLList<Subscriber>::Head m_subscribers;
+    DLFifoList<SubOpRecord>::Head m_create_req;
+    DLFifoList<SubOpRecord>::Head m_start_req;
+    DLFifoList<SubOpRecord>::Head m_stop_req;
+
+    Uint32 m_errorCode;
+    Uint32 m_outstanding_trigger;
+    Uint32 m_triggers[3];
+
+    Uint32 nextList, prevList;
+    Uint32 nextHash;
+    union { Uint32 prevHash; Uint32 nextPool; };
+
+    Uint32 hashValue() const {
+      return m_subscriptionId + m_subscriptionKey;
+    }
+
+    bool equal(const Subscription & s) const {
+      return
+	m_subscriptionId == s.m_subscriptionId &&
+	m_subscriptionKey == s.m_subscriptionKey;
+    }
+    /**
+     * The following holds the tables included
+     * in the subscription.
+     */
+    Uint32 m_tableId;
+    Uint32 m_table_ptrI;
+  };
+  typedef Ptr<Subscription> SubscriptionPtr;
 
   struct Table {
-    Table() { m_tableId = ~0; n_subscribers = 0; }
+    Table() { m_tableId = ~0; }
     void release(Suma&);
-    void checkRelease(Suma &suma);
 
-    DLList<Subscriber>::Head c_subscribers;
+    DLList<Subscription>::Head m_subscriptions;
     DLList<SyncRecord>::Head c_syncRecords;
 
     enum State {
@@ -225,35 +291,15 @@
     State m_state;
 
     Uint32 m_ptrI;
-    SubscriberPtr m_drop_subbPtr;
-
-    Uint32 n_subscribers;
 
     bool parseTable(SegmentedSectionPtr ptr, Suma &suma);
     /**
      * Create triggers
      */
-    int setupTrigger(Signal* signal, Suma &suma);
-    void completeTrigger(Signal* signal);
     void createAttributeMask(AttributeMask&, Suma &suma);
     
-    /**
-     * Drop triggers
-     */
-    void dropTrigger(Signal* signal,Suma&);
-    void runDropTrigger(Signal* signal, Uint32 triggerId,Suma&);
-
-    /**
-     * Sync meta
-     */    
-#if 0
-    void runLIST_TABLES_CONF(Signal* signal);
-#endif
-    
     union { Uint32 m_tableId; Uint32 key; };
     Uint32 m_schemaVersion;
-    Uint32 m_hasTriggerDefined[3]; // Insert/Update/Delete
-    Uint32 m_triggerIds[3]; // Insert/Update/Delete
 
     Uint32 m_error;
     /**
@@ -281,49 +327,10 @@
       return m_tableId == rec.m_tableId;
     }
   };
-
-  struct Subscription {
-    Uint32 m_senderRef;
-    Uint32 m_senderData;
-    Uint32 m_subscriptionId;
-    Uint32 m_subscriptionKey;
-    Uint32 m_subscriptionType;
-
-    enum State {
-      UNDEFINED,
-      LOCKED,
-      DEFINED,
-      DROPPED
-    };
-    State m_state;
-    Uint32 n_subscribers;
-
-    Uint32 nextHash;
-    union { Uint32 prevHash; Uint32 nextPool; };
-
-    Uint32 hashValue() const {
-      return m_subscriptionId + m_subscriptionKey;
-    }
-
-    bool equal(const Subscription & s) const {
-      return 
-	m_subscriptionId == s.m_subscriptionId && 
-	m_subscriptionKey == s.m_subscriptionKey;
-    }
-    /**
-     * The following holds the tables included 
-     * in the subscription.
-     */
-    Uint32 m_tableId;
-    Uint32 m_table_ptrI;
-  };
-  typedef Ptr<Subscription> SubscriptionPtr;
     
   /**
    * 
    */
-  DLList<Subscriber> c_metaSubscribers;
-  DLList<Subscriber> c_removeDataSubscribers;
 
   /**
    * Lists
@@ -339,6 +346,7 @@
   ArrayPool<Subscription> c_subscriptionPool;
   ArrayPool<SyncRecord> c_syncPool;
   DataBuffer<15>::DataBufferPool c_dataBufferPool;
+  ArrayPool<SubOpRecord> c_subOpPool;
 
   NodeBitmask c_failedApiNodes;
   
@@ -347,32 +355,33 @@
    */
   bool removeSubscribersOnNode(Signal *signal, Uint32 nodeId);
 
-  bool checkTableTriggers(SegmentedSectionPtr ptr);
+  void sendSubIdRef(Signal* signal,Uint32 senderRef,Uint32 senderData,Uint32 errorCode);
 
-  void addTableId(Uint32 TableId,
-		  SubscriptionPtr subPtr, SyncRecord *psyncRec);
+  void sendSubCreateRef(Signal* signal, Uint32 ref, Uint32 data, Uint32 error);
+  void sendSubStartRef(Signal* signal, Uint32 ref, Uint32 data, Uint32 error);
+  void sendSubStopRef(Signal* signal, Uint32 ref, Uint32 data, Uint32 error);
+  void sendSubStopConf(Signal*, Uint32 ref, Uint32 data, Ptr<Subscriber>);
 
-  void sendSubIdRef(Signal* signal,Uint32 senderRef,Uint32 senderData,Uint32 errorCode);
-  void sendSubCreateRef(Signal* signal, Uint32 errorCode);
-  void sendSubStartRef(Signal*, SubscriberPtr, Uint32 errorCode, SubscriptionData::Part);
-  void sendSubStartRef(Signal* signal, Uint32 ref, Uint32 errorCode);
-  void sendSubStopRef(Signal* signal, Uint32 errorCode);
   void sendSubSyncRef(Signal* signal, Uint32 errorCode);  
   void sendSubRemoveRef(Signal* signal, const SubRemoveReq& ref,
 			Uint32 errorCode);
-  void sendSubStartComplete(Signal*, SubscriberPtr, Uint32, 
-			    SubscriptionData::Part);
-  void sendSubStopComplete(Signal*, SubscriberPtr);
   void sendSubStopReq(Signal* signal, bool unlock= false);
 
   void completeSubRemove(SubscriptionPtr subPtr);
 
   Uint32 getFirstGCI(Signal* signal);
 
-  /**
-   * Table admin
-   */
-  void convertNameToId( SubscriptionPtr subPtr, Signal * signal);
+  void create_triggers(Signal*, Ptr<Subscription>);
+  void drop_triggers(Signal*, Ptr<Subscription>);
+  void drop_triggers_complete(Signal*, Ptr<Subscription>);
+
+  void report_sub_start_conf(Signal* signal, Ptr<Subscription> subPtr);
+  void report_sub_start_ref(Signal* signal, Ptr<Subscription> subPtr, Uint32);
+
+  void sub_stop_req(Signal*);
+  void check_remove_queue(Signal*, Ptr<Subscription>,
+                          Ptr<SubOpRecord>,bool,bool);
+  void check_release_subscription(Signal* signal, Ptr<Subscription>);
 
   /**
    * Public interface
@@ -405,6 +414,10 @@
   void execAPI_START_REP(Signal* signal);
   void execAPI_FAILREQ(Signal* signal) ;
 
+  void api_fail_gci_list(Signal*, Uint32 node);
+  void api_fail_subscriber_list(Signal*, Uint32 node);
+  void api_fail_subscription(Signal*);
+
   void execSUB_GCP_COMPLETE_ACK(Signal* signal);
 
   /**
@@ -425,6 +438,12 @@
   void execSUMA_START_ME_REQ(Signal* signal);
   void execSUMA_START_ME_REF(Signal* signal);
   void execSUMA_START_ME_CONF(Signal* signal);
+
+  void copySubscription(Signal* signal, DLHashTable<Subscription>::Iterator);
+  void sendSubCreateReq(Signal* signal, Ptr<Subscription>);
+  void copySubscriber(Signal*, Ptr<Subscription>, Ptr<Subscriber>);
+  void abort_start_me(Signal*, Ptr<Subscription>, bool lockowner);
+
   void execSUMA_HANDOVER_REQ(Signal* signal);
   void execSUMA_HANDOVER_REF(Signal* signal);
   void execSUMA_HANDOVER_CONF(Signal* signal);
@@ -444,41 +463,7 @@
    * for Suma that is restarting another
    */
 
-  struct Restart {
-    Restart(Suma& s);
-
-    Suma & suma;
-    Uint32 nodeId;
-
-    DLHashTable<Subscription>::Iterator c_subIt;
-    KeyTable<Table>::Iterator c_tabIt;
-
-    void progError(int line, int cause, const char * extra) { 
-      suma.progError(line, cause, extra); 
-    }
-
-    void resetNode(Uint32 sumaRef);
-    void runSUMA_START_ME_REQ(Signal*, Uint32 sumaRef);
-    void startNode(Signal*, Uint32 sumaRef);
-
-    void createSubscription(Signal* signal, Uint32 sumaRef);
-    void nextSubscription(Signal* signal, Uint32 sumaRef);
-    void runSUB_CREATE_CONF(Signal* signal);
-    void completeSubscription(Signal* signal, Uint32 sumaRef);
-
-    void startSubscriber(Signal* signal, Uint32 sumaRef);
-    void nextSubscriber(Signal* signal, Uint32 sumaRef, SubscriberPtr subbPtr);
-    void sendSubStartReq(SubscriptionPtr subPtr, SubscriberPtr subbPtr,
-			 Signal* signal, Uint32 sumaRef);
-    void runSUB_START_CONF(Signal* signal);
-    void completeSubscriber(Signal* signal, Uint32 sumaRef);
-
-    void completeRestartingNode(Signal* signal, Uint32 sumaRef);
-    void resetRestart(Signal* signal);
-  } Restart;
-
 private:
-  friend class Restart;
   /**
    * Variables
    */
@@ -494,6 +479,19 @@
     Uint32 m_restart_server_node_id;
     NdbNodeBitmask m_handover_nodes;
   } c_startup;
+
+  struct Restart
+  {
+    Uint16 m_abort;
+    Uint16 m_waiting_on_self;
+    Uint32 m_ref;
+    Uint32 m_max_seq;
+    Uint32 m_subPtrI;
+    Uint32 m_subOpPtrI;
+    Uint32 m_bucket; // In c_subscribers hashtable
+  } c_restart;
+
+  Uint32 c_current_seq; // Sequence no on subscription(s)
   
   NodeBitmask c_connected_nodes;  // (NODE/API) START REP / (API/NODE) FAIL REQ
   NodeBitmask c_subscriber_nodes; // 
@@ -506,6 +504,7 @@
   Uint32 c_nodesInGroup[MAX_REPLICAS];
   NdbNodeBitmask c_nodes_in_nodegroup_mask;  // NodeId's of nodes in nodegroup
 
+  void send_dict_lock_req(Signal* signal);
   void send_start_me_req(Signal* signal);
   void check_start_handover(Signal* signal);
   void send_handover_req(Signal* signal);

--- 1.11/storage/ndb/src/kernel/blocks/suma/SumaInit.cpp	2007-12-14 13:48:09 +01:00
+++ 1.12/storage/ndb/src/kernel/blocks/suma/SumaInit.cpp	2007-12-14 13:48:09 +01:00
@@ -21,13 +21,12 @@
 
 Suma::Suma(const Configuration & conf) :
   SimulatedBlock(SUMA, conf),
-  c_metaSubscribers(c_subscriberPool),
-  c_removeDataSubscribers(c_subscriberPool),
   c_tables(c_tablePool),
   c_subscriptions(c_subscriptionPool),
-  Restart(*this),
   c_gcp_list(c_gcp_pool)
 {
+  BLOCK_CONSTRUCTOR(Suma);
+
   // Add received signals
   addRecSignal(GSN_READ_CONFIG_REQ, &Suma::execREAD_CONFIG_REQ);
   addRecSignal(GSN_STTOR, &Suma::execSTTOR);
@@ -68,8 +67,6 @@
   addRecSignal(GSN_SUB_REMOVE_REQ, &Suma::execSUB_REMOVE_REQ);
   addRecSignal(GSN_SUB_START_REQ, &Suma::execSUB_START_REQ);
   addRecSignal(GSN_SUB_STOP_REQ, &Suma::execSUB_STOP_REQ);
-  addRecSignal(GSN_SUB_STOP_REF, &Suma::execSUB_STOP_REF);
-  addRecSignal(GSN_SUB_STOP_CONF, &Suma::execSUB_STOP_CONF);
   addRecSignal(GSN_SUB_SYNC_REQ, &Suma::execSUB_SYNC_REQ);
 
   /**
@@ -79,15 +76,12 @@
   addRecSignal(GSN_ALTER_TAB_CONF, &Suma::execALTER_TAB_CONF);
   addRecSignal(GSN_CREATE_TAB_CONF, &Suma::execCREATE_TAB_CONF);
 
-#if 0
-  addRecSignal(GSN_LIST_TABLES_CONF, &Suma::execLIST_TABLES_CONF);
-#endif
   addRecSignal(GSN_GET_TABINFO_CONF, &Suma::execGET_TABINFO_CONF);
   addRecSignal(GSN_GET_TABINFOREF, &Suma::execGET_TABINFOREF);
-#if 0
-  addRecSignal(GSN_GET_TABLEID_CONF, &Suma::execGET_TABLEID_CONF);
-  addRecSignal(GSN_GET_TABLEID_REF, &Suma::execGET_TABLEID_REF);
-#endif
+
+  addRecSignal(GSN_DICT_LOCK_REF, &Suma::execDICT_LOCK_REF);
+  addRecSignal(GSN_DICT_LOCK_CONF, &Suma::execDICT_LOCK_CONF);
+
   /**
    * Dih interface
    */
@@ -122,6 +116,8 @@
   addRecSignal(GSN_SUB_GCP_COMPLETE_REP, 
 	       &Suma::execSUB_GCP_COMPLETE_REP);
 
+  c_current_seq = 0;
+  c_restart.m_ref = 0;
   c_startup.m_restart_server_node_id = RNIL; // Server for my NR
 }
 

--- 1.4/storage/ndb/src/kernel/vm/DLFifoList.hpp	2007-12-14 13:48:09 +01:00
+++ 1.5/storage/ndb/src/kernel/vm/DLFifoList.hpp	2007-12-14 13:48:09 +01:00
@@ -34,6 +34,8 @@
     Head();
     Uint32 firstItem;
     Uint32 lastItem;
+
+    inline bool isEmpty() const { return firstItem == RNIL;}
   };
   
   DLFifoList(ArrayPool<T> & thePool);

--- 1.4/storage/ndb/src/kernel/vm/RequestTracker.hpp	2007-12-14 13:48:09 +01:00
+++ 1.5/storage/ndb/src/kernel/vm/RequestTracker.hpp	2007-12-14 13:48:09 +01:00
@@ -49,9 +49,10 @@
 
   bool done() { return m_sc.done(); }
 
+  NdbNodeBitmask m_confs;
+
 private:
   SafeCounterHandle m_sc;
-  NdbNodeBitmask m_confs;
   Uint8 m_nRefs;
 };
 

--- 1.4/storage/ndb/src/kernel/vm/SafeCounter.cpp	2007-12-14 13:48:09 +01:00
+++ 1.5/storage/ndb/src/kernel/vm/SafeCounter.cpp	2007-12-14 13:48:09 +01:00
@@ -34,6 +34,11 @@
   return m_counterPool.getSize();
 }
 
+Uint32
+SafeCounterManager::getNoOfFree() const {
+  return m_counterPool.getNoOfFree();
+}
+
 bool
 SafeCounterManager::seize(ActiveCounterPtr& ptr){
   return m_activeCounters.seize(ptr);

--- 1.5/storage/ndb/src/kernel/vm/SafeCounter.hpp	2007-12-14 13:48:09 +01:00
+++ 1.6/storage/ndb/src/kernel/vm/SafeCounter.hpp	2007-12-14 13:48:09 +01:00
@@ -65,6 +65,7 @@
   
   bool setSize(Uint32 maxNoOfActiveMutexes, bool exit_on_error = true);
   Uint32 getSize() const ;
+  Uint32 getNoOfFree() const;
 
   void execNODE_FAILREP(Signal*); 
   void printNODE_FAILREP(); 

--- 1.24/storage/ndb/src/kernel/vm/SimulatedBlock.cpp	2007-12-14 13:48:09 +01:00
+++ 1.25/storage/ndb/src/kernel/vm/SimulatedBlock.cpp	2007-12-14 13:48:09 +01:00
@@ -62,39 +62,6 @@
   c_fragmentIdCounter = 1;
   c_fragSenderRunning = false;
   
-  Properties tmp;
-  const Properties * p = &tmp;
-  ndbrequire(p != 0);
-
-  Uint32 count = 10;
-  char buf[255];
-
-  count = 10;
-  BaseString::snprintf(buf, 255, "%s.FragmentSendPool", getBlockName(blockNumber));
-  if(!p->get(buf, &count))
-    p->get("FragmentSendPool", &count);
-  c_fragmentSendPool.setSize(count);
-
-  count = 10;
-  BaseString::snprintf(buf, 255, "%s.FragmentInfoPool", getBlockName(blockNumber));
-  if(!p->get(buf, &count))
-    p->get("FragmentInfoPool", &count);
-  c_fragmentInfoPool.setSize(count);
-
-  count = 10;
-  BaseString::snprintf(buf, 255, "%s.FragmentInfoHash", getBlockName(blockNumber));
-  if(!p->get(buf, &count))
-    p->get("FragmentInfoHash", &count);
-  c_fragmentInfoHash.setSize(count);
-
-  count = 5;
-  BaseString::snprintf(buf, 255, "%s.ActiveMutexes", getBlockName(blockNumber));
-  if(!p->get(buf, &count))
-    p->get("ActiveMutexes", &count);
-  c_mutexMgr.setSize(count);
-  
-  c_counterMgr.setSize(5);
-  
 #ifdef VM_TRACE_TIME
   clearTimes();
 #endif
@@ -113,6 +80,30 @@
 #endif
 }
 
+void
+SimulatedBlock::initCommon()
+{
+  Uint32 count = 10;
+  getBlockConfig("FragmentSendPool", &count);
+  c_fragmentSendPool.setSize(count);
+
+  count = 10;
+  getBlockConfig("FragmentInfoPool", &count);
+  c_fragmentInfoPool.setSize(count);
+
+  count = 10;
+  getBlockConfig("FragmentInfoHash", &count);
+  c_fragmentInfoHash.setSize(count);
+
+  count = 5;
+  getBlockConfig("ActiveMutexes", &count);
+  c_mutexMgr.setSize(count);
+
+  count = 5;
+  getBlockConfig("SafeCounter", &count);
+  c_counterMgr.setSize(count);
+}
+
 SimulatedBlock::~SimulatedBlock()
 {
   freeBat();
@@ -1017,6 +1008,7 @@
     /**
      * Don't release allocated segments
      */
+    signal->header.m_fragmentInfo = 0;
     signal->header.m_noOfSections = 0;
     return false;
   }
@@ -1044,6 +1036,7 @@
      * fragInfo = 2
      */
     if(fragInfo == 2){
+      signal->header.m_fragmentInfo = 0;
       signal->header.m_noOfSections = 0;
       return false;
     }

--- 1.19/storage/ndb/src/kernel/vm/SimulatedBlock.hpp	2007-12-14 13:48:09 +01:00
+++ 1.20/storage/ndb/src/kernel/vm/SimulatedBlock.hpp	2007-12-14 13:48:09 +01:00
@@ -522,6 +522,8 @@
   // Variable for storing inserted errors, see pc.H
   ERROR_INSERT_VARIABLE;
 
+  virtual void initCommon();
+  virtual void getBlockConfig(const char * name, Uint32*) {}
 private:
   // Metadata common part shared by block instances
   MetaData::Common* c_ptrMetaDataCommon;
@@ -786,7 +788,8 @@
 private: \
   void addRecSignal(GlobalSignalNumber gsn, ExecSignalLocal f, bool force = false)
 
-#define BLOCK_CONSTRUCTOR(BLOCK)
+#define BLOCK_CONSTRUCTOR(BLOCK) { initCommon();}
+
 
 #define BLOCK_FUNCTIONS(BLOCK) \
 void \

--- 1.98/storage/ndb/src/ndbapi/NdbDictionaryImpl.cpp	2007-12-14 13:48:09 +01:00
+++ 1.99/storage/ndb/src/ndbapi/NdbDictionaryImpl.cpp	2007-12-14 13:48:09 +01:00
@@ -39,6 +39,7 @@
 #include "NdbBlobImpl.hpp"
 #include <AttributeHeader.hpp>
 #include <my_sys.h>
+#include <NdbSleep.h>
 
 #define DEBUG_PRINT 0
 #define INCOMPATIBLE_VERSION -2
@@ -973,7 +974,22 @@
 {
   DBUG_ENTER("NdbDictInterface::dictSignal");
   DBUG_PRINT("enter", ("useMasterNodeId: %d", useMasterNodeId));
-  for(Uint32 i = 0; i<RETRIES; i++){
+
+  int sleep = 50;
+  int mod = 5;
+
+  for(Uint32 i = 0; i<RETRIES; i++)
+  {
+
+    if (i == RETRIES / 2)
+    {
+      mod = 10;
+    }
+    if (i == 3*RETRIES/4)
+    {
+      sleep = 100;
+    }
+
     //if (useMasterNodeId == 0)
     m_buffer.clear();
 
@@ -1046,9 +1062,14 @@
     {
       const NdbError &error= getNdbError();
       if (error.status ==  NdbError::TemporaryError)
-	continue;
+      {
+        NdbSleep_MilliSleep(sleep + 10 * (rand() % mod));
+        continue;
+      }
     }
-    else if ( (temporaryMask & m_error.code) != 0 ) {
+    else if ( (temporaryMask & m_error.code) != 0 )
+    {
+      NdbSleep_MilliSleep(sleep + 10 * (rand() % mod));
       continue;
     }
     if (errcodes) {
@@ -1059,7 +1080,10 @@
 	  break;
 	}
       if (doContinue)
-	continue;
+      {
+        NdbSleep_MilliSleep(sleep + 10 * (rand() % mod));
+        continue;
+      }
     }
 
     DBUG_RETURN(-1);
@@ -2658,7 +2682,7 @@
   NdbApiSignal tSignal(m_reference);
   tSignal.theReceiversBlockNumber = DBDICT;
   tSignal.theVerId_signalNumber   = GSN_SUB_START_REQ;
-  tSignal.theLength = SubStartReq::SignalLength2;
+  tSignal.theLength = SubStartReq::SignalLength;
   
   SubStartReq * req = CAST_PTR(SubStartReq, tSignal.getDataPtrSend());
 
@@ -2672,7 +2696,9 @@
 		     "subscriberData=%d",req->subscriptionId,
 		     req->subscriptionKey,req->subscriberData));
 
-  int errCodes[] = { SubStartRef::Busy };
+  int errCodes[] = { SubStartRef::Busy,
+                     SubStartRef::BusyWithNR,
+                     SubStartRef::NotMaster };
   DBUG_RETURN(dictSignal(&tSignal,NULL,0,
 			 1 /*use masternode id*/,
 			 100,
@@ -2712,7 +2738,9 @@
 		     "subscriberData=%d",req->subscriptionId,
 		     req->subscriptionKey,req->subscriberData));
 
-  int errCodes[] = { SubStopRef::Busy };
+  int errCodes[] = { SubStopRef::Busy,
+                     SubStopRef::BusyWithNR,
+                     SubStopRef::NotMaster };
   DBUG_RETURN(dictSignal(&tSignal,NULL,0,
 			 1 /*use masternode id*/,
 			 100,
@@ -2893,6 +2921,13 @@
 
   DBUG_PRINT("error",("subscriptionId=%d,subscriptionKey=%d,subscriberData=%d,error=%d",
 		      subscriptionId,subscriptionKey,subscriberData,m_error.code));
+
+  if (m_error.code == SubStopRef::NotMaster &&
+      signal->getLength() >= SubStopRef::SL_MasterNode)
+  {
+    m_masterNodeId = subStopRef->masterNodeId;
+  }
+
   m_waiter.signal(NO_WAIT);
   DBUG_VOID_RETURN;
 }
@@ -2941,6 +2976,11 @@
   const SubStartRef * const subStartRef=
     CAST_CONSTPTR(SubStartRef, signal->getDataPtr());
   m_error.code= subStartRef->errorCode;
+  if (m_error.code == SubStartRef::NotMaster &&
+      signal->getLength() >= SubStartRef::SL_MasterNode)
+  {
+    m_masterNodeId = subStartRef->masterNodeId;
+  }
   m_waiter.signal(NO_WAIT);
   DBUG_VOID_RETURN;
 }

--- 1.51/storage/ndb/src/ndbapi/ndberror.c	2007-12-14 13:48:09 +01:00
+++ 1.52/storage/ndb/src/ndbapi/ndberror.c	2007-12-14 13:48:09 +01:00
@@ -334,7 +334,7 @@
   /**
    * SchemaError
    */
-  { 701,  DMEC, SE, "System busy with other schema operation" },
+  { 701,  DMEC, TR, "System busy with other schema operation" },
   { 703,  DMEC, SE, "Invalid table format" },
   { 704,  DMEC, SE, "Attribute name too long" },
   { 705,  DMEC, SE, "Table name too long" },
@@ -433,6 +433,12 @@
   { 1418, DMEC, SE, "Subscription dropped, no new subscribers allowed" },
   { 1419, DMEC, SE, "Subscription already dropped" },
   { 1421, DMEC, SE, "Partially connected API in NdbOperation::execute()" },
+  { 1422, DMEC, SE, "Out of subscription records" },
+  { 1423, DMEC, SE, "Out of table records in SUMA" },
+  { 1424, DMES, SE, "Out of MaxNoOfConcurrentSubOperations" ],
+  { 1425, DMEC, SE, "Subscription being defined...while trying to stop subscriber" },
+  { 1426, DMEC, SE, "No such subscriber" },
+  { 1427, DMEC, SE, "Api node died, when SUB_START_REQ reached node "},
   
   { 4004, DMEC, AE, "Attribute name not found in the Table" },
   
Thread
bk commit into 5.1 tree (jonas:1.2179)jonas14 Dec