List:Commits« Previous MessageNext Message »
From:jonas oreland Date:January 4 2012 2:35pm
Subject:bzr push into mysql-5.1-telco-7.1 branch (jonas.oreland:4397 to 4400)
View as plain text  
 4400 jonas oreland	2012-01-04 [merge]
      ndb - merge 70 to 71

    modified:
      storage/ndb/include/kernel/ndb_limits.h
      storage/ndb/src/kernel/blocks/cmvmi/Cmvmi.cpp
      storage/ndb/src/kernel/blocks/trpman.cpp
      storage/ndb/src/kernel/blocks/trpman.hpp
      storage/ndb/src/kernel/vm/SimulatedBlock.hpp
      storage/ndb/src/kernel/vm/TransporterCallback.cpp
      storage/ndb/src/kernel/vm/mt.cpp
      storage/ndb/src/kernel/vm/mt_thr_config.cpp
      storage/ndb/src/kernel/vm/mt_thr_config.hpp
 4399 jonas oreland	2012-01-04 [merge]
      ndb - merge 70 to 71

    modified:
      storage/ndb/include/transporter/TransporterRegistry.hpp
      storage/ndb/src/kernel/vm/ArrayPool.hpp
 4398 jonas oreland	2012-01-04 [merge]
      ndb - merge 70 to 71

    modified:
      storage/ndb/src/kernel/blocks/dbdih/Dbdih.hpp
      storage/ndb/src/kernel/blocks/dbdih/DbdihMain.cpp
      storage/ndb/src/kernel/vm/mt.cpp
 4397 jonas oreland	2012-01-02 [merge]
      ndb - merge 70 to 71

    modified:
      mysql-test/suite/ndb/r/ndb_column_properties.result
      mysql-test/suite/ndb/t/ndb_column_properties.test
      mysql-test/t/ctype_cp932_binlog_stm.test
      sql/ha_ndb_index_stat.cc
=== modified file 'storage/ndb/include/kernel/ndb_limits.h'
--- a/storage/ndb/include/kernel/ndb_limits.h	2011-12-13 18:32:26 +0000
+++ b/storage/ndb/include/kernel/ndb_limits.h	2012-01-04 13:41:41 +0000
@@ -215,6 +215,9 @@
 #define MAX_NDBMT_TC_THREADS  4
 #endif
 
+#define MAX_NDBMT_SEND_THREADS    0
+#define MAX_NDBMT_RECEIVE_THREADS 1
+
 #define NDB_FILE_BUFFER_SIZE (256*1024)
 
 /**

=== modified file 'storage/ndb/include/transporter/TransporterRegistry.hpp'
--- a/storage/ndb/include/transporter/TransporterRegistry.hpp	2011-06-30 15:59:25 +0000
+++ b/storage/ndb/include/transporter/TransporterRegistry.hpp	2012-01-04 08:53:15 +0000
@@ -534,7 +534,8 @@ inline void
 TransporterRegistry::set_status_overloaded(Uint32 nodeId, bool val)
 {
   assert(nodeId < MAX_NODES);
-  m_status_overloaded.set(nodeId, val);
+  if (val != m_status_overloaded.get(nodeId))
+    m_status_overloaded.set(nodeId, val);
 }
 
 inline const NodeBitmask&

=== modified file 'storage/ndb/src/kernel/blocks/cmvmi/Cmvmi.cpp'
--- a/storage/ndb/src/kernel/blocks/cmvmi/Cmvmi.cpp	2011-11-28 08:07:48 +0000
+++ b/storage/ndb/src/kernel/blocks/cmvmi/Cmvmi.cpp	2012-01-04 13:41:41 +0000
@@ -57,6 +57,7 @@ extern int simulate_error_during_shutdow
 // Index pages used by ACC instances
 Uint32 g_acc_pages_used[1 + MAX_NDBMT_LQH_WORKERS];
 
+extern void mt_init_receiver_cache();
 extern void mt_set_section_chunk_size();
 
 Cmvmi::Cmvmi(Block_context& ctx) :
@@ -83,6 +84,7 @@ Cmvmi::Cmvmi(Block_context& ctx) :
   g_sectionSegmentPool.setSize(long_sig_buffer_size,
                                true,true,true,CFG_DB_LONG_SIGNAL_BUFFER);
 
+  mt_init_receiver_cache();
   mt_set_section_chunk_size();
 
   // Add received signals

=== modified file 'storage/ndb/src/kernel/blocks/dbdih/Dbdih.hpp'
--- a/storage/ndb/src/kernel/blocks/dbdih/Dbdih.hpp	2011-12-15 10:09:37 +0000
+++ b/storage/ndb/src/kernel/blocks/dbdih/Dbdih.hpp	2012-01-04 08:34:49 +0000
@@ -1127,7 +1127,9 @@ private:
   void allocStoredReplica(FragmentstorePtr regFragptr,
                           ReplicaRecordPtr& newReplicaPtr,
                           Uint32 nodeId);
-  Uint32 extractNodeInfo(const Fragmentstore * fragPtr, Uint32 nodes[]);
+  Uint32 extractNodeInfo(EmulatedJamBuffer *jambuf,
+                         const Fragmentstore * fragPtr,
+                         Uint32 nodes[]);
   bool findBestLogNode(CreateReplicaRecord* createReplica,
                        FragmentstorePtr regFragptr,
                        Uint32 startGci,

=== modified file 'storage/ndb/src/kernel/blocks/dbdih/DbdihMain.cpp'
--- a/storage/ndb/src/kernel/blocks/dbdih/DbdihMain.cpp	2011-12-15 10:09:37 +0000
+++ b/storage/ndb/src/kernel/blocks/dbdih/DbdihMain.cpp	2012-01-04 08:34:49 +0000
@@ -3846,7 +3846,7 @@ done:
       FragmentstorePtr fragPtr;
       getFragstore(tabPtr.p, takeOverPtr.p->toCurrentFragid, fragPtr);
       Uint32 nodes[MAX_REPLICAS];
-      extractNodeInfo(fragPtr.p, nodes);
+      extractNodeInfo(jamBuffer(), fragPtr.p, nodes);
 
       req->lqhLogNode[0] = nodes[0]; // Source
       req->requestInfo = StartFragReq::SFR_COPY_FRAG;
@@ -4179,7 +4179,7 @@ void Dbdih::toCopyFragLab(Signal* signal
   FragmentstorePtr fragPtr;
   getFragstore(tabPtr.p, takeOverPtr.p->toCurrentFragid, fragPtr);
   Uint32 nodes[MAX_REPLICAS];
-  extractNodeInfo(fragPtr.p, nodes);
+  extractNodeInfo(jamBuffer(), fragPtr.p, nodes);
   takeOverPtr.p->toCopyNode = nodes[0];
   
   PrepareCopyFragReq* req= (PrepareCopyFragReq*)signal->getDataPtrSend();
@@ -4376,7 +4376,7 @@ Dbdih::toStartCopyFrag(Signal* signal, T
   copyFragReq->distributionKey = fragPtr.p->distributionKey;
   copyFragReq->gci = gci;
   Uint32 len = copyFragReq->nodeCount = 
-    extractNodeInfo(fragPtr.p, 
+    extractNodeInfo(jamBuffer(), fragPtr.p, 
                     copyFragReq->nodeList);
   copyFragReq->nodeList[len] = takeOverPtr.p->maxPage;
   copyFragReq->nodeList[len+1] = CopyFragReq::CFR_TRANSACTIONAL;
@@ -9227,7 +9227,7 @@ loop:
     return;
   }
   getFragstore(tabPtr.p, fragId, fragPtr);
-  Uint32 nodeCount = extractNodeInfo(fragPtr.p, conf->nodes);
+  Uint32 nodeCount = extractNodeInfo(jambuf, fragPtr.p, conf->nodes);
   Uint32 sig2 = (nodeCount - 1) + 
     (fragPtr.p->distributionKey << 16) + 
     (dihGetInstanceKey(fragPtr) << 24);
@@ -9240,7 +9240,9 @@ loop:
     thrjam(jambuf);
     conf->reqinfo |= DiGetNodesConf::REORG_MOVING;
     getFragstore(tabPtr.p, newFragId, fragPtr);
-    nodeCount = extractNodeInfo(fragPtr.p, conf->nodes + 2 + MAX_REPLICAS);
+    nodeCount = extractNodeInfo(jambuf,
+                               fragPtr.p,
+                               conf->nodes + 2 + MAX_REPLICAS);
     conf->nodes[MAX_REPLICAS] = newFragId;
     conf->nodes[MAX_REPLICAS + 1] = (nodeCount - 1) +
       (fragPtr.p->distributionKey << 16) +
@@ -9251,18 +9253,20 @@ loop:
     goto loop;
 }//Dbdih::execDIGETNODESREQ()
 
-Uint32 Dbdih::extractNodeInfo(const Fragmentstore * fragPtr, Uint32 nodes[]) 
+Uint32 Dbdih::extractNodeInfo(EmulatedJamBuffer *jambuf,
+                              const Fragmentstore * fragPtr,
+                              Uint32 nodes[]) 
 {
   Uint32 nodeCount = 0;
   nodes[0] = nodes[1] = nodes[2] = nodes[3] = 0;
   for (Uint32 i = 0; i < fragPtr->fragReplicas; i++) {
-    jam();
+    thrjam(jambuf);
     NodeRecordPtr nodePtr;
     ndbrequire(i < MAX_REPLICAS);
     nodePtr.i = fragPtr->activeNodes[i];
     ptrCheckGuard(nodePtr, MAX_NDB_NODES, nodeRecord);
     if (nodePtr.p->useInTransactions) {
-      jam();
+      thrjam(jambuf);
       nodes[nodeCount] = nodePtr.i;
       nodeCount++;
     }//if
@@ -9543,7 +9547,7 @@ void Dbdih::execDIH_SCAN_GET_NODES_REQ(S
   
   Uint32 nodes[MAX_REPLICAS];
   getFragstore(tabPtr.p, fragId, fragPtr);
-  Uint32 count = extractNodeInfo(fragPtr.p, nodes);
+  Uint32 count = extractNodeInfo(jamBuffer(), fragPtr.p, nodes);
 
   DihScanGetNodesConf* conf = (DihScanGetNodesConf*)signal->getDataPtrSend();
   conf->senderData = senderData;
@@ -17519,7 +17523,9 @@ Dbdih::execDUMP_STATE_ORD(Signal* signal
 	getFragstore(tabPtr.p, j, fragPtr);
 	
 	Uint32 nodeOrder[MAX_REPLICAS];
-	const Uint32 noOfReplicas = extractNodeInfo(fragPtr.p, nodeOrder);
+	const Uint32 noOfReplicas = extractNodeInfo(jamBuffer(),
+                                                    fragPtr.p,
+                                                    nodeOrder);
 	char buf[100];
 	BaseString::snprintf(buf, sizeof(buf), " Table %d Fragment %d(%u) LP: %u - ", tabPtr.i, j, dihGetInstanceKey(fragPtr), fragPtr.p->m_log_part_id);
 	for(Uint32 k = 0; k < noOfReplicas; k++){
@@ -18296,7 +18302,9 @@ void Dbdih::execDIH_SWITCH_REPLICA_REQ(S
    * Do funky stuff
    */
   Uint32 oldOrder[MAX_REPLICAS];
-  const Uint32 noOfReplicas = extractNodeInfo(fragPtr.p, oldOrder);
+  const Uint32 noOfReplicas = extractNodeInfo(jamBuffer(),
+                                              fragPtr.p,
+                                              oldOrder);
   
   if (noOfReplicas < req->noOfReplicas) {
     jam();
@@ -18434,7 +18442,9 @@ Dbdih::switchReplica(Signal* signal,
     getFragstore(tabPtr.p, fragNo, fragPtr);
     
     Uint32 oldOrder[MAX_REPLICAS];
-    const Uint32 noOfReplicas = extractNodeInfo(fragPtr.p, oldOrder);
+    const Uint32 noOfReplicas = extractNodeInfo(jamBuffer(),
+                                                fragPtr.p,
+                                                oldOrder);
 
     if(oldOrder[0] != nodeId) {
       jam();

=== modified file 'storage/ndb/src/kernel/blocks/trpman.cpp'
--- a/storage/ndb/src/kernel/blocks/trpman.cpp	2011-11-22 13:03:47 +0000
+++ b/storage/ndb/src/kernel/blocks/trpman.cpp	2012-01-04 12:12:59 +0000
@@ -51,6 +51,13 @@ static NodeBitmask c_error_9000_nodes_ma
 extern Uint32 MAX_RECEIVED_SIGNALS;
 #endif
 
+static
+bool
+handles_this_node(Uint32 nodeId)
+{
+  return true;
+}
+
 void
 Trpman::execOPEN_COMREQ(Signal* signal)
 {
@@ -70,6 +77,12 @@ Trpman::execOPEN_COMREQ(Signal* signal)
 	   && c_error_9000_nodes_mask.get(tStartingNode)))
 #endif
     {
+      if (!handles_this_node(tStartingNode))
+      {
+        jam();
+        goto done;
+      }
+
       globalTransporterRegistry.do_connect(tStartingNode);
       globalTransporterRegistry.setIOState(tStartingNode, HaltIO);
 
@@ -87,7 +100,8 @@ Trpman::execOPEN_COMREQ(Signal* signal)
     for(unsigned int i = 1; i < MAX_NODES; i++ )
     {
       jam();
-      if (i != getOwnNodeId() && getNodeInfo(i).m_type == tData2)
+      if (i != getOwnNodeId() && getNodeInfo(i).m_type == tData2 &&
+          handles_this_node(i))
       {
 	jam();
 
@@ -106,12 +120,13 @@ Trpman::execOPEN_COMREQ(Signal* signal)
     }
   }
 
+done:
   if (userRef != 0)
   {
     jam();
     signal->theData[0] = tStartingNode;
     signal->theData[1] = tData2;
-    sendSignal(userRef, GSN_OPEN_COMCONF, signal, len - 1,JBA);
+    sendSignal(userRef, GSN_OPEN_COMCONF, signal, 2, JBA);
   }
 }
 
@@ -167,9 +182,10 @@ Trpman::execCLOSE_COMREQ(Signal* signal)
 //  Uint32 noOfNodes = closeCom->noOfNodes;
 
   jamEntry();
-  for (unsigned i = 0; i < MAX_NODES; i++)
+  for (unsigned i = 1; i < MAX_NODES; i++)
   {
-    if (NodeBitmask::get(closeCom->theNodes, i))
+    if (NodeBitmask::get(closeCom->theNodes, i) &&
+        handles_this_node(i))
     {
       jam();
 
@@ -204,6 +220,18 @@ Trpman::execCLOSE_COMREQ(Signal* signal)
   }
 }
 
+/*
+  We need to implement CLOSE_COMCONF signal for the non-multithreaded
+  case where message should go to QMGR, for multithreaded case it
+  needs to pass through TRPMAN proxy on its way back.
+*/
+void
+Trpman::execCLOSE_COMCONF(Signal *signal)
+{
+  jamEntry();
+  sendSignal(QMGR_REF, GSN_CLOSE_COMCONF, signal, 19, JBA);
+}
+
 void
 Trpman::execENABLE_COMREQ(Signal* signal)
 {
@@ -225,6 +253,8 @@ Trpman::execENABLE_COMREQ(Signal* signal
       break;
     search_from = tStartingNode + 1;
 
+    if (!handles_this_node(tStartingNode))
+      continue;
     globalTransporterRegistry.setIOState(tStartingNode, NoHalt);
     setNodeInfo(tStartingNode).m_connected = true;
 
@@ -353,6 +383,12 @@ Trpman::execDBINFO_SCANREQ(Signal *signa
 
     while (rnode < MAX_NODES)
     {
+      if (!handles_this_node(rnode))
+      {
+        rnode++;
+        continue;
+      }
+
       switch(getNodeInfo(rnode).m_type)
       {
       default:
@@ -429,14 +465,15 @@ Trpman::execDUMP_STATE_ORD(Signal* signa
     CLEAR_ERROR_INSERT_VALUE;
     if (signal->getLength() == 1 || signal->theData[1])
     {
-      for (Uint32 i = 0; i<MAX_NODES; i++)
+      for (Uint32 i = 1; i<MAX_NODES; i++)
       {
-	if (c_error_9000_nodes_mask.get(i))
-	{
-	  signal->theData[0] = 0;
-	  signal->theData[1] = i;
+        if (c_error_9000_nodes_mask.get(i) &&
+            handles_this_node(i))
+        {
+          signal->theData[0] = 0;
+          signal->theData[1] = i;
           execOPEN_COMREQ(signal);
-	}
+        }
       }
     }
     c_error_9000_nodes_mask.clear();
@@ -452,10 +489,13 @@ Trpman::execDUMP_STATE_ORD(Signal* signa
   if (arg == 9005 && signal->getLength() == 2 && ERROR_INSERTED(9004))
   {
     Uint32 db = signal->theData[1];
-    Uint32 i = c_error_9000_nodes_mask.find(0);
-    signal->theData[0] = i;
-    sendSignal(calcQmgrBlockRef(db),GSN_API_FAILREQ, signal, 1, JBA);
-    ndbout_c("stopping %u using %u", i, db);
+    Uint32 i = c_error_9000_nodes_mask.find(1);
+    if (handles_this_node(i))
+    {
+      signal->theData[0] = i;
+      sendSignal(calcQmgrBlockRef(db),GSN_API_FAILREQ, signal, 1, JBA);
+      ndbout_c("stopping %u using %u", i, db);
+    }
     CLEAR_ERROR_INSERT_VALUE;
   }
 #endif
@@ -482,26 +522,28 @@ Trpman::execDUMP_STATE_ORD(Signal* signa
     for (Uint32 n = 1; n < signal->getLength(); n++)
     {
       Uint32 nodeId = signal->theData[n];
+      if (!handles_this_node(nodeId))
+        continue;
 
       if ((nodeId > 0) &&
           (nodeId < MAX_NODES))
       {
         if (block)
         {
-          ndbout_c("CMVMI : Blocking receive from node %u", nodeId);
+          ndbout_c("TRPMAN : Blocking receive from node %u", nodeId);
 
           globalTransporterRegistry.blockReceive(nodeId);
         }
         else
         {
-          ndbout_c("CMVMI : Unblocking receive from node %u", nodeId);
+          ndbout_c("TRPMAN : Unblocking receive from node %u", nodeId);
 
           globalTransporterRegistry.unblockReceive(nodeId);
         }
       }
       else
       {
-        ndbout_c("CMVMI : Ignoring dump %u for node %u",
+        ndbout_c("TRPMAN : Ignoring dump %u for node %u",
                  arg, nodeId);
       }
     }
@@ -512,12 +554,14 @@ Trpman::execDUMP_STATE_ORD(Signal* signa
     if (signal->getLength() > 1)
     {
       pattern = signal->theData[1];
-      ndbout_c("CMVMI : Blocking receive from all ndbds matching pattern -%s-",
+      ndbout_c("TRPMAN : Blocking receive from all ndbds matching pattern -%s-",
                ((pattern == 1)? "Other side":"Unknown"));
     }
 
     for (Uint32 node = 1; node < MAX_NDB_NODES; node++)
     {
+      if (!handles_this_node(node))
+        continue;
       if (globalTransporterRegistry.is_connected(node))
       {
         if (getNodeInfo(node).m_type == NodeInfo::DB)
@@ -542,7 +586,7 @@ Trpman::execDUMP_STATE_ORD(Signal* signa
             default:
               break;
             }
-            ndbout_c("CMVMI : Blocking receive from node %u", node);
+            ndbout_c("TRPMAN : Blocking receive from node %u", node);
             globalTransporterRegistry.blockReceive(node);
           }
         }
@@ -551,8 +595,10 @@ Trpman::execDUMP_STATE_ORD(Signal* signa
   }
   if (arg == 9991) /* Unblock recv from all blocked */
   {
-    for (Uint32 node = 0; node < MAX_NODES; node++)
+    for (Uint32 node = 1; node < MAX_NODES; node++)
     {
+      if (!handles_this_node(node))
+        continue;
       if (globalTransporterRegistry.isBlocked(node))
       {
         ndbout_c("CMVMI : Unblocking receive from node %u", node);

=== modified file 'storage/ndb/src/kernel/blocks/trpman.hpp'
--- a/storage/ndb/src/kernel/blocks/trpman.hpp	2011-11-22 13:03:47 +0000
+++ b/storage/ndb/src/kernel/blocks/trpman.hpp	2012-01-04 12:12:59 +0000
@@ -30,6 +30,7 @@ public:
   BLOCK_DEFINES(Trpman);
 
   void execCLOSE_COMREQ(Signal *signal);
+  void execCLOSE_COMCONF(Signal * signal);
   void execOPEN_COMREQ(Signal *signal);
   void execENABLE_COMREQ(Signal *signal);
   void execDISCONNECT_REP(Signal *signal);

=== modified file 'storage/ndb/src/kernel/vm/ArrayPool.hpp'
--- a/storage/ndb/src/kernel/vm/ArrayPool.hpp	2011-09-27 06:44:06 +0000
+++ b/storage/ndb/src/kernel/vm/ArrayPool.hpp	2012-01-04 09:08:22 +0000
@@ -166,6 +166,11 @@ public:
   struct Cache
   {
     Cache(Uint32 a0 = 512, Uint32 a1 = 256) { m_first_free = RNIL; m_free_cnt = 0; m_alloc_cnt = a0; m_max_free_cnt = a1; }
+    void init_cache(Uint32 a0, Uint32 a1)
+    {
+      m_alloc_cnt = a0;
+      m_max_free_cnt = a1;
+    }
     Uint32 m_first_free;
     Uint32 m_free_cnt;
     Uint32 m_alloc_cnt;
@@ -248,17 +253,24 @@ public:
 #endif
 
 protected:
-  Uint32 firstFree;
+  T * theArray;
   Uint32 size;
+  /*
+   * Protect size and theArray which are very seldomly updated from
+   * updates of often updated variables such as firstFree, noOfFree.
+   * Protect here means to have them on separate CPU cache lines to
+   * avoid false CPU cache line sharing.
+   */
+  char protect_read_var[64 - (sizeof(Uint32) + sizeof(void*))];
+  Uint32 firstFree;
   Uint32 noOfFree;
   Uint32 noOfFreeMin;
-  T * theArray;
-  void * alloc_ptr;
 #ifdef ARRAY_GUARD
+  bool chunk;
   Uint32 bitmaskSz;
   Uint32 *theAllocatedBitmask;
-  bool chunk;
 #endif
+  void * alloc_ptr;
 };
 
 template <class T>

=== modified file 'storage/ndb/src/kernel/vm/SimulatedBlock.hpp'
--- a/storage/ndb/src/kernel/vm/SimulatedBlock.hpp	2011-11-11 07:46:17 +0000
+++ b/storage/ndb/src/kernel/vm/SimulatedBlock.hpp	2012-01-04 11:13:53 +0000
@@ -635,7 +635,10 @@ private:
    * are real LQHs run by multiple threads.
    */
 protected:
-  enum { MaxInstances = 3 + MAX_NDBMT_TC_THREADS + MAX_NDBMT_LQH_WORKERS + 1 };
+  enum { MaxInstances = 3 +
+         MAX_NDBMT_TC_THREADS +
+         MAX_NDBMT_LQH_WORKERS +
+         MAX_NDBMT_RECEIVE_THREADS };
 private:
   SimulatedBlock** theInstanceList; // set in main, indexed by instance
   SimulatedBlock* theMainInstance;  // set in all

=== modified file 'storage/ndb/src/kernel/vm/TransporterCallback.cpp'
--- a/storage/ndb/src/kernel/vm/TransporterCallback.cpp	2011-11-22 13:03:47 +0000
+++ b/storage/ndb/src/kernel/vm/TransporterCallback.cpp	2012-01-04 11:13:53 +0000
@@ -110,7 +110,20 @@ TransporterRegistry globalTransporterReg
 #endif
 
 #ifdef NDBD_MULTITHREADED
-static SectionSegmentPool::Cache cache(1024,1024);
+static struct ReceiverThreadCache
+{
+  SectionSegmentPool::Cache cache_instance;
+  char pad[64 - sizeof(SectionSegmentPool::Cache)];
+} g_receiver_thread_cache[MAX_NDBMT_RECEIVE_THREADS];
+
+void
+mt_init_receiver_cache()
+{
+  for (unsigned i = 0; i < NDB_ARRAY_SIZE(g_receiver_thread_cache); i++)
+  {
+    g_receiver_thread_cache[i].cache_instance.init_cache(1024,1024);
+  }
+}
 
 void
 mt_set_section_chunk_size()
@@ -119,6 +132,7 @@ mt_set_section_chunk_size()
 }
 
 #else
+void mt_init_receiver_cache(){}
 void mt_set_section_chunk_size(){}
 #endif
 
@@ -128,6 +142,9 @@ TransporterCallbackKernel::deliver_signa
                                           Uint32 * const theData,
                                           LinearSectionPtr ptr[3])
 {
+#ifdef NDBD_MULTITHREADED
+  SectionSegmentPool::Cache & cache = g_receiver_thread_cache[0].cache_instance;
+#endif
 
   const Uint32 secCount = header->m_noOfSections;
   const Uint32 length = header->theLength;

=== modified file 'storage/ndb/src/kernel/vm/mt.cpp'
--- a/storage/ndb/src/kernel/vm/mt.cpp	2011-11-22 16:18:24 +0000
+++ b/storage/ndb/src/kernel/vm/mt.cpp	2012-01-04 13:41:41 +0000
@@ -73,10 +73,18 @@ static const Uint32 MAX_SIGNALS_BEFORE_W
 //#define NDB_MT_LOCK_TO_CPU
 
 #define NUM_MAIN_THREADS 2 // except receiver
-#define MAX_THREADS (NUM_MAIN_THREADS +       \
-                     MAX_NDBMT_LQH_THREADS +  \
-                     MAX_NDBMT_TC_THREADS + 1)
-#define MAX_BLOCK_INSTANCES (MAX_THREADS+1)
+/*
+  MAX_BLOCK_THREADS need not include the send threads since it's
+  used to set size of arrays used by all threads that contains a
+  job buffer and executes signals. The send threads only sends
+  messages directed to other nodes and contains no blocks and
+  executes thus no signals.
+*/
+#define MAX_BLOCK_THREADS (NUM_MAIN_THREADS +       \
+                           MAX_NDBMT_LQH_THREADS +  \
+                           MAX_NDBMT_TC_THREADS +   \
+                           MAX_NDBMT_RECEIVE_THREADS)
+#define MAX_BLOCK_INSTANCES (MAX_BLOCK_THREADS+1)
 
 /* If this is too small it crashes before first signal. */
 #define MAX_INSTANCES_PER_THREAD (16 + 8 * MAX_NDBMT_LQH_THREADS)
@@ -87,7 +95,7 @@ static Uint32 num_tc_threads = 0;
 static Uint32 num_threads = 0;
 static Uint32 receiver_thread_no = 0;
 
-#define NO_SEND_THREAD (MAX_THREADS + 1)
+#define NO_SEND_THREAD (MAX_BLOCK_THREADS + MAX_NDBMT_SEND_THREADS + 1)
 
 /* max signal is 32 words, 7 for signal header and 25 datawords */
 #define MIN_SIGNALS_PER_PAGE (thr_job_buffer::SIZE / 32)
@@ -864,12 +872,12 @@ struct thr_data
    * These are the thread input queues, where other threads deliver signals
    * into.
    */
-  struct thr_job_queue_head m_in_queue_head[MAX_THREADS];
-  struct thr_job_queue m_in_queue[MAX_THREADS];
+  struct thr_job_queue_head m_in_queue_head[MAX_BLOCK_THREADS];
+  struct thr_job_queue m_in_queue[MAX_BLOCK_THREADS];
   /* These are the write states of m_in_queue[self] in each thread. */
-  struct thr_jb_write_state m_write_states[MAX_THREADS];
+  struct thr_jb_write_state m_write_states[MAX_BLOCK_THREADS];
   /* These are the read states of all of our own m_in_queue[]. */
-  struct thr_jb_read_state m_read_states[MAX_THREADS];
+  struct thr_jb_read_state m_read_states[MAX_BLOCK_THREADS];
 
   /* Jam buffers for making trace files at crashes. */
   EmulatedJamBuffer m_jam;
@@ -951,21 +959,20 @@ extern struct thr_repository g_thr_repos
 struct thr_repository
 {
   thr_repository()
-    : m_receive_lock("recvlock"),
-      m_section_lock("sectionlock"),
+    : m_section_lock("sectionlock"),
       m_mem_manager_lock("memmanagerlock"),
       m_jb_pool("jobbufferpool"),
       m_sb_pool("sendbufferpool")
     {}
 
-  struct thr_spin_lock<64> m_receive_lock;
+  struct thr_spin_lock<64> m_receive_lock[MAX_NDBMT_RECEIVE_THREADS];
   struct thr_spin_lock<64> m_section_lock;
   struct thr_spin_lock<64> m_mem_manager_lock;
   struct thr_safe_pool<thr_job_buffer> m_jb_pool;
   struct thr_safe_pool<thr_send_page> m_sb_pool;
   Ndbd_mem_manager * m_mm;
   unsigned m_thread_count;
-  struct thr_data m_thread[MAX_THREADS];
+  struct thr_data m_thread[MAX_BLOCK_THREADS];
 
   /**
    * send buffer handling
@@ -1006,11 +1013,11 @@ struct thr_repository
     Uint32 m_bytes;
 
     /* read index(es) in thr_send_queue */
-    Uint32 m_read_index[MAX_THREADS];
+    Uint32 m_read_index[MAX_BLOCK_THREADS];
   } m_send_buffers[MAX_NTRANSPORTERS];
 
   /* The buffers published by threads */
-  thr_send_queue m_thread_send_buffers[MAX_NTRANSPORTERS][MAX_THREADS];
+  thr_send_queue m_thread_send_buffers[MAX_NTRANSPORTERS][MAX_BLOCK_THREADS];
 
   /*
    * These are used to synchronize during crash / trace dumps.
@@ -1667,6 +1674,7 @@ trp_callback::reportSendLen(NodeId nodeI
 void
 trp_callback::lock_transporter(NodeId node)
 {
+  Uint32 recv_thread_no = 0;
   struct thr_repository* rep = &g_thr_repository;
   /**
    * Note: take the send lock _first_, so that we will not hold the receive
@@ -1678,14 +1686,15 @@ trp_callback::lock_transporter(NodeId no
    * non-waiting (so we will not block sending on other transporters).
    */
   lock(&rep->m_send_buffers[node].m_send_lock);
-  lock(&rep->m_receive_lock);
+  lock(&rep->m_receive_lock[recv_thread_no]);
 }
 
 void
 trp_callback::unlock_transporter(NodeId node)
 {
+  Uint32 recv_thread_no = 0;
   struct thr_repository* rep = &g_thr_repository;
-  unlock(&rep->m_receive_lock);
+  unlock(&rep->m_receive_lock[recv_thread_no]);
   unlock(&rep->m_send_buffers[node].m_send_lock);
 }
 
@@ -1742,8 +1751,8 @@ static
 Uint32
 link_thread_send_buffers(thr_repository::send_buffer * sb, Uint32 node)
 {
-  Uint32 ri[MAX_THREADS];
-  Uint32 wi[MAX_THREADS];
+  Uint32 ri[MAX_BLOCK_THREADS];
+  Uint32 wi[MAX_BLOCK_THREADS];
   thr_send_queue * src = g_thr_repository.m_thread_send_buffers[node];
   for (unsigned thr = 0; thr < num_threads; thr++)
   {
@@ -2803,7 +2812,7 @@ init_thread(thr_data *selfptr)
  * Align signal buffer for better cache performance.
  * Also skew it a litte for each thread to avoid cache pollution.
  */
-#define SIGBUF_SIZE (sizeof(Signal) + 63 + 256 * MAX_THREADS)
+#define SIGBUF_SIZE (sizeof(Signal) + 63 + 256 * MAX_BLOCK_THREADS)
 static Signal *
 aligned_signal(unsigned char signal_buf[SIGBUF_SIZE], unsigned thr_no)
 {
@@ -2837,6 +2846,7 @@ mt_receiver_thread_main(void *thr_arg)
   Uint32& watchDogCounter = selfptr->m_watchdog_counter;
   Uint32 thrSignalId = 0;
   bool has_received = false;
+  const unsigned recv_thread_no = 0;
 
   init_thread(selfptr);
   receiverThreadId = thr_no;
@@ -2876,9 +2886,9 @@ mt_receiver_thread_main(void *thr_arg)
       if (check_job_buffers(rep) == 0)
       {
 	watchDogCounter = 8;
-        lock(&rep->m_receive_lock);
+        lock(&rep->m_receive_lock[recv_thread_no]);
         globalTransporterRegistry.performReceive();
-        unlock(&rep->m_receive_lock);
+        unlock(&rep->m_receive_lock[recv_thread_no]);
         has_received = true;
       }
     }
@@ -3393,6 +3403,16 @@ thr_init2(struct thr_repository* rep, st
 
 static
 void
+receive_lock_init(Uint32 recv_thread_id, thr_repository *rep)
+{
+  char buf[100];
+  BaseString::snprintf(buf, sizeof(buf), "receive lock thread id %d",
+                       recv_thread_id);
+  register_lock(&rep->m_receive_lock[recv_thread_id], buf);
+}
+
+static
+void
 send_buffer_init(Uint32 node, thr_repository::send_buffer * sb)
 {
   char buf[100];
@@ -3425,6 +3445,10 @@ rep_init(struct thr_repository* rep, uns
   NdbMutex_Init(&rep->stop_for_crash_mutex);
   NdbCondition_Init(&rep->stop_for_crash_cond);
 
+  for (Uint32 i = 0; i < NDB_ARRAY_SIZE(rep->m_receive_lock); i++)
+  {
+    receive_lock_init(i, rep);
+  }
   for (int i = 0 ; i < MAX_NTRANSPORTERS; i++)
   {
     send_buffer_init(i, rep->m_send_buffers+i);
@@ -3496,7 +3520,7 @@ ThreadConfig::init()
   num_lqh_threads = globalData.ndbMtLqhThreads;
   num_tc_threads = globalData.ndbMtTcThreads;
   num_threads = NUM_MAIN_THREADS + num_tc_threads + num_lqh_threads + 1;
-  require(num_threads <= MAX_THREADS);
+  require(num_threads <= MAX_BLOCK_THREADS);
   receiver_thread_no = num_threads - 1;
 
   ndbout << "NDBMT: num_threads=" << num_threads << endl;
@@ -3785,14 +3809,14 @@ FastScheduler::dumpSignalMemory(Uint32 t
    * the same order they were executed (order obtained from signal id).
    *
    * We may need to keep track of THR_FREE_BUF_MAX buffers for fully executed
-   * (and freed) buffers, plus MAX_THREADS buffers for currently active
+   * (and freed) buffers, plus MAX_BLOCK_THREADS buffers for currently active
    * prio B buffers, plus one active prio A buffer.
    */
   struct {
     const thr_job_buffer *m_jb;
     Uint32 m_pos;
     Uint32 m_max;
-  } jbs[THR_FREE_BUF_MAX + MAX_THREADS + 1];
+  } jbs[THR_FREE_BUF_MAX + MAX_BLOCK_THREADS + 1];
 
   Uint32 num_jbs = 0;
 
@@ -4032,7 +4056,7 @@ mt_get_thread_references_for_blocks(cons
                                     Uint32 dst[], Uint32 len)
 {
   Uint32 cnt = 0;
-  Bitmask<(MAX_THREADS+31)/32> mask;
+  Bitmask<(MAX_BLOCK_THREADS+31)/32> mask;
   mask.set(threadId);
   for (Uint32 i = 0; blocks[i] != 0; i++)
   {

=== modified file 'storage/ndb/src/kernel/vm/mt_thr_config.cpp'
--- a/storage/ndb/src/kernel/vm/mt_thr_config.cpp	2011-12-13 13:14:03 +0000
+++ b/storage/ndb/src/kernel/vm/mt_thr_config.cpp	2012-01-04 11:13:53 +0000
@@ -30,10 +30,11 @@ static const struct THRConfig::Entries m
   // name    type              min  max
   { "main",  THRConfig::T_MAIN,  1, 1 },
   { "ldm",   THRConfig::T_LDM,   1, MAX_NDBMT_LQH_THREADS },
-  { "recv",  THRConfig::T_RECV,  1, 1 },
+  { "recv",  THRConfig::T_RECV,  1, MAX_NDBMT_RECEIVE_THREADS },
   { "rep",   THRConfig::T_REP,   1, 1 },
   { "io",    THRConfig::T_IO,    1, 1 },
-  { "tc",    THRConfig::T_TC,    0, MAX_NDBMT_TC_THREADS }
+  { "tc",    THRConfig::T_TC,    0, MAX_NDBMT_TC_THREADS },
+  { "send",  THRConfig::T_SEND,  0, MAX_NDBMT_SEND_THREADS }
 };
 
 static const struct THRConfig::Param m_params[] =
@@ -144,6 +145,8 @@ THRConfig::do_parse(unsigned MaxNoOfExec
 
   Uint32 tcthreads = 0;
   Uint32 lqhthreads = 0;
+  Uint32 sendthreads = 0;
+  Uint32 recvthreads = 1;
   switch(MaxNoOfExecutionThreads){
   case 0:
   case 1:
@@ -165,24 +168,32 @@ THRConfig::do_parse(unsigned MaxNoOfExec
     lqhthreads = __ndbmt_lqh_threads;
   }
 
-  add(T_MAIN);
-  add(T_REP);
-  add(T_RECV);
+  add(T_MAIN); /* Global */
+  add(T_REP);  /* Local, main consumer is SUMA */
+  for(Uint32 i = 0; i < recvthreads; i++)
+  {
+    add(T_RECV);
+  }
   add(T_IO);
   for(Uint32 i = 0; i < lqhthreads; i++)
   {
     add(T_LDM);
   }
-
   for(Uint32 i = 0; i < tcthreads; i++)
   {
     add(T_TC);
   }
+  for(Uint32 i = 0; i < sendthreads; i++)
+  {
+    add(T_SEND);
+  }
 
   // If we have set TC-threads...we say that this is "new" code
   // and give error for having too few CPU's in mask compared to #threads
   // started
-  const bool allow_too_few_cpus = (tcthreads == 0);
+  const bool allow_too_few_cpus = (tcthreads == 0 &&
+                                   sendthreads == 0 &&
+                                   recvthreads == 1);
   return do_bindings(allow_too_few_cpus) || do_validate();
 }
 
@@ -451,11 +462,15 @@ THRConfig::do_validate()
   }
 
   /**
-   * LDM can be 1 2 4
+   * LDM can be 1 2 4 8 16
    */
-  if (m_threads[T_LDM].size() == 3)
+  if (m_threads[T_LDM].size() != 1 &&
+      m_threads[T_LDM].size() != 2 &&
+      m_threads[T_LDM].size() != 4 &&
+      m_threads[T_LDM].size() != 8 &&
+      m_threads[T_LDM].size() != 16)
   {
-    m_err_msg.assfmt("No of LDM-instances can be 1,2,4. Specified: %u",
+    m_err_msg.assfmt("No of LDM-instances can be 1,2,4,8,16. Specified: %u",
                      m_threads[T_LDM].size());
     return -1;
   }
@@ -886,7 +901,11 @@ THRConfig::do_parse(const char * ThreadC
       add((T_Type)i);
   }
 
-  const bool allow_too_few_cpus = m_threads[T_TC].size() == 0;
+  const bool allow_too_few_cpus =
+    m_threads[T_TC].size() == 0 &&
+    m_threads[T_SEND].size() == 0 &&
+    m_threads[T_RECV].size() == 1;
+
   int res = do_bindings(allow_too_few_cpus);
   if (res != 0)
   {
@@ -958,6 +977,21 @@ THRConfigApplier::appendInfo(BaseString&
                              const unsigned short list[], unsigned cnt) const
 {
   const T_Thread* thr = find_thread(list, cnt);
+  appendInfo(str, thr);
+}
+
+void
+THRConfigApplier::appendInfoSendThread(BaseString& str,
+                                       unsigned instance_no) const
+{
+  const T_Thread* thr = &m_threads[T_SEND][instance_no];
+  appendInfo(str, thr);
+}
+
+void
+THRConfigApplier::appendInfo(BaseString& str,
+                             const T_Thread* thr) const
+{
   assert(thr != 0);
   str.appfmt("(%s) ", getEntryName(thr->m_type));
   if (thr->m_bind_type == T_Thread::B_CPU_BOUND)
@@ -989,27 +1023,27 @@ THRConfigApplier::do_bind(NdbThread* thr
                           const unsigned short list[], unsigned cnt)
 {
   const T_Thread* thr = find_thread(list, cnt);
-  if (thr->m_bind_type == T_Thread::B_CPU_BOUND)
-  {
-    int res = NdbThread_LockCPU(thread, thr->m_bind_no);
-    if (res == 0)
-      return 1;
-    else
-      return -res;
-  }
-#if TODO
-  else if (thr->m_bind_type == T_Thread::B_CPUSET_BOUND)
-  {
-  }
-#endif
-
-  return 0;
+  return do_bind(thread, thr);
 }
 
 int
 THRConfigApplier::do_bind_io(NdbThread* thread)
 {
   const T_Thread* thr = &m_threads[T_IO][0];
+  return do_bind(thread, thr);
+}
+
+int
+THRConfigApplier::do_bind_send(NdbThread* thread, unsigned instance)
+{
+  const T_Thread* thr = &m_threads[T_SEND][instance];
+  return do_bind(thread, thr);
+}
+
+int
+THRConfigApplier::do_bind(NdbThread* thread,
+                          const T_Thread* thr)
+{
   if (thr->m_bind_type == T_Thread::B_CPU_BOUND)
   {
     int res = NdbThread_LockCPU(thread, thr->m_bind_no);

=== modified file 'storage/ndb/src/kernel/vm/mt_thr_config.hpp'
--- a/storage/ndb/src/kernel/vm/mt_thr_config.hpp	2011-12-13 13:14:03 +0000
+++ b/storage/ndb/src/kernel/vm/mt_thr_config.hpp	2012-01-04 11:13:53 +0000
@@ -44,8 +44,9 @@ public:
     T_REP   = 3, /* SUMA */
     T_IO    = 4, /* FS, SocketServer etc */
     T_TC    = 5, /* TC+SPJ */
+    T_SEND  = 6, /* No blocks */
 
-    T_END  = 6
+    T_END  = 7
   };
 
   THRConfig();
@@ -126,11 +127,15 @@ public:
 
   const char * getName(const unsigned short list[], unsigned cnt) const;
   void appendInfo(BaseString&, const unsigned short list[], unsigned cnt) const;
+  void appendInfoSendThread(BaseString&, unsigned instance_no) const;
   int do_bind(NdbThread*, const unsigned short list[], unsigned cnt);
   int do_bind_io(NdbThread*);
+  int do_bind_send(NdbThread*, unsigned);
 
 protected:
   const T_Thread* find_thread(const unsigned short list[], unsigned cnt) const;
+  void appendInfo(BaseString&, const T_Thread*) const;
+  int do_bind(NdbThread*, const T_Thread*);
 };
 
 #endif // IPCConfig_H

No bundle (reason: useless for push emails).
Thread
bzr push into mysql-5.1-telco-7.1 branch (jonas.oreland:4397 to 4400) jonas oreland9 Jan