3936 Mauritz Sundell 2012-06-11 [merge]
Merge 7.1->7.2
modified:
storage/ndb/clusterj/clusterj-test/src/main/java/testsuite/clusterj/QueryOrderingTest.java
storage/ndb/clusterj/clusterj-tie/src/main/java/com/mysql/clusterj/tie/NdbRecordIndexScanOperationImpl.java
storage/ndb/clusterj/clusterj-tie/src/main/java/com/mysql/clusterj/tie/NdbRecordScanOperationImpl.java
storage/ndb/clusterj/clusterj-unit/src/main/java/junit/framework/TestCase.java
storage/ndb/include/ndb_config.h.in
storage/ndb/include/ndbapi/Ndb.hpp
storage/ndb/include/portlib/NdbSleep.h
storage/ndb/include/transporter/TransporterRegistry.hpp
storage/ndb/ndb_configure.cmake
storage/ndb/src/common/transporter/TCP_Transporter.cpp
storage/ndb/src/common/transporter/Transporter.cpp
storage/ndb/src/common/transporter/Transporter.hpp
storage/ndb/src/common/util/HashMap2.cpp
storage/ndb/src/kernel/blocks/dblqh/DblqhMain.cpp
storage/ndb/src/ndbapi/Ndb.cpp
storage/ndb/src/ndbapi/Ndbinit.cpp
storage/ndb/src/ndbapi/Ndblist.cpp
storage/ndb/test/ndbapi/flexAsynch.cpp
3935 Martin Skold 2012-06-08
Merge missmatch in result file, regenerated
modified:
mysql-test/suite/rpl/r/rpl_stm_relay_ign_space.result
=== modified file 'storage/ndb/clusterj/clusterj-test/src/main/java/testsuite/clusterj/QueryOrderingTest.java'
--- a/storage/ndb/clusterj/clusterj-test/src/main/java/testsuite/clusterj/QueryOrderingTest.java 2012-05-31 00:47:21 +0000
+++ b/storage/ndb/clusterj/clusterj-test/src/main/java/testsuite/clusterj/QueryOrderingTest.java 2012-06-10 20:51:00 +0000
@@ -128,38 +128,42 @@ create table longintstringix (
}
public void testNoWhereAscending() {
- System.out.println("QueryOrderingTest.testNoWhereAscending");
+ logger.info("QueryOrderingTest.testNoWhereAscending");
setOrdering(Ordering.ASCENDING, "id");
noWhereQuery("id", "PRIMARY", null, 0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19, 20, 21, 22, 23, 24);
failOnError();
}
public void testNoWhereDescending() {
- System.out.println("QueryOrderingTest.testNoWhereDescending");
+ logger.info("QueryOrderingTest.testNoWhereDescending");
setOrdering(Ordering.DESCENDING, "id");
noWhereQuery("id", "PRIMARY", null, 24, 23, 22, 21, 20, 19, 18, 17, 16, 15, 14, 13, 12, 11, 10, 9, 8, 7, 6, 5, 4, 3, 2, 1, 0);
failOnError();
}
public void testPrimaryEqualAscending() {
+ logger.info("QueryOrderingTest.testPrimaryEqualAscending");
setOrdering(Ordering.ASCENDING, "longix", "intix", "stringix");
equalQuery("id", "PRIMARY", 1, 1);
failOnError();
}
public void testGreaterEqualAscending() {
+ logger.info("QueryOrderingTest.testGreaterEqualAscending");
setOrdering(Ordering.ASCENDING, "longix", "intix", "stringix");
greaterEqualQuery("longix", "idx_long_int_string", 2000000000000000L, 18, 19, 20, 21, 22, 23, 24);
failOnError();
}
public void testGreaterEqualAscendingPartial() {
+ logger.info("QueryOrderingTest.testGreaterEqualAscendingPartial");
setOrdering(Ordering.ASCENDING, "longix", "intix");
greaterEqualQuery("longix", "idx_long_int_string", 2000000000000000L, 18, 19, 20, 21, 22, 23, 24);
failOnError();
}
public void testInAndBetweenAscending() {
+ logger.info("QueryOrderingTest.testInAndBetweenAscending");
setOrdering(Ordering.ASCENDING, "longix", "intix");
inAndBetweenQuery("longix", new Object[] {1000000000000000L, 0L}, "intix", 1, 2, "idx_long_int_string", 12, 13, 14, 15, 16, 17, 3, 4, 5, 6, 7, 8);
inAndBetweenQuery("longix", Arrays.asList(new Object[] {1000000000000000L, 0L}), "stringix", "1", "4", "idx_long_int_string", 10, 11, 13, 14, 16, 17, 1, 2, 4, 5, 7, 8);
@@ -167,6 +171,7 @@ create table longintstringix (
}
public void testInAndBetweenDescending() {
+ logger.info("QueryOrderingTest.testInAndBetweenDescending");
setOrdering(Ordering.DESCENDING, "longix", "intix", "stringix");
inAndBetweenQuery("longix", new Object[] {1000000000000000L, 0L}, "intix", 1, 2, "idx_long_int_string", 17, 16, 15, 14, 13, 12, 8, 7, 6, 5, 4, 3);
inAndBetweenQuery("longix", Arrays.asList(new Object[] {1000000000000000L, 0L}), "stringix", "1", "4", "idx_long_int_string", 17, 16, 14, 13, 11, 10, 8, 7, 5, 4, 2, 1);
@@ -174,6 +179,7 @@ create table longintstringix (
}
public void testBetweenAndInAscending() {
+ logger.info("QueryOrderingTest.testBetweenAndInAscending");
setOrdering(Ordering.ASCENDING, "longix", "intix");
betweenAndInQuery("longix", 0L, 1000000000000000L, "intix", new Object[] {2, 0}, "idx_long_int_string", 0, 1, 2, 6, 7, 8, 9, 10, 11, 15, 16, 17);
betweenAndInQuery("longix", 1000000000000000L, 2000000000000000L, "intix", Arrays.asList(new Object[] {2, 1}), "idx_long_int_string", 12, 13, 14, 15, 16, 17, 21, 22, 23, 24);
@@ -181,6 +187,7 @@ create table longintstringix (
}
public void testBetweenAndInDescending() {
+ logger.info("QueryOrderingTest.testBetweenAndInDescending");
setOrdering(Ordering.DESCENDING, "longix", "intix", "stringix");
betweenAndInQuery("longix", 0L, 1000000000000000L, "intix", new Object[] {2, 0}, "idx_long_int_string", 17, 16, 15, 11, 10, 9, 8, 7, 6, 2, 1, 0);
betweenAndInQuery("longix", 1000000000000000L, 2000000000000000L, "intix", Arrays.asList(new Object[] {2, 1}), "idx_long_int_string", 24, 23, 22, 21, 17, 16, 15, 14, 13, 12);
=== modified file 'storage/ndb/clusterj/clusterj-tie/src/main/java/com/mysql/clusterj/tie/NdbRecordIndexScanOperationImpl.java'
--- a/storage/ndb/clusterj/clusterj-tie/src/main/java/com/mysql/clusterj/tie/NdbRecordIndexScanOperationImpl.java 2012-05-31 00:47:21 +0000
+++ b/storage/ndb/clusterj/clusterj-tie/src/main/java/com/mysql/clusterj/tie/NdbRecordIndexScanOperationImpl.java 2012-06-10 20:51:00 +0000
@@ -96,6 +96,8 @@ public class NdbRecordIndexScanOperation
/** The buffers used for bounds; held here to prevent garbage collection */
List<ByteBuffer> buffers = new ArrayList<ByteBuffer>();
+ private Index index;
+
public NdbRecordIndexScanOperationImpl(ClusterTransactionImpl clusterTransaction,
Index storeIndex, Table storeTable, int lockMode) {
this(clusterTransaction, storeIndex, storeTable, false, lockMode);
@@ -104,6 +106,7 @@ public class NdbRecordIndexScanOperation
public NdbRecordIndexScanOperationImpl(ClusterTransactionImpl clusterTransaction,
Index storeIndex, Table storeTable, boolean multiRange, int lockMode) {
super(clusterTransaction, storeTable, lockMode);
+ this.index = storeIndex;
this.multiRange = multiRange;
if (this.multiRange) {
ndbIndexBoundList = new ArrayList<NdbIndexScanOperation.IndexBound>();
@@ -121,7 +124,7 @@ public class NdbRecordIndexScanOperation
public void endDefinition() {
// get the scan options which also sets the filter
getScanOptions();
- if (logger.isDetailEnabled()) logger.detail("scan options present " + dumpScanOptions(scanOptions.optionsPresent(), scanOptions.scan_flags()));
+ if (logger.isDetailEnabled()) logger.detail("scan index '" + index.getName() + "' with options " + dumpScanOptions(scanOptions.optionsPresent(), scanOptions.scan_flags()));
// create the scan operation
ndbIndexScanOperation = clusterTransaction.scanIndex(
=== modified file 'storage/ndb/clusterj/clusterj-tie/src/main/java/com/mysql/clusterj/tie/NdbRecordScanOperationImpl.java'
--- a/storage/ndb/clusterj/clusterj-tie/src/main/java/com/mysql/clusterj/tie/NdbRecordScanOperationImpl.java 2012-05-31 20:42:32 +0000
+++ b/storage/ndb/clusterj/clusterj-tie/src/main/java/com/mysql/clusterj/tie/NdbRecordScanOperationImpl.java 2012-06-10 21:08:50 +0000
@@ -138,10 +138,11 @@ public abstract class NdbRecordScanOpera
options |= Type.SO_SCANFLAGS;
switch (ordering) {
case ASCENDING:
- flags = ScanFlag.SF_OrderBy;
+ flags |= ScanFlag.SF_OrderBy;
break;
case DESCENDING:
- flags = ScanFlag.SF_Descending;
+ flags |= ScanFlag.SF_Descending;
+ flags |= ScanFlag.SF_OrderBy;
break;
default:
throw new ClusterJFatalInternalException(local.message("ERR_Invalid_Ordering", ordering));
@@ -150,6 +151,7 @@ public abstract class NdbRecordScanOpera
if (multiRange) {
options |= Type.SO_SCANFLAGS;
flags |= ScanFlag.SF_MultiRange;
+ flags |= ScanFlag.SF_ReadRangeNo;
}
if (lockMode != com.mysql.ndbjtie.ndbapi.NdbOperationConst.LockMode.LM_CommittedRead) {
options |= Type.SO_SCANFLAGS;
=== modified file 'storage/ndb/clusterj/clusterj-unit/src/main/java/junit/framework/TestCase.java'
--- a/storage/ndb/clusterj/clusterj-unit/src/main/java/junit/framework/TestCase.java 2012-04-14 21:37:35 +0000
+++ b/storage/ndb/clusterj/clusterj-unit/src/main/java/junit/framework/TestCase.java 2012-06-10 20:51:00 +0000
@@ -46,7 +46,6 @@ public abstract class TestCase implement
* to result.errors.
*/
public void run(TestResult result) {
-// System.out.println("--> TestCase.run(TestResult): " + name);
TestListener listener = result.listener;
listener.startTest(this);
try {
@@ -89,4 +88,7 @@ public abstract class TestCase implement
return 0;
}
+ public String toString() {
+ return method.getDeclaringClass().getPackage().getName() + "." + name;
+ }
}
=== modified file 'storage/ndb/include/ndb_config.h.in'
--- a/storage/ndb/include/ndb_config.h.in 2012-02-07 15:41:33 +0000
+++ b/storage/ndb/include/ndb_config.h.in 2012-06-11 13:20:48 +0000
@@ -17,6 +17,7 @@
#cmakedefine HAVE_POSIX_MEMALIGN 1
#cmakedefine HAVE_CLOCK_GETTIME 1
+#cmakedefine HAVE_NANOSLEEP 1
#cmakedefine HAVE_PTHREAD_CONDATTR_SETCLOCK 1
#cmakedefine HAVE_PTHREAD_SELF 1
#cmakedefine HAVE_SCHED_GET_PRIORITY_MIN 1
=== modified file 'storage/ndb/include/ndbapi/Ndb.hpp'
--- a/storage/ndb/include/ndbapi/Ndb.hpp 2012-01-20 06:22:16 +0000
+++ b/storage/ndb/include/ndbapi/Ndb.hpp 2012-06-11 14:57:25 +0000
@@ -1935,6 +1935,12 @@ private:
* Returns NULL if none found
*/
NdbTransaction* getConnectedNdbTransaction(Uint32 nodeId, Uint32 instance);
+ /**
+ * Handle Connection Array lists
+ */
+ void appendConnectionArray(NdbTransaction *aCon, Uint32 nodeId);
+ void prependConnectionArray(NdbTransaction *aCon, Uint32 nodeId);
+ void removeConnectionArray(NdbTransaction *first, Uint32 nodeId);
// Release and disconnect from DBTC a connection
// and seize it to theConIdleList
@@ -2020,6 +2026,7 @@ private:
NdbTransaction* theTransactionList;
NdbTransaction** theConnectionArray;
+ NdbTransaction** theConnectionArrayLast;
Uint32 theMyRef; // My block reference
Uint32 theNode; // The node number of our node
=== modified file 'storage/ndb/include/portlib/NdbSleep.h'
--- a/storage/ndb/include/portlib/NdbSleep.h 2011-06-30 15:59:25 +0000
+++ b/storage/ndb/include/portlib/NdbSleep.h 2012-06-11 13:20:48 +0000
@@ -20,6 +20,54 @@
#include <ndb_global.h>
+static inline void NdbSleep_MilliSleep(int milliseconds);
+
+static inline
+void NdbSleep_MicroSleep(int microseconds)
+{
+ assert(0 < microseconds);
+#ifdef _WIN32
+ // Waitable timer use 100ns time unit, negative for relative time periods
+ LARGE_INTEGER liDueTime;
+ liDueTime.QuadPart = -10LL * microseconds;
+
+ HANDLE hTimer = CreateWaitableTimer(NULL, TRUE, NULL);
+ if (NULL == hTimer ||
+ !SetWaitableTimer(hTimer, &liDueTime, 0, NULL, NULL, 0) ||
+ WaitForSingleObject(hTimer, INFINITE) != WAIT_OBJECT_0)
+ {
+#ifndef NDEBUG
+ // Error code for crash analysis
+ DWORD winerr = GetLastError();
+#endif
+ assert(false);
+ // Fallback to millisleep in release
+ NdbSleep_MilliSleep(1 + (microseconds - 1) / 1000);
+ }
+ if (NULL != hTimer)
+ {
+ CloseHandle(hTimer);
+ }
+#elif defined(HAVE_NANOSLEEP)
+ struct timespec t;
+ t.tv_sec = microseconds / 1000000;
+ t.tv_nsec = 1000 * (microseconds % 1000000);
+ while (nanosleep(&t, &t) == -1)
+ {
+ if (errno != EINTR)
+ {
+ assert(false);
+ // Fallback to millisleep in release
+ NdbSleep_MilliSleep(1 + (microseconds - 1) / 1000);
+ return ;
+ }
+ }
+#else
+ // Fallback to millisleep
+ NdbSleep_MilliSleep(1 + (microseconds - 1) / 1000);
+#endif
+}
+
static inline
void NdbSleep_MilliSleep(int milliseconds)
{
=== modified file 'storage/ndb/include/transporter/TransporterRegistry.hpp'
--- a/storage/ndb/include/transporter/TransporterRegistry.hpp 2012-01-30 15:12:41 +0000
+++ b/storage/ndb/include/transporter/TransporterRegistry.hpp 2012-06-11 14:57:25 +0000
@@ -294,6 +294,13 @@ public:
const NodeBitmask& get_status_overloaded() const;
/**
+ * Set or clear slowdown bit.
+ * Query if any slowdown bit is set.
+ */
+ void set_status_slowdown(Uint32 nodeId, bool val);
+ const NodeBitmask& get_status_slowdown() const;
+
+ /**
* prepareSend
*
* When IOState is HaltOutput or HaltIO do not send or insert any
@@ -433,8 +440,10 @@ private:
/**
* Overloaded bits, for fast check.
+ * Similarly slowdown bits for fast check.
*/
NodeBitmask m_status_overloaded;
+ NodeBitmask m_status_slowdown;
/**
* Unpack signal data.
@@ -593,6 +602,8 @@ TransporterRegistry::set_status_overload
assert(nodeId < MAX_NODES);
if (val != m_status_overloaded.get(nodeId))
m_status_overloaded.set(nodeId, val);
+ if (val)
+ set_status_slowdown(nodeId, val);
}
inline const NodeBitmask&
@@ -601,4 +612,18 @@ TransporterRegistry::get_status_overload
return m_status_overloaded;
}
+inline void
+TransporterRegistry::set_status_slowdown(Uint32 nodeId, bool val)
+{
+ assert(nodeId < MAX_NODES);
+ if (val != m_status_slowdown.get(nodeId))
+ m_status_slowdown.set(nodeId, val);
+}
+
+inline const NodeBitmask&
+TransporterRegistry::get_status_slowdown() const
+{
+ return m_status_slowdown;
+}
+
#endif // Define of TransporterRegistry_H
=== modified file 'storage/ndb/ndb_configure.cmake'
--- a/storage/ndb/ndb_configure.cmake 2012-02-07 19:40:05 +0000
+++ b/storage/ndb/ndb_configure.cmake 2012-06-11 14:57:25 +0000
@@ -47,6 +47,7 @@ INCLUDE(ndb_require_variable)
CHECK_FUNCTION_EXISTS(posix_memalign HAVE_POSIX_MEMALIGN)
CHECK_FUNCTION_EXISTS(clock_gettime HAVE_CLOCK_GETTIME)
+CHECK_FUNCTION_EXISTS(nanosleep HAVE_NANOSLEEP)
CHECK_FUNCTION_EXISTS(pthread_condattr_setclock HAVE_PTHREAD_CONDATTR_SETCLOCK)
CHECK_FUNCTION_EXISTS(pthread_self HAVE_PTHREAD_SELF)
CHECK_FUNCTION_EXISTS(sched_get_priority_min HAVE_SCHED_GET_PRIORITY_MIN)
=== modified file 'storage/ndb/src/common/transporter/TCP_Transporter.cpp'
--- a/storage/ndb/src/common/transporter/TCP_Transporter.cpp 2012-01-17 08:33:59 +0000
+++ b/storage/ndb/src/common/transporter/TCP_Transporter.cpp 2012-06-11 14:57:25 +0000
@@ -112,6 +112,10 @@ TCP_Transporter::TCP_Transporter(Transpo
setIf(sockOptTcpMaxSeg, conf->tcp.tcpMaxsegSize, 0);
m_overload_limit = overload_limit(conf);
+ /**
+ * Always set slowdown limit to 60% of overload limit
+ */
+ m_slowdown_limit = m_overload_limit * 6 / 10;
}
=== modified file 'storage/ndb/src/common/transporter/Transporter.cpp'
--- a/storage/ndb/src/common/transporter/Transporter.cpp 2012-05-07 07:51:09 +0000
+++ b/storage/ndb/src/common/transporter/Transporter.cpp 2012-06-11 14:57:25 +0000
@@ -43,7 +43,8 @@ Transporter::Transporter(TransporterRegi
: m_s_port(s_port), remoteNodeId(rNodeId), localNodeId(lNodeId),
isServer(lNodeId==serverNodeId),
m_packer(_signalId, _checksum), m_max_send_buffer(max_send_buffer),
- m_overload_limit(0xFFFFFFFF), isMgmConnection(_isMgmConnection),
+ m_overload_limit(0xFFFFFFFF), m_slowdown_limit(0xFFFFFFFF),
+ isMgmConnection(_isMgmConnection),
m_connected(false),
m_type(_type),
m_transporter_registry(t_reg)
=== modified file 'storage/ndb/src/common/transporter/Transporter.hpp'
--- a/storage/ndb/src/common/transporter/Transporter.hpp 2012-05-07 07:51:09 +0000
+++ b/storage/ndb/src/common/transporter/Transporter.hpp 2012-06-11 14:57:25 +0000
@@ -90,6 +90,8 @@ public:
{
m_transporter_registry.set_status_overloaded(remoteNodeId,
used >= m_overload_limit);
+ m_transporter_registry.set_status_slowdown(remoteNodeId,
+ used >= m_slowdown_limit);
}
virtual int doSend() = 0;
@@ -155,6 +157,7 @@ protected:
Uint32 m_max_send_buffer;
/* Overload limit, as configured with the OverloadLimit config parameter. */
Uint32 m_overload_limit;
+ Uint32 m_slowdown_limit;
private:
=== modified file 'storage/ndb/src/common/util/HashMap2.cpp'
--- a/storage/ndb/src/common/util/HashMap2.cpp 2011-09-07 22:50:01 +0000
+++ b/storage/ndb/src/common/util/HashMap2.cpp 2012-06-07 14:06:59 +0000
@@ -157,14 +157,14 @@ TAPTEST(HashMap2)
/* Test iterator Api */
HashMap2<IntIntKVPod, true, TestHeapAllocator, IntIntKVStaticMethods>::Iterator it(hash1);
- IntIntKVPod* j;
+ IntIntKVPod* k;
for (int i=0; i < 2; i++)
{
int count = 0;
- while((j = it.next()))
+ while((k = it.next()))
{
- OK( j->b == ((j->a * 3) - i) );
- j->b--;
+ OK( k->b == ((k->a * 3) - i) );
+ k->b--;
count++;
}
OK( count == 100 );
=== modified file 'storage/ndb/src/kernel/blocks/dblqh/DblqhMain.cpp'
--- a/storage/ndb/src/kernel/blocks/dblqh/DblqhMain.cpp 2012-05-31 15:07:44 +0000
+++ b/storage/ndb/src/kernel/blocks/dblqh/DblqhMain.cpp 2012-06-11 14:57:25 +0000
@@ -11361,13 +11361,13 @@ void Dblqh::scanTupkeyConfLab(Signal* si
scanptr.p->m_curr_batch_size_rows = rows + 1;
scanptr.p->m_last_row = tdata5;
- const NodeBitmask& all = globalTransporterRegistry.get_status_overloaded();
+ const NodeBitmask& all = globalTransporterRegistry.get_status_slowdown();
if (unlikely(!all.isclear()))
{
if (all.get(refToNode(scanptr.p->scanApiBlockref)))
{
/**
- * End scan batch if transporter-buffer are overloaded
+ * End scan batch if transporter-buffer are in slowdown state
*
* TODO: We should have counters for this...
*/
=== modified file 'storage/ndb/src/ndbapi/Ndb.cpp'
--- a/storage/ndb/src/ndbapi/Ndb.cpp 2011-10-17 18:13:57 +0000
+++ b/storage/ndb/src/ndbapi/Ndb.cpp 2012-06-11 14:57:25 +0000
@@ -162,6 +162,8 @@ Ndb::NDB_connect(Uint32 tNode, Uint32 in
if (prev != 0)
{
prev->theNext = curr->theNext;
+ if (!curr->theNext)
+ theConnectionArrayLast[tNode] = prev;
curr->theNext = tConArray;
theConnectionArray[tNode] = curr;
}
@@ -210,12 +212,10 @@ Ndb::NDB_connect(Uint32 tNode, Uint32 in
//************************************************
// Send and receive was successful
//************************************************
- NdbTransaction* tPrevFirst = theConnectionArray[tNode];
tNdbCon->setConnectedNodeId(tNode, nodeSequence);
tNdbCon->setMyBlockReference(theMyRef);
- theConnectionArray[tNode] = tNdbCon;
- tNdbCon->theNext = tPrevFirst;
+ prependConnectionArray(tNdbCon, tNode);
DBUG_RETURN(1);
} else {
//****************************************************************************
@@ -261,6 +261,8 @@ Ndb::getConnectedNdbTransaction(Uint32 n
{
assert(false); // Should have been moved in NDB_connect
prev->theNext = next->theNext;
+ if (!next->theNext)
+ theConnectionArrayLast[nodeId] = prev;
goto found_middle;
}
else
@@ -276,7 +278,7 @@ Ndb::getConnectedNdbTransaction(Uint32 n
return 0;
}
found_first:
- theConnectionArray[nodeId] = next->theNext;
+ removeConnectionArray(next, nodeId);
found_middle:
next->theNext = NULL;
@@ -944,6 +946,48 @@ Ndb::startTransactionLocal(Uint32 aPrior
DBUG_RETURN(tConnection);
}//Ndb::startTransactionLocal()
+void
+Ndb::appendConnectionArray(NdbTransaction *aCon, Uint32 nodeId)
+{
+ NdbTransaction *last = theConnectionArrayLast[nodeId];
+ if (last)
+ {
+ last->theNext = aCon;
+ }
+ else
+ {
+ theConnectionArray[nodeId] = aCon;
+ }
+ aCon->theNext = NULL;
+ theConnectionArrayLast[nodeId] = aCon;
+}
+
+void
+Ndb::prependConnectionArray(NdbTransaction *aCon, Uint32 nodeId)
+{
+ NdbTransaction *first = theConnectionArray[nodeId];
+ aCon->theNext = first;
+ if (!first)
+ {
+ theConnectionArrayLast[nodeId] = aCon;
+ }
+ theConnectionArray[nodeId] = aCon;
+}
+
+void
+Ndb::removeConnectionArray(NdbTransaction *first, Uint32 nodeId)
+{
+ NdbTransaction *next = first->theNext;
+ if (!next)
+ {
+ theConnectionArray[nodeId] = theConnectionArrayLast[nodeId] = NULL;
+ }
+ else
+ {
+ theConnectionArray[nodeId] = next;
+ }
+}
+
/*****************************************************************************
void closeTransaction(NdbTransaction* aConnection);
@@ -1044,8 +1088,8 @@ Ndb::closeTransaction(NdbTransaction* aC
/**
* Put it back in idle list for that node
*/
- aConnection->theNext = theConnectionArray[nodeId];
- theConnectionArray[nodeId] = aConnection;
+ appendConnectionArray(aConnection, nodeId);
+
DBUG_VOID_RETURN;
} else {
aConnection->theReleaseOnClose = false;
=== modified file 'storage/ndb/src/ndbapi/Ndbinit.cpp'
--- a/storage/ndb/src/ndbapi/Ndbinit.cpp 2011-10-17 18:13:57 +0000
+++ b/storage/ndb/src/ndbapi/Ndbinit.cpp 2012-06-11 14:57:25 +0000
@@ -61,6 +61,7 @@ void Ndb::setup(Ndb_cluster_connection *
theMinNoOfEventsToWakeUp= 0;
theTransactionList= NULL;
theConnectionArray= NULL;
+ theConnectionArrayLast= NULL;
the_last_check_time= 0;
theFirstTransId= 0;
theRestartGCI= 0;
@@ -83,13 +84,15 @@ void Ndb::setup(Ndb_cluster_connection *
theError.code = 0;
theConnectionArray = new NdbConnection * [MAX_NDB_NODES];
+ theConnectionArrayLast = new NdbConnection * [MAX_NDB_NODES];
theCommitAckSignal = NULL;
theCachedMinDbNodeVersion = 0;
int i;
for (i = 0; i < MAX_NDB_NODES ; i++) {
theConnectionArray[i] = NULL;
- }//forg
+ theConnectionArrayLast[i] = NULL;
+ }//for
m_sys_tab_0 = NULL;
theImpl->m_dbname.assign(aDataBase);
@@ -158,6 +161,7 @@ Ndb::~Ndb()
releaseTransactionArrays();
delete []theConnectionArray;
+ delete []theConnectionArrayLast;
if(theCommitAckSignal != NULL){
delete theCommitAckSignal;
theCommitAckSignal = NULL;
=== modified file 'storage/ndb/src/ndbapi/Ndblist.cpp'
--- a/storage/ndb/src/ndbapi/Ndblist.cpp 2011-07-05 12:46:07 +0000
+++ b/storage/ndb/src/ndbapi/Ndblist.cpp 2012-06-11 14:57:25 +0000
@@ -44,6 +44,7 @@ Ndb::checkFailedNode()
*/
NdbTransaction * tNdbCon = theConnectionArray[node_id];
theConnectionArray[node_id] = NULL;
+ theConnectionArrayLast[node_id] = NULL;
while (tNdbCon != NULL) {
NdbTransaction* tempNdbCon = tNdbCon;
tNdbCon = tNdbCon->next();
=== modified file 'storage/ndb/test/ndbapi/flexAsynch.cpp'
--- a/storage/ndb/test/ndbapi/flexAsynch.cpp 2012-02-14 08:16:48 +0000
+++ b/storage/ndb/test/ndbapi/flexAsynch.cpp 2012-06-11 13:30:32 +0000
@@ -1,5 +1,5 @@
/*
- Copyright (c) 2003, 2011, Oracle and/or its affiliates. All rights reserved.
+ Copyright (c) 2003, 2011, 2012, Oracle and/or its affiliates. All rights reserved.
This program is free software; you can redistribute it and/or modify
it under the terms of the GNU General Public License as published by
@@ -24,6 +24,8 @@
#include <md5_hash.hpp>
#include <NdbThread.h>
+#include <NdbMutex.h>
+#include <NdbCondition.h>
#include <NdbSleep.h>
#include <NdbTick.h>
#include <NdbOut.hpp>
@@ -39,6 +41,9 @@
#define MAXATTR 511
#define MAXTABLES 1
#define NDB_MAXTHREADS 128
+#define MAX_EXECUTOR_THREADS 128
+#define MAX_DEFINER_THREADS 32
+#define MAX_REAL_THREADS 160
#define NDB_MAX_NODES 48
/*
NDB_MAXTHREADS used to be just MAXTHREADS, which collides with a
@@ -52,22 +57,22 @@
#define PKSIZE 2
enum StartType {
- stIdle,
- stInsert,
- stRead,
- stUpdate,
- stDelete,
- stStop
+ stIdle = 0,
+ stInsert = 1,
+ stRead = 2,
+ stUpdate = 3,
+ stDelete = 4,
+ stStop = 5
} ;
enum RunType {
- RunInsert,
- RunRead,
- RunUpdate,
- RunDelete,
- RunCreateTable,
- RunDropTable,
- RunAll
+ RunInsert = 1,
+ RunRead = 2,
+ RunUpdate = 3,
+ RunDelete = 4,
+ RunCreateTable = 5,
+ RunDropTable = 6,
+ RunAll = 7
};
struct ThreadNdb
@@ -85,7 +90,7 @@ static void dropTables(Ndb* pMyNdb);
static int createTables(Ndb*);
static void defineOperation(NdbConnection* aTransObject, StartType aType,
Uint32 base, Uint32 aIndex);
-static void defineNdbRecordOperation(ThreadNdb*, NdbConnection* aTransObject, StartType aType,
+static void defineNdbRecordOperation(char*, NdbConnection* aTransObject, StartType aType,
Uint32 base, Uint32 aIndex);
static void execute(StartType aType);
static bool executeThread(ThreadNdb*, StartType aType, Ndb* aNdbObject, unsigned int);
@@ -97,17 +102,20 @@ static bool error_handler(const NdbError
static void input_error();
static Uint32 get_my_node_id(Uint32 tableNo, Uint32 threadNo);
+static void main_thread(RunType run_type, NdbTimer & timer);
+static Uint64 get_total_transactions();
+static void run_old_flexAsynch(ThreadNdb *pThreadData, NdbTimer & timer);
static int retry_opt = 3 ;
static int failed = 0 ;
ErrorData * flexAsynchErrorData;
-static NdbThread* threadLife[NDB_MAXTHREADS];
+static NdbThread* threadLife[MAX_REAL_THREADS];
static int tNodeId;
-static int ThreadReady[NDB_MAXTHREADS];
-static longlong ThreadExecutions[NDB_MAXTHREADS];
-static StartType ThreadStart[NDB_MAXTHREADS];
+static int ThreadReady[MAX_REAL_THREADS];
+static longlong ThreadExecutions[MAX_REAL_THREADS];
+static StartType ThreadStart[NDB_MAXTHREADS];
static char tableName[MAXTABLES][MAXSTRLEN+1];
static const NdbDictionary::Table * tables[MAXTABLES];
static char attrName[MAXATTR][MAXSTRLEN+1];
@@ -136,6 +144,8 @@ static unsigned int tLoadFac
static bool tempTable = false;
static bool startTransGuess = true;
static int tExtraReadLoop = 0;
+static bool tNew = false;
+static bool tImmediate = false;
//Program Flags
static int theTestFlag = 0;
@@ -177,13 +187,13 @@ resetThreads(){
}
static void
-waitForThreads(void)
+waitForThreads(Uint32 num_threads_to_wait_for)
{
int cont = 0;
do {
cont = 0;
NdbSleep_MilliSleep(20);
- for (unsigned i = 0; i < tNoOfThreads ; i++) {
+ for (unsigned i = 0; i < num_threads_to_wait_for ; i++) {
if (ThreadReady[i] == 0) {
cont = 1;
}//if
@@ -204,9 +214,8 @@ NDB_COMMAND(flexAsynch, "flexAsynch", "f
{
ndb_init();
ThreadNdb* pThreadData;
- int tLoops=0;
- int returnValue = NDBT_OK;
DEFINE_TIMER;
+ int returnValue = NDBT_OK;
flexAsynchErrorData = new ErrorData;
flexAsynchErrorData->resetErrorCounters();
@@ -256,7 +265,7 @@ NDB_COMMAND(flexAsynch, "flexAsynch", "f
ndbout << endl;
- NdbThread_SetConcurrencyLevel(2 + tNoOfThreads);
+ NdbThread_SetConcurrencyLevel(2 + (tNoOfThreads * 5 / 4));
/* print Setting */
flexAsynchErrorData->printSettings(ndbout);
@@ -311,7 +320,7 @@ NDB_COMMAND(flexAsynch, "flexAsynch", "f
}
}
- if (tNdbRecord)
+ if (tNdbRecord && !tNew)
{
Uint32 sz = NdbDictionary::getRecordRowLength(g_record[0]);
sz += 3;
@@ -325,260 +334,13 @@ NDB_COMMAND(flexAsynch, "flexAsynch", "f
if(returnValue == NDBT_OK &&
tRunType != RunCreateTable &&
tRunType != RunDropTable){
- /****************************************************************
- * Create NDB objects. *
- ****************************************************************/
- resetThreads();
- for (Uint32 i = 0; i < tNoOfThreads ; i++) {
- pThreadData[i].ThreadNo = i;
- threadLife[i] = NdbThread_Create(threadLoop,
- (void**)&pThreadData[i],
- 32768,
- "flexAsynchThread",
- NDB_THREAD_PRIO_LOW);
- }//for
- ndbout << endl << "All NDB objects and table created" << endl << endl;
- int noOfTransacts = tNoOfParallelTrans*tNoOfTransactions*tNoOfThreads;
- /****************************************************************
- * Execute program. *
- ****************************************************************/
-
- for(;;) {
-
- int loopCount = tLoops + 1 ;
- ndbout << endl << "Loop # " << loopCount << endl << endl ;
-
- /****************************************************************
- * Perform inserts. *
- ****************************************************************/
-
- failed = 0 ;
- if (tRunType == RunAll || tRunType == RunInsert){
- ndbout << "Executing inserts" << endl;
- START_TIMER;
- execute(stInsert);
- STOP_TIMER;
- }
- if (tRunType == RunAll){
- a_i.addObservation((1000*noOfTransacts * tNoOfOpsPerTrans) / timer.elapsedTime());
- PRINT_TIMER("insert", noOfTransacts, tNoOfOpsPerTrans);
-
- if (0 < failed) {
- int i = retry_opt ;
- int ci = 1 ;
- while (0 < failed && 0 < i){
- ndbout << failed << " of the transactions returned errors!"
- << endl << endl;
- ndbout << "Attempting to redo the failed transactions now..."
- << endl ;
- ndbout << "Redo attempt " << ci <<" out of " << retry_opt
- << endl << endl;
- failed = 0 ;
- START_TIMER;
- execute(stInsert);
- STOP_TIMER;
- PRINT_TIMER("insert", noOfTransacts, tNoOfOpsPerTrans);
- i-- ;
- ci++;
- }
- if(0 == failed ){
- ndbout << endl <<"Redo attempt succeeded" << endl << endl;
- }else{
- ndbout << endl <<"Redo attempt failed, moving on now..." << endl
- << endl;
- }//if
- }//if
- }//if
- /****************************************************************
- * Perform read. *
- ****************************************************************/
-
- failed = 0 ;
-
- if (tRunType == RunAll || tRunType == RunRead){
- for (int ll = 0; ll < 1 + tExtraReadLoop; ll++)
- {
- ndbout << "Executing reads" << endl;
- START_TIMER;
- execute(stRead);
- STOP_TIMER;
- if (tRunType == RunAll){
- a_r.addObservation((1000 * noOfTransacts * tNoOfOpsPerTrans) / timer.elapsedTime());
- PRINT_TIMER("read", noOfTransacts, tNoOfOpsPerTrans);
- }//if
- }//for
- }//if
-
- if (tRunType == RunAll){
- if (0 < failed) {
- int i = retry_opt ;
- int cr = 1;
- while (0 < failed && 0 < i){
- ndbout << failed << " of the transactions returned errors!"<<endl ;
- ndbout << endl;
- ndbout <<"Attempting to redo the failed transactions now..." << endl;
- ndbout << endl;
- ndbout <<"Redo attempt " << cr <<" out of ";
- ndbout << retry_opt << endl << endl;
- failed = 0 ;
- START_TIMER;
- execute(stRead);
- STOP_TIMER;
- PRINT_TIMER("read", noOfTransacts, tNoOfOpsPerTrans);
- i-- ;
- cr++ ;
- }//while
- if(0 == failed ) {
- ndbout << endl <<"Redo attempt succeeded" << endl << endl ;
- }else{
- ndbout << endl <<"Redo attempt failed, moving on now..." << endl << endl ;
- }//if
- }//if
- }//if
-
-
- /****************************************************************
- * Perform update. *
- ****************************************************************/
-
- failed = 0 ;
-
- if (tRunType == RunAll || tRunType == RunUpdate){
- ndbout << "Executing updates" << endl;
- START_TIMER;
- execute(stUpdate);
- STOP_TIMER;
- }//if
- if (tRunType == RunAll){
- a_u.addObservation((1000 * noOfTransacts * tNoOfOpsPerTrans) / timer.elapsedTime());
- PRINT_TIMER("update", noOfTransacts, tNoOfOpsPerTrans) ;
-
- if (0 < failed) {
- int i = retry_opt ;
- int cu = 1 ;
- while (0 < failed && 0 < i){
- ndbout << failed << " of the transactions returned errors!"<<endl ;
- ndbout << endl;
- ndbout <<"Attempting to redo the failed transactions now..." << endl;
- ndbout << endl <<"Redo attempt " << cu <<" out of ";
- ndbout << retry_opt << endl << endl;
- failed = 0 ;
- START_TIMER;
- execute(stUpdate);
- STOP_TIMER;
- PRINT_TIMER("update", noOfTransacts, tNoOfOpsPerTrans);
- i-- ;
- cu++ ;
- }//while
- if(0 == failed ){
- ndbout << endl <<"Redo attempt succeeded" << endl << endl;
- } else {
- ndbout << endl;
- ndbout <<"Redo attempt failed, moving on now..." << endl << endl;
- }//if
- }//if
- }//if
-
- /****************************************************************
- * Perform read. *
- ****************************************************************/
-
- failed = 0 ;
-
- if (tRunType == RunAll){
- for (int ll = 0; ll < 1 + tExtraReadLoop; ll++)
- {
- ndbout << "Executing reads" << endl;
- START_TIMER;
- execute(stRead);
- STOP_TIMER;
- a_r.addObservation((1000 * noOfTransacts * tNoOfOpsPerTrans) / timer.elapsedTime());
- PRINT_TIMER("read", noOfTransacts, tNoOfOpsPerTrans);
- }
-
- if (0 < failed) {
- int i = retry_opt ;
- int cr2 = 1 ;
- while (0 < failed && 0 < i){
- ndbout << failed << " of the transactions returned errors!"<<endl ;
- ndbout << endl;
- ndbout <<"Attempting to redo the failed transactions now..." << endl;
- ndbout << endl <<"Redo attempt " << cr2 <<" out of ";
- ndbout << retry_opt << endl << endl;
- failed = 0 ;
- START_TIMER;
- execute(stRead);
- STOP_TIMER;
- PRINT_TIMER("read", noOfTransacts, tNoOfOpsPerTrans);
- i-- ;
- cr2++ ;
- }//while
- if(0 == failed ){
- ndbout << endl <<"Redo attempt succeeded" << endl << endl;
- }else{
- ndbout << endl;
- ndbout << "Redo attempt failed, moving on now..." << endl << endl;
- }//if
- }//if
- }//if
-
-
- /****************************************************************
- * Perform delete. *
- ****************************************************************/
-
- failed = 0 ;
-
- if (tRunType == RunAll || tRunType == RunDelete){
- ndbout << "Executing deletes" << endl;
- START_TIMER;
- execute(stDelete);
- STOP_TIMER;
- }//if
- if (tRunType == RunAll){
- a_d.addObservation((1000 * noOfTransacts * tNoOfOpsPerTrans) / timer.elapsedTime());
- PRINT_TIMER("delete", noOfTransacts, tNoOfOpsPerTrans);
-
- if (0 < failed) {
- int i = retry_opt ;
- int cd = 1 ;
- while (0 < failed && 0 < i){
- ndbout << failed << " of the transactions returned errors!"<< endl ;
- ndbout << endl;
- ndbout <<"Attempting to redo the failed transactions now:" << endl ;
- ndbout << endl <<"Redo attempt " << cd <<" out of ";
- ndbout << retry_opt << endl << endl;
- failed = 0 ;
- START_TIMER;
- execute(stDelete);
- STOP_TIMER;
- PRINT_TIMER("read", noOfTransacts, tNoOfOpsPerTrans);
- i-- ;
- cd++ ;
- }//while
- if(0 == failed ){
- ndbout << endl <<"Redo attempt succeeded" << endl << endl ;
- }else{
- ndbout << endl;
- ndbout << "Redo attempt failed, moving on now..." << endl << endl;
- }//if
- }//if
- }//if
-
- tLoops++;
- ndbout << "--------------------------------------------------" << endl;
-
- if(tNoOfLoops != 0){
- if(tNoOfLoops <= tLoops)
- break ;
- }
- }//for
-
- execute(stStop);
- void * tmp;
- for(Uint32 i = 0; i<tNoOfThreads; i++){
- NdbThread_WaitFor(threadLife[i], &tmp);
- NdbThread_Destroy(&threadLife[i]);
+ if (tNew)
+ {
+ main_thread(tRunType, timer);
+ }
+ else
+ {
+ run_old_flexAsynch(pThreadData, timer);
}
}
@@ -609,27 +371,43 @@ NDB_COMMAND(flexAsynch, "flexAsynch", "f
tRunType == RunUpdate ||
tRunType == RunDelete)
{
- longlong total_transactions = 0;
- longlong exec_time;
+ Uint64 total_transactions = 0;
+ Uint64 exec_time;
- if (tRunType == RunInsert || tRunType == RunDelete) {
- total_transactions = (longlong)tNoOfTransactions;
- total_transactions *= (longlong)tNoOfThreads;
- total_transactions *= (longlong)tNoOfParallelTrans;
- } else {
- for (Uint32 i = 0; i < tNoOfThreads; i++){
- total_transactions += ThreadExecutions[i];
+ if (tNew)
+ {
+ total_transactions = get_total_transactions();
+ }
+ else
+ {
+ if (tRunType == RunInsert || tRunType == RunDelete)
+ {
+ total_transactions = (longlong)tNoOfTransactions;
+ total_transactions *= (longlong)tNoOfThreads;
+ total_transactions *= (longlong)tNoOfParallelTrans;
+ }
+ else
+ {
+ for (Uint32 i = 0; i < tNoOfThreads; i++)
+ {
+ total_transactions += ThreadExecutions[i];
+ }
}
}
if (tRunType == RunInsert || tRunType == RunDelete) {
- exec_time = (longlong)timer.elapsedTime();
+ exec_time = (Uint64)timer.elapsedTime();
} else {
- exec_time = (longlong)tExecutionTime * 1000;
+ exec_time = (Uint64)tExecutionTime * 1000;
}
ndbout << "Total number of transactions is " << total_transactions;
ndbout << endl;
ndbout << "Execution time is " << exec_time << " milliseconds" << endl;
+ if (!exec_time)
+ {
+ exec_time = 1; /* Avoid floating point exception */
+ ndbout_c("Zero execution time!!!");
+ }
total_transactions = (total_transactions * 1000) / exec_time;
int trans_per_sec = (int)total_transactions;
ndbout << "Total transactions per second " << trans_per_sec << endl;
@@ -645,7 +423,7 @@ static void execute(StartType aType)
{
resetThreads();
tellThreads(aType);
- waitForThreads();
+ waitForThreads(tNoOfThreads);
}//execute()
static void*
@@ -794,7 +572,7 @@ executeTrans(ThreadNdb* pThread,
// Define the operation, but do not execute it yet.
//-------------------------------------------------------
if (tNdbRecord)
- defineNdbRecordOperation(pThread,
+ defineNdbRecordOperation(pThread->record,
tConArray[num_ops],
aType,
threadBaseLoc2,
@@ -944,24 +722,31 @@ executeCallback(int result, NdbConnectio
NdbConnection **array_ref = (NdbConnection**)aObject;
assert(NdbObject == *array_ref);
*array_ref = NULL;
- if (result == -1) {
-
- // Add complete error handling here
+ if (result == -1 && failed < 100)
+ {
- int retCode = flexAsynchErrorData->handleErrorCommon(NdbObject->getNdbError());
- if (retCode == 1) {
- if (NdbObject->getNdbError().code != 626 && NdbObject->getNdbError().code != 630){
- ndbout_c("execute: %s", NdbObject->getNdbError().message);
- ndbout_c("Error code = %d", NdbObject->getNdbError().code);}
- } else if (retCode == 2) {
- ndbout << "4115 should not happen in flexAsynch" << endl;
- } else if (retCode == 3) {
- /* What can we do here? */
- ndbout_c("execute: %s", NdbObject->getNdbError().message);
- }//if(retCode == 3)
+ // Add complete error handling here
- // ndbout << "Error occured in poll:" << endl;
- // ndbout << NdbObject->getNdbError() << endl;
+ int retCode = flexAsynchErrorData->handleErrorCommon(NdbObject->getNdbError());
+ if (retCode == 1)
+ {
+ if (NdbObject->getNdbError().code != 626 && NdbObject->getNdbError().code != 630)
+ {
+ ndbout_c("execute: %s", NdbObject->getNdbError().message);
+ ndbout_c("Error code = %d", NdbObject->getNdbError().code);
+ }
+ }
+ else if (retCode == 2)
+ {
+ ndbout << "4115 should not happen in flexAsynch" << endl;
+ }
+ else if (retCode == 3)
+ {
+ /* What can we do here? */
+ ndbout_c("execute: %s", NdbObject->getNdbError().message);
+ }//if(retCode == 3)
+ // ndbout << "Error occured in poll:" << endl;
+ // ndbout << NdbObject->getNdbError() << endl;
failed++ ;
}//if
NdbObject->close(); /* Close transaction */
@@ -1067,11 +852,12 @@ defineOperation(NdbConnection* localNdbC
static void
-defineNdbRecordOperation(ThreadNdb* pThread,
- NdbConnection* pTrans, StartType aType,
- Uint32 threadBase, Uint32 aIndex)
+defineNdbRecordOperation(char *record,
+ NdbConnection* pTrans,
+ StartType aType,
+ Uint32 threadBase,
+ Uint32 aIndex)
{
- char * record = pThread->record;
Uint32 offset;
NdbDictionary::getOffset(g_record[0], 0, offset);
* (Uint32*)(record + offset) = threadBase;
@@ -1102,15 +888,24 @@ defineNdbRecordOperation(ThreadNdb* pThr
break;
}//case
case stRead: { // Read Case
- op = pTrans->readTuple(g_record[0],record,g_record[0],record, NdbOperation::LM_CommittedRead);
+ op = pTrans->readTuple(g_record[0],
+ record,
+ g_record[0],
+ record,
+ NdbOperation::LM_CommittedRead);
break;
}//case
case stUpdate:{ // Update Case
- op = pTrans->updateTuple(g_record[0],record,g_record[0],record);
+ op = pTrans->updateTuple(g_record[0],
+ record,
+ g_record[0],
+ record);
break;
}//case
case stDelete: { // Delete Case
- op = pTrans->deleteTuple(g_record[0],record, g_record[0]);
+ op = pTrans->deleteTuple(g_record[0],
+ record,
+ g_record[0]);
break;
}//case
default: {
@@ -1193,6 +988,25 @@ setUpNodeTableArray(Uint32 tableNo, cons
}
static Uint32
+get_node_relative_id(Uint32 tableNo, Uint32 node_id)
+{
+ Uint32 rel_id = 0;
+
+ for (Uint32 i = 1; i < node_id; i++)
+ {
+ if (nodeTableArray[tableNo][i])
+ rel_id++;
+ }
+ return rel_id;
+}
+
+static Uint32
+get_node_count(Uint32 tableNo)
+{
+ return get_node_relative_id(tableNo, NDB_MAX_NODES + 1);
+}
+
+static Uint32
get_my_node_id(Uint32 tableNo, Uint32 threadNo)
{
Uint32 count = 0;
@@ -1327,6 +1141,859 @@ setAggregateRun(void)
theTableCreateFlag = 1;
}
+/* Start NEW Module */
+
+/**
+ * This part contains the code used for the case --local 4 which is using
+ * the design pattern that could be used for asynchronous applications of
+ * the NDB API.
+ *
+ * This variant will always use transaction hints, it will always the
+ * NDB Record format in the NDB API.
+ */
+
+static void* definer_thread(void *data);
+static void* executor_thread(void *data);
+
+static Uint32 tNoOfExecutorThreads = 0;
+static Uint32 tNoOfDefinerThreads = 0;
+
+enum RunState
+{
+ WARMUP = 0,
+ EXECUTING = 1,
+ COOLDOWN = 2
+};
+
+RunState tRunState = WARMUP;
+
+typedef struct KeyOperation KEY_OPERATION;
+struct KeyOperation
+{
+ Uint32 first_key;
+ Uint32 second_key;
+ Uint32 definer_thread_id;
+ Uint32 executor_thread_id;
+ RunType operation_type;
+ KEY_OPERATION *next_key_op;
+};
+
+typedef struct key_list_header KEY_LIST_HEADER;
+struct key_list_header
+{
+ KEY_OPERATION *first_in_list;
+ KEY_OPERATION *last_in_list;
+ Uint32 num_in_list;
+};
+
+
+typedef struct thread_data_struct THREAD_DATA;
+struct thread_data_struct
+{
+ KEY_LIST_HEADER list_header;
+ Uint32 thread_id;
+ bool ready;
+ bool stop;
+ bool start;
+
+ char *record;
+ NdbMutex *transport_mutex;
+ struct NdbCondition *transport_cond;
+ struct NdbCondition *main_cond;
+ struct NdbCondition *start_cond;
+ char not_used[52];
+};
+THREAD_DATA thread_data_array[MAX_DEFINER_THREADS + MAX_EXECUTOR_THREADS];
+
+static Uint64
+get_total_transactions()
+{
+ Uint64 total_transactions = 0;
+
+ for (Uint32 i = tNoOfDefinerThreads; i < tNoOfThreads; i++)
+ {
+ total_transactions += ThreadExecutions[i];
+ }
+ return total_transactions;
+}
+
+static void
+init_list_headers(KEY_LIST_HEADER *list_header,
+ Uint32 num_list_headers)
+{
+ Uint32 i;
+ KEY_LIST_HEADER *list_header_ref;
+ char *list_header_ptr = (char*)list_header;
+ for (i = 0;
+ i < num_list_headers;
+ i++, list_header_ptr += sizeof(KEY_LIST_HEADER))
+ {
+ list_header_ref = (KEY_LIST_HEADER*)list_header_ptr;
+ list_header_ref->first_in_list = NULL;
+ list_header_ref->last_in_list = NULL;
+ list_header_ref->num_in_list = 0;
+ }
+}
+
+static void
+wait_thread_ready(THREAD_DATA *my_thread_data)
+{
+ NdbMutex_Lock(my_thread_data->transport_mutex);
+ while (1)
+ {
+ if (my_thread_data->ready)
+ break;
+ NdbCondition_Wait(my_thread_data->main_cond,
+ my_thread_data->transport_mutex);
+ }
+ NdbMutex_Unlock(my_thread_data->transport_mutex);
+}
+
+static void
+wait_for_threads_ready(Uint32 num_threads)
+{
+ for (Uint32 i = 0; i < num_threads; i++)
+ wait_thread_ready(&thread_data_array[i]);
+}
+
+static void
+signal_thread_to_start(THREAD_DATA *my_thread_data)
+{
+ NdbMutex_Lock(my_thread_data->transport_mutex);
+ my_thread_data->start = true;
+ my_thread_data->ready = false;
+ NdbCondition_Signal(my_thread_data->start_cond);
+ NdbMutex_Unlock(my_thread_data->transport_mutex);
+}
+
+static void
+signal_definer_threads_to_start()
+{
+ for (Uint32 i = 0; i < tNoOfDefinerThreads; i++)
+ signal_thread_to_start(&thread_data_array[i]);
+}
+
+static void
+signal_executor_threads_to_start()
+{
+ for (Uint32 i = 0; i < tNoOfExecutorThreads; i++)
+ signal_thread_to_start(&thread_data_array[tNoOfDefinerThreads + i]);
+}
+
+static void
+signal_thread_ready_wait_for_start(THREAD_DATA *my_thread_data)
+{
+ NdbMutex_Lock(my_thread_data->transport_mutex);
+ my_thread_data->ready = true;
+ NdbCondition_Signal(my_thread_data->main_cond);
+ while (1)
+ {
+ if (my_thread_data->start)
+ break;
+ NdbCondition_Wait(my_thread_data->start_cond,
+ my_thread_data->transport_mutex);
+ }
+ my_thread_data->start = false;
+ NdbMutex_Unlock(my_thread_data->transport_mutex);
+}
+
+static void
+signal_thread_to_stop(THREAD_DATA *my_thread_data)
+{
+ NdbMutex_Lock(my_thread_data->transport_mutex);
+ my_thread_data->stop = true;
+ NdbCondition_Signal(my_thread_data->transport_cond);
+ NdbMutex_Unlock(my_thread_data->transport_mutex);
+}
+
+static void
+signal_definer_threads_to_stop()
+{
+ for (Uint32 i = 0; i < tNoOfDefinerThreads; i++)
+ signal_thread_to_stop(&thread_data_array[i]);
+}
+
+static void
+signal_executor_threads_to_stop()
+{
+ for (Uint32 i = tNoOfDefinerThreads; i < tNoOfThreads; i++)
+ signal_thread_to_stop(&thread_data_array[i]);
+}
+
+static void
+destroy_thread_data(THREAD_DATA *my_thread_data)
+{
+ free(my_thread_data->record);
+ NdbMutex_Destroy(my_thread_data->transport_mutex);
+ NdbCondition_Destroy(my_thread_data->transport_cond);
+ NdbCondition_Destroy(my_thread_data->start_cond);
+ NdbCondition_Destroy(my_thread_data->main_cond);
+}
+
+static void
+init_thread_data(THREAD_DATA *my_thread_data, Uint32 thread_id)
+{
+ Uint32 sz = NdbDictionary::getRecordRowLength(g_record[0]);
+ my_thread_data->record = (char*)malloc(sz);
+ memset(my_thread_data->record, 0, sz);
+ init_list_headers(&my_thread_data->list_header, 1);
+ my_thread_data->stop = false;
+ my_thread_data->ready = false;
+ my_thread_data->start = false;
+ my_thread_data->transport_mutex = NdbMutex_Create();
+ my_thread_data->transport_cond = NdbCondition_Create();
+ my_thread_data->main_cond = NdbCondition_Create();
+ my_thread_data->start_cond = NdbCondition_Create();
+ my_thread_data->thread_id = thread_id;
+}
+
+static void
+create_definer_thread(THREAD_DATA *my_thread_data, Uint32 thread_id)
+{
+ init_thread_data(my_thread_data, thread_id);
+ threadLife[thread_id] = NdbThread_Create(definer_thread,
+ (void**)my_thread_data,
+ 1024 * 1024,
+ "flexAsynchThread",
+ NDB_THREAD_PRIO_LOW);
+}
+
+static void
+create_definer_threads()
+{
+ for (Uint32 i = 0; i < tNoOfDefinerThreads; i++)
+ {
+ Uint32 thread_id = i;
+ create_definer_thread(&thread_data_array[thread_id], thread_id);
+ }
+}
+
+static void
+create_executor_thread(THREAD_DATA *my_thread_data, Uint32 thread_id)
+{
+ init_thread_data(my_thread_data, thread_id);
+ threadLife[thread_id] = NdbThread_Create(executor_thread,
+ (void**)my_thread_data,
+ 1024 * 1024,
+ "flexAsynchThread",
+ NDB_THREAD_PRIO_LOW);
+}
+
+static void
+create_executor_threads()
+{
+ for (Uint32 i = 0; i < tNoOfExecutorThreads; i++)
+ {
+ Uint32 thread_id = tNoOfDefinerThreads + i;
+ create_executor_thread(&thread_data_array[thread_id], thread_id);
+ }
+}
+
+static void
+main_thread(RunType start_type, NdbTimer & timer)
+{
+ bool insert_delete;
+
+ tNoOfExecutorThreads = tNoOfThreads;
+ if (tNoOfDefinerThreads == 0)
+ {
+ tNoOfDefinerThreads = (tNoOfThreads + 3)/4;
+ }
+ tNoOfThreads = tNoOfExecutorThreads + tNoOfDefinerThreads;
+
+ if (start_type == RunInsert ||
+ start_type == RunDelete)
+ insert_delete = true;
+ else
+ insert_delete = false;
+
+ create_definer_threads();
+ create_executor_threads();
+
+ wait_for_threads_ready(tNoOfThreads);
+
+ /**
+ * Start threads, start with execution threads to ensure they are
+ * up and running before definer threads starts sending data to
+ * them
+ */
+ START_TIMER;
+ signal_definer_threads_to_start();
+ signal_executor_threads_to_start();
+
+ if (!insert_delete)
+ {
+ sleep(tWarmupTime);
+ tRunState = EXECUTING;
+ sleep(tExecutionTime);
+ tRunState = COOLDOWN;
+ sleep(tCooldownTime);
+ signal_definer_threads_to_stop();
+ }
+ wait_for_threads_ready(tNoOfDefinerThreads);
+ STOP_TIMER;
+
+ signal_executor_threads_to_stop();
+ wait_for_threads_ready(tNoOfThreads);
+
+ /**
+ * Now all threads are stopped and prepared to be destroyed,
+ * now start them just to destroy themselves
+ */
+ signal_definer_threads_to_start();
+ signal_executor_threads_to_start();
+
+ void * tmp;
+ for (Uint32 i = 0; i < tNoOfThreads; i++)
+ {
+ NdbThread_WaitFor(threadLife[i], &tmp);
+ NdbThread_Destroy(&threadLife[i]);
+ }
+}
+
+static NdbConnection*
+get_trans_object(Uint32 first_key,
+ Uint32 second_key,
+ Ndb *my_ndb)
+{
+ union {
+ Uint64 _align;
+ Uint32 Tkey32[2];
+ };
+ (void)_align;
+
+ Tkey32[0] = first_key;
+ Tkey32[1] = second_key;
+ Ndb::Key_part_ptr hint[2];
+ hint[0].ptr = Tkey32+0;
+ hint[0].len = 4;
+ hint[1].ptr = 0;
+ hint[1].len = 0;
+
+ return my_ndb->startTransaction(tables[0], hint);
+}
+
+static Ndb*
+get_ndb_object(Uint32 my_thread_id)
+{
+ Ndb *my_ndb = new Ndb(g_cluster_connection+(my_thread_id % tConnections),
+ "TEST_DB");
+ my_ndb->init(MAXPAR);
+ my_ndb->waitUntilReady(10000);
+ return my_ndb;
+}
+
+static void
+insert_list(KEY_LIST_HEADER *list_header,
+ KEY_OPERATION *insert_op)
+{
+ KEY_OPERATION *current_last = list_header->last_in_list;
+ insert_op->next_key_op = NULL;
+ list_header->last_in_list = insert_op;
+ if (current_last)
+ current_last->next_key_op = insert_op;
+ else
+ list_header->first_in_list = insert_op;
+ list_header->num_in_list++;
+}
+
+static KEY_OPERATION*
+get_first_free(KEY_LIST_HEADER *list_header)
+{
+ assert(list_header->first_in_list);
+ KEY_OPERATION *key_op = list_header->first_in_list;
+ list_header->first_in_list = key_op->next_key_op;
+ list_header->num_in_list--;
+ if (!list_header->first_in_list)
+ {
+ list_header->last_in_list = NULL;
+ }
+ key_op->next_key_op = NULL;
+ return key_op;
+}
+
+static void
+move_list(KEY_LIST_HEADER *src_list_header,
+ KEY_LIST_HEADER *dst_list_header)
+{
+ KEY_OPERATION *last_completed_op = dst_list_header->last_in_list;
+ KEY_OPERATION *first_in_list = src_list_header->first_in_list;
+ if (!first_in_list)
+ return;
+ if (last_completed_op)
+ {
+ last_completed_op->next_key_op = first_in_list;
+ }
+ else
+ {
+ dst_list_header->first_in_list = first_in_list;
+ }
+ dst_list_header->last_in_list = src_list_header->last_in_list;
+ dst_list_header->num_in_list += src_list_header->num_in_list;
+ src_list_header->num_in_list = 0;
+ src_list_header->first_in_list = NULL;
+ src_list_header->last_in_list = NULL;
+}
+
+/**
+ * Retrieve a linked list of prepared operations. If no operations
+ * prepared we wait on a condition until operations are defined for
+ * us to execute.
+ */
+static void
+receive_operations(THREAD_DATA *my_thread_data,
+ KEY_LIST_HEADER *list_header,
+ bool wait)
+{
+ bool first = true;
+ KEY_LIST_HEADER *thread_list_header = &my_thread_data->list_header;
+ list_header->first_in_list = NULL;
+ list_header->last_in_list = NULL;
+ list_header->num_in_list = 0;
+
+recheck:
+ NdbMutex_Lock(my_thread_data->transport_mutex);
+ while (!my_thread_data->stop &&
+ (first || thread_list_header->first_in_list))
+ {
+ move_list(thread_list_header, list_header);
+ if (list_header->first_in_list)
+ break;
+ NdbCondition_Wait(my_thread_data->transport_cond,
+ my_thread_data->transport_mutex);
+ first = false;
+ }
+ NdbMutex_Unlock(my_thread_data->transport_mutex);
+ if (first && wait &&
+ thread_list_header->num_in_list < ((tNoOfParallelTrans + 1) / 2))
+ {
+ /**
+ * We will wait for at least 200 microseconds if we haven't yet received
+ * at least half of the number of records we desire to execute.
+ */
+ NdbSleep_MicroSleep(200);
+ first = false;
+ goto recheck;
+ }
+}
+
+static void
+send_operations(Uint32 thread_id,
+ KEY_LIST_HEADER *list_header)
+{
+ THREAD_DATA *recv_thread = &thread_data_array[thread_id];
+
+ NdbMutex_Lock(recv_thread->transport_mutex);
+ /**
+ * We are moving operations into the list, thus we need
+ * to wake any threads waiting for operations to execute.
+ */
+ move_list(list_header,
+ &recv_thread->list_header);
+ NdbCondition_Signal(recv_thread->transport_cond);
+ NdbMutex_Unlock(recv_thread->transport_mutex);
+}
+
+static void
+init_key_op_list(char *key_op_ptr,
+ KEY_LIST_HEADER *list_header,
+ Uint32 max_outstanding,
+ Uint32 my_thread_id,
+ RunType my_run_type)
+{
+ KEY_OPERATION *key_op;
+
+ list_header->first_in_list = (KEY_OPERATION*)key_op_ptr;
+ for (Uint32 i = 0;
+ i < max_outstanding;
+ i++, key_op_ptr += sizeof(KEY_OPERATION))
+ {
+ key_op = (KEY_OPERATION*)key_op_ptr;
+ key_op->next_key_op = (KEY_OPERATION*)(key_op_ptr + sizeof(KEY_OPERATION));
+ key_op->definer_thread_id = my_thread_id;
+ key_op->executor_thread_id = MAX_EXECUTOR_THREADS;
+ key_op->operation_type = my_run_type;
+ }
+ key_op->next_key_op = NULL; /* Last key operation */
+ list_header->last_in_list = key_op;
+ list_header->num_in_list = max_outstanding;
+}
+
+static Uint32
+get_thread_id_for_record(Uint32 record_id,
+ Uint32 node_count,
+ Uint32 thread_count,
+ Uint32 thread_group,
+ Uint32 num_thread_groups,
+ Ndb *my_ndb)
+{
+ Uint32 thread_id;
+ NdbConnection *trans = get_trans_object(record_id, record_id, my_ndb);
+ Uint32 node_id = trans->getConnectedNodeId();
+ trans->close();
+ Uint32 node_rel_id = get_node_relative_id((Uint32)0, node_id);
+ if (node_count >= thread_count)
+ {
+ thread_id = node_rel_id % thread_count;
+ return thread_id;
+ }
+
+recalculate:
+ thread_id = thread_group * node_count + node_rel_id;
+ if (thread_id >= thread_count)
+ {
+ /**
+ * Only the last thread group may have less than
+ * node_count threads, so choosing a random group
+ * except the last will always give a valid
+ * thread_id.
+ */
+ thread_group = rand() % (num_thread_groups - 1);
+ goto recalculate;
+ }
+ return thread_id;
+}
+
+void init_thread_id_mem(char *thread_id_mem,
+ Uint32 first_record,
+ Uint32 total_records,
+ Ndb *my_ndb)
+{
+ Uint32 node_count = get_node_count((Uint32)0);
+ Uint32 thread_count = tNoOfExecutorThreads;
+ Uint32 num_thread_groups = (thread_count + node_count - 1) / node_count;
+ Uint32 thread_group = 0;
+ for (Uint32 record_id = first_record, i = 0;
+ i < total_records;
+ i++, record_id++)
+ {
+ thread_id_mem[i] = (char)get_thread_id_for_record(record_id,
+ node_count,
+ thread_count,
+ thread_group,
+ num_thread_groups,
+ my_ndb);
+ thread_group++;
+ if (thread_group == num_thread_groups)
+ thread_group = 0;
+ }
+}
+
+static bool
+check_for_outstanding(Uint32 *thread_state)
+{
+ for (Uint32 i = 0; i < tNoOfExecutorThreads; i++)
+ {
+ if (thread_state[i])
+ return true;
+ }
+ return false;
+}
+
+static void
+update_thread_state(KEY_LIST_HEADER *list_header,
+ Uint32 *thread_state)
+{
+ KEY_OPERATION *key_op = list_header->first_in_list;
+
+ while (key_op)
+ {
+ thread_state[key_op->executor_thread_id]--;
+ key_op->executor_thread_id = MAX_EXECUTOR_THREADS;
+ key_op = key_op->next_key_op;
+ }
+}
+
+static void
+wait_until_all_completed(THREAD_DATA *my_thread_data,
+ Uint32 *thread_state,
+ KEY_LIST_HEADER *free_list_header)
+{
+ KEY_LIST_HEADER list_header;
+ bool outstanding = true;
+ while (outstanding && !my_thread_data->stop)
+ {
+ receive_operations(my_thread_data, &list_header, false);
+ update_thread_state(&list_header, thread_state);
+ move_list(&list_header, free_list_header);
+ outstanding = check_for_outstanding(thread_state);
+ }
+}
+
+static Uint32
+prepare_operations(char *thread_id_mem,
+ KEY_LIST_HEADER *free_list_header,
+ Uint32 *thread_state,
+ Uint32 first_record_to_define,
+ Uint32 num_records_to_define,
+ Uint32 first_record,
+ Uint32 last_record,
+ Uint32 max_per_thread)
+{
+ KEY_LIST_HEADER thread_list_headers[MAX_EXECUTOR_THREADS];
+ Uint32 record_id, i, num_records;
+
+ init_list_headers(&thread_list_headers[0], tNoOfExecutorThreads);
+ for (record_id = first_record_to_define, i = 0;
+ record_id <= last_record && i < num_records_to_define;
+ record_id++, i++)
+ {
+ KEY_OPERATION *define_op = get_first_free(free_list_header);
+ Uint32 thread_id = (Uint32)thread_id_mem[record_id - first_record];
+ define_op->first_key = record_id;
+ define_op->second_key = record_id;
+ define_op->executor_thread_id = thread_id;
+ thread_state[thread_id]++;
+ KEY_LIST_HEADER *thread_list_header = &thread_list_headers[thread_id];
+ insert_list(thread_list_header, define_op);
+ if (thread_list_header->num_in_list >= max_per_thread)
+ {
+ /**
+ * One thread has max number of records, we won't define any
+ * more to keep the code simple.
+ */
+ break;
+ }
+ }
+ num_records = i;
+ for (i = 0; i < tNoOfExecutorThreads; i++)
+ {
+ KEY_LIST_HEADER *thread_list_header = &thread_list_headers[i];
+ if (thread_list_header->num_in_list)
+ {
+ send_operations(tNoOfDefinerThreads + i, thread_list_header);
+ }
+ }
+ return num_records;
+}
+
+static void*
+definer_thread(void *data)
+{
+ THREAD_DATA *my_thread_data = (THREAD_DATA*)data;
+ Uint32 my_thread_id = my_thread_data->thread_id;
+ RunType run_type = tRunType;
+ Uint32 thread_state[MAX_EXECUTOR_THREADS];
+ Uint32 max_outstanding = (tNoOfExecutorThreads * tNoOfParallelTrans) /
+ tNoOfDefinerThreads;
+ Uint32 max_per_thread = 1000 / tNoOfDefinerThreads;
+ Uint32 total_records = max_outstanding * tNoOfTransactions;
+ Uint32 first_record = total_records * my_thread_id;
+ Uint32 my_last_record = first_record + total_records - 1;
+ Uint32 current_record = first_record;
+ KEY_LIST_HEADER free_list_header;
+ void *key_op_mem = malloc(sizeof(KEY_OPERATION) * max_outstanding);
+ char *thread_id_mem = (char*)malloc(total_records);
+ memset((char*)&thread_state[0], 0, sizeof(thread_state));
+
+ init_key_op_list((char*)key_op_mem,
+ &free_list_header,
+ max_outstanding,
+ my_thread_id,
+ run_type);
+ Ndb *my_ndb = get_ndb_object(my_thread_id);
+ init_thread_id_mem(thread_id_mem,
+ first_record,
+ total_records,
+ my_ndb);
+ delete my_ndb;
+ ThreadExecutions[my_thread_id] = 0;
+ signal_thread_ready_wait_for_start(my_thread_data);
+
+ while (!my_thread_data->stop)
+ {
+ Uint32 defined_ops = prepare_operations(thread_id_mem,
+ &free_list_header,
+ &thread_state[0],
+ current_record,
+ max_outstanding,
+ first_record,
+ my_last_record,
+ max_per_thread);
+ current_record += defined_ops;
+ if (defined_ops)
+ {
+ wait_until_all_completed(my_thread_data,
+ &thread_state[0],
+ &free_list_header);
+ }
+ if (current_record > my_last_record)
+ {
+ if (run_type != RunRead &&
+ run_type != RunUpdate)
+ {
+ /**
+ * Inserts and deletes are done when first round is
+ * completed. Reads and updates proceed until time is
+ * completed.
+ */
+ break;
+ }
+ current_record = first_record;
+ }
+ }
+ signal_thread_ready_wait_for_start(my_thread_data);
+ free(key_op_mem);
+ free(thread_id_mem);
+ destroy_thread_data(my_thread_data);
+ return NULL;
+}
+
+/**
+ * This method receives a linked list of key operations and executes
+ * all of them.
+ *
+ * Return Value: >= 0 means successful completion of this many operations
+ * -1 Failure, stop test
+ */
+static int
+execute_operations(char *record,
+ Ndb* my_ndb,
+ KEY_OPERATION *key_op)
+{
+ NdbConnection* ndb_conn_array[MAXPAR];
+ Uint32 num_ops = 0;
+
+ while (key_op)
+ {
+ ndb_conn_array[num_ops] = get_trans_object(key_op->first_key,
+ key_op->second_key,
+ my_ndb);
+ if (ndb_conn_array[num_ops] == NULL){
+ error_handler(my_ndb->getNdbError());
+ ndbout << endl << "Unable to recover! Quitting now" << endl ;
+ return -1;
+ }
+ //-------------------------------------------------------
+ // Define the operation, but do not execute it yet.
+ //-------------------------------------------------------
+ defineNdbRecordOperation(record,
+ ndb_conn_array[num_ops],
+ (StartType)key_op->operation_type,
+ key_op->first_key,
+ key_op->second_key);
+
+ ndb_conn_array[num_ops]->executeAsynchPrepare(Commit,
+ &executeCallback,
+ (void*)&ndb_conn_array[num_ops]);
+ num_ops++;
+ key_op = key_op->next_key_op;
+ }
+ if (num_ops == 0)
+ return 0;
+
+ /**
+ * Now execute each defined operation and wait for all of them to
+ * complete.
+ */
+ int Tcomp = my_ndb->sendPollNdb(3000,
+ num_ops,
+ tSendForce);
+ if (Tcomp != (int)num_ops &&
+ my_ndb->getNdbError().code != 0)
+ {
+ /* Error handling */
+ if (error_count > 100)
+ return -1;
+
+ error_count++;
+ ndbout << "error = " << my_ndb->getNdbError().code << endl;
+ }
+ return Tcomp;
+}
+
+static void
+report_back_operations(KEY_OPERATION *first_defined_op)
+{
+ KEY_LIST_HEADER thread_list_header[MAX_DEFINER_THREADS];
+ KEY_OPERATION *next_op, *executed_op;
+
+ init_list_headers(&thread_list_header[0], tNoOfDefinerThreads);
+ executed_op = first_defined_op;
+ while (executed_op)
+ {
+ next_op = executed_op->next_key_op;
+ insert_list(&thread_list_header[executed_op->definer_thread_id],
+ executed_op);
+ executed_op = next_op;
+ }
+ for (Uint32 i = 0; i < tNoOfDefinerThreads; i++)
+ {
+ if (thread_list_header[i].first_in_list)
+ {
+ send_operations(i, &thread_list_header[i]);
+ }
+ }
+}
+
+/**
+ * This is the main function of the executor threads, these threads
+ * receive linked lists of operations to execute from the definer
+ * threads. The definer threads stops these threads by simply
+ * sending a stop operation.
+ */
+static void*
+executor_thread(void *data)
+{
+ THREAD_DATA *my_thread_data = (THREAD_DATA*)data;
+ Uint32 my_thread_id = my_thread_data->thread_id;
+ Uint64 exec_count = 0;
+ Uint32 error_count = 0;
+ Uint32 executions = 0;
+ Uint32 error_flag = false;
+ int ret_code;
+ KEY_LIST_HEADER list_header;
+
+ Ndb *my_ndb = get_ndb_object(my_thread_id);
+ ThreadExecutions[my_thread_id] = 0;
+
+ signal_thread_ready_wait_for_start(my_thread_data);
+
+ while (!my_thread_data->stop)
+ {
+ receive_operations(my_thread_data, &list_header, !tImmediate);
+ if (list_header.num_in_list == 0)
+ {
+ break;
+ }
+ ret_code = 0;
+ if (!error_flag)
+ {
+ /* Ignore to execute after errors to simplify error handling */
+ ret_code = execute_operations(my_thread_data->record,
+ my_ndb,
+ list_header.first_in_list);
+ }
+ report_back_operations(list_header.first_in_list);
+ if (ret_code < 0)
+ {
+ ndbout_c("executor thread id = %u received error", my_thread_id);
+ error_flag = true;
+ }
+ else if (!error_flag &&
+ (tRunType == RunInsert ||
+ tRunType == RunDelete ||
+ tRunState == EXECUTING))
+ {
+ executions++;
+ exec_count += (Uint64)ret_code;
+ }
+ }
+
+ ThreadExecutions[my_thread_id] = exec_count;
+ if (error_count)
+ {
+ ndbout_c("Received %u errors in executor thread, id = %u",
+ error_count, my_thread_id);
+ }
+ signal_thread_ready_wait_for_start(my_thread_data);
+ delete my_ndb;
+ destroy_thread_data(my_thread_data);
+ return NULL;
+}
+
+/* End NEW Module */
+
static
int
readArguments(int argc, const char** argv){
@@ -1339,6 +2006,15 @@ readArguments(int argc, const char** arg
ndbout_c("Invalid no of threads");
return -1;
}
+ }
+ else if (strcmp(argv[i], "-d") == 0)
+ {
+ tNoOfDefinerThreads = atoi(argv[i+1]);
+ if (tNoOfDefinerThreads > NDB_MAXTHREADS)
+ {
+ ndbout_c("Invalid no of definer threads");
+ return -1;
+ }
} else if (strcmp(argv[i], "-p") == 0){
tNoOfParallelTrans = atoi(argv[i+1]);
if ((tNoOfParallelTrans < 1) || (tNoOfParallelTrans > MAXPAR)){
@@ -1463,6 +2139,18 @@ readArguments(int argc, const char** arg
} else if (strcmp(argv[i], "-create_table") == 0){
tRunType = RunCreateTable;
argc++;
+ }
+ else if (strcmp(argv[i], "-new") == 0)
+ {
+ tNew = true;
+ tNdbRecord = true;
+ argc++;
+ i--;
+ }
+ else if (strcmp(argv[i], "-immediate") == 0)
+ {
+ tImmediate = true;
+ argc++;
i--;
} else if (strcmp(argv[i], "-drop_table") == 0){
tRunType = RunDropTable;
@@ -1536,4 +2224,303 @@ input_error(){
ndbout_c(" -table Number of standard table, default 0");
}
+static void
+run_old_flexAsynch(ThreadNdb *pThreadData,
+ NdbTimer & timer)
+{
+ int tLoops=0;
+ /****************************************************************
+ * Create NDB objects. *
+ ****************************************************************/
+ resetThreads();
+ for (Uint32 i = 0; i < tNoOfThreads ; i++)
+ {
+ pThreadData[i].ThreadNo = i;
+ threadLife[i] = NdbThread_Create(threadLoop,
+ (void**)&pThreadData[i],
+ 32768,
+ "flexAsynchThread",
+ NDB_THREAD_PRIO_LOW);
+ }//for
+ ndbout << endl << "All NDB objects and table created" << endl << endl;
+ int noOfTransacts = tNoOfParallelTrans*tNoOfTransactions*tNoOfThreads;
+ /****************************************************************
+ * Execute program. *
+ ****************************************************************/
+
+ for (;;)
+ {
+
+ int loopCount = tLoops + 1 ;
+ ndbout << endl << "Loop # " << loopCount << endl << endl ;
+
+ /****************************************************************
+ * Perform inserts. *
+ ****************************************************************/
+
+ failed = 0 ;
+ if (tRunType == RunAll || tRunType == RunInsert)
+ {
+ ndbout << "Executing inserts" << endl;
+ START_TIMER;
+ execute(stInsert);
+ STOP_TIMER;
+ }
+ if (tRunType == RunAll)
+ {
+ a_i.addObservation((1000*noOfTransacts * tNoOfOpsPerTrans) / timer.elapsedTime());
+ PRINT_TIMER("insert", noOfTransacts, tNoOfOpsPerTrans);
+
+ if (0 < failed)
+ {
+ int i = retry_opt ;
+ int ci = 1 ;
+ while (0 < failed && 0 < i)
+ {
+ ndbout << failed << " of the transactions returned errors!"
+ << endl << endl;
+ ndbout << "Attempting to redo the failed transactions now..."
+ << endl ;
+ ndbout << "Redo attempt " << ci <<" out of " << retry_opt
+ << endl << endl;
+ failed = 0 ;
+ START_TIMER;
+ execute(stInsert);
+ STOP_TIMER;
+ PRINT_TIMER("insert", noOfTransacts, tNoOfOpsPerTrans);
+ i-- ;
+ ci++;
+ }
+ if (0 == failed )
+ {
+ ndbout << endl <<"Redo attempt succeeded" << endl << endl;
+ }
+ else
+ {
+ ndbout << endl <<"Redo attempt failed, moving on now..." << endl
+ << endl;
+ }//if
+ }//if
+ }//if
+ /****************************************************************
+ * Perform read. *
+ ****************************************************************/
+
+ failed = 0 ;
+
+ if (tRunType == RunAll || tRunType == RunRead)
+ {
+ for (int ll = 0; ll < 1 + tExtraReadLoop; ll++)
+ {
+ ndbout << "Executing reads" << endl;
+ START_TIMER;
+ execute(stRead);
+ STOP_TIMER;
+ if (tRunType == RunAll)
+ {
+ a_r.addObservation((1000 * noOfTransacts * tNoOfOpsPerTrans) / timer.elapsedTime());
+ PRINT_TIMER("read", noOfTransacts, tNoOfOpsPerTrans);
+ }//if
+ }//for
+ }//if
+
+ if (tRunType == RunAll)
+ {
+ if (0 < failed)
+ {
+ int i = retry_opt ;
+ int cr = 1;
+ while (0 < failed && 0 < i)
+ {
+ ndbout << failed << " of the transactions returned errors!"<<endl ;
+ ndbout << endl;
+ ndbout <<"Attempting to redo the failed transactions now..." << endl;
+ ndbout << endl;
+ ndbout <<"Redo attempt " << cr <<" out of ";
+ ndbout << retry_opt << endl << endl;
+ failed = 0 ;
+ START_TIMER;
+ execute(stRead);
+ STOP_TIMER;
+ PRINT_TIMER("read", noOfTransacts, tNoOfOpsPerTrans);
+ i-- ;
+ cr++ ;
+ }//while
+ if (0 == failed )
+ {
+ ndbout << endl <<"Redo attempt succeeded" << endl << endl ;
+ }
+ else
+ {
+ ndbout << endl <<"Redo attempt failed, moving on now..." << endl << endl ;
+ }//if
+ }//if
+ }//if
+
+
+ /****************************************************************
+ * Perform update. *
+ ****************************************************************/
+
+ failed = 0 ;
+
+ if (tRunType == RunAll || tRunType == RunUpdate)
+ {
+ ndbout << "Executing updates" << endl;
+ START_TIMER;
+ execute(stUpdate);
+ STOP_TIMER;
+ }//if
+ if (tRunType == RunAll)
+ {
+ a_u.addObservation((1000 * noOfTransacts * tNoOfOpsPerTrans) / timer.elapsedTime());
+ PRINT_TIMER("update", noOfTransacts, tNoOfOpsPerTrans) ;
+
+ if (0 < failed)
+ {
+ int i = retry_opt ;
+ int cu = 1 ;
+ while (0 < failed && 0 < i)
+ {
+ ndbout << failed << " of the transactions returned errors!"<<endl ;
+ ndbout << endl;
+ ndbout <<"Attempting to redo the failed transactions now..." << endl;
+ ndbout << endl <<"Redo attempt " << cu <<" out of ";
+ ndbout << retry_opt << endl << endl;
+ failed = 0 ;
+ START_TIMER;
+ execute(stUpdate);
+ STOP_TIMER;
+ PRINT_TIMER("update", noOfTransacts, tNoOfOpsPerTrans);
+ i-- ;
+ cu++ ;
+ }//while
+ if (0 == failed )
+ {
+ ndbout << endl <<"Redo attempt succeeded" << endl << endl;
+ }
+ else
+ {
+ ndbout << endl;
+ ndbout <<"Redo attempt failed, moving on now..." << endl << endl;
+ }//if
+ }//if
+ }//if
+
+ /****************************************************************
+ * Perform read. *
+ ****************************************************************/
+
+ failed = 0 ;
+
+ if (tRunType == RunAll)
+ {
+ for (int ll = 0; ll < 1 + tExtraReadLoop; ll++)
+ {
+ ndbout << "Executing reads" << endl;
+ START_TIMER;
+ execute(stRead);
+ STOP_TIMER;
+ a_r.addObservation((1000 * noOfTransacts * tNoOfOpsPerTrans) / timer.elapsedTime());
+ PRINT_TIMER("read", noOfTransacts, tNoOfOpsPerTrans);
+ }
+
+ if (0 < failed)
+ {
+ int i = retry_opt ;
+ int cr2 = 1 ;
+ while (0 < failed && 0 < i)
+ {
+ ndbout << failed << " of the transactions returned errors!"<<endl ;
+ ndbout << endl;
+ ndbout <<"Attempting to redo the failed transactions now..." << endl;
+ ndbout << endl <<"Redo attempt " << cr2 <<" out of ";
+ ndbout << retry_opt << endl << endl;
+ failed = 0 ;
+ START_TIMER;
+ execute(stRead);
+ STOP_TIMER;
+ PRINT_TIMER("read", noOfTransacts, tNoOfOpsPerTrans);
+ i-- ;
+ cr2++ ;
+ }//while
+ if (0 == failed )
+ {
+ ndbout << endl <<"Redo attempt succeeded" << endl << endl;
+ }
+ else
+ {
+ ndbout << endl;
+ ndbout << "Redo attempt failed, moving on now..." << endl << endl;
+ }//if
+ }//if
+ }//if
+
+
+ /****************************************************************
+ * Perform delete. *
+ ****************************************************************/
+
+ failed = 0 ;
+
+ if (tRunType == RunAll || tRunType == RunDelete)
+ {
+ ndbout << "Executing deletes" << endl;
+ START_TIMER;
+ execute(stDelete);
+ STOP_TIMER;
+ }//if
+ if (tRunType == RunAll)
+ {
+ a_d.addObservation((1000 * noOfTransacts * tNoOfOpsPerTrans) / timer.elapsedTime());
+ PRINT_TIMER("delete", noOfTransacts, tNoOfOpsPerTrans);
+
+ if (0 < failed) {
+ int i = retry_opt ;
+ int cd = 1 ;
+ while (0 < failed && 0 < i)
+ {
+ ndbout << failed << " of the transactions returned errors!"<< endl ;
+ ndbout << endl;
+ ndbout <<"Attempting to redo the failed transactions now:" << endl ;
+ ndbout << endl <<"Redo attempt " << cd <<" out of ";
+ ndbout << retry_opt << endl << endl;
+ failed = 0 ;
+ START_TIMER;
+ execute(stDelete);
+ STOP_TIMER;
+ PRINT_TIMER("read", noOfTransacts, tNoOfOpsPerTrans);
+ i-- ;
+ cd++ ;
+ }//while
+ if (0 == failed )
+ {
+ ndbout << endl <<"Redo attempt succeeded" << endl << endl ;
+ }
+ else
+ {
+ ndbout << endl;
+ ndbout << "Redo attempt failed, moving on now..." << endl << endl;
+ }//if
+ }//if
+ }//if
+
+ tLoops++;
+ ndbout << "--------------------------------------------------" << endl;
+
+ if (tNoOfLoops != 0)
+ {
+ if (tNoOfLoops <= tLoops)
+ break ;
+ }
+ }//for
+
+ execute(stStop);
+ void * tmp;
+ for (Uint32 i = 0; i < tNoOfThreads; i++)
+ {
+ NdbThread_WaitFor(threadLife[i], &tmp);
+ NdbThread_Destroy(&threadLife[i]);
+ }
+}
template class Vector<NdbDictionary::RecordSpecification>;
No bundle (reason: useless for push emails).
| Thread |
|---|
| • bzr push into mysql-5.5-cluster-7.2 branch (mauritz.sundell:3935 to 3936) | Mauritz Sundell | 11 Jun |