Below is the list of changes that have just been committed into jonas's
local 5.1 repository. When jonas pushes, these changes will be
propagated to the main repository and, within 24 hours of the push,
to the public repository.
For information on how to access the public repository,
see http://dev.mysql.com/doc/mysql/en/installing-source-tree.html
ChangeSet
1.1865 05/05/18 14:10:28 joreland@stripped +13 -0
wl1445 - ndb - event during NF
SUMA part
storage/ndb/src/kernel/vm/SimulatedBlock.hpp
1.14 05/05/18 14:10:24 joreland@stripped +1 -0
API_START_REP
storage/ndb/src/kernel/vm/SimulatedBlock.cpp
1.20 05/05/18 14:10:24 joreland@stripped +6 -0
API_START_REP
storage/ndb/src/kernel/blocks/suma/SumaInit.cpp
1.15 05/05/18 14:10:24 joreland@stripped +22 -16
Impl. resend during NF
Change handover protocol
Add buffer management wrt NF
storage/ndb/src/kernel/blocks/suma/Suma.hpp
1.15 05/05/18 14:10:24 joreland@stripped +112 -61
Impl. resend during NF
Change handover protocol
Add buffer management wrt NF
storage/ndb/src/kernel/blocks/suma/Suma.cpp
1.35 05/05/18 14:10:24 joreland@stripped +792 -447
Impl. resend during NF
Change handover protocol
Add buffer management wrt NF
storage/ndb/src/kernel/blocks/qmgr/QmgrMain.cpp
1.17 05/05/18 14:10:24 joreland@stripped +3 -0
Let Cntr redistribute API_START_REP
storage/ndb/src/kernel/blocks/ndbcntr/NdbcntrMain.cpp
1.19 05/05/18 14:10:24 joreland@stripped +10 -0
Let Cntr redistribute API_START_REP
storage/ndb/src/kernel/blocks/ndbcntr/NdbcntrInit.cpp
1.7 05/05/18 14:10:23 joreland@stripped +1 -0
Let Cntr redistribute API_START_REP
storage/ndb/src/kernel/blocks/ndbcntr/Ndbcntr.hpp
1.4 05/05/18 14:10:23 joreland@stripped +1 -0
Let Cntr redistribute API_START_REP
storage/ndb/src/kernel/blocks/dbtup/Dbtup.hpp
1.28 05/05/18 14:10:23 joreland@stripped +1 -0
Allow suma to use TUP pages
storage/ndb/src/common/debugger/signaldata/SignalNames.cpp
1.11 05/05/18 14:10:23 joreland@stripped +5 -2
Add signal names for suma signals
storage/ndb/include/kernel/signaldata/SumaImpl.hpp
1.10 05/05/18 14:10:23 joreland@stripped +16 -32
Add bucket mask to HandoverReq
Add ContinueB
storage/ndb/include/kernel/GlobalSignalNumbers.h
1.13 05/05/18 14:10:23 joreland@stripped +2 -1
Add API_START_REP service
# This is a BitKeeper patch. What follows are the unified diffs for the
# set of deltas contained in the patch. The rest of the patch, the part
# that BitKeeper cares about, is below these diffs.
# User: joreland
# Host: eel.hemma.oreland.se.ndb.mysql.com
# Root: /home/jonas/src/mysql-5.1-wl2325
--- 1.12/storage/ndb/include/kernel/GlobalSignalNumbers.h Fri May 13 12:58:12 2005
+++ 1.13/storage/ndb/include/kernel/GlobalSignalNumbers.h Wed May 18 14:10:23 2005
@@ -169,6 +169,7 @@
#define GSN_ADD_FRAGREF 110
#define GSN_ADD_FRAGREQ 111
+#define GSN_API_START_REP 120
#define GSN_API_FAILCONF 113
#define GSN_API_FAILREQ 114
#define GSN_CNTR_START_REQ 115
@@ -176,7 +177,7 @@
#define GSN_CNTR_START_REF 117
#define GSN_CNTR_START_CONF 118
#define GSN_CNTR_START_REP 119
-/* 120 unused */
+/* 120 used by GSN_API_START_REP */
/* 121 unused */
/* 122 unused */
/* 123 unused */
--- 1.9/storage/ndb/include/kernel/signaldata/SumaImpl.hpp Fri May 13 12:58:12 2005
+++ 1.10/storage/ndb/include/kernel/signaldata/SumaImpl.hpp Wed May 18 14:10:23 2005
@@ -29,7 +29,6 @@
friend struct SumaParticipant;
friend bool printSUB_CREATE_REQ(FILE *, const Uint32 *, Uint32, Uint16);
-public:
STATIC_CONST( SignalLength = 6 );
enum SubscriptionType {
@@ -59,7 +58,6 @@
friend struct SumaParticipant;
friend bool printSUB_CREATE_REF(FILE *, const Uint32 *, Uint32, Uint16);
-public:
STATIC_CONST( SignalLength = 3 );
Uint32 senderRef;
@@ -75,7 +73,6 @@
friend struct SumaParticipant;
friend bool printSUB_CREATE_CONF(FILE *, const Uint32 *, Uint32, Uint16);
-public:
STATIC_CONST( SignalLength = 2 );
Uint32 senderRef;
@@ -83,7 +80,6 @@
};
struct SubscriptionData {
-public:
enum Part {
MetaData = 1,
TableData = 2
@@ -97,7 +93,6 @@
friend struct Suma;
friend bool printSUB_START_REQ(FILE *, const Uint32 *, Uint32, Uint16);
-public:
STATIC_CONST( SignalLength = 6 );
STATIC_CONST( SignalLength2 = SignalLength+1 );
@@ -117,7 +112,6 @@
friend struct Suma;
friend bool printSUB_START_REF(FILE *, const Uint32 *, Uint32, Uint16);
-public:
enum ErrorCode {
Undefined = 1,
NF_FakeErrorREF = 11,
@@ -146,7 +140,6 @@
friend struct Grep;
friend bool printSUB_START_CONF(FILE *, const Uint32 *, Uint32, Uint16);
-public:
STATIC_CONST( SignalLength = 7 );
STATIC_CONST( SignalLength2 = SignalLength+1 );
@@ -168,7 +161,6 @@
friend struct Suma;
friend bool printSUB_STOP_REQ(FILE *, const Uint32 *, Uint32, Uint16);
-public:
STATIC_CONST( SignalLength = 7 );
Uint32 senderRef;
Uint32 senderData;
@@ -186,7 +178,6 @@
friend struct Suma;
friend bool printSUB_STOP_REF(FILE *, const Uint32 *, Uint32, Uint16);
-public:
enum ErrorCode {
Undefined = 1,
NF_FakeErrorREF = 11,
@@ -212,7 +203,6 @@
friend struct Grep;
friend bool printSUB_STOP_CONF(FILE *, const Uint32 *, Uint32, Uint16);
-public:
STATIC_CONST( SignalLength = 7 );
Uint32 senderRef;
@@ -232,10 +222,8 @@
friend struct Grep;
friend bool printSUB_SYNC_REQ(FILE *, const Uint32 *, Uint32, Uint16);
-public:
STATIC_CONST( SignalLength = 5 );
-public:
Uint32 senderRef;
Uint32 senderData;
Uint32 subscriptionId;
@@ -254,7 +242,6 @@
friend struct Grep;
friend bool printSUB_SYNC_REF(FILE *, const Uint32 *, Uint32, Uint16);
-public:
enum ErrorCode {
Undefined = 1
};
@@ -274,7 +261,6 @@
friend struct Grep;
friend bool printSUB_SYNC_CONF(FILE *, const Uint32 *, Uint32, Uint16);
-public:
STATIC_CONST( SignalLength = 2 );
Uint32 senderRef;
@@ -289,7 +275,6 @@
friend struct Grep;
friend bool printSUB_META_DATA(FILE *, const Uint32 *, Uint32, Uint16);
-public:
STATIC_CONST( SignalLength = 3 );
SECTION( DICT_TAB_INFO = 0 );
@@ -306,7 +291,6 @@
friend struct Grep;
friend bool printSUB_TABLE_DATA(FILE *, const Uint32 *, Uint32, Uint16);
-public:
STATIC_CONST( SignalLength = 7 );
enum LogType {
@@ -338,7 +322,6 @@
friend struct Trix;
friend bool printSUB_SYNC_CONTINUE_REQ(FILE *, const Uint32 *, Uint32, Uint16);
-public:
STATIC_CONST( SignalLength = 2 );
Uint32 subscriberData;
@@ -354,7 +337,6 @@
friend struct Trix;
friend bool printSUB_SYNC_CONTINUE_REF(FILE *, const Uint32 *, Uint32, Uint16);
-public:
STATIC_CONST( SignalLength = 2 );
Uint32 subscriptionId;
@@ -370,7 +352,6 @@
friend struct Trix;
friend bool printSUB_SYNC_CONTINUE_CONF(FILE *, const Uint32 *, Uint32, Uint16);
-public:
STATIC_CONST( SignalLength = 2 );
Uint32 subscriptionId;
@@ -388,7 +369,6 @@
friend struct Trix;
friend bool printSUB_GCP_COMPLETE_REP(FILE *, const Uint32 *, Uint32, Uint16);
-public:
STATIC_CONST( SignalLength = 3 );
Uint32 gci;
@@ -400,7 +380,6 @@
/**
* Sender(s)/Reciver(s)
*/
-public:
STATIC_CONST( SignalLength = SubGcpCompleteRep::SignalLength );
SubGcpCompleteRep rep;
@@ -414,7 +393,6 @@
friend struct SumaParticipant;
friend bool printSUB_REMOVE_REQ(FILE *, const Uint32 *, Uint32, Uint16);
-public:
STATIC_CONST( SignalLength = 4 );
Uint32 senderRef;
@@ -431,7 +409,6 @@
friend struct SumaParticipant;
friend bool printSUB_REMOVE_REF(FILE *, const Uint32 *, Uint32, Uint16);
-public:
STATIC_CONST( SignalLength = 5 );
enum ErrorCode {
Undefined = 1,
@@ -454,7 +431,6 @@
friend struct SumaParticipant;
friend bool printSUB_REMOVE_CONF(FILE *, const Uint32 *, Uint32, Uint16);
-public:
STATIC_CONST( SignalLength = 5 );
Uint32 senderRef;
@@ -471,7 +447,6 @@
friend bool printCREATE_SUBSCRIPTION_ID_REQ(FILE *, const Uint32 *,
Uint32, Uint16);
-public:
STATIC_CONST( SignalLength = 2 );
Uint32 senderRef;
@@ -485,7 +460,6 @@
friend bool printCREATE_SUBSCRIPTION_ID_CONF(FILE *, const Uint32 *,
Uint32, Uint16);
-public:
STATIC_CONST( SignalLength = 4 );
Uint32 senderRef;
@@ -501,7 +475,6 @@
friend bool printCREATE_SUBSCRIPTION_ID_REF(FILE *, const Uint32 *,
Uint32, Uint16);
-public:
STATIC_CONST( SignalLength = 3 );
Uint32 senderRef;
@@ -510,7 +483,6 @@
};
struct SumaStartMeReq {
-public:
STATIC_CONST( SignalLength = 1 );
Uint32 unused;
};
@@ -529,14 +501,26 @@
};
struct SumaHandoverReq {
-public:
- STATIC_CONST( SignalLength = 1 );
+ STATIC_CONST( SignalLength = 3 );
Uint32 gci;
+ Uint32 nodeId;
+ Uint32 theBucketMask[1];
};
struct SumaHandoverConf {
-public:
- STATIC_CONST( SignalLength = 1 );
+ STATIC_CONST( SignalLength = 3 );
Uint32 gci;
+ Uint32 nodeId;
+ Uint32 theBucketMask[1];
};
+
+struct SumaContinueB
+{
+ enum
+ {
+ RESEND_BUCKET = 1
+ ,RELEASE_GCI = 2
+ };
+};
+
#endif
--- 1.10/storage/ndb/src/common/debugger/signaldata/SignalNames.cpp Wed Apr 27 23:32:57 2005
+++ 1.11/storage/ndb/src/common/debugger/signaldata/SignalNames.cpp Wed May 18 14:10:23 2005
@@ -506,10 +506,13 @@
,{ GSN_CREATE_EVNT_CONF, "CREATE_EVNT_CONF" }
,{ GSN_CREATE_EVNT_REF, "CREATE_EVNT_REF" }
- ,{ GSN_SUMA_START_ME, "SUMA_START_ME" }
+ ,{ GSN_SUMA_START_ME_REQ, "SUMA_START_ME_REQ" }
+ ,{ GSN_SUMA_START_ME_REF, "SUMA_START_ME_REF" }
+ ,{ GSN_SUMA_START_ME_CONF, "SUMA_START_ME_CONF" }
,{ GSN_SUMA_HANDOVER_REQ, "SUMA_HANDOVER_REQ"}
+ ,{ GSN_SUMA_HANDOVER_REF, "SUMA_HANDOVER_REF"}
,{ GSN_SUMA_HANDOVER_CONF, "SUMA_HANDOVER_CONF"}
-
+
,{ GSN_DROP_EVNT_REQ, "DROP_EVNT_REQ" }
,{ GSN_DROP_EVNT_CONF, "DROP_EVNT_CONF" }
,{ GSN_DROP_EVNT_REF, "DROP_EVNT_REF" }
--- 1.27/storage/ndb/src/kernel/blocks/dbtup/Dbtup.hpp Mon May 2 17:06:36 2005
+++ 1.28/storage/ndb/src/kernel/blocks/dbtup/Dbtup.hpp Wed May 18 14:10:23 2005
@@ -345,6 +345,7 @@
#endif
class Dbtup: public SimulatedBlock {
+ friend class Suma;
public:
typedef bool (Dbtup::* ReadFunction)(Uint32*,
--- 1.3/storage/ndb/src/kernel/blocks/ndbcntr/Ndbcntr.hpp Fri Apr 8 02:44:05 2005
+++ 1.4/storage/ndb/src/kernel/blocks/ndbcntr/Ndbcntr.hpp Wed May 18 14:10:23 2005
@@ -159,6 +159,7 @@
BLOCK_DEFINES(Ndbcntr);
// Transit signals
+ void execAPI_START_REP(Signal*);
void execCONTINUEB(Signal* signal);
void execREAD_NODESCONF(Signal* signal);
void execREAD_NODESREF(Signal* signal);
--- 1.6/storage/ndb/src/kernel/blocks/ndbcntr/NdbcntrInit.cpp Fri Apr 8 02:44:05 2005
+++ 1.7/storage/ndb/src/kernel/blocks/ndbcntr/NdbcntrInit.cpp Wed May 18 14:10:23 2005
@@ -55,6 +55,7 @@
addRecSignal(GSN_CNTR_START_CONF, &Ndbcntr::execCNTR_START_CONF);
addRecSignal(GSN_CNTR_WAITREP, &Ndbcntr::execCNTR_WAITREP);
addRecSignal(GSN_CNTR_START_REP, &Ndbcntr::execCNTR_START_REP);
+ addRecSignal(GSN_API_START_REP, &Ndbcntr::execAPI_START_REP, true);
addRecSignal(GSN_NODE_FAILREP, &Ndbcntr::execNODE_FAILREP);
addRecSignal(GSN_SYSTEM_ERROR , &Ndbcntr::execSYSTEM_ERROR);
--- 1.18/storage/ndb/src/kernel/blocks/ndbcntr/NdbcntrMain.cpp Fri May 13 12:58:12 2005
+++ 1.19/storage/ndb/src/kernel/blocks/ndbcntr/NdbcntrMain.cpp Wed May 18 14:10:24 2005
@@ -136,6 +136,16 @@
}//switch
}//Ndbcntr::execCONTINUEB()
+void
+Ndbcntr::execAPI_START_REP(Signal* signal)
+{
+ if(refToBlock(signal->getSendersBlockRef()) == QMGR)
+ {
+ for(Uint32 i = 0; i<ALL_BLOCKS_SZ; i++){
+ sendSignal(ALL_BLOCKS[i].Ref, GSN_API_START_REP, signal, 1, JBB);
+ }
+ }
+}
/*******************************/
/* SYSTEM_ERROR */
/*******************************/
--- 1.16/storage/ndb/src/kernel/blocks/qmgr/QmgrMain.cpp Wed Apr 27 23:32:58 2005
+++ 1.17/storage/ndb/src/kernel/blocks/qmgr/QmgrMain.cpp Wed May 18 14:10:24 2005
@@ -1993,6 +1993,9 @@
apiNodePtr.p->blockRef = ref;
signal->theData[0] = apiNodePtr.i;
sendSignal(CMVMI_REF, GSN_ENABLE_COMORD, signal, 1, JBA);
+
+ signal->theData[0] = apiNodePtr.i;
+ EXECUTE_DIRECT(NDBCNTR, GSN_API_START_REP, signal, 1);
}
return;
}//Qmgr::execAPI_REGREQ()
--- 1.34/storage/ndb/src/kernel/blocks/suma/Suma.cpp Fri May 13 12:58:12 2005
+++ 1.35/storage/ndb/src/kernel/blocks/suma/Suma.cpp Wed May 18 14:10:24 2005
@@ -46,6 +46,7 @@
#include <ndbapi/NdbDictionary.hpp>
#include <DebuggerNames.hpp>
+#include <../dbtup/Dbtup.hpp>
//#define HANDOVER_DEBUG
//#define NODEFAIL_DEBUG
@@ -76,6 +77,7 @@
Uint32 g_subPtrI = RNIL;
static const Uint32 SUMA_SEQUENCE = 0xBABEBABE;
+static const Uint32 MAX_CONCURRENT_GCP = 2;
/**************************************************************
*
@@ -118,7 +120,7 @@
const Uint32 replicas= c_noNodesInGroup;
Uint32 buckets= 1;
- for(i = 0; i<replicas; i++)
+ for(i = 1; i <= replicas; i++)
buckets *= i;
for(i = 0; i<buckets; i++)
@@ -131,7 +133,6 @@
}
c_no_of_buckets= buckets;
- // ndbout_c("c_noNodesInGroup=%d", c_noNodesInGroup);
ndbrequire(c_noNodesInGroup > 0); // at least 1 node in the nodegroup
#ifndef DBUG_OFF
@@ -160,6 +161,7 @@
if(startphase == 3)
{
jam();
+ ndbrequire((m_tup = (Dbtup*)globalData.getBlock(DBTUP)) != 0);
signal->theData[0] = reference();
sendSignal(NDBCNTR_REF, GSN_READ_NODESREQ, signal, 1, JBB);
DBUG_VOID_RETURN;
@@ -183,17 +185,18 @@
if (typeOfStart != NodeState::ST_NODE_RESTART &&
typeOfStart != NodeState::ST_INITIAL_NODE_RESTART)
{
- for( int i = 0; i < NO_OF_BUCKETS; i++)
+ for( Uint32 i = 0; i < c_no_of_buckets; i++)
{
- if (getResponsibleSumaNodeId(i) == refToNode(reference()))
+ if (get_responsible_node(i) == getOwnNodeId())
{
// I'm running this bucket
DBUG_PRINT("info",("bucket %u set to true", i));
- c_buckets[i].active = true;
+ m_active_buckets.set(i);
+ ndbout_c("m_active_buckets.set(%d)", i);
}
}
}
-
+
if(typeOfStart == NodeState::ST_INITIAL_START &&
c_masterNodeId == getOwnNodeId())
{
@@ -202,7 +205,7 @@
DBUG_VOID_RETURN;
}//if
}//if
-
+
if(startphase == 9)
{
if (typeOfStart == NodeState::ST_NODE_RESTART ||
@@ -211,21 +214,22 @@
/**
* Handover code here
*/
- DBUG_VOID_RETURN;
+ c_startup.m_wait_handover= true;
+ check_start_handover(signal);
}
}
sendSTTORRY(signal);
-
+
DBUG_VOID_RETURN;
}
void
Suma::send_start_me_req(Signal* signal)
{
- Uint32 nodeId= c_restart_server_node_id;
+ Uint32 nodeId= c_startup.m_restart_server_node_id;
do {
- nodeId = c_started_nodes.find(nodeId);
-
+ nodeId = c_alive_nodes.find(nodeId + 1);
+
if(nodeId == getOwnNodeId())
continue;
if(nodeId == NdbNodeBitmask::NotFound)
@@ -236,7 +240,9 @@
break;
} while(true);
- c_restart_server_node_id= nodeId;
+
+ infoEvent("Suma: asking node %d to recreate subscriptions on me", nodeId);
+ c_startup.m_restart_server_node_id= nodeId;
sendSignal(calcSumaBlockRef(nodeId),
GSN_SUMA_START_ME_REQ, signal, 1, JBB);
}
@@ -246,13 +252,19 @@
{
const SumaStartMeRef* ref= (SumaStartMeRef*)signal->getDataPtr();
ndbrequire(ref->errorCode == SumaStartMeRef::Busy);
+
+ infoEvent("Suma: node %d refused %d",
+ c_startup.m_restart_server_node_id, ref->errorCode);
send_start_me_req(signal);
}
void
Suma::execSUMA_START_ME_CONF(Signal* signal)
{
+ infoEvent("Suma: node %d has completed restoring me",
+ c_startup.m_restart_server_node_id);
sendSTTORRY(signal);
+ c_startup.m_restart_server_node_id= 0;
}
void
@@ -290,31 +302,72 @@
jamEntry();
ReadNodesConf * const conf = (ReadNodesConf *)signal->getDataPtr();
- c_started_nodes.assign(NdbNodeBitmask::Size, conf->startedNodes);
- c_starting_nodes.assign(NdbNodeBitmask::Size, conf->startingNodes);
-
- c_alive_nodes.assign(c_started_nodes);
- c_alive_nodes.bitOR(c_starting_nodes);
-
+ if(getNodeState().getNodeRestartInProgress())
+ {
+ c_alive_nodes.assign(NdbNodeBitmask::Size, conf->startedNodes);
+ c_alive_nodes.set(getOwnNodeId());
+ }
+ else
+ {
+ c_alive_nodes.assign(NdbNodeBitmask::Size, conf->startingNodes);
+ NdbNodeBitmask tmp;
+ tmp.assign(NdbNodeBitmask::Size, conf->startedNodes);
+ ndbrequire(tmp.isclear()); // No nodes can be started during SR
+ }
+
c_masterNodeId = conf->masterNodeId;
sendSTTORRY(signal);
}
void
-Suma::execNODE_START_REP(Signal* signal)
+Suma::execAPI_START_REP(Signal* signal)
{
Uint32 nodeId = signal->theData[0];
- if(c_nodes_in_nodegroup_mask.get(nodeId))
+ c_connected_nodes.set(nodeId);
+
+ if(c_startup.m_wait_handover)
{
- /**
- * A handover should have happened for all nodes in my node group
- */
- ndbrequire(c_started_nodes.get(nodeId));
+ check_start_handover(signal);
+ }
+}
+
+void
+Suma::check_start_handover(Signal* signal)
+{
+ NodeBitmask tmp;
+ tmp.assign(c_connected_nodes);
+ tmp.bitAND(c_subscriber_nodes);
+ if(!c_subscriber_nodes.equal(tmp))
+ {
+ return;
}
- c_started_nodes.set(nodeId);
- c_starting_nodes.clear(nodeId);
+ c_startup.m_wait_handover= false;
+ send_handover_req(signal);
+}
+
+void
+Suma::send_handover_req(Signal* signal)
+{
+ NdbNodeBitmask tmp;
+ tmp.assign(c_alive_nodes);
+ tmp.bitAND(c_nodes_in_nodegroup_mask);
+ tmp.clear(getOwnNodeId());
+ Uint32 gci= m_last_complete_gci + 3;
+
+ SumaHandoverReq* req= (SumaHandoverReq*)signal->getDataPtrSend();
+ char buf[255];
+ tmp.getText(buf);
+ infoEvent("Suma: initiate handover with nodes %s GCI: %d",
+ buf, gci);
+
+ req->gci = gci;
+ req->nodeId = getOwnNodeId();
+
+ NodeReceiverGroup rg(SUMA, tmp);
+ sendSignal(rg, GSN_SUMA_HANDOVER_REQ, signal,
+ SumaHandoverReq::SignalLength, JBB);
}
#if 0
@@ -372,6 +425,19 @@
void
Suma::execCONTINUEB(Signal* signal){
jamEntry();
+ Uint32 type= signal->theData[0];
+ switch(type){
+ case SumaContinueB::RELEASE_GCI:
+ release_gci(signal, signal->theData[1], signal->theData[2]);
+ return;
+ case SumaContinueB::RESEND_BUCKET:
+ resend_bucket(signal,
+ signal->theData[1],
+ signal->theData[2],
+ signal->theData[3],
+ signal->theData[4]);
+ return;
+ }
}
@@ -389,12 +455,27 @@
//BlockReference retRef = signal->theData[1];
c_failedApiNodes.set(failedApiNode);
+ c_connected_nodes.clear(failedApiNode);
bool found = removeSubscribersOnNode(signal, failedApiNode);
if(!found){
jam();
c_failedApiNodes.clear(failedApiNode);
}
+
+ SubGcpCompleteAck * const ack = (SubGcpCompleteAck*)signal->getDataPtr();
+ Ptr<Gcp_record> gcp;
+ for(c_gcp_list.first(gcp); !gcp.isNull(); c_gcp_list.next(gcp))
+ {
+ ack->rep.gci = gcp.p->m_gci;
+ if(gcp.p->m_subscribers.get(failedApiNode))
+ {
+ ack->rep.senderRef = numberToRef(0, failedApiNode);
+ sendSignal(SUMA_REF, GSN_SUB_GCP_COMPLETE_ACK, signal,
+ SubGcpCompleteAck::SignalLength, JBB);
+ }
+ }
+
DBUG_VOID_RETURN;
}//execAPI_FAILREQ()
@@ -527,58 +608,48 @@
const NodeFailRep * rep = (NodeFailRep*)signal->getDataPtr();
NdbNodeBitmask failed; failed.assign(NdbNodeBitmask::Size, rep->theNodes);
- bool changed = false;
-
- c_nodeFailGCI = getFirstGCI(signal);
if(failed.get(Restart.nodeId))
{
Restart.nodeId = 0;
}
-
+
+ signal->theData[0] = SumaContinueB::RESEND_BUCKET;
+
+ NdbNodeBitmask tmp;
+ tmp.assign(c_alive_nodes);
+ tmp.bitANDC(failed);
+
if(c_nodes_in_nodegroup_mask.overlaps(failed))
{
- Uint32 nodeId= failed.find(0);
- for(; nodeId != NdbNodeBitmask::NotFound; nodeId= failed.find(nodeId))
+ for( Uint32 i = 0; i < c_no_of_buckets; i++)
{
- if (c_handoverToDo) {
- jam();
- // TODO what if I'm a SUMA that is currently restarting and the SUMA
- // responsible for restarting me is the one that died?
-
- // a node has failed whilst handover is going on
- // let's check if we're in the process of handover with that node
- c_handoverToDo = false;
- for( int i = 0; i < NO_OF_BUCKETS; i++) {
- if (c_buckets[i].handover) {
- // I'm doing handover, but is it with the dead node?
- if (getResponsibleSumaNodeId(i) == nodeId) {
- // so it was the dead node, has handover started?
- if (c_buckets[i].handover_started) {
- jam();
- // we're not ok and will have lost data!
- // set not active to indicate this -
- // this will generate takeover behaviour
- c_buckets[i].active = false;
- c_buckets[i].handover_started = false;
- } // else we're ok to revert back to state before
- c_buckets[i].handover = false;
- } else {
- jam();
- // ok, we're doing handover with a different node
- c_handoverToDo = true;
- DBUG_PRINT("info",("Handover to do"));
- }
- }
+ if(m_active_buckets.get(i))
+ continue;
+ else if(m_switchover_buckets.get(i))
+ {
+ Uint32 state= c_buckets[i].m_state;
+ if((state & Bucket::BUCKET_HANDOVER) &&
+ failed.get(get_responsible_node(i)))
+ {
+ m_active_buckets.set(i);
+ m_switchover_buckets.clear(i);
+ ndbout_c("aborting handover");
+ }
+ else if(state & Bucket::BUCKET_STARTING)
+ {
+ progError(__LINE__, ERR_SYSTEM_ERROR,
+ "Nodefailure during SUMA takeover");
}
}
+ else if(get_responsible_node(i, tmp) == getOwnNodeId())
+ {
+ start_resend(signal, i);
+ }
}
}
-
- c_failoverBuffer.nodeFailRep();
- c_alive_nodes.bitAND(failed); // this has to be done after the loop above
- c_started_nodes.bitAND(failed);
- c_starting_nodes.bitAND(failed);
+ c_alive_nodes.assign(tmp);
+
DBUG_VOID_RETURN;
}
@@ -591,7 +662,6 @@
ndbrequire(!c_alive_nodes.get(nodeId));
c_alive_nodes.set(nodeId);
- c_starting_nodes.set(nodeId); // set as being prepared
#if 0 // if we include this DIH's got to be prepared, later if needed...
signal->theData[0] = reference();
@@ -692,6 +762,24 @@
infoEvent("Suma: c_removeDataSubscribers count: %d",
count_subscribers(c_removeDataSubscribers));
}
+
+ if(tCase == 8005)
+ {
+ for(Uint32 i = 0; i<c_no_of_buckets; i++)
+ {
+ Bucket* ptr= c_buckets + i;
+ infoEvent("Bucket %d %d%d-%x switch gci: %d max_acked_gci: %d max_gci: %d tail: %d head: %d",
+ i,
+ m_active_buckets.get(i),
+ m_switchover_buckets.get(i),
+ ptr->m_state,
+ ptr->m_switchover_gci,
+ ptr->m_max_acked_gci,
+ ptr->m_buffer_head.m_max_gci,
+ ptr->m_buffer_tail,
+ ptr->m_buffer_head.m_page_id);
+ }
+ }
}
/*************************************************************
@@ -932,15 +1020,6 @@
return;
}
-Uint32
-Suma::getFirstGCI(Signal* signal) {
- if (c_lastCompleteGCI == RNIL) {
- ndbout_c("WARNING: c_lastCompleteGCI == RNIL");
- return 0;
- }
- return c_lastCompleteGCI+3;
-}
-
/**********************************************************
*
* Setting upp trigger for subscription
@@ -1273,10 +1352,7 @@
SubscriptionPtr subPtr;
c_subscriptions.getPtr(subPtr, subbPtr.p->m_subPtrI);
subPtr.p->m_table_ptrI= tabPtr.i;
-
- Uint32 gci= getFirstGCI(signal);
- subbPtr.p->m_firstGCI = gci;
- sendSubStartComplete(signal,subbPtr,gci,
+ sendSubStartComplete(signal,subbPtr, m_last_complete_gci + 3,
SubscriptionData::TableData);
}
DBUG_VOID_RETURN;
@@ -1743,7 +1819,7 @@
req->transId1 = 0;
req->transId2 = (SUMA << 20) + (suma.getOwnNodeId() << 8);
req->clientOpPtr = (ptrI << 16);
- req->batch_size_rows= 16;
+ req->batch_size_rows= parallelism;
req->batch_size_bytes= 0;
suma.sendSignal(DBLQH_REF, GSN_SCAN_FRAGREQ, signal,
ScanFragReq::SignalLength, JBB);
@@ -1933,8 +2009,8 @@
key.m_subscriptionId = req->subscriptionId;
key.m_subscriptionKey = req->subscriptionKey;
- if (c_restart_server_node_id &&
- refToNode(senderRef) != c_restart_server_node_id)
+ if (c_startup.m_restart_server_node_id &&
+ refToNode(senderRef) != c_startup.m_restart_server_node_id)
{
/**
* only allow "restart_server" Suma's to come through
@@ -1969,6 +2045,8 @@
c_subscriberPool.getSize(),
c_subscriberPool.getNoOfFree()));
+ c_subscriber_nodes.set(refToNode(subscriberRef));
+
// setup subscription record
subPtr.p->m_state = Subscription::LOCKED;
// store these here for later use
@@ -1978,11 +2056,6 @@
// setup subscriber record
subbPtr.p->m_senderRef = subscriberRef;
subbPtr.p->m_senderData = subscriberData;
- subbPtr.p->m_firstGCI = RNIL;
- if (subPtr.p->m_subscriptionType == SubCreateReq::TableEvent)
- subbPtr.p->m_lastGCI = 0;
- else
- subbPtr.p->m_lastGCI = RNIL; // disable usage of m_lastGCI
subbPtr.p->m_subPtrI= subPtr.i;
DBUG_PRINT("info",("subscriber: %u[%u,%u] subscription: %u[%u,%u] "
@@ -2012,9 +2085,9 @@
void
Suma::sendSubStartComplete(Signal* signal,
- SubscriberPtr subbPtr,
- Uint32 firstGCI,
- SubscriptionData::Part part)
+ SubscriberPtr subbPtr,
+ Uint32 firstGCI,
+ SubscriptionData::Part part)
{
jam();
DBUG_ENTER("Suma::sendSubStartComplete");
@@ -2735,6 +2808,9 @@
ndbrequire(b_bufferLock == trigId);
+ memcpy(b_buffer + b_trigBufferSize, trg->getData(), 4 * dataLen);
+ b_trigBufferSize += dataLen;
+
// printf("before values %u %u %u\n",trigId, dataLen, b_trigBufferSize);
} else {
jam();
@@ -2747,10 +2823,11 @@
} else {
ndbrequire(f_bufferLock == trigId);
}
+
+ memcpy(f_buffer + f_trigBufferSize, trg->getData(), 4 * dataLen);
+ f_trigBufferSize += dataLen;
}
- memcpy(b_buffer + b_trigBufferSize, trg->getData(), 4 * dataLen);
- b_trigBufferSize += dataLen;
DBUG_VOID_RETURN;
}
@@ -2760,29 +2837,18 @@
#endif
Uint32
-Suma::getStoreBucket(Uint32 v)
-{
- // id will contain id to responsible suma or
- // RNIL if we don't have nodegroup info yet
-
- const Uint32 N = NO_OF_BUCKETS;
- const Uint32 D = v % N; // Distibution key
- return D;
-}
-
-Uint32
-Suma::getResponsibleSumaNodeId(Uint32 bucket)
+Suma::get_responsible_node(Uint32 bucket) const
{
// id will contain id to responsible suma or
// RNIL if we don't have nodegroup info yet
jam();
Uint32 node;
- Bucket* ptr= c_buckets + bucket;
+ const Bucket* ptr= c_buckets + bucket;
for(Uint32 i = 0; i<MAX_REPLICAS; i++)
{
node= ptr->m_nodes[i];
- if(c_started_nodes.get(node))
+ if(c_alive_nodes.get(node))
{
break;
}
@@ -2800,57 +2866,35 @@
return node;
}
-Uint32
-Suma::decideWhoToSend(Uint32 nBucket, Uint32 gci){
- bool replicaFlag = true;
- Uint32 nId = RNIL;
-
- // bucket active/not active set by GCP_COMPLETE
- if (c_buckets[nBucket].active) {
- if (c_buckets[nBucket].handover && c_buckets[nBucket].handoverGCI <= gci) {
- jam();
- replicaFlag = true; // let the other node send this
- nId = RNIL;
- // mark this as started, if we get a node failiure now we have some lost stuff
- c_buckets[nBucket].handover_started = true;
- } else {
- jam();
- replicaFlag = false;
- nId = refToNode(reference());
- }
- } else {
- nId = getResponsibleSumaNodeId(nBucket);
- replicaFlag = !(nId == refToNode(reference()));
-
- if (!replicaFlag) {
- if (!c_buckets[nBucket].handover) {
- jam();
- // appearently a node has failed and we are taking over sending
- // from that bucket. Now we need to go back to latest completed
- // GCI. Handling will depend on Subscriber and Subscription
-
- // TODO, for now we make an easy takeover
- if (gci < c_nodeFailGCI)
- c_lastInconsistentGCI = gci;
-
- // we now have responsability for this bucket and we're actively
- // sending from that
- c_buckets[nBucket].active = true;
- DBUG_PRINT("info",("Takeover Bucket %u", nBucket));
- } else if (c_buckets[nBucket].handoverGCI > gci) {
- jam();
- replicaFlag = true; // handover going on, but don't start sending yet
- nId = RNIL;
- } else {
- jam();
- DBUG_PRINT("info",("Possible error: Will send from GCI = %u", gci));
- }
+Uint32
+Suma::get_responsible_node(Uint32 bucket, const NdbNodeBitmask& mask) const
+{
+ jam();
+ Uint32 node;
+ const Bucket* ptr= c_buckets + bucket;
+ for(Uint32 i = 0; i<MAX_REPLICAS; i++)
+ {
+ node= ptr->m_nodes[i];
+ if(mask.get(node))
+ {
+ return node;
}
}
- DBUG_PRINT("info",("Suma:bucket %u, responsible id = %u, replicaFlag = %u",
- nBucket, nId, (Uint32)replicaFlag));
- return replicaFlag;
+ return 0;
+}
+
+bool
+Suma::check_switchover(Uint32 bucket, Uint32 gci)
+{
+ const Uint32 send_mask = (Bucket::BUCKET_STARTING | Bucket::BUCKET_TAKEOVER);
+ bool send = c_buckets[bucket].m_state & send_mask;
+ ndbassert(m_switchover_buckets.get(bucket));
+ if(unlikely(gci >= c_buckets[bucket].m_switchover_gci))
+ {
+ return send;
+ }
+ return !send;
}
void
@@ -2877,99 +2921,97 @@
f_bufferLock = 0;
b_bufferLock = 0;
- bool replicaFlag = decideWhoToSend(getStoreBucket(hashValue), gci);
- if (replicaFlag) {
- jam();
- DBUG_PRINT("info",("Other node sending"));
- c_failoverBuffer.subTableData(gci,NULL,0);
- DBUG_VOID_RETURN;
- }
- Uint32 sz = trg->getNoOfPrimaryKeyWords()+trg->getNoOfAfterValueWords();
- ndbrequire(sz == f_trigBufferSize);
-
- /**
- * Reformat as "all headers" + "all data"
- */
- Uint32 dataLen = 0;
- Uint32 noOfAttrs = 0;
- Uint32 * src = f_buffer;
- Uint32 * headers = signal->theData + 25;
- Uint32 * dst = signal->theData + 25 + MAX_ATTRIBUTES_IN_TABLE;
-
- LinearSectionPtr ptr[3];
- int nptr;
-
- ptr[0].p = headers;
- ptr[1].p = dst;
-
- while(sz > 0){
- jam();
- Uint32 tmp = * src ++;
- * headers ++ = tmp;
- Uint32 len = AttributeHeader::getDataSize(tmp);
- memcpy(dst, src, 4 * len);
- dst += len;
- src += len;
+ Uint32 bucket= hashValue % c_no_of_buckets;
+ m_max_seen_gci = (gci > m_max_seen_gci ? gci : m_max_seen_gci);
+ if(m_active_buckets.get(bucket) ||
+ (m_switchover_buckets.get(bucket) && (check_switchover(bucket, gci))))
+ {
+ m_max_sent_gci = (gci > m_max_sent_gci ? gci : m_max_sent_gci);
+ Uint32 sz = trg->getNoOfPrimaryKeyWords()+trg->getNoOfAfterValueWords();
+ ndbrequire(sz == f_trigBufferSize);
+
+ /**
+ * Reformat as "all headers" + "all data"
+ */
+ Uint32 dataLen = 0;
+ Uint32 noOfAttrs = 0;
+ Uint32 * src = f_buffer;
+ Uint32 * headers = signal->theData + 25;
+ Uint32 * dst = signal->theData + 25 + MAX_ATTRIBUTES_IN_TABLE;
+
+ LinearSectionPtr ptr[3];
+ int nptr;
+
+ ptr[0].p = headers;
+ ptr[1].p = dst;
+
+ while(sz > 0){
+ jam();
+ Uint32 tmp = * src ++;
+ * headers ++ = tmp;
+ Uint32 len = AttributeHeader::getDataSize(tmp);
+ memcpy(dst, src, 4 * len);
+ dst += len;
+ src += len;
+
+ noOfAttrs++;
+ dataLen += len;
+ sz -= (1 + len);
+ }
+ ndbrequire(sz == 0);
+
+ ptr[0].sz = noOfAttrs;
+ ptr[1].sz = dataLen;
+
+ if (b_trigBufferSize > 0) {
+ jam();
+ ptr[2].p = b_buffer;
+ ptr[2].sz = b_trigBufferSize;
+ nptr = 3;
+ } else {
+ jam();
+ nptr = 2;
+ }
+
+ /**
+ * Signal to subscriber(s)
+ */
+ ndbrequire(tabPtr.p = c_tablePool.getPtr(tabPtr.i));
+
+ SubTableData * data = (SubTableData*)signal->getDataPtrSend();//trg;
+ data->gci = gci;
+ data->tableId = tabPtr.p->m_tableId;
+ data->operation = event;
+ data->noOfAttributes = noOfAttrs;
+ data->dataSize = dataLen;
+ data->logType = 0;
- noOfAttrs++;
- dataLen += len;
- sz -= (1 + len);
- }
- ndbrequire(sz == 0);
-
- ptr[0].sz = noOfAttrs;
- ptr[1].sz = dataLen;
-
- if (b_trigBufferSize > 0) {
- jam();
- ptr[2].p = b_buffer;
- ptr[2].sz = b_trigBufferSize;
- nptr = 3;
- } else {
- jam();
- nptr = 2;
- }
-
- /**
- * Signal to subscriber(s)
- */
- ndbrequire(tabPtr.p = c_tablePool.getPtr(tabPtr.i));
-
- SubTableData * data = (SubTableData*)signal->getDataPtrSend();//trg;
- data->gci = gci;
- data->tableId = tabPtr.p->m_tableId;
- data->operation = event;
- data->noOfAttributes = noOfAttrs;
- data->dataSize = dataLen;
- data->logType = 0;
- if (c_lastInconsistentGCI == data->gci) {
- data->setGCINotConsistent();
- }
-
- {
- LocalDLList<Subscriber>
- subscribers(c_subscriberPool,tabPtr.p->c_subscribers);
- SubscriberPtr subbPtr;
- for(subscribers.first(subbPtr);!subbPtr.isNull();subscribers.next(subbPtr))
{
- if (c_handoverToDo && subbPtr.p->m_firstGCI > gci)
+ LocalDLList<Subscriber> list(c_subscriberPool,tabPtr.p->c_subscribers);
+ SubscriberPtr subbPtr;
+ for(list.first(subbPtr); !subbPtr.isNull(); list.next(subbPtr))
{
- DBUG_PRINT("info",("c_handoverToDo: %d, m_firstGCI = %d, gci = %d",
- c_handoverToDo, subbPtr.p->m_firstGCI, gci ));
- jam();
- // we're restarting and waiting for the right gci
- continue;
+ DBUG_PRINT("info",("GSN_SUB_TABLE_DATA to node %d",
+ refToNode(subbPtr.p->m_senderRef)));
+ data->senderData = subbPtr.p->m_senderData;
+ sendSignal(subbPtr.p->m_senderRef, GSN_SUB_TABLE_DATA, signal,
+ SubTableData::SignalLength, JBB, ptr, nptr);
}
-
- DBUG_PRINT("info",("GSN_SUB_TABLE_DATA to node %d",
- refToNode(subbPtr.p->m_senderRef)));
- data->senderData = subbPtr.p->m_senderData;
- sendSignal(subbPtr.p->m_senderRef, GSN_SUB_TABLE_DATA, signal,
- SubTableData::SignalLength, JBB, ptr, nptr);
}
}
-
+ else
+ {
+ Uint32* dst;
+ Uint32 sz = f_trigBufferSize + b_trigBufferSize;
+ if((dst = get_buffer_ptr(bucket, gci, sz)))
+ {
+ memcpy(dst, f_buffer, f_trigBufferSize << 2);
+ dst += f_trigBufferSize;
+ memcpy(dst, b_buffer, b_trigBufferSize << 2);
+ }
+ }
+
DBUG_VOID_RETURN;
}
@@ -2983,7 +3025,7 @@
rep->senderRef = reference();
Uint32 gci = rep->gci;
- c_lastCompleteGCI = gci;
+ m_last_complete_gci = gci;
/**
* Signal to subscriber(s)
@@ -2995,18 +3037,10 @@
if (it.curr.p->m_state == Table::DEFINING)
continue;
- LocalDLList<Subscriber>
- subbs(c_subscriberPool,it.curr.p->c_subscribers);
+ LocalDLList<Subscriber>list(c_subscriberPool,it.curr.p->c_subscribers);
SubscriberPtr subbPtr;
- for(subbs.first(subbPtr);!subbPtr.isNull();subbs.next(subbPtr))
+ for(list.first(subbPtr);!subbPtr.isNull();list.next(subbPtr))
{
-#if 0
- if (subbPtr.p->m_firstGCI > gci) {
- jam();
- // we don't send SUB_GCP_COMPLETE_REP for incomplete GCI's
- continue;
- }
-#endif
rep->senderData = subbPtr.p->m_senderData;
#if PRINT_ONLY
@@ -3019,38 +3053,87 @@
#endif
}
}
+
+ Ptr<Gcp_record> gcp;
+ if(c_gcp_list.seize(gcp))
+ {
+ gcp.p->m_gci = gci;
+ gcp.p->m_subscribers = c_subscriber_nodes;
+ }
- if (c_handoverToDo) {
- jam();
- c_handoverToDo = false;
- for( int i = 0; i < NO_OF_BUCKETS; i++) {
- if (c_buckets[i].handover) {
- if (c_buckets[i].handoverGCI > gci) {
- jam();
- c_handoverToDo = true; // still waiting for the right GCI
- break; /* since all handover should happen at the same time
- * we can break here
- */
- } else {
- c_buckets[i].handover = false;
-#ifdef HANDOVER_DEBUG
- ndbout_c("Handover Bucket %u", i);
-#endif
- if (getResponsibleSumaNodeId(i) == refToNode(reference())) {
- // my bucket to be handed over to me
- ndbrequire(!c_buckets[i].active);
- jam();
- c_buckets[i].active = true;
- } else {
- // someone else's bucket to handover to
- ndbrequire(c_buckets[i].active);
- jam();
- c_buckets[i].active = false;
- }
+ /**
+ *
+ */
+ if(!m_switchover_buckets.isclear())
+ {
+ Uint32 i = m_switchover_buckets.find(0);
+ for(; i != Bucket_mask::NotFound; i = m_switchover_buckets.find(i + 1))
+ {
+ if(c_buckets[i].m_switchover_gci == gci)
+ {
+ Uint32 state = c_buckets[i].m_state;
+ m_switchover_buckets.clear(i);
+ printf("switchover complete bucket %d state: %x", i, state);
+ if(state & Bucket::BUCKET_STARTING)
+ {
+ /**
+ * NR case
+ */
+ m_active_buckets.set(i);
+ c_buckets[i].m_state &= ~(Uint32)Bucket::BUCKET_STARTING;
+ ndbout_c("starting");
+ }
+ else if(state & Bucket::BUCKET_TAKEOVER)
+ {
+ /**
+ * NF case
+ */
+ Bucket* bucket= c_buckets + i;
+ Page_pos pos= bucket->m_buffer_head;
+ ndbrequire(pos.m_max_gci < gci);
+
+ Buffer_page* page= (Buffer_page*)(m_tup->page+pos.m_page_id);
+ ndbout_c("takeover %d", pos.m_page_id);
+ page->m_max_gci = pos.m_max_gci;
+ page->m_words_used = pos.m_page_pos;
+ page->m_next_page = RNIL;
+ memset(&bucket->m_buffer_head, 0, sizeof(bucket->m_buffer_head));
+ bucket->m_buffer_head.m_page_id = RNIL;
+ bucket->m_buffer_head.m_page_pos = Buffer_page::DATA_WORDS;
+
+ m_active_buckets.set(i);
+ c_buckets[i].m_state &= ~(Uint32)Bucket::BUCKET_TAKEOVER;
+ }
+ else
+ {
+ /**
+ * NR, living node
+ */
+ ndbrequire(state & Bucket::BUCKET_HANDOVER);
+ c_buckets[i].m_state &= ~(Uint32)Bucket::BUCKET_HANDOVER;
+ ndbout_c("handover");
}
}
}
}
+
+ /**
+ * Add GCP COMPLETE REP to buffer
+ */
+ for(Uint32 i = 0; i<c_no_of_buckets; i++)
+ {
+ if(m_active_buckets.get(i))
+ continue;
+
+ if(c_buckets[i].m_buffer_tail != RNIL)
+ {
+ Uint32* dst;
+ if((dst= get_buffer_ptr(i, gci, 0)) == 0)
+ {
+ ndbrequire(false);
+ }
+ }
+ }
}
void
@@ -3059,9 +3142,10 @@
jamEntry();
DBUG_ENTER("Suma::execCREATE_TAB_CONF");
+#if 0
CreateTabConf * const conf = (CreateTabConf*)signal->getDataPtr();
Uint32 tableId = conf->senderData;
-#if 0
+
TablePtr tabPtr;
initTable(signal,tableId,tabPtr);
#endif
@@ -3096,14 +3180,14 @@
tabPtr.p->m_triggerIds[1] = ILLEGAL_TRIGGER_ID;
tabPtr.p->m_triggerIds[2] = ILLEGAL_TRIGGER_ID;
- if (getResponsibleSumaNodeId(0) != refToNode(reference()))
+ if (get_responsible_node(0) != refToNode(reference()))
{
DBUG_VOID_RETURN;
}
// responsible for bucket 0 sends info to API
SubTableData * data = (SubTableData*)signal->getDataPtrSend();
- data->gci = c_lastCompleteGCI+1;
+ data->gci = m_last_complete_gci+1;
data->tableId = tableId;
data->operation = NdbDictionary::Event::_TE_DROP;
data->noOfAttributes = 0;
@@ -3159,14 +3243,14 @@
tabPtr.p->m_state = Table::ALTERED;
// triggers must be removed, waiting for sub stop req for that
- if (getResponsibleSumaNodeId(0) != refToNode(reference()))
+ if (get_responsible_node(0) != refToNode(reference()))
{
DBUG_VOID_RETURN;
}
// responsible for bucket 0 sends info to API
SubTableData * data = (SubTableData*)signal->getDataPtrSend();
- data->gci = c_lastCompleteGCI+1;
+ data->gci = m_last_complete_gci+1;
data->tableId = tableId;
data->operation = NdbDictionary::Event::_TE_ALTER;
data->noOfAttributes = 0;
@@ -3211,15 +3295,6 @@
{
jam();
- SubGcpCompleteAck * const ack = (SubGcpCompleteAck*)signal->getDataPtr();
-
- Uint32 gci = ack->rep.gci;
-
-#ifdef EVENT_DEBUG
- ndbout_c("Suma::runSUB_GCP_COMPLETE_ACK gci = %u", gci);
-#endif
-
- c_failoverBuffer.subGcpCompleteRep(gci);
}
void
@@ -3232,94 +3307,51 @@
Uint32 gci = ack->rep.gci;
Uint32 senderRef = ack->rep.senderRef;
Uint32 senderData = ack->rep.senderData;
-
+
if (refToBlock(senderRef) == SUMA) {
jam();
// Ack from other SUMA
- runSUB_GCP_COMPLETE_ACK(signal);
+
+ for(Uint32 i = 0; i<c_no_of_buckets; i++)
+ {
+ release_gci(signal, i, gci);
+ }
return;
}
- jam();
// Ack from User and not an ack from other SUMA, redistribute in nodegroup
- static Uint32 last_gci= 0;
- if (last_gci < gci)
+ Uint32 nodeId = refToNode(senderRef);
+
+ jam();
+ Ptr<Gcp_record> gcp;
+ for(c_gcp_list.first(gcp); !gcp.isNull(); c_gcp_list.next(gcp))
{
- last_gci= gci;
-
- // tell the other SUMA's that I'm done with this GCI
- jam();
- NdbNodeBitmask tmp= c_alive_nodes;
- tmp.bitAND(c_nodes_in_nodegroup_mask);
- tmp.clear(getOwnNodeId());
- NodeReceiverGroup rg(SUMA, tmp);
- sendSignal(rg, GSN_SUB_GCP_COMPLETE_ACK, signal,
- SubGcpCompleteAck::SignalLength, JBB);
- }
-}
-
-static Uint32 tmpFailoverBuffer[512];
-//Suma::FailoverBuffer::FailoverBuffer(DataBuffer<15>::DataBufferPool & p)
-// : m_dataList(p),
-Suma::FailoverBuffer::FailoverBuffer()
- :
- c_gcis(tmpFailoverBuffer), c_sz(512), c_first(0), c_next(0), c_full(false)
-{
-}
-
-bool Suma::FailoverBuffer::subTableData(Uint32 gci, Uint32 *src, int sz)
-{
- bool ok = true;
-
- if (c_full) {
- ok = false;
-#ifdef EVENT_DEBUG
- ndbout_c("Suma::FailoverBuffer::SubTableData buffer full gci=%u");
-#endif
- } else {
- c_gcis[c_next] = gci;
- c_next++;
- if (c_next == c_sz) c_next = 0;
- if (c_next == c_first)
- c_full = true;
- // ndbout_c("%u %u %u",c_first,c_next,c_sz);
- }
- return ok;
-}
-bool Suma::FailoverBuffer::subGcpCompleteRep(Uint32 gci)
-{
- bool ok = true;
-
- // ndbout_c("Empty");
- while (true) {
- if (c_first == c_next && !c_full)
- break;
- if (c_gcis[c_first] > gci)
+ if(gcp.p->m_gci == gci)
+ {
+ gcp.p->m_subscribers.clear(nodeId);
+ if(!gcp.p->m_subscribers.isclear())
+ {
+ jam();
+ return;
+ }
break;
- c_full = false;
- c_first++;
- if (c_first == c_sz) c_first = 0;
- // ndbout_c("%u %u %u : ",c_first,c_next,c_sz);
+ }
}
-
- return ok;
-}
-bool Suma::FailoverBuffer::nodeFailRep()
-{
- bool ok = true;
- while (true) {
- if (c_first == c_next && !c_full)
- break;
-
-#ifdef EVENT_DEBUG
- ndbout_c("Suma::FailoverBuffer::NodeFailRep resending gci=%u", c_gcis[c_first]);
-#endif
- c_full = false;
- c_first++;
- if (c_first == c_sz) c_first = 0;
+
+ if(gcp.isNull())
+ {
+ ndbout_c("ACK wo/ gcp record");
+ }
+ else
+ {
+ c_gcp_list.release(gcp);
}
- return ok;
+
+ ack->rep.senderRef = reference();
+ NodeReceiverGroup rg(SUMA, c_nodes_in_nodegroup_mask);
+ sendSignal(rg, GSN_SUB_GCP_COMPLETE_ACK, signal,
+ SubGcpCompleteAck::SignalLength, JBB);
}
/**************************************************************
@@ -3842,7 +3874,7 @@
{
jam();
DBUG_ENTER("Suma::Restart::completeRestartingNode");
- SumaStartMeConf *conf= (SumaStartMeConf*)signal->getDataPtrSend();
+ //SumaStartMeConf *conf= (SumaStartMeConf*)signal->getDataPtrSend();
suma.sendSignal(sumaRef, GSN_SUMA_START_ME_CONF, signal,
SumaStartMeConf::SignalLength, JBB);
nodeId = 0;
@@ -3860,69 +3892,41 @@
SumaHandoverReq const * req = (SumaHandoverReq *)signal->getDataPtr();
Uint32 gci = req->gci;
- Uint32 new_gci = getFirstGCI(signal);
+ Uint32 nodeId = req->nodeId;
+ Uint32 new_gci = m_last_complete_gci + MAX_CONCURRENT_GCP + 1;
+
+ Uint32 start_gci = (gci > new_gci ? gci : new_gci);
+ // mark all active buckets really belonging to restarting SUMA
- if (new_gci > gci)
+ Bucket_mask tmp;
+ for( Uint32 i = 0; i < c_no_of_buckets; i++)
{
- gci = new_gci;
- }
-
- { // all recreated subscribers at restarting SUMA start at same GCI
- KeyTable<Table>::Iterator it;
- for(c_tables.first(it);!it.isNull();c_tables.next(it))
+ if(get_responsible_node(i) == nodeId)
{
- LocalDLList<Subscriber>
- subscribers(c_subscriberPool,it.curr.p->c_subscribers);
- SubscriberPtr subbPtr;
- for(subscribers.first(subbPtr);
- !subbPtr.isNull();
- subscribers.next(subbPtr))
+ if (m_active_buckets.get(i))
{
- subbPtr.p->m_firstGCI = gci;
+ // I'm running this bucket but it should really be the restarted node
+ tmp.set(i);
+ m_active_buckets.clear(i);
+ m_switchover_buckets.set(i);
+ c_buckets[i].m_switchover_gci = start_gci;
+ c_buckets[i].m_state |= Bucket::BUCKET_HANDOVER;
+ ndbout_c("prepare to handover bucket: %d", i);
}
- }
- }
-
-#ifdef NODEFAIL_DEBUG
- ndbout_c("Suma::execSUMA_HANDOVER_REQ, gci = %u", gci);
-#endif
-
- c_handoverToDo = false;
- c_restart_server_node_id = 0;
- {
-#ifdef HANDOVER_DEBUG
- int c = 0;
-#endif
- for( int i = 0; i < NO_OF_BUCKETS; i++) {
- jam();
- if (getResponsibleSumaNodeId(i) == refToNode(reference())) {
-#ifdef HANDOVER_DEBUG
- c++;
-#endif
- jam();
- c_buckets[i].active = false;
- c_buckets[i].handoverGCI = gci;
- c_buckets[i].handover = true;
- c_buckets[i].handover_started = false;
- c_handoverToDo = true;
+ else if(m_switchover_buckets.get(i))
+ {
+ ndbout_c("dont handover bucket: %d %d", i, nodeId);
}
}
-#ifdef HANDOVER_DEBUG
- ndbout_c("prepared handover of bucket %u buckets", c);
-#endif
- }
-
- for (Uint32 i = 0; i < c_noNodesInGroup; i++) {
- jam();
- Uint32 ref = calcSumaBlockRef(c_nodesInGroup[i]);
- if (ref != reference()) {
- jam();
- sendSignal(ref, GSN_SUMA_HANDOVER_CONF, signal,
- SumaHandoverConf::SignalLength, JBB);
- }//if
}
- sendSTTORRY(signal);
+ SumaHandoverConf* conf= (SumaHandoverConf*)signal->getDataPtrSend();
+ tmp.copyto(BUCKET_MASK_SIZE, conf->theBucketMask);
+ conf->gci = start_gci;
+ conf->nodeId = getOwnNodeId();
+ sendSignal(calcSumaBlockRef(nodeId), GSN_SUMA_HANDOVER_CONF, signal,
+ SumaHandoverConf::SignalLength, JBB);
+
DBUG_VOID_RETURN;
}
@@ -3938,40 +3942,381 @@
jamEntry();
DBUG_ENTER("Suma::execSUMA_HANDOVER_CONF");
- Uint32 sumaRef = signal->getSendersBlockRef();
SumaHandoverConf const * conf = (SumaHandoverConf *)signal->getDataPtr();
Uint32 gci = conf->gci;
-
+ Uint32 nodeId = conf->nodeId;
+ Bucket_mask tmp;
+ tmp.assign(BUCKET_MASK_SIZE, conf->theBucketMask);
#ifdef HANDOVER_DEBUG
ndbout_c("Suma::execSUMA_HANDOVER_CONF, gci = %u", gci);
#endif
- /* TODO, if we are restarting several SUMA's (>2 in a nodegroup)
- * we have to collect all these conf's before proceding
+ for( Uint32 i = 0; i < c_no_of_buckets; i++)
+ {
+ if (tmp.get(i))
+ {
+ ndbrequire(get_responsible_node(i) == getOwnNodeId());
+ // We should run this bucket, but _nodeId_ is
+ c_buckets[i].m_switchover_gci = gci;
+ c_buckets[i].m_state |= Bucket::BUCKET_STARTING;
+ }
+ }
+
+ char buf[255];
+ tmp.getText(buf);
+ infoEvent("Suma: handover from node %d gci: %d buckets: %s (%d)",
+ nodeId, gci, buf, c_no_of_buckets);
+ m_switchover_buckets.bitOR(tmp);
+ DBUG_VOID_RETURN;
+}
+
+// Debug helper: streams the page id, write position and max gci of a
+// Page_pos to an NdbOut. m_last_gci is not printed.
+static
+NdbOut&
+operator<<(NdbOut & out, const Suma::Page_pos & pos)
+{
+ out << "[ Page_pos:"
+ << " m_page_id: " << pos.m_page_id
+ << " m_page_pos: " << pos.m_page_pos
+ << " m_max_gci: " << pos.m_max_gci
+ << " ]";
+ return out;
+}
+
+/**
+ * Reserve room for sz data words belonging to gci in the buffer of
+ * bucket buck, write the record header, and return a pointer to the first
+ * data word (the caller fills in the data).
+ *
+ * Record header, as written below:
+ * - same gci as the previous record and it fits on the page:
+ * one word, high bit set, low bits = data words + 1
+ * - otherwise: two words, (data words + 2) followed by gci
+ *
+ * When the record does not fit, a new page is seized, the old page's
+ * header fields are finalised and the two-word form is written at the
+ * start of the new page. Failure to seize a page hits ndbrequire(false).
+ */
+Uint32*
+Suma::get_buffer_ptr(Uint32 buck, Uint32 gci, Uint32 sz)
+{
+ sz += 1; // len
+ Bucket* bucket= c_buckets+buck;
+ // Work on a copy of the write position; it is stored back only on success
+ Page_pos pos= bucket->m_buffer_head;
+
+ // Only address arithmetic here; nothing is read through page/ptr until one
+ // of the branches below has decided that the record fits on this page
+ Buffer_page* page= (Buffer_page*)(m_tup->page+pos.m_page_id);
+ Uint32* ptr= page->m_data + pos.m_page_pos;
+
+ const bool same_gci = gci == pos.m_last_gci;
+
+ // Tentatively reserve the record (one header word + data)
+ pos.m_page_pos += sz;
+ pos.m_last_gci = gci;
+ Uint32 max = pos.m_max_gci > gci ? pos.m_max_gci : gci;
+
+ if(likely(same_gci && pos.m_page_pos <= Buffer_page::DATA_WORDS))
+ {
+ // Compact header: the gci is implied by the preceding record
+ pos.m_max_gci = max;
+ bucket->m_buffer_head = pos;
+ * ptr++ = (0x8000 << 16) | sz; // Same gci
+ return ptr;
+ }
+ else if(pos.m_page_pos + 1 <= Buffer_page::DATA_WORDS)
+ {
+ // Full header: one extra word is needed to carry the gci.
+ // The new-page branch below also jumps here after switching pages.
+loop:
+ pos.m_max_gci = max;
+ pos.m_page_pos += 1;
+ bucket->m_buffer_head = pos;
+ * ptr++ = (sz + 1);
+ * ptr++ = gci;
+ return ptr;
+ }
+ else
+ {
+ /**
+ * new page
+ * 1) save header on last page
+ * 2) seize new page
+ */
+ Uint32 next;
+ if(unlikely((next= seize_page()) == RNIL))
+ {
+ ndbrequire(false);
+ }
+
+ if(likely(pos.m_page_id != RNIL))
+ {
+ page->m_max_gci = pos.m_max_gci;
+ // Undo the tentative reservation: only words already written count
+ page->m_words_used = pos.m_page_pos - sz;
+ page->m_next_page= next;
+ }
+ else
+ {
+ // No current page yet: the new page is also the oldest (tail) page
+ bucket->m_buffer_tail = next;
+ }
+
+ // Restart the position on the new page; m_max_gci is set again at loop:
+ memset(&pos, 0, sizeof(pos));
+ pos.m_page_id = next;
+ pos.m_page_pos = sz;
+ pos.m_last_gci = gci;
+
+ page= (Buffer_page*)(m_tup->page+pos.m_page_id);
+ page->m_next_page= RNIL;
+ ptr= page->m_data;
+ goto loop; //
+ }
+}
+
+/**
+ * Pop one buffer page off the free list and return its page id.
+ *
+ * If the free list is empty, a Page_chunk record is seized and a run of
+ * consecutive pages is requested from TUP (allocConsPages, asking for 16).
+ * The pages actually obtained are threaded onto the free list and the
+ * pop is retried.
+ *
+ * Returns RNIL when the free list is empty and no Page_chunk record can
+ * be seized. Getting zero pages from TUP hits ndbrequire.
+ */
+Uint32
+Suma::seize_page()
+{
+loop:
+ Ptr<Page_chunk> ptr;
+ Uint32 ref= m_first_free_page;
+ if(likely(ref != RNIL))
+ {
+ // Index the page array first and cast afterwards, exactly like the
+ // other page lookups in this file, so a page id is always scaled by
+ // the element size of m_tup->page.
+ m_first_free_page = ((Buffer_page*)(m_tup->page+ref))->m_next_page;
+ Uint32 chunk = ((Buffer_page*)(m_tup->page+ref))->m_page_chunk_ptr_i;
+ c_page_chunk_pool.getPtr(ptr, chunk);
+ ndbassert(ptr.p->m_free);
+ ptr.p->m_free--;
+ return ref;
+ }
+
+ if(!c_page_chunk_pool.seize(ptr))
+ return RNIL;
+
+ Uint32 count;
+ m_tup->allocConsPages(16, count, ref);
+ ndbrequire(count > 0);
+
+ ndbout_c("alloc_chunk(%d %d) - ", ref, count);
+
+ m_first_free_page = ptr.p->m_page_id = ref;
+ ptr.p->m_size = count;
+ ptr.p->m_free = count;
+
+ // Chain the new pages together: page i points at page i+1 ...
+ Buffer_page* page;
+ for(Uint32 i = 0; i<count; i++)
+ {
+ page = (Buffer_page*)(m_tup->page+ref);
+ page->m_page_state= SUMA_SEQUENCE;
+ page->m_page_chunk_ptr_i = ptr.i;
+ page->m_next_page = ++ref;
+ }
+ // ... and the last page terminates the free list
+ page->m_next_page = RNIL;
+
+ goto loop;
+}
+
+/**
+ * Return a buffer page to the free list.
+ *
+ * page_id is the id of the page and page is its address; the caller
+ * supplies both. The page is pushed at the front of the free list and the
+ * free counter of the Page_chunk it belongs to is incremented.
+ */
+void
+Suma::free_page(Uint32 page_id, Buffer_page* page)
+{
+ Ptr<Page_chunk> ptr;
+ // Only pages that were set up by seize_page() may be freed here
+ ndbrequire(page->m_page_state == SUMA_SEQUENCE);
+
+ Uint32 chunk= page->m_page_chunk_ptr_i;
+
+ c_page_chunk_pool.getPtr(ptr, chunk);
+
+ ptr.p->m_free ++;
+ page->m_next_page = m_first_free_page;
+ // A chunk can never have more free pages than it contains
+ ndbrequire(ptr.p->m_free <= ptr.p->m_size);
+
+ m_first_free_page = page_id;
+}
+
+/**
+ * Drop buffered data of bucket buck that belongs to gci or older.
+ *
+ * At most one page is released per call; when a page was released a
+ * CONTINUEB (RELEASE_GCI, buck, gci) is sent to SUMA_REF to carry on.
+ * Nothing is released while the bucket is in TAKEOVER or RESEND state.
+ */
+void
+Suma::release_gci(Signal* signal, Uint32 buck, Uint32 gci)
+{
+ Bucket* bucket= c_buckets+buck;
+ Uint32 tail= bucket->m_buffer_tail;
+ Page_pos head= bucket->m_buffer_head;
+ Uint32 max_acked = bucket->m_max_acked_gci;
+
+ const Uint32 mask = Bucket::BUCKET_TAKEOVER | Bucket::BUCKET_RESEND;
+ if(unlikely(bucket->m_state & mask))
+ {
+ jam();
+ ndbout_c("release_gci(%d, %d) -> node failure -> abort", buck, gci);
+ return;
+ }
+
+ // Remember the highest gci seen here; start_resend() resumes after it
+ bucket->m_max_acked_gci = (max_acked > gci ? max_acked : gci);
+ if(unlikely(tail == RNIL))
+ {
+ // No page has been allocated for this bucket
+ return;
+ }
+
+ if(tail == head.m_page_id)
+ {
+ // Oldest page is the page currently being written: it is kept, and
+ // rewound to position 0 if everything on it is covered by gci
+ if(gci >= head.m_max_gci)
+ {
+ jam();
+ head.m_page_pos = 0;
+ head.m_max_gci = gci;
+ bucket->m_buffer_head = head;
+ }
+ return;
+ }
+ else
+ {
+ jam();
+ Buffer_page* page= (Buffer_page*)(m_tup->page+tail);
+ Uint32 max_gci = page->m_max_gci;
+ Uint32 next_page = page->m_next_page;
+
+ ndbassert(max_gci);
+
+ if(gci >= max_gci)
+ {
+ jam();
+ // Whole page is covered by gci: free it and advance the tail
+ free_page(tail, page);
+
+ bucket->m_buffer_tail = next_page;
+ signal->theData[0] = SumaContinueB::RELEASE_GCI;
+ signal->theData[1] = buck;
+ signal->theData[2] = gci;
+ sendSignal(SUMA_REF, GSN_CONTINUEB, signal, 3, JBB);
+ return;
+ }
+ else
+ {
+ ndbout_c("do nothing...");
+ }
+ }
+}
+
+/**
+ * Begin resending the buffered data of bucket buck.
+ *
+ * If the bucket has no page, or nothing newer than m_max_acked_gci is
+ * buffered, the bucket is simply marked active. Otherwise the bucket is
+ * put in TAKEOVER|RESEND state, registered for switchover at max gci + 1,
+ * and a CONTINUEB (RESEND_BUCKET, buck, min gci, pos 0, last gci 0) is
+ * sent to this block to start the walk in resend_bucket().
+ */
+void
+Suma::start_resend(Signal* signal, Uint32 buck)
+{
+ printf("start_resend(%d, ", buck);
+
+ /**
+ * Resend from m_max_acked_gci + 1 until max_gci + 1
*/
+ Bucket* bucket= c_buckets + buck;
+ Page_pos pos= bucket->m_buffer_head;
+
+ if(pos.m_page_id == RNIL)
+ {
+ jam();
+ m_active_buckets.set(buck);
+ ndbout_c("empty bucket(RNIL) -> active");
+ return;
+ }
- // restarting node is now prepared and ready
- c_started_nodes.set(refToNode(sumaRef)); /* !! important to do before
- * below since it affects
- * getResponsibleSumaNodeId()
- */
+ Uint32 min= bucket->m_max_acked_gci + 1;
+ Uint32 max = pos.m_max_gci;
- c_handoverToDo = false;
- // mark all active buckets really belonging to restarting SUMA
- for( int i = 0; i < NO_OF_BUCKETS; i++) {
- if (c_buckets[i].active) {
- // I'm running this bucket
- if (getResponsibleSumaNodeId(i) == refToNode(sumaRef)) {
- // but it should really be the restarted node
- c_buckets[i].handoverGCI = gci;
- c_buckets[i].handover = true;
- c_buckets[i].handover_started = false;
- c_handoverToDo = true;
- }
+ if(min > max)
+ {
+ ndbrequire(pos.m_page_pos == 0);
+ ndbrequire(pos.m_page_id == bucket->m_buffer_tail);
+ m_active_buckets.set(buck);
+ ndbout_c("empty bucket -> active");
+ return;
+ }
+
+ bucket->m_switchover_gci = max + 1;
+ bucket->m_state |= (Bucket::BUCKET_TAKEOVER | Bucket::BUCKET_RESEND);
+
+
+ m_switchover_buckets.set(buck);
+
+ // The CONTINUEB type must be set explicitly; otherwise theData[0] holds
+ // whatever the previous user of the signal object left there.
+ signal->theData[0] = SumaContinueB::RESEND_BUCKET;
+ signal->theData[1] = buck;
+ signal->theData[2] = min;
+ signal->theData[3] = 0;
+ signal->theData[4] = 0;
+ sendSignal(reference(), GSN_CONTINUEB, signal, 5, JBB);
+
+ ndbout_c("min: %d - max: %d)", min, max);
+ ndbrequire(max >= min);
+}
+
+/**
+ * One step of walking the buffered pages of bucket buck.
+ *
+ * Starting at word offset pos of the oldest (tail) page, records whose
+ * gci is below min_gci are skipped; the walk stops after the first
+ * record with gci >= min_gci or at the end of the page. A page that has
+ * been walked to its end, or whose max gci is below min_gci, is freed
+ * and the tail advances. last_gci carries the gci of the previous record
+ * so that compact "same gci" headers can be decoded across calls.
+ *
+ * Unless the bucket ran out of pages, the next step is scheduled with a
+ * CONTINUEB (RESEND_BUCKET). The tail page is left alone, and the step is
+ * retried with a delay, while it is still the page being written to.
+ */
+void
+Suma::resend_bucket(Signal* signal, Uint32 buck, Uint32 min_gci,
+ Uint32 pos, Uint32 last_gci)
+{
+ Bucket* bucket= c_buckets+buck;
+ Uint32 tail= bucket->m_buffer_tail;
+
+ // Validate the tail page id before it is used to address a page below
+ ndbrequire(tail != RNIL);
+
+ Buffer_page* page= (Buffer_page*)(m_tup->page+tail);
+ Uint32 max_gci = page->m_max_gci;
+ Uint32 next_page = page->m_next_page;
+ Uint32 *ptr = page->m_data + pos;
+ Uint32 *end = page->m_data + page->m_words_used;
+ bool delay = false;
+
+ if(tail == bucket->m_buffer_head.m_page_id)
+ {
+ /**
+ * Don't dare to resend page while writing to it...
+ */
+ delay = true;
+ goto next;
+ }
+
+ if(pos == 0 && min_gci > max_gci)
+ {
+ // Nothing on this page is new enough: drop the whole page
+ free_page(tail, page);
+ tail = bucket->m_buffer_tail = next_page;
+ goto next;
+ }
+
+#if 0
+ for(Uint32 i = 0; i<page->m_words_used; i++)
+ {
+ printf("%.8x ", page->m_data[i]);
+ if(((i + 1) % 8) == 0)
+ printf("\n");
+ }
+ printf("\n");
+#endif
+
+ // The first record on a page must carry an explicit gci (no flag bits)
+ if(pos == 0)
+ ndbrequire(((* ptr) >> 16) == 0);
+
+ while(ptr < end)
+ {
+ Uint32 *src = ptr;
+ Uint32 tmp = * src++;
+ Uint32 sz = tmp & 0xFFFF;
+
+ // High bit clear: the header is followed by the gci of the record
+ if(! (tmp & (0x8000 << 16)))
+ {
+ last_gci = * src ++;
+ }
+
+ ptr += sz;
+ if(last_gci < min_gci)
+ {
+ continue;
}
+
+ break;
}
- DBUG_VOID_RETURN;
+
+ if(ptr == end)
+ {
+ /**
+ * release...
+ */
+ free_page(tail, page);
+ tail = bucket->m_buffer_tail = next_page;
+ pos = 0;
+ last_gci = 0;
+ }
+ else
+ {
+ pos = (ptr - page->m_data);
+ }
+
+next:
+ if(tail == RNIL)
+ {
+ bucket->m_state &= ~(Uint32)Bucket::BUCKET_RESEND;
+ ndbassert(! (bucket->m_state & Bucket::BUCKET_TAKEOVER));
+ ndbout_c("resend done...");
+ return;
+ }
+
+ signal->theData[0] = SumaContinueB::RESEND_BUCKET;
+ signal->theData[1] = buck;
+ signal->theData[2] = min_gci;
+ signal->theData[3] = pos;
+ signal->theData[4] = last_gci;
+ if(!delay)
+ sendSignal(SUMA_REF, GSN_CONTINUEB, signal, 5, JBB);
+ else
+ sendSignalWithDelay(SUMA_REF, GSN_CONTINUEB, signal, 50, 5);
}
template void append(DataBuffer<11>&,SegmentedSectionPtr,SectionSegmentPool&);
--- 1.14/storage/ndb/src/kernel/blocks/suma/Suma.hpp Fri May 13 12:58:12 2005
+++ 1.15/storage/ndb/src/kernel/blocks/suma/Suma.hpp Wed May 18 14:10:24 2005
@@ -24,6 +24,7 @@
#include <SLList.hpp>
#include <DLList.hpp>
+#include <DLFifoList.hpp>
#include <KeyTable.hpp>
#include <DataBuffer.hpp>
#include <SignalCounter.hpp>
@@ -138,8 +139,6 @@
Uint32 m_senderRef;
Uint32 m_senderData;
Uint32 m_subPtrI; //reference to subscription
- Uint32 m_firstGCI; // first GCI to send
- Uint32 m_lastGCI; // last acnowledged GCI
Uint32 nextList;
union { Uint32 nextPool; Uint32 prevList; };
@@ -320,21 +319,7 @@
Uint32 m_table_ptrI;
};
typedef Ptr<Subscription> SubscriptionPtr;
-
- struct Bucket {
- bool active;
- bool handover;
- bool handover_started;
- Uint32 m_nodes[MAX_REPLICAS];
- Uint32 handoverGCI;
- };
- STATIC_CONST( NO_OF_BUCKETS = 24 ); // 24 = 4!
- struct Bucket c_buckets[NO_OF_BUCKETS];
- Uint32 c_no_of_buckets;
-
- bool c_handoverToDo;
- Uint32 c_lastCompleteGCI;
-
+
/**
*
*/
@@ -356,19 +341,6 @@
ArrayPool<SyncRecord> c_syncPool;
DataBuffer<15>::DataBufferPool c_dataBufferPool;
- /**
- * for restarting Suma not to start sending data too early
- */
- Uint32 c_restart_server_node_id;
-
- /**
- * for flagging that a GCI containg inconsistent data
- * typically due to node failiure
- */
-
- Uint32 c_lastInconsistentGCI;
- Uint32 c_nodeFailGCI;
-
NodeBitmask c_failedApiNodes;
/**
@@ -397,30 +369,6 @@
void completeSubRemove(SubscriptionPtr subPtr);
Uint32 getFirstGCI(Signal* signal);
- Uint32 decideWhoToSend(Uint32 nBucket, Uint32 gci);
-
- struct FailoverBuffer {
- // FailoverBuffer(DataBuffer<15>::DataBufferPool & p);
- FailoverBuffer();
-
- bool subTableData(Uint32 gci, Uint32 *src, int sz);
- bool subGcpCompleteRep(Uint32 gci);
- bool nodeFailRep();
-
- // typedef DataBuffer<15> GCIDataBuffer;
- // GCIDataBuffer m_GCIDataBuffer;
- // GCIDataBuffer::DataBufferIterator m_GCIDataBuffer_it;
-
- Uint32 *c_gcis;
- int c_sz;
-
- // Uint32 *c_buf;
- // int c_buf_sz;
-
- int c_first;
- int c_next;
- bool c_full;
- } c_failoverBuffer;
/**
* Table admin
@@ -444,7 +392,6 @@
*/
void getNodeGroupMembers(Signal* signal);
- void send_start_me_req(Signal* signal);
void execSTTOR(Signal* signal);
void sendSTTORRY(Signal*);
@@ -453,8 +400,8 @@
void execREAD_NODESCONF(Signal* signal);
void execNODE_FAILREP(Signal* signal);
void execINCL_NODEREQ(Signal* signal);
- void execNODE_START_REP(Signal* signal);
void execSIGNAL_DROPPED_REP(Signal* signal);
+ void execAPI_START_REP(Signal* signal);
void execAPI_FAILREQ(Signal* signal) ;
void execSUB_GCP_COMPLETE_ACK(Signal* signal);
@@ -492,9 +439,6 @@
void execUTIL_SEQUENCE_REF(Signal* signal);
void execCREATE_SUBID_REQ(Signal* signal);
- Uint32 getStoreBucket(Uint32 v);
- Uint32 getResponsibleSumaNodeId(Uint32 D);
-
/**
* for Suma that is restarting another
*/
@@ -538,16 +482,123 @@
*/
NodeId c_masterNodeId;
NdbNodeBitmask c_alive_nodes;
- NdbNodeBitmask c_started_nodes;
- NdbNodeBitmask c_starting_nodes;
/**
+ * for restarting Suma not to start sending data too early
+ */
+ struct Startup
+ {
+ bool m_wait_handover;
+ Uint32 m_restart_server_node_id;
+ } c_startup;
+
+ NodeBitmask c_connected_nodes; // (NODE/API) START REP / (API/NODE) FAIL REQ
+ NodeBitmask c_subscriber_nodes; //
+
+ /**
* for all Suma's to keep track of other Suma's in Node group
*/
Uint32 c_nodeGroup;
Uint32 c_noNodesInGroup;
Uint32 c_nodesInGroup[MAX_REPLICAS];
NdbNodeBitmask c_nodes_in_nodegroup_mask; // NodeId's of nodes in nodegroup
+
+ void send_start_me_req(Signal* signal);
+ void check_start_handover(Signal* signal);
+ void send_handover_req(Signal* signal);
+
+ Uint32 get_responsible_node(Uint32 B) const;
+ Uint32 get_responsible_node(Uint32 B, const NdbNodeBitmask& mask) const;
+ bool check_switchover(Uint32 bucket, Uint32 gci);
+
+public:
+ /**
+ * Write position inside a bucket's page buffer.
+ */
+ struct Page_pos
+ {
+ Uint32 m_page_id; // page currently written to, RNIL if none
+ Uint32 m_page_pos; // next free word in the page's m_data
+ Uint32 m_max_gci; // max gci on page
+ Uint32 m_last_gci; // last gci on page
+ };
+private:
+
+ /**
+ * Per-bucket state: switchover flags plus the chain of buffer pages
+ * (m_buffer_tail = oldest page, m_buffer_head = current write position).
+ */
+ struct Bucket
+ {
+ enum {
+ BUCKET_STARTING = 0x1 // On starting node
+ ,BUCKET_HANDOVER = 0x2 // On running node
+ ,BUCKET_TAKEOVER = 0x4 // On taking over node
+ ,BUCKET_RESEND = 0x8 // On taking over node
+ };
+ Uint32 m_state; // bitmask of the BUCKET_* flags above
+ Uint16 m_nodes[MAX_REPLICAS];
+ Uint32 m_switchover_gci; // gci at which a pending switchover completes
+ Uint32 m_max_acked_gci; // highest gci passed to release_gci()
+ Uint32 m_buffer_tail; // Page
+ Page_pos m_buffer_head;
+ };
+
+ /**
+ * Layout of one buffer page: five header words followed by the data
+ * area, 8192 words in total.
+ */
+ struct Buffer_page
+ {
+ STATIC_CONST( DATA_WORDS = 8192 - 5);
+ Uint32 m_page_state; // Used by TUP buddy algorithm
+ Uint32 m_page_chunk_ptr_i; // i-value of the owning Page_chunk
+ Uint32 m_next_page; // next page in the bucket chain or free list
+ Uint32 m_words_used; // set when the page is closed for writing
+ Uint32 m_max_gci; // set when the page is closed for writing
+ Uint32 m_data[DATA_WORDS];
+ };
+
+ STATIC_CONST( NO_OF_BUCKETS = 24 ); // 24 = 4*3*2*1!
+ Uint32 c_no_of_buckets;
+ struct Bucket c_buckets[NO_OF_BUCKETS];
+
+ STATIC_CONST( BUCKET_MASK_SIZE = (((NO_OF_BUCKETS+31)>> 5)) );
+ typedef Bitmask<BUCKET_MASK_SIZE> Bucket_mask;
+ Bucket_mask m_active_buckets;
+ Bucket_mask m_switchover_buckets;
+
+ class Dbtup* m_tup;
+ void init_buffers();
+ Uint32* get_buffer_ptr(Uint32 buck, Uint32 gci, Uint32 sz);
+ Uint32 seize_page();
+ void free_page(Uint32 page_id, Buffer_page* page);
+
+ void start_resend(Signal*, Uint32 bucket);
+ void resend_bucket(Signal*, Uint32 bucket, Uint32 gci,
+ Uint32 page_pos, Uint32 last_gci);
+ void release_gci(Signal*, Uint32 bucket, Uint32 gci);
+
+ Uint32 m_max_seen_gci; // FIRE_TRIG_ORD
+ Uint32 m_max_sent_gci; // FIRE_TRIG_ORD -> send
+ Uint32 m_last_complete_gci; // SUB_GCP_COMPLETE_REP
+
+ /**
+ * One completed gci awaiting acknowledgement. m_subscribers starts as a
+ * copy of c_subscriber_nodes and a node's bit is cleared when its
+ * SUB_GCP_COMPLETE_ACK for m_gci arrives.
+ */
+ struct Gcp_record
+ {
+ Uint32 m_gci;
+ NodeBitmask m_subscribers;
+ // Link fields; presumably used by ArrayPool / DLFifoList -- confirm
+ union {
+ Uint32 nextPool;
+ Uint32 nextList;
+ };
+ Uint32 prevList;
+ };
+ ArrayPool<Gcp_record> c_gcp_pool;
+ DLFifoList<Gcp_record> c_gcp_list;
+
+ /**
+ * Bookkeeping for one run of consecutive pages obtained from TUP in
+ * seize_page().
+ */
+ struct Page_chunk
+ {
+ Uint32 m_page_id; // id of the first page of the chunk
+ Uint32 m_size; // number of pages in the chunk
+ Uint32 m_free; // pages of the chunk currently on the free list
+ // Link fields; presumably used by ArrayPool -- confirm
+ union {
+ Uint32 nextPool;
+ Uint32 nextList;
+ };
+ Uint32 prevList;
+ };
+
+ Uint32 m_first_free_page;
+ ArrayPool<Page_chunk> c_page_chunk_pool;
};
#endif
--- 1.14/storage/ndb/src/kernel/blocks/suma/SumaInit.cpp Fri May 13 12:58:12 2005
+++ 1.15/storage/ndb/src/kernel/blocks/suma/SumaInit.cpp Wed May 18 14:10:24 2005
@@ -25,23 +25,24 @@
c_removeDataSubscribers(c_subscriberPool),
c_tables(c_tablePool),
c_subscriptions(c_subscriptionPool),
- Restart(*this)
+ Restart(*this),
+ c_gcp_list(c_gcp_pool)
{
c_masterNodeId = getOwnNodeId();
- c_nodeGroup = c_noNodesInGroup = 0;
+ c_no_of_buckets = c_nodeGroup = c_noNodesInGroup = 0;
for (int i = 0; i < MAX_REPLICAS; i++) {
c_nodesInGroup[i] = 0;
}
-
+
// Add received signals
addRecSignal(GSN_STTOR, &Suma::execSTTOR);
addRecSignal(GSN_NDB_STTOR, &Suma::execNDB_STTOR);
addRecSignal(GSN_DUMP_STATE_ORD, &Suma::execDUMP_STATE_ORD);
addRecSignal(GSN_READ_NODESCONF, &Suma::execREAD_NODESCONF);
+ addRecSignal(GSN_API_START_REP, &Suma::execAPI_START_REP, true);
addRecSignal(GSN_API_FAILREQ, &Suma::execAPI_FAILREQ);
addRecSignal(GSN_NODE_FAILREP, &Suma::execNODE_FAILREP);
- addRecSignal(GSN_NODE_START_REP, &Suma::execNODE_START_REP);
addRecSignal(GSN_INCL_NODEREQ, &Suma::execINCL_NODEREQ);
addRecSignal(GSN_CONTINUEB, &Suma::execCONTINUEB);
addRecSignal(GSN_SIGNAL_DROPPED_REP, &Suma::execSIGNAL_DROPPED_REP, true);
@@ -148,6 +149,10 @@
c_subscriptionPool.setSize(noTables);
c_syncPool.setSize(2);
c_dataBufferPool.setSize(noAttrs);
+ c_gcp_pool.setSize(5);
+
+ m_first_free_page= RNIL;
+ c_page_chunk_pool.setSize(5);
{
SLList<SyncRecord> tmp(c_syncPool);
@@ -157,20 +162,21 @@
tmp.release();
}
- for( int i = 0; i < NO_OF_BUCKETS; i++) {
- c_buckets[i].active = false;
- c_buckets[i].handover = false;
- c_buckets[i].handover_started = false;
- c_buckets[i].handoverGCI = 0;
- memset(c_buckets[i].m_nodes, 0, sizeof(c_buckets[i].m_nodes));
+ memset(c_buckets, 0, sizeof(c_buckets));
+ for(Uint32 i = 0; i<NO_OF_BUCKETS; i++)
+ {
+ Bucket* bucket= c_buckets+i;
+ bucket->m_buffer_tail = RNIL;
+ bucket->m_buffer_head.m_page_id = RNIL;
+ bucket->m_buffer_head.m_page_pos = Buffer_page::DATA_WORDS;
}
- c_handoverToDo = false;
- c_lastInconsistentGCI = RNIL;
- c_lastCompleteGCI = RNIL;
- c_nodeFailGCI = 0;
-
- c_restart_server_node_id = 0; // Server for my NR
+
+ m_max_seen_gci = 0; // FIRE_TRIG_ORD
+ m_max_sent_gci = 0; // FIRE_TRIG_ORD -> send
+ m_last_complete_gci = 0; // SUB_GCP_COMPLETE_REP
+
c_failedApiNodes.clear();
+ c_startup.m_restart_server_node_id = 0; // Server for my NR
}
Suma::~Suma()
--- 1.19/storage/ndb/src/kernel/vm/SimulatedBlock.cpp Fri May 13 12:58:12 2005
+++ 1.20/storage/ndb/src/kernel/vm/SimulatedBlock.cpp Wed May 18 14:10:24 2005
@@ -141,6 +141,7 @@
a[GSN_UTIL_UNLOCK_CONF] = &SimulatedBlock::execUTIL_UNLOCK_CONF;
a[GSN_READ_CONFIG_REQ] = &SimulatedBlock::execREAD_CONFIG_REQ;
a[GSN_NODE_START_REP] = &SimulatedBlock::execNODE_START_REP;
+ a[GSN_API_START_REP] = &SimulatedBlock::execAPI_START_REP;
}
void
@@ -908,6 +909,11 @@
void
SimulatedBlock::execNODE_START_REP(Signal* signal)
+{
+}
+
+// Default API_START_REP handler installed in the SimulatedBlock signal
+// table; it intentionally does nothing.
+void
+SimulatedBlock::execAPI_START_REP(Signal* signal)
{
}
--- 1.13/storage/ndb/src/kernel/vm/SimulatedBlock.hpp Fri May 13 12:58:12 2005
+++ 1.14/storage/ndb/src/kernel/vm/SimulatedBlock.hpp Wed May 18 14:10:24 2005
@@ -400,6 +400,7 @@
void execSIGNAL_DROPPED_REP(Signal* signal);
void execCONTINUE_FRAGMENTED(Signal* signal);
+ void execAPI_START_REP(Signal* signal);
void execNODE_START_REP(Signal* signal);
Uint32 c_fragmentIdCounter;
| Thread |
|---|
| • bk commit into 5.1 tree (joreland:1.1865) | jonas.oreland | 18 May |