#At file:///home/jonas/src/telco-6.4/
2937 Jonas Oreland 2009-03-13
ndb - bug#43108 - redo DblqhProxy handling of LCP
modified:
storage/ndb/src/kernel/blocks/dblqh/DblqhMain.cpp
storage/ndb/src/kernel/blocks/dblqh/DblqhProxy.cpp
storage/ndb/src/kernel/blocks/dblqh/DblqhProxy.hpp
=== modified file 'storage/ndb/src/kernel/blocks/dblqh/DblqhMain.cpp'
--- a/storage/ndb/src/kernel/blocks/dblqh/DblqhMain.cpp 2009-03-13 06:59:44 +0000
+++ b/storage/ndb/src/kernel/blocks/dblqh/DblqhMain.cpp 2009-03-13 07:49:42 +0000
@@ -12029,6 +12029,8 @@ void Dblqh::execEMPTY_LCP_REQ(Signal* si
CRASH_INSERTION(5008);
EmptyLcpReq * const emptyLcpOrd = (EmptyLcpReq*)&signal->theData[0];
+ ndbrequire(!isNdbMtLqh()); // Handled by DblqhProxy
+
lcpPtr.i = 0;
ptrAss(lcpPtr, lcpRecord);
@@ -12201,21 +12203,28 @@ void Dblqh::execLCP_PREPARE_REF(Signal*
LcpFragOrd *ord= (LcpFragOrd*)signal->getDataPtrSend();
lcpPtr.p->firstFragmentFlag= false;
- if (!isNdbMtLqh()) {
+ if (!isNdbMtLqh())
+ {
+ jam();
*ord = lcpPtr.p->currentFragment.lcpFragOrd;
EXECUTE_DIRECT(PGMAN, GSN_LCP_FRAG_ORD, signal, signal->length());
jamEntry();
- }
- /**
- * First fragment mean that last LCP is complete :-)
- */
- if (!isNdbMtLqh()) {
+ /**
+ * First fragment mean that last LCP is complete :-)
+ */
+ jam();
*ord = lcpPtr.p->currentFragment.lcpFragOrd;
EXECUTE_DIRECT(TSMAN, GSN_LCP_FRAG_ORD,
signal, signal->length(), 0);
jamEntry();
}
+ else
+ {
+ /**
+ * Handle by LqhProxy
+ */
+ }
}
lcpPtr.p->lcpState = LcpRecord::LCP_COMPLETED;
@@ -12261,21 +12270,28 @@ void Dblqh::execLCP_PREPARE_CONF(Signal*
lcpPtr.p->firstFragmentFlag= false;
// proxy is used in MT LQH to handle also the extra pgman worker
- if (!isNdbMtLqh()) {
+ if (!isNdbMtLqh())
+ {
+ jam();
*ord = lcpPtr.p->currentFragment.lcpFragOrd;
EXECUTE_DIRECT(PGMAN, GSN_LCP_FRAG_ORD, signal, signal->length());
jamEntry();
- }
- /**
- * First fragment mean that last LCP is complete :-)
- */
- if (!isNdbMtLqh()) {
+ /**
+ * First fragment mean that last LCP is complete :-)
+ */
+ jam();
*ord = lcpPtr.p->currentFragment.lcpFragOrd;
EXECUTE_DIRECT(TSMAN, GSN_LCP_FRAG_ORD,
signal, signal->length(), 0);
jamEntry();
}
+ else
+ {
+ /**
+ * Handled by proxy
+ */
+ }
}
if (lcpPtr.p->m_error)
@@ -12533,13 +12549,9 @@ void Dblqh::sendEMPTY_LCP_CONF(Signal* s
rep->lcpId = c_lcpId;
}
- if (!isNdbMtLqh())
- {
- jam();
- lcpPtr.p->m_EMPTY_LCP_REQ.copyto(NdbNodeBitmask::Size, sig->receiverGroup);
- sendSignal(DBDIH_REF, GSN_EMPTY_LCP_REP, signal,
- EmptyLcpRep::SignalLength + EmptyLcpConf::SignalLength, JBB);
- }
+ lcpPtr.p->m_EMPTY_LCP_REQ.copyto(NdbNodeBitmask::Size, sig->receiverGroup);
+ sendSignal(DBDIH_REF, GSN_EMPTY_LCP_REP, signal,
+ EmptyLcpRep::SignalLength + EmptyLcpConf::SignalLength, JBB);
lcpPtr.p->reportEmpty = false;
lcpPtr.p->m_EMPTY_LCP_REQ.clear();
@@ -12553,6 +12565,10 @@ void Dblqh::completeLcpRoundLab(Signal*
{
clcpCompletedState = LCP_CLOSE_STARTED;
+ lcpPtr.i = 0;
+ ptrAss(lcpPtr, lcpRecord);
+ lcpPtr.p->m_outstanding = 0;
+
EndLcpReq* req= (EndLcpReq*)signal->getDataPtr();
req->senderData= lcpPtr.i;
req->senderRef= reference();
@@ -12560,43 +12576,31 @@ void Dblqh::completeLcpRoundLab(Signal*
req->backupId= lcpId;
BlockReference backupRef = calcInstanceBlockRef(BACKUP);
+
+ lcpPtr.p->m_outstanding++;
sendSignal(backupRef, GSN_END_LCP_REQ, signal,
EndLcpReq::SignalLength, JBB);
- if (!isNdbMtLqh()) {
+ if (!isNdbMtLqh())
+ {
+ jam();
+ lcpPtr.p->m_outstanding++;
sendSignal(PGMAN_REF, GSN_END_LCP_REQ, signal,
EndLcpReq::SignalLength, JBB);
- } else {
- jam();
- req->proxyBlockNo = PGMAN;
- sendSignal(DBLQH_REF, GSN_END_LCP_REQ, signal,
- EndLcpReq::SignalLength + 1, JBB);
- }
- if (!isNdbMtLqh()) {
+ lcpPtr.p->m_outstanding++;
sendSignal(LGMAN_REF, GSN_END_LCP_REQ, signal,
EndLcpReq::SignalLength, JBB);
- } else {
- jam();
- req->proxyBlockNo = LGMAN;
- sendSignal(DBLQH_REF, GSN_END_LCP_REQ,
- signal, EndLcpReq::SignalLength + 1, JBB);
- }
- if (!isNdbMtLqh()) {
EXECUTE_DIRECT(TSMAN, GSN_END_LCP_REQ,
signal, EndLcpReq::SignalLength, 0);
- jamEntry();
- } else {
- jam();
- req->proxyBlockNo = TSMAN;
- sendSignal(DBLQH_REF, GSN_END_LCP_REQ,
- signal, EndLcpReq::SignalLength + 1, JBB);
}
-
- lcpPtr.i = 0;
- ptrAss(lcpPtr, lcpRecord);
- lcpPtr.p->m_outstanding = 3;
+ else
+ {
+ /**
+ * This is all handled by LqhProxy
+ */
+ }
return;
}//Dblqh::completeLcpRoundLab()
@@ -15219,10 +15223,15 @@ void Dblqh::execRESTORE_LCP_CONF(Signal*
ptrAss(lcpPtr, lcpRecord);
lcpPtr.p->m_outstanding = 1;
- if (!isNdbMtLqh()) {
+ if (!isNdbMtLqh())
+ {
+ jam();
signal->theData[0] = c_lcpId;
sendSignal(LGMAN_REF, GSN_START_RECREQ, signal, 1, JBB);
- } else {
+ }
+ else
+ {
+ jam();
signal->theData[0] = c_lcpId;
signal->theData[1] = LGMAN;
sendSignal(DBLQH_REF, GSN_START_RECREQ, signal, 2, JBB);
@@ -15294,10 +15303,15 @@ void Dblqh::execSTART_RECREQ(Signal* sig
ptrAss(lcpPtr, lcpRecord);
lcpPtr.p->m_outstanding = 1;
- if (!isNdbMtLqh()) {
+ if (!isNdbMtLqh())
+ {
+ jam();
signal->theData[0] = c_lcpId;
sendSignal(LGMAN_REF, GSN_START_RECREQ, signal, 1, JBB);
- } else {
+ }
+ else
+ {
+ jam();
signal->theData[0] = c_lcpId;
signal->theData[1] = LGMAN;
sendSignal(DBLQH_REF, GSN_START_RECREQ, signal, 2, JBB);
@@ -15332,10 +15346,15 @@ void Dblqh::execSTART_RECCONF(Signal* si
case LGMAN:
jam();
lcpPtr.p->m_outstanding++;
- if (!isNdbMtLqh()) {
+ if (!isNdbMtLqh())
+ {
+ jam();
signal->theData[0] = c_lcpId;
sendSignal(TSMAN_REF, GSN_START_RECREQ, signal, 1, JBB);
- } else {
+ }
+ else
+ {
+ jam();
signal->theData[0] = c_lcpId;
signal->theData[1] = TSMAN;
sendSignal(DBLQH_REF, GSN_START_RECREQ, signal, 2, JBB);
@@ -15395,10 +15414,15 @@ void Dblqh::execSTART_EXEC_SR(Signal* si
* ANY FRAGMENTS PARTICIPATE IN THIS PHASE.
* --------------------------------------------------------------------- */
signal->theData[0] = cownNodeid;
- if (!isNdbMtLqh()) {
+ if (!isNdbMtLqh())
+ {
+ jam();
NodeReceiverGroup rg(DBLQH, m_sr_nodes);
sendSignal(rg, GSN_EXEC_SRREQ, signal, 1, JBB);
- } else {
+ }
+ else
+ {
+ jam();
const Uint32 sz = NdbNodeBitmask::Size;
m_sr_nodes.copyto(sz, &signal->theData[1]);
sendSignal(DBLQH_REF, GSN_EXEC_SRREQ, signal, 1 + sz, JBB);
@@ -16896,10 +16920,15 @@ void Dblqh::srPhase3Comp(Signal* signal)
jamEntry();
signal->theData[0] = cownNodeid;
- if (!isNdbMtLqh()) {
+ if (!isNdbMtLqh())
+ {
+ jam();
NodeReceiverGroup rg(DBLQH, m_sr_nodes);
sendSignal(rg, GSN_EXEC_SRCONF, signal, 1, JBB);
- } else {
+ }
+ else
+ {
+ jam();
const Uint32 sz = NdbNodeBitmask::Size;
m_sr_nodes.copyto(sz, &signal->theData[1]);
sendSignal(DBLQH_REF, GSN_EXEC_SRCONF, signal, 1 + sz, JBB);
=== modified file 'storage/ndb/src/kernel/blocks/dblqh/DblqhProxy.cpp'
--- a/storage/ndb/src/kernel/blocks/dblqh/DblqhProxy.cpp 2009-03-12 06:52:39 +0000
+++ b/storage/ndb/src/kernel/blocks/dblqh/DblqhProxy.cpp 2009-03-13 07:49:42 +0000
@@ -43,10 +43,11 @@ DblqhProxy::DblqhProxy(Block_context& ct
// GSN_LCP_FRAG_ORD
addRecSignal(GSN_LCP_FRAG_ORD, &DblqhProxy::execLCP_FRAG_ORD);
addRecSignal(GSN_LCP_FRAG_REP, &DblqhProxy::execLCP_FRAG_REP);
- addRecSignal(GSN_END_LCP_REQ, &DblqhProxy::execEND_LCP_REQ);
addRecSignal(GSN_END_LCP_CONF, &DblqhProxy::execEND_LCP_CONF);
addRecSignal(GSN_LCP_COMPLETE_REP, &DblqhProxy::execLCP_COMPLETE_REP);
+ addRecSignal(GSN_EMPTY_LCP_REQ, &DblqhProxy::execEMPTY_LCP_REQ);
+
// GSN_GCP_SAVEREQ
addRecSignal(GSN_GCP_SAVEREQ, &DblqhProxy::execGCP_SAVEREQ);
addRecSignal(GSN_GCP_SAVECONF, &DblqhProxy::execGCP_SAVECONF);
@@ -78,10 +79,6 @@ DblqhProxy::DblqhProxy(Block_context& ct
addRecSignal(GSN_LQH_TRANSREQ, &DblqhProxy::execLQH_TRANSREQ);
addRecSignal(GSN_LQH_TRANSCONF, &DblqhProxy::execLQH_TRANSCONF);
- // GSN_EMPTY_LCP_REQ
- addRecSignal(GSN_EMPTY_LCP_REQ, &DblqhProxy::execEMPTY_LCP_REQ);
- addRecSignal(GSN_EMPTY_LCP_CONF, &DblqhProxy::execEMPTY_LCP_CONF);
-
// GSN_SUB_GCP_COMPLETE_REP
addRecSignal(GSN_SUB_GCP_COMPLETE_REP, &DblqhProxy::execSUB_GCP_COMPLETE_REP);
@@ -383,7 +380,14 @@ DblqhProxy::sendTAB_COMMITCONF(Signal* s
ssRelease<Ss_TAB_COMMITREQ>(ssId);
}
-// GSN_LCP_FRAG_ORD
+// LCP handling
+
+Uint32
+DblqhProxy::getNoOfOutstanding(const LcpRecord & rec) const
+{
+ ndbrequire(rec.m_lcp_frag_ord_cnt >= rec.m_lcp_frag_rep_cnt);
+ return rec.m_lcp_frag_ord_cnt - rec.m_lcp_frag_rep_cnt;
+}
void
DblqhProxy::execLCP_FRAG_ORD(Signal* signal)
@@ -392,18 +396,20 @@ DblqhProxy::execLCP_FRAG_ORD(Signal* sig
const LcpFragOrd* req = (const LcpFragOrd*)signal->getDataPtr();
const LcpFragOrd req_copy = *req;
- Uint32 ssId = getSsId(req);
- bool found = false;
- Ss_LCP_FRAG_ORD& ss = ssFindSeize<Ss_LCP_FRAG_ORD>(ssId, &found);
+ bool lcp_complete_ord = req->lastFragmentFlag;
- if (!found) {
+ if (c_lcpRecord.m_state == LcpRecord::L_IDLE)
+ {
jam();
D("LCP: start" << V(req->lcpId));
- ndbrequire(c_lcpRecord.m_idle);
- c_lcpRecord.m_idle = false;
+ c_lcpRecord.m_state = LcpRecord::L_STARTING;
c_lcpRecord.m_lcpId = req->lcpId;
- c_lcpRecord.m_frags = 0;
+ c_lcpRecord.m_lcp_frag_rep_cnt = 0;
+ c_lcpRecord.m_lcp_frag_ord_cnt = 0;
+ c_lcpRecord.m_complete_outstanding = 0;
+ c_lcpRecord.m_lastFragmentFlag = false;
+ c_lcpRecord.m_empty_lcp_req.clear();
// handle start of LCP in PGMAN and TSMAN
LcpFragOrd* req = (LcpFragOrd*)signal->getDataPtrSend();
@@ -413,71 +419,57 @@ DblqhProxy::execLCP_FRAG_ORD(Signal* sig
*req = req_copy;
EXECUTE_DIRECT(TSMAN, GSN_LCP_FRAG_ORD,
signal, LcpFragOrd::SignalLength);
- } else {
- jam();
- D("LCP: continue" << V(req->lcpId) << V(c_lcpRecord.m_frags));
- ndbrequire(!c_lcpRecord.m_idle);
- ndbrequire(c_lcpRecord.m_lcpId == req->lcpId);
- if (c_lcpRecord.m_frags == 0) {
- // first frag has not replied
+ c_lcpRecord.m_state = LcpRecord::L_RUNNING;
+ }
+
+ jam();
+ D("LCP: continue" << V(req->lcpId) << V(c_lcpRecord.m_lcp_frag_ord_cnt));
+ ndbrequire(c_lcpRecord.m_state == LcpRecord::L_RUNNING);
+ ndbrequire(c_lcpRecord.m_lcpId == req->lcpId);
+
+ if (lcp_complete_ord)
+ {
+ jam();
+ c_lcpRecord.m_lastFragmentFlag = true;
+ if (getNoOfOutstanding(c_lcpRecord) == 0)
+ {
jam();
- D("LCP: delay" << V(req->lcpId) << V(c_lcpRecord.m_frags));
- LcpFragOrd* req = (LcpFragOrd*)signal->getDataPtrSend();
- *req = req_copy;
- sendSignalWithDelay(reference(), GSN_LCP_FRAG_ORD,
- signal, 100, LcpFragOrd::SignalLength);
+ completeLCP_1(signal);
return;
}
- }
- if (req->lastFragmentFlag) {
- jam();
- D("LCP: complete" << V(req->lcpId));
- execLCP_COMPLETE_ORD(signal);
+ /**
+ * Wait for all LCP_FRAG_ORD/REP to complete
+ */
return;
}
- sendREQ(signal, ss);
-}
-
-void
-DblqhProxy::sendLCP_FRAG_ORD(Signal* signal, Uint32 ssId)
-{
- Ss_LCP_FRAG_ORD& ss = ssFind<Ss_LCP_FRAG_ORD>(ssId);
- const LcpFragOrd* req = (const LcpFragOrd*)signal->getDataPtr();
-
- NdbLogPartInfo lpinfo(workerInstance(ss.m_worker));
- if (!lpinfo.partNoOwner(req->tableId, req->fragmentId)) {
+ else
+ {
jam();
- skipReq(ss);
- return;
+ c_lcpRecord.m_last_lcp_frag_ord = req_copy;
}
- if (!ss.m_active.get(ss.m_worker)) {
- jam();
- D("LCP: active" << V(ss.m_worker));
- ss.m_active.set(ss.m_worker);
- }
+ c_lcpRecord.m_lcp_frag_ord_cnt++;
- sendSignal(workerRef(ss.m_worker), GSN_LCP_FRAG_ORD,
- signal, LcpFragOrd::SignalLength, JBB);
+ // Forward
+ Uint32 instance = getInstanceKey(req->tableId, req->fragmentId);
+ sendSignal(numberToRef(DBLQH, instance, getOwnNodeId()),
+ GSN_LCP_FRAG_ORD, signal, LcpFragOrd::SignalLength, JBB);
}
-// increment frag count and pass through
void
DblqhProxy::execLCP_FRAG_REP(Signal* signal)
{
ndbrequire(signal->getLength() == LcpFragRep::SignalLength);
LcpFragRep* conf = (LcpFragRep*)signal->getDataPtr();
- Uint32 ssId = getSsId(conf);
- Ss_LCP_FRAG_ORD& ss = ssFind<Ss_LCP_FRAG_ORD>(ssId);
- ndbrequire(!c_lcpRecord.m_idle);
+ ndbrequire(c_lcpRecord.m_state == LcpRecord::L_RUNNING);
ndbrequire(c_lcpRecord.m_lcpId == conf->lcpId);
- c_lcpRecord.m_frags++;
- D("LCP: rep" << V(conf->lcpId) << V(c_lcpRecord.m_frags));
+ c_lcpRecord.m_lcp_frag_rep_cnt++;
+ D("LCP: rep" << V(conf->lcpId) << V(c_lcpRecord.m_lcp_frag_rep_cnt));
/**
* But instead of broadcasting to all DIH's
@@ -486,194 +478,261 @@ DblqhProxy::execLCP_FRAG_REP(Signal* sig
conf->nodeId = LcpFragRep::BROADCAST_REQ;
sendSignal(DBDIH_REF, GSN_LCP_FRAG_REP,
signal, LcpFragRep::SignalLength, JBB);
-}
-
-// GSN_LCP_COMPLETE_ORD [ sub-op, fictional gsn ]
-void
-DblqhProxy::execLCP_COMPLETE_ORD(Signal* signal)
-{
- const LcpFragOrd* req = (const LcpFragOrd*)signal->getDataPtr();
- Uint32 ssId = getSsId(req);
- Ss_LCP_COMPLETE_ORD& ss = ssSeize<Ss_LCP_COMPLETE_ORD>(ssId);
- ss.m_req = *req;
-
- Ss_LCP_FRAG_ORD& ssLcp = ssFind<Ss_LCP_FRAG_ORD>(ssId);
- const Uint32 activeCount = ssLcp.m_active.count();
- D("LCP: complete" << V(activeCount));
- // database with no fragments is not handled
- ndbrequire(activeCount != 0);
-
- // seize END_LCP_REQ records
- Uint32 i;
- for (i = 0; i < ss.BlockCnt; i++) {
- EndLcpReq tmp;
- tmp.backupId = ss.m_req.lcpId;
- tmp.proxyBlockNo = ss.m_endLcp[i].m_blockNo;
- Uint32 ssIdEnd = getSsId(&tmp);
- Ss_END_LCP_REQ& ssEnd = ssSeize<Ss_END_LCP_REQ>(ssIdEnd);
- ss.m_endLcp[i].m_ssId = ssIdEnd;
- ssEnd.m_ssIdLcp = ssId;
-
- // set wait-for bitmask
- setMask(ssEnd, ssLcp.m_active);
+ if (c_lcpRecord.m_lastFragmentFlag)
+ {
+ jam();
+ /**
+ * lastFragmentFlag has arrived...
+ */
+ if (getNoOfOutstanding(c_lcpRecord) == 0)
+ {
+ jam();
+ /*
+ * and we have all fragments has been processed
+ */
+ completeLCP_1(signal);
+ }
+ return;
}
- sendREQ(signal, ss);
+ checkSendEMPTY_LCP_CONF(signal);
}
void
-DblqhProxy::sendLCP_COMPLETE_ORD(Signal* signal, Uint32 ssId)
+DblqhProxy::completeLCP_1(Signal* signal)
{
- Ss_LCP_COMPLETE_ORD& ss = ssFind<Ss_LCP_COMPLETE_ORD>(ssId);
+ ndbrequire(c_lcpRecord.m_state == LcpRecord::L_RUNNING);
+ c_lcpRecord.m_state = LcpRecord::L_COMPLETING_1;
+ ndbrequire(c_lcpRecord.m_complete_outstanding == 0);
- LcpFragOrd* req = (LcpFragOrd*)signal->getDataPtrSend();
- *req = ss.m_req;
- sendSignal(workerRef(ss.m_worker), GSN_LCP_FRAG_ORD,
- signal, LcpFragOrd::SignalLength, JBB);
+ /**
+ * send LCP_FRAG_ORD (lastFragmentFlag = true)
+ * to all LQH instances...
+ * they will reply with LCP_COMPLETE_REP
+ */
+ LcpFragOrd* ord = (LcpFragOrd*)signal->getDataPtrSend();
+ ord->lcpId = c_lcpRecord.m_lcpId;
+ ord->lastFragmentFlag = true;
+ for (Uint32 i = 0; i<c_workers; i++)
+ {
+ jam();
+ c_lcpRecord.m_complete_outstanding++;
+ sendSignal(workerRef(i), GSN_LCP_FRAG_ORD, signal,
+ LcpFragOrd::SignalLength, JBB);
+ }
+
+ /**
+ * send END_LCP_REQ to all pgman instances (except "extra" pgman)
+ * they will reply with END_LCP_CONF
+ */
+ EndLcpReq* req = (EndLcpReq*)signal->getDataPtrSend();
+ req->senderData= 0;
+ req->senderRef= reference();
+ req->backupPtr= 0;
+ req->backupId= c_lcpRecord.m_lcpId;
+ for (Uint32 i = 0; i<c_workers; i++)
+ {
+ jam();
+ c_lcpRecord.m_complete_outstanding++;
+ sendSignal(numberToRef(PGMAN, workerInstance(i), getOwnNodeId()),
+ GSN_END_LCP_REQ, signal, EndLcpReq::SignalLength, JBB);
+ }
}
void
DblqhProxy::execLCP_COMPLETE_REP(Signal* signal)
{
- const LcpCompleteRep* conf = (const LcpCompleteRep*)signal->getDataPtr();
- Uint32 ssId = getSsId(conf);
- Ss_LCP_COMPLETE_ORD& ss = ssFind<Ss_LCP_COMPLETE_ORD>(ssId);
- recvCONF(signal, ss);
+ jamEntry();
+ ndbrequire(c_lcpRecord.m_state == LcpRecord::L_COMPLETING_1);
+ ndbrequire(c_lcpRecord.m_complete_outstanding);
+ c_lcpRecord.m_complete_outstanding--;
+
+ if (c_lcpRecord.m_complete_outstanding == 0)
+ {
+ jam();
+ completeLCP_2(signal);
+ return;
+ }
}
void
-DblqhProxy::sendLCP_COMPLETE_REP(Signal* signal, Uint32 ssId)
+DblqhProxy::execEND_LCP_CONF(Signal* signal)
{
- Ss_LCP_COMPLETE_ORD& ss = ssFind<Ss_LCP_COMPLETE_ORD>(ssId);
- Uint32 i;
-
- if (!lastReply(ss))
- return;
-
- // verify the protocol
- for (i = 0; i < ss.BlockCnt; i++) {
- Uint32 ssIdEnd = ss.m_endLcp[i].m_ssId;
- Ss_END_LCP_REQ& ssEnd = ssFind<Ss_END_LCP_REQ>(ssIdEnd);
- const Uint32 rb = ssEnd.m_proxyBlockNo;
- switch (rb) {
- case PGMAN:
- ndbrequire(ssEnd.m_confcount == 1);
- break;
- case TSMAN:
- ndbrequire(ssEnd.m_confcount == 0);
- break;
- case LGMAN:
- ndbrequire(ssEnd.m_confcount == 1);
- break;
- default:
- ndbrequire(false);
- break;
- }
- }
+ jamEntry();
+ ndbrequire(c_lcpRecord.m_state == LcpRecord::L_COMPLETING_1 ||
+ c_lcpRecord.m_state == LcpRecord::L_COMPLETING_2 ||
+ c_lcpRecord.m_state == LcpRecord::L_COMPLETING_3);
-
- LcpCompleteRep* conf = (LcpCompleteRep*)signal->getDataPtrSend();
- conf->nodeId = LcpFragRep::BROADCAST_REQ;
- conf->blockNo = DBLQH;
- conf->lcpId = ss.m_req.lcpId;
- sendSignal(DBDIH_REF, GSN_LCP_COMPLETE_REP,
- signal, LcpCompleteRep::SignalLength, JBB);
+ ndbrequire(c_lcpRecord.m_complete_outstanding);
+ c_lcpRecord.m_complete_outstanding--;
- for (i = 0; i < ss.BlockCnt; i++) {
+ if (c_lcpRecord.m_complete_outstanding == 0)
+ {
jam();
- Uint32 ssIdEnd = ss.m_endLcp[i].m_ssId;
- ssRelease<Ss_END_LCP_REQ>(ssIdEnd);
+ if (c_lcpRecord.m_state == LcpRecord::L_COMPLETING_1)
+ {
+ jam();
+ completeLCP_2(signal);
+ return;
+ }
+ else if (c_lcpRecord.m_state == LcpRecord::L_COMPLETING_2)
+ {
+ jam();
+ completeLCP_3(signal);
+ return;
+ }
+ else
+ {
+ jam();
+ sendLCP_COMPLETE_REP(signal);
+ return;
+ }
}
- ssRelease<Ss_LCP_COMPLETE_ORD>(ssId);
- ssRelease<Ss_LCP_FRAG_ORD>(ssId);
- c_lcpRecord.m_idle = true;
}
-// GSN_END_LCP_REQ [ sub-op ]
-
void
-DblqhProxy::execEND_LCP_REQ(Signal* signal)
+DblqhProxy::completeLCP_2(Signal* signal)
{
- ndbrequire(refToMain(signal->getSendersBlockRef()) == DBLQH);
-
- const EndLcpReq* req = (const EndLcpReq*)signal->getDataPtr();
- Uint32 rb = req->proxyBlockNo;
- ndbrequire(rb == PGMAN || rb == TSMAN || rb == LGMAN);
+ jamEntry();
+ ndbrequire(c_lcpRecord.m_state == LcpRecord::L_COMPLETING_1);
+ c_lcpRecord.m_state = LcpRecord::L_COMPLETING_2;
- Uint32 ssId = getSsId(req);
- Ss_END_LCP_REQ& ss = ssFind<Ss_END_LCP_REQ>(ssId);
+ EndLcpReq* req = (EndLcpReq*)signal->getDataPtrSend();
+ req->senderData= 0;
+ req->senderRef= reference();
+ req->backupPtr= 0;
+ req->backupId= c_lcpRecord.m_lcpId;
+ c_lcpRecord.m_complete_outstanding++;
- if (++ss.m_reqcount == 1) {
- ss.m_backupId = req->backupId;
- ss.m_proxyBlockNo = req->proxyBlockNo;
- } else {
- ndbrequire(ss.m_backupId == req->backupId);
- ndbrequire(ss.m_proxyBlockNo == req->proxyBlockNo);
- }
-
- // reversed roles
- recvCONF(signal, ss);
+ /**
+ * send to "extra" instance
+ * that will checkpoint extent-pages
+ */
+ // NOTE: ugly to use MaxLqhWorkers directly
+ Uint32 instance = MaxLqhWorkers + 1;
+ sendSignal(numberToRef(PGMAN, instance, getOwnNodeId()),
+ GSN_END_LCP_REQ, signal, EndLcpReq::SignalLength, JBB);
}
+
void
-DblqhProxy::sendEND_LCP_REQ(Signal* signal, Uint32 ssId)
+DblqhProxy::completeLCP_3(Signal* signal)
{
- Ss_END_LCP_REQ& ss = ssFind<Ss_END_LCP_REQ>(ssId);
+ jamEntry();
+ ndbrequire(c_lcpRecord.m_state == LcpRecord::L_COMPLETING_2);
+ c_lcpRecord.m_state = LcpRecord::L_COMPLETING_3;
- const EndLcpReq* req = (const EndLcpReq*)signal->getDataPtr();
- ss.m_req[ss.m_worker] = *req;
+ /**
+ * And finally also checkpoint UNDO LOG
+ * and inform TSMAN that checkpoint is "complete"
+ */
+ EndLcpReq* req = (EndLcpReq*)signal->getDataPtrSend();
+ req->senderData= 0;
+ req->senderRef= reference();
+ req->backupPtr= 0;
+ req->backupId= c_lcpRecord.m_lcpId;
+
+ // no reply from this
+ sendSignal(TSMAN_REF, GSN_END_LCP_REQ, signal,
+ EndLcpReq::SignalLength, JBB);
- if (!lastReply(ss)) {
+ if (c_lcpRecord.m_lcp_frag_rep_cnt)
+ {
jam();
- return;
+ c_lcpRecord.m_complete_outstanding++;
+ sendSignal(LGMAN_REF, GSN_END_LCP_REQ, signal,
+ EndLcpReq::SignalLength, JBB);
}
-
+ else
{
- const Uint32 rb = ss.m_proxyBlockNo;
- EndLcpReq* req = (EndLcpReq*)signal->getDataPtrSend();
- req->senderData = ssId;
- req->senderRef = reference();
- req->backupPtr = 0;
- req->backupId = ss.m_backupId;
- EXECUTE_DIRECT(rb, GSN_END_LCP_REQ,
- signal, EndLcpReq::SignalLength, 0);
+ jam();
+ /**
+ * lgman does currently not like 0 fragments,
+ * cause then it does not get a LCP_FRAG_ORD
+ *
+ * this should change so that it gets this first (style)
+ */
+ sendLCP_COMPLETE_REP(signal);
}
}
void
-DblqhProxy::execEND_LCP_CONF(Signal* signal)
+DblqhProxy::sendLCP_COMPLETE_REP(Signal* signal)
{
- const EndLcpConf* req = (const EndLcpConf*)signal->getDataPtr();
- Uint32 ssId = getSsId(req);
- Ss_END_LCP_REQ& ss = ssFind<Ss_END_LCP_REQ>(ssId);
- ndbrequire(ss.m_confcount == 0);
- ss.m_confcount++;
+ ndbrequire(c_lcpRecord.m_state == LcpRecord::L_COMPLETING_3);
+
+ LcpCompleteRep* conf = (LcpCompleteRep*)signal->getDataPtrSend();
+ conf->nodeId = LcpFragRep::BROADCAST_REQ;
+ conf->blockNo = DBLQH;
+ conf->lcpId = c_lcpRecord.m_lcpId;
+ sendSignal(DBDIH_REF, GSN_LCP_COMPLETE_REP,
+ signal, LcpCompleteRep::SignalLength, JBB);
- const Uint32 sb = refToMain(req->senderRef);
- ndbrequire(sb == PGMAN || sb == LGMAN);
+ c_lcpRecord.m_state = LcpRecord::L_IDLE;
+ checkSendEMPTY_LCP_CONF(signal);
+}
- // reversed roles
- sendREQ(signal, ss);
+void
+DblqhProxy::execEMPTY_LCP_REQ(Signal* signal)
+{
+ jam();
+
+ EmptyLcpReq * const req = (EmptyLcpReq*)&signal->theData[0];
+ Uint32 nodeId = refToNode(req->senderRef);
+ c_lcpRecord.m_empty_lcp_req.set(nodeId);
+ checkSendEMPTY_LCP_CONF(signal);
}
void
-DblqhProxy::sendEND_LCP_CONF(Signal* signal, Uint32 ssId)
+DblqhProxy::checkSendEMPTY_LCP_CONF_impl(Signal* signal)
{
- Ss_END_LCP_REQ& ss = ssFind<Ss_END_LCP_REQ>(ssId);
- Ss_LCP_FRAG_ORD& ssLcp = ssFind<Ss_LCP_FRAG_ORD>(ss.m_ssIdLcp);
+ ndbrequire(!c_lcpRecord.m_empty_lcp_req.isclear());
+
+ EmptyLcpRep * rep = (EmptyLcpRep*)signal->getDataPtrSend();
+ EmptyLcpConf * conf = (EmptyLcpConf*)rep->conf;
- // workers handling no fragments sent no REQ and get no CONF
- if (!ssLcp.m_active.get(ss.m_worker)) {
+ switch(c_lcpRecord.m_state){
+ case LcpRecord::L_IDLE:
jam();
+ conf->idle = true;
+ break;
+ case LcpRecord::L_STARTING:
+ jam();
+ return;
+ case LcpRecord::L_RUNNING:{
+ jam();
+ if (getNoOfOutstanding(c_lcpRecord) == 0)
+ {
+ jam();
+ /**
+ * Given that we wait for all ongoing...
+ * we can simply return last LCP_FRAG_ORD sent to us
+ */
+ conf->tableId = c_lcpRecord.m_last_lcp_frag_ord.tableId;
+ conf->fragmentId = c_lcpRecord.m_last_lcp_frag_ord.fragmentId;
+ conf->lcpId = c_lcpRecord.m_last_lcp_frag_ord.lcpId;
+ conf->lcpNo = c_lcpRecord.m_last_lcp_frag_ord.lcpNo;
+ break;
+ }
return;
}
-
- EndLcpConf* conf = (EndLcpConf*)signal->getDataPtrSend();
- conf->senderData = ss.m_req[ss.m_worker].senderData;
- conf->senderRef = reference();
- sendSignal(workerRef(ss.m_worker), GSN_END_LCP_CONF,
- signal, EndLcpConf::SignalLength, JBB);
+ case LcpRecord::L_COMPLETING_1:
+ jam();
+ case LcpRecord::L_COMPLETING_2:
+ jam();
+ case LcpRecord::L_COMPLETING_3:
+ jam();
+ return;
+ }
+
+ conf->senderNodeId = getOwnNodeId();
+
+ c_lcpRecord.m_empty_lcp_req.copyto(NdbNodeBitmask::Size, rep->receiverGroup);
+ sendSignal(DBDIH_REF, GSN_EMPTY_LCP_REP, signal,
+ EmptyLcpRep::SignalLength + EmptyLcpConf::SignalLength, JBB);
+
+ c_lcpRecord.m_empty_lcp_req.clear();
}
// GSN_GCP_SAVEREQ
@@ -1260,88 +1319,6 @@ DblqhProxy::sendLQH_TRANSCONF(Signal* si
ssRelease<Ss_LQH_TRANSREQ>(ssId);
}
-// GSN_EMPTY_LCP_REQ
-
-void
-DblqhProxy::execEMPTY_LCP_REQ(Signal* signal)
-{
- const EmptyLcpReq* req = (const EmptyLcpReq*)signal->getDataPtr();
- Ss_EMPTY_LCP_REQ& ss = ssSeize<Ss_EMPTY_LCP_REQ>(1);
- ss.m_req = *req;
- ndbrequire(signal->getLength() == EmptyLcpReq::SignalLength);
- sendREQ(signal, ss);
-}
-
-void
-DblqhProxy::sendEMPTY_LCP_REQ(Signal* signal, Uint32 ssId)
-{
- Ss_EMPTY_LCP_REQ& ss = ssFind<Ss_EMPTY_LCP_REQ>(ssId);
-
- EmptyLcpReq* req = (EmptyLcpReq*)signal->getDataPtrSend();
- *req = ss.m_req;
-
- req->senderRef = reference();
- sendSignal(workerRef(ss.m_worker), GSN_EMPTY_LCP_REQ,
- signal, EmptyLcpReq::SignalLength, JBB);
-}
-
-void
-DblqhProxy::execEMPTY_LCP_CONF(Signal* signal)
-{
- Ss_EMPTY_LCP_REQ& ss = ssFind<Ss_EMPTY_LCP_REQ>(1);
- recvCONF(signal, ss);
-}
-
-void
-DblqhProxy::sendEMPTY_LCP_CONF(Signal* signal, Uint32 ssId)
-{
- Ss_EMPTY_LCP_REQ& ss = ssFind<Ss_EMPTY_LCP_REQ>(ssId);
- const EmptyLcpConf* conf = (const EmptyLcpConf*)signal->getDataPtr();
-
- if (firstReply(ss)) {
- jam();
- ss.m_conf = *conf;
- } else if (ss.m_conf.idle && conf->idle) {
- jam();
- ndbrequire(ss.m_conf.lcpId == conf->lcpId);
- } else if (ss.m_conf.idle && !conf->idle) {
- jam();
- ndbrequire(ss.m_conf.lcpId == conf->lcpId);
- ss.m_conf = *conf;
- } else if (!ss.m_conf.idle && conf->idle) {
- jam();
- ndbrequire(ss.m_conf.lcpId == conf->lcpId);
- } else if (!ss.m_conf.idle && !conf->idle) {
- jam();
- if (ss.m_conf.tableId < conf->tableId ||
- (ss.m_conf.tableId == conf->tableId &&
- ss.m_conf.fragmentId < conf->fragmentId)) {
- jam();
- ss.m_conf.tableId = conf->tableId;
- ss.m_conf.fragmentId = conf->fragmentId;
- ndbrequire(ss.m_conf.lcpNo == conf->lcpNo);
- ndbrequire(ss.m_conf.lcpId == conf->lcpId);
- }
- } else {
- ndbassert(false);
- }
-
- if (!lastReply(ss))
- return;
-
- if (ss.m_error == 0) {
- jam();
- EmptyLcpConf* conf = (EmptyLcpConf*)signal->getDataPtrSend();
- *conf = ss.m_conf;
- sendSignal(ss.m_req.senderRef, GSN_EMPTY_LCP_CONF,
- signal, EmptyLcpConf::SignalLength, JBB);
- } else {
- ndbrequire(false);
- }
-
- ssRelease<Ss_EMPTY_LCP_REQ>(ssId);
-}
-
// GSN_EXEC_SR_1 [fictional gsn ]
void
=== modified file 'storage/ndb/src/kernel/blocks/dblqh/DblqhProxy.hpp'
--- a/storage/ndb/src/kernel/blocks/dblqh/DblqhProxy.hpp 2008-12-22 09:40:33 +0000
+++ b/storage/ndb/src/kernel/blocks/dblqh/DblqhProxy.hpp 2009-03-13 07:49:42 +0000
@@ -41,18 +41,6 @@ protected:
// system info
- struct LcpRecord {
- bool m_idle;
- Uint32 m_lcpId;
- Uint32 m_frags;
- LcpRecord() {
- m_idle = true;
- m_lcpId = 0;
- m_frags = 0; // completed
- };
- };
- LcpRecord c_lcpRecord;
-
// GSN_NDB_STTOR
virtual void callNDB_STTOR(Signal*);
@@ -118,108 +106,6 @@ protected:
void execTAB_COMMITREF(Signal*);
void sendTAB_COMMITCONF(Signal*, Uint32 ssId);
- // GSN_LCP_FRAG_ORD
- struct Ss_LCP_FRAG_ORD : SsParallel {
- /*
- * Used for entire LCP. There is no start signal to LQH so we
- * keep state in LcpRecord. Last signal has only lastFragmentFlag
- * set and is treated as a fictional signal GSN_LCP_COMPLETE_ORD.
- */
- static const char* name() { return "LCP_FRAG_ORD"; }
- WorkerMask m_active; // handled at least 1 fragment
- Ss_LCP_FRAG_ORD() {
- m_sendREQ = (SsFUNC)&DblqhProxy::sendLCP_FRAG_ORD;
- m_sendCONF = (SsFUNC)0;
- }
- enum { poolSize = 1 };
- static SsPool<Ss_LCP_FRAG_ORD>& pool(LocalProxy* proxy) {
- return ((DblqhProxy*)proxy)->c_ss_LCP_FRAG_ORD;
- }
- };
- SsPool<Ss_LCP_FRAG_ORD> c_ss_LCP_FRAG_ORD;
- static Uint32 getSsId(const LcpFragOrd* req) {
- return SsIdBase | (req->lcpId & 0xFFFF);
- }
- static Uint32 getSsId(const LcpFragRep* conf) {
- return SsIdBase | (conf->lcpId & 0xFFFF);
- }
- static Uint32 getSsId(const LcpCompleteRep* conf) {
- return SsIdBase | (conf->lcpId & 0xFFFF);
- }
- void execLCP_FRAG_ORD(Signal*);
- void sendLCP_FRAG_ORD(Signal*, Uint32 ssId);
- void execLCP_FRAG_REP(Signal*);
-
- // GSN_LCP_COMPLETE_ORD [ sub-op, fictional gsn ]
- struct Ss_LCP_COMPLETE_ORD : SsParallel {
- static const char* name() { return "LCP_COMPLETE_ORD"; }
- LcpFragOrd m_req;
- // pointers to Ss_END_LCP_REQ for PGMAN, TSMAN, LGMAN
- enum { BlockCnt = 3 };
- struct BlockInfo {
- Uint32 m_blockNo;
- Uint32 m_ssId;
- BlockInfo() : m_blockNo(0), m_ssId(0) {}
- } m_endLcp[BlockCnt];
- Ss_LCP_COMPLETE_ORD() {
- m_sendREQ = (SsFUNC)&DblqhProxy::sendLCP_COMPLETE_ORD;
- m_sendCONF = (SsFUNC)&DblqhProxy::sendLCP_COMPLETE_REP;
- m_endLcp[0].m_blockNo = PGMAN;
- m_endLcp[1].m_blockNo = TSMAN;
- m_endLcp[2].m_blockNo = LGMAN;
- }
- enum { poolSize = 1 };
- static SsPool<Ss_LCP_COMPLETE_ORD>& pool(LocalProxy* proxy) {
- return ((DblqhProxy*)proxy)->c_ss_LCP_COMPLETE_ORD;
- }
- };
- SsPool<Ss_LCP_COMPLETE_ORD> c_ss_LCP_COMPLETE_ORD;
- void execLCP_COMPLETE_ORD(Signal*);
- void sendLCP_COMPLETE_ORD(Signal*, Uint32 ssId);
- void execLCP_COMPLETE_REP(Signal*);
- void sendLCP_COMPLETE_REP(Signal*, Uint32 ssId);
-
- // GSN_END_LCP_REQ [ sub-op ]
- struct Ss_END_LCP_REQ : SsParallel {
- /*
- * Starts with worker REQs so the roles of sendREQ/sendCONF
- * are reversed. Workers are forced to send END_LCP_REQ because
- * making LCP_COMPLETE_REP answer here is too complicated.
- * Note TSMAN sends no END_LCP_CONF.
- */
- static const char* name() { return "END_LCP_REQ"; }
- Uint32 m_ssIdLcp;
- Uint32 m_reqcount;
- Uint32 m_backupId;
- Uint32 m_proxyBlockNo;
- Uint32 m_confcount;
- EndLcpReq m_req[MaxWorkers];
- Ss_END_LCP_REQ() {
- m_sendREQ = (SsFUNC)&DblqhProxy::sendEND_LCP_CONF;
- m_sendCONF = (SsFUNC)&DblqhProxy::sendEND_LCP_REQ;
- m_ssIdLcp = 0;
- m_reqcount = 0;
- m_backupId = 0;
- m_proxyBlockNo = 0;
- m_confcount = 0;
- };
- enum { poolSize = 3 }; // PGMAN, TSMAN, LGMAN
- static SsPool<Ss_END_LCP_REQ>& pool(LocalProxy* proxy) {
- return ((DblqhProxy*)proxy)->c_ss_END_LCP_REQ;
- }
- };
- SsPool<Ss_END_LCP_REQ> c_ss_END_LCP_REQ;
- static Uint32 getSsId(const EndLcpReq* req) {
- return (req->proxyBlockNo << 16) | (req->backupId & 0xFFFF);
- }
- static Uint32 getSsId(const EndLcpConf* conf) {
- return conf->senderData;
- }
- void execEND_LCP_REQ(Signal*);
- void sendEND_LCP_REQ(Signal*, Uint32 ssId);
- void execEND_LCP_CONF(Signal*);
- void sendEND_LCP_CONF(Signal*, Uint32 ssId);
-
// GSN_GCP_SAVEREQ
struct Ss_GCP_SAVEREQ : SsParallel {
static const char* name() { return "GCP_SAVEREQ"; }
@@ -438,28 +324,6 @@ protected:
void execLQH_TRANSCONF(Signal*);
void sendLQH_TRANSCONF(Signal*, Uint32 ssId);
- // GSN_EMPTY_LCP_REQ
- struct Ss_EMPTY_LCP_REQ : SsParallel {
- static const char* name() { return "EMPTY_LCP_REQ"; }
- EmptyLcpReq m_req;
- EmptyLcpConf m_conf; // build final conf here
- Ss_EMPTY_LCP_REQ() {
- m_conf.idle = 1;
- m_sendREQ = (SsFUNC)&DblqhProxy::sendEMPTY_LCP_REQ;
- m_sendCONF = (SsFUNC)&DblqhProxy::sendEMPTY_LCP_CONF;
- }
- enum { poolSize = 1 };
- static SsPool<Ss_EMPTY_LCP_REQ>& pool(LocalProxy* proxy) {
- return ((DblqhProxy*)proxy)->c_ss_EMPTY_LCP_REQ;
- }
- };
- SsPool<Ss_EMPTY_LCP_REQ> c_ss_EMPTY_LCP_REQ;
- void execEMPTY_LCP_REQ(Signal*);
- void sendEMPTY_LCP_REQ(Signal*, Uint32 ssId);
- void execEMPTY_LCP_CONF(Signal*);
- void execEMPTY_LCP_REF(Signal*);
- void sendEMPTY_LCP_CONF(Signal*, Uint32 ssId);
-
// GSN_EXEC_SR_1 [ fictional gsn ]
struct Ss_EXEC_SR_1 : SsParallel {
/*
@@ -552,6 +416,52 @@ protected:
void execDROP_FRAG_CONF(Signal*);
void execDROP_FRAG_REF(Signal*);
void sendDROP_FRAG_CONF(Signal*, Uint32 ssId);
+
+ // LCP handling
+ void execEMPTY_LCP_REQ(Signal*);
+ void execLCP_FRAG_ORD(Signal*);
+ void execLCP_FRAG_REP(Signal*);
+ void execEND_LCP_CONF(Signal*);
+ void execLCP_COMPLETE_REP(Signal*);
+
+ struct LcpRecord {
+ enum {
+ L_IDLE = 0,
+ L_STARTING = 1,
+ L_RUNNING = 2,
+ L_COMPLETING_1 = 3,
+ L_COMPLETING_2 = 4,
+ L_COMPLETING_3 = 5
+ } m_state;
+ Uint32 m_lcpId;
+ Uint32 m_lcp_frag_ord_cnt; // No of LCP_FRAG_ORD received
+ Uint32 m_lcp_frag_rep_cnt; // No of LCP_FRAG_REP sent
+ Uint32 m_complete_outstanding; // Outstanding END_LCP_REQ
+ NdbNodeBitmask m_empty_lcp_req;// Nodes waiting for EMPTY_LCP_CONF
+ LcpFragOrd m_last_lcp_frag_ord;// Last received LCP_FRAG_ORD
+ bool m_lastFragmentFlag;
+
+ LcpRecord(){
+ m_state = L_IDLE;
+ m_lcpId = 0;
+ m_lcp_frag_ord_cnt = 0;
+ m_lcp_frag_rep_cnt = 0;
+ m_lastFragmentFlag = false;
+ };
+ };
+ LcpRecord c_lcpRecord;
+ Uint32 getNoOfOutstanding(const LcpRecord&) const;
+ void completeLCP_1(Signal* signal);
+ void completeLCP_2(Signal* signal);
+ void completeLCP_3(Signal* signal);
+ void sendLCP_COMPLETE_REP(Signal*);
+
+ void checkSendEMPTY_LCP_CONF_impl(Signal* signal);
+ void checkSendEMPTY_LCP_CONF(Signal* signal) {
+ if (c_lcpRecord.m_empty_lcp_req.isclear())
+ return;
+ checkSendEMPTY_LCP_CONF_impl(signal);
+ }
};
#endif
| Thread |
|---|
| • bzr commit into mysql-5.1-telco-7.0 branch (jonas:2937) Bug#43108 | Jonas Oreland | 13 Mar |