4927 Frazer Clement 2012-05-21
Bug #14075825 LCP Watchdog : Fragment scan check
Test commit for CluB X.X
modified:
storage/ndb/include/kernel/GlobalSignalNumbers.h
storage/ndb/include/kernel/signaldata/LCP.hpp
storage/ndb/include/kernel/signaldata/SignalData.hpp
storage/ndb/src/common/debugger/signaldata/LCP.cpp
storage/ndb/src/common/debugger/signaldata/SignalDataPrint.cpp
storage/ndb/src/common/debugger/signaldata/SignalNames.cpp
storage/ndb/src/kernel/blocks/backup/Backup.cpp
storage/ndb/src/kernel/blocks/backup/Backup.hpp
storage/ndb/src/kernel/blocks/backup/BackupInit.cpp
storage/ndb/src/kernel/blocks/dblqh/Dblqh.hpp
storage/ndb/src/kernel/blocks/dblqh/DblqhInit.cpp
storage/ndb/src/kernel/blocks/dblqh/DblqhMain.cpp
storage/ndb/test/include/NdbMgmd.hpp
storage/ndb/test/ndbapi/testNodeRestart.cpp
storage/ndb/test/run-test/daily-devel-tests.txt
4926 magnus.blaudd@stripped 2012-05-07 [merge]
Merge
modified:
storage/ndb/include/util/SocketAuthenticator.hpp
storage/ndb/src/common/transporter/Transporter.cpp
storage/ndb/src/common/transporter/Transporter.hpp
storage/ndb/src/common/util/SocketAuthenticator.cpp
storage/ndb/test/include/NdbTimer.hpp
storage/ndb/test/tools/connect.cpp
=== modified file 'storage/ndb/include/kernel/GlobalSignalNumbers.h'
--- a/storage/ndb/include/kernel/GlobalSignalNumbers.h 2012-01-10 10:59:37 +0000
+++ b/storage/ndb/include/kernel/GlobalSignalNumbers.h 2012-05-21 16:26:42 +0000
@@ -836,9 +836,10 @@ extern const GlobalSignalNumber NO_OF_SI
#define GSN_SYNC_PATH_REQ 613
#define GSN_SYNC_PATH_CONF 614
-#define GSN_615
-#define GSN_616
-#define GSN_617
+
+#define GSN_LCP_STATUS_REQ 615
+#define GSN_LCP_STATUS_CONF 616
+#define GSN_LCP_STATUS_REF 617
#define GSN_618
#define GSN_619
=== modified file 'storage/ndb/include/kernel/signaldata/LCP.hpp'
--- a/storage/ndb/include/kernel/signaldata/LCP.hpp 2011-06-30 15:59:25 +0000
+++ b/storage/ndb/include/kernel/signaldata/LCP.hpp 2012-05-21 16:26:42 +0000
@@ -223,4 +223,110 @@ struct EndLcpConf
STATIC_CONST( SignalLength = 2 );
};
+struct LcpStatusReq
+{
+ /**
+ * Sender(s)
+ */
+ friend class Dblqh;
+
+ /**
+ * Sender(s) / Receiver(s)
+ */
+
+ /**
+ * Receiver(s)
+ */
+ friend class Backup;
+
+ friend bool printLCP_STATUS_REQ(FILE *, const Uint32 *, Uint32, Uint16);
+public:
+
+ STATIC_CONST( SignalLength = 2 );
+
+private:
+ Uint32 senderRef;
+ Uint32 reqData;
+};
+
+struct LcpStatusConf
+{
+ /**
+ * Sender(s)
+ */
+ friend class Backup;
+
+ /**
+ * Sender(s) / Receiver(s)
+ */
+
+ /**
+ * Receiver(s)
+ */
+ friend class Dblqh;
+
+ friend bool printLCP_STATUS_CONF(FILE *, const Uint32 *, Uint32, Uint16);
+public:
+ STATIC_CONST( SignalLength = 11 );
+
+ enum LcpState
+ {
+ LCP_IDLE = 0,
+ LCP_PREPARED = 1,
+ LCP_SCANNING = 2,
+ LCP_SCANNED = 3
+ };
+private:
+ Uint32 senderRef;
+ Uint32 reqData;
+ /* Backup stuff */
+ Uint32 lcpState;
+ /* In lcpState == LCP_IDLE, refers to prev LCP
+ * otherwise, refers to current running LCP
+ */
+ Uint32 lcpDoneRowsHi;
+ Uint32 lcpDoneRowsLo;
+ Uint32 lcpDoneBytesHi;
+ Uint32 lcpDoneBytesLo;
+
+ /* Backup stuff valid iff lcpState == LCP_SCANNING */
+ Uint32 tableId;
+ Uint32 fragId;
+ Uint32 replicaDoneRowsHi;
+ Uint32 replicaDoneRowsLo;
+};
+
+struct LcpStatusRef
+{
+ /**
+ * Sender(s)
+ */
+ friend class Backup;
+
+ /**
+ * Sender(s) / Receiver(s)
+ */
+
+ /**
+ * Receiver(s)
+ */
+ friend class Dblqh;
+
+ friend bool printLCP_STATUS_REF(FILE *, const Uint32 *, Uint32, Uint16);
+public:
+ STATIC_CONST( SignalLength = 3 );
+
+ enum StatusFailCodes
+ {
+ NoLCPRecord = 1,
+ NoTableRecord = 2,
+ NoFileRecord = 3
+ };
+
+private:
+ Uint32 senderRef;
+ Uint32 reqData;
+ Uint32 error;
+};
+
#endif
=== modified file 'storage/ndb/include/kernel/signaldata/SignalData.hpp'
--- a/storage/ndb/include/kernel/signaldata/SignalData.hpp 2011-07-04 13:37:56 +0000
+++ b/storage/ndb/include/kernel/signaldata/SignalData.hpp 2012-05-21 16:26:42 +0000
@@ -321,4 +321,8 @@ GSN_PRINT_SIGNATURE(printGET_CONFIG_REQ)
GSN_PRINT_SIGNATURE(printGET_CONFIG_REF);
GSN_PRINT_SIGNATURE(printGET_CONFIG_CONF);
+GSN_PRINT_SIGNATURE(printLCP_STATUS_REQ);
+GSN_PRINT_SIGNATURE(printLCP_STATUS_CONF);
+GSN_PRINT_SIGNATURE(printLCP_STATUS_REF);
+
#endif
=== modified file 'storage/ndb/src/common/debugger/signaldata/LCP.cpp'
--- a/storage/ndb/src/common/debugger/signaldata/LCP.cpp 2011-06-30 15:59:25 +0000
+++ b/storage/ndb/src/common/debugger/signaldata/LCP.cpp 2012-05-21 16:26:42 +0000
@@ -89,3 +89,37 @@ printLCP_COMPLETE_REP(FILE * output, con
sig->lcpId, sig->nodeId, getBlockName(sig->blockNo));
return true;
}
+
+bool
+printLCP_STATUS_REQ(FILE * output, const Uint32 * theData,
+ Uint32 len, Uint16 receiverBlockNo){
+ const LcpStatusReq* const sig = (LcpStatusReq*) theData;
+
+ fprintf(output, " SenderRef : %x ReqData : %u\n",
+ sig->senderRef, sig->reqData);
+ return true;
+}
+
+bool
+printLCP_STATUS_CONF(FILE * output, const Uint32 * theData,
+ Uint32 len, Uint16 receiverBlockNo){
+ const LcpStatusConf* const sig = (LcpStatusConf*) theData;
+
+ fprintf(output, " SenderRef : %x ReqData : %u LcpState : %u tableId : %u fragId : %u\n",
+ sig->senderRef, sig->reqData, sig->lcpState, sig->tableId, sig->fragId);
+ fprintf(output, " replica(DoneRows : %llu), lcpDone (Rows : %llu, Bytes : %llu)\n",
+ (((Uint64)sig->replicaDoneRowsHi) << 32) + sig->replicaDoneRowsLo,
+ (((Uint64)sig->lcpDoneRowsHi) << 32) + sig->lcpDoneRowsLo,
+ (((Uint64)sig->lcpDoneBytesHi) << 32) + sig->lcpDoneBytesLo);
+ return true;
+}
+
+bool
+printLCP_STATUS_REF(FILE * output, const Uint32 * theData,
+ Uint32 len, Uint16 receiverBlockNo){
+ const LcpStatusRef* const sig = (LcpStatusRef*) theData;
+
+ fprintf(output, " SenderRef : %x, ReqData : %u Error : %u\n",
+ sig->senderRef, sig->reqData, sig->error);
+ return true;
+}
=== modified file 'storage/ndb/src/common/debugger/signaldata/SignalDataPrint.cpp'
--- a/storage/ndb/src/common/debugger/signaldata/SignalDataPrint.cpp 2011-07-04 13:37:56 +0000
+++ b/storage/ndb/src/common/debugger/signaldata/SignalDataPrint.cpp 2012-05-21 16:26:42 +0000
@@ -272,6 +272,10 @@ SignalDataPrintFunctions[] = {
,{ GSN_GET_CONFIG_REF, printGET_CONFIG_REF }
,{ GSN_GET_CONFIG_CONF, printGET_CONFIG_CONF }
+ ,{ GSN_LCP_STATUS_REQ, printLCP_STATUS_REQ }
+ ,{ GSN_LCP_STATUS_CONF, printLCP_STATUS_CONF }
+ ,{ GSN_LCP_STATUS_REF, printLCP_STATUS_REF }
+
,{ 0, 0 }
};
=== modified file 'storage/ndb/src/common/debugger/signaldata/SignalNames.cpp'
--- a/storage/ndb/src/common/debugger/signaldata/SignalNames.cpp 2012-01-10 10:59:37 +0000
+++ b/storage/ndb/src/common/debugger/signaldata/SignalNames.cpp 2012-05-21 16:26:42 +0000
@@ -775,5 +775,9 @@ const GsnName SignalNames [] = {
,{ GSN_GET_CONFIG_REQ, "GET_CONFIG_REQ" }
,{ GSN_GET_CONFIG_REF, "GET_CONFIG_REF" }
,{ GSN_GET_CONFIG_CONF, "GET_CONFIG_CONF" }
+
+ ,{ GSN_LCP_STATUS_REQ, "LCP_STATUS_REQ" }
+ ,{ GSN_LCP_STATUS_CONF, "LCP_STATUS_CONF" }
+ ,{ GSN_LCP_STATUS_REF, "LCP_STATUS_REF" }
};
const unsigned short NO_OF_SIGNAL_NAMES = sizeof(SignalNames)/sizeof(GsnName);
=== modified file 'storage/ndb/src/kernel/blocks/backup/Backup.cpp'
--- a/storage/ndb/src/kernel/blocks/backup/Backup.cpp 2011-12-08 14:32:19 +0000
+++ b/storage/ndb/src/kernel/blocks/backup/Backup.cpp 2012-05-21 16:26:42 +0000
@@ -4471,6 +4471,10 @@ Backup::fragmentCompleted(Signal* signal
if (ptr.p->is_lcp())
{
+ /* Maintain LCP totals */
+ ptr.p->noOfRecords+= op.noOfRecords;
+ ptr.p->noOfBytes+= op.noOfBytes;
+
ptr.p->slaveState.setState(STOPPING);
filePtr.p->operation.dataBuffer.eof();
}
@@ -5682,7 +5686,14 @@ Backup::execLCP_PREPARE_REQ(Signal* sign
fragPtr.p->scanned = 0;
fragPtr.p->scanning = 0;
fragPtr.p->tableId = req.tableId;
-
+
+ if (req.backupId != ptr.p->backupId)
+ {
+ jam();
+ /* New LCP, reset per-LCP counters */
+ ptr.p->noOfBytes = 0;
+ ptr.p->noOfRecords = 0;
+ }
ptr.p->backupId= req.backupId;
lcp_open_file(signal, ptr);
}
@@ -5843,3 +5854,127 @@ Backup::execEND_LCPREQ(Signal* signal)
sendSignal(ptr.p->masterRef, GSN_END_LCPCONF,
signal, EndLcpConf::SignalLength, JBB);
}
+
+inline
+static
+void setWords(const Uint64 src, Uint32& hi, Uint32& lo)
+{
+ hi = (Uint32) (src >> 32);
+ lo = (Uint32) (src & 0xffffffff);
+}
+
+void
+Backup::execLCP_STATUS_REQ(Signal* signal)
+{
+ jamEntry();
+ const LcpStatusReq* req = (const LcpStatusReq*) signal->getDataPtr();
+
+ const Uint32 senderRef = req->senderRef;
+ const Uint32 reqData = req->reqData;
+ Uint32 failCode = LcpStatusRef::NoLCPRecord;
+
+ /* Find LCP backup, if there is one */
+ BackupRecordPtr ptr;
+ bool found_lcp = false;
+ for (c_backups.first(ptr); ptr.i != RNIL; c_backups.next(ptr))
+ {
+ jam();
+ if (ptr.p->is_lcp())
+ {
+ jam();
+ ndbrequire(found_lcp == false); /* Just one LCP */
+ found_lcp = true;
+
+ LcpStatusConf::LcpState state = LcpStatusConf::LCP_IDLE;
+ switch (ptr.p->slaveState.getState())
+ {
+ case STARTED:
+ state = LcpStatusConf::LCP_PREPARED;
+ break;
+ case SCANNING:
+ state = LcpStatusConf::LCP_SCANNING;
+ break;
+ case STOPPING:
+ state = LcpStatusConf::LCP_SCANNED;
+ break;
+ case DEFINED:
+ state = LcpStatusConf::LCP_IDLE;
+ break;
+ default:
+ ndbout_c("Unusual LCP state in LCP_STATUS_REQ() : %u",
+ ptr.p->slaveState.getState());
+ state = LcpStatusConf::LCP_IDLE;
+ };
+
+ /* Not all values are set here */
+ const Uint32 UnsetConst = ~0;
+
+ LcpStatusConf* conf = (LcpStatusConf*) signal->getDataPtr();
+ conf->senderRef = reference();
+ conf->reqData = reqData;
+ conf->lcpState = state;
+ conf->tableId = UnsetConst;
+ conf->fragId = UnsetConst;
+ conf->replicaDoneRowsHi = UnsetConst;
+ conf->replicaDoneRowsLo = UnsetConst;
+ setWords(ptr.p->noOfRecords,
+ conf->lcpDoneRowsHi,
+ conf->lcpDoneRowsLo);
+ setWords(ptr.p->noOfBytes,
+ conf->lcpDoneBytesHi,
+ conf->lcpDoneBytesLo);
+
+ if (state == LcpStatusConf::LCP_SCANNING)
+ {
+ /* Actually scanning a fragment, let's grab the details */
+ TablePtr tabPtr;
+ FragmentPtr fragPtr;
+ BackupFilePtr filePtr;
+
+ ptr.p->tables.first(tabPtr);
+ if (tabPtr.i == RNIL)
+ {
+ jam();
+ failCode = LcpStatusRef::NoTableRecord;
+ break;
+ }
+ tabPtr.p->fragments.getPtr(fragPtr, 0);
+ ndbrequire(fragPtr.p->tableId == tabPtr.p->tableId);
+ if (ptr.p->dataFilePtr == RNIL)
+ {
+ jam();
+ failCode = LcpStatusRef::NoFileRecord;
+ break;
+ }
+ c_backupFilePool.getPtr(filePtr, ptr.p->dataFilePtr);
+ ndbrequire(filePtr.p->backupPtr == ptr.i);
+ conf->tableId = tabPtr.p->tableId;
+ conf->fragId = fragPtr.p->fragmentId;
+ setWords(filePtr.p->operation.noOfRecords,
+ conf->replicaDoneRowsHi,
+ conf->replicaDoneRowsLo);
+ }
+
+ failCode = 0;
+ }
+ }
+
+ if (failCode == 0)
+ {
+ jam();
+ sendSignal(senderRef, GSN_LCP_STATUS_CONF,
+ signal, LcpStatusConf::SignalLength, JBB);
+ return;
+ }
+
+ jam();
+ LcpStatusRef* ref = (LcpStatusRef*) signal->getDataPtr();
+
+ ref->senderRef = reference();
+ ref->reqData = reqData;
+ ref->error = failCode;
+
+ sendSignal(senderRef, GSN_LCP_STATUS_REF,
+ signal, LcpStatusRef::SignalLength, JBB);
+ return;
+}
=== modified file 'storage/ndb/src/kernel/blocks/backup/Backup.hpp'
--- a/storage/ndb/src/kernel/blocks/backup/Backup.hpp 2011-06-30 15:59:25 +0000
+++ b/storage/ndb/src/kernel/blocks/backup/Backup.hpp 2012-05-21 16:26:42 +0000
@@ -160,6 +160,8 @@ protected:
void execDBINFO_SCANREQ(Signal *signal);
+ void execLCP_STATUS_REQ(Signal* signal);
+
private:
void defineBackupMutex_locked(Signal* signal, Uint32 ptrI,Uint32 retVal);
void dictCommitTableMutex_locked(Signal* signal, Uint32 ptrI,Uint32 retVal);
=== modified file 'storage/ndb/src/kernel/blocks/backup/BackupInit.cpp'
--- a/storage/ndb/src/kernel/blocks/backup/BackupInit.cpp 2012-01-30 12:29:49 +0000
+++ b/storage/ndb/src/kernel/blocks/backup/BackupInit.cpp 2012-05-21 16:26:42 +0000
@@ -117,6 +117,8 @@ Backup::Backup(Block_context& ctx, Uint3
addRecSignal(GSN_BACKUP_LOCK_TAB_CONF, &Backup::execBACKUP_LOCK_TAB_CONF);
addRecSignal(GSN_BACKUP_LOCK_TAB_REF, &Backup::execBACKUP_LOCK_TAB_REF);
+ addRecSignal(GSN_LCP_STATUS_REQ, &Backup::execLCP_STATUS_REQ);
+
/**
* Testing
*/
=== modified file 'storage/ndb/src/kernel/blocks/dblqh/Dblqh.hpp'
--- a/storage/ndb/src/kernel/blocks/dblqh/Dblqh.hpp 2012-01-25 14:29:38 +0000
+++ b/storage/ndb/src/kernel/blocks/dblqh/Dblqh.hpp 2012-05-21 16:26:42 +0000
@@ -249,6 +249,7 @@ class Lgman;
#define ZWAIT_REORG_SUMA_FILTER_ENABLED 23
#define ZREBUILD_ORDERED_INDEXES 24
#define ZWAIT_READONLY 25
+#define ZLCP_FRAG_WATCHDOG 26
/* ------------------------------------------------------------------------- */
/* NODE STATE DURING SYSTEM RESTART, VARIABLES CNODES_SR_STATE */
@@ -1083,6 +1084,72 @@ public:
void send_io(Uint32 bytes);
void complete_io(Uint32 bytes);
};
+
+ /**
+ * LCPFragWatchdog
+ *
+ * Structure tracking state of LCP fragment watchdog.
+ * This watchdog polls the state of the current LCP fragment
+ * scan to ensure that forward progress is maintained at
+ * a minimal rate.
+ * It only continues running while this LQH instance
+ * thinks a fragment scan is ongoing
+ */
+ struct LCPFragWatchdog
+ {
+ STATIC_CONST( PollingPeriodMillis = 10000 ); /* 10s */
+ STATIC_CONST( WarnPeriodsWithNoProgress = 2); /* 20s */
+ STATIC_CONST( MaxPeriodsWithNoProgress = 6 ); /* 60s */
+
+ /* Should the watchdog be running? */
+ bool scan_running;
+
+ /* Is there an active thread? */
+ bool thread_active;
+
+ /* LCP position info from Backup block */
+ Uint32 tableId;
+ Uint32 fragId;
+ Uint64 rowCount;
+
+ /* Number of periods with no LCP progress observed */
+ Uint32 pollCount;
+
+ /* Reinitialise the watchdog */
+ void reset()
+ {
+ scan_running = false;
+ tableId = ~Uint32(0);
+ fragId = ~Uint32(0);
+ rowCount = ~Uint64(0);
+ pollCount = 0;
+ }
+
+ /* Handle an LCP Status report */
+ void handleLcpStatusRep(Uint32 repTableId,
+ Uint32 repFragId,
+ Uint64 repRowCount)
+ {
+ if (scan_running)
+ {
+ if ((repRowCount != rowCount) ||
+ (repFragId != fragId) ||
+ (repTableId != tableId))
+ {
+ /* Something moved since last time, reset
+ * poll counter and data.
+ */
+ pollCount = 0;
+ tableId = repTableId;
+ fragId = repFragId;
+ rowCount = repRowCount;
+ }
+ }
+ }
+ };
+
+ LCPFragWatchdog c_lcpFragWatchdog;
+
/* $$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$ */
/* $$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$ */
@@ -2736,6 +2803,14 @@ private:
void unlockError(Signal* signal, Uint32 error);
void handleUserUnlockRequest(Signal* signal);
+ void execLCP_STATUS_CONF(Signal* signal);
+ void execLCP_STATUS_REF(Signal* signal);
+
+ void startLcpFragWatchdog(Signal* signal);
+ void stopLcpFragWatchdog();
+ void invokeLcpFragWatchdogThread(Signal* signal);
+ void checkLcpFragWatchdog(Signal* signal);
+
Dbtup* c_tup;
Dbacc* c_acc;
Lgman* c_lgman;
=== modified file 'storage/ndb/src/kernel/blocks/dblqh/DblqhInit.cpp'
--- a/storage/ndb/src/kernel/blocks/dblqh/DblqhInit.cpp 2011-11-16 11:05:46 +0000
+++ b/storage/ndb/src/kernel/blocks/dblqh/DblqhInit.cpp 2012-05-21 16:26:42 +0000
@@ -90,6 +90,9 @@ void Dblqh::initData()
c_max_redo_lag_counter = 3; // 3 strikes and you're out
c_max_parallel_scans_per_frag = 32;
+
+ c_lcpFragWatchdog.reset();
+ c_lcpFragWatchdog.thread_active = false;
}//Dblqh::initData()
void Dblqh::initRecords()
@@ -426,6 +429,9 @@ Dblqh::Dblqh(Block_context& ctx, Uint32
addRecSignal(GSN_FIRE_TRIG_REQ, &Dblqh::execFIRE_TRIG_REQ);
+ addRecSignal(GSN_LCP_STATUS_CONF, &Dblqh::execLCP_STATUS_CONF);
+ addRecSignal(GSN_LCP_STATUS_REF, &Dblqh::execLCP_STATUS_REF);
+
initData();
#ifdef VM_TRACE
=== modified file 'storage/ndb/src/kernel/blocks/dblqh/DblqhMain.cpp'
--- a/storage/ndb/src/kernel/blocks/dblqh/DblqhMain.cpp 2012-05-03 09:49:00 +0000
+++ b/storage/ndb/src/kernel/blocks/dblqh/DblqhMain.cpp 2012-05-21 16:26:42 +0000
@@ -595,6 +595,12 @@ void Dblqh::execCONTINUEB(Signal* signal
wait_readonly(signal);
return;
}
+ case ZLCP_FRAG_WATCHDOG:
+ {
+ jam();
+ checkLcpFragWatchdog(signal);
+ return;
+ }
default:
ndbrequire(false);
break;
@@ -13705,7 +13711,9 @@ void Dblqh::execLCP_PREPARE_REF(Signal*
*/
ndbrequire(refToMain(signal->getSendersBlockRef()) == BACKUP);
lcpPtr.p->m_error = ref->errorCode;
-
+
+ stopLcpFragWatchdog();
+
if (lcpPtr.p->m_outstanding == 0)
{
jam();
@@ -13916,6 +13924,8 @@ void Dblqh::execBACKUP_FRAGMENT_CONF(Sig
ndbrequire(lcpPtr.p->lcpState == LcpRecord::LCP_START_CHKP);
lcpPtr.p->lcpState = LcpRecord::LCP_COMPLETED;
+ stopLcpFragWatchdog();
+
/* ------------------------------------------------------------------------
* THE LOCAL CHECKPOINT HAS BEEN COMPLETED. IT IS NOW TIME TO START
* A LOCAL CHECKPOINT ON THE NEXT FRAGMENT OR COMPLETE THIS LCP ROUND.
@@ -14065,6 +14075,9 @@ void Dblqh::sendLCP_FRAGIDREQ(Signal* si
sendSignal(backupRef, GSN_LCP_PREPARE_REQ, signal,
LcpPrepareReq::SignalLength, JBB);
+ /* Now start the LCP fragment watchdog */
+ startLcpFragWatchdog(signal);
+
}//Dblqh::sendLCP_FRAGIDREQ()
void Dblqh::sendEMPTY_LCP_CONF(Signal* signal, bool idle)
@@ -23469,6 +23482,30 @@ Dblqh::execDUMP_STATE_ORD(Signal* signal
}
}
+ if (arg == 2397)
+ {
+ /* Send LCP_STATUS_REQ to BACKUP */
+ LcpStatusReq* req = (LcpStatusReq*) signal->getDataPtr();
+ req->senderRef = reference();
+ req->reqData = 0;
+
+ BlockReference backupRef = calcInstanceBlockRef(BACKUP);
+ sendSignal(backupRef, GSN_LCP_STATUS_REQ, signal,
+ LcpStatusReq::SignalLength, JBB);
+ }
+
+
+ if(arg == 2309)
+ {
+ CRASH_INSERTION(5075);
+
+ progError(__LINE__, NDBD_EXIT_SYSTEM_ERROR,
+ "Please report this as a bug. "
+ "Provide as much info as possible, expecially all the "
+ "ndb_*_out.log files, Thanks. "
+ "Shutting down node due to lack of LCP fragment scan progress");
+ }
+
if (arg == 5050)
{
#ifdef ERROR_INSERT
@@ -23696,6 +23733,195 @@ Dblqh::ndbinfo_write_op(Ndbinfo::Row & r
}
+void
+Dblqh::startLcpFragWatchdog(Signal* signal)
+{
+ jam();
+ /* Must not already be running */
+ /* Thread could still be active from a previous run */
+ ndbrequire(c_lcpFragWatchdog.scan_running == false);
+ c_lcpFragWatchdog.scan_running = true;
+
+ /* If thread is not already active, start it */
+ if (! c_lcpFragWatchdog.thread_active)
+ {
+ jam();
+ invokeLcpFragWatchdogThread(signal);
+ }
+
+ ndbrequire(c_lcpFragWatchdog.thread_active == true);
+}
+
+void
+Dblqh::invokeLcpFragWatchdogThread(Signal* signal)
+{
+ jam();
+ ndbrequire(c_lcpFragWatchdog.scan_running);
+
+ c_lcpFragWatchdog.thread_active = true;
+
+ signal->getDataPtrSend()[0] = ZLCP_FRAG_WATCHDOG;
+ sendSignalWithDelay(cownref, GSN_CONTINUEB, signal,
+ LCPFragWatchdog::PollingPeriodMillis, 1);
+
+ LcpStatusReq* req = (LcpStatusReq*)signal->getDataPtr();
+ req->senderRef = cownref;
+ req->reqData = 1;
+ BlockReference backupRef = calcInstanceBlockRef(BACKUP);
+ sendSignal(backupRef, GSN_LCP_STATUS_REQ, signal,
+ LcpStatusReq::SignalLength, JBB);
+}
+
+void
+Dblqh::execLCP_STATUS_CONF(Signal* signal)
+{
+ jamEntry();
+ LcpStatusConf* conf = (LcpStatusConf*) signal->getDataPtr();
+
+ if (conf->reqData == 0)
+ {
+ /* DUMP STATE variant */
+ ndbout_c("Received LCP_STATUS_CONF from %x", conf->senderRef);
+ ndbout_c(" Status = %u, Table = %u, Frag = %u",
+ conf->lcpState,
+ conf->tableId,
+ conf->fragId);
+ ndbout_c(" Replica done rows %llu",
+ (((Uint64)conf->replicaDoneRowsHi) << 32) + conf->replicaDoneRowsLo);
+ ndbout_c(" Lcp done rows %llu, done bytes %llu",
+ (((Uint64)conf->lcpDoneRowsHi) << 32) + conf->lcpDoneRowsLo,
+ (((Uint64)conf->lcpDoneBytesHi) << 32) + conf->lcpDoneBytesLo);
+ }
+
+ /* We can ignore the LCP status as if it's complete then we should
+ * promptly stop watching
+ */
+ c_lcpFragWatchdog.handleLcpStatusRep(conf->tableId,
+ conf->fragId,
+ (((Uint64)conf->replicaDoneRowsHi) << 32) +
+ conf->replicaDoneRowsLo);
+}
+
+void
+Dblqh::execLCP_STATUS_REF(Signal* signal)
+{
+ jamEntry();
+ LcpStatusRef* ref = (LcpStatusRef*) signal->getDataPtr();
+
+ ndbout_c("Received LCP_STATUS_REF from %x, reqData = %u with error code %u",
+ ref->senderRef, ref->reqData, ref->error);
+
+ ndbrequire(false);
+}
+
+/**
+ * checkLcpFragWatchdog
+ *
+ * This method implements the LCP Frag watchdog 'thread', periodically
+ * checking for progress in the current LCP fragment scan
+ */
+void
+Dblqh::checkLcpFragWatchdog(Signal* signal)
+{
+ jam();
+ ndbrequire(c_lcpFragWatchdog.thread_active == true);
+
+ if (!c_lcpFragWatchdog.scan_running)
+ {
+ jam();
+ /* We've been asked to stop */
+ c_lcpFragWatchdog.thread_active = false;
+ return;
+ }
+
+ c_lcpFragWatchdog.pollCount++;
+
+ /* Check how long we've been waiting for progress on this scan */
+ if (c_lcpFragWatchdog.pollCount >=
+ LCPFragWatchdog::WarnPeriodsWithNoProgress)
+ {
+ jam();
+ warningEvent("LCP Frag watchdog : No progress on table %u, frag %u for %u s."
+ " %llu rows completed",
+ c_lcpFragWatchdog.tableId,
+ c_lcpFragWatchdog.fragId,
+ (LCPFragWatchdog::PollingPeriodMillis *
+ c_lcpFragWatchdog.pollCount) / 1000,
+ c_lcpFragWatchdog.rowCount);
+ ndbout_c("LCP Frag watchdog : No progress on table %u, frag %u for %u s."
+ " %llu rows completed",
+ c_lcpFragWatchdog.tableId,
+ c_lcpFragWatchdog.fragId,
+ (LCPFragWatchdog::PollingPeriodMillis *
+ c_lcpFragWatchdog.pollCount) / 1000,
+ c_lcpFragWatchdog.rowCount);
+
+ if (c_lcpFragWatchdog.pollCount >=
+ LCPFragWatchdog::MaxPeriodsWithNoProgress)
+ {
+ jam();
+ /* Too long with no progress... */
+
+ warningEvent("LCP Frag watchdog : Checkpoint of table %u fragment %u "
+ "too slow (no progress for > %u s).",
+ c_lcpFragWatchdog.tableId,
+ c_lcpFragWatchdog.fragId,
+ (LCPFragWatchdog::PollingPeriodMillis *
+ LCPFragWatchdog::MaxPeriodsWithNoProgress) / 1000);
+ ndbout_c("LCP Frag watchdog : Checkpoint of table %u fragment %u "
+ "too slow (no progress for > %u s).",
+ c_lcpFragWatchdog.tableId,
+ c_lcpFragWatchdog.fragId,
+ (LCPFragWatchdog::PollingPeriodMillis *
+ LCPFragWatchdog::MaxPeriodsWithNoProgress) / 1000);
+
+ /* Dump some LCP state for debugging... */
+ {
+ DumpStateOrd* ds = (DumpStateOrd*) signal->getDataPtrSend();
+
+ /* DIH : */
+ ds->args[0] = DumpStateOrd::DihDumpLCPState;
+ sendSignal(DBDIH_REF, GSN_DUMP_STATE_ORD, signal, 1, JBA);
+
+ ds->args[0] = 7012;
+ sendSignal(DBDIH_REF, GSN_DUMP_STATE_ORD, signal, 1, JBA);
+
+ /* BACKUP : */
+ ds->args[0] = 23;
+ sendSignal(BACKUP_REF, GSN_DUMP_STATE_ORD, signal, 1, JBA);
+
+ ds->args[0] = 24;
+ ds->args[1] = 2424;
+ sendSignal(BACKUP_REF, GSN_DUMP_STATE_ORD, signal, 2, JBA);
+
+ /* LQH : */
+ ds->args[0] = DumpStateOrd::LqhDumpLcpState;
+ sendSignal(cownref, GSN_DUMP_STATE_ORD, signal, 1, JBA);
+
+ /* Delay self-execution to give time for dump output */
+ ds->args[0] = 2309;
+ sendSignalWithDelay(cownref, GSN_DUMP_STATE_ORD, signal, 5*1000, 1);
+ }
+
+ return;
+ }
+ }
+
+ invokeLcpFragWatchdogThread(signal);
+}
+
+void
+Dblqh::stopLcpFragWatchdog()
+{
+ jam();
+ /* Mark watchdog as no longer running,
+ * If the 'thread' is active then it will
+ * stop at the next wakeup
+ */
+ ndbrequire(c_lcpFragWatchdog.scan_running);
+ c_lcpFragWatchdog.reset();
+};
+
/* **************************************************************** */
/* ---------------------------------------------------------------- */
/* ---------------------- TRIGGER HANDLING ------------------------ */
=== modified file 'storage/ndb/test/include/NdbMgmd.hpp'
--- a/storage/ndb/test/include/NdbMgmd.hpp 2011-10-24 07:44:52 +0000
+++ b/storage/ndb/test/include/NdbMgmd.hpp 2012-05-21 16:26:42 +0000
@@ -28,12 +28,16 @@
#include "../../src/mgmsrv/Config.hpp"
+#include <InputStream.hpp>
+
class NdbMgmd {
BaseString m_connect_str;
NdbMgmHandle m_handle;
Uint32 m_nodeid;
bool m_verbose;
unsigned int m_timeout;
+ NDB_SOCKET_TYPE m_event_socket;
+
void error(const char* msg, ...) ATTRIBUTE_FORMAT(printf, 2, 3)
{
if (!m_verbose)
@@ -391,6 +395,74 @@ public:
return true;
}
+ bool subscribe_to_events(void)
+ {
+ if (!is_connected())
+ {
+ error("subscribe_to_events: not connected");
+ return false;
+ }
+
+ int filter[] =
+ {
+ 15, NDB_MGM_EVENT_CATEGORY_STARTUP,
+ 15, NDB_MGM_EVENT_CATEGORY_SHUTDOWN,
+ 15, NDB_MGM_EVENT_CATEGORY_STATISTIC,
+ 15, NDB_MGM_EVENT_CATEGORY_CHECKPOINT,
+ 15, NDB_MGM_EVENT_CATEGORY_NODE_RESTART,
+ 15, NDB_MGM_EVENT_CATEGORY_CONNECTION,
+ 15, NDB_MGM_EVENT_CATEGORY_BACKUP,
+ 15, NDB_MGM_EVENT_CATEGORY_CONGESTION,
+ 15, NDB_MGM_EVENT_CATEGORY_DEBUG,
+ 15, NDB_MGM_EVENT_CATEGORY_INFO,
+ 0
+ };
+
+#ifdef NDB_WIN
+ m_event_socket.s = ndb_mgm_listen_event(m_handle, filter);
+#else
+ m_event_socket.fd = ndb_mgm_listen_event(m_handle, filter);
+#endif
+
+ return my_socket_valid(m_event_socket);
+ }
+
+ bool get_next_event_line(char* buff, int bufflen,
+ int timeout_millis)
+ {
+ if (!is_connected())
+ {
+ error("get_next_event_line: not connected");
+ return false;
+ }
+
+ if (!my_socket_valid(m_event_socket))
+ {
+ error("get_next_event_line: not subscribed");
+ return false;
+ }
+
+ SocketInputStream stream(m_event_socket, timeout_millis);
+
+ const char* result = stream.gets(buff, bufflen);
+ if (result && strlen(result))
+ {
+ return true;
+ }
+ else
+ {
+ if (stream.timedout())
+ {
+ error("get_next_event_line: stream.gets timed out");
+ return false;
+ }
+ }
+
+ error("get_next_event_line: error from stream.gets()");
+ return false;
+ }
+
+
// Pretty printer for 'ndb_mgm_node_type'
class NodeType {
BaseString m_str;
=== modified file 'storage/ndb/test/ndbapi/testNodeRestart.cpp'
--- a/storage/ndb/test/ndbapi/testNodeRestart.cpp 2012-01-12 13:15:36 +0000
+++ b/storage/ndb/test/ndbapi/testNodeRestart.cpp 2012-05-21 16:26:42 +0000
@@ -26,6 +26,7 @@
#include <Bitmask.hpp>
#include <RefConvert.hpp>
#include <NdbEnv.h>
+#include <NdbMgmd.hpp>
int runLoadTable(NDBT_Context* ctx, NDBT_Step* step){
@@ -5015,6 +5016,129 @@ runLCPTakeOver(NDBT_Context* ctx, NDBT_S
return NDBT_OK;
}
+int
+runTestScanFragWatchdog(NDBT_Context* ctx, NDBT_Step* step)
+{
+ /* Setup an error insert, then start a checkpoint */
+ NdbRestarter restarter;
+ if (restarter.getNumDbNodes() < 2)
+ {
+ g_err << "Insufficient nodes for test." << endl;
+ ctx->stopTest();
+ return NDBT_OK;
+ }
+
+ do
+ {
+ g_err << "Injecting fault to suspend LCP frag scan..." << endl;
+ Uint32 victim = restarter.getNode(NdbRestarter::NS_RANDOM);
+ Uint32 otherNode = 0;
+ do
+ {
+ otherNode = restarter.getNode(NdbRestarter::NS_RANDOM);
+ } while (otherNode == victim);
+
+ if (restarter.insertErrorInNode(victim, 10039) != 0) /* Cause LCP/backup frag scan to halt */
+ {
+ g_err << "Error insert failed." << endl;
+ break;
+ }
+ if (restarter.insertErrorInNode(victim, 5075) != 0) /* Treat watchdog fail as test success */
+ {
+ g_err << "Error insert failed." << endl;
+ break;
+ }
+
+ g_err << "Triggering LCP..." << endl;
+ /* Now trigger LCP, in case the concurrent updates don't */
+ {
+ int startLcpDumpCode = 7099;
+ if (restarter.dumpStateOneNode(victim, &startLcpDumpCode, 1))
+ {
+ g_err << "Dump state failed." << endl;
+ break;
+ }
+ }
+
+ g_err << "Subscribing to MGMD events..." << endl;
+
+ NdbMgmd mgmd;
+
+ if (!mgmd.connect())
+ {
+ g_err << "Failed to connect to MGMD" << endl;
+ break;
+ }
+
+ if (!mgmd.subscribe_to_events())
+ {
+ g_err << "Failed to subscribe to events" << endl;
+ break;
+ }
+
+ g_err << "Waiting to hear of LCP completion..." << endl;
+ Uint32 completedLcps = 0;
+ Uint64 maxWaitSeconds = 240;
+ Uint64 endTime = NdbTick_CurrentMillisecond() +
+ (maxWaitSeconds * 1000);
+
+ while (NdbTick_CurrentMillisecond() < endTime)
+ {
+ char buff[512];
+
+ if (!mgmd.get_next_event_line(buff,
+ sizeof(buff),
+ 10 * 1000))
+ {
+ g_err << "Failed to get event line " << endl;
+ break;
+ }
+
+ // g_err << "Event : " << buff;
+
+ if (strstr(buff, "Local checkpoint") &&
+ strstr(buff, "completed"))
+ {
+ completedLcps++;
+ g_err << "LCP " << completedLcps << " completed." << endl;
+
+ if (completedLcps == 2)
+ break;
+
+ /* Request + wait for another... */
+ {
+ int startLcpDumpCode = 7099;
+ if (restarter.dumpStateOneNode(otherNode, &startLcpDumpCode, 1))
+ {
+ g_err << "Dump state failed." << endl;
+ break;
+ }
+ }
+ }
+ }
+
+ if (completedLcps != 2)
+ {
+ g_err << "Some problem while waiting for LCP completion" << endl;
+ break;
+ }
+
+ /* Now wait for the node to recover */
+ if (restarter.waitNodesStarted((const int*) &victim, 1, 120) != 0)
+ {
+ g_err << "Failed waiting for node " << victim << "to start" << endl;
+ break;
+ }
+
+ ctx->stopTest();
+ return NDBT_OK;
+ } while (0);
+
+ ctx->stopTest();
+ return NDBT_FAILED;
+}
+
+
NDBT_TESTSUITE(testNodeRestart);
TESTCASE("NoLoad",
"Test that one node at a time can be stopped and then restarted "\
@@ -5575,6 +5699,13 @@ TESTCASE("LCPTakeOver", "")
STEP(runPkUpdateUntilStopped);
STEP(runScanUpdateUntilStopped);
}
+TESTCASE("LCPScanFragWatchdog",
+ "Test LCP scan watchdog")
+{
+ INITIALIZER(runLoadTable);
+ STEP(runPkUpdateUntilStopped);
+ STEP(runTestScanFragWatchdog);
+}
NDBT_TESTSUITE_END(testNodeRestart);
=== modified file 'storage/ndb/test/run-test/daily-devel-tests.txt'
--- a/storage/ndb/test/run-test/daily-devel-tests.txt 2011-10-05 13:18:31 +0000
+++ b/storage/ndb/test/run-test/daily-devel-tests.txt 2012-05-21 16:26:42 +0000
@@ -73,3 +73,8 @@ max-time: 1800
cmd: testDict
args: -n SchemaTrans -l 1
+# LCP Frag watchdog
+max-time: 600
+cmd: testNodeRestart
+args: -n LCPScanFragWatchdog T2
+
No bundle (reason: useless for push emails).
| Thread |
|---|
| • bzr push into mysql-5.1-telco-7.0 branch (frazer.clement:4926 to 4927)Bug#14075825 | Frazer Clement | 21 May |