List:Commits« Previous MessageNext Message »
From:Ole John Aske Date:July 14 2010 5:04pm
Subject:bzr push into mysql-5.1-telco-7.0-spj-scan-vs-scan branch
(ole.john.aske:3204 to 3205)
View as plain text  
 3205 Ole John Aske	2010-07-14
      Correctly set ScanIndexParameters::batchSize argument for a child scan in a pushed join operation.

    modified:
      storage/ndb/src/ndbapi/NdbQueryOperation.cpp
      storage/ndb/src/ndbapi/NdbQueryOperationImpl.hpp
      storage/ndb/src/ndbapi/NdbReceiver.cpp
 3204 Ole John Aske	2010-07-13 [merge]
      Merge with main SPJ branch.

    modified:
      sql/ha_ndbcluster.cc
      storage/ndb/src/kernel/blocks/dbspj/DbspjMain.cpp
      storage/ndb/test/rqg/runall.sh
      storage/ndb/test/rqg/spj_test.yy
=== modified file 'storage/ndb/src/ndbapi/NdbQueryOperation.cpp'
--- a/storage/ndb/src/ndbapi/NdbQueryOperation.cpp	2010-07-08 13:56:43 +0000
+++ b/storage/ndb/src/ndbapi/NdbQueryOperation.cpp	2010-07-14 17:03:46 +0000
@@ -607,7 +607,7 @@ bool NdbRootFragment::isEmpty() const
   {
     /* We iterate over the tuples of the bottom scan operation.
      * When we have consumed all of these (for this batch and root fragment),
-     * this NrbRootFragment is empty.
+     * this NdbRootFragment is empty.
      */
     return m_currentTuple == 
       m_query->getScan(m_query->getScanCount()-1)
@@ -1865,14 +1865,15 @@ NdbQueryImpl::prepareSend()
       = getRoot().getQueryOperationDef().getTable().getFragmentCount();
     
     Ndb* const ndb = m_transaction.getNdb();
-    TransporterFacade *tp = ndb->theImpl->m_transporter_facade;
-
-    Uint32 batchRows = m_maxBatchRows; // >0: User specified prefered value, ==0: Use default CFG values
 
 #ifdef TEST_SCANREQ
-    batchRows = 1;  // To force usage of SCAN_NEXTREQ even for small scans resultsets
+    m_maxBatchRows = 1;  // To force usage of SCAN_NEXTREQ even for small scans resultsets
 #endif
 
+#if 1
+    TransporterFacade *tp = ndb->theImpl->m_transporter_facade;
+    Uint32 batchRows = m_maxBatchRows; // >0: User specified prefered value, ==0: Use default CFG values
+
     // Calculate batchsize for query as minimum batchRows for all m_operations[].
     // Ignore calculated 'batchByteSize' and 'firstBatchRows' here - Recalculated
     // when building signal after max-batchRows has been determined.
@@ -1890,11 +1891,16 @@ NdbQueryImpl::prepareSend()
       assert (firstBatchRows==batchRows);
     }
     m_maxBatchRows = batchRows;
+#else
+    // Some preparation for later batchsize calculations pr. (sub) scan
+    getRoot().calculateBatchedRows(m_maxBatchRows);
+#endif
+
     /**
      * Tuples are indentified with 16 bit unsigned integers, and tupleNotFound
      * (=0xffff) is used for representing unknown tuples.
      */
-    assert(batchRows < tupleNotFound);
+    assert(m_maxBatchRows < tupleNotFound);
 
     /** Scan operations need a own sub-transaction object associated with each 
      *  query.
@@ -2952,6 +2958,7 @@ NdbQueryOperationImpl::NdbQueryOperation
   m_children(def.getNoOfChildOperations()),
   m_resultStreams(NULL),
   m_params(),
+  m_bufferSize(0),
   m_batchBuffer(NULL),
   m_resultBuffer(NULL),
   m_resultRef(NULL),
@@ -3004,7 +3011,7 @@ NdbQueryOperationImpl::postFetchRelease(
 {
   if (m_batchBuffer) {
 #ifndef NDEBUG // Buffer overrun check activated.
-    { const Uint32 bufLen = m_batchByteSize*m_queryImpl.getRootFragCount();
+    { const Uint32 bufLen = m_bufferSize*m_queryImpl.getRootFragCount();
       assert(m_batchBuffer[bufLen+0] == 'a' &&
              m_batchBuffer[bufLen+1] == 'b' &&
              m_batchBuffer[bufLen+2] == 'c' &&
@@ -3488,16 +3495,45 @@ int NdbQueryOperationImpl::serializePara
 } // NdbQueryOperationImpl::serializeParams
 
 
+int
+NdbQueryOperationImpl::calculateBatchedRows(Uint32& batchedRows)
+{
+  for (Uint32 i = 0; i < m_children.size(); i++)
+  {
+    m_children[i]->calculateBatchedRows(batchedRows);
+  }
+
+  Ndb& ndb = *getQuery().getNdbTransaction().getNdb();
+  TransporterFacade *tp = ndb.theImpl->m_transporter_facade;
+
+  // Calculate batchsize for query as minimum batchRows for all m_operations[].
+  // Ignore calculated 'batchByteSize' and 'firstBatchRows' here - Recalculated
+  // when building signal after max-batchRows has been determined.
+  Uint32 batchByteSize, firstBatchRows;
+  NdbReceiver::calculate_batch_size(tp,
+                                    m_ndbRecord,
+                                    m_firstRecAttr,
+                                    0, // Key size.
+                                    getQuery().getRootFragCount(),
+                                    batchedRows,
+                                    batchByteSize,
+                                    firstBatchRows);
+  assert (batchedRows>0);
+  assert (firstBatchRows==batchedRows);
+  return 0;
+} // NdbQueryOperationImpl::calculateBatchedRows
+
+
 int 
 NdbQueryOperationImpl::prepareReceiver()
 {
   const Uint32 rowSize = 
     NdbReceiver::ndbrecord_rowsize(m_ndbRecord, m_firstRecAttr,0,false);
-  m_batchByteSize = rowSize * m_queryImpl.getMaxBatchRows();
-//ndbout "m_batchByteSize=" << m_batchByteSize << endl;
+  m_bufferSize = rowSize * m_queryImpl.getMaxBatchRows();
+//ndbout "m_bufferSize=" << m_bufferSize << endl;
 
-  if (m_batchByteSize > 0) { // 0 bytes in batch if no result requested
-    Uint32 bufLen = m_batchByteSize*m_queryImpl.getRootFragCount();
+  if (m_bufferSize > 0) { // 0 bytes in batch if no result requested
+    Uint32 bufLen = m_bufferSize*m_queryImpl.getRootFragCount();
 #ifdef NDEBUG
     m_batchBuffer = new char[bufLen];
     if (unlikely(m_batchBuffer == NULL)) {
@@ -3544,7 +3580,7 @@ NdbQueryOperationImpl::prepareReceiver()
                           0 /*key_size*/, 
                           0 /*read_range_no*/, 
                           rowSize,
-                          &m_batchBuffer[m_batchByteSize*i],
+                          &m_batchBuffer[m_bufferSize*i],
                           0);
     m_resultStreams[i]->getReceiver().prepareSend();
   }
@@ -3668,9 +3704,24 @@ NdbQueryOperationImpl::prepareAttrInfo(U
     if (unlikely(param==NULL))
       return Err_MemoryAlloc;
 
+    Ndb& ndb = *m_queryImpl.getNdbTransaction().getNdb();
+    TransporterFacade *tp = ndb.theImpl->m_transporter_facade;
+
+    Uint32 batchRows = m_queryImpl.getMaxBatchRows();
+    Uint32 batchByteSize, firstBatchRows;
+    NdbReceiver::calculate_batch_size(tp,
+                                      m_ndbRecord,
+                                      m_firstRecAttr,
+                                      0, // Key size.
+                                      m_queryImpl.getRootFragCount(),
+                                      batchRows,
+                                      batchByteSize,
+                                      firstBatchRows);
+    assert (batchRows==m_queryImpl.getMaxBatchRows());
+
     requestInfo |= QN_ScanIndexParameters::SIP_PARALLEL; // FIXME: SPJ always assume. SIP_PARALLEL
     param->requestInfo = requestInfo; 
-    param->batchSize = 0xffff0100;                       // TODO FIXME (bytes=0xffff, rows=0x100)
+    param->batchSize = ((Uint16)batchByteSize << 16) | (Uint16)firstBatchRows;
     param->resultData = getIdOfReceiver();
     QueryNodeParameters::setOpLen(param->len, paramType, length);
   }

=== modified file 'storage/ndb/src/ndbapi/NdbQueryOperationImpl.hpp'
--- a/storage/ndb/src/ndbapi/NdbQueryOperationImpl.hpp	2010-07-07 13:24:23 +0000
+++ b/storage/ndb/src/ndbapi/NdbQueryOperationImpl.hpp	2010-07-14 17:03:46 +0000
@@ -535,7 +535,7 @@ public:
 
   bool isRowNULL() const;    // Row associated with Operation is NULL value?
 
-  bool isRowChanged() const; // Prev ::nextResult() on NdbQuery retrived a new
+  bool isRowChanged() const; // Prev ::nextResult() on NdbQuery retrieved a new
                              // value for this NdbQueryOperation
 
   /** Process result data for this operation. Return true if batch complete.*/
@@ -618,7 +618,11 @@ private:
   /** Buffer for parameters in serialized format */
   Uint32Buffer m_params;
 
-  /** Internally allocated buffer for temporary storing one result batch.*/
+  /** Buffer size allocated for *each* ResultStream/Receiver when 
+   *  fetching results.*/
+  Uint32 m_bufferSize;
+  /** Internally allocated temp. buffer for *all* m_resultStreams[]
+   *  when receiving a batch. (m_bufferSize x #ResultStreams) */
   char* m_batchBuffer;
   /** User specified buffer for final storage of result.*/
   char* m_resultBuffer;
@@ -628,8 +632,6 @@ private:
   const char** m_resultRef;
   /** True if this operation gave no result for the current row.*/
   bool m_isRowNull;
-  /** Batch size for scans or lookups with scan parents.*/
-  Uint32 m_batchByteSize;
 
   /** Result record & optional bitmask to disable read of selected cols.*/
   const NdbRecord* m_ndbRecord;
@@ -679,6 +681,8 @@ private:
 
   int serializeProject(Uint32Buffer& attrInfo);
 
+  int calculateBatchedRows(Uint32& batchedRows);
+
   /** Construct and prepare receiver streams for result processing. */
   int prepareReceiver();
 

=== modified file 'storage/ndb/src/ndbapi/NdbReceiver.cpp'
--- a/storage/ndb/src/ndbapi/NdbReceiver.cpp	2010-03-10 09:36:44 +0000
+++ b/storage/ndb/src/ndbapi/NdbReceiver.cpp	2010-07-14 17:03:46 +0000
@@ -132,6 +132,7 @@ NdbReceiver::getValues(const NdbRecord* 
 
   m_record.m_ndb_record= rec;
   m_record.m_row= row_ptr;
+  m_record.m_row_offset= rec->m_row_size;
 }
 
 #define KEY_ATTR_ID (~(Uint32)0)
@@ -699,6 +700,7 @@ NdbReceiver::execTRANSID_AI(const Uint32
       {
         assert(m_record.m_read_range_no);
         assert(attrSize==4);
+        assert (m_record.m_row_offset >= m_record.m_ndb_record->m_row_size+attrSize);
         memcpy(m_record.m_row+m_record.m_ndb_record->m_row_size, aDataPtr++, 4);
         aLength--;
         continue; // Next
@@ -710,6 +712,7 @@ NdbReceiver::execTRANSID_AI(const Uint32
        */
       if (attrId == AttributeHeader::READ_PACKED)
       {
+        assert (m_record.m_row_offset >= m_record.m_ndb_record->m_row_size);
         Uint32 len= receive_packed_ndbrecord(attrSize >> 2, // Bitmap length
                                              aDataPtr,
                                              m_record.m_row);


Attachment: [text/bzr-bundle] bzr/ole.john.aske@sun.com-20100714170346-yi4y73yxzugc1667.bundle
Thread
bzr push into mysql-5.1-telco-7.0-spj-scan-vs-scan branch(ole.john.aske:3204 to 3205) Ole John Aske14 Jul