From: Frazer Clement Date: May 22 2012 9:51am Subject: bzr push into mysql-5.5-cluster-7.3 branch (frazer.clement:3888 to 3890) List-Archive: http://lists.mysql.com/commits/143889 Message-Id: <201205220951.q4M9pBna021075@acsmt356.oracle.com> MIME-Version: 1.0 Content-Type: text/plain; charset="us-ascii" Content-Transfer-Encoding: 7bit 3890 Frazer Clement 2012-05-22 [merge] Merge 7.2->7.3 modified: storage/ndb/include/kernel/GlobalSignalNumbers.h storage/ndb/include/kernel/signaldata/LCP.hpp storage/ndb/include/kernel/signaldata/SignalData.hpp storage/ndb/src/common/debugger/signaldata/LCP.cpp storage/ndb/src/common/debugger/signaldata/SignalDataPrint.cpp storage/ndb/src/common/debugger/signaldata/SignalNames.cpp storage/ndb/src/kernel/blocks/backup/Backup.cpp storage/ndb/src/kernel/blocks/backup/Backup.hpp storage/ndb/src/kernel/blocks/backup/BackupInit.cpp storage/ndb/src/kernel/blocks/dblqh/Dblqh.hpp storage/ndb/src/kernel/blocks/dblqh/DblqhInit.cpp storage/ndb/src/kernel/blocks/dblqh/DblqhMain.cpp storage/ndb/test/include/NdbMgmd.hpp storage/ndb/test/ndbapi/testNodeRestart.cpp storage/ndb/test/run-test/daily-devel-tests.txt 3889 Frazer Clement 2012-05-22 [merge] Merge 7.2->7.3 modified: storage/ndb/clusterj/clusterj-tie/src/main/java/com/mysql/clusterj/tie/DbImpl.java storage/ndb/clusterj/clusterj-tie/src/main/java/com/mysql/clusterj/tie/NdbRecordIndexScanOperationImpl.java storage/ndb/src/ndbjtie/com/mysql/ndbjtie/ndbapi/NdbInterpretedCode.java storage/ndb/src/ndbjtie/ndbapi_jtie.hpp 3888 Jan Wedvik 2012-05-15 [merge] Merge 7.2->7.3 modified: libmysqld/examples/CMakeLists.txt mysql-test/suite/ndb/r/ndb_join_pushdown_default.result mysql-test/suite/ndb/t/ndb_join_pushdown.inc storage/ndb/test/CMakeLists.txt === modified file 'storage/ndb/clusterj/clusterj-tie/src/main/java/com/mysql/clusterj/tie/DbImpl.java' --- a/storage/ndb/clusterj/clusterj-tie/src/main/java/com/mysql/clusterj/tie/DbImpl.java 2012-05-06 13:55:09 +0000 +++ b/storage/ndb/clusterj/clusterj-tie/src/main/java/com/mysql/clusterj/tie/DbImpl.java 2012-05-17 01:36:46 +0000 @@ -431,7 +431,7 @@ class DbImpl implements com.mysql.cluste IndexBound.delete(ndbIndexBound); } - public NdbInterpretedCode createInterpretedCode(TableConst ndbTable, int[] buffer, int i) { + public NdbInterpretedCode createInterpretedCode(TableConst ndbTable, ByteBuffer buffer, int i) { ++numberOfInterpretedCodeCreated; return NdbInterpretedCode.create(ndbTable, buffer, i); } === modified file 'storage/ndb/clusterj/clusterj-tie/src/main/java/com/mysql/clusterj/tie/NdbRecordIndexScanOperationImpl.java' --- a/storage/ndb/clusterj/clusterj-tie/src/main/java/com/mysql/clusterj/tie/NdbRecordIndexScanOperationImpl.java 2012-05-06 13:55:09 +0000 +++ b/storage/ndb/clusterj/clusterj-tie/src/main/java/com/mysql/clusterj/tie/NdbRecordIndexScanOperationImpl.java 2012-05-17 00:19:25 +0000 @@ -93,6 +93,9 @@ public class NdbRecordIndexScanOperation /** The single index bound for a single range scan */ NdbIndexScanOperation.IndexBound ndbIndexBound = null; + /** The buffers used for bounds; held here to prevent garbage collection */ + List buffers = new ArrayList(); + public NdbRecordIndexScanOperationImpl(ClusterTransactionImpl clusterTransaction, Index storeIndex, Table storeTable, int lockMode) { this(clusterTransaction, storeIndex, storeTable, false, lockMode); @@ -108,7 +111,11 @@ public class NdbRecordIndexScanOperation ndbRecordKeys = clusterTransaction.getCachedNdbRecordImpl(storeIndex, storeTable); keyBufferSize = ndbRecordKeys.bufferSize; indexBoundLowBuffer = ndbRecordKeys.newBuffer(); + // hold a reference to the buffer to prevent garbage collection + buffers.add(indexBoundLowBuffer); indexBoundHighBuffer = ndbRecordKeys.newBuffer(); + // hold a reference to the buffer to prevent garbage collection + buffers.add(indexBoundHighBuffer); } public void endDefinition() { @@ -351,8 +358,12 @@ public class NdbRecordIndexScanOperation ndbRecordKeys.initializeBuffer(reclaimed); } else { indexBoundLowBuffer = ndbRecordKeys.newBuffer(); + // hold a reference to the buffer to prevent garbage collection + buffers.add(indexBoundLowBuffer); } indexBoundHighBuffer = ndbRecordKeys.newBuffer(); + // hold a reference to the buffer to prevent garbage collection + buffers.add(indexBoundHighBuffer); indexBoundLowCount = 0; indexBoundHighCount = 0; indexBoundLowStrict = false; @@ -370,6 +381,8 @@ public class NdbRecordIndexScanOperation */ public void freeResourcesAfterExecute() { super.freeResourcesAfterExecute(); + // allow garbage collection for buffers used in IndexBound + buffers = null; if (ndbIndexBound != null) { db.delete(ndbIndexBound); ndbIndexBound = null; === modified file 'storage/ndb/include/kernel/GlobalSignalNumbers.h' --- a/storage/ndb/include/kernel/GlobalSignalNumbers.h 2012-01-10 13:01:14 +0000 +++ b/storage/ndb/include/kernel/GlobalSignalNumbers.h 2012-05-21 23:05:17 +0000 @@ -836,9 +836,10 @@ extern const GlobalSignalNumber NO_OF_SI #define GSN_SYNC_PATH_REQ 613 #define GSN_SYNC_PATH_CONF 614 -#define GSN_615 -#define GSN_616 -#define GSN_617 + +#define GSN_LCP_STATUS_REQ 615 +#define GSN_LCP_STATUS_CONF 616 +#define GSN_LCP_STATUS_REF 617 #define GSN_618 #define GSN_619 === modified file 'storage/ndb/include/kernel/signaldata/LCP.hpp' --- a/storage/ndb/include/kernel/signaldata/LCP.hpp 2011-07-05 12:46:07 +0000 +++ b/storage/ndb/include/kernel/signaldata/LCP.hpp 2012-05-21 23:05:17 +0000 @@ -223,4 +223,110 @@ struct EndLcpConf STATIC_CONST( SignalLength = 2 ); }; +struct LcpStatusReq +{ + /** + * Sender(s) + */ + friend class Dblqh; + + /** + * Sender(s) / Receiver(s) + */ + + /** + * Receiver(s) + */ + friend class Backup; + + friend bool printLCP_STATUS_REQ(FILE *, const Uint32 *, Uint32, Uint16); +public: + + STATIC_CONST( SignalLength = 2 ); + +private: + Uint32 senderRef; + Uint32 reqData; +}; + +struct LcpStatusConf +{ + /** + * Sender(s) + */ + friend class Backup; + + /** + * Sender(s) / Receiver(s) + */ + + /** + * Receiver(s) + */ + friend class Dblqh; + + friend bool printLCP_STATUS_CONF(FILE *, const Uint32 *, Uint32, Uint16); +public: + STATIC_CONST( SignalLength = 11 ); + + enum LcpState + { + LCP_IDLE = 0, + LCP_PREPARED = 1, + LCP_SCANNING = 2, + LCP_SCANNED = 3 + }; +private: + Uint32 senderRef; + Uint32 reqData; + /* Backup stuff */ + Uint32 lcpState; + /* In lcpState == LCP_IDLE, refers to prev LCP + * otherwise, refers to current running LCP + */ + Uint32 lcpDoneRowsHi; + Uint32 lcpDoneRowsLo; + Uint32 lcpDoneBytesHi; + Uint32 lcpDoneBytesLo; + + /* Backup stuff valid iff lcpState == LCP_SCANNING */ + Uint32 tableId; + Uint32 fragId; + Uint32 replicaDoneRowsHi; + Uint32 replicaDoneRowsLo; +}; + +struct LcpStatusRef +{ + /** + * Sender(s) + */ + friend class Backup; + + /** + * Sender(s) / Receiver(s) + */ + + /** + * Receiver(s) + */ + friend class Dblqh; + + friend bool printLCP_STATUS_REF(FILE *, const Uint32 *, Uint32, Uint16); +public: + STATIC_CONST( SignalLength = 3 ); + + enum StatusFailCodes + { + NoLCPRecord = 1, + NoTableRecord = 2, + NoFileRecord = 3 + }; + +private: + Uint32 senderRef; + Uint32 reqData; + Uint32 error; +}; + #endif === modified file 'storage/ndb/include/kernel/signaldata/SignalData.hpp' --- a/storage/ndb/include/kernel/signaldata/SignalData.hpp 2011-09-02 09:16:56 +0000 +++ b/storage/ndb/include/kernel/signaldata/SignalData.hpp 2012-05-21 23:05:17 +0000 @@ -321,4 +321,8 @@ GSN_PRINT_SIGNATURE(printGET_CONFIG_REQ) GSN_PRINT_SIGNATURE(printGET_CONFIG_REF); GSN_PRINT_SIGNATURE(printGET_CONFIG_CONF); +GSN_PRINT_SIGNATURE(printLCP_STATUS_REQ); +GSN_PRINT_SIGNATURE(printLCP_STATUS_CONF); +GSN_PRINT_SIGNATURE(printLCP_STATUS_REF); + #endif === modified file 'storage/ndb/src/common/debugger/signaldata/LCP.cpp' --- a/storage/ndb/src/common/debugger/signaldata/LCP.cpp 2011-06-30 15:59:25 +0000 +++ b/storage/ndb/src/common/debugger/signaldata/LCP.cpp 2012-05-21 22:27:28 +0000 @@ -89,3 +89,37 @@ printLCP_COMPLETE_REP(FILE * output, con sig->lcpId, sig->nodeId, getBlockName(sig->blockNo)); return true; } + +bool +printLCP_STATUS_REQ(FILE * output, const Uint32 * theData, + Uint32 len, Uint16 receiverBlockNo){ + const LcpStatusReq* const sig = (LcpStatusReq*) theData; + + fprintf(output, " SenderRef : %x ReqData : %u\n", + sig->senderRef, sig->reqData); + return true; +} + +bool +printLCP_STATUS_CONF(FILE * output, const Uint32 * theData, + Uint32 len, Uint16 receiverBlockNo){ + const LcpStatusConf* const sig = (LcpStatusConf*) theData; + + fprintf(output, " SenderRef : %x ReqData : %u LcpState : %u tableId : %u fragId : %u\n", + sig->senderRef, sig->reqData, sig->lcpState, sig->tableId, sig->fragId); + fprintf(output, " replica(DoneRows : %llu), lcpDone (Rows : %llu, Bytes : %llu)\n", + (((Uint64)sig->replicaDoneRowsHi) << 32) + sig->replicaDoneRowsLo, + (((Uint64)sig->lcpDoneRowsHi) << 32) + sig->lcpDoneRowsLo, + (((Uint64)sig->lcpDoneBytesHi) << 32) + sig->lcpDoneBytesLo); + return true; +} + +bool +printLCP_STATUS_REF(FILE * output, const Uint32 * theData, + Uint32 len, Uint16 receiverBlockNo){ + const LcpStatusRef* const sig = (LcpStatusRef*) theData; + + fprintf(output, " SenderRef : %x, ReqData : %u Error : %u\n", + sig->senderRef, sig->reqData, sig->error); + return true; +} === modified file 'storage/ndb/src/common/debugger/signaldata/SignalDataPrint.cpp' --- a/storage/ndb/src/common/debugger/signaldata/SignalDataPrint.cpp 2011-09-02 09:16:56 +0000 +++ b/storage/ndb/src/common/debugger/signaldata/SignalDataPrint.cpp 2012-05-21 23:05:17 +0000 @@ -272,6 +272,10 @@ SignalDataPrintFunctions[] = { ,{ GSN_GET_CONFIG_REF, printGET_CONFIG_REF } ,{ GSN_GET_CONFIG_CONF, printGET_CONFIG_CONF } + ,{ GSN_LCP_STATUS_REQ, printLCP_STATUS_REQ } + ,{ GSN_LCP_STATUS_CONF, printLCP_STATUS_CONF } + ,{ GSN_LCP_STATUS_REF, printLCP_STATUS_REF } + ,{ 0, 0 } }; === modified file 'storage/ndb/src/common/debugger/signaldata/SignalNames.cpp' --- a/storage/ndb/src/common/debugger/signaldata/SignalNames.cpp 2012-01-10 13:01:14 +0000 +++ b/storage/ndb/src/common/debugger/signaldata/SignalNames.cpp 2012-05-21 23:05:17 +0000 @@ -775,5 +775,9 @@ const GsnName SignalNames [] = { ,{ GSN_GET_CONFIG_REQ, "GET_CONFIG_REQ" } ,{ GSN_GET_CONFIG_REF, "GET_CONFIG_REF" } ,{ GSN_GET_CONFIG_CONF, "GET_CONFIG_CONF" } + + ,{ GSN_LCP_STATUS_REQ, "LCP_STATUS_REQ" } + ,{ GSN_LCP_STATUS_CONF, "LCP_STATUS_CONF" } + ,{ GSN_LCP_STATUS_REF, "LCP_STATUS_REF" } }; const unsigned short NO_OF_SIGNAL_NAMES = sizeof(SignalNames)/sizeof(GsnName); === modified file 'storage/ndb/src/kernel/blocks/backup/Backup.cpp' --- a/storage/ndb/src/kernel/blocks/backup/Backup.cpp 2012-02-23 15:41:31 +0000 +++ b/storage/ndb/src/kernel/blocks/backup/Backup.cpp 2012-05-21 23:05:17 +0000 @@ -4471,6 +4471,10 @@ Backup::fragmentCompleted(Signal* signal if (ptr.p->is_lcp()) { + /* Maintain LCP totals */ + ptr.p->noOfRecords+= op.noOfRecords; + ptr.p->noOfBytes+= op.noOfBytes; + ptr.p->slaveState.setState(STOPPING); filePtr.p->operation.dataBuffer.eof(); } @@ -5682,7 +5686,14 @@ Backup::execLCP_PREPARE_REQ(Signal* sign fragPtr.p->scanned = 0; fragPtr.p->scanning = 0; fragPtr.p->tableId = req.tableId; - + + if (req.backupId != ptr.p->backupId) + { + jam(); + /* New LCP, reset per-LCP counters */ + ptr.p->noOfBytes = 0; + ptr.p->noOfRecords = 0; + } ptr.p->backupId= req.backupId; lcp_open_file(signal, ptr); } @@ -5843,3 +5854,127 @@ Backup::execEND_LCPREQ(Signal* signal) sendSignal(ptr.p->masterRef, GSN_END_LCPCONF, signal, EndLcpConf::SignalLength, JBB); } + +inline +static +void setWords(const Uint64 src, Uint32& hi, Uint32& lo) +{ + hi = (Uint32) (src >> 32); + lo = (Uint32) (src & 0xffffffff); +} + +void +Backup::execLCP_STATUS_REQ(Signal* signal) +{ + jamEntry(); + const LcpStatusReq* req = (const LcpStatusReq*) signal->getDataPtr(); + + const Uint32 senderRef = req->senderRef; + const Uint32 reqData = req->reqData; + Uint32 failCode = LcpStatusRef::NoLCPRecord; + + /* Find LCP backup, if there is one */ + BackupRecordPtr ptr; + bool found_lcp = false; + for (c_backups.first(ptr); ptr.i != RNIL; c_backups.next(ptr)) + { + jam(); + if (ptr.p->is_lcp()) + { + jam(); + ndbrequire(found_lcp == false); /* Just one LCP */ + found_lcp = true; + + LcpStatusConf::LcpState state = LcpStatusConf::LCP_IDLE; + switch (ptr.p->slaveState.getState()) + { + case STARTED: + state = LcpStatusConf::LCP_PREPARED; + break; + case SCANNING: + state = LcpStatusConf::LCP_SCANNING; + break; + case STOPPING: + state = LcpStatusConf::LCP_SCANNED; + break; + case DEFINED: + state = LcpStatusConf::LCP_IDLE; + break; + default: + ndbout_c("Unusual LCP state in LCP_STATUS_REQ() : %u", + ptr.p->slaveState.getState()); + state = LcpStatusConf::LCP_IDLE; + }; + + /* Not all values are set here */ + const Uint32 UnsetConst = ~0; + + LcpStatusConf* conf = (LcpStatusConf*) signal->getDataPtr(); + conf->senderRef = reference(); + conf->reqData = reqData; + conf->lcpState = state; + conf->tableId = UnsetConst; + conf->fragId = UnsetConst; + conf->replicaDoneRowsHi = UnsetConst; + conf->replicaDoneRowsLo = UnsetConst; + setWords(ptr.p->noOfRecords, + conf->lcpDoneRowsHi, + conf->lcpDoneRowsLo); + setWords(ptr.p->noOfBytes, + conf->lcpDoneBytesHi, + conf->lcpDoneBytesLo); + + if (state == LcpStatusConf::LCP_SCANNING) + { + /* Actually scanning a fragment, let's grab the details */ + TablePtr tabPtr; + FragmentPtr fragPtr; + BackupFilePtr filePtr; + + ptr.p->tables.first(tabPtr); + if (tabPtr.i == RNIL) + { + jam(); + failCode = LcpStatusRef::NoTableRecord; + break; + } + tabPtr.p->fragments.getPtr(fragPtr, 0); + ndbrequire(fragPtr.p->tableId == tabPtr.p->tableId); + if (ptr.p->dataFilePtr == RNIL) + { + jam(); + failCode = LcpStatusRef::NoFileRecord; + break; + } + c_backupFilePool.getPtr(filePtr, ptr.p->dataFilePtr); + ndbrequire(filePtr.p->backupPtr == ptr.i); + conf->tableId = tabPtr.p->tableId; + conf->fragId = fragPtr.p->fragmentId; + setWords(filePtr.p->operation.noOfRecords, + conf->replicaDoneRowsHi, + conf->replicaDoneRowsLo); + } + + failCode = 0; + } + } + + if (failCode == 0) + { + jam(); + sendSignal(senderRef, GSN_LCP_STATUS_CONF, + signal, LcpStatusConf::SignalLength, JBB); + return; + } + + jam(); + LcpStatusRef* ref = (LcpStatusRef*) signal->getDataPtr(); + + ref->senderRef = reference(); + ref->reqData = reqData; + ref->error = failCode; + + sendSignal(senderRef, GSN_LCP_STATUS_REF, + signal, LcpStatusRef::SignalLength, JBB); + return; +} === modified file 'storage/ndb/src/kernel/blocks/backup/Backup.hpp' --- a/storage/ndb/src/kernel/blocks/backup/Backup.hpp 2011-06-30 15:59:25 +0000 +++ b/storage/ndb/src/kernel/blocks/backup/Backup.hpp 2012-05-21 22:27:28 +0000 @@ -160,6 +160,8 @@ protected: void execDBINFO_SCANREQ(Signal *signal); + void execLCP_STATUS_REQ(Signal* signal); + private: void defineBackupMutex_locked(Signal* signal, Uint32 ptrI,Uint32 retVal); void dictCommitTableMutex_locked(Signal* signal, Uint32 ptrI,Uint32 retVal); === modified file 'storage/ndb/src/kernel/blocks/backup/BackupInit.cpp' --- a/storage/ndb/src/kernel/blocks/backup/BackupInit.cpp 2012-01-30 12:31:48 +0000 +++ b/storage/ndb/src/kernel/blocks/backup/BackupInit.cpp 2012-05-21 23:05:17 +0000 @@ -117,6 +117,8 @@ Backup::Backup(Block_context& ctx, Uint3 addRecSignal(GSN_BACKUP_LOCK_TAB_CONF, &Backup::execBACKUP_LOCK_TAB_CONF); addRecSignal(GSN_BACKUP_LOCK_TAB_REF, &Backup::execBACKUP_LOCK_TAB_REF); + addRecSignal(GSN_LCP_STATUS_REQ, &Backup::execLCP_STATUS_REQ); + /** * Testing */ === modified file 'storage/ndb/src/kernel/blocks/dblqh/Dblqh.hpp' --- a/storage/ndb/src/kernel/blocks/dblqh/Dblqh.hpp 2012-02-23 15:41:31 +0000 +++ b/storage/ndb/src/kernel/blocks/dblqh/Dblqh.hpp 2012-05-21 23:05:17 +0000 @@ -249,6 +249,7 @@ class Lgman; #define ZWAIT_REORG_SUMA_FILTER_ENABLED 23 #define ZREBUILD_ORDERED_INDEXES 24 #define ZWAIT_READONLY 25 +#define ZLCP_FRAG_WATCHDOG 26 /* ------------------------------------------------------------------------- */ /* NODE STATE DURING SYSTEM RESTART, VARIABLES CNODES_SR_STATE */ @@ -1083,6 +1084,72 @@ public: void send_io(Uint32 bytes); void complete_io(Uint32 bytes); }; + + /** + * LCPFragWatchdog + * + * Structure tracking state of LCP fragment watchdog. + * This watchdog polls the state of the current LCP fragment + * scan to ensure that forward progress is maintained at + * a minimal rate. + * It only continues running while this LQH instance + * thinks a fragment scan is ongoing + */ + struct LCPFragWatchdog + { + STATIC_CONST( PollingPeriodMillis = 10000 ); /* 10s */ + STATIC_CONST( WarnPeriodsWithNoProgress = 2); /* 20s */ + STATIC_CONST( MaxPeriodsWithNoProgress = 6 ); /* 60s */ + + /* Should the watchdog be running? */ + bool scan_running; + + /* Is there an active thread? */ + bool thread_active; + + /* LCP position info from Backup block */ + Uint32 tableId; + Uint32 fragId; + Uint64 rowCount; + + /* Number of periods with no LCP progress observed */ + Uint32 pollCount; + + /* Reinitialise the watchdog */ + void reset() + { + scan_running = false; + tableId = ~Uint32(0); + fragId = ~Uint32(0); + rowCount = ~Uint64(0); + pollCount = 0; + } + + /* Handle an LCP Status report */ + void handleLcpStatusRep(Uint32 repTableId, + Uint32 repFragId, + Uint64 repRowCount) + { + if (scan_running) + { + if ((repRowCount != rowCount) || + (repFragId != fragId) || + (repTableId != tableId)) + { + /* Something moved since last time, reset + * poll counter and data. + */ + pollCount = 0; + tableId = repTableId; + fragId = repFragId; + rowCount = repRowCount; + } + } + } + }; + + LCPFragWatchdog c_lcpFragWatchdog; + /* $$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$ */ /* $$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$ */ @@ -2736,6 +2803,14 @@ private: void unlockError(Signal* signal, Uint32 error); void handleUserUnlockRequest(Signal* signal); + void execLCP_STATUS_CONF(Signal* signal); + void execLCP_STATUS_REF(Signal* signal); + + void startLcpFragWatchdog(Signal* signal); + void stopLcpFragWatchdog(); + void invokeLcpFragWatchdogThread(Signal* signal); + void checkLcpFragWatchdog(Signal* signal); + Dbtup* c_tup; Dbacc* c_acc; Lgman* c_lgman; === modified file 'storage/ndb/src/kernel/blocks/dblqh/DblqhInit.cpp' --- a/storage/ndb/src/kernel/blocks/dblqh/DblqhInit.cpp 2011-11-18 06:47:23 +0000 +++ b/storage/ndb/src/kernel/blocks/dblqh/DblqhInit.cpp 2012-05-21 23:05:17 +0000 @@ -90,6 +90,9 @@ void Dblqh::initData() c_max_redo_lag_counter = 3; // 3 strikes and you're out c_max_parallel_scans_per_frag = 32; + + c_lcpFragWatchdog.reset(); + c_lcpFragWatchdog.thread_active = false; }//Dblqh::initData() void Dblqh::initRecords() @@ -426,6 +429,9 @@ Dblqh::Dblqh(Block_context& ctx, Uint32 addRecSignal(GSN_FIRE_TRIG_REQ, &Dblqh::execFIRE_TRIG_REQ); + addRecSignal(GSN_LCP_STATUS_CONF, &Dblqh::execLCP_STATUS_CONF); + addRecSignal(GSN_LCP_STATUS_REF, &Dblqh::execLCP_STATUS_REF); + initData(); #ifdef VM_TRACE === modified file 'storage/ndb/src/kernel/blocks/dblqh/DblqhMain.cpp' --- a/storage/ndb/src/kernel/blocks/dblqh/DblqhMain.cpp 2012-05-07 07:51:09 +0000 +++ b/storage/ndb/src/kernel/blocks/dblqh/DblqhMain.cpp 2012-05-21 23:05:17 +0000 @@ -595,6 +595,12 @@ void Dblqh::execCONTINUEB(Signal* signal wait_readonly(signal); return; } + case ZLCP_FRAG_WATCHDOG: + { + jam(); + checkLcpFragWatchdog(signal); + return; + } default: ndbrequire(false); break; @@ -13713,7 +13719,9 @@ void Dblqh::execLCP_PREPARE_REF(Signal* */ ndbrequire(refToMain(signal->getSendersBlockRef()) == BACKUP); lcpPtr.p->m_error = ref->errorCode; - + + stopLcpFragWatchdog(); + if (lcpPtr.p->m_outstanding == 0) { jam(); @@ -13924,6 +13932,8 @@ void Dblqh::execBACKUP_FRAGMENT_CONF(Sig ndbrequire(lcpPtr.p->lcpState == LcpRecord::LCP_START_CHKP); lcpPtr.p->lcpState = LcpRecord::LCP_COMPLETED; + stopLcpFragWatchdog(); + /* ------------------------------------------------------------------------ * THE LOCAL CHECKPOINT HAS BEEN COMPLETED. IT IS NOW TIME TO START * A LOCAL CHECKPOINT ON THE NEXT FRAGMENT OR COMPLETE THIS LCP ROUND. @@ -14073,6 +14083,9 @@ void Dblqh::sendLCP_FRAGIDREQ(Signal* si sendSignal(backupRef, GSN_LCP_PREPARE_REQ, signal, LcpPrepareReq::SignalLength, JBB); + /* Now start the LCP fragment watchdog */ + startLcpFragWatchdog(signal); + }//Dblqh::sendLCP_FRAGIDREQ() void Dblqh::sendEMPTY_LCP_CONF(Signal* signal, bool idle) @@ -23477,6 +23490,30 @@ Dblqh::execDUMP_STATE_ORD(Signal* signal } } + if (arg == 2397) + { + /* Send LCP_STATUS_REQ to BACKUP */ + LcpStatusReq* req = (LcpStatusReq*) signal->getDataPtr(); + req->senderRef = reference(); + req->reqData = 0; + + BlockReference backupRef = calcInstanceBlockRef(BACKUP); + sendSignal(backupRef, GSN_LCP_STATUS_REQ, signal, + LcpStatusReq::SignalLength, JBB); + } + + + if(arg == 2309) + { + CRASH_INSERTION(5075); + + progError(__LINE__, NDBD_EXIT_SYSTEM_ERROR, + "Please report this as a bug. " + "Provide as much info as possible, expecially all the " + "ndb_*_out.log files, Thanks. " + "Shutting down node due to lack of LCP fragment scan progress"); + } + if (arg == 5050) { #ifdef ERROR_INSERT @@ -23704,6 +23741,195 @@ Dblqh::ndbinfo_write_op(Ndbinfo::Row & r } +void +Dblqh::startLcpFragWatchdog(Signal* signal) +{ + jam(); + /* Must not already be running */ + /* Thread could still be active from a previous run */ + ndbrequire(c_lcpFragWatchdog.scan_running == false); + c_lcpFragWatchdog.scan_running = true; + + /* If thread is not already active, start it */ + if (! c_lcpFragWatchdog.thread_active) + { + jam(); + invokeLcpFragWatchdogThread(signal); + } + + ndbrequire(c_lcpFragWatchdog.thread_active == true); +} + +void +Dblqh::invokeLcpFragWatchdogThread(Signal* signal) +{ + jam(); + ndbrequire(c_lcpFragWatchdog.scan_running); + + c_lcpFragWatchdog.thread_active = true; + + signal->getDataPtrSend()[0] = ZLCP_FRAG_WATCHDOG; + sendSignalWithDelay(cownref, GSN_CONTINUEB, signal, + LCPFragWatchdog::PollingPeriodMillis, 1); + + LcpStatusReq* req = (LcpStatusReq*)signal->getDataPtr(); + req->senderRef = cownref; + req->reqData = 1; + BlockReference backupRef = calcInstanceBlockRef(BACKUP); + sendSignal(backupRef, GSN_LCP_STATUS_REQ, signal, + LcpStatusReq::SignalLength, JBB); +} + +void +Dblqh::execLCP_STATUS_CONF(Signal* signal) +{ + jamEntry(); + LcpStatusConf* conf = (LcpStatusConf*) signal->getDataPtr(); + + if (conf->reqData == 0) + { + /* DUMP STATE variant */ + ndbout_c("Received LCP_STATUS_CONF from %x", conf->senderRef); + ndbout_c(" Status = %u, Table = %u, Frag = %u", + conf->lcpState, + conf->tableId, + conf->fragId); + ndbout_c(" Replica done rows %llu", + (((Uint64)conf->replicaDoneRowsHi) << 32) + conf->replicaDoneRowsLo); + ndbout_c(" Lcp done rows %llu, done bytes %llu", + (((Uint64)conf->lcpDoneRowsHi) << 32) + conf->lcpDoneRowsLo, + (((Uint64)conf->lcpDoneBytesHi) << 32) + conf->lcpDoneBytesLo); + } + + /* We can ignore the LCP status as if it's complete then we should + * promptly stop watching + */ + c_lcpFragWatchdog.handleLcpStatusRep(conf->tableId, + conf->fragId, + (((Uint64)conf->replicaDoneRowsHi) << 32) + + conf->replicaDoneRowsLo); +} + +void +Dblqh::execLCP_STATUS_REF(Signal* signal) +{ + jamEntry(); + LcpStatusRef* ref = (LcpStatusRef*) signal->getDataPtr(); + + ndbout_c("Received LCP_STATUS_REF from %x, reqData = %u with error code %u", + ref->senderRef, ref->reqData, ref->error); + + ndbrequire(false); +} + +/** + * checkLcpFragWatchdog + * + * This method implements the LCP Frag watchdog 'thread', periodically + * checking for progress in the current LCP fragment scan + */ +void +Dblqh::checkLcpFragWatchdog(Signal* signal) +{ + jam(); + ndbrequire(c_lcpFragWatchdog.thread_active == true); + + if (!c_lcpFragWatchdog.scan_running) + { + jam(); + /* We've been asked to stop */ + c_lcpFragWatchdog.thread_active = false; + return; + } + + c_lcpFragWatchdog.pollCount++; + + /* Check how long we've been waiting for progress on this scan */ + if (c_lcpFragWatchdog.pollCount >= + LCPFragWatchdog::WarnPeriodsWithNoProgress) + { + jam(); + warningEvent("LCP Frag watchdog : No progress on table %u, frag %u for %u s." + " %llu rows completed", + c_lcpFragWatchdog.tableId, + c_lcpFragWatchdog.fragId, + (LCPFragWatchdog::PollingPeriodMillis * + c_lcpFragWatchdog.pollCount) / 1000, + c_lcpFragWatchdog.rowCount); + ndbout_c("LCP Frag watchdog : No progress on table %u, frag %u for %u s." + " %llu rows completed", + c_lcpFragWatchdog.tableId, + c_lcpFragWatchdog.fragId, + (LCPFragWatchdog::PollingPeriodMillis * + c_lcpFragWatchdog.pollCount) / 1000, + c_lcpFragWatchdog.rowCount); + + if (c_lcpFragWatchdog.pollCount >= + LCPFragWatchdog::MaxPeriodsWithNoProgress) + { + jam(); + /* Too long with no progress... */ + + warningEvent("LCP Frag watchdog : Checkpoint of table %u fragment %u " + "too slow (no progress for > %u s).", + c_lcpFragWatchdog.tableId, + c_lcpFragWatchdog.fragId, + (LCPFragWatchdog::PollingPeriodMillis * + LCPFragWatchdog::MaxPeriodsWithNoProgress) / 1000); + ndbout_c("LCP Frag watchdog : Checkpoint of table %u fragment %u " + "too slow (no progress for > %u s).", + c_lcpFragWatchdog.tableId, + c_lcpFragWatchdog.fragId, + (LCPFragWatchdog::PollingPeriodMillis * + LCPFragWatchdog::MaxPeriodsWithNoProgress) / 1000); + + /* Dump some LCP state for debugging... */ + { + DumpStateOrd* ds = (DumpStateOrd*) signal->getDataPtrSend(); + + /* DIH : */ + ds->args[0] = DumpStateOrd::DihDumpLCPState; + sendSignal(DBDIH_REF, GSN_DUMP_STATE_ORD, signal, 1, JBA); + + ds->args[0] = 7012; + sendSignal(DBDIH_REF, GSN_DUMP_STATE_ORD, signal, 1, JBA); + + /* BACKUP : */ + ds->args[0] = 23; + sendSignal(BACKUP_REF, GSN_DUMP_STATE_ORD, signal, 1, JBA); + + ds->args[0] = 24; + ds->args[1] = 2424; + sendSignal(BACKUP_REF, GSN_DUMP_STATE_ORD, signal, 2, JBA); + + /* LQH : */ + ds->args[0] = DumpStateOrd::LqhDumpLcpState; + sendSignal(cownref, GSN_DUMP_STATE_ORD, signal, 1, JBA); + + /* Delay self-execution to give time for dump output */ + ds->args[0] = 2309; + sendSignalWithDelay(cownref, GSN_DUMP_STATE_ORD, signal, 5*1000, 1); + } + + return; + } + } + + invokeLcpFragWatchdogThread(signal); +} + +void +Dblqh::stopLcpFragWatchdog() +{ + jam(); + /* Mark watchdog as no longer running, + * If the 'thread' is active then it will + * stop at the next wakeup + */ + ndbrequire(c_lcpFragWatchdog.scan_running); + c_lcpFragWatchdog.reset(); +}; + /* **************************************************************** */ /* ---------------------------------------------------------------- */ /* ---------------------- TRIGGER HANDLING ------------------------ */ === modified file 'storage/ndb/src/ndbjtie/com/mysql/ndbjtie/ndbapi/NdbInterpretedCode.java' --- a/storage/ndb/src/ndbjtie/com/mysql/ndbjtie/ndbapi/NdbInterpretedCode.java 2011-07-05 12:46:07 +0000 +++ b/storage/ndb/src/ndbjtie/com/mysql/ndbjtie/ndbapi/NdbInterpretedCode.java 2012-05-17 01:50:26 +0000 @@ -30,7 +30,7 @@ public class NdbInterpretedCode extends public final native NdbDictionary.TableConst/*_const NdbDictionary.Table *_*/ getTable() /*_const_*/; public final native NdbErrorConst/*_const NdbError &_*/ getNdbError() /*_const_*/; public final native int/*_Uint32_*/ getWordsUsed() /*_const_*/; - static public final native NdbInterpretedCode create(NdbDictionary.TableConst/*_const NdbDictionary.Table *_*/ table /*_= 0_*/, int[]/*_Uint32 *_*/ buffer /*_= 0_*/, int/*_Uint32_*/ buffer_word_size /*_= 0_*/); + static public final native NdbInterpretedCode create(NdbDictionary.TableConst/*_const NdbDictionary.Table *_*/ table /*_= 0_*/, ByteBuffer/*_Uint32 *_*/ buffer /*_= 0_*/, int/*_Uint32_*/ buffer_word_size /*_= 0_*/); static public final native void delete(NdbInterpretedCode p0); public final native int load_const_null(int/*_Uint32_*/ RegDest); public final native int load_const_u16(int/*_Uint32_*/ RegDest, int/*_Uint32_*/ Constant); === modified file 'storage/ndb/src/ndbjtie/ndbapi_jtie.hpp' --- a/storage/ndb/src/ndbjtie/ndbapi_jtie.hpp 2012-03-23 12:49:18 +0000 +++ b/storage/ndb/src/ndbjtie/ndbapi_jtie.hpp 2012-05-17 01:50:26 +0000 @@ -6566,7 +6566,7 @@ JNIEXPORT jobject JNICALL Java_com_mysql_ndbjtie_ndbapi_NdbInterpretedCode_create(JNIEnv * env, jclass cls, jobject p0, jintArray p1, jint p2) { TRACE("jobject Java_com_mysql_ndbjtie_ndbapi_NdbInterpretedCode_create(JNIEnv *, jclass, jobject, jintArray, jint)"); - return gcreate< ttrait_c_m_n_n_NdbInterpretedCode_r, ttrait_c_m_n_n_NdbDictionary_Table_cp, ttrait_Uint32_0p_a, ttrait_Uint32 >(env, cls, p0, p1, p2); + return gcreate< ttrait_c_m_n_n_NdbInterpretedCode_r, ttrait_c_m_n_n_NdbDictionary_Table_cp, ttrait_Uint32_0p_bb, ttrait_Uint32 >(env, cls, p0, p1, p2); } /* === modified file 'storage/ndb/test/include/NdbMgmd.hpp' --- a/storage/ndb/test/include/NdbMgmd.hpp 2011-10-24 13:14:28 +0000 +++ b/storage/ndb/test/include/NdbMgmd.hpp 2012-05-21 23:05:17 +0000 @@ -28,12 +28,16 @@ #include "../../src/mgmsrv/Config.hpp" +#include + class NdbMgmd { BaseString m_connect_str; NdbMgmHandle m_handle; Uint32 m_nodeid; bool m_verbose; unsigned int m_timeout; + NDB_SOCKET_TYPE m_event_socket; + void error(const char* msg, ...) ATTRIBUTE_FORMAT(printf, 2, 3) { if (!m_verbose) @@ -391,6 +395,74 @@ public: return true; } + bool subscribe_to_events(void) + { + if (!is_connected()) + { + error("subscribe_to_events: not connected"); + return false; + } + + int filter[] = + { + 15, NDB_MGM_EVENT_CATEGORY_STARTUP, + 15, NDB_MGM_EVENT_CATEGORY_SHUTDOWN, + 15, NDB_MGM_EVENT_CATEGORY_STATISTIC, + 15, NDB_MGM_EVENT_CATEGORY_CHECKPOINT, + 15, NDB_MGM_EVENT_CATEGORY_NODE_RESTART, + 15, NDB_MGM_EVENT_CATEGORY_CONNECTION, + 15, NDB_MGM_EVENT_CATEGORY_BACKUP, + 15, NDB_MGM_EVENT_CATEGORY_CONGESTION, + 15, NDB_MGM_EVENT_CATEGORY_DEBUG, + 15, NDB_MGM_EVENT_CATEGORY_INFO, + 0 + }; + +#ifdef NDB_WIN + m_event_socket.s = ndb_mgm_listen_event(m_handle, filter); +#else + m_event_socket.fd = ndb_mgm_listen_event(m_handle, filter); +#endif + + return my_socket_valid(m_event_socket); + } + + bool get_next_event_line(char* buff, int bufflen, + int timeout_millis) + { + if (!is_connected()) + { + error("get_next_event_line: not connected"); + return false; + } + + if (!my_socket_valid(m_event_socket)) + { + error("get_next_event_line: not subscribed"); + return false; + } + + SocketInputStream stream(m_event_socket, timeout_millis); + + const char* result = stream.gets(buff, bufflen); + if (result && strlen(result)) + { + return true; + } + else + { + if (stream.timedout()) + { + error("get_next_event_line: stream.gets timed out"); + return false; + } + } + + error("get_next_event_line: error from stream.gets()"); + return false; + } + + // Pretty printer for 'ndb_mgm_node_type' class NodeType { BaseString m_str; === modified file 'storage/ndb/test/ndbapi/testNodeRestart.cpp' --- a/storage/ndb/test/ndbapi/testNodeRestart.cpp 2012-01-12 13:36:25 +0000 +++ b/storage/ndb/test/ndbapi/testNodeRestart.cpp 2012-05-21 23:05:17 +0000 @@ -26,6 +26,7 @@ #include #include #include +#include int runLoadTable(NDBT_Context* ctx, NDBT_Step* step){ @@ -5015,6 +5016,129 @@ runLCPTakeOver(NDBT_Context* ctx, NDBT_S return NDBT_OK; } +int +runTestScanFragWatchdog(NDBT_Context* ctx, NDBT_Step* step) +{ + /* Setup an error insert, then start a checkpoint */ + NdbRestarter restarter; + if (restarter.getNumDbNodes() < 2) + { + g_err << "Insufficient nodes for test." << endl; + ctx->stopTest(); + return NDBT_OK; + } + + do + { + g_err << "Injecting fault to suspend LCP frag scan..." << endl; + Uint32 victim = restarter.getNode(NdbRestarter::NS_RANDOM); + Uint32 otherNode = 0; + do + { + otherNode = restarter.getNode(NdbRestarter::NS_RANDOM); + } while (otherNode == victim); + + if (restarter.insertErrorInNode(victim, 10039) != 0) /* Cause LCP/backup frag scan to halt */ + { + g_err << "Error insert failed." << endl; + break; + } + if (restarter.insertErrorInNode(victim, 5075) != 0) /* Treat watchdog fail as test success */ + { + g_err << "Error insert failed." << endl; + break; + } + + g_err << "Triggering LCP..." << endl; + /* Now trigger LCP, in case the concurrent updates don't */ + { + int startLcpDumpCode = 7099; + if (restarter.dumpStateOneNode(victim, &startLcpDumpCode, 1)) + { + g_err << "Dump state failed." << endl; + break; + } + } + + g_err << "Subscribing to MGMD events..." << endl; + + NdbMgmd mgmd; + + if (!mgmd.connect()) + { + g_err << "Failed to connect to MGMD" << endl; + break; + } + + if (!mgmd.subscribe_to_events()) + { + g_err << "Failed to subscribe to events" << endl; + break; + } + + g_err << "Waiting to hear of LCP completion..." << endl; + Uint32 completedLcps = 0; + Uint64 maxWaitSeconds = 240; + Uint64 endTime = NdbTick_CurrentMillisecond() + + (maxWaitSeconds * 1000); + + while (NdbTick_CurrentMillisecond() < endTime) + { + char buff[512]; + + if (!mgmd.get_next_event_line(buff, + sizeof(buff), + 10 * 1000)) + { + g_err << "Failed to get event line " << endl; + break; + } + + // g_err << "Event : " << buff; + + if (strstr(buff, "Local checkpoint") && + strstr(buff, "completed")) + { + completedLcps++; + g_err << "LCP " << completedLcps << " completed." << endl; + + if (completedLcps == 2) + break; + + /* Request + wait for another... */ + { + int startLcpDumpCode = 7099; + if (restarter.dumpStateOneNode(otherNode, &startLcpDumpCode, 1)) + { + g_err << "Dump state failed." << endl; + break; + } + } + } + } + + if (completedLcps != 2) + { + g_err << "Some problem while waiting for LCP completion" << endl; + break; + } + + /* Now wait for the node to recover */ + if (restarter.waitNodesStarted((const int*) &victim, 1, 120) != 0) + { + g_err << "Failed waiting for node " << victim << "to start" << endl; + break; + } + + ctx->stopTest(); + return NDBT_OK; + } while (0); + + ctx->stopTest(); + return NDBT_FAILED; +} + + NDBT_TESTSUITE(testNodeRestart); TESTCASE("NoLoad", "Test that one node at a time can be stopped and then restarted "\ @@ -5575,6 +5699,13 @@ TESTCASE("LCPTakeOver", "") STEP(runPkUpdateUntilStopped); STEP(runScanUpdateUntilStopped); } +TESTCASE("LCPScanFragWatchdog", + "Test LCP scan watchdog") +{ + INITIALIZER(runLoadTable); + STEP(runPkUpdateUntilStopped); + STEP(runTestScanFragWatchdog); +} NDBT_TESTSUITE_END(testNodeRestart); === modified file 'storage/ndb/test/run-test/daily-devel-tests.txt' --- a/storage/ndb/test/run-test/daily-devel-tests.txt 2011-10-05 13:57:58 +0000 +++ b/storage/ndb/test/run-test/daily-devel-tests.txt 2012-05-21 23:05:17 +0000 @@ -73,3 +73,8 @@ max-time: 1800 cmd: testDict args: -n SchemaTrans -l 1 +# LCP Frag watchdog +max-time: 600 +cmd: testNodeRestart +args: -n LCPScanFragWatchdog T2 + No bundle (reason: useless for push emails).