List:Commits« Previous MessageNext Message »
From:Jan Wedvik Date:August 20 2010 7:28am
Subject:bzr push into mysql-5.1-telco-7.0-spj-scan-vs-scan branch (jan.wedvik:3233
to 3234)
View as plain text  
 3234 Jan Wedvik	2010-08-20
      With this commit, the API should be able to handle scan-scan queries where
      there are multiple result batches. The API uses ScanTabConf::len as a bit mask
      to see which sub scan that are finished an which that are not. This information
      is used to reset those streams that will get new data in the next batch,
      and to rewind those that will not. Also, this information is used to tell
      when it is appropriate to generate rows with right hand NULL parts for left
      outer joins.
      
      The code has only been tested for one-node clusters, since there is currently
      a bug in the kernel that causes crashed when running such queries on multi
      node clusters.
      
      There is a known error concerning left joins, meaning that some result rows
      may be missing when running such queries.
      
      To test this check-in, try running the SQL code below:
      
      drop database if exists spj_ndb;
      create database spj_ndb;
      use spj_ndb;
      set ndb_join_pushdown = true;
      
      create table t1 (pk int primary key, a int, b int) engine=ndb;
      create index ix1 on t1(b,a);
      
      insert into t1 values (0,10,10);
      insert into t1 values (1,10,10);
      insert into t1 values (2,10,10);
      insert into t1 values (3,10,10);
      insert into t1 values (4,10,10);
      insert into t1 values (5,10,10);
      insert into t1 values (6,10,10);
      insert into t1 values (7,10,10);
      insert into t1 values (8,10,10);
      insert into t1 values (9,10,10);
      insert into t1 values (10,10,10);
      insert into t1 values (11,10,10);
      insert into t1 values (12,10,10);
      insert into t1 values (13,10,10);
      insert into t1 values (14,10,10);
      insert into t1 values (15,10,10);
      insert into t1 values (16,10,10);
      insert into t1 values (17,10,10);
      insert into t1 values (18,10,10);
      insert into t1 values (19,10,10);
      
      select * from t1 as x1 join t1 as x2 on x1.a=x2.b;

    modified:
      storage/ndb/src/ndbapi/NdbQueryOperation.cpp
 3233 Jonas Oreland	2010-08-18
      ndb svs - fix maintainance of m_active_nodes mask

    modified:
      storage/ndb/src/kernel/blocks/dbspj/Dbspj.hpp
      storage/ndb/src/kernel/blocks/dbspj/DbspjMain.cpp
=== modified file 'storage/ndb/src/ndbapi/NdbQueryOperation.cpp'
--- a/storage/ndb/src/ndbapi/NdbQueryOperation.cpp	2010-08-18 12:35:15 +0000
+++ b/storage/ndb/src/ndbapi/NdbQueryOperation.cpp	2010-08-20 07:27:09 +0000
@@ -322,6 +322,20 @@ public:
   bool isEmpty() const
   { return m_iterState == Iter_finished; }
 
+  /** This method is only relevant for scan nodes. It is used for marking 
+   * a resuilt stream as holding the last batch of a sub scan, 
+   * meaning that it is the last batch of the scan that was instantiated from 
+   * the current batch of its parent operation.*/
+  void setSubScanComplete(bool complete)
+  { m_subScanComplete = complete; }
+
+  /** This method is only relevant for scan nodes. It returns true if the
+   * current batch is the last batch of a sub scan, meaning that it is the
+   * last batch of the scan that was instantiated from the current batch
+   * of its parent operation.*/
+  bool isSubScanComplete()
+  { return m_subScanComplete; }
+
   /** For debugging.*/
   friend NdbOut& operator<<(NdbOut& out, const NdbResultStream&);
 
@@ -360,6 +374,12 @@ private:
    * getNextScanRow())*/
   Uint16 m_currentParentId;
   
+  /** This field is only relevant for scan nodes. It is true if the
+   * current batch is the last batch of a sub scan, meaning that it is the
+   * last batch of the scan that was instantiated from the current batch
+   * of its parent operation.*/
+  bool m_subScanComplete;
+
   /**
    * TupleSet contain two logically distinct set of information:
    *
@@ -432,6 +452,7 @@ NdbResultStream::NdbResultStream(NdbQuer
   m_operation(operation),
   m_iterState(Iter_notStarted),
   m_currentParentId(tupleNotFound),
+  m_subScanComplete(false),
   m_tupleSet(NULL)
 {};
 
@@ -475,6 +496,14 @@ NdbResultStream::reset()
   clearParentChildMap();
 
   m_receiver.prepareSend();
+  /* If this stream will get new rows in the next batch, then so will
+   * all of its descendants.*/
+  for (Uint32 childNo = 0; childNo < m_operation.getNoOfChildOperations();
+       childNo++)
+  {
+    m_operation.getChildOperation(childNo)
+      .getResultStream(getRootFragNo()).reset();
+  }
 } //NdbResultStream::reset
 
 
@@ -556,6 +585,7 @@ NdbResultStream::findNextTuple()
 void
 NdbResultStream::execTRANSID_AI(const Uint32 *ptr, Uint32 len)
 {
+  assert(m_iterState == Iter_notStarted);
   if (m_operation.getQueryDef().isScanQuery())
   {
     const CorrelationData correlData(ptr, len);
@@ -583,16 +613,17 @@ NdbResultStream::execTRANSID_AI(const Ui
     m_receiver.execTRANSID_AI(ptr, len);
   }
   m_rowCount++;
+  /* Set correct #rows received in the NdbReceiver.
+   */
+  getReceiver().m_result_rows = getRowCount();
 } // NdbResultStream::execTRANSID_AI()
 
 
 void 
 NdbResultStream::handleBatchComplete()
 {
-  /* Now we have received all tuples for all operations. 
-   * Set correct #rows received in the NdbReceiver.
-   */
-  getReceiver().m_result_rows = getRowCount();
+  assert(m_iterState == Iter_notStarted || m_iterState == Iter_finished);
+  m_iterState = Iter_notStarted;
 }
 
 
@@ -600,6 +631,7 @@ bool
 NdbResultStream::getNextLookupRow()
 {
   assert(!m_operation.getQueryDef().isScanQuery());
+  assert(m_rootFragNo == 0);
 
   switch (m_iterState)
   {
@@ -617,7 +649,7 @@ NdbResultStream::getNextLookupRow()
           m_operation.getChildOperation(childNo);
       
         // Check that there is a child tuple for each innner join child.
-        if (!child.getResultStream(m_rootFragNo).getNextLookupRow() &&
+        if (!child.getResultStream(0).getNextLookupRow() &&
             child.getQueryOperationDef().getMatchType() 
             != NdbQueryOptions::MatchAll)
         {
@@ -728,14 +760,19 @@ NdbResultStream::getNextScanRow(Uint16 p
       {
         return ScanRowResult_endOfScan;
       }
+      else if(isSubScanComplete() || 
+              !m_operation.getQueryOperationDef().isScanOperation())
+      {
+        /* Next batch will contain rows related the next batch of the
+         * parent scan. So if this is a left outer join, we can generate
+         * rows with NULLs for the right part now.*/
+        return ScanRowResult_endOfScan;
+      }
       else
       {
-#if 0
+        /* Next batch will contain rows related to the current batch of the
+         * parent scan.*/
         return ScanRowResult_endOfBatch;
-#else
-        // FIXME!!! (Handle left outer join properly.)
-        return ScanRowResult_endOfScan;
-#endif
       }
     }
 
@@ -2514,6 +2551,7 @@ NdbQueryImpl::sendFetchMore(int nodeId)
   Uint32 sent = 0;
   assert(getRoot().m_resultStreams!=NULL);
   assert(m_pendingFrags==0);
+  assert(m_queryDef.isScanQuery());
 
   ReceiverIdIterator receiverIdIter(*this);
 
@@ -2532,9 +2570,27 @@ NdbQueryImpl::sendFetchMore(int nodeId)
         m_rootFrags + receiverIdIter.getRootFragNo(i);
       emptyFrag->reset();
   
-      for (unsigned op=0; op<m_countOperations; op++) 
+      for (unsigned opNo=0; opNo<m_countOperations; opNo++) 
       {
-        emptyFrag->getResultStream(op).reset();
+        const NdbQueryOperationImpl& op = getQueryOperation(opNo);
+        // Check if this is a leaf node.
+        if (op.getNoOfChildOperations()==0)
+        {
+          // Find first ancestor that is not finished.
+          const NdbQueryOperationImpl* ancestor = &op;
+          while (ancestor!=NULL && 
+                 ancestor->getResultStream(emptyFrag->getFragNo())
+                 .isSubScanComplete())
+          {
+            ancestor = ancestor->getParentOperation();
+          }
+          if (ancestor!=NULL)
+          {
+            /* Reset ancestor and all its descendants, since all these
+             * streams will get a new set of rows in the next batch. */ 
+            ancestor->getResultStream(emptyFrag->getFragNo()).reset();
+          }
+        }
       }
     }
     receiverIdIter.getNextWords(idBuffSize);
@@ -4260,6 +4316,15 @@ NdbQueryOperationImpl::execSCAN_TABCONF(
     // Don't awake before we have data, or query batch completed.
     ret = !rootFrag.isEmpty() || m_queryImpl.isBatchComplete();
   }
+  for(Uint32 opNo = 0; opNo <  m_queryImpl.getQueryDef().getNoOfOperations();
+      opNo++)
+  {
+    /* Mark each scan node to indicate if the current batch is the last in the
+     * current sub-scan.
+     */
+    rootFrag.getResultStream(opNo)
+      .setSubScanComplete((nodeMask & (1 << opNo)) == 0);
+  }
   if (traceSignals) {
     ndbout << "NdbQueryOperationImpl::execSCAN_TABCONF():, returns:" << ret
            << ", tcPtrI=" << tcPtrI << " rowCount=" << rowCount 


Attachment: [text/bzr-bundle] bzr/jan.wedvik@sun.com-20100820072709-kdiape7yxx8kryg5.bundle
Thread
bzr push into mysql-5.1-telco-7.0-spj-scan-vs-scan branch (jan.wedvik:3233to 3234) Jan Wedvik20 Aug