List:Commits« Previous MessageNext Message »
From:Jan Wedvik Date:August 6 2010 8:48am
Subject:bzr push into mysql-5.1-telco-7.0-spj-scan-vs-scan branch (jan.wedvik:3211
to 3214)
View as plain text  
 3214 Jan Wedvik	2010-08-06
      This commit fixes a problem related to running queries with scans driving 
      lookups driving scans.
      
      There was a problem (in the SPJ block) with the algorithm for 
      deciding when to start a non-root scan in a query tree with multiple scans.
      When the SPJ block had received a complete batch for a scan, it would try to 
      start any scan that is either a direct child or a descendant via a chain of 
      lookup operations. But if the second scan is *not* a direct child, it may also
      depend on results from the lookup operations. Therefore, one should wait until
      a complete batch of the parent scan *and* all results from the chain of lookup 
      operations between the two scans has been received. The algorithm has now been
      modified accordingly.
      
      The problem could be reproduced by running the following SQL code (on a two 
      node cluster):
      
      ---------------8<------------------
      create database spj_ndb;
      use spj_ndb;
      set ndb_join_pushdown = true;
      
      create table t3 (pk int primary key, u int not null, a int, b int) engine=ndb;
      create index ix3 on t3(b,a);
      
      insert into t3 values (0,1,10,20);
      insert into t3 values (1,2,20,30);
      insert into t3 values (2,3,30,40);
      
      select * from t3 as x join t3 as y join t3 as z on x.u=y.pk and y.a=z.b;
      ---------------8<------------------
      
      The final query would erroneously return one row rather than two. On one node, 
      all lookups actually returning rows would be local and therefore finish before 
      the parent scan. While on the other node, there was a remote lookup. The result
      of that lookup would arrive after the parent scan had finished, and the SPJ
      block would therefore falsely assume that there was no need to run the second
      scan.

    modified:
      storage/ndb/src/kernel/blocks/dbspj/Dbspj.hpp
      storage/ndb/src/kernel/blocks/dbspj/DbspjMain.cpp
 3213 Jan Wedvik	2010-08-06
      This commit fixes a problems related to running queries with scans driving 
      lookups driving scans.
      
      There was an error in the mechanism for marking those 
      NdbQueryOperations that had a scan decendant. The problem was that the 
      method for doing so (NdbQueryOperationDefImpl::markScanAncestors()) was
      called before the links to parent operations had been established. It therefore
      failed to traverse the chain of parents.

    modified:
      storage/ndb/src/ndbapi/NdbQueryBuilder.cpp
 3212 Jan Wedvik	2010-07-15
      Missing commit message that really applies to http://lists.mysql.com/commits/113698 :
      
      This commit rewrites the mechanism for iterating over results for linked 
      operations with multiple scans such that it uses a top-down rather than bottom 
      up approach. We thus iterate linearly over the result tuples of the root 
      operation and match these with corrsponding descendant tuples. The motivation 
      for this change is:
      * It simplifies handling of left outer join semantics.
      * It (hopefully) simplifies the code in general.
      * It should be easier to add support for "bushy" scan queries.
      
      This commit contains a (preliminary) fix for for another problem: Non-root scan 
      operations would produce duplicate tuple identifiers for clusters with more
      than one data node. This has been fixed by including the data node identifier in
      the tuple id.

 3211 Jan Wedvik	2010-07-15
      Added more comments.

    modified:
      storage/ndb/src/ndbapi/NdbQueryBuilder.cpp
      storage/ndb/src/ndbapi/NdbQueryOperation.cpp
=== modified file 'storage/ndb/src/kernel/blocks/dbspj/Dbspj.hpp'
--- a/storage/ndb/src/kernel/blocks/dbspj/Dbspj.hpp	2010-07-08 09:39:28 +0000
+++ b/storage/ndb/src/kernel/blocks/dbspj/Dbspj.hpp	2010-08-06 08:47:27 +0000
@@ -392,7 +392,8 @@ public:
      * This function is called on the *child* by the *parent* when *parent*
      *   has completed a batch
      */
-    void (Dbspj::*m_parent_batch_complete)(Signal*,Ptr<Request>,Ptr<TreeNode>);
+    void (Dbspj::*m_parent_batch_complete)(Signal*,Ptr<Request>,Ptr<TreeNode>,
+                                           Uint32 rowCount);
 
     /**
      * This function is called when getting a SCAN_NEXTREQ
@@ -427,6 +428,23 @@ public:
   {
     Uint32 m_api_resultRef;
     Uint32 m_api_resultData;
+    /**
+     * This is the number of rows that we will ask for in this batch. This is
+     * equal to the number of rows that we will receive for the parent operation
+     * (in this batch). Until this number can be decided, this field will
+     * be set to 'invalidRowCount'.
+     */
+    Uint32 m_rows_requested;
+    /**
+     * This is the number of rows that we have received in this batch.
+     */
+    Uint32 m_rows_received;
+    /**
+     * This is the number of TCKEYREF messages received in this batch. When
+     * all lookup requests have been responded to, m_rows_requested should equal
+     * m_rows_received + m_rows_refused.
+     */
+    Uint32 m_rows_refused;
     Uint32 m_lqhKeyReq[LqhKeyReq::FixedSignalLength + 4];
   };
 
@@ -906,7 +924,8 @@ private:
   Uint32 nodeFail(Signal*, Ptr<Request>, NdbNodeBitmask mask);
 
   Uint32 createNode(Build_context&, Ptr<Request>, Ptr<TreeNode> &);
-  void reportBatchComplete(Signal*, Ptr<Request>, Ptr<TreeNode>);
+  void reportBatchComplete(Signal*, Ptr<Request>, Ptr<TreeNode>, 
+                           Uint32 rowCount);
   void releaseScanBuffers(Ptr<Request> requestPtr);
   void releaseRequestBuffers(Ptr<Request> requestPtr, bool reset);
   void releaseNodeRows(Ptr<Request> requestPtr, Ptr<TreeNode>);
@@ -1012,7 +1031,8 @@ private:
   void lookup_execLQHKEYREF(Signal*, Ptr<Request>, Ptr<TreeNode>);
   void lookup_execLQHKEYCONF(Signal*, Ptr<Request>, Ptr<TreeNode>);
   void lookup_parent_row(Signal*, Ptr<Request>, Ptr<TreeNode>, const RowPtr &);
-  void lookup_parent_batch_complete(Signal*, Ptr<Request>, Ptr<TreeNode>);
+  void lookup_parent_batch_complete(Signal*, Ptr<Request>, Ptr<TreeNode>,
+                                    Uint32 rowCount);
   void lookup_abort(Signal*, Ptr<Request>, Ptr<TreeNode>);
   Uint32 lookup_execNODE_FAILREP(Signal*signal, Ptr<Request>, Ptr<TreeNode>,
                                NdbNodeBitmask);
@@ -1039,7 +1059,8 @@ private:
   void scanFrag_execSCAN_FRAGREF(Signal*, Ptr<Request>, Ptr<TreeNode>);
   void scanFrag_execSCAN_FRAGCONF(Signal*, Ptr<Request>, Ptr<TreeNode>);
   void scanFrag_parent_row(Signal*,Ptr<Request>,Ptr<TreeNode>, const RowPtr &);
-  void scanFrag_parent_batch_complete(Signal*, Ptr<Request>, Ptr<TreeNode>);
+  void scanFrag_parent_batch_complete(Signal*, Ptr<Request>, Ptr<TreeNode>,
+                                      Uint32 rowCount);
   void scanFrag_execSCAN_NEXTREQ(Signal*, Ptr<Request>,Ptr<TreeNode>);
   void scanFrag_abort(Signal*, Ptr<Request>, Ptr<TreeNode>);
   void scanFrag_cleanup(Ptr<Request>, Ptr<TreeNode>);
@@ -1064,7 +1085,8 @@ private:
   void scanIndex_batchComplete(Signal* signal);
   Uint32 scanIndex_findFrag(Local_ScanIndexFrag_list &, Ptr<ScanIndexFrag>&,
                             Uint32 fragId);
-  void scanIndex_parent_batch_complete(Signal*, Ptr<Request>, Ptr<TreeNode>);
+  void scanIndex_parent_batch_complete(Signal*, Ptr<Request>, Ptr<TreeNode>,
+                                       Uint32 rowCount);
   void scanIndex_execSCAN_NEXTREQ(Signal*, Ptr<Request>,Ptr<TreeNode>);
   void scanIndex_complete(Signal*, Ptr<Request>, Ptr<TreeNode>);
   void scanIndex_abort(Signal*, Ptr<Request>, Ptr<TreeNode>);

=== modified file 'storage/ndb/src/kernel/blocks/dbspj/DbspjMain.cpp'
--- a/storage/ndb/src/kernel/blocks/dbspj/DbspjMain.cpp	2010-07-15 12:59:31 +0000
+++ b/storage/ndb/src/kernel/blocks/dbspj/DbspjMain.cpp	2010-08-06 08:47:27 +0000
@@ -65,6 +65,7 @@
 
 const Ptr<Dbspj::TreeNode> Dbspj::NullTreeNodePtr = { 0, RNIL };
 const Dbspj::RowRef Dbspj::NullRowRef = { RNIL, GLOBAL_PAGE_SIZE_WORDS, 0 };
+static const Uint32 invalidRowCount = 0xffffffff;
 
 /** A noop for now.*/
 void Dbspj::execREAD_CONFIG_REQ(Signal* signal) 
@@ -1404,7 +1405,7 @@ Dbspj::releaseRequestBuffers(Ptr<Request
 
 void
 Dbspj::reportBatchComplete(Signal * signal, Ptr<Request> requestPtr, 
-                           Ptr<TreeNode> treeNodePtr)
+                           Ptr<TreeNode> treeNodePtr, Uint32 rowCount)
 {
   LocalArenaPoolImpl pool(requestPtr.p->m_arena, m_dependency_map_pool);
   Local_dependency_map list(pool, treeNodePtr.p->m_dependent_nodes);
@@ -1421,7 +1422,8 @@ Dbspj::reportBatchComplete(Signal * sign
                  childPtr.p->m_info->m_parent_batch_complete !=0 );
       (this->*(childPtr.p->m_info->m_parent_batch_complete))(signal, 
                                                              requestPtr, 
-                                                             childPtr);
+                                                             childPtr,
+                                                             rowCount);
     }
   }
 }
@@ -2490,6 +2492,9 @@ Dbspj::lookup_build(Build_context& ctx,
     ctx.m_resultData = param->resultData;
     treeNodePtr.p->m_lookup_data.m_api_resultRef = ctx.m_resultRef;
     treeNodePtr.p->m_lookup_data.m_api_resultData = param->resultData;
+    treeNodePtr.p->m_lookup_data.m_rows_requested = invalidRowCount;
+    treeNodePtr.p->m_lookup_data.m_rows_received = 0;
+    treeNodePtr.p->m_lookup_data.m_rows_refused = 0;
 
     /**
      * Parse stuff common lookup/scan-frag
@@ -2786,6 +2791,24 @@ Dbspj::lookup_execTRANSID_AI(Signal* sig
   ndbassert(requestPtr.p->m_lookup_node_data[Tnode] >= 1);
   requestPtr.p->m_lookup_node_data[Tnode] -= 1;
 
+  treeNodePtr.p->m_lookup_data.m_rows_received++;
+
+  if (treeNodePtr.p->m_bits & TreeNode::T_REPORT_BATCH_COMPLETE
+      && treeNodePtr.p->m_lookup_data.m_rows_requested != invalidRowCount
+      && treeNodePtr.p->m_lookup_data.m_rows_requested == 
+      treeNodePtr.p->m_lookup_data.m_rows_received 
+      + treeNodePtr.p->m_lookup_data.m_rows_refused)
+  {
+    jam();
+    // We have received all rows for this operation in this batch.
+    reportBatchComplete(signal, requestPtr, treeNodePtr, 
+                        treeNodePtr.p->m_lookup_data.m_rows_received);
+    // Prepare for next batch.
+    treeNodePtr.p->m_lookup_data.m_rows_requested = invalidRowCount;
+    treeNodePtr.p->m_lookup_data.m_rows_received = 0;
+    treeNodePtr.p->m_lookup_data.m_rows_refused = 0;
+  }
+
   checkBatchComplete(signal, requestPtr, 1);
 }
 
@@ -2880,6 +2903,24 @@ Dbspj::lookup_execLQHKEYREF(Signal* sign
   ndbassert(requestPtr.p->m_lookup_node_data[Tnode] >= cnt);
   requestPtr.p->m_lookup_node_data[Tnode] -= cnt;
 
+  treeNodePtr.p->m_lookup_data.m_rows_refused++;
+
+  if (treeNodePtr.p->m_bits & TreeNode::T_REPORT_BATCH_COMPLETE
+      && treeNodePtr.p->m_lookup_data.m_rows_requested != invalidRowCount
+      && treeNodePtr.p->m_lookup_data.m_rows_requested == 
+      treeNodePtr.p->m_lookup_data.m_rows_received 
+      + treeNodePtr.p->m_lookup_data.m_rows_refused)
+  {
+    jam();
+    // We have received all rows for this operation in this batch.
+    reportBatchComplete(signal, requestPtr, treeNodePtr, 
+                        treeNodePtr.p->m_lookup_data.m_rows_received);
+    // Prepare for next batch.
+    treeNodePtr.p->m_lookup_data.m_rows_requested = invalidRowCount;
+    treeNodePtr.p->m_lookup_data.m_rows_received = 0;
+    treeNodePtr.p->m_lookup_data.m_rows_refused = 0;
+  }
+
   checkBatchComplete(signal, requestPtr, cnt);
 }
 
@@ -3048,7 +3089,8 @@ Dbspj::lookup_parent_row(Signal* signal,
 void
 Dbspj::lookup_parent_batch_complete(Signal* signal,
                              Ptr<Request> requestPtr,
-                             Ptr<TreeNode> treeNodePtr)
+                             Ptr<TreeNode> treeNodePtr,
+                             Uint32 rowCount)
 {
   jam();
 
@@ -3062,7 +3104,19 @@ Dbspj::lookup_parent_batch_complete(Sign
    */
   ndbassert(treeNodePtr.p->m_bits & TreeNode::T_REPORT_BATCH_COMPLETE);
 
-  reportBatchComplete(signal, requestPtr, treeNodePtr);
+  ndbassert(treeNodePtr.p->m_lookup_data.m_rows_requested == invalidRowCount);
+  treeNodePtr.p->m_lookup_data.m_rows_requested = rowCount;
+  if (treeNodePtr.p->m_lookup_data.m_rows_requested == 
+      treeNodePtr.p->m_lookup_data.m_rows_received + 
+      treeNodePtr.p->m_lookup_data.m_rows_refused)
+  {
+    // The batch is complete for this operation as well.
+    reportBatchComplete(signal, requestPtr, treeNodePtr, rowCount);
+    // Prepare for next batch.
+    treeNodePtr.p->m_lookup_data.m_rows_requested = invalidRowCount;
+    treeNodePtr.p->m_lookup_data.m_rows_received = 0;
+    treeNodePtr.p->m_lookup_data.m_rows_refused = 0;
+  }
 }
 
 void
@@ -3584,7 +3638,8 @@ Dbspj::scanFrag_execTRANSID_AI(Signal* s
     if (treeNodePtr.p->m_bits & TreeNode::T_REPORT_BATCH_COMPLETE)
     {
       jam();
-      reportBatchComplete(signal, requestPtr, treeNodePtr);
+      reportBatchComplete(signal, requestPtr, treeNodePtr, 
+                          treeNodePtr.p->m_scanfrag_data.m_rows_received);
     }
     
     checkBatchComplete(signal, requestPtr, 1);
@@ -3654,7 +3709,8 @@ Dbspj::scanFrag_execSCAN_FRAGCONF(Signal
     if (treeNodePtr.p->m_bits & TreeNode::T_REPORT_BATCH_COMPLETE)
     {
       jam();
-      reportBatchComplete(signal, requestPtr, treeNodePtr);
+      reportBatchComplete(signal, requestPtr, treeNodePtr, 
+                          treeNodePtr.p->m_scanfrag_data.m_rows_received);
     }
 
     checkBatchComplete(signal, requestPtr, 1);
@@ -3710,7 +3766,8 @@ Dbspj::scanFrag_parent_row(Signal* signa
 void
 Dbspj::scanFrag_parent_batch_complete(Signal* signal,
                                       Ptr<Request> requestPtr,
-                                      Ptr<TreeNode> treeNodePtr)
+                                      Ptr<TreeNode> treeNodePtr,
+                                      Uint32 rowCount)
 {
   jam();
   ndbrequire(false);
@@ -4363,7 +4420,8 @@ Dbspj::scanIndex_parent_row(Signal* sign
        * We being a T_ONE_SHOT means that we're only be called
        *   with parent_row once, i.e batch is complete
        */
-      scanIndex_parent_batch_complete(signal, requestPtr, treeNodePtr);
+      scanIndex_parent_batch_complete(signal, requestPtr, treeNodePtr, 
+                                      data.m_rows_received);
     }
 
     return;
@@ -4412,7 +4470,8 @@ Dbspj::scanIndex_fixupBound(Ptr<ScanInde
 void
 Dbspj::scanIndex_parent_batch_complete(Signal* signal,
                                        Ptr<Request> requestPtr,
-                                       Ptr<TreeNode> treeNodePtr)
+                                       Ptr<TreeNode> treeNodePtr,
+                                       Uint32 rowCount)
 {
   jam();
 
@@ -4617,7 +4676,8 @@ Dbspj::scanIndex_execTRANSID_AI(Signal* 
     if (treeNodePtr.p->m_bits & TreeNode::T_REPORT_BATCH_COMPLETE)
     {
       jam();
-      reportBatchComplete(signal, requestPtr, treeNodePtr);
+      reportBatchComplete(signal, requestPtr, treeNodePtr, 
+                          data.m_rows_received);
     }
     
     checkBatchComplete(signal, requestPtr, 1);
@@ -4682,7 +4742,8 @@ Dbspj::scanIndex_execSCAN_FRAGCONF(Signa
     if (treeNodePtr.p->m_bits & TreeNode::T_REPORT_BATCH_COMPLETE)
     {
       jam();
-      reportBatchComplete(signal, requestPtr, treeNodePtr);
+      reportBatchComplete(signal, requestPtr, treeNodePtr, 
+                          data.m_rows_received);
     }
     
     checkBatchComplete(signal, requestPtr, 1);

=== modified file 'storage/ndb/src/ndbapi/NdbQueryBuilder.cpp'
--- a/storage/ndb/src/ndbapi/NdbQueryBuilder.cpp	2010-07-15 17:51:57 +0000
+++ b/storage/ndb/src/ndbapi/NdbQueryBuilder.cpp	2010-08-06 08:44:14 +0000
@@ -830,6 +830,7 @@ NdbQueryBuilder::scanTable(const NdbDict
   returnErrIf(op==0, Err_MemoryAlloc);
 
   m_pimpl->m_operations.push_back(op);
+  op->markScanAncestors();
   return &op->m_interface;
 }
 
@@ -905,6 +906,7 @@ NdbQueryBuilder::scanIndex(const NdbDict
   }
 
   m_pimpl->m_operations.push_back(op);
+  op->markScanAncestors();
   return &op->m_interface;
 }
 
@@ -1723,6 +1725,8 @@ NdbQueryOperationDefImpl::addColumnRef(c
 
 void NdbQueryOperationDefImpl::markScanAncestors()
 {
+  // Verify that parent links have been established.
+  assert(m_ix == 0 || m_parent != NULL);
   NdbQueryOperationDefImpl* operation = this;
   do
   {
@@ -2248,9 +2252,7 @@ NdbQueryScanOperationDefImpl::NdbQuerySc
                            const char* ident,
                            Uint32      ix)
   : NdbQueryOperationDefImpl(table,options,ident,ix)
-{
-  markScanAncestors();
-}
+{}
 
 int
 NdbQueryScanOperationDefImpl::serialize(Uint32Buffer& serializedDef,


Attachment: [text/bzr-bundle] bzr/jan.wedvik@sun.com-20100806084727-7lryn051n4220bjj.bundle
Thread
bzr push into mysql-5.1-telco-7.0-spj-scan-vs-scan branch (jan.wedvik:3211to 3214) Jan Wedvik6 Aug