List:Commits« Previous MessageNext Message »
From:Ole John Aske Date:August 17 2011 12:37pm
Subject:bzr push into mysql-5.1-telco-7.0 branch (ole.john.aske:4462 to 4464)
View as plain text  
 4464 Ole John Aske	2011-08-17
      SPJ extensions:In preparation for implementing handling of double buffered result
      sets in class NdbReceiver, extend NdbReceiver with two new methods to resp.
      set buffer positions for receiving and reading result rows into/from
      the NdbReceiver.
            
      NOTE: Different receive and read position did already exists 
      as part of the NdbReceiver - This patch only extend the
      interface to be able to set it from the outside.

    modified:
      storage/ndb/include/ndbapi/NdbReceiver.hpp
      storage/ndb/src/ndbapi/NdbQueryOperation.cpp
      storage/ndb/src/ndbapi/NdbReceiver.cpp
 4463 Ole John Aske	2011-08-17
      Rename class NdbReceiver member field 'm_row' to 'm_row_recv' in order to 
      clarify its usage as a row buffer only used when received rows by 
      the NdbReceiver. (As opposed to the when we we *read* rows which has
      been received by a NdbReceived)

    modified:
      storage/ndb/include/ndbapi/NdbReceiver.hpp
      storage/ndb/src/ndbapi/NdbQueryOperation.cpp
      storage/ndb/src/ndbapi/NdbReceiver.cpp
 4462 Ole John Aske	2011-08-17
      Refactoring as part of preparing SPJ results to be double (multi...?) buffered.
        
      'class SharedFragStack' which was intended as a container for fragments
      which has received a complete batch of rows is removed. 
      It is replaced with a set of (mutex protected) methods in 
      'class NdbRootFragment' and OrderedFragSet::prepareMoreResults()
      
      The rational for this refactoring is that the SharedFragStack container
      wasn't a particular good fit to represent the state when *multiple*
      batches of (double buffered) result set may be available.

    modified:
      storage/ndb/src/ndbapi/NdbQueryOperation.cpp
      storage/ndb/src/ndbapi/NdbQueryOperationImpl.hpp
=== modified file 'storage/ndb/include/ndbapi/NdbReceiver.hpp'
--- a/storage/ndb/include/ndbapi/NdbReceiver.hpp	2011-06-30 15:59:25 +0000
+++ b/storage/ndb/include/ndbapi/NdbReceiver.hpp	2011-08-17 12:36:56 +0000
@@ -40,7 +40,6 @@ class NdbReceiver
   friend class NdbIndexScanOperation;
   friend class NdbTransaction;
   friend class NdbRootFragment;
-  friend class ReceiverIdIterator;
   friend int compare_ndbrecord(const NdbReceiver *r1,
                       const NdbReceiver *r2,
                       const NdbRecord *key_record,
@@ -82,6 +81,12 @@ public:
   
   void setErrorCode(int);
 
+  /* Prepare for receiving of rows into specified buffer */
+  void prepareReceive(char *buf);
+
+  /* Prepare for reading of rows from specified buffer */
+  void prepareRead(char *buf, Uint32 rows);
+
 private:
   Uint32 theMagicNumber;
   Ndb* m_ndb;
@@ -140,8 +145,9 @@ private:
   /* members used for NdbRecord operation. */
   struct {
     const NdbRecord *m_ndb_record;
-    char *m_row;
-    /* Block of memory used to receive all rows in a batch during scan. */
+    /* Destination to receive next row into. */
+    char *m_row_recv;
+    /* Block of memory used to read all rows in a batch during scan. */
     char *m_row_buffer;
     /*
       Offsets between two rows in m_row_buffer.
@@ -226,7 +232,7 @@ private:
                     const char * & data_ptr) const;
   int getScanAttrData(const char * & data, Uint32 & size, Uint32 & pos) const;
   /** Used by NdbQueryOperationImpl, where random access to rows is needed.*/
-  void setCurrentRow(Uint32 currentRow);
+  void setCurrentRow(char* buffer, Uint32 row);
   /** Used by NdbQueryOperationImpl.*/
   Uint32 getCurrentRow() const { return m_current_row; }
 };
@@ -259,7 +265,7 @@ NdbReceiver::prepareSend(){
   if (m_using_ndb_record)
   {
     if (m_type==NDB_SCANRECEIVER || m_type==NDB_QUERY_OPERATION)
-      m_record.m_row= m_record.m_row_buffer;
+      m_record.m_row_recv= m_record.m_row_buffer;
   }
   theCurrentRecAttr = theFirstRecAttr;
 }
@@ -287,9 +293,13 @@ NdbReceiver::execSCANOPCONF(Uint32 tcPtr
 
 inline
 void
-NdbReceiver::setCurrentRow(Uint32 currentRow)
+NdbReceiver::setCurrentRow(char* buffer, Uint32 row)
 {
-  m_current_row = currentRow;
+  m_record.m_row_buffer = buffer;
+  m_current_row = row;
+#ifdef assert
+  assert(m_current_row < m_result_rows);
+#endif
 }
 
 inline

=== modified file 'storage/ndb/src/ndbapi/NdbQueryOperation.cpp'
--- a/storage/ndb/src/ndbapi/NdbQueryOperation.cpp	2011-08-17 12:06:04 +0000
+++ b/storage/ndb/src/ndbapi/NdbQueryOperation.cpp	2011-08-17 12:36:56 +0000
@@ -788,7 +788,7 @@ NdbResultStream::firstResult()
   if ((m_currentRow=findTupleWithParentId(parentId)) != tupleNotFound)
   {
     m_iterState = Iter_started;
-    m_receiver.setCurrentRow(m_currentRow);
+    m_receiver.setCurrentRow(m_buffer, m_currentRow);
     return m_currentRow;
   }
 
@@ -805,7 +805,7 @@ NdbResultStream::nextResult()
       (m_currentRow=findNextTuple(m_currentRow)) != tupleNotFound)
   {
     m_iterState = Iter_started;
-    m_receiver.setCurrentRow(m_currentRow);
+    m_receiver.setCurrentRow(m_buffer, m_currentRow);
     return m_currentRow;
   }
   m_iterState = Iter_finished;
@@ -829,7 +829,8 @@ NdbResultStream::execTRANSID_AI(const Ui
      * Store TupleCorrelation as hidden value imm. after received row
      * (NdbQueryOperationImpl::getRowSize() has reserved space for it)
      */
-    Uint32* row_recv = reinterpret_cast<Uint32*>(m_receiver.m_record.m_row);
+    Uint32* row_recv = reinterpret_cast<Uint32*>
+                         (m_receiver.m_record.m_row_recv);
     row_recv[-1] = correlation.toUint32();
   }
 } // NdbResultStream::execTRANSID_AI()

=== modified file 'storage/ndb/src/ndbapi/NdbReceiver.cpp'
--- a/storage/ndb/src/ndbapi/NdbReceiver.cpp	2011-06-30 15:59:25 +0000
+++ b/storage/ndb/src/ndbapi/NdbReceiver.cpp	2011-08-17 12:36:56 +0000
@@ -56,7 +56,7 @@ NdbReceiver::init(ReceiverType type, boo
   if (useRec)
   {
     m_record.m_ndb_record= NULL;
-    m_record.m_row= NULL;
+    m_record.m_row_recv= NULL;
     m_record.m_row_buffer= NULL;
     m_record.m_row_offset= 0;
     m_record.m_read_range_no= false;
@@ -118,11 +118,38 @@ NdbReceiver::getValues(const NdbRecord* 
   assert(m_using_ndb_record);
 
   m_record.m_ndb_record= rec;
-  m_record.m_row= row_ptr;
+  m_record.m_row_recv= row_ptr;
   m_record.m_row_offset= rec->m_row_size;
 }
 
-#define KEY_ATTR_ID (~(Uint32)0)
+void
+NdbReceiver::prepareReceive(char *buf)
+{
+  /* Set pointers etc. to prepare for receiving the first row of the batch. */
+  assert(theMagicNumber == 0x11223344);
+  m_received_result_length = 0;
+  m_expected_result_length = 0;
+  if (m_using_ndb_record)
+  {
+    m_record.m_row_recv= buf;
+  }
+  theCurrentRecAttr = theFirstRecAttr;
+}
+
+void
+NdbReceiver::prepareRead(char *buf, Uint32 rows)
+{
+  /* Set pointers etc. to prepare for reading the first row of the batch. */
+  assert(theMagicNumber == 0x11223344);
+  m_current_row = 0;
+  m_result_rows = rows;
+  if (m_using_ndb_record)
+  {
+    m_record.m_row_buffer = buf;
+  }
+}
+
+ #define KEY_ATTR_ID (~(Uint32)0)
 
 /*
   Compute the batch size (rows between each NEXT_TABREQ / SCAN_TABCONF) to
@@ -219,7 +246,7 @@ NdbReceiver::do_setup_ndbrecord(const Nd
 {
   m_using_ndb_record= true;
   m_record.m_ndb_record= ndb_record;
-  m_record.m_row= row_buffer;
+  m_record.m_row_recv= row_buffer;
   m_record.m_row_buffer= row_buffer;
   m_record.m_row_offset= rowsize;
   m_record.m_read_range_no= read_range_no;
@@ -524,7 +551,7 @@ NdbReceiver::receive_packed_ndbrecord(Ui
       {
 	if (BitmaskImpl::get(bmlen, aDataPtr, ++i))
 	{
-          setRecToNULL(col, m_record.m_row);
+          setRecToNULL(col, m_record.m_row_recv);
 
           // Next column...
 	  continue;
@@ -668,7 +695,8 @@ NdbReceiver::execTRANSID_AI(const Uint32
         assert(m_record.m_read_range_no);
         assert(attrSize==4);
         assert (m_record.m_row_offset >= m_record.m_ndb_record->m_row_size+attrSize);
-        memcpy(m_record.m_row+m_record.m_ndb_record->m_row_size, aDataPtr++, 4);
+        memcpy(m_record.m_row_recv+m_record.m_ndb_record->m_row_size, 
+               aDataPtr++, 4);
         aLength--;
         continue; // Next
       }
@@ -682,7 +710,7 @@ NdbReceiver::execTRANSID_AI(const Uint32
         assert (m_record.m_row_offset >= m_record.m_ndb_record->m_row_size);
         Uint32 len= receive_packed_ndbrecord(attrSize >> 2, // Bitmap length
                                              aDataPtr,
-                                             m_record.m_row);
+                                             m_record.m_row_recv);
         aDataPtr+= len;
         aLength-= len;
         continue;  // Next
@@ -709,13 +737,13 @@ NdbReceiver::execTRANSID_AI(const Uint32
         
         /* Save this extra getValue */
         save_pos+= sizeof(Uint32);
-        memcpy(m_record.m_row + m_record.m_row_offset - save_pos,
+        memcpy(m_record.m_row_recv + m_record.m_row_offset - save_pos,
                &attrSize, sizeof(Uint32));
         if (attrSize > 0)
         {
           save_pos+= attrSize;
           assert (save_pos<=m_record.m_row_offset);
-          memcpy(m_record.m_row + m_record.m_row_offset - save_pos,
+          memcpy(m_record.m_row_recv + m_record.m_row_offset - save_pos,
                  aDataPtr, attrSize);
         }
 
@@ -803,7 +831,7 @@ NdbReceiver::execTRANSID_AI(const Uint32
 
   if (m_using_ndb_record) {
     /* Move onto next row in scan buffer */
-    m_record.m_row+= m_record.m_row_offset;
+    m_record.m_row_recv+= m_record.m_row_offset;
   }
   return (tmp == exp || (exp > TcKeyConf::DirtyReadBit) ? 1 : 0);
 }

No bundle (reason: useless for push emails).
Thread
bzr push into mysql-5.1-telco-7.0 branch (ole.john.aske:4462 to 4464) Ole John Aske22 Aug