List:Commits« Previous MessageNext Message »
From:Leonard Zhou Date:November 10 2008 1:31am
Subject:bzr commit into mysql-5.1 branch (leonard:2707)
View as plain text  
#At file:///home/zhl/mysql/testredo/

 2707 Leonard Zhou	2008-11-10
      For merger to 6.4
modified:
  storage/ndb/include/kernel/signaldata/BackupImpl.hpp
  storage/ndb/include/kernel/signaldata/BackupSignalData.hpp
  storage/ndb/include/mgmapi/mgmapi.h
  storage/ndb/src/kernel/blocks/ERROR_codes.txt
  storage/ndb/src/kernel/blocks/backup/Backup.cpp
  storage/ndb/src/kernel/blocks/backup/BackupFormat.hpp
  storage/ndb/src/mgmapi/mgmapi.cpp
  storage/ndb/src/mgmclient/CommandInterpreter.cpp
  storage/ndb/src/mgmsrv/MgmtSrvr.cpp
  storage/ndb/src/mgmsrv/MgmtSrvr.hpp
  storage/ndb/src/mgmsrv/Services.cpp
  storage/ndb/test/include/NdbBackup.hpp
  storage/ndb/test/ndbapi/testBackup.cpp
  storage/ndb/test/src/NdbBackup.cpp
  storage/ndb/tools/restore/Restore.cpp
  storage/ndb/tools/restore/Restore.hpp

=== modified file 'storage/ndb/include/kernel/signaldata/BackupImpl.hpp'
--- a/storage/ndb/include/kernel/signaldata/BackupImpl.hpp	2008-03-03 11:12:37 +0000
+++ b/storage/ndb/include/kernel/signaldata/BackupImpl.hpp	2008-11-10 00:31:12 +0000
@@ -65,6 +65,7 @@ private:
    * Backup flags
    */
   /* & 0x3 - waitCompleted
+   * & 0x4 - Use undo log
    */
   Uint32 flags;
 };

=== modified file 'storage/ndb/include/kernel/signaldata/BackupSignalData.hpp'
--- a/storage/ndb/include/kernel/signaldata/BackupSignalData.hpp	2008-03-03 11:12:37 +0000
+++ b/storage/ndb/include/kernel/signaldata/BackupSignalData.hpp	2008-11-10 00:31:12 +0000
@@ -36,11 +36,14 @@ class BackupReq {
   friend bool printBACKUP_REQ(FILE *, const Uint32 *, Uint32, Uint16);
 public:
   STATIC_CONST( SignalLength = 4 );
+  STATIC_CONST( WAITCOMPLETED = 0x3 );
+  STATIC_CONST( USE_UNDO_LOG = 0x4 );
 
 private:
   Uint32 senderData;
   Uint32 backupDataLen;
   /* & 0x3 - waitCompleted
+   * & 0x4 - use undo log
    */
   Uint32 flags;
   Uint32 inputBackupId;

=== modified file 'storage/ndb/include/mgmapi/mgmapi.h'
--- a/storage/ndb/include/mgmapi/mgmapi.h	2008-08-07 03:45:20 +0000
+++ b/storage/ndb/include/mgmapi/mgmapi.h	2008-11-10 00:31:12 +0000
@@ -1037,6 +1037,27 @@ extern "C" {
 			   unsigned int input_backupId);
 
   /**
+   * Start backup
+   *
+   * @param   handle          NDB management handle.
+   * @param   wait_completed  0:  Don't wait for confirmation<br>
+   *                          1:  Wait for backup to be started<br>
+   *                          2:  Wait for backup to be completed
+   * @param   backup_id       Backup ID is returned from function.
+   * @param   reply           Reply message.
+   * @param   input_backupId  run as backupId and set next backup id to input_backupId+1.
+   * @param   backuppoint     Backup happen at start time(1) or complete time(0).
+   * @return                  -1 on error.
+   * @note                    backup_id will not be returned if
+   *                          wait_completed == 0
+   */
+  int ndb_mgm_start_backup3(NdbMgmHandle handle, int wait_completed,
+			   unsigned int* backup_id,
+			   struct ndb_mgm_reply* reply,
+			   unsigned int input_backupId,
+			   unsigned int backuppoint);
+
+  /**
    * Abort backup
    *
    * @param   handle        NDB management handle.

=== modified file 'storage/ndb/src/kernel/blocks/ERROR_codes.txt'
--- a/storage/ndb/src/kernel/blocks/ERROR_codes.txt	2008-08-11 11:24:12 +0000
+++ b/storage/ndb/src/kernel/blocks/ERROR_codes.txt	2008-11-10 00:31:12 +0000
@@ -8,7 +8,7 @@ Next DBDICT 6008
 Next DBDIH 7215
 Next DBTC 8064
 Next CMVMI 9000
-Next BACKUP 10041
+Next BACKUP 10042
 Next DBUTIL 11002
 Next DBTUX 12008
 Next SUMA 13043
@@ -485,7 +485,7 @@ Backup Stuff:
 
 10036: Halt backup for table >= 2
 10037: Resume backup (from 10036)
-
+10041: delay backup after create trigger. for testing undo log file
 11001: Send UTIL_SEQUENCE_REF (in master)
 
 5028:  Crash when receiving LQHKEYREQ (in non-master)

=== modified file 'storage/ndb/src/kernel/blocks/backup/Backup.cpp'
--- a/storage/ndb/src/kernel/blocks/backup/Backup.cpp	2008-09-30 07:40:45 +0000
+++ b/storage/ndb/src/kernel/blocks/backup/Backup.cpp	2008-11-10 00:31:12 +0000
@@ -1139,10 +1139,7 @@ Backup::execBACKUP_REQ(Signal* signal)
   ptr.p->flags = flags;
   ptr.p->masterRef = reference();
   ptr.p->nodes = c_aliveNodes;
-  if(input_backupId)
-    ptr.p->backupId = input_backupId;
-  else
-    ptr.p->backupId = 0;
+  ptr.p->backupId = input_backupId;
   ptr.p->backupKey[0] = 0;
   ptr.p->backupKey[1] = 0;
   ptr.p->backupDataLen = 0;
@@ -1553,7 +1550,21 @@ Backup::sendCreateTrig(Signal* signal, 
   req->setAttributeMask(attrMask);
   req->setTableId(tabPtr.p->tableId);
   req->setIndexId(RNIL);        // not used
-  req->setTriggerType(TriggerType::SUBSCRIPTION);
+  /*
+   * We always send PK for any operations and any triggertypes.
+   * For SUBSCRIPTION_BEFORE
+   *   We send after image for INSERT.
+   *   We send before image for DELETE.
+   *   We send before+after image for UPDATE.
+   * For SUBSCRIPTION
+   *   We send after image for INSERT.
+   *   We send only PK for DELETE.
+   *   We send after image for UPDATE.
+   */
+  if(ptr.p->flags & BackupReq::USE_UNDO_LOG)
+    req->setTriggerType(TriggerType::SUBSCRIPTION_BEFORE);
+  else
+    req->setTriggerType(TriggerType::SUBSCRIPTION);
   req->setTriggerActionTime(TriggerActionTime::TA_DETACHED);
   req->setMonitorReplicas(true);
   req->setMonitorAllAttributes(false);
@@ -1777,8 +1788,16 @@ Backup::startBackupReply(Signal* signal,
   waitGCPReq->senderRef = reference();
   waitGCPReq->senderData = ptr.i;
   waitGCPReq->requestType = WaitGCPReq::CompleteForceStart;
-  sendSignal(DBDIH_REF, GSN_WAIT_GCP_REQ, signal,
-	     WaitGCPReq::SignalLength,JBB);
+  //we delay 10 seconds for testcases to generate events to be recorded in the UNDO log
+//  if (ERROR_INSERTED(10041))
+  if (1)
+  {
+    ndbout_c("zhl will delay 20 seconds to wait");
+    sendSignalWithDelay(DBDIH_REF, GSN_WAIT_GCP_REQ, signal, 30*1000,
WaitGCPReq::SignalLength);
+  }
+  else
+    sendSignal(DBDIH_REF, GSN_WAIT_GCP_REQ, signal,
+    	       WaitGCPReq::SignalLength,JBB);
 }
 
 void
@@ -2206,7 +2225,10 @@ Backup::sendDropTrig(Signal* signal, Bac
   req->setRequestType(DropTrigReq::RT_USER);
   req->setIndexId(RNIL);
   req->setTriggerInfo(0);       // not used on DROP
-  req->setTriggerType(TriggerType::SUBSCRIPTION);
+  if(ptr.p->flags & BackupReq::USE_UNDO_LOG)
+    req->setTriggerType(TriggerType::SUBSCRIPTION_BEFORE);
+  else
+    req->setTriggerType(TriggerType::SUBSCRIPTION);
   req->setTriggerActionTime(TriggerActionTime::TA_DETACHED);
 
   ptr.p->slaveData.dropTrig.tableId = tabPtr.p->tableId;
@@ -2671,7 +2693,7 @@ Backup::execDEFINE_BACKUP_REQ(Signal* si
   if(senderRef == reference())
     ptr.p->flags = req->flags;
   else
-    ptr.p->flags = req->flags & ~((Uint32)0x3); /* remove waitCompleted flags
+    ptr.p->flags = req->flags & ~((Uint32)BackupReq::WAITCOMPLETED); /* remove
waitCompleted flags
 						 * as non master should never
 						 * reply
 						 */
@@ -2781,7 +2803,10 @@ Backup::execDEFINE_BACKUP_REQ(Signal* si
       ptr.p->ctlFilePtr = files[i].i;
       break;
     case 1:
-      files[i].p->fileType = BackupFormat::LOG_FILE;
+      if(ptr.p->flags & BackupReq::USE_UNDO_LOG)
+        files[i].p->fileType = BackupFormat::UNDO_FILE;
+      else
+        files[i].p->fileType = BackupFormat::LOG_FILE;
       ptr.p->logFilePtr = files[i].i;
       break;
     case 2:
@@ -2949,6 +2974,10 @@ Backup::openFiles(Signal* signal, Backup
   c_backupFilePool.getPtr(filePtr, ptr.p->logFilePtr);
   filePtr.p->m_flags |= BackupFile::BF_OPENING;
   
+  //write uncompressed log file when enable undo log,since log file is read from back to
front.
+  if(ptr.p->flags & BackupReq::USE_UNDO_LOG)
+    req->fileFlags &= ~FsOpenReq::OM_GZ;
+ 
   req->userPointer = filePtr.i;
   FsOpenReq::setVersion(req->fileNumber, 2);
   FsOpenReq::setSuffix(req->fileNumber, FsOpenReq::S_LOG);
@@ -2964,6 +2993,8 @@ Backup::openFiles(Signal* signal, Backup
 
   if (c_defaults.m_o_direct)
     req->fileFlags |= FsOpenReq::OM_DIRECT;
+  if (c_defaults.m_compressed_backup)
+    req->fileFlags |= FsOpenReq::OM_GZ;
   req->userPointer = filePtr.i;
   FsOpenReq::setVersion(req->fileNumber, 2);
   FsOpenReq::setSuffix(req->fileNumber, FsOpenReq::S_DATA);
@@ -3067,8 +3098,14 @@ Backup::openFilesReply(Signal* signal, 
       return;
     }//if
     
+    BackupFormat::FileType logfiletype;
+    if(ptr.p->flags & BackupReq::USE_UNDO_LOG)
+      logfiletype = BackupFormat::UNDO_FILE;
+    else
+      logfiletype = BackupFormat::LOG_FILE;
+
     ptr.p->files.getPtr(filePtr, ptr.p->logFilePtr);
-    if(!insertFileHeader(BackupFormat::LOG_FILE, ptr.p, filePtr.p)) {
+    if(!insertFileHeader(logfiletype, ptr.p, filePtr.p)) {
       jam();
       defineBackupRef(signal, ptr, DefineBackupRef::FailedInsertFileHeader);
       return;
@@ -4362,13 +4399,27 @@ Backup::execTRIG_ATTRINFO(Signal* signal
     return;
   }//if
 
-  if(trg->getAttrInfoType() == TrigAttrInfo::BEFORE_VALUES) {
-    jam();
-    /**
-     * Backup is doing REDO logging and don't need before values
-     */
-    return;
-  }//if
+  BackupRecordPtr ptr LINT_SET_PTR;
+  c_backupPool.getPtr(ptr, trigPtr.p->backupPtr);
+
+  if(ptr.p->flags & BackupReq::USE_UNDO_LOG) {
+    if(trg->getAttrInfoType() == TrigAttrInfo::AFTER_VALUES) {
+      jam();
+      /**
+       * Backup is doing UNDO logging and don't need after values
+       */
+      return;
+    }//if
+  }
+  else {
+    if(trg->getAttrInfoType() == TrigAttrInfo::BEFORE_VALUES) {
+      jam();
+      /**
+       * Backup is doing REDO logging and don't need before values
+       */
+      return;
+    }//if
+  }
 
   BackupFormat::LogFile::LogEntry * logEntry = trigPtr.p->logEntry;
   if(logEntry == 0) 
@@ -4462,17 +4513,30 @@ Backup::execFIRE_TRIG_ORD(Signal* signal
     ptr.p->currGCP = gci;
   }
 
+  Uint32 datalen = len;
   len += (sizeof(BackupFormat::LogFile::LogEntry) >> 2) - 2;
   trigPtr.p->logEntry->Length = htonl(len);
 
-  ndbrequire(len + 1 <= trigPtr.p->operation->dataBuffer.getMaxWrite());
-  trigPtr.p->operation->dataBuffer.updateWritePtr(len + 1);
+  if(ptr.p->flags & BackupReq::USE_UNDO_LOG)
+  {
+    /* keep the length at both the end of logEntry and ->logEntry variable
+       The total length of logEntry is len + 2
+    */
+    trigPtr.p->logEntry->Data[datalen] = htonl(len);
+  }
+
+  Uint32 entryLength = len +1;
+  if(ptr.p->flags & BackupReq::USE_UNDO_LOG)
+    entryLength ++;
+
+  ndbrequire(entryLength <= trigPtr.p->operation->dataBuffer.getMaxWrite());
+  trigPtr.p->operation->dataBuffer.updateWritePtr(entryLength);
   trigPtr.p->logEntry = 0;
   
   {
-    const Uint32 tmp = (len + 1) << 2;
-    trigPtr.p->operation->noOfBytes     += tmp;
-    trigPtr.p->operation->m_bytes_total += tmp;
+    const Uint32 entryByteLength = entryLength << 2;
+    trigPtr.p->operation->noOfBytes     += entryByteLength;
+    trigPtr.p->operation->m_bytes_total += entryByteLength;
     trigPtr.p->operation->noOfRecords     += 1;
     trigPtr.p->operation->m_records_total += 1;
   }

=== modified file 'storage/ndb/src/kernel/blocks/backup/BackupFormat.hpp'
--- a/storage/ndb/src/kernel/blocks/backup/BackupFormat.hpp	2008-03-18 07:12:39 +0000
+++ b/storage/ndb/src/kernel/blocks/backup/BackupFormat.hpp	2008-11-10 00:31:12 +0000
@@ -69,9 +69,10 @@ struct BackupFormat {
    */
   enum FileType {
     CTL_FILE = 1,
-    LOG_FILE = 2,
+    LOG_FILE = 2, //redo log file for backup.
     DATA_FILE = 3,
-    LCP_FILE = 4
+    LCP_FILE = 4,
+    UNDO_FILE = 5 //undo log for backup.
   };
   
   /**

=== modified file 'storage/ndb/src/mgmapi/mgmapi.cpp'
--- a/storage/ndb/src/mgmapi/mgmapi.cpp	2008-08-21 22:14:40 +0000
+++ b/storage/ndb/src/mgmapi/mgmapi.cpp	2008-11-10 00:31:12 +0000
@@ -2082,10 +2082,11 @@ ndb_mgm_start(NdbMgmHandle handle, int n
  *****************************************************************************/
 extern "C"
 int 
-ndb_mgm_start_backup2(NdbMgmHandle handle, int wait_completed,
+ndb_mgm_start_backup3(NdbMgmHandle handle, int wait_completed,
 		     unsigned int* _backup_id,
 		     struct ndb_mgm_reply*, /*reply*/
-		     unsigned int input_backupId) 
+		     unsigned int input_backupId,
+		     unsigned int backuppoint) 
 {
   SET_ERROR(handle, NDB_MGM_NO_ERROR, "Executing: ndb_mgm_start_backup");
   const ParserRow<ParserDummy> start_backup_reply[] = {
@@ -2101,6 +2102,7 @@ ndb_mgm_start_backup2(NdbMgmHandle handl
   args.put("completed", wait_completed);
   if(input_backupId > 0)
     args.put("backupid", input_backupId);
+  args.put("backuppoint", backuppoint);
   const Properties *reply;
   { // start backup can take some time, set timeout high
     Uint64 old_timeout= handle->timeout;
@@ -2128,6 +2130,17 @@ ndb_mgm_start_backup2(NdbMgmHandle handl
 
 extern "C"
 int 
+ndb_mgm_start_backup2(NdbMgmHandle handle, int wait_completed,
+		     unsigned int* _backup_id,
+		     struct ndb_mgm_reply*, /*reply*/
+		     unsigned int input_backupId)
+{
+  struct ndb_mgm_reply reply;
+  return ndb_mgm_start_backup3(handle, wait_completed, _backup_id, &reply,
input_backupId, 0);
+}
+
+extern "C"
+int 
 ndb_mgm_start_backup(NdbMgmHandle handle, int wait_completed,
 		     unsigned int* _backup_id,
 		     struct ndb_mgm_reply* /*reply*/)

=== modified file 'storage/ndb/src/mgmclient/CommandInterpreter.cpp'
--- a/storage/ndb/src/mgmclient/CommandInterpreter.cpp	2008-03-14 13:34:05 +0000
+++ b/storage/ndb/src/mgmclient/CommandInterpreter.cpp	2008-11-10 00:31:12 +0000
@@ -264,8 +264,8 @@ static const char* helpText =
 "SHOW CONFIG                            Print configuration\n"
 "SHOW PARAMETERS                        Print configuration parameters\n"
 #endif
-"START BACKUP [<backup id>] [NOWAIT | WAIT STARTED | WAIT COMPLETED]\n"
-"                                       Start backup (default WAIT COMPLETED)\n"
+"START BACKUP [<backup id>] [SNAPSHOTSTART | SNAPSHOTEND] [NOWAIT | WAIT STARTED |
WAIT COMPLETED]\n"
+"                                       Start backup (default WAIT
COMPLETED,SNAPSHOTEND)\n"
 "ABORT BACKUP <backup id>               Abort backup\n"
 "SHUTDOWN                               Shutdown all processes in cluster\n"
 "CLUSTERLOG ON [<severity>] ...         Enable Cluster logging\n"
@@ -329,7 +329,7 @@ static const char* helpTextStartBackup =
 " NDB Cluster -- Management Client -- Help for START BACKUP command\n"
 "---------------------------------------------------------------------------\n"
 "START BACKUP  Start a cluster backup\n\n"
-"START BACKUP [<backup id>] [NOWAIT | WAIT STARTED | WAIT COMPLETED]\n"
+"START BACKUP [<backup id>] [SNAPSHOTSTART | SNAPSHOTEND] [NOWAIT | WAIT STARTED |
WAIT COMPLETED]\n"
 "                   Start a backup for the cluster.\n"
 "                   Each backup gets an ID number that is reported to the\n"
 "                   user. This ID number can help you find the backup on the\n"
@@ -338,6 +338,10 @@ static const char* helpTextStartBackup =
 "                   You can also start specified backup using START BACKUP <backup
id> \n\n"
 "                   <backup id> \n"
 "                     Start a specified backup using <backup id> as bakcup ID
number.\n" 
+"                   SNAPSHOTSTART \n"
+"                     Backup snapshot is taken around the time the backup is started.\n" 
+"                   SNAPSHOTEND \n"
+"                     Backup snapshot is taken around the time the backup is
completed.\n" 
 "                   NOWAIT \n"
 "                     Start a cluster backup and return immediately.\n"
 "                     The management client will return control directly\n"
@@ -2832,51 +2836,96 @@ CommandInterpreter::executeStartBackup(c
 
   int result;
   int flags = 2;
-  if (sz == 2 && args[1] == "NOWAIT")
-  {
-    flags = 0;
-  }
-  else if (sz == 1 || (sz == 3 && args[1] == "WAIT" && args[2] ==
"COMPLETED"))
-  {
-    flags = 2;
-    ndbout_c("Waiting for completed, this may take several minutes");
-  }
-  else if (sz == 3 && args[1] == "WAIT" && args[2] == "STARTED")
-  {
-    ndbout_c("Waiting for started, this may take several minutes");
-    flags = 1;
-  }
-  else if (sscanf(args[1].c_str(), "%u", &input_backupId) == 1 &&
input_backupId > 0 && input_backupId < MAX_BACKUPS)
+  //1,snapshot at start time. 0 snapshot at end time
+  unsigned int backuppoint = 0;
+  bool b_log = false;
+  bool b_nowait = false;
+  bool b_wait_completed = false;
+  bool b_wait_started = false;
+
+  /*
+   All the commands list as follow:
+   start backup <backupid> nowait | start backup <backupid>
snapshotstart/snapshotend nowati | start backup <backupid> nowait
snapshotstart/snapshotend
+   start backup <backupid> | start backup <backupid> wait completed | start
backup <backupid> snapshotstart/snapshotend
+   start backup <backupid> snapshotstart/snapshotend wait completed | start backup
<backupid> wait completed snapshotstart/snapshotend
+   start backup <backupid> wait started | start backup <backupid>
snapshotstart/snapshotend wait started
+   start backup <backupid> wait started snapshotstart/snapshotend
+  */
+  for (int i= 1; i < sz; i++)
   {
-    // start backup n nowait
-    if (sz == 3 && args[2] == "NOWAIT")
-    {
-      flags = 0;
+    if (i == 1 && sscanf(args[1].c_str(), "%u", &input_backupId) == 1) {
+      if (input_backupId > 0 && input_backupId < MAX_BACKUPS)
+        continue;
+      else {
+        invalid_command(parameters);
+        return -1;
+      }
     }
-    // start backup n; start backup n wait complete
-    else if ( sz == 2 || (sz == 4 && args[2] == "WAIT" && args[3]
=="COMPLETED"))
-    {
-      flags = 2;
-      ndbout_c("Waiting for completed, this may take several minutes");
+
+    if (args[i] == "SNAPSHOTEND") {
+      if (b_log ==true) {
+        invalid_command(parameters);
+        return -1;
+      }
+      b_log = true;
+      backuppoint = 0;
+      continue;
     }
-    //start backup n wait started
-    else if (sz == 4 && args[2] == "WAIT" && args[3] == "STARTED")
-    {
-      ndbout_c("Waiting for started, this may take several minutes");
-      flags = 1;
+    if (args[i] == "SNAPSHOTSTART") {
+      if (b_log ==true) {
+        invalid_command(parameters);
+        return -1;
+      }
+      b_log = true;
+      backuppoint = 1;
+      continue;
     }
-    else
-    {
-      invalid_command(parameters);
-      return -1;
+    if (args[i] == "NOWAIT") {
+      if (b_nowait == true || b_wait_completed == true || b_wait_started ==true) {
+        invalid_command(parameters);
+        return -1;
+      }
+      b_nowait == true;
+      flags = 0;
+      continue;
+    }
+    if (args[i] == "WAIT") {
+      if (b_nowait == true || b_wait_completed == true || b_wait_started ==true) {
+        invalid_command(parameters);
+        return -1;
+      }
+      if (i+1 < sz) {
+        if (args[i+1] == "COMPLETED") {
+          b_wait_completed == true;
+          flags = 2; 
+          i++;
+        }
+        else if (args[i+1] == "STARTED") {
+          b_wait_started == true;
+          flags = 1;
+          i++;
+        }
+        else {
+          invalid_command(parameters);
+          return -1;
+        }
+      }
+      else {
+        invalid_command(parameters);
+        return -1;
+      }
+      continue;
     }
-  }
-  else
-  {
     invalid_command(parameters);
     return -1;
   }
 
+  //print message
+  if (b_wait_completed)
+    ndbout_c("Waiting for completed, this may take several minutes");
+  if (b_wait_started)
+    ndbout_c("Waiting for started, this may take several minutes");
+
   NdbLogEventHandle log_handle= NULL;
   struct ndb_logevent log_event;
   if (flags == 2 && !interactive)
@@ -2890,8 +2939,11 @@ CommandInterpreter::executeStartBackup(c
       return -1;
     }
   }
-  if (input_backupId > 0)
-    result = ndb_mgm_start_backup2(m_mgmsrv, flags, &backupId, &reply,
input_backupId);
+
+  //start backup N | start backup snapshotstart/snapshotend
+  if (input_backupId > 0 || b_log == true)
+    result = ndb_mgm_start_backup3(m_mgmsrv, flags, &backupId, &reply,
input_backupId, backuppoint);
+  //start backup
   else
     result = ndb_mgm_start_backup(m_mgmsrv, flags, &backupId, &reply);
 

=== modified file 'storage/ndb/src/mgmsrv/MgmtSrvr.cpp'
--- a/storage/ndb/src/mgmsrv/MgmtSrvr.cpp	2008-09-02 09:28:24 +0000
+++ b/storage/ndb/src/mgmsrv/MgmtSrvr.cpp	2008-11-10 00:31:12 +0000
@@ -2585,7 +2585,7 @@ MgmtSrvr::eventReport(const Uint32 * the
  ***************************************************************************/
 
 int
-MgmtSrvr::startBackup(Uint32& backupId, int waitCompleted, Uint32 input_backupId)
+MgmtSrvr::startBackup(Uint32& backupId, int waitCompleted, Uint32 input_backupId,
Uint32 backuppoint)
 {
   SignalSender ss(theFacade);
   ss.lock(); // lock will be released on exit
@@ -2605,18 +2605,20 @@ MgmtSrvr::startBackup(Uint32& backupId, 
   BackupReq* req = CAST_PTR(BackupReq, ssig.getDataPtrSend());
   if(input_backupId > 0)
   {
-    ssig.set(ss, TestOrd::TraceAPI, BACKUP, GSN_BACKUP_REQ, 
-	     BackupReq::SignalLength);
+    ssig.set(ss, TestOrd::TraceAPI, BACKUP, GSN_BACKUP_REQ,
+             BackupReq::SignalLength);
     req->inputBackupId = input_backupId;
   }
   else
-    ssig.set(ss, TestOrd::TraceAPI, BACKUP, GSN_BACKUP_REQ, 
-	     BackupReq::SignalLength - 1);
-  
+    ssig.set(ss, TestOrd::TraceAPI, BACKUP, GSN_BACKUP_REQ,
+             BackupReq::SignalLength - 1);
+
   req->senderData = 19;
   req->backupDataLen = 0;
   assert(waitCompleted < 3);
   req->flags = waitCompleted & 0x3;
+  if(backuppoint == 1)
+    req->flags |= BackupReq::USE_UNDO_LOG;
 
   BackupEvent event;
   int do_send = 1;

=== modified file 'storage/ndb/src/mgmsrv/MgmtSrvr.hpp'
--- a/storage/ndb/src/mgmsrv/MgmtSrvr.hpp	2008-03-14 13:34:05 +0000
+++ b/storage/ndb/src/mgmsrv/MgmtSrvr.hpp	2008-11-10 00:31:12 +0000
@@ -309,7 +309,7 @@ public:
   /**
    * Backup functionallity
    */
-  int startBackup(Uint32& backupId, int waitCompleted= 2, Uint32 input_backupId= 0);
+  int startBackup(Uint32& backupId, int waitCompleted= 2, Uint32 input_backupId= 0,
Uint32 backuppoint= 0);
   int abortBackup(Uint32 backupId);
   int performBackup(Uint32* backupId);
 

=== modified file 'storage/ndb/src/mgmsrv/Services.cpp'
--- a/storage/ndb/src/mgmsrv/Services.cpp	2008-08-05 12:15:56 +0000
+++ b/storage/ndb/src/mgmsrv/Services.cpp	2008-11-10 00:31:12 +0000
@@ -191,6 +191,7 @@ ParserRow<MgmApiSession> commands[] = {
   MGM_CMD("start backup", &MgmApiSession::startBackup, ""),
     MGM_ARG("completed", Int, Optional ,"Wait until completed"),
     MGM_ARG("backupid", Int, Optional ,"User input backup id"),
+    MGM_ARG("backuppoint", Int, Optional ,"backup snapshot at start time or complete
time"),
 
   MGM_CMD("abort backup", &MgmApiSession::abortBackup, ""),
     MGM_ARG("id", Int, Mandatory, "Backup id"),
@@ -700,19 +701,19 @@ MgmApiSession::startBackup(Parser<MgmApi
 			   Properties const &args) {
   DBUG_ENTER("MgmApiSession::startBackup");
   unsigned backupId;
-  unsigned input_backupId;
+  unsigned input_backupId= 0;
+  unsigned backuppoint= 0;
   Uint32 completed= 2;
   int result;
 
   args.get("completed", &completed);
 
   if(args.contains("backupid"))
-  {
     args.get("backupid", &input_backupId);
-    result = m_mgmsrv.startBackup(backupId, completed, input_backupId);
-  }
-  else
-    result = m_mgmsrv.startBackup(backupId, completed);
+  if(args.contains("backuppoint"))
+    args.get("backuppoint", &backuppoint);
+
+  result = m_mgmsrv.startBackup(backupId, completed, input_backupId, backuppoint);
 
   m_output->println("start backup reply");
   if(result != 0)

=== modified file 'storage/ndb/test/include/NdbBackup.hpp'
--- a/storage/ndb/test/include/NdbBackup.hpp	2006-12-23 19:20:40 +0000
+++ b/storage/ndb/test/include/NdbBackup.hpp	2008-11-10 00:31:12 +0000
@@ -26,7 +26,10 @@ public:
   NdbBackup(int _own_id, const char* _addr = 0) 
     : NdbConfig(_own_id, _addr) {};
 
-  int start(unsigned & _backup_id);
+  int start(unsigned & _backup_id,
+	    int flags = 2,
+	    unsigned int user_backup_id= 0,
+	    unsigned int logtype= 0);
   int restore(unsigned _backup_id);
 
   int NFMaster(NdbRestarter& _restarter);
@@ -38,6 +41,8 @@ public:
   int FailMasterAsSlave(NdbRestarter& _restarter);
   int FailSlave(NdbRestarter& _restarter);
   int Fail(NdbRestarter& _restarter, int *Fail_codes, const int sz, bool onMaster);
+  int startLogEvent();
+  int checkBackupStatus();
 
 private:
 
@@ -47,6 +52,7 @@ private:
 		  unsigned _backup_id);
 
   const char * getBackupDataDirForNode(int _node_id);
+  NdbLogEventHandle log_handle;
   
 };
 

=== modified file 'storage/ndb/test/ndbapi/testBackup.cpp'
--- a/storage/ndb/test/ndbapi/testBackup.cpp	2008-03-03 11:12:37 +0000
+++ b/storage/ndb/test/ndbapi/testBackup.cpp	2008-11-10 00:31:12 +0000
@@ -472,6 +472,282 @@ int runRestoreBankAndVerify(NDBT_Context
   
   return result;
 }
+int runCreateUndoData(NDBT_Context* ctx, NDBT_Step* step){
+
+  int undoError = 10041;
+  NdbRestarter restarter;
+
+  Ndb* pNdb= GETNDB(step);
+  NdbDictionary::Dictionary* pDict = pNdb->getDictionary();
+  NdbDictionary::Table tab("TBUNDO1");
+  if (pDict->getTable(tab.getName()) != 0)
+    if(pDict->dropTable(tab.getName()) != 0) {
+      g_err << "Can't drop old table" << endl;
+      return NDBT_FAILED;
+    }
+
+  // col ATTR1 - pk
+  { NdbDictionary::Column col("ATTR1");
+    col.setType(NdbDictionary::Column::Unsigned);
+    col.setPrimaryKey(true);
+    tab.addColumn(col);
+  }
+  // col ATTR2 - unsigned
+  { NdbDictionary::Column col("ATTR2");
+    col.setType(NdbDictionary::Column::Unsigned);
+    tab.addColumn(col);
+  }
+  // create
+  if(pDict->createTable(tab) != 0) {
+    g_err << "Can't create table" << endl;
+    return NDBT_FAILED;
+  }
+
+  //insert data
+  NdbOperation *pOperation = NULL;
+  NdbTransaction *pTransaction= pNdb->startTransaction();
+  if (pTransaction == NULL) {
+    g_err << "Can't get transaction pointer" << endl;
+    return NDBT_FAILED;
+  }
+
+  for (int i = 0; i < 10; i++) {
+    if((pOperation = pTransaction->getNdbOperation(tab.getName())) == NULL) {
+      g_err << "Can't get operation" << endl;
+      return NDBT_FAILED;
+    }
+
+    pOperation->insertTuple();
+    pOperation->equal("ATTR1", i);
+    pOperation->setValue("ATTR2", 1);
+  }
+  if(pTransaction->execute(Commit ) != 0) {
+    g_err << "Can't commit transaction" << endl;
+    return NDBT_FAILED;
+  }
+
+  pNdb->closeTransaction(pTransaction);
+  pTransaction = NULL;
+  pOperation = NULL;
+ 
+  if(restarter.waitClusterStarted(60)){
+    g_err << "waitClusterStarted failed"<< endl;
+    return NDBT_FAILED;
+  }
+  if (restarter.insertErrorInAllNodes(undoError) != 0) {
+    g_err << "error insert failed" << endl;
+    return NDBT_FAILED;
+  } 
+  return NDBT_OK;
+}
+int runBackupWaitStartedUndo(NDBT_Context* ctx, NDBT_Step* step){
+
+  
+  NdbBackup backup(GETNDB(step)->getNodeId()+1);
+
+  unsigned backupId = 0;
+  // start backup wait started
+  if (backup.start(backupId, 1, 0, 1) == -1){
+    return NDBT_FAILED;
+  }
+  ndbout << "Started backup " << backupId << endl;
+  ctx->setProperty("BackupId", backupId);
+
+  /*
+   * Make sure the backup get into ERROR_INSERTED(10041)
+   * Before backup get into ERROR_INSERTED,
+   *   it will create triggers for every table/fragement after the backup return to the
client.
+   */
+  NdbSleep_SecSleep(2);
+
+  return NDBT_OK;
+}
+
+int runChangeUndoDataDuringBackup(NDBT_Context* ctx, NDBT_Step* step){
+  Ndb* pNdb= GETNDB(step);
+  NdbDictionary::Dictionary* pDict= pNdb->getDictionary();
+  const NdbDictionary::Table *pTable= pDict->getTable("TBUNDO1");
+
+  if (pTable == NULL) {
+    g_err << "Can't get table pointer" << endl;
+    return NDBT_FAILED;
+  }
+
+  //delete 5 rows(5-9)
+  NdbTransaction *pTransaction= pNdb->startTransaction();
+  if (pTransaction == NULL) {
+    g_err << "Can't get transaction pointer" << endl;
+    return NDBT_FAILED;
+  }
+
+  for (int i = 5; i < 10; i++) {
+    NdbOperation *pOperation= pTransaction->getNdbOperation(pTable->getName());
+    if (pOperation == NULL) {
+      g_err << "Can't get operation pointer" << endl;
+      return NDBT_FAILED;
+    }
+    pOperation->deleteTuple();
+    pOperation->equal("ATTR1", i);
+  }
+  if(pTransaction->execute(Commit ) != 0) {
+    g_err << "Can't commit transaction delete" << endl;
+    return NDBT_FAILED;
+  }
+  pNdb->closeTransaction(pTransaction);
+
+  //add 5 new rowsi(5-9)
+  pTransaction= pNdb->startTransaction();
+  if (pTransaction == NULL) {
+    g_err << "Can't get transaction pointer" << endl;
+    return NDBT_FAILED;
+  }
+
+  for (int i = 5; i < 10; i++) {
+    NdbOperation *pOperation= pTransaction->getNdbOperation(pTable->getName());
+    if (pOperation == NULL) {
+      g_err << "Can't get operation pointer" << endl;
+      return NDBT_FAILED;
+    }
+    pOperation->insertTuple();
+    pOperation->equal("ATTR1", i);
+    pOperation->setValue("ATTR2", 2);
+  }
+  if(pTransaction->execute(Commit ) != 0) {
+    g_err << "Can't commit transaction add" << endl;
+    return NDBT_FAILED;
+  }
+  pNdb->closeTransaction(pTransaction);
+
+  //update 5 rows(0-4)
+  pTransaction= pNdb->startTransaction();
+  if (pTransaction == NULL) {
+    g_err << "Can't get transaction pointer" << endl;
+    return NDBT_FAILED;
+  }
+
+  for (int i = 0; i < 5; i++) {
+    NdbOperation *pOperation= pTransaction->getNdbOperation(pTable->getName());
+    if (pOperation == NULL) {
+      g_err << "Can't get operation pointer" << endl;
+      return NDBT_FAILED;
+    }
+    pOperation->updateTuple();
+    pOperation->equal("ATTR1", i);
+    pOperation->setValue("ATTR2", 3);
+  }
+  if(pTransaction->execute(Commit ) != 0) {
+    g_err << "Can't commit transaction update" << endl;
+    return NDBT_FAILED;
+  }
+  pNdb->closeTransaction(pTransaction);
+  return NDBT_OK;
+
+}
+int runDropUndoTablesRestart(NDBT_Context* ctx, NDBT_Step* step){
+
+  NdbBackup backup(GETNDB(step)->getNodeId()+1);
+
+  // start log event
+  if(backup.startLogEvent() != 0) {
+    g_err << "Can't create log event" << endl;
+    return NDBT_FAILED;
+  }
+  NdbSleep_SecSleep(10);
+  // make sure backup have finish
+  int i = 0;
+  while (1) {
+    if (backup.checkBackupStatus() == 2) //complete
+      break;
+    else if (i == 15) {
+      g_err << "Backup timeout" << endl;
+      return NDBT_FAILED;
+    } else
+      NdbSleep_SecSleep(2);
+    i++;
+  }
+
+  NdbRestarter restarter;
+  Ndb* pNdb= GETNDB(step);
+  NdbDictionary::Dictionary* pDict= pNdb->getDictionary();
+  const NdbDictionary::Table* pTable= pDict->getTable("TBUNDO1");
+  if (pTable == NULL) {
+    g_err << "Can't get table pointer" << endl;
+    return NDBT_FAILED;
+  }
+  if (pNdb->getDictionary()->dropTable(pTable->getName()) != 0) {
+    g_err << "Can't drop table" << endl;
+    return NDBT_FAILED;
+  }
+  runDropTable(ctx,step);
+  if (restarter.restartAll(false) != 0)
+    return NDBT_FAILED;
+
+  if (restarter.waitClusterStarted() != 0)
+    return NDBT_FAILED;
+
+  return NDBT_OK;
+}
+int runVerifyUndoData(NDBT_Context* ctx, NDBT_Step* step){
+  Ndb* pNdb= GETNDB(step);
+  NdbDictionary::Dictionary* pDict= pNdb->getDictionary();
+  const NdbDictionary::Table *pTable= pDict->getTable("TBUNDO1");
+
+  if (pTable == NULL) {
+    g_err << "Can't get table pointer" << endl;
+    return NDBT_FAILED;
+  }
+
+  for (int i = 0; i < 10; i++) {
+    NdbTransaction *pTransaction= pNdb->startTransaction();
+    if (pTransaction == NULL) {
+      g_err << "Can't get transaction pointer" << endl;
+      return NDBT_FAILED;
+    }
+
+    NdbOperation *pOperation= pTransaction->getNdbOperation(pTable);
+    if (pOperation == NULL) {
+      g_err << "Can't get operation pointer" << endl;
+      return NDBT_FAILED;
+    }
+
+    pOperation->readTuple(NdbOperation::LM_Read);
+    pOperation->equal("ATTR1", i);
+
+    NdbRecAttr *pRecAttr= pOperation->getValue("ATTR2", NULL);
+    if (pRecAttr == NULL) {
+      g_err << "Can't get value" << endl;
+      return NDBT_FAILED;
+    }
+
+    if(pTransaction->execute( NdbTransaction::Commit ) == -1) {
+      g_err << "Commit error" << endl;
+      return NDBT_FAILED;
+    }
+
+    //diff
+    printf(" %2d    %2d\n", i, pRecAttr->u_32_value());
+    if(pRecAttr->u_32_value() != 1) {
+      g_err << "Table value not correct, should be 1" << endl;
+      return NDBT_FAILED;
+    }
+    pNdb->closeTransaction(pTransaction);
+  }
+  return NDBT_OK;
+}
+int runClearUndoTable(NDBT_Context* ctx, NDBT_Step* step){
+  Ndb* pNdb= GETNDB(step);
+  NdbDictionary::Dictionary* pDict= pNdb->getDictionary();
+  const NdbDictionary::Table *pTable= pDict->getTable("TBUNDO1");
+  if (pTable == NULL) {
+    g_err << "Can't get table pointer" << endl;
+    return NDBT_FAILED;
+  }
+  if (pNdb->getDictionary()->dropTable(pTable->getName()) != 0) {
+    g_err << "Can't drop table" << endl;
+    return NDBT_FAILED;
+  }
+  return NDBT_OK;
+}
 
 NDBT_TESTSUITE(testBackup);
 TESTCASE("BackupOne", 
@@ -539,6 +815,23 @@ TESTCASE("BackupBank", 
   VERIFIER(runRestoreBankAndVerify);
   //  FINALIZER(runDropBank);
 }
+TESTCASE("BackupUndoLog", 
+	 "Test for backup happen at start time\n"
+	 "1. Load table and data for undo log testing\n"
+	 "2. Start backup with wait started\n"
+	 "3. Insert, delete, update data during backup\n"
+	 "4. Drop tables and restart \n"
+	 "5. Restore\n"
+	 "6. Verify content of table\n"
+	 "6. Drop tables\n"){
+  INITIALIZER(runCreateUndoData);
+  INITIALIZER(runBackupWaitStartedUndo);
+  INITIALIZER(runChangeUndoDataDuringBackup);
+  INITIALIZER(runDropUndoTablesRestart);
+  INITIALIZER(runRestoreOne);
+  INITIALIZER(runVerifyUndoData);
+  INITIALIZER(runClearUndoTable);
+}
 TESTCASE("NFMaster", 
 	 "Test that backup behaves during node failiure\n"){
   INITIALIZER(setMaster);

=== modified file 'storage/ndb/test/src/NdbBackup.cpp'
--- a/storage/ndb/test/src/NdbBackup.cpp	2008-03-03 11:12:37 +0000
+++ b/storage/ndb/test/src/NdbBackup.cpp	2008-11-10 00:31:12 +0000
@@ -36,7 +36,10 @@
 #include <mgmapi_configuration.hpp>
 
 int 
-NdbBackup::start(unsigned int & _backup_id){
+NdbBackup::start(unsigned int & _backup_id,
+		 int flags,
+		 unsigned int user_backup_id,
+		 unsigned int logtype){
 
   
   if (!isConnected())
@@ -45,10 +48,12 @@ NdbBackup::start(unsigned int & _backup_
   ndb_mgm_reply reply;
   reply.return_code = 0;
 
-  if (ndb_mgm_start_backup(handle,
-			   2, // wait until completed
+  if (ndb_mgm_start_backup3(handle,
+			   flags,
 			   &_backup_id,
-			   &reply) == -1) {
+			   &reply,
+			   user_backup_id,
+			   logtype) == -1) {
     g_err << "Error: " << ndb_mgm_get_latest_error(handle) << endl;
     g_err << "Error msg: " << ndb_mgm_get_latest_error_msg(handle) <<
endl;
     g_err << "Error desc: " << ndb_mgm_get_latest_error_desc(handle) <<
endl;
@@ -65,6 +70,50 @@ NdbBackup::start(unsigned int & _backup_
   return 0;
 }
 
+int
+NdbBackup::startLogEvent(){
+
+  if (!isConnected())
+    return -1;
+  log_handle= NULL;
+  int filter[] = { 15, NDB_MGM_EVENT_CATEGORY_BACKUP, 0, 0 };
+  log_handle = ndb_mgm_create_logevent_handle(handle, filter);
+  if (!log_handle) {
+    g_err << "Can't create log event" << endl;
+    return -1;
+  }
+  return 0;
+}
+
+int
+NdbBackup::checkBackupStatus(){
+
+  struct ndb_logevent log_event;
+  int result = 0;
+  int res;
+  if(!log_handle) {
+    return -1;
+  }
+  if ((res= ndb_logevent_get_next(log_handle, &log_event, 3000)) > 0)
+  {
+    switch (log_event.type) {
+      case NDB_LE_BackupStarted:
+	result = 1;
+	break;
+      case NDB_LE_BackupCompleted:
+	result = 2;
+        break;
+      case NDB_LE_BackupAborted:
+	result = 3;
+        break;
+      default:
+        break;
+    }
+  }
+  ndb_mgm_destroy_logevent_handle(&log_handle);
+  return result;
+}
+
 
 const char * 
 NdbBackup::getBackupDataDirForNode(int _node_id){

=== modified file 'storage/ndb/tools/restore/Restore.cpp'
--- a/storage/ndb/tools/restore/Restore.cpp	2008-09-19 06:45:00 +0000
+++ b/storage/ndb/tools/restore/Restore.cpp	2008-11-10 00:31:12 +0000
@@ -1106,6 +1106,7 @@ BackupFile::BackupFile(void (* _free_dat
 
   m_file_size = 0;
   m_file_pos = 0;
+  m_is_undolog = false;
 }
 
 BackupFile::~BackupFile(){
@@ -1159,21 +1160,104 @@ Uint32 BackupFile::buffer_get_ptr_ahead(
       (*free_data_callback)();
 
     reset_buffers();
-    memcpy(m_buffer, m_buffer_ptr, m_buffer_data_left);
 
-    int error;
-    size_t r = azread(&m_file,
-                      ((char *)m_buffer) + m_buffer_data_left,
-                      m_buffer_sz - m_buffer_data_left, &error);
-    m_file_pos += r;
-    m_buffer_data_left += r;
-    m_buffer_ptr = m_buffer;
+    if (m_is_undolog)
+    {
+      /* move the left data to the end of buffer
+       */
+      size_t r = 0;
+      int error;
+      /* move the left data to the end of buffer
+       * m_buffer_ptr point the end of the left data. buffer_data_start point the start
of left data
+       * m_buffer_data_left is the length of left data.
+       */
+      Uint32 file_left_entry_data = 0;
+      Uint32 buffer_free_space = m_buffer_sz - m_buffer_data_left;
+      void * buffer_end = (char *)m_buffer + m_buffer_sz;
+      void * buffer_data_start = (char *)m_buffer_ptr - m_buffer_data_left;
+
+      memmove((char *)buffer_end - m_buffer_data_left, buffer_data_start,
m_buffer_data_left);
+      buffer_data_start = (char *)buffer_end - m_buffer_data_left;
+      /*
+       * For undo log file we should read log entris backwards from log file.
+       *   That mean the first entries should start at sizeof(m_fileHeader).
+       *   The end of the last entries should be the end of log file(EOF-1).
+       * If ther are entries left in log file to read.
+       *   m_file_pos should bigger than sizeof(m_fileHeader).
+       * If the length of left log entries less than the residual length of buffer,
+       *   we just need to read all the left entries from log file into the buffer.
+       *   and all the left entries in log file should been read into buffer. Or
+       * If the length of left entries is bigger than the residual length of buffer,
+       *   we should fill the buffer because the current buffer can't contain
+           all the left log entries, we should read more times.
+       * 
+       */
+      if (m_file_pos > sizeof(m_fileHeader))
+      {
+        /*
+         * We read(consume) data from the end of the buffer.
+         * If the left data is not enough for next read in buffer,
+         *   we move the residual data to the end of buffer.
+         *   Then we will fill the start of buffer with new data from log file.
+         * eg. If the buffer length is 10. "+" denotes useless content.
+         *                          top        end
+         *   Bytes in file        abcdefgh0123456789
+         *   Byte in buffer       0123456789             --after first read
+         *   Consume datas...     (6789) (2345)
+         *   Bytes in buffer      01++++++++             --after several consumes
+         *   Move data to end     ++++++++01
+         *   Bytes in buffer      abcdefgh01             --after second read
+         */
+	file_left_entry_data = m_file_pos - sizeof(m_fileHeader);
+        if (file_left_entry_data <= buffer_free_space)
+        {
+          /* All remaining data fits in space available in buffer. 
+	   * Read data into buffer before existing data.
+	   */
+          // Move to the start of data to be read
+          azseek(&m_file, sizeof(m_fileHeader), SEEK_SET);
+          r = azread(&m_file, (char *)buffer_data_start - file_left_entry_data,
file_left_entry_data, &error);
+          //move back
+          azseek(&m_file, sizeof(m_fileHeader), SEEK_SET);
+        }
+        else
+        {
+	  // Fill remaing space at start of buffer with data from file.
+          azseek(&m_file, m_file_pos-buffer_free_space, SEEK_SET);
+          r = azread(&m_file, ((char *)m_buffer), buffer_free_space, &error);
+          azseek(&m_file, m_file_pos-buffer_free_space, SEEK_SET);
+        }
+      }
+      m_file_pos -= r;
+      m_buffer_data_left += r;
+      //move to the end of buffer
+      m_buffer_ptr = buffer_end;
+    }
+    else
+    {
+      memmove(m_buffer, m_buffer_ptr, m_buffer_data_left);
+      int error;
+      size_t r = azread(&m_file,
+                        ((char *)m_buffer) + m_buffer_data_left,
+                        m_buffer_sz - m_buffer_data_left, &error);
+      m_file_pos += r;
+      m_buffer_data_left += r;
+      m_buffer_ptr = m_buffer;
+    }
 
     if (sz > m_buffer_data_left)
       sz = size * (m_buffer_data_left / size);
   }
 
-  *p_buf_ptr = m_buffer_ptr;
+  /*
+   * For undolog, the m_buffer_ptr points to the end of the left data.
+   * After we get data from the end of buffer, the data-end move forward.
+   *   So we should move m_buffer_ptr to the right place.
+   */
+  if(m_is_undolog)
+    *p_buf_ptr = (char *)m_buffer_ptr - sz;
+  else
+    *p_buf_ptr = m_buffer_ptr;
 
   return sz/size;
 }
@@ -1181,8 +1265,19 @@ Uint32 BackupFile::buffer_get_ptr(void *
 {
   Uint32 r = buffer_get_ptr_ahead(p_buf_ptr, size, nmemb);
 
-  m_buffer_ptr = ((char*)m_buffer_ptr)+(r*size);
-  m_buffer_data_left -= (r*size);
+  if(m_is_undolog)
+  {
+    /* we read from end of buffer to start of buffer.
+     * m_buffer_ptr keep at the end of real data in buffer.
+     */
+    m_buffer_ptr = ((char*)m_buffer_ptr)-(r*size);
+    m_buffer_data_left -= (r*size);
+  }
+  else
+  {
+    m_buffer_ptr = ((char*)m_buffer_ptr)+(r*size);
+    m_buffer_data_left -= (r*size);
+  }
 
   return r;
 }
@@ -1316,10 +1411,30 @@ BackupFile::readHeader(){
   debug << "ByteOrder is " << m_fileHeader.ByteOrder << endl;
   debug << "magicByteOrder is " << magicByteOrder << endl;
   
-  if (m_fileHeader.FileType != m_expectedFileHeader.FileType){
+
+  if (m_fileHeader.FileType != m_expectedFileHeader.FileType &&
+      !(m_expectedFileHeader.FileType == BackupFormat::LOG_FILE &&
+      m_fileHeader.FileType == BackupFormat::UNDO_FILE)){
+    // UNDO_FILE will do in case where we expect LOG_FILE
     abort();
   }
   
+  if(m_fileHeader.FileType == BackupFormat::UNDO_FILE){
+      m_is_undolog = true;
+      /* move pointer to end of data part. 
+         move 4 bytes from the end of file 
+         because footer contain 4 bytes 0 at the end of file.
+         we discard the remain data stored in m_buffer.
+      */
+      struct stat buf;
+      if (fstat(m_file.file, &buf) == 0)
+        m_file_size = (Uint64)buf.st_size;
+      azseek(&m_file, 4, SEEK_END);  
+      m_file_pos = m_file_size - 4;
+      m_buffer_data_left = 0;
+      m_buffer_ptr = m_buffer;
+  }
+
   // Check for BackupFormat::FileHeader::ByteOrder if swapping is needed
   if (m_fileHeader.ByteOrder == magicByteOrder) {
     m_hostByteOrder = true;
@@ -1541,9 +1656,24 @@ RestoreLogIterator::getNextLogEntry(int 
   do {
     Uint32 len;
     Uint32 *logEntryPtr;
-    if (buffer_read_ahead(&len, sizeof(Uint32), 1) != 1){
-      res= -1;
-      return 0;
+    if(m_is_undolog){
+      int read_result = 0;
+      read_result = buffer_read(&len, sizeof(Uint32), 1);
+      //no more log data to read
+      if (read_result == 0 ) {
+        res = 0;
+        return 0;
+      }
+      if (read_result != 1) {
+        res= -1;
+        return 0;
+      }
+    }
+    else{
+      if (buffer_read_ahead(&len, sizeof(Uint32), 1) != 1){
+        res= -1;
+        return 0;
+      }
     }
     len= ntohl(len);
 
@@ -1598,15 +1728,27 @@ RestoreLogIterator::getNextLogEntry(int 
   } while(m_last_gci > stopGCP + 1);
 
   m_logEntry.m_table = m_metaData.getTable(tableId);
+  /* We should 'invert' the operation type when we restore an Undo log.
+   *   To undo an insert operation, a delete is required.
+   *   To undo a delete operation, an insert is required.
+   * The backup have collected 'before values' for undoing 'delete+update' to make this
work.
+   * To undo insert, we only need primary key.
+   */
   switch(triggerEvent){
   case TriggerEvent::TE_INSERT:
-    m_logEntry.m_type = LogEntry::LE_INSERT;
+    if(m_is_undolog)
+      m_logEntry.m_type = LogEntry::LE_DELETE;
+    else
+      m_logEntry.m_type = LogEntry::LE_INSERT;
     break;
   case TriggerEvent::TE_UPDATE:
     m_logEntry.m_type = LogEntry::LE_UPDATE;
     break;
   case TriggerEvent::TE_DELETE:
-    m_logEntry.m_type = LogEntry::LE_DELETE;
+    if(m_is_undolog)
+      m_logEntry.m_type = LogEntry::LE_INSERT;
+    else
+      m_logEntry.m_type = LogEntry::LE_DELETE;
     break;
   default:
     res = -1;

=== modified file 'storage/ndb/tools/restore/Restore.hpp'
--- a/storage/ndb/tools/restore/Restore.hpp	2008-09-19 06:45:00 +0000
+++ b/storage/ndb/tools/restore/Restore.hpp	2008-11-10 00:31:12 +0000
@@ -298,6 +298,8 @@ protected:
   Uint64 m_file_size;
   Uint64 m_file_pos;
 
+  bool  m_is_undolog;
+
   void (* free_data_callback)();
   virtual void reset_buffers() {}
 

Thread
bzr commit into mysql-5.1 branch (leonard:2707) Leonard Zhou10 Nov