4426 Pekka Nousiainen 2011-08-16 [merge]
merge telco-7.0 to wl4124-new2
modified:
mysql-test/r/group_by.result
mysql-test/suite/ndb_rpl/t/ndb_rpl_conflict_epoch.cnf
mysql-test/t/group_by.test
sql/sql_select.cc
storage/ndb/src/kernel/blocks/dbtux/DbtuxNode.cpp
storage/ndb/src/kernel/vm/ndbd_malloc_impl.cpp
storage/ndb/src/kernel/vm/ndbd_malloc_impl.hpp
storage/ndb/src/ndbapi/NdbQueryOperation.cpp
storage/ndb/src/ndbapi/NdbQueryOperationImpl.hpp
4425 Pekka Nousiainen 2011-08-16
wl#4124 x20_fix.diff
no dtor is called due to useless pthread_exit(0)
modified:
sql/ha_ndb_index_stat.cc
=== modified file 'mysql-test/r/group_by.result'
--- a/mysql-test/r/group_by.result 2010-10-29 08:23:06 +0000
+++ b/mysql-test/r/group_by.result 2011-08-16 10:20:19 +0000
@@ -1856,3 +1856,18 @@ COUNT(*)
2
DROP TABLE t1;
# End of 5.1 tests
+#
+# Bug#12798270: ASSERTION `!TAB->SORTED' FAILED IN JOIN_READ_KEY2
+#
+CREATE TABLE t1 (i int);
+INSERT INTO t1 VALUES (1);
+CREATE TABLE t2 (pk int PRIMARY KEY);
+INSERT INTO t2 VALUES (10);
+CREATE VIEW v1 AS SELECT t2.pk FROM t2;
+SELECT v1.pk
+FROM t1 LEFT JOIN v1 ON t1.i = v1.pk
+GROUP BY v1.pk;
+pk
+DROP VIEW v1;
+DROP TABLE t1,t2;
+# End of Bug#12798270
=== modified file 'mysql-test/suite/ndb_rpl/t/ndb_rpl_conflict_epoch.cnf'
--- a/mysql-test/suite/ndb_rpl/t/ndb_rpl_conflict_epoch.cnf 2011-07-07 14:48:06 +0000
+++ b/mysql-test/suite/ndb_rpl/t/ndb_rpl_conflict_epoch.cnf 2011-07-11 10:40:00 +0000
@@ -25,14 +25,7 @@ skip-slave-start
[mysqld.2.slave]
server-id= 4
log-bin = sec-master-2
-master-host= 127.0.0.1
-master-port= @mysqld.2.1.port
-master-password= @mysqld.2.1.#password
-master-user= @mysqld.2.1.#user
-master-connect-retry= 1
-init-rpl-role= slave
skip-slave-start
-ndb_connectstring= @mysql_cluster.slave.ndb_connectstring
[ENV]
=== modified file 'mysql-test/t/group_by.test'
--- a/mysql-test/t/group_by.test 2010-10-29 08:23:06 +0000
+++ b/mysql-test/t/group_by.test 2011-08-16 10:20:19 +0000
@@ -1248,3 +1248,24 @@ ON 1 WHERE t2.f1 > 1 GROUP BY t2.f1;
DROP TABLE t1;
--echo # End of 5.1 tests
+
+--echo #
+--echo # Bug#12798270: ASSERTION `!TAB->SORTED' FAILED IN JOIN_READ_KEY2
+--echo #
+
+CREATE TABLE t1 (i int);
+INSERT INTO t1 VALUES (1);
+
+CREATE TABLE t2 (pk int PRIMARY KEY);
+INSERT INTO t2 VALUES (10);
+
+CREATE VIEW v1 AS SELECT t2.pk FROM t2;
+
+SELECT v1.pk
+FROM t1 LEFT JOIN v1 ON t1.i = v1.pk
+GROUP BY v1.pk;
+
+DROP VIEW v1;
+DROP TABLE t1,t2;
+
+--echo # End of Bug#12798270
=== modified file 'sql/sql_select.cc'
--- a/sql/sql_select.cc 2011-06-30 15:59:25 +0000
+++ b/sql/sql_select.cc 2011-08-16 10:20:19 +0000
@@ -6876,7 +6876,15 @@ make_join_readinfo(JOIN *join, ulonglong
(join->sort_by_table == (TABLE *) 1 && i != join->const_tables)))
ordered_set= 1;
+#ifdef MCP_BUG12798270
tab->sorted= sorted;
+#else
+ /*
+ For eq_ref there is at most one join match for each row from
+ previous tables so ordering is not useful.
+ */
+ tab->sorted= (tab->type != JT_EQ_REF) ? sorted : false;
+#endif
sorted= 0; // only first must be sorted
table->status=STATUS_NO_RECORD;
pick_table_access_method (tab);
=== modified file 'storage/ndb/src/kernel/blocks/dbtux/DbtuxNode.cpp'
--- a/storage/ndb/src/kernel/blocks/dbtux/DbtuxNode.cpp 2011-06-30 15:59:25 +0000
+++ b/storage/ndb/src/kernel/blocks/dbtux/DbtuxNode.cpp 2011-08-16 08:47:35 +0000
@@ -155,19 +155,26 @@ Dbtux::setNodePref(TuxCtx & ctx, NodeHan
{
const Frag& frag = node.m_frag;
const Index& index = *c_indexPool.getPtr(frag.m_indexId);
- KeyData prefKey(index.m_keySpec, false, 0);
- prefKey.set_buf(node.getPref(), index.m_prefBytes);
+ /*
+ * bug#12873640
+ * Node prefix exists if it has non-zero number of attributes. It is
+ * then a partial instance of KeyData. If the prefix does not exist
+ * then set_buf() could overwrite m_pageId1 in first entry, causing
+ * random crash in TUP via readKeyAttrs().
+ */
if (index.m_prefAttrs > 0) {
+ KeyData prefKey(index.m_keySpec, false, 0);
+ prefKey.set_buf(node.getPref(), index.m_prefBytes);
jam();
readKeyAttrs(ctx, frag, node.getEnt(0), prefKey, index.m_prefAttrs);
- }
#ifdef VM_TRACE
- if (debugFlags & DebugMaint) {
- debugOut << "setNodePref: " << node;
- debugOut << " " << prefKey.print(ctx.c_debugBuffer, DebugBufferBytes);
- debugOut << endl;
- }
+ if (debugFlags & DebugMaint) {
+ debugOut << "setNodePref: " << node;
+ debugOut << " " << prefKey.print(ctx.c_debugBuffer, DebugBufferBytes);
+ debugOut << endl;
+ }
#endif
+ }
}
// node operations
=== modified file 'storage/ndb/src/kernel/vm/ndbd_malloc_impl.cpp'
--- a/storage/ndb/src/kernel/vm/ndbd_malloc_impl.cpp 2011-02-01 23:27:25 +0000
+++ b/storage/ndb/src/kernel/vm/ndbd_malloc_impl.cpp 2011-08-16 08:27:14 +0000
@@ -39,6 +39,16 @@ static const char * f_method = "MSms";
#endif
#define MAX_CHUNKS 10
+#ifdef VM_TRACE
+#ifndef NDBD_RANDOM_START_PAGE
+#define NDBD_RANDOM_START_PAGE
+#endif
+#endif
+
+#ifdef NDBD_RANDOM_START_PAGE
+static Uint32 g_random_start_page_id = 0;
+#endif
+
/*
* For muti-threaded ndbd, these calls are used for locking around
* memory allocation operations.
@@ -224,6 +234,16 @@ Ndbd_mem_manager::Ndbd_mem_manager()
mt_mem_manager_init();
}
+void*
+Ndbd_mem_manager::get_memroot() const
+{
+#ifdef NDBD_RANDOM_START_PAGE
+ return (void*)(m_base_page - g_random_start_page_id);
+#else
+ return (void*)m_base_page;
+#endif
+}
+
/**
*
* resource 0 has following semantics:
@@ -359,6 +379,29 @@ Ndbd_mem_manager::init(Uint32 *watchCoun
}
#endif
+#ifdef NDBD_RANDOM_START_PAGE
+ /**
+ * In order to find bad-users of page-id's
+ * we add a random offset to the page-id's returned
+ * however, due to ZONE_LO that offset can't be that big
+ * (since we at get_page don't know if it's a HI/LO page)
+ */
+ Uint32 max_rand_start = ZONE_LO_BOUND - 1;
+ if (max_rand_start > pages)
+ {
+ max_rand_start -= pages;
+ if (max_rand_start > 0x10000)
+ g_random_start_page_id = 0x10000 + (rand() % (max_rand_start - 0x10000));
+ else if (max_rand_start)
+ g_random_start_page_id = rand() % max_rand_start;
+
+ assert(Uint64(pages) + Uint64(g_random_start_page_id) <= 0xFFFFFFFF);
+
+ ndbout_c("using g_random_start_page_id: %u (%.8x)",
+ g_random_start_page_id, g_random_start_page_id);
+ }
+#endif
+
/**
* Do malloc
*/
@@ -670,7 +713,7 @@ Ndbd_mem_manager::alloc(AllocZone zone,
return;
* pages = save;
}
-
+
alloc_impl(ZONE_LO, ret, pages, min);
}
@@ -870,7 +913,12 @@ Ndbd_mem_manager::alloc_page(Uint32 type
check_resource_limits(m_resource_limit);
mt_mem_manager_unlock();
+#ifdef NDBD_RANDOM_START_PAGE
+ *i += g_random_start_page_id;
+ return m_base_page + *i - g_random_start_page_id;
+#else
return m_base_page + *i;
+#endif
}
}
mt_mem_manager_unlock();
@@ -885,7 +933,11 @@ Ndbd_mem_manager::release_page(Uint32 ty
mt_mem_manager_lock();
Resource_limit tot = m_resource_limit[0];
Resource_limit rl = m_resource_limit[idx];
-
+
+#ifdef NDBD_RANDOM_START_PAGE
+ i -= g_random_start_page_id;
+#endif
+
Uint32 sub = (rl.m_curr <= rl.m_min) ? 1 : 0; // Over min ?
release(i, 1);
m_resource_limit[0].m_curr = tot.m_curr - 1;
@@ -954,10 +1006,16 @@ Ndbd_mem_manager::alloc_pages(Uint32 typ
m_resource_limit[idx].m_curr = rl.m_curr + req;
check_resource_limits(m_resource_limit);
mt_mem_manager_unlock();
+#ifdef NDBD_RANDOM_START_PAGE
+ *i += g_random_start_page_id;
+#endif
return ;
}
mt_mem_manager_unlock();
* cnt = req;
+#ifdef NDBD_RANDOM_START_PAGE
+ *i += g_random_start_page_id;
+#endif
return;
}
@@ -969,7 +1027,11 @@ Ndbd_mem_manager::release_pages(Uint32 t
mt_mem_manager_lock();
Resource_limit tot = m_resource_limit[0];
Resource_limit rl = m_resource_limit[idx];
-
+
+#ifdef NDBD_RANDOM_START_PAGE
+ i -= g_random_start_page_id;
+#endif
+
release(i, cnt);
Uint32 currnew = rl.m_curr - cnt;
=== modified file 'storage/ndb/src/kernel/vm/ndbd_malloc_impl.hpp'
--- a/storage/ndb/src/kernel/vm/ndbd_malloc_impl.hpp 2011-06-30 15:59:25 +0000
+++ b/storage/ndb/src/kernel/vm/ndbd_malloc_impl.hpp 2011-08-16 08:27:14 +0000
@@ -68,7 +68,7 @@ public:
bool init(Uint32 *watchCounter, bool allow_alloc_less_than_requested = true);
void map(Uint32 * watchCounter, bool memlock = false, Uint32 resources[] = 0);
- void* get_memroot() const { return (void*)m_base_page;}
+ void* get_memroot() const;
void dump() const ;
=== modified file 'storage/ndb/src/ndbapi/NdbQueryOperation.cpp'
--- a/storage/ndb/src/ndbapi/NdbQueryOperation.cpp 2011-07-01 10:02:15 +0000
+++ b/storage/ndb/src/ndbapi/NdbQueryOperation.cpp 2011-08-16 13:37:24 +0000
@@ -87,6 +87,32 @@ const bool traceSignals = false;
* children. That way, results for child operations can be updated correctly
* when the application iterates over the results of the root scan operation.
*/
+class TupleCorrelation
+{
+public:
+ static const Uint32 wordCount = 1;
+
+ explicit TupleCorrelation()
+ : m_correlation((tupleNotFound<<16) | tupleNotFound)
+ {}
+
+ /** Conversion to/from Uint32 to store/fetch from buffers */
+ explicit TupleCorrelation(Uint32 val)
+ : m_correlation(val)
+ {}
+ Uint32 toUint32() const
+ { return m_correlation; }
+
+ Uint16 getTupleId() const
+ { return m_correlation & 0xffff;}
+
+ Uint16 getParentTupleId() const
+ { return m_correlation >> 16;}
+
+private:
+ Uint32 m_correlation;
+}; // class TupleCorrelation
+
class CorrelationData
{
public:
@@ -99,18 +125,15 @@ public:
assert(AttributeHeader(m_corrPart[0]).getAttributeId()
== AttributeHeader::CORR_FACTOR64);
assert(AttributeHeader(m_corrPart[0]).getByteSize() == 2*sizeof(Uint32));
- assert(getTupleId()<tupleNotFound);
- assert(getParentTupleId()<tupleNotFound);
+ assert(getTupleCorrelation().getTupleId()<tupleNotFound);
+ assert(getTupleCorrelation().getParentTupleId()<tupleNotFound);
}
Uint32 getRootReceiverId() const
{ return m_corrPart[2];}
- Uint16 getTupleId() const
- { return m_corrPart[1] & 0xffff;}
-
- Uint16 getParentTupleId() const
- { return m_corrPart[1] >> 16;}
+ const TupleCorrelation getTupleCorrelation() const
+ { return TupleCorrelation(m_corrPart[1]); }
private:
const Uint32* const m_corrPart;
@@ -148,6 +171,8 @@ public:
explicit NdbRootFragment();
+ ~NdbRootFragment();
+
/**
* Initialize object.
* @param query Enclosing query.
@@ -164,6 +189,11 @@ public:
*/
void reset();
+ /**
+ * Prepare for reading another batch of results.
+ */
+ void prepareResultSet();
+
void incrOutstandingResults(Int32 delta)
{
m_outstandingResults += delta;
@@ -198,9 +228,14 @@ public:
* @param operationNo The id of the operation.
* @return The result stream for this root fragment.
*/
- NdbResultStream& getResultStream(Uint32 operationNo) const
- { return m_query->getQueryOperation(operationNo).getResultStream(m_fragNo); }
-
+ NdbResultStream& getResultStream(Uint32 operationNo) const;
+
+ NdbResultStream& getResultStream(const NdbQueryOperationImpl& op) const
+ { return getResultStream(op.getQueryOperationDef().getQueryOperationIx()); }
+
+ Uint32 getReceiverId() const;
+ Uint32 getReceiverTcPtrI() const;
+
/**
* @return True if there are no more batches to be received for this fragment.
*/
@@ -212,6 +247,19 @@ public:
*/
bool isEmpty() const;
+ /**
+ * This method is used for marking which streams belonging to this
+ * NdbRootFragment which has remaining batches for a sub scan
+ * instantiated from the current batch of its parent operation.
+ */
+ void setRemainingSubScans(Uint32 nodeMask)
+ {
+ m_remainingScans = nodeMask;
+ }
+
+ /** Release resources after last row has been returned */
+ void postFetchRelease();
+
private:
STATIC_CONST( voidFragNo = 0xffffffff);
@@ -221,6 +269,9 @@ private:
/** Number of the root operation fragment.*/
Uint32 m_fragNo;
+ /** For processing results originating from this root fragment (Array of).*/
+ NdbResultStream* m_resultStreams;
+
/**
* The number of outstanding TCKEYREF or TRANSID_AI
* messages for the fragment. This includes both messages related to the
@@ -239,6 +290,12 @@ private:
* TCKEYCONF message has been received */
bool m_confReceived;
+ /**
+ * A bitmask of operation id's for which we will receive more
+ * ResultSets in a NEXTREQ.
+ */
+ Uint32 m_remainingScans;
+
/**
* Used for implementing a hash map from root receiver ids to a
* NdbRootFragment instance. m_idMapHead is the index of the first
@@ -272,26 +329,19 @@ public:
* @param operation The operation for which we will receive results.
* @param rootFragNo 0..n-1 when the root operation reads from n fragments.
*/
- explicit NdbResultStream(NdbQueryOperationImpl& operation, Uint32 rootFragNo);
+ explicit NdbResultStream(NdbQueryOperationImpl& operation,
+ NdbRootFragment& rootFrag);
~NdbResultStream();
/**
* Prepare for receiving first results.
- * @return possible error code.
*/
- int prepare();
+ void prepare();
/** Prepare for receiving next batch of scan results. */
void reset();
- /**
- * 0..n-1 if the root operation reads from n fragments. This stream holds data
- * derived from one of those fragments.
- */
- Uint32 getRootFragNo() const
- { return m_rootFragNo; }
-
NdbReceiver& getReceiver()
{ return m_receiver; }
@@ -301,6 +351,9 @@ public:
Uint32 getRowCount() const
{ return m_rowCount; }
+ char* getRow(Uint32 tupleNo) const
+ { return (m_buffer + (tupleNo*m_rowSize)); }
+
/**
* Process an incomming tuple for this stream. Extract parent and own tuple
* ids and pass it on to m_receiver.
@@ -308,12 +361,15 @@ public:
* @param ptr buffer holding tuple.
* @param len buffer length.
*/
- void execTRANSID_AI(const Uint32 *ptr, Uint32 len);
+ void execTRANSID_AI(const Uint32 *ptr, Uint32 len,
+ TupleCorrelation correlation);
- /** A complete batch has been received for a fragment on this NdbResultStream,
- * Update whatever required before the appl. are allowed to navigate the result.
+ /**
+ * A complete batch has been received for a fragment on this NdbResultStream,
+ * Update whatever required before the appl. are allowed to navigate the result.
+ * @return true if node and all its siblings have returned all rows.
*/
- void handleBatchComplete();
+ bool prepareResultSet(Uint32 remainingScans);
/**
* Navigate within the current result batch to resp. first and next row.
@@ -333,36 +389,33 @@ public:
{ return m_iterState == Iter_finished; }
/**
- * This method is
- * used for marking a stream as holding the last batch of a sub scan.
- * This means that it is the last batch of the scan that was instantiated
- * from the current batch of its parent operation.
- */
- void setSubScanCompletion(bool complete)
- {
- // Lookups should always be 'complete'
- assert(complete || m_operation.getQueryOperationDef().isScanOperation());
- m_subScanComplete = complete;
- }
-
- /**
* This method
- * returns true if this result stream holds the last batch of a sub scan
+ * returns true if this result stream holds the last batch of a sub scan.
* This means that it is the last batch of the scan that was instantiated
* from the current batch of its parent operation.
*/
- bool isSubScanComplete() const
+ bool isSubScanComplete(Uint32 remainingScans) const
{
- // Lookups should always be 'complete'
- assert(m_subScanComplete || m_operation.getQueryOperationDef().isScanOperation());
- return m_subScanComplete;
+ /**
+ * Find the node number seen by the SPJ block. Since a unique index
+ * operation will have two distincts nodes in the tree used by the
+ * SPJ block, this number may be different from 'opNo'.
+ */
+ const Uint32 internalOpNo = m_operation.getQueryOperationDef().getQueryOperationId();
+
+ const bool complete = !((remainingScans >> internalOpNo) & 1);
+ assert(complete || isScanResult()); // Lookups should always be 'complete'
+ return complete;
}
- /** Variant of isSubScanComplete() above which checks that this resultstream
- * and all its descendants have consumed all batches of rows instantiated
- * from their parent operation(s).
- */
- bool isAllSubScansComplete() const;
+ bool isScanQuery() const
+ { return (m_properties & Is_Scan_Query); }
+
+ bool isScanResult() const
+ { return (m_properties & Is_Scan_Result); }
+
+ bool isInnerJoin() const
+ { return (m_properties & Is_Inner_Join); }
/** For debugging.*/
friend NdbOut& operator<<(NdbOut& out, const NdbResultStream&);
@@ -405,7 +458,7 @@ public:
* that had no matching children.*/
Bitmask<(NDB_SPJ_MAX_TREE_NODES+31)/32> m_hasMatchingChild;
- explicit TupleSet()
+ explicit TupleSet() : m_hash_head(tupleNotFound)
{}
private:
@@ -415,22 +468,39 @@ public:
};
private:
- /** This stream handles results derived from the m_rootFragNo'th
- * fragment of the root operation.*/
- const Uint32 m_rootFragNo;
+ /**
+ * This stream handles results derived from specified
+ * m_rootFrag of the root operation.
+ */
+ const NdbRootFragment& m_rootFrag;
+
+ /** Operation to which this resultStream belong.*/
+ NdbQueryOperationImpl& m_operation;
+
+ /** ResultStream for my parent operation, or NULL if I am root */
+ NdbResultStream* const m_parent;
+
+ const enum properties
+ {
+ Is_Scan_Query = 0x01,
+ Is_Scan_Result = 0x02,
+ Is_Inner_Join = 0x10
+ } m_properties;
/** The receiver object that unpacks transid_AI messages.*/
NdbReceiver m_receiver;
- /** Max #rows which this stream may recieve in its buffer structures */
- Uint32 m_maxRows;
+ /** The buffers which we receive the results into */
+ char* m_buffer;
+
+ /** Used for checking if buffer overrun occurred. */
+ Uint32* m_batchOverflowCheck;
+
+ Uint32 m_rowSize;
/** The number of transid_AI messages received.*/
Uint32 m_rowCount;
- /** Operation to which this resultStream belong.*/
- NdbQueryOperationImpl& m_operation;
-
/** This is the state of the iterator used by firstResult(), nextResult().*/
enum
{
@@ -442,24 +512,19 @@ private:
Iter_finished
} m_iterState;
- /** Tuple id of the current tuple, or 'tupleNotFound' if Iter_notStarted or Iter_finished. */
+ /**
+ * Tuple id of the current tuple, or 'tupleNotFound'
+ * if Iter_notStarted or Iter_finished.
+ */
Uint16 m_currentRow;
- /**
- * This field is only used for result streams of scan operations. If set,
- * it indicates that the stream is holding the last batch of a sub scan.
- * This means that it is the last batch of the scan that was instantiated
- * from the current batch of its parent operation.
- */
- bool m_subScanComplete;
+ /** Max #rows which this stream may recieve in its TupleSet structures */
+ Uint32 m_maxRows;
+ /** TupleSet contains the correlation between parent/childs */
TupleSet* m_tupleSet;
- void clearTupleSet();
-
- void setParentChildMap(Uint16 parentId,
- Uint16 tupleId,
- Uint16 tupleNo);
+ void buildResultCorrelations();
Uint16 getTupleId(Uint16 tupleNo) const
{ return (m_tupleSet) ? m_tupleSet[tupleNo].m_tupleId : 0; }
@@ -525,15 +590,30 @@ void* NdbBulkAllocator::allocObjMem(Uint
///////// NdbResultStream methods ///////////
//////////////////////////////////////////////
-NdbResultStream::NdbResultStream(NdbQueryOperationImpl& operation, Uint32 rootFragNo):
- m_rootFragNo(rootFragNo),
+NdbResultStream::NdbResultStream(NdbQueryOperationImpl& operation,
+ NdbRootFragment& rootFrag)
+:
+ m_rootFrag(rootFrag),
+ m_operation(operation),
+ m_parent(operation.getParentOperation()
+ ? &rootFrag.getResultStream(*operation.getParentOperation())
+ : NULL),
+ m_properties(
+ (enum properties)
+ ((operation.getQueryDef().isScanQuery()
+ ? Is_Scan_Query : 0)
+ | (operation.getQueryOperationDef().isScanOperation()
+ ? Is_Scan_Result : 0)
+ | (operation.getQueryOperationDef().getMatchType() != NdbQueryOptions::MatchAll
+ ? Is_Inner_Join : 0))),
m_receiver(operation.getQuery().getNdbTransaction().getNdb()),
- m_maxRows(0),
+ m_buffer(NULL),
+ m_batchOverflowCheck(NULL),
+ m_rowSize(0),
m_rowCount(0),
- m_operation(operation),
m_iterState(Iter_notStarted),
m_currentRow(tupleNotFound),
- m_subScanComplete(true),
+ m_maxRows(0),
m_tupleSet(NULL)
{};
@@ -545,41 +625,58 @@ NdbResultStream::~NdbResultStream()
}
}
-int // Return 0 if ok, else errorcode
+void
NdbResultStream::prepare()
{
+ const Uint32 rowSize = m_operation.getRowSize();
+ NdbQueryImpl &query = m_operation.getQuery();
+
+ m_rowSize = rowSize;
+
/* Parent / child correlation is only relevant for scan type queries
* Don't create m_parentTupleId[] and m_childTupleIdx[] for lookups!
* Neither is these structures required for operations not having respective
* child or parent operations.
*/
- if (m_operation.getQueryDef().isScanQuery())
+ if (isScanQuery())
{
m_maxRows = m_operation.getMaxBatchRows();
m_tupleSet =
- new (m_operation.getQuery().getTupleSetAlloc().allocObjMem(m_maxRows))
+ new (query.getTupleSetAlloc().allocObjMem(m_maxRows))
TupleSet[m_maxRows];
-
- clearTupleSet();
}
else
m_maxRows = 1;
- return 0;
-} //NdbResultStream::prepare
+ const int bufferSize = rowSize * m_maxRows;
+ NdbBulkAllocator& bufferAlloc = query.getRowBufferAlloc();
+ m_buffer = reinterpret_cast<char*>(bufferAlloc.allocObjMem(bufferSize));
+ // So that we can test for buffer overrun.
+ m_batchOverflowCheck =
+ reinterpret_cast<Uint32*>(bufferAlloc.allocObjMem(sizeof(Uint32)));
+ *m_batchOverflowCheck = 0xacbd1234;
+
+ m_receiver.init(NdbReceiver::NDB_QUERY_OPERATION, false, &m_operation);
+ m_receiver.do_setup_ndbrecord(
+ m_operation.getNdbRecord(),
+ m_maxRows,
+ 0 /*key_size*/,
+ 0 /*read_range_no*/,
+ rowSize,
+ m_buffer);
+} //NdbResultStream::prepare
void
NdbResultStream::reset()
{
- assert (m_operation.getQueryDef().isScanQuery());
+ assert (isScanQuery());
// Root scan-operation need a ScanTabConf to complete
m_rowCount = 0;
m_iterState = Iter_notStarted;
m_currentRow = tupleNotFound;
- clearTupleSet();
m_receiver.prepareSend();
/**
* If this stream will get new rows in the next batch, then so will
@@ -589,88 +686,14 @@ NdbResultStream::reset()
childNo++)
{
NdbQueryOperationImpl& child = m_operation.getChildOperation(childNo);
- child.getResultStream(getRootFragNo()).reset();
+ m_rootFrag.getResultStream(child).reset();
}
} //NdbResultStream::reset
-void
-NdbResultStream::clearTupleSet()
-{
- assert (m_operation.getQueryDef().isScanQuery());
- for (Uint32 i=0; i<m_maxRows; i++)
- {
- m_tupleSet[i].m_parentId = tupleNotFound;
- m_tupleSet[i].m_tupleId = tupleNotFound;
- m_tupleSet[i].m_hash_head = tupleNotFound;
- m_tupleSet[i].m_skip = false;
- m_tupleSet[i].m_hasMatchingChild.clear();
- }
-}
-
-bool
-NdbResultStream::isAllSubScansComplete() const
-{
- // Lookups should always be 'complete'
- assert(m_subScanComplete || m_operation.getQueryOperationDef().isScanOperation());
-
- if (!m_subScanComplete)
- return false;
-
- for (Uint32 childNo = 0; childNo < m_operation.getNoOfChildOperations();
- childNo++)
- {
- const NdbQueryOperationImpl& child = m_operation.getChildOperation(childNo);
- const NdbResultStream& childStream = child.getResultStream(getRootFragNo());
- if (!childStream.isAllSubScansComplete())
- return false;
- }
- return true;
-} //NdbResultStream::isAllSubScansComplete
-
-
-void
-NdbResultStream::setParentChildMap(Uint16 parentId,
- Uint16 tupleId,
- Uint16 tupleNo)
-{
- assert (m_operation.getQueryDef().isScanQuery());
- assert (tupleNo < m_maxRows);
- assert (tupleId != tupleNotFound);
-
- for (Uint32 i = 0; i < tupleNo; i++)
- {
- // Check that tuple id is unique.
- assert (m_tupleSet[i].m_tupleId != tupleId);
- }
- m_tupleSet[tupleNo].m_parentId = parentId;
- m_tupleSet[tupleNo].m_tupleId = tupleId;
-
- const Uint16 hash = (parentId % m_maxRows);
- if (parentId == tupleNotFound)
- {
- /* Root stream: Insert sequentially in hash_next to make it
- * possible to use ::findTupleWithParentId() and ::findNextTuple()
- * to navigate even the root operation.
- */
- assert (m_operation.getParentOperation()==NULL);
- /* Link into m_hash_next in order to let ::findNextTuple() navigate correctly */
- if (tupleNo==0)
- m_tupleSet[hash].m_hash_head = 0;
- else
- m_tupleSet[tupleNo-1].m_hash_next = tupleNo;
- m_tupleSet[tupleNo].m_hash_next = tupleNotFound;
- }
- else
- {
- /* Insert parentId in HashMap */
- m_tupleSet[tupleNo].m_hash_next = m_tupleSet[hash].m_hash_head;
- m_tupleSet[hash].m_hash_head = tupleNo;
- }
-}
-
/** Locate, and return 'tupleNo', of first tuple with specified parentId.
* parentId == tupleNotFound is use as a special value for iterating results
- * from the root operation in the order which they was inserted by ::setParentChildMap()
+ * from the root operation in the order which they was inserted by
+ * ::buildResultCorrelations()
*
* Position of 'currentRow' is *not* updated and should be modified by callee
* if it want to keep the new position.
@@ -678,7 +701,7 @@ NdbResultStream::setParentChildMap(Uint1
Uint16
NdbResultStream::findTupleWithParentId(Uint16 parentId) const
{
- assert ((parentId==tupleNotFound) == (m_operation.getParentOperation()==NULL));
+ assert ((parentId==tupleNotFound) == (m_parent==NULL));
if (likely(m_rowCount>0))
{
@@ -736,14 +759,10 @@ NdbResultStream::findNextTuple(Uint16 tu
Uint16
NdbResultStream::firstResult()
{
- NdbQueryOperationImpl* parent = m_operation.getParentOperation();
-
Uint16 parentId = tupleNotFound;
- if (parent!=NULL)
+ if (m_parent!=NULL)
{
- const NdbResultStream& parentStream = parent->getResultStream(m_rootFragNo);
- parentId = parentStream.getCurrentTupleId();
-
+ parentId = m_parent->getCurrentTupleId();
if (parentId == tupleNotFound)
{
m_currentRow = tupleNotFound;
@@ -780,104 +799,195 @@ NdbResultStream::nextResult()
} //NdbResultStream::nextResult()
+/**
+ * Callback when a TRANSID_AI signal (receive row) is processed.
+ */
void
-NdbResultStream::execTRANSID_AI(const Uint32 *ptr, Uint32 len)
+NdbResultStream::execTRANSID_AI(const Uint32 *ptr, Uint32 len,
+ TupleCorrelation correlation)
{
- assert(m_iterState == Iter_notStarted);
- if (m_operation.getQueryDef().isScanQuery())
- {
- const CorrelationData correlData(ptr, len);
-
- assert(m_operation.getRoot().getResultStream(m_rootFragNo)
- .m_receiver.getId() == correlData.getRootReceiverId());
-
- m_receiver.execTRANSID_AI(ptr, len - CorrelationData::wordCount);
+ m_receiver.execTRANSID_AI(ptr, len);
+ m_rowCount++;
+ if (isScanQuery())
+ {
/**
- * Keep correlation data between parent and child tuples.
- * Since tuples may arrive in any order, we cannot match
- * parent and child until all tuples (for this batch and
- * root fragment) have arrived.
+ * Store TupleCorrelation as hidden value imm. after received row
+ * (NdbQueryOperationImpl::getRowSize() has reserved space for it)
*/
- setParentChildMap(m_operation.getParentOperation()==NULL
- ? tupleNotFound
- : correlData.getParentTupleId(),
- correlData.getTupleId(),
- m_rowCount);
+ Uint32* row_recv = reinterpret_cast<Uint32*>(m_receiver.m_record.m_row);
+ row_recv[-1] = correlation.toUint32();
}
- else
- {
- // Lookup query.
- m_receiver.execTRANSID_AI(ptr, len);
- }
- m_rowCount++;
- /* Set correct #rows received in the NdbReceiver.
- */
- getReceiver().m_result_rows = getRowCount();
} // NdbResultStream::execTRANSID_AI()
-
/**
- * A fresh batch of results has arrived for this ResultStream (and all its parent / childs)
- * Filter away any result rows which should not be visible (yet) - Either due to incomplete
- * child batches, or the join being an 'inner join'.
- * Set result itterator state to 'before first' resultrow.
+ * Make preparations for another batch of result to be read:
+ * - Fill in parent/child result correlations in m_tupleSet[]
+ * - ... or reset m_tupleSet[] if we reuse the previous.
+ * - Apply inner/outer join filtering to remove non qualifying
+ * rows.
*/
-void
-NdbResultStream::handleBatchComplete()
+bool
+NdbResultStream::prepareResultSet(Uint32 remainingScans)
{
- for (Uint32 tupleNo=0; tupleNo<getRowCount(); tupleNo++)
+ bool isComplete = isSubScanComplete(remainingScans); //Childs with more rows
+ assert(isComplete || isScanResult()); //Lookups always 'complete'
+
+ // Set correct #rows received in the NdbReceiver.
+ getReceiver().m_result_rows = getRowCount();
+
+ /**
+ * Prepare NdbResultStream for reading - either the next received
+ * from datanodes or reuse current.
+ */
+ if (m_tupleSet!=NULL)
{
- m_tupleSet[tupleNo].m_skip = false;
+ const bool newResults = (m_iterState!=Iter_finished);
+ if (newResults)
+ {
+ buildResultCorrelations();
+ }
+ else
+ {
+ // Makes all rows in 'TupleSet' available (clear 'm_skip' flag)
+ for (Uint32 tupleNo=0; tupleNo<getRowCount(); tupleNo++)
+ {
+ m_tupleSet[tupleNo].m_skip = false;
+ }
+ }
}
+ /**
+ * Recursively iterate all child results depth first.
+ * Filter away any result rows which should not be visible (yet) -
+ * Either due to incomplete child batches, or the join being an 'inner join'.
+ * Set result itterator state to 'before first' resultrow.
+ */
for (Uint32 childNo=0; childNo < m_operation.getNoOfChildOperations(); childNo++)
{
const NdbQueryOperationImpl& child = m_operation.getChildOperation(childNo);
- NdbResultStream& childStream = child.getResultStream(m_rootFragNo);
- childStream.handleBatchComplete();
+ NdbResultStream& childStream = m_rootFrag.getResultStream(child);
+ const bool allSubScansComplete = childStream.prepareResultSet(remainingScans);
- const bool isInnerJoin = child.getQueryOperationDef().getMatchType() != NdbQueryOptions::MatchAll;
- const bool allSubScansComplete = childStream.isAllSubScansComplete();
+ Uint32 childId = child.getQueryOperationDef().getQueryOperationIx();
- for (Uint32 tupleNo=0; tupleNo<getRowCount(); tupleNo++)
+ /* Condition 1) & 2) calc'ed outside loop, see comments further below: */
+ const bool skipNonMatches = !allSubScansComplete || // 1)
+ childStream.isInnerJoin(); // 2)
+
+ if (m_tupleSet!=NULL)
{
- if (!m_tupleSet[tupleNo].m_skip)
+ for (Uint32 tupleNo=0; tupleNo<getRowCount(); tupleNo++)
{
- Uint16 tupleId = getTupleId(tupleNo);
- if (childStream.findTupleWithParentId(tupleId)!=tupleNotFound)
- m_tupleSet[tupleNo].m_hasMatchingChild.set(childNo);
-
- /////////////////////////////////
- // No child matched for this row. Making parent row visible
- // will cause a NULL (outer join) row to be produced.
- // Skip NULL row production when:
- // 1) Some child batches are not complete; they may contain later matches.
- // 2) A match was found in a previous batch.
- // 3) Join type is 'inner join', skip as no child are matching.
- //
- else if (!allSubScansComplete // 1)
- || m_tupleSet[tupleNo].m_hasMatchingChild.get(childNo) // 2)
- || isInnerJoin) // 3)
- m_tupleSet[tupleNo].m_skip = true;
+ if (!m_tupleSet[tupleNo].m_skip)
+ {
+ Uint16 tupleId = getTupleId(tupleNo);
+ if (childStream.findTupleWithParentId(tupleId)!=tupleNotFound)
+ m_tupleSet[tupleNo].m_hasMatchingChild.set(childId);
+
+ /////////////////////////////////
+ // No child matched for this row. Making parent row visible
+ // will cause a NULL (outer join) row to be produced.
+ // Skip NULL row production when:
+ // 1) Some child batches are not complete; they may contain later matches.
+ // 2) Join type is 'inner join', skip as no child are matching.
+ // 3) A match was found in a previous batch.
+ // Condition 1) & 2) above is precalculated in 'bool skipNonMatches'
+ //
+ else if (skipNonMatches // 1 & 2)
+ || m_tupleSet[tupleNo].m_hasMatchingChild.get(childId)) // 3)
+ m_tupleSet[tupleNo].m_skip = true;
+ }
}
}
+ isComplete &= allSubScansComplete;
}
- m_currentRow = tupleNotFound;
+
+ // Set current position 'before first'
m_iterState = Iter_notStarted;
-} // NdbResultStream::handleBatchComplete()
+ m_currentRow = tupleNotFound;
+
+ return isComplete;
+} // NdbResultStream::prepareResultSet()
+
+
+/**
+ * Fill m_tupleSet[] with correlation data between parent
+ * and child tuples. The 'TupleCorrelation' is stored as
+ * and extra Uint32 after each row received
+ * by execTRANSID_AI().
+ *
+ * NOTE: In order to reduce work done when holding the
+ * transporter mutex, the 'TupleCorrelation' is only stored
+ * in the buffer when it arrives. Later (here) we build the
+ * correlation hashMap immediately before we prepare to
+ * read the NdbResultStream.
+ */
+void
+NdbResultStream::buildResultCorrelations()
+{
+ // Buffer overrun check.
+ assert(m_batchOverflowCheck==NULL || *m_batchOverflowCheck==0xacbd1234);
+
+//if (m_tupleSet!=NULL)
+ {
+ /* Clear the hashmap structures */
+ for (Uint32 i=0; i<m_maxRows; i++)
+ {
+ m_tupleSet[i].m_hash_head = tupleNotFound;
+ }
+
+ /* Rebuild correlation & hashmap from received buffers */
+ for (Uint32 tupleNo=0; tupleNo<m_rowCount; tupleNo++)
+ {
+ const Uint32* row = (Uint32*)getRow(tupleNo+1);
+ const TupleCorrelation correlation(row[-1]);
+
+ const Uint16 tupleId = correlation.getTupleId();
+ const Uint16 parentId = (m_parent!=NULL)
+ ? correlation.getParentTupleId()
+ : tupleNotFound;
+
+ m_tupleSet[tupleNo].m_skip = false;
+ m_tupleSet[tupleNo].m_parentId = parentId;
+ m_tupleSet[tupleNo].m_tupleId = tupleId;
+ m_tupleSet[tupleNo].m_hasMatchingChild.clear();
+
+ /* Insert into parentId-hashmap */
+ const Uint16 hash = (parentId % m_maxRows);
+ if (m_parent==NULL)
+ {
+ /* Root stream: Insert sequentially in hash_next to make it
+ * possible to use ::findTupleWithParentId() and ::findNextTuple()
+ * to navigate even the root operation.
+ */
+ /* Link into m_hash_next in order to let ::findNextTuple() navigate correctly */
+ if (tupleNo==0)
+ m_tupleSet[hash].m_hash_head = tupleNo;
+ else
+ m_tupleSet[tupleNo-1].m_hash_next = tupleNo;
+ m_tupleSet[tupleNo].m_hash_next = tupleNotFound;
+ }
+ else
+ {
+ /* Insert parentId in HashMap */
+ m_tupleSet[tupleNo].m_hash_next = m_tupleSet[hash].m_hash_head;
+ m_tupleSet[hash].m_hash_head = tupleNo;
+ }
+ }
+ }
+} // NdbResultStream::buildResultCorrelations
///////////////////////////////////////////
-///////// NdbRootFragment methods ///////////
+//////// NdbRootFragment methods /////////
///////////////////////////////////////////
void NdbRootFragment::buildReciverIdMap(NdbRootFragment* frags,
Uint32 noOfFrags)
{
for(Uint32 fragNo = 0; fragNo < noOfFrags; fragNo++)
{
- const Uint32 receiverId =
- frags[fragNo].getResultStream(0).getReceiver().getId();
+ const Uint32 receiverId = frags[fragNo].getReceiverId();
/**
* For reasons unknow, NdbObjectIdMap shifts ids two bits to the left,
* so we must do the opposite to get a good hash distribution.
@@ -890,6 +1000,7 @@ void NdbRootFragment::buildReciverIdMap(
}
}
+//static
NdbRootFragment*
NdbRootFragment::receiverIdLookup(NdbRootFragment* frags,
Uint32 noOfFrags,
@@ -903,9 +1014,7 @@ NdbRootFragment::receiverIdLookup(NdbRoo
const int hash = (receiverId >> 2) % noOfFrags;
int current = frags[hash].m_idMapHead;
assert(current < static_cast<int>(noOfFrags));
- while (current >= 0 &&
- frags[current].getResultStream(0).getReceiver().getId()
- != receiverId)
+ while (current >= 0 && frags[current].getReceiverId() != receiverId)
{
current = frags[current].m_idMapNext;
assert(current < static_cast<int>(noOfFrags));
@@ -924,18 +1033,65 @@ NdbRootFragment::receiverIdLookup(NdbRoo
NdbRootFragment::NdbRootFragment():
m_query(NULL),
m_fragNo(voidFragNo),
+ m_resultStreams(NULL),
m_outstandingResults(0),
m_confReceived(false),
+ m_remainingScans(0),
m_idMapHead(-1),
m_idMapNext(-1)
{
}
+NdbRootFragment::~NdbRootFragment()
+{
+ assert(m_resultStreams==NULL);
+}
+
void NdbRootFragment::init(NdbQueryImpl& query, Uint32 fragNo)
{
assert(m_fragNo==voidFragNo);
m_query = &query;
m_fragNo = fragNo;
+
+ m_resultStreams = reinterpret_cast<NdbResultStream*>
+ (query.getResultStreamAlloc().allocObjMem(query.getNoOfOperations()));
+ assert(m_resultStreams!=NULL);
+
+ for (unsigned opNo=0; opNo<query.getNoOfOperations(); opNo++)
+ {
+ NdbQueryOperationImpl& op = query.getQueryOperation(opNo);
+ new (&m_resultStreams[opNo]) NdbResultStream(op,*this);
+ m_resultStreams[opNo].prepare();
+ }
+}
+
+/**
+ * Release what we want need anymore after last available row has been
+ * returned from datanodes.
+ */
+void
+NdbRootFragment::postFetchRelease()
+{
+ if (m_resultStreams != NULL)
+ {
+ for (unsigned opNo=0; opNo<m_query->getNoOfOperations(); opNo++)
+ {
+ m_resultStreams[opNo].~NdbResultStream();
+ }
+ }
+ /**
+ * Don't 'delete' the object as it was in-place constructed from
+ * ResultStreamAlloc'ed memory. Memory is released by
+ * ResultStreamAlloc::reset().
+ */
+ m_resultStreams = NULL;
+}
+
+NdbResultStream&
+NdbRootFragment::getResultStream(Uint32 operationNo) const
+{
+ assert(m_resultStreams);
+ return m_resultStreams[operationNo];
}
void NdbRootFragment::reset()
@@ -943,9 +1099,31 @@ void NdbRootFragment::reset()
assert(m_fragNo!=voidFragNo);
assert(m_outstandingResults == 0);
assert(m_confReceived);
+
+ for (unsigned opNo=0; opNo<m_query->getNoOfOperations(); opNo++)
+ {
+ if (!m_resultStreams[opNo].isSubScanComplete(m_remainingScans))
+ {
+ /**
+ * Reset m_resultStreams[] and all its descendants, since all these
+ * streams will get a new set of rows in the next batch.
+ */
+ m_resultStreams[opNo].reset();
+ }
+ }
m_confReceived = false;
}
+void NdbRootFragment::prepareResultSet()
+{
+ NdbResultStream& rootStream = getResultStream(0);
+ rootStream.prepareResultSet(m_remainingScans);
+
+ /* Position at the first (sorted?) row available from this fragments.
+ */
+ rootStream.firstResult();
+}
+
void NdbRootFragment::setConfReceived()
{
/* For a query with a lookup root, there may be more than one TCKEYCONF
@@ -958,14 +1136,31 @@ void NdbRootFragment::setConfReceived()
bool NdbRootFragment::finalBatchReceived() const
{
- return getResultStream(0).getReceiver().m_tcPtrI==RNIL;
+ return getReceiverTcPtrI()==RNIL;
}
-bool NdbRootFragment::isEmpty() const
+bool NdbRootFragment::isEmpty() const
{
return getResultStream(0).isEmpty();
}
+/**
+ * SPJ requests are identified by the receiver-id of the
+ * *root* ResultStream for each RootFragment. Furthermore
+ * a NEXTREQ use the tcPtrI saved in this ResultStream to
+ * identify the 'cursor' to restart.
+ *
+ * We provide some convenient accessors for fetching this info
+ */
+Uint32 NdbRootFragment::getReceiverId() const
+{
+ return getResultStream(0).getReceiver().getId();
+}
+
+Uint32 NdbRootFragment::getReceiverTcPtrI() const
+{
+ return getResultStream(0).getReceiver().m_tcPtrI;
+}
///////////////////////////////////////////
///////// NdbQuery API methods ///////////
@@ -1475,7 +1670,14 @@ NdbQueryImpl::~NdbQueryImpl()
void
NdbQueryImpl::postFetchRelease()
{
- if (m_operations != NULL) {
+ if (m_rootFrags != NULL)
+ {
+ for (unsigned i=0; i<m_rootFragCount; i++)
+ { m_rootFrags[i].postFetchRelease();
+ }
+ }
+ if (m_operations != NULL)
+ {
for (unsigned i=0; i<m_countOperations; i++)
{ m_operations[i].postFetchRelease();
}
@@ -1986,6 +2188,7 @@ NdbQueryImpl::awaitMoreResults(bool forc
NdbRootFragment* frag;
while ((frag=m_fullFrags.pop()) != NULL)
{
+ frag->prepareResultSet();
m_applFrags.add(*frag);
}
if (m_applFrags.getCurrent() != NULL)
@@ -2040,6 +2243,7 @@ NdbQueryImpl::awaitMoreResults(bool forc
NdbRootFragment* frag;
if ((frag=m_fullFrags.pop()) != NULL)
{
+ frag->prepareResultSet();
m_applFrags.add(*frag);
}
assert(m_fullFrags.pop()==NULL); // Only one stream for lookups.
@@ -2070,15 +2274,15 @@ NdbQueryImpl::awaitMoreResults(bool forc
returns: 'true' when application thread should be resumed.
*/
bool
-NdbQueryImpl::handleBatchComplete(Uint32 fragNo)
+NdbQueryImpl::handleBatchComplete(NdbRootFragment& rootFrag)
{
if (traceSignals) {
- ndbout << "NdbQueryImpl::handleBatchComplete, fragNo=" << fragNo
+ ndbout << "NdbQueryImpl::handleBatchComplete"
+ << ", fragNo=" << rootFrag.getFragNo()
<< ", pendingFrags=" << (m_pendingFrags-1)
<< ", finalBatchFrags=" << m_finalBatchFrags
<< endl;
}
- bool resume = false;
/* May received fragment data after a SCANREF() (timeout?)
* terminated the scan. We are about to close this query,
@@ -2086,8 +2290,6 @@ NdbQueryImpl::handleBatchComplete(Uint32
*/
if (likely(m_fullFrags.m_errorCode == 0))
{
- NdbQueryOperationImpl& root = getRoot();
- NdbRootFragment& rootFrag = m_rootFrags[fragNo];
assert(rootFrag.isFragBatchComplete());
assert(m_pendingFrags > 0); // Check against underflow.
@@ -2100,34 +2302,14 @@ NdbQueryImpl::handleBatchComplete(Uint32
assert(m_finalBatchFrags <= m_rootFragCount);
}
- if (getQueryDef().isScanQuery())
- {
- // Only required for scans
- root.getResultStream(fragNo).handleBatchComplete();
-
- // Only ordered scans has to wait until all pending completed
- resume = (m_pendingFrags==0) ||
- (root.m_ordering==NdbQueryOptions::ScanOrdering_unordered);
- }
- else
- {
- assert(root.m_resultStreams[fragNo]->getReceiver().m_tcPtrI==RNIL);
- assert(m_finalBatchFrags==1);
- assert(m_pendingFrags==0); // Lookup query should be complete now.
- resume = true;
- }
-
- /* Position at the first (sorted?) row available from this fragments.
- */
- root.m_resultStreams[fragNo]->firstResult();
-
/* When application thread ::awaitMoreResults() it will later be moved
* from m_fullFrags to m_applFrags under mutex protection.
*/
m_fullFrags.push(rootFrag);
+ return true;
}
- return resume;
+ return false;
} // NdbQueryImpl::handleBatchComplete
int
@@ -2272,22 +2454,22 @@ NdbQueryImpl::execTCKEYCONF()
ndbout << "NdbQueryImpl::execTCKEYCONF()" << endl;
}
assert(!getQueryDef().isScanQuery());
+ NdbRootFragment& rootFrag = m_rootFrags[0];
// We will get 1 + #leaf-nodes TCKEYCONF for a lookup...
- m_rootFrags[0].setConfReceived();
- m_rootFrags[0].incrOutstandingResults(-1);
+ rootFrag.setConfReceived();
+ rootFrag.incrOutstandingResults(-1);
bool ret = false;
- if (m_rootFrags[0].isFragBatchComplete())
+ if (rootFrag.isFragBatchComplete())
{
- ret = handleBatchComplete(0);
+ ret = handleBatchComplete(rootFrag);
}
if (traceSignals) {
ndbout << "NdbQueryImpl::execTCKEYCONF(): returns:" << ret
<< ", m_pendingFrags=" << m_pendingFrags
- << ", *getRoot().m_resultStreams[0]="
- << *getRoot().m_resultStreams[0]
+ << ", rootStream= {" << rootFrag.getResultStream(0) << "}"
<< endl;
}
return ret;
@@ -2364,9 +2546,7 @@ NdbQueryImpl::prepareSend()
// Allocate space for ptrs to NdbResultStream and NdbRootFragment objects.
error = m_pointerAlloc.init(m_rootFragCount *
(SharedFragStack::pointersPerFragment +
- OrderedFragSet::pointersPerFragment +
- // Pointers to NdbResultStream objects.
- getNoOfOperations()));
+ OrderedFragSet::pointersPerFragment));
if (error != 0)
{
setErrorCode(error);
@@ -2377,21 +2557,22 @@ NdbQueryImpl::prepareSend()
getRoot().calculateBatchedRows(NULL);
getRoot().setBatchedRows(1);
- /** Calculate total amount of row buffer space for all operations and
- * fragments.*/
+ /**
+ * Calculate total amount of row buffer space for all operations and
+ * fragments.
+ */
Uint32 totalBuffSize = 0;
for (Uint32 opNo = 0; opNo < getNoOfOperations(); opNo++)
{
- NdbQueryOperationImpl& op = getQueryOperation(opNo);
-
- op.m_bufferSize = op.getRowSize() * op.getMaxBatchRows();
- totalBuffSize += op.m_bufferSize;
+ const NdbQueryOperationImpl& op = getQueryOperation(opNo);
+ totalBuffSize += (op.getRowSize() * op.getMaxBatchRows());
}
- /** Add one word per operation for buffer overrun check. We add a word
+ /**
+ * Add one word per ResultStream for buffer overrun check. We add a word
* rather than a byte to avoid possible alignment problems.
*/
- m_rowBufferAlloc.init(totalBuffSize * m_rootFragCount +
- sizeof(Uint32) * getNoOfOperations());
+ m_rowBufferAlloc.init(m_rootFragCount *
+ (totalBuffSize + (sizeof(Uint32) * getNoOfOperations())) );
if (getQueryDef().isScanQuery())
{
Uint32 totalRows = 0;
@@ -2406,15 +2587,28 @@ NdbQueryImpl::prepareSend()
return -1;
}
}
- // 1. Build receiver structures for each QueryOperation.
- // 2. Fill in parameters (into ATTRINFO) for QueryTree.
- // (Has to complete *after* ::prepareReceiver() as QueryTree params
- // refer receiver id's.)
- //
+
+ /**
+ * Allocate and initialize fragment state variables.
+ * Will also cause a ResultStream object containing a
+ * NdbReceiver to be constructed for each operation in QueryTree
+ */
+ m_rootFrags = new NdbRootFragment[m_rootFragCount];
+ if (m_rootFrags == NULL)
+ {
+ setErrorCode(Err_MemoryAlloc);
+ return -1;
+ }
+ for (Uint32 i = 0; i<m_rootFragCount; i++)
+ {
+ m_rootFrags[i].init(*this, i); // Set fragment number.
+ }
+
+ // Fill in parameters (into ATTRINFO) for QueryTree.
for (Uint32 i = 0; i < m_countOperations; i++) {
- int error;
- if (unlikely((error = m_operations[i].prepareReceiver()) != 0)
- || (error = m_operations[i].prepareAttrInfo(m_attrInfo)) != 0) {
+ const int error = m_operations[i].prepareAttrInfo(m_attrInfo);
+ if (unlikely(error))
+ {
setErrorCode(error);
return -1;
}
@@ -2450,23 +2644,6 @@ NdbQueryImpl::prepareSend()
return -1;
}
- /**
- * Allocate and initialize fragment state variables.
- */
- m_rootFrags = new NdbRootFragment[m_rootFragCount];
- if(m_rootFrags == NULL)
- {
- setErrorCode(Err_MemoryAlloc);
- return -1;
- }
- else
- {
- for(Uint32 i = 0; i<m_rootFragCount; i++)
- {
- m_rootFrags[i].init(*this, i); // Set fragment number.
- }
- }
-
if (getQueryDef().isScanQuery())
{
NdbRootFragment::buildReciverIdMap(m_rootFrags, m_rootFragCount);
@@ -2494,11 +2671,13 @@ class InitialReceiverIdIterator: public
{
public:
- InitialReceiverIdIterator(const NdbQueryImpl& query)
- :m_query(query),
+ InitialReceiverIdIterator(NdbRootFragment rootFrags[],
+ Uint32 cnt)
+ :m_rootFrags(rootFrags),
+ m_fragCount(cnt),
m_currFragNo(0)
{}
-
+
virtual ~InitialReceiverIdIterator() {};
/**
@@ -2519,8 +2698,11 @@ private:
* improving efficiency.
*/
static const Uint32 bufSize = 16;
- /** The query with the scan root operation that we list receiver ids for.*/
- const NdbQueryImpl& m_query;
+
+ /** Set of root fragments which we want to itterate receiver ids for.*/
+ NdbRootFragment* m_rootFrags;
+ const Uint32 m_fragCount;
+
/** The next fragment numnber to be processed. (Range for 0 to no of
* fragments.)*/
Uint32 m_currFragNo;
@@ -2530,25 +2712,25 @@ private:
const Uint32* InitialReceiverIdIterator::getNextWords(Uint32& sz)
{
- sz = 0;
/**
* For the initial batch, we want to retrieve one batch for each fragment
* whether it is a sorted scan or not.
*/
- if (m_currFragNo >= m_query.getRootFragCount())
+ if (m_currFragNo >= m_fragCount)
{
+ sz = 0;
return NULL;
}
else
{
- const NdbQueryOperationImpl& root = m_query.getQueryOperation(0U);
- while (sz < bufSize &&
- m_currFragNo < m_query.getRootFragCount())
+ Uint32 cnt = 0;
+ while (cnt < bufSize && m_currFragNo < m_fragCount)
{
- m_receiverIds[sz] = root.getReceiver(m_currFragNo).getId();
- sz++;
+ m_receiverIds[cnt] = m_rootFrags[m_currFragNo].getReceiverId();
+ cnt++;
m_currFragNo++;
}
+ sz = cnt;
return m_receiverIds;
}
}
@@ -2690,7 +2872,7 @@ NdbQueryImpl::doSend(int nodeId, bool la
* Section 2 : Optional KEYINFO section
*/
GenericSectionPtr secs[3];
- InitialReceiverIdIterator receiverIdIter(*this);
+ InitialReceiverIdIterator receiverIdIter(m_rootFrags, m_rootFragCount);
LinearSectionIterator attrInfoIter(m_attrInfo.addr(), m_attrInfo.getSize());
LinearSectionIterator keyInfoIter(m_keyInfo.addr(), m_keyInfo.getSize());
@@ -2831,28 +3013,11 @@ Remark:
int
NdbQueryImpl::sendFetchMore(NdbRootFragment& emptyFrag, bool forceSend)
{
- assert(getRoot().m_resultStreams!=NULL);
assert(!emptyFrag.finalBatchReceived());
assert(getQueryDef().isScanQuery());
- const Uint32 fragNo = emptyFrag.getFragNo();
emptyFrag.reset();
- for (unsigned opNo=0; opNo<m_countOperations; opNo++)
- {
- NdbResultStream& resultStream =
- getQueryOperation(opNo).getResultStream(fragNo);
-
- if (!resultStream.isSubScanComplete())
- {
- /**
- * Reset resultstream and all its descendants, since all these
- * streams will get a new set of rows in the next batch.
- */
- resultStream.reset();
- }
- }
-
Ndb& ndb = *getNdbTransaction().getNdb();
NdbApiSignal tSignal(&ndb);
tSignal.setSignal(GSN_SCAN_NEXTREQ, refToBlock(m_scanTransaction->m_tcRef));
@@ -2868,8 +3033,7 @@ NdbQueryImpl::sendFetchMore(NdbRootFragm
scanNextReq->transId2 = (Uint32) (transId >> 32);
tSignal.setLength(ScanNextReq::SignalLength);
- const uint32 receiverId =
- emptyFrag.getResultStream(0).getReceiver().m_tcPtrI;
+ const uint32 receiverId = emptyFrag.getReceiverTcPtrI();
LinearSectionIterator receiverIdIter(&receiverId ,1);
GenericSectionPtr secs[1];
@@ -3302,9 +3466,9 @@ NdbQueryImpl::OrderedFragSet::getEmpty()
bool
NdbQueryImpl::OrderedFragSet::verifySortOrder() const
{
- for(int i = 0; i<m_activeFragCount-2; i++)
+ for (int i = 0; i<m_activeFragCount-1; i++)
{
- if(compare(*m_activeFrags[i], *m_activeFrags[i+1]) < 0)
+ if (compare(*m_activeFrags[i], *m_activeFrags[i+1]) < 0)
{
assert(false);
return false;
@@ -3364,10 +3528,7 @@ NdbQueryOperationImpl::NdbQueryOperation
m_parent(NULL),
m_children(def.getNoOfChildOperations()),
m_maxBatchRows(0), // >0: User specified prefered value, ==0: Use default CFG values
- m_resultStreams(NULL),
m_params(),
- m_bufferSize(0),
- m_batchOverflowCheck(NULL),
m_resultBuffer(NULL),
m_resultRef(NULL),
m_isRowNull(true),
@@ -3416,10 +3577,10 @@ NdbQueryOperationImpl::NdbQueryOperation
NdbQueryOperationImpl::~NdbQueryOperationImpl()
{
- // We expect ::postFetchRelease to have deleted fetch related structures when fetch completed.
- // Either by fetching through last row, or calling ::close() which forcefully terminates fetch
- assert (m_batchOverflowCheck == NULL);
- assert (m_resultStreams == NULL);
+ /**
+ * We expect ::postFetchRelease to have deleted fetch related structures when fetch completed.
+ * Either by fetching through last row, or calling ::close() which forcefully terminates fetch
+ */
assert (m_firstRecAttr == NULL);
assert (m_interpretedCode == NULL);
} //NdbQueryOperationImpl::~NdbQueryOperationImpl()
@@ -3431,22 +3592,6 @@ NdbQueryOperationImpl::~NdbQueryOperatio
void
NdbQueryOperationImpl::postFetchRelease()
{
- // Buffer overrun check.
- assert(m_batchOverflowCheck==NULL || *m_batchOverflowCheck == 0xacbd1234);
- m_batchOverflowCheck = NULL;
-
- if (m_resultStreams != NULL)
- {
- for (int i = static_cast<int>(getQuery().getRootFragCount())-1; i >= 0; i--)
- {
- if (m_resultStreams[i] != NULL)
- {
- m_resultStreams[i]->~NdbResultStream();
- }
- }
- }
- m_resultStreams = NULL;
-
Ndb* const ndb = m_queryImpl.getNdbTransaction().getNdb();
NdbRecAttr* recAttr = m_firstRecAttr;
while (recAttr != NULL) {
@@ -3680,7 +3825,7 @@ NdbQueryOperationImpl::firstResult()
if (rootFrag != NULL)
{
- NdbResultStream& resultStream = *m_resultStreams[rootFrag->getFragNo()];
+ NdbResultStream& resultStream = rootFrag->getResultStream(*this);
if (resultStream.firstResult() != tupleNotFound)
{
fetchRow(resultStream);
@@ -3720,7 +3865,7 @@ NdbQueryOperationImpl::nextResult(bool f
const NdbRootFragment* rootFrag = m_queryImpl.m_applFrags.getCurrent();
if (rootFrag!=NULL)
{
- NdbResultStream& resultStream = *m_resultStreams[rootFrag->getFragNo()];
+ NdbResultStream& resultStream = rootFrag->getResultStream(*this);
if (resultStream.nextResult() != tupleNotFound)
{
fetchRow(resultStream);
@@ -3736,7 +3881,7 @@ NdbQueryOperationImpl::nextResult(bool f
void
NdbQueryOperationImpl::fetchRow(NdbResultStream& resultStream)
{
- const char* buff = resultStream.getReceiver().get_row();
+ const char* buff = resultStream.getReceiver().peek_row();
assert(buff!=NULL || (m_firstRecAttr==NULL && m_ndbRecord==NULL));
m_isRowNull = false;
@@ -4051,55 +4196,9 @@ NdbQueryOperationImpl::setBatchedRows(Ui
}
}
-
-int
-NdbQueryOperationImpl::prepareReceiver()
-{
- // Construct receiver streams and prepare them for receiving scan result
- assert(m_resultStreams==NULL);
- assert(m_queryImpl.getRootFragCount() > 0);
-
- m_resultStreams = reinterpret_cast<NdbResultStream**>
- (getQuery().getPointerAlloc().allocObjMem(m_queryImpl.getRootFragCount()));
-
- for(Uint32 i = 0; i<m_queryImpl.getRootFragCount(); i++) {
- m_resultStreams[i] = NULL; // Init to legal contents for d'tor
- }
- for(Uint32 i = 0; i<m_queryImpl.getRootFragCount(); i++) {
- m_resultStreams[i] = new (getQuery().m_resultStreamAlloc.allocObjMem(1))
- NdbResultStream(*this, i);
- const int error = m_resultStreams[i]->prepare();
- if (unlikely(error)) {
- return error;
- }
-
- m_resultStreams[i]->getReceiver().init(NdbReceiver::NDB_QUERY_OPERATION,
- false, this);
- char* const rowBuf = reinterpret_cast<char*>(getQuery().getRowBufferAlloc()
- .allocObjMem(m_bufferSize));
- m_resultStreams[i]->getReceiver()
- .do_setup_ndbrecord(m_ndbRecord,
- getMaxBatchRows(),
- 0 /*key_size*/,
- 0 /*read_range_no*/,
- getRowSize(),
- rowBuf);
- m_resultStreams[i]->getReceiver().prepareSend();
- }
- // So that we can test for for buffer overrun.
- m_batchOverflowCheck =
- reinterpret_cast<Uint32*>(getQuery().getRowBufferAlloc()
- .allocObjMem(sizeof(Uint32)));
- *m_batchOverflowCheck = 0xacbd1234;
- return 0;
-}//NdbQueryOperationImpl::prepareReceiver
-
int
NdbQueryOperationImpl::prepareAttrInfo(Uint32Buffer& attrInfo)
{
- // ::prepareReceiver() need to complete first:
- assert (m_resultStreams != NULL);
-
const NdbQueryOperationDefImpl& def = getQueryOperationDef();
/**
@@ -4553,10 +4652,12 @@ NdbQueryOperationImpl::prepareLookupKeyI
bool
NdbQueryOperationImpl::execTRANSID_AI(const Uint32* ptr, Uint32 len)
{
+ TupleCorrelation tupleCorrelation;
NdbRootFragment* rootFrag = m_queryImpl.m_rootFrags;
- Uint32 rootFragNo = 0;
+
if (getQueryDef().isScanQuery())
{
+ const CorrelationData correlData(ptr, len);
const Uint32 receiverId = CorrelationData(ptr, len).getRootReceiverId();
/** receiverId holds the Id of the receiver of the corresponding stream
@@ -4572,24 +4673,27 @@ NdbQueryOperationImpl::execTRANSID_AI(co
assert(false);
return false;
}
- rootFragNo = rootFrag->getFragNo();
+
+ // Extract tuple correlation.
+ tupleCorrelation = correlData.getTupleCorrelation();
+ len -= CorrelationData::wordCount;
}
+
if (traceSignals) {
ndbout << "NdbQueryOperationImpl::execTRANSID_AI()"
<< ", operation no: " << getQueryOperationDef().getQueryOperationIx()
- << ", fragment no: " << rootFragNo
+ << ", fragment no: " << rootFrag->getFragNo()
<< endl;
}
// Process result values.
- m_resultStreams[rootFragNo]->execTRANSID_AI(ptr, len);
-
+ rootFrag->getResultStream(*this).execTRANSID_AI(ptr, len, tupleCorrelation);
rootFrag->incrOutstandingResults(-1);
bool ret = false;
if (rootFrag->isFragBatchComplete())
{
- ret = m_queryImpl.handleBatchComplete(rootFragNo);
+ ret = m_queryImpl.handleBatchComplete(*rootFrag);
}
if (traceSignals) {
@@ -4631,7 +4735,6 @@ NdbQueryOperationImpl::execTCKEYREF(cons
}
}
- Uint32 rootFragNo = 0;
NdbRootFragment& rootFrag = getQuery().m_rootFrags[0];
if (ref->errorCode != DbspjErr::NodeFailure)
@@ -4656,13 +4759,12 @@ NdbQueryOperationImpl::execTCKEYREF(cons
bool ret = false;
if (rootFrag.isFragBatchComplete())
{
- ret = m_queryImpl.handleBatchComplete(rootFragNo);
+ ret = m_queryImpl.handleBatchComplete(rootFrag);
}
if (traceSignals) {
ndbout << "NdbQueryOperationImpl::execTCKEYREF(): returns:" << ret
- << ", *getRoot().m_resultStreams[0] {"
- << *getRoot().m_resultStreams[0] << "}"
+ << ", resultStream= {" << rootFrag.getResultStream(*this) << "}"
<< ", *this=" << *this << endl;
}
return ret;
@@ -4695,47 +4797,24 @@ NdbQueryOperationImpl::execSCAN_TABCONF(
return false;
}
rootFrag->setConfReceived();
+ rootFrag->setRemainingSubScans(nodeMask);
rootFrag->incrOutstandingResults(rowCount);
// Handle for SCAN_NEXTREQ, RNIL -> EOF
- NdbResultStream& resultStream = *m_resultStreams[rootFrag->getFragNo()];
+ NdbResultStream& resultStream = rootFrag->getResultStream(*this);
resultStream.getReceiver().m_tcPtrI = tcPtrI;
if(traceSignals){
- ndbout << " resultStream(root) {" << resultStream << "} fragNo"
- << rootFrag->getFragNo() << endl;
+ ndbout << " resultStream {" << rootFrag->getResultStream(*this)
+ << "} fragNo" << rootFrag->getFragNo()
+ << endl;
}
- const NdbQueryDefImpl& queryDef = m_queryImpl.getQueryDef();
- /* Mark each scan node to indicate if the current batch is the last in the
- * current sub-scan or not.
- */
- for (Uint32 opNo = 0; opNo < queryDef.getNoOfOperations(); opNo++)
- {
- const NdbQueryOperationImpl& op = m_queryImpl.getQueryOperation(opNo);
- /**
- * Find the node number seen by the SPJ block. Since a unique index
- * operation will have two distincts nodes in the tree used by the
- * SPJ block, this number may be different from 'opNo'.
- */
- const Uint32 internalOpNo = op.getQueryOperationDef().getQueryOperationId();
- assert(internalOpNo >= opNo);
- const bool complete = ((nodeMask >> internalOpNo) & 1) == 0;
-
- // Lookups should always be 'complete'
- assert(complete || op.getQueryOperationDef().isScanOperation());
- rootFrag->getResultStream(opNo).setSubScanCompletion(complete);
- }
- // Check that nodeMask does not have more bits than we have operations.
- assert(nodeMask >>
- (1+queryDef.getQueryOperation(queryDef.getNoOfOperations() - 1)
- .getQueryOperationId()) == 0);
-
bool ret = false;
if (rootFrag->isFragBatchComplete())
{
/* This fragment is now complete */
- ret = m_queryImpl.handleBatchComplete(rootFrag->getFragNo());
+ ret = m_queryImpl.handleBatchComplete(*rootFrag);
}
if (traceSignals) {
ndbout << "NdbQueryOperationImpl::execSCAN_TABCONF():, returns:" << ret
@@ -4871,13 +4950,6 @@ int NdbQueryOperationImpl::setBatchSize(
return 0;
}
-NdbResultStream&
-NdbQueryOperationImpl::getResultStream(Uint32 rootFragNo) const
-{
- assert(rootFragNo < getQuery().getRootFragCount());
- return *m_resultStreams[rootFragNo];
-}
-
bool
NdbQueryOperationImpl::hasInterpretedCode() const
{
@@ -4916,15 +4988,8 @@ NdbQueryOperationImpl::prepareInterprete
Uint32
NdbQueryOperationImpl::getIdOfReceiver() const {
- return m_resultStreams[0]->getReceiver().getId();
-}
-
-
-const NdbReceiver&
-NdbQueryOperationImpl::getReceiver(Uint32 recNo) const {
- assert(recNo<getQuery().getRootFragCount());
- assert(m_resultStreams!=NULL);
- return m_resultStreams[recNo]->getReceiver();
+ NdbRootFragment& rootFrag = m_queryImpl.m_rootFrags[0];
+ return rootFrag.getResultStream(*this).getReceiver().getId();
}
Uint32 NdbQueryOperationImpl::getRowSize() const
@@ -4934,6 +4999,12 @@ Uint32 NdbQueryOperationImpl::getRowSize
{
m_rowSize =
NdbReceiver::ndbrecord_rowsize(m_ndbRecord, m_firstRecAttr, 0, false);
+
+ const bool withCorrelation = getRoot().getQueryDef().isScanQuery();
+ if (withCorrelation)
+ {
+ m_rowSize += TupleCorrelation::wordCount*sizeof(Uint32);
+ }
}
return m_rowSize;
}
@@ -4953,7 +5024,8 @@ NdbOut& operator<<(NdbOut& out, const Nd
out << " m_queryImpl: " << &op.m_queryImpl;
out << " m_operationDef: " << &op.m_operationDef;
for(Uint32 i = 0; i<op.m_queryImpl.getRootFragCount(); i++){
- out << " m_resultStream[" << i << "]{" << *op.m_resultStreams[i] << "}";
+ NdbRootFragment& rootFrag = op.m_queryImpl.m_rootFrags[i];
+ out << " m_resultStream[" << i << "]{" << rootFrag.getResultStream(op) << "}";
}
out << " m_isRowNull " << op.m_isRowNull;
out << " ]";
=== modified file 'storage/ndb/src/ndbapi/NdbQueryOperationImpl.hpp'
--- a/storage/ndb/src/ndbapi/NdbQueryOperationImpl.hpp 2011-06-20 13:25:48 +0000
+++ b/storage/ndb/src/ndbapi/NdbQueryOperationImpl.hpp 2011-08-16 07:56:53 +0000
@@ -247,9 +247,15 @@ public:
Uint32 getRootFragCount() const
{ return m_rootFragCount; }
+ NdbBulkAllocator& getResultStreamAlloc()
+ { return m_resultStreamAlloc; }
+
NdbBulkAllocator& getTupleSetAlloc()
{ return m_tupleSetAlloc; }
+ NdbBulkAllocator& getRowBufferAlloc()
+ { return m_rowBufferAlloc; }
+
private:
/** Possible return values from NdbQueryImpl::awaitMoreResults.
* A subset of the integer values also matches those returned
@@ -577,13 +583,10 @@ private:
* the result.
* @return: 'true' if its time to resume appl. threads
*/
- bool handleBatchComplete(Uint32 rootFragNo);
+ bool handleBatchComplete(NdbRootFragment& rootFrag);
NdbBulkAllocator& getPointerAlloc()
{ return m_pointerAlloc; }
-
- NdbBulkAllocator& getRowBufferAlloc()
- { return m_rowBufferAlloc; }
}; // class NdbQueryImpl
@@ -705,10 +708,6 @@ public:
int setInterpretedCode(const NdbInterpretedCode& code);
bool hasInterpretedCode() const;
- NdbResultStream& getResultStream(Uint32 rootFragNo) const;
-
- const NdbReceiver& getReceiver(Uint32 rootFragNo) const;
-
/** Verify magic number.*/
bool checkMagicNumber() const
{ return m_magic == MAGIC; }
@@ -719,6 +718,12 @@ public:
Uint32 getMaxBatchRows() const
{ return m_maxBatchRows; }
+ /** Get size of row as required to buffer it. */
+ Uint32 getRowSize() const;
+
+ const NdbRecord* getNdbRecord() const
+ { return m_ndbRecord; }
+
private:
STATIC_CONST (MAGIC = 0xfade1234);
@@ -742,16 +747,9 @@ private:
/** Max rows (per resultStream) in a scan batch.*/
Uint32 m_maxBatchRows;
- /** For processing results from this operation (Array of).*/
- NdbResultStream** m_resultStreams;
/** Buffer for parameters in serialized format */
Uint32Buffer m_params;
- /** Buffer size allocated for *each* ResultStream/Receiver when
- * fetching results.*/
- Uint32 m_bufferSize;
- /** Used for checking if buffer overrun occurred. */
- Uint32* m_batchOverflowCheck;
/** User specified buffer for final storage of result.*/
char* m_resultBuffer;
/** User specified pointer to application pointer that should be
@@ -819,9 +817,6 @@ private:
Uint32 calculateBatchedRows(const NdbQueryOperationImpl* closestScan);
void setBatchedRows(Uint32 batchedRows);
- /** Construct and prepare receiver streams for result processing. */
- int prepareReceiver();
-
/** Prepare ATTRINFO for execution. (Add execution params++)
* @return possible error code.*/
int prepareAttrInfo(Uint32Buffer& attrInfo);
@@ -863,7 +858,6 @@ private:
bool diskInUserProjection() const
{ return m_diskInUserProjection; }
- Uint32 getRowSize() const;
}; // class NdbQueryOperationImpl
No bundle (reason: useless for push emails).
| Thread |
|---|
| • bzr push into mysql-5.1-telco-7.0-wl4124-new2 branch (pekka.nousiainen:4425to 4426) | Pekka Nousiainen | 17 Aug |