Below is the list of changes that have just been committed into a local
5.1 repository of tomas. When tomas does a push these changes will
be propagated to the main repository and, within 24 hours after the
push, to the public repository.
For information on how to access the public repository
see http://dev.mysql.com/doc/mysql/en/installing-source-tree.html
ChangeSet
1.2426 07/03/07 15:31:11 tomas@stripped +5 -0
Merge poseidon.mysql.com:/home/tomas/mysql-5.0-telco-gca-single-user
into poseidon.mysql.com:/home/tomas/mysql-5.1-telco-gca-single-user
storage/ndb/src/ndbapi/TransporterFacade.hpp
1.35 07/03/07 15:31:00 tomas@stripped +0 -0
Auto merged
storage/ndb/src/ndbapi/Ndb.cpp
1.84 07/03/07 15:31:00 tomas@stripped +0 -0
Auto merged
storage/ndb/src/ndbapi/ClusterMgr.cpp
1.35 07/03/07 15:31:00 tomas@stripped +0 -0
Auto merged
storage/ndb/src/kernel/blocks/dbtc/DbtcMain.cpp
1.132 07/03/07 15:30:58 tomas@stripped +0 -0
Auto merged
storage/ndb/src/kernel/blocks/dbtc/Dbtc.hpp
1.47 07/03/07 15:30:58 tomas@stripped +0 -0
Auto merged
storage/ndb/src/ndbapi/TransporterFacade.hpp
1.23.6.2 07/03/07 15:30:58 tomas@stripped +0 -0
Merge rename: ndb/src/ndbapi/TransporterFacade.hpp ->
storage/ndb/src/ndbapi/TransporterFacade.hpp
storage/ndb/src/ndbapi/Ndb.cpp
1.49.16.2 07/03/07 15:30:58 tomas@stripped +0 -0
Merge rename: ndb/src/ndbapi/Ndb.cpp -> storage/ndb/src/ndbapi/Ndb.cpp
storage/ndb/src/ndbapi/ClusterMgr.cpp
1.17.12.2 07/03/07 15:30:58 tomas@stripped +0 -0
Merge rename: ndb/src/ndbapi/ClusterMgr.cpp ->
storage/ndb/src/ndbapi/ClusterMgr.cpp
storage/ndb/src/kernel/blocks/dbtc/DbtcMain.cpp
1.73.35.2 07/03/07 15:30:58 tomas@stripped +0 -0
Merge rename: ndb/src/kernel/blocks/dbtc/DbtcMain.cpp ->
storage/ndb/src/kernel/blocks/dbtc/DbtcMain.cpp
storage/ndb/src/kernel/blocks/dbtc/Dbtc.hpp
1.26.13.2 07/03/07 15:30:58 tomas@stripped +0 -0
Merge rename: ndb/src/kernel/blocks/dbtc/Dbtc.hpp ->
storage/ndb/src/kernel/blocks/dbtc/Dbtc.hpp
# This is a BitKeeper patch. What follows are the unified diffs for the
# set of deltas contained in the patch. The rest of the patch, the part
# that BitKeeper cares about, is below these diffs.
# User: tomas
# Host: poseidon.mysql.com
# Root: /home/tomas/mysql-5.1-telco-gca-single-user/RESYNC
--- 1.26.13.1/ndb/src/kernel/blocks/dbtc/Dbtc.hpp 2007-03-07 10:46:37 +07:00
+++ 1.47/storage/ndb/src/kernel/blocks/dbtc/Dbtc.hpp 2007-03-07 15:30:58 +07:00
@@ -387,6 +387,13 @@ public:
Uint32 fireingOperation;
/**
+ * The fragment id of the firing operation. This will be appended
+ * to the Primary Key such that the record can be found even in the
+ * case of user defined partitioning.
+ */
+ Uint32 fragId;
+
+ /**
* Used for scrapping in case of node failure
*/
Uint32 nodeId;
@@ -524,7 +531,7 @@ public:
/**
* The list of defined indexes
*/
- ArrayList<TcIndexData> c_theIndexes;
+ DLList<TcIndexData> c_theIndexes;
UintR c_maxNumberOfIndexes;
struct TcIndexOperation {
@@ -730,7 +737,7 @@ public:
UintR accumulatingIndexOp;
UintR executingIndexOp;
UintR tcIndxSendArray[6];
- ArrayList<TcIndexOperation> theSeizedIndexOperations;
+ DLList<TcIndexOperation> theSeizedIndexOperations;
};
typedef Ptr<ApiConnectRecord> ApiConnectRecordPtr;
@@ -865,7 +872,7 @@ public:
Uint8 distributionKeyIndicator;
Uint8 m_special_hash; // collation or distribution key
- Uint8 unused2;
+ Uint8 m_no_disk_flag;
Uint8 lenAiInTckeyreq; /* LENGTH OF ATTRIBUTE INFORMATION IN TCKEYREQ */
Uint8 fragmentDistributionKey; /* DIH generation no */
@@ -876,11 +883,7 @@ public:
*/
Uint8 opExec;
- /**
- * LOCK TYPE OF OPERATION IF READ OPERATION
- * 0 = READ LOCK, 1 = WRITE LOCK
- */
- Uint8 opLock;
+ Uint8 unused;
/**
* IS THE OPERATION A SIMPLE TRANSACTION
@@ -940,8 +943,7 @@ public:
NF_CHECK_SCAN = 0x2,
NF_CHECK_TRANSACTION = 0x4,
NF_CHECK_DROP_TAB = 0x8,
- NF_NODE_FAIL_BITS = 0xF, // All bits...
- NF_STARTED = 0x10
+ NF_NODE_FAIL_BITS = 0xF // All bits...
};
Uint32 m_nf_bits;
NdbNodeBitmask m_lqh_trans_conf;
@@ -965,7 +967,8 @@ public:
Uint8 noOfKeyAttr;
Uint8 hasCharAttr;
Uint8 noOfDistrKeys;
-
+ Uint8 hasVarKeys;
+
bool checkTable(Uint32 schemaVersion) const {
return enabled && !dropping &&
(table_version_major(schemaVersion) == table_version_major(currentSchemaVersion));
@@ -1260,7 +1263,7 @@ public:
typedef Ptr<TcFailRecord> TcFailRecordPtr;
public:
- Dbtc(const class Configuration &);
+ Dbtc(Block_context&);
virtual ~Dbtc();
private:
@@ -1280,7 +1283,7 @@ private:
void execLQHKEYREF(Signal* signal);
void execTRANSID_AI_R(Signal* signal);
void execKEYINFO20_R(Signal* signal);
-
+ void execROUTE_ORD(Signal* signal);
// Received signals
void execDUMP_STATE_ORD(Signal* signal);
void execSEND_PACKED(Signal* signal);
@@ -1318,7 +1321,6 @@ private:
void execCOMMITCONF(Signal* signal);
void execABORTCONF(Signal* signal);
void execNODE_FAILREP(Signal* signal);
- void execNODE_START_REP(Signal* signal);
void execINCL_NODEREQ(Signal* signal);
void execTIME_SIGNAL(Signal* signal);
void execAPI_FAILREQ(Signal* signal);
--- 1.73.35.1/ndb/src/kernel/blocks/dbtc/DbtcMain.cpp 2007-03-07 10:46:37 +07:00
+++ 1.132/storage/ndb/src/kernel/blocks/dbtc/DbtcMain.cpp 2007-03-07 15:30:58 +07:00
@@ -37,6 +37,7 @@
#include <signaldata/TcContinueB.hpp>
#include <signaldata/TcKeyFailConf.hpp>
#include <signaldata/AbortAll.hpp>
+#include <signaldata/DihFragCount.hpp>
#include <signaldata/ScanFrag.hpp>
#include <signaldata/ScanTab.hpp>
#include <signaldata/PrepDropTab.hpp>
@@ -70,6 +71,8 @@
#include <NdbOut.hpp>
#include <DebuggerNames.hpp>
+#include <signaldata/RouteOrd.hpp>
+
// Use DEBUG to print messages that should be
// seen only when we debug the product
#ifdef VM_TRACE
@@ -343,7 +346,7 @@ void Dbtc::execTC_SCHVERREQ(Signal* sign
tabptr.p->noOfKeyAttr = desc->noOfKeyAttr;
tabptr.p->hasCharAttr = desc->hasCharAttr;
tabptr.p->noOfDistrKeys = desc->noOfDistrKeys;
-
+ tabptr.p->hasVarKeys = desc->noOfVarKeys > 0;
signal->theData[0] = tabptr.i;
signal->theData[1] = retPtr;
sendSignal(retRef, GSN_TC_SCHVERCONF, signal, 2, JBB);
@@ -609,7 +612,7 @@ void Dbtc::execREAD_CONFIG_REQ(Signal* s
jamEntry();
const ndb_mgm_configuration_iterator * p =
- theConfiguration.getOwnConfigIterator();
+ m_ctx.m_config.getOwnConfigIterator();
ndbrequire(p != 0);
initData();
@@ -1299,6 +1302,7 @@ void Dbtc::execTCRELEASEREQ(Signal* sign
apiConnectptr.p->firstTcConnect == RNIL))
{
jam(); /* JUST REPLY OK */
+ apiConnectptr.p->m_transaction_nodes.clear();
releaseApiCon(signal, apiConnectptr.i);
signal->theData[0] = tuserpointer;
sendSignal(tapiBlockref,
@@ -2320,14 +2324,15 @@ Dbtc::handle_special_hash(Uint32 dstHash
{
Uint64 Tmp[MAX_KEY_SIZE_IN_WORDS * MAX_XFRM_MULTIPLY];
const TableRecord* tabPtrP = &tableRecord[tabPtrI];
+ const bool hasVarKeys = tabPtrP->hasVarKeys;
const bool hasCharAttr = tabPtrP->hasCharAttr;
- const bool hasDistKeys = tabPtrP->noOfDistrKeys > 0;
+ const bool compute_distkey = distr && (tabPtrP->noOfDistrKeys > 0);
Uint32 *dst = (Uint32*)Tmp;
Uint32 dstPos = 0;
Uint32 keyPartLen[MAX_ATTRIBUTES_IN_INDEX];
Uint32 * keyPartLenPtr;
- if(hasCharAttr)
+ if(hasCharAttr || (compute_distkey && hasVarKeys))
{
keyPartLenPtr = keyPartLen;
dstPos = xfrm_key(tabPtrI, src, dst, sizeof(Tmp) >> 2, keyPartLenPtr);
@@ -2345,7 +2350,7 @@ Dbtc::handle_special_hash(Uint32 dstHash
md5_hash(dstHash, (Uint64*)dst, dstPos);
- if(distr && hasDistKeys)
+ if(compute_distkey)
{
jam();
@@ -2755,12 +2760,14 @@ void Dbtc::execTCKEYREQ(Signal* signal)
Uint8 TDirtyFlag = tcKeyReq->getDirtyFlag(Treqinfo);
Uint8 TInterpretedFlag = tcKeyReq->getInterpretedFlag(Treqinfo);
Uint8 TDistrKeyFlag = tcKeyReq->getDistributionKeyFlag(Treqinfo);
+ Uint8 TNoDiskFlag = TcKeyReq::getNoDiskFlag(Treqinfo);
Uint8 TexecuteFlag = TexecFlag;
regCachePtr->opSimple = TSimpleFlag;
regCachePtr->opExec = TInterpretedFlag;
regTcPtr->dirtyOp = TDirtyFlag;
regCachePtr->distributionKeyIndicator = TDistrKeyFlag;
+ regCachePtr->m_no_disk_flag = TNoDiskFlag;
//-------------------------------------------------------------
// The next step is to read the upto three conditional words.
@@ -2824,17 +2831,9 @@ void Dbtc::execTCKEYREQ(Signal* signal)
regCachePtr->attrinfo15[2] = Tdata4;
regCachePtr->attrinfo15[3] = Tdata5;
- if (TOperationType == ZREAD) {
+ if (TOperationType == ZREAD || TOperationType == ZREAD_EX) {
Uint32 TreadCount = c_counters.creadCount;
jam();
- regCachePtr->opLock = 0;
- c_counters.creadCount = TreadCount + 1;
- } else if(TOperationType == ZREAD_EX){
- Uint32 TreadCount = c_counters.creadCount;
- jam();
- TOperationType = ZREAD;
- regTcPtr->operation = ZREAD;
- regCachePtr->opLock = ZUPDATE;
c_counters.creadCount = TreadCount + 1;
} else {
if(regApiPtr->commitAckMarker == RNIL){
@@ -2868,24 +2867,10 @@ void Dbtc::execTCKEYREQ(Signal* signal)
c_counters.cwriteCount = TwriteCount + 1;
switch (TOperationType) {
case ZUPDATE:
- jam();
- if (TattrLen == 0) {
- //TCKEY_abort(signal, 5);
- //return;
- }//if
- /*---------------------------------------------------------------------*/
- // The missing break is intentional since we also want to set the opLock
- // variable also for updates
- /*---------------------------------------------------------------------*/
case ZINSERT:
case ZDELETE:
- jam();
- regCachePtr->opLock = TOperationType;
- break;
case ZWRITE:
jam();
- // A write operation is originally an insert operation.
- regCachePtr->opLock = ZINSERT;
break;
default:
TCKEY_abort(signal, 9);
@@ -3060,7 +3045,7 @@ void Dbtc::tckeyreq050Lab(Signal* signal
tnoOfStandby = (tnodeinfo >> 8) & 3;
regCachePtr->fragmentDistributionKey = (tnodeinfo >> 16) & 255;
- if (Toperation == ZREAD) {
+ if (Toperation == ZREAD || Toperation == ZREAD_EX) {
if (Tdirty == 1) {
jam();
/*-------------------------------------------------------------*/
@@ -3093,28 +3078,7 @@ void Dbtc::tckeyreq050Lab(Signal* signal
}//if
}//for
}
-
- if (regTcPtr->tcNodedata[0] != getOwnNodeId())
- {
- jam();
- for (Uint32 i = 0; i < tnoOfBackup + 1; i++)
- {
- HostRecordPtr hostPtr;
- hostPtr.i = regTcPtr->tcNodedata[i];
- ptrCheckGuard(hostPtr, chostFilesize, hostRecord);
- if (hostPtr.p->m_nf_bits & HostRecord::NF_STARTED)
- {
- jam();
- if (i != 0)
- {
- jam();
- regTcPtr->tcNodedata[0] = hostPtr.i;
- }
- break;
- }
- }
- }//if
- }
+ }//if
jam();
regTcPtr->lastReplicaNo = 0;
regTcPtr->noOfNodes = 1;
@@ -3210,6 +3174,7 @@ void Dbtc::sendlqhkeyreq(Signal* signal,
TcConnectRecord * const regTcPtr = tcConnectptr.p;
ApiConnectRecord * const regApiPtr = apiConnectptr.p;
CacheRecord * const regCachePtr = cachePtr.p;
+ Uint32 version = getNodeInfo(refToNode(TBRef)).m_version;
UintR sig0, sig1, sig2, sig3, sig4, sig5, sig6;
#ifdef ERROR_INSERT
if (ERROR_INSERTED(8002)) {
@@ -3253,7 +3218,12 @@ void Dbtc::sendlqhkeyreq(Signal* signal,
bool simpleRead = (sig1 == ZREAD && sig0 == ZTRUE);
LqhKeyReq::setKeyLen(Tdata10, regCachePtr->keylen);
LqhKeyReq::setLastReplicaNo(Tdata10, regTcPtr->lastReplicaNo);
- LqhKeyReq::setLockType(Tdata10, regCachePtr->opLock);
+ if (unlikely(version < NDBD_ROWID_VERSION))
+ {
+ Uint32 op = regTcPtr->operation;
+ Uint32 lock = (Operation_t) op == ZREAD_EX ? ZUPDATE : (Operation_t) op == ZWRITE ?
ZINSERT : (Operation_t) op;
+ LqhKeyReq::setLockType(Tdata10, lock);
+ }
/* ---------------------------------------------------------------------- */
// Indicate Application Reference is present in bit 15
/* ---------------------------------------------------------------------- */
@@ -3262,6 +3232,8 @@ void Dbtc::sendlqhkeyreq(Signal* signal,
LqhKeyReq::setInterpretedFlag(Tdata10, regCachePtr->opExec);
LqhKeyReq::setSimpleFlag(Tdata10, sig0);
LqhKeyReq::setOperation(Tdata10, sig1);
+ LqhKeyReq::setNoDiskFlag(Tdata10, regCachePtr->m_no_disk_flag);
+
/* -----------------------------------------------------------------------
* Sequential Number of first LQH = 0, bit 22-23
* IF ATTRIBUTE INFORMATION IS SENT IN TCKEYREQ,
@@ -3979,7 +3951,7 @@ void Dbtc::sendtckeyconf(Signal* signal,
const UintR TopWords = (UintR)regApiPtr->tckeyrec;
localHostptr.i = refToNode(regApiPtr->ndbapiBlockref);
const Uint32 type = getNodeInfo(localHostptr.i).m_type;
- const bool is_api = (type >= NodeInfo::API && type <= NodeInfo::REP);
+ const bool is_api = (type >= NodeInfo::API && type <= NodeInfo::MGM);
const BlockNumber TblockNum = refToBlock(regApiPtr->ndbapiBlockref);
const Uint32 Tmarker = (regApiPtr->commitAckMarker == RNIL) ? 0 : 1;
ptrAss(localHostptr, hostRecord);
@@ -4661,7 +4633,8 @@ void Dbtc::sendApiCommit(Signal* signal)
commitConf->transId1 = regApiPtr->transid[0];
commitConf->transId2 = regApiPtr->transid[1];
commitConf->gci = regApiPtr->globalcheckpointid;
- sendSignal(regApiPtr->ndbapiBlockref, GSN_TC_COMMITCONF, signal,
+
+ sendSignal(regApiPtr->ndbapiBlockref, GSN_TC_COMMITCONF, signal,
TcCommitConf::SignalLength, JBB);
} else if (regApiPtr->returnsignal == RS_NO_RETURN) {
jam();
@@ -4851,13 +4824,14 @@ Dbtc::execTC_COMMIT_ACK(Signal* signal){
key.transid2 = signal->theData[1];
CommitAckMarkerPtr removedMarker;
- m_commitAckMarkerHash.release(removedMarker, key);
+ m_commitAckMarkerHash.remove(removedMarker, key);
if (removedMarker.i == RNIL) {
jam();
warningHandlerLab(signal, __LINE__);
return;
}//if
sendRemoveMarkers(signal, removedMarker.p);
+ m_commitAckMarkerPool.release(removedMarker);
}
void
@@ -5202,6 +5176,19 @@ void Dbtc::execLQHKEYREF(Signal* signal)
return;
}
+ /* Only ref in certain situations */
+ {
+ const Uint32 opType = regTcPtr->operation;
+ if ( (opType == ZDELETE && errCode != ZNOT_FOUND)
+ || (opType == ZINSERT && errCode != ZALREADYEXIST)
+ || (opType == ZUPDATE && errCode != ZNOT_FOUND)
+ || (opType == ZWRITE && errCode != 839 && errCode != 840))
+ {
+ TCKEY_abort(signal, 49);
+ return;
+ }
+ }
+
/* *************** */
/* TCKEYREF < */
/* *************** */
@@ -7109,19 +7096,6 @@ void Dbtc::execNODE_FAILREP(Signal* sign
}//Dbtc::execNODE_FAILREP()
void
-Dbtc::execNODE_START_REP(Signal* signal)
-{
- Uint32 nodeId = signal->theData[0];
- hostptr.i = nodeId;
- ptrCheckGuard(hostptr, chostFilesize, hostRecord);
- if (hostptr.p->m_nf_bits == 0)
- {
- jam();
- hostptr.p->m_nf_bits |= HostRecord::NF_STARTED;
- }
-}
-
-void
Dbtc::checkNodeFailComplete(Signal* signal,
Uint32 failedNodeId,
Uint32 bit)
@@ -7209,15 +7183,20 @@ Dbtc::nodeFailCheckTransactions(Signal*
for (transPtr.i = transPtrI; transPtr.i < capiConnectFilesize; transPtr.i++)
{
ptrCheckGuard(transPtr, capiConnectFilesize, apiConnectRecord);
+ Uint32 state = transPtr.p->apiConnectstate;
if (transPtr.p->m_transaction_nodes.get(failedNodeId))
{
jam();
-
- // Force timeout regardless of state
- c_appl_timeout_value = 1;
- setApiConTimer(transPtr.i, TtcTimer - 2, __LINE__);
- timeOutFoundLab(signal, transPtr.i, ZNODEFAIL_BEFORE_COMMIT);
- c_appl_timeout_value = TapplTimeout;
+
+ // avoid assertion in timeoutfoundlab
+ if (state != CS_PREPARE_TO_COMMIT)
+ {
+ // Force timeout regardless of state
+ c_appl_timeout_value = 1;
+ setApiConTimer(transPtr.i, TtcTimer - 2, __LINE__);
+ timeOutFoundLab(signal, transPtr.i, ZNODEFAIL_BEFORE_COMMIT);
+ c_appl_timeout_value = TapplTimeout;
+ }
}
// Send CONTINUEB to continue later
@@ -8919,6 +8898,7 @@ void Dbtc::initScanrec(ScanRecordPtr sca
ScanFragReq::setDescendingFlag(tmp, ScanTabReq::getDescendingFlag(ri));
ScanFragReq::setTupScanFlag(tmp, ScanTabReq::getTupScanFlag(ri));
ScanFragReq::setAttrLen(tmp, scanTabReq->attrLenKeyLen & 0xFFFF);
+ ScanFragReq::setNoDiskFlag(tmp, ScanTabReq::getNoDiskFlag(ri));
scanptr.p->scanRequestInfo = tmp;
scanptr.p->scanStoredProcId = scanTabReq->storedProcId;
@@ -9040,9 +9020,11 @@ void Dbtc::diFcountReqLab(Signal* signal
* THE FIRST STEP TO RECEIVE IS SUCCESSFULLY COMPLETED.
* WE MUST FIRST GET THE NUMBER OF FRAGMENTS IN THE TABLE.
***************************************************/
- signal->theData[0] = tcConnectptr.p->dihConnectptr;
- signal->theData[1] = scanptr.p->scanTableref;
- sendSignal(cdihblockref, GSN_DI_FCOUNTREQ, signal, 2, JBB);
+ DihFragCountReq * const req = (DihFragCountReq*)signal->getDataPtrSend();
+ req->m_connectionData = tcConnectptr.p->dihConnectptr;
+ req->m_tableRef = scanptr.p->scanTableref;
+ sendSignal(cdihblockref, GSN_DI_FCOUNTREQ, signal,
+ DihFragCountReq::SignalLength, JBB);
}
else
{
@@ -9053,17 +9035,18 @@ void Dbtc::diFcountReqLab(Signal* signal
UintR TerrorIndicator = signal->theData[0];
jamEntry();
if (TerrorIndicator != 0) {
- signal->theData[0] = tcConnectptr.i;
- //signal->theData[1] Contains error
+ DihFragCountRef * const ref = (DihFragCountRef*)signal->getDataPtr();
+ ref->m_connectionData = tcConnectptr.i;
+ ref->m_error = signal->theData[1];
execDI_FCOUNTREF(signal);
return;
}
UintR Tdata1 = signal->theData[1];
scanptr.p->scanNextFragId = Tdata1;
-
- signal->theData[0] = tcConnectptr.i;
- signal->theData[1] = 1; // Frag count
+ DihFragCountConf * const conf = (DihFragCountConf*)signal->getDataPtr();
+ conf->m_connectionData = tcConnectptr.i;
+ conf->m_fragmentCount = 1; // Frag count
execDI_FCOUNTCONF(signal);
}
return;
@@ -9081,8 +9064,9 @@ void Dbtc::diFcountReqLab(Signal* signal
void Dbtc::execDI_FCOUNTCONF(Signal* signal)
{
jamEntry();
- tcConnectptr.i = signal->theData[0];
- Uint32 tfragCount = signal->theData[1];
+ DihFragCountConf * const conf = (DihFragCountConf*)signal->getDataPtr();
+ tcConnectptr.i = conf->m_connectionData;
+ Uint32 tfragCount = conf->m_fragmentCount;
ptrCheckGuard(tcConnectptr, ctcConnectFilesize, tcConnectRecord);
apiConnectptr.i = tcConnectptr.p->apiConnect;
ptrCheckGuard(apiConnectptr, capiConnectFilesize, apiConnectRecord);
@@ -9166,9 +9150,10 @@ void Dbtc::execDI_FCOUNTCONF(Signal* sig
void Dbtc::execDI_FCOUNTREF(Signal* signal)
{
jamEntry();
- tcConnectptr.i = signal->theData[0];
+ DihFragCountRef * const ref = (DihFragCountRef*)signal->getDataPtr();
+ tcConnectptr.i = ref->m_connectionData;
ptrCheckGuard(tcConnectptr, ctcConnectFilesize, tcConnectRecord);
- const Uint32 errCode = signal->theData[1];
+ const Uint32 errCode = ref->m_error;
apiConnectptr.i = tcConnectptr.p->apiConnect;
ptrCheckGuard(apiConnectptr, capiConnectFilesize, apiConnectRecord);
ScanRecordPtr scanptr;
@@ -10323,6 +10308,7 @@ void Dbtc::initTable(Signal* signal)
tabptr.p->noOfKeyAttr = 0;
tabptr.p->hasCharAttr = 0;
tabptr.p->noOfDistrKeys = 0;
+ tabptr.p->hasVarKeys = 0;
}//for
}//Dbtc::initTable()
@@ -11387,7 +11373,6 @@ void Dbtc::execFIRE_TRIG_ORD(Signal* sig
ApiConnectRecordPtr transPtr;
TcConnectRecord *localTcConnectRecord = tcConnectRecord;
TcConnectRecordPtr opPtr;
-
/**
* TODO
* Check transid,
@@ -11401,6 +11386,7 @@ void Dbtc::execFIRE_TRIG_ORD(Signal* sig
c_firedTriggerHash.remove(trigPtr);
+ trigPtr.p->fragId= fireOrd->fragId;
bool ok = trigPtr.p->keyValues.getSize() == fireOrd->m_noPrimKeyWords;
ok &= trigPtr.p->afterValues.getSize() == fireOrd->m_noAfterValueWords;
ok &= trigPtr.p->beforeValues.getSize() == fireOrd->m_noBeforeValueWords;
@@ -11623,7 +11609,7 @@ void Dbtc::sendTcIndxConf(Signal* signal
const UintR TopWords = (UintR)regApiPtr->tcindxrec;
localHostptr.i = refToNode(regApiPtr->ndbapiBlockref);
const Uint32 type = getNodeInfo(localHostptr.i).m_type;
- const bool is_api = (type >= NodeInfo::API && type <= NodeInfo::REP);
+ const bool is_api = (type >= NodeInfo::API && type <= NodeInfo::MGM);
const BlockNumber TblockNum = refToBlock(regApiPtr->ndbapiBlockref);
const Uint32 Tmarker = (regApiPtr->commitAckMarker == RNIL ? 0 : 1);
ptrAss(localHostptr, hostRecord);
@@ -12322,7 +12308,11 @@ void Dbtc::executeIndexOperation(Signal*
Uint32 dataPos = 0;
TcKeyReq * const tcIndxReq = &indexOp->tcIndxReq;
TcKeyReq * const tcKeyReq = (TcKeyReq *)signal->getDataPtrSend();
- Uint32 * dataPtr = &tcKeyReq->scanInfo;
+ /*
+ Data points to distrGroupHashValue since scanInfo is used to send
+ fragment id of receiving fragment
+ */
+ Uint32 * dataPtr = &tcKeyReq->distrGroupHashValue;
Uint32 tcKeyLength = TcKeyReq::StaticLength;
Uint32 tcKeyRequestInfo = tcIndxReq->requestInfo;
TcIndexData* indexData;
@@ -12361,11 +12351,16 @@ void Dbtc::executeIndexOperation(Signal*
regApiPtr->executingIndexOp = indexOp->indexOpId;;
regApiPtr->noIndexOp++; // Increase count
- // Filter out AttributeHeader:s since this should not be in key
+ /*
+ Filter out AttributeHeader:s since this should not be in key.
+ Also filter out fragment id from primary key and handle that
+ separately by setting it as Distribution Key and set indicator.
+ */
+
AttributeHeader* attrHeader = (AttributeHeader *) aiIter.data;
Uint32 headerSize = attrHeader->getHeaderSize();
- Uint32 keySize = attrHeader->getDataSize();
+ Uint32 keySize = attrHeader->getDataSize() - 1;
TcKeyReq::setKeyLength(tcKeyRequestInfo, keySize);
// Skip header
if (headerSize == 1) {
@@ -12375,6 +12370,9 @@ void Dbtc::executeIndexOperation(Signal*
jam();
moreKeyData = indexOp->transIdAI.next(aiIter, headerSize - 1);
}//if
+ tcKeyReq->scanInfo = *aiIter.data; //Fragment Id
+ moreKeyData = indexOp->transIdAI.next(aiIter);
+ TcKeyReq::setDistributionKeyFlag(tcKeyRequestInfo, 1U);
while(// If we have not read complete key
(keySize != 0) &&
(dataPos < keyBufSize)) {
@@ -12640,7 +12638,7 @@ void Dbtc::executeTriggers(Signal* signa
tmp2.release();
LocalDataBuffer<11> tmp3(pool, trigPtr.p->afterValues);
tmp3.release();
- regApiPtr->theFiredTriggers.release(trigPtr.i);
+ regApiPtr->theFiredTriggers.release(trigPtr);
}
trigPtr = nextTrigPtr;
}
@@ -12757,7 +12755,7 @@ void Dbtc::insertIntoIndexTable(Signal*
AttributeBuffer::DataBufferIterator iter;
Uint32 attrId = 0;
Uint32 keyLength = 0;
- Uint32 totalPrimaryKeyLength = 0;
+ Uint32 totalPrimaryKeyLength = 1; // fragment length
Uint32 hops;
indexTabPtr.i = indexData->indexId;
@@ -12810,11 +12808,12 @@ void Dbtc::insertIntoIndexTable(Signal*
hops = attrHeader->getHeaderSize() + attrHeader->getDataSize();
moreAttrData = keyValues.next(iter, hops);
}
- AttributeHeader pkAttrHeader(attrId, totalPrimaryKeyLength);
+ AttributeHeader pkAttrHeader(attrId, totalPrimaryKeyLength << 2);
+ Uint32 attributesLength = afterValues.getSize() +
+ pkAttrHeader.getHeaderSize() + pkAttrHeader.getDataSize();
TcKeyReq::setKeyLength(tcKeyRequestInfo, keyLength);
- tcKeyReq->attrLen = afterValues.getSize() +
- pkAttrHeader.getHeaderSize() + pkAttrHeader.getDataSize();
+ tcKeyReq->attrLen = attributesLength;
tcKeyReq->tableId = indexData->indexId;
TcKeyReq::setOperationType(tcKeyRequestInfo, ZINSERT);
TcKeyReq::setExecutingTrigger(tcKeyRequestInfo, true);
@@ -12864,8 +12863,11 @@ void Dbtc::insertIntoIndexTable(Signal*
}
tcKeyLength += dataPos;
- Uint32 attributesLength = afterValues.getSize() +
- pkAttrHeader.getHeaderSize() + pkAttrHeader.getDataSize();
+ /*
+ Size of attrinfo is unique index attributes one by one, header for each
+ of them (all contained in the afterValues data structure), plus a header,
+ the primary key (compacted) and the fragment id before the primary key
+ */
if (attributesLength <= attrBufSize) {
jam();
// ATTRINFO fits in TCKEYREQ
@@ -12882,6 +12884,10 @@ void Dbtc::insertIntoIndexTable(Signal*
// as one attribute
pkAttrHeader.insertHeader(dataPtr);
dataPtr += pkAttrHeader.getHeaderSize();
+ /*
+ Insert fragment id before primary key as part of reference to tuple
+ */
+ *dataPtr++ = firedTriggerData->fragId;
moreAttrData = keyValues.first(iter);
while(moreAttrData) {
jam();
@@ -13077,6 +13083,29 @@ void Dbtc::insertIntoIndexTable(Signal*
pkAttrHeader.insertHeader(dataPtr);
dataPtr += pkAttrHeader.getHeaderSize();
attrInfoPos += pkAttrHeader.getHeaderSize();
+ /*
+ Add fragment id before primary key
+ TODO: This code really needs to be made into a long signal
+ to remove this messy code.
+ */
+ if (attrInfoPos == AttrInfo::DataLength)
+ {
+ jam();
+ // Flush ATTRINFO
+#if INTERNAL_TRIGGER_TCKEYREQ_JBA
+ sendSignal(reference(), GSN_ATTRINFO, signal,
+ AttrInfo::HeaderLength + AttrInfo::DataLength, JBA);
+#else
+ EXECUTE_DIRECT(DBTC, GSN_ATTRINFO, signal,
+ AttrInfo::HeaderLength + AttrInfo::DataLength);
+ jamEntry();
+#endif
+ dataPtr = (Uint32 *) &attrInfo->attrData;
+ attrInfoPos = 0;
+ }
+ attrInfoPos++;
+ *dataPtr++ = firedTriggerData->fragId;
+
moreAttrData = keyValues.first(iter);
while(moreAttrData) {
jam();
@@ -13377,3 +13406,56 @@ Dbtc::TableRecord::getErrorCode(Uint32 s
return 0;
}
+void
+Dbtc::execROUTE_ORD(Signal* signal)
+{
+ jamEntry();
+ if(!assembleFragments(signal)){
+ jam();
+ return;
+ }
+
+ RouteOrd* ord = (RouteOrd*)signal->getDataPtr();
+ Uint32 dstRef = ord->dstRef;
+ Uint32 srcRef = ord->srcRef;
+ Uint32 gsn = ord->gsn;
+ Uint32 cnt = ord->cnt;
+
+ if (likely(getNodeInfo(refToNode(dstRef)).m_connected))
+ {
+ jam();
+ Uint32 secCount = signal->getNoOfSections();
+ SegmentedSectionPtr ptr[3];
+ ndbrequire(secCount >= 1 && secCount <= 3);
+
+ jamLine(secCount);
+ for (Uint32 i = 0; i<secCount; i++)
+ signal->getSection(ptr[i], i);
+
+ /**
+ * Put section 0 in signal->theData
+ */
+ ndbrequire(ptr[0].sz <= 25);
+ copy(signal->theData, ptr[0]);
+
+ signal->header.m_noOfSections = 0;
+
+ /**
+ * Shift rest of sections
+ */
+ for(Uint32 i = 1; i<secCount; i++)
+ {
+ signal->setSection(ptr[i], i - 1);
+ }
+
+ sendSignal(dstRef, gsn, signal, ptr[0].sz, JBB);
+
+ signal->header.m_noOfSections = 0;
+ signal->setSection(ptr[0], 0);
+ releaseSections(signal);
+ return ;
+ }
+
+ warningEvent("Unable to route GSN: %d from %x to %x",
+ gsn, srcRef, dstRef);
+}
--- 1.17.12.1/ndb/src/ndbapi/ClusterMgr.cpp 2007-03-07 10:46:37 +07:00
+++ 1.35/storage/ndb/src/ndbapi/ClusterMgr.cpp 2007-03-07 15:31:00 +07:00
@@ -36,6 +36,7 @@
#include <mgmapi_configuration.hpp>
#include <mgmapi_config_parameters.h>
+int global_flag_skip_invalidate_cache = 0;
//#define DEBUG_REG
// Just a C wrapper for threadMain
@@ -44,15 +45,7 @@ void*
runClusterMgr_C(void * me)
{
((ClusterMgr*) me)->threadMain();
- /**
- * Sleep to allow another thread that is not exiting to take control
- * of signals allocated by this thread
- *
- * see Ndb::~Ndb() in Ndbinit.cpp
- */
-#ifdef NDB_OSE
- NdbSleep_MilliSleep(50);
-#endif
+
return NULL;
}
@@ -72,6 +65,7 @@ ClusterMgr::ClusterMgr(TransporterFacade
noOfConnectedNodes= 0;
theClusterMgrThread= 0;
m_connect_count = 0;
+ m_cluster_state = CS_waiting_for_clean_cache;
DBUG_VOID_RETURN;
}
@@ -112,18 +106,6 @@ ClusterMgr::init(ndb_mgm_configuration_i
case NODE_TYPE_MGM:
theNodes[tmp].m_info.m_type = NodeInfo::MGM;
break;
- case NODE_TYPE_REP:
- theNodes[tmp].m_info.m_type = NodeInfo::REP;
- break;
- case NODE_TYPE_EXT_REP:
- theNodes[tmp].m_info.m_type = NodeInfo::REP;
- {
- Uint32 hbFreq = 10000;
- //ndb_mgm_get_int_parameter(iter, CFG_, &hbFreq);
- theNodes[tmp].hbFrequency = hbFreq;
- assert(100 <= hbFreq && hbFreq < 60 * 60 * 1000);
- }
- break;
default:
type = type;
#if 0
@@ -250,6 +232,16 @@ ClusterMgr::threadMain( ){
/**
* Start of Secure area for use of Transporter
*/
+ if (m_cluster_state == CS_waiting_for_clean_cache)
+ {
+ theFacade.m_globalDictCache.lock();
+ unsigned sz= theFacade.m_globalDictCache.get_size();
+ theFacade.m_globalDictCache.unlock();
+ if (sz)
+ goto next;
+ m_cluster_state = CS_waiting_for_first_connect;
+ }
+
theFacade.lock_mutex();
for (int i = 1; i < MAX_NODES; i++){
/**
@@ -281,13 +273,6 @@ ClusterMgr::threadMain( ){
theNode.hbCounter = 0;
}
- /**
- * If the node is of type REP,
- * then the receiver of the signal should be API_CLUSTERMGR
- */
- if (theNode.m_info.m_type == NodeInfo::REP) {
- signal.theReceiversBlockNumber = API_CLUSTERMGR;
- }
#ifdef DEBUG_REG
ndbout_c("ClusterMgr: Sending API_REGREQ to node %d", (int)nodeId);
#endif
@@ -304,6 +289,7 @@ ClusterMgr::threadMain( ){
*/
theFacade.unlock_mutex();
+next:
// Sleep for 100 ms between each Registration Heartbeat
Uint64 before = now;
NdbSleep_MilliSleep(100);
@@ -412,9 +398,6 @@ ClusterMgr::execAPI_REGCONF(const Uint32
}//if
node.m_info.m_heartbeat_cnt = 0;
node.hbCounter = 0;
- if (node.m_info.m_type != NodeInfo::REP) {
- node.hbFrequency = (apiRegConf->apiHeartbeatFrequency * 10) - 50;
- }
if(waitingForHB)
{
@@ -426,6 +409,7 @@ ClusterMgr::execAPI_REGCONF(const Uint32
NdbCondition_Broadcast(waitForHBCond);
}
}
+ node.hbFrequency = (apiRegConf->apiHeartbeatFrequency * 10) - 50;
}
void
@@ -483,6 +467,8 @@ ClusterMgr::execNF_COMPLETEREP(const Uin
void
ClusterMgr::reportConnected(NodeId nodeId){
+ DBUG_ENTER("ClusterMgr::reportConnected");
+ DBUG_PRINT("info", ("nodeId: %u", nodeId));
/**
* Ensure that we are sending heartbeat every 100 ms
* until we have got the first reply from NDB providing
@@ -502,16 +488,14 @@ ClusterMgr::reportConnected(NodeId nodeI
* if first API_REGCONF has not arrived
*/
theNode.m_state.m_connected_nodes.set(nodeId);
-
- if (theNode.m_info.m_type != NodeInfo::REP) {
- theNode.hbFrequency = 0;
- }
+ theNode.hbFrequency = 0;
theNode.m_info.m_version = 0;
theNode.compatible = true;
theNode.nfCompleteRep = true;
theNode.m_state.startLevel = NodeState::SL_NOTHING;
theFacade.ReportNodeAlive(nodeId);
+ DBUG_VOID_RETURN;
}
void
@@ -551,10 +535,14 @@ ClusterMgr::reportNodeFailed(NodeId node
theNode.nfCompleteRep = false;
if(noOfAliveNodes == 0)
{
- theFacade.m_globalDictCache.lock();
- theFacade.m_globalDictCache.invalidate_all();
- theFacade.m_globalDictCache.unlock();
- m_connect_count ++;
+ if (!global_flag_skip_invalidate_cache)
+ {
+ theFacade.m_globalDictCache.lock();
+ theFacade.m_globalDictCache.invalidate_all();
+ theFacade.m_globalDictCache.unlock();
+ m_connect_count ++;
+ m_cluster_state = CS_waiting_for_clean_cache;
+ }
NFCompleteRep rep;
for(Uint32 i = 1; i<MAX_NODES; i++){
if(theNodes[i].defined && theNodes[i].nfCompleteRep == false){
--- 1.49.16.1/ndb/src/ndbapi/Ndb.cpp 2007-03-07 10:46:38 +07:00
+++ 1.84/storage/ndb/src/ndbapi/Ndb.cpp 2007-03-07 15:31:00 +07:00
@@ -27,6 +27,8 @@ Name: Ndb.cpp
#include "NdbImpl.hpp"
#include <NdbOperation.hpp>
#include <NdbTransaction.hpp>
+#include <NdbEventOperation.hpp>
+#include <NdbEventOperationImpl.hpp>
#include <NdbRecAttr.hpp>
#include <md5_hash.hpp>
#include <NdbSleep.h>
@@ -142,7 +144,7 @@ Ndb::NDB_connect(Uint32 tNode)
//***************************************************************************
int tReturnCode;
- TransporterFacade *tp = TransporterFacade::instance();
+ TransporterFacade *tp = theImpl->m_transporter_facade;
DBUG_ENTER("Ndb::NDB_connect");
@@ -177,23 +179,9 @@ Ndb::NDB_connect(Uint32 tNode)
tSignal->setData(theMyRef, 2); // Set my block reference
tNdbCon->Status(NdbTransaction::Connecting); // Set status to connecting
Uint32 nodeSequence;
- { // send and receive signal
- Guard guard(tp->theMutexPtr);
- nodeSequence = tp->getNodeSequence(tNode);
- bool node_is_alive = tp->get_node_alive(tNode);
- if (node_is_alive) {
- tReturnCode = tp->sendSignal(tSignal, tNode);
- releaseSignal(tSignal);
- if (tReturnCode != -1) {
- theImpl->theWaiter.m_node = tNode;
- theImpl->theWaiter.m_state = WAIT_TC_SEIZE;
- tReturnCode = receiveResponse();
- }//if
- } else {
- releaseSignal(tSignal);
- tReturnCode = -1;
- }//if
- }
+ tReturnCode= sendRecSignal(tNode, WAIT_TC_SEIZE, tSignal,
+ 0, &nodeSequence);
+ releaseSignal(tSignal);
if ((tReturnCode == 0) && (tNdbCon->Status() == NdbTransaction::Connected))
{
//************************************************
// Send and receive was successful
@@ -239,10 +227,9 @@ Remark: Disconnect all connection
void
Ndb::doDisconnect()
{
+ DBUG_ENTER("Ndb::doDisconnect");
NdbTransaction* tNdbCon;
CHECK_STATUS_MACRO_VOID;
- /* DBUG_ENTER must be after CHECK_STATUS_MACRO_VOID because of 'return' */
- DBUG_ENTER("Ndb::doDisconnect");
Uint32 tNoOfDbNodes = theImpl->theNoOfDBnodes;
Uint8 *theDBnodes= theImpl->theDBnodes;
@@ -600,6 +587,7 @@ Ndb::NdbTamper(TamperType aAction, int a
#ifdef CUSTOMER_RELEASE
return -1;
#else
+ DBUG_ENTER("Ndb::NdbTamper");
CHECK_STATUS_MACRO;
checkFailedNode();
@@ -621,20 +609,20 @@ Ndb::NdbTamper(TamperType aAction, int a
break;
default:
theError.code = 4102;
- return -1;
+ DBUG_RETURN(-1);
}
tNdbConn = getNdbCon(); // Get free connection object
if (tNdbConn == NULL) {
theError.code = 4000;
- return -1;
+ DBUG_RETURN(-1);
}
tSignal.setSignal(GSN_DIHNDBTAMPER);
tSignal.setData (tAction, 1);
tSignal.setData(tNdbConn->ptr2int(),2);
tSignal.setData(theMyRef,3); // Set return block reference
tNdbConn->Status(NdbTransaction::Connecting); // Set status to connecting
- TransporterFacade *tp = TransporterFacade::instance();
+ TransporterFacade *tp = theImpl->m_transporter_facade;
if (tAction == 3) {
tp->lock_mutex();
tp->sendSignal(&tSignal, aNode);
@@ -646,12 +634,12 @@ Ndb::NdbTamper(TamperType aAction, int a
if (tNode == 0) {
theError.code = 4002;
releaseNdbCon(tNdbConn);
- return -1;
+ DBUG_RETURN(-1);
}//if
ret_code = tp->sendSignal(&tSignal,aNode);
tp->unlock_mutex();
releaseNdbCon(tNdbConn);
- return ret_code;
+ DBUG_RETURN(ret_code);
} else {
do {
tp->lock_mutex();
@@ -662,7 +650,7 @@ Ndb::NdbTamper(TamperType aAction, int a
if (tNode == 0) {
theError.code = 4009;
releaseNdbCon(tNdbConn);
- return -1;
+ DBUG_RETURN(-1);
}//if
ret_code = sendRecSignal(tNode, WAIT_NDB_TAMPER, &tSignal, 0);
if (ret_code == 0) {
@@ -670,15 +658,15 @@ Ndb::NdbTamper(TamperType aAction, int a
theRestartGCI = 0;
}//if
releaseNdbCon(tNdbConn);
- return theRestartGCI;
+ DBUG_RETURN(theRestartGCI);
} else if ((ret_code == -5) || (ret_code == -2)) {
TRACE_DEBUG("Continue DIHNDBTAMPER when node failed/stopping");
} else {
- return -1;
+ DBUG_RETURN(-1);
}//if
} while (1);
}
- return 0;
+ DBUG_RETURN(0);
#endif
}
#if 0
@@ -777,15 +765,18 @@ Ndb::getAutoIncrementValue(const char* a
Uint64 & tupleId, Uint32 cacheSize)
{
DBUG_ENTER("Ndb::getAutoIncrementValue");
+ ASSERT_NOT_MYSQLD;
BaseString internal_tabname(internalize_table_name(aTableName));
Ndb_local_table_info *info=
- theDictionary->get_local_table_info(internal_tabname, false);
+ theDictionary->get_local_table_info(internal_tabname);
if (info == 0) {
theError.code = theDictionary->getNdbError().code;
DBUG_RETURN(-1);
}
- if (getTupleIdFromNdb(info, tupleId, cacheSize) == -1)
+ const NdbTableImpl* table = info->m_table_impl;
+ TupleIdRange & range = info->m_tuple_id_range;
+ if (getTupleIdFromNdb(table, range, tupleId, cacheSize) == -1)
DBUG_RETURN(-1);
DBUG_PRINT("info", ("value %lu", (ulong) tupleId));
DBUG_RETURN(0);
@@ -796,31 +787,48 @@ Ndb::getAutoIncrementValue(const NdbDict
Uint64 & tupleId, Uint32 cacheSize)
{
DBUG_ENTER("Ndb::getAutoIncrementValue");
+ ASSERT_NOT_MYSQLD;
assert(aTable != 0);
const NdbTableImpl* table = & NdbTableImpl::getImpl(*aTable);
const BaseString& internal_tabname = table->m_internalName;
Ndb_local_table_info *info=
- theDictionary->get_local_table_info(internal_tabname, false);
+ theDictionary->get_local_table_info(internal_tabname);
if (info == 0) {
theError.code = theDictionary->getNdbError().code;
DBUG_RETURN(-1);
}
- if (getTupleIdFromNdb(info, tupleId, cacheSize) == -1)
+ TupleIdRange & range = info->m_tuple_id_range;
+ if (getTupleIdFromNdb(table, range, tupleId, cacheSize) == -1)
+ DBUG_RETURN(-1);
+ DBUG_PRINT("info", ("value %lu", (ulong)tupleId));
+ DBUG_RETURN(0);
+}
+
+int
+Ndb::getAutoIncrementValue(const NdbDictionary::Table * aTable,
+ TupleIdRange & range, Uint64 & tupleId,
+ Uint32 cacheSize)
+{
+ DBUG_ENTER("Ndb::getAutoIncrementValue");
+ assert(aTable != 0);
+ const NdbTableImpl* table = & NdbTableImpl::getImpl(*aTable);
+
+ if (getTupleIdFromNdb(table, range, tupleId, cacheSize) == -1)
DBUG_RETURN(-1);
DBUG_PRINT("info", ("value %lu", (ulong)tupleId));
DBUG_RETURN(0);
}
int
-Ndb::getTupleIdFromNdb(Ndb_local_table_info* info,
- Uint64 & tupleId, Uint32 cacheSize)
+Ndb::getTupleIdFromNdb(const NdbTableImpl* table,
+ TupleIdRange & range, Uint64 & tupleId, Uint32 cacheSize)
{
DBUG_ENTER("Ndb::getTupleIdFromNdb");
- if (info->m_first_tuple_id != info->m_last_tuple_id)
+ if (range.m_first_tuple_id != range.m_last_tuple_id)
{
- assert(info->m_first_tuple_id < info->m_last_tuple_id);
- tupleId = ++info->m_first_tuple_id;
+ assert(range.m_first_tuple_id < range.m_last_tuple_id);
+ tupleId = ++range.m_first_tuple_id;
DBUG_PRINT("info", ("next cached value %lu", (ulong)tupleId));
}
else
@@ -833,7 +841,7 @@ Ndb::getTupleIdFromNdb(Ndb_local_table_i
* and returns first tupleId in the new range.
*/
Uint64 opValue = cacheSize;
- if (opTupleIdOnNdb(info, opValue, 0) == -1)
+ if (opTupleIdOnNdb(table, range, opValue, 0) == -1)
DBUG_RETURN(-1);
tupleId = opValue;
}
@@ -845,15 +853,18 @@ Ndb::readAutoIncrementValue(const char*
Uint64 & tupleId)
{
DBUG_ENTER("Ndb::readAutoIncrementValue");
+ ASSERT_NOT_MYSQLD;
BaseString internal_tabname(internalize_table_name(aTableName));
Ndb_local_table_info *info=
- theDictionary->get_local_table_info(internal_tabname, false);
+ theDictionary->get_local_table_info(internal_tabname);
if (info == 0) {
theError.code = theDictionary->getNdbError().code;
DBUG_RETURN(-1);
}
- if (readTupleIdFromNdb(info, tupleId) == -1)
+ const NdbTableImpl* table = info->m_table_impl;
+ TupleIdRange & range = info->m_tuple_id_range;
+ if (readTupleIdFromNdb(table, range, tupleId) == -1)
DBUG_RETURN(-1);
DBUG_PRINT("info", ("value %lu", (ulong)tupleId));
DBUG_RETURN(0);
@@ -864,31 +875,47 @@ Ndb::readAutoIncrementValue(const NdbDic
Uint64 & tupleId)
{
DBUG_ENTER("Ndb::readAutoIncrementValue");
+ ASSERT_NOT_MYSQLD;
assert(aTable != 0);
const NdbTableImpl* table = & NdbTableImpl::getImpl(*aTable);
const BaseString& internal_tabname = table->m_internalName;
Ndb_local_table_info *info=
- theDictionary->get_local_table_info(internal_tabname, false);
+ theDictionary->get_local_table_info(internal_tabname);
if (info == 0) {
theError.code = theDictionary->getNdbError().code;
DBUG_RETURN(-1);
}
- if (readTupleIdFromNdb(info, tupleId) == -1)
+ TupleIdRange & range = info->m_tuple_id_range;
+ if (readTupleIdFromNdb(table, range, tupleId) == -1)
DBUG_RETURN(-1);
DBUG_PRINT("info", ("value %lu", (ulong)tupleId));
DBUG_RETURN(0);
}
int
-Ndb::readTupleIdFromNdb(Ndb_local_table_info* info,
- Uint64 & tupleId)
+Ndb::readAutoIncrementValue(const NdbDictionary::Table * aTable,
+ TupleIdRange & range, Uint64 & tupleId)
+{
+ DBUG_ENTER("Ndb::readAutoIncrementValue");
+ assert(aTable != 0);
+ const NdbTableImpl* table = & NdbTableImpl::getImpl(*aTable);
+
+ if (readTupleIdFromNdb(table, range, tupleId) == -1)
+ DBUG_RETURN(-1);
+ DBUG_PRINT("info", ("value %lu", (ulong)tupleId));
+ DBUG_RETURN(0);
+}
+
+int
+Ndb::readTupleIdFromNdb(const NdbTableImpl* table,
+ TupleIdRange & range, Uint64 & tupleId)
{
DBUG_ENTER("Ndb::readTupleIdFromNdb");
- if (info->m_first_tuple_id != info->m_last_tuple_id)
+ if (range.m_first_tuple_id != range.m_last_tuple_id)
{
- assert(info->m_first_tuple_id < info->m_last_tuple_id);
- tupleId = info->m_first_tuple_id + 1;
+ assert(range.m_first_tuple_id < range.m_last_tuple_id);
+ tupleId = range.m_first_tuple_id + 1;
}
else
{
@@ -897,7 +924,7 @@ Ndb::readTupleIdFromNdb(Ndb_local_table_
* only if no other transactions are allowed.
*/
Uint64 opValue = 0;
- if (opTupleIdOnNdb(info, opValue, 3) == -1)
+ if (opTupleIdOnNdb(table, range, opValue, 3) == -1)
DBUG_RETURN(-1);
tupleId = opValue;
}
@@ -909,15 +936,18 @@ Ndb::setAutoIncrementValue(const char* a
Uint64 tupleId, bool increase)
{
DBUG_ENTER("Ndb::setAutoIncrementValue");
+ ASSERT_NOT_MYSQLD;
BaseString internal_tabname(internalize_table_name(aTableName));
Ndb_local_table_info *info=
- theDictionary->get_local_table_info(internal_tabname, false);
+ theDictionary->get_local_table_info(internal_tabname);
if (info == 0) {
theError.code = theDictionary->getNdbError().code;
DBUG_RETURN(-1);
}
- if (setTupleIdInNdb(info, tupleId, increase) == -1)
+ const NdbTableImpl* table = info->m_table_impl;
+ TupleIdRange & range = info->m_tuple_id_range;
+ if (setTupleIdInNdb(table, range, tupleId, increase) == -1)
DBUG_RETURN(-1);
DBUG_RETURN(0);
}
@@ -927,36 +957,52 @@ Ndb::setAutoIncrementValue(const NdbDict
Uint64 tupleId, bool increase)
{
DBUG_ENTER("Ndb::setAutoIncrementValue");
+ ASSERT_NOT_MYSQLD;
assert(aTable != 0);
const NdbTableImpl* table = & NdbTableImpl::getImpl(*aTable);
const BaseString& internal_tabname = table->m_internalName;
Ndb_local_table_info *info=
- theDictionary->get_local_table_info(internal_tabname, false);
+ theDictionary->get_local_table_info(internal_tabname);
if (info == 0) {
theError.code = theDictionary->getNdbError().code;
DBUG_RETURN(-1);
}
- if (setTupleIdInNdb(info, tupleId, increase) == -1)
+ TupleIdRange & range = info->m_tuple_id_range;
+ if (setTupleIdInNdb(table, range, tupleId, increase) == -1)
+ DBUG_RETURN(-1);
+ DBUG_RETURN(0);
+}
+
+int
+Ndb::setAutoIncrementValue(const NdbDictionary::Table * aTable,
+ TupleIdRange & range, Uint64 tupleId,
+ bool increase)
+{
+ DBUG_ENTER("Ndb::setAutoIncrementValue");
+ assert(aTable != 0);
+ const NdbTableImpl* table = & NdbTableImpl::getImpl(*aTable);
+
+ if (setTupleIdInNdb(table, range, tupleId, increase) == -1)
DBUG_RETURN(-1);
DBUG_RETURN(0);
}
int
-Ndb::setTupleIdInNdb(Ndb_local_table_info* info,
- Uint64 tupleId, bool increase)
+Ndb::setTupleIdInNdb(const NdbTableImpl* table,
+ TupleIdRange & range, Uint64 tupleId, bool increase)
{
DBUG_ENTER("Ndb::setTupleIdInNdb");
if (increase)
{
- if (info->m_first_tuple_id != info->m_last_tuple_id)
+ if (range.m_first_tuple_id != range.m_last_tuple_id)
{
- assert(info->m_first_tuple_id < info->m_last_tuple_id);
- if (tupleId <= info->m_first_tuple_id + 1)
+ assert(range.m_first_tuple_id < range.m_last_tuple_id);
+ if (tupleId <= range.m_first_tuple_id + 1)
DBUG_RETURN(0);
- if (tupleId <= info->m_last_tuple_id)
+ if (tupleId <= range.m_last_tuple_id)
{
- info->m_first_tuple_id = tupleId - 1;
+ range.m_first_tuple_id = tupleId - 1;
DBUG_PRINT("info",
("Setting next auto increment cached value to %lu",
(ulong)tupleId));
@@ -967,7 +1013,7 @@ Ndb::setTupleIdInNdb(Ndb_local_table_inf
* if tupleId <= NEXTID, do nothing. otherwise update NEXTID to
* tupleId and set cached range to first = last = tupleId - 1.
*/
- if (opTupleIdOnNdb(info, tupleId, 2) == -1)
+ if (opTupleIdOnNdb(table, range, tupleId, 2) == -1)
DBUG_RETURN(-1);
}
else
@@ -975,40 +1021,62 @@ Ndb::setTupleIdInNdb(Ndb_local_table_inf
/*
* update NEXTID to given value. reset cached range.
*/
- if (opTupleIdOnNdb(info, tupleId, 1) == -1)
+ if (opTupleIdOnNdb(table, range, tupleId, 1) == -1)
DBUG_RETURN(-1);
}
DBUG_RETURN(0);
}
+int Ndb::initAutoIncrement()
+{
+ if (m_sys_tab_0)
+ return 0;
+
+ BaseString currentDb(getDatabaseName());
+ BaseString currentSchema(getDatabaseSchemaName());
+
+ setDatabaseName("sys");
+ setDatabaseSchemaName("def");
+
+ m_sys_tab_0 = theDictionary->getTableGlobal("SYSTAB_0");
+
+ // Restore current name space
+ setDatabaseName(currentDb.c_str());
+ setDatabaseSchemaName(currentSchema.c_str());
+
+ if (m_sys_tab_0 == NULL) {
+ assert(theDictionary->m_error.code != 0);
+ theError.code = theDictionary->m_error.code;
+ return -1;
+ }
+
+ return 0;
+}
+
int
-Ndb::opTupleIdOnNdb(Ndb_local_table_info* info, Uint64 & opValue, Uint32 op)
+Ndb::opTupleIdOnNdb(const NdbTableImpl* table,
+ TupleIdRange & range, Uint64 & opValue, Uint32 op)
{
DBUG_ENTER("Ndb::opTupleIdOnNdb");
- Uint32 aTableId = info->m_table_impl->m_tableId;
+ Uint32 aTableId = table->m_id;
DBUG_PRINT("enter", ("table: %u value: %lu op: %u",
aTableId, (ulong) opValue, op));
- NdbTransaction* tConnection;
- NdbOperation* tOperation= 0; // Compiler warning if not initialized
+ NdbTransaction* tConnection = NULL;
+ NdbOperation* tOperation = NULL;
Uint64 tValue;
NdbRecAttr* tRecAttrResult;
- CHECK_STATUS_MACRO_ZERO;
+ CHECK_STATUS_MACRO;
- BaseString currentDb(getDatabaseName());
- BaseString currentSchema(getDatabaseSchemaName());
+ if (initAutoIncrement() == -1)
+ goto error_handler;
- setDatabaseName("sys");
- setDatabaseSchemaName("def");
tConnection = this->startTransaction();
if (tConnection == NULL)
- goto error_return;
+ goto error_handler;
- if (usingFullyQualifiedNames())
- tOperation = tConnection->getNdbOperation("SYSTAB_0");
- else
- tOperation = tConnection->getNdbOperation("sys/def/SYSTAB_0");
+ tOperation = tConnection->getNdbOperation(m_sys_tab_0);
if (tOperation == NULL)
goto error_handler;
@@ -1016,7 +1084,7 @@ Ndb::opTupleIdOnNdb(Ndb_local_table_info
{
case 0:
tOperation->interpretedUpdateTuple();
- tOperation->equal("SYSKEY_0", aTableId );
+ tOperation->equal("SYSKEY_0", aTableId);
tOperation->incValue("NEXTID", opValue);
tRecAttrResult = tOperation->getValue("NEXTID");
@@ -1025,9 +1093,9 @@ Ndb::opTupleIdOnNdb(Ndb_local_table_info
tValue = tRecAttrResult->u_64_value();
- info->m_first_tuple_id = tValue - opValue;
- info->m_last_tuple_id = tValue - 1;
- opValue = info->m_first_tuple_id; // out
+ range.m_first_tuple_id = tValue - opValue;
+ range.m_last_tuple_id = tValue - 1;
+ opValue = range.m_first_tuple_id; // out
break;
case 1:
// create on first use
@@ -1038,8 +1106,7 @@ Ndb::opTupleIdOnNdb(Ndb_local_table_info
if (tConnection->execute( Commit ) == -1 )
goto error_handler;
- info->m_first_tuple_id = ~(Uint64)0;
- info->m_last_tuple_id = ~(Uint64)0;
+ range.reset();
break;
case 2:
tOperation->interpretedUpdateTuple();
@@ -1062,8 +1129,8 @@ Ndb::opTupleIdOnNdb(Ndb_local_table_info
{
DBUG_PRINT("info",
("Setting next auto increment value (db) to %lu",
- (ulong)opValue));
- info->m_first_tuple_id = info->m_last_tuple_id = opValue - 1;
+ (ulong) opValue));
+ range.m_first_tuple_id = range.m_last_tuple_id = opValue - 1;
}
break;
case 3:
@@ -1080,24 +1147,28 @@ Ndb::opTupleIdOnNdb(Ndb_local_table_info
this->closeTransaction(tConnection);
- // Restore current name space
- setDatabaseName(currentDb.c_str());
- setDatabaseSchemaName(currentSchema.c_str());
-
DBUG_RETURN(0);
- error_handler:
+error_handler:
+ DBUG_PRINT("error", ("ndb=%d con=%d op=%d",
+ theError.code,
+ tConnection != NULL ? tConnection->theError.code : -1,
+ tOperation != NULL ? tOperation->theError.code : -1));
+
+ if (theError.code == 0 && tConnection != NULL)
theError.code = tConnection->theError.code;
+ if (theError.code == 0 && tOperation != NULL)
+ theError.code = tOperation->theError.code;
+ DBUG_ASSERT(theError.code != 0);
+
+ NdbError savedError;
+ savedError = theError;
+
+ if (tConnection != NULL)
this->closeTransaction(tConnection);
- error_return:
- // Restore current name space
- setDatabaseName(currentDb.c_str());
- setDatabaseSchemaName(currentSchema.c_str());
- DBUG_PRINT("error", ("ndb=%d con=%d op=%d",
- theError.code,
- tConnection ? tConnection->theError.code : -1,
- tOperation ? tOperation->theError.code : -1));
+ theError = savedError;
+
DBUG_RETURN(-1);
}
@@ -1118,39 +1189,43 @@ convertEndian(Uint32 Data)
return Data;
#endif
}
+
+// <internal>
+Ndb_cluster_connection &
+Ndb::get_ndb_cluster_connection()
+{
+ return theImpl->m_ndb_cluster_connection;
+}
+
const char * Ndb::getCatalogName() const
{
return theImpl->m_dbname.c_str();
}
-
void Ndb::setCatalogName(const char * a_catalog_name)
{
- if (a_catalog_name)
- {
+ // TODO can table_name_separator be escaped?
+ if (a_catalog_name && ! strchr(a_catalog_name, table_name_separator)) {
theImpl->m_dbname.assign(a_catalog_name);
theImpl->update_prefix();
}
}
-
const char * Ndb::getSchemaName() const
{
return theImpl->m_schemaname.c_str();
}
-
void Ndb::setSchemaName(const char * a_schema_name)
{
- if (a_schema_name) {
+ // TODO can table_name_separator be escaped?
+ if (a_schema_name && ! strchr(a_schema_name, table_name_separator)) {
theImpl->m_schemaname.assign(a_schema_name);
theImpl->update_prefix();
}
}
+// </internal>
-/*
-Deprecated functions
-*/
const char * Ndb::getDatabaseName() const
{
return getCatalogName();
@@ -1170,6 +1245,26 @@ void Ndb::setDatabaseSchemaName(const ch
{
setSchemaName(a_schema_name);
}
+
+int Ndb::setDatabaseAndSchemaName(const NdbDictionary::Table* t)
+{
+ const char* s0 = t->m_impl.m_internalName.c_str();
+ const char* s1 = strchr(s0, table_name_separator);
+ if (s1 && s1 != s0) {
+ const char* s2 = strchr(s1 + 1, table_name_separator);
+ if (s2 && s2 != s1 + 1) {
+ char buf[NAME_LEN + 1];
+ if (s1 - s0 <= NAME_LEN && s2 - (s1 + 1) <= NAME_LEN) {
+ sprintf(buf, "%.*s", (int) (s1 - s0), s0);
+ setDatabaseName(buf);
+ sprintf(buf, "%.*s", (int) (s2 - (s1 + 1)), s1 + 1);
+ setDatabaseSchemaName(buf);
+ return 0;
+ }
+ }
+ }
+ return -1;
+}
bool Ndb::usingFullyQualifiedNames()
{
@@ -1232,9 +1327,16 @@ Ndb::internalize_table_name(const char *
if (fullyQualifiedNames)
{
/* Internal table name format <db>/<schema>/<table>
- <db>/<schema> is already available in m_prefix
+ <db>/<schema>/ is already available in m_prefix
so just concat the two strings
*/
+#ifdef VM_TRACE
+ // verify that m_prefix looks like abc/def/
+ const char* s0 = theImpl->m_prefix.c_str();
+ const char* s1 = s0 ? strchr(s0, table_name_separator) : 0;
+ const char* s2 = s1 ? strchr(s1 + 1, table_name_separator) : 0;
+ assert(s1 && s1 != s0 && s2 && s2 != s1 + 1 && *(s2 +
1) == 0);
+#endif
ret.assfmt("%s%s",
theImpl->m_prefix.c_str(),
external_name);
@@ -1246,6 +1348,35 @@ Ndb::internalize_table_name(const char *
DBUG_RETURN(ret);
}
+const BaseString
+Ndb::old_internalize_index_name(const NdbTableImpl * table,
+ const char * external_name) const
+{
+ BaseString ret;
+ DBUG_ENTER("old_internalize_index_name");
+ DBUG_PRINT("enter", ("external_name: %s, table_id: %d",
+ external_name, table ? table->m_id : ~0));
+ if (!table)
+ {
+ DBUG_PRINT("error", ("!table"));
+ DBUG_RETURN(ret);
+ }
+
+ if (fullyQualifiedNames)
+ {
+ /* Internal index name format <db>/<schema>/<tabid>/<table>
*/
+ ret.assfmt("%s%d%c%s",
+ theImpl->m_prefix.c_str(),
+ table->m_id,
+ table_name_separator,
+ external_name);
+ }
+ else
+ ret.assign(external_name);
+
+ DBUG_PRINT("exit", ("internal_name: %s", ret.c_str()));
+ DBUG_RETURN(ret);
+}
const BaseString
Ndb::internalize_index_name(const NdbTableImpl * table,
@@ -1254,7 +1385,7 @@ Ndb::internalize_index_name(const NdbTab
BaseString ret;
DBUG_ENTER("internalize_index_name");
DBUG_PRINT("enter", ("external_name: %s, table_id: %d",
- external_name, table ? table->m_tableId : ~0));
+ external_name, table ? table->m_id : ~0));
if (!table)
{
DBUG_PRINT("error", ("!table"));
@@ -1263,10 +1394,10 @@ Ndb::internalize_index_name(const NdbTab
if (fullyQualifiedNames)
{
- /* Internal index name format <db>/<schema>/<tabid>/<table>
*/
+ /* Internal index name format sys/def/<tabid>/<table> */
ret.assfmt("%s%d%c%s",
- theImpl->m_prefix.c_str(),
- table->m_tableId,
+ theImpl->m_systemPrefix.c_str(),
+ table->m_id,
table_name_separator,
external_name);
}
@@ -1311,6 +1442,113 @@ Ndb::getSchemaFromInternalName(const cha
BaseString ret = BaseString(schemaName);
delete [] schemaName;
return ret;
+}
+
+// ToDo set event buffer size
+NdbEventOperation* Ndb::createEventOperation(const char* eventName)
+{
+ DBUG_ENTER("Ndb::createEventOperation");
+ NdbEventOperation* tOp= theEventBuffer->createEventOperation(eventName,
+ theError);
+ if (tOp)
+ {
+ // keep track of all event operations
+ NdbEventOperationImpl *op=
+ NdbEventBuffer::getEventOperationImpl(tOp);
+ op->m_next= theImpl->m_ev_op;
+ op->m_prev= 0;
+ theImpl->m_ev_op= op;
+ if (op->m_next)
+ op->m_next->m_prev= op;
+ }
+
+ DBUG_RETURN(tOp);
+}
+
+int Ndb::dropEventOperation(NdbEventOperation* tOp)
+{
+ DBUG_ENTER("Ndb::dropEventOperation");
+ DBUG_PRINT("info", ("name: %s", tOp->getEvent()->getTable()->getName()));
+ // remove it from list
+ NdbEventOperationImpl *op=
+ NdbEventBuffer::getEventOperationImpl(tOp);
+ if (op->m_next)
+ op->m_next->m_prev= op->m_prev;
+ if (op->m_prev)
+ op->m_prev->m_next= op->m_next;
+ else
+ theImpl->m_ev_op= op->m_next;
+
+ DBUG_PRINT("info", ("first: %s",
+ theImpl->m_ev_op ?
theImpl->m_ev_op->getEvent()->getTable()->getName() : "<empty>"));
+ assert(theImpl->m_ev_op == 0 || theImpl->m_ev_op->m_prev == 0);
+
+ theEventBuffer->dropEventOperation(tOp);
+ DBUG_RETURN(0);
+}
+
+NdbEventOperation *Ndb::getEventOperation(NdbEventOperation* tOp)
+{
+ NdbEventOperationImpl *op;
+ if (tOp)
+ op= NdbEventBuffer::getEventOperationImpl(tOp)->m_next;
+ else
+ op= theImpl->m_ev_op;
+ if (op)
+ return op->m_facade;
+ return 0;
+}
+
+int
+Ndb::pollEvents(int aMillisecondNumber, Uint64 *latestGCI)
+{
+ return theEventBuffer->pollEvents(aMillisecondNumber, latestGCI);
+}
+
+int
+Ndb::flushIncompleteEvents(Uint64 gci)
+{
+ return theEventBuffer->flushIncompleteEvents(gci);
+}
+
+NdbEventOperation *Ndb::nextEvent()
+{
+ return theEventBuffer->nextEvent();
+}
+
+const NdbEventOperation*
+Ndb::getGCIEventOperations(Uint32* iter, Uint32* event_types)
+{
+ NdbEventOperationImpl* op =
+ theEventBuffer->getGCIEventOperations(iter, event_types);
+ if (op != NULL)
+ return op->m_facade;
+ return NULL;
+}
+
+Uint64 Ndb::getLatestGCI()
+{
+ return theEventBuffer->getLatestGCI();
+}
+
+void Ndb::setReportThreshEventGCISlip(unsigned thresh)
+{
+ if (theEventBuffer->m_free_thresh != thresh)
+ {
+ theEventBuffer->m_free_thresh= thresh;
+ theEventBuffer->m_min_free_thresh= thresh;
+ theEventBuffer->m_max_free_thresh= 100;
+ }
+}
+
+void Ndb::setReportThreshEventFreeMem(unsigned thresh)
+{
+ if (theEventBuffer->m_free_thresh != thresh)
+ {
+ theEventBuffer->m_free_thresh= thresh;
+ theEventBuffer->m_min_free_thresh= thresh;
+ theEventBuffer->m_max_free_thresh= 100;
+ }
}
#ifdef VM_TRACE
--- 1.23.6.1/ndb/src/ndbapi/TransporterFacade.hpp 2007-03-07 10:46:38 +07:00
+++ 1.35/storage/ndb/src/ndbapi/TransporterFacade.hpp 2007-03-07 15:31:00 +07:00
@@ -33,6 +33,7 @@ class ConfigRetriever;
class Ndb;
class NdbApiSignal;
+class NdbWaiter;
typedef void (* ExecuteFunction)(void *, NdbApiSignal *, LinearSectionPtr ptr[3]);
typedef void (* NodeStatusFunction)(void *, Uint32, bool nodeAlive, bool nfComplete);
@@ -46,13 +47,17 @@ extern "C" {
class TransporterFacade
{
public:
+ /**
+ * Max number of Ndb objects.
+ * (Ndb objects should not be shared by different threads.)
+ */
+ STATIC_CONST( MAX_NO_THREADS = 4711 );
TransporterFacade();
virtual ~TransporterFacade();
bool init(Uint32, const ndb_mgm_configuration *);
- static TransporterFacade* instance();
int start_instance(int, const ndb_mgm_configuration*);
- static void stop_instance();
+ void stop_instance();
/**
* Register this block for sending/receiving signals
@@ -62,6 +67,7 @@ public:
// Close this block number
int close(BlockNumber blockNumber, Uint64 trans_id);
+ Uint32 get_active_ndb_objects() const;
// Only sends to nodes which are alive
int sendSignal(NdbApiSignal * signal, NodeId nodeId);
@@ -95,6 +101,11 @@ public:
void ReportNodeAlive(NodeId nodeId);
void ReportNodeDead(NodeId nodeId);
void ReportNodeFailureComplete(NodeId nodeId);
+
+ /**
+ * Send signal to each registered object
+ */
+ void for_each(NdbApiSignal* aSignal, LinearSectionPtr ptr[3]);
void lock_mutex();
void unlock_mutex();
@@ -110,13 +121,48 @@ public:
Uint32 get_scan_batch_size();
Uint32 get_batch_byte_size();
Uint32 get_batch_size();
+ Uint32 m_waitfor_timeout; // in milli seconds...
TransporterRegistry* get_registry() { return theTransporterRegistry;};
+/*
+ When a thread has sent its signals and is ready to wait for reception
+ of these it does normally always wait on a conditional mutex and
+ the actual reception is handled by the receiver thread in the NDB API.
+ With the below new methods and variables each thread has the possibility
+ of becoming owner of the "right" to poll for signals. Effectually this
+ means that the thread acts temporarily as a receiver thread.
+ For the thread that succeeds in grabbing this "ownership" it will avoid
+ a number of expensive calls to conditional mutex and even more expensive
+ context switches to wake up.
+ When an owner of the poll "right" has completed its own task it is likely
+ that there are others still waiting. In this case we pick one of the
+ threads as new owner of the poll "right". Since we want to switch owner
+ as seldom as possible we always pick the last thread which is likely to
+ be the last to complete its reception.
+*/
+ void external_poll(Uint32 wait_time);
+ NdbWaiter* get_poll_owner(void) const { return poll_owner; }
+ void set_poll_owner(NdbWaiter* new_owner) { poll_owner= new_owner; }
+ Uint32 put_in_cond_wait_queue(NdbWaiter *aWaiter);
+ void remove_from_cond_wait_queue(NdbWaiter *aWaiter);
+ NdbWaiter* rem_last_from_cond_wait_queue();
// heart beat received from a node (e.g. a signal came)
void hb_received(NodeId n);
private:
+ void init_cond_wait_queue();
+ struct CondWaitQueueElement {
+ NdbWaiter *cond_wait_object;
+ Uint32 next_cond_wait;
+ Uint32 prev_cond_wait;
+ };
+ NdbWaiter *poll_owner;
+ CondWaitQueueElement cond_wait_array[MAX_NO_THREADS];
+ Uint32 first_in_cond_wait;
+ Uint32 first_free_cond_wait;
+ Uint32 last_in_cond_wait;
+ /* End poll owner stuff */
/**
* Send a signal unconditional of node status (used by ClusterMgr)
*/
@@ -172,13 +218,6 @@ private:
/**
* Block number handling
*/
-public:
- /**
- * Max number of Ndb objects.
- * (Ndb objects should not be shared by different threads.)
- */
- STATIC_CONST( MAX_NO_THREADS = 4711 );
- Uint32 m_waitfor_timeout; // in milli seconds...
private:
struct ThreadData {
@@ -201,6 +240,7 @@ private:
NodeStatusFunction m_statusFunction;
};
+ Uint32 m_use_cnt;
Uint32 m_firstFree;
Vector<Uint32> m_statusNext;
Vector<Object_Execute> m_objectExecute;
@@ -212,7 +252,7 @@ private:
inline Object_Execute get(Uint16 blockNo) const {
blockNo -= MIN_API_BLOCK_NO;
- if(blockNo < m_objectExecute.size()){
+ if(likely (blockNo < m_objectExecute.size())){
return m_objectExecute[blockNo];
}
Object_Execute oe = { 0, 0 };
@@ -239,19 +279,29 @@ private:
public:
NdbMutex* theMutexPtr;
-private:
- static TransporterFacade* theFacadeInstance;
public:
GlobalDictCache m_globalDictCache;
};
-inline
-TransporterFacade*
-TransporterFacade::instance()
+class PollGuard
{
- return theFacadeInstance;
-}
+ public:
+ PollGuard(TransporterFacade *tp, NdbWaiter *aWaiter, Uint32 block_no);
+ ~PollGuard() { unlock_and_signal(); }
+ int wait_n_unlock(int wait_time, NodeId nodeId, Uint32 state,
+ bool forceSend= false);
+ int wait_for_input_in_loop(int wait_time, bool forceSend);
+ void wait_for_input(int wait_time);
+ int wait_scan(int wait_time, NodeId nodeId, bool forceSend);
+ void unlock_and_signal();
+ private:
+ TransporterFacade *m_tp;
+ NdbWaiter *m_waiter;
+ Uint32 m_block_no;
+ bool m_locked;
+};
+
inline
void
@@ -272,7 +322,7 @@ TransporterFacade::unlock_mutex()
inline
unsigned Ndb_cluster_connection_impl::get_connect_count() const
{
- return TransporterFacade::instance()->theClusterMgr->m_connect_count;
+ return m_transporter_facade->theClusterMgr->m_connect_count;
}
inline
@@ -330,11 +380,6 @@ TransporterFacade::getIsNodeSendable(Nod
return node.compatible && (startLevel == NodeState::SL_STARTED ||
startLevel == NodeState::SL_STOPPING_1 ||
node.m_state.getSingleUserMode());
- } else if (node.m_info.m_type == NodeInfo::REP) {
- /**
- * @todo Check that REP node actually has received API_REG_REQ
- */
- return node.compatible;
} else {
ndbout_c("TransporterFacade::getIsNodeSendable: Illegal node type: "
"%d of node: %d",
| Thread |
|---|
| • bk commit into 5.1 tree (tomas:1.2426) | tomas | 7 Mar |