4464 Ole John Aske 2011-08-17
SPJ extensions:In preparation for implementing handling of double buffered result
sets in class NdbReceiver, extend NdbReceiver with two new methods to resp.
set buffer positions for receiving and reading result rows into/from
the NdbReceiver.
NOTE: Different receive and read position did already exists
as part of the NdbReceiver - This patch only extend the
interface to be able to set it from the outside.
modified:
storage/ndb/include/ndbapi/NdbReceiver.hpp
storage/ndb/src/ndbapi/NdbQueryOperation.cpp
storage/ndb/src/ndbapi/NdbReceiver.cpp
4463 Ole John Aske 2011-08-17
Rename class NdbReceiver member field 'm_row' to 'm_row_recv' in order to
clarify its usage as a row buffer only used when received rows by
the NdbReceiver. (As opposed to the when we we *read* rows which has
been received by a NdbReceived)
modified:
storage/ndb/include/ndbapi/NdbReceiver.hpp
storage/ndb/src/ndbapi/NdbQueryOperation.cpp
storage/ndb/src/ndbapi/NdbReceiver.cpp
4462 Ole John Aske 2011-08-17
Refactoring as part of preparing SPJ results to be double (multi...?) buffered.
'class SharedFragStack' which was intended as a container for fragments
which has received a complete batch of rows is removed.
It is replaced with a set of (mutex protected) methods in
'class NdbRootFragment' and OrderedFragSet::prepareMoreResults()
The rational for this refactoring is that the SharedFragStack container
wasn't a particular good fit to represent the state when *multiple*
batches of (double buffered) result set may be available.
modified:
storage/ndb/src/ndbapi/NdbQueryOperation.cpp
storage/ndb/src/ndbapi/NdbQueryOperationImpl.hpp
=== modified file 'storage/ndb/include/ndbapi/NdbReceiver.hpp'
--- a/storage/ndb/include/ndbapi/NdbReceiver.hpp 2011-06-30 15:59:25 +0000
+++ b/storage/ndb/include/ndbapi/NdbReceiver.hpp 2011-08-17 12:36:56 +0000
@@ -40,7 +40,6 @@ class NdbReceiver
friend class NdbIndexScanOperation;
friend class NdbTransaction;
friend class NdbRootFragment;
- friend class ReceiverIdIterator;
friend int compare_ndbrecord(const NdbReceiver *r1,
const NdbReceiver *r2,
const NdbRecord *key_record,
@@ -82,6 +81,12 @@ public:
void setErrorCode(int);
+ /* Prepare for receiving of rows into specified buffer */
+ void prepareReceive(char *buf);
+
+ /* Prepare for reading of rows from specified buffer */
+ void prepareRead(char *buf, Uint32 rows);
+
private:
Uint32 theMagicNumber;
Ndb* m_ndb;
@@ -140,8 +145,9 @@ private:
/* members used for NdbRecord operation. */
struct {
const NdbRecord *m_ndb_record;
- char *m_row;
- /* Block of memory used to receive all rows in a batch during scan. */
+ /* Destination to receive next row into. */
+ char *m_row_recv;
+ /* Block of memory used to read all rows in a batch during scan. */
char *m_row_buffer;
/*
Offsets between two rows in m_row_buffer.
@@ -226,7 +232,7 @@ private:
const char * & data_ptr) const;
int getScanAttrData(const char * & data, Uint32 & size, Uint32 & pos) const;
/** Used by NdbQueryOperationImpl, where random access to rows is needed.*/
- void setCurrentRow(Uint32 currentRow);
+ void setCurrentRow(char* buffer, Uint32 row);
/** Used by NdbQueryOperationImpl.*/
Uint32 getCurrentRow() const { return m_current_row; }
};
@@ -259,7 +265,7 @@ NdbReceiver::prepareSend(){
if (m_using_ndb_record)
{
if (m_type==NDB_SCANRECEIVER || m_type==NDB_QUERY_OPERATION)
- m_record.m_row= m_record.m_row_buffer;
+ m_record.m_row_recv= m_record.m_row_buffer;
}
theCurrentRecAttr = theFirstRecAttr;
}
@@ -287,9 +293,13 @@ NdbReceiver::execSCANOPCONF(Uint32 tcPtr
inline
void
-NdbReceiver::setCurrentRow(Uint32 currentRow)
+NdbReceiver::setCurrentRow(char* buffer, Uint32 row)
{
- m_current_row = currentRow;
+ m_record.m_row_buffer = buffer;
+ m_current_row = row;
+#ifdef assert
+ assert(m_current_row < m_result_rows);
+#endif
}
inline
=== modified file 'storage/ndb/src/ndbapi/NdbQueryOperation.cpp'
--- a/storage/ndb/src/ndbapi/NdbQueryOperation.cpp 2011-08-17 12:06:04 +0000
+++ b/storage/ndb/src/ndbapi/NdbQueryOperation.cpp 2011-08-17 12:36:56 +0000
@@ -788,7 +788,7 @@ NdbResultStream::firstResult()
if ((m_currentRow=findTupleWithParentId(parentId)) != tupleNotFound)
{
m_iterState = Iter_started;
- m_receiver.setCurrentRow(m_currentRow);
+ m_receiver.setCurrentRow(m_buffer, m_currentRow);
return m_currentRow;
}
@@ -805,7 +805,7 @@ NdbResultStream::nextResult()
(m_currentRow=findNextTuple(m_currentRow)) != tupleNotFound)
{
m_iterState = Iter_started;
- m_receiver.setCurrentRow(m_currentRow);
+ m_receiver.setCurrentRow(m_buffer, m_currentRow);
return m_currentRow;
}
m_iterState = Iter_finished;
@@ -829,7 +829,8 @@ NdbResultStream::execTRANSID_AI(const Ui
* Store TupleCorrelation as hidden value imm. after received row
* (NdbQueryOperationImpl::getRowSize() has reserved space for it)
*/
- Uint32* row_recv = reinterpret_cast<Uint32*>(m_receiver.m_record.m_row);
+ Uint32* row_recv = reinterpret_cast<Uint32*>
+ (m_receiver.m_record.m_row_recv);
row_recv[-1] = correlation.toUint32();
}
} // NdbResultStream::execTRANSID_AI()
=== modified file 'storage/ndb/src/ndbapi/NdbReceiver.cpp'
--- a/storage/ndb/src/ndbapi/NdbReceiver.cpp 2011-06-30 15:59:25 +0000
+++ b/storage/ndb/src/ndbapi/NdbReceiver.cpp 2011-08-17 12:36:56 +0000
@@ -56,7 +56,7 @@ NdbReceiver::init(ReceiverType type, boo
if (useRec)
{
m_record.m_ndb_record= NULL;
- m_record.m_row= NULL;
+ m_record.m_row_recv= NULL;
m_record.m_row_buffer= NULL;
m_record.m_row_offset= 0;
m_record.m_read_range_no= false;
@@ -118,11 +118,38 @@ NdbReceiver::getValues(const NdbRecord*
assert(m_using_ndb_record);
m_record.m_ndb_record= rec;
- m_record.m_row= row_ptr;
+ m_record.m_row_recv= row_ptr;
m_record.m_row_offset= rec->m_row_size;
}
-#define KEY_ATTR_ID (~(Uint32)0)
+void
+NdbReceiver::prepareReceive(char *buf)
+{
+ /* Set pointers etc. to prepare for receiving the first row of the batch. */
+ assert(theMagicNumber == 0x11223344);
+ m_received_result_length = 0;
+ m_expected_result_length = 0;
+ if (m_using_ndb_record)
+ {
+ m_record.m_row_recv= buf;
+ }
+ theCurrentRecAttr = theFirstRecAttr;
+}
+
+void
+NdbReceiver::prepareRead(char *buf, Uint32 rows)
+{
+ /* Set pointers etc. to prepare for reading the first row of the batch. */
+ assert(theMagicNumber == 0x11223344);
+ m_current_row = 0;
+ m_result_rows = rows;
+ if (m_using_ndb_record)
+ {
+ m_record.m_row_buffer = buf;
+ }
+}
+
+ #define KEY_ATTR_ID (~(Uint32)0)
/*
Compute the batch size (rows between each NEXT_TABREQ / SCAN_TABCONF) to
@@ -219,7 +246,7 @@ NdbReceiver::do_setup_ndbrecord(const Nd
{
m_using_ndb_record= true;
m_record.m_ndb_record= ndb_record;
- m_record.m_row= row_buffer;
+ m_record.m_row_recv= row_buffer;
m_record.m_row_buffer= row_buffer;
m_record.m_row_offset= rowsize;
m_record.m_read_range_no= read_range_no;
@@ -524,7 +551,7 @@ NdbReceiver::receive_packed_ndbrecord(Ui
{
if (BitmaskImpl::get(bmlen, aDataPtr, ++i))
{
- setRecToNULL(col, m_record.m_row);
+ setRecToNULL(col, m_record.m_row_recv);
// Next column...
continue;
@@ -668,7 +695,8 @@ NdbReceiver::execTRANSID_AI(const Uint32
assert(m_record.m_read_range_no);
assert(attrSize==4);
assert (m_record.m_row_offset >= m_record.m_ndb_record->m_row_size+attrSize);
- memcpy(m_record.m_row+m_record.m_ndb_record->m_row_size, aDataPtr++, 4);
+ memcpy(m_record.m_row_recv+m_record.m_ndb_record->m_row_size,
+ aDataPtr++, 4);
aLength--;
continue; // Next
}
@@ -682,7 +710,7 @@ NdbReceiver::execTRANSID_AI(const Uint32
assert (m_record.m_row_offset >= m_record.m_ndb_record->m_row_size);
Uint32 len= receive_packed_ndbrecord(attrSize >> 2, // Bitmap length
aDataPtr,
- m_record.m_row);
+ m_record.m_row_recv);
aDataPtr+= len;
aLength-= len;
continue; // Next
@@ -709,13 +737,13 @@ NdbReceiver::execTRANSID_AI(const Uint32
/* Save this extra getValue */
save_pos+= sizeof(Uint32);
- memcpy(m_record.m_row + m_record.m_row_offset - save_pos,
+ memcpy(m_record.m_row_recv + m_record.m_row_offset - save_pos,
&attrSize, sizeof(Uint32));
if (attrSize > 0)
{
save_pos+= attrSize;
assert (save_pos<=m_record.m_row_offset);
- memcpy(m_record.m_row + m_record.m_row_offset - save_pos,
+ memcpy(m_record.m_row_recv + m_record.m_row_offset - save_pos,
aDataPtr, attrSize);
}
@@ -803,7 +831,7 @@ NdbReceiver::execTRANSID_AI(const Uint32
if (m_using_ndb_record) {
/* Move onto next row in scan buffer */
- m_record.m_row+= m_record.m_row_offset;
+ m_record.m_row_recv+= m_record.m_row_offset;
}
return (tmp == exp || (exp > TcKeyConf::DirtyReadBit) ? 1 : 0);
}
No bundle (reason: useless for push emails).
| Thread |
|---|
| • bzr push into mysql-5.1-telco-7.0 branch (ole.john.aske:4462 to 4464) | Ole John Aske | 22 Aug |