List:Commits« Previous MessageNext Message »
From:Frazer Clement Date:May 21 2012 10:32pm
Subject:bzr push into mysql-5.1-telco-7.0 branch (frazer.clement:4926 to 4927)
Bug#14075825
View as plain text  
 4927 Frazer Clement	2012-05-21
      Bug #14075825 LCP Watchdog : Fragment scan check
      
      The Local Checkpoint (LCP) mechanism allows Ndb to trim its redo and
      undo logs.  This keeps node and system recovery time within bounds, 
      and maintains free redo and undo space to support logging DML 
      operations. 
       
      A watchdog mechanism is added to supervise fragment scans occurring
      as part of a local checkpoint.  This is intended to guard against any
      scan related OS level IO errors or bugs causing issues with LCP and 
      endangering write service and recovery times.
      
      Each node independently monitors the progress of local fragment 
      scans occurring as part of an LCP.  If no progress is made for 
      20 seconds, warning logs will be generated every 10 seconds.  
      If no progress is made for one minute then the fragment scan is 
      considered to be hung, and the node will be restarted to enable 
      LCP to continue. 
      
      Any occurrence of warning logs or other action taken by this 
      watchdog should be reported as a bug.
      
      A new test scenario is added to testNodeRestart. 

    modified:
      storage/ndb/include/kernel/GlobalSignalNumbers.h
      storage/ndb/include/kernel/signaldata/LCP.hpp
      storage/ndb/include/kernel/signaldata/SignalData.hpp
      storage/ndb/src/common/debugger/signaldata/LCP.cpp
      storage/ndb/src/common/debugger/signaldata/SignalDataPrint.cpp
      storage/ndb/src/common/debugger/signaldata/SignalNames.cpp
      storage/ndb/src/kernel/blocks/backup/Backup.cpp
      storage/ndb/src/kernel/blocks/backup/Backup.hpp
      storage/ndb/src/kernel/blocks/backup/BackupInit.cpp
      storage/ndb/src/kernel/blocks/dblqh/Dblqh.hpp
      storage/ndb/src/kernel/blocks/dblqh/DblqhInit.cpp
      storage/ndb/src/kernel/blocks/dblqh/DblqhMain.cpp
      storage/ndb/test/include/NdbMgmd.hpp
      storage/ndb/test/ndbapi/testNodeRestart.cpp
      storage/ndb/test/run-test/daily-devel-tests.txt
 4926 magnus.blaudd@stripped	2012-05-07 [merge]
      Merge

    modified:
      storage/ndb/include/util/SocketAuthenticator.hpp
      storage/ndb/src/common/transporter/Transporter.cpp
      storage/ndb/src/common/transporter/Transporter.hpp
      storage/ndb/src/common/util/SocketAuthenticator.cpp
      storage/ndb/test/include/NdbTimer.hpp
      storage/ndb/test/tools/connect.cpp
=== modified file 'storage/ndb/include/kernel/GlobalSignalNumbers.h'
--- a/storage/ndb/include/kernel/GlobalSignalNumbers.h	2012-01-10 10:59:37 +0000
+++ b/storage/ndb/include/kernel/GlobalSignalNumbers.h	2012-05-21 22:27:28 +0000
@@ -836,9 +836,10 @@ extern const GlobalSignalNumber NO_OF_SI
 
 #define GSN_SYNC_PATH_REQ               613
 #define GSN_SYNC_PATH_CONF              614
-#define GSN_615
-#define GSN_616
-#define GSN_617
+
+#define GSN_LCP_STATUS_REQ              615
+#define GSN_LCP_STATUS_CONF             616
+#define GSN_LCP_STATUS_REF              617
 
 #define GSN_618
 #define GSN_619

=== modified file 'storage/ndb/include/kernel/signaldata/LCP.hpp'
--- a/storage/ndb/include/kernel/signaldata/LCP.hpp	2011-06-30 15:59:25 +0000
+++ b/storage/ndb/include/kernel/signaldata/LCP.hpp	2012-05-21 22:27:28 +0000
@@ -223,4 +223,110 @@ struct EndLcpConf
   STATIC_CONST( SignalLength = 2 );
 };
 
+struct LcpStatusReq
+{
+  /** 
+   * Sender(s) : Dblqh asks for the status of the local LCP
+   */
+  friend class Dblqh;
+  
+  /**
+   * Sender(s) / Receiver(s)
+   */
+
+  /**
+   * Receiver(s) : Backup answers with LCP_STATUS_CONF or LCP_STATUS_REF
+   */
+  friend class Backup;
+
+  friend bool printLCP_STATUS_REQ(FILE *, const Uint32 *, Uint32, Uint16);
+public:
+  /* Length passed to sendSignal; one word per Uint32 field below */
+  STATIC_CONST( SignalLength = 2 );
+
+private:
+  Uint32 senderRef; /* Block reference that the reply is sent to */
+  Uint32 reqData;   /* Echoed back in the reply; 0 marks the DUMP STATE variant */
+};
+
+struct LcpStatusConf
+{
+  /** 
+   * Sender(s) : Backup reports the state of the local LCP
+   */
+  friend class Backup;
+  
+  /**
+   * Sender(s) / Receiver(s)
+   */
+
+  /**
+   * Receiver(s) : Dblqh feeds the report to its LCP fragment watchdog
+   */
+  friend class Dblqh;
+
+  friend bool printLCP_STATUS_CONF(FILE *, const Uint32 *, Uint32, Uint16);  
+public:
+  STATIC_CONST( SignalLength = 11 );
+
+  enum LcpState
+  {
+    LCP_IDLE       = 0, /* Backup slave state DEFINED, or any unexpected state */
+    LCP_PREPARED   = 1, /* Backup slave state STARTED */
+    LCP_SCANNING   = 2, /* Backup slave state SCANNING */
+    LCP_SCANNED    = 3  /* Backup slave state STOPPING */
+  };
+private:
+  Uint32 senderRef; /* Reference of the replying Backup block */
+  Uint32 reqData;   /* Copied from the request */
+  /* Backup stuff */
+  Uint32 lcpState;  /* One of LcpState */
+  /* In lcpState == LCP_IDLE, refers to prev LCP
+   * otherwise, refers to current running LCP
+   */
+  Uint32 lcpDoneRowsHi;
+  Uint32 lcpDoneRowsLo;
+  Uint32 lcpDoneBytesHi;
+  Uint32 lcpDoneBytesLo;
+  /* Each Hi/Lo pair is the upper/lower 32 bits of one 64-bit counter */
+  /* Backup stuff valid iff lcpState == LCP_SCANNING, otherwise set to ~0 */
+  Uint32 tableId;
+  Uint32 fragId;
+  Uint32 replicaDoneRowsHi;
+  Uint32 replicaDoneRowsLo;
+};
+
+struct LcpStatusRef
+{
+  /** 
+   * Sender(s) : Backup, when it cannot report LCP status
+   */
+  friend class Backup;
+  
+  /**
+   * Sender(s) / Receiver(s)
+   */
+
+  /**
+   * Receiver(s) : Dblqh logs the refusal and then fails an ndbrequire
+   */
+  friend class Dblqh;
+
+  friend bool printLCP_STATUS_REF(FILE *, const Uint32 *, Uint32, Uint16);  
+public:
+  STATIC_CONST( SignalLength = 3 );
+  /* Values carried in the error word */
+  enum StatusFailCodes
+  {
+    NoLCPRecord    = 1, /* No Backup record is an LCP record */
+    NoTableRecord  = 2, /* LCP is scanning but its table list is empty */
+    NoFileRecord   = 3  /* LCP is scanning but has no data file */
+  };
+
+private:
+  Uint32 senderRef; /* Reference of the replying Backup block */
+  Uint32 reqData;   /* Copied from the request */
+  Uint32 error;     /* One of StatusFailCodes */
+};
+
 #endif

=== modified file 'storage/ndb/include/kernel/signaldata/SignalData.hpp'
--- a/storage/ndb/include/kernel/signaldata/SignalData.hpp	2011-07-04 13:37:56 +0000
+++ b/storage/ndb/include/kernel/signaldata/SignalData.hpp	2012-05-21 22:27:28 +0000
@@ -321,4 +321,8 @@ GSN_PRINT_SIGNATURE(printGET_CONFIG_REQ)
 GSN_PRINT_SIGNATURE(printGET_CONFIG_REF);
 GSN_PRINT_SIGNATURE(printGET_CONFIG_CONF);
 
+GSN_PRINT_SIGNATURE(printLCP_STATUS_REQ);
+GSN_PRINT_SIGNATURE(printLCP_STATUS_CONF);
+GSN_PRINT_SIGNATURE(printLCP_STATUS_REF);
+
 #endif

=== modified file 'storage/ndb/src/common/debugger/signaldata/LCP.cpp'
--- a/storage/ndb/src/common/debugger/signaldata/LCP.cpp	2011-06-30 15:59:25 +0000
+++ b/storage/ndb/src/common/debugger/signaldata/LCP.cpp	2012-05-21 22:27:28 +0000
@@ -89,3 +89,37 @@ printLCP_COMPLETE_REP(FILE * output, con
 	  sig->lcpId, sig->nodeId, getBlockName(sig->blockNo));
   return true;
 }
+
+bool
+printLCP_STATUS_REQ(FILE * output, const Uint32 * theData, 
+                    Uint32 len, Uint16 receiverBlockNo){
+  const LcpStatusReq* const sig = (LcpStatusReq*) theData;
+  /* Print the request fields to output; len and receiverBlockNo are not used */
+  fprintf(output, " SenderRef : %x ReqData : %u\n", 
+          sig->senderRef, sig->reqData);
+  return true;
+}
+
+bool
+printLCP_STATUS_CONF(FILE * output, const Uint32 * theData, 
+                     Uint32 len, Uint16 receiverBlockNo){
+  const LcpStatusConf* const sig = (LcpStatusConf*) theData;
+  /* Line 1 : identity and state. Line 2 : Hi/Lo word pairs joined into 64-bit counters */
+  fprintf(output, " SenderRef : %x ReqData : %u LcpState : %u tableId : %u fragId : %u\n",
+          sig->senderRef, sig->reqData, sig->lcpState, sig->tableId, sig->fragId);
+  fprintf(output, " replica(DoneRows : %llu), lcpDone (Rows : %llu, Bytes : %llu)\n",
+          (((Uint64)sig->replicaDoneRowsHi) << 32) + sig->replicaDoneRowsLo,
+          (((Uint64)sig->lcpDoneRowsHi) << 32) + sig->lcpDoneRowsLo,
+          (((Uint64)sig->lcpDoneBytesHi) << 32) + sig->lcpDoneBytesLo);
+  return true;
+}
+
+bool
+printLCP_STATUS_REF(FILE * output, const Uint32 * theData, 
+                    Uint32 len, Uint16 receiverBlockNo){
+  const LcpStatusRef* const sig = (LcpStatusRef*) theData;
+  /* Print the refusal fields to output; len and receiverBlockNo are not used */
+  fprintf(output, " SenderRef : %x, ReqData : %u Error : %u\n", 
+          sig->senderRef, sig->reqData, sig->error);
+  return true;
+}

=== modified file 'storage/ndb/src/common/debugger/signaldata/SignalDataPrint.cpp'
--- a/storage/ndb/src/common/debugger/signaldata/SignalDataPrint.cpp	2011-07-04 13:37:56 +0000
+++ b/storage/ndb/src/common/debugger/signaldata/SignalDataPrint.cpp	2012-05-21 22:27:28 +0000
@@ -272,6 +272,10 @@ SignalDataPrintFunctions[] = {
   ,{ GSN_GET_CONFIG_REF, printGET_CONFIG_REF }
   ,{ GSN_GET_CONFIG_CONF, printGET_CONFIG_CONF }
 
+  ,{ GSN_LCP_STATUS_REQ, printLCP_STATUS_REQ }
+  ,{ GSN_LCP_STATUS_CONF, printLCP_STATUS_CONF }
+  ,{ GSN_LCP_STATUS_REF, printLCP_STATUS_REF }
+
   ,{ 0, 0 }
 };
 

=== modified file 'storage/ndb/src/common/debugger/signaldata/SignalNames.cpp'
--- a/storage/ndb/src/common/debugger/signaldata/SignalNames.cpp	2012-01-10 10:59:37 +0000
+++ b/storage/ndb/src/common/debugger/signaldata/SignalNames.cpp	2012-05-21 22:27:28 +0000
@@ -775,5 +775,9 @@ const GsnName SignalNames [] = {
   ,{ GSN_GET_CONFIG_REQ, "GET_CONFIG_REQ" }
   ,{ GSN_GET_CONFIG_REF, "GET_CONFIG_REF" }
   ,{ GSN_GET_CONFIG_CONF, "GET_CONFIG_CONF" }
+
+  ,{ GSN_LCP_STATUS_REQ, "LCP_STATUS_REQ" }
+  ,{ GSN_LCP_STATUS_CONF, "LCP_STATUS_CONF" }
+  ,{ GSN_LCP_STATUS_REF, "LCP_STATUS_REF" }
 };
 const unsigned short NO_OF_SIGNAL_NAMES = sizeof(SignalNames)/sizeof(GsnName);

=== modified file 'storage/ndb/src/kernel/blocks/backup/Backup.cpp'
--- a/storage/ndb/src/kernel/blocks/backup/Backup.cpp	2011-12-08 14:32:19 +0000
+++ b/storage/ndb/src/kernel/blocks/backup/Backup.cpp	2012-05-21 22:27:28 +0000
@@ -4471,6 +4471,10 @@ Backup::fragmentCompleted(Signal* signal
   
   if (ptr.p->is_lcp())
   {
+    /* Maintain LCP totals */
+    ptr.p->noOfRecords+= op.noOfRecords;
+    ptr.p->noOfBytes+= op.noOfBytes;
+    
     ptr.p->slaveState.setState(STOPPING);
     filePtr.p->operation.dataBuffer.eof();
   }
@@ -5682,7 +5686,14 @@ Backup::execLCP_PREPARE_REQ(Signal* sign
   fragPtr.p->scanned = 0;
   fragPtr.p->scanning = 0;
   fragPtr.p->tableId = req.tableId;
-  
+
+  if (req.backupId != ptr.p->backupId)
+  {
+    jam();
+    /* New LCP, reset per-LCP counters */
+    ptr.p->noOfBytes = 0;
+    ptr.p->noOfRecords = 0;
+  }
   ptr.p->backupId= req.backupId;
   lcp_open_file(signal, ptr);
 }
@@ -5843,3 +5854,127 @@ Backup::execEND_LCPREQ(Signal* signal)
   sendSignal(ptr.p->masterRef, GSN_END_LCPCONF,
 	     signal, EndLcpConf::SignalLength, JBB);
 }
+
+inline
+static 
+void setWords(const Uint64 src, Uint32& hi, Uint32& lo)
+{
+  hi = (Uint32) (src >> 32);        /* upper 32 bits of src */
+  lo = (Uint32) (src & 0xffffffff); /* lower 32 bits of src */
+}
+
+void
+Backup::execLCP_STATUS_REQ(Signal* signal)
+{
+  jamEntry();
+  const LcpStatusReq* req = (const LcpStatusReq*) signal->getDataPtr();
+  /* Answer a status query about the local LCP with LCP_STATUS_CONF or LCP_STATUS_REF */
+  const Uint32 senderRef = req->senderRef;
+  const Uint32 reqData = req->reqData;
+  Uint32 failCode = LcpStatusRef::NoLCPRecord;
+  /* Request fields are copied above because the reply is built in the same signal buffer */
+  /* Find LCP backup, if there is one */
+  BackupRecordPtr ptr;
+  bool found_lcp = false;
+  for (c_backups.first(ptr); ptr.i != RNIL; c_backups.next(ptr))
+  {
+    jam();
+    if (ptr.p->is_lcp())
+    {
+      jam();
+      ndbrequire(found_lcp == false); /* Just one LCP */
+      found_lcp = true;
+      /* Translate the Backup slave state into an LcpStatusConf state */
+      LcpStatusConf::LcpState state = LcpStatusConf::LCP_IDLE;
+      switch (ptr.p->slaveState.getState())
+      {
+      case STARTED:
+        state = LcpStatusConf::LCP_PREPARED;
+        break;
+      case SCANNING:
+        state = LcpStatusConf::LCP_SCANNING;
+        break;
+      case STOPPING:
+        state = LcpStatusConf::LCP_SCANNED;
+        break;
+      case DEFINED:
+        state = LcpStatusConf::LCP_IDLE;
+        break;
+      default:
+        ndbout_c("Unusual LCP state in LCP_STATUS_REQ() : %u",
+                 ptr.p->slaveState.getState());
+        state = LcpStatusConf::LCP_IDLE;
+      };
+        
+      /* Not all values are set here; per-fragment fields default to all-ones */
+      const Uint32 UnsetConst = ~0;
+      
+      LcpStatusConf* conf = (LcpStatusConf*) signal->getDataPtr();
+      conf->senderRef = reference();
+      conf->reqData = reqData;
+      conf->lcpState = state;
+      conf->tableId = UnsetConst;
+      conf->fragId = UnsetConst;
+      conf->replicaDoneRowsHi = UnsetConst;
+      conf->replicaDoneRowsLo = UnsetConst;
+      setWords(ptr.p->noOfRecords,
+               conf->lcpDoneRowsHi,
+               conf->lcpDoneRowsLo);
+      setWords(ptr.p->noOfBytes,
+               conf->lcpDoneBytesHi,
+               conf->lcpDoneBytesLo);
+      
+      if (state == LcpStatusConf::LCP_SCANNING)
+      {
+        /* Actually scanning a fragment, let's grab the details */
+        TablePtr tabPtr;
+        FragmentPtr fragPtr;
+        BackupFilePtr filePtr;
+        
+        ptr.p->tables.first(tabPtr);
+        if (tabPtr.i == RNIL)
+        {
+          jam();
+          failCode = LcpStatusRef::NoTableRecord;
+          break; /* leaves the for loop; the REF is sent at the end */
+        }
+        tabPtr.p->fragments.getPtr(fragPtr, 0);
+        ndbrequire(fragPtr.p->tableId == tabPtr.p->tableId);
+        if (ptr.p->dataFilePtr == RNIL)
+        {
+          jam();
+          failCode = LcpStatusRef::NoFileRecord;
+          break; /* leaves the for loop; the REF is sent at the end */
+        }
+        c_backupFilePool.getPtr(filePtr, ptr.p->dataFilePtr);
+        ndbrequire(filePtr.p->backupPtr == ptr.i);
+        conf->tableId = tabPtr.p->tableId;
+        conf->fragId = fragPtr.p->fragmentId;
+        setWords(filePtr.p->operation.noOfRecords,
+                 conf->replicaDoneRowsHi,
+                 conf->replicaDoneRowsLo);
+      }
+      /* No early exit above, so the CONF built in the signal buffer can be sent */
+      failCode = 0;
+    }
+  }
+
+  if (failCode == 0)
+  {
+    jam();
+    sendSignal(senderRef, GSN_LCP_STATUS_CONF, 
+               signal, LcpStatusConf::SignalLength, JBB);
+    return;
+  }
+
+  jam();
+  LcpStatusRef* ref = (LcpStatusRef*) signal->getDataPtr();
+  /* Failure : overwrite the signal buffer with a REF carrying failCode */
+  ref->senderRef = reference();
+  ref->reqData = reqData;
+  ref->error = failCode;
+  
+  sendSignal(senderRef, GSN_LCP_STATUS_REF, 
+             signal, LcpStatusRef::SignalLength, JBB);
+  return;
+}

=== modified file 'storage/ndb/src/kernel/blocks/backup/Backup.hpp'
--- a/storage/ndb/src/kernel/blocks/backup/Backup.hpp	2011-06-30 15:59:25 +0000
+++ b/storage/ndb/src/kernel/blocks/backup/Backup.hpp	2012-05-21 22:27:28 +0000
@@ -160,6 +160,8 @@ protected:
 
   void execDBINFO_SCANREQ(Signal *signal);
 
+  void execLCP_STATUS_REQ(Signal* signal);
+
 private:
   void defineBackupMutex_locked(Signal* signal, Uint32 ptrI,Uint32 retVal);
   void dictCommitTableMutex_locked(Signal* signal, Uint32 ptrI,Uint32 retVal);

=== modified file 'storage/ndb/src/kernel/blocks/backup/BackupInit.cpp'
--- a/storage/ndb/src/kernel/blocks/backup/BackupInit.cpp	2012-01-30 12:29:49 +0000
+++ b/storage/ndb/src/kernel/blocks/backup/BackupInit.cpp	2012-05-21 22:27:28 +0000
@@ -117,6 +117,8 @@ Backup::Backup(Block_context& ctx, Uint3
   addRecSignal(GSN_BACKUP_LOCK_TAB_CONF, &Backup::execBACKUP_LOCK_TAB_CONF);
   addRecSignal(GSN_BACKUP_LOCK_TAB_REF, &Backup::execBACKUP_LOCK_TAB_REF);
 
+  addRecSignal(GSN_LCP_STATUS_REQ, &Backup::execLCP_STATUS_REQ);
+
   /**
    * Testing
    */

=== modified file 'storage/ndb/src/kernel/blocks/dblqh/Dblqh.hpp'
--- a/storage/ndb/src/kernel/blocks/dblqh/Dblqh.hpp	2012-01-25 14:29:38 +0000
+++ b/storage/ndb/src/kernel/blocks/dblqh/Dblqh.hpp	2012-05-21 22:27:28 +0000
@@ -249,6 +249,7 @@ class Lgman;
 #define ZWAIT_REORG_SUMA_FILTER_ENABLED 23
 #define ZREBUILD_ORDERED_INDEXES 24
 #define ZWAIT_READONLY 25
+#define ZLCP_FRAG_WATCHDOG 26
 
 /* ------------------------------------------------------------------------- */
 /*        NODE STATE DURING SYSTEM RESTART, VARIABLES CNODES_SR_STATE        */
@@ -1083,6 +1084,72 @@ public:
     void send_io(Uint32 bytes);
     void complete_io(Uint32 bytes);
   };
+
+  /**
+   * LCPFragWatchdog
+   *
+   * Structure tracking state of LCP fragment watchdog.
+   * This watchdog polls the state of the current LCP fragment
+   * scan to ensure that forward progress is maintained at
+   * a minimal rate.
+   * It only continues running while this LQH instance 
+   * thinks a fragment scan is ongoing
+   */
+  struct LCPFragWatchdog
+  {
+    STATIC_CONST( PollingPeriodMillis = 10000 ); /* 10s between polls */
+    STATIC_CONST( WarnPeriodsWithNoProgress = 2); /* warnings start at 20s */
+    STATIC_CONST( MaxPeriodsWithNoProgress = 6 ); /* scan treated as hung at 60s */
+    
+    /* Should the watchdog be running? Cleared by reset() */
+    bool scan_running;
+    
+    /* Is there an active thread? Not touched by reset() */
+    bool thread_active;
+    
+    /* LCP position info from Backup block */
+    Uint32 tableId;
+    Uint32 fragId;
+    Uint64 rowCount;
+
+    /* Number of periods with no LCP progress observed */ 
+    Uint32 pollCount;
+
+    /* Reinitialise the watchdog; thread_active is left unchanged */
+    void reset()
+    {
+      scan_running = false;
+      tableId = ~Uint32(0);  /* all-ones, the value Backup reports for unset fields */
+      fragId = ~Uint32(0);
+      rowCount = ~Uint64(0);
+      pollCount = 0;
+    }
+
+    /* Handle an LCP Status report; it is ignored unless scan_running is set */
+    void handleLcpStatusRep(Uint32 repTableId,
+                            Uint32 repFragId,
+                            Uint64 repRowCount)
+    {
+      if (scan_running)
+      {
+        if ((repRowCount != rowCount) ||
+            (repFragId != fragId) ||
+            (repTableId != tableId))
+        {
+          /* Something moved since last time, reset
+           * poll counter and data.
+           */
+          pollCount = 0;
+          tableId = repTableId;
+          fragId = repFragId;
+          rowCount = repRowCount;
+        }
+      }
+    }
+  };
+  
+  LCPFragWatchdog c_lcpFragWatchdog;
+    
     
   /* $$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$ */
   /* $$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$ */
@@ -2736,6 +2803,14 @@ private:
   void unlockError(Signal* signal, Uint32 error);
   void handleUserUnlockRequest(Signal* signal);
   
+  void execLCP_STATUS_CONF(Signal* signal);
+  void execLCP_STATUS_REF(Signal* signal);
+
+  void startLcpFragWatchdog(Signal* signal);
+  void stopLcpFragWatchdog();
+  void invokeLcpFragWatchdogThread(Signal* signal);
+  void checkLcpFragWatchdog(Signal* signal);
+  
   Dbtup* c_tup;
   Dbacc* c_acc;
   Lgman* c_lgman;

=== modified file 'storage/ndb/src/kernel/blocks/dblqh/DblqhInit.cpp'
--- a/storage/ndb/src/kernel/blocks/dblqh/DblqhInit.cpp	2011-11-16 11:05:46 +0000
+++ b/storage/ndb/src/kernel/blocks/dblqh/DblqhInit.cpp	2012-05-21 22:27:28 +0000
@@ -90,6 +90,9 @@ void Dblqh::initData()
   c_max_redo_lag_counter = 3; // 3 strikes and you're out
 
   c_max_parallel_scans_per_frag = 32;
+
+  c_lcpFragWatchdog.reset();
+  c_lcpFragWatchdog.thread_active = false;
 }//Dblqh::initData()
 
 void Dblqh::initRecords() 
@@ -426,6 +429,9 @@ Dblqh::Dblqh(Block_context& ctx, Uint32
 
   addRecSignal(GSN_FIRE_TRIG_REQ, &Dblqh::execFIRE_TRIG_REQ);
 
+  addRecSignal(GSN_LCP_STATUS_CONF, &Dblqh::execLCP_STATUS_CONF);
+  addRecSignal(GSN_LCP_STATUS_REF, &Dblqh::execLCP_STATUS_REF);
+
   initData();
 
 #ifdef VM_TRACE

=== modified file 'storage/ndb/src/kernel/blocks/dblqh/DblqhMain.cpp'
--- a/storage/ndb/src/kernel/blocks/dblqh/DblqhMain.cpp	2012-05-03 09:49:00 +0000
+++ b/storage/ndb/src/kernel/blocks/dblqh/DblqhMain.cpp	2012-05-21 22:27:28 +0000
@@ -595,6 +595,12 @@ void Dblqh::execCONTINUEB(Signal* signal
     wait_readonly(signal);
     return;
   }
+  case ZLCP_FRAG_WATCHDOG:
+  {
+    jam();
+    checkLcpFragWatchdog(signal);
+    return;
+  }
   default:
     ndbrequire(false);
     break;
@@ -13705,7 +13711,9 @@ void Dblqh::execLCP_PREPARE_REF(Signal*
    */
   ndbrequire(refToMain(signal->getSendersBlockRef()) == BACKUP);
   lcpPtr.p->m_error = ref->errorCode;
-  
+
+  stopLcpFragWatchdog();
+
   if (lcpPtr.p->m_outstanding == 0)
   {
     jam();
@@ -13916,6 +13924,8 @@ void Dblqh::execBACKUP_FRAGMENT_CONF(Sig
   ndbrequire(lcpPtr.p->lcpState == LcpRecord::LCP_START_CHKP);
   lcpPtr.p->lcpState = LcpRecord::LCP_COMPLETED;
 
+  stopLcpFragWatchdog();
+
   /* ------------------------------------------------------------------------
    *   THE LOCAL CHECKPOINT HAS BEEN COMPLETED. IT IS NOW TIME TO START 
    *   A LOCAL CHECKPOINT ON THE NEXT FRAGMENT OR COMPLETE THIS LCP ROUND.
@@ -14065,6 +14075,9 @@ void Dblqh::sendLCP_FRAGIDREQ(Signal* si
   sendSignal(backupRef, GSN_LCP_PREPARE_REQ, signal, 
 	     LcpPrepareReq::SignalLength, JBB);
 
+  /* Now start the LCP fragment watchdog */
+  startLcpFragWatchdog(signal);
+
 }//Dblqh::sendLCP_FRAGIDREQ()
 
 void Dblqh::sendEMPTY_LCP_CONF(Signal* signal, bool idle)
@@ -23469,6 +23482,30 @@ Dblqh::execDUMP_STATE_ORD(Signal* signal
     }
   }
 
+  if (arg == 2397)
+  {
+    /* Send LCP_STATUS_REQ to BACKUP */
+    LcpStatusReq* req = (LcpStatusReq*) signal->getDataPtr();
+    req->senderRef = reference();
+    req->reqData = 0; /* 0 : execLCP_STATUS_CONF prints the reply (DUMP STATE variant) */
+    
+    BlockReference backupRef = calcInstanceBlockRef(BACKUP);
+    sendSignal(backupRef, GSN_LCP_STATUS_REQ, signal,
+               LcpStatusReq::SignalLength, JBB);
+  }
+
+  /* 2309 : sent (delayed) by checkLcpFragWatchdog when a fragment scan is considered hung */
+  if(arg == 2309)
+  {
+    CRASH_INSERTION(5075);
+    /* Error insert 5075 is set by the LCPScanFragWatchdog test case */
+    progError(__LINE__, NDBD_EXIT_SYSTEM_ERROR, 
+              "Please report this as a bug. "
+              "Provide as much info as possible, especially all the "
+              "ndb_*_out.log files, Thanks. "
+              "Shutting down node due to lack of LCP fragment scan progress");  
+  }
+
   if (arg == 5050)
   {
 #ifdef ERROR_INSERT
@@ -23696,6 +23733,195 @@ Dblqh::ndbinfo_write_op(Ndbinfo::Row & r
 }
 
 
+void
+Dblqh::startLcpFragWatchdog(Signal* signal)
+{
+  jam();
+  /* Begin supervising a fragment scan. Must not already be running, */
+  /* but the thread could still be active from a previous run */
+  ndbrequire(c_lcpFragWatchdog.scan_running == false);
+  c_lcpFragWatchdog.scan_running = true;
+  
+  /* If thread is not already active, start it */
+  if (! c_lcpFragWatchdog.thread_active)
+  {
+    jam();
+    invokeLcpFragWatchdogThread(signal);
+  }
+  /* Either it was already active, or the call above marked it active */
+  ndbrequire(c_lcpFragWatchdog.thread_active == true);
+}
+
+void
+Dblqh::invokeLcpFragWatchdogThread(Signal* signal)
+{
+  jam();
+  ndbrequire(c_lcpFragWatchdog.scan_running);
+  /* One polling step : mark the thread active, schedule the next wakeup, poll Backup */
+  c_lcpFragWatchdog.thread_active = true;
+  /* The delayed CONTINUEB is dispatched to checkLcpFragWatchdog */
+  signal->getDataPtrSend()[0] = ZLCP_FRAG_WATCHDOG;
+  sendSignalWithDelay(cownref, GSN_CONTINUEB, signal,
+                      LCPFragWatchdog::PollingPeriodMillis, 1);
+  /* Ask Backup for the current scan position; the CONF updates the watchdog */
+  LcpStatusReq* req = (LcpStatusReq*)signal->getDataPtr();
+  req->senderRef = cownref;
+  req->reqData = 1; /* non-zero : not the DUMP STATE variant */
+  BlockReference backupRef = calcInstanceBlockRef(BACKUP);
+  sendSignal(backupRef, GSN_LCP_STATUS_REQ, signal,
+             LcpStatusReq::SignalLength, JBB);
+}
+
+void
+Dblqh::execLCP_STATUS_CONF(Signal* signal)
+{
+  jamEntry();
+  LcpStatusConf* conf = (LcpStatusConf*) signal->getDataPtr();
+  /* Status reply from Backup : print it for the DUMP variant, then feed the watchdog */
+  if (conf->reqData == 0)
+  {
+    /* DUMP STATE variant */
+    ndbout_c("Received LCP_STATUS_CONF from %x", conf->senderRef);
+    ndbout_c("  Status = %u, Table = %u, Frag = %u",
+             conf->lcpState,
+             conf->tableId,
+             conf->fragId);
+    ndbout_c("  Replica done rows %llu",
+             (((Uint64)conf->replicaDoneRowsHi) << 32) + conf->replicaDoneRowsLo);
+    ndbout_c("  Lcp done rows %llu, done bytes %llu",
+             (((Uint64)conf->lcpDoneRowsHi) << 32) + conf->lcpDoneRowsLo,
+             (((Uint64)conf->lcpDoneBytesHi) << 32) + conf->lcpDoneBytesLo);
+  }
+  /* Note : a DUMP variant reply is also passed on to the watchdog below */
+  /* We can ignore the LCP status as if it's complete then we should
+   * promptly stop watching
+   */
+  c_lcpFragWatchdog.handleLcpStatusRep(conf->tableId,
+                                       conf->fragId,
+                                       (((Uint64)conf->replicaDoneRowsHi) << 32) + 
+                                       conf->replicaDoneRowsLo);
+}
+
+void
+Dblqh::execLCP_STATUS_REF(Signal* signal)
+{
+  jamEntry();
+  LcpStatusRef* ref = (LcpStatusRef*) signal->getDataPtr();
+  /* Backup could not report LCP status : log the details first */
+  ndbout_c("Received LCP_STATUS_REF from %x, reqData = %u with error code %u",
+           ref->senderRef, ref->reqData, ref->error);
+  /* A REF is never treated as acceptable here : always fails */
+  ndbrequire(false);
+}
+
+/**
+ * checkLcpFragWatchdog
+ *
+ * One wakeup of the LCP Frag watchdog 'thread' (CONTINUEB ZLCP_FRAG_WATCHDOG).
+ * Stops if no scan is running, warns from 2 idle periods, gives up at 6.
+ */
+void
+Dblqh::checkLcpFragWatchdog(Signal* signal)
+{
+  jam();
+  ndbrequire(c_lcpFragWatchdog.thread_active == true);
+
+  if (!c_lcpFragWatchdog.scan_running)
+  {
+    jam();
+    /* We've been asked to stop */
+    c_lcpFragWatchdog.thread_active = false;
+    return;
+  }
+  /* One more period has passed; handleLcpStatusRep zeroes this when progress is seen */
+  c_lcpFragWatchdog.pollCount++;
+
+  /* Check how long we've been waiting for progress on this scan */
+  if (c_lcpFragWatchdog.pollCount >=
+      LCPFragWatchdog::WarnPeriodsWithNoProgress)
+  {
+    jam();
+    warningEvent("LCP Frag watchdog : No progress on table %u, frag %u for %u s."
+                 "  %llu rows completed",
+                 c_lcpFragWatchdog.tableId,
+                 c_lcpFragWatchdog.fragId,
+                 (LCPFragWatchdog::PollingPeriodMillis * 
+                  c_lcpFragWatchdog.pollCount) / 1000,
+                 c_lcpFragWatchdog.rowCount);
+    ndbout_c("LCP Frag watchdog : No progress on table %u, frag %u for %u s."
+             "  %llu rows completed",
+             c_lcpFragWatchdog.tableId,
+             c_lcpFragWatchdog.fragId,
+             (LCPFragWatchdog::PollingPeriodMillis * 
+              c_lcpFragWatchdog.pollCount) / 1000,
+             c_lcpFragWatchdog.rowCount);
+    
+    if (c_lcpFragWatchdog.pollCount >= 
+        LCPFragWatchdog::MaxPeriodsWithNoProgress)
+    {
+      jam();
+      /* Too long with no progress... */
+      
+      warningEvent("LCP Frag watchdog : Checkpoint of table %u fragment %u "
+                   "too slow (no progress for > %u s).",
+                   c_lcpFragWatchdog.tableId,
+                   c_lcpFragWatchdog.fragId,
+                   (LCPFragWatchdog::PollingPeriodMillis * 
+                    LCPFragWatchdog::MaxPeriodsWithNoProgress) / 1000);
+      ndbout_c("LCP Frag watchdog : Checkpoint of table %u fragment %u "
+               "too slow (no progress for > %u s).",
+               c_lcpFragWatchdog.tableId,
+               c_lcpFragWatchdog.fragId,
+               (LCPFragWatchdog::PollingPeriodMillis * 
+                LCPFragWatchdog::MaxPeriodsWithNoProgress) / 1000);
+      
+      /* Dump some LCP state for debugging... */
+      {
+        DumpStateOrd* ds = (DumpStateOrd*) signal->getDataPtrSend();
+        /* NOTE(review): the numeric dump codes 7012, 23 and 24/2424 are defined elsewhere */
+        /* DIH : */
+        ds->args[0] = DumpStateOrd::DihDumpLCPState;
+        sendSignal(DBDIH_REF, GSN_DUMP_STATE_ORD, signal, 1, JBA);
+        
+        ds->args[0] = 7012;
+        sendSignal(DBDIH_REF, GSN_DUMP_STATE_ORD, signal, 1, JBA);
+        
+        /* BACKUP : */
+        ds->args[0] = 23;
+        sendSignal(BACKUP_REF, GSN_DUMP_STATE_ORD, signal, 1, JBA);
+        
+        ds->args[0] = 24;
+        ds->args[1] = 2424;
+        sendSignal(BACKUP_REF, GSN_DUMP_STATE_ORD, signal, 2, JBA);
+
+        /* LQH : */
+        ds->args[0] = DumpStateOrd::LqhDumpLcpState;
+        sendSignal(cownref, GSN_DUMP_STATE_ORD, signal, 1, JBA);
+        
+        /* Delay self-execution to give time for dump output; 2309 shuts this node down */
+        ds->args[0] = 2309;
+        sendSignalWithDelay(cownref, GSN_DUMP_STATE_ORD, signal, 5*1000, 1);
+      }
+      /* No further poll is scheduled on this path */
+      return;
+    }
+  }
+  /* Still within limits : schedule the next poll */
+  invokeLcpFragWatchdogThread(signal);
+}
+
+void
+Dblqh::stopLcpFragWatchdog()
+{
+  jam();
+  /* Mark watchdog as no longer running, 
+   * If the 'thread' is active then it will 
+   * stop at the next wakeup (reset() leaves thread_active unchanged)
+   */
+  ndbrequire(c_lcpFragWatchdog.scan_running);
+  c_lcpFragWatchdog.reset();
+};
+    
 /* **************************************************************** */
 /* ---------------------------------------------------------------- */
 /* ---------------------- TRIGGER HANDLING ------------------------ */

=== modified file 'storage/ndb/test/include/NdbMgmd.hpp'
--- a/storage/ndb/test/include/NdbMgmd.hpp	2011-10-24 07:44:52 +0000
+++ b/storage/ndb/test/include/NdbMgmd.hpp	2012-05-21 22:27:28 +0000
@@ -28,12 +28,16 @@
 
 #include "../../src/mgmsrv/Config.hpp"
 
+#include <InputStream.hpp>
+
 class NdbMgmd {
   BaseString m_connect_str;
   NdbMgmHandle m_handle;
   Uint32 m_nodeid;
   bool m_verbose;
   unsigned int m_timeout;
+  NDB_SOCKET_TYPE m_event_socket;
+  
   void error(const char* msg, ...) ATTRIBUTE_FORMAT(printf, 2, 3)
   {
     if (!m_verbose)
@@ -391,6 +395,74 @@ public:
     return true;
   }
 
+  bool subscribe_to_events(void)
+  {
+    if (!is_connected())
+    {
+      error("subscribe_to_events: not connected");
+      return false;
+    }
+    /* Ask mgmd for an event stream. filter is (level, category) pairs ended by 0 */
+    int filter[] = 
+    {
+      15, NDB_MGM_EVENT_CATEGORY_STARTUP,
+      15, NDB_MGM_EVENT_CATEGORY_SHUTDOWN,
+      15, NDB_MGM_EVENT_CATEGORY_STATISTIC,
+      15, NDB_MGM_EVENT_CATEGORY_CHECKPOINT,
+      15, NDB_MGM_EVENT_CATEGORY_NODE_RESTART,
+      15, NDB_MGM_EVENT_CATEGORY_CONNECTION,
+      15, NDB_MGM_EVENT_CATEGORY_BACKUP,
+      15, NDB_MGM_EVENT_CATEGORY_CONGESTION,
+      15, NDB_MGM_EVENT_CATEGORY_DEBUG,
+      15, NDB_MGM_EVENT_CATEGORY_INFO,
+      0
+    };
+    /* The returned handle is stored in the platform-specific member of m_event_socket */
+#ifdef NDB_WIN
+    m_event_socket.s = ndb_mgm_listen_event(m_handle, filter); 
+#else
+    m_event_socket.fd = ndb_mgm_listen_event(m_handle, filter);
+#endif
+    /* true only when a valid event socket was obtained */
+    return my_socket_valid(m_event_socket);
+  }
+
+  bool get_next_event_line(char* buff, int bufflen,
+                          int timeout_millis)
+  {
+    if (!is_connected())
+    {
+      error("get_next_event_line: not connected");
+      return false;
+    }
+    /* Needs the event socket set up by subscribe_to_events() */
+    if (!my_socket_valid(m_event_socket))
+    {
+      error("get_next_event_line: not subscribed");
+      return false;
+    }
+    /* Read one line into buff; the stream is constructed with timeout_millis */
+    SocketInputStream stream(m_event_socket, timeout_millis);
+    
+    const char* result = stream.gets(buff, bufflen);
+    if (result && strlen(result))
+    {
+      return true;
+    }
+    else
+    {
+      if (stream.timedout())
+      {
+        error("get_next_event_line: stream.gets timed out");
+        return false;
+      }
+    }
+    /* Nothing usable was read and it was not a timeout */
+    error("get_next_event_line: error from stream.gets()");
+    return false;
+  }
+  
+
   // Pretty printer for 'ndb_mgm_node_type'
   class NodeType {
     BaseString m_str;

=== modified file 'storage/ndb/test/ndbapi/testNodeRestart.cpp'
--- a/storage/ndb/test/ndbapi/testNodeRestart.cpp	2012-01-12 13:15:36 +0000
+++ b/storage/ndb/test/ndbapi/testNodeRestart.cpp	2012-05-21 22:27:28 +0000
@@ -26,6 +26,7 @@
 #include <Bitmask.hpp>
 #include <RefConvert.hpp>
 #include <NdbEnv.h>
+#include <NdbMgmd.hpp>
 
 int runLoadTable(NDBT_Context* ctx, NDBT_Step* step){
 
@@ -5015,6 +5016,129 @@ runLCPTakeOver(NDBT_Context* ctx, NDBT_S
   return NDBT_OK;
 }
 
+int
+runTestScanFragWatchdog(NDBT_Context* ctx, NDBT_Step* step)
+{
+  /* Halt the LCP frag scan on one node by error insert, then check that LCPs still complete */
+  NdbRestarter restarter;
+  if (restarter.getNumDbNodes() < 2)
+  {
+    g_err << "Insufficient nodes for test." << endl;
+    ctx->stopTest();
+    return NDBT_OK;
+  }
+  /* Single-pass do/while : any 'break' at this level lands on the NDBT_FAILED exit */
+  do
+  {
+    g_err << "Injecting fault to suspend LCP frag scan..." << endl;
+    Uint32 victim = restarter.getNode(NdbRestarter::NS_RANDOM);
+    Uint32 otherNode = 0;
+    do
+    {
+      otherNode = restarter.getNode(NdbRestarter::NS_RANDOM);
+    } while (otherNode == victim);
+
+    if (restarter.insertErrorInNode(victim, 10039) != 0) /* Cause LCP/backup frag scan to halt */
+    {
+      g_err << "Error insert failed." << endl;
+      break;
+    }
+    if (restarter.insertErrorInNode(victim, 5075) != 0) /* Treat watchdog fail as test success */
+    {
+      g_err << "Error insert failed." << endl;
+      break;
+    }
+    
+    g_err << "Triggering LCP..." << endl;
+    /* Now trigger LCP, in case the concurrent updates don't */
+    {
+      int startLcpDumpCode = 7099;
+      if (restarter.dumpStateOneNode(victim, &startLcpDumpCode, 1))
+      {
+        g_err << "Dump state failed." << endl;
+        break;
+      }
+    }
+    
+    g_err << "Subscribing to MGMD events..." << endl;
+
+    NdbMgmd mgmd;
+    
+    if (!mgmd.connect())
+    {
+      g_err << "Failed to connect to MGMD" << endl;
+      break;
+    }
+
+    if (!mgmd.subscribe_to_events())
+    {
+      g_err << "Failed to subscribe to events" << endl;
+      break;
+    }
+
+    g_err << "Waiting to hear of LCP completion..." << endl;
+    Uint32 completedLcps = 0;
+    Uint64 maxWaitSeconds = 240;
+    Uint64 endTime = NdbTick_CurrentMillisecond() + 
+      (maxWaitSeconds * 1000);
+    /* Read event lines until two LCP completions are seen or the deadline passes */
+    while (NdbTick_CurrentMillisecond() < endTime)
+    {
+      char buff[512];
+      
+      if (!mgmd.get_next_event_line(buff,
+                                    sizeof(buff),
+                                    10 * 1000))
+      {
+        g_err << "Failed to get event line " << endl;
+        break;
+      }
+
+      // g_err << "Event : " << buff;
+
+      if (strstr(buff, "Local checkpoint") &&
+          strstr(buff, "completed"))
+      {
+        completedLcps++;
+        g_err << "LCP " << completedLcps << " completed." << endl;
+        
+        if (completedLcps == 2)
+          break;
+
+        /* Request + wait for another... */
+        {
+          int startLcpDumpCode = 7099;
+          if (restarter.dumpStateOneNode(otherNode, &startLcpDumpCode, 1))
+          {
+            g_err << "Dump state failed." << endl;
+            break;
+          }
+        }
+      } 
+    }
+    
+    if (completedLcps != 2)
+    {
+      g_err << "Some problem while waiting for LCP completion" << endl;
+      break;
+    }
+
+    /* Now wait for the node to recover */
+    if (restarter.waitNodesStarted((const int*) &victim, 1, 120) != 0)
+    {
+      g_err << "Failed waiting for node " << victim << " to start" << endl;
+      break;
+    }
+    
+    ctx->stopTest();
+    return NDBT_OK;
+  } while (0);
+  
+  ctx->stopTest();
+  return NDBT_FAILED;
+}
+
+
 NDBT_TESTSUITE(testNodeRestart);
 TESTCASE("NoLoad", 
 	 "Test that one node at a time can be stopped and then restarted "\
@@ -5575,6 +5699,13 @@ TESTCASE("LCPTakeOver", "")
   STEP(runPkUpdateUntilStopped);
   STEP(runScanUpdateUntilStopped);
 }
+TESTCASE("LCPScanFragWatchdog", 
+         "Test LCP scan watchdog")
+{
+  INITIALIZER(runLoadTable);
+  STEP(runPkUpdateUntilStopped);
+  STEP(runTestScanFragWatchdog);
+}
 
 NDBT_TESTSUITE_END(testNodeRestart);
 

=== modified file 'storage/ndb/test/run-test/daily-devel-tests.txt'
--- a/storage/ndb/test/run-test/daily-devel-tests.txt	2011-10-05 13:18:31 +0000
+++ b/storage/ndb/test/run-test/daily-devel-tests.txt	2012-05-21 22:27:28 +0000
@@ -73,3 +73,8 @@ max-time: 1800
 cmd: testDict
 args: -n SchemaTrans -l 1
 
+# LCP Frag watchdog
+max-time: 600
+cmd: testNodeRestart
+args: -n LCPScanFragWatchdog T2
+

No bundle (reason: useless for push emails).
Thread
bzr push into mysql-5.1-telco-7.0 branch (frazer.clement:4926 to 4927)Bug#14075825Frazer Clement22 May