List:Internals« Previous MessageNext Message »
From:mikael Date:March 22 2005 1:14am
Subject:bk commit into 5.0 tree (mronstrom:1.1840)
View as plain text  
Below is the list of changes that have just been committed into a local
5.0 repository of mikron. When mikron does a push these changes will
be propagated to the main repository and, within 24 hours after the
push, to the public repository.
For information on how to access the public repository
see http://www.mysql.com/doc/I/n/Installing_source_tree.html

ChangeSet
  1.1840 05/03/22 02:14:13 mronstrom@stripped +9 -0
  WL 2405:
  Integrated scan and dictSignal into the poll owner handling for
  improved performance of single-threaded usage of the NDB API.
  Still some things left to do.

  ndb/src/ndbapi/TransporterFacade.hpp
    1.25 05/03/22 02:14:05 mronstrom@stripped +25 -7
    New PollGuard class implementation
    Changed cond wait queue to use NdbWaiter instead of Ndb object

  ndb/src/ndbapi/TransporterFacade.cpp
    1.36 05/03/22 02:14:05 mronstrom@stripped +149 -14
    New PollGuard class implementation
    Changed cond wait queue to use NdbWaiter instead of Ndb object

  ndb/src/ndbapi/Ndbinit.cpp
    1.31 05/03/22 02:14:05 mronstrom@stripped +2 -0
    new variables to initialise in NdbWaiter

  ndb/src/ndbapi/Ndbif.cpp
    1.30 05/03/22 02:14:04 mronstrom@stripped +51 -86
    Moved lots of code to PollGuard class, added some comments
    and used PollGuard object

  ndb/src/ndbapi/NdbWaiter.hpp
    1.2 05/03/22 02:14:04 mronstrom@stripped +19 -2
    NdbWaiter class expanded; it is now the object placed in the cond wait
    queue, since the Ndb object is not always around.
    Don't signal the condition when the waiter is the poll owner.

  ndb/src/ndbapi/NdbScanOperation.cpp
    1.64 05/03/22 02:14:04 mronstrom@stripped +22 -25
    unlock_and_signal is now hidden: it is done when poll_guard goes out of scope

  ndb/src/ndbapi/NdbDictionaryImpl.cpp
    1.66 05/03/22 02:14:04 mronstrom@stripped +11 -20
    WL 2405 Better performance for single threaded usage of NDB API:
    Introduced a PollGuard class
    wait_n_unlock takes care of what wait on NdbWaiter did, plus the
    mutex unlock and the poll owner handling

  ndb/include/ndbapi/NdbScanOperation.hpp
    1.30 05/03/22 02:14:04 mronstrom@stripped +3 -1
    WL 2405 Better performance for single threaded usage of NDB API:
    Introduced a PollGuard class

  ndb/include/ndbapi/Ndb.hpp
    1.41 05/03/22 02:14:04 mronstrom@stripped +3 -3
    WL 2405 Better performance for single threaded usage of NDB API:
    Introduced a PollGuard class

# This is a BitKeeper patch.  What follows are the unified diffs for the
# set of deltas contained in the patch.  The rest of the patch, the part
# that BitKeeper cares about, is below these diffs.
# User:	mronstrom
# Host:	c-dc08e253.1238-1-64736c10.cust.bredbandsbolaget.se
# Root:	/Users/mikron/wl2405

--- 1.40/ndb/include/ndbapi/Ndb.hpp	Fri Feb  4 14:40:48 2005
+++ 1.41/ndb/include/ndbapi/Ndb.hpp	Tue Mar 22 02:14:04 2005
@@ -985,6 +985,7 @@
 class NdbBlob;
 class NdbReceiver;
 class TransporterFacade;
+class PollGuard;
 
 typedef void (* NdbEventCallback)(NdbEventOperation*, Ndb*, void*);
 
@@ -1586,10 +1587,9 @@
   Uint32  pollCompleted(NdbTransaction** aCopyArray);
   void    sendPrepTrans(int forceSend);
   void    reportCallback(NdbTransaction** aCopyArray, Uint32 aNoOfComplTrans);
-  int     poll_trans(int milliSecs, int noOfEventsToWaitFor,
-                     TransporterFacade *tp);
+  int     poll_trans(int milliSecs, int noOfEventsToWaitFor, PollGuard *pg);
   void    waitCompletedTransactions(int milliSecs, int noOfEventsToWaitFor,
-		                    TransporterFacade *tp);
+		                    PollGuard *pg);
   void    completedTransaction(NdbTransaction* aTransaction);
   void    completedScanTransaction(NdbTransaction* aTransaction);
 

--- 1.29/ndb/include/ndbapi/NdbScanOperation.hpp	Mon Jan 10 01:25:08 2005
+++ 1.30/ndb/include/ndbapi/NdbScanOperation.hpp	Tue Mar 22 02:14:04 2005
@@ -21,6 +21,7 @@
 
 class NdbBlob;
 class NdbResultSet;
+class PollGuard;
 
 /**
  * @class NdbScanOperation
@@ -159,7 +160,8 @@
   int nextResultImpl(bool fetchAllowed = true, bool forceSend = false);
   virtual void release();
   
-  int close_impl(class TransporterFacade*, bool forceSend = false);
+  int close_impl(class TransporterFacade*, bool forceSend,
+                 PollGuard *poll_guard);
 
   // Overloaded methods from NdbCursorOperation
   int executeCursor(int ProcessorId);

--- 1.65/ndb/src/ndbapi/NdbDictionaryImpl.cpp	Thu Jan 27 18:38:14 2005
+++ 1.66/ndb/src/ndbapi/NdbDictionaryImpl.cpp	Tue Mar 22 02:14:04 2005
@@ -946,7 +946,8 @@
     m_buffer.clear();
 
     // Protected area
-    m_transporter->lock_mutex();
+    Uint32 block_no; //TODO fix this
+    PollGuard poll_guard(m_transporter, &m_waiter, block_no);
     Uint32 aNodeId;
     if (useMasterNodeId) {
       if ((m_masterNodeId == 0) ||
@@ -959,7 +960,6 @@
     }
     if(aNodeId == 0){
       m_error.code= 4009;
-      m_transporter->unlock_mutex();
       DBUG_RETURN(-1);
     }
     {
@@ -980,21 +980,15 @@
 	r = m_transporter->sendSignal(signal, aNodeId);
       }
       if(r != 0){
-	m_transporter->unlock_mutex();
 	continue;
       }
     }
     
     m_error.code= 0;
-    
-    m_waiter.m_node = aNodeId;
-    m_waiter.m_state = wst;
-
-    m_waiter.wait(theWait);
-    m_transporter->unlock_mutex();    
+    int ret_val= poll_guard.wait_n_unlock(theWait, aNodeId, wst);
     // End of Protected area  
     
-    if(m_waiter.m_state == NO_WAIT && m_error.code == 0){
+    if(ret_val == 0 && m_error.code == 0){
       // Normal return
       DBUG_RETURN(0);
     }
@@ -1002,7 +996,7 @@
     /**
      * Handle error codes
      */
-    if(m_waiter.m_state == WAIT_NODE_FAILURE)
+    if(ret_val == -2) //WAIT_NODE_FAILURE
       continue;
 
     if ( (temporaryMask & m_error.code) != 0 ) {
@@ -3064,26 +3058,23 @@
   for (Uint32 i = 0; i < RETRIES; i++) {
     m_buffer.clear();
     // begin protected
-    m_transporter->lock_mutex();
+    Uint32 block_no; //TODO fix this
+    PollGuard poll_guard(m_transporter, &m_waiter, block_no);
     Uint16 aNodeId = m_transporter->get_an_alive_node();
     if (aNodeId == 0) {
       m_error.code= 4009;
-      m_transporter->unlock_mutex();
       return -1;
     }
     if (m_transporter->sendSignal(signal, aNodeId) != 0) {
-      m_transporter->unlock_mutex();
       continue;
     }
     m_error.code= 0;
-    m_waiter.m_node = aNodeId;
-    m_waiter.m_state = WAIT_LIST_TABLES_CONF;
-    m_waiter.wait(WAITFOR_RESPONSE_TIMEOUT);
-    m_transporter->unlock_mutex();    
+    int ret_val= poll_guard.wait_n_unlock(WAITFOR_RESPONSE_TIMEOUT,
+                                          aNodeId, WAIT_LIST_TABLES_CONF);
     // end protected
-    if (m_waiter.m_state == NO_WAIT && m_error.code == 0)
+    if (ret_val == 0 && m_error.code == 0)
       return 0;
-    if (m_waiter.m_state == WAIT_NODE_FAILURE)
+    if (ret_val == -2) //WAIT_NODE_FAILURE
       continue;
     return -1;
   }

--- 1.63/ndb/src/ndbapi/NdbScanOperation.cpp	Wed Jan 19 18:33:33 2005
+++ 1.64/ndb/src/ndbapi/NdbScanOperation.cpp	Tue Mar 22 02:14:04 2005
@@ -452,7 +452,8 @@
   
   Uint32 nodeId = theNdbCon->theDBnode;
   TransporterFacade* tp = TransporterFacade::instance();
-  Guard guard(tp->theMutexPtr);
+  PollGuard poll_guard(tp, &theNdb->theImpl->theWaiter,
+                       theNdb->theNdbBlockNumber);
   if(theError.code)
     return -1;
 
@@ -487,10 +488,8 @@
 	/**
 	 * No completed...
 	 */
-	theNdb->theImpl->theWaiter.m_node = nodeId;
-	theNdb->theImpl->theWaiter.m_state = WAIT_SCAN;
-	int return_code = theNdb->receiveResponse(WAITFOR_SCAN_TIMEOUT);
-	if (return_code == 0 && seq == tp->getNodeSequence(nodeId)) {
+        int ret_code= poll_guard.wait_scan(WAITFOR_SCAN_TIMEOUT, nodeId);
+	if (ret_code == 0 && seq == tp->getNodeSequence(nodeId)) {
 	  continue;
 	} else {
 	  idx = last;
@@ -647,9 +646,9 @@
 	       m_sent_receivers_count);
     
     TransporterFacade* tp = TransporterFacade::instance();
-    Guard guard(tp->theMutexPtr);
-    close_impl(tp, forceSend);
-    
+    PollGuard poll_guard(tp, &theNdb->theImpl->theWaiter,
+                         theNdb->theNdbBlockNumber);
+    close_impl(tp, forceSend, &poll_guard);
   } while(0);
   
   theNdbCon->theScanningOp = 0;
@@ -1313,7 +1312,8 @@
     if(fetchAllowed){
       if(DEBUG_NEXT_RESULT) ndbout_c("performing fetch...");
       TransporterFacade* tp = TransporterFacade::instance();
-      Guard guard(tp->theMutexPtr);
+      PollGuard poll_guard(tp, &theNdb->theImpl->theWaiter,
+                           theNdb->theNdbBlockNumber);
       if(theError.code)
 	return -1;
       Uint32 seq = theNdbCon->theNodeSequence;
@@ -1323,10 +1323,8 @@
 	Uint32 tmp = m_sent_receivers_count;
 	s_idx = m_current_api_receiver; 
 	while(m_sent_receivers_count > 0 && !theError.code){
-	  theNdb->theImpl->theWaiter.m_node = nodeId;
-	  theNdb->theImpl->theWaiter.m_state = WAIT_SCAN;
-	  int return_code = theNdb->receiveResponse(WAITFOR_SCAN_TIMEOUT);
-	  if (return_code == 0 && seq == tp->getNodeSequence(nodeId)) {
+          int ret_code= poll_guard.wait_scan(WAITFOR_SCAN_TIMEOUT, nodeId);
+	  if (ret_code == 0 && seq == tp->getNodeSequence(nodeId)) {
 	    continue;
 	  }
 	  if(DEBUG_NEXT_RESULT) ndbout_c("return -1");
@@ -1456,7 +1454,9 @@
 }
 
 int
-NdbScanOperation::close_impl(TransporterFacade* tp, bool forceSend){
+NdbScanOperation::close_impl(TransporterFacade* tp, bool forceSend,
+                             PollGuard *poll_guard)
+{
   Uint32 seq = theNdbCon->theNodeSequence;
   Uint32 nodeId = theNdbCon->theDBnode;
   
@@ -1471,9 +1471,7 @@
    */
   while(theError.code == 0 && m_sent_receivers_count)
   {
-    theNdb->theImpl->theWaiter.m_node = nodeId;
-    theNdb->theImpl->theWaiter.m_state = WAIT_SCAN;
-    int return_code = theNdb->receiveResponse(WAITFOR_SCAN_TIMEOUT);
+    int return_code= poll_guard->wait_scan(WAITFOR_SCAN_TIMEOUT, nodeId);
     switch(return_code){
     case 0:
       break;
@@ -1541,9 +1539,7 @@
    */
   while(m_sent_receivers_count+m_api_receivers_count+m_conf_receivers_count)
   {
-    theNdb->theImpl->theWaiter.m_node = nodeId;
-    theNdb->theImpl->theWaiter.m_state = WAIT_SCAN;
-    int return_code = theNdb->receiveResponse(WAITFOR_SCAN_TIMEOUT);
+    int return_code= poll_guard->wait_scan(WAITFOR_SCAN_TIMEOUT, nodeId);
     switch(return_code){
     case 0:
       break;
@@ -1583,12 +1579,13 @@
 {
   
   TransporterFacade* tp = TransporterFacade::instance();
-  Guard guard(tp->theMutexPtr);
+  PollGuard poll_guard(tp, &theNdb->theImpl->theWaiter,
+                       theNdb->theNdbBlockNumber);
   Uint32 nodeId = theNdbCon->theDBnode;
   
   {
     int res;
-    if((res= close_impl(tp, forceSend)))
+    if((res= close_impl(tp, forceSend, &poll_guard)))
     {
       return res;
     }
@@ -1602,7 +1599,6 @@
   theError.code = 0;
   if (doSendScan(nodeId) == -1)
     return -1;
-  
   return 0;
 }
 
@@ -1612,8 +1608,9 @@
   
   {
     TransporterFacade* tp = TransporterFacade::instance();
-    Guard guard(tp->theMutexPtr);
-    res= close_impl(tp, forceSend);
+    PollGuard poll_guard(tp, &theNdb->theImpl->theWaiter,
+                         theNdb->theNdbBlockNumber);
+    res= close_impl(tp, forceSend, &poll_guard);
   }
 
   if(!res)

--- 1.29/ndb/src/ndbapi/Ndbif.cpp	Fri Feb  4 19:59:52 2005
+++ 1.30/ndb/src/ndbapi/Ndbif.cpp	Tue Mar 22 02:14:04 2005
@@ -334,6 +334,7 @@
   const Uint32 tFirstData = *tDataPtr;
   const Uint32 tLen = aSignal->getLength();
   void * tFirstDataPtr;
+  NdbWaiter *t_waiter;
 
   /*
     In order to support 64 bit processes in the application we need to use
@@ -835,11 +836,32 @@
   } 
   default:
     goto InvalidSignal;
-  }//switch
-  
-  if (theImpl->theWaiter.m_state == NO_WAIT) {
-    // Wake up the thread waiting for response
-    NdbCondition_Signal(theImpl->theWaiter.m_condition);
+  }//swich
+
+  t_waiter= &theImpl->theWaiter;
+  if (t_waiter->get_state() == NO_WAIT)
+  {
+    /*
+      If our waiter object is the owner of the "poll rights", then we
+      can simply return, we will return from this routine to the
+      place where external_poll was called. From there it will move
+      the "poll ownership" to a new thread if available.
+
+      If our waiter object doesn't own the "poll rights", then we must
+      signal the thread from where this waiter object called
+      its conditional wait. This will wake up this thread so that it
+      can continue its work.
+    */
+    TransporterFacade *tp= TransporterFacade::instance();
+    if (tp->get_poll_owner() != t_waiter)
+    {
+      /*
+         Wake up the thread waiting for response and remove it from queue
+         of objects waiting for receive completion
+      */
+      tp->remove_from_cond_wait_queue(t_waiter);
+      t_waiter->cond_signal();
+    }
   }//if
   return;
 
@@ -895,9 +917,17 @@
         (theNoOfCompletedTransactions >= theMinNoOfEventsToWakeUp)) {
       theMinNoOfEventsToWakeUp = 0;
       TransporterFacade *tp = TransporterFacade::instance();
-      if (this != tp->get_poll_owner()) {
-        TransporterFacade::instance()->remove_cond_wait_queue(this);
-        cond_signal();
+      NdbWaiter *t_waiter= &theImpl->theWaiter;
+      if (tp->get_poll_owner() != t_waiter) {
+        /*
+          When we come here, this is executed by the thread owning the "poll
+          rights". This thread is not where our waiter object belongs.
+          Thus we wake up the thread owning this waiter object but first
+          we must remove it from the conditional wait queue so that we
+          don't assign it as poll owner later on.
+        */
+        tp->remove_from_cond_wait_queue(t_waiter);
+        t_waiter->cond_signal();
       }
       return;
     }//if
@@ -1158,7 +1188,7 @@
 void	
 Ndb::waitCompletedTransactions(int aMilliSecondsToWait, 
 			       int noOfEventsToWaitFor,
-                               TransporterFacade *tp)
+                               PollGuard *poll_guard)
 {
   theImpl->theWaiter.m_state = NO_WAIT; 
   /**
@@ -1172,55 +1202,13 @@
   theMinNoOfEventsToWakeUp = noOfEventsToWaitFor;
   do {
     if (waitTime < 1000) waitTime = 1000;
-    Ndb *t_poll_owner = tp->get_poll_owner();
-    if (t_poll_owner != NULL &&
-        t_poll_owner != this) {
-      /*
-        We didn't get hold of the poll "right". We will sleep on a
-        conditional mutex until the thread owning the poll "right"
-        will wake us up after all data is received. If no data arrives
-        we will wake up eventually due to the timeout.
-      */
-      cond_wait_index = tp->put_cond_wait_queue(this);
-      NdbCondition_WaitTimeout(theImpl->theWaiter.m_condition,
-                               (NdbMutex*)theImpl->theWaiter.m_mutex,
-                                waitTime);
-      if (cond_wait_index != TransporterFacade::MAX_NO_THREADS)
-        tp->remove_cond_wait_queue(this);
-    } else {
-      /*
-        We got the poll "right" and we poll until data is received. After
-        receiving data we will check if all data is received, if not we
-        poll again.
-      */
-      tp->set_poll_owner(this);
-      tp->external_poll(waitTime);
-    }
+    poll_guard->wait_for_input(waitTime);
     if (theNoOfCompletedTransactions >= (Uint32)noOfEventsToWaitFor) {
       break;
     }//if
     theMinNoOfEventsToWakeUp = noOfEventsToWaitFor;
     waitTime = (int)(maxTime - NdbTick_CurrentMillisecond());
   } while (waitTime > 0);
-  /*
-   When completing the poll for this thread we must return the poll
-   ownership if we own it. We will give it to the last thread that
-   came here (the most recent) which is likely to be the one also
-   last to complete. We will remove that thread from the conditional
-   wait queue and set him as the new owner of the poll "right".
-   We will wait however with the signal until we have unlocked the
-   mutex for performance reasons.
-   See Stevens book on Unix NetworkProgramming: The Sockets Networking
-   API Volume 1 Third Edition on page 703-704 for a discussion on this
-   subject.
-  */
-  if (tp->get_poll_owner() == this) {
-    tp->set_poll_owner(NULL);
-    cond_signal_ndb = tp->rem_last_from_cond_wait_queue();
-    if (cond_signal_ndb != NULL)
-      tp->set_poll_owner(cond_signal_ndb);
-  }
-  return;
 }//Ndb::waitCompletedTransactions()
 
 void Ndb::cond_signal()
@@ -1254,15 +1242,15 @@
 int	
 Ndb::sendPollNdb(int aMillisecondNumber, int minNoOfEventsToWakeup, int forceSend)
 {
-  TransporterFacade *tp = TransporterFacade::instance();
-  tp->lock_mutex();
+  PollGuard pg(TransporterFacade::instance(), &theImpl->theWaiter,
+               theNdbBlockNumber);
   sendPrepTrans(forceSend);
-  return poll_trans(aMillisecondNumber, minNoOfEventsToWakeup, tp);
+  return poll_trans(aMillisecondNumber, minNoOfEventsToWakeup, &pg);
 }
 
 int
 Ndb::poll_trans(int aMillisecondNumber, int minNoOfEventsToWakeup,
-                TransporterFacade *tp)
+                PollGuard *pg)
 {
   NdbTransaction* tConArray[1024];
   Uint32         tNoCompletedTransactions;
@@ -1272,19 +1260,15 @@
   }//if
   if ((theNoOfCompletedTransactions < (Uint32)minNoOfEventsToWakeup) &&
       (aMillisecondNumber > 0)) {
-    waitCompletedTransactions(aMillisecondNumber, minNoOfEventsToWakeup, tp);
+    waitCompletedTransactions(aMillisecondNumber, minNoOfEventsToWakeup, pg);
     tNoCompletedTransactions = pollCompleted(tConArray);
   } else {
     tNoCompletedTransactions = pollCompleted(tConArray);
   }//if
-  tp->unlock_mutex();
-  if (cond_signal_ndb) {
-    cond_signal_ndb->cond_signal();
-    cond_signal_ndb = NULL;
-  }
+  pg->unlock_and_signal();
   reportCallback(tConArray, tNoCompletedTransactions);
   return tNoCompletedTransactions;
-}//Ndb::sendPollNdb()
+}
 
 /*****************************************************************************
 int pollNdb(int aMillisecondNumber, int minNoOfEventsToWakeup);
@@ -1296,29 +1280,10 @@
 int	
 Ndb::pollNdb(int aMillisecondNumber, int minNoOfEventsToWakeup)
 {
-  TransporterFacade *tp = TransporterFacade::instance();
-  tp->lock_mutex();
-  return poll_trans(aMillisecondNumber, minNoOfEventsToWakeup, tp);
-}
-/*
-  Duplicated code: moved into poll_trans(..)
-
-  if ((minNoOfEventsToWakeup == 0) ||
-      ((Uint32)minNoOfEventsToWakeup > theNoOfSentTransactions)) {
-    minNoOfEventsToWakeup = theNoOfSentTransactions;
-  }//if
-  if ((theNoOfCompletedTransactions < (Uint32)minNoOfEventsToWakeup) &&
-      (aMillisecondNumber > 0)) {
-    waitCompletedTransactions(aMillisecondNumber, minNoOfEventsToWakeup);
-    tNoCompletedTransactions = pollCompleted(tConArray);
-  } else {
-    tNoCompletedTransactions = pollCompleted(tConArray);
-  }//if
-  TransporterFacade::instance()->unlock_mutex();
-  reportCallback(tConArray, tNoCompletedTransactions);
-  return tNoCompletedTransactions;
+  PollGuard pg(TransporterFacade::instance(), &theImpl->theWaiter,
+               theNdbBlockNumber);
+  return poll_trans(aMillisecondNumber, minNoOfEventsToWakeup, &pg);
 }
-*/
 
 /*****************************************************************************
 int receiveOptimisedResponse();
@@ -1332,7 +1297,7 @@
 Ndb::receiveResponse(int waitTime){
   int tResultCode;
   TransporterFacade::instance()->checkForceSend(theNdbBlockNumber);
-  
+
   theImpl->theWaiter.wait(waitTime);
   
   if(theImpl->theWaiter.m_state == NO_WAIT) {

--- 1.30/ndb/src/ndbapi/Ndbinit.cpp	Fri Feb  4 13:56:37 2005
+++ 1.31/ndb/src/ndbapi/Ndbinit.cpp	Tue Mar 22 02:14:05 2005
@@ -228,6 +228,8 @@
   m_node = 0;
   m_state = NO_WAIT;
   m_mutex = 0;
+  m_poll_owner= false;
+  m_cond_wait_index= TransporterFacade::MAX_NO_THREADS;
   m_condition = NdbCondition_Create();
 }
 

--- 1.35/ndb/src/ndbapi/TransporterFacade.cpp	Fri Feb  4 14:40:48 2005
+++ 1.36/ndb/src/ndbapi/TransporterFacade.cpp	Tue Mar 22 02:14:05 2005
@@ -499,7 +499,7 @@
   doubly linked list. Finally it assigns the ndb object reference to the
   entry.
 */
-Uint32 TransporterFacade::put_cond_wait_queue(Ndb *aNdb)
+Uint32 TransporterFacade::put_in_cond_wait_queue(NdbWaiter *aWaiter)
 {
   /*
    Get first free entry
@@ -519,8 +519,8 @@
     cond_wait_array[last_in_cond_wait].next_cond_wait = index;
   last_in_cond_wait = index;
 
-  cond_wait_array[index].ndb_cond_wait_object = aNdb;
-  aNdb->set_cond_wait_index(index);
+  cond_wait_array[index].cond_wait_object = aWaiter;
+  aWaiter->set_cond_wait_index(index);
   return index;
 }
 
@@ -532,11 +532,11 @@
   NULLifies the ndb object reference entry and sets the index in the
   Ndb object to NIL (=MAX_NO_THREADS)
 */
-void TransporterFacade::remove_cond_wait_queue(Ndb *aNdb)
+void TransporterFacade::remove_from_cond_wait_queue(NdbWaiter *aWaiter)
 {
-  Uint32 index = aNdb->get_cond_wait_index();
+  Uint32 index = aWaiter->get_cond_wait_index();
   assert(index < MAX_NO_THREADS &&
-         cond_wait_array[index].ndb_cond_wait_object == aNdb);
+         cond_wait_array[index].cond_wait_object == aWaiter);
   /*
    Remove from doubly linked list
   */
@@ -558,23 +558,23 @@
   cond_wait_array[index].prev_cond_wait = MAX_NO_THREADS;
   first_free_cond_wait = index;
 
-  cond_wait_array[index].ndb_cond_wait_object = NULL;
-  aNdb->set_cond_wait_index(MAX_NO_THREADS);
+  cond_wait_array[index].cond_wait_object = NULL;
+  aWaiter->set_cond_wait_index(MAX_NO_THREADS);
 }
 
 /*
   Get the latest Ndb object from the conditional wait queue
   and also remove it from the list.
 */
-Ndb* TransporterFacade::rem_last_from_cond_wait_queue()
+NdbWaiter* TransporterFacade::rem_last_from_cond_wait_queue()
 {
-  Ndb *tNdb;
+  NdbWaiter *tWaiter;
   Uint32 index = last_in_cond_wait;
   if (last_in_cond_wait == MAX_NO_THREADS)
     return NULL;
-  tNdb = cond_wait_array[index].ndb_cond_wait_object;
-  remove_cond_wait_queue(tNdb);
-  return tNdb;
+  tWaiter = cond_wait_array[index].cond_wait_object;
+  remove_from_cond_wait_queue(tWaiter);
+  return tWaiter;
 }
 
 void TransporterFacade::init_cond_wait_queue()
@@ -590,7 +590,7 @@
   */
   first_free_cond_wait = 0;
   for (i = 0; i < MAX_NO_THREADS; i++) {
-    cond_wait_array[i].ndb_cond_wait_object = NULL;
+    cond_wait_array[i].cond_wait_object = NULL;
     cond_wait_array[i].next_cond_wait = i+1;
     cond_wait_array[i].prev_cond_wait = MAX_NO_THREADS;
   }
@@ -1235,6 +1235,141 @@
   m_objectExecute[number] = oe;
   m_statusFunction[number] = 0;
   return 0;
+}
+
+PollGuard::PollGuard(TransporterFacade *tp, NdbWaiter *aWaiter,
+                     Uint32 block_no)
+{
+  m_tp= tp;
+  m_waiter= aWaiter;
+  m_locked= true;
+  m_block_no= block_no;
+  tp->lock_mutex();
+}
+
+/*
+  This is a common routine for possibly forcing the send of buffered signals
+  and receiving response the thread is waiting for. It is designed to be
+  useful from:
+  1) PK, UK lookups using the asynchronous interface
+     This routine uses the wait_for_input routine instead since it has
+     special end conditions due to the asynchronous nature of its usage.
+  2) Scans
+  3) dictSignal
+  It uses a NdbWaiter object to wait on the events and this object is
+  linked into the conditional wait queue. Thus this object contains
+  a reference to its place in the queue.
+
+  It replaces the method receiveResponse previously used on the Ndb object
+*/
+int PollGuard::wait_n_unlock(int wait_time, NodeId nodeId, Uint32 state)
+{
+  int ret_val;
+  m_waiter->set_node(nodeId);
+  m_waiter->set_state(state);
+  ret_val= wait_for_input_in_loop(wait_time);
+  unlock_and_signal();
+  return ret_val;
+}
+
+int PollGuard::wait_scan(int wait_time, NodeId nodeId)
+{
+  m_waiter->set_node(nodeId);
+  m_waiter->set_state(WAIT_SCAN);
+  return wait_for_input_in_loop(wait_time);
+}
+
+int PollGuard::wait_for_input_in_loop(int wait_time)
+{
+  int ret_val;
+  m_tp->checkForceSend(m_block_no);
+  NDB_TICKS curr_time = NdbTick_CurrentMillisecond();
+  NDB_TICKS max_time = curr_time + (NDB_TICKS)wait_time;
+  do
+  {
+    wait_for_input(wait_time);
+    Uint32 state= m_waiter->get_state();
+    if (state == NO_WAIT)
+      return 0;
+    else if (state == WAIT_NODE_FAILURE)
+    {
+      ret_val= -2;
+      break;
+    }
+    wait_time= max_time - NdbTick_CurrentMillisecond();
+    if (wait_time <= 0)
+    {
+      m_waiter->set_state(WST_WAIT_TIMEOUT);
+      ret_val= -1;
+      break;
+    }
+  } while (1);
+#ifdef VM_TRACE
+  ndbout << "ERR: receiveResponse - theImpl->theWaiter.m_state = ";
+  ndbout << m_waiter->get_state() << endl;
+#endif
+  m_waiter->set_state(NO_WAIT);
+  return ret_val;
+}
+
+void PollGuard::wait_for_input(int wait_time)
+{
+  NdbWaiter *t_poll_owner= m_tp->get_poll_owner();
+  if (t_poll_owner != NULL && t_poll_owner != m_waiter)
+  {
+    /*
+      We didn't get hold of the poll "right". We will sleep on a
+      conditional mutex until the thread owning the poll "right"
+      will wake us up after all data is received. If no data arrives
+      we will wake up eventually due to the timeout.
+    */
+    Uint32 cond_wait_index= m_tp->put_in_cond_wait_queue(m_waiter);
+    m_waiter->wait(wait_time);
+    if (cond_wait_index != TransporterFacade::MAX_NO_THREADS)
+      m_tp->remove_from_cond_wait_queue(m_waiter);
+  }
+  else
+  {
+    /*
+      We got the poll "right" and we poll until data is received. After
+      receiving data we will check if all data is received, if not we
+      poll again.
+    */
+    m_tp->set_poll_owner(m_waiter);
+    m_waiter->set_poll_owner(true);
+    m_tp->external_poll(wait_time);
+  }
+}
+
+void PollGuard::unlock_and_signal()
+{
+  NdbWaiter *t_signal_cond_waiter= 0;
+  if (!m_locked)
+    return;
+  /*
+   When completing the poll for this thread we must return the poll
+   ownership if we own it. We will give it to the last thread that
+   came here (the most recent) which is likely to be the one also
+   last to complete. We will remove that thread from the conditional
+   wait queue and set him as the new owner of the poll "right".
+   We will wait however with the signal until we have unlocked the
+   mutex for performance reasons.
+   See Stevens book on Unix NetworkProgramming: The Sockets Networking
+   API Volume 1 Third Edition on page 703-704 for a discussion on this
+   subject.
+  */
+  if (m_tp->get_poll_owner() == m_waiter)
+  {
+    m_waiter->set_poll_owner(false);
+    t_signal_cond_waiter= m_tp->rem_last_from_cond_wait_queue();
+    m_tp->set_poll_owner(t_signal_cond_waiter);
+    if (t_signal_cond_waiter)
+      t_signal_cond_waiter->set_poll_owner(true);
+  }
+  m_tp->unlock_mutex();
+  if (t_signal_cond_waiter)
+    t_signal_cond_waiter->cond_signal();
+  m_locked=false;
 }
 
 template class Vector<NodeStatusFunction>;

--- 1.24/ndb/src/ndbapi/TransporterFacade.hpp	Fri Feb  4 14:40:48 2005
+++ 1.25/ndb/src/ndbapi/TransporterFacade.hpp	Tue Mar 22 02:14:05 2005
@@ -34,6 +34,7 @@
 
 class Ndb;
 class NdbApiSignal;
+class NdbWaiter;
 
 typedef void (* ExecuteFunction)(void *, NdbApiSignal *, LinearSectionPtr ptr[3]);
 typedef void (* NodeStatusFunction)(void *, Uint32, bool nodeAlive, bool nfComplete);
@@ -137,19 +138,19 @@
   be the last to complete its reception.
 */
   void external_poll(Uint32 wait_time);
-  Ndb *get_poll_owner(void) const { return poll_owner; }
-  void set_poll_owner(Ndb *new_owner) { poll_owner = new_owner; }
-  Uint32 put_cond_wait_queue(Ndb *aNdb);
-  void remove_cond_wait_queue(Ndb* aNdb);
-  Ndb *rem_last_from_cond_wait_queue();
+  NdbWaiter* get_poll_owner(void) const { return poll_owner; }
+  void set_poll_owner(NdbWaiter* new_owner) { poll_owner= new_owner; }
+  Uint32 put_in_cond_wait_queue(NdbWaiter *aWaiter);
+  void remove_from_cond_wait_queue(NdbWaiter *aWaiter);
+  NdbWaiter* rem_last_from_cond_wait_queue();
 private:
   void init_cond_wait_queue();
   struct CondWaitQueueElement {
-    Ndb *ndb_cond_wait_object;
+    NdbWaiter *cond_wait_object;
     Uint32 next_cond_wait;
     Uint32 prev_cond_wait;
   };
-  Ndb *poll_owner;
+  NdbWaiter *poll_owner;
   CondWaitQueueElement cond_wait_array[MAX_NO_THREADS];
   Uint32 first_in_cond_wait;
   Uint32 first_free_cond_wait;
@@ -275,6 +276,23 @@
 
 public:
   GlobalDictCache m_globalDictCache;
+};
+
+class PollGuard
+{
+  public:
+  PollGuard(TransporterFacade *tp, NdbWaiter *aWaiter, Uint32 block_no);
+  ~PollGuard() { unlock_and_signal(); }
+  int wait_n_unlock(int wait_time, NodeId nodeId, Uint32 state);
+  int wait_for_input_in_loop(int wait_time);
+  void wait_for_input(int wait_time);
+  int wait_scan(int wait_time, NodeId nodeId);
+  void unlock_and_signal();
+  private:
+  TransporterFacade *m_tp;
+  NdbWaiter *m_waiter;
+  Uint32 m_block_no;
+  bool m_locked;
 };
 
 inline

--- 1.1/ndb/src/ndbapi/NdbWaiter.hpp	Mon Dec 20 12:36:04 2004
+++ 1.2/ndb/src/ndbapi/NdbWaiter.hpp	Tue Mar 22 02:14:04 2005
@@ -54,10 +54,19 @@
   void wait(int waitTime);
   void nodeFail(Uint32 node);
   void signal(Uint32 state);
+  void cond_signal();
+  void set_poll_owner(bool poll_owner) { m_poll_owner= poll_owner; }
+  Uint32 get_state() { return m_state; }
+  void set_state(Uint32 state) { m_state= state; }
+  void set_node(Uint32 node) { m_node= node; }
+  Uint32 get_cond_wait_index() { return m_cond_wait_index; }
+  void set_cond_wait_index(Uint32 index) { m_cond_wait_index= index; }
 
   Uint32 m_node;
   Uint32 m_state;
   void * m_mutex;
+  bool m_poll_owner;
+  Uint32 m_cond_wait_index;
   struct NdbCondition * m_condition;  
 };
 
@@ -88,7 +97,8 @@
 NdbWaiter::nodeFail(Uint32 aNodeId){
   if (m_state != NO_WAIT && m_node == aNodeId){
     m_state = WAIT_NODE_FAILURE;
-    NdbCondition_Signal(m_condition);
+    if (!m_poll_owner)
+      NdbCondition_Signal(m_condition);
   }
 }
 
@@ -96,7 +106,14 @@
 void 
 NdbWaiter::signal(Uint32 state){
   m_state = state;
-  NdbCondition_Signal(m_condition);
+  if (!m_poll_owner)
+    NdbCondition_Signal(m_condition);
 }
 
+inline
+void
+NdbWaiter::cond_signal()
+{
+  NdbCondition_Signal(m_condition);
+}
 #endif
Thread
bk commit into 5.0 tree (mronstrom:1.1840)mikael22 Mar