Below is the list of changes that have just been committed into a local
6.0 repository of rafal. When rafal does a push these changes will
be propagated to the main repository and, within 24 hours after the
push, to the public repository.
For information on how to access the public repository
see http://dev.mysql.com/doc/mysql/en/installing-source-tree.html
ChangeSet@stripped, 2007-11-27 11:03:52+01:00, rafal@quant.(none) +4 -0
WL#4063 (backup stream library):
This patch introduces changes and bug fixes necessary for integrating the backup stream library
with the backup server code.
sql/backup/stream_v1.c@stripped, 2007-11-27 11:03:45+01:00, rafal@quant.(none) +115 -96
- changed the way table position is stored in the catalogue: as snapshot number and position within snapshot's
table list;
- added checks for return values from the bcat_* service functions;
- since item types are stored in 2 bytes, an item list separator consists of two 0x00 bytes now - updated docs
and code;
- when metadata item is read from a stream, the code doesn't try to locate it in the catalogue - the upper
layer can do it on its own; this removes a need for bcat_get_*() service functions;
sql/backup/stream_v1.h@stripped, 2007-11-27 11:03:45+01:00, rafal@quant.(none) +96 -88
- changed signature of bstream_open_* functions
- added mode member to backup_stream and different states
sql/backup/stream_v1_services.h@stripped, 2007-11-27 11:03:45+01:00, rafal@quant.(none) +72 -94
Removed bcat_get_*() functions which are not needed anymore.
sql/backup/stream_v1_transport.c@stripped, 2007-11-27 11:03:45+01:00, rafal@quant.(none) +402 -92
- added code which stores block size in the stream and reads it from there, as required by the stream format;
- changed code to take into account the fact that few first bytes of a block can contain block size and should
be skipped;
- added offset parameter to bstream_open_*() functions;
- replace WRITING and READING states by mode member of backup_stream structure - mode is independent from state;
- extended some comments and documentation;
- fixed several bugs in the block writing/reading logic.
diff -Nrup a/sql/backup/stream_v1.c b/sql/backup/stream_v1.c
--- a/sql/backup/stream_v1.c 2007-11-16 15:10:33 +01:00
+++ b/sql/backup/stream_v1.c 2007-11-27 11:03:45 +01:00
@@ -578,6 +578,9 @@ int bstream_wr_catalogue(backup_stream *
it= bcat_iterator_get(cat,BSTREAM_IT_CHARSET);
+ if (!it)
+ return BSTREAM_ERROR;
+
while ((name= (blob*) bcat_iterator_next(cat,it)))
{
CHECK_WR_RES(bstream_wr_string(s,*name));
@@ -591,6 +594,9 @@ int bstream_wr_catalogue(backup_stream *
it= bcat_iterator_get(cat,BSTREAM_IT_USER);
+ if (!it)
+ return BSTREAM_ERROR;
+
while ((name= (blob*) bcat_iterator_next(cat,it)))
{
CHECK_WR_RES(bstream_wr_string(s,*name));
@@ -604,6 +610,9 @@ int bstream_wr_catalogue(backup_stream *
it= bcat_iterator_get(cat,BSTREAM_IT_DB);
+ if (!it)
+ return BSTREAM_ERROR;
+
while ((db_info= (struct st_bstream_db_info*) bcat_iterator_next(cat,it)))
{
CHECK_WR_RES(bstream_wr_string(s,db_info->base.name));
@@ -616,6 +625,9 @@ int bstream_wr_catalogue(backup_stream *
it= bcat_iterator_get(cat,BSTREAM_IT_DB);
+ if (!it)
+ return BSTREAM_ERROR;
+
while ((db_info= (struct st_bstream_db_info*) bcat_iterator_next(cat,it)))
{
CHECK_WR_RES(bstream_end_chunk(s));
@@ -651,7 +663,9 @@ int bstream_rd_catalogue(backup_stream *
struct st_bstream_item_info item;
struct st_bstream_db_info *db_info;
- bcat_reset(cat);
+ ret= bcat_reset(cat);
+ if (ret != BSTREAM_OK)
+ return BSTREAM_ERROR;
/* charset list */
@@ -666,7 +680,9 @@ int bstream_rd_catalogue(backup_stream *
if (item.name.begin == NULL)
break;
- bcat_add_item(cat,&item);
+ if (bcat_add_item(cat,&item) != BSTREAM_OK)
+ return BSTREAM_ERROR;
+
item.pos++;
} while (ret == BSTREAM_OK);
@@ -683,7 +699,9 @@ int bstream_rd_catalogue(backup_stream *
if (item.name.begin == NULL)
break;
- bcat_add_item(cat,&item);
+ if (bcat_add_item(cat,&item) != BSTREAM_OK)
+ return BSTREAM_ERROR;
+
item.pos++;
} while (ret == BSTREAM_OK);
@@ -705,7 +723,9 @@ int bstream_rd_catalogue(backup_stream *
CHECK_RD_OK(bstream_rd_string(s,&item.name));
- bcat_add_item(cat,&item);
+ if (bcat_add_item(cat,&item) != BSTREAM_OK)
+ return BSTREAM_ERROR;
+
item.pos++;
CHECK_RD_RES(bstream_rd_byte(s,&flags));
@@ -726,6 +746,9 @@ int bstream_rd_catalogue(backup_stream *
iter= bcat_iterator_get(cat,BSTREAM_IT_DB);
+ if (!iter)
+ return BSTREAM_ERROR;
+
while ((db_info= (struct st_bstream_db_info*) bcat_iterator_next(cat,iter)))
{
if (ret != BSTREAM_EOC)
@@ -736,7 +759,9 @@ int bstream_rd_catalogue(backup_stream *
}
bcat_iterator_free(cat,iter);
- bcat_close(cat);
+
+ if (bcat_close(cat) != BSTREAM_OK)
+ return BSTREAM_ERROR;
rd_error:
@@ -755,7 +780,7 @@ int bstream_rd_catalogue(backup_stream *
- 5 = table,
- 6 = view.
- Value 0 doesn't encode a valid item type and is used for other purposes.
+ Value 0 doesn't encode a valid item type and is used as item list separator.
*/
/**
@@ -843,10 +868,10 @@ int bstream_rd_item_type(backup_stream *
[optional extra data]= [ data_len:1 ! extra data:(data_len) ]
@endverbatim
- If database is empty, it stores single 0x00 byte.
+ If database is empty, it stores two 0x00 bytes.
@verbatim
- [db catalogue (empty)] = [ 0x00 ]
+ [db catalogue (empty)] = [ 0x00 0x00 ]
@endverbatim
*/
@@ -862,6 +887,9 @@ int bstream_wr_db_catalogue(backup_strea
iter= bcat_db_iterator_get(cat, db_info);
+ if (!iter)
+ return BSTREAM_ERROR;
+
while ((item= bcat_db_iterator_next(cat, db_info, iter)))
{
catalogue_empty= FALSE;
@@ -873,13 +901,14 @@ int bstream_wr_db_catalogue(backup_strea
{
CHECK_WR_RES(bstream_wr_byte(s,0x00)); /* flags: we don't use extra data */
CHECK_WR_RES(bstream_wr_byte(s,((struct st_bstream_table_info*)item)->snap_no));
+ CHECK_WR_RES(bstream_wr_num(s,item->base.pos));
}
}
bcat_db_iterator_free(cat, db_info, iter);
if (catalogue_empty)
- CHECK_WR_RES(bstream_wr_byte(s,0x00));
+ CHECK_WR_RES(bstream_wr_item_type(s,BSTREAM_IT_LAST));
wr_error:
@@ -929,7 +958,8 @@ int bstream_rd_db_catalogue(backup_strea
return BSTREAM_ERROR;
CHECK_RD_OK(bstream_rd_byte(s,&flags)); /* flags are ignored currently */
- CHECK_RD_RES(bstream_rd_byte(s,&ti.snap_no));
+ CHECK_RD_OK(bstream_rd_byte(s,&ti.snap_no));
+ CHECK_RD_RES(bstream_rd_num(s,&ti.base.base.pos));
}
if (bcat_add_item(cat, &ti.base.base) != BSTREAM_OK)
@@ -978,7 +1008,7 @@ int bstream_rd_db_catalogue(backup_strea
all per-table items.
@verbatim
- [other items]= [ per-db items ! 0x00 ! per-table items ]
+ [other items]= [ per-db items ! 0x00 0x00 ! per-table items ]
@endverbatim
The per-database items other than tables can not be grouped by database
@@ -993,12 +1023,12 @@ int bstream_rd_db_catalogue(backup_strea
@endverbatim
Meta data item lists can be empty or consist of several item entries. Empty
- item list consist of a single byte 0x00 which can not start any valid
+ item list consist of two 0x00 bytes which can not start any valid
[item entry].
@verbatim
[item list] = [ item entry ! ... ! item entry ]
- [item list (empty)]= [ 0x00 ]
+ [item list (empty)]= [ 0x00 0x00 ]
@endverbatim
*/
@@ -1017,17 +1047,16 @@ enum enum_bstream_meta_item_kind {
int bstream_wr_meta_item(backup_stream*, enum enum_bstream_meta_item_kind,
unsigned short int, struct st_bstream_item_info*);
-int bstream_rd_meta_item(backup_stream*, struct st_bstream_image_header*,
- struct st_bstream_db_info*,
- enum enum_bstream_meta_item_kind, unsigned short int*,
- struct st_bstream_item_info**);
+int bstream_rd_meta_item(backup_stream *s,
+ enum enum_bstream_meta_item_kind kind,
+ unsigned short int *flags,
+ struct st_bstream_item_info **item);
int bstream_wr_item_def(backup_stream*, struct st_bstream_image_header*,
enum enum_bstream_meta_item_kind,
struct st_bstream_item_info*);
int read_and_create_items(backup_stream *s, struct st_bstream_image_header *cat,
- struct st_bstream_db_info *db,
enum enum_bstream_meta_item_kind kind);
/** Write meta-data section of a backup image */
@@ -1044,6 +1073,9 @@ int bstream_wr_meta_data(backup_stream *
iter= bcat_iterator_get(cat,BSTREAM_IT_GLOBAL);
+ if (!iter)
+ return BSTREAM_ERROR;
+
while ((item= bcat_iterator_next(cat,iter)))
{
item_written= TRUE;
@@ -1052,7 +1084,7 @@ int bstream_wr_meta_data(backup_stream *
/* mark empty list if no items were written */
if (!item_written)
- bstream_wr_byte(s,0x00);
+ CHECK_WR_RES(bstream_wr_item_type(s,BSTREAM_IT_LAST));
bcat_iterator_free(cat,iter);
@@ -1060,6 +1092,9 @@ int bstream_wr_meta_data(backup_stream *
iter= bcat_iterator_get(cat,BSTREAM_IT_DB);
+ if (!iter)
+ return BSTREAM_ERROR;
+
while ((db_info= (struct st_bstream_db_info*)bcat_iterator_next(cat,iter)))
{
has_db= TRUE;
@@ -1067,6 +1102,9 @@ int bstream_wr_meta_data(backup_stream *
titer= bcat_db_iterator_get(cat,db_info);
+ if (!titer)
+ return BSTREAM_ERROR;
+
item_written= FALSE;
while ((item= (struct st_bstream_item_info*)
bcat_db_iterator_next(cat,db_info,titer)))
@@ -1080,7 +1118,7 @@ int bstream_wr_meta_data(backup_stream *
/* mark empty list */
if (!item_written)
- bstream_wr_byte(s,0x00);
+ CHECK_WR_RES(bstream_wr_item_type(s,BSTREAM_IT_LAST));
bcat_db_iterator_free(cat,db_info,titer);
}
@@ -1097,6 +1135,9 @@ int bstream_wr_meta_data(backup_stream *
iter= bcat_iterator_get(cat,BSTREAM_IT_PERDB);
+ if (!iter)
+ return BSTREAM_ERROR;
+
while ((item= bcat_iterator_next(cat,iter)))
{
if (item->type == BSTREAM_IT_TABLE)
@@ -1109,10 +1150,13 @@ int bstream_wr_meta_data(backup_stream *
/* per-table items */
- CHECK_WR_RES(bstream_wr_byte(s,0x00));
+ CHECK_WR_RES(bstream_wr_item_type(s,BSTREAM_IT_LAST));
iter= bcat_iterator_get(cat,BSTREAM_IT_PERTABLE);
+ if (!iter)
+ return BSTREAM_ERROR;
+
while ((item= bcat_iterator_next(cat,iter)))
{
if (item->type == BSTREAM_IT_TABLE)
@@ -1147,12 +1191,15 @@ int bstream_rd_meta_data(backup_stream *
/* global items */
- CHECK_RD_RES(read_and_create_items(s,cat,NULL,GLOBAL_ITEM));
+ CHECK_RD_RES(read_and_create_items(s,cat,GLOBAL_ITEM));
/* tables */
iter= bcat_iterator_get(cat,BSTREAM_IT_DB);
+ if (!iter)
+ return BSTREAM_ERROR;
+
while ((db_info= (struct st_bstream_db_info*)bcat_iterator_next(cat,iter)))
{
has_db= TRUE;
@@ -1161,7 +1208,7 @@ int bstream_rd_meta_data(backup_stream *
return BSTREAM_ERROR;
CHECK_RD_OK(bstream_next_chunk(s));
- CHECK_RD_RES(read_and_create_items(s,cat,db_info,TABLE_ITEM));
+ CHECK_RD_RES(read_and_create_items(s,cat,TABLE_ITEM));
}
bcat_iterator_free(cat,iter);
@@ -1177,7 +1224,7 @@ int bstream_rd_meta_data(backup_stream *
return BSTREAM_ERROR;
CHECK_RD_OK(bstream_next_chunk(s));
- CHECK_RD_RES(read_and_create_items(s,cat,NULL,PER_DB_ITEM));
+ CHECK_RD_RES(read_and_create_items(s,cat,PER_DB_ITEM));
/*
If we hit end of chunk/stream, there is nothing more to read
@@ -1189,7 +1236,7 @@ int bstream_rd_meta_data(backup_stream *
/* per-table items */
- CHECK_RD_RES(read_and_create_items(s,cat,NULL,PER_TABLE_ITEM));
+ CHECK_RD_RES(read_and_create_items(s,cat,PER_TABLE_ITEM));
rd_error:
@@ -1202,8 +1249,8 @@ int bstream_rd_meta_data(backup_stream *
@subsection item_entry Single item entry
- If list starts with byte different than 0x00, then it is a sequence of
- meta data item entries, each having the following format:
+ Item list is a sequence of meta data item entries, each having the
+ following format:
@verbatim
[item entry]= [ type:2 ! flags:1 ! position in the catalogue !
@@ -1223,13 +1270,13 @@ int bstream_rd_meta_data(backup_stream *
@verbatim
[item position (global)]= [db no]
- [item position (table)]= [pos in db item list]
+ [item position (table)]= [ snap no ! pos in snapshot's table list ]
[item position (other per-db item)]= [ pos in db item list ! db no ]
[item position (per-table item)] = [ pos in table's item list ! db no ! table pos ]
@endverbatim
- Note that for tables, the database to which it belongs is implicit, as tables
- are grouped by database.
+ Note that table is identified by its position inside the snapshot to which it
+ belongs.
@verbatim
[optional extra data]= [ data_len:2 ! extra data:(data_len) ]
@@ -1258,6 +1305,12 @@ int bstream_wr_meta_item(backup_stream *
CHECK_WR_RES(bstream_wr_num(s,item->pos));
+ if (kind == TABLE_ITEM)
+ {
+ CHECK_WR_RES(bstream_wr_byte(s,((struct st_bstream_table_info*)item)->snap_no));
+ return ret;
+ }
+
if (kind == PER_TABLE_ITEM || kind == PER_DB_ITEM)
CHECK_WR_RES(bstream_wr_num(s,((struct st_bstream_dbitem_info*)item)->db->base.pos));
@@ -1294,30 +1347,28 @@ int bstream_wr_meta_item(backup_stream *
are looking at an empty item list.
*/
int bstream_rd_meta_item(backup_stream *s,
- struct st_bstream_image_header *cat,
- struct st_bstream_db_info *db,
enum enum_bstream_meta_item_kind kind,
unsigned short int *flags,
struct st_bstream_item_info **item)
{
- enum enum_bstream_item_type type;
- unsigned long int pos, pos1;
- struct st_bstream_table_info *table= NULL;
+ static struct st_bstream_db_info db;
+ static struct st_bstream_table_info table;
static union
{
+ struct st_bstream_item_info any;
+ struct st_bstream_db_info db;
+ struct st_bstream_table_info table;
struct st_bstream_dbitem_info per_db;
struct st_bstream_titem_info per_table;
- } privilege;
+ } item_buf;
int ret= BSTREAM_OK;
- ASSERT( db != NULL || kind != TABLE_ITEM );
-
- CHECK_RD_RES(bstream_rd_item_type(s,&type));
+ CHECK_RD_RES(bstream_rd_item_type(s,&item_buf.any.type));
/* type == BSTREAM_IT_LAST means that we hit a no-item marker (0x00) */
- if (type == BSTREAM_IT_LAST)
+ if (item_buf.any.type == BSTREAM_IT_LAST)
{
*item= NULL;
return ret;
@@ -1327,26 +1378,34 @@ int bstream_rd_meta_item(backup_stream *
return BSTREAM_ERROR;
ASSERT(item);
+ *item= &item_buf.any;
CHECK_RD_OK(bstream_rd_byte(s,flags));
/* read item's position */
- CHECK_RD_RES(bstream_rd_num(s,&pos));
+ CHECK_RD_RES(bstream_rd_num(s,&item_buf.any.pos));
+
+ if (kind == TABLE_ITEM)
+ {
+ if (ret != BSTREAM_OK)
+ return BSTREAM_ERROR;
+
+ CHECK_RD_RES(bstream_rd_byte(s,&item_buf.table.snap_no));
+ return ret;
+ }
/* read db pos if present */
- if (kind == PER_TABLE_ITEM || (kind == PER_DB_ITEM && db == NULL))
+ if (kind == PER_TABLE_ITEM || kind == PER_DB_ITEM)
{
if (ret != BSTREAM_OK)
return BSTREAM_ERROR;
- CHECK_RD_RES(bstream_rd_num(s,&pos1));
-
- db= (struct st_bstream_db_info*)bcat_get_item(cat,pos1);
+ db.base.type= BSTREAM_IT_DB;
+ CHECK_RD_RES(bstream_rd_num(s,&db.base.pos));
- ASSERT(db != NULL);
- ASSERT(db->base.type == BSTREAM_IT_DB);
+ item_buf.per_db.db= &db;
}
/* read table pos if present */
@@ -1356,54 +1415,13 @@ int bstream_rd_meta_item(backup_stream *
if (ret != BSTREAM_OK)
return BSTREAM_ERROR;
- ASSERT(db != NULL);
+ table.base.base.type= BSTREAM_IT_TABLE;
+ table.base.db= &db;
+ CHECK_RD_RES(bstream_rd_num(s,&table.base.base.pos));
- CHECK_RD_RES(bstream_rd_num(s,&pos1));
-
- table= (struct st_bstream_table_info*)bcat_get_db_item(cat,db,pos1);
-
- ASSERT(table != NULL);
- ASSERT(table->base.base.type == BSTREAM_IT_TABLE);
+ item_buf.per_table.table= &table;
}
- /*
- special case: privileges are not stored in the catalogue and thus we
- store privilege description in a static structure and return pointer
- to it. Warning: not thread safe!
- */
- if (type == BSTREAM_IT_PRIVILEGE)
- {
- if (table)
- {
- *item= &privilege.per_table.base;
- privilege.per_table.table= table;
- }
- else
- {
- *item= &privilege.per_db.base;
- privilege.per_db.db= db;
- }
-
- (*item)->type= BSTREAM_IT_PRIVILEGE;
- (*item)->pos= pos;
- return ret;
- }
-
- /* locate item in the catalogue */
-
- if (table)
- *item= (struct st_bstream_item_info*)bcat_get_table_item(cat,table,pos);
- else if (db)
- *item= (struct st_bstream_item_info*)bcat_get_db_item(cat,db,pos);
- else
- *item= bcat_get_item(cat,pos);
-
- /* signal error if we couldn't locate the item */
- if (*item == NULL)
- return BSTREAM_ERROR;
-
- (*item)->type= type;
-
rd_error:
return ret;
@@ -1461,7 +1479,6 @@ int bstream_wr_item_def(backup_stream *s
stream has been reached
*/
int read_and_create_items(backup_stream *s, struct st_bstream_image_header *cat,
- struct st_bstream_db_info *db,
enum enum_bstream_meta_item_kind kind)
{
unsigned short int flags;
@@ -1471,7 +1488,7 @@ int read_and_create_items(backup_stream
do {
- CHECK_RD_RES(bstream_rd_meta_item(s,cat,db,kind,&flags,&item));
+ CHECK_RD_RES(bstream_rd_meta_item(s,kind,&flags,&item));
/* if 0x00 marker was read, item == NULL */
if (item == NULL)
@@ -1494,7 +1511,8 @@ int read_and_create_items(backup_stream
CHECK_RD_RES(bstream_rd_string(s,&query));
}
- bcat_create_item(cat,item,query,data);
+ if (bcat_create_item(cat,item,query,data) != BSTREAM_OK)
+ return BSTREAM_ERROR;
bstream_free(query.begin);
bstream_free(data.begin);
@@ -1652,7 +1670,8 @@ int bstream_rd_data_chunk(backup_stream
return BSTREAM_ERROR;
/* copy data from old buffer to the new one */
- memmove(new_buf, buf->begin, buf->end - buf->begin);
+ if (buf->begin && buf->end > buf->begin)
+ memmove(new_buf, buf->begin, buf->end - buf->begin);
bstream_free(buf->begin);
buf->begin= new_buf;
diff -Nrup a/sql/backup/stream_v1.h b/sql/backup/stream_v1.h
--- a/sql/backup/stream_v1.h 2007-11-14 19:48:06 +01:00
+++ b/sql/backup/stream_v1.h 2007-11-27 11:03:45 +01:00
@@ -1,7 +1,7 @@
#ifndef BACKUP_STREAM_V1_
#define BACKUP_STREAM_V1_
-/**
+/**
@file
@brief
@@ -9,25 +9,25 @@
This file declares functions and data types used to read and write backup
stream using version 1 of backup stream format.
-*/
+*/
/*********************************************************************
- *
+ *
* BASIC TYPES
- *
+ *
*********************************************************************/
-
+
typedef unsigned char bstream_byte;
-
+
/**
Describes continuous area of memory.
-
+
The @c begin member points at the first byte in the area, @c at one byte
- after the last byte of the area. Thus, blob @c b contains exactly
- <code>b.end - b.begin</code> bytes and is empty if
- <code>b.begin == b.end</code>. A null blob is a blob @c b with
+ after the last byte of the area. Thus, blob @c b contains exactly
+ <code>b.end - b.begin</code> bytes and is empty if
+ <code>b.begin == b.end</code>. A null blob is a blob @c b with
<code>b.begin == NULL</code>.
-*/
+*/
struct st_blob
{
bstream_byte *begin; /**< first byte of the blob */
@@ -38,10 +38,10 @@ typedef struct st_blob bstream_blob;
/**
Stores time point with one second accuracy.
-
+
This structure is similar to the POSIX <code>struct tm</code>. We define it
explicitly to make this header self-contained.
-*/
+*/
struct st_bstream_time
{
unsigned short int sec; /**< seconds [0,61] */
@@ -56,42 +56,42 @@ typedef struct st_bstream_time bstream_t
/**
Describes position of an event in MySQL server's binary log.
-
+
The event is identified by the name of the file in which it is stored and
the position within that file.
-*/
+*/
struct st_bstream_binlog_pos
{
char *file; /**< binlog file storing the event */
- unsigned long int pos; /**< position (offset) within the file */
+ unsigned long int pos; /**< position (offset) within the file */
};
/* struct st_backup_stream is defined below */
typedef struct st_backup_stream backup_stream;
/** Codes returned by backup stream functions */
-enum enum_bstream_ret_codes {
- BSTREAM_OK=0, /**< Success */
+enum enum_bstream_ret_codes {
+ BSTREAM_OK=0, /**< Success */
BSTREAM_EOC, /**< End of chunk detected */
BSTREAM_EOS, /**< End of stream detected */
BSTREAM_ERROR /**< Error */
};
/*********************************************************************
- *
+ *
* TYPES FOR IMAGE HEADER
- *
+ *
*********************************************************************/
/**
Describes version of MySQL server.
-
+
For example, if server has version "5.2.32-online-backup" then:
major = 5
minor = 2
release = 32
extra = "5.2.32-online-backup"
-*/
+*/
struct st_server_version
{
unsigned short int major;
@@ -102,7 +102,7 @@ struct st_server_version
/**
Describes version of storage engine.
-*/
+*/
struct st_bstream_engine_info
{
bstream_blob name; /**< name of the storage engine */
@@ -111,8 +111,8 @@ struct st_bstream_engine_info
};
/** Types of table data snapshots. */
-enum enum_bstream_snapshot_type {
- BI_NATIVE, /**< created by native backup driver of a storage engine */
+enum enum_bstream_snapshot_type {
+ BI_NATIVE, /**< created by native backup driver of a storage engine */
BI_DEFAULT, /**< created by built-in blocking backup driver */
BI_CS /**< created by built-in driver using consistent read transaction */
};
@@ -125,43 +125,43 @@ struct st_bstream_snapshot_info
unsigned int options; /**< snapshot options (not used currently) */
unsigned long int table_count; /**< number of tables in the snapshot */
/**
- In case of native snapshot, information about storage engine
- which created it
+ In case of native snapshot, information about storage engine
+ which created it
*/
- struct st_bstream_engine_info engine;
+ struct st_bstream_engine_info engine;
};
-/**
+/**
Extension of st_bstream_snapshot_info describing snapshot created by a native
backup driver.
-*/
+*/
struct st_native_snapshot_info
{
- struct st_bstream_snapshot_info base; /**< standard snapshot data */
+ struct st_bstream_snapshot_info base; /**< standard snapshot data */
};
-/** Information about backup image. */
+/** Information about backup image. */
struct st_bstream_image_header
{
unsigned int version; /**< image's format version number */
- /** version of the server which created the image */
- struct st_server_version server_version;
+ /** version of the server which created the image */
+ struct st_server_version server_version;
unsigned int flags; /**< image options */
bstream_time_t start_time; /**< time when backup operation started */
bstream_time_t end_time; /**< time when backup operation completed */
- bstream_time_t vp_time; /**< time of the image's validity point */
-
+ bstream_time_t vp_time; /**< time of the image's validity point */
+
/*
- If server which created backup image had binary log enabled, the image
+ If server which created backup image had binary log enabled, the image
contains information about the position inside the log corresponding to
the validity point time.
- */
-
- /** position of the last binlog entry at the VP time */
+ */
+
+ /** position of the last binlog entry at the VP time */
struct st_bstream_binlog_pos binlog_pos;
- /** start of the last binlog event group at the VP time */
+ /** start of the last binlog event group at the VP time */
struct st_bstream_binlog_pos binlog_group;
-
+
/** number of table data snapshots in the image */
unsigned short int snap_count;
/** descriptions of table data snapshots */
@@ -172,40 +172,40 @@ struct st_bstream_image_header
/**
Position of the summary block.
-
+
If this flag is set, the block containing summary info (available at the end
of backup process) is stored in the image's preamble. Otherwise, this block
is appended at the end of the image.
- */
+ */
#define BSTREAM_FLAG_INLINE_SUMMARY (1U<<0)
/**
Byte order of the server which created backup image.
-
+
If set, informs that backup image was created on big-endian server. This might
be useful to detect problems, if backup engines are not endian-agnostic.
-*/
+*/
#define BSTREAM_FLAG_BIG_ENDIAN (1U<<1)
/**
Informs if image stores binlog position.
-
+
If this flag is not set, the @c binlog_pos and @c binlog_group entries in
the image should be ignored.
- */
+ */
#define BSTREAM_FLAG_BINLOG (1U<<2)
/*********************************************************************
- *
+ *
* TYPES FOR DESCRIBING BACKUP ITEMS
- *
+ *
*********************************************************************/
-/**
+/**
Types of items stored in a backup image.
-
+
@note Not all of these types are supported currently.
*/
enum enum_bstream_item_type {
@@ -220,7 +220,7 @@ enum enum_bstream_item_type {
/**
Common data about backup image item.
-*/
+*/
struct st_bstream_item_info
{
enum enum_bstream_item_type type; /**< type of the item */
@@ -230,9 +230,9 @@ struct st_bstream_item_info
/**
Describes database item.
-
+
Currently no data specific to database items is used.
-*/
+*/
struct st_bstream_db_info
{
struct st_bstream_item_info base;
@@ -241,7 +241,7 @@ struct st_bstream_db_info
/**
Describes item which sits inside a database.
-*/
+*/
struct st_bstream_dbitem_info
{
struct st_bstream_item_info base; /**< data common to all items */
@@ -250,10 +250,10 @@ struct st_bstream_dbitem_info
/**
Describes a table item.
-
+
Table is a per-database item. Additionally we store information about the
snapshot which contains its data.
-*/
+*/
struct st_bstream_table_info
{
struct st_bstream_dbitem_info base; /**< data common to all per-db items */
@@ -262,7 +262,7 @@ struct st_bstream_table_info
/**
Describes item which sits inside a table.
- */
+ */
struct st_bstream_titem_info
{
struct st_bstream_item_info base; /**< data common to all items */
@@ -270,11 +270,11 @@ struct st_bstream_titem_info
};
/*
- The following constants denote additional backup item categories. Items of
- different types can fall into one of these categories. Thus the categories
- are something different than item types and therefore are not listed
- inside enum_bstream_item_type but defined separately.
-*/
+ The following constants denote additional backup item categories. Items of
+ different types can fall into one of these categories. Thus the categories
+ are something different than item types and therefore are not listed
+ inside enum_bstream_item_type but defined separately.
+*/
#define BSTREAM_IT_GLOBAL BSTREAM_IT_LAST
#define BSTREAM_IT_PERDB (BSTREAM_IT_LAST+1)
@@ -282,14 +282,14 @@ struct st_bstream_titem_info
/*************************************************************************
- *
+ *
* STRUCTURE FOR WRITING/READING TABLE DATA
- *
+ *
*************************************************************************/
/**
Describe chunk of data from backup driver or for restore driver.
-*/
+*/
struct st_bstream_data_chunk
{
unsigned long int table_no; /**< table to which this data belongs */
@@ -303,24 +303,26 @@ struct st_bstream_data_chunk
/*************************************************************************
- *
+ *
* FUNCTIONS FOR WRITING BACKUP IMAGE
- *
+ *
*************************************************************************/
int bstream_wr_preamble(backup_stream*, struct st_bstream_image_header*);
-int bstream_wr_data_chunk(backup_stream*,
+int bstream_wr_data_chunk(backup_stream*,
struct st_bstream_data_chunk*);
int bstream_wr_summary(backup_stream *s, struct st_bstream_image_header *hdr);
+int bstream_flush(backup_stream*);
+
/*********************************************************************
- *
+ *
* FUNCTIONS FOR READING BACKUP IMAGE
- *
+ *
*********************************************************************/
int bstream_rd_preamble(backup_stream*, struct st_bstream_image_header*);
-int bstream_rd_data_chunk(backup_stream*,
+int bstream_rd_data_chunk(backup_stream*,
struct st_bstream_data_chunk*);
int bstream_rd_summary(backup_stream *s, struct st_bstream_image_header *hdr);
@@ -332,25 +334,25 @@ int bstream_rd_meta_data(backup_stream *
/* basic types */
-int bstream_rd_time(backup_stream*, bstream_time_t*);
+int bstream_rd_time(backup_stream*, bstream_time_t*);
int bstream_rd_string(backup_stream*, bstream_blob*);
int bstream_rd_num(backup_stream*, unsigned long int*);
int bstream_rd_int4(backup_stream*, unsigned long int*);
int bstream_rd_int2(backup_stream*, unsigned int*);
-int bstream_rd_byte(backup_stream*, unsigned short int*);
+int bstream_rd_byte(backup_stream*, unsigned short int*);
int bstream_next_chunk(backup_stream*);
/*********************************************************************
- *
+ *
* DEFINITION OF BACKUP STREAM STRUCTURE
- *
+ *
*********************************************************************/
/**
Structure defining base I/O operations.
-
+
Abstract stream is a way of defining the final destination/source of
the bytes being written to/read from a backup stream. This is done by
storing pointers to functions performing basic I/O operations inside
@@ -363,9 +365,9 @@ struct st_abstract_stream
int (*forward)(void*, unsigned long int*);
};
-/**
- Structure describing backup stream`s input or output buffer.
- @see stream_v1_carrier.c
+/**
+ Structure describing backup stream`s input or output buffer.
+ @see stream_v1_carrier.c
*/
struct st_bstream_buffer
{
@@ -375,21 +377,27 @@ struct st_bstream_buffer
bstream_byte *end;
};
-/**
- Structure describing state of a backup stream.
+/**
+ Structure describing state of a backup stream.
*/
struct st_backup_stream
{
struct st_abstract_stream stream;
- size_t block_size;
- enum { CLOSED, READING, WRITING, EOS, ERROR } state;
- struct st_bstream_buffer buf;
- int reading_last_fragment;
+ unsigned long int block_size;
+ short int init_block_count;
+ enum { CLOSED, /* stream has been closed */
+ FIRST_BLOCK, /* reading/writing the first block of a stream */
+ NORMAL, /* normal operation */
+ LAST_FRAGMENT, /* reading last fragment of a chunk */
+ EOS, /* end of stream detected */
+ ERROR } state;
+ enum { READING, WRITING } mode;
+ struct st_bstream_buffer buf;
bstream_blob mem;
bstream_blob data_buf;
};
-int bstream_open_wr(backup_stream*, unsigned long int);
+int bstream_open_wr(backup_stream*, unsigned long int, unsigned long int);
int bstream_open_rd(backup_stream*, unsigned long int);
int bstream_close(backup_stream*);
diff -Nrup a/sql/backup/stream_v1_services.h b/sql/backup/stream_v1_services.h
--- a/sql/backup/stream_v1_services.h 2007-11-14 19:48:07 +01:00
+++ b/sql/backup/stream_v1_services.h 2007-11-27 11:03:45 +01:00
@@ -3,7 +3,7 @@
#include <stream_v1.h>
-/**
+/**
@file
@brief
@@ -12,232 +12,210 @@
This header declares functions which should be implemented by an user of
the backup stream library. The library will call these functions to browse
the catalogue of a backup image and perform other tasks.
-*/
+*/
/*********************************************************************
- *
+ *
* CATALOGUE SERVICES
- *
+ *
*********************************************************************/
-/*
+/*
Functions for manipulating backup image's catalogue.
Backup stream library doesn't make any assumptions about how the catalogue
of a backup image is stored internally. Instead, program using backup stream
library must define the following functions which are used by the library
- to populate backup catalogue with items or to enumerate and access items
+ to populate backup catalogue with items or to enumerate and access items
stored there.
*/
/**
Clear catalogue and prepare it for populating with items.
-*/
+*/
int bcat_reset(struct st_bstream_image_header *catalogue);
/**
Close catalogue after all items have been added to it.
-*/
+*/
int bcat_close(struct st_bstream_image_header *catalogue);
-/**
+/**
Add item to the catalogue.
The @c item->pos field should be set to indicate position of the item
in the catalogue.
-
- @note
+
+ @note
Global, per-table and per-database items can have independent address
spaces. Thus item belonging to a database is identified by its position inside
that database's item list. Similar for items belonging to tables.
*/
-int bcat_add_item(struct st_bstream_image_header *catalogue,
+int bcat_add_item(struct st_bstream_image_header *catalogue,
struct st_bstream_item_info *item);
-/**
- Get global item stored in the catalogue at given position.
-*/
-struct st_bstream_item_info*
-bcat_get_item(struct st_bstream_image_header *catalogue,
- unsigned long int pos);
-
-/**
- Get per-database item stored in the catalogue at given position.
-*/
-struct st_bstream_dbitem_info*
-bcat_get_db_item(struct st_bstream_image_header *catalogue,
- struct st_bstream_db_info *db,
- unsigned long int pos);
-/**
- Get per-table item stored in the catalogue at given position.
-*/
-struct st_bstream_titem_info*
-bcat_get_table_item(struct st_bstream_image_header *catalogue,
- struct st_bstream_table_info *table,
- unsigned long int pos);
-
/*
Iterators are used to iterate over items inside backup catalogue.
-
+
The internal implementation of an iterator is hidden behind this API which
should be implemented by the program using backup stream library. An iterator
- is represented by a void* pointer pointing at a memory area containing its
+ is represented by a void* pointer pointing at a memory area containing its
internal state.
-*/
+*/
/**
Create global iterator of a given type.
-
+
Possible iterator types.
-
+
- BSTREAM_IT_CHARSET: all charsets
- BSTREAM_IT_USER: all users
- BSTREAM_IT_DB: all databases
-
+
The following types of iterators iterate only over items for which
some meta-data should be saved in the image.
-
+
- BSTREAM_IT_GLOBAL: all global items in create-dependency order
- - BSTREAM_IT_PERDB: all per-db items except tables which are enumerated by
+ - BSTREAM_IT_PERDB: all per-db items except tables which are enumerated by
a table iterator (see below)
- BSTREAM_IT_PERTABLE: all per-table items in create-dependency orders.
-
+
@return Pointer to the iterator or NULL in case of error.
-*/
-void* bcat_iterator_get(struct st_bstream_image_header *catalogue,
+*/
+void* bcat_iterator_get(struct st_bstream_image_header *catalogue,
unsigned int type);
/**
- Return next item pointed by iterator.
-
+ Return next item pointed by iterator.
+
@return NULL if there are no more items in the set.
-*/
-struct st_bstream_item_info*
+*/
+struct st_bstream_item_info*
bcat_iterator_next(struct st_bstream_image_header *catalogue, void *iter);
/**
Free iterator resources.
-
+
@note
The iterator can not be used after call to this function.
-*/
+*/
void bcat_iterator_free(struct st_bstream_image_header *catalogue, void *iter);
/**
Create iterator for items belonging to a given database.
-*/
-void* bcat_db_iterator_get(struct st_bstream_image_header *catalogue,
+*/
+void* bcat_db_iterator_get(struct st_bstream_image_header *catalogue,
struct st_bstream_db_info *db);
/** Return next item from database items iterator */
-struct st_bstream_dbitem_info*
-bcat_db_iterator_next(struct st_bstream_image_header *catalogue,
- struct st_bstream_db_info *db,
+struct st_bstream_dbitem_info*
+bcat_db_iterator_next(struct st_bstream_image_header *catalogue,
+ struct st_bstream_db_info *db,
void *iter);
/** Free database items iterator resources */
-void bcat_db_iterator_free(struct st_bstream_image_header *catalogue,
- struct st_bstream_db_info *db,
+void bcat_db_iterator_free(struct st_bstream_image_header *catalogue,
+ struct st_bstream_db_info *db,
void *iter);
/*********************************************************************
- *
+ *
* SAVING ITEM META DATA AND RESTORING ITEM FROM IT
- *
+ *
*********************************************************************/
/**
Produce CREATE statement for a given item.
-
+
Backup stream library calls that function when saving item's
meta-data. If function successfully produces the statement, it becomes
part of meta-data.
-
+
@retval BSTREAM_OK blob @c stmt contains the CREATE query
- @retval BSTREAM_ERROR no CREATE statement for that item
-*/
-int bcat_get_item_create_query(struct st_bstream_image_header *catalogue,
- struct st_bstream_item_info *item,
+ @retval BSTREAM_ERROR no CREATE statement for that item
+*/
+int bcat_get_item_create_query(struct st_bstream_image_header *catalogue,
+ struct st_bstream_item_info *item,
bstream_blob *stmt);
/**
Return meta-data (other than CREATE statement) for a given item.
-
+
Backup stream library calls that function when saving item's
meta-data. If function returns successfully, the bytes returned become
part of meta-data.
@retval BSTREAM_OK blob @c data contains the meta-data
- @retval BSTREAM_ERROR no extra meta-data for that item
-*/
-int bcat_get_item_create_data(struct st_bstream_image_header *catalogue,
- struct st_bstream_item_info *item,
+ @retval BSTREAM_ERROR no extra meta-data for that item
+*/
+int bcat_get_item_create_data(struct st_bstream_image_header *catalogue,
+ struct st_bstream_item_info *item,
bstream_blob *data);
/**
Create item from its meta-data.
-
+
When the meta-data section of backup image is read, items are created
- as their meta-data is read (so that there is no need to store these
+ as their meta-data is read (so that there is no need to store these
meta-data). Backup stream library calls this function to create an item.
-
+
@note
Either @c create_stmt or @c other_meta_data or both can be empty, depending
on what was stored in the image.
-
+
@retval BSTREAM_OK item created
@retval BSTREAM_ERROR error while creating item
-*/
-int bcat_create_item(struct st_bstream_image_header *catalogue,
- struct st_bstream_item_info *item,
- bstream_blob create_stmt,
+*/
+int bcat_create_item(struct st_bstream_image_header *catalogue,
+ struct st_bstream_item_info *item,
+ bstream_blob create_stmt,
bstream_blob other_meta_data);
/*********************************************************************
- *
+ *
* ABSTRACT STREAM INTERFACE
- *
+ *
*********************************************************************/
/*
The following typedefs define signatures of functions implementing basic
I/O operations on the output stream. Application using backup stream stores
pointers to appropriate functions inside backup_stream::stream structure.
-*/
+*/
/**
Function writing bytes to the underlying media.
-
+
For specification, see @c bstream_write_part() function.
-*/
+*/
typedef int (*as_write_m)(void*, bstream_blob*, bstream_blob);
/**
Function reading bytes from the underlying media.
-
+
For specification, see @c bstream_read_part() function.
-*/
+*/
typedef int (*as_read_m)(void *, bstream_blob*, bstream_blob);
/**
Function skipping bytes in the underlying stream.
-
+
This function should move the read/write "head" of the underlying stream by
the amount stored in variable pointed by @c offset. The variable is updated
to show how much the head was actually moved.
-
+
@retval BSTREAM_OK operation successful
- @retval BSTREAM_ERROR error has happened, @c *offset informs how many bytes
+ @retval BSTREAM_ERROR error has happened, @c *offset informs how many bytes
have been skipped.
-*/
+*/
typedef int (*as_forward_m)(void *, unsigned long int *offest);
/*********************************************************************
- *
+ *
* MEMORY ALLOCATOR
- *
+ *
*********************************************************************/
/*
@@ -246,9 +224,9 @@ typedef int (*as_forward_m)(void *, unsi
allocating functions will be used.
Note: the BSTREAM_USE_MALLOC macro should be used in at most one compilation
- module - otherwise several copies of bstream_{malloc,free} will be defined.
+ module - otherwise several copies of bstream_{malloc,free} will be defined.
*/
-
+
#ifdef BSTREAM_USE_MALLOC
#include <stdlib.h>
diff -Nrup a/sql/backup/stream_v1_transport.c b/sql/backup/stream_v1_transport.c
--- a/sql/backup/stream_v1_transport.c 2007-11-16 16:18:22 +01:00
+++ b/sql/backup/stream_v1_transport.c 2007-11-27 11:03:45 +01:00
@@ -13,12 +13,16 @@
These functions form the transport layer of the 1st version of backup stream
format. They split stream into a sequence of data chunks.
+
+ @todo free internal buffer memory in case of errors.
*/
/**
@brief Default size of a stream block.
When opening stream for writing a different size can be specified.
+
+ @note Block size should be >= 8 bytes.
*/
#define DEFAULT_BLOCK_SIZE (32*1024)
@@ -312,8 +316,43 @@ int as_read_all(struct st_abstract_strea
static
void reset_output_buffer(backup_stream *s)
{
- s->buf.pos= s->buf.header= s->buf.begin= s->mem.begin;
+ unsigned long int block_size= s->block_size;
+ int i;
+
+ s->buf.pos= s->buf.begin= s->mem.begin;
s->buf.end= s->buf.begin + s->block_size;
+
+ /*
+ special case: if we are in an initial block, we store block
+ size in its first 4 bytes.
+ */
+
+ if (s->state == FIRST_BLOCK || s->init_block_count > 0)
+ {
+ for(i=0; i<4; ++i)
+ {
+ *(s->buf.pos++)= block_size & 0xFF;
+ block_size >>= 8;
+ }
+
+ if (s->state != FIRST_BLOCK)
+ s->init_block_count--;
+
+ ASSERT(block_size == 0);
+ }
+
+ /*
+ Special case: if this is the first block, we also store the number
+ of inital blocks (currently none are used)
+ */
+
+ if (s->state == FIRST_BLOCK)
+ {
+ *(s->buf.pos++)= s->init_block_count;
+ }
+
+ s->buf.header= s->buf.pos;
+ s->state= NORMAL;
}
/**
@@ -376,6 +415,8 @@ int append_to_buffer(backup_stream *s, b
becomes the current fragment in the buffer (with header correctly set)
while all other data from the buffer is send to the output stream.
+ If the current fragment is empty, nothing is done.
+
@return Error code if there was an error while writing output stream.
*/
static
@@ -473,76 +514,207 @@ int close_current_fragment(backup_stream
begin pos header end
In each case header points at the position where the header should be if
- the data were present in the buffer.
+ the data were present in the buffer. End points one byte after the end of
+ current input block. As with header, this can be inside the buffer (end <= pos)
+ or still in the stream.
- Invariant:
+ *************************************************************************/
- (end - pos) is the number of bytes left in the input block
- header <= end and header points at the first byte of next fragment
+#define IBUF_INV(B) \
+ ASSERT((B).begin <= (B).pos); \
+ ASSERT((B).begin <= (B).header); \
+ ASSERT((B).header <= (B).end)
- *************************************************************************/
/**
- Prepare stream`s input buffer for reading next block from input stream.
+ Move input buffer to the beginning of available buffer memory.
+
+ This is to make space for more data.
*/
static
void reset_input_buffer(backup_stream *s)
{
- s->buf.header -= (s->buf.pos - s->mem.begin);
- s->buf.pos= s->buf.begin= s->mem.begin;
- s->buf.end= s->buf.begin + s->block_size;
+ unsigned long int data_len= s->buf.pos - s->buf.begin;
+
+ ASSERT(s->buf.begin >= s->mem.begin);
+ ASSERT(s->buf.pos >= s->buf.begin);
+
+ if (data_len > 0)
+ memmove(s->mem.begin, s->buf.begin, data_len);
+
+ s->buf.header -= (s->buf.begin - s->mem.begin);
+ s->buf.end -= (s->buf.begin - s->mem.begin);
+ s->buf.begin= s->mem.begin;
+ s->buf.pos= s->buf.begin + data_len;
+
+ IBUF_INV(s->buf);
}
/**
- Read few more bytes into the stream`s input buffer if it is empty.
+ Ensure that there are non-zero bytes in the input buffer.
- Normally INPUT_BUF_SIZE bytes is read, unless current input block doesn't
- contain that many bytes in which case the rest of the input block is read.
+ Normally the bytes are read from the stream. However, if we move to the next
+ input block, the bytes can be already stored in the buffer and the function
+ only updates buffer pointers to reflect new situation and maintain
+ invariants.
+
+ @post <code>buf.begin < buf.pos && buf.begin < buf.end</code>
+
+ Normally we read INPUT_BUF_SIZE bytes. But if there is not that many bytes
+ left in the input block, only the remaining bytes of the block are read. This
+ is to ensure that reads are aligned at block boundaries (i.e. we read at most
+ a single input block at a time).
+
+ An exception is when we start reading the stream and don't know the block
+ size yet. In this case we always read INPUT_BUF_SIZE bytes which means that
+ we can also load bytes from the following blocks.
@retval BSTREAM_OK input buffer was filled with data
- @retval BSTREAM_EOS end of stream was hit, all remaining bytes from the stream
- are in the buffer
+ @retval BSTREAM_EOS buffer is empty and there are no more bytes in the stream
@retval BSTREAM_ERROR error when reading from the stream
*/
static
int load_buffer(backup_stream *s)
{
int ret= BSTREAM_OK;
- byte *saved_begin;
- size_t howmuch= s->buf.end - s->buf.pos;
+ blob data;
+ unsigned long int howmuch= 0;
+ unsigned long int data_len= s->buf.pos - s->buf.begin;
+ unsigned long int block_size= s->block_size;
+ unsigned short int i;
- /* do nothing if there already is some data in the buffer */
- if (s->buf.pos > s->buf.begin)
- return BSTREAM_OK;
+ IBUF_INV(s->buf);
/*
- Call reset_input_buffer() to move buffer head to the beginning of
- available memory
+ If all bytes of the current input block are read, we move to the next block
+ (by updating the buf.end pointer).
*/
- reset_input_buffer(s);
+ if (s->buf.begin == s->buf.end)
+ {
+ s->buf.end += block_size;
+ block_size= 0; /* to mark that we are reading new input block */
+ }
/*
- If howmuch > 0 the current input block has not been completely read yet.
- In that case we restore input buffer's invariant by setting end pointer
- accordingly.
- */
- if (howmuch > 0)
- s->buf.end= s->buf.pos + howmuch;
- else
+ Determine how many bytes to read:
+
+ - we try to read INPUT_BUF_SIZE bytes if we are at the beginning of a block
+ which can store block size
+
+ - otherwise, if the buffer is empty, we read as many bytes as is left to
+ the end of the current input block.
+
+ - if the buffer is non-empty, we don't try to read any bytes.
+ */
+ if (s->state == FIRST_BLOCK || (block_size == 0 && s->init_block_count > 0))
+ howmuch= INPUT_BUF_SIZE;
+ else if (data_len == 0 && s->buf.pos < s->buf.end)
howmuch= s->buf.end - s->buf.pos;
/* don't read more than INPUT_BUF_SIZE */
if (howmuch > INPUT_BUF_SIZE)
howmuch= INPUT_BUF_SIZE;
- /* read into the buffer howmuch bytes from the input stream */
- s->buf.pos += howmuch;
- saved_begin= s->buf.begin;
+ /*
+ We want to have at least 8 bytes in the buffer since we might need
+ to skip first 4-5 bytes storing block size. If there is not that much
+ space left, we move the buffer to the beginning of available memory.
+ */
+ if (s->buf.begin + 8 > s->mem.end)
+ reset_input_buffer(s);
+
+ /* we should not load more data than available space */
+ if (s->buf.pos + howmuch > s->mem.end)
+ howmuch= s->mem.end - s->buf.pos;
+
+ /* buffer is ready for loading howmuch bytes */
+ ASSERT(s->buf.pos + howmuch <= s->mem.end);
+ ASSERT(s->buf.pos + howmuch >= s->buf.begin+8);
+
+ /* read howmuch bytes from the input stream and append them to the buffer */
+ if (howmuch > 0)
+ {
+ data.begin= s->buf.pos;
+ data.end= s->buf.pos + howmuch;
+
+ ret= as_read(&s->stream,&data,s->mem);
+
+ s->buf.pos= data.begin;
+
+ /*
+ if as_read() reports EOS, there is no more bytes in the input stream, but
+ there still might be some bytes in the input buffer. Only if the input
+ buffer is empty we report EOS of the backup stream.
+
+ TODO: think how it will work if we are reading from an asynchronous stream
+ like a socket.
+ */
+ if (ret == BSTREAM_EOS)
+ {
+ if (s->buf.pos == s->buf.begin)
+ {
+ s->state= EOS;
+ return BSTREAM_EOS;
+ }
+ else
+ ret= BSTREAM_OK;
+ }
+ }
- ret= as_read(&s->stream,(bstream_blob*)&s->buf,s->mem);
+ /* now we should have some data in the buffer */
+ ASSERT(s->buf.pos > s->buf.begin);
- s->buf.pos= s->buf.begin;
- s->buf.begin= saved_begin;
+ /*
+ If we are reading beginning of an initial block, read the block size
+ stored in first 4 bytes and skip them.
+ */
+ if ( s->state == FIRST_BLOCK
+ || ( s->init_block_count > 0 && block_size == 0 ) )
+ {
+ /* check that all 4 bytes are in the buffer */
+ ASSERT(s->buf.pos >= s->buf.begin+4);
+
+ data.begin= s->buf.begin; /* save buf.begin position */
+
+ for (i= 0; i<4; ++i)
+ {
+ block_size >>= 8;
+ block_size |= (*(s->buf.begin++)) << 3*8;
+ }
+
+ /*
+ If this is the first block, save block size and read the number of
+ initial blocks.
+ */
+ if (s->state == FIRST_BLOCK)
+ {
+ ASSERT(s->buf.pos > s->buf.begin);
+ s->block_size= block_size;
+ s->init_block_count = *(s->buf.begin++);
+ /* now, when we know the block size we can setup buf.end pointer */
+ s->buf.end= data.begin + s->block_size;
+ }
+ else if (block_size != s->block_size)
+ {
+ s->state= ERROR;
+ return BSTREAM_ERROR;
+ }
+ else
+ s->init_block_count--;
+
+ /* this is where we can find header of the first fragment */
+ s->buf.header= s->buf.begin;
+ s->state= NORMAL;
+
+ ASSERT(s->buf.begin <= s->buf.pos);
+ /*
+ As we are at the beginning of an input block, check that the buf.end
+ pointer is setup correctly.
+ */
+ ASSERT(s->buf.end == data.begin + s->block_size);
+ }
+
+ IBUF_INV(s->buf);
return ret;
}
@@ -554,8 +726,8 @@ int load_buffer(backup_stream *s)
following the fragment which is entered now. buf.begin will point at the
first byte of the entered fragment.
- @pre All bytes from previous fragment have been consumed (buf.begin ==
- buf.header) and header of next fragment is loaded into the buffer.
+ @pre All bytes from previous fragment have been consumed and header of
+ the next fragment is loaded into the buffer.
@retval BSTREAM_OK next fragment has been entered
@retval BSTREAM_EOC the entered fragment is the last fragment of a chunk
@@ -565,12 +737,15 @@ static
int load_next_fragment(backup_stream *s)
{
byte *saved_header= s->buf.header;
- int ret;
+ int ret= BSTREAM_OK;
+
+ IBUF_INV(s->buf);
+ /* check that the header byte is in the buffer */
ASSERT(s->buf.pos > s->buf.header);
- ASSERT(s->buf.begin == s->buf.header);
+ ASSERT(s->buf.begin <= s->buf.header);
- s->reading_last_fragment= 0;
+ s->state= NORMAL; /* default, unless changed below */
ret= read_fragment_header(&s->buf.header);
@@ -582,11 +757,17 @@ int load_next_fragment(backup_stream *s)
s->buf.header= s->buf.end;
/* move buf.begin to point at the first byte of the fragment */
- s->buf.begin++;
+ if (s->buf.begin < s->buf.end)
+ s->buf.begin++;
+
+ ASSERT(s->buf.begin <= s->buf.end);
/*
- It can happen that fragment header was the last byte in
- the block. In that case we reload input buffer and start over.
+ If we are at the end of input block now, it means that the fragment
+ lies in the next block. Thus we reload input buffer and start over.
+
+ Note: This includes the case when fragment header was the last byte of
+ the block and thus should be ignored.
TODO: remove recursion
*/
@@ -596,13 +777,15 @@ int load_next_fragment(backup_stream *s)
return ret == BSTREAM_OK ? load_next_fragment(s) : ret;
}
+ IBUF_INV(s->buf);
+
switch (ret) {
case BSTREAM_EOS: s->state= EOS; return BSTREAM_EOS;
- case BSTREAM_EOC: s->reading_last_fragment= 1; return BSTREAM_EOC;
+ case BSTREAM_EOC: s->state= LAST_FRAGMENT; return BSTREAM_EOC;
- case FR_LAST: s->reading_last_fragment= 1;
+ case FR_LAST: s->state= LAST_FRAGMENT;
default: return BSTREAM_OK;
}
@@ -618,15 +801,21 @@ int load_next_fragment(backup_stream *s)
/**
Open backup stream for writing.
+ @param block_size size of output stream blocks
+ @param offset current position of the output stream inside the
+ current stream block
+
@pre The abstract stream methods in @c s should be setup and ready for
writing.
@note Output buffer is allocated.
*/
-int bstream_open_wr(backup_stream *s, unsigned long int block_size)
+int bstream_open_wr(backup_stream *s,
+ unsigned long int block_size,
+ unsigned long int offset)
{
s->state= ERROR;
- s->block_size= block_size > 0 ? block_size : DEFAULT_BLOCK_SIZE;
+ s->block_size= block_size >= 8 ? block_size : DEFAULT_BLOCK_SIZE;
s->mem.begin= bstream_alloc(s->block_size);
@@ -634,11 +823,16 @@ int bstream_open_wr(backup_stream *s, un
return BSTREAM_ERROR;
s->mem.end= s->mem.begin + s->block_size;
+ s->mode= WRITING;
+ s->state= FIRST_BLOCK;
+ s->init_block_count= 2; /* number of initial blocks storing block size */
reset_output_buffer(s);
+ /* adjust buf.end to reflect the position where the stream starts */
+ s->buf.end -= offset; /* now the invariant should hold */
+
s->data_buf.begin= s->data_buf.end= NULL;
- s->state= WRITING;
return BSTREAM_OK;
}
@@ -646,29 +840,41 @@ int bstream_open_wr(backup_stream *s, un
/**
Open backup stream for reading.
+ @param offset current position of the input stream inside the
+ current stream block
+
@pre The abstract stream methods in @c s should be setup and ready for
reading.
@note Input buffer is allocated.
*/
-int bstream_open_rd(backup_stream *s, unsigned long int block_size)
+int bstream_open_rd(backup_stream *s, unsigned long int offset)
{
s->state= ERROR;
- s->block_size= block_size;
+ s->block_size= 0;
s->mem.begin= bstream_alloc(INPUT_BUF_SIZE);
if (!s->mem.begin)
return BSTREAM_ERROR;
+ s->data_buf.begin= s->data_buf.end= NULL;
+ s->mode= READING;
+ s->state= FIRST_BLOCK;
s->mem.end= s->mem.begin + INPUT_BUF_SIZE;
/* initialize input buffer */
- reset_input_buffer(s);
- s->buf.header= s->buf.begin;
+ s->buf.header= s->buf.pos= s->buf.end= s->buf.begin= s->mem.begin;
- s->data_buf.begin= s->data_buf.end= NULL;
- s->state= READING;
+ /* load beginning of the first input block - this setups block_size */
+ if (load_buffer(s) != BSTREAM_OK)
+ return BSTREAM_ERROR;
+
+ ASSERT(s->block_size > 0);
+
+ s->buf.end -= offset;
+
+ IBUF_INV(s->buf);
return BSTREAM_OK;
}
@@ -689,12 +895,13 @@ int bstream_close(backup_stream *s)
if (s->state == CLOSED)
return BSTREAM_OK;
- if (s->state == WRITING)
+ if (s->mode == WRITING)
{
bstream_end_chunk(s);
bstream_flush(s);
/* write EOS marker */
- reset_output_buffer(s);
+ s->buf.pos= s->buf.begin= s->mem.begin;
+ s->buf.end= s->buf.begin + s->block_size;
*(s->buf.pos++)= FR_EOS;
ret= write_buffer(s);
}
@@ -758,12 +965,17 @@ int bstream_write_part(backup_stream *s,
blob fragment;
byte *saved_end;
- if (s->state != WRITING)
+ if (s->mode != WRITING)
return BSTREAM_ERROR;
+ /* return if there is nothing to write */
if (data->begin >= data->end)
return BSTREAM_OK;
+ /*
+ If a complete output block is filled, we flush the stream. Output buffer
+ becomes empty and ready to accept new data.
+ */
if (s->buf.pos == s->buf.end)
{
ret= bstream_flush(s);
@@ -778,7 +990,7 @@ int bstream_write_part(backup_stream *s,
s->buf.pos++;
/*
- Setup fragment to describe all the data available in the current fragment
+ Setup fragment to describe all the data available in the last fragment
of the stream`s output buffer together with the data to be written
output buffer
@@ -834,6 +1046,9 @@ int bstream_write_part(backup_stream *s,
To avoid copying bytes to the internal output buffer we try to cut a prefix
of the data to be written which forms a valid fragment and write this
fragment to output stream.
+
+ Note: after call to biggest_fragment_prefix() blob fragment contains the
+ bytes which didn't fit into the prefix.
*/
*(s->buf.header)= biggest_fragment_prefix(&fragment);
@@ -851,19 +1066,27 @@ int bstream_write_part(backup_stream *s,
/* write remainder of the fragment from data blob */
saved_end= data->end;
- data->end= data->begin + (fragment.end - s->buf.pos);
+ data->end= data->begin + (fragment.begin - s->buf.pos);
ret= as_write_all(&s->stream,*data,*data);
data->begin= data->end;
data->end= saved_end;
- /* move buffer beginning to keep the invariant */
+ /*
+ Move buffer beginning so that invariant is keept.
+ */
+ s->buf.header= s->buf.begin = fragment.begin;
+ s->buf.pos= s->buf.header;
+
+ ASSERT(s->buf.begin <= s->buf.end);
- s->buf.begin = fragment.end;
- s->buf.pos= s->buf.header= s->buf.begin;
+ /*
+ Write remainder of the data.
- return ret;
+ TODO: avoid recursion
+ */
+ return bstream_write_part(s,data,buf);
}
/*
@@ -927,23 +1150,54 @@ int bstream_end_chunk(backup_stream *s)
{
int ret= BSTREAM_OK;
- if (s->state != WRITING)
+ if (s->mode != WRITING)
return BSTREAM_ERROR;
-
- ret= close_current_fragment(s);
-
/*
- If buffer is empty, store EOC marker in it otherwise mark the last
- fragment of the chunk.
+ If current fragment is empty we have to save explicit EOC marker. Otherwise
+ we close the fragment and mark end of chunk accordingly.
*/
- if (s->buf.pos == s->buf.begin)
+ if (s->buf.pos == s->buf.header)
{
+ ASSERT(s->buf.pos < s->buf.end);
*(s->buf.header++)= FR_EOC;
s->buf.pos= s->buf.header;
}
else
- *s->buf.header |= FR_LAST;
+ {
+ ret= close_current_fragment(s);
+
+ /*
+ If the last fragment in the buffer is a small fragment, we can mark it as
+ the last fragment of a chunk by setting FR_LAST flag in the header.
+ Otherwise setting a flag is not possible and we must store explicit EOC
+ marker.
+ */
+ if (!(*(s->buf.header) & 0x80))
+ {
+ *s->buf.header |= FR_LAST;
+ }
+ else
+ {
+ /*
+ Write EOC marker only if there is space for it. If not, then the current
+ fragment extends to the end of the output block and thus will be
+ correctly marked below.
+ */
+ if (s->buf.pos < s->buf.end)
+ {
+ *(s->buf.pos++)= FR_EOC;
+ s->buf.header= s->buf.pos;
+ }
+ }
+ }
+
+ /*
+ If last fragment extends to the end of an output block, then we can
+ always mark it as such.
+ */
+ if (s->buf.pos == s->buf.end && s->buf.header < s->buf.pos)
+ *(s->buf.header)= FR_LAST;
/*
Start new fragment. Note that if the current fragment is empty, these
@@ -951,24 +1205,33 @@ int bstream_end_chunk(backup_stream *s)
*/
s->buf.header= s->buf.pos;
+ /*
+ If there is only one byte left in the current output block we fill it
+ with FR_MORE so that the next fragment will start in the next block.
+ */
if (s->buf.pos == s->buf.end-1)
- {
*(s->buf.pos++)= FR_MORE;
+
+ /*
+ If we have a complete output block filled, we flush the stream.
+ */
+ if (s->buf.pos == s->buf.end)
ret= bstream_flush(s);
- }
return ret;
}
/**
Flush backup stream`s output buffer to the output stream.
+
+ This empties the output buffer.
*/
int bstream_flush(backup_stream *s)
{
struct st_bstream_buffer *buf= &s->buf;
int ret;
- if (s->state != WRITING)
+ if (s->mode != WRITING)
return BSTREAM_ERROR;
/* if buffer is empty, do nothing */
@@ -1048,31 +1311,43 @@ int bstream_flush(backup_stream *s)
int bstream_read_part(backup_stream *s, bstream_blob *data, bstream_blob buf)
{
int ret= BSTREAM_OK;
- size_t howmuch;
+ unsigned long int howmuch;
blob saved;
- if (s->state != READING)
- return s->state == EOS ? BSTREAM_EOS : BSTREAM_ERROR;
+ ASSERT(s->mode == READING);
- /* fill input buffer if it is empty */
- if (s->buf.pos == s->buf.begin)
+ if (s->state == ERROR || s->state == CLOSED)
+ return BSTREAM_ERROR;
+
+ if (s->state == EOS)
+ return BSTREAM_EOS;
+
+ IBUF_INV(s->buf);
+
+ /* fill input buffer if we reached end of fragment or of the input block */
+ if (s->buf.begin == s->buf.end || s->buf.begin == s->buf.header)
{
ret= load_buffer(s);
+
if (ret != BSTREAM_OK)
return ret;
}
- ASSERT(s->buf.pos > s->buf.begin);
+ /* check that we are inside input block and reading current fragment */
+ ASSERT(s->buf.end > s->buf.begin);
+ ASSERT(s->buf.header >= s->buf.begin);
/*
If we finished reading a fragment, we should load next one
or signal EOC if it was the last fragment of a chunk.
- */
+ */
if (s->buf.header == s->buf.begin)
{
- if (s->reading_last_fragment)
+ if (s->state == LAST_FRAGMENT)
return BSTREAM_EOC;
+ ASSERT(s->buf.pos > s->buf.header);
+
ret= load_next_fragment(s);
if (ret != BSTREAM_OK)
return ret;
@@ -1093,13 +1368,12 @@ int bstream_read_part(backup_stream *s,
*/
if (howmuch > 0)
{
- if (howmuch > (size_t)(data->end - data->begin))
+ if (data->begin + howmuch > data->end)
howmuch= data->end - data->begin;
memmove(data->begin, s->buf.begin, howmuch);
data->begin += howmuch;
s->buf.begin += howmuch;
-
}
else
{
@@ -1107,7 +1381,7 @@ int bstream_read_part(backup_stream *s,
ASSERT(s->buf.header > s->buf.pos);
howmuch= data->end - data->begin;
- if (howmuch > (size_t)(s->buf.header - s->buf.pos))
+ if (s->buf.pos + howmuch > s->buf.header)
howmuch= s->buf.header - s->buf.pos;
saved= *data;
@@ -1122,7 +1396,9 @@ int bstream_read_part(backup_stream *s,
data->end= saved.end;
}
- return s->buf.begin == s->buf.header && s->reading_last_fragment ?
+ IBUF_INV(s->buf);
+
+ return s->buf.begin == s->buf.header && s->state == LAST_FRAGMENT ?
BSTREAM_EOC: BSTREAM_OK;
}
@@ -1181,11 +1457,31 @@ int bstream_next_chunk(backup_stream *s)
int ret;
unsigned long int howmuch;
- if (s->state != READING)
- return s->state == EOS ? BSTREAM_EOS : BSTREAM_ERROR;
+ ASSERT(s->mode == READING);
- /* if we are not at the beginning of next fragment, move there */
+ if (s->state == ERROR || s->state == CLOSED )
+ return BSTREAM_ERROR;
+ if (s->state == EOS)
+ return BSTREAM_EOS;
+
+ IBUF_INV(s->buf);
+
+ /*
+ If we are at the end of input block, load next one.
+ */
+ if (s->buf.begin == s->buf.end)
+ {
+ ret= load_buffer(s);
+
+ if (ret != BSTREAM_OK)
+ return ret;
+ }
+
+ /* now we should be inside the current input block */
+ ASSERT(s->buf.begin < s->buf.end);
+
+ /* if we are not at the beginning of next fragment, move there */
if (s->buf.begin < s->buf.header)
{
/*
@@ -1205,13 +1501,25 @@ int bstream_next_chunk(backup_stream *s)
}
/*
+ Check that we are still inside the current input block ans at the beginnig
+ of the fragment
+ */
+ ASSERT(s->buf.begin <= s->buf.end);
+ ASSERT(s->buf.begin == s->buf.header);
+
+ /*
If buffer is empty, we load few bytes into it to have access to
the fragment header.
*/
if (s->buf.pos == s->buf.begin)
- load_buffer(s);
+ {
+ ret= load_buffer(s);
- ASSERT(s->buf.begin == s->buf.header);
+ if (ret != BSTREAM_OK)
+ return ret;
+ }
+
+ /* now we should have some bytes in the buffer */
ASSERT(s->buf.pos > s->buf.header);
ret= load_next_fragment(s);
@@ -1219,6 +1527,8 @@ int bstream_next_chunk(backup_stream *s)
/* if we hit EOC marker here, we treat it as empty chunk */
if (ret == BSTREAM_EOC)
ret= BSTREAM_OK;
+
+ IBUF_INV(s->buf);
return ret;
}