Below is the list of changes that have just been committed into a local
5.2 repository of rafal. When rafal does a push these changes will
be propagated to the main repository and, within 24 hours after the
push, to the public repository.
For information on how to access the public repository
see http://dev.mysql.com/doc/mysql/en/installing-source-tree.html
ChangeSet@stripped, 2007-06-25 09:33:52+02:00, rafal@quant.(none) +3 -0
WL#3568 (Online backup: Stream format and server file handling):
Definition of the interface used for writting/reading backup stream plus
basic implementation. Note that the implementation is in prototype stage
and has few known limitations which should be removed in the future:
- there is hard coded limit on the size of a single data chunk,
- data is read/written one chunk at a time regardless of the chunk size.
sql/backup/debug.h@stripped, 2007-06-25 09:33:48+02:00, rafal@quant.(none) +81 -0
Macros for debugging backup code.
sql/backup/debug.h@stripped, 2007-06-25 09:33:48+02:00, rafal@quant.(none) +0 -0
sql/backup/stream.cc@stripped, 2007-06-25 09:33:48+02:00, rafal@quant.(none) +325 -0
Implementation of backup stream classes.
sql/backup/stream.cc@stripped, 2007-06-25 09:33:48+02:00, rafal@quant.(none) +0 -0
sql/backup/stream.h@stripped, 2007-06-25 09:33:48+02:00, rafal@quant.(none) +645 -0
- templates for serialization of basic datatypes,
- definitions of backup stream classes.
sql/backup/stream.h@stripped, 2007-06-25 09:33:48+02:00, rafal@quant.(none) +0 -0
# This is a BitKeeper patch. What follows are the unified diffs for the
# set of deltas contained in the patch. The rest of the patch, the part
# that BitKeeper cares about, is below these diffs.
# User: rafal
# Host: quant.(none)
# Root: /ext/mysql/bk/backup/alpha
--- New file ---
+++ sql/backup/debug.h 07/06/25 09:33:48
#ifndef _BACKUP_DEBUG_H
#define _BACKUP_DEBUG_H
#define BACKUP_SYNC_TIMEOUT 300
/*
TODO
- decide how to configure DEBUG_BACKUP
*/
#ifndef DBUG_OFF
# define DBUG_BACKUP
#endif
#ifdef DBUG_BACKUP
/*
Macros for debugging error (or other) conditions. Usage:
TEST_ERROR_IF(<condition deciding if TEST_ERROR should be true>);
if (<other conditions> || TEST_ERROR)
{
<report error>
}
The additional TEST_ERROR condition will be set only if "backup_error_test"
error injection is set in the server.
Notes:
- Whenever TEST_ERROR is used in a condition, TEST_ERROR_IF() should
be called before - otherwise TEST_ERROR might be unintentionally TRUE.
- This mechanism is not thread safe.
*/
namespace backup {
extern bool test_error_flag;
}
#define TEST_ERROR backup::test_error_flag
// FIXME: DBUG_EXECUTE_IF below doesn't work
#define TEST_ERROR_IF(X) \
do { \
backup::test_error_flag= FALSE; \
DBUG_EXECUTE_IF("backup_error_test", backup::test_error_flag= (X);); \
} while(0)
/*
Macros for creating synchronization points in tests.
Usage
In the backup code:
BACKUP_SYNC("<synchronization point name>");
In a client:
SELECT get_lock("<synchronization point name>",<timeout>);
...
SELECT release_lock("<synchronization point name>");
If the lock is kept by a client, server code will wait on the corresponding
BACKUP_SYNC() until it is released.
Consider: set thd->proc_info when waiting on lock
*/
#define BACKUP_SYNC(S) \
do { \
DBUG_PRINT("backup",("== synchronization on '%s' ==",(S))); \
debug_sync_point((S),BACKUP_SYNC_TIMEOUT); \
} while (0)
#else
#define BACKUP_SYNC(S)
#define TEST_ERROR FALSE
#define TEST_ERROR_IF(X)
#endif
#endif
--- New file ---
+++ sql/backup/stream.cc 07/06/25 09:33:48
#include "../mysql_priv.h"
#include "stream.h"
/*
TODO
- blocking of OStream output when data window is allocated.
- use my_read instead of read - need to know how to detect EOF.
- remove fixed chunk size limit (backup::Window::buf_size)
- better file buffering (in case of small data chunks)
*/
namespace backup {
/************** Window *************************/
Window::Result Window::set_length(const size_t len)
{
DBUG_ASSERT(!m_blocked);
m_end= m_head+len;
if (m_end <= last_byte)
return stream_result::OK;
m_end= last_byte;
return out_of_bounds();
}
Window::Result Window::move(const off_t offset)
{
DBUG_ASSERT(!m_blocked);
m_head+= offset;
if (m_head > m_end)
m_end= m_head;
if (m_head <= last_byte)
return stream_result::OK;
m_head= m_end= last_byte;
return out_of_bounds();
}
/************** Stream *************************/
bool Stream::open()
{
close();
m_fd= my_open(m_path.c_ptr(),m_flags,MYF(0));
return m_fd >= 0;
}
void Stream::close()
{
if (m_fd >= 0)
{
my_close(m_fd,MYF(0));
m_fd= -1;
}
}
bool Stream::rewind()
{
return m_fd >= 0 && my_seek(m_fd,0,SEEK_SET,MYF(0)) == 0;
}
/************** OStream *************************/
/*
Implementation of data chunks.
Data is written to the file in form of data chunks. Each chunk is prefixed with its size stored
in 2 bytes (should it be increased to 4?).
OStream instance uses an output buffer of fixed size inherited from Window class. The size of
a chunk is limited by the size of this buffer as a whole chunk is stored inside the buffer
before writing to the file.
writing to the file happens when the current chunk is closed with <code>end_chunk()</code>
method. At the time of writing the output, buffer contents is as follows:
====================== <- m_buf
2 bytes for chunk size
====================== <- m_buf+2 (chunk data starts here)
data written to
the chunk
---------------------- <- m_head
current output window
====================== <- m_end (this is end of chunk data)
*/
byte* OStream::get_window(const size_t len)
{
if (m_blocked || m_end+len > last_byte)
return NULL;
m_head= m_end;
m_end+= len;
m_blocked= TRUE;
return m_head;
}
void OStream::drop_window()
{
if (m_blocked)
m_end= m_head;
m_blocked= FALSE;
}
OStream::Result
OStream::write_window(const size_t len)
{
if (m_blocked)
{
DBUG_ASSERT(m_head+len<=m_end);
m_head+=len;
m_end= m_head;
}
m_blocked= FALSE;
return stream_result::OK;
}
void OStream::close(bool destroy)
{
if (m_fd<0)
return;
end_chunk();
// write 0 at the end
last_byte=m_buf+2;
Window::reset();
write2int(0);
my_write(m_fd,(::byte*)m_buf,2,MYF(0));
Stream::close();
if (destroy)
delete this;
}
stream_result::value OStream::end_chunk()
{
if (m_blocked)
drop_window();
DBUG_ASSERT(m_end >= m_buf+2);
size_t len= m_end - m_buf - 2; // length of the chunk
if (len==0)
{
Window::reset(2);
return stream_result::OK;
}
// store length of chunk in front of the buffer
Window::reset();
write2int(len);
bytes+= len;
len+= 2; // now len is the number of bytes we want to write
uint res= my_write(m_fd,(::byte*)m_buf,len,MY_NABP);
Window::reset(2);
if (res)
return stream_result::ERROR;
return stream_result::OK;
}
/************** IStream *************************/
/*
Handling of stream data chunks.
Chunks are read into the input buffer inherited from <code>Window</code> class. It is assumed
that a whole chunk will always fit into the buffer (otherwise error is reported).
When reading a chunk of data, the size of the next chunk is also read-in in the same file access
and stored in the <code>next_chunk_len</code> member.
The input buffer has the following layout:
=================== <- m_buf (start of input buffer)
chunk data
------------------- <- m_head
current input
window
------------------- <- m_end
chunk data
=================== <- last_byte (end of chunk data)
size of next chunk
=================== <- last_byte+2
The first chunk of data is read into the input buffer when stream is opened. Next chunks are
read inside <code>next_chunk()</code> method.
*/
// PRE: there is at least one chunk in the stream.
bool IStream::rewind()
{
Stream::rewind();
Window::reset();
bytes= 0;
if (my_read(m_fd, (::byte*)m_buf, 2, MYF(0)) < 2)
return FALSE;
last_byte= m_head+2;
read2int(next_chunk_len);
Window::reset(); // ignore the 2 bytes containing chunk length
last_byte= m_buf;
return next_chunk() == stream_result::OK;
}
stream_result::value IStream::next_chunk()
{
bytes+= (last_byte-m_buf); // update statistics
last_byte= m_buf;
if (next_chunk_len == 0)
{
Window::reset();
return stream_result::EOS;
}
size_t len= next_chunk_len+2;
long int howmuch= 0; // POSIX ssize_t not defined on win platform :|
while (len > 0 && (howmuch= ::read(m_fd,m_buf,len)) > 0)
len-= howmuch;
if (howmuch<0) // error reading file
{
next_chunk_len= 0;
Window::reset();
return stream_result::ERROR;
}
if (len == 0)
{
// read length of next chunk (at the end of the buffer)
last_byte+= next_chunk_len+2;
Window::reset(next_chunk_len);
read2int(next_chunk_len);
last_byte-=2;
}
else
{
last_byte+= next_chunk_len+2-len;
next_chunk_len= 0;
}
Window::reset();
return howmuch==0 ? stream_result::EOS : stream_result::OK;
}
#ifdef DBUG_BACKUP
// Show data chunks in a backup stream;
void dump_stream(IStream &s)
{
stream_result::value res;
byte b;
DBUG_PRINT("stream",("=========="));
do {
uint chunk_size;
for( chunk_size=0; (res= s.readbyte(b)) == stream_result::OK ; ++chunk_size );
DBUG_PRINT("stream",(" chunk size= %u",chunk_size));
if( res == stream_result::EOC )
{
DBUG_PRINT("stream",("----------"));
res= s.next_chunk();
}
} while ( res == stream_result::OK );
if (res == stream_result::EOS)
DBUG_PRINT("stream",("=========="));
else
DBUG_PRINT("stream",("== ERROR: %d",(int)res));
}
#endif
} // backup namespace
--- New file ---
+++ sql/backup/stream.h 07/06/25 09:33:48
#ifndef _BACKUP_STREAM_H_
#define _BACKUP_STREAM_H_
#include <backup/api_types.h> // for Buffer definition
#include <backup/debug.h> // for definition of DBUG_BACKUP
/**
@file
Generic Stream Interface for serializing basic types (integers, strings etc).
*/
namespace util
{
typedef unsigned char byte;
struct stream_result
{
enum value { OK=0, NIL, ERROR };
};
/*
Stream window: an abstract object which can be used to access a stream
of bytes through a window in process address space. The methods give pointer
to the beginning of the window and its current size. There is a request for
enlarging the window. We can also move the window beginning to the right
(decreasing window size). If some bytes move outside the window, they can't
be accessed any more.
================================================> (stream of bytes)
|--------------------|
^
|
window header
1. Extend window:
|--------------------............|
^
|
window header
2. Move window header (shrinks window):
.....|---------------|
^
|
window header
Interface
---------
typename Result;
byte* head() const; // pointer to start of the window
byte* end() const; // pointer to the end of the window
size_t length() const; // current length of the window (in bytes)
Result set_length(const size_t size); // extend the window to have (at least) given length
Result move(const off_t offset); // move window header
//void init();
//void done();
operator int(const Result&); // interpret the result: is it error or other
common situation (end of data).
Note: window can be used for reading or writing.
*/
/**
Stream classes.
These classes provide interface for serializing basic data
(numbers, strings) using host independent format.
Interface is parametrised by a class implementing stream window.
Instance of the window class is used to read/write stream data.
*/
template<class SWin>
class IStream
{
SWin &m_win;
public:
typedef typename SWin::Result Result;
IStream(SWin &swin): m_win(swin)
{}
Result readbyte(byte &x);
Result read2int(uint &x);
Result read4int(ulong &x);
Result readint(ulong &x);
Result readint(uint &x)
{
ulong y;
Result res= readint(y);
x= y;
return res;
}
Result readstr(String &s);
};
template<class SWin>
class OStream
{
SWin &m_win;
public:
typedef typename SWin::Result Result;
OStream(SWin &swin): m_win(swin)
{}
Result writebyte(const byte x);
Result write2int(const int x);
Result write4int(const ulong x);
Result writeint(const ulong x);
Result writestr(const String &s);
Result writestr(const char *s)
{ return writestr(String(s,table_alias_charset)); }
Result writenil();
};
template<class SW>
inline
typename IStream<SW>::Result
IStream<SW>::readbyte(byte &x)
{
Result res;
if ((res= m_win.set_length(1)) != stream_result::OK)
return res;
x= *m_win.head();
return m_win.move(1);
}
template<class SW>
inline
typename OStream<SW>::Result
OStream<SW>::writebyte(const byte x)
{
Result res;
if ((res= m_win.set_length(1)) != stream_result::OK)
return res;
(*m_win.head())= x;
return m_win.move(1);
}
template<class SW>
inline
typename IStream<SW>::Result
IStream<SW>::read2int(uint &x)
{
Result res;
if ((res= m_win.set_length(2)) != stream_result::OK)
return res;
x= uint2korr(m_win.head());
return m_win.move(2);
}
template<class SW>
inline
typename OStream<SW>::Result
OStream<SW>::write2int(const int x)
{
Result res;
if ((res= m_win.set_length(2)) != stream_result::OK)
return res;
int2store(m_win.head(),x);
return m_win.move(2);
}
template<class SW>
inline
typename IStream<SW>::Result
IStream<SW>::read4int(ulong &x)
{
Result res;
if ((res= m_win.set_length(4)) != stream_result::OK)
return res;
x= uint4korr(m_win.head());
return m_win.move(4);
}
template<class SW>
inline
typename OStream<SW>::Result
OStream<SW>::write4int(const ulong x)
{
Result res;
if ((res= m_win.set_length(4)) != stream_result::OK)
return res;
int4store(m_win.head(),x);
return m_win.move(4);
}
// write/read number using variable-length encoding
template<class SW>
inline
typename IStream<SW>::Result
IStream<SW>::readint(ulong &x)
{
Result res;
if ((res= m_win.set_length(1)) != stream_result::OK)
return res;
x= *m_win.head();
m_win.move(1);
switch( x ) {
case 251:
return Result(stream_result::NIL);
case 252:
uint y;
res= read2int(y);
x= y;
return res;
case 253:
return read4int(x);
default:
return Result(stream_result::OK);
}
}
template<class SW>
inline
typename OStream<SW>::Result
OStream<SW>::writeint(const ulong x)
{
Result res;
if ((res= m_win.set_length(1)) != stream_result::OK)
return res;
if (x < 251)
return writebyte((byte)x);
if (x < (1UL<<16))
{
res= writebyte(252);
return res == stream_result::OK ? write2int(x) : res;
}
res= writebyte(253);
return res == stream_result::OK ? write4int(x) : res;
}
// Write/read string using "length coded string" format
template<class SW>
inline
typename IStream<SW>::Result
IStream<SW>::readstr(String &s)
{
Result res;
uint len;
if ((res= readint(len)) != stream_result::OK)
return res;
if ((res= m_win.set_length(len)) != stream_result::OK)
return res;
s.free();
s.copy((const char*)m_win.head(), len, &::my_charset_bin);
return m_win.move(len);
}
template<class SW>
inline
typename OStream<SW>::Result
OStream<SW>::writestr(const String &s)
{
Result res;
uint len= s.length();
if ((res= writeint(len)) != stream_result::OK)
return res;
if ((res= m_win.set_length(len)) != stream_result::OK)
return res;
memcpy(m_win.head(), s.ptr(), len);
return m_win.move(len);
}
template<class SW>
inline
typename OStream<SW>::Result
OStream<SW>::writenil()
{
return writebyte(251);
}
} // util namespace
/************************************************************
Backup Stream Interface
The stream is organized as a sequence of chunks each of which
can have different length. When stream is read chunk boundaries
are detected. If this happens, next_chunk() member must be called
in order to access data in next chunk. When writing to a stream,
data is appended to the current chunk. End_chunk() member closes
the current chunk and starts a new one.
************************************************************/
namespace backup {
struct stream_result
{
enum {
OK= util::stream_result::OK,
NIL= util::stream_result::NIL,
ERROR= util::stream_result::ERROR,
EOC, // end of chunk
EOS // end of stream
};
class value
{
int m_val;
public:
value(): m_val(OK) {}
value(const int val): m_val(val) {}
operator int() { return m_val; }
operator result_t()
{
return m_val == stream_result::ERROR ? backup::ERROR :
m_val == stream_result::EOC || m_val == stream_result::EOS ?
backup::DONE : backup::OK;
}
};
};
/**
Implementation of stream window interface to be used by util::{I,O}Stream
templates.
It provides a window inside a static data buffer <code>m_buf</code> of
fixed size <code>buf_size</code>. The window starts at <code>m_head</code> and ends
at <code>m_end</code>. Window can be moved and resized using <code>move()</code>
and <code>set_length()</code> methods. These methods take into account
size of <code>m_buf</code> and report overflows accordingly.
The window can be used by the util::{I,O}Stream templates to read/write
basic data types in a uniform, host independent way.
*/
class Window
{
public:
typedef stream_result::value Result;
byte* head()
{ return m_head; }
byte* end()
{ return m_end; }
size_t length()
{ return m_end - m_head; }
Result set_length(const size_t); ///< resize window to the given size
Result move(const off_t); ///< move beginning of the window (shrinks size)
virtual ~Window() {}
protected:
Window():
last_byte(m_buf+buf_size), m_head(m_buf), m_end(m_buf), m_blocked(FALSE)
{}
static const size_t buf_size= 4*1024; ///< data buffer size
byte m_buf[buf_size];
byte *last_byte; ///< points at the byte after the last byte of m_buf
byte *m_head; ///< points at first byte of the current window
byte *m_end; ///< points at the byte after the last byte of the current window
bool m_blocked; ///< If true, set_length() and move() are blocked (return ERROR).
/// Create empty window at offset in m_buf
void reset(off_t offset=0)
{ m_head= m_end= m_buf+offset; }
/// Define the result which should be returned in case of buffer overflow.
virtual Result out_of_bounds() const
{ return stream_result::ERROR; }
friend class util::OStream<Window>;
friend class util::IStream<Window>;
};
} // backup namespace
template class util::IStream< backup::Window >;
template class util::OStream< backup::Window >;
namespace backup {
/****************************************************
Definitions of input and output backup streams
****************************************************/
class Stream
{
public:
bool open();
void close();
bool rewind();
/// Check if stream is opened
operator bool()
{ return m_fd>0; }
~Stream()
{ close(); }
protected:
Stream(const String &name, int flags):
m_fd(-1), m_path(name), m_flags(flags) {}
int m_fd;
String m_path;
int m_flags; ///< flags used when opening the file
};
/**
Implements backup stream which writes data to a file.
This class inherits from util::OStream which defines methods for serialization of
basic datatypes (strings and integer). It also implements the concept of data chunks. Data
is stored in chunks - writing to an IStream appends data to the current chunk. A chunk is
closed and a new one is started with <code>end_chunk()</code> method.
A client of this class can ask an instance for an output buffer with <code>get_window()</code>
method. After filling the buffer its contents can be written to the stream. This is to avoid
double buffering and unnecessary copying of data. However, once an output buffer is allocated,
all output to the stream is blocked until the buffer is written with <code>write_window()</code>
or dropped with <code>drop_window()</code>.
*/
class OStream:
Window,
public Stream,
public util::OStream< Window >
{
typedef util::OStream< Window > Base3;
public:
typedef stream_result::value Result; // disambiguate
size_t bytes; ///< number of bytes written
bool open()
{
close(FALSE);
return Stream::open() && rewind();
}
void close(bool destroy=TRUE);
bool rewind()
{
Stream::rewind();
Window::reset(2);
bytes= 0;
return TRUE;
}
Result end_chunk();
/**
Ask stream for output buffer of given size.
If buffer is allocated, stream is blocked for other operations until
either write_window() or drop_window() is called.
@note Writing to stream using output buffer doesn't create chunks
boundaries. Explicit call to end_chunk() is needed.
*/
byte* get_window(const size_t);
Result write_window(const size_t);
void drop_window();
stream_result::value operator <<(const String &str)
{
return writestr(str);
}
OStream(const String &name):
Stream(name,O_WRONLY|O_CREAT|O_TRUNC),
Base3(static_cast<Window&>(*this))
{}
};
/**
Implements backup stream reading data from a file.
This class inherits from util::IStream which defines methods for serialization of
basic datatypes (strings and integer). It also handles chunk boundaries as created by
the OStream class. When reading data at the end of a data chunk, <code>stream_result::EOC</code>
is returned. To access data in next chunk, <code>next_chunk()</code> must be called.
*/
class IStream:
Window,
public Stream,
public util::IStream< Window >
{
typedef util::IStream< Window > Base3;
public:
typedef stream_result::value Result; // disambiguate
size_t bytes; ///< number of bytes read
bool open()
{
close(FALSE);
return Stream::open() && rewind();
}
void close(bool destroy=TRUE)
{
Stream::close();
if (destroy)
delete this;
}
bool rewind();
Result next_chunk();
stream_result::value operator >>(String &str)
{
return readstr(str);
}
/**
Return current chunk.
Will return the same chunk until next_chunk() is called.
*/
stream_result::value operator >>(Buffer &buf)
{
m_end= last_byte;
if (last_byte == m_buf) // empty chunk means end of stream
return stream_result::EOS;
buf.data= m_buf;
buf.size= last_byte - m_buf;
return stream_result::OK;
}
IStream(const String &name):
Stream(name,O_RDONLY),
Base3(static_cast<Window&>(*this)),
bytes(0)
{ last_byte= m_buf; }
private:
/// Length of next chunk of data or 0 if there are no more.
uint next_chunk_len; // we use 2 bytes to store chunk len
Result out_of_bounds() const
{ return next_chunk_len > 0 ? stream_result::EOC : stream_result::EOS; }
friend class stream_instances;
};
#ifdef DBUG_BACKUP
// Function for debugging backup stream implementation.
void dump_stream(IStream &);
#endif
} // backup namespace
#endif /*BACKUP_STREAM_H_*/
| Thread |
|---|
| • bk commit into 5.2 tree (rafal:1.2522) | rsomla | 25 Jun |