List:Commits« Previous MessageNext Message »
From:Jan Wedvik Date:February 24 2011 2:17pm
Subject:bzr push into mysql-5.1-telco-7.0-spj-scan-vs-scan branch (jan.wedvik:3441
to 3442)
View as plain text  
 3442 Jan Wedvik	2011-02-24
      This commit reduces the number of malloc/free calls when using linked 
      operations. For each NdbQueryImpl instance, memory for all needed instances of a given 
      type is allocated at once whenever possible, instead of allocating for one or a 
      few instances at a time.

    modified:
      storage/ndb/src/ndbapi/NdbQueryOperation.cpp
      storage/ndb/src/ndbapi/NdbQueryOperationImpl.hpp
 3441 jonas oreland	2011-02-24 [merge]
      ndb spj - merge 70-main into 70-spj

    removed:
      storage/ndb/include/kernel/signaldata/DbspjErr.hpp
      storage/ndb/include/kernel/signaldata/QueryTree.hpp
      storage/ndb/src/kernel/blocks/dbspj/
      storage/ndb/src/kernel/blocks/dbspj/Dbspj.hpp
      storage/ndb/src/kernel/blocks/dbspj/DbspjInit.cpp
      storage/ndb/src/kernel/blocks/dbspj/DbspjMain.cpp
      storage/ndb/src/kernel/blocks/dbspj/DbspjProxy.cpp
      storage/ndb/src/kernel/blocks/dbspj/DbspjProxy.hpp
    added:
      storage/ndb/include/kernel/signaldata/DbspjErr.hpp
      storage/ndb/include/kernel/signaldata/QueryTree.hpp
      storage/ndb/src/kernel/blocks/dbspj/
      storage/ndb/src/kernel/blocks/dbspj/Dbspj.hpp
      storage/ndb/src/kernel/blocks/dbspj/DbspjInit.cpp
      storage/ndb/src/kernel/blocks/dbspj/DbspjMain.cpp
      storage/ndb/src/kernel/blocks/dbspj/DbspjProxy.cpp
      storage/ndb/src/kernel/blocks/dbspj/DbspjProxy.hpp
    modified:
      configure.in
      storage/ndb/include/kernel/signaldata/SignalData.hpp
      storage/ndb/ndb_configure.m4
      storage/ndb/src/kernel/blocks/dblqh/DblqhMain.cpp
      storage/ndb/src/kernel/blocks/dbtc/Dbtc.hpp
      storage/ndb/src/kernel/blocks/dbtc/DbtcMain.cpp
      storage/ndb/src/ndbapi/NdbDictionaryImpl.cpp
      storage/ndb/src/ndbapi/NdbDictionaryImpl.hpp
      storage/ndb/src/ndbapi/NdbEventOperationImpl.cpp
      storage/ndb/tools/ndbinfo_sql.cpp
=== modified file 'storage/ndb/src/ndbapi/NdbQueryOperation.cpp'
--- a/storage/ndb/src/ndbapi/NdbQueryOperation.cpp	2011-02-11 13:12:09 +0000
+++ b/storage/ndb/src/ndbapi/NdbQueryOperation.cpp	2011-02-24 14:16:51 +0000
@@ -362,45 +362,6 @@ public:
   /** For debugging.*/
   friend NdbOut& operator<<(NdbOut& out, const NdbResultStream&);
 
-private:
-  /** This stream handles results derived from the m_rootFragNo'th 
-   * fragment of the root operation.*/
-  const Uint32 m_rootFragNo;
-
-  /** The receiver object that unpacks transid_AI messages.*/
-  NdbReceiver m_receiver;
-
-  /** Max #rows which this stream may recieve in its buffer structures */
-  Uint32 m_maxRows;
-
-  /** The number of transid_AI messages received.*/
-  Uint32 m_rowCount;
-
-  /** Operation to which this resultStream belong.*/
-  NdbQueryOperationImpl& m_operation;
-
-  /** This is the state of the iterator used by firstResult(), nextResult().*/
-  enum
-  {
-    /** The first row has not been fetched yet.*/
-    Iter_notStarted,
-    /** Is iterating the resultset, (implies 'm_currentRow!=tupleNotFound').*/
-    Iter_started,  
-    /** Last row for current batch has been returned.*/
-    Iter_finished
-  } m_iterState;
-
-  /** Tuple id of the current tuple, or 'tupleNotFound' if Iter_notStarted or Iter_finished. */
-  Uint16 m_currentRow;
-  
-  /** 
-   * This field is only used for result streams of scan operations. If set,
-   * it indicates that the stream is holding the last batch of a sub scan. 
-   * This means that it is the last batch of the scan that was instantiated 
-   * from the current batch of its parent operation.
-   */
-  bool m_subScanComplete;
-
   /**
    * TupleSet contain two logically distinct set of information:
    *
@@ -448,6 +409,45 @@ private:
     TupleSet& operator=(const TupleSet&);
   };
 
+private:
+  /** This stream handles results derived from the m_rootFragNo'th 
+   * fragment of the root operation.*/
+  const Uint32 m_rootFragNo;
+
+  /** The receiver object that unpacks transid_AI messages.*/
+  NdbReceiver m_receiver;
+
+  /** Max #rows which this stream may recieve in its buffer structures */
+  Uint32 m_maxRows;
+
+  /** The number of transid_AI messages received.*/
+  Uint32 m_rowCount;
+
+  /** Operation to which this resultStream belong.*/
+  NdbQueryOperationImpl& m_operation;
+
+  /** This is the state of the iterator used by firstResult(), nextResult().*/
+  enum
+  {
+    /** The first row has not been fetched yet.*/
+    Iter_notStarted,
+    /** Is iterating the resultset, (implies 'm_currentRow!=tupleNotFound').*/
+    Iter_started,  
+    /** Last row for current batch has been returned.*/
+    Iter_finished
+  } m_iterState;
+
+  /** Tuple id of the current tuple, or 'tupleNotFound' if Iter_notStarted or Iter_finished. */
+  Uint16 m_currentRow;
+  
+  /** 
+   * This field is only used for result streams of scan operations. If set,
+   * it indicates that the stream is holding the last batch of a sub scan. 
+   * This means that it is the last batch of the scan that was instantiated 
+   * from the current batch of its parent operation.
+   */
+  bool m_subScanComplete;
+
   TupleSet* m_tupleSet;
 
   void clearTupleSet();
@@ -471,6 +471,50 @@ private:
   NdbResultStream& operator=(const NdbResultStream&);
 }; //class NdbResultStream
 
+//////////////////////////////////////////////
+/////////  NdbBulkAllocator methods ///////////
+//////////////////////////////////////////////
+
+NdbBulkAllocator::NdbBulkAllocator(size_t objSize)
+  :m_objSize(objSize),
+   m_maxObjs(0),
+   m_buffer(NULL),
+   m_nextObjNo(0)
+{}
+
+int NdbBulkAllocator::init(Uint32 maxObjs)
+{
+  assert(m_buffer == NULL);
+  m_maxObjs = maxObjs;
+  // Add check for buffer overrun.
+  m_buffer = new char[m_objSize*m_maxObjs+1];
+  if (unlikely(m_buffer == NULL))
+  {
+    return Err_MemoryAlloc;
+  }
+  m_buffer[m_maxObjs * m_objSize] = endMarker;
+  return 0;
+}
+
+void NdbBulkAllocator::reset(){
+  // Overrun check.
+  assert(m_buffer == NULL || m_buffer[m_maxObjs * m_objSize] == endMarker); 
+  // Overwrite with 0xff bytes to detect accidental use of released memory.
+  assert(m_buffer == NULL || 
+         memset(m_buffer, 0xff, m_maxObjs * m_objSize) != NULL);
+  delete m_buffer;
+  m_buffer = NULL;
+  m_nextObjNo = 0;
+  m_maxObjs = 0;
+}
+
+void* NdbBulkAllocator::allocObjMem(Uint32 noOfObjs)
+{
+  assert(m_nextObjNo + noOfObjs <=  m_maxObjs);
+  void * const result = m_buffer+m_objSize*m_nextObjNo;
+  m_nextObjNo += noOfObjs;
+  return m_nextObjNo > m_maxObjs ? NULL : result;
+}
 
 //////////////////////////////////////////////
 /////////  NdbResultStream methods ///////////
@@ -488,8 +532,12 @@ NdbResultStream::NdbResultStream(NdbQuer
   m_tupleSet(NULL)
 {};
 
-NdbResultStream::~NdbResultStream() { 
-  delete[] m_tupleSet; 
+NdbResultStream::~NdbResultStream()
+{
+  for (int i = static_cast<int>(m_maxRows)-1; i >= 0; i--)
+  {
+    m_tupleSet[i].~TupleSet();
+  }
 }
 
 int  // Return 0 if ok, else errorcode
@@ -503,9 +551,9 @@ NdbResultStream::prepare()
   if (m_operation.getQueryDef().isScanQuery())
   {
     m_maxRows  = m_operation.getMaxBatchRows();
-    m_tupleSet = new TupleSet[m_maxRows];
-    if (unlikely(m_tupleSet==NULL))
-      return Err_MemoryAlloc;
+    m_tupleSet = 
+      new (m_operation.getQuery().getTupleSetAlloc().allocObjMem(m_maxRows)) 
+      TupleSet[m_maxRows];
 
     clearTupleSet();
   }
@@ -1352,18 +1400,23 @@ NdbQueryImpl::NdbQueryImpl(NdbTransactio
   m_startIndicator(false),
   m_commitIndicator(false),
   m_prunability(Prune_No),
-  m_pruneHashVal(0)
+  m_pruneHashVal(0),
+  m_operationAlloc(sizeof(NdbQueryOperationImpl)),
+  m_tupleSetAlloc(sizeof(NdbResultStream::TupleSet)),
+  m_resultStreamAlloc(sizeof(NdbResultStream)),
+  m_pointerAlloc(sizeof(void*)),
+  m_rowBufferAlloc(sizeof(char))
 {
   // Allocate memory for all m_operations[] in a single chunk
   m_countOperations = queryDef.getNoOfOperations();
-  Uint32  size = m_countOperations * 
-    static_cast<Uint32>(sizeof(NdbQueryOperationImpl));
-  m_operations = static_cast<NdbQueryOperationImpl*> (::operator new(size));
-  if (unlikely(m_operations == NULL))
+  const int error = m_operationAlloc.init(m_countOperations);
+  if (unlikely(error != 0))
   {
-    setErrorCode(Err_MemoryAlloc);
+    setErrorCode(error);
     return;
   }
+  m_operations = reinterpret_cast<NdbQueryOperationImpl*>
+    (m_operationAlloc.allocObjMem(m_countOperations));
 
   // Then; use placement new to construct each individual 
   // NdbQueryOperationImpl object in m_operations
@@ -1375,11 +1428,10 @@ NdbQueryImpl::NdbQueryImpl(NdbTransactio
     if (m_error.code != 0) 
     {
       // Destroy those objects that we have already constructed.
-      for (Uint32 j=0; j<=i; j++)
+      for (int j = static_cast<int>(i)-1; j>= 0; j--)
       { 
         m_operations[j].~NdbQueryOperationImpl();
       }
-      ::operator delete(m_operations);
       m_operations = NULL;
       return;
     }
@@ -1402,7 +1454,6 @@ NdbQueryImpl::~NdbQueryImpl()
     for (int i=m_countOperations-1; i>=0; --i)
     { m_operations[i].~NdbQueryOperationImpl();
     }
-    ::operator delete(m_operations);
     m_operations = NULL;
   }
   delete[] m_rootFrags;
@@ -1418,6 +1469,9 @@ NdbQueryImpl::postFetchRelease()
     { m_operations[i].postFetchRelease();
     }
   }
+  m_rowBufferAlloc.reset();
+  m_tupleSetAlloc.reset();
+  m_resultStreamAlloc.reset();
 }
 
 
@@ -2278,10 +2332,57 @@ NdbQueryImpl::prepareSend()
     m_rootFragCount = 1;
   }
 
+  int error = m_resultStreamAlloc.init(m_rootFragCount * getNoOfOperations());
+  if (error != 0)
+  {
+    setErrorCode(error);
+    return -1;
+  }
+  // Allocate space for ptrs to NdbResultStream and NdbRootFragment objects.
+  error = m_pointerAlloc.init(m_rootFragCount * 
+                              (SharedFragStack::pointersPerFragment +
+                               OrderedFragSet::pointersPerFragment +
+                               // Pointers to NdbResultStream objects.
+                               getNoOfOperations()));
+  if (error != 0)
+  {
+    setErrorCode(error);
+    return -1;
+  }
+    
   // Some preparation for later batchsize calculations pr. (sub) scan
   getRoot().calculateBatchedRows(NULL);
   getRoot().setBatchedRows(1);
 
+  /** Calculate total amount of row buffer space for all operations and
+   * fragments.*/
+  Uint32 totalBuffSize = 0;
+  for (Uint32 opNo = 0; opNo < getNoOfOperations(); opNo++)
+  {
+    NdbQueryOperationImpl& op = getQueryOperation(opNo);
+
+    op.m_bufferSize = op.getRowSize() * op.getMaxBatchRows();
+    totalBuffSize += op.m_bufferSize;
+  }
+  /** Add one word per operation for buffer overrun check. We add a word
+   * rather than a byte to avoid possible alignment problems.
+   */
+  m_rowBufferAlloc.init(totalBuffSize * m_rootFragCount + 
+                        sizeof(Uint32) * getNoOfOperations());
+  if (getQueryDef().isScanQuery())
+  {
+    Uint32 totalRows = 0;
+    for (Uint32 i = 0; i<getNoOfOperations(); i++)
+    {
+      totalRows += getQueryOperation(i).getMaxBatchRows();
+    }
+    error = m_tupleSetAlloc.init(m_rootFragCount * totalRows);
+    if (unlikely(error != 0))
+    {
+      setErrorCode(error);
+      return -1;
+    }
+  }
   // 1. Build receiver structures for each QueryOperation.
   // 2. Fill in parameters (into ATTRINFO) for QueryTree.
   //    (Has to complete *after* ::prepareReceiver() as QueryTree params
@@ -2315,12 +2416,13 @@ NdbQueryImpl::prepareSend()
     keyRec = getRoot().getQueryOperationDef().getIndex()->getDefaultRecord();
     assert(keyRec!=NULL);
   }
-  int error;
-  if (unlikely((error = m_applFrags.prepare(getRoot().getOrdering(),
-                                              m_rootFragCount, 
-                                              keyRec,
-                                              getRoot().m_ndbRecord)) != 0)
-            || (error = m_fullFrags.prepare(m_rootFragCount)) != 0) {
+  if (unlikely((error = m_applFrags.prepare(m_pointerAlloc,
+                                            getRoot().getOrdering(),
+                                            m_rootFragCount, 
+                                            keyRec,
+                                            getRoot().m_ndbRecord)) != 0)
+      || (error = m_fullFrags.prepare(m_pointerAlloc,
+                                      m_rootFragCount)) != 0) {
     setErrorCode(error);
     return -1;
   }
@@ -2931,15 +3033,15 @@ NdbQueryImpl::SharedFragStack::SharedFra
 {}
 
 int
-NdbQueryImpl::SharedFragStack::prepare(int capacity)
+NdbQueryImpl::SharedFragStack::prepare(NdbBulkAllocator& allocator,
+                                       int capacity)
 {
   assert(m_array==NULL);
   assert(m_capacity==0);
   if (capacity > 0) 
   { m_capacity = capacity;
-    m_array = new NdbRootFragment*[capacity];
-    if (unlikely(m_array==NULL))
-      return Err_MemoryAlloc;
+    m_array = 
+      reinterpret_cast<NdbRootFragment**>(allocator.allocObjMem(capacity)); 
   }
   return 0;
 }
@@ -2971,15 +3073,14 @@ NdbQueryImpl::OrderedFragSet::OrderedFra
 
 NdbQueryImpl::OrderedFragSet::~OrderedFragSet() 
 { 
-  delete[] m_activeFrags;
   m_activeFrags = NULL;
-  delete[] m_emptiedFrags;
   m_emptiedFrags= NULL;
 }
 
 
 int
-NdbQueryImpl::OrderedFragSet::prepare(NdbQueryOptions::ScanOrdering ordering, 
+NdbQueryImpl::OrderedFragSet::prepare(NdbBulkAllocator& allocator,
+                                      NdbQueryOptions::ScanOrdering ordering, 
                                       int capacity,                
                                       const NdbRecord* keyRecord,
                                       const NdbRecord* resultRecord)
@@ -2991,13 +3092,12 @@ NdbQueryImpl::OrderedFragSet::prepare(Nd
   if (capacity > 0) 
   { 
     m_capacity = capacity;
-    m_activeFrags = new NdbRootFragment*[capacity];
-    if (unlikely(m_activeFrags==NULL))
-      return Err_MemoryAlloc;
+
+    m_activeFrags =  
+      reinterpret_cast<NdbRootFragment**>(allocator.allocObjMem(capacity)); 
     bzero(m_activeFrags, capacity * sizeof(NdbRootFragment*));
-    m_emptiedFrags = new NdbRootFragment*[capacity];
-    if (unlikely(m_emptiedFrags==NULL))
-      return Err_MemoryAlloc;
+    m_emptiedFrags = 
+      reinterpret_cast<NdbRootFragment**>(allocator.allocObjMem(capacity));
     bzero(m_emptiedFrags, capacity * sizeof(NdbRootFragment*));
   }
   m_ordering = ordering;
@@ -3254,7 +3354,7 @@ NdbQueryOperationImpl::NdbQueryOperation
   m_resultStreams(NULL),
   m_params(),
   m_bufferSize(0),
-  m_batchBuffer(NULL),
+  m_batchOverflowCheck(NULL),
   m_resultBuffer(NULL),
   m_resultRef(NULL),
   m_isRowNull(true),
@@ -3265,7 +3365,8 @@ NdbQueryOperationImpl::NdbQueryOperation
   m_ordering(NdbQueryOptions::ScanOrdering_unordered),
   m_interpretedCode(NULL),
   m_diskInUserProjection(false),
-  m_parallelism(0)
+  m_parallelism(0),
+  m_rowSize(0xffffffff)
 { 
   if (errno == ENOMEM)
   {
@@ -3304,7 +3405,7 @@ NdbQueryOperationImpl::~NdbQueryOperatio
 {
   // We expect ::postFetchRelease to have deleted fetch related structures when fetch completed.
   // Either by fetching through last row, or calling ::close() which forcefully terminates fetch
-  assert (m_batchBuffer == NULL);
+  assert (m_batchOverflowCheck == NULL);
   assert (m_resultStreams == NULL);
   assert (m_firstRecAttr == NULL);
   assert (m_interpretedCode == NULL);
@@ -3317,26 +3418,21 @@ NdbQueryOperationImpl::~NdbQueryOperatio
 void
 NdbQueryOperationImpl::postFetchRelease()
 {
-  if (m_batchBuffer) {
-#ifndef NDEBUG // Buffer overrun check activated.
-    { const Uint32 bufLen = m_bufferSize*m_queryImpl.getRootFragCount();
-      assert(m_batchBuffer[bufLen+0] == 'a' &&
-             m_batchBuffer[bufLen+1] == 'b' &&
-             m_batchBuffer[bufLen+2] == 'c' &&
-             m_batchBuffer[bufLen+3] == 'd');
-    }
-#endif
-    delete[] m_batchBuffer;
-    m_batchBuffer = NULL;
-  }
-
-  if (m_resultStreams) {
-    for(Uint32 i = 0; i<getQuery().getRootFragCount(); i ++){
-      delete m_resultStreams[i];
+  // Buffer overrun check.
+  assert(m_batchOverflowCheck==NULL || *m_batchOverflowCheck ==  0xacbd1234);
+  m_batchOverflowCheck = NULL;
+  
+  if (m_resultStreams != NULL)
+  { 
+    for (int i = static_cast<int>(getQuery().getRootFragCount())-1; i >= 0; i--)
+    {
+      if (m_resultStreams[i] != NULL)
+      {
+        m_resultStreams[i]->~NdbResultStream();
+      }
     }
-    delete[] m_resultStreams;
-    m_resultStreams = NULL;
   }
+  m_resultStreams = NULL;
 
   Ndb* const ndb = m_queryImpl.getNdbTransaction().getNdb();
   NdbRecAttr* recAttr = m_firstRecAttr;
@@ -3519,7 +3615,6 @@ NdbQueryOperationImpl::setResultRowBuf (
   m_ndbRecord = rec;
   m_read_mask = result_mask;
   m_resultBuffer = resBuffer;
-  assert(m_batchBuffer==NULL);
   return 0;
 }
 
@@ -3947,46 +4042,19 @@ NdbQueryOperationImpl::setBatchedRows(Ui
 int 
 NdbQueryOperationImpl::prepareReceiver()
 {
-  const Uint32 rowSize = 
-    NdbReceiver::ndbrecord_rowsize(m_ndbRecord, m_firstRecAttr,0,false);
-  m_bufferSize = rowSize * getMaxBatchRows();
-//ndbout "m_bufferSize=" << m_bufferSize << endl;
-
-  if (m_bufferSize > 0) { // 0 bytes in batch if no result requested
-    Uint32 bufLen = m_bufferSize*m_queryImpl.getRootFragCount();
-#ifdef NDEBUG
-    m_batchBuffer = new char[bufLen];
-    if (unlikely(m_batchBuffer == NULL)) {
-      return Err_MemoryAlloc;
-    }
-#else
-    /* To be able to check for buffer overrun.*/
-    m_batchBuffer = new char[bufLen+4];
-    if (unlikely(m_batchBuffer == NULL)) {
-      return Err_MemoryAlloc;
-    }
-    m_batchBuffer[bufLen+0] = 'a';
-    m_batchBuffer[bufLen+1] = 'b';
-    m_batchBuffer[bufLen+2] = 'c';
-    m_batchBuffer[bufLen+3] = 'd';
-#endif
-  }
-
   // Construct receiver streams and prepare them for receiving scan result
   assert(m_resultStreams==NULL);
   assert(m_queryImpl.getRootFragCount() > 0);
-  m_resultStreams = new NdbResultStream*[m_queryImpl.getRootFragCount()];
-  if (unlikely(m_resultStreams == NULL)) {
-    return Err_MemoryAlloc;
-  }
+
+  m_resultStreams = reinterpret_cast<NdbResultStream**>
+    (getQuery().getPointerAlloc().allocObjMem(m_queryImpl.getRootFragCount())); 
+
   for(Uint32 i = 0; i<m_queryImpl.getRootFragCount(); i++) {
     m_resultStreams[i] = NULL;  // Init to legal contents for d'tor
   }
   for(Uint32 i = 0; i<m_queryImpl.getRootFragCount(); i++) {
-    m_resultStreams[i] = new NdbResultStream(*this, i);
-    if (unlikely(m_resultStreams[i] == NULL)) {
-      return Err_MemoryAlloc;
-    }
+    m_resultStreams[i] = new (getQuery().m_resultStreamAlloc.allocObjMem(1))
+      NdbResultStream(*this, i);
     const int error = m_resultStreams[i]->prepare();
     if (unlikely(error)) {
       return error;
@@ -3994,17 +4062,23 @@ NdbQueryOperationImpl::prepareReceiver()
 
     m_resultStreams[i]->getReceiver().init(NdbReceiver::NDB_QUERY_OPERATION, 
                                         false, this);
+    char* const rowBuf = reinterpret_cast<char*>(getQuery().getRowBufferAlloc()
+                                                 .allocObjMem(m_bufferSize));
     m_resultStreams[i]->getReceiver()
       .do_setup_ndbrecord(m_ndbRecord,
                           getMaxBatchRows(), 
                           0 /*key_size*/, 
                           0 /*read_range_no*/, 
-                          rowSize,
-                          &m_batchBuffer[m_bufferSize*i],
+                          getRowSize(),
+                          rowBuf,
                           0);
     m_resultStreams[i]->getReceiver().prepareSend();
   }
-
+  // So that we can test for for buffer overrun.
+  m_batchOverflowCheck = 
+    reinterpret_cast<Uint32*>(getQuery().getRowBufferAlloc()
+                              .allocObjMem(sizeof(Uint32)));
+  *m_batchOverflowCheck = 0xacbd1234;
   return 0;
 }//NdbQueryOperationImpl::prepareReceiver
 
@@ -4850,6 +4924,16 @@ NdbQueryOperationImpl::getReceiver(Uint3
   return m_resultStreams[recNo]->getReceiver();
 }
 
+Uint32 NdbQueryOperationImpl::getRowSize() const
+{
+  // Check if row size has been computed yet.
+  if (m_rowSize == 0xffffffff)
+  {
+    m_rowSize = 
+      NdbReceiver::ndbrecord_rowsize(m_ndbRecord, m_firstRecAttr, 0, false);
+  }
+  return m_rowSize;
+}
 
 /** For debugging.*/
 NdbOut& operator<<(NdbOut& out, const NdbQueryOperationImpl& op){

=== modified file 'storage/ndb/src/ndbapi/NdbQueryOperationImpl.hpp'
--- a/storage/ndb/src/ndbapi/NdbQueryOperationImpl.hpp	2011-02-08 12:14:41 +0000
+++ b/storage/ndb/src/ndbapi/NdbQueryOperationImpl.hpp	2011-02-24 14:16:51 +0000
@@ -37,6 +37,57 @@ class NdbReceiver;
 class NdbOut;
 class NdbRootFragment;
 
+/**
+ * This class simplifies the task of allocating memory for many instances
+ * of the same type at once and then constructing them later.
+ */
+class NdbBulkAllocator
+{
+public:
+  /**
+   * @param[in] objSize Size (in bytes) of each object.
+   */
+  explicit NdbBulkAllocator(size_t objSize);
+  ~NdbBulkAllocator()
+  { reset(); };
+
+  /**
+   * Allocate memory for a number of objects from the heap.
+   * @param[in] maxObjs The maximal number of objects this instance should
+   * accomodate.
+   * @return 0 or possible error code.
+   */
+  int init(Uint32 maxObjs);
+
+  /** Release allocated memory to heap and reset *this to initial state.*/
+  void reset();
+
+  /**
+   * Get an area large enough to hold 'noOfObjs' objects.
+   * @return The memory area.
+   */
+  void* allocObjMem(Uint32 noOfObjs);
+private:
+  /** An end marker for checking for buffer overrun.*/
+  static const char endMarker = -15;
+
+  /** Size of each object (in bytes).*/
+  const size_t m_objSize;
+
+  /** The number of objects this instance can accomodate.*/
+  Uint32 m_maxObjs;
+
+  /** The allocated memory area.*/
+  char* m_buffer;
+
+  /** The number of object areas allocated so far.*/
+  Uint32 m_nextObjNo;
+
+  // No copying.
+  NdbBulkAllocator(const NdbBulkAllocator&);
+  NdbBulkAllocator& operator= (const NdbBulkAllocator&);
+};
+
 /** This class is the internal implementation of the interface defined by
  * NdbQuery. This class should thus not be visible to the application 
  * programmer. @see NdbQuery.*/
@@ -194,6 +245,9 @@ public:
   Uint32 getRootFragCount() const
   { return m_rootFragCount; }
 
+  NdbBulkAllocator& getTupleSetAlloc()
+  { return m_tupleSetAlloc; }
+
 private:
   /** Possible return values from NdbQueryImpl::awaitMoreResults. 
    * A subset of the integer values also matches those returned
@@ -218,15 +272,18 @@ private:
    */
   class SharedFragStack{
   public:
-    explicit SharedFragStack();
+    // For calculating need for dynamically allocated memory.
+    static const Uint32 pointersPerFragment = 2;
 
-    ~SharedFragStack() {
-      delete[] m_array;
-    }
+    explicit SharedFragStack();
 
-    // Prepare internal datastructures.
-    // Return 0 if ok, else errorcode
-    int prepare(int capacity);
+    /** 
+     * Prepare internal datastructures.
+     * param[in] allocator For allocating arrays of pointers.
+     * param[in] capacity Max no of root fragments.
+     * @return 0 if ok, else errorcode
+     */
+    int prepare(NdbBulkAllocator& allocator, int capacity);
 
     NdbRootFragment* pop() { 
       return m_current>=0 ? m_array[m_current--] : NULL;
@@ -260,14 +317,23 @@ private:
 
   class OrderedFragSet{
   public:
+    // For calculating need for dynamically allocated memory.
+    static const Uint32 pointersPerFragment = 1;
+
     explicit OrderedFragSet();
 
     ~OrderedFragSet(); 
 
-    // Prepare internal datastructures.
-    // Return 0 if ok, else errorcode
-    int prepare(NdbQueryOptions::ScanOrdering ordering, 
-                int size, 
+    /** 
+     * Prepare internal datastructures.
+     * param[in] allocator For allocating arrays of pointers.
+     * param[in] ordering Possible scan ordering.
+     * param[in] capacity Max no of root fragments.
+     * @return 0 if ok, else errorcode
+     */
+    int prepare(NdbBulkAllocator& allocator,
+                NdbQueryOptions::ScanOrdering ordering, 
+                int capacity,  
                 const NdbRecord* keyRecord,
                 const NdbRecord* resultRecord);
 
@@ -444,6 +510,21 @@ private:
    * fragment that should be scanned.*/
   Uint32 m_pruneHashVal;
 
+  /** Allocator for NdbQueryOperationImpl objects.*/
+  NdbBulkAllocator m_operationAlloc;
+
+  /** Allocator for NdbResultStream::TupleSet objects.*/
+  NdbBulkAllocator m_tupleSetAlloc;
+
+  /** Allocator for NdbResultStream objects.*/
+  NdbBulkAllocator m_resultStreamAlloc;
+
+  /** Allocator for pointers.*/
+  NdbBulkAllocator m_pointerAlloc;
+
+  /** Allocator for result row buffers.*/
+  NdbBulkAllocator m_rowBufferAlloc;
+
   // Only constructable from factory ::buildQuery();
   explicit NdbQueryImpl(
              NdbTransaction& trans,
@@ -495,6 +576,13 @@ private:
    *  @return: 'true' if its time to resume appl. threads
    */ 
   bool handleBatchComplete(Uint32 rootFragNo);
+
+  NdbBulkAllocator& getPointerAlloc()
+  { return m_pointerAlloc; }
+  
+  NdbBulkAllocator& getRowBufferAlloc()
+  { return m_rowBufferAlloc; }
+
 }; // class NdbQueryImpl
 
 
@@ -660,13 +748,12 @@ private:
   /** Buffer size allocated for *each* ResultStream/Receiver when 
    *  fetching results.*/
   Uint32 m_bufferSize;
-  /** Internally allocated temp. buffer for *all* m_resultStreams[]
-   *  when receiving a batch. (m_bufferSize x #ResultStreams) */
-  char* m_batchBuffer;
+  /** Used for checking if buffer overrun occurred. */
+  Uint32* m_batchOverflowCheck;
   /** User specified buffer for final storage of result.*/
   char* m_resultBuffer;
   /** User specified pointer to application pointer that should be 
-   * set to point to the current row inside m_batchBuffer
+   * set to point to the current row inside a receiver buffer
    * @see NdbQueryOperationImpl::setResultRowRef */
   const char** m_resultRef;
   /** True if this operation gave no result for the current row.*/
@@ -695,6 +782,9 @@ private:
    */
   Uint32 m_parallelism;
   
+  /** Size of each result row (in bytes).*/
+  mutable Uint32 m_rowSize;
+
   explicit NdbQueryOperationImpl(NdbQueryImpl& queryImpl, 
                                  const NdbQueryOperationDefImpl& def);
   ~NdbQueryOperationImpl();
@@ -771,6 +861,7 @@ private:
   bool diskInUserProjection() const
   { return m_diskInUserProjection; }
 
+  Uint32 getRowSize() const;
 }; // class NdbQueryOperationImpl
 
 

No bundle (reason: useless for push emails).
Thread
bzr push into mysql-5.1-telco-7.0-spj-scan-vs-scan branch (jan.wedvik:3441to 3442) Jan Wedvik24 Feb