#At file:///net/atum17/export/home/tmp/jw159207/mysql/repo/push-scan-scan/ based on revid:jonas@stripped
3418 Jan Wedvik 2011-02-08
This commit adds checks to ensure that the batch size for child scan operations
is sufficiently large. The SPJ block divides the batch size by the number of
fragments that its must scan. If the batch size was smaller than the number of
fragments, then the SPJ block would anyway ask for one row from each fragment.
The API would then receive more rows than expected, triggering an assert.
The API will now make sure that the batch size is greater or equal to the
number of fragments.
modified:
storage/ndb/src/kernel/blocks/dbspj/DbspjMain.cpp
storage/ndb/src/ndbapi/NdbQueryBuilderImpl.hpp
storage/ndb/src/ndbapi/NdbQueryOperation.cpp
storage/ndb/src/ndbapi/NdbQueryOperationImpl.hpp
storage/ndb/src/ndbapi/ndberror.c
=== modified file 'storage/ndb/src/kernel/blocks/dbspj/DbspjMain.cpp'
--- a/storage/ndb/src/kernel/blocks/dbspj/DbspjMain.cpp 2010-11-06 09:24:09 +0000
+++ b/storage/ndb/src/kernel/blocks/dbspj/DbspjMain.cpp 2011-02-08 12:14:41 +0000
@@ -4889,9 +4889,7 @@ Dbspj::scanIndex_send(Signal* signal,
bs_rows /= cnt;
bs_bytes /= cnt;
-
- if (bs_rows == 0)
- bs_rows = 1;
+ ndbassert(bs_rows > 0);
}
/**
@@ -5226,7 +5224,8 @@ Dbspj::scanIndex_execSCAN_NEXTREQ(Signal
}
const ScanFragReq * org = (const ScanFragReq*)data.m_scanFragReq;
- const Uint32 bs_rows = MAX(1, org->batch_size_rows/cnt);
+ const Uint32 bs_rows = org->batch_size_rows/cnt;
+ ndbassert(bs_rows > 0);
ScanFragNextReq* req =
reinterpret_cast<ScanFragNextReq*>(signal->getDataPtrSend());
req->requestInfo = 0;
=== modified file 'storage/ndb/src/ndbapi/NdbQueryBuilderImpl.hpp'
--- a/storage/ndb/src/ndbapi/NdbQueryBuilderImpl.hpp 2011-02-04 08:45:18 +0000
+++ b/storage/ndb/src/ndbapi/NdbQueryBuilderImpl.hpp 2011-02-08 12:14:41 +0000
@@ -45,6 +45,7 @@
#define QRY_CHAR_PARAMETER_TRUNCATED 4823
#define QRY_MULTIPLE_SCAN_BRANCHES 4824
#define QRY_MULTIPLE_SCAN_SORTED 4825
+#define QRY_BATCH_SIZE_TOO_SMALL 4826
#ifdef __cplusplus
#include <Vector.hpp>
=== modified file 'storage/ndb/src/ndbapi/NdbQueryOperation.cpp'
--- a/storage/ndb/src/ndbapi/NdbQueryOperation.cpp 2011-01-26 08:51:30 +0000
+++ b/storage/ndb/src/ndbapi/NdbQueryOperation.cpp 2011-02-08 12:14:41 +0000
@@ -3832,9 +3832,9 @@ int NdbQueryOperationImpl::serializePara
Uint32
NdbQueryOperationImpl
-::calculateBatchedRows(NdbQueryOperationImpl* closestScan)
+::calculateBatchedRows(const NdbQueryOperationImpl* closestScan)
{
- NdbQueryOperationImpl* myClosestScan;
+ const NdbQueryOperationImpl* myClosestScan;
if (m_operationDef.isScanOperation())
{
myClosestScan = this;
@@ -3849,7 +3849,16 @@ NdbQueryOperationImpl
{
#ifdef TEST_SCANREQ
- m_maxBatchRows = 4; // To force usage of SCAN_NEXTREQ even for small scans resultsets
+ // To force usage of SCAN_NEXTREQ even for small scans resultsets
+ if (this == &getRoot())
+ {
+ m_maxBatchRows = 1;
+ }
+ else
+ {
+ m_maxBatchRows =
+ myClosestScan->getQueryOperationDef().getTable().getFragmentCount();
+ }
#endif
const Ndb& ndb = *getQuery().getNdbTransaction().getNdb();
@@ -3875,6 +3884,8 @@ NdbQueryOperationImpl
m_ndbRecord,
m_firstRecAttr,
0, // Key size.
+ getRoot().m_parallelism > 0 ?
+ getRoot().m_parallelism :
m_queryImpl.getRootFragCount(),
maxBatchRows,
batchByteSize,
@@ -3893,6 +3904,14 @@ NdbQueryOperationImpl
if (m_operationDef.isScanOperation())
{
+ if (myClosestScan != &getRoot())
+ {
+ /** Each SPJ block instance will scan each fragment, so the batch size
+ * cannot be smaller than the number of fragments.*/
+ maxBatchRows =
+ MAX(maxBatchRows, myClosestScan->getQueryOperationDef().
+ getTable().getFragmentCount());
+ }
// Use this value for current op and all lookup descendants.
m_maxBatchRows = maxBatchRows;
// Return max(Unit32) to avoid interfering with batch size calculation
@@ -4762,7 +4781,14 @@ int NdbQueryOperationImpl::setBatchSize(
getQuery().setErrorCode(QRY_WRONG_OPERATION_TYPE);
return -1;
}
-
+ if (this != &getRoot() &&
+ batchSize < getQueryOperationDef().getTable().getFragmentCount())
+ {
+ /** Each SPJ block instance will scan each fragment, so the batch size
+ * cannot be smaller than the number of fragments.*/
+ getQuery().setErrorCode(QRY_BATCH_SIZE_TOO_SMALL);
+ return -1;
+ }
m_maxBatchRows = batchSize;
return 0;
}
=== modified file 'storage/ndb/src/ndbapi/NdbQueryOperationImpl.hpp'
--- a/storage/ndb/src/ndbapi/NdbQueryOperationImpl.hpp 2010-12-28 12:07:01 +0000
+++ b/storage/ndb/src/ndbapi/NdbQueryOperationImpl.hpp 2011-02-08 12:14:41 +0000
@@ -724,7 +724,7 @@ private:
int serializeProject(Uint32Buffer& attrInfo);
- Uint32 calculateBatchedRows(NdbQueryOperationImpl* closestScan);
+ Uint32 calculateBatchedRows(const NdbQueryOperationImpl* closestScan);
void setBatchedRows(Uint32 batchedRows);
/** Construct and prepare receiver streams for result processing. */
=== modified file 'storage/ndb/src/ndbapi/ndberror.c'
--- a/storage/ndb/src/ndbapi/ndberror.c 2011-02-04 11:45:24 +0000
+++ b/storage/ndb/src/ndbapi/ndberror.c 2011-02-08 12:14:41 +0000
@@ -801,6 +801,8 @@ ErrorBundle ErrorCodes[] = {
"Query with multiple scans may not be sorted." },
{ QRY_SEQUENTIAL_SCAN_SORTED, DMEC, AE,
"Parallelism cannot be restricted for sorted scans." },
+ { QRY_BATCH_SIZE_TOO_SMALL, DMEC, AE,
+ "Batch size for sub scan cannot be smaller than number of fragments." },
{ NO_CONTACT_WITH_PROCESS, DMEC, AE,
"No contact with the process (dead ?)."},
Attachment: [text/bzr-bundle] bzr/jan.wedvik@sun.com-20110208121441-0j9o1u4vluaui6fb.bundle
Thread |
---|
• bzr commit into mysql-5.1-telco-7.0-spj-scan-vs-scan branch(jan.wedvik:3418) | Jan Wedvik | 8 Feb |