From: Pekka Nousiainen Date: August 21 2011 8:52am Subject: bzr push into mysql-5.1-telco-7.0 branch (pekka.nousiainen:4461 to 4472) WL#4124 List-Archive: http://lists.mysql.com/commits/140738 Message-Id: <20110821085247.4232955875@sama.localdomain> MIME-Version: 1.0 Content-Type: text/plain; charset="us-ascii" Content-Transfer-Encoding: 7bit 4472 Pekka Nousiainen 2011-08-21 wl#4124 g01_enable.diff MTR: index-stat-enable=1 (but AutoCreate OFF) modified: mysql-test/suite/ndb/my.cnf mysql-test/suite/ndb/r/ndb_index.result mysql-test/suite/ndb/r/ndb_index_stat.result mysql-test/suite/ndb/r/ndb_statistics0.result mysql-test/suite/ndb/r/ndb_statistics1.result mysql-test/suite/ndb/t/ndb_index_stat_enable.inc mysql-test/suite/ndb/t/ndb_share.cnf 4471 jonas oreland 2011-08-19 [merge] ndb - merge 63 to 70 modified: storage/ndb/src/kernel/blocks/dbdih/DbdihMain.cpp storage/ndb/src/kernel/blocks/qmgr/QmgrMain.cpp 4470 jonas oreland 2011-08-19 [merge] ndb - merge 63 to 70 modified: storage/ndb/src/kernel/blocks/dbdih/DbdihMain.cpp 4469 Ole John Aske 2011-08-18 SPJ: Disallow copy and assignment of NdbRootFragment objects modified: storage/ndb/src/ndbapi/NdbQueryOperation.cpp 4468 Ole John Aske 2011-08-17 SPJ: Rename of class OrderedFragSet member fields m_emptiedFragCount & m_emptiedFrags to m_fetchMoreFragCount & m_fetchMoreFrags. modified: storage/ndb/src/ndbapi/NdbQueryOperation.cpp storage/ndb/src/ndbapi/NdbQueryOperationImpl.hpp 4467 Ole John Aske 2011-08-17 SPJ: Code cleanup: move NdbQueryImpl::OrderedFragSet::clear() to a place closer to C'tor / D'tor code where it more naturally belongs. modified: storage/ndb/src/ndbapi/NdbQueryOperation.cpp 4466 Ole John Aske 2011-08-17 SPJ: Rename a couple of ::reset() methods to ::prepareNextReceiveSet() which is more descriptive. Also reorder code to place it together with NdbResultStream::prepareResultSet() which is logically related. 
modified: storage/ndb/src/ndbapi/NdbQueryOperation.cpp 4465 Ole John Aske 2011-08-17 SPJ: Introduce the helper class NdbResultSet which manages the receive/read buffer for class NdbResultStream. The intention is to later change class NdbResultStream to have multiple NdbResultSets -> double buffered results. modified: storage/ndb/src/ndbapi/NdbQueryOperation.cpp 4464 Ole John Aske 2011-08-17 SPJ extensions: In preparation for implementing handling of double buffered result sets in class NdbReceiver, extend NdbReceiver with two new methods to set, respectively, the buffer positions for receiving and reading result rows into/from the NdbReceiver. NOTE: Different receive and read positions already existed as part of the NdbReceiver - this patch only extends the interface to be able to set them from the outside. modified: storage/ndb/include/ndbapi/NdbReceiver.hpp storage/ndb/src/ndbapi/NdbQueryOperation.cpp storage/ndb/src/ndbapi/NdbReceiver.cpp 4463 Ole John Aske 2011-08-17 Rename class NdbReceiver member field 'm_row' to 'm_row_recv' in order to clarify its usage as a row buffer only used when rows are received by the NdbReceiver. (As opposed to when we *read* rows which have been received by an NdbReceiver.) modified: storage/ndb/include/ndbapi/NdbReceiver.hpp storage/ndb/src/ndbapi/NdbQueryOperation.cpp storage/ndb/src/ndbapi/NdbReceiver.cpp 4462 Ole John Aske 2011-08-17 Refactoring as part of preparing SPJ results to be double (multi...?) buffered. 'class SharedFragStack' which was intended as a container for fragments which have received a complete batch of rows is removed. It is replaced with a set of (mutex protected) methods in 'class NdbRootFragment' and OrderedFragSet::prepareMoreResults(). The rationale for this refactoring is that the SharedFragStack container wasn't a particularly good fit to represent the state when *multiple* batches of (double buffered) result sets may be available. 
modified: storage/ndb/src/ndbapi/NdbQueryOperation.cpp storage/ndb/src/ndbapi/NdbQueryOperationImpl.hpp 4461 Jonas Oreland 2011-08-17 [merge] ndb - merge wl4124-new2 removed: mysql-test/suite/ndb/r/ndb_statistics.result mysql-test/suite/ndb/t/ndb_statistics.test added: mysql-test/suite/ndb/r/ndb_index_stat.result mysql-test/suite/ndb/r/ndb_statistics0.result mysql-test/suite/ndb/r/ndb_statistics1.result mysql-test/suite/ndb/t/ndb_index_stat.test mysql-test/suite/ndb/t/ndb_index_stat_enable.inc mysql-test/suite/ndb/t/ndb_statistics.inc mysql-test/suite/ndb/t/ndb_statistics0.test mysql-test/suite/ndb/t/ndb_statistics1.test modified: mysql-test/suite/ndb/r/ndb_restore_misc.result mysql-test/suite/ndb/t/ndb_restore_misc.test sql/ha_ndb_index_stat.cc sql/ha_ndb_index_stat.h sql/ha_ndbcluster.cc sql/ha_ndbcluster.h storage/ndb/include/ndb_constants.h storage/ndb/include/ndbapi/NdbIndexStat.hpp storage/ndb/include/util/NdbPack.hpp storage/ndb/src/common/util/NdbPack.cpp storage/ndb/src/ndbapi/NdbIndexStat.cpp storage/ndb/src/ndbapi/NdbIndexStatImpl.cpp storage/ndb/src/ndbapi/NdbIndexStatImpl.hpp storage/ndb/test/ndbapi/testIndexStat.cpp storage/ndb/tools/ndb_index_stat.cpp === modified file '.bzr-mysql/default.conf' --- a/.bzr-mysql/default.conf 2011-08-09 17:09:06 +0000 +++ b/.bzr-mysql/default.conf 2011-08-17 09:09:22 +0000 @@ -1,4 +1,4 @@ [MYSQL] post_commit_to = commits@stripped post_push_to = commits@stripped -tree_name = mysql-5.1-telco-7.0-wl4124-new2 +tree_name = mysql-5.1-telco-7.0 === modified file 'mysql-test/suite/ndb/my.cnf' --- a/mysql-test/suite/ndb/my.cnf 2011-04-15 09:34:10 +0000 +++ b/mysql-test/suite/ndb/my.cnf 2011-08-21 08:48:41 +0000 @@ -6,6 +6,8 @@ ndbd=, ndb_mgmd= mysqld=, ndbapi=,,,,,,,,,,, +IndexStatAutoCreate=0 +IndexStatAutoUpdate=0 [cluster_config.mysqld.1.1] NodeId=49 @@ -42,6 +44,7 @@ ndb-wait-connected=20 ndb-wait-setup=120 ndb-cluster-connection-pool=3 ndb-extra-logging=99 +ndb-index-stat-enable=1 [ENV] NDB_CONNECTSTRING= 
@mysql_cluster.1.ndb_connectstring === modified file 'mysql-test/suite/ndb/r/ndb_index.result' --- a/mysql-test/suite/ndb/r/ndb_index.result 2011-02-28 10:42:04 +0000 +++ b/mysql-test/suite/ndb/r/ndb_index.result 2011-08-21 08:48:41 +0000 @@ -306,7 +306,7 @@ explain select i,vc from t1 where i>=1 or vc > '0'; id select_type table type possible_keys key key_len ref rows Extra -1 SIMPLE t1 index_merge PRIMARY,i1,i2 i1,i2 5,18 NULL 6 Using sort_union(i1,i2); Using where with pushed condition +1 SIMPLE t1 ALL PRIMARY,i1,i2 NULL NULL NULL 23 Using where with pushed condition select i,vc from t1 where i>=1 or vc > '0'; i vc @@ -350,7 +350,7 @@ explain select i,vc from t2 where i>=1 or vc > '0'; id select_type table type possible_keys key key_len ref rows Extra -1 SIMPLE t2 index_merge i1,i2 i1,i2 5,19 NULL 6 Using sort_union(i1,i2); Using where with pushed condition +1 SIMPLE t2 ALL i1,i2 NULL NULL NULL 23 Using where with pushed condition select i,vc from t2 where i>=1 or vc > '0'; i vc === modified file 'mysql-test/suite/ndb/r/ndb_index_stat.result' --- a/mysql-test/suite/ndb/r/ndb_index_stat.result 2011-07-23 14:38:08 +0000 +++ b/mysql-test/suite/ndb/r/ndb_index_stat.result 2011-08-21 08:48:41 +0000 @@ -2,16 +2,14 @@ DROP TABLE IF EXISTS t1, t2; set @is_enable_default = @@global.ndb_index_stat_enable; set @is_enable = 1; set @is_enable = NULL; -# is_enable_on=1 is_enable_off=0 +# is_enable_on=0 is_enable_off=0 # ndb_index_stat_enable - before show global variables like 'ndb_index_stat_enable'; Variable_name Value -ndb_index_stat_enable OFF +ndb_index_stat_enable ON show local variables like 'ndb_index_stat_enable'; Variable_name Value -ndb_index_stat_enable OFF -set @@global.ndb_index_stat_enable = 1; -set @@local.ndb_index_stat_enable = 1; +ndb_index_stat_enable ON # ndb_index_stat_enable - after show global variables like 'ndb_index_stat_enable'; Variable_name Value @@ -477,7 +475,7 @@ count(*) drop table t1; set @is_enable = @is_enable_default; set @is_enable = 
NULL; -# is_enable_on=0 is_enable_off=1 +# is_enable_on=0 is_enable_off=0 # ndb_index_stat_enable - before show global variables like 'ndb_index_stat_enable'; Variable_name Value @@ -485,14 +483,10 @@ ndb_index_stat_enable ON show local variables like 'ndb_index_stat_enable'; Variable_name Value ndb_index_stat_enable ON -set @@local.ndb_index_stat_enable = 0; -set @@global.ndb_index_stat_enable = 0; -drop table mysql.ndb_index_stat_sample; -drop table mysql.ndb_index_stat_head; # ndb_index_stat_enable - after show global variables like 'ndb_index_stat_enable'; Variable_name Value -ndb_index_stat_enable OFF +ndb_index_stat_enable ON show local variables like 'ndb_index_stat_enable'; Variable_name Value -ndb_index_stat_enable OFF +ndb_index_stat_enable ON === modified file 'mysql-test/suite/ndb/r/ndb_statistics0.result' --- a/mysql-test/suite/ndb/r/ndb_statistics0.result 2011-07-02 07:05:32 +0000 +++ b/mysql-test/suite/ndb/r/ndb_statistics0.result 2011-08-21 08:48:41 +0000 @@ -1,14 +1,16 @@ set @is_enable_default = @@global.ndb_index_stat_enable; set @is_enable = 0; set @is_enable = NULL; -# is_enable_on=0 is_enable_off=0 +# is_enable_on=0 is_enable_off=1 # ndb_index_stat_enable - before show global variables like 'ndb_index_stat_enable'; Variable_name Value -ndb_index_stat_enable OFF +ndb_index_stat_enable ON show local variables like 'ndb_index_stat_enable'; Variable_name Value -ndb_index_stat_enable OFF +ndb_index_stat_enable ON +set @@local.ndb_index_stat_enable = 0; +set @@global.ndb_index_stat_enable = 0; # ndb_index_stat_enable - after show global variables like 'ndb_index_stat_enable'; Variable_name Value @@ -180,7 +182,7 @@ DROP TABLE t10,t100,t10000; End of 5.1 tests set @is_enable = @is_enable_default; set @is_enable = NULL; -# is_enable_on=0 is_enable_off=0 +# is_enable_on=1 is_enable_off=0 # ndb_index_stat_enable - before show global variables like 'ndb_index_stat_enable'; Variable_name Value @@ -188,10 +190,12 @@ ndb_index_stat_enable OFF show local 
variables like 'ndb_index_stat_enable'; Variable_name Value ndb_index_stat_enable OFF +set @@global.ndb_index_stat_enable = 1; +set @@local.ndb_index_stat_enable = 1; # ndb_index_stat_enable - after show global variables like 'ndb_index_stat_enable'; Variable_name Value -ndb_index_stat_enable OFF +ndb_index_stat_enable ON show local variables like 'ndb_index_stat_enable'; Variable_name Value -ndb_index_stat_enable OFF +ndb_index_stat_enable ON === modified file 'mysql-test/suite/ndb/r/ndb_statistics1.result' --- a/mysql-test/suite/ndb/r/ndb_statistics1.result 2011-07-25 17:11:41 +0000 +++ b/mysql-test/suite/ndb/r/ndb_statistics1.result 2011-08-21 08:48:41 +0000 @@ -1,16 +1,14 @@ set @is_enable_default = @@global.ndb_index_stat_enable; set @is_enable = 1; set @is_enable = NULL; -# is_enable_on=1 is_enable_off=0 +# is_enable_on=0 is_enable_off=0 # ndb_index_stat_enable - before show global variables like 'ndb_index_stat_enable'; Variable_name Value -ndb_index_stat_enable OFF +ndb_index_stat_enable ON show local variables like 'ndb_index_stat_enable'; Variable_name Value -ndb_index_stat_enable OFF -set @@global.ndb_index_stat_enable = 1; -set @@local.ndb_index_stat_enable = 1; +ndb_index_stat_enable ON # ndb_index_stat_enable - after show global variables like 'ndb_index_stat_enable'; Variable_name Value @@ -182,7 +180,7 @@ DROP TABLE t10,t100,t10000; End of 5.1 tests set @is_enable = @is_enable_default; set @is_enable = NULL; -# is_enable_on=0 is_enable_off=1 +# is_enable_on=0 is_enable_off=0 # ndb_index_stat_enable - before show global variables like 'ndb_index_stat_enable'; Variable_name Value @@ -190,14 +188,10 @@ ndb_index_stat_enable ON show local variables like 'ndb_index_stat_enable'; Variable_name Value ndb_index_stat_enable ON -set @@local.ndb_index_stat_enable = 0; -set @@global.ndb_index_stat_enable = 0; -drop table mysql.ndb_index_stat_sample; -drop table mysql.ndb_index_stat_head; # ndb_index_stat_enable - after show global variables like 
'ndb_index_stat_enable'; Variable_name Value -ndb_index_stat_enable OFF +ndb_index_stat_enable ON show local variables like 'ndb_index_stat_enable'; Variable_name Value -ndb_index_stat_enable OFF +ndb_index_stat_enable ON === modified file 'mysql-test/suite/ndb/t/ndb_index_stat_enable.inc' --- a/mysql-test/suite/ndb/t/ndb_index_stat_enable.inc 2011-07-02 07:05:32 +0000 +++ b/mysql-test/suite/ndb/t/ndb_index_stat_enable.inc 2011-08-21 08:48:41 +0000 @@ -1,11 +1,12 @@ # turn ndb_index_stat_enable ON or OFF -# caller sets @is_enable 0/1 +# caller sets @is_enable_default and @is_enable 0/1 # based on global variable, local follows global # do nothing if value is already correct # setting OFF drops stats tables to avoid MTR diff let is_enable_on = `select @is_enable and not @@global.ndb_index_stat_enable`; let is_enable_off = `select not @is_enable and @@global.ndb_index_stat_enable`; +let is_enable_default = `select @is_enable_default`; set @is_enable = NULL; --echo # is_enable_on=$is_enable_on is_enable_off=$is_enable_off @@ -30,8 +31,12 @@ if ($is_enable_off) eval set @@global.ndb_index_stat_enable = 0; # stats thread does not (and must not) drop stats tables - eval drop table mysql.ndb_index_stat_sample; - eval drop table mysql.ndb_index_stat_head; + # also do not drop tables if MTR default is enable=1 + if (!$is_enable_default) + { + eval drop table mysql.ndb_index_stat_sample; + eval drop table mysql.ndb_index_stat_head; + } } --echo # ndb_index_stat_enable - after === modified file 'mysql-test/suite/ndb/t/ndb_share.cnf' --- a/mysql-test/suite/ndb/t/ndb_share.cnf 2011-07-07 10:00:25 +0000 +++ b/mysql-test/suite/ndb/t/ndb_share.cnf 2011-08-21 08:48:41 +0000 @@ -3,6 +3,12 @@ [cluster_config.1] mysqld=,,, +[mysqld] +# Test does restart -i which drops stats tables +# MTR check-testcases then fails (before/after state not preserved) +# Disable until better solution found +ndb-index-stat-enable=0 + [mysqld.1.1] new log-bin=mysqld-bin === modified file 
'storage/ndb/include/ndbapi/NdbReceiver.hpp' --- a/storage/ndb/include/ndbapi/NdbReceiver.hpp 2011-06-30 15:59:25 +0000 +++ b/storage/ndb/include/ndbapi/NdbReceiver.hpp 2011-08-17 12:36:56 +0000 @@ -40,7 +40,6 @@ class NdbReceiver friend class NdbIndexScanOperation; friend class NdbTransaction; friend class NdbRootFragment; - friend class ReceiverIdIterator; friend int compare_ndbrecord(const NdbReceiver *r1, const NdbReceiver *r2, const NdbRecord *key_record, @@ -82,6 +81,12 @@ public: void setErrorCode(int); + /* Prepare for receiving of rows into specified buffer */ + void prepareReceive(char *buf); + + /* Prepare for reading of rows from specified buffer */ + void prepareRead(char *buf, Uint32 rows); + private: Uint32 theMagicNumber; Ndb* m_ndb; @@ -140,8 +145,9 @@ private: /* members used for NdbRecord operation. */ struct { const NdbRecord *m_ndb_record; - char *m_row; - /* Block of memory used to receive all rows in a batch during scan. */ + /* Destination to receive next row into. */ + char *m_row_recv; + /* Block of memory used to read all rows in a batch during scan. */ char *m_row_buffer; /* Offsets between two rows in m_row_buffer. 
@@ -226,7 +232,7 @@ private: const char * & data_ptr) const; int getScanAttrData(const char * & data, Uint32 & size, Uint32 & pos) const; /** Used by NdbQueryOperationImpl, where random access to rows is needed.*/ - void setCurrentRow(Uint32 currentRow); + void setCurrentRow(char* buffer, Uint32 row); /** Used by NdbQueryOperationImpl.*/ Uint32 getCurrentRow() const { return m_current_row; } }; @@ -259,7 +265,7 @@ NdbReceiver::prepareSend(){ if (m_using_ndb_record) { if (m_type==NDB_SCANRECEIVER || m_type==NDB_QUERY_OPERATION) - m_record.m_row= m_record.m_row_buffer; + m_record.m_row_recv= m_record.m_row_buffer; } theCurrentRecAttr = theFirstRecAttr; } @@ -287,9 +293,13 @@ NdbReceiver::execSCANOPCONF(Uint32 tcPtr inline void -NdbReceiver::setCurrentRow(Uint32 currentRow) +NdbReceiver::setCurrentRow(char* buffer, Uint32 row) { - m_current_row = currentRow; + m_record.m_row_buffer = buffer; + m_current_row = row; +#ifdef assert + assert(m_current_row < m_result_rows); +#endif } inline === modified file 'storage/ndb/src/kernel/blocks/dbdih/DbdihMain.cpp' --- a/storage/ndb/src/kernel/blocks/dbdih/DbdihMain.cpp 2011-06-30 15:59:25 +0000 +++ b/storage/ndb/src/kernel/blocks/dbdih/DbdihMain.cpp 2011-08-19 08:21:21 +0000 @@ -3880,6 +3880,8 @@ done: ndbassert(gci == restorableGCI); replicaPtr.p->m_restorable_gci = gci; Uint32 startGci = replicaPtr.p->maxGciCompleted[maxLcpIndex] + 1; + if (startGci > gci) + startGci = gci; ndbout_c("node: %d tab: %d frag: %d restore lcp: %u(idx: %u) maxGciStarted: %u maxGciCompleted: %u (restorable: %u(%u) newestRestorableGCI: %u)", takeOverPtr.p->toStartingNode, takeOverPtr.p->toCurrentTabref, @@ -17543,7 +17545,8 @@ Dbdih::execDUMP_STATE_ORD(Signal* signal } } - if(arg == 7019 && signal->getLength() == 2) + if(arg == 7019 && signal->getLength() == 2 && + signal->theData[1] < MAX_NDB_NODES) { char buf2[8+1]; NodeRecordPtr nodePtr; === modified file 'storage/ndb/src/kernel/blocks/qmgr/QmgrMain.cpp' --- 
a/storage/ndb/src/kernel/blocks/qmgr/QmgrMain.cpp 2011-07-04 13:40:57 +0000 +++ b/storage/ndb/src/kernel/blocks/qmgr/QmgrMain.cpp 2011-08-19 08:21:21 +0000 @@ -2972,26 +2972,6 @@ void Qmgr::checkStartInterface(Signal* s nodePtr.p->m_failconf_blocks[3], nodePtr.p->m_failconf_blocks[4]); warningEvent("%s", buf); - - for (Uint32 i = 0; im_failconf_blocks); - i++) - { - jam(); - if (nodePtr.p->m_failconf_blocks[i] != 0) - { - jam(); - signal->theData[0] = 7019; - signal->theData[1] = nodePtr.i; - sendSignal(numberToRef(nodePtr.p->m_failconf_blocks[i], - getOwnNodeId()), - GSN_DUMP_STATE_ORD, signal, 2, JBB); - } - else - { - jam(); - break; - } - } } } } === modified file 'storage/ndb/src/ndbapi/NdbQueryOperation.cpp' --- a/storage/ndb/src/ndbapi/NdbQueryOperation.cpp 2011-08-16 13:37:24 +0000 +++ b/storage/ndb/src/ndbapi/NdbQueryOperation.cpp 2011-08-18 11:47:22 +0000 @@ -181,18 +181,24 @@ public: */ void init(NdbQueryImpl& query, Uint32 fragNo); + static void clear(NdbRootFragment* frags, Uint32 noOfFrags); + Uint32 getFragNo() const { return m_fragNo; } /** - * Prepare for receiving another batch. + * Prepare for receiving another batch of results. */ - void reset(); + void prepareNextReceiveSet(); /** * Prepare for reading another batch of results. */ - void prepareResultSet(); + void grabNextResultSet(); // Need mutex lock + + bool hasReceivedMore() const; // Need mutex lock + + void setReceivedMore(); // Need mutex lock void incrOutstandingResults(Int32 delta) { @@ -204,7 +210,7 @@ public: m_outstandingResults = 0; } - void setConfReceived(); + void setConfReceived(Uint32 tcPtrI); /** * The root operation will read from a number of fragments of a table. 
@@ -261,6 +267,10 @@ public: void postFetchRelease(); private: + /** No copying.*/ + NdbRootFragment(const NdbRootFragment&); + NdbRootFragment& operator=(const NdbRootFragment&); + STATIC_CONST( voidFragNo = 0xffffffff); /** Enclosing query.*/ @@ -273,12 +283,19 @@ private: NdbResultStream* m_resultStreams; /** - * The number of outstanding TCKEYREF or TRANSID_AI - * messages for the fragment. This includes both messages related to the + * Number of available prefetched ResultSets which are completely + * received. Will be made available for reading by calling + * ::grabNextResultSet() + */ + Uint32 m_availResultSets; // Need mutex + + /** + * The number of outstanding TCKEYREF or TRANSID_AI messages to receive + * for the fragment. This includes both messages related to the * root operation and any descendant operation that was instantiated as * a consequence of tuples found by the root operation. - * This number may temporarily be negative if e.g. TRANSID_AI arrives before - * SCAN_TABCONF. + * This number may temporarily be negative if e.g. TRANSID_AI arrives + * before SCAN_TABCONF. */ Int32 m_outstandingResults; @@ -287,7 +304,8 @@ private: * operation accesses (i.e. one for a lookup, all for a table scan). * * Each element is true iff a SCAN_TABCONF (for that fragment) or - * TCKEYCONF message has been received */ + * TCKEYCONF message has been received + */ bool m_confReceived; /** @@ -311,6 +329,55 @@ private: int m_idMapNext; }; //NdbRootFragment +/** + * 'class NdbResultSet' is a helper for 'class NdbResultStream'. + * It manages the buffers which rows are received into and + * read from. 
+ */ +class NdbResultSet +{ + friend class NdbResultStream; + +public: + explicit NdbResultSet(); + + void init(NdbQueryImpl& query, + Uint32 maxRows, Uint32 rowSize); + + void prepareReceive(NdbReceiver& receiver) + { + m_rowCount = 0; + receiver.prepareReceive(m_buffer); + } + + void prepareRead(NdbReceiver& receiver) + { + receiver.prepareRead(m_buffer,m_rowCount); + } + + Uint32 getRowCount() const + { return m_rowCount; } + + char* getRow(Uint32 tupleNo) const + { return (m_buffer + (tupleNo*m_rowSize)); } + +private: + /** No copying.*/ + NdbResultSet(const NdbResultSet&); + NdbResultSet& operator=(const NdbResultSet&); + + /** The buffers which we receive the results into */ + char* m_buffer; + + /** Used for checking if buffer overrun occurred. */ + Uint32* m_batchOverflowCheck; + + Uint32 m_rowSize; + + /** The current #rows in 'm_buffer'.*/ + Uint32 m_rowCount; + +}; // class NdbResultSet /** * This class manages the subset of result data for one operation that is @@ -340,7 +407,7 @@ public: void prepare(); /** Prepare for receiving next batch of scan results. */ - void reset(); + void prepareNextReceiveSet(); NdbReceiver& getReceiver() { return m_receiver; } @@ -348,11 +415,8 @@ public: const NdbReceiver& getReceiver() const { return m_receiver; } - Uint32 getRowCount() const - { return m_rowCount; } - - char* getRow(Uint32 tupleNo) const - { return (m_buffer + (tupleNo*m_rowSize)); } + const char* getCurrentRow() const + { return m_receiver.peek_row(); } /** * Process an incomming tuple for this stream. Extract parent and own tuple @@ -372,9 +436,9 @@ public: bool prepareResultSet(Uint32 remainingScans); /** - * Navigate within the current result batch to resp. first and next row. + * Navigate within the current ResultSet to resp. first and next row. * For non-parent operations in the pushed query, navigation is with respect - * to any preceding parents which results in this NdbResultStream depends on. 
+ * to any preceding parents which results in this ResultSet depends on. * Returns either the tupleNo within TupleSet[] which we navigated to, or * tupleNotFound(). */ @@ -490,25 +554,22 @@ private: /** The receiver object that unpacks transid_AI messages.*/ NdbReceiver m_receiver; - /** The buffers which we receive the results into */ - char* m_buffer; - - /** Used for checking if buffer overrun occurred. */ - Uint32* m_batchOverflowCheck; - - Uint32 m_rowSize; - - /** The number of transid_AI messages received.*/ - Uint32 m_rowCount; + /** + * ResultSets are received into and read from this stream, + * intended to be extended into double buffered ResultSet later. + */ + NdbResultSet m_resultSets[1]; + Uint32 m_read; // We read from m_resultSets[m_read] + Uint32 m_recv; // We receive into m_resultSets[m_recv] /** This is the state of the iterator used by firstResult(), nextResult().*/ enum { /** The first row has not been fetched yet.*/ Iter_notStarted, - /** Is iterating the resultset, (implies 'm_currentRow!=tupleNotFound').*/ + /** Is iterating the ResultSet, (implies 'm_currentRow!=tupleNotFound').*/ Iter_started, - /** Last row for current batch has been returned.*/ + /** Last row for current ResultSet has been returned.*/ Iter_finished } m_iterState; @@ -586,6 +647,34 @@ void* NdbBulkAllocator::allocObjMem(Uint return m_nextObjNo > m_maxObjs ? NULL : result; } +/////////////////////////////////////////// +///////// NdbResultSet methods /////////// +/////////////////////////////////////////// +NdbResultSet::NdbResultSet() : + m_buffer(NULL), + m_batchOverflowCheck(NULL), + m_rowSize(0), + m_rowCount(0) +{} + +void +NdbResultSet::init(NdbQueryImpl& query, + Uint32 maxRows, + Uint32 rowSize) +{ + m_rowSize = rowSize; + { + const int bufferSize = rowSize * maxRows; + NdbBulkAllocator& bufferAlloc = query.getRowBufferAlloc(); + m_buffer = reinterpret_cast(bufferAlloc.allocObjMem(bufferSize)); + + // So that we can test for buffer overrun. 
+ m_batchOverflowCheck = + reinterpret_cast(bufferAlloc.allocObjMem(sizeof(Uint32))); + *m_batchOverflowCheck = 0xacbd1234; + } +} + ////////////////////////////////////////////// ///////// NdbResultStream methods /////////// ////////////////////////////////////////////// @@ -607,10 +696,7 @@ NdbResultStream::NdbResultStream(NdbQuer | (operation.getQueryOperationDef().getMatchType() != NdbQueryOptions::MatchAll ? Is_Inner_Join : 0))), m_receiver(operation.getQuery().getNdbTransaction().getNdb()), - m_buffer(NULL), - m_batchOverflowCheck(NULL), - m_rowSize(0), - m_rowCount(0), + m_resultSets(), m_read(0xffffffff), m_recv(0), m_iterState(Iter_notStarted), m_currentRow(tupleNotFound), m_maxRows(0), @@ -631,12 +717,8 @@ NdbResultStream::prepare() const Uint32 rowSize = m_operation.getRowSize(); NdbQueryImpl &query = m_operation.getQuery(); - m_rowSize = rowSize; - /* Parent / child correlation is only relevant for scan type queries - * Don't create m_parentTupleId[] and m_childTupleIdx[] for lookups! - * Neither is these structures required for operations not having respective - * child or parent operations. + * Don't create a m_tupleSet with these correlation id's for lookups! */ if (isScanQuery()) { @@ -648,14 +730,7 @@ NdbResultStream::prepare() else m_maxRows = 1; - const int bufferSize = rowSize * m_maxRows; - NdbBulkAllocator& bufferAlloc = query.getRowBufferAlloc(); - m_buffer = reinterpret_cast(bufferAlloc.allocObjMem(bufferSize)); - - // So that we can test for buffer overrun. 
- m_batchOverflowCheck = - reinterpret_cast(bufferAlloc.allocObjMem(sizeof(Uint32))); - *m_batchOverflowCheck = 0xacbd1234; + m_resultSets[0].init(query, m_maxRows, rowSize); m_receiver.init(NdbReceiver::NDB_QUERY_OPERATION, false, &m_operation); m_receiver.do_setup_ndbrecord( @@ -664,32 +739,9 @@ NdbResultStream::prepare() 0 /*key_size*/, 0 /*read_range_no*/, rowSize, - m_buffer); + m_resultSets[m_recv].m_buffer); } //NdbResultStream::prepare -void -NdbResultStream::reset() -{ - assert (isScanQuery()); - - // Root scan-operation need a ScanTabConf to complete - m_rowCount = 0; - m_iterState = Iter_notStarted; - m_currentRow = tupleNotFound; - - m_receiver.prepareSend(); - /** - * If this stream will get new rows in the next batch, then so will - * all of its descendants. - */ - for (Uint32 childNo = 0; childNo < m_operation.getNoOfChildOperations(); - childNo++) - { - NdbQueryOperationImpl& child = m_operation.getChildOperation(childNo); - m_rootFrag.getResultStream(child).reset(); - } -} //NdbResultStream::reset - /** Locate, and return 'tupleNo', of first tuple with specified parentId. 
* parentId == tupleNotFound is use as a special value for iterating results * from the root operation in the order which they was inserted by @@ -703,11 +755,11 @@ NdbResultStream::findTupleWithParentId(U { assert ((parentId==tupleNotFound) == (m_parent==NULL)); - if (likely(m_rowCount>0)) + if (likely(m_resultSets[m_read].m_rowCount>0)) { if (m_tupleSet==NULL) { - assert (m_rowCount <= 1); + assert (m_resultSets[m_read].m_rowCount <= 1); return 0; } @@ -774,7 +826,7 @@ NdbResultStream::firstResult() if ((m_currentRow=findTupleWithParentId(parentId)) != tupleNotFound) { m_iterState = Iter_started; - m_receiver.setCurrentRow(m_currentRow); + m_receiver.setCurrentRow(m_resultSets[m_read].m_buffer, m_currentRow); return m_currentRow; } @@ -782,7 +834,6 @@ NdbResultStream::firstResult() return tupleNotFound; } //NdbResultStream::firstResult() - Uint16 NdbResultStream::nextResult() { @@ -791,14 +842,13 @@ NdbResultStream::nextResult() (m_currentRow=findNextTuple(m_currentRow)) != tupleNotFound) { m_iterState = Iter_started; - m_receiver.setCurrentRow(m_currentRow); + m_receiver.setCurrentRow(m_resultSets[m_read].m_buffer, m_currentRow); return m_currentRow; } m_iterState = Iter_finished; return tupleNotFound; } //NdbResultStream::nextResult() - /** * Callback when a TRANSID_AI signal (receive row) is processed. */ @@ -807,7 +857,7 @@ NdbResultStream::execTRANSID_AI(const Ui TupleCorrelation correlation) { m_receiver.execTRANSID_AI(ptr, len); - m_rowCount++; + m_resultSets[m_recv].m_rowCount++; if (isScanQuery()) { @@ -815,12 +865,39 @@ NdbResultStream::execTRANSID_AI(const Ui * Store TupleCorrelation as hidden value imm. 
after received row * (NdbQueryOperationImpl::getRowSize() has reserved space for it) */ - Uint32* row_recv = reinterpret_cast(m_receiver.m_record.m_row); + Uint32* row_recv = reinterpret_cast + (m_receiver.m_record.m_row_recv); row_recv[-1] = correlation.toUint32(); } } // NdbResultStream::execTRANSID_AI() /** + * Make preparation for another batch of results to be received. + * This NdbResultStream, and all its sibling will receive a batch + * of results from the datanodes. + */ +void +NdbResultStream::prepareNextReceiveSet() +{ + assert (isScanQuery()); + + m_iterState = Iter_notStarted; + m_currentRow = tupleNotFound; + m_resultSets[m_recv].prepareReceive(m_receiver); + + /** + * If this stream will get new rows in the next batch, then so will + * all of its descendants. + */ + for (Uint32 childNo = 0; childNo < m_operation.getNoOfChildOperations(); + childNo++) + { + NdbQueryOperationImpl& child = m_operation.getChildOperation(childNo); + m_rootFrag.getResultStream(child).prepareNextReceiveSet(); + } +} //NdbResultStream::prepareNextReceiveSet + +/** * Make preparations for another batch of result to be read: * - Fill in parent/child result correlations in m_tupleSet[] * - ... or reset m_tupleSet[] if we reuse the previous. @@ -833,12 +910,16 @@ NdbResultStream::prepareResultSet(Uint32 bool isComplete = isSubScanComplete(remainingScans); //Childs with more rows assert(isComplete || isScanResult()); //Lookups always 'complete' - // Set correct #rows received in the NdbReceiver. - getReceiver().m_result_rows = getRowCount(); + m_read = m_recv; + NdbResultSet& readResult = m_resultSets[m_read]; + + // Set correct buffer and #rows received by this ResultSet. + readResult.prepareRead(m_receiver); /** - * Prepare NdbResultStream for reading - either the next received - * from datanodes or reuse current. 
+ * Prepare NdbResultSet for reading - either the next received + * from datanodes or reuse the last as has been determined by + * ::prepareNextReceiveSet() */ if (m_tupleSet!=NULL) { @@ -850,7 +931,7 @@ NdbResultStream::prepareResultSet(Uint32 else { // Makes all rows in 'TupleSet' available (clear 'm_skip' flag) - for (Uint32 tupleNo=0; tupleNo 0); +} + +void NdbRootFragment::prepareNextReceiveSet() { assert(m_fragNo!=voidFragNo); assert(m_outstandingResults == 0); @@ -1102,20 +1226,30 @@ void NdbRootFragment::reset() for (unsigned opNo=0; opNogetNoOfOperations(); opNo++) { - if (!m_resultStreams[opNo].isSubScanComplete(m_remainingScans)) + NdbResultStream& resultStream = getResultStream(opNo); + if (!resultStream.isSubScanComplete(m_remainingScans)) { /** - * Reset m_resultStreams[] and all its descendants, since all these + * Reset resultStream and all its descendants, since all these * streams will get a new set of rows in the next batch. */ - m_resultStreams[opNo].reset(); + resultStream.prepareNextReceiveSet(); } } m_confReceived = false; } -void NdbRootFragment::prepareResultSet() +/** + * Let the application thread takes ownership of an available + * ResultSet, prepare it for reading first row. + * Need mutex lock as 'm_availResultSets' is accesed both from + * receiver and application thread. + */ +void NdbRootFragment::grabNextResultSet() { + assert(m_availResultSets>0); + m_availResultSets--; + NdbResultStream& rootStream = getResultStream(0); rootStream.prepareResultSet(m_remainingScans); @@ -1124,19 +1258,20 @@ void NdbRootFragment::prepareResultSet() rootStream.firstResult(); } -void NdbRootFragment::setConfReceived() +void NdbRootFragment::setConfReceived(Uint32 tcPtrI) { /* For a query with a lookup root, there may be more than one TCKEYCONF message. For a scan, there should only be one SCAN_TABCONF per root fragment. 
*/ - assert(!m_query->getQueryDef().isScanQuery() || !m_confReceived); + assert(!getResultStream(0).isScanQuery() || !m_confReceived); + getResultStream(0).getReceiver().m_tcPtrI = tcPtrI; m_confReceived = true; } bool NdbRootFragment::finalBatchReceived() const { - return getReceiverTcPtrI()==RNIL; + return m_confReceived && getReceiverTcPtrI()==RNIL; } bool NdbRootFragment::isEmpty() const @@ -1586,6 +1721,7 @@ NdbQueryImpl::NdbQueryImpl(NdbTransactio m_next(NULL), m_queryDef(&queryDef), m_error(), + m_errorReceived(0), m_transaction(trans), m_scanTransaction(NULL), m_operations(0), @@ -1595,7 +1731,6 @@ NdbQueryImpl::NdbQueryImpl(NdbTransactio m_rootFragCount(0), m_rootFrags(NULL), m_applFrags(), - m_fullFrags(), m_finalBatchFrags(0), m_num_bounds(0), m_shortestBound(0xffffffff), @@ -2059,10 +2194,9 @@ NdbQueryImpl::nextResult(bool fetchAllow NdbQuery::NextResultOutcome NdbQueryImpl::nextRootResult(bool fetchAllowed, bool forceSend) { - /* To minimize lock contention, each query has two separate root fragment - * conatiners (m_fullFrags and m_applFrags). m_applFrags is only - * accessed by the application thread, so it is safe to use it without - * locks. + /* To minimize lock contention, each query has the separate root fragment + * conatiner 'm_applFrags'. m_applFrags is only accessed by the application + * thread, so it is safe to use it without locks. */ while (m_state != EndOfData) // Or likely: return when 'gotRow' or error { @@ -2128,15 +2262,12 @@ NdbQueryImpl::nextRootResult(bool fetchA */ if (fetchAllowed) { - // Ask for a new batch if we emptied one. - NdbRootFragment* emptyFrag = m_applFrags.getEmpty(); - while (emptyFrag != NULL) + // Ask for a new batch if we emptied some. 
+ NdbRootFragment** frags; + const Uint32 cnt = m_applFrags.getFetchMore(frags); + if (cnt > 0 && sendFetchMore(frags, cnt, forceSend) != 0) { - if (sendFetchMore(*emptyFrag, forceSend) != 0) - { - return NdbQuery::NextResult_error; - } - emptyFrag = m_applFrags.getEmpty(); + return NdbQuery::NextResult_error; } } @@ -2181,23 +2312,17 @@ NdbQueryImpl::awaitMoreResults(bool forc */ while (likely(!hasReceivedError())) { - /* m_fullFrags contains any fragments that are complete (for this batch) - * but have not yet been moved (under mutex protection) to - * m_applFrags. + /* Scan m_rootFrags (under mutex protection) for fragments + * which has received a complete batch. Add these to m_applFrags. */ - NdbRootFragment* frag; - while ((frag=m_fullFrags.pop()) != NULL) - { - frag->prepareResultSet(); - m_applFrags.add(*frag); - } + m_applFrags.prepareMoreResults(m_rootFrags,m_rootFragCount); if (m_applFrags.getCurrent() != NULL) { return FetchResult_ok; } - /* There are noe more available frament results available without - * first waiting for more to be received from datanodes + /* There are no more available fragment results available without + * first waiting for more to be received from the datanodes */ if (m_pendingFrags == 0) { @@ -2238,16 +2363,9 @@ NdbQueryImpl::awaitMoreResults(bool forc /* The root operation is a lookup. Lookups are guaranteed to be complete * before NdbTransaction::execute() returns. Therefore we do not set * the lock, because we know that the signal receiver thread will not - * be accessing m_fullFrags at this time. + * be accessing m_rootFrags at this time. */ - NdbRootFragment* frag; - if ((frag=m_fullFrags.pop()) != NULL) - { - frag->prepareResultSet(); - m_applFrags.add(*frag); - } - assert(m_fullFrags.pop()==NULL); // Only one stream for lookups. 
- + m_applFrags.prepareMoreResults(m_rootFrags,m_rootFragCount); if (m_applFrags.getCurrent() != NULL) { return FetchResult_ok; @@ -2268,8 +2386,8 @@ NdbQueryImpl::awaitMoreResults(bool forc /* ::handleBatchComplete() is intended to be called when receiving signals only. - The PollGuard mutex is then set and the shared 'm_pendingFrags', - 'm_finalBatchFrags' and 'm_fullFrags' can safely be updated. + The PollGuard mutex is then set and the shared 'm_pendingFrags' and + 'm_finalBatchFrags' can safely be updated and ::setReceivedMore() signaled. returns: 'true' when application thread should be resumed. */ @@ -2288,7 +2406,7 @@ NdbQueryImpl::handleBatchComplete(NdbRoo * terminated the scan. We are about to close this query, * and didn't expect any more data - ignore it! */ - if (likely(m_fullFrags.m_errorCode == 0)) + if (likely(m_errorReceived == 0)) { assert(rootFrag.isFragBatchComplete()); @@ -2302,10 +2420,10 @@ NdbQueryImpl::handleBatchComplete(NdbRoo assert(m_finalBatchFrags <= m_rootFragCount); } - /* When application thread ::awaitMoreResults() it will later be moved - * from m_fullFrags to m_applFrags under mutex protection. + /* When application thread ::awaitMoreResults() it will later be + * added to m_applFrags under mutex protection. 
*/ - m_fullFrags.push(rootFrag); + rootFrag.setReceivedMore(); return true; } @@ -2329,7 +2447,7 @@ NdbQueryImpl::close(bool forceSend) } // Throw any pending results - m_fullFrags.clear(); + NdbRootFragment::clear(m_rootFrags,m_rootFragCount); m_applFrags.clear(); Ndb* const ndb = m_transaction.getNdb(); @@ -2425,7 +2543,7 @@ NdbQueryImpl::setFetchTerminated(int err } if (errorCode!=0) { - m_fullFrags.m_errorCode = errorCode; + m_errorReceived = errorCode; } m_pendingFrags = 0; } // NdbQueryImpl::setFetchTerminated() @@ -2438,9 +2556,9 @@ NdbQueryImpl::setFetchTerminated(int err bool NdbQueryImpl::hasReceivedError() { - if (unlikely(m_fullFrags.m_errorCode)) + if (unlikely(m_errorReceived)) { - setErrorCode(m_fullFrags.m_errorCode); + setErrorCode(m_errorReceived); return true; } return false; @@ -2457,7 +2575,7 @@ NdbQueryImpl::execTCKEYCONF() NdbRootFragment& rootFrag = m_rootFrags[0]; // We will get 1 + #leaf-nodes TCKEYCONF for a lookup... - rootFrag.setConfReceived(); + rootFrag.setConfReceived(RNIL); rootFrag.incrOutstandingResults(-1); bool ret = false; @@ -2545,8 +2663,7 @@ NdbQueryImpl::prepareSend() } // Allocate space for ptrs to NdbResultStream and NdbRootFragment objects. 
error = m_pointerAlloc.init(m_rootFragCount * - (SharedFragStack::pointersPerFragment + - OrderedFragSet::pointersPerFragment)); + (OrderedFragSet::pointersPerFragment)); if (error != 0) { setErrorCode(error); @@ -2633,21 +2750,17 @@ NdbQueryImpl::prepareSend() keyRec = getRoot().getQueryOperationDef().getIndex()->getDefaultRecord(); assert(keyRec!=NULL); } - if (unlikely((error = m_applFrags.prepare(m_pointerAlloc, - getRoot().getOrdering(), - m_rootFragCount, - keyRec, - getRoot().m_ndbRecord)) != 0) - || (error = m_fullFrags.prepare(m_pointerAlloc, - m_rootFragCount)) != 0) { - setErrorCode(error); - return -1; - } + m_applFrags.prepare(m_pointerAlloc, + getRoot().getOrdering(), + m_rootFragCount, + keyRec, + getRoot().m_ndbRecord); if (getQueryDef().isScanQuery()) { NdbRootFragment::buildReciverIdMap(m_rootFrags, m_rootFragCount); } + #ifdef TRACE_SERIALIZATION ndbout << "Serialized ATTRINFO : "; for(Uint32 i = 0; i < m_attrInfo.getSize(); i++){ @@ -2735,6 +2848,74 @@ const Uint32* InitialReceiverIdIterator: } } +/** This iterator is used for inserting a sequence of 'TcPtrI' + * for a NEXTREQ to a single or multiple fragments via a GenericSectionPtr.*/ +class FetchMoreTcIdIterator: public GenericSectionIterator +{ +public: + FetchMoreTcIdIterator(NdbRootFragment* rootFrags[], + Uint32 cnt) + :m_rootFrags(rootFrags), + m_fragCount(cnt), + m_currFragNo(0) + {} + + virtual ~FetchMoreTcIdIterator() {}; + + /** + * Get next batch of receiver ids. + * @param sz This will be set to the number of receiver ids that have been + * put in the buffer (0 if end has been reached.) + * @return Array of receiver ids (or NULL if end reached. + */ + virtual const Uint32* getNextWords(Uint32& sz); + + virtual void reset() + { m_currFragNo = 0;}; + +private: + /** + * Size of internal receiver id buffer. This value is arbitrary, but + * a larger buffer would mean fewer calls to getNextWords(), possibly + * improving efficiency. 
+ */ + static const Uint32 bufSize = 16; + + /** Set of root fragments which we want to itterate TcPtrI ids for.*/ + NdbRootFragment** m_rootFrags; + const Uint32 m_fragCount; + + /** The next fragment numnber to be processed. (Range for 0 to no of + * fragments.)*/ + Uint32 m_currFragNo; + /** Buffer for storing one batch of receiver ids.*/ + Uint32 m_receiverIds[bufSize]; +}; + +const Uint32* FetchMoreTcIdIterator::getNextWords(Uint32& sz) +{ + /** + * For the initial batch, we want to retrieve one batch for each fragment + * whether it is a sorted scan or not. + */ + if (m_currFragNo >= m_fragCount) + { + sz = 0; + return NULL; + } + else + { + Uint32 cnt = 0; + while (cnt < bufSize && m_currFragNo < m_fragCount) + { + m_receiverIds[cnt] = m_rootFrags[m_currFragNo]->getReceiverTcPtrI(); + cnt++; + m_currFragNo++; + } + sz = cnt; + return m_receiverIds; + } +} /****************************************************************************** int doSend() Send serialized queryTree and parameters encapsulated in @@ -3011,12 +3192,19 @@ Parameters: emptyFrag: Root frgament Remark: ******************************************************************************/ int -NdbQueryImpl::sendFetchMore(NdbRootFragment& emptyFrag, bool forceSend) +NdbQueryImpl::sendFetchMore(NdbRootFragment* rootFrags[], + Uint32 cnt, + bool forceSend) { - assert(!emptyFrag.finalBatchReceived()); assert(getQueryDef().isScanQuery()); - emptyFrag.reset(); + for (Uint32 i=0; iisFragBatchComplete()); + assert(!rootFrag->finalBatchReceived()); + rootFrag->prepareNextReceiveSet(); + } Ndb& ndb = *getNdbTransaction().getNdb(); NdbApiSignal tSignal(&ndb); @@ -3033,12 +3221,11 @@ NdbQueryImpl::sendFetchMore(NdbRootFragm scanNextReq->transId2 = (Uint32) (transId >> 32); tSignal.setLength(ScanNextReq::SignalLength); - const uint32 receiverId = emptyFrag.getReceiverTcPtrI(); - LinearSectionIterator receiverIdIter(&receiverId ,1); + FetchMoreTcIdIterator receiverIdIter(rootFrags, cnt); GenericSectionPtr 
secs[1]; secs[ScanNextReq::ReceiverIdsSectionNum].sectionIter = &receiverIdIter; - secs[ScanNextReq::ReceiverIdsSectionNum].sz = 1; + secs[ScanNextReq::ReceiverIdsSectionNum].sz = cnt; NdbImpl * impl = ndb.theImpl; Uint32 nodeId = m_transaction.getConnectedNodeId(); @@ -3051,7 +3238,7 @@ NdbQueryImpl::sendFetchMore(NdbRootFragm if (unlikely(hasReceivedError())) { - // Errors arrived inbetween ::await released mutex, and fetchMore grabbed it + // Errors arrived inbetween ::await released mutex, and sendFetchMore grabbed it return -1; } if (impl->getNodeSequence(nodeId) != seq || @@ -3062,8 +3249,9 @@ NdbQueryImpl::sendFetchMore(NdbRootFragm } impl->do_forceSend(forceSend); - m_pendingFrags++; + m_pendingFrags += cnt; assert(m_pendingFrags <= getRootFragCount()); + return 0; } // NdbQueryImpl::sendFetchMore() @@ -3110,8 +3298,8 @@ NdbQueryImpl::closeTcCursor(bool forceSe } // while assert(m_pendingFrags==0); - m_fullFrags.clear(); // Throw any unhandled results - m_fullFrags.m_errorCode = 0; // Clear errors caused by previous fetching + NdbRootFragment::clear(m_rootFrags,m_rootFragCount); + m_errorReceived = 0; // Clear errors caused by previous fetching m_error.code = 0; if (m_finalBatchFrags < getRootFragCount()) // TC has an open scan cursor. @@ -3198,63 +3386,35 @@ int NdbQueryImpl::isPrunable(bool& pruna /**************** - * NdbQueryImpl::SharedFragStack methods. 
- ***************/ - -NdbQueryImpl::SharedFragStack::SharedFragStack(): - m_errorCode(0), - m_capacity(0), - m_current(-1), - m_array(NULL) -{} - -int -NdbQueryImpl::SharedFragStack::prepare(NdbBulkAllocator& allocator, - int capacity) -{ - assert(m_array==NULL); - assert(m_capacity==0); - if (capacity > 0) - { m_capacity = capacity; - m_array = - reinterpret_cast(allocator.allocObjMem(capacity)); - } - return 0; -} - -void -NdbQueryImpl::SharedFragStack::push(NdbRootFragment& frag) -{ - m_current++; - assert(m_current(allocator.allocObjMem(capacity)); bzero(m_activeFrags, capacity * sizeof(NdbRootFragment*)); - m_emptiedFrags = + + m_fetchMoreFrags = reinterpret_cast(allocator.allocObjMem(capacity)); - bzero(m_emptiedFrags, capacity * sizeof(NdbRootFragment*)); + bzero(m_fetchMoreFrags, capacity * sizeof(NdbRootFragment*)); } m_ordering = ordering; m_keyRecord = keyRecord; m_resultRecord = resultRecord; - return 0; -} +} // OrderedFragSet::prepare() /** @@ -3316,8 +3476,7 @@ NdbQueryImpl::OrderedFragSet::getCurrent assert(!m_activeFrags[m_activeFragCount-1]->isEmpty()); return m_activeFrags[m_activeFragCount-1]; } -} - +} // OrderedFragSet::getCurrent() /** * Keep the FragSet ordered, both with respect to specified ScanOrdering, and @@ -3329,42 +3488,45 @@ NdbQueryImpl::OrderedFragSet::getCurrent void NdbQueryImpl::OrderedFragSet::reorganize() { + assert(m_activeFragCount > 0); + NdbRootFragment* const frag = m_activeFrags[m_activeFragCount-1]; + // Remove the current fragment if the batch has been emptied. 
- if (m_activeFragCount>0 && m_activeFrags[m_activeFragCount-1]->isEmpty()) + if (frag->isEmpty()) { - if (m_activeFrags[m_activeFragCount-1]->finalBatchReceived()) + if (frag->finalBatchReceived()) { m_finalFragCount++; } else { - m_emptiedFrags[m_emptiedFragCount++] = m_activeFrags[m_activeFragCount-1]; + m_fetchMoreFrags[m_fetchMoreFragCount++] = frag; } m_activeFragCount--; - assert(m_activeFragCount==0 || - !m_activeFrags[m_activeFragCount-1]->isEmpty()); - assert(m_activeFragCount + m_emptiedFragCount + m_finalFragCount + assert(m_activeFragCount + m_fetchMoreFragCount + m_finalFragCount <= m_capacity); + + return; // Remaining m_activeFrags[] are sorted } // Reorder fragments if this is a sorted scan. - if (m_ordering!=NdbQueryOptions::ScanOrdering_unordered && - m_activeFragCount+m_finalFragCount == m_capacity) + if (m_ordering!=NdbQueryOptions::ScanOrdering_unordered) { /** * This is a sorted scan. There are more data to be read from * m_activeFrags[m_activeFragCount-1]. Move it to its proper place. + * + * Use binary search to find the largest record that is smaller than or + * equal to m_activeFrags[m_activeFragCount-1]. 
*/ int first = 0; int last = m_activeFragCount-1; - /* Use binary search to find the largest record that is smaller than or - * equal to m_activeFrags[m_activeFragCount-1] */ int middle = (first+last)/2; - while(first= 0); - - if(middle < m_activeFragCount-1) + // Move into correct sorted position + if (middle < m_activeFragCount-1) { - NdbRootFragment* const oldTop = m_activeFrags[m_activeFragCount-1]; + assert(compare(*frag, *m_activeFrags[middle]) >= 0); memmove(m_activeFrags+middle+1, m_activeFrags+middle, (m_activeFragCount - middle - 1) * sizeof(NdbRootFragment*)); - m_activeFrags[middle] = oldTop; + m_activeFrags[middle] = frag; } assert(verifySortOrder()); } -} + assert(m_activeFragCount + m_fetchMoreFragCount + m_finalFragCount + <= m_capacity); +} // OrderedFragSet::reorganize() void NdbQueryImpl::OrderedFragSet::add(NdbRootFragment& frag) { - assert(&frag!=NULL); + assert(m_activeFragCount+m_finalFragCount < m_capacity); - if (frag.isEmpty()) - { - if (frag.finalBatchReceived()) - { - m_finalFragCount++; - } - else - { - m_emptiedFrags[m_emptiedFragCount++] = &frag; - } - } - else + m_activeFrags[m_activeFragCount++] = &frag; // Add avail fragment + reorganize(); // Move into position +} // OrderedFragSet::add() + +void +NdbQueryImpl::OrderedFragSet::prepareMoreResults(NdbRootFragment rootFrags[], Uint32 cnt) +{ + for (Uint32 fragNo = 0; fragNo < cnt; fragNo++) { - assert(m_activeFragCount+m_finalFragCount < m_capacity); - if(m_ordering==NdbQueryOptions::ScanOrdering_unordered) + NdbRootFragment& rootFrag = rootFrags[fragNo]; + if (rootFrag.hasReceivedMore()) // Another ResultSet is available { - m_activeFrags[m_activeFragCount++] = &frag; + rootFrag.grabNextResultSet(); // Get new ResultSet. + add(rootFrag); // Make avail. to appl. thread } - else - { - int current = 0; - // Insert the new frag such that the array remains sorted. 
- while(currentisEmpty()); - assert(m_activeFragCount + m_emptiedFragCount + m_finalFragCount - <= m_capacity); -} + } // for all 'rootFrags[]' -void NdbQueryImpl::OrderedFragSet::clear() -{ - m_activeFragCount = 0; - m_emptiedFragCount = 0; - m_finalFragCount = 0; -} + assert(m_activeFragCount + m_fetchMoreFragCount + m_finalFragCount + <= m_capacity); +} // OrderedFragSet::prepareMoreResults() -NdbRootFragment* -NdbQueryImpl::OrderedFragSet::getEmpty() +Uint32 +NdbQueryImpl::OrderedFragSet::getFetchMore(NdbRootFragment** &frags) { - if (m_emptiedFragCount > 0) - { - assert(m_emptiedFrags[m_emptiedFragCount-1]->isEmpty()); - return m_emptiedFrags[--m_emptiedFragCount]; - } - else - { - return NULL; - } + const int cnt = m_fetchMoreFragCount; + frags = m_fetchMoreFrags; + m_fetchMoreFragCount = 0; + return cnt; } bool @@ -3477,7 +3606,6 @@ NdbQueryImpl::OrderedFragSet::verifySort return true; } - /** * Compare frags such that f1setConfReceived(); + // Prepare for SCAN_NEXTREQ, tcPtrI==RNIL, nodeMask==0 -> EOF + rootFrag->setConfReceived(tcPtrI); rootFrag->setRemainingSubScans(nodeMask); rootFrag->incrOutstandingResults(rowCount); - // Handle for SCAN_NEXTREQ, RNIL -> EOF - NdbResultStream& resultStream = rootFrag->getResultStream(*this); - resultStream.getReceiver().m_tcPtrI = tcPtrI; - if(traceSignals){ ndbout << " resultStream {" << rootFrag->getResultStream(*this) << "} fragNo" << rootFrag->getFragNo() @@ -5033,7 +5158,7 @@ NdbOut& operator<<(NdbOut& out, const Nd } NdbOut& operator<<(NdbOut& out, const NdbResultStream& stream){ - out << " m_rowCount: " << stream.m_rowCount; + out << " received rows: " << stream.m_resultSets[stream.m_recv].getRowCount(); return out; } === modified file 'storage/ndb/src/ndbapi/NdbQueryOperationImpl.hpp' --- a/storage/ndb/src/ndbapi/NdbQueryOperationImpl.hpp 2011-08-16 07:56:53 +0000 +++ b/storage/ndb/src/ndbapi/NdbQueryOperationImpl.hpp 2011-08-17 13:16:22 +0000 @@ -271,62 +271,17 @@ private: FetchResult_noMoreCache = 2 }; - /** 
A stack of NdbRootFragment pointers. - * NdbRootFragments which are 'BatchComplete' are pushed on this stack by - * the receiver thread, and later pop'ed into the application thread when - * it need more results to process. - * Due to this shared usage, the PollGuard mutex must be set before - * accessing SharedFragStack. + /** + * Container of root fragments that the application is currently + * iterating over. 'Owned' by application thread and can be accesed + * without requiring a mutex lock. + * RootFragments are appended to a OrderedFragSet by ::prepareMoreResults() + * */ - class SharedFragStack{ - public: - // For calculating need for dynamically allocated memory. - static const Uint32 pointersPerFragment = 2; - - explicit SharedFragStack(); - - /** - * Prepare internal datastructures. - * param[in] allocator For allocating arrays of pointers. - * param[in] capacity Max no of root fragments. - * @return 0 if ok, else errorcode - */ - int prepare(NdbBulkAllocator& allocator, int capacity); - - NdbRootFragment* pop() { - return m_current>=0 ? m_array[m_current--] : NULL; - } - - void push(NdbRootFragment& frag); - - int size() const { - return (m_current+1); - } - - void clear() { - m_current = -1; - } - - /** Possible error received from TC / datanodes. */ - int m_errorCode; - - private: - /** Capacity of stack.*/ - int m_capacity; - - /** Index of current top of stack.*/ - int m_current; - NdbRootFragment** m_array; - - // No copying. - SharedFragStack(const SharedFragStack&); - SharedFragStack& operator=(const SharedFragStack&); - }; // class SharedFragStack - class OrderedFragSet{ public: // For calculating need for dynamically allocated memory. - static const Uint32 pointersPerFragment = 1; + static const Uint32 pointersPerFragment = 2; explicit OrderedFragSet(); @@ -339,43 +294,55 @@ private: * param[in] capacity Max no of root fragments. 
* @return 0 if ok, else errorcode */ - int prepare(NdbBulkAllocator& allocator, - NdbQueryOptions::ScanOrdering ordering, - int capacity, - const NdbRecord* keyRecord, - const NdbRecord* resultRecord); + void prepare(NdbBulkAllocator& allocator, + NdbQueryOptions::ScanOrdering ordering, + int capacity, + const NdbRecord* keyRecord, + const NdbRecord* resultRecord); + + /** + * Add root fragments with completed ResultSets to this OrderedFragSet. + * The PollGuard mutex must locked, and under its protection + * completed root fragments are 'consumed' from rootFrags[] and + * added to OrderedFragSet where it become available for the + * application thread. + */ + void prepareMoreResults(NdbRootFragment rootFrags[], Uint32 cnt); // Need mutex lock /** Get the root fragment from which to read the next row.*/ NdbRootFragment* getCurrent() const; - /** Re-organize the fragments after a row has been consumed. This is - * needed to remove fragements that has been needed, and to re-sort - * fragments if doing a sorted scan.*/ + /** + * Re-organize the fragments after a row has been consumed. This is + * needed to remove fragments that has been emptied, and to re-sort + * fragments if doing a sorted scan. + */ void reorganize(); - /** Add a complete fragment that has been received.*/ - void add(NdbRootFragment& frag); - /** Reset object to an empty state.*/ void clear(); - /** Get a fragment where all rows have been consumed. (This method is - * not idempotent - the fragment is removed from the set. - * @return Emptied fragment (or NULL if there are no more emptied - * fragments). + /** + * Get all fragments where more rows may be (pre-)fetched. + * (This method is not idempotent - the fragments are removed + * from the set.) + * @return Number of fragments (in &frags) from which more + * results should be requested. 
*/ - NdbRootFragment* getEmpty(); + Uint32 getFetchMore(NdbRootFragment** &rootFrags); private: - /** Max no of fragments.*/ + /** No of fragments to read until '::finalBatchReceived()'.*/ int m_capacity; /** Number of fragments in 'm_activeFrags'.*/ int m_activeFragCount; - /** Number of fragments in 'm_emptiedFrags'. */ - int m_emptiedFragCount; - /** Number of fragments where the final batch has been received - * and consumed.*/ + /** Number of fragments in 'm_fetchMoreFrags'. */ + int m_fetchMoreFragCount; + /** + * Number of fragments where the final batch has been received + * and consumed. + */ int m_finalFragCount; /** Ordering of index scan result.*/ NdbQueryOptions::ScanOrdering m_ordering; @@ -383,15 +350,21 @@ private: const NdbRecord* m_keyRecord; /** Needed for comparing records when ordering results.*/ const NdbRecord* m_resultRecord; - /** Fragments where some tuples in the current batch has not yet been - * consumed.*/ + /** + * Fragments where some tuples in the current ResultSet has not + * yet been consumed. + */ NdbRootFragment** m_activeFrags; - /** Fragments where all tuples in the current batch have been consumed, - * but where there are more batches to fetch.*/ - NdbRootFragment** m_emptiedFrags; - // No copying. - OrderedFragSet(const OrderedFragSet&); - OrderedFragSet& operator=(const OrderedFragSet&); + /** + * Fragments from which we should request more ResultSets. + * Either due to the current batch has been consumed, or double buffering + * of result sets allows us to request another batch before the current + * has been consumed. + */ + NdbRootFragment** m_fetchMoreFrags; + + /** Add a complete fragment that has been received.*/ + void add(NdbRootFragment& frag); /** For sorting fragment reads according to index value of first record. * Also f1m_row_size; } -#define KEY_ATTR_ID (~(Uint32)0) +void +NdbReceiver::prepareReceive(char *buf) +{ + /* Set pointers etc. to prepare for receiving the first row of the batch. 
*/ + assert(theMagicNumber == 0x11223344); + m_received_result_length = 0; + m_expected_result_length = 0; + if (m_using_ndb_record) + { + m_record.m_row_recv= buf; + } + theCurrentRecAttr = theFirstRecAttr; +} + +void +NdbReceiver::prepareRead(char *buf, Uint32 rows) +{ + /* Set pointers etc. to prepare for reading the first row of the batch. */ + assert(theMagicNumber == 0x11223344); + m_current_row = 0; + m_result_rows = rows; + if (m_using_ndb_record) + { + m_record.m_row_buffer = buf; + } +} + + #define KEY_ATTR_ID (~(Uint32)0) /* Compute the batch size (rows between each NEXT_TABREQ / SCAN_TABCONF) to @@ -219,7 +246,7 @@ NdbReceiver::do_setup_ndbrecord(const Nd { m_using_ndb_record= true; m_record.m_ndb_record= ndb_record; - m_record.m_row= row_buffer; + m_record.m_row_recv= row_buffer; m_record.m_row_buffer= row_buffer; m_record.m_row_offset= rowsize; m_record.m_read_range_no= read_range_no; @@ -524,7 +551,7 @@ NdbReceiver::receive_packed_ndbrecord(Ui { if (BitmaskImpl::get(bmlen, aDataPtr, ++i)) { - setRecToNULL(col, m_record.m_row); + setRecToNULL(col, m_record.m_row_recv); // Next column... 
continue; @@ -668,7 +695,8 @@ NdbReceiver::execTRANSID_AI(const Uint32 assert(m_record.m_read_range_no); assert(attrSize==4); assert (m_record.m_row_offset >= m_record.m_ndb_record->m_row_size+attrSize); - memcpy(m_record.m_row+m_record.m_ndb_record->m_row_size, aDataPtr++, 4); + memcpy(m_record.m_row_recv+m_record.m_ndb_record->m_row_size, + aDataPtr++, 4); aLength--; continue; // Next } @@ -682,7 +710,7 @@ NdbReceiver::execTRANSID_AI(const Uint32 assert (m_record.m_row_offset >= m_record.m_ndb_record->m_row_size); Uint32 len= receive_packed_ndbrecord(attrSize >> 2, // Bitmap length aDataPtr, - m_record.m_row); + m_record.m_row_recv); aDataPtr+= len; aLength-= len; continue; // Next @@ -709,13 +737,13 @@ NdbReceiver::execTRANSID_AI(const Uint32 /* Save this extra getValue */ save_pos+= sizeof(Uint32); - memcpy(m_record.m_row + m_record.m_row_offset - save_pos, + memcpy(m_record.m_row_recv + m_record.m_row_offset - save_pos, &attrSize, sizeof(Uint32)); if (attrSize > 0) { save_pos+= attrSize; assert (save_pos<=m_record.m_row_offset); - memcpy(m_record.m_row + m_record.m_row_offset - save_pos, + memcpy(m_record.m_row_recv + m_record.m_row_offset - save_pos, aDataPtr, attrSize); } @@ -803,7 +831,7 @@ NdbReceiver::execTRANSID_AI(const Uint32 if (m_using_ndb_record) { /* Move onto next row in scan buffer */ - m_record.m_row+= m_record.m_row_offset; + m_record.m_row_recv+= m_record.m_row_offset; } return (tmp == exp || (exp > TcKeyConf::DirtyReadBit) ? 1 : 0); } No bundle (reason: useless for push emails).