4276 Jonas Oreland 2011-09-09 [merge]
ndb - merge 70 to 71
added:
storage/ndb/src/ndbapi/NdbWaitGroup.cpp
storage/ndb/src/ndbapi/NdbWaitGroup.hpp
storage/ndb/src/ndbapi/WakeupHandler.cpp
storage/ndb/src/ndbapi/WakeupHandler.hpp
storage/ndb/test/ndbapi/testAsynchMultiwait.cpp
modified:
storage/ndb/include/ndbapi/Ndb.hpp
storage/ndb/include/ndbapi/ndb_cluster_connection.hpp
storage/ndb/include/transporter/TransporterCallback.hpp
storage/ndb/src/common/transporter/TransporterRegistry.cpp
storage/ndb/src/kernel/ndbd.cpp
storage/ndb/src/kernel/vm/Configuration.cpp
storage/ndb/src/ndbapi/API.hpp
storage/ndb/src/ndbapi/CMakeLists.txt
storage/ndb/src/ndbapi/Makefile.am
storage/ndb/src/ndbapi/NdbImpl.hpp
storage/ndb/src/ndbapi/Ndbif.cpp
storage/ndb/src/ndbapi/Ndbinit.cpp
storage/ndb/src/ndbapi/TransporterFacade.cpp
storage/ndb/src/ndbapi/TransporterFacade.hpp
storage/ndb/src/ndbapi/ndb_cluster_connection.cpp
storage/ndb/src/ndbapi/ndb_cluster_connection_impl.hpp
storage/ndb/src/ndbapi/trp_client.hpp
storage/ndb/test/ndbapi/Makefile.am
storage/ndb/test/run-test/daily-basic-tests.txt
4275 jonas oreland 2011-09-08 [merge]
ndb - merge 70 to 71
added:
mysql-test/suite/ndb_binlog/r/ndb_binlog_log_transaction_id.result
mysql-test/suite/ndb_binlog/t/ndb_binlog_get_row_extra_data.inc
mysql-test/suite/ndb_binlog/t/ndb_binlog_log_transaction_id-master.opt
mysql-test/suite/ndb_binlog/t/ndb_binlog_log_transaction_id.test
mysql-test/suite/ndb_rpl/r/ndb_rpl_conflict_epoch_trans.result
mysql-test/suite/ndb_rpl/t/ndb_rpl_conflict_epoch_trans.cnf
mysql-test/suite/ndb_rpl/t/ndb_rpl_conflict_epoch_trans.test
mysql-test/suite/ndb_rpl/t/ndb_trans_conflict_info.inc
mysql-test/suite/ndb_rpl/t/ndb_trans_conflict_info_init.inc
mysql-test/suite/ndb_rpl/t/ndb_trans_conflict_info_stable.inc
mysql-test/suite/rpl/r/rpl_extra_row_data.result
mysql-test/suite/rpl/t/rpl_extra_row_data-master.opt
mysql-test/suite/rpl/t/rpl_extra_row_data-slave.opt
mysql-test/suite/rpl/t/rpl_extra_row_data.test
sql/ndb_conflict_trans.cc
sql/ndb_conflict_trans.h
storage/ndb/include/util/HashMap2.hpp
storage/ndb/include/util/LinkedStack.hpp
storage/ndb/src/common/util/HashMap2.cpp
storage/ndb/src/common/util/LinkedStack.cpp
modified:
libmysqld/Makefile.am
mysql-test/suite/ndb/r/ndb_basic.result
sql/Makefile.am
sql/ha_ndbcluster.cc
sql/ha_ndbcluster.h
sql/ha_ndbcluster_binlog.cc
sql/ha_ndbcluster_binlog.h
sql/log_event.cc
sql/log_event.h
sql/ndb_mi.cc
sql/ndb_mi.h
sql/rpl_constants.h
sql/slave.h
sql/sql_class.cc
sql/sql_class.h
storage/ndb/CMakeLists.txt
storage/ndb/include/ndbapi/Ndb.hpp
storage/ndb/include/ndbapi/ndb_cluster_connection.hpp
storage/ndb/src/common/portlib/NdbTCP.cpp
storage/ndb/src/common/util/CMakeLists.txt
storage/ndb/src/common/util/Makefile.am
storage/ndb/src/ndbapi/Ndb.cpp
storage/ndb/src/ndbapi/Ndbif.cpp
storage/ndb/src/ndbapi/TransporterFacade.cpp
storage/ndb/src/ndbapi/TransporterFacade.hpp
storage/ndb/src/ndbapi/ndb_cluster_connection.cpp
storage/ndb/src/ndbapi/trp_client.cpp
storage/ndb/src/ndbapi/trp_client.hpp
=== modified file 'storage/ndb/include/ndbapi/Ndb.hpp'
--- a/storage/ndb/include/ndbapi/Ndb.hpp 2011-09-07 17:12:12 +0000
+++ b/storage/ndb/include/ndbapi/Ndb.hpp 2011-09-09 10:48:14 +0000
@@ -1074,6 +1074,8 @@ class Ndb
friend class PollGuard;
friend class NdbQueryImpl;
friend class NdbQueryOperationImpl;
+ friend class MultiNdbWakeupHandler;
+ friend class NdbWaitGroup;
#endif
public:
=== modified file 'storage/ndb/include/ndbapi/ndb_cluster_connection.hpp'
--- a/storage/ndb/include/ndbapi/ndb_cluster_connection.hpp 2011-09-08 06:22:07 +0000
+++ b/storage/ndb/include/ndbapi/ndb_cluster_connection.hpp 2011-09-09 12:29:43 +0000
@@ -34,6 +34,7 @@ private:
};
class Ndb;
+class NdbWaitGroup;
/**
* @class Ndb_cluster_connection
@@ -207,8 +208,11 @@ public:
unsigned int get_next_node(Ndb_cluster_connection_node_iter &iter);
unsigned int get_next_alive_node(Ndb_cluster_connection_node_iter &iter);
unsigned get_active_ndb_objects() const;
-
+
Uint64 *get_latest_trans_gci();
+ NdbWaitGroup * create_ndb_wait_group(int size);
+ bool release_ndb_wait_group(NdbWaitGroup *);
+
#endif
private:
@@ -216,6 +220,7 @@ private:
friend class NdbImpl;
friend class Ndb_cluster_connection_impl;
friend class SignalSender;
+ friend class NdbWaitGroup;
class Ndb_cluster_connection_impl & m_impl;
Ndb_cluster_connection(Ndb_cluster_connection_impl&);
=== modified file 'storage/ndb/include/transporter/TransporterCallback.hpp'
--- a/storage/ndb/include/transporter/TransporterCallback.hpp 2011-06-30 15:59:25 +0000
+++ b/storage/ndb/include/transporter/TransporterCallback.hpp 2011-09-09 10:48:14 +0000
@@ -402,6 +402,14 @@ public:
*/
/**
+ * Notify upper layer of explicit wakeup request
+ *
+ * The is called from the thread holding receiving data from the
+ * transporter, under the protection of the transporter lock.
+ */
+ virtual void reportWakeup() { }
+
+ /**
* Ask upper layer to supply a list of struct iovec's with data to
* send to a node.
*
=== modified file 'storage/ndb/src/common/transporter/TransporterRegistry.cpp'
--- a/storage/ndb/src/common/transporter/TransporterRegistry.cpp 2011-07-04 16:30:34 +0000
+++ b/storage/ndb/src/common/transporter/TransporterRegistry.cpp 2011-09-09 12:29:43 +0000
@@ -1351,6 +1351,9 @@ TransporterRegistry::consume_extra_socke
ret = my_recv(sock, buf, sizeof(buf), 0);
err = my_socket_errno();
} while (ret == sizeof(buf) || (ret == -1 && err == EINTR));
+
+ /* Notify upper layer of explicit wakeup */
+ callbackObj->reportWakeup();
}
void
=== modified file 'storage/ndb/src/kernel/ndbd.cpp'
--- a/storage/ndb/src/kernel/ndbd.cpp 2011-08-30 12:00:48 +0000
+++ b/storage/ndb/src/kernel/ndbd.cpp 2011-09-08 11:49:24 +0000
@@ -296,7 +296,6 @@ static int
get_multithreaded_config(EmulatorData& ed)
{
// multithreaded is compiled in ndbd/ndbmtd for now
- globalData.isNdbMt = SimulatedBlock::isMultiThreaded();
if (!globalData.isNdbMt)
{
ndbout << "NDBMT: non-mt" << endl;
@@ -304,58 +303,15 @@ get_multithreaded_config(EmulatorData& e
}
THRConfig & conf = ed.theConfiguration->m_thr_config;
-
Uint32 threadcount = conf.getThreadCount();
ndbout << "NDBMT: MaxNoOfExecutionThreads=" << threadcount << endl;
- globalData.isNdbMtLqh = true;
-
- {
- if (conf.getMtClassic())
- {
- globalData.isNdbMtLqh = false;
- }
- }
-
if (!globalData.isNdbMtLqh)
return 0;
- Uint32 threads = conf.getThreadCount(THRConfig::T_LDM);
- Uint32 workers = threads;
- {
- ndb_mgm_configuration * conf = ed.theConfiguration->getClusterConfig();
- if (conf == 0)
- {
- abort();
- }
- ndb_mgm_configuration_iterator * p =
- ndb_mgm_create_configuration_iterator(conf, CFG_SECTION_NODE);
- if (ndb_mgm_find(p, CFG_NODE_ID, globalData.ownId))
- {
- abort();
- }
- ndb_mgm_get_int_parameter(p, CFG_NDBMT_LQH_WORKERS, &workers);
- }
-
-#ifdef VM_TRACE
- // testing
- {
- const char* p;
- p = NdbEnv_GetEnv("NDBMT_LQH_WORKERS", (char*)0, 0);
- if (p != 0)
- workers = atoi(p);
- }
-#endif
-
- ndbout << "NDBMT: workers=" << workers
- << " threads=" << threads << endl;
-
- assert(workers != 0 && workers <= MAX_NDBMT_LQH_WORKERS);
- assert(threads != 0 && threads <= MAX_NDBMT_LQH_THREADS);
- assert(workers % threads == 0);
+ ndbout << "NDBMT: workers=" << globalData.ndbMtLqhWorkers
+ << " threads=" << globalData.ndbMtLqhThreads << endl;
- globalData.ndbMtLqhWorkers = workers;
- globalData.ndbMtLqhThreads = threads;
return 0;
}
=== modified file 'storage/ndb/src/kernel/vm/Configuration.cpp'
--- a/storage/ndb/src/kernel/vm/Configuration.cpp 2011-09-04 17:04:25 +0000
+++ b/storage/ndb/src/kernel/vm/Configuration.cpp 2011-09-09 12:29:43 +0000
@@ -457,6 +457,49 @@ Configuration::setupConfiguration(){
m_clusterConfigIter = ndb_mgm_create_configuration_iterator
(p, CFG_SECTION_NODE);
+ /**
+ * This is parts of get_multithreaded_config
+ */
+ do
+ {
+ globalData.isNdbMt = NdbIsMultiThreaded();
+ if (!globalData.isNdbMt)
+ break;
+
+ globalData.isNdbMtLqh = true;
+ {
+ if (m_thr_config.getMtClassic())
+ {
+ globalData.isNdbMtLqh = false;
+ }
+ }
+
+ if (!globalData.isNdbMtLqh)
+ break;
+
+ Uint32 threads = m_thr_config.getThreadCount(THRConfig::T_LDM);
+ Uint32 workers = threads;
+ iter.get(CFG_NDBMT_LQH_WORKERS, &workers);
+
+#ifdef VM_TRACE
+ // testing
+ {
+ const char* p;
+ p = NdbEnv_GetEnv("NDBMT_LQH_WORKERS", (char*)0, 0);
+ if (p != 0)
+ workers = atoi(p);
+ }
+#endif
+
+
+ assert(workers != 0 && workers <= MAX_NDBMT_LQH_WORKERS);
+ assert(threads != 0 && threads <= MAX_NDBMT_LQH_THREADS);
+ assert(workers % threads == 0);
+
+ globalData.ndbMtLqhWorkers = workers;
+ globalData.ndbMtLqhThreads = threads;
+ } while (0);
+
calcSizeAlt(cf);
DBUG_VOID_RETURN;
=== modified file 'storage/ndb/src/ndbapi/API.hpp'
--- a/storage/ndb/src/ndbapi/API.hpp 2011-07-04 16:30:34 +0000
+++ b/storage/ndb/src/ndbapi/API.hpp 2011-09-09 12:29:43 +0000
@@ -42,6 +42,7 @@
#include <NdbBlob.hpp>
#include <NdbBlobImpl.hpp>
#include <NdbInterpretedCode.hpp>
+#include <NdbWaitGroup.hpp>
#include <NdbEventOperation.hpp>
#include "NdbEventOperationImpl.hpp"
=== modified file 'storage/ndb/src/ndbapi/CMakeLists.txt'
--- a/storage/ndb/src/ndbapi/CMakeLists.txt 2011-07-04 16:30:34 +0000
+++ b/storage/ndb/src/ndbapi/CMakeLists.txt 2011-09-09 12:29:43 +0000
@@ -61,6 +61,8 @@ ADD_CONVENIENCE_LIBRARY(ndbapi
ObjectMap.cpp
NdbInfo.cpp
NdbInfoScanOperation.cpp
+ NdbWaitGroup.cpp
+ WakeupHandler.cpp
ndb_internal.cpp
trp_client.cpp
trp_node.cpp
=== modified file 'storage/ndb/src/ndbapi/Makefile.am'
--- a/storage/ndb/src/ndbapi/Makefile.am 2011-07-04 16:30:34 +0000
+++ b/storage/ndb/src/ndbapi/Makefile.am 2011-09-09 12:29:43 +0000
@@ -43,6 +43,7 @@ libndbapi_la_SOURCES = \
NdbOperationInt.cpp \
NdbOperationDefine.cpp \
NdbOperationExec.cpp \
+ NdbWaitGroup.cpp \
NdbScanOperation.cpp \
NdbScanFilter.cpp \
NdbIndexOperation.cpp \
@@ -66,9 +67,10 @@ libndbapi_la_SOURCES = \
NdbInfo.cpp \
NdbInfoScanOperation.cpp \
ndb_internal.cpp \
- trp_client.cpp \
+ trp_client.cpp \
trp_node.cpp \
- trp_buffer.cpp
+ trp_buffer.cpp \
+ WakeupHandler.cpp
INCLUDES_LOC = -I$(top_srcdir)/storage/ndb/src/mgmapi
=== modified file 'storage/ndb/src/ndbapi/NdbImpl.hpp'
--- a/storage/ndb/src/ndbapi/NdbImpl.hpp 2011-06-30 15:59:25 +0000
+++ b/storage/ndb/src/ndbapi/NdbImpl.hpp 2011-09-09 10:48:14 +0000
@@ -31,6 +31,7 @@
#include "trp_client.hpp"
#include "trp_node.hpp"
#include "NdbWaiter.hpp"
+#include "WakeupHandler.hpp"
template <class T>
struct Ndb_free_list_t
@@ -81,6 +82,9 @@ public:
NdbWaiter theWaiter;
+ WakeupHandler* wakeHandler;
+ Uint32 wakeContext;
+
NdbEventOperationImpl *m_ev_op;
int m_optimized_node_selection;
@@ -191,6 +195,7 @@ public:
*/
virtual void trp_deliver_signal(const NdbApiSignal*,
const LinearSectionPtr p[3]);
+ virtual void trp_wakeup();
virtual void recordWaitTimeNanos(Uint64 nanos);
// Is node available for running transactions
bool get_node_alive(NodeId nodeId) const;
@@ -601,4 +606,11 @@ NdbImpl::sendFragmentedSignal(NdbApiSign
return -1;
}
+inline
+void
+NdbImpl::trp_wakeup()
+{
+ wakeHandler->notifyWakeup();
+}
+
#endif
=== added file 'storage/ndb/src/ndbapi/NdbWaitGroup.cpp'
--- a/storage/ndb/src/ndbapi/NdbWaitGroup.cpp 1970-01-01 00:00:00 +0000
+++ b/storage/ndb/src/ndbapi/NdbWaitGroup.cpp 2011-09-09 10:48:14 +0000
@@ -0,0 +1,121 @@
+/*
+ Copyright (c) 2011, Oracle and/or its affiliates. All rights reserved.
+
+ This program is free software; you can redistribute it and/or modify
+ it under the terms of the GNU General Public License as published by
+ the Free Software Foundation; version 2 of the License.
+
+ This program is distributed in the hope that it will be useful,
+ but WITHOUT ANY WARRANTY; without even the implied warranty of
+ MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ GNU General Public License for more details.
+
+ You should have received a copy of the GNU General Public License
+ along with this program; if not, write to the Free Software
+ Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA
+*/
+
+#include <ndb_global.h>
+#include "NdbWaitGroup.hpp"
+#include "WakeupHandler.hpp"
+#include "ndb_cluster_connection.hpp"
+#include "TransporterFacade.hpp"
+#include "ndb_cluster_connection_impl.hpp"
+#include "NdbImpl.hpp"
+
+NdbWaitGroup::NdbWaitGroup(Ndb_cluster_connection *_conn, int _ndbs) :
+ m_conn(_conn),
+ m_array_size(_ndbs),
+ m_count(0),
+ m_nodeId(0),
+ m_multiWaitHandler(0)
+{
+ /* Allocate the array of Ndbs */
+ m_array = new Ndb *[m_array_size];
+
+ /* Call into the TransporterFacade to set up wakeups */
+ bool rc = m_conn->m_impl.m_transporter_facade->setupWakeup();
+ assert(rc);
+
+ /* Get a new Ndb object to be the dedicated "wakeup object" for the group */
+ m_wakeNdb = new Ndb(m_conn);
+ assert(m_wakeNdb);
+ m_wakeNdb->init(1);
+ m_nodeId = m_wakeNdb->theNode;
+
+ /* Get a wakeup handler */
+ m_multiWaitHandler = new MultiNdbWakeupHandler(m_wakeNdb);
+}
+
+
+NdbWaitGroup::~NdbWaitGroup()
+{
+ while (m_count > 0)
+ {
+ m_multiWaitHandler->unregisterNdb(m_array[topDownIdx(m_count--)]);
+ }
+
+ delete m_multiWaitHandler;
+ delete m_wakeNdb;
+ delete[] m_array;
+}
+
+
+bool NdbWaitGroup::addNdb(Ndb *ndb)
+{
+ if (unlikely(ndb->theNode != Uint32(m_nodeId)))
+ {
+ return false; // Ndb belongs to wrong ndb_cluster_connection
+ }
+
+ if (unlikely(m_count == m_array_size))
+ {
+ return false; // array is full
+ }
+
+ if (unlikely(m_multiWaitHandler->ndbIsRegistered(ndb)))
+ {
+ return false; // duplicate of item already in group
+ }
+
+ m_count++;
+ m_array[topDownIdx(m_count)] = ndb;
+ return true;
+}
+
+
+void NdbWaitGroup::wakeup()
+{
+ m_conn->m_impl.m_transporter_facade->requestWakeup();
+}
+
+
+int NdbWaitGroup::wait(Ndb ** & arrayHead /* out */,
+ Uint32 timeout_millis,
+ int min_ndbs)
+{
+ arrayHead = NULL;
+ Ndb ** ndblist = m_array + topDownIdx(m_count);
+
+ int wait_rc;
+ int nready;
+ {
+ PollGuard pg(* m_wakeNdb->theImpl); // get ready to poll
+ wait_rc = m_multiWaitHandler->waitForInput(ndblist, m_count, min_ndbs,
+ & pg, timeout_millis);
+ nready = m_multiWaitHandler->getNumReadyNdbs();
+
+ if (wait_rc == 0)
+ {
+ arrayHead = ndblist; // success
+ for(int i = 0 ; i < nready ; i++) // remove ready Ndbs from group
+ {
+ m_multiWaitHandler->unregisterNdb(m_array[topDownIdx(m_count)]);
+ m_count--;
+ }
+ }
+ } /* release PollGuard */
+
+ return wait_rc ? -1 : nready;
+}
+
=== added file 'storage/ndb/src/ndbapi/NdbWaitGroup.hpp'
--- a/storage/ndb/src/ndbapi/NdbWaitGroup.hpp 1970-01-01 00:00:00 +0000
+++ b/storage/ndb/src/ndbapi/NdbWaitGroup.hpp 2011-09-09 10:48:14 +0000
@@ -0,0 +1,104 @@
+/*
+ Copyright (c) 2011 Oracle and/or its affiliates. All rights reserved.
+
+ This program is free software; you can redistribute it and/or modify
+ it under the terms of the GNU General Public License as published by
+ the Free Software Foundation; version 2 of the License.
+
+ This program is distributed in the hope that it will be useful,
+ but WITHOUT ANY WARRANTY; without even the implied warranty of
+ MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ GNU General Public License for more details.
+
+ You should have received a copy of the GNU General Public License
+ along with this program; if not, write to the Free Software
+ Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA
+ */
+
+#ifndef NdbWaitGroup_H
+#define NdbWaitGroup_H
+
+class Ndb_cluster_connection;
+class Ndb;
+class MultiNdbWakeupHandler;
+
+/* NdbWaitGroup extends the Asynchronous NDB API, allowing you to wait
+ for asynchronous operations to complete on multiple Ndb objects at once.
+
+ All Ndb objects within a poll group must belong to the same cluster
+ connection, and only one poll group per cluster connection is currently
+ supported. You instantiate this poll group using
+ Ndb_cluster_connection::create_multi_ndb_wait_group().
+
+ Then, after using Ndb::sendPreparedTransactions() to send async operations
+ on a particular Ndb object, you can use NdbWaitGroup::addNdb() to add it
+ to the group.
+
+ NdbWaitGroup::wait() returns whenever some Ndb's are ready for polling; you can
+ then call Ndb::pollNdb(0, 1) on the ones that are ready.
+*/
+
+class NdbWaitGroup {
+friend class Ndb_cluster_connection;
+friend class Ndb_cluster_connection_impl;
+private:
+
+ /** The private constructor is used only by ndb_cluster_connection.
+ It allocates an initializes an NdbWaitGroup with an array of size
+ max_ndb_objects.
+ */
+ NdbWaitGroup(Ndb_cluster_connection *conn, int max_ndb_objects);
+
+ /** The destructor is also private */
+ ~NdbWaitGroup();
+
+public:
+
+ /** Add an Ndb object to the group.
+
+ Returns true on success, false on error. Error could be that the Ndb
+ is created from the wrong Ndb_cluster_connection, or is already in the
+ group, or that the group is full.
+ */
+ bool addNdb(Ndb *);
+
+ /** Wake up the thread that is currently waiting on this group.
+ This can be used by other threads to signal a condition to the
+ waiting thread.
+ If no thread is currently waiting, then delivery is not guaranteed.
+ */
+ void wakeup();
+
+ /** wait for Ndbs to be ready.
+ arrayhead (OUT): on return will hold the list of ready Ndbs.
+ The call will return when:
+ (a) at least min_ready Ndbs are ready for polling, or
+ (b) timeout milliseconds have elapsed, or
+ (c) another thread has called NdbWaitGroup::wakeup()
+
+ The return value is the number of Ndb objects ready for polling, or -1
+ if a timeout occured.
+
+ On return, arrayHead is set to point to the first element of
+ the array of Ndb object pointers that are ready for polling, and those
+ objects are implicitly no longer in the group. These Ndb *'s must be
+ read from arrayHead before before any further calls to addNdb().
+ */
+ int wait(Ndb ** & arrayHead, Uint32 timeout_millis, int min_ready = 1 );
+
+private: /* private internal methods */
+ int topDownIdx(int n) { return m_array_size - n; }
+
+private: /* private instance variables */
+ Ndb_cluster_connection *m_conn;
+ MultiNdbWakeupHandler *m_multiWaitHandler;
+ Ndb *m_wakeNdb;
+ Ndb **m_array;
+ int m_array_size;
+ int m_count;
+ int m_nodeId;
+};
+
+
+#endif
+
=== modified file 'storage/ndb/src/ndbapi/Ndbif.cpp'
--- a/storage/ndb/src/ndbapi/Ndbif.cpp 2011-09-07 17:12:12 +0000
+++ b/storage/ndb/src/ndbapi/Ndbif.cpp 2011-09-09 10:48:14 +0000
@@ -1014,12 +1014,24 @@ Ndb::completedTransaction(NdbTransaction
theNoOfSentTransactions = tNoSentTransactions - 1;
aCon->theListState = NdbTransaction::InCompletedList;
aCon->handleExecuteCompletion();
- if ((theMinNoOfEventsToWakeUp != 0) &&
- (theNoOfCompletedTransactions >= theMinNoOfEventsToWakeUp)) {
- theMinNoOfEventsToWakeUp = 0;
- theImpl->theWaiter.signal(NO_WAIT);
- return;
- }//if
+
+ if (theImpl->wakeHandler == 0)
+ {
+ if ((theMinNoOfEventsToWakeUp != 0) &&
+ (theNoOfCompletedTransactions >= theMinNoOfEventsToWakeUp))
+ {
+ theMinNoOfEventsToWakeUp = 0;
+ theImpl->theWaiter.signal(NO_WAIT);
+ return;
+ }
+ }
+ else
+ {
+ /**
+ * This is for multi-wait handling
+ */
+ theImpl->wakeHandler->notifyTransactionCompleted(this);
+ }
} else {
ndbout << "theNoOfSentTransactions = " << (int) theNoOfSentTransactions;
ndbout << " theListState = " << (int) aCon->theListState;
=== modified file 'storage/ndb/src/ndbapi/Ndbinit.cpp'
--- a/storage/ndb/src/ndbapi/Ndbinit.cpp 2011-06-30 15:59:25 +0000
+++ b/storage/ndb/src/ndbapi/Ndbinit.cpp 2011-09-09 10:48:14 +0000
@@ -204,6 +204,8 @@ NdbImpl::NdbImpl(Ndb_cluster_connection
theNdbObjectIdMap(1024,1024),
theNoOfDBnodes(0),
theWaiter(this),
+ wakeHandler(0),
+ wakeContext(~Uint32(0)),
m_ev_op(0),
customDataPtr(0)
{
=== modified file 'storage/ndb/src/ndbapi/TransporterFacade.cpp'
--- a/storage/ndb/src/ndbapi/TransporterFacade.cpp 2011-09-08 06:22:07 +0000
+++ b/storage/ndb/src/ndbapi/TransporterFacade.cpp 2011-09-09 12:29:43 +0000
@@ -546,6 +546,7 @@ TransporterFacade::TransporterFacade(Glo
theClusterMgr(NULL),
checkCounter(4),
currentSendLimit(1),
+ dozer(NULL),
theStopReceive(0),
theSendThread(NULL),
theReceiveThread(NULL),
@@ -2047,3 +2048,64 @@ TransporterFacade::ext_doConnect(int aNo
theClusterMgr->unlock();
}
+bool
+TransporterFacade::setupWakeup()
+{
+ /* Ask TransporterRegistry to setup wakeup sockets */
+ bool rc;
+ lock_mutex();
+ {
+ rc = theTransporterRegistry->setup_wakeup_socket();
+ }
+ unlock_mutex();
+ return rc;
+}
+
+bool
+TransporterFacade::registerForWakeup(trp_client* _dozer)
+{
+ /* Called with Transporter lock */
+ /* In future use a DLList for dozers.
+ * Ideally with some way to wake one rather than all
+ * For now, we just have one/TransporterFacade
+ */
+ if (dozer != NULL)
+ return false;
+
+ dozer = _dozer;
+ return true;
+}
+
+bool
+TransporterFacade::unregisterForWakeup(trp_client* _dozer)
+{
+ /* Called with Transporter lock */
+ if (dozer != _dozer)
+ return false;
+
+ dozer = NULL;
+ return true;
+}
+
+void
+TransporterFacade::requestWakeup()
+{
+ /* Forward to TransporterRegistry
+ * No need for locks, assuming only one client at a time will use
+ */
+ theTransporterRegistry->wakeup();
+}
+
+
+void
+TransporterFacade::reportWakeup()
+{
+ /* Explicit wakeup callback
+ * Called with Transporter Mutex held
+ */
+ /* Notify interested parties */
+ if (dozer != NULL)
+ {
+ dozer->trp_wakeup();
+ };
+}
=== modified file 'storage/ndb/src/ndbapi/TransporterFacade.hpp'
--- a/storage/ndb/src/ndbapi/TransporterFacade.hpp 2011-09-07 17:20:19 +0000
+++ b/storage/ndb/src/ndbapi/TransporterFacade.hpp 2011-09-09 10:48:14 +0000
@@ -203,6 +203,21 @@ public:
{
theTransporterRegistry->reset_send_buffer(node, should_be_empty);
}
+ /**
+ * Wakeup
+ *
+ * Clients normally block waiting for a pattern of signals,
+ * or until a timeout expires.
+ * This Api allows them to be woken early.
+ * To use it, a setupWakeup() call must be made once prior
+ * to using the Apis in any client.
+ *
+ */
+ bool setupWakeup();
+ bool registerForWakeup(trp_client* dozer);
+ bool unregisterForWakeup(trp_client* dozer);
+ void requestWakeup();
+ void reportWakeup();
private:
@@ -229,6 +244,11 @@ private:
void calculateSendLimit();
+ /* Single dozer supported currently.
+ * In future, use a DLList to support > 1
+ */
+ trp_client * dozer;
+
// Declarations for the receive and send thread
int theStopReceive;
Uint32 sendThreadWaitMillisec;
=== added file 'storage/ndb/src/ndbapi/WakeupHandler.cpp'
--- a/storage/ndb/src/ndbapi/WakeupHandler.cpp 1970-01-01 00:00:00 +0000
+++ b/storage/ndb/src/ndbapi/WakeupHandler.cpp 2011-09-09 10:48:14 +0000
@@ -0,0 +1,207 @@
+/*
+ Copyright (c) 2011, Oracle and/or its affiliates. All rights reserved.
+
+ This program is free software; you can redistribute it and/or modify
+ it under the terms of the GNU General Public License as published by
+ the Free Software Foundation; version 2 of the License.
+
+ This program is distributed in the hope that it will be useful,
+ but WITHOUT ANY WARRANTY; without even the implied warranty of
+ MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ GNU General Public License for more details.
+
+ You should have received a copy of the GNU General Public License
+ along with this program; if not, write to the Free Software
+ Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA
+*/
+
+#include "WakeupHandler.hpp"
+#include "Ndb.hpp"
+#include "NdbImpl.hpp"
+#include "trp_client.hpp"
+
+// ***** Multiwait handler ****
+
+/**
+ * An instance of this class is used when a single thread
+ * wants to wait for the asynchronous completion of transactions
+ * on multiple Ndb objects.
+ * When the thread starts waiting, all Ndb objects are checked
+ * for CompletedTransactions, and their wakeHandler is set to
+ * poin to the same MultiNdbWakeupHandler object. The thread
+ * is then put to sleep / polls on a designated Ndb object.
+ *
+ * As transactions complete, the MultiNdbWakeHandler object
+ * moves their Ndb objects to the start of the passed Ndb
+ * object list and determines whether enough have completed
+ * to wake the waiting thread.
+ * When enough have completed, the waiting thread is woken via
+ * the designated Ndb object.
+ */
+
+MultiNdbWakeupHandler::MultiNdbWakeupHandler(Ndb* _wakeNdb)
+ : wakeNdb(_wakeNdb),
+ woken(false)
+{
+ /* Register the waiter Ndb to receive wakeups for all Ndbs in the group */
+ PollGuard pg(* wakeNdb->theImpl); // Hold mutex before calling into Facade
+ bool rc = wakeNdb->theImpl->m_transporter_facade->registerForWakeup(wakeNdb->theImpl);
+ assert(rc);
+ wakeNdb->theImpl->wakeHandler = this;
+}
+
+
+MultiNdbWakeupHandler::~MultiNdbWakeupHandler()
+{
+ PollGuard pg(* wakeNdb->theImpl); // Hold mutex before calling into Facade
+ bool rc = wakeNdb->theImpl->m_transporter_facade->
+ unregisterForWakeup(wakeNdb->theImpl);
+ assert(rc);
+}
+
+
+bool MultiNdbWakeupHandler::ndbIsRegistered(Ndb *obj)
+{
+ return (obj->theImpl->wakeHandler == this);
+}
+
+
+bool MultiNdbWakeupHandler::unregisterNdb(Ndb *obj)
+{
+ if (obj->theImpl->wakeHandler == this)
+ {
+ obj->theImpl->wakeHandler = 0;
+ obj->theImpl->wakeContext = ~ Uint32(0);
+ return true;
+ }
+ return false;
+}
+
+
+Uint32 MultiNdbWakeupHandler::getNumReadyNdbs() const
+{
+ return numNdbsWithCompletedTrans;
+}
+
+
+int MultiNdbWakeupHandler::waitForInput(Ndb** _objs, int _cnt, int min_req,
+ PollGuard* pg, int timeout_millis)
+{
+ woken = false;
+ numNdbsWithCompletedTrans = 0;
+ minNdbsToWake = min_req;
+ objs = _objs;
+ cnt = _cnt;
+
+ /* Before sleeping, we register each Ndb, and check whether it already
+ has any completed transactions.
+ */
+ for (Uint32 ndbcnt = 0; ndbcnt < cnt; ndbcnt ++)
+ {
+ Ndb* obj = objs [ndbcnt];
+
+ /* Register the Ndb */
+ obj->theImpl->wakeHandler = this;
+
+ /* Store its list position */
+ obj->theImpl->wakeContext = ndbcnt;
+
+ /* It may already have some completed transactions */
+ if (obj->theNoOfCompletedTransactions)
+ {
+ /* Move that ndb to the start of the array */
+ swapNdbsInArray(ndbcnt, numNdbsWithCompletedTrans);
+ numNdbsWithCompletedTrans++;
+ }
+ }
+
+ if (isReadyToWake()) // already enough
+ {
+ woken = false;
+ return 0;
+ }
+
+ wakeNdb->theImpl->theWaiter.set_node(0);
+ wakeNdb->theImpl->theWaiter.set_state(WAIT_TRANS);
+
+ NDB_TICKS currTime = NdbTick_CurrentMillisecond();
+ NDB_TICKS maxTime = currTime + (NDB_TICKS) timeout_millis;
+
+ do {
+ /* PollGuard will put us to sleep until something relevant happens */
+ pg->wait_for_input(timeout_millis > 10 ? 10 : timeout_millis);
+ wakeNdb->theImpl->incClientStat(Ndb::WaitExecCompleteCount, 1);
+
+ if (isReadyToWake())
+ {
+ woken = false; // reset for next time
+ return 0;
+ }
+ timeout_millis = (int) (maxTime - NdbTick_CurrentMillisecond());
+ } while (timeout_millis > 0);
+
+ return -1; // timeout occured
+}
+
+
+void MultiNdbWakeupHandler::swapNdbsInArray(Uint32 indexA, Uint32 indexB)
+{
+ /* Generally used to move an Ndb object down the list
+ * (bubble sort), so that it is part of a contiguous
+ * list of Ndbs with completed transactions to return
+ * to caller.
+ * If it's already in the given position, no effect
+ */
+ assert(indexA < cnt);
+ assert(indexB < cnt);
+
+ Ndb* a = objs[ indexA ];
+ Ndb* b = objs[ indexB ];
+
+ assert(a->theImpl->wakeContext == indexA);
+ assert(b->theImpl->wakeContext == indexB);
+
+ objs[ indexA ] = b;
+ b->theImpl->wakeContext = indexA;
+
+ objs[ indexB ] = a;
+ a->theImpl->wakeContext = indexB;
+}
+
+
+void MultiNdbWakeupHandler::notifyTransactionCompleted(Ndb* from)
+{
+ Uint32 & completedNdbListPos = from->theImpl->wakeContext;
+
+ /* TODO : assert that transporter lock is held */
+ assert(completedNdbListPos < cnt);
+ assert(wakeNdb->theImpl->wakeHandler == this);
+ assert(from != wakeNdb);
+
+ /* Some Ndb object has just completed another transaction.
+ Ensure that it's in the completed Ndbs list
+ */
+ if (completedNdbListPos >= numNdbsWithCompletedTrans)
+ {
+ /* It's not, swap it with Ndb in 'next' position */
+ swapNdbsInArray(completedNdbListPos, numNdbsWithCompletedTrans);
+ numNdbsWithCompletedTrans ++;
+ }
+
+ if (numNdbsWithCompletedTrans >= minNdbsToWake)
+ {
+ wakeNdb->theImpl->theWaiter.signal(NO_WAIT); // wakeup client thread
+ }
+
+ return;
+}
+
+
+void MultiNdbWakeupHandler::notifyWakeup()
+{
+ assert(wakeNdb->theImpl->wakeHandler == this);
+
+ /* Wakeup client thread, using 'waiter' Ndb */
+ woken = true;
+ wakeNdb->theImpl->theWaiter.signal(NO_WAIT);
+}
=== added file 'storage/ndb/src/ndbapi/WakeupHandler.hpp'
--- a/storage/ndb/src/ndbapi/WakeupHandler.hpp 1970-01-01 00:00:00 +0000
+++ b/storage/ndb/src/ndbapi/WakeupHandler.hpp 2011-09-09 10:48:14 +0000
@@ -0,0 +1,78 @@
+/*
+ Copyright (c) 2011, Oracle and/or its affiliates. All rights reserved.
+
+ This program is free software; you can redistribute it and/or modify
+ it under the terms of the GNU General Public License as published by
+ the Free Software Foundation; version 2 of the License.
+
+ This program is distributed in the hope that it will be useful,
+ but WITHOUT ANY WARRANTY; without even the implied warranty of
+ MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ GNU General Public License for more details.
+
+ You should have received a copy of the GNU General Public License
+ along with this program; if not, write to the Free Software
+ Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA
+*/
+
+#ifndef WakeupHandler_H
+#define WakeupHandler_H
+
+#include <ndb_types.h>
+class Ndb;
+class Ndb_cluster_connection;
+class PollGuard;
+
+/**
+ * WakeupHandler
+ *
+ * Help Ndb objects respond to wakeups from the TransporterFacade
+ * when transactions have completed.
+ *
+ * Each Ndb will own an instance of the DefaultWakeupHandler,
+ * and each NdbWaitGroup will create an instance of a more specialized
+ * WakeupHandler.
+ */
+
+class WakeupHandler
+{
+public:
+ virtual void notifyTransactionCompleted(Ndb* from) = 0;
+ virtual void notifyWakeup() = 0;
+ virtual ~WakeupHandler() {};
+};
+
+class MultiNdbWakeupHandler : public WakeupHandler
+{
+public:
+ MultiNdbWakeupHandler(Ndb* _wakeNdb);
+ ~MultiNdbWakeupHandler();
+ bool unregisterNdb(Ndb *);
+ bool ndbIsRegistered(Ndb *);
+ void notifyTransactionCompleted(Ndb* from);
+ void notifyWakeup();
+ Uint32 getNumReadyNdbs() const;
+ /** returns 0 on success, -1 on timeout: */
+ int waitForInput(Ndb **objs, int cnt, int min_requested,
+ PollGuard* pg, int timeout_millis);
+
+private: // private methods
+ void swapNdbsInArray(Uint32 indexA, Uint32 indexB);
+ bool isReadyToWake() const;
+
+private: // private instance variables
+ Uint32 numNdbsWithCompletedTrans;
+ Uint32 minNdbsToWake;
+ Ndb* wakeNdb;
+ Ndb** objs;
+ Uint32 cnt;
+ volatile bool woken;
+};
+
+
+inline bool MultiNdbWakeupHandler::isReadyToWake() const
+{
+ return (numNdbsWithCompletedTrans >= minNdbsToWake) || woken;
+}
+
+#endif
=== modified file 'storage/ndb/src/ndbapi/ndb_cluster_connection.cpp'
--- a/storage/ndb/src/ndbapi/ndb_cluster_connection.cpp 2011-09-08 06:22:07 +0000
+++ b/storage/ndb/src/ndbapi/ndb_cluster_connection.cpp 2011-09-09 12:29:43 +0000
@@ -363,7 +363,8 @@ Ndb_cluster_connection_impl(const char *
m_first_ndb_object(0),
m_latest_error_msg(),
m_latest_error(0),
- m_max_trans_id(0)
+ m_max_trans_id(0),
+ m_multi_wait_group(0)
{
DBUG_ENTER("Ndb_cluster_connection");
DBUG_PRINT("enter",("Ndb_cluster_connection this=0x%lx", (long) this));
@@ -497,6 +498,10 @@ Ndb_cluster_connection_impl::~Ndb_cluste
NdbMutex_Destroy(m_new_delete_ndb_mutex);
m_new_delete_ndb_mutex = 0;
+ if(m_multi_wait_group)
+ delete m_multi_wait_group;
+ m_multi_wait_group = 0;
+
DBUG_VOID_RETURN;
}
@@ -993,4 +998,34 @@ Ndb_cluster_connection::get_max_adaptive
return m_impl.m_transporter_facade->getSendThreadInterval();
}
+NdbWaitGroup *
+Ndb_cluster_connection::create_ndb_wait_group(int size)
+{
+ if(m_impl.m_multi_wait_group == NULL)
+ {
+ m_impl.m_multi_wait_group = new NdbWaitGroup(this, size);
+ return m_impl.m_multi_wait_group;
+ }
+ else
+ {
+ return NULL; // NdbWaitGroup already exists
+ }
+}
+
+bool
+Ndb_cluster_connection::release_ndb_wait_group(NdbWaitGroup *group)
+{
+ if(m_impl.m_multi_wait_group && m_impl.m_multi_wait_group == group)
+ {
+ delete m_impl.m_multi_wait_group;
+ m_impl.m_multi_wait_group = 0;
+ return true;
+ }
+ else
+ {
+ return false;
+ }
+}
+
+
template class Vector<Ndb_cluster_connection_impl::Node>;
=== modified file 'storage/ndb/src/ndbapi/ndb_cluster_connection_impl.hpp'
--- a/storage/ndb/src/ndbapi/ndb_cluster_connection_impl.hpp 2011-06-30 15:59:25 +0000
+++ b/storage/ndb/src/ndbapi/ndb_cluster_connection_impl.hpp 2011-09-09 10:48:14 +0000
@@ -74,6 +74,7 @@ public:
private:
friend class Ndb;
friend class NdbImpl;
+ friend class NdbWaitGroup;
friend void* run_ndb_cluster_connection_connect_thread(void*);
friend class Ndb_cluster_connection;
friend class NdbEventBuffer;
@@ -131,6 +132,8 @@ private:
// Base offset for stats, from Ndb objects that are no
// longer with us
Uint64 globalApiStatsBaseline[ Ndb::NumClientStatistics ];
+
+ NdbWaitGroup *m_multi_wait_group;
};
#endif
=== modified file 'storage/ndb/src/ndbapi/trp_client.hpp'
--- a/storage/ndb/src/ndbapi/trp_client.hpp 2011-09-08 06:22:07 +0000
+++ b/storage/ndb/src/ndbapi/trp_client.hpp 2011-09-09 12:29:43 +0000
@@ -35,6 +35,8 @@ public:
virtual void trp_deliver_signal(const NdbApiSignal *,
const LinearSectionPtr ptr[3]) = 0;
+ virtual void trp_wakeup()
+ {};
Uint32 open(class TransporterFacade*, int blockNo = -1);
void close();
=== modified file 'storage/ndb/test/ndbapi/Makefile.am'
--- a/storage/ndb/test/ndbapi/Makefile.am 2011-09-02 17:24:52 +0000
+++ b/storage/ndb/test/ndbapi/Makefile.am 2011-09-09 10:48:14 +0000
@@ -29,6 +29,7 @@ flexTT \
testBackup \
testBasic \
testBasicAsynch \
+testAsynchMultiwait \
testBlobs \
testDataBuffers \
testDict \
@@ -89,6 +90,7 @@ testBackup_SOURCES = testBackup.cpp
testBasic_SOURCES = testBasic.cpp
testSpj_SOURCES = testSpj.cpp
testBasicAsynch_SOURCES = testBasicAsynch.cpp
+testAsynchMultiwait_SOURCES = testAsynchMultiwait.cpp
testBlobs_SOURCES = testBlobs.cpp
testDataBuffers_SOURCES = testDataBuffers.cpp
testDict_SOURCES = testDict.cpp
=== added file 'storage/ndb/test/ndbapi/testAsynchMultiwait.cpp'
--- a/storage/ndb/test/ndbapi/testAsynchMultiwait.cpp 1970-01-01 00:00:00 +0000
+++ b/storage/ndb/test/ndbapi/testAsynchMultiwait.cpp 2011-09-09 10:48:14 +0000
@@ -0,0 +1,304 @@
+/*
+ Copyright (c) 2011 Oracle and/or its affiliates. All rights reserved.
+
+ This program is free software; you can redistribute it and/or modify
+ it under the terms of the GNU General Public License as published by
+ the Free Software Foundation; version 2 of the License.
+
+ This program is distributed in the hope that it will be useful,
+ but WITHOUT ANY WARRANTY; without even the implied warranty of
+ MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ GNU General Public License for more details.
+
+ You should have received a copy of the GNU General Public License
+ along with this program; if not, write to the Free Software
+ Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA
+*/
+
+
+#include "NDBT_Test.hpp"
+#include "NDBT_ReturnCodes.h"
+#include "HugoTransactions.hpp"
+#include "HugoAsynchTransactions.hpp"
+#include "UtilTransactions.hpp"
+#include "random.h"
+#include "../../src/ndbapi/NdbWaitGroup.hpp"
+
+NdbWaitGroup * global_poll_group;
+
+#define check(b, e) \
+ if (!(b)) { g_err << "ERR: " << step->getName() << " failed on line " \
+ << __LINE__ << ": " << e.getNdbError() << endl; return NDBT_FAILED; }
+
+
+int runSetup(NDBT_Context* ctx, NDBT_Step* step){
+
+ int records = ctx->getNumRecords();
+ int batchSize = ctx->getProperty("BatchSize", 1);
+ int transactions = (records / 100) + 1;
+ int operations = (records / transactions) + 1;
+ Ndb* pNdb = GETNDB(step);
+
+ HugoAsynchTransactions hugoTrans(*ctx->getTab());
+ if (hugoTrans.loadTableAsynch(pNdb, records, batchSize,
+ transactions, operations) != 0){
+ return NDBT_FAILED;
+ }
+
+ Ndb_cluster_connection* conn = &pNdb->get_ndb_cluster_connection();
+
+ /* The first call to create_multi_ndb_wait_group() should succeed ... */
+ global_poll_group = conn->create_ndb_wait_group(1000);
+ if(global_poll_group == 0) {
+ return NDBT_FAILED;
+ }
+
+ /* and subsequent calls should fail */
+ if(conn->create_ndb_wait_group(1000) != 0) {
+ return NDBT_FAILED;
+ }
+
+ return NDBT_OK;
+}
+
+int runCleanup(NDBT_Context* ctx, NDBT_Step* step){
+ int records = ctx->getNumRecords();
+ int batchSize = ctx->getProperty("BatchSize", 1);
+ int transactions = (records / 100) + 1;
+ int operations = (records / transactions) + 1;
+ Ndb* pNdb = GETNDB(step);
+
+ HugoAsynchTransactions hugoTrans(*ctx->getTab());
+ if (hugoTrans.pkDelRecordsAsynch(pNdb, records, batchSize,
+ transactions, operations) != 0){
+ return NDBT_FAILED;
+ }
+
+ pNdb->get_ndb_cluster_connection().release_ndb_wait_group(global_poll_group);
+
+ return NDBT_OK;
+}
+
+
+int runPkReadMultiBasic(NDBT_Context* ctx, NDBT_Step* step){
+ int loops = ctx->getNumLoops();
+ int records = ctx->getNumRecords();
+ const int MAX_NDBS = 200;
+ Ndb* pNdb = GETNDB(step);
+ Ndb_cluster_connection* conn = &pNdb->get_ndb_cluster_connection();
+
+ int i = 0;
+ HugoOperations hugoOps(*ctx->getTab());
+
+ Ndb* ndbObjs[ MAX_NDBS ];
+ NdbTransaction* transArray[ MAX_NDBS ];
+ Ndb ** ready_ndbs;
+
+ for (int j=0; j < MAX_NDBS; j++)
+ {
+ Ndb* ndb = new Ndb(conn);
+ check(ndb->init() == 0, (*ndb));
+ ndbObjs[ j ] = ndb;
+ }
+
+ while (i<loops) {
+ ndbout << "Loop : " << i << ": ";
+ int recordsLeft = records;
+
+ do
+ {
+ /* Define and execute Pk read requests on
+ * different Ndb objects
+ */
+ int ndbcnt = 0;
+ int pollcnt = 0;
+ int lumpsize = 1 + myRandom48(MIN(recordsLeft, MAX_NDBS));
+ while(lumpsize &&
+ recordsLeft &&
+ ndbcnt < MAX_NDBS)
+ {
+ Ndb* ndb = ndbObjs[ ndbcnt ];
+ NdbTransaction* trans = ndb->startTransaction();
+ check(trans != NULL, (*ndb));
+ NdbOperation* readOp = trans->getNdbOperation(ctx->getTab());
+ check(readOp != NULL, (*trans));
+ check(readOp->readTuple() == 0, (*readOp));
+ check(hugoOps.equalForRow(readOp, recordsLeft) == 0, hugoOps);
+
+ /* Read all other cols */
+ for (int k=0; k < ctx->getTab()->getNoOfColumns(); k++)
+ {
+ check(readOp->getValue(ctx->getTab()->getColumn(k)) != NULL,
+ (*readOp));
+ }
+
+ /* Now send em off */
+ trans->executeAsynchPrepare(NdbTransaction::Commit,
+ NULL,
+ NULL,
+ NdbOperation::AbortOnError);
+ ndb->sendPreparedTransactions();
+
+ transArray[ndbcnt] = trans;
+ global_poll_group->addNdb(ndb);
+
+ ndbcnt++;
+ pollcnt++;
+ recordsLeft--;
+ lumpsize--;
+ };
+
+ /* Ok, now wait for the Ndbs to complete */
+ while (pollcnt)
+ {
+ /* Occasionally check with no timeout */
+ Uint32 timeout_millis = myRandom48(2)?10000:0;
+ int count = global_poll_group->wait(ready_ndbs, timeout_millis);
+
+ if (count > 0)
+ {
+ for (int y=0; y < count; y++)
+ {
+ Ndb *ndb = ready_ndbs[y];
+ check(ndb->pollNdb(0, 1) != 0, (*ndb));
+ }
+ pollcnt -= count;
+ }
+ }
+
+ /* Ok, now close the transactions */
+ for (int t=0; t < ndbcnt; t++)
+ {
+ transArray[t]->close();
+ }
+ } while (recordsLeft);
+
+ i++;
+ }
+
+ for (int j=0; j < MAX_NDBS; j++)
+ {
+ delete ndbObjs[ j ];
+ }
+
+ return NDBT_OK;
+}
+
+int runPkReadMultiWakeupT1(NDBT_Context* ctx, NDBT_Step* step)
+{
+ HugoOperations hugoOps(*ctx->getTab());
+ Ndb* ndb = GETNDB(step);
+ Uint32 phase = ctx->getProperty("PHASE");
+
+ if (phase != 0)
+ {
+ ndbout << "Thread 1 : Error, initial phase should be 0 not " << phase << endl;
+ return NDBT_FAILED;
+ };
+
+ /* We now start a transaction, locking row 0 */
+ ndbout << "Thread 1 : Starting transaction locking row 0..." << endl;
+ check(hugoOps.startTransaction(ndb) == 0, hugoOps);
+ check(hugoOps.pkReadRecord(ndb, 0, 1, NdbOperation::LM_Exclusive) == 0,
+ hugoOps);
+ check(hugoOps.execute_NoCommit(ndb) == 0, hugoOps);
+
+ ndbout << "Thread 1 : Lock taken." << endl;
+ ndbout << "Thread 1 : Triggering Thread 2 by move to phase 1" << endl;
+ /* Ok, now get thread 2 to try to read row */
+ ctx->incProperty("PHASE"); /* Set to 1 */
+
+ /* Here, loop waking up waiter on the cluster connection */
+ /* Check the property has not moved to phase 2 */
+ ndbout << "Thread 1 : Performing async wakeup until phase changes to 2"
+ << endl;
+ while (ctx->getProperty("PHASE") != 2)
+ {
+ global_poll_group->wakeup();
+ NdbSleep_MilliSleep(500);
+ }
+
+ ndbout << "Thread 1 : Phase changed to 2, committing transaction "
+ << "and releasing lock" << endl;
+
+ /* Ok, give them a break, commit transaction */
+ check(hugoOps.execute_Commit(ndb) ==0, hugoOps);
+ hugoOps.closeTransaction(ndb);
+
+ ndbout << "Thread 1 : Finished" << endl;
+ return NDBT_OK;
+}
+
+int runPkReadMultiWakeupT2(NDBT_Context* ctx, NDBT_Step* step)
+{
+ ndbout << "Thread 2 : Waiting for phase 1 notification from Thread 1" << endl;
+ ctx->getPropertyWait("PHASE", 1);
+
+ /* Ok, now thread 1 has locked row 1, we'll attempt to read
+ * it, using the multi_ndb_wait Api to block
+ */
+ HugoOperations hugoOps(*ctx->getTab());
+ Ndb* ndb = GETNDB(step);
+
+ ndbout << "Thread 2 : Starting async transaction to read row" << endl;
+ check(hugoOps.startTransaction(ndb) == 0, hugoOps);
+ check(hugoOps.pkReadRecord(ndb, 0, 1, NdbOperation::LM_Exclusive) == 0,
+ hugoOps);
+ /* Prepare, Send */
+ check(hugoOps.execute_async(ndb,
+ NdbTransaction::Commit,
+ NdbOperation::AbortOnError) == 0,
+ hugoOps);
+
+ global_poll_group->addNdb(ndb);
+ Ndb ** ready_ndbs;
+ int wait_rc = 0;
+ int acknowledged = 0;
+ do
+ {
+ ndbout << "Thread 2 : Calling NdbWaitGroup::wait()" << endl;
+ wait_rc = global_poll_group->wait(ready_ndbs, 10000);
+ ndbout << " Result : " << wait_rc << endl;
+ if (wait_rc == 0)
+ {
+ if (!acknowledged)
+ {
+ ndbout << "Thread 2 : Woken up, moving to phase 2" << endl;
+ ctx->incProperty("PHASE");
+ acknowledged = 1;
+ }
+ }
+ else if (wait_rc > 0)
+ {
+ ndbout << "Thread 2 : Transaction completed" << endl;
+ ndb->pollNdb(1,0);
+ hugoOps.closeTransaction(ndb);
+ }
+ } while (wait_rc == 0);
+
+ return (wait_rc == 1 ? NDBT_OK : NDBT_FAILED);
+}
+
+NDBT_TESTSUITE(testAsynchMultiwait);
+TESTCASE("AsynchMultiwaitPkRead",
+ "Verify NdbWaitGroup API (1 thread)") {
+ INITIALIZER(runSetup);
+ STEP(runPkReadMultiBasic);
+ FINALIZER(runCleanup);
+}
+TESTCASE("AsynchMultiwaitWakeup",
+ "Verify wait-multi-ndb wakeup Api code") {
+ INITIALIZER(runSetup);
+ TC_PROPERTY("PHASE", Uint32(0));
+ STEP(runPkReadMultiWakeupT1);
+ STEP(runPkReadMultiWakeupT2);
+ FINALIZER(runCleanup);
+}
+NDBT_TESTSUITE_END(testAsynchMultiwait);
+
+int main(int argc, const char** argv){
+ ndb_init();
+ NDBT_TESTSUITE_INSTANCE(testAsynchMultiwait);
+ return testAsynchMultiwait.execute(argc, argv);
+}
+
=== modified file 'storage/ndb/test/run-test/daily-basic-tests.txt'
--- a/storage/ndb/test/run-test/daily-basic-tests.txt 2011-09-01 15:12:11 +0000
+++ b/storage/ndb/test/run-test/daily-basic-tests.txt 2011-09-09 10:48:14 +0000
@@ -1744,4 +1744,12 @@ max-time: 300
cmd: testBlobs
args: -bug 62321 -skip p
+# async api extensions
+max-time: 500
+cmd: testAsynchMultiwait
+args: -n AsynchMultiwaitPkRead T1
+
+max-time: 500
+cmd: testAsynchMultiwait
+args: -n AsynchMultiwaitWakeup T1
No bundle (reason: useless for push emails).
| Thread |
|---|
| • bzr push into mysql-5.1-telco-7.1 branch (jonas.oreland:4275 to 4276) | Jonas Oreland | 9 Sep |