4472 Pekka Nousiainen 2011-08-21
wl#4124 g01_enable.diff
MTR: index-stat-enable=1 (but AutoCreate OFF)
modified:
mysql-test/suite/ndb/my.cnf
mysql-test/suite/ndb/r/ndb_index.result
mysql-test/suite/ndb/r/ndb_index_stat.result
mysql-test/suite/ndb/r/ndb_statistics0.result
mysql-test/suite/ndb/r/ndb_statistics1.result
mysql-test/suite/ndb/t/ndb_index_stat_enable.inc
mysql-test/suite/ndb/t/ndb_share.cnf
4471 jonas oreland 2011-08-19 [merge]
ndb - merge 63 to 70
modified:
storage/ndb/src/kernel/blocks/dbdih/DbdihMain.cpp
storage/ndb/src/kernel/blocks/qmgr/QmgrMain.cpp
4470 jonas oreland 2011-08-19 [merge]
ndb - merge 63 to 70
modified:
storage/ndb/src/kernel/blocks/dbdih/DbdihMain.cpp
4469 Ole John Aske 2011-08-18
SPJ: Disallow copy and assignment of NdbRootFragment objects
modified:
storage/ndb/src/ndbapi/NdbQueryOperation.cpp
4468 Ole John Aske 2011-08-17
SPJ: Rename of class OrderedFragSet member fields
m_emptiedFragCount & m_emptiedFrags to m_fetchMoreFragCount & m_fetchMoreFrags.
modified:
storage/ndb/src/ndbapi/NdbQueryOperation.cpp
storage/ndb/src/ndbapi/NdbQueryOperationImpl.hpp
4467 Ole John Aske 2011-08-17
SPJ: Code cleanup: move NdbQueryImpl::OrderedFragSet::clear() to a place closer
to C'tor / D'tor code where it more naturally belongs.
modified:
storage/ndb/src/ndbapi/NdbQueryOperation.cpp
4466 Ole John Aske 2011-08-17
SPJ: Rename a couple of ::reset() methods to ::prepareNextReceiveSet() which
is more descriptive.
Also reorder code to place it together with NdbResultStream::prepareResultSet()
which is logical related.
modified:
storage/ndb/src/ndbapi/NdbQueryOperation.cpp
4465 Ole John Aske 2011-08-17
SPJ: Introduce the helper class NdbResultSet which manage the receive/read
buffer for class NdbResultStream.
The intention is to later change class NdbResultStream to have
multiple NdbResultSets -> double buffered results.
modified:
storage/ndb/src/ndbapi/NdbQueryOperation.cpp
4464 Ole John Aske 2011-08-17
SPJ extensions:In preparation for implementing handling of double buffered result
sets in class NdbReceiver, extend NdbReceiver with two new methods to resp.
set buffer positions for receiving and reading result rows into/from
the NdbReceiver.
NOTE: Different receive and read position did already exists
as part of the NdbReceiver - This patch only extend the
interface to be able to set it from the outside.
modified:
storage/ndb/include/ndbapi/NdbReceiver.hpp
storage/ndb/src/ndbapi/NdbQueryOperation.cpp
storage/ndb/src/ndbapi/NdbReceiver.cpp
4463 Ole John Aske 2011-08-17
Rename class NdbReceiver member field 'm_row' to 'm_row_recv' in order to
clarify its usage as a row buffer only used when received rows by
the NdbReceiver. (As opposed to the when we we *read* rows which has
been received by a NdbReceived)
modified:
storage/ndb/include/ndbapi/NdbReceiver.hpp
storage/ndb/src/ndbapi/NdbQueryOperation.cpp
storage/ndb/src/ndbapi/NdbReceiver.cpp
4462 Ole John Aske 2011-08-17
Refactoring as part of preparing SPJ results to be double (multi...?) buffered.
'class SharedFragStack' which was intended as a container for fragments
which has received a complete batch of rows is removed.
It is replaced with a set of (mutex protected) methods in
'class NdbRootFragment' and OrderedFragSet::prepareMoreResults()
The rational for this refactoring is that the SharedFragStack container
wasn't a particular good fit to represent the state when *multiple*
batches of (double buffered) result set may be available.
modified:
storage/ndb/src/ndbapi/NdbQueryOperation.cpp
storage/ndb/src/ndbapi/NdbQueryOperationImpl.hpp
4461 Jonas Oreland 2011-08-17 [merge]
ndb - merge wl4124-new2
removed:
mysql-test/suite/ndb/r/ndb_statistics.result
mysql-test/suite/ndb/t/ndb_statistics.test
added:
mysql-test/suite/ndb/r/ndb_index_stat.result
mysql-test/suite/ndb/r/ndb_statistics0.result
mysql-test/suite/ndb/r/ndb_statistics1.result
mysql-test/suite/ndb/t/ndb_index_stat.test
mysql-test/suite/ndb/t/ndb_index_stat_enable.inc
mysql-test/suite/ndb/t/ndb_statistics.inc
mysql-test/suite/ndb/t/ndb_statistics0.test
mysql-test/suite/ndb/t/ndb_statistics1.test
modified:
mysql-test/suite/ndb/r/ndb_restore_misc.result
mysql-test/suite/ndb/t/ndb_restore_misc.test
sql/ha_ndb_index_stat.cc
sql/ha_ndb_index_stat.h
sql/ha_ndbcluster.cc
sql/ha_ndbcluster.h
storage/ndb/include/ndb_constants.h
storage/ndb/include/ndbapi/NdbIndexStat.hpp
storage/ndb/include/util/NdbPack.hpp
storage/ndb/src/common/util/NdbPack.cpp
storage/ndb/src/ndbapi/NdbIndexStat.cpp
storage/ndb/src/ndbapi/NdbIndexStatImpl.cpp
storage/ndb/src/ndbapi/NdbIndexStatImpl.hpp
storage/ndb/test/ndbapi/testIndexStat.cpp
storage/ndb/tools/ndb_index_stat.cpp
=== modified file '.bzr-mysql/default.conf'
--- a/.bzr-mysql/default.conf 2011-08-09 17:09:06 +0000
+++ b/.bzr-mysql/default.conf 2011-08-17 09:09:22 +0000
@@ -1,4 +1,4 @@
[MYSQL]
post_commit_to = commits@stripped
post_push_to = commits@stripped
-tree_name = mysql-5.1-telco-7.0-wl4124-new2
+tree_name = mysql-5.1-telco-7.0
=== modified file 'mysql-test/suite/ndb/my.cnf'
--- a/mysql-test/suite/ndb/my.cnf 2011-04-15 09:34:10 +0000
+++ b/mysql-test/suite/ndb/my.cnf 2011-08-21 08:48:41 +0000
@@ -6,6 +6,8 @@ ndbd=,
ndb_mgmd=
mysqld=,
ndbapi=,,,,,,,,,,,
+IndexStatAutoCreate=0
+IndexStatAutoUpdate=0
[cluster_config.mysqld.1.1]
NodeId=49
@@ -42,6 +44,7 @@ ndb-wait-connected=20
ndb-wait-setup=120
ndb-cluster-connection-pool=3
ndb-extra-logging=99
+ndb-index-stat-enable=1
[ENV]
NDB_CONNECTSTRING= @mysql_cluster.1.ndb_connectstring
=== modified file 'mysql-test/suite/ndb/r/ndb_index.result'
--- a/mysql-test/suite/ndb/r/ndb_index.result 2011-02-28 10:42:04 +0000
+++ b/mysql-test/suite/ndb/r/ndb_index.result 2011-08-21 08:48:41 +0000
@@ -306,7 +306,7 @@ explain
select i,vc from t1
where i>=1 or vc > '0';
id select_type table type possible_keys key key_len ref rows Extra
-1 SIMPLE t1 index_merge PRIMARY,i1,i2 i1,i2 5,18 NULL 6 Using sort_union(i1,i2); Using where with pushed condition
+1 SIMPLE t1 ALL PRIMARY,i1,i2 NULL NULL NULL 23 Using where with pushed condition
select i,vc from t1
where i>=1 or vc > '0';
i vc
@@ -350,7 +350,7 @@ explain
select i,vc from t2
where i>=1 or vc > '0';
id select_type table type possible_keys key key_len ref rows Extra
-1 SIMPLE t2 index_merge i1,i2 i1,i2 5,19 NULL 6 Using sort_union(i1,i2); Using where with pushed condition
+1 SIMPLE t2 ALL i1,i2 NULL NULL NULL 23 Using where with pushed condition
select i,vc from t2
where i>=1 or vc > '0';
i vc
=== modified file 'mysql-test/suite/ndb/r/ndb_index_stat.result'
--- a/mysql-test/suite/ndb/r/ndb_index_stat.result 2011-07-23 14:38:08 +0000
+++ b/mysql-test/suite/ndb/r/ndb_index_stat.result 2011-08-21 08:48:41 +0000
@@ -2,16 +2,14 @@ DROP TABLE IF EXISTS t1, t2;
set @is_enable_default = @@global.ndb_index_stat_enable;
set @is_enable = 1;
set @is_enable = NULL;
-# is_enable_on=1 is_enable_off=0
+# is_enable_on=0 is_enable_off=0
# ndb_index_stat_enable - before
show global variables like 'ndb_index_stat_enable';
Variable_name Value
-ndb_index_stat_enable OFF
+ndb_index_stat_enable ON
show local variables like 'ndb_index_stat_enable';
Variable_name Value
-ndb_index_stat_enable OFF
-set @@global.ndb_index_stat_enable = 1;
-set @@local.ndb_index_stat_enable = 1;
+ndb_index_stat_enable ON
# ndb_index_stat_enable - after
show global variables like 'ndb_index_stat_enable';
Variable_name Value
@@ -477,7 +475,7 @@ count(*)
drop table t1;
set @is_enable = @is_enable_default;
set @is_enable = NULL;
-# is_enable_on=0 is_enable_off=1
+# is_enable_on=0 is_enable_off=0
# ndb_index_stat_enable - before
show global variables like 'ndb_index_stat_enable';
Variable_name Value
@@ -485,14 +483,10 @@ ndb_index_stat_enable ON
show local variables like 'ndb_index_stat_enable';
Variable_name Value
ndb_index_stat_enable ON
-set @@local.ndb_index_stat_enable = 0;
-set @@global.ndb_index_stat_enable = 0;
-drop table mysql.ndb_index_stat_sample;
-drop table mysql.ndb_index_stat_head;
# ndb_index_stat_enable - after
show global variables like 'ndb_index_stat_enable';
Variable_name Value
-ndb_index_stat_enable OFF
+ndb_index_stat_enable ON
show local variables like 'ndb_index_stat_enable';
Variable_name Value
-ndb_index_stat_enable OFF
+ndb_index_stat_enable ON
=== modified file 'mysql-test/suite/ndb/r/ndb_statistics0.result'
--- a/mysql-test/suite/ndb/r/ndb_statistics0.result 2011-07-02 07:05:32 +0000
+++ b/mysql-test/suite/ndb/r/ndb_statistics0.result 2011-08-21 08:48:41 +0000
@@ -1,14 +1,16 @@
set @is_enable_default = @@global.ndb_index_stat_enable;
set @is_enable = 0;
set @is_enable = NULL;
-# is_enable_on=0 is_enable_off=0
+# is_enable_on=0 is_enable_off=1
# ndb_index_stat_enable - before
show global variables like 'ndb_index_stat_enable';
Variable_name Value
-ndb_index_stat_enable OFF
+ndb_index_stat_enable ON
show local variables like 'ndb_index_stat_enable';
Variable_name Value
-ndb_index_stat_enable OFF
+ndb_index_stat_enable ON
+set @@local.ndb_index_stat_enable = 0;
+set @@global.ndb_index_stat_enable = 0;
# ndb_index_stat_enable - after
show global variables like 'ndb_index_stat_enable';
Variable_name Value
@@ -180,7 +182,7 @@ DROP TABLE t10,t100,t10000;
End of 5.1 tests
set @is_enable = @is_enable_default;
set @is_enable = NULL;
-# is_enable_on=0 is_enable_off=0
+# is_enable_on=1 is_enable_off=0
# ndb_index_stat_enable - before
show global variables like 'ndb_index_stat_enable';
Variable_name Value
@@ -188,10 +190,12 @@ ndb_index_stat_enable OFF
show local variables like 'ndb_index_stat_enable';
Variable_name Value
ndb_index_stat_enable OFF
+set @@global.ndb_index_stat_enable = 1;
+set @@local.ndb_index_stat_enable = 1;
# ndb_index_stat_enable - after
show global variables like 'ndb_index_stat_enable';
Variable_name Value
-ndb_index_stat_enable OFF
+ndb_index_stat_enable ON
show local variables like 'ndb_index_stat_enable';
Variable_name Value
-ndb_index_stat_enable OFF
+ndb_index_stat_enable ON
=== modified file 'mysql-test/suite/ndb/r/ndb_statistics1.result'
--- a/mysql-test/suite/ndb/r/ndb_statistics1.result 2011-07-25 17:11:41 +0000
+++ b/mysql-test/suite/ndb/r/ndb_statistics1.result 2011-08-21 08:48:41 +0000
@@ -1,16 +1,14 @@
set @is_enable_default = @@global.ndb_index_stat_enable;
set @is_enable = 1;
set @is_enable = NULL;
-# is_enable_on=1 is_enable_off=0
+# is_enable_on=0 is_enable_off=0
# ndb_index_stat_enable - before
show global variables like 'ndb_index_stat_enable';
Variable_name Value
-ndb_index_stat_enable OFF
+ndb_index_stat_enable ON
show local variables like 'ndb_index_stat_enable';
Variable_name Value
-ndb_index_stat_enable OFF
-set @@global.ndb_index_stat_enable = 1;
-set @@local.ndb_index_stat_enable = 1;
+ndb_index_stat_enable ON
# ndb_index_stat_enable - after
show global variables like 'ndb_index_stat_enable';
Variable_name Value
@@ -182,7 +180,7 @@ DROP TABLE t10,t100,t10000;
End of 5.1 tests
set @is_enable = @is_enable_default;
set @is_enable = NULL;
-# is_enable_on=0 is_enable_off=1
+# is_enable_on=0 is_enable_off=0
# ndb_index_stat_enable - before
show global variables like 'ndb_index_stat_enable';
Variable_name Value
@@ -190,14 +188,10 @@ ndb_index_stat_enable ON
show local variables like 'ndb_index_stat_enable';
Variable_name Value
ndb_index_stat_enable ON
-set @@local.ndb_index_stat_enable = 0;
-set @@global.ndb_index_stat_enable = 0;
-drop table mysql.ndb_index_stat_sample;
-drop table mysql.ndb_index_stat_head;
# ndb_index_stat_enable - after
show global variables like 'ndb_index_stat_enable';
Variable_name Value
-ndb_index_stat_enable OFF
+ndb_index_stat_enable ON
show local variables like 'ndb_index_stat_enable';
Variable_name Value
-ndb_index_stat_enable OFF
+ndb_index_stat_enable ON
=== modified file 'mysql-test/suite/ndb/t/ndb_index_stat_enable.inc'
--- a/mysql-test/suite/ndb/t/ndb_index_stat_enable.inc 2011-07-02 07:05:32 +0000
+++ b/mysql-test/suite/ndb/t/ndb_index_stat_enable.inc 2011-08-21 08:48:41 +0000
@@ -1,11 +1,12 @@
# turn ndb_index_stat_enable ON or OFF
-# caller sets @is_enable 0/1
+# caller sets @is_enable_default and @is_enable 0/1
# based on global variable, local follows global
# do nothing if value is already correct
# setting OFF drops stats tables to avoid MTR diff
let is_enable_on = `select @is_enable and not @@global.ndb_index_stat_enable`;
let is_enable_off = `select not @is_enable and @@global.ndb_index_stat_enable`;
+let is_enable_default = `select @is_enable_default`;
set @is_enable = NULL;
--echo # is_enable_on=$is_enable_on is_enable_off=$is_enable_off
@@ -30,8 +31,12 @@ if ($is_enable_off)
eval set @@global.ndb_index_stat_enable = 0;
# stats thread does not (and must not) drop stats tables
- eval drop table mysql.ndb_index_stat_sample;
- eval drop table mysql.ndb_index_stat_head;
+ # also do not drop tables if MTR default is enable=1
+ if (!$is_enable_default)
+ {
+ eval drop table mysql.ndb_index_stat_sample;
+ eval drop table mysql.ndb_index_stat_head;
+ }
}
--echo # ndb_index_stat_enable - after
=== modified file 'mysql-test/suite/ndb/t/ndb_share.cnf'
--- a/mysql-test/suite/ndb/t/ndb_share.cnf 2011-07-07 10:00:25 +0000
+++ b/mysql-test/suite/ndb/t/ndb_share.cnf 2011-08-21 08:48:41 +0000
@@ -3,6 +3,12 @@
[cluster_config.1]
mysqld=,,,
+[mysqld]
+# Test does restart -i which drops stats tables
+# MTR check-testcases then fails (before/after state not preserved)
+# Disable until better solution found
+ndb-index-stat-enable=0
+
[mysqld.1.1]
new
log-bin=mysqld-bin
=== modified file 'storage/ndb/include/ndbapi/NdbReceiver.hpp'
--- a/storage/ndb/include/ndbapi/NdbReceiver.hpp 2011-06-30 15:59:25 +0000
+++ b/storage/ndb/include/ndbapi/NdbReceiver.hpp 2011-08-17 12:36:56 +0000
@@ -40,7 +40,6 @@ class NdbReceiver
friend class NdbIndexScanOperation;
friend class NdbTransaction;
friend class NdbRootFragment;
- friend class ReceiverIdIterator;
friend int compare_ndbrecord(const NdbReceiver *r1,
const NdbReceiver *r2,
const NdbRecord *key_record,
@@ -82,6 +81,12 @@ public:
void setErrorCode(int);
+ /* Prepare for receiving of rows into specified buffer */
+ void prepareReceive(char *buf);
+
+ /* Prepare for reading of rows from specified buffer */
+ void prepareRead(char *buf, Uint32 rows);
+
private:
Uint32 theMagicNumber;
Ndb* m_ndb;
@@ -140,8 +145,9 @@ private:
/* members used for NdbRecord operation. */
struct {
const NdbRecord *m_ndb_record;
- char *m_row;
- /* Block of memory used to receive all rows in a batch during scan. */
+ /* Destination to receive next row into. */
+ char *m_row_recv;
+ /* Block of memory used to read all rows in a batch during scan. */
char *m_row_buffer;
/*
Offsets between two rows in m_row_buffer.
@@ -226,7 +232,7 @@ private:
const char * & data_ptr) const;
int getScanAttrData(const char * & data, Uint32 & size, Uint32 & pos) const;
/** Used by NdbQueryOperationImpl, where random access to rows is needed.*/
- void setCurrentRow(Uint32 currentRow);
+ void setCurrentRow(char* buffer, Uint32 row);
/** Used by NdbQueryOperationImpl.*/
Uint32 getCurrentRow() const { return m_current_row; }
};
@@ -259,7 +265,7 @@ NdbReceiver::prepareSend(){
if (m_using_ndb_record)
{
if (m_type==NDB_SCANRECEIVER || m_type==NDB_QUERY_OPERATION)
- m_record.m_row= m_record.m_row_buffer;
+ m_record.m_row_recv= m_record.m_row_buffer;
}
theCurrentRecAttr = theFirstRecAttr;
}
@@ -287,9 +293,13 @@ NdbReceiver::execSCANOPCONF(Uint32 tcPtr
inline
void
-NdbReceiver::setCurrentRow(Uint32 currentRow)
+NdbReceiver::setCurrentRow(char* buffer, Uint32 row)
{
- m_current_row = currentRow;
+ m_record.m_row_buffer = buffer;
+ m_current_row = row;
+#ifdef assert
+ assert(m_current_row < m_result_rows);
+#endif
}
inline
=== modified file 'storage/ndb/src/kernel/blocks/dbdih/DbdihMain.cpp'
--- a/storage/ndb/src/kernel/blocks/dbdih/DbdihMain.cpp 2011-06-30 15:59:25 +0000
+++ b/storage/ndb/src/kernel/blocks/dbdih/DbdihMain.cpp 2011-08-19 08:21:21 +0000
@@ -3880,6 +3880,8 @@ done:
ndbassert(gci == restorableGCI);
replicaPtr.p->m_restorable_gci = gci;
Uint32 startGci = replicaPtr.p->maxGciCompleted[maxLcpIndex] + 1;
+ if (startGci > gci)
+ startGci = gci;
ndbout_c("node: %d tab: %d frag: %d restore lcp: %u(idx: %u) maxGciStarted: %u maxGciCompleted: %u (restorable: %u(%u) newestRestorableGCI: %u)",
takeOverPtr.p->toStartingNode,
takeOverPtr.p->toCurrentTabref,
@@ -17543,7 +17545,8 @@ Dbdih::execDUMP_STATE_ORD(Signal* signal
}
}
- if(arg == 7019 && signal->getLength() == 2)
+ if(arg == 7019 && signal->getLength() == 2 &&
+ signal->theData[1] < MAX_NDB_NODES)
{
char buf2[8+1];
NodeRecordPtr nodePtr;
=== modified file 'storage/ndb/src/kernel/blocks/qmgr/QmgrMain.cpp'
--- a/storage/ndb/src/kernel/blocks/qmgr/QmgrMain.cpp 2011-07-04 13:40:57 +0000
+++ b/storage/ndb/src/kernel/blocks/qmgr/QmgrMain.cpp 2011-08-19 08:21:21 +0000
@@ -2972,26 +2972,6 @@ void Qmgr::checkStartInterface(Signal* s
nodePtr.p->m_failconf_blocks[3],
nodePtr.p->m_failconf_blocks[4]);
warningEvent("%s", buf);
-
- for (Uint32 i = 0; i<NDB_ARRAY_SIZE(nodePtr.p->m_failconf_blocks);
- i++)
- {
- jam();
- if (nodePtr.p->m_failconf_blocks[i] != 0)
- {
- jam();
- signal->theData[0] = 7019;
- signal->theData[1] = nodePtr.i;
- sendSignal(numberToRef(nodePtr.p->m_failconf_blocks[i],
- getOwnNodeId()),
- GSN_DUMP_STATE_ORD, signal, 2, JBB);
- }
- else
- {
- jam();
- break;
- }
- }
}
}
}
=== modified file 'storage/ndb/src/ndbapi/NdbQueryOperation.cpp'
--- a/storage/ndb/src/ndbapi/NdbQueryOperation.cpp 2011-08-16 13:37:24 +0000
+++ b/storage/ndb/src/ndbapi/NdbQueryOperation.cpp 2011-08-18 11:47:22 +0000
@@ -181,18 +181,24 @@ public:
*/
void init(NdbQueryImpl& query, Uint32 fragNo);
+ static void clear(NdbRootFragment* frags, Uint32 noOfFrags);
+
Uint32 getFragNo() const
{ return m_fragNo; }
/**
- * Prepare for receiving another batch.
+ * Prepare for receiving another batch of results.
*/
- void reset();
+ void prepareNextReceiveSet();
/**
* Prepare for reading another batch of results.
*/
- void prepareResultSet();
+ void grabNextResultSet(); // Need mutex lock
+
+ bool hasReceivedMore() const; // Need mutex lock
+
+ void setReceivedMore(); // Need mutex lock
void incrOutstandingResults(Int32 delta)
{
@@ -204,7 +210,7 @@ public:
m_outstandingResults = 0;
}
- void setConfReceived();
+ void setConfReceived(Uint32 tcPtrI);
/**
* The root operation will read from a number of fragments of a table.
@@ -261,6 +267,10 @@ public:
void postFetchRelease();
private:
+ /** No copying.*/
+ NdbRootFragment(const NdbRootFragment&);
+ NdbRootFragment& operator=(const NdbRootFragment&);
+
STATIC_CONST( voidFragNo = 0xffffffff);
/** Enclosing query.*/
@@ -273,12 +283,19 @@ private:
NdbResultStream* m_resultStreams;
/**
- * The number of outstanding TCKEYREF or TRANSID_AI
- * messages for the fragment. This includes both messages related to the
+ * Number of available prefetched ResultSets which are completely
+ * received. Will be made available for reading by calling
+ * ::grabNextResultSet()
+ */
+ Uint32 m_availResultSets; // Need mutex
+
+ /**
+ * The number of outstanding TCKEYREF or TRANSID_AI messages to receive
+ * for the fragment. This includes both messages related to the
* root operation and any descendant operation that was instantiated as
* a consequence of tuples found by the root operation.
- * This number may temporarily be negative if e.g. TRANSID_AI arrives before
- * SCAN_TABCONF.
+ * This number may temporarily be negative if e.g. TRANSID_AI arrives
+ * before SCAN_TABCONF.
*/
Int32 m_outstandingResults;
@@ -287,7 +304,8 @@ private:
* operation accesses (i.e. one for a lookup, all for a table scan).
*
* Each element is true iff a SCAN_TABCONF (for that fragment) or
- * TCKEYCONF message has been received */
+ * TCKEYCONF message has been received
+ */
bool m_confReceived;
/**
@@ -311,6 +329,55 @@ private:
int m_idMapNext;
}; //NdbRootFragment
+/**
+ * 'class NdbResultSet' is a helper for 'class NdbResultStream'.
+ * It manages the buffers which rows are received into and
+ * read from.
+ */
+class NdbResultSet
+{
+ friend class NdbResultStream;
+
+public:
+ explicit NdbResultSet();
+
+ void init(NdbQueryImpl& query,
+ Uint32 maxRows, Uint32 rowSize);
+
+ void prepareReceive(NdbReceiver& receiver)
+ {
+ m_rowCount = 0;
+ receiver.prepareReceive(m_buffer);
+ }
+
+ void prepareRead(NdbReceiver& receiver)
+ {
+ receiver.prepareRead(m_buffer,m_rowCount);
+ }
+
+ Uint32 getRowCount() const
+ { return m_rowCount; }
+
+ char* getRow(Uint32 tupleNo) const
+ { return (m_buffer + (tupleNo*m_rowSize)); }
+
+private:
+ /** No copying.*/
+ NdbResultSet(const NdbResultSet&);
+ NdbResultSet& operator=(const NdbResultSet&);
+
+ /** The buffers which we receive the results into */
+ char* m_buffer;
+
+ /** Used for checking if buffer overrun occurred. */
+ Uint32* m_batchOverflowCheck;
+
+ Uint32 m_rowSize;
+
+ /** The current #rows in 'm_buffer'.*/
+ Uint32 m_rowCount;
+
+}; // class NdbResultSet
/**
* This class manages the subset of result data for one operation that is
@@ -340,7 +407,7 @@ public:
void prepare();
/** Prepare for receiving next batch of scan results. */
- void reset();
+ void prepareNextReceiveSet();
NdbReceiver& getReceiver()
{ return m_receiver; }
@@ -348,11 +415,8 @@ public:
const NdbReceiver& getReceiver() const
{ return m_receiver; }
- Uint32 getRowCount() const
- { return m_rowCount; }
-
- char* getRow(Uint32 tupleNo) const
- { return (m_buffer + (tupleNo*m_rowSize)); }
+ const char* getCurrentRow() const
+ { return m_receiver.peek_row(); }
/**
* Process an incomming tuple for this stream. Extract parent and own tuple
@@ -372,9 +436,9 @@ public:
bool prepareResultSet(Uint32 remainingScans);
/**
- * Navigate within the current result batch to resp. first and next row.
+ * Navigate within the current ResultSet to resp. first and next row.
* For non-parent operations in the pushed query, navigation is with respect
- * to any preceding parents which results in this NdbResultStream depends on.
+ * to any preceding parents which results in this ResultSet depends on.
* Returns either the tupleNo within TupleSet[] which we navigated to, or
* tupleNotFound().
*/
@@ -490,25 +554,22 @@ private:
/** The receiver object that unpacks transid_AI messages.*/
NdbReceiver m_receiver;
- /** The buffers which we receive the results into */
- char* m_buffer;
-
- /** Used for checking if buffer overrun occurred. */
- Uint32* m_batchOverflowCheck;
-
- Uint32 m_rowSize;
-
- /** The number of transid_AI messages received.*/
- Uint32 m_rowCount;
+ /**
+ * ResultSets are received into and read from this stream,
+ * intended to be extended into double buffered ResultSet later.
+ */
+ NdbResultSet m_resultSets[1];
+ Uint32 m_read; // We read from m_resultSets[m_read]
+ Uint32 m_recv; // We receive into m_resultSets[m_recv]
/** This is the state of the iterator used by firstResult(), nextResult().*/
enum
{
/** The first row has not been fetched yet.*/
Iter_notStarted,
- /** Is iterating the resultset, (implies 'm_currentRow!=tupleNotFound').*/
+ /** Is iterating the ResultSet, (implies 'm_currentRow!=tupleNotFound').*/
Iter_started,
- /** Last row for current batch has been returned.*/
+ /** Last row for current ResultSet has been returned.*/
Iter_finished
} m_iterState;
@@ -586,6 +647,34 @@ void* NdbBulkAllocator::allocObjMem(Uint
return m_nextObjNo > m_maxObjs ? NULL : result;
}
+///////////////////////////////////////////
+///////// NdbResultSet methods ///////////
+///////////////////////////////////////////
+NdbResultSet::NdbResultSet() :
+ m_buffer(NULL),
+ m_batchOverflowCheck(NULL),
+ m_rowSize(0),
+ m_rowCount(0)
+{}
+
+void
+NdbResultSet::init(NdbQueryImpl& query,
+ Uint32 maxRows,
+ Uint32 rowSize)
+{
+ m_rowSize = rowSize;
+ {
+ const int bufferSize = rowSize * maxRows;
+ NdbBulkAllocator& bufferAlloc = query.getRowBufferAlloc();
+ m_buffer = reinterpret_cast<char*>(bufferAlloc.allocObjMem(bufferSize));
+
+ // So that we can test for buffer overrun.
+ m_batchOverflowCheck =
+ reinterpret_cast<Uint32*>(bufferAlloc.allocObjMem(sizeof(Uint32)));
+ *m_batchOverflowCheck = 0xacbd1234;
+ }
+}
+
//////////////////////////////////////////////
///////// NdbResultStream methods ///////////
//////////////////////////////////////////////
@@ -607,10 +696,7 @@ NdbResultStream::NdbResultStream(NdbQuer
| (operation.getQueryOperationDef().getMatchType() != NdbQueryOptions::MatchAll
? Is_Inner_Join : 0))),
m_receiver(operation.getQuery().getNdbTransaction().getNdb()),
- m_buffer(NULL),
- m_batchOverflowCheck(NULL),
- m_rowSize(0),
- m_rowCount(0),
+ m_resultSets(), m_read(0xffffffff), m_recv(0),
m_iterState(Iter_notStarted),
m_currentRow(tupleNotFound),
m_maxRows(0),
@@ -631,12 +717,8 @@ NdbResultStream::prepare()
const Uint32 rowSize = m_operation.getRowSize();
NdbQueryImpl &query = m_operation.getQuery();
- m_rowSize = rowSize;
-
/* Parent / child correlation is only relevant for scan type queries
- * Don't create m_parentTupleId[] and m_childTupleIdx[] for lookups!
- * Neither is these structures required for operations not having respective
- * child or parent operations.
+ * Don't create a m_tupleSet with these correlation id's for lookups!
*/
if (isScanQuery())
{
@@ -648,14 +730,7 @@ NdbResultStream::prepare()
else
m_maxRows = 1;
- const int bufferSize = rowSize * m_maxRows;
- NdbBulkAllocator& bufferAlloc = query.getRowBufferAlloc();
- m_buffer = reinterpret_cast<char*>(bufferAlloc.allocObjMem(bufferSize));
-
- // So that we can test for buffer overrun.
- m_batchOverflowCheck =
- reinterpret_cast<Uint32*>(bufferAlloc.allocObjMem(sizeof(Uint32)));
- *m_batchOverflowCheck = 0xacbd1234;
+ m_resultSets[0].init(query, m_maxRows, rowSize);
m_receiver.init(NdbReceiver::NDB_QUERY_OPERATION, false, &m_operation);
m_receiver.do_setup_ndbrecord(
@@ -664,32 +739,9 @@ NdbResultStream::prepare()
0 /*key_size*/,
0 /*read_range_no*/,
rowSize,
- m_buffer);
+ m_resultSets[m_recv].m_buffer);
} //NdbResultStream::prepare
-void
-NdbResultStream::reset()
-{
- assert (isScanQuery());
-
- // Root scan-operation need a ScanTabConf to complete
- m_rowCount = 0;
- m_iterState = Iter_notStarted;
- m_currentRow = tupleNotFound;
-
- m_receiver.prepareSend();
- /**
- * If this stream will get new rows in the next batch, then so will
- * all of its descendants.
- */
- for (Uint32 childNo = 0; childNo < m_operation.getNoOfChildOperations();
- childNo++)
- {
- NdbQueryOperationImpl& child = m_operation.getChildOperation(childNo);
- m_rootFrag.getResultStream(child).reset();
- }
-} //NdbResultStream::reset
-
/** Locate, and return 'tupleNo', of first tuple with specified parentId.
* parentId == tupleNotFound is use as a special value for iterating results
* from the root operation in the order which they was inserted by
@@ -703,11 +755,11 @@ NdbResultStream::findTupleWithParentId(U
{
assert ((parentId==tupleNotFound) == (m_parent==NULL));
- if (likely(m_rowCount>0))
+ if (likely(m_resultSets[m_read].m_rowCount>0))
{
if (m_tupleSet==NULL)
{
- assert (m_rowCount <= 1);
+ assert (m_resultSets[m_read].m_rowCount <= 1);
return 0;
}
@@ -774,7 +826,7 @@ NdbResultStream::firstResult()
if ((m_currentRow=findTupleWithParentId(parentId)) != tupleNotFound)
{
m_iterState = Iter_started;
- m_receiver.setCurrentRow(m_currentRow);
+ m_receiver.setCurrentRow(m_resultSets[m_read].m_buffer, m_currentRow);
return m_currentRow;
}
@@ -782,7 +834,6 @@ NdbResultStream::firstResult()
return tupleNotFound;
} //NdbResultStream::firstResult()
-
Uint16
NdbResultStream::nextResult()
{
@@ -791,14 +842,13 @@ NdbResultStream::nextResult()
(m_currentRow=findNextTuple(m_currentRow)) != tupleNotFound)
{
m_iterState = Iter_started;
- m_receiver.setCurrentRow(m_currentRow);
+ m_receiver.setCurrentRow(m_resultSets[m_read].m_buffer, m_currentRow);
return m_currentRow;
}
m_iterState = Iter_finished;
return tupleNotFound;
} //NdbResultStream::nextResult()
-
/**
* Callback when a TRANSID_AI signal (receive row) is processed.
*/
@@ -807,7 +857,7 @@ NdbResultStream::execTRANSID_AI(const Ui
TupleCorrelation correlation)
{
m_receiver.execTRANSID_AI(ptr, len);
- m_rowCount++;
+ m_resultSets[m_recv].m_rowCount++;
if (isScanQuery())
{
@@ -815,12 +865,39 @@ NdbResultStream::execTRANSID_AI(const Ui
* Store TupleCorrelation as hidden value imm. after received row
* (NdbQueryOperationImpl::getRowSize() has reserved space for it)
*/
- Uint32* row_recv = reinterpret_cast<Uint32*>(m_receiver.m_record.m_row);
+ Uint32* row_recv = reinterpret_cast<Uint32*>
+ (m_receiver.m_record.m_row_recv);
row_recv[-1] = correlation.toUint32();
}
} // NdbResultStream::execTRANSID_AI()
/**
+ * Make preparation for another batch of results to be received.
+ * This NdbResultStream, and all its sibling will receive a batch
+ * of results from the datanodes.
+ */
+void
+NdbResultStream::prepareNextReceiveSet()
+{
+ assert (isScanQuery());
+
+ m_iterState = Iter_notStarted;
+ m_currentRow = tupleNotFound;
+ m_resultSets[m_recv].prepareReceive(m_receiver);
+
+ /**
+ * If this stream will get new rows in the next batch, then so will
+ * all of its descendants.
+ */
+ for (Uint32 childNo = 0; childNo < m_operation.getNoOfChildOperations();
+ childNo++)
+ {
+ NdbQueryOperationImpl& child = m_operation.getChildOperation(childNo);
+ m_rootFrag.getResultStream(child).prepareNextReceiveSet();
+ }
+} //NdbResultStream::prepareNextReceiveSet
+
+/**
* Make preparations for another batch of result to be read:
* - Fill in parent/child result correlations in m_tupleSet[]
* - ... or reset m_tupleSet[] if we reuse the previous.
@@ -833,12 +910,16 @@ NdbResultStream::prepareResultSet(Uint32
bool isComplete = isSubScanComplete(remainingScans); //Childs with more rows
assert(isComplete || isScanResult()); //Lookups always 'complete'
- // Set correct #rows received in the NdbReceiver.
- getReceiver().m_result_rows = getRowCount();
+ m_read = m_recv;
+ NdbResultSet& readResult = m_resultSets[m_read];
+
+ // Set correct buffer and #rows received by this ResultSet.
+ readResult.prepareRead(m_receiver);
/**
- * Prepare NdbResultStream for reading - either the next received
- * from datanodes or reuse current.
+ * Prepare NdbResultSet for reading - either the next received
+ * from datanodes or reuse the last as has been determined by
+ * ::prepareNextReceiveSet()
*/
if (m_tupleSet!=NULL)
{
@@ -850,7 +931,7 @@ NdbResultStream::prepareResultSet(Uint32
else
{
// Makes all rows in 'TupleSet' available (clear 'm_skip' flag)
- for (Uint32 tupleNo=0; tupleNo<getRowCount(); tupleNo++)
+ for (Uint32 tupleNo=0; tupleNo<readResult.getRowCount(); tupleNo++)
{
m_tupleSet[tupleNo].m_skip = false;
}
@@ -877,7 +958,7 @@ NdbResultStream::prepareResultSet(Uint32
if (m_tupleSet!=NULL)
{
- for (Uint32 tupleNo=0; tupleNo<getRowCount(); tupleNo++)
+ for (Uint32 tupleNo=0; tupleNo<readResult.getRowCount(); tupleNo++)
{
if (!m_tupleSet[tupleNo].m_skip)
{
@@ -914,20 +995,23 @@ NdbResultStream::prepareResultSet(Uint32
/**
* Fill m_tupleSet[] with correlation data between parent
* and child tuples. The 'TupleCorrelation' is stored as
- * and extra Uint32 after each row received
+ * and extra Uint32 after each row in the NdbResultSet
* by execTRANSID_AI().
*
* NOTE: In order to reduce work done when holding the
* transporter mutex, the 'TupleCorrelation' is only stored
* in the buffer when it arrives. Later (here) we build the
* correlation hashMap immediately before we prepare to
- * read the NdbResultStream.
+ * read the NdbResultSet.
*/
void
NdbResultStream::buildResultCorrelations()
{
+ const NdbResultSet& readResult = m_resultSets[m_read];
+
// Buffer overrun check.
- assert(m_batchOverflowCheck==NULL || *m_batchOverflowCheck==0xacbd1234);
+ assert(readResult.m_batchOverflowCheck==NULL ||
+ *readResult.m_batchOverflowCheck==0xacbd1234);
//if (m_tupleSet!=NULL)
{
@@ -937,10 +1021,10 @@ NdbResultStream::buildResultCorrelations
m_tupleSet[i].m_hash_head = tupleNotFound;
}
- /* Rebuild correlation & hashmap from received buffers */
- for (Uint32 tupleNo=0; tupleNo<m_rowCount; tupleNo++)
+ /* Rebuild correlation & hashmap from 'readResult' */
+ for (Uint32 tupleNo=0; tupleNo<readResult.m_rowCount; tupleNo++)
{
- const Uint32* row = (Uint32*)getRow(tupleNo+1);
+ const Uint32* row = (Uint32*)readResult.getRow(tupleNo+1);
const TupleCorrelation correlation(row[-1]);
const Uint16 tupleId = correlation.getTupleId();
@@ -1034,6 +1118,7 @@ NdbRootFragment::NdbRootFragment():
m_query(NULL),
m_fragNo(voidFragNo),
m_resultStreams(NULL),
+ m_availResultSets(0),
m_outstandingResults(0),
m_confReceived(false),
m_remainingScans(0),
@@ -1094,7 +1179,46 @@ NdbRootFragment::getResultStream(Uint32
return m_resultStreams[operationNo];
}
-void NdbRootFragment::reset()
+/**
+ * Throw any pending ResultSets from specified rootFrags[]
+ */
+//static
+void NdbRootFragment::clear(NdbRootFragment* rootFrags, Uint32 noOfFrags)
+{
+ if (rootFrags != NULL)
+ {
+ for (Uint32 fragNo = 0; fragNo < noOfFrags; fragNo++)
+ {
+ rootFrags[fragNo].m_availResultSets = 0;
+ }
+ }
+}
+
+/**
+ * Signal that another complete ResultSet is available for
+ * this NdbRootFragment.
+ * Need mutex lock as 'm_availResultSets' is accesed both from
+ * receiver and application thread.
+ */
+void NdbRootFragment::setReceivedMore()
+{
+ assert(m_availResultSets==0);
+ m_availResultSets++;
+}
+
+/**
+ * Check if another ResultSets has been received and is available
+ * for reading. It will be given to the application thread when it
+ * call ::grabNextResultSet().
+ * Need mutex lock as 'm_availResultSets' is accesed both from
+ * receiver and application thread.
+ */
+bool NdbRootFragment::hasReceivedMore() const
+{
+ return (m_availResultSets > 0);
+}
+
+void NdbRootFragment::prepareNextReceiveSet()
{
assert(m_fragNo!=voidFragNo);
assert(m_outstandingResults == 0);
@@ -1102,20 +1226,30 @@ void NdbRootFragment::reset()
for (unsigned opNo=0; opNo<m_query->getNoOfOperations(); opNo++)
{
- if (!m_resultStreams[opNo].isSubScanComplete(m_remainingScans))
+ NdbResultStream& resultStream = getResultStream(opNo);
+ if (!resultStream.isSubScanComplete(m_remainingScans))
{
/**
- * Reset m_resultStreams[] and all its descendants, since all these
+ * Reset resultStream and all its descendants, since all these
* streams will get a new set of rows in the next batch.
*/
- m_resultStreams[opNo].reset();
+ resultStream.prepareNextReceiveSet();
}
}
m_confReceived = false;
}
-void NdbRootFragment::prepareResultSet()
+/**
+ * Let the application thread takes ownership of an available
+ * ResultSet, prepare it for reading first row.
+ * Need mutex lock as 'm_availResultSets' is accesed both from
+ * receiver and application thread.
+ */
+void NdbRootFragment::grabNextResultSet()
{
+ assert(m_availResultSets>0);
+ m_availResultSets--;
+
NdbResultStream& rootStream = getResultStream(0);
rootStream.prepareResultSet(m_remainingScans);
@@ -1124,19 +1258,20 @@ void NdbRootFragment::prepareResultSet()
rootStream.firstResult();
}
-void NdbRootFragment::setConfReceived()
+void NdbRootFragment::setConfReceived(Uint32 tcPtrI)
{
/* For a query with a lookup root, there may be more than one TCKEYCONF
message. For a scan, there should only be one SCAN_TABCONF per root
fragment.
*/
- assert(!m_query->getQueryDef().isScanQuery() || !m_confReceived);
+ assert(!getResultStream(0).isScanQuery() || !m_confReceived);
+ getResultStream(0).getReceiver().m_tcPtrI = tcPtrI;
m_confReceived = true;
}
bool NdbRootFragment::finalBatchReceived() const
{
- return getReceiverTcPtrI()==RNIL;
+ return m_confReceived && getReceiverTcPtrI()==RNIL;
}
bool NdbRootFragment::isEmpty() const
@@ -1586,6 +1721,7 @@ NdbQueryImpl::NdbQueryImpl(NdbTransactio
m_next(NULL),
m_queryDef(&queryDef),
m_error(),
+ m_errorReceived(0),
m_transaction(trans),
m_scanTransaction(NULL),
m_operations(0),
@@ -1595,7 +1731,6 @@ NdbQueryImpl::NdbQueryImpl(NdbTransactio
m_rootFragCount(0),
m_rootFrags(NULL),
m_applFrags(),
- m_fullFrags(),
m_finalBatchFrags(0),
m_num_bounds(0),
m_shortestBound(0xffffffff),
@@ -2059,10 +2194,9 @@ NdbQueryImpl::nextResult(bool fetchAllow
NdbQuery::NextResultOutcome
NdbQueryImpl::nextRootResult(bool fetchAllowed, bool forceSend)
{
- /* To minimize lock contention, each query has two separate root fragment
- * conatiners (m_fullFrags and m_applFrags). m_applFrags is only
- * accessed by the application thread, so it is safe to use it without
- * locks.
+ /* To minimize lock contention, each query has the separate root fragment
+ * conatiner 'm_applFrags'. m_applFrags is only accessed by the application
+ * thread, so it is safe to use it without locks.
*/
while (m_state != EndOfData) // Or likely: return when 'gotRow' or error
{
@@ -2128,15 +2262,12 @@ NdbQueryImpl::nextRootResult(bool fetchA
*/
if (fetchAllowed)
{
- // Ask for a new batch if we emptied one.
- NdbRootFragment* emptyFrag = m_applFrags.getEmpty();
- while (emptyFrag != NULL)
+ // Ask for a new batch if we emptied some.
+ NdbRootFragment** frags;
+ const Uint32 cnt = m_applFrags.getFetchMore(frags);
+ if (cnt > 0 && sendFetchMore(frags, cnt, forceSend) != 0)
{
- if (sendFetchMore(*emptyFrag, forceSend) != 0)
- {
- return NdbQuery::NextResult_error;
- }
- emptyFrag = m_applFrags.getEmpty();
+ return NdbQuery::NextResult_error;
}
}
@@ -2181,23 +2312,17 @@ NdbQueryImpl::awaitMoreResults(bool forc
*/
while (likely(!hasReceivedError()))
{
- /* m_fullFrags contains any fragments that are complete (for this batch)
- * but have not yet been moved (under mutex protection) to
- * m_applFrags.
+ /* Scan m_rootFrags (under mutex protection) for fragments
+ * which has received a complete batch. Add these to m_applFrags.
*/
- NdbRootFragment* frag;
- while ((frag=m_fullFrags.pop()) != NULL)
- {
- frag->prepareResultSet();
- m_applFrags.add(*frag);
- }
+ m_applFrags.prepareMoreResults(m_rootFrags,m_rootFragCount);
if (m_applFrags.getCurrent() != NULL)
{
return FetchResult_ok;
}
- /* There are noe more available frament results available without
- * first waiting for more to be received from datanodes
+ /* There are no more available fragment results available without
+ * first waiting for more to be received from the datanodes
*/
if (m_pendingFrags == 0)
{
@@ -2238,16 +2363,9 @@ NdbQueryImpl::awaitMoreResults(bool forc
/* The root operation is a lookup. Lookups are guaranteed to be complete
* before NdbTransaction::execute() returns. Therefore we do not set
* the lock, because we know that the signal receiver thread will not
- * be accessing m_fullFrags at this time.
+ * be accessing m_rootFrags at this time.
*/
- NdbRootFragment* frag;
- if ((frag=m_fullFrags.pop()) != NULL)
- {
- frag->prepareResultSet();
- m_applFrags.add(*frag);
- }
- assert(m_fullFrags.pop()==NULL); // Only one stream for lookups.
-
+ m_applFrags.prepareMoreResults(m_rootFrags,m_rootFragCount);
if (m_applFrags.getCurrent() != NULL)
{
return FetchResult_ok;
@@ -2268,8 +2386,8 @@ NdbQueryImpl::awaitMoreResults(bool forc
/*
::handleBatchComplete() is intended to be called when receiving signals only.
- The PollGuard mutex is then set and the shared 'm_pendingFrags',
- 'm_finalBatchFrags' and 'm_fullFrags' can safely be updated.
+ The PollGuard mutex is then set and the shared 'm_pendingFrags' and
+ 'm_finalBatchFrags' can safely be updated and ::setReceivedMore() signaled.
returns: 'true' when application thread should be resumed.
*/
@@ -2288,7 +2406,7 @@ NdbQueryImpl::handleBatchComplete(NdbRoo
* terminated the scan. We are about to close this query,
* and didn't expect any more data - ignore it!
*/
- if (likely(m_fullFrags.m_errorCode == 0))
+ if (likely(m_errorReceived == 0))
{
assert(rootFrag.isFragBatchComplete());
@@ -2302,10 +2420,10 @@ NdbQueryImpl::handleBatchComplete(NdbRoo
assert(m_finalBatchFrags <= m_rootFragCount);
}
- /* When application thread ::awaitMoreResults() it will later be moved
- * from m_fullFrags to m_applFrags under mutex protection.
+ /* When application thread ::awaitMoreResults() it will later be
+ * added to m_applFrags under mutex protection.
*/
- m_fullFrags.push(rootFrag);
+ rootFrag.setReceivedMore();
return true;
}
@@ -2329,7 +2447,7 @@ NdbQueryImpl::close(bool forceSend)
}
// Throw any pending results
- m_fullFrags.clear();
+ NdbRootFragment::clear(m_rootFrags,m_rootFragCount);
m_applFrags.clear();
Ndb* const ndb = m_transaction.getNdb();
@@ -2425,7 +2543,7 @@ NdbQueryImpl::setFetchTerminated(int err
}
if (errorCode!=0)
{
- m_fullFrags.m_errorCode = errorCode;
+ m_errorReceived = errorCode;
}
m_pendingFrags = 0;
} // NdbQueryImpl::setFetchTerminated()
@@ -2438,9 +2556,9 @@ NdbQueryImpl::setFetchTerminated(int err
bool
NdbQueryImpl::hasReceivedError()
{
- if (unlikely(m_fullFrags.m_errorCode))
+ if (unlikely(m_errorReceived))
{
- setErrorCode(m_fullFrags.m_errorCode);
+ setErrorCode(m_errorReceived);
return true;
}
return false;
@@ -2457,7 +2575,7 @@ NdbQueryImpl::execTCKEYCONF()
NdbRootFragment& rootFrag = m_rootFrags[0];
// We will get 1 + #leaf-nodes TCKEYCONF for a lookup...
- rootFrag.setConfReceived();
+ rootFrag.setConfReceived(RNIL);
rootFrag.incrOutstandingResults(-1);
bool ret = false;
@@ -2545,8 +2663,7 @@ NdbQueryImpl::prepareSend()
}
// Allocate space for ptrs to NdbResultStream and NdbRootFragment objects.
error = m_pointerAlloc.init(m_rootFragCount *
- (SharedFragStack::pointersPerFragment +
- OrderedFragSet::pointersPerFragment));
+ (OrderedFragSet::pointersPerFragment));
if (error != 0)
{
setErrorCode(error);
@@ -2633,21 +2750,17 @@ NdbQueryImpl::prepareSend()
keyRec = getRoot().getQueryOperationDef().getIndex()->getDefaultRecord();
assert(keyRec!=NULL);
}
- if (unlikely((error = m_applFrags.prepare(m_pointerAlloc,
- getRoot().getOrdering(),
- m_rootFragCount,
- keyRec,
- getRoot().m_ndbRecord)) != 0)
- || (error = m_fullFrags.prepare(m_pointerAlloc,
- m_rootFragCount)) != 0) {
- setErrorCode(error);
- return -1;
- }
+ m_applFrags.prepare(m_pointerAlloc,
+ getRoot().getOrdering(),
+ m_rootFragCount,
+ keyRec,
+ getRoot().m_ndbRecord);
if (getQueryDef().isScanQuery())
{
NdbRootFragment::buildReciverIdMap(m_rootFrags, m_rootFragCount);
}
+
#ifdef TRACE_SERIALIZATION
ndbout << "Serialized ATTRINFO : ";
for(Uint32 i = 0; i < m_attrInfo.getSize(); i++){
@@ -2735,6 +2848,74 @@ const Uint32* InitialReceiverIdIterator:
}
}
+/** This iterator is used for inserting a sequence of 'TcPtrI'
+ * for a NEXTREQ to a single or multiple fragments via a GenericSectionPtr.*/
+class FetchMoreTcIdIterator: public GenericSectionIterator
+{
+public:
+ FetchMoreTcIdIterator(NdbRootFragment* rootFrags[],
+ Uint32 cnt)
+ :m_rootFrags(rootFrags),
+ m_fragCount(cnt),
+ m_currFragNo(0)
+ {}
+
+ virtual ~FetchMoreTcIdIterator() {};
+
+ /**
+ * Get next batch of receiver ids.
+ * @param sz This will be set to the number of receiver ids that have been
+ * put in the buffer (0 if end has been reached.)
+ * @return Array of receiver ids (or NULL if end reached.
+ */
+ virtual const Uint32* getNextWords(Uint32& sz);
+
+ virtual void reset()
+ { m_currFragNo = 0;};
+
+private:
+ /**
+ * Size of internal receiver id buffer. This value is arbitrary, but
+ * a larger buffer would mean fewer calls to getNextWords(), possibly
+ * improving efficiency.
+ */
+ static const Uint32 bufSize = 16;
+
+ /** Set of root fragments which we want to itterate TcPtrI ids for.*/
+ NdbRootFragment** m_rootFrags;
+ const Uint32 m_fragCount;
+
+ /** The next fragment numnber to be processed. (Range for 0 to no of
+ * fragments.)*/
+ Uint32 m_currFragNo;
+ /** Buffer for storing one batch of receiver ids.*/
+ Uint32 m_receiverIds[bufSize];
+};
+
+const Uint32* FetchMoreTcIdIterator::getNextWords(Uint32& sz)
+{
+ /**
+ * For the initial batch, we want to retrieve one batch for each fragment
+ * whether it is a sorted scan or not.
+ */
+ if (m_currFragNo >= m_fragCount)
+ {
+ sz = 0;
+ return NULL;
+ }
+ else
+ {
+ Uint32 cnt = 0;
+ while (cnt < bufSize && m_currFragNo < m_fragCount)
+ {
+ m_receiverIds[cnt] = m_rootFrags[m_currFragNo]->getReceiverTcPtrI();
+ cnt++;
+ m_currFragNo++;
+ }
+ sz = cnt;
+ return m_receiverIds;
+ }
+}
/******************************************************************************
int doSend() Send serialized queryTree and parameters encapsulated in
@@ -3011,12 +3192,19 @@ Parameters: emptyFrag: Root frgament
Remark:
******************************************************************************/
int
-NdbQueryImpl::sendFetchMore(NdbRootFragment& emptyFrag, bool forceSend)
+NdbQueryImpl::sendFetchMore(NdbRootFragment* rootFrags[],
+ Uint32 cnt,
+ bool forceSend)
{
- assert(!emptyFrag.finalBatchReceived());
assert(getQueryDef().isScanQuery());
- emptyFrag.reset();
+ for (Uint32 i=0; i<cnt; i++)
+ {
+ NdbRootFragment* rootFrag = rootFrags[i];
+ assert(rootFrag->isFragBatchComplete());
+ assert(!rootFrag->finalBatchReceived());
+ rootFrag->prepareNextReceiveSet();
+ }
Ndb& ndb = *getNdbTransaction().getNdb();
NdbApiSignal tSignal(&ndb);
@@ -3033,12 +3221,11 @@ NdbQueryImpl::sendFetchMore(NdbRootFragm
scanNextReq->transId2 = (Uint32) (transId >> 32);
tSignal.setLength(ScanNextReq::SignalLength);
- const uint32 receiverId = emptyFrag.getReceiverTcPtrI();
- LinearSectionIterator receiverIdIter(&receiverId ,1);
+ FetchMoreTcIdIterator receiverIdIter(rootFrags, cnt);
GenericSectionPtr secs[1];
secs[ScanNextReq::ReceiverIdsSectionNum].sectionIter = &receiverIdIter;
- secs[ScanNextReq::ReceiverIdsSectionNum].sz = 1;
+ secs[ScanNextReq::ReceiverIdsSectionNum].sz = cnt;
NdbImpl * impl = ndb.theImpl;
Uint32 nodeId = m_transaction.getConnectedNodeId();
@@ -3051,7 +3238,7 @@ NdbQueryImpl::sendFetchMore(NdbRootFragm
if (unlikely(hasReceivedError()))
{
- // Errors arrived inbetween ::await released mutex, and fetchMore grabbed it
+ // Errors arrived inbetween ::await released mutex, and sendFetchMore grabbed it
return -1;
}
if (impl->getNodeSequence(nodeId) != seq ||
@@ -3062,8 +3249,9 @@ NdbQueryImpl::sendFetchMore(NdbRootFragm
}
impl->do_forceSend(forceSend);
- m_pendingFrags++;
+ m_pendingFrags += cnt;
assert(m_pendingFrags <= getRootFragCount());
+
return 0;
} // NdbQueryImpl::sendFetchMore()
@@ -3110,8 +3298,8 @@ NdbQueryImpl::closeTcCursor(bool forceSe
} // while
assert(m_pendingFrags==0);
- m_fullFrags.clear(); // Throw any unhandled results
- m_fullFrags.m_errorCode = 0; // Clear errors caused by previous fetching
+ NdbRootFragment::clear(m_rootFrags,m_rootFragCount);
+ m_errorReceived = 0; // Clear errors caused by previous fetching
m_error.code = 0;
if (m_finalBatchFrags < getRootFragCount()) // TC has an open scan cursor.
@@ -3198,63 +3386,35 @@ int NdbQueryImpl::isPrunable(bool& pruna
/****************
- * NdbQueryImpl::SharedFragStack methods.
- ***************/
-
-NdbQueryImpl::SharedFragStack::SharedFragStack():
- m_errorCode(0),
- m_capacity(0),
- m_current(-1),
- m_array(NULL)
-{}
-
-int
-NdbQueryImpl::SharedFragStack::prepare(NdbBulkAllocator& allocator,
- int capacity)
-{
- assert(m_array==NULL);
- assert(m_capacity==0);
- if (capacity > 0)
- { m_capacity = capacity;
- m_array =
- reinterpret_cast<NdbRootFragment**>(allocator.allocObjMem(capacity));
- }
- return 0;
-}
-
-void
-NdbQueryImpl::SharedFragStack::push(NdbRootFragment& frag)
-{
- m_current++;
- assert(m_current<m_capacity);
- m_array[m_current] = &frag;
-}
-
-/****************
* NdbQueryImpl::OrderedFragSet methods.
***************/
NdbQueryImpl::OrderedFragSet::OrderedFragSet():
m_capacity(0),
m_activeFragCount(0),
- m_emptiedFragCount(0),
+ m_fetchMoreFragCount(0),
m_finalFragCount(0),
m_ordering(NdbQueryOptions::ScanOrdering_void),
m_keyRecord(NULL),
m_resultRecord(NULL),
m_activeFrags(NULL),
- m_emptiedFrags(NULL)
+ m_fetchMoreFrags(NULL)
{
}
NdbQueryImpl::OrderedFragSet::~OrderedFragSet()
{
m_activeFrags = NULL;
- m_emptiedFrags= NULL;
+ m_fetchMoreFrags = NULL;
}
+void NdbQueryImpl::OrderedFragSet::clear()
+{
+ m_activeFragCount = 0;
+ m_fetchMoreFragCount = 0;
+}
-int
+void
NdbQueryImpl::OrderedFragSet::prepare(NdbBulkAllocator& allocator,
NdbQueryOptions::ScanOrdering ordering,
int capacity,
@@ -3272,15 +3432,15 @@ NdbQueryImpl::OrderedFragSet::prepare(Nd
m_activeFrags =
reinterpret_cast<NdbRootFragment**>(allocator.allocObjMem(capacity));
bzero(m_activeFrags, capacity * sizeof(NdbRootFragment*));
- m_emptiedFrags =
+
+ m_fetchMoreFrags =
reinterpret_cast<NdbRootFragment**>(allocator.allocObjMem(capacity));
- bzero(m_emptiedFrags, capacity * sizeof(NdbRootFragment*));
+ bzero(m_fetchMoreFrags, capacity * sizeof(NdbRootFragment*));
}
m_ordering = ordering;
m_keyRecord = keyRecord;
m_resultRecord = resultRecord;
- return 0;
-}
+} // OrderedFragSet::prepare()
/**
@@ -3316,8 +3476,7 @@ NdbQueryImpl::OrderedFragSet::getCurrent
assert(!m_activeFrags[m_activeFragCount-1]->isEmpty());
return m_activeFrags[m_activeFragCount-1];
}
-}
-
+} // OrderedFragSet::getCurrent()
/**
* Keep the FragSet ordered, both with respect to specified ScanOrdering, and
@@ -3329,42 +3488,45 @@ NdbQueryImpl::OrderedFragSet::getCurrent
void
NdbQueryImpl::OrderedFragSet::reorganize()
{
+ assert(m_activeFragCount > 0);
+ NdbRootFragment* const frag = m_activeFrags[m_activeFragCount-1];
+
// Remove the current fragment if the batch has been emptied.
- if (m_activeFragCount>0 && m_activeFrags[m_activeFragCount-1]->isEmpty())
+ if (frag->isEmpty())
{
- if (m_activeFrags[m_activeFragCount-1]->finalBatchReceived())
+ if (frag->finalBatchReceived())
{
m_finalFragCount++;
}
else
{
- m_emptiedFrags[m_emptiedFragCount++] = m_activeFrags[m_activeFragCount-1];
+ m_fetchMoreFrags[m_fetchMoreFragCount++] = frag;
}
m_activeFragCount--;
- assert(m_activeFragCount==0 ||
- !m_activeFrags[m_activeFragCount-1]->isEmpty());
- assert(m_activeFragCount + m_emptiedFragCount + m_finalFragCount
+ assert(m_activeFragCount + m_fetchMoreFragCount + m_finalFragCount
<= m_capacity);
+
+ return; // Remaining m_activeFrags[] are sorted
}
// Reorder fragments if this is a sorted scan.
- if (m_ordering!=NdbQueryOptions::ScanOrdering_unordered &&
- m_activeFragCount+m_finalFragCount == m_capacity)
+ if (m_ordering!=NdbQueryOptions::ScanOrdering_unordered)
{
/**
* This is a sorted scan. There are more data to be read from
* m_activeFrags[m_activeFragCount-1]. Move it to its proper place.
+ *
+ * Use binary search to find the largest record that is smaller than or
+ * equal to m_activeFrags[m_activeFragCount-1].
*/
int first = 0;
int last = m_activeFragCount-1;
- /* Use binary search to find the largest record that is smaller than or
- * equal to m_activeFrags[m_activeFragCount-1] */
int middle = (first+last)/2;
- while(first<last)
+
+ while (first<last)
{
assert(middle<m_activeFragCount);
- const int cmpRes = compare(*m_activeFrags[m_activeFragCount-1],
- *m_activeFrags[middle]);
+ const int cmpRes = compare(*frag, *m_activeFrags[middle]);
if (cmpRes < 0)
{
first = middle + 1;
@@ -3380,87 +3542,54 @@ NdbQueryImpl::OrderedFragSet::reorganize
middle = (first+last)/2;
}
- assert(m_activeFragCount == 0 ||
- compare(*m_activeFrags[m_activeFragCount-1],
- *m_activeFrags[middle]) >= 0);
-
- if(middle < m_activeFragCount-1)
+ // Move into correct sorted position
+ if (middle < m_activeFragCount-1)
{
- NdbRootFragment* const oldTop = m_activeFrags[m_activeFragCount-1];
+ assert(compare(*frag, *m_activeFrags[middle]) >= 0);
memmove(m_activeFrags+middle+1,
m_activeFrags+middle,
(m_activeFragCount - middle - 1) * sizeof(NdbRootFragment*));
- m_activeFrags[middle] = oldTop;
+ m_activeFrags[middle] = frag;
}
assert(verifySortOrder());
}
-}
+ assert(m_activeFragCount + m_fetchMoreFragCount + m_finalFragCount
+ <= m_capacity);
+} // OrderedFragSet::reorganize()
void
NdbQueryImpl::OrderedFragSet::add(NdbRootFragment& frag)
{
- assert(&frag!=NULL);
+ assert(m_activeFragCount+m_finalFragCount < m_capacity);
- if (frag.isEmpty())
- {
- if (frag.finalBatchReceived())
- {
- m_finalFragCount++;
- }
- else
- {
- m_emptiedFrags[m_emptiedFragCount++] = &frag;
- }
- }
- else
+ m_activeFrags[m_activeFragCount++] = &frag; // Add avail fragment
+ reorganize(); // Move into position
+} // OrderedFragSet::add()
+
+void
+NdbQueryImpl::OrderedFragSet::prepareMoreResults(NdbRootFragment rootFrags[], Uint32 cnt)
+{
+ for (Uint32 fragNo = 0; fragNo < cnt; fragNo++)
{
- assert(m_activeFragCount+m_finalFragCount < m_capacity);
- if(m_ordering==NdbQueryOptions::ScanOrdering_unordered)
+ NdbRootFragment& rootFrag = rootFrags[fragNo];
+ if (rootFrag.hasReceivedMore()) // Another ResultSet is available
{
- m_activeFrags[m_activeFragCount++] = &frag;
+ rootFrag.grabNextResultSet(); // Get new ResultSet.
+ add(rootFrag); // Make avail. to appl. thread
}
- else
- {
- int current = 0;
- // Insert the new frag such that the array remains sorted.
- while(current<m_activeFragCount &&
- compare(frag, *m_activeFrags[current]) < 0)
- {
- current++;
- }
- memmove(m_activeFrags+current+1,
- m_activeFrags+current,
- (m_activeFragCount - current) * sizeof(NdbRootFragment*));
- m_activeFrags[current] = &frag;
- m_activeFragCount++;
- assert(verifySortOrder());
- }
- }
- assert(m_activeFragCount==0 ||
- !m_activeFrags[m_activeFragCount-1]->isEmpty());
- assert(m_activeFragCount + m_emptiedFragCount + m_finalFragCount
- <= m_capacity);
-}
+ } // for all 'rootFrags[]'
-void NdbQueryImpl::OrderedFragSet::clear()
-{
- m_activeFragCount = 0;
- m_emptiedFragCount = 0;
- m_finalFragCount = 0;
-}
+ assert(m_activeFragCount + m_fetchMoreFragCount + m_finalFragCount
+ <= m_capacity);
+} // OrderedFragSet::prepareMoreResults()
-NdbRootFragment*
-NdbQueryImpl::OrderedFragSet::getEmpty()
+Uint32
+NdbQueryImpl::OrderedFragSet::getFetchMore(NdbRootFragment** &frags)
{
- if (m_emptiedFragCount > 0)
- {
- assert(m_emptiedFrags[m_emptiedFragCount-1]->isEmpty());
- return m_emptiedFrags[--m_emptiedFragCount];
- }
- else
- {
- return NULL;
- }
+ const int cnt = m_fetchMoreFragCount;
+ frags = m_fetchMoreFrags;
+ m_fetchMoreFragCount = 0;
+ return cnt;
}
bool
@@ -3477,7 +3606,6 @@ NdbQueryImpl::OrderedFragSet::verifySort
return true;
}
-
/**
* Compare frags such that f1<f2 if f1 is empty but f2 is not.
* - Othewise compare record contents.
@@ -3881,7 +4009,7 @@ NdbQueryOperationImpl::nextResult(bool f
void
NdbQueryOperationImpl::fetchRow(NdbResultStream& resultStream)
{
- const char* buff = resultStream.getReceiver().peek_row();
+ const char* buff = resultStream.getCurrentRow();
assert(buff!=NULL || (m_firstRecAttr==NULL && m_ndbRecord==NULL));
m_isRowNull = false;
@@ -4796,14 +4924,11 @@ NdbQueryOperationImpl::execSCAN_TABCONF(
assert(false);
return false;
}
- rootFrag->setConfReceived();
+ // Prepare for SCAN_NEXTREQ, tcPtrI==RNIL, nodeMask==0 -> EOF
+ rootFrag->setConfReceived(tcPtrI);
rootFrag->setRemainingSubScans(nodeMask);
rootFrag->incrOutstandingResults(rowCount);
- // Handle for SCAN_NEXTREQ, RNIL -> EOF
- NdbResultStream& resultStream = rootFrag->getResultStream(*this);
- resultStream.getReceiver().m_tcPtrI = tcPtrI;
-
if(traceSignals){
ndbout << " resultStream {" << rootFrag->getResultStream(*this)
<< "} fragNo" << rootFrag->getFragNo()
@@ -5033,7 +5158,7 @@ NdbOut& operator<<(NdbOut& out, const Nd
}
NdbOut& operator<<(NdbOut& out, const NdbResultStream& stream){
- out << " m_rowCount: " << stream.m_rowCount;
+ out << " received rows: " << stream.m_resultSets[stream.m_recv].getRowCount();
return out;
}
=== modified file 'storage/ndb/src/ndbapi/NdbQueryOperationImpl.hpp'
--- a/storage/ndb/src/ndbapi/NdbQueryOperationImpl.hpp 2011-08-16 07:56:53 +0000
+++ b/storage/ndb/src/ndbapi/NdbQueryOperationImpl.hpp 2011-08-17 13:16:22 +0000
@@ -271,62 +271,17 @@ private:
FetchResult_noMoreCache = 2
};
- /** A stack of NdbRootFragment pointers.
- * NdbRootFragments which are 'BatchComplete' are pushed on this stack by
- * the receiver thread, and later pop'ed into the application thread when
- * it need more results to process.
- * Due to this shared usage, the PollGuard mutex must be set before
- * accessing SharedFragStack.
+ /**
+ * Container of root fragments that the application is currently
+ * iterating over. 'Owned' by application thread and can be accesed
+ * without requiring a mutex lock.
+ * RootFragments are appended to a OrderedFragSet by ::prepareMoreResults()
+ *
*/
- class SharedFragStack{
- public:
- // For calculating need for dynamically allocated memory.
- static const Uint32 pointersPerFragment = 2;
-
- explicit SharedFragStack();
-
- /**
- * Prepare internal datastructures.
- * param[in] allocator For allocating arrays of pointers.
- * param[in] capacity Max no of root fragments.
- * @return 0 if ok, else errorcode
- */
- int prepare(NdbBulkAllocator& allocator, int capacity);
-
- NdbRootFragment* pop() {
- return m_current>=0 ? m_array[m_current--] : NULL;
- }
-
- void push(NdbRootFragment& frag);
-
- int size() const {
- return (m_current+1);
- }
-
- void clear() {
- m_current = -1;
- }
-
- /** Possible error received from TC / datanodes. */
- int m_errorCode;
-
- private:
- /** Capacity of stack.*/
- int m_capacity;
-
- /** Index of current top of stack.*/
- int m_current;
- NdbRootFragment** m_array;
-
- // No copying.
- SharedFragStack(const SharedFragStack&);
- SharedFragStack& operator=(const SharedFragStack&);
- }; // class SharedFragStack
-
class OrderedFragSet{
public:
// For calculating need for dynamically allocated memory.
- static const Uint32 pointersPerFragment = 1;
+ static const Uint32 pointersPerFragment = 2;
explicit OrderedFragSet();
@@ -339,43 +294,55 @@ private:
* param[in] capacity Max no of root fragments.
* @return 0 if ok, else errorcode
*/
- int prepare(NdbBulkAllocator& allocator,
- NdbQueryOptions::ScanOrdering ordering,
- int capacity,
- const NdbRecord* keyRecord,
- const NdbRecord* resultRecord);
+ void prepare(NdbBulkAllocator& allocator,
+ NdbQueryOptions::ScanOrdering ordering,
+ int capacity,
+ const NdbRecord* keyRecord,
+ const NdbRecord* resultRecord);
+
+ /**
+ * Add root fragments with completed ResultSets to this OrderedFragSet.
+ * The PollGuard mutex must locked, and under its protection
+ * completed root fragments are 'consumed' from rootFrags[] and
+ * added to OrderedFragSet where it become available for the
+ * application thread.
+ */
+ void prepareMoreResults(NdbRootFragment rootFrags[], Uint32 cnt); // Need mutex lock
/** Get the root fragment from which to read the next row.*/
NdbRootFragment* getCurrent() const;
- /** Re-organize the fragments after a row has been consumed. This is
- * needed to remove fragements that has been needed, and to re-sort
- * fragments if doing a sorted scan.*/
+ /**
+ * Re-organize the fragments after a row has been consumed. This is
+ * needed to remove fragments that has been emptied, and to re-sort
+ * fragments if doing a sorted scan.
+ */
void reorganize();
- /** Add a complete fragment that has been received.*/
- void add(NdbRootFragment& frag);
-
/** Reset object to an empty state.*/
void clear();
- /** Get a fragment where all rows have been consumed. (This method is
- * not idempotent - the fragment is removed from the set.
- * @return Emptied fragment (or NULL if there are no more emptied
- * fragments).
+ /**
+ * Get all fragments where more rows may be (pre-)fetched.
+ * (This method is not idempotent - the fragments are removed
+ * from the set.)
+ * @return Number of fragments (in &frags) from which more
+ * results should be requested.
*/
- NdbRootFragment* getEmpty();
+ Uint32 getFetchMore(NdbRootFragment** &rootFrags);
private:
- /** Max no of fragments.*/
+ /** No of fragments to read until '::finalBatchReceived()'.*/
int m_capacity;
/** Number of fragments in 'm_activeFrags'.*/
int m_activeFragCount;
- /** Number of fragments in 'm_emptiedFrags'. */
- int m_emptiedFragCount;
- /** Number of fragments where the final batch has been received
- * and consumed.*/
+ /** Number of fragments in 'm_fetchMoreFrags'. */
+ int m_fetchMoreFragCount;
+ /**
+ * Number of fragments where the final batch has been received
+ * and consumed.
+ */
int m_finalFragCount;
/** Ordering of index scan result.*/
NdbQueryOptions::ScanOrdering m_ordering;
@@ -383,15 +350,21 @@ private:
const NdbRecord* m_keyRecord;
/** Needed for comparing records when ordering results.*/
const NdbRecord* m_resultRecord;
- /** Fragments where some tuples in the current batch has not yet been
- * consumed.*/
+ /**
+ * Fragments where some tuples in the current ResultSet has not
+ * yet been consumed.
+ */
NdbRootFragment** m_activeFrags;
- /** Fragments where all tuples in the current batch have been consumed,
- * but where there are more batches to fetch.*/
- NdbRootFragment** m_emptiedFrags;
- // No copying.
- OrderedFragSet(const OrderedFragSet&);
- OrderedFragSet& operator=(const OrderedFragSet&);
+ /**
+ * Fragments from which we should request more ResultSets.
+ * Either due to the current batch has been consumed, or double buffering
+ * of result sets allows us to request another batch before the current
+ * has been consumed.
+ */
+ NdbRootFragment** m_fetchMoreFrags;
+
+ /** Add a complete fragment that has been received.*/
+ void add(NdbRootFragment& frag);
/** For sorting fragment reads according to index value of first record.
* Also f1<f2 if f2 has reached end of data and f1 has not.
@@ -401,6 +374,10 @@ private:
/** For debugging purposes.*/
bool verifySortOrder() const;
+
+ // No copying.
+ OrderedFragSet(const OrderedFragSet&);
+ OrderedFragSet& operator=(const OrderedFragSet&);
}; // class OrderedFragSet
/** The interface that is visible to the application developer.*/
@@ -429,6 +406,14 @@ private:
/** Possible error status of this query.*/
NdbError m_error;
+
+ /**
+ * Possible error received from TC / datanodes.
+ * Only access w/ PollGuard mutex as it is set by receiver thread.
+ * Checked and moved into 'm_error' with ::hasReceivedError().
+ */
+ int m_errorReceived; // BEWARE: protect with PollGuard mutex
+
/** Transaction in which this query instance executes.*/
NdbTransaction& m_transaction;
@@ -448,7 +433,7 @@ private:
Uint32 m_globalCursor;
/** Number of root fragments not yet completed within the current batch.
- * Only access w/ PollGuard mutex as it is also updated by receiver threa
+ * Only access w/ PollGuard mutex as it is also updated by receiver thread
*/
Uint32 m_pendingFrags; // BEWARE: protect with PollGuard mutex
@@ -469,11 +454,6 @@ private:
*/
OrderedFragSet m_applFrags;
- /** Root frgaments that have received a complete batch. Shared between
- * application thread and receiving thread. Access should be mutex protected.
- */
- SharedFragStack m_fullFrags; // BEWARE: protect with PollGuard mutex
-
/** Number of root fragments for which confirmation for the final batch
* (with tcPtrI=RNIL) has been received. Observe that even if
* m_finalBatchFrags==m_rootFragCount, all tuples for the final batches may
@@ -549,10 +529,12 @@ private:
/** Send SCAN_NEXTREQ signal to fetch another batch from a scan query
* @return 0 if send succeeded, -1 otherwise.
*/
- int sendFetchMore(NdbRootFragment& emptyFrag, bool forceSend);
+ int sendFetchMore(NdbRootFragment* rootFrags[], Uint32 cnt,
+ bool forceSend);
/** Wait for more scan results which already has been REQuested to arrive.
- * @return 0 if some rows did arrive, a negative value if there are errors (in m_error.code),
+ * @return 0 if some rows did arrive, a negative value if there are errors
+ * (in m_error.code),
* and 1 of there are no more rows to receive.
*/
FetchResult awaitMoreResults(bool forceSend);
=== modified file 'storage/ndb/src/ndbapi/NdbReceiver.cpp'
--- a/storage/ndb/src/ndbapi/NdbReceiver.cpp 2011-06-30 15:59:25 +0000
+++ b/storage/ndb/src/ndbapi/NdbReceiver.cpp 2011-08-17 12:36:56 +0000
@@ -56,7 +56,7 @@ NdbReceiver::init(ReceiverType type, boo
if (useRec)
{
m_record.m_ndb_record= NULL;
- m_record.m_row= NULL;
+ m_record.m_row_recv= NULL;
m_record.m_row_buffer= NULL;
m_record.m_row_offset= 0;
m_record.m_read_range_no= false;
@@ -118,11 +118,38 @@ NdbReceiver::getValues(const NdbRecord*
assert(m_using_ndb_record);
m_record.m_ndb_record= rec;
- m_record.m_row= row_ptr;
+ m_record.m_row_recv= row_ptr;
m_record.m_row_offset= rec->m_row_size;
}
-#define KEY_ATTR_ID (~(Uint32)0)
+void
+NdbReceiver::prepareReceive(char *buf)
+{
+ /* Set pointers etc. to prepare for receiving the first row of the batch. */
+ assert(theMagicNumber == 0x11223344);
+ m_received_result_length = 0;
+ m_expected_result_length = 0;
+ if (m_using_ndb_record)
+ {
+ m_record.m_row_recv= buf;
+ }
+ theCurrentRecAttr = theFirstRecAttr;
+}
+
+void
+NdbReceiver::prepareRead(char *buf, Uint32 rows)
+{
+ /* Set pointers etc. to prepare for reading the first row of the batch. */
+ assert(theMagicNumber == 0x11223344);
+ m_current_row = 0;
+ m_result_rows = rows;
+ if (m_using_ndb_record)
+ {
+ m_record.m_row_buffer = buf;
+ }
+}
+
+ #define KEY_ATTR_ID (~(Uint32)0)
/*
Compute the batch size (rows between each NEXT_TABREQ / SCAN_TABCONF) to
@@ -219,7 +246,7 @@ NdbReceiver::do_setup_ndbrecord(const Nd
{
m_using_ndb_record= true;
m_record.m_ndb_record= ndb_record;
- m_record.m_row= row_buffer;
+ m_record.m_row_recv= row_buffer;
m_record.m_row_buffer= row_buffer;
m_record.m_row_offset= rowsize;
m_record.m_read_range_no= read_range_no;
@@ -524,7 +551,7 @@ NdbReceiver::receive_packed_ndbrecord(Ui
{
if (BitmaskImpl::get(bmlen, aDataPtr, ++i))
{
- setRecToNULL(col, m_record.m_row);
+ setRecToNULL(col, m_record.m_row_recv);
// Next column...
continue;
@@ -668,7 +695,8 @@ NdbReceiver::execTRANSID_AI(const Uint32
assert(m_record.m_read_range_no);
assert(attrSize==4);
assert (m_record.m_row_offset >= m_record.m_ndb_record->m_row_size+attrSize);
- memcpy(m_record.m_row+m_record.m_ndb_record->m_row_size, aDataPtr++, 4);
+ memcpy(m_record.m_row_recv+m_record.m_ndb_record->m_row_size,
+ aDataPtr++, 4);
aLength--;
continue; // Next
}
@@ -682,7 +710,7 @@ NdbReceiver::execTRANSID_AI(const Uint32
assert (m_record.m_row_offset >= m_record.m_ndb_record->m_row_size);
Uint32 len= receive_packed_ndbrecord(attrSize >> 2, // Bitmap length
aDataPtr,
- m_record.m_row);
+ m_record.m_row_recv);
aDataPtr+= len;
aLength-= len;
continue; // Next
@@ -709,13 +737,13 @@ NdbReceiver::execTRANSID_AI(const Uint32
/* Save this extra getValue */
save_pos+= sizeof(Uint32);
- memcpy(m_record.m_row + m_record.m_row_offset - save_pos,
+ memcpy(m_record.m_row_recv + m_record.m_row_offset - save_pos,
&attrSize, sizeof(Uint32));
if (attrSize > 0)
{
save_pos+= attrSize;
assert (save_pos<=m_record.m_row_offset);
- memcpy(m_record.m_row + m_record.m_row_offset - save_pos,
+ memcpy(m_record.m_row_recv + m_record.m_row_offset - save_pos,
aDataPtr, attrSize);
}
@@ -803,7 +831,7 @@ NdbReceiver::execTRANSID_AI(const Uint32
if (m_using_ndb_record) {
/* Move onto next row in scan buffer */
- m_record.m_row+= m_record.m_row_offset;
+ m_record.m_row_recv+= m_record.m_row_offset;
}
return (tmp == exp || (exp > TcKeyConf::DirtyReadBit) ? 1 : 0);
}
No bundle (reason: useless for push emails).
| Thread |
|---|
| • bzr push into mysql-5.1-telco-7.0 branch (pekka.nousiainen:4461 to 4472)WL#4124 | Pekka Nousiainen | 22 Aug |