Below is the list of changes that have just been committed into a local
5.1 repository of jonas. When jonas does a push these changes will
be propagated to the main repository and, within 24 hours after the
push, to the public repository.
For information on how to access the public repository
see http://dev.mysql.com/doc/mysql/en/installing-source-tree.html
ChangeSet
1.2299 06/04/07 10:32:13 jonas@stripped +11 -0
Merge perch.ndb.mysql.com:/home/jonas/src/50-jonas
into perch.ndb.mysql.com:/home/jonas/src/51-work
storage/ndb/src/mgmsrv/Services.cpp
1.61 06/04/07 10:32:09 jonas@stripped +0 -0
Auto merged
storage/ndb/src/mgmsrv/Services.cpp
1.45.15.3 06/04/07 10:32:08 jonas@stripped +0 -0
Merge rename: ndb/src/mgmsrv/Services.cpp -> storage/ndb/src/mgmsrv/Services.cpp
storage/ndb/src/mgmsrv/MgmtSrvr.cpp
1.92 06/04/07 10:32:09 jonas@stripped +0 -0
Auto merged
storage/ndb/src/mgmsrv/MgmtSrvr.cpp
1.73.19.3 06/04/07 10:32:08 jonas@stripped +0 -0
Merge rename: ndb/src/mgmsrv/MgmtSrvr.cpp -> storage/ndb/src/mgmsrv/MgmtSrvr.cpp
storage/ndb/src/mgmsrv/MgmtSrvr.hpp
1.42 06/04/07 10:32:09 jonas@stripped +0 -0
Auto merged
storage/ndb/src/kernel/blocks/qmgr/QmgrMain.cpp
1.31 06/04/07 10:32:09 jonas@stripped +0 -0
Auto merged
storage/ndb/src/mgmsrv/MgmtSrvr.hpp
1.30.10.2 06/04/07 10:32:08 jonas@stripped +0 -0
Merge rename: ndb/src/mgmsrv/MgmtSrvr.hpp -> storage/ndb/src/mgmsrv/MgmtSrvr.hpp
storage/ndb/src/mgmclient/CommandInterpreter.cpp
1.60 06/04/07 10:32:09 jonas@stripped +0 -0
Auto merged
storage/ndb/src/kernel/blocks/qmgr/QmgrMain.cpp
1.13.14.3 06/04/07 10:32:08 jonas@stripped +0 -0
Merge rename: ndb/src/kernel/blocks/qmgr/QmgrMain.cpp -> storage/ndb/src/kernel/blocks/qmgr/QmgrMain.cpp
storage/ndb/src/kernel/blocks/ndbcntr/NdbcntrMain.cpp
1.33 06/04/07 10:32:08 jonas@stripped +0 -0
Auto merged
storage/ndb/include/kernel/signaldata/StopReq.hpp
1.5 06/04/07 10:32:08 jonas@stripped +0 -0
Auto merged
storage/ndb/src/mgmclient/CommandInterpreter.cpp
1.49.8.2 06/04/07 10:32:08 jonas@stripped +0 -0
Merge rename: ndb/src/mgmclient/CommandInterpreter.cpp -> storage/ndb/src/mgmclient/CommandInterpreter.cpp
storage/ndb/src/kernel/blocks/ndbcntr/NdbcntrMain.cpp
1.14.9.2 06/04/07 10:32:08 jonas@stripped +0 -0
Merge rename: ndb/src/kernel/blocks/ndbcntr/NdbcntrMain.cpp -> storage/ndb/src/kernel/blocks/ndbcntr/NdbcntrMain.cpp
storage/ndb/include/kernel/signaldata/StopReq.hpp
1.1.3.2 06/04/07 10:32:08 jonas@stripped +0 -0
Merge rename: ndb/include/kernel/signaldata/StopReq.hpp -> storage/ndb/include/kernel/signaldata/StopReq.hpp
storage/ndb/src/kernel/blocks/dbdih/DbdihMain.cpp
1.60 06/04/07 10:32:08 jonas@stripped +0 -0
Auto merged
storage/ndb/src/kernel/blocks/dbdih/DbdihMain.cpp
1.24.23.2 06/04/07 10:32:08 jonas@stripped +0 -0
Merge rename: ndb/src/kernel/blocks/dbdih/DbdihMain.cpp -> storage/ndb/src/kernel/blocks/dbdih/DbdihMain.cpp
sql-common/client.c
1.90 06/04/07 10:32:08 jonas@stripped +0 -0
Auto merged
mysql-test/mysql-test-run.pl
1.100 06/04/07 10:32:08 jonas@stripped +0 -0
Auto merged
libmysql/libmysql.c
1.246 06/04/07 10:32:08 jonas@stripped +0 -0
Auto merged
# This is a BitKeeper patch. What follows are the unified diffs for the
# set of deltas contained in the patch. The rest of the patch, the part
# that BitKeeper cares about, is below these diffs.
# User: jonas
# Host: perch.ndb.mysql.com
# Root: /home/jonas/src/51-work/RESYNC
--- 1.245/libmysql/libmysql.c 2006-03-21 21:59:02 +01:00
+++ 1.246/libmysql/libmysql.c 2006-04-07 10:32:08 +02:00
@@ -1380,35 +1380,6 @@
}
-/*
- Get version number for server in a form easy to test on
-
- SYNOPSIS
- mysql_get_server_version()
- mysql Connection
-
- EXAMPLE
- 4.1.0-alfa -> 40100
-
- NOTES
- We will ensure that a newer server always has a bigger number.
-
- RETURN
- Signed number > 323000
-*/
-
-ulong STDCALL
-mysql_get_server_version(MYSQL *mysql)
-{
- uint major, minor, version;
- char *pos= mysql->server_version, *end_pos;
- major= (uint) strtoul(pos, &end_pos, 10); pos=end_pos+1;
- minor= (uint) strtoul(pos, &end_pos, 10); pos=end_pos+1;
- version= (uint) strtoul(pos, &end_pos, 10);
- return (ulong) major*10000L+(ulong) (minor*100+version);
-}
-
-
const char * STDCALL
mysql_get_host_info(MYSQL *mysql)
{
--- 1.89/sql-common/client.c 2006-03-28 06:26:48 +02:00
+++ 1.90/sql-common/client.c 2006-04-07 10:32:08 +02:00
@@ -2817,6 +2817,36 @@
return mysql->net.last_error;
}
+
+/*
+ Get version number for server in a form easy to test on
+
+ SYNOPSIS
+ mysql_get_server_version()
+ mysql Connection
+
+ EXAMPLE
+ 4.1.0-alfa -> 40100
+
+ NOTES
+ We will ensure that a newer server always has a bigger number.
+
+ RETURN
+ Signed number > 323000
+*/
+
+ulong STDCALL
+mysql_get_server_version(MYSQL *mysql)
+{
+ uint major, minor, version;
+ char *pos= mysql->server_version, *end_pos;
+ major= (uint) strtoul(pos, &end_pos, 10); pos=end_pos+1;
+ minor= (uint) strtoul(pos, &end_pos, 10); pos=end_pos+1;
+ version= (uint) strtoul(pos, &end_pos, 10);
+ return (ulong) major*10000L+(ulong) (minor*100+version);
+}
+
+
/*
mysql_set_character_set function sends SET NAMES cs_name to
the server (which changes character_set_client, character_set_result
@@ -2836,6 +2866,9 @@
{
char buff[MY_CS_NAME_SIZE + 10];
charsets_dir= save_csdir;
+ /* Skip execution of "SET NAMES" for pre-4.1 servers */
+ if (mysql_get_server_version(mysql) < 40100)
+ return 0;
sprintf(buff, "SET NAMES %s", cs_name);
if (!mysql_real_query(mysql, buff, strlen(buff)))
{
--- 1.24.23.1/ndb/src/kernel/blocks/dbdih/DbdihMain.cpp 2006-04-06 11:55:32 +02:00
+++ 1.60/storage/ndb/src/kernel/blocks/dbdih/DbdihMain.cpp 2006-04-07 10:32:08 +02:00
@@ -67,6 +67,7 @@
#include <signaldata/CreateFragmentation.hpp>
#include <signaldata/LqhFrag.hpp>
#include <signaldata/FsOpenReq.hpp>
+#include <signaldata/DihFragCount.hpp>
#include <DebuggerNames.hpp>
#include <EventLogger.hpp>
@@ -609,6 +610,14 @@
checkWaitDropTabFailedLqh(signal, nodeId, tableId);
return;
}
+ case DihContinueB::ZTO_START_FRAGMENTS:
+ {
+ TakeOverRecordPtr takeOverPtr;
+ takeOverPtr.i = signal->theData[1];
+ ptrCheckGuard(takeOverPtr, MAX_NDB_NODES, takeOverRecord);
+ nr_start_fragments(signal, takeOverPtr);
+ return;
+ }
}//switch
ndbrequire(false);
@@ -639,9 +648,11 @@
c_copyGCISlave.m_expectedNextWord += CopyGCIReq::DATA_SIZE;
return;
}//if
-
+
+ Uint32 tmp= SYSFILE->m_restart_seq;
memcpy(sysfileData, cdata, sizeof(sysfileData));
-
+ SYSFILE->m_restart_seq = tmp;
+
c_copyGCISlave.m_copyReason = reason;
c_copyGCISlave.m_senderRef = signal->senderBlockRef();
c_copyGCISlave.m_senderData = copyGCI->anyData;
@@ -1052,7 +1063,7 @@
jamEntry();
const ndb_mgm_configuration_iterator * p =
- theConfiguration.getOwnConfigIterator();
+ m_ctx.m_config.getOwnConfigIterator();
ndbrequireErr(p != 0, NDBD_EXIT_INVALID_CONFIG);
initData();
@@ -1138,7 +1149,7 @@
{
jamEntry();
cntrlblockref = signal->theData[0];
- if(theConfiguration.getInitialStart()){
+ if(m_ctx.m_config.getInitialStart()){
sendSignal(cntrlblockref, GSN_DIH_RESTARTREF, signal, 1, JBB);
} else {
readGciFileLab(signal);
@@ -1656,12 +1667,15 @@
*
* But dont copy lastCompletedGCI:s
*/
+ Uint32 key = SYSFILE->m_restart_seq;
Uint32 tempGCP[MAX_NDB_NODES];
for(i = 0; i < MAX_NDB_NODES; i++)
tempGCP[i] = SYSFILE->lastCompletedGCI[i];
for(i = 0; i < Sysfile::SYSFILE_SIZE32; i++)
sysfileData[i] = cdata[i];
+
+ SYSFILE->m_restart_seq = key;
for(i = 0; i < MAX_NDB_NODES; i++)
SYSFILE->lastCompletedGCI[i] = tempGCP[i];
@@ -1819,11 +1833,6 @@
ndbrequire(c_nodeStartMaster.startNode == Tnodeid);
ndbrequire(getNodeStatus(Tnodeid) == NodeRecord::STARTING);
- sendSTART_RECREQ(signal, Tnodeid);
-}//Dbdih::execSTART_MEREQ()
-
-void Dbdih::nodeRestartStartRecConfLab(Signal* signal)
-{
c_nodeStartMaster.blockLcp = true;
if ((c_lcpState.lcpStatus != LCP_STATUS_IDLE) &&
(c_lcpState.lcpStatus != LCP_TCGET)) {
@@ -2634,13 +2643,14 @@
return;
}//if
c_startToLock = takeOverPtrI;
+
+ takeOverPtr.p->toMasterStatus = TakeOverRecord::STARTING;
StartToReq * const req = (StartToReq *)&signal->theData[0];
req->userPtr = takeOverPtr.i;
req->userRef = reference();
req->startingNodeId = takeOverPtr.p->toStartingNode;
req->nodeTakenOver = takeOverPtr.p->toFailedNode;
req->nodeRestart = takeOverPtr.p->toNodeRestart;
- takeOverPtr.p->toMasterStatus = TakeOverRecord::STARTING;
sendLoopMacro(START_TOREQ, sendSTART_TOREQ);
}//Dbdih::sendStartTo()
@@ -2684,9 +2694,153 @@
CRASH_INSERTION(7134);
c_startToLock = RNIL;
+ if (takeOverPtr.p->toNodeRestart)
+ {
+ jam();
+ takeOverPtr.p->toMasterStatus = TakeOverRecord::STARTING_LOCAL_FRAGMENTS;
+ nr_start_fragments(signal, takeOverPtr);
+ return;
+ }
+
startNextCopyFragment(signal, takeOverPtr.i);
}//Dbdih::execSTART_TOCONF()
+void
+Dbdih::nr_start_fragments(Signal* signal,
+ TakeOverRecordPtr takeOverPtr)
+{
+ Uint32 loopCount = 0 ;
+ TabRecordPtr tabPtr;
+ while (loopCount++ < 100) {
+ tabPtr.i = takeOverPtr.p->toCurrentTabref;
+ if (tabPtr.i >= ctabFileSize) {
+ jam();
+ nr_run_redo(signal, takeOverPtr);
+ return;
+ }//if
+ ptrAss(tabPtr, tabRecord);
+ if (tabPtr.p->tabStatus != TabRecord::TS_ACTIVE){
+ jam();
+ takeOverPtr.p->toCurrentFragid = 0;
+ takeOverPtr.p->toCurrentTabref++;
+ continue;
+ }//if
+ Uint32 fragId = takeOverPtr.p->toCurrentFragid;
+ if (fragId >= tabPtr.p->totalfragments) {
+ jam();
+ takeOverPtr.p->toCurrentFragid = 0;
+ takeOverPtr.p->toCurrentTabref++;
+ continue;
+ }//if
+ FragmentstorePtr fragPtr;
+ getFragstore(tabPtr.p, fragId, fragPtr);
+ ReplicaRecordPtr loopReplicaPtr;
+ loopReplicaPtr.i = fragPtr.p->oldStoredReplicas;
+ while (loopReplicaPtr.i != RNIL) {
+ ptrCheckGuard(loopReplicaPtr, creplicaFileSize, replicaRecord);
+ if (loopReplicaPtr.p->procNode == takeOverPtr.p->toStartingNode) {
+ jam();
+ nr_start_fragment(signal, takeOverPtr, loopReplicaPtr);
+ break;
+ } else {
+ jam();
+ loopReplicaPtr.i = loopReplicaPtr.p->nextReplica;
+ }//if
+ }//while
+ takeOverPtr.p->toCurrentFragid++;
+ }//while
+ signal->theData[0] = DihContinueB::ZTO_START_FRAGMENTS;
+ signal->theData[1] = takeOverPtr.i;
+ sendSignal(reference(), GSN_CONTINUEB, signal, 2, JBB);
+}
+
+void
+Dbdih::nr_start_fragment(Signal* signal,
+ TakeOverRecordPtr takeOverPtr,
+ ReplicaRecordPtr replicaPtr)
+{
+ Uint32 i, j = 0;
+ Uint32 maxLcpId = 0;
+ Uint32 maxLcpIndex = ~0;
+
+ Uint32 restorableGCI = 0;
+
+ ndbout_c("tab: %d frag: %d replicaP->nextLcp: %d",
+ takeOverPtr.p->toCurrentTabref,
+ takeOverPtr.p->toCurrentFragid,
+ replicaPtr.p->nextLcp);
+
+ Uint32 idx = replicaPtr.p->nextLcp;
+ for(i = 0; i<MAX_LCP_STORED; i++, idx = nextLcpNo(idx))
+ {
+ ndbout_c("scanning idx: %d lcpId: %d", idx, replicaPtr.p->lcpId[idx]);
+ if (replicaPtr.p->lcpStatus[idx] == ZVALID)
+ {
+ ndbrequire(replicaPtr.p->lcpId[idx] > maxLcpId);
+ Uint32 startGci = replicaPtr.p->maxGciCompleted[idx];
+ Uint32 stopGci = replicaPtr.p->maxGciStarted[idx];
+ for (;j < replicaPtr.p->noCrashedReplicas; j++)
+ {
+ ndbout_c("crashed replica: %d(%d) replicaLastGci: %d",
+ j,
+ replicaPtr.p->noCrashedReplicas,
+ replicaPtr.p->replicaLastGci[j]);
+ if (replicaPtr.p->replicaLastGci[j] > stopGci)
+ {
+ maxLcpId = replicaPtr.p->lcpId[idx];
+ maxLcpIndex = idx;
+ restorableGCI = replicaPtr.p->replicaLastGci[j];
+ break;
+ }
+ }
+ }
+ }
+
+ if (maxLcpIndex == ~0)
+ {
+ ndbout_c("Didnt find any LCP for node: %d tab: %d frag: %d",
+ takeOverPtr.p->toStartingNode,
+ takeOverPtr.p->toCurrentTabref,
+ takeOverPtr.p->toCurrentFragid);
+ replicaPtr.p->lcpIdStarted = 0;
+ }
+ else
+ {
+ ndbout_c("Found LCP: %d(%d) maxGciStarted: %d maxGciCompleted: %d restorable: %d(%d) newestRestorableGCI: %d",
+ maxLcpId,
+ maxLcpIndex,
+ replicaPtr.p->maxGciStarted[maxLcpIndex],
+ replicaPtr.p->maxGciCompleted[maxLcpIndex],
+ restorableGCI,
+ SYSFILE->lastCompletedGCI[takeOverPtr.p->toStartingNode],
+ SYSFILE->newestRestorableGCI);
+
+ replicaPtr.p->lcpIdStarted = restorableGCI;
+ BlockReference ref = calcLqhBlockRef(takeOverPtr.p->toStartingNode);
+ StartFragReq *req = (StartFragReq *)signal->getDataPtrSend();
+ req->userPtr = 0;
+ req->userRef = reference();
+ req->lcpNo = maxLcpIndex;
+ req->lcpId = maxLcpId;
+ req->tableId = takeOverPtr.p->toCurrentTabref;
+ req->fragId = takeOverPtr.p->toCurrentFragid;
+ req->noOfLogNodes = 1;
+ req->lqhLogNode[0] = takeOverPtr.p->toStartingNode;
+ req->startGci[0] = replicaPtr.p->maxGciCompleted[maxLcpIndex];
+ req->lastGci[0] = restorableGCI;
+ sendSignal(ref, GSN_START_FRAGREQ, signal,
+ StartFragReq::SignalLength, JBB);
+ }
+}
+
+void
+Dbdih::nr_run_redo(Signal* signal, TakeOverRecordPtr takeOverPtr)
+{
+ takeOverPtr.p->toCurrentTabref = 0;
+ takeOverPtr.p->toCurrentFragid = 0;
+ sendSTART_RECREQ(signal, takeOverPtr.p->toStartingNode);
+}
+
void Dbdih::initStartTakeOver(const StartToReq * req,
TakeOverRecordPtr takeOverPtr)
{
@@ -3019,6 +3173,14 @@
/*---------------------------------------------------------------------- */
FragmentstorePtr fragPtr;
getFragstore(tabPtr.p, fragId, fragPtr);
+ Uint32 gci = 0;
+ if (takeOverPtr.p->toNodeRestart)
+ {
+ ReplicaRecordPtr replicaPtr;
+ findReplica(replicaPtr, fragPtr.p, takeOverPtr.p->toStartingNode, true);
+ gci = replicaPtr.p->lcpIdStarted;
+ replicaPtr.p->lcpIdStarted = 0;
+ }
takeOverPtr.p->toMasterStatus = TakeOverRecord::COPY_FRAG;
BlockReference ref = calcLqhBlockRef(takeOverPtr.p->toCopyNode);
CopyFragReq * const copyFragReq = (CopyFragReq *)&signal->theData[0];
@@ -3029,6 +3191,7 @@
copyFragReq->nodeId = takeOverPtr.p->toStartingNode;
copyFragReq->schemaVersion = tabPtr.p->schemaVersion;
copyFragReq->distributionKey = fragPtr.p->distributionKey;
+ copyFragReq->gci = gci;
sendSignal(ref, GSN_COPY_FRAGREQ, signal, CopyFragReq::SignalLength, JBB);
} else {
ndbrequire(takeOverPtr.p->toMasterStatus == TakeOverRecord::COMMIT_CREATE);
@@ -3550,6 +3713,7 @@
/* WE ALSO COPY TO OUR OWN NODE. TO ENABLE US TO DO THIS PROPERLY WE */
/* START BY CLOSING THIS FILE. */
/* ----------------------------------------------------------------------- */
+ globalData.m_restart_seq = ++SYSFILE->m_restart_seq;
closeFile(signal, filePtr);
filePtr.p->reqStatus = FileRecord::CLOSING_GCP;
}//Dbdih::readingGcpLab()
@@ -4068,6 +4232,8 @@
Uint32 takeOverPtrI)
{
jam();
+ ndbout_c("checkTakeOverInMasterStartNodeFailure %x",
+ takeOverPtrI);
if (takeOverPtrI == RNIL) {
jam();
return;
@@ -4081,6 +4247,9 @@
takeOverPtr.i = takeOverPtrI;
ptrCheckGuard(takeOverPtr, MAX_NDB_NODES, takeOverRecord);
+ ndbout_c("takeOverPtr.p->toMasterStatus: %x",
+ takeOverPtr.p->toMasterStatus);
+
bool ok = false;
switch (takeOverPtr.p->toMasterStatus) {
case TakeOverRecord::IDLE:
@@ -4189,6 +4358,13 @@
//-----------------------------------------------------------------------
endTakeOver(takeOverPtr.i);
break;
+
+ case TakeOverRecord::STARTING_LOCAL_FRAGMENTS:
+ ok = true;
+ jam();
+ endTakeOver(takeOverPtr.i);
+ break;
+
/**
* The following are states that it should not be possible to "be" in
*/
@@ -5601,11 +5777,9 @@
#endif
}
- bool ok = false;
MasterLCPConf::State lcpState;
switch (c_lcpState.lcpStatus) {
case LCP_STATUS_IDLE:
- ok = true;
jam();
/*------------------------------------------------*/
/* LOCAL CHECKPOINT IS CURRENTLY NOT ACTIVE */
@@ -5616,7 +5790,6 @@
lcpState = MasterLCPConf::LCP_STATUS_IDLE;
break;
case LCP_STATUS_ACTIVE:
- ok = true;
jam();
/*--------------------------------------------------*/
/* COPY OF RESTART INFORMATION HAS BEEN */
@@ -5625,7 +5798,6 @@
lcpState = MasterLCPConf::LCP_STATUS_ACTIVE;
break;
case LCP_TAB_COMPLETED:
- ok = true;
jam();
/*--------------------------------------------------------*/
/* ALL LCP_REPORT'S HAVE BEEN COMPLETED FOR */
@@ -5635,7 +5807,6 @@
lcpState = MasterLCPConf::LCP_TAB_COMPLETED;
break;
case LCP_TAB_SAVED:
- ok = true;
jam();
/*--------------------------------------------------------*/
/* ALL LCP_REPORT'S HAVE BEEN COMPLETED FOR */
@@ -5659,15 +5830,15 @@
break;
case LCP_COPY_GCI:
case LCP_INIT_TABLES:
- ok = true;
/**
* These two states are handled by if statements above
*/
ndbrequire(false);
lcpState= MasterLCPConf::LCP_STATUS_IDLE; // remove warning
break;
+ default:
+ ndbrequire(false);
}//switch
- ndbrequire(ok);
Uint32 failedNodeId = c_lcpState.m_MASTER_LCPREQ_FailedNodeId;
MasterLCPConf * const conf = (MasterLCPConf *)&signal->theData[0];
@@ -6265,96 +6436,146 @@
3.7.1 A D D T A B L E M A I N L Y
***************************************
*/
-void Dbdih::execCREATE_FRAGMENTATION_REQ(Signal * signal){
+
+static inline void inc_node_or_group(Uint32 &node, Uint32 max_node)
+{
+ Uint32 next = node + 1;
+ node = (next == max_node ? 0 : next);
+}
+
+/*
+ Spread fragments in backwards compatible mode
+*/
+static void set_default_node_groups(Signal *signal, Uint32 noFrags)
+{
+ Uint16 *node_group_array = (Uint16*)&signal->theData[25];
+ Uint32 i;
+ node_group_array[0] = 0;
+ for (i = 1; i < noFrags; i++)
+ node_group_array[i] = UNDEF_NODEGROUP;
+}
+void Dbdih::execCREATE_FRAGMENTATION_REQ(Signal * signal)
+{
+ Uint16 node_group_id[MAX_NDB_PARTITIONS];
jamEntry();
CreateFragmentationReq * const req =
(CreateFragmentationReq*)signal->getDataPtr();
const Uint32 senderRef = req->senderRef;
const Uint32 senderData = req->senderData;
- const Uint32 fragmentNode = req->fragmentNode;
- const Uint32 fragmentType = req->fragmentationType;
- //const Uint32 fragmentCount = req->noOfFragments;
+ Uint32 noOfFragments = req->noOfFragments;
+ const Uint32 fragType = req->fragmentationType;
const Uint32 primaryTableId = req->primaryTableId;
Uint32 err = 0;
do {
- Uint32 noOfFragments = 0;
- Uint32 noOfReplicas = cnoReplicas;
- switch(fragmentType){
- case DictTabInfo::AllNodesSmallTable:
- jam();
- noOfFragments = csystemnodes;
- break;
- case DictTabInfo::AllNodesMediumTable:
- jam();
- noOfFragments = 2 * csystemnodes;
- break;
- case DictTabInfo::AllNodesLargeTable:
- jam();
- noOfFragments = 4 * csystemnodes;
- break;
- case DictTabInfo::SingleFragment:
- jam();
- noOfFragments = 1;
- break;
-#if 0
- case DictTabInfo::SpecifiedFragmentCount:
- noOfFragments = (fragmentCount == 0 ? 1 : (fragmentCount + 1)/ 2);
- break;
-#endif
- default:
- jam();
- err = CreateFragmentationRef::InvalidFragmentationType;
- break;
- }
- if(err)
- break;
-
NodeGroupRecordPtr NGPtr;
TabRecordPtr primTabPtr;
+ Uint32 count = 2;
+ Uint16 noOfReplicas = cnoReplicas;
+ Uint16 *fragments = (Uint16*)(signal->theData+25);
if (primaryTableId == RNIL) {
- if(fragmentNode == 0){
- jam();
- NGPtr.i = 0;
- if(noOfFragments < csystemnodes)
- {
- NGPtr.i = c_nextNodeGroup;
- c_nextNodeGroup = (NGPtr.i + 1 == cnoOfNodeGroups ? 0 : NGPtr.i + 1);
- }
- } else if(! (fragmentNode < MAX_NDB_NODES)) {
- jam();
- err = CreateFragmentationRef::InvalidNodeId;
- } else {
- jam();
- const Uint32 stat = Sysfile::getNodeStatus(fragmentNode,
- SYSFILE->nodeStatus);
- switch (stat) {
- case Sysfile::NS_Active:
- case Sysfile::NS_ActiveMissed_1:
- case Sysfile::NS_ActiveMissed_2:
- case Sysfile::NS_TakeOver:
+ jam();
+ switch ((DictTabInfo::FragmentType)fragType)
+ {
+ /*
+ Backward compatability and for all places in code not changed.
+ */
+ case DictTabInfo::AllNodesSmallTable:
jam();
+ noOfFragments = csystemnodes;
+ set_default_node_groups(signal, noOfFragments);
break;
- case Sysfile::NS_NotActive_NotTakenOver:
+ case DictTabInfo::AllNodesMediumTable:
jam();
+ noOfFragments = 2 * csystemnodes;
+ set_default_node_groups(signal, noOfFragments);
break;
- case Sysfile::NS_HotSpare:
+ case DictTabInfo::AllNodesLargeTable:
jam();
- case Sysfile::NS_NotDefined:
+ noOfFragments = 4 * csystemnodes;
+ set_default_node_groups(signal, noOfFragments);
+ break;
+ case DictTabInfo::SingleFragment:
+ jam();
+ noOfFragments = 1;
+ set_default_node_groups(signal, noOfFragments);
+ break;
+ case DictTabInfo::DistrKeyHash:
jam();
+ case DictTabInfo::DistrKeyLin:
+ jam();
+ if (noOfFragments == 0)
+ {
+ jam();
+ noOfFragments = csystemnodes;
+ set_default_node_groups(signal, noOfFragments);
+ }
+ break;
default:
jam();
- err = CreateFragmentationRef::InvalidNodeType;
+ if (noOfFragments == 0)
+ {
+ jam();
+ err = CreateFragmentationRef::InvalidFragmentationType;
+ }
break;
+ }
+ if (err)
+ break;
+ /*
+ When we come here the the exact partition is specified
+ and there is an array of node groups sent along as well.
+ */
+ memcpy(&node_group_id[0], &signal->theData[25], 2 * noOfFragments);
+ Uint16 next_replica_node[MAX_NDB_NODES];
+ memset(next_replica_node,0,sizeof(next_replica_node));
+ Uint32 default_node_group= c_nextNodeGroup;
+ for(Uint32 fragNo = 0; fragNo < noOfFragments; fragNo++)
+ {
+ jam();
+ NGPtr.i = node_group_id[fragNo];
+ if (NGPtr.i == UNDEF_NODEGROUP)
+ {
+ jam();
+ NGPtr.i = default_node_group;
}
- if(err)
+ if (NGPtr.i > cnoOfNodeGroups)
+ {
+ jam();
+ err = CreateFragmentationRef::InvalidNodeGroup;
break;
- NGPtr.i = Sysfile::getNodeGroup(fragmentNode,
- SYSFILE->nodeGroups);
+ }
+ ptrCheckGuard(NGPtr, MAX_NDB_NODES, nodeGroupRecord);
+ const Uint32 max = NGPtr.p->nodeCount;
+
+ Uint32 tmp= next_replica_node[NGPtr.i];
+ for(Uint32 replicaNo = 0; replicaNo < noOfReplicas; replicaNo++)
+ {
+ jam();
+ const Uint16 nodeId = NGPtr.p->nodesInGroup[tmp];
+ fragments[count++]= nodeId;
+ inc_node_or_group(tmp, max);
+ }
+ inc_node_or_group(tmp, max);
+ next_replica_node[NGPtr.i]= tmp;
+
+ /**
+ * Next node group for next fragment
+ */
+ inc_node_or_group(default_node_group, cnoOfNodeGroups);
+ }
+ if (err)
+ {
+ jam();
break;
}
+ else
+ {
+ jam();
+ c_nextNodeGroup = default_node_group;
+ }
} else {
if (primaryTableId >= ctabFileSize) {
jam();
@@ -6368,49 +6589,14 @@
err = CreateFragmentationRef::InvalidPrimaryTable;
break;
}
- if (noOfFragments != primTabPtr.p->totalfragments) {
- jam();
- err = CreateFragmentationRef::InvalidFragmentationType;
- break;
- }
- }
-
- Uint32 count = 2;
- Uint16 *fragments = (Uint16*)(signal->theData+25);
- if (primaryTableId == RNIL) {
- jam();
- Uint8 next_replica_node[MAX_NDB_NODES];
- memset(next_replica_node,0,sizeof(next_replica_node));
- for(Uint32 fragNo = 0; fragNo<noOfFragments; fragNo++){
- jam();
- ptrCheckGuard(NGPtr, MAX_NDB_NODES, nodeGroupRecord);
- const Uint32 max = NGPtr.p->nodeCount;
-
- Uint32 tmp= next_replica_node[NGPtr.i];
- for(Uint32 replicaNo = 0; replicaNo<noOfReplicas; replicaNo++)
- {
- jam();
- const Uint32 nodeId = NGPtr.p->nodesInGroup[tmp++];
- fragments[count++] = nodeId;
- tmp = (tmp >= max ? 0 : tmp);
- }
- tmp++;
- next_replica_node[NGPtr.i]= (tmp >= max ? 0 : tmp);
-
- /**
- * Next node group for next fragment
- */
- NGPtr.i++;
- NGPtr.i = (NGPtr.i == cnoOfNodeGroups ? 0 : NGPtr.i);
- }
- } else {
+ noOfFragments= primTabPtr.p->totalfragments;
for (Uint32 fragNo = 0;
- fragNo < primTabPtr.p->totalfragments; fragNo++) {
+ fragNo < noOfFragments; fragNo++) {
jam();
FragmentstorePtr fragPtr;
ReplicaRecordPtr replicaPtr;
getFragstore(primTabPtr.p, fragNo, fragPtr);
- fragments[count++] = fragPtr.p->preferredPrimary;
+ fragments[count++]= fragPtr.p->preferredPrimary;
for (replicaPtr.i = fragPtr.p->storedReplicas;
replicaPtr.i != RNIL;
replicaPtr.i = replicaPtr.p->nextReplica) {
@@ -6418,9 +6604,9 @@
ptrCheckGuard(replicaPtr, creplicaFileSize, replicaRecord);
if (replicaPtr.p->procNode != fragPtr.p->preferredPrimary) {
jam();
- fragments[count++] = replicaPtr.p->procNode;
- }//if
- }//for
+ fragments[count++]= replicaPtr.p->procNode;
+ }
+ }
for (replicaPtr.i = fragPtr.p->oldStoredReplicas;
replicaPtr.i != RNIL;
replicaPtr.i = replicaPtr.p->nextReplica) {
@@ -6428,25 +6614,26 @@
ptrCheckGuard(replicaPtr, creplicaFileSize, replicaRecord);
if (replicaPtr.p->procNode != fragPtr.p->preferredPrimary) {
jam();
- fragments[count++] = replicaPtr.p->procNode;
- }//if
- }//for
+ fragments[count++]= replicaPtr.p->procNode;
+ }
+ }
}
}
- ndbrequire(count == (2 + noOfReplicas * noOfFragments));
+ ndbrequire(count == (2U + noOfReplicas * noOfFragments));
CreateFragmentationConf * const conf =
(CreateFragmentationConf*)signal->getDataPtrSend();
conf->senderRef = reference();
conf->senderData = senderData;
- conf->noOfReplicas = noOfReplicas;
- conf->noOfFragments = noOfFragments;
+ conf->noOfReplicas = (Uint32)noOfReplicas;
+ conf->noOfFragments = (Uint32)noOfFragments;
- fragments[0] = noOfReplicas;
- fragments[1] = noOfFragments;
+ fragments[0]= noOfReplicas;
+ fragments[1]= noOfFragments;
if(senderRef != 0)
{
+ jam();
LinearSectionPtr ptr[3];
ptr[0].p = (Uint32*)&fragments[0];
ptr[0].sz = (count + 1) / 2;
@@ -6458,33 +6645,17 @@
ptr,
1);
}
- else
- {
- // Execute direct
- signal->theData[0] = 0;
- }
+ // Always ACK/NACK (here ACK)
+ signal->theData[0] = 0;
return;
} while(false);
-
- if(senderRef != 0)
- {
- CreateFragmentationRef * const ref =
- (CreateFragmentationRef*)signal->getDataPtrSend();
- ref->senderRef = reference();
- ref->senderData = senderData;
- ref->errorCode = err;
- sendSignal(senderRef, GSN_CREATE_FRAGMENTATION_REF, signal,
- CreateFragmentationRef::SignalLength, JBB);
- }
- else
- {
- // Execute direct
- signal->theData[0] = err;
- }
+ // Always ACK/NACK (here NACK)
+ signal->theData[0] = err;
}
void Dbdih::execDIADDTABREQ(Signal* signal)
{
+ Uint32 fragType;
jamEntry();
DiAddTabReq * const req = (DiAddTabReq*)signal->getDataPtr();
@@ -6509,6 +6680,7 @@
ptrCheckGuard(tabPtr, ctabFileSize, tabRecord);
tabPtr.p->connectrec = connectPtr.i;
tabPtr.p->tableType = req->tableType;
+ fragType= req->fragType;
tabPtr.p->schemaVersion = req->schemaVersion;
tabPtr.p->primaryTableId = req->primaryTableId;
@@ -6545,9 +6717,33 @@
/*%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%*/
tabPtr.p->tabStatus = TabRecord::TS_CREATING;
tabPtr.p->storedTable = req->storedTable;
- tabPtr.p->method = TabRecord::HASH;
tabPtr.p->kvalue = req->kValue;
+ switch ((DictTabInfo::FragmentType)fragType)
+ {
+ case DictTabInfo::AllNodesSmallTable:
+ case DictTabInfo::AllNodesMediumTable:
+ case DictTabInfo::AllNodesLargeTable:
+ case DictTabInfo::SingleFragment:
+ jam();
+ case DictTabInfo::DistrKeyLin:
+ jam();
+ tabPtr.p->method= TabRecord::LINEAR_HASH;
+ break;
+ case DictTabInfo::DistrKeyHash:
+ case DictTabInfo::DistrKeyUniqueHashIndex:
+ case DictTabInfo::DistrKeyOrderedIndex:
+ jam();
+ tabPtr.p->method= TabRecord::NORMAL_HASH;
+ break;
+ case DictTabInfo::UserDefined:
+ jam();
+ tabPtr.p->method= TabRecord::USER_DEFINED;
+ break;
+ default:
+ ndbrequire(false);
+ }
+
union {
Uint16 fragments[2 + MAX_FRAG_PER_NODE*MAX_REPLICAS*MAX_NDB_NODES];
Uint32 align;
@@ -6597,6 +6793,8 @@
Uint32 activeIndex = 0;
getFragstore(tabPtr.p, fragId, fragPtr);
fragPtr.p->preferredPrimary = fragments[index];
+ fragPtr.p->m_log_part_id = c_nextLogPart++;
+
for (Uint32 i = 0; i<noReplicas; i++) {
const Uint32 nodeId = fragments[index++];
ReplicaRecordPtr replicaPtr;
@@ -6641,9 +6839,9 @@
jam();
const Uint32 fragCount = tabPtr.p->totalfragments;
ReplicaRecordPtr replicaPtr; replicaPtr.i = RNIL;
+ FragmentstorePtr fragPtr;
for(; fragId<fragCount; fragId++){
jam();
- FragmentstorePtr fragPtr;
getFragstore(tabPtr.p, fragId, fragPtr);
replicaPtr.i = fragPtr.p->storedReplicas;
@@ -6701,6 +6899,7 @@
req->nodeId = getOwnNodeId();
req->totalFragments = fragCount;
req->startGci = SYSFILE->newestRestorableGCI;
+ req->logPartId = fragPtr.p->m_log_part_id;
sendSignal(DBDICT_REF, GSN_ADD_FRAGREQ, signal,
AddFragReq::SignalLength, JBB);
return;
@@ -6982,17 +7181,40 @@
tabPtr.i = req->tableId;
Uint32 hashValue = req->hashValue;
Uint32 ttabFileSize = ctabFileSize;
+ Uint32 fragId;
+ DiGetNodesConf * const conf = (DiGetNodesConf *)&signal->theData[0];
TabRecord* regTabDesc = tabRecord;
jamEntry();
ptrCheckGuard(tabPtr, ttabFileSize, regTabDesc);
- Uint32 fragId = hashValue & tabPtr.p->mask;
- ndbrequire(tabPtr.p->tabStatus == TabRecord::TS_ACTIVE);
- if (fragId < tabPtr.p->hashpointer) {
+ if (tabPtr.p->method == TabRecord::LINEAR_HASH)
+ {
jam();
- fragId = hashValue & ((tabPtr.p->mask << 1) + 1);
- }//if
+ fragId = hashValue & tabPtr.p->mask;
+ ndbrequire(tabPtr.p->tabStatus == TabRecord::TS_ACTIVE);
+ if (fragId < tabPtr.p->hashpointer) {
+ jam();
+ fragId = hashValue & ((tabPtr.p->mask << 1) + 1);
+ }//if
+ }
+ else if (tabPtr.p->method == TabRecord::NORMAL_HASH)
+ {
+ jam();
+ fragId= hashValue % tabPtr.p->totalfragments;
+ }
+ else
+ {
+ jam();
+ ndbassert(tabPtr.p->method == TabRecord::USER_DEFINED);
+ fragId= hashValue;
+ if (fragId >= tabPtr.p->totalfragments)
+ {
+ jam();
+ conf->zero= 1; //Indicate error;
+ signal->theData[1]= ZUNDEFINED_FRAGMENT_ERROR;
+ return;
+ }
+ }
getFragstore(tabPtr.p, fragId, fragPtr);
- DiGetNodesConf * const conf = (DiGetNodesConf *)&signal->theData[0];
Uint32 nodeCount = extractNodeInfo(fragPtr.p, conf->nodes);
Uint32 sig2 = (nodeCount - 1) +
(fragPtr.p->distributionKey << 16);
@@ -7159,39 +7381,66 @@
void Dbdih::execDI_FCOUNTREQ(Signal* signal)
{
+ DihFragCountReq * const req = (DihFragCountReq*)signal->getDataPtr();
ConnectRecordPtr connectPtr;
TabRecordPtr tabPtr;
+ const BlockReference senderRef = signal->senderBlockRef();
+ const Uint32 senderData = req->m_senderData;
jamEntry();
- connectPtr.i = signal->theData[0];
- tabPtr.i = signal->theData[1];
+ connectPtr.i = req->m_connectionData;
+ tabPtr.i = req->m_tableRef;
ptrCheckGuard(tabPtr, ctabFileSize, tabRecord);
- ndbrequire(tabPtr.p->tabStatus == TabRecord::TS_ACTIVE);
+ if (tabPtr.p->tabStatus != TabRecord::TS_ACTIVE)
+ {
+ DihFragCountRef* ref = (DihFragCountRef*)signal->getDataPtrSend();
+ //connectPtr.i == RNIL -> question without connect record
+ if(connectPtr.i == RNIL)
+ ref->m_connectionData = RNIL;
+ else
+ ref->m_connectionData = connectPtr.p->userpointer;
+ ref->m_tableRef = tabPtr.i;
+ ref->m_senderData = senderData;
+ ref->m_error = DihFragCountRef::ErroneousTableState;
+ ref->m_tableStatus = tabPtr.p->tabStatus;
+ sendSignal(senderRef, GSN_DI_FCOUNTREF, signal,
+ DihFragCountRef::SignalLength, JBB);
+ return;
+ }
if(connectPtr.i != RNIL){
ptrCheckGuard(connectPtr, cconnectFileSize, connectRecord);
if (connectPtr.p->connectState == ConnectRecord::INUSE) {
jam();
- signal->theData[0] = connectPtr.p->userpointer;
- signal->theData[1] = tabPtr.p->totalfragments;
- sendSignal(connectPtr.p->userblockref, GSN_DI_FCOUNTCONF, signal,2, JBB);
- return;
- }//if
- signal->theData[0] = connectPtr.p->userpointer;
- signal->theData[1] = ZERRONOUSSTATE;
- sendSignal(connectPtr.p->userblockref, GSN_DI_FCOUNTREF, signal, 2, JBB);
+ DihFragCountConf* conf = (DihFragCountConf*)signal->getDataPtrSend();
+ conf->m_connectionData = connectPtr.p->userpointer;
+ conf->m_tableRef = tabPtr.i;
+ conf->m_senderData = senderData;
+ conf->m_fragmentCount = tabPtr.p->totalfragments;
+ conf->m_noOfBackups = tabPtr.p->noOfBackups;
+ sendSignal(connectPtr.p->userblockref, GSN_DI_FCOUNTCONF, signal,
+ DihFragCountConf::SignalLength, JBB);
+ return;
+ }//if
+ DihFragCountRef* ref = (DihFragCountRef*)signal->getDataPtrSend();
+ ref->m_connectionData = connectPtr.p->userpointer;
+ ref->m_tableRef = tabPtr.i;
+ ref->m_senderData = senderData;
+ ref->m_error = DihFragCountRef::ErroneousTableState;
+ ref->m_tableStatus = tabPtr.p->tabStatus;
+ sendSignal(connectPtr.p->userblockref, GSN_DI_FCOUNTREF, signal,
+ DihFragCountRef::SignalLength, JBB);
return;
}//if
-
+ DihFragCountConf* conf = (DihFragCountConf*)signal->getDataPtrSend();
//connectPtr.i == RNIL -> question without connect record
- const Uint32 senderData = signal->theData[2];
- const BlockReference senderRef = signal->senderBlockRef();
- signal->theData[0] = RNIL;
- signal->theData[1] = tabPtr.p->totalfragments;
- signal->theData[2] = tabPtr.i;
- signal->theData[3] = senderData;
- signal->theData[4] = tabPtr.p->noOfBackups;
- sendSignal(senderRef, GSN_DI_FCOUNTCONF, signal, 5, JBB);
+ conf->m_connectionData = RNIL;
+ conf->m_tableRef = tabPtr.i;
+ conf->m_senderData = senderData;
+ conf->m_fragmentCount = tabPtr.p->totalfragments;
+ conf->m_noOfBackups = tabPtr.p->noOfBackups;
+ sendSignal(senderRef, GSN_DI_FCOUNTCONF, signal,
+ DihFragCountConf::SignalLength, JBB);
}//Dbdih::execDI_FCOUNTREQ()
void Dbdih::execDIGETPRIMREQ(Signal* signal)
@@ -7950,9 +8199,12 @@
SubGcpCompleteRep * const rep = (SubGcpCompleteRep*)signal->getDataPtr();
rep->gci = coldgcp;
- rep->senderData = 0;
sendSignal(SUMA_REF, GSN_SUB_GCP_COMPLETE_REP, signal,
SubGcpCompleteRep::SignalLength, JBB);
+
+ EXECUTE_DIRECT(LGMAN, GSN_SUB_GCP_COMPLETE_REP, signal,
+ SubGcpCompleteRep::SignalLength);
+ jamEntry();
}
jam();
@@ -8519,8 +8771,7 @@
rf.rwfTabPtr.p->hashpointer = readPageWord(&rf);
rf.rwfTabPtr.p->kvalue = readPageWord(&rf);
rf.rwfTabPtr.p->mask = readPageWord(&rf);
- ndbrequire(readPageWord(&rf) == TabRecord::HASH);
- rf.rwfTabPtr.p->method = TabRecord::HASH;
+ rf.rwfTabPtr.p->method = (TabRecord::Method)readPageWord(&rf);
/* ---------------------------------- */
/* Type of table, 2 = temporary table */
/* ---------------------------------- */
@@ -8614,7 +8865,7 @@
writePageWord(&wf, tabPtr.p->hashpointer);
writePageWord(&wf, tabPtr.p->kvalue);
writePageWord(&wf, tabPtr.p->mask);
- writePageWord(&wf, TabRecord::HASH);
+ writePageWord(&wf, tabPtr.p->method);
writePageWord(&wf, tabPtr.p->storedTable);
signal->theData[0] = DihContinueB::ZPACK_FRAG_INTO_PAGES;
@@ -8715,6 +8966,80 @@
/*****************************************************************************/
/* ********** START FRAGMENT MODULE *************/
/*****************************************************************************/
+void
+Dbdih::dump_replica_info()
+{
+ TabRecordPtr tabPtr;
+ FragmentstorePtr fragPtr;
+
+ for(tabPtr.i = 0; tabPtr.i < ctabFileSize; tabPtr.i++)
+ {
+ ptrCheckGuard(tabPtr, ctabFileSize, tabRecord);
+ if (tabPtr.p->tabStatus != TabRecord::TS_ACTIVE)
+ continue;
+
+ for(Uint32 fid = 0; fid<tabPtr.p->totalfragments; fid++)
+ {
+ getFragstore(tabPtr.p, fid, fragPtr);
+ ndbout_c("tab: %d frag: %d gci: %d\n -- storedReplicas:",
+ tabPtr.i, fid, SYSFILE->newestRestorableGCI);
+
+ Uint32 i;
+ ReplicaRecordPtr replicaPtr;
+ replicaPtr.i = fragPtr.p->storedReplicas;
+ for(; replicaPtr.i != RNIL; replicaPtr.i = replicaPtr.p->nextReplica)
+ {
+ ptrCheckGuard(replicaPtr, creplicaFileSize, replicaRecord);
+ ndbout_c(" node: %d initialGci: %d nextLcp: %d noCrashedReplicas: %d",
+ replicaPtr.p->procNode,
+ replicaPtr.p->initialGci,
+ replicaPtr.p->nextLcp,
+ replicaPtr.p->noCrashedReplicas);
+ for(i = 0; i<MAX_LCP_STORED; i++)
+ {
+ ndbout_c(" i: %d %s : lcpId: %d maxGci Completed: %d Started: %d",
+ i,
+ (replicaPtr.p->lcpStatus[i] == ZVALID ?"VALID":"INVALID"),
+ replicaPtr.p->lcpId[i],
+ replicaPtr.p->maxGciCompleted[i],
+ replicaPtr.p->maxGciStarted[i]);
+ }
+
+ for (i = 0; i < 8; i++)
+ {
+ ndbout_c(" crashed replica: %d replicaLastGci: %d createGci: %d",
+ i,
+ replicaPtr.p->replicaLastGci[i],
+ replicaPtr.p->createGci[i]);
+ }
+ }
+ ndbout_c(" -- oldStoredReplicas");
+ replicaPtr.i = fragPtr.p->oldStoredReplicas;
+ for(; replicaPtr.i != RNIL; replicaPtr.i = replicaPtr.p->nextReplica)
+ {
+ ptrCheckGuard(replicaPtr, creplicaFileSize, replicaRecord);
+ for(i = 0; i<MAX_LCP_STORED; i++)
+ {
+ ndbout_c(" i: %d %s : lcpId: %d maxGci Completed: %d Started: %d",
+ i,
+ (replicaPtr.p->lcpStatus[i] == ZVALID ?"VALID":"INVALID"),
+ replicaPtr.p->lcpId[i],
+ replicaPtr.p->maxGciCompleted[i],
+ replicaPtr.p->maxGciStarted[i]);
+ }
+
+ for (i = 0; i < 8; i++)
+ {
+ ndbout_c(" crashed replica: %d replicaLastGci: %d createGci: %d",
+ i,
+ replicaPtr.p->replicaLastGci[i],
+ replicaPtr.p->createGci[i]);
+ }
+ }
+ }
+ }
+}
+
void Dbdih::startFragment(Signal* signal, Uint32 tableId, Uint32 fragId)
{
Uint32 TloopCount = 0;
@@ -8776,6 +9101,7 @@
/* SEARCH FOR STORED REPLICAS THAT CAN BE USED TO RESTART THE SYSTEM. */
/* ----------------------------------------------------------------------- */
searchStoredReplicas(fragPtr);
+
if (cnoOfCreateReplicas == 0) {
/* --------------------------------------------------------------------- */
/* THERE WERE NO STORED REPLICAS AVAILABLE THAT CAN SERVE AS REPLICA TO*/
@@ -8788,6 +9114,10 @@
char buf[64];
BaseString::snprintf(buf, sizeof(buf), "table: %d fragment: %d gci: %d",
tableId, fragId, SYSFILE->newestRestorableGCI);
+
+ ndbout_c(buf);
+ dump_replica_info();
+
progError(__LINE__, NDBD_EXIT_NO_RESTORABLE_REPLICA, buf);
ndbrequire(false);
return;
@@ -8864,8 +9194,8 @@
// otherwise we have a problem.
/* --------------------------------------------------------------------- */
jam();
- ndbrequire(senderNodeId == c_nodeStartMaster.startNode);
- nodeRestartStartRecConfLab(signal);
+ ndbout_c("startNextCopyFragment");
+ startNextCopyFragment(signal, findTakeOver(senderNodeId));
return;
} else {
/* --------------------------------------------------------------------- */
@@ -9883,9 +10213,11 @@
}
void Dbdih::findReplica(ReplicaRecordPtr& replicaPtr,
- Fragmentstore* fragPtrP, Uint32 nodeId)
+ Fragmentstore* fragPtrP,
+ Uint32 nodeId,
+ bool old)
{
- replicaPtr.i = fragPtrP->storedReplicas;
+ replicaPtr.i = old ? fragPtrP->oldStoredReplicas : fragPtrP->storedReplicas;
while(replicaPtr.i != RNIL){
ptrCheckGuard(replicaPtr, creplicaFileSize, replicaRecord);
if (replicaPtr.p->procNode == nodeId) {
@@ -11138,6 +11470,7 @@
cnoHotSpare = 0;
cnoOfActiveTables = 0;
cnoOfNodeGroups = 0;
+ c_nextNodeGroup = 0;
cnoReplicas = 0;
coldgcp = 0;
coldGcpId = 0;
@@ -11157,6 +11490,7 @@
c_newest_restorable_gci = 0;
cverifyQueueCounter = 0;
cwaitLcpSr = false;
+ c_nextLogPart = 0;
nodeResetStart();
c_nodeStartMaster.wait = ZFALSE;
@@ -11164,7 +11498,7 @@
memset(&sysfileData[0], 0, sizeof(sysfileData));
const ndb_mgm_configuration_iterator * p =
- theConfiguration.getOwnConfigIterator();
+ m_ctx.m_config.getOwnConfigIterator();
ndbrequire(p != 0);
c_lcpState.clcpDelay = 20;
@@ -11243,6 +11577,8 @@
SYSFILE->takeOver[i] = 0;
}//for
Sysfile::setInitialStartOngoing(SYSFILE->systemRestartBits);
+ srand(time(0));
+ globalData.m_restart_seq = SYSFILE->m_restart_seq = 0;
}//Dbdih::initRestartInfo()
/*--------------------------------------------------------------------*/
@@ -11993,6 +12329,8 @@
jam();
fragPtr.p->distributionKey = TdistKey;
}//if
+
+ fragPtr.p->m_log_part_id = readPageWord(rf);
}//Dbdih::readFragment()
Uint32 Dbdih::readPageWord(RWFragment* rf)
@@ -13086,6 +13424,7 @@
writePageWord(wf, fragPtr.p->noStoredReplicas);
writePageWord(wf, fragPtr.p->noOldStoredReplicas);
writePageWord(wf, fragPtr.p->distributionKey);
+ writePageWord(wf, fragPtr.p->m_log_part_id);
}//Dbdih::writeFragment()
void Dbdih::writePageWord(RWFragment* wf, Uint32 dataWord)
@@ -13152,7 +13491,7 @@
signal->theData[0] = filePtr.p->fileRef;
signal->theData[1] = reference();
signal->theData[2] = filePtr.i;
- signal->theData[3] = ZLIST_OF_PAIRS;
+ signal->theData[3] = ZLIST_OF_PAIRS_SYNCH;
signal->theData[4] = ZVAR_NO_WORD;
signal->theData[5] = tab->noPages;
for (Uint32 i = 0; i < tab->noPages; i++) {
@@ -13515,7 +13854,7 @@
if (signal->getLength() == 1)
{
const ndb_mgm_configuration_iterator * p =
- theConfiguration.getOwnConfigIterator();
+ m_ctx.m_config.getOwnConfigIterator();
ndbrequire(p != 0);
ndb_mgm_get_int_parameter(p, CFG_DB_GCP_INTERVAL, &cgcpDelay);
}
--- 1.14.9.1/ndb/src/kernel/blocks/ndbcntr/NdbcntrMain.cpp 2006-04-07 10:39:40 +02:00
+++ 1.33/storage/ndb/src/kernel/blocks/ndbcntr/NdbcntrMain.cpp 2006-04-07 10:32:08 +02:00
@@ -68,13 +68,13 @@
};
static BlockInfo ALL_BLOCKS[] = {
+ { NDBFS_REF, 0 , 2000, 2999 },
{ DBTC_REF, 1 , 8000, 8035 },
{ DBDIH_REF, 1 , 7000, 7173 },
{ DBLQH_REF, 1 , 5000, 5030 },
{ DBACC_REF, 1 , 3000, 3999 },
{ DBTUP_REF, 1 , 4000, 4007 },
{ DBDICT_REF, 1 , 6000, 6003 },
- { NDBFS_REF, 0 , 2000, 2999 },
{ NDBCNTR_REF, 0 , 1000, 1999 },
{ QMGR_REF, 1 , 1, 999 },
{ CMVMI_REF, 1 , 9000, 9999 },
@@ -83,11 +83,16 @@
{ DBUTIL_REF, 1 , 11000, 11999 },
{ SUMA_REF, 1 , 13000, 13999 },
{ DBTUX_REF, 1 , 12000, 12999 }
+ ,{ TSMAN_REF, 1 , 0, 0 }
+ ,{ LGMAN_REF, 1 , 0, 0 }
+ ,{ PGMAN_REF, 1 , 0, 0 }
+ ,{ RESTORE_REF,1 , 0, 0 }
};
static const Uint32 ALL_BLOCKS_SZ = sizeof(ALL_BLOCKS)/sizeof(BlockInfo);
static BlockReference readConfigOrder[ALL_BLOCKS_SZ] = {
+ CMVMI_REF,
DBTUP_REF,
DBACC_REF,
DBTC_REF,
@@ -98,11 +103,14 @@
NDBFS_REF,
NDBCNTR_REF,
QMGR_REF,
- CMVMI_REF,
TRIX_REF,
BACKUP_REF,
DBUTIL_REF,
- SUMA_REF
+ SUMA_REF,
+ TSMAN_REF,
+ LGMAN_REF,
+ PGMAN_REF,
+ RESTORE_REF
};
/*******************************/
@@ -131,7 +139,7 @@
jam();
Uint32 to_3= 0;
const ndb_mgm_configuration_iterator * p =
- theConfiguration.getOwnConfigIterator();
+ m_ctx.m_config.getOwnConfigIterator();
ndb_mgm_get_int_parameter(p, CFG_DB_START_FAILURE_TIMEOUT, &to_3);
BaseString tmp;
tmp.append("Shutting down node as total restart time exceeds "
@@ -160,6 +168,16 @@
}//switch
}//Ndbcntr::execCONTINUEB()
+void
+Ndbcntr::execAPI_START_REP(Signal* signal)
+{
+ if(refToBlock(signal->getSendersBlockRef()) == QMGR)
+ {
+ for(Uint32 i = 0; i<ALL_BLOCKS_SZ; i++){
+ sendSignal(ALL_BLOCKS[i].Ref, GSN_API_START_REP, signal, 1, JBB);
+ }
+ }
+}
/*******************************/
/* SYSTEM_ERROR */
/*******************************/
@@ -187,6 +205,20 @@
killingNode, data1);
break;
+ case SystemError::CopySubscriptionRef:
+ BaseString::snprintf(buf, sizeof(buf),
+ "Node %d killed this node because "
+ "it could not copy a subscription during node restart. "
+ "Copy subscription error code: %u.",
+ killingNode, data1);
+ break;
+ case SystemError::CopySubscriberRef:
+ BaseString::snprintf(buf, sizeof(buf),
+ "Node %d killed this node because "
+ "it could not start a subscriber during node restart. "
+ "Copy subscription error code: %u.",
+ killingNode, data1);
+ break;
default:
BaseString::snprintf(buf, sizeof(buf), "System error %d, "
" this node was killed by node %d",
@@ -209,7 +241,7 @@
Uint32 senderData = req->senderData;
const ndb_mgm_configuration_iterator * p =
- theConfiguration.getOwnConfigIterator();
+ m_ctx.m_config.getOwnConfigIterator();
ndbrequire(p != 0);
ReadConfigConf * conf = (ReadConfigConf*)signal->getDataPtrSend();
@@ -224,16 +256,12 @@
jamEntry();
cstartPhase = signal->theData[1];
- NodeState newState(NodeState::SL_STARTING, cstartPhase,
- (NodeState::StartType)ctypeOfStart);
- updateNodeState(signal, newState);
-
cndbBlocksCount = 0;
cinternalStartphase = cstartPhase - 1;
switch (cstartPhase) {
case 0:
- if(theConfiguration.getInitialStart()){
+ if(m_ctx.m_config.getInitialStart()){
jam();
c_fsRemoveCount = 0;
clearFilesystem(signal);
@@ -476,7 +504,7 @@
Uint32 to_3 = 0;
const ndb_mgm_configuration_iterator * p =
- theConfiguration.getOwnConfigIterator();
+ m_ctx.m_config.getOwnConfigIterator();
ndbrequire(p != 0);
ndb_mgm_get_int_parameter(p, CFG_DB_START_PARTIAL_TIMEOUT, &to_1);
@@ -591,6 +619,13 @@
Uint32 nodeId = signal->theData[0];
c_startedNodes.set(nodeId);
c_start.m_starting.clear(nodeId);
+
+ /**
+ * Inform all interested blocks that node has started
+ */
+ for(Uint32 i = 0; i<ALL_BLOCKS_SZ; i++){
+ sendSignal(ALL_BLOCKS[i].Ref, GSN_NODE_START_REP, signal, 1, JBB);
+ }
if(!c_start.m_starting.isclear()){
jam();
@@ -1464,6 +1499,9 @@
sendSignal(SUMA_REF, GSN_NODE_FAILREP, signal,
NodeFailRep::SignalLength, JBB);
+ sendSignal(QMGR_REF, GSN_NODE_FAILREP, signal,
+ NodeFailRep::SignalLength, JBB);
+
if (c_stopRec.stopReq.senderRef)
{
jam();
@@ -1632,7 +1670,6 @@
//w.add(DictTabInfo::MinLoadFactor, 70);
//w.add(DictTabInfo::MaxLoadFactor, 80);
w.add(DictTabInfo::FragmentTypeVal, (Uint32)table.fragmentType);
- //w.add(DictTabInfo::TableStorageVal, (Uint32)DictTabInfo::MainMemory);
//w.add(DictTabInfo::NoOfKeyAttr, 1);
w.add(DictTabInfo::NoOfAttributes, (Uint32)table.columnCount);
//w.add(DictTabInfo::NoOfNullable, (Uint32)0);
@@ -1644,9 +1681,12 @@
const SysColumn& column = table.columnList[i];
ndbassert(column.pos == i);
w.add(DictTabInfo::AttributeName, column.name);
- w.add(DictTabInfo::AttributeId, (Uint32)column.pos);
+ w.add(DictTabInfo::AttributeId, (Uint32)i);
w.add(DictTabInfo::AttributeKeyFlag, (Uint32)column.keyFlag);
- //w.add(DictTabInfo::AttributeStorage, (Uint32)DictTabInfo::MainMemory);
+ w.add(DictTabInfo::AttributeStorageType,
+ (Uint32)NDB_STORAGETYPE_MEMORY);
+ w.add(DictTabInfo::AttributeArrayType,
+ (Uint32)NDB_ARRAYTYPE_FIXED);
w.add(DictTabInfo::AttributeNullableFlag, (Uint32)column.nullable);
w.add(DictTabInfo::AttributeExtType, (Uint32)column.type);
w.add(DictTabInfo::AttributeExtLength, (Uint32)column.length);
@@ -1767,9 +1807,9 @@
tKeyDataPtr[0] = tkey;
- AttributeHeader::init(&tAIDataPtr[0], 0, 1);
+ AttributeHeader::init(&tAIDataPtr[0], 0, 1 << 2);
tAIDataPtr[1] = tkey;
- AttributeHeader::init(&tAIDataPtr[2], 1, 2);
+ AttributeHeader::init(&tAIDataPtr[2], 1, 2 << 2);
tAIDataPtr[3] = (tkey << 16);
tAIDataPtr[4] = 1;
sendSignal(DBTC_REF, GSN_TCKEYREQ, signal,
@@ -1984,8 +2024,8 @@
}
if (arg == DumpStateOrd::NdbcntrTestStopOnError){
- if (theConfiguration.stopOnError() == true)
- ((Configuration&)theConfiguration).stopOnError(false);
+ if (m_ctx.m_config.stopOnError() == true)
+ ((Configuration&)m_ctx.m_config).stopOnError(false);
const BlockReference tblockref = calcNdbCntrBlockRef(getOwnNodeId());
@@ -2188,7 +2228,7 @@
jam();
if(StopReq::getPerformRestart(c_stopRec.stopReq.requestInfo))
{
- ((Configuration&)theConfiguration).stopOnError(false);
+ ((Configuration&)m_ctx.m_config).stopOnError(false);
}
}
if(!c_stopRec.checkNodeFail(signal))
@@ -2830,6 +2870,10 @@
currentBlockIndex = 0;
+ NodeState newState(NodeState::SL_STARTING, currentStartPhase,
+ (NodeState::StartType)cntr.ctypeOfStart);
+ cntr.updateNodeState(signal, newState);
+
if(start != 0){
/**
* At least one wanted this start phase, report it
--- 1.13.14.2/ndb/src/kernel/blocks/qmgr/QmgrMain.cpp 2006-04-07 10:29:45 +02:00
+++ 1.31/storage/ndb/src/kernel/blocks/qmgr/QmgrMain.cpp 2006-04-07 10:32:09 +02:00
@@ -228,7 +228,7 @@
Uint32 senderData = req->senderData;
const ndb_mgm_configuration_iterator * p =
- theConfiguration.getOwnConfigIterator();
+ m_ctx.m_config.getOwnConfigIterator();
ndbrequire(p != 0);
ReadConfigConf * conf = (ReadConfigConf*)signal->getDataPtrSend();
@@ -2005,7 +2005,7 @@
* Timeouts
*/
const ndb_mgm_configuration_iterator * p =
- theConfiguration.getOwnConfigIterator();
+ m_ctx.m_config.getOwnConfigIterator();
ndbrequire(p != 0);
Uint32 hbDBDB = 1500;
@@ -2054,7 +2054,7 @@
sd->ticket.clear();
sd->mask.clear();
ndb_mgm_configuration_iterator * iter =
- theConfiguration.getClusterConfigIterator();
+ m_ctx.m_config.getClusterConfigIterator();
for (ndb_mgm_first(iter); ndb_mgm_valid(iter); ndb_mgm_next(iter)) {
Uint32 tmp = 0;
if (ndb_mgm_get_int_parameter(iter, CFG_NODE_ARBIT_RANK, &tmp) == 0 &&
@@ -2611,9 +2611,6 @@
case NodeInfo::MGM:
compatability_check = ndbCompatible_ndb_mgmt(NDB_VERSION, version);
break;
- case NodeInfo::REP:
- // compatability_check = ndbCompatible_ndb_api(NDB_VERSION, version);
- // break;
case NodeInfo::DB:
case NodeInfo::INVALID:
default:
@@ -2644,7 +2641,7 @@
apiRegConf->qmgrRef = reference();
apiRegConf->apiHeartbeatFrequency = (chbApiDelay / 10);
apiRegConf->version = NDB_VERSION;
- apiRegConf->nodeState = getNodeState();
+ NodeState state= apiRegConf->nodeState = getNodeState();
{
NodeRecPtr nodePtr;
nodePtr.i = getOwnNodeId();
@@ -2662,9 +2659,12 @@
sendSignal(ref, GSN_API_REGCONF, signal, ApiRegConf::SignalLength, JBB);
- if ((getNodeState().startLevel == NodeState::SL_STARTED ||
- getNodeState().getSingleUserMode())
- && apiNodePtr.p->phase == ZAPI_INACTIVE) {
+ if (apiNodePtr.p->phase == ZAPI_INACTIVE &&
+ (state.startLevel == NodeState::SL_STARTED ||
+ state.getSingleUserMode() ||
+ (state.startLevel == NodeState::SL_STARTING &&
+ state.starting.startPhase >= 100)))
+ {
jam();
/**----------------------------------------------------------------------
* THE API NODE IS REGISTERING. WE WILL ACCEPT IT BY CHANGING STATE AND
@@ -2674,6 +2674,9 @@
apiNodePtr.p->blockRef = ref;
signal->theData[0] = apiNodePtr.i;
sendSignal(CMVMI_REF, GSN_ENABLE_COMORD, signal, 1, JBA);
+
+ signal->theData[0] = apiNodePtr.i;
+ EXECUTE_DIRECT(NDBCNTR, GSN_API_START_REP, signal, 1);
}
return;
}//Qmgr::execAPI_REGREQ()
@@ -4718,6 +4721,170 @@
}
void
+Qmgr::execNODE_FAILREP(Signal * signal)
+{
+ jamEntry();
+ // make sure any distributed signals get acknowledged
+ // destructive of the signal
+ c_counterMgr.execNODE_FAILREP(signal);
+}
+
+void
+Qmgr::execALLOC_NODEID_REQ(Signal * signal)
+{
+ jamEntry();
+ const AllocNodeIdReq * req = (AllocNodeIdReq*)signal->getDataPtr();
+ Uint32 senderRef = req->senderRef;
+ Uint32 nodeId = req->nodeId;
+ Uint32 error = 0;
+
+ if (refToBlock(senderRef) != QMGR) // request from management server
+ {
+ /* master */
+
+ if (getOwnNodeId() != cpresident)
+ error = AllocNodeIdRef::NotMaster;
+ else if (!opAllocNodeIdReq.m_tracker.done())
+ error = AllocNodeIdRef::Busy;
+ else if (c_connectedNodes.get(nodeId))
+ error = AllocNodeIdRef::NodeConnected;
+
+ if (error)
+ {
+ jam();
+ AllocNodeIdRef * ref = (AllocNodeIdRef*)signal->getDataPtrSend();
+ ref->senderRef = reference();
+ ref->errorCode = error;
+ ref->masterRef = numberToRef(QMGR, cpresident);
+ sendSignal(senderRef, GSN_ALLOC_NODEID_REF, signal,
+ AllocNodeIdRef::SignalLength, JBB);
+ return;
+ }
+
+ opAllocNodeIdReq.m_req = *req;
+ opAllocNodeIdReq.m_error = 0;
+ opAllocNodeIdReq.m_connectCount = getNodeInfo(refToNode(senderRef)).m_connectCount;
+
+ jam();
+ AllocNodeIdReq * req = (AllocNodeIdReq*)signal->getDataPtrSend();
+ req->senderRef = reference();
+ NodeReceiverGroup rg(QMGR, c_clusterNodes);
+ RequestTracker & p = opAllocNodeIdReq.m_tracker;
+ p.init<AllocNodeIdRef>(c_counterMgr, rg, GSN_ALLOC_NODEID_REF, 0);
+
+ sendSignal(rg, GSN_ALLOC_NODEID_REQ, signal,
+ AllocNodeIdReq::SignalLength, JBB);
+ return;
+ }
+
+ /* participant */
+
+ if (c_connectedNodes.get(nodeId))
+ error = AllocNodeIdRef::NodeConnected;
+ else
+ {
+ NodeRecPtr nodePtr;
+ nodePtr.i = nodeId;
+ ptrAss(nodePtr, nodeRec);
+ if (nodePtr.p->failState != NORMAL)
+ error = AllocNodeIdRef::NodeFailureHandlingNotCompleted;
+ }
+
+ if (error)
+ {
+ AllocNodeIdRef * ref = (AllocNodeIdRef*)signal->getDataPtrSend();
+ ref->senderRef = reference();
+ ref->errorCode = error;
+ sendSignal(senderRef, GSN_ALLOC_NODEID_REF, signal,
+ AllocNodeIdRef::SignalLength, JBB);
+ return;
+ }
+
+ AllocNodeIdConf * conf = (AllocNodeIdConf*)signal->getDataPtrSend();
+ conf->senderRef = reference();
+ sendSignal(senderRef, GSN_ALLOC_NODEID_CONF, signal,
+ AllocNodeIdConf::SignalLength, JBB);
+}
+
+void
+Qmgr::execALLOC_NODEID_CONF(Signal * signal)
+{
+ /* master */
+
+ jamEntry();
+ const AllocNodeIdConf * conf = (AllocNodeIdConf*)signal->getDataPtr();
+ opAllocNodeIdReq.m_tracker.reportConf(c_counterMgr,
+ refToNode(conf->senderRef));
+ completeAllocNodeIdReq(signal);
+}
+
+
+void
+Qmgr::execALLOC_NODEID_REF(Signal * signal)
+{
+ /* master */
+
+ jamEntry();
+ const AllocNodeIdRef * ref = (AllocNodeIdRef*)signal->getDataPtr();
+ if (ref->errorCode == AllocNodeIdRef::NF_FakeErrorREF)
+ {
+ opAllocNodeIdReq.m_tracker.ignoreRef(c_counterMgr,
+ refToNode(ref->senderRef));
+ }
+ else
+ {
+ opAllocNodeIdReq.m_tracker.reportRef(c_counterMgr,
+ refToNode(ref->senderRef));
+ if (opAllocNodeIdReq.m_error == 0)
+ opAllocNodeIdReq.m_error = ref->errorCode;
+ }
+ completeAllocNodeIdReq(signal);
+}
+
+void
+Qmgr::completeAllocNodeIdReq(Signal *signal)
+{
+ /* master */
+
+ if (!opAllocNodeIdReq.m_tracker.done())
+ {
+ jam();
+ return;
+ }
+
+ if (opAllocNodeIdReq.m_connectCount !=
+ getNodeInfo(refToNode(opAllocNodeIdReq.m_req.senderRef)).m_connectCount)
+ {
+ // management server not same version as the original requester
+ jam();
+ return;
+ }
+
+ if (opAllocNodeIdReq.m_tracker.hasRef())
+ {
+ jam();
+ AllocNodeIdRef * ref = (AllocNodeIdRef*)signal->getDataPtrSend();
+ ref->senderRef = reference();
+ ref->senderData = opAllocNodeIdReq.m_req.senderData;
+ ref->nodeId = opAllocNodeIdReq.m_req.nodeId;
+ ref->errorCode = opAllocNodeIdReq.m_error;
+ ref->masterRef = numberToRef(QMGR, cpresident);
+ ndbassert(AllocNodeIdRef::SignalLength == 5);
+ sendSignal(opAllocNodeIdReq.m_req.senderRef, GSN_ALLOC_NODEID_REF, signal,
+ AllocNodeIdRef::SignalLength, JBB);
+ return;
+ }
+ jam();
+ AllocNodeIdConf * conf = (AllocNodeIdConf*)signal->getDataPtrSend();
+ conf->senderRef = reference();
+ conf->senderData = opAllocNodeIdReq.m_req.senderData;
+ conf->nodeId = opAllocNodeIdReq.m_req.nodeId;
+ ndbassert(AllocNodeIdConf::SignalLength == 3);
+ sendSignal(opAllocNodeIdReq.m_req.senderRef, GSN_ALLOC_NODEID_CONF, signal,
+ AllocNodeIdConf::SignalLength, JBB);
+}
+
+void
Qmgr::execSTOP_REQ(Signal* signal)
{
jamEntry();
@@ -4725,6 +4892,7 @@
if (c_stopReq.senderRef)
{
+ jam();
ndbrequire(NdbNodeBitmask::get(c_stopReq.nodes, getOwnNodeId()));
StopConf *conf = (StopConf*)signal->getDataPtrSend();
--- 1.49.8.1/ndb/src/mgmclient/CommandInterpreter.cpp 2006-04-07 10:39:40 +02:00
+++ 1.60/storage/ndb/src/mgmclient/CommandInterpreter.cpp 2006-04-07 10:32:09 +02:00
@@ -16,14 +16,7 @@
#include <ndb_global.h>
#include <my_sys.h>
-
-//#define HAVE_GLOBAL_REPLICATION
-
#include <Vector.hpp>
-#ifdef HAVE_GLOBAL_REPLICATION
-#include "../rep/repapi/repapi.h"
-#endif
-
#include <mgmapi.h>
#include <util/BaseString.hpp>
@@ -166,11 +159,6 @@
int m_verbose;
int try_reconnect;
int m_error;
-#ifdef HAVE_GLOBAL_REPLICATION
- NdbRepHandle m_repserver;
- const char *rep_host;
- bool rep_connected;
-#endif
struct NdbThread* m_event_thread;
};
@@ -228,10 +216,6 @@
#include <NdbMem.h>
#include <EventLogger.hpp>
#include <signaldata/SetLogLevelOrd.hpp>
-#include <signaldata/GrepImpl.hpp>
-#ifdef HAVE_GLOBAL_REPLICATION
-
-#endif // HAVE_GLOBAL_REPLICATION
#include "MgmtErrorReporter.hpp"
#include <Parser.hpp>
#include <SocketServer.hpp>
@@ -259,9 +243,6 @@
"---------------------------------------------------------------------------\n"
"HELP Print help text\n"
"HELP SHOW Help for SHOW command\n"
-#ifdef HAVE_GLOBAL_REPLICATION
-"HELP REPLICATION Help for global replication\n"
-#endif // HAVE_GLOBAL_REPLICATION
#ifdef VM_TRACE // DEBUG ONLY
"HELP DEBUG Help for debug compiled version\n"
#endif
@@ -285,9 +266,6 @@
"EXIT SINGLE USER MODE Exit single user mode\n"
"<id> STATUS Print status\n"
"<id> CLUSTERLOG {<category>=<level>}+ Set log level for cluster log\n"
-#ifdef HAVE_GLOBAL_REPLICATION
-"REP CONNECT <host:port> Connect to REP server on host:port\n"
-#endif
"PURGE STALE SESSIONS Reset reserved nodeid's in the mgmt server\n"
"CONNECT [<connectstring>] Connect to management server (reconnect if already connected)\n"
"QUIT Quit management client\n"
@@ -305,39 +283,6 @@
#endif
;
-#ifdef HAVE_GLOBAL_REPLICATION
-static const char* helpTextRep =
-"---------------------------------------------------------------------------\n"
-" NDB Cluster -- Management Client -- Help for Global Replication\n"
-"---------------------------------------------------------------------------\n"
-"Commands should be executed on the standby NDB Cluster\n"
-"These features are in an experimental release state.\n"
-"\n"
-"Simple Commands:\n"
-"REP START Start Global Replication\n"
-"REP START REQUESTOR Start Global Replication Requestor\n"
-"REP STATUS Show Global Replication status\n"
-"REP STOP Stop Global Replication\n"
-"REP STOP REQUESTOR Stop Global Replication Requestor\n"
-"\n"
-"Advanced Commands:\n"
-"REP START <protocol> Starts protocol\n"
-"REP STOP <protocol> Stops protocol\n"
-"<protocol> = TRANSFER | APPLY | DELETE\n"
-"\n"
-#ifdef VM_TRACE // DEBUG ONLY
-"Debugging commands:\n"
-"REP DELETE Removes epochs stored in primary and standy systems\n"
-"REP DROP <tableid> Drop a table in SS identified by table id\n"
-"REP SLOWSTOP Stop Replication (Tries to synchonize with primary)\n"
-"REP FASTSTOP Stop Replication (Stops in consistent state)\n"
-"<component> = SUBSCRIPTION\n"
-" METALOG | METASCAN | DATALOG | DATASCAN\n"
-" REQUESTOR | TRANSFER | APPLY | DELETE\n"
-#endif
-;
-#endif // HAVE_GLOBAL_REPLICATION
-
#ifdef VM_TRACE // DEBUG ONLY
static const char* helpTextDebug =
"---------------------------------------------------------------------------\n"
@@ -409,11 +354,6 @@
m_connected= false;
m_event_thread= 0;
try_reconnect = 0;
-#ifdef HAVE_GLOBAL_REPLICATION
- rep_host = NULL;
- m_repserver = NULL;
- rep_connected = false;
-#endif
}
/*
@@ -713,13 +653,6 @@
executePurge(allAfterFirstToken);
DBUG_RETURN(true);
}
-#ifdef HAVE_GLOBAL_REPLICATION
- else if(strcasecmp(firstToken, "REPLICATION") == 0 ||
- strcasecmp(firstToken, "REP") == 0) {
- executeRep(allAfterFirstToken);
- DBUG_RETURN(true);
- }
-#endif // HAVE_GLOBAL_REPLICATION
else if(strcasecmp(firstToken, "ENTER") == 0 &&
allAfterFirstToken != NULL &&
strncasecmp(allAfterFirstToken, "SINGLE USER MODE ",
@@ -1026,11 +959,6 @@
ndbout << endl;
} else if (strcasecmp(parameters, "SHOW") == 0) {
ndbout << helpTextShow;
-#ifdef HAVE_GLOBAL_REPLICATION
- } else if (strcasecmp(parameters, "REPLICATION") == 0 ||
- strcasecmp(parameters, "REP") == 0) {
- ndbout << helpTextRep;
-#endif // HAVE_GLOBAL_REPLICATION
#ifdef VM_TRACE // DEBUG ONLY
} else if (strcasecmp(parameters, "DEBUG") == 0) {
ndbout << helpTextDebug;
@@ -1275,8 +1203,6 @@
case NDB_MGM_NODE_TYPE_UNKNOWN:
ndbout << "Error: Unknown Node Type" << endl;
return;
- case NDB_MGM_NODE_TYPE_REP:
- abort();
}
}
@@ -2216,226 +2142,5 @@
ndbout << "Invalid arguments: expected <BackupId>" << endl;
return;
}
-
-#ifdef HAVE_GLOBAL_REPLICATION
-/*****************************************************************************
- * Global Replication
- *
- * For information about the different commands, see
- * GrepReq::Request in file signaldata/grepImpl.cpp.
- *
- * Below are commands as of 2003-07-05 (may change!):
- * START = 0, ///< Start Global Replication (all phases)
- * START_METALOG = 1, ///< Start Global Replication (all phases)
- * START_METASCAN = 2, ///< Start Global Replication (all phases)
- * START_DATALOG = 3, ///< Start Global Replication (all phases)
- * START_DATASCAN = 4, ///< Start Global Replication (all phases)
- * START_REQUESTOR = 5, ///< Start Global Replication (all phases)
- * ABORT = 6, ///< Immediate stop (removes subscription)
- * SLOW_STOP = 7, ///< Stop after finishing applying current GCI epoch
- * FAST_STOP = 8, ///< Stop after finishing applying all PS GCI epochs
- * START_TRANSFER = 9, ///< Start SS-PS transfer
- * STOP_TRANSFER = 10, ///< Stop SS-PS transfer
- * START_APPLY = 11, ///< Start applying GCI epochs in SS
- * STOP_APPLY = 12, ///< Stop applying GCI epochs in SS
- * STATUS = 13, ///< Status
- * START_SUBSCR = 14,
- * REMOVE_BUFFERS = 15,
- * DROP_TABLE = 16
-
- *****************************************************************************/
-
-void
-CommandInterpreter::executeRep(char* parameters)
-{
- if (emptyString(parameters)) {
- ndbout << helpTextRep;
- return;
- }
-
- char * line = my_strdup(parameters,MYF(MY_WME));
- My_auto_ptr<char> ap1((char*)line);
- char * firstToken = strtok(line, " ");
-
- struct ndb_rep_reply reply;
- unsigned int repId;
-
-
- if (!strcasecmp(firstToken, "CONNECT")) {
- char * host = strtok(NULL, "\0");
- for (unsigned int i = 0; i < strlen(host); ++i) {
- host[i] = tolower(host[i]);
- }
-
- if(host == NULL)
- {
- ndbout_c("host:port must be specified.");
- return;
- }
-
- if(rep_connected) {
- if(m_repserver != NULL) {
- ndb_rep_disconnect(m_repserver);
- rep_connected = false;
- }
- }
-
- if(m_repserver == NULL)
- m_repserver = ndb_rep_create_handle();
- if(ndb_rep_connect(m_repserver, host) < 0)
- ndbout_c("Failed to connect to %s", host);
- else
- rep_connected=true;
- return;
-
- if(!rep_connected) {
- ndbout_c("Not connected to REP server");
- }
- }
-
- /********
- * START
- ********/
- if (!strcasecmp(firstToken, "START")) {
-
- unsigned int req;
- char *startType = strtok(NULL, "\0");
-
- if (startType == NULL) {
- req = GrepReq::START;
- } else if (!strcasecmp(startType, "SUBSCRIPTION")) {
- req = GrepReq::START_SUBSCR;
- } else if (!strcasecmp(startType, "METALOG")) {
- req = GrepReq::START_METALOG;
- } else if (!strcasecmp(startType, "METASCAN")) {
- req = GrepReq::START_METASCAN;
- } else if (!strcasecmp(startType, "DATALOG")) {
- req = GrepReq::START_DATALOG;
- } else if (!strcasecmp(startType, "DATASCAN")) {
- req = GrepReq::START_DATASCAN;
- } else if (!strcasecmp(startType, "REQUESTOR")) {
- req = GrepReq::START_REQUESTOR;
- } else if (!strcasecmp(startType, "TRANSFER")) {
- req = GrepReq::START_TRANSFER;
- } else if (!strcasecmp(startType, "APPLY")) {
- req = GrepReq::START_APPLY;
- } else if (!strcasecmp(startType, "DELETE")) {
- req = GrepReq::START_DELETE;
- } else {
- ndbout_c("Illegal argument to command 'REPLICATION START'");
- return;
- }
-
- int result = ndb_rep_command(m_repserver, req, &repId, &reply);
-
- if (result != 0) {
- ndbout << "Start of Global Replication failed" << endl;
- } else {
- ndbout << "Start of Global Replication ordered" << endl;
- }
- return;
- }
-
- /********
- * STOP
- ********/
- if (!strcasecmp(firstToken, "STOP")) {
- unsigned int req;
- char *startType = strtok(NULL, " ");
- unsigned int epoch = 0;
-
- if (startType == NULL) {
- /**
- * Stop immediately
- */
- req = GrepReq::STOP;
- } else if (!strcasecmp(startType, "EPOCH")) {
- char *strEpoch = strtok(NULL, "\0");
- if(strEpoch == NULL) {
- ndbout_c("Epoch expected!");
- return;
- }
- req = GrepReq::STOP;
- epoch=atoi(strEpoch);
- } else if (!strcasecmp(startType, "SUBSCRIPTION")) {
- req = GrepReq::STOP_SUBSCR;
- } else if (!strcasecmp(startType, "METALOG")) {
- req = GrepReq::STOP_METALOG;
- } else if (!strcasecmp(startType, "METASCAN")) {
- req = GrepReq::STOP_METASCAN;
- } else if (!strcasecmp(startType, "DATALOG")) {
- req = GrepReq::STOP_DATALOG;
- } else if (!strcasecmp(startType, "DATASCAN")) {
- req = GrepReq::STOP_DATASCAN;
- } else if (!strcasecmp(startType, "REQUESTOR")) {
- req = GrepReq::STOP_REQUESTOR;
- } else if (!strcasecmp(startType, "TRANSFER")) {
- req = GrepReq::STOP_TRANSFER;
- } else if (!strcasecmp(startType, "APPLY")) {
- req = GrepReq::STOP_APPLY;
- } else if (!strcasecmp(startType, "DELETE")) {
- req = GrepReq::STOP_DELETE;
- } else {
- ndbout_c("Illegal argument to command 'REPLICATION STOP'");
- return;
- }
- int result = ndb_rep_command(m_repserver, req, &repId, &reply, epoch);
-
- if (result != 0) {
- ndbout << "Stop command failed" << endl;
- } else {
- ndbout << "Stop ordered" << endl;
- }
- return;
- }
-
- /*********
- * STATUS
- *********/
- if (!strcasecmp(firstToken, "STATUS")) {
- struct rep_state repstate;
- int result =
- ndb_rep_get_status(m_repserver, &repId, &reply, &repstate);
-
- if (result != 0) {
- ndbout << "Status request of Global Replication failed" << endl;
- } else {
- ndbout << "Status request of Global Replication ordered" << endl;
- ndbout << "See printout at one of the DB nodes" << endl;
- ndbout << "(Better status report is under development.)" << endl;
- ndbout << " SubscriptionId " << repstate.subid
- << " SubscriptionKey " << repstate.subkey << endl;
- }
- return;
- }
-
- /*********
- * QUERY (see repapi.h for querable counters)
- *********/
- if (!strcasecmp(firstToken, "QUERY")) {
- char *query = strtok(NULL, "\0");
- int queryCounter=-1;
- if(query != NULL) {
- queryCounter = atoi(query);
- }
- struct rep_state repstate;
- unsigned repId = 0;
- int result = ndb_rep_query(m_repserver, (QueryCounter)queryCounter,
- &repId, &reply, &repstate);
-
- if (result != 0) {
- ndbout << "Query repserver failed" << endl;
- } else {
- ndbout << "Query repserver sucessful" << endl;
- ndbout_c("repstate : QueryCounter %d, f=%d l=%d"
- " nodegroups %d" ,
- repstate.queryCounter,
- repstate.first[0], repstate.last[0],
- repstate.no_of_nodegroups );
- }
- return;
- }
-}
-#endif // HAVE_GLOBAL_REPLICATION
template class Vector<char const*>;
--- 1.73.19.2/ndb/src/mgmsrv/MgmtSrvr.cpp 2006-04-07 12:00:59 +02:00
+++ 1.92/storage/ndb/src/mgmsrv/MgmtSrvr.cpp 2006-04-07 10:32:09 +02:00
@@ -37,10 +37,10 @@
#include <signaldata/EventReport.hpp>
#include <signaldata/DumpStateOrd.hpp>
#include <signaldata/BackupSignalData.hpp>
-#include <signaldata/GrepImpl.hpp>
#include <signaldata/ManagementServer.hpp>
#include <signaldata/NFCompleteRep.hpp>
#include <signaldata/NodeFailRep.hpp>
+#include <signaldata/AllocNodeId.hpp>
#include <NdbSleep.h>
#include <EventLogger.hpp>
#include <DebuggerNames.hpp>
@@ -485,10 +485,6 @@
case NODE_TYPE_MGM:
nodeTypes[id] = NDB_MGM_NODE_TYPE_MGM;
break;
- case NODE_TYPE_REP:
- nodeTypes[id] = NDB_MGM_NODE_TYPE_REP;
- break;
- case NODE_TYPE_EXT_REP:
default:
break;
}
@@ -570,8 +566,7 @@
DBUG_RETURN(false);
}
}
- theFacade= TransporterFacade::theFacadeInstance
- = new TransporterFacade();
+ theFacade= new TransporterFacade();
if(theFacade == 0) {
DEBUG("MgmtSrvr.cpp: theFacade is NULL.");
@@ -1797,9 +1792,6 @@
break;
case GSN_EVENT_REP:
{
- EventReport *rep = CAST_PTR(EventReport, signal->getDataPtrSend());
- if (rep->getNodeId() == 0)
- rep->setNodeId(refToNode(signal->theSendersBlockRef));
eventReport(signal->getDataPtr());
break;
}
@@ -1840,7 +1832,6 @@
DBUG_VOID_RETURN;
}
}
-
rep->setNodeId(_ownNodeId);
eventReport(theData);
DBUG_VOID_RETURN;
@@ -1913,6 +1904,88 @@
}
}
+int
+MgmtSrvr::alloc_node_id_req(Uint32 free_node_id)
+{
+ SignalSender ss(theFacade);
+ ss.lock(); // lock will be released on exit
+
+ SimpleSignal ssig;
+ AllocNodeIdReq* req = CAST_PTR(AllocNodeIdReq, ssig.getDataPtrSend());
+ ssig.set(ss, TestOrd::TraceAPI, QMGR, GSN_ALLOC_NODEID_REQ,
+ AllocNodeIdReq::SignalLength);
+
+ req->senderRef = ss.getOwnRef();
+ req->senderData = 19;
+ req->nodeId = free_node_id;
+
+ int do_send = 1;
+ NodeId nodeId = 0;
+ while (1)
+ {
+ if (nodeId == 0)
+ {
+ bool next;
+ while((next = getNextNodeId(&nodeId, NDB_MGM_NODE_TYPE_NDB)) == true &&
+ theFacade->get_node_alive(nodeId) == false);
+ if (!next)
+ return NO_CONTACT_WITH_DB_NODES;
+ do_send = 1;
+ }
+ if (do_send)
+ {
+ if (ss.sendSignal(nodeId, &ssig) != SEND_OK) {
+ return SEND_OR_RECEIVE_FAILED;
+ }
+ do_send = 0;
+ }
+
+ SimpleSignal *signal = ss.waitFor();
+
+ int gsn = signal->readSignalNumber();
+ switch (gsn) {
+ case GSN_ALLOC_NODEID_CONF:
+ {
+ const AllocNodeIdConf * const conf =
+ CAST_CONSTPTR(AllocNodeIdConf, signal->getDataPtr());
+ return 0;
+ }
+ case GSN_ALLOC_NODEID_REF:
+ {
+ const AllocNodeIdRef * const ref =
+ CAST_CONSTPTR(AllocNodeIdRef, signal->getDataPtr());
+ if (ref->errorCode == AllocNodeIdRef::NotMaster ||
+ ref->errorCode == AllocNodeIdRef::Busy)
+ {
+ do_send = 1;
+ nodeId = refToNode(ref->masterRef);
+ continue;
+ }
+ return ref->errorCode;
+ }
+ case GSN_NF_COMPLETEREP:
+ {
+ const NFCompleteRep * const rep =
+ CAST_CONSTPTR(NFCompleteRep, signal->getDataPtr());
+#ifdef VM_TRACE
+ ndbout_c("Node %d fail completed", rep->failedNodeId);
+#endif
+ if (rep->failedNodeId == nodeId)
+ nodeId = 0;
+ continue;
+ }
+ case GSN_NODE_FAILREP:{
+ // ignore NF_COMPLETEREP will come
+ continue;
+ }
+ default:
+ report_unknown_signal(signal);
+ return SEND_OR_RECEIVE_FAILED;
+ }
+ }
+ return 0;
+}
+
bool
MgmtSrvr::alloc_node_id(NodeId * nodeId,
enum ndb_mgm_node_type type,
@@ -2037,6 +2110,39 @@
}
NdbMutex_Unlock(m_configMutex);
+ if (id_found && client_addr != 0)
+ {
+ int res = alloc_node_id_req(id_found);
+ unsigned save_id_found = id_found;
+ switch (res)
+ {
+ case 0:
+ // ok continue
+ break;
+ case NO_CONTACT_WITH_DB_NODES:
+ // ok continue
+ break;
+ default:
+ // something wrong
+ id_found = 0;
+ break;
+
+ }
+ if (id_found == 0)
+ {
+ char buf[128];
+ ndb_error_string(res, buf, sizeof(buf));
+ error_string.appfmt("Cluster refused allocation of id %d. Error: %d (%s).",
+ save_id_found, res, buf);
+ g_eventLogger.warning("Cluster refused allocation of id %d. "
+ "Connection from ip %s. "
+ "Returned error string \"%s\"", save_id_found,
+ inet_ntoa(((struct sockaddr_in *)(client_addr))->sin_addr),
+ error_string.c_str());
+ DBUG_RETURN(false);
+ }
+ }
+
if (id_found)
{
*nodeId= id_found;
@@ -2363,17 +2469,6 @@
return ss.sendSignal(nodeId, &ssig) == SEND_OK ? 0 : SEND_OR_RECEIVE_FAILED;
}
-
-/*****************************************************************************
- * Global Replication
- *****************************************************************************/
-
-int
-MgmtSrvr::repCommand(Uint32* repReqId, Uint32 request, bool waitCompleted)
-{
- require(false);
- return 0;
-}
MgmtSrvr::Allocated_resources::Allocated_resources(MgmtSrvr &m)
: m_mgmsrv(m)
--- 1.30.10.1/ndb/src/mgmsrv/MgmtSrvr.hpp 2006-04-07 10:39:41 +02:00
+++ 1.42/storage/ndb/src/mgmsrv/MgmtSrvr.hpp 2006-04-07 10:32:09 +02:00
@@ -341,11 +341,6 @@
int abortBackup(Uint32 backupId);
int performBackup(Uint32* backupId);
- /**
- * Global Replication
- */
- int repCommand(Uint32* repReqId, Uint32 request, bool waitCompleted = false);
-
//**************************************************************************
// Description: Set event report level for a DB process
// Parameters:
@@ -520,7 +515,8 @@
* @return -1 if block not found, otherwise block number
*/
int getBlockNumber(const BaseString &blockName);
-
+
+ int alloc_node_id_req(Uint32 free_node_id);
//**************************************************************************
int _blockNumber;
--- 1.45.15.2/ndb/src/mgmsrv/Services.cpp 2006-04-07 12:01:00 +02:00
+++ 1.61/storage/ndb/src/mgmsrv/Services.cpp 2006-04-07 10:32:09 +02:00
@@ -451,9 +451,9 @@
return;
}
- struct sockaddr addr;
+ struct sockaddr_in addr;
SOCKET_SIZE_TYPE addrlen= sizeof(addr);
- int r = getpeername(m_socket, &addr, &addrlen);
+ int r = getpeername(m_socket, (struct sockaddr*)&addr, &addrlen);
if (r != 0 ) {
m_output->println(cmd);
m_output->println("result: getpeername(%d) failed, err= %d", m_socket, r);
@@ -465,7 +465,7 @@
if(tmp == 0 || !m_allocated_resources->is_reserved(tmp)){
BaseString error_string;
if (!m_mgmsrv.alloc_node_id(&tmp, (enum ndb_mgm_node_type)nodetype,
- &addr, &addrlen, error_string)){
+ (struct sockaddr*)&addr, &addrlen, error_string)){
const char *alias;
const char *str;
alias= ndb_mgm_get_node_type_alias_string((enum ndb_mgm_node_type)
| Thread |
|---|
| • bk commit into 5.1 tree (jonas:1.2299) | jonas | 7 Apr |