#At file:///export/home2/tmp/jw1159207/mysql/push_spj/ based on revid:ole.john.aske@stripped
2933 Jan Wedvik 2009-09-02
NdbRecAttr result retrieval has been implemented for scan
queries (and made to work again for lookup queries.)
Query operations with empty user projections used to cause core
dumps. This has now been fixed.
Missing use of NdbError objects has been fixed in several places.
Some previously empty getter methods have been implemented.
modified:
storage/ndb/include/ndbapi/NdbRecAttr.hpp
storage/ndb/include/ndbapi/NdbTransaction.hpp
storage/ndb/src/ndbapi/NdbQueryOperation.cpp
storage/ndb/src/ndbapi/NdbQueryOperationImpl.hpp
storage/ndb/test/tools/test_spj.cpp
=== modified file 'storage/ndb/include/ndbapi/NdbRecAttr.hpp'
--- a/storage/ndb/include/ndbapi/NdbRecAttr.hpp 2009-06-13 18:56:08 +0000
+++ b/storage/ndb/include/ndbapi/NdbRecAttr.hpp 2009-09-02 07:15:55 +0000
@@ -81,6 +81,7 @@ class NdbRecAttr
friend class NdbEventOperationImpl;
friend class NdbReceiver;
friend class Ndb;
+ friend class NdbQueryOperationImpl;
friend class NdbOut& operator<<(class NdbOut&, const class AttributeS&);
#endif
=== modified file 'storage/ndb/include/ndbapi/NdbTransaction.hpp'
--- a/storage/ndb/include/ndbapi/NdbTransaction.hpp 2009-08-26 08:33:16 +0000
+++ b/storage/ndb/include/ndbapi/NdbTransaction.hpp 2009-09-02 07:15:55 +0000
@@ -150,6 +150,7 @@ class NdbTransaction
friend class NdbIndexScanOperation;
friend class NdbBlob;
friend class ha_ndbcluster;
+ friend class NdbQueryImpl;
friend class NdbQueryOperationImpl;
#endif
=== modified file 'storage/ndb/src/ndbapi/NdbQueryOperation.cpp'
--- a/storage/ndb/src/ndbapi/NdbQueryOperation.cpp 2009-09-01 13:36:16 +0000
+++ b/storage/ndb/src/ndbapi/NdbQueryOperation.cpp 2009-09-02 07:15:55 +0000
@@ -27,6 +27,15 @@
#include "AttributeHeader.hpp"
#include "NdbRecord.hpp"
#include "TransporterFacade.hpp"
+#include "NdbRecAttr.hpp"
+
+/* Various error codes that are not specific to NdbQuery. */
+STATIC_CONST(Err_MemoryAlloc = 4000);
+STATIC_CONST(Err_UnknownColumn = 4004);
+STATIC_CONST(Err_ReceiveFromNdbFailed = 4008);
+STATIC_CONST(Err_NodeFailCausedAbort = 4028);
+STATIC_CONST(Err_MixRecAttrAndRecord = 4284);
+STATIC_CONST(Err_DifferentTabForKeyRecAndAttrRec = 4287);
NdbQuery::NdbQuery(NdbQueryImpl& impl):
m_impl(impl)
@@ -274,8 +283,6 @@ NdbQueryImpl::NdbQueryImpl(NdbTransactio
NdbQueryImpl::~NdbQueryImpl()
{
- this->release();
-
if (m_id != NdbObjectIdMap::InvalidId) {
m_transaction.getNdb()->theImpl->theNdbObjectIdMap.unmap(m_id, this);
}
@@ -309,13 +316,19 @@ NdbQueryImpl::getNoOfOperations() const
NdbQueryOperationImpl&
NdbQueryImpl::getQueryOperation(Uint32 index) const
{
+ assert(index<m_countOperations);
return m_operations[index];
}
NdbQueryOperationImpl*
NdbQueryImpl::getQueryOperation(const char* ident) const
{
- return NULL; // FIXME
+ for(Uint32 i = 0; i<m_countOperations; i++){
+ if(strcmp(m_operations[i].getQueryOperationDef().getName(), ident) == 0){
+ return &m_operations[i];
+ }
+ }
+ return NULL;
}
Uint32
@@ -354,6 +367,14 @@ NdbQueryImpl::getNdbTransaction() const
return &m_transaction;
}
+void
+NdbQueryImpl::setErrorCodeAbort(int aErrorCode){
+ m_error.code = aErrorCode;
+ getNdbTransaction()->theErrorLine = 0;
+ getNdbTransaction()->theErrorOperation = NULL;
+ getNdbTransaction()->setOperationErrorCodeAbort(aErrorCode);
+}
+
bool
NdbQueryImpl::execTCKEYCONF(){
ndbout << "NdbQueryImpl::execTCKEYCONF() m_pendingStreams="
@@ -438,14 +459,6 @@ NdbQueryImpl::prepareSend(){
return 0;
}
-void
-NdbQueryImpl::release(){
- // FIXME: Should this be called as part of destructor?
- for(Uint32 i = 0; i < m_countOperations; i++){
- m_operations[i].release();
- }
-}
-
////////////////////////////////////////////////////
///////// NdbQueryOperationImpl methods ///////////
////////////////////////////////////////////////////
@@ -474,7 +487,9 @@ NdbQueryOperationImpl::NdbQueryOperation
m_applStreams(def.getQueryOperationIx() == 0 ? // Needed for root only.
queryImpl.getParallelism() : 0),
m_fullStreams(def.getQueryOperationIx() == 0 ? // Needed for root only.
- queryImpl.getParallelism() : 0)
+ queryImpl.getParallelism() : 0),
+ m_firstRecAttr(NULL),
+ m_lastRecAttr(NULL)
{
assert(m_id != NdbObjectIdMap::InvalidId);
@@ -499,29 +514,33 @@ NdbQueryOperationImpl::NdbQueryOperation
}
NdbQueryOperationImpl::~NdbQueryOperationImpl(){
+ Ndb* const ndb = m_queryImpl.getNdbTransaction()->getNdb();
if (m_id != NdbObjectIdMap::InvalidId) {
- m_queryImpl.getNdbTransaction()->getNdb()->theImpl
- ->theNdbObjectIdMap.unmap(m_id, this);
- }
- if(m_batchBuffer){
-#ifndef NDEBUG // Buffer overrun check activated.
- // Check against buffer overun.
- assert(m_batchBuffer[m_batchByteSize*getQuery().getParallelism()] == 'a' &&
- m_batchBuffer[m_batchByteSize*getQuery().getParallelism()+1]
- == 'b' &&
- m_batchBuffer[m_batchByteSize*getQuery().getParallelism()+2]
- == 'c' &&
- m_batchBuffer[m_batchByteSize*getQuery().getParallelism()+3]
- == 'd');
-#endif
- delete[] m_batchBuffer;
+ ndb->theImpl->theNdbObjectIdMap.unmap(m_id, this);
}
+ // Check against buffer overun.
+ assert(m_batchBuffer==NULL ||
+ (m_batchBuffer[m_batchByteSize*getQuery().getParallelism()]
+ == 'a' &&
+ m_batchBuffer[m_batchByteSize*getQuery().getParallelism()+1]
+ == 'b' &&
+ m_batchBuffer[m_batchByteSize*getQuery().getParallelism()+2]
+ == 'c' &&
+ m_batchBuffer[m_batchByteSize*getQuery().getParallelism()+3]
+ == 'd'));
+ delete[] m_batchBuffer;
if (m_resultStreams) {
for(Uint32 i = 0; i<getQuery().getParallelism(); i ++){
delete m_resultStreams[i];
}
delete[] m_resultStreams;
}
+
+ NdbRecAttr* recAttr;
+ while(recAttr!=NULL){
+ ndb->releaseRecAttr(recAttr);
+ recAttr = recAttr->next();
+ }
}
@@ -557,7 +576,8 @@ NdbQueryOperationImpl::getValue(
const NdbDictionary::Column* const column
= m_operationDef.getTable().getColumn(anAttrName);
if(unlikely(column==NULL)){
- return NULL; // FIXME: Don't return NULL wo/ setting errorcode
+ getQuery().setErrorCodeAbort(Err_UnknownColumn);
+ return NULL;
} else {
return getValue(column, resultBuffer);
}
@@ -571,6 +591,7 @@ NdbQueryOperationImpl::getValue(
const NdbDictionary::Column* const column
= m_operationDef.getTable().getColumn(anAttrId);
if(unlikely(column==NULL)){
+ getQuery().setErrorCodeAbort(Err_UnknownColumn);
return NULL;
} else {
return getValue(column, resultBuffer);
@@ -582,17 +603,40 @@ NdbQueryOperationImpl::getValue(
const NdbDictionary::Column* column,
char* resultBuffer)
{
- /* This code will only work for the lookup example in test_spj.cpp.
- */
if(unlikely(m_resultStyle == Style_NdbRecord)){
+ getQuery().setErrorCode(Err_MixRecAttrAndRecord);
return NULL;
}
m_resultStyle = Style_NdbRecAttr;
- if(unlikely(m_userProjection.addColumn(*column) !=0)){
+ const int addResult = m_userProjection.addColumn(*column);
+ if(unlikely(addResult !=0)){
+ getQuery().setErrorCode(addResult);
return NULL;
}
- return NULL; // FIXME
- //return m_receiver.getValue(&NdbColumnImpl::getImpl(*column), resultBuffer);
+ Ndb* const ndb = getQuery().getNdbTransaction()->getNdb();
+ NdbRecAttr* const recAttr = ndb->getRecAttr();
+ if(unlikely(recAttr == NULL)){
+ getQuery().setErrorCodeAbort(Err_MemoryAlloc);
+ }
+ if(unlikely(recAttr->setup(column, resultBuffer))){
+ ndb->releaseRecAttr(recAttr);
+ getQuery().setErrorCodeAbort(Err_MemoryAlloc);
+ return NULL;
+ }
+ // Append to tail of list.
+ if(m_firstRecAttr == NULL){
+ m_firstRecAttr = recAttr;
+ }else{
+ m_lastRecAttr->next(recAttr);
+ }
+ m_lastRecAttr = recAttr;
+ assert(recAttr->next()==NULL);
+ /* For all operations, results are handled as scan results. And for scan
+ * results, an NdbRecord is always needed.*/
+ if(m_ndbRecord==NULL){
+ m_ndbRecord = m_operationDef.getTable().getDefaultRecord();
+ }
+ return recAttr;
}
static bool isSetInMask(const unsigned char* mask, int bitNo){
@@ -610,18 +654,21 @@ void
NdbQueryOperationImpl::findMaxRows(){
assert(m_operationDef.getQueryOperationIx()==0);
if(m_operationDef.isScanOperation()){
- const NdbReceiver& receiver = m_resultStreams[0]->m_receiver;
- // Root operation is a scan.
- Uint32 firstBatchRows = 0;
- Uint32 batchByteSize = 0;
- receiver.calculate_batch_size(0, // Key size.
- getQuery().getParallelism(),
- m_maxBatchRows,
- batchByteSize,
- firstBatchRows,
- m_ndbRecord);
- assert(m_maxBatchRows!=0);
- assert(firstBatchRows==m_maxBatchRows);
+ if(false){
+ const NdbReceiver& receiver = m_resultStreams[0]->m_receiver;
+ // Root operation is a scan.
+ Uint32 firstBatchRows = 0;
+ Uint32 batchByteSize = 0;
+ receiver.calculate_batch_size(0, // Key size.
+ getQuery().getParallelism(),
+ m_maxBatchRows,
+ batchByteSize,
+ firstBatchRows,
+ m_ndbRecord);
+ assert(m_maxBatchRows!=0);
+ assert(firstBatchRows==m_maxBatchRows);
+ }
+ m_maxBatchRows = 64;
}else{
// Lookup.
m_maxBatchRows = 1;
@@ -635,18 +682,20 @@ NdbQueryOperationImpl::setResultRowBuf (
char* resBuffer,
const unsigned char* result_mask)
{
- // FIXME: Errors must be set in the NdbError object owned by this operation.
if (rec->tableId !=
static_cast<Uint32>(m_operationDef.getTable().getTableId())){
/* The key_record and attribute_record in primary key operation do not
belong to the same table.*/
- return 4287;
+ getQuery().setErrorCode(Err_DifferentTabForKeyRecAndAttrRec);
+ return -1;
}
if(unlikely(m_resultStyle==Style_NdbRecAttr)){
/* Cannot mix NdbRecAttr and NdbRecord methods in one operation. */
- return 4284;
+ getQuery().setErrorCode(Err_MixRecAttrAndRecord);
+ return -1;
}else if(unlikely(m_resultStyle==Style_NdbRecord)){
- return QRY_RESULT_ROW_ALREADY_DEFINED;
+ getQuery().setErrorCode(QRY_RESULT_ROW_ALREADY_DEFINED);
+ return -1;
}
m_ndbRecord = rec;
m_resultStyle = Style_NdbRecord;
@@ -671,6 +720,25 @@ NdbQueryOperationImpl::setResultRowRef (
return setResultRowBuf(rec, NULL, result_mask);
}
+void
+NdbQueryOperationImpl::fetchRecAttrResults(Uint32 streamNo){
+ NdbRecAttr* recAttr = m_firstRecAttr;
+ Uint32 posInRow = 0;
+ while(recAttr != NULL){
+ const char *attrData = NULL;
+ Uint32 attrSize = 0;
+ const int retVal1 = m_resultStreams[streamNo]->m_receiver
+ .getScanAttrData(attrData, attrSize, posInRow);
+ assert(retVal1==0);
+ assert(attrSize!=0);
+ assert(attrData!=NULL);
+ const bool retVal2 = recAttr
+ ->receive_data(reinterpret_cast<const Uint32*>(attrData), attrSize);
+ assert(retVal2);
+ recAttr = recAttr->next();
+ }
+}
+
NdbQuery::NextResultOutcome
NdbQueryOperationImpl::nextResult(NdbQueryImpl& queryImpl,
bool fetchAllowed,
@@ -688,17 +756,17 @@ NdbQueryOperationImpl::nextResult(NdbQue
switch(fetchResult){
case FetchResult_otherError:
// FIXME: copy semantics from NdbScanOperation.
- queryImpl.setErrorCode(4028); // Node fail
+ queryImpl.setErrorCode(Err_NodeFailCausedAbort); // Node fail
return NdbQuery::NextResult_error;
case FetchResult_sendFail:
// FIXME: copy semantics from NdbScanOperation.
- queryImpl.setErrorCode(4028); // Node fail
+ queryImpl.setErrorCode(Err_NodeFailCausedAbort); // Node fail
return NdbQuery::NextResult_error;
case FetchResult_nodeFail:
- queryImpl.setErrorCode(4028); // Node fail
+ queryImpl.setErrorCode(Err_NodeFailCausedAbort); // Node fail
return NdbQuery::NextResult_error;
case FetchResult_timeOut:
- queryImpl.setErrorCode(4008); // Timeout
+ queryImpl.setErrorCode(Err_ReceiveFromNdbFailed); // Timeout
return NdbQuery::NextResult_error;
case FetchResult_ok:
break;
@@ -712,16 +780,21 @@ NdbQueryOperationImpl::nextResult(NdbQue
return NdbQuery::NextResult_bufferEmpty;
}
}
- const char* rootBuff = root.m_applStreams.top()->m_receiver.get_row();
+ const char* const rootBuff = root.m_applStreams.top()->m_receiver.get_row();
assert(rootBuff!=NULL);
- if(root.m_resultRef!=NULL){
- // Set application pointer to point into internal buffer.
- *root.m_resultRef = rootBuff;
- }else if(root.m_resultBuffer!=NULL){
- // Copy result to buffer supplied by application.
- memcpy(root.m_resultBuffer, rootBuff,
- root.m_applStreams.top()->m_receiver.m_record.m_ndb_record
+ if(root.m_resultStyle==Style_NdbRecAttr){
+ root.fetchRecAttrResults(root.m_applStreams.top()->m_streamNo);
+ }else if(root.m_resultStyle==Style_NdbRecord){
+ if(root.m_resultRef!=NULL){
+ // Set application pointer to point into internal buffer.
+ *root.m_resultRef = rootBuff;
+ }else{
+ assert(root.m_resultBuffer!=NULL);
+ // Copy result to buffer supplied by application.
+ memcpy(root.m_resultBuffer, rootBuff,
+ root.m_applStreams.top()->m_receiver.m_record.m_ndb_record
->m_row_size);
+ }
}
if(queryImpl.getQueryDef().isScanQuery()){
const Uint32 rowNo
@@ -741,14 +814,20 @@ NdbQueryOperationImpl::nextResult(NdbQue
if(operation.m_resultStreams[0]->m_transidAICount==1){
operation.m_isRowNull = false;
const char* buff = operation.m_resultStreams[0]->m_receiver.get_row();
- if(operation.m_resultRef!=NULL){
- // Set application pointer to point into internal buffer.
- *operation.m_resultRef = buff;
- }else if(operation.m_resultBuffer!=NULL){
- // Copy result to buffer supplied by application.
- memcpy(operation.m_resultBuffer, buff,
- operation.m_resultStreams[0]->m_receiver.m_record.m_ndb_record
+ if(operation.m_resultStyle==Style_NdbRecAttr){
+ operation.fetchRecAttrResults(0);
+ }else if(operation.m_resultStyle==Style_NdbRecord){
+ if(operation.m_resultRef!=NULL){
+ // Set application pointer to point into internal buffer.
+ *operation.m_resultRef = buff;
+ }else{
+ assert(operation.m_resultBuffer!=NULL);
+ // Copy result to buffer supplied by application.
+ memcpy(operation.m_resultBuffer, buff,
+ operation.m_resultStreams[0]
+ ->m_receiver.m_record.m_ndb_record
->m_row_size);
+ }
}
}else{
if(operation.m_resultRef!=NULL){
@@ -780,13 +859,18 @@ NdbQueryOperationImpl::updateChildResult
resultStream.m_receiver.setCurrentRow(rowNo);
const char* buff = resultStream.m_receiver.get_row();
assert(buff!=NULL);
- if(m_resultRef!=NULL){
- // Set application pointer to point into internal buffer.
- *m_resultRef = buff;
- }else if(m_resultBuffer!=NULL){
- // Copy result to buffer supplied by application.
- memcpy(m_resultBuffer, buff,
- resultStream.m_receiver.m_record.m_ndb_record->m_row_size);
+ if(m_resultStyle==Style_NdbRecAttr){
+ fetchRecAttrResults(streamNo);
+ }else if(m_resultStyle==Style_NdbRecord){
+ if(m_resultRef!=NULL){
+ // Set application pointer to point into internal buffer.
+ *m_resultRef = buff;
+ }else{
+ assert(m_resultBuffer!=NULL);
+ // Copy result to buffer supplied by application.
+ memcpy(m_resultBuffer, buff,
+ resultStream.m_receiver.m_record.m_ndb_record->m_row_size);
+ }
}
for(Uint32 i = 0; i<getNoOfChildOperations(); i++){
getChildOperation(i).updateChildResult(streamNo,
@@ -921,46 +1005,46 @@ NdbQueryOperationImpl::UserProjection
int
NdbQueryOperationImpl::UserProjection::serialize(Uint32Slice dst,
+ ResultStyle resultStyle,
bool withCorrelation) const{
/* If the columns in the projections are ordered according to ascending
* column number, we can pack the projection more compactly.*/
- if(m_isOrdered){
+ switch(resultStyle){
+ case Style_NdbRecord:
+ assert(m_isOrdered);
// Special case: get all columns.
if(m_columnCount==m_noOfColsInTable){
- dst.get(0) = withCorrelation ? 2 : 1; // Size of projection in words.
AttributeHeader::init(&dst.get(1),
AttributeHeader::READ_ALL,
m_columnCount);
- if(withCorrelation){
- AttributeHeader::init(&dst.get(2), AttributeHeader::READ_ANY_VALUE, 0);
- }
}else{
/* Serialize projection as a bitmap.*/
const Uint32 wordCount = 1+m_maxColNo/32; // Size of mask.
- // Size of projection in words.
- dst.get(0) = wordCount+ withCorrelation ? 2 : 1;
AttributeHeader::init(&dst.get(1),
AttributeHeader::READ_PACKED, 4*wordCount);
memcpy(&dst.get(2, wordCount), &m_mask, 4*wordCount);
- if(withCorrelation){
- AttributeHeader::init(&dst.get(wordCount+1),
- AttributeHeader::READ_ANY_VALUE, 0);
- }
}
- }else{
- /* General case: serialize projection as a list of column numbers.*/
- // Size of projection in words.
- dst.get(0) = m_columnCount+ withCorrelation ? 1 : 0 ;
+ break;
+ case Style_NdbRecAttr:
+ /* Serialize projection as a list of column numbers.*/
for(int i = 0; i<m_columnCount; i++){
AttributeHeader::init(&dst.get(i+1),
m_columns[i]->getColumnNo(),
0);
}
- if(withCorrelation){
- AttributeHeader::init(&dst.get(m_columnCount),
- AttributeHeader::READ_ANY_VALUE, 0);
- }
+ break;
+ case Style_None:
+ assert(m_columnCount==0);
+ break;
+ default:
+ assert(false);
+ }
+ if(withCorrelation){
+ AttributeHeader::init(&dst.get(dst.getSize()),
+ AttributeHeader::READ_ANY_VALUE, 0);
}
+ // Size of projection in words.
+ dst.get(0) = dst.getSize()-1;
if(unlikely(dst.isMaxSizeExceeded())){
return QRY_DEFINITION_TOO_LARGE; // Query definition too large.
}
@@ -1068,25 +1152,28 @@ NdbQueryOperationImpl::prepareSend(Uint3
{
const NdbQueryOperationDefImpl& def = getQueryOperationDef();
- Uint32 rowSize = 0;
- if(m_ndbRecord==NULL){
- assert(false); // FIXME.
- Uint32 firstBatchRows = 0;
- Uint32 batchRows = 1;
- // Find size of single row.
- m_resultStreams[0]->m_receiver
- .calculate_batch_size(0,
- 1,
- batchRows,
- rowSize,
- firstBatchRows,
- m_ndbRecord);
- }else{
+ Uint32 rowSize;
+ switch(m_resultStyle){
+ case Style_NdbRecAttr:
+ rowSize = m_ndbRecord->m_row_size +
+ sizeof(AttributeHeader)* m_userProjection.getColumnCount();
+ break;
+ case Style_NdbRecord:
rowSize = m_ndbRecord->m_row_size;
+ break;
+ case Style_None:
+ /* The user projection is empty, but we still need the TRANSID_AI
+ * signals to extract the correlation data and to keep count of pending
+ * operations.*/
+ assert(m_ndbRecord == NULL);
+ m_ndbRecord = m_operationDef.getTable().getDefaultRecord();
+ rowSize = 0;
+ break;
+ default:
+ assert(false);
}
m_batchByteSize = rowSize * getRoot().m_maxBatchRows;
ndbout << "m_batchByteSize=" << m_batchByteSize << endl;
- assert(m_batchByteSize>0);
#ifdef NDEBUG
m_batchBuffer = new char[m_batchByteSize*getQuery().getParallelism()];
#else
@@ -1168,15 +1255,13 @@ NdbQueryOperationImpl::prepareSend(Uint3
serializedParams.append(m_params);
}
- if (true)
- {
- param.requestInfo |= DABits::PI_ATTR_LIST;
- const int error =
- m_userProjection.serialize(Uint32Slice(serializedParams),
- getQueryDef().isScanQuery());
- if (unlikely(error!=0)) {
- return error;
- }
+ param.requestInfo |= DABits::PI_ATTR_LIST;
+ const int error =
+ m_userProjection.serialize(Uint32Slice(serializedParams),
+ m_resultStyle,
+ getRoot().getQueryDef().isScanQuery());
+ if (unlikely(error!=0)) {
+ return error;
}
QueryNodeParameters::setOpLen(param.len,
@@ -1218,30 +1303,17 @@ NdbQueryOperationImpl::prepareSend(Uint3
}
-void NdbQueryOperationImpl::release(){
- for(Uint32 i = 0; i<getQuery().getParallelism(); i++)
- {
- m_resultStreams[i]->m_receiver.release();
- }
-}
-
-static bool getCorrelationData(const Uint32* ptr,
+static void getCorrelationData(const Uint32* ptr,
Uint32 len,
Uint32& receverId,
Uint32& correlationNum){
- if(len>=correlationWordCount){
- const Uint32* corrTail = ptr + len - correlationWordCount;
- const AttributeHeader attHead(corrTail[0]);
- if(attHead.getAttributeId() == AttributeHeader::READ_ANY_VALUE &&
- attHead.getByteSize()==8){
- receverId = corrTail[1];
- correlationNum = corrTail[2];
- return true;
- }
- }
- // FIXME: handle errors.
- assert(false);
- return false;
+ assert(len>=correlationWordCount);
+ const Uint32* corrTail = ptr + len - correlationWordCount;
+ const AttributeHeader attHead(corrTail[0]);
+ assert(attHead.getAttributeId() == AttributeHeader::READ_ANY_VALUE);
+ assert(attHead.getByteSize()==8);
+ receverId = corrTail[1];
+ correlationNum = corrTail[2];
}
bool
@@ -1263,9 +1335,8 @@ NdbQueryOperationImpl::execTRANSID_AI(co
streamNo++);
assert(streamNo<getQuery().getParallelism());
// Process result values.
- const int retVal = m_resultStreams[streamNo]->m_receiver
+ m_resultStreams[streamNo]->m_receiver
.execTRANSID_AI(ptr, len - correlationWordCount);
- assert(retVal==0); // FIXME.
m_resultStreams[streamNo]->m_transidAICount++;
/* Put into the map such that parent and child can be macthed.
@@ -1287,9 +1358,7 @@ NdbQueryOperationImpl::execTRANSID_AI(co
return false;
}else{
// The root operation is a lookup.
- const int retVal = m_resultStreams[0]->m_receiver
- .execTRANSID_AI(ptr, len);
- assert(retVal==0); // FIXME.
+ m_resultStreams[0]->m_receiver.execTRANSID_AI(ptr, len);
m_resultStreams[0]->m_transidAICount++;
m_resultStreams[0]->m_pendingResults--;
=== modified file 'storage/ndb/src/ndbapi/NdbQueryOperationImpl.hpp'
--- a/storage/ndb/src/ndbapi/NdbQueryOperationImpl.hpp 2009-09-01 13:36:16 +0000
+++ b/storage/ndb/src/ndbapi/NdbQueryOperationImpl.hpp 2009-09-02 07:15:55 +0000
@@ -83,9 +83,6 @@ public:
* @return possible error code.*/
int prepareSend();
- /** Release all NdbReceiver instances.*/
- void release();
-
bool checkMagicNumber() const
{ return m_magic == MAGIC; }
@@ -235,9 +232,6 @@ public:
* @return possible error code.*/
int prepareSend(Uint32Buffer& serializedParams);
- /** Release NdbReceiver objects.*/
- void release();
-
/* TODO: Remove this method. Only needed in spj_test.cpp.*/
/** Return I-value for putting in object map.*/
Uint32 ptr2int() const {
@@ -266,6 +260,14 @@ public:
{ return m_maxBatchRows; }
private:
+ /** NdbRecord and NdbRecAttr may not be combined. Also, results may not
+ * be requested for all operations.*/
+ enum ResultStyle{
+ Style_None, // Not set yet.
+ Style_NdbRecord, // Use old style result retrieval.
+ Style_NdbRecAttr, // Use NdbRecord.
+ };
+
/** This class represents a projection that shall be sent to the
* application.*/
class UserProjection{
@@ -279,9 +281,12 @@ private:
/** Make a serialize representation of this object, to be sent to the
* SPJ block.
* @param dst Buffer for storing serialized projection.
+ * @param style NdbRecord, NdbRecattr or empty projection.
* @param withCorrelation Include correlation data in projection.
* @return Possible error code.*/
- int serialize(Uint32Slice dst, bool withCorrelation) const;
+ int serialize(Uint32Slice dst,
+ ResultStyle resultStyle,
+ bool withCorrelation) const;
/** Get number of columns.*/
int getColumnCount() const {return m_columnCount;}
@@ -464,12 +469,8 @@ private:
Uint32Buffer m_params;
/** Projection to be sent to the application.*/
UserProjection m_userProjection;
- /** NdbRecord and old style result retrieval may not be combined.*/
- enum {
- Style_None, // Not set yet.
- Style_NdbRecord, // Use old style result retrieval.
- Style_NdbRecAttr, // Use NdbRecord.
- } m_resultStyle;
+ /** NdbRecord, NdbRecAttr or none.*/
+ ResultStyle m_resultStyle;
/** For temporary storing one result batch.*/
char* m_batchBuffer;
/** Buffer for final storage of result.*/
@@ -493,10 +494,18 @@ private:
* application thread and receiving thread. Access should be mutex protected.
*/
StreamStack m_fullStreams;
+ /** Points to head of list. Used for old-style result retrieval
+ * (using getValue()).*/
+ NdbRecAttr* m_firstRecAttr;
+ /** Points to tail of list. Used for old-style result retrieval
+ * (using getValue()).*/
+ NdbRecAttr* m_lastRecAttr;
/** Fetch result for non-root operation.*/
void updateChildResult(Uint32 resultStreamNo, Uint32 rowNo);
/** Get more scan results, ask for the next batch if necessary.*/
FetchResult fetchMoreResults(bool forceSend);
+ /** Copy any NdbRecAttr results into application buffers.*/
+ void fetchRecAttrResults(Uint32 streamNo);
}; // class NdbQueryOperationImpl
=== modified file 'storage/ndb/test/tools/test_spj.cpp'
--- a/storage/ndb/test/tools/test_spj.cpp 2009-08-27 16:30:26 +0000
+++ b/storage/ndb/test/tools/test_spj.cpp 2009-09-02 07:15:55 +0000
@@ -517,6 +517,74 @@ int spjTest(int argc, char** argv){
exit(-1); }
+int plainScan(int argc, char** argv){
+ NDB_INIT(argv[0]);
+
+ const char *load_default_groups[]= { "mysql_cluster",0 };
+ load_defaults("my",load_default_groups,&argc,&argv);
+ int ho_error;
+#ifndef DBUG_OFF
+ opt_debug= "d:t:O,/tmp/ndb_desc.trace";
+#endif
+ if ((ho_error=handle_options(&argc, &argv, my_long_options,
+ ndb_std_get_one_option)))
+ return NDBT_ProgramExit(NDBT_WRONGARGS);
+
+ Ndb_cluster_connection con(opt_connect_str);
+ if(con.connect(12, 5, 1) != 0)
+ {
+ ndbout << "Unable to connect to management server." << endl;
+ return NDBT_ProgramExit(NDBT_FAILED);
+ }
+
+ int res = con.wait_until_ready(30,30);
+ if (res != 0)
+ {
+ ndbout << "Cluster nodes not ready in 30 seconds." << endl;
+ return NDBT_ProgramExit(NDBT_FAILED);
+ }
+
+ Ndb myNdb(&con, _dbname);
+ if(myNdb.init() != 0){
+ ERR(myNdb.getNdbError());
+ return NDBT_ProgramExit(NDBT_FAILED);
+ }
+
+ NdbDictionary::Dictionary* const myDict = myNdb.getDictionary();
+ const NdbDictionary::Table* const tab = myDict->getTable("T");
+
+ assert(tab!=NULL);
+
+ //APIERROR(myDict->getNdbError());
+
+ NdbTransaction* myTransaction= myNdb.startTransaction();
+
+ NdbScanOperation* const scanOp = myTransaction->getNdbScanOperation(tab);
+ Uint32 results[6] = {0};
+ for(Uint32 i = 0; i<6; i++){
+ scanOp->getValue(i, reinterpret_cast<char*>(&results[i]));
+ }
+
+ assert(myTransaction->execute(NoCommit)==0);
+ bool done = false;
+ while(!done){
+ switch(scanOp->nextResult(true, false)){
+ case 0:
+ for(Uint32 i = 0; i<6; i++){
+ ndbout << results[i] << " ";
+ }
+ ndbout << endl;
+ break;
+ case 1:
+ done = true;
+ break;
+ default:
+ assert(0);
+ }
+ }
+ return 0;
+}; // plainScan()
+
int testSerialize(bool scan, int argc, char** argv){
@@ -578,14 +646,17 @@ int testSerialize(bool scan, int argc, c
const void* params[] = {NULL};
// Instantiate NdbQuery for this transaction.
NdbQuery* query = myTransaction->createQuery(scanDef, params);
- //Uint32 results[10][6];
- //char* resultPtr = NULL;
- const Uint32* scanResultPtr;
+ //const Uint32* scanResultPtr;
//const unsigned char* mask = NULL;
NdbQueryOperation* const scanOp = query->getQueryOperation(0U);
- scanOp->setResultRowRef(resultRec,
+ /*scanOp->setResultRowRef(resultRec,
reinterpret_cast<const char*&>(scanResultPtr),
- NULL);
+ NULL);*/
+ Uint32 scanResultPtr[6] = {0};
+ for(Uint32 i = 0; i<6; i++){
+ assert(scanOp->getValue(i, reinterpret_cast<char*>(scanResultPtr+i))
+ !=NULL);
+ }
const Uint32* lookupResultPtr;
NdbQueryOperation* const lookupOp = query->getQueryOperation(1);
lookupOp->setResultRowRef(resultRec,
@@ -689,7 +760,7 @@ int testSerialize(bool scan, int argc, c
/* Read all attributes from result tuples.*/
const Uint32 opCount = query->getNoOfOperations();
- const Uint32 recordOpCount = 2;
+ const Uint32 recordOpCount = 0;
const ResultSet** resultSet = new const ResultSet*[opCount-recordOpCount];
for(Uint32 i = 0; i < opCount-recordOpCount; i++){
resultSet[i]
@@ -697,8 +768,13 @@ int testSerialize(bool scan, int argc, c
}
const NdbRecord* resultRec = tab->getDefaultRecord();
assert(resultRec!=NULL);
- Uint32 results[recordOpCount][6] = {0};
- const unsigned char mask[] = {0x0e};
+ Uint32 results[2][6] = {0U};
+ const unsigned char mask[] = {0xe};
+ for(Uint32 i = 0; i<recordOpCount; i++){
+ for(Uint32 j = 0; j<6; j++){
+ results[i][j]= 0x0;
+ }
+ }
for(Uint32 i = 0; i<recordOpCount; i++){
const int error = query->getQueryOperation(i)
->setResultRowBuf(resultRec,
@@ -725,9 +801,10 @@ int testSerialize(bool scan, int argc, c
int main(int argc, char** argv){
- //return spjTest(argc, argv);
+ //plainScan(argc, argv)
testSerialize(false, argc, argv);
- return testSerialize(true, argc, argv);
+ testSerialize(true, argc, argv);
+ return 0;
}
/**
* Store list of 16-bit integers put into 32-bit integers
Attachment: [text/bzr-bundle] bzr/jan.wedvik@sun.com-20090902071555-g7x6d9dsvtfb8uq1.bundle
| Thread |
|---|
| • bzr commit into mysql-5.1-telco-7.0-spj branch (jan.wedvik:2933) | Jan Wedvik | 2 Sep |