List: Commits
From: John David Duncan
Date: May 31 2012 6:59pm
Subject: bzr push into mysql-5.5-cluster-7.2 branch (john.duncan:3929 to 3930)
 3930 John David Duncan	2012-05-31
      7.2-atc: Apply Jonas' ATC patch to 7.2

    modified:
      storage/ndb/include/transporter/TransporterCallback.hpp
      storage/ndb/include/transporter/TransporterRegistry.hpp
      storage/ndb/src/common/transporter/Packer.cpp
      storage/ndb/src/common/transporter/TransporterRegistry.cpp
      storage/ndb/src/kernel/vm/TransporterCallback.cpp
      storage/ndb/src/kernel/vm/TransporterCallbackKernel.hpp
      storage/ndb/src/ndbapi/ClusterMgr.cpp
      storage/ndb/src/ndbapi/NdbScanOperation.cpp
      storage/ndb/src/ndbapi/NdbTransaction.cpp
      storage/ndb/src/ndbapi/Ndbif.cpp
      storage/ndb/src/ndbapi/TransporterFacade.cpp
      storage/ndb/src/ndbapi/TransporterFacade.hpp
      storage/ndb/src/ndbapi/ndb_cluster_connection.cpp
      storage/ndb/src/ndbapi/trp_buffer.cpp
      storage/ndb/src/ndbapi/trp_buffer.hpp
      storage/ndb/src/ndbapi/trp_client.cpp
      storage/ndb/src/ndbapi/trp_client.hpp
 3929 Frazer Clement	2012-05-31
      Bug#14083116 (Bad error handling in LQH corrupts transid hash)
      
      Testcase verifying fix.

    modified:
      mysql-test/suite/ndb/r/ndb_join_pushdown_default.result
      mysql-test/suite/ndb/t/ndb_join_pushdown.inc
=== modified file 'storage/ndb/include/transporter/TransporterCallback.hpp'
--- a/storage/ndb/include/transporter/TransporterCallback.hpp	2012-01-17 08:33:59 +0000
+++ b/storage/ndb/include/transporter/TransporterCallback.hpp	2012-05-31 18:26:14 +0000
@@ -46,8 +46,10 @@ public:
    *
    * The method may either execute the signal immediately (NDB API), or
    * queue it for later execution (kernel).
+   *
+   * @returns true if no more signals should be delivered
    */
-  virtual void deliver_signal(SignalHeader * const header,
+  virtual bool deliver_signal(SignalHeader * const header,
                               Uint8 prio,
                               Uint32 * const signalData,
                               LinearSectionPtr ptr[3]) = 0;

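This is the core interface change of the patch: deliver_signal() can now tell
the unpacker to stop delivering for the current receive round. A minimal
sketch of an implementation using the new return value (MyRecvHandle, handle()
and m_budget are hypothetical, not part of the patch):

  bool
  MyRecvHandle::deliver_signal(SignalHeader * const header,
                               Uint8 prio,
                               Uint32 * const signalData,
                               LinearSectionPtr ptr[3])
  {
    handle(header, prio, signalData, ptr);  // hypothetical signal processing
    return (--m_budget == 0);               // true => stop this receive round
  }
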
=== modified file 'storage/ndb/include/transporter/TransporterRegistry.hpp'
--- a/storage/ndb/include/transporter/TransporterRegistry.hpp	2012-01-30 15:12:41 +0000
+++ b/storage/ndb/include/transporter/TransporterRegistry.hpp	2012-05-31 18:26:14 +0000
@@ -445,13 +445,15 @@ private:
                 Uint32 * readPtr,
                 Uint32 bufferSize,
                 NodeId remoteNodeId,
-                IOState state);
+                IOState state,
+		bool & stopReceiving);
 
   Uint32 * unpack(TransporterReceiveHandle&,
                   Uint32 * readPtr,
                   Uint32 * eodPtr,
                   NodeId remoteNodeId,
-                  IOState state);
+                  IOState state,
+		  bool & stopReceiving);
 
   static Uint32 unpack_length_words(const Uint32 *readPtr, Uint32 maxWords);
   /** 
@@ -488,6 +490,7 @@ private:
   void updateWritePtr(TransporterSendBufferHandle *handle,
                       NodeId node, Uint32 lenBytes, Uint32 prio);
 
+public:
   /**
    * TransporterSendBufferHandle implementation.
    *

=== modified file 'storage/ndb/src/common/transporter/Packer.cpp'
--- a/storage/ndb/src/common/transporter/Packer.cpp	2012-02-23 15:41:31 +0000
+++ b/storage/ndb/src/common/transporter/Packer.cpp	2012-05-31 18:26:14 +0000
@@ -33,16 +33,21 @@ TransporterRegistry::unpack(TransporterR
                             Uint32 * readPtr,
                             Uint32 sizeOfData,
                             NodeId remoteNodeId,
-                            IOState state) {
+                            IOState state,
+			    bool & stopReceiving)
+{
+  assert(stopReceiving == false);
   SignalHeader signalHeader;
   LinearSectionPtr ptr[3];
   
   Uint32 usedData   = 0;
   Uint32 loop_count = 0; 
+  bool doStopReceiving = false;
  
   if(state == NoHalt || state == HaltOutput){
     while ((sizeOfData >= 4 + sizeof(Protocol6)) &&
-           (loop_count < MAX_RECEIVED_SIGNALS)) {
+           (loop_count < MAX_RECEIVED_SIGNALS) &&
+	   doStopReceiving == false) {
       Uint32 word1 = readPtr[0];
       Uint32 word2 = readPtr[1];
       Uint32 word3 = readPtr[2];
@@ -113,19 +118,21 @@ TransporterRegistry::unpack(TransporterR
 	sectionData += sz;
       }
 
-      recvHandle.deliver_signal(&signalHeader, prio, signalData, ptr);
+      doStopReceiving = recvHandle.deliver_signal(&signalHeader, prio, signalData, ptr);
       
       readPtr     += messageLen32;
       sizeOfData  -= messageLenBytes;
       usedData    += messageLenBytes;
     }//while
-    
+
+    stopReceiving = doStopReceiving;
     return usedData;
   } else {
     /** state = HaltIO || state == HaltInput */
 
     while ((sizeOfData >= 4 + sizeof(Protocol6)) &&
-           (loop_count < MAX_RECEIVED_SIGNALS)) {
+           (loop_count < MAX_RECEIVED_SIGNALS) &&
+	   doStopReceiving == false) {
       Uint32 word1 = readPtr[0];
       Uint32 word2 = readPtr[1];
       Uint32 word3 = readPtr[2];
@@ -199,7 +206,7 @@ TransporterRegistry::unpack(TransporterR
 	  sectionData += sz;
 	}
 
-	recvHandle.deliver_signal(&signalHeader, prio, signalData, ptr);
+	doStopReceiving = recvHandle.deliver_signal(&signalHeader, prio, signalData, ptr);
       } else {
 	DEBUG("prepareReceive(...) - Discarding message to block: "
 	      << rBlockNum << " from Node: " << remoteNodeId);
@@ -210,7 +217,7 @@ TransporterRegistry::unpack(TransporterR
       usedData    += messageLenBytes;
     }//while
     
-
+    stopReceiving = doStopReceiving;
     return usedData;
   }//if
 }
@@ -220,12 +227,15 @@ TransporterRegistry::unpack(TransporterR
                             Uint32 * readPtr,
                             Uint32 * eodPtr,
                             NodeId remoteNodeId,
-                            IOState state) {
+                            IOState state,
+			    bool & stopReceiving) {
+  assert(stopReceiving == false);
   SignalHeader signalHeader;
   LinearSectionPtr ptr[3];
   Uint32 loop_count = 0;
+  bool doStopReceiving = false;
   if(state == NoHalt || state == HaltOutput){
-    while ((readPtr < eodPtr) && (loop_count < MAX_RECEIVED_SIGNALS)) {
+    while ((readPtr < eodPtr) && (loop_count < MAX_RECEIVED_SIGNALS) && doStopReceiving == false) {
       Uint32 word1 = readPtr[0];
       Uint32 word2 = readPtr[1];
       Uint32 word3 = readPtr[2];
@@ -291,14 +301,14 @@ TransporterRegistry::unpack(TransporterR
 	sectionData += sz;
       }
       
-      recvHandle.deliver_signal(&signalHeader, prio, signalData, ptr);
+      doStopReceiving = recvHandle.deliver_signal(&signalHeader, prio, signalData, ptr);
       
       readPtr += messageLen32;
     }//while
   } else {
     /** state = HaltIO || state == HaltInput */
 
-    while ((readPtr < eodPtr) && (loop_count < MAX_RECEIVED_SIGNALS)) {
+    while ((readPtr < eodPtr) && (loop_count < MAX_RECEIVED_SIGNALS) && doStopReceiving == false) {
       Uint32 word1 = readPtr[0];
       Uint32 word2 = readPtr[1];
       Uint32 word3 = readPtr[2];
@@ -368,7 +378,7 @@ TransporterRegistry::unpack(TransporterR
 	  sectionData += sz;
 	}
 
-	recvHandle.deliver_signal(&signalHeader, prio, signalData, ptr);
+	doStopReceiving = recvHandle.deliver_signal(&signalHeader, prio, signalData, ptr);
       } else {
 	DEBUG("prepareReceive(...) - Discarding message to block: "
 	      << rBlockNum << " from Node: " << remoteNodeId);
@@ -377,6 +387,7 @@ TransporterRegistry::unpack(TransporterR
       readPtr += messageLen32;
     }//while
   }//if
+  stopReceiving = doStopReceiving;
   return readPtr;
 }
 

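Both unpack() variants gain the same exit condition. Condensed from the hunks
above (a sketch, not additional patch content):

  bool doStopReceiving = false;
  while (sizeOfData >= 4 + sizeof(Protocol6) &&  // a complete header remains
         loop_count < MAX_RECEIVED_SIGNALS &&    // per-call signal budget
         !doStopReceiving)                       // receiver asked us to stop
  {
    // ...decode one signal...
    doStopReceiving = recvHandle.deliver_signal(&signalHeader, prio,
                                                signalData, ptr);
    // ...advance readPtr / sizeOfData / usedData...
  }
  stopReceiving = doStopReceiving;  // propagated out to performReceive()
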
=== modified file 'storage/ndb/src/common/transporter/TransporterRegistry.cpp'
--- a/storage/ndb/src/common/transporter/TransporterRegistry.cpp	2012-03-14 10:31:02 +0000
+++ b/storage/ndb/src/common/transporter/TransporterRegistry.cpp	2012-05-31 18:26:14 +0000
@@ -1333,6 +1333,7 @@ TransporterRegistry::performReceive(Tran
   assert((receiveHandle == &recvdata) || (receiveHandle == 0));
 
   bool hasReceived = false;
+  bool stopReceiving = false;
 
   if (recvdata.m_has_data_transporters.get(0))
   {
@@ -1357,7 +1358,7 @@ TransporterRegistry::performReceive(Tran
 
 #ifdef NDB_TCP_TRANSPORTER
   for(Uint32 id = recvdata.m_has_data_transporters.find_first();
-      id != BitmaskImpl::NotFound;
+      id != BitmaskImpl::NotFound && !stopReceiving;
       id = recvdata.m_has_data_transporters.find_next(id + 1))
   {
     bool hasdata = false;
@@ -1376,7 +1377,7 @@ TransporterRegistry::performReceive(Tran
         Uint32 * ptr;
         Uint32 sz = t->getReceiveData(&ptr);
         recvdata.transporter_recv_from(id);
-        Uint32 szUsed = unpack(recvdata, ptr, sz, id, ioStates[id]);
+        Uint32 szUsed = unpack(recvdata, ptr, sz, id, ioStates[id], stopReceiving);
         t->updateReceiveDataPtr(szUsed);
         hasdata = t->hasReceiveData();
       }
@@ -1389,7 +1390,7 @@ TransporterRegistry::performReceive(Tran
 #ifdef NDB_SCI_TRANSPORTER
   //performReceive
   //do prepareReceive on the SCI transporters  (prepareReceive(t,,,,))
-  for (int i=0; i<nSCITransporters; i++) 
+  for (int i=0; i<nSCITransporters && !stopReceiving; i++)
   {
     SCI_Transporter  *t = theSCITransporters[i];
     const NodeId nodeId = t->getRemoteNodeId();
@@ -1404,14 +1405,14 @@ TransporterRegistry::performReceive(Tran
         Uint32 * readPtr, * eodPtr;
         t->getReceivePtr(&readPtr, &eodPtr);
         callbackObj->transporter_recv_from(nodeId);
-        Uint32 *newPtr = unpack(readPtr, eodPtr, nodeId, ioStates[nodeId]);
+        Uint32 *newPtr = unpack(readPtr, eodPtr, nodeId, ioStates[nodeId], stopReceiving);
         t->updateReceivePtr(newPtr);
       }
     } 
   }
 #endif
 #ifdef NDB_SHM_TRANSPORTER
-  for (int i=0; i<nSHMTransporters; i++) 
+  for (int i=0; i<nSHMTransporters && !stopReceiving; i++)
   {
     SHM_Transporter *t = theSHMTransporters[i];
     const NodeId nodeId = t->getRemoteNodeId();
@@ -1426,7 +1427,8 @@ TransporterRegistry::performReceive(Tran
         t->getReceivePtr(&readPtr, &eodPtr);
         recvdata.transporter_recv_from(nodeId);
         Uint32 *newPtr = unpack(recvdata,
-                                readPtr, eodPtr, nodeId, ioStates[nodeId]);
+                                readPtr, eodPtr, nodeId, ioStates[nodeId],
+				stopReceiving);
         t->updateReceivePtr(newPtr);
       }
     } 

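performReceive() threads a single stopReceiving flag through every transporter
family, so one client returning true from deliver_signal() ends the whole
round. Sketched from the hunks above:

  bool stopReceiving = false;
  for (Uint32 id = recvdata.m_has_data_transporters.find_first();
       id != BitmaskImpl::NotFound && !stopReceiving;
       id = recvdata.m_has_data_transporters.find_next(id + 1))
  {
    // ...unpack(recvdata, ptr, sz, id, ioStates[id], stopReceiving)...
  }
  // the SCI and SHM loops carry the same "&& !stopReceiving" condition
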
=== modified file 'storage/ndb/src/kernel/vm/TransporterCallback.cpp'
--- a/storage/ndb/src/kernel/vm/TransporterCallback.cpp	2012-01-24 06:20:13 +0000
+++ b/storage/ndb/src/kernel/vm/TransporterCallback.cpp	2012-05-31 18:26:14 +0000
@@ -133,7 +133,7 @@ void mt_init_receiver_cache(){}
 void mt_set_section_chunk_size(){}
 #endif
 
-void
+bool
 TransporterReceiveHandleKernel::deliver_signal(SignalHeader * const header,
                                                Uint8 prio,
                                                Uint32 * const theData,
@@ -198,7 +198,7 @@ TransporterReceiveHandleKernel::deliver_
                 header, theData, secPtrI);
 
 #endif
-    return;
+    return false;
   }
   
   /**
@@ -234,6 +234,7 @@ TransporterReceiveHandleKernel::deliver_
     sendprioa(m_thr_no /* self */,
               header, theData, NULL);
 #endif
+  return false;
 }
 
 NdbOut & 

=== modified file 'storage/ndb/src/kernel/vm/TransporterCallbackKernel.hpp'
--- a/storage/ndb/src/kernel/vm/TransporterCallbackKernel.hpp	2012-01-23 20:23:08 +0000
+++ b/storage/ndb/src/kernel/vm/TransporterCallbackKernel.hpp	2012-05-31 18:26:14 +0000
@@ -44,7 +44,7 @@ public:
 #endif
 
   /* TransporterCallback interface. */
-  void deliver_signal(SignalHeader * const header,
+  bool deliver_signal(SignalHeader * const header,
                       Uint8 prio,
                       Uint32 * const signalData,
                       LinearSectionPtr ptr[3]);

=== modified file 'storage/ndb/src/ndbapi/ClusterMgr.cpp'
--- a/storage/ndb/src/ndbapi/ClusterMgr.cpp	2011-07-04 16:30:34 +0000
+++ b/storage/ndb/src/ndbapi/ClusterMgr.cpp	2012-05-31 18:26:14 +0000
@@ -217,12 +217,12 @@ ClusterMgr::doStop( ){
 void
 ClusterMgr::forceHB()
 {
-  theFacade.lock_mutex();
+  theFacade.lock_poll_mutex();
 
   if(waitingForHB)
   {
-    NdbCondition_WaitTimeout(waitForHBCond, theFacade.theMutexPtr, 1000);
-    theFacade.unlock_mutex();
+    NdbCondition_WaitTimeout(waitForHBCond, theFacade.thePollMutex, 1000);
+    theFacade.unlock_poll_mutex();
     return;
   }
 
@@ -243,7 +243,7 @@ ClusterMgr::forceHB()
     }
   }
   waitForHBFromNodes.bitAND(ndb_nodes);
-  theFacade.unlock_mutex();
+  theFacade.unlock_poll_mutex();
 
 #ifdef DEBUG_REG
   char buf[128];
@@ -273,18 +273,19 @@ ClusterMgr::forceHB()
 #endif
       raw_sendSignal(&signal, nodeId);
     }
+    flush_send_buffers();
     unlock();
   }
   /* Wait for nodes to reply - if any heartbeats were sent */
-  theFacade.lock_mutex();
+  theFacade.lock_poll_mutex();
   if (!waitForHBFromNodes.isclear())
-    NdbCondition_WaitTimeout(waitForHBCond, theFacade.theMutexPtr, 1000);
+    NdbCondition_WaitTimeout(waitForHBCond, theFacade.thePollMutex, 1000);
 
   waitingForHB= false;
 #ifdef DEBUG_REG
   ndbout << "Still waiting for HB from " << waitForHBFromNodes.getText(buf) << endl;
 #endif
-  theFacade.unlock_mutex();
+  theFacade.unlock_poll_mutex();
 }
 
 void
@@ -298,12 +299,14 @@ ClusterMgr::startup()
 
   lock();
   theFacade.doConnect(nodeId);
+  flush_send_buffers();
   unlock();
 
   for (Uint32 i = 0; i<3000; i++)
   {
     lock();
     theFacade.theTransporterRegistry->update_connections();
+    flush_send_buffers();
     unlock();
     if (theNode.is_connected())
       break;
@@ -345,9 +348,9 @@ ClusterMgr::threadMain()
   {
     /* Sleep at 100ms between each heartbeat check */
     NDB_TICKS before = now;
-    for (Uint32 i = 0; i<10; i++)
+    for (Uint32 i = 0; i<5; i++)
     {
-      NdbSleep_MilliSleep(10);
+      NdbSleep_MilliSleep(20);
       {
         Guard g(clusterMgrThreadMutex);
         /**
@@ -381,7 +384,7 @@ ClusterMgr::threadMain()
     nodeFailRep->noOfNodes = 0;
     NodeBitmask::clear(nodeFailRep->theNodes);
 
-    trp_client::lock();
+    lock();
     for (int i = 1; i < MAX_NODES; i++){
       /**
        * Send register request (heartbeat) to all available nodes 
@@ -444,12 +447,16 @@ ClusterMgr::threadMain()
         NodeBitmask::set(nodeFailRep->theNodes, nodeId);
       }
     }
+    flush_send_buffers();
+    unlock();
 
     if (nodeFailRep->noOfNodes)
     {
+      lock();
       raw_sendSignal(&nodeFail_signal, getOwnNodeId());
+      flush_send_buffers();
+      unlock();
     }
-    trp_client::unlock();
   }
 }
 
@@ -530,7 +537,7 @@ ClusterMgr::trp_deliver_signal(const Ndb
       tSignal.theReceiversBlockNumber= refToBlock(ref);
       tSignal.theVerId_signalNumber= GSN_SUB_GCP_COMPLETE_ACK;
       tSignal.theSendersBlockRef = API_CLUSTERMGR;
-      safe_sendSignal(&tSignal, aNodeId);
+      safe_noflush_sendSignal(&tSignal, aNodeId);
     }
     break;
   }
@@ -1435,7 +1442,7 @@ ArbitMgr::sendSignalToQmgr(ArbitSignal& 
   {
     m_clusterMgr.lock();
     m_clusterMgr.raw_sendSignal(&signal, aSignal.data.sender);
+    m_clusterMgr.flush_send_buffers();
     m_clusterMgr.unlock();
   }
 }
-

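The recurring pattern in this file: signals now accumulate in the trp_client's
local send buffer, and nothing reaches the shared per-node buffers until
flush_send_buffers() runs, so every send path must flush before dropping the
client lock (names as in the patch):

  lock();                            // trp_client lock
  raw_sendSignal(&signal, nodeId);   // queued in the client-local buffer
  flush_send_buffers();              // hand the pages to TransporterFacade
  unlock();

Note also that ClusterMgr now takes the facade's dedicated poll mutex
(lock_poll_mutex()/thePollMutex) where it previously took theMutexPtr, and
that the heartbeat thread now sleeps 5 x 20ms instead of 10 x 10ms per check.
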
=== modified file 'storage/ndb/src/ndbapi/NdbScanOperation.cpp'
--- a/storage/ndb/src/ndbapi/NdbScanOperation.cpp	2011-11-16 08:17:17 +0000
+++ b/storage/ndb/src/ndbapi/NdbScanOperation.cpp	2012-05-31 18:26:14 +0000
@@ -1669,7 +1669,6 @@ NdbScanOperation::executeCursor(int node
    * Call finaliseScanOldApi() for old style scans before
    * proceeding
    */  
-  bool locked = false;
   NdbImpl* theImpl = theNdb->theImpl;
 
   int res = 0;
@@ -1680,9 +1679,7 @@ NdbScanOperation::executeCursor(int node
   }
 
   {
-    locked = true;
     NdbTransaction * tCon = theNdbCon;
-    theImpl->lock();
     
     Uint32 seq = tCon->theNodeSequence;
     
@@ -1733,9 +1730,6 @@ done:
     m_api_receivers_count = theParallelism;
   }
 
-  if (locked)
-    theImpl->unlock();
-
   return res;
 }
 

=== modified file 'storage/ndb/src/ndbapi/NdbTransaction.cpp'
--- a/storage/ndb/src/ndbapi/NdbTransaction.cpp	2011-11-16 08:17:17 +0000
+++ b/storage/ndb/src/ndbapi/NdbTransaction.cpp	2012-05-31 18:26:14 +0000
@@ -733,27 +733,6 @@ NdbTransaction::executeAsynchPrepare(Ndb
     releaseCompletedQueries();
   }
 
-  NdbScanOperation* tcOp = m_theFirstScanOperation;
-  if (tcOp != 0){
-    // Execute any cursor operations
-    while (tcOp != NULL) {
-      int tReturnCode;
-      tReturnCode = tcOp->executeCursor(theDBnode);
-      if (tReturnCode == -1) {
-        DBUG_VOID_RETURN;
-      }//if
-      tcOp->postExecuteRelease(); // Release unneeded resources
-                                  // outside TP mutex
-      tcOp = (NdbScanOperation*)tcOp->next();
-    } // while
-    m_theLastScanOperation->next(m_firstExecutedScanOp);
-    m_firstExecutedScanOp = m_theFirstScanOperation;
-    // Discard cursor operations, since these are also
-    // in the complete operations list we do not need
-    // to release them.
-    m_theFirstScanOperation = m_theLastScanOperation = NULL;
-  }
-
   bool tTransactionIsStarted = theTransactionIsStarted;
   NdbOperation*	tLastOp = theLastOpInList;
   Ndb* tNdb = theNdb;
@@ -1013,6 +992,7 @@ NdbTransaction::sendTC_HBREP()		// Send 
  
   tNdb->theImpl->lock();
   const int res = tNdb->theImpl->sendSignal(tSignal,theDBnode);
+  tNdb->theImpl->flush_send_buffers();
   tNdb->theImpl->unlock();
   tNdb->releaseSignal(tSignal);
 
@@ -1040,6 +1020,26 @@ NdbTransaction::doSend()
   This method assumes that at least one operation or query have been defined.
   This is ensured by the caller of this routine (=execute).
   */
+  NdbScanOperation* tcOp = m_theFirstScanOperation;
+  if (tcOp != 0){
+    // Execute any cursor operations
+    while (tcOp != NULL) {
+      int tReturnCode;
+      tReturnCode = tcOp->executeCursor(theDBnode);
+      if (tReturnCode == -1) {
+        goto fail;
+      }//if
+      tcOp->postExecuteRelease(); // Release unneeded resources
+                                  // outside TP mutex
+      tcOp = (NdbScanOperation*)tcOp->next();
+    } // while
+    m_theLastScanOperation->next(m_firstExecutedScanOp);
+    m_firstExecutedScanOp = m_theFirstScanOperation;
+    // Discard cursor operations, since these are also
+    // in the complete operations list we do not need
+    // to release them.
+    m_theFirstScanOperation = m_theLastScanOperation = NULL;
+  }
 
   switch(theSendStatus){
   case sendOperations: {

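The cursor-execution block above is moved verbatim out of
executeAsynchPrepare() and into doSend(); combined with the
NdbScanOperation.cpp hunk, executeCursor() now runs under the lock the caller
already holds instead of taking theImpl->lock() itself. A condensed sketch of
the resulting flow (error path simplified):

  // in doSend(), before any queued operations are sent:
  for (NdbScanOperation* op = m_theFirstScanOperation;
       op != NULL;
       op = (NdbScanOperation*)op->next())
  {
    if (op->executeCursor(theDBnode) == -1)
      goto fail;               // as in the hunk above
    op->postExecuteRelease();  // release resources outside the TP mutex
  }
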
=== modified file 'storage/ndb/src/ndbapi/Ndbif.cpp'
--- a/storage/ndb/src/ndbapi/Ndbif.cpp	2012-02-23 15:41:31 +0000
+++ b/storage/ndb/src/ndbapi/Ndbif.cpp	2012-05-31 18:26:14 +0000
@@ -73,7 +73,7 @@ Ndb::init(int aMaxNoOfTransactions)
   }//if
   theInitState = StartingInit;
   TransporterFacade * theFacade =  theImpl->m_transporter_facade;
-  theEventBuffer->m_mutex = theFacade->theMutexPtr;
+  theEventBuffer->m_mutex = theImpl->m_mutex;
 
   const Uint32 tRef = theImpl->open(theFacade);
 
@@ -92,9 +92,9 @@ Ndb::init(int aMaxNoOfTransactions)
   }
 
   /* Init cached min node version */
-  theFacade->lock_mutex();
+  theFacade->lock_poll_mutex();
   theCachedMinDbNodeVersion = theFacade->getMinDbNodeVersion();
-  theFacade->unlock_mutex();
+  theFacade->unlock_poll_mutex();
   
   theDictionary->setTransporter(this, theFacade);
   
@@ -1465,7 +1465,7 @@ NdbTransaction::sendTC_COMMIT_ACK(NdbImp
   Uint32 * dataPtr = aSignal->getDataPtrSend();
   dataPtr[0] = transId1;
   dataPtr[1] = transId2;
-  impl->safe_sendSignal(aSignal, refToNode(aTCRef));
+  impl->safe_noflush_sendSignal(aSignal, refToNode(aTCRef));
 }
 
 int
@@ -1503,6 +1503,7 @@ NdbImpl::send_event_report(bool has_lock
 done:
   if (!has_lock)
   {
+    flush_send_buffers();
     unlock();
   }
   return ret;

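sendTC_COMMIT_ACK() executes inside signal delivery, so it uses the no-flush
send variant; its data reaches the per-node buffers at the client's next
flush, e.g. in complete_poll(). Paths that take the lock themselves, like
send_event_report(), must still flush explicitly (condensed from the hunks
above):

  impl->safe_noflush_sendSignal(aSignal, refToNode(aTCRef));  // buffered only

  // send_event_report(), on its way out:
  if (!has_lock)
  {
    flush_send_buffers();  // nothing hits the wire until a flush happens
    unlock();
  }
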
=== modified file 'storage/ndb/src/ndbapi/TransporterFacade.cpp'
--- a/storage/ndb/src/ndbapi/TransporterFacade.cpp	2012-02-23 15:41:31 +0000
+++ b/storage/ndb/src/ndbapi/TransporterFacade.cpp	2012-05-31 18:26:14 +0000
@@ -59,6 +59,9 @@ static int indexToNumber(int index)
 #define TRP_DEBUG(t)
 #endif
 
+#define DBG_POLL 0
+#define dbg(x,y) if (DBG_POLL) printf("%llu : " x "\n", NdbTick_CurrentNanosecond() / 1000, y)
+
 /*****************************************************************************
  * Call back functions
  *****************************************************************************/
@@ -206,7 +209,7 @@ TRACE_GSN(Uint32 gsn)
 /**
  * The execute function : Handle received signal
  */
-void
+bool
 TransporterFacade::deliver_signal(SignalHeader * const header,
                                   Uint8 prio, Uint32 * const theData,
                                   LinearSectionPtr ptr[3])
@@ -224,11 +227,13 @@ TransporterFacade::deliver_signal(Signal
   }
 #endif  
 
+  bool last_message = false;
   if (tRecBlockNo >= MIN_API_BLOCK_NO)
   {
     trp_client * clnt = m_threads.get(tRecBlockNo);
     if (clnt != 0)
     {
+      last_message = m_poll_owner->m_poll.lock_client(clnt);
       /**
        * Handle received signal immediately to avoid any unnecessary
        * copying of data, allocation of memory and other things. Copying
@@ -285,13 +290,13 @@ TransporterFacade::deliver_signal(Signal
               NdbApiSignal tmpSignal(*header);
               NdbApiSignal * tSignal = &tmpSignal;
               tSignal->setDataPtr(tDataPtr);
+              last_message = m_poll_owner->m_poll.lock_client(clnt);
               clnt->trp_deliver_signal(tSignal, 0);
             }
           }
         }
       }
     }
-    return;
   }
   else if (tRecBlockNo >= MIN_API_FIXED_BLOCK_NO &&
            tRecBlockNo <= MAX_API_FIXED_BLOCK_NO) 
@@ -303,6 +308,7 @@ TransporterFacade::deliver_signal(Signal
       NdbApiSignal tmpSignal(*header);
       NdbApiSignal * tSignal = &tmpSignal;
       tSignal->setDataPtr(theData);
+      last_message = m_poll_owner->m_poll.lock_client(clnt);
       clnt->trp_deliver_signal(tSignal, ptr);
     }//if   
   }
@@ -317,6 +323,7 @@ TransporterFacade::deliver_signal(Signal
       abort();
     }
   }
+  return last_message;
 }
 
 // These symbols are needed, but not used in the API
@@ -351,7 +358,7 @@ TransporterFacade::start_instance(NodeId
   (void)signal(SIGPIPE, SIG_IGN);
 #endif
 
-  theTransporterRegistry = new TransporterRegistry(this, this);
+  theTransporterRegistry = new TransporterRegistry(this, this, false);
   if (theTransporterRegistry == NULL)
     return -1;
 
@@ -446,6 +453,41 @@ runSendRequest_C(void * me)
   return 0;
 }
 
+inline
+void
+link_buffer(TFBuffer* dst, const TFBuffer * src)
+{
+  assert(dst);
+  assert(src);
+  assert(src->m_head);
+  assert(src->m_tail);
+  TFBufferGuard g0(* dst);
+  TFBufferGuard g1(* src);
+  if (dst->m_head == 0)
+  {
+    dst->m_head = src->m_head;
+  }
+  else
+  {
+    dst->m_tail->m_next = src->m_head;
+  }
+  dst->m_tail = src->m_tail;
+  dst->m_bytes_in_buffer += src->m_bytes_in_buffer;
+}
+
+static const Uint32 SEND_THREAD_NO = 0;
+
+void
+TransporterFacade::wakeup_send_thread(Uint32 node)
+{
+  Guard g(m_send_thread_mutex);
+  if (m_send_thread_nodes.get(SEND_THREAD_NO) == false)
+  {
+    NdbCondition_Signal(m_send_thread_cond);
+  }
+  m_send_thread_nodes.set(SEND_THREAD_NO);
+}
+
 void TransporterFacade::threadMainSend(void)
 {
   theTransporterRegistry->startSending();
@@ -456,15 +498,61 @@ void TransporterFacade::threadMainSend(v
 
   m_socket_server.startServer();
 
-  while(!theStopReceive) {
-    NdbSleep_MilliSleep(sendThreadWaitMillisec);
-    NdbMutex_Lock(theMutexPtr);
-    if (sendPerformedLastInterval == 0) {
-      theTransporterRegistry->performSend();
-    }
-    sendPerformedLastInterval = 0;
-    NdbMutex_Unlock(theMutexPtr);
+  NdbMutex_Lock(m_send_thread_mutex);
+  while(!theStopReceive)
+  {
+    if (m_send_thread_nodes.get(SEND_THREAD_NO) == false)
+    {
+      NdbCondition_WaitTimeout(m_send_thread_cond,
+                               m_send_thread_mutex,
+                               sendThreadWaitMillisec);
+    }
+    m_send_thread_nodes.clear(SEND_THREAD_NO);
+    NdbMutex_Unlock(m_send_thread_mutex);
+    bool all_empty = true;
+    do
+    {
+      all_empty = true;
+      for (Uint32 i = 0; i<MAX_NODES; i++)
+      {
+        struct TFSendBuffer * b = m_send_buffers + i;
+        NdbMutex_Lock(&b->m_mutex);
+        if (b->m_sending)
+        {
+          /**
+           * the active sender handles remaining data when clearing m_sending
+           */
+        }
+        else if (b->m_buffer.m_bytes_in_buffer > 0 ||
+                 b->m_out_buffer.m_bytes_in_buffer > 0)
+        {
+          /**
+           * Copy all data from m_buffer to m_out_buffer
+           */
+          TFBuffer copy = b->m_buffer;
+          bzero(&b->m_buffer, sizeof(b->m_buffer));
+          b->m_sending = true;
+          NdbMutex_Unlock(&b->m_mutex);
+          if (copy.m_bytes_in_buffer > 0)
+          {
+            link_buffer(&b->m_out_buffer, &copy);
+          }
+          theTransporterRegistry->performSend(i);
+          NdbMutex_Lock(&b->m_mutex);
+          b->m_sending = false;
+          if (b->m_buffer.m_bytes_in_buffer > 0 ||
+              b->m_out_buffer.m_bytes_in_buffer > 0)
+          {
+            all_empty = false;
+          }
+        }
+        NdbMutex_Unlock(&b->m_mutex);
+      }
+    } while (!theStopReceive && all_empty == false);
+
+    NdbMutex_Lock(m_send_thread_mutex);
   }
+  NdbMutex_Unlock(m_send_thread_mutex);
   theTransporterRegistry->stopSending();
 
   m_socket_server.stopServer();
@@ -499,6 +587,7 @@ void TransporterFacade::threadMainReceiv
   {
     theClusterMgr->lock();
     theTransporterRegistry->update_connections();
+    theClusterMgr->flush_send_buffers();
     theClusterMgr->unlock();
     NdbSleep_MilliSleep(100);
   }//while
@@ -510,10 +599,9 @@ void TransporterFacade::threadMainReceiv
   and returns to caller. It will quickly come back here if not all
   data was received for the worker thread.
 */
-void TransporterFacade::external_poll(Uint32 wait_time)
+void
+TransporterFacade::external_poll(trp_client* clnt, Uint32 wait_time)
 {
-  NdbMutex_Unlock(theMutexPtr);
-
 #ifdef NDB_SHM_TRANSPORTER
   /*
     If shared memory transporters are used we need to set our sigmask
@@ -529,10 +617,11 @@ void TransporterFacade::external_poll(Ui
   NdbThread_set_shm_sigmask(TRUE);
 #endif
 
-  NdbMutex_Lock(theMutexPtr);
   if (res > 0)
   {
+    m_poll_cnt++;
     theTransporterRegistry->performReceive();
+    m_poll_cnt++;
   }
 }
 
@@ -552,11 +641,22 @@ TransporterFacade::TransporterFacade(Glo
   theSendThread(NULL),
   theReceiveThread(NULL),
   m_fragmented_signal_id(0),
-  m_globalDictCache(cache)
+  m_globalDictCache(cache),
+   m_send_buffer("sendbufferpool")
 {
   DBUG_ENTER("TransporterFacade::TransporterFacade");
-  theMutexPtr = NdbMutex_CreateWithName("TTFM");
+  thePollMutex = NdbMutex_CreateWithName("PollMutex");
   sendPerformedLastInterval = 0;
+  m_open_close_mutex = NdbMutex_Create();
+  for (Uint32 i = 0; i<NDB_ARRAY_SIZE(m_send_buffers); i++)
+  {
+    BaseString n;
+    n.assfmt("sendbuffer:%u", i);
+    NdbMutex_InitWithName(&m_send_buffers[i].m_mutex, n.c_str());
+  }
+
+  m_send_thread_cond = NdbCondition_Create();
+  m_send_thread_mutex = NdbMutex_CreateWithName("SendThreadMutex");
 
   for (int i = 0; i < NO_API_FIXED_BLOCKS; i++)
     m_fixed2dynamic[i]= RNIL;
@@ -567,6 +667,7 @@ TransporterFacade::TransporterFacade(Glo
 
   theClusterMgr = new ClusterMgr(*this);
 
+  m_poll_cnt = 0;
   DBUG_VOID_RETURN;
 }
 
@@ -641,12 +742,28 @@ TransporterFacade::configure(NodeId node
     DBUG_RETURN(false);
 
   // Configure send buffers
-  Uint32 total_send_buffer = 0;
-  iter.get(CFG_TOTAL_SEND_BUFFER_MEMORY, &total_send_buffer);
-  Uint64 extra_send_buffer = 0;
-  iter.get(CFG_EXTRA_SEND_BUFFER_MEMORY, &extra_send_buffer);
-  theTransporterRegistry->allocate_send_buffers(total_send_buffer,
-                                                extra_send_buffer);
+  if (!m_send_buffer.inited())
+  {
+    Uint32 total_send_buffer = 0;
+    iter.get(CFG_TOTAL_SEND_BUFFER_MEMORY, &total_send_buffer);
+
+    if (total_send_buffer == 0)
+    {
+      total_send_buffer = theTransporterRegistry->get_total_max_send_buffer();
+    }
+
+    Uint64 extra_send_buffer = 0;
+    iter.get(CFG_EXTRA_SEND_BUFFER_MEMORY, &extra_send_buffer);
+
+    total_send_buffer += extra_send_buffer;
+    if (!m_send_buffer.init(total_send_buffer))
+    {
+      ndbout << "Unable to allocate "
+             << total_send_buffer
+             << " bytes of memory for send buffers!!" << endl;
+      return false;
+    }
+  }
 
   Uint32 auto_reconnect=1;
   iter.get(CFG_AUTO_RECONNECT, &auto_reconnect);
@@ -686,15 +803,52 @@ TransporterFacade::for_each(trp_client* 
                             const NdbApiSignal* aSignal, 
                             const LinearSectionPtr ptr[3])
 {
+  trp_client * woken[16];
+  Uint32 cnt_woken = 0;
   Uint32 sz = m_threads.m_statusNext.size();
   for (Uint32 i = 0; i < sz ; i ++) 
   {
     trp_client * clnt = m_threads.m_objectExecute[i];
     if (clnt != 0 && clnt != sender)
     {
-      clnt->trp_deliver_signal(aSignal, ptr);
+      bool res = m_poll_owner->m_poll.check_if_locked(clnt);
+      if (res)
+      {
+        clnt->trp_deliver_signal(aSignal, ptr);
+      }
+      else
+      {
+	NdbMutex_Lock(clnt->m_mutex);
+        int save = clnt->m_poll.m_waiting;
+        clnt->trp_deliver_signal(aSignal, ptr);
+        if (save != clnt->m_poll.m_waiting &&
+            clnt->m_poll.m_waiting == trp_client::PollQueue::PQ_WOKEN)
+        {
+          woken[cnt_woken++] = clnt;
+          if (cnt_woken == NDB_ARRAY_SIZE(woken))
+          {
+            lock_poll_mutex();
+            remove_from_poll_queue(woken, cnt_woken);
+            unlock_poll_mutex();
+            unlock_and_signal(woken, cnt_woken);
+            cnt_woken = 0;
+          }
+        }
+        else
+        {
+          NdbMutex_Unlock(clnt->m_mutex);
+        }
+      }
     }
   }
+
+  if (cnt_woken != 0)
+  {
+    lock_poll_mutex();
+    remove_from_poll_queue(woken, cnt_woken);
+    unlock_poll_mutex();
+    unlock_and_signal(woken, cnt_woken);
+  }
 }
 
 void
@@ -732,7 +886,8 @@ TransporterFacade::close_clnt(trp_client
   int ret = -1;
   if (clnt)
   {
-    NdbMutex_Lock(theMutexPtr);
+    Guard g(m_open_close_mutex);
+    if (DBG_POLL) ndbout_c("close(%p)", clnt);
     if (m_threads.get(clnt->m_blockNo) == clnt)
     {
       m_threads.close(clnt->m_blockNo);
@@ -742,7 +897,20 @@ TransporterFacade::close_clnt(trp_client
     {
       assert(0);
     }
-    NdbMutex_Unlock(theMutexPtr);
+
+    /**
+     * Wait for any ongoing poll
+     */
+    clnt->lock();
+    Uint32 start_val = m_poll_cnt;
+    clnt->unlock();
+    Uint32 curr_val;
+    do
+    {
+      clnt->lock();
+      curr_val = m_poll_cnt;
+      clnt->unlock();
+    } while (start_val == curr_val && ((start_val & 1) == 1));
   }
   return ret;
 }
@@ -751,7 +919,8 @@ Uint32
 TransporterFacade::open_clnt(trp_client * clnt, int blockNo)
 {
   DBUG_ENTER("TransporterFacade::open");
-  Guard g(theMutexPtr);
+  Guard g(m_open_close_mutex);
+  if (DBG_POLL) ndbout_c("open(%p)", clnt);
   int r= m_threads.open(clnt);
   if (r < 0)
   {
@@ -785,10 +954,13 @@ TransporterFacade::~TransporterFacade()
   DBUG_ENTER("TransporterFacade::~TransporterFacade");
 
   delete theClusterMgr;  
-  NdbMutex_Lock(theMutexPtr);
+  NdbMutex_Lock(thePollMutex);
   delete theTransporterRegistry;
-  NdbMutex_Unlock(theMutexPtr);
-  NdbMutex_Destroy(theMutexPtr);
+  NdbMutex_Unlock(thePollMutex);
+  NdbMutex_Destroy(thePollMutex);
+  NdbMutex_Destroy(m_open_close_mutex);
+  NdbMutex_Destroy(m_send_thread_mutex);
+  NdbCondition_Destroy(m_send_thread_cond);
 #ifdef API_TRACE
   signalLogger.setOutputStream(0);
 #endif
@@ -821,6 +993,7 @@ TransporterFacade::calculateSendLimit()
 // adaptive algorithm.
 //-------------------------------------------------
 void TransporterFacade::forceSend(Uint32 block_number) {
+#if 0
   checkCounter--;
   m_threads.m_statusNext[numberToIndex(block_number)] = ThreadData::ACTIVE;
   sendPerformedLastInterval = 1;
@@ -828,6 +1001,7 @@ void TransporterFacade::forceSend(Uint32
     calculateSendLimit();
   }
   theTransporterRegistry->forceSendCheck(0);
+#endif
 }
 
 //-------------------------------------------------
@@ -835,6 +1009,7 @@ void TransporterFacade::forceSend(Uint32
 //-------------------------------------------------
 int
 TransporterFacade::checkForceSend(Uint32 block_number) {  
+#if 0
   m_threads.m_statusNext[numberToIndex(block_number)] = ThreadData::ACTIVE;
   //-------------------------------------------------
   // This code is an adaptive algorithm to discover when
@@ -856,6 +1031,9 @@ TransporterFacade::checkForceSend(Uint32
     calculateSendLimit();
   }
   return did_send;
+#else
+  return 0;
+#endif
 }
 
 
@@ -863,7 +1041,8 @@ TransporterFacade::checkForceSend(Uint32
  * SEND SIGNAL METHODS
  *****************************************************************************/
 int
-TransporterFacade::sendSignal(const NdbApiSignal * aSignal, NodeId aNode)
+TransporterFacade::sendSignal(trp_client* clnt,
+                              const NdbApiSignal * aSignal, NodeId aNode)
 {
   const Uint32* tDataPtr = aSignal->getConstDataPtrSend();
   Uint32 Tlen = aSignal->theLength;
@@ -881,7 +1060,8 @@ TransporterFacade::sendSignal(const NdbA
   }
 #endif
   if ((Tlen != 0) && (Tlen <= 25) && (TBno != 0)) {
-    SendStatus ss = theTransporterRegistry->prepareSend(aSignal,
+    SendStatus ss = theTransporterRegistry->prepareSend(clnt,
+                                                        aSignal,
                                                         1, // JBB
                                                         tDataPtr,
                                                         aNode,
@@ -1129,7 +1309,8 @@ public:
  * boundaries to simplify reassembly in the kernel.
  */
 int
-TransporterFacade::sendFragmentedSignal(const NdbApiSignal* inputSignal,
+TransporterFacade::sendFragmentedSignal(trp_client* clnt,
+                                        const NdbApiSignal* inputSignal,
                                         NodeId aNode,
                                         const GenericSectionPtr ptr[3],
                                         Uint32 secs)
@@ -1144,7 +1325,7 @@ TransporterFacade::sendFragmentedSignal(
   
   /* If there's no need to fragment, send normally */
   if (totalSectionLength <= CHUNK_SZ)
-    return sendSignal(aSignal, aNode, ptr, secs);
+    return sendSignal(clnt, aSignal, aNode, ptr, secs);
   
   // TODO : Consider tracing fragment signals?
 #ifdef API_TRACE
@@ -1263,7 +1444,8 @@ TransporterFacade::sendFragmentedSignal(
       // do prepare send
       {
 	SendStatus ss = theTransporterRegistry->prepareSend
-	  (&tmp_signal, 
+	  (clnt,
+           &tmp_signal,
 	   1, /*JBB*/
 	   tmp_signal_data,
 	   aNode, 
@@ -1318,7 +1500,8 @@ TransporterFacade::sendFragmentedSignal(
   int ret;
   {
     SendStatus ss = theTransporterRegistry->prepareSend
-      (aSignal,
+      (clnt,
+       aSignal,
        1/*JBB*/,
        aSignal->getConstDataPtrSend(),
        aNode,
@@ -1338,7 +1521,8 @@ TransporterFacade::sendFragmentedSignal(
 }
 
 int
-TransporterFacade::sendFragmentedSignal(const NdbApiSignal* aSignal,
+TransporterFacade::sendFragmentedSignal(trp_client* clnt,
+                                        const NdbApiSignal* aSignal,
                                         NodeId aNode,
                                         const LinearSectionPtr ptr[3],
                                         Uint32 secs)
@@ -1364,12 +1548,13 @@ TransporterFacade::sendFragmentedSignal(
   tmpPtr[2].sz= linCopy[2].sz;
   tmpPtr[2].sectionIter= &two;
 
-  return sendFragmentedSignal(aSignal, aNode, tmpPtr, secs);
+  return sendFragmentedSignal(clnt, aSignal, aNode, tmpPtr, secs);
 }
   
 
 int
-TransporterFacade::sendSignal(const NdbApiSignal* aSignal, NodeId aNode,
+TransporterFacade::sendSignal(trp_client* clnt,
+                              const NdbApiSignal* aSignal, NodeId aNode,
                               const LinearSectionPtr ptr[3], Uint32 secs)
 {
   Uint32 save = aSignal->m_noOfSections;
@@ -1386,7 +1571,8 @@ TransporterFacade::sendSignal(const NdbA
   }
 #endif
   SendStatus ss = theTransporterRegistry->prepareSend
-    (aSignal,
+    (clnt,
+     aSignal,
      1, // JBB
      aSignal->getConstDataPtrSend(),
      aNode,
@@ -1402,7 +1588,8 @@ TransporterFacade::sendSignal(const NdbA
 }
 
 int
-TransporterFacade::sendSignal(const NdbApiSignal* aSignal, NodeId aNode,
+TransporterFacade::sendSignal(trp_client* clnt,
+                              const NdbApiSignal* aSignal, NodeId aNode,
                               const GenericSectionPtr ptr[3], Uint32 secs)
 {
   Uint32 save = aSignal->m_noOfSections;
@@ -1421,7 +1608,8 @@ TransporterFacade::sendSignal(const NdbA
   }
 #endif
   SendStatus ss = theTransporterRegistry->prepareSend
-    (aSignal,
+    (clnt,
+     aSignal,
      1, // JBB
      aSignal->getConstDataPtrSend(),
      aNode,
@@ -1566,21 +1754,25 @@ TransporterFacade::get_active_ndb_object
   return m_threads.m_use_cnt;
 }
 
-
 void
 TransporterFacade::start_poll(trp_client* clnt)
 {
-  lock_mutex();
-  clnt->m_poll.m_locked = true;
+  assert(clnt->m_poll.m_locked == true);
+  assert(clnt->m_poll.m_poll_owner == false);
+  assert(clnt->m_poll.m_poll_queue == false);
+  assert(clnt->m_poll.m_waiting == trp_client::PollQueue::PQ_IDLE);
 }
 
 void
 TransporterFacade::do_poll(trp_client* clnt, Uint32 wait_time)
 {
-  clnt->m_poll.m_waiting = true;
+  dbg("do_poll(%p)", clnt);
+  clnt->m_poll.m_waiting = trp_client::PollQueue::PQ_WAITING;
   assert(clnt->m_poll.m_locked == true);
-  trp_client* owner = m_poll_owner;
-  if (owner != NULL && owner != clnt)
+  assert(clnt->m_poll.m_poll_owner == false);
+  assert(clnt->m_poll.m_poll_queue == false);
+  lock_poll_mutex();
+  if (m_poll_owner != NULL)
   {
     /*
       We didn't get hold of the poll "right". We will sleep on a
@@ -1591,82 +1783,371 @@ TransporterFacade::do_poll(trp_client* c
       queue if it hasn't happened already. It is usually already out of the
       queue but at time-out it could be that the object is still there.
     */
-    assert(clnt->m_poll.m_poll_owner == false);
     add_to_poll_queue(clnt);
-    NdbCondition_WaitTimeout(clnt->m_poll.m_condition, theMutexPtr,
-                             wait_time);
-    if (clnt != m_poll_owner && clnt->m_poll.m_waiting)
+    unlock_poll_mutex();
+    dbg("cond_wait(%p)", clnt);
+    NdbCondition_WaitTimeout(clnt->m_poll.m_condition,
+                             clnt->m_mutex,
+			     wait_time);
+
+    switch(clnt->m_poll.m_waiting) {
+    case trp_client::PollQueue::PQ_WOKEN:
+      dbg("%p - PQ_WOKEN", clnt);
+      // we have already been taken out of poll queue
+      assert(clnt->m_poll.m_poll_queue == false);
+
+      /**
+       * clear m_poll_owner
+       *   it can be that we were proposed as poll owner
+       *   and later woken by another thread that became poll owner
+       */
+      clnt->m_poll.m_poll_owner = false;
+      clnt->m_poll.m_waiting = trp_client::PollQueue::PQ_IDLE;
+      return;
+    case trp_client::PollQueue::PQ_IDLE:
+      dbg("%p - PQ_IDLE", clnt);
+      assert(false); // should not happen!!
+      // ...treat as timeout...fall-through
+    case trp_client::PollQueue::PQ_WAITING:
+      dbg("%p - PQ_WAITING", clnt);
+      break;
+    }
+
+    lock_poll_mutex();
+    if (clnt->m_poll.m_poll_owner == false)
     {
+      /**
+       * We got timeout...hopefully rare...
+       */
+      assert(clnt->m_poll.m_poll_queue == true);
       remove_from_poll_queue(clnt);
+      unlock_poll_mutex();
+      clnt->m_poll.m_waiting = trp_client::PollQueue::PQ_IDLE;
+      dbg("%p - PQ_WAITING poll_owner == false => return", clnt);
+      return;
+    }
+    else if (m_poll_owner != 0)
+    {
+      /**
+   * We were proposed as new poll owner...but someone else beat us to it
+       *   break out...and retry the whole thing...
+       */
+      clnt->m_poll.m_poll_owner = false;
+      assert(clnt->m_poll.m_poll_queue == false);
+      unlock_poll_mutex();
+      clnt->m_poll.m_waiting = trp_client::PollQueue::PQ_IDLE;
+      dbg("%p - PQ_WAITING m_poll_owner != 0 => return", clnt);
+      return;
     }
+
+    /**
+     * We were proposed as new poll owner, and was first to wakeup
+     */
+    dbg("%p - PQ_WAITING => new poll_owner", clnt);
   }
-  else
-  {
-    /*
-      We got the poll "right" and we poll until data is received. After
-      receiving data we will check if all data is received, if not we
-      poll again.
+  m_poll_owner = clnt;
+  unlock_poll_mutex();
+
+  /**
+   * We have the poll "right" and we poll until data is received. After
+   * receiving data we will check if all data is received,
+   * if not we poll again.
     */
-    assert(owner == clnt || clnt->m_poll.m_poll_owner == false);
-    m_poll_owner = clnt;
-    clnt->m_poll.m_poll_owner = true;
-    external_poll(wait_time);
+  clnt->m_poll.m_poll_owner = true;
+  clnt->m_poll.start_poll(clnt);
+  dbg("%p->external_poll", clnt);
+  external_poll(clnt, wait_time);
+
+#ifndef NDEBUG
+  {
+    Uint32 cnt = clnt->m_poll.m_locked_cnt;
+    assert(cnt >= 1);
+    assert(cnt <= NDB_ARRAY_SIZE(clnt->m_poll.m_locked_clients));
+    assert(clnt->m_poll.m_locked_clients[0] == clnt);
+    // no duplicates
+    if (DBG_POLL) printf("after external_poll: cnt: %u ", cnt);
+    for (Uint32 i = 0; i < cnt; i++)
+    {
+      trp_client * tmp = clnt->m_poll.m_locked_clients[i];
+      if (DBG_POLL) printf("%p(%u) ", tmp, tmp->m_poll.m_waiting);
+      for (Uint32 j = i + 1; j < cnt; j++)
+      {
+        assert(tmp != clnt->m_poll.m_locked_clients[j]);
+      }
+    }
+    if (DBG_POLL) printf("\n");
+
+    for (Uint32 i = 1; i < cnt; i++)
+    {
+      trp_client * tmp = clnt->m_poll.m_locked_clients[i];
+      if (tmp->m_poll.m_locked == true)
+      {
+        assert(tmp->m_poll.m_waiting != trp_client::PollQueue::PQ_IDLE);
+      }
+      else
+      {
+        assert(tmp->m_poll.m_poll_owner == false);
+        assert(tmp->m_poll.m_poll_queue == false);
+        assert(tmp->m_poll.m_waiting == trp_client::PollQueue::PQ_IDLE);
+      }
+    }
   }
-}
+#endif
 
-void
-TransporterFacade::wakeup(trp_client* clnt)
-{
-  if (clnt->m_poll.m_waiting)
+  /**
+   * we're finished polling
+   */
+  clnt->m_poll.m_poll_owner = false;
+  clnt->m_poll.m_waiting = trp_client::PollQueue::PQ_IDLE;
+
+  /**
+   * count woken clients
+   *   and put them to the left in array
+   */
+  Uint32 cnt = clnt->m_poll.m_locked_cnt - 1; // skip self
+  Uint32 cnt_woken = 0;
+  trp_client ** arr = clnt->m_poll.m_locked_clients + 1; // skip self
+  for (Uint32 i = 0; i < cnt; i++)
   {
-    clnt->m_poll.m_waiting = false;
-    if (m_poll_owner != clnt)
+    trp_client * tmp = arr[i];
+    if (tmp->m_poll.m_waiting == trp_client::PollQueue::PQ_WOKEN)
     {
-      remove_from_poll_queue(clnt);
-      NdbCondition_Signal(clnt->m_poll.m_condition);
+      arr[i] = arr[cnt_woken];
+      arr[cnt_woken] = tmp;
+      cnt_woken++;
+    }
+  }
+
+  if (DBG_POLL)
+  {
+    Uint32 cnt = clnt->m_poll.m_locked_cnt;
+    printf("after sort: cnt: %u ", cnt);
+    for (Uint32 i = 0; i < cnt; i++)
+    {
+      trp_client * tmp = clnt->m_poll.m_locked_clients[i];
+      printf("%p(%u) ", tmp, tmp->m_poll.m_waiting);
     }
+    printf("\n");
   }
+
+  lock_poll_mutex();
+
+  /**
+   * now remove all woken from poll queue
+   * note: poll mutex held
+   */
+  remove_from_poll_queue(arr, cnt_woken);
+
+  /**
+   * take last client in poll queue and try lock it
+   */
+  bool new_owner_locked = true;
+  trp_client * new_owner = remove_last_from_poll_queue();
+  assert(new_owner != clnt);
+  if (new_owner != 0)
+  {
+    dbg("0 new_owner: %p", new_owner);
+    /**
+     * Note: we can only try lock here, to prevent potential deadlock
+     *   given that we acquire mutex in different order when starting to poll
+     */
+    if (NdbMutex_Trylock(new_owner->m_mutex) != 0)
+    {
+      /**
+       * If we fail to try lock...we put him back into poll-queue
+       */
+      new_owner_locked = false;
+      add_to_poll_queue(new_owner);
+      dbg("try-lock failed %p", new_owner);
+    }
+  }
+
+  /**
+   * clear poll owner variable and unlock
+   */
+  m_poll_owner = 0;
+  unlock_poll_mutex();
+
+  if (new_owner && new_owner_locked)
+  {
+    /**
+     * Propose a poll owner
+     *   Wakeup a client, that will race to become poll-owner
+   *   I.e. we don't assign m_poll_owner but let the waking thread
+     *   do this itself, if it is first
+     */
+    dbg("wake new_owner(%p)", new_owner);
+#ifndef NDEBUG
+    for (Uint32 i = 0; i <= cnt; i++)
+    {
+      assert(clnt->m_poll.m_locked_clients[i] != new_owner);
+    }
+#endif
+    assert(new_owner->m_poll.m_waiting == trp_client::PollQueue::PQ_WAITING);
+    new_owner->m_poll.m_poll_owner = true;
+    NdbCondition_Signal(new_owner->m_poll.m_condition);
+    NdbMutex_Unlock(new_owner->m_mutex);
+  }
+
+  /**
+   * Now wake all the woken clients
+   */
+  unlock_and_signal(arr, cnt_woken);
+
+  /**
+   * And unlock the rest that we delivered messages to
+   */
+  for (Uint32 i = cnt_woken; i < cnt; i++)
+  {
+    dbg("unlock (%p)", arr[i]);
+    NdbMutex_Unlock(arr[i]->m_mutex);
+  }
+
+  /**
+   * If we failed to propose new poll owner above, then we retry it here
+   */
+  if (new_owner_locked == false)
+  {
+    dbg("new_owner_locked == false", 0);
+    trp_client * new_owner;
+    while (true)
+    {
+      new_owner = 0;
+      lock_poll_mutex();
+      if (m_poll_owner != 0)
+      {
+        /**
+         * new poll owner already appointed...no need to do anything
+         */
+        break;
+      }
+
+      new_owner = remove_last_from_poll_queue();
+      if (new_owner == 0)
+      {
+        /**
+         * poll queue empty...no need to do anything
+         */
+        break;
+      }
+
+      if (NdbMutex_Trylock(new_owner->m_mutex) == 0)
+      {
+        /**
+         * We locked a client that we will propose as poll owner
+         */
+        break;
+      }
+
+      /**
+       * Failed to lock new owner, put him back on the queue, and retry
+       */
+      add_to_poll_queue(new_owner);
+      unlock_poll_mutex();
+    }
+
+    unlock_poll_mutex();
+
+    if (new_owner)
+    {
+      /**
+       * Propose a poll owner
+       */
+      assert(new_owner->m_poll.m_waiting == trp_client::PollQueue::PQ_WAITING);
+      new_owner->m_poll.m_poll_owner = true;
+      NdbCondition_Signal(new_owner->m_poll.m_condition);
+      NdbMutex_Unlock(new_owner->m_mutex);
+    }
+  }
+
+  clnt->m_poll.m_locked_cnt = 0;
+  dbg("%p->do_poll return", clnt);
 }
 
 void
-TransporterFacade::complete_poll(trp_client* clnt)
+TransporterFacade::wakeup(trp_client* clnt)
 {
-  clnt->m_poll.m_waiting = false;
-  if (!clnt->m_poll.m_locked)
-  {
-    assert(clnt->m_poll.m_poll_owner == false);
-    return;
+  switch(clnt->m_poll.m_waiting) {
+  case trp_client::PollQueue::PQ_WAITING:
+    dbg("TransporterFacade::wakeup(%p) PQ_WAITING => PQ_WOKEN", clnt);
+    clnt->m_poll.m_waiting = trp_client::PollQueue::PQ_WOKEN;
+    break;
+  case trp_client::PollQueue::PQ_WOKEN:
+    dbg("TransporterFacade::wakeup(%p) PQ_WOKEN", clnt);
+    break;
+  case trp_client::PollQueue::PQ_IDLE:
+    dbg("TransporterFacade::wakeup(%p) PQ_IDLE", clnt);
+    break;
   }
+}
 
-  /*
-   When completing the poll for this thread we must return the poll
-   ownership if we own it. We will give it to the last thread that
-   came here (the most recent) which is likely to be the one also
-   last to complete. We will remove that thread from the conditional
-   wait queue and set him as the new owner of the poll "right".
-   We will wait however with the signal until we have unlocked the
-   mutex for performance reasons.
-   See Stevens book on Unix NetworkProgramming: The Sockets Networking
-   API Volume 1 Third Edition on page 703-704 for a discussion on this
-   subject.
-  */
-  trp_client* new_owner = 0;
-  if (m_poll_owner == clnt)
+void
+TransporterFacade::unlock_and_signal(trp_client ** arr, Uint32 cnt)
+{
+  for (Uint32 i = 0; i < cnt; i++)
   {
-    assert(clnt->m_poll.m_poll_owner == true);
-    m_poll_owner = new_owner = remove_last_from_poll_queue();
+    NdbCondition_Signal(arr[i]->m_poll.m_condition);
+    NdbMutex_Unlock(arr[i]->m_mutex);
   }
-  if (new_owner)
+}
+
+void
+TransporterFacade::complete_poll(trp_client* clnt)
+{
+  assert(clnt->m_poll.m_poll_owner == false);
+  assert(clnt->m_poll.m_poll_queue == false);
+  assert(clnt->m_poll.m_waiting == trp_client::PollQueue::PQ_IDLE);
+  clnt->flush_send_buffers();
+}
+
+inline
+void
+trp_client::PollQueue::start_poll(trp_client* self)
+{
+  assert(m_waiting == PQ_WAITING);
+  assert(m_locked);
+  assert(m_poll_owner);
+  assert(m_locked_cnt == 0);
+  assert(&self->m_poll == this);
+  m_locked_cnt = 1;
+  m_locked_clients[0] = self;
+}
+
+inline
+bool
+trp_client::PollQueue::check_if_locked(const trp_client* clnt) const
+{
+  for (Uint32 i = 0; i<m_locked_cnt; i++)
   {
-    assert(new_owner->m_poll.m_poll_owner == false);
-    assert(new_owner->m_poll.m_locked == true);
-    assert(new_owner->m_poll.m_waiting == true);
-    NdbCondition_Signal(new_owner->m_poll.m_condition);
-    new_owner->m_poll.m_poll_owner = true;
+    if (m_locked_clients[i] == clnt) // already locked
+      return true;
   }
-  clnt->m_poll.m_locked = false;
-  clnt->m_poll.m_poll_owner = false;
-  unlock_mutex();
+  return false;
+}
+
+inline
+bool
+trp_client::PollQueue::lock_client (trp_client* clnt)
+{
+  assert(m_locked_cnt <= NDB_ARRAY_SIZE(m_locked_clients));
+  if (check_if_locked(clnt))
+    return false;
+
+  dbg("lock_client(%p)", clnt);
+
+  /**
+   * API_PACKED contains a number of messages,
+   *   so even if we signal "last_message"...
+   *   there will(might) be a few more
+   *
+   * TODO: Check that 3 is correct number!!
+   */
+  const Uint32 MAX_MESSAGES_IN_API_PACKED = 3;
+
+  assert(m_locked_cnt < NDB_ARRAY_SIZE(m_locked_clients));
+  NdbMutex_Lock(clnt->m_mutex);
+  m_locked_clients[m_locked_cnt++] = clnt;
+  return m_locked_cnt + MAX_MESSAGES_IN_API_PACKED >= NDB_ARRAY_SIZE(m_locked_clients);
 }
 
 void
@@ -1677,7 +2158,9 @@ TransporterFacade::add_to_poll_queue(trp
   assert(clnt->m_poll.m_next == 0);
   assert(clnt->m_poll.m_locked == true);
   assert(clnt->m_poll.m_poll_owner == false);
+  assert(clnt->m_poll.m_poll_queue == false);
 
+  clnt->m_poll.m_poll_queue = true;
   if (m_poll_queue_head == 0)
   {
     assert(m_poll_queue_tail == 0);
@@ -1694,12 +2177,26 @@ TransporterFacade::add_to_poll_queue(trp
 }
 
 void
+TransporterFacade::remove_from_poll_queue(trp_client** arr, Uint32 cnt)
+{
+  for (Uint32 i = 0; i< cnt; i++)
+  {
+    if (arr[i]->m_poll.m_poll_queue)
+    {
+      remove_from_poll_queue(arr[i]);
+    }
+  }
+}
+
+void
 TransporterFacade::remove_from_poll_queue(trp_client* clnt)
 {
   assert(clnt != 0);
   assert(clnt->m_poll.m_locked == true);
   assert(clnt->m_poll.m_poll_owner == false);
+  assert(clnt->m_poll.m_poll_queue == true);
 
+  clnt->m_poll.m_poll_queue = false;
   if (clnt->m_poll.m_prev != 0)
   {
     clnt->m_poll.m_prev->m_poll.m_next = clnt->m_poll.m_next;
@@ -1758,6 +2255,174 @@ SignalSectionIterator::getNextWords(Uint
   return NULL;
 }
 
+void
+TransporterFacade::flush_send_buffer(Uint32 node, const TFBuffer * sb)
+{
+  Guard g(&m_send_buffers[node].m_mutex);
+  assert(node < NDB_ARRAY_SIZE(m_send_buffers));
+  link_buffer(&m_send_buffers[node].m_buffer, sb);
+}
+
+void
+TransporterFacade::flush_and_send_buffer(Uint32 node, const TFBuffer * sb)
+{
+  assert(node < NDB_ARRAY_SIZE(m_send_buffers));
+  struct TFSendBuffer * b = m_send_buffers + node;
+  bool wake = false;
+  NdbMutex_Lock(&b->m_mutex);
+  link_buffer(&b->m_buffer, sb);
+
+  if (b->m_sending == true)
+  {
+    /**
+     * Sender will check if there is data, and wake send-thread
+     * if needed
+     */
+  }
+  else
+  {
+    b->m_sending = true;
+
+    /**
+     * Copy all data from m_buffer to m_out_buffer
+     */
+    TFBuffer copy = b->m_buffer;
+    bzero(&b->m_buffer, sizeof(b->m_buffer));
+    NdbMutex_Unlock(&b->m_mutex);
+    link_buffer(&b->m_out_buffer, &copy);
+    theTransporterRegistry->performSend(node);
+    NdbMutex_Lock(&b->m_mutex);
+    b->m_sending = false;
+    if (b->m_buffer.m_bytes_in_buffer > 0 ||
+        b->m_out_buffer.m_bytes_in_buffer > 0)
+    {
+      wake = true;
+    }
+  }
+  NdbMutex_Unlock(&b->m_mutex);
+
+  if (wake)
+  {
+    wakeup_send_thread(node);
+  }
+}
+
+Uint32
+TransporterFacade::get_bytes_to_send_iovec(NodeId node, struct iovec *dst,
+                                           Uint32 max)
+{
+  if (max == 0)
+  {
+    return 0;
+  }
+
+  Uint32 count = 0;
+  TFBuffer *b = &m_send_buffers[node].m_out_buffer;
+  TFBufferGuard g0(* b);
+  TFPage *page = b->m_head;
+  while (page != NULL && count < max)
+  {
+    dst[count].iov_base = page->m_data+page->m_start;
+    dst[count].iov_len = page->m_bytes;
+    assert(page->m_start + page->m_bytes <= page->max_data_bytes());
+    page = page->m_next;
+    count++;
+  }
+
+  return count;
+}
+
+Uint32
+TransporterFacade::bytes_sent(NodeId node, Uint32 bytes)
+{
+  TFBuffer *b = &m_send_buffers[node].m_out_buffer;
+  TFBufferGuard g0(* b);
+  Uint32 used_bytes = b->m_bytes_in_buffer;
+
+  if (bytes == 0)
+  {
+    return used_bytes;
+  }
+
+  assert(used_bytes >= bytes);
+  used_bytes -= bytes;
+  b->m_bytes_in_buffer = used_bytes;
+
+  TFPage *page = b->m_head;
+  TFPage *prev = 0;
+  while (bytes && bytes >= page->m_bytes)
+  {
+    prev = page;
+    bytes -= page->m_bytes;
+    page = page->m_next;
+  }
+
+  if (used_bytes == 0)
+  {
+    m_send_buffer.release(b->m_head, b->m_tail);
+    b->m_head = 0;
+    b->m_tail = 0;
+  }
+  else
+  {
+    if (prev)
+    {
+      m_send_buffer.release(b->m_head, prev);
+    }
+
+    page->m_start += bytes;
+    page->m_bytes -= bytes;
+    assert(page->m_start + page->m_bytes <= page->max_data_bytes());
+    b->m_head = page;
+  }
+
+  return used_bytes;
+}
+
+bool
+TransporterFacade::has_data_to_send(NodeId node)
+{
+  /**
+   * Not used...
+   */
+  abort();
+}
+
+void
+TransporterFacade::reset_send_buffer(NodeId node, bool should_be_empty)
+{
+  // Make sure that buffer is already empty if the "should_be_empty"
+  // flag is set. This is done to quickly catch any stray signals
+  // written to the send buffer while not being connected
+  TFBuffer *b0 = &m_send_buffers[node].m_buffer;
+  TFBuffer *b1 = &m_send_buffers[node].m_out_buffer;
+  bool has_data_to_send =
+    (b0->m_head != NULL && b0->m_head->m_bytes) ||
+    (b1->m_head != NULL && b1->m_head->m_bytes);
+
+  if (should_be_empty && !has_data_to_send)
+     return;
+  assert(!should_be_empty);
+
+  {
+    TFBuffer *b = &m_send_buffers[node].m_buffer;
+    if (b->m_head != 0)
+    {
+      m_send_buffer.release(b->m_head, b->m_tail);
+    }
+    bzero(b, sizeof(* b));
+  }
+
+  {
+    TFBuffer *b = &m_send_buffers[node].m_out_buffer;
+    if (b->m_head != 0)
+    {
+      m_send_buffer.release(b->m_head, b->m_tail);
+    }
+    bzero(b, sizeof(* b));
+  }
+}
+
 #ifdef UNIT_TEST
 
 // Unit test code starts
@@ -2035,6 +2700,7 @@ TransporterFacade::ext_update_connection
 {
   theClusterMgr->lock();
   theTransporterRegistry->update_connections();
+  theClusterMgr->flush_send_buffers();
   theClusterMgr->unlock();
 }
 
@@ -2074,11 +2740,11 @@ TransporterFacade::setupWakeup()
 {
   /* Ask TransporterRegistry to setup wakeup sockets */
   bool rc;
-  lock_mutex();
+  lock_poll_mutex();
   {
     rc = theTransporterRegistry->setup_wakeup_socket();
   }
-  unlock_mutex();
+  unlock_poll_mutex();
   return rc;
 }
 

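Two mechanisms dominate this file. First, each node's send buffer is double
buffered: m_buffer collects pages flushed by clients under b->m_mutex, while
m_out_buffer is what performSend() drains, and the m_sending flag guards the
handoff so the mutex is never held across the actual send. Condensed from
flush_and_send_buffer() above:

  NdbMutex_Lock(&b->m_mutex);
  link_buffer(&b->m_buffer, sb);            // queue the client's pages
  if (!b->m_sending)
  {
    b->m_sending = true;
    TFBuffer copy = b->m_buffer;            // grab everything queued so far
    bzero(&b->m_buffer, sizeof(b->m_buffer));
    NdbMutex_Unlock(&b->m_mutex);           // send without the queue lock
    link_buffer(&b->m_out_buffer, &copy);
    theTransporterRegistry->performSend(node);
    NdbMutex_Lock(&b->m_mutex);
    b->m_sending = false;                   // leftovers wake the send thread
  }
  NdbMutex_Unlock(&b->m_mutex);

Second, deliver_signal() now returns lock_client()'s verdict: the poll owner
locks each client it delivers to, and once m_locked_clients is nearly full
(leaving the MAX_MESSAGES_IN_API_PACKED margin) the receive round stops;
do_poll() then wakes the woken clients, unlocks the rest, and proposes a new
poll owner from the poll queue.
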
=== modified file 'storage/ndb/src/ndbapi/TransporterFacade.hpp'
--- a/storage/ndb/src/ndbapi/TransporterFacade.hpp	2012-02-23 15:41:31 +0000
+++ b/storage/ndb/src/ndbapi/TransporterFacade.hpp	2012-05-31 18:26:14 +0000
@@ -26,6 +26,7 @@
 #include "DictCache.hpp"
 #include <BlockNumbers.h>
 #include <mgmapi.h>
+#include "trp_buffer.hpp"
 
 class ClusterMgr;
 class ArbitMgr;
@@ -81,14 +82,14 @@ public:
 
   // Only sends to nodes which are alive
 private:
-  int sendSignal(const NdbApiSignal * signal, NodeId nodeId);
-  int sendSignal(const NdbApiSignal*, NodeId,
+  int sendSignal(trp_client*, const NdbApiSignal *, NodeId nodeId);
+  int sendSignal(trp_client*, const NdbApiSignal*, NodeId,
                  const LinearSectionPtr ptr[3], Uint32 secs);
-  int sendSignal(const NdbApiSignal*, NodeId,
+  int sendSignal(trp_client*, const NdbApiSignal*, NodeId,
                  const GenericSectionPtr ptr[3], Uint32 secs);
-  int sendFragmentedSignal(const NdbApiSignal*, NodeId,
+  int sendFragmentedSignal(trp_client*, const NdbApiSignal*, NodeId,
                            const LinearSectionPtr ptr[3], Uint32 secs);
-  int sendFragmentedSignal(const NdbApiSignal*, NodeId,
+  int sendFragmentedSignal(trp_client*, const NdbApiSignal*, NodeId,
                            const GenericSectionPtr ptr[3], Uint32 secs);
 public:
 
@@ -129,8 +130,8 @@ public:
   void for_each(trp_client* clnt,
                 const NdbApiSignal* aSignal, const LinearSectionPtr ptr[3]);
   
-  void lock_mutex();
-  void unlock_mutex();
+  void lock_poll_mutex();
+  void unlock_poll_mutex();
 
   // Improving the API performance
   void forceSend(Uint32 block_number);
@@ -159,7 +160,10 @@ public:
   void complete_poll(trp_client*);
   void wakeup(trp_client*);
 
-  void external_poll(Uint32 wait_time);
+  void external_poll(trp_client* clnt, Uint32 wait_time);
+
+  void remove_from_poll_queue(trp_client**, Uint32 cnt);
+  void unlock_and_signal(trp_client**, Uint32 cnt);
 
   trp_client* get_poll_owner(bool) const { return m_poll_owner;}
   trp_client* remove_last_from_poll_queue();
@@ -177,7 +181,7 @@ public:
   int get_auto_reconnect() const;
 
   /* TransporterCallback interface. */
-  void deliver_signal(SignalHeader * const header,
+  bool deliver_signal(SignalHeader * const header,
                       Uint8 prio,
                       Uint32 * const signalData,
                       LinearSectionPtr ptr[3]);
@@ -189,22 +193,7 @@ public:
   void reportError(NodeId nodeId, TransporterError errorCode,
                    const char *info = 0);
   void transporter_recv_from(NodeId node);
-  Uint32 get_bytes_to_send_iovec(NodeId node, struct iovec *dst, Uint32 max)
-  {
-    return theTransporterRegistry->get_bytes_to_send_iovec(node, dst, max);
-  }
-  Uint32 bytes_sent(NodeId node, Uint32 bytes)
-  {
-    return theTransporterRegistry->bytes_sent(node, bytes);
-  }
-  bool has_data_to_send(NodeId node)
-  {
-    return theTransporterRegistry->has_data_to_send(node);
-  }
-  void reset_send_buffer(NodeId node, bool should_be_empty)
-  {
-    theTransporterRegistry->reset_send_buffer(node, should_be_empty);
-  }
+
   /**
    * Wakeup
    *
@@ -300,24 +289,67 @@ private:
   Uint32 m_fragmented_signal_id;
 
 public:
-  NdbMutex* theMutexPtr;
+  volatile Uint32 m_poll_cnt;
+  NdbMutex* m_open_close_mutex;
+  NdbMutex* thePollMutex;
 
 public:
   GlobalDictCache *m_globalDictCache;
+
+public:
+  /**
+   * Add a send buffer to out-buffer
+   */
+  void flush_send_buffer(Uint32 node, const TFBuffer* buffer);
+  void flush_and_send_buffer(Uint32 node, const TFBuffer* buffer);
+
+  /**
+   * Allocate a send buffer
+   */
+  TFPage *alloc_sb_page() { return m_send_buffer.try_alloc(1);}
+
+  Uint32 get_bytes_to_send_iovec(NodeId node, struct iovec *dst, Uint32 max);
+  Uint32 bytes_sent(NodeId node, Uint32 bytes);
+  bool has_data_to_send(NodeId node);
+  void reset_send_buffer(NodeId node, bool should_be_empty);
+
+private:
+  TFMTPool m_send_buffer;
+  struct TFSendBuffer
+  {
+    TFSendBuffer() { m_sending = false; }
+    NdbMutex m_mutex;
+    bool m_sending;
+
+    /**
+     * This is data that has been "scheduled" to be sent
+     */
+    TFBuffer m_buffer;
+
+    /**
+     * This is data that is being sent
+     */
+    TFBuffer m_out_buffer;
+  } m_send_buffers[MAX_NODES];
+
+  void wakeup_send_thread(Uint32 node);
+  NdbMutex * m_send_thread_mutex;
+  NdbCondition * m_send_thread_cond;
+  NodeBitmask m_send_thread_nodes;
 };
 
 inline
 void 
-TransporterFacade::lock_mutex()
+TransporterFacade::lock_poll_mutex()
 {
-  NdbMutex_Lock(theMutexPtr);
+  NdbMutex_Lock(thePollMutex);
 }
 
 inline
 void 
-TransporterFacade::unlock_mutex()
+TransporterFacade::unlock_poll_mutex()
 {
-  NdbMutex_Unlock(theMutexPtr);
+  NdbMutex_Unlock(thePollMutex);
 }
 
 #include "ClusterMgr.hpp"

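The per-node TFSendBuffer declared above holds two buffers: m_buffer
collects data that clients have flushed ("scheduled"), while
m_out_buffer is what is currently handed to the socket, and m_sending
ensures there is a single sender at a time. A rough sketch of that
two-stage scheme, using std::mutex and std::vector in place of the NDB
primitives and a hypothetical send_to_socket() placeholder:

  #include <mutex>
  #include <vector>
  #include <cstdint>

  struct SendBuffer {
    std::mutex           m_mutex;
    bool                 m_sending = false;  // one sender at a time
    std::vector<uint8_t> m_buffer;           // scheduled, not yet sent
    std::vector<uint8_t> m_out_buffer;       // handed to the socket
  };

  void send_to_socket(const std::vector<uint8_t>&) { /* placeholder */ }

  // Clients append under the lock; cheap, never touches the socket.
  void flush(SendBuffer &sb, const std::vector<uint8_t> &data)
  {
    std::lock_guard<std::mutex> g(sb.m_mutex);
    sb.m_buffer.insert(sb.m_buffer.end(), data.begin(), data.end());
  }

  // Whoever wins m_sending moves scheduled data into the out-buffer
  // and performs the actual send with the lock released.
  void try_send(SendBuffer &sb)
  {
    {
      std::lock_guard<std::mutex> g(sb.m_mutex);
      if (sb.m_sending || sb.m_buffer.empty())
        return;                        // another sender is active
      sb.m_sending = true;
      sb.m_out_buffer.swap(sb.m_buffer);
    }
    send_to_socket(sb.m_out_buffer);   // no lock held here
    std::lock_guard<std::mutex> g(sb.m_mutex);
    sb.m_out_buffer.clear();
    sb.m_sending = false;
  }

A caller that loses the m_sending race simply returns; data it flushed
can presumably be picked up later by the send thread (cf. the
wakeup_send_thread() declaration above).
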
=== modified file 'storage/ndb/src/ndbapi/ndb_cluster_connection.cpp'
--- a/storage/ndb/src/ndbapi/ndb_cluster_connection.cpp	2012-03-05 13:04:02 +0000
+++ b/storage/ndb/src/ndbapi/ndb_cluster_connection.cpp	2012-05-31 18:26:14 +0000
@@ -204,13 +204,13 @@ Ndb_cluster_connection_impl::get_next_al
 
   while ((id = get_next_node(iter)))
   {
-    tp->lock_mutex();
+    tp->lock_poll_mutex();
     if (tp->get_node_alive(id) != 0)
     {
-      tp->unlock_mutex();
+      tp->unlock_poll_mutex();
       return id;
     }
-    tp->unlock_mutex();
+    tp->unlock_poll_mutex();
   }
   return 0;
 }
@@ -235,7 +235,7 @@ Ndb_cluster_connection::max_nodegroup()
     return 0;
 
   Bitmask<MAX_NDB_NODES> ng;
-  tp->lock_mutex();
+  tp->lock_poll_mutex();
   for(unsigned i= 0; i < no_db_nodes(); i++)
   {
     //************************************************
@@ -245,7 +245,7 @@ Ndb_cluster_connection::max_nodegroup()
     if (n.is_confirmed() && n.m_state.nodeGroup <= MAX_NDB_NODES)
       ng.set(n.m_state.nodeGroup);
   }
-  tp->unlock_mutex();
+  tp->unlock_poll_mutex();
 
   if (ng.isclear())
     return 0;
@@ -267,7 +267,7 @@ int Ndb_cluster_connection::get_no_ready
     return -1;
 
   unsigned int foundAliveNode = 0;
-  tp->lock_mutex();
+  tp->lock_poll_mutex();
   for(unsigned i= 0; i < no_db_nodes(); i++)
   {
     //************************************************
@@ -277,7 +277,7 @@ int Ndb_cluster_connection::get_no_ready
       foundAliveNode++;
     }
   }
-  tp->unlock_mutex();
+  tp->unlock_poll_mutex();
 
   return foundAliveNode;
 }

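These call sites balance lock_poll_mutex()/unlock_poll_mutex() by hand
around early returns. A scope guard makes that pattern harder to get
wrong; the wrapper below is purely hypothetical (not part of the
patch), written against any type exposing the two methods:

  // Hypothetical RAII helper: T is e.g. TransporterFacade.
  template <class T>
  class PollMutexGuard {
    T *m_tp;
  public:
    explicit PollMutexGuard(T *tp) : m_tp(tp) { m_tp->lock_poll_mutex(); }
    ~PollMutexGuard() { m_tp->unlock_poll_mutex(); } // every return path
    PollMutexGuard(const PollMutexGuard&) = delete;
    PollMutexGuard& operator=(const PollMutexGuard&) = delete;
  };

  // Usage sketch mirroring get_next_alive_node():
  //   PollMutexGuard<TransporterFacade> g(tp);
  //   if (tp->get_node_alive(id) != 0)
  //     return id;            // unlock happens automatically
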
=== modified file 'storage/ndb/src/ndbapi/trp_buffer.cpp'
--- a/storage/ndb/src/ndbapi/trp_buffer.cpp	2011-02-02 00:40:07 +0000
+++ b/storage/ndb/src/ndbapi/trp_buffer.cpp	2012-05-31 18:26:14 +0000
@@ -44,6 +44,11 @@ TFPool::~TFPool()
     free (m_alloc_ptr);
 }
 
+TFMTPool::TFMTPool(const char * name)
+{
+  NdbMutex_InitWithName(&m_mutex, name);
+}
+
 void
 TFBuffer::validate() const
 {
@@ -52,8 +57,8 @@ TFBuffer::validate() const
     assert(m_head == m_tail);
     if (m_head)
     {
-      assert(m_head->m_bytes < m_head->m_size);  // Full pages should be released
-      assert(m_head->m_bytes == m_head->m_start);
+      assert(m_head->m_start < m_head->m_size);  // Full pages should be released
+      assert(m_head->m_bytes == 0);
     }
     return;
   }
@@ -67,10 +72,10 @@ TFBuffer::validate() const
   while (p)
   {
     assert(p->m_bytes <= p->m_size);
-    assert(p->m_start <= p->m_bytes);
-    assert((p->m_start & 3) == 0);
-    assert(p->m_bytes - p->m_start > 0);
-    assert(p->m_bytes - p->m_start <= (int)m_bytes_in_buffer);
+    assert(p->m_start <= p->m_size);
+    assert((p->m_bytes & 3) == 0);
+    assert(p->m_start + p->m_bytes <= p->m_size);
+    assert(p->m_bytes <= (int)m_bytes_in_buffer);
     assert(p->m_next != p);
     if (p == m_tail)
     {
@@ -80,7 +85,7 @@ TFBuffer::validate() const
     {
       assert(p->m_next != 0);
     }
-    sum += p->m_bytes - p->m_start;
+    sum += p->m_bytes;
     p = p->m_next;
   }
   assert(sum == m_bytes_in_buffer);

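The rewritten assertions reflect a change in TFPage accounting: m_start
is now the offset of the first valid byte and m_bytes is a length
(previously both were offsets into the page, hence the old
"m_bytes - m_start" expressions). A toy version of the resulting
invariants:

  #include <cassert>
  #include <cstdint>

  struct Page {
    uint32_t m_start, m_bytes, m_size;
    Page    *m_next;
  };

  // With m_bytes as a length, the byte count of a chain is simply the
  // sum of m_bytes, and each page must fit its own data.
  void validate_chain(const Page *head, uint32_t bytes_in_buffer)
  {
    uint32_t sum = 0;
    for (const Page *p = head; p != nullptr; p = p->m_next)
    {
      assert((p->m_bytes & 3) == 0);               // whole 32-bit words
      assert(p->m_start + p->m_bytes <= p->m_size);
      sum += p->m_bytes;
    }
    assert(sum == bytes_in_buffer);
  }
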
=== modified file 'storage/ndb/src/ndbapi/trp_buffer.hpp'
--- a/storage/ndb/src/ndbapi/trp_buffer.hpp	2011-02-02 00:40:07 +0000
+++ b/storage/ndb/src/ndbapi/trp_buffer.hpp	2012-05-31 18:26:14 +0000
@@ -20,6 +20,7 @@
 
 #include <ndb_global.h>
 #include <ndb_socket.h> // struct iovec
+#include <portlib/NdbMutex.h>
 
 struct TFPage
 {
@@ -98,9 +99,9 @@ struct TFSentinel
 struct TFBuffer
 {
   TFBuffer() { m_bytes_in_buffer = 0; m_head = m_tail = 0;}
-  Uint32 m_bytes_in_buffer;
   struct TFPage * m_head;
   struct TFPage * m_tail;
+  Uint32 m_bytes_in_buffer;
 
   void validate() const;
 };
@@ -139,6 +140,37 @@ public:
   void release_list(TFPage*);
 };
 
+class TFMTPool : private TFPool
+{
+  NdbMutex m_mutex;
+public:
+  TFMTPool(const char * name = 0);
+
+  bool init(size_t total_memory, size_t page_sz = 32768) {
+    return TFPool::init(total_memory, page_sz);
+  }
+  bool inited() const {
+    return TFPool::inited();
+  }
+
+  TFPage* try_alloc(Uint32 N) {
+    Guard g(&m_mutex);
+    return TFPool::try_alloc(N);
+  }
+
+  void release(TFPage* first, TFPage* last) {
+    Guard g(&m_mutex);
+    TFPool::release(first, last);
+  }
+
+  void release_list(TFPage* head) {
+    TFPage * tail = head;
+    while (tail->m_next != 0)
+      tail = tail->m_next;
+    release(head, tail);
+  }
+};
+
 inline
 TFPage *
 TFPool::try_alloc(Uint32 n)

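TFMTPool makes the pool thread-safe by privately inheriting TFPool and
wrapping each forwarded call in a Guard; note that release_list() walks
to the tail of the chain before locking, so the whole list is freed
under a single mutex acquisition. The same pattern sketched with the
standard library (a trivial stand-in pool, not TFPool itself):

  #include <cstddef>
  #include <mutex>

  class Pool {                     // stand-in for TFPool
  public:
    void *try_alloc(std::size_t n) { return ::operator new(n); }
    void  release(void *p)         { ::operator delete(p); }
  };

  // Private inheritance hides the unsynchronized interface; every
  // public method takes the lock for exactly one base call.
  class MTPool : private Pool {
    std::mutex m_mutex;
  public:
    void *try_alloc(std::size_t n) {
      std::lock_guard<std::mutex> g(m_mutex);
      return Pool::try_alloc(n);
    }
    void release(void *p) {
      std::lock_guard<std::mutex> g(m_mutex);
      Pool::release(p);
    }
  };
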
=== modified file 'storage/ndb/src/ndbapi/trp_client.cpp'
--- a/storage/ndb/src/ndbapi/trp_client.cpp	2011-09-08 06:22:07 +0000
+++ b/storage/ndb/src/ndbapi/trp_client.cpp	2012-05-31 18:26:14 +0000
@@ -21,12 +21,10 @@
 trp_client::trp_client()
   : m_blockNo(~Uint32(0)), m_facade(0)
 {
-  m_poll.m_waiting = false;
-  m_poll.m_locked = false;
-  m_poll.m_poll_owner = false;
-  m_poll.m_next = 0;
-  m_poll.m_prev = 0;
-  m_poll.m_condition = NdbCondition_Create();
+  m_mutex = NdbMutex_Create();
+
+  m_send_nodes_cnt = 0;
+  m_send_buffers = new TFBuffer[MAX_NODES];
 }
 
 trp_client::~trp_client()
@@ -35,13 +33,14 @@ trp_client::~trp_client()
   * require that the trp_client user
   *  doesn't destroy the object while holding any locks
    */
-  assert(m_poll.m_locked == 0);
-  assert(m_poll.m_poll_owner == false);
-  assert(m_poll.m_next == 0);
-  assert(m_poll.m_prev == 0);
+  m_poll.assert_destroy();
 
   close();
   NdbCondition_Destroy(m_poll.m_condition);
+  NdbMutex_Destroy(m_mutex);
+
+  assert(m_send_nodes_cnt == 0);
+  delete [] m_send_buffers;
 }
 
 Uint32
@@ -86,6 +85,9 @@ trp_client::close()
 void
 trp_client::start_poll()
 {
+  NdbMutex_Lock(m_mutex);
+  assert(m_poll.m_locked == false);
+  m_poll.m_locked = true;
   m_facade->start_poll(this);
 }
 
@@ -98,30 +100,163 @@ trp_client::do_poll(Uint32 to)
 void
 trp_client::complete_poll()
 {
-  m_facade->complete_poll(this);
+  if (m_poll.m_locked == true)
+  {
+    m_facade->complete_poll(this);
+    m_poll.m_locked = false;
+    NdbMutex_Unlock(m_mutex);
+  }
 }
 
 int
 trp_client::do_forceSend(int val)
 {
-  int did_send = 1;
+  /**
+   * Since force send is disabled in this "version",
+   *   always set forceSend=1...
+   */
+  val = 1;
+
   if (val == 0)
   {
-    did_send = m_facade->checkForceSend(m_blockNo);
+    flush_send_buffers();
+    return 0;
   }
   else if (val == 1)
   {
-    m_facade->forceSend(m_blockNo);
+    for (Uint32 i = 0; i < m_send_nodes_cnt; i++)
+    {
+      Uint32 n = m_send_nodes_list[i];
+      TFBuffer* b = m_send_buffers + n;
+      TFBufferGuard g0(* b);
+      m_facade->flush_and_send_buffer(n, b);
+      bzero(b, sizeof(* b));
+    }
+    m_send_nodes_cnt = 0;
+    m_send_nodes_mask.clear();
+    return 1;
   }
-  return did_send;
+  return 0;
 }
 
 int
-trp_client::safe_sendSignal(const NdbApiSignal* signal, Uint32 nodeId)
+trp_client::safe_noflush_sendSignal(const NdbApiSignal* signal, Uint32 nodeId)
 {
   return m_facade->m_poll_owner->raw_sendSignal(signal, nodeId);
 }
 
+int
+trp_client::safe_sendSignal(const NdbApiSignal* signal, Uint32 nodeId)
+{
+  int res;
+  if ((res = safe_noflush_sendSignal(signal, nodeId)) != -1)
+  {
+    m_facade->m_poll_owner->flush_send_buffers();
+  }
+  return res;
+}
+
+Uint32 *
+trp_client::getWritePtr(NodeId node, Uint32 lenBytes, Uint32 prio,
+                        Uint32 max_use)
+{
+  TFBuffer* b = m_send_buffers+node;
+  TFBufferGuard g0(* b);
+  bool found = m_send_nodes_mask.get(node);
+  if (likely(found))
+  {
+    TFPage * page = b->m_tail;
+    assert(page != 0);
+    if (page->m_bytes + page->m_start + lenBytes <= page->max_data_bytes())
+    {
+      return (Uint32 *)(page->m_data + page->m_start + page->m_bytes);
+    }
+  }
+  else
+  {
+    Uint32 cnt = m_send_nodes_cnt;
+    m_send_nodes_mask.set(node);
+    m_send_nodes_list[cnt] = node;
+    m_send_nodes_cnt = cnt + 1;
+  }
+
+  TFPage* page = m_facade->alloc_sb_page();
+  if (likely(page != 0))
+  {
+    page->init();
+
+    if (b->m_tail == NULL)
+    {
+      assert(!found);
+      b->m_head = page;
+      b->m_tail = page;
+    }
+    else
+    {
+      assert(found);
+      assert(b->m_head != NULL);
+      b->m_tail->m_next = page;
+      b->m_tail = page;
+    }
+    return (Uint32 *)(page->m_data);
+  }
+
+  if (b->m_tail == 0)
+  {
+    assert(!found);
+    m_send_nodes_mask.clear(node);
+    m_send_nodes_cnt--;
+  }
+  else
+  {
+    assert(found);
+  }
+
+  return NULL;
+}
+
+Uint32
+trp_client::updateWritePtr(NodeId node, Uint32 lenBytes, Uint32 prio)
+{
+  TFBuffer* b = m_send_buffers+node;
+  TFBufferGuard g0(* b);
+  assert(m_send_nodes_mask.get(node));
+  assert(b->m_head != 0);
+  assert(b->m_tail != 0);
+
+  TFPage *page = b->m_tail;
+  assert(page->m_bytes + lenBytes <= page->max_data_bytes());
+  page->m_bytes += lenBytes;
+  b->m_bytes_in_buffer += lenBytes;
+  return b->m_bytes_in_buffer;
+}
+
+void
+trp_client::flush_send_buffers()
+{
+  assert(m_poll.m_locked);
+  Uint32 cnt = m_send_nodes_cnt;
+  for (Uint32 i = 0; i<cnt; i++)
+  {
+    Uint32 node = m_send_nodes_list[i];
+    assert(m_send_nodes_mask.get(node));
+    TFBuffer* b = m_send_buffers + node;
+    TFBufferGuard g0(* b);
+    m_facade->flush_send_buffer(node, b);
+    bzero(b, sizeof(* b));
+  }
+
+  m_send_nodes_cnt = 0;
+  m_send_nodes_mask.clear();
+}
+
+bool
+trp_client::forceSend(NodeId node)
+{
+  do_forceSend();
+  return true;
+}
+
 #include "NdbImpl.hpp"
 
 PollGuard::PollGuard(NdbImpl& impl)

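trp_client now buffers outgoing signals locally per destination node,
tracking dirty nodes with both a bitmask (an O(1) membership test in
getWritePtr()) and a compact list (so flush_send_buffers() visits only
the nodes actually written to). A self-contained sketch of that
bookkeeping, with std::vector standing in for the TFBuffer page chains:

  #include <bitset>
  #include <cstddef>
  #include <cstdint>
  #include <vector>

  const int MAX_N = 256;            // stand-in for MAX_NODES

  struct ClientSendState {
    std::bitset<MAX_N>   mask;      // does this node have pending data?
    uint8_t              list[MAX_N];
    uint32_t             cnt = 0;
    std::vector<uint8_t> buf[MAX_N];

    void append(int node, const uint8_t *d, std::size_t len) {
      if (!mask.test(node)) {       // first write to this node
        mask.set(node);
        list[cnt++] = (uint8_t)node;
      }
      buf[node].insert(buf[node].end(), d, d + len);
    }

    template <class F>
    void flush(F &&deliver) {       // e.g. hand off to the facade
      for (uint32_t i = 0; i < cnt; i++) {
        int node = list[i];
        deliver(node, buf[node]);
        buf[node].clear();
      }
      cnt = 0;
      mask.reset();
    }
  };
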
=== modified file 'storage/ndb/src/ndbapi/trp_client.hpp'
--- a/storage/ndb/src/ndbapi/trp_client.hpp	2011-09-09 12:29:43 +0000
+++ b/storage/ndb/src/ndbapi/trp_client.hpp	2012-05-31 18:26:14 +0000
@@ -20,13 +20,17 @@
 
 #include <ndb_global.h>
 #include <NdbCondition.h>
+#include <TransporterRegistry.hpp>
+#include <NodeBitmask.hpp>
 
 struct trp_node;
 class NdbApiSignal;
 struct LinearSectionPtr;
 struct GenericSectionPtr;
+struct TFBuffer;
+class trp_client;
 
-class trp_client
+class trp_client : TransporterSendBufferHandle
 {
   friend class TransporterFacade;
 public:
@@ -46,6 +50,7 @@ public:
   void complete_poll();
   void wakeup();
 
+  void flush_send_buffers();
   int do_forceSend(int val = 1);
 
   int raw_sendSignal(const NdbApiSignal*, Uint32 nodeId);
@@ -71,10 +76,29 @@ public:
   /**
   * This sendSignal variant can be called on any trp_client
   *   but performs the send on the trp_client object that
-   *   is currently receiving
+   *   is currently receiving (m_poll_owner)
+   *
+   * This variant does flush the thread-local send-buffer
    */
   int safe_sendSignal(const NdbApiSignal*, Uint32 nodeId);
 
+  /**
+   * This sendSignal variant can be called on any trp_client
+   *   but performs the send on the trp_client object that
+   *   is currently receiving (m_poll_owner)
+   *
+   * This variant does not flush the thread-local send-buffer
+   */
+  int safe_noflush_sendSignal(const NdbApiSignal*, Uint32 nodeId);
+private:
+  /**
+   * TransporterSendBufferHandle interface
+   */
+  virtual Uint32 *getWritePtr(NodeId node, Uint32 lenBytes, Uint32 prio,
+                              Uint32 max_use);
+  virtual Uint32 updateWritePtr(NodeId node, Uint32 lenBytes, Uint32 prio);
+  virtual bool forceSend(NodeId node);
+
 private:
   Uint32 m_blockNo;
   TransporterFacade * m_facade;
@@ -82,15 +106,41 @@ private:
   /**
    * This is used for polling
    */
+public:
+  NdbMutex* m_mutex; // thread local mutex...
+private:
   struct PollQueue
   {
+    PollQueue();
+    void assert_destroy() const;
+
     bool m_locked;
     bool m_poll_owner;
-    bool m_waiting;
+    bool m_poll_queue;
+    enum { PQ_WOKEN, PQ_IDLE, PQ_WAITING } m_waiting;
     trp_client *m_prev;
     trp_client *m_next;
     NdbCondition * m_condition;
+
+    /**
+     * This is called by poll owner
+     *   before doing external poll
+     */
+    void start_poll(trp_client* self);
+
+    bool lock_client(trp_client*);
+    bool check_if_locked(const trp_client*) const;
+    Uint32 m_locked_cnt;
+    trp_client * m_locked_clients[16];
   } m_poll;
+
+  /**
+   * This is used for sending
+   */
+  Uint32 m_send_nodes_cnt;
+  Uint8 m_send_nodes_list[MAX_NODES];
+  NodeBitmask m_send_nodes_mask;
+  TFBuffer* m_send_buffers;
 };
 
 class PollGuard
@@ -115,7 +165,7 @@ inline
 void
 trp_client::lock()
 {
-  NdbMutex_Lock(m_facade->theMutexPtr);
+  NdbMutex_Lock(m_mutex);
   assert(m_poll.m_locked == false);
   m_poll.m_locked = true;
 }
@@ -124,9 +174,10 @@ inline
 void
 trp_client::unlock()
 {
+  assert(m_send_nodes_mask.isclear()); // Nothing unsent when unlocking...
   assert(m_poll.m_locked == true);
   m_poll.m_locked = false;
-  NdbMutex_Unlock(m_facade->theMutexPtr);
+  NdbMutex_Unlock(m_mutex);
 }
 
 inline
@@ -141,7 +192,7 @@ int
 trp_client::raw_sendSignal(const NdbApiSignal * signal, Uint32 nodeId)
 {
   assert(m_poll.m_locked);
-  return m_facade->sendSignal(signal, nodeId);
+  return m_facade->sendSignal(this, signal, nodeId);
 }
 
 inline
@@ -150,7 +201,7 @@ trp_client::raw_sendSignal(const NdbApiS
                            const LinearSectionPtr ptr[3], Uint32 secs)
 {
   assert(m_poll.m_locked);
-  return m_facade->sendSignal(signal, nodeId, ptr, secs);
+  return m_facade->sendSignal(this, signal, nodeId, ptr, secs);
 }
 
 inline
@@ -159,7 +210,7 @@ trp_client::raw_sendSignal(const NdbApiS
                            const GenericSectionPtr ptr[3], Uint32 secs)
 {
   assert(m_poll.m_locked);
-  return m_facade->sendSignal(signal, nodeId, ptr, secs);
+  return m_facade->sendSignal(this, signal, nodeId, ptr, secs);
 }
 
 inline
@@ -168,7 +219,7 @@ trp_client::raw_sendFragmentedSignal(con
                                      const LinearSectionPtr ptr[3], Uint32 secs)
 {
   assert(m_poll.m_locked);
-  return m_facade->sendFragmentedSignal(signal, nodeId, ptr, secs);
+  return m_facade->sendFragmentedSignal(this, signal, nodeId, ptr, secs);
 }
 
 inline
@@ -178,7 +229,33 @@ trp_client::raw_sendFragmentedSignal(con
                                      Uint32 secs)
 {
   assert(m_poll.m_locked);
-  return m_facade->sendFragmentedSignal(signal, nodeId, ptr, secs);
+  return m_facade->sendFragmentedSignal(this, signal, nodeId, ptr, secs);
+}
+
+inline
+trp_client::PollQueue::PollQueue()
+{
+  m_waiting = PQ_IDLE;
+  m_locked = false;
+  m_poll_owner = false;
+  m_poll_queue = false;
+  m_next = 0;
+  m_prev = 0;
+  m_condition = NdbCondition_Create();
+  m_locked_cnt = 0;
+}
+
+inline
+void
+trp_client::PollQueue::assert_destroy() const
+{
+  assert(m_waiting == PQ_IDLE);
+  assert(m_locked == false);
+  assert(m_poll_owner == false);
+  assert(m_poll_queue == false);
+  assert(m_next == 0);
+  assert(m_prev == 0);
+  assert(m_locked_cnt == 0);
 }
 
 #endif

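The m_waiting flag above grew from a bool into the tri-state
PQ_WOKEN/PQ_IDLE/PQ_WAITING. The extra PQ_WOKEN state lets a wakeup
that arrives before the client actually blocks be remembered rather
than lost. A sketch of the idea using the standard library (the real
code uses NdbCondition/NdbMutex):

  #include <condition_variable>
  #include <mutex>

  enum Waiting { PQ_IDLE, PQ_WAITING, PQ_WOKEN };

  struct Waiter {
    std::mutex              m;
    std::condition_variable cv;
    Waiting                 state = PQ_IDLE;

    void wait() {
      std::unique_lock<std::mutex> g(m);
      if (state == PQ_WOKEN) {      // wakeup raced ahead of us
        state = PQ_IDLE;
        return;
      }
      state = PQ_WAITING;
      cv.wait(g, [this] { return state == PQ_WOKEN; });
      state = PQ_IDLE;
    }

    void wake() {
      std::lock_guard<std::mutex> g(m);
      state = PQ_WOKEN;             // remembered even if nobody waits yet
      cv.notify_one();
    }
  };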