#At file:///export/home2/tmp/jw1159207/mysql/mysql-5.1-telco-7.0-spj/ based on revid:jan.wedvik@stripped
2958 Jan Wedvik 2009-08-31 [merge]
Before rewriting NdbQueryOperationImpl::fetchRecAttrResults()
modified:
storage/ndb/include/ndbapi/NdbRecAttr.hpp
storage/ndb/include/ndbapi/NdbTransaction.hpp
storage/ndb/src/ndbapi/NdbQueryOperation.cpp
storage/ndb/src/ndbapi/NdbQueryOperationImpl.hpp
storage/ndb/src/ndbapi/TransporterFacade.cpp
storage/ndb/test/tools/test_spj.cpp
=== modified file 'storage/ndb/include/ndbapi/NdbRecAttr.hpp'
--- a/storage/ndb/include/ndbapi/NdbRecAttr.hpp 2009-06-13 18:56:08 +0000
+++ b/storage/ndb/include/ndbapi/NdbRecAttr.hpp 2009-08-31 11:17:11 +0000
@@ -81,6 +81,7 @@ class NdbRecAttr
friend class NdbEventOperationImpl;
friend class NdbReceiver;
friend class Ndb;
+ friend class NdbQueryOperationImpl;
friend class NdbOut& operator<<(class NdbOut&, const class AttributeS&);
#endif
=== modified file 'storage/ndb/include/ndbapi/NdbTransaction.hpp'
--- a/storage/ndb/include/ndbapi/NdbTransaction.hpp 2009-08-27 14:09:21 +0000
+++ b/storage/ndb/include/ndbapi/NdbTransaction.hpp 2009-08-31 11:17:11 +0000
@@ -150,6 +150,7 @@ class NdbTransaction
friend class NdbIndexScanOperation;
friend class NdbBlob;
friend class ha_ndbcluster;
+ friend class NdbQueryImpl;
friend class NdbQueryOperationImpl;
#endif
=== modified file 'storage/ndb/src/ndbapi/NdbQueryOperation.cpp'
--- a/storage/ndb/src/ndbapi/NdbQueryOperation.cpp 2009-08-27 12:10:05 +0000
+++ b/storage/ndb/src/ndbapi/NdbQueryOperation.cpp 2009-08-31 11:17:11 +0000
@@ -27,6 +27,15 @@
#include "AttributeHeader.hpp"
#include "NdbRecord.hpp"
#include "TransporterFacade.hpp"
+#include "NdbRecAttr.hpp"
+
+/* Various error codes that are not specific to NdbQuery. */
+STATIC_CONST(Err_MemoryAlloc = 4000);
+STATIC_CONST(Err_UnknownColumn = 4004);
+STATIC_CONST(Err_ReceiveFromNdbFailed = 4008);
+STATIC_CONST(Err_NodeFailCausedAbort = 4028);
+STATIC_CONST(Err_MixRecAttrAndRecord = 4284);
+STATIC_CONST(Err_DifferentTabForKeyRecAndAttrRec = 4287);
NdbQuery::NdbQuery(NdbQueryImpl& impl):
m_impl(impl)
@@ -354,6 +363,14 @@ NdbQueryImpl::getNdbTransaction() const
return &m_transaction;
}
+void
+NdbQueryImpl::setErrorCodeAbort(int aErrorCode){
+ m_error.code = aErrorCode;
+ getNdbTransaction()->theErrorLine = 0;
+ getNdbTransaction()->theErrorOperation = NULL;
+ getNdbTransaction()->setOperationErrorCodeAbort(aErrorCode);
+}
+
bool
NdbQueryImpl::execTCKEYCONF(){
ndbout << "NdbQueryImpl::execTCKEYCONF() m_pendingStreams="
@@ -474,7 +491,9 @@ NdbQueryOperationImpl::NdbQueryOperation
m_applStreams(def.getQueryOperationIx() == 0 ? // Needed for root only.
queryImpl.getParallelism() : 0),
m_fullStreams(def.getQueryOperationIx() == 0 ? // Needed for root only.
- queryImpl.getParallelism() : 0)
+ queryImpl.getParallelism() : 0),
+ m_firstRecAttr(NULL),
+ m_lastRecAttr(NULL)
{
assert(m_id != NdbObjectIdMap::InvalidId);
@@ -498,9 +517,9 @@ NdbQueryOperationImpl::NdbQueryOperation
}
NdbQueryOperationImpl::~NdbQueryOperationImpl(){
+ Ndb* const ndb = m_queryImpl.getNdbTransaction()->getNdb();
if (m_id != NdbObjectIdMap::InvalidId) {
- m_queryImpl.getNdbTransaction()->getNdb()->theImpl
- ->theNdbObjectIdMap.unmap(m_id, this);
+ ndb->theImpl->theNdbObjectIdMap.unmap(m_id, this);
}
#ifndef NDEBUG // Buffer overrun check activated.
if(m_batchBuffer){
@@ -519,6 +538,11 @@ NdbQueryOperationImpl::~NdbQueryOperatio
delete m_resultStreams[i];
}
delete[] m_resultStreams;
+ NdbRecAttr* recAttr;
+ while(recAttr!=NULL){
+ ndb->releaseRecAttr(recAttr);
+ recAttr = recAttr->next();
+ }
}
@@ -566,6 +590,7 @@ NdbQueryOperationImpl::getValue(
const NdbDictionary::Column* const column
= m_operationDef.getTable().getColumn(anAttrName);
if(unlikely(column==NULL)){
+ getQuery().setErrorCodeAbort(Err_UnknownColumn);
return NULL; // FIXME: Don't return NULL wo/ setting errorcode
} else {
return getValue(column, resultBuffer);
@@ -580,6 +605,7 @@ NdbQueryOperationImpl::getValue(
const NdbDictionary::Column* const column
= m_operationDef.getTable().getColumn(anAttrId);
if(unlikely(column==NULL)){
+ getQuery().setErrorCodeAbort(Err_UnknownColumn);
return NULL;
} else {
return getValue(column, resultBuffer);
@@ -591,17 +617,40 @@ NdbQueryOperationImpl::getValue(
const NdbDictionary::Column* column,
char* resultBuffer)
{
- /* This code will only work for the lookup example in test_spj.cpp.
- */
if(unlikely(m_resultStyle == Style_NdbRecord)){
+ getQuery().setErrorCode(Err_MixRecAttrAndRecord);
return NULL;
}
m_resultStyle = Style_NdbRecAttr;
- if(unlikely(m_userProjection.addColumn(*column) !=0)){
+ const int addResult = m_userProjection.addColumn(*column);
+ if(unlikely(addResult !=0)){
+ getQuery().setErrorCode(addResult);
return NULL;
}
- return NULL; // FIXME
- //return m_receiver.getValue(&NdbColumnImpl::getImpl(*column), resultBuffer);
+ Ndb* const ndb = getQuery().getNdbTransaction()->getNdb();
+ NdbRecAttr* const recAttr = ndb->getRecAttr();
+ if(unlikely(recAttr == NULL)){
+ getQuery().setErrorCodeAbort(Err_MemoryAlloc);
+ }
+ if(unlikely(recAttr->setup(column, resultBuffer))){
+ ndb->releaseRecAttr(recAttr);
+ getQuery().setErrorCodeAbort(Err_MemoryAlloc);
+ return NULL;
+ }
+ // Append to tail of list.
+ if(m_firstRecAttr == NULL){
+ m_firstRecAttr = recAttr;
+ }else{
+ m_lastRecAttr->next(recAttr);
+ }
+ m_lastRecAttr = recAttr;
+ assert(recAttr->next()==NULL);
+ /* For all operations, results are handled as scan results. And for scan
+ * results, an NdbRecord is always needed.*/
+ if(m_ndbRecord==NULL){
+ m_ndbRecord = m_operationDef.getTable().getDefaultRecord();
+ }
+ return recAttr;
}
static bool isSetInMask(const unsigned char* mask, int bitNo){
@@ -649,11 +698,11 @@ NdbQueryOperationImpl::setResultRowBuf (
static_cast<Uint32>(m_operationDef.getTable().getTableId())){
/* The key_record and attribute_record in primary key operation do not
belong to the same table.*/
- return 4287;
+ return Err_DifferentTabForKeyRecAndAttrRec;
}
if(unlikely(m_resultStyle==Style_NdbRecAttr)){
/* Cannot mix NdbRecAttr and NdbRecord methods in one operation. */
- return 4284;
+ return Err_MixRecAttrAndRecord;
}else if(unlikely(m_resultStyle==Style_NdbRecord)){
return QRY_RESULT_ROW_ALREADY_DEFINED;
}
@@ -680,6 +729,25 @@ NdbQueryOperationImpl::setResultRowRef (
return setResultRowBuf(rec, NULL, result_mask);
}
+void
+NdbQueryOperationImpl::fetchRecAttrResults(Uint32 streamNo){
+ NdbRecAttr* recAttr = m_firstRecAttr;
+ Uint32 posInRow = 0;
+ while(recAttr != NULL){
+ const char *attrData = NULL;
+ Uint32 attrSize = 0;
+ const int retVal1 = m_resultStreams[streamNo]->m_receiver
+ .getScanAttrData(attrData, attrSize, posInRow);
+ assert(retVal1==0);
+ assert(attrSize!=0);
+ assert(attrData!=NULL);
+ const bool retVal2 = recAttr
+ ->receive_data(reinterpret_cast<const Uint32*>(attrData), attrSize);
+ assert(retVal2);
+ recAttr = recAttr->next();
+ }
+}
+
NdbQuery::NextResultOutcome
NdbQueryOperationImpl::nextResult(NdbQueryImpl& queryImpl,
bool fetchAllowed,
@@ -697,17 +765,17 @@ NdbQueryOperationImpl::nextResult(NdbQue
switch(fetchResult){
case FetchResult_otherError:
// FIXME: copy semantics from NdbScanOperation.
- queryImpl.setErrorCode(4028); // Node fail
+ queryImpl.setErrorCode(Err_NodeFailCausedAbort); // Node fail
return NdbQuery::NextResult_error;
case FetchResult_sendFail:
// FIXME: copy semantics from NdbScanOperation.
- queryImpl.setErrorCode(4028); // Node fail
+ queryImpl.setErrorCode(Err_NodeFailCausedAbort); // Node fail
return NdbQuery::NextResult_error;
case FetchResult_nodeFail:
- queryImpl.setErrorCode(4028); // Node fail
+ queryImpl.setErrorCode(Err_NodeFailCausedAbort); // Node fail
return NdbQuery::NextResult_error;
case FetchResult_timeOut:
- queryImpl.setErrorCode(4008); // Timeout
+ queryImpl.setErrorCode(Err_ReceiveFromNdbFailed); // Timeout
return NdbQuery::NextResult_error;
case FetchResult_ok:
break;
@@ -721,8 +789,9 @@ NdbQueryOperationImpl::nextResult(NdbQue
return NdbQuery::NextResult_bufferEmpty;
}
}
- const char* rootBuff = root.m_applStreams.top()->m_receiver.get_row();
+ const char* const rootBuff = root.m_applStreams.top()->m_receiver.get_row();
assert(rootBuff!=NULL);
+ root.fetchRecAttrResults(root.m_applStreams.top()->m_streamNo);
if(root.m_resultRef!=NULL){
// Set application pointer to point into internal buffer.
*root.m_resultRef = rootBuff;
@@ -750,6 +819,7 @@ NdbQueryOperationImpl::nextResult(NdbQue
if(operation.m_resultStreams[0]->m_transidAICount==1){
operation.m_isRowNull = false;
const char* buff = operation.m_resultStreams[0]->m_receiver.get_row();
+ operation.fetchRecAttrResults(0);
if(operation.m_resultRef!=NULL){
// Set application pointer to point into internal buffer.
*operation.m_resultRef = buff;
@@ -789,6 +859,7 @@ NdbQueryOperationImpl::updateChildResult
resultStream.m_receiver.setCurrentRow(rowNo);
const char* buff = resultStream.m_receiver.get_row();
assert(buff!=NULL);
+ fetchRecAttrResults(streamNo);
if(m_resultRef!=NULL){
// Set application pointer to point into internal buffer.
*m_resultRef = buff;
@@ -933,7 +1004,7 @@ NdbQueryOperationImpl::UserProjection::s
bool withCorrelation) const{
/* If the columns in the projections are ordered according to ascending
* column number, we can pack the projection more compactly.*/
- if(m_isOrdered){
+ if(m_isOrdered && false){
// Special case: get all columns.
if(m_columnCount==m_noOfColsInTable){
dst.get(0) = withCorrelation ? 2 : 1; // Size of projection in words.
@@ -947,7 +1018,7 @@ NdbQueryOperationImpl::UserProjection::s
/* Serialize projection as a bitmap.*/
const Uint32 wordCount = 1+m_maxColNo/32; // Size of mask.
// Size of projection in words.
- dst.get(0) = wordCount+ withCorrelation ? 2 : 1;
+ dst.get(0) = wordCount+ (withCorrelation ? 2 : 1);
AttributeHeader::init(&dst.get(1),
AttributeHeader::READ_PACKED, 4*wordCount);
memcpy(&dst.get(2, wordCount), &m_mask, 4*wordCount);
@@ -959,14 +1030,14 @@ NdbQueryOperationImpl::UserProjection::s
}else{
/* General case: serialize projection as a list of column numbers.*/
// Size of projection in words.
- dst.get(0) = m_columnCount+ withCorrelation ? 1 : 0 ;
+ dst.get(0) = m_columnCount+ (withCorrelation ? 1 : 0);
for(int i = 0; i<m_columnCount; i++){
AttributeHeader::init(&dst.get(i+1),
m_columns[i]->getColumnNo(),
0);
}
if(withCorrelation){
- AttributeHeader::init(&dst.get(m_columnCount),
+ AttributeHeader::init(&dst.get(m_columnCount+1),
AttributeHeader::READ_ANY_VALUE, 0);
}
}
@@ -1077,21 +1148,12 @@ NdbQueryOperationImpl::prepareSend(Uint3
{
const NdbQueryOperationDefImpl& def = getQueryOperationDef();
- Uint32 rowSize = 0;
- if(m_ndbRecord==NULL){
- assert(false); // FIXME.
- Uint32 firstBatchRows = 0;
- Uint32 batchRows = 1;
- // Find size of single row.
- m_resultStreams[0]->m_receiver
- .calculate_batch_size(0,
- 1,
- batchRows,
- rowSize,
- firstBatchRows,
- m_ndbRecord);
- }else{
+ Uint32 rowSize;
+ if(m_ndbRecord!=NULL){
rowSize = m_ndbRecord->m_row_size;
+ }else{
+ // The application did not ask for any attributes
+ rowSize = correlationWordCount;
}
m_batchByteSize = rowSize * getRoot().m_maxBatchRows;
ndbout << "m_batchByteSize=" << m_batchByteSize << endl;
=== modified file 'storage/ndb/src/ndbapi/NdbQueryOperationImpl.hpp'
--- a/storage/ndb/src/ndbapi/NdbQueryOperationImpl.hpp 2009-08-27 12:10:05 +0000
+++ b/storage/ndb/src/ndbapi/NdbQueryOperationImpl.hpp 2009-08-31 11:17:11 +0000
@@ -504,10 +504,18 @@ private:
* application thread and receiving thread. Access should be mutex protected.
*/
StreamStack m_fullStreams;
+ /** Points to head of list. Used for old-style result retrieval
+ * (using getValue()).*/
+ NdbRecAttr* m_firstRecAttr;
+ /** Points to tail of list. Used for old-style result retrieval
+ * (using getValue()).*/
+ NdbRecAttr* m_lastRecAttr;
/** Fetch result for non-root operation.*/
void updateChildResult(Uint32 resultStreamNo, Uint32 rowNo);
/** Get more scan results, ask for the next batch if necessary.*/
FetchResult fetchMoreResults(bool forceSend);
+ /** Copy any NdbRecAttr results into application buffers.*/
+ void fetchRecAttrResults(Uint32 streamNo);
}; // class NdbQueryOperationImpl
=== modified file 'storage/ndb/src/ndbapi/TransporterFacade.cpp'
--- a/storage/ndb/src/ndbapi/TransporterFacade.cpp 2009-08-27 14:09:21 +0000
+++ b/storage/ndb/src/ndbapi/TransporterFacade.cpp 2009-08-31 11:17:11 +0000
@@ -40,7 +40,7 @@
#include <signaldata/SumaImpl.hpp>
//#define REPORT_TRANSPORTER
-//#define API_TRACE
+#define API_TRACE
static int numberToIndex(int number)
{
=== modified file 'storage/ndb/test/tools/test_spj.cpp'
--- a/storage/ndb/test/tools/test_spj.cpp 2009-08-27 16:17:48 +0000
+++ b/storage/ndb/test/tools/test_spj.cpp 2009-08-31 11:17:11 +0000
@@ -517,6 +517,74 @@ int spjTest(int argc, char** argv){
exit(-1); }
+int plainScan(int argc, char** argv){
+ NDB_INIT(argv[0]);
+
+ const char *load_default_groups[]= { "mysql_cluster",0 };
+ load_defaults("my",load_default_groups,&argc,&argv);
+ int ho_error;
+#ifndef DBUG_OFF
+ opt_debug= "d:t:O,/tmp/ndb_desc.trace";
+#endif
+ if ((ho_error=handle_options(&argc, &argv, my_long_options,
+ ndb_std_get_one_option)))
+ return NDBT_ProgramExit(NDBT_WRONGARGS);
+
+ Ndb_cluster_connection con(opt_connect_str);
+ if(con.connect(12, 5, 1) != 0)
+ {
+ ndbout << "Unable to connect to management server." << endl;
+ return NDBT_ProgramExit(NDBT_FAILED);
+ }
+
+ int res = con.wait_until_ready(30,30);
+ if (res != 0)
+ {
+ ndbout << "Cluster nodes not ready in 30 seconds." << endl;
+ return NDBT_ProgramExit(NDBT_FAILED);
+ }
+
+ Ndb myNdb(&con, _dbname);
+ if(myNdb.init() != 0){
+ ERR(myNdb.getNdbError());
+ return NDBT_ProgramExit(NDBT_FAILED);
+ }
+
+ NdbDictionary::Dictionary* const myDict = myNdb.getDictionary();
+ const NdbDictionary::Table* const tab = myDict->getTable("T");
+
+ assert(tab!=NULL);
+
+ //APIERROR(myDict->getNdbError());
+
+ NdbTransaction* myTransaction= myNdb.startTransaction();
+
+ NdbScanOperation* const scanOp = myTransaction->getNdbScanOperation(tab);
+ Uint32 results[6] = {0};
+ for(Uint32 i = 0; i<6; i++){
+ scanOp->getValue(i, reinterpret_cast<char*>(&results[i]));
+ }
+
+ assert(myTransaction->execute(NoCommit)==0);
+ bool done = false;
+ while(!done){
+ switch(scanOp->nextResult(true, false)){
+ case 0:
+ for(Uint32 i = 0; i<6; i++){
+ ndbout << results[i] << " ";
+ }
+ ndbout << endl;
+ break;
+ case 1:
+ done = true;
+ break;
+ default:
+ assert(0);
+ }
+ }
+ return 0;
+}; // plainScan()
+
int testSerialize(bool scan, int argc, char** argv){
@@ -697,8 +765,13 @@ int testSerialize(bool scan, int argc, c
}
const NdbRecord* resultRec = tab->getDefaultRecord();
assert(resultRec!=NULL);
- Uint32 results[recordOpCount][6] = {0};
- const unsigned char mask[] = {0x0e};
+ Uint32 results[recordOpCount][6] = {0U};
+ const unsigned char mask[] = {0xe};
+ /*for(Uint32 i = 0; i<recordOpCount; i++){
+ for(Uint32 j = 0; j<6; i++){
+ results[i][j]= 0x0;
+ }
+ }*/
for(Uint32 i = 0; i<recordOpCount; i++){
const int error = query->getQueryOperation(i)
->setResultRowBuf(resultRec,
@@ -726,8 +799,9 @@ int testSerialize(bool scan, int argc, c
int main(int argc, char** argv){
//return spjTest(argc, argv);
- testSerialize(false, argc, argv);
- return testSerialize(true, argc, argv);
+ //testSerialize(false, argc, argv);
+ //return 0;
+ return plainScan(argc, argv);//testSerialize(true, argc, argv);
}
/**
* Store list of 16-bit integers put into 32-bit integers
Attachment: [text/bzr-bundle] bzr/jan.wedvik@sun.com-20090831111711-ynlrujsp2fqq89x1.bundle
| Thread |
|---|
| • bzr commit into mysql-5.1-telco-7.0-spj branch (jan.wedvik:2958) | Jan Wedvik | 31 Aug |