#At file:///export/space/pekka/ms/ms-wl4124-70/ based on revid:pekka@stripped
4376 Pekka Nousiainen 2011-05-19
wl#4124 b03_trix.diff
trix-suma stats scan and cleanup
modified:
storage/ndb/include/kernel/signaldata/SumaImpl.hpp
storage/ndb/include/kernel/signaldata/UtilRelease.hpp
storage/ndb/src/kernel/blocks/suma/Suma.cpp
storage/ndb/src/kernel/blocks/suma/Suma.hpp
storage/ndb/src/kernel/blocks/trix/Trix.cpp
storage/ndb/src/kernel/blocks/trix/Trix.hpp
=== modified file 'storage/ndb/include/kernel/signaldata/SumaImpl.hpp'
--- a/storage/ndb/include/kernel/signaldata/SumaImpl.hpp 2011-02-01 23:27:25 +0000
+++ b/storage/ndb/include/kernel/signaldata/SumaImpl.hpp 2011-05-19 09:38:03 +0000
@@ -250,7 +250,7 @@ struct SubSyncReq {
friend class Suma;
friend bool printSUB_SYNC_REQ(FILE *, const Uint32 *, Uint32, Uint16);
- STATIC_CONST( SignalLength = 7 );
+ STATIC_CONST( SignalLength = 8 );
Uint32 senderRef;
Uint32 senderData;
@@ -259,16 +259,21 @@ struct SubSyncReq {
Uint32 part; // SubscriptionData::Part
Uint32 requestInfo;
Uint32 fragCount;
+ Uint32 fragId; // ZNIL if not used
enum {
LM_Exclusive = 0x1
,Reorg = 0x2
,NoDisk = 0x4
,TupOrder = 0x8
+ ,LM_CommittedRead = 0x10
+ ,RangeScan = 0x20
+ ,StatScan = 0x40
};
SECTION( ATTRIBUTE_LIST = 0); // Used when doing SingelTableScan
SECTION( TABLE_LIST = 1 );
+ SECTION( TUX_BOUND_INFO = 1); // If range scan
};
struct SubSyncRef {
=== modified file 'storage/ndb/include/kernel/signaldata/UtilRelease.hpp'
--- a/storage/ndb/include/kernel/signaldata/UtilRelease.hpp 2011-02-01 23:27:25 +0000
+++ b/storage/ndb/include/kernel/signaldata/UtilRelease.hpp 2011-05-19 09:38:03 +0000
@@ -17,7 +17,7 @@
*/
#ifndef UTIL_RELEASE_HPP
-#define UTIL_PREPARE_HPP
+#define UTIL_RELEASE_HPP
#include "SignalData.hpp"
=== modified file 'storage/ndb/src/kernel/blocks/suma/Suma.cpp'
--- a/storage/ndb/src/kernel/blocks/suma/Suma.cpp 2011-02-01 23:27:25 +0000
+++ b/storage/ndb/src/kernel/blocks/suma/Suma.cpp 2011-05-19 09:38:03 +0000
@@ -174,7 +174,12 @@ Suma::execREAD_CONFIG_REQ(Signal* signal
c_subOpPool.setSize(256);
c_syncPool.setSize(2);
- c_dataBufferPool.setSize(noAttrs);
+
+ // Trix: max 5 concurrent index stats ops with max 9 words bounds
+ Uint32 noOfBoundWords = 5 * 9;
+
+ // XXX multiplies number of words by 15 ???
+ c_dataBufferPool.setSize(noAttrs + noOfBoundWords);
c_maxBufferedEpochs = maxBufferedEpochs;
@@ -2292,6 +2297,7 @@ Suma::execSUB_SYNC_REQ(Signal* signal)
syncPtr.p->m_error = 0;
syncPtr.p->m_requestInfo = req->requestInfo;
syncPtr.p->m_frag_cnt = req->fragCount;
+ syncPtr.p->m_frag_id = req->fragId;
syncPtr.p->m_tableId = subPtr.p->m_tableId;
{
@@ -2302,8 +2308,17 @@ Suma::execSUB_SYNC_REQ(Signal* signal)
handle.getSection(ptr, SubSyncReq::ATTRIBUTE_LIST);
LocalDataBuffer<15> attrBuf(c_dataBufferPool, syncPtr.p->m_attributeList);
append(attrBuf, ptr, getSectionSegmentPool());
- releaseSections(handle);
}
+ if (req->requestInfo & SubSyncReq::RangeScan)
+ {
+ jam();
+ ndbrequire(handle.m_cnt > 1)
+ SegmentedSectionPtr ptr;
+ handle.getSection(ptr, SubSyncReq::TUX_BOUND_INFO);
+ LocalDataBuffer<15> boundBuf(c_dataBufferPool, syncPtr.p->m_boundInfo);
+ append(boundBuf, ptr, getSectionSegmentPool());
+ }
+ releaseSections(handle);
}
/**
@@ -2432,8 +2447,30 @@ Suma::execDIH_SCAN_GET_NODES_CONF(Signal
fd.m_fragDesc.m_nodeId = conf->nodes[0];
fd.m_fragDesc.m_fragmentNo = fragNo;
fd.m_fragDesc.m_lqhInstanceKey = conf->instanceKey;
- signal->theData[2] = fd.m_dummy;
- fragBuf.append(&signal->theData[2], 1);
+ if (ptr.p->m_frag_id == ZNIL)
+ {
+ signal->theData[2] = fd.m_dummy;
+ fragBuf.append(&signal->theData[2], 1);
+ }
+ else if (ptr.p->m_frag_id == fragNo)
+ {
+ /*
+ * Given fragment must have a replica on this node.
+ */
+ const Uint32 ownNodeId = getOwnNodeId();
+ Uint32 i = 0;
+ for (i = 0; i < nodeCount; i++)
+ if (conf->nodes[i] == ownNodeId)
+ break;
+ if (i == nodeCount)
+ {
+ sendSubSyncRef(signal, 1428);
+ return;
+ }
+ fd.m_fragDesc.m_nodeId = ownNodeId;
+ signal->theData[2] = fd.m_dummy;
+ fragBuf.append(&signal->theData[2], 1);
+ }
}
const Uint32 nextFrag = fragNo + 1;
@@ -2740,6 +2777,21 @@ Suma::SyncRecord::nextScan(Signal* signa
ScanFragReq::setTupScanFlag(req->requestInfo, 1);
}
+ if (m_requestInfo & SubSyncReq::LM_CommittedRead)
+ {
+ ScanFragReq::setReadCommittedFlag(req->requestInfo, 1);
+ }
+
+ if (m_requestInfo & SubSyncReq::RangeScan)
+ {
+ ScanFragReq::setRangeScanFlag(req->requestInfo, 1);
+ }
+
+ if (m_requestInfo & SubSyncReq::StatScan)
+ {
+ ScanFragReq::setStatScanFlag(req->requestInfo, 1);
+ }
+
req->fragmentNoKeyLen = fd.m_fragDesc.m_fragmentNo;
req->schemaVersion = tabPtr.p->m_schemaVersion;
req->transId1 = 0;
@@ -2763,10 +2815,25 @@ Suma::SyncRecord::nextScan(Signal* signa
AttributeHeader::init(&attrInfo[pos++], * it.data, 0);
}
LinearSectionPtr ptr[3];
+ Uint32 noOfSections;
ptr[0].p = attrInfo;
ptr[0].sz = pos;
+ noOfSections = 1;
+ if (m_requestInfo & SubSyncReq::RangeScan)
+ {
+ jam();
+ Uint32 oldpos = pos; // after attrInfo
+ LocalDataBuffer<15> boundBuf(suma.c_dataBufferPool, m_boundInfo);
+ for (boundBuf.first(it); !it.curr.isNull(); boundBuf.next(it))
+ {
+ attrInfo[pos++] = *it.data;
+ }
+ ptr[1].p = &attrInfo[oldpos];
+ ptr[1].sz = pos - oldpos;
+ noOfSections = 2;
+ }
suma.sendSignal(lqhRef, GSN_SCAN_FRAGREQ, signal,
- ScanFragReq::SignalLength, JBB, ptr, 1);
+ ScanFragReq::SignalLength, JBB, ptr, noOfSections);
m_currentNoOfAttributes = attrBuf.getSize();
@@ -5299,6 +5366,9 @@ Suma::SyncRecord::release(){
LocalDataBuffer<15> attrBuf(suma.c_dataBufferPool, m_attributeList);
attrBuf.release();
+
+ LocalDataBuffer<15> boundBuf(suma.c_dataBufferPool, m_boundInfo);
+ boundBuf.release();
}
=== modified file 'storage/ndb/src/kernel/blocks/suma/Suma.hpp'
--- a/storage/ndb/src/kernel/blocks/suma/Suma.hpp 2011-02-01 23:27:25 +0000
+++ b/storage/ndb/src/kernel/blocks/suma/Suma.hpp 2011-05-19 09:38:03 +0000
@@ -171,6 +171,7 @@ public:
Uint32 m_requestInfo;
Uint32 m_frag_cnt; // only scan this many fragments...
+ Uint32 m_frag_id; // only scan this specific fragment...
Uint32 m_tableId; // redundant...
/**
@@ -185,6 +186,7 @@ public:
Uint32 m_currentFragment; // Index in tabPtr.p->m_fragments
Uint32 m_currentNoOfAttributes; // No of attributes for current table
DataBuffer<15>::Head m_attributeList; // Attribute if other than default
+ DataBuffer<15>::Head m_boundInfo; // For range scan
void startScan(Signal*);
void nextScan(Signal*);
=== modified file 'storage/ndb/src/kernel/blocks/trix/Trix.cpp'
--- a/storage/ndb/src/kernel/blocks/trix/Trix.cpp 2011-02-01 23:27:25 +0000
+++ b/storage/ndb/src/kernel/blocks/trix/Trix.cpp 2011-05-19 09:38:03 +0000
@@ -108,6 +108,17 @@ Trix::Trix(Block_context& ctx) :
addRecSignal(GSN_WAIT_GCP_REF, &Trix::execWAIT_GCP_REF);
addRecSignal(GSN_WAIT_GCP_CONF, &Trix::execWAIT_GCP_CONF);
+
+ // index stats
+ addRecSignal(GSN_INDEX_STAT_IMPL_REQ, &Trix::execINDEX_STAT_IMPL_REQ);
+ addRecSignal(GSN_GET_TABINFO_CONF, &Trix::execGET_TABINFO_CONF);
+ addRecSignal(GSN_GET_TABINFOREF, &Trix::execGET_TABINFO_REF);
+
+ // index stats sys tables
+ c_statGetMetaDone = false;
+ c_statMetaHead = &Ndbcntr::g_sysTable_NDBIS_HEAD;
+ c_statMetaSample = &Ndbcntr::g_sysTable_NDBIS_SAMPLE;
+ c_statMetaSampleX1 = &Ndbcntr::g_sysIndex_NDBIS_SAMPLE_X1;
}
/**
@@ -134,6 +145,7 @@ Trix::execREAD_CONFIG_REQ(Signal* signal
// Allocate pool sizes
c_theAttrOrderBufferPool.setSize(100);
c_theSubscriptionRecPool.setSize(100);
+ c_statOpPool.setSize(5);
DLList<SubscriptionRecord> subscriptions(c_theSubscriptionRecPool);
SubscriptionRecPtr subptr;
@@ -469,12 +481,14 @@ Trix::execDUMP_STATE_ORD(Signal* signal)
if (signal->theData[0] == DumpStateOrd::SchemaResourceSnapshot)
{
RSS_AP_SNAPSHOT_SAVE(c_theSubscriptionRecPool);
+ RSS_AP_SNAPSHOT_SAVE(c_statOpPool);
return;
}
if (signal->theData[0] == DumpStateOrd::SchemaResourceCheckLeak)
{
RSS_AP_SNAPSHOT_CHECK(c_theSubscriptionRecPool);
+ RSS_AP_SNAPSHOT_CHECK(c_statOpPool);
return;
}
@@ -597,6 +611,7 @@ void Trix:: execBUILD_INDX_IMPL_REQ(Sign
subRec->prepareId = RNIL;
subRec->requestType = INDEX_BUILD;
subRec->fragCount = 0;
+ subRec->fragId = ZNIL;
subRec->m_rows_processed = 0;
subRec->m_flags = SubscriptionRecord::RF_WAIT_GCP; // Todo make configurable
subRec->m_gci = 0;
@@ -655,6 +670,11 @@ void Trix::execUTIL_PREPARE_CONF(Signal*
printf("Trix::execUTIL_PREPARE_CONF: Failed to find subscription data %u\n", subRecPtr.i);
return;
}
+ if (subRec->requestType == STAT_UTIL)
+ {
+ statUtilPrepareConf(signal, subRec->m_statPtrI);
+ return;
+ }
subRecPtr.p = subRec;
subRec->prepareId = utilPrepareConf->prepareId;
setupSubscription(signal, subRecPtr);
@@ -672,6 +692,11 @@ void Trix::execUTIL_PREPARE_REF(Signal*
printf("Trix::execUTIL_PREPARE_REF: Failed to find subscription data %u\n", subRecPtr.i);
return;
}
+ if (subRec->requestType == STAT_UTIL)
+ {
+ statUtilPrepareRef(signal, subRec->m_statPtrI);
+ return;
+ }
subRecPtr.p = subRec;
subRec->errorCode = (BuildIndxRef::ErrorCode)utilPrepareRef->errorCode;
@@ -696,6 +721,11 @@ void Trix::execUTIL_EXECUTE_CONF(Signal*
printf("rix::execUTIL_EXECUTE_CONF: Failed to find subscription data %u\n", subRecPtr.i);
return;
}
+ if (subRec->requestType == STAT_UTIL)
+ {
+ statUtilExecuteConf(signal, subRec->m_statPtrI);
+ return;
+ }
subRecPtr.p = subRec;
subRec->expectedConf--;
@@ -730,6 +760,11 @@ void Trix::execUTIL_EXECUTE_REF(Signal*
printf("Trix::execUTIL_EXECUTE_REF: Failed to find subscription data %u\n", subRecPtr.i);
return;
}
+ if (subRec->requestType == STAT_UTIL)
+ {
+ statUtilExecuteRef(signal, subRec->m_statPtrI);
+ return;
+ }
subRecPtr.p = subRec;
ndbrequire(utilExecuteRef->errorCode == UtilExecuteRef::TCError);
if(utilExecuteRef->TCErrorCode == CONSTRAINT_VIOLATION)
@@ -890,6 +925,21 @@ void Trix::execSUB_TABLE_DATA(Signal* si
case REORG_DELETE:
executeReorgTransaction(signal, subRecPtr, subTableData->takeOver);
break;
+ case STAT_UTIL:
+ ndbrequire(false);
+ break;
+ case STAT_CLEAN:
+ {
+ StatOp& stat = statOpGetPtr(subRecPtr.p->m_statPtrI);
+ statCleanExecute(signal, stat);
+ }
+ break;
+ case STAT_SCAN:
+ {
+ StatOp& stat = statOpGetPtr(subRecPtr.p->m_statPtrI);
+ statScanExecute(signal, stat);
+ }
+ break;
}
subRecPtr.p->m_rows_processed++;
@@ -939,8 +989,10 @@ void Trix::startTableScan(Signal* signal
}
// Merge index and key column segments
struct LinearSectionPtr orderPtr[3];
+ Uint32 noOfSections;
orderPtr[0].p = attributeList;
orderPtr[0].sz = subRec->attributeOrder.getSize();
+ noOfSections = 1;
SubSyncReq * subSyncReq = (SubSyncReq *)signal->getDataPtrSend();
subSyncReq->senderRef = reference();
@@ -950,6 +1002,7 @@ void Trix::startTableScan(Signal* signal
subSyncReq->part = SubscriptionData::TableData;
subSyncReq->requestInfo = 0;
subSyncReq->fragCount = subRec->fragCount;
+ subSyncReq->fragId = subRec->fragId;
if (subRec->m_flags & SubscriptionRecord::RF_NO_DISK)
{
@@ -974,6 +1027,27 @@ void Trix::startTableScan(Signal* signal
subSyncReq->requestInfo |= SubSyncReq::LM_Exclusive;
subSyncReq->requestInfo |= SubSyncReq::Reorg;
}
+ else if (subRec->requestType == STAT_CLEAN)
+ {
+ jam();
+ StatOp& stat = statOpGetPtr(subRecPtr.p->m_statPtrI);
+ StatOp::Clean clean = stat.m_clean;
+ orderPtr[1].p = clean.m_bound;
+ orderPtr[1].sz = clean.m_boundSize;
+ noOfSections = 2;
+ subSyncReq->requestInfo |= SubSyncReq::LM_CommittedRead;
+ subSyncReq->requestInfo |= SubSyncReq::RangeScan;
+ }
+ else if (subRec->requestType == STAT_SCAN)
+ {
+ jam();
+ orderPtr[1].p = 0;
+ orderPtr[1].sz = 0;
+ noOfSections = 2;
+ subSyncReq->requestInfo |= SubSyncReq::LM_CommittedRead;
+ subSyncReq->requestInfo |= SubSyncReq::RangeScan;
+ subSyncReq->requestInfo |= SubSyncReq::StatScan;
+ }
subRecPtr.p->expectedConf = 1;
DBUG_PRINT("info",("i: %u subscriptionId: %u, subscriptionKey: %u",
@@ -981,7 +1055,7 @@ void Trix::startTableScan(Signal* signal
subSyncReq->subscriptionKey));
sendSignal(SUMA_REF, GSN_SUB_SYNC_REQ,
- signal, SubSyncReq::SignalLength, JBB, orderPtr, 1);
+ signal, SubSyncReq::SignalLength, JBB, orderPtr, noOfSections);
}
void Trix::prepareInsertTransactions(Signal* signal,
@@ -1325,6 +1399,24 @@ Trix::execUTIL_RELEASE_CONF(Signal* sign
BuildIndxRef::SignalLength , JBB);
}
break;
+ case STAT_UTIL:
+ ndbrequire(subRecPtr.p->errorCode == BuildIndxRef::NoError);
+ statUtilReleaseConf(signal, subRecPtr.p->m_statPtrI);
+ return;
+ case STAT_CLEAN:
+ {
+ subRecPtr.p->prepareId = RNIL;
+ StatOp& stat = statOpGetPtr(subRecPtr.p->m_statPtrI);
+ statCleanRelease(signal, stat);
+ }
+ return;
+ case STAT_SCAN:
+ {
+ subRecPtr.p->prepareId = RNIL;
+ StatOp& stat = statOpGetPtr(subRecPtr.p->m_statPtrI);
+ statScanRelease(signal, stat);
+ }
+ return;
}
// Release subscription record
@@ -1394,6 +1486,7 @@ Trix::execCOPY_DATA_IMPL_REQ(Signal* sig
subRec->pendingSubSyncContinueConf = false;
subRec->prepareId = req->transId;
subRec->fragCount = req->srcFragments;
+ subRec->fragId = ZNIL;
subRec->m_rows_processed = 0;
subRec->m_flags = SubscriptionRecord::RF_WAIT_GCP; // Todo make configurable
subRec->m_gci = 0;
@@ -1478,6 +1571,1533 @@ Trix::execCOPY_DATA_IMPL_REQ(Signal* sig
}
}
+// index stats
+
+Trix::StatOp&
+Trix::statOpGetPtr(Uint32 statPtrI)
+{
+ ndbrequire(statPtrI != RNIL);
+ return *c_statOpPool.getPtr(statPtrI);
+}
+
+bool
+Trix::statOpSeize(Uint32& statPtrI)
+{
+ StatOpPtr statPtr;
+ if (ERROR_INSERTED(17001) ||
+ !c_statOpPool.seize(statPtr))
+ {
+ jam();
+ D("statOpSeize: seize statOp failed");
+ return false;
+ }
+#ifdef VM_TRACE
+ memset(statPtr.p, 0xf3, sizeof(*statPtr.p));
+#endif
+ new (statPtr.p) StatOp;
+ statPtrI = statPtr.i;
+ StatOp& stat = statOpGetPtr(statPtrI);
+ stat.m_ownPtrI = statPtrI;
+
+ SubscriptionRecPtr subRecPtr;
+ if (ERROR_INSERTED(17002) ||
+ !c_theSubscriptions.seize(subRecPtr))
+ {
+ jam();
+ c_statOpPool.release(statPtr);
+ D("statOpSeize: seize subRec failed");
+ return false;
+ }
+ SubscriptionRecord* subRec = subRecPtr.p;
+ subRec->m_statPtrI = stat.m_ownPtrI;
+ stat.m_subRecPtrI = subRecPtr.i;
+
+ D("statOpSeize" << V(statPtrI) << V(subRecPtr.i));
+ return true;
+}
+
+void
+Trix::statOpRelease(StatOp& stat)
+{
+ StatOp::Util& util = stat.m_util;
+ D("statOpRelease" << V(stat));
+
+ if (stat.m_subRecPtrI != RNIL)
+ {
+ jam();
+ SubscriptionRecord* subRec = c_theSubscriptions.getPtr(stat.m_subRecPtrI);
+ ndbrequire(subRec->prepareId == RNIL);
+ subRec->attributeOrder.release();
+ c_theSubscriptions.release(stat.m_subRecPtrI);
+ stat.m_subRecPtrI = RNIL;
+ }
+ ndbrequire(util.m_prepareId == RNIL);
+ c_statOpPool.release(stat.m_ownPtrI);
+}
+
+void
+Trix::execINDEX_STAT_IMPL_REQ(Signal* signal)
+{
+ jamEntry();
+ const IndexStatImplReq* req = (const IndexStatImplReq*)signal->getDataPtr();
+
+ Uint32 statPtrI = RNIL;
+ if (!statOpSeize(statPtrI))
+ {
+ jam();
+ statOpRef(signal, req, IndexStatRef::NoFreeStatOp, __LINE__);
+ return;
+ }
+ StatOp& stat = statOpGetPtr(statPtrI);
+ stat.m_req = *req;
+ stat.m_requestType = req->requestType;
+
+ // set request name for cluster log message
+ switch (stat.m_requestType) {
+ case IndexStatReq::RT_CLEAN_NEW:
+ jam();
+ stat.m_requestName = "clean new";
+ break;
+ case IndexStatReq::RT_CLEAN_OLD:
+ jam();
+ stat.m_requestName = "clean old";
+ break;
+ case IndexStatReq::RT_CLEAN_ALL:
+ jam();
+ stat.m_requestName = "clean all";
+ break;
+ case IndexStatReq::RT_SCAN_FRAG:
+ jam();
+ stat.m_requestName = "scan frag";
+ break;
+ case IndexStatReq::RT_DROP_HEAD:
+ jam();
+ stat.m_requestName = "drop head";
+ break;
+ default:
+ ndbrequire(false);
+ break;
+ }
+
+ SubscriptionRecord* subRec = c_theSubscriptions.getPtr(stat.m_subRecPtrI);
+ subRec->prepareId = RNIL;
+ subRec->errorCode = BuildIndxRef::NoError;
+
+ // sys tables are not recreated so do this only once
+ if (!c_statGetMetaDone)
+ {
+ jam();
+ statMetaGetHead(signal, stat);
+ return;
+ }
+ statGetMetaDone(signal, stat);
+}
+
+// sys tables metadata
+
+void
+Trix::statMetaGetHead(Signal* signal, StatOp& stat)
+{
+ D("statMetaGetHead" << V(stat));
+ StatOp::Meta& meta = stat.m_meta;
+ meta.m_cb.m_callbackFunction = safe_cast(&Trix::statMetaGetHeadCB);
+ meta.m_cb.m_callbackData = stat.m_ownPtrI;
+ const char* name = Ndbcntr::g_sysTable_NDBIS_HEAD.name;
+ sendGetTabInfoReq(signal, stat, name);
+}
+
+void
+Trix::statMetaGetHeadCB(Signal* signal, Uint32 statPtrI, Uint32 ret)
+{
+ StatOp& stat = statOpGetPtr(statPtrI);
+ D("statMetaGetHeadCB" << V(stat) << V(ret));
+ StatOp::Meta& meta = stat.m_meta;
+ if (ret != 0)
+ {
+ jam();
+ statOpError(signal, stat, ret, __LINE__);
+ return;
+ }
+ c_statMetaHead->tableId = meta.m_conf.tableId;
+ statMetaGetSample(signal, stat);
+}
+
+void
+Trix::statMetaGetSample(Signal* signal, StatOp& stat)
+{
+ D("statMetaGetSample" << V(stat));
+ StatOp::Meta& meta = stat.m_meta;
+ meta.m_cb.m_callbackFunction = safe_cast(&Trix::statMetaGetSampleCB);
+ meta.m_cb.m_callbackData = stat.m_ownPtrI;
+ const char* name = Ndbcntr::g_sysTable_NDBIS_SAMPLE.name;
+ sendGetTabInfoReq(signal, stat, name);
+}
+
+void
+Trix::statMetaGetSampleCB(Signal* signal, Uint32 statPtrI, Uint32 ret)
+{
+ StatOp& stat = statOpGetPtr(statPtrI);
+ D("statMetaGetSampleCB" << V(stat) << V(ret));
+ StatOp::Meta& meta = stat.m_meta;
+ if (ret != 0)
+ {
+ jam();
+ statOpError(signal, stat, ret, __LINE__);
+ return;
+ }
+ c_statMetaSample->tableId = meta.m_conf.tableId;
+ statMetaGetSampleX1(signal, stat);
+}
+
+void
+Trix::statMetaGetSampleX1(Signal* signal, StatOp& stat)
+{
+ D("statMetaGetSampleX1" << V(stat));
+ StatOp::Meta& meta = stat.m_meta;
+ meta.m_cb.m_callbackFunction = safe_cast(&Trix::statMetaGetSampleX1CB);
+ meta.m_cb.m_callbackData = stat.m_ownPtrI;
+ const char* name_fmt = Ndbcntr::g_sysIndex_NDBIS_SAMPLE_X1.name;
+ char name[MAX_TAB_NAME_SIZE];
+ snprintf(name, sizeof(name), name_fmt, c_statMetaSample->tableId);
+ sendGetTabInfoReq(signal, stat, name);
+}
+
+void
+Trix::statMetaGetSampleX1CB(Signal* signal, Uint32 statPtrI, Uint32 ret)
+{
+ StatOp& stat = statOpGetPtr(statPtrI);
+ D("statMetaGetSampleX1CB" << V(stat) << V(ret));
+ StatOp::Meta& meta = stat.m_meta;
+ if (ret != 0)
+ {
+ jam();
+ statOpError(signal, stat, ret, __LINE__);
+ return;
+ }
+ c_statMetaSampleX1->tableId = c_statMetaSample->tableId;
+ c_statMetaSampleX1->indexId = meta.m_conf.tableId;
+ statGetMetaDone(signal, stat);
+}
+
+void
+Trix::sendGetTabInfoReq(Signal* signal, StatOp& stat, const char* name)
+{
+ D("sendGetTabInfoReq" << V(stat) << V(name));
+ GetTabInfoReq* req = (GetTabInfoReq*)signal->getDataPtrSend();
+
+ Uint32 name_len = strlen(name) + 1;
+ Uint32 name_len_words = (name_len + 3 ) / 4;
+ Uint32 name_buf[32];
+ ndbrequire(name_len_words <= 32);
+ memset(name_buf, 0, sizeof(name_buf));
+ memcpy(name_buf, name, name_len);
+
+ req->senderData = stat.m_ownPtrI;
+ req->senderRef = reference();
+ req->requestType = GetTabInfoReq::RequestByName |
+ GetTabInfoReq::LongSignalConf;
+ req->tableNameLen = name_len;
+ req->schemaTransId = 0;
+ LinearSectionPtr ptr[3];
+ ptr[0].p = name_buf;
+ ptr[0].sz = name_len_words;
+ sendSignal(DBDICT_REF, GSN_GET_TABINFOREQ,
+ signal, GetTabInfoReq::SignalLength, JBB, ptr, 1);
+}
+
+void
+Trix::execGET_TABINFO_CONF(Signal* signal)
+{
+ jamEntry();
+ if (!assembleFragments(signal)) {
+ jam();
+ return;
+ }
+ const GetTabInfoConf* conf = (const GetTabInfoConf*)signal->getDataPtr();
+ StatOp& stat = statOpGetPtr(conf->senderData);
+ D("execGET_TABINFO_CONF" << V(stat));
+ StatOp::Meta& meta = stat.m_meta;
+ meta.m_conf = *conf;
+
+ // do not need DICTTABINFO
+ SectionHandle handle(this, signal);
+ releaseSections(handle);
+
+ execute(signal, meta.m_cb, 0);
+}
+
+void
+Trix::execGET_TABINFO_REF(Signal* signal)
+{
+ jamEntry();
+ const GetTabInfoRef* ref = (const GetTabInfoRef*)signal->getDataPtr();
+ StatOp& stat = statOpGetPtr(ref->senderData);
+ D("execGET_TABINFO_REF" << V(stat));
+ StatOp::Meta& meta = stat.m_meta;
+
+ ndbrequire(ref->errorCode != 0);
+ execute(signal, meta.m_cb, ref->errorCode);
+}
+
+// continue after metadata retrieval
+
+void
+Trix::statGetMetaDone(Signal* signal, StatOp& stat)
+{
+ const IndexStatImplReq* req = &stat.m_req;
+ StatOp::Data& data = stat.m_data;
+ SubscriptionRecord* subRec = c_theSubscriptions.getPtr(stat.m_subRecPtrI);
+ D("statGetMetaDone" << V(stat));
+
+ c_statGetMetaDone = true;
+
+ subRec->requestType = STAT_UTIL;
+ // fill in constant part
+ ndbrequire(req->fragCount != 0);
+ data.m_indexId = req->indexId;
+ data.m_indexVersion = req->indexVersion;
+ data.m_fragCount = req->fragCount;
+ statHeadRead(signal, stat);
+}
+
+// head table ops
+
+void
+Trix::statHeadRead(Signal* signal, StatOp& stat)
+{
+ StatOp::Util& util = stat.m_util;
+ StatOp::Send& send = stat.m_send;
+ D("statHeadRead" << V(stat));
+
+ util.m_not_found = false;
+ util.m_cb.m_callbackFunction = safe_cast(&Trix::statHeadReadCB);
+ util.m_cb.m_callbackData = stat.m_ownPtrI;
+ send.m_sysTable = c_statMetaHead;
+ send.m_operationType = UtilPrepareReq::Read;
+ statUtilPrepare(signal, stat);
+}
+
+void
+Trix::statHeadReadCB(Signal* signal, Uint32 statPtrI, Uint32 ret)
+{
+ StatOp& stat = statOpGetPtr(statPtrI);
+ StatOp::Data& data = stat.m_data;
+ StatOp::Util& util = stat.m_util;
+ D("statHeadReadCB" << V(stat) << V(ret));
+
+ ndbrequire(ret == 0);
+ data.m_head_found = !util.m_not_found;
+ statReadHeadDone(signal, stat);
+}
+
+void
+Trix::statHeadInsert(Signal* signal, StatOp& stat)
+{
+ StatOp::Util& util = stat.m_util;
+ StatOp::Send& send = stat.m_send;
+ D("statHeadInsert" << V(stat));
+
+ util.m_cb.m_callbackFunction = safe_cast(&Trix::statHeadInsertCB);
+ util.m_cb.m_callbackData = stat.m_ownPtrI;
+ send.m_sysTable = c_statMetaHead;
+ send.m_operationType = UtilPrepareReq::Insert;
+ statUtilPrepare(signal, stat);
+}
+
+void
+Trix::statHeadInsertCB(Signal* signal, Uint32 statPtrI, Uint32 ret)
+{
+ StatOp& stat = statOpGetPtr(statPtrI);
+ D("statHeadInsertCB" << V(stat) << V(ret));
+
+ ndbrequire(ret == 0);
+ statInsertHeadDone(signal, stat);
+}
+
+void
+Trix::statHeadUpdate(Signal* signal, StatOp& stat)
+{
+ StatOp::Util& util = stat.m_util;
+ StatOp::Send& send = stat.m_send;
+ D("statHeadUpdate" << V(stat));
+
+ util.m_cb.m_callbackFunction = safe_cast(&Trix::statHeadUpdateCB);
+ util.m_cb.m_callbackData = stat.m_ownPtrI;
+ send.m_sysTable = c_statMetaHead;
+ send.m_operationType = UtilPrepareReq::Update;
+ statUtilPrepare(signal, stat);
+}
+
+void
+Trix::statHeadUpdateCB(Signal* signal, Uint32 statPtrI, Uint32 ret)
+{
+ StatOp& stat = statOpGetPtr(statPtrI);
+ D("statHeadUpdateCB" << V(stat) << V(ret));
+
+ ndbrequire(ret == 0);
+ statUpdateHeadDone(signal, stat);
+}
+
+void
+Trix::statHeadDelete(Signal* signal, StatOp& stat)
+{
+ StatOp::Util& util = stat.m_util;
+ StatOp::Send& send = stat.m_send;
+ D("statHeadDelete" << V(stat));
+
+ util.m_cb.m_callbackFunction = safe_cast(&Trix::statHeadDeleteCB);
+ util.m_cb.m_callbackData = stat.m_ownPtrI;
+ send.m_sysTable = c_statMetaHead;
+ send.m_operationType = UtilPrepareReq::Delete;
+ statUtilPrepare(signal, stat);
+}
+
+void
+Trix::statHeadDeleteCB(Signal* signal, Uint32 statPtrI, Uint32 ret)
+{
+ StatOp& stat = statOpGetPtr(statPtrI);
+ D("statHeadDeleteCB" << V(stat) << V(ret));
+
+ ndbrequire(ret == 0);
+ statDeleteHeadDone(signal, stat);
+}
+
+// util (PK ops, only HEAD for now)
+
+void
+Trix::statUtilPrepare(Signal* signal, StatOp& stat)
+{
+ StatOp::Util& util = stat.m_util;
+ D("statUtilPrepare" << V(stat));
+
+ util.m_prepareId = RNIL;
+ statSendPrepare(signal, stat);
+}
+
+void
+Trix::statUtilPrepareConf(Signal* signal, Uint32 statPtrI)
+{
+ StatOp& stat = statOpGetPtr(statPtrI);
+ StatOp::Util& util = stat.m_util;
+ StatOp::Send& send = stat.m_send;
+ D("statUtilPrepareConf" << V(stat));
+
+ const UtilPrepareConf* utilConf =
+ (const UtilPrepareConf*)signal->getDataPtr();
+ util.m_prepareId = utilConf->prepareId;
+
+ const Uint32 ot = send.m_operationType;
+ if (ERROR_INSERTED(17011) && ot == UtilPrepareReq::Read ||
+ ERROR_INSERTED(17012) && ot != UtilPrepareReq::Read)
+ {
+ jam();
+ CLEAR_ERROR_INSERT_VALUE;
+ UtilExecuteRef* utilRef =
+ (UtilExecuteRef*)signal->getDataPtrSend();
+ utilRef->senderData = stat.m_ownPtrI;
+ utilRef->errorCode = UtilExecuteRef::AllocationError;
+ utilRef->TCErrorCode = 0;
+ sendSignal(reference(), GSN_UTIL_EXECUTE_REF,
+ signal, UtilExecuteRef::SignalLength, JBB);
+ return;
+ }
+
+ statUtilExecute(signal, stat);
+}
+
+void
+Trix::statUtilPrepareRef(Signal* signal, Uint32 statPtrI)
+{
+ StatOp& stat = statOpGetPtr(statPtrI);
+ D("statUtilPrepareRef" << V(stat));
+
+ const UtilPrepareRef* utilRef =
+ (const UtilPrepareRef*)signal->getDataPtr();
+ Uint32 errorCode = utilRef->errorCode;
+ ndbrequire(errorCode != 0);
+
+ switch (errorCode) {
+ case UtilPrepareRef::PREPARE_SEIZE_ERROR:
+ case UtilPrepareRef::PREPARE_PAGES_SEIZE_ERROR:
+ case UtilPrepareRef::PREPARED_OPERATION_SEIZE_ERROR:
+ errorCode = IndexStatRef::BusyUtilPrepare;
+ break;
+ case UtilPrepareRef::DICT_TAB_INFO_ERROR:
+ errorCode = IndexStatRef::InvalidSysTable;
+ break;
+ case UtilPrepareRef::MISSING_PROPERTIES_SECTION:
+ default:
+ ndbrequire(false);
+ break;
+ }
+ statOpError(signal, stat, errorCode, __LINE__);
+}
+
+void
+Trix::statUtilExecute(Signal* signal, StatOp& stat)
+{
+ StatOp::Util& util = stat.m_util;
+ StatOp::Send& send = stat.m_send;
+ D("statUtilExecute" << V(stat));
+
+ send.m_prepareId = util.m_prepareId;
+ statSendExecute(signal, stat);
+}
+
+void
+Trix::statUtilExecuteConf(Signal* signal, Uint32 statPtrI)
+{
+ StatOp& stat = statOpGetPtr(statPtrI);
+ StatOp::Attr& attr = stat.m_attr;
+ StatOp::Send& send = stat.m_send;
+ D("statUtilExecuteConf" << V(stat));
+
+ if (send.m_operationType == UtilPrepareReq::Read)
+ {
+ jam();
+ SectionHandle handle(this, signal);
+ Uint32 rattr[20];
+ Uint32 rdata[2048];
+ attr.m_attr = rattr;
+ attr.m_attrMax = 20;
+ attr.m_attrSize = 0;
+ attr.m_data = rdata;
+ attr.m_dataMax = 2048;
+ attr.m_dataSize = 0;
+ {
+ SegmentedSectionPtr ssPtr;
+ handle.getSection(ssPtr, 0);
+ ::copy(rattr, ssPtr);
+ }
+ {
+ SegmentedSectionPtr ssPtr;
+ handle.getSection(ssPtr, 1);
+ ::copy(rdata, ssPtr);
+ }
+ releaseSections(handle);
+
+ const Ndbcntr::SysTable& sysTable = *send.m_sysTable;
+ for (Uint32 i = 0; i < sysTable.columnCount; i++)
+ {
+ jam();
+ statDataIn(stat, i);
+ }
+ }
+
+ statUtilRelease(signal, stat);
+}
+
+void
+Trix::statUtilExecuteRef(Signal* signal, Uint32 statPtrI)
+{
+ StatOp& stat = statOpGetPtr(statPtrI);
+ StatOp::Util& util = stat.m_util;
+ StatOp::Send& send = stat.m_send;
+ D("statUtilExecuteRef" << V(stat));
+
+ const UtilExecuteRef* utilRef =
+ (const UtilExecuteRef*)signal->getDataPtr();
+ Uint32 errorCode = utilRef->errorCode;
+ ndbrequire(errorCode != 0);
+
+ switch (errorCode) {
+ case UtilExecuteRef::TCError:
+ errorCode = utilRef->TCErrorCode;
+ ndbrequire(errorCode != 0);
+ if (send.m_operationType == UtilPrepareReq::Read &&
+ errorCode == ZNOT_FOUND)
+ {
+ jam();
+ util.m_not_found = true;
+ errorCode = 0;
+ }
+ break;
+ case UtilExecuteRef::AllocationError:
+ errorCode = IndexStatRef::BusyUtilExecute;
+ break;
+ default:
+ ndbrequire(false);
+ break;
+ }
+
+ if (errorCode != 0)
+ {
+ jam();
+ statOpError(signal, stat, errorCode, __LINE__);
+ return;
+ }
+ statUtilRelease(signal, stat);
+}
+
+void
+Trix::statUtilRelease(Signal* signal, StatOp& stat)
+{
+ StatOp::Util& util = stat.m_util;
+ StatOp::Send& send = stat.m_send;
+ D("statUtilRelease" << V(stat));
+
+ send.m_prepareId = util.m_prepareId;
+ statSendRelease(signal, stat);
+}
+
+void
+Trix::statUtilReleaseConf(Signal* signal, Uint32 statPtrI)
+{
+ StatOp& stat = statOpGetPtr(statPtrI);
+ StatOp::Util& util = stat.m_util;
+ D("statUtilReleaseConf" << V(stat));
+
+ util.m_prepareId = RNIL;
+ execute(signal, util.m_cb, 0);
+}
+
+// continue after head table ops
+
+void
+Trix::statReadHeadDone(Signal* signal, StatOp& stat)
+{
+ StatOp::Data& data = stat.m_data;
+ D("statReadHeadDone" << V(stat));
+
+ switch (stat.m_requestType) {
+ case IndexStatReq::RT_CLEAN_NEW:
+ jam();
+ case IndexStatReq::RT_CLEAN_OLD:
+ jam();
+ case IndexStatReq::RT_CLEAN_ALL:
+ jam();
+ statCleanBegin(signal, stat);
+ break;
+
+ case IndexStatReq::RT_SCAN_FRAG:
+ jam();
+ statScanBegin(signal, stat);
+ break;
+
+ case IndexStatReq::RT_DROP_HEAD:
+ jam();
+ statDropBegin(signal, stat);
+ break;
+
+ default:
+ ndbrequire(false);
+ break;
+ }
+}
+
+void
+Trix::statInsertHeadDone(Signal* signal, StatOp& stat)
+{
+ D("statInsertHeadDone" << V(stat));
+
+ switch (stat.m_requestType) {
+ case IndexStatReq::RT_SCAN_FRAG:
+ jam();
+ statScanEnd(signal, stat);
+ break;
+ default:
+ ndbrequire(false);
+ break;
+ }
+}
+
+void
+Trix::statUpdateHeadDone(Signal* signal, StatOp& stat)
+{
+ D("statUpdateHeadDone" << V(stat));
+
+ switch (stat.m_requestType) {
+ case IndexStatReq::RT_SCAN_FRAG:
+ jam();
+ statScanEnd(signal, stat);
+ break;
+ default:
+ ndbrequire(false);
+ break;
+ }
+}
+
+void
+Trix::statDeleteHeadDone(Signal* signal, StatOp& stat)
+{
+ D("statDeleteHeadDone" << V(stat));
+
+ switch (stat.m_requestType) {
+ case IndexStatReq::RT_DROP_HEAD:
+ jam();
+ statDropEnd(signal, stat);
+ break;
+ default:
+ ndbrequire(false);
+ break;
+ }
+}
+
+// clean
+
+void
+Trix::statCleanBegin(Signal* signal, StatOp& stat)
+{
+ const IndexStatImplReq* req = &stat.m_req;
+ StatOp::Data& data = stat.m_data;
+ D("statCleanBegin" << V(stat));
+
+ if (data.m_head_found == true)
+ {
+ jam();
+ if (data.m_tableId != req->tableId &&
+ stat.m_requestType != IndexStatReq::RT_CLEAN_ALL)
+ {
+ jam();
+ // must run ndb_index_stat --drop
+ statOpError(signal, stat, IndexStatRef::InvalidSysTableData, __LINE__);
+ return;
+ }
+ }
+ else
+ {
+ if (stat.m_requestType != IndexStatReq::RT_CLEAN_ALL)
+ {
+ jam();
+ // happens normally on first stats scan
+ stat.m_requestType = IndexStatReq::RT_CLEAN_ALL;
+ }
+ }
+ statCleanPrepare(signal, stat);
+}
+
+void
+Trix::statCleanPrepare(Signal* signal, StatOp& stat)
+{
+ const IndexStatImplReq* req = &stat.m_req;
+ StatOp::Data& data = stat.m_data;
+ StatOp::Clean& clean = stat.m_clean;
+ StatOp::Send& send = stat.m_send;
+ SubscriptionRecord* subRec = c_theSubscriptions.getPtr(stat.m_subRecPtrI);
+ D("statCleanPrepare" << V(stat));
+
+ // count of deleted samples is just for info
+ clean.m_cleanCount = 0;
+
+ const Uint32 ao_list[] = {
+ 0, // INDEX_ID
+ 1, // INDEX_VERSION
+ 2, // SAMPLE_VERSION
+ 3 // STAT_KEY
+ };
+ const Uint32 ao_size = sizeof(ao_list)/sizeof(ao_list[0]);
+
+ ndbrequire(req->fragId == ZNIL);
+ subRec->m_flags = 0;
+ subRec->requestType = STAT_CLEAN;
+ subRec->schemaTransId = req->transId;
+ subRec->userReference = 0; // not used
+ subRec->connectionPtr = RNIL;
+ subRec->subscriptionId = rand();
+ subRec->subscriptionKey = rand();
+ subRec->prepareId = RNIL;
+ subRec->indexType = 0; // not used
+ subRec->sourceTableId = c_statMetaSampleX1->indexId;
+ subRec->targetTableId = RNIL;
+ subRec->noOfIndexColumns = ao_size;
+ subRec->noOfKeyColumns = 0;
+ subRec->parallelism = 16;
+ subRec->fragCount = 0;
+ subRec->fragId = ZNIL;
+ subRec->syncPtr = RNIL;
+ subRec->errorCode = BuildIndxRef::NoError;
+ subRec->subscriptionCreated = false;
+ subRec->pendingSubSyncContinueConf = false;
+ subRec->expectedConf = 0;
+ subRec->m_rows_processed = 0;
+ subRec->m_gci = 0;
+
+ AttrOrderBuffer& ao_buf = subRec->attributeOrder;
+ ndbrequire(ao_buf.isEmpty());
+ ao_buf.append(ao_list, ao_size);
+
+ // create TUX bounds
+ clean.m_bound[0] = TuxBoundInfo::BoundEQ;
+ clean.m_bound[1] = AttributeHeader(0, 4).m_value;
+ clean.m_bound[2] = data.m_indexId;
+ clean.m_bound[3] = TuxBoundInfo::BoundEQ;
+ clean.m_bound[4] = AttributeHeader(1, 4).m_value;
+ clean.m_bound[5] = data.m_indexVersion;
+ switch (stat.m_requestType) {
+ case IndexStatReq::RT_CLEAN_NEW:
+ D("statCleanPrepare delete sample versions > " << data.m_sampleVersion);
+ clean.m_bound[6] = TuxBoundInfo::BoundLT;
+ clean.m_bound[7] = AttributeHeader(2, 4).m_value;
+ clean.m_bound[8] = data.m_sampleVersion;
+ clean.m_boundCount = 3;
+ break;
+ case IndexStatReq::RT_CLEAN_OLD:
+ D("statCleanPrepare delete sample versions < " << data.m_sampleVersion);
+ clean.m_bound[6] = TuxBoundInfo::BoundGT;
+ clean.m_bound[7] = AttributeHeader(2, 4).m_value;
+ clean.m_bound[8] = data.m_sampleVersion;
+ clean.m_boundCount = 3;
+ break;
+ case IndexStatReq::RT_CLEAN_ALL:
+ D("statCleanPrepare delete all sample versions");
+ clean.m_boundCount = 2;
+ break;
+ default:
+ ndbrequire(false);
+ break;
+ }
+ clean.m_boundSize = 3 * clean.m_boundCount;
+
+ // TRIX traps the CONF
+ send.m_sysTable = c_statMetaSample;
+ send.m_operationType = UtilPrepareReq::Delete;
+ statSendPrepare(signal, stat);
+}
+
+void
+Trix::statCleanExecute(Signal* signal, StatOp& stat)
+{
+ StatOp::Data& data = stat.m_data;
+ StatOp::Send& send = stat.m_send;
+ StatOp::Clean& clean = stat.m_clean;
+ SubscriptionRecord* subRec = c_theSubscriptions.getPtr(stat.m_subRecPtrI);
+ D("statCleanExecute" << V(stat));
+
+ SectionHandle handle(this, signal);
+ ndbrequire(handle.m_cnt == 2);
+
+ // ATTR_INFO
+ AttributeHeader ah[4];
+ SegmentedSectionPtr ptr0;
+ handle.getSection(ptr0, SubTableData::ATTR_INFO);
+ ndbrequire(ptr0.sz == 4);
+ ::copy((Uint32*)ah, ptr0);
+ ndbrequire(ah[0].getAttributeId() == 0 && ah[0].getDataSize() == 1);
+ ndbrequire(ah[1].getAttributeId() == 1 && ah[1].getDataSize() == 1);
+ ndbrequire(ah[2].getAttributeId() == 2 && ah[2].getDataSize() == 1);
+ // read via TUP rounds bytes to words
+ const Uint32 kz = ah[3].getDataSize();
+ ndbrequire(ah[3].getAttributeId() == 3 && kz != 0);
+
+ // AFTER_VALUES
+ const Uint32 avmax = 3 + MAX_INDEX_STAT_KEY_SIZE;
+ Uint32 av[avmax];
+ SegmentedSectionPtr ptr1;
+ handle.getSection(ptr1, SubTableData::AFTER_VALUES);
+ ndbrequire(ptr1.sz <= avmax);
+ ::copy(av, ptr1);
+ ndbrequire(data.m_indexId == av[0]);
+ ndbrequire(data.m_indexVersion == av[1]);
+ data.m_sampleVersion = av[2];
+ data.m_statKey = &av[3];
+ const char* kp = (const char*)data.m_statKey;
+ const Uint32 kb = kp[0] + (kp[1] << 8);
+ // key is not empty
+ ndbrequire(kb != 0);
+ ndbrequire(kz == ((2 + kb) + 3) / 4);
+
+ clean.m_cleanCount++;
+ releaseSections(handle);
+
+ const Uint32 rt = stat.m_requestType;
+ if (ERROR_INSERTED(17021) && rt == IndexStatReq::RT_CLEAN_NEW ||
+ ERROR_INSERTED(17022) && rt == IndexStatReq::RT_CLEAN_OLD ||
+ ERROR_INSERTED(17023) && rt == IndexStatReq::RT_CLEAN_ALL)
+ {
+ jam();
+ CLEAR_ERROR_INSERT_VALUE;
+ UtilExecuteRef* utilRef =
+ (UtilExecuteRef*)signal->getDataPtrSend();
+ utilRef->senderData = stat.m_ownPtrI;
+ utilRef->errorCode = UtilExecuteRef::TCError;
+ utilRef->TCErrorCode = 626;
+ sendSignal(reference(), GSN_UTIL_EXECUTE_REF,
+ signal, UtilExecuteRef::SignalLength, JBB);
+ subRec->expectedConf++;
+ return;
+ }
+
+ // TRIX traps the CONF
+ send.m_sysTable = c_statMetaSample;
+ send.m_operationType = UtilPrepareReq::Delete;
+ send.m_prepareId = subRec->prepareId;
+ subRec->expectedConf++;
+ statSendExecute(signal, stat);
+}
+
+void
+Trix::statCleanRelease(Signal* signal, StatOp& stat)
+{
+ SubscriptionRecord* subRec = c_theSubscriptions.getPtr(stat.m_subRecPtrI);
+ D("statCleanRelease" << V(stat) << V(subRec->errorCode));
+
+ if (subRec->errorCode != 0)
+ {
+ jam();
+ statOpError(signal, stat, subRec->errorCode, __LINE__);
+ return;
+ }
+ statCleanEnd(signal, stat);
+}
+
+void
+Trix::statCleanEnd(Signal* signal, StatOp& stat)
+{
+ D("statCleanEnd" << V(stat));
+ statOpSuccess(signal, stat);
+}
+
+// scan
+
+void
+Trix::statScanBegin(Signal* signal, StatOp& stat)
+{
+ const IndexStatImplReq* req = &stat.m_req;
+ StatOp::Data& data = stat.m_data;
+ D("statScanBegin" << V(stat));
+
+ if (data.m_head_found == true &&
+ data.m_tableId != req->tableId)
+ {
+ jam();
+ statOpError(signal, stat, IndexStatRef::InvalidSysTableData, __LINE__);
+ return;
+ }
+ data.m_tableId = req->tableId;
+ statScanPrepare(signal, stat);
+}
+
+void
+Trix::statScanPrepare(Signal* signal, StatOp& stat)
+{
+ const IndexStatImplReq* req = &stat.m_req;
+ StatOp::Data& data = stat.m_data;
+ StatOp::Scan& scan = stat.m_scan;
+ StatOp::Send& send = stat.m_send;
+ SubscriptionRecord* subRec = c_theSubscriptions.getPtr(stat.m_subRecPtrI);
+ D("statScanPrepare" << V(stat));
+
+ // update sample version prior to scan
+ if (data.m_head_found == false)
+ data.m_sampleVersion = 0;
+ data.m_sampleVersion += 1;
+
+ // zero totals
+ scan.m_sampleCount = 0;
+ scan.m_keyBytes = 0;
+
+ const Uint32 ao_list[] = {
+ AttributeHeader::INDEX_STAT_KEY,
+ AttributeHeader::INDEX_STAT_VALUE
+ };
+ const Uint32 ao_size = sizeof(ao_list)/sizeof(ao_list[0]);
+
+ ndbrequire(req->fragId != ZNIL);
+ subRec->m_flags = 0;
+ subRec->requestType = STAT_SCAN;
+ subRec->schemaTransId = req->transId;
+ subRec->userReference = 0; // not used
+ subRec->connectionPtr = RNIL;
+ subRec->subscriptionId = rand();
+ subRec->subscriptionKey = rand();
+ subRec->prepareId = RNIL;
+ subRec->indexType = 0; // not used
+ subRec->sourceTableId = data.m_indexId;
+ subRec->targetTableId = RNIL;
+ subRec->noOfIndexColumns = ao_size;
+ subRec->noOfKeyColumns = 0;
+ subRec->parallelism = 16;
+ subRec->fragCount = 0; // XXX Suma currently checks all frags
+ subRec->fragId = req->fragId;
+ subRec->syncPtr = RNIL;
+ subRec->errorCode = BuildIndxRef::NoError;
+ subRec->subscriptionCreated = false;
+ subRec->pendingSubSyncContinueConf = false;
+ subRec->expectedConf = 0;
+ subRec->m_rows_processed = 0;
+ subRec->m_gci = 0;
+
+ AttrOrderBuffer& ao_buf = subRec->attributeOrder;
+ ndbrequire(ao_buf.isEmpty());
+ ao_buf.append(ao_list, ao_size);
+
+ // TRIX traps the CONF
+ send.m_sysTable = c_statMetaSample;
+ send.m_operationType = UtilPrepareReq::Insert;
+ statSendPrepare(signal, stat);
+}
+
+void
+Trix::statScanExecute(Signal* signal, StatOp& stat)
+{
+ StatOp::Data& data = stat.m_data;
+ StatOp::Scan& scan = stat.m_scan;
+ StatOp::Send& send = stat.m_send;
+ SubscriptionRecord* subRec = c_theSubscriptions.getPtr(stat.m_subRecPtrI);
+ D("statScanExecute" << V(stat));
+
+ SectionHandle handle(this, signal);
+ ndbrequire(handle.m_cnt == 2);
+
+ // ATTR_INFO
+ AttributeHeader ah[2];
+ SegmentedSectionPtr ptr0;
+ handle.getSection(ptr0, SubTableData::ATTR_INFO);
+ ndbrequire(ptr0.sz == 2);
+ ::copy((Uint32*)ah, ptr0);
+ ndbrequire(ah[0].getAttributeId() == AttributeHeader::INDEX_STAT_KEY);
+ ndbrequire(ah[1].getAttributeId() == AttributeHeader::INDEX_STAT_VALUE);
+ // read via TUP rounds bytes to words
+ const Uint32 kz = ah[0].getDataSize();
+ const Uint32 vz = ah[1].getDataSize();
+ ndbrequire(kz != 0 && vz != 0);
+
+ // AFTER_VALUES
+ const Uint32 avmax = MAX_INDEX_STAT_KEY_SIZE + MAX_INDEX_STAT_VALUE_SIZE;
+ Uint32 av[avmax];
+ SegmentedSectionPtr ptr1;
+ handle.getSection(ptr1, SubTableData::AFTER_VALUES);
+ ndbrequire(ptr1.sz <= avmax);
+ ::copy(av, ptr1);
+ data.m_statKey = &av[0];
+ data.m_statValue = &av[kz];
+ const char* kp = (const char*)data.m_statKey;
+ const char* vp = (const char*)data.m_statValue;
+ const Uint32 kb = kp[0] + (kp[1] << 8);
+ const Uint32 vb = vp[0] + (vp[1] << 8);
+ // key and value are not empty
+ ndbrequire(kb != 0 && vb != 0);
+ ndbrequire(kz == ((2 + kb) + 3) / 4);
+ ndbrequire(vz == ((2 + vb) + 3) / 4);
+
+ scan.m_sampleCount++;
+ scan.m_keyBytes += kb;
+ releaseSections(handle);
+
+ if (ERROR_INSERTED(17024))
+ {
+ jam();
+ CLEAR_ERROR_INSERT_VALUE;
+ UtilExecuteRef* utilRef =
+ (UtilExecuteRef*)signal->getDataPtrSend();
+ utilRef->senderData = stat.m_ownPtrI;
+ utilRef->errorCode = UtilExecuteRef::TCError;
+ utilRef->TCErrorCode = 630;
+ sendSignal(reference(), GSN_UTIL_EXECUTE_REF,
+ signal, UtilExecuteRef::SignalLength, JBB);
+ subRec->expectedConf++;
+ return;
+ }
+
+ // TRIX traps the CONF
+ send.m_sysTable = c_statMetaSample;
+ send.m_operationType = UtilPrepareReq::Insert;
+ send.m_prepareId = subRec->prepareId;
+ subRec->expectedConf++;
+ statSendExecute(signal, stat);
+}
+
+void
+Trix::statScanRelease(Signal* signal, StatOp& stat)
+{
+ StatOp::Data& data = stat.m_data;
+ StatOp::Scan& scan = stat.m_scan;
+ SubscriptionRecord* subRec = c_theSubscriptions.getPtr(stat.m_subRecPtrI);
+ D("statScanRelease" << V(stat) << V(subRec->errorCode));
+
+ if (subRec->errorCode != 0)
+ {
+ jam();
+ statOpError(signal, stat, subRec->errorCode, __LINE__);
+ return;
+ }
+ subRec->requestType = STAT_UTIL;
+
+ const Uint32 now = (Uint32)time(0);
+ data.m_loadTime = now;
+ data.m_sampleCount = scan.m_sampleCount;
+ data.m_keyBytes = scan.m_keyBytes;
+ data.m_valueFormat = MAX_INDEX_STAT_VALUE_FORMAT;
+
+ if (data.m_head_found == false)
+ {
+ jam();
+ statHeadInsert(signal, stat);
+ }
+ else
+ {
+ jam();
+ statHeadUpdate(signal, stat);
+ }
+}
+
+void
+Trix::statScanEnd(Signal* signal, StatOp& stat)
+{
+ StatOp::Data& data = stat.m_data;
+ const IndexStatImplReq* req = &stat.m_req;
+ D("statScanEnd" << V(stat));
+
+ /*
+ * TRIX reports stats load time to TUX for proper stats monitoring.
+ * Passing this via DBDICT RT_START_MON is not feasible. For MT-LQH
+ * we prefer DbtuxProxy to avoid introducing MT-LQH into TRIX.
+ */
+
+#if trix_index_stat_rep_to_tux_instance
+ Uint32 instanceKey = getInstanceKey(req->indexId, req->fragId);
+ BlockReference tuxRef = numberToRef(DBTUX, instanceKey, getOwnNodeId());
+#else
+ BlockReference tuxRef = DBTUX_REF;
+#endif
+
+ IndexStatRep* rep = (IndexStatRep*)signal->getDataPtrSend();
+ rep->senderRef = reference();
+ rep->senderData = 0;
+ rep->requestType = IndexStatRep::RT_UPDATE_CONF;
+ rep->requestFlag = 0;
+ rep->indexId = req->indexId;
+ rep->indexVersion = req->indexVersion;
+ rep->tableId = req->tableId;
+ rep->fragId = req->fragId;
+ rep->loadTime = data.m_loadTime;
+ sendSignal(tuxRef, GSN_INDEX_STAT_REP,
+ signal, IndexStatRep::SignalLength, JBB);
+
+ statOpSuccess(signal, stat);
+}
+
+// drop
+
+void
+Trix::statDropBegin(Signal* signal, StatOp& stat)
+{
+ StatOp::Data& data = stat.m_data;
+ D("statDropBegin" << V(stat));
+
+ if (data.m_head_found == true)
+ {
+ jam();
+ statHeadDelete(signal, stat);
+ return;
+ }
+ statDropEnd(signal, stat);
+}
+
+void
+Trix::statDropEnd(Signal* signal, StatOp& stat)
+{
+ D("statDropEnd");
+ statOpSuccess(signal, stat);
+}
+
+// send
+
+void
+Trix::statSendPrepare(Signal* signal, StatOp& stat)
+{
+ StatOp::Send& send = stat.m_send;
+ const IndexStatImplReq* req = &stat.m_req;
+ const Ndbcntr::SysTable& sysTable = *send.m_sysTable;
+ D("statSendPrepare" << V(stat));
+
+ UtilPrepareReq* utilReq =
+ (UtilPrepareReq*)signal->getDataPtrSend();
+ utilReq->senderData = stat.m_ownPtrI;
+ utilReq->senderRef = reference();
+ utilReq->schemaTransId = req->transId;
+
+ Uint32 wbuf[256];
+ LinearWriter w(&wbuf[0], sizeof(wbuf) >> 2);
+
+ w.first();
+ w.add(UtilPrepareReq::NoOfOperations, 1);
+ w.add(UtilPrepareReq::OperationType, send.m_operationType);
+ w.add(UtilPrepareReq::TableId, sysTable.tableId);
+
+ Uint32 i;
+ for (i = 0; i < sysTable.columnCount; i++) {
+ const Ndbcntr::SysColumn& c = sysTable.columnList[i];
+ switch (send.m_operationType) {
+ case UtilPrepareReq::Read:
+ case UtilPrepareReq::Insert:
+ case UtilPrepareReq::Update:
+ jam();
+ w.add(UtilPrepareReq::AttributeId, i);
+ break;
+ case UtilPrepareReq::Delete:
+ jam();
+ if (c.keyFlag)
+ w.add(UtilPrepareReq::AttributeId, i);
+ break;
+ default:
+ ndbrequire(false);
+ break;
+ }
+ }
+
+ LinearSectionPtr ptr[3];
+ ptr[0].p = &wbuf[0];
+ ptr[0].sz = w.getWordsUsed();
+ sendSignal(DBUTIL_REF, GSN_UTIL_PREPARE_REQ,
+ signal, UtilPrepareReq::SignalLength, JBB, ptr, 1);
+}
+
+void
+Trix::statSendExecute(Signal* signal, StatOp& stat)
+{
+ D("statSendExecute" << V(stat));
+ StatOp::Send& send = stat.m_send;
+ StatOp::Attr& attr = stat.m_attr;
+ const Ndbcntr::SysTable& sysTable = *send.m_sysTable;
+
+ UtilExecuteReq* utilReq =
+ (UtilExecuteReq*)signal->getDataPtrSend();
+ utilReq->senderData = stat.m_ownPtrI;
+ utilReq->senderRef = reference();
+ utilReq->prepareId = send.m_prepareId;
+ utilReq->scanTakeOver = 0;
+
+ Uint32 wattr[20];
+ Uint32 wdata[2048];
+ attr.m_attr = wattr;
+ attr.m_attrMax = 20;
+ attr.m_attrSize = 0;
+ attr.m_data = wdata;
+ attr.m_dataMax = 2048;
+ attr.m_dataSize = 0;
+
+ for (Uint32 i = 0; i < sysTable.columnCount; i++) {
+ const Ndbcntr::SysColumn& c = sysTable.columnList[i];
+ switch (send.m_operationType) {
+ case UtilPrepareReq::Read:
+ case UtilPrepareReq::Insert:
+ case UtilPrepareReq::Update:
+ jam();
+ statDataOut(stat, i);
+ break;
+ case UtilPrepareReq::Delete:
+ jam();
+ if (c.keyFlag)
+ statDataOut(stat, i);
+ break;
+ default:
+ ndbrequire(false);
+ break;
+ }
+ }
+
+ LinearSectionPtr ptr[3];
+ ptr[0].p = attr.m_attr;
+ ptr[0].sz = attr.m_attrSize;
+ ptr[1].p = attr.m_data;
+ ptr[1].sz = attr.m_dataSize;
+ sendSignal(DBUTIL_REF, GSN_UTIL_EXECUTE_REQ,
+ signal, UtilExecuteReq::SignalLength, JBB, ptr, 2);
+}
+
+void
+Trix::statSendRelease(Signal* signal, StatOp& stat)
+{
+ D("statSendRelease" << V(stat));
+ StatOp::Send& send = stat.m_send;
+ ndbrequire(send.m_prepareId != RNIL);
+
+ UtilReleaseReq* utilReq =
+ (UtilReleaseReq*)signal->getDataPtrSend();
+ utilReq->senderData = stat.m_ownPtrI;
+ utilReq->prepareId = send.m_prepareId;
+ sendSignal(DBUTIL_REF, GSN_UTIL_RELEASE_REQ,
+ signal, UtilReleaseReq::SignalLength, JBB);
+}
+
+// data
+
+void
+Trix::statDataPtr(StatOp& stat, Uint32 i, Uint32*& dptr, Uint32& bytes)
+{
+ StatOp::Data& data = stat.m_data;
+ StatOp::Send& send = stat.m_send;
+
+ const Ndbcntr::SysTable& sysTable = *send.m_sysTable;
+ ndbrequire(i < sysTable.columnCount);
+ const Ndbcntr::SysColumn& c = sysTable.columnList[i];
+
+ if (&sysTable == c_statMetaHead)
+ {
+ switch (i) {
+ case 0:
+ dptr = &data.m_indexId;
+ bytes = 4;
+ break;
+ case 1:
+ dptr = &data.m_indexVersion;
+ bytes = 4;
+ break;
+ case 2:
+ dptr = &data.m_tableId;
+ bytes = 4;
+ break;
+ case 3:
+ dptr = &data.m_fragCount;
+ bytes = 4;
+ break;
+ case 4:
+ dptr = &data.m_valueFormat;
+ bytes = 4;
+ break;
+ case 5:
+ dptr = &data.m_sampleVersion;
+ bytes = 4;
+ break;
+ case 6:
+ dptr = &data.m_loadTime;
+ bytes = 4;
+ break;
+ case 7:
+ dptr = &data.m_sampleCount;
+ bytes = 4;
+ break;
+ case 8:
+ dptr = &data.m_keyBytes;
+ bytes = 4;
+ break;
+ default:
+ ndbrequire(false);
+ break;
+ }
+ return;
+ }
+
+ if (&sysTable == c_statMetaSample)
+ {
+ switch (i) {
+ case 0:
+ dptr = &data.m_indexId;
+ bytes = 4;
+ break;
+ case 1:
+ dptr = &data.m_indexVersion;
+ bytes = 4;
+ break;
+ case 2:
+ dptr = &data.m_sampleVersion;
+ bytes = 4;
+ break;
+ case 3:
+ {
+ dptr = data.m_statKey;
+ const uchar* p = (uchar*)dptr;
+ ndbrequire(p != 0);
+ bytes = 2 + p[0] + (p[1] << 8);
+ }
+ break;
+ case 4:
+ {
+ dptr = data.m_statValue;
+ const uchar* p = (uchar*)dptr;
+ ndbrequire(p != 0);
+ bytes = 2 + p[0] + (p[1] << 8);
+ }
+ break;
+ default:
+ ndbrequire(false);
+ break;
+ }
+ return;
+ }
+
+ ndbrequire(false);
+}
+
+void
+Trix::statDataOut(StatOp& stat, Uint32 i)
+{
+ StatOp::Attr& attr = stat.m_attr;
+ Uint32* dptr = 0;
+ Uint32 bytes = 0;
+ statDataPtr(stat, i, dptr, bytes);
+
+ ndbrequire(attr.m_attrSize + 1 <= attr.m_attrMax);
+ AttributeHeader::init(&attr.m_attr[attr.m_attrSize], i, bytes);
+ attr.m_attrSize++;
+
+ Uint32 words = (bytes + 3) / 4;
+ ndbrequire(attr.m_dataSize + words <= attr.m_dataMax);
+ Uint8* dst = (Uint8*)&attr.m_data[attr.m_dataSize];
+ memcpy(dst, dptr, bytes);
+ while (bytes < words * 4)
+ dst[bytes++] = 0;
+ attr.m_dataSize += words;
+ D("statDataOut" << V(i) << V(bytes) << hex << V(dptr[0]));
+}
+
+void
+Trix::statDataIn(StatOp& stat, Uint32 i)
+{
+ StatOp::Attr& attr = stat.m_attr;
+ Uint32* dptr = 0;
+ Uint32 bytes = 0;
+ statDataPtr(stat, i, dptr, bytes);
+
+ ndbrequire(attr.m_attrSize + 1 <= attr.m_attrMax);
+ const AttributeHeader& ah = attr.m_attr[attr.m_attrSize];
+ attr.m_attrSize++;
+
+ ndbrequire(ah.getByteSize() == bytes);
+ Uint32 words = (bytes + 3) / 4;
+ ndbrequire(attr.m_dataSize + words <= attr.m_dataMax);
+ const char* src = (const char*)&attr.m_data[attr.m_dataSize];
+ memcpy(dptr, src, bytes);
+ attr.m_dataSize += words;
+ D("statDataIn" << V(i) << V(bytes) << hex << V(dptr[0]));
+}
+
+// abort ongoing
+
+void
+Trix::statAbortUtil(Signal* signal, StatOp& stat)
+{
+ StatOp::Util& util = stat.m_util;
+ D("statAbortUtil" << V(stat));
+
+ ndbrequire(util.m_prepareId != RNIL);
+ util.m_cb.m_callbackFunction = safe_cast(&Trix::statAbortUtilCB);
+ util.m_cb.m_callbackData = stat.m_ownPtrI;
+ statUtilRelease(signal, stat);
+}
+
+void
+Trix::statAbortUtilCB(Signal* signal, Uint32 statPtrI, Uint32 ret)
+{
+ StatOp& stat = statOpGetPtr(statPtrI);
+ D("statAbortUtilCB" << V(stat) << V(ret));
+
+ ndbrequire(ret == 0);
+ statOpAbort(signal, stat);
+}
+
+// conf and ref
+
+void
+Trix::statOpSuccess(Signal* signal, StatOp& stat)
+{
+ StatOp::Data& data = stat.m_data;
+ D("statOpSuccess" << V(stat));
+
+ if (stat.m_requestType == IndexStatReq::RT_SCAN_FRAG)
+ statOpEvent(stat, "I", "created %u samples", data.m_sampleCount);
+
+ statOpConf(signal, stat);
+ statOpRelease(stat);
+}
+
+void
+Trix::statOpConf(Signal* signal, StatOp& stat)
+{
+ const IndexStatImplReq* req = &stat.m_req;
+ D("statOpConf" << V(stat));
+
+ IndexStatImplConf* conf = (IndexStatImplConf*)signal->getDataPtrSend();
+ conf->senderRef = reference();
+ conf->senderData = req->senderData;
+ sendSignal(req->senderRef, GSN_INDEX_STAT_IMPL_CONF,
+ signal, IndexStatImplConf::SignalLength, JBB);
+}
+
+void
+Trix::statOpError(Signal* signal, StatOp& stat,
+ Uint32 errorCode, Uint32 errorLine)
+{
+ D("statOpError" << V(stat) << V(errorCode) << V(errorLine));
+
+ statOpEvent(stat, "W", "error %u line %u", errorCode, errorLine);
+
+ ndbrequire(stat.m_errorCode == 0);
+ stat.m_errorCode = errorCode;
+ stat.m_errorLine = errorLine;
+ statOpAbort(signal, stat);
+}
+
+void
+Trix::statOpAbort(Signal* signal, StatOp& stat)
+{
+ StatOp::Util& util = stat.m_util;
+ D("statOpAbort" << V(stat));
+
+ if (util.m_prepareId != RNIL)
+ {
+ jam();
+ // returns here when done
+ statAbortUtil(signal, stat);
+ return;
+ }
+ statOpRef(signal, stat);
+ statOpRelease(stat);
+}
+
+void
+Trix::statOpRef(Signal* signal, StatOp& stat)
+{
+ const IndexStatImplReq* req = &stat.m_req;
+ D("statOpRef" << V(stat));
+
+ statOpRef(signal, req, stat.m_errorCode, stat.m_errorLine);
+}
+
+void
+Trix::statOpRef(Signal* signal, const IndexStatImplReq* req,
+ Uint32 errorCode, Uint32 errorLine)
+{
+ D("statOpRef" << V(errorCode) << V(errorLine));
+
+ IndexStatImplRef* ref = (IndexStatImplRef*)signal->getDataPtrSend();
+ ref->senderRef = reference();
+ ref->senderData = req->senderData;
+ ref->errorCode = errorCode;
+ ref->errorLine = errorLine;
+ sendSignal(req->senderRef, GSN_INDEX_STAT_IMPL_REF,
+ signal, IndexStatImplRef::SignalLength, JBB);
+}
+
+void
+Trix::statOpEvent(StatOp& stat, const char* level, const char* msg, ...)
+{
+ const IndexStatImplReq* req = &stat.m_req;
+ StatOp::Data& data = stat.m_data;
+
+ char tmp1[100];
+ va_list ap;
+ va_start(ap, msg);
+ BaseString::vsnprintf(tmp1, sizeof(tmp1), msg, ap);
+ va_end(ap);
+
+ char tmp2[100];
+ BaseString::snprintf(tmp2, sizeof(tmp2),
+ "index %u stats version %u: %s: %s",
+ data.m_indexId, data.m_sampleVersion,
+ stat.m_requestName, tmp1);
+
+ D("statOpEvent" << V(level) << V(tmp2));
+
+ if (level[0] == 'I')
+ infoEvent("%s", tmp2);
+ if (level[0] == 'W')
+ warningEvent("%s", tmp2);
+}
+
+// debug
+
+class NdbOut&
+operator<<(NdbOut& out, const Trix::StatOp& stat)
+{
+ out << "[";
+ out << " i:" << stat.m_ownPtrI;
+ out << " head_found:" << stat.m_data.m_head_found;
+ out << " ]";
+ return out;
+}
+
BLOCK_FUNCTIONS(Trix)
=== modified file 'storage/ndb/src/kernel/blocks/trix/Trix.hpp'
--- a/storage/ndb/src/kernel/blocks/trix/Trix.hpp 2011-02-01 23:27:25 +0000
+++ b/storage/ndb/src/kernel/blocks/trix/Trix.hpp 2011-05-19 09:38:03 +0000
@@ -25,6 +25,11 @@
#include <signaldata/DictTabInfo.hpp>
#include <signaldata/CreateTrig.hpp>
#include <signaldata/BuildIndx.hpp>
+#include <signaldata/IndexStatSignal.hpp>
+#include <signaldata/GetTabInfo.hpp>
+#include <signaldata/TuxBound.hpp>
+#include <ndbcntr/Ndbcntr.hpp>
+#define ZNOT_FOUND 626
// Error codes
#define INTERNAL_ERROR_ILLEGAL_CALL 4344
@@ -46,6 +51,9 @@ public:
REORG_COPY = 0
,REORG_DELETE = 1
,INDEX_BUILD = 2
+ ,STAT_UTIL = 3 // PK op of HEAD table directly via DBUTIL
+ ,STAT_CLEAN = 4
+ ,STAT_SCAN = 5
//ALTER_TABLE
};
typedef DataBuffer<11> AttrOrderBuffer;
@@ -126,6 +134,7 @@ private:
Uint32 noOfKeyColumns;
Uint32 parallelism;
Uint32 fragCount;
+ Uint32 fragId;
Uint32 syncPtr;
BuildIndxRef::ErrorCode errorCode;
bool subscriptionCreated;
@@ -133,6 +142,7 @@ private:
Uint32 expectedConf; // Count in n UTIL_EXECUTE_CONF + 1 SUB_SYNC_CONF
Uint64 m_rows_processed;
Uint64 m_gci;
+ Uint32 m_statPtrI;
union {
Uint32 nextPool;
Uint32 nextList;
@@ -153,6 +163,109 @@ private:
*/
DLList<SubscriptionRecord> c_theSubscriptions;
+ /*
+ * Ordered index stats. Implements sub-ops of DBDICT index stat
+ * schema op. Each sub-op is a simple REQ which seizes and releases
+ * a stat op here before returning CONF or REF. A stat op always has
+ * an associated SubscriptionRecord. It is used for SUMA index scans
+ * and as proxy for PK ops to DBUTIL.
+ */
+
+ bool c_statGetMetaDone;
+ const Ndbcntr::SysTable* c_statMetaHead;
+ const Ndbcntr::SysTable* c_statMetaSample;
+ const Ndbcntr::SysIndex* c_statMetaSampleX1;
+
+ struct StatOp {
+ struct Meta {
+ GetTabInfoConf m_conf;
+ Callback m_cb;
+ };
+ struct Data {
+ Int32 m_head_found;
+ Uint32 m_indexId;
+ Uint32 m_indexVersion;
+ Uint32 m_tableId;
+ Uint32 m_fragCount;
+ Uint32 m_valueFormat;
+ Uint32 m_sampleVersion;
+ Uint32 m_loadTime;
+ Uint32 m_sampleCount;
+ Uint32 m_keyBytes;
+ Uint32* m_statKey;
+ Uint32* m_statValue;
+ Data() {
+ m_head_found = -1;
+ m_sampleVersion = 0;
+ }
+ };
+ struct Attr {
+ Uint32* m_attr;
+ Uint32 m_attrMax;
+ Uint32 m_attrSize;
+ Uint32* m_data;
+ Uint32 m_dataMax;
+ Uint32 m_dataSize;
+ Attr() {}
+ };
+ struct Util {
+ Uint32 m_prepareId;
+ bool m_not_found;
+ Callback m_cb;
+ Util() {
+ m_prepareId = RNIL;
+ m_not_found = false; // read + ZNOT_FOUND
+ };
+ };
+ struct Clean {
+ Uint32 m_cleanCount;
+ // bounds on index_id, index_version, sample_version
+ Uint32 m_bound[3 * 3];
+ Uint32 m_boundCount;
+ Uint32 m_boundSize;
+ Clean() {}
+ };
+ struct Scan {
+ Uint32 m_sampleCount;
+ Uint32 m_keyBytes;
+ Scan() {}
+ };
+ struct Drop {
+ };
+ struct Send {
+ const Ndbcntr::SysTable* m_sysTable;
+ Uint32 m_operationType; // UtilPrepareReq::OperationTypeValue
+ Uint32 m_prepareId;
+ Send() {}
+ };
+ IndexStatImplReq m_req;
+ Uint32 m_requestType;
+ const char* m_requestName;
+ Uint32 m_subRecPtrI;
+ Meta m_meta;
+ Data m_data;
+ Attr m_attr;
+ Util m_util;
+ Clean m_clean;
+ Scan m_scan;
+ Drop m_drop;
+ Send m_send;
+ Uint32 m_errorCode;
+ Uint32 m_errorLine;
+ union {
+ Uint32 m_ownPtrI;
+ Uint32 nextPool;
+ };
+ StatOp() {
+ m_subRecPtrI = RNIL;
+ m_errorCode = 0;
+ m_errorLine = 0;
+ };
+ };
+ typedef Ptr<StatOp> StatOpPtr;
+ ArrayPool<StatOp> c_statOpPool;
+ RSS_AP_SNAPSHOT(c_statOpPool);
+
// System start
void execREAD_CONFIG_REQ(Signal* signal);
void execSTTOR(Signal* signal);
@@ -208,6 +321,83 @@ private:
SubscriptionRecPtr subRecPtr,
BuildIndxRef::ErrorCode);
void checkParallelism(Signal* signal, SubscriptionRecord* subRec);
+
+ // index stats
+ StatOp& statOpGetPtr(Uint32 statPtrI);
+ bool statOpSeize(Uint32& statPtrI);
+ void statOpRelease(StatOp&);
+ void execINDEX_STAT_IMPL_REQ(Signal*);
+ // sys tables metadata
+ void statMetaGetHead(Signal*, StatOp&);
+ void statMetaGetHeadCB(Signal*, Uint32 statPtrI, Uint32 ret);
+ void statMetaGetSample(Signal*, StatOp&);
+ void statMetaGetSampleCB(Signal*, Uint32 statPtrI, Uint32 ret);
+ void statMetaGetSampleX1(Signal*, StatOp&);
+ void statMetaGetSampleX1CB(Signal*, Uint32 statPtrI, Uint32 ret);
+ void sendGetTabInfoReq(Signal*, StatOp&, const char* name);
+ void execGET_TABINFO_CONF(Signal*);
+ void execGET_TABINFO_REF(Signal*);
+ // continue
+ void statGetMetaDone(Signal*, StatOp&);
+ // head table ops
+ void statHeadRead(Signal*, StatOp&);
+ void statHeadReadCB(Signal*, Uint32 statPtrI, Uint32 ret);
+ void statHeadInsert(Signal*, StatOp&);
+ void statHeadInsertCB(Signal*, Uint32 statPtrI, Uint32 ret);
+ void statHeadUpdate(Signal*, StatOp&);
+ void statHeadUpdateCB(Signal*, Uint32 statPtrI, Uint32 ret);
+ void statHeadDelete(Signal*, StatOp&);
+ void statHeadDeleteCB(Signal*, Uint32 statPtrI, Uint32 ret);
+ // util
+ void statUtilPrepare(Signal*, StatOp&);
+ void statUtilPrepareConf(Signal*, Uint32 statPtrI);
+ void statUtilPrepareRef(Signal*, Uint32 statPtrI);
+ void statUtilExecute(Signal*, StatOp&);
+ void statUtilExecuteConf(Signal*, Uint32 statPtrI);
+ void statUtilExecuteRef(Signal*, Uint32 statPtrI);
+ void statUtilRelease(Signal*, StatOp&);
+ void statUtilReleaseConf(Signal*, Uint32 statPtrI);
+ // continue
+ void statReadHeadDone(Signal*, StatOp&);
+ void statInsertHeadDone(Signal*, StatOp&);
+ void statUpdateHeadDone(Signal*, StatOp&);
+ void statDeleteHeadDone(Signal*, StatOp&);
+ // clean
+ void statCleanBegin(Signal*, StatOp&);
+ void statCleanPrepare(Signal*, StatOp&);
+ void statCleanExecute(Signal*, StatOp&);
+ void statCleanRelease(Signal*, StatOp&);
+ void statCleanEnd(Signal*, StatOp&);
+ // scan
+ void statScanBegin(Signal*, StatOp&);
+ void statScanPrepare(Signal*, StatOp&);
+ void statScanExecute(Signal*, StatOp&);
+ void statScanRelease(Signal*, StatOp&);
+ void statScanEnd(Signal*, StatOp&);
+ // drop
+ void statDropBegin(Signal*, StatOp&);
+ void statDropEnd(Signal*, StatOp&);
+ // send
+ void statSendPrepare(Signal*, StatOp&);
+ void statSendExecute(Signal*, StatOp&);
+ void statSendRelease(Signal*, StatOp&);
+ // data
+ void statDataPtr(StatOp&, Uint32 i, Uint32*& dptr, Uint32& bytes);
+ void statDataOut(StatOp&, Uint32 i);
+ void statDataIn(StatOp&, Uint32 i);
+ // abort ongoing
+ void statAbortUtil(Signal*, StatOp&);
+ void statAbortUtilCB(Signal*, Uint32 statPtrI, Uint32 ret);
+ // conf and ref
+ void statOpSuccess(Signal*, StatOp&);
+ void statOpConf(Signal*, StatOp&);
+ void statOpError(Signal*, StatOp&, Uint32 errorCode, Uint32 errorLine);
+ void statOpAbort(Signal*, StatOp&);
+ void statOpRef(Signal*, StatOp&);
+ void statOpRef(Signal*, const IndexStatImplReq*, Uint32 errorCode, Uint32 errorLine);
+ void statOpEvent(StatOp&, const char* level, const char* msg, ...);
+ // debug
+ friend class NdbOut& operator<<(NdbOut&, const StatOp& stat);
};
#endif
Attachment: [text/bzr-bundle] bzr/pekka@mysql.com-20110519093803-vo0919gzcz832n8m.bundle
| Thread |
|---|
| • bzr commit into mysql-5.1-telco-7.0-wl4163 branch (pekka:4376) WL#4124 | Pekka Nousiainen | 19 May |