#At file:///net/atum17/export/home2/tmp/jw159207/mysql/repo/push-scan-scan/ based on revid:ole.john.aske@stripped
3312 Jan Wedvik 2010-10-12
This commits adds the NdbQueryOperation::setBatchSize() method for setting
the batch size (number of rows) for pushed scans. (Implicitly, this also sets
the batch size for descendant lookup operations.)
modified:
storage/ndb/include/ndbapi/NdbQueryOperation.hpp
storage/ndb/src/ndbapi/NdbQueryOperation.cpp
storage/ndb/src/ndbapi/NdbQueryOperationImpl.hpp
=== modified file 'storage/ndb/include/ndbapi/NdbQueryOperation.hpp'
--- a/storage/ndb/include/ndbapi/NdbQueryOperation.hpp 2010-10-07 13:54:04 +0000
+++ b/storage/ndb/include/ndbapi/NdbQueryOperation.hpp 2010-10-12 13:00:44 +0000
@@ -327,6 +327,16 @@ public:
*/
int setParallelism(Uint32 parallelism);
+ /** Set the batch size (max rows per batch) for this operation. This
+ * only applies to scan operations, as lookup operations always will
+ * have the same batch size as its parent operation, or 1 if it is the
+ * root operation.
+ * @param batchSize Batch size (in number of rows). A value of 0 means
+ * use the default batch size.
+ * @return 0 if ok, -1 in case of error (call getNdbError() for details.)
+ */
+ int setBatchSize(Uint32 batchSize);
+
/**
* Set the NdbInterpretedCode needed for defining a scan filter for
* this operation.
=== modified file 'storage/ndb/src/ndbapi/NdbQueryOperation.cpp'
--- a/storage/ndb/src/ndbapi/NdbQueryOperation.cpp 2010-10-11 13:18:51 +0000
+++ b/storage/ndb/src/ndbapi/NdbQueryOperation.cpp 2010-10-12 13:00:44 +0000
@@ -1012,6 +1012,10 @@ int NdbQueryOperation::setParallelism(Ui
return m_impl.setParallelism(parallelism);
}
+int NdbQueryOperation::setBatchSize(Uint32 batchSize){
+ return m_impl.setBatchSize(batchSize);
+}
+
int NdbQueryOperation::setInterpretedCode(const NdbInterpretedCode& code) const
{
return m_impl.setInterpretedCode(code);
@@ -2070,7 +2074,7 @@ NdbQueryImpl::prepareSend()
}
// Some preparation for later batchsize calculations pr. (sub) scan
- getRoot().calculateBatchedRows();
+ getRoot().calculateBatchedRows(NULL);
getRoot().setBatchedRows(1);
// 1. Build receiver structures for each QueryOperation.
@@ -2303,7 +2307,8 @@ NdbQueryImpl::doSend(int nodeId, bool la
batchRows,
batchByteSize,
firstBatchRows);
- assert (batchRows==root.getMaxBatchRows());
+ assert(batchRows==root.getMaxBatchRows());
+ assert(batchRows==firstBatchRows);
ScanTabReq::setScanBatch(reqInfo, batchRows);
scanTabReq->batch_byte_size = batchByteSize;
scanTabReq->first_batch_size = firstBatchRows;
@@ -3635,56 +3640,97 @@ int NdbQueryOperationImpl::serializePara
return 0;
} // NdbQueryOperationImpl::serializeParams
-int
-NdbQueryOperationImpl::calculateBatchedRows(NdbQueryOperationImpl* scanParent)
+Uint32
+NdbQueryOperationImpl
+::calculateBatchedRows(NdbQueryOperationImpl* closestScan)
{
+ NdbQueryOperationImpl* myClosestScan;
if (m_operationDef.isScanOperation())
- scanParent = this;
+ {
+ myClosestScan = this;
+ }
+ else
+ {
+ myClosestScan = closestScan;
+ }
- for (Uint32 i = 0; i < m_children.size(); i++)
- m_children[i]->calculateBatchedRows(scanParent);
+ Uint32 maxBatchRows = 0;
+ if (myClosestScan != NULL)
+ {
#ifdef TEST_SCANREQ
m_maxBatchRows = 4; // To force usage of SCAN_NEXTREQ even for small scans resultsets
#endif
- if (scanParent!=NULL)
- {
- Ndb& ndb = *getQuery().getNdbTransaction().getNdb();
+ const Ndb& ndb = *getQuery().getNdbTransaction().getNdb();
- // Calculate batchsize for query as minimum batchRows for all m_operations[].
- // Ignore calculated 'batchByteSize' and 'firstBatchRows' here - Recalculated
- // when building signal after max-batchRows has been determined.
+ /**
+ * For each batch, a lookup operation must be able to receive as many rows
+ * as the closest ancestor scan operation.
+ * We must thus make sure that we do not set a batch size for the scan
+ * that exceeds what any of its scan descendants can use.
+ *
+ * Ignore calculated 'batchByteSize' and 'firstBatchRows'
+ * here - Recalculated when building signal after max-batchRows has been
+ * determined.
+ */
Uint32 batchByteSize, firstBatchRows;
+ /**
+ * myClosestScan->m_maxBatchRows may be zero to indicate that we
+ * should use default values, or non-zero if the application had an
+ * explicit preference.
+ */
+ maxBatchRows = myClosestScan->m_maxBatchRows;
NdbReceiver::calculate_batch_size(* ndb.theImpl,
m_ndbRecord,
m_firstRecAttr,
0, // Key size.
- getQuery().getRootFragCount(), // scanParent->getFragCount()
- scanParent->m_maxBatchRows,
+ m_queryImpl.getRootFragCount(),
+ maxBatchRows,
batchByteSize,
firstBatchRows);
- assert (scanParent->m_maxBatchRows>0);
- assert (firstBatchRows==scanParent->m_maxBatchRows);
- m_maxBatchRows = firstBatchRows; // First guess, ::setBatchedRows() updates later
+ assert(maxBatchRows > 0);
+ assert(firstBatchRows == maxBatchRows);
}
- else
- m_maxBatchRows = 1;
- return 0;
+ // Find the largest value that is acceptable to all lookup descendants.
+ for (Uint32 i = 0; i < m_children.size(); i++)
+ {
+ const Uint32 childMaxBatchRows =
+ m_children[i]->calculateBatchedRows(myClosestScan);
+ maxBatchRows = MIN(maxBatchRows, childMaxBatchRows);
+ }
+
+ if (m_operationDef.isScanOperation())
+ {
+ // Use this value for current op and all lookup descendants.
+ m_maxBatchRows = maxBatchRows;
+ // Return max(Unit32) to avoid interfering with batch size calculation
+ // for parent.
+ return 0xffffffff;
+ }
+ else
+ {
+ return maxBatchRows;
+ }
} // NdbQueryOperationImpl::calculateBatchedRows
void
NdbQueryOperationImpl::setBatchedRows(Uint32 batchedRows)
{
- if (m_operationDef.isScanOperation())
- batchedRows = this->m_maxBatchRows;
+ if (!m_operationDef.isScanOperation())
+ {
+ /** Lookup operations should handle the same number of rows as
+ * the closest scan ancestor.
+ */
+ m_maxBatchRows = batchedRows;
+ }
for (Uint32 i = 0; i < m_children.size(); i++)
- m_children[i]->setBatchedRows(batchedRows);
-
- m_maxBatchRows = batchedRows;
+ {
+ m_children[i]->setBatchedRows(m_maxBatchRows);
+ }
}
@@ -3883,8 +3929,8 @@ NdbQueryOperationImpl::prepareAttrInfo(U
batchRows,
batchByteSize,
firstBatchRows);
- assert (batchRows==getMaxBatchRows());
-
+ assert(batchRows==getMaxBatchRows());
+ assert(batchRows==firstBatchRows);
requestInfo |= QN_ScanIndexParameters::SIP_PARALLEL; // FIXME: SPJ always assume. SIP_PARALLEL
param->requestInfo = requestInfo;
param->batchSize = ((Uint16)batchByteSize << 16) | (Uint16)firstBatchRows;
@@ -4534,6 +4580,17 @@ int NdbQueryOperationImpl::setParallelis
return 0;
}
+int NdbQueryOperationImpl::setBatchSize(Uint32 batchSize){
+ if (!getQueryOperationDef().isScanOperation())
+ {
+ getQuery().setErrorCodeAbort(QRY_WRONG_OPERATION_TYPE);
+ return -1;
+ }
+
+ m_maxBatchRows = batchSize;
+ return 0;
+}
+
NdbResultStream&
NdbQueryOperationImpl::getResultStream(Uint32 rootFragNo) const
{
=== modified file 'storage/ndb/src/ndbapi/NdbQueryOperationImpl.hpp'
--- a/storage/ndb/src/ndbapi/NdbQueryOperationImpl.hpp 2010-10-07 13:54:04 +0000
+++ b/storage/ndb/src/ndbapi/NdbQueryOperationImpl.hpp 2010-10-12 13:00:44 +0000
@@ -586,6 +586,16 @@ public:
*/
int setParallelism(Uint32 parallelism);
+ /** Set the batch size (max rows per batch) for this operation. This
+ * only applies to scan operations, as lookup operations always will
+ * have the same batch size as its parent operation, or 1 if it is the
+ * root operation.
+ * @param batchSize Batch size (in number of rows). A value of 0 means
+ * use the default batch size.
+ * @return 0 if ok, -1 in case of error (call getNdbError() for details.)
+ */
+ int setBatchSize(Uint32 batchSize);
+
/**
* Set the NdbInterpretedCode needed for defining a scan filter for
* this operation.
@@ -711,7 +721,7 @@ private:
int serializeProject(Uint32Buffer& attrInfo);
- int calculateBatchedRows(NdbQueryOperationImpl* scanParent=NULL);
+ Uint32 calculateBatchedRows(NdbQueryOperationImpl* closestScan);
void setBatchedRows(Uint32 batchedRows);
/** Construct and prepare receiver streams for result processing. */
Attachment: [text/bzr-bundle] bzr/jan.wedvik@sun.com-20101012130044-6q5fl5zskspdb2jp.bundle
| Thread |
|---|
| • bzr commit into mysql-5.1-telco-7.0-spj-scan-vs-scan branch(jan.wedvik:3312) | Jan Wedvik | 12 Oct |