#At file:///home/oa136780/mysql/mysql-5.1-telco-7.0-spj/ based on revid:jan.wedvik@stripped
2952 Ole John Aske 2009-10-02
Initial implementation of NEXT_SCANREQ inside SPJ API.
modified:
storage/ndb/src/ndbapi/NdbImpl.hpp
storage/ndb/src/ndbapi/NdbQueryOperation.cpp
storage/ndb/src/ndbapi/NdbQueryOperationImpl.hpp
=== modified file 'storage/ndb/src/ndbapi/NdbImpl.hpp'
--- a/storage/ndb/src/ndbapi/NdbImpl.hpp 2009-09-14 12:08:07 +0000
+++ b/storage/ndb/src/ndbapi/NdbImpl.hpp 2009-10-02 13:18:45 +0000
@@ -185,7 +185,7 @@ NdbReceiver::getTransaction() const {
assert(false);
return NULL;
case NDB_QUERY_OPERATION:
- return m_query_operation_impl->getQuery().getNdbTransaction();
+ return &m_query_operation_impl->getQuery().getNdbTransaction();
default:
return ((NdbOperation*)m_owner)->theNdbCon;
}
=== modified file 'storage/ndb/src/ndbapi/NdbQueryOperation.cpp'
--- a/storage/ndb/src/ndbapi/NdbQueryOperation.cpp 2009-10-01 12:49:14 +0000
+++ b/storage/ndb/src/ndbapi/NdbQueryOperation.cpp 2009-10-02 13:18:45 +0000
@@ -32,6 +32,9 @@
#include "NdbApiSignal.hpp"
#include "NdbTransaction.hpp"
+
+#define TEST_SCANREQ
+
/* Various error codes that are not specific to NdbQuery. */
STATIC_CONST(Err_MemoryAlloc = 4000);
STATIC_CONST(Err_SendFailed = 4002);
@@ -189,7 +192,7 @@ NdbQuery::close(bool forceSend, bool rel
NdbTransaction*
NdbQuery::getNdbTransaction() const
{
- return m_impl.getNdbTransaction();
+ return &m_impl.getNdbTransaction();
}
const NdbError&
@@ -560,7 +563,7 @@ NdbQueryImpl::fetchMoreResults(bool forc
/* Check if there are any more completed streams available.*/
if(m_queryDef.isScanQuery()){
- Ndb* const ndb = getNdbTransaction()->getNdb();
+ Ndb* const ndb = getNdbTransaction().getNdb();
TransporterFacade* const facade
= ndb->theImpl->m_transporter_facade;
@@ -576,19 +579,24 @@ NdbQueryImpl::fetchMoreResults(bool forc
if(m_fullStreams.top()==NULL){
if(getRoot().isBatchComplete()){
/* FIXME: Add code to ask for the next batch if necessary.*/
+#ifdef TEST_SCANREQ
+ const bool scanComplete = false;
+#else
const bool scanComplete = true;
+#endif
if(scanComplete){
/* FIXME: Close scans properly. This would involve sending
* SCAN_NEXTREQ*/
return FetchResult_scanComplete;
}else{
// FIXME: Ask for new scan batch.
+ sendFetchMore(m_transaction.getConnectedNodeId(),false);
}
}
/* More results are on the way, so we wait for them.*/
const FetchResult waitResult = static_cast<FetchResult>
(poll_guard.wait_scan(3*facade->m_waitfor_timeout,
- getNdbTransaction()->getConnectedNodeId(),
+ m_transaction.getConnectedNodeId(),
forceSend));
if(waitResult != FetchResult_ok){
return waitResult;
@@ -670,18 +678,12 @@ NdbQueryImpl::close(bool forceSend, bool
// FIXME
}
-NdbTransaction*
-NdbQueryImpl::getNdbTransaction() const
-{
- return &m_transaction;
-}
-
void
NdbQueryImpl::setErrorCodeAbort(int aErrorCode){
m_error.code = aErrorCode;
- getNdbTransaction()->theErrorLine = 0;
- getNdbTransaction()->theErrorOperation = NULL;
- getNdbTransaction()->setOperationErrorCodeAbort(aErrorCode);
+ m_transaction.theErrorLine = 0;
+ m_transaction.theErrorOperation = NULL;
+ m_transaction.setOperationErrorCodeAbort(aErrorCode);
}
bool
@@ -739,11 +741,15 @@ NdbQueryImpl::prepareSend()
m_parallelism = m_pendingStreams;
}
- Ndb* const ndb = getNdbTransaction()->getNdb();
+ Ndb* const ndb = m_transaction.getNdb();
TransporterFacade *tp = ndb->theImpl->m_transporter_facade;
Uint32 batchRows = m_maxBatchRows; // >0: User specified prefered value, ==0: Use default CFG values
+#ifdef TEST_SCANREQ
+ batchRows = 1; // To force usage of SCAN_NEXTREQ even for small scans
+#endif
+
// Calculate batchsize for query as minimum batchRows for all m_operations[].
// Ignore calculated 'batchByteSize' and 'firstBatchRows' here - Recalculated
// when building signal after max-batchRows has been determined.
@@ -829,7 +835,7 @@ Remark: Send a TCKEYREQ or SCAN_
KEYINFO and ATTRINFO are included as part of the long signal
******************************************************************************/
int
-NdbQueryImpl::doSend(int nodeId, bool lastFlag)
+NdbQueryImpl::doSend(int nodeId, bool lastFlag) // TODO: Use 'lastFlag'
{
Ndb& ndb = *m_transaction.getNdb();
TransporterFacade *tp = ndb.theImpl->m_transporter_facade;
@@ -974,7 +980,7 @@ NdbQueryImpl::doSend(int nodeId, bool la
return -1;
}
- } else {
+ } else { // Lookup query
m_signal->setSignal(GSN_TCKEYREQ); // TODO: or GSN_TCINDXREQ
@@ -1067,6 +1073,65 @@ NdbQueryImpl::doSend(int nodeId, bool la
} // NdbQueryImpl::doSend()
+/******************************************************************************
+int sendFetchMore()
+
+Return Value: Return >0 : send was succesful, returns number of signals sent
+ Return -1: In all other case.
+Parameters: nodeId: Receiving processor node
+Remark:
+******************************************************************************/
+int
+NdbQueryImpl::sendFetchMore(int nodeId, bool lastFlag) // TODO: Unsure if we need 'lastFlag'
+{
+ Ndb& ndb = *m_transaction.getNdb();
+ TransporterFacade *tp = ndb.theImpl->m_transporter_facade;
+
+ m_signal = ndb.getSignal();
+ if (m_signal == NULL) {
+ setErrorCodeAbort(Err_MemoryAlloc); // Allocation failure
+ return -1;
+ }
+
+ // TODO: It is likely we need to grab the transporter mutex, investigate
+
+ printf("::sendFetchMore, to nodeId:%d\n", nodeId);
+
+ m_signal->setSignal(GSN_SCAN_NEXTREQ);
+
+ Uint32* dataPtr = m_signal->getDataPtrSend();
+ ScanNextReq * const scanNextReq = CAST_PTR(ScanNextReq, dataPtr);
+ Uint32* receivers = dataPtr + ScanNextReq::SignalLength;
+
+ Uint64 transId = m_transaction.getTransactionId();
+ scanNextReq->apiConnectPtr = m_transaction.theTCConPtr;
+ scanNextReq->stopScan = 0;
+ scanNextReq->transId1 = (Uint32) transId;
+ scanNextReq->transId2 = (Uint32) (transId >> 32);
+
+ // List of fragments from where we request 'NEXT' follows:
+ assert (m_parallelism<=21); // TODO: Else we need a (sectioned) long signal
+//const NdbQueryOperationImpl& queryOp = getRoot();
+ for(unsigned i = 0; i<m_parallelism; i++){
+ receivers[i] = i; //queryOp.getReceiver(i).getId(); // TODO
+ }
+ m_signal->setLength(ScanNextReq::SignalLength+m_parallelism);
+
+ const int res = tp->sendSignal(m_signal, nodeId);
+ if (unlikely(res == -1))
+ {
+ setErrorCodeAbort(Err_SendFailed); // Error: 'Send to NDB failed'
+ return -1;
+ }
+
+ ndb.releaseSignal(m_signal);
+ m_signal = NULL;
+
+ return 1;
+} // NdbQueryImpl::sendFetchMore()
+
+
+
NdbQueryImpl::StreamStack::StreamStack():
m_size(0),
m_current(-1),
@@ -1117,7 +1182,7 @@ NdbResultStream::TupleIdMap::get(Uint16
NdbResultStream::NdbResultStream(NdbQueryOperationImpl& operation, Uint32 streamNo):
m_streamNo(streamNo),
- m_receiver(operation.getQuery().getNdbTransaction()->getNdb(), &operation), // FIXME? Use Ndb recycle lists
+ m_receiver(operation.getQuery().getNdbTransaction().getNdb(), &operation), // FIXME? Use Ndb recycle lists
m_transidAICount(0),
m_correlToTupNumMap(),
m_pendingResults(0),
@@ -1233,7 +1298,7 @@ NdbQueryOperationImpl::NdbQueryOperation
}
NdbQueryOperationImpl::~NdbQueryOperationImpl(){
- Ndb* const ndb = m_queryImpl.getNdbTransaction()->getNdb();
+ Ndb* const ndb = m_queryImpl.getNdbTransaction().getNdb();
// Check against buffer overun.
#ifndef NDEBUG // Buffer overrun check activated.
assert(m_batchBuffer==NULL ||
@@ -1331,7 +1396,7 @@ NdbQueryOperationImpl::getValue(
getQuery().setErrorCode(addResult);
return NULL;
}
- Ndb* const ndb = getQuery().getNdbTransaction()->getNdb();
+ Ndb* const ndb = getQuery().getNdbTransaction().getNdb();
NdbRecAttr* const recAttr = ndb->getRecAttr();
if(unlikely(recAttr == NULL)){
getQuery().setErrorCodeAbort(Err_MemoryAlloc);
=== modified file 'storage/ndb/src/ndbapi/NdbQueryOperationImpl.hpp'
--- a/storage/ndb/src/ndbapi/NdbQueryOperationImpl.hpp 2009-10-01 12:49:14 +0000
+++ b/storage/ndb/src/ndbapi/NdbQueryOperationImpl.hpp 2009-10-02 13:18:45 +0000
@@ -84,7 +84,8 @@ public:
/** Close query*/
void close(bool forceSend, bool release);
- NdbTransaction* getNdbTransaction() const;
+ NdbTransaction& getNdbTransaction() const
+ { return m_transaction; }
const NdbError& getNdbError() const;
@@ -109,11 +110,16 @@ public:
*/
int prepareSend();
- /** Send prepared signals from this NdbQuery
- * @return possible error code.
+ /** Send prepared signals from this NdbQuery to start execution
+ * @return #signals sent, -1 if error.
*/
int doSend(int aNodeId, bool lastFlag);
+ /** Send SCAN_NEXTREQ signal to fetch another batch from a scan query
+ * @return #signals sent, -1 if error.
+ */
+ int sendFetchMore(int nodeId, bool lastFlag);
+
NdbQuery& getInterface()
{ return m_interface; }
Attachment: [text/bzr-bundle] bzr/ole.john.aske@sun.com-20091002131845-b5cyr8hkupjqrubi.bundle
| Thread |
|---|
| • bzr commit into mysql-5.1-telco-7.0-spj branch (ole.john.aske:2952) | Ole John Aske | 2 Oct |