List:Commits« Previous MessageNext Message »
From:jonas Date:September 12 2007 11:30am
Subject:bk commit into 5.1 tree (jonas:1.2613)
View as plain text  
Below is the list of changes that have just been committed into a local
5.1 repository of jonas. When jonas does a push these changes will
be propagated to the main repository and, within 24 hours after the
push, to the public repository.
For information on how to access the public repository
see http://dev.mysql.com/doc/mysql/en/installing-source-tree.html

ChangeSet@stripped, 2007-09-12 13:30:27+02:00, jonas@stripped +20 -0
  ndb - micro gcp upgrade
    handle online upgrade from pre-micro-gcp

  storage/ndb/include/kernel/GlobalSignalNumbers.h@stripped, 2007-09-12 13:30:24+02:00, jonas@stripped +2 -1
    add signal when enabling micro gcp

  storage/ndb/include/kernel/signaldata/StartPerm.hpp@stripped, 2007-09-12 13:30:24+02:00, jonas@stripped +2 -1
    inform starting node if micro gcp is enabled

  storage/ndb/include/kernel/signaldata/Upgrade.hpp@stripped, 2007-09-12 13:30:25+02:00, jonas@stripped +35 -0
    New BitKeeper file ``storage/ndb/include/kernel/signaldata/Upgrade.hpp''

  storage/ndb/include/kernel/signaldata/Upgrade.hpp@stripped, 2007-09-12 13:30:25+02:00, jonas@stripped +0 -0

  storage/ndb/include/ndb_version.h.in@stripped, 2007-09-12 13:30:24+02:00, jonas@stripped +23 -0
    add checker function for micro gcp

  storage/ndb/include/ndbapi/NdbTransaction.hpp@stripped, 2007-09-12 13:30:24+02:00, jonas@stripped +1 -1
    add len to function, so that code can determine if's it's new/old version of signal that is received

  storage/ndb/src/common/debugger/signaldata/SignalNames.cpp@stripped, 2007-09-12 13:30:24+02:00, jonas@stripped +2 -0
    add new signal

  storage/ndb/src/common/util/version.c@stripped, 2007-09-12 13:30:24+02:00, jonas@stripped +1 -0
    make 6.1.19 compatible with 6.2.x

  storage/ndb/src/kernel/blocks/dbdih/Dbdih.hpp@stripped, 2007-09-12 13:30:24+02:00, jonas@stripped +21 -11
    split gcp record into master/participant
    fix "old" protocol master-take-over,
    fix enable micro gcp, when it's possible
    handle mixed protocol(s)

  storage/ndb/src/kernel/blocks/dbdih/DbdihInit.cpp@stripped, 2007-09-12 13:30:24+02:00, jonas@stripped +3 -0
    split gcp record into master/participant
    fix "old" protocol master-take-over,
    fix enable micro gcp, when it's possible
    handle mixed protocol(s)

  storage/ndb/src/kernel/blocks/dbdih/DbdihMain.cpp@stripped, 2007-09-12 13:30:24+02:00, jonas@stripped +299 -254
    split gcp record into master/participant
    fix "old" protocol master-take-over,
    fix enable micro gcp, when it's possible
    handle mixed protocol(s)

  storage/ndb/src/kernel/blocks/dblqh/DblqhMain.cpp@stripped, 2007-09-12 13:30:24+02:00, jonas@stripped +38 -3
    handle mixed protocol(s) (gci hi/lo)

  storage/ndb/src/kernel/blocks/dbtc/DbtcMain.cpp@stripped, 2007-09-12 13:30:24+02:00, jonas@stripped +34 -5
    handle mixed protocol(s) (gci hi/lo)

  storage/ndb/src/kernel/blocks/qmgr/Qmgr.hpp@stripped, 2007-09-12 13:30:24+02:00, jonas@stripped +4 -0
    add support for (dis)allowing node to enter cluster dependant on
      micro gcp capability or if it's enabled

  storage/ndb/src/kernel/blocks/qmgr/QmgrInit.cpp@stripped, 2007-09-12 13:30:24+02:00, jonas@stripped +4 -0
    add support for (dis)allowing node to enter cluster dependant on
      micro gcp capability or if it's enabled

  storage/ndb/src/kernel/blocks/qmgr/QmgrMain.cpp@stripped, 2007-09-12 13:30:24+02:00, jonas@stripped +40 -6
    add support for (dis)allowing node to enter cluster dependant on
      micro gcp capability or if it's enabled

  storage/ndb/src/kernel/blocks/suma/Suma.cpp@stripped, 2007-09-12 13:30:24+02:00, jonas@stripped +8 -1
    handle mixed protocol(s) (gci hi/lo)

  storage/ndb/src/ndbapi/NdbEventOperationImpl.cpp@stripped, 2007-09-12 13:30:24+02:00, jonas@stripped +30 -12
    handle mixed protocol(s) (gci hi/lo)

  storage/ndb/src/ndbapi/NdbEventOperationImpl.hpp@stripped, 2007-09-12 13:30:24+02:00, jonas@stripped +4 -4
    handle mixed protocol(s) (gci hi/lo)

  storage/ndb/src/ndbapi/NdbTransaction.cpp@stripped, 2007-09-12 13:30:24+02:00, jonas@stripped +16 -3
    handle mixed protocol(s) (gci hi/lo)

  storage/ndb/src/ndbapi/Ndbif.cpp@stripped, 2007-09-12 13:30:25+02:00, jonas@stripped +3 -3
    handle mixed protocol(s) (gci hi/lo)

diff -Nrup a/storage/ndb/include/kernel/GlobalSignalNumbers.h b/storage/ndb/include/kernel/GlobalSignalNumbers.h
--- a/storage/ndb/include/kernel/GlobalSignalNumbers.h	2007-03-27 16:06:45 +02:00
+++ b/storage/ndb/include/kernel/GlobalSignalNumbers.h	2007-09-12 13:30:24 +02:00
@@ -381,7 +381,8 @@ extern const GlobalSignalNumber NO_OF_SI
 #define GSN_GCP_SAVEREQ                 283
 #define GSN_GCP_TCFINISHED              284
 
-/* 285 unused */
+#define GSN_UPGRADE_PROTOCOL_ORD        285
+
 /* 286 unused */
 /* 287 unused */
 #define GSN_GETGCICONF                  288
diff -Nrup a/storage/ndb/include/kernel/signaldata/StartPerm.hpp b/storage/ndb/include/kernel/signaldata/StartPerm.hpp
--- a/storage/ndb/include/kernel/signaldata/StartPerm.hpp	2007-03-27 14:27:41 +02:00
+++ b/storage/ndb/include/kernel/signaldata/StartPerm.hpp	2007-09-12 13:30:24 +02:00
@@ -44,11 +44,12 @@ class StartPermConf {
   friend class Dbdih;
   
 public:
-  STATIC_CONST( SignalLength = 2 );
+  STATIC_CONST( SignalLength = 3 );
 private:
   
   Uint32 startingNodeId;
   Uint32 systemFailureNo;  
+  Uint32 microGCP;
 };
 
 class StartPermRef {
diff -Nrup a/storage/ndb/include/kernel/signaldata/Upgrade.hpp b/storage/ndb/include/kernel/signaldata/Upgrade.hpp
--- /dev/null	Wed Dec 31 16:00:00 196900
+++ b/storage/ndb/include/kernel/signaldata/Upgrade.hpp	2007-09-12 13:30:25 +02:00
@@ -0,0 +1,35 @@
+/* Copyright (C) 2003 MySQL AB
+
+   This program is free software; you can redistribute it and/or modify
+   it under the terms of the GNU General Public License as published by
+   the Free Software Foundation; version 2 of the License.
+
+   This program is distributed in the hope that it will be useful,
+   but WITHOUT ANY WARRANTY; without even the implied warranty of
+   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+   GNU General Public License for more details.
+
+   You should have received a copy of the GNU General Public License
+   along with this program; if not, write to the Free Software
+   Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA */
+
+#ifndef UPGRADE_PROTOCOL_HPP
+#define UPGRADE_PROTOCOL_HPP
+
+struct UpgradeProtocolOrd 
+{
+  /**
+   * Sender(s) / Reciver(s)
+   */
+  friend class Dbdih;
+
+  STATIC_CONST( SignalLength = 1 );
+
+  enum {
+    UPO_ENABLE_MICRO_GCP = 1
+  };
+
+  Uint32 type;
+};
+
+#endif
diff -Nrup a/storage/ndb/include/ndb_version.h.in b/storage/ndb/include/ndb_version.h.in
--- a/storage/ndb/include/ndb_version.h.in	2007-07-25 07:26:02 +02:00
+++ b/storage/ndb/include/ndb_version.h.in	2007-09-12 13:30:24 +02:00
@@ -91,5 +91,28 @@ Uint32 ndbGetOwnVersion();
 #define NDBD_255_NODES_VERSION NDB_MAKE_VERSION(5,1,4)
 #define NDBD_PREPARE_COPY_FRAG_VERSION NDB_MAKE_VERSION(6,2,1)
 
+#define NDBD_MICRO_GCP_62 NDB_MAKE_VERSION(6,2,5)
+#define NDBD_MICRO_GCP_63 NDB_MAKE_VERSION(6,3,2)
+
+static
+inline
+int
+ndb_check_micro_gcp(Uint32 version)
+{	
+  if (version == NDB_VERSION_D)
+    return 1;
+  {
+    const Uint32 major = (version >> 16) & 0xFF;
+    const Uint32 minor = (version >>  8) & 0xFF;
+    if (major >= 6)
+    {
+      if (minor == 2)
+        return version >= NDBD_MICRO_GCP_62;
+      return version >= NDBD_MICRO_GCP_63;
+    }
+  }
+  return 0;
+}
+
 #endif
  
diff -Nrup a/storage/ndb/include/ndbapi/NdbTransaction.hpp b/storage/ndb/include/ndbapi/NdbTransaction.hpp
--- a/storage/ndb/include/ndbapi/NdbTransaction.hpp	2007-09-05 15:19:56 +02:00
+++ b/storage/ndb/include/ndbapi/NdbTransaction.hpp	2007-09-12 13:30:24 +02:00
@@ -828,7 +828,7 @@ private:						
   int  receiveTCSEIZEREF(NdbApiSignal* anApiSignal);	
   int  receiveTCRELEASECONF(NdbApiSignal* anApiSignal);	
   int  receiveTCRELEASEREF(NdbApiSignal* anApiSignal);	
-  int  receiveTC_COMMITCONF(const class TcCommitConf *);
+  int  receiveTC_COMMITCONF(const class TcCommitConf *, Uint32 len);
   int  receiveTCKEYCONF(const class TcKeyConf *, Uint32 aDataLength);
   int  receiveTCKEY_FAILCONF(const class TcKeyFailConf *);
   int  receiveTCKEY_FAILREF(NdbApiSignal* anApiSignal);
diff -Nrup a/storage/ndb/src/common/debugger/signaldata/SignalNames.cpp b/storage/ndb/src/common/debugger/signaldata/SignalNames.cpp
--- a/storage/ndb/src/common/debugger/signaldata/SignalNames.cpp	2007-05-14 14:47:27 +02:00
+++ b/storage/ndb/src/common/debugger/signaldata/SignalNames.cpp	2007-09-12 13:30:24 +02:00
@@ -644,5 +644,7 @@ const GsnName SignalNames [] = {
   ,{ GSN_PREPARE_COPY_FRAG_REQ,   "PREPARE_COPY_FRAG_REQ" }
   ,{ GSN_PREPARE_COPY_FRAG_REF,   "PREPARE_COPY_FRAG_REF" }
   ,{ GSN_PREPARE_COPY_FRAG_CONF,  "PREPARE_COPY_FRAG_CONF" }
+
+  ,{ GSN_UPGRADE_PROTOCOL_ORD, "UPGRADE_PROTOCOL_ORD" }
 };
 const unsigned short NO_OF_SIGNAL_NAMES = sizeof(SignalNames)/sizeof(GsnName);
diff -Nrup a/storage/ndb/src/common/util/version.c b/storage/ndb/src/common/util/version.c
--- a/storage/ndb/src/common/util/version.c	2007-07-11 14:58:03 +02:00
+++ b/storage/ndb/src/common/util/version.c	2007-09-12 13:30:24 +02:00
@@ -113,6 +113,7 @@ struct NdbUpGradeCompatible ndbCompatibl
   { MAKE_VERSION(6,2,NDB_VERSION_BUILD), MAKE_VERSION(6,2,1), UG_Range },
   { MAKE_VERSION(6,2,0), MAKE_VERSION(6,2,0), UG_Range},
 
+  { MAKE_VERSION(6,2,NDB_VERSION_BUILD), MAKE_VERSION(6,1,19), UG_Exact },
   { MAKE_VERSION(6,1,NDB_VERSION_BUILD), MAKE_VERSION(6,1,6), UG_Range},
   /* var page reference 32bit->64bit making 6.1.6 not backwards compatible */
   /* ndb_apply_status table changed, and no compatability code written */
diff -Nrup a/storage/ndb/src/kernel/blocks/dbdih/Dbdih.hpp b/storage/ndb/src/kernel/blocks/dbdih/Dbdih.hpp
--- a/storage/ndb/src/kernel/blocks/dbdih/Dbdih.hpp	2007-09-05 15:19:56 +02:00
+++ b/storage/ndb/src/kernel/blocks/dbdih/Dbdih.hpp	2007-09-12 13:30:24 +02:00
@@ -721,6 +721,8 @@ private:
   void execDICT_LOCK_CONF(Signal* signal);
   void execDICT_LOCK_REF(Signal* signal);
 
+  void execUPGRADE_PROTOCOL_ORD(Signal* signal);
+
   // Statement blocks
 //------------------------------------
 // Methods that send signals
@@ -842,7 +844,7 @@ private:
   void emptyverificbuffer(Signal *, bool aContintueB);
   Uint32 findHotSpare();
   void handleGcpStateInMaster(Signal *, NodeRecordPtr failedNodeptr);
-  void initRestartInfo();
+  void initRestartInfo(Signal*);
   void initRestorableGciFiles();
   void makeNodeGroups(Uint32 nodeArray[]);
   void makePrnList(class ReadNodesConf * readNodes, Uint32 nodeArray[]);
@@ -1212,7 +1214,6 @@ private:
   Uint32 cgcpSameCounter;
 #endif
 
-  bool cstartGcpNow;
   bool cgckptflag;    /* A FLAG WHICH IS SET WHILE A NEW GLOBAL CHECK
                            POINT IS BEING CREATED. NO VERIFICATION IS ALLOWED
                            IF THE FLAG IS SET*/
@@ -1224,17 +1225,21 @@ private:
    */
   struct GcpSave
   {
-    bool m_master_take_over;
-    Uint32 m_time_between_gcp;   /* Delay between global checkpoints */
     Uint32 m_gci;
     Uint32 m_master_ref;
-    Uint64 m_start_time;
     enum State {
       GCP_SAVE_IDLE     = 0, // Idle
       GCP_SAVE_REQ      = 1, // REQ received
       GCP_SAVE_CONF     = 2, // REF/CONF sent
       GCP_SAVE_COPY_GCI = 3
-    } m_state, m_master_state;
+    } m_state;
+
+    struct {
+      State m_state;
+      Uint32 m_new_gci;
+      Uint32 m_time_between_gcp;   /* Delay between global checkpoints */
+      Uint64 m_start_time;
+    } m_master;
   } m_gcp_save;
 
   /**
@@ -1243,20 +1248,23 @@ private:
   struct MicroGcp
   {
     bool m_enabled;
-    bool m_master_take_over;
-    Uint32 m_time_between_gcp;
     Uint32 m_master_ref;
     Uint64 m_old_gci;
     Uint64 m_current_gci; // Currently active
     Uint64 m_new_gci;     // Currently being prepared...
-    Uint64 m_next_gci;    // Next to prepare
-    Uint64 m_start_time;
     enum State {
       M_GCP_IDLE      = 0,
       M_GCP_PREPARE   = 1,
       M_GCP_COMMIT    = 2,
       M_GCP_COMMITTED = 3
-    } m_state, m_master_state;
+    } m_state;
+
+    struct {
+      State m_state;
+      Uint32 m_time_between_gcp;
+      Uint64 m_new_gci;
+      Uint64 m_start_time;
+    } m_master;
   } m_micro_gcp;
 
   struct GcpMonitor
@@ -1685,6 +1693,8 @@ private:
                          Uint32 block = 0, Uint32 gsn = 0, Uint32 len = 0,
                          JobBufferLevel = JBB);
 #endif
+
+  bool check_enable_micro_gcp(Signal* signal, bool broadcast);
 };
 
 #if (DIH_CDATA_SIZE < _SYSFILE_SIZE32)
diff -Nrup a/storage/ndb/src/kernel/blocks/dbdih/DbdihInit.cpp b/storage/ndb/src/kernel/blocks/dbdih/DbdihInit.cpp
--- a/storage/ndb/src/kernel/blocks/dbdih/DbdihInit.cpp	2007-09-05 15:19:56 +02:00
+++ b/storage/ndb/src/kernel/blocks/dbdih/DbdihInit.cpp	2007-09-12 13:30:24 +02:00
@@ -266,6 +266,9 @@ Dbdih::Dbdih(Block_context& ctx):
 	       &Dbdih::execPREPARE_COPY_FRAG_REF);
   addRecSignal(GSN_PREPARE_COPY_FRAG_CONF,
 	       &Dbdih::execPREPARE_COPY_FRAG_CONF);
+
+  addRecSignal(GSN_UPGRADE_PROTOCOL_ORD,
+	       &Dbdih::execUPGRADE_PROTOCOL_ORD);
   
   apiConnectRecord = 0;
   connectRecord = 0;
diff -Nrup a/storage/ndb/src/kernel/blocks/dbdih/DbdihMain.cpp b/storage/ndb/src/kernel/blocks/dbdih/DbdihMain.cpp
--- a/storage/ndb/src/kernel/blocks/dbdih/DbdihMain.cpp	2007-09-05 15:19:56 +02:00
+++ b/storage/ndb/src/kernel/blocks/dbdih/DbdihMain.cpp	2007-09-12 13:30:24 +02:00
@@ -69,6 +69,7 @@
 #include <signaldata/DihFragCount.hpp>
 #include <signaldata/DictLock.hpp>
 #include <DebuggerNames.hpp>
+#include <signaldata/Upgrade.hpp>
 
 #include <EventLogger.hpp>
 extern EventLogger g_eventLogger;
@@ -187,9 +188,11 @@ void Dbdih::sendGCP_COMMIT(Signal* signa
   BlockReference ref = calcDihBlockRef(nodeId);
   GCPCommit *req = (GCPCommit*)signal->getDataPtrSend();
   req->nodeId = cownNodeId;
-  req->gci_hi = Uint32(m_micro_gcp.m_new_gci >> 32);
-  req->gci_lo = Uint32(m_micro_gcp.m_new_gci);
+  req->gci_hi = Uint32(m_micro_gcp.m_master.m_new_gci >> 32);
+  req->gci_lo = Uint32(m_micro_gcp.m_master.m_new_gci);
   sendSignal(ref, GSN_GCP_COMMIT, signal, GCPCommit::SignalLength, JBA);
+
+  ndbassert(m_micro_gcp.m_enabled || Uint32(m_micro_gcp.m_new_gci) == 0);
 }//Dbdih::sendGCP_COMMIT()
 
 void Dbdih::sendGCP_PREPARE(Signal* signal, Uint32 nodeId)
@@ -197,9 +200,11 @@ void Dbdih::sendGCP_PREPARE(Signal* sign
   BlockReference ref = calcDihBlockRef(nodeId);
   GCPPrepare *req = (GCPPrepare*)signal->getDataPtrSend();
   req->nodeId = cownNodeId;
-  req->gci_hi = Uint32(m_micro_gcp.m_new_gci >> 32);
-  req->gci_lo = Uint32(m_micro_gcp.m_new_gci);
+  req->gci_hi = Uint32(m_micro_gcp.m_master.m_new_gci >> 32);
+  req->gci_lo = Uint32(m_micro_gcp.m_master.m_new_gci);
   sendSignal(ref, GSN_GCP_PREPARE, signal, GCPPrepare::SignalLength, JBA);
+  
+  ndbassert(m_micro_gcp.m_enabled || Uint32(m_micro_gcp.m_new_gci) == 0);
 }//Dbdih::sendGCP_PREPARE()
 
 void Dbdih::sendGCP_SAVEREQ(Signal* signal, Uint32 nodeId)
@@ -208,7 +213,7 @@ void Dbdih::sendGCP_SAVEREQ(Signal* sign
   BlockReference ref = calcDihBlockRef(nodeId);
   saveReq->dihBlockRef = reference();
   saveReq->dihPtr = nodeId;
-  saveReq->gci = m_gcp_save.m_gci;
+  saveReq->gci = m_gcp_save.m_master.m_new_gci;
   sendSignal(ref, GSN_GCP_SAVEREQ, signal, GCPSaveReq::SignalLength, JBB);
 }//Dbdih::sendGCP_SAVEREQ()
 
@@ -740,17 +745,6 @@ done:  
     c_newest_restorable_gci = newest;
     Sysfile::setRestartOngoing(SYSFILE->systemRestartBits);
     m_micro_gcp.m_current_gci = Uint64(newest + 1) << 32;
-    m_micro_gcp.m_new_gci = Uint64(newest + 1) << 32;
-    if (m_micro_gcp.m_enabled)
-    {
-      jam();
-      m_micro_gcp.m_next_gci = (Uint64(newest + 1) << 32) + 1;
-    }
-    else
-    {
-      jam();
-      m_micro_gcp.m_next_gci = (Uint64(newest + 2) << 32);
-    }
     setNodeInfo(signal);
     if ((Sysfile::getLCPOngoing(SYSFILE->systemRestartBits))) {
       jam();
@@ -760,6 +754,19 @@ done:  
       /* -------------------------------------------------------------------- */
       invalidateLcpInfoAfterSr();
     }//if
+
+    if (m_micro_gcp.m_enabled == false && 
+        m_micro_gcp.m_master.m_time_between_gcp)
+    {
+      /**
+       * Micro GCP is disabled...but configured...
+       */
+      jam();
+      m_micro_gcp.m_enabled = true;
+      UpgradeProtocolOrd * ord = (UpgradeProtocolOrd*)signal->getDataPtrSend();
+      ord->type = UpgradeProtocolOrd::UPO_ENABLE_MICRO_GCP;
+      EXECUTE_DIRECT(QMGR,GSN_UPGRADE_PROTOCOL_ORD,signal,signal->getLength());
+    }
     break;
   }
   case CopyGCIReq::GLOBAL_CHECKPOINT: {
@@ -772,18 +779,18 @@ done:  
       /**
        * This must be master take over...and it already running...
        */
-      ndbrequire(m_gcp_save.m_master_ref != c_copyGCISlave.m_senderRef);
+      ndbrequire(c_newest_restorable_gci == SYSFILE->newestRestorableGCI);
       m_gcp_save.m_master_ref = c_copyGCISlave.m_senderRef;
       return;
     }
 
-    if (m_gcp_save.m_state == GcpSave::GCP_SAVE_IDLE)
+    if (c_newest_restorable_gci == SYSFILE->newestRestorableGCI)
     {
       jam();
+
       /**
        * This must be master take over...and it already complete...
        */
-      ndbrequire(m_gcp_save.m_master_ref != c_copyGCISlave.m_senderRef);
       m_gcp_save.m_master_ref = c_copyGCISlave.m_senderRef;
       c_copyGCISlave.m_copyReason = CopyGCIReq::IDLE;
       signal->theData[0] = c_copyGCISlave.m_senderData;
@@ -1669,7 +1676,7 @@ void Dbdih::ndbStartReqLab(Signal* signa
   cndbStartReqBlockref = ref;
   if (cstarttype == NodeState::ST_INITIAL_START) {
     jam();
-    initRestartInfo();
+    initRestartInfo(signal);
     initGciFilesLab(signal);
     return;
   }
@@ -1873,8 +1880,23 @@ void Dbdih::execSTART_PERMCONF(Signal* s
   CRASH_INSERTION(7121);
   Uint32 nodeId = signal->theData[0];
   cfailurenr = signal->theData[1];
+  
+  bool microGCP = signal->theData[2];
+  if (signal->getLength() < StartPermConf::SignalLength)
+  {
+    microGCP = false;
+  }
+  m_micro_gcp.m_enabled = microGCP;
   ndbrequire(nodeId == cownNodeId);
   ndbsttorry10Lab(signal, __LINE__);
+
+  if (m_micro_gcp.m_enabled)
+  {
+    jam();
+    UpgradeProtocolOrd * ord = (UpgradeProtocolOrd*)signal->getDataPtrSend();
+    ord->type = UpgradeProtocolOrd::UPO_ENABLE_MICRO_GCP;
+    EXECUTE_DIRECT(QMGR,GSN_UPGRADE_PROTOCOL_ORD,signal,signal->getLength());
+  }
 }//Dbdih::execSTART_PERMCONF()
 
 void Dbdih::execSTART_PERMREF(Signal* signal) 
@@ -2082,6 +2104,7 @@ void Dbdih::startInfoReply(Signal* signa
     StartPermConf * conf = (StartPermConf*)&signal->theData[0];
     conf->startingNodeId = c_nodeStartMaster.startNode;
     conf->systemFailureNo = cfailurenr;
+    conf->microGCP = m_micro_gcp.m_enabled;
     sendSignal(calcDihBlockRef(c_nodeStartMaster.startNode), 
 	       GSN_START_PERMCONF, signal, StartPermConf::SignalLength, JBB);
     c_nodeStartMaster.m_outstandingGsn = GSN_START_PERMCONF;
@@ -2418,6 +2441,12 @@ void Dbdih::execINCL_NODEREQ(Signal* sig
   Uint32 tnodeStartFailNr = signal->theData[2];
   Uint32 gci_hi = signal->theData[4];
   Uint32 gci_lo = signal->theData[5];
+  if (unlikely(signal->getLength() < 6))
+  {
+    jam();
+    gci_lo = 0;
+  }
+  
   Uint64 gci = gci_lo | (Uint64(gci_hi) << 32);
   CRASH_INSERTION(7127);
   m_micro_gcp.m_current_gci = gci;
@@ -2491,7 +2520,7 @@ void Dbdih::execINCL_NODEREQ(Signal* sig
   /*-------------------------------------------------------------------------*/
   signal->theData[0] = reference();
   signal->theData[1] = nodeId;
-  signal->theData[2] = m_micro_gcp.m_current_gci >> 32;
+  signal->theData[2] = Uint32(m_micro_gcp.m_current_gci >> 32);
   sendSignal(clocallqhblockref, GSN_INCL_NODEREQ, signal, 3, JBB);
 }//Dbdih::execINCL_NODEREQ()
 
@@ -2834,7 +2863,8 @@ void Dbdih::startTakeOver(Signal* signal
     checkToCopyCompleted(signal);
     return;
   }
-  cstartGcpNow = true;
+
+  m_micro_gcp.m_master.m_start_time = m_gcp_save.m_master.m_start_time = 0;
 }//Dbdih::startTakeOver()
 
 void Dbdih::changeNodeGroups(Uint32 startNode, Uint32 nodeTakenOver)
@@ -5083,7 +5113,7 @@ void Dbdih::checkGcpOutstanding(Signal* 
     GCPSaveRef * const saveRef = (GCPSaveRef*)&signal->theData[0];
     saveRef->dihPtr = failedNodeId;
     saveRef->nodeId = failedNodeId;
-    saveRef->gci    = m_gcp_save.m_gci;
+    saveRef->gci    = m_gcp_save.m_master.m_new_gci;
     saveRef->errorCode = GCPSaveRef::FakedSignalDueToNodeFailure;
     sendSignal(reference(), GSN_GCP_SAVEREF, signal, 
 	       GCPSaveRef::SignalLength, JBB);
@@ -5407,16 +5437,16 @@ void Dbdih::execMASTER_GCPCONF(Signal* s
   case MasterGCPConf::GCP_PREPARE_RECEIVED:
     jam();
     ok = true;
-    if (m_micro_gcp.m_master_state == MicroGcp::M_GCP_IDLE)
+    if (m_micro_gcp.m_master.m_state == MicroGcp::M_GCP_IDLE)
     {
       jam();
-      m_micro_gcp.m_master_state = MicroGcp::M_GCP_PREPARE;
-      m_micro_gcp.m_new_gci = newGCI;
+      m_micro_gcp.m_master.m_state = MicroGcp::M_GCP_PREPARE;
+      m_micro_gcp.m_master.m_new_gci = newGCI;
     }
     else
     {
       jam();
-      ndbrequire(m_micro_gcp.m_new_gci == newGCI);
+      ndbrequire(m_micro_gcp.m_master.m_new_gci == newGCI);
     }
     break;
   case MasterGCPConf::GCP_COMMIT_RECEIVED:
@@ -5424,8 +5454,12 @@ void Dbdih::execMASTER_GCPCONF(Signal* s
   case MasterGCPConf::GCP_COMMITTED:
     jam();
     ok = true;
-    ndbrequire(m_micro_gcp.m_new_gci == newGCI);
-    m_micro_gcp.m_master_state = MicroGcp::M_GCP_COMMIT;
+    if (m_micro_gcp.m_master.m_state != MicroGcp::M_GCP_IDLE)
+    {
+      ndbrequire(m_micro_gcp.m_master.m_new_gci == newGCI);
+    }
+    m_micro_gcp.m_master.m_new_gci = newGCI;
+    m_micro_gcp.m_master.m_state = MicroGcp::M_GCP_COMMIT;
     break;
 #ifndef VM_TRACE
   default:
@@ -5442,26 +5476,26 @@ void Dbdih::execMASTER_GCPCONF(Signal* s
     break;
   case MasterGCPConf::GCP_SAVE_REQ:
     jam();
-    if (m_gcp_save.m_master_state == GcpSave::GCP_SAVE_IDLE)
+    if (m_gcp_save.m_master.m_state == GcpSave::GCP_SAVE_IDLE)
     {
       jam();
-      m_gcp_save.m_master_state = GcpSave::GCP_SAVE_REQ;
+      m_gcp_save.m_master.m_state = GcpSave::GCP_SAVE_REQ;
     }
     break;
   case MasterGCPConf::GCP_SAVE_CONF:
     jam();
-    if (m_gcp_save.m_master_state == GcpSave::GCP_SAVE_IDLE)
+    if (m_gcp_save.m_master.m_state == GcpSave::GCP_SAVE_IDLE)
     {
       jam();
-      m_gcp_save.m_master_state = GcpSave::GCP_SAVE_REQ;
+      m_gcp_save.m_master.m_state = GcpSave::GCP_SAVE_REQ;
     }
     break;
   case MasterGCPConf::GCP_SAVE_COPY_GCI:
     jam();
-    if (m_gcp_save.m_master_state == GcpSave::GCP_SAVE_IDLE)
+    if (m_gcp_save.m_master.m_state == GcpSave::GCP_SAVE_IDLE)
     {
       jam();
-      m_gcp_save.m_master_state = GcpSave::GCP_SAVE_COPY_GCI;
+      m_gcp_save.m_master.m_state = GcpSave::GCP_SAVE_COPY_GCI;
     }
     break;
 #ifndef VM_TRACE
@@ -5497,12 +5531,12 @@ void Dbdih::execMASTER_GCPREF(Signal* si
 void Dbdih::MASTER_GCPhandling(Signal* signal, Uint32 failedNodeId) 
 {
   cmasterState = MASTER_ACTIVE;
-  m_micro_gcp.m_start_time = 0;
-  m_gcp_save.m_start_time = 0;
-  m_micro_gcp.m_next_gci = m_micro_gcp.m_new_gci + 1;
+
+  m_micro_gcp.m_master.m_start_time = 0;
+  m_gcp_save.m_master.m_start_time = 0;
 
   bool ok = false;
-  switch(m_micro_gcp.m_master_state){
+  switch(m_micro_gcp.m_master.m_state){
   case MicroGcp::M_GCP_IDLE:
     jam();
     ok = true;
@@ -5536,65 +5570,64 @@ void Dbdih::MASTER_GCPhandling(Signal* s
     ndbrequire(false);
 #ifndef VM_TRACE
   default:
-    jamLine(m_micro_gcp.m_master_state);
+    jamLine(m_micro_gcp.m_master.m_state);
     ndbrequire(false);
 #endif
   }
   ndbassert(ok);
 
+  if (m_micro_gcp.m_enabled == false)
+  {
+    jam();
+    m_gcp_save.m_master.m_state = GcpSave::GCP_SAVE_IDLE;
+  }
+  else
   {
-    Uint32 saveGCI = Uint32(m_micro_gcp.m_next_gci >> 32);
-    if (m_gcp_save.m_gci + 1 == saveGCI)
+    ok = false;
+    m_gcp_save.m_master.m_new_gci = m_gcp_save.m_gci;
+    switch(m_gcp_save.m_master.m_state){
+    case GcpSave::GCP_SAVE_IDLE:
+      jam();
+      ok = true;
+      break;
+    case GcpSave::GCP_SAVE_REQ:
     {
       jam();
+      ok = true;
+      
       /**
-       * Make sure that GCP save can continue...
+       * Restart GCP_SAVE_REQ
        */
-      m_micro_gcp.m_next_gci = (Uint64(saveGCI) + 1) << 32;
+      sendLoopMacro(GCP_SAVEREQ, sendGCP_SAVEREQ);
+      break;
     }
-  }
-
-  ok = false;
-  switch(m_gcp_save.m_master_state){
-  case GcpSave::GCP_SAVE_IDLE:
-    jam();
-    ok = true;
-    break;
-  case GcpSave::GCP_SAVE_REQ:
-  {
-    jam();
-    ok = true;
-
-    /**
-     * Restart GCP_SAVE_REQ
-     */
-    sendLoopMacro(GCP_SAVEREQ, sendGCP_SAVEREQ);
-    break;
-  }
-  case GcpSave::GCP_SAVE_CONF:
-    jam();
-  case GcpSave::GCP_SAVE_COPY_GCI:
-    jam();
-    ok = true;
-    copyGciLab(signal, CopyGCIReq::GLOBAL_CHECKPOINT);
-    m_gcp_save.m_master_state = GcpSave::GCP_SAVE_COPY_GCI;
-    break;
+    case GcpSave::GCP_SAVE_CONF:
+      jam();
+    case GcpSave::GCP_SAVE_COPY_GCI:
+      jam();
+      ok = true;
+      copyGciLab(signal, CopyGCIReq::GLOBAL_CHECKPOINT);
+      m_gcp_save.m_master.m_state = GcpSave::GCP_SAVE_COPY_GCI;
+      break;
 #ifndef VM_TRACE
-  default:
-    jamLine(m_gcp_save.m_master_state);
-    ndbrequire(false);
+    default:
+      jamLine(m_gcp_save.m_master.m_state);
+      ndbrequire(false);
 #endif
+    }
+    ndbrequire(ok);
   }
-  ndbrequire(ok);
-
+  
   signal->theData[0] = NDB_LE_GCP_TakeoverCompleted;
-  signal->theData[1] = m_micro_gcp.m_master_state;
-  signal->theData[2] = m_gcp_save.m_master_state;
+  signal->theData[1] = m_micro_gcp.m_master.m_state;
+  signal->theData[2] = m_gcp_save.m_master.m_state;
   sendSignal(CMVMI_REF, GSN_EVENT_REP, signal, 1, JBB);
 
-  infoEvent("kk: %u %u",
-            m_micro_gcp.m_master_state,
-            m_gcp_save.m_master_state);
+  infoEvent("kk: %u/%u %u %u",
+            Uint32(m_micro_gcp.m_current_gci >> 32),
+            Uint32(m_micro_gcp.m_current_gci),
+            m_micro_gcp.m_master.m_state,
+            m_gcp_save.m_master.m_state);
 
   /*--------------------------------------------------*/
   /*       WE SEPARATE HANDLING OF GLOBAL CHECKPOINTS */
@@ -7947,11 +7980,64 @@ void Dbdih::checkGcpStopLab(Signal* sign
   return;
 }//Dbdih::checkGcpStopLab()
 
-static
+
 bool
-check_enable_micro_gcp()
+Dbdih::check_enable_micro_gcp(Signal* signal, bool broadcast)
 {
-  return false;
+  ndbassert(m_micro_gcp.m_enabled == false);
+  ndbassert(NodeVersionInfo::DataLength == 6);
+  Uint32 min = ~(Uint32)0;
+  const NodeVersionInfo& info = getNodeVersionInfo();
+  for (Uint32 i = 0; i<3; i++)
+  {
+    Uint32 tmp = info.m_type[i].m_min_version;
+    if (tmp)
+    {
+      min = (min < tmp) ? min : tmp; 
+    }
+  }
+
+  if (ndb_check_micro_gcp(min))
+  {
+    jam();
+    m_micro_gcp.m_enabled = true;
+
+    infoEvent("Enabling micro GCP");
+    if (broadcast)
+    {
+      jam();
+      UpgradeProtocolOrd * ord = (UpgradeProtocolOrd*)signal->getDataPtrSend();
+      ord->type = UpgradeProtocolOrd::UPO_ENABLE_MICRO_GCP;
+
+      /**
+       * We need to notify all ndbd's or they'll get confused!
+       */
+      NodeRecordPtr specNodePtr;
+      specNodePtr.i = cfirstAliveNode;
+      do {
+        jam();
+        ptrCheckGuard(specNodePtr, MAX_NDB_NODES, nodeRecord);
+        sendSignal(calcDihBlockRef(specNodePtr.i), GSN_UPGRADE_PROTOCOL_ORD,
+                   signal, UpgradeProtocolOrd::SignalLength, JBA);
+        specNodePtr.i = specNodePtr.p->nextNode;
+      } while (specNodePtr.i != RNIL);
+      EXECUTE_DIRECT(QMGR,GSN_UPGRADE_PROTOCOL_ORD,signal,signal->getLength());
+    }
+  }
+  return m_micro_gcp.m_enabled;
+}
+
+void
+Dbdih::execUPGRADE_PROTOCOL_ORD(Signal* signal)
+{
+  const UpgradeProtocolOrd* ord = (UpgradeProtocolOrd*)signal->getDataPtr();
+  switch(ord->type){
+  case UpgradeProtocolOrd::UPO_ENABLE_MICRO_GCP:
+    jam();
+    m_micro_gcp.m_enabled = true;
+    EXECUTE_DIRECT(QMGR, GSN_UPGRADE_PROTOCOL_ORD,signal, signal->getLength());
+    return;
+  }
 }
 
 void
@@ -7983,54 +8069,64 @@ Dbdih::startGcpLab(Signal* signal, Uint3
   }//if
 
   Uint64 now = NdbTick_CurrentMillisecond();
-  if (cstartGcpNow == false)
+  
+  Uint32 delayMicro = m_micro_gcp.m_enabled ? 
+    m_micro_gcp.m_master.m_time_between_gcp : 
+    m_gcp_save.m_master.m_time_between_gcp;
+  
+  if (! (now >= m_micro_gcp.m_master.m_start_time + delayMicro))
   {
     jam();
-    Uint32 delay = m_micro_gcp.m_time_between_gcp;
-    if (delay == 0)
-    {
-      jam(); // I.e not configured
-      delay = m_gcp_save.m_time_between_gcp;
-    }
-
-    if (m_micro_gcp.m_start_time + delay > now)
-    {
-      jam();
-      signal->theData[0] = DihContinueB::ZSTART_GCP;
-      sendSignalWithDelay(reference(), GSN_CONTINUEB, signal, 10, 1);
-      return;
-    }
-  }//if
-  cstartGcpNow = false;
-  m_micro_gcp.m_start_time = now;
+    signal->theData[0] = DihContinueB::ZSTART_GCP;
+    sendSignalWithDelay(reference(), GSN_CONTINUEB, signal, 10, 1);
+    return;
+  }
 
-  if (m_micro_gcp.m_enabled == false && m_micro_gcp.m_time_between_gcp)
+  m_micro_gcp.m_master.m_start_time = now;
+  
+  if (m_micro_gcp.m_enabled == false && 
+      m_micro_gcp.m_master.m_time_between_gcp)
   {
     /**
      * Micro GCP is disabled...but configured...
      */
     jam();
-    check_enable_micro_gcp();
+    check_enable_micro_gcp(signal, true);
   }
-
-  m_micro_gcp.m_new_gci = m_micro_gcp.m_next_gci;
-  m_micro_gcp.m_next_gci++;
-
+  
   /**
    * Check that there has not been more than 2^32 micro GCP wo/ any save
    */
-  ndbrequire((m_micro_gcp.m_new_gci & 0xFFFFFFFF) != 0xFFFFFFFF);
-
+  Uint64 currGCI = m_micro_gcp.m_current_gci;
+  ndbrequire(Uint32(currGCI) != ~(Uint32)0);
+  m_micro_gcp.m_master.m_new_gci = currGCI + 1;
+  
+  Uint32 delaySave = m_gcp_save.m_master.m_time_between_gcp;
+  if ((m_micro_gcp.m_enabled == false) ||
+      (now >= m_gcp_save.m_master.m_start_time + delaySave && 
+       m_gcp_save.m_master.m_state == GcpSave::GCP_SAVE_IDLE))
+  {
+    jam();
+    /**
+     * Time for save...switch gci_hi
+     */
+    m_gcp_save.m_master.m_start_time = now;
+    m_micro_gcp.m_master.m_new_gci = Uint64((currGCI >> 32) + 1) << 32;
+  }
+  
+  ndbassert(m_micro_gcp.m_enabled || Uint32(m_micro_gcp.m_new_gci) == 0);
+  
+  
   /***************************************************************************/
   // Report the event that a global checkpoint has started.
   /***************************************************************************/
   signal->theData[0] = NDB_LE_GlobalCheckpointStarted; //Event type
-  signal->theData[1] = m_micro_gcp.m_new_gci >> 32;
-  signal->theData[2] = m_micro_gcp.m_new_gci & 0xFFFFFFFF;
+  signal->theData[1] = m_micro_gcp.m_master.m_new_gci >> 32;
+  signal->theData[2] = m_micro_gcp.m_master.m_new_gci & 0xFFFFFFFF;
   sendSignal(CMVMI_REF, GSN_EVENT_REP, signal, 3, JBB);
-
+  
   CRASH_INSERTION(7000);
-  m_micro_gcp.m_master_state = MicroGcp::M_GCP_PREPARE;
+  m_micro_gcp.m_master.m_state = MicroGcp::M_GCP_PREPARE;
   signal->setTrace(TestOrd::TraceGlobalCheckpoint);
 
 #ifdef ERROR_INSERT
@@ -8053,8 +8149,15 @@ void Dbdih::execGCP_PREPARECONF(Signal* 
   Uint32 senderNodeId = signal->theData[0];
   Uint32 gci_hi = signal->theData[1];
   Uint32 gci_lo = signal->theData[2];
+
+  if (unlikely(signal->getLength() < GCPPrepareConf::SignalLength))
+  {
+    gci_lo = 0;
+    ndbassert(!ndb_check_micro_gcp(getNodeInfo(senderNodeId).m_version));
+  }
+
   Uint64 gci = gci_lo | (Uint64(gci_hi) << 32);
-  ndbrequire(gci == m_micro_gcp.m_new_gci);
+  ndbrequire(gci == m_micro_gcp.m_master.m_new_gci);
   receiveLoopMacro(GCP_PREPARE, senderNodeId);
   //-------------------------------------------------------------
   // We have now received all replies. We are ready to continue
@@ -8067,7 +8170,7 @@ void Dbdih::gcpcommitreqLab(Signal* sign
 {
   CRASH_INSERTION(7001);
 
-  m_micro_gcp.m_master_state = MicroGcp::M_GCP_COMMIT;
+  m_micro_gcp.m_master.m_state = MicroGcp::M_GCP_COMMIT;
 
 #ifdef ERROR_INSERT
   if (ERROR_INSERTED(7187))
@@ -8093,39 +8196,7 @@ void Dbdih::execGCP_NODEFINISH(Signal* s
   const Uint32 gci_lo = signal->theData[3];
   const Uint64 gci = gci_lo | (Uint64(gci_hi) << 32);
 
-#if 0
-  /**
-   * This section is #if 0
-   *   as I changed GCP_NODEFINISH to only be sent in return to
-   *   node sending GCP_COMMIT
-   *
-   * So...this can never arraive at "new master"
-   */
-
-  if (!isMaster()) {
-    jam();
-    ndbrequire(failureNr > cfailurenr);
-    //-------------------------------------------------------------
-    // Another node thinks we are master. This could happen when he
-    // has heard of a node failure which I have not heard of. Ignore
-    // signal in this case since we will discover it by sending
-    // MASTER_GCPREQ to the node.
-    //-------------------------------------------------------------
-    return;
-  } else if (cmasterState == MASTER_TAKE_OVER_GCP) {
-    jam();
-    //-------------------------------------------------------------
-    // We are currently taking over as master. Ignore
-    // signal in this case since we will discover it in reception of
-    // MASTER_GCPCONF.
-    //-------------------------------------------------------------
-    return;
-  } else {
-    ndbrequire(cmasterState == MASTER_ACTIVE);
-  }//if
-#endif
-
-  ndbrequire(m_micro_gcp.m_master_state == MicroGcp::M_GCP_COMMIT);
+  ndbrequire(m_micro_gcp.m_master.m_state == MicroGcp::M_GCP_COMMIT);
   receiveLoopMacro(GCP_COMMIT, senderNodeId);
 
   jam();
@@ -8174,7 +8245,11 @@ void Dbdih::execGCP_NODEFINISH(Signal* s
    * New protocol
    */
   Uint64 now;
-  m_micro_gcp.m_master_state = MicroGcp::M_GCP_IDLE;
+  m_micro_gcp.m_master.m_state = MicroGcp::M_GCP_IDLE;
+
+  Uint32 curr_hi = m_micro_gcp.m_current_gci >> 32;
+  Uint32 old_hi = m_micro_gcp.m_old_gci >> 32;
+  
   if (m_micro_gcp.m_enabled)
   {
     jam();
@@ -8184,48 +8259,25 @@ void Dbdih::execGCP_NODEFINISH(Signal* s
       signal->theData[0] = DihContinueB::ZSTART_GCP;
       sendSignalWithDelay(reference(), GSN_CONTINUEB, signal, 10, 1);
     }
-
-    if (m_gcp_save.m_master_state != GcpSave::GCP_SAVE_IDLE)
-    {
-      jam();
-      return;
-    }
-
-    now = NdbTick_CurrentMillisecond();
-    if (m_gcp_save.m_start_time + m_gcp_save.m_time_between_gcp > now)
-    {
-      jam();
-      return;
-    }
   }
   else
   {
-    jam();
-    now = NdbTick_CurrentMillisecond();
-    ndbrequire(m_gcp_save.m_master_state == GcpSave::GCP_SAVE_IDLE);
+    ndbrequire(curr_hi != old_hi);
   }
   
-  /**
-   * Start a save
-   */
-  Uint32 saveGCI = (m_micro_gcp.m_old_gci >> 32) - 1;
-  if (saveGCI == m_gcp_save.m_gci)
+  if (curr_hi == old_hi)
   {
     jam();
-    /**
-     * gci_hi switch has not yet occured...wait until it has
-     */
-    ndbrequire(m_micro_gcp.m_enabled);
     return;
   }
 
-  m_gcp_save.m_master_state = GcpSave::GCP_SAVE_REQ;
-  m_gcp_save.m_gci = saveGCI;
-  m_gcp_save.m_start_time = now;
+  /**
+   * Start a save
+   */
+  Uint32 saveGCI = old_hi;
+  m_gcp_save.m_master.m_state = GcpSave::GCP_SAVE_REQ;
+  m_gcp_save.m_master.m_new_gci = saveGCI;
   
-  m_micro_gcp.m_next_gci = Uint32(m_micro_gcp.m_next_gci >> 32) + 1;
-  m_micro_gcp.m_next_gci <<= 32;
-
 #ifdef ERROR_INSERT
   if (ERROR_INSERTED(7188))
   {
@@ -8236,7 +8288,7 @@ void Dbdih::execGCP_NODEFINISH(Signal* s
     return;
   }
 #endif
-
+  
   sendLoopMacro(GCP_SAVEREQ, sendGCP_SAVEREQ);
   return;
 }//Dbdih::execGCP_NODEFINISH()
@@ -8260,21 +8312,18 @@ Dbdih::execGCP_SAVEREQ(Signal* signal)
      * This is master take over...
      * and SAVE_REQ is already running
      */
-    ndbrequire(m_gcp_save.m_master_ref != req->dihBlockRef);
     ndbrequire(m_gcp_save.m_gci == req->gci);
     m_gcp_save.m_master_ref = req->dihBlockRef;
     return;
   }
 
-  if (m_gcp_save.m_state == GcpSave::GCP_SAVE_CONF)
+  if (m_gcp_save.m_gci == req->gci)
   {
     jam();
     /**
      * This is master take over...
      * and SAVE_REQ is complete...
      */
-    ndbrequire(m_gcp_save.m_master_ref != req->dihBlockRef);
-    ndbrequire(m_gcp_save.m_gci == req->gci);
     m_gcp_save.m_master_ref = req->dihBlockRef;
 
     GCPSaveReq save = (* req);
@@ -8350,7 +8399,7 @@ void Dbdih::execGCP_SAVEREF(Signal* sign
 
 void Dbdih::GCP_SAVEhandling(Signal* signal, Uint32 nodeId) 
 {
-  ndbrequire(m_gcp_save.m_master_state == GcpSave::GCP_SAVE_REQ);
+  ndbrequire(m_gcp_save.m_master.m_state == GcpSave::GCP_SAVE_REQ);
   receiveLoopMacro(GCP_SAVEREQ, nodeId);
   /*-------------------------------------------------------------------------*/
   // All nodes have replied. We are ready to update the system file.
@@ -8376,7 +8425,7 @@ void Dbdih::GCP_SAVEhandling(Signal* sig
   }
   copyGciLab(signal, CopyGCIReq::GLOBAL_CHECKPOINT);
 
-  m_gcp_save.m_master_state = GcpSave::GCP_SAVE_COPY_GCI;
+  m_gcp_save.m_master.m_state = GcpSave::GCP_SAVE_COPY_GCI;
 
 }//Dbdih::GCP_SAVEhandling()
 
@@ -8403,12 +8452,19 @@ void Dbdih::execGCP_PREPARE(Signal* sign
   Uint32 masterNodeId = req->nodeId;
   Uint32 gci_hi = req->gci_hi;
   Uint32 gci_lo = req->gci_lo;
+  if (unlikely(signal->getLength() < GCPPrepare::SignalLength))
+  {
+    jam();
+    gci_lo = 0;
+    ndbassert(!ndb_check_micro_gcp(getNodeInfo(masterNodeId).m_version));
+  }
   Uint64 gci = gci_lo | (Uint64(gci_hi) << 32);
+
   BlockReference retRef = calcDihBlockRef(masterNodeId);
 
   if (isMaster())
   {
-    ndbrequire(m_micro_gcp.m_master_state == MicroGcp::M_GCP_PREPARE);
+    ndbrequire(m_micro_gcp.m_master.m_state == MicroGcp::M_GCP_PREPARE);
   }
 
   if (m_micro_gcp.m_state == MicroGcp::M_GCP_PREPARE)
@@ -8418,27 +8474,23 @@ void Dbdih::execGCP_PREPARE(Signal* sign
      * This must be master take over
      *   Prepare is already complete
      */
-    ndbrequire(m_micro_gcp.m_master_ref != retRef);
     ndbrequire(m_micro_gcp.m_new_gci == gci);
     m_micro_gcp.m_master_ref = retRef;
-
     goto reply;
   }
 
-  ndbrequire(m_micro_gcp.m_state == MicroGcp::M_GCP_IDLE);
-  
-  if (m_micro_gcp.m_current_gci == gci)
+  if (m_micro_gcp.m_new_gci == gci)
   {
     jam();
     /**
      * This GCP has already been prepared...
      *   Must be master takeover
      */
-    infoEvent("keso");
-    ndbrequire(m_micro_gcp.m_master_ref != retRef);
-    // Note...dont set master_ref...and this will also be GCP_COMMIT:ed
+    m_micro_gcp.m_master_ref = retRef;
     goto reply;
   }
+  
+  ndbrequire(m_micro_gcp.m_state == MicroGcp::M_GCP_IDLE);
 
   cgckptflag = true;
   m_micro_gcp.m_state = MicroGcp::M_GCP_PREPARE;
@@ -8474,13 +8526,19 @@ void Dbdih::execGCP_COMMIT(Signal* signa
   Uint32 masterNodeId = req->nodeId;
   Uint32 gci_hi = req->gci_hi;
   Uint32 gci_lo = req->gci_lo;
+
+  if (unlikely(signal->getLength() < GCPCommit::SignalLength))
+  {
+    gci_lo = 0;
+    ndbassert(!ndb_check_micro_gcp(getNodeInfo(masterNodeId).m_version));
+  }
   Uint64 gci = gci_lo | (Uint64(gci_hi) << 32);
 
   Uint32 masterRef = calcDihBlockRef(masterNodeId);
   ndbrequire(masterNodeId = cmasterNodeId);
   if (isMaster())
   {
-    ndbrequire(m_micro_gcp.m_master_state == MicroGcp::M_GCP_COMMIT);
+    ndbrequire(m_micro_gcp.m_master.m_state == MicroGcp::M_GCP_COMMIT);
   }
 
   if (m_micro_gcp.m_state == MicroGcp::M_GCP_COMMIT)
@@ -8490,23 +8548,20 @@ void Dbdih::execGCP_COMMIT(Signal* signa
      * This must be master take over
      *   Commit is already ongoing...
      */
-    ndbrequire(m_micro_gcp.m_master_ref != masterRef);
     ndbrequire(m_micro_gcp.m_current_gci == gci);
     m_micro_gcp.m_master_ref = masterRef;
     return;
   }
 
-  if (m_micro_gcp.m_state == MicroGcp::M_GCP_COMMITTED ||
-      m_micro_gcp.m_state == MicroGcp::M_GCP_IDLE)
+  if (m_micro_gcp.m_current_gci == gci)
   {
     jam();
     /**
      * This must be master take over
      *   Commit has already completed
      */
-    ndbrequire(m_micro_gcp.m_master_ref != masterRef);
-    ndbrequire(m_micro_gcp.m_current_gci == gci);
-
+    m_micro_gcp.m_master_ref = masterRef;
+    
     GCPNodeFinished* conf = (GCPNodeFinished*)signal->getDataPtrSend();
     conf->nodeId = cownNodeId;
     conf->gci_hi = m_micro_gcp.m_old_gci >> 32;
@@ -8517,6 +8572,7 @@ void Dbdih::execGCP_COMMIT(Signal* signa
     return;
   }
 
+  ndbrequire(m_micro_gcp.m_new_gci == gci);
   ndbrequire(m_micro_gcp.m_state == MicroGcp::M_GCP_PREPARE);
   m_micro_gcp.m_state = MicroGcp::M_GCP_COMMIT;
   m_micro_gcp.m_master_ref = calcDihBlockRef(masterNodeId);
@@ -8581,7 +8637,7 @@ Dbdih::execSUB_GCP_COMPLETE_REP(Signal* 
   SubGcpCompleteRep* rep = (SubGcpCompleteRep*)signal->getDataPtr();
   if (isMaster())
   {
-    ndbrequire(m_micro_gcp.m_master_state == MicroGcp::M_GCP_IDLE);
+    ndbrequire(m_micro_gcp.m_master.m_state == MicroGcp::M_GCP_IDLE);
   }
   
   Uint32 masterRef = rep->senderRef;
@@ -8592,9 +8648,7 @@ Dbdih::execSUB_GCP_COMPLETE_REP(Signal* 
      * This must be master take over
      *   signal has already arrived
      */
-    ndbrequire(m_micro_gcp.m_master_ref != masterRef);
     m_micro_gcp.m_master_ref = masterRef;
-    ndbout_c(" - ignore");
     return;
   }
 
@@ -8918,25 +8972,8 @@ void Dbdih::execCOPY_GCICONF(Signal* sig
        */
       signal->theData[0] = DihContinueB::ZSTART_GCP;
       sendSignal(reference(), GSN_CONTINUEB, signal, 1, JBB);
-      
-      SubGcpCompleteRep * const rep = (SubGcpCompleteRep*)signal->getDataPtr();
-      rep->gci_hi = m_gcp_save.m_gci;
-      rep->gci_lo = 0;
-      rep->flags = SubGcpCompleteRep::ON_DISK;
-      
-      {
-        NodeRecordPtr specNodePtr;
-        specNodePtr.i = cfirstAliveNode;
-        do {
-          jam();
-          ptrCheckGuard(specNodePtr, MAX_NDB_NODES, nodeRecord);
-          sendSignal(calcDihBlockRef(specNodePtr.i), GSN_SUB_GCP_COMPLETE_REP,
-                     signal, SubGcpCompleteRep::SignalLength, JBA);
-          specNodePtr.i = specNodePtr.p->nextNode;
-        } while (specNodePtr.i != RNIL);
-      }
     }
-    m_gcp_save.m_master_state = GcpSave::GCP_SAVE_IDLE;
+    m_gcp_save.m_master.m_state = GcpSave::GCP_SAVE_IDLE;
 
     CRASH_INSERTION(7004);
     emptyWaitGCPMasterQueue(signal);    
@@ -9051,8 +9088,21 @@ void Dbdih::writingCopyGciLab(Signal* si
 
     EXECUTE_DIRECT(LGMAN, GSN_SUB_GCP_COMPLETE_REP, signal, 
 		   SubGcpCompleteRep::SignalLength);
+    
     jamEntry();
 
+    if (m_micro_gcp.m_enabled == false)
+    {
+      jam();
+      EXECUTE_DIRECT(SUMA, GSN_SUB_GCP_COMPLETE_REP, signal, 
+                     SubGcpCompleteRep::SignalLength);
+      jamEntry();
+      ndbrequire(m_micro_gcp.m_state == MicroGcp::M_GCP_COMMITTED);
+      m_micro_gcp.m_state = MicroGcp::M_GCP_IDLE;
+
+      CRASH_INSERTION(7190);
+    }
+    
 #ifdef GCP_TIMER_HACK
     NdbTick_getMicroTimer(&globalData.gcp_timer_copygci[1]);
 
@@ -12598,7 +12648,6 @@ void Dbdih::initCommonData()
   crestartGci = 0;
   crestartInfoFile[0] = RNIL;
   crestartInfoFile[1] = RNIL;
-  cstartGcpNow = false;
   cstartPhase = 0;
   c_startToLock = RNIL;
   cstarttype = (Uint32)-1;
@@ -12641,7 +12690,7 @@ void Dbdih::initCommonData()
     Uint32 tmp = 2000;
     ndb_mgm_get_int_parameter(p, CFG_DB_GCP_INTERVAL, &tmp);
     tmp = tmp > 60000 ? 60000 : (tmp < 10 ? 10 : tmp);
-    m_gcp_save.m_time_between_gcp = tmp;
+    m_gcp_save.m_master.m_time_between_gcp = tmp;
 
     if (ndb_mgm_get_int_parameter(p, CFG_DB_MICRO_GCP_INTERVAL, &tmp) == 0 &&
         tmp)
@@ -12649,12 +12698,11 @@ void Dbdih::initCommonData()
       /**
        * A value is set for micro gcp...run new protocol when applicable
        */
-      if (tmp > m_gcp_save.m_time_between_gcp)
-        tmp = m_gcp_save.m_time_between_gcp;
+      if (tmp > m_gcp_save.m_master.m_time_between_gcp)
+        tmp = m_gcp_save.m_master.m_time_between_gcp;
       if (tmp < 10)
         tmp = 10;
-      m_micro_gcp.m_time_between_gcp = tmp;
-      m_micro_gcp.m_enabled = true;
+      m_micro_gcp.m_master.m_time_between_gcp = tmp;
     }
   }
 }//Dbdih::initCommonData()
@@ -12682,7 +12730,7 @@ void Dbdih::initFragstore(FragmentstoreP
 /*       DESCRIPTION: INITIATE RESTART INFO VARIABLE AND VARIABLES FOR   */
 /*                    GLOBAL CHECKPOINTS.                                */
 /*************************************************************************/
-void Dbdih::initRestartInfo() 
+void Dbdih::initRestartInfo(Signal* signal) 
 {
   Uint32 i;
   for (i = 0; i < MAX_NDB_NODES; i++) {
@@ -12700,17 +12748,6 @@ void Dbdih::initRestartInfo() 
 
   m_micro_gcp.m_old_gci = Uint64(1) << 32;
   m_micro_gcp.m_current_gci = Uint64(2) << 32;
-  m_micro_gcp.m_new_gci = Uint64(2) << 32;
-  if (m_micro_gcp.m_enabled)
-  {
-    jam();
-    m_micro_gcp.m_next_gci = m_micro_gcp.m_new_gci + 1;
-  }
-  else
-  {
-    jam();
-    m_micro_gcp.m_next_gci = m_micro_gcp.m_new_gci + (Uint64(1) << 32);
-  }
   crestartGci = 1;
   c_newest_restorable_gci = 1;
 
@@ -12727,6 +12764,19 @@ void Dbdih::initRestartInfo() 
   Sysfile::setInitialStartOngoing(SYSFILE->systemRestartBits);
   srand(time(0));
   globalData.m_restart_seq = SYSFILE->m_restart_seq = 0;
+
+  if (m_micro_gcp.m_enabled == false && 
+      m_micro_gcp.m_master.m_time_between_gcp)
+  {
+    /**
+     * Micro GCP is disabled...but configured...
+     */
+    jam();
+    m_micro_gcp.m_enabled = true;
+    UpgradeProtocolOrd * ord = (UpgradeProtocolOrd*)signal->getDataPtrSend();
+    ord->type = UpgradeProtocolOrd::UPO_ENABLE_MICRO_GCP;
+    EXECUTE_DIRECT(QMGR,GSN_UPGRADE_PROTOCOL_ORD,signal,signal->getLength());
+  }
 }//Dbdih::initRestartInfo()
 
 /*--------------------------------------------------------------------*/
@@ -13396,7 +13446,7 @@ void Dbdih::nodeResetStart(Signal *signa
   {
     jam();
     ndbrequire(isMaster());
-    ndbrequire(m_micro_gcp.m_master_state == MicroGcp::M_GCP_IDLE);
+    ndbrequire(m_micro_gcp.m_master.m_state == MicroGcp::M_GCP_IDLE);
     signal->theData[0] = DihContinueB::ZSTART_GCP;
     sendSignal(reference(), GSN_CONTINUEB, signal, 1, JBB);
   }
@@ -14168,7 +14218,7 @@ void Dbdih::takeOverCompleted(Uint32 aNo
     ndbrequire(isMaster());
     Sysfile::setTakeOverNode(aNodeId, SYSFILE->takeOver, 0);
     takeOverPtr.p->toMasterStatus = TakeOverRecord::TO_END_COPY;
-    cstartGcpNow = true;
+    m_micro_gcp.m_master.m_start_time = m_gcp_save.m_master.m_start_time = 0;
   }//if
 }//Dbdih::takeOverCompleted()
 
@@ -14688,8 +14738,6 @@ Dbdih::execDUMP_STATE_ORD(Signal* signal
   if (arg == DumpStateOrd::DihDumpNodeRestartInfo) {
     infoEvent("c_nodeStartMaster.blockLcp = %d, c_nodeStartMaster.blockGcp = %d, c_nodeStartMaster.wait = %d",
 	      c_nodeStartMaster.blockLcp, c_nodeStartMaster.blockGcp, c_nodeStartMaster.wait);
-    infoEvent("cstartGcpNow = %d",
-              cstartGcpNow);
     infoEvent("cfirstVerifyQueue = %d, cverifyQueueCounter = %d",
               cfirstVerifyQueue, cverifyQueueCounter);
     infoEvent("cgcpOrderBlocked = %d",
@@ -14803,8 +14851,6 @@ Dbdih::execDUMP_STATE_ORD(Signal* signal
   if (signal->theData[0] == 7009) {
     infoEvent("ccalcOldestRestorableGci = %d, cnoOfNodeGroups = %d",
               c_lcpState.oldestRestorableGci, cnoOfNodeGroups);
-    infoEvent("cstartGcpNow = %d",
-              cstartGcpNow);
     infoEvent("crestartGci = %d",
               crestartGci);
   }//if  
@@ -15036,7 +15082,7 @@ Dbdih::execDUMP_STATE_ORD(Signal* signal
     {
       tmp = signal->theData[1];
     }
-    m_gcp_save.m_time_between_gcp = tmp;
+    m_gcp_save.m_master.m_time_between_gcp = tmp;
     g_eventLogger.info("Setting time between gcp : %d", tmp);
   }
 
@@ -15838,7 +15884,7 @@ void Dbdih::execWAIT_GCP_REQ(Signal* sig
     jam();
 
     if((requestType == WaitGCPReq::CompleteIfRunning) &&
-       (m_gcp_save.m_master_state == GcpSave::GCP_SAVE_IDLE)) {
+       (m_gcp_save.m_master.m_state == GcpSave::GCP_SAVE_IDLE)) {
       jam();
       conf->senderData = senderData;
       conf->gcp = m_micro_gcp.m_old_gci >> 32;
@@ -15861,10 +15907,9 @@ void Dbdih::execWAIT_GCP_REQ(Signal* sig
     ptr.p->clientData = senderData;
     
     if((requestType == WaitGCPReq::CompleteForceStart) && 
-       (m_gcp_save.m_master_state == GcpSave::GCP_SAVE_IDLE)) {
+       (m_gcp_save.m_master.m_state == GcpSave::GCP_SAVE_IDLE)) {
       jam();
-      cstartGcpNow = true;
-      m_gcp_save.m_start_time = 0; //
+      m_micro_gcp.m_master.m_start_time = m_gcp_save.m_master.m_start_time = 0;
     }//if
     return;
   } else { 
diff -Nrup a/storage/ndb/src/kernel/blocks/dblqh/DblqhMain.cpp b/storage/ndb/src/kernel/blocks/dblqh/DblqhMain.cpp
--- a/storage/ndb/src/kernel/blocks/dblqh/DblqhMain.cpp	2007-09-07 12:16:16 +02:00
+++ b/storage/ndb/src/kernel/blocks/dblqh/DblqhMain.cpp	2007-09-12 13:30:24 +02:00
@@ -2465,6 +2465,16 @@ void Dblqh::execPACKED_SIGNAL(Signal* si
 
   jamEntry();
   Tlength = signal->length();
+  Uint32 TcommitLen = 5;
+  Uint32 Tgci_lo_mask = ~(Uint32)0;
+
+  if (unlikely(!ndb_check_micro_gcp(getNodeInfo(refToNode(signal->getSendersBlockRef())).m_version)))
+  {
+    jam();
+    TcommitLen = 4;
+    Tgci_lo_mask = 0;
+  }
+  
   ndbrequire(Tlength <= 25);
   MEMCOPY_NO_WORDS(&TpackedData[0], &signal->theData[0], Tlength);
   while (Tlength > Tstep) {
@@ -2480,10 +2490,10 @@ void Dblqh::execPACKED_SIGNAL(Signal* si
       signal->theData[1] = sig1;
       signal->theData[2] = sig2;
       signal->theData[3] = sig3;
-      signal->theData[4] = sig4;
-      signal->header.theLength = 5;
+      signal->theData[4] = sig4 & Tgci_lo_mask;
+      signal->header.theLength = TcommitLen;
       execCOMMIT(signal);
-      Tstep += 5;
+      Tstep += TcommitLen;
       break;
     case ZCOMPLETE:
       jam();
@@ -2830,6 +2840,13 @@ void Dblqh::sendCommitLqh(Signal* signal
   Thostptr.p->packedWordsLqh[pos + 3] = transid2;
   Thostptr.p->packedWordsLqh[pos + 4] = gci_lo;
   Thostptr.p->noOfPackedWordsLqh = pos + 5;
+
+  if (unlikely(!ndb_check_micro_gcp(getNodeInfo(Thostptr.i).m_version)))
+  {
+    jam();
+    ndbassert(gci_lo == 0);
+    Thostptr.p->noOfPackedWordsLqh = pos + 4;
+  }
 }//Dblqh::sendCommitLqh()
 
 void Dblqh::sendCompleteLqh(Signal* signal, BlockReference alqhBlockref)
@@ -6026,6 +6043,14 @@ void Dblqh::execCOMMITREQ(Signal* signal
   Uint32 transid2 = signal->theData[4];
   Uint32 tcOprec = signal->theData[6];
   Uint32 gci_lo = signal->theData[7];
+
+  if (unlikely(signal->getLength() < 8))
+  {
+    jam();
+    gci_lo = 0;
+    ndbassert(!ndb_check_micro_gcp(getNodeInfo(refToNode(signal->getSendersBlockRef())).m_version));
+  }
+
   if (ERROR_INSERTED(5004)) {
     systemErrorLab(signal, __LINE__);
   }
@@ -12144,6 +12169,16 @@ void Dblqh::execGCP_SAVEREQ(Signal* sign
     return;
   }
 
+  if (unlikely(refToNode(signal->getSendersBlockRef()) != getOwnNodeId()))
+  {
+    jam();
+    ndbassert(!ndb_check_micro_gcp
+              (getNodeInfo(refToNode
+                           (signal->getSendersBlockRef())).m_version));
+    EXECUTE_DIRECT(DBDIH, GSN_GCP_SAVEREQ, signal, signal->getLength());
+    return;
+  }
+  
   const Uint32 dihBlockRef = saveReq->dihBlockRef;
   const Uint32 dihPtr = saveReq->dihPtr;
   const Uint32 gci = saveReq->gci;
diff -Nrup a/storage/ndb/src/kernel/blocks/dbtc/DbtcMain.cpp b/storage/ndb/src/kernel/blocks/dbtc/DbtcMain.cpp
--- a/storage/ndb/src/kernel/blocks/dbtc/DbtcMain.cpp	2007-09-07 12:16:16 +02:00
+++ b/storage/ndb/src/kernel/blocks/dbtc/DbtcMain.cpp	2007-09-12 13:30:24 +02:00
@@ -4079,6 +4079,13 @@ void Dbtc::sendtckeyconf(Signal* signal,
       regApiPtr->tcSendArray[Ti - 6];
   }//for
   localHostptr.p->packedWordsTCKEYCONF[TcurrLen + TpacketLen] = Tpack6;
+
+  if (unlikely(!ndb_check_micro_gcp(getNodeInfo(localHostptr.i).m_version)))
+  {
+    jam();
+    ndbassert(Tpack6 == 0);
+    localHostptr.p->noOfWordsTCKEYCONF = TcurrLen + TpacketLen; // no gci_lo
+  }
 }//Dbtc::sendtckeyconf()
 
 void Dbtc::copyFromToLen(UintR* sourceBuffer, UintR* destBuffer, UintR Tlen)
@@ -4520,10 +4527,10 @@ void Dbtc::sendCommitLqh(Signal* signal,
   UintR Tindex = Thostptr.p->noOfPackedWordsLqh;
   UintR* TDataPtr = &Thostptr.p->packedWordsLqh[Tindex];
   UintR Tdata1 = regTcPtr->lastLqhCon;
-  UintR Tdata2 = regApiPtr->globalcheckpointid >> 32; // XXX Jonas
+  UintR Tdata2 = Uint32(regApiPtr->globalcheckpointid >> 32);
   UintR Tdata3 = regApiPtr->transid[0];
   UintR Tdata4 = regApiPtr->transid[1];
-  UintR Tdata5 = regApiPtr->globalcheckpointid & 0xFFFFFFFF;
+  UintR Tdata5 = Uint32(regApiPtr->globalcheckpointid);
 
   TDataPtr[0] = Tdata1 | (ZCOMMIT << 28);
   TDataPtr[1] = Tdata2;
@@ -4531,6 +4538,13 @@ void Dbtc::sendCommitLqh(Signal* signal,
   TDataPtr[3] = Tdata4;
   TDataPtr[4] = Tdata5;
   Thostptr.p->noOfPackedWordsLqh = Tindex + 5;
+
+  if (unlikely(!ndb_check_micro_gcp(getNodeInfo(Thostptr.i).m_version)))
+  {
+    jam();
+    ndbassert(Tdata5 == 0);
+    Thostptr.p->noOfPackedWordsLqh = Tindex + 4; // no gci_lo
+  }
 }//Dbtc::sendCommitLqh()
 
 void
@@ -7392,6 +7406,13 @@ void Dbtc::execLQH_TRANSCONF(Signal* sig
   tapplOprec   = lqhTransConf->apiOpRec;
   const Uint32 tableId = lqhTransConf->tableId;
   Uint32 gci_lo = lqhTransConf->gci_lo;
+  if (ttransStatus == LqhTransConf::Committed && 
+      unlikely(signal->getLength() < LqhTransConf::SignalLength))
+  {
+    jam();
+    gci_lo = 0;
+    ndbassert(!ndb_check_micro_gcp(getNodeInfo(tnodeid).m_version));
+  }
   tgci |= gci_lo;
 
   if (ttransStatus == LqhTransConf::LastTransConf){
@@ -8031,14 +8052,15 @@ void Dbtc::toCommitHandlingLab(Signal* s
           apiConnectptr.p->apiConnectstate = CS_WAIT_COMMIT_CONF;
           apiConnectptr.p->timeOutCounter = 0;
           tcConnectptr.p->tcConnectstate = OS_WAIT_COMMIT_CONF;
+          Uint64 gci = apiConnectptr.p->globalcheckpointid;
           signal->theData[0] = tcConnectptr.i;
           signal->theData[1] = cownref;
-          signal->theData[2] = apiConnectptr.p->globalcheckpointid >> 32; // XXX JON
+          signal->theData[2] = Uint32(gci >> 32); // XXX JON
           signal->theData[3] = apiConnectptr.p->transid[0];
           signal->theData[4] = apiConnectptr.p->transid[1];
           signal->theData[5] = apiConnectptr.p->tcBlockref;
           signal->theData[6] = tcConnectptr.p->tcOprec;
-          signal->theData[7] = apiConnectptr.p->globalcheckpointid& 0xFFFFFFFF;
+          signal->theData[7] = Uint32(gci);
           sendSignal(tblockref, GSN_COMMITREQ, signal, 8, JBB);
           return;
         }//if
@@ -12080,7 +12102,7 @@ void Dbtc::sendTcIndxConf(Signal* signal
   UintR Tpack3 = confInfo;
   UintR Tpack4 = regApiPtr->transid[0];
   UintR Tpack5 = regApiPtr->transid[1];
-  UintR Tpack6 = Uint32(regApiPtr->globalcheckpointid >> 32);
+  UintR Tpack6 = Uint32(regApiPtr->globalcheckpointid);
 
   localHostptr.p->noOfWordsTCINDXCONF = TcurrLen + TpacketLen + 1 /* gci_lo */;
 
@@ -12097,6 +12119,13 @@ void Dbtc::sendTcIndxConf(Signal* signal
           regApiPtr->tcIndxSendArray[Ti - 6];
   }//for
   localHostptr.p->packedWordsTCINDXCONF[TcurrLen + TpacketLen] = Tpack6;
+
+  if (unlikely(!ndb_check_micro_gcp(getNodeInfo(localHostptr.i).m_version)))
+  {
+    jam();
+    ndbassert(Tpack6 == 0);
+    localHostptr.p->noOfWordsTCINDXCONF = TcurrLen + TpacketLen; // no gci_lo
+  }
 }//Dbtc::sendTcIndxConf()
 
 void Dbtc::execINDXKEYINFO(Signal* signal)
diff -Nrup a/storage/ndb/src/kernel/blocks/qmgr/Qmgr.hpp b/storage/ndb/src/kernel/blocks/qmgr/Qmgr.hpp
--- a/storage/ndb/src/kernel/blocks/qmgr/Qmgr.hpp	2007-03-06 18:36:41 +01:00
+++ b/storage/ndb/src/kernel/blocks/qmgr/Qmgr.hpp	2007-09-12 13:30:24 +02:00
@@ -278,6 +278,8 @@ private:
   void execARBIT_CHOOSEREF(Signal* signal);
   void execARBIT_STOPREP(Signal* signal);
 
+  void execUPGRADE_PROTOCOL_ORD(Signal*);
+  
   // Statement blocks
   void check_readnodes_reply(Signal* signal, Uint32 nodeId, Uint32 gsn);
   Uint32 check_startup(Signal* signal);
@@ -464,6 +466,8 @@ private:
                        Uint32 length, 
                        JobBufferLevel jbuf,
                        Uint32 minversion);
+
+  bool m_micro_gcp_enabled;
 };
 
 #endif
diff -Nrup a/storage/ndb/src/kernel/blocks/qmgr/QmgrInit.cpp b/storage/ndb/src/kernel/blocks/qmgr/QmgrInit.cpp
--- a/storage/ndb/src/kernel/blocks/qmgr/QmgrInit.cpp	2007-02-14 18:24:38 +01:00
+++ b/storage/ndb/src/kernel/blocks/qmgr/QmgrInit.cpp	2007-09-12 13:30:24 +02:00
@@ -43,6 +43,8 @@ void Qmgr::initData() 
   ndbrequire((Uint32)NodeInfo::DB == 0);
   ndbrequire((Uint32)NodeInfo::API == 1);
   ndbrequire((Uint32)NodeInfo::MGM == 2); 
+
+  m_micro_gcp_enabled = false;
 }//Qmgr::initData()
 
 void Qmgr::initRecords() 
@@ -113,6 +115,8 @@ Qmgr::Qmgr(Block_context& ctx)
   addRecSignal(GSN_DIH_RESTARTREF, &Qmgr::execDIH_RESTARTREF);
   addRecSignal(GSN_DIH_RESTARTCONF, &Qmgr::execDIH_RESTARTCONF);
   addRecSignal(GSN_NODE_VERSION_REP, &Qmgr::execNODE_VERSION_REP);
+
+  addRecSignal(GSN_UPGRADE_PROTOCOL_ORD, &Qmgr::execUPGRADE_PROTOCOL_ORD);
   
   initData();
 }//Qmgr::Qmgr()
diff -Nrup a/storage/ndb/src/kernel/blocks/qmgr/QmgrMain.cpp b/storage/ndb/src/kernel/blocks/qmgr/QmgrMain.cpp
--- a/storage/ndb/src/kernel/blocks/qmgr/QmgrMain.cpp	2007-07-11 14:47:03 +02:00
+++ b/storage/ndb/src/kernel/blocks/qmgr/QmgrMain.cpp	2007-09-12 13:30:24 +02:00
@@ -33,6 +33,7 @@
 #include <signaldata/FailRep.hpp>
 #include <signaldata/DisconnectRep.hpp>
 #include <signaldata/ApiBroadcast.hpp>
+#include <signaldata/Upgrade.hpp>
 
 #include <ndb_version.h>
 
@@ -652,6 +653,15 @@ void Qmgr::execCM_REGREQ(Signal* signal)
     return;
   }
 
+  if (!ndb_check_micro_gcp(startingVersion))
+  {
+    jam();
+    infoEvent("Connection from node %u refused as it's not micro GCP enabled",
+              addNodePtr.i);
+    sendCmRegrefLab(signal, Tblockref, CmRegRef::ZINCOMPATIBLE_VERSION);
+    return;
+  }
+
   if (check_start_type(start_type, c_start.m_start_type))
   {
     jam();
@@ -2805,6 +2815,18 @@ void Qmgr::node_failed(Signal* signal, U
   return;
 }//Qmgr::node_failed()
 
+void
+Qmgr::execUPGRADE_PROTOCOL_ORD(Signal* signal)
+{
+  const UpgradeProtocolOrd* ord = (UpgradeProtocolOrd*)signal->getDataPtr();
+  switch(ord->type){
+  case UpgradeProtocolOrd::UPO_ENABLE_MICRO_GCP:
+    jam();
+    m_micro_gcp_enabled = true;
+    return;
+  }
+}
+
 /**--------------------------------------------------------------------------
  * AN API NODE IS REGISTERING. IF FOR THE FIRST TIME WE WILL ENABLE 
  * COMMUNICATION WITH ALL NDB BLOCKS.
@@ -2833,10 +2855,21 @@ void Qmgr::execAPI_REGREQ(Signal* signal
 #endif
 
   bool compatability_check;
+  const char * extra = 0;
   NodeInfo::NodeType type= getNodeInfo(apiNodePtr.i).getType();
   switch(type){
   case NodeInfo::API:
-    compatability_check = ndbCompatible_ndb_api(NDB_VERSION, version);
+    if (m_micro_gcp_enabled && !ndb_check_micro_gcp(version))
+    {
+      jam();
+      compatability_check = false;
+      extra = ": micro gcp enabled";
+    }
+    else
+    {
+      jam();
+      compatability_check = ndbCompatible_ndb_api(NDB_VERSION, version);
+    }
     break;
   case NodeInfo::MGM:
     compatability_check = ndbCompatible_ndb_mgmt(NDB_VERSION, version);
@@ -2848,18 +2881,19 @@ void Qmgr::execAPI_REGREQ(Signal* signal
     infoEvent("Invalid connection attempt with type %d", type);
     return;
   }
-
+  
   if (!compatability_check) {
     jam();
     char buf[NDB_VERSION_STRING_BUF_SZ];
     infoEvent("Connection attempt from %s id=%d with %s "
-	      "incompatible with %s",
+	      "incompatible with %s%s",
 	      type == NodeInfo::API ? "api or mysqld" : "management server",
 	      apiNodePtr.i,
 	      ndbGetVersionString(version, mysql_version, 0,
-			       buf, 
-			       sizeof(buf)),
-	      NDB_VERSION_STRING);
+                                  buf, 
+                                  sizeof(buf)),
+	      NDB_VERSION_STRING,
+              extra ? extra : 0);
     apiNodePtr.p->phase = ZAPI_INACTIVE;
     sendApiRegRef(signal, ref, ApiRegRef::UnsupportedVersion);
     return;
diff -Nrup a/storage/ndb/src/kernel/blocks/suma/Suma.cpp b/storage/ndb/src/kernel/blocks/suma/Suma.cpp
--- a/storage/ndb/src/kernel/blocks/suma/Suma.cpp	2007-09-07 07:29:59 +02:00
+++ b/storage/ndb/src/kernel/blocks/suma/Suma.cpp	2007-09-12 13:30:24 +02:00
@@ -3857,8 +3857,15 @@ Suma::execSUB_GCP_COMPLETE_ACK(Signal* s
   SubGcpCompleteAck * const ack = (SubGcpCompleteAck*)signal->getDataPtr();
   Uint32 gci_hi = ack->rep.gci_hi;
   Uint32 gci_lo = ack->rep.gci_lo;
-  Uint64 gci = gci_lo | (Uint64(gci_hi) << 32);
   Uint32 senderRef  = ack->rep.senderRef;
+  if (unlikely(signal->getLength() < SubGcpCompleteAck::SignalLength))
+  {
+    jam();
+    ndbassert(!ndb_check_micro_gcp(getNodeInfo(refToNode(senderRef)).m_version));
+    gci_lo = 0;
+  }
+
+  Uint64 gci = gci_lo | (Uint64(gci_hi) << 32);
   m_max_seen_gci = (gci > m_max_seen_gci ? gci : m_max_seen_gci);
 
   if (refToBlock(senderRef) == SUMA) {
diff -Nrup a/storage/ndb/src/ndbapi/NdbEventOperationImpl.cpp b/storage/ndb/src/ndbapi/NdbEventOperationImpl.cpp
--- a/storage/ndb/src/ndbapi/NdbEventOperationImpl.cpp	2007-09-07 07:29:59 +02:00
+++ b/storage/ndb/src/ndbapi/NdbEventOperationImpl.cpp	2007-09-12 13:30:24 +02:00
@@ -47,8 +47,6 @@
 extern EventLogger g_eventLogger;
 
 static Gci_container_pod g_empty_gci_container;
-static const Uint32 ACTIVE_GCI_DIRECTORY_SIZE = 4;
-static const Uint32 ACTIVE_GCI_MASK = ACTIVE_GCI_DIRECTORY_SIZE - 1;
 
 #if defined(VM_TRACE) && defined(NOT_USED)
 static void
@@ -1744,7 +1742,8 @@ NdbEventBuffer::complete_bucket(Gci_cont
 }
 
 void
-NdbEventBuffer::execSUB_GCP_COMPLETE_REP(const SubGcpCompleteRep * const rep)
+NdbEventBuffer::execSUB_GCP_COMPLETE_REP(const SubGcpCompleteRep * const rep,
+                                         Uint32 len)
 {
   if (unlikely(m_active_op_count == 0))
   {
@@ -1755,6 +1754,12 @@ NdbEventBuffer::execSUB_GCP_COMPLETE_REP
 
   Uint32 gci_hi = rep->gci_hi;
   Uint32 gci_lo = rep->gci_lo;
+
+  if (unlikely(len < SubGcpCompleteRep::SignalLength))
+  {
+    gci_lo = 0;
+  }
+
   const Uint64 gci= gci_lo | (Uint64(gci_hi) << 32);
   const Uint32 cnt= rep->gcp_complete_rep_count;
 
@@ -1910,7 +1915,7 @@ NdbEventBuffer::insert_event(NdbEventOpe
       if (impl->m_node_bit_mask.get(0u))
       {
         oid_ref = impl->m_oid;
-        insertDataL(impl, &data, ptr);
+        insertDataL(impl, &data, SubTableData::SignalLength, ptr);
       }
       NdbEventOperationImpl* blob_op = impl->theBlobOpList;
       while (blob_op != NULL)
@@ -1918,7 +1923,7 @@ NdbEventBuffer::insert_event(NdbEventOpe
         if (blob_op->m_node_bit_mask.get(0u))
         {
           oid_ref = blob_op->m_oid;
-          insertDataL(blob_op, &data, ptr);
+          insertDataL(blob_op, &data, SubTableData::SignalLength, ptr);
         }
         blob_op = blob_op->m_next;
       }
@@ -2106,7 +2111,7 @@ NdbEventBuffer::completeClusterFailed()
   rep.gci_hi= gci >> 32;
   rep.gci_lo= gci & 0xFFFFFFFF;
   rep.gcp_complete_rep_count= cnt;
-  execSUB_GCP_COMPLETE_REP(&rep);
+  execSUB_GCP_COMPLETE_REP(&rep, SubGcpCompleteRep::SignalLength);
 
   DBUG_VOID_RETURN;
 }
@@ -2120,6 +2125,7 @@ NdbEventBuffer::getLatestGCI()
 int
 NdbEventBuffer::insertDataL(NdbEventOperationImpl *op,
 			    const SubTableData * const sdata, 
+                            Uint32 len,
 			    LinearSectionPtr ptr[3])
 {
   DBUG_ENTER_EVENT("NdbEventBuffer::insertDataL");
@@ -2127,6 +2133,12 @@ NdbEventBuffer::insertDataL(NdbEventOper
   const Uint32 operation = SubTableData::getOperation(ri);
   Uint32 gci_hi = sdata->gci_hi;
   Uint32 gci_lo = sdata->gci_lo;
+
+  if (unlikely(len < SubTableData::SignalLength))
+  {
+    gci_lo = 0;
+  }
+
   Uint64 gci= gci_lo | (Uint64(gci_hi) << 32);
   const bool is_data_event = 
     operation < NdbDictionary::Event::_TE_FIRST_NON_DATA_EVENT;
@@ -2251,7 +2263,7 @@ NdbEventBuffer::insertDataL(NdbEventOper
         op->m_has_error = 2;
         DBUG_RETURN_EVENT(-1);
       }
-      if (unlikely(copy_data(sdata, ptr, data, NULL)))
+      if (unlikely(copy_data(sdata, len, ptr, data, NULL)))
       {
         op->m_has_error = 3;
         DBUG_RETURN_EVENT(-1);
@@ -2297,7 +2309,7 @@ NdbEventBuffer::insertDataL(NdbEventOper
     else
     {
       // event with same op, PK found, merge into old buffer
-      if (unlikely(merge_data(sdata, ptr, data, & bucket->m_data.m_sz)))
+      if (unlikely(merge_data(sdata, len, ptr, data, &bucket->m_data.m_sz)))
       {
         op->m_has_error = 3;
         DBUG_RETURN_EVENT(-1);
@@ -2468,7 +2480,7 @@ NdbEventBuffer::dealloc_mem(EventBufData
 }
 
 int 
-NdbEventBuffer::copy_data(const SubTableData * const sdata,
+NdbEventBuffer::copy_data(const SubTableData * const sdata, Uint32 len,
                           LinearSectionPtr ptr[3],
                           EventBufData* data,
                           Uint32 * change_sz)
@@ -2478,6 +2490,12 @@ NdbEventBuffer::copy_data(const SubTable
   if (alloc_mem(data, ptr, change_sz) != 0)
     DBUG_RETURN_EVENT(-1);
   memcpy(data->sdata, sdata, sizeof(SubTableData));
+
+  if (unlikely(len < SubTableData::SignalLength))
+  {
+    data->sdata->gci_lo = 0;
+  }
+
   int i;
   for (i = 0; i <= 2; i++)
     memcpy(data->ptr[i].p, ptr[i].p, ptr[i].sz << 2);
@@ -2545,7 +2563,7 @@ copy_attr(AttributeHeader ah,
 }
 
 int 
-NdbEventBuffer::merge_data(const SubTableData * const sdata,
+NdbEventBuffer::merge_data(const SubTableData * const sdata, Uint32 len,
                            LinearSectionPtr ptr2[3],
                            EventBufData* data,
                            Uint32 * change_sz)
@@ -2557,7 +2575,7 @@ NdbEventBuffer::merge_data(const SubTabl
   int t1 = SubTableData::getOperation(data->sdata->requestInfo);
   int t2 = SubTableData::getOperation(sdata->requestInfo);
   if (t1 == Ev_t::enum_NUL)
-    DBUG_RETURN_EVENT(copy_data(sdata, ptr2, data, change_sz));
+    DBUG_RETURN_EVENT(copy_data(sdata, len, ptr2, data, change_sz));
 
   Ev_t* tp = 0;
   int i;
@@ -2843,7 +2861,7 @@ NdbEventBuffer::get_main_data(Gci_contai
   SubTableData sdata = *blob_data->sdata;
   sdata.tableId = main_op->m_eventImpl->m_tableImpl->m_id;
   SubTableData::setOperation(sdata.requestInfo, NdbDictionary::Event::_TE_NUL);
-  if (copy_data(&sdata, ptr, main_data, NULL) != 0)
+  if (copy_data(&sdata, SubTableData::SignalLength, ptr, main_data, NULL) != 0)
     DBUG_RETURN_EVENT(-1);
   hpos.data = main_data;
 
diff -Nrup a/storage/ndb/src/ndbapi/NdbEventOperationImpl.hpp b/storage/ndb/src/ndbapi/NdbEventOperationImpl.hpp
--- a/storage/ndb/src/ndbapi/NdbEventOperationImpl.hpp	2007-09-05 15:19:57 +02:00
+++ b/storage/ndb/src/ndbapi/NdbEventOperationImpl.hpp	2007-09-12 13:30:24 +02:00
@@ -514,9 +514,9 @@ public:
 
   // accessed from the "receive thread"
   int insertDataL(NdbEventOperationImpl *op,
-		  const SubTableData * const sdata,
+		  const SubTableData * const sdata, Uint32 len,
 		  LinearSectionPtr ptr[3]);
-  void execSUB_GCP_COMPLETE_REP(const SubGcpCompleteRep * const rep);
+  void execSUB_GCP_COMPLETE_REP(const SubGcpCompleteRep * const, Uint32 len);
   void complete_outof_order_gcis();
   
   void report_node_connected(Uint32 node_id);
@@ -543,11 +543,11 @@ public:
                 Uint32 * change_sz);
   void dealloc_mem(EventBufData* data,
                    Uint32 * change_sz);
-  int copy_data(const SubTableData * const sdata,
+  int copy_data(const SubTableData * const sdata, Uint32 len,
                 LinearSectionPtr ptr[3],
                 EventBufData* data,
                 Uint32 * change_sz);
-  int merge_data(const SubTableData * const sdata,
+  int merge_data(const SubTableData * const sdata, Uint32 len,
                  LinearSectionPtr ptr[3],
                  EventBufData* data,
                  Uint32 * change_sz);
diff -Nrup a/storage/ndb/src/ndbapi/NdbTransaction.cpp b/storage/ndb/src/ndbapi/NdbTransaction.cpp
--- a/storage/ndb/src/ndbapi/NdbTransaction.cpp	2007-09-05 15:19:57 +02:00
+++ b/storage/ndb/src/ndbapi/NdbTransaction.cpp	2007-09-12 13:30:24 +02:00
@@ -1638,13 +1638,18 @@ Parameters:    aSignal: The signal objec
 Remark:        
 ******************************************************************************/
 int			
-NdbTransaction::receiveTC_COMMITCONF(const TcCommitConf * commitConf)
+NdbTransaction::receiveTC_COMMITCONF(const TcCommitConf * commitConf, 
+                                     Uint32 len)
 { 
   if(checkState_TransId(&commitConf->transId1)){
     theCommitStatus = Committed;
     theCompletionStatus = CompletedSuccess;
     Uint32 tGCI_hi = commitConf->gci_hi;
     Uint32 tGCI_lo = commitConf->gci_lo;
+    if (unlikely(len < TcCommitConf::SignalLength))
+    {
+      tGCI_lo = 0;
+    }
     Uint64 tGCI = Uint64(tGCI_lo) | (Uint64(tGCI_hi) << 32);
     theGlobalCheckpointId = tGCI;
     // theGlobalCheckpointId == 0 if NoOp transaction
@@ -1825,7 +1830,11 @@ from other transactions.
     Uint32 tNoSent = theNoOfOpSent;
     theNoOfOpCompleted = tNoComp;
     Uint32 tGCI_hi = keyConf->gci_hi;
-    Uint32 tGCI_lo = * (Uint32*)&keyConf->operations[tNoOfOperations];
+    Uint32 tGCI_lo = * tPtr; // After op(s)
+    if (unlikely(aDataLength < TcKeyConf::StaticLength+1 + 2*tNoOfOperations))
+    {
+      tGCI_lo = 0;
+    }
     Uint64 tGCI = Uint64(tGCI_lo) | (Uint64(tGCI_hi) << 32);
     if (tCommitFlag == 1) {
       theCommitStatus = Committed;
@@ -2000,7 +2009,11 @@ NdbTransaction::receiveTCINDXCONF(const 
     }//for
     Uint32 tNoSent = theNoOfOpSent;
     Uint32 tGCI_hi = indxConf->gci_hi;
-    Uint32 tGCI_lo = * (Uint32*)&indxConf->operations[tNoOfOperations];
+    Uint32 tGCI_lo = * tPtr;
+    if (unlikely(aDataLength < TcIndxConf::SignalLength+1+2*tNoOfOperations))
+    {
+      tGCI_lo = 0;
+    }
     Uint64 tGCI = Uint64(tGCI_lo) | (Uint64(tGCI_hi) << 32);
 
     theNoOfOpCompleted = tNoComp;
diff -Nrup a/storage/ndb/src/ndbapi/Ndbif.cpp b/storage/ndb/src/ndbapi/Ndbif.cpp
--- a/storage/ndb/src/ndbapi/Ndbif.cpp	2007-09-05 15:19:57 +02:00
+++ b/storage/ndb/src/ndbapi/Ndbif.cpp	2007-09-12 13:30:25 +02:00
@@ -532,7 +532,7 @@ Ndb::handleReceivedSignal(NdbApiSignal* 
       tCon = void2con(tFirstDataPtr);
       if ((tCon->checkMagicNumber() == 0) &&
 	  (tCon->theSendStatus == NdbTransaction::sendTC_COMMIT)) {
-	tReturnCode = tCon->receiveTC_COMMITCONF(commitConf);
+	tReturnCode = tCon->receiveTC_COMMITCONF(commitConf, tLen);
 	if (tReturnCode != -1) {
 	  completedTransaction(tCon);
 	}//if
@@ -731,7 +731,7 @@ Ndb::handleReceivedSignal(NdbApiSignal* 
   {
     const SubGcpCompleteRep * const rep=
       CAST_CONSTPTR(SubGcpCompleteRep, aSignal->getDataPtr());
-    theEventBuffer->execSUB_GCP_COMPLETE_REP(rep);
+    theEventBuffer->execSUB_GCP_COMPLETE_REP(rep, tLen);
     return;
   }
   case GSN_SUB_TABLE_DATA:
@@ -764,7 +764,7 @@ Ndb::handleReceivedSignal(NdbApiSignal* 
 		       SubTableData::getOperation(sdata->requestInfo),
 		       sdata->tableId));
 
-    theEventBuffer->insertDataL(op,sdata, ptr);
+    theEventBuffer->insertDataL(op, sdata, tLen, ptr);
     return;
   }
   case GSN_DIHNDBTAMPER:
Thread
bk commit into 5.1 tree (jonas:1.2613)jonas12 Sep