From: Ole John Aske Date: August 17 2011 12:37pm Subject: bzr push into mysql-5.1-telco-7.0 branch (ole.john.aske:4462 to 4464) List-Archive: http://lists.mysql.com/commits/140674 Message-Id: <20110817123737.2BDB0218@fimafeng09.norway.sun.com> MIME-Version: 1.0 Content-Type: text/plain; charset="us-ascii" Content-Transfer-Encoding: 7bit 4464 Ole John Aske 2011-08-17 SPJ extensions:In preparation for implementing handling of double buffered result sets in class NdbReceiver, extend NdbReceiver with two new methods to resp. set buffer positions for receiving and reading result rows into/from the NdbReceiver. NOTE: Different receive and read position did already exists as part of the NdbReceiver - This patch only extend the interface to be able to set it from the outside. modified: storage/ndb/include/ndbapi/NdbReceiver.hpp storage/ndb/src/ndbapi/NdbQueryOperation.cpp storage/ndb/src/ndbapi/NdbReceiver.cpp 4463 Ole John Aske 2011-08-17 Rename class NdbReceiver member field 'm_row' to 'm_row_recv' in order to clarify its usage as a row buffer only used when received rows by the NdbReceiver. (As opposed to the when we we *read* rows which has been received by a NdbReceived) modified: storage/ndb/include/ndbapi/NdbReceiver.hpp storage/ndb/src/ndbapi/NdbQueryOperation.cpp storage/ndb/src/ndbapi/NdbReceiver.cpp 4462 Ole John Aske 2011-08-17 Refactoring as part of preparing SPJ results to be double (multi...?) buffered. 'class SharedFragStack' which was intended as a container for fragments which has received a complete batch of rows is removed. It is replaced with a set of (mutex protected) methods in 'class NdbRootFragment' and OrderedFragSet::prepareMoreResults() The rational for this refactoring is that the SharedFragStack container wasn't a particular good fit to represent the state when *multiple* batches of (double buffered) result set may be available. modified: storage/ndb/src/ndbapi/NdbQueryOperation.cpp storage/ndb/src/ndbapi/NdbQueryOperationImpl.hpp === modified file 'storage/ndb/include/ndbapi/NdbReceiver.hpp' --- a/storage/ndb/include/ndbapi/NdbReceiver.hpp 2011-06-30 15:59:25 +0000 +++ b/storage/ndb/include/ndbapi/NdbReceiver.hpp 2011-08-17 12:36:56 +0000 @@ -40,7 +40,6 @@ class NdbReceiver friend class NdbIndexScanOperation; friend class NdbTransaction; friend class NdbRootFragment; - friend class ReceiverIdIterator; friend int compare_ndbrecord(const NdbReceiver *r1, const NdbReceiver *r2, const NdbRecord *key_record, @@ -82,6 +81,12 @@ public: void setErrorCode(int); + /* Prepare for receiving of rows into specified buffer */ + void prepareReceive(char *buf); + + /* Prepare for reading of rows from specified buffer */ + void prepareRead(char *buf, Uint32 rows); + private: Uint32 theMagicNumber; Ndb* m_ndb; @@ -140,8 +145,9 @@ private: /* members used for NdbRecord operation. */ struct { const NdbRecord *m_ndb_record; - char *m_row; - /* Block of memory used to receive all rows in a batch during scan. */ + /* Destination to receive next row into. */ + char *m_row_recv; + /* Block of memory used to read all rows in a batch during scan. */ char *m_row_buffer; /* Offsets between two rows in m_row_buffer. @@ -226,7 +232,7 @@ private: const char * & data_ptr) const; int getScanAttrData(const char * & data, Uint32 & size, Uint32 & pos) const; /** Used by NdbQueryOperationImpl, where random access to rows is needed.*/ - void setCurrentRow(Uint32 currentRow); + void setCurrentRow(char* buffer, Uint32 row); /** Used by NdbQueryOperationImpl.*/ Uint32 getCurrentRow() const { return m_current_row; } }; @@ -259,7 +265,7 @@ NdbReceiver::prepareSend(){ if (m_using_ndb_record) { if (m_type==NDB_SCANRECEIVER || m_type==NDB_QUERY_OPERATION) - m_record.m_row= m_record.m_row_buffer; + m_record.m_row_recv= m_record.m_row_buffer; } theCurrentRecAttr = theFirstRecAttr; } @@ -287,9 +293,13 @@ NdbReceiver::execSCANOPCONF(Uint32 tcPtr inline void -NdbReceiver::setCurrentRow(Uint32 currentRow) +NdbReceiver::setCurrentRow(char* buffer, Uint32 row) { - m_current_row = currentRow; + m_record.m_row_buffer = buffer; + m_current_row = row; +#ifdef assert + assert(m_current_row < m_result_rows); +#endif } inline === modified file 'storage/ndb/src/ndbapi/NdbQueryOperation.cpp' --- a/storage/ndb/src/ndbapi/NdbQueryOperation.cpp 2011-08-17 12:06:04 +0000 +++ b/storage/ndb/src/ndbapi/NdbQueryOperation.cpp 2011-08-17 12:36:56 +0000 @@ -788,7 +788,7 @@ NdbResultStream::firstResult() if ((m_currentRow=findTupleWithParentId(parentId)) != tupleNotFound) { m_iterState = Iter_started; - m_receiver.setCurrentRow(m_currentRow); + m_receiver.setCurrentRow(m_buffer, m_currentRow); return m_currentRow; } @@ -805,7 +805,7 @@ NdbResultStream::nextResult() (m_currentRow=findNextTuple(m_currentRow)) != tupleNotFound) { m_iterState = Iter_started; - m_receiver.setCurrentRow(m_currentRow); + m_receiver.setCurrentRow(m_buffer, m_currentRow); return m_currentRow; } m_iterState = Iter_finished; @@ -829,7 +829,8 @@ NdbResultStream::execTRANSID_AI(const Ui * Store TupleCorrelation as hidden value imm. after received row * (NdbQueryOperationImpl::getRowSize() has reserved space for it) */ - Uint32* row_recv = reinterpret_cast(m_receiver.m_record.m_row); + Uint32* row_recv = reinterpret_cast + (m_receiver.m_record.m_row_recv); row_recv[-1] = correlation.toUint32(); } } // NdbResultStream::execTRANSID_AI() === modified file 'storage/ndb/src/ndbapi/NdbReceiver.cpp' --- a/storage/ndb/src/ndbapi/NdbReceiver.cpp 2011-06-30 15:59:25 +0000 +++ b/storage/ndb/src/ndbapi/NdbReceiver.cpp 2011-08-17 12:36:56 +0000 @@ -56,7 +56,7 @@ NdbReceiver::init(ReceiverType type, boo if (useRec) { m_record.m_ndb_record= NULL; - m_record.m_row= NULL; + m_record.m_row_recv= NULL; m_record.m_row_buffer= NULL; m_record.m_row_offset= 0; m_record.m_read_range_no= false; @@ -118,11 +118,38 @@ NdbReceiver::getValues(const NdbRecord* assert(m_using_ndb_record); m_record.m_ndb_record= rec; - m_record.m_row= row_ptr; + m_record.m_row_recv= row_ptr; m_record.m_row_offset= rec->m_row_size; } -#define KEY_ATTR_ID (~(Uint32)0) +void +NdbReceiver::prepareReceive(char *buf) +{ + /* Set pointers etc. to prepare for receiving the first row of the batch. */ + assert(theMagicNumber == 0x11223344); + m_received_result_length = 0; + m_expected_result_length = 0; + if (m_using_ndb_record) + { + m_record.m_row_recv= buf; + } + theCurrentRecAttr = theFirstRecAttr; +} + +void +NdbReceiver::prepareRead(char *buf, Uint32 rows) +{ + /* Set pointers etc. to prepare for reading the first row of the batch. */ + assert(theMagicNumber == 0x11223344); + m_current_row = 0; + m_result_rows = rows; + if (m_using_ndb_record) + { + m_record.m_row_buffer = buf; + } +} + + #define KEY_ATTR_ID (~(Uint32)0) /* Compute the batch size (rows between each NEXT_TABREQ / SCAN_TABCONF) to @@ -219,7 +246,7 @@ NdbReceiver::do_setup_ndbrecord(const Nd { m_using_ndb_record= true; m_record.m_ndb_record= ndb_record; - m_record.m_row= row_buffer; + m_record.m_row_recv= row_buffer; m_record.m_row_buffer= row_buffer; m_record.m_row_offset= rowsize; m_record.m_read_range_no= read_range_no; @@ -524,7 +551,7 @@ NdbReceiver::receive_packed_ndbrecord(Ui { if (BitmaskImpl::get(bmlen, aDataPtr, ++i)) { - setRecToNULL(col, m_record.m_row); + setRecToNULL(col, m_record.m_row_recv); // Next column... continue; @@ -668,7 +695,8 @@ NdbReceiver::execTRANSID_AI(const Uint32 assert(m_record.m_read_range_no); assert(attrSize==4); assert (m_record.m_row_offset >= m_record.m_ndb_record->m_row_size+attrSize); - memcpy(m_record.m_row+m_record.m_ndb_record->m_row_size, aDataPtr++, 4); + memcpy(m_record.m_row_recv+m_record.m_ndb_record->m_row_size, + aDataPtr++, 4); aLength--; continue; // Next } @@ -682,7 +710,7 @@ NdbReceiver::execTRANSID_AI(const Uint32 assert (m_record.m_row_offset >= m_record.m_ndb_record->m_row_size); Uint32 len= receive_packed_ndbrecord(attrSize >> 2, // Bitmap length aDataPtr, - m_record.m_row); + m_record.m_row_recv); aDataPtr+= len; aLength-= len; continue; // Next @@ -709,13 +737,13 @@ NdbReceiver::execTRANSID_AI(const Uint32 /* Save this extra getValue */ save_pos+= sizeof(Uint32); - memcpy(m_record.m_row + m_record.m_row_offset - save_pos, + memcpy(m_record.m_row_recv + m_record.m_row_offset - save_pos, &attrSize, sizeof(Uint32)); if (attrSize > 0) { save_pos+= attrSize; assert (save_pos<=m_record.m_row_offset); - memcpy(m_record.m_row + m_record.m_row_offset - save_pos, + memcpy(m_record.m_row_recv + m_record.m_row_offset - save_pos, aDataPtr, attrSize); } @@ -803,7 +831,7 @@ NdbReceiver::execTRANSID_AI(const Uint32 if (m_using_ndb_record) { /* Move onto next row in scan buffer */ - m_record.m_row+= m_record.m_row_offset; + m_record.m_row_recv+= m_record.m_row_offset; } return (tmp == exp || (exp > TcKeyConf::DirtyReadBit) ? 1 : 0); } No bundle (reason: useless for push emails).