List:Commits« Previous MessageNext Message »
From:rsomla Date:May 29 2007 7:57am
Subject:bk commit into 5.1 tree (rafal:1.2527)
View as plain text  
Below is the list of changes that have just been committed into a local
5.1 repository of rafal. When rafal does a push these changes will
be propagated to the main repository and, within 24 hours after the
push, to the public repository.
For information on how to access the public repository
see http://dev.mysql.com/doc/mysql/en/installing-source-tree.html

ChangeSet@stripped, 2007-05-29 09:56:57+02:00, rafal@quant.(none) +1 -0
  Merge quant.(none):/ext/mysql/bk/backup/prototype
  into  quant.(none):/ext/mysql/bk/backup/prototype-push
  MERGE: 1.2525.1.2

  sql/backup/data_backup.cc@stripped, 2007-05-29 09:56:55+02:00, rafal@quant.(none) +0 -0
    SCCS merged
    MERGE: 1.15.1.2

# This is a BitKeeper patch.  What follows are the unified diffs for the
# set of deltas contained in the patch.  The rest of the patch, the part
# that BitKeeper cares about, is below these diffs.
# User:	rafal
# Host:	quant.(none)
# Root:	/ext/mysql/bk/backup/prototype-push/RESYNC

--- 1.16/sql/backup/data_backup.cc	2007-05-29 09:57:00 +02:00
+++ 1.17/sql/backup/data_backup.cc	2007-05-29 09:57:00 +02:00
@@ -1,24 +1,45 @@
 #include "../mysql_priv.h"
 
+/**
+  @file
+
+  Code used to backup table data.
+
+  Function @c write_table_data() and @c restore_table_data() use backup/restore
+  drivers and protocols to create image of the data stored in the tables being
+  backed up.
+ */
+
 #include "backup_engine.h"
 #include "stream.h"
-#include "archive.h"
+#include "backup_kernel.h"
+#include "debug.h"
+
+
+/***********************************************
+
+                  DATA BACKUP
+
+ ***********************************************/
 
 namespace backup {
 
 struct backup_state {
 
- enum value { INACTIVE,
-              INIT,
-              WAITING,
-              PREPARING,
-              READY,
-              FINISHING,
-              DONE,
-              SHUT_DOWN,
+ /// State of a single backup driver.
+ enum value { INACTIVE,   ///< Before backup process is started (phase 0).
+              INIT,       ///< During initial data transfer (phase 1).
+              WAITING,    ///< Waiting for other drivers to finish init phase (phase 2).
+              PREPARING,  ///< Preparing for @c lock() call (phase 3).
+              READY,      ///< Ready for @c lock() call (phase 4)
+              FINISHING,  ///< Final data transfer (phase 7).
+              DONE,       ///< Backup complete.
+              SHUT_DOWN,  ///< After @c end() call.
               ERROR,
               MAX };
 
+#ifndef DBUG_OFF
+
   static const char* name[];
 
   struct Initializer
@@ -40,11 +61,24 @@
  private:
 
   static Initializer init;
+
+#endif
+
 };
 
+#ifndef DBUG_OFF
+
 const char* backup_state::name[backup_state::MAX];
 backup_state::Initializer init;
 
+#endif
+
+/**
+  Used to write data blocks to a stream.
+
+  This class defines how buffers are allocated for data transfers
+  (@c get_buf() method). Each block is written as a separate chunk of data.
+ */
 struct Block_writer
 {
   enum result_t { OK, NO_RES, ERROR };
@@ -58,14 +92,12 @@
 
  private:
 
-//  uint     m_no;
   OStream  &m_str;
   size_t   buf_size;
 
   friend struct Backup_pump;
 };
 
-
 /**
   @class Backup_pump
 
@@ -80,63 +112,41 @@
 
 struct Backup_pump
 {
-  backup_state::value  state;
-  enum { READING, WRITING } mode;
-  size_t  init_size;
+  backup_state::value  state; ///< State of the backup driver.
+
+  enum { READING, ///< Pump is polling driver for data.
+         WRITING  ///< Pump sends data to the stream.
+       } mode;
+
+  size_t  init_size;  ///< The estimate returned by backup driver's @c init_data() method.
   size_t  bytes_in, bytes_out;
 
-  const char *m_name;
+  const char *m_name; ///< Name of the driver (for debug purposes).
 
-  Backup_pump(Backup_subimage&, Block_writer&);
+  Backup_pump(uint, Image_info&, Block_writer&);
   ~Backup_pump();
 
   size_t pump();
-  bool prepare();
 
-  bool begin()
-  {
-    state= backup_state::INIT;
-    DBUG_PRINT("backup/data",(" %s enters INIT state",m_name));
-    return m_drv.begin(m_bw.buf_size) == backup::OK;
-  }
-
-  bool end()
-  {
-    if (state != backup_state::SHUT_DOWN)
-    {
-      DBUG_PRINT("backup/data",(" shutting down %s",m_name));
-      m_drv.end();
-      m_drv.free();
-      state= backup_state::SHUT_DOWN;
-    }
-    return TRUE;
-  }
-
-
-  bool lock()
-  {
-    DBUG_PRINT("backup/data",(" locking %s",m_name));
-    return m_drv.lock() == backup::OK;
-  }
-
-  bool unlock()
-  {
-    DBUG_PRINT("backup/data",(" unlocking %s, goes to FINISHING state",m_name));
-    state= backup_state::FINISHING;
-    return m_drv.unlock() == backup::OK;  // FIXME: better handling of errors
-  }
+  bool begin();
+  bool end();
+  bool prepare();
+  bool lock();
+  bool unlock();
 
+  /// Return the backup driver used by the pump.
   Backup_driver &drv() const
-  { return m_drv; }
+  { return *m_drv; }
 
  private:
 
+  static const uint m_buf_size= 2048; ///< Size of buffers used for data transfers.
   static const uint get_buf_retries= 3; /**< if block writer has no buffers, retry
                                               this many times before giving up */
-  uint          m_drv_no;
-  Backup_driver &m_drv;
-  Block_writer  &m_bw;
-  Buffer        m_buf;
+  uint          m_drv_no; ///< The number of this image in the backup archive.
+  Backup_driver *m_drv;   ///< Pointer to the backup driver.
+  Block_writer  &m_bw;    ///< Block writer used for writing data blocks.
+  Buffer        m_buf;    ///< Buffer used for data transfers.
 
   /**
     Pointer to the memory area used as write buffer.
@@ -153,6 +163,7 @@
   bool drop_buf();
   bool write_buf();
 
+  /// Bitmap showing which streams have been closed by the driver.
   MY_BITMAP     m_closed_streams;
 
   void mark_stream_closed(uint stream_no)
@@ -167,32 +178,6 @@
 
 };
 
-bool Backup_pump::prepare()
-{
-  result_t res= m_drv.prelock();
-
-  state= backup_state::PREPARING;
-
-  switch (res) {
-
-  case READY:
-    state= backup_state::READY;
-
-  case OK:
-    res= OK;
-    break;
-
-  default:
-    break;
-
-  }
-
-  DBUG_PRINT("backup/data",(" preparing %s, goes to %s state",
-                            m_name,backup_state::name[state]));
-
-  return res == backup::OK;
-}
-
 /**
  @class Scheduler
 
@@ -202,10 +187,14 @@
 */
 class Scheduler
 {
+  class Pump;
+  class Pump_iterator;
+
  public:
 
+  void add(Pump*);
   void step();
-  void add(Backup_subimage&);
+
   void prepare();
   void lock();
   void unlock();
@@ -225,94 +214,12 @@
 
  private:
 
-  // Extend Backup_pump with information abouts its position relative
-  // to other pumps.
-
-  class Pump: public Backup_pump
-  {
-    size_t start_pos;
-    Block_writer bw;
-
-    Pump(OStream &s, Backup_subimage &img, size_t pos):
-      Backup_pump(img,bw), start_pos(pos), bw(2048,s)
-    {}
-
-    friend class Scheduler;
-
-   public:
-
-    size_t pos() const
-    { return start_pos + bytes_in; }
-
-  };
-
-  struct Pump_ptr
-  {
-    LIST  *el;
-
-    Pump* operator->()
-    {
-      return el? static_cast<Pump*>(el->data) : NULL;
-    }
-
-    void  operator++()
-    {
-      if(el) el= el->next;
-    }
-
-    operator bool() const
-    { return el && el->data; }
-
-    void operator=(const Pump_ptr &p)
-    { el= p.el; }
-
-    Pump_ptr(): el(NULL)
-    {}
-
-    Pump_ptr(LIST *el): el(el)
-    {}
-  };
-
   LIST   *m_pumps, *m_last;
-  uint   m_count;       // current number of pumps
-  size_t m_total;       // accumulated position of all drivers
-  size_t m_init_left;   // how much of init data is left (estimate)
-  uint   m_known_count; // no. drivers which can estimate init data size
-  OStream &m_str;
-
-  void move_pump_to_end(const Pump_ptr &p)
-  {
-    DBUG_ASSERT(m_pumps);
-    if (m_last != p.el)
-    {
-      m_pumps= list_delete(m_pumps,p.el);
-      m_last->next= p.el;
-      p.el->prev= m_last;
-      p.el->next= NULL;
-      m_last= p.el;
-    }
-  }
-
-  void remove_pump(Pump_ptr &p)
-  {
-    DBUG_ASSERT(p.el);
-
-    if (m_last == p.el)
-      m_last= m_last->prev;
-
-    if (m_pumps)
-    {
-      m_pumps= list_delete(m_pumps,p.el);
-      m_count--;
-    }
-
-    if (p)
-    {
-      p->end();
-      delete static_cast<Pump*>(p.el->data);
-      my_free((::gptr)p.el,MYF(0));
-    }
-  }
+  uint   m_count;       ///< current number of pumps
+  size_t m_total;       ///< accumulated position of all drivers
+  size_t m_init_left;   ///< how much of init data is left (estimate)
+  uint   m_known_count; ///< no. drivers which can estimate init data size
+  OStream &m_str;       ///< stream to which we write
 
   Scheduler(OStream &s):
     init_count(0), prepare_count(0), finish_count(0),
@@ -321,10 +228,34 @@
     m_str(s)
   {}
 
-  friend bool write_table_data(const Backup_info&, OStream&);
+  void move_pump_to_end(const Pump_iterator&);
+  void remove_pump(Pump_iterator&);
 
+  friend bool write_table_data(THD*, Backup_info&, OStream&);
 };
 
+/**
+  Extend Backup_pump with information about its position relative
+  to other pumps.
+ */
+class Scheduler::Pump: public Backup_pump
+{
+  size_t start_pos;
+  Block_writer bw;
+
+  friend class Scheduler;
+
+ public:
+
+  Pump(uint no, Image_info &img, OStream &s):
+    Backup_pump(no,img,bw), start_pos(0), bw(2048,s)
+  {}
+
+  size_t pos() const
+  { return start_pos + bytes_in; }
+};
+
+
 
 #define CHECK_STATE(P,S)  \
  do { \
@@ -333,47 +264,56 @@
    DBUG_ASSERT( (P).state == (S) ); \
  } while(0)
 
-// Poll all backup drivers involved in the backup image for their backup
-// data and write to a backup stream.
-// Currently only one driver is handled.
 
-bool write_table_data(const Backup_info &info, OStream &s)
+/**
+  Save data from tables being backed up.
+
+  Function initializes and controls backup drivers which create the image
+  of table data. Currently single thread is used and drivers are polled in
+  a round robin fashion.
+ */
+bool write_table_data(THD*, Backup_info &info, OStream &s)
 {
   DBUG_ENTER("backup::write_table_data");
 
+  info.data_size= 0;
+
   if (info.img_count==0 || info.table_count==0) // nothing to backup
     DBUG_RETURN(TRUE);
 
   Scheduler   sch(s);               // scheduler instance
-  List<Backup_subimage>  inactive;  // list of images not yet being created
+  List<Scheduler::Pump>  inactive;  // list of images not yet being created
   size_t      max_init_size=0;      // keeps maximal init size for images in inactive list
 
+  size_t      start_bytes= s.bytes;
+
   DBUG_PRINT("backup/data",("initializing scheduler"));
 
   // add unknown "at end" drivers to scheduler, rest to inactive list
 
-  List_iterator<Backup_subimage>  it(const_cast< List<Backup_subimage>& >(info.images));
-  Backup_subimage *img;
-
-  while ((img= it++))
+  for (uint no=0; no < info.img_count; ++no)
   {
-    result_t res;
+    Image_info *i= info.images[no];
 
-    res= img->create_driver();
-    DBUG_ASSERT(res == OK);
+    if (!i)
+      continue;
+
+    Scheduler::Pump *p= new Scheduler::Pump(no,*i,s);
+
+    DBUG_ASSERT(p);
 
-    size_t init_size= img->init_size;
+    size_t init_size= p->init_size;
 
     if (init_size == Driver::UNKNOWN_SIZE)
     {
-      sch.add(*img);
+      sch.add(p);
     }
     else
     {
       if (init_size > max_init_size)
         max_init_size= init_size;
 
-      inactive.push_back(img);
+      inactive.push_back(p);
     }
   }
 
@@ -388,11 +328,14 @@
                             inactive.elements));
 
   DBUG_PRINT("backup/data",("-- INIT PHASE --"));
+  BACKUP_SYNC("data_init");
 
-  // poll "at end" drivers activating inactive ones on the way
-  // note: if scheduler is empty and there are images with non-zero
-  // init size (max_init_size > 0) then enter the loop as one such image
-  // will be added to the scheduler inside.
+  /*
+   poll "at end" drivers activating inactive ones on the way
+   note: if scheduler is empty and there are images with non-zero
+   init size (max_init_size > 0) then enter the loop as one such image
+   will be added to the scheduler inside.
+  */
 
   while (sch.init_count > 0 || sch.is_empty() && max_init_size > 0)
   {
@@ -401,12 +344,12 @@
 
     if (max_init_size > 0 && sch.init_left() <= max_init_size)
     {
-      List_iterator<Backup_subimage>  it(inactive);
-      Backup_subimage *p;
+      List_iterator<Scheduler::Pump>  it(inactive);
+      Scheduler::Pump *p;
 
       size_t second_max= 0;
       max_init_size= 0;
-      img= NULL;
+      Scheduler::Pump *p1= NULL;
 
       while ((p= it++))
       {
@@ -414,31 +357,35 @@
         {
           second_max= max_init_size;
           max_init_size= p->init_size;
-          img= p;
+          p1= p;
         }
       }
 
       max_init_size= second_max;
 
-      sch.add(*img);
+      sch.add(p1);
     }
 
+    // poll drivers
+
     sch.step();
   }
 
   // start "at begin" drivers
   DBUG_PRINT("backup/data",("- activating \"at begin\" drivers"));
 
-  List_iterator<Backup_subimage>  it1(inactive);
+  List_iterator<Scheduler::Pump>  it1(inactive);
+  Scheduler::Pump *p;
 
-  while ((img= it1++))
-    sch.add(*img);
+  while ((p= it1++))
+    sch.add(p);
 
   while (sch.init_count > 0)
     sch.step();
 
   // prepare for VP
   DBUG_PRINT("backup/data",("-- PREPARE PHASE --"));
+  BACKUP_SYNC("data_prepare");
 
   sch.prepare();
 
@@ -447,12 +394,16 @@
 
   // VP creation
   DBUG_PRINT("backup/data",("-- SYNC PHASE --"));
+  BACKUP_SYNC("data_lock");
 
   sch.lock();
+
+  BACKUP_SYNC("data_unlock");
   sch.unlock();
 
   // get final data from drivers
   DBUG_PRINT("backup/data",("-- FINISH PHASE --"));
+  BACKUP_SYNC("data_finish");
 
   while (sch.finish_count > 0)
     sch.step();
@@ -460,18 +411,60 @@
   DBUG_PRINT("backup/data",("-- DONE --"));
   sch.close();
 
-  DBUG_RETURN(TRUE);
+  info.data_size= s.bytes - start_bytes;
 
+  DBUG_RETURN(TRUE);
 }
 
+} // backup namespace
+
 /**************************************************
- * Implementation of Scheduler
+
+        Implementation of Scheduler
+
  **************************************************/
 
-void Scheduler::step()
+namespace backup {
+
+/**
+  Used to iterate over backup pumps of a scheduler.
+ */
+struct Scheduler::Pump_iterator
 {
+  LIST  *el;
+
+  Pump* operator->()
+  {
+    return el? static_cast<Pump*>(el->data) : NULL;
+  }
+
+  void  operator++()
+  {
+    if(el) el= el->next;
+  }
+
+  operator bool() const
+  { return el && el->data; }
+
+  void operator=(const Pump_iterator &p)
+  { el= p.el; }
+
+  Pump_iterator(): el(NULL)
+  {}
 
+  Pump_iterator(const Scheduler &sch): el(sch.m_pumps)
+  {}
 
+};
+
+/**
+  Pick next backup pump and call its @c pump() method.
+
+  Method updates statistics of number of drivers in each phase which is used
+  to detect end of a backup process.
+ */
+void Scheduler::step()
+{
   /* find pump with least data read.
 
      Pick next driver to pump data from. The driver with least data read
@@ -480,7 +473,8 @@
 
      TODO: implement this
    */
-  Pump_ptr p(m_pumps);
+
+  Pump_iterator p(*this);
 
 /*
   for (Pump_ptr it(m_pumps); it; ++it)
@@ -565,18 +559,17 @@
 
 }
 
-
-void Scheduler::add(Backup_subimage &img)
+/// Add backup pump to the scheduler.
+void Scheduler::add(Pump *p)
 {
   size_t  avg= m_count? m_total/m_count + 1 : 0;
   bool res;
 
-  Pump *p= new Pump(m_str,img,avg);
-
   DBUG_ASSERT(p);
+  p->start_pos= avg;
 
   DBUG_PRINT("backup/data",("Adding %s to scheduler (at pos %lu)",
-                            img.name(), (unsigned long)avg));
+                            p->m_name, (unsigned long)avg));
 
   m_pumps= list_cons(p,m_pumps);
   if (!m_last)
@@ -588,9 +581,9 @@
   res= p->begin();
   DBUG_ASSERT(res);
 
-  if (img.init_size != Driver::UNKNOWN_SIZE)
+  if (p->init_size != Driver::UNKNOWN_SIZE)
   {
-    m_init_left += img.init_size;
+    m_init_left += p->init_size;
     m_known_count++;
   }
 
@@ -616,12 +609,53 @@
   DBUG_PRINT("backup/data",("total init data size estimate: %lu",(unsigned long)m_init_left));
 }
 
+/// Move backup pump to the end of scheduler's list.
+void Scheduler::move_pump_to_end(const Pump_iterator &p)
+{
+  DBUG_ASSERT(m_pumps);
+  if (m_last != p.el)
+  {
+    m_pumps= list_delete(m_pumps,p.el);
+    m_last->next= p.el;
+    p.el->prev= m_last;
+    p.el->next= NULL;
+    m_last= p.el;
+  }
+}
+
+/**
+  Remove backup pump from the scheduler.
+
+  The corresponding backup driver is shut down using @c end() call.
+ */
+void Scheduler::remove_pump(Pump_iterator &p)
+{
+  DBUG_ASSERT(p.el);
+
+  if (m_last == p.el)
+    m_last= m_last->prev;
+
+  if (m_pumps)
+  {
+    m_pumps= list_delete(m_pumps,p.el);
+    m_count--;
+  }
+
+  if (p)
+  {
+    p->end();
+    delete static_cast<Pump*>(p.el->data);
+    my_free((::gptr)p.el,MYF(0));
+  }
+}
+
+/// Start prepare phase for all drivers.
 void Scheduler::prepare()
 {
   DBUG_ASSERT(init_count==0);
   DBUG_PRINT("backup/data",("calling prepare() for all drivers"));
 
-  for(Pump_ptr it(m_pumps); it; ++it)
+  for(Pump_iterator it(*this); it; ++it)
   {
     it->prepare();
     if (it->state == backup_state::PREPARING)
@@ -632,22 +666,24 @@
                             m_count, init_count, prepare_count, finish_count));
 }
 
+/// Lock all drivers.
 void Scheduler::lock()
 {
   DBUG_ASSERT(prepare_count==0);
   DBUG_PRINT("backup/data",("calling lock() for all drivers"));
 
-  for(Pump_ptr it(m_pumps); it; ++it)
+  for(Pump_iterator it(*this); it; ++it)
     it->lock();
 
   DBUG_PRINT("backup/data",("driver counts: total=%u, init=%u, prepare=%u, finish=%u.",
                             m_count, init_count, prepare_count, finish_count));
 }
 
+/// Unlock all drivers.
 void Scheduler::unlock()
 {
   DBUG_PRINT("backup/data",("calling unlock() for all drivers"));
-  for(Pump_ptr it(m_pumps); it; ++it)
+  for(Pump_iterator it(*this); it; ++it)
   {
     it->unlock();
     if (it->state == backup_state::FINISHING)
@@ -655,25 +691,28 @@
   }
 }
 
+/// Shut down backup process.
 void Scheduler::close()
 {
   // shutdown any remaining drivers
   while (m_count && m_pumps)
   {
-    Pump_ptr p(m_pumps);
+    Pump_iterator p(*this);
     remove_pump(p);
   }
 }
 
 
 /**************************************************
- * Implementation of Backup_pump
+
+         Implementation of Backup_pump
+
  **************************************************/
 
-Backup_pump::Backup_pump(Backup_subimage &img, Block_writer &bw):
+Backup_pump::Backup_pump(uint no, Image_info &img, Block_writer &bw):
   state(backup_state::INACTIVE), mode(READING),
-  init_size(img.init_size), bytes_in(0), bytes_out(0),
-  m_drv_no(img.no), m_drv(img.driver()), m_bw(bw), m_buf_head(NULL),
+  init_size(0), bytes_in(0), bytes_out(0),
+  m_drv_no(no), m_drv(NULL), m_bw(bw), m_buf_head(NULL),
   m_buf_retries(0)
 {
   m_buf.data= NULL;
@@ -682,13 +721,95 @@
               1+img.tables.count(),
               FALSE); // not thread safe
   m_name= img.name();
+  m_drv= img.get_backup_driver();
+  DBUG_ASSERT(m_drv);
+  init_size= m_drv->init_size();
 }
 
 Backup_pump::~Backup_pump()
 {
+  if (m_drv)
+    m_drv->free();
   bitmap_free(&m_closed_streams);
 }
 
+/// Initialize backup driver.
+bool Backup_pump::begin()
+{
+  state= backup_state::INIT;
+  DBUG_PRINT("backup/data",(" %s enters INIT state",m_name));
+  return m_drv->begin(m_buf_size) == backup::OK;
+}
+
+/// Shut down the driver.
+bool Backup_pump::end()
+{
+  if (state != backup_state::SHUT_DOWN)
+  {
+    DBUG_PRINT("backup/data",(" shutting down %s",m_name));
+    m_drv->end();
+    state= backup_state::SHUT_DOWN;
+  }
+  return TRUE;
+}
+
+/// Start prepare phase for the driver.
+bool Backup_pump::prepare()
+{
+  result_t res= m_drv->prelock();
+
+  state= backup_state::PREPARING;
+
+  switch (res) {
+
+  case READY:
+    state= backup_state::READY;
+
+  case OK:
+    res= OK;
+    break;
+
+  default:
+    break;
+
+  }
+
+  DBUG_PRINT("backup/data",(" preparing %s, goes to %s state",
+                            m_name,backup_state::name[state]));
+  return res == backup::OK;
+}
+
+/// Request VP from the driver.
+bool Backup_pump::lock()
+{
+  DBUG_PRINT("backup/data",(" locking %s",m_name));
+  return m_drv->lock() == backup::OK;
+}
+
+/// Unlock the driver after VP creation.
+bool Backup_pump::unlock()
+{
+  DBUG_PRINT("backup/data",(" unlocking %s, goes to FINISHING state",m_name));
+  state= backup_state::FINISHING;
+  return m_drv->unlock() == backup::OK;  // FIXME: better handling of errors
+}
+
+/**
+  Poll the driver for next block of data and/or write data to stream.
+
+  Depending on the current mode in which the pump is operating (@c mode member)
+  the backup driver is polled for image data or data obtained before is written
+  to the stream. Answers from drivers @c get_data() method are interpreted and
+  the state of the driver is updated accordingly.
+
+  Each block of data obtained from a driver is prefixed with the image
+  number and the table number stored in @c buf.table_no before it is written to
+  the stream:
+  <pre>
+    | img no | table no | data from the driver |
+  </pre>
+ */
+
 // FIXME: return number of bytes *written* to stream.
 
 size_t Backup_pump::pump()
@@ -752,7 +873,14 @@
 
       DBUG_ASSERT(m_buf_head);
 
-      result_t res= m_drv.get_data(m_buf);
+      /*
+        If the global variable "backup_sleep" has been set, use that value
+        to cause the kernel to sleep in seconds.
+      */
+      if (backup_sleep)
+        sleep(backup_sleep);
+
+      result_t res= m_drv->get_data(m_buf);
 
       switch (res) {
 
@@ -843,84 +971,61 @@
   return howmuch;
 }
 
+} // backup namespace
 
-/**************************************************
- * Implementation of Block_writer
- **************************************************/
-
-Block_writer::result_t
-Block_writer::get_buf(Buffer &buf)
-{
-  buf.table_no= 0;
-  buf.last= FALSE;
-  buf.size= buf_size;
-  buf.data= m_str.get_window(buf.size);
-
-  if (!buf.data)
-    return NO_RES;
-
-  return OK;
-}
-
-Block_writer::result_t
-Block_writer::write_buf(const Buffer &buf)
-{
-  stream_result::value  res= m_str.write_window(buf.size);
-  m_str.end_chunk();
-  return  res == stream_result::OK ? OK : ERROR;
-}
-
-Block_writer::result_t
-Block_writer::drop_buf(Buffer &buf)
-{
-  m_str.drop_window();
-  buf.data= NULL;
-  buf.size= 0;
-  return OK;
-}
+/***********************************************
 
-} // backup namespace
+                  DATA RESTORE
 
+ ***********************************************/
 
 namespace backup {
 
-// Read backup image data from a backup stream and forward it to restore drivers.
-// This simple implementation handles only one restore driver.
-
-bool restore_table_data(const Restore_info &info, IStream &s)
+/**
+  Read backup image data from a backup stream and forward it to restore drivers.
+ */
+bool restore_table_data(THD*, Restore_info &info, IStream &s)
 {
   DBUG_ENTER("restore::restore_table_data");
 
   enum { READING, SENDING, DONE, ERROR } state= READING;
 
+  // FIXME: when selective restores are implemented, check that
+  // nothing was *selected* to be restored.
+
   if (info.img_count==0 || info.table_count==0) // nothing to restore
     DBUG_RETURN(TRUE);
 
-  Restore_driver* drv[32];
-  uint drv_count;
+  result_t res;
+  Restore_driver* drv[256];
 
-  List_iterator<Restore_subimage> it(const_cast<Restore_info&>(info).images);
-  Restore_subimage  *img;
+  DBUG_ASSERT(info.img_count < 256);
 
-  for(drv_count=0; (img= it++) && drv_count < 32 ; ++drv_count)
+  for (uint no=0; no < info.img_count; ++no)
   {
-    result_t res;
+    drv[no]= NULL;
+
+    Image_info *img= info.images[no];
+
+    // note: img can be NULL if it is not used in restore.
+    if (!img)
+      continue;
 
-    DBUG_PRINT("restore",("Creating %s (image %u)",img->name(), img->no+1));
-    drv[drv_count]= img->driver();
-    DBUG_ASSERT(drv[drv_count]);
+    drv[no]= img->get_restore_driver();
+    DBUG_ASSERT(drv[no]);
 
-    res= drv[drv_count]->begin(0); // TODO: provide correct size
+    res= drv[no]->begin(0); // TODO: provide correct size
     DBUG_ASSERT(res == OK);
   }
 
   Buffer  buf;
-  uint    img_no;
+  uint    img_no=0;
   uint    repeats=0, errors= 0;
 
   static const uint MAX_ERRORS= 3;
   static const uint MAX_REPEATS= 7;
 
+  size_t start_bytes= s.bytes;
 
   // main data reading loop
 
@@ -961,13 +1066,20 @@
 
     case SENDING:
 
-      if( img_no < 1 || !drv[img_no-1] )
+      if( img_no < 1 || img_no > info.img_count || !drv[img_no-1] )
       {
         DBUG_PRINT("restore",("Skipping data for subimage %u",img_no));
         state= READING;
         break;
       }
 
+      /*
+        If the global variable "backup_sleep" has been set, use that value
+        to cause the kernel to sleep in seconds.
+      */
+      if (backup_sleep)
+        sleep(backup_sleep);
+
       switch( drv[img_no-1]->send_data(buf) ) {
 
       case backup::OK:
@@ -1020,8 +1132,13 @@
   if (state != DONE)
     DBUG_PRINT("restore",("state is %d",state));
 
-  for(uint no=0; no < drv_count; ++no)
+  info.data_size= s.bytes - start_bytes;
+
+  for(uint no=0; no < info.img_count; ++no)
   {
+    if (!drv[no])
+      continue;
+
     DBUG_PRINT("restore",("Shutting down driver %u",no));
     drv[no]->end();
     drv[no]->free();
@@ -1032,4 +1149,71 @@
 
 
 } // backup namespace
+
+
+/**************************************************
+
+       Implementation of Block_writer
+
+ **************************************************/
+
+namespace backup {
+
+/**
+  Allocate new buffer for data transfer.
+
+  The buffer size is given by @c buf.size member.
+
+  Current implementation tries to allocate the data transfer bufer in the
+  stream. It can handle only one buffer at a time.
+
+  @returns @c NO_RES if buffer can not be allocated, @c OK otherwise.
+ */
+Block_writer::result_t
+Block_writer::get_buf(Buffer &buf)
+{
+  buf.table_no= 0;
+  buf.last= FALSE;
+  buf.size= buf_size;
+  buf.data= m_str.get_window(buf.size);
+
+  if (!buf.data)
+    return NO_RES;
+
+  return OK;
+}
+
+/**
+  Write block of data to stream.
+
+  The buffer containing data must be obtained from a previous @c get_buf() call.
+  After this call, bufer is returned to the buffer pool and can be reused for
+  other transfers.
+ */
+Block_writer::result_t
+Block_writer::write_buf(const Buffer &buf)
+{
+  stream_result::value  res= m_str.write_window(buf.size);
+  m_str.end_chunk();
+  return  res == stream_result::OK ? OK : ERROR;
+}
+
+/**
+  Return buffer to the buffer pool.
+
+  If a buffer obtained from @c get_buf() is not written to the stream, this
+  method can return it to the buffer pool so that it can be reused for other
+  transfers.
+ */
+Block_writer::result_t
+Block_writer::drop_buf(Buffer &buf)
+{
+  m_str.drop_window();
+  buf.data= NULL;
+  buf.size= 0;
+  return OK;
+}
+
+} // backup namespace
+
 
Thread
bk commit into 5.1 tree (rafal:1.2527)rsomla29 May