List:Commits« Previous MessageNext Message »
From:John David Duncan Date:June 16 2011 5:16am
Subject:bzr commit into mysql-5.1-telco-7.2 branch (john.duncan:4182)
View as plain text  
#At file:///Users/jdd/bzr-repo/working/cluster-7.2-multiwait/ based on revid:john.duncan@stripped

 4182 John David Duncan	2011-06-15
      This is a revised version of the Async API Multi-wait patch.  
      The top-level API is moved out of Ndb_cluster_connection into a new class 
      NdbPollGroup.  
      The NdbPollGroup (rather than the application code) owns the array of Ndb 
      objects used for waiting.    
      The "waiter" Ndb object is a dedicated Ndb owned by the NdbPollGroup (rather
      than the first Ndb in the user's list).
      There is still only one NdbPollGroup allowed per cluster connection, but now 
      that restriction could be lifted in a way that would not break existing code.

    added:
      storage/ndb/include/ndbapi/NdbPollGroup.hpp
      storage/ndb/src/ndbapi/NdbPollGroup.cpp
      storage/ndb/src/ndbapi/WakeupHandler.cpp
      storage/ndb/src/ndbapi/WakeupHandler.hpp
      storage/ndb/test/ndbapi/testAsynchMultiwait.cpp
    modified:
      storage/ndb/include/ndbapi/Ndb.hpp
      storage/ndb/include/ndbapi/NdbApi.hpp
      storage/ndb/include/ndbapi/ndb_cluster_connection.hpp
      storage/ndb/include/transporter/TransporterCallback.hpp
      storage/ndb/src/common/transporter/TransporterRegistry.cpp
      storage/ndb/src/ndbapi/API.hpp
      storage/ndb/src/ndbapi/Makefile.am
      storage/ndb/src/ndbapi/NdbImpl.hpp
      storage/ndb/src/ndbapi/Ndbif.cpp
      storage/ndb/src/ndbapi/Ndbinit.cpp
      storage/ndb/src/ndbapi/TransporterFacade.cpp
      storage/ndb/src/ndbapi/TransporterFacade.hpp
      storage/ndb/src/ndbapi/ndb_cluster_connection_impl.hpp
      storage/ndb/src/ndbapi/trp_client.hpp
      storage/ndb/test/ndbapi/Makefile.am
      storage/ndb/test/run-test/daily-devel-tests.txt
=== modified file 'storage/ndb/include/ndbapi/Ndb.hpp'
--- a/storage/ndb/include/ndbapi/Ndb.hpp	2011-04-18 23:20:12 +0000
+++ b/storage/ndb/include/ndbapi/Ndb.hpp	2011-06-16 05:09:51 +0000
@@ -1074,6 +1074,9 @@ class Ndb
   friend class PollGuard;
   friend class NdbQueryImpl;
   friend class NdbQueryOperationImpl;
+  friend class DefaultWakeupHandler;
+  friend class MultiNdbWakeupHandler;
+  friend class NdbPollGroup;
 #endif
 
 public:

=== modified file 'storage/ndb/include/ndbapi/NdbApi.hpp'
--- a/storage/ndb/include/ndbapi/NdbApi.hpp	2011-02-01 23:27:25 +0000
+++ b/storage/ndb/include/ndbapi/NdbApi.hpp	2011-06-16 05:09:51 +0000
@@ -35,4 +35,5 @@
 #include "NdbEventOperation.hpp"
 #include "NdbPool.hpp"
 #include "NdbBlob.hpp"
+#include "NdbPollGroup.hpp"
 #endif

=== added file 'storage/ndb/include/ndbapi/NdbPollGroup.hpp'
--- a/storage/ndb/include/ndbapi/NdbPollGroup.hpp	1970-01-01 00:00:00 +0000
+++ b/storage/ndb/include/ndbapi/NdbPollGroup.hpp	2011-06-16 05:09:51 +0000
@@ -0,0 +1,104 @@
+/*
+ Copyright (c) 2003, 2010, Oracle and/or its affiliates. All rights reserved.
+ 
+ This program is free software; you can redistribute it and/or modify
+ it under the terms of the GNU General Public License as published by
+ the Free Software Foundation; version 2 of the License.
+ 
+ This program is distributed in the hope that it will be useful,
+ but WITHOUT ANY WARRANTY; without even the implied warranty of
+ MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+ GNU General Public License for more details.
+ 
+ You should have received a copy of the GNU General Public License
+ along with this program; if not, write to the Free Software
+ Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301  USA
+ */
+
+#ifndef NdbPollGroup_H
+#define NdbPollGroup_H
+
+class Ndb_cluster_connection;
+class Ndb;
+class MultiNdbWakeupHandler;
+
+/* NdbPollGroup extends the Asynchronous NDB API, allowing the user to
+   wait for asynchronous operations to complete on multiple Ndb objects 
+   with a single call.
+
+   All Ndb objects within a poll group must belong to the same cluster connection.
+   In the current implementation, only one poll group per cluster connection
+   is supported.  The user can instantiate this poll group using 
+   ndb_cluster_connection::get_asynch_poll_group(). 
+   
+   Once Ndb::sendPreparedTransactions() has been used to send an async operation
+   on a particular Ndb object, that Ndb can be added to the poll group. 
+  
+   NdbPollGroup::wait() returns when some Ndb's are ready for polling; at this
+   point the user can call pollNdb(0, 1) on the ones that are ready. 
+*/
+
+class NdbPollGroup {
+friend class Ndb_cluster_connection; // can use the protected constructor
+
+public: 
+  /** Construct an NdbPollGroup for cluster connection conn 
+   * with up to max_ndb_objects objects. 
+   * 
+   * The current implementation supports only one NdbPollGroup per 
+   * cluster connection; any attempt to create a second one will result
+   * in a failed assertion in NdbMultiWakeupHandler().
+   */
+  NdbPollGroup(Ndb_cluster_connection *conn, int max_ndb_objects); 
+
+  ~NdbPollGroup();
+
+  /** Add an Ndb object to the group.  
+      The client thread uses this method to add Ndb objects to the group
+      before calling waitNdbs() on them.
+      
+      Returns true on success, false on error.
+  */
+  bool addNdb(Ndb *);
+
+
+  /** Remove an Ndb object from the group.
+      This restores normal   
+  
+  /** Wake up the thread that is polling this group. 
+      This can be used by other threads to signal a condition to the
+      waiting thread.
+  */
+  void wakeup(); 
+
+
+  /** wait for Ndbs to be ready.
+      The call will return when:
+        (a) at least min_ready Ndbs are ready for polling, or
+        (b) timeout milliseconds have elapsed, or  
+        (c) another thread has called NdbPollGroup::wakeup() 
+
+      The return value is the number of Ndb objects ready for polling, 
+      with 0 indicating a timeout or wakeup, or negative on error.
+
+      On return, arrayHead is set to the head of the list of Ndb objectss that 
+      are ready for polling, and those objects have been implicitly removed 
+      from the group.      
+  */
+  int wait(Ndb ** & arrayHead, Uint32 timeout_millis, Uint32 min_ready = 1 ); 
+  
+private:   /* private methods */
+   int topDownIdx(int n) { return m_array_size - n; }
+
+private:  /* private instance variables */
+   Ndb_cluster_connection *m_conn;
+   MultiNdbWakeupHandler *m_multiWaitHandler;
+   Ndb *m_wakeNdb;
+   Ndb **m_array;
+   int m_array_size;
+   int m_count;
+};
+
+
+#endif
+

=== modified file 'storage/ndb/include/ndbapi/ndb_cluster_connection.hpp'
--- a/storage/ndb/include/ndbapi/ndb_cluster_connection.hpp	2011-02-04 18:34:09 +0000
+++ b/storage/ndb/include/ndbapi/ndb_cluster_connection.hpp	2011-06-16 05:09:51 +0000
@@ -206,6 +206,7 @@ private:
   friend class NdbImpl;
   friend class Ndb_cluster_connection_impl;
   friend class SignalSender;
+  friend class NdbPollGroup;
   class Ndb_cluster_connection_impl & m_impl;
   Ndb_cluster_connection(Ndb_cluster_connection_impl&);
 

=== modified file 'storage/ndb/include/transporter/TransporterCallback.hpp'
--- a/storage/ndb/include/transporter/TransporterCallback.hpp	2011-02-01 23:27:25 +0000
+++ b/storage/ndb/include/transporter/TransporterCallback.hpp	2011-06-16 05:09:51 +0000
@@ -402,6 +402,14 @@ public:
    */
 
   /**
+   * Notify upper layer of explicit wakeup request
+   * 
+   * The is called from the thread holding receiving data from the 
+   * transporter, under the protection of the transporter lock.
+   */
+  virtual void reportWakeup() { }
+
+  /**
    * Ask upper layer to supply a list of struct iovec's with data to
    * send to a node.
    *

=== modified file 'storage/ndb/src/common/transporter/TransporterRegistry.cpp'
--- a/storage/ndb/src/common/transporter/TransporterRegistry.cpp	2011-02-01 23:27:25 +0000
+++ b/storage/ndb/src/common/transporter/TransporterRegistry.cpp	2011-06-16 05:09:51 +0000
@@ -1278,6 +1278,9 @@ TransporterRegistry::consume_extra_socke
     ret = my_recv(sock, buf, sizeof(buf), 0);
     err = my_socket_errno();
   } while (ret == sizeof(buf) || (ret == -1 && err == EINTR));
+  
+  /* Notify upper layer of explicit wakeup */
+  callbackObj->reportWakeup();
 }
 
 void

=== modified file 'storage/ndb/src/ndbapi/API.hpp'
--- a/storage/ndb/src/ndbapi/API.hpp	2011-03-30 13:26:02 +0000
+++ b/storage/ndb/src/ndbapi/API.hpp	2011-06-16 05:09:51 +0000
@@ -40,6 +40,7 @@
 #include <NdbBlob.hpp>
 #include <NdbBlobImpl.hpp>
 #include <NdbInterpretedCode.hpp>
+#include <NdbPollGroup.hpp>
 
 #include <NdbEventOperation.hpp>
 #include "NdbEventOperationImpl.hpp"

=== modified file 'storage/ndb/src/ndbapi/Makefile.am'
--- a/storage/ndb/src/ndbapi/Makefile.am	2011-02-04 11:45:24 +0000
+++ b/storage/ndb/src/ndbapi/Makefile.am	2011-06-16 05:09:51 +0000
@@ -43,6 +43,7 @@ libndbapi_la_SOURCES = \
         NdbOperationInt.cpp     \
         NdbOperationDefine.cpp  \
         NdbOperationExec.cpp    \
+        NdbPollGroup.cpp \
         NdbScanOperation.cpp    \
         NdbScanFilter.cpp       \
         NdbIndexOperation.cpp   \
@@ -64,9 +65,10 @@ libndbapi_la_SOURCES = \
 	NdbInfo.cpp \
 	NdbInfoScanOperation.cpp \
         ndb_internal.cpp \
-	trp_client.cpp \
+        trp_client.cpp \
         trp_node.cpp \
-	trp_buffer.cpp
+        trp_buffer.cpp \
+        WakeupHandler.cpp
 
 INCLUDES_LOC = -I$(top_srcdir)/storage/ndb/src/mgmapi
 

=== modified file 'storage/ndb/src/ndbapi/NdbImpl.hpp'
--- a/storage/ndb/src/ndbapi/NdbImpl.hpp	2011-04-18 23:20:12 +0000
+++ b/storage/ndb/src/ndbapi/NdbImpl.hpp	2011-06-16 05:09:51 +0000
@@ -31,6 +31,7 @@
 #include "trp_client.hpp"
 #include "trp_node.hpp"
 #include "NdbWaiter.hpp"
+#include "WakeupHandler.hpp"
 
 template <class T>
 struct Ndb_free_list_t 
@@ -80,7 +81,11 @@ public:
   Uint32 the_release_ind[MAX_NDB_NODES];
 
   NdbWaiter             theWaiter;
-
+ 
+  DefaultWakeupHandler normalWakeHandler;
+  WakeupHandler* wakeHandler;
+  Uint32 wakeContext;
+  
   NdbEventOperationImpl *m_ev_op;
 
   int m_optimized_node_selection;
@@ -190,6 +195,7 @@ public:
    */
   virtual void trp_deliver_signal(const NdbApiSignal*,
                                   const LinearSectionPtr p[3]);
+  virtual void trp_wakeup();
   virtual void recordWaitTimeNanos(Uint64 nanos);
   // Is node available for running transactions
   bool   get_node_alive(NodeId nodeId) const;
@@ -599,4 +605,13 @@ NdbImpl::sendFragmentedSignal(NdbApiSign
   }
   return -1;
 }
+
+inline 
+void
+NdbImpl::trp_wakeup() 
+{
+  wakeHandler->notifyWakeup();
+}
+
+
 #endif

=== added file 'storage/ndb/src/ndbapi/NdbPollGroup.cpp'
--- a/storage/ndb/src/ndbapi/NdbPollGroup.cpp	1970-01-01 00:00:00 +0000
+++ b/storage/ndb/src/ndbapi/NdbPollGroup.cpp	2011-06-16 05:09:51 +0000
@@ -0,0 +1,115 @@
+/*
+ Copyright (c) 2004, 2010, Oracle and/or its affiliates. All rights reserved.
+ 
+ This program is free software; you can redistribute it and/or modify
+ it under the terms of the GNU General Public License as published by
+ the Free Software Foundation; version 2 of the License.
+ 
+ This program is distributed in the hope that it will be useful,
+ but WITHOUT ANY WARRANTY; without even the implied warranty of
+ MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+ GNU General Public License for more details.
+ 
+ You should have received a copy of the GNU General Public License
+ along with this program; if not, write to the Free Software
+ Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301  USA
+ */
+
+#include <ndb_global.h>
+#include "NdbPollGroup.hpp"
+#include "WakeupHandler.hpp"
+#include <Ndb_cluster_connection.hpp>
+#include "TransporterFacade.hpp"
+#include "ndb_cluster_connection_impl.hpp"
+#include "NdbImpl.hpp"
+
+NdbPollGroup::NdbPollGroup(Ndb_cluster_connection *_conn, int _ndbs) :
+  m_conn(_conn),
+  m_array_size(_ndbs),
+  m_count(0),
+  m_multiWaitHandler(0)
+{
+  /* Allocate the array of Ndbs */
+  m_array = new Ndb *[m_array_size];
+
+  /* Call into the TransporterFacade to set up wakeups */
+  bool rc = m_conn->m_impl.m_transporter_facade->setupWakeup();
+  assert(rc);
+  
+  /* Get a new Ndb object to be the dedicated "wakeup object" for the group */
+  m_wakeNdb = new Ndb(m_conn);
+  assert(m_wakeNdb);
+  m_wakeNdb->init(4);
+  
+  /* Get a wakeup handler */
+  m_multiWaitHandler = new MultiNdbWakeupHandler(m_wakeNdb);
+}
+
+
+NdbPollGroup::~NdbPollGroup() 
+{
+  while(m_count > 0) 
+  {
+    m_multiWaitHandler->unregisterNdb(m_array[topDownIdx(m_count--)]);
+  }
+
+  delete[] m_array;
+  delete m_multiWaitHandler;
+}
+
+
+bool NdbPollGroup::addNdb(Ndb *ndb)
+{
+  if(unlikely(& ndb->theImpl->m_ndb_cluster_connection != m_conn))
+  {
+    abort();
+  }
+  
+  if(unlikely(topDownIdx(m_count) == 0))
+  {
+    return false;
+  }
+  
+  m_count++;   
+  m_array[topDownIdx(m_count)] = ndb; 
+  return true;
+}
+
+
+void NdbPollGroup::wakeup() 
+{
+  m_conn->m_impl.m_transporter_facade->requestWakeup();
+}
+
+
+int NdbPollGroup::wait(Ndb ** & arrayHead    /* out */,
+                       Uint32 timeout_millis,
+                       Uint32 min_ready)
+{
+  arrayHead = m_array + topDownIdx(m_count);
+  m_multiWaitHandler->setWakeThreshold(min_ready);
+
+  /* Update the statistics on the Ndb waiter */
+  m_wakeNdb->theImpl->incClientStat(Ndb::WaitExecCompleteCount, 1);
+
+  int wait_rc;
+  int nready;
+  {
+    PollGuard pg(* m_wakeNdb->theImpl);   // get ready to poll
+    m_multiWaitHandler->setNdbList(arrayHead, m_count);
+    wait_rc = m_multiWaitHandler->waitForInput(& pg, timeout_millis);
+    nready = m_multiWaitHandler->getNumReadyNdbs();
+
+    if(wait_rc == 0)  // success.  remove ready Ndbs from poll list.
+    {
+      for(int i = 0 ; i < nready ; i++) 
+      {
+        m_multiWaitHandler->unregisterNdb(m_array[topDownIdx(m_count)]);
+        m_count--;
+      }
+    }
+  }   /* release PollGuard */
+
+  return wait_rc ? -1 : nready;
+}
+

=== modified file 'storage/ndb/src/ndbapi/Ndbif.cpp'
--- a/storage/ndb/src/ndbapi/Ndbif.cpp	2011-02-28 12:25:52 +0000
+++ b/storage/ndb/src/ndbapi/Ndbif.cpp	2011-06-16 05:09:51 +0000
@@ -1002,12 +1002,8 @@ Ndb::completedTransaction(NdbTransaction
     theNoOfSentTransactions = tNoSentTransactions - 1;
     aCon->theListState = NdbTransaction::InCompletedList;
     aCon->handleExecuteCompletion();
-    if ((theMinNoOfEventsToWakeUp != 0) &&
-        (theNoOfCompletedTransactions >= theMinNoOfEventsToWakeUp)) {
-      theMinNoOfEventsToWakeUp = 0;
-      theImpl->theWaiter.signal(NO_WAIT);
-      return;
-    }//if
+    
+    theImpl->wakeHandler->notifyTransactionCompleted(this);
   } else {
     ndbout << "theNoOfSentTransactions = " << (int) theNoOfSentTransactions;
     ndbout << " theListState = " << (int) aCon->theListState;
@@ -1235,7 +1231,7 @@ Ndb::sendPrepTrans(int forceSend)
     insert_completed_list(a_con);
   }//for
   theNoOfPreparedTransactions = 0;
-  theImpl->do_forceSend(forceSend);
+  theImpl->do_forceSend(forceSend);  
   return;
 }//Ndb::sendPrepTrans()
 

=== modified file 'storage/ndb/src/ndbapi/Ndbinit.cpp'
--- a/storage/ndb/src/ndbapi/Ndbinit.cpp	2011-04-18 23:20:12 +0000
+++ b/storage/ndb/src/ndbapi/Ndbinit.cpp	2011-06-16 05:09:51 +0000
@@ -192,6 +192,8 @@ NdbImpl::NdbImpl(Ndb_cluster_connection 
 		      1024,1024),
     theNoOfDBnodes(0),
     theWaiter(this),
+    wakeHandler(&normalWakeHandler),
+    wakeContext(~Uint32(0)),
     m_ev_op(0),
     customDataPtr(0)
 {

=== modified file 'storage/ndb/src/ndbapi/TransporterFacade.cpp'
--- a/storage/ndb/src/ndbapi/TransporterFacade.cpp	2011-03-30 22:07:57 +0000
+++ b/storage/ndb/src/ndbapi/TransporterFacade.cpp	2011-06-16 05:09:51 +0000
@@ -533,6 +533,7 @@ TransporterFacade::TransporterFacade(Glo
   theClusterMgr(NULL),
   checkCounter(4),
   currentSendLimit(1),
+  dozer(NULL),
   theStopReceive(0),
   theSendThread(NULL),
   theReceiveThread(NULL),
@@ -2031,3 +2032,64 @@ TransporterFacade::ext_doConnect(int aNo
   theClusterMgr->unlock();
 }
 
+bool
+TransporterFacade::setupWakeup()
+{
+  /* Ask TransporterRegistry to setup wakeup sockets */
+  bool rc;
+  lock_mutex();
+  {
+    rc = theTransporterRegistry->setup_wakeup_socket();
+  }
+  unlock_mutex();
+  return rc;
+}
+
+bool 
+TransporterFacade::registerForWakeup(trp_client* _dozer)
+{
+  /* Called with Transporter lock */
+  /* In future use a DLList for dozers.
+   * Ideally with some way to wake one rather than all
+   * For now, we just have one/TransporterFacade
+   */
+  if (dozer != NULL)
+    return false;
+
+  dozer = _dozer;
+  return true;
+}
+
+bool
+TransporterFacade::unregisterForWakeup(trp_client* _dozer)
+{
+  /* Called with Transporter lock */
+  if (dozer != _dozer)
+    return false;
+
+  dozer = NULL;
+  return true;
+}
+
+void
+TransporterFacade::requestWakeup()
+{
+  /* Forward to TransporterRegistry
+   * No need for locks, assuming only one client at a time will use
+   */
+  theTransporterRegistry->wakeup();
+}
+
+
+void
+TransporterFacade::reportWakeup()
+{
+  /* Explicit wakeup callback
+   * Called with Transporter Mutex held
+   */
+  /* Notify interested parties */
+  if (dozer != NULL)
+  {
+    dozer->trp_wakeup();
+  };
+}

=== modified file 'storage/ndb/src/ndbapi/TransporterFacade.hpp'
--- a/storage/ndb/src/ndbapi/TransporterFacade.hpp	2011-02-28 12:25:52 +0000
+++ b/storage/ndb/src/ndbapi/TransporterFacade.hpp	2011-06-16 05:09:51 +0000
@@ -197,6 +197,21 @@ public:
   {
     theTransporterRegistry->reset_send_buffer(node, should_be_empty);
   }
+  /**
+   * Wakeup
+   *
+   * Clients normally block waiting for a pattern of signals,
+   * or until a timeout expires.
+   * This Api allows them to be woken early.
+   * To use it, a setupWakeup() call must be made once prior
+   * to using the Apis in any client.
+   * 
+   */
+  bool setupWakeup();
+  bool registerForWakeup(trp_client* dozer);
+  bool unregisterForWakeup(trp_client* dozer);
+  void requestWakeup();
+  void reportWakeup();
 
 private:
 
@@ -223,6 +238,11 @@ private:
   
   void calculateSendLimit();
 
+  /* Single dozer supported currently.
+   * In future, use a DLList to support > 1
+   */
+  trp_client * dozer;
+
   // Declarations for the receive and send thread
   int  theStopReceive;
 

=== added file 'storage/ndb/src/ndbapi/WakeupHandler.cpp'
--- a/storage/ndb/src/ndbapi/WakeupHandler.cpp	1970-01-01 00:00:00 +0000
+++ b/storage/ndb/src/ndbapi/WakeupHandler.cpp	2011-06-16 05:09:51 +0000
@@ -0,0 +1,223 @@
+
+/*
+ Copyright (c) 2003, 2011, Oracle and/or its affiliates. All rights reserved.
+ 
+ This program is free software; you can redistribute it and/or modify
+ it under the terms of the GNU General Public License as published by
+ the Free Software Foundation; version 2 of the License.
+ 
+ This program is distributed in the hope that it will be useful,
+ but WITHOUT ANY WARRANTY; without even the implied warranty of
+ MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+ GNU General Public License for more details.
+ 
+ You should have received a copy of the GNU General Public License
+ along with this program; if not, write to the Free Software
+ Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301  USA
+ */
+
+#include "WakeupHandler.hpp"
+#include "Ndb.hpp"
+#include "NdbImpl.hpp"
+#include "trp_client.hpp"
+
+//  ****** Default handler ****
+void DefaultWakeupHandler::notifyTransactionCompleted(Ndb *from) 
+{
+  if ((from->theMinNoOfEventsToWakeUp != 0) &&
+      (from->theNoOfCompletedTransactions >= from->theMinNoOfEventsToWakeUp)) {
+    from->theMinNoOfEventsToWakeUp = 0;
+    from->theImpl->theWaiter.signal(NO_WAIT);
+  }
+}
+
+void DefaultWakeupHandler::notifyWakeup()
+{
+}
+
+
+
+// ***** Multiwait handler ****
+
+/**
+ * An instance of this class is used when a single thread
+ * wants to wait for the asynchronous completion of transactions
+ * on multiple Ndb objects.
+ * When the thread starts waiting, all Ndb objects are checked
+ * for CompletedTransactions, and their wakeHandler is set to
+ * poin to the same MultiNdbWakeupHandler object.  The thread
+ * is then put to sleep / polls on a designated Ndb object.
+ *
+ * As transactions complete, the MultiNdbWakeHandler object
+ * moves their Ndb objects to the start of the passed Ndb
+ * object list and determines whether enough have completed 
+ * to wake the waiting thread.
+ * When enough have completed, the waiting thread is woken via
+ * the designated Ndb object.
+ */
+
+MultiNdbWakeupHandler::MultiNdbWakeupHandler(Ndb* _wakeNdb)
+ : wakeNdb(_wakeNdb),
+   woken(false),
+   minCompletedTransToWake(1)
+{
+  /* Register the waiter Ndb to receive wakeups for all Ndbs in the group */
+  PollGuard pg(* wakeNdb->theImpl); // Hold mutex before calling into Facade  
+  bool rc = wakeNdb->theImpl->m_transporter_facade->registerForWakeup(wakeNdb->theImpl);
+  assert(rc);
+  
+  /* And set its wakeHandler */
+  wakeNdb->theImpl->wakeHandler = this;
+}
+
+
+bool MultiNdbWakeupHandler::setNdbList(Ndb** _objs, Uint32 _cnt)
+{
+  numNdbsWithCompletedTrans = 0;
+  numCompletedTrans = 0;
+  objs = _objs;
+  cnt = _cnt;
+  for (Uint32 ndbcnt = 0; ndbcnt < cnt; ndbcnt ++)
+  {
+    Ndb* obj = objs [ndbcnt];
+    
+    /* Set each Ndb object's wakeHandler */
+    obj->theImpl->wakeHandler = this;
+    
+    /* Store the object's list position in its wake context */
+    obj->theImpl->wakeContext = ndbcnt;
+    
+    /* An Ndb may already have some completed transactions */
+    if (obj->theNoOfCompletedTransactions)
+    {
+      numCompletedTrans += obj->theNoOfCompletedTransactions;
+      /* Move that ndb to the start of the array */
+      swapNdbsInArray(ndbcnt, numNdbsWithCompletedTrans);
+      numNdbsWithCompletedTrans++;
+    }
+  }
+  
+  return true;
+}
+
+
+MultiNdbWakeupHandler::~MultiNdbWakeupHandler()
+{
+  PollGuard pg(* wakeNdb->theImpl); // Hold mutex before calling into Facade  
+  bool rc = wakeNdb->theImpl->m_transporter_facade->
+            unregisterForWakeup(wakeNdb->theImpl);
+  assert(rc);
+}
+
+    
+Uint32 MultiNdbWakeupHandler::getNumReadyNdbs() const
+{
+  return numNdbsWithCompletedTrans;
+}
+
+    
+bool MultiNdbWakeupHandler::isReadyToWake() const
+{
+  return (numCompletedTrans >= minCompletedTransToWake) || woken;
+}
+
+    
+int MultiNdbWakeupHandler::waitForInput(PollGuard* pg, int timeout_millis)
+{
+  // TODO : Check wakeNdb is the Ndb in pg
+  wakeNdb->theImpl->theWaiter.set_node(0);
+  wakeNdb->theImpl->theWaiter.set_state(WAIT_TRANS);
+  
+  /* We need to wait for some event(s) to fire */
+  NDB_TICKS currTime = NdbTick_CurrentMillisecond();
+  NDB_TICKS maxTime = currTime + (NDB_TICKS) timeout_millis;
+  
+  int maxsleep = timeout_millis > 10 ? 10 : timeout_millis;
+  
+  do {
+    /* PollGuard will put us to sleep until something relevant happens */
+    pg->wait_for_input(maxsleep);
+    if (isReadyToWake())
+    {
+      return 0;
+    }
+    timeout_millis = (int) (maxTime - NdbTick_CurrentMillisecond());
+  } while (timeout_millis > 0);
+  
+  return -1;
+}
+
+
+void MultiNdbWakeupHandler::unregisterNdb(Ndb *obj) {
+  assert(obj->theImpl->wakeHandler == this);
+
+  obj->theImpl->wakeHandler = & obj->theImpl->normalWakeHandler;
+  obj->theImpl->wakeContext = 0;
+}
+
+
+    
+void MultiNdbWakeupHandler::swapNdbsInArray(Uint32 indexA, Uint32 indexB)
+{
+  /* Generally used to move an Ndb object down the list
+   * (bubble sort), so that it is part of a contiguous
+   * list of Ndbs with completed transactions to return
+   * to caller.
+   * If it's already in the given position, no effect
+   */
+  assert(indexA < cnt);
+  assert(indexB < cnt);
+  
+  Ndb* a = objs[ indexA ];
+  Ndb* b = objs[ indexB ];
+  
+  assert(a->theImpl->wakeContext == indexA);
+  assert(b->theImpl->wakeContext == indexB);
+  
+  objs[ indexA ] = b;
+  b->theImpl->wakeContext = indexA;
+  
+  objs[ indexB ] = a;
+  a->theImpl->wakeContext = indexB;
+}
+    
+      
+void MultiNdbWakeupHandler::notifyTransactionCompleted(Ndb* from)
+{
+  // TODO : assert tp lock held.
+  /* Some Ndb object has just completed another transaction.
+     Ensure that it's in the completed Ndbs list 
+  */
+  assert(from != wakeNdb);
+  
+  Uint32 completedNdbListPos = from->theImpl->wakeContext;
+  assert(completedNdbListPos < cnt);
+  if (completedNdbListPos >= numNdbsWithCompletedTrans)
+  {
+    /* It's not, swap it with Ndb in 'next' position */
+    swapNdbsInArray(completedNdbListPos, numNdbsWithCompletedTrans);
+    
+    numNdbsWithCompletedTrans ++;
+  }
+  
+  numCompletedTrans ++;
+  
+  if (numCompletedTrans >= minCompletedTransToWake)
+  {
+    /* Wakeup client thread, using 'waiter' Ndb */
+    assert(wakeNdb->theImpl->wakeHandler == this);
+    wakeNdb->theImpl->theWaiter.signal(NO_WAIT);
+  }
+  
+  return;
+}
+
+    
+void MultiNdbWakeupHandler::notifyWakeup()
+{
+  woken = true;
+  
+  /* Wakeup client thread, using 'waiter' Ndb */
+  assert(wakeNdb->theImpl->wakeHandler == this);
+  wakeNdb->theImpl->theWaiter.signal(NO_WAIT);
+}

=== added file 'storage/ndb/src/ndbapi/WakeupHandler.hpp'
--- a/storage/ndb/src/ndbapi/WakeupHandler.hpp	1970-01-01 00:00:00 +0000
+++ b/storage/ndb/src/ndbapi/WakeupHandler.hpp	2011-06-16 05:09:51 +0000
@@ -0,0 +1,86 @@
+/*
+ Copyright (c) 2003, 2011, Oracle and/or its affiliates. All rights reserved.
+ 
+ This program is free software; you can redistribute it and/or modify
+ it under the terms of the GNU General Public License as published by
+ the Free Software Foundation; version 2 of the License.
+ 
+ This program is distributed in the hope that it will be useful,
+ but WITHOUT ANY WARRANTY; without even the implied warranty of
+ MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+ GNU General Public License for more details.
+ 
+ You should have received a copy of the GNU General Public License
+ along with this program; if not, write to the Free Software
+ Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301  USA
+ */
+
+#ifndef WakeupHandler_H
+#define WakeupHandler_H
+
+#include <ndb_types.h>
+class Ndb;
+class Ndb_cluster_connection;
+class PollGuard;
+
+/**
+ * WakeupHandler
+ *
+ * Help Ndb objects respond to wakeups from the TransporterFacade
+ * when transactions have completed.
+ *
+ * Each Ndb will own an instance of the DefaultWakeupHandler,
+ * and each NdbPollGroup will create an instance of a more specialized
+ * WakeupHandler.
+ */
+
+class WakeupHandler
+{
+  public:
+    virtual void notifyTransactionCompleted(Ndb* from) = 0;
+    virtual void notifyWakeup() = 0;
+    virtual ~WakeupHandler() {};
+};
+
+
+class DefaultWakeupHandler : public WakeupHandler
+{
+  void notifyTransactionCompleted(Ndb* from);
+  void notifyWakeup();
+};
+
+
+class MultiNdbWakeupHandler : public WakeupHandler
+{
+  public:
+    MultiNdbWakeupHandler(Ndb* _wakeNdb);
+    bool setNdbList(Ndb** _objs, Uint32 _cnt);
+    void unregisterNdb(Ndb *);
+    void notifyTransactionCompleted(Ndb* from);
+    void notifyWakeup();
+    ~MultiNdbWakeupHandler();
+
+    void setWakeThreshold(int n);  
+    Uint32 getNumReadyNdbs() const;
+    bool isReadyToWake() const;
+    int waitForInput(PollGuard* pg, int timeout_millis);
+
+  private:   // private methods
+     void swapNdbsInArray(Uint32 indexA, Uint32 indexB);
+ 
+  private:   // private instance variables
+    Uint32 numNdbsWithCompletedTrans;
+    Uint32 numCompletedTrans;
+    Uint32 minCompletedTransToWake;
+    Ndb* wakeNdb;
+    Ndb** objs;
+    Uint32 cnt;
+    bool woken;
+};
+
+
+inline void MultiNdbWakeupHandler::setWakeThreshold(int n) {
+  minCompletedTransToWake = n;
+}
+
+#endif

=== modified file 'storage/ndb/src/ndbapi/ndb_cluster_connection_impl.hpp'
--- a/storage/ndb/src/ndbapi/ndb_cluster_connection_impl.hpp	2011-03-31 20:51:28 +0000
+++ b/storage/ndb/src/ndbapi/ndb_cluster_connection_impl.hpp	2011-06-16 05:09:51 +0000
@@ -73,6 +73,7 @@ public:
 private:
   friend class Ndb;
   friend class NdbImpl;
+  friend class NdbPollGroup;
   friend void* run_ndb_cluster_connection_connect_thread(void*);
   friend class Ndb_cluster_connection;
   friend class NdbEventBuffer;

=== modified file 'storage/ndb/src/ndbapi/trp_client.hpp'
--- a/storage/ndb/src/ndbapi/trp_client.hpp	2011-02-24 22:07:05 +0000
+++ b/storage/ndb/src/ndbapi/trp_client.hpp	2011-06-16 05:09:51 +0000
@@ -35,6 +35,8 @@ public:
 
   virtual void trp_deliver_signal(const NdbApiSignal *,
                                   const LinearSectionPtr ptr[3]) = 0;
+  virtual void trp_wakeup()
+    {};
 
   Uint32 open(class TransporterFacade*, int blockNo = -1);
   void close();

=== modified file 'storage/ndb/test/ndbapi/Makefile.am'
--- a/storage/ndb/test/ndbapi/Makefile.am	2011-02-04 11:45:24 +0000
+++ b/storage/ndb/test/ndbapi/Makefile.am	2011-06-16 05:09:51 +0000
@@ -29,6 +29,7 @@ flexTT \
 testBackup \
 testBasic \
 testBasicAsynch \
+testAsynchMultiwait \
 testBlobs \
 testDataBuffers \
 testDict \
@@ -89,6 +90,7 @@ testBackup_SOURCES = testBackup.cpp
 testBasic_SOURCES = testBasic.cpp
 testSpj_SOURCES = testSpj.cpp
 testBasicAsynch_SOURCES = testBasicAsynch.cpp
+testAsynchMultiwait_SOURCES = testAsynchMultiwait.cpp
 testBlobs_SOURCES = testBlobs.cpp
 testDataBuffers_SOURCES = testDataBuffers.cpp
 testDict_SOURCES = testDict.cpp

=== added file 'storage/ndb/test/ndbapi/testAsynchMultiwait.cpp'
--- a/storage/ndb/test/ndbapi/testAsynchMultiwait.cpp	1970-01-01 00:00:00 +0000
+++ b/storage/ndb/test/ndbapi/testAsynchMultiwait.cpp	2011-06-16 05:09:51 +0000
@@ -0,0 +1,296 @@
+/*
+ Copyright (C) 2003-2006, 2008 MySQL AB
+ All rights reserved. Use is subject to license terms.
+ 
+ This program is free software; you can redistribute it and/or modify
+ it under the terms of the GNU General Public License as published by
+ the Free Software Foundation; version 2 of the License.
+ 
+ This program is distributed in the hope that it will be useful,
+ but WITHOUT ANY WARRANTY; without even the implied warranty of
+ MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+ GNU General Public License for more details.
+ 
+ You should have received a copy of the GNU General Public License
+ along with this program; if not, write to the Free Software
+ Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301  USA
+ */
+
+#include "NDBT_Test.hpp"
+#include "NDBT_ReturnCodes.h"
+#include "HugoTransactions.hpp"
+#include "HugoAsynchTransactions.hpp"
+#include "UtilTransactions.hpp"
+#include "random.h"
+
+NdbPollGroup * global_poll_group;
+
+#define check(b, e) \
+  if (!(b)) { g_err << "ERR: " << step->getName() << " failed on line " \
+  << __LINE__ << ": " << e.getNdbError() << endl; return NDBT_FAILED; }
+
+
+int runSetup(NDBT_Context* ctx, NDBT_Step* step){
+  
+  int records = ctx->getNumRecords();
+  int batchSize = ctx->getProperty("BatchSize", 1);
+  int transactions = (records / 100) + 1;
+  int operations = (records / transactions) + 1;
+  
+  HugoAsynchTransactions hugoTrans(*ctx->getTab());
+  if (hugoTrans.loadTableAsynch(GETNDB(step), records, batchSize, 
+				transactions, operations) != 0){
+    return NDBT_FAILED;
+  }
+
+  Ndb* pNdb = GETNDB(step);
+  Ndb_cluster_connection* conn = &pNdb->get_ndb_cluster_connection();
+  global_poll_group = new NdbPollGroup(conn, 1000);
+
+  if(global_poll_group == 0) {
+    return NDBT_FAILED;
+  }
+
+  return NDBT_OK;
+}
+
+int runCleanup(NDBT_Context* ctx, NDBT_Step* step){
+  int records = ctx->getNumRecords();
+  int batchSize = ctx->getProperty("BatchSize", 1);
+  int transactions = (records / 100) + 1;
+  int operations = (records / transactions) + 1;
+  
+  HugoAsynchTransactions hugoTrans(*ctx->getTab());
+  if (hugoTrans.pkDelRecordsAsynch(GETNDB(step),  records, batchSize, 
+				   transactions, operations) != 0){
+    return NDBT_FAILED;
+  }
+
+  delete global_poll_group;
+  
+  return NDBT_OK;
+}
+
+
+int runPkReadMultiBasic(NDBT_Context* ctx, NDBT_Step* step){
+  int loops = ctx->getNumLoops();
+  int records = ctx->getNumRecords();
+  const int MAX_NDBS = 200;
+  Ndb* pNdb = GETNDB(step);
+  Ndb_cluster_connection* conn = &pNdb->get_ndb_cluster_connection();
+  
+  int i = 0;
+  HugoOperations hugoOps(*ctx->getTab());
+  
+  Ndb* ndbObjs[ MAX_NDBS ];  
+  NdbTransaction* transArray[ MAX_NDBS ];
+  Ndb ** ready_ndbs;
+  
+  for (int j=0; j < MAX_NDBS; j++)
+  {
+    Ndb* ndb = new Ndb(conn);
+    check(ndb->init() == 0, (*ndb));
+    ndbObjs[ j ] = ndb;
+  }
+  
+  while (i<loops) {
+    ndbout << "Loop : " << i << ": ";
+    int recordsLeft = records;
+    
+    do
+    {
+      /* Define and execute Pk read requests on 
+       * different Ndb objects
+       */
+      int ndbcnt = 0;
+      int pollcnt = 0;
+      int lumpsize = 1 + myRandom48(MIN(recordsLeft, MAX_NDBS));
+      while(lumpsize &&
+            recordsLeft && 
+            ndbcnt < MAX_NDBS)
+      {
+        Ndb* ndb = ndbObjs[ ndbcnt ];
+        NdbTransaction* trans = ndb->startTransaction();
+        check(trans != NULL, (*ndb));
+        NdbOperation* readOp = trans->getNdbOperation(ctx->getTab());
+        check(readOp != NULL, (*trans));
+        check(readOp->readTuple() == 0, (*readOp));
+        check(hugoOps.equalForRow(readOp, recordsLeft) == 0, hugoOps);
+        
+        /* Read all other cols */
+        for (int k=0; k < ctx->getTab()->getNoOfColumns(); k++)
+        {
+          check(readOp->getValue(ctx->getTab()->getColumn(k)) != NULL,
+                (*readOp));
+        }
+        
+        /* Now send em off */
+        trans->executeAsynchPrepare(NdbTransaction::Commit,
+                                    NULL,
+                                    NULL,
+                                    NdbOperation::AbortOnError);
+        ndb->sendPreparedTransactions();
+        
+        transArray[ndbcnt] = trans;
+        global_poll_group->addNdb(ndb);
+        
+        ndbcnt++;
+        pollcnt++;
+        recordsLeft--;
+        lumpsize--;
+      };
+      
+      /* Ok, now wait for the Ndbs to complete */
+      while (pollcnt)
+      {
+        /* Occasionally check with no timeout */
+        Uint32 timeout_millis = myRandom48(2)?10000:0;
+        int count = global_poll_group->wait(ready_ndbs, timeout_millis);
+
+        if (count > 0)
+        {
+          for (int y=0; y < count; y++)
+          {
+            Ndb *ndb = ready_ndbs[y];
+            check(ndb->pollNdb(0, 1) != 0, (*ndb));
+          }
+          pollcnt -= count;
+        }
+      }
+      
+      /* Ok, now close the transactions */
+      for (int t=0; t < ndbcnt; t++)
+      {
+        transArray[t]->close();
+      }
+    } while (recordsLeft);
+    
+    i++;
+  }
+  
+  for (int j=0; j < MAX_NDBS; j++)
+  {
+    delete ndbObjs[ j ];
+  }
+  
+  return NDBT_OK;
+}
+
+int runPkReadMultiWakeupT1(NDBT_Context* ctx, NDBT_Step* step)
+{
+  HugoOperations hugoOps(*ctx->getTab());
+  Ndb* ndb = GETNDB(step);
+  Uint32 phase = ctx->getProperty("PHASE");
+  
+  if (phase != 0)
+  {
+    ndbout << "Thread 1 : Error, initial phase should be 0 not " << phase << endl;
+    return NDBT_FAILED;
+  };
+  
+  /* We now start a transaction, locking row 0 */
+  ndbout << "Thread 1 : Starting transaction locking row 0..." << endl;
+  check(hugoOps.startTransaction(ndb) == 0, hugoOps);
+  check(hugoOps.pkReadRecord(ndb, 0, 1, NdbOperation::LM_Exclusive) == 0,
+        hugoOps);
+  check(hugoOps.execute_NoCommit(ndb) == 0, hugoOps);
+  
+  ndbout << "Thread 1 : Lock taken." << endl;
+  ndbout << "Thread 1 : Triggering Thread 2 by move to phase 1" << endl;
+  /* Ok, now get thread 2 to try to read row */
+  ctx->incProperty("PHASE"); /* Set to 1 */
+  
+  /* Here, loop waking up waiter on the cluster connection */
+  /* Check the property has not moved to phase 2 */
+  ndbout << "Thread 1 : Performing async wakeup until phase changes to 2"
+  << endl;
+  while (ctx->getProperty("PHASE") != 2)
+  {
+    global_poll_group->wakeup();
+    NdbSleep_MilliSleep(500);
+  }
+  
+  ndbout << "Thread 1 : Phase changed to 2, committing transaction "
+  << "and releasing lock" << endl;
+  
+  /* Ok, give them a break, commit transaction */
+  check(hugoOps.execute_Commit(ndb) ==0, hugoOps);
+  hugoOps.closeTransaction(ndb);
+  
+  ndbout << "Thread 1 : Finished" << endl;
+  return NDBT_OK;
+}  
+
+int runPkReadMultiWakeupT2(NDBT_Context* ctx, NDBT_Step* step)
+{
+  ndbout << "Thread 2 : Waiting for phase 1 notification from Thread 1" << endl;
+  ctx->getPropertyWait("PHASE", 1);
+  
+  /* Ok, now thread 1 has locked row 1, we'll attempt to read
+   * it, using the multi_ndb_wait Api to block
+   */
+  HugoOperations hugoOps(*ctx->getTab());
+  Ndb* ndb = GETNDB(step);
+  
+  ndbout << "Thread 2 : Starting async transaction to read row" << endl;
+  check(hugoOps.startTransaction(ndb) == 0, hugoOps);
+  check(hugoOps.pkReadRecord(ndb, 0, 1, NdbOperation::LM_Exclusive) == 0,
+        hugoOps);
+  /* Prepare, Send */
+  check(hugoOps.execute_async(ndb, 
+                              NdbTransaction::Commit, 
+                              NdbOperation::AbortOnError) == 0,
+        hugoOps);
+
+  global_poll_group->addNdb(ndb);
+  Ndb ** ready_ndbs;
+  int wait_rc = 0;
+  int acknowledged = 0;
+  do
+  {
+    ndbout << "Thread 2 : Calling NdbPollGroup::wait()" << endl;
+    wait_rc = global_poll_group->wait(ready_ndbs, 10000);
+    ndbout << "           Result : " << wait_rc << endl;
+    if (wait_rc == 0)
+    {
+      if (!acknowledged)
+      {
+        ndbout << "Thread 2 : Woken up, moving to phase 2" << endl;
+        ctx->incProperty("PHASE");
+        acknowledged = 1;
+      }
+    }
+    else if (wait_rc > 0)
+    {
+      ndbout << "Thread 2 : Transaction completed" << endl;
+      ndb->pollNdb(1,0);
+      hugoOps.closeTransaction(ndb);
+    }
+  } while (wait_rc == 0);
+  
+  return (wait_rc == 1 ? NDBT_OK : NDBT_FAILED);
+}
+
+NDBT_TESTSUITE(testAsynchMultiwait);
+TESTCASE("PkReadAsynchMultiBasic",
+         "Verify wait-multi-ndb api using read ops") {
+  INITIALIZER(runSetup);
+  STEP(runPkReadMultiBasic);
+  FINALIZER(runCleanup);
+}
+TESTCASE("PkReadAsynchMultiWakeup",
+         "Verify wait-multi-ndb wakeup Api code") {
+  INITIALIZER(runSetup);
+  TC_PROPERTY("PHASE", Uint32(0));
+  STEP(runPkReadMultiWakeupT1);
+  STEP(runPkReadMultiWakeupT2);
+  FINALIZER(runCleanup);
+}
+NDBT_TESTSUITE_END(testAsynchMultiwait);
+
+int main(int argc, const char** argv){
+  ndb_init();
+  NDBT_TESTSUITE_INSTANCE(testAsynchMultiwait);
+  return testAsynchMultiwait.execute(argc, argv);
+}
+

=== modified file 'storage/ndb/test/run-test/daily-devel-tests.txt'
--- a/storage/ndb/test/run-test/daily-devel-tests.txt	2011-02-01 08:36:25 +0000
+++ b/storage/ndb/test/run-test/daily-devel-tests.txt	2011-06-16 05:09:51 +0000
@@ -129,3 +129,13 @@ max-time: 1800
 cmd: testDict
 args: -n SchemaTrans -l 1
 
+# async api extensions
+
+max-time : 500
+cmd : testAsynchMultiwait
+args: -nPkReadAsynchMultiBasic T1
+
+max-time : 500
+cmd : testAsynchMultiwait
+args: -nPkReadAsynchMultiWakeup T1
+


Attachment: [text/bzr-bundle] bzr/john.duncan@oracle.com-20110616050951-4ywxur942iw47cnq.bundle
Thread
bzr commit into mysql-5.1-telco-7.2 branch (john.duncan:4182) John David Duncan16 Jun