4011 Jonas Oreland 2010-12-02 [merge]
ndb - merge 70 to 71
modified:
storage/ndb/include/kernel/signaldata/LqhKey.hpp
storage/ndb/include/kernel/signaldata/TcKeyReq.hpp
storage/ndb/include/mgmapi/mgmapi_config_parameters.h
storage/ndb/include/ndbapi/NdbOperation.hpp
storage/ndb/src/common/debugger/signaldata/LqhKey.cpp
storage/ndb/src/common/debugger/signaldata/TcKeyReq.cpp
storage/ndb/src/kernel/blocks/dblqh/Dblqh.hpp
storage/ndb/src/kernel/blocks/dblqh/DblqhInit.cpp
storage/ndb/src/kernel/blocks/dblqh/DblqhMain.cpp
storage/ndb/src/kernel/blocks/dbtc/Dbtc.hpp
storage/ndb/src/kernel/blocks/dbtc/DbtcMain.cpp
storage/ndb/src/mgmsrv/ConfigInfo.cpp
storage/ndb/src/ndbapi/NdbOperation.cpp
storage/ndb/src/ndbapi/NdbOperationDefine.cpp
storage/ndb/src/ndbapi/NdbOperationExec.cpp
storage/ndb/src/ndbapi/NdbOperationInt.cpp
storage/ndb/src/ndbapi/NdbScanOperation.cpp
storage/ndb/src/ndbapi/ndb_cluster_connection.cpp
storage/ndb/src/ndbapi/ndb_cluster_connection_impl.hpp
storage/ndb/src/ndbapi/ndberror.c
4010 Jonas Oreland 2010-12-02 [merge]
ndb - merge 70 to 71
modified:
storage/ndb/src/kernel/blocks/dbtup/DbtupProxy.cpp
storage/ndb/src/mgmsrv/testConfig.cpp
=== modified file 'storage/ndb/include/kernel/signaldata/LqhKey.hpp'
--- a/storage/ndb/include/kernel/signaldata/LqhKey.hpp 2010-11-01 16:11:10 +0000
+++ b/storage/ndb/include/kernel/signaldata/LqhKey.hpp 2010-12-02 11:02:29 +0000
@@ -151,6 +151,9 @@ private:
static UintR getNrCopyFlag(const UintR & requestInfo);
static void setNrCopyFlag(UintR & requestInfo, UintR val);
+
+ static UintR getQueueOnRedoProblemFlag(const UintR & requestInfo);
+ static void setQueueOnRedoProblemFlag(UintR & requestInfo, UintR val);
};
/**
@@ -177,6 +180,7 @@ private:
* z = Use rowid for insert - 1 Bit (31)
* g = gci flag - 1 Bit (12)
* n = NR copy - 1 Bit (13)
+ * q = Queue on redo problem - 1 Bit (14)
* Short LQHKEYREQ :
* 1111111111222222222233
@@ -214,6 +218,7 @@ private:
#define RI_ROWID_SHIFT (31)
#define RI_GCI_SHIFT (12)
#define RI_NR_COPY_SHIFT (13)
+#define RI_QUEUE_REDO_SHIFT (14)
/**
* Scan Info
@@ -585,6 +590,20 @@ table_version_major_lqhkeyreq(Uint32 x)
return x & 0xFFFF;
}
+
+inline
+void
+LqhKeyReq::setQueueOnRedoProblemFlag(UintR & requestInfo, UintR val){
+ ASSERT_BOOL(val, "LqhKeyReq::setQueueOnRedoProblem");
+ requestInfo |= (val << RI_QUEUE_REDO_SHIFT);
+}
+
+inline
+UintR
+LqhKeyReq::getQueueOnRedoProblemFlag(const UintR & requestInfo){
+ return (requestInfo >> RI_QUEUE_REDO_SHIFT) & 1;
+}
+
class LqhKeyConf {
/**
* Reciver(s)
=== modified file 'storage/ndb/include/kernel/signaldata/TcKeyReq.hpp'
--- a/storage/ndb/include/kernel/signaldata/TcKeyReq.hpp 2010-11-10 12:28:34 +0000
+++ b/storage/ndb/include/kernel/signaldata/TcKeyReq.hpp 2010-12-02 11:02:29 +0000
@@ -202,6 +202,9 @@ private:
static void setReorgFlag(UintR & requestInfo, UintR val);
static UintR getReorgFlag(const UintR & requestInfo);
+
+ static void setQueueOnRedoProblemFlag(UintR & requestInfo, UintR val);
+ static UintR getQueueOnRedoProblemFlag(const UintR & requestInfo);
/**
* Set:ers for scanInfo
*/
@@ -230,6 +233,7 @@ private:
y = Commit Type - 2 Bit 12-13
n = No disk flag - 1 Bit 1
r = reorg flag - 1 Bit 19
+ q = Queue on redo problem - 1 Bit 9
1111111111222222222233
01234567890123456789012345678901
@@ -260,6 +264,7 @@ private:
#define COMMIT_TYPE_MASK (3)
#define TC_REORG_SHIFT (19)
+#define QUEUE_ON_REDO_SHIFT (9)
/**
* Scan Info
@@ -576,4 +581,17 @@ TcKeyReq::setReorgFlag(UintR & requestIn
requestInfo |= (flag << TC_REORG_SHIFT);
}
+inline
+UintR
+TcKeyReq::getQueueOnRedoProblemFlag(const UintR & requestInfo){
+ return (requestInfo >> QUEUE_ON_REDO_SHIFT) & 1;
+}
+
+inline
+void
+TcKeyReq::setQueueOnRedoProblemFlag(UintR & requestInfo, Uint32 flag){
+ ASSERT_BOOL(flag, "TcKeyReq::setNoDiskFlag");
+ requestInfo |= (flag << QUEUE_ON_REDO_SHIFT);
+}
+
#endif
=== modified file 'storage/ndb/include/mgmapi/mgmapi_config_parameters.h'
--- a/storage/ndb/include/mgmapi/mgmapi_config_parameters.h 2010-10-28 13:24:21 +0000
+++ b/storage/ndb/include/mgmapi/mgmapi_config_parameters.h 2010-12-02 11:02:29 +0000
@@ -173,7 +173,8 @@
#define CFG_DB_MAX_START_FAIL 609 /* For StopOnError=0 */
#define CFG_DB_START_FAIL_DELAY_SECS 610 /* For StopOnError=0 */
-/* 611 & 612 reserved */
+#define CFG_DB_REDO_OVERCOMMIT_LIMIT 611
+#define CFG_DB_REDO_OVERCOMMIT_COUNTER 612
#define CFG_DB_EVENTLOG_BUFFER_SIZE 613
#define CFG_DB_NUMA 614
@@ -248,6 +249,7 @@
#define CFG_BATCH_SIZE 802
#define CFG_AUTO_RECONNECT 803
#define CFG_HB_THREAD_PRIO 804
+#define CFG_DEFAULT_OPERATION_REDO_PROBLEM_ACTION 805
/**
* Internal
@@ -272,4 +274,7 @@
#define ARBIT_METHOD_DEFAULT 1
#define ARBIT_METHOD_WAITEXTERNAL 2
+#define OPERATION_REDO_PROBLEM_ACTION_ABORT 0
+#define OPERATION_REDO_PROBLEM_ACTION_QUEUE 1
+
#endif
=== modified file 'storage/ndb/include/ndbapi/NdbOperation.hpp'
--- a/storage/ndb/include/ndbapi/NdbOperation.hpp 2010-10-13 09:33:02 +0000
+++ b/storage/ndb/include/ndbapi/NdbOperation.hpp 2010-12-02 11:02:29 +0000
@@ -1039,7 +1039,10 @@ public:
OO_INTERPRETED = 0x10,
OO_ANYVALUE = 0x20,
OO_CUSTOMDATA = 0x40,
- OO_LOCKHANDLE = 0x80 };
+ OO_LOCKHANDLE = 0x80,
+ OO_QUEUABLE = 0x100,
+ OO_NOT_QUEUABLE = 0x200
+ };
/* An operation-specific abort option.
* Only necessary if the default abortoption behaviour
@@ -1427,12 +1430,20 @@ protected:
// Note that scan operations always have this
// set true
Int8 theDistrKeyIndicator_; // Indicates whether distr. key is used
- Uint8 m_no_disk_flag;
- /*
- For NdbRecord, this flag indicates that we need to send the Event-attached
- word set by setAnyValue().
- */
- Uint8 m_use_any_value;
+
+ enum OP_FLAGS {
+ OF_NO_DISK = 0x1,
+
+ /*
+ For NdbRecord, this flag indicates that we need to send the Event-attached
+ word set by setAnyValue().
+ */
+ OF_USE_ANY_VALUE = 0x2,
+ OF_QUEUEABLE = 0x4
+ };
+ Uint8 m_flags;
+
+ Uint8 _unused1;
Uint16 m_tcReqGSN;
Uint16 m_keyInfoGSN;
=== modified file 'storage/ndb/src/common/debugger/signaldata/LqhKey.cpp'
--- a/storage/ndb/src/common/debugger/signaldata/LqhKey.cpp 2009-05-27 15:21:45 +0000
+++ b/storage/ndb/src/common/debugger/signaldata/LqhKey.cpp 2010-12-02 11:02:29 +0000
@@ -59,6 +59,8 @@ printLQHKEYREQ(FILE * output, const Uint
fprintf(output, "NrCopy ");
if(LqhKeyReq::getGCIFlag(reqInfo))
fprintf(output, "GCI ");
+ if(LqhKeyReq::getQueueOnRedoProblemFlag(reqInfo))
+ fprintf(output, "Queue ");
fprintf(output, "ScanInfo/noFiredTriggers: H\'%x\n", sig->scanInfo);
=== modified file 'storage/ndb/src/common/debugger/signaldata/TcKeyReq.cpp'
--- a/storage/ndb/src/common/debugger/signaldata/TcKeyReq.cpp 2010-01-28 15:16:46 +0000
+++ b/storage/ndb/src/common/debugger/signaldata/TcKeyReq.cpp 2010-12-02 11:02:29 +0000
@@ -72,8 +72,11 @@ printTCKEYREQ(FILE * output, const Uint3
fprintf(output, "Interpreted ");
}
if(sig->getDistributionKeyFlag(sig->requestInfo)){
- fprintf(output, " d-key");
+ fprintf(output, "d-key ");
}
+ if(sig->getQueueOnRedoProblemFlag(sig->requestInfo))
+ fprintf(output, "Queue ");
+
fprintf(output, "\n");
}
=== modified file 'storage/ndb/src/kernel/blocks/dblqh/Dblqh.hpp'
--- a/storage/ndb/src/kernel/blocks/dblqh/Dblqh.hpp 2010-10-28 12:59:31 +0000
+++ b/storage/ndb/src/kernel/blocks/dblqh/Dblqh.hpp 2010-12-02 11:02:29 +0000
@@ -316,6 +316,7 @@ class Lgman;
#define ZDROP_TABLE_IN_PROGRESS 1226
#define ZINVALID_SCHEMA_VERSION 1227
#define ZTABLE_READ_ONLY 1233
+#define ZREDO_IO_PROBLEM 1234
/* ------------------------------------------------------------------------- */
/* ERROR CODES ADDED IN VERSION 2.X */
@@ -1022,6 +1023,54 @@ public:
Uint32 m_outstanding;
}; // Size 76 bytes
typedef Ptr<LcpRecord> LcpRecordPtr;
+
+ struct IOTracker
+ {
+ STATIC_CONST( SAMPLE_TIME = 128 ); // millis
+ STATIC_CONST( SLIDING_WINDOW_LEN = 1024 ); // millis
+ STATIC_CONST( SLIDING_WINDOW_HISTORY_LEN = 8 );
+
+ void init(Uint32 partNo);
+ Uint32 m_log_part_no;
+ Uint32 m_current_time;
+
+ /**
+ * Keep sliding window of measurement
+ */
+ Uint32 m_save_pos; // current pos in array
+ Uint32 m_save_written_bytes[SLIDING_WINDOW_HISTORY_LEN];
+ Uint32 m_save_elapsed_millis[SLIDING_WINDOW_HISTORY_LEN];
+
+ /**
+ * Current sum of sliding window
+ */
+ Uint32 m_curr_written_bytes;
+ Uint32 m_curr_elapsed_millis;
+
+ /**
+ * Currently outstanding bytes
+ */
+ Uint32 m_sum_outstanding_bytes;
+
+ /**
+ * How many times did we pass lag-threshold
+ */
+ Uint32 m_lag_cnt;
+
+ /**
+ * bytes send during current sample
+ */
+ Uint32 m_sample_sent_bytes;
+
+ /**
+ * bytes completed during current sample
+ */
+ Uint32 m_sample_completed_bytes;
+
+ int tick(Uint32 now, Uint32 maxlag, Uint32 maxlag_cnt);
+ void send_io(Uint32 bytes);
+ void complete_io(Uint32 bytes);
+ };
/* $$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$ */
/* $$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$ */
@@ -1062,11 +1111,7 @@ public:
SR_THIRD_PHASE_COMPLETED = 5,
SR_FOURTH_PHASE_STARTED = 6, ///< Finding the log tail and head
///< is the fourth phase.
- SR_FOURTH_PHASE_COMPLETED = 7,
- FILE_CHANGE_PROBLEM = 8 ///< For some reason the write to
- ///< page zero in file zero have not
- ///< finished after 15 mbyte of
- ///< log data have been written
+ SR_FOURTH_PHASE_COMPLETED = 7
};
enum WaitWriteGciLog {
WWGL_TRUE = 0,
@@ -1128,10 +1173,6 @@ public:
* Contains a reference to the first log file, file number 0.
*/
UintR firstLogfile;
- /**
- * The head of the operations queued for logging.
- */
- UintR firstLogQueue;
/**
* This variable contains the oldest operation in this log
* part which have not been committed yet.
@@ -1154,10 +1195,25 @@ public:
* are in memory and which are not.
*/
UintR lastPageRef;
+
+ struct OperationQueue
+ {
+ void init() { firstElement = lastElement = RNIL;}
+ bool isEmpty() const { return firstElement == RNIL; }
+ Uint32 firstElement;
+ Uint32 lastElement;
+ };
+
/**
- * The tail of the operations queued for logging.
+ * operations queued waiting on REDO to prepare
*/
- UintR lastLogQueue;
+ struct OperationQueue m_log_prepare_queue;
+
+ /**
+ * operations queued waiting on REDO to commit/abort
+ */
+ struct OperationQueue m_log_complete_queue;
+
/**
* This variable contains the newest operation in this log
* part which have not been committed yet.
@@ -1202,7 +1258,12 @@ public:
/**
* does current log-part have tail-problem (i.e 410)
*/
- bool m_tail_problem;
+ enum {
+ P_TAIL_PROBLEM = 0x1,// 410
+ P_REDO_IO_PROBLEM = 0x2,// 1234
+ P_FILE_CHANGE_PROBLEM = 0x4 // 1220
+ };
+ Uint32 m_log_problems;
/**
* A timer that is set every time a log page is sent to disk.
@@ -1354,6 +1415,11 @@ public:
* For MT LQH the log part (0-3).
*/
Uint16 logPartNo;
+
+ /**
+ * IO tracker...
+ */
+ struct IOTracker m_io_tracker;
}; // Size 164 Bytes
typedef Ptr<LogPartRecord> LogPartRecordPtr;
@@ -2340,7 +2406,7 @@ private:
LogFileRecordPtr* parLogFilePtr);
void findPageRef(Signal* signal, CommitLogRecord* commitLogRecord);
int findTransaction(UintR Transid1, UintR Transid2, UintR TcOprec);
- void getFirstInLogQueue(Signal* signal);
+ void getFirstInLogQueue(Signal* signal, Ptr<TcConnectionrec>&dst);
bool getFragmentrec(Signal* signal, Uint32 fragId);
void initialiseAddfragrec(Signal* signal);
void initialiseFragrec(Signal* signal);
@@ -2372,7 +2438,7 @@ private:
void initReqinfoExecSr(Signal* signal);
bool insertFragrec(Signal* signal, Uint32 fragId);
void linkFragQueue(Signal* signal);
- void linkWaitLog(Signal* signal, LogPartRecordPtr regLogPartPtr);
+ void linkWaitLog(Signal*, LogPartRecordPtr, LogPartRecord::OperationQueue &);
void logNextStart(Signal* signal);
void moveToPageRef(Signal* signal);
void readAttrinfo(Signal* signal);
@@ -2464,6 +2530,8 @@ private:
void abort_scan(Signal* signal, Uint32 scan_ptr_i, Uint32 errcode);
void localAbortStateHandlerLab(Signal* signal);
void logLqhkeyreqLab(Signal* signal);
+ void logLqhkeyreqLab_problems(Signal* signal);
+ void update_log_problem(Signal*, LogPartRecordPtr, Uint32 problem, bool);
void lqhAttrinfoLab(Signal* signal, Uint32* dataPtr, Uint32 length);
void rwConcludedAiLab(Signal* signal);
void aiStateErrorCheckLab(Signal* signal, Uint32* dataPtr, Uint32 length);
@@ -3163,6 +3231,8 @@ public:
} c_Counters;
+ Uint32 c_max_redo_lag;
+ Uint32 c_max_redo_lag_counter;
Uint64 cTotalLqhKeyReqCount;
inline bool getAllowRead() const {
=== modified file 'storage/ndb/src/kernel/blocks/dblqh/DblqhInit.cpp'
--- a/storage/ndb/src/kernel/blocks/dblqh/DblqhInit.cpp 2010-03-10 07:43:06 +0000
+++ b/storage/ndb/src/kernel/blocks/dblqh/DblqhInit.cpp 2010-12-02 11:02:29 +0000
@@ -87,6 +87,8 @@ void Dblqh::initData()
c_free_mb_tail_problem_limit = 4;
cTotalLqhKeyReqCount = 0;
+ c_max_redo_lag = 30; // seconds
+ c_max_redo_lag_counter = 3; // 3 strikes and you're out
}//Dblqh::initData()
void Dblqh::initRecords()
=== modified file 'storage/ndb/src/kernel/blocks/dblqh/DblqhMain.cpp'
--- a/storage/ndb/src/kernel/blocks/dblqh/DblqhMain.cpp 2010-11-24 12:16:55 +0000
+++ b/storage/ndb/src/kernel/blocks/dblqh/DblqhMain.cpp 2010-12-02 11:02:29 +0000
@@ -79,6 +79,9 @@
#include "../suma/Suma.hpp"
#include "DblqhCommon.hpp"
+#include <EventLogger.hpp>
+extern EventLogger * g_eventLogger;
+
// Use DEBUG to print messages that should be
// seen only when we debug the product
#ifdef VM_TRACE
@@ -286,94 +289,126 @@ void Dblqh::execCONTINUEB(Signal* signal
return;
}//if
#endif
+ LogPartRecordPtr save;
switch (tcase) {
case ZLOG_LQHKEYREQ:
if (cnoOfLogPages == 0) {
jam();
+ busywait:
sendSignalWithDelay(cownref, GSN_CONTINUEB, signal, 10, 2);
return;
}//if
logPartPtr.i = data0;
ptrCheckGuard(logPartPtr, clogPartFileSize, logPartRecord);
+ save = logPartPtr;
+
+ logPartPtr.p->LogLqhKeyReqSent = ZFALSE;
+
+ if (logPartPtr.p->waitWriteGciLog == LogPartRecord::WWGL_TRUE)
+ {
+ jam();
+ goto startnext;
+ }
+ if (logPartPtr.p->m_log_complete_queue.isEmpty())
+ {
+ jam();
+ /**
+ * prepare is first in queue...check that it's ok to rock'n'roll
+ */
+ if (logPartPtr.p->m_log_problems != 0)
+ {
+ /**
+ * It will be restarted when problems are cleared...
+ */
+ jam();
+ return;
+ }
+
+ if (cnoOfLogPages < ZMIN_LOG_PAGES_OPERATION)
+ {
+ jam();
+ logPartPtr.p->LogLqhKeyReqSent = ZTRUE;
+ goto busywait;
+ }
+ }
+
logFilePtr.i = logPartPtr.p->currentLogfile;
ptrCheckGuard(logFilePtr, clogFileFileSize, logFileRecord);
logPagePtr.i = logFilePtr.p->currentLogpage;
ptrCheckGuard(logPagePtr, clogPageFileSize, logPageRecord);
- tcConnectptr.i = logPartPtr.p->firstLogQueue;
- ptrCheckGuard(tcConnectptr, ctcConnectrecFileSize, tcConnectionrec);
+ getFirstInLogQueue(signal, tcConnectptr);
fragptr.i = tcConnectptr.p->fragmentptr;
c_fragment_pool.getPtr(fragptr);
- logPartPtr.p->LogLqhKeyReqSent = ZFALSE;
- getFirstInLogQueue(signal);
+ // so that operation can continue...
+ ndbrequire(logPartPtr.p->logPartState == LogPartRecord::ACTIVE);
+ logPartPtr.p->logPartState = LogPartRecord::IDLE;
switch (tcConnectptr.p->transactionState) {
case TcConnectionrec::LOG_QUEUED:
- if (tcConnectptr.p->abortState != TcConnectionrec::ABORT_IDLE) {
+ if (tcConnectptr.p->abortState != TcConnectionrec::ABORT_IDLE)
+ {
jam();
- logNextStart(signal);
abortCommonLab(signal);
- return;
- } else {
+ }
+ else
+ {
jam();
-/*------------------------------------------------------------*/
-/* WE MUST SET THE STATE OF THE LOG PART TO IDLE TO */
-/* ENSURE THAT WE ARE NOT QUEUED AGAIN ON THE LOG PART */
-/* WE WILL SET THE LOG PART STATE TO ACTIVE IMMEDIATELY */
-/* SO NO OTHER PROCESS WILL SEE THIS STATE. IT IS MERELY*/
-/* USED TO ENABLE REUSE OF CODE. */
-/*------------------------------------------------------------*/
- if (logPartPtr.p->logPartState == LogPartRecord::ACTIVE) {
- jam();
- logPartPtr.p->logPartState = LogPartRecord::IDLE;
- }//if
logLqhkeyreqLab(signal);
- return;
- }//if
+ }
break;
case TcConnectionrec::LOG_ABORT_QUEUED:
jam();
writeAbortLog(signal);
removeLogTcrec(signal);
- logNextStart(signal);
continueAfterLogAbortWriteLab(signal);
- return;
break;
case TcConnectionrec::LOG_COMMIT_QUEUED:
case TcConnectionrec::LOG_COMMIT_QUEUED_WAIT_SIGNAL:
jam();
writeCommitLog(signal, logPartPtr);
- logNextStart(signal);
if (tcConnectptr.p->transactionState == TcConnectionrec::LOG_COMMIT_QUEUED) {
if (tcConnectptr.p->seqNoReplica == 0 ||
- tcConnectptr.p->activeCreat == Fragrecord::AC_NR_COPY) {
+ tcConnectptr.p->activeCreat == Fragrecord::AC_NR_COPY)
+ {
jam();
localCommitLab(signal);
- } else {
+ }
+ else
+ {
jam();
commitReplyLab(signal);
- }//if
- return;
- } else {
+ }
+ }
+ else
+ {
jam();
tcConnectptr.p->transactionState = TcConnectionrec::LOG_COMMIT_WRITTEN_WAIT_SIGNAL;
- return;
- }//if
+ }
break;
case TcConnectionrec::COMMIT_QUEUED:
jam();
- logNextStart(signal);
localCommitLab(signal);
break;
case TcConnectionrec::ABORT_QUEUED:
jam();
- logNextStart(signal);
abortCommonLab(signal);
break;
default:
ndbrequire(false);
break;
}//switch
+
+ /**
+ * LogFile/LogPage could have altered due to above
+ */
+ startnext:
+ logPartPtr = save;
+ logFilePtr.i = logPartPtr.p->currentLogfile;
+ ptrCheckGuard(logFilePtr, clogFileFileSize, logFileRecord);
+ logPagePtr.i = logFilePtr.p->currentLogpage;
+ ptrCheckGuard(logPagePtr, clogPageFileSize, logPageRecord);
+ logNextStart(signal);
return;
break;
case ZSR_GCI_LIMITS:
@@ -751,6 +786,7 @@ void Dblqh::execNDB_STTOR(Signal* signal
// Dont setReturnedReadLenAIFlag
// Dont setAPIVersion
LqhKeyReq::setMarkerFlag(preComputedRequestInfoMask, 1);
+ LqhKeyReq::setQueueOnRedoProblemFlag(preComputedRequestInfoMask, 1);
//preComputedRequestInfoMask = 0x003d7fff;
startphase1Lab(signal, /* dummy */ ~0, ownNodeId);
@@ -1288,6 +1324,13 @@ void Dblqh::execREAD_CONFIG_REQ(Signal*
initRecords();
initialiseRecordsLab(signal, 0, ref, senderData);
+ c_max_redo_lag = 30;
+ ndb_mgm_get_int_parameter(p, CFG_DB_REDO_OVERCOMMIT_LIMIT,
+ &c_max_redo_lag);
+
+ c_max_redo_lag_counter = 3;
+ ndb_mgm_get_int_parameter(p, CFG_DB_REDO_OVERCOMMIT_COUNTER,
+ &c_max_redo_lag_counter);
return;
}//Dblqh::execSIZEALT_REP()
@@ -2756,12 +2799,40 @@ Dblqh::wait_readonly(Signal* signal)
void Dblqh::execTIME_SIGNAL(Signal* signal)
{
jamEntry();
- cLqhTimeOutCount++;
- cLqhTimeOutCheckCount++;
+
+ cLqhTimeOutCount ++;
+ cLqhTimeOutCheckCount ++;
+
+ for (logPartPtr.i = 0; logPartPtr.i < clogPartFileSize; logPartPtr.i++)
+ {
+ jam();
+ ptrCheckGuard(logPartPtr, clogPartFileSize, logPartRecord);
+ int ret = logPartPtr.p->m_io_tracker.tick(10 * cLqhTimeOutCount,
+ c_max_redo_lag,
+ c_max_redo_lag_counter);
+ if (ret < 0)
+ {
+ /**
+ * set problem
+ */
+ update_log_problem(signal, logPartPtr,
+ LogPartRecord::P_REDO_IO_PROBLEM, true);
+ }
+ else if (ret > 0)
+ {
+ /**
+ * clear
+ */
+ update_log_problem(signal, logPartPtr,
+ LogPartRecord::P_REDO_IO_PROBLEM, false);
+ }
+ }
+
if (cLqhTimeOutCheckCount < 1000) {
jam();
return;
}//if
+
cLqhTimeOutCheckCount = 0;
#ifdef VM_TRACE
TcConnectionrecPtr tTcConptr;
@@ -6132,26 +6203,14 @@ void Dblqh::logLqhkeyreqLab(Signal* sign
UintR tcurrentFilepage;
TcConnectionrecPtr tmpTcConnectptr;
- if (unlikely(cnoOfLogPages < ZMIN_LOG_PAGES_OPERATION) ||
- ERROR_INSERTED(5032))
- {
- jam();
- if(ERROR_INSERTED(5032)){
- CLEAR_ERROR_INSERT_VALUE;
- }
-/*---------------------------------------------------------------------------*/
-// The log disk is having problems in catching up with the speed of execution.
-// We must wait with writing the log of this operation to ensure we do not
-// overload the log.
-/*---------------------------------------------------------------------------*/
- terrorCode = ZTEMPORARY_REDO_LOG_FAILURE;
- abortErrorLab(signal);
- return;
- }//if
+ const bool out_of_log_buffer = cnoOfLogPages < ZMIN_LOG_PAGES_OPERATION;
TcConnectionrec * const regTcPtr = tcConnectptr.p;
logPartPtr.i = regTcPtr->m_log_part_ptr_i;
ptrCheckGuard(logPartPtr, clogPartFileSize, logPartRecord);
+ bool abort_on_redo_problems =
+ (LqhKeyReq::getQueueOnRedoProblemFlag(regTcPtr->reqinfo) == 0);
+
/* -------------------------------------------------- */
/* THIS PART IS USED TO WRITE THE LOG */
/* -------------------------------------------------- */
@@ -6161,69 +6220,38 @@ void Dblqh::logLqhkeyreqLab(Signal* sign
/* RESTART WHEN THE LOG PART IS FREE AGAIN. */
/* -------------------------------------------------- */
LogPartRecord * const regLogPartPtr = logPartPtr.p;
-
- if (unlikely(regLogPartPtr->m_tail_problem))
+ const bool problem = out_of_log_buffer || regLogPartPtr->m_log_problems != 0;
+ if (unlikely(problem))
{
- jam();
- terrorCode = ZTAIL_PROBLEM_IN_LOG_ERROR;
- abortErrorLab(signal);
- return;
- }
-
- if(ERROR_INSERTED(5033)){
- jam();
- CLEAR_ERROR_INSERT_VALUE;
-
- if ((regLogPartPtr->firstLogQueue != RNIL) &&
- (regLogPartPtr->LogLqhKeyReqSent == ZFALSE)) {
- /* -------------------------------------------------- */
- /* WE HAVE A PROBLEM IN THAT THE LOG HAS NO */
- /* ROOM FOR ADDITIONAL OPERATIONS AT THE MOMENT.*/
- /* -------------------------------------------------- */
- /* -------------------------------------------------- */
- /* WE MUST STILL RESTART QUEUED OPERATIONS SO */
- /* THEY ALSO CAN BE ABORTED. */
- /* -------------------------------------------------- */
- regLogPartPtr->LogLqhKeyReqSent = ZTRUE;
- signal->theData[0] = ZLOG_LQHKEYREQ;
- signal->theData[1] = logPartPtr.i;
- sendSignal(cownref, GSN_CONTINUEB, signal, 2, JBB);
- }//if
-
- terrorCode = ZTAIL_PROBLEM_IN_LOG_ERROR;
- abortErrorLab(signal);
- return;
+ if (abort_on_redo_problems)
+ {
+ logLqhkeyreqLab_problems(signal);
+ return;
+ }
+ else
+ {
+ goto queueop;
+ }
}
- if (regLogPartPtr->logPartState == LogPartRecord::IDLE) {
+ if (regLogPartPtr->logPartState == LogPartRecord::IDLE)
+ {
;
- } else if (regLogPartPtr->logPartState == LogPartRecord::ACTIVE) {
+ }
+ else if (regLogPartPtr->logPartState == LogPartRecord::ACTIVE)
+ {
+queueop:
jam();
- linkWaitLog(signal, logPartPtr);
+ linkWaitLog(signal, logPartPtr, logPartPtr.p->m_log_prepare_queue);
regTcPtr->transactionState = TcConnectionrec::LOG_QUEUED;
return;
- } else {
- if ((regLogPartPtr->firstLogQueue != RNIL) &&
- (regLogPartPtr->LogLqhKeyReqSent == ZFALSE)) {
-/* -------------------------------------------------- */
-/* WE HAVE A PROBLEM IN THAT THE LOG HAS NO */
-/* ROOM FOR ADDITIONAL OPERATIONS AT THE MOMENT.*/
-/* -------------------------------------------------- */
-/* -------------------------------------------------- */
-/* WE MUST STILL RESTART QUEUED OPERATIONS SO */
-/* THEY ALSO CAN BE ABORTED. */
-/* -------------------------------------------------- */
- regLogPartPtr->LogLqhKeyReqSent = ZTRUE;
- signal->theData[0] = ZLOG_LQHKEYREQ;
- signal->theData[1] = logPartPtr.i;
- sendSignal(cownref, GSN_CONTINUEB, signal, 2, JBB);
- }//if
- ndbrequire(regLogPartPtr->logPartState == LogPartRecord::FILE_CHANGE_PROBLEM);
- terrorCode = ZFILE_CHANGE_PROBLEM_IN_LOG_ERROR;
- abortErrorLab(signal);
+ }
+ else
+ {
+ ndbrequire(false);
return;
}//if
- regLogPartPtr->logPartState = LogPartRecord::ACTIVE;
+
logFilePtr.i = regLogPartPtr->currentLogfile;
ptrCheckGuard(logFilePtr, clogFileFileSize, logFileRecord);
/* -------------------------------------------------- */
@@ -6279,7 +6307,6 @@ void Dblqh::logLqhkeyreqLab(Signal* sign
/* -------------------------------------------------- */
writeAttrinfoLab(signal);
- logNextStart(signal);
/* -------------------------------------------------- */
/* RESET THE STATE OF THE LOG PART. IF ANY */
/* OPERATIONS HAVE QUEUED THEN START THE FIRST */
@@ -6321,6 +6348,80 @@ void Dblqh::logLqhkeyreqLab(Signal* sign
}//if
}//Dblqh::logLqhkeyreqLab()
+void
+Dblqh::logLqhkeyreqLab_problems(Signal * signal)
+{
+ jam();
+ LogPartRecord * const regLogPartPtr = logPartPtr.p;
+ Uint32 problems = regLogPartPtr->m_log_problems;
+
+ if (cnoOfLogPages < ZMIN_LOG_PAGES_OPERATION)
+ {
+ jam();
+ terrorCode = ZTEMPORARY_REDO_LOG_FAILURE;
+ }
+ else if ((problems & LogPartRecord::P_TAIL_PROBLEM) != 0)
+ {
+ jam();
+ terrorCode = ZTAIL_PROBLEM_IN_LOG_ERROR;
+ }
+ else if ((problems & LogPartRecord::P_REDO_IO_PROBLEM) != 0)
+ {
+ jam();
+ terrorCode = ZREDO_IO_PROBLEM;
+ }
+ else if ((problems & LogPartRecord::P_FILE_CHANGE_PROBLEM) != 0)
+ {
+ jam();
+ terrorCode = ZFILE_CHANGE_PROBLEM_IN_LOG_ERROR;
+ }
+ abortErrorLab(signal);
+}
+
+void
+Dblqh::update_log_problem(Signal* signal, Ptr<LogPartRecord> partPtr,
+ Uint32 problem, bool value)
+{
+ Uint32 problems = partPtr.p->m_log_problems;
+ if (value)
+ {
+ /**
+ * set
+ */
+ jam();
+ if ((problems & problem) == 0)
+ {
+ jam();
+ problems |= problem;
+ }
+ }
+ else
+ {
+ /**
+ * clear
+ */
+ jam();
+ if ((problems & problem) != 0)
+ {
+ jam();
+ problems &= ~(Uint32)problem;
+
+ if (partPtr.p->LogLqhKeyReqSent == ZFALSE &&
+ (!partPtr.p->m_log_prepare_queue.isEmpty() ||
+ !partPtr.p->m_log_complete_queue.isEmpty()))
+ {
+ jam();
+
+ partPtr.p->LogLqhKeyReqSent = ZTRUE;
+ signal->theData[0] = ZLOG_LQHKEYREQ;
+ signal->theData[1] = partPtr.i;
+ sendSignal(cownref, GSN_CONTINUEB, signal, 2, JBB);
+ }
+ }
+ }
+ partPtr.p->m_log_problems = problems;
+}
+
/* ------------------------------------------------------------------------- */
/* ------- SEND LQHKEYREQ */
/* */
@@ -7585,8 +7686,9 @@ void Dblqh::execLQH_WRITELOG_REQ(Signal*
jam();
regLogPartPtr.i = regTcPtr->m_log_part_ptr_i;
ptrCheckGuard(regLogPartPtr, clogPartFileSize, logPartRecord);
- if ((regLogPartPtr.p->logPartState == LogPartRecord::ACTIVE) ||
- (noOfLogPages == 0)) {
+ if (!regLogPartPtr.p->m_log_complete_queue.isEmpty() ||
+ (noOfLogPages == 0))
+ {
jam();
/*---------------------------------------------------------------------------*/
/* THIS LOG PART WAS CURRENTLY ACTIVE WRITING ANOTHER LOG RECORD. WE MUST */
@@ -7597,7 +7699,7 @@ void Dblqh::execLQH_WRITELOG_REQ(Signal*
// log part to ensure that we don't get a buffer explosion in the delayed
// signal buffer instead.
/*---------------------------------------------------------------------------*/
- linkWaitLog(signal, regLogPartPtr);
+ linkWaitLog(signal, regLogPartPtr, regLogPartPtr.p->m_log_complete_queue);
if (transState == TcConnectionrec::PREPARED) {
jam();
regTcPtr->transactionState = TcConnectionrec::LOG_COMMIT_QUEUED_WAIT_SIGNAL;
@@ -7606,10 +7708,6 @@ void Dblqh::execLQH_WRITELOG_REQ(Signal*
ndbrequire(transState == TcConnectionrec::PREPARED_RECEIVED_COMMIT);
regTcPtr->transactionState = TcConnectionrec::LOG_COMMIT_QUEUED;
}//if
- if (regLogPartPtr.p->logPartState == LogPartRecord::IDLE) {
- jam();
- regLogPartPtr.p->logPartState = LogPartRecord::ACTIVE;
- }//if
return;
}//if
writeCommitLog(signal, regLogPartPtr);
@@ -8588,7 +8686,9 @@ void Dblqh::continueAbortLab(Signal* sig
* TRANSACTION.
* ---------------------------------------------------------------------- */
initLogPointers(signal);
- if (logPartPtr.p->logPartState == LogPartRecord::ACTIVE) {
+ if (cnoOfLogPages == 0 ||
+ !logPartPtr.p->m_log_complete_queue.isEmpty())
+ {
jam();
/* --------------------------------------------------------------------
* A PREPARE OPERATION IS CURRENTLY WRITING IN THE LOG.
@@ -8596,24 +8696,8 @@ void Dblqh::continueAbortLab(Signal* sig
* IT IS NECESSARY TO WRITE ONE LOG RECORD COMPLETELY
* AT A TIME OTHERWISE WE WILL SCRAMBLE THE LOG.
* -------------------------------------------------------------------- */
- linkWaitLog(signal, logPartPtr);
- regTcPtr->transactionState = TcConnectionrec::LOG_ABORT_QUEUED;
- return;
- }//if
- if (cnoOfLogPages == 0) {
- jam();
-/*---------------------------------------------------------------------------*/
-// We must delay the write of commit info to the log to safe-guard against
-// a crash due to lack of log pages. We temporary stop all log writes to this
-// log part to ensure that we don't get a buffer explosion in the delayed
-// signal buffer instead.
-/*---------------------------------------------------------------------------*/
- linkWaitLog(signal, logPartPtr);
+ linkWaitLog(signal, logPartPtr, logPartPtr.p->m_log_complete_queue);
regTcPtr->transactionState = TcConnectionrec::LOG_ABORT_QUEUED;
- if (logPartPtr.p->logPartState == LogPartRecord::IDLE) {
- jam();
- logPartPtr.p->logPartState = LogPartRecord::ACTIVE;
- }//if
return;
}//if
writeAbortLog(signal);
@@ -13692,7 +13776,8 @@ retry:
if (tailmoved && mb > c_free_mb_tail_problem_limit)
{
jam();
- sltLogPartPtr.p->m_tail_problem = false;
+ update_log_problem(signal, sltLogPartPtr,
+ LogPartRecord::P_TAIL_PROBLEM, false);
}
else if (!tailmoved && mb <= c_free_mb_force_lcp_limit)
{
@@ -13941,6 +14026,14 @@ void Dblqh::execGCP_SAVEREQ(Signal* sign
jam();
logPartPtr.p->waitWriteGciLog = LogPartRecord::WWGL_TRUE;
tlogActive = true;
+ if (logPartPtr.p->LogLqhKeyReqSent == ZFALSE)
+ {
+ jam();
+ logPartPtr.p->LogLqhKeyReqSent = ZTRUE;
+ signal->theData[0] = ZLOG_LQHKEYREQ;
+ signal->theData[1] = logPartPtr.i;
+ sendSignal(cownref, GSN_CONTINUEB, signal, 2, JBB);
+ }
} else {
jam();
logPartPtr.p->waitWriteGciLog = LogPartRecord::WWGL_FALSE;
@@ -14690,6 +14783,13 @@ void Dblqh::initFsrwconf(Signal* signal,
logP= logPagePtr;
noPages= 1;
ndbassert(totPages > 0);
+
+ if (write)
+ {
+ Uint32 bytesWritten = totPages * 32768;
+ logPartPtr.p->m_io_tracker.complete_io(bytesWritten);
+ }
+
for (;;)
{
logP.p->logPageWord[ZPOS_IN_WRITING]= 0;
@@ -14703,6 +14803,7 @@ void Dblqh::initFsrwconf(Signal* signal,
ptrCheckGuard(logP, clogPageFileSize, logPageRecord);
noPages++;
}
+
}//Dblqh::initFsrwconf()
/* ######################################################################### */
@@ -14730,32 +14831,8 @@ void Dblqh::timeSup(Signal* signal)
ptrCheckGuard(logPagePtr, clogPageFileSize, logPageRecord);
if (logPartPtr.p->logPartTimer != logPartPtr.p->logTimer) {
jam();
-/*--------------------------------------------------------------------------*/
-/* THIS LOG PART HAS NOT WRITTEN TO DISK DURING THE LAST SECOND. */
-/*--------------------------------------------------------------------------*/
- switch (logPartPtr.p->logPartState) {
- case LogPartRecord::FILE_CHANGE_PROBLEM:
- jam();
-/*--------------------------------------------------------------------------*/
-/* THIS LOG PART HAS PROBLEMS IN CHANGING FILES MAKING IT IMPOSSIBLE */
-// TO WRITE TO THE FILE CURRENTLY. WE WILL COMEBACK LATER AND SEE IF
-// THE PROBLEM HAS BEEN FIXED.
-/*--------------------------------------------------------------------------*/
- case LogPartRecord::ACTIVE:
- jam();
-/*---------------------------------------------------------------------------*/
-/* AN OPERATION IS CURRENTLY ACTIVE IN WRITING THIS LOG PART. WE THUS CANNOT */
-/* WRITE ANYTHING TO DISK AT THIS MOMENT. WE WILL SEND A SIGNAL DELAYED FOR */
-/* 10 MS AND THEN TRY AGAIN. POSSIBLY THE LOG PART WILL HAVE BEEN WRITTEN */
-/* UNTIL THEN OR ELSE IT SHOULD BE FREE TO WRITE AGAIN. */
-/*---------------------------------------------------------------------------*/
- signal->theData[0] = ZTIME_SUPERVISION;
- signal->theData[1] = logPartPtr.i;
- sendSignalWithDelay(cownref, GSN_CONTINUEB, signal, 10, 2);
- return;
- break;
- case LogPartRecord::IDLE:
- jam();
+ if (true) // less merge conflicts
+ {
/*---------------------------------------------------------------------------*/
/* IDLE AND NOT WRITTEN TO DISK IN A SECOND. ALSO WHEN WE HAVE A TAIL PROBLEM*/
/* WE HAVE TO WRITE TO DISK AT TIMES. WE WILL FIRST CHECK WHETHER ANYTHING */
@@ -14817,12 +14894,9 @@ void Dblqh::timeSup(Signal* signal)
}//if
}//if
}//if
- break;
- default:
- ndbrequire(false);
- break;
- }//switch
- }//if
+ }
+ }
+
logPartPtr.p->logTimer++;
return;
}//Dblqh::timeSup()
@@ -15049,29 +15123,14 @@ void Dblqh::lastWriteInFileLab(Signal* s
void Dblqh::writePageZeroLab(Signal* signal, Uint32 from)
{
- if (logPartPtr.p->logPartState == LogPartRecord::FILE_CHANGE_PROBLEM)
+ if ((logPartPtr.p->m_log_problems & LogPartRecord::P_FILE_CHANGE_PROBLEM)!= 0)
{
- if (logPartPtr.p->firstLogQueue == RNIL)
- {
- jam();
- logPartPtr.p->logPartState = LogPartRecord::IDLE;
- }
- else
- {
- jam();
- logPartPtr.p->logPartState = LogPartRecord::ACTIVE;
- if (logPartPtr.p->LogLqhKeyReqSent == ZFALSE)
- {
- jam();
-
- logPartPtr.p->LogLqhKeyReqSent = ZTRUE;
- signal->theData[0] = ZLOG_LQHKEYREQ;
- signal->theData[1] = logPartPtr.i;
- sendSignal(cownref, GSN_CONTINUEB, signal, 2, JBB);
- }
- }
+ jam();
+ update_log_problem(signal, logPartPtr,
+ LogPartRecord::P_FILE_CHANGE_PROBLEM,
+ /* clear */ false);
}
-
+
logFilePtr.p->fileChangeState = LogFileRecord::NOT_ONGOING;
/*---------------------------------------------------------------------------*/
@@ -15797,6 +15856,8 @@ void Dblqh::writeSinglePage(Signal* sign
ndbrequire(logFilePtr.p->fileRef != RNIL);
+ logPartPtr.p->m_io_tracker.send_io(32768);
+
if (DEBUG_REDO)
{
ndbout_c("writeSingle 1 page at part: %u file: %u pos: %u",
@@ -16499,7 +16560,8 @@ Dblqh::rebuildOrderedIndexes(Signal* sig
if (mb <= c_free_mb_tail_problem_limit)
{
jam();
- logPartPtr.p->m_tail_problem = true;
+ update_log_problem(signal, logPartPtr,
+ LogPartRecord::P_TAIL_PROBLEM, true);
}
}
@@ -19195,6 +19257,8 @@ void Dblqh::completedLogPage(Signal* sig
ndbrequire(logFilePtr.p->fileRef != RNIL);
+ logPartPtr.p->m_io_tracker.send_io(32768*twlpNoPages);
+
if (DEBUG_REDO)
{
ndbout_c("writing %d pages at part: %u file: %u pos: %u",
@@ -19424,20 +19488,30 @@ void Dblqh::findPageRef(Signal* signal,
/* */
/* SUBROUTINE SHORT NAME = GFL */
/* ------------------------------------------------------------------------- */
-void Dblqh::getFirstInLogQueue(Signal* signal)
+void
+Dblqh::getFirstInLogQueue(Signal* signal,
+ Ptr<TcConnectionrec> & dst)
{
- TcConnectionrecPtr gflTcConnectptr;
+ TcConnectionrecPtr tmp;
/* -------------------------------------------------- */
/* GET THE FIRST FROM THE LOG QUEUE AND REMOVE */
/* IT FROM THE QUEUE. */
/* -------------------------------------------------- */
- gflTcConnectptr.i = logPartPtr.p->firstLogQueue;
- ptrCheckGuard(gflTcConnectptr, ctcConnectrecFileSize, tcConnectionrec);
- logPartPtr.p->firstLogQueue = gflTcConnectptr.p->nextTcLogQueue;
- if (logPartPtr.p->firstLogQueue == RNIL) {
+ LogPartRecord::OperationQueue * queue = &logPartPtr.p->m_log_complete_queue;
+ tmp.i = queue->firstElement;
+ if (tmp.i == RNIL)
+ {
jam();
- logPartPtr.p->lastLogQueue = RNIL;
+ queue = &logPartPtr.p->m_log_prepare_queue;
+ tmp.i = queue->firstElement;
+ }
+ ptrCheckGuard(tmp, ctcConnectrecFileSize, tcConnectionrec);
+ queue->firstElement = tmp.p->nextTcLogQueue;
+ if (queue->firstElement == RNIL) {
+ jam();
+ queue->lastElement = RNIL;
}//if
+ dst = tmp;
}//Dblqh::getFirstInLogQueue()
/* ---------------------------------------------------------------- */
@@ -19977,18 +20051,19 @@ void Dblqh::initLogpart(Signal* signal)
logPartPtr.p->logExecState = LogPartRecord::LES_IDLE;
logPartPtr.p->firstLogTcrec = RNIL;
logPartPtr.p->lastLogTcrec = RNIL;
- logPartPtr.p->firstLogQueue = RNIL;
- logPartPtr.p->lastLogQueue = RNIL;
logPartPtr.p->gcprec = RNIL;
logPartPtr.p->firstPageRef = RNIL;
logPartPtr.p->lastPageRef = RNIL;
logPartPtr.p->headFileNo = ZNIL;
logPartPtr.p->headPageNo = ZNIL;
logPartPtr.p->headPageIndex = ZNIL;
- logPartPtr.p->m_tail_problem = 0;
+ logPartPtr.p->m_log_problems = 0;
NdbLogPartInfo lpinfo(instance());
ndbrequire(lpinfo.partCount == clogPartFileSize);
logPartPtr.p->logPartNo = lpinfo.partNo[logPartPtr.i];
+ logPartPtr.p->m_io_tracker.init(logPartPtr.p->logPartNo);
+ logPartPtr.p->m_log_prepare_queue.init();
+ logPartPtr.p->m_log_complete_queue.init();
}//Dblqh::initLogpart()
/* ==========================================================================
@@ -20121,26 +20196,30 @@ void Dblqh::linkFragQueue(Signal* signal
// tcConnectptr
// logPartPtr
* ------------------------------------------------------------------------- */
-void Dblqh::linkWaitLog(Signal* signal, LogPartRecordPtr regLogPartPtr)
+void
+Dblqh::linkWaitLog(Signal* signal,
+ LogPartRecordPtr regLogPartPtr,
+ LogPartRecord::OperationQueue & queue)
{
TcConnectionrecPtr lwlTcConnectptr;
-
/* -------------------------------------------------- */
/* LINK ACTIVE OPERATION INTO QUEUE WAITING FOR */
/* ACCESS TO THE LOG PART. */
/* -------------------------------------------------- */
- lwlTcConnectptr.i = regLogPartPtr.p->lastLogQueue;
+ lwlTcConnectptr.i = queue.lastElement;
if (lwlTcConnectptr.i == RNIL) {
jam();
- regLogPartPtr.p->firstLogQueue = tcConnectptr.i;
+ queue.firstElement = tcConnectptr.i;
} else {
jam();
ptrCheckGuard(lwlTcConnectptr, ctcConnectrecFileSize, tcConnectionrec);
lwlTcConnectptr.p->nextTcLogQueue = tcConnectptr.i;
}//if
- regLogPartPtr.p->lastLogQueue = tcConnectptr.i;
+ queue.lastElement = tcConnectptr.i;
tcConnectptr.p->nextTcLogQueue = RNIL;
- if (regLogPartPtr.p->LogLqhKeyReqSent == ZFALSE) {
+ regLogPartPtr.p->logPartState = LogPartRecord::ACTIVE;
+ if (regLogPartPtr.p->LogLqhKeyReqSent == ZFALSE)
+ {
jam();
regLogPartPtr.p->LogLqhKeyReqSent = ZTRUE;
signal->theData[0] = ZLOG_LQHKEYREQ;
@@ -20164,36 +20243,38 @@ void Dblqh::logNextStart(Signal* signal)
UintR tlnsStillWaiting;
LogPartRecord * const regLogPartPtr = logPartPtr.p;
- if ((regLogPartPtr->firstLogQueue == RNIL) &&
- (regLogPartPtr->logPartState == LogPartRecord::ACTIVE) &&
- (regLogPartPtr->waitWriteGciLog != LogPartRecord::WWGL_TRUE)) {
+ if (regLogPartPtr->m_log_prepare_queue.isEmpty() &&
+ regLogPartPtr->m_log_complete_queue.isEmpty() &&
+ (regLogPartPtr->waitWriteGciLog != LogPartRecord::WWGL_TRUE))
+ {
// --------------------------------------------------------------------------
// Optimised route for the common case
// --------------------------------------------------------------------------
- regLogPartPtr->logPartState = LogPartRecord::IDLE;
return;
}//if
- if (regLogPartPtr->firstLogQueue != RNIL) {
+
+ if (!regLogPartPtr->m_log_prepare_queue.isEmpty() ||
+ !regLogPartPtr->m_log_complete_queue.isEmpty())
+ {
jam();
- if (regLogPartPtr->LogLqhKeyReqSent == ZFALSE) {
+ regLogPartPtr->logPartState = LogPartRecord::ACTIVE;
+ if (regLogPartPtr->LogLqhKeyReqSent == ZFALSE)
+ {
jam();
regLogPartPtr->LogLqhKeyReqSent = ZTRUE;
signal->theData[0] = ZLOG_LQHKEYREQ;
signal->theData[1] = logPartPtr.i;
sendSignal(cownref, GSN_CONTINUEB, signal, 2, JBB);
}//if
- } else {
- if (regLogPartPtr->logPartState == LogPartRecord::ACTIVE) {
- jam();
- regLogPartPtr->logPartState = LogPartRecord::IDLE;
- } else {
- jam();
- }//if
- }//if
- if (regLogPartPtr->waitWriteGciLog != LogPartRecord::WWGL_TRUE) {
+ }
+
+ if (regLogPartPtr->waitWriteGciLog != LogPartRecord::WWGL_TRUE)
+ {
jam();
return;
- } else {
+ }
+ else
+ {
jam();
/* --------------------------------------------------------------------------
* A COMPLETE GCI LOG RECORD IS WAITING TO BE WRITTEN. WE GIVE THIS HIGHEST
@@ -21201,6 +21282,8 @@ void Dblqh::writeDirty(Signal* signal, U
ndbrequire(logFilePtr.p->fileRef != RNIL);
+ logPartPtr.p->m_io_tracker.send_io(32768);
+
if (DEBUG_REDO)
{
ndbout_c("writeDirty 1 page at part: %u file: %u pos: %u",
@@ -21403,12 +21486,12 @@ void Dblqh::writeNextLog(Signal* signal)
char buf[100];
BaseString::snprintf(buf, sizeof(buf),
"Head/Tail met in REDO log, logpart: %u"
- " file: %u mbyte: %u state: %u tail-problem: %u",
+ " file: %u mbyte: %u state: %u log-problem: %u",
logPartPtr.p->logPartNo,
logFilePtr.p->fileNo,
logFilePtr.p->currentMbyte,
logPartPtr.p->logPartState,
- logPartPtr.p->m_tail_problem);
+ logPartPtr.p->m_log_problems);
signal->theData[0] = 2398;
@@ -21420,9 +21503,12 @@ void Dblqh::writeNextLog(Signal* signal)
if (logFilePtr.p->currentMbyte == (clogFileSize - 1)) {
jam();
twnlNextMbyte = 0;
- if (logFilePtr.p->fileChangeState != LogFileRecord::NOT_ONGOING) {
+ if (logFilePtr.p->fileChangeState != LogFileRecord::NOT_ONGOING)
+ {
jam();
- logPartPtr.p->logPartState = LogPartRecord::FILE_CHANGE_PROBLEM;
+ update_log_problem(signal, logPartPtr,
+ LogPartRecord::P_FILE_CHANGE_PROBLEM,
+ /* set */ true);
}//if
twnlNextFileNo = wnlNextLogFilePtr.p->fileNo;
} else {
@@ -21443,7 +21529,7 @@ void Dblqh::writeNextLog(Signal* signal)
if (free_mb <= c_free_mb_tail_problem_limit)
{
jam();
- logPartPtr.p->m_tail_problem = true;
+ update_log_problem(signal, logPartPtr, LogPartRecord::P_TAIL_PROBLEM, true);
}
if (ERROR_INSERTED(5058) &&
@@ -21915,17 +22001,20 @@ Dblqh::execDUMP_STATE_ORD(Signal* signal
Ptr<LogPartRecord> lp;
lp.i = i;
ptrCheckGuard(lp, clogPartFileSize, logPartRecord);
- ndbout_c("LP %d blockInstance: %d partNo: %d state: %d WW_Gci: %d gcprec: %d flq: %d currfile: %d tailFileNo: %d logTailMbyte: %d",
+ ndbout_c("LP %d blockInstance: %d partNo: %d state: %d WW_Gci: %d gcprec: %d flq: %u %u currfile: %d tailFileNo: %d logTailMbyte: %d cnoOfLogPages: %u problems: 0x%x",
i,
instance(),
lp.p->logPartNo,
lp.p->logPartState,
lp.p->waitWriteGciLog,
lp.p->gcprec,
- lp.p->firstLogQueue,
+ lp.p->m_log_prepare_queue.firstElement,
+ lp.p->m_log_complete_queue.firstElement,
lp.p->currentLogfile,
lp.p->logTailFileNo,
- lp.p->logTailMbyte);
+ lp.p->logTailMbyte,
+ cnoOfLogPages,
+ lp.p->m_log_problems);
if(gcp.i == RNIL && lp.p->gcprec != RNIL)
gcp.i = lp.p->gcprec;
@@ -23155,3 +23244,145 @@ Dblqh::send_runredo_event(Signal* signal
signal->theData[11] = lp->stopMbyte;
sendSignal(CMVMI_REF, GSN_EVENT_REP, signal, 12, JBB);
}
+
+void
+Dblqh::IOTracker::init(Uint32 partNo)
+{
+ bzero(this, sizeof(* this));
+ m_log_part_no = partNo;
+}
+
+int
+Dblqh::IOTracker::tick(Uint32 now, Uint32 maxlag, Uint32 maxlag_cnt)
+{
+ Uint32 t = m_current_time;
+
+ if ((t / SAMPLE_TIME) == (now / SAMPLE_TIME))
+ return 0;
+
+ m_current_time = now;
+ if (m_sample_completed_bytes >= m_sample_sent_bytes)
+ {
+ /**
+ * If we completed all io we sent during current sample...
+ * we can't have any problem...and
+ * we can't measure io throughput, so don't add measurement
+ *
+ */
+ m_sample_sent_bytes = 0;
+ m_sample_completed_bytes = 0;
+ }
+ else
+ {
+ // io maxed out...
+ Uint32 elapsed = now - t;
+ m_save_written_bytes[m_save_pos] += m_sample_completed_bytes;
+ m_save_elapsed_millis[m_save_pos] += elapsed;
+
+ m_curr_written_bytes += m_sample_completed_bytes;
+ m_curr_elapsed_millis += elapsed;
+
+ Uint32 bps = (1000 * m_sample_completed_bytes) / elapsed;
+ Uint32 lag = bps ? m_sum_outstanding_bytes / bps : 30;
+ if (false && lag >= 30)
+ {
+ g_eventLogger->info("part: %u tick(%u) m_sample_completed_bytes: %u m_sample_sent_bytes: %u elapsed: %u kbps: %u lag: %u",
+ m_log_part_no,
+ now, m_sample_completed_bytes, m_sample_sent_bytes,
+ elapsed, bps/1000, lag);
+ }
+
+ m_sample_sent_bytes -= m_sample_completed_bytes;
+ m_sample_completed_bytes = 0;
+ }
+
+ int retVal = 0;
+ Uint32 save_lag_cnt = m_lag_cnt;
+ if ((now / SLIDING_WINDOW_LEN) != (t / SLIDING_WINDOW_LEN))
+ {
+ Uint32 lag = m_curr_written_bytes ?
+ ((Uint64(m_sum_outstanding_bytes) / 1000) * Uint64(m_curr_elapsed_millis)) / m_curr_written_bytes:
+ 0;
+
+ if (lag > maxlag)
+ {
+ /**
+ * We did have lag last second...
+ * increase m_lag_cnt and check if it has reached maxlag_cnt
+ */
+ Uint32 tmp = m_lag_cnt;
+ m_lag_cnt += (lag / maxlag);
+ if (tmp < maxlag_cnt && m_lag_cnt >= maxlag_cnt)
+ {
+ retVal = -1; // start aborting transaction
+ }
+ }
+ else
+ {
+ /**
+ * We did not have lag...reset m_lag_cnt
+ */
+ if (m_lag_cnt >= maxlag_cnt)
+ {
+ // stop aborting transcation
+ retVal = 1;
+ }
+ m_lag_cnt = 0;
+ }
+
+#if 1
+ if (m_lag_cnt == 0 && lag == 0)
+ {
+ }
+ else if (lag > 0 && m_lag_cnt == 0)
+ {
+ g_eventLogger->info("part: %u : time to complete: %u",
+ m_log_part_no, lag);
+ }
+ else if (m_lag_cnt < maxlag_cnt && m_lag_cnt == save_lag_cnt)
+ {
+ g_eventLogger->info("part: %u : time to complete: %u lag_cnt: %u => %u => retVal: %d",
+ m_log_part_no,
+ lag,
+ save_lag_cnt,
+ m_lag_cnt,
+ retVal);
+ }
+ else
+ {
+ g_eventLogger->info("part: %u : sum_outstanding: %ukb avg_written: %ukb avg_elapsed: %ums time to complete: %u lag_cnt: %u => %u retVal: %d",
+ m_log_part_no, m_sum_outstanding_bytes / 1024, m_curr_written_bytes/1024, m_curr_elapsed_millis,
+ lag, save_lag_cnt, m_lag_cnt, retVal);
+ }
+#endif
+
+ /**
+ * And finally rotate sliding window
+ */
+ Uint32 last = (m_save_pos + 1) % SLIDING_WINDOW_HISTORY_LEN;
+ assert(m_curr_written_bytes >= m_save_written_bytes[last]);
+ assert(m_curr_elapsed_millis >= m_save_elapsed_millis[last]);
+ m_curr_written_bytes -= m_save_written_bytes[last];
+ m_curr_elapsed_millis -= m_save_elapsed_millis[last];
+ m_save_written_bytes[last] = 0;
+ m_save_elapsed_millis[last] = 0;
+ m_save_pos = last;
+ }
+ return retVal;
+}
+
+void
+Dblqh::IOTracker::send_io(Uint32 bytes)
+{
+ m_sum_outstanding_bytes += bytes;
+ m_sample_sent_bytes += bytes;
+}
+
+void
+Dblqh::IOTracker::complete_io(Uint32 bytes)
+{
+ assert(m_sum_outstanding_bytes >= bytes);
+
+ m_sum_outstanding_bytes -= bytes;
+ m_sample_completed_bytes += bytes;
+}
=== modified file 'storage/ndb/src/kernel/blocks/dbtc/Dbtc.hpp'
--- a/storage/ndb/src/kernel/blocks/dbtc/Dbtc.hpp 2010-03-26 07:13:06 +0000
+++ b/storage/ndb/src/kernel/blocks/dbtc/Dbtc.hpp 2010-12-02 11:02:29 +0000
@@ -916,6 +916,7 @@ public:
Uint8 m_special_hash; // collation or distribution key
Uint8 m_no_hash; // Hash not required for LQH (special variant)
Uint8 m_no_disk_flag;
+ Uint8 m_op_queue;
Uint8 lenAiInTckeyreq; /* LENGTH OF ATTRIBUTE INFORMATION IN TCKEYREQ */
Uint8 fragmentDistributionKey; /* DIH generation no */
=== modified file 'storage/ndb/src/kernel/blocks/dbtc/DbtcMain.cpp'
--- a/storage/ndb/src/kernel/blocks/dbtc/DbtcMain.cpp 2010-11-10 08:40:08 +0000
+++ b/storage/ndb/src/kernel/blocks/dbtc/DbtcMain.cpp 2010-12-02 11:02:29 +0000
@@ -2785,6 +2785,7 @@ void Dbtc::execTCKEYREQ(Signal* signal)
Uint8 TNoDiskFlag = TcKeyReq::getNoDiskFlag(Treqinfo);
Uint8 TexecuteFlag = TexecFlag;
Uint8 Treorg = TcKeyReq::getReorgFlag(Treqinfo);
+ Uint8 Tqueue = TcKeyReq::getQueueOnRedoProblemFlag(Treqinfo);
if (Treorg)
{
@@ -2803,6 +2804,7 @@ void Dbtc::execTCKEYREQ(Signal* signal)
regCachePtr->opExec = TInterpretedFlag;
regCachePtr->distributionKeyIndicator = TDistrKeyFlag;
regCachePtr->m_no_disk_flag = TNoDiskFlag;
+ regCachePtr->m_op_queue = Tqueue;
//-------------------------------------------------------------
// The next step is to read the upto three conditional words.
@@ -3546,6 +3548,7 @@ void Dbtc::sendlqhkeyreq(Signal* signal,
LqhKeyReq::setSimpleFlag(Tdata10, sig0);
LqhKeyReq::setOperation(Tdata10, sig1);
LqhKeyReq::setNoDiskFlag(Tdata10, regCachePtr->m_no_disk_flag);
+ LqhKeyReq::setQueueOnRedoProblemFlag(Tdata10, regCachePtr->m_op_queue);
/* -----------------------------------------------------------------------
* If we are sending a short LQHKEYREQ, then there will be some AttrInfo
=== modified file 'storage/ndb/src/mgmsrv/ConfigInfo.cpp'
--- a/storage/ndb/src/mgmsrv/ConfigInfo.cpp 2010-11-22 14:27:12 +0000
+++ b/storage/ndb/src/mgmsrv/ConfigInfo.cpp 2010-12-02 11:02:29 +0000
@@ -249,6 +249,12 @@ const ConfigInfo::Typelib arbit_method_t
{ 0, 0 }
};
+static
+const ConfigInfo::Typelib default_operation_redo_problem_action_typelib [] = {
+ { "abort", OPERATION_REDO_PROBLEM_ACTION_ABORT },
+ { "queue", OPERATION_REDO_PROBLEM_ACTION_QUEUE },
+ { 0, 0 }
+};
/**
* The default constructors create objects with suitable values for the
@@ -1865,6 +1871,34 @@ const ConfigInfo::ParamInfo ConfigInfo::
"1" /* Max */
},
+ {
+ CFG_DB_REDO_OVERCOMMIT_LIMIT,
+ "RedoOverCommitLimit",
+ DB_TOKEN,
+ "Limit for how long it will take to flush current "
+ "RedoBuffer before action is taken (in seconds)",
+ ConfigInfo::CI_USED,
+ false,
+ ConfigInfo::CI_INT,
+ "20", /* Default */
+ "0", /* Min */
+ STR_VALUE(MAX_INT_RNIL) /* Max */
+ },
+
+ {
+ CFG_DB_REDO_OVERCOMMIT_COUNTER,
+ "RedoOverCommitCounter",
+ DB_TOKEN,
+ "If RedoOverCommitLimit has been reached RedoOverCommitCounter"
+ " in a row times, transactions will be aborted",
+ ConfigInfo::CI_USED,
+ false,
+ ConfigInfo::CI_INT,
+ "3", /* Default */
+ "0", /* Min */
+ STR_VALUE(MAX_INT_RNIL) /* Max */
+ },
+
/***************************************************************************
* API
***************************************************************************/
@@ -2059,6 +2093,20 @@ const ConfigInfo::ParamInfo ConfigInfo::
ConfigInfo::CI_STRING,
0, 0, 0 },
+ {
+ CFG_DEFAULT_OPERATION_REDO_PROBLEM_ACTION,
+ "DefaultOperationRedoProblemAction",
+ API_TOKEN,
+ "If Redo-log is having problem, should operation default (unless overridden on transaction/operation level) abort or be put on queue"
+ " in a row times, transactions will be aborted",
+ ConfigInfo::CI_USED,
+ false,
+ ConfigInfo::CI_ENUM,
+ 0, /* default for ENUM doesnt seem to work... */
+ (const char*)default_operation_redo_problem_action_typelib,
+ 0
+ },
+
/****************************************************************************
* MGM
***************************************************************************/
=== modified file 'storage/ndb/src/ndbapi/NdbOperation.cpp'
--- a/storage/ndb/src/ndbapi/NdbOperation.cpp 2010-09-30 09:32:28 +0000
+++ b/storage/ndb/src/ndbapi/NdbOperation.cpp 2010-12-02 11:02:29 +0000
@@ -170,11 +170,11 @@ NdbOperation::init(const NdbTableImpl* t
theBlobList = NULL;
m_abortOption = -1;
m_noErrorPropagation = false;
- m_no_disk_flag = 1;
+ m_flags = 0;
+ m_flags |= OF_NO_DISK;
m_interpreted_code = NULL;
m_extraSetValues = NULL;
m_numExtraSetValues = 0;
- m_use_any_value = 0;
tSignal = theNdb->getSignal();
if (tSignal == NULL)
@@ -196,6 +196,10 @@ NdbOperation::init(const NdbTableImpl* t
return -1;
}
m_customData = NULL;
+
+ if (theNdb->theImpl->get_ndbapi_config_parameters().m_default_queue_option)
+ m_flags |= OF_QUEUEABLE;
+
return 0;
}
=== modified file 'storage/ndb/src/ndbapi/NdbOperationDefine.cpp'
--- a/storage/ndb/src/ndbapi/NdbOperationDefine.cpp 2010-09-29 13:25:19 +0000
+++ b/storage/ndb/src/ndbapi/NdbOperationDefine.cpp 2010-12-02 11:02:29 +0000
@@ -379,7 +379,10 @@ NdbOperation::getValue_impl(const NdbCol
NdbRecAttr* tRecAttr;
if ((tAttrInfo != NULL) &&
(theStatus != Init)){
- m_no_disk_flag &= (tAttrInfo->m_storageType == NDB_STORAGETYPE_DISK ? 0:1);
+ if (tAttrInfo->m_storageType == NDB_STORAGETYPE_DISK)
+ {
+ m_flags &= ~Uint8(OF_NO_DISK);
+ }
if (theStatus != GetValue) {
if (theStatus == UseNdbRecord)
/* This path for extra GetValues for NdbRecord */
@@ -438,7 +441,10 @@ NdbOperation::getValue_NdbRecord(const N
{
NdbRecAttr* tRecAttr;
- m_no_disk_flag &= (tAttrInfo->m_storageType == NDB_STORAGETYPE_DISK ? 0:1);
+ if (tAttrInfo->m_storageType == NDB_STORAGETYPE_DISK)
+ {
+ m_flags &= ~Uint8(OF_NO_DISK);
+ }
/*
For getValue with NdbRecord operations, we just allocate the NdbRecAttr,
@@ -551,7 +557,10 @@ NdbOperation::setValue( const NdbColumnI
// Insert Attribute Id into ATTRINFO part.
tAttrId = tAttrInfo->m_attrId;
- m_no_disk_flag &= (tAttrInfo->m_storageType == NDB_STORAGETYPE_DISK ? 0:1);
+ if (tAttrInfo->m_storageType == NDB_STORAGETYPE_DISK)
+ {
+ m_flags &= ~Uint8(OF_NO_DISK);
+ }
const char *aValue = aValuePassed;
if (aValue == NULL) {
if (tAttrInfo->m_nullable) {
@@ -1347,7 +1356,7 @@ NdbOperation::handleOperationOptions (co
{
/* Any operation can have an ANYVALUE set */
op->m_any_value = opts->anyValue;
- op->m_use_any_value = 1;
+ op->m_flags |= OF_USE_ANY_VALUE;
}
if (opts->optionsPresent & OperationOptions::OO_CUSTOMDATA)
@@ -1386,5 +1395,15 @@ NdbOperation::handleOperationOptions (co
}
}
+ if (opts->optionsPresent & OperationOptions::OO_QUEUABLE)
+ {
+ op->m_flags |= OF_QUEUEABLE;
+ }
+
+ if (opts->optionsPresent & OperationOptions::OO_NOT_QUEUABLE)
+ {
+ op->m_flags &= ~Uint8(OF_QUEUEABLE);
+ }
+
return 0;
}
=== modified file 'storage/ndb/src/ndbapi/NdbOperationExec.cpp'
--- a/storage/ndb/src/ndbapi/NdbOperationExec.cpp 2010-11-09 20:40:03 +0000
+++ b/storage/ndb/src/ndbapi/NdbOperationExec.cpp 2010-12-02 11:02:29 +0000
@@ -471,7 +471,8 @@ NdbOperation::prepareSend(Uint32 aTC_Con
Uint8 tCommitIndicator = theCommitIndicator;
Uint8 tStartIndicator = theStartIndicator;
Uint8 tInterpretIndicator = theInterpretIndicator;
- Uint8 tNoDisk = m_no_disk_flag;
+ Uint8 tNoDisk = (m_flags & OF_NO_DISK) != 0;
+ Uint8 tQueable = (m_flags & OF_QUEUEABLE) != 0;
/**
* A dirty read, can not abort the transaction
@@ -489,6 +490,7 @@ NdbOperation::prepareSend(Uint32 aTC_Con
tcKeyReq->setStartFlag(tReqInfo, tStartIndicator);
tcKeyReq->setInterpretedFlag(tReqInfo, tInterpretIndicator);
tcKeyReq->setNoDiskFlag(tReqInfo, tNoDisk);
+ tcKeyReq->setQueueOnRedoProblemFlag(tReqInfo, tQueable);
OperationType tOperationType = theOperationType;
Uint8 abortOption = (ao == DefaultAbortOption) ? (Uint8) m_abortOption : (Uint8) ao;
@@ -958,7 +960,7 @@ NdbOperation::buildSignalsNdbRecord(Uint
attrInfoRemain= 0;
theATTRINFOptr= NULL;
- no_disk_flag= m_no_disk_flag;
+ no_disk_flag = (m_flags & OF_NO_DISK) != 0;
/* If we have an interpreted program then we add 5 words
* of section length information at the start of the
@@ -1074,7 +1076,7 @@ NdbOperation::buildSignalsNdbRecord(Uint
}
}
- if (m_use_any_value &&
+ if (((m_flags & OF_USE_ANY_VALUE) != 0) &&
(tOpType == DeleteRequest))
{
/* Special hack for delete and ANYVALUE pseudo-column
@@ -1333,7 +1335,7 @@ NdbOperation::buildSignalsNdbRecord(Uint
(tOpType == UpdateRequest))
{
/* Handle setAnyValue() for all cases except delete */
- if (m_use_any_value)
+ if ((m_flags & OF_USE_ANY_VALUE) != 0)
{
res= insertATTRINFOHdr_NdbRecord(AttributeHeader::ANY_VALUE, 4);
if(res)
=== modified file 'storage/ndb/src/ndbapi/NdbOperationInt.cpp'
--- a/storage/ndb/src/ndbapi/NdbOperationInt.cpp 2010-09-29 13:25:19 +0000
+++ b/storage/ndb/src/ndbapi/NdbOperationInt.cpp 2010-12-02 11:02:29 +0000
@@ -92,8 +92,10 @@ NdbOperation::incCheck(const NdbColumnIm
setErrorCodeAbort(4231);
return -1;
}
- m_no_disk_flag &=
- (tNdbColumnImpl->m_storageType == NDB_STORAGETYPE_DISK ? 0:1);
+ if (tNdbColumnImpl->m_storageType == NDB_STORAGETYPE_DISK)
+ {
+ m_flags &= ~(Uint8)OF_NO_DISK;
+ }
return tNdbColumnImpl->m_attrId;
} else {
if (theNdbCon->theCommitStatus == NdbTransaction::Started)
@@ -145,8 +147,10 @@ NdbOperation::write_attrCheck(const NdbC
setErrorCodeAbort(4231);
return -1;
}
- m_no_disk_flag &=
- (tNdbColumnImpl->m_storageType == NDB_STORAGETYPE_DISK ? 0:1);
+ if (tNdbColumnImpl->m_storageType == NDB_STORAGETYPE_DISK)
+ {
+ m_flags &= ~(Uint8)OF_NO_DISK;
+ }
return tNdbColumnImpl->m_attrId;
} else {
if (theNdbCon->theCommitStatus == NdbTransaction::Started)
@@ -196,8 +200,10 @@ NdbOperation::read_attrCheck(const NdbCo
setErrorCodeAbort(4231);
return -1;
}
- m_no_disk_flag &=
- (tNdbColumnImpl->m_storageType == NDB_STORAGETYPE_DISK ? 0:1);
+ if (tNdbColumnImpl->m_storageType == NDB_STORAGETYPE_DISK)
+ {
+ m_flags &= ~(Uint8)OF_NO_DISK;
+ }
return tNdbColumnImpl->m_attrId;
} else {
if (theNdbCon->theCommitStatus == NdbTransaction::Started)
@@ -1120,7 +1126,10 @@ NdbOperation::branch_col(Uint32 type,
}
}
- m_no_disk_flag &= (col->m_storageType == NDB_STORAGETYPE_DISK ? 0:1);
+ if (col->m_storageType == NDB_STORAGETYPE_DISK)
+ {
+ m_flags &= ~(Uint8)OF_NO_DISK;
+ }
Uint32 tempData[ NDB_MAX_TUPLE_SIZE_IN_WORDS ];
if (((UintPtr)val & 3) != 0) {
=== modified file 'storage/ndb/src/ndbapi/NdbScanOperation.cpp'
--- a/storage/ndb/src/ndbapi/NdbScanOperation.cpp 2010-11-09 20:40:03 +0000
+++ b/storage/ndb/src/ndbapi/NdbScanOperation.cpp 2010-12-02 11:02:29 +0000
@@ -170,8 +170,10 @@ NdbScanOperation::addInterpretedCode()
const NdbInterpretedCode* code= m_interpreted_code;
/* Any disk access? */
- m_no_disk_flag &=
- !(code->m_flags & NdbInterpretedCode::UsesDisk);
+ if (code->m_flags & NdbInterpretedCode::UsesDisk)
+ {
+ m_flags &= ~Uint8(OF_NO_DISK);
+ }
/* Main program size depends on whether there's subroutines */
@@ -385,7 +387,7 @@ NdbScanOperation::generatePackedReadAIs(
}
if (col->flags & NdbRecord::IsDisk)
- m_no_disk_flag= false;
+ m_flags &= ~Uint8(OF_NO_DISK);
if (attrId > maxAttrId)
maxAttrId= attrId;
@@ -1370,7 +1372,7 @@ NdbScanOperation::processTableScanDefs(N
if (scan_flags & SF_DiskScan)
{
tupScan = true;
- m_no_disk_flag = false;
+ m_flags &= ~Uint8(OF_NO_DISK);
}
bool rangeScan= false;
@@ -2287,7 +2289,7 @@ int NdbScanOperation::prepareSendScan(Ui
*/
Uint32 reqInfo = req->requestInfo;
ScanTabReq::setKeyinfoFlag(reqInfo, keyInfo);
- ScanTabReq::setNoDiskFlag(reqInfo, m_no_disk_flag);
+ ScanTabReq::setNoDiskFlag(reqInfo, (m_flags & OF_NO_DISK) != 0);
/* Set distribution key info if required */
ScanTabReq::setDistributionKeyFlag(reqInfo, theDistrKeyIndicator_);
@@ -2901,8 +2903,10 @@ NdbScanOperation::getValue_NdbRecord_sca
NdbRecAttr *ra;
DBUG_PRINT("info", ("Column: %u", attrInfo->m_attrId));
- m_no_disk_flag &=
- (attrInfo->m_storageType == NDB_STORAGETYPE_MEMORY);
+ if (attrInfo->m_storageType == NDB_STORAGETYPE_DISK)
+ {
+ m_flags &= ~Uint8(OF_NO_DISK);
+ }
res= insertATTRINFOHdr_NdbRecord(attrInfo->m_attrId, 0);
if (res==-1)
@@ -2936,9 +2940,12 @@ NdbScanOperation::getValue_NdbRecAttr_sc
/* Get a RecAttr object, which is linked in to the Receiver's
* RecAttr linked list, and return to caller
*/
- if (attrInfo != NULL) {
- m_no_disk_flag &=
- (attrInfo->m_storageType == NDB_STORAGETYPE_MEMORY);
+ if (attrInfo != NULL)
+ {
+ if (attrInfo->m_storageType == NDB_STORAGETYPE_DISK)
+ {
+ m_flags &= ~Uint8(OF_NO_DISK);
+ }
recAttr = theReceiver.getValue(attrInfo, aValue);
=== modified file 'storage/ndb/src/ndbapi/ndb_cluster_connection.cpp'
--- a/storage/ndb/src/ndbapi/ndb_cluster_connection.cpp 2010-11-15 09:23:10 +0000
+++ b/storage/ndb/src/ndbapi/ndb_cluster_connection.cpp 2010-12-02 11:02:29 +0000
@@ -669,6 +669,12 @@ Ndb_cluster_connection_impl::configure(U
timeout = tmp1;
}
m_config.m_waitfor_timeout = timeout;
+
+ Uint32 queue = 0;
+ if (!iter.get(CFG_DEFAULT_OPERATION_REDO_PROBLEM_ACTION, &queue))
+ {
+ m_config.m_default_queue_option = queue;
+ }
}
DBUG_RETURN(init_nodes_vector(nodeId, config));
}
=== modified file 'storage/ndb/src/ndbapi/ndb_cluster_connection_impl.hpp'
--- a/storage/ndb/src/ndbapi/ndb_cluster_connection_impl.hpp 2010-11-15 09:23:10 +0000
+++ b/storage/ndb/src/ndbapi/ndb_cluster_connection_impl.hpp 2010-12-02 11:02:29 +0000
@@ -43,13 +43,15 @@ struct NdbApiConfig
m_scan_batch_size(MAX_SCAN_BATCH_SIZE),
m_batch_byte_size(SCAN_BATCH_SIZE),
m_batch_size(DEF_BATCH_SIZE),
- m_waitfor_timeout(120000)
+ m_waitfor_timeout(120000),
+ m_default_queue_option(0)
{}
Uint32 m_scan_batch_size;
Uint32 m_batch_byte_size;
Uint32 m_batch_size;
Uint32 m_waitfor_timeout; // in milli seconds...
+ Uint32 m_default_queue_option;
};
class Ndb_cluster_connection_impl : public Ndb_cluster_connection
=== modified file 'storage/ndb/src/ndbapi/ndberror.c'
--- a/storage/ndb/src/ndbapi/ndberror.c 2010-05-03 04:49:08 +0000
+++ b/storage/ndb/src/ndbapi/ndberror.c 2010-12-02 11:02:29 +0000
@@ -188,6 +188,7 @@ ErrorBundle ErrorCodes[] = {
{ 1217, DMEC, TR, "Out of operation records in local data manager (increase MaxNoOfLocalOperations)" },
{ 1218, DMEC, TR, "Send Buffers overloaded in NDB kernel" },
{ 1220, DMEC, TR, "REDO log files overloaded (increase FragmentLogFileSize)" },
+ { 1234, DMEC, TR, "REDO log files overloaded (increase disk hardware)" },
{ 1222, DMEC, TR, "Out of transaction markers in LQH" },
{ 4021, DMEC, TR, "Out of Send Buffer space in NDB API" },
{ 4022, DMEC, TR, "Out of Send Buffer space in NDB API" },
No bundle (reason: useless for push emails).
| Thread |
|---|
| • bzr push into mysql-5.1-telco-7.1 branch (jonas:4010 to 4011) | Jonas Oreland | 2 Dec |