3205 Ole John Aske 2010-07-14
Correctly set ScanIndexParameters::batchSize argument for a child scan in a pushed join operation.
modified:
storage/ndb/src/ndbapi/NdbQueryOperation.cpp
storage/ndb/src/ndbapi/NdbQueryOperationImpl.hpp
storage/ndb/src/ndbapi/NdbReceiver.cpp
3204 Ole John Aske 2010-07-13 [merge]
Merge with main SPJ branch.
modified:
sql/ha_ndbcluster.cc
storage/ndb/src/kernel/blocks/dbspj/DbspjMain.cpp
storage/ndb/test/rqg/runall.sh
storage/ndb/test/rqg/spj_test.yy
=== modified file 'storage/ndb/src/ndbapi/NdbQueryOperation.cpp'
--- a/storage/ndb/src/ndbapi/NdbQueryOperation.cpp 2010-07-08 13:56:43 +0000
+++ b/storage/ndb/src/ndbapi/NdbQueryOperation.cpp 2010-07-14 17:03:46 +0000
@@ -607,7 +607,7 @@ bool NdbRootFragment::isEmpty() const
{
/* We iterate over the tuples of the bottom scan operation.
* When we have consumed all of these (for this batch and root fragment),
- * this NrbRootFragment is empty.
+ * this NdbRootFragment is empty.
*/
return m_currentTuple ==
m_query->getScan(m_query->getScanCount()-1)
@@ -1865,14 +1865,15 @@ NdbQueryImpl::prepareSend()
= getRoot().getQueryOperationDef().getTable().getFragmentCount();
Ndb* const ndb = m_transaction.getNdb();
- TransporterFacade *tp = ndb->theImpl->m_transporter_facade;
-
- Uint32 batchRows = m_maxBatchRows; // >0: User specified prefered value, ==0: Use default CFG values
#ifdef TEST_SCANREQ
- batchRows = 1; // To force usage of SCAN_NEXTREQ even for small scans resultsets
+ m_maxBatchRows = 1; // To force usage of SCAN_NEXTREQ even for small scans resultsets
#endif
+#if 1
+ TransporterFacade *tp = ndb->theImpl->m_transporter_facade;
+ Uint32 batchRows = m_maxBatchRows; // >0: User specified prefered value, ==0: Use default CFG values
+
// Calculate batchsize for query as minimum batchRows for all m_operations[].
// Ignore calculated 'batchByteSize' and 'firstBatchRows' here - Recalculated
// when building signal after max-batchRows has been determined.
@@ -1890,11 +1891,16 @@ NdbQueryImpl::prepareSend()
assert (firstBatchRows==batchRows);
}
m_maxBatchRows = batchRows;
+#else
+ // Some preparation for later batchsize calculations pr. (sub) scan
+ getRoot().calculateBatchedRows(m_maxBatchRows);
+#endif
+
/**
* Tuples are indentified with 16 bit unsigned integers, and tupleNotFound
* (=0xffff) is used for representing unknown tuples.
*/
- assert(batchRows < tupleNotFound);
+ assert(m_maxBatchRows < tupleNotFound);
/** Scan operations need a own sub-transaction object associated with each
* query.
@@ -2952,6 +2958,7 @@ NdbQueryOperationImpl::NdbQueryOperation
m_children(def.getNoOfChildOperations()),
m_resultStreams(NULL),
m_params(),
+ m_bufferSize(0),
m_batchBuffer(NULL),
m_resultBuffer(NULL),
m_resultRef(NULL),
@@ -3004,7 +3011,7 @@ NdbQueryOperationImpl::postFetchRelease(
{
if (m_batchBuffer) {
#ifndef NDEBUG // Buffer overrun check activated.
- { const Uint32 bufLen = m_batchByteSize*m_queryImpl.getRootFragCount();
+ { const Uint32 bufLen = m_bufferSize*m_queryImpl.getRootFragCount();
assert(m_batchBuffer[bufLen+0] == 'a' &&
m_batchBuffer[bufLen+1] == 'b' &&
m_batchBuffer[bufLen+2] == 'c' &&
@@ -3488,16 +3495,45 @@ int NdbQueryOperationImpl::serializePara
} // NdbQueryOperationImpl::serializeParams
+int
+NdbQueryOperationImpl::calculateBatchedRows(Uint32& batchedRows)
+{
+ for (Uint32 i = 0; i < m_children.size(); i++)
+ {
+ m_children[i]->calculateBatchedRows(batchedRows);
+ }
+
+ Ndb& ndb = *getQuery().getNdbTransaction().getNdb();
+ TransporterFacade *tp = ndb.theImpl->m_transporter_facade;
+
+ // Calculate batchsize for query as minimum batchRows for all m_operations[].
+ // Ignore calculated 'batchByteSize' and 'firstBatchRows' here - Recalculated
+ // when building signal after max-batchRows has been determined.
+ Uint32 batchByteSize, firstBatchRows;
+ NdbReceiver::calculate_batch_size(tp,
+ m_ndbRecord,
+ m_firstRecAttr,
+ 0, // Key size.
+ getQuery().getRootFragCount(),
+ batchedRows,
+ batchByteSize,
+ firstBatchRows);
+ assert (batchedRows>0);
+ assert (firstBatchRows==batchedRows);
+ return 0;
+} // NdbQueryOperationImpl::calculateBatchedRows
+
+
int
NdbQueryOperationImpl::prepareReceiver()
{
const Uint32 rowSize =
NdbReceiver::ndbrecord_rowsize(m_ndbRecord, m_firstRecAttr,0,false);
- m_batchByteSize = rowSize * m_queryImpl.getMaxBatchRows();
-//ndbout "m_batchByteSize=" << m_batchByteSize << endl;
+ m_bufferSize = rowSize * m_queryImpl.getMaxBatchRows();
+//ndbout "m_bufferSize=" << m_bufferSize << endl;
- if (m_batchByteSize > 0) { // 0 bytes in batch if no result requested
- Uint32 bufLen = m_batchByteSize*m_queryImpl.getRootFragCount();
+ if (m_bufferSize > 0) { // 0 bytes in batch if no result requested
+ Uint32 bufLen = m_bufferSize*m_queryImpl.getRootFragCount();
#ifdef NDEBUG
m_batchBuffer = new char[bufLen];
if (unlikely(m_batchBuffer == NULL)) {
@@ -3544,7 +3580,7 @@ NdbQueryOperationImpl::prepareReceiver()
0 /*key_size*/,
0 /*read_range_no*/,
rowSize,
- &m_batchBuffer[m_batchByteSize*i],
+ &m_batchBuffer[m_bufferSize*i],
0);
m_resultStreams[i]->getReceiver().prepareSend();
}
@@ -3668,9 +3704,24 @@ NdbQueryOperationImpl::prepareAttrInfo(U
if (unlikely(param==NULL))
return Err_MemoryAlloc;
+ Ndb& ndb = *m_queryImpl.getNdbTransaction().getNdb();
+ TransporterFacade *tp = ndb.theImpl->m_transporter_facade;
+
+ Uint32 batchRows = m_queryImpl.getMaxBatchRows();
+ Uint32 batchByteSize, firstBatchRows;
+ NdbReceiver::calculate_batch_size(tp,
+ m_ndbRecord,
+ m_firstRecAttr,
+ 0, // Key size.
+ m_queryImpl.getRootFragCount(),
+ batchRows,
+ batchByteSize,
+ firstBatchRows);
+ assert (batchRows==m_queryImpl.getMaxBatchRows());
+
requestInfo |= QN_ScanIndexParameters::SIP_PARALLEL; // FIXME: SPJ always assume. SIP_PARALLEL
param->requestInfo = requestInfo;
- param->batchSize = 0xffff0100; // TODO FIXME (bytes=0xffff, rows=0x100)
+ param->batchSize = ((Uint16)batchByteSize << 16) | (Uint16)firstBatchRows;
param->resultData = getIdOfReceiver();
QueryNodeParameters::setOpLen(param->len, paramType, length);
}
=== modified file 'storage/ndb/src/ndbapi/NdbQueryOperationImpl.hpp'
--- a/storage/ndb/src/ndbapi/NdbQueryOperationImpl.hpp 2010-07-07 13:24:23 +0000
+++ b/storage/ndb/src/ndbapi/NdbQueryOperationImpl.hpp 2010-07-14 17:03:46 +0000
@@ -535,7 +535,7 @@ public:
bool isRowNULL() const; // Row associated with Operation is NULL value?
- bool isRowChanged() const; // Prev ::nextResult() on NdbQuery retrived a new
+ bool isRowChanged() const; // Prev ::nextResult() on NdbQuery retrieved a new
// value for this NdbQueryOperation
/** Process result data for this operation. Return true if batch complete.*/
@@ -618,7 +618,11 @@ private:
/** Buffer for parameters in serialized format */
Uint32Buffer m_params;
- /** Internally allocated buffer for temporary storing one result batch.*/
+ /** Buffer size allocated for *each* ResultStream/Receiver when
+ * fetching results.*/
+ Uint32 m_bufferSize;
+ /** Internally allocated temp. buffer for *all* m_resultStreams[]
+ * when receiving a batch. (m_bufferSize x #ResultStreams) */
char* m_batchBuffer;
/** User specified buffer for final storage of result.*/
char* m_resultBuffer;
@@ -628,8 +632,6 @@ private:
const char** m_resultRef;
/** True if this operation gave no result for the current row.*/
bool m_isRowNull;
- /** Batch size for scans or lookups with scan parents.*/
- Uint32 m_batchByteSize;
/** Result record & optional bitmask to disable read of selected cols.*/
const NdbRecord* m_ndbRecord;
@@ -679,6 +681,8 @@ private:
int serializeProject(Uint32Buffer& attrInfo);
+ int calculateBatchedRows(Uint32& batchedRows);
+
/** Construct and prepare receiver streams for result processing. */
int prepareReceiver();
=== modified file 'storage/ndb/src/ndbapi/NdbReceiver.cpp'
--- a/storage/ndb/src/ndbapi/NdbReceiver.cpp 2010-03-10 09:36:44 +0000
+++ b/storage/ndb/src/ndbapi/NdbReceiver.cpp 2010-07-14 17:03:46 +0000
@@ -132,6 +132,7 @@ NdbReceiver::getValues(const NdbRecord*
m_record.m_ndb_record= rec;
m_record.m_row= row_ptr;
+ m_record.m_row_offset= rec->m_row_size;
}
#define KEY_ATTR_ID (~(Uint32)0)
@@ -699,6 +700,7 @@ NdbReceiver::execTRANSID_AI(const Uint32
{
assert(m_record.m_read_range_no);
assert(attrSize==4);
+ assert (m_record.m_row_offset >= m_record.m_ndb_record->m_row_size+attrSize);
memcpy(m_record.m_row+m_record.m_ndb_record->m_row_size, aDataPtr++, 4);
aLength--;
continue; // Next
@@ -710,6 +712,7 @@ NdbReceiver::execTRANSID_AI(const Uint32
*/
if (attrId == AttributeHeader::READ_PACKED)
{
+ assert (m_record.m_row_offset >= m_record.m_ndb_record->m_row_size);
Uint32 len= receive_packed_ndbrecord(attrSize >> 2, // Bitmap length
aDataPtr,
m_record.m_row);
Attachment: [text/bzr-bundle] bzr/ole.john.aske@sun.com-20100714170346-yi4y73yxzugc1667.bundle
| Thread |
|---|
| • bzr push into mysql-5.1-telco-7.0-spj-scan-vs-scan branch(ole.john.aske:3204 to 3205) | Ole John Aske | 14 Jul |