List:Commits« Previous MessageNext Message »
From:Pekka Nousiainen Date:December 1 2008 7:52pm
Subject:bzr push into mysql-5.1 branch (pekka:3139 to 3146) WL#4391
View as plain text  
 3146 Pekka Nousiainen	2008-12-01
      wl#4391 00_fix.diff
      fix - compile error on no-debug
modified:
  storage/ndb/src/kernel/blocks/dblqh/DblqhMain.cpp

 3145 Pekka Nousiainen	2008-12-01
      wl#4391 33e_mixed.diff
      mixed - SR
modified:
  storage/ndb/src/kernel/blocks/dblqh/DblqhMain.cpp
  storage/ndb/src/kernel/blocks/dblqh/DblqhProxy.cpp
  storage/ndb/src/kernel/blocks/dblqh/DblqhProxy.hpp

 3144 Pekka Nousiainen	2008-12-01
      wl#4391 33d_mixed.diff
      mixed - DBLQH: send unpacked if remote is mt-lqh
modified:
  storage/ndb/src/kernel/blocks/dblqh/DblqhMain.cpp

 3143 Pekka Nousiainen	2008-12-01
      wl#4391 33c_mixed.diff
      mixed - DBTC: send unpacked if remote is mt-lqh
modified:
  storage/ndb/src/kernel/blocks/dbtc/DbtcMain.cpp

 3142 Pekka Nousiainen	2008-12-01
      wl#4391 33b_mixed.diff
      mixed - use instance key, not own instance
modified:
  storage/ndb/src/kernel/blocks/dblqh/Dblqh.hpp
  storage/ndb/src/kernel/blocks/dblqh/DblqhCommon.cpp
  storage/ndb/src/kernel/blocks/dblqh/DblqhCommon.hpp
  storage/ndb/src/kernel/blocks/dblqh/DblqhMain.cpp
  storage/ndb/src/kernel/vm/SimulatedBlock.hpp

 3141 Pekka Nousiainen	2008-12-01
      wl#4391 32a_mixed.diff
      mixed - store number of lqh workers in NodeInfo
modified:
  storage/ndb/include/kernel/NodeInfo.hpp
  storage/ndb/include/kernel/signaldata/CmRegSignalData.hpp
  storage/ndb/include/ndb_version.h.in
  storage/ndb/src/kernel/blocks/qmgr/QmgrMain.cpp
  storage/ndb/src/kernel/main.cpp

 3140 Pekka Nousiainen	2008-12-01
      wl#4391 32b_mutex.diff
      recursive mutex - implement own
modified:
  storage/ndb/src/kernel/blocks/lgman.cpp
  storage/ndb/src/kernel/blocks/tsman.cpp
  storage/ndb/src/kernel/vm/SafeMutex.cpp
  storage/ndb/src/kernel/vm/SafeMutex.hpp

 3139 Jonas Oreland	2008-11-28
      ndb - fix testDict -n FailAddPartition - fix testprg and bugs hidden by bug in
testprg
modified:
  storage/ndb/src/kernel/blocks/dbdih/Dbdih.hpp
  storage/ndb/src/kernel/blocks/dbdih/DbdihMain.cpp
  storage/ndb/test/ndbapi/testDict.cpp

=== modified file 'storage/ndb/include/kernel/NodeInfo.hpp'
--- a/storage/ndb/include/kernel/NodeInfo.hpp	2008-11-04 08:43:06 +0000
+++ b/storage/ndb/include/kernel/NodeInfo.hpp	2008-12-01 18:04:19 +0000
@@ -37,6 +37,7 @@ public:
   
   Uint32 m_version;       ///< Ndb version
   Uint32 m_mysql_version; ///< MySQL version
+  Uint32 m_lqh_workers;   ///< LQH workers
   Uint32 m_type;          ///< Node type
   Uint32 m_connectCount;  ///< No of times connected
   bool   m_connected;     ///< Node is connected
@@ -50,6 +51,7 @@ inline
 NodeInfo::NodeInfo(){
   m_version = 0;
   m_mysql_version = 0;
+  m_lqh_workers = 0;
   m_type = INVALID;
   m_connectCount = 0;
   m_heartbeat_cnt= 0;

=== modified file 'storage/ndb/include/kernel/signaldata/CmRegSignalData.hpp'
--- a/storage/ndb/include/kernel/signaldata/CmRegSignalData.hpp	2007-01-06 00:21:39 +0000
+++ b/storage/ndb/include/kernel/signaldata/CmRegSignalData.hpp	2008-12-01 18:04:19 +0000
@@ -161,7 +161,7 @@ class CmNodeInfoReq {
   friend class Qmgr;
   
 public:
-  STATIC_CONST( SignalLength = 4 );
+  STATIC_CONST( SignalLength = 5 );
   
 private:
   /**
@@ -171,6 +171,7 @@ private:
   Uint32 dynamicId;
   Uint32 version;
   Uint32 mysql_version;
+  Uint32 lqh_workers;   // added in telco-6.4
 };
 
 class CmNodeInfoRef {
@@ -198,20 +199,14 @@ class CmNodeInfoConf {
   friend class Qmgr;
   
 public:
-  STATIC_CONST( SignalLength = 4 );
+  STATIC_CONST( SignalLength = 5 );
   
 private:
   Uint32 nodeId;
   Uint32 dynamicId;
   Uint32 version;
   Uint32 mysql_version;
+  Uint32 lqh_workers;   // added in telco-6.4
 };
 
 #endif
-
-
-
-
-
-
-

=== modified file 'storage/ndb/include/ndb_version.h.in'
--- a/storage/ndb/include/ndb_version.h.in	2008-11-14 09:12:01 +0000
+++ b/storage/ndb/include/ndb_version.h.in	2008-12-01 18:04:19 +0000
@@ -102,6 +102,7 @@ Uint32 ndbGetOwnVersion();
 #define NDBD_LONG_LQHKEYREQ MAKE_VERSION(6,4,0)
 #define NDBD_MAX_RECVBYTESIZE_32K MAKE_VERSION(6,3,18)
 #define NDBD_LONG_SCANFRAGREQ MAKE_VERSION(6,4,0)
+#define NDBD_MT_LQH_VERSION MAKE_VERSION(6,4,0)
 
 
 static

=== modified file 'storage/ndb/src/kernel/blocks/dblqh/Dblqh.hpp'
--- a/storage/ndb/src/kernel/blocks/dblqh/Dblqh.hpp	2008-11-19 11:01:17 +0000
+++ b/storage/ndb/src/kernel/blocks/dblqh/Dblqh.hpp	2008-12-01 18:05:11 +0000
@@ -859,6 +859,11 @@ public:
      * Log part
      */
     Uint32 m_log_part_ptr_i;
+
+    /**
+     * Instance key for fast access.
+     */
+    Uint16 lqhInstanceKey;
   };
   typedef Ptr<Fragrecord> FragrecordPtr;
   

=== modified file 'storage/ndb/src/kernel/blocks/dblqh/DblqhCommon.cpp'
--- a/storage/ndb/src/kernel/blocks/dblqh/DblqhCommon.cpp	2008-11-16 12:59:12 +0000
+++ b/storage/ndb/src/kernel/blocks/dblqh/DblqhCommon.cpp	2008-12-01 18:05:11 +0000
@@ -72,3 +72,10 @@ NdbLogPartInfo::partNoIndex(Uint32 lpno)
   assert(partNo[i] == lpno);
   return i;
 }
+
+Uint32
+NdbLogPartInfo::instanceKey(Uint32 lpno) const
+{
+  assert(lpno < LogParts);
+  return 1 + lpno;
+}

=== modified file 'storage/ndb/src/kernel/blocks/dblqh/DblqhCommon.hpp'
--- a/storage/ndb/src/kernel/blocks/dblqh/DblqhCommon.hpp	2008-11-16 12:59:12 +0000
+++ b/storage/ndb/src/kernel/blocks/dblqh/DblqhCommon.hpp	2008-12-01 18:05:11 +0000
@@ -47,6 +47,7 @@ struct NdbLogPartInfo {
   bool partNoOwner(Uint32 lpno) const;
   bool partNoOwner(Uint32 tabId, Uint32 fragId);
   Uint32 partNoIndex(Uint32 lpno) const;
+  Uint32 instanceKey(Uint32 lpno) const;
 };
 
 #endif

=== modified file 'storage/ndb/src/kernel/blocks/dblqh/DblqhMain.cpp'
--- a/storage/ndb/src/kernel/blocks/dblqh/DblqhMain.cpp	2008-11-19 11:01:17 +0000
+++ b/storage/ndb/src/kernel/blocks/dblqh/DblqhMain.cpp	2008-12-01 18:50:58 +0000
@@ -720,7 +720,10 @@ void Dblqh::startphase1Lab(Signal* signa
   for (Ti = 0; Ti < chostFileSize; Ti++) {
     ThostPtr.i = Ti;
     ptrCheckGuard(ThostPtr, chostFileSize, hostRecord);
-    // wl4391_todo using own instance() does not work with mixed versions
+    /*
+     * Valid only if receiver has same number of LQH workers.
+     * In general full instance key of fragment must be used.
+     */
     ThostPtr.p->hostLqhBlockRef = calcInstanceBlockRef(DBLQH, ThostPtr.i);
     ThostPtr.p->hostTcBlockRef  = calcTcBlockRef(ThostPtr.i);
     ThostPtr.p->inPackedList = false;
@@ -1594,6 +1597,7 @@ void Dblqh::execLQHFRAGREQ(Signal* signa
     ndbrequire(ptr.p->logPartNo == logPartNo);
 
     fragptr.p->m_log_part_ptr_i = ptr.i;
+    fragptr.p->lqhInstanceKey = lpinfo.instanceKey(logPartNo);
   }
 
   if (DictTabInfo::isOrderedIndex(tabptr.p->tableType)) {
@@ -3012,99 +3016,163 @@ void Dblqh::sendCommitLqh(Signal* signal
   HostRecordPtr Thostptr;
   Thostptr.i = refToNode(alqhBlockref);
   ptrCheckGuard(Thostptr, chostFileSize, hostRecord);
+
+  Uint32 Tdata[5];
+  Tdata[0] = tcConnectptr.p->clientConnectrec;
+  Tdata[1] = tcConnectptr.p->gci_hi;
+  Tdata[2] = tcConnectptr.p->transid[0];
+  Tdata[3] = tcConnectptr.p->transid[1];
+  Tdata[4] = tcConnectptr.p->gci_lo;
+  Uint32 len = 5;
+
+  if (unlikely(!ndb_check_micro_gcp(getNodeInfo(Thostptr.i).m_version)))
+  {
+    jam();
+    ndbassert(Tdata[4] == 0 || getNodeInfo(Thostptr.i).m_connected == false);
+    len = 4;
+  }
+
+  // currently packed signal cannot address specific instance
+  const bool send_unpacked = getNodeInfo(Thostptr.i).m_lqh_workers != 0;
+  if (send_unpacked) {
+    jam();
+    FragrecordPtr Tfragptr;
+    Tfragptr.i = tcConnectptr.p->fragmentptr;
+    c_fragment_pool.getPtr(Tfragptr);
+    memcpy(&signal->theData[0], &Tdata[0], len << 2);
+    Uint32 Tnode = Thostptr.i;
+    Uint32 instanceKey = Tfragptr.p->lqhInstanceKey;
+    BlockReference lqhRef = numberToRef(DBLQH, instanceKey, Tnode);
+    sendSignal(lqhRef, GSN_COMMIT, signal, len, JBB);
+    return;
+  }
+
   if (Thostptr.p->noOfPackedWordsLqh > 25 - 5) {
     jam();
     sendPackedSignalLqh(signal, Thostptr.p);
   } else {
     jam();
     updatePackedList(signal, Thostptr.p, Thostptr.i);
-  }//if
-  Uint32 pos = Thostptr.p->noOfPackedWordsLqh;
-  Uint32 ptrAndType = tcConnectptr.p->clientConnectrec | (ZCOMMIT << 28);
-  Uint32 gci_hi = tcConnectptr.p->gci_hi;
-  Uint32 gci_lo = tcConnectptr.p->gci_lo;
-  Uint32 transid1 = tcConnectptr.p->transid[0];
-  Uint32 transid2 = tcConnectptr.p->transid[1];
-  Thostptr.p->packedWordsLqh[pos] = ptrAndType;
-  Thostptr.p->packedWordsLqh[pos + 1] = gci_hi;
-  Thostptr.p->packedWordsLqh[pos + 2] = transid1;
-  Thostptr.p->packedWordsLqh[pos + 3] = transid2;
-  Thostptr.p->packedWordsLqh[pos + 4] = gci_lo;
-  Thostptr.p->noOfPackedWordsLqh = pos + 5;
-
-  if (unlikely(!ndb_check_micro_gcp(getNodeInfo(Thostptr.i).m_version)))
-  {
-    jam();
-    ndbassert(gci_lo == 0 || getNodeInfo(Thostptr.i).m_connected == false);
-    Thostptr.p->noOfPackedWordsLqh = pos + 4;
   }
-}//Dblqh::sendCommitLqh()
+
+  Tdata[0] |= (ZCOMMIT << 28);
+  Uint32 pos = Thostptr.p->noOfPackedWordsLqh;
+  memcpy(&Thostptr.p->packedWordsLqh[pos], &Tdata[0], len << 2);
+  Thostptr.p->noOfPackedWordsLqh = pos + len;
+}
 
 void Dblqh::sendCompleteLqh(Signal* signal, BlockReference alqhBlockref)
 {
   HostRecordPtr Thostptr;
   Thostptr.i = refToNode(alqhBlockref);
   ptrCheckGuard(Thostptr, chostFileSize, hostRecord);
+
+  Uint32 Tdata[3];
+  Tdata[0] = tcConnectptr.p->clientConnectrec;
+  Tdata[1] = tcConnectptr.p->transid[0];
+  Tdata[2] = tcConnectptr.p->transid[1];
+  Uint32 len = 3;
+
+  // currently packed signal cannot address specific instance
+  const bool send_unpacked = getNodeInfo(Thostptr.i).m_lqh_workers != 0;
+  if (send_unpacked) {
+    jam();
+    FragrecordPtr Tfragptr;
+    Tfragptr.i = tcConnectptr.p->fragmentptr;
+    c_fragment_pool.getPtr(Tfragptr);
+    memcpy(&signal->theData[0], &Tdata[0], len << 2);
+    Uint32 Tnode = Thostptr.i;
+    Uint32 instanceKey = Tfragptr.p->lqhInstanceKey;
+    BlockReference lqhRef = numberToRef(DBLQH, instanceKey, Tnode);
+    sendSignal(lqhRef, GSN_COMPLETE, signal, len, JBB);
+    return;
+  }
+
   if (Thostptr.p->noOfPackedWordsLqh > 22) {
     jam();
     sendPackedSignalLqh(signal, Thostptr.p);
   } else {
     jam();
     updatePackedList(signal, Thostptr.p, Thostptr.i);
-  }//if
+  }
+
+  Tdata[0] |= (ZCOMPLETE << 28);
   Uint32 pos = Thostptr.p->noOfPackedWordsLqh;
-  Uint32 ptrAndType = tcConnectptr.p->clientConnectrec | (ZCOMPLETE << 28);
-  Uint32 transid1 = tcConnectptr.p->transid[0];
-  Uint32 transid2 = tcConnectptr.p->transid[1];
-  Thostptr.p->packedWordsLqh[pos] = ptrAndType;
-  Thostptr.p->packedWordsLqh[pos + 1] = transid1;
-  Thostptr.p->packedWordsLqh[pos + 2] = transid2;
-  Thostptr.p->noOfPackedWordsLqh = pos + 3;
-}//Dblqh::sendCompleteLqh()
+  memcpy(&Thostptr.p->packedWordsLqh[pos], &Tdata[0], len << 2);
+  Thostptr.p->noOfPackedWordsLqh = pos + len;
+}
 
 void Dblqh::sendCommittedTc(Signal* signal, BlockReference atcBlockref)
 {
   HostRecordPtr Thostptr;
   Thostptr.i = refToNode(atcBlockref);
   ptrCheckGuard(Thostptr, chostFileSize, hostRecord);
+
+  Uint32 Tdata[3];
+  Tdata[0] = tcConnectptr.p->clientConnectrec;
+  Tdata[1] = tcConnectptr.p->transid[0];
+  Tdata[2] = tcConnectptr.p->transid[1];
+  Uint32 len = 3;
+
+  // currently TC is single-threaded
+  const bool send_unpacked = false;
+  if (send_unpacked) {
+    jam();
+    memcpy(&signal->theData[0], &Tdata[0], len << 2);
+    BlockReference tcRef = Thostptr.p->hostTcBlockRef;
+    sendSignal(tcRef, GSN_COMMITTED, signal, len, JBB);
+    return;
+  }
+
   if (Thostptr.p->noOfPackedWordsTc > 22) {
     jam();
     sendPackedSignalTc(signal, Thostptr.p);
   } else {
     jam();
     updatePackedList(signal, Thostptr.p, Thostptr.i);
-  }//if
+  }
+
+  Tdata[0] |= (ZCOMMITTED << 28);
   Uint32 pos = Thostptr.p->noOfPackedWordsTc;
-  Uint32 ptrAndType = tcConnectptr.p->clientConnectrec | (ZCOMMITTED << 28);
-  Uint32 transid1 = tcConnectptr.p->transid[0];
-  Uint32 transid2 = tcConnectptr.p->transid[1];
-  Thostptr.p->packedWordsTc[pos] = ptrAndType;
-  Thostptr.p->packedWordsTc[pos + 1] = transid1;
-  Thostptr.p->packedWordsTc[pos + 2] = transid2;
-  Thostptr.p->noOfPackedWordsTc = pos + 3;
-}//Dblqh::sendCommittedTc()
+  memcpy(&Thostptr.p->packedWordsTc[pos], &Tdata[0], len << 2);
+  Thostptr.p->noOfPackedWordsTc = pos + len;
+}
 
 void Dblqh::sendCompletedTc(Signal* signal, BlockReference atcBlockref)
 {
   HostRecordPtr Thostptr;
   Thostptr.i = refToNode(atcBlockref);
   ptrCheckGuard(Thostptr, chostFileSize, hostRecord);
+
+  Uint32 Tdata[3];
+  Tdata[0] = tcConnectptr.p->clientConnectrec;
+  Tdata[1] = tcConnectptr.p->transid[0];
+  Tdata[2] = tcConnectptr.p->transid[1];
+  Uint32 len = 3;
+
+  // currently TC is single-threaded
+  const bool send_unpacked = false;
+  if (send_unpacked) {
+    jam();
+    memcpy(&signal->theData[0], &Tdata[0], len << 2);
+    BlockReference tcRef = Thostptr.p->hostTcBlockRef;
+    sendSignal(tcRef, GSN_COMMITTED, signal, len, JBB);
+    return;
+  }
+
   if (Thostptr.p->noOfPackedWordsTc > 22) {
     jam();
     sendPackedSignalTc(signal, Thostptr.p);
   } else {
     jam();
     updatePackedList(signal, Thostptr.p, Thostptr.i);
-  }//if
+  }
+
+  Tdata[0] |= (ZCOMPLETED << 28);
   Uint32 pos = Thostptr.p->noOfPackedWordsTc;
-  Uint32 ptrAndType = tcConnectptr.p->clientConnectrec | (ZCOMPLETED << 28);
-  Uint32 transid1 = tcConnectptr.p->transid[0];
-  Uint32 transid2 = tcConnectptr.p->transid[1];
-  Thostptr.p->packedWordsTc[pos] = ptrAndType;
-  Thostptr.p->packedWordsTc[pos + 1] = transid1;
-  Thostptr.p->packedWordsTc[pos + 2] = transid2;
-  Thostptr.p->noOfPackedWordsTc = pos + 3;
-}//Dblqh::sendCompletedTc()
+  memcpy(&Thostptr.p->packedWordsTc[pos], &Tdata[0], len << 2);
+  Thostptr.p->noOfPackedWordsTc = pos + len;
+}
 
 void Dblqh::sendLqhkeyconfTc(Signal* signal, BlockReference atcBlockref)
 {
@@ -3130,7 +3198,9 @@ void Dblqh::sendLqhkeyconfTc(Signal* sig
     lqhKeyConf = (LqhKeyConf *)
       &Thostptr.p->packedWordsTc[Thostptr.p->noOfPackedWordsTc];
     Thostptr.p->noOfPackedWordsTc += LqhKeyConf::SignalLength;
-  } else if(refToMain(atcBlockref) == DBLQH){
+  } else if(refToMain(atcBlockref) == DBLQH &&
+            refToInstance(atcBlockref) == instance()) {
+    //wl4391_todo check
     jam();
 /*******************************************************************
 // This signal was intended for DBLQH as part of log execution or
@@ -5781,8 +5851,10 @@ void Dblqh::packLqhkeyreqLab(Signal* sig
   lqhKeyReq->variableData[nextPos + 0] = sig0;
   nextPos += LqhKeyReq::getGCIFlag(Treqinfo);
 
-  // wl4391_todo for mixed versions must recompute full instance key here
-  BlockReference lqhRef = calcInstanceBlockRef(DBLQH, regTcPtr->nextReplica);
+  // pass full instance key for remote to map to real instance
+  BlockReference lqhRef = numberToRef(DBLQH,
+                                      fragptr.p->lqhInstanceKey,
+                                      regTcPtr->nextReplica);
   
   if (likely(sendLongReq))
   {
@@ -6112,7 +6184,17 @@ void Dblqh::writeAttrinfoLab(Signal* sig
 /* ------------------------------------------------------------------------- */
 void Dblqh::sendTupkey(Signal* signal) 
 {
-  BlockReference lqhRef = calcInstanceBlockRef(DBLQH, tcConnectptr.p->nextReplica);
+  BlockReference lqhRef = 0;
+  {
+    // wl4391_todo fragptr
+    FragrecordPtr Tfragptr;
+    Tfragptr.i = tcConnectptr.p->fragmentptr;
+    c_fragment_pool.getPtr(Tfragptr);
+    Uint32 Tnode = tcConnectptr.p->nextReplica;
+    Uint32 instanceKey = Tfragptr.p->lqhInstanceKey;
+    lqhRef = numberToRef(DBLQH, instanceKey, Tnode);
+  }
+
   signal->theData[0] = tcConnectptr.p->tcOprec;
   signal->theData[1] = tcConnectptr.p->transid[0];
   signal->theData[2] = tcConnectptr.p->transid[1];
@@ -7286,7 +7368,12 @@ void Dblqh::execABORT(Signal* signal) 
 /* ------------------------------------------------------------------------- */
 // We will immediately send the ABORT message also to the next LQH node in line.
 /* ------------------------------------------------------------------------- */
-    BlockReference TLqhRef = calcInstanceBlockRef(DBLQH, regTcPtr->nextReplica);
+    FragrecordPtr Tfragptr;
+    Tfragptr.i = regTcPtr->fragmentptr;
+    c_fragment_pool.getPtr(Tfragptr);
+    Uint32 Tnode = regTcPtr->nextReplica;
+    Uint32 instanceKey = Tfragptr.p->lqhInstanceKey;
+    BlockReference TLqhRef = numberToRef(DBLQH, instanceKey, Tnode);
     signal->theData[0] = regTcPtr->tcOprec;
     signal->theData[1] = regTcPtr->tcBlockref;
     signal->theData[2] = regTcPtr->transid[0];
@@ -15215,10 +15302,15 @@ void Dblqh::execSTART_EXEC_SR(Signal* si
      *    WE NEED TO SEND THOSE SIGNALS EVEN IF WE HAVE NOT REQUESTED 
      *    ANY FRAGMENTS PARTICIPATE IN THIS PHASE.
      * --------------------------------------------------------------------- */
-    BlockNumber lqhBlockNo = numberToBlock(DBLQH, instance());
-    NodeReceiverGroup rg(lqhBlockNo, m_sr_nodes);
     signal->theData[0] = cownNodeid;
-    sendSignal(rg, GSN_EXEC_SRREQ, signal, 1, JBB);
+    if (!isNdbMtLqh()) {
+      NodeReceiverGroup rg(DBLQH, m_sr_nodes);
+      sendSignal(rg, GSN_EXEC_SRREQ, signal, 1, JBB);
+    } else {
+      const Uint32 sz = NdbNodeBitmask::Size;
+      m_sr_nodes.copyto(sz, &signal->theData[1]);
+      sendSignal(DBLQH_REF, GSN_EXEC_SRREQ, signal, 1 + sz, JBB);
+    }
     return;
   } else {
     jam();
@@ -15232,7 +15324,9 @@ void Dblqh::execSTART_EXEC_SR(Signal* si
       
       Uint32 index = csrPhasesCompleted;
       arrGuard(index, MAX_LOG_EXEC);
-      BlockReference ref = calcInstanceBlockRef(DBLQH,
fragptr.p->srLqhLognode[index]);
+      Uint32 Tnode = fragptr.p->srLqhLognode[index];
+      Uint32 instanceKey = fragptr.p->lqhInstanceKey;
+      BlockReference ref = numberToRef(DBLQH, instanceKey, Tnode);
       fragptr.p->srStatus = Fragrecord::SS_STARTED;
 
       /* --------------------------------------------------------------------
@@ -15249,7 +15343,6 @@ void Dblqh::execSTART_EXEC_SR(Signal* si
       execFragReq->lastGci = fragptr.p->srLastGci[index];
       sendSignal(ref, GSN_EXEC_FRAGREQ, signal, 
 		 ExecFragReq::SignalLength, JBB);
-
     }
     signal->theData[0] = next;
     sendSignal(cownref, GSN_START_EXEC_SR, signal, 1, JBB);
@@ -15339,6 +15432,13 @@ void Dblqh::execEXEC_FRAGREF(Signal* sig
 /* *************** */
 void Dblqh::execEXEC_SRCONF(Signal* signal) 
 {
+  // wl4391_todo workaround until timing fixed
+  if (cnoOutstandingExecFragReq != 0) {
+    ndbout << "delay: reqs=" << cnoOutstandingExecFragReq << endl;
+    sendSignalWithDelay(reference(), GSN_EXEC_SRCONF,
+                        signal, 10, signal->getLength());
+    return;
+  }
   jamEntry();
   Uint32 nodeId = signal->theData[0];
   arrGuard(nodeId, MAX_NDB_NODES);
@@ -16704,9 +16804,14 @@ void Dblqh::srPhase3Comp(Signal* signal)
   jamEntry();
 
   signal->theData[0] = cownNodeid;
-  BlockNumber lqhBlockNo = numberToBlock(DBLQH, instance());
-  NodeReceiverGroup rg(lqhBlockNo, m_sr_nodes);
-  sendSignal(rg, GSN_EXEC_SRCONF, signal, 1, JBB);
+  if (!isNdbMtLqh()) {
+    NodeReceiverGroup rg(DBLQH, m_sr_nodes);
+    sendSignal(rg, GSN_EXEC_SRCONF, signal, 1, JBB);
+  } else {
+    const Uint32 sz = NdbNodeBitmask::Size;
+    m_sr_nodes.copyto(sz, &signal->theData[1]);
+    sendSignal(DBLQH_REF, GSN_EXEC_SRCONF, signal, 1 + sz, JBB);
+  }
   return;
 }//Dblqh::srPhase3Comp()
 
@@ -19543,7 +19648,7 @@ Dblqh::validate_filter(Signal* signal)
     default:
       infoEvent("Invalid filter op: 0x%x pos: %ld",
 		* start,
-		start - (signal->theData + 1));
+		(long int)(start - (signal->theData + 1)));
       return false;
     }
   }

=== modified file 'storage/ndb/src/kernel/blocks/dblqh/DblqhProxy.cpp'
--- a/storage/ndb/src/kernel/blocks/dblqh/DblqhProxy.cpp	2008-11-16 15:28:22 +0000
+++ b/storage/ndb/src/kernel/blocks/dblqh/DblqhProxy.cpp	2008-12-01 18:06:58 +0000
@@ -84,6 +84,10 @@ DblqhProxy::DblqhProxy(Block_context& ct
 
   // GSN_SUB_GCP_COMPLETE_REP
   addRecSignal(GSN_SUB_GCP_COMPLETE_REP, &DblqhProxy::execSUB_GCP_COMPLETE_REP);
+
+  // GSN_EXEC_SRREQ
+  addRecSignal(GSN_EXEC_SRREQ, &DblqhProxy::execEXEC_SRREQ);
+  addRecSignal(GSN_EXEC_SRCONF, &DblqhProxy::execEXEC_SRCONF);
 }
 
 DblqhProxy::~DblqhProxy()
@@ -1329,4 +1333,112 @@ DblqhProxy::sendEMPTY_LCP_CONF(Signal* s
   ssRelease<Ss_EMPTY_LCP_REQ>(ssId);
 }
 
+// GSN_EXEC_SR_1 [fictional gsn ]
+
+void
+DblqhProxy::execEXEC_SRREQ(Signal* signal)
+{
+  const BlockReference senderRef = signal->getSendersBlockRef();
+
+  if (refToInstance(senderRef) != 0) {
+    jam();
+    execEXEC_SR_2(signal, GSN_EXEC_SRREQ);
+    return;
+  }
+
+  execEXEC_SR_1(signal, GSN_EXEC_SRREQ);
+}
+
+void
+DblqhProxy::execEXEC_SRCONF(Signal* signal)
+{
+  const BlockReference senderRef = signal->getSendersBlockRef();
+
+  if (refToInstance(senderRef) != 0) {
+    jam();
+    execEXEC_SR_2(signal, GSN_EXEC_SRCONF);
+    return;
+  }
+
+  execEXEC_SR_1(signal, GSN_EXEC_SRCONF);
+}
+
+void
+DblqhProxy::execEXEC_SR_1(Signal* signal, GlobalSignalNumber gsn)
+{
+  ndbrequire(signal->getLength() == Ss_EXEC_SR_1::Sig::SignalLength);
+
+  const Ss_EXEC_SR_1::Sig* sig =
+    (const Ss_EXEC_SR_1::Sig*)signal->getDataPtr();
+  Uint32 ssId = getSsId(sig);
+  Ss_EXEC_SR_1& ss = ssSeize<Ss_EXEC_SR_1>(ssId);
+  ss.m_gsn = gsn;
+  ss.m_sig = *sig;
+
+  sendREQ(signal, ss);
+  ssRelease<Ss_EXEC_SR_1>(ss);
+}
+
+void
+DblqhProxy::sendEXEC_SR_1(Signal* signal, Uint32 ssId)
+{
+  Ss_EXEC_SR_1& ss = ssFind<Ss_EXEC_SR_1>(ssId);
+  signal->theData[0] = ss.m_sig.nodeId;
+  sendSignal(workerRef(ss.m_worker), ss.m_gsn, signal, 1, JBB);
+}
+
+// GSN_EXEC_SRREQ_2 [ fictional gsn ]
+
+void
+DblqhProxy::execEXEC_SR_2(Signal* signal, GlobalSignalNumber gsn)
+{
+  ndbrequire(signal->getLength() == Ss_EXEC_SR_2::Sig::SignalLength);
+
+  const Ss_EXEC_SR_2::Sig* sig =
+    (const Ss_EXEC_SR_2::Sig*)signal->getDataPtr();
+  Uint32 ssId = getSsId(sig);
+
+  bool found = false;
+  Ss_EXEC_SR_2& ss = ssFindSeize<Ss_EXEC_SR_2>(ssId, &found);
+  if (!found) {
+    jam();
+    setMask(ss);
+  }
+
+  ndbrequire(sig->nodeId == getOwnNodeId());
+  if (ss.m_sigcount == 0) {
+    jam();
+    ss.m_gsn = gsn;
+    ss.m_sig = *sig;
+  } else {
+    jam();
+    ndbrequire(ss.m_gsn == gsn);
+    ndbrequire(memcmp(&ss.m_sig, sig, sizeof(*sig)) == 0);
+  }
+  ss.m_sigcount++;
+
+  // reversed roles
+  recvCONF(signal, ss);
+}
+
+void
+DblqhProxy::sendEXEC_SR_2(Signal* signal, Uint32 ssId)
+{
+  Ss_EXEC_SR_2& ss = ssFind<Ss_EXEC_SR_2>(ssId);
+
+  if (!lastReply(ss)) {
+    jam();
+    return;
+  }
+
+  NodeBitmask nodes;
+  nodes.assign(NdbNodeBitmask::Size, ss.m_sig.sr_nodes);
+  NodeReceiverGroup rg(DBLQH, nodes);
+
+  signal->theData[0] = ss.m_sig.nodeId;
+  sendSignal(rg, ss.m_gsn, signal, 1, JBB);
+
+  ssRelease<Ss_EXEC_SR_2>(ssId);
+}
+
 BLOCK_FUNCTIONS(DblqhProxy)

=== modified file 'storage/ndb/src/kernel/blocks/dblqh/DblqhProxy.hpp'
--- a/storage/ndb/src/kernel/blocks/dblqh/DblqhProxy.hpp	2008-11-16 15:28:22 +0000
+++ b/storage/ndb/src/kernel/blocks/dblqh/DblqhProxy.hpp	2008-12-01 18:06:58 +0000
@@ -380,9 +380,7 @@ protected:
 
   // GSN_START_RECREQ_2 [ sub-op, fictional gsn ]
   struct Ss_START_RECREQ_2 : SsParallel {
-#ifdef VM_TRACE
     static const char* name() { return "START_RECREQ_2"; }
-#endif
     struct Req {
       enum { SignalLength = 2 };
       Uint32 lcpId;
@@ -458,6 +456,71 @@ protected:
   void execEMPTY_LCP_CONF(Signal*);
   void execEMPTY_LCP_REF(Signal*);
   void sendEMPTY_LCP_CONF(Signal*, Uint32 ssId);
+
+  // GSN_EXEC_SR_1 [ fictional gsn ]
+  struct Ss_EXEC_SR_1 : SsParallel {
+    /*
+     * Handle EXEC_SRREQ and EXEC_SRCONF.  These are broadcast
+     * signals (not REQ/CONF).  EXEC_SR_1 receives one signal and
+     * sends it to its workers.  EXEC_SR_2 waits for signal from
+     * all workers and broadcasts it to all nodes.  These are
+     * required to handle mixed versions (non-mt, mt-lqh-1,2,4).
+     */
+    static const char* name() { return "EXEC_SR_1"; }
+    struct Sig {
+      enum { SignalLength = 1 };
+      Uint32 nodeId;
+    };
+    GlobalSignalNumber m_gsn;
+    Sig m_sig;
+    Ss_EXEC_SR_1() {
+      m_sendREQ = (SsFUNC)&DblqhProxy::sendEXEC_SR_1;
+      m_sendCONF = (SsFUNC)0;
+      m_gsn = 0;
+    };
+    enum { poolSize = 1 };
+    static SsPool<Ss_EXEC_SR_1>& pool(LocalProxy* proxy) {
+      return ((DblqhProxy*)proxy)->c_ss_EXEC_SR_1;
+    }
+  };
+  SsPool<Ss_EXEC_SR_1> c_ss_EXEC_SR_1;
+  Uint32 getSsId(const Ss_EXEC_SR_1::Sig* sig) {
+    return SsIdBase | refToNode(sig->nodeId);
+  };
+  void execEXEC_SRREQ(Signal*);
+  void execEXEC_SRCONF(Signal*);
+  void execEXEC_SR_1(Signal*, GlobalSignalNumber gsn);
+  void sendEXEC_SR_1(Signal*, Uint32 ssId);
+
+  // GSN_EXEC_SR_2 [ fictional gsn ]
+  struct Ss_EXEC_SR_2 : SsParallel {
+    static const char* name() { return "EXEC_SR_2"; }
+    struct Sig {
+      enum { SignalLength = 1 + NdbNodeBitmask::Size };
+      Uint32 nodeId;
+      Uint32 sr_nodes[NdbNodeBitmask::Size]; // local signal so ok to add
+    };
+    GlobalSignalNumber m_gsn;
+    Uint32 m_sigcount;
+    Sig m_sig; // all signals must be identical
+    Ss_EXEC_SR_2() {
+      // reversed roles
+      m_sendREQ = (SsFUNC)0;
+      m_sendCONF = (SsFUNC)&DblqhProxy::sendEXEC_SR_2;
+      m_gsn = 0;
+      m_sigcount = 0;
+    };
+    enum { poolSize = 1 };
+    static SsPool<Ss_EXEC_SR_2>& pool(LocalProxy* proxy) {
+      return ((DblqhProxy*)proxy)->c_ss_EXEC_SR_2;
+    }
+  };
+  SsPool<Ss_EXEC_SR_2> c_ss_EXEC_SR_2;
+  Uint32 getSsId(const Ss_EXEC_SR_2::Sig* sig) {
+    return SsIdBase | refToNode(sig->nodeId);
+  };
+  void execEXEC_SR_2(Signal*, GlobalSignalNumber gsn);
+  void sendEXEC_SR_2(Signal*, Uint32 ssId);
 };
 
 #endif

=== modified file 'storage/ndb/src/kernel/blocks/dbtc/DbtcMain.cpp'
--- a/storage/ndb/src/kernel/blocks/dbtc/DbtcMain.cpp	2008-11-13 15:05:07 +0000
+++ b/storage/ndb/src/kernel/blocks/dbtc/DbtcMain.cpp	2008-12-01 18:05:48 +0000
@@ -4829,25 +4829,30 @@ void Dbtc::sendCommitLqh(Signal* signal,
   Thostptr.i = regTcPtr->lastLqhNodeId;
   ptrCheckGuard(Thostptr, ThostFilesize, hostRecord);
 
-  UintR Tdata1 = regTcPtr->lastLqhCon;
-  UintR Tdata2 = Uint32(regApiPtr->globalcheckpointid >> 32);
-  UintR Tdata3 = regApiPtr->transid[0];
-  UintR Tdata4 = regApiPtr->transid[1];
-  UintR Tdata5 = Uint32(regApiPtr->globalcheckpointid);
+  Uint32 Tdata[5];
+  Tdata[0] = regTcPtr->lastLqhCon;
+  Tdata[1] = Uint32(regApiPtr->globalcheckpointid >> 32);
+  Tdata[2] = regApiPtr->transid[0];
+  Tdata[3] = regApiPtr->transid[1];
+  Tdata[4] = Uint32(regApiPtr->globalcheckpointid);
+  Uint32 len = 5;
 
-  // wl4391_todo testing own config is wrong for mixed versions
-  bool send_unpacked = isNdbMtLqh();
+  if (unlikely(!ndb_check_micro_gcp(getNodeInfo(Thostptr.i).m_version)))
+  {
+    jam();
+    ndbassert(Tdata[4] == 0 || getNodeInfo(Thostptr.i).m_connected == false);
+    len = 4;
+  }
+
+  // currently packed signal cannot address specific instance
+  const bool send_unpacked = getNodeInfo(Thostptr.i).m_lqh_workers != 0;
   if (send_unpacked) {
     Uint32* data = signal->getDataPtrSend();
-    data[0] = Tdata1;
-    data[1] = Tdata2;
-    data[2] = Tdata3;
-    data[3] = Tdata4;
-    data[4] = Tdata5;
+    memcpy(&signal->theData[0], &Tdata[0], len << 2);
     Uint32 Tnode = Thostptr.i;
     Uint32 instanceKey = regTcPtr->lqhInstanceKey;
     BlockReference lqhRef = numberToRef(DBLQH, instanceKey, Tnode);
-    sendSignal(lqhRef, GSN_COMMIT, signal, 5, JBB);
+    sendSignal(lqhRef, GSN_COMMIT, signal, len, JBB);
     return;
   }
 
@@ -4857,23 +4862,14 @@ void Dbtc::sendCommitLqh(Signal* signal,
   } else {
     jam();
     updatePackedList(signal, Thostptr.p, Thostptr.i);
-  }//if
+  }
+
+  Tdata[0] |= (ZCOMMIT << 28);
   UintR Tindex = Thostptr.p->noOfPackedWordsLqh;
   UintR* TDataPtr = &Thostptr.p->packedWordsLqh[Tindex];
-  TDataPtr[0] = Tdata1 | (ZCOMMIT << 28);
-  TDataPtr[1] = Tdata2;
-  TDataPtr[2] = Tdata3;
-  TDataPtr[3] = Tdata4;
-  TDataPtr[4] = Tdata5;
-  Thostptr.p->noOfPackedWordsLqh = Tindex + 5;
-
-  if (unlikely(!ndb_check_micro_gcp(getNodeInfo(Thostptr.i).m_version)))
-  {
-    jam();
-    ndbassert(Tdata5 == 0 || getNodeInfo(Thostptr.i).m_connected == false);
-    Thostptr.p->noOfPackedWordsLqh = Tindex + 4; // no gci_lo
-  }
-}//Dbtc::sendCommitLqh()
+  memcpy(TDataPtr, &Tdata[0], len << 2);
+  Thostptr.p->noOfPackedWordsLqh = Tindex + len;
+}
 
 void
 Dbtc::DIVER_node_fail_handling(Signal* signal, Uint64 Tgci)
@@ -5222,16 +5218,16 @@ void Dbtc::sendCompleteLqh(Signal* signa
   Thostptr.i = regTcPtr->lastLqhNodeId; //last???
   ptrCheckGuard(Thostptr, ThostFilesize, hostRecord);
 
-  UintR Tdata1 = regTcPtr->lastLqhCon;
-  UintR Tdata2 = regApiPtr->transid[0];
-  UintR Tdata3 = regApiPtr->transid[1];
+  Uint32 Tdata[3];
+  Tdata[0] = regTcPtr->lastLqhCon;
+  Tdata[1] = regApiPtr->transid[0];
+  Tdata[2] = regApiPtr->transid[1];
+  Uint32 len = 3;
 
-  bool send_unpacked = isNdbMtLqh();
+  // currently packed signal cannot address specific instance
+  const bool send_unpacked = getNodeInfo(Thostptr.i).m_lqh_workers != 0;
   if (send_unpacked) {
-    Uint32* data = signal->getDataPtrSend();
-    data[0] = Tdata1;
-    data[1] = Tdata2;
-    data[2] = Tdata3;
+    memcpy(&signal->theData[0], &Tdata[0], len << 2);
     Uint32 Tnode = Thostptr.i;
     Uint32 instanceKey = regTcPtr->lqhInstanceKey;
     BlockReference lqhRef = numberToRef(DBLQH, instanceKey, Tnode);
@@ -5245,14 +5241,14 @@ void Dbtc::sendCompleteLqh(Signal* signa
   } else {
     jam();
     updatePackedList(signal, Thostptr.p, Thostptr.i);
-  }//if
+  }
+
+  Tdata[0] |= (ZCOMPLETE << 28);
   UintR Tindex = Thostptr.p->noOfPackedWordsLqh;
   UintR* TDataPtr = &Thostptr.p->packedWordsLqh[Tindex];
-  TDataPtr[0] = Tdata1 | (ZCOMPLETE << 28);
-  TDataPtr[1] = Tdata2;
-  TDataPtr[2] = Tdata3;
-  Thostptr.p->noOfPackedWordsLqh = Tindex + 3;
-}//Dbtc::sendCompleteLqh()
+  memcpy(TDataPtr, &Tdata[0], len << 2);
+  Thostptr.p->noOfPackedWordsLqh = Tindex + len;
+}
 
 void
 Dbtc::execTC_COMMIT_ACK(Signal* signal){
@@ -5301,22 +5297,25 @@ Dbtc::sendRemoveMarker(Signal* signal, 
   hostPtr.i = nodeId;
   ptrCheckGuard(hostPtr, ThostFilesize, hostRecord);
 
-  UintR Tdata1 = 0;
-  UintR Tdata2 = transid1;
-  UintR Tdata3 = transid2;
+  Uint32 Tdata[3];
+  Tdata[0] = 0;
+  Tdata[1] = transid1;
+  Tdata[2] = transid2;
+  Uint32 len = 3;
 
-  bool send_unpacked = isNdbMtLqh();
+  // currently packed signals can not address specific instance
+  bool send_unpacked = getNodeInfo(hostPtr.i).m_lqh_workers != 0;
   if (send_unpacked) {
-    Uint32* data = signal->getDataPtrSend();
-    data[0] = Tdata2;
-    data[1] = Tdata3;
+    jam();
+    // first word omitted
+    memcpy(&signal->theData[0], &Tdata[1], (len - 1) << 2);
     Uint32 Tnode = hostPtr.i;
     Uint32 i;
     for (i = 0; i < MAX_NDBMT_LQH_WORKERS; i++) {
       // wl4391_todo skip workers not part of tx
       Uint32 instanceKey = 1 + i;
       BlockReference ref = numberToRef(DBLQH, instanceKey, Tnode);
-      sendSignal(ref, GSN_REMOVE_MARKER_ORD, signal, 2, JBB);
+      sendSignal(ref, GSN_REMOVE_MARKER_ORD, signal, len - 1, JBB);
     }
     return;
   }
@@ -5327,14 +5326,13 @@ Dbtc::sendRemoveMarker(Signal* signal, 
   } else {
     jam();
     updatePackedList(signal, hostPtr.p, hostPtr.i);
-  }//if
+  }
   
   UintR  numWord = hostPtr.p->noOfPackedWordsLqh;
   UintR* dataPtr = &hostPtr.p->packedWordsLqh[numWord];
 
-  dataPtr[0] = Tdata1 | (ZREMOVE_MARKER << 28);
-  dataPtr[1] = Tdata2;
-  dataPtr[2] = Tdata3;
+  Tdata[0] |= (ZREMOVE_MARKER << 28);
+  memcpy(dataPtr, &Tdata[0], len << 2);
   hostPtr.p->noOfPackedWordsLqh = numWord + 3;
 }
 
@@ -12274,7 +12272,7 @@ Dbtc::validate_filter(Signal* signal)
     default:
       infoEvent("Invalid filter op: 0x%x pos: %ld",
 		* start,
-		start - (signal->theData + 1));
+		(long int)(start - (signal->theData + 1)));
       return false;
     }
   }

=== modified file 'storage/ndb/src/kernel/blocks/lgman.cpp'
--- a/storage/ndb/src/kernel/blocks/lgman.cpp	2008-11-20 13:32:13 +0000
+++ b/storage/ndb/src/kernel/blocks/lgman.cpp	2008-12-01 18:03:48 +0000
@@ -56,11 +56,7 @@ Lgman::Lgman(Block_context & ctx) :
   m_tup(0),
   m_logfile_group_list(m_logfile_group_pool),
   m_logfile_group_hash(m_logfile_group_pool),
-#ifdef __sun // temp
-  m_client_mutex(1, false)
-#else
-  m_client_mutex(2, true)
-#endif
+  m_client_mutex("lgman-client", 2, true)
 {
   BLOCK_CONSTRUCTOR(Lgman);
   

=== modified file 'storage/ndb/src/kernel/blocks/qmgr/QmgrMain.cpp'
--- a/storage/ndb/src/kernel/blocks/qmgr/QmgrMain.cpp	2008-11-13 13:36:29 +0000
+++ b/storage/ndb/src/kernel/blocks/qmgr/QmgrMain.cpp	2008-12-01 18:04:19 +0000
@@ -991,6 +991,9 @@ void Qmgr::execCM_REGCONF(Signal* signal
   c_clusterNodes.assign(NdbNodeBitmask::Size, cmRegConf->allNdbNodes);
 
   myNodePtr.p->ndynamicId = TdynamicId;
+
+  // set own MT config here or in REF, and others in CM_NODEINFOREQ/CONF
+  setNodeInfo(getOwnNodeId()).m_lqh_workers = globalData.ndbMtLqhWorkers;
   
 /*--------------------------------------------------------------*/
 // Send this as an EVENT REPORT to inform about hearing about
@@ -1109,6 +1112,7 @@ Qmgr::sendCmNodeInfoReq(Signal* signal, 
   req->dynamicId = self->ndynamicId;
   req->version = getNodeInfo(getOwnNodeId()).m_version;
   req->mysql_version = getNodeInfo(getOwnNodeId()).m_mysql_version;
+  req->lqh_workers = getNodeInfo(getOwnNodeId()).m_lqh_workers;
   const Uint32 ref = calcQmgrBlockRef(nodeId);
   sendSignal(ref,GSN_CM_NODEINFOREQ, signal, CmNodeInfoReq::SignalLength, JBB);
   DEBUG_START(GSN_CM_NODEINFOREQ, nodeId, "");
@@ -1214,6 +1218,9 @@ void Qmgr::execCM_REGREF(Signal* signal)
 
   skip_nodes.bitAND(c_definedNodes);
   c_start.m_skip_nodes.bitOR(skip_nodes);
+
+  // set own MT config here or in CONF, and others in CM_NODEINFOREQ/CONF
+  setNodeInfo(getOwnNodeId()).m_lqh_workers = globalData.ndbMtLqhWorkers;
   
   char buf[100];
   switch (TrefuseReason) {
@@ -1661,11 +1668,17 @@ void Qmgr::execCM_NODEINFOCONF(Signal* s
   const Uint32 dynamicId = conf->dynamicId;
   const Uint32 version = conf->version;
   Uint32 mysql_version = conf->mysql_version;
+  Uint32 lqh_workers = conf->lqh_workers;
   if (version < NDBD_SPLIT_VERSION)
   {
     jam();
     mysql_version = 0;
   }
+  if (version < NDBD_MT_LQH_VERSION)
+  {
+    jam();
+    lqh_workers = 0;
+  }
 
   NodeRecPtr nodePtr;  
   nodePtr.i = getOwnNodeId();
@@ -1684,6 +1697,7 @@ void Qmgr::execCM_NODEINFOCONF(Signal* s
   replyNodePtr.p->blockRef = signal->getSendersBlockRef();
   setNodeInfo(replyNodePtr.i).m_version = version;
   setNodeInfo(replyNodePtr.i).m_mysql_version = mysql_version;
+  setNodeInfo(replyNodePtr.i).m_lqh_workers = lqh_workers;
 
   recompute_version_info(NodeInfo::DB, version);
   
@@ -1741,8 +1755,13 @@ void Qmgr::execCM_NODEINFOREQ(Signal* si
   Uint32 mysql_version = req->mysql_version;
   if (req->version < NDBD_SPLIT_VERSION)
     mysql_version = 0;
-  
   setNodeInfo(addNodePtr.i).m_mysql_version = mysql_version;
+
+  Uint32 lqh_workers = req->lqh_workers;
+  if (req->version < NDBD_MT_LQH_VERSION)
+    lqh_workers = 0;
+  setNodeInfo(addNodePtr.i).m_lqh_workers = lqh_workers;
+
   c_maxDynamicId = req->dynamicId;
 
   cmAddPrepare(signal, addNodePtr, nodePtr.p);
@@ -1799,6 +1818,7 @@ Qmgr::cmAddPrepare(Signal* signal, NodeR
   conf->dynamicId = self->ndynamicId;
   conf->version = getNodeInfo(getOwnNodeId()).m_version;
   conf->mysql_version = getNodeInfo(getOwnNodeId()).m_mysql_version;
+  conf->lqh_workers = getNodeInfo(getOwnNodeId()).m_lqh_workers;
   sendSignal(nodePtr.p->blockRef, GSN_CM_NODEINFOCONF, signal,
 	     CmNodeInfoConf::SignalLength, JBB);
   DEBUG_START(GSN_CM_NODEINFOCONF, refToNode(nodePtr.p->blockRef), "");

=== modified file 'storage/ndb/src/kernel/blocks/tsman.cpp'
--- a/storage/ndb/src/kernel/blocks/tsman.cpp	2008-11-20 13:32:13 +0000
+++ b/storage/ndb/src/kernel/blocks/tsman.cpp	2008-12-01 18:03:48 +0000
@@ -45,11 +45,7 @@ Tsman::Tsman(Block_context& ctx) :
   m_pgman(0),
   m_lgman(0),
   m_tup(0),
-#ifdef __sun // temp
-  m_client_mutex(1, false)
-#else
-  m_client_mutex(2, true)
-#endif
+  m_client_mutex("tsman-client", 2, true)
 {
   BLOCK_CONSTRUCTOR(Tsman);
 

=== modified file 'storage/ndb/src/kernel/main.cpp'
--- a/storage/ndb/src/kernel/main.cpp	2008-11-09 18:37:29 +0000
+++ b/storage/ndb/src/kernel/main.cpp	2008-12-01 18:04:19 +0000
@@ -293,8 +293,10 @@ get_multithreaded_config(EmulatorData& e
 {
   // multithreaded is compiled in ndbd/ndbmtd for now
   globalData.isNdbMt = SimulatedBlock::isMultiThreaded();
-  if (!globalData.isNdbMt)
+  if (!globalData.isNdbMt) {
+    ndbout << "NDBMT: non-mt" << endl;
     return 0;
+  }
 
   const ndb_mgm_configuration_iterator * p =
     ed.theConfiguration->getOwnConfigIterator();

=== modified file 'storage/ndb/src/kernel/vm/SafeMutex.cpp'
--- a/storage/ndb/src/kernel/vm/SafeMutex.cpp	2008-11-20 13:32:13 +0000
+++ b/storage/ndb/src/kernel/vm/SafeMutex.cpp	2008-12-01 18:03:48 +0000
@@ -15,94 +15,151 @@
 
 #include "SafeMutex.hpp"
 
-NdbOut&
-operator<<(NdbOut& out, const SafeMutex& dm)
-{
-  out << "level=" << dm.m_level << "," << "usage=" <<
dm.m_usage;
-  return out;
-}
-
 int
 SafeMutex::create()
 {
-  if (m_init)
-    return ErrState;
-  int ret = -1;
-#ifdef HAVE_PTHREAD_MUTEX_RECURSIVE
-#ifndef __WIN__
-  if (m_limit > 1 || m_debug) {
-    pthread_mutexattr_t attr;
-    pthread_mutexattr_settype(&attr, PTHREAD_MUTEX_RECURSIVE);
-    ret = pthread_mutex_init(&m_mutex, &attr);
-  } else {
-    // error-check mutex does not work right on my linux, skip it
-    ret = pthread_mutex_init(&m_mutex, 0);
-  }
-#else
-  ret = pthread_mutex_init(&m_mutex, 0);
-#endif
-#else
-  if (m_limit > 1 || m_debug)
-    return ErrUnsupp;
+  int ret;
+  if (m_initdone)
+    return err(ErrState, __LINE__);
   ret = pthread_mutex_init(&m_mutex, 0);
-#endif
   if (ret != 0)
-    return ret;
-  m_init = true;
+    return err(ret, __LINE__);
+  ret = pthread_cond_init(&m_cond, 0);
+  if (ret != 0)
+    return err(ret, __LINE__);
+  m_initdone = true;
   return 0;
 }
 
 int
 SafeMutex::destroy()
 {
-  if (!m_init)
-    return ErrState;
-  int ret = pthread_mutex_destroy(&m_mutex);
+  int ret;
+  if (!m_initdone)
+    return err(ErrState, __LINE__);
+  ret = pthread_cond_destroy(&m_cond);
   if (ret != 0)
-    return ret;
-  m_init = false;
+    return err(ret, __LINE__);
+  ret = pthread_mutex_destroy(&m_mutex);
+  if (ret != 0)
+    return err(ret, __LINE__);
+  m_initdone = false;
   return 0;
 }
 
 int
 SafeMutex::lock()
 {
-  pthread_t self = pthread_self();
-  int ret = pthread_mutex_lock(&m_mutex);
-  /* have mutex */
+  int ret;
+  if (m_simple) {
+    ret = pthread_mutex_lock(&m_mutex);
+    if (ret != 0)
+      return err(ret, __LINE__);
+    return 0;
+  }
+  ret = pthread_mutex_lock(&m_mutex);
   if (ret != 0)
-    return ret;
-  if (!(m_level < m_limit))
-    return ErrLevel;
-  m_level++;
-  if (m_level > m_usage)
-    m_usage = m_level;
-  if (m_level == 1 && m_owner != 0)
-    return ErrOwner1;
-  if (m_level >= 2 && m_owner != self)
-    return ErrOwner2;
-  m_owner = self;
+    return err(ret, __LINE__);
+  return lock_impl();
+}
+
+int
+SafeMutex::lock_impl()
+{
+  int ret;
+  pthread_t self = pthread_self();
+  assert(self != 0);
+  while (1) {
+    if (m_level == 0) {
+      assert(m_owner == 0);
+      m_owner = self;
+    } else if (m_owner != self) {
+      ret = pthread_cond_wait(&m_cond, &m_mutex);
+      if (ret != 0)
+        return err(ret, __LINE__);
+      continue;
+    }
+    if (!(m_level < m_limit))
+      return err(ErrLevel, __LINE__);
+    m_level++;
+    if (m_usage < m_level)
+      m_usage = m_level;
+    ret = pthread_cond_signal(&m_cond);
+    if (ret != 0)
+      return err(ret, __LINE__);
+    ret = pthread_mutex_unlock(&m_mutex);
+    if (ret != 0)
+      return err(ret, __LINE__);
+    break;
+  }
   return 0;
 }
 
 int
 SafeMutex::unlock()
 {
+  int ret;
+  if (m_simple) {
+    ret = pthread_mutex_unlock(&m_mutex);
+    if (ret != 0)
+      return err(ret, __LINE__);
+    return 0;
+  }
+  ret = pthread_mutex_lock(&m_mutex);
+  if (ret != 0)
+    return err(ret, __LINE__);
+  return unlock_impl();
+}
+
+int
+SafeMutex::unlock_impl()
+{
+  int ret;
   pthread_t self = pthread_self();
-  if (!(m_level > 0))
-    return ErrState;
+  assert(self != 0);
   if (m_owner != self)
-    return ErrOwner3;
-  if (m_level == 1)
-    m_owner = 0;
+    return err(ErrOwner, __LINE__);
+  if (m_level == 0)
+    return err(ErrNolock, __LINE__);
   m_level--;
-  int ret = pthread_mutex_unlock(&m_mutex);
-  /* lose mutex */
+  if (m_level == 0) {
+    m_owner = 0;
+    ret = pthread_cond_signal(&m_cond);
+    if (ret != 0)
+      return err(ret, __LINE__);
+  }
+  ret = pthread_mutex_unlock(&m_mutex);
   if (ret != 0)
-    return ret;
+    return err(ret, __LINE__);
   return 0;
 }
 
+int
+SafeMutex::err(int errcode, int errline)
+{
+  assert(errcode != 0);
+  m_errcode = errcode;
+  m_errline = errline;
+  ndbout << *this << endl;
+#ifdef UNIT_TEST
+  abort();
+#endif
+  return errcode;
+}
+
+NdbOut&
+operator<<(NdbOut& out, const SafeMutex& sm)
+{
+  out << sm.m_name << ":";
+  out << " level=" << sm.m_level;
+  out << " usage=" << sm.m_usage;
+  if (sm.m_errcode != 0) {
+    out << " errcode=" << sm.m_errcode;
+    out << " errline=" << sm.m_errline;
+  }
+  return out;
+}
+
 #ifdef UNIT_TEST
 
 struct sm_thr {
@@ -144,10 +201,12 @@ sm_run(void* arg)
     }
     if (op == +1) {
       assert(level < thr.limit);
+      //ndbout << thr.index << ": lock" << endl;
       int ret = sm.lock();
       assert(ret == 0);
       level++;
     } else if (op == -1) {
+      //ndbout << thr.index << ": unlock" << endl;
       int ret = sm.unlock();
       assert(ret == 0);
       assert(level != 0);
@@ -161,6 +220,7 @@ sm_run(void* arg)
     assert(ret == 0);
     level--;
   }
+  return 0;
 }
 
 int
@@ -169,18 +229,19 @@ main(int argc, char** argv)
   const uint max_thr = 128;
   struct sm_thr thr[max_thr];
 
-  // threads - loops - max level
+  // threads - loops - max level - debug
   uint num_thr = argc > 1 ? atoi(argv[1]) : 4;
   assert(num_thr != 0 && num_thr <= max_thr);
   uint loops = argc > 2 ? atoi(argv[2]) : 1000000;
   uint limit = argc > 3 ? atoi(argv[3]) : 10;
   assert(limit != 0);
+  bool debug = argc > 4 ? atoi(argv[4]) : true;
 
   ndbout << "threads=" << num_thr;
   ndbout << " loops=" << loops;
   ndbout << " max level=" << limit << endl;
 
-  SafeMutex sm(limit, true);
+  SafeMutex sm("test-mutex", limit, debug);
   int ret;
   ret = sm.create();
   assert(ret == 0);

=== modified file 'storage/ndb/src/kernel/vm/SafeMutex.hpp'
--- a/storage/ndb/src/kernel/vm/SafeMutex.hpp	2008-11-20 13:32:13 +0000
+++ b/storage/ndb/src/kernel/vm/SafeMutex.hpp	2008-12-01 18:03:48 +0000
@@ -26,61 +26,70 @@
 #include <ndb_types.h>
 #include <NdbOut.hpp>
 
-#undef HAVE_PTHREAD_MUTEX_RECURSIVE
-#ifdef __linux
-#define HAVE_PTHREAD_MUTEX_RECURSIVE
-#endif
-
 /*
- * Recursive mutex with a recursion limit >= 1.  Can be useful for
- * debugging.  If a recursive mutex is not wanted, one must rewrite
- * caller code until limit 1 works.
+ * Recursive mutex with recursion limit >= 1.  Intended for debugging.
+ * One should rewrite caller code until limit 1 works.
  *
- * Implementation for limit > 1 uses a real OS recursive mutex.  Should
- * work on linux and solaris 10.  There is a unit test testSafeMutex.
+ * The implementation uses a default mutex.  If limit is > 1 or debug
+ * is specified then a recursive mutex is simulated.  Operating system
+ * recursive mutex (if any) is not used.  The simulation is several
+ * times slower.  There is a unit test testSafeMutex.
  *
  * The caller currently is multi-threaded disk data.  Here it is easy
  * to verify that the mutex is released within a time-slice.
  */
 
 class SafeMutex {
+  const char* const m_name;
+  const Uint32 m_limit; // error if usage exceeds this
+  const bool m_debug;   // use recursive implementation even for limit 1
+  const bool m_simple;
   pthread_mutex_t m_mutex;
+  pthread_cond_t m_cond;
   pthread_t m_owner;
-  bool m_init;
+  bool m_initdone;
   Uint32 m_level;
   Uint32 m_usage;       // max level used so far
-  const Uint32 m_limit; // error if usage exceeds this
-  const bool m_debug;   // use recursive mutex even for limit 1
+  int m_errcode;
+  int m_errline;
+  int err(int errcode, int errline);
   friend class NdbOut& operator<<(NdbOut&, const SafeMutex&);
 
 public:
-  SafeMutex(Uint32 limit, bool debug) :
+  SafeMutex(const char* name, Uint32 limit, bool debug) :
+    m_name(name),
     m_limit(limit),
-    m_debug(debug)
+    m_debug(debug),
+    m_simple(!(limit > 1 || debug))
   {
     assert(m_limit >= 1),
     m_owner = 0;        // wl4391_todo assuming numeric non-zero
-    m_init = false;
+    m_initdone = false;
     m_level = 0;
     m_usage = 0;
+    m_errcode = 0;
+    m_errline = 0;
   };
   ~SafeMutex() {
-    (void)destroy();
+    if (m_initdone)
+      (void)destroy();
   }
 
   enum {
-    // caller must crash on any error
-    ErrUnsupp = -101,   // limit > 1 or debug, and not supported by OS
-    ErrState = -102,    // user error
-    ErrLevel = -103,    // level exceeded limit
-    ErrOwner1 = -104,   // owner not 0 at first lock (OS error)
-    ErrOwner2 = -105,   // owner not self at recursive lock (OS error)
-    ErrOwner3 = -106    // owner not self at unlock (OS error)
+    // caller must crash on any error - recovery is not possible
+    ErrState = -101,    // user error
+    ErrLevel = -102,    // level exceeded limit
+    ErrOwner = -103,    // unlock when not owner
+    ErrNolock = -104    // unlock when no lock
   };
   int create();
   int destroy();
   int lock();
   int unlock();
+
+private:
+  int lock_impl();
+  int unlock_impl();
 };
 
 #endif

=== modified file 'storage/ndb/src/kernel/vm/SimulatedBlock.hpp'
--- a/storage/ndb/src/kernel/vm/SimulatedBlock.hpp	2008-11-19 11:01:17 +0000
+++ b/storage/ndb/src/kernel/vm/SimulatedBlock.hpp	2008-12-01 18:05:11 +0000
@@ -544,6 +544,7 @@ protected:
   BlockReference calcInstanceBlockRef(BlockNumber aBlock);
 
   // matching instance on another node e.g. LQH-LQH
+  // valid only if receiver has same number of workers
   BlockReference calcInstanceBlockRef(BlockNumber aBlock, NodeId aNode);
 
   /** 

Thread
bzr push into mysql-5.1 branch (pekka:3139 to 3146) WL#4391Pekka Nousiainen1 Dec