3534 Ole John Aske 2011-08-16 [merge]
Merge
modified:
mysql-test/r/group_by.result
mysql-test/t/group_by.test
sql/sql_select.cc
storage/ndb/src/kernel/blocks/dbtux/DbtuxNode.cpp
storage/ndb/src/kernel/vm/ndbd_malloc_impl.cpp
storage/ndb/src/kernel/vm/ndbd_malloc_impl.hpp
storage/ndb/src/ndbapi/NdbQueryOperation.cpp
3533 Ole John Aske 2011-08-16 [merge]
Merge
modified:
storage/ndb/src/ndbapi/NdbQueryOperation.cpp
=== modified file 'mysql-test/r/group_by.result'
--- a/mysql-test/r/group_by.result 2010-10-29 08:23:06 +0000
+++ b/mysql-test/r/group_by.result 2011-08-16 10:20:19 +0000
@@ -1856,3 +1856,18 @@ COUNT(*)
2
DROP TABLE t1;
# End of 5.1 tests
+#
+# Bug#12798270: ASSERTION `!TAB->SORTED' FAILED IN JOIN_READ_KEY2
+#
+CREATE TABLE t1 (i int);
+INSERT INTO t1 VALUES (1);
+CREATE TABLE t2 (pk int PRIMARY KEY);
+INSERT INTO t2 VALUES (10);
+CREATE VIEW v1 AS SELECT t2.pk FROM t2;
+SELECT v1.pk
+FROM t1 LEFT JOIN v1 ON t1.i = v1.pk
+GROUP BY v1.pk;
+pk
+DROP VIEW v1;
+DROP TABLE t1,t2;
+# End of Bug#12798270
=== modified file 'mysql-test/t/group_by.test'
--- a/mysql-test/t/group_by.test 2010-10-29 08:23:06 +0000
+++ b/mysql-test/t/group_by.test 2011-08-16 10:20:19 +0000
@@ -1248,3 +1248,24 @@ ON 1 WHERE t2.f1 > 1 GROUP BY t2.f1;
DROP TABLE t1;
--echo # End of 5.1 tests
+
+--echo #
+--echo # Bug#12798270: ASSERTION `!TAB->SORTED' FAILED IN JOIN_READ_KEY2
+--echo #
+
+CREATE TABLE t1 (i int);
+INSERT INTO t1 VALUES (1);
+
+CREATE TABLE t2 (pk int PRIMARY KEY);
+INSERT INTO t2 VALUES (10);
+
+CREATE VIEW v1 AS SELECT t2.pk FROM t2;
+
+SELECT v1.pk
+FROM t1 LEFT JOIN v1 ON t1.i = v1.pk
+GROUP BY v1.pk;
+
+DROP VIEW v1;
+DROP TABLE t1,t2;
+
+--echo # End of Bug#12798270
=== modified file 'sql/sql_select.cc'
--- a/sql/sql_select.cc 2011-07-09 11:16:31 +0000
+++ b/sql/sql_select.cc 2011-08-16 13:33:13 +0000
@@ -6948,7 +6948,15 @@ make_join_readinfo(JOIN *join, ulonglong
(join->sort_by_table == (TABLE *) 1 && i != join->const_tables)))
ordered_set= 1;
+#ifdef MCP_BUG12798270
tab->sorted= sorted;
+#else
+ /*
+ For eq_ref there is at most one join match for each row from
+ previous tables so ordering is not useful.
+ */
+ tab->sorted= (tab->type != JT_EQ_REF) ? sorted : false;
+#endif
sorted= 0; // only first must be sorted
table->status=STATUS_NO_RECORD;
pick_table_access_method (tab);
=== modified file 'storage/ndb/src/kernel/blocks/dbtux/DbtuxNode.cpp'
--- a/storage/ndb/src/kernel/blocks/dbtux/DbtuxNode.cpp 2011-06-30 15:59:25 +0000
+++ b/storage/ndb/src/kernel/blocks/dbtux/DbtuxNode.cpp 2011-08-16 08:47:35 +0000
@@ -155,19 +155,26 @@ Dbtux::setNodePref(TuxCtx & ctx, NodeHan
{
const Frag& frag = node.m_frag;
const Index& index = *c_indexPool.getPtr(frag.m_indexId);
- KeyData prefKey(index.m_keySpec, false, 0);
- prefKey.set_buf(node.getPref(), index.m_prefBytes);
+ /*
+ * bug#12873640
+ * Node prefix exists if it has non-zero number of attributes. It is
+ * then a partial instance of KeyData. If the prefix does not exist
+ * then set_buf() could overwrite m_pageId1 in first entry, causing
+ * random crash in TUP via readKeyAttrs().
+ */
if (index.m_prefAttrs > 0) {
+ KeyData prefKey(index.m_keySpec, false, 0);
+ prefKey.set_buf(node.getPref(), index.m_prefBytes);
jam();
readKeyAttrs(ctx, frag, node.getEnt(0), prefKey, index.m_prefAttrs);
- }
#ifdef VM_TRACE
- if (debugFlags & DebugMaint) {
- debugOut << "setNodePref: " << node;
- debugOut << " " << prefKey.print(ctx.c_debugBuffer, DebugBufferBytes);
- debugOut << endl;
- }
+ if (debugFlags & DebugMaint) {
+ debugOut << "setNodePref: " << node;
+ debugOut << " " << prefKey.print(ctx.c_debugBuffer, DebugBufferBytes);
+ debugOut << endl;
+ }
#endif
+ }
}
// node operations
=== modified file 'storage/ndb/src/kernel/vm/ndbd_malloc_impl.cpp'
--- a/storage/ndb/src/kernel/vm/ndbd_malloc_impl.cpp 2011-02-01 23:27:25 +0000
+++ b/storage/ndb/src/kernel/vm/ndbd_malloc_impl.cpp 2011-08-16 08:27:14 +0000
@@ -39,6 +39,16 @@ static const char * f_method = "MSms";
#endif
#define MAX_CHUNKS 10
+#ifdef VM_TRACE
+#ifndef NDBD_RANDOM_START_PAGE
+#define NDBD_RANDOM_START_PAGE
+#endif
+#endif
+
+#ifdef NDBD_RANDOM_START_PAGE
+static Uint32 g_random_start_page_id = 0;
+#endif
+
/*
* For muti-threaded ndbd, these calls are used for locking around
* memory allocation operations.
@@ -224,6 +234,16 @@ Ndbd_mem_manager::Ndbd_mem_manager()
mt_mem_manager_init();
}
+void*
+Ndbd_mem_manager::get_memroot() const
+{
+#ifdef NDBD_RANDOM_START_PAGE
+ return (void*)(m_base_page - g_random_start_page_id);
+#else
+ return (void*)m_base_page;
+#endif
+}
+
/**
*
* resource 0 has following semantics:
@@ -359,6 +379,29 @@ Ndbd_mem_manager::init(Uint32 *watchCoun
}
#endif
+#ifdef NDBD_RANDOM_START_PAGE
+ /**
+ * In order to find bad users of page ids,
+ * we add a random offset to the page ids returned.
+ * However, due to ZONE_LO, that offset can't be very big
+ * (since at get_page we don't know whether it's a HI/LO page).
+ */
+ Uint32 max_rand_start = ZONE_LO_BOUND - 1;
+ if (max_rand_start > pages)
+ {
+ max_rand_start -= pages;
+ if (max_rand_start > 0x10000)
+ g_random_start_page_id = 0x10000 + (rand() % (max_rand_start - 0x10000));
+ else if (max_rand_start)
+ g_random_start_page_id = rand() % max_rand_start;
+
+ assert(Uint64(pages) + Uint64(g_random_start_page_id) <= 0xFFFFFFFF);
+
+ ndbout_c("using g_random_start_page_id: %u (%.8x)",
+ g_random_start_page_id, g_random_start_page_id);
+ }
+#endif
+
/**
* Do malloc
*/
@@ -670,7 +713,7 @@ Ndbd_mem_manager::alloc(AllocZone zone,
return;
* pages = save;
}
-
+
alloc_impl(ZONE_LO, ret, pages, min);
}
@@ -870,7 +913,12 @@ Ndbd_mem_manager::alloc_page(Uint32 type
check_resource_limits(m_resource_limit);
mt_mem_manager_unlock();
+#ifdef NDBD_RANDOM_START_PAGE
+ *i += g_random_start_page_id;
+ return m_base_page + *i - g_random_start_page_id;
+#else
return m_base_page + *i;
+#endif
}
}
mt_mem_manager_unlock();
@@ -885,7 +933,11 @@ Ndbd_mem_manager::release_page(Uint32 ty
mt_mem_manager_lock();
Resource_limit tot = m_resource_limit[0];
Resource_limit rl = m_resource_limit[idx];
-
+
+#ifdef NDBD_RANDOM_START_PAGE
+ i -= g_random_start_page_id;
+#endif
+
Uint32 sub = (rl.m_curr <= rl.m_min) ? 1 : 0; // Over min ?
release(i, 1);
m_resource_limit[0].m_curr = tot.m_curr - 1;
@@ -954,10 +1006,16 @@ Ndbd_mem_manager::alloc_pages(Uint32 typ
m_resource_limit[idx].m_curr = rl.m_curr + req;
check_resource_limits(m_resource_limit);
mt_mem_manager_unlock();
+#ifdef NDBD_RANDOM_START_PAGE
+ *i += g_random_start_page_id;
+#endif
return ;
}
mt_mem_manager_unlock();
* cnt = req;
+#ifdef NDBD_RANDOM_START_PAGE
+ *i += g_random_start_page_id;
+#endif
return;
}
@@ -969,7 +1027,11 @@ Ndbd_mem_manager::release_pages(Uint32 t
mt_mem_manager_lock();
Resource_limit tot = m_resource_limit[0];
Resource_limit rl = m_resource_limit[idx];
-
+
+#ifdef NDBD_RANDOM_START_PAGE
+ i -= g_random_start_page_id;
+#endif
+
release(i, cnt);
Uint32 currnew = rl.m_curr - cnt;
=== modified file 'storage/ndb/src/kernel/vm/ndbd_malloc_impl.hpp'
--- a/storage/ndb/src/kernel/vm/ndbd_malloc_impl.hpp 2011-07-09 11:16:31 +0000
+++ b/storage/ndb/src/kernel/vm/ndbd_malloc_impl.hpp 2011-08-16 13:33:13 +0000
@@ -68,7 +68,7 @@ public:
bool init(Uint32 *watchCounter, bool allow_alloc_less_than_requested = true);
void map(Uint32 * watchCounter, bool memlock = false, Uint32 resources[] = 0);
- void* get_memroot() const { return (void*)m_base_page;}
+ void* get_memroot() const;
void dump() const ;
=== modified file 'storage/ndb/src/ndbapi/NdbQueryOperation.cpp'
--- a/storage/ndb/src/ndbapi/NdbQueryOperation.cpp 2011-08-16 08:15:13 +0000
+++ b/storage/ndb/src/ndbapi/NdbQueryOperation.cpp 2011-08-16 13:33:13 +0000
@@ -87,6 +87,32 @@ const bool traceSignals = false;
* children. That way, results for child operations can be updated correctly
* when the application iterates over the results of the root scan operation.
*/
+class TupleCorrelation
+{
+public:
+ static const Uint32 wordCount = 1;
+
+ explicit TupleCorrelation()
+ : m_correlation((tupleNotFound<<16) | tupleNotFound)
+ {}
+
+ /** Conversion to/from Uint32 to store/fetch from buffers */
+ explicit TupleCorrelation(Uint32 val)
+ : m_correlation(val)
+ {}
+ Uint32 toUint32() const
+ { return m_correlation; }
+
+ Uint16 getTupleId() const
+ { return m_correlation & 0xffff;}
+
+ Uint16 getParentTupleId() const
+ { return m_correlation >> 16;}
+
+private:
+ Uint32 m_correlation;
+}; // class TupleCorrelation
+
class CorrelationData
{
public:
@@ -99,18 +125,15 @@ public:
assert(AttributeHeader(m_corrPart[0]).getAttributeId()
== AttributeHeader::CORR_FACTOR64);
assert(AttributeHeader(m_corrPart[0]).getByteSize() == 2*sizeof(Uint32));
- assert(getTupleId()<tupleNotFound);
- assert(getParentTupleId()<tupleNotFound);
+ assert(getTupleCorrelation().getTupleId()<tupleNotFound);
+ assert(getTupleCorrelation().getParentTupleId()<tupleNotFound);
}
Uint32 getRootReceiverId() const
{ return m_corrPart[2];}
- Uint16 getTupleId() const
- { return m_corrPart[1] & 0xffff;}
-
- Uint16 getParentTupleId() const
- { return m_corrPart[1] >> 16;}
+ const TupleCorrelation getTupleCorrelation() const
+ { return TupleCorrelation(m_corrPart[1]); }
private:
const Uint32* const m_corrPart;
@@ -166,6 +189,11 @@ public:
*/
void reset();
+ /**
+ * Prepare for reading another batch of results.
+ */
+ void prepareResultSet();
+
void incrOutstandingResults(Int32 delta)
{
m_outstandingResults += delta;
@@ -307,6 +335,9 @@ public:
Uint32 getRowCount() const
{ return m_rowCount; }
+ char* getRow(Uint32 tupleNo) const
+ { return (m_buffer + (tupleNo*m_rowSize)); }
+
/**
* Process an incomming tuple for this stream. Extract parent and own tuple
* ids and pass it on to m_receiver.
@@ -314,12 +345,15 @@ public:
* @param ptr buffer holding tuple.
* @param len buffer length.
*/
- void execTRANSID_AI(const Uint32 *ptr, Uint32 len);
+ void execTRANSID_AI(const Uint32 *ptr, Uint32 len,
+ TupleCorrelation correlation);
- /** A complete batch has been received for a fragment on this NdbResultStream,
- * Update whatever required before the appl. are allowed to navigate the result.
+ /**
+ * A complete batch has been received for a fragment on this NdbResultStream.
+ * Update whatever is required before the application is allowed to navigate the result.
+ * @return true if node and all its siblings have returned all rows.
*/
- void handleBatchComplete();
+ bool prepareResultSet();
/**
* Navigate within the current result batch to resp. first and next row.
@@ -364,12 +398,6 @@ public:
return m_subScanComplete;
}
- /** Variant of isSubScanComplete() above which checks that this resultstream
- * and all its descendants have consumed all batches of rows instantiated
- * from their parent operation(s).
- */
- bool isAllSubScansComplete() const;
-
bool isScanQuery() const
{ return (m_properties & Is_Scan_Query); }
@@ -420,7 +448,7 @@ public:
* that had no matching children.*/
Bitmask<(NDB_SPJ_MAX_TREE_NODES+31)/32> m_hasMatchingChild;
- explicit TupleSet()
+ explicit TupleSet() : m_hash_head(tupleNotFound)
{}
private:
@@ -452,9 +480,14 @@ private:
/** The receiver object that unpacks transid_AI messages.*/
NdbReceiver m_receiver;
+ /** The buffers which we receive the results into */
+ char* m_buffer;
+
/** Used for checking if buffer overrun occurred. */
Uint32* m_batchOverflowCheck;
+ Uint32 m_rowSize;
+
/** The number of transid_AI messages received.*/
Uint32 m_rowCount;
@@ -489,11 +522,7 @@ private:
/** TupleSet contains the correlation between parent/childs */
TupleSet* m_tupleSet;
- void clearTupleSet();
-
- void setParentChildMap(Uint16 parentId,
- Uint16 tupleId,
- Uint16 tupleNo);
+ void buildResultCorrelations();
Uint16 getTupleId(Uint16 tupleNo) const
{ return (m_tupleSet) ? m_tupleSet[tupleNo].m_tupleId : 0; }
@@ -576,7 +605,9 @@ NdbResultStream::NdbResultStream(NdbQuer
| (operation.getQueryOperationDef().getMatchType() != NdbQueryOptions::MatchAll
? Is_Inner_Join : 0))),
m_receiver(operation.getQuery().getNdbTransaction().getNdb()),
+ m_buffer(NULL),
m_batchOverflowCheck(NULL),
+ m_rowSize(0),
m_rowCount(0),
m_iterState(Iter_notStarted),
m_currentRow(tupleNotFound),
@@ -599,6 +630,8 @@ NdbResultStream::prepare()
const Uint32 rowSize = m_operation.getRowSize();
NdbQueryImpl &query = m_operation.getQuery();
+ m_rowSize = rowSize;
+
/* Parent / child correlation is only relevant for scan type queries
* Don't create m_parentTupleId[] and m_childTupleIdx[] for lookups!
* Neither is these structures required for operations not having respective
@@ -610,16 +643,13 @@ NdbResultStream::prepare()
m_tupleSet =
new (query.getTupleSetAlloc().allocObjMem(m_maxRows))
TupleSet[m_maxRows];
-
- clearTupleSet();
}
else
m_maxRows = 1;
const int bufferSize = rowSize * m_maxRows;
NdbBulkAllocator& bufferAlloc = query.getRowBufferAlloc();
- char* const rowBuf = reinterpret_cast<char*>(bufferAlloc
- .allocObjMem(bufferSize));
+ m_buffer = reinterpret_cast<char*>(bufferAlloc.allocObjMem(bufferSize));
// So that we can test for buffer overrun.
m_batchOverflowCheck =
@@ -633,7 +663,7 @@ NdbResultStream::prepare()
0 /*key_size*/,
0 /*read_range_no*/,
rowSize,
- rowBuf);
+ m_buffer);
} //NdbResultStream::prepare
void
@@ -646,7 +676,6 @@ NdbResultStream::reset()
m_iterState = Iter_notStarted;
m_currentRow = tupleNotFound;
- clearTupleSet();
m_receiver.prepareSend();
/**
* If this stream will get new rows in the next batch, then so will
@@ -660,84 +689,10 @@ NdbResultStream::reset()
}
} //NdbResultStream::reset
-void
-NdbResultStream::clearTupleSet()
-{
- assert (isScanQuery());
- for (Uint32 i=0; i<m_maxRows; i++)
- {
- m_tupleSet[i].m_parentId = tupleNotFound;
- m_tupleSet[i].m_tupleId = tupleNotFound;
- m_tupleSet[i].m_hash_head = tupleNotFound;
- m_tupleSet[i].m_skip = false;
- m_tupleSet[i].m_hasMatchingChild.clear();
- }
-}
-
-bool
-NdbResultStream::isAllSubScansComplete() const
-{
- // Lookups should always be 'complete'
- assert(m_subScanComplete || isScanResult());
-
- if (!m_subScanComplete)
- return false;
-
- for (Uint32 childNo = 0; childNo < m_operation.getNoOfChildOperations();
- childNo++)
- {
- const NdbQueryOperationImpl& child = m_operation.getChildOperation(childNo);
- const NdbResultStream& childStream = m_rootFrag.getResultStream(child);
- if (!childStream.isAllSubScansComplete())
- return false;
- }
- return true;
-} //NdbResultStream::isAllSubScansComplete
-
-
-void
-NdbResultStream::setParentChildMap(Uint16 parentId,
- Uint16 tupleId,
- Uint16 tupleNo)
-{
- assert (isScanQuery());
- assert (tupleNo < m_maxRows);
- assert (tupleId != tupleNotFound);
-
- for (Uint32 i = 0; i < tupleNo; i++)
- {
- // Check that tuple id is unique.
- assert (m_tupleSet[i].m_tupleId != tupleId);
- }
- m_tupleSet[tupleNo].m_parentId = parentId;
- m_tupleSet[tupleNo].m_tupleId = tupleId;
-
- const Uint16 hash = (parentId % m_maxRows);
- if (parentId == tupleNotFound)
- {
- /* Root stream: Insert sequentially in hash_next to make it
- * possible to use ::findTupleWithParentId() and ::findNextTuple()
- * to navigate even the root operation.
- */
- assert (m_parent==NULL);
- /* Link into m_hash_next in order to let ::findNextTuple() navigate correctly */
- if (tupleNo==0)
- m_tupleSet[hash].m_hash_head = 0;
- else
- m_tupleSet[tupleNo-1].m_hash_next = tupleNo;
- m_tupleSet[tupleNo].m_hash_next = tupleNotFound;
- }
- else
- {
- /* Insert parentId in HashMap */
- m_tupleSet[tupleNo].m_hash_next = m_tupleSet[hash].m_hash_head;
- m_tupleSet[hash].m_hash_head = tupleNo;
- }
-}
-
/** Locate, and return 'tupleNo', of first tuple with specified parentId.
* parentId == tupleNotFound is use as a special value for iterating results
- * from the root operation in the order which they was inserted by ::setParentChildMap()
+ * from the root operation in the order in which they were inserted by
+ * ::buildResultCorrelations()
*
* Position of 'currentRow' is *not* updated and should be modified by callee
* if it want to keep the new position.
@@ -843,99 +798,188 @@ NdbResultStream::nextResult()
} //NdbResultStream::nextResult()
+/**
+ * Callback when a TRANSID_AI signal (receive row) is processed.
+ */
void
-NdbResultStream::execTRANSID_AI(const Uint32 *ptr, Uint32 len)
+NdbResultStream::execTRANSID_AI(const Uint32 *ptr, Uint32 len,
+ TupleCorrelation correlation)
{
- assert(m_iterState == Iter_notStarted);
+ m_receiver.execTRANSID_AI(ptr, len);
+ m_rowCount++;
+
if (isScanQuery())
{
- const CorrelationData correlData(ptr, len);
-
- assert(m_rootFrag.getResultStream(0).m_receiver.getId() ==
- correlData.getRootReceiverId());
-
- m_receiver.execTRANSID_AI(ptr, len - CorrelationData::wordCount);
-
/**
- * Keep correlation data between parent and child tuples.
- * Since tuples may arrive in any order, we cannot match
- * parent and child until all tuples (for this batch and
- * root fragment) have arrived.
+ * Store TupleCorrelation as a hidden value immediately after the received row
+ * (NdbQueryOperationImpl::getRowSize() has reserved space for it)
*/
- setParentChildMap(m_parent==NULL
- ? tupleNotFound
- : correlData.getParentTupleId(),
- correlData.getTupleId(),
- m_rowCount);
+ Uint32* row_recv = reinterpret_cast<Uint32*>(m_receiver.m_record.m_row);
+ row_recv[-1] = correlation.toUint32();
}
- else
- {
- // Lookup query.
- m_receiver.execTRANSID_AI(ptr, len);
- }
- m_rowCount++;
- /* Set correct #rows received in the NdbReceiver.
- */
- getReceiver().m_result_rows = getRowCount();
} // NdbResultStream::execTRANSID_AI()
-
/**
- * A fresh batch of results has arrived for this ResultStream (and all its parent / childs)
- * Filter away any result rows which should not be visible (yet) - Either due to incomplete
- * child batches, or the join being an 'inner join'.
- * Set result itterator state to 'before first' resultrow.
+ * Make preparations for another batch of result to be read:
+ * - Fill in parent/child result correlations in m_tupleSet[]
+ * - ... or reset m_tupleSet[] if we reuse the previous.
+ * - Apply inner/outer join filtering to remove non qualifying
+ * rows.
*/
-void
-NdbResultStream::handleBatchComplete()
+bool
+NdbResultStream::prepareResultSet()
{
- // Buffer overrun check.
- assert(m_batchOverflowCheck==NULL || *m_batchOverflowCheck==0xacbd1234);
+ bool isComplete = isSubScanComplete(); //Childs with more rows
+ assert(isComplete || isScanResult()); //Lookups always 'complete'
+
+ // Set correct #rows received in the NdbReceiver.
+ getReceiver().m_result_rows = getRowCount();
- for (Uint32 tupleNo=0; tupleNo<getRowCount(); tupleNo++)
+ /**
+ * Prepare NdbResultStream for reading - either the next batch received
+ * from the datanodes, or reuse the current one.
+ */
+ if (m_tupleSet!=NULL)
{
- m_tupleSet[tupleNo].m_skip = false;
+ const bool newResults = (m_iterState!=Iter_finished);
+ if (newResults)
+ {
+ buildResultCorrelations();
+ }
+ else
+ {
+ // Makes all rows in 'TupleSet' available (clear 'm_skip' flag)
+ for (Uint32 tupleNo=0; tupleNo<getRowCount(); tupleNo++)
+ {
+ m_tupleSet[tupleNo].m_skip = false;
+ }
+ }
}
+ /**
+ * Recursively iterate all child results depth first.
+ * Filter away any result rows which should not be visible (yet) -
+ * Either due to incomplete child batches, or the join being an 'inner join'.
+ * Set result iterator state to 'before first' result row.
+ */
for (Uint32 childNo=0; childNo < m_operation.getNoOfChildOperations(); childNo++)
{
const NdbQueryOperationImpl& child = m_operation.getChildOperation(childNo);
NdbResultStream& childStream = m_rootFrag.getResultStream(child);
- childStream.handleBatchComplete();
+ const bool allSubScansComplete = childStream.prepareResultSet();
+
+ Uint32 childId = child.getQueryOperationDef().getQueryOperationIx();
- const bool isInnerJoin = childStream.isInnerJoin();
- const bool allSubScansComplete = childStream.isAllSubScansComplete();
+ /* Conditions 1) & 2) are calculated outside the loop; see comments further below: */
+ const bool skipNonMatches = !allSubScansComplete || // 1)
+ childStream.isInnerJoin(); // 2)
- for (Uint32 tupleNo=0; tupleNo<getRowCount(); tupleNo++)
+ if (m_tupleSet!=NULL)
{
- if (!m_tupleSet[tupleNo].m_skip)
+ for (Uint32 tupleNo=0; tupleNo<getRowCount(); tupleNo++)
{
- Uint16 tupleId = getTupleId(tupleNo);
- if (childStream.findTupleWithParentId(tupleId)!=tupleNotFound)
- m_tupleSet[tupleNo].m_hasMatchingChild.set(childNo);
-
- /////////////////////////////////
- // No child matched for this row. Making parent row visible
- // will cause a NULL (outer join) row to be produced.
- // Skip NULL row production when:
- // 1) Some child batches are not complete; they may contain later matches.
- // 2) A match was found in a previous batch.
- // 3) Join type is 'inner join', skip as no child are matching.
- //
- else if (!allSubScansComplete // 1)
- || m_tupleSet[tupleNo].m_hasMatchingChild.get(childNo) // 2)
- || isInnerJoin) // 3)
- m_tupleSet[tupleNo].m_skip = true;
+ if (!m_tupleSet[tupleNo].m_skip)
+ {
+ Uint16 tupleId = getTupleId(tupleNo);
+ if (childStream.findTupleWithParentId(tupleId)!=tupleNotFound)
+ m_tupleSet[tupleNo].m_hasMatchingChild.set(childId);
+
+ /////////////////////////////////
+ // No child matched for this row. Making parent row visible
+ // will cause a NULL (outer join) row to be produced.
+ // Skip NULL row production when:
+ // 1) Some child batches are not complete; they may contain later matches.
+ // 2) Join type is 'inner join', skip as no children are matching.
+ // 3) A match was found in a previous batch.
+ // Conditions 1) & 2) above are precalculated in 'bool skipNonMatches'
+ //
+ else if (skipNonMatches // 1 & 2)
+ || m_tupleSet[tupleNo].m_hasMatchingChild.get(childId)) // 3)
+ m_tupleSet[tupleNo].m_skip = true;
+ }
}
}
+ isComplete &= allSubScansComplete;
}
- m_currentRow = tupleNotFound;
+
+ // Set current position 'before first'
m_iterState = Iter_notStarted;
-} // NdbResultStream::handleBatchComplete()
+ m_currentRow = tupleNotFound;
+
+ return isComplete;
+} // NdbResultStream::prepareResultSet()
+
+
+/**
+ * Fill m_tupleSet[] with correlation data between parent
+ * and child tuples. The 'TupleCorrelation' is stored as
+ * an extra Uint32 after each row received
+ * by execTRANSID_AI().
+ *
+ * NOTE: In order to reduce work done when holding the
+ * transporter mutex, the 'TupleCorrelation' is only stored
+ * in the buffer when it arrives. Later (here) we build the
+ * correlation hashMap immediately before we prepare to
+ * read the NdbResultStream.
+ */
+void
+NdbResultStream::buildResultCorrelations()
+{
+ // Buffer overrun check.
+ assert(m_batchOverflowCheck==NULL || *m_batchOverflowCheck==0xacbd1234);
+
+//if (m_tupleSet!=NULL)
+ {
+ /* Clear the hashmap structures */
+ for (Uint32 i=0; i<m_maxRows; i++)
+ {
+ m_tupleSet[i].m_hash_head = tupleNotFound;
+ }
+
+ /* Rebuild correlation & hashmap from received buffers */
+ for (Uint32 tupleNo=0; tupleNo<m_rowCount; tupleNo++)
+ {
+ const Uint32* row = (Uint32*)getRow(tupleNo+1);
+ const TupleCorrelation correlation(row[-1]);
+
+ const Uint16 tupleId = correlation.getTupleId();
+ const Uint16 parentId = (m_parent!=NULL)
+ ? correlation.getParentTupleId()
+ : tupleNotFound;
+
+ m_tupleSet[tupleNo].m_skip = false;
+ m_tupleSet[tupleNo].m_parentId = parentId;
+ m_tupleSet[tupleNo].m_tupleId = tupleId;
+ m_tupleSet[tupleNo].m_hasMatchingChild.clear();
+
+ /* Insert into parentId-hashmap */
+ const Uint16 hash = (parentId % m_maxRows);
+ if (m_parent==NULL)
+ {
+ /* Root stream: Insert sequentially in hash_next to make it
+ * possible to use ::findTupleWithParentId() and ::findNextTuple()
+ * to navigate even the root operation.
+ */
+ /* Link into m_hash_next in order to let ::findNextTuple() navigate correctly */
+ if (tupleNo==0)
+ m_tupleSet[hash].m_hash_head = tupleNo;
+ else
+ m_tupleSet[tupleNo-1].m_hash_next = tupleNo;
+ m_tupleSet[tupleNo].m_hash_next = tupleNotFound;
+ }
+ else
+ {
+ /* Insert parentId in HashMap */
+ m_tupleSet[tupleNo].m_hash_next = m_tupleSet[hash].m_hash_head;
+ m_tupleSet[hash].m_hash_head = tupleNo;
+ }
+ }
+ }
+} // NdbResultStream::buildResultCorrelations
///////////////////////////////////////////
-///////// NdbRootFragment methods ///////////
+//////// NdbRootFragment methods /////////
///////////////////////////////////////////
void NdbRootFragment::buildReciverIdMap(NdbRootFragment* frags,
Uint32 noOfFrags)
@@ -1056,6 +1100,16 @@ void NdbRootFragment::reset()
m_confReceived = false;
}
+void NdbRootFragment::prepareResultSet()
+{
+ NdbResultStream& rootStream = getResultStream(0);
+ rootStream.prepareResultSet();
+
+ /* Position at the first (sorted?) row available from this fragment.
+ */
+ rootStream.firstResult();
+}
+
void NdbRootFragment::setConfReceived()
{
/* For a query with a lookup root, there may be more than one TCKEYCONF
@@ -2120,6 +2174,7 @@ NdbQueryImpl::awaitMoreResults(bool forc
NdbRootFragment* frag;
while ((frag=m_fullFrags.pop()) != NULL)
{
+ frag->prepareResultSet();
m_applFrags.add(*frag);
}
if (m_applFrags.getCurrent() != NULL)
@@ -2174,6 +2229,7 @@ NdbQueryImpl::awaitMoreResults(bool forc
NdbRootFragment* frag;
if ((frag=m_fullFrags.pop()) != NULL)
{
+ frag->prepareResultSet();
m_applFrags.add(*frag);
}
assert(m_fullFrags.pop()==NULL); // Only one stream for lookups.
@@ -2213,7 +2269,6 @@ NdbQueryImpl::handleBatchComplete(NdbRoo
<< ", finalBatchFrags=" << m_finalBatchFrags
<< endl;
}
- bool resume = false;
/* May received fragment data after a SCANREF() (timeout?)
* terminated the scan. We are about to close this query,
@@ -2221,7 +2276,6 @@ NdbQueryImpl::handleBatchComplete(NdbRoo
*/
if (likely(m_fullFrags.m_errorCode == 0))
{
- NdbQueryOperationImpl& root = getRoot();
assert(rootFrag.isFragBatchComplete());
assert(m_pendingFrags > 0); // Check against underflow.
@@ -2234,34 +2288,14 @@ NdbQueryImpl::handleBatchComplete(NdbRoo
assert(m_finalBatchFrags <= m_rootFragCount);
}
- if (getQueryDef().isScanQuery())
- {
- // Only required for scans
- rootFrag.getResultStream(root).handleBatchComplete();
-
- // Only ordered scans has to wait until all pending completed
- resume = (m_pendingFrags==0) ||
- (root.m_ordering==NdbQueryOptions::ScanOrdering_unordered);
- }
- else
- {
- assert(rootFrag.getResultStream(root).getReceiver().m_tcPtrI==RNIL);
- assert(m_finalBatchFrags==1);
- assert(m_pendingFrags==0); // Lookup query should be complete now.
- resume = true;
- }
-
- /* Position at the first (sorted?) row available from this fragments.
- */
- rootFrag.getResultStream(root).firstResult();
-
/* When application thread ::awaitMoreResults() it will later be moved
* from m_fullFrags to m_applFrags under mutex protection.
*/
m_fullFrags.push(rootFrag);
+ return true;
}
- return resume;
+ return false;
} // NdbQueryImpl::handleBatchComplete
int
@@ -4619,10 +4653,12 @@ NdbQueryOperationImpl::prepareLookupKeyI
bool
NdbQueryOperationImpl::execTRANSID_AI(const Uint32* ptr, Uint32 len)
{
+ TupleCorrelation tupleCorrelation;
NdbRootFragment* rootFrag = m_queryImpl.m_rootFrags;
- Uint32 rootFragNo = 0;
+
if (getQueryDef().isScanQuery())
{
+ const CorrelationData correlData(ptr, len);
const Uint32 receiverId = CorrelationData(ptr, len).getRootReceiverId();
/** receiverId holds the Id of the receiver of the corresponding stream
@@ -4638,18 +4674,21 @@ NdbQueryOperationImpl::execTRANSID_AI(co
assert(false);
return false;
}
- rootFragNo = rootFrag->getFragNo();
+
+ // Extract tuple correlation.
+ tupleCorrelation = correlData.getTupleCorrelation();
+ len -= CorrelationData::wordCount;
}
+
if (traceSignals) {
ndbout << "NdbQueryOperationImpl::execTRANSID_AI()"
<< ", operation no: " << getQueryOperationDef().getQueryOperationIx()
- << ", fragment no: " << rootFragNo
+ << ", fragment no: " << rootFrag->getFragNo()
<< endl;
}
// Process result values.
- rootFrag->getResultStream(*this).execTRANSID_AI(ptr, len);
-
+ rootFrag->getResultStream(*this).execTRANSID_AI(ptr, len, tupleCorrelation);
rootFrag->incrOutstandingResults(-1);
bool ret = false;
@@ -4985,6 +5024,12 @@ Uint32 NdbQueryOperationImpl::getRowSize
{
m_rowSize =
NdbReceiver::ndbrecord_rowsize(m_ndbRecord, m_firstRecAttr, 0, false);
+
+ const bool withCorrelation = getRoot().getQueryDef().isScanQuery();
+ if (withCorrelation)
+ {
+ m_rowSize += TupleCorrelation::wordCount*sizeof(Uint32);
+ }
}
return m_rowSize;
}
No bundle (reason: useless for push emails).
| Thread |
|---|
| • bzr push into mysql-5.1-telco-7.0-spj-scan-vs-scan branch(ole.john.aske:3533 to 3534) | Ole John Aske | 17 Aug |