3146 Pekka Nousiainen 2008-12-01
wl#4391 00_fix.diff
fix - compile error on no-debug
modified:
storage/ndb/src/kernel/blocks/dblqh/DblqhMain.cpp
3145 Pekka Nousiainen 2008-12-01
wl#4391 33e_mixed.diff
mixed - SR
modified:
storage/ndb/src/kernel/blocks/dblqh/DblqhMain.cpp
storage/ndb/src/kernel/blocks/dblqh/DblqhProxy.cpp
storage/ndb/src/kernel/blocks/dblqh/DblqhProxy.hpp
3144 Pekka Nousiainen 2008-12-01
wl#4391 33d_mixed.diff
mixed - DBLQH: send unpacked if remote is mt-lqh
modified:
storage/ndb/src/kernel/blocks/dblqh/DblqhMain.cpp
3143 Pekka Nousiainen 2008-12-01
wl#4391 33c_mixed.diff
mixed - DBTC: send unpacked if remote is mt-lqh
modified:
storage/ndb/src/kernel/blocks/dbtc/DbtcMain.cpp
3142 Pekka Nousiainen 2008-12-01
wl#4391 33b_mixed.diff
mixed - use instance key, not own instance
modified:
storage/ndb/src/kernel/blocks/dblqh/Dblqh.hpp
storage/ndb/src/kernel/blocks/dblqh/DblqhCommon.cpp
storage/ndb/src/kernel/blocks/dblqh/DblqhCommon.hpp
storage/ndb/src/kernel/blocks/dblqh/DblqhMain.cpp
storage/ndb/src/kernel/vm/SimulatedBlock.hpp
3141 Pekka Nousiainen 2008-12-01
wl#4391 32a_mixed.diff
mixed - store number of lqh workers in NodeInfo
modified:
storage/ndb/include/kernel/NodeInfo.hpp
storage/ndb/include/kernel/signaldata/CmRegSignalData.hpp
storage/ndb/include/ndb_version.h.in
storage/ndb/src/kernel/blocks/qmgr/QmgrMain.cpp
storage/ndb/src/kernel/main.cpp
3140 Pekka Nousiainen 2008-12-01
wl#4391 32b_mutex.diff
recursive mutex - implement own
modified:
storage/ndb/src/kernel/blocks/lgman.cpp
storage/ndb/src/kernel/blocks/tsman.cpp
storage/ndb/src/kernel/vm/SafeMutex.cpp
storage/ndb/src/kernel/vm/SafeMutex.hpp
3139 Jonas Oreland 2008-11-28
ndb - fix testDict -n FailAddPartition - fix testprg and bugs hidden by bug in
testprg
modified:
storage/ndb/src/kernel/blocks/dbdih/Dbdih.hpp
storage/ndb/src/kernel/blocks/dbdih/DbdihMain.cpp
storage/ndb/test/ndbapi/testDict.cpp
=== modified file 'storage/ndb/include/kernel/NodeInfo.hpp'
--- a/storage/ndb/include/kernel/NodeInfo.hpp 2008-11-04 08:43:06 +0000
+++ b/storage/ndb/include/kernel/NodeInfo.hpp 2008-12-01 18:04:19 +0000
@@ -37,6 +37,7 @@ public:
Uint32 m_version; ///< Ndb version
Uint32 m_mysql_version; ///< MySQL version
+ Uint32 m_lqh_workers; ///< LQH workers
Uint32 m_type; ///< Node type
Uint32 m_connectCount; ///< No of times connected
bool m_connected; ///< Node is connected
@@ -50,6 +51,7 @@ inline
NodeInfo::NodeInfo(){
m_version = 0;
m_mysql_version = 0;
+ m_lqh_workers = 0;
m_type = INVALID;
m_connectCount = 0;
m_heartbeat_cnt= 0;
=== modified file 'storage/ndb/include/kernel/signaldata/CmRegSignalData.hpp'
--- a/storage/ndb/include/kernel/signaldata/CmRegSignalData.hpp 2007-01-06 00:21:39 +0000
+++ b/storage/ndb/include/kernel/signaldata/CmRegSignalData.hpp 2008-12-01 18:04:19 +0000
@@ -161,7 +161,7 @@ class CmNodeInfoReq {
friend class Qmgr;
public:
- STATIC_CONST( SignalLength = 4 );
+ STATIC_CONST( SignalLength = 5 );
private:
/**
@@ -171,6 +171,7 @@ private:
Uint32 dynamicId;
Uint32 version;
Uint32 mysql_version;
+ Uint32 lqh_workers; // added in telco-6.4
};
class CmNodeInfoRef {
@@ -198,20 +199,14 @@ class CmNodeInfoConf {
friend class Qmgr;
public:
- STATIC_CONST( SignalLength = 4 );
+ STATIC_CONST( SignalLength = 5 );
private:
Uint32 nodeId;
Uint32 dynamicId;
Uint32 version;
Uint32 mysql_version;
+ Uint32 lqh_workers; // added in telco-6.4
};
#endif
-
-
-
-
-
-
-
=== modified file 'storage/ndb/include/ndb_version.h.in'
--- a/storage/ndb/include/ndb_version.h.in 2008-11-14 09:12:01 +0000
+++ b/storage/ndb/include/ndb_version.h.in 2008-12-01 18:04:19 +0000
@@ -102,6 +102,7 @@ Uint32 ndbGetOwnVersion();
#define NDBD_LONG_LQHKEYREQ MAKE_VERSION(6,4,0)
#define NDBD_MAX_RECVBYTESIZE_32K MAKE_VERSION(6,3,18)
#define NDBD_LONG_SCANFRAGREQ MAKE_VERSION(6,4,0)
+#define NDBD_MT_LQH_VERSION MAKE_VERSION(6,4,0)
static
=== modified file 'storage/ndb/src/kernel/blocks/dblqh/Dblqh.hpp'
--- a/storage/ndb/src/kernel/blocks/dblqh/Dblqh.hpp 2008-11-19 11:01:17 +0000
+++ b/storage/ndb/src/kernel/blocks/dblqh/Dblqh.hpp 2008-12-01 18:05:11 +0000
@@ -859,6 +859,11 @@ public:
* Log part
*/
Uint32 m_log_part_ptr_i;
+
+ /**
+ * Instance key for fast access.
+ */
+ Uint16 lqhInstanceKey;
};
typedef Ptr<Fragrecord> FragrecordPtr;
=== modified file 'storage/ndb/src/kernel/blocks/dblqh/DblqhCommon.cpp'
--- a/storage/ndb/src/kernel/blocks/dblqh/DblqhCommon.cpp 2008-11-16 12:59:12 +0000
+++ b/storage/ndb/src/kernel/blocks/dblqh/DblqhCommon.cpp 2008-12-01 18:05:11 +0000
@@ -72,3 +72,10 @@ NdbLogPartInfo::partNoIndex(Uint32 lpno)
assert(partNo[i] == lpno);
return i;
}
+
+Uint32
+NdbLogPartInfo::instanceKey(Uint32 lpno) const
+{
+ assert(lpno < LogParts);
+ return 1 + lpno;
+}
=== modified file 'storage/ndb/src/kernel/blocks/dblqh/DblqhCommon.hpp'
--- a/storage/ndb/src/kernel/blocks/dblqh/DblqhCommon.hpp 2008-11-16 12:59:12 +0000
+++ b/storage/ndb/src/kernel/blocks/dblqh/DblqhCommon.hpp 2008-12-01 18:05:11 +0000
@@ -47,6 +47,7 @@ struct NdbLogPartInfo {
bool partNoOwner(Uint32 lpno) const;
bool partNoOwner(Uint32 tabId, Uint32 fragId);
Uint32 partNoIndex(Uint32 lpno) const;
+ Uint32 instanceKey(Uint32 lpno) const;
};
#endif
=== modified file 'storage/ndb/src/kernel/blocks/dblqh/DblqhMain.cpp'
--- a/storage/ndb/src/kernel/blocks/dblqh/DblqhMain.cpp 2008-11-19 11:01:17 +0000
+++ b/storage/ndb/src/kernel/blocks/dblqh/DblqhMain.cpp 2008-12-01 18:50:58 +0000
@@ -720,7 +720,10 @@ void Dblqh::startphase1Lab(Signal* signa
for (Ti = 0; Ti < chostFileSize; Ti++) {
ThostPtr.i = Ti;
ptrCheckGuard(ThostPtr, chostFileSize, hostRecord);
- // wl4391_todo using own instance() does not work with mixed versions
+ /*
+ * Valid only if receiver has same number of LQH workers.
+ * In general full instance key of fragment must be used.
+ */
ThostPtr.p->hostLqhBlockRef = calcInstanceBlockRef(DBLQH, ThostPtr.i);
ThostPtr.p->hostTcBlockRef = calcTcBlockRef(ThostPtr.i);
ThostPtr.p->inPackedList = false;
@@ -1594,6 +1597,7 @@ void Dblqh::execLQHFRAGREQ(Signal* signa
ndbrequire(ptr.p->logPartNo == logPartNo);
fragptr.p->m_log_part_ptr_i = ptr.i;
+ fragptr.p->lqhInstanceKey = lpinfo.instanceKey(logPartNo);
}
if (DictTabInfo::isOrderedIndex(tabptr.p->tableType)) {
@@ -3012,99 +3016,163 @@ void Dblqh::sendCommitLqh(Signal* signal
HostRecordPtr Thostptr;
Thostptr.i = refToNode(alqhBlockref);
ptrCheckGuard(Thostptr, chostFileSize, hostRecord);
+
+ Uint32 Tdata[5];
+ Tdata[0] = tcConnectptr.p->clientConnectrec;
+ Tdata[1] = tcConnectptr.p->gci_hi;
+ Tdata[2] = tcConnectptr.p->transid[0];
+ Tdata[3] = tcConnectptr.p->transid[1];
+ Tdata[4] = tcConnectptr.p->gci_lo;
+ Uint32 len = 5;
+
+ if (unlikely(!ndb_check_micro_gcp(getNodeInfo(Thostptr.i).m_version)))
+ {
+ jam();
+ ndbassert(Tdata[4] == 0 || getNodeInfo(Thostptr.i).m_connected == false);
+ len = 4;
+ }
+
+ // currently packed signal cannot address specific instance
+ const bool send_unpacked = getNodeInfo(Thostptr.i).m_lqh_workers != 0;
+ if (send_unpacked) {
+ jam();
+ FragrecordPtr Tfragptr;
+ Tfragptr.i = tcConnectptr.p->fragmentptr;
+ c_fragment_pool.getPtr(Tfragptr);
+ memcpy(&signal->theData[0], &Tdata[0], len << 2);
+ Uint32 Tnode = Thostptr.i;
+ Uint32 instanceKey = Tfragptr.p->lqhInstanceKey;
+ BlockReference lqhRef = numberToRef(DBLQH, instanceKey, Tnode);
+ sendSignal(lqhRef, GSN_COMMIT, signal, len, JBB);
+ return;
+ }
+
if (Thostptr.p->noOfPackedWordsLqh > 25 - 5) {
jam();
sendPackedSignalLqh(signal, Thostptr.p);
} else {
jam();
updatePackedList(signal, Thostptr.p, Thostptr.i);
- }//if
- Uint32 pos = Thostptr.p->noOfPackedWordsLqh;
- Uint32 ptrAndType = tcConnectptr.p->clientConnectrec | (ZCOMMIT << 28);
- Uint32 gci_hi = tcConnectptr.p->gci_hi;
- Uint32 gci_lo = tcConnectptr.p->gci_lo;
- Uint32 transid1 = tcConnectptr.p->transid[0];
- Uint32 transid2 = tcConnectptr.p->transid[1];
- Thostptr.p->packedWordsLqh[pos] = ptrAndType;
- Thostptr.p->packedWordsLqh[pos + 1] = gci_hi;
- Thostptr.p->packedWordsLqh[pos + 2] = transid1;
- Thostptr.p->packedWordsLqh[pos + 3] = transid2;
- Thostptr.p->packedWordsLqh[pos + 4] = gci_lo;
- Thostptr.p->noOfPackedWordsLqh = pos + 5;
-
- if (unlikely(!ndb_check_micro_gcp(getNodeInfo(Thostptr.i).m_version)))
- {
- jam();
- ndbassert(gci_lo == 0 || getNodeInfo(Thostptr.i).m_connected == false);
- Thostptr.p->noOfPackedWordsLqh = pos + 4;
}
-}//Dblqh::sendCommitLqh()
+
+ Tdata[0] |= (ZCOMMIT << 28);
+ Uint32 pos = Thostptr.p->noOfPackedWordsLqh;
+ memcpy(&Thostptr.p->packedWordsLqh[pos], &Tdata[0], len << 2);
+ Thostptr.p->noOfPackedWordsLqh = pos + len;
+}
void Dblqh::sendCompleteLqh(Signal* signal, BlockReference alqhBlockref)
{
HostRecordPtr Thostptr;
Thostptr.i = refToNode(alqhBlockref);
ptrCheckGuard(Thostptr, chostFileSize, hostRecord);
+
+ Uint32 Tdata[3];
+ Tdata[0] = tcConnectptr.p->clientConnectrec;
+ Tdata[1] = tcConnectptr.p->transid[0];
+ Tdata[2] = tcConnectptr.p->transid[1];
+ Uint32 len = 3;
+
+ // currently packed signal cannot address specific instance
+ const bool send_unpacked = getNodeInfo(Thostptr.i).m_lqh_workers != 0;
+ if (send_unpacked) {
+ jam();
+ FragrecordPtr Tfragptr;
+ Tfragptr.i = tcConnectptr.p->fragmentptr;
+ c_fragment_pool.getPtr(Tfragptr);
+ memcpy(&signal->theData[0], &Tdata[0], len << 2);
+ Uint32 Tnode = Thostptr.i;
+ Uint32 instanceKey = Tfragptr.p->lqhInstanceKey;
+ BlockReference lqhRef = numberToRef(DBLQH, instanceKey, Tnode);
+ sendSignal(lqhRef, GSN_COMPLETE, signal, len, JBB);
+ return;
+ }
+
if (Thostptr.p->noOfPackedWordsLqh > 22) {
jam();
sendPackedSignalLqh(signal, Thostptr.p);
} else {
jam();
updatePackedList(signal, Thostptr.p, Thostptr.i);
- }//if
+ }
+
+ Tdata[0] |= (ZCOMPLETE << 28);
Uint32 pos = Thostptr.p->noOfPackedWordsLqh;
- Uint32 ptrAndType = tcConnectptr.p->clientConnectrec | (ZCOMPLETE << 28);
- Uint32 transid1 = tcConnectptr.p->transid[0];
- Uint32 transid2 = tcConnectptr.p->transid[1];
- Thostptr.p->packedWordsLqh[pos] = ptrAndType;
- Thostptr.p->packedWordsLqh[pos + 1] = transid1;
- Thostptr.p->packedWordsLqh[pos + 2] = transid2;
- Thostptr.p->noOfPackedWordsLqh = pos + 3;
-}//Dblqh::sendCompleteLqh()
+ memcpy(&Thostptr.p->packedWordsLqh[pos], &Tdata[0], len << 2);
+ Thostptr.p->noOfPackedWordsLqh = pos + len;
+}
void Dblqh::sendCommittedTc(Signal* signal, BlockReference atcBlockref)
{
HostRecordPtr Thostptr;
Thostptr.i = refToNode(atcBlockref);
ptrCheckGuard(Thostptr, chostFileSize, hostRecord);
+
+ Uint32 Tdata[3];
+ Tdata[0] = tcConnectptr.p->clientConnectrec;
+ Tdata[1] = tcConnectptr.p->transid[0];
+ Tdata[2] = tcConnectptr.p->transid[1];
+ Uint32 len = 3;
+
+ // currently TC is single-threaded
+ const bool send_unpacked = false;
+ if (send_unpacked) {
+ jam();
+ memcpy(&signal->theData[0], &Tdata[0], len << 2);
+ BlockReference tcRef = Thostptr.p->hostTcBlockRef;
+ sendSignal(tcRef, GSN_COMMITTED, signal, len, JBB);
+ return;
+ }
+
if (Thostptr.p->noOfPackedWordsTc > 22) {
jam();
sendPackedSignalTc(signal, Thostptr.p);
} else {
jam();
updatePackedList(signal, Thostptr.p, Thostptr.i);
- }//if
+ }
+
+ Tdata[0] |= (ZCOMMITTED << 28);
Uint32 pos = Thostptr.p->noOfPackedWordsTc;
- Uint32 ptrAndType = tcConnectptr.p->clientConnectrec | (ZCOMMITTED << 28);
- Uint32 transid1 = tcConnectptr.p->transid[0];
- Uint32 transid2 = tcConnectptr.p->transid[1];
- Thostptr.p->packedWordsTc[pos] = ptrAndType;
- Thostptr.p->packedWordsTc[pos + 1] = transid1;
- Thostptr.p->packedWordsTc[pos + 2] = transid2;
- Thostptr.p->noOfPackedWordsTc = pos + 3;
-}//Dblqh::sendCommittedTc()
+ memcpy(&Thostptr.p->packedWordsTc[pos], &Tdata[0], len << 2);
+ Thostptr.p->noOfPackedWordsTc = pos + len;
+}
void Dblqh::sendCompletedTc(Signal* signal, BlockReference atcBlockref)
{
HostRecordPtr Thostptr;
Thostptr.i = refToNode(atcBlockref);
ptrCheckGuard(Thostptr, chostFileSize, hostRecord);
+
+ Uint32 Tdata[3];
+ Tdata[0] = tcConnectptr.p->clientConnectrec;
+ Tdata[1] = tcConnectptr.p->transid[0];
+ Tdata[2] = tcConnectptr.p->transid[1];
+ Uint32 len = 3;
+
+ // currently TC is single-threaded
+ const bool send_unpacked = false;
+ if (send_unpacked) {
+ jam();
+ memcpy(&signal->theData[0], &Tdata[0], len << 2);
+ BlockReference tcRef = Thostptr.p->hostTcBlockRef;
+ sendSignal(tcRef, GSN_COMMITTED, signal, len, JBB);
+ return;
+ }
+
if (Thostptr.p->noOfPackedWordsTc > 22) {
jam();
sendPackedSignalTc(signal, Thostptr.p);
} else {
jam();
updatePackedList(signal, Thostptr.p, Thostptr.i);
- }//if
+ }
+
+ Tdata[0] |= (ZCOMPLETED << 28);
Uint32 pos = Thostptr.p->noOfPackedWordsTc;
- Uint32 ptrAndType = tcConnectptr.p->clientConnectrec | (ZCOMPLETED << 28);
- Uint32 transid1 = tcConnectptr.p->transid[0];
- Uint32 transid2 = tcConnectptr.p->transid[1];
- Thostptr.p->packedWordsTc[pos] = ptrAndType;
- Thostptr.p->packedWordsTc[pos + 1] = transid1;
- Thostptr.p->packedWordsTc[pos + 2] = transid2;
- Thostptr.p->noOfPackedWordsTc = pos + 3;
-}//Dblqh::sendCompletedTc()
+ memcpy(&Thostptr.p->packedWordsTc[pos], &Tdata[0], len << 2);
+ Thostptr.p->noOfPackedWordsTc = pos + len;
+}
void Dblqh::sendLqhkeyconfTc(Signal* signal, BlockReference atcBlockref)
{
@@ -3130,7 +3198,9 @@ void Dblqh::sendLqhkeyconfTc(Signal* sig
lqhKeyConf = (LqhKeyConf *)
&Thostptr.p->packedWordsTc[Thostptr.p->noOfPackedWordsTc];
Thostptr.p->noOfPackedWordsTc += LqhKeyConf::SignalLength;
- } else if(refToMain(atcBlockref) == DBLQH){
+ } else if(refToMain(atcBlockref) == DBLQH &&
+ refToInstance(atcBlockref) == instance()) {
+ //wl4391_todo check
jam();
/*******************************************************************
// This signal was intended for DBLQH as part of log execution or
@@ -5781,8 +5851,10 @@ void Dblqh::packLqhkeyreqLab(Signal* sig
lqhKeyReq->variableData[nextPos + 0] = sig0;
nextPos += LqhKeyReq::getGCIFlag(Treqinfo);
- // wl4391_todo for mixed versions must recompute full instance key here
- BlockReference lqhRef = calcInstanceBlockRef(DBLQH, regTcPtr->nextReplica);
+ // pass full instance key for remote to map to real instance
+ BlockReference lqhRef = numberToRef(DBLQH,
+ fragptr.p->lqhInstanceKey,
+ regTcPtr->nextReplica);
if (likely(sendLongReq))
{
@@ -6112,7 +6184,17 @@ void Dblqh::writeAttrinfoLab(Signal* sig
/* ------------------------------------------------------------------------- */
void Dblqh::sendTupkey(Signal* signal)
{
- BlockReference lqhRef = calcInstanceBlockRef(DBLQH, tcConnectptr.p->nextReplica);
+ BlockReference lqhRef = 0;
+ {
+ // wl4391_todo fragptr
+ FragrecordPtr Tfragptr;
+ Tfragptr.i = tcConnectptr.p->fragmentptr;
+ c_fragment_pool.getPtr(Tfragptr);
+ Uint32 Tnode = tcConnectptr.p->nextReplica;
+ Uint32 instanceKey = Tfragptr.p->lqhInstanceKey;
+ lqhRef = numberToRef(DBLQH, instanceKey, Tnode);
+ }
+
signal->theData[0] = tcConnectptr.p->tcOprec;
signal->theData[1] = tcConnectptr.p->transid[0];
signal->theData[2] = tcConnectptr.p->transid[1];
@@ -7286,7 +7368,12 @@ void Dblqh::execABORT(Signal* signal)
/* ------------------------------------------------------------------------- */
// We will immediately send the ABORT message also to the next LQH node in line.
/* ------------------------------------------------------------------------- */
- BlockReference TLqhRef = calcInstanceBlockRef(DBLQH, regTcPtr->nextReplica);
+ FragrecordPtr Tfragptr;
+ Tfragptr.i = regTcPtr->fragmentptr;
+ c_fragment_pool.getPtr(Tfragptr);
+ Uint32 Tnode = regTcPtr->nextReplica;
+ Uint32 instanceKey = Tfragptr.p->lqhInstanceKey;
+ BlockReference TLqhRef = numberToRef(DBLQH, instanceKey, Tnode);
signal->theData[0] = regTcPtr->tcOprec;
signal->theData[1] = regTcPtr->tcBlockref;
signal->theData[2] = regTcPtr->transid[0];
@@ -15215,10 +15302,15 @@ void Dblqh::execSTART_EXEC_SR(Signal* si
* WE NEED TO SEND THOSE SIGNALS EVEN IF WE HAVE NOT REQUESTED
* ANY FRAGMENTS PARTICIPATE IN THIS PHASE.
* --------------------------------------------------------------------- */
- BlockNumber lqhBlockNo = numberToBlock(DBLQH, instance());
- NodeReceiverGroup rg(lqhBlockNo, m_sr_nodes);
signal->theData[0] = cownNodeid;
- sendSignal(rg, GSN_EXEC_SRREQ, signal, 1, JBB);
+ if (!isNdbMtLqh()) {
+ NodeReceiverGroup rg(DBLQH, m_sr_nodes);
+ sendSignal(rg, GSN_EXEC_SRREQ, signal, 1, JBB);
+ } else {
+ const Uint32 sz = NdbNodeBitmask::Size;
+ m_sr_nodes.copyto(sz, &signal->theData[1]);
+ sendSignal(DBLQH_REF, GSN_EXEC_SRREQ, signal, 1 + sz, JBB);
+ }
return;
} else {
jam();
@@ -15232,7 +15324,9 @@ void Dblqh::execSTART_EXEC_SR(Signal* si
Uint32 index = csrPhasesCompleted;
arrGuard(index, MAX_LOG_EXEC);
- BlockReference ref = calcInstanceBlockRef(DBLQH,
fragptr.p->srLqhLognode[index]);
+ Uint32 Tnode = fragptr.p->srLqhLognode[index];
+ Uint32 instanceKey = fragptr.p->lqhInstanceKey;
+ BlockReference ref = numberToRef(DBLQH, instanceKey, Tnode);
fragptr.p->srStatus = Fragrecord::SS_STARTED;
/* --------------------------------------------------------------------
@@ -15249,7 +15343,6 @@ void Dblqh::execSTART_EXEC_SR(Signal* si
execFragReq->lastGci = fragptr.p->srLastGci[index];
sendSignal(ref, GSN_EXEC_FRAGREQ, signal,
ExecFragReq::SignalLength, JBB);
-
}
signal->theData[0] = next;
sendSignal(cownref, GSN_START_EXEC_SR, signal, 1, JBB);
@@ -15339,6 +15432,13 @@ void Dblqh::execEXEC_FRAGREF(Signal* sig
/* *************** */
void Dblqh::execEXEC_SRCONF(Signal* signal)
{
+ // wl4391_todo workaround until timing fixed
+ if (cnoOutstandingExecFragReq != 0) {
+ ndbout << "delay: reqs=" << cnoOutstandingExecFragReq << endl;
+ sendSignalWithDelay(reference(), GSN_EXEC_SRCONF,
+ signal, 10, signal->getLength());
+ return;
+ }
jamEntry();
Uint32 nodeId = signal->theData[0];
arrGuard(nodeId, MAX_NDB_NODES);
@@ -16704,9 +16804,14 @@ void Dblqh::srPhase3Comp(Signal* signal)
jamEntry();
signal->theData[0] = cownNodeid;
- BlockNumber lqhBlockNo = numberToBlock(DBLQH, instance());
- NodeReceiverGroup rg(lqhBlockNo, m_sr_nodes);
- sendSignal(rg, GSN_EXEC_SRCONF, signal, 1, JBB);
+ if (!isNdbMtLqh()) {
+ NodeReceiverGroup rg(DBLQH, m_sr_nodes);
+ sendSignal(rg, GSN_EXEC_SRCONF, signal, 1, JBB);
+ } else {
+ const Uint32 sz = NdbNodeBitmask::Size;
+ m_sr_nodes.copyto(sz, &signal->theData[1]);
+ sendSignal(DBLQH_REF, GSN_EXEC_SRCONF, signal, 1 + sz, JBB);
+ }
return;
}//Dblqh::srPhase3Comp()
@@ -19543,7 +19648,7 @@ Dblqh::validate_filter(Signal* signal)
default:
infoEvent("Invalid filter op: 0x%x pos: %ld",
* start,
- start - (signal->theData + 1));
+ (long int)(start - (signal->theData + 1)));
return false;
}
}
=== modified file 'storage/ndb/src/kernel/blocks/dblqh/DblqhProxy.cpp'
--- a/storage/ndb/src/kernel/blocks/dblqh/DblqhProxy.cpp 2008-11-16 15:28:22 +0000
+++ b/storage/ndb/src/kernel/blocks/dblqh/DblqhProxy.cpp 2008-12-01 18:06:58 +0000
@@ -84,6 +84,10 @@ DblqhProxy::DblqhProxy(Block_context& ct
// GSN_SUB_GCP_COMPLETE_REP
addRecSignal(GSN_SUB_GCP_COMPLETE_REP, &DblqhProxy::execSUB_GCP_COMPLETE_REP);
+
+ // GSN_EXEC_SRREQ
+ addRecSignal(GSN_EXEC_SRREQ, &DblqhProxy::execEXEC_SRREQ);
+ addRecSignal(GSN_EXEC_SRCONF, &DblqhProxy::execEXEC_SRCONF);
}
DblqhProxy::~DblqhProxy()
@@ -1329,4 +1333,112 @@ DblqhProxy::sendEMPTY_LCP_CONF(Signal* s
ssRelease<Ss_EMPTY_LCP_REQ>(ssId);
}
+// GSN_EXEC_SR_1 [fictional gsn ]
+
+void
+DblqhProxy::execEXEC_SRREQ(Signal* signal)
+{
+ const BlockReference senderRef = signal->getSendersBlockRef();
+
+ if (refToInstance(senderRef) != 0) {
+ jam();
+ execEXEC_SR_2(signal, GSN_EXEC_SRREQ);
+ return;
+ }
+
+ execEXEC_SR_1(signal, GSN_EXEC_SRREQ);
+}
+
+void
+DblqhProxy::execEXEC_SRCONF(Signal* signal)
+{
+ const BlockReference senderRef = signal->getSendersBlockRef();
+
+ if (refToInstance(senderRef) != 0) {
+ jam();
+ execEXEC_SR_2(signal, GSN_EXEC_SRCONF);
+ return;
+ }
+
+ execEXEC_SR_1(signal, GSN_EXEC_SRCONF);
+}
+
+void
+DblqhProxy::execEXEC_SR_1(Signal* signal, GlobalSignalNumber gsn)
+{
+ ndbrequire(signal->getLength() == Ss_EXEC_SR_1::Sig::SignalLength);
+
+ const Ss_EXEC_SR_1::Sig* sig =
+ (const Ss_EXEC_SR_1::Sig*)signal->getDataPtr();
+ Uint32 ssId = getSsId(sig);
+ Ss_EXEC_SR_1& ss = ssSeize<Ss_EXEC_SR_1>(ssId);
+ ss.m_gsn = gsn;
+ ss.m_sig = *sig;
+
+ sendREQ(signal, ss);
+ ssRelease<Ss_EXEC_SR_1>(ss);
+}
+
+void
+DblqhProxy::sendEXEC_SR_1(Signal* signal, Uint32 ssId)
+{
+ Ss_EXEC_SR_1& ss = ssFind<Ss_EXEC_SR_1>(ssId);
+ signal->theData[0] = ss.m_sig.nodeId;
+ sendSignal(workerRef(ss.m_worker), ss.m_gsn, signal, 1, JBB);
+}
+
+// GSN_EXEC_SRREQ_2 [ fictional gsn ]
+
+void
+DblqhProxy::execEXEC_SR_2(Signal* signal, GlobalSignalNumber gsn)
+{
+ ndbrequire(signal->getLength() == Ss_EXEC_SR_2::Sig::SignalLength);
+
+ const Ss_EXEC_SR_2::Sig* sig =
+ (const Ss_EXEC_SR_2::Sig*)signal->getDataPtr();
+ Uint32 ssId = getSsId(sig);
+
+ bool found = false;
+ Ss_EXEC_SR_2& ss = ssFindSeize<Ss_EXEC_SR_2>(ssId, &found);
+ if (!found) {
+ jam();
+ setMask(ss);
+ }
+
+ ndbrequire(sig->nodeId == getOwnNodeId());
+ if (ss.m_sigcount == 0) {
+ jam();
+ ss.m_gsn = gsn;
+ ss.m_sig = *sig;
+ } else {
+ jam();
+ ndbrequire(ss.m_gsn == gsn);
+ ndbrequire(memcmp(&ss.m_sig, sig, sizeof(*sig)) == 0);
+ }
+ ss.m_sigcount++;
+
+ // reversed roles
+ recvCONF(signal, ss);
+}
+
+void
+DblqhProxy::sendEXEC_SR_2(Signal* signal, Uint32 ssId)
+{
+ Ss_EXEC_SR_2& ss = ssFind<Ss_EXEC_SR_2>(ssId);
+
+ if (!lastReply(ss)) {
+ jam();
+ return;
+ }
+
+ NodeBitmask nodes;
+ nodes.assign(NdbNodeBitmask::Size, ss.m_sig.sr_nodes);
+ NodeReceiverGroup rg(DBLQH, nodes);
+
+ signal->theData[0] = ss.m_sig.nodeId;
+ sendSignal(rg, ss.m_gsn, signal, 1, JBB);
+
+ ssRelease<Ss_EXEC_SR_2>(ssId);
+}
+
BLOCK_FUNCTIONS(DblqhProxy)
=== modified file 'storage/ndb/src/kernel/blocks/dblqh/DblqhProxy.hpp'
--- a/storage/ndb/src/kernel/blocks/dblqh/DblqhProxy.hpp 2008-11-16 15:28:22 +0000
+++ b/storage/ndb/src/kernel/blocks/dblqh/DblqhProxy.hpp 2008-12-01 18:06:58 +0000
@@ -380,9 +380,7 @@ protected:
// GSN_START_RECREQ_2 [ sub-op, fictional gsn ]
struct Ss_START_RECREQ_2 : SsParallel {
-#ifdef VM_TRACE
static const char* name() { return "START_RECREQ_2"; }
-#endif
struct Req {
enum { SignalLength = 2 };
Uint32 lcpId;
@@ -458,6 +456,71 @@ protected:
void execEMPTY_LCP_CONF(Signal*);
void execEMPTY_LCP_REF(Signal*);
void sendEMPTY_LCP_CONF(Signal*, Uint32 ssId);
+
+ // GSN_EXEC_SR_1 [ fictional gsn ]
+ struct Ss_EXEC_SR_1 : SsParallel {
+ /*
+ * Handle EXEC_SRREQ and EXEC_SRCONF. These are broadcast
+ * signals (not REQ/CONF). EXEC_SR_1 receives one signal and
+ * sends it to its workers. EXEC_SR_2 waits for signal from
+ * all workers and broadcasts it to all nodes. These are
+ * required to handle mixed versions (non-mt, mt-lqh-1,2,4).
+ */
+ static const char* name() { return "EXEC_SR_1"; }
+ struct Sig {
+ enum { SignalLength = 1 };
+ Uint32 nodeId;
+ };
+ GlobalSignalNumber m_gsn;
+ Sig m_sig;
+ Ss_EXEC_SR_1() {
+ m_sendREQ = (SsFUNC)&DblqhProxy::sendEXEC_SR_1;
+ m_sendCONF = (SsFUNC)0;
+ m_gsn = 0;
+ };
+ enum { poolSize = 1 };
+ static SsPool<Ss_EXEC_SR_1>& pool(LocalProxy* proxy) {
+ return ((DblqhProxy*)proxy)->c_ss_EXEC_SR_1;
+ }
+ };
+ SsPool<Ss_EXEC_SR_1> c_ss_EXEC_SR_1;
+ Uint32 getSsId(const Ss_EXEC_SR_1::Sig* sig) {
+ return SsIdBase | refToNode(sig->nodeId);
+ };
+ void execEXEC_SRREQ(Signal*);
+ void execEXEC_SRCONF(Signal*);
+ void execEXEC_SR_1(Signal*, GlobalSignalNumber gsn);
+ void sendEXEC_SR_1(Signal*, Uint32 ssId);
+
+ // GSN_EXEC_SR_2 [ fictional gsn ]
+ struct Ss_EXEC_SR_2 : SsParallel {
+ static const char* name() { return "EXEC_SR_2"; }
+ struct Sig {
+ enum { SignalLength = 1 + NdbNodeBitmask::Size };
+ Uint32 nodeId;
+ Uint32 sr_nodes[NdbNodeBitmask::Size]; // local signal so ok to add
+ };
+ GlobalSignalNumber m_gsn;
+ Uint32 m_sigcount;
+ Sig m_sig; // all signals must be identical
+ Ss_EXEC_SR_2() {
+ // reversed roles
+ m_sendREQ = (SsFUNC)0;
+ m_sendCONF = (SsFUNC)&DblqhProxy::sendEXEC_SR_2;
+ m_gsn = 0;
+ m_sigcount = 0;
+ };
+ enum { poolSize = 1 };
+ static SsPool<Ss_EXEC_SR_2>& pool(LocalProxy* proxy) {
+ return ((DblqhProxy*)proxy)->c_ss_EXEC_SR_2;
+ }
+ };
+ SsPool<Ss_EXEC_SR_2> c_ss_EXEC_SR_2;
+ Uint32 getSsId(const Ss_EXEC_SR_2::Sig* sig) {
+ return SsIdBase | refToNode(sig->nodeId);
+ };
+ void execEXEC_SR_2(Signal*, GlobalSignalNumber gsn);
+ void sendEXEC_SR_2(Signal*, Uint32 ssId);
};
#endif
=== modified file 'storage/ndb/src/kernel/blocks/dbtc/DbtcMain.cpp'
--- a/storage/ndb/src/kernel/blocks/dbtc/DbtcMain.cpp 2008-11-13 15:05:07 +0000
+++ b/storage/ndb/src/kernel/blocks/dbtc/DbtcMain.cpp 2008-12-01 18:05:48 +0000
@@ -4829,25 +4829,30 @@ void Dbtc::sendCommitLqh(Signal* signal,
Thostptr.i = regTcPtr->lastLqhNodeId;
ptrCheckGuard(Thostptr, ThostFilesize, hostRecord);
- UintR Tdata1 = regTcPtr->lastLqhCon;
- UintR Tdata2 = Uint32(regApiPtr->globalcheckpointid >> 32);
- UintR Tdata3 = regApiPtr->transid[0];
- UintR Tdata4 = regApiPtr->transid[1];
- UintR Tdata5 = Uint32(regApiPtr->globalcheckpointid);
+ Uint32 Tdata[5];
+ Tdata[0] = regTcPtr->lastLqhCon;
+ Tdata[1] = Uint32(regApiPtr->globalcheckpointid >> 32);
+ Tdata[2] = regApiPtr->transid[0];
+ Tdata[3] = regApiPtr->transid[1];
+ Tdata[4] = Uint32(regApiPtr->globalcheckpointid);
+ Uint32 len = 5;
- // wl4391_todo testing own config is wrong for mixed versions
- bool send_unpacked = isNdbMtLqh();
+ if (unlikely(!ndb_check_micro_gcp(getNodeInfo(Thostptr.i).m_version)))
+ {
+ jam();
+ ndbassert(Tdata[4] == 0 || getNodeInfo(Thostptr.i).m_connected == false);
+ len = 4;
+ }
+
+ // currently packed signal cannot address specific instance
+ const bool send_unpacked = getNodeInfo(Thostptr.i).m_lqh_workers != 0;
if (send_unpacked) {
Uint32* data = signal->getDataPtrSend();
- data[0] = Tdata1;
- data[1] = Tdata2;
- data[2] = Tdata3;
- data[3] = Tdata4;
- data[4] = Tdata5;
+ memcpy(&signal->theData[0], &Tdata[0], len << 2);
Uint32 Tnode = Thostptr.i;
Uint32 instanceKey = regTcPtr->lqhInstanceKey;
BlockReference lqhRef = numberToRef(DBLQH, instanceKey, Tnode);
- sendSignal(lqhRef, GSN_COMMIT, signal, 5, JBB);
+ sendSignal(lqhRef, GSN_COMMIT, signal, len, JBB);
return;
}
@@ -4857,23 +4862,14 @@ void Dbtc::sendCommitLqh(Signal* signal,
} else {
jam();
updatePackedList(signal, Thostptr.p, Thostptr.i);
- }//if
+ }
+
+ Tdata[0] |= (ZCOMMIT << 28);
UintR Tindex = Thostptr.p->noOfPackedWordsLqh;
UintR* TDataPtr = &Thostptr.p->packedWordsLqh[Tindex];
- TDataPtr[0] = Tdata1 | (ZCOMMIT << 28);
- TDataPtr[1] = Tdata2;
- TDataPtr[2] = Tdata3;
- TDataPtr[3] = Tdata4;
- TDataPtr[4] = Tdata5;
- Thostptr.p->noOfPackedWordsLqh = Tindex + 5;
-
- if (unlikely(!ndb_check_micro_gcp(getNodeInfo(Thostptr.i).m_version)))
- {
- jam();
- ndbassert(Tdata5 == 0 || getNodeInfo(Thostptr.i).m_connected == false);
- Thostptr.p->noOfPackedWordsLqh = Tindex + 4; // no gci_lo
- }
-}//Dbtc::sendCommitLqh()
+ memcpy(TDataPtr, &Tdata[0], len << 2);
+ Thostptr.p->noOfPackedWordsLqh = Tindex + len;
+}
void
Dbtc::DIVER_node_fail_handling(Signal* signal, Uint64 Tgci)
@@ -5222,16 +5218,16 @@ void Dbtc::sendCompleteLqh(Signal* signa
Thostptr.i = regTcPtr->lastLqhNodeId; //last???
ptrCheckGuard(Thostptr, ThostFilesize, hostRecord);
- UintR Tdata1 = regTcPtr->lastLqhCon;
- UintR Tdata2 = regApiPtr->transid[0];
- UintR Tdata3 = regApiPtr->transid[1];
+ Uint32 Tdata[3];
+ Tdata[0] = regTcPtr->lastLqhCon;
+ Tdata[1] = regApiPtr->transid[0];
+ Tdata[2] = regApiPtr->transid[1];
+ Uint32 len = 3;
- bool send_unpacked = isNdbMtLqh();
+ // currently packed signal cannot address specific instance
+ const bool send_unpacked = getNodeInfo(Thostptr.i).m_lqh_workers != 0;
if (send_unpacked) {
- Uint32* data = signal->getDataPtrSend();
- data[0] = Tdata1;
- data[1] = Tdata2;
- data[2] = Tdata3;
+ memcpy(&signal->theData[0], &Tdata[0], len << 2);
Uint32 Tnode = Thostptr.i;
Uint32 instanceKey = regTcPtr->lqhInstanceKey;
BlockReference lqhRef = numberToRef(DBLQH, instanceKey, Tnode);
@@ -5245,14 +5241,14 @@ void Dbtc::sendCompleteLqh(Signal* signa
} else {
jam();
updatePackedList(signal, Thostptr.p, Thostptr.i);
- }//if
+ }
+
+ Tdata[0] |= (ZCOMPLETE << 28);
UintR Tindex = Thostptr.p->noOfPackedWordsLqh;
UintR* TDataPtr = &Thostptr.p->packedWordsLqh[Tindex];
- TDataPtr[0] = Tdata1 | (ZCOMPLETE << 28);
- TDataPtr[1] = Tdata2;
- TDataPtr[2] = Tdata3;
- Thostptr.p->noOfPackedWordsLqh = Tindex + 3;
-}//Dbtc::sendCompleteLqh()
+ memcpy(TDataPtr, &Tdata[0], len << 2);
+ Thostptr.p->noOfPackedWordsLqh = Tindex + len;
+}
void
Dbtc::execTC_COMMIT_ACK(Signal* signal){
@@ -5301,22 +5297,25 @@ Dbtc::sendRemoveMarker(Signal* signal,
hostPtr.i = nodeId;
ptrCheckGuard(hostPtr, ThostFilesize, hostRecord);
- UintR Tdata1 = 0;
- UintR Tdata2 = transid1;
- UintR Tdata3 = transid2;
+ Uint32 Tdata[3];
+ Tdata[0] = 0;
+ Tdata[1] = transid1;
+ Tdata[2] = transid2;
+ Uint32 len = 3;
- bool send_unpacked = isNdbMtLqh();
+ // currently packed signals can not address specific instance
+ bool send_unpacked = getNodeInfo(hostPtr.i).m_lqh_workers != 0;
if (send_unpacked) {
- Uint32* data = signal->getDataPtrSend();
- data[0] = Tdata2;
- data[1] = Tdata3;
+ jam();
+ // first word omitted
+ memcpy(&signal->theData[0], &Tdata[1], (len - 1) << 2);
Uint32 Tnode = hostPtr.i;
Uint32 i;
for (i = 0; i < MAX_NDBMT_LQH_WORKERS; i++) {
// wl4391_todo skip workers not part of tx
Uint32 instanceKey = 1 + i;
BlockReference ref = numberToRef(DBLQH, instanceKey, Tnode);
- sendSignal(ref, GSN_REMOVE_MARKER_ORD, signal, 2, JBB);
+ sendSignal(ref, GSN_REMOVE_MARKER_ORD, signal, len - 1, JBB);
}
return;
}
@@ -5327,14 +5326,13 @@ Dbtc::sendRemoveMarker(Signal* signal,
} else {
jam();
updatePackedList(signal, hostPtr.p, hostPtr.i);
- }//if
+ }
UintR numWord = hostPtr.p->noOfPackedWordsLqh;
UintR* dataPtr = &hostPtr.p->packedWordsLqh[numWord];
- dataPtr[0] = Tdata1 | (ZREMOVE_MARKER << 28);
- dataPtr[1] = Tdata2;
- dataPtr[2] = Tdata3;
+ Tdata[0] |= (ZREMOVE_MARKER << 28);
+ memcpy(dataPtr, &Tdata[0], len << 2);
hostPtr.p->noOfPackedWordsLqh = numWord + 3;
}
@@ -12274,7 +12272,7 @@ Dbtc::validate_filter(Signal* signal)
default:
infoEvent("Invalid filter op: 0x%x pos: %ld",
* start,
- start - (signal->theData + 1));
+ (long int)(start - (signal->theData + 1)));
return false;
}
}
=== modified file 'storage/ndb/src/kernel/blocks/lgman.cpp'
--- a/storage/ndb/src/kernel/blocks/lgman.cpp 2008-11-20 13:32:13 +0000
+++ b/storage/ndb/src/kernel/blocks/lgman.cpp 2008-12-01 18:03:48 +0000
@@ -56,11 +56,7 @@ Lgman::Lgman(Block_context & ctx) :
m_tup(0),
m_logfile_group_list(m_logfile_group_pool),
m_logfile_group_hash(m_logfile_group_pool),
-#ifdef __sun // temp
- m_client_mutex(1, false)
-#else
- m_client_mutex(2, true)
-#endif
+ m_client_mutex("lgman-client", 2, true)
{
BLOCK_CONSTRUCTOR(Lgman);
=== modified file 'storage/ndb/src/kernel/blocks/qmgr/QmgrMain.cpp'
--- a/storage/ndb/src/kernel/blocks/qmgr/QmgrMain.cpp 2008-11-13 13:36:29 +0000
+++ b/storage/ndb/src/kernel/blocks/qmgr/QmgrMain.cpp 2008-12-01 18:04:19 +0000
@@ -991,6 +991,9 @@ void Qmgr::execCM_REGCONF(Signal* signal
c_clusterNodes.assign(NdbNodeBitmask::Size, cmRegConf->allNdbNodes);
myNodePtr.p->ndynamicId = TdynamicId;
+
+ // set own MT config here or in REF, and others in CM_NODEINFOREQ/CONF
+ setNodeInfo(getOwnNodeId()).m_lqh_workers = globalData.ndbMtLqhWorkers;
/*--------------------------------------------------------------*/
// Send this as an EVENT REPORT to inform about hearing about
@@ -1109,6 +1112,7 @@ Qmgr::sendCmNodeInfoReq(Signal* signal,
req->dynamicId = self->ndynamicId;
req->version = getNodeInfo(getOwnNodeId()).m_version;
req->mysql_version = getNodeInfo(getOwnNodeId()).m_mysql_version;
+ req->lqh_workers = getNodeInfo(getOwnNodeId()).m_lqh_workers;
const Uint32 ref = calcQmgrBlockRef(nodeId);
sendSignal(ref,GSN_CM_NODEINFOREQ, signal, CmNodeInfoReq::SignalLength, JBB);
DEBUG_START(GSN_CM_NODEINFOREQ, nodeId, "");
@@ -1214,6 +1218,9 @@ void Qmgr::execCM_REGREF(Signal* signal)
skip_nodes.bitAND(c_definedNodes);
c_start.m_skip_nodes.bitOR(skip_nodes);
+
+ // set own MT config here or in CONF, and others in CM_NODEINFOREQ/CONF
+ setNodeInfo(getOwnNodeId()).m_lqh_workers = globalData.ndbMtLqhWorkers;
char buf[100];
switch (TrefuseReason) {
@@ -1661,11 +1668,17 @@ void Qmgr::execCM_NODEINFOCONF(Signal* s
const Uint32 dynamicId = conf->dynamicId;
const Uint32 version = conf->version;
Uint32 mysql_version = conf->mysql_version;
+ Uint32 lqh_workers = conf->lqh_workers;
if (version < NDBD_SPLIT_VERSION)
{
jam();
mysql_version = 0;
}
+ if (version < NDBD_MT_LQH_VERSION)
+ {
+ jam();
+ lqh_workers = 0;
+ }
NodeRecPtr nodePtr;
nodePtr.i = getOwnNodeId();
@@ -1684,6 +1697,7 @@ void Qmgr::execCM_NODEINFOCONF(Signal* s
replyNodePtr.p->blockRef = signal->getSendersBlockRef();
setNodeInfo(replyNodePtr.i).m_version = version;
setNodeInfo(replyNodePtr.i).m_mysql_version = mysql_version;
+ setNodeInfo(replyNodePtr.i).m_lqh_workers = lqh_workers;
recompute_version_info(NodeInfo::DB, version);
@@ -1741,8 +1755,13 @@ void Qmgr::execCM_NODEINFOREQ(Signal* si
Uint32 mysql_version = req->mysql_version;
if (req->version < NDBD_SPLIT_VERSION)
mysql_version = 0;
-
setNodeInfo(addNodePtr.i).m_mysql_version = mysql_version;
+
+ Uint32 lqh_workers = req->lqh_workers;
+ if (req->version < NDBD_MT_LQH_VERSION)
+ lqh_workers = 0;
+ setNodeInfo(addNodePtr.i).m_lqh_workers = lqh_workers;
+
c_maxDynamicId = req->dynamicId;
cmAddPrepare(signal, addNodePtr, nodePtr.p);
@@ -1799,6 +1818,7 @@ Qmgr::cmAddPrepare(Signal* signal, NodeR
conf->dynamicId = self->ndynamicId;
conf->version = getNodeInfo(getOwnNodeId()).m_version;
conf->mysql_version = getNodeInfo(getOwnNodeId()).m_mysql_version;
+ conf->lqh_workers = getNodeInfo(getOwnNodeId()).m_lqh_workers;
sendSignal(nodePtr.p->blockRef, GSN_CM_NODEINFOCONF, signal,
CmNodeInfoConf::SignalLength, JBB);
DEBUG_START(GSN_CM_NODEINFOCONF, refToNode(nodePtr.p->blockRef), "");
=== modified file 'storage/ndb/src/kernel/blocks/tsman.cpp'
--- a/storage/ndb/src/kernel/blocks/tsman.cpp 2008-11-20 13:32:13 +0000
+++ b/storage/ndb/src/kernel/blocks/tsman.cpp 2008-12-01 18:03:48 +0000
@@ -45,11 +45,7 @@ Tsman::Tsman(Block_context& ctx) :
m_pgman(0),
m_lgman(0),
m_tup(0),
-#ifdef __sun // temp
- m_client_mutex(1, false)
-#else
- m_client_mutex(2, true)
-#endif
+ m_client_mutex("tsman-client", 2, true)
{
BLOCK_CONSTRUCTOR(Tsman);
=== modified file 'storage/ndb/src/kernel/main.cpp'
--- a/storage/ndb/src/kernel/main.cpp 2008-11-09 18:37:29 +0000
+++ b/storage/ndb/src/kernel/main.cpp 2008-12-01 18:04:19 +0000
@@ -293,8 +293,10 @@ get_multithreaded_config(EmulatorData& e
{
// multithreaded is compiled in ndbd/ndbmtd for now
globalData.isNdbMt = SimulatedBlock::isMultiThreaded();
- if (!globalData.isNdbMt)
+ if (!globalData.isNdbMt) {
+ ndbout << "NDBMT: non-mt" << endl;
return 0;
+ }
const ndb_mgm_configuration_iterator * p =
ed.theConfiguration->getOwnConfigIterator();
=== modified file 'storage/ndb/src/kernel/vm/SafeMutex.cpp'
--- a/storage/ndb/src/kernel/vm/SafeMutex.cpp 2008-11-20 13:32:13 +0000
+++ b/storage/ndb/src/kernel/vm/SafeMutex.cpp 2008-12-01 18:03:48 +0000
@@ -15,94 +15,151 @@
#include "SafeMutex.hpp"
-NdbOut&
-operator<<(NdbOut& out, const SafeMutex& dm)
-{
- out << "level=" << dm.m_level << "," << "usage=" <<
dm.m_usage;
- return out;
-}
-
int
SafeMutex::create()
{
- if (m_init)
- return ErrState;
- int ret = -1;
-#ifdef HAVE_PTHREAD_MUTEX_RECURSIVE
-#ifndef __WIN__
- if (m_limit > 1 || m_debug) {
- pthread_mutexattr_t attr;
- pthread_mutexattr_settype(&attr, PTHREAD_MUTEX_RECURSIVE);
- ret = pthread_mutex_init(&m_mutex, &attr);
- } else {
- // error-check mutex does not work right on my linux, skip it
- ret = pthread_mutex_init(&m_mutex, 0);
- }
-#else
- ret = pthread_mutex_init(&m_mutex, 0);
-#endif
-#else
- if (m_limit > 1 || m_debug)
- return ErrUnsupp;
+ int ret;
+ if (m_initdone)
+ return err(ErrState, __LINE__);
ret = pthread_mutex_init(&m_mutex, 0);
-#endif
if (ret != 0)
- return ret;
- m_init = true;
+ return err(ret, __LINE__);
+ ret = pthread_cond_init(&m_cond, 0);
+ if (ret != 0)
+ return err(ret, __LINE__);
+ m_initdone = true;
return 0;
}
int
SafeMutex::destroy()
{
- if (!m_init)
- return ErrState;
- int ret = pthread_mutex_destroy(&m_mutex);
+ int ret;
+ if (!m_initdone)
+ return err(ErrState, __LINE__);
+ ret = pthread_cond_destroy(&m_cond);
if (ret != 0)
- return ret;
- m_init = false;
+ return err(ret, __LINE__);
+ ret = pthread_mutex_destroy(&m_mutex);
+ if (ret != 0)
+ return err(ret, __LINE__);
+ m_initdone = false;
return 0;
}
int
SafeMutex::lock()
{
- pthread_t self = pthread_self();
- int ret = pthread_mutex_lock(&m_mutex);
- /* have mutex */
+ int ret;
+ if (m_simple) {
+ ret = pthread_mutex_lock(&m_mutex);
+ if (ret != 0)
+ return err(ret, __LINE__);
+ return 0;
+ }
+ ret = pthread_mutex_lock(&m_mutex);
if (ret != 0)
- return ret;
- if (!(m_level < m_limit))
- return ErrLevel;
- m_level++;
- if (m_level > m_usage)
- m_usage = m_level;
- if (m_level == 1 && m_owner != 0)
- return ErrOwner1;
- if (m_level >= 2 && m_owner != self)
- return ErrOwner2;
- m_owner = self;
+ return err(ret, __LINE__);
+ return lock_impl();
+}
+
+int
+SafeMutex::lock_impl()
+{
+ int ret;
+ pthread_t self = pthread_self();
+ assert(self != 0);
+ while (1) {
+ if (m_level == 0) {
+ assert(m_owner == 0);
+ m_owner = self;
+ } else if (m_owner != self) {
+ ret = pthread_cond_wait(&m_cond, &m_mutex);
+ if (ret != 0)
+ return err(ret, __LINE__);
+ continue;
+ }
+ if (!(m_level < m_limit))
+ return err(ErrLevel, __LINE__);
+ m_level++;
+ if (m_usage < m_level)
+ m_usage = m_level;
+ ret = pthread_cond_signal(&m_cond);
+ if (ret != 0)
+ return err(ret, __LINE__);
+ ret = pthread_mutex_unlock(&m_mutex);
+ if (ret != 0)
+ return err(ret, __LINE__);
+ break;
+ }
return 0;
}
int
SafeMutex::unlock()
{
+ int ret;
+ if (m_simple) {
+ ret = pthread_mutex_unlock(&m_mutex);
+ if (ret != 0)
+ return err(ret, __LINE__);
+ return 0;
+ }
+ ret = pthread_mutex_lock(&m_mutex);
+ if (ret != 0)
+ return err(ret, __LINE__);
+ return unlock_impl();
+}
+
+int
+SafeMutex::unlock_impl()
+{
+ int ret;
pthread_t self = pthread_self();
- if (!(m_level > 0))
- return ErrState;
+ assert(self != 0);
if (m_owner != self)
- return ErrOwner3;
- if (m_level == 1)
- m_owner = 0;
+ return err(ErrOwner, __LINE__);
+ if (m_level == 0)
+ return err(ErrNolock, __LINE__);
m_level--;
- int ret = pthread_mutex_unlock(&m_mutex);
- /* lose mutex */
+ if (m_level == 0) {
+ m_owner = 0;
+ ret = pthread_cond_signal(&m_cond);
+ if (ret != 0)
+ return err(ret, __LINE__);
+ }
+ ret = pthread_mutex_unlock(&m_mutex);
if (ret != 0)
- return ret;
+ return err(ret, __LINE__);
return 0;
}
+int
+SafeMutex::err(int errcode, int errline)
+{
+ assert(errcode != 0);
+ m_errcode = errcode;
+ m_errline = errline;
+ ndbout << *this << endl;
+#ifdef UNIT_TEST
+ abort();
+#endif
+ return errcode;
+}
+
+NdbOut&
+operator<<(NdbOut& out, const SafeMutex& sm)
+{
+ out << sm.m_name << ":";
+ out << " level=" << sm.m_level;
+ out << " usage=" << sm.m_usage;
+ if (sm.m_errcode != 0) {
+ out << " errcode=" << sm.m_errcode;
+ out << " errline=" << sm.m_errline;
+ }
+ return out;
+}
+
#ifdef UNIT_TEST
struct sm_thr {
@@ -144,10 +201,12 @@ sm_run(void* arg)
}
if (op == +1) {
assert(level < thr.limit);
+ //ndbout << thr.index << ": lock" << endl;
int ret = sm.lock();
assert(ret == 0);
level++;
} else if (op == -1) {
+ //ndbout << thr.index << ": unlock" << endl;
int ret = sm.unlock();
assert(ret == 0);
assert(level != 0);
@@ -161,6 +220,7 @@ sm_run(void* arg)
assert(ret == 0);
level--;
}
+ return 0;
}
int
@@ -169,18 +229,19 @@ main(int argc, char** argv)
const uint max_thr = 128;
struct sm_thr thr[max_thr];
- // threads - loops - max level
+ // threads - loops - max level - debug
uint num_thr = argc > 1 ? atoi(argv[1]) : 4;
assert(num_thr != 0 && num_thr <= max_thr);
uint loops = argc > 2 ? atoi(argv[2]) : 1000000;
uint limit = argc > 3 ? atoi(argv[3]) : 10;
assert(limit != 0);
+ bool debug = argc > 4 ? atoi(argv[4]) : true;
ndbout << "threads=" << num_thr;
ndbout << " loops=" << loops;
ndbout << " max level=" << limit << endl;
- SafeMutex sm(limit, true);
+ SafeMutex sm("test-mutex", limit, debug);
int ret;
ret = sm.create();
assert(ret == 0);
=== modified file 'storage/ndb/src/kernel/vm/SafeMutex.hpp'
--- a/storage/ndb/src/kernel/vm/SafeMutex.hpp 2008-11-20 13:32:13 +0000
+++ b/storage/ndb/src/kernel/vm/SafeMutex.hpp 2008-12-01 18:03:48 +0000
@@ -26,61 +26,70 @@
#include <ndb_types.h>
#include <NdbOut.hpp>
-#undef HAVE_PTHREAD_MUTEX_RECURSIVE
-#ifdef __linux
-#define HAVE_PTHREAD_MUTEX_RECURSIVE
-#endif
-
/*
- * Recursive mutex with a recursion limit >= 1. Can be useful for
- * debugging. If a recursive mutex is not wanted, one must rewrite
- * caller code until limit 1 works.
+ * Recursive mutex with recursion limit >= 1. Intended for debugging.
+ * One should rewrite caller code until limit 1 works.
*
- * Implementation for limit > 1 uses a real OS recursive mutex. Should
- * work on linux and solaris 10. There is a unit test testSafeMutex.
+ * The implementation uses a default mutex. If limit is > 1 or debug
+ * is specified then a recursive mutex is simulated. Operating system
+ * recursive mutex (if any) is not used. The simulation is several
+ * times slower. There is a unit test testSafeMutex.
*
* The caller currently is multi-threaded disk data. Here it is easy
* to verify that the mutex is released within a time-slice.
*/
class SafeMutex {
+ const char* const m_name;
+ const Uint32 m_limit; // error if usage exceeds this
+ const bool m_debug; // use recursive implementation even for limit 1
+ const bool m_simple;
pthread_mutex_t m_mutex;
+ pthread_cond_t m_cond;
pthread_t m_owner;
- bool m_init;
+ bool m_initdone;
Uint32 m_level;
Uint32 m_usage; // max level used so far
- const Uint32 m_limit; // error if usage exceeds this
- const bool m_debug; // use recursive mutex even for limit 1
+ int m_errcode;
+ int m_errline;
+ int err(int errcode, int errline);
friend class NdbOut& operator<<(NdbOut&, const SafeMutex&);
public:
- SafeMutex(Uint32 limit, bool debug) :
+ SafeMutex(const char* name, Uint32 limit, bool debug) :
+ m_name(name),
m_limit(limit),
- m_debug(debug)
+ m_debug(debug),
+ m_simple(!(limit > 1 || debug))
{
assert(m_limit >= 1),
m_owner = 0; // wl4391_todo assuming numeric non-zero
- m_init = false;
+ m_initdone = false;
m_level = 0;
m_usage = 0;
+ m_errcode = 0;
+ m_errline = 0;
};
~SafeMutex() {
- (void)destroy();
+ if (m_initdone)
+ (void)destroy();
}
enum {
- // caller must crash on any error
- ErrUnsupp = -101, // limit > 1 or debug, and not supported by OS
- ErrState = -102, // user error
- ErrLevel = -103, // level exceeded limit
- ErrOwner1 = -104, // owner not 0 at first lock (OS error)
- ErrOwner2 = -105, // owner not self at recursive lock (OS error)
- ErrOwner3 = -106 // owner not self at unlock (OS error)
+ // caller must crash on any error - recovery is not possible
+ ErrState = -101, // user error
+ ErrLevel = -102, // level exceeded limit
+ ErrOwner = -103, // unlock when not owner
+ ErrNolock = -104 // unlock when no lock
};
int create();
int destroy();
int lock();
int unlock();
+
+private:
+ int lock_impl();
+ int unlock_impl();
};
#endif
=== modified file 'storage/ndb/src/kernel/vm/SimulatedBlock.hpp'
--- a/storage/ndb/src/kernel/vm/SimulatedBlock.hpp 2008-11-19 11:01:17 +0000
+++ b/storage/ndb/src/kernel/vm/SimulatedBlock.hpp 2008-12-01 18:05:11 +0000
@@ -544,6 +544,7 @@ protected:
BlockReference calcInstanceBlockRef(BlockNumber aBlock);
// matching instance on another node e.g. LQH-LQH
+ // valid only if receiver has same number of workers
BlockReference calcInstanceBlockRef(BlockNumber aBlock, NodeId aNode);
/**
| Thread |
|---|
| • bzr push into mysql-5.1 branch (pekka:3139 to 3146) WL#4391 | Pekka Nousiainen | 1 Dec |