2875 Jonas Oreland 2008-10-02
ndbmtd - fix ndb_add_partition, 1) fix thread unsafty in dbtupbuffer 2) fix various
minor tc/dih bugs 3) fix correct hashmap/fragments for unique index 4) detach LQHFRAGREQ
from CREATE_TABLE in proxy
modified:
storage/ndb/include/kernel/signaldata/DiGetNodes.hpp
storage/ndb/include/kernel/signaldata/DictTabInfo.hpp
storage/ndb/src/kernel/blocks/backup/Backup.cpp
storage/ndb/src/kernel/blocks/dbdict/Dbdict.cpp
storage/ndb/src/kernel/blocks/dbdih/DbdihMain.cpp
storage/ndb/src/kernel/blocks/dblqh/DblqhMain.cpp
storage/ndb/src/kernel/blocks/dblqh/DblqhProxy.cpp
storage/ndb/src/kernel/blocks/dblqh/DblqhProxy.hpp
storage/ndb/src/kernel/blocks/dbtc/DbtcMain.cpp
storage/ndb/src/kernel/blocks/dbtup/DbtupBuffer.cpp
storage/ndb/src/kernel/blocks/suma/Suma.cpp
=== modified file 'storage/ndb/include/kernel/signaldata/DiGetNodes.hpp'
--- a/storage/ndb/include/kernel/signaldata/DiGetNodes.hpp 2008-08-11 11:44:42 +0000
+++ b/storage/ndb/include/kernel/signaldata/DiGetNodes.hpp 2008-10-02 21:58:44 +0000
@@ -32,13 +32,12 @@ struct DiGetNodesConf {
*/
friend class Dbdih;
- STATIC_CONST( SignalLength = 4 + MAX_REPLICAS );
+ STATIC_CONST( SignalLength = 3 + MAX_REPLICAS );
STATIC_CONST( REORG_MOVING = 0x80000000);
Uint32 zero;
Uint32 fragId;
Uint32 reqinfo;
- Uint32 instanceKey;
Uint32 nodes[MAX_REPLICAS]; //+1
};
/**
=== modified file 'storage/ndb/include/kernel/signaldata/DictTabInfo.hpp'
--- a/storage/ndb/include/kernel/signaldata/DictTabInfo.hpp 2008-06-05 20:25:12 +0000
+++ b/storage/ndb/include/kernel/signaldata/DictTabInfo.hpp 2008-10-02 21:58:44 +0000
@@ -183,8 +183,7 @@ public:
DistrKeyHash = 4,
DistrKeyLin = 5,
UserDefined = 6,
- DistrKeyUniqueHashIndex = 7,
- DistrKeyOrderedIndex = 8,
+ DistrKeyOrderedIndex = 8, // alias
HashMapPartition = 9
};
=== modified file 'storage/ndb/src/kernel/blocks/backup/Backup.cpp'
--- a/storage/ndb/src/kernel/blocks/backup/Backup.cpp 2008-10-02 17:51:17 +0000
+++ b/storage/ndb/src/kernel/blocks/backup/Backup.cpp 2008-10-02 21:58:44 +0000
@@ -3917,7 +3917,7 @@ Backup::execTRANSID_AI(Signal* signal)
const Uint32 filePtrI = signal->theData[0];
//const Uint32 transId1 = signal->theData[1];
//const Uint32 transId2 = signal->theData[2];
- const Uint32 dataLen = signal->length() - 3;
+ Uint32 dataLen = signal->length() - 3;
BackupFilePtr filePtr LINT_SET_PTR;
c_backupFilePool.getPtr(filePtr, filePtrI);
@@ -3927,14 +3927,30 @@ Backup::execTRANSID_AI(Signal* signal)
/**
* Unpack data
*/
+ Uint32 * dst = op.dst;
+ if (signal->getNoOfSections() == 0)
+ {
+ jam();
+ const Uint32 * src = &signal->theData[3];
+ * dst = htonl(dataLen);
+ memcpy(dst + 1, src, 4*dataLen);
+ }
+ else
+ {
+ jam();
+ SectionHandle handle(this, signal);
+ SegmentedSectionPtr dataPtr;
+ handle.getSection(dataPtr, 0);
+ dataLen = dataPtr.sz;
+
+ * dst = htonl(dataLen);
+ copy(dst + 1, dataPtr);
+ releaseSections(handle);
+ }
+
op.attrSzTotal += dataLen;
ndbrequire(dataLen < op.maxRecordSize);
- const Uint32 * src = &signal->theData[3];
- Uint32 * dst = op.dst;
-
- * dst = htonl(dataLen);
- memcpy(dst + 1, src, 4*dataLen);
op.finished(dataLen);
op.newRecord(dst + dataLen + 1);
=== modified file 'storage/ndb/src/kernel/blocks/dbdict/Dbdict.cpp'
--- a/storage/ndb/src/kernel/blocks/dbdict/Dbdict.cpp 2008-10-02 17:51:17 +0000
+++ b/storage/ndb/src/kernel/blocks/dbdict/Dbdict.cpp 2008-10-02 21:58:44 +0000
@@ -4277,8 +4277,9 @@ void Dbdict::handleTabInfoInit(SimplePro
if (fragments == 0)
{
jam();
- fragments = get_default_fragments();
+ tablePtr.p->fragmentCount = fragments = get_default_fragments();
}
+
char buf[MAX_TAB_NAME_SIZE+1];
BaseString::snprintf(buf, sizeof(buf), "DEFAULT-HASHMAP-%u-%u",
NDB_DEFAULT_HASHMAP_BUCKTETS,
@@ -4858,8 +4859,7 @@ Dbdict::create_fragmentation(Signal* sig
* and distributed in the same manner but has always a normal hash
* fragmentation.
*/
- frag_req->primaryTableId = tabPtr.p->primaryTableId;
- frag_req->fragmentationType = DictTabInfo::DistrKeyUniqueHashIndex;
+ frag_req->primaryTableId = RNIL;
}
else
{
=== modified file 'storage/ndb/src/kernel/blocks/dbdih/DbdihMain.cpp'
--- a/storage/ndb/src/kernel/blocks/dbdih/DbdihMain.cpp 2008-10-01 06:47:23 +0000
+++ b/storage/ndb/src/kernel/blocks/dbdih/DbdihMain.cpp 2008-10-02 21:58:44 +0000
@@ -1222,14 +1222,12 @@ void Dbdih::execREAD_CONFIG_REQ(Signal*
ndbrequireErr(!ndb_mgm_get_int_parameter(p, CFG_DIH_TABLE, &ctabFileSize),
NDBD_EXIT_INVALID_CONFIG);
- Uint32 workers = 1;
if (isNdbMtLqh())
{
jam();
- ndb_mgm_get_int_parameter(p, CFG_NDBMT_LQH_THREADS, &workers);
- c_fragments_per_node = workers;
+ c_fragments_per_node = getLqhWorkers();
}
- ndbout_c("Using %u fragments per node", workers);
+ ndbout_c("Using %u fragments per node", c_fragments_per_node);
cfileFileSize = (2 * ctabFileSize) + 2;
initRecords();
@@ -6916,7 +6914,8 @@ void Dbdih::execCREATE_FRAGMENTATION_REQ
const Uint32 flags = req->requestInfo;
Uint32 err = 0;
- const Uint32 defaultFragments = cnoOfNodeGroups * cnoReplicas;
+ const Uint32 defaultFragments =
+ c_fragments_per_node * cnoOfNodeGroups * cnoReplicas;
do {
NodeGroupRecordPtr NGPtr;
@@ -7273,7 +7272,6 @@ void Dbdih::execDIADDTABREQ(Signal* sign
jam();
tabPtr.p->method = TabRecord::NORMAL_HASH;
break;
- case DictTabInfo::DistrKeyUniqueHashIndex:
case DictTabInfo::DistrKeyOrderedIndex:
{
TabRecordPtr primTabPtr;
@@ -7340,6 +7338,9 @@ void Dbdih::execDIADDTABREQ(Signal* sign
jam();
tabPtr.p->m_map_ptr_i = req->hashMapPtrI;
tabPtr.p->m_new_map_ptr_i = RNIL;
+ Ptr<Hash2FragmentMap> mapPtr;
+ g_hash_map.getPtr(mapPtr, tabPtr.p->m_map_ptr_i);
+ ndbrequire(tabPtr.p->totalfragments >= mapPtr.p->m_fragments);
}
Uint32 index = 2;
@@ -8256,11 +8257,11 @@ void Dbdih::execDIGETNODESREQ(Signal* si
getFragstore(tabPtr.p, fragId, fragPtr);
Uint32 nodeCount = extractNodeInfo(fragPtr.p, conf->nodes);
Uint32 sig2 = (nodeCount - 1) +
- (fragPtr.p->distributionKey << 16);
+ (fragPtr.p->distributionKey << 16) +
+ (dihGetInstanceKey(fragPtr) << 24);
conf->zero = 0;
conf->reqinfo = sig2;
conf->fragId = fragId;
- conf->instanceKey = dihGetInstanceKey(fragPtr);
if (unlikely(newFragId != RNIL))
{
@@ -8270,7 +8271,8 @@ void Dbdih::execDIGETNODESREQ(Signal* si
nodeCount = extractNodeInfo(fragPtr.p, conf->nodes + 2 + MAX_REPLICAS);
conf->nodes[MAX_REPLICAS] = newFragId;
conf->nodes[MAX_REPLICAS + 1] = (nodeCount - 1) +
- (fragPtr.p->distributionKey << 16);
+ (fragPtr.p->distributionKey << 16) +
+ (dihGetInstanceKey(fragPtr) << 24);
}
}//Dbdih::execDIGETNODESREQ()
=== modified file 'storage/ndb/src/kernel/blocks/dblqh/DblqhMain.cpp'
--- a/storage/ndb/src/kernel/blocks/dblqh/DblqhMain.cpp 2008-10-01 06:53:01 +0000
+++ b/storage/ndb/src/kernel/blocks/dblqh/DblqhMain.cpp 2008-10-02 21:58:44 +0000
@@ -10358,7 +10358,7 @@ Uint32 Dblqh::sendKeyinfo20(Signal* sign
*/
const BlockReference ref = scanP->scanApiBlockref;
const Uint32 scanOp = scanP->m_curr_batch_size_rows;
- const Uint32 nodeId = refToNode(ref);
+ Uint32 nodeId = refToNode(ref);
const bool connectedToNode = getNodeInfo(nodeId).m_connected;
#ifdef NOT_USED
const Uint32 type = getNodeInfo(nodeId).m_type;
@@ -10370,7 +10370,23 @@ Uint32 Dblqh::sendKeyinfo20(Signal* sign
Uint32 * dst = keyInfo->keyData;
dst += nodeId == getOwnNodeId() ? 0 : KeyInfo20::DataLength;
- Uint32 keyLen = readPrimaryKeys(scanP, tcConP, dst);
+ /**
+ * This is ugly :-(
+ * currently only SUMA receives KEYINFO20 inside kernel..
+ * and it's not really interested in the actual keyinfo,
+ * only the scanInfo_Node...so send only that and avoid
+ * messing with if's below...
+ */
+ Uint32 keyLen ;
+ if (refToMain(ref) == SUMA && nodeId == getOwnNodeId())
+ {
+ keyLen = 0;
+ }
+ else
+ {
+ keyLen = readPrimaryKeys(scanP, tcConP, dst);
+ }
+
Uint32 fragId = tcConP->fragmentid;
keyInfo->clientOpPtr = scanP->scanApiOpPtr;
keyInfo->keyLen = keyLen;
@@ -10384,6 +10400,11 @@ Uint32 Dblqh::sendKeyinfo20(Signal* sign
{
jam();
+ if (isNdbMtLqh() && instance() != refToInstance(ref))
+ {
+ jam();
+ nodeId = 0; // prevent execute direct
+ }
if (nodeId == getOwnNodeId())
{
EXECUTE_DIRECT(refToBlock(ref), GSN_KEYINFO20, signal,
=== modified file 'storage/ndb/src/kernel/blocks/dblqh/DblqhProxy.cpp'
--- a/storage/ndb/src/kernel/blocks/dblqh/DblqhProxy.cpp 2008-09-16 18:33:52 +0000
+++ b/storage/ndb/src/kernel/blocks/dblqh/DblqhProxy.cpp 2008-10-02 21:58:44 +0000
@@ -32,8 +32,6 @@ DblqhProxy::DblqhProxy(Block_context& ct
// GSN_LQHFRAGREQ
addRecSignal(GSN_LQHFRAGREQ, &DblqhProxy::execLQHFRAGREQ);
- addRecSignal(GSN_LQHFRAGCONF, &DblqhProxy::execLQHFRAGCONF);
- addRecSignal(GSN_LQHFRAGREF, &DblqhProxy::execLQHFRAGREF);
// GSN_TAB_COMMITREQ
addRecSignal(GSN_TAB_COMMITREQ, &DblqhProxy::execTAB_COMMITREQ);
@@ -273,92 +271,17 @@ DblqhProxy::sendLQHADDATTCONF(Signal* si
ssRelease<Ss_LQHADDATTREQ>(ssId);
}
-// GSN_LQHFRAGREQ [ sub-op ]
+// GSN_LQHFRAGREQ [ pass-through ]
void
DblqhProxy::execLQHFRAGREQ(Signal* signal)
{
- Ss_LQHFRAGREQ& ss = ssSeize<Ss_LQHFRAGREQ>(1); // lost connection
-
- const LqhFragReq* req = (const LqhFragReq*)signal->getDataPtr();
- ss.m_req = *req;
- sendREQ(signal, ss);
-}
-
-void
-DblqhProxy::sendLQHFRAGREQ(Signal* signal, Uint32 ssId)
-{
- Ss_LQHFRAGREQ& ss = ssFind<Ss_LQHFRAGREQ>(ssId);
-
LqhFragReq* req = (LqhFragReq*)signal->getDataPtrSend();
- *req = ss.m_req;
-
- NdbLogPartInfo lpinfo(workerInstance(ss.m_worker));
- Uint32 logPartNo = lpinfo.partNoFromId(req->logPartId);
- if (!lpinfo.partNoOwner(logPartNo)) {
- jam();
- skipReq(ss);
- return;
- }
-
- req->senderRef = reference();
- req->senderData = ssId;
- sendSignal(workerRef(ss.m_worker), GSN_LQHFRAGREQ,
- signal, LqhFragReq::SignalLength, JBB);
-}
-
-void
-DblqhProxy::execLQHFRAGCONF(Signal* signal)
-{
- const LqhFragConf* conf = (const LqhFragConf*)signal->getDataPtr();
- Uint32 ssId = conf->senderData;
- Ss_LQHFRAGREQ& ss = ssFind<Ss_LQHFRAGREQ>(ssId);
- recvCONF(signal, ss);
-}
-
-void
-DblqhProxy::execLQHFRAGREF(Signal* signal)
-{
- const LqhFragRef* ref = (const LqhFragRef*)signal->getDataPtr();
- Uint32 ssId = ref->senderData;
- Ss_LQHFRAGREQ& ss = ssFind<Ss_LQHFRAGREQ>(ssId);
- recvREF(signal, ss, ref->errorCode);
-}
-
-void
-DblqhProxy::sendLQHFRAGCONF(Signal* signal, Uint32 ssId)
-{
- Ss_LQHFRAGREQ& ss = ssFind<Ss_LQHFRAGREQ>(ssId);
- Ss_CREATE_TAB_REQ& ss_main = ssFind<Ss_CREATE_TAB_REQ>(ssId);
- BlockReference dictRef = ss_main.m_req.senderRef;
-
- if (!lastReply(ss))
- return;
-
- if (ss.m_error == 0) {
- LqhFragConf* conf = (LqhFragConf*)signal->getDataPtrSend();
- conf->senderData = ss.m_req.senderData;
- conf->lqhFragPtr = RNIL; //wl4391_todo
- conf->tableId = ss.m_req.tableId;
- conf->fragId = ss.m_req.fragId;
- conf->changeMask = 0;
- sendSignal(dictRef, GSN_LQHFRAGCONF,
- signal, LqhFragConf::SignalLength, JBB);
- } else {
- jam();
- LqhFragRef* ref = (LqhFragRef*)signal->getDataPtrSend();
- ref->senderData = ss.m_req.senderData;
- ref->errorCode = ss.m_error;
- ref->tableId = ss.m_req.tableId;
- ref->fragId = ss.m_req.fragId;
- ref->requestInfo = 0;
- ref->changeMask = 0;
- sendSignal(dictRef, GSN_LQHFRAGREF,
- signal, LqhFragRef::SignalLength, JBB);
- ssRelease<Ss_CREATE_TAB_REQ>(ssId);
- }
+ Uint32 instance = getInstanceKey(req->tableId, req->fragId);
- ssRelease<Ss_LQHFRAGREQ>(ssId);
+ // wl4391_todo impl. method that fakes senders block-ref
+ sendSignal(numberToRef(DBLQH, instance, getOwnNodeId()),
+ GSN_LQHFRAGREQ, signal, LqhFragReq::SignalLength, JBB);
}
// GSN_TAB_COMMITREQ
=== modified file 'storage/ndb/src/kernel/blocks/dblqh/DblqhProxy.hpp'
--- a/storage/ndb/src/kernel/blocks/dblqh/DblqhProxy.hpp 2008-09-16 18:33:52 +0000
+++ b/storage/ndb/src/kernel/blocks/dblqh/DblqhProxy.hpp 2008-10-02 21:58:44 +0000
@@ -82,24 +82,8 @@ protected:
void execLQHADDATTREF(Signal*);
void sendLQHADDATTCONF(Signal*, Uint32 ssId);
- // GSN_LQHFRAGREQ [ sub-op ]
- struct Ss_LQHFRAGREQ : SsParallel {
- LqhFragReq m_req;
- Ss_LQHFRAGREQ() {
- m_sendREQ = (SsFUNC)&DblqhProxy::sendLQHFRAGREQ;
- m_sendCONF = (SsFUNC)&DblqhProxy::sendLQHFRAGCONF;
- }
- enum { poolSize = 1 };
- static SsPool<Ss_LQHFRAGREQ>& pool(LocalProxy* proxy) {
- return ((DblqhProxy*)proxy)->c_ss_LQHFRAGREQ;
- }
- };
- SsPool<Ss_LQHFRAGREQ> c_ss_LQHFRAGREQ;
+ // GSN_LQHFRAGREQ [ pass-through ]
void execLQHFRAGREQ(Signal*);
- void sendLQHFRAGREQ(Signal*, Uint32 ssId);
- void execLQHFRAGCONF(Signal*);
- void execLQHFRAGREF(Signal*);
- void sendLQHFRAGCONF(Signal*, Uint32 ssId);
// GSN_TAB_COMMITREQ [ sub-op ]
struct Ss_TAB_COMMITREQ : SsParallel {
=== modified file 'storage/ndb/src/kernel/blocks/dbtc/DbtcMain.cpp'
--- a/storage/ndb/src/kernel/blocks/dbtc/DbtcMain.cpp 2008-10-01 06:36:52 +0000
+++ b/storage/ndb/src/kernel/blocks/dbtc/DbtcMain.cpp 2008-10-02 21:58:44 +0000
@@ -3083,8 +3083,7 @@ void Dbtc::tckeyreq050Lab(Signal* signal
regTcPtr->tcNodedata[2] = Tdata5;
regTcPtr->tcNodedata[3] = Tdata6;
- UintR Tdata7 = conf->instanceKey;
- regTcPtr->lqhInstanceKey = Tdata7;
+ regTcPtr->lqhInstanceKey = (Tdata2 >> 24) & 127;// 1 bit used for reorg
moving
Uint8 Toperation = regTcPtr->operation;
Uint8 Tdirty = regTcPtr->dirtyOp;
=== modified file 'storage/ndb/src/kernel/blocks/dbtup/DbtupBuffer.cpp'
--- a/storage/ndb/src/kernel/blocks/dbtup/DbtupBuffer.cpp 2008-09-16 18:29:47 +0000
+++ b/storage/ndb/src/kernel/blocks/dbtup/DbtupBuffer.cpp 2008-10-02 21:58:44 +0000
@@ -250,22 +250,20 @@ void Dbtup::sendReadAttrinfo(Signal* sig
* in LCP case since user-backup uses single worker.
*/
BlockNumber blockMain = blockToMain(block);
+ const bool sameInstance = blockToInstance(block) == instance();
if (blockMain == DBLQH)
{
EXECUTE_DIRECT(blockMain, GSN_TRANSID_AI, signal, 3 + ToutBufIndex);
jamEntry();
}
- else if (blockMain == SUMA)
+ else if (blockMain == SUMA && sameInstance)
{
- // wl4391_todo not MT safe
- EXECUTE_DIRECT(blockMain, GSN_TRANSID_AI, signal, 3 + ToutBufIndex, 0);
+ EXECUTE_DIRECT(blockMain, GSN_TRANSID_AI, signal, 3 + ToutBufIndex);
jamEntry();
}
- else if (blockMain == BACKUP)
+ else if (blockMain == BACKUP && sameInstance)
{
- Uint32 iNo = blockToInstance(block);
- // wl4391_todo maybe not MT safe in non-LCP case
- EXECUTE_DIRECT(blockMain, GSN_TRANSID_AI, signal, 3 + ToutBufIndex, iNo);
+ EXECUTE_DIRECT(blockMain, GSN_TRANSID_AI, signal, 3 + ToutBufIndex);
jamEntry();
}
else
=== modified file 'storage/ndb/src/kernel/blocks/suma/Suma.cpp'
--- a/storage/ndb/src/kernel/blocks/suma/Suma.cpp 2008-10-02 17:51:17 +0000
+++ b/storage/ndb/src/kernel/blocks/suma/Suma.cpp 2008-10-02 21:58:44 +0000
@@ -3415,7 +3415,7 @@ Suma::execTRANSID_AI(Signal* signal)
CRASH_INSERTION(13015);
TransIdAI * const data = (TransIdAI*)signal->getDataPtr();
const Uint32 opPtrI = data->connectPtr;
- const Uint32 length = signal->length() - 3;
+ Uint32 length = signal->length() - 3;
if(f_bufferLock == 0){
f_bufferLock = opPtrI;
@@ -3423,6 +3423,16 @@ Suma::execTRANSID_AI(Signal* signal)
ndbrequire(f_bufferLock == opPtrI);
}
+ if (signal->getNoOfSections())
+ {
+ SectionHandle handle(this, signal);
+ SegmentedSectionPtr dataPtr;
+ handle.getSection(dataPtr, 0);
+ length = dataPtr.sz;
+ copy(data->attrData, dataPtr);
+ releaseSections(handle);
+ }
+
Ptr<SyncRecord> syncPtr;
c_syncPool.getPtr(syncPtr, (opPtrI >> 16));
| Thread |
|---|
| • bzr push into mysql-5.1 branch (jonas:2875) | Jonas Oreland | 3 Oct |