From: Frazer Clement Date: November 7 2012 6:12pm Subject: bzr push into mysql-5.5-cluster-7.2 branch (frazer.clement:4073 to 4074) List-Archive: http://lists.mysql.com/commits/145237 Message-Id: <201211071812.qA7ICm8m027793@acsmt357.oracle.com> MIME-Version: 1.0 Content-Type: text/plain; charset="us-ascii" Content-Transfer-Encoding: 7bit 4074 Frazer Clement 2012-11-07 [merge] Merge 7.1->7.2 modified: storage/ndb/include/kernel/signaldata/LCP.hpp storage/ndb/src/common/debugger/signaldata/LCP.cpp storage/ndb/src/kernel/blocks/backup/Backup.cpp storage/ndb/src/kernel/blocks/dblqh/Dblqh.hpp storage/ndb/src/kernel/blocks/dblqh/DblqhInit.cpp storage/ndb/src/kernel/blocks/dblqh/DblqhMain.cpp 4073 magnus.blaudd@stripped 2012-11-06 [merge] Merge 7.1 -> 7.2 === modified file 'storage/ndb/include/kernel/signaldata/LCP.hpp' --- a/storage/ndb/include/kernel/signaldata/LCP.hpp 2012-05-21 23:05:17 +0000 +++ b/storage/ndb/include/kernel/signaldata/LCP.hpp 2012-11-07 18:00:23 +0000 @@ -246,7 +246,7 @@ public: private: Uint32 senderRef; - Uint32 reqData; + Uint32 senderData; }; struct LcpStatusConf @@ -278,7 +278,7 @@ public: }; private: Uint32 senderRef; - Uint32 reqData; + Uint32 senderData; /* Backup stuff */ Uint32 lcpState; /* In lcpState == LCP_IDLE, refers to prev LCP @@ -289,11 +289,18 @@ private: Uint32 lcpDoneBytesHi; Uint32 lcpDoneBytesLo; - /* Backup stuff valid iff lcpState == LCP_SCANNING */ Uint32 tableId; Uint32 fragId; - Uint32 replicaDoneRowsHi; - Uint32 replicaDoneRowsLo; + /* Backup stuff valid iff lcpState == LCP_SCANNING or + * LCP_SCANNED + * For LCP_SCANNING contains row count of rows scanned + * (Increases as scan proceeds) + * For LCP_SCANNED contains bytes remaining to be flushed + * to file. + * (Decreases as buffer drains to file) + */ + Uint32 completionStateHi; + Uint32 completionStateLo; }; struct LcpStatusRef @@ -325,7 +332,7 @@ public: private: Uint32 senderRef; - Uint32 reqData; + Uint32 senderData; Uint32 error; }; === modified file 'storage/ndb/src/common/debugger/signaldata/LCP.cpp' --- a/storage/ndb/src/common/debugger/signaldata/LCP.cpp 2012-05-21 22:27:28 +0000 +++ b/storage/ndb/src/common/debugger/signaldata/LCP.cpp 2012-11-07 17:29:01 +0000 @@ -95,8 +95,8 @@ printLCP_STATUS_REQ(FILE * output, const Uint32 len, Uint16 receiverBlockNo){ const LcpStatusReq* const sig = (LcpStatusReq*) theData; - fprintf(output, " SenderRef : %x ReqData : %u\n", - sig->senderRef, sig->reqData); + fprintf(output, " SenderRef : %x SenderData : %u\n", + sig->senderRef, sig->senderData); return true; } @@ -105,10 +105,10 @@ printLCP_STATUS_CONF(FILE * output, cons Uint32 len, Uint16 receiverBlockNo){ const LcpStatusConf* const sig = (LcpStatusConf*) theData; - fprintf(output, " SenderRef : %x ReqData : %u LcpState : %u tableId : %u fragId : %u\n", - sig->senderRef, sig->reqData, sig->lcpState, sig->tableId, sig->fragId); - fprintf(output, " replica(DoneRows : %llu), lcpDone (Rows : %llu, Bytes : %llu)\n", - (((Uint64)sig->replicaDoneRowsHi) << 32) + sig->replicaDoneRowsLo, + fprintf(output, " SenderRef : %x SenderData : %u LcpState : %u tableId : %u fragId : %u\n", + sig->senderRef, sig->senderData, sig->lcpState, sig->tableId, sig->fragId); + fprintf(output, " replica(Progress : %llu), lcpDone (Rows : %llu, Bytes : %llu)\n", + (((Uint64)sig->completionStateHi) << 32) + sig->completionStateLo, (((Uint64)sig->lcpDoneRowsHi) << 32) + sig->lcpDoneRowsLo, (((Uint64)sig->lcpDoneBytesHi) << 32) + sig->lcpDoneBytesLo); return true; @@ -119,7 +119,7 @@ printLCP_STATUS_REF(FILE * output, const Uint32 len, Uint16 receiverBlockNo){ const LcpStatusRef* const sig = (LcpStatusRef*) theData; - fprintf(output, " SenderRef : %x, ReqData : %u Error : %u\n", - sig->senderRef, sig->reqData, sig->error); + fprintf(output, " SenderRef : %x, SenderData : %u Error : %u\n", + sig->senderRef, sig->senderData, sig->error); return true; } === modified file 'storage/ndb/src/kernel/blocks/backup/Backup.cpp' --- a/storage/ndb/src/kernel/blocks/backup/Backup.cpp 2012-09-13 20:47:06 +0000 +++ b/storage/ndb/src/kernel/blocks/backup/Backup.cpp 2012-11-07 18:00:23 +0000 @@ -4792,6 +4792,13 @@ Backup::ready_to_write(bool ready, Uint3 ndbout << endl << "Current Millisecond is = "; ndbout << NdbTick_CurrentMillisecond() << endl; #endif + + if (ERROR_INSERTED(10043) && eof) + { + /* Block indefinitely without closing the file */ + return false; + } + if ((ready || eof) && m_words_written_this_period <= m_curr_disk_write_speed) { @@ -5966,7 +5973,7 @@ Backup::execLCP_STATUS_REQ(Signal* signa const LcpStatusReq* req = (const LcpStatusReq*) signal->getDataPtr(); const Uint32 senderRef = req->senderRef; - const Uint32 reqData = req->reqData; + const Uint32 senderData = req->senderData; Uint32 failCode = LcpStatusRef::NoLCPRecord; /* Find LCP backup, if there is one */ @@ -5985,18 +5992,23 @@ Backup::execLCP_STATUS_REQ(Signal* signa switch (ptr.p->slaveState.getState()) { case STARTED: + jam(); state = LcpStatusConf::LCP_PREPARED; break; case SCANNING: + jam(); state = LcpStatusConf::LCP_SCANNING; break; case STOPPING: + jam(); state = LcpStatusConf::LCP_SCANNED; break; case DEFINED: + jam(); state = LcpStatusConf::LCP_IDLE; break; default: + jam(); ndbout_c("Unusual LCP state in LCP_STATUS_REQ() : %u", ptr.p->slaveState.getState()); state = LcpStatusConf::LCP_IDLE; @@ -6007,12 +6019,12 @@ Backup::execLCP_STATUS_REQ(Signal* signa LcpStatusConf* conf = (LcpStatusConf*) signal->getDataPtr(); conf->senderRef = reference(); - conf->reqData = reqData; + conf->senderData = senderData; conf->lcpState = state; conf->tableId = UnsetConst; conf->fragId = UnsetConst; - conf->replicaDoneRowsHi = UnsetConst; - conf->replicaDoneRowsLo = UnsetConst; + conf->completionStateHi = UnsetConst; + conf->completionStateLo = UnsetConst; setWords(ptr.p->noOfRecords, conf->lcpDoneRowsHi, conf->lcpDoneRowsLo); @@ -6020,9 +6032,11 @@ Backup::execLCP_STATUS_REQ(Signal* signa conf->lcpDoneBytesHi, conf->lcpDoneBytesLo); - if (state == LcpStatusConf::LCP_SCANNING) + if (state == LcpStatusConf::LCP_SCANNING || + state == LcpStatusConf::LCP_SCANNED) { - /* Actually scanning a fragment, let's grab the details */ + jam(); + /* Actually scanning/closing a fragment, let's grab the details */ TablePtr tabPtr; FragmentPtr fragPtr; BackupFilePtr filePtr; @@ -6046,9 +6060,30 @@ Backup::execLCP_STATUS_REQ(Signal* signa ndbrequire(filePtr.p->backupPtr == ptr.i); conf->tableId = tabPtr.p->tableId; conf->fragId = fragPtr.p->fragmentId; - setWords(filePtr.p->operation.noOfRecords, - conf->replicaDoneRowsHi, - conf->replicaDoneRowsLo); + + if (state == LcpStatusConf::LCP_SCANNING) + { + jam(); + setWords(filePtr.p->operation.noOfRecords, + conf->completionStateHi, + conf->completionStateLo); + } + else if (state == LcpStatusConf::LCP_SCANNED) + { + jam(); + /* May take some time to drain the FS buffer, depending on + * size of buff, achieved rate. + * We provide the buffer fill level so that requestors + * can observe whether there's progress in this phase. + */ + Uint64 flushBacklog = + filePtr.p->operation.dataBuffer.getUsableSize() - + filePtr.p->operation.dataBuffer.getFreeSize(); + + setWords(flushBacklog, + conf->completionStateHi, + conf->completionStateLo); + } } failCode = 0; @@ -6067,7 +6102,7 @@ Backup::execLCP_STATUS_REQ(Signal* signa LcpStatusRef* ref = (LcpStatusRef*) signal->getDataPtr(); ref->senderRef = reference(); - ref->reqData = reqData; + ref->senderData = senderData; ref->error = failCode; sendSignal(senderRef, GSN_LCP_STATUS_REF, === modified file 'storage/ndb/src/kernel/blocks/dblqh/Dblqh.hpp' --- a/storage/ndb/src/kernel/blocks/dblqh/Dblqh.hpp 2012-10-17 14:43:50 +0000 +++ b/storage/ndb/src/kernel/blocks/dblqh/Dblqh.hpp 2012-11-07 18:00:23 +0000 @@ -1095,6 +1095,8 @@ public: STATIC_CONST( PollingPeriodMillis = 10000 ); /* 10s */ STATIC_CONST( WarnPeriodsWithNoProgress = 2); /* 20s */ STATIC_CONST( MaxPeriodsWithNoProgress = 6 ); /* 60s */ + + SimulatedBlock* block; /* Should the watchdog be running? */ bool scan_running; @@ -1102,45 +1104,23 @@ public: /* Is there an active thread? */ bool thread_active; - /* LCP position info from Backup block */ + /* LCP position and state info from Backup block */ + LcpStatusConf::LcpState lcpState; Uint32 tableId; Uint32 fragId; - Uint64 rowCount; + Uint64 completionStatus; /* Number of periods with no LCP progress observed */ Uint32 pollCount; /* Reinitialise the watchdog */ - void reset() - { - scan_running = false; - tableId = ~Uint32(0); - fragId = ~Uint32(0); - rowCount = ~Uint64(0); - pollCount = 0; - } + void reset(); /* Handle an LCP Status report */ - void handleLcpStatusRep(Uint32 repTableId, + void handleLcpStatusRep(LcpStatusConf::LcpState repLcpState, + Uint32 repTableId, Uint32 repFragId, - Uint64 repRowCount) - { - if (scan_running) - { - if ((repRowCount != rowCount) || - (repFragId != fragId) || - (repTableId != tableId)) - { - /* Something moved since last time, reset - * poll counter and data. - */ - pollCount = 0; - tableId = repTableId; - fragId = repFragId; - rowCount = repRowCount; - } - } - } + Uint64 repCompletionStatus); }; LCPFragWatchdog c_lcpFragWatchdog; === modified file 'storage/ndb/src/kernel/blocks/dblqh/DblqhInit.cpp' --- a/storage/ndb/src/kernel/blocks/dblqh/DblqhInit.cpp 2012-08-13 14:03:42 +0000 +++ b/storage/ndb/src/kernel/blocks/dblqh/DblqhInit.cpp 2012-11-07 18:00:23 +0000 @@ -96,6 +96,7 @@ void Dblqh::initData() c_max_parallel_scans_per_frag = 32; + c_lcpFragWatchdog.block = this; c_lcpFragWatchdog.reset(); c_lcpFragWatchdog.thread_active = false; }//Dblqh::initData() === modified file 'storage/ndb/src/kernel/blocks/dblqh/DblqhMain.cpp' --- a/storage/ndb/src/kernel/blocks/dblqh/DblqhMain.cpp 2012-10-17 14:43:50 +0000 +++ b/storage/ndb/src/kernel/blocks/dblqh/DblqhMain.cpp 2012-11-07 18:00:23 +0000 @@ -23649,7 +23649,7 @@ Dblqh::execDUMP_STATE_ORD(Signal* signal /* Send LCP_STATUS_REQ to BACKUP */ LcpStatusReq* req = (LcpStatusReq*) signal->getDataPtr(); req->senderRef = reference(); - req->reqData = 0; + req->senderData = 0; BlockReference backupRef = calcInstanceBlockRef(BACKUP); sendSignal(backupRef, GSN_LCP_STATUS_REQ, signal, @@ -23996,7 +23996,7 @@ Dblqh::invokeLcpFragWatchdogThread(Signa LcpStatusReq* req = (LcpStatusReq*)signal->getDataPtr(); req->senderRef = cownref; - req->reqData = 1; + req->senderData = 1; BlockReference backupRef = calcInstanceBlockRef(BACKUP); sendSignal(backupRef, GSN_LCP_STATUS_REQ, signal, LcpStatusReq::SignalLength, JBB); @@ -24008,7 +24008,7 @@ Dblqh::execLCP_STATUS_CONF(Signal* signa jamEntry(); LcpStatusConf* conf = (LcpStatusConf*) signal->getDataPtr(); - if (conf->reqData == 0) + if (conf->senderData == 0) { /* DUMP STATE variant */ ndbout_c("Received LCP_STATUS_CONF from %x", conf->senderRef); @@ -24016,8 +24016,8 @@ Dblqh::execLCP_STATUS_CONF(Signal* signa conf->lcpState, conf->tableId, conf->fragId); - ndbout_c(" Replica done rows %llu", - (((Uint64)conf->replicaDoneRowsHi) << 32) + conf->replicaDoneRowsLo); + ndbout_c(" Completion State %llu", + (((Uint64)conf->completionStateHi) << 32) + conf->completionStateLo); ndbout_c(" Lcp done rows %llu, done bytes %llu", (((Uint64)conf->lcpDoneRowsHi) << 32) + conf->lcpDoneRowsLo, (((Uint64)conf->lcpDoneBytesHi) << 32) + conf->lcpDoneBytesLo); @@ -24026,10 +24026,11 @@ Dblqh::execLCP_STATUS_CONF(Signal* signa /* We can ignore the LCP status as if it's complete then we should * promptly stop watching */ - c_lcpFragWatchdog.handleLcpStatusRep(conf->tableId, + c_lcpFragWatchdog.handleLcpStatusRep((LcpStatusConf::LcpState)conf->lcpState, + conf->tableId, conf->fragId, - (((Uint64)conf->replicaDoneRowsHi) << 32) + - conf->replicaDoneRowsLo); + (((Uint64)conf->completionStateHi) << 32) + + conf->completionStateLo); } void @@ -24038,12 +24039,53 @@ Dblqh::execLCP_STATUS_REF(Signal* signal jamEntry(); LcpStatusRef* ref = (LcpStatusRef*) signal->getDataPtr(); - ndbout_c("Received LCP_STATUS_REF from %x, reqData = %u with error code %u", - ref->senderRef, ref->reqData, ref->error); + ndbout_c("Received LCP_STATUS_REF from %x, senderData = %u with error code %u", + ref->senderRef, ref->senderData, ref->error); ndbrequire(false); } +void +Dblqh::LCPFragWatchdog::reset() +{ + jamBlock(block); + scan_running = false; + lcpState = LcpStatusConf::LCP_IDLE; + tableId = ~Uint32(0); + fragId = ~Uint32(0); + completionStatus = ~Uint64(0); + pollCount = 0; +} + +void +Dblqh::LCPFragWatchdog::handleLcpStatusRep(LcpStatusConf::LcpState repLcpState, + Uint32 repTableId, + Uint32 repFragId, + Uint64 repCompletionStatus) +{ + jamBlock(block); + if (scan_running) + { + jamBlock(block); + if ((repCompletionStatus != completionStatus) || + (repFragId != fragId) || + (repTableId != tableId) || + (repLcpState != lcpState)) + { + jamBlock(block); + /* Something moved since last time, reset + * poll counter and data. + */ + pollCount = 0; + lcpState = repLcpState; + tableId = repTableId; + fragId = repFragId; + completionStatus = repCompletionStatus; + } + } +} + + /** * checkLcpFragWatchdog * @@ -24071,20 +24113,27 @@ Dblqh::checkLcpFragWatchdog(Signal* sign LCPFragWatchdog::WarnPeriodsWithNoProgress) { jam(); + const char* completionStatusString = + (c_lcpFragWatchdog.lcpState == LcpStatusConf::LCP_SCANNING? + "rows completed": + "bytes remaining."); + warningEvent("LCP Frag watchdog : No progress on table %u, frag %u for %u s." - " %llu rows completed", + " %llu %s", c_lcpFragWatchdog.tableId, c_lcpFragWatchdog.fragId, (LCPFragWatchdog::PollingPeriodMillis * c_lcpFragWatchdog.pollCount) / 1000, - c_lcpFragWatchdog.rowCount); + c_lcpFragWatchdog.completionStatus, + completionStatusString); ndbout_c("LCP Frag watchdog : No progress on table %u, frag %u for %u s." - " %llu rows completed", + " %llu %s", c_lcpFragWatchdog.tableId, c_lcpFragWatchdog.fragId, (LCPFragWatchdog::PollingPeriodMillis * c_lcpFragWatchdog.pollCount) / 1000, - c_lcpFragWatchdog.rowCount); + c_lcpFragWatchdog.completionStatus, + completionStatusString); if (c_lcpFragWatchdog.pollCount >= LCPFragWatchdog::MaxPeriodsWithNoProgress) No bundle (reason: useless for push emails).