List:Commits« Previous MessageNext Message »
From:jonas oreland Date:January 23 2012 8:25pm
Subject:bzr push into mysql-5.1-telco-7.0 branch (jonas.oreland:4814 to 4818)
View as plain text  
 4818 mikael.ronstrom@stripped	2012-01-23
      ndb - add more confusing and obfuscated code to flexAsync

    modified:
      storage/ndb/test/ndbapi/flexAsynch.cpp
 4817 mikael.ronstrom@stripped	2012-01-23
      ndb - add last pieces needed for multiple receive threads

    modified:
      storage/ndb/src/kernel/blocks/trpman.cpp
      storage/ndb/src/kernel/vm/TransporterCallback.cpp
      storage/ndb/src/kernel/vm/TransporterCallbackKernel.hpp
      storage/ndb/src/kernel/vm/mt.cpp
      storage/ndb/src/kernel/vm/mt.hpp
 4816 jonas oreland	2012-01-23
      ndb - assert that extra_sockets is not used by ndbmtd (i.e when using several receiveHandles)

    modified:
      storage/ndb/src/common/transporter/TransporterRegistry.cpp
 4815 jonas oreland	2012-01-23
      ndb - allow 12 LDM too

    modified:
      storage/ndb/src/kernel/vm/mt_thr_config.cpp
      storage/ndb/src/mgmsrv/ConfigInfo.cpp
 4814 jonas oreland	2012-01-23
      ndb - fix CMake don't use "IN ITEMS"

    modified:
      storage/ndb/src/kernel/vm/CMakeLists.txt
=== modified file 'storage/ndb/src/common/transporter/TransporterRegistry.cpp'
--- a/storage/ndb/src/common/transporter/TransporterRegistry.cpp	2012-01-20 07:41:48 +0000
+++ b/storage/ndb/src/common/transporter/TransporterRegistry.cpp	2012-01-23 19:53:55 +0000
@@ -1242,6 +1242,7 @@ TransporterRegistry::poll_TCP(Uint32 tim
   if (extra_socket && recvdata.m_transporters.get(0))
   {
     const NDB_SOCKET_TYPE socket = m_extra_wakeup_sockets[0];
+    assert(&recvdata == receiveHandle); // not used by ndbmtd...
 
     // Poll the wakup-socket for read
     recvdata.m_socket_poller.add(socket, true, false, false);
@@ -1317,6 +1318,7 @@ TransporterRegistry::performReceive(Tran
   if (recvdata.m_has_data_transporters.get(0))
   {
     assert(recvdata.m_transporters.get(0));
+    assert(&recvdata == receiveHandle); // not used by ndbmtd
     recvdata.m_has_data_transporters.clear(Uint32(0));
     consume_extra_sockets();
   }
@@ -1619,7 +1621,7 @@ run_start_clients_C(void * me)
 }
 
 /**
- * This method is used to initiate connection, called from the CMVMI blockx.
+ * This method is used to initiate connection, called from the TRPMAN block.
  *
  * This works asynchronously, no actions are taken directly in the calling
  * thread.
@@ -1652,7 +1654,7 @@ TransporterRegistry::do_connect(NodeId n
 }
 
 /**
- * This method is used to initiate disconnect from CMVMI. It is also called
+ * This method is used to initiate disconnect from TRPMAN. It is also called
  * from the TCP transporter in case of an I/O error on the socket.
  *
  * This works asynchronously, similar to do_connect().

=== modified file 'storage/ndb/src/kernel/blocks/trpman.cpp'
--- a/storage/ndb/src/kernel/blocks/trpman.cpp	2012-01-16 13:56:30 +0000
+++ b/storage/ndb/src/kernel/blocks/trpman.cpp	2012-01-23 20:23:08 +0000
@@ -23,6 +23,8 @@
 #include <signaldata/RouteOrd.hpp>
 #include <signaldata/DumpStateOrd.hpp>
 
+#include <mt.hpp>
+
 Trpman::Trpman(Block_context & ctx, Uint32 instanceno) :
   SimulatedBlock(TRPMAN, ctx, instanceno)
 {
@@ -50,21 +52,17 @@ BLOCK_FUNCTIONS(Trpman)
 #ifdef ERROR_INSERT
 static NodeBitmask c_error_9000_nodes_mask;
 extern Uint32 MAX_RECEIVED_SIGNALS;
-
-class TransporterReceiveHandle *
-mt_get_trp_receive_handle(unsigned instance);
 #endif
 
 bool
 Trpman::handles_this_node(Uint32 nodeId)
 {
-#if MAX_NDBMT_RECEIVE_THREADS == 1
+#ifndef NDBD_MULTITHREADED
   return true;
 #else
   if (globalData.ndbMtReceiveThreads <= (Uint32)1)
     return true;
-  return (instance()==
-          (globalTransporterRegistry.getReceiveThreadId(nodeId) + 1));
+  return (instance()== (mt_get_recv_thread_idx(nodeId) + /* proxy */ 1));
 #endif
 }
 
@@ -780,10 +778,10 @@ TrpmanProxy::execROUTE_ORD(Signal* signa
   jamEntry();
 
   ndbassert(nodeId != 0);
-#if MAX_NDBMT_RECEIVE_THREADS == 1
+#ifndef NDBD_MULTITHREADED
   Uint32 workerId = 0;
 #else
-  Uint32 workerId = globalTransporterRegistry.getReceiveThreadId(nodeId);
+  Uint32 workerId = mt_get_recv_thread_idx(nodeId);
 #endif
   SectionHandle handle(this, signal);
   sendSignal(workerRef(workerId), GSN_ROUTE_ORD, signal,

=== modified file 'storage/ndb/src/kernel/vm/TransporterCallback.cpp'
--- a/storage/ndb/src/kernel/vm/TransporterCallback.cpp	2012-01-16 13:56:30 +0000
+++ b/storage/ndb/src/kernel/vm/TransporterCallback.cpp	2012-01-23 20:23:08 +0000
@@ -402,6 +402,10 @@ TransporterReceiveHandleKernel::reportCo
   Uint32 secPtr[3];
   globalScheduler.execute(&signal.header, JBA, signal.theData, secPtr);
 #else
+  /**
+   * The first argument to sendprioa is from which thread number this
+   * signal is sent, it is always sent from a receive thread
+   */
   sendprioa(m_thr_no /* self */,
             &signal.header, signal.theData, NULL);
 #endif
@@ -413,7 +417,6 @@ TransporterReceiveHandleKernel::reportCo
 void
 TransporterReceiveHandleKernel::reportDisconnect(NodeId nodeId, Uint32 errNo)
 {
-
   DBUG_ENTER("reportDisconnect");
 
   SignalT<sizeof(DisconnectRep)/4> signal;
@@ -481,10 +484,24 @@ TransporterReceiveHandleKernel::checkJob
 #ifndef NDBD_MULTITHREADED
   return globalScheduler.checkDoJob();
 #else
-  return mt_checkDoJob();
+  return mt_checkDoJob(m_receiver_thread_idx);
 #endif
 }
 
+#ifdef NDBD_MULTITHREADED
+void
+TransporterReceiveHandleKernel::assign_nodes(NodeId *recv_thread_idx_array)
+{
+  m_transporters.clear(); /* Clear all first */
+  for (Uint32 nodeId = 1; nodeId < MAX_NODES; nodeId++)
+  {
+    if (recv_thread_idx_array[nodeId] == m_receiver_thread_idx)
+      m_transporters.set(nodeId); /* Belongs to our receive thread */
+  }
+  return;
+}
+#endif
+
 void
 TransporterReceiveHandleKernel::transporter_recv_from(NodeId nodeId)
 {

=== modified file 'storage/ndb/src/kernel/vm/TransporterCallbackKernel.hpp'
--- a/storage/ndb/src/kernel/vm/TransporterCallbackKernel.hpp	2012-01-16 13:56:30 +0000
+++ b/storage/ndb/src/kernel/vm/TransporterCallbackKernel.hpp	2012-01-23 20:23:08 +0000
@@ -36,6 +36,11 @@ public:
    *   instance() - 1(proxy)
    */
   Uint32 m_receiver_thread_idx;
+
+  /**
+   * Assign nodes to this TransporterReceiveHandle
+   */
+  void assign_nodes(NodeId *recv_thread_idx_array);
 #endif
 
   /* TransporterCallback interface. */

=== modified file 'storage/ndb/src/kernel/vm/mt.cpp'
--- a/storage/ndb/src/kernel/vm/mt.cpp	2012-01-16 13:56:30 +0000
+++ b/storage/ndb/src/kernel/vm/mt.cpp	2012-01-23 20:23:08 +0000
@@ -89,8 +89,8 @@ static const Uint32 MAX_SIGNALS_BEFORE_W
 /* If this is too small it crashes before first signal. */
 #define MAX_INSTANCES_PER_THREAD (16 + 8 * MAX_NDBMT_LQH_THREADS)
 
-static Uint32 receiver_thread_no = 0;
 static Uint32 num_threads = 0;
+static Uint32 first_receiver_thread_no = 0;
 
 #define NO_SEND_THREAD (MAX_BLOCK_THREADS + MAX_NDBMT_SEND_THREADS + 1)
 
@@ -1526,10 +1526,10 @@ flush_jbb_write_state(thr_data *selfptr)
  * else 1
  */
 static int
-check_job_buffers(struct thr_repository* rep)
+check_job_buffers(struct thr_repository* rep, Uint32 recv_thread_id)
 {
   const Uint32 minfree = (1024 + MIN_SIGNALS_PER_PAGE - 1)/MIN_SIGNALS_PER_PAGE;
-  unsigned thr_no = receiver_thread_no;
+  unsigned thr_no = first_receiver_thread_no + recv_thread_id;
   const thr_data *thrptr = rep->m_thread;
   for (unsigned i = 0; i<num_threads; i++, thrptr++)
   {
@@ -1549,7 +1549,6 @@ check_job_buffers(struct thr_repository*
       return 1;
     }
   }
-
   return 0;
 }
 
@@ -1669,7 +1668,7 @@ trp_callback::reportSendLen(NodeId nodeI
 void
 trp_callback::lock_transporter(NodeId node)
 {
-  Uint32 recv_thread_no = 0;
+  Uint32 recv_thread_idx = mt_get_recv_thread_idx(node);
   struct thr_repository* rep = &g_thr_repository;
   /**
    * Note: take the send lock _first_, so that we will not hold the receive
@@ -1681,31 +1680,31 @@ trp_callback::lock_transporter(NodeId no
    * non-waiting (so we will not block sending on other transporters).
    */
   lock(&rep->m_send_buffers[node].m_send_lock);
-  lock(&rep->m_receive_lock[recv_thread_no]);
+  lock(&rep->m_receive_lock[recv_thread_idx]);
 }
 
 void
 trp_callback::unlock_transporter(NodeId node)
 {
-  Uint32 recv_thread_no = 0;
+  Uint32 recv_thread_idx = mt_get_recv_thread_idx(node);
   struct thr_repository* rep = &g_thr_repository;
-  unlock(&rep->m_receive_lock[recv_thread_no]);
+  unlock(&rep->m_receive_lock[recv_thread_idx]);
   unlock(&rep->m_send_buffers[node].m_send_lock);
 }
 
 int
-mt_checkDoJob()
+mt_checkDoJob(Uint32 recv_thread_idx)
 {
   struct thr_repository* rep = &g_thr_repository;
-  if (unlikely(check_job_buffers(rep)))
+  if (unlikely(check_job_buffers(rep, recv_thread_idx)))
   {
     do 
     {
       /**
        * theoretically (or when we do single threaded by using ndbmtd with
        * all in same thread) we should execute signals here...to 
-       * prevent dead-lock, but...with current ndbmtd only CMVMI runs in
-       * this thread, and other thread is waiting for CMVMI
+       * prevent dead-lock, but...with current ndbmtd only TRPMAN runs in
+       * this thread, and other thread is waiting for TRPMAN 
        * except for QMGR open/close connection, but that is not
        * (i think) sufficient to create a deadlock
        */
@@ -1728,7 +1727,7 @@ mt_checkDoJob()
       NdbSleep_MilliSleep(0);
 #endif
 
-    } while (check_job_buffers(rep));
+    } while (check_job_buffers(rep, recv_thread_idx));
   }
 
   return 0;
@@ -2844,6 +2843,11 @@ aligned_signal(unsigned char signal_buf[
 TransporterReceiveHandleKernel *
   g_trp_receive_handle_ptr[MAX_NDBMT_RECEIVE_THREADS];
 
+/**
+ * Array for mapping nodes to receiver threads and function to access it.
+ */
+static NodeId g_node_to_recv_thr_map[MAX_NODES];
+
 extern "C"
 void *
 mt_receiver_thread_main(void *thr_arg)
@@ -2855,8 +2859,9 @@ mt_receiver_thread_main(void *thr_arg)
   unsigned thr_no = selfptr->m_thr_no;
   Uint32& watchDogCounter = selfptr->m_watchdog_counter;
   Uint32 thrSignalId = 0;
+  const Uint32 recv_thread_idx = thr_no - first_receiver_thread_no;
   bool has_received = false;
-  const unsigned recv_thread_idx = 0;
+  int cnt = 0;
 
   init_thread(selfptr);
   signal = aligned_signal(signal_buf, thr_no);
@@ -2865,6 +2870,7 @@ mt_receiver_thread_main(void *thr_arg)
    * Object that keeps track of our pollReceive-state
    */
   TransporterReceiveHandleKernel recvdata(thr_no, recv_thread_idx);
+  recvdata.assign_nodes(g_node_to_recv_thr_map);
   globalTransporterRegistry.init(recvdata);
 
   /**
@@ -2874,8 +2880,6 @@ mt_receiver_thread_main(void *thr_arg)
 
   while (globalData.theRestartFlag != perform_stop)
   {
-    static int cnt = 0;
-
     if (cnt == 0)
     {
       watchDogCounter = 5;
@@ -2903,7 +2907,7 @@ mt_receiver_thread_main(void *thr_arg)
     has_received = false;
     if (globalTransporterRegistry.pollReceive(1, recvdata))
     {
-      if (check_job_buffers(rep) == 0)
+      if (check_job_buffers(rep, recv_thread_idx) == 0)
       {
 	watchDogCounter = 8;
         lock(&rep->m_receive_lock[recv_thread_idx]);
@@ -2970,7 +2974,7 @@ check_job_buffer_full(thr_data *selfptr)
  *   In order to prevent "job-buffer-full", i.e
  *     that one thread(T1) produces so much signals to another thread(T2)
  *     so that the ring-buffer from T1 to T2 gets full
- *     the mainlop have 2 "config" variables
+ *     the main loop have 2 "config" variables
  *   - m_max_exec_signals
  *     This is the *total* no of signals T1 can execute before calling
  *     this method again
@@ -3170,7 +3174,7 @@ sendlocal(Uint32 self, const SignalHeade
    * to the other thread.
    * This parameter found to be reasonable by benchmarking.
    */
-  Uint32 MAX_SIGNALS_BEFORE_FLUSH = (self == receiver_thread_no) ? 
+  Uint32 MAX_SIGNALS_BEFORE_FLUSH = (self >= first_receiver_thread_no) ? 
     MAX_SIGNALS_BEFORE_FLUSH_RECEIVER : 
     MAX_SIGNALS_BEFORE_FLUSH_OTHER;
 
@@ -3541,9 +3545,9 @@ ThreadConfig::init()
   Uint32 num_lqh_threads = globalData.ndbMtLqhThreads;
   Uint32 num_tc_threads = globalData.ndbMtTcThreads;
   Uint32 num_recv_threads = globalData.ndbMtReceiveThreads;
-
-  receiver_thread_no = NUM_MAIN_THREADS + num_tc_threads + num_lqh_threads;
-  num_threads = receiver_thread_no + num_recv_threads;
+  first_receiver_thread_no =
+    NUM_MAIN_THREADS + num_tc_threads + num_lqh_threads;
+  num_threads = first_receiver_thread_no + num_recv_threads;
   require(num_threads <= MAX_BLOCK_THREADS);
 
   ndbout << "NDBMT: number of block threads=" << num_threads << endl;
@@ -3565,6 +3569,45 @@ setcpuaffinity(struct thr_repository* re
   }
 }
 
+/**
+ * return receiver thread handling a particular node
+ *   returned number is indexed from 0 and upwards to #receiver threads
+ *   (or MAX_NODES is none)
+ */
+Uint32
+mt_get_recv_thread_idx(NodeId nodeId)
+{
+  assert(nodeId < NDB_ARRAY_SIZE(g_node_to_recv_thr_map));
+  return g_node_to_recv_thr_map[nodeId];
+}
+
+static
+void
+assign_receiver_threads(void)
+{
+  Uint32 num_recv_threads = globalData.ndbMtReceiveThreads;
+  Uint32 recv_thread_idx = 0;
+  for (Uint32 nodeId = 1; nodeId < MAX_NODES; nodeId++)
+  {
+    Transporter *node_trp =
+      globalTransporterRegistry.get_transporter(nodeId);
+
+    if (node_trp)
+    {
+      g_node_to_recv_thr_map[nodeId] = recv_thread_idx;
+      recv_thread_idx++;
+      if (recv_thread_idx == num_recv_threads)
+        recv_thread_idx = 0;
+    }
+    else
+    {
+      /* Flag for no transporter */
+      g_node_to_recv_thr_map[nodeId] = MAX_NODES;
+    }
+  }
+  return;
+}
+
 void
 ThreadConfig::ipControlLoop(NdbThread* pThis, Uint32 thread_index)
 {
@@ -3576,38 +3619,59 @@ ThreadConfig::ipControlLoop(NdbThread* p
    */
   setcpuaffinity(rep);
 
+  /**
+   * assign nodes to receiver threads
+   */
+  assign_receiver_threads();
+
   /*
    * Start threads for all execution threads, except for the receiver
    * thread, which runs in the main thread.
    */
+
   for (thr_no = 0; thr_no < num_threads; thr_no++)
   {
     rep->m_thread[thr_no].m_time = NdbTick_CurrentMillisecond();
 
-    if (thr_no == receiver_thread_no)
+    if (thr_no == first_receiver_thread_no)
       continue;                 // Will run in the main thread.
 
     /*
      * The NdbThread_Create() takes void **, but that is cast to void * when
      * passed to the thread function. Which is kind of strange ...
      */
-    rep->m_thread[thr_no].m_thread =
-      NdbThread_Create(mt_job_thread_main,
-                       (void **)(rep->m_thread + thr_no),
-                       1024*1024,
-                       "execute thread", //ToDo add number
-                       NDB_THREAD_PRIO_MEAN);
-    require(rep->m_thread[thr_no].m_thread != NULL);
+    if (thr_no < first_receiver_thread_no)
+    {
+      /* Start block threads */
+      rep->m_thread[thr_no].m_thread =
+        NdbThread_Create(mt_job_thread_main,
+                         (void **)(rep->m_thread + thr_no),
+                         1024*1024,
+                         "execute thread", //ToDo add number
+                         NDB_THREAD_PRIO_MEAN);
+      require(rep->m_thread[thr_no].m_thread != NULL);
+    }
+    else
+    {
+      /* Start a receiver thread, also block thread for TRPMAN */
+      rep->m_thread[thr_no].m_thread =
+        NdbThread_Create(mt_receiver_thread_main,
+                         (void **)(rep->m_thread + thr_no),
+                         1024*1024,
+                         "receive thread", //ToDo add number
+                         NDB_THREAD_PRIO_MEAN);
+      require(rep->m_thread[thr_no].m_thread != NULL);
+    }
   }
 
-  /* Now run the main loop for thread 0 directly. */
-  rep->m_thread[receiver_thread_no].m_thread = pThis;
-  mt_receiver_thread_main(&(rep->m_thread[receiver_thread_no]));
+  /* Now run the main loop for first receiver thread directly. */
+  rep->m_thread[first_receiver_thread_no].m_thread = pThis;
+  mt_receiver_thread_main(&(rep->m_thread[first_receiver_thread_no]));
 
   /* Wait for all threads to shutdown. */
   for (thr_no = 0; thr_no < num_threads; thr_no++)
   {
-    if (thr_no == receiver_thread_no)
+    if (thr_no == first_receiver_thread_no)
       continue;
     void *dummy_return_status;
     NdbThread_WaitFor(rep->m_thread[thr_no].m_thread, &dummy_return_status);

=== modified file 'storage/ndb/src/kernel/vm/mt.hpp'
--- a/storage/ndb/src/kernel/vm/mt.hpp	2012-01-16 13:56:30 +0000
+++ b/storage/ndb/src/kernel/vm/mt.hpp	2012-01-23 20:23:08 +0000
@@ -47,7 +47,7 @@ SendStatus mt_send_remote(Uint32 self, c
 void mt_section_lock();
 void mt_section_unlock();
 
-int mt_checkDoJob();
+int mt_checkDoJob(Uint32 receiver_thread_idx);
 
 /**
  * Are we (not) multi threaded
@@ -109,4 +109,12 @@ mt_get_thr_stat(class SimulatedBlock *,
 class TransporterReceiveHandle *
 mt_get_trp_receive_handle(unsigned instance);
 
+/**
+ * return receiver thread handling a particular node
+ *   returned number is indexed from 0 and upwards to #receiver threads
+ *   (or MAX_NODES is none)
+ */
+Uint32
+mt_get_recv_thread_idx(NodeId nodeId);
+
 #endif

=== modified file 'storage/ndb/src/kernel/vm/mt_thr_config.cpp'
--- a/storage/ndb/src/kernel/vm/mt_thr_config.cpp	2012-01-04 11:13:53 +0000
+++ b/storage/ndb/src/kernel/vm/mt_thr_config.cpp	2012-01-23 19:50:55 +0000
@@ -462,15 +462,16 @@ THRConfig::do_validate()
   }
 
   /**
-   * LDM can be 1 2 4 8 16
+   * LDM can be 1 2 4 8 12 16
    */
   if (m_threads[T_LDM].size() != 1 &&
       m_threads[T_LDM].size() != 2 &&
       m_threads[T_LDM].size() != 4 &&
       m_threads[T_LDM].size() != 8 &&
+      m_threads[T_LDM].size() != 12 &&
       m_threads[T_LDM].size() != 16)
   {
-    m_err_msg.assfmt("No of LDM-instances can be 1,2,4,8,16. Specified: %u",
+    m_err_msg.assfmt("No of LDM-instances can be 1,2,4,8,12,16. Specified: %u",
                      m_threads[T_LDM].size());
     return -1;
   }

=== modified file 'storage/ndb/src/mgmsrv/ConfigInfo.cpp'
--- a/storage/ndb/src/mgmsrv/ConfigInfo.cpp	2012-01-12 08:40:08 +0000
+++ b/storage/ndb/src/mgmsrv/ConfigInfo.cpp	2012-01-23 19:50:55 +0000
@@ -4828,6 +4828,7 @@ check_2n_number_less_16(Uint32 num)
     case 2:
     case 4:
     case 8:
+    case 12:
     case 16:
       return true;
     default:
@@ -4861,13 +4862,13 @@ checkThreadConfig(InitConfigFileParser::
 
   if (!check_2n_number_less_16(lqhThreads))
   {
-    ctx.reportError("NumLqhThreads must be 0, 1,2,4,8 or 16");
+    ctx.reportError("NumLqhThreads must be 0, 1,2,4,8,12 or 16");
     return false;
   }
   if (!check_2n_number_less_16(ndbLogParts) ||
       ndbLogParts < 4)
   {
-    ctx.reportError("NoOfLogParts must be 4,8 or 16");
+    ctx.reportError("NoOfLogParts must be 4,8,12 or 16");
     return false;
   }
   if (ctx.m_currentSection->get("ThreadConfig", &thrconfig))

=== modified file 'storage/ndb/test/ndbapi/flexAsynch.cpp'
--- a/storage/ndb/test/ndbapi/flexAsynch.cpp	2011-11-11 08:35:14 +0000
+++ b/storage/ndb/test/ndbapi/flexAsynch.cpp	2012-01-23 20:25:28 +0000
@@ -36,9 +36,10 @@
 #define MAX_PARTS 4 
 #define MAX_SEEK 16 
 #define MAXSTRLEN 16 
-#define MAXATTR 64
+#define MAXATTR 511 
 #define MAXTABLES 1
 #define NDB_MAXTHREADS 128
+#define NDB_MAX_NODES 48
 /*
   NDB_MAXTHREADS used to be just MAXTHREADS, which collides with a
   #define from <sys/thread.h> on AIX (IBM compiler).  We explicitly
@@ -93,8 +94,8 @@ static bool executeTransLoop(ThreadNdb*
 static void executeCallback(int result, NdbConnection* NdbObject,
                             void* aObject);
 static bool error_handler(const NdbError & err);
-static Uint32 getKey(Uint32, Uint32) ;
 static void input_error();
+static Uint32 get_my_node_id(Uint32 tableNo, Uint32 threadNo);
 
 
 static int                              retry_opt = 3 ;
@@ -108,7 +109,10 @@ static int
 static longlong                 ThreadExecutions[NDB_MAXTHREADS];
 static StartType                ThreadStart[NDB_MAXTHREADS];
 static char                             tableName[MAXTABLES][MAXSTRLEN+1];
+static const NdbDictionary::Table *     tables[MAXTABLES];
 static char                             attrName[MAXATTR][MAXSTRLEN+1];
+static bool                             nodeTableArray[MAXTABLES][NDB_MAX_NODES + 1];
+static Uint32                           numberNodeTable[MAXTABLES];
 static RunType                          tRunType = RunAll;
 static int                              tStdTableNum = 0;
 static int                              tWarmupTime = 10; //Seconds
@@ -119,8 +123,7 @@ static int
 static NdbRecord * g_record[MAXTABLES];
 static bool tNdbRecord = false;
 
-static bool                             tLocal = false;
-static int                              tLocalPart = 0;
+static int                              tLocal = 0;
 static int                              tSendForce = 0;
 static int                              tNoOfLoops = 1;
 static int                              tAttributeSize = 1;
@@ -606,19 +609,18 @@ NDB_COMMAND(flexAsynch, "flexAsynch", "f
       tRunType == RunUpdate ||
       tRunType == RunDelete)
   {
-    longlong total_executions = 0;
-    longlong total_transactions;
+    longlong total_transactions = 0;
     longlong exec_time;
 
     if (tRunType == RunInsert || tRunType == RunDelete) {
-      total_executions = (longlong)tNoOfTransactions;
-      total_executions *= (longlong)tNoOfThreads;
+      total_transactions = (longlong)tNoOfTransactions;
+      total_transactions *= (longlong)tNoOfThreads;
+      total_transactions *= (longlong)tNoOfParallelTrans;
     } else {
       for (Uint32 i = 0; i < tNoOfThreads; i++){
-        total_executions += ThreadExecutions[i];
+        total_transactions += ThreadExecutions[i];
       }
     }
-    total_transactions = total_executions * tNoOfParallelTrans;
     if (tRunType == RunInsert || tRunType == RunDelete) {
       exec_time = (longlong)timer.elapsedTime();
     } else {
@@ -654,9 +656,10 @@ threadLoop(void* ThreadData)
   ThreadNdb* tabThread = (ThreadNdb*)ThreadData;
   int threadNo = tabThread->ThreadNo;
   localNdb = new Ndb(g_cluster_connection+(threadNo % tConnections), "TEST_DB");
-  localNdb->init(1024);
+  localNdb->init(MAXPAR);
   localNdb->waitUntilReady(10000);
-  unsigned int threadBase = (threadNo << 16);
+  unsigned int threadBase = threadNo;
+
   
   for (;;){
     while (ThreadStart[threadNo] == stIdle) {
@@ -671,11 +674,18 @@ threadLoop(void* ThreadData)
     tType = ThreadStart[threadNo];
     ThreadStart[threadNo] = stIdle;
     if (tRunType == RunAll || tRunType == RunInsert || tRunType == RunDelete){
-      if(!executeThread(tabThread, tType, localNdb, threadBase)){
+      if(!executeThread(tabThread,
+                        tType,
+                        localNdb,
+                        threadBase)){
         break;
       }
     } else {
-      if(!executeTransLoop(tabThread, tType, localNdb, threadBase, threadNo)){
+      if(!executeTransLoop(tabThread,
+                           tType,
+                           localNdb,
+                           threadBase,
+                           threadNo)){
         break;
       }
     }
@@ -691,81 +701,142 @@ threadLoop(void* ThreadData)
 static int error_count = 0;
 
 static bool
+update_num_ops(Uint32 *num_ops, NdbConnection **tConArray)
+{
+  /*
+    Move num_ops forward to next unused position, can be old
+    transactions still outstanding
+  */
+  for ( ; *num_ops < tNoOfParallelTrans; (*num_ops)++)
+  {
+    if (tConArray[*num_ops])
+      continue;
+    else
+      break;
+  }
+  if (*num_ops == tNoOfParallelTrans)
+    return true;
+  return false;
+}
+
+static int
 executeTrans(ThreadNdb* pThread,
              StartType aType,
              Ndb* aNdbObject,
              unsigned int threadBase,
-             unsigned int i)
+             unsigned int record,
+             Uint32 nodeId,
+             NdbConnection **tConArray,
+             bool execute_all)
 {
-  NdbConnection* tConArray[1024];
   unsigned int tBase;
   unsigned int tBase2;
+  Uint32 threadBaseLoc, threadBaseLoc2;
+  Uint32 num_ops = 0;
+  Uint32 i, loops;
 
-  if (tLocal == false) {
-    tBase = i * tNoOfParallelTrans * tNoOfOpsPerTrans;
-  } else {
-    tBase = i * tNoOfParallelTrans * MAX_SEEK;
-  }//if
   START_REAL_TIME;
-  for (unsigned int j = 0; j < tNoOfParallelTrans; j++) {
-    if (tLocal == false) {
+  for (i = record, loops = 0;
+       i < tNoOfTransactions &&
+       loops < 16 &&
+       num_ops < tNoOfParallelTrans;
+       i++, loops++)
+  {
+    tBase = i * tNoOfParallelTrans * tNoOfOpsPerTrans;
+    threadBaseLoc = (threadBase * tNoOfTransactions * tNoOfParallelTrans) +
+                    (i * tNoOfParallelTrans);
+    for (unsigned int j = 0; j < tNoOfParallelTrans; j++) {
+      if (update_num_ops(&num_ops, tConArray))
+        break;
+      threadBaseLoc2 = threadBaseLoc + j;
       tBase2 = tBase + (j * tNoOfOpsPerTrans);
-    } else {
-      tBase2 = tBase + (j * MAX_SEEK);
-      tBase2 = getKey(threadBase, tBase2);
-    }//if
-    if (startTransGuess == true) {
-      union {
-        Uint64 Tkey64;
-        Uint32 Tkey32[2];
-      };
-      Tkey32[0] = threadBase;
-      Tkey32[1] = tBase2;
-      tConArray[j] = aNdbObject->startTransaction((Uint32)0, //Priority
-                                                  (const char*)&Tkey64, //Main PKey
-                                                  (Uint32)4);           //Key Length
-    } else {
-      tConArray[j] = aNdbObject->startTransaction();
-    }//if
-    if (tConArray[j] == NULL){
-      error_handler(aNdbObject->getNdbError());
-      ndbout << endl << "Unable to recover! Quiting now" << endl ;
-      return false;
-    }//if
-    
-    for (unsigned int k = 0; k < tNoOfOpsPerTrans; k++) {
-      //-------------------------------------------------------
-      // Define the operation, but do not execute it yet.
-      //-------------------------------------------------------
-      if (tNdbRecord)
-        defineNdbRecordOperation(pThread, 
-                                 tConArray[j], aType, threadBase,(tBase2+k));
+      if (startTransGuess == true) {
+        union {
+          Uint64 _align;
+          Uint32 Tkey32[2];
+        };
+        (void)_align;
+
+        Tkey32[0] = threadBaseLoc2;
+        Tkey32[1] = tBase2;
+        Ndb::Key_part_ptr hint[2];
+        hint[0].ptr = Tkey32+0;
+        hint[0].len = 4;
+        hint[1].ptr = 0;
+        hint[1].len = 0;
+
+        tConArray[num_ops] = aNdbObject->startTransaction(tables[0], hint);
+      }
       else
-        defineOperation(tConArray[j], aType, threadBase, (tBase2 + k));
-    }//for
+      {
+        tConArray[num_ops] = aNdbObject->startTransaction();
+      }
+
+      if (tConArray[num_ops] == NULL){
+        error_handler(aNdbObject->getNdbError());
+        ndbout << endl << "Unable to recover! Quitting now" << endl ;
+        return -1;
+      }//if
+   
+      if (nodeId != 0 &&
+          tConArray[num_ops]->getConnectedNodeId() != nodeId)
+      {
+        /*
+          We're running only local operations, this won't be local,
+          ignore this record
+        */
+        aNdbObject->closeTransaction(tConArray[num_ops]);
+        continue;
+      }
+      for (unsigned int k = 0; k < tNoOfOpsPerTrans; k++) {
+        //-------------------------------------------------------
+        // Define the operation, but do not execute it yet.
+        //-------------------------------------------------------
+        if (tNdbRecord)
+          defineNdbRecordOperation(pThread, 
+                                   tConArray[num_ops],
+                                   aType,
+                                   threadBaseLoc2,
+                                   (tBase2+k));
+        else
+          defineOperation(tConArray[num_ops],
+                          aType,
+                          threadBaseLoc2,
+                          (tBase2 + k));
+      }//for
     
-    tConArray[j]->executeAsynchPrepare(Commit, &executeCallback, NULL);
+      tConArray[num_ops]->executeAsynchPrepare(Commit,
+                                               &executeCallback,
+                                               (void*)&tConArray[num_ops]);
+      num_ops++;
+    }//for
   }//for
   STOP_REAL_TIME;
+  if (num_ops == 0)
+    return 0;
   //-------------------------------------------------------
   // Now we have defined a set of operations, it is now time
-  // to execute all of them.
+  // to execute all of them. If execute_all isn't set, we
+  // only execute at least half of them. In this manner we
+  // can cater for different execution speeds in different
+  // parts of the system.
   //-------------------------------------------------------
-  int Tcomp = aNdbObject->sendPollNdb(3000, 0, 0);
-  while (unsigned(Tcomp) < tNoOfParallelTrans) {
-    int TlocalComp = aNdbObject->pollNdb(3000, 0);
+  int min_execs = execute_all ? (int)num_ops :
+                     (num_ops > 1 ? (int)(num_ops / 2) : 1);
+  int Tcomp = aNdbObject->sendPollNdb(3000,
+                                      min_execs,
+                                      tSendForce);
+  while (Tcomp < min_execs) {
+    int TlocalComp = aNdbObject->pollNdb(3000, min_execs - Tcomp);
     Tcomp += TlocalComp;
   }//while
-  for (unsigned int j = 0 ; j < tNoOfParallelTrans ; j++) {
-    if (aNdbObject->getNdbError().code != 0 && error_count < 10000){
-      error_count++;
-      ndbout << "i = " << i << ", j = " << j << ", error = ";
-      ndbout << aNdbObject->getNdbError().code << ", threadBase = ";
-      ndbout << hex << threadBase << endl;
-    }
-    aNdbObject->closeTransaction(tConArray[j]);
-  }//for
-  return true;
+  if (aNdbObject->getNdbError().code != 0 && error_count < 10000){
+    error_count++;
+    ndbout << "i = " << i << ", error = ";
+    ndbout << aNdbObject->getNdbError().code << ", threadBase = ";
+    ndbout << hex << threadBase << endl;
+  }
+  return Tcomp;
 }
 
 static 
@@ -779,28 +850,66 @@ executeTransLoop(ThreadNdb* pThread,
   int time_expired;
   longlong executions = 0;
   unsigned int i = 0;
+  Uint32 nodeId;
+  int ops = 0;
+  int record;
+  Uint32 local_count = 0;
+  bool execute_all = true;
   DEFINE_TIMER;
+  NdbConnection* tConArray[MAXPAR];
 
+  for (Uint32 i = 0; i < MAXPAR; i++) tConArray[i] = NULL;
+  if (tLocal > 0)
+  {
+    nodeId = get_my_node_id((Uint32)0, threadBase);
+  }
+  else
+    nodeId = 0;
   ThreadExecutions[threadNo] = 0;
   START_TIMER;
   do
   {
-    if (!executeTrans(pThread, aType, aNdbObject, threadBase, i++))
+    if (tLocal == 2)
+    {
+      /* Select node on round robin basis */
+      local_count++;
+      nodeId = get_my_node_id((Uint32)0, local_count);
+    }
+    else if (tLocal == 3)
+    {
+      /* Select node on random basis */
+      local_count = (Uint32)(rand() % numberNodeTable[0]);
+      nodeId = get_my_node_id((Uint32)0, local_count);
+    }
+    record = rand() % tNoOfTransactions;
+    if ((ops = executeTrans(pThread,
+                            aType,
+                            aNdbObject,
+                            threadBase,
+                            (Uint32)record,
+                            nodeId,
+                            tConArray,
+                            execute_all)) < 0)
       return false;
     STOP_TIMER;
+    if (!continue_flag)
+      break;
     time_expired = (int)(timer.elapsedTime() / 1000);
     if (time_expired < tWarmupTime)
       ; //Do nothing
     else if (time_expired < (tWarmupTime + tExecutionTime)){
-      executions++; //Count measurement
+      executions += ops; //Count measurement
     }
     else if (time_expired < (tWarmupTime + tExecutionTime + tCooldownTime))
       ; //Do nothing
     else
+    {
+      execute_all = true;
       continue_flag = false; //Time expired
+    }
     if (i == tNoOfTransactions) /* Make sure the record exists */
       i = 0;
-  } while (continue_flag);
+  } while (1);
   ThreadExecutions[threadNo] = executions;
   return true;
 }//executeTransLoop()
@@ -808,39 +917,32 @@ executeTransLoop(ThreadNdb* pThread,
 static 
 bool
 executeThread(ThreadNdb* pThread, 
-	      StartType aType, Ndb* aNdbObject, unsigned int threadBase) {
+	      StartType aType,
+              Ndb* aNdbObject,
+              unsigned int threadBase) {
+  NdbConnection* tConArray[MAXPAR];
+
+  for (Uint32 i = 0; i < MAXPAR; i++) tConArray[i] = NULL;
   for (unsigned int i = 0; i < tNoOfTransactions; i++) {
-    if (!executeTrans(pThread, aType, aNdbObject, threadBase, i))
+    if ((executeTrans(pThread,
+                      aType,
+                      aNdbObject,
+                      threadBase,
+                      i,
+                      (Uint32)0,
+                      tConArray,
+                      true)) < 0)
       return false;
   }//for
   return true;
 }//executeThread()
 
-static 
-Uint32
-getKey(Uint32 aBase, Uint32 anIndex) {
-  Uint32 Tfound = anIndex;
-  union {
-    Uint64 Tkey64;
-    Uint32 Tkey32[2];
-  };
-  Tkey32[0] = aBase;
-  Uint32 hash;
-  for (Uint32 i = anIndex; i < (anIndex + MAX_SEEK); i++) {
-    Tkey32[1] = (Uint32)i;
-    hash = md5_hash((Uint64*)&Tkey64, (Uint32)2);
-    hash = (hash >> 6) & (MAX_PARTS - 1);
-    if (hash == unsigned(tLocalPart)) {
-      Tfound = i;
-      break;
-    }//if
-  }//for
-  return Tfound;
-}//getKey()
-
 static void
 executeCallback(int result, NdbConnection* NdbObject, void* aObject)
 {
+  NdbConnection **array_ref = (NdbConnection**)aObject;
+  assert(NdbObject == *array_ref);
+  *array_ref = NULL;
   if (result == -1) {
 
               // Add complete error handling here
@@ -860,8 +962,8 @@ executeCallback(int result, NdbConnectio
 	      //    ndbout << "Error occured in poll:" << endl;
 	      //    ndbout << NdbObject->getNdbError() << endl;
     failed++ ;
-    return;
   }//if
+  NdbObject->close(); /* Close transaction */
   return;
 }//executeCallback()
 
@@ -929,14 +1031,16 @@ defineOperation(NdbConnection* localNdbC
     error_handler(localNdbOperation->getNdbError()); 
   }//default
   }//switch
-  localNdbOperation->equal((Uint32)0,(char*)&attrValue[0]);
+
+  localNdbOperation->equal((Uint32)0, (char*)(attrValue + 0));
+  localNdbOperation->equal((Uint32)1, (char*)(attrValue + 1));
   switch (aType) {
   case stInsert:      // Insert case
   case stUpdate:      // Update Case
     {
       for (countAttributes = 1;
            countAttributes < loopCountAttributes; countAttributes++) {
-        localNdbOperation->setValue(countAttributes, 
+        localNdbOperation->setValue(countAttributes + 1,
                                     (char*)&attrValue[0]);
       }//for
       break;
@@ -944,7 +1048,7 @@ defineOperation(NdbConnection* localNdbC
   case stRead: {      // Read Case
     for (countAttributes = 1;
          countAttributes < loopCountAttributes; countAttributes++) {
-      localNdbOperation->getValue(countAttributes, 
+      localNdbOperation->getValue(countAttributes + 1,
                                   (char*)&attrValue[0]);
     }//for
     break;
@@ -1059,86 +1163,126 @@ dropTables(Ndb* pMyNdb)
   }
 }
 
+/*
+  Set up nodeTableArray with a boolean true for all nodes that
+  contains the table.
+*/
+static int
+setUpNodeTableArray(Uint32 tableNo, const NdbDictionary::Table *pTab)
+{
+  Uint32 numFragments = pTab->getFragmentCount();
+  Uint32 nodeId;
+  for (Uint32 i = 1; i <= NDB_MAX_NODES; i++)
+    nodeTableArray[tableNo][i] = false;
+  for (Uint32 i = 0; i < numFragments; i++)
+  {
+    if ((pTab->getFragmentNodes(i, &nodeId, (Uint32)1)) == 0)
+    {
+      return 1;
+    }
+    nodeTableArray[tableNo][nodeId] = true;
+  }
+  numberNodeTable[tableNo] = 0;
+  for (Uint32 i = 1; i <= NDB_MAX_NODES; i++)
+  {
+    if (nodeTableArray[tableNo][i])
+      numberNodeTable[tableNo]++;
+  }
+  return 0;
+}
+
+static Uint32
+get_my_node_id(Uint32 tableNo, Uint32 threadNo)
+{
+  Uint32 count = 0;
+  Uint32 n = threadNo % numberNodeTable[tableNo];
+  for (Uint32 i = 1; i <= NDB_MAX_NODES; i++)
+  {
+    if (nodeTableArray[tableNo][i])
+    {
+      if (count == n)
+        return i;
+      count++;
+    }
+  }
+  return 0;
+}
+
 static
 int 
 createTables(Ndb* pMyNdb){
 
-  NdbSchemaCon          *MySchemaTransaction;
-  NdbSchemaOp           *MySchemaOp;
-  int                   check;
-
-  if (theTableCreateFlag == 0 || tRunType == RunCreateTable) {
-    for(int i=0; i < MAXTABLES ;i++) {
+  NdbDictionary::Dictionary* pDict = pMyNdb->getDictionary();
+  if (theTableCreateFlag == 0 || tRunType == RunCreateTable)
+  {
+    for(int i=0; i < MAXTABLES ;i++)
+    {
       ndbout << "Creating " << tableName[i] << "..." << endl;
-      MySchemaTransaction = NdbSchemaCon::startSchemaTrans(pMyNdb);
-      
-      if(MySchemaTransaction == NULL && 
-         (!error_handler(MySchemaTransaction->getNdbError())))
-        return -1;
-      
-      MySchemaOp = MySchemaTransaction->getNdbSchemaOp();       
-      if(MySchemaOp == NULL &&
-         (!error_handler(MySchemaTransaction->getNdbError())))
-        return -1;
 
+      NdbDictionary::Table tab;
+      tab.setName(tableName[i]);
+      if (tempTable)
+      {
+        tab.setLogging(false);
+      }
 
-      check = MySchemaOp->createTable( tableName[i]
-                                       ,8                       // Table Size
-                                       ,TupleKey                // Key Type
-                                       ,40                      // Nr of Pages
-                                       ,All
-                                       ,6
-                                       ,(tLoadFactor - 5)
-                                       ,(tLoadFactor)
-                                       ,1
-                                       ,!tempTable
-                                       );
-      
-      if (check == -1 &&
-          (!error_handler(MySchemaTransaction->getNdbError())))
-        return -1;
-      
-      check = MySchemaOp->createAttribute( (char*)attrName[0],
-                                           TupleKey,
-                                           32,
-                                           PKSIZE,
-                                           UnSigned,
-                                           MMBased,
-                                           NotNullAttribute );
-      
-      if (check == -1 &&
-          (!error_handler(MySchemaTransaction->getNdbError())))
-        return -1;
-      for (unsigned j = 1; j < tNoOfAttributes ; j++){
-        check = MySchemaOp->createAttribute( (char*)attrName[j],
-                                             NoKey,
-                                             32,
-                                             tAttributeSize,
-                                             UnSigned,
-                                             MMBased,
-                                             NotNullAttribute );
-        if (check == -1 &&
-            (!error_handler(MySchemaTransaction->getNdbError())))
-          return -1;
+      {
+        NdbDictionary::Column distkey;
+        distkey.setName("DISTKEY");
+        distkey.setType(NdbDictionary::Column::Unsigned);
+        distkey.setPrimaryKey(true);
+        distkey.setDistributionKey(true);
+        tab.addColumn(distkey);
       }
-      
-      if (MySchemaTransaction->execute() == -1 &&
-          (!error_handler(MySchemaTransaction->getNdbError())))
+
+      {
+        NdbDictionary::Column pk;
+        pk.setName(attrName[0]);
+        pk.setType(NdbDictionary::Column::Unsigned);
+        pk.setPrimaryKey(true);
+        tab.addColumn(pk);
+      }
+
+      for (unsigned j = 1; j < tNoOfAttributes ; j++)
+      {
+        NdbDictionary::Column col;
+        col.setName(attrName[j]);
+        col.setType(NdbDictionary::Column::Unsigned);
+        col.setLength(tAttributeSize);
+        tab.addColumn(col);
+      }
+
+      int res = pDict->createTable(tab);
+      if (res != 0)
+      {
+        ndbout << pDict->getNdbError() << endl;
         return -1;
-      
-      NdbSchemaCon::closeSchemaTrans(MySchemaTransaction);
+      }
+    }
+  }
+
+  for(int i=0; i < MAXTABLES ;i++)
+  {
+    const NdbDictionary::Table * pTab = pDict->getTable(tableName[i]);
+    if (pTab == NULL)
+    {
+      error_handler(pDict->getNdbError());
+      return -1;
+    }
+    tables[i] = pTab;
+    if (setUpNodeTableArray(i, pTab))
+    {
+      error_handler(pDict->getNdbError());
+      return -1;
     }
   }
+
   if (tNdbRecord)
   {
-    for(int i=0; i < MAXTABLES ;i++) {
-      NdbDictionary::Dictionary* pDict = pMyNdb->getDictionary();
-      const NdbDictionary::Table * pTab = pDict->getTable(tableName[i]);
+    for(int i=0; i < MAXTABLES ;i++)
+    {
+      const NdbDictionary::Table * pTab = tables[i];
 
-      if (pTab == NULL){
-        error_handler(pDict->getNdbError());
-        return -1;
-      }
       int off = 0;
       Vector<NdbDictionary::RecordSpecification> spec;
       for (Uint32 j = 0; j<unsigned(pTab->getNoOfColumns()); j++)
@@ -1241,13 +1385,12 @@ readArguments(int argc, const char** arg
         return -1;
       }
     } else if (strcmp(argv[i], "-local") == 0){
-      tLocalPart = atoi(argv[i+1]);
-      tLocal = true;
-      startTransGuess = true;
-      if ((tLocalPart < 0) || (tLocalPart > MAX_PARTS)){
-	ndbout_c("Invalid local part");
+      tLocal = atoi(argv[i+1]);
+      if (tLocal < 1 || (tLocal > 3)){
+        ndbout_c("Invalid local value, only 1,2 or 3 allowed");
         return -1;
       }
+      startTransGuess = true;
     } else if (strcmp(argv[i], "-simple") == 0){
       theSimpleFlag = 1;
       argc++;
@@ -1340,7 +1483,7 @@ readArguments(int argc, const char** arg
     argc -= 2;
     i = i + 2;
   }//while
-  if (tLocal == true) {
+  if (tLocal > 0) {
     if (tNoOfOpsPerTrans != 1) {
       ndbout_c("Not valid to have more than one op per trans with local");
     }//if
@@ -1377,7 +1520,7 @@ input_error(){
   ndbout_c("   -adaptive Use adaptive send algorithm (default)");
   ndbout_c("   -force Force send when communicating");
   ndbout_c("   -non_adaptive Send at a 10 millisecond interval");
-  ndbout_c("   -local Number of part, only use keys in one part out of 16");
+  ndbout_c("   -local 1 = each thread its own node, 2 = round robin on node per parallel trans 3 = random node per parallel trans");
   ndbout_c("   -ndbrecord Use NDB Record");
   ndbout_c("   -r Number of extra loops");
   ndbout_c("   -insert Only run inserts on standard table");

No bundle (reason: useless for push emails).
Thread
bzr push into mysql-5.1-telco-7.0 branch (jonas.oreland:4814 to 4818) jonas oreland25 Jan