List:Commits« Previous MessageNext Message »
From:Jan Wedvik Date:September 1 2009 7:42am
Subject:bzr commit into mysql-5.1-telco-7.0-spj branch (jan.wedvik:2959)
View as plain text  
#At file:///export/home2/tmp/jw1159207/mysql/mysql-5.1-telco-7.0-spj/ based on revid:jan.wedvik@stripped

 2959 Jan Wedvik	2009-09-01
      NdbRecAttr result retrieval for queries works again.

    modified:
      storage/ndb/src/ndbapi/NdbQueryOperation.cpp
      storage/ndb/src/ndbapi/NdbQueryOperationImpl.hpp
      storage/ndb/test/tools/test_spj.cpp
=== modified file 'storage/ndb/src/ndbapi/NdbQueryOperation.cpp'
--- a/storage/ndb/src/ndbapi/NdbQueryOperation.cpp	2009-08-31 11:17:11 +0000
+++ b/storage/ndb/src/ndbapi/NdbQueryOperation.cpp	2009-09-01 07:42:03 +0000
@@ -668,18 +668,21 @@ void
 NdbQueryOperationImpl::findMaxRows(){
   assert(m_operationDef.getQueryOperationIx()==0);
   if(isScan()){
-    const NdbReceiver& receiver =  m_resultStreams[0]->m_receiver;
-    // Root operation is a scan.
-    Uint32 firstBatchRows = 0;
-    Uint32 batchByteSize = 0;
-    receiver.calculate_batch_size(0, // Key size.
-                                  getQuery().getParallelism(),
-                                  m_maxBatchRows,
-                                  batchByteSize,
-                                  firstBatchRows,
-                                  m_ndbRecord);
-    assert(m_maxBatchRows!=0);
-    assert(firstBatchRows==m_maxBatchRows);
+    if(false){
+      const NdbReceiver& receiver =  m_resultStreams[0]->m_receiver;
+      // Root operation is a scan.
+      Uint32 firstBatchRows = 0;
+      Uint32 batchByteSize = 0;
+      receiver.calculate_batch_size(0, // Key size.
+                                    getQuery().getParallelism(),
+                                    m_maxBatchRows,
+                                    batchByteSize,
+                                    firstBatchRows,
+                                    m_ndbRecord);
+      assert(m_maxBatchRows!=0);
+      assert(firstBatchRows==m_maxBatchRows);
+    }
+    m_maxBatchRows = 64;
   }else{
     // Lookup.
     m_maxBatchRows = 1;
@@ -729,8 +732,25 @@ NdbQueryOperationImpl::setResultRowRef (
   return setResultRowBuf(rec, NULL, result_mask);
 }
 
+#ifdef UNUSED
 void 
 NdbQueryOperationImpl::fetchRecAttrResults(Uint32 streamNo){
+  const char* buff = m_resultStreams[streamNo]->m_receiver.get_row();
+  assert(buff!=NULL);
+  NdbRecAttr* recAttr = m_firstRecAttr;
+  while(recAttr != NULL){
+    const AttributeHeader* const attHead = 
+      reinterpret_cast<const AttributeHeader*>(buff);
+    assert(recAttr->attrId() == attHead->getAttributeId());
+    memcpy(recAttr->theRef, attHead->getDataPtr(), attHead->getByteSize());
+    buff += sizeof(AttributeHeader) + attHead->getByteSize();
+    recAttr = recAttr->next();
+  }
+}
+#endif
+
+void
+NdbQueryOperationImpl::fetchRecAttrResults(Uint32 streamNo){
   NdbRecAttr* recAttr = m_firstRecAttr;
   Uint32 posInRow = 0;
   while(recAttr != NULL){
@@ -791,15 +811,18 @@ NdbQueryOperationImpl::nextResult(NdbQue
   }
   const char* const rootBuff = root.m_applStreams.top()->m_receiver.get_row();
   assert(rootBuff!=NULL);
-  root.fetchRecAttrResults(root.m_applStreams.top()->m_streamNo);
-  if(root.m_resultRef!=NULL){
-    // Set application pointer to point into internal buffer.
-    *root.m_resultRef = rootBuff;
-  }else if(root.m_resultBuffer!=NULL){
-    // Copy result to buffer supplied by application.
-    memcpy(root.m_resultBuffer, rootBuff, 
-           root.m_applStreams.top()->m_receiver.m_record.m_ndb_record
+  if(root.m_resultStyle==Style_NdbRecAttr){
+    root.fetchRecAttrResults(root.m_applStreams.top()->m_streamNo);
+  }else if(root.m_resultStyle==Style_NdbRecord){
+    if(root.m_resultRef!=NULL){
+      // Set application pointer to point into internal buffer.
+      *root.m_resultRef = rootBuff;
+    }else if(root.m_resultBuffer!=NULL){
+      // Copy result to buffer supplied by application.
+      memcpy(root.m_resultBuffer, rootBuff, 
+             root.m_applStreams.top()->m_receiver.m_record.m_ndb_record
              ->m_row_size);
+    }
   }
   if(root.isScan()){
     const Uint32 rowNo 
@@ -819,15 +842,19 @@ NdbQueryOperationImpl::nextResult(NdbQue
       if(operation.m_resultStreams[0]->m_transidAICount==1){
         operation.m_isRowNull = false;
         const char* buff = operation.m_resultStreams[0]->m_receiver.get_row();
-        operation.fetchRecAttrResults(0);
-        if(operation.m_resultRef!=NULL){
-          // Set application pointer to point into internal buffer.
-          *operation.m_resultRef = buff;
-        }else if(operation.m_resultBuffer!=NULL){
-          // Copy result to buffer supplied by application.
-          memcpy(operation.m_resultBuffer, buff, 
-                 operation.m_resultStreams[0]->m_receiver.m_record.m_ndb_record
+        if(operation.m_resultStyle==Style_NdbRecAttr){
+          operation.fetchRecAttrResults(0);
+        }else if(operation.m_resultStyle==Style_NdbRecord){
+          if(operation.m_resultRef!=NULL){
+            // Set application pointer to point into internal buffer.
+            *operation.m_resultRef = buff;
+          }else if(operation.m_resultBuffer!=NULL){
+            // Copy result to buffer supplied by application.
+            memcpy(operation.m_resultBuffer, buff, 
+                   operation.m_resultStreams[0]
+                   ->m_receiver.m_record.m_ndb_record
                    ->m_row_size);
+          }
         }
       }else{
         if(operation.m_resultRef!=NULL){
@@ -859,14 +886,17 @@ NdbQueryOperationImpl::updateChildResult
     resultStream.m_receiver.setCurrentRow(rowNo);
     const char* buff = resultStream.m_receiver.get_row();
     assert(buff!=NULL);
-    fetchRecAttrResults(streamNo);
-    if(m_resultRef!=NULL){
-      // Set application pointer to point into internal buffer.
-      *m_resultRef = buff;
-    }else if(m_resultBuffer!=NULL){
-      // Copy result to buffer supplied by application.
-      memcpy(m_resultBuffer, buff, 
-             resultStream.m_receiver.m_record.m_ndb_record->m_row_size);
+    if(m_resultStyle==Style_NdbRecAttr){
+      fetchRecAttrResults(streamNo);
+    }else if(m_resultStyle==Style_NdbRecord){
+      if(m_resultRef!=NULL){
+        // Set application pointer to point into internal buffer.
+        *m_resultRef = buff;
+      }else if(m_resultBuffer!=NULL){
+        // Copy result to buffer supplied by application.
+        memcpy(m_resultBuffer, buff, 
+               resultStream.m_receiver.m_record.m_ndb_record->m_row_size);
+      }
     }
     for(Uint32 i = 0; i<getNoOfChildOperations(); i++){
       getChildOperation(i).updateChildResult(streamNo, 
@@ -1001,10 +1031,12 @@ NdbQueryOperationImpl::UserProjection
 
 int
 NdbQueryOperationImpl::UserProjection::serialize(Uint32Slice dst,
+                                                 bool compact, 
                                                  bool withCorrelation) const{
   /* If the columns in the projections are ordered according to ascending
    * column number, we can pack the projection more compactly.*/
-  if(m_isOrdered && false){
+  if(compact){
+    assert(m_isOrdered);
     // Special case: get all columns.
     if(m_columnCount==m_noOfColsInTable){
       dst.get(0) = withCorrelation ? 2 : 1; // Size of projection in words.
@@ -1149,11 +1181,21 @@ NdbQueryOperationImpl::prepareSend(Uint3
   const NdbQueryOperationDefImpl& def = getQueryOperationDef();
 
   Uint32 rowSize;
-  if(m_ndbRecord!=NULL){
+  switch(m_resultStyle){
+  case Style_NdbRecAttr:
+    rowSize = m_ndbRecord->m_row_size + 
+      sizeof(AttributeHeader)* m_userProjection.getColumnCount();
+    break;
+  case Style_NdbRecord:
     rowSize = m_ndbRecord->m_row_size;
-  }else{
-    // The application did not ask for any attributes
-    rowSize = correlationWordCount;
+    break;
+  case Style_None:
+    assert(m_ndbRecord == NULL);
+     m_ndbRecord = m_operationDef.getTable().getDefaultRecord();
+     rowSize = 0;
+     break;
+  default:
+    assert(false);
   }
   m_batchByteSize = rowSize * getRoot().m_maxBatchRows;
   ndbout << "m_batchByteSize=" << m_batchByteSize << endl;
@@ -1238,15 +1280,13 @@ NdbQueryOperationImpl::prepareSend(Uint3
     serializedParams.append(m_params);    
   }
 
-  if (true)
-  {
-    param.requestInfo |= DABits::PI_ATTR_LIST;
-    const int error = 
-      m_userProjection.serialize(Uint32Slice(serializedParams),
-                                 getRoot().isScan());
-    if (unlikely(error!=0)) {
-      return error;
-    }
+  param.requestInfo |= DABits::PI_ATTR_LIST;
+  const int error = 
+    m_userProjection.serialize(Uint32Slice(serializedParams),
+                               m_resultStyle == Style_NdbRecord,
+                               getRoot().isScan());
+  if (unlikely(error!=0)) {
+    return error;
   }
 
   QueryNodeParameters::setOpLen(param.len,

=== modified file 'storage/ndb/src/ndbapi/NdbQueryOperationImpl.hpp'
--- a/storage/ndb/src/ndbapi/NdbQueryOperationImpl.hpp	2009-08-31 11:17:11 +0000
+++ b/storage/ndb/src/ndbapi/NdbQueryOperationImpl.hpp	2009-09-01 07:42:03 +0000
@@ -290,9 +290,10 @@ private:
     /** Make a serialize representation of this object, to be sent to the 
      * SPJ block.
      * @param dst Buffer for storing serialized projection.
+     * @param compact Use READ_PACKED or READ_ALL.
      * @param withCorrelation Include correlation data in projection.
      * @return Possible error code.*/
-    int serialize(Uint32Slice dst, bool withCorrelation) const;
+    int serialize(Uint32Slice dst, bool compact, bool withCorrelation) const;
     
     /** Get number of columns.*/
     int getColumnCount() const {return m_columnCount;}

=== modified file 'storage/ndb/test/tools/test_spj.cpp'
--- a/storage/ndb/test/tools/test_spj.cpp	2009-08-31 11:17:11 +0000
+++ b/storage/ndb/test/tools/test_spj.cpp	2009-09-01 07:42:03 +0000
@@ -646,14 +646,17 @@ int testSerialize(bool scan, int argc, c
     const void* params[] = {NULL};
     // Instantiate NdbQuery for this transaction.
     NdbQuery* query = myTransaction->createQuery(scanDef, params);
-    //Uint32 results[10][6];
-    //char* resultPtr = NULL;
-    const Uint32* scanResultPtr;
+    //const Uint32* scanResultPtr;
     //const unsigned char* mask = NULL;
     NdbQueryOperation* const scanOp = query->getQueryOperation(0U);
-    scanOp->setResultRowRef(resultRec, 
+    /*scanOp->setResultRowRef(resultRec, 
                         reinterpret_cast<const char*&>(scanResultPtr),
-                        NULL);
+                        NULL);*/
+    Uint32 scanResultPtr[6];
+    for(Uint32 i = 0; i<6; i++){
+      assert(scanOp->getValue(i, reinterpret_cast<char*>(scanResultPtr+i))
+             !=NULL);
+    }
     const Uint32* lookupResultPtr;
     NdbQueryOperation* const lookupOp = query->getQueryOperation(1);
     lookupOp->setResultRowRef(resultRec, 
@@ -757,7 +760,7 @@ int testSerialize(bool scan, int argc, c
 
     /* Read all attributes from result tuples.*/
     const Uint32 opCount = query->getNoOfOperations();
-    const Uint32 recordOpCount = 2;
+    const Uint32 recordOpCount = 0;
     const ResultSet** resultSet = new const ResultSet*[opCount-recordOpCount];
     for(Uint32 i = 0; i < opCount-recordOpCount; i++){
       resultSet[i] 
@@ -765,13 +768,13 @@ int testSerialize(bool scan, int argc, c
     }
     const NdbRecord* resultRec = tab->getDefaultRecord();
     assert(resultRec!=NULL);
-    Uint32 results[recordOpCount][6] = {0U};
+    Uint32 results[2][6] = {0U};
     const unsigned char mask[] = {0xe};
-    /*for(Uint32 i = 0; i<recordOpCount; i++){
-      for(Uint32 j = 0; j<6; i++){
+    for(Uint32 i = 0; i<recordOpCount; i++){
+      for(Uint32 j = 0; j<6; j++){
         results[i][j]= 0x0;
       }
-    }*/
+    }
     for(Uint32 i = 0; i<recordOpCount; i++){
       const int error = query->getQueryOperation(i)
         ->setResultRowBuf(resultRec, 
@@ -798,10 +801,10 @@ int testSerialize(bool scan, int argc, c
 
 
 int main(int argc, char** argv){
-  //return spjTest(argc, argv);
-  //testSerialize(false, argc, argv);
-  //return 0;
-  return plainScan(argc, argv);//testSerialize(true, argc, argv);
+  //plainScan(argc, argv)
+  testSerialize(false, argc, argv);
+  testSerialize(true, argc, argv);
+  return 0;
 }
 /**
  * Store list of 16-bit integers put into 32-bit integers


Attachment: [text/bzr-bundle] bzr/jan.wedvik@sun.com-20090901074203-8i2t8pihxmuvweq4.bundle
Thread
bzr commit into mysql-5.1-telco-7.0-spj branch (jan.wedvik:2959) Jan Wedvik1 Sep