List:Commits« Previous MessageNext Message »
From:Mauritz Sundell Date:June 11 2012 2:58pm
Subject:bzr push into mysql-5.5-cluster-7.2 branch (mauritz.sundell:3935 to 3936)
View as plain text  
 3936 Mauritz Sundell	2012-06-11 [merge]
      Merge 7.1->7.2

    modified:
      storage/ndb/clusterj/clusterj-test/src/main/java/testsuite/clusterj/QueryOrderingTest.java
      storage/ndb/clusterj/clusterj-tie/src/main/java/com/mysql/clusterj/tie/NdbRecordIndexScanOperationImpl.java
      storage/ndb/clusterj/clusterj-tie/src/main/java/com/mysql/clusterj/tie/NdbRecordScanOperationImpl.java
      storage/ndb/clusterj/clusterj-unit/src/main/java/junit/framework/TestCase.java
      storage/ndb/include/ndb_config.h.in
      storage/ndb/include/ndbapi/Ndb.hpp
      storage/ndb/include/portlib/NdbSleep.h
      storage/ndb/include/transporter/TransporterRegistry.hpp
      storage/ndb/ndb_configure.cmake
      storage/ndb/src/common/transporter/TCP_Transporter.cpp
      storage/ndb/src/common/transporter/Transporter.cpp
      storage/ndb/src/common/transporter/Transporter.hpp
      storage/ndb/src/common/util/HashMap2.cpp
      storage/ndb/src/kernel/blocks/dblqh/DblqhMain.cpp
      storage/ndb/src/ndbapi/Ndb.cpp
      storage/ndb/src/ndbapi/Ndbinit.cpp
      storage/ndb/src/ndbapi/Ndblist.cpp
      storage/ndb/test/ndbapi/flexAsynch.cpp
 3935 Martin Skold	2012-06-08
      Merge missmatch in result file, regenerated

    modified:
      mysql-test/suite/rpl/r/rpl_stm_relay_ign_space.result
=== modified file 'storage/ndb/clusterj/clusterj-test/src/main/java/testsuite/clusterj/QueryOrderingTest.java'
--- a/storage/ndb/clusterj/clusterj-test/src/main/java/testsuite/clusterj/QueryOrderingTest.java	2012-05-31 00:47:21 +0000
+++ b/storage/ndb/clusterj/clusterj-test/src/main/java/testsuite/clusterj/QueryOrderingTest.java	2012-06-10 20:51:00 +0000
@@ -128,38 +128,42 @@ create table longintstringix (
     }
 
     public void testNoWhereAscending() {
-        System.out.println("QueryOrderingTest.testNoWhereAscending");
+        logger.info("QueryOrderingTest.testNoWhereAscending");
         setOrdering(Ordering.ASCENDING, "id");
         noWhereQuery("id", "PRIMARY", null, 0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19, 20, 21, 22, 23, 24);
         failOnError();
     }
 
     public void testNoWhereDescending() {
-        System.out.println("QueryOrderingTest.testNoWhereDescending");
+        logger.info("QueryOrderingTest.testNoWhereDescending");
         setOrdering(Ordering.DESCENDING, "id");
         noWhereQuery("id", "PRIMARY", null, 24, 23, 22, 21, 20, 19, 18, 17, 16, 15, 14, 13, 12, 11, 10, 9, 8, 7, 6, 5, 4, 3, 2, 1, 0);
         failOnError();
     }
 
     public void testPrimaryEqualAscending() {
+        logger.info("QueryOrderingTest.testPrimaryEqualAscending");
         setOrdering(Ordering.ASCENDING, "longix", "intix", "stringix");
         equalQuery("id", "PRIMARY", 1, 1);
         failOnError();
     }
 
     public void testGreaterEqualAscending() {
+        logger.info("QueryOrderingTest.testGreaterEqualAscending");
         setOrdering(Ordering.ASCENDING, "longix", "intix", "stringix");
         greaterEqualQuery("longix", "idx_long_int_string", 2000000000000000L, 18, 19, 20, 21, 22, 23, 24);
         failOnError();
     }
 
     public void testGreaterEqualAscendingPartial() {
+        logger.info("QueryOrderingTest.testGreaterEqualAscendingPartial");
         setOrdering(Ordering.ASCENDING, "longix", "intix");
         greaterEqualQuery("longix", "idx_long_int_string", 2000000000000000L, 18, 19, 20, 21, 22, 23, 24);
         failOnError();
     }
 
     public void testInAndBetweenAscending() {
+        logger.info("QueryOrderingTest.testInAndBetweenAscending");
         setOrdering(Ordering.ASCENDING, "longix", "intix");
         inAndBetweenQuery("longix", new Object[] {1000000000000000L, 0L}, "intix", 1, 2, "idx_long_int_string", 12, 13, 14, 15, 16, 17, 3, 4, 5, 6, 7, 8);
         inAndBetweenQuery("longix", Arrays.asList(new Object[] {1000000000000000L, 0L}), "stringix", "1", "4", "idx_long_int_string", 10, 11, 13, 14, 16, 17, 1, 2, 4, 5, 7, 8);
@@ -167,6 +171,7 @@ create table longintstringix (
     }
 
     public void testInAndBetweenDescending() {
+        logger.info("QueryOrderingTest.testInAndBetweenDescending");
         setOrdering(Ordering.DESCENDING, "longix", "intix", "stringix");
         inAndBetweenQuery("longix", new Object[] {1000000000000000L, 0L}, "intix", 1, 2, "idx_long_int_string", 17, 16, 15, 14, 13, 12, 8, 7, 6, 5, 4, 3);
         inAndBetweenQuery("longix", Arrays.asList(new Object[] {1000000000000000L, 0L}), "stringix", "1", "4", "idx_long_int_string", 17, 16, 14, 13, 11, 10, 8, 7, 5, 4, 2, 1);
@@ -174,6 +179,7 @@ create table longintstringix (
     }
 
     public void testBetweenAndInAscending() {
+        logger.info("QueryOrderingTest.testBetweenAndInAscending");
         setOrdering(Ordering.ASCENDING, "longix", "intix");
         betweenAndInQuery("longix", 0L, 1000000000000000L, "intix", new Object[] {2, 0}, "idx_long_int_string", 0, 1, 2, 6, 7, 8, 9, 10, 11, 15, 16, 17);
         betweenAndInQuery("longix", 1000000000000000L, 2000000000000000L, "intix", Arrays.asList(new Object[] {2, 1}), "idx_long_int_string", 12, 13, 14, 15, 16, 17, 21, 22, 23, 24);
@@ -181,6 +187,7 @@ create table longintstringix (
     }
 
     public void testBetweenAndInDescending() {
+        logger.info("QueryOrderingTest.testBetweenAndInDescending");
         setOrdering(Ordering.DESCENDING, "longix", "intix", "stringix");
         betweenAndInQuery("longix", 0L, 1000000000000000L, "intix", new Object[] {2, 0}, "idx_long_int_string", 17, 16, 15, 11, 10, 9, 8, 7, 6, 2, 1, 0);
         betweenAndInQuery("longix", 1000000000000000L, 2000000000000000L, "intix", Arrays.asList(new Object[] {2, 1}), "idx_long_int_string", 24, 23, 22, 21, 17, 16, 15, 14, 13, 12);

=== modified file 'storage/ndb/clusterj/clusterj-tie/src/main/java/com/mysql/clusterj/tie/NdbRecordIndexScanOperationImpl.java'
--- a/storage/ndb/clusterj/clusterj-tie/src/main/java/com/mysql/clusterj/tie/NdbRecordIndexScanOperationImpl.java	2012-05-31 00:47:21 +0000
+++ b/storage/ndb/clusterj/clusterj-tie/src/main/java/com/mysql/clusterj/tie/NdbRecordIndexScanOperationImpl.java	2012-06-10 20:51:00 +0000
@@ -96,6 +96,8 @@ public class NdbRecordIndexScanOperation
     /** The buffers used for bounds; held here to prevent garbage collection */
     List<ByteBuffer> buffers = new ArrayList<ByteBuffer>();
 
+    private Index index;
+
     public NdbRecordIndexScanOperationImpl(ClusterTransactionImpl clusterTransaction,
             Index storeIndex, Table storeTable, int lockMode) {
         this(clusterTransaction, storeIndex, storeTable, false, lockMode);
@@ -104,6 +106,7 @@ public class NdbRecordIndexScanOperation
     public NdbRecordIndexScanOperationImpl(ClusterTransactionImpl clusterTransaction,
                 Index storeIndex, Table storeTable, boolean multiRange, int lockMode) {
         super(clusterTransaction, storeTable, lockMode);
+        this.index = storeIndex;
         this.multiRange = multiRange;
         if (this.multiRange) {
             ndbIndexBoundList = new ArrayList<NdbIndexScanOperation.IndexBound>();
@@ -121,7 +124,7 @@ public class NdbRecordIndexScanOperation
     public void endDefinition() {
         // get the scan options which also sets the filter
         getScanOptions();
-        if (logger.isDetailEnabled()) logger.detail("scan options present " + dumpScanOptions(scanOptions.optionsPresent(), scanOptions.scan_flags()));
+        if (logger.isDetailEnabled()) logger.detail("scan index '" + index.getName() + "' with options " + dumpScanOptions(scanOptions.optionsPresent(), scanOptions.scan_flags()));
 
         // create the scan operation
         ndbIndexScanOperation = clusterTransaction.scanIndex(

=== modified file 'storage/ndb/clusterj/clusterj-tie/src/main/java/com/mysql/clusterj/tie/NdbRecordScanOperationImpl.java'
--- a/storage/ndb/clusterj/clusterj-tie/src/main/java/com/mysql/clusterj/tie/NdbRecordScanOperationImpl.java	2012-05-31 20:42:32 +0000
+++ b/storage/ndb/clusterj/clusterj-tie/src/main/java/com/mysql/clusterj/tie/NdbRecordScanOperationImpl.java	2012-06-10 21:08:50 +0000
@@ -138,10 +138,11 @@ public abstract class NdbRecordScanOpera
                 options |= Type.SO_SCANFLAGS;
                 switch (ordering) {
                     case ASCENDING:
-                        flags = ScanFlag.SF_OrderBy;
+                        flags |= ScanFlag.SF_OrderBy;
                         break;
                     case DESCENDING:
-                        flags = ScanFlag.SF_Descending;
+                        flags |= ScanFlag.SF_Descending;
+                        flags |= ScanFlag.SF_OrderBy;
                         break;
                     default:
                         throw new ClusterJFatalInternalException(local.message("ERR_Invalid_Ordering", ordering));
@@ -150,6 +151,7 @@ public abstract class NdbRecordScanOpera
             if (multiRange) {
                 options |= Type.SO_SCANFLAGS;
                 flags |= ScanFlag.SF_MultiRange;
+                flags |= ScanFlag.SF_ReadRangeNo;
             }
             if (lockMode != com.mysql.ndbjtie.ndbapi.NdbOperationConst.LockMode.LM_CommittedRead) {
                 options |= Type.SO_SCANFLAGS;

=== modified file 'storage/ndb/clusterj/clusterj-unit/src/main/java/junit/framework/TestCase.java'
--- a/storage/ndb/clusterj/clusterj-unit/src/main/java/junit/framework/TestCase.java	2012-04-14 21:37:35 +0000
+++ b/storage/ndb/clusterj/clusterj-unit/src/main/java/junit/framework/TestCase.java	2012-06-10 20:51:00 +0000
@@ -46,7 +46,6 @@ public abstract class TestCase implement
      * to result.errors.
      */
     public void run(TestResult result) {
-//        System.out.println("--> TestCase.run(TestResult): " + name);
         TestListener listener = result.listener;
         listener.startTest(this);
         try {
@@ -89,4 +88,7 @@ public abstract class TestCase implement
         return 0;
     }
 
+    public String toString() {
+        return method.getDeclaringClass().getPackage().getName() + "." + name;
+    }
 }

=== modified file 'storage/ndb/include/ndb_config.h.in'
--- a/storage/ndb/include/ndb_config.h.in	2012-02-07 15:41:33 +0000
+++ b/storage/ndb/include/ndb_config.h.in	2012-06-11 13:20:48 +0000
@@ -17,6 +17,7 @@
 
 #cmakedefine HAVE_POSIX_MEMALIGN 1
 #cmakedefine HAVE_CLOCK_GETTIME 1
+#cmakedefine HAVE_NANOSLEEP 1
 #cmakedefine HAVE_PTHREAD_CONDATTR_SETCLOCK 1
 #cmakedefine HAVE_PTHREAD_SELF 1
 #cmakedefine HAVE_SCHED_GET_PRIORITY_MIN 1

=== modified file 'storage/ndb/include/ndbapi/Ndb.hpp'
--- a/storage/ndb/include/ndbapi/Ndb.hpp	2012-01-20 06:22:16 +0000
+++ b/storage/ndb/include/ndbapi/Ndb.hpp	2012-06-11 14:57:25 +0000
@@ -1935,6 +1935,12 @@ private:
    *   Returns NULL if none found
    */
   NdbTransaction* getConnectedNdbTransaction(Uint32 nodeId, Uint32 instance);
+  /**
+   * Handle Connection Array lists
+   */
+  void appendConnectionArray(NdbTransaction *aCon, Uint32 nodeId);
+  void prependConnectionArray(NdbTransaction *aCon, Uint32 nodeId);
+  void removeConnectionArray(NdbTransaction *first, Uint32 nodeId);
 
   // Release and disconnect from DBTC a connection
   // and seize it to theConIdleList
@@ -2020,6 +2026,7 @@ private:
 
   NdbTransaction*	theTransactionList;
   NdbTransaction**      theConnectionArray;
+  NdbTransaction**      theConnectionArrayLast;
 
   Uint32   theMyRef;        // My block reference  
   Uint32   theNode;         // The node number of our node

=== modified file 'storage/ndb/include/portlib/NdbSleep.h'
--- a/storage/ndb/include/portlib/NdbSleep.h	2011-06-30 15:59:25 +0000
+++ b/storage/ndb/include/portlib/NdbSleep.h	2012-06-11 13:20:48 +0000
@@ -20,6 +20,54 @@
 
 #include <ndb_global.h>
 
+static inline void NdbSleep_MilliSleep(int milliseconds);
+
+static inline
+void NdbSleep_MicroSleep(int microseconds)
+{
+  assert(0 < microseconds);
+#ifdef _WIN32
+  // Waitable timer use 100ns time unit, negative for relative time periods
+  LARGE_INTEGER liDueTime;
+  liDueTime.QuadPart = -10LL * microseconds;
+
+  HANDLE hTimer = CreateWaitableTimer(NULL, TRUE, NULL);
+  if (NULL == hTimer ||
+      !SetWaitableTimer(hTimer, &liDueTime, 0, NULL, NULL, 0) ||
+      WaitForSingleObject(hTimer, INFINITE) != WAIT_OBJECT_0)
+  {
+#ifndef NDEBUG
+    // Error code for crash analysis
+    DWORD winerr = GetLastError();
+#endif
+    assert(false);
+    // Fallback to millisleep in release
+    NdbSleep_MilliSleep(1 + (microseconds - 1) / 1000);
+  }
+  if (NULL != hTimer)
+  {
+    CloseHandle(hTimer);
+  }
+#elif defined(HAVE_NANOSLEEP)
+  struct timespec t;
+  t.tv_sec = microseconds / 1000000;
+  t.tv_nsec = 1000 * (microseconds % 1000000);
+  while (nanosleep(&t, &t) == -1)
+  {
+    if (errno != EINTR)
+    {
+      assert(false);
+      // Fallback to millisleep in release
+      NdbSleep_MilliSleep(1 + (microseconds - 1) / 1000);
+      return ;
+    }
+  }
+#else
+  // Fallback to millisleep
+  NdbSleep_MilliSleep(1 + (microseconds - 1) / 1000);
+#endif
+}
+
 static inline
 void NdbSleep_MilliSleep(int milliseconds)
 {

=== modified file 'storage/ndb/include/transporter/TransporterRegistry.hpp'
--- a/storage/ndb/include/transporter/TransporterRegistry.hpp	2012-01-30 15:12:41 +0000
+++ b/storage/ndb/include/transporter/TransporterRegistry.hpp	2012-06-11 14:57:25 +0000
@@ -294,6 +294,13 @@ public:
   const NodeBitmask& get_status_overloaded() const;
   
   /**
+   * Set or clear slowdown bit.
+   * Query if any slowdown bit is set.
+   */
+  void set_status_slowdown(Uint32 nodeId, bool val);
+  const NodeBitmask& get_status_slowdown() const;
+ 
+  /**
    * prepareSend
    *
    * When IOState is HaltOutput or HaltIO do not send or insert any 
@@ -433,8 +440,10 @@ private:
 
   /**
    * Overloaded bits, for fast check.
+   * Similarly slowdown bits for fast check.
    */
   NodeBitmask m_status_overloaded;
+  NodeBitmask m_status_slowdown;
 
   /**
    * Unpack signal data.
@@ -593,6 +602,8 @@ TransporterRegistry::set_status_overload
   assert(nodeId < MAX_NODES);
   if (val != m_status_overloaded.get(nodeId))
     m_status_overloaded.set(nodeId, val);
+  if (val)
+    set_status_slowdown(nodeId, val);
 }
 
 inline const NodeBitmask&
@@ -601,4 +612,18 @@ TransporterRegistry::get_status_overload
   return m_status_overloaded;
 }
 
+inline void
+TransporterRegistry::set_status_slowdown(Uint32 nodeId, bool val)
+{
+  assert(nodeId < MAX_NODES);
+  if (val != m_status_slowdown.get(nodeId))
+    m_status_slowdown.set(nodeId, val);
+}
+
+inline const NodeBitmask&
+TransporterRegistry::get_status_slowdown() const
+{
+  return m_status_slowdown;
+}
+
 #endif // Define of TransporterRegistry_H

=== modified file 'storage/ndb/ndb_configure.cmake'
--- a/storage/ndb/ndb_configure.cmake	2012-02-07 19:40:05 +0000
+++ b/storage/ndb/ndb_configure.cmake	2012-06-11 14:57:25 +0000
@@ -47,6 +47,7 @@ INCLUDE(ndb_require_variable)
 
 CHECK_FUNCTION_EXISTS(posix_memalign HAVE_POSIX_MEMALIGN)
 CHECK_FUNCTION_EXISTS(clock_gettime HAVE_CLOCK_GETTIME)
+CHECK_FUNCTION_EXISTS(nanosleep HAVE_NANOSLEEP)
 CHECK_FUNCTION_EXISTS(pthread_condattr_setclock HAVE_PTHREAD_CONDATTR_SETCLOCK)
 CHECK_FUNCTION_EXISTS(pthread_self HAVE_PTHREAD_SELF)
 CHECK_FUNCTION_EXISTS(sched_get_priority_min HAVE_SCHED_GET_PRIORITY_MIN)

=== modified file 'storage/ndb/src/common/transporter/TCP_Transporter.cpp'
--- a/storage/ndb/src/common/transporter/TCP_Transporter.cpp	2012-01-17 08:33:59 +0000
+++ b/storage/ndb/src/common/transporter/TCP_Transporter.cpp	2012-06-11 14:57:25 +0000
@@ -112,6 +112,10 @@ TCP_Transporter::TCP_Transporter(Transpo
   setIf(sockOptTcpMaxSeg, conf->tcp.tcpMaxsegSize, 0);
 
   m_overload_limit = overload_limit(conf);
+  /**
+   * Always set slowdown limit to 60% of overload limit
+   */
+  m_slowdown_limit = m_overload_limit * 6 / 10;
 }
 
 

=== modified file 'storage/ndb/src/common/transporter/Transporter.cpp'
--- a/storage/ndb/src/common/transporter/Transporter.cpp	2012-05-07 07:51:09 +0000
+++ b/storage/ndb/src/common/transporter/Transporter.cpp	2012-06-11 14:57:25 +0000
@@ -43,7 +43,8 @@ Transporter::Transporter(TransporterRegi
   : m_s_port(s_port), remoteNodeId(rNodeId), localNodeId(lNodeId),
     isServer(lNodeId==serverNodeId),
     m_packer(_signalId, _checksum), m_max_send_buffer(max_send_buffer),
-    m_overload_limit(0xFFFFFFFF), isMgmConnection(_isMgmConnection),
+    m_overload_limit(0xFFFFFFFF), m_slowdown_limit(0xFFFFFFFF),
+    isMgmConnection(_isMgmConnection),
     m_connected(false),
     m_type(_type),
     m_transporter_registry(t_reg)

=== modified file 'storage/ndb/src/common/transporter/Transporter.hpp'
--- a/storage/ndb/src/common/transporter/Transporter.hpp	2012-05-07 07:51:09 +0000
+++ b/storage/ndb/src/common/transporter/Transporter.hpp	2012-06-11 14:57:25 +0000
@@ -90,6 +90,8 @@ public:
   {
     m_transporter_registry.set_status_overloaded(remoteNodeId,
                                                  used >= m_overload_limit);
+    m_transporter_registry.set_status_slowdown(remoteNodeId,
+                                               used >= m_slowdown_limit);
   }
 
   virtual int doSend() = 0;
@@ -155,6 +157,7 @@ protected:
   Uint32 m_max_send_buffer;
   /* Overload limit, as configured with the OverloadLimit config parameter. */
   Uint32 m_overload_limit;
+  Uint32 m_slowdown_limit;
 
 private:
 

=== modified file 'storage/ndb/src/common/util/HashMap2.cpp'
--- a/storage/ndb/src/common/util/HashMap2.cpp	2011-09-07 22:50:01 +0000
+++ b/storage/ndb/src/common/util/HashMap2.cpp	2012-06-07 14:06:59 +0000
@@ -157,14 +157,14 @@ TAPTEST(HashMap2)
     /* Test iterator Api */
     HashMap2<IntIntKVPod, true, TestHeapAllocator, IntIntKVStaticMethods>::Iterator it(hash1);
 
-    IntIntKVPod* j;
+    IntIntKVPod* k;
     for (int i=0; i < 2; i++)
     {
       int count = 0;
-      while((j = it.next()))
+      while((k = it.next()))
       {
-        OK( j->b == ((j->a * 3) - i) );
-        j->b--;
+        OK( k->b == ((k->a * 3) - i) );
+        k->b--;
         count++;
       }
       OK( count == 100 );

=== modified file 'storage/ndb/src/kernel/blocks/dblqh/DblqhMain.cpp'
--- a/storage/ndb/src/kernel/blocks/dblqh/DblqhMain.cpp	2012-05-31 15:07:44 +0000
+++ b/storage/ndb/src/kernel/blocks/dblqh/DblqhMain.cpp	2012-06-11 14:57:25 +0000
@@ -11361,13 +11361,13 @@ void Dblqh::scanTupkeyConfLab(Signal* si
   scanptr.p->m_curr_batch_size_rows = rows + 1;
   scanptr.p->m_last_row = tdata5;
 
-  const NodeBitmask& all = globalTransporterRegistry.get_status_overloaded();
+  const NodeBitmask& all = globalTransporterRegistry.get_status_slowdown();
   if (unlikely(!all.isclear()))
   {
     if (all.get(refToNode(scanptr.p->scanApiBlockref)))
     {
       /**
-       * End scan batch if transporter-buffer are overloaded
+       * End scan batch if transporter-buffer are in slowdown state
        *
        * TODO: We should have counters for this...
        */

=== modified file 'storage/ndb/src/ndbapi/Ndb.cpp'
--- a/storage/ndb/src/ndbapi/Ndb.cpp	2011-10-17 18:13:57 +0000
+++ b/storage/ndb/src/ndbapi/Ndb.cpp	2012-06-11 14:57:25 +0000
@@ -162,6 +162,8 @@ Ndb::NDB_connect(Uint32 tNode, Uint32 in
         if (prev != 0)
         {
           prev->theNext = curr->theNext;
+          if (!curr->theNext)
+            theConnectionArrayLast[tNode] = prev;
           curr->theNext = tConArray;
           theConnectionArray[tNode] = curr;
         }
@@ -210,12 +212,10 @@ Ndb::NDB_connect(Uint32 tNode, Uint32 in
     //************************************************
     // Send and receive was successful
     //************************************************
-    NdbTransaction* tPrevFirst = theConnectionArray[tNode];
     tNdbCon->setConnectedNodeId(tNode, nodeSequence);
     
     tNdbCon->setMyBlockReference(theMyRef);
-    theConnectionArray[tNode] = tNdbCon;
-    tNdbCon->theNext = tPrevFirst;
+    prependConnectionArray(tNdbCon, tNode);
     DBUG_RETURN(1);
   } else {
 //****************************************************************************
@@ -261,6 +261,8 @@ Ndb::getConnectedNdbTransaction(Uint32 n
         {
           assert(false); // Should have been moved in NDB_connect
           prev->theNext = next->theNext;
+          if (!next->theNext)
+            theConnectionArrayLast[nodeId] = prev;
           goto found_middle;
         }
         else
@@ -276,7 +278,7 @@ Ndb::getConnectedNdbTransaction(Uint32 n
     return 0;
   }
 found_first:
-  theConnectionArray[nodeId] = next->theNext;
+  removeConnectionArray(next, nodeId);
 found_middle:
   next->theNext = NULL;
 
@@ -944,6 +946,48 @@ Ndb::startTransactionLocal(Uint32 aPrior
   DBUG_RETURN(tConnection);
 }//Ndb::startTransactionLocal()
 
+void
+Ndb::appendConnectionArray(NdbTransaction *aCon, Uint32 nodeId)
+{
+  NdbTransaction *last = theConnectionArrayLast[nodeId];
+  if (last)
+  {
+    last->theNext = aCon;
+  }
+  else
+  {
+    theConnectionArray[nodeId] = aCon;
+  }
+  aCon->theNext = NULL;
+  theConnectionArrayLast[nodeId] = aCon;
+}
+
+void
+Ndb::prependConnectionArray(NdbTransaction *aCon, Uint32 nodeId)
+{
+  NdbTransaction *first = theConnectionArray[nodeId];
+  aCon->theNext = first;
+  if (!first)
+  {
+    theConnectionArrayLast[nodeId] = aCon;
+  }
+  theConnectionArray[nodeId] = aCon;
+}
+
+void
+Ndb::removeConnectionArray(NdbTransaction *first, Uint32 nodeId)
+{
+  NdbTransaction *next = first->theNext;
+  if (!next)
+  {
+    theConnectionArray[nodeId] = theConnectionArrayLast[nodeId] = NULL;
+  }
+  else
+  {
+    theConnectionArray[nodeId] = next;
+  }
+}
+
 /*****************************************************************************
 void closeTransaction(NdbTransaction* aConnection);
 
@@ -1044,8 +1088,8 @@ Ndb::closeTransaction(NdbTransaction* aC
     /**
      * Put it back in idle list for that node
      */
-    aConnection->theNext = theConnectionArray[nodeId];
-    theConnectionArray[nodeId] = aConnection;
+    appendConnectionArray(aConnection, nodeId);
+
     DBUG_VOID_RETURN;
   } else {
     aConnection->theReleaseOnClose = false;

=== modified file 'storage/ndb/src/ndbapi/Ndbinit.cpp'
--- a/storage/ndb/src/ndbapi/Ndbinit.cpp	2011-10-17 18:13:57 +0000
+++ b/storage/ndb/src/ndbapi/Ndbinit.cpp	2012-06-11 14:57:25 +0000
@@ -61,6 +61,7 @@ void Ndb::setup(Ndb_cluster_connection *
   theMinNoOfEventsToWakeUp= 0;
   theTransactionList= NULL;
   theConnectionArray= NULL;
+  theConnectionArrayLast= NULL;
   the_last_check_time= 0;
   theFirstTransId= 0;
   theRestartGCI= 0;
@@ -83,13 +84,15 @@ void Ndb::setup(Ndb_cluster_connection *
   theError.code = 0;
 
   theConnectionArray = new NdbConnection * [MAX_NDB_NODES];
+  theConnectionArrayLast = new NdbConnection * [MAX_NDB_NODES];
   theCommitAckSignal = NULL;
   theCachedMinDbNodeVersion = 0;
   
   int i;
   for (i = 0; i < MAX_NDB_NODES ; i++) {
     theConnectionArray[i] = NULL;
-  }//forg
+    theConnectionArrayLast[i] = NULL;
+  }//for
   m_sys_tab_0 = NULL;
 
   theImpl->m_dbname.assign(aDataBase);
@@ -158,6 +161,7 @@ Ndb::~Ndb()
   releaseTransactionArrays();
 
   delete []theConnectionArray;
+  delete []theConnectionArrayLast;
   if(theCommitAckSignal != NULL){
     delete theCommitAckSignal; 
     theCommitAckSignal = NULL;

=== modified file 'storage/ndb/src/ndbapi/Ndblist.cpp'
--- a/storage/ndb/src/ndbapi/Ndblist.cpp	2011-07-05 12:46:07 +0000
+++ b/storage/ndb/src/ndbapi/Ndblist.cpp	2012-06-11 14:57:25 +0000
@@ -44,6 +44,7 @@ Ndb::checkFailedNode()
        */
       NdbTransaction * tNdbCon = theConnectionArray[node_id];
       theConnectionArray[node_id] = NULL;
+      theConnectionArrayLast[node_id] = NULL;
       while (tNdbCon != NULL) {
         NdbTransaction* tempNdbCon = tNdbCon;
         tNdbCon = tNdbCon->next();

=== modified file 'storage/ndb/test/ndbapi/flexAsynch.cpp'
--- a/storage/ndb/test/ndbapi/flexAsynch.cpp	2012-02-14 08:16:48 +0000
+++ b/storage/ndb/test/ndbapi/flexAsynch.cpp	2012-06-11 13:30:32 +0000
@@ -1,5 +1,5 @@
 /*
-   Copyright (c) 2003, 2011, Oracle and/or its affiliates. All rights reserved.
+   Copyright (c) 2003, 2011, 2012, Oracle and/or its affiliates. All rights reserved.
 
    This program is free software; you can redistribute it and/or modify
    it under the terms of the GNU General Public License as published by
@@ -24,6 +24,8 @@
 #include <md5_hash.hpp>
 
 #include <NdbThread.h>
+#include <NdbMutex.h>
+#include <NdbCondition.h>
 #include <NdbSleep.h>
 #include <NdbTick.h>
 #include <NdbOut.hpp>
@@ -39,6 +41,9 @@
 #define MAXATTR 511 
 #define MAXTABLES 1
 #define NDB_MAXTHREADS 128
+#define MAX_EXECUTOR_THREADS 128
+#define MAX_DEFINER_THREADS 32
+#define MAX_REAL_THREADS 160
 #define NDB_MAX_NODES 48
 /*
   NDB_MAXTHREADS used to be just MAXTHREADS, which collides with a
@@ -52,22 +57,22 @@
 #define PKSIZE 2
 
 enum StartType { 
-  stIdle, 
-  stInsert,
-  stRead,
-  stUpdate,
-  stDelete, 
-  stStop 
+  stIdle = 0,
+  stInsert = 1,
+  stRead = 2,
+  stUpdate = 3,
+  stDelete = 4,
+  stStop = 5
 } ;
 
 enum RunType {
-  RunInsert,
-  RunRead,
-  RunUpdate,
-  RunDelete,
-  RunCreateTable,
-  RunDropTable,
-  RunAll
+  RunInsert = 1,
+  RunRead = 2,
+  RunUpdate = 3,
+  RunDelete = 4,
+  RunCreateTable = 5,
+  RunDropTable = 6,
+  RunAll = 7
 };
 
 struct ThreadNdb
@@ -85,7 +90,7 @@ static void dropTables(Ndb* pMyNdb);
 static int createTables(Ndb*);
 static void defineOperation(NdbConnection* aTransObject, StartType aType, 
                             Uint32 base, Uint32 aIndex);
-static void defineNdbRecordOperation(ThreadNdb*, NdbConnection* aTransObject, StartType aType, 
+static void defineNdbRecordOperation(char*, NdbConnection* aTransObject, StartType aType,
                             Uint32 base, Uint32 aIndex);
 static void execute(StartType aType);
 static bool executeThread(ThreadNdb*, StartType aType, Ndb* aNdbObject, unsigned int);
@@ -97,17 +102,20 @@ static bool error_handler(const NdbError
 static void input_error();
 static Uint32 get_my_node_id(Uint32 tableNo, Uint32 threadNo);
 
+static void main_thread(RunType run_type, NdbTimer & timer);
+static Uint64 get_total_transactions();
+static void run_old_flexAsynch(ThreadNdb *pThreadData, NdbTimer & timer);
 
 static int                              retry_opt = 3 ;
 static int                              failed = 0 ;
                 
 ErrorData * flexAsynchErrorData;                        
 
-static NdbThread*               threadLife[NDB_MAXTHREADS];
+static NdbThread*                       threadLife[MAX_REAL_THREADS];
 static int                              tNodeId;
-static int                              ThreadReady[NDB_MAXTHREADS];
-static longlong                 ThreadExecutions[NDB_MAXTHREADS];
-static StartType                ThreadStart[NDB_MAXTHREADS];
+static int                              ThreadReady[MAX_REAL_THREADS];
+static longlong                         ThreadExecutions[MAX_REAL_THREADS];
+static StartType                        ThreadStart[NDB_MAXTHREADS];
 static char                             tableName[MAXTABLES][MAXSTRLEN+1];
 static const NdbDictionary::Table *     tables[MAXTABLES];
 static char                             attrName[MAXATTR][MAXSTRLEN+1];
@@ -136,6 +144,8 @@ static unsigned int             tLoadFac
 static bool                     tempTable = false;
 static bool                     startTransGuess = true;
 static int                      tExtraReadLoop = 0;
+static bool                     tNew = false;
+static bool                     tImmediate = false;
 
 //Program Flags
 static int                              theTestFlag = 0;
@@ -177,13 +187,13 @@ resetThreads(){
 }
 
 static void 
-waitForThreads(void)
+waitForThreads(Uint32 num_threads_to_wait_for)
 {
   int cont = 0;
   do {
     cont = 0;
     NdbSleep_MilliSleep(20);
-    for (unsigned i = 0; i < tNoOfThreads ; i++) {
+    for (unsigned i = 0; i < num_threads_to_wait_for ; i++) {
       if (ThreadReady[i] == 0) {
         cont = 1;
       }//if
@@ -204,9 +214,8 @@ NDB_COMMAND(flexAsynch, "flexAsynch", "f
 {
   ndb_init();
   ThreadNdb*            pThreadData;
-  int                   tLoops=0;
-  int                   returnValue = NDBT_OK;
   DEFINE_TIMER;
+  int                   returnValue = NDBT_OK;
 
   flexAsynchErrorData = new ErrorData;
   flexAsynchErrorData->resetErrorCounters();
@@ -256,7 +265,7 @@ NDB_COMMAND(flexAsynch, "flexAsynch", "f
 
   ndbout << endl;
 
-  NdbThread_SetConcurrencyLevel(2 + tNoOfThreads);
+  NdbThread_SetConcurrencyLevel(2 + (tNoOfThreads * 5 / 4));
 
   /* print Setting */
   flexAsynchErrorData->printSettings(ndbout);
@@ -311,7 +320,7 @@ NDB_COMMAND(flexAsynch, "flexAsynch", "f
     }
   }
 
-  if (tNdbRecord)
+  if (tNdbRecord && !tNew)
   {
     Uint32 sz = NdbDictionary::getRecordRowLength(g_record[0]);
     sz += 3;
@@ -325,260 +334,13 @@ NDB_COMMAND(flexAsynch, "flexAsynch", "f
   if(returnValue == NDBT_OK &&
      tRunType != RunCreateTable &&
      tRunType != RunDropTable){
-    /****************************************************************
-     *  Create NDB objects.                                   *
-     ****************************************************************/
-    resetThreads();
-    for (Uint32 i = 0; i < tNoOfThreads ; i++) {
-      pThreadData[i].ThreadNo = i;
-      threadLife[i] = NdbThread_Create(threadLoop,
-                                       (void**)&pThreadData[i],
-                                       32768,
-                                       "flexAsynchThread",
-                                       NDB_THREAD_PRIO_LOW);
-    }//for
-    ndbout << endl <<  "All NDB objects and table created" << endl << endl;
-    int noOfTransacts = tNoOfParallelTrans*tNoOfTransactions*tNoOfThreads;
-    /****************************************************************
-     * Execute program.                                             *
-     ****************************************************************/
-
-    for(;;) {
-
-      int loopCount = tLoops + 1 ;
-      ndbout << endl << "Loop # " << loopCount  << endl << endl ;
-
-      /****************************************************************
-       * Perform inserts.                                             *
-       ****************************************************************/
-          
-      failed = 0 ;
-      if (tRunType == RunAll || tRunType == RunInsert){
-        ndbout << "Executing inserts" << endl;
-        START_TIMER;
-        execute(stInsert);
-        STOP_TIMER;
-      }
-      if (tRunType == RunAll){
-        a_i.addObservation((1000*noOfTransacts * tNoOfOpsPerTrans) / timer.elapsedTime());
-        PRINT_TIMER("insert", noOfTransacts, tNoOfOpsPerTrans);
-
-        if (0 < failed) {
-          int i = retry_opt ;
-          int ci = 1 ;
-          while (0 < failed && 0 < i){
-            ndbout << failed << " of the transactions returned errors!" 
-                   << endl << endl;
-            ndbout << "Attempting to redo the failed transactions now..." 
-                   << endl ;
-            ndbout << "Redo attempt " << ci <<" out of " << retry_opt 
-                   << endl << endl;
-            failed = 0 ;
-            START_TIMER;
-            execute(stInsert);
-            STOP_TIMER;
-            PRINT_TIMER("insert", noOfTransacts, tNoOfOpsPerTrans);
-            i-- ;
-            ci++;
-          }
-          if(0 == failed ){
-            ndbout << endl <<"Redo attempt succeeded" << endl << endl;
-          }else{
-            ndbout << endl <<"Redo attempt failed, moving on now..." << endl 
-                   << endl;
-          }//if
-        }//if
-      }//if  
-      /****************************************************************
-       * Perform read.                                                *
-       ****************************************************************/
-      
-      failed = 0 ;
-
-      if (tRunType == RunAll || tRunType == RunRead){
-        for (int ll = 0; ll < 1 + tExtraReadLoop; ll++)
-        {
-          ndbout << "Executing reads" << endl;
-          START_TIMER;
-          execute(stRead);
-          STOP_TIMER;
-          if (tRunType == RunAll){
-            a_r.addObservation((1000 * noOfTransacts * tNoOfOpsPerTrans) / timer.elapsedTime());
-            PRINT_TIMER("read", noOfTransacts, tNoOfOpsPerTrans);
-          }//if
-        }//for
-      }//if
-
-      if (tRunType == RunAll){
-        if (0 < failed) {
-          int i = retry_opt ;
-          int cr = 1;
-          while (0 < failed && 0 < i){
-            ndbout << failed << " of the transactions returned errors!"<<endl ;
-            ndbout << endl;
-            ndbout <<"Attempting to redo the failed transactions now..." << endl;
-            ndbout << endl;
-            ndbout <<"Redo attempt " << cr <<" out of ";
-            ndbout << retry_opt << endl << endl;
-            failed = 0 ;
-            START_TIMER;
-            execute(stRead);
-            STOP_TIMER;
-            PRINT_TIMER("read", noOfTransacts, tNoOfOpsPerTrans);
-            i-- ;
-            cr++ ;
-          }//while
-          if(0 == failed ) {
-            ndbout << endl <<"Redo attempt succeeded" << endl << endl ;
-          }else{
-            ndbout << endl <<"Redo attempt failed, moving on now..." << endl << endl ;
-          }//if
-        }//if
-      }//if
-          
-          
-      /****************************************************************
-       * Perform update.                                              *
-       ****************************************************************/
-      
-      failed = 0 ;
-
-      if (tRunType == RunAll || tRunType == RunUpdate){
-        ndbout << "Executing updates" << endl;
-        START_TIMER;
-        execute(stUpdate);
-        STOP_TIMER;
-      }//if
-      if (tRunType == RunAll){
-        a_u.addObservation((1000 * noOfTransacts * tNoOfOpsPerTrans) / timer.elapsedTime());
-        PRINT_TIMER("update", noOfTransacts, tNoOfOpsPerTrans) ;
-
-        if (0 < failed) {
-          int i = retry_opt ;
-          int cu = 1 ;
-          while (0 < failed && 0 < i){
-            ndbout << failed << " of the transactions returned errors!"<<endl ;
-            ndbout << endl;
-            ndbout <<"Attempting to redo the failed transactions now..." << endl;
-            ndbout << endl <<"Redo attempt " << cu <<" out of ";
-            ndbout << retry_opt << endl << endl;
-            failed = 0 ;
-            START_TIMER;
-            execute(stUpdate);
-            STOP_TIMER;
-            PRINT_TIMER("update", noOfTransacts, tNoOfOpsPerTrans);
-            i-- ;
-            cu++ ;
-          }//while
-          if(0 == failed ){
-            ndbout << endl <<"Redo attempt succeeded" << endl << endl;
-          } else {
-            ndbout << endl;
-            ndbout <<"Redo attempt failed, moving on now..." << endl << endl;
-          }//if
-        }//if
-      }//if
-          
-      /****************************************************************
-       * Perform read.                                             *
-       ****************************************************************/
-      
-      failed = 0 ;
-
-      if (tRunType == RunAll){
-        for (int ll = 0; ll < 1 + tExtraReadLoop; ll++)
-        {
-          ndbout << "Executing reads" << endl;
-          START_TIMER;
-          execute(stRead);
-          STOP_TIMER;
-          a_r.addObservation((1000 * noOfTransacts * tNoOfOpsPerTrans) / timer.elapsedTime());
-          PRINT_TIMER("read", noOfTransacts, tNoOfOpsPerTrans);
-        }        
-
-        if (0 < failed) {
-          int i = retry_opt ;
-          int cr2 = 1 ;
-          while (0 < failed && 0 < i){
-            ndbout << failed << " of the transactions returned errors!"<<endl ;
-            ndbout << endl;
-            ndbout <<"Attempting to redo the failed transactions now..." << endl;
-            ndbout << endl <<"Redo attempt " << cr2 <<" out of ";
-            ndbout << retry_opt << endl << endl;
-            failed = 0 ;
-            START_TIMER;
-            execute(stRead);
-            STOP_TIMER;
-            PRINT_TIMER("read", noOfTransacts, tNoOfOpsPerTrans);
-            i-- ;
-            cr2++ ;
-          }//while
-          if(0 == failed ){
-            ndbout << endl <<"Redo attempt succeeded" << endl << endl;
-          }else{
-            ndbout << endl;
-            ndbout << "Redo attempt failed, moving on now..." << endl << endl;
-          }//if
-        }//if
-      }//if
-          
-
-      /****************************************************************
-       * Perform delete.                                              *
-       ****************************************************************/
-      
-      failed = 0 ;
-          
-      if (tRunType == RunAll || tRunType == RunDelete){
-        ndbout << "Executing deletes" << endl;
-        START_TIMER;
-        execute(stDelete);
-        STOP_TIMER;
-      }//if
-      if (tRunType == RunAll){
-        a_d.addObservation((1000 * noOfTransacts * tNoOfOpsPerTrans) / timer.elapsedTime());
-        PRINT_TIMER("delete", noOfTransacts, tNoOfOpsPerTrans);
-
-        if (0 < failed) {
-          int i = retry_opt ;
-          int cd = 1 ;
-          while (0 < failed && 0 < i){
-            ndbout << failed << " of the transactions returned errors!"<< endl ;
-            ndbout << endl;
-            ndbout <<"Attempting to redo the failed transactions now:" << endl ;
-            ndbout << endl <<"Redo attempt " << cd <<" out of ";
-            ndbout << retry_opt << endl << endl;
-            failed = 0 ;
-            START_TIMER;
-            execute(stDelete);
-            STOP_TIMER;
-            PRINT_TIMER("read", noOfTransacts, tNoOfOpsPerTrans);
-            i-- ;
-            cd++ ;
-          }//while
-          if(0 == failed ){
-            ndbout << endl <<"Redo attempt succeeded" << endl << endl ;
-          }else{
-            ndbout << endl;
-            ndbout << "Redo attempt failed, moving on now..." << endl << endl;
-          }//if
-        }//if
-      }//if
-          
-      tLoops++;
-      ndbout << "--------------------------------------------------" << endl;
-    
-      if(tNoOfLoops != 0){
-        if(tNoOfLoops <= tLoops)
-          break ;
-      }
-    }//for
-    
-    execute(stStop);
-    void * tmp;
-    for(Uint32 i = 0; i<tNoOfThreads; i++){
-      NdbThread_WaitFor(threadLife[i], &tmp);
-      NdbThread_Destroy(&threadLife[i]);
+    if (tNew)
+    {
+      main_thread(tRunType, timer);
+    }
+    else
+    {
+      run_old_flexAsynch(pThreadData, timer);
     }
   }
 
@@ -609,27 +371,43 @@ NDB_COMMAND(flexAsynch, "flexAsynch", "f
       tRunType == RunUpdate ||
       tRunType == RunDelete)
   {
-    longlong total_transactions = 0;
-    longlong exec_time;
+    Uint64 total_transactions = 0;
+    Uint64 exec_time;
 
-    if (tRunType == RunInsert || tRunType == RunDelete) {
-      total_transactions = (longlong)tNoOfTransactions;
-      total_transactions *= (longlong)tNoOfThreads;
-      total_transactions *= (longlong)tNoOfParallelTrans;
-    } else {
-      for (Uint32 i = 0; i < tNoOfThreads; i++){
-        total_transactions += ThreadExecutions[i];
+    if (tNew)
+    {
+      total_transactions = get_total_transactions();
+    }
+    else
+    {
+      if (tRunType == RunInsert || tRunType == RunDelete)
+      {
+        total_transactions = (longlong)tNoOfTransactions;
+        total_transactions *= (longlong)tNoOfThreads;
+        total_transactions *= (longlong)tNoOfParallelTrans;
+      }
+      else
+      {
+        for (Uint32 i = 0; i < tNoOfThreads; i++)
+        {
+          total_transactions += ThreadExecutions[i];
+        }
       }
     }
     if (tRunType == RunInsert || tRunType == RunDelete) {
-      exec_time = (longlong)timer.elapsedTime();
+      exec_time = (Uint64)timer.elapsedTime();
     } else {
-      exec_time = (longlong)tExecutionTime * 1000;
+      exec_time = (Uint64)tExecutionTime * 1000;
     }
     ndbout << "Total number of transactions is " << total_transactions;
     ndbout << endl;
     ndbout << "Execution time is " << exec_time << " milliseconds" << endl;
 
+    if (!exec_time)
+    {
+      exec_time = 1; /* Avoid floating point exception */
+      ndbout_c("Zero execution time!!!");
+    }
     total_transactions = (total_transactions * 1000) / exec_time;
     int trans_per_sec = (int)total_transactions;
     ndbout << "Total transactions per second " << trans_per_sec << endl;
@@ -645,7 +423,7 @@ static void execute(StartType aType)
 {
   resetThreads();
   tellThreads(aType);
-  waitForThreads();
+  waitForThreads(tNoOfThreads);
 }//execute()
 
 static void*
@@ -794,7 +572,7 @@ executeTrans(ThreadNdb* pThread,
         // Define the operation, but do not execute it yet.
         //-------------------------------------------------------
         if (tNdbRecord)
-          defineNdbRecordOperation(pThread, 
+          defineNdbRecordOperation(pThread->record,
                                    tConArray[num_ops],
                                    aType,
                                    threadBaseLoc2,
@@ -944,24 +722,31 @@ executeCallback(int result, NdbConnectio
   NdbConnection **array_ref = (NdbConnection**)aObject;
   assert(NdbObject == *array_ref);
   *array_ref = NULL;
-  if (result == -1) {
-
-              // Add complete error handling here
+  if (result == -1 && failed < 100)
+  {
 
-              int retCode = flexAsynchErrorData->handleErrorCommon(NdbObject->getNdbError());
-              if (retCode == 1) {
-		if (NdbObject->getNdbError().code != 626 && NdbObject->getNdbError().code != 630){
-                ndbout_c("execute: %s", NdbObject->getNdbError().message);
-                ndbout_c("Error code = %d", NdbObject->getNdbError().code);}
-              } else if (retCode == 2) {
-                ndbout << "4115 should not happen in flexAsynch" << endl;
-              } else if (retCode == 3) {
-		/* What can we do here? */
-                ndbout_c("execute: %s", NdbObject->getNdbError().message);
-                 }//if(retCode == 3)
+    // Add complete error handling here
 
-	      //    ndbout << "Error occured in poll:" << endl;
-	      //    ndbout << NdbObject->getNdbError() << endl;
+    int retCode = flexAsynchErrorData->handleErrorCommon(NdbObject->getNdbError());
+    if (retCode == 1)
+    {
+      if (NdbObject->getNdbError().code != 626 && NdbObject->getNdbError().code != 630)
+      {
+        ndbout_c("execute: %s", NdbObject->getNdbError().message);
+        ndbout_c("Error code = %d", NdbObject->getNdbError().code);
+      }
+    }
+    else if (retCode == 2)
+    {
+      ndbout << "4115 should not happen in flexAsynch" << endl;
+    }
+    else if (retCode == 3)
+    {
+      /* What can we do here? */
+      ndbout_c("execute: %s", NdbObject->getNdbError().message);
+    }//if(retCode == 3)
+    //    ndbout << "Error occured in poll:" << endl;
+    //    ndbout << NdbObject->getNdbError() << endl;
     failed++ ;
   }//if
   NdbObject->close(); /* Close transaction */
@@ -1067,11 +852,12 @@ defineOperation(NdbConnection* localNdbC
 
 
 static void
-defineNdbRecordOperation(ThreadNdb* pThread, 
-			 NdbConnection* pTrans, StartType aType,
-			 Uint32 threadBase, Uint32 aIndex)
+defineNdbRecordOperation(char *record,
+                         NdbConnection* pTrans,
+                         StartType aType,
+                         Uint32 threadBase,
+                         Uint32 aIndex)
 {
-  char * record = pThread->record;
   Uint32 offset;
   NdbDictionary::getOffset(g_record[0], 0, offset);
   * (Uint32*)(record + offset) = threadBase;
@@ -1102,15 +888,24 @@ defineNdbRecordOperation(ThreadNdb* pThr
     break;
   }//case
   case stRead: {     // Read Case
-    op = pTrans->readTuple(g_record[0],record,g_record[0],record, NdbOperation::LM_CommittedRead);
+    op = pTrans->readTuple(g_record[0],
+                           record,
+                           g_record[0],
+                           record,
+                           NdbOperation::LM_CommittedRead);
     break;
   }//case
   case stUpdate:{    // Update Case
-    op = pTrans->updateTuple(g_record[0],record,g_record[0],record);    
+    op = pTrans->updateTuple(g_record[0],
+                             record,
+                             g_record[0],
+                             record);
     break;
   }//case
   case stDelete: {   // Delete Case
-    op = pTrans->deleteTuple(g_record[0],record, g_record[0]);
+    op = pTrans->deleteTuple(g_record[0],
+                             record,
+                             g_record[0]);
     break;
   }//case
   default: {
@@ -1193,6 +988,25 @@ setUpNodeTableArray(Uint32 tableNo, cons
 }
 
 static Uint32
+get_node_relative_id(Uint32 tableNo, Uint32 node_id)
+{
+  Uint32 rel_id = 0;
+
+  for (Uint32 i = 1; i < node_id; i++)
+  {
+    if (nodeTableArray[tableNo][i])
+      rel_id++;
+  }
+  return rel_id;
+}
+
+static Uint32
+get_node_count(Uint32 tableNo)
+{
+  return get_node_relative_id(tableNo, NDB_MAX_NODES + 1);
+}
+
+static Uint32
 get_my_node_id(Uint32 tableNo, Uint32 threadNo)
 {
   Uint32 count = 0;
@@ -1327,6 +1141,859 @@ setAggregateRun(void)
   theTableCreateFlag = 1;
 }
 
+/* Start NEW Module */
+
+/**
+ * This part contains the code used for the case --local 4 which is using
+ * the design pattern that could be used for asynchronous applications of
+ * the NDB API.
+ *
+ * This variant will always use transaction hints, it will always the
+ * NDB Record format in the NDB API.
+ */
+
+static void* definer_thread(void *data);
+static void* executor_thread(void *data);
+
+static Uint32 tNoOfExecutorThreads = 0;
+static Uint32 tNoOfDefinerThreads = 0;
+
+enum RunState
+{
+  WARMUP = 0,
+  EXECUTING = 1,
+  COOLDOWN = 2
+};
+
+RunState tRunState = WARMUP;
+
+typedef struct KeyOperation KEY_OPERATION;
+struct KeyOperation
+{
+  Uint32 first_key;
+  Uint32 second_key;
+  Uint32 definer_thread_id;
+  Uint32 executor_thread_id;
+  RunType operation_type;
+  KEY_OPERATION *next_key_op;
+};
+
+typedef struct key_list_header KEY_LIST_HEADER;
+struct key_list_header
+{
+  KEY_OPERATION *first_in_list;
+  KEY_OPERATION *last_in_list;
+  Uint32 num_in_list;
+};
+
+
+typedef struct thread_data_struct THREAD_DATA;
+struct thread_data_struct
+{
+  KEY_LIST_HEADER list_header;
+  Uint32 thread_id;
+  bool ready;
+  bool stop;
+  bool start;
+
+  char *record;
+  NdbMutex *transport_mutex;
+  struct NdbCondition *transport_cond;
+  struct NdbCondition *main_cond;
+  struct NdbCondition *start_cond;
+  char not_used[52];
+};
+THREAD_DATA thread_data_array[MAX_DEFINER_THREADS + MAX_EXECUTOR_THREADS];
+
+static Uint64
+get_total_transactions()
+{
+  Uint64 total_transactions = 0;
+
+  for (Uint32 i = tNoOfDefinerThreads; i < tNoOfThreads; i++)
+  {
+    total_transactions += ThreadExecutions[i];
+  }
+  return total_transactions;
+}
+
+static void
+init_list_headers(KEY_LIST_HEADER *list_header,
+                  Uint32 num_list_headers)
+{
+  Uint32 i;
+  KEY_LIST_HEADER *list_header_ref;
+  char *list_header_ptr = (char*)list_header;
+  for (i = 0;
+       i < num_list_headers;
+       i++, list_header_ptr += sizeof(KEY_LIST_HEADER))
+  {
+    list_header_ref = (KEY_LIST_HEADER*)list_header_ptr;
+    list_header_ref->first_in_list = NULL;
+    list_header_ref->last_in_list = NULL;
+    list_header_ref->num_in_list = 0;
+  }
+}
+
+static void
+wait_thread_ready(THREAD_DATA *my_thread_data)
+{
+  NdbMutex_Lock(my_thread_data->transport_mutex);
+  while (1)
+  {
+    if (my_thread_data->ready)
+      break;
+    NdbCondition_Wait(my_thread_data->main_cond,
+                      my_thread_data->transport_mutex);
+  }
+  NdbMutex_Unlock(my_thread_data->transport_mutex);
+}
+
+static void
+wait_for_threads_ready(Uint32 num_threads)
+{
+  for (Uint32 i = 0; i < num_threads; i++)
+    wait_thread_ready(&thread_data_array[i]);
+}
+
+static void
+signal_thread_to_start(THREAD_DATA *my_thread_data)
+{
+  NdbMutex_Lock(my_thread_data->transport_mutex);
+  my_thread_data->start = true;
+  my_thread_data->ready = false;
+  NdbCondition_Signal(my_thread_data->start_cond);
+  NdbMutex_Unlock(my_thread_data->transport_mutex);
+}
+
+static void
+signal_definer_threads_to_start()
+{
+  for (Uint32 i = 0; i < tNoOfDefinerThreads; i++)
+    signal_thread_to_start(&thread_data_array[i]);
+}
+
+static void
+signal_executor_threads_to_start()
+{
+  for (Uint32 i = 0; i < tNoOfExecutorThreads; i++)
+    signal_thread_to_start(&thread_data_array[tNoOfDefinerThreads + i]);
+}
+
+static void
+signal_thread_ready_wait_for_start(THREAD_DATA *my_thread_data)
+{
+  NdbMutex_Lock(my_thread_data->transport_mutex);
+  my_thread_data->ready = true;
+  NdbCondition_Signal(my_thread_data->main_cond);
+  while (1)
+  {
+    if (my_thread_data->start)
+      break;
+    NdbCondition_Wait(my_thread_data->start_cond,
+                      my_thread_data->transport_mutex);
+  }
+  my_thread_data->start = false;
+  NdbMutex_Unlock(my_thread_data->transport_mutex);
+}
+
+static void
+signal_thread_to_stop(THREAD_DATA *my_thread_data)
+{
+  NdbMutex_Lock(my_thread_data->transport_mutex);
+  my_thread_data->stop = true;
+  NdbCondition_Signal(my_thread_data->transport_cond);
+  NdbMutex_Unlock(my_thread_data->transport_mutex);
+}
+
+static void
+signal_definer_threads_to_stop()
+{
+  for (Uint32 i = 0; i < tNoOfDefinerThreads; i++)
+    signal_thread_to_stop(&thread_data_array[i]);
+}
+
+static void
+signal_executor_threads_to_stop()
+{
+  for (Uint32 i = tNoOfDefinerThreads; i < tNoOfThreads; i++)
+    signal_thread_to_stop(&thread_data_array[i]);
+}
+
+static void
+destroy_thread_data(THREAD_DATA *my_thread_data)
+{
+  free(my_thread_data->record);
+  NdbMutex_Destroy(my_thread_data->transport_mutex);
+  NdbCondition_Destroy(my_thread_data->transport_cond);
+  NdbCondition_Destroy(my_thread_data->start_cond);
+  NdbCondition_Destroy(my_thread_data->main_cond);
+}
+
+static void
+init_thread_data(THREAD_DATA *my_thread_data, Uint32 thread_id)
+{
+  Uint32 sz = NdbDictionary::getRecordRowLength(g_record[0]);
+  my_thread_data->record = (char*)malloc(sz);
+  memset(my_thread_data->record, 0, sz);
+  init_list_headers(&my_thread_data->list_header, 1);
+  my_thread_data->stop = false;
+  my_thread_data->ready = false;
+  my_thread_data->start = false;
+  my_thread_data->transport_mutex = NdbMutex_Create();
+  my_thread_data->transport_cond = NdbCondition_Create();
+  my_thread_data->main_cond = NdbCondition_Create();
+  my_thread_data->start_cond = NdbCondition_Create();
+  my_thread_data->thread_id = thread_id;
+}
+
+static void
+create_definer_thread(THREAD_DATA *my_thread_data, Uint32 thread_id)
+{
+  init_thread_data(my_thread_data, thread_id);
+  threadLife[thread_id] = NdbThread_Create(definer_thread,
+                                           (void**)my_thread_data,
+                                           1024 * 1024,
+                                           "flexAsynchThread",
+                                           NDB_THREAD_PRIO_LOW);
+}
+
+static void
+create_definer_threads()
+{
+  for (Uint32 i = 0; i < tNoOfDefinerThreads; i++)
+  {
+    Uint32 thread_id = i;
+    create_definer_thread(&thread_data_array[thread_id], thread_id);
+  }
+}
+
+static void
+create_executor_thread(THREAD_DATA *my_thread_data, Uint32 thread_id)
+{
+  init_thread_data(my_thread_data, thread_id);
+  threadLife[thread_id] = NdbThread_Create(executor_thread,
+                                           (void**)my_thread_data,
+                                           1024 * 1024,
+                                           "flexAsynchThread",
+                                           NDB_THREAD_PRIO_LOW);
+}
+
+static void
+create_executor_threads()
+{
+  for (Uint32 i = 0; i < tNoOfExecutorThreads; i++)
+  {
+    Uint32 thread_id = tNoOfDefinerThreads + i;
+    create_executor_thread(&thread_data_array[thread_id], thread_id);
+  }
+}
+
+static void
+main_thread(RunType start_type, NdbTimer & timer)
+{
+  bool insert_delete;
+
+  tNoOfExecutorThreads = tNoOfThreads;
+  if (tNoOfDefinerThreads == 0)
+  {
+    tNoOfDefinerThreads = (tNoOfThreads + 3)/4;
+  }
+  tNoOfThreads = tNoOfExecutorThreads + tNoOfDefinerThreads;
+
+  if (start_type == RunInsert ||
+      start_type == RunDelete)
+    insert_delete = true;
+  else
+    insert_delete = false;
+
+  create_definer_threads();
+  create_executor_threads();
+
+  wait_for_threads_ready(tNoOfThreads);
+
+  /**
+   * Start threads, start with execution threads to ensure they are
+   * up and running before definer threads starts sending data to
+   * them
+   */
+  START_TIMER;
+  signal_definer_threads_to_start();
+  signal_executor_threads_to_start();
+
+  if (!insert_delete)
+  {
+    sleep(tWarmupTime);
+    tRunState = EXECUTING;
+    sleep(tExecutionTime);
+    tRunState = COOLDOWN;
+    sleep(tCooldownTime);
+    signal_definer_threads_to_stop();
+  }
+  wait_for_threads_ready(tNoOfDefinerThreads);
+  STOP_TIMER;
+
+  signal_executor_threads_to_stop();
+  wait_for_threads_ready(tNoOfThreads);
+
+  /**
+   * Now all threads are stopped and prepared to be destroyed,
+   * now start them just to destroy themselves
+   */
+  signal_definer_threads_to_start();
+  signal_executor_threads_to_start();
+
+  void * tmp;
+  for (Uint32 i = 0; i < tNoOfThreads; i++)
+  {
+    NdbThread_WaitFor(threadLife[i], &tmp);
+    NdbThread_Destroy(&threadLife[i]);
+  }
+}
+
+static NdbConnection*
+get_trans_object(Uint32 first_key,
+                 Uint32 second_key,
+                 Ndb *my_ndb)
+{
+  union {
+    Uint64 _align;
+    Uint32 Tkey32[2];
+  };
+  (void)_align;
+
+  Tkey32[0] = first_key;
+  Tkey32[1] = second_key;
+  Ndb::Key_part_ptr hint[2];
+  hint[0].ptr = Tkey32+0;
+  hint[0].len = 4;
+  hint[1].ptr = 0;
+  hint[1].len = 0;
+
+  return my_ndb->startTransaction(tables[0], hint);
+}
+
+static Ndb*
+get_ndb_object(Uint32 my_thread_id)
+{
+  Ndb *my_ndb = new Ndb(g_cluster_connection+(my_thread_id % tConnections),
+                        "TEST_DB");
+  my_ndb->init(MAXPAR);
+  my_ndb->waitUntilReady(10000);
+  return my_ndb;
+}
+
+static void
+insert_list(KEY_LIST_HEADER *list_header,
+            KEY_OPERATION *insert_op)
+{
+  KEY_OPERATION *current_last = list_header->last_in_list;
+  insert_op->next_key_op = NULL;
+  list_header->last_in_list = insert_op;
+  if (current_last)
+    current_last->next_key_op = insert_op;
+  else
+    list_header->first_in_list = insert_op;
+  list_header->num_in_list++;
+}
+
+static KEY_OPERATION*
+get_first_free(KEY_LIST_HEADER *list_header)
+{
+  assert(list_header->first_in_list);
+  KEY_OPERATION *key_op = list_header->first_in_list;
+  list_header->first_in_list = key_op->next_key_op;
+  list_header->num_in_list--;
+  if (!list_header->first_in_list)
+  {
+    list_header->last_in_list = NULL;
+  }
+  key_op->next_key_op = NULL;
+  return key_op;
+}
+
+static void
+move_list(KEY_LIST_HEADER *src_list_header,
+          KEY_LIST_HEADER *dst_list_header)
+{
+  KEY_OPERATION *last_completed_op = dst_list_header->last_in_list;
+  KEY_OPERATION *first_in_list = src_list_header->first_in_list;
+  if (!first_in_list)
+    return;
+  if (last_completed_op)
+  {
+    last_completed_op->next_key_op = first_in_list;
+  }
+  else
+  {
+    dst_list_header->first_in_list = first_in_list;
+  }
+  dst_list_header->last_in_list = src_list_header->last_in_list;
+  dst_list_header->num_in_list += src_list_header->num_in_list;
+  src_list_header->num_in_list = 0;
+  src_list_header->first_in_list = NULL;
+  src_list_header->last_in_list = NULL;
+}
+
+/**
+ * Retrieve a linked list of prepared operations. If no operations
+ * prepared we wait on a condition until operations are defined for
+ * us to execute.
+ */
+static void
+receive_operations(THREAD_DATA *my_thread_data,
+                   KEY_LIST_HEADER *list_header,
+                   bool wait)
+{
+  bool first = true;
+  KEY_LIST_HEADER *thread_list_header = &my_thread_data->list_header;
+  list_header->first_in_list = NULL;
+  list_header->last_in_list = NULL;
+  list_header->num_in_list = 0;
+
+recheck:
+  NdbMutex_Lock(my_thread_data->transport_mutex);
+  while (!my_thread_data->stop &&
+         (first || thread_list_header->first_in_list))
+  {
+    move_list(thread_list_header, list_header);
+    if (list_header->first_in_list)
+      break;
+    NdbCondition_Wait(my_thread_data->transport_cond,
+                      my_thread_data->transport_mutex);
+    first = false;
+  }
+  NdbMutex_Unlock(my_thread_data->transport_mutex);
+  if (first && wait &&
+      thread_list_header->num_in_list < ((tNoOfParallelTrans + 1) / 2))
+  {
+    /**
+     * We will wait for at least 200 microseconds if we haven't yet received
+     * at least half of the number of records we desire to execute.
+     */
+    NdbSleep_MicroSleep(200);
+    first = false;
+    goto recheck;
+  }
+}
+
+static void
+send_operations(Uint32 thread_id,
+                KEY_LIST_HEADER *list_header)
+{
+  THREAD_DATA *recv_thread = &thread_data_array[thread_id];
+
+  NdbMutex_Lock(recv_thread->transport_mutex);
+  /**
+   * We are moving operations into the list, thus we need
+   * to wake any threads waiting for operations to execute.
+   */
+  move_list(list_header,
+            &recv_thread->list_header);
+  NdbCondition_Signal(recv_thread->transport_cond);
+  NdbMutex_Unlock(recv_thread->transport_mutex);
+}
+
+static void
+init_key_op_list(char *key_op_ptr,
+                 KEY_LIST_HEADER *list_header,
+                 Uint32 max_outstanding,
+                 Uint32 my_thread_id,
+                 RunType my_run_type)
+{
+  KEY_OPERATION *key_op;
+
+  list_header->first_in_list = (KEY_OPERATION*)key_op_ptr;
+  for (Uint32 i = 0;
+       i < max_outstanding;
+       i++, key_op_ptr += sizeof(KEY_OPERATION))
+  {
+    key_op = (KEY_OPERATION*)key_op_ptr;
+    key_op->next_key_op = (KEY_OPERATION*)(key_op_ptr + sizeof(KEY_OPERATION));
+    key_op->definer_thread_id = my_thread_id;
+    key_op->executor_thread_id = MAX_EXECUTOR_THREADS;
+    key_op->operation_type = my_run_type;
+  }
+  key_op->next_key_op = NULL; /* Last key operation */
+  list_header->last_in_list = key_op;
+  list_header->num_in_list = max_outstanding;
+}
+
+static Uint32
+get_thread_id_for_record(Uint32 record_id,
+                         Uint32 node_count,
+                         Uint32 thread_count,
+                         Uint32 thread_group,
+                         Uint32 num_thread_groups,
+                         Ndb *my_ndb)
+{
+  Uint32 thread_id;
+  NdbConnection *trans = get_trans_object(record_id, record_id, my_ndb);
+  Uint32 node_id = trans->getConnectedNodeId();
+  trans->close();
+  Uint32 node_rel_id = get_node_relative_id((Uint32)0, node_id);
+  if (node_count >= thread_count)
+  {
+    thread_id = node_rel_id % thread_count;
+    return thread_id;
+  }
+
+recalculate:
+  thread_id = thread_group * node_count + node_rel_id;
+  if (thread_id >= thread_count)
+  {
+    /**
+     * Only the last thread group may have less than
+     * node_count threads, so choosing a random group
+     * except the last will always give a valid
+     * thread_id.
+     */
+    thread_group = rand() % (num_thread_groups - 1);
+    goto recalculate;
+  }
+  return thread_id;
+}
+
+void init_thread_id_mem(char *thread_id_mem,
+                        Uint32 first_record,
+                        Uint32 total_records,
+                        Ndb *my_ndb)
+{
+  Uint32 node_count = get_node_count((Uint32)0);
+  Uint32 thread_count = tNoOfExecutorThreads;
+  Uint32 num_thread_groups = (thread_count + node_count - 1) / node_count;
+  Uint32 thread_group = 0;
+  for (Uint32 record_id = first_record, i = 0;
+       i < total_records;
+       i++, record_id++)
+  {
+    thread_id_mem[i] = (char)get_thread_id_for_record(record_id,
+                                                      node_count,
+                                                      thread_count,
+                                                      thread_group,
+                                                      num_thread_groups,
+                                                      my_ndb);
+    thread_group++;
+    if (thread_group == num_thread_groups)
+      thread_group = 0;
+  }
+}
+
+static bool
+check_for_outstanding(Uint32 *thread_state)
+{
+  for (Uint32 i = 0; i < tNoOfExecutorThreads; i++)
+  {
+    if (thread_state[i])
+      return true;
+  }
+  return false;
+}
+
+static void
+update_thread_state(KEY_LIST_HEADER *list_header,
+                    Uint32 *thread_state)
+{
+  KEY_OPERATION *key_op = list_header->first_in_list;
+
+  while (key_op)
+  {
+    thread_state[key_op->executor_thread_id]--;
+    key_op->executor_thread_id = MAX_EXECUTOR_THREADS;
+    key_op = key_op->next_key_op;
+  }
+}
+
+static void
+wait_until_all_completed(THREAD_DATA *my_thread_data,
+                         Uint32 *thread_state,
+                         KEY_LIST_HEADER *free_list_header)
+{
+  KEY_LIST_HEADER list_header;
+  bool outstanding = true;
+  while (outstanding && !my_thread_data->stop)
+  {
+    receive_operations(my_thread_data, &list_header, false);
+    update_thread_state(&list_header, thread_state);
+    move_list(&list_header, free_list_header);
+    outstanding = check_for_outstanding(thread_state);
+  }
+}
+
+static Uint32
+prepare_operations(char *thread_id_mem,
+                   KEY_LIST_HEADER *free_list_header,
+                   Uint32 *thread_state,
+                   Uint32 first_record_to_define,
+                   Uint32 num_records_to_define,
+                   Uint32 first_record,
+                   Uint32 last_record,
+                   Uint32 max_per_thread)
+{
+  KEY_LIST_HEADER thread_list_headers[MAX_EXECUTOR_THREADS];
+  Uint32 record_id, i, num_records;
+
+  init_list_headers(&thread_list_headers[0], tNoOfExecutorThreads);
+  for (record_id = first_record_to_define, i = 0;
+       record_id <= last_record && i < num_records_to_define;
+       record_id++, i++)
+  {
+    KEY_OPERATION *define_op = get_first_free(free_list_header);
+    Uint32 thread_id = (Uint32)thread_id_mem[record_id - first_record];
+    define_op->first_key = record_id;
+    define_op->second_key = record_id;
+    define_op->executor_thread_id = thread_id;
+    thread_state[thread_id]++;
+    KEY_LIST_HEADER *thread_list_header = &thread_list_headers[thread_id];
+    insert_list(thread_list_header, define_op);
+    if (thread_list_header->num_in_list >= max_per_thread)
+    {
+      /**
+       * One thread has max number of records, we won't define any
+       * more to keep the code simple.
+       */
+      break;
+    }
+  }
+  num_records = i;
+  for (i = 0; i < tNoOfExecutorThreads; i++)
+  {
+    KEY_LIST_HEADER *thread_list_header = &thread_list_headers[i];
+    if (thread_list_header->num_in_list)
+    {
+      send_operations(tNoOfDefinerThreads + i, thread_list_header);
+    }
+  }
+  return num_records;
+}
+
+static void*
+definer_thread(void *data)
+{
+  THREAD_DATA *my_thread_data = (THREAD_DATA*)data;
+  Uint32 my_thread_id = my_thread_data->thread_id;
+  RunType  run_type = tRunType;
+  Uint32 thread_state[MAX_EXECUTOR_THREADS];
+  Uint32 max_outstanding = (tNoOfExecutorThreads * tNoOfParallelTrans) /
+                            tNoOfDefinerThreads;
+  Uint32 max_per_thread = 1000 / tNoOfDefinerThreads;
+  Uint32 total_records = max_outstanding * tNoOfTransactions;
+  Uint32 first_record = total_records * my_thread_id;
+  Uint32 my_last_record = first_record + total_records - 1;
+  Uint32 current_record = first_record;
+  KEY_LIST_HEADER free_list_header;
+  void *key_op_mem = malloc(sizeof(KEY_OPERATION) * max_outstanding);
+  char *thread_id_mem = (char*)malloc(total_records);
+  memset((char*)&thread_state[0], 0, sizeof(thread_state));
+
+  init_key_op_list((char*)key_op_mem,
+                   &free_list_header,
+                   max_outstanding,
+                   my_thread_id,
+                   run_type);
+  Ndb *my_ndb = get_ndb_object(my_thread_id);
+  init_thread_id_mem(thread_id_mem,
+                     first_record,
+                     total_records,
+                     my_ndb);
+  delete my_ndb;
+  ThreadExecutions[my_thread_id] = 0;
+  signal_thread_ready_wait_for_start(my_thread_data);
+
+  while (!my_thread_data->stop)
+  {
+    Uint32 defined_ops = prepare_operations(thread_id_mem,
+                                            &free_list_header,
+                                            &thread_state[0],
+                                            current_record,
+                                            max_outstanding,
+                                            first_record,
+                                            my_last_record,
+                                            max_per_thread);
+    current_record += defined_ops;
+    if (defined_ops)
+    {
+      wait_until_all_completed(my_thread_data,
+                               &thread_state[0],
+                               &free_list_header);
+    }
+    if (current_record > my_last_record)
+    {
+      if (run_type != RunRead &&
+          run_type != RunUpdate)
+      {
+        /**
+         * Inserts and deletes are done when first round is
+         * completed. Reads and updates proceed until time is
+         * completed.
+         */
+        break;
+      }
+      current_record = first_record;
+    }
+  }
+  signal_thread_ready_wait_for_start(my_thread_data);
+  free(key_op_mem);
+  free(thread_id_mem);
+  destroy_thread_data(my_thread_data);
+  return NULL;
+}
+
+/**
+ * This method receives a linked list of key operations and executes
+ * all of them.
+ *
+ * Return Value: >= 0 means successful completion of this many operations
+ *               -1 Failure, stop test
+ */
+static int
+execute_operations(char *record,
+                   Ndb* my_ndb,
+                   KEY_OPERATION *key_op)
+{
+  NdbConnection* ndb_conn_array[MAXPAR];
+  Uint32 num_ops = 0;
+
+  while (key_op)
+  {
+    ndb_conn_array[num_ops] = get_trans_object(key_op->first_key,
+                                               key_op->second_key,
+                                               my_ndb);
+    if (ndb_conn_array[num_ops] == NULL){
+      error_handler(my_ndb->getNdbError());
+      ndbout << endl << "Unable to recover! Quitting now" << endl ;
+      return -1;
+    }
+    //-------------------------------------------------------
+    // Define the operation, but do not execute it yet.
+    //-------------------------------------------------------
+    defineNdbRecordOperation(record,
+                             ndb_conn_array[num_ops],
+                             (StartType)key_op->operation_type,
+                             key_op->first_key,
+                             key_op->second_key);
+
+    ndb_conn_array[num_ops]->executeAsynchPrepare(Commit,
+                                        &executeCallback,
+                                        (void*)&ndb_conn_array[num_ops]);
+    num_ops++;
+    key_op = key_op->next_key_op;
+  }
+  if (num_ops == 0)
+    return 0;
+
+  /**
+   * Now execute each defined operation and wait for all of them to
+   * complete.
+   */
+  int Tcomp = my_ndb->sendPollNdb(3000,
+                                  num_ops,
+                                  tSendForce);
+  if (Tcomp != (int)num_ops &&
+      my_ndb->getNdbError().code != 0)
+  {
+    /* Error handling */
+    if (error_count > 100)
+      return -1;
+
+    error_count++;
+    ndbout << "error = " << my_ndb->getNdbError().code << endl;
+  }
+  return Tcomp;
+}
+
+static void
+report_back_operations(KEY_OPERATION *first_defined_op)
+{
+  KEY_LIST_HEADER thread_list_header[MAX_DEFINER_THREADS];
+  KEY_OPERATION *next_op, *executed_op;
+
+  init_list_headers(&thread_list_header[0], tNoOfDefinerThreads);
+  executed_op = first_defined_op;
+  while (executed_op)
+  {
+    next_op = executed_op->next_key_op;
+    insert_list(&thread_list_header[executed_op->definer_thread_id],
+                executed_op);
+    executed_op = next_op;
+  }
+  for (Uint32 i = 0; i < tNoOfDefinerThreads; i++)
+  {
+    if (thread_list_header[i].first_in_list)
+    {
+      send_operations(i, &thread_list_header[i]);
+    }
+  }
+}
+
+/**
+ * This is the main function of the executor threads, these threads
+ * receive linked lists of operations to execute from the definer
+ * threads. The definer threads stops these threads by simply
+ * sending a stop operation.
+ */
+static void*
+executor_thread(void *data)
+{
+  THREAD_DATA *my_thread_data = (THREAD_DATA*)data;
+  Uint32 my_thread_id = my_thread_data->thread_id;
+  Uint64 exec_count = 0;
+  Uint32 error_count = 0;
+  Uint32 executions = 0;
+  Uint32 error_flag = false;
+  int ret_code;
+  KEY_LIST_HEADER list_header;
+
+  Ndb *my_ndb = get_ndb_object(my_thread_id);
+  ThreadExecutions[my_thread_id] = 0;
+
+  signal_thread_ready_wait_for_start(my_thread_data);
+
+  while (!my_thread_data->stop)
+  {
+    receive_operations(my_thread_data, &list_header, !tImmediate);
+    if (list_header.num_in_list == 0)
+    {
+      break;
+    }
+    ret_code = 0;
+    if (!error_flag)
+    {
+      /* Ignore to execute after errors to simplify error handling */
+      ret_code = execute_operations(my_thread_data->record,
+                                    my_ndb,
+                                    list_header.first_in_list);
+    }
+    report_back_operations(list_header.first_in_list);
+    if (ret_code < 0)
+    {
+      ndbout_c("executor thread id = %u received error", my_thread_id);
+      error_flag = true;
+    }
+    else if (!error_flag &&
+             (tRunType == RunInsert ||
+              tRunType == RunDelete ||
+              tRunState == EXECUTING))
+    {
+      executions++;
+      exec_count += (Uint64)ret_code;
+    }
+  }
+
+  ThreadExecutions[my_thread_id] = exec_count;
+  if (error_count)
+  {
+    ndbout_c("Received %u errors in executor thread, id = %u",
+             error_count, my_thread_id);
+  }
+  signal_thread_ready_wait_for_start(my_thread_data);
+  delete my_ndb;
+  destroy_thread_data(my_thread_data);
+  return NULL;
+}
+
+/* End NEW Module */
+
 static
 int 
 readArguments(int argc, const char** argv){
@@ -1339,6 +2006,15 @@ readArguments(int argc, const char** arg
 	ndbout_c("Invalid no of threads");
         return -1;
       }
+    }
+    else if (strcmp(argv[i], "-d") == 0)
+    {
+      tNoOfDefinerThreads = atoi(argv[i+1]);
+      if (tNoOfDefinerThreads > NDB_MAXTHREADS)
+      {
+        ndbout_c("Invalid no of definer threads");
+        return -1;
+      }
     } else if (strcmp(argv[i], "-p") == 0){
       tNoOfParallelTrans = atoi(argv[i+1]);
       if ((tNoOfParallelTrans < 1) || (tNoOfParallelTrans > MAXPAR)){
@@ -1463,6 +2139,18 @@ readArguments(int argc, const char** arg
     } else if (strcmp(argv[i], "-create_table") == 0){
       tRunType = RunCreateTable;
       argc++;
+    }
+    else if (strcmp(argv[i], "-new") == 0)
+    {
+      tNew = true;
+      tNdbRecord = true;
+      argc++;
+      i--;
+    }
+    else if (strcmp(argv[i], "-immediate") == 0)
+    {
+      tImmediate = true;
+      argc++;
       i--;
     } else if (strcmp(argv[i], "-drop_table") == 0){
       tRunType = RunDropTable;
@@ -1536,4 +2224,303 @@ input_error(){
   ndbout_c("   -table Number of standard table, default 0");
 }
   
+static void
+run_old_flexAsynch(ThreadNdb *pThreadData,
+                   NdbTimer & timer)
+{
+  int                   tLoops=0;
+  /****************************************************************
+   *  Create NDB objects.                                   *
+  ****************************************************************/
+  resetThreads();
+  for (Uint32 i = 0; i < tNoOfThreads ; i++)
+  {
+    pThreadData[i].ThreadNo = i;
+    threadLife[i] = NdbThread_Create(threadLoop,
+                                     (void**)&pThreadData[i],
+                                     32768,
+                                     "flexAsynchThread",
+                                     NDB_THREAD_PRIO_LOW);
+  }//for
+  ndbout << endl <<  "All NDB objects and table created" << endl << endl;
+  int noOfTransacts = tNoOfParallelTrans*tNoOfTransactions*tNoOfThreads;
+  /****************************************************************
+   * Execute program.                                             *
+   ****************************************************************/
+
+  for (;;)
+  {
+
+    int loopCount = tLoops + 1 ;
+    ndbout << endl << "Loop # " << loopCount  << endl << endl ;
+
+    /****************************************************************
+     * Perform inserts.                                             *
+     ****************************************************************/
+
+    failed = 0 ;
+    if (tRunType == RunAll || tRunType == RunInsert)
+    {
+      ndbout << "Executing inserts" << endl;
+      START_TIMER;
+      execute(stInsert);
+      STOP_TIMER;
+    }
+    if (tRunType == RunAll)
+    {
+      a_i.addObservation((1000*noOfTransacts * tNoOfOpsPerTrans) / timer.elapsedTime());
+      PRINT_TIMER("insert", noOfTransacts, tNoOfOpsPerTrans);
+
+      if (0 < failed)
+      {
+        int i = retry_opt ;
+        int ci = 1 ;
+        while (0 < failed && 0 < i)
+        {
+          ndbout << failed << " of the transactions returned errors!"
+                 << endl << endl;
+          ndbout << "Attempting to redo the failed transactions now..."
+                 << endl ;
+          ndbout << "Redo attempt " << ci <<" out of " << retry_opt
+                 << endl << endl;
+          failed = 0 ;
+          START_TIMER;
+          execute(stInsert);
+          STOP_TIMER;
+          PRINT_TIMER("insert", noOfTransacts, tNoOfOpsPerTrans);
+          i-- ;
+          ci++;
+        }
+        if (0 == failed )
+        {
+          ndbout << endl <<"Redo attempt succeeded" << endl << endl;
+        }
+        else
+        {
+          ndbout << endl <<"Redo attempt failed, moving on now..." << endl
+                 << endl;
+        }//if
+      }//if
+    }//if
+    /****************************************************************
+     * Perform read.                                                *
+     ****************************************************************/
+
+    failed = 0 ;
+
+    if (tRunType == RunAll || tRunType == RunRead)
+    {
+      for (int ll = 0; ll < 1 + tExtraReadLoop; ll++)
+      {
+        ndbout << "Executing reads" << endl;
+        START_TIMER;
+        execute(stRead);
+        STOP_TIMER;
+        if (tRunType == RunAll)
+        {
+          a_r.addObservation((1000 * noOfTransacts * tNoOfOpsPerTrans) / timer.elapsedTime());
+          PRINT_TIMER("read", noOfTransacts, tNoOfOpsPerTrans);
+        }//if
+      }//for
+    }//if
+
+    if (tRunType == RunAll)
+    {
+      if (0 < failed)
+      {
+        int i = retry_opt ;
+        int cr = 1;
+        while (0 < failed && 0 < i)
+        {
+          ndbout << failed << " of the transactions returned errors!"<<endl ;
+          ndbout << endl;
+          ndbout <<"Attempting to redo the failed transactions now..." << endl;
+          ndbout << endl;
+          ndbout <<"Redo attempt " << cr <<" out of ";
+          ndbout << retry_opt << endl << endl;
+          failed = 0 ;
+          START_TIMER;
+          execute(stRead);
+          STOP_TIMER;
+          PRINT_TIMER("read", noOfTransacts, tNoOfOpsPerTrans);
+          i-- ;
+          cr++ ;
+        }//while
+        if (0 == failed )
+        {
+          ndbout << endl <<"Redo attempt succeeded" << endl << endl ;
+        }
+        else
+        {
+          ndbout << endl <<"Redo attempt failed, moving on now..." << endl << endl ;
+        }//if
+      }//if
+    }//if
+
+
+    /****************************************************************
+     * Perform update.                                              *
+     ****************************************************************/
+
+    failed = 0 ;
+
+    if (tRunType == RunAll || tRunType == RunUpdate)
+    {
+      ndbout << "Executing updates" << endl;
+      START_TIMER;
+      execute(stUpdate);
+      STOP_TIMER;
+    }//if
+    if (tRunType == RunAll)
+    {
+      a_u.addObservation((1000 * noOfTransacts * tNoOfOpsPerTrans) / timer.elapsedTime());
+      PRINT_TIMER("update", noOfTransacts, tNoOfOpsPerTrans) ;
+
+      if (0 < failed)
+      {
+        int i = retry_opt ;
+        int cu = 1 ;
+        while (0 < failed && 0 < i)
+        {
+          ndbout << failed << " of the transactions returned errors!"<<endl ;
+          ndbout << endl;
+          ndbout <<"Attempting to redo the failed transactions now..." << endl;
+          ndbout << endl <<"Redo attempt " << cu <<" out of ";
+          ndbout << retry_opt << endl << endl;
+          failed = 0 ;
+          START_TIMER;
+          execute(stUpdate);
+          STOP_TIMER;
+          PRINT_TIMER("update", noOfTransacts, tNoOfOpsPerTrans);
+          i-- ;
+          cu++ ;
+        }//while
+        if (0 == failed )
+        {
+          ndbout << endl <<"Redo attempt succeeded" << endl << endl;
+        }
+        else
+        {
+          ndbout << endl;
+          ndbout <<"Redo attempt failed, moving on now..." << endl << endl;
+        }//if
+      }//if
+    }//if
+
+    /****************************************************************
+     * Perform read.                                             *
+     ****************************************************************/
+
+    failed = 0 ;
+
+    if (tRunType == RunAll)
+    {
+      for (int ll = 0; ll < 1 + tExtraReadLoop; ll++)
+      {
+        ndbout << "Executing reads" << endl;
+        START_TIMER;
+        execute(stRead);
+        STOP_TIMER;
+        a_r.addObservation((1000 * noOfTransacts * tNoOfOpsPerTrans) / timer.elapsedTime());
+        PRINT_TIMER("read", noOfTransacts, tNoOfOpsPerTrans);
+      }
+
+      if (0 < failed)
+      {
+        int i = retry_opt ;
+        int cr2 = 1 ;
+        while (0 < failed && 0 < i)
+        {
+          ndbout << failed << " of the transactions returned errors!"<<endl ;
+          ndbout << endl;
+          ndbout <<"Attempting to redo the failed transactions now..." << endl;
+          ndbout << endl <<"Redo attempt " << cr2 <<" out of ";
+          ndbout << retry_opt << endl << endl;
+          failed = 0 ;
+          START_TIMER;
+          execute(stRead);
+          STOP_TIMER;
+          PRINT_TIMER("read", noOfTransacts, tNoOfOpsPerTrans);
+          i-- ;
+          cr2++ ;
+        }//while
+        if (0 == failed )
+        {
+          ndbout << endl <<"Redo attempt succeeded" << endl << endl;
+        }
+        else
+        {
+          ndbout << endl;
+          ndbout << "Redo attempt failed, moving on now..." << endl << endl;
+        }//if
+      }//if
+    }//if
+
+
+    /****************************************************************
+     * Perform delete.                                              *
+     ****************************************************************/
+
+    failed = 0 ;
+
+    if (tRunType == RunAll || tRunType == RunDelete)
+    {
+      ndbout << "Executing deletes" << endl;
+      START_TIMER;
+      execute(stDelete);
+      STOP_TIMER;
+    }//if
+    if (tRunType == RunAll)
+    {
+      a_d.addObservation((1000 * noOfTransacts * tNoOfOpsPerTrans) / timer.elapsedTime());
+      PRINT_TIMER("delete", noOfTransacts, tNoOfOpsPerTrans);
+
+      if (0 < failed) {
+        int i = retry_opt ;
+        int cd = 1 ;
+        while (0 < failed && 0 < i)
+        {
+          ndbout << failed << " of the transactions returned errors!"<< endl ;
+          ndbout << endl;
+          ndbout <<"Attempting to redo the failed transactions now:" << endl ;
+          ndbout << endl <<"Redo attempt " << cd <<" out of ";
+          ndbout << retry_opt << endl << endl;
+          failed = 0 ;
+          START_TIMER;
+          execute(stDelete);
+          STOP_TIMER;
+          PRINT_TIMER("read", noOfTransacts, tNoOfOpsPerTrans);
+          i-- ;
+          cd++ ;
+        }//while
+        if (0 == failed )
+        {
+          ndbout << endl <<"Redo attempt succeeded" << endl << endl ;
+        }
+        else
+        {
+          ndbout << endl;
+          ndbout << "Redo attempt failed, moving on now..." << endl << endl;
+        }//if
+      }//if
+    }//if
+
+    tLoops++;
+    ndbout << "--------------------------------------------------" << endl;
+
+    if (tNoOfLoops != 0)
+    {
+      if (tNoOfLoops <= tLoops)
+        break ;
+    }
+  }//for
+
+  execute(stStop);
+  void * tmp;
+  for (Uint32 i = 0; i < tNoOfThreads; i++)
+  {
+    NdbThread_WaitFor(threadLife[i], &tmp);
+    NdbThread_Destroy(&threadLife[i]);
+  }
+}
 template class Vector<NdbDictionary::RecordSpecification>;

No bundle (reason: useless for push emails).
Thread
bzr push into mysql-5.5-cluster-7.2 branch (mauritz.sundell:3935 to 3936) Mauritz Sundell11 Jun