3442 Jan Wedvik 2011-02-24
This commit reduces the number of malloc/free calls when using linked
operations. For each NdbQueryImpl instance, memory for all needed instances of a given
type is allocated at once whenever possible, instead of allocating for one or a
few instances at a time.
modified:
storage/ndb/src/ndbapi/NdbQueryOperation.cpp
storage/ndb/src/ndbapi/NdbQueryOperationImpl.hpp
3441 jonas oreland 2011-02-24 [merge]
ndb spj - merge 70-main into 70-spj
removed:
storage/ndb/include/kernel/signaldata/DbspjErr.hpp
storage/ndb/include/kernel/signaldata/QueryTree.hpp
storage/ndb/src/kernel/blocks/dbspj/
storage/ndb/src/kernel/blocks/dbspj/Dbspj.hpp
storage/ndb/src/kernel/blocks/dbspj/DbspjInit.cpp
storage/ndb/src/kernel/blocks/dbspj/DbspjMain.cpp
storage/ndb/src/kernel/blocks/dbspj/DbspjProxy.cpp
storage/ndb/src/kernel/blocks/dbspj/DbspjProxy.hpp
added:
storage/ndb/include/kernel/signaldata/DbspjErr.hpp
storage/ndb/include/kernel/signaldata/QueryTree.hpp
storage/ndb/src/kernel/blocks/dbspj/
storage/ndb/src/kernel/blocks/dbspj/Dbspj.hpp
storage/ndb/src/kernel/blocks/dbspj/DbspjInit.cpp
storage/ndb/src/kernel/blocks/dbspj/DbspjMain.cpp
storage/ndb/src/kernel/blocks/dbspj/DbspjProxy.cpp
storage/ndb/src/kernel/blocks/dbspj/DbspjProxy.hpp
modified:
configure.in
storage/ndb/include/kernel/signaldata/SignalData.hpp
storage/ndb/ndb_configure.m4
storage/ndb/src/kernel/blocks/dblqh/DblqhMain.cpp
storage/ndb/src/kernel/blocks/dbtc/Dbtc.hpp
storage/ndb/src/kernel/blocks/dbtc/DbtcMain.cpp
storage/ndb/src/ndbapi/NdbDictionaryImpl.cpp
storage/ndb/src/ndbapi/NdbDictionaryImpl.hpp
storage/ndb/src/ndbapi/NdbEventOperationImpl.cpp
storage/ndb/tools/ndbinfo_sql.cpp
=== modified file 'storage/ndb/src/ndbapi/NdbQueryOperation.cpp'
--- a/storage/ndb/src/ndbapi/NdbQueryOperation.cpp 2011-02-11 13:12:09 +0000
+++ b/storage/ndb/src/ndbapi/NdbQueryOperation.cpp 2011-02-24 14:16:51 +0000
@@ -362,45 +362,6 @@ public:
/** For debugging.*/
friend NdbOut& operator<<(NdbOut& out, const NdbResultStream&);
-private:
- /** This stream handles results derived from the m_rootFragNo'th
- * fragment of the root operation.*/
- const Uint32 m_rootFragNo;
-
- /** The receiver object that unpacks transid_AI messages.*/
- NdbReceiver m_receiver;
-
- /** Max #rows which this stream may recieve in its buffer structures */
- Uint32 m_maxRows;
-
- /** The number of transid_AI messages received.*/
- Uint32 m_rowCount;
-
- /** Operation to which this resultStream belong.*/
- NdbQueryOperationImpl& m_operation;
-
- /** This is the state of the iterator used by firstResult(), nextResult().*/
- enum
- {
- /** The first row has not been fetched yet.*/
- Iter_notStarted,
- /** Is iterating the resultset, (implies 'm_currentRow!=tupleNotFound').*/
- Iter_started,
- /** Last row for current batch has been returned.*/
- Iter_finished
- } m_iterState;
-
- /** Tuple id of the current tuple, or 'tupleNotFound' if Iter_notStarted or Iter_finished. */
- Uint16 m_currentRow;
-
- /**
- * This field is only used for result streams of scan operations. If set,
- * it indicates that the stream is holding the last batch of a sub scan.
- * This means that it is the last batch of the scan that was instantiated
- * from the current batch of its parent operation.
- */
- bool m_subScanComplete;
-
/**
* TupleSet contain two logically distinct set of information:
*
@@ -448,6 +409,45 @@ private:
TupleSet& operator=(const TupleSet&);
};
+private:
+ /** This stream handles results derived from the m_rootFragNo'th
+ * fragment of the root operation.*/
+ const Uint32 m_rootFragNo;
+
+ /** The receiver object that unpacks transid_AI messages.*/
+ NdbReceiver m_receiver;
+
+ /** Max #rows which this stream may recieve in its buffer structures */
+ Uint32 m_maxRows;
+
+ /** The number of transid_AI messages received.*/
+ Uint32 m_rowCount;
+
+ /** Operation to which this resultStream belong.*/
+ NdbQueryOperationImpl& m_operation;
+
+ /** This is the state of the iterator used by firstResult(), nextResult().*/
+ enum
+ {
+ /** The first row has not been fetched yet.*/
+ Iter_notStarted,
+ /** Is iterating the resultset, (implies 'm_currentRow!=tupleNotFound').*/
+ Iter_started,
+ /** Last row for current batch has been returned.*/
+ Iter_finished
+ } m_iterState;
+
+ /** Tuple id of the current tuple, or 'tupleNotFound' if Iter_notStarted or Iter_finished. */
+ Uint16 m_currentRow;
+
+ /**
+ * This field is only used for result streams of scan operations. If set,
+ * it indicates that the stream is holding the last batch of a sub scan.
+ * This means that it is the last batch of the scan that was instantiated
+ * from the current batch of its parent operation.
+ */
+ bool m_subScanComplete;
+
TupleSet* m_tupleSet;
void clearTupleSet();
@@ -471,6 +471,50 @@ private:
NdbResultStream& operator=(const NdbResultStream&);
}; //class NdbResultStream
+//////////////////////////////////////////////
+///////// NdbBulkAllocator methods ///////////
+//////////////////////////////////////////////
+
+NdbBulkAllocator::NdbBulkAllocator(size_t objSize)
+ :m_objSize(objSize),
+ m_maxObjs(0),
+ m_buffer(NULL),
+ m_nextObjNo(0)
+{}
+
+int NdbBulkAllocator::init(Uint32 maxObjs)
+{
+ assert(m_buffer == NULL);
+ m_maxObjs = maxObjs;
+ // Add check for buffer overrun.
+ m_buffer = new char[m_objSize*m_maxObjs+1];
+ if (unlikely(m_buffer == NULL))
+ {
+ return Err_MemoryAlloc;
+ }
+ m_buffer[m_maxObjs * m_objSize] = endMarker;
+ return 0;
+}
+
+void NdbBulkAllocator::reset(){
+ // Overrun check.
+ assert(m_buffer == NULL || m_buffer[m_maxObjs * m_objSize] == endMarker);
+ // Overwrite with 0xff bytes to detect accidental use of released memory.
+ assert(m_buffer == NULL ||
+ memset(m_buffer, 0xff, m_maxObjs * m_objSize) != NULL);
+ delete m_buffer;
+ m_buffer = NULL;
+ m_nextObjNo = 0;
+ m_maxObjs = 0;
+}
+
+void* NdbBulkAllocator::allocObjMem(Uint32 noOfObjs)
+{
+ assert(m_nextObjNo + noOfObjs <= m_maxObjs);
+ void * const result = m_buffer+m_objSize*m_nextObjNo;
+ m_nextObjNo += noOfObjs;
+ return m_nextObjNo > m_maxObjs ? NULL : result;
+}
//////////////////////////////////////////////
///////// NdbResultStream methods ///////////
@@ -488,8 +532,12 @@ NdbResultStream::NdbResultStream(NdbQuer
m_tupleSet(NULL)
{};
-NdbResultStream::~NdbResultStream() {
- delete[] m_tupleSet;
+NdbResultStream::~NdbResultStream()
+{
+ for (int i = static_cast<int>(m_maxRows)-1; i >= 0; i--)
+ {
+ m_tupleSet[i].~TupleSet();
+ }
}
int // Return 0 if ok, else errorcode
@@ -503,9 +551,9 @@ NdbResultStream::prepare()
if (m_operation.getQueryDef().isScanQuery())
{
m_maxRows = m_operation.getMaxBatchRows();
- m_tupleSet = new TupleSet[m_maxRows];
- if (unlikely(m_tupleSet==NULL))
- return Err_MemoryAlloc;
+ m_tupleSet =
+ new (m_operation.getQuery().getTupleSetAlloc().allocObjMem(m_maxRows))
+ TupleSet[m_maxRows];
clearTupleSet();
}
@@ -1352,18 +1400,23 @@ NdbQueryImpl::NdbQueryImpl(NdbTransactio
m_startIndicator(false),
m_commitIndicator(false),
m_prunability(Prune_No),
- m_pruneHashVal(0)
+ m_pruneHashVal(0),
+ m_operationAlloc(sizeof(NdbQueryOperationImpl)),
+ m_tupleSetAlloc(sizeof(NdbResultStream::TupleSet)),
+ m_resultStreamAlloc(sizeof(NdbResultStream)),
+ m_pointerAlloc(sizeof(void*)),
+ m_rowBufferAlloc(sizeof(char))
{
// Allocate memory for all m_operations[] in a single chunk
m_countOperations = queryDef.getNoOfOperations();
- Uint32 size = m_countOperations *
- static_cast<Uint32>(sizeof(NdbQueryOperationImpl));
- m_operations = static_cast<NdbQueryOperationImpl*> (::operator new(size));
- if (unlikely(m_operations == NULL))
+ const int error = m_operationAlloc.init(m_countOperations);
+ if (unlikely(error != 0))
{
- setErrorCode(Err_MemoryAlloc);
+ setErrorCode(error);
return;
}
+ m_operations = reinterpret_cast<NdbQueryOperationImpl*>
+ (m_operationAlloc.allocObjMem(m_countOperations));
// Then; use placement new to construct each individual
// NdbQueryOperationImpl object in m_operations
@@ -1375,11 +1428,10 @@ NdbQueryImpl::NdbQueryImpl(NdbTransactio
if (m_error.code != 0)
{
// Destroy those objects that we have already constructed.
- for (Uint32 j=0; j<=i; j++)
+ for (int j = static_cast<int>(i)-1; j>= 0; j--)
{
m_operations[j].~NdbQueryOperationImpl();
}
- ::operator delete(m_operations);
m_operations = NULL;
return;
}
@@ -1402,7 +1454,6 @@ NdbQueryImpl::~NdbQueryImpl()
for (int i=m_countOperations-1; i>=0; --i)
{ m_operations[i].~NdbQueryOperationImpl();
}
- ::operator delete(m_operations);
m_operations = NULL;
}
delete[] m_rootFrags;
@@ -1418,6 +1469,9 @@ NdbQueryImpl::postFetchRelease()
{ m_operations[i].postFetchRelease();
}
}
+ m_rowBufferAlloc.reset();
+ m_tupleSetAlloc.reset();
+ m_resultStreamAlloc.reset();
}
@@ -2278,10 +2332,57 @@ NdbQueryImpl::prepareSend()
m_rootFragCount = 1;
}
+ int error = m_resultStreamAlloc.init(m_rootFragCount * getNoOfOperations());
+ if (error != 0)
+ {
+ setErrorCode(error);
+ return -1;
+ }
+ // Allocate space for ptrs to NdbResultStream and NdbRootFragment objects.
+ error = m_pointerAlloc.init(m_rootFragCount *
+ (SharedFragStack::pointersPerFragment +
+ OrderedFragSet::pointersPerFragment +
+ // Pointers to NdbResultStream objects.
+ getNoOfOperations()));
+ if (error != 0)
+ {
+ setErrorCode(error);
+ return -1;
+ }
+
// Some preparation for later batchsize calculations pr. (sub) scan
getRoot().calculateBatchedRows(NULL);
getRoot().setBatchedRows(1);
+ /** Calculate total amount of row buffer space for all operations and
+ * fragments.*/
+ Uint32 totalBuffSize = 0;
+ for (Uint32 opNo = 0; opNo < getNoOfOperations(); opNo++)
+ {
+ NdbQueryOperationImpl& op = getQueryOperation(opNo);
+
+ op.m_bufferSize = op.getRowSize() * op.getMaxBatchRows();
+ totalBuffSize += op.m_bufferSize;
+ }
+ /** Add one word per operation for buffer overrun check. We add a word
+ * rather than a byte to avoid possible alignment problems.
+ */
+ m_rowBufferAlloc.init(totalBuffSize * m_rootFragCount +
+ sizeof(Uint32) * getNoOfOperations());
+ if (getQueryDef().isScanQuery())
+ {
+ Uint32 totalRows = 0;
+ for (Uint32 i = 0; i<getNoOfOperations(); i++)
+ {
+ totalRows += getQueryOperation(i).getMaxBatchRows();
+ }
+ error = m_tupleSetAlloc.init(m_rootFragCount * totalRows);
+ if (unlikely(error != 0))
+ {
+ setErrorCode(error);
+ return -1;
+ }
+ }
// 1. Build receiver structures for each QueryOperation.
// 2. Fill in parameters (into ATTRINFO) for QueryTree.
// (Has to complete *after* ::prepareReceiver() as QueryTree params
@@ -2315,12 +2416,13 @@ NdbQueryImpl::prepareSend()
keyRec = getRoot().getQueryOperationDef().getIndex()->getDefaultRecord();
assert(keyRec!=NULL);
}
- int error;
- if (unlikely((error = m_applFrags.prepare(getRoot().getOrdering(),
- m_rootFragCount,
- keyRec,
- getRoot().m_ndbRecord)) != 0)
- || (error = m_fullFrags.prepare(m_rootFragCount)) != 0) {
+ if (unlikely((error = m_applFrags.prepare(m_pointerAlloc,
+ getRoot().getOrdering(),
+ m_rootFragCount,
+ keyRec,
+ getRoot().m_ndbRecord)) != 0)
+ || (error = m_fullFrags.prepare(m_pointerAlloc,
+ m_rootFragCount)) != 0) {
setErrorCode(error);
return -1;
}
@@ -2931,15 +3033,15 @@ NdbQueryImpl::SharedFragStack::SharedFra
{}
int
-NdbQueryImpl::SharedFragStack::prepare(int capacity)
+NdbQueryImpl::SharedFragStack::prepare(NdbBulkAllocator& allocator,
+ int capacity)
{
assert(m_array==NULL);
assert(m_capacity==0);
if (capacity > 0)
{ m_capacity = capacity;
- m_array = new NdbRootFragment*[capacity];
- if (unlikely(m_array==NULL))
- return Err_MemoryAlloc;
+ m_array =
+ reinterpret_cast<NdbRootFragment**>(allocator.allocObjMem(capacity));
}
return 0;
}
@@ -2971,15 +3073,14 @@ NdbQueryImpl::OrderedFragSet::OrderedFra
NdbQueryImpl::OrderedFragSet::~OrderedFragSet()
{
- delete[] m_activeFrags;
m_activeFrags = NULL;
- delete[] m_emptiedFrags;
m_emptiedFrags= NULL;
}
int
-NdbQueryImpl::OrderedFragSet::prepare(NdbQueryOptions::ScanOrdering ordering,
+NdbQueryImpl::OrderedFragSet::prepare(NdbBulkAllocator& allocator,
+ NdbQueryOptions::ScanOrdering ordering,
int capacity,
const NdbRecord* keyRecord,
const NdbRecord* resultRecord)
@@ -2991,13 +3092,12 @@ NdbQueryImpl::OrderedFragSet::prepare(Nd
if (capacity > 0)
{
m_capacity = capacity;
- m_activeFrags = new NdbRootFragment*[capacity];
- if (unlikely(m_activeFrags==NULL))
- return Err_MemoryAlloc;
+
+ m_activeFrags =
+ reinterpret_cast<NdbRootFragment**>(allocator.allocObjMem(capacity));
bzero(m_activeFrags, capacity * sizeof(NdbRootFragment*));
- m_emptiedFrags = new NdbRootFragment*[capacity];
- if (unlikely(m_emptiedFrags==NULL))
- return Err_MemoryAlloc;
+ m_emptiedFrags =
+ reinterpret_cast<NdbRootFragment**>(allocator.allocObjMem(capacity));
bzero(m_emptiedFrags, capacity * sizeof(NdbRootFragment*));
}
m_ordering = ordering;
@@ -3254,7 +3354,7 @@ NdbQueryOperationImpl::NdbQueryOperation
m_resultStreams(NULL),
m_params(),
m_bufferSize(0),
- m_batchBuffer(NULL),
+ m_batchOverflowCheck(NULL),
m_resultBuffer(NULL),
m_resultRef(NULL),
m_isRowNull(true),
@@ -3265,7 +3365,8 @@ NdbQueryOperationImpl::NdbQueryOperation
m_ordering(NdbQueryOptions::ScanOrdering_unordered),
m_interpretedCode(NULL),
m_diskInUserProjection(false),
- m_parallelism(0)
+ m_parallelism(0),
+ m_rowSize(0xffffffff)
{
if (errno == ENOMEM)
{
@@ -3304,7 +3405,7 @@ NdbQueryOperationImpl::~NdbQueryOperatio
{
// We expect ::postFetchRelease to have deleted fetch related structures when fetch completed.
// Either by fetching through last row, or calling ::close() which forcefully terminates fetch
- assert (m_batchBuffer == NULL);
+ assert (m_batchOverflowCheck == NULL);
assert (m_resultStreams == NULL);
assert (m_firstRecAttr == NULL);
assert (m_interpretedCode == NULL);
@@ -3317,26 +3418,21 @@ NdbQueryOperationImpl::~NdbQueryOperatio
void
NdbQueryOperationImpl::postFetchRelease()
{
- if (m_batchBuffer) {
-#ifndef NDEBUG // Buffer overrun check activated.
- { const Uint32 bufLen = m_bufferSize*m_queryImpl.getRootFragCount();
- assert(m_batchBuffer[bufLen+0] == 'a' &&
- m_batchBuffer[bufLen+1] == 'b' &&
- m_batchBuffer[bufLen+2] == 'c' &&
- m_batchBuffer[bufLen+3] == 'd');
- }
-#endif
- delete[] m_batchBuffer;
- m_batchBuffer = NULL;
- }
-
- if (m_resultStreams) {
- for(Uint32 i = 0; i<getQuery().getRootFragCount(); i ++){
- delete m_resultStreams[i];
+ // Buffer overrun check.
+ assert(m_batchOverflowCheck==NULL || *m_batchOverflowCheck == 0xacbd1234);
+ m_batchOverflowCheck = NULL;
+
+ if (m_resultStreams != NULL)
+ {
+ for (int i = static_cast<int>(getQuery().getRootFragCount())-1; i >= 0; i--)
+ {
+ if (m_resultStreams[i] != NULL)
+ {
+ m_resultStreams[i]->~NdbResultStream();
+ }
}
- delete[] m_resultStreams;
- m_resultStreams = NULL;
}
+ m_resultStreams = NULL;
Ndb* const ndb = m_queryImpl.getNdbTransaction().getNdb();
NdbRecAttr* recAttr = m_firstRecAttr;
@@ -3519,7 +3615,6 @@ NdbQueryOperationImpl::setResultRowBuf (
m_ndbRecord = rec;
m_read_mask = result_mask;
m_resultBuffer = resBuffer;
- assert(m_batchBuffer==NULL);
return 0;
}
@@ -3947,46 +4042,19 @@ NdbQueryOperationImpl::setBatchedRows(Ui
int
NdbQueryOperationImpl::prepareReceiver()
{
- const Uint32 rowSize =
- NdbReceiver::ndbrecord_rowsize(m_ndbRecord, m_firstRecAttr,0,false);
- m_bufferSize = rowSize * getMaxBatchRows();
-//ndbout "m_bufferSize=" << m_bufferSize << endl;
-
- if (m_bufferSize > 0) { // 0 bytes in batch if no result requested
- Uint32 bufLen = m_bufferSize*m_queryImpl.getRootFragCount();
-#ifdef NDEBUG
- m_batchBuffer = new char[bufLen];
- if (unlikely(m_batchBuffer == NULL)) {
- return Err_MemoryAlloc;
- }
-#else
- /* To be able to check for buffer overrun.*/
- m_batchBuffer = new char[bufLen+4];
- if (unlikely(m_batchBuffer == NULL)) {
- return Err_MemoryAlloc;
- }
- m_batchBuffer[bufLen+0] = 'a';
- m_batchBuffer[bufLen+1] = 'b';
- m_batchBuffer[bufLen+2] = 'c';
- m_batchBuffer[bufLen+3] = 'd';
-#endif
- }
-
// Construct receiver streams and prepare them for receiving scan result
assert(m_resultStreams==NULL);
assert(m_queryImpl.getRootFragCount() > 0);
- m_resultStreams = new NdbResultStream*[m_queryImpl.getRootFragCount()];
- if (unlikely(m_resultStreams == NULL)) {
- return Err_MemoryAlloc;
- }
+
+ m_resultStreams = reinterpret_cast<NdbResultStream**>
+ (getQuery().getPointerAlloc().allocObjMem(m_queryImpl.getRootFragCount()));
+
for(Uint32 i = 0; i<m_queryImpl.getRootFragCount(); i++) {
m_resultStreams[i] = NULL; // Init to legal contents for d'tor
}
for(Uint32 i = 0; i<m_queryImpl.getRootFragCount(); i++) {
- m_resultStreams[i] = new NdbResultStream(*this, i);
- if (unlikely(m_resultStreams[i] == NULL)) {
- return Err_MemoryAlloc;
- }
+ m_resultStreams[i] = new (getQuery().m_resultStreamAlloc.allocObjMem(1))
+ NdbResultStream(*this, i);
const int error = m_resultStreams[i]->prepare();
if (unlikely(error)) {
return error;
@@ -3994,17 +4062,23 @@ NdbQueryOperationImpl::prepareReceiver()
m_resultStreams[i]->getReceiver().init(NdbReceiver::NDB_QUERY_OPERATION,
false, this);
+ char* const rowBuf = reinterpret_cast<char*>(getQuery().getRowBufferAlloc()
+ .allocObjMem(m_bufferSize));
m_resultStreams[i]->getReceiver()
.do_setup_ndbrecord(m_ndbRecord,
getMaxBatchRows(),
0 /*key_size*/,
0 /*read_range_no*/,
- rowSize,
- &m_batchBuffer[m_bufferSize*i],
+ getRowSize(),
+ rowBuf,
0);
m_resultStreams[i]->getReceiver().prepareSend();
}
-
+ // So that we can test for for buffer overrun.
+ m_batchOverflowCheck =
+ reinterpret_cast<Uint32*>(getQuery().getRowBufferAlloc()
+ .allocObjMem(sizeof(Uint32)));
+ *m_batchOverflowCheck = 0xacbd1234;
return 0;
}//NdbQueryOperationImpl::prepareReceiver
@@ -4850,6 +4924,16 @@ NdbQueryOperationImpl::getReceiver(Uint3
return m_resultStreams[recNo]->getReceiver();
}
+Uint32 NdbQueryOperationImpl::getRowSize() const
+{
+ // Check if row size has been computed yet.
+ if (m_rowSize == 0xffffffff)
+ {
+ m_rowSize =
+ NdbReceiver::ndbrecord_rowsize(m_ndbRecord, m_firstRecAttr, 0, false);
+ }
+ return m_rowSize;
+}
/** For debugging.*/
NdbOut& operator<<(NdbOut& out, const NdbQueryOperationImpl& op){
=== modified file 'storage/ndb/src/ndbapi/NdbQueryOperationImpl.hpp'
--- a/storage/ndb/src/ndbapi/NdbQueryOperationImpl.hpp 2011-02-08 12:14:41 +0000
+++ b/storage/ndb/src/ndbapi/NdbQueryOperationImpl.hpp 2011-02-24 14:16:51 +0000
@@ -37,6 +37,57 @@ class NdbReceiver;
class NdbOut;
class NdbRootFragment;
+/**
+ * This class simplifies the task of allocating memory for many instances
+ * of the same type at once and then constructing them later.
+ */
+class NdbBulkAllocator
+{
+public:
+ /**
+ * @param[in] objSize Size (in bytes) of each object.
+ */
+ explicit NdbBulkAllocator(size_t objSize);
+ ~NdbBulkAllocator()
+ { reset(); };
+
+ /**
+ * Allocate memory for a number of objects from the heap.
+ * @param[in] maxObjs The maximal number of objects this instance should
+ * accomodate.
+ * @return 0 or possible error code.
+ */
+ int init(Uint32 maxObjs);
+
+ /** Release allocated memory to heap and reset *this to initial state.*/
+ void reset();
+
+ /**
+ * Get an area large enough to hold 'noOfObjs' objects.
+ * @return The memory area.
+ */
+ void* allocObjMem(Uint32 noOfObjs);
+private:
+ /** An end marker for checking for buffer overrun.*/
+ static const char endMarker = -15;
+
+ /** Size of each object (in bytes).*/
+ const size_t m_objSize;
+
+ /** The number of objects this instance can accomodate.*/
+ Uint32 m_maxObjs;
+
+ /** The allocated memory area.*/
+ char* m_buffer;
+
+ /** The number of object areas allocated so far.*/
+ Uint32 m_nextObjNo;
+
+ // No copying.
+ NdbBulkAllocator(const NdbBulkAllocator&);
+ NdbBulkAllocator& operator= (const NdbBulkAllocator&);
+};
+
/** This class is the internal implementation of the interface defined by
* NdbQuery. This class should thus not be visible to the application
* programmer. @see NdbQuery.*/
@@ -194,6 +245,9 @@ public:
Uint32 getRootFragCount() const
{ return m_rootFragCount; }
+ NdbBulkAllocator& getTupleSetAlloc()
+ { return m_tupleSetAlloc; }
+
private:
/** Possible return values from NdbQueryImpl::awaitMoreResults.
* A subset of the integer values also matches those returned
@@ -218,15 +272,18 @@ private:
*/
class SharedFragStack{
public:
- explicit SharedFragStack();
+ // For calculating need for dynamically allocated memory.
+ static const Uint32 pointersPerFragment = 2;
- ~SharedFragStack() {
- delete[] m_array;
- }
+ explicit SharedFragStack();
- // Prepare internal datastructures.
- // Return 0 if ok, else errorcode
- int prepare(int capacity);
+ /**
+ * Prepare internal datastructures.
+ * param[in] allocator For allocating arrays of pointers.
+ * param[in] capacity Max no of root fragments.
+ * @return 0 if ok, else errorcode
+ */
+ int prepare(NdbBulkAllocator& allocator, int capacity);
NdbRootFragment* pop() {
return m_current>=0 ? m_array[m_current--] : NULL;
@@ -260,14 +317,23 @@ private:
class OrderedFragSet{
public:
+ // For calculating need for dynamically allocated memory.
+ static const Uint32 pointersPerFragment = 1;
+
explicit OrderedFragSet();
~OrderedFragSet();
- // Prepare internal datastructures.
- // Return 0 if ok, else errorcode
- int prepare(NdbQueryOptions::ScanOrdering ordering,
- int size,
+ /**
+ * Prepare internal datastructures.
+ * param[in] allocator For allocating arrays of pointers.
+ * param[in] ordering Possible scan ordering.
+ * param[in] capacity Max no of root fragments.
+ * @return 0 if ok, else errorcode
+ */
+ int prepare(NdbBulkAllocator& allocator,
+ NdbQueryOptions::ScanOrdering ordering,
+ int capacity,
const NdbRecord* keyRecord,
const NdbRecord* resultRecord);
@@ -444,6 +510,21 @@ private:
* fragment that should be scanned.*/
Uint32 m_pruneHashVal;
+ /** Allocator for NdbQueryOperationImpl objects.*/
+ NdbBulkAllocator m_operationAlloc;
+
+ /** Allocator for NdbResultStream::TupleSet objects.*/
+ NdbBulkAllocator m_tupleSetAlloc;
+
+ /** Allocator for NdbResultStream objects.*/
+ NdbBulkAllocator m_resultStreamAlloc;
+
+ /** Allocator for pointers.*/
+ NdbBulkAllocator m_pointerAlloc;
+
+ /** Allocator for result row buffers.*/
+ NdbBulkAllocator m_rowBufferAlloc;
+
// Only constructable from factory ::buildQuery();
explicit NdbQueryImpl(
NdbTransaction& trans,
@@ -495,6 +576,13 @@ private:
* @return: 'true' if its time to resume appl. threads
*/
bool handleBatchComplete(Uint32 rootFragNo);
+
+ NdbBulkAllocator& getPointerAlloc()
+ { return m_pointerAlloc; }
+
+ NdbBulkAllocator& getRowBufferAlloc()
+ { return m_rowBufferAlloc; }
+
}; // class NdbQueryImpl
@@ -660,13 +748,12 @@ private:
/** Buffer size allocated for *each* ResultStream/Receiver when
* fetching results.*/
Uint32 m_bufferSize;
- /** Internally allocated temp. buffer for *all* m_resultStreams[]
- * when receiving a batch. (m_bufferSize x #ResultStreams) */
- char* m_batchBuffer;
+ /** Used for checking if buffer overrun occurred. */
+ Uint32* m_batchOverflowCheck;
/** User specified buffer for final storage of result.*/
char* m_resultBuffer;
/** User specified pointer to application pointer that should be
- * set to point to the current row inside m_batchBuffer
+ * set to point to the current row inside a receiver buffer
* @see NdbQueryOperationImpl::setResultRowRef */
const char** m_resultRef;
/** True if this operation gave no result for the current row.*/
@@ -695,6 +782,9 @@ private:
*/
Uint32 m_parallelism;
+ /** Size of each result row (in bytes).*/
+ mutable Uint32 m_rowSize;
+
explicit NdbQueryOperationImpl(NdbQueryImpl& queryImpl,
const NdbQueryOperationDefImpl& def);
~NdbQueryOperationImpl();
@@ -771,6 +861,7 @@ private:
bool diskInUserProjection() const
{ return m_diskInUserProjection; }
+ Uint32 getRowSize() const;
}; // class NdbQueryOperationImpl
No bundle (reason: useless for push emails).
| Thread |
|---|
| • bzr push into mysql-5.1-telco-7.0-spj-scan-vs-scan branch (jan.wedvik:3441to 3442) | Jan Wedvik | 24 Feb |