#At file:///export/space/pekka/ndb/version/my51-wl4391/
3145 Pekka Nousiainen 2008-12-01
wl#4391 33e_mixed.diff
mixed - SR
modified:
storage/ndb/src/kernel/blocks/dblqh/DblqhMain.cpp
storage/ndb/src/kernel/blocks/dblqh/DblqhProxy.cpp
storage/ndb/src/kernel/blocks/dblqh/DblqhProxy.hpp
=== modified file 'storage/ndb/src/kernel/blocks/dblqh/DblqhMain.cpp'
--- a/storage/ndb/src/kernel/blocks/dblqh/DblqhMain.cpp 2008-12-01 18:06:29 +0000
+++ b/storage/ndb/src/kernel/blocks/dblqh/DblqhMain.cpp 2008-12-01 18:06:58 +0000
@@ -15302,10 +15302,15 @@ void Dblqh::execSTART_EXEC_SR(Signal* si
* WE NEED TO SEND THOSE SIGNALS EVEN IF WE HAVE NOT REQUESTED
* ANY FRAGMENTS PARTICIPATE IN THIS PHASE.
* --------------------------------------------------------------------- */
- BlockNumber lqhBlockNo = numberToBlock(DBLQH, instance());
- NodeReceiverGroup rg(lqhBlockNo, m_sr_nodes);
signal->theData[0] = cownNodeid;
- sendSignal(rg, GSN_EXEC_SRREQ, signal, 1, JBB);
+ if (!isNdbMtLqh()) {
+ NodeReceiverGroup rg(DBLQH, m_sr_nodes);
+ sendSignal(rg, GSN_EXEC_SRREQ, signal, 1, JBB);
+ } else {
+ const Uint32 sz = NdbNodeBitmask::Size;
+ m_sr_nodes.copyto(sz, &signal->theData[1]);
+ sendSignal(DBLQH_REF, GSN_EXEC_SRREQ, signal, 1 + sz, JBB);
+ }
return;
} else {
jam();
@@ -15319,7 +15324,9 @@ void Dblqh::execSTART_EXEC_SR(Signal* si
Uint32 index = csrPhasesCompleted;
arrGuard(index, MAX_LOG_EXEC);
- BlockReference ref = calcInstanceBlockRef(DBLQH, fragptr.p->srLqhLognode[index]);
+ Uint32 Tnode = fragptr.p->srLqhLognode[index];
+ Uint32 instanceKey = fragptr.p->lqhInstanceKey;
+ BlockReference ref = numberToRef(DBLQH, instanceKey, Tnode);
fragptr.p->srStatus = Fragrecord::SS_STARTED;
/* --------------------------------------------------------------------
@@ -15336,7 +15343,6 @@ void Dblqh::execSTART_EXEC_SR(Signal* si
execFragReq->lastGci = fragptr.p->srLastGci[index];
sendSignal(ref, GSN_EXEC_FRAGREQ, signal,
ExecFragReq::SignalLength, JBB);
-
}
signal->theData[0] = next;
sendSignal(cownref, GSN_START_EXEC_SR, signal, 1, JBB);
@@ -15426,6 +15432,13 @@ void Dblqh::execEXEC_FRAGREF(Signal* sig
/* *************** */
void Dblqh::execEXEC_SRCONF(Signal* signal)
{
+ // wl4391_todo workaround until timing fixed
+ if (cnoOutstandingExecFragReq != 0) {
+ ndbout << "delay:" << V(cnoOutstandingExecFragReq) << "\n";
+ sendSignalWithDelay(reference(), GSN_EXEC_SRCONF,
+ signal, 10, signal->getLength());
+ return;
+ }
jamEntry();
Uint32 nodeId = signal->theData[0];
arrGuard(nodeId, MAX_NDB_NODES);
@@ -16791,9 +16804,14 @@ void Dblqh::srPhase3Comp(Signal* signal)
jamEntry();
signal->theData[0] = cownNodeid;
- BlockNumber lqhBlockNo = numberToBlock(DBLQH, instance());
- NodeReceiverGroup rg(lqhBlockNo, m_sr_nodes);
- sendSignal(rg, GSN_EXEC_SRCONF, signal, 1, JBB);
+ if (!isNdbMtLqh()) {
+ NodeReceiverGroup rg(DBLQH, m_sr_nodes);
+ sendSignal(rg, GSN_EXEC_SRCONF, signal, 1, JBB);
+ } else {
+ const Uint32 sz = NdbNodeBitmask::Size;
+ m_sr_nodes.copyto(sz, &signal->theData[1]);
+ sendSignal(DBLQH_REF, GSN_EXEC_SRCONF, signal, 1 + sz, JBB);
+ }
return;
}//Dblqh::srPhase3Comp()
=== modified file 'storage/ndb/src/kernel/blocks/dblqh/DblqhProxy.cpp'
--- a/storage/ndb/src/kernel/blocks/dblqh/DblqhProxy.cpp 2008-11-16 15:28:22 +0000
+++ b/storage/ndb/src/kernel/blocks/dblqh/DblqhProxy.cpp 2008-12-01 18:06:58 +0000
@@ -84,6 +84,10 @@ DblqhProxy::DblqhProxy(Block_context& ct
// GSN_SUB_GCP_COMPLETE_REP
addRecSignal(GSN_SUB_GCP_COMPLETE_REP, &DblqhProxy::execSUB_GCP_COMPLETE_REP);
+
+ // GSN_EXEC_SRREQ
+ addRecSignal(GSN_EXEC_SRREQ, &DblqhProxy::execEXEC_SRREQ);
+ addRecSignal(GSN_EXEC_SRCONF, &DblqhProxy::execEXEC_SRCONF);
}
DblqhProxy::~DblqhProxy()
@@ -1329,4 +1333,112 @@ DblqhProxy::sendEMPTY_LCP_CONF(Signal* s
ssRelease<Ss_EMPTY_LCP_REQ>(ssId);
}
+// GSN_EXEC_SR_1 [fictional gsn ]
+
+void
+DblqhProxy::execEXEC_SRREQ(Signal* signal)
+{
+ const BlockReference senderRef = signal->getSendersBlockRef();
+
+ if (refToInstance(senderRef) != 0) {
+ jam();
+ execEXEC_SR_2(signal, GSN_EXEC_SRREQ);
+ return;
+ }
+
+ execEXEC_SR_1(signal, GSN_EXEC_SRREQ);
+}
+
+void
+DblqhProxy::execEXEC_SRCONF(Signal* signal)
+{
+ const BlockReference senderRef = signal->getSendersBlockRef();
+
+ if (refToInstance(senderRef) != 0) {
+ jam();
+ execEXEC_SR_2(signal, GSN_EXEC_SRCONF);
+ return;
+ }
+
+ execEXEC_SR_1(signal, GSN_EXEC_SRCONF);
+}
+
+void
+DblqhProxy::execEXEC_SR_1(Signal* signal, GlobalSignalNumber gsn)
+{
+ ndbrequire(signal->getLength() == Ss_EXEC_SR_1::Sig::SignalLength);
+
+ const Ss_EXEC_SR_1::Sig* sig =
+ (const Ss_EXEC_SR_1::Sig*)signal->getDataPtr();
+ Uint32 ssId = getSsId(sig);
+ Ss_EXEC_SR_1& ss = ssSeize<Ss_EXEC_SR_1>(ssId);
+ ss.m_gsn = gsn;
+ ss.m_sig = *sig;
+
+ sendREQ(signal, ss);
+ ssRelease<Ss_EXEC_SR_1>(ss);
+}
+
+void
+DblqhProxy::sendEXEC_SR_1(Signal* signal, Uint32 ssId)
+{
+ Ss_EXEC_SR_1& ss = ssFind<Ss_EXEC_SR_1>(ssId);
+ signal->theData[0] = ss.m_sig.nodeId;
+ sendSignal(workerRef(ss.m_worker), ss.m_gsn, signal, 1, JBB);
+}
+
+// GSN_EXEC_SRREQ_2 [ fictional gsn ]
+
+void
+DblqhProxy::execEXEC_SR_2(Signal* signal, GlobalSignalNumber gsn)
+{
+ ndbrequire(signal->getLength() == Ss_EXEC_SR_2::Sig::SignalLength);
+
+ const Ss_EXEC_SR_2::Sig* sig =
+ (const Ss_EXEC_SR_2::Sig*)signal->getDataPtr();
+ Uint32 ssId = getSsId(sig);
+
+ bool found = false;
+ Ss_EXEC_SR_2& ss = ssFindSeize<Ss_EXEC_SR_2>(ssId, &found);
+ if (!found) {
+ jam();
+ setMask(ss);
+ }
+
+ ndbrequire(sig->nodeId == getOwnNodeId());
+ if (ss.m_sigcount == 0) {
+ jam();
+ ss.m_gsn = gsn;
+ ss.m_sig = *sig;
+ } else {
+ jam();
+ ndbrequire(ss.m_gsn == gsn);
+ ndbrequire(memcmp(&ss.m_sig, sig, sizeof(*sig)) == 0);
+ }
+ ss.m_sigcount++;
+
+ // reversed roles
+ recvCONF(signal, ss);
+}
+
+void
+DblqhProxy::sendEXEC_SR_2(Signal* signal, Uint32 ssId)
+{
+ Ss_EXEC_SR_2& ss = ssFind<Ss_EXEC_SR_2>(ssId);
+
+ if (!lastReply(ss)) {
+ jam();
+ return;
+ }
+
+ NodeBitmask nodes;
+ nodes.assign(NdbNodeBitmask::Size, ss.m_sig.sr_nodes);
+ NodeReceiverGroup rg(DBLQH, nodes);
+
+ signal->theData[0] = ss.m_sig.nodeId;
+ sendSignal(rg, ss.m_gsn, signal, 1, JBB);
+
+ ssRelease<Ss_EXEC_SR_2>(ssId);
+}
+
BLOCK_FUNCTIONS(DblqhProxy)
=== modified file 'storage/ndb/src/kernel/blocks/dblqh/DblqhProxy.hpp'
--- a/storage/ndb/src/kernel/blocks/dblqh/DblqhProxy.hpp 2008-11-16 15:28:22 +0000
+++ b/storage/ndb/src/kernel/blocks/dblqh/DblqhProxy.hpp 2008-12-01 18:06:58 +0000
@@ -380,9 +380,7 @@ protected:
// GSN_START_RECREQ_2 [ sub-op, fictional gsn ]
struct Ss_START_RECREQ_2 : SsParallel {
-#ifdef VM_TRACE
static const char* name() { return "START_RECREQ_2"; }
-#endif
struct Req {
enum { SignalLength = 2 };
Uint32 lcpId;
@@ -458,6 +456,71 @@ protected:
void execEMPTY_LCP_CONF(Signal*);
void execEMPTY_LCP_REF(Signal*);
void sendEMPTY_LCP_CONF(Signal*, Uint32 ssId);
+
+ // GSN_EXEC_SR_1 [ fictional gsn ]
+ struct Ss_EXEC_SR_1 : SsParallel {
+ /*
+ * Handle EXEC_SRREQ and EXEC_SRCONF. These are broadcast
+ * signals (not REQ/CONF). EXEC_SR_1 receives one signal and
+ * sends it to its workers. EXEC_SR_2 waits for signal from
+ * all workers and broadcasts it to all nodes. These are
+ * required to handle mixed versions (non-mt, mt-lqh-1,2,4).
+ */
+ static const char* name() { return "EXEC_SR_1"; }
+ struct Sig {
+ enum { SignalLength = 1 };
+ Uint32 nodeId;
+ };
+ GlobalSignalNumber m_gsn;
+ Sig m_sig;
+ Ss_EXEC_SR_1() {
+ m_sendREQ = (SsFUNC)&DblqhProxy::sendEXEC_SR_1;
+ m_sendCONF = (SsFUNC)0;
+ m_gsn = 0;
+ };
+ enum { poolSize = 1 };
+ static SsPool<Ss_EXEC_SR_1>& pool(LocalProxy* proxy) {
+ return ((DblqhProxy*)proxy)->c_ss_EXEC_SR_1;
+ }
+ };
+ SsPool<Ss_EXEC_SR_1> c_ss_EXEC_SR_1;
+ Uint32 getSsId(const Ss_EXEC_SR_1::Sig* sig) {
+ return SsIdBase | refToNode(sig->nodeId);
+ };
+ void execEXEC_SRREQ(Signal*);
+ void execEXEC_SRCONF(Signal*);
+ void execEXEC_SR_1(Signal*, GlobalSignalNumber gsn);
+ void sendEXEC_SR_1(Signal*, Uint32 ssId);
+
+ // GSN_EXEC_SR_2 [ fictional gsn ]
+ struct Ss_EXEC_SR_2 : SsParallel {
+ static const char* name() { return "EXEC_SR_2"; }
+ struct Sig {
+ enum { SignalLength = 1 + NdbNodeBitmask::Size };
+ Uint32 nodeId;
+ Uint32 sr_nodes[NdbNodeBitmask::Size]; // local signal so ok to add
+ };
+ GlobalSignalNumber m_gsn;
+ Uint32 m_sigcount;
+ Sig m_sig; // all signals must be identical
+ Ss_EXEC_SR_2() {
+ // reversed roles
+ m_sendREQ = (SsFUNC)0;
+ m_sendCONF = (SsFUNC)&DblqhProxy::sendEXEC_SR_2;
+ m_gsn = 0;
+ m_sigcount = 0;
+ };
+ enum { poolSize = 1 };
+ static SsPool<Ss_EXEC_SR_2>& pool(LocalProxy* proxy) {
+ return ((DblqhProxy*)proxy)->c_ss_EXEC_SR_2;
+ }
+ };
+ SsPool<Ss_EXEC_SR_2> c_ss_EXEC_SR_2;
+ Uint32 getSsId(const Ss_EXEC_SR_2::Sig* sig) {
+ return SsIdBase | refToNode(sig->nodeId);
+ };
+ void execEXEC_SR_2(Signal*, GlobalSignalNumber gsn);
+ void sendEXEC_SR_2(Signal*, Uint32 ssId);
};
#endif
| Thread |
|---|
| • bzr commit into mysql-5.1 branch (pekka:3145) WL#4391 | Pekka Nousiainen | 1 Dec |