4039 Pekka Nousiainen 2012-10-18 [merge]
merge
modified:
BUILD/compile-dist
storage/ndb/CMakeLists.txt
storage/ndb/src/common/portlib/NdbThread.c
=== modified file 'sql/ha_ndbcluster_binlog.cc'
--- a/sql/ha_ndbcluster_binlog.cc 2012-09-24 14:57:07 +0000
+++ b/sql/ha_ndbcluster_binlog.cc 2012-10-17 12:23:49 +0000
@@ -6569,7 +6569,17 @@ injectApplyStatusWriteRow(injector::tran
/*
Intialize apply_status_table->record[0]
+
+ When iterating past the end of the last epoch, the first event of
+ the new epoch may be on ndb_apply_status. Its event data saved
+ in record[0] would be overwritten here by a subsequent event on a
+ normal table. So save and restore its record[0].
*/
+ static const ulong sav_max= 512; // current is 284
+ const ulong sav_len= apply_status_table->s->reclength;
+ DBUG_ASSERT(sav_len <= sav_max);
+ uchar sav_buf[sav_max];
+ memcpy(sav_buf, apply_status_table->record[0], sav_len);
empty_record(apply_status_table);
apply_status_table->field[0]->store((longlong)::server_id, true);
@@ -6595,6 +6605,7 @@ injectApplyStatusWriteRow(injector::tran
assert(ret == 0);
+ memcpy(apply_status_table->record[0], sav_buf, sav_len);
DBUG_RETURN(true);
}
=== modified file 'storage/ndb/src/kernel/blocks/dblqh/Dblqh.hpp'
--- a/storage/ndb/src/kernel/blocks/dblqh/Dblqh.hpp 2012-08-13 14:03:42 +0000
+++ b/storage/ndb/src/kernel/blocks/dblqh/Dblqh.hpp 2012-10-17 14:43:50 +0000
@@ -950,16 +950,11 @@ public:
typedef Ptr<GcpRecord> GcpRecordPtr;
struct HostRecord {
+ struct PackedWordsContainer lqh_pack[MAX_NDBMT_LQH_THREADS+1];
+ struct PackedWordsContainer tc_pack[MAX_NDBMT_TC_THREADS+1];
Uint8 inPackedList;
Uint8 nodestatus;
- Uint8 _unused[2];
- UintR noOfPackedWordsLqh;
- UintR packedWordsLqh[30];
- UintR noOfPackedWordsTc;
- UintR packedWordsTc[29];
- BlockReference hostLqhBlockRef;
- BlockReference hostTcBlockRef;
- };// Size 128 bytes
+ };
typedef Ptr<HostRecord> HostRecordPtr;
/* $$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$ */
@@ -2433,8 +2428,8 @@ private:
void sendLqhkeyconfTc(Signal* signal, BlockReference atcBlockref);
void sendCommitLqh(Signal* signal, BlockReference alqhBlockref);
void sendCompleteLqh(Signal* signal, BlockReference alqhBlockref);
- void sendPackedSignalLqh(Signal* signal, HostRecord * ahostptr);
- void sendPackedSignalTc(Signal* signal, HostRecord * ahostptr);
+ void sendPackedSignal(Signal* signal,
+ struct PackedWordsContainer * container);
void cleanUp(Signal* signal);
void sendAttrinfoLoop(Signal* signal);
void sendAttrinfoSignal(Signal* signal);
=== modified file 'storage/ndb/src/kernel/blocks/dblqh/DblqhMain.cpp'
--- a/storage/ndb/src/kernel/blocks/dblqh/DblqhMain.cpp 2012-09-28 23:55:14 +0000
+++ b/storage/ndb/src/kernel/blocks/dblqh/DblqhMain.cpp 2012-10-17 14:43:50 +0000
@@ -844,7 +844,7 @@ void Dblqh::execNDB_STTOR(Signal* signal
/* ++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++ */
void Dblqh::startphase1Lab(Signal* signal, Uint32 _dummy, Uint32 ownNodeId)
{
- UintR Ti;
+ UintR Ti, Tj;
HostRecordPtr ThostPtr;
/* ------- INITIATE ALL RECORDS ------- */
@@ -861,11 +861,19 @@ void Dblqh::startphase1Lab(Signal* signa
* Valid only if receiver has same number of LQH workers.
* In general full instance key of fragment must be used.
*/
- ThostPtr.p->hostLqhBlockRef = calcInstanceBlockRef(DBLQH, ThostPtr.i);
- ThostPtr.p->hostTcBlockRef = calcTcBlockRef(ThostPtr.i);
ThostPtr.p->inPackedList = false;
- ThostPtr.p->noOfPackedWordsLqh = 0;
- ThostPtr.p->noOfPackedWordsTc = 0;
+ for (Tj = 0; Tj < NDB_ARRAY_SIZE(ThostPtr.p->lqh_pack); Tj++)
+ {
+ ThostPtr.p->lqh_pack[Tj].noOfPackedWords = 0;
+ ThostPtr.p->lqh_pack[Tj].hostBlockRef =
+ numberToRef(DBLQH, Tj, ThostPtr.i);
+ }
+ for (Tj = 0; Tj < NDB_ARRAY_SIZE(ThostPtr.p->tc_pack); Tj++)
+ {
+ ThostPtr.p->tc_pack[Tj].noOfPackedWords = 0;
+ ThostPtr.p->tc_pack[Tj].hostBlockRef =
+ numberToRef(DBTC, Tj, ThostPtr.i);
+ }
ThostPtr.p->nodestatus = ZNODE_DOWN;
}//for
cpackedListIndex = 0;
@@ -3452,6 +3460,7 @@ void Dblqh::execSEND_PACKED(Signal* sign
{
HostRecordPtr Thostptr;
UintR i;
+ UintR j;
UintR TpackedListIndex = cpackedListIndex;
jamEntry();
for (i = 0; i < TpackedListIndex; i++) {
@@ -3459,14 +3468,22 @@ void Dblqh::execSEND_PACKED(Signal* sign
ptrAss(Thostptr, hostRecord);
jam();
ndbrequire(Thostptr.i - 1 < MAX_NDB_NODES - 1);
- if (Thostptr.p->noOfPackedWordsLqh > 0) {
- jam();
- sendPackedSignalLqh(signal, Thostptr.p);
- }//if
- if (Thostptr.p->noOfPackedWordsTc > 0) {
- jam();
- sendPackedSignalTc(signal, Thostptr.p);
- }//if
+ for (j = 0; j < NDB_ARRAY_SIZE(Thostptr.p->lqh_pack); j++)
+ {
+ struct PackedWordsContainer * container = &Thostptr.p->lqh_pack[j];
+ if (container->noOfPackedWords > 0) {
+ jam();
+ sendPackedSignal(signal, container);
+ }
+ }
+ for (j = 0; j < NDB_ARRAY_SIZE(Thostptr.p->tc_pack); j++)
+ {
+ struct PackedWordsContainer * container = &Thostptr.p->tc_pack[j];
+ if (container->noOfPackedWords > 0) {
+ jam();
+ sendPackedSignal(signal, container);
+ }
+ }
Thostptr.p->inPackedList = false;
}//for
cpackedListIndex = 0;
@@ -3708,33 +3725,39 @@ void Dblqh::execTUPKEYREF(Signal* signal
}//switch
}//Dblqh::execTUPKEYREF()
-void Dblqh::sendPackedSignalLqh(Signal* signal, HostRecord * ahostptr)
+void Dblqh::sendPackedSignal(Signal* signal,
+ struct PackedWordsContainer * container)
{
- Uint32 noOfWords = ahostptr->noOfPackedWordsLqh;
- BlockReference hostRef = ahostptr->hostLqhBlockRef;
+ Uint32 noOfWords = container->noOfPackedWords;
+ BlockReference hostRef = container->hostBlockRef;
+ container->noOfPackedWords = 0;
MEMCOPY_NO_WORDS(&signal->theData[0],
- &ahostptr->packedWordsLqh[0],
+ &container->packedWords[0],
noOfWords);
sendSignal(hostRef, GSN_PACKED_SIGNAL, signal, noOfWords, JBB);
- ahostptr->noOfPackedWordsLqh = 0;
-}//Dblqh::sendPackedSignalLqh()
-
-void Dblqh::sendPackedSignalTc(Signal* signal, HostRecord * ahostptr)
-{
- Uint32 noOfWords = ahostptr->noOfPackedWordsTc;
- BlockReference hostRef = ahostptr->hostTcBlockRef;
- MEMCOPY_NO_WORDS(&signal->theData[0],
- &ahostptr->packedWordsTc[0],
- noOfWords);
- sendSignal(hostRef, GSN_PACKED_SIGNAL, signal, noOfWords, JBB);
- ahostptr->noOfPackedWordsTc = 0;
-}//Dblqh::sendPackedSignalTc()
+}
void Dblqh::sendCommitLqh(Signal* signal, BlockReference alqhBlockref)
{
+ Uint32 instanceKey = refToInstance(alqhBlockref);
+ ndbassert(refToMain(alqhBlockref) == DBLQH);
+
+ if (instanceKey > MAX_NDBMT_LQH_THREADS)
+ {
+ /* No send packed support in these cases */
+ jam();
+ signal->theData[0] = tcConnectptr.p->clientConnectrec;
+ signal->theData[1] = tcConnectptr.p->transid[0];
+ signal->theData[2] = tcConnectptr.p->transid[1];
+ sendSignal(alqhBlockref, GSN_COMMIT, signal, 3, JBB);
+ return;
+ }
+
HostRecordPtr Thostptr;
+
Thostptr.i = refToNode(alqhBlockref);
ptrCheckGuard(Thostptr, chostFileSize, hostRecord);
+ struct PackedWordsContainer * container = &Thostptr.p->lqh_pack[instanceKey];
Uint32 Tdata[5];
Tdata[0] = tcConnectptr.p->clientConnectrec;
@@ -3751,40 +3774,41 @@ void Dblqh::sendCommitLqh(Signal* signal
len = 4;
}
- // currently packed signal cannot address specific instance
- const bool send_unpacked = getNodeInfo(Thostptr.i).m_lqh_workers != 0;
- if (send_unpacked) {
+ if (container->noOfPackedWords > 25 - len) {
jam();
- FragrecordPtr Tfragptr;
- Tfragptr.i = tcConnectptr.p->fragmentptr;
- c_fragment_pool.getPtr(Tfragptr);
- memcpy(&signal->theData[0], &Tdata[0], len << 2);
- Uint32 Tnode = Thostptr.i;
- Uint32 instanceKey = Tfragptr.p->lqhInstanceKey;
- BlockReference lqhRef = numberToRef(DBLQH, instanceKey, Tnode);
- sendSignal(lqhRef, GSN_COMMIT, signal, len, JBB);
- return;
- }
-
- if (Thostptr.p->noOfPackedWordsLqh > 25 - 5) {
- jam();
- sendPackedSignalLqh(signal, Thostptr.p);
+ sendPackedSignal(signal, container);
} else {
jam();
updatePackedList(signal, Thostptr.p, Thostptr.i);
}
Tdata[0] |= (ZCOMMIT << 28);
- Uint32 pos = Thostptr.p->noOfPackedWordsLqh;
- memcpy(&Thostptr.p->packedWordsLqh[pos], &Tdata[0], len << 2);
- Thostptr.p->noOfPackedWordsLqh = pos + len;
+ Uint32 pos = container->noOfPackedWords;
+ container->noOfPackedWords = pos + len;
+ memcpy(&container->packedWords[pos], &Tdata[0], len << 2);
}
void Dblqh::sendCompleteLqh(Signal* signal, BlockReference alqhBlockref)
{
+ Uint32 instanceKey = refToInstance(alqhBlockref);
+ ndbassert(refToMain(alqhBlockref) == DBLQH);
+
+ if (instanceKey > MAX_NDBMT_LQH_THREADS)
+ {
+ /* No send packed support in these cases */
+ jam();
+ signal->theData[0] = tcConnectptr.p->clientConnectrec;
+ signal->theData[1] = tcConnectptr.p->transid[0];
+ signal->theData[2] = tcConnectptr.p->transid[1];
+ sendSignal(alqhBlockref, GSN_COMPLETE, signal, 3, JBB);
+ return;
+ }
+
HostRecordPtr Thostptr;
+
Thostptr.i = refToNode(alqhBlockref);
ptrCheckGuard(Thostptr, chostFileSize, hostRecord);
+ struct PackedWordsContainer * container = &Thostptr.p->lqh_pack[instanceKey];
Uint32 Tdata[3];
Tdata[0] = tcConnectptr.p->clientConnectrec;
@@ -3792,39 +3816,28 @@ void Dblqh::sendCompleteLqh(Signal* sign
Tdata[2] = tcConnectptr.p->transid[1];
Uint32 len = 3;
- // currently packed signal cannot address specific instance
- const bool send_unpacked = getNodeInfo(Thostptr.i).m_lqh_workers != 0;
- if (send_unpacked) {
+ if (container->noOfPackedWords > 22) {
jam();
- FragrecordPtr Tfragptr;
- Tfragptr.i = tcConnectptr.p->fragmentptr;
- c_fragment_pool.getPtr(Tfragptr);
- memcpy(&signal->theData[0], &Tdata[0], len << 2);
- Uint32 Tnode = Thostptr.i;
- Uint32 instanceKey = Tfragptr.p->lqhInstanceKey;
- BlockReference lqhRef = numberToRef(DBLQH, instanceKey, Tnode);
- sendSignal(lqhRef, GSN_COMPLETE, signal, len, JBB);
- return;
- }
-
- if (Thostptr.p->noOfPackedWordsLqh > 22) {
- jam();
- sendPackedSignalLqh(signal, Thostptr.p);
+ sendPackedSignal(signal, container);
} else {
jam();
updatePackedList(signal, Thostptr.p, Thostptr.i);
}
Tdata[0] |= (ZCOMPLETE << 28);
- Uint32 pos = Thostptr.p->noOfPackedWordsLqh;
- memcpy(&Thostptr.p->packedWordsLqh[pos], &Tdata[0], len << 2);
- Thostptr.p->noOfPackedWordsLqh = pos + len;
+ Uint32 pos = container->noOfPackedWords;
+ container->noOfPackedWords = pos + len;
+ memcpy(&container->packedWords[pos], &Tdata[0], len << 2);
}
void Dblqh::sendCommittedTc(Signal* signal, BlockReference atcBlockref)
{
- if (refToInstance(atcBlockref))
+ Uint32 instanceKey = refToInstance(atcBlockref);
+
+ ndbassert(refToMain(atcBlockref) == DBTC);
+ if (instanceKey > MAX_NDBMT_TC_THREADS)
{
+ /* No send packed support in these cases */
jam();
signal->theData[0] = tcConnectptr.p->clientConnectrec;
signal->theData[1] = tcConnectptr.p->transid[0];
@@ -3836,6 +3849,7 @@ void Dblqh::sendCommittedTc(Signal* sign
HostRecordPtr Thostptr;
Thostptr.i = refToNode(atcBlockref);
ptrCheckGuard(Thostptr, chostFileSize, hostRecord);
+ struct PackedWordsContainer * container = &Thostptr.p->tc_pack[instanceKey];
Uint32 Tdata[3];
Tdata[0] = tcConnectptr.p->clientConnectrec;
@@ -3843,34 +3857,28 @@ void Dblqh::sendCommittedTc(Signal* sign
Tdata[2] = tcConnectptr.p->transid[1];
Uint32 len = 3;
- // currently TC is single-threaded
- const bool send_unpacked = false;
- if (send_unpacked) {
- jam();
- memcpy(&signal->theData[0], &Tdata[0], len << 2);
- BlockReference tcRef = Thostptr.p->hostTcBlockRef;
- sendSignal(tcRef, GSN_COMMITTED, signal, len, JBB);
- return;
- }
-
- if (Thostptr.p->noOfPackedWordsTc > 22) {
+ if (container->noOfPackedWords > 22) {
jam();
- sendPackedSignalTc(signal, Thostptr.p);
+ sendPackedSignal(signal, container);
} else {
jam();
updatePackedList(signal, Thostptr.p, Thostptr.i);
}
Tdata[0] |= (ZCOMMITTED << 28);
- Uint32 pos = Thostptr.p->noOfPackedWordsTc;
- memcpy(&Thostptr.p->packedWordsTc[pos], &Tdata[0], len << 2);
- Thostptr.p->noOfPackedWordsTc = pos + len;
+ Uint32 pos = container->noOfPackedWords;
+ container->noOfPackedWords = pos + len;
+ memcpy(&container->packedWords[pos], &Tdata[0], len << 2);
}
void Dblqh::sendCompletedTc(Signal* signal, BlockReference atcBlockref)
{
- if (refToInstance(atcBlockref))
+ Uint32 instanceKey = refToInstance(atcBlockref);
+
+ ndbassert(refToMain(atcBlockref) == DBTC);
+ if (instanceKey > MAX_NDBMT_TC_THREADS)
{
+ /* No handling of send packed in those cases */
jam();
signal->theData[0] = tcConnectptr.p->clientConnectrec;
signal->theData[1] = tcConnectptr.p->transid[0];
@@ -3882,6 +3890,7 @@ void Dblqh::sendCompletedTc(Signal* sign
HostRecordPtr Thostptr;
Thostptr.i = refToNode(atcBlockref);
ptrCheckGuard(Thostptr, chostFileSize, hostRecord);
+ struct PackedWordsContainer * container = &Thostptr.p->tc_pack[instanceKey];
Uint32 Tdata[3];
Tdata[0] = tcConnectptr.p->clientConnectrec;
@@ -3889,110 +3898,107 @@ void Dblqh::sendCompletedTc(Signal* sign
Tdata[2] = tcConnectptr.p->transid[1];
Uint32 len = 3;
- // currently TC is single-threaded
- const bool send_unpacked = false;
- if (send_unpacked) {
- jam();
- memcpy(&signal->theData[0], &Tdata[0], len << 2);
- BlockReference tcRef = Thostptr.p->hostTcBlockRef;
- sendSignal(tcRef, GSN_COMMITTED, signal, len, JBB);
- return;
- }
-
- if (Thostptr.p->noOfPackedWordsTc > 22) {
+ if (container->noOfPackedWords > 22) {
jam();
- sendPackedSignalTc(signal, Thostptr.p);
+ sendPackedSignal(signal, container);
} else {
jam();
updatePackedList(signal, Thostptr.p, Thostptr.i);
}
Tdata[0] |= (ZCOMPLETED << 28);
- Uint32 pos = Thostptr.p->noOfPackedWordsTc;
- memcpy(&Thostptr.p->packedWordsTc[pos], &Tdata[0], len << 2);
- Thostptr.p->noOfPackedWordsTc = pos + len;
+ Uint32 pos = container->noOfPackedWords;
+ container->noOfPackedWords = pos + len;
+ memcpy(&container->packedWords[pos], &Tdata[0], len << 2);
}
void Dblqh::sendLqhkeyconfTc(Signal* signal, BlockReference atcBlockref)
{
LqhKeyConf* lqhKeyConf;
+ struct PackedWordsContainer * container;
+ bool send_packed = true;
HostRecordPtr Thostptr;
-
- bool packed= true;
Thostptr.i = refToNode(atcBlockref);
+ Uint32 instanceKey = refToInstance(atcBlockref);
ptrCheckGuard(Thostptr, chostFileSize, hostRecord);
- if (refToBlock(atcBlockref) == DBTC) {
- jam();
+ Uint32 block = refToMain(atcBlockref);
+
+ if (block == DBLQH)
+ {
+ if (instanceKey <= MAX_NDBMT_LQH_THREADS)
+ {
+ container = &Thostptr.p->lqh_pack[instanceKey];
+ }
+ else
+ {
+ send_packed = false;
+ }
+ }
+ else if (block == DBTC)
+ {
+ if (instanceKey <= MAX_NDBMT_TC_THREADS)
+ {
+ container = &Thostptr.p->tc_pack[instanceKey];
+ }
+ else
+ {
+ send_packed = false;
+ }
+ }
+ else
+ {
+ send_packed = false;
+ }
+
/*******************************************************************
+// Normal path
// This signal was intended for DBTC as part of the normal transaction
// execution.
-********************************************************************/
- if (Thostptr.p->noOfPackedWordsTc > (25 - LqhKeyConf::SignalLength)) {
- jam();
- sendPackedSignalTc(signal, Thostptr.p);
- } else {
- jam();
- updatePackedList(signal, Thostptr.p, Thostptr.i);
- }//if
- lqhKeyConf = (LqhKeyConf *)
- &Thostptr.p->packedWordsTc[Thostptr.p->noOfPackedWordsTc];
- Thostptr.p->noOfPackedWordsTc += LqhKeyConf::SignalLength;
- } else if(refToMain(atcBlockref) == DBLQH &&
- refToInstance(atcBlockref) == instance()) {
- //wl4391_todo check
- jam();
-/*******************************************************************
+// More unusual path
// This signal was intended for DBLQH as part of log execution or
// node recovery.
+// Yet another path
+// Intended for DBSPJ as part of join processing
********************************************************************/
- if (Thostptr.p->noOfPackedWordsLqh > (25 - LqhKeyConf::SignalLength)) {
+ if (send_packed)
+ {
+ if (container->noOfPackedWords > (25 - LqhKeyConf::SignalLength)) {
jam();
- sendPackedSignalLqh(signal, Thostptr.p);
+ sendPackedSignal(signal, container);
} else {
jam();
updatePackedList(signal, Thostptr.p, Thostptr.i);
}//if
lqhKeyConf = (LqhKeyConf *)
- &Thostptr.p->packedWordsLqh[Thostptr.p->noOfPackedWordsLqh];
- Thostptr.p->noOfPackedWordsLqh += LqhKeyConf::SignalLength;
- } else {
- packed= false;
- lqhKeyConf = (LqhKeyConf *)signal->getDataPtrSend();
+ &container->packedWords[container->noOfPackedWords];
+ container->noOfPackedWords += LqhKeyConf::SignalLength;
}
+ else
+ {
+ lqhKeyConf = (LqhKeyConf *)&signal->theData[0];
+ }
+
Uint32 ptrAndType = tcConnectptr.i | (ZLQHKEYCONF << 28);
Uint32 tcOprec = tcConnectptr.p->tcOprec;
Uint32 ownRef = cownref;
+ lqhKeyConf->connectPtr = ptrAndType;
+ lqhKeyConf->opPtr = tcOprec;
+ lqhKeyConf->userRef = ownRef;
+
Uint32 readlenAi = tcConnectptr.p->readlenAi;
Uint32 transid1 = tcConnectptr.p->transid[0];
Uint32 transid2 = tcConnectptr.p->transid[1];
Uint32 noFiredTriggers = tcConnectptr.p->noFiredTriggers;
- lqhKeyConf->connectPtr = ptrAndType;
- lqhKeyConf->opPtr = tcOprec;
- lqhKeyConf->userRef = ownRef;
lqhKeyConf->readLen = readlenAi;
lqhKeyConf->transId1 = transid1;
lqhKeyConf->transId2 = transid2;
lqhKeyConf->noFiredTriggers = noFiredTriggers;
- if(!packed)
+ if (!send_packed)
{
lqhKeyConf->connectPtr = tcConnectptr.i;
- if (instance() == refToInstance(atcBlockref) &&
- (Thostptr.i == 0 || Thostptr.i == getOwnNodeId()) &&
- globalData.ndbMtTcThreads == 0)
- {
- /**
- * This EXECUTE_DIRECT is multi-thread safe, as we only get here
- * for RESTORE block.
- */
- EXECUTE_DIRECT(refToMain(atcBlockref), GSN_LQHKEYCONF,
- signal, LqhKeyConf::SignalLength);
- }
- else
- {
- sendSignal(atcBlockref, GSN_LQHKEYCONF,
- signal, LqhKeyConf::SignalLength, JBB);
- }
+ sendSignal(atcBlockref, GSN_LQHKEYCONF,
+ signal, LqhKeyConf::SignalLength, JBB);
}
}//Dblqh::sendLqhkeyconfTc()
@@ -7595,7 +7601,10 @@ Dblqh::sendFireTrigConfTc(Signal* signal
BlockReference atcBlockref,
Uint32 Tdata[])
{
- if (refToInstance(atcBlockref) != 0)
+ Uint32 instanceKey = refToInstance(atcBlockref);
+
+ ndbassert(refToMain(atcBlockref) == DBTC);
+ if (instanceKey > MAX_NDBMT_TC_THREADS)
{
jam();
memcpy(signal->theData, Tdata, 4 * FireTrigConf::SignalLength);
@@ -7605,15 +7614,15 @@ Dblqh::sendFireTrigConfTc(Signal* signal
}
HostRecordPtr Thostptr;
- Uint32 len = FireTrigConf::SignalLength;
-
Thostptr.i = refToNode(atcBlockref);
ptrCheckGuard(Thostptr, chostFileSize, hostRecord);
+ Uint32 len = FireTrigConf::SignalLength;
+ struct PackedWordsContainer * container = &Thostptr.p->tc_pack[instanceKey];
- if (Thostptr.p->noOfPackedWordsTc > (25 - len))
+ if (container->noOfPackedWords > (25 - len))
{
jam();
- sendPackedSignalTc(signal, Thostptr.p);
+ sendPackedSignal(signal, container);
}
else
{
@@ -7622,8 +7631,8 @@ Dblqh::sendFireTrigConfTc(Signal* signal
}
ndbassert(FireTrigConf::SignalLength == 4);
- Uint32 * dst = &Thostptr.p->packedWordsTc[Thostptr.p->noOfPackedWordsTc];
- Thostptr.p->noOfPackedWordsTc += len;
+ Uint32 * dst = &container->packedWords[container->noOfPackedWords];
+ container->noOfPackedWords += len;
dst[0] = Tdata[0] | (ZFIRE_TRIG_CONF << 28);
dst[1] = Tdata[1];
dst[2] = Tdata[2];
=== modified file 'storage/ndb/src/kernel/blocks/dbtc/Dbtc.hpp'
--- a/storage/ndb/src/kernel/blocks/dbtc/Dbtc.hpp 2012-09-12 15:11:40 +0000
+++ b/storage/ndb/src/kernel/blocks/dbtc/Dbtc.hpp 2012-10-17 14:43:50 +0000
@@ -398,6 +398,10 @@ public:
AttributeBuffer::DataBufferPool c_theAttributeBufferPool;
+ typedef DataBuffer<5> CommitAckMarkerBuffer;
+
+ CommitAckMarkerBuffer::DataBufferPool c_theCommitAckMarkerBufferPool;
+
UintR c_transactionBufferSpace;
@@ -998,14 +1002,11 @@ public:
/* THIS RECORD IS ALIGNED TO BE 128 BYTES. */
/********************************************************/
struct HostRecord {
+ struct PackedWordsContainer lqh_pack[MAX_NDBMT_LQH_THREADS+1];
+ struct PackedWordsContainer packTCKEYCONF;
HostState hostStatus;
LqhTransState lqhTransStatus;
bool inPackedList;
- UintR noOfPackedWordsLqh;
- UintR packedWordsLqh[26];
- UintR noOfWordsTCKEYCONF;
- UintR packedWordsTCKEYCONF[30];
- BlockReference hostLqhBlockRef;
enum NodeFailBits
{
@@ -1017,7 +1018,7 @@ public:
};
Uint32 m_nf_bits;
NdbNodeBitmask m_lqh_trans_conf;
- }; /* p2c: size = 128 bytes */
+ };
typedef Ptr<HostRecord> HostRecordPtr;
@@ -1454,7 +1455,8 @@ private:
void sendPackedTCKEYCONF(Signal* signal,
HostRecord * ahostptr,
UintR hostId);
- void sendPackedSignalLqh(Signal* signal, HostRecord * ahostptr);
+ void sendPackedSignal(Signal* signal,
+ struct PackedWordsContainer * container);
Uint32 sendCommitLqh(Signal* signal,
TcConnectRecord * const regTcPtr);
Uint32 sendCompleteLqh(Signal* signal,
@@ -1473,14 +1475,14 @@ private:
void timeOutFoundFragLab(Signal* signal, Uint32 TscanConPtr);
void timeOutLoopStartFragLab(Signal* signal, Uint32 TscanConPtr);
int releaseAndAbort(Signal* signal);
- void findApiConnectFail(Signal* signal);
+ void findApiConnectFail(Signal* signal, Uint32 instanceKey);
void findTcConnectFail(Signal* signal, Uint32 instanceKey);
- void initApiConnectFail(Signal* signal);
+ void initApiConnectFail(Signal* signal, Uint32 instanceKey);
void initTcConnectFail(Signal* signal, Uint32 instanceKey);
void initTcFail(Signal* signal);
void releaseTakeOver(Signal* signal);
void setupFailData(Signal* signal);
- void updateApiStateFail(Signal* signal);
+ void updateApiStateFail(Signal* signal, Uint32 instanceKey);
void updateTcStateFail(Signal* signal, Uint32 instanceKey);
void handleApiFailState(Signal* signal, UintR anApiConnectptr);
void handleFailedApiNode(Signal* signal,
@@ -1975,7 +1977,7 @@ public:
Uint32 prevHash;
Uint32 apiConnectPtr;
Uint16 apiNodeId;
- NdbNodeBitmask m_commit_ack_marker_nodes;
+ CommitAckMarkerBuffer::Head theDataBuffer;
inline bool equal(const CommitAckMarker & p) const {
return ((p.transid1 == transid1) && (p.transid2 == transid2));
@@ -1984,6 +1986,9 @@ public:
inline Uint32 hashValue() const {
return transid1;
}
+ bool insert_in_commit_ack_marker(Dbtc *tc,
+ Uint32 instanceKey,
+ NodeId nodeId);
};
private:
@@ -1995,9 +2000,10 @@ private:
RSS_AP_SNAPSHOT(m_commitAckMarkerPool);
void execTC_COMMIT_ACK(Signal* signal);
- void sendRemoveMarkers(Signal*, const CommitAckMarker *);
+ void sendRemoveMarkers(Signal*, CommitAckMarker *);
void sendRemoveMarker(Signal* signal,
NodeId nodeId,
+ Uint32 instanceKey,
Uint32 transid1,
Uint32 transid2);
void removeMarkerForFailedAPI(Signal* signal, Uint32 nodeId, Uint32 bucket);
=== modified file 'storage/ndb/src/kernel/blocks/dbtc/DbtcInit.cpp'
--- a/storage/ndb/src/kernel/blocks/dbtc/DbtcInit.cpp 2011-07-05 12:46:07 +0000
+++ b/storage/ndb/src/kernel/blocks/dbtc/DbtcInit.cpp 2012-10-17 14:43:50 +0000
@@ -116,6 +116,7 @@ void Dbtc::initRecords()
m_commitAckMarkerPool.setSize(2 * capiConnectFilesize);
m_commitAckMarkerHash.setSize(1024);
+ c_theCommitAckMarkerBufferPool.setSize(4 * capiConnectFilesize);
hostRecord = (HostRecord*)allocRecord("HostRecord",
sizeof(HostRecord),
=== modified file 'storage/ndb/src/kernel/blocks/dbtc/DbtcMain.cpp'
--- a/storage/ndb/src/kernel/blocks/dbtc/DbtcMain.cpp 2012-09-25 11:33:57 +0000
+++ b/storage/ndb/src/kernel/blocks/dbtc/DbtcMain.cpp 2012-10-17 14:43:50 +0000
@@ -398,17 +398,6 @@ void Dbtc::execINCL_NODEREQ(Signal* sign
}
Uint32 Tnode = hostptr.i;
- Uint32 lqhWorkers = getNodeInfo(Tnode).m_lqh_workers;
- if (lqhWorkers == 1)
- {
- jam();
- hostptr.p->hostLqhBlockRef = numberToRef(DBLQH, 1, Tnode);
- }
- else
- {
- jam();
- hostptr.p->hostLqhBlockRef = numberToRef(DBLQH, Tnode);
- }
sendSignal(tblockref, GSN_INCL_NODECONF, signal, 2, JBB);
@@ -894,16 +883,6 @@ void Dbtc::execREAD_NODESCONF(Signal* si
con_lineNodes++;
hostptr.p->hostStatus = HS_ALIVE;
c_alive_nodes.set(i);
- if (getNodeInfo(i).m_lqh_workers == 1)
- {
- jam();
- hostptr.p->hostLqhBlockRef = numberToRef(DBLQH, 1, i);
- }
- else
- {
- jam();
- hostptr.p->hostLqhBlockRef = numberToRef(DBLQH, i);
- }
if (!ndbd_deferred_unique_constraints(getNodeInfo(i).m_version))
{
jam();
@@ -3066,11 +3045,14 @@ void Dbtc::execTCKEYREQ(Signal* signal)
{
regTcPtr->commitAckMarker = tmp.i;
regApiPtr->commitAckMarker = tmp.i;
+ new (tmp.p) CommitAckMarker();
tmp.p->transid1 = tcKeyReq->transId1;
tmp.p->transid2 = tcKeyReq->transId2;
tmp.p->apiNodeId = refToNode(regApiPtr->ndbapiBlockref);
tmp.p->apiConnectPtr = TapiIndex;
- tmp.p->m_commit_ack_marker_nodes.clear();
+ CommitAckMarkerBuffer::DataBufferPool & pool =
+ c_theCommitAckMarkerBufferPool;
+ LocalDataBuffer<5> head(pool, tmp.p->theDataBuffer);
#if defined VM_TRACE || defined ERROR_INSERT
{
CommitAckMarkerPtr check;
@@ -3822,7 +3804,6 @@ void Dbtc::sendlqhkeyreq(Signal* signal,
handle.m_ptr[ LqhKeyReq::AttrInfoSectionNum ]= attrInfoSection;
handle.m_cnt= 2;
}
-
sendSignal(TBRef, GSN_LQHKEYREQ, signal,
nextPos + LqhKeyReq::FixedSignalLength, JBB,
&handle);
@@ -4262,6 +4243,17 @@ void Dbtc::execSIGNAL_DROPPED_REP(Signal
return;
}
+bool
+Dbtc::CommitAckMarker::insert_in_commit_ack_marker(Dbtc *tc,
+ Uint32 instanceKey,
+ NodeId node_id)
+{
+ Uint32 item = instanceKey + (node_id << 16);
+ CommitAckMarkerBuffer::DataBufferPool & pool =
+ tc->c_theCommitAckMarkerBufferPool;
+ LocalDataBuffer<5> tmp(pool, this->theDataBuffer);
+ return tmp.append(&item, (Uint32)1);
+}
void Dbtc::execLQHKEYCONF(Signal* signal)
{
@@ -4408,7 +4400,6 @@ void Dbtc::execLQHKEYCONF(Signal* signal
Uint32 commitAckMarker = regTcPtr->commitAckMarker;
regTcPtr->commitAckMarker = RNIL;
setApiConTimer(apiConnectptr.i, TtcTimer, __LINE__);
-
if (commitAckMarker != RNIL)
{
const Uint32 noOfLqhs = regTcPtr->noOfNodes;
@@ -4419,7 +4410,16 @@ void Dbtc::execLQHKEYCONF(Signal* signal
* Populate LQH array
*/
for(Uint32 i = 0; i < noOfLqhs; i++)
- tmp->m_commit_ack_marker_nodes.set(regTcPtr->tcNodedata[i]);
+ {
+ jam();
+ if (!tmp->insert_in_commit_ack_marker(this,
+ regTcPtr->lqhInstanceKey,
+ regTcPtr->tcNodedata[i]))
+ {
+ ndbout_c("Failed insert_in_commit_ack_marker");
+ ; //RONM TODO error handling
+ }
+ }
}
if (regTcPtr->isIndexOp(regTcPtr->m_special_op_flags)) {
jam();
@@ -4743,7 +4743,8 @@ void Dbtc::sendtckeyconf(Signal* signal,
const BlockNumber TblockNum = refToBlock(regApiPtr->ndbapiBlockref);
const Uint32 Tmarker = (regApiPtr->commitAckMarker == RNIL) ? 0 : 1;
ptrAss(localHostptr, hostRecord);
- UintR TcurrLen = localHostptr.p->noOfWordsTCKEYCONF;
+ struct PackedWordsContainer * container = &localHostptr.p->packTCKEYCONF;
+ UintR TcurrLen = container->noOfPackedWords;
UintR confInfo = 0;
TcKeyConf::setCommitFlag(confInfo, TcommitFlag == 1);
TcKeyConf::setMarkerFlag(confInfo, Tmarker);
@@ -4779,7 +4780,7 @@ void Dbtc::sendtckeyconf(Signal* signal,
tc_clearbit(regApiPtr->m_flags, ApiConnectRecord::TF_EXEC_FLAG);
}
TcKeyConf::setNoOfOperations(confInfo, (TopWords >> 1));
- if ((TpacketLen + 1 /** gci_lo */ > 25) || !is_api){
+ if ((TpacketLen + 1 /** gci_lo */ > 25) ||!is_api){
TcKeyConf * const tcKeyConf = (TcKeyConf *)signal->getDataPtrSend();
jam();
@@ -4810,6 +4811,8 @@ void Dbtc::sendtckeyconf(Signal* signal,
// length - 3, since we have the real signal length plus one additional word
// for the header we have to do - 4.
// -------------------------------------------------------------------------
+ container->noOfPackedWords = TcurrLen + TpacketLen + 1 /** gci_lo */;
+
UintR Tpack0 = (TblockNum << 16) + (TpacketLen - 4 + 1 /** gci_lo */);
UintR Tpack1 = regApiPtr->ndbapiConnect;
UintR Tpack2 = Uint32(regApiPtr->globalcheckpointid >> 32);
@@ -4818,21 +4821,18 @@ void Dbtc::sendtckeyconf(Signal* signal,
UintR Tpack5 = regApiPtr->transid[1];
UintR Tpack6 = Uint32(regApiPtr->globalcheckpointid);
- localHostptr.p->noOfWordsTCKEYCONF = TcurrLen + TpacketLen + 1 /** gci_lo */;
-
- localHostptr.p->packedWordsTCKEYCONF[TcurrLen + 0] = Tpack0;
- localHostptr.p->packedWordsTCKEYCONF[TcurrLen + 1] = Tpack1;
- localHostptr.p->packedWordsTCKEYCONF[TcurrLen + 2] = Tpack2;
- localHostptr.p->packedWordsTCKEYCONF[TcurrLen + 3] = Tpack3;
- localHostptr.p->packedWordsTCKEYCONF[TcurrLen + 4] = Tpack4;
- localHostptr.p->packedWordsTCKEYCONF[TcurrLen + 5] = Tpack5;
+ container->packedWords[TcurrLen + 0] = Tpack0;
+ container->packedWords[TcurrLen + 1] = Tpack1;
+ container->packedWords[TcurrLen + 2] = Tpack2;
+ container->packedWords[TcurrLen + 3] = Tpack3;
+ container->packedWords[TcurrLen + 4] = Tpack4;
+ container->packedWords[TcurrLen + 5] = Tpack5;
+
+ container->packedWords[TcurrLen + TpacketLen] = Tpack6;
- UintR Ti;
- for (Ti = 6; Ti < TpacketLen; Ti++) {
- localHostptr.p->packedWordsTCKEYCONF[TcurrLen + Ti] =
- regApiPtr->tcSendArray[Ti - 6];
+ for (Uint32 Ti = 6; Ti < TpacketLen; Ti++) {
+ container->packedWords[TcurrLen + Ti] = regApiPtr->tcSendArray[Ti - 6];
}//for
- localHostptr.p->packedWordsTCKEYCONF[TcurrLen + TpacketLen] = Tpack6;
if (unlikely(!ndb_check_micro_gcp(getNodeInfo(localHostptr.i).m_version)))
{
@@ -4871,17 +4871,21 @@ void Dbtc::execSEND_PACKED(Signal* signa
UintR TpackedListIndex = cpackedListIndex;
jamEntry();
for (i = 0; i < TpackedListIndex; i++) {
+ jam();
Thostptr.i = cpackedList[i];
ptrAss(Thostptr, localHostRecord);
arrGuard(Thostptr.i - 1, MAX_NODES - 1);
- UintR TnoOfPackedWordsLqh = Thostptr.p->noOfPackedWordsLqh;
- UintR TnoOfWordsTCKEYCONF = Thostptr.p->noOfWordsTCKEYCONF;
- jam();
- if (TnoOfPackedWordsLqh > 0) {
+ for (Uint32 j = 0; j < NDB_ARRAY_SIZE(Thostptr.p->lqh_pack); j++)
+ {
+ struct PackedWordsContainer * container = &Thostptr.p->lqh_pack[j];
jam();
- sendPackedSignalLqh(signal, Thostptr.p);
- }//if
- if (TnoOfWordsTCKEYCONF > 0) {
+ if (container->noOfPackedWords > 0) {
+ jam();
+ sendPackedSignal(signal, container);
+ }
+ }
+ struct PackedWordsContainer * container = &Thostptr.p->packTCKEYCONF;
+ if (container->noOfPackedWords > 0) {
jam();
sendPackedTCKEYCONF(signal, Thostptr.p, (Uint32)Thostptr.i);
}//if
@@ -4903,46 +4907,30 @@ Dbtc::updatePackedList(Signal* signal, H
}//if
}//Dbtc::updatePackedList()
-void Dbtc::sendPackedSignalLqh(Signal* signal, HostRecord * ahostptr)
+void Dbtc::sendPackedSignal(Signal* signal,
+ struct PackedWordsContainer * container)
{
- UintR Tj;
- UintR TnoOfWords = ahostptr->noOfPackedWordsLqh;
- for (Tj = 0; Tj < TnoOfWords; Tj += 4) {
- UintR sig0 = ahostptr->packedWordsLqh[Tj + 0];
- UintR sig1 = ahostptr->packedWordsLqh[Tj + 1];
- UintR sig2 = ahostptr->packedWordsLqh[Tj + 2];
- UintR sig3 = ahostptr->packedWordsLqh[Tj + 3];
- signal->theData[Tj + 0] = sig0;
- signal->theData[Tj + 1] = sig1;
- signal->theData[Tj + 2] = sig2;
- signal->theData[Tj + 3] = sig3;
- }//for
- ahostptr->noOfPackedWordsLqh = 0;
- sendSignal(ahostptr->hostLqhBlockRef,
+ UintR TnoOfWords = container->noOfPackedWords;
+ ndbassert(TnoOfWords <= 25);
+ container->noOfPackedWords = 0;
+ memcpy(&signal->theData[0], &container->packedWords[0], 4 * TnoOfWords);
+ sendSignal(container->hostBlockRef,
GSN_PACKED_SIGNAL,
signal,
TnoOfWords,
JBB);
-}//Dbtc::sendPackedSignalLqh()
+}//Dbtc::sendPackedSignal()
void Dbtc::sendPackedTCKEYCONF(Signal* signal,
HostRecord * ahostptr,
UintR hostId)
{
- UintR Tj;
- UintR TnoOfWords = ahostptr->noOfWordsTCKEYCONF;
+ struct PackedWordsContainer * container = &ahostptr->packTCKEYCONF;
+ UintR TnoOfWords = container->noOfPackedWords;
+ ndbassert(TnoOfWords <= 25);
+ container->noOfPackedWords = 0;
BlockReference TBref = numberToRef(API_PACKED, hostId);
- for (Tj = 0; Tj < ahostptr->noOfWordsTCKEYCONF; Tj += 4) {
- UintR sig0 = ahostptr->packedWordsTCKEYCONF[Tj + 0];
- UintR sig1 = ahostptr->packedWordsTCKEYCONF[Tj + 1];
- UintR sig2 = ahostptr->packedWordsTCKEYCONF[Tj + 2];
- UintR sig3 = ahostptr->packedWordsTCKEYCONF[Tj + 3];
- signal->theData[Tj + 0] = sig0;
- signal->theData[Tj + 1] = sig1;
- signal->theData[Tj + 2] = sig2;
- signal->theData[Tj + 3] = sig3;
- }//for
- ahostptr->noOfWordsTCKEYCONF = 0;
+ memcpy(&signal->theData[0], &container->packedWords[0], 4 * TnoOfWords);
sendSignal(TBref, GSN_TCKEYCONF, signal, TnoOfWords, JBB);
}//Dbtc::sendPackedTCKEYCONF()
@@ -5330,6 +5318,7 @@ Dbtc::sendCommitLqh(Signal* signal,
{
HostRecordPtr Thostptr;
UintR ThostFilesize = chostFilesize;
+ Uint32 instanceKey = regTcPtr->lqhInstanceKey;
ApiConnectRecord * const regApiPtr = apiConnectptr.p;
Thostptr.i = regTcPtr->lastLqhNodeId;
ptrCheckGuard(Thostptr, ThostFilesize, hostRecord);
@@ -5352,20 +5341,18 @@ Dbtc::sendCommitLqh(Signal* signal,
ndbassert(Tdata[4] == 0 || getNodeInfo(Thostptr.i).m_version == 0);
len = 4;
}
-
- // currently packed signal cannot address specific instance
- const bool send_unpacked = getNodeInfo(Thostptr.i).m_lqh_workers > 1;
- if (send_unpacked) {
+ if (instanceKey > MAX_NDBMT_LQH_THREADS) {
memcpy(&signal->theData[0], &Tdata[0], len << 2);
- Uint32 instanceKey = regTcPtr->lqhInstanceKey;
BlockReference lqhRef = numberToRef(DBLQH, instanceKey, Tnode);
sendSignal(lqhRef, GSN_COMMIT, signal, len, JBB);
return ret;
}
- if (Thostptr.p->noOfPackedWordsLqh > 25 - 5) {
+ struct PackedWordsContainer * container = &Thostptr.p->lqh_pack[instanceKey];
+
+ if (container->noOfPackedWords > 25 - len) {
jam();
- sendPackedSignalLqh(signal, Thostptr.p);
+ sendPackedSignal(signal, container);
} else {
jam();
ret = 1;
@@ -5373,10 +5360,10 @@ Dbtc::sendCommitLqh(Signal* signal,
}
Tdata[0] |= (ZCOMMIT << 28);
- UintR Tindex = Thostptr.p->noOfPackedWordsLqh;
- UintR* TDataPtr = &Thostptr.p->packedWordsLqh[Tindex];
+ UintR Tindex = container->noOfPackedWords;
+ container->noOfPackedWords = Tindex + len;
+ UintR* TDataPtr = &container->packedWords[Tindex];
memcpy(TDataPtr, &Tdata[0], len << 2);
- Thostptr.p->noOfPackedWordsLqh = Tindex + len;
return ret;
}
@@ -5726,8 +5713,9 @@ Dbtc::sendCompleteLqh(Signal* signal,
{
HostRecordPtr Thostptr;
UintR ThostFilesize = chostFilesize;
+ Uint32 instanceKey = regTcPtr->lqhInstanceKey;
ApiConnectRecord * const regApiPtr = apiConnectptr.p;
- Thostptr.i = regTcPtr->lastLqhNodeId; //last???
+ Thostptr.i = regTcPtr->lastLqhNodeId;
ptrCheckGuard(Thostptr, ThostFilesize, hostRecord);
Uint32 Tnode = Thostptr.i;
@@ -5740,19 +5728,18 @@ Dbtc::sendCompleteLqh(Signal* signal,
Tdata[2] = regApiPtr->transid[1];
Uint32 len = 3;
- // currently packed signal cannot address specific instance
- const bool send_unpacked = getNodeInfo(Thostptr.i).m_lqh_workers > 1;
- if (send_unpacked) {
+ if (instanceKey > MAX_NDBMT_LQH_THREADS) {
memcpy(&signal->theData[0], &Tdata[0], len << 2);
- Uint32 instanceKey = regTcPtr->lqhInstanceKey;
BlockReference lqhRef = numberToRef(DBLQH, instanceKey, Tnode);
sendSignal(lqhRef, GSN_COMPLETE, signal, 3, JBB);
return ret;
}
-
- if (Thostptr.p->noOfPackedWordsLqh > 22) {
+
+ struct PackedWordsContainer * container = &Thostptr.p->lqh_pack[instanceKey];
+
+ if (container->noOfPackedWords > 22) {
jam();
- sendPackedSignalLqh(signal, Thostptr.p);
+ sendPackedSignal(signal, container);
} else {
jam();
ret = 1;
@@ -5760,11 +5747,10 @@ Dbtc::sendCompleteLqh(Signal* signal,
}
Tdata[0] |= (ZCOMPLETE << 28);
- UintR Tindex = Thostptr.p->noOfPackedWordsLqh;
- UintR* TDataPtr = &Thostptr.p->packedWordsLqh[Tindex];
+ UintR Tindex = container->noOfPackedWords;
+ container->noOfPackedWords = Tindex + len;
+ UintR* TDataPtr = &container->packedWords[Tindex];
memcpy(TDataPtr, &Tdata[0], len << 2);
- Thostptr.p->noOfPackedWordsLqh = Tindex + len;
-
return ret;
}
@@ -5852,6 +5838,7 @@ Dbtc::sendFireTrigReqLqh(Signal* signal,
{
HostRecordPtr Thostptr;
UintR ThostFilesize = chostFilesize;
+ Uint32 instanceKey = regTcPtr.p->lqhInstanceKey;
ApiConnectRecord * const regApiPtr = apiConnectptr.p;
Thostptr.i = regTcPtr.p->tcNodedata[0];
ptrCheckGuard(Thostptr, ThostFilesize, hostRecord);
@@ -5868,19 +5855,18 @@ Dbtc::sendFireTrigReqLqh(Signal* signal,
req->pass = pass;
Uint32 len = FireTrigReq::SignalLength;
- // currently packed signal cannot address specific instance
- const bool send_unpacked = getNodeInfo(Thostptr.i).m_lqh_workers > 1;
- if (send_unpacked) {
+ if (instanceKey > MAX_NDBMT_LQH_THREADS) {
memcpy(signal->theData, Tdata, len << 2);
- Uint32 instanceKey = regTcPtr.p->lqhInstanceKey;
BlockReference lqhRef = numberToRef(DBLQH, instanceKey, Tnode);
sendSignal(lqhRef, GSN_FIRE_TRIG_REQ, signal, len, JBB);
return ret;
}
- if (Thostptr.p->noOfPackedWordsLqh > 25 - len) {
+ struct PackedWordsContainer * container = &Thostptr.p->lqh_pack[instanceKey];
+
+ if (container->noOfPackedWords > 25 - len) {
jam();
- sendPackedSignalLqh(signal, Thostptr.p);
+ sendPackedSignal(signal, container);
} else {
jam();
ret = 1;
@@ -5888,10 +5874,10 @@ Dbtc::sendFireTrigReqLqh(Signal* signal,
}
Tdata[0] |= (ZFIRE_TRIG_REQ << 28);
- UintR Tindex = Thostptr.p->noOfPackedWordsLqh;
- UintR* TDataPtr = &Thostptr.p->packedWordsLqh[Tindex];
+ UintR Tindex = container->noOfPackedWords;
+ container->noOfPackedWords = Tindex + len;
+ UintR* TDataPtr = &container->packedWords[Tindex];
memcpy(TDataPtr, Tdata, len << 2);
- Thostptr.p->noOfPackedWordsLqh = Tindex + len;
return ret;
}
@@ -6029,23 +6015,33 @@ Dbtc::execTC_COMMIT_ACK(Signal* signal){
}
void
-Dbtc::sendRemoveMarkers(Signal* signal, const CommitAckMarker * marker)
+Dbtc::sendRemoveMarkers(Signal* signal, CommitAckMarker * marker)
{
jam();
const Uint32 transId1 = marker->transid1;
const Uint32 transId2 = marker->transid2;
-
- for(Uint32 node_id = 1; node_id < MAX_NDB_NODES; node_id++)
+
+ CommitAckMarkerBuffer::DataBufferPool & pool =
+ c_theCommitAckMarkerBufferPool;
+ LocalDataBuffer<5> commitAckMarkers(pool, marker->theDataBuffer);
+ CommitAckMarkerBuffer::DataBufferIterator iter;
+ bool next_flag = commitAckMarkers.first(iter);
+ while (next_flag)
{
jam();
- if (marker->m_commit_ack_marker_nodes.get(node_id))
- sendRemoveMarker(signal, node_id, transId1, transId2);
+ Uint32 dataWord = *iter.data;
+ Uint32 nodeId = dataWord >> 16;
+ Uint32 instanceKey = dataWord & 0xFFFF;
+ sendRemoveMarker(signal, nodeId, instanceKey, transId1, transId2);
+ next_flag = commitAckMarkers.next(iter, 1);
}
+ commitAckMarkers.release();
}
void
Dbtc::sendRemoveMarker(Signal* signal,
NodeId nodeId,
+ Uint32 instanceKey,
Uint32 transid1,
Uint32 transid2){
/**
@@ -6062,38 +6058,32 @@ Dbtc::sendRemoveMarker(Signal* signal,
Tdata[2] = transid2;
Uint32 len = 3;
- // currently packed signals can not address specific instance
- Uint32 cnt_workers = getNodeInfo(hostPtr.i).m_lqh_workers;
- bool send_unpacked = cnt_workers > 1;
- if (send_unpacked) {
+ if (instanceKey > MAX_NDBMT_LQH_THREADS) {
jam();
// first word omitted
memcpy(&signal->theData[0], &Tdata[1], (len - 1) << 2);
Uint32 Tnode = hostPtr.i;
- Uint32 i;
- for (i = 0; i < cnt_workers; i++) {
- // wl4391_todo skip workers not part of tx
- Uint32 instanceKey = 1 + i;
- BlockReference ref = numberToRef(DBLQH, instanceKey, Tnode);
- sendSignal(ref, GSN_REMOVE_MARKER_ORD, signal, len - 1, JBB);
- }
+ BlockReference ref = numberToRef(DBLQH, instanceKey, Tnode);
+ sendSignal(ref, GSN_REMOVE_MARKER_ORD, signal, len - 1, JBB);
return;
}
- if (hostPtr.p->noOfPackedWordsLqh > (25 - 3)){
+ struct PackedWordsContainer * container = &hostPtr.p->lqh_pack[instanceKey];
+
+ if (container->noOfPackedWords > (25 - 3)){
jam();
- sendPackedSignalLqh(signal, hostPtr.p);
+ sendPackedSignal(signal, container);
} else {
jam();
updatePackedList(signal, hostPtr.p, hostPtr.i);
}
- UintR numWord = hostPtr.p->noOfPackedWordsLqh;
- UintR* dataPtr = &hostPtr.p->packedWordsLqh[numWord];
+ UintR numWord = container->noOfPackedWords;
+ UintR* dataPtr = &container->packedWords[numWord];
+ container->noOfPackedWords = numWord + len;
Tdata[0] |= (ZREMOVE_MARKER << 28);
memcpy(dataPtr, &Tdata[0], len << 2);
- hostPtr.p->noOfPackedWordsLqh = numWord + 3;
}
void Dbtc::execCOMPLETED(Signal* signal)
@@ -6510,6 +6500,11 @@ void Dbtc::clearCommitAckMarker(ApiConne
regApiPtr->commitAckMarker = RNIL;
tc_clearbit(regApiPtr->m_flags,
ApiConnectRecord::TF_COMMIT_ACK_MARKER_RECEIVED);
+ CommitAckMarkerBuffer::DataBufferPool & pool =
+ c_theCommitAckMarkerBufferPool;
+ CommitAckMarker * tmp = m_commitAckMarkerHash.getPtr(commitAckMarker);
+ LocalDataBuffer<5> commitAckMarkers(pool, tmp->theDataBuffer);
+ commitAckMarkers.release();
m_commitAckMarkerHash.release(commitAckMarker);
}
}
@@ -8919,7 +8914,19 @@ void Dbtc::execLQH_TRANSCONF(Signal* sig
}
}
- findApiConnectFail(signal);
+ Uint32 instanceKey;
+
+ if (unlikely(signal->getLength() < LqhTransConf::SignalLength_FRAG_ID))
+ {
+ jam();
+ instanceKey = 0;
+ }
+ else
+ {
+ jam();
+ instanceKey = getInstanceKey(tableId, fragId);
+ }
+ findApiConnectFail(signal, instanceKey);
if(apiConnectptr.p->ndbapiBlockref == 0 && tapplRef != 0){
apiConnectptr.p->ndbapiBlockref = ref;
@@ -8929,19 +8936,6 @@ void Dbtc::execLQH_TRANSCONF(Signal* sig
if (ttransStatus != LqhTransConf::Marker)
{
jam();
-
- Uint32 instanceKey;
-
- if (unlikely(signal->getLength() < LqhTransConf::SignalLength_FRAG_ID))
- {
- jam();
- instanceKey = 0;
- }
- else
- {
- jam();
- instanceKey = getInstanceKey(tableId, fragId);
- }
findTcConnectFail(signal, instanceKey);
}
}//Dbtc::execLQH_TRANSCONF()
@@ -9217,6 +9211,11 @@ Dbtc::sendTCKEY_FAILREF(Signal* signal,
if(marker != RNIL)
{
jam();
+ CommitAckMarkerBuffer::DataBufferPool & pool =
+ c_theCommitAckMarkerBufferPool;
+ CommitAckMarker * tmp = m_commitAckMarkerHash.getPtr(marker);
+ LocalDataBuffer<5> commitAckMarkers(pool, tmp->theDataBuffer);
+ commitAckMarkers.release();
m_commitAckMarkerHash.release(marker);
regApiPtr->commitAckMarker = RNIL;
}
@@ -9767,7 +9766,7 @@ void Dbtc::toCompleteHandlingLab(Signal*
/* YET THEN SEIZE A NEW API CONNECT RECORD AND LINK IT */
/* INTO THE HASH TABLE. */
/*------------------------------------------------------------*/
-void Dbtc::findApiConnectFail(Signal* signal)
+void Dbtc::findApiConnectFail(Signal* signal, Uint32 instanceKey)
{
ApiConnectRecordPtr fafPrevApiConnectptr;
ApiConnectRecordPtr fafNextApiConnectptr;
@@ -9798,7 +9797,7 @@ FAF_LOOP:
fafPrevApiConnectptr.p->nextApiConnect = apiConnectptr.i;
}//if
apiConnectptr.p->nextApiConnect = RNIL;
- initApiConnectFail(signal);
+ initApiConnectFail(signal, instanceKey);
} else {
jam();
fafPrevApiConnectptr.i = fafNextApiConnectptr.i;
@@ -9811,7 +9810,7 @@ FAF_LOOP:
(apiConnectptr.p->transid[0] != ttransid1)) {
goto FAF_LOOP;
}//if
- updateApiStateFail(signal);
+ updateApiStateFail(signal, instanceKey);
}//if
}//Dbtc::findApiConnectFail()
@@ -9854,7 +9853,7 @@ void Dbtc::findTcConnectFail(Signal* sig
/*----------------------------------------------------------*/
/* INITIALISE AN API CONNECT FAIL RECORD */
/*----------------------------------------------------------*/
-void Dbtc::initApiConnectFail(Signal* signal)
+void Dbtc::initApiConnectFail(Signal* signal, Uint32 instanceKey)
{
apiConnectptr.p->transid[0] = ttransid1;
apiConnectptr.p->transid[1] = ttransid2;
@@ -9899,14 +9898,17 @@ void Dbtc::initApiConnectFail(Signal* si
m_commitAckMarkerHash.seize(tmp);
ndbrequire(tmp.i != RNIL);
-
apiConnectptr.p->commitAckMarker = tmp.i;
+
+ new (tmp.p) CommitAckMarker();
tmp.p->transid1 = ttransid1;
tmp.p->transid2 = ttransid2;
tmp.p->apiNodeId = refToNode(tapplRef);
- tmp.p->m_commit_ack_marker_nodes.clear();
- tmp.p->m_commit_ack_marker_nodes.set(tnodeid);
tmp.p->apiConnectPtr = apiConnectptr.i;
+ CommitAckMarkerBuffer::DataBufferPool & pool =
+ c_theCommitAckMarkerBufferPool;
+ LocalDataBuffer<5> head(pool, tmp.p->theDataBuffer);
+ ndbrequire(tmp.p->insert_in_commit_ack_marker(this, instanceKey, tnodeid));
#if defined VM_TRACE || defined ERROR_INSERT
{
@@ -10052,7 +10054,7 @@ void Dbtc::setupFailData(Signal* signal)
/*----------------------------------------------------------*/
/* UPDATE THE STATE OF THE API CONNECT FOR THIS PART. */
/*----------------------------------------------------------*/
-void Dbtc::updateApiStateFail(Signal* signal)
+void Dbtc::updateApiStateFail(Signal* signal, Uint32 instanceKey)
{
if(LqhTransConf::getMarkerFlag(treqinfo))
{
@@ -10065,12 +10067,15 @@ void Dbtc::updateApiStateFail(Signal* si
m_commitAckMarkerHash.seize(tmp);
ndbrequire(tmp.i != RNIL);
+ new (tmp.p) CommitAckMarker();
apiConnectptr.p->commitAckMarker = tmp.i;
tmp.p->transid1 = ttransid1;
tmp.p->transid2 = ttransid2;
tmp.p->apiNodeId = refToNode(tapplRef);
tmp.p->apiConnectPtr = apiConnectptr.i;
- tmp.p->m_commit_ack_marker_nodes.clear();
+ CommitAckMarkerBuffer::DataBufferPool & pool =
+ c_theCommitAckMarkerBufferPool;
+ LocalDataBuffer<5> head(pool, tmp.p->theDataBuffer);
#if defined VM_TRACE || defined ERROR_INSERT
{
CommitAckMarkerPtr check;
@@ -10086,7 +10091,7 @@ void Dbtc::updateApiStateFail(Signal* si
ndbassert(tmp.p->transid1 == ttransid1);
ndbassert(tmp.p->transid2 == ttransid2);
}
- tmp.p->m_commit_ack_marker_nodes.set(tnodeid);
+ ndbrequire(tmp.p->insert_in_commit_ack_marker(this, instanceKey, tnodeid));
}
switch (ttransStatus) {
@@ -12614,9 +12619,15 @@ void Dbtc::inithost(Signal* signal)
hostptr.p->hostStatus = HS_DEAD;
hostptr.p->inPackedList = false;
hostptr.p->lqhTransStatus = LTS_IDLE;
- hostptr.p->noOfWordsTCKEYCONF = 0;
- hostptr.p->noOfPackedWordsLqh = 0;
- hostptr.p->hostLqhBlockRef = calcLqhBlockRef(hostptr.i);
+ struct PackedWordsContainer * containerTCKEYCONF =
+ &hostptr.p->packTCKEYCONF;
+ containerTCKEYCONF->noOfPackedWords = 0;
+ for (Uint32 i = 0; i < NDB_ARRAY_SIZE(hostptr.p->lqh_pack); i++)
+ {
+ struct PackedWordsContainer * container = &hostptr.p->lqh_pack[i];
+ container->noOfPackedWords = 0;
+ container->hostBlockRef = numberToRef(DBLQH, i, hostptr.i);
+ }
hostptr.p->m_nf_bits = 0;
}//for
c_alive_nodes.clear();
@@ -12875,6 +12886,11 @@ void Dbtc::releaseAbortResources(Signal*
if (marker != RNIL)
{
jam();
+ CommitAckMarkerBuffer::DataBufferPool & pool =
+ c_theCommitAckMarkerBufferPool;
+ CommitAckMarker * tmp = m_commitAckMarkerHash.getPtr(marker);
+ LocalDataBuffer<5> commitAckMarkers(pool, tmp->theDataBuffer);
+ commitAckMarkers.release();
m_commitAckMarkerHash.release(marker);
apiConnectptr.p->commitAckMarker = RNIL;
}
@@ -13131,16 +13147,27 @@ Dbtc::execDUMP_STATE_ORD(Signal* signal)
CommitAckMarkerIterator iter;
for(m_commitAckMarkerHash.first(iter); iter.curr.i != RNIL;
m_commitAckMarkerHash.next(iter)){
+ Uint32 data[4];
+ data[0] = data[1] = data[2] = data[3] = 0;
+ CommitAckMarkerBuffer::DataBufferPool & pool =
+ c_theCommitAckMarkerBufferPool;
+ LocalDataBuffer<5> commitAckMarkers(pool, iter.curr.p->theDataBuffer);
+ CommitAckMarkerBuffer::DataBufferIterator data_buf_iter;
+ bool next_flag = commitAckMarkers.first(data_buf_iter);
+ for (Uint32 i = 0; i < 4; i++)
+ {
+ if (!next_flag)
+ break;
+ data[i] = *data_buf_iter.data;
+ next_flag = commitAckMarkers.next(data_buf_iter);
+ }
infoEvent("CommitAckMarker: i = %d (0x%x, 0x%x)"
" Api: %d %x %x %x %x bucket = %d",
iter.curr.i,
iter.curr.p->transid1,
iter.curr.p->transid2,
iter.curr.p->apiNodeId,
- iter.curr.p->m_commit_ack_marker_nodes.getWord(0),
- iter.curr.p->m_commit_ack_marker_nodes.getWord(1),
- iter.curr.p->m_commit_ack_marker_nodes.getWord(2),
- iter.curr.p->m_commit_ack_marker_nodes.getWord(3),
+ data[0], data[1], data[2], data[3],
iter.bucket);
}
return;
=== modified file 'storage/ndb/src/kernel/vm/SimulatedBlock.hpp'
--- a/storage/ndb/src/kernel/vm/SimulatedBlock.hpp 2012-06-21 15:24:52 +0000
+++ b/storage/ndb/src/kernel/vm/SimulatedBlock.hpp 2012-10-17 14:43:50 +0000
@@ -103,6 +103,12 @@ struct Block_context
class Ndbd_mem_manager& m_mm;
};
+struct PackedWordsContainer
+{
+ BlockReference hostBlockRef;
+ Uint32 noOfPackedWords;
+ Uint32 packedWords[30];
+}; // 128 bytes
class SimulatedBlock {
friend class TraceLCP;
friend class SafeCounter;
No bundle (reason: useless for push emails).
| Thread |
|---|
| • bzr push into mysql-5.5-cluster-7.2 branch (pekka.nousiainen:4039) | Pekka Nousiainen | 18 Oct |