List:Commits« Previous MessageNext Message »
From:Jan Wedvik Date:October 12 2010 1:00pm
Subject:bzr commit into mysql-5.1-telco-7.0-spj-scan-vs-scan branch
(jan.wedvik:3312)
View as plain text  
#At file:///net/atum17/export/home2/tmp/jw159207/mysql/repo/push-scan-scan/ based on revid:ole.john.aske@stripped

 3312 Jan Wedvik	2010-10-12
      This commits adds the NdbQueryOperation::setBatchSize() method for setting 
      the batch size (number of rows) for pushed scans. (Implicitly, this also sets
      the batch size for descendant lookup operations.)

    modified:
      storage/ndb/include/ndbapi/NdbQueryOperation.hpp
      storage/ndb/src/ndbapi/NdbQueryOperation.cpp
      storage/ndb/src/ndbapi/NdbQueryOperationImpl.hpp
=== modified file 'storage/ndb/include/ndbapi/NdbQueryOperation.hpp'
--- a/storage/ndb/include/ndbapi/NdbQueryOperation.hpp	2010-10-07 13:54:04 +0000
+++ b/storage/ndb/include/ndbapi/NdbQueryOperation.hpp	2010-10-12 13:00:44 +0000
@@ -327,6 +327,16 @@ public:
    */
   int setParallelism(Uint32 parallelism);
 
+  /** Set the batch size (max rows per batch) for this operation. This
+   * only applies to scan operations, as lookup operations always will
+   * have the same batch size as its parent operation, or 1 if it is the
+   * root operation.
+   * @param batchSize Batch size (in number of rows). A value of 0 means
+   * use the default batch size.
+   * @return 0 if ok, -1 in case of error (call getNdbError() for details.)
+   */
+  int setBatchSize(Uint32 batchSize);
+
   /**
    * Set the NdbInterpretedCode needed for defining a scan filter for 
    * this operation. 

=== modified file 'storage/ndb/src/ndbapi/NdbQueryOperation.cpp'
--- a/storage/ndb/src/ndbapi/NdbQueryOperation.cpp	2010-10-11 13:18:51 +0000
+++ b/storage/ndb/src/ndbapi/NdbQueryOperation.cpp	2010-10-12 13:00:44 +0000
@@ -1012,6 +1012,10 @@ int NdbQueryOperation::setParallelism(Ui
   return m_impl.setParallelism(parallelism);
 }
 
+int NdbQueryOperation::setBatchSize(Uint32 batchSize){
+  return m_impl.setBatchSize(batchSize);
+}
+
 int NdbQueryOperation::setInterpretedCode(const NdbInterpretedCode& code) const
 {
   return m_impl.setInterpretedCode(code);
@@ -2070,7 +2074,7 @@ NdbQueryImpl::prepareSend()
   }
 
   // Some preparation for later batchsize calculations pr. (sub) scan
-  getRoot().calculateBatchedRows();
+  getRoot().calculateBatchedRows(NULL);
   getRoot().setBatchedRows(1);
 
   // 1. Build receiver structures for each QueryOperation.
@@ -2303,7 +2307,8 @@ NdbQueryImpl::doSend(int nodeId, bool la
                                       batchRows,
                                       batchByteSize,
                                       firstBatchRows);
-    assert (batchRows==root.getMaxBatchRows());
+    assert(batchRows==root.getMaxBatchRows());
+    assert(batchRows==firstBatchRows);
     ScanTabReq::setScanBatch(reqInfo, batchRows);
     scanTabReq->batch_byte_size = batchByteSize;
     scanTabReq->first_batch_size = firstBatchRows;
@@ -3635,56 +3640,97 @@ int NdbQueryOperationImpl::serializePara
   return 0;
 } // NdbQueryOperationImpl::serializeParams
 
-int
-NdbQueryOperationImpl::calculateBatchedRows(NdbQueryOperationImpl* scanParent)
+Uint32
+NdbQueryOperationImpl
+::calculateBatchedRows(NdbQueryOperationImpl* closestScan)
 {
+  NdbQueryOperationImpl* myClosestScan;
   if (m_operationDef.isScanOperation())
-    scanParent = this;
+  {
+    myClosestScan = this;
+  }
+  else
+  {
+    myClosestScan = closestScan;
+  }
 
-  for (Uint32 i = 0; i < m_children.size(); i++)
-    m_children[i]->calculateBatchedRows(scanParent);
+  Uint32 maxBatchRows = 0;
+  if (myClosestScan != NULL)
+  {
 
 #ifdef TEST_SCANREQ
     m_maxBatchRows = 4;  // To force usage of SCAN_NEXTREQ even for small scans resultsets
 #endif
 
-  if (scanParent!=NULL)
-  {
-    Ndb& ndb = *getQuery().getNdbTransaction().getNdb();
+    const Ndb& ndb = *getQuery().getNdbTransaction().getNdb();
 
-    // Calculate batchsize for query as minimum batchRows for all m_operations[].
-    // Ignore calculated 'batchByteSize' and 'firstBatchRows' here - Recalculated
-    // when building signal after max-batchRows has been determined.
+    /**
+     * For each batch, a lookup operation must be able to receive as many rows
+     * as the closest ancestor scan operation. 
+     * We must thus make sure that we do not set a batch size for the scan 
+     * that exceeds what any of its scan descendants can use.
+     *
+     * Ignore calculated 'batchByteSize' and 'firstBatchRows' 
+     * here - Recalculated when building signal after max-batchRows has been 
+     * determined.
+     */
     Uint32 batchByteSize, firstBatchRows;
+    /**
+     * myClosestScan->m_maxBatchRows may be zero to indicate that we
+     * should use default values, or non-zero if the application had an 
+     * explicit preference.
+     */
+    maxBatchRows = myClosestScan->m_maxBatchRows;
     NdbReceiver::calculate_batch_size(* ndb.theImpl,
                                       m_ndbRecord,
                                       m_firstRecAttr,
                                       0, // Key size.
-                                      getQuery().getRootFragCount(),  // scanParent->getFragCount()
-                                      scanParent->m_maxBatchRows,
+                                      m_queryImpl.getRootFragCount(),
+                                      maxBatchRows,
                                       batchByteSize,
                                       firstBatchRows);
-    assert (scanParent->m_maxBatchRows>0);
-    assert (firstBatchRows==scanParent->m_maxBatchRows);
-    m_maxBatchRows = firstBatchRows; // First guess, ::setBatchedRows() updates later  
+    assert(maxBatchRows > 0);
+    assert(firstBatchRows == maxBatchRows);
   }
-  else
-    m_maxBatchRows = 1;
 
-  return 0;
+  // Find the largest value that is acceptable to all lookup descendants.
+  for (Uint32 i = 0; i < m_children.size(); i++)
+  {
+    const Uint32 childMaxBatchRows = 
+      m_children[i]->calculateBatchedRows(myClosestScan);
+    maxBatchRows = MIN(maxBatchRows, childMaxBatchRows);
+  }
+  
+  if (m_operationDef.isScanOperation())
+  {
+    // Use this value for current op and all lookup descendants.
+    m_maxBatchRows = maxBatchRows;
+    // Return max(Unit32) to avoid interfering with batch size calculation 
+    // for parent.
+    return 0xffffffff;
+  }
+  else
+  {
+    return maxBatchRows;
+  }
 } // NdbQueryOperationImpl::calculateBatchedRows
 
 
 void
 NdbQueryOperationImpl::setBatchedRows(Uint32 batchedRows)
 {
-  if (m_operationDef.isScanOperation())
-    batchedRows = this->m_maxBatchRows;
+  if (!m_operationDef.isScanOperation())
+  {
+    /** Lookup operations should handle the same number of rows as 
+     * the closest scan ancestor.
+     */
+    m_maxBatchRows = batchedRows;
+  }
 
   for (Uint32 i = 0; i < m_children.size(); i++)
-    m_children[i]->setBatchedRows(batchedRows);
-
-  m_maxBatchRows = batchedRows;
+  {
+    m_children[i]->setBatchedRows(m_maxBatchRows);
+  }
 }
 
 
@@ -3883,8 +3929,8 @@ NdbQueryOperationImpl::prepareAttrInfo(U
                                       batchRows,
                                       batchByteSize,
                                       firstBatchRows);
-    assert (batchRows==getMaxBatchRows());
-
+    assert(batchRows==getMaxBatchRows());
+    assert(batchRows==firstBatchRows);
     requestInfo |= QN_ScanIndexParameters::SIP_PARALLEL; // FIXME: SPJ always assume. SIP_PARALLEL
     param->requestInfo = requestInfo; 
     param->batchSize = ((Uint16)batchByteSize << 16) | (Uint16)firstBatchRows;
@@ -4534,6 +4580,17 @@ int NdbQueryOperationImpl::setParallelis
   return 0;
 }
 
+int NdbQueryOperationImpl::setBatchSize(Uint32 batchSize){
+  if (!getQueryOperationDef().isScanOperation())
+  {
+    getQuery().setErrorCodeAbort(QRY_WRONG_OPERATION_TYPE);
+    return -1;
+  }
+
+  m_maxBatchRows = batchSize;
+  return 0;
+}
+
 NdbResultStream& 
 NdbQueryOperationImpl::getResultStream(Uint32 rootFragNo) const
 {

=== modified file 'storage/ndb/src/ndbapi/NdbQueryOperationImpl.hpp'
--- a/storage/ndb/src/ndbapi/NdbQueryOperationImpl.hpp	2010-10-07 13:54:04 +0000
+++ b/storage/ndb/src/ndbapi/NdbQueryOperationImpl.hpp	2010-10-12 13:00:44 +0000
@@ -586,6 +586,16 @@ public:
    */
   int setParallelism(Uint32 parallelism);
 
+  /** Set the batch size (max rows per batch) for this operation. This
+   * only applies to scan operations, as lookup operations always will
+   * have the same batch size as its parent operation, or 1 if it is the
+   * root operation.
+   * @param batchSize Batch size (in number of rows). A value of 0 means
+   * use the default batch size.
+   * @return 0 if ok, -1 in case of error (call getNdbError() for details.)
+   */
+  int setBatchSize(Uint32 batchSize);
+
   /**
    * Set the NdbInterpretedCode needed for defining a scan filter for 
    * this operation. 
@@ -711,7 +721,7 @@ private:
 
   int serializeProject(Uint32Buffer& attrInfo);
 
-  int calculateBatchedRows(NdbQueryOperationImpl* scanParent=NULL);
+  Uint32 calculateBatchedRows(NdbQueryOperationImpl* closestScan);
   void setBatchedRows(Uint32 batchedRows);
 
   /** Construct and prepare receiver streams for result processing. */


Attachment: [text/bzr-bundle] bzr/jan.wedvik@sun.com-20101012130044-6q5fl5zskspdb2jp.bundle
Thread
bzr commit into mysql-5.1-telco-7.0-spj-scan-vs-scan branch(jan.wedvik:3312) Jan Wedvik12 Oct