List: Commits
From: Frazer Clement
Date: August 13 2012 2:10pm
Subject: bzr push into mysql-5.1-telco-7.1 branch (frazer.clement:4590 to 4591)
 4591 Frazer Clement	2012-08-13 [merge]
      Merge 7.0->7.1

    added:
      storage/ndb/test/ndbapi/testRedo.cpp
    modified:
      storage/ndb/include/mgmapi/ndb_logevent.h
      storage/ndb/src/kernel/blocks/dblqh/Dblqh.hpp
      storage/ndb/src/kernel/blocks/dblqh/DblqhInit.cpp
      storage/ndb/src/kernel/blocks/dblqh/DblqhMain.cpp
      storage/ndb/src/mgmapi/ndb_logevent.cpp
      storage/ndb/test/ndbapi/CMakeLists.txt
      storage/ndb/test/ndbapi/Makefile.am
      storage/ndb/test/run-test/daily-basic-tests.txt
 4590 Mauritz Sundell	2012-07-12 [merge]
      Merge 7.0 -> 7.1

    modified:
      storage/ndb/test/run-test/conf-ndb07.cnf
=== modified file 'storage/ndb/include/mgmapi/ndb_logevent.h'
--- a/storage/ndb/include/mgmapi/ndb_logevent.h	2011-07-04 16:30:34 +0000
+++ b/storage/ndb/include/mgmapi/ndb_logevent.h	2012-08-13 13:39:58 +0000
@@ -707,6 +707,8 @@ extern "C" {
     unsigned total_lo;
     unsigned free_hi;
     unsigned free_lo;
+    unsigned no_logfiles;
+    unsigned logfilesize;
   };
 
   /** Log event data @ref NDB_LE_LogFileInitStatus */

=== modified file 'storage/ndb/src/kernel/blocks/dblqh/Dblqh.hpp'
--- a/storage/ndb/src/kernel/blocks/dblqh/Dblqh.hpp	2012-05-31 14:42:04 +0000
+++ b/storage/ndb/src/kernel/blocks/dblqh/Dblqh.hpp	2012-08-13 13:03:29 +0000
@@ -1387,12 +1387,21 @@ public:
      *       command when that is needed.
      */
     UintR prevLogpage;
-    /**
-     *       The number of files remaining to gather GCI information
-     *       for during system restart.  Only used if number of files
-     *       is larger than 60.
-     */
-    UintR srRemainingFiles;
+    union {
+      /**
+       *       The number of files remaining to gather GCI information
+       *       for during system restart.  Only used if number of files
+       *       is larger than 60.
+       */
+      UintR srRemainingFiles;
+
+      /**
+       *       The index of the file which we should start loading redo
+       *       meta information from after the 'FRONTPAGE' file has been
+       *       closed.
+       */
+      UintR srLastFileIndex;
+    };
     /**
      *       The log file where to start executing the log during
      *       system restart.
@@ -1572,6 +1581,7 @@ public:
       ,OPEN_EXEC_LOG_CACHED = 28
       ,CLOSING_EXEC_LOG_CACHED = 29
 #endif
+      ,CLOSING_SR_FRONTPAGE = 30
     };
     
     /**
@@ -2709,6 +2719,7 @@ private:
   void openExecLogLab(Signal* signal);
   void checkInitCompletedLab(Signal* signal);
   void closingSrLab(Signal* signal);
+  void closingSrFrontPage(Signal* signal);
   void closeExecSrLab(Signal* signal);
   void execLogComp(Signal* signal);
   void execLogComp_extra_files_closed(Signal* signal);
@@ -2917,7 +2928,17 @@ private:
   LogPartRecordPtr logPartPtr;
   UintR clogPartFileSize;
   Uint32 clogFileSize; // In MBYTE
-  Uint32 cmaxLogFilesInPageZero; //
+  /* Max entries for log file:mb meta info in file page zero */
+  Uint32 cmaxLogFilesInPageZero; 
+  /* Max valid entries for log file:mb meta info in file page zero 
+   *  = cmaxLogFilesInPageZero - 1
+   * as entry zero (for current file) is invalid.
+   */
+  Uint32 cmaxValidLogFilesInPageZero;
+
+#if defined VM_TRACE || defined ERROR_INSERT
+  Uint32 cmaxLogFilesInPageZero_DUMP;
+#endif
 
 // Configurable
   LogFileRecord *logFileRecord;

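(A minimal sketch, not part of the patch, of the counting convention the two new counters above encode; names follow the diff:)

  #include <cassert>
  typedef unsigned Uint32;  // as in ndb_types.h

  // Page zero of each redo file holds file:mb meta info entries for up to
  // cmaxLogFilesInPageZero files, but entry zero (the current file) is
  // invalid, so only cmaxLogFilesInPageZero - 1 entries are usable.
  Uint32 validEntries(Uint32 cmaxLogFilesInPageZero)
  {
    assert(cmaxLogFilesInPageZero >= 2);  // mirrors ndbrequire(cmaxValidLogFilesInPageZero > 0)
    return cmaxLogFilesInPageZero - 1;    // == cmaxValidLogFilesInPageZero
  }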
=== modified file 'storage/ndb/src/kernel/blocks/dblqh/DblqhInit.cpp'
--- a/storage/ndb/src/kernel/blocks/dblqh/DblqhInit.cpp	2012-05-21 22:27:28 +0000
+++ b/storage/ndb/src/kernel/blocks/dblqh/DblqhInit.cpp	2012-08-13 13:03:29 +0000
@@ -69,6 +69,11 @@ void Dblqh::initData()
   m_backup_ptr = RNIL;
   clogFileSize = 16;
   cmaxLogFilesInPageZero = 40;
+  cmaxValidLogFilesInPageZero = cmaxLogFilesInPageZero - 1;
+
+#if defined VM_TRACE || defined ERROR_INSERT
+  cmaxLogFilesInPageZero_DUMP = 0;
+#endif
 
    totalLogFiles = 0;
    logFileInitDone = 0;

=== modified file 'storage/ndb/src/kernel/blocks/dblqh/DblqhMain.cpp'
--- a/storage/ndb/src/kernel/blocks/dblqh/DblqhMain.cpp	2012-06-11 14:47:18 +0000
+++ b/storage/ndb/src/kernel/blocks/dblqh/DblqhMain.cpp	2012-08-13 13:39:58 +0000
@@ -1353,7 +1353,28 @@ void Dblqh::execREAD_CONFIG_REQ(Signal*
     ndbrequire(cmaxLogFilesInPageZero);
   }
 
+#if defined VM_TRACE || defined ERROR_INSERT
+  if (cmaxLogFilesInPageZero_DUMP != 0)
   {
+    ndbout << "LQH DUMP 2396 " << cmaxLogFilesInPageZero_DUMP;
+    if (cmaxLogFilesInPageZero_DUMP > cmaxLogFilesInPageZero)
+    {
+      ndbout << ": max allowed is " << cmaxLogFilesInPageZero << endl;
+      // do not continue with useless test
+      ndbrequire(false);
+    }
+    cmaxLogFilesInPageZero = cmaxLogFilesInPageZero_DUMP;
+    ndbout << endl;
+  }
+#endif
+
+  /* How many files' worth of info is actually valid? */
+  cmaxValidLogFilesInPageZero = cmaxLogFilesInPageZero - 1;
+
+  /* Must be at least 1 */
+  ndbrequire(cmaxValidLogFilesInPageZero > 0);
+
+  {
     Uint32 config_val = 20;
     ndb_mgm_get_int_parameter(p, CFG_DB_LCP_INTERVAL, &config_val);
     config_val = config_val > 31 ? 31 : config_val;
@@ -14363,6 +14384,8 @@ retry:
 /*SET THE NEW LOG TAIL AND CONTINUE WITH NEXT LOG PART.                      */
 /*THIS MBYTE IS NOT TO BE INCLUDED SO WE NEED TO STEP BACK ONE MBYTE.        */
 /* ------------------------------------------------------------------------- */
+        /* Check keepGCI MB has a reasonable GCI value */
+        ndbrequire(sltLogFilePtr.p->logMaxGciStarted[tsltIndex] != ((Uint32) -1));
         if (tsltIndex != 0) {
           jam();
           tsltMbyte = tsltIndex - 1;
@@ -15076,6 +15099,10 @@ void Dblqh::execFSCLOSECONF(Signal* sign
     release(signal, m_redo_open_file_cache);
     return;
 #endif
+  case LogFileRecord::CLOSING_SR_FRONTPAGE:
+    jam();
+    closingSrFrontPage(signal);
+    return;
   default:
     jam();
     systemErrorLab(signal, __LINE__);
@@ -16423,6 +16450,15 @@ void Dblqh::writeFileHeaderOpen(Signal*
   logPagePtr.p->logPageWord[ZPAGE_HEADER_SIZE + ZPOS_LOG_TYPE] = ZFD_TYPE;
   logPagePtr.p->logPageWord[ZPAGE_HEADER_SIZE + ZPOS_FILE_NO] = 
     logFilePtr.p->fileNo;
+  /*
+   * When writing a file header on open, we write cmaxLogFilesInPageZero
+   * entries, though the entries for the first file (this file) will be
+   * invalid, as we do not know e.g. which GCIs will be included by log
+   * records in the MBs in this file.  On the first lap these will be
+   * initial values; on subsequent laps, they will be values from the
+   * previous lap.  We take care when reading these values back not to
+   * use the values for the current file.
+   */
   if (logPartPtr.p->noLogFiles > cmaxLogFilesInPageZero) {
     jam();
     twmoNoLogDescriptors = cmaxLogFilesInPageZero;
@@ -16629,25 +16665,19 @@ void Dblqh::readSrFrontpageLab(Signal* s
   }
 
   Uint32 fileNo = logPagePtr.p->logPageWord[ZPAGE_HEADER_SIZE + ZPOS_FILE_NO];
-  if (fileNo == 0) {
-    jam();
-    /* ----------------------------------------------------------------------
-     *       FILE 0 WAS ALSO LAST FILE SO WE DO NOT NEED TO READ IT AGAIN.
-     * ---------------------------------------------------------------------- */
-    readSrLastFileLab(signal);
-    return;
-  }//if
   /* ------------------------------------------------------------------------
    *    CLOSE FILE 0 SO THAT WE HAVE CLOSED ALL FILES WHEN STARTING TO READ 
    *    THE FRAGMENT LOG. ALSO RELEASE PAGE ZERO.
    * ------------------------------------------------------------------------ */
   releaseLogpage(signal);
-  logFilePtr.p->logFileStatus = LogFileRecord::CLOSING_SR;
+  logFilePtr.p->logFileStatus = LogFileRecord::CLOSING_SR_FRONTPAGE;
   closeFile(signal, logFilePtr, __LINE__);
+  /* Lookup index of last file */
   LogFileRecordPtr locLogFilePtr;
   findLogfile(signal, fileNo, logPartPtr, &locLogFilePtr);
-  locLogFilePtr.p->logFileStatus = LogFileRecord::OPEN_SR_LAST_FILE;
-  openFileRw(signal, locLogFilePtr);
+  
+  /* Store in logPart record for use once file 0 is closed */
+  logPartPtr.p->srLastFileIndex = locLogFilePtr.i;
   return;
 }//Dblqh::readSrFrontpageLab()
 
@@ -16669,9 +16699,9 @@ void Dblqh::readSrLastFileLab(Signal* si
              logPartPtr.p->logPartState,
              logPartPtr.p->logLap);
   }
-  if (logPartPtr.p->noLogFiles > cmaxLogFilesInPageZero) {
+  if (logPartPtr.p->noLogFiles > cmaxValidLogFilesInPageZero) {
     jam();
-    initGciInLogFileRec(signal, cmaxLogFilesInPageZero);
+    initGciInLogFileRec(signal, cmaxValidLogFilesInPageZero);
   } else {
     jam();
     initGciInLogFileRec(signal, logPartPtr.p->noLogFiles);
@@ -16731,33 +16761,40 @@ void Dblqh::readSrLastMbyteLab(Signal* s
   }//if
   logFilePtr.p->logFileStatus = LogFileRecord::CLOSING_SR;
   closeFile(signal, logFilePtr, __LINE__);
-  if (logPartPtr.p->noLogFiles > cmaxLogFilesInPageZero) {
+
+  /* Head file is initialised by reading per-MB headers rather than per-file
+   * headers.  Therefore, when stepping back through the redo files to get
+   * the previous file's metadata, we must be careful not to read the 
+   * per-file header info over the just-read per-MB headers, invalidating
+   * the head metainfo.
+   */
+  Uint32 nonHeadFileCount = logPartPtr.p->noLogFiles - 1;
+
+  if (logPartPtr.p->noLogFiles > cmaxValidLogFilesInPageZero) {
+    /* Step back from head to get file:mb metadata from a 
+     * previous file's page zero
+     */
     Uint32 fileNo;
-    if (logFilePtr.p->fileNo >= cmaxLogFilesInPageZero) {
+    if (logFilePtr.p->fileNo >= cmaxValidLogFilesInPageZero) {
       jam();
-      fileNo = logFilePtr.p->fileNo - cmaxLogFilesInPageZero;
+      fileNo = logFilePtr.p->fileNo - cmaxValidLogFilesInPageZero;
     } else {
+      /* Wrap at 0:0 */
       jam();
       fileNo = 
 	(logPartPtr.p->noLogFiles + logFilePtr.p->fileNo) - 
-	cmaxLogFilesInPageZero;
-    }//if
-    if (fileNo == 0) {
-      jam();
-      /* --------------------------------------------------------------------
-       *  AVOID USING FILE 0 AGAIN SINCE THAT IS PROBABLY CLOSING AT THE 
-       *  MOMENT.
-       * -------------------------------------------------------------------- */
-      fileNo = 1;
-      logPartPtr.p->srRemainingFiles = 
-	logPartPtr.p->noLogFiles - (cmaxLogFilesInPageZero - 1);
-    } else {
-      jam();
-      logPartPtr.p->srRemainingFiles = 
-	logPartPtr.p->noLogFiles - cmaxLogFilesInPageZero;
+	cmaxValidLogFilesInPageZero;
     }//if
+
+    jam();
+    logPartPtr.p->srRemainingFiles = 
+      nonHeadFileCount - cmaxValidLogFilesInPageZero;
+
+    /* Check we're making progress */
+    ndbrequire(fileNo != logFilePtr.p->fileNo);
     LogFileRecordPtr locLogFilePtr;
     findLogfile(signal, fileNo, logPartPtr, &locLogFilePtr);
+    ndbrequire(locLogFilePtr.p->logFileStatus == LogFileRecord::CLOSED);
     locLogFilePtr.p->logFileStatus = LogFileRecord::OPEN_SR_NEXT_FILE;
     openFileRw(signal, locLogFilePtr);
     return;
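(A minimal sketch, not part of the patch, of the wrap-around step-back used in the branch above and again in readSrNextFileLab below; names follow the diff:)

  typedef unsigned Uint32;  // as in ndb_types.h

  // Step back cmaxValid files from fileNo in a ring of noLogFiles files,
  // wrapping past file 0, as in the two guarded branches.
  Uint32 stepBack(Uint32 fileNo, Uint32 noLogFiles, Uint32 cmaxValid)
  {
    if (fileNo >= cmaxValid)
      return fileNo - cmaxValid;
    // wrap: e.g. noLogFiles=16, fileNo=2, cmaxValid=5 -> (16 + 2) - 5 = 13
    return (noLogFiles + fileNo) - cmaxValid;
  }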
@@ -16779,9 +16816,9 @@ void Dblqh::openSrNextFileLab(Signal* si
 
 void Dblqh::readSrNextFileLab(Signal* signal) 
 {
-  if (logPartPtr.p->srRemainingFiles > cmaxLogFilesInPageZero) {
+  if (logPartPtr.p->srRemainingFiles > cmaxValidLogFilesInPageZero) {
     jam();
-    initGciInLogFileRec(signal, cmaxLogFilesInPageZero);
+    initGciInLogFileRec(signal, cmaxValidLogFilesInPageZero);
   } else {
     jam();
     initGciInLogFileRec(signal, logPartPtr.p->srRemainingFiles);
@@ -16789,32 +16826,31 @@ void Dblqh::readSrNextFileLab(Signal* si
   releaseLogpage(signal);
   logFilePtr.p->logFileStatus = LogFileRecord::CLOSING_SR;
   closeFile(signal, logFilePtr, __LINE__);
-  if (logPartPtr.p->srRemainingFiles > cmaxLogFilesInPageZero) {
+  if (logPartPtr.p->srRemainingFiles > cmaxValidLogFilesInPageZero) {
+    /* Step back from head to get file:mb metadata from a
+     * previous file's page zero
+     */
     Uint32 fileNo;
-    if (logFilePtr.p->fileNo >= cmaxLogFilesInPageZero) {
+    if (logFilePtr.p->fileNo >= cmaxValidLogFilesInPageZero) {
       jam();
-      fileNo = logFilePtr.p->fileNo - cmaxLogFilesInPageZero;
+      fileNo = logFilePtr.p->fileNo - cmaxValidLogFilesInPageZero;
     } else {
+      /* Wrap at 0:0 */
       jam();
       fileNo = 
 	(logPartPtr.p->noLogFiles + logFilePtr.p->fileNo) - 
-	cmaxLogFilesInPageZero;
-    }//if
-    if (fileNo == 0) {
-      jam();
-      /* --------------------------------------------------------------------
-       * AVOID USING FILE 0 AGAIN SINCE THAT IS PROBABLY CLOSING AT THE MOMENT.
-       * -------------------------------------------------------------------- */
-      fileNo = 1;
-      logPartPtr.p->srRemainingFiles = 
-	logPartPtr.p->srRemainingFiles - (cmaxLogFilesInPageZero - 1);
-    } else {
-      jam();
-      logPartPtr.p->srRemainingFiles = 
-	logPartPtr.p->srRemainingFiles - cmaxLogFilesInPageZero;
+	cmaxValidLogFilesInPageZero;
     }//if
+
+    jam();
+    logPartPtr.p->srRemainingFiles = 
+      logPartPtr.p->srRemainingFiles - cmaxValidLogFilesInPageZero;
+    
+    /* Check we're making progress */
+    ndbrequire(fileNo != logFilePtr.p->fileNo);
     LogFileRecordPtr locLogFilePtr;
     findLogfile(signal, fileNo, logPartPtr, &locLogFilePtr);
+    ndbrequire(locLogFilePtr.p->logFileStatus == LogFileRecord::CLOSED);
     locLogFilePtr.p->logFileStatus = LogFileRecord::OPEN_SR_NEXT_FILE;
     openFileRw(signal, locLogFilePtr);
   }//if
@@ -16826,6 +16862,36 @@ void Dblqh::readSrNextFileLab(Signal* si
   return;
 }//Dblqh::readSrNextFileLab()
 
+void Dblqh::closingSrFrontPage(Signal* signal)
+{
+  jam();
+  /* The front page file (file 0) has closed; it is now safe to
+   * read any file (including file 0) as part of restoring
+   * redo metadata.
+   */
+  logFilePtr.p->logFileStatus = LogFileRecord::CLOSED;
+  logPartPtr.i = logFilePtr.p->logPartRec;
+  ptrCheckGuard(logPartPtr, clogPartFileSize, logPartRecord);
+  logFilePtr.i = logPartPtr.p->firstLogfile;
+  
+  /* The pre-restart head file index was stored in logPartPtr.p->srLastFileIndex
+   * prior to closing this file; now let's use it...
+   */
+  ndbrequire(logPartPtr.p->srLastFileIndex != RNIL);
+  
+  LogFileRecordPtr oldHead;
+  oldHead.i = logPartPtr.p->srLastFileIndex;
+  ptrCheckGuard(oldHead, clogFileFileSize, logFileRecord);
+
+  /* Reset srLastFileIndex */
+  logPartPtr.p->srLastFileIndex = RNIL;
+  
+  /* And now open the head file to begin redo meta reload */
+  oldHead.p->logFileStatus = LogFileRecord::OPEN_SR_LAST_FILE;
+  openFileRw(signal, oldHead);
+  return;
+}
+
 void Dblqh::closingSrLab(Signal* signal) 
 {
   logFilePtr.p->logFileStatus = LogFileRecord::CLOSED;
@@ -20935,9 +21001,25 @@ void Dblqh::initFragrec(Signal* signal,
  * ========================================================================= */
 void Dblqh::initGciInLogFileRec(Signal* signal, Uint32 noFdDescriptors) 
 {
+  /* We are reading the per file:mb metadata from page zero in this file.
+   * We cannot use the data for this file (fd 0), but the data for
+   * previous files is valid, so we start reading at fd 1.
+   * The metadata for this file (fd 0) is set either when reading the
+   * next file, or by probing the last megabytes.
+   */
   LogFileRecordPtr filePtr = logFilePtr;
   Uint32 pos = ZPAGE_HEADER_SIZE + ZFD_HEADER_SIZE;
-  for (Uint32 fd = 0; fd < noFdDescriptors; fd++)
+  ndbrequire(noFdDescriptors <= cmaxValidLogFilesInPageZero);
+
+  /* We start by initialising the previous file's metadata,
+   * so let's move there now...
+   */
+  filePtr.i = filePtr.p->prevLogFile;
+  ptrCheckGuard(filePtr, clogFileFileSize, logFileRecord);
+
+  for (Uint32 fd = 1; fd <= noFdDescriptors; fd++)
   {
     jam();
     for (Uint32 mb = 0; mb < clogFileSize; mb++)
@@ -20953,7 +21035,7 @@ void Dblqh::initGciInLogFileRec(Signal*
       filePtr.p->logMaxGciStarted[mb] = logPagePtr.p->logPageWord[pos1];
       filePtr.p->logLastPrepRef[mb] = logPagePtr.p->logPageWord[pos2];
     }
-    if (fd + 1 < noFdDescriptors)
+    if (fd + 1 <= noFdDescriptors)
     {
       jam();
       filePtr.i = filePtr.p->prevLogFile;
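(A minimal sketch, not part of the patch, of the traversal the changed loop performs; loadGciInfoFor() is a hypothetical stand-in for the inner per-MB word copying:)

  // Start at the previous file (fd 1) and walk backwards through the
  // doubly-linked file list, loading meta info for noFdDescriptors files;
  // fd 0 (the current file) is skipped as its entries are invalid.
  filePtr.i = filePtr.p->prevLogFile;
  ptrCheckGuard(filePtr, clogFileFileSize, logFileRecord);
  for (Uint32 fd = 1; fd <= noFdDescriptors; fd++)
  {
    loadGciInfoFor(filePtr);  // hypothetical: copy per-MB GCI words
    if (fd + 1 <= noFdDescriptors)
    {
      filePtr.i = filePtr.p->prevLogFile;
      ptrCheckGuard(filePtr, clogFileFileSize, logFileRecord);
    }
  }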
@@ -23509,7 +23591,9 @@ Dblqh::execDUMP_STATE_ORD(Signal* signal
       signal->theData[7] = Uint32(total);
       signal->theData[8] = Uint32(mb >> 32);
       signal->theData[9] = Uint32(mb);
-      sendSignal(CMVMI_REF, GSN_EVENT_REP, signal, 10, JBB);
+      signal->theData[10] = logPartPtr.p->noLogFiles;
+      signal->theData[11] = clogFileSize;
+      sendSignal(CMVMI_REF, GSN_EVENT_REP, signal, 12, JBB);
     }
   }
 
@@ -23544,6 +23628,9 @@ Dblqh::execDUMP_STATE_ORD(Signal* signal
     }
   }
 
+  if (arg == 2396 && signal->length() == 2)
+    cmaxLogFilesInPageZero_DUMP = dumpState->args[1];
+
   if (arg == 2397)
   {
     /* Send LCP_STATUS_REQ to BACKUP */

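(For reference, the DUMP 2396 hook added above is exercised from testRedo.cpp below; a condensed sketch of that usage, following run_restart():)

  // Set cmaxLogFilesInPageZero_DUMP on all nodes while they wait in
  // no-start, then start them so execREAD_CONFIG_REQ() applies the value.
  NdbRestarter restarter;
  int setfds = 2;                            // example value, >= 2
  int dump[2] = { 2396, setfds };
  restarter.restartAll(false, true, false);  // initial=false, nostart=true
  restarter.waitClusterNoStart();
  restarter.dumpStateAllNodes(dump, 2);
  restarter.startAll();
  restarter.waitClusterStarted();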
=== modified file 'storage/ndb/src/mgmapi/ndb_logevent.cpp'
--- a/storage/ndb/src/mgmapi/ndb_logevent.cpp	2011-10-20 13:01:37 +0000
+++ b/storage/ndb/src/mgmapi/ndb_logevent.cpp	2012-08-13 13:03:29 +0000
@@ -404,6 +404,19 @@ struct Ndb_logevent_body_row ndb_logeven
   ROW( LogFileInitStatus,   "total_mbytes",  4, total_mbytes),
   ROW( LogFileInitStatus,   "mbytes_done",   5, mbytes_done),
 
+ 
+  ROW( RedoStatus,          "log_part",      1, log_part), 
+  ROW( RedoStatus,          "head_file_no",  2, head_file_no), 
+  ROW( RedoStatus,          "head_mbyte",    3, head_mbyte), 
+  ROW( RedoStatus,          "tail_file_no",  4, tail_file_no), 
+  ROW( RedoStatus,          "tail_mbyte",    5, tail_mbyte), 
+  ROW( RedoStatus,          "total_hi",      6, total_hi), 
+  ROW( RedoStatus,          "total_lo",      7, total_lo), 
+  ROW( RedoStatus,          "free_hi",       8, free_hi), 
+  ROW( RedoStatus,          "free_lo",       9, free_lo), 
+  ROW( RedoStatus,          "no_logfiles",  10, no_logfiles),
+  ROW( RedoStatus,          "logfilesize",  11, logfilesize),
+ 
   { NDB_LE_ILLEGAL_TYPE, 0, 0, 0, 0, 0}
 };
 
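(A condensed sketch of consuming the extended event from the MGM API, following get_redostatus() in testRedo.cpp below; h is an assumed connected NdbMgmHandle and nodeid an assumed data node id:)

  int filter[] = { 15, NDB_MGM_EVENT_CATEGORY_CHECKPOINT, 0 };
  NdbLogEventHandle evh = ndb_mgm_create_logevent_handle(h, filter);
  int dump[] = { 2399 };                     // request redo status reports
  struct ndb_mgm_reply reply;
  ndb_mgm_dump_state(h, nodeid, dump, 1, &reply);

  ndb_logevent ev;
  while (ndb_logevent_get_next(evh, &ev, 100) > 0)  // 100 ms poll
  {
    if (ev.type != NDB_LE_RedoStatus)
      continue;
    const ndb_logevent_RedoStatus& rs = ev.RedoStatus;
    // rs.no_logfiles and rs.logfilesize are the two fields added above
  }
  ndb_mgm_destroy_logevent_handle(&evh);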

=== modified file 'storage/ndb/test/ndbapi/CMakeLists.txt'
--- a/storage/ndb/test/ndbapi/CMakeLists.txt	2011-12-09 12:29:58 +0000
+++ b/storage/ndb/test/ndbapi/CMakeLists.txt	2012-08-13 13:03:29 +0000
@@ -70,6 +70,7 @@ TARGET_LINK_LIBRARIES(testSRBank ndbbank
 ADD_EXECUTABLE(testLimits testLimits.cpp)
 ADD_EXECUTABLE(testSingleUserMode testSingleUserMode.cpp)
 ADD_EXECUTABLE(testIndexStat testIndexStat.cpp)
+ADD_EXECUTABLE(testRedo testRedo.cpp)
 
 INSTALL(TARGETS create_all_tabs create_tab
   drop_all_tabs flexAsynch flexBench
@@ -84,5 +85,5 @@ INSTALL(TARGETS create_all_tabs create_t
   DbAsyncGenerator test_event_merge testNdbinfo
   testNativeDefault testLimits testSpj
   testSingleUserMode
-  testIndexStat
+  testIndexStat testRedo
 DESTINATION bin)

=== modified file 'storage/ndb/test/ndbapi/Makefile.am'
--- a/storage/ndb/test/ndbapi/Makefile.am	2011-10-14 13:24:26 +0000
+++ b/storage/ndb/test/ndbapi/Makefile.am	2012-08-13 13:03:29 +0000
@@ -67,7 +67,8 @@ testNDBT \
 testReconnect \
 testNdbinfo \
 NdbRepStress \
-msa
+msa \
+testRedo
 
 EXTRA_PROGRAMS = \
  test_event \
@@ -149,6 +150,7 @@ test_event_multi_table_SOURCES = test_ev
 testIndexStat_SOURCES = testIndexStat.cpp
 reorg_tab_SOURCES = reorg_tab.cpp
 msa_SOURCES = msa.cpp
+testRedo_SOURCES = testRedo.cpp
 
 ndbapi_50compat0_CPPFLAGS = -DNDBAPI_50_COMPAT
 ndbapi_50compat0_SOURCES = ndbapi_50compat0.cpp

=== added file 'storage/ndb/test/ndbapi/testRedo.cpp'
--- a/storage/ndb/test/ndbapi/testRedo.cpp	1970-01-01 00:00:00 +0000
+++ b/storage/ndb/test/ndbapi/testRedo.cpp	2012-08-13 13:03:29 +0000
@@ -0,0 +1,1333 @@
+/*
+   Copyright (c) 2012, Oracle and/or its affiliates. All rights reserved.
+
+   This program is free software; you can redistribute it and/or modify
+   it under the terms of the GNU General Public License as published by
+   the Free Software Foundation; version 2 of the License.
+
+   This program is distributed in the hope that it will be useful,
+   but WITHOUT ANY WARRANTY; without even the implied warranty of
+   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+   GNU General Public License for more details.
+
+   You should have received a copy of the GNU General Public License
+   along with this program; if not, write to the Free Software
+   Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301  USA
+*/
+
+#include <NDBT.hpp>
+#include <NDBT_Test.hpp>
+#include <HugoOperations.hpp>
+#include <NdbRestarter.hpp>
+#include <mgmapi.h>
+#include <ndb_logevent.h>
+#include <NdbTick.h>
+#include <NDBT_Stats.hpp>
+#include <random.h>
+
+#include "storage/ndb/src/mgmapi/mgmapi_configuration.hpp"
+
+static NdbMutex* g_msgmutex = 0;
+
+#undef require
+#define require(b) \
+  if (!(b)) { \
+    NdbMutex_Lock(g_msgmutex); \
+    g_err << "ABORT: " << #b << " failed at line " << __LINE__ << endl; \
+    NdbMutex_Unlock(g_msgmutex); \
+    abort(); \
+  }
+
+#define CHK1(b) \
+  if (!(b)) { \
+    NdbMutex_Lock(g_msgmutex); \
+    g_err << "ERROR: " << #b << " failed at line " << __LINE__ << endl; \
+    NdbMutex_Unlock(g_msgmutex); \
+    result = NDBT_FAILED; \
+    break; \
+  }
+
+#define CHK2(b, e) \
+  if (!(b)) { \
+    NdbMutex_Lock(g_msgmutex); \
+    g_err << "ERROR: " << #b << " failed at line " << __LINE__ \
+          << ": " << e << endl; \
+    NdbMutex_Unlock(g_msgmutex); \
+    result = NDBT_FAILED; \
+    break; \
+  }
+
+#define info(x) \
+  do { \
+    NdbMutex_Lock(g_msgmutex); \
+    g_info << x << endl; \
+    NdbMutex_Unlock(g_msgmutex); \
+  } while (0)
+
+static const int g_tabmax = 3;
+static const char* g_tabname[g_tabmax] = { "T1", "T2", "T4" };
+static const NdbDictionary::Table* g_tabptr[g_tabmax] = { 0, 0, 0 };
+
+static int
+runCreate(NDBT_Context* ctx, NDBT_Step* step)
+{
+  Ndb* pNdb = GETNDB(step);
+  NdbDictionary::Dictionary* pDic = pNdb->getDictionary();
+  int result = NDBT_OK;
+  int tabmask = ctx->getProperty("TABMASK", (Uint32)0);
+
+  for (int i = 0; i < g_tabmax; i++)
+  {
+    if (!(tabmask & (1 << i)))
+      continue;
+    const char* tabname = g_tabname[i];
+    (void)pDic->dropTable(tabname);
+
+    const NdbDictionary::Table* pTab = NDBT_Tables::getTable(tabname);
+    require(pTab != 0);
+    NdbDictionary::Table tab2(*pTab);
+    // make sure to hit all log parts
+    tab2.setFragmentType(NdbDictionary::Object::FragAllLarge);
+    //tab2.setMaxRows(100000000);
+    CHK2(pDic->createTable(tab2) == 0, pDic->getNdbError());
+
+    g_tabptr[i] = pDic->getTable(tabname);
+    require(g_tabptr[i] != 0);
+    info("created " << tabname);
+  }
+
+  return result;
+}
+
+static int
+runDrop(NDBT_Context* ctx, NDBT_Step* step)
+{
+  Ndb* pNdb = GETNDB(step);
+  NdbDictionary::Dictionary* pDic = pNdb->getDictionary();
+  int result = NDBT_OK;
+  int tabmask = ctx->getProperty("TABMASK", (Uint32)0);
+
+  for (int i = 0; i < g_tabmax; i++)
+  {
+    if (!(tabmask & (1 << i)))
+      continue;
+    const char* tabname = g_tabname[i];
+    if (g_tabptr[i] != 0)
+    {
+      CHK2(pDic->dropTable(tabname) == 0, pDic->getNdbError());
+      g_tabptr[i] = 0;
+      info("dropped " << tabname);
+    }
+  }
+
+  return result;
+}
+
+// 0-writer has not seen 410 error
+// 1-writer sees 410 error
+// 2-longtrans has rolled back
+
+static int
+get_err410(NDBT_Context* ctx)
+{
+  int v = (int)ctx->getProperty("ERR410", (Uint32)0);
+  require(v == 0 || v == 1 || v == 2);
+  return v;
+}
+
+static void
+set_err410(NDBT_Context* ctx, int v)
+{
+  require(v == 0 || v == 1 || v == 2);
+  require(get_err410(ctx) != v);
+  ctx->setProperty("ERR410", (Uint32)v);
+}
+
+static int
+runLongtrans(NDBT_Context* ctx, NDBT_Step* step)
+{
+  Ndb* pNdb = GETNDB(step);
+  int result = NDBT_OK;
+  const int sleep410 = ctx->getProperty("SLEEP410", (Uint32)0);
+
+  const NdbDictionary::Table* pTab = g_tabptr[0];
+  require(pTab != 0);
+
+  info("longtrans: start");
+  int loop = 0;
+  while (!ctx->isTestStopped())
+  {
+    info("longtrans: loop " << loop);
+    HugoOperations ops(*pTab);
+    ops.setQuiet();
+    CHK2(ops.startTransaction(pNdb) == 0, ops.getNdbError());
+    CHK2(ops.pkInsertRecord(pNdb, 0, 1, 0) == 0, ops.getNdbError());
+
+    while (!ctx->isTestStopped())
+    {
+      int v = get_err410(ctx);
+      require(v == 0 || v == 1);
+      if (v != 0)
+      {
+        info("longtrans: 410 seen");
+        if (sleep410 > 0)
+        {
+          info("longtrans: sleep " << sleep410);
+          sleep(sleep410);
+        }
+
+        CHK2(ops.execute_Rollback(pNdb) == 0, ops.getNdbError());
+        ops.closeTransaction(pNdb);
+        info("longtrans: rollback done");
+        set_err410(ctx, 2);
+
+        while (!ctx->isTestStopped())
+        {
+          int v = get_err410(ctx);
+          if (v == 0)
+          {
+            info("longtrans: 410 cleared");
+            break;
+          }
+          sleep(1);
+        }
+        break;
+      }
+      sleep(1);
+    }
+    CHK1(result == NDBT_OK);
+
+    if (ops.getTransaction() != NULL)
+    {
+      info("longtrans: close leftover transaction");
+      ops.closeTransaction(pNdb);
+    }
+    
+    loop++;
+  }
+
+  info("longtrans: stop");
+  return result;
+}
+
+static int
+run_write_ops(NDBT_Context* ctx, NDBT_Step* step, int upval, NdbError& err)
+{
+  Ndb* pNdb = GETNDB(step);
+  const int records = ctx->getNumRecords();
+  int result = NDBT_OK;
+
+  const NdbDictionary::Table* pTab = g_tabptr[1];
+  require(pTab != 0);
+
+  while (!ctx->isTestStopped())
+  {
+    HugoOperations ops(*pTab);
+    ops.setQuiet();
+    CHK2(ops.startTransaction(pNdb) == 0, ops.getNdbError());
+
+    for (int record = 0; record < records; record++)
+    {
+      CHK2(ops.pkWriteRecord(pNdb, record, 1, upval) == 0, ops.getNdbError());
+    }
+    CHK1(result == NDBT_OK);
+
+    int ret = ops.execute_Commit(pNdb);
+    err = ops.getNdbError();
+    ops.closeTransaction(pNdb);
+
+    if (ret == 0)
+      break;
+
+    require(err.code != 0);
+    CHK2(err.status == NdbError::TemporaryError, err);
+
+    if (err.code == 410)
+      break;
+
+    info("write: continue on " << err);
+    NdbSleep_MilliSleep(100);
+  }
+
+  return result;
+}
+
+static int
+runWriteOK(NDBT_Context* ctx, NDBT_Step* step)
+{
+  int result = NDBT_OK;
+
+  info("write: start");
+  int loop = 0;
+  int upval = 0;
+
+  while (!ctx->isTestStopped())
+  {
+    if (loop % 100 == 0)
+      info("write: loop " << loop);
+
+    NdbError err;
+    CHK2(run_write_ops(ctx, step, upval++, err) == 0, err);
+    require(err.code == 0 || err.code == 410);
+    CHK2(err.code == 0, err);
+    NdbSleep_MilliSleep(100);
+
+    loop++;
+  }
+
+  return result;
+}
+
+static int
+runWrite410(NDBT_Context* ctx, NDBT_Step* step)
+{
+  int result = NDBT_OK;
+  const int loops = ctx->getNumLoops();
+
+  info("write: start");
+  int loop = 0;
+  int upval = 0;
+
+  while (loop < loops && !ctx->isTestStopped())
+  {
+    info("write: loop " << loop);
+
+    while (!ctx->isTestStopped())
+    {
+      NdbError err;
+      CHK2(run_write_ops(ctx, step, upval++, err) == 0, err);
+      if (err.code != 0)
+      {
+        require(err.code == 410);
+        info("write: setting 410");
+        set_err410(ctx, 1);
+        break;
+      }
+      NdbSleep_MilliSleep(100);
+    }
+
+    while (1)
+    {
+      int v = get_err410(ctx);
+      if (v == 2)
+      {
+        info("write: longtrans rollback seen");
+        break;
+      }
+      require(v == 1);
+      sleep(1);
+    }
+
+    while (!ctx->isTestStopped())
+    {
+      NdbError err;
+      CHK2(run_write_ops(ctx, step, upval++, err) == 0, err);
+      if (err.code == 0)
+      {
+        info("write: clearing 410");
+        set_err410(ctx, 0);
+        break;
+      }
+      require(err.code == 410);
+      NdbSleep_MilliSleep(100);
+    }
+
+    loop++;
+  }
+
+  info("write: stop test");
+  ctx->stopTest();
+  return result;
+}
+
+struct OpLat {
+  const int m_op;
+  const int m_repeat;
+  // 2: 0-410 off 1-410 on
+  // 3: 0-op ok 1-op 410 2-op other temp error
+  NDBT_Stats m_lat[2][3];
+  OpLat(int op, int repeat) :
+    m_op(op),
+    m_repeat(repeat) {}
+};
+
+static void
+run_latency_report(const OpLat* oplist, int opcnt)
+{
+  for (int i = 0; i < opcnt; i++)
+  {
+    const OpLat& oplat = oplist[i];
+    printf("optype: %d\n", oplat.m_op);
+    for (int i0 = 0; i0 < 2; i0++)
+    {
+      printf("410 off/on: %d\n", i0);
+      printf("op status ok / 410 / other temp error:\n");
+      for (int i1 = 0; i1 < 3; i1++)
+      {
+        const NDBT_Stats& lat = oplat.m_lat[i0][i1];
+        printf("count: %d", lat.getCount());
+        if (lat.getCount() > 0)
+        {
+          printf(" mean: %.2f min: %.2f max: %.2f stddev: %.2f",
+                 lat.getMean(), lat.getMin(), lat.getMax(), lat.getStddev());
+        }
+        printf("\n");
+      }
+    }
+  }
+}
+
+static int
+run_latency_ops(NDBT_Context* ctx, NDBT_Step* step, OpLat& oplat, int upval, NdbError& err)
+{
+  Ndb* pNdb = GETNDB(step);
+  const int records = ctx->getNumRecords();
+  int result = NDBT_OK;
+
+  const NdbDictionary::Table* pTab = g_tabptr[2];
+  require(pTab != 0);
+
+  int record = 0;
+  while (record < records && !ctx->isTestStopped())
+  {
+    HugoOperations ops(*pTab);
+    ops.setQuiet();
+
+    MicroSecondTimer timer_start;
+    MicroSecondTimer timer_stop;
+    require(NdbTick_getMicroTimer(&timer_start) == 0);
+
+    CHK2(ops.startTransaction(pNdb) == 0, ops.getNdbError());
+
+    switch (oplat.m_op)
+    {
+      case NdbOperation::InsertRequest:
+        CHK2(ops.pkInsertRecord(pNdb, record, 1, upval) == 0, ops.getNdbError());
+        break;
+      case NdbOperation::UpdateRequest:
+        CHK2(ops.pkUpdateRecord(pNdb, record, 1, upval) == 0, ops.getNdbError());
+        break;
+      case NdbOperation::ReadRequest:
+        CHK2(ops.pkReadRecord(pNdb, record, 1) == 0, ops.getNdbError());
+        break;
+      case NdbOperation::DeleteRequest:
+        CHK2(ops.pkDeleteRecord(pNdb, record, 1) == 0, ops.getNdbError());
+        break;
+      default:
+        require(false);
+        break;
+    }
+    CHK2(result == NDBT_OK, "latency: ndbapi error at op " << oplat.m_op << " record " << record);
+
+    int ret = ops.execute_Commit(pNdb);
+    err = ops.getNdbError();
+    ops.closeTransaction(pNdb);
+
+    if (ret != 0)
+    {
+      require(err.code != 0);
+      CHK2(err.status == NdbError::TemporaryError, err);
+    }
+
+    require(NdbTick_getMicroTimer(&timer_stop) == 0);
+    NDB_TICKS tt = NdbTick_getMicrosPassed(timer_start, timer_stop);
+    require(tt > 0);
+    double td = (double)tt;
+    int i0 = get_err410(ctx);
+    int i1 = (ret == 0 ? 0 : (err.code == 410 ? 1 : 2));
+    NDBT_Stats& lat = oplat.m_lat[i0][i1];
+    lat.addObservation(td);
+
+    if (ret == 0)
+      record++;
+  }
+
+  return result;
+}
+
+static int
+runLatency(NDBT_Context* ctx, NDBT_Step* step)
+{
+  int result = NDBT_OK;
+
+  info("latency: start");
+  const int opcnt = 4; 
+  OpLat oplist[opcnt] = {
+    OpLat(NdbOperation::InsertRequest, 1),
+    OpLat(NdbOperation::UpdateRequest, 10),
+    OpLat(NdbOperation::ReadRequest, 5),
+    OpLat(NdbOperation::DeleteRequest, 1)
+  };
+
+  int loop = 0;
+  int upval = 0;
+
+  while (!ctx->isTestStopped())
+  {
+    info("latency: loop " << loop);
+    for (int i = 0; i < opcnt && !ctx->isTestStopped(); i++)
+    {
+      OpLat& oplat = oplist[i];
+      NdbError err;
+      for (int j = 0; j < oplat.m_repeat && !ctx->isTestStopped(); j++)
+      {
+        CHK2(run_latency_ops(ctx, step, oplat, upval, err) == 0, err);
+        upval++;
+      }
+      CHK1(result == NDBT_OK);
+    }
+    loop++;
+  }
+
+  run_latency_report(oplist, opcnt);
+  return result;
+}
+
+// gnu bitmap madness
+#undef reset
+#undef isset
+
+struct LogPos {
+  int m_fileno;
+  int m_mb;
+  int m_pos; // absolute mb
+  friend NdbOut& operator<<(NdbOut&, const LogPos&);
+};
+
+NdbOut&
+operator<<(NdbOut& out, const LogPos& pos)
+{
+  out << pos.m_fileno;
+  out << "." << pos.m_mb;
+  out << "-" << pos.m_pos;
+  return out;
+}
+
+struct LogPart {
+  int m_partno; // for print
+  bool m_set;
+  int m_files; // redo files
+  int m_filesize; // mb
+  int m_total; // m_files * m_filesize
+  int m_free; // mb
+  int m_used; // mb
+  LogPos m_head;
+  LogPos m_tail;
+  int m_fileused;
+  void reset() {
+    m_set = false;
+  }
+  bool isset() {
+    return m_set;
+  }
+  friend NdbOut& operator<<(NdbOut&, const LogPart&);
+};
+
+NdbOut&
+operator<<(NdbOut& out, const LogPart& lp)
+{
+  out << "part " << lp.m_partno << ":";
+  out << " files=" << lp.m_files;
+  out << " filesize=" << lp.m_filesize;
+  out << " total=" << lp.m_total;
+  out << " free=" << lp.m_free;
+  out << " head: " << lp.m_head;
+  out << " tail: "  << lp.m_tail;
+  out << " fileused=" << lp.m_fileused;
+  return out;
+}
+
+struct LogNode {
+  int m_nodeid;
+  LogPart m_logpart[4];
+  int m_files; // from LogPart (must be same for all)
+  int m_filesize;
+  int m_minfds; // min and max FDs in page 0
+  int m_maxfds; // LQH uses max FDs by default
+  void reset() {
+    for (int i = 0; i < 4; i++) {
+      m_logpart[i].m_partno = i;
+      m_logpart[i].reset();
+    }
+  }
+  bool isset() {
+    for (int i = 0; i < 4; i++)
+      if (!m_logpart[i].isset())
+        return false;
+    return true;
+  }
+};
+
+struct LogInfo {
+  int m_nodes;
+  LogNode* m_lognode;
+  int m_files; // from LogNode (config is same for all in these tests)
+  int m_filesize;
+  int m_minfds;
+  int m_maxfds;
+  LogInfo(int nodes) {
+    m_nodes = nodes;
+    m_lognode = new LogNode [nodes];
+    reset();
+  }
+  ~LogInfo() {
+    m_nodes = 0;
+    delete [] m_lognode;
+  }
+  void reset() {
+    for (int n = 0; n < m_nodes; n++)
+      m_lognode[n].reset();
+  }
+  bool isset() {
+    for (int n = 0; n < m_nodes; n++)
+      if (!m_lognode[n].isset())
+        return false;
+    return true;
+  }
+  LogNode* findnode(int nodeid) const {
+    for (int n = 0; n < m_nodes; n++) {
+      if (m_lognode[n].m_nodeid == nodeid)
+        return &m_lognode[n];
+    }
+    return 0;
+  }
+  void copyto(LogInfo& li2) const {
+    require(m_nodes == li2.m_nodes);
+    for (int n = 0; n < m_nodes; n++) {
+      const LogNode& ln1 = m_lognode[n];
+      LogNode& ln2 = li2.m_lognode[n];
+      ln2 = ln1;
+    }
+  }
+};
+
+static int
+get_nodestatus(NdbMgmHandle h, LogInfo& li)
+{
+  int result = 0;
+  ndb_mgm_cluster_state* cs = 0;
+
+  do
+  {
+    require(h != 0);
+    CHK2((cs = ndb_mgm_get_status(h)) != 0, ndb_mgm_get_latest_error_msg(h));
+    int n = 0;
+    for (int i = 0; i < cs->no_of_nodes; i++)
+    {
+      ndb_mgm_node_state& ns = cs->node_states[i];
+      if (ns.node_type == NDB_MGM_NODE_TYPE_NDB)
+      {
+        // called only when all started
+        CHK1(ns.node_status == NDB_MGM_NODE_STATUS_STARTED);
+        CHK1(n < li.m_nodes);
+
+        LogNode& ln = li.m_lognode[n];
+        ln.m_nodeid = ns.node_id;
+        info("node " << n << ": " << ln.m_nodeid);
+        n++;
+      }
+      CHK1(result == 0);
+    }
+    CHK1(n == li.m_nodes);
+  }
+  while (0);
+
+  free(cs);
+  info("get_nodestatus result=" << result);
+  return result;
+}
+
+static int
+get_redostatus(NdbMgmHandle h, LogInfo& li)
+{
+  int result = 0;
+
+  do
+  {
+    li.reset();
+    require(h != 0);
+
+    int filter[] = { 15, NDB_MGM_EVENT_CATEGORY_CHECKPOINT, 0 };
+    NdbLogEventHandle evh = 0;
+    CHK2((evh = ndb_mgm_create_logevent_handle(h, filter)) != 0, ndb_mgm_get_latest_error_msg(h));
+
+    for (int n = 0; n < li.m_nodes; n++)
+    {
+      int dump[] = { 2399 };
+      const LogNode& ln = li.m_lognode[n];
+      struct ndb_mgm_reply reply;
+      CHK2(ndb_mgm_dump_state(h, ln.m_nodeid, dump, 1, &reply) == 0, ndb_mgm_get_latest_error_msg(h));
+    }
+    CHK1(result == 0);
+
+    int maxcnt = 4 * li.m_nodes;
+    int rescnt = 0;
+    time_t start = time(0);
+    int maxwait = 5;
+
+    while (rescnt < maxcnt && time(0) < start + maxwait)
+    {
+      while (1)
+      {
+        int res;
+        ndb_logevent ev;
+        int msec = 100;
+        CHK2((res = ndb_logevent_get_next(evh, &ev, msec)) >= 0, ndb_mgm_get_latest_error_msg(h));
+        if (res == 0)
+          break;
+        if (ev.type != NDB_LE_RedoStatus)
+          continue;
+
+        LogNode* lnptr = 0;
+        CHK2((lnptr = li.findnode(ev.source_nodeid)) != 0, "unknown nodeid " << ev.source_nodeid);
+        LogNode& ln = *lnptr;
+
+        const ndb_logevent_RedoStatus& rs = ev.RedoStatus;
+        CHK1(rs.log_part < 4);
+        LogPart& lp = ln.m_logpart[rs.log_part];
+
+        CHK1(!lp.m_set);
+        LogPos& head = lp.m_head;
+        LogPos& tail = lp.m_tail;
+        lp.m_files = rs.no_logfiles;
+        lp.m_filesize = rs.logfilesize;
+        head.m_fileno = rs.head_file_no;
+        head.m_mb = rs.head_mbyte;
+        head.m_pos = head.m_fileno * lp.m_filesize + head.m_mb;
+        tail.m_fileno = rs.tail_file_no;
+        tail.m_mb = rs.tail_mbyte;
+        tail.m_pos = tail.m_fileno * lp.m_filesize + tail.m_mb;
+        CHK1(rs.total_hi == 0 && rs.total_lo < (1u << 31));
+        lp.m_total = rs.total_lo;
+        CHK1(rs.free_hi == 0 && rs.free_lo < (1u << 31));
+        lp.m_free = rs.free_lo;
+        lp.m_used = lp.m_total - lp.m_free;
+
+        // set number of files used
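+        // (cases: head ahead of tail in the same lap; head wrapped past
+        //  file 0; both in the same file with head leading; a full lap
+        //  with tail leading; head == tail means the part is empty)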
+        if (tail.m_fileno < head.m_fileno)
+        {
+          lp.m_fileused = head.m_fileno - tail.m_fileno + 1;
+        }
+        else if (tail.m_fileno > head.m_fileno)
+        {
+          lp.m_fileused = lp.m_files - (tail.m_fileno - head.m_fileno - 1);
+        }
+        else if (tail.m_pos < head.m_pos)
+        {
+          lp.m_fileused = 1;
+        }
+        else if (tail.m_pos > head.m_pos)
+        {
+          lp.m_fileused = lp.m_files;
+        }
+        else
+        {
+          lp.m_fileused = 0;
+        }
+
+        // sanity checks
+        {
+          CHK2(lp.m_total == lp.m_files * lp.m_filesize, lp);
+          CHK2(head.m_fileno < lp.m_files, lp);
+          CHK2(head.m_mb < lp.m_filesize, lp);
+          require(head.m_pos < lp.m_total);
+          CHK2(tail.m_fileno < lp.m_files, lp);
+          CHK2(tail.m_mb < lp.m_filesize, lp);
+          require(tail.m_pos < lp.m_total);
+          CHK2(lp.m_free <= lp.m_total, lp);
+          if (tail.m_pos <= head.m_pos)
+          {
+            CHK2(lp.m_free == lp.m_total - (head.m_pos - tail.m_pos), lp);
+          }
+          else
+          {
+            CHK2(lp.m_free == tail.m_pos - head.m_pos, lp);
+          }
+        }
+        lp.m_set = true;
+        //info("node " << ln.m_nodeid << ": " << lp);
+
+        rescnt++;
+      }
+      CHK1(result == 0);
+    }
+    CHK1(result == 0);
+    CHK2(rescnt == maxcnt, "got events " << rescnt << " != " << maxcnt);
+    require(li.isset()); // already implied by counts
+
+    for (int n = 0; n < li.m_nodes; n++)
+    {
+      LogNode& ln = li.m_lognode[n];
+      for (int i = 0; i < 4; i++)
+      {
+        LogPart& lp = ln.m_logpart[i];
+        if (i == 0)
+        {
+          ln.m_files = lp.m_files;
+          ln.m_filesize = lp.m_filesize;
+          CHK1(ln.m_files >= 3 && ln.m_filesize >= 4);
+
+          // see Dblqh::execREAD_CONFIG_REQ()
+          ln.m_minfds = 2;
+          ln.m_maxfds = (8192 - 32 - 128) / (3 * ln.m_filesize);
+          if (ln.m_maxfds > 40)
+            ln.m_maxfds = 40;
+          CHK1(ln.m_minfds <= ln.m_maxfds);
+        }
+        else
+        {
+          CHK1(ln.m_files == lp.m_files && ln.m_filesize == lp.m_filesize);
+        }
+      }
+
+      if (n == 0)
+      {
+        li.m_files = ln.m_files;
+        li.m_filesize = ln.m_filesize;
+        li.m_minfds = ln.m_minfds;
+        li.m_maxfds = ln.m_maxfds;
+        require(li.m_files > 0 && li.m_filesize > 0);
+        require(li.m_minfds <= li.m_maxfds);
+      }
+      else
+      {
+        CHK1(li.m_files == ln.m_files && li.m_filesize == ln.m_filesize);
+        require(li.m_minfds == ln.m_minfds && li.m_maxfds == ln.m_maxfds);
+      }
+
+      CHK1(result == 0);
+    }
+    CHK1(result == 0);
+
+    ndb_mgm_destroy_logevent_handle(&evh);
+  }
+  while (0);
+
+  info("get_redostatus result=" << result);
+  return result;
+}
+
+// get node with max redo files used in some part
+
+struct LogUsed {
+  int m_nodeidx;
+  int m_nodeid;
+  int m_partno;
+  int m_used; // mb
+  LogPos m_head;
+  LogPos m_tail;
+  int m_fileused;
+  int m_rand; // randomize node to restart if file usage is same
+  friend NdbOut& operator<<(NdbOut&, const LogUsed&);
+};
+
+NdbOut&
+operator<<(NdbOut& out, const LogUsed& lu)
+{
+  out << "n=" << lu.m_nodeid;
+  out << " p=" << lu.m_partno;
+  out << " u=" << lu.m_used;
+  out << " h=" << lu.m_head;
+  out << " t=" << lu.m_tail;
+  out << " f=" << lu.m_fileused;
+  return out;
+}
+
+static int
+cmp_logused(const void* a1, const void* a2)
+{
+  const LogUsed& lu1 = *(const LogUsed*)a1;
+  const LogUsed& lu2 = *(const LogUsed*)a2;
+  int k = lu1.m_fileused - lu2.m_fileused;
+  if (k != 0)
+  {
+    // sorting by larger file usage
+    return (-1) * k;
+  }
+  return lu1.m_rand - lu2.m_rand;
+}
+
+struct LogMax {
+  int m_nodes;
+  LogUsed* m_logused;
+  LogMax(int nodes) {
+    m_nodes = nodes;
+    m_logused = new LogUsed [nodes];
+  };
+  ~LogMax() {
+    m_nodes = 0;
+    delete [] m_logused;
+  }
+};
+
+static void
+get_redoused(const LogInfo& li, LogMax& lx)
+{
+  require(li.m_nodes == lx.m_nodes);
+  for (int n = 0; n < li.m_nodes; n++)
+  {
+    const LogNode& ln = li.m_lognode[n];
+    LogUsed& lu = lx.m_logused[n];
+    lu.m_used = -1;
+    for (int i = 0; i < 4; i++)
+    {
+      const LogPart& lp = ln.m_logpart[i];
+      if (lu.m_used <  lp.m_used)
+      {
+        lu.m_nodeidx = n;
+        lu.m_nodeid = ln.m_nodeid;
+        lu.m_partno = i;
+        lu.m_used = lp.m_used;
+        lu.m_head = lp.m_head;
+        lu.m_tail = lp.m_tail;
+        lu.m_fileused = lp.m_fileused;
+        lu.m_rand = myRandom48(100);
+      }
+    }
+  }
+  qsort(lx.m_logused, lx.m_nodes, sizeof(LogUsed), cmp_logused);
+  for (int n = 0; n + 1 < li.m_nodes; n++)
+  {
+    const LogUsed& lu1 = lx.m_logused[n];
+    const LogUsed& lu2 = lx.m_logused[n + 1];
+    require(lu1.m_fileused >= lu2.m_fileused);
+  }
+}
+
+struct LogDiff {
+  bool m_tailmove; // true only if the tail moved in all parts (all redo parts are in use)
+};
+
+static void
+get_redodiff(const LogInfo& li1, const LogInfo& li2, LogDiff& ld)
+{
+  require(li1.m_nodes == li2.m_nodes);
+  ld.m_tailmove = true;
+  for (int i = 0; i < li1.m_nodes; i++)
+  {
+    LogNode& ln1 = li1.m_lognode[i];
+    LogNode& ln2 = li2.m_lognode[i];
+    for (int j = 0; j < 4; j++)
+    {
+      LogPart& lp1 = ln1.m_logpart[j];
+      LogPart& lp2 = ln2.m_logpart[j];
+      if (lp1.m_tail.m_pos == lp2.m_tail.m_pos)
+      {
+        ld.m_tailmove = false;
+      }
+    }
+  }
+}
+
+static int
+runRestartOK(NDBT_Context* ctx, NDBT_Step* step)
+{
+  int result = NDBT_OK;
+  const int loops = ctx->getNumLoops();
+  NdbRestarter restarter;
+
+  info("restart01: start");
+  int nodes = restarter.getNumDbNodes();
+  require(nodes >= 1);
+  info("restart: nodes " << nodes);
+
+  if (nodes == 1)
+  {
+    info("restart01: need at least 2 nodes");
+    return result;
+  }
+
+  int nodeidx = myRandom48(nodes);
+  int nodeid = restarter.getDbNodeId(nodeidx);
+  info("restart01: using nodeid " << nodeid);
+
+  LogInfo logInfo(nodes);
+
+  int loop = 0;
+  while (loop < loops && !ctx->isTestStopped())
+  {
+    info("restart01: loop " << loop);
+    CHK1(get_nodestatus(restarter.handle, logInfo) == 0);
+
+    bool fi = false;
+    bool fn = false;
+    bool fa = false;
+    info("restart01: restart nodeid " << nodeid);
+    CHK1(restarter.restartOneDbNode(nodeid, fi, fn, fa) == 0);
+    CHK1(restarter.waitClusterStarted() == 0);
+    info("restart01: cluster up again");
+
+    // let write run until redo wraps (no check yet)
+    sleep(300);
+    loop++;
+  }
+
+  info("restart01: stop test");
+  ctx->stopTest();
+  return result;
+}
+
+#define g_SETFDS "SETFDS"
+
+static int
+run_write_ops(NDBT_Context* ctx, NDBT_Step* step, int cnt, int& upval, NdbError& err)
+{
+  int result = NDBT_OK;
+
+  for (int i = 0; i < cnt && !ctx->isTestStopped(); i++)
+  {
+    CHK2(run_write_ops(ctx, step, upval++, err) == 0, err);
+    if (err.code != 0)
+    {
+      require(err.code == 410);
+      break;
+    }
+  }
+
+  return result;
+}
+
+static int
+get_newfds(const LogInfo& li)
+{
+  require(li.m_files >= 3);
+  int newfds = li.m_files - 1;
+  require(newfds >= li.m_minfds);
+  // twice to prefer smaller
+  newfds = li.m_minfds + myRandom48(newfds - li.m_minfds + 1);
+  newfds = li.m_minfds + myRandom48(newfds - li.m_minfds + 1);
+  return newfds;
+}
+
+static int
+get_limfds(const LogInfo& li, int newfds)
+{
+  int off = li.m_files - newfds;
+  require(off > 0);
+  // randomize twice to prefer smaller offsets
+  off = myRandom48(off + 1);
+  off = myRandom48(off + 1);
+  return newfds + off;
+}
+
+static int
+run_restart(NDBT_Context* ctx, NDBT_Step* step, int nodeid, bool fi)
+{
+  int result = NDBT_OK;
+  int setfds = ctx->getProperty(g_SETFDS, (Uint32)0xff);
+  require(setfds != 0xff);
+  int dump[2] = { 2396, setfds };
+  NdbRestarter restarter;
+  info("run_restart: nodeid=" << nodeid << " initial=" << fi << " setfds=" << setfds);
+
+  /*
+   * When starting non-initial the node(s) have already some setfds
+   * but it is lost on restart.  We must dump the same setfds again.
+   */
+  do
+  {
+    bool fn = true;
+    bool fa = false;
+    if (nodeid == 0)
+    {
+      info("run_restart: restart all nostart");
+      CHK1(restarter.restartAll(fi, fn, fa) == 0);
+      info("run_restart: wait nostart");
+      CHK1(restarter.waitClusterNoStart() == 0);
+      info("run_restart: dump " << dump[0] << " " << dump[1]);
+      CHK1(restarter.dumpStateAllNodes(dump, 2) == 0);
+      info("run_restart: start all");
+      CHK1(restarter.startAll() == 0);
+    }
+    else
+    {
+      info("run_restart: restart node nostart");
+      CHK1(restarter.restartOneDbNode(nodeid, fi, fn, fa) == 0);
+      info("run_restart: wait nostart");
+      CHK1(restarter.waitNodesNoStart(&nodeid, 1) == 0);
+      info("run_restart: dump " << dump[0] << " " << dump[1]);
+      CHK1(restarter.dumpStateAllNodes(dump, 2) == 0);
+      info("run_restart: start all");
+      CHK1(restarter.startAll() == 0);
+    }
+    info("run_restart: wait started");
+    CHK1(restarter.waitClusterStarted() == 0);
+    info("run_restart: started");
+  }
+  while (0);
+
+  info("run_restart: result=" << result);
+  return result;
+}
+
+static int
+run_start_lcp(NdbRestarter& restarter)
+{
+  int result = NDBT_OK;
+  int dump[1] = { 7099 };
+  do
+  {
+    CHK1(restarter.dumpStateAllNodes(dump, 1) == 0);
+  }
+  while (0);
+  info("run_start_lcp: result=" << result);
+  return result;
+}
+
+/*
+ * Start long trans to freeze log tail.  Run writes until over
+ * FDs stored in zero-pages (may hit 410).  Run restart (which
+ * aborts long trans) and verify log tail moves (must not hit 410).
+ * At start and every 5 loops do initial restart and DUMP to
+ * change number of FDs stored to a random number between 2
+ * (minimum) and number of redo log files minus 1.
+ */
+ 
+static int
+runRestartFD(NDBT_Context* ctx, NDBT_Step* step)
+{
+  Ndb* pNdb = GETNDB(step);
+  int result = NDBT_OK;
+  const int loops = ctx->getNumLoops();
+  const bool srflag = ctx->getProperty("SRFLAG", (Uint32)0);
+  NdbRestarter restarter;
+
+  info("restart: start srflag=" << srflag);
+  int nodes = restarter.getNumDbNodes();
+  require(nodes >= 1);
+  info("restart: nodes " << nodes);
+
+  if (nodes == 1 && !srflag)
+  {
+    info("restart: need at least 2 nodes");
+    return result;
+  }
+
+  LogInfo logInfo(nodes);
+  LogInfo logInfo2(nodes);
+  LogMax logMax(nodes);
+  LogDiff logDiff;
+
+  const NdbDictionary::Table* pTab = 0;
+
+  int upval = 0;
+  int loop = 0;
+  int newfds = 0;
+  int limfds = 0;
+  while (loop < loops && !ctx->isTestStopped())
+  {
+    info("restart: loop " << loop);
+    if (loop % 5 == 0)
+    {
+      CHK1(get_nodestatus(restarter.handle, logInfo) == 0);
+      CHK1(get_redostatus(restarter.handle, logInfo) == 0);
+
+      // set new cmaxLogFilesInPageZero in all LQH nodes
+      newfds = get_newfds(logInfo);
+      ctx->setProperty(g_SETFDS, (Uint32)newfds);
+      int nodeid = 0; // all nodes
+      bool fi = true; // initial start
+      CHK1(run_restart(ctx, step, nodeid, fi) == 0);
+
+      CHK1(runCreate(ctx, step) == 0);
+      pTab = g_tabptr[0];
+      require(pTab != 0);
+    }
+
+    // start long trans
+    HugoOperations ops(*pTab);
+    ops.setQuiet();
+    CHK2(ops.startTransaction(pNdb) == 0, ops.getNdbError());
+    for (int i = 0; i < 100; i++)
+    {
+      CHK2(ops.pkInsertRecord(pNdb, i, 1, 0) == 0, ops.getNdbError());
+    }
+    CHK2(ops.execute_NoCommit(pNdb) == 0, ops.getNdbError());
+
+    // randomize load1 limit a bit upwards
+    limfds = get_limfds(logInfo, newfds);
+    // may be up to logInfo.m_files and then hit 410
+    require(newfds <= limfds && limfds <= logInfo.m_files);
+    info("restart: newfds=" << newfds << " limfds=" << limfds);
+
+    // start load1 loop
+    info("restart: load1");
+    while (!ctx->isTestStopped())
+    {
+      info("restart: load1 at " << upval);
+      NdbError err;
+      int cnt = 100 + myRandom48(100);
+      CHK1(run_write_ops(ctx, step, cnt, upval, err) == 0);
+
+      CHK1(get_redostatus(restarter.handle, logInfo) == 0);
+      get_redoused(logInfo, logMax);
+      info("restart: load1 max: " << logMax.m_logused[0]);
+      info("restart: load1 min: " << logMax.m_logused[nodes - 1]);
+
+      if (err.code != 0)
+      {
+        require(err.code == 410);
+        info("restart: break load1 on 410");
+        break;
+      }
+
+      int fileused = logMax.m_logused[0].m_fileused;
+      if (fileused > limfds)
+      {
+        info("restart: break load1 on file usage > FDs");
+        break;
+      }
+    }
+    CHK1(result == NDBT_OK);
+
+    // restart
+    if (srflag)
+    {
+      int nodeid = 0;
+      bool fi = false;
+      CHK1(run_restart(ctx, step, nodeid, fi) == 0);
+    }
+    else
+    {
+      int nodeid = logMax.m_logused[0].m_nodeid;
+      bool fi = false;
+      CHK1(run_restart(ctx, step, nodeid, fi) == 0);
+    }
+
+    // start load2 loop
+    info("restart: load2");
+    CHK1(get_redostatus(restarter.handle, logInfo) == 0);
+    logInfo.copyto(logInfo2);
+
+    // should be fast but allow for slow machines
+    int retry2 = 0;
+    while (!ctx->isTestStopped())
+    {
+      info("restart: load2 at " << upval);
+      NdbError err;
+      int cnt = 100 + myRandom48(100);
+      CHK1(run_write_ops(ctx, step, cnt, upval, err) == 0);
+
+      CHK1(get_redostatus(restarter.handle, logInfo2) == 0);
+      get_redoused(logInfo2, logMax);
+      info("restart: load2 max: " << logMax.m_logused[0]);
+      info("restart: load2 min: " << logMax.m_logused[nodes - 1]);
+
+      require(err.code == 0 || err.code == 410);
+      CHK2(retry2 < 60 || err.code == 0, err);
+
+      get_redodiff(logInfo, logInfo2, logDiff);
+      if (logDiff.m_tailmove)
+      {
+        info("restart: break load2");
+        break;
+      }
+
+      info("restart: retry2=" << retry2);
+      if (retry2 % 5 == 0)
+      {
+        CHK1(run_start_lcp(restarter) == 0);
+        NdbSleep_MilliSleep(1000);
+      }
+      retry2++;
+    }
+    CHK1(result == NDBT_OK);
+
+    sleep(1 + myRandom48(10));
+    loop++;
+  }
+
+  info("restart: stop test");
+  ctx->stopTest();
+  return result;
+}
+
+static int
+runResetFD(NDBT_Context* ctx, NDBT_Step* step)
+{
+  int result = NDBT_OK;
+  int oldfds = ctx->getProperty(g_SETFDS, (Uint32)-1);
+  do
+  {
+    if (oldfds == -1)
+    {
+      // never changed (some step failed)
+      break;
+    }
+    ctx->setProperty(g_SETFDS, (Uint32)0);
+    CHK1(run_restart(ctx, step, 0, true) == 0);
+  }
+  while (0);
+  return result;
+}
+
+NDBT_TESTSUITE(testRedo);
+TESTCASE("WriteOK", 
+	 "Run only write to verify REDO size is adequate"){
+  TC_PROPERTY("TABMASK", (Uint32)(2));
+  INITIALIZER(runCreate);
+  STEP(runWriteOK);
+  FINALIZER(runDrop);
+}
+TESTCASE("Bug36500", 
+	 "Long trans and recovery from 410"){
+  TC_PROPERTY("TABMASK", (Uint32)(1|2));
+  INITIALIZER(runCreate);
+  STEP(runLongtrans);
+  STEP(runWrite410);
+  FINALIZER(runDrop);
+}
+TESTCASE("Latency410", 
+	 "Transaction latency under 410"){
+  TC_PROPERTY("TABMASK", (Uint32)(1|2|4));
+  TC_PROPERTY("SLEEP410", (Uint32)60);
+  INITIALIZER(runCreate);
+  STEP(runLongtrans);
+  STEP(runWrite410);
+  STEP(runLatency);
+  FINALIZER(runDrop);
+}
+TESTCASE("RestartOK", 
+	 "Node restart"){
+  TC_PROPERTY("TABMASK", (Uint32)(2));
+  INITIALIZER(runCreate);
+  STEP(runWriteOK);
+  STEP(runRestartOK);
+  FINALIZER(runDrop);
+}
+TESTCASE("RestartFD", 
+	 "Long trans and node restart with few LQH FDs"){
+  TC_PROPERTY("TABMASK", (Uint32)(1|2));
+  STEP(runRestartFD);
+  FINALIZER(runDrop);
+  FINALIZER(runResetFD);
+}
+TESTCASE("RestartFDSR", 
+	 "RestartFD using system restart"){
+  TC_PROPERTY("TABMASK", (Uint32)(1|2));
+  TC_PROPERTY("SRFLAG", (Uint32)1);
+  STEP(runRestartFD);
+  FINALIZER(runDrop);
+  FINALIZER(runResetFD);
+}
+NDBT_TESTSUITE_END(testRedo);
+
+
+int
+main(int argc, const char** argv)
+{
+  ndb_init();
+  NDBT_TESTSUITE_INSTANCE(testRedo);
+  testRedo.setCreateTable(false);
+  myRandom48Init(NdbTick_CurrentMillisecond());
+  g_msgmutex = NdbMutex_Create();
+  require(g_msgmutex != 0);
+  int ret = testRedo.execute(argc, argv);
+  NdbMutex_Destroy(g_msgmutex);
+  return ret;
+}

=== modified file 'storage/ndb/test/run-test/daily-basic-tests.txt'
--- a/storage/ndb/test/run-test/daily-basic-tests.txt	2012-06-26 09:18:05 +0000
+++ b/storage/ndb/test/run-test/daily-basic-tests.txt	2012-08-13 13:03:29 +0000
@@ -1875,3 +1875,7 @@ max-time: 300
 cmd: testScan
 args:  -n extraNextResultBug11748194 T1
 
+max-time: 600
+cmd: testRedo
+args: -n RestartFD -l 2 T1
+
