MySQL Lists are EOL. Please join:

List:Commits« Previous MessageNext Message »
From:jonas Date:February 20 2008 9:04am
Subject:bk commit into 5.1 tree (jonas:1.2532)
View as plain text  
Below is the list of changes that have just been committed into a local
5.1 repository of jonas.  When jonas does a push these changes
will be propagated to the main repository and, within 24 hours after the
push, to the public repository.
For information on how to access the public repository
see http://dev.mysql.com/doc/mysql/en/installing-source-tree.html

ChangeSet@stripped, 2008-02-20 10:04:29+01:00, jonas@stripped +27 -0
  ndb - SUMA v2.1
    port of SUMA v2 from drop6 to telco-6.2
    (with adoption made to changes made in 51)

  storage/ndb/include/kernel/signaldata/DictLock.hpp@stripped, 2008-02-20 10:04:24+01:00, jonas@stripped +21 -3
    add new lock types

  storage/ndb/include/kernel/signaldata/SumaImpl.hpp@stripped, 2008-02-20 10:04:24+01:00, jonas@stripped +52 -15
    fix suma signal classes
    (mainly add new error codes)

  storage/ndb/include/mgmapi/mgmapi_config_parameters.h@stripped, 2008-02-20 10:04:24+01:00, jonas@stripped +6 -1
    add new config parameters

  storage/ndb/include/ndb_version.h.in@stripped, 2008-02-20 10:04:24+01:00, jonas@stripped +23 -1
    add version defines

  storage/ndb/src/kernel/blocks/dbdict/Dbdict.cpp@stripped, 2008-02-20 10:04:24+01:00, jonas@stripped +256 -57
    1) add SUMA dict lock
    2) fix so that SUB_START_REQ is (almost) atomic
    3) set correct error codes in various places

  storage/ndb/src/kernel/blocks/dbdict/Dbdict.hpp@stripped, 2008-02-20 10:04:24+01:00, jonas@stripped +12 -0
    1) add SUMA dict lock
    2) fix so that SUB_START_REQ is (almost) atomic
    3) set correct error codes in various places

  storage/ndb/src/kernel/blocks/dbdih/DbdihMain.cpp@stripped, 2008-02-20 10:04:24+01:00, jonas@stripped +1 -1
    dick lock

  storage/ndb/src/kernel/blocks/qmgr/Qmgr.hpp@stripped, 2008-02-20 10:04:24+01:00, jonas@stripped +1 -1
    let QMGR wait for SUMA api_failconf

  storage/ndb/src/kernel/blocks/qmgr/QmgrMain.cpp@stripped, 2008-02-20 10:04:24+01:00, jonas@stripped +21 -21
    let QMGR wait for SUMA api_failconf

  storage/ndb/src/kernel/blocks/suma/Suma.cpp@stripped, 2008-02-20 10:04:24+01:00, jonas@stripped +1840 -1939
    SUMA v2
    - create SubOperation
    - Remove all O(n) loops
      and use SubOp together with CONTINUEB
    - fix various bugs

  storage/ndb/src/kernel/blocks/suma/Suma.hpp@stripped, 2008-02-20 10:04:24+01:00, jonas@stripped +150 -153
    SUMA v2
    - create SubOperation
    - Remove all O(n) loops
      and use SubOp together with CONTINUEB
    - fix various bugs

  storage/ndb/src/kernel/blocks/suma/SumaInit.cpp@stripped, 2008-02-20 10:04:24+01:00, jonas@stripped +6 -12
    SUMA v2
    - create SubOperation
    - Remove all O(n) loops
      and use SubOp together with CONTINUEB
    - fix various bugs

  storage/ndb/src/kernel/blocks/trix/Trix.cpp@stripped, 2008-02-20 10:04:24+01:00, jonas@stripped +2 -0
    Fix unique index build wrt SUMA v2

  storage/ndb/src/kernel/blocks/trix/Trix.hpp@stripped, 2008-02-20 10:04:24+01:00, jonas@stripped +1 -0
    Fix unique index build wrt SUMA v2

  storage/ndb/src/kernel/vm/RequestTracker.hpp@stripped, 2008-02-20 10:04:24+01:00, jonas@stripped +2 -1
    debug

  storage/ndb/src/kernel/vm/SafeCounter.cpp@stripped, 2008-02-20 10:04:24+01:00, jonas@stripped +5 -0
    debug

  storage/ndb/src/kernel/vm/SafeCounter.hpp@stripped, 2008-02-20 10:04:24+01:00, jonas@stripped +1 -0
    debug

  storage/ndb/src/kernel/vm/SimulatedBlock.cpp@stripped, 2008-02-20 10:04:24+01:00, jonas@stripped +24 -33
    move getParam from SimulatedBlock-constructor
      and put it into BLOCK_CONSTRUCTOR

  storage/ndb/src/kernel/vm/SimulatedBlock.hpp@stripped, 2008-02-20 10:04:24+01:00, jonas@stripped +4 -1
    move getParam from SimulatedBlock-constructor
      and put it into BLOCK_CONSTRUCTOR

  storage/ndb/src/mgmsrv/ConfigInfo.cpp@stripped, 2008-02-20 10:04:24+01:00, jonas@stripped +36 -0
    new parameters

  storage/ndb/src/ndbapi/Ndb.cpp@stripped, 2008-02-20 10:04:24+01:00, jonas@stripped +1 -11
    fix race condiction

  storage/ndb/src/ndbapi/NdbDictionaryImpl.cpp@stripped, 2008-02-20 10:04:25+01:00, jonas@stripped +31 -8
    fix race condiction

  storage/ndb/src/ndbapi/NdbEventOperationImpl.cpp@stripped, 2008-02-20 10:04:25+01:00, jonas@stripped +24 -2
    fix race condiction

  storage/ndb/src/ndbapi/ndberror.c@stripped, 2008-02-20 10:04:25+01:00, jonas@stripped +6 -0
    new error codes

  storage/ndb/test/ndbapi/test_event.cpp@stripped, 2008-02-20 10:04:25+01:00, jonas@stripped +117 -7
    new testcase

  storage/ndb/test/run-test/daily-basic-tests.txt@stripped, 2008-02-20 10:04:25+01:00, jonas@stripped +3 -0
    new testcase

  storage/ndb/test/tools/connect.cpp@stripped, 2008-02-20 10:04:25+01:00, jonas@stripped +73 -49
    more features

diff -Nrup a/storage/ndb/include/kernel/signaldata/DictLock.hpp b/storage/ndb/include/kernel/signaldata/DictLock.hpp
--- a/storage/ndb/include/kernel/signaldata/DictLock.hpp	2006-12-23 20:20:04 +01:00
+++ b/storage/ndb/include/kernel/signaldata/DictLock.hpp	2008-02-20 10:04:24 +01:00
@@ -23,11 +23,23 @@
 class DictLockReq {
   friend class Dbdict;
   friend class Dbdih;
+  friend class Suma;
 public:
   STATIC_CONST( SignalLength = 3 );
   enum LockType {
-    NoLock = 0,
-    NodeRestartLock = 1
+    NoLock = 0
+    ,NodeRestartLock = 1 // S-lock
+    ,NodeFailureLock = 2 // S-lock
+    ,CreateTableLock = 3
+    ,AlterTableLock  = 4
+    ,DropTableLock   = 5
+    ,CreateIndexLock = 6
+    ,DropIndexLock   = 7
+    ,CreateFileLock  = 8
+    ,CreateFilegroupLock = 9
+    ,DropFileLock    = 10
+    ,DropFilegroupLock = 11
+    ,SumaStartMe = 12
   };
 private:
   Uint32 userPtr;
@@ -49,6 +61,7 @@ private:
 class DictLockRef {
   friend class Dbdict;
   friend class Dbdih;
+  friend class Suma;
 public:
   STATIC_CONST( SignalLength = 3 );
   enum ErrorCode {
@@ -67,11 +80,16 @@ private:
 class DictUnlockOrd {
   friend class Dbdict;
   friend class Dbdih;
+  friend class Suma;
 public:
-  STATIC_CONST( SignalLength = 2 );
+  STATIC_CONST( SignalLengthDict = 2 );
+  STATIC_CONST( SignalLengthDih  = 2 );
+  STATIC_CONST( SignalLengthSuma = 4 );
 private:
   Uint32 lockPtr;
   Uint32 lockType;
+  Uint32 senderData;
+  Uint32 senderRef;
 };
 
 #endif
diff -Nrup a/storage/ndb/include/kernel/signaldata/SumaImpl.hpp b/storage/ndb/include/kernel/signaldata/SumaImpl.hpp
--- a/storage/ndb/include/kernel/signaldata/SumaImpl.hpp	2008-02-11 14:24:16 +01:00
+++ b/storage/ndb/include/kernel/signaldata/SumaImpl.hpp	2008-02-20 10:04:24 +01:00
@@ -29,7 +29,6 @@ struct SubCreateReq {
   
   friend bool printSUB_CREATE_REQ(FILE *, const Uint32 *, Uint32, Uint16);
   STATIC_CONST( SignalLength = 6 );
-  STATIC_CONST( SignalLength2 = 7 );
   
   enum SubscriptionType {
     SingleTableScan  = 1,  // 
@@ -38,7 +37,6 @@ struct SubCreateReq {
     SelectiveTableSnapshot  = 4,  // User defines tables
     RemoveFlags  = 0xff,
     GetFlags     = 0xff << 16,
-    AddTableFlag = 0x1 << 16,
     RestartFlag  = 0x2 << 16,
     ReportAll    = 0x4 << 16,
     ReportSubscribe= 0x8 << 16
@@ -50,7 +48,6 @@ struct SubCreateReq {
   Uint32 subscriptionKey;
   Uint32 subscriptionType;
   Uint32 tableId;
-  Uint32 state;
 };
 
 struct SubCreateRef {
@@ -66,6 +63,15 @@ struct SubCreateRef {
   Uint32 senderRef;
   Uint32 senderData;
   Uint32 errorCode;
+
+  enum ErrorCode
+  {
+    SubscriptionAlreadyExist = 1415
+    ,OutOfSubscriptionRecords = 1422
+    ,OutOfTableRecords = 1423
+    ,TableDropped = 1417
+    ,NF_FakeErrorREF = 11
+  };
 };
 
 struct SubCreateConf {
@@ -96,8 +102,7 @@ struct SubStartReq {
   friend struct Suma;
   
   friend bool printSUB_START_REQ(FILE *, const Uint32 *, Uint32, Uint16);
-  STATIC_CONST( SignalLength = 6 );
-  STATIC_CONST( SignalLength2 = SignalLength+1 );
+  STATIC_CONST( SignalLength = 7 );
 
   Uint32 senderRef;
   Uint32 senderData;
@@ -119,12 +124,21 @@ struct SubStartRef {
     Undefined = 1,
     NF_FakeErrorREF = 11,
     Busy = 701,
-    NotMaster = 702,
-    PartiallyConnected = 1421
+    PartiallyConnected = 1421,
+    NoSuchSubscription = 1407,
+    Locked = 1411,
+    Dropped = 1418,
+    Defining = 1418,
+    OutOfSubscriberRecords = 1412,
+    OutOfSubOpRecords = 1424,
+    NotMaster = 702, // For API/DICT communication
+    BusyWithNR = 1405,
+    NodeDied = 1427
   };
 
   STATIC_CONST( SignalLength = 7 );
   STATIC_CONST( SignalLength2 = SignalLength+1 );
+  STATIC_CONST( SL_MasterNode = 9 );
   
   Uint32 senderRef;
   Uint32 senderData;
@@ -168,8 +182,13 @@ struct SubStopReq {
    */
   friend struct Suma;
   
+  enum RequestInfo
+  {
+    RI_ABORT_START = 0x1
+  };
+
   friend bool printSUB_STOP_REQ(FILE *, const Uint32 *, Uint32, Uint16);
-  STATIC_CONST( SignalLength = 7 );
+  STATIC_CONST( SignalLength = 8 );
   Uint32 senderRef;
   Uint32 senderData;
   Uint32 subscriptionId;
@@ -177,6 +196,7 @@ struct SubStopReq {
   Uint32 part;  // SubscriptionData::Part
   Uint32 subscriberData;
   Uint32 subscriberRef;
+  Uint32 requestInfo;
 };
 
 struct SubStopRef {
@@ -190,11 +210,17 @@ struct SubStopRef {
     Undefined = 1,
     NF_FakeErrorREF = 11,
     Busy = 701,
-    NotMaster = 702
+    NoSuchSubscription = 1407,
+    Locked = 1411,
+    Defining = 1425,
+    OutOfSubOpRecords = 1424,
+    NoSuchSubscriber = 1426,
+    NotMaster = 702,
+    BusyWithNR = 1405
   };
 
   STATIC_CONST( SignalLength = 8 );
-  STATIC_CONST( SignalLength2 = SignalLength+1 );
+  STATIC_CONST( SL_MasterNode = 9 );
   
   Uint32 senderRef;
   Uint32 senderData;
@@ -204,7 +230,6 @@ struct SubStopRef {
   Uint32 subscriberData;
   Uint32 subscriberRef;
   Uint32 errorCode;
-  // with SignalLength2
   Uint32 m_masterNodeId;
 };
 
@@ -262,6 +287,7 @@ struct SubSyncRef {
   Uint32 senderRef;
   Uint32 senderData;
   Uint32 errorCode;
+  Uint32 masterNodeId;
 };
 
 struct SubSyncConf {
@@ -343,10 +369,11 @@ struct SubSyncContinueReq {
   friend struct Trix;
   
   friend bool printSUB_SYNC_CONTINUE_REQ(FILE *, const Uint32 *, Uint32, Uint16);
-  STATIC_CONST( SignalLength = 2 );
+  STATIC_CONST( SignalLength = 3 );
 
   Uint32 subscriberData;
   Uint32 noOfRowsSent;
+  Uint32 senderData;
 };
 
 struct SubSyncContinueRef {
@@ -358,10 +385,11 @@ struct SubSyncContinueRef {
   friend struct Trix;
   
   friend bool printSUB_SYNC_CONTINUE_REF(FILE *, const Uint32 *, Uint32, Uint16);
-  STATIC_CONST( SignalLength = 2 );
+  STATIC_CONST( SignalLength = 3 );
 
   Uint32 subscriptionId;
   Uint32 subscriptionKey;
+  Uint32 senderData;
 };
 
 struct SubSyncContinueConf {
@@ -373,10 +401,11 @@ struct SubSyncContinueConf {
   friend struct Trix;
   
   friend bool printSUB_SYNC_CONTINUE_CONF(FILE *, const Uint32 *, Uint32, Uint16);
-  STATIC_CONST( SignalLength = 2 );
+  STATIC_CONST( SignalLength = 3 );
 
   Uint32 subscriptionId;
   Uint32 subscriptionKey;
+  Uint32 senderData;
 };
 
 struct SubGcpCompleteRep {
@@ -439,7 +468,10 @@ struct SubRemoveRef {
   enum ErrorCode {
     Undefined = 1,
     NF_FakeErrorREF = 11,
-    Busy = 701
+    Busy = 701,
+    NoSuchSubscription = 1407,
+    Locked = 1411,
+    AlreadyDropped = 1419
   };
 
   Uint32 senderRef;
@@ -547,6 +579,11 @@ struct SumaContinueB
     RESEND_BUCKET = 1
     ,RELEASE_GCI = 2
     ,OUT_OF_BUFFER_RELEASE = 3
+    ,API_FAIL_GCI_LIST = 4
+    ,API_FAIL_SUBSCRIBER_LIST = 5
+    ,API_FAIL_SUBSCRIPTION = 6
+    ,SUB_STOP_REQ = 7
+    ,RETRY_DICT_LOCK = 8
   };
 };
 
diff -Nrup a/storage/ndb/include/mgmapi/mgmapi_config_parameters.h b/storage/ndb/include/mgmapi/mgmapi_config_parameters.h
--- a/storage/ndb/include/mgmapi/mgmapi_config_parameters.h	2008-02-11 14:24:16 +01:00
+++ b/storage/ndb/include/mgmapi/mgmapi_config_parameters.h	2008-02-20 10:04:24 +01:00
@@ -123,7 +123,12 @@
 #define CFG_DB_MICRO_GCP_INTERVAL     170 /* micro gcp */
 #define CFG_DB_MICRO_GCP_TIMEOUT      171
 
-#define CFG_DB_MAX_BUFFERED_GCP       180 /* subscriptions */
+/* 172 - 178 used in 6.3 */
+
+#define CFG_DB_SUBSCRIPTIONS          179
+#define CFG_DB_SUBSCRIBERS            180
+#define CFG_DB_SUB_OPERATIONS         181
+#define CFG_DB_MAX_BUFFERED_GCP       182
 
 #define CFG_DB_SGA                    198 /* super pool mem */
 #define CFG_DB_DATA_MEM_2             199 /* used in special build in 5.1 */
diff -Nrup a/storage/ndb/include/ndb_version.h.in b/storage/ndb/include/ndb_version.h.in
--- a/storage/ndb/include/ndb_version.h.in	2007-10-24 11:36:17 +02:00
+++ b/storage/ndb/include/ndb_version.h.in	2008-02-20 10:04:24 +01:00
@@ -160,5 +160,27 @@ ndb_check_prep_copy_frag_version(Uint32 
   return 0;
 }
 
+#define NDBD_SUMA_DICT_LOCK_62 NDB_MAKE_VERSION(6,2,13)
+#define NDBD_SUMA_DICT_LOCK_63 NDB_MAKE_VERSION(6,3,11)
+
+static
+inline
+int
+ndbd_suma_dictlock(Uint32 x)
+{
+  if (x >= NDB_VERSION_D)
+    return 1;
+  
+  const Uint32 major = (x >> 16) & 0xFF;
+  const Uint32 minor = (x >>  8) & 0xFF;
+  
+  if (major >= 6)
+  {
+    if (minor == 2)
+      return x >= NDBD_SUMA_DICT_LOCK_62;
+  }
+  
+  return x >= NDBD_SUMA_DICT_LOCK_63;
+}
+
 #endif
- 
diff -Nrup a/storage/ndb/src/kernel/blocks/dbdict/Dbdict.cpp b/storage/ndb/src/kernel/blocks/dbdict/Dbdict.cpp
--- a/storage/ndb/src/kernel/blocks/dbdict/Dbdict.cpp	2008-01-04 11:35:56 +01:00
+++ b/storage/ndb/src/kernel/blocks/dbdict/Dbdict.cpp	2008-02-20 10:04:24 +01:00
@@ -81,6 +81,10 @@
 #include <signaldata/CreateTab.hpp>
 #include <NdbSleep.h>
 #include <signaldata/ApiBroadcast.hpp>
+#include <signaldata/DictLock.hpp>
+
+#include <EventLogger.hpp>
+extern EventLogger g_eventLogger;
 
 #include <signaldata/DropObj.hpp>
 #include <signaldata/CreateObj.hpp>
@@ -301,6 +305,14 @@ Dbdict::execDUMP_STATE_ORD(Signal* signa
     }
   }    
   
+  if (signal->theData[0] == 8004)
+  {
+    infoEvent("DICT: c_counterMgr size: %u free: %u",
+              c_counterMgr.getSize(),
+              c_counterMgr.getNoOfFree());
+    c_counterMgr.printNODE_FAILREP();
+  }
+    
   return;
 }//Dbdict::execDUMP_STATE_ORD()
 
@@ -1743,6 +1755,20 @@ Dbdict::~Dbdict() 
 
 BLOCK_FUNCTIONS(Dbdict)
 
+bool
+Dbdict::getParam(const char * name, Uint32 * count)
+{
+  if (name != 0 && count != 0)
+  {
+    if (strcmp(name, "ActiveCounters") == 0)
+    {
+      * count = 25;
+      return true;
+    }
+  }
+  return false;
+}
+
 void Dbdict::initCommonData() 
 {
 /* ---------------------------------------------------------------- */
@@ -1770,6 +1796,9 @@ void Dbdict::initCommonData() 
   c_systemRestart = false;
   c_initialNodeRestart = false;
   c_nodeRestart = false;
+
+  c_outstanding_sub_startstop = 0;
+  c_sub_startstop_lock.clear();
 }//Dbdict::initCommonData()
 
 void Dbdict::initRecords() 
@@ -3769,9 +3798,12 @@ void Dbdict::execNODE_FAILREP(Signal* si
   }
   ndbrequire(ok);
   
+  NdbNodeBitmask tmp;
+  tmp.assign(NdbNodeBitmask::Size, theFailedNodes);
+
   for(unsigned i = 1; i < MAX_NDB_NODES; i++) {
     jam();
-    if(NdbNodeBitmask::get(theFailedNodes, i)) {
+    if(tmp.get(i)) {
       jam();
       NodeRecordPtr nodePtr;
       c_nodes.getPtr(nodePtr, i);
@@ -3797,6 +3829,7 @@ void Dbdict::execNODE_FAILREP(Signal* si
    */
   removeStaleDictLocks(signal, theFailedNodes);
 
+  c_sub_startstop_lock.bitANDC(tmp);
 }//execNODE_FAILREP()
 
 
@@ -9136,7 +9169,7 @@ void Dbdict::execUTIL_EXECUTE_CONF(Signa
 {
   jamEntry();
   EVENT_TRACE;
-   ndbrequire(recvSignalUtilReq(signal, 0) == 0);
+  ndbrequire(recvSignalUtilReq(signal, 0) == 0);
 }
 
 void Dbdict::execUTIL_EXECUTE_REF(Signal *signal)
@@ -9392,12 +9425,13 @@ Dbdict::execCREATE_EVNT_REQ(Signal* sign
   }
 #endif
 
+  CreateEvntReq *req = (CreateEvntReq*)signal->getDataPtr();
+
   if (! assembleFragments(signal)) {
     jam();
     return;
   }
 
-  CreateEvntReq *req = (CreateEvntReq*)signal->getDataPtr();
   const CreateEvntReq::RequestType requestType = req->getRequestType();
   const Uint32                     requestFlag = req->getRequestFlag();
 
@@ -9655,17 +9689,17 @@ void interpretUtilPrepareErrorCode(UtilP
     DBUG_VOID_RETURN;
   case UtilPrepareRef::PREPARE_PAGES_SEIZE_ERROR:
     jam();
-    error = 1;
+    error = 748;
     line = __LINE__;
     DBUG_VOID_RETURN;
   case UtilPrepareRef::PREPARED_OPERATION_SEIZE_ERROR:
     jam();
-    error = 1;
+    error = 748;
     line = __LINE__;
     DBUG_VOID_RETURN;
   case UtilPrepareRef::DICT_TAB_INFO_ERROR:
     jam();
-    error = 1;
+    error = 748;
     line = __LINE__;
     DBUG_VOID_RETURN;
   case UtilPrepareRef::MISSING_PROPERTIES_SECTION:
@@ -10089,6 +10123,7 @@ Dbdict::createEventComplete_RT_USER_GET(
   }
 
   sendSignal(rg, GSN_CREATE_EVNT_REQ, signal, CreateEvntReq::SignalLength, JBB);
+  return;
 }
 
 void
@@ -10107,7 +10142,6 @@ void Dbdict::execCREATE_EVNT_REF(Signal*
   OpCreateEventPtr evntRecPtr;
 
   evntRecPtr.i = ref->getUserData();
-
   ndbrequire((evntRecPtr.p = c_opCreateEvent.getPtr(evntRecPtr.i)) != NULL);
 
 #ifdef EVENT_PH2_DEBUG
@@ -10119,6 +10153,7 @@ void Dbdict::execCREATE_EVNT_REF(Signal*
     evntRecPtr.p->m_reqTracker.ignoreRef(c_counterMgr, refToNode(ref->senderRef));
   } else {
     jam();
+    evntRecPtr.p->m_errorCode = ref->errorCode;
     evntRecPtr.p->m_reqTracker.reportRef(c_counterMgr, refToNode(ref->senderRef));
   }
   createEvent_sendReply(signal, evntRecPtr);
@@ -10212,12 +10247,6 @@ void Dbdict::execSUB_CREATE_REF(Signal* 
   evntRecPtr.i = ref->senderData;
   ndbrequire((evntRecPtr.p = c_opCreateEvent.getPtr(evntRecPtr.i)) != NULL);
 
-  if (ref->errorCode == 1415) {
-    jam();
-    createEvent_sendReply(signal, evntRecPtr);
-    DBUG_VOID_RETURN;
-  }
-
   if (ref->errorCode)
   {
     evntRecPtr.p->m_errorCode = ref->errorCode;
@@ -10355,6 +10384,68 @@ void Dbdict::createEvent_sendReply(Signa
  *
  *******************************************************************/
 
+#if 0
+void
+Dbdict::execDICT_LOCK_REQ(Signal* signal)
+{
+  jamEntry();
+  DictLockReq req = *(DictLockReq*)signal->getDataPtr();
+
+  Uint32 err = 0;
+  do {
+    if (c_masterNodeId != getOwnNodeId())
+    {
+      jam();
+      err = DictLockRef::NotMaster;
+      break;
+    }
+
+    if (req.lockType != DictLockReq::SumaStartMe)
+    {
+      jam();
+      err = DictLockRef::InvalidLockType;
+      break;
+    }
+
+    if (c_outstanding_sub_startstop)
+    {
+      jam();
+      g_eventLogger.info("refing dict lock to %u", refToNode(req.userRef));
+      err = DictLockRef::TooManyRequests;
+      break;
+    }
+
+    c_sub_startstop_lock.set(refToNode(req.userRef));
+
+    g_eventLogger.info("granting dict lock to %u", refToNode(req.userRef));
+    DictLockConf* conf = (DictLockConf*)signal->getDataPtrSend();
+    conf->userPtr = req.userPtr;
+    conf->lockType = req.lockType;
+    conf->lockPtr = 0;
+    sendSignal(req.userRef, GSN_DICT_LOCK_CONF, signal,
+               DictLockConf::SignalLength, JBB);
+    return;
+  } while(0);
+
+  DictLockRef* ref = (DictLockRef*)signal->getDataPtrSend();
+  ref->userPtr = req.userPtr;
+  ref->lockType = req.lockType;
+  ref->errorCode = err;
+  sendSignal(req.userRef, GSN_DICT_LOCK_REF, signal,
+             DictLockRef::SignalLength, JBB);
+}
+
+void
+Dbdict::execDICT_UNLOCK_ORD(Signal* signal)
+{
+  jamEntry();
+  DictUnlockOrd ord = *(DictUnlockOrd*)signal->getDataPtr();
+
+  g_eventLogger.info("clearing dict lock for %u", refToNode(ord.senderRef));
+  c_sub_startstop_lock.clear(refToNode(ord.senderRef));
+}
+#endif
+
 void Dbdict::execSUB_START_REQ(Signal* signal)
 {
   jamEntry();
@@ -10378,14 +10469,17 @@ void Dbdict::execSUB_START_REQ(Signal* s
   OpSubEventPtr subbPtr;
   Uint32 errCode = 0;
 
+
+#if XXX_JONAS
   DictLockPtr loopPtr;
   if (c_dictLockQueue.first(loopPtr) &&
       loopPtr.p->lt->lockType == DictLockReq::NodeRestartLock)
   {
     jam();
-    errCode = 1405;
+    errCode = SubStartRef::BusyWithNR;
     goto busy;
   }
+#endif
 
   if (!c_opSubEvent.seize(subbPtr)) {
     errCode = SubStartRef::Busy;
@@ -10403,9 +10497,10 @@ busy:
     //      ret->setErrorNode(reference());
     ref->senderRef = reference();
     ref->errorCode = errCode;
+    ref->m_masterNodeId = c_masterNodeId;
 
     sendSignal(origSenderRef, GSN_SUB_START_REF, signal,
-	       SubStartRef::SignalLength2, JBB);
+	       SubStartRef::SL_MasterNode, JBB);
     return;
   }
 
@@ -10414,6 +10509,11 @@ busy:
     subbPtr.p->m_senderRef = req->senderRef;
     subbPtr.p->m_senderData = req->senderData;
     subbPtr.p->m_errorCode = 0;
+    subbPtr.p->m_gsn = GSN_SUB_START_REQ;
+    subbPtr.p->m_subscriptionId = req->subscriptionId;
+    subbPtr.p->m_subscriptionKey = req->subscriptionKey;
+    subbPtr.p->m_subscriberRef = req->subscriberRef;
+    subbPtr.p->m_subscriberData = req->subscriberData;
   }
   
   if (refToBlock(origSenderRef) != DBDICT) {
@@ -10421,6 +10521,22 @@ busy:
      * Coordinator
      */
     jam();
+
+    if (c_masterNodeId != getOwnNodeId())
+    {
+      jam();
+      c_opSubEvent.release(subbPtr);
+      errCode = SubStartRef::NotMaster;
+      goto busy;
+    }
+
+    if (!c_sub_startstop_lock.isclear())
+    {
+      jam();
+      c_opSubEvent.release(subbPtr);
+      errCode = SubStartRef::BusyWithNR;
+      goto busy;
+    }
     
     subbPtr.p->m_senderRef = origSenderRef; // not sure if API sets correctly
     NodeReceiverGroup rg(DBDICT, c_aliveNodes);
@@ -10450,15 +10566,16 @@ busy:
         rg.m_nodes.clear(getOwnNodeId());
       }
       sendSignal(rg, GSN_SUB_START_REQ, signal,
-                 SubStartReq::SignalLength2, JBB);
+                 SubStartReq::SignalLength, JBB);
       sendSignalWithDelay(reference(),
                           GSN_SUB_START_REQ,
-                          signal, 5000, SubStartReq::SignalLength2);
+                          signal, 5000, SubStartReq::SignalLength);
     }
     else
     {
+      c_outstanding_sub_startstop++;
       sendSignal(rg, GSN_SUB_START_REQ, signal,
-                 SubStartReq::SignalLength2, JBB);
+                 SubStartReq::SignalLength, JBB);
     }
     return;
   }
@@ -10478,7 +10595,7 @@ busy:
 #ifdef EVENT_PH3_DEBUG
     ndbout_c("DBDICT(Participant) sending GSN_SUB_START_REQ to SUMA subbPtr.i = (%d)", subbPtr.i);
 #endif
-    sendSignal(SUMA_REF, GSN_SUB_START_REQ, signal, SubStartReq::SignalLength2, JBB);
+    sendSignal(SUMA_REF, GSN_SUB_START_REQ, signal, SubStartReq::SignalLength, JBB);
   }
 }
 
@@ -10591,26 +10708,36 @@ void Dbdict::completeSubStartReq(Signal*
     return;
   }
 
-  if (subbPtr.p->m_reqTracker.hasRef()) {
+  if (subbPtr.p->m_reqTracker.hasRef())
+  {
     jam();
 #ifdef EVENT_DEBUG
     ndbout_c("SUB_START_REF");
 #endif
-    SubStartRef * ref = (SubStartRef *)signal->getDataPtrSend();
-    ref->senderRef = reference();
-    ref->errorCode = subbPtr.p->m_errorCode;
-    sendSignal(subbPtr.p->m_senderRef, GSN_SUB_START_REF,
-	       signal, SubStartRef::SignalLength, JBB);
-    if (subbPtr.p->m_reqTracker.hasConf()) {
-      //  stopStartedNodes(signal);
-    }
-    c_opSubEvent.release(subbPtr);
+
+    NodeReceiverGroup rg(DBDICT, subbPtr.p->m_reqTracker.m_confs);
+    RequestTracker & p = subbPtr.p->m_reqTracker;
+    ndbrequire(p.init<SubStopRef>(c_counterMgr, rg, GSN_SUB_STOP_REF,
+                                  subbPtr.i));
+
+    SubStopReq* req = (SubStopReq*) signal->getDataPtrSend();
+
+    req->senderRef  = reference();
+    req->senderData = subbPtr.i;
+    req->subscriptionId = subbPtr.p->m_subscriptionId;
+    req->subscriptionKey = subbPtr.p->m_subscriptionKey;
+    req->subscriberRef = subbPtr.p->m_subscriberRef;
+    req->subscriberData = subbPtr.p->m_subscriberData;
+    req->requestInfo = SubStopReq::RI_ABORT_START;
+    sendSignal(rg, GSN_SUB_STOP_REQ, signal, SubStopReq::SignalLength, JBB);
     return;
   }
 #ifdef EVENT_DEBUG
   ndbout_c("SUB_START_CONF");
 #endif
   
+  ndbrequire(c_outstanding_sub_startstop);
+  c_outstanding_sub_startstop--;
   SubStartConf* conf = (SubStartConf*)signal->getDataPtrSend();
   * conf = subbPtr.p->m_sub_start_conf;
   sendSignal(subbPtr.p->m_senderRef, GSN_SUB_START_CONF,
@@ -10630,20 +10757,6 @@ void Dbdict::execSUB_STOP_REQ(Signal* si
 
   Uint32 origSenderRef = signal->senderBlockRef();
 
-  if (refToBlock(origSenderRef) != DBDICT &&
-      getOwnNodeId() != c_masterNodeId)
-  {
-    /*
-     * Coordinator but not master
-     */
-    SubStopRef * ref = (SubStopRef *)signal->getDataPtrSend();
-    ref->senderRef = reference();
-    ref->errorCode = SubStopRef::NotMaster;
-    ref->m_masterNodeId = c_masterNodeId;
-    sendSignal(origSenderRef, GSN_SUB_STOP_REF, signal,
-	       SubStopRef::SignalLength2, JBB);
-    return;
-  }
   OpSubEventPtr subbPtr;
   Uint32 errCode = 0;
   if (!c_opSubEvent.seize(subbPtr)) {
@@ -10656,17 +10769,29 @@ busy:
     //      ret->setErrorNode(reference());
     ref->senderRef = reference();
     ref->errorCode = errCode;
+    ref->m_masterNodeId = c_masterNodeId;
 
     sendSignal(origSenderRef, GSN_SUB_STOP_REF, signal,
-	       SubStopRef::SignalLength, JBB);
+	       SubStopRef::SL_MasterNode, JBB);
     return;
   }
 
   {
-    const SubStopReq* req = (SubStopReq*) signal->getDataPtr();
+    SubStopReq* req = (SubStopReq*) signal->getDataPtr();
     subbPtr.p->m_senderRef = req->senderRef;
     subbPtr.p->m_senderData = req->senderData;
     subbPtr.p->m_errorCode = 0;
+    subbPtr.p->m_gsn = GSN_SUB_STOP_REQ;
+    subbPtr.p->m_subscriptionId = req->subscriptionId;
+    subbPtr.p->m_subscriptionKey = req->subscriptionKey;
+    subbPtr.p->m_subscriberRef = req->subscriberRef;
+    subbPtr.p->m_subscriberData = req->subscriberData;
+
+    if (signal->getLength() < SubStopReq::SignalLength)
+    {
+      jam();
+      req->requestInfo = 0;
+    }
   }
   
   if (refToBlock(origSenderRef) != DBDICT) {
@@ -10674,6 +10799,23 @@ busy:
      * Coordinator
      */
     jam();
+
+    if (c_masterNodeId != getOwnNodeId())
+    {
+      jam();
+      c_opSubEvent.release(subbPtr);
+      errCode = SubStopRef::NotMaster;
+      goto busy;
+    }
+
+    if (!c_sub_startstop_lock.isclear())
+    {
+      jam();
+      c_opSubEvent.release(subbPtr);
+      errCode = SubStopRef::BusyWithNR;
+      goto busy;
+    }
+
 #ifdef EVENT_DEBUG
     ndbout_c("SUB_STOP_REQ 1");
 #endif
@@ -10692,7 +10834,8 @@ busy:
     
     req->senderRef  = reference();
     req->senderData = subbPtr.i;
-    
+
+    c_outstanding_sub_startstop++;
     sendSignal(rg, GSN_SUB_STOP_REQ, signal, SubStopReq::SignalLength, JBB);
     return;
   }
@@ -10806,6 +10949,23 @@ void Dbdict::completeSubStopReq(Signal* 
     return;
   }
 
+  ndbrequire(c_outstanding_sub_startstop);
+  c_outstanding_sub_startstop--;
+
+  if (subbPtr.p->m_gsn == GSN_SUB_START_REQ)
+  {
+    jam();
+    SubStartRef* ref = (SubStartRef*)signal->getDataPtrSend();
+    ref->senderRef  = reference();
+    ref->senderData = subbPtr.p->m_senderData;
+    ref->errorCode  = subbPtr.p->m_errorCode;
+
+    sendSignal(subbPtr.p->m_senderRef, GSN_SUB_START_REF,
+	       signal, SubStartRef::SignalLength, JBB);
+    c_opSubEvent.release(subbPtr);
+    return;
+  }
+
   if (subbPtr.p->m_reqTracker.hasRef()) {
     jam();
 #ifdef EVENT_DEBUG
@@ -11033,6 +11193,11 @@ Dbdict::execSUB_REMOVE_REQ(Signal* signa
     subbPtr.p->m_senderRef = req->senderRef;
     subbPtr.p->m_senderData = req->senderData;
     subbPtr.p->m_errorCode = 0;
+    subbPtr.p->m_gsn = GSN_SUB_REMOVE_REQ;
+    subbPtr.p->m_subscriptionId = req->subscriptionId;
+    subbPtr.p->m_subscriptionKey = req->subscriptionKey;
+    subbPtr.p->m_subscriberRef = RNIL;
+    subbPtr.p->m_subscriberData = RNIL;
   }
 
   CRASH_INSERTION2(6010, getOwnNodeId() != c_masterNodeId);
@@ -14034,44 +14199,67 @@ void
 Dbdict::execDICT_LOCK_REQ(Signal* signal)
 {
   jamEntry();
-  const DictLockReq* req = (const DictLockReq*)&signal->theData[0];
+  const DictLockReq req = *(DictLockReq*)&signal->theData[0];
 
   // make sure bad request crashes slave, not master (us)
 
   if (getOwnNodeId() != c_masterNodeId) {
     jam();
-    sendDictLockRef(signal, *req, DictLockRef::NotMaster);
+    sendDictLockRef(signal, req, DictLockRef::NotMaster);
+    return;
+  }
+
+  if (req.lockType == DictLockReq::SumaStartMe)
+  {
+    if (c_outstanding_sub_startstop)
+    {
+      jam();
+      g_eventLogger.info("refing dict lock to %u", refToNode(req.userRef));
+      sendDictLockRef(signal, req, DictLockRef::TooManyRequests);
+      return;
+    }
+
+    c_sub_startstop_lock.set(refToNode(req.userRef));
+
+    g_eventLogger.info("granting dict lock to %u", refToNode(req.userRef));
+    DictLockConf* conf = (DictLockConf*)signal->getDataPtrSend();
+    conf->userPtr = req.userPtr;
+    conf->lockType = req.lockType;
+    conf->lockPtr = 0;
+    sendSignal(req.userRef, GSN_DICT_LOCK_CONF, signal,
+               DictLockConf::SignalLength, JBB);
     return;
   }
 
-  const DictLockType* lt = getDictLockType(req->lockType);
+  const DictLockType* lt = getDictLockType(req.lockType);
   if (lt == NULL) {
     jam();
-    sendDictLockRef(signal, *req, DictLockRef::InvalidLockType);
+    sendDictLockRef(signal, req, DictLockRef::InvalidLockType);
     return;
   }
 
-  if (req->userRef != signal->getSendersBlockRef() ||
-      getNodeInfo(refToNode(req->userRef)).m_type != NodeInfo::DB) {
+  if (req.userRef != signal->getSendersBlockRef() ||
+      getNodeInfo(refToNode(req.userRef)).m_type != NodeInfo::DB) {
     jam();
-    sendDictLockRef(signal, *req, DictLockRef::BadUserRef);
+    sendDictLockRef(signal, req, DictLockRef::BadUserRef);
     return;
   }
 
-  if (c_aliveNodes.get(refToNode(req->userRef))) {
+
+  if (c_aliveNodes.get(refToNode(req.userRef))) {
     jam();
-    sendDictLockRef(signal, *req, DictLockRef::TooLate);
+    sendDictLockRef(signal, req, DictLockRef::TooLate);
     return;
   }
 
   DictLockPtr lockPtr;
   if (! c_dictLockQueue.seize(lockPtr)) {
     jam();
-    sendDictLockRef(signal, *req, DictLockRef::TooManyRequests);
+    sendDictLockRef(signal, req, DictLockRef::TooManyRequests);
     return;
   }
 
-  lockPtr.p->req = *req;
+  lockPtr.p->req = req;
   lockPtr.p->locked = false;
   lockPtr.p->lt = lt;
 
@@ -14146,6 +14334,17 @@ Dbdict::execDICT_UNLOCK_ORD(Signal* sign
 {
   jamEntry();
   const DictUnlockOrd* ord = (const DictUnlockOrd*)&signal->theData[0];
+
+  if (ord->lockType ==  DictLockReq::SumaStartMe)
+  {
+    ndbassert(signal->getLength() == DictUnlockOrd::SignalLengthSuma);
+    g_eventLogger.info("clearing dict lock for %u", refToNode(ord->senderRef));
+    c_sub_startstop_lock.clear(refToNode(ord->senderRef));
+    return;
+  }
+
+  ndbassert(signal->getLength() == DictUnlockOrd::SignalLengthDict ||
+            signal->getLength() == DictUnlockOrd::SignalLengthDih);
 
   DictLockPtr lockPtr;
   c_dictLockQueue.getPtr(lockPtr, ord->lockPtr);
diff -Nrup a/storage/ndb/src/kernel/blocks/dbdict/Dbdict.hpp b/storage/ndb/src/kernel/blocks/dbdict/Dbdict.hpp
--- a/storage/ndb/src/kernel/blocks/dbdict/Dbdict.hpp	2008-01-04 11:35:56 +01:00
+++ b/storage/ndb/src/kernel/blocks/dbdict/Dbdict.hpp	2008-02-20 10:04:24 +01:00
@@ -1641,6 +1641,12 @@ private:
     Uint32 m_senderRef;
     Uint32 m_senderData;
     Uint32 m_errorCode;
+
+    Uint32 m_gsn;
+    Uint32 m_subscriptionId;
+    Uint32 m_subscriptionKey;
+    Uint32 m_subscriberRef;
+    Uint32 m_subscriberData;
     union {
       SubStartConf m_sub_start_conf;
       SubStopConf m_sub_stop_conf;
@@ -2666,6 +2672,12 @@ public:
   void drop_undofile_commit_complete(Signal* signal, SchemaOp*);
   
   int checkSingleUserMode(Uint32 senderRef);
+
+  Uint32 c_outstanding_sub_startstop;
+  NdbNodeBitmask c_sub_startstop_lock;
+
+protected:
+  virtual bool getParam(const char * param, Uint32 * retVal);
 };
 
 inline bool
diff -Nrup a/storage/ndb/src/kernel/blocks/dbdih/DbdihMain.cpp b/storage/ndb/src/kernel/blocks/dbdih/DbdihMain.cpp
--- a/storage/ndb/src/kernel/blocks/dbdih/DbdihMain.cpp	2008-01-24 12:20:55 +01:00
+++ b/storage/ndb/src/kernel/blocks/dbdih/DbdihMain.cpp	2008-02-20 10:04:24 +01:00
@@ -16499,7 +16499,7 @@ Dbdih::sendDictUnlockOrd(Signal* signal,
 
   BlockReference dictMasterRef = calcDictBlockRef(cmasterNodeId);
   sendSignal(dictMasterRef, GSN_DICT_UNLOCK_ORD, signal,
-      DictUnlockOrd::SignalLength, JBB);
+      DictUnlockOrd::SignalLengthDih, JBB);
 }
 
 #ifdef ERROR_INSERT
diff -Nrup a/storage/ndb/src/kernel/blocks/qmgr/Qmgr.hpp b/storage/ndb/src/kernel/blocks/qmgr/Qmgr.hpp
--- a/storage/ndb/src/kernel/blocks/qmgr/Qmgr.hpp	2007-11-23 11:06:27 +01:00
+++ b/storage/ndb/src/kernel/blocks/qmgr/Qmgr.hpp	2008-02-20 10:04:24 +01:00
@@ -91,6 +91,7 @@ public:
     NORMAL = 0,
     WAITING_FOR_FAILCONF1 = 1,
     WAITING_FOR_FAILCONF2 = 2,
+    WAITING_FOR_FAILCONF3 = 3,
     WAITING_FOR_NDB_FAILCONF = 3
   };
 
@@ -154,7 +155,6 @@ public:
     QmgrState sendCommitFailReqStatus;
     QmgrState sendPresToStatus;
     FailState failState;
-    BlockReference rcv[2];        // remember which failconf we have received
     BlockReference blockRef;
 
     NodeRec() { }
diff -Nrup a/storage/ndb/src/kernel/blocks/qmgr/QmgrMain.cpp b/storage/ndb/src/kernel/blocks/qmgr/QmgrMain.cpp
--- a/storage/ndb/src/kernel/blocks/qmgr/QmgrMain.cpp	2007-11-23 11:09:28 +01:00
+++ b/storage/ndb/src/kernel/blocks/qmgr/QmgrMain.cpp	2008-02-20 10:04:24 +01:00
@@ -265,8 +265,6 @@ Qmgr::execSTART_ORD(Signal* signal)
     nodePtr.p->sendCommitFailReqStatus = Q_NOT_ACTIVE;
     nodePtr.p->sendPresToStatus = Q_NOT_ACTIVE;
     nodePtr.p->failState = NORMAL;
-    nodePtr.p->rcv[0] = 0;
-    nodePtr.p->rcv[1] = 0;
   }//for
 }
 
@@ -2570,30 +2568,26 @@ void Qmgr::execAPI_FAILCONF(Signal* sign
   failedNodePtr.i = signal->theData[0];  
   ptrCheckGuard(failedNodePtr, MAX_NODES, nodeRec);
 
-  if (failedNodePtr.p->failState == WAITING_FOR_FAILCONF1){
+  if (failedNodePtr.p->failState == WAITING_FOR_FAILCONF1)
+  {
     jam();
-
-    failedNodePtr.p->rcv[0] = signal->theData[1];
     failedNodePtr.p->failState = WAITING_FOR_FAILCONF2;
-
-  } else if (failedNodePtr.p->failState == WAITING_FOR_FAILCONF2) {
-    failedNodePtr.p->rcv[1] = signal->theData[1];
+  }
+  else if (failedNodePtr.p->failState == WAITING_FOR_FAILCONF2)
+  {
+    jam();
+    failedNodePtr.p->failState = WAITING_FOR_FAILCONF3;
+  }
+  else if (failedNodePtr.p->failState == WAITING_FOR_FAILCONF3)
+  {
+    jam();
     failedNodePtr.p->failState = NORMAL;
-
-    if (failedNodePtr.p->rcv[0] == failedNodePtr.p->rcv[1]) {
-      jam();
-      systemErrorLab(signal, __LINE__);
-    } else {
-      jam();
-      failedNodePtr.p->rcv[0] = 0;
-      failedNodePtr.p->rcv[1] = 0;
-    }//if
-  } else {
+  }
+  else
+  {
     jam();
-#ifdef VM_TRACE
     ndbout << "failedNodePtr.p->failState = "
 	   << (Uint32)(failedNodePtr.p->failState) << endl;
-#endif   
     systemErrorLab(signal, __LINE__);
   }//if
   return;
@@ -2820,7 +2814,7 @@ Qmgr::api_failed(Signal* signal, Uint32 
     signal->theData[0] = nodeId;
     signal->theData[1] = QMGR_REF;
     sendSignal(SUMA_REF, GSN_API_FAILREQ, signal, 2, JBA);
-    failedNodePtr.p->failState = NORMAL;
+    failedNodePtr.p->failState = WAITING_FOR_FAILCONF3;
   }
 
   failedNodePtr.p->phase = ZFAIL_CLOSING;
@@ -5152,6 +5146,12 @@ Qmgr::execDUMP_STATE_ORD(Signal* signal)
     c_error_insert_extra = signal->theData[1];
   }
 #endif
+
+  if (signal->theData[0] == 900 && signal->getLength() == 2)
+  {
+    ndbout_c("disconnecting %u", signal->theData[1]);
+    api_failed(signal, signal->theData[1]);
+  }
 }//Qmgr::execDUMP_STATE_ORD()
 
 
diff -Nrup a/storage/ndb/src/kernel/blocks/suma/Suma.cpp b/storage/ndb/src/kernel/blocks/suma/Suma.cpp
--- a/storage/ndb/src/kernel/blocks/suma/Suma.cpp	2008-02-19 09:10:33 +01:00
+++ b/storage/ndb/src/kernel/blocks/suma/Suma.cpp	2008-02-20 10:04:24 +01:00
@@ -45,12 +45,16 @@
 #include <signaldata/DihFragCount.hpp>
 #include <signaldata/SystemError.hpp>
 
+#include <signaldata/DictLock.hpp>
 #include <ndbapi/NdbDictionary.hpp>
 
 #include <DebuggerNames.hpp>
 #include <../dbtup/Dbtup.hpp>
 #include <../dbdih/Dbdih.hpp>
 
+#include <EventLogger.hpp>
+extern EventLogger g_eventLogger;
+
 //#define HANDOVER_DEBUG
 //#define NODEFAIL_DEBUG
 //#define NODEFAIL_DEBUG2
@@ -90,6 +94,8 @@ static const Uint32 MAX_CONCURRENT_GCP =
 
 #define PRINT_ONLY 0
 
+#include <ndb_version.h>
+
 void
 Suma::getNodeGroupMembers(Signal* signal)
 {
@@ -177,9 +183,36 @@ Suma::execREAD_CONFIG_REQ(Signal* signal
   c_tables.setSize(noTables);
   
   c_subscriptions.setSize(noTables);
-  c_subscriberPool.setSize(2*noTables);
+
+  Uint32 cnt = 0;
+  cnt = 0;
+  ndb_mgm_get_int_parameter(p, CFG_DB_SUBSCRIPTIONS, &cnt);
+  if (cnt == 0)
+  {
+    jam();
+    cnt = noTables;
+  }
+  c_subscriptionPool.setSize(cnt);
+
+  cnt *= 2;
+  {
+    Uint32 val = 0;
+    ndb_mgm_get_int_parameter(p, CFG_DB_SUBSCRIBERS, &val);
+    if (val)
+    {
+      jam();
+      cnt =  val;
+    }
+  }
+  c_subscriberPool.setSize(cnt);
+
+  cnt = 0;
+  ndb_mgm_get_int_parameter(p, CFG_DB_SUB_OPERATIONS, &cnt);
+  if (cnt)
+    c_subOpPool.setSize(cnt);
+  else
+    c_subOpPool.setSize(256);
   
-  c_subscriptionPool.setSize(noTables);
   c_syncPool.setSize(2);
   c_dataBufferPool.setSize(noAttrs);
 
@@ -187,12 +220,19 @@ Suma::execREAD_CONFIG_REQ(Signal* signal
 
   // Calculate needed gcp pool as 10 records + the ones needed
   // during a possible api timeout
-  Uint32 dbApiHbInterval, gcpInterval;
+  Uint32 dbApiHbInterval, gcpInterval, microGcpInterval = 0;
   ndb_mgm_get_int_parameter(p, CFG_DB_API_HEARTBEAT_INTERVAL,
 			    &dbApiHbInterval);
   ndb_mgm_get_int_parameter(p, CFG_DB_GCP_INTERVAL,
                             &gcpInterval);
-  c_gcp_pool.setSize(10 + (4*dbApiHbInterval)/gcpInterval);
+  ndb_mgm_get_int_parameter(p, CFG_DB_MICRO_GCP_INTERVAL,
+                            &microGcpInterval);
+
+  if (microGcpInterval)
+  {
+    gcpInterval = microGcpInterval;
+  }
+  c_gcp_pool.setSize(10 + (4*dbApiHbInterval+gcpInterval-1)/gcpInterval);
   
   c_page_chunk_pool.setSize(50);
 
@@ -270,16 +310,20 @@ Suma::execSTTOR(Signal* signal) {
       DBUG_VOID_RETURN;
     }
 
-    c_startup.m_restart_server_node_id = 0;    
     getNodeGroupMembers(signal);
     if (typeOfStart == NodeState::ST_NODE_RESTART ||
 	typeOfStart == NodeState::ST_INITIAL_NODE_RESTART)
     {
       jam();
-      
-      send_start_me_req(signal);
+
+      send_dict_lock_req(signal);
       return;
     }
+    else
+    {
+      jam();
+      c_startup.m_restart_server_node_id = 0;
+    }
   }
   
   if(startphase == 7)
@@ -358,6 +402,49 @@ Suma::execSTTOR(Signal* signal) {
 }
 
 void
+Suma::send_dict_lock_req(Signal* signal)
+{
+  if (ndbd_suma_dictlock(getNodeInfo(c_masterNodeId).m_version))
+  {
+    jam();
+    DictLockReq* req = (DictLockReq*)signal->getDataPtrSend();
+    req->lockType = DictLockReq::SumaStartMe;
+    req->userPtr = 0;
+    req->userRef = reference();
+    sendSignal(calcDictBlockRef(c_masterNodeId),
+               GSN_DICT_LOCK_REQ, signal, DictLockReq::SignalLength, JBB);
+  }
+  else
+  {
+    jam();
+    c_startup.m_restart_server_node_id = 0;
+    send_start_me_req(signal);
+  }
+}
+
+void
+Suma::execDICT_LOCK_CONF(Signal* signal)
+{
+  jamEntry();
+  c_startup.m_restart_server_node_id = 0;
+
+  CRASH_INSERTION(13034);
+  send_start_me_req(signal);
+}
+
+void
+Suma::execDICT_LOCK_REF(Signal* signal)
+{
+  jamEntry();
+
+  DictLockRef* ref = (DictLockRef*)signal->getDataPtr();
+
+  ndbrequire(ref->errorCode == DictLockRef::TooManyRequests);
+  signal->theData[0] = SumaContinueB::RETRY_DICT_LOCK;
+  sendSignalWithDelay(reference(), GSN_CONTINUEB, signal, 300, 1);
+}
+
+void
 Suma::send_start_me_req(Signal* signal)
 {
   Uint32 nodeId= c_startup.m_restart_server_node_id;
@@ -385,7 +472,22 @@ void
 Suma::execSUMA_START_ME_REF(Signal* signal)
 {
   const SumaStartMeRef* ref= (SumaStartMeRef*)signal->getDataPtr();
-  ndbrequire(ref->errorCode == SumaStartMeRef::Busy);
+
+  Uint32 error = ref->errorCode;
+  if (error != SumaStartMeRef::Busy)
+  {
+    jam();
+    // for some reason we did not manage to create a subscription
+    // on the starting node
+    SystemError * const sysErr = (SystemError*)&signal->theData[0];
+    sysErr->errorCode = SystemError::CopySubscriptionRef;
+    sysErr->errorRef = reference();
+    sysErr->data[0] = error;
+    sysErr->data[1] = 0;
+    sendSignal(NDBCNTR_REF, GSN_SYSTEM_ERROR, signal,
+               SystemError::SignalLength, JBB);
+    return;
+  }
 
   infoEvent("Suma: node %d refused %d", 
 	    c_startup.m_restart_server_node_id, ref->errorCode);
@@ -400,6 +502,18 @@ Suma::execSUMA_START_ME_CONF(Signal* sig
   infoEvent("Suma: node %d has completed restoring me", 
 	    c_startup.m_restart_server_node_id);
   sendSTTORRY(signal);  
+
+  if (ndbd_suma_dictlock(getNodeInfo(c_masterNodeId).m_version))
+  {
+    jam();
+    DictUnlockOrd* ord = (DictUnlockOrd*)signal->getDataPtrSend();
+    ord->lockPtr = 0;
+    ord->lockType = DictLockReq::SumaStartMe;
+    ord->senderData = 0;
+    ord->senderRef = reference();
+    sendSignal(calcDictBlockRef(c_masterNodeId),
+               GSN_DICT_UNLOCK_ORD, signal, DictUnlockOrd::SignalLengthSuma, JBB);
+  }
   c_startup.m_restart_server_node_id= 0;
 }
 
@@ -570,6 +684,23 @@ Suma::execCONTINUEB(Signal* signal){
   case SumaContinueB::OUT_OF_BUFFER_RELEASE:
     out_of_buffer_release(signal, signal->theData[1]);
     return;
+  case SumaContinueB::API_FAIL_GCI_LIST:
+    api_fail_gci_list(signal, signal->theData[1]);
+    return;
+  case SumaContinueB::API_FAIL_SUBSCRIBER_LIST:
+    api_fail_subscriber_list(signal,
+                             signal->theData[1]);
+    return;
+  case SumaContinueB::API_FAIL_SUBSCRIPTION:
+    api_fail_subscription(signal);
+    return;
+  case SumaContinueB::SUB_STOP_REQ:
+    sub_stop_req(signal);
+    return;
+  case SumaContinueB::RETRY_DICT_LOCK:
+    jam();
+    send_dict_lock_req(signal);
+    return;
   }
 }
 
@@ -584,182 +715,273 @@ void Suma::execAPI_FAILREQ(Signal* signa
   jamEntry();
   DBUG_ENTER("Suma::execAPI_FAILREQ");
   Uint32 failedApiNode = signal->theData[0];
-  //BlockReference retRef = signal->theData[1];
+  BlockReference retRef = signal->theData[1];
 
-  if (c_startup.m_restart_server_node_id &&
-      c_startup.m_restart_server_node_id != RNIL)
-  {
-    jam();
-    sendSignalWithDelay(reference(), GSN_API_FAILREQ, signal,
-                        200, signal->getLength());
-    return;
-  }
+  c_connected_nodes.clear(failedApiNode);
 
   if (c_failedApiNodes.get(failedApiNode))
   {
     jam();
-    return;
+    goto CONF;
   }
 
   if (!c_subscriber_nodes.get(failedApiNode))
   {
     jam();
-    return;
+    goto CONF;
   }
 
   c_failedApiNodes.set(failedApiNode);
-  c_connected_nodes.clear(failedApiNode);
-  bool found = removeSubscribersOnNode(signal, failedApiNode);
+  c_subscriber_nodes.clear(failedApiNode);
+  
+  check_start_handover(signal);
 
-  if(!found){
-    jam();
-    c_failedApiNodes.clear(failedApiNode);
-  }
+  signal->theData[0] = SumaContinueB::API_FAIL_GCI_LIST;
+  signal->theData[1] = failedApiNode;
+  sendSignal(SUMA_REF, GSN_CONTINUEB, signal, 2, JBB);
+  return;
+
+CONF:
+  signal->theData[0] = failedApiNode;
+  signal->theData[1] = reference();
+  sendSignal(retRef, GSN_API_FAILCONF, signal, 2, JBB);
+
+  DBUG_VOID_RETURN;
+}//execAPI_FAILREQ()
+
+void
+Suma::api_fail_gci_list(Signal* signal, Uint32 nodeId)
+{
+  jam();
 
-  SubGcpCompleteAck * const ack = (SubGcpCompleteAck*)signal->getDataPtr();
   Ptr<Gcp_record> gcp;
-  for(c_gcp_list.first(gcp); !gcp.isNull(); c_gcp_list.next(gcp))
+  if (c_gcp_list.first(gcp))
   {
     jam();
-    ack->rep.gci_hi = gcp.p->m_gci >> 32;
-    ack->rep.gci_lo = gcp.p->m_gci & 0xFFFFFFFF;
-    if(gcp.p->m_subscribers.get(failedApiNode))
+    gcp.p->m_subscribers.bitAND(c_subscriber_nodes);
+
+    if (gcp.p->m_subscribers.isclear())
     {
       jam();
-      gcp.p->m_subscribers.clear(failedApiNode);
-      ack->rep.senderRef = numberToRef(0, failedApiNode);
-      sendSignal(SUMA_REF, GSN_SUB_GCP_COMPLETE_ACK, signal, 
-		 SubGcpCompleteAck::SignalLength, JBB);
+
+      SubGcpCompleteAck* ack = (SubGcpCompleteAck*)signal->getDataPtrSend();
+      ack->rep.gci_hi = Uint32(gcp.p->m_gci >> 32);
+      ack->rep.gci_lo = Uint32(gcp.p->m_gci);
+      ack->rep.senderRef = reference();
+      NodeReceiverGroup rg(SUMA, c_nodes_in_nodegroup_mask);
+      sendSignal(rg, GSN_SUB_GCP_COMPLETE_ACK, signal,
+                 SubGcpCompleteAck::SignalLength, JBB);
+
+      c_gcp_list.release(gcp);
+
+      signal->theData[0] = SumaContinueB::API_FAIL_GCI_LIST;
+      signal->theData[1] = nodeId;
+      sendSignal(SUMA_REF, GSN_CONTINUEB, signal, 2, JBB);
+      return;
     }
   }
 
-  c_subscriber_nodes.clear(failedApiNode);
-  
-  check_start_handover(signal);
+  if (ERROR_INSERTED(13023))
+  {
+    CLEAR_ERROR_INSERT_VALUE;
+  }
 
-  DBUG_VOID_RETURN;
-}//execAPI_FAILREQ()
+  signal->theData[0] = SumaContinueB::API_FAIL_SUBSCRIBER_LIST;
+  signal->theData[1] = nodeId;
+  signal->theData[2] = RNIL; // SubOpPtr
+  signal->theData[3] = RNIL; // c_subscribers bucket
+  signal->theData[4] = RNIL; // subscriptionId
+  signal->theData[5] = RNIL; // SubscriptionKey
 
-bool
-Suma::removeSubscribersOnNode(Signal *signal, Uint32 nodeId)
-{
-  DBUG_ENTER("Suma::removeSubscribersOnNode");
-  bool found = false;
+  Ptr<SubOpRecord> subOpPtr;
+  if (c_subOpPool.seize(subOpPtr))
+  {
+    signal->theData[2] = subOpPtr.i;
+    sendSignal(SUMA_REF, GSN_CONTINUEB, signal, 6, JBB);
+  }
+  else
+  {
+    sendSignal(SUMA_REF, GSN_CONTINUEB, signal, 3, JBB);
+  }
+
+  return;
+}
 
-  KeyTable<Table>::Iterator it;
-  LINT_INIT(it.bucket);
-  LINT_INIT(it.curr.p);
-  for(c_tables.first(it);!it.isNull();c_tables.next(it))
+void
+Suma::api_fail_subscriber_list(Signal* signal, Uint32 nodeId)
+{
+  jam();
+  Ptr<SubOpRecord> subOpPtr;
+  subOpPtr.i = signal->theData[2];
+  if (subOpPtr.i == RNIL)
   {
-    LocalDLList<Subscriber> subbs(c_subscriberPool,it.curr.p->c_subscribers);
-    SubscriberPtr i_subbPtr;
-    for(subbs.first(i_subbPtr);!i_subbPtr.isNull();)
+    if (c_subOpPool.seize(subOpPtr))
     {
-      SubscriberPtr subbPtr = i_subbPtr;
-      subbs.next(i_subbPtr);
-      jam();
-      if (refToNode(subbPtr.p->m_senderRef) == nodeId) {
-	jam();
-	subbs.remove(subbPtr);
-	c_removeDataSubscribers.add(subbPtr);
-	found = true;
-      }
+      signal->theData[3] = RNIL;
     }
-    if (subbs.isEmpty())
+    else
     {
-      // ToDo handle this
+      jam();
+      sendSignal(SUMA_REF, GSN_CONTINUEB, signal, 3, JBB);
+      return;
     }
   }
-  if(found){
+  else
+  {
     jam();
-    sendSubStopReq(signal);
+    c_subOpPool.getPtr(subOpPtr);
   }
-  DBUG_RETURN(found);
-}
 
-void
-Suma::sendSubStopReq(Signal *signal, bool unlock){
-  static bool remove_lock = false;
-  jam();
-  DBUG_ENTER("Suma::sendSubStopReq");
+  Uint32 bucket = signal->theData[3];
+  Uint32 subscriptionId = signal->theData[4];
+  Uint32 subscriptionKey = signal->theData[5];
 
-  SubscriberPtr subbPtr;
-  c_removeDataSubscribers.first(subbPtr);
-  if (subbPtr.isNull()){
+  DLHashTable<Subscription>::Iterator iter;
+  if (bucket == RNIL)
+  {
+    jam();
+    c_subscriptions.first(iter);
+  }
+  else
+  {
     jam();
-#if 0
-    signal->theData[0] = failedApiNode;
-    signal->theData[1] = reference();
-    sendSignal(retRef, GSN_API_FAILCONF, signal, 2, JBB);
-#endif
-    c_failedApiNodes.clear();
 
-    remove_lock = false;
-    DBUG_VOID_RETURN;
+    Subscription key;
+    key.m_subscriptionId = subscriptionId;
+    key.m_subscriptionKey = subscriptionKey;
+    if (c_subscriptions.find(iter.curr, key) == false)
+    {
+      jam();
+      /**
+       * We restart from this bucket :-(
+       */
+      c_subscriptions.next(bucket, iter);
+    }
+    else
+    {
+      iter.bucket = bucket;
+    }
   }
 
-  if(remove_lock && !unlock) {
+  if (iter.curr.isNull())
+  {
     jam();
-    DBUG_VOID_RETURN;
+    signal->theData[0] = nodeId;
+    signal->theData[1] = reference();
+    sendSignal(QMGR_REF, GSN_API_FAILCONF, signal, 2, JBB);
+    c_failedApiNodes.clear(nodeId);
+    return;
   }
-  remove_lock = true;
 
-  SubscriptionPtr subPtr;
-  c_subscriptions.getPtr(subPtr, subbPtr.p->m_subPtrI);
+  subOpPtr.p->m_opType = SubOpRecord::R_API_FAIL_REQ;
+  subOpPtr.p->m_subPtrI = iter.curr.i;
+  subOpPtr.p->m_senderRef = nodeId;
+  subOpPtr.p->m_senderData = iter.bucket;
 
-  SubStopReq * const req = (SubStopReq*)signal->getDataPtrSend();
-  req->senderRef       = reference();
-  req->senderData      = subbPtr.i;
-  req->subscriberRef   = subbPtr.p->m_senderRef;
-  req->subscriberData  = subbPtr.p->m_senderData;
-  req->subscriptionId  = subPtr.p->m_subscriptionId;
-  req->subscriptionKey = subPtr.p->m_subscriptionKey;
-  req->part = SubscriptionData::TableData;
+  LocalDLFifoList<SubOpRecord> list(c_subOpPool, iter.curr.p->m_stop_req);
+  bool empty = list.isEmpty();
+  list.add(subOpPtr);
 
-  sendSignal(SUMA_REF,GSN_SUB_STOP_REQ,signal,SubStopReq::SignalLength,JBB);
-  DBUG_VOID_RETURN;
+  if (empty)
+  {
+    signal->theData[0] = SumaContinueB::API_FAIL_SUBSCRIPTION;
+    signal->theData[1] = subOpPtr.i;
+    signal->theData[2] = RNIL;
+    sendSignal(SUMA_REF, GSN_CONTINUEB, signal, 3, JBB);
+  }
 }
 
 void
-Suma::execSUB_STOP_CONF(Signal* signal){
-  jamEntry();
-  DBUG_ENTER("Suma::execSUB_STOP_CONF");
-  ndbassert(signal->getNoOfSections() == 0);
-  sendSubStopReq(signal,true);
-  DBUG_VOID_RETURN;
-}
+Suma::api_fail_subscription(Signal* signal)
+{
+  jam();
+  Ptr<SubOpRecord> subOpPtr;
+  c_subOpPool.getPtr(subOpPtr, signal->theData[1]);
 
-void
-Suma::execSUB_STOP_REF(Signal* signal){
-  jamEntry();
-  DBUG_ENTER("Suma::execSUB_STOP_REF");
-  ndbassert(signal->getNoOfSections() == 0);
+  Uint32 nodeId = subOpPtr.p->m_senderRef;
+
+  Ptr<Subscription> subPtr;
+  c_subscriptionPool.getPtr(subPtr, subOpPtr.p->m_subPtrI);
+
+  Ptr<Subscriber> ptr;
+  {
+    LocalDLList<Subscriber> list(c_subscriberPool, subPtr.p->m_subscribers);
+    if (signal->theData[2] == RNIL)
+    {
+      jam();
+      list.first(ptr);
+    }
+    else
+    {
+      jam();
+      list.getPtr(ptr, signal->theData[2]);
+    }
 
-  SubStopRef * const ref = (SubStopRef*)signal->getDataPtr();
+    for (Uint32 i = 0; i<32 && !ptr.isNull(); i++)
+    {
+      jam();
+      if (refToNode(ptr.p->m_senderRef) == nodeId)
+      {
+        jam();
 
-  Uint32 senderData      = ref->senderData;
-  Uint32 subscriptionId  = ref->subscriptionId;
-  Uint32 subscriptionKey = ref->subscriptionKey;
-  Uint32 part            = ref->part;
-  Uint32 subscriberData  = ref->subscriberData;
-  Uint32 subscriberRef   = ref->subscriberRef;
+        Ptr<Subscriber> tmp = ptr;
+        list.next(ptr);
+        list.remove(tmp);
+        
+        /**
+         * NOTE: remove before...so we done send UNSUBSCRIBE to self (yuck)
+         */
+        bool report = subPtr.p->m_options & Subscription::REPORT_SUBSCRIBE;
 
-  if(ref->errorCode != 1411){
-    ndbrequire(false);
+        send_sub_start_stop_event(signal, tmp, NdbDictionary::Event::_TE_STOP,
+                                  report, list);
+        
+        c_subscriberPool.release(tmp);
+      }
+      else
+      {
+        jam();
+        list.next(ptr);
+      }
+    }
   }
 
-  SubStopReq * const req = (SubStopReq*)signal->getDataPtrSend();
-  req->senderRef       = reference();
-  req->senderData      = senderData;
-  req->subscriberRef   = subscriberRef;
-  req->subscriberData  = subscriberData;
-  req->subscriptionId  = subscriptionId;
-  req->subscriptionKey = subscriptionKey;
-  req->part = part;
+  if (!ptr.isNull())
+  {
+    jam();
+    signal->theData[0] = SumaContinueB::API_FAIL_SUBSCRIPTION;
+    signal->theData[1] = subOpPtr.i;
+    signal->theData[2] = ptr.i;
+    sendSignal(SUMA_REF, GSN_CONTINUEB, signal, 3, JBB);
+    return;
+  }
 
-  sendSignal(SUMA_REF,GSN_SUB_STOP_REQ,signal,SubStopReq::SignalLength,JBB);
+  // Start potential waiter(s)
+  check_remove_queue(signal, subPtr, subOpPtr, true, false);
+  check_release_subscription(signal, subPtr);
+
+  // Continue iterating through subscriptions
+  DLHashTable<Subscription>::Iterator iter;
+  iter.bucket = subOpPtr.p->m_senderData;
+  iter.curr = subPtr;
+
+  if (c_subscriptions.next(iter))
+  {
+    signal->theData[0] = SumaContinueB::API_FAIL_SUBSCRIBER_LIST;
+    signal->theData[1] = nodeId;
+    signal->theData[2] = subOpPtr.i;
+    signal->theData[3] = iter.bucket;
+    signal->theData[4] = iter.curr.p->m_subscriptionId; // subscriptionId
+    signal->theData[5] = iter.curr.p->m_subscriptionKey; // SubscriptionKey
+    sendSignal(SUMA_REF, GSN_CONTINUEB, signal, 6, JBB);
+    return;
+  }
 
-  DBUG_VOID_RETURN;
+  c_subOpPool.release(subOpPtr);
+  signal->theData[0] = nodeId;
+  signal->theData[1] = reference();
+  sendSignal(QMGR_REF, GSN_API_FAILCONF, signal, 2, JBB);
+  c_failedApiNodes.clear(nodeId);
 }
 
 void
@@ -771,9 +993,22 @@ Suma::execNODE_FAILREP(Signal* signal){
   const NodeFailRep * rep = (NodeFailRep*)signal->getDataPtr();
   NdbNodeBitmask failed; failed.assign(NdbNodeBitmask::Size, rep->theNodes);
   
-  if(failed.get(Restart.nodeId))
+  if(c_restart.m_ref && failed.get(refToNode(c_restart.m_ref)))
   {
-    Restart.resetRestart(signal);
+    jam();
+
+    if (c_restart.m_waiting_on_self)
+    {
+      jam();
+      c_restart.m_abort = 1;
+    }
+    else
+    {
+      jam();
+      Ptr<Subscription> subPtr;
+      c_subscriptionPool.getPtr(subPtr, c_restart.m_subPtrI);
+      abort_start_me(signal, subPtr, false);
+    }
   }
 
   if (ERROR_INSERTED(13032))
@@ -854,19 +1089,6 @@ Suma::execSIGNAL_DROPPED_REP(Signal* sig
  *
  */
 
-static unsigned
-count_subscribers(const DLList<Suma::Subscriber> &subs)
-{
-  unsigned n= 0;
-  Suma::SubscriberPtr i_subbPtr;
-  subs.first(i_subbPtr);
-  while(!i_subbPtr.isNull()){
-    n++;
-    subs.next(i_subbPtr);
-  }
-  return n;
-}
-
 void
 Suma::execDUMP_STATE_ORD(Signal* signal){
   jamEntry();
@@ -923,16 +1145,16 @@ Suma::execDUMP_STATE_ORD(Signal* signal)
 	      c_dataBufferPool.getSize(),
 	      c_dataBufferPool.getNoOfFree());
 
-    infoEvent("Suma: c_metaSubscribers count: %d",
-	      count_subscribers(c_metaSubscribers));
+    infoEvent("Suma: c_subOpPool  size: %d free: %d",
+	      c_subOpPool.getSize(),
+	      c_subOpPool.getNoOfFree());
+
 #if 0
     infoEvent("Suma: c_dataSubscribers count: %d",
 	      count_subscribers(c_dataSubscribers));
     infoEvent("Suma: c_prepDataSubscribers count: %d",
 	      count_subscribers(c_prepDataSubscribers));
 #endif
-    infoEvent("Suma: c_removeDataSubscribers count: %d",
-	      count_subscribers(c_removeDataSubscribers));
   }
 
   if(tCase == 8005)
@@ -1016,21 +1238,76 @@ Suma::execDUMP_STATE_ORD(Signal* signal)
         return;
       }
 
-      infoEvent("Table: %u ver: %u #n: %u (ref,data,subscritopn)",
+      infoEvent("Table %u ver %u",
                 it.curr.p->m_tableId,
-                it.curr.p->m_schemaVersion,
-                it.curr.p->n_subscribers);
+                it.curr.p->m_schemaVersion);
 
-      Ptr<Subscriber> ptr;
-      LocalDLList<Subscriber> list(c_subscriberPool, it.curr.p->c_subscribers);
-      for (list.first(ptr); !ptr.isNull(); list.next(ptr), i++)
+      Uint32 cnt = 0;
+      Ptr<Subscription> subPtr;
+      LocalDLList<Subscription> subList(c_subscriptionPool,
+                                        it.curr.p->m_subscriptions);
+      for(subList.first(subPtr); !subPtr.isNull(); subList.next(subPtr))
       {
-        jam();
-        infoEvent(" [ %x %u %u ]", 
-                  ptr.p->m_senderRef,
-                  ptr.p->m_senderData,
-                  ptr.p->m_subPtrI);
+        infoEvent(" Subcription %u", subPtr.i);
+        {
+          Ptr<Subscriber> ptr;
+          LocalDLList<Subscriber> list(c_subscriberPool,
+                                       subPtr.p->m_subscribers);
+          for (list.first(ptr); !ptr.isNull(); list.next(ptr), i++)
+          {
+            jam();
+            cnt++;
+            infoEvent("  Subscriber [ %x %u %u ]",
+                      ptr.p->m_senderRef,
+                      ptr.p->m_senderData,
+                      subPtr.i);
+          }
+        }
+
+        {
+          Ptr<SubOpRecord> ptr;
+          LocalDLFifoList<SubOpRecord> list(c_subOpPool,
+                                       subPtr.p->m_create_req);
+
+          for (list.first(ptr); !ptr.isNull(); list.next(ptr), i++)
+          {
+            jam();
+            infoEvent("  create [ %x %u ]",
+                      ptr.p->m_senderRef,
+                      ptr.p->m_senderData);
+          }
+        }
+
+        {
+          Ptr<SubOpRecord> ptr;
+          LocalDLFifoList<SubOpRecord> list(c_subOpPool,
+                                       subPtr.p->m_start_req);
+
+          for (list.first(ptr); !ptr.isNull(); list.next(ptr), i++)
+          {
+            jam();
+            infoEvent("  start [ %x %u ]",
+                      ptr.p->m_senderRef,
+                      ptr.p->m_senderData);
+          }
+        }
+
+        {
+          Ptr<SubOpRecord> ptr;
+          LocalDLFifoList<SubOpRecord> list(c_subOpPool,
+                                        subPtr.p->m_stop_req);
+
+          for (list.first(ptr); !ptr.isNull(); list.next(ptr), i++)
+          {
+            jam();
+            infoEvent("  stop [ %u %x %u ]",
+                      ptr.p->m_opType,
+                      ptr.p->m_senderRef,
+                      ptr.p->m_senderData);
+          }
+        }
       }
+      infoEvent("Table %u #subscribers %u", it.curr.p->m_tableId, cnt);
       c_tables.next(it);
     }
 
@@ -1171,19 +1448,6 @@ Suma::sendSubIdRef(Signal* signal,
  *
  * Creation of subscriptions
  */
-
-void 
-Suma::addTableId(Uint32 tableId,
-			    SubscriptionPtr subPtr, SyncRecord *psyncRec)
-{
-  DBUG_ENTER("Suma::addTableId");
-  DBUG_PRINT("enter",("tableId: %u subPtr.i: %u", tableId, subPtr.i));
-  subPtr.p->m_tableId= tableId;
-  if(psyncRec != NULL)
-    psyncRec->m_tableList.append(&tableId, 1);
-  DBUG_VOID_RETURN;
-}
-
 void
 Suma::execSUB_CREATE_REQ(Signal* signal)
 {
@@ -1194,28 +1458,17 @@ Suma::execSUB_CREATE_REQ(Signal* signal)
 
   const SubCreateReq req = *(SubCreateReq*)signal->getDataPtr();    
   
-  const Uint32 subRef  = req.senderRef;
-  const Uint32 subData = req.senderData;
+  const Uint32 senderRef  = req.senderRef;
+  const Uint32 senderData = req.senderData;
   const Uint32 subId   = req.subscriptionId;
   const Uint32 subKey  = req.subscriptionKey;
   const Uint32 type    = req.subscriptionType & SubCreateReq::RemoveFlags;
   const Uint32 flags   = req.subscriptionType & SubCreateReq::GetFlags;
-  const bool addTableFlag = (flags & SubCreateReq::AddTableFlag) != 0;
-  const bool restartFlag  = (flags & SubCreateReq::RestartFlag)  != 0;
   const Uint32 reportAll = (flags & SubCreateReq::ReportAll) ?
     Subscription::REPORT_ALL : 0;
   const Uint32 reportSubscribe = (flags & SubCreateReq::ReportSubscribe) ?
     Subscription::REPORT_SUBSCRIBE : 0;
   const Uint32 tableId = req.tableId;
-  Subscription::State state = (Subscription::State) req.state;
-  if (signal->getLength() != SubCreateReq::SignalLength2)
-  {
-    /*
-      api or restarted by older version
-      if restarted by old version, do the best we can
-    */
-    state = Subscription::DEFINED;
-  }
 
   Subscription key;
   key.m_subscriptionId  = subId;
@@ -1226,83 +1479,201 @@ Suma::execSUB_CREATE_REQ(Signal* signal)
 
   SubscriptionPtr subPtr;
 
-  if (addTableFlag) {
-    ndbrequire(restartFlag);  //TODO remove this
+  bool found = c_subscriptions.find(subPtr, key);
 
-    if(!c_subscriptions.find(subPtr, key)) {
-      jam();
-      sendSubCreateRef(signal, 1407);
-      DBUG_VOID_RETURN;
-    }
+  if (c_startup.m_restart_server_node_id == RNIL)
+  {
+    jam();
+
+    /**
+     * We havent started syncing yet
+     */
+    sendSubCreateRef(signal, senderRef, senderData,
+                     SubCreateRef::NF_FakeErrorREF);
+    return;
+  }
+
+  CRASH_INSERTION2(13035, c_startup.m_restart_server_node_id != RNIL);
+  CRASH_INSERTION(13036);
+  
+  bool allowDup = true; //c_startup.m_restart_server_node_id;
+
+  if (found && !allowDup)
+  {
     jam();
-    if (restartFlag)
+    sendSubCreateRef(signal, senderRef, senderData,
+                     SubCreateRef::SubscriptionAlreadyExist);
+    return;
+  }
+
+  if (found == false)
+  {
+    jam();
+    if(!c_subscriptions.seize(subPtr))
     {
-      ndbrequire(type != SubCreateReq::SingleTableScan);
-      ndbrequire(req.tableId != subPtr.p->m_tableId);
-      ndbrequire(type != SubCreateReq::TableEvent);
-      addTableId(req.tableId, subPtr, 0);
+      jam();
+      sendSubCreateRef(signal, senderRef, senderData,
+                       SubCreateRef::OutOfSubscriptionRecords);
+      return;
     }
-  } else {
-    if (c_startup.m_restart_server_node_id && 
-        subRef != calcSumaBlockRef(c_startup.m_restart_server_node_id))
+
+    new (subPtr.p) Subscription();
+    subPtr.p->m_seq_no           = c_current_seq;
+    subPtr.p->m_subscriptionId   = subId;
+    subPtr.p->m_subscriptionKey  = subKey;
+    subPtr.p->m_subscriptionType = type;
+    subPtr.p->m_tableId          = tableId;
+    subPtr.p->m_table_ptrI       = RNIL;
+    subPtr.p->m_state            = Subscription::UNDEFINED;
+    subPtr.p->m_trigger_state    =  Subscription::T_UNDEFINED;
+    subPtr.p->m_triggers[0]      = ILLEGAL_TRIGGER_ID;
+    subPtr.p->m_triggers[1]      = ILLEGAL_TRIGGER_ID;
+    subPtr.p->m_triggers[2]      = ILLEGAL_TRIGGER_ID;
+    subPtr.p->m_errorCode        = 0;
+    subPtr.p->m_options          = reportSubscribe | reportAll;
+  }
+
+  Ptr<SubOpRecord> subOpPtr;
+  LocalDLFifoList<SubOpRecord> subOpList(c_subOpPool, subPtr.p->m_create_req);
+  if (subOpList.seize(subOpPtr) == false)
+  {
+    jam();
+    if (found == false)
     {
-      /**
-       * only allow "restart_server" Suma's to come through 
-       * for restart purposes
-       */
       jam();
-      sendSubCreateRef(signal, 1415);
-      DBUG_VOID_RETURN;
+      c_subscriptions.release(subPtr);
     }
-    // Check that id/key is unique
-    if(c_subscriptions.find(subPtr, key)) {
+    sendSubCreateRef(signal, senderRef, senderData,
+                     SubCreateRef::OutOfTableRecords);
+    return;
+  }
+
+  subOpPtr.p->m_senderRef = senderRef;
+  subOpPtr.p->m_senderData = senderData;
+
+  TablePtr tabPtr;
+  if (found)
+  {
+    jam();
+    c_tablePool.getPtr(tabPtr, subPtr.p->m_table_ptrI);
+  }
+  else if (c_tables.find(tabPtr, tableId))
+  {
+    jam();
+  }
+  else
+  {
+    jam();
+    if (c_tablePool.seize(tabPtr) == false)
+    {
       jam();
-      sendSubCreateRef(signal, 1415);
-      DBUG_VOID_RETURN;
+      subOpList.release(subOpPtr);
+      c_subscriptions.release(subPtr);
+      sendSubCreateRef(signal, senderRef, senderData,
+                       SubCreateRef::OutOfTableRecords);
+      return;
     }
-    if(!c_subscriptions.seize(subPtr)) {
+
+    new (tabPtr.p) Table;
+    tabPtr.p->m_tableId= tableId;
+    tabPtr.p->m_ptrI= tabPtr.i;
+    tabPtr.p->m_error = 0;
+    tabPtr.p->m_schemaVersion = RNIL;
+    tabPtr.p->m_state = Table::UNDEFINED;
+    c_tables.add(tabPtr);
+  }
+
+  if (found == false)
+  {
+    jam();
+    c_subscriptions.add(subPtr);
+    LocalDLList<Subscription> list(c_subscriptionPool,
+                                   tabPtr.p->m_subscriptions);
+    list.add(subPtr);
+    subPtr.p->m_table_ptrI = tabPtr.i;
+  }
+
+  switch(tabPtr.p->m_state){
+  case Table::DEFINED:{
+    jam();
+    // Send conf
+    subOpList.release(subOpPtr);
+    subPtr.p->m_state = Subscription::DEFINED;
+    SubCreateConf * const conf = (SubCreateConf*)signal->getDataPtrSend();
+    conf->senderRef  = reference();
+    conf->senderData = senderData;
+    sendSignal(senderRef, GSN_SUB_CREATE_CONF, signal,
+               SubCreateConf::SignalLength, JBB);
+    return;
+  }
+  case Table::UNDEFINED:{
+    jam();
+    tabPtr.p->m_state = Table::DEFINING;
+    subPtr.p->m_state = Subscription::DEFINING;
+
+    if (ERROR_INSERTED(13031))
+    {
       jam();
-      sendSubCreateRef(signal, 1412);
-      DBUG_VOID_RETURN;
+      CLEAR_ERROR_INSERT_VALUE;
+      GetTabInfoRef* ref = (GetTabInfoRef*)signal->getDataPtrSend();
+      ref->tableId = tableId;
+      ref->senderData = tabPtr.i;
+      ref->errorCode = GetTabInfoRef::TableNotDefined;
+      sendSignal(reference(), GSN_GET_TABINFOREF, signal,
+                 GetTabInfoRef::SignalLength, JBB);
+      return;
     }
-    DBUG_PRINT("info",("c_subscriptionPool  size: %d free: %d",
-		       c_subscriptionPool.getSize(),
-		       c_subscriptionPool.getNoOfFree()));
+
+    GetTabInfoReq * req = (GetTabInfoReq *)signal->getDataPtrSend();
+    req->senderRef = reference();
+    req->senderData = tabPtr.i;
+    req->requestType =
+      GetTabInfoReq::RequestById | GetTabInfoReq::LongSignalConf;
+    req->tableId = tableId;
+
+    sendSignal(DBDICT_REF, GSN_GET_TABINFOREQ, signal,
+               GetTabInfoReq::SignalLength, JBB);
+    return;
+  }
+  case Table::DEFINING:
+  {
     jam();
-    subPtr.p->m_senderRef        = subRef;
-    subPtr.p->m_senderData       = subData;
-    subPtr.p->m_subscriptionId   = subId;
-    subPtr.p->m_subscriptionKey  = subKey;
-    subPtr.p->m_subscriptionType = type;
-    subPtr.p->m_options          = reportSubscribe | reportAll;
-    subPtr.p->m_tableId          = tableId;
-    subPtr.p->m_table_ptrI       = RNIL;
-    subPtr.p->m_state            = state;
-    subPtr.p->n_subscribers      = 0;
-    subPtr.p->m_current_sync_ptrI = RNIL;
-
-    fprintf(stderr, "table %d options %x\n", subPtr.p->m_tableId, subPtr.p->m_options);
-    DBUG_PRINT("info",("Added: key.m_subscriptionId: %u, key.m_subscriptionKey: %u",
-		       key.m_subscriptionId, key.m_subscriptionKey));
+    /**
+     * just wait for completion
+     */
+    subPtr.p->m_state = Subscription::DEFINING;
+    return;
+  }
+  case Table::DROPPED:
+  {
+    subOpList.release(subOpPtr);
 
-    c_subscriptions.add(subPtr);
+    {
+      LocalDLList<Subscription> list(c_subscriptionPool,
+                                     tabPtr.p->m_subscriptions);
+      list.remove(subPtr);
+    }
+    c_subscriptions.release(subPtr);
+
+    sendSubCreateRef(signal, senderRef, senderData,
+                     SubCreateRef::TableDropped);
+    return;
+  }
   }
 
-  SubCreateConf * const conf = (SubCreateConf*)signal->getDataPtrSend();
-  conf->senderRef  = reference();
-  conf->senderData = subPtr.p->m_senderData;
-  sendSignal(subRef, GSN_SUB_CREATE_CONF, signal, SubCreateConf::SignalLength, JBB);
-  DBUG_VOID_RETURN;
+  ndbrequire(false);
 }
 
 void
-Suma::sendSubCreateRef(Signal* signal, Uint32 errCode)
+Suma::sendSubCreateRef(Signal* signal, Uint32 retRef, Uint32 data,
+                       Uint32 errCode)
 {
   jam();
   SubCreateRef * ref = (SubCreateRef *)signal->getDataPtrSend();
   ref->errorCode  = errCode;
-  sendSignal(signal->getSendersBlockRef(), GSN_SUB_CREATE_REF, signal, 
-	     SubCreateRef::SignalLength, JBB);
+  ref->senderData = data;
+  sendSignal(retRef, GSN_SUB_CREATE_REF, signal,
+  	     SubCreateRef::SignalLength, JBB);
   return;
 }
 
@@ -1316,8 +1687,7 @@ void 
 Suma::execSUB_SYNC_REQ(Signal* signal)
 {
   jamEntry();
-  DBUG_ENTER("Suma::execSUB_SYNC_REQ");
-  ndbassert(signal->getNoOfSections() <= 1);
+
   CRASH_INSERTION(13004);
 
   SubSyncReq * const req = (SubSyncReq*)signal->getDataPtr();
@@ -1327,44 +1697,37 @@ Suma::execSUB_SYNC_REQ(Signal* signal)
   key.m_subscriptionId = req->subscriptionId;
   key.m_subscriptionKey = req->subscriptionKey;
 
-  DBUG_PRINT("enter",("key.m_subscriptionId: %u, key.m_subscriptionKey: %u",
-		      key.m_subscriptionId, key.m_subscriptionKey));
-
   if(!c_subscriptions.find(subPtr, key))
   {
     jam();
-    DBUG_PRINT("info",("Not found"));
     sendSubSyncRef(signal, 1407);
-    DBUG_VOID_RETURN;
+    return;
   }
 
   bool ok = false;
   SubscriptionData::Part part = (SubscriptionData::Part)req->part;
   
   Ptr<SyncRecord> syncPtr;
-  if(!c_syncPool.seize(syncPtr))
+  LocalDLList<SyncRecord> list(c_syncPool, subPtr.p->m_syncRecords);
+  if(!list.seize(syncPtr))
   {
     jam();
     sendSubSyncRef(signal, 1416);
-    DBUG_VOID_RETURN;
+    return;
   }
-  DBUG_PRINT("info",("c_syncPool  size: %d free: %d",
-		     c_syncPool.getSize(),
-		     c_syncPool.getNoOfFree()));
-
+  
+  new (syncPtr.p) Ptr<SyncRecord>;
   syncPtr.p->m_senderRef        = req->senderRef;
   syncPtr.p->m_senderData       = req->senderData;
   syncPtr.p->m_subscriptionPtrI = subPtr.i;
   syncPtr.p->ptrI               = syncPtr.i;
   syncPtr.p->m_error            = 0;
-
-  subPtr.p->m_current_sync_ptrI = syncPtr.i;
-
+  
   {
     jam();
     syncPtr.p->m_tableList.append(&subPtr.p->m_tableId, 1);
     if(signal->getNoOfSections() > 0){
-      SegmentedSectionPtr ptr(0,0,0);
+      SegmentedSectionPtr ptr;
       signal->getSection(ptr, SubSyncReq::ATTRIBUTE_LIST);
       LocalDataBuffer<15> attrBuf(c_dataBufferPool,syncPtr.p->m_attributeList);
       append(attrBuf, ptr, getSectionSegmentPool());
@@ -1372,55 +1735,7 @@ Suma::execSUB_SYNC_REQ(Signal* signal)
     }
   }
 
-  TablePtr tabPtr;
-  initTable(signal,subPtr.p->m_tableId,tabPtr,syncPtr);
-  tabPtr.p->n_subscribers++;
-  if (subPtr.p->m_options & Subscription::REPORT_ALL)
-    tabPtr.p->m_reportAll = true;
-  DBUG_PRINT("info",("Suma::Table[%u]::n_subscribers: %u",
-		     tabPtr.p->m_tableId, tabPtr.p->n_subscribers));
-  DBUG_VOID_RETURN;
-
-  switch(part){
-  case SubscriptionData::MetaData:
-    ndbrequire(false);
-#if 0
-    ok = true;
-    jam();
-    if (subPtr.p->m_subscriptionType == SubCreateReq::DatabaseSnapshot) {
-      TableList::DataBufferIterator it;
-      syncPtr.p->m_tableList.first(it);
-      if(it.isNull()) {
-	/**
-	 * Get all tables from dict
-	 */
-	ListTablesReq * req = (ListTablesReq*)signal->getDataPtrSend();
-	req->senderRef   = reference();
-	req->senderData  = syncPtr.i;
-	req->requestData = 0;
-	/**
-	 * @todo: accomodate scan of index tables?
-	 */
-	req->setTableType(DictTabInfo::UserTable);
-
-	sendSignal(DBDICT_REF, GSN_LIST_TABLES_REQ, signal, 
-		   ListTablesReq::SignalLength, JBB);
-	break;
-      }
-    }
-
-    syncPtr.p->startMeta(signal);
-#endif
-    break;
-  case SubscriptionData::TableData: {
-    ok = true;
-    jam();
-    syncPtr.p->startScan(signal);
-    break;
-  }
-  }
-  ndbrequire(ok);
-  DBUG_VOID_RETURN;
+  syncPtr.p->startScan(signal);
 }
 
 void
@@ -1441,315 +1756,10 @@ Suma::sendSubSyncRef(Signal* signal, Uin
  * Dict interface
  */
 
-#if 0
-void
-Suma::execLIST_TABLES_CONF(Signal* signal){
-  jamEntry();
-  CRASH_INSERTION(13005);
-  ListTablesConf* const conf = (ListTablesConf*)signal->getDataPtr();
-  SyncRecord* tmp = c_syncPool.getPtr(conf->senderData);
-  tmp->runLIST_TABLES_CONF(signal);
-}
-#endif
-
-
 /*************************************************************************
  *
  *
  */
-#if 0
-void
-Suma::Table::runLIST_TABLES_CONF(Signal* signal){
-  jam();
-
-  ListTablesConf * const conf = (ListTablesConf*)signal->getDataPtr();
-  const Uint32 len = signal->length() - ListTablesConf::HeaderLength;
-
-  SubscriptionPtr subPtr;
-  suma.c_subscriptions.getPtr(subPtr, m_subscriptionPtrI);
-
-  for (unsigned i = 0; i < len; i++) {
-    subPtr.p->m_maxTables++;
-    suma.addTableId(ListTablesConf::getTableId(conf->tableData[i]), subPtr, this);
-  }
-
-  //  for (unsigned i = 0; i < len; i++)
-  //    conf->tableData[i] = ListTablesConf::getTableId(conf->tableData[i]);
-  //  m_tableList.append(&conf->tableData[0], len);
-
-#if 0 
-  TableList::DataBufferIterator it;
-  int i = 0;
-  for(m_tableList.first(it);!it.isNull();m_tableList.next(it)) {
-    ndbout_c("%u listtableconf tableid %d", i++, *it.data);
-  }
-#endif
-
-  if(len == ListTablesConf::DataLength){
-    jam();
-    // we expect more LIST_TABLE_CONF
-    return;
-  }
-
-#if 0
-  subPtr.p->m_currentTable = 0;
-  subPtr.p->m_maxTables    = 0;
-
-  TableList::DataBufferIterator it;
-  for(m_tableList.first(it); !it.isNull(); m_tableList.next(it)) {
-    subPtr.p->m_maxTables++;
-    suma.addTableId(*it.data, subPtr, NULL);
-#ifdef NODEFAIL_DEBUG
-    ndbout_c(" listtableconf tableid %d",*it.data);
-#endif
-  }
-#endif
-  
-  startMeta(signal);
-}
-#endif
-
-
-int 
-Suma::initTable(Signal *signal, Uint32 tableId, TablePtr &tabPtr,
-			   SubscriberPtr subbPtr)
-{
-  DBUG_ENTER("Suma::initTable SubscriberPtr");
-  DBUG_PRINT("enter",("tableId: %d", tableId));
-
-  int r= initTable(signal,tableId,tabPtr);
-
-  {
-    LocalDLList<Subscriber> subscribers(c_subscriberPool,
-					tabPtr.p->c_subscribers);
-    subscribers.add(subbPtr);
-  }
-
-  DBUG_PRINT("info",("added subscriber: %i", subbPtr.i));
-  
-  if (r)
-  {
-    jam();
-    // we have to wait getting tab info
-    DBUG_RETURN(1);
-  }
-
-  if (tabPtr.p->setupTrigger(signal, *this))
-  {
-    jam();
-    // we have to wait for triggers to be setup
-    DBUG_RETURN(1);
-  }
-
-  int ret = completeOneSubscriber(signal, tabPtr, subbPtr);
-  if (ret == -1)
-  {
-    jam();
-    LocalDLList<Subscriber> subscribers(c_subscriberPool,
-					tabPtr.p->c_subscribers);
-    subscribers.release(subbPtr);
-  }
-  completeInitTable(signal, tabPtr);
-  DBUG_RETURN(0);
-}
-
-int 
-Suma::initTable(Signal *signal, Uint32 tableId, TablePtr &tabPtr,
-			   Ptr<SyncRecord> syncPtr)
-{
-  jam();
-  DBUG_ENTER("Suma::initTable Ptr<SyncRecord>");
-  DBUG_PRINT("enter",("tableId: %d", tableId));
-
-  int r= initTable(signal,tableId,tabPtr);
-
-  {
-    LocalDLList<SyncRecord> syncRecords(c_syncPool,tabPtr.p->c_syncRecords);
-    syncRecords.add(syncPtr);
-  }
-
-  if (r)
-  {
-    // we have to wait getting tab info
-    DBUG_RETURN(1);
-  }
-  completeInitTable(signal, tabPtr);
-  DBUG_RETURN(0);
-}
-
-int
-Suma::initTable(Signal *signal, Uint32 tableId, TablePtr &tabPtr)
-{
-  jam();
-  DBUG_ENTER("Suma::initTable");
-
-  if (!c_tables.find(tabPtr, tableId) ||
-      tabPtr.p->m_state == Table::DROPPED ||
-      tabPtr.p->m_state == Table::ALTERED)
-  {
-    // table not being prepared
-    // seize a new table, initialize and add to c_tables
-    ndbrequire(c_tablePool.seize(tabPtr));
-    DBUG_PRINT("info",("c_tablePool  size: %d free: %d",
-		       c_tablePool.getSize(),
-		       c_tablePool.getNoOfFree()));
-    new (tabPtr.p) Table;
-
-    tabPtr.p->m_tableId= tableId;
-    tabPtr.p->m_ptrI= tabPtr.i;
-    tabPtr.p->n_subscribers = 0;
-    DBUG_PRINT("info",("Suma::Table[%u,i=%u]::n_subscribers: %u",
-		       tabPtr.p->m_tableId, tabPtr.i, tabPtr.p->n_subscribers));
-
-    tabPtr.p->m_reportAll = false;
-
-    tabPtr.p->m_error         = 0;
-    tabPtr.p->m_schemaVersion = RNIL;
-    tabPtr.p->m_state = Table::DEFINING;
-    tabPtr.p->m_drop_subbPtr.p = 0;
-    for (int j= 0; j < 3; j++)
-    {
-      tabPtr.p->m_hasTriggerDefined[j] = 0;
-      tabPtr.p->m_hasOutstandingTriggerReq[j] = 0;
-      tabPtr.p->m_triggerIds[j] = ILLEGAL_TRIGGER_ID;
-    }
-
-    c_tables.add(tabPtr);
-
-    GetTabInfoReq * req = (GetTabInfoReq *)signal->getDataPtrSend();
-    req->senderRef = reference();
-    req->senderData = tabPtr.i;
-    req->requestType = 
-      GetTabInfoReq::RequestById | GetTabInfoReq::LongSignalConf;
-    req->tableId = tableId;
-
-    DBUG_PRINT("info",("GET_TABINFOREQ id %d", req->tableId));
-
-    if (ERROR_INSERTED(13031))
-    {
-      jam();
-      CLEAR_ERROR_INSERT_VALUE;
-      GetTabInfoRef* ref = (GetTabInfoRef*)signal->getDataPtrSend();
-      ref->tableId = tableId;
-      ref->senderData = tabPtr.i;
-      ref->errorCode = GetTabInfoRef::TableNotDefined;
-      sendSignal(reference(), GSN_GET_TABINFOREF, signal, 
-		 GetTabInfoRef::SignalLength, JBB);
-      DBUG_RETURN(1);
-    }
-
-    sendSignal(DBDICT_REF, GSN_GET_TABINFOREQ, signal,
-	       GetTabInfoReq::SignalLength, JBB);
-    DBUG_RETURN(1);
-  }
-  if (tabPtr.p->m_state == Table::DEFINING)
-  {
-    DBUG_RETURN(1);
-  }
-  // ToDo should be a ref signal instead
-  ndbrequire(tabPtr.p->m_state == Table::DEFINED);
-  DBUG_RETURN(0);
-}
-
-int
-Suma::completeOneSubscriber(Signal *signal, TablePtr tabPtr, SubscriberPtr subbPtr)
-{
-  jam();
-  DBUG_ENTER("Suma::completeOneSubscriber");
-
-  if (tabPtr.p->m_error &&
-      (c_startup.m_restart_server_node_id == 0 ||
-       tabPtr.p->m_state != Table::DROPPED))
-  {
-    jam();
-    sendSubStartRef(signal,subbPtr,tabPtr.p->m_error,
-		    SubscriptionData::TableData);
-    tabPtr.p->n_subscribers--;
-    DBUG_RETURN(-1);
-  }
-  else
-  {
-    jam();
-    SubscriptionPtr subPtr;
-    c_subscriptions.getPtr(subPtr, subbPtr.p->m_subPtrI);
-    subPtr.p->m_table_ptrI= tabPtr.i;
-    sendSubStartComplete(signal,subbPtr, m_last_complete_gci + 3,
-			 SubscriptionData::TableData);
-  }
-  DBUG_RETURN(0);
-}
-
-void
-Suma::completeAllSubscribers(Signal *signal, TablePtr tabPtr)
-{
-  jam();
-  DBUG_ENTER("Suma::completeAllSubscribers");
-  // handle all subscribers
-  {
-    LocalDLList<Subscriber> subscribers(c_subscriberPool,
-					tabPtr.p->c_subscribers);
-    SubscriberPtr subbPtr;
-    for(subscribers.first(subbPtr); !subbPtr.isNull();)
-    {
-      jam();
-      Ptr<Subscriber> tmp = subbPtr;
-      subscribers.next(subbPtr);
-      int ret = completeOneSubscriber(signal, tabPtr, tmp);
-      if (ret == -1)
-      {
-	jam();
-	subscribers.release(tmp);
-      }
-    }
-  }
-  DBUG_VOID_RETURN;
-}
-
-void
-Suma::completeInitTable(Signal *signal, TablePtr tabPtr)
-{
-  jam();
-  DBUG_ENTER("Suma::completeInitTable");
-
-  // handle all syncRecords
-  while (!tabPtr.p->c_syncRecords.isEmpty())
-  {
-    Ptr<SyncRecord> syncPtr;
-    {
-      LocalDLList<SyncRecord> syncRecords(c_syncPool,
-					tabPtr.p->c_syncRecords);
-      syncRecords.first(syncPtr);
-      syncRecords.remove(syncPtr);
-    }
-    syncPtr.p->ptrI = syncPtr.i;
-    if (tabPtr.p->m_error == 0)
-    {
-      jam();
-      syncPtr.p->startScan(signal);
-    }
-    else
-    {
-      jam();
-      syncPtr.p->completeScan(signal, tabPtr.p->m_error);
-      tabPtr.p->n_subscribers--;
-    }
-  }
-  
-  if (tabPtr.p->m_error)
-  {
-    DBUG_PRINT("info",("Suma::Table[%u]::n_subscribers: %u",
-		       tabPtr.p->m_tableId, tabPtr.p->n_subscribers));
-    tabPtr.p->checkRelease(*this);
-  }
-  else
-  {
-    tabPtr.p->m_state = Table::DEFINED;
-  }
-
-  DBUG_VOID_RETURN;
-}
-
-
 void
 Suma::execGET_TABINFOREF(Signal* signal){
   jamEntry();
@@ -1774,26 +1784,48 @@ Suma::execGET_TABINFOREF(Signal* signal)
     break;
   case GetTabInfoRef::TableNameTooLong:
     ndbrequire(false);
-    break;
-  case GetTabInfoRef::NoFetchByName:
-    break;
   }
   if (do_resend_request)
   {
     GetTabInfoReq * req = (GetTabInfoReq *)signal->getDataPtrSend();
     req->senderRef = reference();
     req->senderData = senderData;
-    req->requestType = 
+    req->requestType =
       GetTabInfoReq::RequestById | GetTabInfoReq::LongSignalConf;
     req->tableId = tableId;
     sendSignalWithDelay(DBDICT_REF, GSN_GET_TABINFOREQ, signal,
                         30, GetTabInfoReq::SignalLength);
     return;
   }
-  tabPtr.p->m_state = Table::DROPPED;
-  tabPtr.p->m_error = errorCode;
-  completeAllSubscribers(signal, tabPtr);
-  completeInitTable(signal, tabPtr);
+
+  LocalDLList<Subscription> subList(c_subscriptionPool,
+                                    tabPtr.p->m_subscriptions);
+  Ptr<Subscription> subPtr;
+  bool empty = subList.isEmpty();
+  for(subList.first(subPtr); !subPtr.isNull();)
+  {
+    jam();
+    Ptr<SubOpRecord> ptr;
+    LocalDLFifoList<SubOpRecord> list(c_subOpPool, subPtr.p->m_create_req);
+    for (list.first(ptr); !ptr.isNull(); )
+    {
+      jam();
+      sendSubCreateRef(signal,
+                       ptr.p->m_senderRef,
+                       ptr.p->m_senderData,
+                       SubCreateRef::TableDropped);
+
+      Ptr<SubOpRecord> tmp0 = ptr;
+      list.next(ptr);
+      list.release(tmp0);
+    }
+    Ptr<Subscription> tmp1 = subPtr;
+    subList.next(subPtr);
+    subList.release(tmp1);
+  }
+
+  c_tables.release(tabPtr);
+  ndbassert(!empty);
 }
 
 void
@@ -1805,25 +1837,24 @@ Suma::execGET_TABINFO_CONF(Signal* signa
   if(!assembleFragments(signal)){
     return;
   }
-  
+
   GetTabInfoConf* conf = (GetTabInfoConf*)signal->getDataPtr();
   Uint32 tableId = conf->tableId;
   TablePtr tabPtr;
   c_tablePool.getPtr(tabPtr, conf->senderData);
-  SegmentedSectionPtr ptr(0,0,0);
+  SegmentedSectionPtr ptr;
   signal->getSection(ptr, GetTabInfoConf::DICT_TAB_INFO);
   ndbrequire(tabPtr.p->parseTable(ptr, *this));
   releaseSections(signal);
+
   /**
    * We need to gather fragment info
    */
   jam();
-  DihFragCountReq* req = (DihFragCountReq*)signal->getDataPtrSend();
-  req->m_connectionData = RNIL;
-  req->m_tableRef = tableId;
-  req->m_senderData = tabPtr.i;
-  sendSignal(DBDIH_REF, GSN_DI_FCOUNTREQ, signal, 
-             DihFragCountReq::SignalLength, JBB);
+  signal->theData[0] = RNIL;
+  signal->theData[1] = tableId;
+  signal->theData[2] = tabPtr.i;
+  sendSignal(DBDIH_REF, GSN_DI_FCOUNTREQ, signal, 3, JBB);
 }
 
 bool
@@ -1844,59 +1875,12 @@ Suma::Table::parseTable(SegmentedSection
   jam();
   suma.suma_ndbrequire(s == SimpleProperties::Break);
 
-#if 0
-  //ToDo handle this
-  if(table_version_major(m_schemaVersion) !=
-     table_version_major(tableDesc.TableVersion)){
-    jam();
-
-    release(* this);
-
-    // oops wrong schema version in stored tabledesc
-    // we need to find all subscriptions with old table desc
-    // and all subscribers to this
-    // hopefully none
-    c_tables.release(tabPtr);
-    DBUG_PRINT("info",("c_tablePool  size: %d free: %d",
-		       suma.c_tablePool.getSize(),
-		       suma.c_tablePool.getNoOfFree()));
-    tabPtr.setNull();
-    DLHashTable<Suma::Subscription>::Iterator i_subPtr;
-    c_subscriptions.first(i_subPtr);
-    SubscriptionPtr subPtr;
-    for(;!i_subPtr.isNull();c_subscriptions.next(i_subPtr)){
-      jam();
-      c_subscriptions.getPtr(subPtr, i_subPtr.curr.i);
-      SyncRecord* tmp = c_syncPool.getPtr(subPtr.p->m_syncPtrI);
-      if (tmp == syncPtr_p) {
-	jam();
-	continue;
-      }
-      if (subPtr.p->m_tables.get(tableId)) {
-	jam();
-	subPtr.p->m_tables.clear(tableId); // remove this old table reference
-	TableList::DataBufferIterator it;
-	for(tmp->m_tableList.first(it);!it.isNull();tmp->m_tableList.next(it)) {
-	  jam();
-	  if (*it.data == tableId){
-	    jam();
-	    Uint32 *pdata = it.data;
-	    tmp->m_tableList.next(it);
-	    for(;!it.isNull();tmp->m_tableList.next(it)) {
-	      jam();
-	      *pdata = *it.data;
-	      pdata = it.data;
-	    }
-	    *pdata = RNIL; // todo remove this last item...
-	    break;
-	  }
-	}
-      }
-    }
-  }
-#endif
-
+  /**
+   * Initialize table object
+   */
   m_noOfAttributes = tableDesc.NoOfAttributes;
+  m_schemaVersion = tableDesc.TableVersion;
+  
   DBUG_RETURN(true);
 }
 
@@ -1999,19 +1983,40 @@ Suma::execDIGETPRIMCONF(Signal* signal){
   const Uint32 nextFrag = fragNo + 1;
   if(nextFrag == tabPtr.p->m_fragCount)
   {
-    /**
-     * Complete frag info for table
-     * table is not up to date
-     */
+    jam();
+    tabPtr.p->m_state = Table::DEFINED;
 
-    if (tabPtr.p->c_subscribers.isEmpty())
+    LocalDLList<Subscription> subList(c_subscriptionPool,
+                                      tabPtr.p->m_subscriptions);
+    Ptr<Subscription> subPtr;
+    bool empty = subList.isEmpty();
+    for(subList.first(subPtr); !subPtr.isNull(); subList.next(subPtr))
     {
-      completeInitTable(signal,tabPtr);
-      DBUG_VOID_RETURN;
+      jam();
+      subPtr.p->m_state = Subscription::DEFINED;
+
+      Ptr<SubOpRecord> ptr;
+      LocalDLFifoList<SubOpRecord> list(c_subOpPool, subPtr.p->m_create_req);
+      for (list.first(ptr); !ptr.isNull();)
+      {
+        jam();
+        SubCreateConf * const conf = (SubCreateConf*)signal->getDataPtrSend();
+        conf->senderRef  = reference();
+        conf->senderData = ptr.p->m_senderData;
+        sendSignal(ptr.p->m_senderRef, GSN_SUB_CREATE_CONF, signal,
+                   SubCreateConf::SignalLength, JBB);
+
+        Ptr<SubOpRecord> tmp = ptr;
+        list.next(ptr);
+        list.release(tmp);
+      }
     }
-    tabPtr.p->setupTrigger(signal, *this);
-    DBUG_VOID_RETURN;
+
+    ndbassert(!empty);
+
+    return;
   }
+
   signal->theData[0] = RNIL;
   signal->theData[1] = tabPtr.i;
   signal->theData[2] = tableId;
@@ -2021,38 +2026,6 @@ Suma::execDIGETPRIMCONF(Signal* signal){
   DBUG_VOID_RETURN;
 }
 
-#if 0
-void
-Suma::SyncRecord::completeTableInit(Signal* signal)
-{
-  jam();
-  SubscriptionPtr subPtr;
-  suma.c_subscriptions.getPtr(subPtr, m_subscriptionPtrI);
-  
-#if PRINT_ONLY
-  ndbout_c("GSN_SUB_SYNC_CONF (meta)");
-#else
- 
-  suma.releaseSections(signal);
-
-  if (m_error) {
-    SubSyncRef * const ref = (SubSyncRef*)signal->getDataPtrSend();
-    ref->senderRef = suma.reference();
-    ref->senderData = subPtr.p->m_senderData;
-    ref->errorCode = SubSyncRef::Undefined;
-    suma.sendSignal(subPtr.p->m_senderRef, GSN_SUB_SYNC_REF, signal,
-		    SubSyncRef::SignalLength, JBB);
-  } else {
-    SubSyncConf * const conf = (SubSyncConf*)signal->getDataPtrSend();
-    conf->senderRef = suma.reference();
-    conf->senderData = subPtr.p->m_senderData;
-    suma.sendSignal(subPtr.p->m_senderRef, GSN_SUB_SYNC_CONF, signal,
-		    SubSyncConf::SignalLength, JBB);
-  }
-#endif
-}
-#endif
-
 /**********************************************************
  *
  * Scan interface
@@ -2076,7 +2049,7 @@ Suma::SyncRecord::startScan(Signal* sign
 
 bool
 Suma::SyncRecord::getNextFragment(TablePtr * tab, 
-					     FragmentDescriptor * fd)
+                                  FragmentDescriptor * fd)
 {
   jam();
   SubscriptionPtr subPtr;
@@ -2103,11 +2076,6 @@ Suma::SyncRecord::getNextFragment(TableP
       }
     }
     m_currentFragment = 0;
-
-    tabPtr.p->n_subscribers--;
-    DBUG_PRINT("info",("Suma::Table[%u]::n_subscribers: %u",
-		       tabPtr.p->m_tableId, tabPtr.p->n_subscribers));
-    tabPtr.p->checkRelease(suma);
   }
   return false;
 }
@@ -2221,6 +2189,7 @@ Suma::execSCAN_FRAGCONF(Signal* signal){
     SubSyncContinueReq * const req = (SubSyncContinueReq*)signal->getDataPtrSend();
     req->subscriberData = syncPtr.p->m_senderData;
     req->noOfRowsSent = completedOps;
+    req->senderData = senderData;
     sendSignal(syncPtr.p->m_senderRef, GSN_SUB_SYNC_CONTINUE_REQ, signal,
 	       SubSyncContinueReq::SignalLength, JBB);
 #endif
@@ -2248,11 +2217,12 @@ Suma::execSUB_SYNC_CONTINUE_CONF(Signal*
   Subscription key; 
   key.m_subscriptionId = conf->subscriptionId;
   key.m_subscriptionKey = conf->subscriptionKey;
-  
+  Uint32 syncPtrI = conf->senderData;
+
   ndbrequire(c_subscriptions.find(subPtr, key));
 
   ScanFragNextReq * req = (ScanFragNextReq *)signal->getDataPtrSend();
-  req->senderData = subPtr.p->m_current_sync_ptrI;
+  req->senderData = syncPtrI;
   req->closeFlag = 0;
   req->transId1 = 0;
   req->transId2 = (SUMA << 20) + (getOwnNodeId() << 8);
@@ -2267,7 +2237,6 @@ Suma::SyncRecord::completeScan(Signal* s
 {
   jam();
   DBUG_ENTER("Suma::SyncRecord::completeScan");
-  //  m_tableList.release();
 
 #if PRINT_ONLY
   ndbout_c("GSN_SUB_SYNC_CONF (data)");
@@ -2291,16 +2260,14 @@ Suma::SyncRecord::completeScan(Signal* s
 #endif
 
   release();
+  SubscriptionPtr subPtr;
+  suma.c_subscriptionPool.getPtr(subPtr, m_subscriptionPtrI);
+  LocalDLList<SyncRecord> list(suma.c_syncPool, subPtr.p->m_syncRecords);
+  Ptr<SyncRecord> tmp;
+  tmp.i = ptrI;
+  tmp.p = this;
+  list.release(tmp);
   
-  Ptr<Subscription> subPtr;
-  suma.c_subscriptions.getPtr(subPtr, m_subscriptionPtrI);
-  ndbrequire(subPtr.p->m_current_sync_ptrI == ptrI);
-  subPtr.p->m_current_sync_ptrI = RNIL;
-
-  suma.c_syncPool.release(ptrI);
-  DBUG_PRINT("info",("c_syncPool  size: %d free: %d",
-		     suma.c_syncPool.getSize(),
-		     suma.c_syncPool.getNoOfFree()));
   DBUG_VOID_RETURN;
 }
 
@@ -2344,878 +2311,887 @@ Suma::execSUB_START_REQ(Signal* signal){
   key.m_subscriptionId        = req->subscriptionId;
   key.m_subscriptionKey       = req->subscriptionKey;
 
-  if (c_startup.m_restart_server_node_id && 
-      senderRef != calcSumaBlockRef(c_startup.m_restart_server_node_id))
+  SubscriptionPtr subPtr;
+
+  if (c_startup.m_restart_server_node_id == RNIL)
   {
+    jam();
+
     /**
-     * only allow "restart_server" Suma's to come through 
-     * for restart purposes
+     * We havent started syncing yet
      */
-    jam();
-    Uint32 err = c_startup.m_restart_server_node_id != RNIL ? 1405 : 
-      SubStartRef::NF_FakeErrorREF;
-    
-    sendSubStartRef(signal, err);
-    DBUG_VOID_RETURN;
+    sendSubStartRef(signal,
+                    senderRef, senderData, SubStartRef::NF_FakeErrorREF);
+    return;
   }
+
+  CRASH_INSERTION2(13037, c_startup.m_restart_server_node_id != 0);
   
-  SubscriptionPtr subPtr;
-  if(!c_subscriptions.find(subPtr, key)){
+  bool found = c_subscriptions.find(subPtr, key);
+  if (!found)
+  {
     jam();
-    sendSubStartRef(signal, 1407);
-    DBUG_VOID_RETURN;
+    sendSubStartRef(signal,
+                    senderRef, senderData, SubStartRef::NoSuchSubscription);
+    return;
   }
   
-  if (subPtr.p->m_state == Subscription::LOCKED) {
+  switch(subPtr.p->m_state){
+  case Subscription::DROPPED:
     jam();
-    DBUG_PRINT("info",("Locked"));
-    sendSubStartRef(signal, 1411);
-    DBUG_VOID_RETURN;
+    sendSubStartRef(signal,
+                    senderRef, senderData, SubStartRef::Dropped);
+    return;
+  case Subscription::DEFINING:
+    jam();
+    sendSubStartRef(signal,
+                    senderRef, senderData, SubStartRef::Defining);
+    return;
+  case Subscription::DEFINED:
+    break;
   }
 
-  if (subPtr.p->m_state == Subscription::DROPPED &&
-      c_startup.m_restart_server_node_id == 0) {
+  if (subPtr.p->m_trigger_state == Subscription::T_ERROR)
+  {
     jam();
-    DBUG_PRINT("info",("Dropped"));
-    sendSubStartRef(signal, 1418);
-    DBUG_VOID_RETURN;
+    sendSubStartRef(signal,
+                    senderRef, senderData, subPtr.p->m_errorCode);
+    return;
   }
-
-  ndbrequire(subPtr.p->m_state == Subscription::DEFINED ||
-             c_startup.m_restart_server_node_id);
-
+  
   SubscriberPtr subbPtr;
-  if(!c_subscriberPool.seize(subbPtr)){
+  if(!c_subscriberPool.seize(subbPtr))
+  {
     jam();
-    sendSubStartRef(signal, 1412);
-    DBUG_VOID_RETURN;
+    sendSubStartRef(signal,
+                    senderRef, senderData, SubStartRef::OutOfSubscriberRecords);
+    return;
   }
 
-  if (c_startup.m_restart_server_node_id == 0 && 
-      !c_connected_nodes.get(refToNode(subscriberRef)))
-    
+  Ptr<SubOpRecord> subOpPtr;
+  if (!c_subOpPool.seize(subOpPtr))
   {
     jam();
     c_subscriberPool.release(subbPtr);
-    sendSubStartRef(signal, SubStartRef::PartiallyConnected);
+    sendSubStartRef(signal,
+                    senderRef, senderData, SubStartRef::OutOfSubOpRecords);
     return;
   }
   
-  DBUG_PRINT("info",("c_subscriberPool  size: %d free: %d",
-		     c_subscriberPool.getSize(),
-		     c_subscriberPool.getNoOfFree()));
-
-  c_subscriber_nodes.set(refToNode(subscriberRef));
-
-  // setup subscription record
-  if (subPtr.p->m_state == Subscription::DEFINED)
-    subPtr.p->m_state = Subscription::LOCKED;
-  // store these here for later use
-  subPtr.p->m_senderRef  = senderRef;
-  subPtr.p->m_senderData = senderData;
-
   // setup subscriber record
   subbPtr.p->m_senderRef  = subscriberRef;
   subbPtr.p->m_senderData = subscriberData;
-  subbPtr.p->m_subPtrI= subPtr.i;
 
-  DBUG_PRINT("info",("subscriber: %u[%u,%u] subscription: %u[%u,%u] "
-		     "tableId: %u id: %u key: %u",
-		     subbPtr.i, subbPtr.p->m_senderRef, subbPtr.p->m_senderData,
-		     subPtr.i,  subPtr.p->m_senderRef,  subPtr.p->m_senderData,
-		     subPtr.p->m_tableId,
-		     subPtr.p->m_subscriptionId,subPtr.p->m_subscriptionKey));
+  subOpPtr.p->m_opType = SubOpRecord::R_SUB_START_REQ;
+  subOpPtr.p->m_subPtrI = subPtr.i;
+  subOpPtr.p->m_senderRef = senderRef;
+  subOpPtr.p->m_senderData = senderData;
+  subOpPtr.p->m_subscriberRef = subbPtr.i;
 
-  TablePtr tabPtr;
-  switch(part){
-  case SubscriptionData::MetaData:
-    jam();
-    c_metaSubscribers.add(subbPtr);
-    sendSubStartComplete(signal, subbPtr, 0, part);
-    DBUG_VOID_RETURN;
-  case SubscriptionData::TableData: 
-    jam();
-    initTable(signal,subPtr.p->m_tableId,tabPtr,subbPtr);
-    tabPtr.p->n_subscribers++;
-    if (subPtr.p->m_options & Subscription::REPORT_ALL)
-      tabPtr.p->m_reportAll = true;
-    DBUG_PRINT("info",("Suma::Table[%u]::n_subscribers: %u",
-		       tabPtr.p->m_tableId, tabPtr.p->n_subscribers));
-    DBUG_VOID_RETURN;
+  {
+    LocalDLFifoList<SubOpRecord> subOpList(c_subOpPool, subPtr.p->m_start_req);
+    subOpList.add(subOpPtr);
   }
-  ndbrequire(false);
-}
 
-void
-Suma::sendSubStartComplete(Signal* signal,
-			   SubscriberPtr subbPtr, 
-			   Uint64 firstGCI,
-			   SubscriptionData::Part part)
-{
-  jam();
-  DBUG_ENTER("Suma::sendSubStartComplete");
+  /**
+   * Check triggers
+   */
+  switch(subPtr.p->m_trigger_state){
+  case Subscription::T_UNDEFINED:
+    jam();
+    /**
+     * create triggers
+     */
+    create_triggers(signal, subPtr);
+    break;
+  case Subscription::T_CREATING:
+    jam();
+    /**
+     * Triggers are already being created...wait for completion
+     */
+    return;
+  case Subscription::T_DROPPING:
+    jam();
+    /**
+     * Trigger(s) are being dropped...wait for completion
+     *   (and recreate them when done)
+     */
+    break;
+  case Subscription::T_DEFINED:{
+    jam();
 
-  SubscriptionPtr subPtr;
-  c_subscriptions.getPtr(subPtr, subbPtr.p->m_subPtrI);
-  ndbrequire(subPtr.p->m_state == Subscription::LOCKED ||
-             (subPtr.p->m_state == Subscription::DROPPED &&
-              c_startup.m_restart_server_node_id));
-  if (subPtr.p->m_state == Subscription::LOCKED)
-  {
+    report_sub_start_conf(signal, subPtr);
+    return;
+  }
+  case Subscription::T_ERROR:
     jam();
-    subPtr.p->m_state = Subscription::DEFINED;
+    ndbrequire(false); // Checked above
+    break;
   }
-  subPtr.p->n_subscribers++;
-
-  DBUG_PRINT("info",("subscriber: %u[%u,%u] subscription: %u[%u,%u] "
-		     "tableId: %u[i=%u] id: %u key: %u",
-		     subbPtr.i, subbPtr.p->m_senderRef, subbPtr.p->m_senderData,
-		     subPtr.i,  subPtr.p->m_senderRef,  subPtr.p->m_senderData,
-		     subPtr.p->m_tableId, subPtr.p->m_table_ptrI,
-		     subPtr.p->m_subscriptionId,subPtr.p->m_subscriptionKey));
-
-  SubStartConf * const conf = (SubStartConf*)signal->getDataPtrSend();
-  
-  conf->senderRef       = reference();
-  conf->senderData      = subPtr.p->m_senderData;
-  conf->subscriptionId  = subPtr.p->m_subscriptionId;
-  conf->subscriptionKey = subPtr.p->m_subscriptionKey;
-  conf->firstGCI        = firstGCI;
-  conf->part            = (Uint32) part;
-
-  DBUG_PRINT("info",("subscriber: %u id: %u key: %u", subbPtr.i,
-		     subPtr.p->m_subscriptionId,subPtr.p->m_subscriptionKey));
-  sendSignal(subPtr.p->m_senderRef, GSN_SUB_START_CONF, signal,
-	     SubStartConf::SignalLength, JBB);
-
-  reportAllSubscribers(signal, NdbDictionary::Event::_TE_SUBSCRIBE,
-                       subPtr, subbPtr);
-
-  DBUG_VOID_RETURN;
 }
 
 void
-Suma::sendSubStartRef(Signal* signal, Uint32 errCode)
+Suma::sendSubStartRef(Signal* signal, Uint32 dstref, Uint32 data, Uint32 err)
 {
   jam();
   SubStartRef * ref = (SubStartRef *)signal->getDataPtrSend();
   ref->senderRef = reference();
-  ref->errorCode = errCode;
+  ref->senderData = data;
+  ref->errorCode = err;
   releaseSections(signal);
-  sendSignal(signal->getSendersBlockRef(), GSN_SUB_START_REF, signal, 
+  sendSignal(dstref, GSN_SUB_START_REF, signal,
 	     SubStartRef::SignalLength, JBB);
 }
+
 void
-Suma::sendSubStartRef(Signal* signal,
-				 SubscriberPtr subbPtr, Uint32 error,
-				 SubscriptionData::Part part)
+Suma::create_triggers(Signal* signal, SubscriptionPtr subPtr)
 {
   jam();
 
-  SubscriptionPtr subPtr;
-  c_subscriptions.getPtr(subPtr, subbPtr.p->m_subPtrI);
+  ndbrequire(subPtr.p->m_trigger_state == Subscription::T_UNDEFINED);
+  subPtr.p->m_trigger_state = Subscription::T_CREATING;
 
-  ndbrequire(subPtr.p->m_state == Subscription::LOCKED ||
-             (subPtr.p->m_state == Subscription::DROPPED &&
-              c_startup.m_restart_server_node_id));
-  if (subPtr.p->m_state == Subscription::LOCKED)
-  {
-    jam();
-    subPtr.p->m_state = Subscription::DEFINED;
-  }
+  TablePtr tabPtr;
+  c_tablePool.getPtr(tabPtr, subPtr.p->m_table_ptrI);
 
-  SubStartRef * ref= (SubStartRef *)signal->getDataPtrSend();
-  ref->senderRef        = reference();
-  ref->senderData       = subPtr.p->m_senderData;
-  ref->subscriptionId   = subPtr.p->m_subscriptionId;
-  ref->subscriptionKey  = subPtr.p->m_subscriptionKey;
-  ref->part             = (Uint32) part;
-  ref->errorCode        = error;
+  AttributeMask attrMask;
+  tabPtr.p->createAttributeMask(attrMask, *this);
 
-  sendSignal(subPtr.p->m_senderRef, GSN_SUB_START_REF, signal, 
-	     SubStartRef::SignalLength, JBB);
-}
+  subPtr.p->m_outstanding_trigger = 3;
+  for(Uint32 j = 0; j<3; j++)
+  {
+    Uint32 triggerId = (tabPtr.p->m_schemaVersion << 18) | (j << 16) | subPtr.i;
+    ndbrequire(subPtr.p->m_triggers[j] == ILLEGAL_TRIGGER_ID);
 
-/**********************************************************
- * Suma participant interface
- *
- * Stopping and removing of subscriber
- *
- */
+    CreateTrigReq * const req = (CreateTrigReq*)signal->getDataPtrSend();
+    req->setUserRef(SUMA_REF);
+    req->setConnectionPtr(subPtr.i);
+    req->setTriggerType(TriggerType::SUBSCRIPTION_BEFORE);
+    req->setTriggerActionTime(TriggerActionTime::TA_DETACHED);
+    req->setMonitorReplicas(true);
+    req->setMonitorAllAttributes(true);
+    req->setReceiverRef(SUMA_REF);
+    req->setTriggerId(triggerId);
+    req->setTriggerEvent((TriggerEvent::Value)j);
+    req->setTableId(tabPtr.p->m_tableId);
+    req->setAttributeMask(attrMask);
+    req->setReportAllMonitoredAttributes(subPtr.p->m_options & Subscription::REPORT_ALL);
+    sendSignal(DBTUP_REF, GSN_CREATE_TRIG_REQ,
+               signal, CreateTrigReq::SignalLength, JBB);
+  }
+}
 
 void
-Suma::execSUB_STOP_REQ(Signal* signal){
+Suma::execCREATE_TRIG_CONF(Signal* signal)
+{
   jamEntry();
-  ndbassert(signal->getNoOfSections() == 0);
-  DBUG_ENTER("Suma::execSUB_STOP_REQ");
-  
-  CRASH_INSERTION(13019);
 
-  SubStopReq * const req = (SubStopReq*)signal->getDataPtr();
-  Uint32 senderRef      = req->senderRef;
-  Uint32 senderData     = req->senderData;
-  Uint32 subscriberRef  = req->subscriberRef;
-  Uint32 subscriberData = req->subscriberData;
+  CreateTrigConf * conf = (CreateTrigConf*)signal->getDataPtr();
+  const Uint32 triggerId = conf->getTriggerId();
+  Uint32 type = (triggerId >> 16) & 0x3;
+  Uint32 tableId = conf->getTableId();
+
+  TablePtr tabPtr;
   SubscriptionPtr subPtr;
-  Subscription key; 
-  key.m_subscriptionId  = req->subscriptionId;
-  key.m_subscriptionKey = req->subscriptionKey;
-  Uint32 part = req->part;
-  
-  if (key.m_subscriptionKey == 0 &&
-      key.m_subscriptionId == 0 &&
-      subscriberData == 0)
-  {
-    SubStopConf* conf = (SubStopConf*)signal->getDataPtrSend();
-    
-    conf->senderRef       = reference();
-    conf->senderData      = senderData;
-    conf->subscriptionId  = key.m_subscriptionId;
-    conf->subscriptionKey = key.m_subscriptionKey;
-    conf->subscriberData  = subscriberData;
+  c_subscriptions.getPtr(subPtr, conf->getConnectionPtr());
+  c_tables.getPtr(tabPtr, subPtr.p->m_table_ptrI);
 
-    sendSignal(senderRef, GSN_SUB_STOP_CONF, signal,
-	       SubStopConf::SignalLength, JBB);
+  ndbrequire(tabPtr.p->m_tableId == conf->getTableId());
+  ndbrequire(subPtr.p->m_trigger_state == Subscription::T_CREATING);
 
-    removeSubscribersOnNode(signal, refToNode(senderRef));
-    DBUG_VOID_RETURN;
-  }
+  ndbrequire(type < 3);
+  ndbrequire(subPtr.p->m_triggers[type] == ILLEGAL_TRIGGER_ID);
+  subPtr.p->m_triggers[type] = triggerId;
+
+  ndbrequire(subPtr.p->m_outstanding_trigger);
+  subPtr.p->m_outstanding_trigger--;
 
-  if (c_startup.m_restart_server_node_id && 
-      senderRef != calcSumaBlockRef(c_startup.m_restart_server_node_id))
+  if (subPtr.p->m_outstanding_trigger)
   {
+    jam();
     /**
-     * only allow "restart_server" Suma's to come through 
-     * for restart purposes
+     * Wait for more
      */
-    jam();
-    Uint32 err = c_startup.m_restart_server_node_id != RNIL ? 1405 : 
-      SubStopRef::NF_FakeErrorREF;
-    
-    sendSubStopRef(signal, err);
-    DBUG_VOID_RETURN;
+    return;
   }
 
-  if(!c_subscriptions.find(subPtr, key)){
+  if (subPtr.p->m_errorCode == 0)
+  {
     jam();
-    DBUG_PRINT("error", ("not found"));
-    sendSubStopRef(signal, 1407);
-    DBUG_VOID_RETURN;
+    subPtr.p->m_trigger_state = Subscription::T_DEFINED;
+    report_sub_start_conf(signal, subPtr);
   }
-  
-  if (subPtr.p->m_state == Subscription::LOCKED) {
+  else
+  {
     jam();
-    DBUG_PRINT("error", ("locked"));
-    sendSubStopRef(signal, 1411);
-    DBUG_VOID_RETURN;
+    subPtr.p->m_trigger_state = Subscription::T_ERROR;
+    drop_triggers(signal, subPtr);
   }
+}
 
-  ndbrequire(part == SubscriptionData::TableData);
+void
+Suma::execCREATE_TRIG_REF(Signal* signal)
+{
+  jamEntry();
+
+  CreateTrigRef * const ref = (CreateTrigRef*)signal->getDataPtr();
+  const Uint32 triggerId = ref->getTriggerId();
+  Uint32 type = (triggerId >> 16) & 0x3;
+  Uint32 tableId = ref->getTableId();
 
   TablePtr tabPtr;
-  tabPtr.i = subPtr.p->m_table_ptrI;
-  if (tabPtr.i == RNIL ||
-      !(tabPtr.p = c_tables.getPtr(tabPtr.i)) ||
-      tabPtr.p->m_tableId != subPtr.p->m_tableId)
-  {
-    jam();
-    DBUG_PRINT("error", ("no such table id %u[i=%u]",
-			 subPtr.p->m_tableId, subPtr.p->m_table_ptrI));
-    sendSubStopRef(signal, 1417);
-    DBUG_VOID_RETURN;
-  }
+  SubscriptionPtr subPtr;
+  c_subscriptions.getPtr(subPtr, ref->getConnectionPtr());
+  c_tables.getPtr(tabPtr, subPtr.p->m_table_ptrI);
 
-  if (tabPtr.p->m_drop_subbPtr.p != 0) {
-    jam();
-    DBUG_PRINT("error", ("table locked"));
-    sendSubStopRef(signal, 1420);
-    DBUG_VOID_RETURN;
-  }
+  ndbrequire(tabPtr.p->m_tableId == ref->getTableId());
+  ndbrequire(subPtr.p->m_trigger_state == Subscription::T_CREATING);
 
-  DBUG_PRINT("info",("subscription: %u tableId: %u[i=%u] id: %u key: %u",
-		     subPtr.i, subPtr.p->m_tableId, tabPtr.i,
-		     subPtr.p->m_subscriptionId,subPtr.p->m_subscriptionKey));
+  ndbrequire(type < 3);
+  ndbrequire(subPtr.p->m_triggers[type] == ILLEGAL_TRIGGER_ID);
 
-  SubscriberPtr subbPtr;
-  if (senderRef == reference()){
-    jam();
-    c_subscriberPool.getPtr(subbPtr, senderData);
-    ndbrequire(subbPtr.p->m_subPtrI == subPtr.i && 
-	       subbPtr.p->m_senderRef == subscriberRef &&
-	       subbPtr.p->m_senderData == subscriberData);
-    c_removeDataSubscribers.remove(subbPtr);
-  }
-  else
+  subPtr.p->m_errorCode = ref->getErrorCode();
+
+  ndbrequire(subPtr.p->m_outstanding_trigger);
+  subPtr.p->m_outstanding_trigger--;
+
+  if (subPtr.p->m_outstanding_trigger)
   {
     jam();
-    LocalDLList<Subscriber>
-      subscribers(c_subscriberPool,tabPtr.p->c_subscribers);
-
-    DBUG_PRINT("info",("search: subscription: %u, ref: %u, data: %d",
-		       subPtr.i, subscriberRef, subscriberData));
-    for (subscribers.first(subbPtr);!subbPtr.isNull();subscribers.next(subbPtr))
-    {
-      jam();
-      DBUG_PRINT("info",
-		 ("search: subscription: %u, ref: %u, data: %u, subscriber %u", 
-		  subbPtr.p->m_subPtrI, subbPtr.p->m_senderRef,
-		  subbPtr.p->m_senderData, subbPtr.i));
-      if (subbPtr.p->m_subPtrI == subPtr.i &&
-	  subbPtr.p->m_senderRef == subscriberRef &&
-	  subbPtr.p->m_senderData == subscriberData)
-      {
-	jam();
-	DBUG_PRINT("info",("found"));
-	break;
-      }
-    }
     /**
-     * If we didn't find anyone, send ref
+     * Wait for more
      */
-    if (subbPtr.isNull()) {
-      jam();
-      DBUG_PRINT("error", ("subscriber not found"));
-      sendSubStopRef(signal, 1407);
-      DBUG_VOID_RETURN;
-    }
-    subscribers.remove(subbPtr);
+    return;
   }
 
-  subPtr.p->m_senderRef  = senderRef; // store ref to requestor
-  subPtr.p->m_senderData = senderData; // store ref to requestor
-
-  tabPtr.p->m_drop_subbPtr = subbPtr;
-
-  if (subPtr.p->m_state == Subscription::DEFINED)
+  for (Uint32 i = 0; i<3; i++)
   {
     jam();
-    subPtr.p->m_state = Subscription::LOCKED;
+    if (subPtr.p->m_triggers[i] == ILLEGAL_TRIGGER_ID)
+    {
+      jam();
+      /**
+       * Wait for more
+       */
+      return;
+    }
   }
 
-  if (tabPtr.p->m_state == Table::DROPPED)
-    // not ALTERED here since trigger must be removed
-  {
-    jam();
-    tabPtr.p->n_subscribers--;
-    DBUG_PRINT("info",("Suma::Table[%u]::n_subscribers: %u",
-		       tabPtr.p->m_tableId, tabPtr.p->n_subscribers));
-    tabPtr.p->checkRelease(*this);
-    sendSubStopComplete(signal, tabPtr.p->m_drop_subbPtr);
-    tabPtr.p->m_drop_subbPtr.p = 0;
-  }
-  else
-  {
-    jam();
-    tabPtr.p->dropTrigger(signal,*this);
-  }
-  DBUG_VOID_RETURN;
+  subPtr.p->m_trigger_state = Subscription::T_ERROR;
+  drop_triggers(signal, subPtr);
 }
 
 void
-Suma::sendSubStopComplete(Signal* signal, SubscriberPtr subbPtr)
+Suma::report_sub_start_conf(Signal* signal, Ptr<Subscription> subPtr)
 {
-  jam();
-  DBUG_ENTER("Suma::sendSubStopComplete");
-  CRASH_INSERTION(13020);
-
-  DBUG_PRINT("info",("removed subscriber: %i", subbPtr.i));
-
-  SubscriptionPtr subPtr;
-  c_subscriptions.getPtr(subPtr, subbPtr.p->m_subPtrI);
-
-  Uint32 senderRef= subPtr.p->m_senderRef;
-  Uint32 senderData= subPtr.p->m_senderData;
-
-  subPtr.p->n_subscribers--;
-  ndbassert( subPtr.p->m_state == Subscription::LOCKED ||
-	     subPtr.p->m_state == Subscription::DROPPED );
-  if ( subPtr.p->m_state == Subscription::LOCKED )
+  const Uint64 gci = get_current_gci(signal);
   {
-    jam();
-    subPtr.p->m_state = Subscription::DEFINED;
-    if (subPtr.p->n_subscribers == 0)
+    LocalDLList<Subscriber> list(c_subscriberPool,
+                                 subPtr.p->m_subscribers);
+    LocalDLFifoList<SubOpRecord> subOpList(c_subOpPool, subPtr.p->m_start_req);
+
+    Ptr<Subscriber> ptr;
+    Ptr<SubOpRecord> subOpPtr;
+    for (subOpList.first(subOpPtr); !subOpPtr.isNull(); )
     {
       jam();
-#if 1
-      subPtr.p->m_table_ptrI = RNIL;
-#else
-      TablePtr tabPtr;
-      tabPtr.i = subPtr.p->m_table_ptrI;
-      if ((tabPtr.p= c_tablePool.getPtr(tabPtr.i)) &&
-	  (tabPtr.p->m_state == Table::DROPPED ||
-	   tabPtr.p->m_state == Table::ALTERED) &&
-	  false)
+
+      Uint32 senderRef = subOpPtr.p->m_senderRef;
+      Uint32 senderData = subOpPtr.p->m_senderData;
+      c_subscriberPool.getPtr(ptr, subOpPtr.p->m_subscriberRef);
+
+      Uint32 nodeId = refToNode(ptr.p->m_senderRef);
+      if (c_startup.m_restart_server_node_id ||
+          (c_failedApiNodes.get(nodeId) == false &&
+           c_connected_nodes.get(nodeId)))
       {
-	// last subscriber, and table is dropped
-	// safe to drop subscription
-	c_subscriptions.release(subPtr);
-	DBUG_PRINT("info",("c_subscriptionPool  size: %d free: %d",
-			   c_subscriptionPool.getSize(),
-			   c_subscriptionPool.getNoOfFree()));
+        SubStartConf* conf = (SubStartConf*)signal->getDataPtrSend();
+        conf->senderRef       = reference();
+        conf->senderData      = senderData;
+        conf->subscriptionId  = subPtr.p->m_subscriptionId;
+        conf->subscriptionKey = subPtr.p->m_subscriptionKey;
+        conf->firstGCI        = Uint32(gci >> 32);
+        conf->part            = SubscriptionData::TableData;
+
+        sendSignal(senderRef, GSN_SUB_START_CONF, signal,
+                   SubStartConf::SignalLength, JBB);
+
+        /**
+         * Call before adding to list...
+         *   cause method will (maybe) iterate thought list
+         */
+        bool report = subPtr.p->m_options & Subscription::REPORT_SUBSCRIBE;
+        send_sub_start_stop_event(signal, ptr,NdbDictionary::Event::_TE_ACTIVE,
+                                  report, list);
+        
+        list.add(ptr);
+        c_subscriber_nodes.set(refToNode(ptr.p->m_senderRef));
       }
       else
       {
-	subPtr.p->m_table_ptrI = RNIL;
+        jam();
+        g_eventLogger.warning("Node %u failed in report_sub_start_conf",
+                              nodeId);
+        sendSubStartRef(signal,
+                        senderRef, senderData, SubStartRef::NodeDied);
+
+        c_subscriberPool.release(ptr);
       }
-      ndbassert(tabPtr.p != 0);
-#endif
+      
+      Ptr<SubOpRecord> tmp = subOpPtr;
+      subOpList.next(subOpPtr);
+      subOpList.release(tmp);
     }
   }
-  else if ( subPtr.p->n_subscribers == 0 )
-  {
-    // subscription is marked to be removed
-    // and there are no subscribers left
-    jam();
-    ndbassert(subPtr.p->m_state == Subscription::DROPPED);
-    completeSubRemove(subPtr);
-  }
-
-  // let subscriber know that subscrber is stopped
-  {
-    const Uint64 gci = get_current_gci(signal);
-    SubTableData * data  = (SubTableData*)signal->getDataPtrSend();
-    data->gci_hi         = Uint32(gci >> 32);
-    data->gci_lo         = Uint32(gci);
-    data->tableId        = 0;
-    data->requestInfo    = 0;
-    SubTableData::setOperation(data->requestInfo, 
-			       NdbDictionary::Event::_TE_STOP);
-    SubTableData::setNdbdNodeId(data->requestInfo,
-				getOwnNodeId());
-    data->senderData     = subbPtr.p->m_senderData;
-    sendSignal(subbPtr.p->m_senderRef, GSN_SUB_TABLE_DATA, signal,
-	       SubTableData::SignalLength, JBB);
-  }
   
-  SubStopConf * const conf = (SubStopConf*)signal->getDataPtrSend();
-  
-  conf->senderRef= reference();
-  conf->senderData= senderData;
-
-  sendSignal(senderRef, GSN_SUB_STOP_CONF, signal,
-	     SubStopConf::SignalLength, JBB);
-
-  c_subscriberPool.release(subbPtr);
-  DBUG_PRINT("info",("c_subscriberPool  size: %d free: %d",
-		     c_subscriberPool.getSize(),
-		     c_subscriberPool.getNoOfFree()));
-
-  reportAllSubscribers(signal, NdbDictionary::Event::_TE_UNSUBSCRIBE,
-                       subPtr, subbPtr);
-
-  DBUG_VOID_RETURN;
+  check_release_subscription(signal, subPtr);
 }
 
-// report new started subscriber to all other subscribers
 void
-Suma::reportAllSubscribers(Signal *signal,
-                           NdbDictionary::Event::_TableEvent table_event,
-                           SubscriptionPtr subPtr,
-                           SubscriberPtr subbPtr)
+Suma::report_sub_start_ref(Signal* signal,
+                           Ptr<Subscription> subPtr,
+                           Uint32 errCode)
 {
-  const Uint64 gci = get_current_gci(signal);
-  SubTableData * data  = (SubTableData*)signal->getDataPtrSend();
+  LocalDLList<Subscriber> list(c_subscriberPool,
+                               subPtr.p->m_subscribers);
+  LocalDLFifoList<SubOpRecord> subOpList(c_subOpPool, subPtr.p->m_start_req);
 
-  if (table_event == NdbDictionary::Event::_TE_SUBSCRIBE &&
-      !c_startup.m_restart_server_node_id)
+  Ptr<Subscriber> ptr;
+  Ptr<SubOpRecord> subOpPtr;
+  for (subOpList.first(subOpPtr); !subOpPtr.isNull(); )
   {
-    data->gci_hi         = Uint32(gci >> 32);
-    data->gci_lo         = Uint32(gci);
-    data->tableId        = subPtr.p->m_tableId;
-    data->requestInfo    = 0;
-    SubTableData::setOperation(data->requestInfo, 
-			       NdbDictionary::Event::_TE_ACTIVE);
-    SubTableData::setNdbdNodeId(data->requestInfo, getOwnNodeId());
-    SubTableData::setReqNodeId(data->requestInfo, 
-			       refToNode(subbPtr.p->m_senderRef));
-    data->changeMask     = 0;
-    data->totalLen       = 0;
-    data->senderData     = subbPtr.p->m_senderData;
-    sendSignal(subbPtr.p->m_senderRef, GSN_SUB_TABLE_DATA, signal,
-               SubTableData::SignalLength, JBB);
-  }
+    jam();
 
-  if (!(subPtr.p->m_options & Subscription::REPORT_SUBSCRIBE))
-  {
-    return;
-  }
-  if (subPtr.p->n_subscribers == 0)
-  {
-    ndbrequire(table_event != NdbDictionary::Event::_TE_SUBSCRIBE);
-    return;
-  }
- 
-//#ifdef VM_TRACE
-  ndbout_c("reportAllSubscribers  subPtr.i: %d  subPtr.p->n_subscribers: %d",
-           subPtr.i, subPtr.p->n_subscribers);
-//#endif
-  data->gci_hi         = Uint32(gci >> 32);
-  data->gci_lo         = Uint32(gci);
-  data->tableId        = subPtr.p->m_tableId;
-  data->requestInfo    = 0;
-  SubTableData::setOperation(data->requestInfo, table_event);
-  SubTableData::setNdbdNodeId(data->requestInfo, getOwnNodeId());
-  data->changeMask     = 0;
-  data->totalLen       = 0;
-  
-  TablePtr tabPtr;
-  c_tables.getPtr(tabPtr, subPtr.p->m_table_ptrI);
-  LocalDLList<Subscriber> subbs(c_subscriberPool, tabPtr.p->c_subscribers);
-  SubscriberPtr i_subbPtr;
-  for(subbs.first(i_subbPtr); !i_subbPtr.isNull(); subbs.next(i_subbPtr))
-  {
-    if (i_subbPtr.p->m_subPtrI == subPtr.i)
-    {
-      SubTableData::setReqNodeId(data->requestInfo, 
-				 refToNode(subbPtr.p->m_senderRef));
-      data->senderData = i_subbPtr.p->m_senderData;
-      sendSignal(i_subbPtr.p->m_senderRef, GSN_SUB_TABLE_DATA, signal,
-                 SubTableData::SignalLength, JBB);
-//#ifdef VM_TRACE
-      ndbout_c("sent %s(%d) to node %d, req_nodeid: %d  senderData: %d",
-               table_event == NdbDictionary::Event::_TE_SUBSCRIBE ?
-               "SUBSCRIBE" : "UNSUBSCRIBE", (int) table_event,
-               refToNode(i_subbPtr.p->m_senderRef),
-               refToNode(subbPtr.p->m_senderRef), data->senderData
-               );
-//#endif
-      if (i_subbPtr.i != subbPtr.i)
-      {
-	SubTableData::setReqNodeId(data->requestInfo, 
-				   refToNode(i_subbPtr.p->m_senderRef));
-	
-        data->senderData = subbPtr.p->m_senderData;
-        sendSignal(subbPtr.p->m_senderRef, GSN_SUB_TABLE_DATA, signal,
-                   SubTableData::SignalLength, JBB);
-//#ifdef VM_TRACE
-        ndbout_c("sent %s(%d) to node %d, req_nodeid: %d  senderData: %d",
-                 table_event == NdbDictionary::Event::_TE_SUBSCRIBE ?
-                 "SUBSCRIBE" : "UNSUBSCRIBE", (int) table_event,
-                 refToNode(subbPtr.p->m_senderRef),
-                 refToNode(i_subbPtr.p->m_senderRef), data->senderData
-                 );
-//#endif
-      }
-    }
+    Uint32 senderRef = subOpPtr.p->m_senderRef;
+    Uint32 senderData = subOpPtr.p->m_senderData;
+    c_subscriberPool.getPtr(ptr, subOpPtr.p->m_subscriberRef);
+
+    SubStartRef* ref = (SubStartRef*)signal->getDataPtrSend();
+    ref->senderRef  = reference();
+    ref->senderData = senderData;
+    ref->errorCode  = errCode;
+
+    sendSignal(senderRef, GSN_SUB_START_REF, signal,
+               SubStartConf::SignalLength, JBB);
+
+
+    Ptr<SubOpRecord> tmp = subOpPtr;
+    subOpList.next(subOpPtr);
+    subOpList.release(tmp);
+    c_subscriberPool.release(ptr);
   }
 }
 
 void
-Suma::sendSubStopRef(Signal* signal, Uint32 errCode)
+Suma::drop_triggers(Signal* signal, SubscriptionPtr subPtr)
 {
   jam();
-  DBUG_ENTER("Suma::sendSubStopRef");
-  SubStopRef  * ref = (SubStopRef *)signal->getDataPtrSend();
-  ref->senderRef = reference();
-  ref->errorCode = errCode;
-  sendSignal(signal->getSendersBlockRef(), 
-	     GSN_SUB_STOP_REF, 
-	     signal, 
-	     SubStopRef::SignalLength,
-	     JBB);
-  DBUG_VOID_RETURN;
-}
-
-/**********************************************************
- *
- * Trigger admin interface
- *
- */
-
-int
-Suma::Table::setupTrigger(Signal* signal,
-			  Suma &suma)
-{
-  jam();
-  DBUG_ENTER("Suma::Table::setupTrigger");
-
-  int ret= 0;
-  
-  AttributeMask attrMask;
-  createAttributeMask(attrMask, suma);
 
+  subPtr.p->m_outstanding_trigger = 0;
   for(Uint32 j = 0; j<3; j++)
   {
-    Uint32 triggerId = (m_schemaVersion << 18) | (j << 16) | m_ptrI;
-    if(m_hasTriggerDefined[j] == 0)
+    Uint32 triggerId = subPtr.p->m_triggers[j];
+    if (triggerId != ILLEGAL_TRIGGER_ID)
     {
-      suma.suma_ndbrequire(m_triggerIds[j] == ILLEGAL_TRIGGER_ID);
-      DBUG_PRINT("info",("DEFINING trigger on table %u[%u]", m_tableId, j));
-      CreateTrigReq * const req = (CreateTrigReq*)signal->getDataPtrSend();
+      jam();
+      subPtr.p->m_outstanding_trigger++;
+      DropTrigReq * const req = (DropTrigReq*)signal->getDataPtrSend();
+      req->setConnectionPtr(subPtr.i);
       req->setUserRef(SUMA_REF);
-      req->setConnectionPtr(m_ptrI);
+      req->setRequestType(DropTrigReq::RT_USER);
       req->setTriggerType(TriggerType::SUBSCRIPTION_BEFORE);
       req->setTriggerActionTime(TriggerActionTime::TA_DETACHED);
-      req->setMonitorReplicas(true);
-      //req->setMonitorAllAttributes(j == TriggerEvent::TE_DELETE);
-      req->setMonitorAllAttributes(true);
-      req->setReceiverRef(SUMA_REF);
+      req->setIndexId(RNIL);
+
+      req->setTableId(subPtr.p->m_tableId);
       req->setTriggerId(triggerId);
       req->setTriggerEvent((TriggerEvent::Value)j);
-      req->setTableId(m_tableId);
-      req->setAttributeMask(attrMask);
-      req->setReportAllMonitoredAttributes(m_reportAll);
-      suma.sendSignal(DBTUP_REF, GSN_CREATE_TRIG_REQ, 
-		      signal, CreateTrigReq::SignalLength, JBB);
-      ret= 1;
-    }
-    else
-    {
-      m_hasTriggerDefined[j]++;
-      DBUG_PRINT("info",("REFCOUNT trigger on table %u[%u] %u",
-			 m_tableId, j, m_hasTriggerDefined[j]));
+
+      sendSignal(DBTUP_REF, GSN_DROP_TRIG_REQ,
+                 signal, DropTrigReq::SignalLength, JBB);
     }
   }
-  DBUG_RETURN(ret);
-}
 
-void
-Suma::Table::createAttributeMask(AttributeMask& mask,
-                                            Suma &suma)
-{
-  jam();
-  mask.clear();
-  for(Uint32 i = 0; i<m_noOfAttributes; i++)
-    mask.set(i);
+  if (subPtr.p->m_outstanding_trigger == 0)
+  {
+    jam();
+    drop_triggers_complete(signal, subPtr);
+  }
 }
 
 void
-Suma::execCREATE_TRIG_CONF(Signal* signal){
+Suma::execDROP_TRIG_REF(Signal* signal)
+{
   jamEntry();
-  DBUG_ENTER("Suma::execCREATE_TRIG_CONF");
-  ndbassert(signal->getNoOfSections() == 0);
-  CreateTrigConf * const conf = (CreateTrigConf*)signal->getDataPtr();
-  const Uint32 triggerId = conf->getTriggerId();
+  DropTrigRef * const ref = (DropTrigRef*)signal->getDataPtr();
+  Ptr<Table> tabPtr;
+  Ptr<Subscription> subPtr;
+  const Uint32 triggerId = ref->getTriggerId();
   Uint32 type = (triggerId >> 16) & 0x3;
-  Uint32 tableId = conf->getTableId();
 
-
-  DBUG_PRINT("enter", ("type: %u tableId: %u[i=%u==%u]",
-		       type, tableId,conf->getConnectionPtr(),triggerId & 0xFFFF));
- 
-  TablePtr tabPtr;
-  c_tables.getPtr(tabPtr, conf->getConnectionPtr());
-  ndbrequire(tabPtr.p->m_tableId == tableId);
-  ndbrequire(tabPtr.p->m_state == Table::DEFINING);
+  c_subscriptionPool.getPtr(subPtr, ref->getConnectionPtr());
+  c_tables.getPtr(tabPtr, subPtr.p->m_table_ptrI);
+  ndbrequire(tabPtr.p->m_tableId == ref->getTableId());
 
   ndbrequire(type < 3);
-  tabPtr.p->m_triggerIds[type] = triggerId;
-  ndbrequire(tabPtr.p->m_hasTriggerDefined[type] == 0);
-  tabPtr.p->m_hasTriggerDefined[type] = 1;
+  ndbrequire(subPtr.p->m_triggers[type] != ILLEGAL_TRIGGER_ID);
+  subPtr.p->m_triggers[type] = ILLEGAL_TRIGGER_ID;
 
-  if (type == 2)
+  ndbrequire(subPtr.p->m_outstanding_trigger);
+  subPtr.p->m_outstanding_trigger--;
+
+  if (subPtr.p->m_outstanding_trigger)
   {
-    completeAllSubscribers(signal, tabPtr);
-    completeInitTable(signal,tabPtr);
-    DBUG_VOID_RETURN;
+    jam();
+    /**
+     * Wait for more
+     */
+    return;
   }
-  DBUG_VOID_RETURN;
+
+  drop_triggers_complete(signal, subPtr);
 }
 
 void
-Suma::execCREATE_TRIG_REF(Signal* signal){
+Suma::execDROP_TRIG_CONF(Signal* signal)
+{
   jamEntry();
-  DBUG_ENTER("Suma::execCREATE_TRIG_REF");
-  ndbassert(signal->getNoOfSections() == 0);  
-  CreateTrigRef * const ref = (CreateTrigRef*)signal->getDataPtr();
-  const Uint32 triggerId = ref->getTriggerId();
+
+  DropTrigConf * const conf = (DropTrigConf*)signal->getDataPtr();
+
+  Ptr<Table> tabPtr;
+  Ptr<Subscription> subPtr;
+  const Uint32 triggerId = conf->getTriggerId();
   Uint32 type = (triggerId >> 16) & 0x3;
-  Uint32 tableId = ref->getTableId();
-  
-  DBUG_PRINT("enter", ("type: %u tableId: %u[i=%u==%u]",
-		       type, tableId,ref->getConnectionPtr(),triggerId & 0xFFFF));
- 
-  TablePtr tabPtr;
-  c_tables.getPtr(tabPtr, ref->getConnectionPtr());
-  ndbrequire(tabPtr.p->m_tableId == tableId);
-  ndbrequire(tabPtr.p->m_state == Table::DEFINING);
 
-  tabPtr.p->m_error= ref->getErrorCode();
+  c_subscriptionPool.getPtr(subPtr, conf->getConnectionPtr());
+  c_tables.getPtr(tabPtr, subPtr.p->m_table_ptrI);
+  ndbrequire(tabPtr.p->m_tableId == conf->getTableId());
 
   ndbrequire(type < 3);
+  ndbrequire(subPtr.p->m_triggers[type] != ILLEGAL_TRIGGER_ID);
+  subPtr.p->m_triggers[type] = ILLEGAL_TRIGGER_ID;
 
-  if (type == 2)
+  ndbrequire(subPtr.p->m_outstanding_trigger);
+  subPtr.p->m_outstanding_trigger--;
+
+  if (subPtr.p->m_outstanding_trigger)
   {
-    completeAllSubscribers(signal, tabPtr);
-    completeInitTable(signal,tabPtr);
-    DBUG_VOID_RETURN;
+    jam();
+    /**
+     * Wait for more
+     */
+    return;
   }
 
-  DBUG_VOID_RETURN;
+  drop_triggers_complete(signal, subPtr);
 }
 
 void
-Suma::Table::dropTrigger(Signal* signal,Suma& suma)
+Suma::drop_triggers_complete(Signal* signal, Ptr<Subscription> subPtr)
 {
-  jam();
-  DBUG_ENTER("Suma::dropTrigger");
-  
-  m_hasOutstandingTriggerReq[0] =
-    m_hasOutstandingTriggerReq[1] =
-    m_hasOutstandingTriggerReq[2] = 1;
-  for(Uint32 j = 0; j<3; j++){
+  switch(subPtr.p->m_trigger_state){
+  case Subscription::T_UNDEFINED:
+  case Subscription::T_CREATING:
+  case Subscription::T_DEFINED:
     jam();
-    suma.suma_ndbrequire(m_triggerIds[j] != ILLEGAL_TRIGGER_ID);
-    if(m_hasTriggerDefined[j] == 1) {
-      jam();
-
-      DropTrigReq * const req = (DropTrigReq*)signal->getDataPtrSend();
-      req->setConnectionPtr(m_ptrI);
-      req->setUserRef(SUMA_REF); // Sending to myself
-      req->setRequestType(DropTrigReq::RT_USER);
-      req->setTriggerType(TriggerType::SUBSCRIPTION_BEFORE);
-      req->setTriggerActionTime(TriggerActionTime::TA_DETACHED);
-      req->setIndexId(RNIL);
-
-      req->setTableId(m_tableId);
-      req->setTriggerId(m_triggerIds[j]);
-      req->setTriggerEvent((TriggerEvent::Value)j);
-
-      DBUG_PRINT("info",("DROPPING trigger %u = %u %u %u on table %u[%u]",
-			 m_triggerIds[j],
-			 TriggerType::SUBSCRIPTION_BEFORE,
-			 TriggerActionTime::TA_DETACHED,
-			 j,
-			 m_tableId, j));
-      suma.sendSignal(DBTUP_REF, GSN_DROP_TRIG_REQ,
-		      signal, DropTrigReq::SignalLength, JBB);
-    } else {
+    ndbrequire(false);
+    break;
+  case Subscription::T_DROPPING:
+    jam();
+    /**
+     */
+    subPtr.p->m_trigger_state = Subscription::T_UNDEFINED;
+    if (!subPtr.p->m_start_req.isEmpty())
+    {
       jam();
-      suma.suma_ndbrequire(m_hasTriggerDefined[j] > 1);
-      runDropTrigger(signal,m_triggerIds[j],suma);
+      create_triggers(signal, subPtr);
+      return;
     }
+    break;
+  case Subscription::T_ERROR:
+    jam();
+    Uint32 err = subPtr.p->m_errorCode;
+    subPtr.p->m_trigger_state = Subscription::T_UNDEFINED;
+    subPtr.p->m_errorCode = 0;
+    report_sub_start_ref(signal, subPtr, err);
+    break;
   }
-  DBUG_VOID_RETURN;
+
+  check_release_subscription(signal, subPtr);
 }
 
+/**********************************************************
+ * Suma participant interface
+ *
+ * Stopping and removing of subscriber
+ *
+ */
+
 void
-Suma::execDROP_TRIG_REF(Signal* signal){
+Suma::execSUB_STOP_REQ(Signal* signal){
   jamEntry();
-  DBUG_ENTER("Suma::execDROP_TRIG_REF");
   ndbassert(signal->getNoOfSections() == 0);
-  DropTrigRef * const ref = (DropTrigRef*)signal->getDataPtr();
-  if (ref->getErrorCode() != DropTrigRef::TriggerNotFound)
+  DBUG_ENTER("Suma::execSUB_STOP_REQ");
+  
+  CRASH_INSERTION(13019);
+
+  SubStopReq * const req = (SubStopReq*)signal->getDataPtr();
+  Uint32 senderRef      = req->senderRef;
+  Uint32 senderData     = req->senderData;
+  Uint32 subscriberRef  = req->subscriberRef;
+  Uint32 subscriberData = req->subscriberData;
+  SubscriptionPtr subPtr;
+  Subscription key; 
+  key.m_subscriptionId  = req->subscriptionId;
+  key.m_subscriptionKey = req->subscriptionKey;
+  bool abortStart = (req->requestInfo & SubStopReq::RI_ABORT_START);
+
+  if (c_startup.m_restart_server_node_id == RNIL)
   {
-    ndbrequire(false);
+    jam();
+
+    /**
+     * We havent started syncing yet
+     */
+    sendSubStopRef(signal,
+                   senderRef, senderData, SubStopRef::NF_FakeErrorREF);
+    return;
   }
-  TablePtr tabPtr;
-  c_tables.getPtr(tabPtr, ref->getConnectionPtr());
-  ndbrequire(ref->getTableId() == tabPtr.p->m_tableId);
 
-  tabPtr.p->runDropTrigger(signal, ref->getTriggerId(), *this);
-  DBUG_VOID_RETURN;
-}
+  bool found = c_subscriptions.find(subPtr, key);
 
-void
-Suma::execDROP_TRIG_CONF(Signal* signal){
-  jamEntry();
-  DBUG_ENTER("Suma::execDROP_TRIG_CONF");
-  ndbassert(signal->getNoOfSections() == 0);
+  if (!found)
+  {
+    jam();
+    sendSubStopRef(signal,
+                   senderRef, senderData, SubStopRef::NoSuchSubscription);
+    return;
+  }
+  
+  switch(subPtr.p->m_state){
+  case Subscription::DEFINING:
+    jam();
+    sendSubStopRef(signal,
+                   senderRef, senderData, SubStopRef::Defining);
+    return;
+  case Subscription::DROPPED:
+    jam();
+    break;
+  case Subscription::DEFINED:
+    jam();
+    break;
+  }
 
-  DropTrigConf * const conf = (DropTrigConf*)signal->getDataPtr();
-  TablePtr tabPtr;
-  c_tables.getPtr(tabPtr, conf->getConnectionPtr());
-  ndbrequire(conf->getTableId() == tabPtr.p->m_tableId);
+  Ptr<SubOpRecord> subOpPtr;
+  LocalDLFifoList<SubOpRecord> list(c_subOpPool, subPtr.p->m_stop_req);
+  bool empty = list.isEmpty();
+  if (list.seize(subOpPtr) == false)
+  {
+    jam();
+    sendSubStopRef(signal,
+                   senderRef, senderData, SubStopRef::OutOfSubOpRecords);
+    return;
+  }
 
-  tabPtr.p->runDropTrigger(signal, conf->getTriggerId(),*this);
-  DBUG_VOID_RETURN;
+  if (abortStart)
+  {
+    jam();
+    subOpPtr.p->m_opType = SubOpRecord::R_SUB_ABORT_START_REQ;
+  }
+  else
+  {
+    jam();
+    subOpPtr.p->m_opType = SubOpRecord::R_SUB_STOP_REQ;
+  }
+  subOpPtr.p->m_subPtrI = subPtr.i;
+  subOpPtr.p->m_senderRef = senderRef;
+  subOpPtr.p->m_senderData = senderData;
+  subOpPtr.p->m_subscriberRef = subscriberRef;
+  subOpPtr.p->m_subscriberData = subscriberData;
+
+
+  if (empty)
+  {
+    jam();
+    signal->theData[0] = SumaContinueB::SUB_STOP_REQ;
+    signal->theData[1] = subOpPtr.i;
+    signal->theData[2] = RNIL;
+    sendSignal(SUMA_REF, GSN_CONTINUEB, signal, 3, JBB);
+  }
 }
 
 void
-Suma::Table::runDropTrigger(Signal* signal,
-				       Uint32 triggerId,
-				       Suma &suma)
+Suma::sub_stop_req(Signal* signal)
 {
   jam();
-  Uint32 type = (triggerId >> 16) & 0x3;
 
-  suma.suma_ndbrequire(type < 3);
-  suma.suma_ndbrequire(m_triggerIds[type] == triggerId);
-  suma.suma_ndbrequire(m_hasTriggerDefined[type] > 0);
-  suma.suma_ndbrequire(m_hasOutstandingTriggerReq[type] == 1);
-  m_hasTriggerDefined[type]--;
-  m_hasOutstandingTriggerReq[type] = 0;
-  if (m_hasTriggerDefined[type] == 0)
-  {
-    jam();
-    m_triggerIds[type] = ILLEGAL_TRIGGER_ID;
-  }
-  if( m_hasOutstandingTriggerReq[0] ||
-      m_hasOutstandingTriggerReq[1] ||
-      m_hasOutstandingTriggerReq[2])
+  Ptr<SubOpRecord> subOpPtr;
+  c_subOpPool.getPtr(subOpPtr, signal->theData[1]);
+
+  Ptr<Subscription> subPtr;
+  c_subscriptionPool.getPtr(subPtr, subOpPtr.p->m_subPtrI);
+
+  Ptr<Subscriber> ptr;
+  {
+    LocalDLList<Subscriber> list(c_subscriberPool, subPtr.p->m_subscribers);
+    if (signal->theData[2] == RNIL)
+    {
+      jam();
+      list.first(ptr);
+    }
+    else
+    {
+      jam();
+      list.getPtr(ptr, signal->theData[2]);
+    }
+
+    for (Uint32 i = 0; i<32 && !ptr.isNull(); i++, list.next(ptr))
+    {
+      if (ptr.p->m_senderRef == subOpPtr.p->m_subscriberRef &&
+          ptr.p->m_senderData == subOpPtr.p->m_subscriberData)
+      {
+        jam();
+        goto found;
+      }
+    }
+  }
+
+  if (ptr.isNull())
   {
-    // more to come
     jam();
+    sendSubStopRef(signal,
+                   subOpPtr.p->m_senderRef,
+                   subOpPtr.p->m_senderData,
+                   SubStopRef::NoSuchSubscriber);
+    check_remove_queue(signal, subPtr, subOpPtr, true, true);
     return;
   }
 
-#if 0
-  ndbout_c("trigger completed");
-#endif
-
-
-  n_subscribers--;
-  DBUG_PRINT("info",("Suma::Table[%u]::n_subscribers: %u",
-		     m_tableId, n_subscribers));
-  checkRelease(suma);
+  signal->theData[0] = SumaContinueB::SUB_STOP_REQ;
+  signal->theData[1] = subOpPtr.i;
+  signal->theData[2] = ptr.i;
+  sendSignal(SUMA_REF, GSN_CONTINUEB, signal, 3, JBB);
+  return;
 
-  suma.sendSubStopComplete(signal, m_drop_subbPtr);
-  m_drop_subbPtr.p = 0;
+found:
+  {
+    LocalDLList<Subscriber> list(c_subscriberPool, subPtr.p->m_subscribers);
+    list.remove(ptr);
+    /**
+     * NOTE: remove before...so we done send UNSUBSCRIBE to self (yuck)
+     */
+    bool report = subPtr.p->m_options & Subscription::REPORT_SUBSCRIBE;
+    report_sub_stop_conf(signal, subOpPtr, ptr, report, list);
+    c_subscriberPool.release(ptr);
+  }
+  check_remove_queue(signal, subPtr, subOpPtr, true, true);
+  check_release_subscription(signal, subPtr);
 }
 
-void Suma::suma_ndbrequire(bool v) { ndbrequire(v); }
-
 void
-Suma::Table::checkRelease(Suma &suma)
+Suma::check_remove_queue(Signal* signal,
+                         Ptr<Subscription> subPtr,
+                         Ptr<SubOpRecord> subOpPtr,
+                         bool ishead,
+                         bool dorelease)
 {
-  jam();
-  DBUG_ENTER("Suma::Table::checkRelease");
-  if (n_subscribers == 0)
+  LocalDLFifoList<SubOpRecord> list(c_subOpPool, subPtr.p->m_stop_req);
+
   {
-    jam();
-    suma.suma_ndbrequire(m_hasTriggerDefined[0] == 0);
-    suma.suma_ndbrequire(m_hasTriggerDefined[1] == 0);
-    suma.suma_ndbrequire(m_hasTriggerDefined[2] == 0);
-    if (!c_subscribers.isEmpty())
+    Ptr<SubOpRecord> tmp;
+    list.first(tmp);
+    if (ishead)
     {
-      LocalDLList<Subscriber>
-	subscribers(suma.c_subscriberPool,c_subscribers);
-      SubscriberPtr subbPtr;
-      for (subscribers.first(subbPtr);!subbPtr.isNull();
-	   subscribers.next(subbPtr))
-      {
-	jam();
-	DBUG_PRINT("info",("subscriber: %u", subbPtr.i));
-      }
-      suma.suma_ndbrequire(false);
+      jam();
+      ndbrequire(tmp.i == subOpPtr.i);
     }
-    if (!c_syncRecords.isEmpty())
+    else
     {
-      LocalDLList<SyncRecord>
-	syncRecords(suma.c_syncPool,c_syncRecords);
-      Ptr<SyncRecord> syncPtr;
-      for (syncRecords.first(syncPtr);!syncPtr.isNull();
-	   syncRecords.next(syncPtr))
-      {
-	jam();
-	DBUG_PRINT("info",("syncRecord: %u", syncPtr.i));
-      }
-      suma.suma_ndbrequire(false);
+      jam();
+      ishead = (tmp.i == subOpPtr.i);
+    }
+  }
+
+  if (dorelease)
+  {
+    jam();
+    list.release(subOpPtr);
+  }
+  else
+  {
+    jam();
+    list.remove(subOpPtr);
+  }
+
+  if (ishead)
+  {
+    jam();
+    if (list.first(subOpPtr) == false)
+    {
+      jam();
+      c_restart.m_waiting_on_self = 1;
+      return;
     }
-    release(suma);
-    suma.c_tables.remove(m_ptrI);
-    suma.c_tablePool.release(m_ptrI);
-    DBUG_PRINT("info",("c_tablePool  size: %d free: %d",
-		       suma.c_tablePool.getSize(),
-		       suma.c_tablePool.getNoOfFree()));
+    // Fall through
   }
   else
   {
-    DBUG_PRINT("info",("n_subscribers: %d", n_subscribers));
+    jam();
+    return;
   }
-  DBUG_VOID_RETURN;
+
+  switch(subOpPtr.p->m_opType){
+  case SubOpRecord::R_SUB_ABORT_START_REQ:
+  case SubOpRecord::R_SUB_STOP_REQ:
+    jam();
+    signal->theData[0] = SumaContinueB::SUB_STOP_REQ;
+    signal->theData[1] = subOpPtr.i;
+    signal->theData[2] = RNIL;
+    sendSignal(SUMA_REF, GSN_CONTINUEB, signal, 3, JBB);
+    return;
+  case SubOpRecord::R_API_FAIL_REQ:
+    jam();
+    signal->theData[0] = SumaContinueB::API_FAIL_SUBSCRIPTION;
+    signal->theData[1] = subOpPtr.i;
+    signal->theData[2] = RNIL;
+    sendSignal(SUMA_REF, GSN_CONTINUEB, signal, 3, JBB);
+    return;
+  case SubOpRecord::R_START_ME_REQ:
+    jam();
+    sendSubCreateReq(signal, subPtr);
+    return;
+  }
+}
+
+void
+Suma::report_sub_stop_conf(Signal* signal,
+                           Ptr<SubOpRecord> subOpPtr,
+                           Ptr<Subscriber> ptr,
+                           bool report,
+                           LocalDLList<Subscriber>& list)
+{
+  jam();
+  CRASH_INSERTION(13020);
+  
+  Uint32 senderRef = subOpPtr.p->m_senderRef;
+  Uint32 senderData = subOpPtr.p->m_senderData;
+  bool abortStart = subOpPtr.p->m_opType == SubOpRecord::R_SUB_ABORT_START_REQ;
+  
+  // let subscriber know that subscrber is stopped
+  if (!abortStart)
+  {
+    jam();
+    send_sub_start_stop_event(signal, ptr, NdbDictionary::Event::_TE_STOP,
+                              report, list);
+  }
+  
+  SubStopConf * const conf = (SubStopConf*)signal->getDataPtrSend();
+  conf->senderRef= reference();
+  conf->senderData= senderData;
+  sendSignal(senderRef, GSN_SUB_STOP_CONF, signal,
+	     SubStopConf::SignalLength, JBB);
+}
+
+void
+Suma::sendSubStopRef(Signal* signal,
+                     Uint32 retref,
+                     Uint32 data,
+                     Uint32 errCode)
+{
+  jam();
+  SubStopRef  * ref = (SubStopRef *)signal->getDataPtrSend();
+  ref->senderRef = reference();
+  ref->errorCode = errCode;
+  ref->senderData = data;
+  sendSignal(retref, GSN_SUB_STOP_REF, signal,  SubStopRef::SignalLength, JBB);
+}
+
+// report new started subscriber to all other subscribers
+void
+Suma::send_sub_start_stop_event(Signal *signal,
+                                Ptr<Subscriber> ptr,
+                                NdbDictionary::Event::_TableEvent event,
+                                bool report,
+                                LocalDLList<Subscriber>& list)
+{
+  const Uint64 gci = get_current_gci(signal);
+  SubTableData * data  = (SubTableData*)signal->getDataPtrSend();
+  Uint32 nodeId = refToNode(ptr.p->m_senderRef);
+
+  NdbDictionary::Event::_TableEvent other;
+  if (event == NdbDictionary::Event::_TE_STOP)
+  {
+    other = NdbDictionary::Event::_TE_UNSUBSCRIBE;
+  }
+  else if (event == NdbDictionary::Event::_TE_ACTIVE)
+  {
+    other = NdbDictionary::Event::_TE_SUBSCRIBE;
+  }
+  else
+  {
+    jamLine(event);
+    ndbrequire(false);
+  }
+  
+  data->gci_hi         = Uint32(gci >> 32);
+  data->gci_lo         = Uint32(gci);
+  data->tableId        = 0;
+  data->requestInfo    = 0;
+  SubTableData::setOperation(data->requestInfo, event);
+  SubTableData::setNdbdNodeId(data->requestInfo, getOwnNodeId());
+  SubTableData::setReqNodeId(data->requestInfo, nodeId);
+  data->changeMask     = 0;
+  data->totalLen       = 0;
+  data->senderData     = ptr.p->m_senderData;
+  sendSignal(ptr.p->m_senderRef, GSN_SUB_TABLE_DATA, signal,
+             SubTableData::SignalLength, JBB);
+
+  if (report == false)
+  {
+    return;
+  }
+
+  data->requestInfo    = 0;
+  SubTableData::setOperation(data->requestInfo, other);
+  SubTableData::setNdbdNodeId(data->requestInfo, getOwnNodeId());
+
+  Ptr<Subscriber> tmp;
+  for(list.first(tmp); !tmp.isNull(); list.next(tmp))
+  {
+    jam();
+    SubTableData::setReqNodeId(data->requestInfo, nodeId);
+    data->senderData = tmp.p->m_senderData;
+    sendSignal(tmp.p->m_senderRef, GSN_SUB_TABLE_DATA, signal,
+               SubTableData::SignalLength, JBB);
+    
+    ndbassert(tmp.i != ptr.i); // ptr should *NOT* be in list now
+    if (other != NdbDictionary::Event::_TE_UNSUBSCRIBE)
+    {
+      jam();
+      SubTableData::setReqNodeId(data->requestInfo, 
+                                 refToNode(tmp.p->m_senderRef));
+      
+      data->senderData = ptr.p->m_senderData;
+      sendSignal(ptr.p->m_senderRef, GSN_SUB_TABLE_DATA, signal,
+                 SubTableData::SignalLength, JBB);
+    }
+  }
+}
+
+void
+Suma::Table::createAttributeMask(AttributeMask& mask,
+                                 Suma &suma)
+{
+  jam();
+  mask.clear();
+  for(Uint32 i = 0; i<m_noOfAttributes; i++)
+    mask.set(i);
 }
 
+void Suma::suma_ndbrequire(bool v) { ndbrequire(v); }
+
+
 /**********************************************************
  * Scan data interface
  *
@@ -3290,9 +3266,9 @@ Suma::execTRANSID_AI(Signal* signal)
    * Initialize signal
    */  
   SubTableData * sdata = (SubTableData*)signal->getDataPtrSend();
-  Uint32 ref = subPtr.p->m_senderRef;
+  Uint32 ref = syncPtr.p->m_senderRef;
   sdata->tableId = syncPtr.p->m_currentTableId;
-  sdata->senderData = subPtr.p->m_senderData;
+  sdata->senderData = syncPtr.p->m_senderData;
   sdata->requestInfo = 0;
   SubTableData::setOperation(sdata->requestInfo, 
 			     NdbDictionary::Event::_TE_SCAN); // Scan
@@ -3481,12 +3457,12 @@ Suma::execFIRE_TRIG_ORD(Signal* signal)
   const Uint64 gci = gci_lo | (Uint64(gci_hi) << 32);
   const Uint32 event     = trg->getTriggerEvent();
   const Uint32 any_value = trg->getAnyValue();
-  TablePtr tabPtr;
-  tabPtr.i               = trigId & 0xFFFF;
+
+  Ptr<Subscription> subPtr;
+  c_subscriptionPool.getPtr(subPtr, trigId & 0xFFFF);
 
   ndbassert(gci > m_last_complete_gci);
 
-  DBUG_PRINT("enter",("tabPtr.i=%u", tabPtr.i));
   ndbrequire(f_bufferLock == trigId);
   /**
    * Reset f_bufferLock
@@ -3494,8 +3470,9 @@ Suma::execFIRE_TRIG_ORD(Signal* signal)
   f_bufferLock = 0;
   b_bufferLock = 0;
   
-  ndbrequire((tabPtr.p = c_tablePool.getPtr(tabPtr.i)) != 0);
-  Uint32 tableId = tabPtr.p->m_tableId;
+  Uint32 tableId = subPtr.p->m_tableId;
+  Uint32 schemaVersion =
+    c_tablePool.getPtr(subPtr.p->m_table_ptrI)->m_schemaVersion;
   
   Uint32 bucket= hashValue % c_no_of_buckets;
   m_max_seen_gci = (gci > m_max_seen_gci ? gci : m_max_seen_gci);
@@ -3515,8 +3492,6 @@ Suma::execFIRE_TRIG_ORD(Signal* signal)
     /**
      * Signal to subscriber(s)
      */
-    ndbrequire((tabPtr.p = c_tablePool.getPtr(tabPtr.i)) != 0);
-    
     SubTableData * data = (SubTableData*)signal->getDataPtrSend();//trg;
     data->gci_hi         = gci_hi;
     data->gci_lo         = gci_lo;
@@ -3528,12 +3503,10 @@ Suma::execFIRE_TRIG_ORD(Signal* signal)
     data->totalLen       = ptrLen;
     
     {
-      LocalDLList<Subscriber> list(c_subscriberPool,tabPtr.p->c_subscribers);
+      LocalDLList<Subscriber> list(c_subscriberPool, subPtr.p->m_subscribers);
       SubscriberPtr subbPtr;
       for(list.first(subbPtr); !subbPtr.isNull(); list.next(subbPtr))
       {
-	DBUG_PRINT("info",("GSN_SUB_TABLE_DATA to node %d",
-			   refToNode(subbPtr.p->m_senderRef)));
 	data->senderData = subbPtr.p->m_senderData;
 	sendSignal(subbPtr.p->m_senderRef, GSN_SUB_TABLE_DATA, signal,
 		   SubTableData::SignalLength, JBB, ptr, nptr);
@@ -3547,8 +3520,8 @@ Suma::execFIRE_TRIG_ORD(Signal* signal)
     Uint32 sz = f_trigBufferSize + b_trigBufferSize + buffer_header_sz;
     if((dst = get_buffer_ptr(signal, bucket, gci, sz)))
     {
-      * dst++ = tableId;
-      * dst++ = tabPtr.p->m_schemaVersion;
+      * dst++ = subPtr.i;
+      * dst++ = schemaVersion;
       * dst++ = (event << 16) | f_trigBufferSize;
       * dst++ = any_value;
       memcpy(dst, f_buffer, f_trigBufferSize << 2);
@@ -3785,6 +3758,13 @@ Suma::execSUB_GCP_COMPLETE_REP(Signal* s
       gcp.p->m_gci = gci;
       gcp.p->m_subscribers = c_subscriber_nodes;
     }
+    else
+    {
+      char buf[100];
+      c_subscriber_nodes.getText(buf);
+      g_eventLogger.error("c_gcp_list.seize() failed: gci: %d nodes: %s",
+                          gci, buf);
+    }
   }
   
   /**
@@ -3816,13 +3796,6 @@ Suma::execCREATE_TAB_CONF(Signal *signal
   jamEntry();
   DBUG_ENTER("Suma::execCREATE_TAB_CONF");
 
-#if 0
-  CreateTabConf * const conf = (CreateTabConf*)signal->getDataPtr();
-  Uint32 tableId = conf->senderData;
-
-  TablePtr tabPtr;
-  initTable(signal,tableId,tabPtr);
-#endif
   DBUG_VOID_RETURN;
 }
 
@@ -3830,7 +3803,6 @@ void
 Suma::execDROP_TAB_CONF(Signal *signal)
 {
   jamEntry();
-  DBUG_ENTER("Suma::execDROP_TAB_CONF");
   ndbassert(signal->getNoOfSections() == 0);
 
   DropTabConf * const conf = (DropTabConf*)signal->getDataPtr();
@@ -3838,33 +3810,24 @@ Suma::execDROP_TAB_CONF(Signal *signal)
   Uint32 tableId= conf->tableId;
 
   TablePtr tabPtr;
-  if (!c_tables.find(tabPtr, tableId) ||
-      tabPtr.p->m_state == Table::DROPPED ||
-      tabPtr.p->m_state == Table::ALTERED)
+  if (!c_tables.find(tabPtr, tableId))
   {
-    DBUG_VOID_RETURN;
+    jam();
+    return;
   }
 
   DBUG_PRINT("info",("drop table id: %d[i=%u]", tableId, tabPtr.i));
 
   tabPtr.p->m_state = Table::DROPPED;
-  for (int j= 0; j < 3; j++)
-  {
-    if (!tabPtr.p->m_hasOutstandingTriggerReq[j])
-    {
-      tabPtr.p->m_hasTriggerDefined[j] = 0;
-      tabPtr.p->m_hasOutstandingTriggerReq[j] = 0;
-      tabPtr.p->m_triggerIds[j] = ILLEGAL_TRIGGER_ID;
-    }
-    else
-      tabPtr.p->m_hasTriggerDefined[j] = 1;
-  }
+  c_tables.remove(tabPtr);
+
   if (senderRef == 0)
   {
-    DBUG_VOID_RETURN;
+    jam();
+    return;
   }
   // dict coordinator sends info to API
-  
+
   const Uint64 gci = get_current_gci(signal);
   SubTableData * data = (SubTableData*)signal->getDataPtrSend();
   data->gci_hi         = Uint32(gci >> 32);
@@ -3874,30 +3837,33 @@ Suma::execDROP_TAB_CONF(Signal *signal)
   SubTableData::setOperation(data->requestInfo,NdbDictionary::Event::_TE_DROP);
   SubTableData::setReqNodeId(data->requestInfo, refToNode(senderRef));
   
+  Ptr<Subscription> subPtr;
+  LocalDLList<Subscription> subList(c_subscriptionPool,
+                                    tabPtr.p->m_subscriptions);
+
+  for (subList.first(subPtr); !subPtr.isNull(); )
   {
-    LocalDLList<Subscriber> subbs(c_subscriberPool,tabPtr.p->c_subscribers);
-    SubscriberPtr subbPtr;
-    for(subbs.first(subbPtr);!subbPtr.isNull();subbs.next(subbPtr))
+    if(subPtr.p->m_subscriptionType != SubCreateReq::TableEvent)
     {
       jam();
-      /*
-       * get subscription ptr for this subscriber
-       */
-      SubscriptionPtr subPtr;
-      c_subscriptions.getPtr(subPtr, subbPtr.p->m_subPtrI);
-      if(subPtr.p->m_subscriptionType != SubCreateReq::TableEvent) {
-	jam();
-	continue;
-	//continue in for-loop if the table is not part of 
-	//the subscription. Otherwise, send data to subscriber.
-      }
-      data->senderData= subbPtr.p->m_senderData;
-      sendSignal(subbPtr.p->m_senderRef, GSN_SUB_TABLE_DATA, signal,
-		 SubTableData::SignalLength, JBB);
-      DBUG_PRINT("info",("sent to subscriber %d", subbPtr.i));
+      continue;
+      //continue in for-loop if the table is not part of
+      //the subscription. Otherwise, send data to subscriber.
     }
+
+    Ptr<Subscriber> ptr;
+    LocalDLList<Subscriber> list(c_subscriberPool, subPtr.p->m_subscribers);
+    for(list.first(ptr); !ptr.isNull(); list.next(ptr))
+    {
+      jam();
+      data->senderData= ptr.p->m_senderData;
+      sendSignal(ptr.p->m_senderRef, GSN_SUB_TABLE_DATA, signal,
+                 SubTableData::SignalLength, JBB);
+    }
+    
+    Ptr<Subscription> tmp = subPtr;
+    subList.next(subPtr);
   }
-  DBUG_VOID_RETURN;
 }
 
 static Uint32 b_dti_buf[MAX_WORDS_META_FILE];
@@ -3914,37 +3880,26 @@ Suma::execALTER_TAB_REQ(Signal *signal)
   Uint32 tableId= req->tableId;
   Uint32 changeMask= req->changeMask;
   TablePtr tabPtr;
-  if (!c_tables.find(tabPtr, tableId) ||
-      tabPtr.p->m_state == Table::DROPPED ||
-      tabPtr.p->m_state == Table::ALTERED)
+  if (!c_tables.find(tabPtr, tableId))
   {
-    DBUG_VOID_RETURN;
+    jam();
+    return;
   }
 
-  DBUG_PRINT("info",("alter table id: %d[i=%u]", tableId, tabPtr.i));
-  Table::State old_state = tabPtr.p->m_state;
-  tabPtr.p->m_state = Table::ALTERED;
-  // triggers must be removed, waiting for sub stop req for that
-
   if (senderRef == 0)
   {
-    tabPtr.p->m_state = old_state;
-    DBUG_VOID_RETURN;
+    jam();
+    return;
   }
   // dict coordinator sends info to API
-
+  
   // Copy DICT_TAB_INFO to local buffer
   SegmentedSectionPtr tabInfoPtr;
   signal->getSection(tabInfoPtr, AlterTabReq::DICT_TAB_INFO);
-#ifndef DBUG_OFF
-  ndbout_c("DICT_TAB_INFO in SUMA,  tabInfoPtr.sz = %d", tabInfoPtr.sz);
-  SimplePropertiesSectionReader reader(tabInfoPtr, getSectionSegmentPool());
-  reader.printAll(ndbout);
-#endif
   copy(b_dti_buf, tabInfoPtr);
-  LinearSectionPtr ptr[3];
-  ptr[0].p = b_dti_buf;
-  ptr[0].sz = tabInfoPtr.sz;
+  LinearSectionPtr lptr[3];
+  lptr[0].p = b_dti_buf;
+  lptr[0].sz = tabInfoPtr.sz;
 
   releaseSections(signal);
 
@@ -3960,32 +3915,32 @@ Suma::execALTER_TAB_REQ(Signal *signal)
   data->flags          = 0;
   data->changeMask     = changeMask;
   data->totalLen       = tabInfoPtr.sz;
+  Ptr<Subscription> subPtr;
+  LocalDLList<Subscription> subList(c_subscriptionPool,
+                                    tabPtr.p->m_subscriptions);
+
+  for (subList.first(subPtr); !subPtr.isNull(); subList.next(subPtr))
   {
-    LocalDLList<Subscriber> subbs(c_subscriberPool,tabPtr.p->c_subscribers);
-    SubscriberPtr subbPtr;
-    for(subbs.first(subbPtr);!subbPtr.isNull();subbs.next(subbPtr))
+    if(subPtr.p->m_subscriptionType != SubCreateReq::TableEvent)
     {
       jam();
-      /*
-       * get subscription ptr for this subscriber
-       */
-      SubscriptionPtr subPtr;
-      c_subscriptions.getPtr(subPtr, subbPtr.p->m_subPtrI);
-      if(subPtr.p->m_subscriptionType != SubCreateReq::TableEvent) {
-	jam();
-	continue;
-	//continue in for-loop if the table is not part of 
-	//the subscription. Otherwise, send data to subscriber.
-      }
-
-      data->senderData= subbPtr.p->m_senderData;
+      continue;
+      //continue in for-loop if the table is not part of
+      //the subscription. Otherwise, send data to subscriber.
+    }
+  
+    Ptr<Subscriber> ptr;
+    LocalDLList<Subscriber> list(c_subscriberPool, subPtr.p->m_subscribers);
+    for(list.first(ptr); !ptr.isNull(); list.next(ptr))
+    {
+      jam();
+      data->senderData= ptr.p->m_senderData;
       Callback c = { 0, 0 };
-      sendFragmentedSignal(subbPtr.p->m_senderRef, GSN_SUB_TABLE_DATA, signal,
-                           SubTableData::SignalLength, JBB, ptr, 1, c);
-      DBUG_PRINT("info",("sent to subscriber %d", subbPtr.i));
+      sendFragmentedSignal(ptr.p->m_senderRef, GSN_SUB_TABLE_DATA, signal,
+                           SubTableData::SignalLength, JBB, lptr, 1, c);
     }
   }
-  tabPtr.p->m_state = old_state;
+
   DBUG_VOID_RETURN;
 }
 
@@ -4035,6 +3990,13 @@ Suma::execSUB_GCP_COMPLETE_ACK(Signal* s
   // Ack from User and not an ack from other SUMA, redistribute in nodegroup
   
   Uint32 nodeId = refToNode(senderRef);
+  if (ERROR_INSERTED(13023))
+  {
+    ndbout_c("Throwing SUB_GCP_COMPLETE_ACK gci: %u/%u from %u",
+             Uint32(gci>>32), Uint32(gci), nodeId);
+    return;
+  }
+
   
   jam();
   Ptr<Gcp_record> gcp;
@@ -4043,6 +4005,7 @@ Suma::execSUB_GCP_COMPLETE_ACK(Signal* s
     if(gcp.p->m_gci == gci)
     {
       gcp.p->m_subscribers.clear(nodeId);
+      gcp.p->m_subscribers.bitAND(c_subscriber_nodes);
       if(!gcp.p->m_subscribers.isclear())
       {
 	jam();
@@ -4054,8 +4017,9 @@ Suma::execSUB_GCP_COMPLETE_ACK(Signal* s
   
   if(gcp.isNull())
   {
-    ndbout_c("ACK wo/ gcp record (gci: %u/%u)", 
-	     Uint32(gci >> 32), Uint32(gci));
+    g_eventLogger.warning("ACK wo/ gcp record (gci: %u/%u) ref: %.8x from: %.8x",
+                          Uint32(gci >> 32), Uint32(gci),
+                          senderRef, signal->getSendersBlockRef());
   }
   else
   {
@@ -4087,7 +4051,6 @@ Suma::execSUB_REMOVE_REQ(Signal* signal)
 {
   jamEntry();
   DBUG_ENTER("Suma::execSUB_REMOVE_REQ");
-  ndbassert(signal->getNoOfSections() == 0);
 
   CRASH_INSERTION(13021);
 
@@ -4097,54 +4060,43 @@ Suma::execSUB_REMOVE_REQ(Signal* signal)
   key.m_subscriptionId  = req.subscriptionId;
   key.m_subscriptionKey = req.subscriptionKey;
 
-  DBUG_PRINT("enter",("key.m_subscriptionId: %u, key.m_subscriptionKey: %u",
-		      key.m_subscriptionId, key.m_subscriptionKey));
-
-  if(!c_subscriptions.find(subPtr, key))
+  if (c_startup.m_restart_server_node_id == RNIL)
   {
     jam();
-    DBUG_PRINT("info",("Not found"));
-    sendSubRemoveRef(signal, req, 1407);
-    DBUG_VOID_RETURN;
-  }
-  if (subPtr.p->m_state == Subscription::LOCKED)
-  {
-    /**
-     * we are currently setting up triggers etc. for this event
-     */
-    jam();
-    sendSubRemoveRef(signal, req, 1413);
-    DBUG_VOID_RETURN;
-  }
-  if (subPtr.p->m_state == Subscription::DROPPED)
-  {
+
     /**
-     * already dropped
+     * We havent started syncing yet
      */
-    jam();
-    sendSubRemoveRef(signal, req, 1419);
-    DBUG_VOID_RETURN;
+    sendSubRemoveRef(signal,  req, SubRemoveRef::NF_FakeErrorREF);
+    return;
   }
 
-  ndbrequire(subPtr.p->m_state == Subscription::DEFINED);
-  DBUG_PRINT("info",("n_subscribers: %u", subPtr.p->n_subscribers));
+  bool found = c_subscriptions.find(subPtr, key);
 
-  if (subPtr.p->n_subscribers == 0)
+  if(!found)
   {
-    // no subscribers on the subscription
-    // remove it
     jam();
-    completeSubRemove(subPtr);
+    sendSubRemoveRef(signal, req, SubRemoveRef::NoSuchSubscription);
+    return;
   }
-  else
-  {
-    // subscribers left on the subscription
-    // mark it to be removed once all subscribers
-    // are removed
+
+  switch(subPtr.p->m_state){
+  case Subscription::DEFINING:
+    ndbrequire(false);
+  case Subscription::DROPPED:
+    /**
+     * already dropped
+     */
     jam();
-    subPtr.p->m_state = Subscription::DROPPED;
+    sendSubRemoveRef(signal, req, SubRemoveRef::AlreadyDropped);
+    return;
+  case Subscription::DEFINED:
+    break;
   }
 
+  subPtr.p->m_state = Subscription::DROPPED;
+  check_release_subscription(signal, subPtr);
+
   SubRemoveConf * const conf = (SubRemoveConf*)signal->getDataPtrSend();
   conf->senderRef            = reference();
   conf->senderData           = req.senderData;
@@ -4152,73 +4104,109 @@ Suma::execSUB_REMOVE_REQ(Signal* signal)
   conf->subscriptionKey      = req.subscriptionKey;
 
   sendSignal(req.senderRef, GSN_SUB_REMOVE_CONF, signal,
-	     SubRemoveConf::SignalLength, JBB);
-
-  DBUG_VOID_RETURN;
+             SubRemoveConf::SignalLength, JBB);
+  return;
 }
 
 void
-Suma::completeSubRemove(SubscriptionPtr subPtr)
+Suma::check_release_subscription(Signal* signal, Ptr<Subscription> subPtr)
 {
-  DBUG_ENTER("Suma::completeSubRemove");
-  //Uint32 subscriptionId  = subPtr.p->m_subscriptionId;
-  //Uint32 subscriptionKey = subPtr.p->m_subscriptionKey;
+  if (!subPtr.p->m_subscribers.isEmpty())
+  {
+    jam();
+    return;
+  }
 
-  c_subscriptions.release(subPtr);
-  DBUG_PRINT("info",("c_subscriptionPool  size: %d free: %d",
-		     c_subscriptionPool.getSize(),
-		     c_subscriptionPool.getNoOfFree()));
+  if (!subPtr.p->m_start_req.isEmpty())
+  {
+    jam();
+    return;
+  }
 
-  /**
-   * I was the last subscription to be remove so clear c_tables
-   */
-#if 0
-  ndbout_c("c_subscriptionPool.getSize() %d c_subscriptionPool.getNoOfFree()%d",
-	   c_subscriptionPool.getSize(),c_subscriptionPool.getNoOfFree());
-#endif
+  if (!subPtr.p->m_stop_req.isEmpty())
+  {
+    jam();
+    return;
+  }
 
-  if(c_subscriptionPool.getSize() == c_subscriptionPool.getNoOfFree()) {
+  switch(subPtr.p->m_trigger_state){
+  case Subscription::T_UNDEFINED:
     jam();
-#if 0
-    ndbout_c("SUB_REMOVE_REQ:Clearing c_tables");
-#endif
-    int count= 0;
-    KeyTable<Table>::Iterator it;
-    for(c_tables.first(it); !it.isNull(); )
-    {
-      // ndbrequire(false);
-      
-      DBUG_PRINT("error",("trailing table id: %d[i=%d] n_subscribers: %d m_state: %d",
-			  it.curr.p->m_tableId,
-			  it.curr.p->m_ptrI,
-			  it.curr.p->n_subscribers,
-			  it.curr.p->m_state));
+    goto do_release;
+  case Subscription::T_CREATING:
+    jam();
+    /**
+     * Wait for completion
+     */
+    return;
+  case Subscription::T_DEFINED:
+    jam();
+    subPtr.p->m_trigger_state = Subscription::T_DROPPING;
+    drop_triggers(signal, subPtr);
+    return;
+  case Subscription::T_DROPPING:
+    jam();
+    /**
+     * Wait for completion
+     */
+    return;
+  case Subscription::T_ERROR:
+    jam();
+    /**
+     * Wait for completion
+     */
+    return;
+  }
+  ndbrequire(false);
 
-      LocalDLList<Subscriber> subbs(c_subscriberPool,it.curr.p->c_subscribers);
-      SubscriberPtr subbPtr;
-      for(subbs.first(subbPtr);!subbPtr.isNull();subbs.next(subbPtr))
-      {
-	DBUG_PRINT("error",("subscriber %d, m_subPtrI: %d", subbPtr.i, subbPtr.p->m_subPtrI));
-      }
+do_release:
+  TablePtr tabPtr;
+  c_tables.getPtr(tabPtr, subPtr.p->m_table_ptrI);
 
-      it.curr.p->release(* this);
-      TablePtr tabPtr = it.curr;
-      c_tables.next(it);
+  if (tabPtr.p->m_state == Table::DROPPED)
+  {
+    jam();
+    subPtr.p->m_state = Subscription::DROPPED;
+  }
+
+  if (subPtr.p->m_state != Subscription::DROPPED)
+  {
+    jam();
+    return;
+  }
+
+  {
+    LocalDLList<Subscription> list(c_subscriptionPool,
+                                   tabPtr.p->m_subscriptions);
+    list.remove(subPtr);
+  }
+
+  if (tabPtr.p->m_subscriptions.isEmpty())
+  {
+    jam();
+    switch(tabPtr.p->m_state){
+    case Table::UNDEFINED:
+      ndbrequire(false);
+    case Table::DEFINING:
+      break;
+    case Table::DEFINED:
+      jam();
       c_tables.remove(tabPtr);
+      // Fall through
+    case Table::DROPPED:
+      jam();
+      tabPtr.p->release(* this);
       c_tablePool.release(tabPtr);
-      DBUG_PRINT("info",("c_tablePool  size: %d free: %d",
-			 c_tablePool.getSize(),
-			 c_tablePool.getNoOfFree()));
-      count++;
-    }
-    DBUG_ASSERT(count == 0);
+    };
   }
-  DBUG_VOID_RETURN;
+  
+  c_subscriptions.release(subPtr);
 }
 
 void
-Suma::sendSubRemoveRef(Signal* signal, const SubRemoveReq& req,
-				  Uint32 errCode)
+Suma::sendSubRemoveRef(Signal* signal,
+                       const SubRemoveReq& req,
+                       Uint32 errCode)
 {
   jam();
   DBUG_ENTER("Suma::sendSubRemoveRef");
@@ -4242,10 +4230,6 @@ Suma::Table::release(Suma & suma){
   fragBuf.release();
 
   m_state = UNDEFINED;
-#ifndef DBUG_OFF
-  if (n_subscribers != 0)
-    abort();
-#endif
 }
 
 void
@@ -4271,398 +4255,315 @@ Suma::SyncRecord::release(){
 void
 Suma::execSUMA_START_ME_REQ(Signal* signal) {
   jamEntry();
-  DBUG_ENTER("Suma::execSUMA_START_ME");
-  ndbassert(signal->getNoOfSections() == 0);
-  Restart.runSUMA_START_ME_REQ(signal, signal->getSendersBlockRef());
-  DBUG_VOID_RETURN;
-}
 
-void 
-Suma::execSUB_CREATE_REF(Signal* signal) {
-  jamEntry();
-  DBUG_ENTER("Suma::execSUB_CREATE_REF");
-  ndbassert(signal->getNoOfSections() == 0);
-  SubCreateRef *const ref= (SubCreateRef *)signal->getDataPtr();
-  Uint32 error= ref->errorCode;
-  if (error != 1415)
+  Uint32 retref = signal->getSendersBlockRef();
+  if (c_restart.m_ref)
   {
-    /*
-     * This will happen if an api node connects during while other node
-     * is restarting, and in this case the subscription will already
-     * have been created.
-     * ToDo: more complete handling of api nodes joining during
-     * node restart
-     */
-    Uint32 senderRef = signal->getSendersBlockRef();
-    BlockReference cntrRef = calcNdbCntrBlockRef(refToNode(senderRef));
-    // for some reason we did not manage to create a subscription
-    // on the starting node
-    SystemError * const sysErr = (SystemError*)&signal->theData[0];
-    sysErr->errorCode = SystemError::CopySubscriptionRef;
-    sysErr->errorRef = reference();
-    sysErr->data[0] = error;
-    sysErr->data[1] = 0;
-    sendSignal(cntrRef, GSN_SYSTEM_ERROR, signal,
-               SystemError::SignalLength, JBB);
-    Restart.resetRestart(signal);
-    DBUG_VOID_RETURN;
+    jam();
+    SumaStartMeRef* ref= (SumaStartMeRef*)signal->getDataPtrSend();
+    ref->errorCode = SumaStartMeRef::Busy;
+    sendSignal(retref, GSN_SUMA_START_ME_REF, signal,
+               SumaStartMeRef::SignalLength, JBB);
+    return;
   }
-  // SubCreateConf has same signaldata as SubCreateRef
-  Restart.runSUB_CREATE_CONF(signal);
-  DBUG_VOID_RETURN;
-}
 
-void 
-Suma::execSUB_CREATE_CONF(Signal* signal)
-{
-  jamEntry();
-  DBUG_ENTER("Suma::execSUB_CREATE_CONF");
-  ndbassert(signal->getNoOfSections() == 0);
-  Restart.runSUB_CREATE_CONF(signal);
-  DBUG_VOID_RETURN;
-}
-
-void 
-Suma::execSUB_START_CONF(Signal* signal)
-{
-  jamEntry();
-  DBUG_ENTER("Suma::execSUB_START_CONF");
-  ndbassert(signal->getNoOfSections() == 0);
-  Restart.runSUB_START_CONF(signal);
-  DBUG_VOID_RETURN;
-}
-
-void
-Suma::execSUB_START_REF(Signal* signal) {
-  jamEntry();
-  DBUG_ENTER("Suma::execSUB_START_REF");
-  ndbassert(signal->getNoOfSections() == 0);
-  SubStartRef *const ref= (SubStartRef *)signal->getDataPtr();
-  Uint32 error= ref->errorCode;
+  Ptr<SubOpRecord> subOpPtr;
+  if (c_subOpPool.seize(subOpPtr) == false)
   {
-    Uint32 senderRef = signal->getSendersBlockRef();
-    BlockReference cntrRef = calcNdbCntrBlockRef(refToNode(senderRef));
-    // for some reason we did not manage to start a subscriber
-    // on the starting node
-    SystemError * const sysErr = (SystemError*)&signal->theData[0];
-    sysErr->errorCode = SystemError::CopySubscriberRef;
-    sysErr->errorRef = reference();
-    sysErr->data[0] = error;
-    sysErr->data[1] = 0;
-    sendSignal(cntrRef, GSN_SYSTEM_ERROR, signal,
-               SystemError::SignalLength, JBB);
-    Restart.resetRestart(signal);
+    jam();
+    SumaStartMeRef* ref= (SumaStartMeRef*)signal->getDataPtrSend();
+    ref->errorCode = SumaStartMeRef::Busy;
+    sendSignal(retref, GSN_SUMA_START_ME_REF, signal,
+               SumaStartMeRef::SignalLength, JBB);
+    return;
   }
-  DBUG_VOID_RETURN;
-}
 
-Suma::Restart::Restart(Suma& s) : suma(s)
-{
-  nodeId = 0;
-}
+  subOpPtr.p->m_opType = SubOpRecord::R_START_ME_REQ;
 
-void
-Suma::Restart::runSUMA_START_ME_REQ(Signal* signal, Uint32 sumaRef)
-{
-  jam();
-  DBUG_ENTER("Suma::Restart::runSUMA_START_ME");
+  c_restart.m_abort = 0;
+  c_restart.m_waiting_on_self = 0;
+  c_restart.m_ref = retref;
+  c_restart.m_max_seq = c_current_seq;
+  c_restart.m_subOpPtrI = subOpPtr.i;
 
-  if(nodeId != 0)
+  DLHashTable<Subscription>::Iterator it;
+  if (c_subscriptions.first(it))
   {
-    SumaStartMeRef* ref= (SumaStartMeRef*)signal->getDataPtrSend();
-    ref->errorCode = SumaStartMeRef::Busy;
-    suma.sendSignal(sumaRef, GSN_SUMA_START_ME_REF, signal,
-		    SumaStartMeRef::SignalLength, JBB);
-    return;
-  }
+    jam();
 
-  nodeId = refToNode(sumaRef);
-  startNode(signal, sumaRef);
+    /**
+     * We only need to handle subscriptions with seq <= c_current_seq
+     *   all subscriptions(s) created after this, will be handled by
+     *   starting suma directly
+     */
+    c_current_seq++;
+  }
 
-  DBUG_VOID_RETURN;
+  copySubscription(signal, it);
 }
 
 void
-Suma::Restart::startNode(Signal* signal, Uint32 sumaRef)
-{
-  jam();
-  DBUG_ENTER("Suma::Restart::startNode");
-  
-  // right now we can only handle restarting one node
-  // at a time in a node group
-  
-  createSubscription(signal, sumaRef);
-  DBUG_VOID_RETURN;
-}
-
-void 
-Suma::Restart::createSubscription(Signal* signal, Uint32 sumaRef)
+Suma::copySubscription(Signal* signal, DLHashTable<Subscription>::Iterator it)
 {
   jam();
-  DBUG_ENTER("Suma::Restart::createSubscription");
-  suma.c_subscriptions.first(c_subIt);
-  nextSubscription(signal, sumaRef);
-  DBUG_VOID_RETURN;
-}
 
-void 
-Suma::Restart::nextSubscription(Signal* signal, Uint32 sumaRef)
-{
-  jam();
-  DBUG_ENTER("Suma::Restart::nextSubscription");
+  Ptr<SubOpRecord> subOpPtr;
+  c_subOpPool.getPtr(subOpPtr, c_restart.m_subOpPtrI);
 
-  if (c_subIt.isNull())
+  Ptr<Subscription> subPtr = it.curr;
+  if (!subPtr.isNull())
   {
     jam();
-    completeSubscription(signal, sumaRef);
-    DBUG_VOID_RETURN;
-  }
-  SubscriptionPtr subPtr;
-  subPtr.i = c_subIt.curr.i;
-  subPtr.p = suma.c_subscriptions.getPtr(subPtr.i);
+    c_restart.m_subPtrI = subPtr.i;
+    c_restart.m_bucket = it.bucket;
 
-  suma.c_subscriptions.next(c_subIt);
 
-  SubCreateReq * req = (SubCreateReq *)signal->getDataPtrSend();
-      
-  req->senderRef        = suma.reference();
-  req->senderData       = subPtr.i;
-  req->subscriptionId   = subPtr.p->m_subscriptionId;
-  req->subscriptionKey  = subPtr.p->m_subscriptionKey;
-  req->subscriptionType = subPtr.p->m_subscriptionType |
-    SubCreateReq::RestartFlag;
-
-  switch (subPtr.p->m_subscriptionType) {
-  case SubCreateReq::TableEvent:
-    jam();
-    req->tableId = subPtr.p->m_tableId;
-    req->state = subPtr.p->m_state;
-    suma.sendSignal(sumaRef, GSN_SUB_CREATE_REQ, signal,
-		    SubCreateReq::SignalLength2, JBB);
-    DBUG_VOID_RETURN;
-  case SubCreateReq::SingleTableScan:
+    LocalDLFifoList<SubOpRecord> list(c_subOpPool, subPtr.p->m_stop_req);
+    bool empty = list.isEmpty();
+    list.add(subOpPtr);
+
+    if (!empty)
+    {
+      /**
+       * Wait for lock
+       */
+      jam();
+      c_restart.m_waiting_on_self = 1;
+      return;
+    }
+
+    sendSubCreateReq(signal, subPtr);
+  }
+  else
+  {
     jam();
-    nextSubscription(signal, sumaRef);
-    DBUG_VOID_RETURN;
-  case SubCreateReq::SelectiveTableSnapshot:
-  case SubCreateReq::DatabaseSnapshot:
-    ndbrequire(false);
+    SumaStartMeConf* conf = (SumaStartMeConf*)signal->getDataPtrSend();
+    conf->unused = 0;
+    sendSignal(c_restart.m_ref, GSN_SUMA_START_ME_CONF, signal,
+               SumaStartMeConf::SignalLength, JBB);
+
+    c_subOpPool.release(subOpPtr);
+    c_restart.m_ref = 0;
+    return;
   }
-  ndbrequire(false);
 }
 
 void
-Suma::Restart::runSUB_CREATE_CONF(Signal* signal)
+Suma::sendSubCreateReq(Signal* signal, Ptr<Subscription> subPtr)
 {
   jam();
-  DBUG_ENTER("Suma::Restart::runSUB_CREATE_CONF");
 
-  const Uint32 senderRef = signal->senderBlockRef();
-  Uint32 sumaRef = signal->getSendersBlockRef();
+  if (c_restart.m_abort)
+  {
+    jam();
+    abort_start_me(signal, subPtr, true);
+    return;
+  }
 
-  SubCreateConf * const conf = (SubCreateConf *)signal->getDataPtr();
+  Ptr<Table> tabPtr;
+  c_tablePool.getPtr(tabPtr, subPtr.p->m_table_ptrI);
 
-  SubscriptionPtr subPtr;
-  suma.c_subscriptions.getPtr(subPtr,conf->senderData);
+  if (subPtr.p->m_state != Subscription::DROPPED &&
+      tabPtr.p->m_state != Table::DROPPED)
+  {
+    jam();
+    c_restart.m_waiting_on_self = 0;
+    SubCreateReq * req = (SubCreateReq *)signal->getDataPtrSend();
+    req->senderRef        = reference();
+    req->senderData       = subPtr.i;
+    req->subscriptionId   = subPtr.p->m_subscriptionId;
+    req->subscriptionKey  = subPtr.p->m_subscriptionKey;
+    req->subscriptionType = subPtr.p->m_subscriptionType;
+    req->tableId          = subPtr.p->m_tableId;
 
-  switch(subPtr.p->m_subscriptionType) {
-  case SubCreateReq::TableEvent:
-    if (1)
+    if (subPtr.p->m_options & Subscription::REPORT_ALL)
     {
-      jam();
-      nextSubscription(signal, sumaRef);
-    } else {
-      jam();
-      SubCreateReq * req = (SubCreateReq *)signal->getDataPtrSend();
-      
-      req->senderRef        = suma.reference();
-      req->senderData       = subPtr.i;
-      req->subscriptionId   = subPtr.p->m_subscriptionId;
-      req->subscriptionKey  = subPtr.p->m_subscriptionKey;
-      req->subscriptionType = subPtr.p->m_subscriptionType |
-	SubCreateReq::RestartFlag |
-	SubCreateReq::AddTableFlag;
+      req->subscriptionType |= SubCreateReq::ReportAll;
+    }
 
-      req->tableId = 0;
+    if (subPtr.p->m_options & Subscription::REPORT_SUBSCRIBE)
+    {
+      req->subscriptionType |= SubCreateReq::ReportSubscribe;
+    }
 
-      suma.sendSignal(senderRef, GSN_SUB_CREATE_REQ, signal,
-		      SubCreateReq::SignalLength, JBB);
+    if (!ndbd_suma_dictlock(getNodeInfo(refToNode(c_restart.m_ref)).m_version))
+    {
+      jam();
+      /**
+       * Downgrade
+       *
+       * In pre suma v2, SUB_CREATE_REQ::SignalLength is one greater
+       *   but code checks length and set a default value...
+       *   so we dont need to do anything...
+       *   Thank you Ms. Fortuna
+       */
     }
-    DBUG_VOID_RETURN;
-  case SubCreateReq::SingleTableScan:
-  case SubCreateReq::SelectiveTableSnapshot:
-  case SubCreateReq::DatabaseSnapshot:
-    ndbrequire(false);
+    
+    sendSignal(c_restart.m_ref, GSN_SUB_CREATE_REQ, signal,
+               SubCreateReq::SignalLength, JBB);
+  }
+  else
+  {
+    /**
+     * No need to copy DROPPED subscription...
+     *   but this introduces a real time break
+     */
+    c_restart.m_waiting_on_self = 1;
+    SubCreateConf * conf = (SubCreateConf *)signal->getDataPtrSend();
+    conf->senderRef        = reference();
+    conf->senderData       = subPtr.i;
+    sendSignal(reference(), GSN_SUB_CREATE_CONF, signal,
+               SubCreateConf::SignalLength, JBB);
   }
-  ndbrequire(false);
 }
 
 void 
-Suma::Restart::completeSubscription(Signal* signal, Uint32 sumaRef)
+Suma::execSUB_CREATE_REF(Signal* signal)
 {
-  jam();
-  DBUG_ENTER("Suma::Restart::completeSubscription");
-  startSubscriber(signal, sumaRef);
-  DBUG_VOID_RETURN;
-}
+  jamEntry();
+
+  SubCreateRef *const ref= (SubCreateRef *)signal->getDataPtr();
+  Uint32 error= ref->errorCode;
 
-void 
-Suma::Restart::startSubscriber(Signal* signal, Uint32 sumaRef)
-{
-  jam();
-  DBUG_ENTER("Suma::Restart::startSubscriber");
-  suma.c_tables.first(c_tabIt);
-  if (c_tabIt.isNull())
-  {
-    completeSubscriber(signal, sumaRef);
-    DBUG_VOID_RETURN;
-  }
-  SubscriberPtr subbPtr;
   {
-    LocalDLList<Subscriber>
-      subbs(suma.c_subscriberPool,c_tabIt.curr.p->c_subscribers);
-    subbs.first(subbPtr);
+    SumaStartMeRef* ref= (SumaStartMeRef*)signal->getDataPtrSend();
+    ref->errorCode = error;
+    sendSignal(c_restart.m_ref, GSN_SUMA_START_ME_REF, signal,
+               SumaStartMeRef::SignalLength, JBB);
   }
-  nextSubscriber(signal, sumaRef, subbPtr);
-  DBUG_VOID_RETURN;
+
+  Ptr<Subscription> subPtr;
+  c_subscriptionPool.getPtr(subPtr, c_restart.m_subPtrI);
+  abort_start_me(signal, subPtr, true);
 }
 
 void 
-Suma::Restart::nextSubscriber(Signal* signal, Uint32 sumaRef,
-			      SubscriberPtr subbPtr)
+Suma::execSUB_CREATE_CONF(Signal* signal)
 {
-  jam();
-  DBUG_ENTER("Suma::Restart::nextSubscriber");
-  while (subbPtr.isNull())
-  {
-    jam();
-    DBUG_PRINT("info",("prev tableId %u",c_tabIt.curr.p->m_tableId));
-    suma.c_tables.next(c_tabIt);
-    if (c_tabIt.isNull())
-    {
-      completeSubscriber(signal, sumaRef);
-      DBUG_VOID_RETURN;
-    }
-    DBUG_PRINT("info",("next tableId %u",c_tabIt.curr.p->m_tableId));
-
-    LocalDLList<Subscriber>
-      subbs(suma.c_subscriberPool,c_tabIt.curr.p->c_subscribers);
-    subbs.first(subbPtr);
-  }
+  jamEntry();
 
-  /*
-   * get subscription ptr for this subscriber
+  /**
+   * We have lock...start all subscriber(s)
    */
+  Ptr<Subscription> subPtr;
+  c_subscriptionPool.getPtr(subPtr, c_restart.m_subPtrI);
 
-  SubscriptionPtr subPtr;
-  suma.c_subscriptions.getPtr(subPtr, subbPtr.p->m_subPtrI);
-  switch (subPtr.p->m_subscriptionType) {
-  case SubCreateReq::TableEvent:
+  c_restart.m_waiting_on_self = 0;
+
+  /**
+   * Check if we were aborted...
+   *  this signal is sent to self in case of DROPPED subscription...
+   */
+  if (c_restart.m_abort)
+  {
     jam();
-    sendSubStartReq(subPtr, subbPtr, signal, sumaRef);
-    DBUG_VOID_RETURN;
-  case SubCreateReq::SelectiveTableSnapshot:
-  case SubCreateReq::DatabaseSnapshot:
-  case SubCreateReq::SingleTableScan:
-    ndbrequire(false);
+    abort_start_me(signal, subPtr, true);
+    return;
   }
-  ndbrequire(false);
+
+  Ptr<Subscriber> ptr;
+  if (subPtr.p->m_state != Subscription::DROPPED)
+  {
+    LocalDLList<Subscriber> list(c_subscriberPool, subPtr.p->m_subscribers);
+    list.first(ptr);
+  }
+  else
+  {
+    ptr.setNull();
+  }
+
+  copySubscriber(signal, subPtr, ptr);
 }
 
 void
-Suma::Restart::sendSubStartReq(SubscriptionPtr subPtr, SubscriberPtr subbPtr,
-			       Signal* signal, Uint32 sumaRef)
+Suma::copySubscriber(Signal* signal,
+                     Ptr<Subscription> subPtr,
+                     Ptr<Subscriber> ptr)
 {
-  jam();
-  DBUG_ENTER("Suma::Restart::sendSubStartReq");
-  SubStartReq * req = (SubStartReq *)signal->getDataPtrSend();
+  if (!ptr.isNull())
+  {
+    jam();
 
-  req->senderRef        = suma.reference();
-  req->senderData       = subbPtr.i;
-  req->subscriptionId   = subPtr.p->m_subscriptionId;
-  req->subscriptionKey  = subPtr.p->m_subscriptionKey;
-  req->part             = SubscriptionData::TableData;
-  req->subscriberData   = subbPtr.p->m_senderData;
-  req->subscriberRef    = subbPtr.p->m_senderRef;
-
-  // restarting suma will not respond to this until startphase 5
-  // since it is not until then data copying has been completed
-  DBUG_PRINT("info",("Restarting subscriber: %u on key: [%u,%u] %u",
-		     subbPtr.i,
-		     subPtr.p->m_subscriptionId,
-		     subPtr.p->m_subscriptionKey,
-		     subPtr.p->m_tableId));
+    SubStartReq* req = (SubStartReq*)signal->getDataPtrSend();
+    req->senderRef        = reference();
+    req->senderData       = ptr.i;
+    req->subscriptionId   = subPtr.p->m_subscriptionId;
+    req->subscriptionKey  = subPtr.p->m_subscriptionKey;
+    req->part             = SubscriptionData::TableData;
+    req->subscriberData   = ptr.p->m_senderData;
+    req->subscriberRef    = ptr.p->m_senderRef;
 
-  suma.sendSignal(sumaRef, GSN_SUB_START_REQ,
-		  signal, SubStartReq::SignalLength2, JBB);
-  DBUG_VOID_RETURN;
+    sendSignal(c_restart.m_ref, GSN_SUB_START_REQ,
+               signal, SubStartReq::SignalLength, JBB);
+    return;
+  }
+  else
+  {
+    // remove lock from this subscription
+    Ptr<SubOpRecord> subOpPtr;
+    c_subOpPool.getPtr(subOpPtr, c_restart.m_subOpPtrI);
+    check_remove_queue(signal, subPtr, subOpPtr, true, false);
+    check_release_subscription(signal, subPtr);
+
+    DLHashTable<Subscription>::Iterator it;
+    it.curr = subPtr;
+    it.bucket = c_restart.m_bucket;
+    c_subscriptions.next(it);
+    copySubscription(signal, it);
+  }
 }
 
 void 
-Suma::Restart::runSUB_START_CONF(Signal* signal)
+Suma::execSUB_START_CONF(Signal* signal)
 {
-  jam();
-  DBUG_ENTER("Suma::Restart::runSUB_START_CONF");
+  jamEntry();
 
   SubStartConf * const conf = (SubStartConf*)signal->getDataPtr();
 
-  Subscription key;
-  SubscriptionPtr subPtr;
-  key.m_subscriptionId  = conf->subscriptionId;
-  key.m_subscriptionKey = conf->subscriptionKey;
-  ndbrequire(suma.c_subscriptions.find(subPtr, key));
+  Ptr<Subscription> subPtr;
+  c_subscriptionPool.getPtr(subPtr, c_restart.m_subPtrI);
 
-  TablePtr tabPtr;
-  ndbrequire(suma.c_tables.find(tabPtr, subPtr.p->m_tableId));
+  Ptr<Subscriber> ptr;
+  c_subscriberPool.getPtr(ptr, conf->senderData);
+
+  LocalDLList<Subscriber> list(c_subscriberPool, subPtr.p->m_subscribers);
+  list.next(ptr);
+  copySubscriber(signal, subPtr, ptr);
+}
+
+void
+Suma::execSUB_START_REF(Signal* signal)
+{
+  jamEntry();
+
+  SubStartRef * sig = (SubStartRef*)signal->getDataPtr();
+  Uint32 errorCode = sig->errorCode;
 
-  SubscriberPtr subbPtr;
   {
-    LocalDLList<Subscriber>
-      subbs(suma.c_subscriberPool,tabPtr.p->c_subscribers);
-    subbs.getPtr(subbPtr, conf->senderData);
-    DBUG_PRINT("info",("Restarted subscriber: %u on key: [%u,%u] table: %u",
-		       subbPtr.i,key.m_subscriptionId,key.m_subscriptionKey,
-		       subPtr.p->m_tableId));
-    subbs.next(subbPtr);
+    SumaStartMeRef* ref= (SumaStartMeRef*)signal->getDataPtrSend();
+    ref->errorCode = errorCode;
+    sendSignal(c_restart.m_ref, GSN_SUMA_START_ME_REF, signal,
+               SumaStartMeRef::SignalLength, JBB);
   }
 
-  Uint32 sumaRef = signal->getSendersBlockRef();
-  nextSubscriber(signal, sumaRef, subbPtr);
-
-  DBUG_VOID_RETURN;
-}
+  Ptr<Subscription> subPtr;
+  c_subscriptionPool.getPtr(subPtr, c_restart.m_subPtrI);
 
-void 
-Suma::Restart::completeSubscriber(Signal* signal, Uint32 sumaRef)
-{
-  DBUG_ENTER("Suma::Restart::completeSubscriber");
-  completeRestartingNode(signal, sumaRef);
-  DBUG_VOID_RETURN;
+  abort_start_me(signal, subPtr, true);
 }
 
 void
-Suma::Restart::completeRestartingNode(Signal* signal, Uint32 sumaRef)
+Suma::abort_start_me(Signal* signal, Ptr<Subscription> subPtr,
+                     bool lockowner)
 {
-  jam();
-  DBUG_ENTER("Suma::Restart::completeRestartingNode");
-  //SumaStartMeConf *conf= (SumaStartMeConf*)signal->getDataPtrSend();
-  suma.sendSignal(sumaRef, GSN_SUMA_START_ME_CONF, signal,
-		  SumaStartMeConf::SignalLength, JBB);
-  resetRestart(signal);
-  DBUG_VOID_RETURN;
-}
+  Ptr<SubOpRecord> subOpPtr;
+  c_subOpPool.getPtr(subOpPtr, c_restart.m_subOpPtrI);
+  check_remove_queue(signal, subPtr, subOpPtr, lockowner, true);
+  check_release_subscription(signal, subPtr);
 
-void
-Suma::Restart::resetRestart(Signal* signal)
-{
-  jam();
-  DBUG_ENTER("Suma::Restart::resetRestart");
-  nodeId = 0;
-  DBUG_VOID_RETURN;
+  c_restart.m_ref = 0;
 }
 
-// only run on restarting suma
-
 void
 Suma::execSUMA_HANDOVER_REQ(Signal* signal)
 {
@@ -5162,9 +5063,6 @@ Suma::resend_bucket(Signal* signal, Uint
   {
     free_page(tail, page);
     tail = bucket->m_buffer_tail = next_page;
-    ndbout_c("pos==0 && min_gci(%u/%u) > max_gci(%u/%u) resend switching page to %d", 
-	     Uint32(min_gci >> 32), Uint32(min_gci), 
-	     Uint32(max_gci >> 32), Uint32(max_gci), tail);
     goto next;
   }
   
@@ -5228,8 +5126,11 @@ Suma::resend_bucket(Signal* signal, Uint
   
       char buf[255];
       c_subscriber_nodes.getText(buf);
-      ndbout_c("resending GCI: %u/%u rows: %d -> %s", 
-	       Uint32(last_gci >> 32), Uint32(last_gci), g_cnt, buf);
+      if (g_cnt)
+      {      
+        ndbout_c("resending GCI: %u/%u rows: %d -> %s", 
+                 Uint32(last_gci >> 32), Uint32(last_gci), g_cnt, buf);
+      }
       g_cnt = 0;
       
       NodeReceiverGroup rg(API_CLUSTERMGR, c_subscriber_nodes);
@@ -5240,7 +5141,7 @@ Suma::resend_bucket(Signal* signal, Uint
     {
       const uint buffer_header_sz = 4;
       g_cnt++;
-      Uint32 table = * src++ ;
+      Uint32 subPtrI = * src++ ;
       Uint32 schemaVersion = * src++;
       Uint32 event = * src >> 16;
       Uint32 sz_1 = (* src ++) & 0xFFFF;
@@ -5259,9 +5160,12 @@ Suma::resend_bucket(Signal* signal, Uint
       /**
        * Signal to subscriber(s)
        */
+      Ptr<Subscription> subPtr;
+      c_subscriptionPool.getPtr(subPtr, subPtrI);
       Ptr<Table> tabPtr;
-      if (c_tables.find(tabPtr, table) && 
-          table_version_major(tabPtr.p->m_schemaVersion) ==
+      c_tablePool.getPtr(tabPtr, subPtr.p->m_table_ptrI);
+      Uint32 table = subPtr.p->m_tableId;
+      if (table_version_major(tabPtr.p->m_schemaVersion) ==
           table_version_major(schemaVersion))
       {
 	SubTableData * data = (SubTableData*)signal->getDataPtrSend();//trg;
@@ -5275,18 +5179,16 @@ Suma::resend_bucket(Signal* signal, Uint
 	data->totalLen       = ptrLen;
 	
 	{
-	  LocalDLList<Subscriber> 
-	    list(c_subscriberPool,tabPtr.p->c_subscribers);
-	  SubscriberPtr subbPtr;
-	  for(list.first(subbPtr); !subbPtr.isNull(); list.next(subbPtr))
-	  {
-	    DBUG_PRINT("info",("GSN_SUB_TABLE_DATA to node %d",
-			       refToNode(subbPtr.p->m_senderRef)));
-	    data->senderData = subbPtr.p->m_senderData;
-	    sendSignal(subbPtr.p->m_senderRef, GSN_SUB_TABLE_DATA, signal,
-		       SubTableData::SignalLength, JBB, ptr, nptr);
-	  }
-	}
+          LocalDLList<Subscriber> list(c_subscriberPool,
+                                       subPtr.p->m_subscribers);
+          SubscriberPtr subbPtr;
+          for(list.first(subbPtr); !subbPtr.isNull(); list.next(subbPtr))
+          {
+            data->senderData = subbPtr.p->m_senderData;
+            sendSignal(subbPtr.p->m_senderRef, GSN_SUB_TABLE_DATA, signal,
+                       SubTableData::SignalLength, JBB, ptr, nptr);
+          }
+        }
       }
     }
     
@@ -5302,7 +5204,6 @@ Suma::resend_bucket(Signal* signal, Uint
     tail = bucket->m_buffer_tail = next_page;
     pos = 0;
     last_gci = 0;
-    ndbout_c("ptr == end -> resend switching page to %d", tail);
   }
   else
   {
diff -Nrup a/storage/ndb/src/kernel/blocks/suma/Suma.hpp b/storage/ndb/src/kernel/blocks/suma/Suma.hpp
--- a/storage/ndb/src/kernel/blocks/suma/Suma.hpp	2008-02-11 14:24:16 +01:00
+++ b/storage/ndb/src/kernel/blocks/suma/Suma.hpp	2008-02-20 10:04:24 +01:00
@@ -48,20 +48,13 @@ public:
   
   void execSUB_START_REQ(Signal* signal);
   void execSUB_STOP_REQ(Signal* signal);
-  
+
   void execSUB_SYNC_REQ(Signal* signal);
   void execSUB_ABORT_SYNC_REQ(Signal* signal);
 
-  void execSUB_STOP_CONF(Signal* signal);
-  void execSUB_STOP_REF(Signal* signal);
-
  /**
    * Dict interface
    */
-#if 0
-  void execLIST_TABLES_REF(Signal* signal);
-  void execLIST_TABLES_CONF(Signal* signal);
-#endif
   void execGET_TABINFOREF(Signal* signal);
   void execGET_TABINFO_CONF(Signal* signal);
 
@@ -71,6 +64,10 @@ public:
   void execDROP_TAB_CONF(Signal* signal);
   void execALTER_TAB_REQ(Signal* signal);
   void execCREATE_TAB_CONF(Signal* signal);
+
+  void execDICT_LOCK_REF(Signal*);
+  void execDICT_LOCK_CONF(Signal*);
+
   /**
    * Scan interface
    */
@@ -137,62 +134,12 @@ public:
   struct Subscriber {
     Uint32 m_senderRef;
     Uint32 m_senderData;
-    Uint32 m_subPtrI; //reference to subscription
     Uint32 nextList;
 
     union { Uint32 nextPool; Uint32 prevList; };
   };
   typedef Ptr<Subscriber> SubscriberPtr;
 
-  /**
-   * Subscriptions
-   */
-
-  struct Subscription {
-    Subscription() {}
-    Uint32 m_senderRef;
-    Uint32 m_senderData;
-    Uint32 m_subscriptionId;
-    Uint32 m_subscriptionKey;
-    Uint32 m_subscriptionType;
-    Uint16 m_options;
-
-    enum Options {
-      REPORT_ALL       = 0x1,
-      REPORT_SUBSCRIBE = 0x2
-    };
-
-    enum State {
-      UNDEFINED,
-      LOCKED,
-      DEFINED,
-      DROPPED
-    };
-    State m_state;
-    Uint32 n_subscribers;
-
-    Uint32 nextHash;
-    union { Uint32 prevHash; Uint32 nextPool; };
-
-    Uint32 hashValue() const {
-      return m_subscriptionId + m_subscriptionKey;
-    }
-
-    bool equal(const Subscription & s) const {
-      return 
-	m_subscriptionId == s.m_subscriptionId && 
-	m_subscriptionKey == s.m_subscriptionKey;
-    }
-    /**
-     * The following holds the tables included 
-     * in the subscription.
-     */
-    Uint32 m_tableId;
-    Uint32 m_table_ptrI;
-    Uint32 m_current_sync_ptrI;
-  };
-  typedef Ptr<Subscription> SubscriptionPtr;
-
   class Table;
   friend class Table;
   typedef Ptr<Table> TablePtr;
@@ -245,65 +192,119 @@ public:
   };
   friend struct SyncRecord;
 
-  int initTable(Signal *signal,Uint32 tableId, TablePtr &tabPtr,
-		Ptr<SyncRecord> syncPtr);
-  int initTable(Signal *signal,Uint32 tableId, TablePtr &tabPtr,
-		SubscriberPtr subbPtr);
-  int initTable(Signal *signal,Uint32 tableId, TablePtr &tabPtr);
-  
-  int completeOneSubscriber(Signal* signal, TablePtr tabPtr, SubscriberPtr subbPtr);
-  void completeAllSubscribers(Signal* signal, TablePtr tabPtr);
-  void completeInitTable(Signal* signal, TablePtr tabPtr);
+  struct SubOpRecord
+  {
+    enum OpType
+    {
+      R_SUB_START_REQ,
+      R_SUB_STOP_REQ,
+      R_START_ME_REQ,
+      R_API_FAIL_REQ,
+      R_SUB_ABORT_START_REQ,
+    };
+
+    Uint32 m_opType;
+    Uint32 m_subPtrI;
+    Uint32 m_senderRef;
+    Uint32 m_senderData;
+    Uint32 m_subscriberRef;
+    Uint32 m_subscriberData;
+
+    Uint32 nextList;
+    union {
+      Uint32 prevList;
+      Uint32 nextPool;
+    };
+  };
+  friend struct SubOpRecord;
+
+  struct Subscription
+  {
+    Uint32 m_seq_no;
+    Uint32 m_subscriptionId;
+    Uint32 m_subscriptionKey;
+    Uint32 m_subscriptionType;
+    Uint16 m_options;
+
+    enum Options {
+      REPORT_ALL       = 0x1,
+      REPORT_SUBSCRIBE = 0x2
+    };
+
+    enum State {
+      UNDEFINED,
+      DEFINED,
+      DROPPED,
+      DEFINING
+    };
+
+    enum TriggerState {
+      T_UNDEFINED,
+      T_CREATING,
+      T_DEFINED,
+      T_DROPPING,
+      T_ERROR
+    };
+
+    State m_state;
+    TriggerState m_trigger_state;
+
+    DLList<Subscriber>::Head m_subscribers;
+    DLFifoList<SubOpRecord>::Head m_create_req;
+    DLFifoList<SubOpRecord>::Head m_start_req;
+    DLFifoList<SubOpRecord>::Head m_stop_req;
+    DLList<SyncRecord>::Head m_syncRecords;
+    
+    Uint32 m_errorCode;
+    Uint32 m_outstanding_trigger;
+    Uint32 m_triggers[3];
+
+    Uint32 nextList, prevList;
+    Uint32 nextHash;
+    union { Uint32 prevHash; Uint32 nextPool; };
+
+    Uint32 hashValue() const {
+      return m_subscriptionId + m_subscriptionKey;
+    }
+
+    bool equal(const Subscription & s) const {
+      return
+	m_subscriptionId == s.m_subscriptionId &&
+	m_subscriptionKey == s.m_subscriptionKey;
+    }
+    /**
+     * The following holds the tables included
+     * in the subscription.
+     */
+    Uint32 m_tableId;
+    Uint32 m_table_ptrI;
+  };
+  typedef Ptr<Subscription> SubscriptionPtr;
 
   struct Table {
-    Table() { m_tableId = ~0; n_subscribers = 0; }
+    Table() { m_tableId = ~0; }
     void release(Suma&);
-    void checkRelease(Suma &suma);
 
-    DLList<Subscriber>::Head c_subscribers;
-    DLList<SyncRecord>::Head c_syncRecords;
+    DLList<Subscription>::Head m_subscriptions;
 
     enum State {
       UNDEFINED,
       DEFINING,
       DEFINED,
-      DROPPED,
-      ALTERED
+      DROPPED
     };
     State m_state;
 
     Uint32 m_ptrI;
-    SubscriberPtr m_drop_subbPtr;
-
-    Uint32 n_subscribers;
-    bool m_reportAll;
 
     bool parseTable(SegmentedSectionPtr ptr, Suma &suma);
     /**
      * Create triggers
      */
-    int setupTrigger(Signal* signal, Suma &suma);
-    void completeTrigger(Signal* signal);
     void createAttributeMask(AttributeMask&, Suma &suma);
     
-    /**
-     * Drop triggers
-     */
-    void dropTrigger(Signal* signal,Suma&);
-    void runDropTrigger(Signal* signal, Uint32 triggerId,Suma&);
-
-    /**
-     * Sync meta
-     */    
-#if 0
-    void runLIST_TABLES_CONF(Signal* signal);
-#endif
-    
     union { Uint32 m_tableId; Uint32 key; };
     Uint32 m_schemaVersion;
-    Uint8  m_hasTriggerDefined[3]; // Insert/Update/Delete
-    Uint8  m_hasOutstandingTriggerReq[3]; // Insert/Update/Delete
-    Uint32 m_triggerIds[3]; // Insert/Update/Delete
 
     Uint32 m_error;
     
@@ -312,9 +313,9 @@ public:
      */
     Uint32 m_fragCount;
     DataBuffer<15>::Head m_fragments;  // Fragment descriptors
-
-    Uint32 m_noOfAttributes;
     
+    Uint32 m_noOfAttributes;
+
     /**
      * Hash table stuff
      */
@@ -331,8 +332,6 @@ public:
   /**
    * 
    */
-  DLList<Subscriber> c_metaSubscribers;
-  DLList<Subscriber> c_removeDataSubscribers;
 
   /**
    * Lists
@@ -348,6 +347,7 @@ public:
   ArrayPool<Subscription> c_subscriptionPool;
   ArrayPool<SyncRecord> c_syncPool;
   DataBuffer<15>::DataBufferPool c_dataBufferPool;
+  ArrayPool<SubOpRecord> c_subOpPool;
 
   Uint32 c_maxBufferedGcp;
 
@@ -358,37 +358,44 @@ public:
    */
   bool removeSubscribersOnNode(Signal *signal, Uint32 nodeId);
 
-  bool checkTableTriggers(SegmentedSectionPtr ptr);
+  void sendSubIdRef(Signal* signal,Uint32 senderRef,Uint32 senderData,Uint32 errorCode);
 
-  void addTableId(Uint32 TableId,
-		  SubscriptionPtr subPtr, SyncRecord *psyncRec);
+  void sendSubCreateRef(Signal* signal, Uint32 ref, Uint32 data, Uint32 error);
+  void sendSubStartRef(Signal* signal, Uint32 ref, Uint32 data, Uint32 error);
+  void sendSubStopRef(Signal* signal, Uint32 ref, Uint32 data, Uint32 error);
+  void report_sub_stop_conf(Signal* signal,
+                            Ptr<SubOpRecord> subOpPtr,
+                            Ptr<Subscriber> ptr,
+                            bool report,
+                            LocalDLList<Subscriber>& list);
 
-  void sendSubIdRef(Signal* signal,Uint32 senderRef,Uint32 senderData,Uint32 errorCode);
-  void sendSubCreateRef(Signal* signal, Uint32 errorCode);
-  void sendSubStartRef(Signal*, SubscriberPtr, Uint32 errorCode, SubscriptionData::Part);
-  void sendSubStartRef(Signal* signal, Uint32 errorCode);
-  void sendSubStopRef(Signal* signal, Uint32 errorCode);
   void sendSubSyncRef(Signal* signal, Uint32 errorCode);  
   void sendSubRemoveRef(Signal* signal, const SubRemoveReq& ref,
 			Uint32 errorCode);
-  void sendSubStartComplete(Signal*, SubscriberPtr, Uint64 gci,
-			    SubscriptionData::Part);
-  void sendSubStopComplete(Signal*, SubscriberPtr);
   void sendSubStopReq(Signal* signal, bool unlock= false);
 
   void completeSubRemove(SubscriptionPtr subPtr);
+  
 
-  void reportAllSubscribers(Signal *signal,
-                            NdbDictionary::Event::_TableEvent table_event,
-                            SubscriptionPtr subPtr,
-                            SubscriberPtr subbPtr);
-
+  void send_sub_start_stop_event(Signal *signal,
+                                 Ptr<Subscriber> ptr,
+                                 NdbDictionary::Event::_TableEvent event,
+                                 bool report,
+                                 LocalDLList<Subscriber>& list);
+  
   Uint32 getFirstGCI(Signal* signal);
 
-  /**
-   * Table admin
-   */
-  void convertNameToId( SubscriptionPtr subPtr, Signal * signal);
+  void create_triggers(Signal*, Ptr<Subscription>);
+  void drop_triggers(Signal*, Ptr<Subscription>);
+  void drop_triggers_complete(Signal*, Ptr<Subscription>);
+
+  void report_sub_start_conf(Signal* signal, Ptr<Subscription> subPtr);
+  void report_sub_start_ref(Signal* signal, Ptr<Subscription> subPtr, Uint32);
+
+  void sub_stop_req(Signal*);
+  void check_remove_queue(Signal*, Ptr<Subscription>,
+                          Ptr<SubOpRecord>,bool,bool);
+  void check_release_subscription(Signal* signal, Ptr<Subscription>);
 
   /**
    * Public interface
@@ -421,6 +428,10 @@ public:
   void execAPI_START_REP(Signal* signal);
   void execAPI_FAILREQ(Signal* signal) ;
 
+  void api_fail_gci_list(Signal*, Uint32 node);
+  void api_fail_subscriber_list(Signal*, Uint32 node);
+  void api_fail_subscription(Signal*);
+
   void execSUB_GCP_COMPLETE_ACK(Signal* signal);
 
   /**
@@ -441,6 +452,12 @@ public:
   void execSUMA_START_ME_REQ(Signal* signal);
   void execSUMA_START_ME_REF(Signal* signal);
   void execSUMA_START_ME_CONF(Signal* signal);
+
+  void copySubscription(Signal* signal, DLHashTable<Subscription>::Iterator);
+  void sendSubCreateReq(Signal* signal, Ptr<Subscription>);
+  void copySubscriber(Signal*, Ptr<Subscription>, Ptr<Subscriber>);
+  void abort_start_me(Signal*, Ptr<Subscription>, bool lockowner);
+
   void execSUMA_HANDOVER_REQ(Signal* signal);
   void execSUMA_HANDOVER_REF(Signal* signal);
   void execSUMA_HANDOVER_CONF(Signal* signal);
@@ -460,44 +477,10 @@ public:
    * for Suma that is restarting another
    */
 
-  struct Restart {
-    Restart(Suma& s);
-
-    Suma & suma;
-    Uint32 nodeId;
-
-    DLHashTable<Subscription>::Iterator c_subIt;
-    KeyTable<Table>::Iterator c_tabIt;
-
-    void progError(int line, int cause, const char * extra) { 
-      suma.progError(line, cause, extra); 
-    }
-
-    void resetNode(Uint32 sumaRef);
-    void runSUMA_START_ME_REQ(Signal*, Uint32 sumaRef);
-    void startNode(Signal*, Uint32 sumaRef);
-
-    void createSubscription(Signal* signal, Uint32 sumaRef);
-    void nextSubscription(Signal* signal, Uint32 sumaRef);
-    void runSUB_CREATE_CONF(Signal* signal);
-    void completeSubscription(Signal* signal, Uint32 sumaRef);
-
-    void startSubscriber(Signal* signal, Uint32 sumaRef);
-    void nextSubscriber(Signal* signal, Uint32 sumaRef, SubscriberPtr subbPtr);
-    void sendSubStartReq(SubscriptionPtr subPtr, SubscriberPtr subbPtr,
-			 Signal* signal, Uint32 sumaRef);
-    void runSUB_START_CONF(Signal* signal);
-    void completeSubscriber(Signal* signal, Uint32 sumaRef);
-
-    void completeRestartingNode(Signal* signal, Uint32 sumaRef);
-    void resetRestart(Signal* signal);
-  } Restart;
-
   // for LQH transporter overload check
   const NodeBitmask& getSubscriberNodes() const { return c_subscriber_nodes; }
 
 private:
-  friend class Restart;
   /**
    * Variables
    */
@@ -513,6 +496,19 @@ private:
     Uint32 m_restart_server_node_id;
     NdbNodeBitmask m_handover_nodes;
   } c_startup;
+
+  struct Restart
+  {
+    Uint16 m_abort;
+    Uint16 m_waiting_on_self;
+    Uint32 m_ref;
+    Uint32 m_max_seq;
+    Uint32 m_subPtrI;
+    Uint32 m_subOpPtrI;
+    Uint32 m_bucket; // In c_subscribers hashtable
+  } c_restart;
+
+  Uint32 c_current_seq; // Sequence no on subscription(s)
   
   NodeBitmask c_connected_nodes;  // (NODE/API) START REP / (API/NODE) FAIL REQ
   NodeBitmask c_subscriber_nodes; // 
@@ -525,6 +521,7 @@ private:
   Uint32 c_nodesInGroup[MAX_REPLICAS];
   NdbNodeBitmask c_nodes_in_nodegroup_mask;  // NodeId's of nodes in nodegroup
 
+  void send_dict_lock_req(Signal* signal);
   void send_start_me_req(Signal* signal);
   void check_start_handover(Signal* signal);
   void send_handover_req(Signal* signal);
diff -Nrup a/storage/ndb/src/kernel/blocks/suma/SumaInit.cpp b/storage/ndb/src/kernel/blocks/suma/SumaInit.cpp
--- a/storage/ndb/src/kernel/blocks/suma/SumaInit.cpp	2008-02-11 15:05:02 +01:00
+++ b/storage/ndb/src/kernel/blocks/suma/SumaInit.cpp	2008-02-20 10:04:24 +01:00
@@ -20,11 +20,8 @@
 
 Suma::Suma(Block_context& ctx) :
   SimulatedBlock(SUMA, ctx),
-  c_metaSubscribers(c_subscriberPool),
-  c_removeDataSubscribers(c_subscriberPool),
   c_tables(c_tablePool),
   c_subscriptions(c_subscriptionPool),
-  Restart(*this),
   c_gcp_list(c_gcp_pool)
 {
   BLOCK_CONSTRUCTOR(Suma);
@@ -69,8 +66,6 @@ Suma::Suma(Block_context& ctx) :
   addRecSignal(GSN_SUB_REMOVE_REQ, &Suma::execSUB_REMOVE_REQ);
   addRecSignal(GSN_SUB_START_REQ, &Suma::execSUB_START_REQ);
   addRecSignal(GSN_SUB_STOP_REQ, &Suma::execSUB_STOP_REQ);
-  addRecSignal(GSN_SUB_STOP_REF, &Suma::execSUB_STOP_REF);
-  addRecSignal(GSN_SUB_STOP_CONF, &Suma::execSUB_STOP_CONF);
   addRecSignal(GSN_SUB_SYNC_REQ, &Suma::execSUB_SYNC_REQ);
 
   /**
@@ -80,15 +75,12 @@ Suma::Suma(Block_context& ctx) :
   addRecSignal(GSN_ALTER_TAB_REQ, &Suma::execALTER_TAB_REQ);
   addRecSignal(GSN_CREATE_TAB_CONF, &Suma::execCREATE_TAB_CONF);
 
-#if 0
-  addRecSignal(GSN_LIST_TABLES_CONF, &Suma::execLIST_TABLES_CONF);
-#endif
   addRecSignal(GSN_GET_TABINFO_CONF, &Suma::execGET_TABINFO_CONF);
   addRecSignal(GSN_GET_TABINFOREF, &Suma::execGET_TABINFOREF);
-#if 0
-  addRecSignal(GSN_GET_TABLEID_CONF, &Suma::execGET_TABLEID_CONF);
-  addRecSignal(GSN_GET_TABLEID_REF, &Suma::execGET_TABLEID_REF);
-#endif
+
+  addRecSignal(GSN_DICT_LOCK_REF, &Suma::execDICT_LOCK_REF);
+  addRecSignal(GSN_DICT_LOCK_CONF, &Suma::execDICT_LOCK_CONF);
+
   /**
    * Dih interface
    */
@@ -124,6 +116,8 @@ Suma::Suma(Block_context& ctx) :
   addRecSignal(GSN_SUB_GCP_COMPLETE_REP, 
 	       &Suma::execSUB_GCP_COMPLETE_REP);
 
+  c_current_seq = 0;
+  c_restart.m_ref = 0;
   c_startup.m_restart_server_node_id = RNIL; // Server for my NR
 
 #ifdef VM_TRACE
diff -Nrup a/storage/ndb/src/kernel/blocks/trix/Trix.cpp b/storage/ndb/src/kernel/blocks/trix/Trix.cpp
--- a/storage/ndb/src/kernel/blocks/trix/Trix.cpp	2007-01-06 01:21:24 +01:00
+++ b/storage/ndb/src/kernel/blocks/trix/Trix.cpp	2008-02-20 10:04:24 +01:00
@@ -683,6 +683,7 @@ void Trix::execSUB_SYNC_CONTINUE_REQ(Sig
   }
   subRecPtr.p = subRec;
   subRec->pendingSubSyncContinueConf = true;
+  subRec->syncPtr = subSyncContinueReq->senderData;
   checkParallelism(signal, subRec);
 }
 
@@ -985,6 +986,7 @@ void Trix::checkParallelism(Signal* sign
       (SubSyncContinueConf *) signal->getDataPtrSend();
     subSyncContinueConf->subscriptionId = subRec->subscriptionId;
     subSyncContinueConf->subscriptionKey = subRec->subscriptionKey;
+    subSyncContinueConf->senderData = subRec->syncPtr;
     sendSignal(SUMA_REF, GSN_SUB_SYNC_CONTINUE_CONF, signal, 
 	       SubSyncContinueConf::SignalLength , JBB);  
     subRec->pendingSubSyncContinueConf = false;
diff -Nrup a/storage/ndb/src/kernel/blocks/trix/Trix.hpp b/storage/ndb/src/kernel/blocks/trix/Trix.hpp
--- a/storage/ndb/src/kernel/blocks/trix/Trix.hpp	2006-12-23 20:20:18 +01:00
+++ b/storage/ndb/src/kernel/blocks/trix/Trix.hpp	2008-02-20 10:04:24 +01:00
@@ -113,6 +113,7 @@ private:
     Uint32 noOfIndexColumns;
     Uint32 noOfKeyColumns;
     Uint32 parallelism;
+    Uint32 syncPtr;
     BuildIndxRef::ErrorCode errorCode;
     bool subscriptionCreated;
     bool pendingSubSyncContinueConf;
diff -Nrup a/storage/ndb/src/kernel/vm/RequestTracker.hpp b/storage/ndb/src/kernel/vm/RequestTracker.hpp
--- a/storage/ndb/src/kernel/vm/RequestTracker.hpp	2007-01-06 01:21:24 +01:00
+++ b/storage/ndb/src/kernel/vm/RequestTracker.hpp	2008-02-20 10:04:24 +01:00
@@ -48,9 +48,10 @@ public:
 
   bool done() { return m_sc.done(); }
 
+  NdbNodeBitmask m_confs;
+
 private:
   SafeCounterHandle m_sc;
-  NdbNodeBitmask m_confs;
   Uint8 m_nRefs;
 };
 
diff -Nrup a/storage/ndb/src/kernel/vm/SafeCounter.cpp b/storage/ndb/src/kernel/vm/SafeCounter.cpp
--- a/storage/ndb/src/kernel/vm/SafeCounter.cpp	2007-01-06 01:21:24 +01:00
+++ b/storage/ndb/src/kernel/vm/SafeCounter.cpp	2008-02-20 10:04:24 +01:00
@@ -33,6 +33,11 @@ SafeCounterManager::getSize() const {
   return m_counterPool.getSize();
 }
 
+Uint32
+SafeCounterManager::getNoOfFree() const {
+  return m_counterPool.getNoOfFree();
+}
+
 bool
 SafeCounterManager::seize(ActiveCounterPtr& ptr){
   return m_activeCounters.seize(ptr);
diff -Nrup a/storage/ndb/src/kernel/vm/SafeCounter.hpp b/storage/ndb/src/kernel/vm/SafeCounter.hpp
--- a/storage/ndb/src/kernel/vm/SafeCounter.hpp	2007-01-06 01:21:24 +01:00
+++ b/storage/ndb/src/kernel/vm/SafeCounter.hpp	2008-02-20 10:04:24 +01:00
@@ -64,6 +64,7 @@ public:
   
   bool setSize(Uint32 maxNoOfActiveMutexes, bool exit_on_error = true);
   Uint32 getSize() const ;
+  Uint32 getNoOfFree() const;
 
   void execNODE_FAILREP(Signal*); 
   void printNODE_FAILREP(); 
diff -Nrup a/storage/ndb/src/kernel/vm/SimulatedBlock.cpp b/storage/ndb/src/kernel/vm/SimulatedBlock.cpp
--- a/storage/ndb/src/kernel/vm/SimulatedBlock.cpp	2007-12-13 12:03:55 +01:00
+++ b/storage/ndb/src/kernel/vm/SimulatedBlock.cpp	2008-02-20 10:04:24 +01:00
@@ -68,39 +68,6 @@ SimulatedBlock::SimulatedBlock(BlockNumb
   c_fragmentIdCounter = 1;
   c_fragSenderRunning = false;
   
-  Properties tmp;
-  const Properties * p = &tmp;
-  ndbrequire(p != 0);
-
-  Uint32 count = 10;
-  char buf[255];
-
-  count = 10;
-  BaseString::snprintf(buf, 255, "%s.FragmentSendPool", getBlockName(blockNumber));
-  if(!p->get(buf, &count))
-    p->get("FragmentSendPool", &count);
-  c_fragmentSendPool.setSize(count);
-
-  count = 10;
-  BaseString::snprintf(buf, 255, "%s.FragmentInfoPool", getBlockName(blockNumber));
-  if(!p->get(buf, &count))
-    p->get("FragmentInfoPool", &count);
-  c_fragmentInfoPool.setSize(count);
-
-  count = 10;
-  BaseString::snprintf(buf, 255, "%s.FragmentInfoHash", getBlockName(blockNumber));
-  if(!p->get(buf, &count))
-    p->get("FragmentInfoHash", &count);
-  c_fragmentInfoHash.setSize(count);
-
-  count = 5;
-  BaseString::snprintf(buf, 255, "%s.ActiveMutexes", getBlockName(blockNumber));
-  if(!p->get(buf, &count))
-    p->get("ActiveMutexes", &count);
-  c_mutexMgr.setSize(count);
-  
-  c_counterMgr.setSize(5);
-  
 #ifdef VM_TRACE_TIME
   clearTimes();
 #endif
@@ -117,6 +84,30 @@ SimulatedBlock::SimulatedBlock(BlockNumb
   m_global_variables = new Ptr<void> * [1];
   m_global_variables[0] = 0;
 #endif
+}
+
+void
+SimulatedBlock::initCommon()
+{
+  Uint32 count = 10;
+  this->getParam("FragmentSendPool", &count);
+  c_fragmentSendPool.setSize(count);
+
+  count = 10;
+  this->getParam("FragmentInfoPool", &count);
+  c_fragmentInfoPool.setSize(count);
+
+  count = 10;
+  this->getParam("FragmentInfoHash", &count);
+  c_fragmentInfoHash.setSize(count);
+
+  count = 5;
+  this->getParam("ActiveMutexes", &count);
+  c_mutexMgr.setSize(count);
+  
+  count = 5;
+  this->getParam("ActiveCounters", &count);
+  c_counterMgr.setSize(count);
 }
 
 SimulatedBlock::~SimulatedBlock()
diff -Nrup a/storage/ndb/src/kernel/vm/SimulatedBlock.hpp b/storage/ndb/src/kernel/vm/SimulatedBlock.hpp
--- a/storage/ndb/src/kernel/vm/SimulatedBlock.hpp	2007-07-11 14:38:00 +02:00
+++ b/storage/ndb/src/kernel/vm/SimulatedBlock.hpp	2008-02-20 10:04:24 +01:00
@@ -114,6 +114,7 @@ protected:
   void addRecSignalImpl(GlobalSignalNumber g, ExecFunction fun, bool f =false);
   void installSimulatedBlockFunctions();
   ExecFunction theExecArray[MAX_GSN+1];
+  void initCommon();
 public:
   /**
    * 
@@ -523,6 +524,7 @@ public: 
   MutexManager c_mutexMgr;
 
   void ignoreMutexUnlockCallback(Signal* signal, Uint32 ptrI, Uint32 retVal);
+  virtual bool getParam(const char * param, Uint32 * retVal) { return false;}
 
   SafeCounterManager c_counterMgr;
 private:
@@ -809,7 +811,8 @@ public:\
 private: \
   void addRecSignal(GlobalSignalNumber gsn, ExecSignalLocal f, bool force = false)
 
-#define BLOCK_CONSTRUCTOR(BLOCK)
+#define BLOCK_CONSTRUCTOR(BLOCK) { initCommon();}
+
 
 #define BLOCK_FUNCTIONS(BLOCK) \
 void \
diff -Nrup a/storage/ndb/src/mgmsrv/ConfigInfo.cpp b/storage/ndb/src/mgmsrv/ConfigInfo.cpp
--- a/storage/ndb/src/mgmsrv/ConfigInfo.cpp	2008-02-11 15:05:02 +01:00
+++ b/storage/ndb/src/mgmsrv/ConfigInfo.cpp	2008-02-20 10:04:24 +01:00
@@ -352,6 +352,42 @@ const ConfigInfo::ParamInfo ConfigInfo::
     STR_VALUE(MAX_INT_RNIL) },
 
   { 
+    CFG_DB_SUBSCRIPTIONS,
+    "MaxNoOfSubscriptions",
+    DB_TOKEN,
+    "Max no of subscriptions (default 0 == MaxNoOfTables)",
+    ConfigInfo::CI_USED,
+    false,
+    ConfigInfo::CI_INT,
+    "0",
+    "0",
+    STR_VALUE(MAX_INT_RNIL) },
+
+  {
+    CFG_DB_SUBSCRIBERS,
+    "MaxNoOfSubscribers",
+    DB_TOKEN,
+    "Max no of subscribers (default 0 == 2 * MaxNoOfTables)",
+    ConfigInfo::CI_USED,
+    false,
+    ConfigInfo::CI_INT,
+    "0",
+    "0",
+    STR_VALUE(MAX_INT_RNIL) },
+
+  {
+    CFG_DB_SUB_OPERATIONS,
+    "MaxNoOfConcurrentSubOperations",
+    DB_TOKEN,
+    "Max no of concurrent subscriber operations",
+    ConfigInfo::CI_USED,
+    false,
+    ConfigInfo::CI_INT,
+    "256",
+    "0",
+    STR_VALUE(MAX_INT_RNIL) },
+
+  {
     KEY_INTERNAL,
     "TcpBind_INADDR_ANY",
     DB_TOKEN,
diff -Nrup a/storage/ndb/src/ndbapi/Ndb.cpp b/storage/ndb/src/ndbapi/Ndb.cpp
--- a/storage/ndb/src/ndbapi/Ndb.cpp	2008-02-11 16:17:19 +01:00
+++ b/storage/ndb/src/ndbapi/Ndb.cpp	2008-02-20 10:04:24 +01:00
@@ -1827,17 +1827,7 @@ int Ndb::dropEventOperation(NdbEventOper
   // remove it from list
   NdbEventOperationImpl *op=
     NdbEventBuffer::getEventOperationImpl(tOp);
-  if (op->m_next)
-    op->m_next->m_prev= op->m_prev;
-  if (op->m_prev)
-    op->m_prev->m_next= op->m_next;
-  else
-    theImpl->m_ev_op= op->m_next;
-
-  DBUG_PRINT("info", ("first: %s",
-                      theImpl->m_ev_op ? theImpl->m_ev_op->getEvent()->getTable()->getName() : "<empty>"));
-  assert(theImpl->m_ev_op == 0 || theImpl->m_ev_op->m_prev == 0);
-
+  
   theEventBuffer->dropEventOperation(tOp);
   DBUG_RETURN(0);
 }
diff -Nrup a/storage/ndb/src/ndbapi/NdbDictionaryImpl.cpp b/storage/ndb/src/ndbapi/NdbDictionaryImpl.cpp
--- a/storage/ndb/src/ndbapi/NdbDictionaryImpl.cpp	2008-02-07 10:21:39 +01:00
+++ b/storage/ndb/src/ndbapi/NdbDictionaryImpl.cpp	2008-02-20 10:04:25 +01:00
@@ -1729,7 +1729,13 @@ NdbDictInterface::dictSignal(NdbApiSigna
   for(Uint32 i = 0; i<RETRIES; i++)
   {
     if (i > 0)
-      NdbSleep_MilliSleep(sleep + 10 * (rand() % mod));
+    {
+      Uint32 t = sleep + 10 * (rand() % mod);
+#ifdef VM_TRACE
+      ndbout_c("retry sleep %ums on error %u", t, m_error.code);
+#endif
+      NdbSleep_MilliSleep(t);
+    }
     if (i == RETRIES / 2)
     {
       mod = 10;
@@ -1803,9 +1809,12 @@ NdbDictInterface::dictSignal(NdbApiSigna
     {
       const NdbError &error= getNdbError();
       if (error.status ==  NdbError::TemporaryError)
-	continue;
+      {
+        continue;
+      }
     }
-    else if ( (temporaryMask & m_error.code) != 0 ) {
+    else if ( (temporaryMask & m_error.code) != 0 )
+    {
       continue;
     }
     DBUG_PRINT("info", ("dictSignal caught error= %d", m_error.code));
@@ -1819,7 +1828,9 @@ NdbDictInterface::dictSignal(NdbApiSigna
 	}
       }
       if(errcodes[j]) // Accepted error code
-	continue;
+      {
+        continue;
+      }
     }
     break;
   }
@@ -3822,7 +3833,7 @@ NdbDictInterface::executeSubscribeEvent(
   NdbApiSignal tSignal(m_reference);
   tSignal.theReceiversBlockNumber = DBDICT;
   tSignal.theVerId_signalNumber   = GSN_SUB_START_REQ;
-  tSignal.theLength = SubStartReq::SignalLength2;
+  tSignal.theLength = SubStartReq::SignalLength;
   
   SubStartReq * req = CAST_PTR(SubStartReq, tSignal.getDataPtrSend());
 
@@ -3836,11 +3847,15 @@ NdbDictInterface::executeSubscribeEvent(
 		     "subscriberData=%d",req->subscriptionId,
 		     req->subscriptionKey,req->subscriberData));
 
+  int errCodes[] = { SubStartRef::Busy,
+                     SubStartRef::BusyWithNR,
+                     SubStartRef::NotMaster,
+                     0 };
   DBUG_RETURN(dictSignal(&tSignal,NULL,0,
 			 0 /*use masternode id*/,
 			 WAIT_CREATE_INDX_REQ /*WAIT_CREATE_EVNT_REQ*/,
 			 -1, 100,
-			 0, -1));
+			 errCodes, -1));
 }
 
 int
@@ -3869,16 +3884,21 @@ NdbDictInterface::stopSubscribeEvent(cla
   req->subscriberData  = ev_op.m_oid;
   req->part            = (Uint32) SubscriptionData::TableData;
   req->subscriberRef   = m_reference;
+  req->requestInfo     = 0;
 
   DBUG_PRINT("info",("GSN_SUB_STOP_REQ subscriptionId=%d,subscriptionKey=%d,"
 		     "subscriberData=%d",req->subscriptionId,
 		     req->subscriptionKey,req->subscriberData));
 
+  int errCodes[] = { SubStartRef::Busy,
+                     SubStartRef::BusyWithNR,
+                     SubStartRef::NotMaster,
+                     0 };
   DBUG_RETURN(dictSignal(&tSignal,NULL,0,
 			 0 /*use masternode id*/,
 			 WAIT_CREATE_INDX_REQ /*WAIT_SUB_STOP__REQ*/,
 			 -1, 100,
-			 0, -1));
+			 errCodes, -1));
 }
 
 NdbEventImpl * 
@@ -4087,8 +4107,11 @@ NdbDictInterface::execSUB_STOP_REF(NdbAp
 
   DBUG_PRINT("error",("subscriptionId=%d,subscriptionKey=%d,subscriberData=%d,error=%d",
 		      subscriptionId,subscriptionKey,subscriberData,m_error.code));
-  if (m_error.code == SubStopRef::NotMaster)
+  if (m_error.code == SubStopRef::NotMaster &&
+      signal->getLength() >= SubStopRef::SL_MasterNode)
+  {
     m_masterNodeId = subStopRef->m_masterNodeId;
+  }
   m_waiter.signal(NO_WAIT);
   DBUG_VOID_RETURN;
 }
diff -Nrup a/storage/ndb/src/ndbapi/NdbEventOperationImpl.cpp b/storage/ndb/src/ndbapi/NdbEventOperationImpl.cpp
--- a/storage/ndb/src/ndbapi/NdbEventOperationImpl.cpp	2008-02-13 11:39:47 +01:00
+++ b/storage/ndb/src/ndbapi/NdbEventOperationImpl.cpp	2008-02-20 10:04:25 +01:00
@@ -2052,8 +2052,10 @@ NdbEventBuffer::report_node_connected(Ui
 {
   NdbEventOperation* op= m_ndb->getEventOperation(0);
   if (op == 0)
+  {
     return;
-
+  }
+  
   DBUG_ENTER("NdbEventBuffer::report_node_connected");
   SubTableData data;
   LinearSectionPtr ptr[3];
@@ -3205,8 +3207,16 @@ NdbEventBuffer::dropEventOperation(NdbEv
       tBlobOp->stop();
       tBlobOp = tBlobOp->m_next;
     }
+  }
+
+  /**
+   * Needs mutex lock as report_node_XXX accesses list...
+   */
+  NdbMutex_Lock(m_mutex);
 
-    // release blob handles now, further access is user error
+  // release blob handles now, further access is user error
+  if (op->theMainOp == NULL)
+  {
     while (op->theBlobList != NULL)
     {
       NdbBlob* tBlob = op->theBlobList;
@@ -3215,6 +3225,15 @@ NdbEventBuffer::dropEventOperation(NdbEv
     }
   }
 
+  if (op->m_next)
+    op->m_next->m_prev= op->m_prev;
+  if (op->m_prev)
+    op->m_prev->m_next= op->m_next;
+  else
+    m_ndb->theImpl->m_ev_op= op->m_next;
+  
+  assert(m_ndb->theImpl->m_ev_op == 0 || m_ndb->theImpl->m_ev_op->m_prev == 0);
+  
   DBUG_ASSERT(op->m_ref_count > 0);
   // remove user reference
   // added in createEventOperation
@@ -3223,6 +3242,7 @@ NdbEventBuffer::dropEventOperation(NdbEv
   DBUG_PRINT("info", ("m_ref_count: %u for op: %p", op->m_ref_count, op));
   if (op->m_ref_count == 0)
   {
+    NdbMutex_Unlock(m_mutex);
     DBUG_PRINT("info", ("deleting op: %p", op));
     DBUG_ASSERT(op->m_node_bit_mask.isclear());
     delete op->m_facade;
@@ -3234,6 +3254,8 @@ NdbEventBuffer::dropEventOperation(NdbEv
     if (m_dropped_ev_op)
       m_dropped_ev_op->m_prev= op;
     m_dropped_ev_op= op;
+    
+    NdbMutex_Unlock(m_mutex);
   }
   DBUG_VOID_RETURN;
 }
diff -Nrup a/storage/ndb/src/ndbapi/ndberror.c b/storage/ndb/src/ndbapi/ndberror.c
--- a/storage/ndb/src/ndbapi/ndberror.c	2008-02-01 07:29:43 +01:00
+++ b/storage/ndb/src/ndbapi/ndberror.c	2008-02-20 10:04:25 +01:00
@@ -500,6 +500,12 @@ ErrorBundle ErrorCodes[] = {
 
   { 1420, DMEC, TR, "Subscriber manager busy with adding/removing a table" },
   { 1421, DMEC, SE, "Partially connected API in NdbOperation::execute()" },
+  { 1422, DMEC, SE, "Out of subscription records" },
+  { 1423, DMEC, SE, "Out of table records in SUMA" },
+  { 1424, DMEC, SE, "Out of MaxNoOfConcurrentSubOperations" },
+  { 1425, DMEC, SE, "Subscription being defined...while trying to stop subscriber" },
+  { 1426, DMEC, SE, "No such subscriber" },
+  { 1427, DMEC, NR, "Api node died, when SUB_START_REQ reached node "},
 
   { 4004, DMEC, AE, "Attribute name or id not found in the table" },
   
diff -Nrup a/storage/ndb/test/ndbapi/test_event.cpp b/storage/ndb/test/ndbapi/test_event.cpp
--- a/storage/ndb/test/ndbapi/test_event.cpp	2008-02-12 20:42:18 +01:00
+++ b/storage/ndb/test/ndbapi/test_event.cpp	2008-02-20 10:04:25 +01:00
@@ -25,11 +25,14 @@
 
 #define GETNDB(ps) ((NDBT_NdbApiStep*)ps)->getNdb()
 
-static int createEvent(Ndb *pNdb, const NdbDictionary::Table &tab,
-                       bool merge_events = false)
+static int createEvent(Ndb *pNdb, 
+                       const NdbDictionary::Table &tab,
+                       NDBT_Context* ctx)
 {
   char eventName[1024];
   sprintf(eventName,"%s_EVENT",tab.getName());
+  bool merge_events = ctx->getProperty("MergeEvents");
+  bool report = ctx->getProperty("ReportSubscribe");
 
   NdbDictionary::Dictionary *myDict = pNdb->getDictionary();
 
@@ -48,6 +51,9 @@ static int createEvent(Ndb *pNdb, const 
   }
   myEvent.mergeEvents(merge_events);
 
+  if (report)
+    myEvent.setReport(NdbDictionary::Event::ER_SUBSCRIBE);
+
   int res = myDict->createEvent(myEvent); // Add event to database
 
   if (res == 0)
@@ -139,8 +145,7 @@ NdbEventOperation *createEventOperation(
 
 static int runCreateEvent(NDBT_Context* ctx, NDBT_Step* step)
 {
-  bool merge_events = ctx->getProperty("MergeEvents");
-  if (createEvent(GETNDB(step),* ctx->getTab(), merge_events) != 0){
+  if (createEvent(GETNDB(step),* ctx->getTab(), ctx) != 0){
     return NDBT_FAILED;
   }
   return NDBT_OK;
@@ -1025,7 +1030,7 @@ static int createAllEvents(NDBT_Context*
   Ndb * ndb= GETNDB(step);
   for (int i= 0; pTabs[i]; i++)
   {
-    if (createEvent(ndb,*pTabs[i]))
+    if (createEvent(ndb,*pTabs[i], ctx))
     {
       DBUG_RETURN(NDBT_FAILED);
     }
@@ -1635,7 +1640,7 @@ static int runCreateDropNR(NDBT_Context*
   {
     result = NDBT_FAILED;
     const NdbDictionary::Table* pTab = ctx->getTab();
-    if (createEvent(ndb, *pTab))
+    if (createEvent(ndb, *pTab, ctx))
     {
       g_err << "createEvent failed" << endl;
       break;
@@ -1689,8 +1694,9 @@ runSubscribeUnsubscribe(NDBT_Context* ct
   sprintf(buf, "%s_EVENT", tab.getName());
   Ndb* ndb = GETNDB(step);
   int loops = 5 * ctx->getNumLoops();
+  int untilStopped = ctx->getProperty("SubscribeUntilStopped", (Uint32)0);
 
-  while (--loops)
+  while ((untilStopped || --loops) && !ctx->isTestStopped())
   {
     NdbEventOperation *pOp= ndb->createEventOperation(buf);
     if (pOp == 0)
@@ -2071,6 +2077,102 @@ runBug33793(NDBT_Context* ctx, NDBT_Step
 
 
 
+/** Telco 6.2 **/
+
+int
+runNFSubscribe(NDBT_Context* ctx, NDBT_Step* step)
+{
+  NdbRestarter restarter;
+
+  if (restarter.getNumDbNodes() < 2)
+  {
+    ctx->stopTest();
+    return NDBT_OK;
+  }
+
+  int codes[] = {
+    13013,
+    13019,
+    13020,
+    13036,
+    0,
+  };
+
+  int nr_codes[] = {
+    13034,
+    13035,
+    13037,
+    0
+  };
+
+  int loops = ctx->getNumLoops();
+  while (loops-- && !ctx->isTestStopped())
+  {
+    int i = 0;
+    while (codes[i] != 0)
+    {
+      int nodeId = restarter.getDbNodeId(rand() % restarter.getNumDbNodes());
+      int val2[] = { DumpStateOrd::CmvmiSetRestartOnErrorInsert, 1 };
+      if (restarter.dumpStateOneNode(nodeId, val2, 2))
+        return NDBT_FAILED;
+      
+      ndbout_c("Node %u error: %u", nodeId, codes[i]);
+      if (restarter.insertErrorInNode(nodeId, codes[i]))
+        return NDBT_FAILED;
+      
+      if (restarter.waitNodesNoStart(&nodeId, 1))
+        return NDBT_FAILED;
+      
+      if (restarter.startNodes(&nodeId, 1))
+        return NDBT_FAILED;
+      
+      if (restarter.waitClusterStarted())
+        return NDBT_FAILED;
+      
+      i++;
+    }
+    
+    int nodeId = restarter.getDbNodeId(rand() % restarter.getNumDbNodes());
+    if (restarter.restartOneDbNode(nodeId, false, true, true) != 0)
+      return NDBT_FAILED;
+    
+    if (restarter.waitNodesNoStart(&nodeId, 1))
+      return NDBT_FAILED;
+    
+    i = 0;
+    while (nr_codes[i] != 0)
+    {
+      int val2[] = { DumpStateOrd::CmvmiSetRestartOnErrorInsert, 1 };
+      if (restarter.dumpStateOneNode(nodeId, val2, 2))
+        return NDBT_FAILED;
+      
+      ndbout_c("Node %u error: %u", nodeId, nr_codes[i]);
+      if (restarter.insertErrorInNode(nodeId, nr_codes[i]))
+        return NDBT_FAILED;
+      
+      if (restarter.startNodes(&nodeId, 1))
+        return NDBT_FAILED;
+      
+      if (restarter.waitNodesNoStart(&nodeId, 1))
+        return NDBT_FAILED;
+      
+      i++;
+    }
+    
+    ndbout_c("Done..now starting %u", nodeId);
+    if (restarter.startNodes(&nodeId, 1))
+      return NDBT_FAILED;
+    
+    if (restarter.waitClusterStarted())
+      return NDBT_FAILED;
+  }  
+
+  ctx->stopTest();
+  return NDBT_OK;
+}
+
+/** Telco 6.3 **/
+
 NDBT_TESTSUITE(test_event);
 TESTCASE("BasicEventOperation", 
 	 "Verify that we can listen to Events"
@@ -2207,6 +2309,14 @@ TESTCASE("Bug31701", ""){
   STEP(runBug31701);
   FINALIZER(runDropEvent);
   FINALIZER(runDropShadowTable);
+}
+TESTCASE("SubscribeNR", ""){
+  TC_PROPERTY("ReportSubscribe", 1);
+  TC_PROPERTY("SubscribeUntilStopped", 1);  
+  INITIALIZER(runCreateEvent);
+  STEPS(runSubscribeUnsubscribe, 5);
+  STEP(runNFSubscribe);
+  FINALIZER(runDropEvent);
 }
 TESTCASE("EventBufferOverflow",
          "Simulating EventBuffer overflow while node restart"
diff -Nrup a/storage/ndb/test/run-test/daily-basic-tests.txt b/storage/ndb/test/run-test/daily-basic-tests.txt
--- a/storage/ndb/test/run-test/daily-basic-tests.txt	2008-02-09 07:27:23 +01:00
+++ b/storage/ndb/test/run-test/daily-basic-tests.txt	2008-02-20 10:04:25 +01:00
@@ -1094,3 +1094,6 @@ max-time: 1200
 cmd: testNodeRestart
 args: -n mixedmultiop -l 10 T1 I2 I3 D2
 
+max-time: 300
+cmd: test_event -n SubscribeNR T1
+
diff -Nrup a/storage/ndb/test/tools/connect.cpp b/storage/ndb/test/tools/connect.cpp
--- a/storage/ndb/test/tools/connect.cpp	2007-11-14 13:28:42 +01:00
+++ b/storage/ndb/test/tools/connect.cpp	2008-02-20 10:04:25 +01:00
@@ -25,6 +25,8 @@ NDB_STD_OPTS_VARS;
 static int _loop = 25;
 static int _sleep = 25;
 static int _drop = 1;
+static int _subloop = 5;
+static int _wait_all = 0;
 
 typedef uchar* gptr;
 
@@ -41,6 +43,14 @@ static struct my_option my_long_options[
     "Drop event operations before disconnect (0 = no, 1 = yes, else rand",
     (gptr*) &_drop, (gptr*) &_drop, 0,
     GET_INT, REQUIRED_ARG, _drop, 0, 0, 0, 0, 0 }, 
+  { "subscribe-loop", 256, 
+    "Loop in subscribe/unsubscribe",
+    (uchar**) &_subloop, (uchar**) &_subloop, 0,
+    GET_INT, REQUIRED_ARG, _subloop, 0, 0, 0, 0, 0 }, 
+  { "wait-all", 256, 
+    "Wait for all ndb-nodes (i.e not only some)",
+    (uchar**) &_wait_all, (uchar**) &_wait_all, 0,
+    GET_INT, REQUIRED_ARG, _wait_all, 0, 0, 0, 0, 0 }, 
   { 0, 0, 0, 0, 0, 0, GET_NO_ARG, NO_ARG, 0, 0, 0, 0, 0, 0}
 };
 
@@ -73,7 +83,9 @@ int main(int argc, char** argv){
       ndbout << "Unable to connect to management server." << endl;
       return NDBT_ProgramExit(NDBT_FAILED);
     }
-    if (con.wait_until_ready(30,30) != 0)
+    
+    int res = con.wait_until_ready(30,30);
+    if (res < 0 || (_wait_all && res != 0))
     {
       ndbout << "Cluster nodes not ready in 30 seconds." << endl;
       return NDBT_ProgramExit(NDBT_FAILED);
@@ -85,63 +97,75 @@ int main(int argc, char** argv){
       return NDBT_ProgramExit(NDBT_FAILED);
     }
 
-    Vector<NdbEventOperation*> ops;
-    const NdbDictionary::Dictionary * dict= MyNdb.getDictionary();
-    for (int j = 0; j < argc; j++) 
+    for (int k = _subloop; k >= 1; k--)
     {
-      const NdbDictionary::Table * pTab = dict->getTable(argv[j]);
-      if (pTab == 0)
+      if (k > 1 && ((k % 25) == 0))
       {
-        ndbout_c("Failed to retreive table: \"%s\"", argv[j]);
+        ndbout_c("subscribe/unsubscribe: %u", _subloop - k);
       }
-
-      BaseString tmp;
-      tmp.appfmt("EV-%s", argv[j]);
-      NdbEventOperation* pOp = MyNdb.createEventOperation(tmp.c_str());
-      if ( pOp == NULL ) 
+      Vector<NdbEventOperation*> ops;
+      const NdbDictionary::Dictionary * dict= MyNdb.getDictionary();
+      for (int j = 0; j < argc; j++) 
       {
-        ndbout << "Event operation creation failed: " << 
-          MyNdb.getNdbError() << endl;
-        return NDBT_ProgramExit(NDBT_FAILED);
+        const NdbDictionary::Table * pTab = dict->getTable(argv[j]);
+        if (pTab == 0)
+        {
+          ndbout_c("Failed to retreive table: \"%s\"", argv[j]);
+        }
+        
+        BaseString tmp;
+        tmp.appfmt("EV-%s", argv[j]);
+        NdbEventOperation* pOp = MyNdb.createEventOperation(tmp.c_str());
+        if ( pOp == NULL ) 
+        {
+          ndbout << "Event operation creation failed: " << 
+            MyNdb.getNdbError() << endl;
+          return NDBT_ProgramExit(NDBT_FAILED);
+        }
+        
+        for (int a = 0; a < pTab->getNoOfColumns(); a++) 
+        {
+          pOp->getValue(pTab->getColumn(a)->getName());
+          pOp->getPreValue(pTab->getColumn(a)->getName());
+        }
+        
+        ops.push_back(pOp);
+        if (pOp->execute())
+        { 
+          ndbout << "operation execution failed: " << pOp->getNdbError()
+                 << endl;
+          k = 1;
+        }
       }
-
-      for (int a = 0; a < pTab->getNoOfColumns(); a++) 
+      
+      if (_sleep)
       {
-        pOp->getValue(pTab->getColumn(a)->getName());
-        pOp->getPreValue(pTab->getColumn(a)->getName());
+        NdbSleep_MilliSleep(10 + rand() % _sleep);
       }
-      
-      if (pOp->execute())
-      { 
-        ndbout << "operation execution failed: " << pOp->getNdbError()
-               << endl;
-        return NDBT_ProgramExit(NDBT_FAILED);
+      else
+      {
+        ndbout_c("NDBT_ProgramExit: SLEEPING OK");
+        while(true) NdbSleep_SecSleep(5);
       }
-      ops.push_back(pOp);
-    }
-    
-    if (_sleep)
-    {
-      NdbSleep_MilliSleep(10 + rand() % _sleep);
-    }
-    
-    for (Uint32 i = 0; i<ops.size(); i++)
-    {
-      switch(_drop){
-      case 0:
-        break;
-      do_drop:
-      case 1:
-        if (MyNdb.dropEventOperation(ops[i]))
-        {
-          ndbout << "drop event operation failed " 
-                 << MyNdb.getNdbError() << endl;
-          return NDBT_ProgramExit(NDBT_FAILED);
+      
+      for (Uint32 i = 0; i<ops.size(); i++)
+      {
+        switch(k == 1 ? _drop : 1){
+        case 0:
+          break;
+        do_drop:
+        case 1:
+          if (MyNdb.dropEventOperation(ops[i]))
+          {
+            ndbout << "drop event operation failed " 
+                   << MyNdb.getNdbError() << endl;
+            return NDBT_ProgramExit(NDBT_FAILED);
+          }
+          break;
+        default:
+          if ((rand() % 100) > 50)
+            goto do_drop;
         }
-        break;
-      default:
-        if ((rand() % 100) > 50)
-          goto do_drop;
       }
     }
   }
Thread
bk commit into 5.1 tree (jonas:1.2532)jonas20 Feb