List:Commits« Previous MessageNext Message »
From:Jan Wedvik Date:August 31 2009 11:17am
Subject:bzr commit into mysql-5.1-telco-7.0-spj branch (jan.wedvik:2958)
View as plain text  
#At file:///export/home2/tmp/jw1159207/mysql/mysql-5.1-telco-7.0-spj/ based on revid:jan.wedvik@stripped

 2958 Jan Wedvik	2009-08-31 [merge]
      Before rewriting NdbQueryOperationImpl::fetchRecAttrResults()

    modified:
      storage/ndb/include/ndbapi/NdbRecAttr.hpp
      storage/ndb/include/ndbapi/NdbTransaction.hpp
      storage/ndb/src/ndbapi/NdbQueryOperation.cpp
      storage/ndb/src/ndbapi/NdbQueryOperationImpl.hpp
      storage/ndb/src/ndbapi/TransporterFacade.cpp
      storage/ndb/test/tools/test_spj.cpp
=== modified file 'storage/ndb/include/ndbapi/NdbRecAttr.hpp'
--- a/storage/ndb/include/ndbapi/NdbRecAttr.hpp	2009-06-13 18:56:08 +0000
+++ b/storage/ndb/include/ndbapi/NdbRecAttr.hpp	2009-08-31 11:17:11 +0000
@@ -81,6 +81,7 @@ class NdbRecAttr
   friend class NdbEventOperationImpl;
   friend class NdbReceiver;
   friend class Ndb;
+  friend class NdbQueryOperationImpl;
   friend class NdbOut& operator<<(class NdbOut&, const class AttributeS&);
 #endif
 

=== modified file 'storage/ndb/include/ndbapi/NdbTransaction.hpp'
--- a/storage/ndb/include/ndbapi/NdbTransaction.hpp	2009-08-27 14:09:21 +0000
+++ b/storage/ndb/include/ndbapi/NdbTransaction.hpp	2009-08-31 11:17:11 +0000
@@ -150,6 +150,7 @@ class NdbTransaction
   friend class NdbIndexScanOperation;
   friend class NdbBlob;
   friend class ha_ndbcluster;
+  friend class NdbQueryImpl;
   friend class NdbQueryOperationImpl;
 #endif
 

=== modified file 'storage/ndb/src/ndbapi/NdbQueryOperation.cpp'
--- a/storage/ndb/src/ndbapi/NdbQueryOperation.cpp	2009-08-27 12:10:05 +0000
+++ b/storage/ndb/src/ndbapi/NdbQueryOperation.cpp	2009-08-31 11:17:11 +0000
@@ -27,6 +27,15 @@
 #include "AttributeHeader.hpp"
 #include "NdbRecord.hpp"
 #include "TransporterFacade.hpp"
+#include "NdbRecAttr.hpp"
+
+/* Various error codes that are not specific to NdbQuery. */
+STATIC_CONST(Err_MemoryAlloc = 4000);
+STATIC_CONST(Err_UnknownColumn = 4004);
+STATIC_CONST(Err_ReceiveFromNdbFailed = 4008);
+STATIC_CONST(Err_NodeFailCausedAbort = 4028);
+STATIC_CONST(Err_MixRecAttrAndRecord = 4284);
+STATIC_CONST(Err_DifferentTabForKeyRecAndAttrRec = 4287);
 
 NdbQuery::NdbQuery(NdbQueryImpl& impl):
   m_impl(impl)
@@ -354,6 +363,14 @@ NdbQueryImpl::getNdbTransaction() const
   return &m_transaction;
 }
 
+void
+NdbQueryImpl::setErrorCodeAbort(int aErrorCode){
+  m_error.code = aErrorCode;
+  getNdbTransaction()->theErrorLine = 0;
+  getNdbTransaction()->theErrorOperation = NULL;
+  getNdbTransaction()->setOperationErrorCodeAbort(aErrorCode);
+}
+
 bool 
 NdbQueryImpl::execTCKEYCONF(){
   ndbout << "NdbQueryImpl::execTCKEYCONF()  m_pendingStreams=" 
@@ -474,7 +491,9 @@ NdbQueryOperationImpl::NdbQueryOperation
   m_applStreams(def.getQueryOperationIx() == 0 ? // Needed for root only.
                 queryImpl.getParallelism() : 0),
   m_fullStreams(def.getQueryOperationIx() == 0 ? // Needed for root only.
-                queryImpl.getParallelism() : 0)
+                queryImpl.getParallelism() : 0),
+  m_firstRecAttr(NULL),
+  m_lastRecAttr(NULL)
 { 
   assert(m_id != NdbObjectIdMap::InvalidId);
 
@@ -498,9 +517,9 @@ NdbQueryOperationImpl::NdbQueryOperation
 }
 
 NdbQueryOperationImpl::~NdbQueryOperationImpl(){
+  Ndb* const ndb = m_queryImpl.getNdbTransaction()->getNdb();
   if (m_id != NdbObjectIdMap::InvalidId) {
-    m_queryImpl.getNdbTransaction()->getNdb()->theImpl
-      ->theNdbObjectIdMap.unmap(m_id, this);
+    ndb->theImpl->theNdbObjectIdMap.unmap(m_id, this);
   }
 #ifndef NDEBUG // Buffer overrun check activated.
   if(m_batchBuffer){
@@ -519,6 +538,11 @@ NdbQueryOperationImpl::~NdbQueryOperatio
     delete m_resultStreams[i];
   }
   delete[] m_resultStreams;
+  NdbRecAttr* recAttr;
+  while(recAttr!=NULL){
+    ndb->releaseRecAttr(recAttr);
+    recAttr = recAttr->next();
+  }
 }
 
 
@@ -566,6 +590,7 @@ NdbQueryOperationImpl::getValue(
   const NdbDictionary::Column* const column 
     = m_operationDef.getTable().getColumn(anAttrName);
   if(unlikely(column==NULL)){
+    getQuery().setErrorCodeAbort(Err_UnknownColumn);
     return NULL;  // FIXME: Don't return NULL wo/ setting errorcode
   } else {
     return getValue(column, resultBuffer);
@@ -580,6 +605,7 @@ NdbQueryOperationImpl::getValue(
   const NdbDictionary::Column* const column 
     = m_operationDef.getTable().getColumn(anAttrId);
   if(unlikely(column==NULL)){
+    getQuery().setErrorCodeAbort(Err_UnknownColumn);
     return NULL;
   } else {
     return getValue(column, resultBuffer);
@@ -591,17 +617,40 @@ NdbQueryOperationImpl::getValue(
                             const NdbDictionary::Column* column, 
                             char* resultBuffer)
 {
-  /* This code will only work for the lookup example in test_spj.cpp.
-   */
   if(unlikely(m_resultStyle == Style_NdbRecord)){
+    getQuery().setErrorCode(Err_MixRecAttrAndRecord);
     return NULL;
   }
   m_resultStyle = Style_NdbRecAttr;
-  if(unlikely(m_userProjection.addColumn(*column) !=0)){
+  const int addResult = m_userProjection.addColumn(*column);
+  if(unlikely(addResult !=0)){
+    getQuery().setErrorCode(addResult);
     return NULL;
   }
-  return NULL; // FIXME
-  //return m_receiver.getValue(&NdbColumnImpl::getImpl(*column), resultBuffer);
+  Ndb* const ndb = getQuery().getNdbTransaction()->getNdb();
+  NdbRecAttr* const recAttr = ndb->getRecAttr();
+  if(unlikely(recAttr == NULL)){
+    getQuery().setErrorCodeAbort(Err_MemoryAlloc);
+  }
+  if(unlikely(recAttr->setup(column, resultBuffer))){
+    ndb->releaseRecAttr(recAttr);
+    getQuery().setErrorCodeAbort(Err_MemoryAlloc);
+    return NULL;
+  }
+  // Append to tail of list.
+  if(m_firstRecAttr == NULL){
+    m_firstRecAttr = recAttr;
+  }else{
+    m_lastRecAttr->next(recAttr);
+  }
+  m_lastRecAttr = recAttr;
+  assert(recAttr->next()==NULL);
+  /* For all operations, results are handled as scan results. And for scan
+   * results, an NdbRecord is always needed.*/
+  if(m_ndbRecord==NULL){
+    m_ndbRecord = m_operationDef.getTable().getDefaultRecord();
+  }
+  return recAttr;
 }
 
 static bool isSetInMask(const unsigned char* mask, int bitNo){
@@ -649,11 +698,11 @@ NdbQueryOperationImpl::setResultRowBuf (
       static_cast<Uint32>(m_operationDef.getTable().getTableId())){
     /* The key_record and attribute_record in primary key operation do not 
        belong to the same table.*/
-    return 4287;
+    return Err_DifferentTabForKeyRecAndAttrRec;
   }
   if(unlikely(m_resultStyle==Style_NdbRecAttr)){
     /* Cannot mix NdbRecAttr and NdbRecord methods in one operation. */
-    return 4284; 
+    return Err_MixRecAttrAndRecord; 
   }else if(unlikely(m_resultStyle==Style_NdbRecord)){
     return QRY_RESULT_ROW_ALREADY_DEFINED;
   }
@@ -680,6 +729,25 @@ NdbQueryOperationImpl::setResultRowRef (
   return setResultRowBuf(rec, NULL, result_mask);
 }
 
+void 
+NdbQueryOperationImpl::fetchRecAttrResults(Uint32 streamNo){
+  NdbRecAttr* recAttr = m_firstRecAttr;
+  Uint32 posInRow = 0;
+  while(recAttr != NULL){
+    const char *attrData = NULL;
+    Uint32 attrSize = 0;
+    const int retVal1 = m_resultStreams[streamNo]->m_receiver
+      .getScanAttrData(attrData, attrSize, posInRow);
+    assert(retVal1==0);
+    assert(attrSize!=0);
+    assert(attrData!=NULL);
+    const bool retVal2 = recAttr
+      ->receive_data(reinterpret_cast<const Uint32*>(attrData), attrSize);
+    assert(retVal2);
+    recAttr = recAttr->next();
+  }
+}
+
 NdbQuery::NextResultOutcome
 NdbQueryOperationImpl::nextResult(NdbQueryImpl& queryImpl,
                                   bool fetchAllowed, 
@@ -697,17 +765,17 @@ NdbQueryOperationImpl::nextResult(NdbQue
       switch(fetchResult){
       case FetchResult_otherError:
         // FIXME: copy semantics from NdbScanOperation.
-        queryImpl.setErrorCode(4028); // Node fail
+        queryImpl.setErrorCode(Err_NodeFailCausedAbort); // Node fail
         return NdbQuery::NextResult_error;
       case FetchResult_sendFail:
         // FIXME: copy semantics from NdbScanOperation.
-        queryImpl.setErrorCode(4028); // Node fail
+        queryImpl.setErrorCode(Err_NodeFailCausedAbort); // Node fail
         return NdbQuery::NextResult_error;
       case FetchResult_nodeFail:
-        queryImpl.setErrorCode(4028); // Node fail
+        queryImpl.setErrorCode(Err_NodeFailCausedAbort); // Node fail
         return NdbQuery::NextResult_error;
       case FetchResult_timeOut:
-        queryImpl.setErrorCode(4008); // Timeout
+        queryImpl.setErrorCode(Err_ReceiveFromNdbFailed); // Timeout
         return NdbQuery::NextResult_error;
       case FetchResult_ok:
         break;
@@ -721,8 +789,9 @@ NdbQueryOperationImpl::nextResult(NdbQue
       return NdbQuery::NextResult_bufferEmpty; 
     }
   }
-  const char* rootBuff = root.m_applStreams.top()->m_receiver.get_row();
+  const char* const rootBuff = root.m_applStreams.top()->m_receiver.get_row();
   assert(rootBuff!=NULL);
+  root.fetchRecAttrResults(root.m_applStreams.top()->m_streamNo);
   if(root.m_resultRef!=NULL){
     // Set application pointer to point into internal buffer.
     *root.m_resultRef = rootBuff;
@@ -750,6 +819,7 @@ NdbQueryOperationImpl::nextResult(NdbQue
       if(operation.m_resultStreams[0]->m_transidAICount==1){
         operation.m_isRowNull = false;
         const char* buff = operation.m_resultStreams[0]->m_receiver.get_row();
+        operation.fetchRecAttrResults(0);
         if(operation.m_resultRef!=NULL){
           // Set application pointer to point into internal buffer.
           *operation.m_resultRef = buff;
@@ -789,6 +859,7 @@ NdbQueryOperationImpl::updateChildResult
     resultStream.m_receiver.setCurrentRow(rowNo);
     const char* buff = resultStream.m_receiver.get_row();
     assert(buff!=NULL);
+    fetchRecAttrResults(streamNo);
     if(m_resultRef!=NULL){
       // Set application pointer to point into internal buffer.
       *m_resultRef = buff;
@@ -933,7 +1004,7 @@ NdbQueryOperationImpl::UserProjection::s
                                                  bool withCorrelation) const{
   /* If the columns in the projections are ordered according to ascending
    * column number, we can pack the projection more compactly.*/
-  if(m_isOrdered){
+  if(m_isOrdered && false){
     // Special case: get all columns.
     if(m_columnCount==m_noOfColsInTable){
       dst.get(0) = withCorrelation ? 2 : 1; // Size of projection in words.
@@ -947,7 +1018,7 @@ NdbQueryOperationImpl::UserProjection::s
       /* Serialize projection as a bitmap.*/
       const Uint32 wordCount = 1+m_maxColNo/32; // Size of mask.
       // Size of projection in words.
-      dst.get(0) = wordCount+ withCorrelation ? 2 : 1; 
+      dst.get(0) = wordCount+ (withCorrelation ? 2 : 1); 
       AttributeHeader::init(&dst.get(1), 
                             AttributeHeader::READ_PACKED, 4*wordCount);
       memcpy(&dst.get(2, wordCount), &m_mask, 4*wordCount);
@@ -959,14 +1030,14 @@ NdbQueryOperationImpl::UserProjection::s
   }else{
     /* General case: serialize projection as a list of column numbers.*/
     // Size of projection in words. 
-    dst.get(0) = m_columnCount+ withCorrelation ? 1 : 0 ; 
+    dst.get(0) = m_columnCount+ (withCorrelation ? 1 : 0); 
     for(int i = 0; i<m_columnCount; i++){
       AttributeHeader::init(&dst.get(i+1),
                             m_columns[i]->getColumnNo(),
                             0);
     }
     if(withCorrelation){
-      AttributeHeader::init(&dst.get(m_columnCount), 
+      AttributeHeader::init(&dst.get(m_columnCount+1), 
                             AttributeHeader::READ_ANY_VALUE, 0);
     }
   }
@@ -1077,21 +1148,12 @@ NdbQueryOperationImpl::prepareSend(Uint3
 {
   const NdbQueryOperationDefImpl& def = getQueryOperationDef();
 
-  Uint32 rowSize = 0;
-  if(m_ndbRecord==NULL){
-    assert(false); // FIXME.
-    Uint32 firstBatchRows = 0;
-    Uint32 batchRows = 1;
-    // Find size of single row.
-    m_resultStreams[0]->m_receiver
-      .calculate_batch_size(0,  
-                            1,
-                            batchRows,
-                            rowSize,
-                            firstBatchRows,
-                            m_ndbRecord);
-  }else{
+  Uint32 rowSize;
+  if(m_ndbRecord!=NULL){
     rowSize = m_ndbRecord->m_row_size;
+  }else{
+    // The application did not ask for any attributes
+    rowSize = correlationWordCount;
   }
   m_batchByteSize = rowSize * getRoot().m_maxBatchRows;
   ndbout << "m_batchByteSize=" << m_batchByteSize << endl;

=== modified file 'storage/ndb/src/ndbapi/NdbQueryOperationImpl.hpp'
--- a/storage/ndb/src/ndbapi/NdbQueryOperationImpl.hpp	2009-08-27 12:10:05 +0000
+++ b/storage/ndb/src/ndbapi/NdbQueryOperationImpl.hpp	2009-08-31 11:17:11 +0000
@@ -504,10 +504,18 @@ private:
    * application thread and receiving thread. Access should be mutex protected.
    */
   StreamStack m_fullStreams;
+  /** Points to head of list. Used for old-style result retrieval 
+   * (using getValue()).*/
+  NdbRecAttr* m_firstRecAttr;
+  /** Points to tail of list. Used for old-style result retrieval 
+   * (using getValue()).*/
+  NdbRecAttr* m_lastRecAttr;
   /** Fetch result for non-root operation.*/
   void updateChildResult(Uint32 resultStreamNo, Uint32 rowNo);
   /** Get more scan results, ask for the next batch if necessary.*/
   FetchResult fetchMoreResults(bool forceSend);
+  /** Copy any NdbRecAttr results into application buffers.*/
+  void fetchRecAttrResults(Uint32 streamNo);
 }; // class NdbQueryOperationImpl
 
 

=== modified file 'storage/ndb/src/ndbapi/TransporterFacade.cpp'
--- a/storage/ndb/src/ndbapi/TransporterFacade.cpp	2009-08-27 14:09:21 +0000
+++ b/storage/ndb/src/ndbapi/TransporterFacade.cpp	2009-08-31 11:17:11 +0000
@@ -40,7 +40,7 @@
 #include <signaldata/SumaImpl.hpp>
 
 //#define REPORT_TRANSPORTER
-//#define API_TRACE
+#define API_TRACE
 
 static int numberToIndex(int number)
 {

=== modified file 'storage/ndb/test/tools/test_spj.cpp'
--- a/storage/ndb/test/tools/test_spj.cpp	2009-08-27 16:17:48 +0000
+++ b/storage/ndb/test/tools/test_spj.cpp	2009-08-31 11:17:11 +0000
@@ -517,6 +517,74 @@ int spjTest(int argc, char** argv){
   exit(-1); }
 
 
+int plainScan(int argc, char** argv){
+  NDB_INIT(argv[0]);
+
+  const char *load_default_groups[]= { "mysql_cluster",0 };
+  load_defaults("my",load_default_groups,&argc,&argv);
+  int ho_error;
+#ifndef DBUG_OFF
+  opt_debug= "d:t:O,/tmp/ndb_desc.trace";
+#endif
+  if ((ho_error=handle_options(&argc, &argv, my_long_options,
+			       ndb_std_get_one_option)))
+    return NDBT_ProgramExit(NDBT_WRONGARGS);
+
+  Ndb_cluster_connection con(opt_connect_str);
+  if(con.connect(12, 5, 1) != 0)
+  {
+    ndbout << "Unable to connect to management server." << endl;
+    return NDBT_ProgramExit(NDBT_FAILED);
+  }
+
+  int res = con.wait_until_ready(30,30);
+  if (res != 0)
+  {
+    ndbout << "Cluster nodes not ready in 30 seconds." << endl;
+    return NDBT_ProgramExit(NDBT_FAILED);
+  }
+
+  Ndb myNdb(&con, _dbname);
+  if(myNdb.init() != 0){
+    ERR(myNdb.getNdbError());
+    return NDBT_ProgramExit(NDBT_FAILED);
+  }
+
+  NdbDictionary::Dictionary*  const myDict = myNdb.getDictionary();
+  const NdbDictionary::Table* const tab = myDict->getTable("T");
+
+  assert(tab!=NULL);
+  
+  //APIERROR(myDict->getNdbError());
+  
+  NdbTransaction* myTransaction= myNdb.startTransaction();
+
+  NdbScanOperation* const scanOp = myTransaction->getNdbScanOperation(tab);
+  Uint32 results[6] = {0};
+  for(Uint32 i = 0; i<6; i++){
+    scanOp->getValue(i, reinterpret_cast<char*>(&results[i]));
+  }
+  
+  assert(myTransaction->execute(NoCommit)==0);
+  bool done = false;
+  while(!done){
+    switch(scanOp->nextResult(true, false)){
+    case 0:
+      for(Uint32 i = 0; i<6; i++){
+        ndbout << results[i] << " ";
+      }
+      ndbout << endl;
+      break;
+    case 1:
+      done = true;
+      break;
+    default:
+      assert(0);
+    }
+  }
+  return 0;
+}; // plainScan()
+
 
 
 int testSerialize(bool scan, int argc, char** argv){
@@ -697,8 +765,13 @@ int testSerialize(bool scan, int argc, c
     }
     const NdbRecord* resultRec = tab->getDefaultRecord();
     assert(resultRec!=NULL);
-    Uint32 results[recordOpCount][6] = {0};
-    const unsigned char mask[] = {0x0e};
+    Uint32 results[recordOpCount][6] = {0U};
+    const unsigned char mask[] = {0xe};
+    /*for(Uint32 i = 0; i<recordOpCount; i++){
+      for(Uint32 j = 0; j<6; i++){
+        results[i][j]= 0x0;
+      }
+    }*/
     for(Uint32 i = 0; i<recordOpCount; i++){
       const int error = query->getQueryOperation(i)
         ->setResultRowBuf(resultRec, 
@@ -726,8 +799,9 @@ int testSerialize(bool scan, int argc, c
 
 int main(int argc, char** argv){
   //return spjTest(argc, argv);
-  testSerialize(false, argc, argv);
-  return testSerialize(true, argc, argv);
+  //testSerialize(false, argc, argv);
+  //return 0;
+  return plainScan(argc, argv);//testSerialize(true, argc, argv);
 }
 /**
  * Store list of 16-bit integers put into 32-bit integers


Attachment: [text/bzr-bundle] bzr/jan.wedvik@sun.com-20090831111711-ynlrujsp2fqq89x1.bundle
Thread
bzr commit into mysql-5.1-telco-7.0-spj branch (jan.wedvik:2958) Jan Wedvik31 Aug