List:Commits« Previous MessageNext Message »
From:Ole John Aske Date:October 2 2009 1:18pm
Subject:bzr commit into mysql-5.1-telco-7.0-spj branch (ole.john.aske:2952)
View as plain text  
#At file:///home/oa136780/mysql/mysql-5.1-telco-7.0-spj/ based on revid:jan.wedvik@stripped

 2952 Ole John Aske	2009-10-02
      Initial implementation of NEXT_SCANREQ inside SPJ API.

    modified:
      storage/ndb/src/ndbapi/NdbImpl.hpp
      storage/ndb/src/ndbapi/NdbQueryOperation.cpp
      storage/ndb/src/ndbapi/NdbQueryOperationImpl.hpp
=== modified file 'storage/ndb/src/ndbapi/NdbImpl.hpp'
--- a/storage/ndb/src/ndbapi/NdbImpl.hpp	2009-09-14 12:08:07 +0000
+++ b/storage/ndb/src/ndbapi/NdbImpl.hpp	2009-10-02 13:18:45 +0000
@@ -185,7 +185,7 @@ NdbReceiver::getTransaction() const {
     assert(false);
     return NULL;
   case NDB_QUERY_OPERATION:
-    return m_query_operation_impl->getQuery().getNdbTransaction();
+    return &m_query_operation_impl->getQuery().getNdbTransaction();
   default:
     return ((NdbOperation*)m_owner)->theNdbCon;
   }

=== modified file 'storage/ndb/src/ndbapi/NdbQueryOperation.cpp'
--- a/storage/ndb/src/ndbapi/NdbQueryOperation.cpp	2009-10-01 12:49:14 +0000
+++ b/storage/ndb/src/ndbapi/NdbQueryOperation.cpp	2009-10-02 13:18:45 +0000
@@ -32,6 +32,9 @@
 #include "NdbApiSignal.hpp"
 #include "NdbTransaction.hpp"
 
+
+#define TEST_SCANREQ
+
 /* Various error codes that are not specific to NdbQuery. */
 STATIC_CONST(Err_MemoryAlloc = 4000);
 STATIC_CONST(Err_SendFailed = 4002);
@@ -189,7 +192,7 @@ NdbQuery::close(bool forceSend, bool rel
 NdbTransaction*
 NdbQuery::getNdbTransaction() const
 {
-  return m_impl.getNdbTransaction();
+  return &m_impl.getNdbTransaction();
 }
 
 const NdbError& 
@@ -560,7 +563,7 @@ NdbQueryImpl::fetchMoreResults(bool forc
 
   /* Check if there are any more completed streams available.*/
   if(m_queryDef.isScanQuery()){
-    Ndb* const ndb = getNdbTransaction()->getNdb();
+    Ndb* const ndb = getNdbTransaction().getNdb();
     
     TransporterFacade* const facade 
       = ndb->theImpl->m_transporter_facade;
@@ -576,19 +579,24 @@ NdbQueryImpl::fetchMoreResults(bool forc
       if(m_fullStreams.top()==NULL){
         if(getRoot().isBatchComplete()){
           /* FIXME: Add code to ask for the next batch if necessary.*/
+#ifdef TEST_SCANREQ
+          const bool scanComplete = false;
+#else
           const bool scanComplete = true;
+#endif
           if(scanComplete){
             /* FIXME: Close scans properly. This would involve sending
              * SCAN_NEXTREQ*/
             return FetchResult_scanComplete;
           }else{
             // FIXME: Ask for new scan batch.
+            sendFetchMore(m_transaction.getConnectedNodeId(),false);
           }
         }
         /* More results are on the way, so we wait for them.*/
         const FetchResult waitResult = static_cast<FetchResult>
           (poll_guard.wait_scan(3*facade->m_waitfor_timeout, 
-                                getNdbTransaction()->getConnectedNodeId(), 
+                                m_transaction.getConnectedNodeId(), 
                                 forceSend));
         if(waitResult != FetchResult_ok){
           return waitResult;
@@ -670,18 +678,12 @@ NdbQueryImpl::close(bool forceSend, bool
   // FIXME
 }
 
-NdbTransaction*
-NdbQueryImpl::getNdbTransaction() const
-{
-  return &m_transaction;
-}
-
 void
 NdbQueryImpl::setErrorCodeAbort(int aErrorCode){
   m_error.code = aErrorCode;
-  getNdbTransaction()->theErrorLine = 0;
-  getNdbTransaction()->theErrorOperation = NULL;
-  getNdbTransaction()->setOperationErrorCodeAbort(aErrorCode);
+  m_transaction.theErrorLine = 0;
+  m_transaction.theErrorOperation = NULL;
+  m_transaction.setOperationErrorCodeAbort(aErrorCode);
 }
 
 bool 
@@ -739,11 +741,15 @@ NdbQueryImpl::prepareSend()
       m_parallelism = m_pendingStreams;
     }
 
-    Ndb* const ndb = getNdbTransaction()->getNdb();
+    Ndb* const ndb = m_transaction.getNdb();
     TransporterFacade *tp = ndb->theImpl->m_transporter_facade;
 
     Uint32 batchRows = m_maxBatchRows; // >0: User specified prefered value, ==0: Use default CFG values
 
+#ifdef TEST_SCANREQ
+    batchRows = 1;  // To force usage of SCAN_NEXTREQ even for small scans
+#endif
+
     // Calculate batchsize for query as minimum batchRows for all m_operations[].
     // Ignore calculated 'batchByteSize' and 'firstBatchRows' here - Recalculated
     // when building signal after max-batchRows has been determined.
@@ -829,7 +835,7 @@ Remark:         Send a TCKEYREQ or SCAN_
                 KEYINFO and ATTRINFO are included as part of the long signal
 ******************************************************************************/
 int
-NdbQueryImpl::doSend(int nodeId, bool lastFlag)
+NdbQueryImpl::doSend(int nodeId, bool lastFlag)  // TODO: Use 'lastFlag'
 {
   Ndb& ndb = *m_transaction.getNdb();
   TransporterFacade *tp = ndb.theImpl->m_transporter_facade;
@@ -974,7 +980,7 @@ NdbQueryImpl::doSend(int nodeId, bool la
       return -1;
     }
 
-  } else {
+  } else {  // Lookup query
 
     m_signal->setSignal(GSN_TCKEYREQ);  // TODO: or GSN_TCINDXREQ
 
@@ -1067,6 +1073,65 @@ NdbQueryImpl::doSend(int nodeId, bool la
 } // NdbQueryImpl::doSend()
 
 
+/******************************************************************************
+int sendFetchMore()
+
+Return Value:   Return >0 : send was succesful, returns number of signals sent
+                Return -1: In all other case.   
+Parameters:     nodeId: Receiving processor node
+Remark:
+******************************************************************************/
+int
+NdbQueryImpl::sendFetchMore(int nodeId, bool lastFlag)  // TODO: Unsure if we need 'lastFlag'
+{
+  Ndb& ndb = *m_transaction.getNdb();
+  TransporterFacade *tp = ndb.theImpl->m_transporter_facade;
+
+  m_signal = ndb.getSignal();
+  if (m_signal == NULL) {
+    setErrorCodeAbort(Err_MemoryAlloc);  // Allocation failure
+    return -1; 
+  }
+
+  // TODO: It is likely we need to grab the transporter mutex, investigate
+
+  printf("::sendFetchMore, to nodeId:%d\n", nodeId);
+
+  m_signal->setSignal(GSN_SCAN_NEXTREQ);
+
+  Uint32* dataPtr = m_signal->getDataPtrSend();
+  ScanNextReq * const scanNextReq = CAST_PTR(ScanNextReq, dataPtr);
+  Uint32* receivers = dataPtr + ScanNextReq::SignalLength;
+
+  Uint64 transId = m_transaction.getTransactionId();
+  scanNextReq->apiConnectPtr = m_transaction.theTCConPtr;
+  scanNextReq->stopScan = 0;
+  scanNextReq->transId1 = (Uint32) transId;
+  scanNextReq->transId2 = (Uint32) (transId >> 32);
+
+  // List of fragments from where we request 'NEXT' follows:
+  assert (m_parallelism<=21); // TODO: Else we need a (sectioned) long signal
+//const NdbQueryOperationImpl& queryOp = getRoot();
+  for(unsigned i = 0; i<m_parallelism; i++){
+    receivers[i] = i;  //queryOp.getReceiver(i).getId();  // TODO
+  }
+  m_signal->setLength(ScanNextReq::SignalLength+m_parallelism);
+
+  const int res = tp->sendSignal(m_signal, nodeId);
+  if (unlikely(res == -1))
+  {
+    setErrorCodeAbort(Err_SendFailed);  // Error: 'Send to NDB failed'
+    return -1;
+  }
+
+  ndb.releaseSignal(m_signal);
+  m_signal = NULL;
+
+  return 1;
+} // NdbQueryImpl::sendFetchMore()
+
+
+
 NdbQueryImpl::StreamStack::StreamStack():
   m_size(0),
   m_current(-1),
@@ -1117,7 +1182,7 @@ NdbResultStream::TupleIdMap::get(Uint16 
 
 NdbResultStream::NdbResultStream(NdbQueryOperationImpl& operation, Uint32 streamNo):
   m_streamNo(streamNo),
-  m_receiver(operation.getQuery().getNdbTransaction()->getNdb(), &operation),  // FIXME? Use Ndb recycle lists
+  m_receiver(operation.getQuery().getNdbTransaction().getNdb(), &operation),  // FIXME? Use Ndb recycle lists
   m_transidAICount(0),
   m_correlToTupNumMap(),
   m_pendingResults(0),
@@ -1233,7 +1298,7 @@ NdbQueryOperationImpl::NdbQueryOperation
 }
 
 NdbQueryOperationImpl::~NdbQueryOperationImpl(){
-  Ndb* const ndb = m_queryImpl.getNdbTransaction()->getNdb();
+  Ndb* const ndb = m_queryImpl.getNdbTransaction().getNdb();
   // Check against buffer overun.
 #ifndef NDEBUG // Buffer overrun check activated.
   assert(m_batchBuffer==NULL ||
@@ -1331,7 +1396,7 @@ NdbQueryOperationImpl::getValue(
     getQuery().setErrorCode(addResult);
     return NULL;
   }
-  Ndb* const ndb = getQuery().getNdbTransaction()->getNdb();
+  Ndb* const ndb = getQuery().getNdbTransaction().getNdb();
   NdbRecAttr* const recAttr = ndb->getRecAttr();
   if(unlikely(recAttr == NULL)){
     getQuery().setErrorCodeAbort(Err_MemoryAlloc);

=== modified file 'storage/ndb/src/ndbapi/NdbQueryOperationImpl.hpp'
--- a/storage/ndb/src/ndbapi/NdbQueryOperationImpl.hpp	2009-10-01 12:49:14 +0000
+++ b/storage/ndb/src/ndbapi/NdbQueryOperationImpl.hpp	2009-10-02 13:18:45 +0000
@@ -84,7 +84,8 @@ public:
   /** Close query*/
   void close(bool forceSend, bool release);
 
-  NdbTransaction* getNdbTransaction() const;
+  NdbTransaction& getNdbTransaction() const
+  { return m_transaction; }
 
   const NdbError& getNdbError() const;
 
@@ -109,11 +110,16 @@ public:
    */
   int prepareSend();
 
-  /** Send prepared signals from this NdbQuery
-   *  @return possible error code.
+  /** Send prepared signals from this NdbQuery to start execution
+   *  @return #signals sent, -1 if error.
    */
   int doSend(int aNodeId, bool lastFlag);
 
+  /** Send SCAN_NEXTREQ signal to fetch another batch from a scan query
+   *  @return #signals sent, -1 if error.
+   */
+  int sendFetchMore(int nodeId, bool lastFlag);
+
   NdbQuery& getInterface()
   { return m_interface; }
 


Attachment: [text/bzr-bundle] bzr/ole.john.aske@sun.com-20091002131845-b5cyr8hkupjqrubi.bundle
Thread
bzr commit into mysql-5.1-telco-7.0-spj branch (ole.john.aske:2952) Ole John Aske2 Oct