4651 Frazer Clement 2012-11-07 [merge]
Merge 7.0->7.1
modified:
storage/ndb/include/kernel/signaldata/LCP.hpp
storage/ndb/src/common/debugger/signaldata/LCP.cpp
storage/ndb/src/kernel/blocks/backup/Backup.cpp
storage/ndb/src/kernel/blocks/dblqh/Dblqh.hpp
storage/ndb/src/kernel/blocks/dblqh/DblqhInit.cpp
storage/ndb/src/kernel/blocks/dblqh/DblqhMain.cpp
4650 magnus.blaudd@stripped 2012-11-06 [merge]
Merge 7.0 -> 7.1
=== modified file 'storage/ndb/include/kernel/signaldata/LCP.hpp'
--- a/storage/ndb/include/kernel/signaldata/LCP.hpp 2012-05-21 22:27:28 +0000
+++ b/storage/ndb/include/kernel/signaldata/LCP.hpp 2012-11-07 17:29:01 +0000
@@ -246,7 +246,7 @@ public:
private:
Uint32 senderRef;
- Uint32 reqData;
+ Uint32 senderData;
};
struct LcpStatusConf
@@ -278,7 +278,7 @@ public:
};
private:
Uint32 senderRef;
- Uint32 reqData;
+ Uint32 senderData;
/* Backup stuff */
Uint32 lcpState;
/* In lcpState == LCP_IDLE, refers to prev LCP
@@ -289,11 +289,18 @@ private:
Uint32 lcpDoneBytesHi;
Uint32 lcpDoneBytesLo;
- /* Backup stuff valid iff lcpState == LCP_SCANNING */
Uint32 tableId;
Uint32 fragId;
- Uint32 replicaDoneRowsHi;
- Uint32 replicaDoneRowsLo;
+ /* Backup stuff valid iff lcpState == LCP_SCANNING or
+ * LCP_SCANNED
+ * For LCP_SCANNING contains row count of rows scanned
+ * (Increases as scan proceeds)
+ * For LCP_SCANNED contains bytes remaining to be flushed
+ * to file.
+ * (Decreases as buffer drains to file)
+ */
+ Uint32 completionStateHi;
+ Uint32 completionStateLo;
};
struct LcpStatusRef
@@ -325,7 +332,7 @@ public:
private:
Uint32 senderRef;
- Uint32 reqData;
+ Uint32 senderData;
Uint32 error;
};
=== modified file 'storage/ndb/src/common/debugger/signaldata/LCP.cpp'
--- a/storage/ndb/src/common/debugger/signaldata/LCP.cpp 2012-05-21 22:27:28 +0000
+++ b/storage/ndb/src/common/debugger/signaldata/LCP.cpp 2012-11-07 17:29:01 +0000
@@ -95,8 +95,8 @@ printLCP_STATUS_REQ(FILE * output, const
Uint32 len, Uint16 receiverBlockNo){
const LcpStatusReq* const sig = (LcpStatusReq*) theData;
- fprintf(output, " SenderRef : %x ReqData : %u\n",
- sig->senderRef, sig->reqData);
+ fprintf(output, " SenderRef : %x SenderData : %u\n",
+ sig->senderRef, sig->senderData);
return true;
}
@@ -105,10 +105,10 @@ printLCP_STATUS_CONF(FILE * output, cons
Uint32 len, Uint16 receiverBlockNo){
const LcpStatusConf* const sig = (LcpStatusConf*) theData;
- fprintf(output, " SenderRef : %x ReqData : %u LcpState : %u tableId : %u fragId : %u\n",
- sig->senderRef, sig->reqData, sig->lcpState, sig->tableId, sig->fragId);
- fprintf(output, " replica(DoneRows : %llu), lcpDone (Rows : %llu, Bytes : %llu)\n",
- (((Uint64)sig->replicaDoneRowsHi) << 32) + sig->replicaDoneRowsLo,
+ fprintf(output, " SenderRef : %x SenderData : %u LcpState : %u tableId : %u fragId : %u\n",
+ sig->senderRef, sig->senderData, sig->lcpState, sig->tableId, sig->fragId);
+ fprintf(output, " replica(Progress : %llu), lcpDone (Rows : %llu, Bytes : %llu)\n",
+ (((Uint64)sig->completionStateHi) << 32) + sig->completionStateLo,
(((Uint64)sig->lcpDoneRowsHi) << 32) + sig->lcpDoneRowsLo,
(((Uint64)sig->lcpDoneBytesHi) << 32) + sig->lcpDoneBytesLo);
return true;
@@ -119,7 +119,7 @@ printLCP_STATUS_REF(FILE * output, const
Uint32 len, Uint16 receiverBlockNo){
const LcpStatusRef* const sig = (LcpStatusRef*) theData;
- fprintf(output, " SenderRef : %x, ReqData : %u Error : %u\n",
- sig->senderRef, sig->reqData, sig->error);
+ fprintf(output, " SenderRef : %x, SenderData : %u Error : %u\n",
+ sig->senderRef, sig->senderData, sig->error);
return true;
}
=== modified file 'storage/ndb/src/kernel/blocks/backup/Backup.cpp'
--- a/storage/ndb/src/kernel/blocks/backup/Backup.cpp 2012-09-13 20:31:34 +0000
+++ b/storage/ndb/src/kernel/blocks/backup/Backup.cpp 2012-11-07 17:29:01 +0000
@@ -4792,6 +4792,13 @@ Backup::ready_to_write(bool ready, Uint3
ndbout << endl << "Current Millisecond is = ";
ndbout << NdbTick_CurrentMillisecond() << endl;
#endif
+
+ if (ERROR_INSERTED(10043) && eof)
+ {
+ /* Block indefinitely without closing the file */
+ return false;
+ }
+
if ((ready || eof) &&
m_words_written_this_period <= m_curr_disk_write_speed)
{
@@ -5966,7 +5973,7 @@ Backup::execLCP_STATUS_REQ(Signal* signa
const LcpStatusReq* req = (const LcpStatusReq*) signal->getDataPtr();
const Uint32 senderRef = req->senderRef;
- const Uint32 reqData = req->reqData;
+ const Uint32 senderData = req->senderData;
Uint32 failCode = LcpStatusRef::NoLCPRecord;
/* Find LCP backup, if there is one */
@@ -5985,18 +5992,23 @@ Backup::execLCP_STATUS_REQ(Signal* signa
switch (ptr.p->slaveState.getState())
{
case STARTED:
+ jam();
state = LcpStatusConf::LCP_PREPARED;
break;
case SCANNING:
+ jam();
state = LcpStatusConf::LCP_SCANNING;
break;
case STOPPING:
+ jam();
state = LcpStatusConf::LCP_SCANNED;
break;
case DEFINED:
+ jam();
state = LcpStatusConf::LCP_IDLE;
break;
default:
+ jam();
ndbout_c("Unusual LCP state in LCP_STATUS_REQ() : %u",
ptr.p->slaveState.getState());
state = LcpStatusConf::LCP_IDLE;
@@ -6007,12 +6019,12 @@ Backup::execLCP_STATUS_REQ(Signal* signa
LcpStatusConf* conf = (LcpStatusConf*) signal->getDataPtr();
conf->senderRef = reference();
- conf->reqData = reqData;
+ conf->senderData = senderData;
conf->lcpState = state;
conf->tableId = UnsetConst;
conf->fragId = UnsetConst;
- conf->replicaDoneRowsHi = UnsetConst;
- conf->replicaDoneRowsLo = UnsetConst;
+ conf->completionStateHi = UnsetConst;
+ conf->completionStateLo = UnsetConst;
setWords(ptr.p->noOfRecords,
conf->lcpDoneRowsHi,
conf->lcpDoneRowsLo);
@@ -6020,9 +6032,11 @@ Backup::execLCP_STATUS_REQ(Signal* signa
conf->lcpDoneBytesHi,
conf->lcpDoneBytesLo);
- if (state == LcpStatusConf::LCP_SCANNING)
+ if (state == LcpStatusConf::LCP_SCANNING ||
+ state == LcpStatusConf::LCP_SCANNED)
{
- /* Actually scanning a fragment, let's grab the details */
+ jam();
+ /* Actually scanning/closing a fragment, let's grab the details */
TablePtr tabPtr;
FragmentPtr fragPtr;
BackupFilePtr filePtr;
@@ -6046,9 +6060,30 @@ Backup::execLCP_STATUS_REQ(Signal* signa
ndbrequire(filePtr.p->backupPtr == ptr.i);
conf->tableId = tabPtr.p->tableId;
conf->fragId = fragPtr.p->fragmentId;
- setWords(filePtr.p->operation.noOfRecords,
- conf->replicaDoneRowsHi,
- conf->replicaDoneRowsLo);
+
+ if (state == LcpStatusConf::LCP_SCANNING)
+ {
+ jam();
+ setWords(filePtr.p->operation.noOfRecords,
+ conf->completionStateHi,
+ conf->completionStateLo);
+ }
+ else if (state == LcpStatusConf::LCP_SCANNED)
+ {
+ jam();
+ /* May take some time to drain the FS buffer, depending on
+ * size of buff, achieved rate.
+ * We provide the buffer fill level so that requestors
+ * can observe whether there's progress in this phase.
+ */
+ Uint64 flushBacklog =
+ filePtr.p->operation.dataBuffer.getUsableSize() -
+ filePtr.p->operation.dataBuffer.getFreeSize();
+
+ setWords(flushBacklog,
+ conf->completionStateHi,
+ conf->completionStateLo);
+ }
}
failCode = 0;
@@ -6067,7 +6102,7 @@ Backup::execLCP_STATUS_REQ(Signal* signa
LcpStatusRef* ref = (LcpStatusRef*) signal->getDataPtr();
ref->senderRef = reference();
- ref->reqData = reqData;
+ ref->senderData = senderData;
ref->error = failCode;
sendSignal(senderRef, GSN_LCP_STATUS_REF,
=== modified file 'storage/ndb/src/kernel/blocks/dblqh/Dblqh.hpp'
--- a/storage/ndb/src/kernel/blocks/dblqh/Dblqh.hpp 2012-10-17 13:08:34 +0000
+++ b/storage/ndb/src/kernel/blocks/dblqh/Dblqh.hpp 2012-11-07 17:29:01 +0000
@@ -1095,6 +1095,8 @@ public:
STATIC_CONST( PollingPeriodMillis = 10000 ); /* 10s */
STATIC_CONST( WarnPeriodsWithNoProgress = 2); /* 20s */
STATIC_CONST( MaxPeriodsWithNoProgress = 6 ); /* 60s */
+
+ SimulatedBlock* block;
/* Should the watchdog be running? */
bool scan_running;
@@ -1102,45 +1104,23 @@ public:
/* Is there an active thread? */
bool thread_active;
- /* LCP position info from Backup block */
+ /* LCP position and state info from Backup block */
+ LcpStatusConf::LcpState lcpState;
Uint32 tableId;
Uint32 fragId;
- Uint64 rowCount;
+ Uint64 completionStatus;
/* Number of periods with no LCP progress observed */
Uint32 pollCount;
/* Reinitialise the watchdog */
- void reset()
- {
- scan_running = false;
- tableId = ~Uint32(0);
- fragId = ~Uint32(0);
- rowCount = ~Uint64(0);
- pollCount = 0;
- }
+ void reset();
/* Handle an LCP Status report */
- void handleLcpStatusRep(Uint32 repTableId,
+ void handleLcpStatusRep(LcpStatusConf::LcpState repLcpState,
+ Uint32 repTableId,
Uint32 repFragId,
- Uint64 repRowCount)
- {
- if (scan_running)
- {
- if ((repRowCount != rowCount) ||
- (repFragId != fragId) ||
- (repTableId != tableId))
- {
- /* Something moved since last time, reset
- * poll counter and data.
- */
- pollCount = 0;
- tableId = repTableId;
- fragId = repFragId;
- rowCount = repRowCount;
- }
- }
- }
+ Uint64 repCompletionStatus);
};
LCPFragWatchdog c_lcpFragWatchdog;
=== modified file 'storage/ndb/src/kernel/blocks/dblqh/DblqhInit.cpp'
--- a/storage/ndb/src/kernel/blocks/dblqh/DblqhInit.cpp 2012-08-13 13:03:29 +0000
+++ b/storage/ndb/src/kernel/blocks/dblqh/DblqhInit.cpp 2012-11-07 17:29:01 +0000
@@ -96,6 +96,7 @@ void Dblqh::initData()
c_max_parallel_scans_per_frag = 32;
+ c_lcpFragWatchdog.block = this;
c_lcpFragWatchdog.reset();
c_lcpFragWatchdog.thread_active = false;
}//Dblqh::initData()
=== modified file 'storage/ndb/src/kernel/blocks/dblqh/DblqhMain.cpp'
--- a/storage/ndb/src/kernel/blocks/dblqh/DblqhMain.cpp 2012-10-17 13:58:37 +0000
+++ b/storage/ndb/src/kernel/blocks/dblqh/DblqhMain.cpp 2012-11-07 17:49:59 +0000
@@ -23641,7 +23641,7 @@ Dblqh::execDUMP_STATE_ORD(Signal* signal
/* Send LCP_STATUS_REQ to BACKUP */
LcpStatusReq* req = (LcpStatusReq*) signal->getDataPtr();
req->senderRef = reference();
- req->reqData = 0;
+ req->senderData = 0;
BlockReference backupRef = calcInstanceBlockRef(BACKUP);
sendSignal(backupRef, GSN_LCP_STATUS_REQ, signal,
@@ -23988,7 +23988,7 @@ Dblqh::invokeLcpFragWatchdogThread(Signa
LcpStatusReq* req = (LcpStatusReq*)signal->getDataPtr();
req->senderRef = cownref;
- req->reqData = 1;
+ req->senderData = 1;
BlockReference backupRef = calcInstanceBlockRef(BACKUP);
sendSignal(backupRef, GSN_LCP_STATUS_REQ, signal,
LcpStatusReq::SignalLength, JBB);
@@ -24000,7 +24000,7 @@ Dblqh::execLCP_STATUS_CONF(Signal* signa
jamEntry();
LcpStatusConf* conf = (LcpStatusConf*) signal->getDataPtr();
- if (conf->reqData == 0)
+ if (conf->senderData == 0)
{
/* DUMP STATE variant */
ndbout_c("Received LCP_STATUS_CONF from %x", conf->senderRef);
@@ -24008,8 +24008,8 @@ Dblqh::execLCP_STATUS_CONF(Signal* signa
conf->lcpState,
conf->tableId,
conf->fragId);
- ndbout_c(" Replica done rows %llu",
- (((Uint64)conf->replicaDoneRowsHi) << 32) + conf->replicaDoneRowsLo);
+ ndbout_c(" Completion State %llu",
+ (((Uint64)conf->completionStateHi) << 32) + conf->completionStateLo);
ndbout_c(" Lcp done rows %llu, done bytes %llu",
(((Uint64)conf->lcpDoneRowsHi) << 32) + conf->lcpDoneRowsLo,
(((Uint64)conf->lcpDoneBytesHi) << 32) + conf->lcpDoneBytesLo);
@@ -24018,10 +24018,11 @@ Dblqh::execLCP_STATUS_CONF(Signal* signa
/* We can ignore the LCP status as if it's complete then we should
* promptly stop watching
*/
- c_lcpFragWatchdog.handleLcpStatusRep(conf->tableId,
+ c_lcpFragWatchdog.handleLcpStatusRep((LcpStatusConf::LcpState)conf->lcpState,
+ conf->tableId,
conf->fragId,
- (((Uint64)conf->replicaDoneRowsHi) << 32) +
- conf->replicaDoneRowsLo);
+ (((Uint64)conf->completionStateHi) << 32) +
+ conf->completionStateLo);
}
void
@@ -24030,12 +24031,53 @@ Dblqh::execLCP_STATUS_REF(Signal* signal
jamEntry();
LcpStatusRef* ref = (LcpStatusRef*) signal->getDataPtr();
- ndbout_c("Received LCP_STATUS_REF from %x, reqData = %u with error code %u",
- ref->senderRef, ref->reqData, ref->error);
+ ndbout_c("Received LCP_STATUS_REF from %x, senderData = %u with error code %u",
+ ref->senderRef, ref->senderData, ref->error);
ndbrequire(false);
}
+void
+Dblqh::LCPFragWatchdog::reset()
+{
+ jamBlock(block);
+ scan_running = false;
+ lcpState = LcpStatusConf::LCP_IDLE;
+ tableId = ~Uint32(0);
+ fragId = ~Uint32(0);
+ completionStatus = ~Uint64(0);
+ pollCount = 0;
+}
+
+void
+Dblqh::LCPFragWatchdog::handleLcpStatusRep(LcpStatusConf::LcpState repLcpState,
+ Uint32 repTableId,
+ Uint32 repFragId,
+ Uint64 repCompletionStatus)
+{
+ jamBlock(block);
+ if (scan_running)
+ {
+ jamBlock(block);
+ if ((repCompletionStatus != completionStatus) ||
+ (repFragId != fragId) ||
+ (repTableId != tableId) ||
+ (repLcpState != lcpState))
+ {
+ jamBlock(block);
+ /* Something moved since last time, reset
+ * poll counter and data.
+ */
+ pollCount = 0;
+ lcpState = repLcpState;
+ tableId = repTableId;
+ fragId = repFragId;
+ completionStatus = repCompletionStatus;
+ }
+ }
+}
+
+
/**
* checkLcpFragWatchdog
*
@@ -24063,20 +24105,27 @@ Dblqh::checkLcpFragWatchdog(Signal* sign
LCPFragWatchdog::WarnPeriodsWithNoProgress)
{
jam();
+ const char* completionStatusString =
+ (c_lcpFragWatchdog.lcpState == LcpStatusConf::LCP_SCANNING?
+ "rows completed":
+ "bytes remaining.");
+
warningEvent("LCP Frag watchdog : No progress on table %u, frag %u for %u s."
- " %llu rows completed",
+ " %llu %s",
c_lcpFragWatchdog.tableId,
c_lcpFragWatchdog.fragId,
(LCPFragWatchdog::PollingPeriodMillis *
c_lcpFragWatchdog.pollCount) / 1000,
- c_lcpFragWatchdog.rowCount);
+ c_lcpFragWatchdog.completionStatus,
+ completionStatusString);
ndbout_c("LCP Frag watchdog : No progress on table %u, frag %u for %u s."
- " %llu rows completed",
+ " %llu %s",
c_lcpFragWatchdog.tableId,
c_lcpFragWatchdog.fragId,
(LCPFragWatchdog::PollingPeriodMillis *
c_lcpFragWatchdog.pollCount) / 1000,
- c_lcpFragWatchdog.rowCount);
+ c_lcpFragWatchdog.completionStatus,
+ completionStatusString);
if (c_lcpFragWatchdog.pollCount >=
LCPFragWatchdog::MaxPeriodsWithNoProgress)
No bundle (reason: useless for push emails).
| Thread |
|---|
| • bzr push into mysql-5.1-telco-7.1 branch (frazer.clement:4650 to 4651) | Frazer Clement | 12 Nov |