#At file:///home/jonas/src/telco-7.0/ based on revid:jonas@stripped
4194 jonas oreland 2011-02-15
ndb - add support for 2-pass initial node restart copy
modified:
mysql-test/include/default_ndbd.cnf
storage/ndb/include/kernel/signaldata/CopyFrag.hpp
storage/ndb/include/kernel/signaldata/StartFragReq.hpp
storage/ndb/include/mgmapi/mgmapi_config_parameters.h
storage/ndb/include/ndb_version.h.in
storage/ndb/src/kernel/blocks/dbdih/Dbdih.hpp
storage/ndb/src/kernel/blocks/dbdih/DbdihInit.cpp
storage/ndb/src/kernel/blocks/dbdih/DbdihMain.cpp
storage/ndb/src/kernel/blocks/dblqh/Dblqh.hpp
storage/ndb/src/kernel/blocks/dblqh/DblqhInit.cpp
storage/ndb/src/kernel/blocks/dblqh/DblqhMain.cpp
storage/ndb/src/mgmsrv/ConfigInfo.cpp
storage/ndb/test/run-test/conf-dl145a.cnf
storage/ndb/test/run-test/conf-ndb07.cnf
=== modified file 'mysql-test/include/default_ndbd.cnf'
--- a/mysql-test/include/default_ndbd.cnf 2010-01-22 14:19:56 +0000
+++ b/mysql-test/include/default_ndbd.cnf 2011-02-15 11:41:27 +0000
@@ -14,6 +14,7 @@ TimeBetweenEpochs= 100
NoOfFragmentLogFiles= 4
FragmentLogFileSize= 12M
DiskPageBufferMemory= 4M
+TwoPassInitialNodeRestartCopy=1
# O_DIRECT has issues on 2.4 whach have not been handled, Bug #29612
#ODirect= 1
=== modified file 'storage/ndb/include/kernel/signaldata/CopyFrag.hpp'
--- a/storage/ndb/include/kernel/signaldata/CopyFrag.hpp 2011-02-01 23:27:25 +0000
+++ b/storage/ndb/include/kernel/signaldata/CopyFrag.hpp 2011-02-15 11:41:27 +0000
@@ -32,11 +32,23 @@ class CopyFragReq {
*/
friend class Dblqh;
public:
- STATIC_CONST( SignalLength = 10 );
+ STATIC_CONST( SignalLength = 11 );
private:
- Uint32 userPtr;
- Uint32 userRef;
+
+ enum
+ {
+ CFR_TRANSACTIONAL = 1, // Copy rows >= gci in transactional fashion
+ CFR_NON_TRANSACTIONAL = 2 // Copy rows <= gci in non transactional fashion
+ };
+ union {
+ Uint32 userPtr;
+ Uint32 senderData;
+ };
+ union {
+ Uint32 userRef;
+ Uint32 senderRef;
+ };
Uint32 tableId;
Uint32 fragId;
Uint32 nodeId;
@@ -46,6 +58,7 @@ private:
Uint32 nodeCount;
Uint32 nodeList[1];
//Uint32 maxPage; is stored in nodeList[nodeCount]
+ //Uint32 requestInfo is stored after maxPage
};
class CopyFragConf {
@@ -62,7 +75,10 @@ public:
STATIC_CONST( SignalLength = 7 );
private:
- Uint32 userPtr;
+ union {
+ Uint32 userPtr;
+ Uint32 senderData;
+ };
Uint32 sendingNodeId;
Uint32 startingNodeId;
Uint32 tableId;
=== modified file 'storage/ndb/include/kernel/signaldata/StartFragReq.hpp'
--- a/storage/ndb/include/kernel/signaldata/StartFragReq.hpp 2011-02-01 23:27:25 +0000
+++ b/storage/ndb/include/kernel/signaldata/StartFragReq.hpp 2011-02-15 11:41:27 +0000
@@ -32,10 +32,16 @@ class StartFragReq {
*/
friend class Dblqh;
public:
- STATIC_CONST( SignalLength = 19 );
+ STATIC_CONST( SignalLength = 20 );
friend bool printSTART_FRAG_REQ(FILE *, const Uint32 *, Uint32, Uint16);
+ enum
+ {
+ SFR_RESTORE_LCP = 1,
+ SFR_COPY_FRAG = 2
+ };
+
Uint32 userPtr;
Uint32 userRef;
Uint32 lcpNo;
@@ -46,5 +52,6 @@ public:
Uint32 lqhLogNode[4];
Uint32 startGci[4];
Uint32 lastGci[4];
+ Uint32 requestInfo;
};
#endif
=== modified file 'storage/ndb/include/mgmapi/mgmapi_config_parameters.h'
--- a/storage/ndb/include/mgmapi/mgmapi_config_parameters.h 2011-02-01 23:27:25 +0000
+++ b/storage/ndb/include/mgmapi/mgmapi_config_parameters.h 2011-02-15 11:41:27 +0000
@@ -179,6 +179,8 @@
#define CFG_DB_NUMA 614
#define CFG_DB_LATE_ALLOC 615
+#define CFG_DB_2PASS_INR 616
+
#define CFG_NODE_ARBIT_RANK 200
#define CFG_NODE_ARBIT_DELAY 201
#define CFG_RESERVED_SEND_BUFFER_MEMORY 202
=== modified file 'storage/ndb/include/ndb_version.h.in'
--- a/storage/ndb/include/ndb_version.h.in 2011-02-03 14:20:36 +0000
+++ b/storage/ndb/include/ndb_version.h.in 2011-02-15 11:41:27 +0000
@@ -574,4 +574,24 @@ ndbd_sync_req_support(Uint32 x)
return x >= NDBD_SYNC_REQ_SUPPORT_71;
}
+/**
+ * Does not support CopyFragReq::CFR_NON_TRANSACTIONAL
+ */
+#define NDBD_NON_TRANS_COPY_FRAG_REQ_70 NDB_MAKE_VERSION(7,0,22)
+#define NDBD_NON_TRANS_COPY_FRAG_REQ_71 NDB_MAKE_VERSION(7,1,11)
+
+static
+inline
+int
+ndbd_non_trans_copy_frag_req(Uint32 x)
+{
+ const Uint32 major = (x >> 16) & 0xFF;
+ const Uint32 minor = (x >> 8) & 0xFF;
+
+ if (major == 7 && minor == 0)
+ return x >= NDBD_NON_TRANS_COPY_FRAG_REQ_70;
+
+ return x >= NDBD_NON_TRANS_COPY_FRAG_REQ_71;
+}
+
#endif
=== modified file 'storage/ndb/src/kernel/blocks/dbdih/Dbdih.hpp'
--- a/storage/ndb/src/kernel/blocks/dbdih/Dbdih.hpp 2011-02-01 23:27:25 +0000
+++ b/storage/ndb/src/kernel/blocks/dbdih/Dbdih.hpp 2011-02-15 11:41:27 +0000
@@ -1857,6 +1857,8 @@ private:
return instanceKey;
}
Uint32 dihGetInstanceKey(Uint32 tabId, Uint32 fragId);
+
+ bool c_2pass_inr;
};
#if (DIH_CDATA_SIZE < _SYSFILE_SIZE32)
=== modified file 'storage/ndb/src/kernel/blocks/dbdih/DbdihInit.cpp'
--- a/storage/ndb/src/kernel/blocks/dbdih/DbdihInit.cpp 2011-02-01 23:27:25 +0000
+++ b/storage/ndb/src/kernel/blocks/dbdih/DbdihInit.cpp 2011-02-15 11:41:27 +0000
@@ -70,6 +70,7 @@ void Dbdih::initData()
cntrlblockref = RNIL;
c_set_initial_start_flag = FALSE;
c_sr_wait_to = false;
+ c_2pass_inr = false;
}//Dbdih::initData()
void Dbdih::initRecords()
=== modified file 'storage/ndb/src/kernel/blocks/dbdih/DbdihMain.cpp'
--- a/storage/ndb/src/kernel/blocks/dbdih/DbdihMain.cpp 2011-02-03 14:20:36 +0000
+++ b/storage/ndb/src/kernel/blocks/dbdih/DbdihMain.cpp 2011-02-15 11:41:27 +0000
@@ -1023,7 +1023,6 @@ void Dbdih::execFSCLOSEREF(Signal* signa
sprintf(msg, "File system close failed during FileRecord status %d", (Uint32)status);
fsRefError(signal,__LINE__,msg);
}
-
return;
}//Dbdih::execFSCLOSEREF()
@@ -1325,6 +1324,13 @@ void Dbdih::execREAD_CONFIG_REQ(Signal*
initRecords();
initialiseRecordsLab(signal, 0, ref, senderData);
+ {
+ Uint32 val = 0;
+ ndb_mgm_get_int_parameter(p, CFG_DB_2PASS_INR,
+ &val);
+ c_2pass_inr = val ? true : false;
+ }
+
/**
* Set API assigned nodegroup(s)
*/
@@ -1368,7 +1374,6 @@ void Dbdih::execREAD_CONFIG_REQ(Signal*
}
}
}
-
return;
}//Dbdih::execSIZEALT_REP()
@@ -1980,7 +1985,33 @@ void Dbdih::execREAD_NODESCONF(Signal* s
}//if
}//for
nodeArray[index] = RNIL; // terminate
-
+
+ if (c_2pass_inr)
+ {
+ jam();
+ printf("Checking 2-pass initial node restart: ");
+ for (i = 0; i<index; i++)
+ {
+ if (!ndbd_non_trans_copy_frag_req(getNodeInfo(nodeArray[i]).m_version))
+ {
+ jam();
+ c_2pass_inr = false;
+ printf("not ok (node %u) => disabled\n", nodeArray[i]);
+ break;
+ }
+ }
+ if (c_2pass_inr)
+ printf("ok\n");
+
+ /**
+ * Note: In theory it would be ok for just nodes that we plan to copy from
+ * supported this...but in e.g a 3/4-replica scenario,
+ * if one of the nodes does, and the other doesnt, we don't
+ * have enought infrastructure to easily check this...
+ * therefor we require all nodes to support it.
+ */
+ }
+
if(cstarttype == NodeState::ST_SYSTEM_RESTART ||
cstarttype == NodeState::ST_NODE_RESTART)
{
@@ -3646,32 +3677,40 @@ Dbdih::nr_start_fragment(Signal* signal,
Uint32 gci = 0;
Uint32 restorableGCI = takeOverPtr.p->restorableGci;
-
+
+#if defined VM_TRACE || defined ERROR_INSERT
ndbout_c("tab: %d frag: %d replicaP->nextLcp: %d",
takeOverPtr.p->toCurrentTabref,
takeOverPtr.p->toCurrentFragid,
replicaPtr.p->nextLcp);
-
+#endif
+
Int32 j = replicaPtr.p->noCrashedReplicas - 1;
Uint32 idx = prevLcpNo(replicaPtr.p->nextLcp);
for(i = 0; i<MAX_LCP_USED; i++, idx = prevLcpNo(idx))
{
+#if defined VM_TRACE || defined ERROR_INSERT
printf("scanning idx: %d lcpId: %d crashed replicas: %u %s",
idx, replicaPtr.p->lcpId[idx],
replicaPtr.p->noCrashedReplicas,
replicaPtr.p->lcpStatus[idx] == ZVALID ? "VALID" : "NOT VALID");
+#endif
if (replicaPtr.p->lcpStatus[idx] == ZVALID)
{
Uint32 startGci = replicaPtr.p->maxGciCompleted[idx] + 1;
Uint32 stopGci = replicaPtr.p->maxGciStarted[idx];
+#if defined VM_TRACE || defined ERROR_INSERT
ndbout_c(" maxGciCompleted: %u maxGciStarted: %u", startGci - 1, stopGci);
+#endif
for (; j>= 0; j--)
{
+#if defined VM_TRACE || defined ERROR_INSERT
ndbout_c("crashed replica: %d(%d) replica(createGci: %u lastGci: %d )",
j,
replicaPtr.p->noCrashedReplicas,
replicaPtr.p->createGci[j],
replicaPtr.p->replicaLastGci[j]);
+#endif
if (replicaPtr.p->createGci[j] <= startGci &&
replicaPtr.p->replicaLastGci[j] >= stopGci)
{
@@ -3684,23 +3723,29 @@ Dbdih::nr_start_fragment(Signal* signal,
}
else
{
+#if defined VM_TRACE || defined ERROR_INSERT
printf("\n");
+#endif
}
}
idx = 2; // backward compat code
+#if defined VM_TRACE || defined ERROR_INSERT
ndbout_c("- scanning idx: %d lcpId: %d", idx, replicaPtr.p->lcpId[idx]);
+#endif
if (replicaPtr.p->lcpStatus[idx] == ZVALID)
{
Uint32 startGci = replicaPtr.p->maxGciCompleted[idx] + 1;
Uint32 stopGci = replicaPtr.p->maxGciStarted[idx];
for (;j >= 0; j--)
{
+#if defined VM_TRACE || defined ERROR_INSERT
ndbout_c("crashed replica: %d(%d) replica(createGci: %u lastGci: %d )",
j,
replicaPtr.p->noCrashedReplicas,
replicaPtr.p->createGci[j],
replicaPtr.p->replicaLastGci[j]);
+#endif
if (replicaPtr.p->createGci[j] <= startGci &&
replicaPtr.p->replicaLastGci[j] >= stopGci)
{
@@ -3714,17 +3759,17 @@ Dbdih::nr_start_fragment(Signal* signal,
done:
+ StartFragReq *req = (StartFragReq *)signal->getDataPtrSend();
+ req->requestInfo = StartFragReq::SFR_RESTORE_LCP;
if (maxLcpIndex == ~ (Uint32) 0)
{
+ /**
+ * we didn't find a local LCP that we can restore
+ */
jam();
ndbassert(gci == 0);
replicaPtr.p->m_restorable_gci = gci;
- ndbout_c("Didnt find any LCP for node: %d tab: %d frag: %d",
- takeOverPtr.p->toStartingNode,
- takeOverPtr.p->toCurrentTabref,
- takeOverPtr.p->toCurrentFragid);
- StartFragReq *req = (StartFragReq *)signal->getDataPtrSend();
req->userPtr = 0;
req->userRef = reference();
req->lcpNo = ZNIL;
@@ -3733,6 +3778,60 @@ done:
req->fragId = takeOverPtr.p->toCurrentFragid;
req->noOfLogNodes = 0;
+ if (c_2pass_inr)
+ {
+ /**
+ * Check if we can make 2-phase copy
+ * 1) non-transaction, (after we rebuild indexes)
+ * 2) transaction (maintaining indexes during rebuild)
+ * where the transactional copies efterything >= startGci
+ *
+ * NOTE: c_2pass_inr is only set if all nodes in cluster currently
+ * supports this
+ */
+
+ if (takeOverPtr.p->startGci == 0)
+ {
+ jam();
+ /**
+ * Set a startGci to currently lastCompletedGCI of master
+ * any value will do...as long as subsequent transactinal copy
+ * will be using it (scanning >= this value)
+ */
+ takeOverPtr.p->startGci = SYSFILE->lastCompletedGCI[cmasterNodeId];
+ }
+
+ TabRecordPtr tabPtr;
+ tabPtr.i = takeOverPtr.p->toCurrentTabref;
+ ptrCheckGuard(tabPtr, ctabFileSize, tabRecord);
+
+ FragmentstorePtr fragPtr;
+ getFragstore(tabPtr.p, takeOverPtr.p->toCurrentFragid, fragPtr);
+ Uint32 nodes[MAX_REPLICAS];
+ extractNodeInfo(fragPtr.p, nodes);
+
+ req->lqhLogNode[0] = nodes[0]; // Source
+ req->requestInfo = StartFragReq::SFR_COPY_FRAG;
+ replicaPtr.p->m_restorable_gci = takeOverPtr.p->startGci;
+ }
+
+ if (req->requestInfo == StartFragReq::SFR_RESTORE_LCP)
+ {
+ ndbout_c("node: %d tab: %d frag: %d no lcp to restore",
+ takeOverPtr.p->toStartingNode,
+ takeOverPtr.p->toCurrentTabref,
+ takeOverPtr.p->toCurrentFragid);
+ }
+ else
+ {
+ ndbout_c("node: %d tab: %d frag: %d copying data from %u (gci: %u)",
+ takeOverPtr.p->toStartingNode,
+ takeOverPtr.p->toCurrentTabref,
+ takeOverPtr.p->toCurrentFragid,
+ req->lqhLogNode[0],
+ req->lcpId);
+ }
+
BlockReference ref = numberToRef(DBLQH, takeOverPtr.p->toStartingNode);
sendSignal(ref, GSN_START_FRAGREQ, signal,
StartFragReq::SignalLength, JBB);
@@ -3748,17 +3847,19 @@ done:
FragmentstorePtr fragPtr;
getFragstore(tabPtr.p, takeOverPtr.p->toCurrentFragid, fragPtr);
-
dump_replica_info(fragPtr.p);
}
ndbassert(gci == restorableGCI);
replicaPtr.p->m_restorable_gci = gci;
Uint32 startGci = replicaPtr.p->maxGciCompleted[maxLcpIndex] + 1;
- ndbout_c("Found LCP: %d(%d) maxGciStarted: %d maxGciCompleted: %d restorable: %d(%d) newestRestorableGCI: %d",
+ ndbout_c("node: %d tab: %d frag: %d restore lcp: %u(idx: %u) maxGciStarted: %u maxGciCompleted: %u (restorable: %u(%u) newestRestorableGCI: %u)",
+ takeOverPtr.p->toStartingNode,
+ takeOverPtr.p->toCurrentTabref,
+ takeOverPtr.p->toCurrentFragid,
maxLcpId,
- maxLcpIndex,
+ maxLcpIndex,
replicaPtr.p->maxGciStarted[maxLcpIndex],
- replicaPtr.p->maxGciCompleted[maxLcpIndex],
+ replicaPtr.p->maxGciCompleted[maxLcpIndex],
restorableGCI,
SYSFILE->lastCompletedGCI[takeOverPtr.p->toStartingNode],
SYSFILE->newestRestorableGCI);
@@ -4238,7 +4339,8 @@ Dbdih::toStartCopyFrag(Signal* signal, T
extractNodeInfo(fragPtr.p,
copyFragReq->nodeList);
copyFragReq->nodeList[len] = takeOverPtr.p->maxPage;
- sendSignal(ref, GSN_COPY_FRAGREQ, signal,
+ copyFragReq->nodeList[len+1] = CopyFragReq::CFR_TRANSACTIONAL;
+ sendSignal(ref, GSN_COPY_FRAGREQ, signal,
CopyFragReq::SignalLength + len, JBB);
}//Dbdih::toStartCopy()
=== modified file 'storage/ndb/src/kernel/blocks/dblqh/Dblqh.hpp'
--- a/storage/ndb/src/kernel/blocks/dblqh/Dblqh.hpp 2011-02-01 23:27:25 +0000
+++ b/storage/ndb/src/kernel/blocks/dblqh/Dblqh.hpp 2011-02-15 11:41:27 +0000
@@ -2246,6 +2246,8 @@ private:
void execSTORED_PROCCONF(Signal* signal);
void execSTORED_PROCREF(Signal* signal);
void execCOPY_FRAGREQ(Signal* signal);
+ void execCOPY_FRAGREF(Signal* signal);
+ void execCOPY_FRAGCONF(Signal* signal);
void execPREPARE_COPY_FRAG_REQ(Signal* signal);
void execUPDATE_FRAG_DIST_KEY_ORD(Signal*);
void execCOPY_ACTIVEREQ(Signal* signal);
@@ -2643,6 +2645,7 @@ private:
void restartOperationsAfterStopLab(Signal* signal);
void startphase1Lab(Signal* signal, Uint32 config, Uint32 nodeId);
void tupkeyConfLab(Signal* signal);
+ void copyTupkeyRefLab(Signal* signal);
void copyTupkeyConfLab(Signal* signal);
void scanTupkeyConfLab(Signal* signal);
void scanTupkeyRefLab(Signal* signal);
=== modified file 'storage/ndb/src/kernel/blocks/dblqh/DblqhInit.cpp'
--- a/storage/ndb/src/kernel/blocks/dblqh/DblqhInit.cpp 2011-02-01 23:27:25 +0000
+++ b/storage/ndb/src/kernel/blocks/dblqh/DblqhInit.cpp 2011-02-15 11:41:27 +0000
@@ -348,6 +348,8 @@ Dblqh::Dblqh(Block_context& ctx, Uint32
addRecSignal(GSN_STORED_PROCCONF, &Dblqh::execSTORED_PROCCONF);
addRecSignal(GSN_STORED_PROCREF, &Dblqh::execSTORED_PROCREF);
addRecSignal(GSN_COPY_FRAGREQ, &Dblqh::execCOPY_FRAGREQ);
+ addRecSignal(GSN_COPY_FRAGREF, &Dblqh::execCOPY_FRAGREF);
+ addRecSignal(GSN_COPY_FRAGCONF, &Dblqh::execCOPY_FRAGCONF);
addRecSignal(GSN_COPY_ACTIVEREQ, &Dblqh::execCOPY_ACTIVEREQ);
addRecSignal(GSN_COPY_STATEREQ, &Dblqh::execCOPY_STATEREQ);
addRecSignal(GSN_LQH_TRANSREQ, &Dblqh::execLQH_TRANSREQ);
=== modified file 'storage/ndb/src/kernel/blocks/dblqh/DblqhMain.cpp'
--- a/storage/ndb/src/kernel/blocks/dblqh/DblqhMain.cpp 2011-01-31 17:37:07 +0000
+++ b/storage/ndb/src/kernel/blocks/dblqh/DblqhMain.cpp 2011-02-15 11:41:27 +0000
@@ -3520,7 +3520,7 @@ void Dblqh::execTUPKEYREF(Signal* signal
abortErrorLab(signal);
break;
case TcConnectionrec::COPY_TUPKEY:
- ndbrequire(false);
+ copyTupkeyRefLab(signal);
break;
case TcConnectionrec::SCAN_TUPKEY:
jam();
@@ -11806,8 +11806,6 @@ void Dblqh::execCOPY_FRAGREQ(Signal* sig
ndbrequire(cfirstfreeTcConrec != RNIL);
ndbrequire(fragptr.p->m_scanNumberMask.get(NR_ScanNo));
- Uint32 key = fragptr.p->fragDistributionKey = copyFragReq->distributionKey;
-
Uint32 checkversion = NDB_VERSION >= MAKE_VERSION(5,1,0) ?
NDBD_UPDATE_FRAG_DIST_KEY_51 : NDBD_UPDATE_FRAG_DIST_KEY_50;
@@ -11821,12 +11819,29 @@ void Dblqh::execCOPY_FRAGREQ(Signal* sig
}
Uint32 maxPage = copyFragReq->nodeList[nodeCount];
Uint32 version = getNodeInfo(refToNode(userRef)).m_version;
+ Uint32 requestInfo = copyFragReq->nodeList[nodeCount + 1];
if (ndb_check_prep_copy_frag_version(version) < 2)
{
jam();
maxPage = RNIL;
}
-
+
+ if (signal->getLength() < CopyFragReq::SignalLength + nodeCount)
+ {
+ jam();
+ requestInfo = CopyFragReq::CFR_TRANSACTIONAL;
+ }
+
+ if (requestInfo == CopyFragReq::CFR_NON_TRANSACTIONAL)
+ {
+ jam();
+ }
+ else
+ {
+ fragptr.p->fragDistributionKey = copyFragReq->distributionKey;
+ }
+ Uint32 key = fragptr.p->fragDistributionKey;
+
if (DictTabInfo::isOrderedIndex(tabptr.p->tableType)) {
jam();
/**
@@ -11842,7 +11857,7 @@ void Dblqh::execCOPY_FRAGREQ(Signal* sig
CopyFragConf::SignalLength, JBB);
return;
}//if
-
+
LocalDLList<ScanRecord> scans(c_scanRecordPool, fragptr.p->m_activeScans);
ndbrequire(m_reserved_scans.first(scanptr));
m_reserved_scans.remove(scanptr);
@@ -11882,6 +11897,7 @@ void Dblqh::execCOPY_FRAGREQ(Signal* sig
scanptr.p->scanLockHold = ZFALSE;
scanptr.p->m_curr_batch_size_rows = 0;
scanptr.p->m_curr_batch_size_bytes= 0;
+ scanptr.p->readCommitted = 0;
initScanTc(0,
0,
@@ -11905,10 +11921,30 @@ void Dblqh::execCOPY_FRAGREQ(Signal* sig
req->tableId = tabptr.i;
req->fragmentNo = fragId;
req->requestInfo = 0;
- AccScanReq::setLockMode(req->requestInfo, 0);
- AccScanReq::setReadCommittedFlag(req->requestInfo, 0);
- AccScanReq::setNRScanFlag(req->requestInfo, 1);
- AccScanReq::setNoDiskScanFlag(req->requestInfo, 1);
+
+ if (requestInfo == CopyFragReq::CFR_TRANSACTIONAL)
+ {
+ jam();
+ /**
+ * An node-recovery scan, is shared lock
+ * and may not perform disk-scan (as it then can miss uncomitted inserts)
+ */
+ AccScanReq::setLockMode(req->requestInfo, 0);
+ AccScanReq::setReadCommittedFlag(req->requestInfo, 0);
+ AccScanReq::setNRScanFlag(req->requestInfo, 1);
+ AccScanReq::setNoDiskScanFlag(req->requestInfo, 1);
+ }
+ else
+ {
+ jam();
+ /**
+ * The non-transaction scan is really only a "normal" tup scan
+ * committed read, and don't disable disk-scan
+ */
+ AccScanReq::setLockMode(req->requestInfo, 0);
+ AccScanReq::setReadCommittedFlag(req->requestInfo, 1);
+ scanptr.p->readCommitted = 1;
+ }
req->transId1 = tcConnectptr.p->transid[0];
req->transId2 = tcConnectptr.p->transid[1];
@@ -12235,6 +12271,51 @@ void Dblqh::execTRANSID_AI(Signal* signa
/*--------------------------------------------------------------------------*/
/* PRECONDITION: TRANSACTION_STATE = COPY_TUPKEY */
/*--------------------------------------------------------------------------*/
+void Dblqh::copyTupkeyRefLab(Signal* signal)
+{
+ //const TupKeyRef * tupKeyRef = (TupKeyRef *)signal->getDataPtr();
+
+ scanptr.i = tcConnectptr.p->tcScanRec;
+ c_scanRecordPool.getPtr(scanptr);
+ ScanRecord* scanP = scanptr.p;
+
+ if (scanP->readCommitted == 0)
+ {
+ jam();
+ ndbrequire(false); // Should not be possibe...we read with lock
+ }
+ else
+ {
+ jam();
+ /**
+ * Any readCommitted scan, can get 626 if it finds a candidate record
+ * that is not visible to the scan (i.e uncommitted inserts)
+ * if scanning with locks (shared/exclusive) this is not visible
+ * to LQH as lock is taken earlier
+ */
+ ndbrequire(terrorCode == 626);
+ }
+
+ ndbrequire(scanptr.p->scanState == ScanRecord::WAIT_TUPKEY_COPY);
+ if (tcConnectptr.p->errorCode != 0)
+ {
+ jam();
+ closeCopyLab(signal);
+ return;
+ }
+
+ if (scanptr.p->scanCompletedStatus == ZTRUE)
+ {
+ jam();
+ closeCopyLab(signal);
+ return;
+ }
+
+ ndbrequire(tcConnectptr.p->copyCountWords < cmaxWordsAtNodeRec);
+ scanptr.p->scanState = ScanRecord::WAIT_LQHKEY_COPY;
+ nextRecordCopy(signal);
+}
+
void Dblqh::copyTupkeyConfLab(Signal* signal)
{
const TupKeyConf * const tupKeyConf = (TupKeyConf *)signal->getDataPtr();
@@ -12245,10 +12326,14 @@ void Dblqh::copyTupkeyConfLab(Signal* si
c_scanRecordPool.getPtr(scanptr);
ScanRecord* scanP = scanptr.p;
- Uint32 accOpPtr= get_acc_ptr_from_scan_record(scanP, 0, false);
- ndbassert(accOpPtr != (Uint32)-1);
- c_acc->execACCKEY_ORD(signal, accOpPtr);
-
+ if (scanP->readCommitted == 0)
+ {
+ jam();
+ Uint32 accOpPtr= get_acc_ptr_from_scan_record(scanP, 0, false);
+ ndbassert(accOpPtr != (Uint32)-1);
+ c_acc->execACCKEY_ORD(signal, accOpPtr);
+ }
+
if (tcConnectptr.p->errorCode != 0) {
jam();
closeCopyLab(signal);
@@ -16242,6 +16327,12 @@ void Dblqh::execSTART_FRAGREQ(Signal* si
Uint32 lcpNo = startFragReq->lcpNo;
Uint32 noOfLogNodes = startFragReq->noOfLogNodes;
Uint32 lcpId = startFragReq->lcpId;
+ Uint32 requestInfo = startFragReq->requestInfo;
+ if (signal->getLength() < StartFragReq::SignalLength)
+ {
+ jam();
+ requestInfo = StartFragReq::SFR_RESTORE_LCP;
+ }
bool doprint = false;
#ifdef ERROR_INSERT
@@ -16275,6 +16366,16 @@ void Dblqh::execSTART_FRAGREQ(Signal* si
fragptr.p->logFlag = Fragrecord::STATE_FALSE;
fragptr.p->srStatus = Fragrecord::SS_IDLE;
+ if (requestInfo == StartFragReq::SFR_COPY_FRAG)
+ {
+ ndbrequire(lcpNo == ZNIL);
+ Uint32 n = fragptr.p->srLqhLognode[0] = startFragReq->lqhLogNode[0]; // src
+ ndbrequire(ndbd_non_trans_copy_frag_req(getNodeInfo(n).m_version));
+
+ // Magic no, meaning to COPY_FRAGREQ instead of read from disk
+ fragptr.p->srChkpnr = Z8NIL;
+ }
+
if (noOfLogNodes > 0)
{
jam();
@@ -16300,7 +16401,11 @@ void Dblqh::execSTART_FRAGREQ(Signal* si
}
}//if
- if (lcpNo == ZNIL)
+ if (requestInfo == StartFragReq::SFR_COPY_FRAG)
+ {
+ jam();
+ }
+ else if (lcpNo == ZNIL)
{
jam();
/**
@@ -16326,15 +16431,19 @@ void Dblqh::execSTART_FRAGREQ(Signal* si
jam();
c_tup->disk_restart_lcp_id(tabptr.i, fragId, lcpId);
jamEntry();
- }
- if (ERROR_INSERTED(5055))
- {
- ndbrequire(c_lcpId == 0 || lcpId == 0 || c_lcpId == lcpId);
+ if (ERROR_INSERTED(5055))
+ {
+ ndbrequire(c_lcpId == 0 || lcpId == 0 || c_lcpId == lcpId);
+ }
+
+ /**
+ * Keep track of minimal lcp-id
+ */
+ c_lcpId = (c_lcpId == 0 ? lcpId : c_lcpId);
+ c_lcpId = (c_lcpId < lcpId ? c_lcpId : lcpId);
}
- c_lcpId = (c_lcpId == 0 ? lcpId : c_lcpId);
- c_lcpId = (c_lcpId < lcpId ? c_lcpId : lcpId);
c_lcp_waiting_fragments.add(fragptr);
if(c_lcp_restoring_fragments.isEmpty())
send_restore_lcp(signal);
@@ -16346,18 +16455,85 @@ Dblqh::send_restore_lcp(Signal * signal)
c_lcp_waiting_fragments.first(fragptr);
c_lcp_waiting_fragments.remove(fragptr);
c_lcp_restoring_fragments.add(fragptr);
-
- RestoreLcpReq* req= (RestoreLcpReq*)signal->getDataPtrSend();
- req->senderData = fragptr.i;
- req->senderRef = reference();
- req->tableId = fragptr.p->tabRef;
- req->fragmentId = fragptr.p->fragId;
- req->lcpNo = fragptr.p->srChkpnr;
- req->lcpId = fragptr.p->lcpId[fragptr.p->srChkpnr];
-
- BlockReference restoreRef = calcInstanceBlockRef(RESTORE);
- sendSignal(restoreRef, GSN_RESTORE_LCP_REQ, signal,
- RestoreLcpReq::SignalLength, JBB);
+
+ if (fragptr.p->srChkpnr != Z8NIL)
+ {
+ jam();
+ RestoreLcpReq* req= (RestoreLcpReq*)signal->getDataPtrSend();
+ req->senderData = fragptr.i;
+ req->senderRef = reference();
+ req->tableId = fragptr.p->tabRef;
+ req->fragmentId = fragptr.p->fragId;
+ req->lcpNo = fragptr.p->srChkpnr;
+ req->lcpId = fragptr.p->lcpId[fragptr.p->srChkpnr];
+ BlockReference restoreRef = calcInstanceBlockRef(RESTORE);
+ sendSignal(restoreRef, GSN_RESTORE_LCP_REQ, signal,
+ RestoreLcpReq::SignalLength, JBB);
+ }
+ else
+ {
+ jam();
+
+ tabptr.i = fragptr.p->tabRef;
+ ptrCheckGuard(tabptr, ctabrecFileSize, tablerec);
+
+ fragptr.p->fragStatus = Fragrecord::ACTIVE_CREATION;
+ CopyFragReq * req = CAST_PTR(CopyFragReq, signal->getDataPtrSend());
+ req->senderData = fragptr.i;
+ req->senderRef = reference();
+ req->tableId = fragptr.p->tabRef;
+ req->fragId = fragptr.p->fragId;
+ req->nodeId = getOwnNodeId();
+ req->schemaVersion = tabptr.p->schemaVersion;
+ req->distributionKey = 0;
+ req->gci = fragptr.p->lcpId[0];
+ req->nodeCount = 0;
+ req->nodeList[1] = CopyFragReq::CFR_NON_TRANSACTIONAL;
+ Uint32 instanceKey = fragptr.p->lqhInstanceKey;
+ BlockReference ref = numberToRef(DBLQH, instanceKey,
+ fragptr.p->srLqhLognode[0]);
+
+ sendSignal(ref, GSN_COPY_FRAGREQ, signal,
+ CopyFragReq::SignalLength, JBB);
+
+ }
+}
+
+void
+Dblqh::execCOPY_FRAGREF(Signal* signal)
+{
+ jamEntry();
+ ndbrequire(false);
+}
+
+void
+Dblqh::execCOPY_FRAGCONF(Signal* signal)
+{
+ jamEntry();
+ {
+ const CopyFragConf* conf = CAST_CONSTPTR(CopyFragConf,
+ signal->getDataPtr());
+ c_fragment_pool.getPtr(fragptr, conf->senderData);
+ fragptr.p->fragStatus = Fragrecord::CRASH_RECOVERING;
+
+ Uint32 rows_lo = conf->rows_lo;
+ Uint32 bytes_lo = conf->bytes_lo;
+ signal->theData[0] = NDB_LE_NR_CopyFragDone;
+ signal->theData[1] = getOwnNodeId();
+ signal->theData[2] = fragptr.p->tabRef;
+ signal->theData[3] = fragptr.p->fragId;
+ signal->theData[4] = rows_lo;
+ signal->theData[5] = 0;
+ signal->theData[6] = bytes_lo;
+ signal->theData[7] = 0;
+ sendSignal(CMVMI_REF, GSN_EVENT_REP, signal, 8, JBB);
+ }
+
+ {
+ RestoreLcpConf* conf= (RestoreLcpConf*)signal->getDataPtr();
+ conf->senderData = fragptr.i;
+ execRESTORE_LCP_CONF(signal);
+ }
}
void Dblqh::startFragRefLab(Signal* signal)
=== modified file 'storage/ndb/src/mgmsrv/ConfigInfo.cpp'
--- a/storage/ndb/src/mgmsrv/ConfigInfo.cpp 2011-02-01 23:27:25 +0000
+++ b/storage/ndb/src/mgmsrv/ConfigInfo.cpp 2011-02-15 11:41:27 +0000
@@ -1911,6 +1911,21 @@ const ConfigInfo::ParamInfo ConfigInfo::
"1" /* Max */
},
+
+ {
+ CFG_DB_2PASS_INR,
+ "TwoPassInitialNodeRestartCopy",
+ DB_TOKEN,
+ "Copy data in 2 passes for initial node restart, "
+ "this enables multi-threaded-ordered index build for initial node restart",
+ ConfigInfo::CI_USED,
+ false,
+ ConfigInfo::CI_BOOL,
+ "false",
+ "false", /* Min */
+ "true" /* Max */
+ },
+
/***************************************************************************
* API
***************************************************************************/
=== modified file 'storage/ndb/test/run-test/conf-dl145a.cnf'
--- a/storage/ndb/test/run-test/conf-dl145a.cnf 2009-02-17 09:26:44 +0000
+++ b/storage/ndb/test/run-test/conf-dl145a.cnf 2011-02-15 11:41:27 +0000
@@ -31,3 +31,4 @@ ODirect=1
SharedGlobalMemory=256M
InitialLogfileGroup=undo_buffer_size=64M;undofile01.dat:256M;undofile02.dat:128M
InitialTablespace=datafile01.dat:128M;datafile02.dat:64M
+TwoPassInitialNodeRestartCopy=1
=== modified file 'storage/ndb/test/run-test/conf-ndb07.cnf'
--- a/storage/ndb/test/run-test/conf-ndb07.cnf 2009-09-21 08:44:14 +0000
+++ b/storage/ndb/test/run-test/conf-ndb07.cnf 2011-02-15 11:41:27 +0000
@@ -39,3 +39,4 @@ FileSystemPathDataFiles=/data1/autotest
FileSystemPathUndoFiles=/data2/autotest
InitialLogfileGroup=undo_buffer_size=64M;undofile01.dat:256M;undofile02.dat:256M
InitialTablespace=datafile01.dat:256M;datafile02.dat:256M
+TwoPassInitialNodeRestartCopy=1
Attachment: [text/bzr-bundle] bzr/jonas@mysql.com-20110215114127-83183eu4vvqf6ddn.bundle
| Thread |
|---|
| • bzr commit into mysql-5.1-telco-7.0 branch (jonas:4194) | jonas oreland | 15 Feb |