List:Commits« Previous MessageNext Message »
From:Jonas Oreland Date:December 2 2010 11:14am
Subject:bzr push into mysql-5.1-telco-7.1 branch (jonas:4010 to 4011)
View as plain text  
 4011 Jonas Oreland	2010-12-02 [merge]
      ndb - merge 70 to 71

    modified:
      storage/ndb/include/kernel/signaldata/LqhKey.hpp
      storage/ndb/include/kernel/signaldata/TcKeyReq.hpp
      storage/ndb/include/mgmapi/mgmapi_config_parameters.h
      storage/ndb/include/ndbapi/NdbOperation.hpp
      storage/ndb/src/common/debugger/signaldata/LqhKey.cpp
      storage/ndb/src/common/debugger/signaldata/TcKeyReq.cpp
      storage/ndb/src/kernel/blocks/dblqh/Dblqh.hpp
      storage/ndb/src/kernel/blocks/dblqh/DblqhInit.cpp
      storage/ndb/src/kernel/blocks/dblqh/DblqhMain.cpp
      storage/ndb/src/kernel/blocks/dbtc/Dbtc.hpp
      storage/ndb/src/kernel/blocks/dbtc/DbtcMain.cpp
      storage/ndb/src/mgmsrv/ConfigInfo.cpp
      storage/ndb/src/ndbapi/NdbOperation.cpp
      storage/ndb/src/ndbapi/NdbOperationDefine.cpp
      storage/ndb/src/ndbapi/NdbOperationExec.cpp
      storage/ndb/src/ndbapi/NdbOperationInt.cpp
      storage/ndb/src/ndbapi/NdbScanOperation.cpp
      storage/ndb/src/ndbapi/ndb_cluster_connection.cpp
      storage/ndb/src/ndbapi/ndb_cluster_connection_impl.hpp
      storage/ndb/src/ndbapi/ndberror.c
 4010 Jonas Oreland	2010-12-02 [merge]
      ndb - merge 70 to 71

    modified:
      storage/ndb/src/kernel/blocks/dbtup/DbtupProxy.cpp
      storage/ndb/src/mgmsrv/testConfig.cpp
=== modified file 'storage/ndb/include/kernel/signaldata/LqhKey.hpp'
--- a/storage/ndb/include/kernel/signaldata/LqhKey.hpp	2010-11-01 16:11:10 +0000
+++ b/storage/ndb/include/kernel/signaldata/LqhKey.hpp	2010-12-02 11:02:29 +0000
@@ -151,6 +151,9 @@ private:
 
   static UintR getNrCopyFlag(const UintR & requestInfo);
   static void setNrCopyFlag(UintR & requestInfo, UintR val);
+
+  static UintR getQueueOnRedoProblemFlag(const UintR & requestInfo);
+  static void setQueueOnRedoProblemFlag(UintR & requestInfo, UintR val);
 };
 
 /**
@@ -177,6 +180,7 @@ private:
  * z = Use rowid for insert   - 1  Bit (31)
  * g = gci flag               - 1  Bit (12)
  * n = NR copy                - 1  Bit (13)
+ * q = Queue on redo problem  - 1  Bit (14)
 
  * Short LQHKEYREQ :
  *             1111111111222222222233
@@ -214,6 +218,7 @@ private:
 #define RI_ROWID_SHIFT       (31)
 #define RI_GCI_SHIFT         (12)
 #define RI_NR_COPY_SHIFT     (13)
+#define RI_QUEUE_REDO_SHIFT  (14)
 
 /**
  * Scan Info
@@ -585,6 +590,20 @@ table_version_major_lqhkeyreq(Uint32 x)
   return x & 0xFFFF;
 }
 
+
+inline
+void
+LqhKeyReq::setQueueOnRedoProblemFlag(UintR & requestInfo, UintR val){
+  ASSERT_BOOL(val, "LqhKeyReq::setQueueOnRedoProblem");
+  requestInfo |= (val << RI_QUEUE_REDO_SHIFT);
+}
+
+inline
+UintR
+LqhKeyReq::getQueueOnRedoProblemFlag(const UintR & requestInfo){
+  return (requestInfo >> RI_QUEUE_REDO_SHIFT) & 1;
+}
+
 class LqhKeyConf {
   /**
    * Reciver(s)

=== modified file 'storage/ndb/include/kernel/signaldata/TcKeyReq.hpp'
--- a/storage/ndb/include/kernel/signaldata/TcKeyReq.hpp	2010-11-10 12:28:34 +0000
+++ b/storage/ndb/include/kernel/signaldata/TcKeyReq.hpp	2010-12-02 11:02:29 +0000
@@ -202,6 +202,9 @@ private:
 
   static void setReorgFlag(UintR & requestInfo, UintR val);
   static UintR getReorgFlag(const UintR & requestInfo);
+
+  static void setQueueOnRedoProblemFlag(UintR & requestInfo, UintR val);
+  static UintR getQueueOnRedoProblemFlag(const UintR & requestInfo);
   /**
    * Set:ers for scanInfo
    */
@@ -230,6 +233,7 @@ private:
  y = Commit Type           - 2  Bit 12-13
  n = No disk flag          - 1  Bit 1
  r = reorg flag            - 1  Bit 19
+ q = Queue on redo problem - 1  Bit 9
 
            1111111111222222222233
  01234567890123456789012345678901
@@ -260,6 +264,7 @@ private:
 #define COMMIT_TYPE_MASK   (3)
 
 #define TC_REORG_SHIFT     (19)
+#define QUEUE_ON_REDO_SHIFT (9)
 
 /**
  * Scan Info
@@ -576,4 +581,17 @@ TcKeyReq::setReorgFlag(UintR & requestIn
   requestInfo |= (flag << TC_REORG_SHIFT);
 }
 
+inline
+UintR
+TcKeyReq::getQueueOnRedoProblemFlag(const UintR & requestInfo){
+  return (requestInfo >> QUEUE_ON_REDO_SHIFT) & 1;
+}
+
+inline
+void
+TcKeyReq::setQueueOnRedoProblemFlag(UintR & requestInfo, Uint32 flag){
+  ASSERT_BOOL(flag, "TcKeyReq::setNoDiskFlag");
+  requestInfo |= (flag << QUEUE_ON_REDO_SHIFT);
+}
+
 #endif

=== modified file 'storage/ndb/include/mgmapi/mgmapi_config_parameters.h'
--- a/storage/ndb/include/mgmapi/mgmapi_config_parameters.h	2010-10-28 13:24:21 +0000
+++ b/storage/ndb/include/mgmapi/mgmapi_config_parameters.h	2010-12-02 11:02:29 +0000
@@ -173,7 +173,8 @@
 #define CFG_DB_MAX_START_FAIL         609 /* For StopOnError=0 */
 #define CFG_DB_START_FAIL_DELAY_SECS  610 /* For StopOnError=0 */
 
-/* 611 & 612 reserved */
+#define CFG_DB_REDO_OVERCOMMIT_LIMIT  611
+#define CFG_DB_REDO_OVERCOMMIT_COUNTER 612
 
 #define CFG_DB_EVENTLOG_BUFFER_SIZE   613
 #define CFG_DB_NUMA                   614
@@ -248,6 +249,7 @@
 #define CFG_BATCH_SIZE                802
 #define CFG_AUTO_RECONNECT            803
 #define CFG_HB_THREAD_PRIO            804
+#define CFG_DEFAULT_OPERATION_REDO_PROBLEM_ACTION 805
 
 /**
  * Internal
@@ -272,4 +274,7 @@
 #define ARBIT_METHOD_DEFAULT          1
 #define ARBIT_METHOD_WAITEXTERNAL     2
 
+#define OPERATION_REDO_PROBLEM_ACTION_ABORT 0
+#define OPERATION_REDO_PROBLEM_ACTION_QUEUE 1
+
 #endif

=== modified file 'storage/ndb/include/ndbapi/NdbOperation.hpp'
--- a/storage/ndb/include/ndbapi/NdbOperation.hpp	2010-10-13 09:33:02 +0000
+++ b/storage/ndb/include/ndbapi/NdbOperation.hpp	2010-12-02 11:02:29 +0000
@@ -1039,7 +1039,10 @@ public:
                  OO_INTERPRETED  = 0x10,
                  OO_ANYVALUE     = 0x20,
                  OO_CUSTOMDATA   = 0x40,
-                 OO_LOCKHANDLE   = 0x80 };
+                 OO_LOCKHANDLE   = 0x80,
+                 OO_QUEUABLE     = 0x100,
+                 OO_NOT_QUEUABLE = 0x200
+    };
 
     /* An operation-specific abort option.
      * Only necessary if the default abortoption behaviour
@@ -1427,12 +1430,20 @@ protected:
                                  // Note that scan operations always have this
                                  // set true
   Int8  theDistrKeyIndicator_;    // Indicates whether distr. key is used
-  Uint8  m_no_disk_flag;          
-  /*
-    For NdbRecord, this flag indicates that we need to send the Event-attached
-    word set by setAnyValue().
-  */
-  Uint8 m_use_any_value;
+
+  enum OP_FLAGS {
+    OF_NO_DISK = 0x1,
+
+    /*
+      For NdbRecord, this flag indicates that we need to send the Event-attached
+      word set by setAnyValue().
+    */
+    OF_USE_ANY_VALUE = 0x2,
+    OF_QUEUEABLE = 0x4
+  };
+  Uint8  m_flags;
+
+  Uint8 _unused1;
 
   Uint16 m_tcReqGSN;
   Uint16 m_keyInfoGSN;

=== modified file 'storage/ndb/src/common/debugger/signaldata/LqhKey.cpp'
--- a/storage/ndb/src/common/debugger/signaldata/LqhKey.cpp	2009-05-27 15:21:45 +0000
+++ b/storage/ndb/src/common/debugger/signaldata/LqhKey.cpp	2010-12-02 11:02:29 +0000
@@ -59,6 +59,8 @@ printLQHKEYREQ(FILE * output, const Uint
     fprintf(output, "NrCopy ");
   if(LqhKeyReq::getGCIFlag(reqInfo))
     fprintf(output, "GCI ");
+  if(LqhKeyReq::getQueueOnRedoProblemFlag(reqInfo))
+    fprintf(output, "Queue ");
   
   fprintf(output, "ScanInfo/noFiredTriggers: H\'%x\n", sig->scanInfo);
   

=== modified file 'storage/ndb/src/common/debugger/signaldata/TcKeyReq.cpp'
--- a/storage/ndb/src/common/debugger/signaldata/TcKeyReq.cpp	2010-01-28 15:16:46 +0000
+++ b/storage/ndb/src/common/debugger/signaldata/TcKeyReq.cpp	2010-12-02 11:02:29 +0000
@@ -72,8 +72,11 @@ printTCKEYREQ(FILE * output, const Uint3
       fprintf(output, "Interpreted ");
     }
     if(sig->getDistributionKeyFlag(sig->requestInfo)){
-      fprintf(output, " d-key");
+      fprintf(output, "d-key ");
     }
+    if(sig->getQueueOnRedoProblemFlag(sig->requestInfo))
+      fprintf(output, "Queue ");
+
     fprintf(output, "\n");
   }
   

=== modified file 'storage/ndb/src/kernel/blocks/dblqh/Dblqh.hpp'
--- a/storage/ndb/src/kernel/blocks/dblqh/Dblqh.hpp	2010-10-28 12:59:31 +0000
+++ b/storage/ndb/src/kernel/blocks/dblqh/Dblqh.hpp	2010-12-02 11:02:29 +0000
@@ -316,6 +316,7 @@ class Lgman;
 #define ZDROP_TABLE_IN_PROGRESS 1226
 #define ZINVALID_SCHEMA_VERSION 1227
 #define ZTABLE_READ_ONLY 1233
+#define ZREDO_IO_PROBLEM 1234
 
 /* ------------------------------------------------------------------------- */
 /*       ERROR CODES ADDED IN VERSION 2.X                                    */
@@ -1022,6 +1023,54 @@ public:
     Uint32 m_outstanding;
   }; // Size 76 bytes
   typedef Ptr<LcpRecord> LcpRecordPtr;
+
+  struct IOTracker
+  {
+    STATIC_CONST( SAMPLE_TIME = 128 );              // millis
+    STATIC_CONST( SLIDING_WINDOW_LEN = 1024 );      // millis
+    STATIC_CONST( SLIDING_WINDOW_HISTORY_LEN = 8 );
+
+    void init(Uint32 partNo);
+    Uint32 m_log_part_no;
+    Uint32 m_current_time;
+
+    /**
+     * Keep sliding window of measurement
+     */
+    Uint32 m_save_pos; // current pos in array
+    Uint32 m_save_written_bytes[SLIDING_WINDOW_HISTORY_LEN];
+    Uint32 m_save_elapsed_millis[SLIDING_WINDOW_HISTORY_LEN];
+
+    /**
+     * Current sum of sliding window
+     */
+    Uint32 m_curr_written_bytes;
+    Uint32 m_curr_elapsed_millis;
+
+    /**
+     * Currently outstanding bytes
+     */
+    Uint32 m_sum_outstanding_bytes;
+
+    /**
+     * How many times did we pass lag-threshold
+     */
+    Uint32 m_lag_cnt;
+
+    /**
+     * bytes send during current sample
+     */
+    Uint32 m_sample_sent_bytes;
+
+    /**
+     * bytes completed during current sample
+     */
+    Uint32 m_sample_completed_bytes;
+
+    int tick(Uint32 now, Uint32 maxlag, Uint32 maxlag_cnt);
+    void send_io(Uint32 bytes);
+    void complete_io(Uint32 bytes);
+  };
     
   /* $$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$ */
   /* $$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$ */
@@ -1062,11 +1111,7 @@ public:
       SR_THIRD_PHASE_COMPLETED = 5,
       SR_FOURTH_PHASE_STARTED = 6,    ///< Finding the log tail and head 
                                       ///< is the fourth phase.
-      SR_FOURTH_PHASE_COMPLETED = 7,
-      FILE_CHANGE_PROBLEM = 8         ///< For some reason the write to 
-                                      ///< page zero in file zero have not   
-                                      ///< finished after 15 mbyte of 
-                                      ///< log data have been written
+      SR_FOURTH_PHASE_COMPLETED = 7
     };
     enum WaitWriteGciLog {
       WWGL_TRUE = 0,
@@ -1128,10 +1173,6 @@ public:
      *       Contains a reference to the first log file, file number 0.
      */
     UintR firstLogfile;
-    /**
-     *       The head of the operations queued for logging.
-     */
-    UintR firstLogQueue;
     /** 
      *       This variable contains the oldest operation in this log
      *       part which have not been committed yet.
@@ -1154,10 +1195,25 @@ public:
      *       are in memory and which are not.
      */
     UintR lastPageRef;
+
+    struct OperationQueue
+    {
+      void init() { firstElement = lastElement = RNIL;}
+      bool isEmpty() const { return firstElement == RNIL; }
+      Uint32 firstElement;
+      Uint32 lastElement;
+    };
+
     /**
-     *       The tail of the operations queued for logging.                   
+     * operations queued waiting on REDO to prepare
      */
-    UintR lastLogQueue;
+    struct OperationQueue m_log_prepare_queue;
+
+    /**
+     * operations queued waiting on REDO to commit/abort
+     */
+    struct OperationQueue m_log_complete_queue;
+
     /**
      *       This variable contains the newest operation in this log
      *       part which have not been committed yet.
@@ -1202,7 +1258,12 @@ public:
     /**
      * does current log-part have tail-problem (i.e 410)
      */
-    bool m_tail_problem;
+    enum {
+      P_TAIL_PROBLEM        = 0x1,// 410
+      P_REDO_IO_PROBLEM     = 0x2,// 1234
+      P_FILE_CHANGE_PROBLEM = 0x4 // 1220
+    };
+    Uint32 m_log_problems;
 
     /**
      *       A timer that is set every time a log page is sent to disk.
@@ -1354,6 +1415,11 @@ public:
      *       For MT LQH the log part (0-3).
      */
     Uint16 logPartNo;
+
+    /**
+     * IO tracker...
+     */
+    struct IOTracker m_io_tracker;
   }; // Size 164 Bytes
   typedef Ptr<LogPartRecord> LogPartRecordPtr;
   
@@ -2340,7 +2406,7 @@ private:
                    LogFileRecordPtr* parLogFilePtr);
   void findPageRef(Signal* signal, CommitLogRecord* commitLogRecord);
   int  findTransaction(UintR Transid1, UintR Transid2, UintR TcOprec);
-  void getFirstInLogQueue(Signal* signal);
+  void getFirstInLogQueue(Signal* signal, Ptr<TcConnectionrec>&dst);
   bool getFragmentrec(Signal* signal, Uint32 fragId);
   void initialiseAddfragrec(Signal* signal);
   void initialiseFragrec(Signal* signal);
@@ -2372,7 +2438,7 @@ private:
   void initReqinfoExecSr(Signal* signal);
   bool insertFragrec(Signal* signal, Uint32 fragId);
   void linkFragQueue(Signal* signal);
-  void linkWaitLog(Signal* signal, LogPartRecordPtr regLogPartPtr);
+  void linkWaitLog(Signal*, LogPartRecordPtr, LogPartRecord::OperationQueue &);
   void logNextStart(Signal* signal);
   void moveToPageRef(Signal* signal);
   void readAttrinfo(Signal* signal);
@@ -2464,6 +2530,8 @@ private:
   void abort_scan(Signal* signal, Uint32 scan_ptr_i, Uint32 errcode);
   void localAbortStateHandlerLab(Signal* signal);
   void logLqhkeyreqLab(Signal* signal);
+  void logLqhkeyreqLab_problems(Signal* signal);
+  void update_log_problem(Signal*, LogPartRecordPtr, Uint32 problem, bool);
   void lqhAttrinfoLab(Signal* signal, Uint32* dataPtr, Uint32 length);
   void rwConcludedAiLab(Signal* signal);
   void aiStateErrorCheckLab(Signal* signal, Uint32* dataPtr, Uint32 length);
@@ -3163,6 +3231,8 @@ public:
 
   } c_Counters;
 
+  Uint32 c_max_redo_lag;
+  Uint32 c_max_redo_lag_counter;
   Uint64 cTotalLqhKeyReqCount;
 
   inline bool getAllowRead() const {

=== modified file 'storage/ndb/src/kernel/blocks/dblqh/DblqhInit.cpp'
--- a/storage/ndb/src/kernel/blocks/dblqh/DblqhInit.cpp	2010-03-10 07:43:06 +0000
+++ b/storage/ndb/src/kernel/blocks/dblqh/DblqhInit.cpp	2010-12-02 11:02:29 +0000
@@ -87,6 +87,8 @@ void Dblqh::initData() 
   c_free_mb_tail_problem_limit = 4;
 
   cTotalLqhKeyReqCount = 0;
+  c_max_redo_lag = 30; // seconds
+  c_max_redo_lag_counter = 3; // 3 strikes and you're out
 }//Dblqh::initData()
 
 void Dblqh::initRecords() 

=== modified file 'storage/ndb/src/kernel/blocks/dblqh/DblqhMain.cpp'
--- a/storage/ndb/src/kernel/blocks/dblqh/DblqhMain.cpp	2010-11-24 12:16:55 +0000
+++ b/storage/ndb/src/kernel/blocks/dblqh/DblqhMain.cpp	2010-12-02 11:02:29 +0000
@@ -79,6 +79,9 @@
 #include "../suma/Suma.hpp"
 #include "DblqhCommon.hpp"
 
+#include <EventLogger.hpp>
+extern EventLogger * g_eventLogger;
+
 // Use DEBUG to print messages that should be
 // seen only when we debug the product
 #ifdef VM_TRACE
@@ -286,94 +289,126 @@ void Dblqh::execCONTINUEB(Signal* signal
     return;
   }//if
 #endif
+  LogPartRecordPtr save;
   switch (tcase) {
   case ZLOG_LQHKEYREQ:
     if (cnoOfLogPages == 0) {
       jam();
+  busywait:
       sendSignalWithDelay(cownref, GSN_CONTINUEB, signal, 10, 2);
       return;
     }//if
     logPartPtr.i = data0;
     ptrCheckGuard(logPartPtr, clogPartFileSize, logPartRecord);
+    save = logPartPtr;
+
+    logPartPtr.p->LogLqhKeyReqSent = ZFALSE;
+
+    if (logPartPtr.p->waitWriteGciLog == LogPartRecord::WWGL_TRUE)
+    {
+      jam();
+      goto startnext;
+    }
+    if (logPartPtr.p->m_log_complete_queue.isEmpty())
+    {
+      jam();
+      /**
+       * prepare is first in queue...check that it's ok to rock'n'roll
+       */
+      if (logPartPtr.p->m_log_problems != 0)
+      {
+        /**
+         * It will be restarted when problems are cleared...
+         */
+        jam();
+        return;
+      }
+
+      if (cnoOfLogPages < ZMIN_LOG_PAGES_OPERATION)
+      {
+        jam();
+        logPartPtr.p->LogLqhKeyReqSent = ZTRUE;
+        goto busywait;
+      }
+    }
+
     logFilePtr.i = logPartPtr.p->currentLogfile;
     ptrCheckGuard(logFilePtr, clogFileFileSize, logFileRecord);
     logPagePtr.i = logFilePtr.p->currentLogpage;
     ptrCheckGuard(logPagePtr, clogPageFileSize, logPageRecord);
 
-    tcConnectptr.i = logPartPtr.p->firstLogQueue;
-    ptrCheckGuard(tcConnectptr, ctcConnectrecFileSize, tcConnectionrec);
+    getFirstInLogQueue(signal, tcConnectptr);
     fragptr.i = tcConnectptr.p->fragmentptr;
     c_fragment_pool.getPtr(fragptr);
-    logPartPtr.p->LogLqhKeyReqSent = ZFALSE;
-    getFirstInLogQueue(signal);
 
+    // so that operation can continue...
+    ndbrequire(logPartPtr.p->logPartState == LogPartRecord::ACTIVE);
+    logPartPtr.p->logPartState = LogPartRecord::IDLE;
     switch (tcConnectptr.p->transactionState) {
     case TcConnectionrec::LOG_QUEUED:
-      if (tcConnectptr.p->abortState != TcConnectionrec::ABORT_IDLE) {
+      if (tcConnectptr.p->abortState != TcConnectionrec::ABORT_IDLE)
+      {
         jam();
-        logNextStart(signal);
         abortCommonLab(signal);
-        return;
-      } else {
+      }
+      else
+      {
         jam();
-/*------------------------------------------------------------*/
-/*       WE MUST SET THE STATE OF THE LOG PART TO IDLE TO     */
-/*       ENSURE THAT WE ARE NOT QUEUED AGAIN ON THE LOG PART  */
-/*       WE WILL SET THE LOG PART STATE TO ACTIVE IMMEDIATELY */
-/*       SO NO OTHER PROCESS WILL SEE THIS STATE. IT IS MERELY*/
-/*       USED TO ENABLE REUSE OF CODE.                        */
-/*------------------------------------------------------------*/
-        if (logPartPtr.p->logPartState == LogPartRecord::ACTIVE) {
-          jam();
-          logPartPtr.p->logPartState = LogPartRecord::IDLE;
-        }//if
         logLqhkeyreqLab(signal);
-        return;
-      }//if
+      }
       break;
     case TcConnectionrec::LOG_ABORT_QUEUED:
       jam();
       writeAbortLog(signal);
       removeLogTcrec(signal);
-      logNextStart(signal);
       continueAfterLogAbortWriteLab(signal);
-      return;
       break;
     case TcConnectionrec::LOG_COMMIT_QUEUED:
     case TcConnectionrec::LOG_COMMIT_QUEUED_WAIT_SIGNAL:
       jam();
       writeCommitLog(signal, logPartPtr);
-      logNextStart(signal);
       if (tcConnectptr.p->transactionState == TcConnectionrec::LOG_COMMIT_QUEUED) {
         if (tcConnectptr.p->seqNoReplica == 0 ||
-	    tcConnectptr.p->activeCreat == Fragrecord::AC_NR_COPY) {
+	    tcConnectptr.p->activeCreat == Fragrecord::AC_NR_COPY)
+        {
           jam();
           localCommitLab(signal);
-        } else {
+        }
+        else
+        {
           jam();
           commitReplyLab(signal);
-        }//if
-        return;
-      } else {
+        }
+      }
+      else
+      {
         jam();
         tcConnectptr.p->transactionState = TcConnectionrec::LOG_COMMIT_WRITTEN_WAIT_SIGNAL;
-        return;
-      }//if
+      }
       break;
     case TcConnectionrec::COMMIT_QUEUED:
       jam();
-      logNextStart(signal);
       localCommitLab(signal);
       break;
     case TcConnectionrec::ABORT_QUEUED:
       jam();
-      logNextStart(signal);
       abortCommonLab(signal);
       break;
     default:
       ndbrequire(false);
       break;
     }//switch
+
+    /**
+     * LogFile/LogPage could have altered due to above
+     */
+  startnext:
+    logPartPtr = save;
+    logFilePtr.i = logPartPtr.p->currentLogfile;
+    ptrCheckGuard(logFilePtr, clogFileFileSize, logFileRecord);
+    logPagePtr.i = logFilePtr.p->currentLogpage;
+    ptrCheckGuard(logPagePtr, clogPageFileSize, logPageRecord);
+    logNextStart(signal);
     return;
     break;
   case ZSR_GCI_LIMITS:
@@ -751,6 +786,7 @@ void Dblqh::execNDB_STTOR(Signal* signal
     // Dont setReturnedReadLenAIFlag
     // Dont setAPIVersion
     LqhKeyReq::setMarkerFlag(preComputedRequestInfoMask, 1);
+    LqhKeyReq::setQueueOnRedoProblemFlag(preComputedRequestInfoMask, 1);
     //preComputedRequestInfoMask = 0x003d7fff;
     startphase1Lab(signal, /* dummy */ ~0, ownNodeId);
 
@@ -1288,6 +1324,13 @@ void Dblqh::execREAD_CONFIG_REQ(Signal* 
   initRecords();
   initialiseRecordsLab(signal, 0, ref, senderData);
 
+  c_max_redo_lag = 30;
+  ndb_mgm_get_int_parameter(p, CFG_DB_REDO_OVERCOMMIT_LIMIT,
+                            &c_max_redo_lag);
+
+  c_max_redo_lag_counter = 3;
+  ndb_mgm_get_int_parameter(p, CFG_DB_REDO_OVERCOMMIT_COUNTER,
+                            &c_max_redo_lag_counter);
   return;
 }//Dblqh::execSIZEALT_REP()
 
@@ -2756,12 +2799,40 @@ Dblqh::wait_readonly(Signal* signal)
 void Dblqh::execTIME_SIGNAL(Signal* signal)
 {
   jamEntry();
-  cLqhTimeOutCount++;
-  cLqhTimeOutCheckCount++;
+
+  cLqhTimeOutCount ++;
+  cLqhTimeOutCheckCount ++;
+
+  for (logPartPtr.i = 0; logPartPtr.i < clogPartFileSize; logPartPtr.i++)
+  {
+    jam();
+    ptrCheckGuard(logPartPtr, clogPartFileSize, logPartRecord);
+    int ret = logPartPtr.p->m_io_tracker.tick(10 * cLqhTimeOutCount,
+                                              c_max_redo_lag,
+                                              c_max_redo_lag_counter);
+    if (ret < 0)
+    {
+      /**
+       * set problem
+       */
+      update_log_problem(signal, logPartPtr,
+                         LogPartRecord::P_REDO_IO_PROBLEM, true);
+    }
+    else if (ret > 0)
+    {
+      /**
+       * clear
+       */
+      update_log_problem(signal, logPartPtr,
+                         LogPartRecord::P_REDO_IO_PROBLEM, false);
+    }
+  }
+
   if (cLqhTimeOutCheckCount < 1000) {
     jam();
     return;
   }//if
+
   cLqhTimeOutCheckCount = 0;
 #ifdef VM_TRACE
   TcConnectionrecPtr tTcConptr;
@@ -6132,26 +6203,14 @@ void Dblqh::logLqhkeyreqLab(Signal* sign
   UintR tcurrentFilepage;
   TcConnectionrecPtr tmpTcConnectptr;
 
-  if (unlikely(cnoOfLogPages < ZMIN_LOG_PAGES_OPERATION) || 
-      ERROR_INSERTED(5032))
-  {
-    jam();
-    if(ERROR_INSERTED(5032)){
-      CLEAR_ERROR_INSERT_VALUE;
-    }
-/*---------------------------------------------------------------------------*/
-// The log disk is having problems in catching up with the speed of execution. 
-// We must wait with writing the log of this operation to ensure we do not 
-// overload the log.
-/*---------------------------------------------------------------------------*/
-    terrorCode = ZTEMPORARY_REDO_LOG_FAILURE;
-    abortErrorLab(signal);
-    return;
-  }//if
+  const bool out_of_log_buffer = cnoOfLogPages < ZMIN_LOG_PAGES_OPERATION;
 
   TcConnectionrec * const regTcPtr = tcConnectptr.p;
   logPartPtr.i = regTcPtr->m_log_part_ptr_i;
   ptrCheckGuard(logPartPtr, clogPartFileSize, logPartRecord);
+  bool abort_on_redo_problems =
+    (LqhKeyReq::getQueueOnRedoProblemFlag(regTcPtr->reqinfo) == 0);
+
 /* -------------------------------------------------- */
 /*       THIS PART IS USED TO WRITE THE LOG           */
 /* -------------------------------------------------- */
@@ -6161,69 +6220,38 @@ void Dblqh::logLqhkeyreqLab(Signal* sign
 /*       RESTART WHEN THE LOG PART IS FREE AGAIN.     */
 /* -------------------------------------------------- */
   LogPartRecord * const regLogPartPtr = logPartPtr.p;
-
-  if (unlikely(regLogPartPtr->m_tail_problem))
+  const bool problem = out_of_log_buffer || regLogPartPtr->m_log_problems != 0;
+  if (unlikely(problem))
   {
-    jam();
-    terrorCode = ZTAIL_PROBLEM_IN_LOG_ERROR;
-    abortErrorLab(signal);
-    return;
-  }
-
-  if(ERROR_INSERTED(5033)){
-    jam();
-    CLEAR_ERROR_INSERT_VALUE;
-
-    if ((regLogPartPtr->firstLogQueue != RNIL) &&
-        (regLogPartPtr->LogLqhKeyReqSent == ZFALSE)) {
-      /* -------------------------------------------------- */
-      /*       WE HAVE A PROBLEM IN THAT THE LOG HAS NO     */
-      /*       ROOM FOR ADDITIONAL OPERATIONS AT THE MOMENT.*/
-      /* -------------------------------------------------- */
-      /* -------------------------------------------------- */
-      /*       WE MUST STILL RESTART QUEUED OPERATIONS SO   */
-      /*       THEY ALSO CAN BE ABORTED.                    */
-      /* -------------------------------------------------- */
-      regLogPartPtr->LogLqhKeyReqSent = ZTRUE;
-      signal->theData[0] = ZLOG_LQHKEYREQ;
-      signal->theData[1] = logPartPtr.i;
-      sendSignal(cownref, GSN_CONTINUEB, signal, 2, JBB);
-    }//if
-    
-    terrorCode = ZTAIL_PROBLEM_IN_LOG_ERROR;
-    abortErrorLab(signal);
-    return;
+    if (abort_on_redo_problems)
+    {
+      logLqhkeyreqLab_problems(signal);
+      return;
+    }
+    else
+    {
+      goto queueop;
+    }
   }
   
-  if (regLogPartPtr->logPartState == LogPartRecord::IDLE) {
+  if (regLogPartPtr->logPartState == LogPartRecord::IDLE)
+  {
     ;
-  } else if (regLogPartPtr->logPartState == LogPartRecord::ACTIVE) {
+  }
+  else if (regLogPartPtr->logPartState == LogPartRecord::ACTIVE)
+  {
+queueop:
     jam();
-    linkWaitLog(signal, logPartPtr);
+    linkWaitLog(signal, logPartPtr, logPartPtr.p->m_log_prepare_queue);
     regTcPtr->transactionState = TcConnectionrec::LOG_QUEUED;
     return;
-  } else {
-    if ((regLogPartPtr->firstLogQueue != RNIL) &&
-        (regLogPartPtr->LogLqhKeyReqSent == ZFALSE)) {
-/* -------------------------------------------------- */
-/*       WE HAVE A PROBLEM IN THAT THE LOG HAS NO     */
-/*       ROOM FOR ADDITIONAL OPERATIONS AT THE MOMENT.*/
-/* -------------------------------------------------- */
-/* -------------------------------------------------- */
-/*       WE MUST STILL RESTART QUEUED OPERATIONS SO   */
-/*       THEY ALSO CAN BE ABORTED.                    */
-/* -------------------------------------------------- */
-      regLogPartPtr->LogLqhKeyReqSent = ZTRUE;
-      signal->theData[0] = ZLOG_LQHKEYREQ;
-      signal->theData[1] = logPartPtr.i;
-      sendSignal(cownref, GSN_CONTINUEB, signal, 2, JBB);
-    }//if
-    ndbrequire(regLogPartPtr->logPartState == LogPartRecord::FILE_CHANGE_PROBLEM);
-    terrorCode = ZFILE_CHANGE_PROBLEM_IN_LOG_ERROR;
-    abortErrorLab(signal);
+  }
+  else
+  {
+    ndbrequire(false);
     return;
   }//if
-  regLogPartPtr->logPartState = LogPartRecord::ACTIVE;
+
   logFilePtr.i = regLogPartPtr->currentLogfile;
   ptrCheckGuard(logFilePtr, clogFileFileSize, logFileRecord);
 /* -------------------------------------------------- */
@@ -6279,7 +6307,6 @@ void Dblqh::logLqhkeyreqLab(Signal* sign
 /* -------------------------------------------------- */
   writeAttrinfoLab(signal);
 
-  logNextStart(signal);
 /* -------------------------------------------------- */
 /*       RESET THE STATE OF THE LOG PART. IF ANY      */
 /*       OPERATIONS HAVE QUEUED THEN START THE FIRST  */
@@ -6321,6 +6348,80 @@ void Dblqh::logLqhkeyreqLab(Signal* sign
   }//if
 }//Dblqh::logLqhkeyreqLab()
 
+void
+Dblqh::logLqhkeyreqLab_problems(Signal * signal)
+{
+  jam();
+  LogPartRecord * const regLogPartPtr = logPartPtr.p;
+  Uint32 problems = regLogPartPtr->m_log_problems;
+
+  if (cnoOfLogPages < ZMIN_LOG_PAGES_OPERATION)
+  {
+    jam();
+    terrorCode = ZTEMPORARY_REDO_LOG_FAILURE;
+  }
+  else if ((problems & LogPartRecord::P_TAIL_PROBLEM) != 0)
+  {
+    jam();
+    terrorCode = ZTAIL_PROBLEM_IN_LOG_ERROR;
+  }
+  else if ((problems & LogPartRecord::P_REDO_IO_PROBLEM) != 0)
+  {
+    jam();
+    terrorCode = ZREDO_IO_PROBLEM;
+  }
+  else if ((problems & LogPartRecord::P_FILE_CHANGE_PROBLEM) != 0)
+  {
+    jam();
+    terrorCode = ZFILE_CHANGE_PROBLEM_IN_LOG_ERROR;
+  }
+  abortErrorLab(signal);
+}
+
+void
+Dblqh::update_log_problem(Signal* signal, Ptr<LogPartRecord> partPtr,
+                          Uint32 problem, bool value)
+{
+  Uint32 problems = partPtr.p->m_log_problems;
+  if (value)
+  {
+    /**
+     * set
+     */
+    jam();
+    if ((problems & problem) == 0)
+    {
+      jam();
+      problems |= problem;
+    }
+  }
+  else
+  {
+    /**
+     * clear
+     */
+    jam();
+    if ((problems & problem) != 0)
+    {
+      jam();
+      problems &= ~(Uint32)problem;
+
+      if (partPtr.p->LogLqhKeyReqSent == ZFALSE &&
+          (!partPtr.p->m_log_prepare_queue.isEmpty() ||
+           !partPtr.p->m_log_complete_queue.isEmpty()))
+      {
+        jam();
+
+        partPtr.p->LogLqhKeyReqSent = ZTRUE;
+        signal->theData[0] = ZLOG_LQHKEYREQ;
+        signal->theData[1] = partPtr.i;
+        sendSignal(cownref, GSN_CONTINUEB, signal, 2, JBB);
+      }
+    }
+  }
+  partPtr.p->m_log_problems = problems;
+}
+
 /* ------------------------------------------------------------------------- */
 /* -------                        SEND LQHKEYREQ                             */
 /*                                                                           */
@@ -7585,8 +7686,9 @@ void Dblqh::execLQH_WRITELOG_REQ(Signal*
     jam();
     regLogPartPtr.i = regTcPtr->m_log_part_ptr_i;
     ptrCheckGuard(regLogPartPtr, clogPartFileSize, logPartRecord);
-    if ((regLogPartPtr.p->logPartState == LogPartRecord::ACTIVE) ||
-        (noOfLogPages == 0)) {
+    if (!regLogPartPtr.p->m_log_complete_queue.isEmpty() ||
+        (noOfLogPages == 0))
+    {
       jam();
 /*---------------------------------------------------------------------------*/
 /* THIS LOG PART WAS CURRENTLY ACTIVE WRITING ANOTHER LOG RECORD. WE MUST    */
@@ -7597,7 +7699,7 @@ void Dblqh::execLQH_WRITELOG_REQ(Signal*
 // log part to ensure that we don't get a buffer explosion in the delayed
 // signal buffer instead.
 /*---------------------------------------------------------------------------*/
-      linkWaitLog(signal, regLogPartPtr);
+      linkWaitLog(signal, regLogPartPtr, regLogPartPtr.p->m_log_complete_queue);
       if (transState == TcConnectionrec::PREPARED) {
         jam();
         regTcPtr->transactionState = TcConnectionrec::LOG_COMMIT_QUEUED_WAIT_SIGNAL;
@@ -7606,10 +7708,6 @@ void Dblqh::execLQH_WRITELOG_REQ(Signal*
         ndbrequire(transState == TcConnectionrec::PREPARED_RECEIVED_COMMIT);
         regTcPtr->transactionState = TcConnectionrec::LOG_COMMIT_QUEUED;
       }//if
-      if (regLogPartPtr.p->logPartState == LogPartRecord::IDLE) {
-        jam();
-        regLogPartPtr.p->logPartState = LogPartRecord::ACTIVE;
-      }//if
       return;
     }//if
     writeCommitLog(signal, regLogPartPtr);
@@ -8588,7 +8686,9 @@ void Dblqh::continueAbortLab(Signal* sig
      * TRANSACTION.
      * ---------------------------------------------------------------------- */
     initLogPointers(signal);
-    if (logPartPtr.p->logPartState == LogPartRecord::ACTIVE) {
+    if (cnoOfLogPages == 0 ||
+        !logPartPtr.p->m_log_complete_queue.isEmpty())
+    {
       jam();
       /* --------------------------------------------------------------------
        * A PREPARE OPERATION IS CURRENTLY WRITING IN THE LOG. 
@@ -8596,24 +8696,8 @@ void Dblqh::continueAbortLab(Signal* sig
        * IT IS NECESSARY TO WRITE ONE LOG RECORD COMPLETELY 
        * AT A TIME OTHERWISE WE WILL SCRAMBLE THE LOG.
        * -------------------------------------------------------------------- */
-      linkWaitLog(signal, logPartPtr);
-      regTcPtr->transactionState = TcConnectionrec::LOG_ABORT_QUEUED;
-      return;
-    }//if
-    if (cnoOfLogPages == 0) {
-      jam();
-/*---------------------------------------------------------------------------*/
-// We must delay the write of commit info to the log to safe-guard against
-// a crash due to lack of log pages. We temporary stop all log writes to this
-// log part to ensure that we don't get a buffer explosion in the delayed
-// signal buffer instead.
-/*---------------------------------------------------------------------------*/
-      linkWaitLog(signal, logPartPtr);
+      linkWaitLog(signal, logPartPtr, logPartPtr.p->m_log_complete_queue);
       regTcPtr->transactionState = TcConnectionrec::LOG_ABORT_QUEUED;
-      if (logPartPtr.p->logPartState == LogPartRecord::IDLE) {
-        jam();
-        logPartPtr.p->logPartState = LogPartRecord::ACTIVE;
-      }//if
       return;
     }//if
     writeAbortLog(signal);
@@ -13692,7 +13776,8 @@ retry:
       if (tailmoved && mb > c_free_mb_tail_problem_limit)
       {
         jam();
-        sltLogPartPtr.p->m_tail_problem = false;
+        update_log_problem(signal, sltLogPartPtr,
+                           LogPartRecord::P_TAIL_PROBLEM, false);
       }
       else if (!tailmoved && mb <= c_free_mb_force_lcp_limit)
       {
@@ -13941,6 +14026,14 @@ void Dblqh::execGCP_SAVEREQ(Signal* sign
       jam();
       logPartPtr.p->waitWriteGciLog = LogPartRecord::WWGL_TRUE;
       tlogActive = true;
+      if (logPartPtr.p->LogLqhKeyReqSent == ZFALSE)
+      {
+        jam();
+        logPartPtr.p->LogLqhKeyReqSent = ZTRUE;
+        signal->theData[0] = ZLOG_LQHKEYREQ;
+        signal->theData[1] = logPartPtr.i;
+        sendSignal(cownref, GSN_CONTINUEB, signal, 2, JBB);
+      }
     } else {
       jam();
       logPartPtr.p->waitWriteGciLog = LogPartRecord::WWGL_FALSE;
@@ -14690,6 +14783,13 @@ void Dblqh::initFsrwconf(Signal* signal,
   logP= logPagePtr;
   noPages= 1;
   ndbassert(totPages > 0);
+
+  if (write)
+  {
+    Uint32 bytesWritten = totPages * 32768;
+    logPartPtr.p->m_io_tracker.complete_io(bytesWritten);
+  }
+
   for (;;)
   {
     logP.p->logPageWord[ZPOS_IN_WRITING]= 0;
@@ -14703,6 +14803,7 @@ void Dblqh::initFsrwconf(Signal* signal,
     ptrCheckGuard(logP, clogPageFileSize, logPageRecord);
     noPages++;
   }
+
 }//Dblqh::initFsrwconf()
 
 /* ######################################################################### */
@@ -14730,32 +14831,8 @@ void Dblqh::timeSup(Signal* signal) 
   ptrCheckGuard(logPagePtr, clogPageFileSize, logPageRecord);
   if (logPartPtr.p->logPartTimer != logPartPtr.p->logTimer) {
     jam();
-/*--------------------------------------------------------------------------*/
-/*       THIS LOG PART HAS NOT WRITTEN TO DISK DURING THE LAST SECOND.      */
-/*--------------------------------------------------------------------------*/
-    switch (logPartPtr.p->logPartState) {
-    case LogPartRecord::FILE_CHANGE_PROBLEM:
-      jam();
-/*--------------------------------------------------------------------------*/
-/*       THIS LOG PART HAS PROBLEMS IN CHANGING FILES MAKING IT IMPOSSIBLE  */
-//       TO WRITE TO THE FILE CURRENTLY. WE WILL COMEBACK LATER AND SEE IF
-//       THE PROBLEM HAS BEEN FIXED.
-/*--------------------------------------------------------------------------*/
-    case LogPartRecord::ACTIVE:
-      jam();
-/*---------------------------------------------------------------------------*/
-/* AN OPERATION IS CURRENTLY ACTIVE IN WRITING THIS LOG PART. WE THUS CANNOT */
-/* WRITE ANYTHING TO DISK AT THIS MOMENT. WE WILL SEND A SIGNAL DELAYED FOR  */
-/* 10 MS AND THEN TRY AGAIN. POSSIBLY THE LOG PART WILL HAVE BEEN WRITTEN    */
-/* UNTIL THEN OR ELSE IT SHOULD BE FREE TO WRITE AGAIN.                      */
-/*---------------------------------------------------------------------------*/
-      signal->theData[0] = ZTIME_SUPERVISION;
-      signal->theData[1] = logPartPtr.i;
-      sendSignalWithDelay(cownref, GSN_CONTINUEB, signal, 10, 2);
-      return;
-      break;
-    case LogPartRecord::IDLE:
-      jam();
+    if (true) // less merge conflicts
+    {
 /*---------------------------------------------------------------------------*/
 /* IDLE AND NOT WRITTEN TO DISK IN A SECOND. ALSO WHEN WE HAVE A TAIL PROBLEM*/
 /* WE HAVE TO WRITE TO DISK AT TIMES. WE WILL FIRST CHECK WHETHER ANYTHING   */
@@ -14817,12 +14894,9 @@ void Dblqh::timeSup(Signal* signal) 
           }//if
         }//if
       }//if
-      break;
-    default:
-      ndbrequire(false);
-      break;
-    }//switch
-  }//if
+    }
+  }
+
   logPartPtr.p->logTimer++;
   return;
 }//Dblqh::timeSup()
@@ -15049,29 +15123,14 @@ void Dblqh::lastWriteInFileLab(Signal* s
 
 void Dblqh::writePageZeroLab(Signal* signal, Uint32 from) 
 {
-  if (logPartPtr.p->logPartState == LogPartRecord::FILE_CHANGE_PROBLEM) 
+  if ((logPartPtr.p->m_log_problems & LogPartRecord::P_FILE_CHANGE_PROBLEM)!= 0)
   {
-    if (logPartPtr.p->firstLogQueue == RNIL) 
-    {
-      jam();
-      logPartPtr.p->logPartState = LogPartRecord::IDLE;
-    } 
-    else 
-    {
-      jam();
-      logPartPtr.p->logPartState = LogPartRecord::ACTIVE;
-      if (logPartPtr.p->LogLqhKeyReqSent == ZFALSE)
-      {
-        jam();
-
-        logPartPtr.p->LogLqhKeyReqSent = ZTRUE;
-        signal->theData[0] = ZLOG_LQHKEYREQ;
-        signal->theData[1] = logPartPtr.i;
-        sendSignal(cownref, GSN_CONTINUEB, signal, 2, JBB);
-      }
-    }
+    jam();
+    update_log_problem(signal, logPartPtr,
+                       LogPartRecord::P_FILE_CHANGE_PROBLEM,
+                       /* clear */ false);
   }
-  
+
   logFilePtr.p->fileChangeState = LogFileRecord::NOT_ONGOING;
 
 /*---------------------------------------------------------------------------*/
@@ -15797,6 +15856,8 @@ void Dblqh::writeSinglePage(Signal* sign
 
   ndbrequire(logFilePtr.p->fileRef != RNIL);
 
+  logPartPtr.p->m_io_tracker.send_io(32768);
+
   if (DEBUG_REDO)
   {
     ndbout_c("writeSingle 1 page at part: %u file: %u pos: %u",
@@ -16499,7 +16560,8 @@ Dblqh::rebuildOrderedIndexes(Signal* sig
       if (mb <= c_free_mb_tail_problem_limit)
       {
         jam();
-        logPartPtr.p->m_tail_problem = true;
+        update_log_problem(signal, logPartPtr,
+                           LogPartRecord::P_TAIL_PROBLEM, true);
       }
     }
     
@@ -19195,6 +19257,8 @@ void Dblqh::completedLogPage(Signal* sig
 
   ndbrequire(logFilePtr.p->fileRef != RNIL);
 
+  logPartPtr.p->m_io_tracker.send_io(32768*twlpNoPages);
+
   if (DEBUG_REDO)
   {
     ndbout_c("writing %d pages at part: %u file: %u pos: %u",
@@ -19424,20 +19488,30 @@ void Dblqh::findPageRef(Signal* signal, 
 /*                                                                           */
 /*      SUBROUTINE SHORT NAME = GFL                                          */
 /* ------------------------------------------------------------------------- */
-void Dblqh::getFirstInLogQueue(Signal* signal) 
+void
+Dblqh::getFirstInLogQueue(Signal* signal,
+                          Ptr<TcConnectionrec> & dst)
 {
-  TcConnectionrecPtr gflTcConnectptr;
+  TcConnectionrecPtr tmp;
 /* -------------------------------------------------- */
 /*       GET THE FIRST FROM THE LOG QUEUE AND REMOVE  */
 /*       IT FROM THE QUEUE.                           */
 /* -------------------------------------------------- */
-  gflTcConnectptr.i = logPartPtr.p->firstLogQueue;
-  ptrCheckGuard(gflTcConnectptr, ctcConnectrecFileSize, tcConnectionrec);
-  logPartPtr.p->firstLogQueue = gflTcConnectptr.p->nextTcLogQueue;
-  if (logPartPtr.p->firstLogQueue == RNIL) {
+  LogPartRecord::OperationQueue * queue = &logPartPtr.p->m_log_complete_queue;
+  tmp.i = queue->firstElement;
+  if (tmp.i == RNIL)
+  {
     jam();
-    logPartPtr.p->lastLogQueue = RNIL;
+    queue = &logPartPtr.p->m_log_prepare_queue;
+    tmp.i = queue->firstElement;
+  }
+  ptrCheckGuard(tmp, ctcConnectrecFileSize, tcConnectionrec);
+  queue->firstElement = tmp.p->nextTcLogQueue;
+  if (queue->firstElement == RNIL) {
+    jam();
+    queue->lastElement = RNIL;
   }//if
+  dst = tmp;
 }//Dblqh::getFirstInLogQueue()
 
 /* ---------------------------------------------------------------- */
@@ -19977,18 +20051,19 @@ void Dblqh::initLogpart(Signal* signal) 
   logPartPtr.p->logExecState = LogPartRecord::LES_IDLE;
   logPartPtr.p->firstLogTcrec = RNIL;
   logPartPtr.p->lastLogTcrec = RNIL;
-  logPartPtr.p->firstLogQueue = RNIL;
-  logPartPtr.p->lastLogQueue = RNIL;
   logPartPtr.p->gcprec = RNIL;
   logPartPtr.p->firstPageRef = RNIL;
   logPartPtr.p->lastPageRef = RNIL;
   logPartPtr.p->headFileNo = ZNIL;
   logPartPtr.p->headPageNo = ZNIL;
   logPartPtr.p->headPageIndex = ZNIL;
-  logPartPtr.p->m_tail_problem = 0;
+  logPartPtr.p->m_log_problems = 0;
   NdbLogPartInfo lpinfo(instance());
   ndbrequire(lpinfo.partCount == clogPartFileSize);
   logPartPtr.p->logPartNo = lpinfo.partNo[logPartPtr.i];
+  logPartPtr.p->m_io_tracker.init(logPartPtr.p->logPartNo);
+  logPartPtr.p->m_log_prepare_queue.init();
+  logPartPtr.p->m_log_complete_queue.init();
 }//Dblqh::initLogpart()
 
 /* ========================================================================== 
@@ -20121,26 +20196,30 @@ void Dblqh::linkFragQueue(Signal* signal
 // tcConnectptr
 // logPartPtr
  * ------------------------------------------------------------------------- */
-void Dblqh::linkWaitLog(Signal* signal, LogPartRecordPtr regLogPartPtr) 
+void
+Dblqh::linkWaitLog(Signal* signal,
+                   LogPartRecordPtr regLogPartPtr,
+                   LogPartRecord::OperationQueue & queue)
 {
   TcConnectionrecPtr lwlTcConnectptr;
-
 /* -------------------------------------------------- */
 /*       LINK ACTIVE OPERATION INTO QUEUE WAITING FOR */
 /*       ACCESS TO THE LOG PART.                      */
 /* -------------------------------------------------- */
-  lwlTcConnectptr.i = regLogPartPtr.p->lastLogQueue;
+  lwlTcConnectptr.i = queue.lastElement;
   if (lwlTcConnectptr.i == RNIL) {
     jam();
-    regLogPartPtr.p->firstLogQueue = tcConnectptr.i;
+    queue.firstElement = tcConnectptr.i;
   } else {
     jam();
     ptrCheckGuard(lwlTcConnectptr, ctcConnectrecFileSize, tcConnectionrec);
     lwlTcConnectptr.p->nextTcLogQueue = tcConnectptr.i;
   }//if
-  regLogPartPtr.p->lastLogQueue = tcConnectptr.i;
+  queue.lastElement = tcConnectptr.i;
   tcConnectptr.p->nextTcLogQueue = RNIL;
-  if (regLogPartPtr.p->LogLqhKeyReqSent == ZFALSE) {
+  regLogPartPtr.p->logPartState = LogPartRecord::ACTIVE;
+  if (regLogPartPtr.p->LogLqhKeyReqSent == ZFALSE)
+  {
     jam();
     regLogPartPtr.p->LogLqhKeyReqSent = ZTRUE;
     signal->theData[0] = ZLOG_LQHKEYREQ;
@@ -20164,36 +20243,38 @@ void Dblqh::logNextStart(Signal* signal)
   UintR tlnsStillWaiting;
   LogPartRecord * const regLogPartPtr = logPartPtr.p;
 
-  if ((regLogPartPtr->firstLogQueue == RNIL) &&
-      (regLogPartPtr->logPartState == LogPartRecord::ACTIVE) &&
-      (regLogPartPtr->waitWriteGciLog != LogPartRecord::WWGL_TRUE)) {
+  if (regLogPartPtr->m_log_prepare_queue.isEmpty() &&
+      regLogPartPtr->m_log_complete_queue.isEmpty() &&
+      (regLogPartPtr->waitWriteGciLog != LogPartRecord::WWGL_TRUE))
+  {
 // --------------------------------------------------------------------------
 // Optimised route for the common case
 // -------------------------------------------------------------------------- 
-    regLogPartPtr->logPartState = LogPartRecord::IDLE;
     return;
   }//if
-  if (regLogPartPtr->firstLogQueue != RNIL) {
+
+  if (!regLogPartPtr->m_log_prepare_queue.isEmpty() ||
+      !regLogPartPtr->m_log_complete_queue.isEmpty())
+  {
     jam();
-    if (regLogPartPtr->LogLqhKeyReqSent == ZFALSE) {
+    regLogPartPtr->logPartState = LogPartRecord::ACTIVE;
+    if (regLogPartPtr->LogLqhKeyReqSent == ZFALSE)
+    {
       jam();
       regLogPartPtr->LogLqhKeyReqSent = ZTRUE;
       signal->theData[0] = ZLOG_LQHKEYREQ;
       signal->theData[1] = logPartPtr.i;
       sendSignal(cownref, GSN_CONTINUEB, signal, 2, JBB);
     }//if
-  } else {
-    if (regLogPartPtr->logPartState == LogPartRecord::ACTIVE) {
-      jam();
-      regLogPartPtr->logPartState = LogPartRecord::IDLE;
-    } else {
-      jam();
-    }//if
-  }//if
-  if (regLogPartPtr->waitWriteGciLog != LogPartRecord::WWGL_TRUE) {
+  }
+
+  if (regLogPartPtr->waitWriteGciLog != LogPartRecord::WWGL_TRUE)
+  {
     jam();
     return;
-  } else {
+  }
+  else
+  {
     jam();
 /* -------------------------------------------------------------------------- 
  *   A COMPLETE GCI LOG RECORD IS WAITING TO BE WRITTEN. WE GIVE THIS HIGHEST
@@ -21201,6 +21282,8 @@ void Dblqh::writeDirty(Signal* signal, U
 
   ndbrequire(logFilePtr.p->fileRef != RNIL);
 
+  logPartPtr.p->m_io_tracker.send_io(32768);
+
   if (DEBUG_REDO)
   {
     ndbout_c("writeDirty 1 page at part: %u file: %u pos: %u",
@@ -21403,12 +21486,12 @@ void Dblqh::writeNextLog(Signal* signal)
       char buf[100];
       BaseString::snprintf(buf, sizeof(buf), 
                            "Head/Tail met in REDO log, logpart: %u"
-                           " file: %u mbyte: %u state: %u tail-problem: %u",
+                           " file: %u mbyte: %u state: %u log-problem: %u",
                            logPartPtr.p->logPartNo,
                            logFilePtr.p->fileNo,
                            logFilePtr.p->currentMbyte,
                            logPartPtr.p->logPartState,
-                           logPartPtr.p->m_tail_problem);
+                           logPartPtr.p->m_log_problems);
 
 
       signal->theData[0] = 2398;
@@ -21420,9 +21503,12 @@ void Dblqh::writeNextLog(Signal* signal)
   if (logFilePtr.p->currentMbyte == (clogFileSize - 1)) {
     jam();
     twnlNextMbyte = 0;
-    if (logFilePtr.p->fileChangeState != LogFileRecord::NOT_ONGOING) {
+    if (logFilePtr.p->fileChangeState != LogFileRecord::NOT_ONGOING)
+    {
       jam();
-      logPartPtr.p->logPartState = LogPartRecord::FILE_CHANGE_PROBLEM;
+      update_log_problem(signal, logPartPtr,
+                         LogPartRecord::P_FILE_CHANGE_PROBLEM,
+                         /* set */ true);
     }//if
     twnlNextFileNo = wnlNextLogFilePtr.p->fileNo;
   } else {
@@ -21443,7 +21529,7 @@ void Dblqh::writeNextLog(Signal* signal)
   if (free_mb <= c_free_mb_tail_problem_limit)
   {
     jam();
-    logPartPtr.p->m_tail_problem = true;
+    update_log_problem(signal, logPartPtr, LogPartRecord::P_TAIL_PROBLEM, true);
   }
 
   if (ERROR_INSERTED(5058) &&
@@ -21915,17 +22001,20 @@ Dblqh::execDUMP_STATE_ORD(Signal* signal
       Ptr<LogPartRecord> lp;
       lp.i = i;
       ptrCheckGuard(lp, clogPartFileSize, logPartRecord);
-      ndbout_c("LP %d blockInstance: %d partNo: %d state: %d WW_Gci: %d gcprec: %d flq: %d currfile: %d tailFileNo: %d logTailMbyte: %d",
+      ndbout_c("LP %d blockInstance: %d partNo: %d state: %d WW_Gci: %d gcprec: %d flq: %u %u currfile: %d tailFileNo: %d logTailMbyte: %d cnoOfLogPages: %u problems: 0x%x",
                i,
                instance(),
                lp.p->logPartNo,
 	       lp.p->logPartState,
 	       lp.p->waitWriteGciLog,
 	       lp.p->gcprec,
-	       lp.p->firstLogQueue,
+	       lp.p->m_log_prepare_queue.firstElement,
+	       lp.p->m_log_complete_queue.firstElement,
 	       lp.p->currentLogfile,
 	       lp.p->logTailFileNo,
-	       lp.p->logTailMbyte);
+	       lp.p->logTailMbyte,
+               cnoOfLogPages,
+               lp.p->m_log_problems);
       
       if(gcp.i == RNIL && lp.p->gcprec != RNIL)
 	gcp.i = lp.p->gcprec;
@@ -23155,3 +23244,145 @@ Dblqh::send_runredo_event(Signal* signal
   signal->theData[11] = lp->stopMbyte;
   sendSignal(CMVMI_REF, GSN_EVENT_REP, signal, 12, JBB);
 }
+
+void
+Dblqh::IOTracker::init(Uint32 partNo)
+{
+  bzero(this, sizeof(* this));
+  m_log_part_no = partNo;
+}
+
+int
+Dblqh::IOTracker::tick(Uint32 now, Uint32 maxlag, Uint32 maxlag_cnt)
+{
+  Uint32 t = m_current_time;
+
+  if ((t / SAMPLE_TIME) == (now / SAMPLE_TIME))
+    return 0;
+
+  m_current_time = now;
+  if (m_sample_completed_bytes >= m_sample_sent_bytes)
+  {
+    /**
+     * If we completed all io we sent during current sample...
+     *   we can't have any problem...and
+     *   we can't measure io throughput, so don't add measurement
+     *
+     */
+    m_sample_sent_bytes = 0;
+    m_sample_completed_bytes = 0;
+  }
+  else
+  {
+    // io maxed out...
+    Uint32 elapsed = now - t;
+    m_save_written_bytes[m_save_pos] += m_sample_completed_bytes;
+    m_save_elapsed_millis[m_save_pos] += elapsed;
+
+    m_curr_written_bytes += m_sample_completed_bytes;
+    m_curr_elapsed_millis += elapsed;
+
+    Uint32 bps = (1000 * m_sample_completed_bytes) / elapsed;
+    Uint32 lag = bps ? m_sum_outstanding_bytes / bps : 30;
+    if (false && lag >= 30)
+    {
+      g_eventLogger->info("part: %u tick(%u) m_sample_completed_bytes: %u m_sample_sent_bytes: %u elapsed: %u kbps: %u lag: %u",
+                          m_log_part_no,
+                          now, m_sample_completed_bytes, m_sample_sent_bytes,
+                          elapsed, bps/1000, lag);
+    }
+
+    m_sample_sent_bytes -= m_sample_completed_bytes;
+    m_sample_completed_bytes = 0;
+  }
+
+  int retVal = 0;
+  Uint32 save_lag_cnt = m_lag_cnt;
+  if ((now / SLIDING_WINDOW_LEN) != (t / SLIDING_WINDOW_LEN))
+  {
+    Uint32 lag = m_curr_written_bytes ?
+      ((Uint64(m_sum_outstanding_bytes) / 1000) * Uint64(m_curr_elapsed_millis)) / m_curr_written_bytes:
+      0;
+
+    if (lag > maxlag)
+    {
+      /**
+       * We did have lag last second...
+       *   increase m_lag_cnt and check if it has reached maxlag_cnt
+       */
+      Uint32 tmp = m_lag_cnt;
+      m_lag_cnt += (lag / maxlag);
+      if (tmp < maxlag_cnt && m_lag_cnt >= maxlag_cnt)
+      {
+        retVal = -1; // start aborting transaction
+      }
+    }
+    else
+    {
+      /**
+       * We did not have lag...reset m_lag_cnt
+       */
+      if (m_lag_cnt >= maxlag_cnt)
+      {
+        // stop aborting transcation
+        retVal = 1;
+      }
+      m_lag_cnt = 0;
+    }
+
+#if 1
+    if (m_lag_cnt == 0 && lag == 0)
+    {
+    }
+    else if (lag > 0 && m_lag_cnt == 0)
+    {
+      g_eventLogger->info("part: %u : time to complete: %u",
+                          m_log_part_no, lag);
+    }
+    else if (m_lag_cnt < maxlag_cnt && m_lag_cnt == save_lag_cnt)
+    {
+      g_eventLogger->info("part: %u : time to complete: %u lag_cnt: %u => %u => retVal: %d",
+                          m_log_part_no,
+                          lag,
+                          save_lag_cnt,
+                          m_lag_cnt,
+                          retVal);
+    }
+    else
+    {
+      g_eventLogger->info("part: %u : sum_outstanding: %ukb avg_written: %ukb avg_elapsed: %ums time to complete: %u lag_cnt: %u => %u retVal: %d",
+                          m_log_part_no, m_sum_outstanding_bytes / 1024, m_curr_written_bytes/1024, m_curr_elapsed_millis,
+             lag, save_lag_cnt, m_lag_cnt, retVal);
+    }
+#endif
+
+    /**
+     * And finally rotate sliding window
+     */
+    Uint32 last = (m_save_pos + 1) % SLIDING_WINDOW_HISTORY_LEN;
+    assert(m_curr_written_bytes >= m_save_written_bytes[last]);
+    assert(m_curr_elapsed_millis >= m_save_elapsed_millis[last]);
+    m_curr_written_bytes -= m_save_written_bytes[last];
+    m_curr_elapsed_millis -= m_save_elapsed_millis[last];
+    m_save_written_bytes[last] = 0;
+    m_save_elapsed_millis[last] = 0;
+    m_save_pos = last;
+  }
+  return retVal;
+}
+
+void
+Dblqh::IOTracker::send_io(Uint32 bytes)
+{
+  m_sum_outstanding_bytes += bytes;
+  m_sample_sent_bytes += bytes;
+}
+
+void
+Dblqh::IOTracker::complete_io(Uint32 bytes)
+{
+  assert(m_sum_outstanding_bytes >= bytes);
+
+  m_sum_outstanding_bytes -= bytes;
+  m_sample_completed_bytes += bytes;
+}

=== modified file 'storage/ndb/src/kernel/blocks/dbtc/Dbtc.hpp'
--- a/storage/ndb/src/kernel/blocks/dbtc/Dbtc.hpp	2010-03-26 07:13:06 +0000
+++ b/storage/ndb/src/kernel/blocks/dbtc/Dbtc.hpp	2010-12-02 11:02:29 +0000
@@ -916,6 +916,7 @@ public:
       Uint8  m_special_hash; // collation or distribution key
       Uint8  m_no_hash;      // Hash not required for LQH (special variant)
       Uint8  m_no_disk_flag; 
+      Uint8  m_op_queue;
       Uint8  lenAiInTckeyreq;  /* LENGTH OF ATTRIBUTE INFORMATION IN TCKEYREQ */
     
       Uint8  fragmentDistributionKey;  /* DIH generation no */

=== modified file 'storage/ndb/src/kernel/blocks/dbtc/DbtcMain.cpp'
--- a/storage/ndb/src/kernel/blocks/dbtc/DbtcMain.cpp	2010-11-10 08:40:08 +0000
+++ b/storage/ndb/src/kernel/blocks/dbtc/DbtcMain.cpp	2010-12-02 11:02:29 +0000
@@ -2785,6 +2785,7 @@ void Dbtc::execTCKEYREQ(Signal* signal) 
   Uint8 TNoDiskFlag         = TcKeyReq::getNoDiskFlag(Treqinfo);
   Uint8 TexecuteFlag        = TexecFlag;
   Uint8 Treorg              = TcKeyReq::getReorgFlag(Treqinfo);
+  Uint8 Tqueue              = TcKeyReq::getQueueOnRedoProblemFlag(Treqinfo);
 
   if (Treorg)
   {
@@ -2803,6 +2804,7 @@ void Dbtc::execTCKEYREQ(Signal* signal) 
   regCachePtr->opExec   = TInterpretedFlag;
   regCachePtr->distributionKeyIndicator = TDistrKeyFlag;
   regCachePtr->m_no_disk_flag = TNoDiskFlag;
+  regCachePtr->m_op_queue = Tqueue;
 
   //-------------------------------------------------------------
   // The next step is to read the upto three conditional words.
@@ -3546,6 +3548,7 @@ void Dbtc::sendlqhkeyreq(Signal* signal,
   LqhKeyReq::setSimpleFlag(Tdata10, sig0);
   LqhKeyReq::setOperation(Tdata10, sig1);
   LqhKeyReq::setNoDiskFlag(Tdata10, regCachePtr->m_no_disk_flag);
+  LqhKeyReq::setQueueOnRedoProblemFlag(Tdata10, regCachePtr->m_op_queue);
 
   /* ----------------------------------------------------------------------- 
    * If we are sending a short LQHKEYREQ, then there will be some AttrInfo

=== modified file 'storage/ndb/src/mgmsrv/ConfigInfo.cpp'
--- a/storage/ndb/src/mgmsrv/ConfigInfo.cpp	2010-11-22 14:27:12 +0000
+++ b/storage/ndb/src/mgmsrv/ConfigInfo.cpp	2010-12-02 11:02:29 +0000
@@ -249,6 +249,12 @@ const ConfigInfo::Typelib arbit_method_t
   { 0, 0 }
 };
 
+static
+const ConfigInfo::Typelib default_operation_redo_problem_action_typelib [] = {
+  { "abort", OPERATION_REDO_PROBLEM_ACTION_ABORT },
+  { "queue", OPERATION_REDO_PROBLEM_ACTION_QUEUE },
+  { 0, 0 }
+};
 
 /**
  * The default constructors create objects with suitable values for the
@@ -1865,6 +1871,34 @@ const ConfigInfo::ParamInfo ConfigInfo::
     "1"                      /* Max */
   },
 
+  {
+    CFG_DB_REDO_OVERCOMMIT_LIMIT,
+    "RedoOverCommitLimit",
+    DB_TOKEN,
+    "Limit for how long it will take to flush current "
+    "RedoBuffer before action is taken (in seconds)",
+    ConfigInfo::CI_USED,
+    false,
+    ConfigInfo::CI_INT,
+    "20",                    /* Default */
+    "0",                     /* Min */
+    STR_VALUE(MAX_INT_RNIL)  /* Max */
+  },
+
+  {
+    CFG_DB_REDO_OVERCOMMIT_COUNTER,
+    "RedoOverCommitCounter",
+    DB_TOKEN,
+    "If RedoOverCommitLimit has been reached RedoOverCommitCounter"
+    " in a row times, transactions will be aborted",
+    ConfigInfo::CI_USED,
+    false,
+    ConfigInfo::CI_INT,
+    "3",                     /* Default */
+    "0",                     /* Min */
+    STR_VALUE(MAX_INT_RNIL)  /* Max */
+  },
+
   /***************************************************************************
    * API
    ***************************************************************************/
@@ -2059,6 +2093,20 @@ const ConfigInfo::ParamInfo ConfigInfo::
     ConfigInfo::CI_STRING,
     0, 0, 0 },
 
+  {
+    CFG_DEFAULT_OPERATION_REDO_PROBLEM_ACTION,
+    "DefaultOperationRedoProblemAction",
+    API_TOKEN,
+    "If Redo-log is having problem, should operation default (unless overridden on transaction/operation level) abort or be put on queue"
+    " in a row times, transactions will be aborted",
+    ConfigInfo::CI_USED,
+    false,
+    ConfigInfo::CI_ENUM,
+    0, /* default for ENUM doesnt seem to work... */
+    (const char*)default_operation_redo_problem_action_typelib,
+    0
+  },
+
   /****************************************************************************
    * MGM
    ***************************************************************************/

=== modified file 'storage/ndb/src/ndbapi/NdbOperation.cpp'
--- a/storage/ndb/src/ndbapi/NdbOperation.cpp	2010-09-30 09:32:28 +0000
+++ b/storage/ndb/src/ndbapi/NdbOperation.cpp	2010-12-02 11:02:29 +0000
@@ -170,11 +170,11 @@ NdbOperation::init(const NdbTableImpl* t
   theBlobList = NULL;
   m_abortOption = -1;
   m_noErrorPropagation = false;
-  m_no_disk_flag = 1;
+  m_flags = 0;
+  m_flags |= OF_NO_DISK;
   m_interpreted_code = NULL;
   m_extraSetValues = NULL;
   m_numExtraSetValues = 0;
-  m_use_any_value = 0;
 
   tSignal = theNdb->getSignal();
   if (tSignal == NULL)
@@ -196,6 +196,10 @@ NdbOperation::init(const NdbTableImpl* t
     return -1;
   }
   m_customData = NULL;
+
+  if (theNdb->theImpl->get_ndbapi_config_parameters().m_default_queue_option)
+    m_flags |= OF_QUEUEABLE;
+
   return 0;
 }
 

=== modified file 'storage/ndb/src/ndbapi/NdbOperationDefine.cpp'
--- a/storage/ndb/src/ndbapi/NdbOperationDefine.cpp	2010-09-29 13:25:19 +0000
+++ b/storage/ndb/src/ndbapi/NdbOperationDefine.cpp	2010-12-02 11:02:29 +0000
@@ -379,7 +379,10 @@ NdbOperation::getValue_impl(const NdbCol
   NdbRecAttr* tRecAttr;
   if ((tAttrInfo != NULL) &&
       (theStatus != Init)){
-    m_no_disk_flag &= (tAttrInfo->m_storageType == NDB_STORAGETYPE_DISK ? 0:1);
+    if (tAttrInfo->m_storageType == NDB_STORAGETYPE_DISK)
+    {
+      m_flags &= ~Uint8(OF_NO_DISK);
+    }
     if (theStatus != GetValue) {
       if (theStatus == UseNdbRecord)
         /* This path for extra GetValues for NdbRecord */
@@ -438,7 +441,10 @@ NdbOperation::getValue_NdbRecord(const N
 {
   NdbRecAttr* tRecAttr;
 
-  m_no_disk_flag &= (tAttrInfo->m_storageType == NDB_STORAGETYPE_DISK ? 0:1);
+  if (tAttrInfo->m_storageType == NDB_STORAGETYPE_DISK)
+  {
+    m_flags &= ~Uint8(OF_NO_DISK);
+  }
 
   /*
     For getValue with NdbRecord operations, we just allocate the NdbRecAttr,
@@ -551,7 +557,10 @@ NdbOperation::setValue( const NdbColumnI
   
   // Insert Attribute Id into ATTRINFO part. 
   tAttrId = tAttrInfo->m_attrId;
-  m_no_disk_flag &= (tAttrInfo->m_storageType == NDB_STORAGETYPE_DISK ? 0:1);
+  if (tAttrInfo->m_storageType == NDB_STORAGETYPE_DISK)
+  {
+    m_flags &= ~Uint8(OF_NO_DISK);
+  }
   const char *aValue = aValuePassed; 
   if (aValue == NULL) {
     if (tAttrInfo->m_nullable) {
@@ -1347,7 +1356,7 @@ NdbOperation::handleOperationOptions (co
   {
     /* Any operation can have an ANYVALUE set */
     op->m_any_value = opts->anyValue;
-    op->m_use_any_value = 1;
+    op->m_flags |= OF_USE_ANY_VALUE;
   }
 
   if (opts->optionsPresent & OperationOptions::OO_CUSTOMDATA)
@@ -1386,5 +1395,15 @@ NdbOperation::handleOperationOptions (co
     }    
   }
 
+  if (opts->optionsPresent & OperationOptions::OO_QUEUABLE)
+  {
+    op->m_flags |= OF_QUEUEABLE;
+  }
+
+  if (opts->optionsPresent & OperationOptions::OO_NOT_QUEUABLE)
+  {
+    op->m_flags &= ~Uint8(OF_QUEUEABLE);
+  }
+
   return 0;
 }

=== modified file 'storage/ndb/src/ndbapi/NdbOperationExec.cpp'
--- a/storage/ndb/src/ndbapi/NdbOperationExec.cpp	2010-11-09 20:40:03 +0000
+++ b/storage/ndb/src/ndbapi/NdbOperationExec.cpp	2010-12-02 11:02:29 +0000
@@ -471,7 +471,8 @@ NdbOperation::prepareSend(Uint32 aTC_Con
   Uint8 tCommitIndicator = theCommitIndicator;
   Uint8 tStartIndicator = theStartIndicator;
   Uint8 tInterpretIndicator = theInterpretIndicator;
-  Uint8 tNoDisk = m_no_disk_flag;
+  Uint8 tNoDisk = (m_flags & OF_NO_DISK) != 0;
+  Uint8 tQueable = (m_flags & OF_QUEUEABLE) != 0;
 
   /**
    * A dirty read, can not abort the transaction
@@ -489,6 +490,7 @@ NdbOperation::prepareSend(Uint32 aTC_Con
   tcKeyReq->setStartFlag(tReqInfo, tStartIndicator);
   tcKeyReq->setInterpretedFlag(tReqInfo, tInterpretIndicator);
   tcKeyReq->setNoDiskFlag(tReqInfo, tNoDisk);
+  tcKeyReq->setQueueOnRedoProblemFlag(tReqInfo, tQueable);
 
   OperationType tOperationType = theOperationType;
   Uint8 abortOption = (ao == DefaultAbortOption) ? (Uint8) m_abortOption : (Uint8) ao;
@@ -958,7 +960,7 @@ NdbOperation::buildSignalsNdbRecord(Uint
   attrInfoRemain= 0;
   theATTRINFOptr= NULL;
 
-  no_disk_flag= m_no_disk_flag;
+  no_disk_flag = (m_flags & OF_NO_DISK) != 0;
 
   /* If we have an interpreted program then we add 5 words
    * of section length information at the start of the
@@ -1074,7 +1076,7 @@ NdbOperation::buildSignalsNdbRecord(Uint
     }
   }
 
-  if (m_use_any_value && 
+  if (((m_flags & OF_USE_ANY_VALUE) != 0) &&
       (tOpType == DeleteRequest))
   {
     /* Special hack for delete and ANYVALUE pseudo-column
@@ -1333,7 +1335,7 @@ NdbOperation::buildSignalsNdbRecord(Uint
       (tOpType == UpdateRequest))
   {
     /* Handle setAnyValue() for all cases except delete */
-    if (m_use_any_value)
+    if ((m_flags & OF_USE_ANY_VALUE) != 0)
     {
       res= insertATTRINFOHdr_NdbRecord(AttributeHeader::ANY_VALUE, 4);
       if(res)

=== modified file 'storage/ndb/src/ndbapi/NdbOperationInt.cpp'
--- a/storage/ndb/src/ndbapi/NdbOperationInt.cpp	2010-09-29 13:25:19 +0000
+++ b/storage/ndb/src/ndbapi/NdbOperationInt.cpp	2010-12-02 11:02:29 +0000
@@ -92,8 +92,10 @@ NdbOperation::incCheck(const NdbColumnIm
       setErrorCodeAbort(4231);
       return -1;
     }
-    m_no_disk_flag &= 
-      (tNdbColumnImpl->m_storageType == NDB_STORAGETYPE_DISK ? 0:1);
+    if (tNdbColumnImpl->m_storageType == NDB_STORAGETYPE_DISK)
+    {
+      m_flags &= ~(Uint8)OF_NO_DISK;
+    }
     return tNdbColumnImpl->m_attrId;
   } else {
     if (theNdbCon->theCommitStatus == NdbTransaction::Started)
@@ -145,8 +147,10 @@ NdbOperation::write_attrCheck(const NdbC
       setErrorCodeAbort(4231);
       return -1;
     }
-    m_no_disk_flag &= 
-      (tNdbColumnImpl->m_storageType == NDB_STORAGETYPE_DISK ? 0:1);
+    if (tNdbColumnImpl->m_storageType == NDB_STORAGETYPE_DISK)
+    {
+      m_flags &= ~(Uint8)OF_NO_DISK;
+    }
     return tNdbColumnImpl->m_attrId;
   } else {
     if (theNdbCon->theCommitStatus == NdbTransaction::Started)
@@ -196,8 +200,10 @@ NdbOperation::read_attrCheck(const NdbCo
       setErrorCodeAbort(4231);
       return -1;
     }
-    m_no_disk_flag &= 
-      (tNdbColumnImpl->m_storageType == NDB_STORAGETYPE_DISK ? 0:1);
+    if (tNdbColumnImpl->m_storageType == NDB_STORAGETYPE_DISK)
+    {
+      m_flags &= ~(Uint8)OF_NO_DISK;
+    }
     return tNdbColumnImpl->m_attrId;
   } else {
     if (theNdbCon->theCommitStatus == NdbTransaction::Started)
@@ -1120,7 +1126,10 @@ NdbOperation::branch_col(Uint32 type, 
     }
   }
 
-  m_no_disk_flag &= (col->m_storageType == NDB_STORAGETYPE_DISK ? 0:1);
+  if (col->m_storageType == NDB_STORAGETYPE_DISK)
+  {
+    m_flags &= ~(Uint8)OF_NO_DISK;
+  }
 
   Uint32 tempData[ NDB_MAX_TUPLE_SIZE_IN_WORDS ];
   if (((UintPtr)val & 3) != 0) {

=== modified file 'storage/ndb/src/ndbapi/NdbScanOperation.cpp'
--- a/storage/ndb/src/ndbapi/NdbScanOperation.cpp	2010-11-09 20:40:03 +0000
+++ b/storage/ndb/src/ndbapi/NdbScanOperation.cpp	2010-12-02 11:02:29 +0000
@@ -170,8 +170,10 @@ NdbScanOperation::addInterpretedCode()
   const NdbInterpretedCode* code= m_interpreted_code;
 
   /* Any disk access? */
-  m_no_disk_flag &= 
-    !(code->m_flags & NdbInterpretedCode::UsesDisk);
+  if (code->m_flags & NdbInterpretedCode::UsesDisk)
+  {
+    m_flags &= ~Uint8(OF_NO_DISK);
+  }
 
 
   /* Main program size depends on whether there's subroutines */
@@ -385,7 +387,7 @@ NdbScanOperation::generatePackedReadAIs(
     }
 
     if (col->flags & NdbRecord::IsDisk)
-      m_no_disk_flag= false;
+      m_flags &= ~Uint8(OF_NO_DISK);
 
     if (attrId > maxAttrId)
       maxAttrId= attrId;
@@ -1370,7 +1372,7 @@ NdbScanOperation::processTableScanDefs(N
   if (scan_flags & SF_DiskScan)
   {
     tupScan = true;
-    m_no_disk_flag = false;
+    m_flags &= ~Uint8(OF_NO_DISK);
   }
   
   bool rangeScan= false;
@@ -2287,7 +2289,7 @@ int NdbScanOperation::prepareSendScan(Ui
    */
   Uint32 reqInfo = req->requestInfo;
   ScanTabReq::setKeyinfoFlag(reqInfo, keyInfo);
-  ScanTabReq::setNoDiskFlag(reqInfo, m_no_disk_flag);
+  ScanTabReq::setNoDiskFlag(reqInfo, (m_flags & OF_NO_DISK) != 0);
 
   /* Set distribution key info if required */
   ScanTabReq::setDistributionKeyFlag(reqInfo, theDistrKeyIndicator_);
@@ -2901,8 +2903,10 @@ NdbScanOperation::getValue_NdbRecord_sca
   NdbRecAttr *ra;
   DBUG_PRINT("info", ("Column: %u", attrInfo->m_attrId));
 
-  m_no_disk_flag &= 
-    (attrInfo->m_storageType == NDB_STORAGETYPE_MEMORY);
+  if (attrInfo->m_storageType == NDB_STORAGETYPE_DISK)
+  {
+    m_flags &= ~Uint8(OF_NO_DISK);
+  }
 
   res= insertATTRINFOHdr_NdbRecord(attrInfo->m_attrId, 0);
   if (res==-1)
@@ -2936,9 +2940,12 @@ NdbScanOperation::getValue_NdbRecAttr_sc
   /* Get a RecAttr object, which is linked in to the Receiver's
    * RecAttr linked list, and return to caller
    */
-  if (attrInfo != NULL) {
-    m_no_disk_flag &= 
-      (attrInfo->m_storageType == NDB_STORAGETYPE_MEMORY);
+  if (attrInfo != NULL)
+  {
+    if (attrInfo->m_storageType == NDB_STORAGETYPE_DISK)
+    {
+      m_flags &= ~Uint8(OF_NO_DISK);
+    }
   
     recAttr = theReceiver.getValue(attrInfo, aValue);
     

=== modified file 'storage/ndb/src/ndbapi/ndb_cluster_connection.cpp'
--- a/storage/ndb/src/ndbapi/ndb_cluster_connection.cpp	2010-11-15 09:23:10 +0000
+++ b/storage/ndb/src/ndbapi/ndb_cluster_connection.cpp	2010-12-02 11:02:29 +0000
@@ -669,6 +669,12 @@ Ndb_cluster_connection_impl::configure(U
         timeout = tmp1;
     }
     m_config.m_waitfor_timeout = timeout;
+
+    Uint32 queue = 0;
+    if (!iter.get(CFG_DEFAULT_OPERATION_REDO_PROBLEM_ACTION, &queue))
+    {
+      m_config.m_default_queue_option = queue;
+    }
   }
   DBUG_RETURN(init_nodes_vector(nodeId, config));
 }

=== modified file 'storage/ndb/src/ndbapi/ndb_cluster_connection_impl.hpp'
--- a/storage/ndb/src/ndbapi/ndb_cluster_connection_impl.hpp	2010-11-15 09:23:10 +0000
+++ b/storage/ndb/src/ndbapi/ndb_cluster_connection_impl.hpp	2010-12-02 11:02:29 +0000
@@ -43,13 +43,15 @@ struct NdbApiConfig
     m_scan_batch_size(MAX_SCAN_BATCH_SIZE),
     m_batch_byte_size(SCAN_BATCH_SIZE),
     m_batch_size(DEF_BATCH_SIZE),
-    m_waitfor_timeout(120000)
+    m_waitfor_timeout(120000),
+    m_default_queue_option(0)
     {}
 
   Uint32 m_scan_batch_size;
   Uint32 m_batch_byte_size;
   Uint32 m_batch_size;
   Uint32 m_waitfor_timeout; // in milli seconds...
+  Uint32 m_default_queue_option;
 };
 
 class Ndb_cluster_connection_impl : public Ndb_cluster_connection

=== modified file 'storage/ndb/src/ndbapi/ndberror.c'
--- a/storage/ndb/src/ndbapi/ndberror.c	2010-05-03 04:49:08 +0000
+++ b/storage/ndb/src/ndbapi/ndberror.c	2010-12-02 11:02:29 +0000
@@ -188,6 +188,7 @@ ErrorBundle ErrorCodes[] = {
   { 1217, DMEC, TR, "Out of operation records in local data manager (increase MaxNoOfLocalOperations)" },
   { 1218, DMEC, TR, "Send Buffers overloaded in NDB kernel" },
   { 1220, DMEC, TR, "REDO log files overloaded (increase FragmentLogFileSize)" },
+  { 1234, DMEC, TR, "REDO log files overloaded (increase disk hardware)" },
   { 1222, DMEC, TR, "Out of transaction markers in LQH" },
   { 4021, DMEC, TR, "Out of Send Buffer space in NDB API" },
   { 4022, DMEC, TR, "Out of Send Buffer space in NDB API" },

No bundle (reason: useless for push emails).
Thread
bzr push into mysql-5.1-telco-7.1 branch (jonas:4010 to 4011) Jonas Oreland2 Dec