From: Jonas Oreland Date: September 9 2011 12:45pm Subject: bzr push into mysql-5.1-telco-7.1 branch (jonas.oreland:4275 to 4276) List-Archive: http://lists.mysql.com/commits/140980 Message-Id: <20110909124507.F146C940661@perch.localdomain> MIME-Version: 1.0 Content-Type: text/plain; charset="us-ascii" Content-Transfer-Encoding: 7bit 4276 Jonas Oreland 2011-09-09 [merge] ndb - merge 70 to 71 added: storage/ndb/src/ndbapi/NdbWaitGroup.cpp storage/ndb/src/ndbapi/NdbWaitGroup.hpp storage/ndb/src/ndbapi/WakeupHandler.cpp storage/ndb/src/ndbapi/WakeupHandler.hpp storage/ndb/test/ndbapi/testAsynchMultiwait.cpp modified: storage/ndb/include/ndbapi/Ndb.hpp storage/ndb/include/ndbapi/ndb_cluster_connection.hpp storage/ndb/include/transporter/TransporterCallback.hpp storage/ndb/src/common/transporter/TransporterRegistry.cpp storage/ndb/src/kernel/ndbd.cpp storage/ndb/src/kernel/vm/Configuration.cpp storage/ndb/src/ndbapi/API.hpp storage/ndb/src/ndbapi/CMakeLists.txt storage/ndb/src/ndbapi/Makefile.am storage/ndb/src/ndbapi/NdbImpl.hpp storage/ndb/src/ndbapi/Ndbif.cpp storage/ndb/src/ndbapi/Ndbinit.cpp storage/ndb/src/ndbapi/TransporterFacade.cpp storage/ndb/src/ndbapi/TransporterFacade.hpp storage/ndb/src/ndbapi/ndb_cluster_connection.cpp storage/ndb/src/ndbapi/ndb_cluster_connection_impl.hpp storage/ndb/src/ndbapi/trp_client.hpp storage/ndb/test/ndbapi/Makefile.am storage/ndb/test/run-test/daily-basic-tests.txt 4275 jonas oreland 2011-09-08 [merge] ndb - merge 70 to 71 added: mysql-test/suite/ndb_binlog/r/ndb_binlog_log_transaction_id.result mysql-test/suite/ndb_binlog/t/ndb_binlog_get_row_extra_data.inc mysql-test/suite/ndb_binlog/t/ndb_binlog_log_transaction_id-master.opt mysql-test/suite/ndb_binlog/t/ndb_binlog_log_transaction_id.test mysql-test/suite/ndb_rpl/r/ndb_rpl_conflict_epoch_trans.result mysql-test/suite/ndb_rpl/t/ndb_rpl_conflict_epoch_trans.cnf mysql-test/suite/ndb_rpl/t/ndb_rpl_conflict_epoch_trans.test mysql-test/suite/ndb_rpl/t/ndb_trans_conflict_info.inc mysql-test/suite/ndb_rpl/t/ndb_trans_conflict_info_init.inc mysql-test/suite/ndb_rpl/t/ndb_trans_conflict_info_stable.inc mysql-test/suite/rpl/r/rpl_extra_row_data.result mysql-test/suite/rpl/t/rpl_extra_row_data-master.opt mysql-test/suite/rpl/t/rpl_extra_row_data-slave.opt mysql-test/suite/rpl/t/rpl_extra_row_data.test sql/ndb_conflict_trans.cc sql/ndb_conflict_trans.h storage/ndb/include/util/HashMap2.hpp storage/ndb/include/util/LinkedStack.hpp storage/ndb/src/common/util/HashMap2.cpp storage/ndb/src/common/util/LinkedStack.cpp modified: libmysqld/Makefile.am mysql-test/suite/ndb/r/ndb_basic.result sql/Makefile.am sql/ha_ndbcluster.cc sql/ha_ndbcluster.h sql/ha_ndbcluster_binlog.cc sql/ha_ndbcluster_binlog.h sql/log_event.cc sql/log_event.h sql/ndb_mi.cc sql/ndb_mi.h sql/rpl_constants.h sql/slave.h sql/sql_class.cc sql/sql_class.h storage/ndb/CMakeLists.txt storage/ndb/include/ndbapi/Ndb.hpp storage/ndb/include/ndbapi/ndb_cluster_connection.hpp storage/ndb/src/common/portlib/NdbTCP.cpp storage/ndb/src/common/util/CMakeLists.txt storage/ndb/src/common/util/Makefile.am storage/ndb/src/ndbapi/Ndb.cpp storage/ndb/src/ndbapi/Ndbif.cpp storage/ndb/src/ndbapi/TransporterFacade.cpp storage/ndb/src/ndbapi/TransporterFacade.hpp storage/ndb/src/ndbapi/ndb_cluster_connection.cpp storage/ndb/src/ndbapi/trp_client.cpp storage/ndb/src/ndbapi/trp_client.hpp === modified file 'storage/ndb/include/ndbapi/Ndb.hpp' --- a/storage/ndb/include/ndbapi/Ndb.hpp 2011-09-07 17:12:12 +0000 +++ b/storage/ndb/include/ndbapi/Ndb.hpp 2011-09-09 10:48:14 +0000 @@ -1074,6 +1074,8 @@ class Ndb friend class PollGuard; friend class NdbQueryImpl; friend class NdbQueryOperationImpl; + friend class MultiNdbWakeupHandler; + friend class NdbWaitGroup; #endif public: === modified file 'storage/ndb/include/ndbapi/ndb_cluster_connection.hpp' --- a/storage/ndb/include/ndbapi/ndb_cluster_connection.hpp 2011-09-08 06:22:07 +0000 +++ b/storage/ndb/include/ndbapi/ndb_cluster_connection.hpp 2011-09-09 12:29:43 +0000 @@ -34,6 +34,7 @@ private: }; class Ndb; +class NdbWaitGroup; /** * @class Ndb_cluster_connection @@ -207,8 +208,11 @@ public: unsigned int get_next_node(Ndb_cluster_connection_node_iter &iter); unsigned int get_next_alive_node(Ndb_cluster_connection_node_iter &iter); unsigned get_active_ndb_objects() const; - + Uint64 *get_latest_trans_gci(); + NdbWaitGroup * create_ndb_wait_group(int size); + bool release_ndb_wait_group(NdbWaitGroup *); + #endif private: @@ -216,6 +220,7 @@ private: friend class NdbImpl; friend class Ndb_cluster_connection_impl; friend class SignalSender; + friend class NdbWaitGroup; class Ndb_cluster_connection_impl & m_impl; Ndb_cluster_connection(Ndb_cluster_connection_impl&); === modified file 'storage/ndb/include/transporter/TransporterCallback.hpp' --- a/storage/ndb/include/transporter/TransporterCallback.hpp 2011-06-30 15:59:25 +0000 +++ b/storage/ndb/include/transporter/TransporterCallback.hpp 2011-09-09 10:48:14 +0000 @@ -402,6 +402,14 @@ public: */ /** + * Notify upper layer of explicit wakeup request + * + * The is called from the thread holding receiving data from the + * transporter, under the protection of the transporter lock. + */ + virtual void reportWakeup() { } + + /** * Ask upper layer to supply a list of struct iovec's with data to * send to a node. * === modified file 'storage/ndb/src/common/transporter/TransporterRegistry.cpp' --- a/storage/ndb/src/common/transporter/TransporterRegistry.cpp 2011-07-04 16:30:34 +0000 +++ b/storage/ndb/src/common/transporter/TransporterRegistry.cpp 2011-09-09 12:29:43 +0000 @@ -1351,6 +1351,9 @@ TransporterRegistry::consume_extra_socke ret = my_recv(sock, buf, sizeof(buf), 0); err = my_socket_errno(); } while (ret == sizeof(buf) || (ret == -1 && err == EINTR)); + + /* Notify upper layer of explicit wakeup */ + callbackObj->reportWakeup(); } void === modified file 'storage/ndb/src/kernel/ndbd.cpp' --- a/storage/ndb/src/kernel/ndbd.cpp 2011-08-30 12:00:48 +0000 +++ b/storage/ndb/src/kernel/ndbd.cpp 2011-09-08 11:49:24 +0000 @@ -296,7 +296,6 @@ static int get_multithreaded_config(EmulatorData& ed) { // multithreaded is compiled in ndbd/ndbmtd for now - globalData.isNdbMt = SimulatedBlock::isMultiThreaded(); if (!globalData.isNdbMt) { ndbout << "NDBMT: non-mt" << endl; @@ -304,58 +303,15 @@ get_multithreaded_config(EmulatorData& e } THRConfig & conf = ed.theConfiguration->m_thr_config; - Uint32 threadcount = conf.getThreadCount(); ndbout << "NDBMT: MaxNoOfExecutionThreads=" << threadcount << endl; - globalData.isNdbMtLqh = true; - - { - if (conf.getMtClassic()) - { - globalData.isNdbMtLqh = false; - } - } - if (!globalData.isNdbMtLqh) return 0; - Uint32 threads = conf.getThreadCount(THRConfig::T_LDM); - Uint32 workers = threads; - { - ndb_mgm_configuration * conf = ed.theConfiguration->getClusterConfig(); - if (conf == 0) - { - abort(); - } - ndb_mgm_configuration_iterator * p = - ndb_mgm_create_configuration_iterator(conf, CFG_SECTION_NODE); - if (ndb_mgm_find(p, CFG_NODE_ID, globalData.ownId)) - { - abort(); - } - ndb_mgm_get_int_parameter(p, CFG_NDBMT_LQH_WORKERS, &workers); - } - -#ifdef VM_TRACE - // testing - { - const char* p; - p = NdbEnv_GetEnv("NDBMT_LQH_WORKERS", (char*)0, 0); - if (p != 0) - workers = atoi(p); - } -#endif - - ndbout << "NDBMT: workers=" << workers - << " threads=" << threads << endl; - - assert(workers != 0 && workers <= MAX_NDBMT_LQH_WORKERS); - assert(threads != 0 && threads <= MAX_NDBMT_LQH_THREADS); - assert(workers % threads == 0); + ndbout << "NDBMT: workers=" << globalData.ndbMtLqhWorkers + << " threads=" << globalData.ndbMtLqhThreads << endl; - globalData.ndbMtLqhWorkers = workers; - globalData.ndbMtLqhThreads = threads; return 0; } === modified file 'storage/ndb/src/kernel/vm/Configuration.cpp' --- a/storage/ndb/src/kernel/vm/Configuration.cpp 2011-09-04 17:04:25 +0000 +++ b/storage/ndb/src/kernel/vm/Configuration.cpp 2011-09-09 12:29:43 +0000 @@ -457,6 +457,49 @@ Configuration::setupConfiguration(){ m_clusterConfigIter = ndb_mgm_create_configuration_iterator (p, CFG_SECTION_NODE); + /** + * This is parts of get_multithreaded_config + */ + do + { + globalData.isNdbMt = NdbIsMultiThreaded(); + if (!globalData.isNdbMt) + break; + + globalData.isNdbMtLqh = true; + { + if (m_thr_config.getMtClassic()) + { + globalData.isNdbMtLqh = false; + } + } + + if (!globalData.isNdbMtLqh) + break; + + Uint32 threads = m_thr_config.getThreadCount(THRConfig::T_LDM); + Uint32 workers = threads; + iter.get(CFG_NDBMT_LQH_WORKERS, &workers); + +#ifdef VM_TRACE + // testing + { + const char* p; + p = NdbEnv_GetEnv("NDBMT_LQH_WORKERS", (char*)0, 0); + if (p != 0) + workers = atoi(p); + } +#endif + + + assert(workers != 0 && workers <= MAX_NDBMT_LQH_WORKERS); + assert(threads != 0 && threads <= MAX_NDBMT_LQH_THREADS); + assert(workers % threads == 0); + + globalData.ndbMtLqhWorkers = workers; + globalData.ndbMtLqhThreads = threads; + } while (0); + calcSizeAlt(cf); DBUG_VOID_RETURN; === modified file 'storage/ndb/src/ndbapi/API.hpp' --- a/storage/ndb/src/ndbapi/API.hpp 2011-07-04 16:30:34 +0000 +++ b/storage/ndb/src/ndbapi/API.hpp 2011-09-09 12:29:43 +0000 @@ -42,6 +42,7 @@ #include #include #include +#include #include #include "NdbEventOperationImpl.hpp" === modified file 'storage/ndb/src/ndbapi/CMakeLists.txt' --- a/storage/ndb/src/ndbapi/CMakeLists.txt 2011-07-04 16:30:34 +0000 +++ b/storage/ndb/src/ndbapi/CMakeLists.txt 2011-09-09 12:29:43 +0000 @@ -61,6 +61,8 @@ ADD_CONVENIENCE_LIBRARY(ndbapi ObjectMap.cpp NdbInfo.cpp NdbInfoScanOperation.cpp + NdbWaitGroup.cpp + WakeupHandler.cpp ndb_internal.cpp trp_client.cpp trp_node.cpp === modified file 'storage/ndb/src/ndbapi/Makefile.am' --- a/storage/ndb/src/ndbapi/Makefile.am 2011-07-04 16:30:34 +0000 +++ b/storage/ndb/src/ndbapi/Makefile.am 2011-09-09 12:29:43 +0000 @@ -43,6 +43,7 @@ libndbapi_la_SOURCES = \ NdbOperationInt.cpp \ NdbOperationDefine.cpp \ NdbOperationExec.cpp \ + NdbWaitGroup.cpp \ NdbScanOperation.cpp \ NdbScanFilter.cpp \ NdbIndexOperation.cpp \ @@ -66,9 +67,10 @@ libndbapi_la_SOURCES = \ NdbInfo.cpp \ NdbInfoScanOperation.cpp \ ndb_internal.cpp \ - trp_client.cpp \ + trp_client.cpp \ trp_node.cpp \ - trp_buffer.cpp + trp_buffer.cpp \ + WakeupHandler.cpp INCLUDES_LOC = -I$(top_srcdir)/storage/ndb/src/mgmapi === modified file 'storage/ndb/src/ndbapi/NdbImpl.hpp' --- a/storage/ndb/src/ndbapi/NdbImpl.hpp 2011-06-30 15:59:25 +0000 +++ b/storage/ndb/src/ndbapi/NdbImpl.hpp 2011-09-09 10:48:14 +0000 @@ -31,6 +31,7 @@ #include "trp_client.hpp" #include "trp_node.hpp" #include "NdbWaiter.hpp" +#include "WakeupHandler.hpp" template struct Ndb_free_list_t @@ -81,6 +82,9 @@ public: NdbWaiter theWaiter; + WakeupHandler* wakeHandler; + Uint32 wakeContext; + NdbEventOperationImpl *m_ev_op; int m_optimized_node_selection; @@ -191,6 +195,7 @@ public: */ virtual void trp_deliver_signal(const NdbApiSignal*, const LinearSectionPtr p[3]); + virtual void trp_wakeup(); virtual void recordWaitTimeNanos(Uint64 nanos); // Is node available for running transactions bool get_node_alive(NodeId nodeId) const; @@ -601,4 +606,11 @@ NdbImpl::sendFragmentedSignal(NdbApiSign return -1; } +inline +void +NdbImpl::trp_wakeup() +{ + wakeHandler->notifyWakeup(); +} + #endif === added file 'storage/ndb/src/ndbapi/NdbWaitGroup.cpp' --- a/storage/ndb/src/ndbapi/NdbWaitGroup.cpp 1970-01-01 00:00:00 +0000 +++ b/storage/ndb/src/ndbapi/NdbWaitGroup.cpp 2011-09-09 10:48:14 +0000 @@ -0,0 +1,121 @@ +/* + Copyright (c) 2011, Oracle and/or its affiliates. All rights reserved. + + This program is free software; you can redistribute it and/or modify + it under the terms of the GNU General Public License as published by + the Free Software Foundation; version 2 of the License. + + This program is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + GNU General Public License for more details. + + You should have received a copy of the GNU General Public License + along with this program; if not, write to the Free Software + Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA +*/ + +#include +#include "NdbWaitGroup.hpp" +#include "WakeupHandler.hpp" +#include "ndb_cluster_connection.hpp" +#include "TransporterFacade.hpp" +#include "ndb_cluster_connection_impl.hpp" +#include "NdbImpl.hpp" + +NdbWaitGroup::NdbWaitGroup(Ndb_cluster_connection *_conn, int _ndbs) : + m_conn(_conn), + m_array_size(_ndbs), + m_count(0), + m_nodeId(0), + m_multiWaitHandler(0) +{ + /* Allocate the array of Ndbs */ + m_array = new Ndb *[m_array_size]; + + /* Call into the TransporterFacade to set up wakeups */ + bool rc = m_conn->m_impl.m_transporter_facade->setupWakeup(); + assert(rc); + + /* Get a new Ndb object to be the dedicated "wakeup object" for the group */ + m_wakeNdb = new Ndb(m_conn); + assert(m_wakeNdb); + m_wakeNdb->init(1); + m_nodeId = m_wakeNdb->theNode; + + /* Get a wakeup handler */ + m_multiWaitHandler = new MultiNdbWakeupHandler(m_wakeNdb); +} + + +NdbWaitGroup::~NdbWaitGroup() +{ + while (m_count > 0) + { + m_multiWaitHandler->unregisterNdb(m_array[topDownIdx(m_count--)]); + } + + delete m_multiWaitHandler; + delete m_wakeNdb; + delete[] m_array; +} + + +bool NdbWaitGroup::addNdb(Ndb *ndb) +{ + if (unlikely(ndb->theNode != Uint32(m_nodeId))) + { + return false; // Ndb belongs to wrong ndb_cluster_connection + } + + if (unlikely(m_count == m_array_size)) + { + return false; // array is full + } + + if (unlikely(m_multiWaitHandler->ndbIsRegistered(ndb))) + { + return false; // duplicate of item already in group + } + + m_count++; + m_array[topDownIdx(m_count)] = ndb; + return true; +} + + +void NdbWaitGroup::wakeup() +{ + m_conn->m_impl.m_transporter_facade->requestWakeup(); +} + + +int NdbWaitGroup::wait(Ndb ** & arrayHead /* out */, + Uint32 timeout_millis, + int min_ndbs) +{ + arrayHead = NULL; + Ndb ** ndblist = m_array + topDownIdx(m_count); + + int wait_rc; + int nready; + { + PollGuard pg(* m_wakeNdb->theImpl); // get ready to poll + wait_rc = m_multiWaitHandler->waitForInput(ndblist, m_count, min_ndbs, + & pg, timeout_millis); + nready = m_multiWaitHandler->getNumReadyNdbs(); + + if (wait_rc == 0) + { + arrayHead = ndblist; // success + for(int i = 0 ; i < nready ; i++) // remove ready Ndbs from group + { + m_multiWaitHandler->unregisterNdb(m_array[topDownIdx(m_count)]); + m_count--; + } + } + } /* release PollGuard */ + + return wait_rc ? -1 : nready; +} + === added file 'storage/ndb/src/ndbapi/NdbWaitGroup.hpp' --- a/storage/ndb/src/ndbapi/NdbWaitGroup.hpp 1970-01-01 00:00:00 +0000 +++ b/storage/ndb/src/ndbapi/NdbWaitGroup.hpp 2011-09-09 10:48:14 +0000 @@ -0,0 +1,104 @@ +/* + Copyright (c) 2011 Oracle and/or its affiliates. All rights reserved. + + This program is free software; you can redistribute it and/or modify + it under the terms of the GNU General Public License as published by + the Free Software Foundation; version 2 of the License. + + This program is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + GNU General Public License for more details. + + You should have received a copy of the GNU General Public License + along with this program; if not, write to the Free Software + Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA + */ + +#ifndef NdbWaitGroup_H +#define NdbWaitGroup_H + +class Ndb_cluster_connection; +class Ndb; +class MultiNdbWakeupHandler; + +/* NdbWaitGroup extends the Asynchronous NDB API, allowing you to wait + for asynchronous operations to complete on multiple Ndb objects at once. + + All Ndb objects within a poll group must belong to the same cluster + connection, and only one poll group per cluster connection is currently + supported. You instantiate this poll group using + Ndb_cluster_connection::create_multi_ndb_wait_group(). + + Then, after using Ndb::sendPreparedTransactions() to send async operations + on a particular Ndb object, you can use NdbWaitGroup::addNdb() to add it + to the group. + + NdbWaitGroup::wait() returns whenever some Ndb's are ready for polling; you can + then call Ndb::pollNdb(0, 1) on the ones that are ready. +*/ + +class NdbWaitGroup { +friend class Ndb_cluster_connection; +friend class Ndb_cluster_connection_impl; +private: + + /** The private constructor is used only by ndb_cluster_connection. + It allocates an initializes an NdbWaitGroup with an array of size + max_ndb_objects. + */ + NdbWaitGroup(Ndb_cluster_connection *conn, int max_ndb_objects); + + /** The destructor is also private */ + ~NdbWaitGroup(); + +public: + + /** Add an Ndb object to the group. + + Returns true on success, false on error. Error could be that the Ndb + is created from the wrong Ndb_cluster_connection, or is already in the + group, or that the group is full. + */ + bool addNdb(Ndb *); + + /** Wake up the thread that is currently waiting on this group. + This can be used by other threads to signal a condition to the + waiting thread. + If no thread is currently waiting, then delivery is not guaranteed. + */ + void wakeup(); + + /** wait for Ndbs to be ready. + arrayhead (OUT): on return will hold the list of ready Ndbs. + The call will return when: + (a) at least min_ready Ndbs are ready for polling, or + (b) timeout milliseconds have elapsed, or + (c) another thread has called NdbWaitGroup::wakeup() + + The return value is the number of Ndb objects ready for polling, or -1 + if a timeout occured. + + On return, arrayHead is set to point to the first element of + the array of Ndb object pointers that are ready for polling, and those + objects are implicitly no longer in the group. These Ndb *'s must be + read from arrayHead before before any further calls to addNdb(). + */ + int wait(Ndb ** & arrayHead, Uint32 timeout_millis, int min_ready = 1 ); + +private: /* private internal methods */ + int topDownIdx(int n) { return m_array_size - n; } + +private: /* private instance variables */ + Ndb_cluster_connection *m_conn; + MultiNdbWakeupHandler *m_multiWaitHandler; + Ndb *m_wakeNdb; + Ndb **m_array; + int m_array_size; + int m_count; + int m_nodeId; +}; + + +#endif + === modified file 'storage/ndb/src/ndbapi/Ndbif.cpp' --- a/storage/ndb/src/ndbapi/Ndbif.cpp 2011-09-07 17:12:12 +0000 +++ b/storage/ndb/src/ndbapi/Ndbif.cpp 2011-09-09 10:48:14 +0000 @@ -1014,12 +1014,24 @@ Ndb::completedTransaction(NdbTransaction theNoOfSentTransactions = tNoSentTransactions - 1; aCon->theListState = NdbTransaction::InCompletedList; aCon->handleExecuteCompletion(); - if ((theMinNoOfEventsToWakeUp != 0) && - (theNoOfCompletedTransactions >= theMinNoOfEventsToWakeUp)) { - theMinNoOfEventsToWakeUp = 0; - theImpl->theWaiter.signal(NO_WAIT); - return; - }//if + + if (theImpl->wakeHandler == 0) + { + if ((theMinNoOfEventsToWakeUp != 0) && + (theNoOfCompletedTransactions >= theMinNoOfEventsToWakeUp)) + { + theMinNoOfEventsToWakeUp = 0; + theImpl->theWaiter.signal(NO_WAIT); + return; + } + } + else + { + /** + * This is for multi-wait handling + */ + theImpl->wakeHandler->notifyTransactionCompleted(this); + } } else { ndbout << "theNoOfSentTransactions = " << (int) theNoOfSentTransactions; ndbout << " theListState = " << (int) aCon->theListState; === modified file 'storage/ndb/src/ndbapi/Ndbinit.cpp' --- a/storage/ndb/src/ndbapi/Ndbinit.cpp 2011-06-30 15:59:25 +0000 +++ b/storage/ndb/src/ndbapi/Ndbinit.cpp 2011-09-09 10:48:14 +0000 @@ -204,6 +204,8 @@ NdbImpl::NdbImpl(Ndb_cluster_connection theNdbObjectIdMap(1024,1024), theNoOfDBnodes(0), theWaiter(this), + wakeHandler(0), + wakeContext(~Uint32(0)), m_ev_op(0), customDataPtr(0) { === modified file 'storage/ndb/src/ndbapi/TransporterFacade.cpp' --- a/storage/ndb/src/ndbapi/TransporterFacade.cpp 2011-09-08 06:22:07 +0000 +++ b/storage/ndb/src/ndbapi/TransporterFacade.cpp 2011-09-09 12:29:43 +0000 @@ -546,6 +546,7 @@ TransporterFacade::TransporterFacade(Glo theClusterMgr(NULL), checkCounter(4), currentSendLimit(1), + dozer(NULL), theStopReceive(0), theSendThread(NULL), theReceiveThread(NULL), @@ -2047,3 +2048,64 @@ TransporterFacade::ext_doConnect(int aNo theClusterMgr->unlock(); } +bool +TransporterFacade::setupWakeup() +{ + /* Ask TransporterRegistry to setup wakeup sockets */ + bool rc; + lock_mutex(); + { + rc = theTransporterRegistry->setup_wakeup_socket(); + } + unlock_mutex(); + return rc; +} + +bool +TransporterFacade::registerForWakeup(trp_client* _dozer) +{ + /* Called with Transporter lock */ + /* In future use a DLList for dozers. + * Ideally with some way to wake one rather than all + * For now, we just have one/TransporterFacade + */ + if (dozer != NULL) + return false; + + dozer = _dozer; + return true; +} + +bool +TransporterFacade::unregisterForWakeup(trp_client* _dozer) +{ + /* Called with Transporter lock */ + if (dozer != _dozer) + return false; + + dozer = NULL; + return true; +} + +void +TransporterFacade::requestWakeup() +{ + /* Forward to TransporterRegistry + * No need for locks, assuming only one client at a time will use + */ + theTransporterRegistry->wakeup(); +} + + +void +TransporterFacade::reportWakeup() +{ + /* Explicit wakeup callback + * Called with Transporter Mutex held + */ + /* Notify interested parties */ + if (dozer != NULL) + { + dozer->trp_wakeup(); + }; +} === modified file 'storage/ndb/src/ndbapi/TransporterFacade.hpp' --- a/storage/ndb/src/ndbapi/TransporterFacade.hpp 2011-09-07 17:20:19 +0000 +++ b/storage/ndb/src/ndbapi/TransporterFacade.hpp 2011-09-09 10:48:14 +0000 @@ -203,6 +203,21 @@ public: { theTransporterRegistry->reset_send_buffer(node, should_be_empty); } + /** + * Wakeup + * + * Clients normally block waiting for a pattern of signals, + * or until a timeout expires. + * This Api allows them to be woken early. + * To use it, a setupWakeup() call must be made once prior + * to using the Apis in any client. + * + */ + bool setupWakeup(); + bool registerForWakeup(trp_client* dozer); + bool unregisterForWakeup(trp_client* dozer); + void requestWakeup(); + void reportWakeup(); private: @@ -229,6 +244,11 @@ private: void calculateSendLimit(); + /* Single dozer supported currently. + * In future, use a DLList to support > 1 + */ + trp_client * dozer; + // Declarations for the receive and send thread int theStopReceive; Uint32 sendThreadWaitMillisec; === added file 'storage/ndb/src/ndbapi/WakeupHandler.cpp' --- a/storage/ndb/src/ndbapi/WakeupHandler.cpp 1970-01-01 00:00:00 +0000 +++ b/storage/ndb/src/ndbapi/WakeupHandler.cpp 2011-09-09 10:48:14 +0000 @@ -0,0 +1,207 @@ +/* + Copyright (c) 2011, Oracle and/or its affiliates. All rights reserved. + + This program is free software; you can redistribute it and/or modify + it under the terms of the GNU General Public License as published by + the Free Software Foundation; version 2 of the License. + + This program is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + GNU General Public License for more details. + + You should have received a copy of the GNU General Public License + along with this program; if not, write to the Free Software + Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA +*/ + +#include "WakeupHandler.hpp" +#include "Ndb.hpp" +#include "NdbImpl.hpp" +#include "trp_client.hpp" + +// ***** Multiwait handler **** + +/** + * An instance of this class is used when a single thread + * wants to wait for the asynchronous completion of transactions + * on multiple Ndb objects. + * When the thread starts waiting, all Ndb objects are checked + * for CompletedTransactions, and their wakeHandler is set to + * poin to the same MultiNdbWakeupHandler object. The thread + * is then put to sleep / polls on a designated Ndb object. + * + * As transactions complete, the MultiNdbWakeHandler object + * moves their Ndb objects to the start of the passed Ndb + * object list and determines whether enough have completed + * to wake the waiting thread. + * When enough have completed, the waiting thread is woken via + * the designated Ndb object. + */ + +MultiNdbWakeupHandler::MultiNdbWakeupHandler(Ndb* _wakeNdb) + : wakeNdb(_wakeNdb), + woken(false) +{ + /* Register the waiter Ndb to receive wakeups for all Ndbs in the group */ + PollGuard pg(* wakeNdb->theImpl); // Hold mutex before calling into Facade + bool rc = wakeNdb->theImpl->m_transporter_facade->registerForWakeup(wakeNdb->theImpl); + assert(rc); + wakeNdb->theImpl->wakeHandler = this; +} + + +MultiNdbWakeupHandler::~MultiNdbWakeupHandler() +{ + PollGuard pg(* wakeNdb->theImpl); // Hold mutex before calling into Facade + bool rc = wakeNdb->theImpl->m_transporter_facade-> + unregisterForWakeup(wakeNdb->theImpl); + assert(rc); +} + + +bool MultiNdbWakeupHandler::ndbIsRegistered(Ndb *obj) +{ + return (obj->theImpl->wakeHandler == this); +} + + +bool MultiNdbWakeupHandler::unregisterNdb(Ndb *obj) +{ + if (obj->theImpl->wakeHandler == this) + { + obj->theImpl->wakeHandler = 0; + obj->theImpl->wakeContext = ~ Uint32(0); + return true; + } + return false; +} + + +Uint32 MultiNdbWakeupHandler::getNumReadyNdbs() const +{ + return numNdbsWithCompletedTrans; +} + + +int MultiNdbWakeupHandler::waitForInput(Ndb** _objs, int _cnt, int min_req, + PollGuard* pg, int timeout_millis) +{ + woken = false; + numNdbsWithCompletedTrans = 0; + minNdbsToWake = min_req; + objs = _objs; + cnt = _cnt; + + /* Before sleeping, we register each Ndb, and check whether it already + has any completed transactions. + */ + for (Uint32 ndbcnt = 0; ndbcnt < cnt; ndbcnt ++) + { + Ndb* obj = objs [ndbcnt]; + + /* Register the Ndb */ + obj->theImpl->wakeHandler = this; + + /* Store its list position */ + obj->theImpl->wakeContext = ndbcnt; + + /* It may already have some completed transactions */ + if (obj->theNoOfCompletedTransactions) + { + /* Move that ndb to the start of the array */ + swapNdbsInArray(ndbcnt, numNdbsWithCompletedTrans); + numNdbsWithCompletedTrans++; + } + } + + if (isReadyToWake()) // already enough + { + woken = false; + return 0; + } + + wakeNdb->theImpl->theWaiter.set_node(0); + wakeNdb->theImpl->theWaiter.set_state(WAIT_TRANS); + + NDB_TICKS currTime = NdbTick_CurrentMillisecond(); + NDB_TICKS maxTime = currTime + (NDB_TICKS) timeout_millis; + + do { + /* PollGuard will put us to sleep until something relevant happens */ + pg->wait_for_input(timeout_millis > 10 ? 10 : timeout_millis); + wakeNdb->theImpl->incClientStat(Ndb::WaitExecCompleteCount, 1); + + if (isReadyToWake()) + { + woken = false; // reset for next time + return 0; + } + timeout_millis = (int) (maxTime - NdbTick_CurrentMillisecond()); + } while (timeout_millis > 0); + + return -1; // timeout occured +} + + +void MultiNdbWakeupHandler::swapNdbsInArray(Uint32 indexA, Uint32 indexB) +{ + /* Generally used to move an Ndb object down the list + * (bubble sort), so that it is part of a contiguous + * list of Ndbs with completed transactions to return + * to caller. + * If it's already in the given position, no effect + */ + assert(indexA < cnt); + assert(indexB < cnt); + + Ndb* a = objs[ indexA ]; + Ndb* b = objs[ indexB ]; + + assert(a->theImpl->wakeContext == indexA); + assert(b->theImpl->wakeContext == indexB); + + objs[ indexA ] = b; + b->theImpl->wakeContext = indexA; + + objs[ indexB ] = a; + a->theImpl->wakeContext = indexB; +} + + +void MultiNdbWakeupHandler::notifyTransactionCompleted(Ndb* from) +{ + Uint32 & completedNdbListPos = from->theImpl->wakeContext; + + /* TODO : assert that transporter lock is held */ + assert(completedNdbListPos < cnt); + assert(wakeNdb->theImpl->wakeHandler == this); + assert(from != wakeNdb); + + /* Some Ndb object has just completed another transaction. + Ensure that it's in the completed Ndbs list + */ + if (completedNdbListPos >= numNdbsWithCompletedTrans) + { + /* It's not, swap it with Ndb in 'next' position */ + swapNdbsInArray(completedNdbListPos, numNdbsWithCompletedTrans); + numNdbsWithCompletedTrans ++; + } + + if (numNdbsWithCompletedTrans >= minNdbsToWake) + { + wakeNdb->theImpl->theWaiter.signal(NO_WAIT); // wakeup client thread + } + + return; +} + + +void MultiNdbWakeupHandler::notifyWakeup() +{ + assert(wakeNdb->theImpl->wakeHandler == this); + + /* Wakeup client thread, using 'waiter' Ndb */ + woken = true; + wakeNdb->theImpl->theWaiter.signal(NO_WAIT); +} === added file 'storage/ndb/src/ndbapi/WakeupHandler.hpp' --- a/storage/ndb/src/ndbapi/WakeupHandler.hpp 1970-01-01 00:00:00 +0000 +++ b/storage/ndb/src/ndbapi/WakeupHandler.hpp 2011-09-09 10:48:14 +0000 @@ -0,0 +1,78 @@ +/* + Copyright (c) 2011, Oracle and/or its affiliates. All rights reserved. + + This program is free software; you can redistribute it and/or modify + it under the terms of the GNU General Public License as published by + the Free Software Foundation; version 2 of the License. + + This program is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + GNU General Public License for more details. + + You should have received a copy of the GNU General Public License + along with this program; if not, write to the Free Software + Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA +*/ + +#ifndef WakeupHandler_H +#define WakeupHandler_H + +#include +class Ndb; +class Ndb_cluster_connection; +class PollGuard; + +/** + * WakeupHandler + * + * Help Ndb objects respond to wakeups from the TransporterFacade + * when transactions have completed. + * + * Each Ndb will own an instance of the DefaultWakeupHandler, + * and each NdbWaitGroup will create an instance of a more specialized + * WakeupHandler. + */ + +class WakeupHandler +{ +public: + virtual void notifyTransactionCompleted(Ndb* from) = 0; + virtual void notifyWakeup() = 0; + virtual ~WakeupHandler() {}; +}; + +class MultiNdbWakeupHandler : public WakeupHandler +{ +public: + MultiNdbWakeupHandler(Ndb* _wakeNdb); + ~MultiNdbWakeupHandler(); + bool unregisterNdb(Ndb *); + bool ndbIsRegistered(Ndb *); + void notifyTransactionCompleted(Ndb* from); + void notifyWakeup(); + Uint32 getNumReadyNdbs() const; + /** returns 0 on success, -1 on timeout: */ + int waitForInput(Ndb **objs, int cnt, int min_requested, + PollGuard* pg, int timeout_millis); + +private: // private methods + void swapNdbsInArray(Uint32 indexA, Uint32 indexB); + bool isReadyToWake() const; + +private: // private instance variables + Uint32 numNdbsWithCompletedTrans; + Uint32 minNdbsToWake; + Ndb* wakeNdb; + Ndb** objs; + Uint32 cnt; + volatile bool woken; +}; + + +inline bool MultiNdbWakeupHandler::isReadyToWake() const +{ + return (numNdbsWithCompletedTrans >= minNdbsToWake) || woken; +} + +#endif === modified file 'storage/ndb/src/ndbapi/ndb_cluster_connection.cpp' --- a/storage/ndb/src/ndbapi/ndb_cluster_connection.cpp 2011-09-08 06:22:07 +0000 +++ b/storage/ndb/src/ndbapi/ndb_cluster_connection.cpp 2011-09-09 12:29:43 +0000 @@ -363,7 +363,8 @@ Ndb_cluster_connection_impl(const char * m_first_ndb_object(0), m_latest_error_msg(), m_latest_error(0), - m_max_trans_id(0) + m_max_trans_id(0), + m_multi_wait_group(0) { DBUG_ENTER("Ndb_cluster_connection"); DBUG_PRINT("enter",("Ndb_cluster_connection this=0x%lx", (long) this)); @@ -497,6 +498,10 @@ Ndb_cluster_connection_impl::~Ndb_cluste NdbMutex_Destroy(m_new_delete_ndb_mutex); m_new_delete_ndb_mutex = 0; + if(m_multi_wait_group) + delete m_multi_wait_group; + m_multi_wait_group = 0; + DBUG_VOID_RETURN; } @@ -993,4 +998,34 @@ Ndb_cluster_connection::get_max_adaptive return m_impl.m_transporter_facade->getSendThreadInterval(); } +NdbWaitGroup * +Ndb_cluster_connection::create_ndb_wait_group(int size) +{ + if(m_impl.m_multi_wait_group == NULL) + { + m_impl.m_multi_wait_group = new NdbWaitGroup(this, size); + return m_impl.m_multi_wait_group; + } + else + { + return NULL; // NdbWaitGroup already exists + } +} + +bool +Ndb_cluster_connection::release_ndb_wait_group(NdbWaitGroup *group) +{ + if(m_impl.m_multi_wait_group && m_impl.m_multi_wait_group == group) + { + delete m_impl.m_multi_wait_group; + m_impl.m_multi_wait_group = 0; + return true; + } + else + { + return false; + } +} + + template class Vector; === modified file 'storage/ndb/src/ndbapi/ndb_cluster_connection_impl.hpp' --- a/storage/ndb/src/ndbapi/ndb_cluster_connection_impl.hpp 2011-06-30 15:59:25 +0000 +++ b/storage/ndb/src/ndbapi/ndb_cluster_connection_impl.hpp 2011-09-09 10:48:14 +0000 @@ -74,6 +74,7 @@ public: private: friend class Ndb; friend class NdbImpl; + friend class NdbWaitGroup; friend void* run_ndb_cluster_connection_connect_thread(void*); friend class Ndb_cluster_connection; friend class NdbEventBuffer; @@ -131,6 +132,8 @@ private: // Base offset for stats, from Ndb objects that are no // longer with us Uint64 globalApiStatsBaseline[ Ndb::NumClientStatistics ]; + + NdbWaitGroup *m_multi_wait_group; }; #endif === modified file 'storage/ndb/src/ndbapi/trp_client.hpp' --- a/storage/ndb/src/ndbapi/trp_client.hpp 2011-09-08 06:22:07 +0000 +++ b/storage/ndb/src/ndbapi/trp_client.hpp 2011-09-09 12:29:43 +0000 @@ -35,6 +35,8 @@ public: virtual void trp_deliver_signal(const NdbApiSignal *, const LinearSectionPtr ptr[3]) = 0; + virtual void trp_wakeup() + {}; Uint32 open(class TransporterFacade*, int blockNo = -1); void close(); === modified file 'storage/ndb/test/ndbapi/Makefile.am' --- a/storage/ndb/test/ndbapi/Makefile.am 2011-09-02 17:24:52 +0000 +++ b/storage/ndb/test/ndbapi/Makefile.am 2011-09-09 10:48:14 +0000 @@ -29,6 +29,7 @@ flexTT \ testBackup \ testBasic \ testBasicAsynch \ +testAsynchMultiwait \ testBlobs \ testDataBuffers \ testDict \ @@ -89,6 +90,7 @@ testBackup_SOURCES = testBackup.cpp testBasic_SOURCES = testBasic.cpp testSpj_SOURCES = testSpj.cpp testBasicAsynch_SOURCES = testBasicAsynch.cpp +testAsynchMultiwait_SOURCES = testAsynchMultiwait.cpp testBlobs_SOURCES = testBlobs.cpp testDataBuffers_SOURCES = testDataBuffers.cpp testDict_SOURCES = testDict.cpp === added file 'storage/ndb/test/ndbapi/testAsynchMultiwait.cpp' --- a/storage/ndb/test/ndbapi/testAsynchMultiwait.cpp 1970-01-01 00:00:00 +0000 +++ b/storage/ndb/test/ndbapi/testAsynchMultiwait.cpp 2011-09-09 10:48:14 +0000 @@ -0,0 +1,304 @@ +/* + Copyright (c) 2011 Oracle and/or its affiliates. All rights reserved. + + This program is free software; you can redistribute it and/or modify + it under the terms of the GNU General Public License as published by + the Free Software Foundation; version 2 of the License. + + This program is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + GNU General Public License for more details. + + You should have received a copy of the GNU General Public License + along with this program; if not, write to the Free Software + Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA +*/ + + +#include "NDBT_Test.hpp" +#include "NDBT_ReturnCodes.h" +#include "HugoTransactions.hpp" +#include "HugoAsynchTransactions.hpp" +#include "UtilTransactions.hpp" +#include "random.h" +#include "../../src/ndbapi/NdbWaitGroup.hpp" + +NdbWaitGroup * global_poll_group; + +#define check(b, e) \ + if (!(b)) { g_err << "ERR: " << step->getName() << " failed on line " \ + << __LINE__ << ": " << e.getNdbError() << endl; return NDBT_FAILED; } + + +int runSetup(NDBT_Context* ctx, NDBT_Step* step){ + + int records = ctx->getNumRecords(); + int batchSize = ctx->getProperty("BatchSize", 1); + int transactions = (records / 100) + 1; + int operations = (records / transactions) + 1; + Ndb* pNdb = GETNDB(step); + + HugoAsynchTransactions hugoTrans(*ctx->getTab()); + if (hugoTrans.loadTableAsynch(pNdb, records, batchSize, + transactions, operations) != 0){ + return NDBT_FAILED; + } + + Ndb_cluster_connection* conn = &pNdb->get_ndb_cluster_connection(); + + /* The first call to create_multi_ndb_wait_group() should succeed ... */ + global_poll_group = conn->create_ndb_wait_group(1000); + if(global_poll_group == 0) { + return NDBT_FAILED; + } + + /* and subsequent calls should fail */ + if(conn->create_ndb_wait_group(1000) != 0) { + return NDBT_FAILED; + } + + return NDBT_OK; +} + +int runCleanup(NDBT_Context* ctx, NDBT_Step* step){ + int records = ctx->getNumRecords(); + int batchSize = ctx->getProperty("BatchSize", 1); + int transactions = (records / 100) + 1; + int operations = (records / transactions) + 1; + Ndb* pNdb = GETNDB(step); + + HugoAsynchTransactions hugoTrans(*ctx->getTab()); + if (hugoTrans.pkDelRecordsAsynch(pNdb, records, batchSize, + transactions, operations) != 0){ + return NDBT_FAILED; + } + + pNdb->get_ndb_cluster_connection().release_ndb_wait_group(global_poll_group); + + return NDBT_OK; +} + + +int runPkReadMultiBasic(NDBT_Context* ctx, NDBT_Step* step){ + int loops = ctx->getNumLoops(); + int records = ctx->getNumRecords(); + const int MAX_NDBS = 200; + Ndb* pNdb = GETNDB(step); + Ndb_cluster_connection* conn = &pNdb->get_ndb_cluster_connection(); + + int i = 0; + HugoOperations hugoOps(*ctx->getTab()); + + Ndb* ndbObjs[ MAX_NDBS ]; + NdbTransaction* transArray[ MAX_NDBS ]; + Ndb ** ready_ndbs; + + for (int j=0; j < MAX_NDBS; j++) + { + Ndb* ndb = new Ndb(conn); + check(ndb->init() == 0, (*ndb)); + ndbObjs[ j ] = ndb; + } + + while (istartTransaction(); + check(trans != NULL, (*ndb)); + NdbOperation* readOp = trans->getNdbOperation(ctx->getTab()); + check(readOp != NULL, (*trans)); + check(readOp->readTuple() == 0, (*readOp)); + check(hugoOps.equalForRow(readOp, recordsLeft) == 0, hugoOps); + + /* Read all other cols */ + for (int k=0; k < ctx->getTab()->getNoOfColumns(); k++) + { + check(readOp->getValue(ctx->getTab()->getColumn(k)) != NULL, + (*readOp)); + } + + /* Now send em off */ + trans->executeAsynchPrepare(NdbTransaction::Commit, + NULL, + NULL, + NdbOperation::AbortOnError); + ndb->sendPreparedTransactions(); + + transArray[ndbcnt] = trans; + global_poll_group->addNdb(ndb); + + ndbcnt++; + pollcnt++; + recordsLeft--; + lumpsize--; + }; + + /* Ok, now wait for the Ndbs to complete */ + while (pollcnt) + { + /* Occasionally check with no timeout */ + Uint32 timeout_millis = myRandom48(2)?10000:0; + int count = global_poll_group->wait(ready_ndbs, timeout_millis); + + if (count > 0) + { + for (int y=0; y < count; y++) + { + Ndb *ndb = ready_ndbs[y]; + check(ndb->pollNdb(0, 1) != 0, (*ndb)); + } + pollcnt -= count; + } + } + + /* Ok, now close the transactions */ + for (int t=0; t < ndbcnt; t++) + { + transArray[t]->close(); + } + } while (recordsLeft); + + i++; + } + + for (int j=0; j < MAX_NDBS; j++) + { + delete ndbObjs[ j ]; + } + + return NDBT_OK; +} + +int runPkReadMultiWakeupT1(NDBT_Context* ctx, NDBT_Step* step) +{ + HugoOperations hugoOps(*ctx->getTab()); + Ndb* ndb = GETNDB(step); + Uint32 phase = ctx->getProperty("PHASE"); + + if (phase != 0) + { + ndbout << "Thread 1 : Error, initial phase should be 0 not " << phase << endl; + return NDBT_FAILED; + }; + + /* We now start a transaction, locking row 0 */ + ndbout << "Thread 1 : Starting transaction locking row 0..." << endl; + check(hugoOps.startTransaction(ndb) == 0, hugoOps); + check(hugoOps.pkReadRecord(ndb, 0, 1, NdbOperation::LM_Exclusive) == 0, + hugoOps); + check(hugoOps.execute_NoCommit(ndb) == 0, hugoOps); + + ndbout << "Thread 1 : Lock taken." << endl; + ndbout << "Thread 1 : Triggering Thread 2 by move to phase 1" << endl; + /* Ok, now get thread 2 to try to read row */ + ctx->incProperty("PHASE"); /* Set to 1 */ + + /* Here, loop waking up waiter on the cluster connection */ + /* Check the property has not moved to phase 2 */ + ndbout << "Thread 1 : Performing async wakeup until phase changes to 2" + << endl; + while (ctx->getProperty("PHASE") != 2) + { + global_poll_group->wakeup(); + NdbSleep_MilliSleep(500); + } + + ndbout << "Thread 1 : Phase changed to 2, committing transaction " + << "and releasing lock" << endl; + + /* Ok, give them a break, commit transaction */ + check(hugoOps.execute_Commit(ndb) ==0, hugoOps); + hugoOps.closeTransaction(ndb); + + ndbout << "Thread 1 : Finished" << endl; + return NDBT_OK; +} + +int runPkReadMultiWakeupT2(NDBT_Context* ctx, NDBT_Step* step) +{ + ndbout << "Thread 2 : Waiting for phase 1 notification from Thread 1" << endl; + ctx->getPropertyWait("PHASE", 1); + + /* Ok, now thread 1 has locked row 1, we'll attempt to read + * it, using the multi_ndb_wait Api to block + */ + HugoOperations hugoOps(*ctx->getTab()); + Ndb* ndb = GETNDB(step); + + ndbout << "Thread 2 : Starting async transaction to read row" << endl; + check(hugoOps.startTransaction(ndb) == 0, hugoOps); + check(hugoOps.pkReadRecord(ndb, 0, 1, NdbOperation::LM_Exclusive) == 0, + hugoOps); + /* Prepare, Send */ + check(hugoOps.execute_async(ndb, + NdbTransaction::Commit, + NdbOperation::AbortOnError) == 0, + hugoOps); + + global_poll_group->addNdb(ndb); + Ndb ** ready_ndbs; + int wait_rc = 0; + int acknowledged = 0; + do + { + ndbout << "Thread 2 : Calling NdbWaitGroup::wait()" << endl; + wait_rc = global_poll_group->wait(ready_ndbs, 10000); + ndbout << " Result : " << wait_rc << endl; + if (wait_rc == 0) + { + if (!acknowledged) + { + ndbout << "Thread 2 : Woken up, moving to phase 2" << endl; + ctx->incProperty("PHASE"); + acknowledged = 1; + } + } + else if (wait_rc > 0) + { + ndbout << "Thread 2 : Transaction completed" << endl; + ndb->pollNdb(1,0); + hugoOps.closeTransaction(ndb); + } + } while (wait_rc == 0); + + return (wait_rc == 1 ? NDBT_OK : NDBT_FAILED); +} + +NDBT_TESTSUITE(testAsynchMultiwait); +TESTCASE("AsynchMultiwaitPkRead", + "Verify NdbWaitGroup API (1 thread)") { + INITIALIZER(runSetup); + STEP(runPkReadMultiBasic); + FINALIZER(runCleanup); +} +TESTCASE("AsynchMultiwaitWakeup", + "Verify wait-multi-ndb wakeup Api code") { + INITIALIZER(runSetup); + TC_PROPERTY("PHASE", Uint32(0)); + STEP(runPkReadMultiWakeupT1); + STEP(runPkReadMultiWakeupT2); + FINALIZER(runCleanup); +} +NDBT_TESTSUITE_END(testAsynchMultiwait); + +int main(int argc, const char** argv){ + ndb_init(); + NDBT_TESTSUITE_INSTANCE(testAsynchMultiwait); + return testAsynchMultiwait.execute(argc, argv); +} + === modified file 'storage/ndb/test/run-test/daily-basic-tests.txt' --- a/storage/ndb/test/run-test/daily-basic-tests.txt 2011-09-01 15:12:11 +0000 +++ b/storage/ndb/test/run-test/daily-basic-tests.txt 2011-09-09 10:48:14 +0000 @@ -1744,4 +1744,12 @@ max-time: 300 cmd: testBlobs args: -bug 62321 -skip p +# async api extensions +max-time: 500 +cmd: testAsynchMultiwait +args: -n AsynchMultiwaitPkRead T1 + +max-time: 500 +cmd: testAsynchMultiwait +args: -n AsynchMultiwaitWakeup T1 No bundle (reason: useless for push emails).