List:Commits« Previous MessageNext Message »
From:Pekka Nousiainen Date:August 22 2008 10:36am
Subject:bzr commit into mysql-5.1 branch (pekka:2722) WL#4391
View as plain text  
#At file:///export/space/pekka/ndb/version/my51-wl4391/

 2722 Pekka Nousiainen	2008-08-22
      wl#4391 14.diff
      Divide log parts between LQH workers (required for SR).
added:
  storage/ndb/src/kernel/blocks/dblqh/DblqhCommon.cpp
  storage/ndb/src/kernel/blocks/dblqh/DblqhCommon.hpp
modified:
  storage/ndb/src/kernel/blocks/LocalProxy.hpp
  storage/ndb/src/kernel/blocks/Makefile.am
  storage/ndb/src/kernel/blocks/dbdih/Dbdih.hpp
  storage/ndb/src/kernel/blocks/dbdih/DbdihMain.cpp
  storage/ndb/src/kernel/blocks/dblqh/Dblqh.hpp
  storage/ndb/src/kernel/blocks/dblqh/DblqhInit.cpp
  storage/ndb/src/kernel/blocks/dblqh/DblqhMain.cpp
  storage/ndb/src/kernel/blocks/dblqh/DblqhProxy.cpp
  storage/ndb/src/kernel/vm/SimulatedBlock.cpp
  storage/ndb/src/kernel/vm/SimulatedBlock.hpp

=== modified file 'storage/ndb/src/kernel/blocks/LocalProxy.hpp'
--- a/storage/ndb/src/kernel/blocks/LocalProxy.hpp	2008-08-15 11:01:41 +0000
+++ b/storage/ndb/src/kernel/blocks/LocalProxy.hpp	2008-08-22 10:36:33 +0000
@@ -50,6 +50,12 @@ protected:
   virtual SimulatedBlock* newWorker(Uint32 instanceNo) = 0;
   virtual void loadWorkers();
 
+  // worker index to worker instance
+  Uint32 workerInstance(Uint32 i) {
+    ndbrequire(i < c_workers);
+    return 1 + i;
+  }
+
   // worker index to worker ref
   BlockReference workerRef(Uint32 i) {
     ndbrequire(i < c_workers);

=== modified file 'storage/ndb/src/kernel/blocks/Makefile.am'
--- a/storage/ndb/src/kernel/blocks/Makefile.am	2008-07-26 05:13:40 +0000
+++ b/storage/ndb/src/kernel/blocks/Makefile.am	2008-08-22 10:36:33 +0000
@@ -61,7 +61,8 @@ libblocks_a_SOURCES = tsman.cpp lgman.cp
   dbtup/DbtupProxy.cpp \
   dbtux/DbtuxProxy.cpp \
   backup/BackupProxy.cpp \
-  RestoreProxy.cpp
+  RestoreProxy.cpp \
+  dblqh/DblqhCommon.cpp
 
 EXTRA_PROGRAMS = ndb_print_file
 ndb_print_file_SOURCES = print_file.cpp diskpage.cpp dbtup/tuppage.cpp

=== modified file 'storage/ndb/src/kernel/blocks/dbdih/Dbdih.hpp'
--- a/storage/ndb/src/kernel/blocks/dbdih/Dbdih.hpp	2008-08-11 11:21:36 +0000
+++ b/storage/ndb/src/kernel/blocks/dbdih/Dbdih.hpp	2008-08-22 10:36:33 +0000
@@ -635,6 +635,7 @@ public:
   };
   
 private:
+  friend class SimulatedBlock;
   BLOCK_DEFINES(Dbdih);
   
   void execDUMP_STATE_ORD(Signal *);
@@ -1774,8 +1775,8 @@ private:
   NdbNodeBitmask m_sr_nodes;
   NdbNodeBitmask m_to_nodes;
 
-  // block instances
-public:
+  // MT LQH
+
   Uint32 dihGetInstanceKey(FragmentstorePtr tFragPtr) {
     ndbrequire(!tFragPtr.isNull());
     Uint32 log_part_id = tFragPtr.p->m_log_part_id;
@@ -1783,7 +1784,6 @@ public:
     return instanceKey;
   }
   Uint32 dihGetInstanceKey(Uint32 tabId, Uint32 fragId);
-  Uint32 dihGetLogPartId(Uint32 tabId, Uint32 fragId);
 };
 
 #if (DIH_CDATA_SIZE < _SYSFILE_SIZE32)

=== modified file 'storage/ndb/src/kernel/blocks/dbdih/DbdihMain.cpp'
--- a/storage/ndb/src/kernel/blocks/dbdih/DbdihMain.cpp	2008-08-11 12:48:03 +0000
+++ b/storage/ndb/src/kernel/blocks/dbdih/DbdihMain.cpp	2008-08-22 10:36:33 +0000
@@ -16936,7 +16936,8 @@ do_send:
 
 #endif
 
-// block instances
+// MT LQH
+
 Uint32
 Dbdih::dihGetInstanceKey(Uint32 tabId, Uint32 fragId)
 {
@@ -16948,15 +16949,3 @@ Dbdih::dihGetInstanceKey(Uint32 tabId, U
   Uint32 instanceKey = dihGetInstanceKey(tFragPtr);
   return instanceKey;
 }
-
-Uint32
-Dbdih::dihGetLogPartId(Uint32 tabId, Uint32 fragId)
-{
-  TabRecordPtr tTabPtr;
-  tTabPtr.i = tabId;
-  ptrCheckGuard(tTabPtr, ctabFileSize, tabRecord);
-  FragmentstorePtr tFragPtr;
-  getFragstore(tTabPtr.p, fragId, tFragPtr);
-  Uint32 logPartId = tFragPtr.p->m_log_part_id;
-  return logPartId;
-}

=== modified file 'storage/ndb/src/kernel/blocks/dblqh/Dblqh.hpp'
--- a/storage/ndb/src/kernel/blocks/dblqh/Dblqh.hpp	2008-08-21 16:10:55 +0000
+++ b/storage/ndb/src/kernel/blocks/dblqh/Dblqh.hpp	2008-08-22 10:36:33 +0000
@@ -1342,6 +1342,10 @@ public:
      *       occuring during system/node restart.
      */
     Uint16 invalidatePageNo;
+    /**
+     *       For MT LQH the log part (0-3).
+     */
+    Uint16 logPartNo;
   }; // Size 164 Bytes
   typedef Ptr<LogPartRecord> LogPartRecordPtr;
   

=== added file 'storage/ndb/src/kernel/blocks/dblqh/DblqhCommon.cpp'
--- a/storage/ndb/src/kernel/blocks/dblqh/DblqhCommon.cpp	1970-01-01 00:00:00 +0000
+++ b/storage/ndb/src/kernel/blocks/dblqh/DblqhCommon.cpp	2008-08-22 10:36:33 +0000
@@ -0,0 +1,63 @@
+/* Copyright (C) 2003 MySQL AB
+
+   This program is free software; you can redistribute it and/or modify
+   it under the terms of the GNU General Public License as published by
+   the Free Software Foundation; version 2 of the License.
+
+   This program is distributed in the hope that it will be useful,
+   but WITHOUT ANY WARRANTY; without even the implied warranty of
+   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+   GNU General Public License for more details.
+
+   You should have received a copy of the GNU General Public License
+   along with this program; if not, write to the Free Software
+   Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA */
+
+#include <GlobalData.hpp>
+#include "DblqhCommon.hpp"
+
+NdbLogPartInfo::NdbLogPartInfo(Uint32 instanceNo)
+{
+  lqhWorkers = globalData.ndbMtLqhWorkers;
+  partCount = 0;
+  partMask.clear();
+  Uint32 lpno;
+  for (lpno = 0; lpno < LogParts; lpno++) {
+    if (instanceNo != 0) {
+      Uint32 worker = instanceNo - 1;
+      assert(worker < lqhWorkers);
+      if (worker != lpno % lqhWorkers)
+        continue;
+    }
+    partNo[partCount++] = lpno;
+    partMask.set(lpno);
+  }
+}
+
+Uint32
+NdbLogPartInfo::partNoFromId(Uint32 lpid) const
+{
+  return lpid % LogParts;
+}
+
+bool
+NdbLogPartInfo::partNoOwner(Uint32 lpno) const
+{
+  assert(lpno < LogParts);
+  return partMask.get(lpno);
+}
+
+Uint32
+NdbLogPartInfo::partNoIndex(Uint32 lpno) const
+{
+  assert(lpno < LogParts);
+  assert(partMask.get(lpno));
+  Uint32 i = 0;
+  if (lqhWorkers == 0)
+    i = lpno;
+  else
+    i = lpno / lqhWorkers;
+  assert(i < partCount);
+  assert(partNo[i] == lpno);
+  return i;
+}

=== added file 'storage/ndb/src/kernel/blocks/dblqh/DblqhCommon.hpp'
--- a/storage/ndb/src/kernel/blocks/dblqh/DblqhCommon.hpp	1970-01-01 00:00:00 +0000
+++ b/storage/ndb/src/kernel/blocks/dblqh/DblqhCommon.hpp	2008-08-22 10:36:33 +0000
@@ -0,0 +1,51 @@
+/* Copyright (C) 2003 MySQL AB
+
+   This program is free software; you can redistribute it and/or modify
+   it under the terms of the GNU General Public License as published by
+   the Free Software Foundation; version 2 of the License.
+
+   This program is distributed in the hope that it will be useful,
+   but WITHOUT ANY WARRANTY; without even the implied warranty of
+   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+   GNU General Public License for more details.
+
+   You should have received a copy of the GNU General Public License
+   along with this program; if not, write to the Free Software
+   Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA */
+
+#ifndef DBLQH_COMMON_H
+#define DBLQH_COMMON_H
+
+#include <pc.hpp>
+#include <ndb_types.h>
+#include <Bitmask.hpp>
+
+/*
+ * Log part id is from DBDIH.  Number of log parts is fixed as 4.
+ * A log part is identified by log part number (0-3)
+ *
+ *   log part number = log part id % 4
+ *
+ * Currently instance key (1-4) is
+ *
+ *   instance key = 1 + log part number
+ *
+ * This may change, and the code (except this file) must not assume
+ * any connection between log part number and instance key.
+ *
+ * Following structure computes log part info for a specific LQH
+ * instance (main instance 0 or worker instances 1-4).
+ */
+struct NdbLogPartInfo {
+  enum { LogParts = 4 };
+  NdbLogPartInfo(Uint32 instanceNo);
+  Uint32 lqhWorkers;
+  Uint32 partCount;
+  Uint16 partNo[LogParts];
+  Bitmask<(LogParts+31)/32> partMask;
+  Uint32 partNoFromId(Uint32 lpid) const;
+  bool partNoOwner(Uint32 lpno) const;
+  Uint32 partNoIndex(Uint32 lpno) const;
+};
+
+#endif

=== modified file 'storage/ndb/src/kernel/blocks/dblqh/DblqhInit.cpp'
--- a/storage/ndb/src/kernel/blocks/dblqh/DblqhInit.cpp	2008-08-21 14:04:53 +0000
+++ b/storage/ndb/src/kernel/blocks/dblqh/DblqhInit.cpp	2008-08-22 10:36:33 +0000
@@ -18,6 +18,7 @@
 #define DBLQH_C
 #include "Dblqh.hpp"
 #include <ndb_limits.h>
+#include "DblqhCommon.hpp"
 
 #define DEBUG(x) { ndbout << "LQH::" << x << endl; }
 
@@ -32,7 +33,10 @@ void Dblqh::initData() 
   clcpFileSize = ZNO_CONCURRENT_LCP;
   clfoFileSize = 0;
   clogFileFileSize = 0;
-  clogPartFileSize = ZLOG_PART_FILE_SIZE;
+
+  NdbLogPartInfo lpinfo(instance());
+  clogPartFileSize = lpinfo.partCount;
+
   cpageRefFileSize = ZPAGE_REF_FILE_SIZE;
   cscanrecFileSize = 0;
   ctabrecFileSize = 0;

=== modified file 'storage/ndb/src/kernel/blocks/dblqh/DblqhMain.cpp'
--- a/storage/ndb/src/kernel/blocks/dblqh/DblqhMain.cpp	2008-08-21 22:18:36 +0000
+++ b/storage/ndb/src/kernel/blocks/dblqh/DblqhMain.cpp	2008-08-22 10:36:33 +0000
@@ -71,6 +71,7 @@
 #include <signaldata/SignalDroppedRep.hpp>
 
 #include "../suma/Suma.hpp"
+#include "DblqhCommon.hpp"
 
 // Use DEBUG to print messages that should be
 // seen only when we debug the product
@@ -832,7 +833,7 @@ void Dblqh::startphase3Lab(Signal* signa
     reportStatus(signal); 
   }
   initReportStatus(signal);
-  for (logPartPtr.i = 0; logPartPtr.i < 4; logPartPtr.i++) {
+  for (logPartPtr.i = 0; logPartPtr.i < clogPartFileSize; logPartPtr.i++) {
     jam();
     ptrAss(logPartPtr, logPartRecord);
     initLogpart(signal);
@@ -879,7 +880,7 @@ void Dblqh::startphase3Lab(Signal* signa
      * THE RESTART BY FINDING THE END OF THE LOG AND FROM THERE FINDING THE 
      * INFO ABOUT THE GLOBAL CHECKPOINTS IN THE FRAGMENT LOG. 
      --------------------------------------------------------------------- */
-    for (logPartPtr.i = 0; logPartPtr.i < 4; logPartPtr.i++) {
+    for (logPartPtr.i = 0; logPartPtr.i < clogPartFileSize; logPartPtr.i++) {
       jam();
       LogFileRecordPtr locLogFilePtr;
       ptrAss(logPartPtr, logPartRecord);
@@ -1540,7 +1541,19 @@ void Dblqh::execLQHFRAGREQ(Signal* signa
   fragptr.p->startGci = req->startGci;
   fragptr.p->newestGci = req->startGci;
   fragptr.p->tableType = tabptr.p->tableType;
-  fragptr.p->m_log_part_ptr_i = (req->logPartId & 3); // assumes array
+
+  {
+    NdbLogPartInfo lpinfo(instance());
+    Uint32 logPartNo = lpinfo.partNoFromId(req->logPartId);
+    ndbrequire(lpinfo.partNoOwner(logPartNo));
+
+    LogPartRecordPtr ptr;
+    ptr.i = lpinfo.partNoIndex(logPartNo);
+    ptrCheckGuard(ptr, clogPartFileSize, logPartRecord);
+    ndbrequire(ptr.p->logPartNo == logPartNo);
+
+    fragptr.p->m_log_part_ptr_i = ptr.i;
+  }
 
   if (DictTabInfo::isOrderedIndex(tabptr.p->tableType)) {
     jam();
@@ -12559,7 +12572,7 @@ void Dblqh::setLogTail(Signal* signal, U
   UintR tsltIndex;
   UintR tsltFlag;
 
-  for (sltLogPartPtr.i = 0; sltLogPartPtr.i < 4; sltLogPartPtr.i++) {
+  for (sltLogPartPtr.i = 0; sltLogPartPtr.i < clogPartFileSize; sltLogPartPtr.i++) {
     jam();
     ptrAss(sltLogPartPtr, logPartRecord);
     findLogfile(signal, sltLogPartPtr.p->logTailFileNo,
@@ -12842,7 +12855,7 @@ void Dblqh::execGCP_SAVEREQ(Signal* sign
   gcpPtr.p->gcpUserptr = dihPtr;
   gcpPtr.p->gcpId = gci;
   bool tlogActive = false;
-  for (logPartPtr.i = 0; logPartPtr.i <= 3; logPartPtr.i++) {
+  for (logPartPtr.i = 0; logPartPtr.i < clogPartFileSize; logPartPtr.i++) {
     ptrAss(logPartPtr, logPartRecord);
     if (logPartPtr.p->logPartState == LogPartRecord::ACTIVE) {
       jam();
@@ -12872,7 +12885,7 @@ void Dblqh::execGCP_SAVEREQ(Signal* sign
 /* ------------------------------------------------------------------------- */
 void Dblqh::startTimeSupervision(Signal* signal) 
 {
-  for (logPartPtr.i = 0; logPartPtr.i <= 3; logPartPtr.i++) {
+  for (logPartPtr.i = 0; logPartPtr.i < clogPartFileSize; logPartPtr.i++) {
     jam();
     ptrAss(logPartPtr, logPartRecord);
 /* +++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++ */
@@ -12899,7 +12912,7 @@ void Dblqh::initGcpRecLab(Signal* signal
 /*                                                                          */
 /*       SUBROUTINE SHORT NAME = IGR                                        */
 /* ======================================================================== */
-  for (logPartPtr.i = 0; logPartPtr.i <= 3; logPartPtr.i++) {
+  for (logPartPtr.i = 0; logPartPtr.i < clogPartFileSize; logPartPtr.i++) {
     jam();
     ptrAss(logPartPtr, logPartRecord);
 /*--------------------------------------------------*/
@@ -12935,6 +12948,14 @@ void Dblqh::initGcpRecLab(Signal* signal
 	logPagePtr.p->logPageWord[ZCURR_PAGE_INDEX] - 1;
     }//if
   }//for
+  // initialize un-used part
+  Uint32 Ti;
+  for (Ti = clogPartFileSize; Ti < ZLOG_PART_FILE_SIZE; Ti++) {
+    gcpPtr.p->gcpFilePtr[Ti] = ZNIL;
+    gcpPtr.p->gcpPageNo[Ti] = ZNIL;
+    gcpPtr.p->gcpSyncReady[Ti] = FALSE;
+    gcpPtr.p->gcpWordNo[Ti] = ZNIL;
+  }
   return;
 }//Dblqh::initGcpRecLab()
 
@@ -12987,7 +13008,7 @@ void Dblqh::checkGcpCompleted(Signal* si
       logPartPtr.p->gcprec = RNIL;
       gcpPtr.p->gcpLogPartState[logPartPtr.i] = ZON_DISK;
       tcgcFlag = ZTRUE;
-      for (tcgcJ = 0; tcgcJ <= 3; tcgcJ++) {
+      for (tcgcJ = 0; tcgcJ < clogPartFileSize; tcgcJ++) {
         jam();
         if (gcpPtr.p->gcpLogPartState[tcgcJ] != ZON_DISK) {
           jam();
@@ -13008,7 +13029,7 @@ void Dblqh::checkGcpCompleted(Signal* si
 // log files where the last log word resided first before proceeding.
 /* ------------------------------------------------------------------------- */
         UintR Ti;
-        for (Ti = 0; Ti < 4; Ti++) {
+        for (Ti = 0; Ti < clogPartFileSize; Ti++) {
           LogFileRecordPtr loopLogFilePtr;
           loopLogFilePtr.i = gcpPtr.p->gcpFilePtr[Ti];
           ptrCheckGuard(loopLogFilePtr, clogFileFileSize, logFileRecord);
@@ -13048,7 +13069,7 @@ Dblqh::execFSSYNCCONF(Signal* signal)
   ptrCheckGuard(localGcpPtr, cgcprecFileSize, gcpRecord);
   localGcpPtr.p->gcpSyncReady[localLogPartPtr.i] = ZTRUE;
   UintR Ti;
-  for (Ti = 0; Ti < 4; Ti++) {
+  for (Ti = 0; Ti < clogPartFileSize; Ti++) {
     jam();
     if (localGcpPtr.p->gcpSyncReady[Ti] == ZFALSE) {
       jam();
@@ -13995,7 +14016,7 @@ CHECK_LOG_PARTS_LOOP:
 /*---------------------------------------------------------------------------*/
     return;
   }//if
-  if (logPartPtr.i == 3) {
+  if (logPartPtr.i + 1 == clogPartFileSize) {
     jam();
 /*---------------------------------------------------------------------------*/
 /* ALL LOG PARTS ARE COMPLETED. NOW WE CAN CONTINUE WITH THE RESTART         */
@@ -14003,7 +14024,7 @@ CHECK_LOG_PARTS_LOOP:
 /* NEED TO INITIALISE ALL NEEDED DATA AND TO OPEN FILE ZERO AND THE NEXT AND */
 /* TO SET THE CURRENT LOG PAGE TO BE PAGE 1 IN FILE ZERO.                    */
 /*---------------------------------------------------------------------------*/
-    for (logPartPtr.i = 0; logPartPtr.i <= 3; logPartPtr.i++) {
+    for (logPartPtr.i = 0; logPartPtr.i < clogPartFileSize; logPartPtr.i++) {
       ptrAss(logPartPtr, logPartRecord);
       signal->theData[0] = ZINIT_FOURTH;
       signal->theData[1] = logPartPtr.i;
@@ -14050,7 +14071,7 @@ void Dblqh::initLogfile(Signal* signal, 
   logFilePtr.p->fileName[2] = fileNo;	        /* Sfile_no */
   tilTmp = 1;	                        /* VERSION 1 OF FILE NAME */
   tilTmp = (tilTmp << 8) + 1;	    /* FRAGMENT LOG => .FRAGLOG AS EXTENSION */
-  tilTmp = (tilTmp << 8) + (8 + logPartPtr.i); /* DIRECTORY = D(8+Part)/DBLQH */
+  tilTmp = (tilTmp << 8) + (8 + logPartPtr.p->logPartNo); /* DIRECTORY = D(8+Part)/DBLQH */
   tilTmp = (tilTmp << 8) + 255;	              /* IGNORE Pxx PART OF FILE NAME */
   logFilePtr.p->fileName[3] = tilTmp;
 /* ========================================================================= */
@@ -14723,7 +14744,7 @@ void Dblqh::closingSrLab(Signal* signal)
    *  CHECK IF ALL OTHER LOG PARTS ARE ALSO COMPLETED.
    * ------------------------------------------------------------------------ */
   logPartPtr.p->logPartState = LogPartRecord::SR_FIRST_PHASE_COMPLETED;
-  for (logPartPtr.i = 0; logPartPtr.i <= 3; logPartPtr.i++) {
+  for (logPartPtr.i = 0; logPartPtr.i < clogPartFileSize; logPartPtr.i++) {
     jam();
     ptrAss(logPartPtr, logPartRecord);
     if (logPartPtr.p->logPartState != LogPartRecord::SR_FIRST_PHASE_COMPLETED) {
@@ -14981,7 +15002,7 @@ void Dblqh::execSTART_RECREQ(Signal* sig
     cstartRecReqData = RNIL;
   }
   
-  for (logPartPtr.i = 0; logPartPtr.i < 4; logPartPtr.i++) {
+  for (logPartPtr.i = 0; logPartPtr.i < clogPartFileSize; logPartPtr.i++) {
     ptrAss(logPartPtr, logPartRecord);
     logPartPtr.p->logPartNewestCompletedGCI = cnewestCompletedGci;
   }//for
@@ -15265,7 +15286,7 @@ void Dblqh::execSrCompletedLab(Signal* s
      * HAVE TO FIND THE ACTUAL PAGE NUMBER AND PAGE INDEX WHERE TO 
      * CONTINUE WRITING THE LOG AFTER THE SYSTEM RESTART.
      * --------------------------------------------------------------------- */
-    for (logPartPtr.i = 0; logPartPtr.i < 4; logPartPtr.i++) {
+    for (logPartPtr.i = 0; logPartPtr.i < clogPartFileSize; logPartPtr.i++) {
       jam();
       ptrAss(logPartPtr, logPartRecord);
       logPartPtr.p->logPartState = LogPartRecord::SR_FOURTH_PHASE_STARTED;
@@ -15380,7 +15401,7 @@ void Dblqh::srPhase3Start(Signal* signal
   ndbrequire(csrPhaseStarted != ZSR_BOTH_PHASES_STARTED);
 
   csrPhaseStarted = ZSR_BOTH_PHASES_STARTED;
-  for (logPartPtr.i = 0; logPartPtr.i < 4; logPartPtr.i++) {
+  for (logPartPtr.i = 0; logPartPtr.i < clogPartFileSize; logPartPtr.i++) {
     jam();
     ptrAss(logPartPtr, logPartRecord);
     logPartPtr.p->logPartState = LogPartRecord::SR_THIRD_PHASE_STARTED;
@@ -15445,7 +15466,7 @@ void Dblqh::srGciLimits(Signal* signal) 
     }//if
   }
 
-  for(Uint32 i = 1; i<4; i++)
+  for(Uint32 i = 1; i < clogPartFileSize; i++)
   {
     LogPartRecordPtr tmp;
     tmp.i = i;
@@ -15464,7 +15485,7 @@ void Dblqh::srGciLimits(Signal* signal) 
     logPartPtr.p->logStartGci = logPartPtr.p->logLastGci;
   }//if
   
-  for (logPartPtr.i = 0; logPartPtr.i < 4; logPartPtr.i++) {
+  for (logPartPtr.i = 0; logPartPtr.i < clogPartFileSize; logPartPtr.i++) {
     jam();
     ptrAss(logPartPtr, logPartRecord);
     logPartPtr.p->logExecState = LogPartRecord::LES_SEARCH_STOP;
@@ -16497,7 +16518,7 @@ void Dblqh::execLogComp(Signal* signal) 
   tcConnectptr.i = logPartPtr.p->logTcConrec;
   ptrCheckGuard(tcConnectptr, ctcConnectrecFileSize, tcConnectionrec);
   releaseTcrecLog(signal, tcConnectptr);
-  for (logPartPtr.i = 0; logPartPtr.i <= 3; logPartPtr.i++) {
+  for (logPartPtr.i = 0; logPartPtr.i < clogPartFileSize; logPartPtr.i++) {
     jam();
     ptrAss(logPartPtr, logPartRecord);
     if (logPartPtr.p->logPartState != LogPartRecord::SR_THIRD_PHASE_COMPLETED) {
@@ -16774,7 +16795,7 @@ void Dblqh::srFourthComp(Signal* signal)
   logPartPtr.i = signal->theData[0];
   ptrCheckGuard(logPartPtr, clogPartFileSize, logPartRecord);
   logPartPtr.p->logPartState = LogPartRecord::SR_FOURTH_PHASE_COMPLETED;
-  for (logPartPtr.i = 0; logPartPtr.i <= 3; logPartPtr.i++) {
+  for (logPartPtr.i = 0; logPartPtr.i < clogPartFileSize; logPartPtr.i++) {
     jam();
     ptrAss(logPartPtr, logPartRecord);
     if (logPartPtr.p->logPartState != LogPartRecord::SR_FOURTH_PHASE_COMPLETED) {
@@ -16799,7 +16820,7 @@ void Dblqh::srFourthComp(Signal* signal)
    *  SET LOG PART STATE TO IDLE TO
    *  INDICATE THAT NOTHING IS GOING ON IN THE LOG PART.
    * ----------------------------------------------------------------------- */
-  for (logPartPtr.i = 0; logPartPtr.i <= 3; logPartPtr.i++) {
+  for (logPartPtr.i = 0; logPartPtr.i < clogPartFileSize; logPartPtr.i++) {
     ptrAss(logPartPtr, logPartRecord);
     logPartPtr.p->logPartState = LogPartRecord::IDLE;
   }//for
@@ -17554,7 +17575,7 @@ void Dblqh::initialiseGcprec(Signal* sig
   if (cgcprecFileSize != 0) {
     for (gcpPtr.i = 0; gcpPtr.i < cgcprecFileSize; gcpPtr.i++) {
       ptrAss(gcpPtr, gcpRecord);
-      for (tigpIndex = 0; tigpIndex <= 3; tigpIndex++) {
+      for (tigpIndex = 0; tigpIndex < ZLOG_PART_FILE_SIZE; tigpIndex++) {
         gcpPtr.p->gcpLogPartState[tigpIndex] = ZIDLE;
         gcpPtr.p->gcpSyncReady[tigpIndex] = ZFALSE;
       }//for
@@ -17672,7 +17693,7 @@ void Dblqh::initialiseLogPage(Signal* si
  * ========================================================================= */
 void Dblqh::initialiseLogPart(Signal* signal) 
 {
-  for (logPartPtr.i = 0; logPartPtr.i <= 3; logPartPtr.i++) {
+  for (logPartPtr.i = 0; logPartPtr.i < clogPartFileSize; logPartPtr.i++) {
     ptrAss(logPartPtr, logPartRecord);
     logPartPtr.p->waitWriteGciLog = LogPartRecord::WWGL_FALSE;
     logPartPtr.p->LogLqhKeyReqSent = ZFALSE;
@@ -18034,6 +18055,10 @@ void Dblqh::initLogpart(Signal* signal) 
   logPartPtr.p->headFileNo = ZNIL;
   logPartPtr.p->headPageNo = ZNIL;
   logPartPtr.p->headPageIndex = ZNIL;
+
+  NdbLogPartInfo lpinfo(instance());
+  ndbrequire(lpinfo.partCount == clogPartFileSize);
+  logPartPtr.p->logPartNo = lpinfo.partNo[logPartPtr.i];
 }//Dblqh::initLogpart()
 
 /* ========================================================================== 
@@ -18249,7 +18274,7 @@ void Dblqh::logNextStart(Signal* signal)
     writeCompletedGciLog(signal);
     logPartPtr.p->waitWriteGciLog = LogPartRecord::WWGL_FALSE;
     tlnsStillWaiting = ZFALSE;
-    for (lnsLogPartPtr.i = 0; lnsLogPartPtr.i < 4; lnsLogPartPtr.i++) {
+    for (lnsLogPartPtr.i = 0; lnsLogPartPtr.i < clogPartFileSize; lnsLogPartPtr.i++) {
       jam();
       ptrAss(lnsLogPartPtr, logPartRecord);
       if (lnsLogPartPtr.p->waitWriteGciLog == LogPartRecord::WWGL_TRUE) {
@@ -19872,12 +19897,14 @@ Dblqh::execDUMP_STATE_ORD(Signal* signal
     jam();
     Uint32 i;
     GcpRecordPtr gcp; gcp.i = RNIL;
-    for(i = 0; i<4; i++)
+    for(i = 0; i < clogPartFileSize; i++)
     {
       logPartPtr.i = i;
       ptrCheckGuard(logPartPtr, clogPartFileSize, logPartRecord);
-      ndbout_c("LP %d state: %d WW_Gci: %d gcprec: %d flq: %d currfile: %d tailFileNo: %d logTailMbyte: %d", 
+      ndbout_c("LP %d blockInstance: %d partNo: %d state: %d WW_Gci: %d gcprec: %d flq: %d currfile: %d tailFileNo: %d logTailMbyte: %d", 
 	       i,
+               instance(),
+               logPartPtr.p->logPartNo,
 	       logPartPtr.p->logPartState,
 	       logPartPtr.p->waitWriteGciLog,
 	       logPartPtr.p->gcprec,

=== modified file 'storage/ndb/src/kernel/blocks/dblqh/DblqhProxy.cpp'
--- a/storage/ndb/src/kernel/blocks/dblqh/DblqhProxy.cpp	2008-08-15 11:01:41 +0000
+++ b/storage/ndb/src/kernel/blocks/dblqh/DblqhProxy.cpp	2008-08-22 10:36:33 +0000
@@ -15,6 +15,7 @@
 
 #include "DblqhProxy.hpp"
 #include "Dblqh.hpp"
+#include "DblqhCommon.hpp"
 
 DblqhProxy::DblqhProxy(Block_context& ctx) :
   LocalProxy(DBLQH, ctx)
@@ -275,7 +276,9 @@ DblqhProxy::sendLQHFRAGREQ(Signal* signa
   LqhFragReq* req = (LqhFragReq*)signal->getDataPtrSend();
   *req = ss.m_req;
 
-  if (!isLogPartOwner(ss.m_worker, req->logPartId)) {
+  NdbLogPartInfo lpinfo(workerInstance(ss.m_worker));
+  Uint32 logPartNo = lpinfo.partNoFromId(req->logPartId);
+  if (!lpinfo.partNoOwner(logPartNo)) {
     jam();
     skipReq(ss);
     return;

=== modified file 'storage/ndb/src/kernel/vm/SimulatedBlock.cpp'
--- a/storage/ndb/src/kernel/vm/SimulatedBlock.cpp	2008-08-15 11:01:41 +0000
+++ b/storage/ndb/src/kernel/vm/SimulatedBlock.cpp	2008-08-22 10:36:33 +0000
@@ -217,34 +217,6 @@ SimulatedBlock::addRecSignalImpl(GlobalS
   theExecArray[gsn] = f;
 }
 
-Uint32
-SimulatedBlock::getInstanceKey(Uint32 tabId, Uint32 fragId)
-{
-  Dbdih* dbdih = (Dbdih*)globalData.getBlock(DBDIH);
-  ndbrequire(dbdih != 0);
-  Uint32 instanceKey = dbdih->dihGetInstanceKey(tabId, fragId);
-  return instanceKey;
-}
-
-Uint32
-SimulatedBlock::getLogPartId(Uint32 tabId, Uint32 fragId)
-{
-  Dbdih* dbdih = (Dbdih*)globalData.getBlock(DBDIH);
-  ndbrequire(dbdih != 0);
-  Uint32 logPartId = dbdih->dihGetLogPartId(tabId, fragId);
-  return logPartId;
-}
-
-bool
-SimulatedBlock::isLogPartOwner(Uint32 worker, Uint32 logPartId)
-{
-  if (!globalData.isNdbMtLqh)
-    return true;
-  Uint32 workers = globalData.ndbMtLqhWorkers;
-  ndbrequire(workers != 0 && worker < workers);
-  return worker == logPartId % workers;
-}
-
 void
 SimulatedBlock::assignToThread(Uint32 threadId, EmulatedJamBuffer *jamBuffer,
                                Uint32 *watchDogCounter)
@@ -254,6 +226,14 @@ SimulatedBlock::assignToThread(Uint32 th
   m_watchDogCounter = watchDogCounter;
 }
 
+Uint32
+SimulatedBlock::getInstanceKey(Uint32 tabId, Uint32 fragId)
+{
+  Dbdih* dbdih = (Dbdih*)globalData.getBlock(DBDIH);
+  Uint32 instanceKey = dbdih->dihGetInstanceKey(tabId, fragId);
+  return instanceKey;
+}
+
 void
 SimulatedBlock::signal_error(Uint32 gsn, Uint32 len, Uint32 recBlockNo, 
 			     const char* filename, int lineno) const 

=== modified file 'storage/ndb/src/kernel/vm/SimulatedBlock.hpp'
--- a/storage/ndb/src/kernel/vm/SimulatedBlock.hpp	2008-08-15 11:01:41 +0000
+++ b/storage/ndb/src/kernel/vm/SimulatedBlock.hpp	2008-08-22 10:36:33 +0000
@@ -173,14 +173,13 @@ public:
   static Uint32 getLqhWorkers() { return globalData.ndbMtLqhWorkers; }
 
   /*
-   * Instance key (1-4, even if not MT LQH) is set in receiver block ref.
-   * The receiver maps it to a real instance (0, if not MT LQH).
+   * Instance key (1-4) is used only when sending a signal.  Receiver
+   * maps it to actual instance (0, if receiver is not MT LQH).
+   *
+   * For performance reason, DBTC gets instance key directly from DBDIH
+   * via DI*GET*NODES*REQ signals.
    */
-  Uint32 getInstanceKey(Uint32 tabId, Uint32 fragId);
-
-  /* MT LQH log parts info for use by this node */
-  Uint32 getLogPartId(Uint32 tabId, Uint32 fragId);
-  bool isLogPartOwner(Uint32 worker, Uint32 logPartId);
+  static Uint32 getInstanceKey(Uint32 tabId, Uint32 fragId);
 
 public:
   typedef void (SimulatedBlock::* CallbackFunction)(class Signal*, 

Thread
bzr commit into mysql-5.1 branch (pekka:2722) WL#4391Pekka Nousiainen22 Aug