3214 Jan Wedvik 2010-08-06
This commit fixes a problem related to running queries with scans driving
lookups driving scans.
There was a problem (in the SPJ block) with the algorithm for
deciding when to start a non-root scan in a query tree with multiple scans.
When the SPJ block had received a complete batch for a scan, it would try to
start any scan that is either a direct child or a descendant via a chain of
lookup operations. But if the second scan is *not* a direct child, it may also
depend on results from the lookup operations. Therefore, one should wait until
a complete batch of the parent scan *and* all results from the chain of lookup
operations between the two scans has been received. The algorithm has now been
modified accordingly.
The problem could be reproduced by running the following SQL code (on a two
node cluster):
---------------8<------------------
create database spj_ndb;
use spj_ndb;
set ndb_join_pushdown = true;
create table t3 (pk int primary key, u int not null, a int, b int) engine=ndb;
create index ix3 on t3(b,a);
insert into t3 values (0,1,10,20);
insert into t3 values (1,2,20,30);
insert into t3 values (2,3,30,40);
select * from t3 as x join t3 as y join t3 as z on x.u=y.pk and y.a=z.b;
---------------8<------------------
The final query would erroneously return one row rather than two. On one node,
all lookups actually returning rows would be local and therefore finish before
the parent scan. While on the other node, there was a remote lookup. The result
of that lookup would arrive after the parent scan had finished, and the SPJ
block would therefore falsely assume that there was no need to run the second
scan.
modified:
storage/ndb/src/kernel/blocks/dbspj/Dbspj.hpp
storage/ndb/src/kernel/blocks/dbspj/DbspjMain.cpp
3213 Jan Wedvik 2010-08-06
This commit fixes a problems related to running queries with scans driving
lookups driving scans.
There was an error in the mechanism for marking those
NdbQueryOperations that had a scan decendant. The problem was that the
method for doing so (NdbQueryOperationDefImpl::markScanAncestors()) was
called before the links to parent operations had been established. It therefore
failed to traverse the chain of parents.
modified:
storage/ndb/src/ndbapi/NdbQueryBuilder.cpp
3212 Jan Wedvik 2010-07-15
Missing commit message that really applies to http://lists.mysql.com/commits/113698 :
This commit rewrites the mechanism for iterating over results for linked
operations with multiple scans such that it uses a top-down rather than bottom
up approach. We thus iterate linearly over the result tuples of the root
operation and match these with corrsponding descendant tuples. The motivation
for this change is:
* It simplifies handling of left outer join semantics.
* It (hopefully) simplifies the code in general.
* It should be easier to add support for "bushy" scan queries.
This commit contains a (preliminary) fix for for another problem: Non-root scan
operations would produce duplicate tuple identifiers for clusters with more
than one data node. This has been fixed by including the data node identifier in
the tuple id.
3211 Jan Wedvik 2010-07-15
Added more comments.
modified:
storage/ndb/src/ndbapi/NdbQueryBuilder.cpp
storage/ndb/src/ndbapi/NdbQueryOperation.cpp
=== modified file 'storage/ndb/src/kernel/blocks/dbspj/Dbspj.hpp'
--- a/storage/ndb/src/kernel/blocks/dbspj/Dbspj.hpp 2010-07-08 09:39:28 +0000
+++ b/storage/ndb/src/kernel/blocks/dbspj/Dbspj.hpp 2010-08-06 08:47:27 +0000
@@ -392,7 +392,8 @@ public:
* This function is called on the *child* by the *parent* when *parent*
* has completed a batch
*/
- void (Dbspj::*m_parent_batch_complete)(Signal*,Ptr<Request>,Ptr<TreeNode>);
+ void (Dbspj::*m_parent_batch_complete)(Signal*,Ptr<Request>,Ptr<TreeNode>,
+ Uint32 rowCount);
/**
* This function is called when getting a SCAN_NEXTREQ
@@ -427,6 +428,23 @@ public:
{
Uint32 m_api_resultRef;
Uint32 m_api_resultData;
+ /**
+ * This is the number of rows that we will ask for in this batch. This is
+ * equal to the number of rows that we will receive for the parent operation
+ * (in this batch). Until this number can be decided, this field will
+ * be set to 'invalidRowCount'.
+ */
+ Uint32 m_rows_requested;
+ /**
+ * This is the number of rows that we have received in this batch.
+ */
+ Uint32 m_rows_received;
+ /**
+ * This is the number of TCKEYREF messages received in this batch. When
+ * all lookup requests have been responded to, m_rows_requested should equal
+ * m_rows_received + m_rows_refused.
+ */
+ Uint32 m_rows_refused;
Uint32 m_lqhKeyReq[LqhKeyReq::FixedSignalLength + 4];
};
@@ -906,7 +924,8 @@ private:
Uint32 nodeFail(Signal*, Ptr<Request>, NdbNodeBitmask mask);
Uint32 createNode(Build_context&, Ptr<Request>, Ptr<TreeNode> &);
- void reportBatchComplete(Signal*, Ptr<Request>, Ptr<TreeNode>);
+ void reportBatchComplete(Signal*, Ptr<Request>, Ptr<TreeNode>,
+ Uint32 rowCount);
void releaseScanBuffers(Ptr<Request> requestPtr);
void releaseRequestBuffers(Ptr<Request> requestPtr, bool reset);
void releaseNodeRows(Ptr<Request> requestPtr, Ptr<TreeNode>);
@@ -1012,7 +1031,8 @@ private:
void lookup_execLQHKEYREF(Signal*, Ptr<Request>, Ptr<TreeNode>);
void lookup_execLQHKEYCONF(Signal*, Ptr<Request>, Ptr<TreeNode>);
void lookup_parent_row(Signal*, Ptr<Request>, Ptr<TreeNode>, const RowPtr &);
- void lookup_parent_batch_complete(Signal*, Ptr<Request>, Ptr<TreeNode>);
+ void lookup_parent_batch_complete(Signal*, Ptr<Request>, Ptr<TreeNode>,
+ Uint32 rowCount);
void lookup_abort(Signal*, Ptr<Request>, Ptr<TreeNode>);
Uint32 lookup_execNODE_FAILREP(Signal*signal, Ptr<Request>, Ptr<TreeNode>,
NdbNodeBitmask);
@@ -1039,7 +1059,8 @@ private:
void scanFrag_execSCAN_FRAGREF(Signal*, Ptr<Request>, Ptr<TreeNode>);
void scanFrag_execSCAN_FRAGCONF(Signal*, Ptr<Request>, Ptr<TreeNode>);
void scanFrag_parent_row(Signal*,Ptr<Request>,Ptr<TreeNode>, const RowPtr &);
- void scanFrag_parent_batch_complete(Signal*, Ptr<Request>, Ptr<TreeNode>);
+ void scanFrag_parent_batch_complete(Signal*, Ptr<Request>, Ptr<TreeNode>,
+ Uint32 rowCount);
void scanFrag_execSCAN_NEXTREQ(Signal*, Ptr<Request>,Ptr<TreeNode>);
void scanFrag_abort(Signal*, Ptr<Request>, Ptr<TreeNode>);
void scanFrag_cleanup(Ptr<Request>, Ptr<TreeNode>);
@@ -1064,7 +1085,8 @@ private:
void scanIndex_batchComplete(Signal* signal);
Uint32 scanIndex_findFrag(Local_ScanIndexFrag_list &, Ptr<ScanIndexFrag>&,
Uint32 fragId);
- void scanIndex_parent_batch_complete(Signal*, Ptr<Request>, Ptr<TreeNode>);
+ void scanIndex_parent_batch_complete(Signal*, Ptr<Request>, Ptr<TreeNode>,
+ Uint32 rowCount);
void scanIndex_execSCAN_NEXTREQ(Signal*, Ptr<Request>,Ptr<TreeNode>);
void scanIndex_complete(Signal*, Ptr<Request>, Ptr<TreeNode>);
void scanIndex_abort(Signal*, Ptr<Request>, Ptr<TreeNode>);
=== modified file 'storage/ndb/src/kernel/blocks/dbspj/DbspjMain.cpp'
--- a/storage/ndb/src/kernel/blocks/dbspj/DbspjMain.cpp 2010-07-15 12:59:31 +0000
+++ b/storage/ndb/src/kernel/blocks/dbspj/DbspjMain.cpp 2010-08-06 08:47:27 +0000
@@ -65,6 +65,7 @@
const Ptr<Dbspj::TreeNode> Dbspj::NullTreeNodePtr = { 0, RNIL };
const Dbspj::RowRef Dbspj::NullRowRef = { RNIL, GLOBAL_PAGE_SIZE_WORDS, 0 };
+static const Uint32 invalidRowCount = 0xffffffff;
/** A noop for now.*/
void Dbspj::execREAD_CONFIG_REQ(Signal* signal)
@@ -1404,7 +1405,7 @@ Dbspj::releaseRequestBuffers(Ptr<Request
void
Dbspj::reportBatchComplete(Signal * signal, Ptr<Request> requestPtr,
- Ptr<TreeNode> treeNodePtr)
+ Ptr<TreeNode> treeNodePtr, Uint32 rowCount)
{
LocalArenaPoolImpl pool(requestPtr.p->m_arena, m_dependency_map_pool);
Local_dependency_map list(pool, treeNodePtr.p->m_dependent_nodes);
@@ -1421,7 +1422,8 @@ Dbspj::reportBatchComplete(Signal * sign
childPtr.p->m_info->m_parent_batch_complete !=0 );
(this->*(childPtr.p->m_info->m_parent_batch_complete))(signal,
requestPtr,
- childPtr);
+ childPtr,
+ rowCount);
}
}
}
@@ -2490,6 +2492,9 @@ Dbspj::lookup_build(Build_context& ctx,
ctx.m_resultData = param->resultData;
treeNodePtr.p->m_lookup_data.m_api_resultRef = ctx.m_resultRef;
treeNodePtr.p->m_lookup_data.m_api_resultData = param->resultData;
+ treeNodePtr.p->m_lookup_data.m_rows_requested = invalidRowCount;
+ treeNodePtr.p->m_lookup_data.m_rows_received = 0;
+ treeNodePtr.p->m_lookup_data.m_rows_refused = 0;
/**
* Parse stuff common lookup/scan-frag
@@ -2786,6 +2791,24 @@ Dbspj::lookup_execTRANSID_AI(Signal* sig
ndbassert(requestPtr.p->m_lookup_node_data[Tnode] >= 1);
requestPtr.p->m_lookup_node_data[Tnode] -= 1;
+ treeNodePtr.p->m_lookup_data.m_rows_received++;
+
+ if (treeNodePtr.p->m_bits & TreeNode::T_REPORT_BATCH_COMPLETE
+ && treeNodePtr.p->m_lookup_data.m_rows_requested != invalidRowCount
+ && treeNodePtr.p->m_lookup_data.m_rows_requested ==
+ treeNodePtr.p->m_lookup_data.m_rows_received
+ + treeNodePtr.p->m_lookup_data.m_rows_refused)
+ {
+ jam();
+ // We have received all rows for this operation in this batch.
+ reportBatchComplete(signal, requestPtr, treeNodePtr,
+ treeNodePtr.p->m_lookup_data.m_rows_received);
+ // Prepare for next batch.
+ treeNodePtr.p->m_lookup_data.m_rows_requested = invalidRowCount;
+ treeNodePtr.p->m_lookup_data.m_rows_received = 0;
+ treeNodePtr.p->m_lookup_data.m_rows_refused = 0;
+ }
+
checkBatchComplete(signal, requestPtr, 1);
}
@@ -2880,6 +2903,24 @@ Dbspj::lookup_execLQHKEYREF(Signal* sign
ndbassert(requestPtr.p->m_lookup_node_data[Tnode] >= cnt);
requestPtr.p->m_lookup_node_data[Tnode] -= cnt;
+ treeNodePtr.p->m_lookup_data.m_rows_refused++;
+
+ if (treeNodePtr.p->m_bits & TreeNode::T_REPORT_BATCH_COMPLETE
+ && treeNodePtr.p->m_lookup_data.m_rows_requested != invalidRowCount
+ && treeNodePtr.p->m_lookup_data.m_rows_requested ==
+ treeNodePtr.p->m_lookup_data.m_rows_received
+ + treeNodePtr.p->m_lookup_data.m_rows_refused)
+ {
+ jam();
+ // We have received all rows for this operation in this batch.
+ reportBatchComplete(signal, requestPtr, treeNodePtr,
+ treeNodePtr.p->m_lookup_data.m_rows_received);
+ // Prepare for next batch.
+ treeNodePtr.p->m_lookup_data.m_rows_requested = invalidRowCount;
+ treeNodePtr.p->m_lookup_data.m_rows_received = 0;
+ treeNodePtr.p->m_lookup_data.m_rows_refused = 0;
+ }
+
checkBatchComplete(signal, requestPtr, cnt);
}
@@ -3048,7 +3089,8 @@ Dbspj::lookup_parent_row(Signal* signal,
void
Dbspj::lookup_parent_batch_complete(Signal* signal,
Ptr<Request> requestPtr,
- Ptr<TreeNode> treeNodePtr)
+ Ptr<TreeNode> treeNodePtr,
+ Uint32 rowCount)
{
jam();
@@ -3062,7 +3104,19 @@ Dbspj::lookup_parent_batch_complete(Sign
*/
ndbassert(treeNodePtr.p->m_bits & TreeNode::T_REPORT_BATCH_COMPLETE);
- reportBatchComplete(signal, requestPtr, treeNodePtr);
+ ndbassert(treeNodePtr.p->m_lookup_data.m_rows_requested == invalidRowCount);
+ treeNodePtr.p->m_lookup_data.m_rows_requested = rowCount;
+ if (treeNodePtr.p->m_lookup_data.m_rows_requested ==
+ treeNodePtr.p->m_lookup_data.m_rows_received +
+ treeNodePtr.p->m_lookup_data.m_rows_refused)
+ {
+ // The batch is complete for this operation as well.
+ reportBatchComplete(signal, requestPtr, treeNodePtr, rowCount);
+ // Prepare for next batch.
+ treeNodePtr.p->m_lookup_data.m_rows_requested = invalidRowCount;
+ treeNodePtr.p->m_lookup_data.m_rows_received = 0;
+ treeNodePtr.p->m_lookup_data.m_rows_refused = 0;
+ }
}
void
@@ -3584,7 +3638,8 @@ Dbspj::scanFrag_execTRANSID_AI(Signal* s
if (treeNodePtr.p->m_bits & TreeNode::T_REPORT_BATCH_COMPLETE)
{
jam();
- reportBatchComplete(signal, requestPtr, treeNodePtr);
+ reportBatchComplete(signal, requestPtr, treeNodePtr,
+ treeNodePtr.p->m_scanfrag_data.m_rows_received);
}
checkBatchComplete(signal, requestPtr, 1);
@@ -3654,7 +3709,8 @@ Dbspj::scanFrag_execSCAN_FRAGCONF(Signal
if (treeNodePtr.p->m_bits & TreeNode::T_REPORT_BATCH_COMPLETE)
{
jam();
- reportBatchComplete(signal, requestPtr, treeNodePtr);
+ reportBatchComplete(signal, requestPtr, treeNodePtr,
+ treeNodePtr.p->m_scanfrag_data.m_rows_received);
}
checkBatchComplete(signal, requestPtr, 1);
@@ -3710,7 +3766,8 @@ Dbspj::scanFrag_parent_row(Signal* signa
void
Dbspj::scanFrag_parent_batch_complete(Signal* signal,
Ptr<Request> requestPtr,
- Ptr<TreeNode> treeNodePtr)
+ Ptr<TreeNode> treeNodePtr,
+ Uint32 rowCount)
{
jam();
ndbrequire(false);
@@ -4363,7 +4420,8 @@ Dbspj::scanIndex_parent_row(Signal* sign
* We being a T_ONE_SHOT means that we're only be called
* with parent_row once, i.e batch is complete
*/
- scanIndex_parent_batch_complete(signal, requestPtr, treeNodePtr);
+ scanIndex_parent_batch_complete(signal, requestPtr, treeNodePtr,
+ data.m_rows_received);
}
return;
@@ -4412,7 +4470,8 @@ Dbspj::scanIndex_fixupBound(Ptr<ScanInde
void
Dbspj::scanIndex_parent_batch_complete(Signal* signal,
Ptr<Request> requestPtr,
- Ptr<TreeNode> treeNodePtr)
+ Ptr<TreeNode> treeNodePtr,
+ Uint32 rowCount)
{
jam();
@@ -4617,7 +4676,8 @@ Dbspj::scanIndex_execTRANSID_AI(Signal*
if (treeNodePtr.p->m_bits & TreeNode::T_REPORT_BATCH_COMPLETE)
{
jam();
- reportBatchComplete(signal, requestPtr, treeNodePtr);
+ reportBatchComplete(signal, requestPtr, treeNodePtr,
+ data.m_rows_received);
}
checkBatchComplete(signal, requestPtr, 1);
@@ -4682,7 +4742,8 @@ Dbspj::scanIndex_execSCAN_FRAGCONF(Signa
if (treeNodePtr.p->m_bits & TreeNode::T_REPORT_BATCH_COMPLETE)
{
jam();
- reportBatchComplete(signal, requestPtr, treeNodePtr);
+ reportBatchComplete(signal, requestPtr, treeNodePtr,
+ data.m_rows_received);
}
checkBatchComplete(signal, requestPtr, 1);
=== modified file 'storage/ndb/src/ndbapi/NdbQueryBuilder.cpp'
--- a/storage/ndb/src/ndbapi/NdbQueryBuilder.cpp 2010-07-15 17:51:57 +0000
+++ b/storage/ndb/src/ndbapi/NdbQueryBuilder.cpp 2010-08-06 08:44:14 +0000
@@ -830,6 +830,7 @@ NdbQueryBuilder::scanTable(const NdbDict
returnErrIf(op==0, Err_MemoryAlloc);
m_pimpl->m_operations.push_back(op);
+ op->markScanAncestors();
return &op->m_interface;
}
@@ -905,6 +906,7 @@ NdbQueryBuilder::scanIndex(const NdbDict
}
m_pimpl->m_operations.push_back(op);
+ op->markScanAncestors();
return &op->m_interface;
}
@@ -1723,6 +1725,8 @@ NdbQueryOperationDefImpl::addColumnRef(c
void NdbQueryOperationDefImpl::markScanAncestors()
{
+ // Verify that parent links have been established.
+ assert(m_ix == 0 || m_parent != NULL);
NdbQueryOperationDefImpl* operation = this;
do
{
@@ -2248,9 +2252,7 @@ NdbQueryScanOperationDefImpl::NdbQuerySc
const char* ident,
Uint32 ix)
: NdbQueryOperationDefImpl(table,options,ident,ix)
-{
- markScanAncestors();
-}
+{}
int
NdbQueryScanOperationDefImpl::serialize(Uint32Buffer& serializedDef,
Attachment: [text/bzr-bundle] bzr/jan.wedvik@sun.com-20100806084727-7lryn051n4220bjj.bundle
| Thread |
|---|
| • bzr push into mysql-5.1-telco-7.0-spj-scan-vs-scan branch (jan.wedvik:3211to 3214) | Jan Wedvik | 6 Aug |