Below is the list of changes that have just been committed into a local
5.1 repository of rafal. When rafal does a push these changes will
be propagated to the main repository and, within 24 hours after the
push, to the public repository.
For information on how to access the public repository
see http://dev.mysql.com/doc/mysql/en/installing-source-tree.html
ChangeSet@stripped, 2006-10-25 19:24:56+00:00, rafal@quant.(none) +1 -0
added sql/backup.cc which mysteriously dissepeared from the previous changeset
sql/backup.cc@stripped, 2006-10-25 19:24:53+00:00, rafal@quant.(none) +1050 -0
Missed in previous changeset, adding
sql/backup.cc@stripped, 2006-10-25 19:24:53+00:00, rafal@quant.(none) +0 -0
# This is a BitKeeper patch. What follows are the unified diffs for the
# set of deltas contained in the patch. The rest of the patch, the part
# that BitKeeper cares about, is below these diffs.
# User: rafal
# Host: quant.(none)
# Root: /ext/mysql/bk/backup/prototype
--- New file ---
+++ sql/backup.cc 06/10/25 19:24:53
#include "mysql_priv.h"
#include "sql_show.h"
#include "backup.h"
TABLE *create_schema_table(THD *thd, TABLE_LIST *table_list); // defined in sql_show.cc
namespace backup {
/*
Data describing single table column.
Variable length values (field name and comment) are stored in a memory pool pointed
by pool.
This structure can be initialized from a Field structure (used to describe table
columns inside TABLE structures).
*/
struct Field_data
{
enum_field_types type;
uint length;
uint decimals;
Field_data():pool(NULL) {};
/*
Field_data(const Field_data &fd):
type(fd.type), length(fd.length), decimals(fd.decimals), pool(fd.pool)
{
const_cast<Field_data&>(fd).pool= NULL;
};
~Field_data()
{
if( pool )
my_free(pool,MYF(0));
};
*/
Field_data& operator=(const Field &f);
void free()
{
if( pool )
my_free(pool,MYF(0));
};
private:
byte* pool;
friend class Field_create; // this class, defined below, is used to transform the
// column description to the format accepted by table
// creating functions
};
Field_data &Field_data::operator=(const Field &f)
{
type= f.type();
length= 0;
decimals= 0;
// FIXME: legth and decimals for colums of type DECIMAL (look into SHOW CREATE command)
if( type == MYSQL_TYPE_DECIMAL || type == MYSQL_TYPE_NEWDECIMAL )
length= f.field_length - f.decimals(); // TODO: length of strings, other anomalies?
else
length= f.field_length; // FIXME: what about string fields (with wide chars)?
if( type == MYSQL_TYPE_DECIMAL || type == MYSQL_TYPE_NEWDECIMAL
|| type == MYSQL_TYPE_FLOAT || type == MYSQL_TYPE_DOUBLE )
decimals= f.decimals();
char *ptr = pool;
if( pool ) my_free(ptr,MYF(0));
pool= ptr= my_malloc( strlen(f.field_name)+f.comment.length+2, MYF(0) );
strncpy(ptr,f.field_name,strlen(f.field_name)+1);
strncpy(ptr+strlen(ptr)+1,f.comment.str,f.comment.length+1);
// DBUG_PRINT("backup",(" saving field %d %s (%d/%d)",type,ptr,length,decimals));
return *this;
};
/*
* Structure for keeping per table metadata
*/
struct Table_data
{
char name[32]; // table name
unsigned fields_no; // number of columns
Field_data fields[100]; // column descriptions
Table_data():fields_no(0) {};
Table_data(const TABLE &tbl); // read data from open table structure
void free();
};
Table_data::Table_data(const TABLE &tbl)
{
DBUG_PRINT("backup", ("saving fields of table %s", tbl.s->table_name.str));
strncpy(name,tbl.s->table_name.str,32);
fields_no= 0;
for( fields_no=0 ; tbl.field[fields_no] && (fields_no < 100) ; fields_no++ )
{
//DBUG_PRINT("backup", ( " adding field %d: %s", i, t->field[i]->field_name ) );
fields[fields_no] = *tbl.field[fields_no];
};
};
void Table_data::free()
{
while( fields_no > 0 )
fields[--fields_no].free();
}
class Backup_stream
{
public:
static const uint DATA_LEN= 16*1024;
Backup_stream(byte buf[DATA_LEN]):stream(0),data(buf),pos(0) {};
Backup_stream &operator[](uint no) { stream= no; return *this; };
Backup_stream &operator<<(const Buffer &buf);
Backup_stream &operator>>(Buffer &buf);
void rewind() { pos= 0; };
void close()
{
*reinterpret_cast<uint*>(data+pos)= 0;
};
uint size() const
{ return pos; };
private:
uint stream;
byte *data;
uint pos;
struct header
{
uint size;
uint stream;
header():size(0),stream(0) {};
header(const Buffer &buf):size(buf.size),stream(buf.stream_no) {};
};
};
struct Backup_image
{
version_t version;
Table_data meta[10];
unsigned howmuch; // how many entries are saved in meta
Backup_stream str; // stream for table data
Backup_image();
~Backup_image();
void add(const Table_data &td);
};
static Backup_image backup_image;
/*************************************************
*
* BACKUP
*
*************************************************/
/*
Returns tmp table containing records from a given I_S table
*/
TABLE* get_schema_table(THD *thd, ST_SCHEMA_TABLE *st);
// variant using schema table id
inline
TABLE* get_schema_table(THD *thd, enum enum_schema_tables which)
{
if( which < SCH_END )
return get_schema_table(thd, ::get_schema_table(which));
else
return NULL;
};
/*
* Read backup image from backup driver and send it to the stream
*/
int get_se_images(const Table_map&, Backup_stream&);
/*
Implements BACKUP command -- called from sql_parse.cc
Currently we want to backup all tables stored inside the ARCHIVE engine.
*/
int do_backup(THD *thd)
{
DBUG_ENTER("backup::do_backup");
// open I_S.TABLES table
TABLE *tables = get_schema_table(thd,SCH_TABLES);
if( !tables )
{
DBUG_PRINT("backup",("get_schema_table returned NULL!"));
goto error;
};
// scan TABLES table and select tables to be backed-up
{ // group because of gotos
TABLE_LIST *tl= NULL; // for opening the tables
handler *ha = tables->file;
// TODO; locking
ha->ha_rnd_init(TRUE);
// loop through records in I_S.TABLES table and fill TABLE_LIST
while( ! ha->rnd_next(tables->record[0]) )
{
String buf; // to read values of table's fields
// DBUG_PRINT("backup",("got next row from TABLES table"));
// field positions must be synchronized with defs in sql_show.cc
Field* db_f= tables->field[1];
Field* name_f= tables->field[2];
Field* engine_f= tables->field[4];
// select only tables stored in ARCHIVE engine
if( strncmp("ARCHIVE",engine_f->val_str(&buf)->ptr(),7) == 0 )
{
// fill TABLE_LIST element
TABLE_LIST *ptr= (TABLE_LIST*)sql_alloc(sizeof(TABLE_LIST));
bzero(ptr,sizeof(TABLE_LIST));
String *name= name_f->val_str(&buf);
//DBUG_PRINT("backup",("allocating %d (%d) bytes",name->length(),buf.length()));
ptr->alias= ptr->table_name= sql_alloc(name->length()+1);
strncpy(ptr->table_name,name->ptr(),name->length()+1);
DBUG_PRINT("backup",("adding %s to TABLE_LIST",ptr->alias));
ptr->db= (char *)"test";
ptr->lock_type= TL_READ;
// and add it to the list
ptr->next_global= ptr->next_local=
ptr->next_name_resolution_table= tl;
tl= ptr;
};
}; // loop through I_S.TABLES records
ha->ha_rnd_end();
// Open the tables
DBUG_PRINT("backup", ( "calling open_and_lock_tables" ));
if( open_and_lock_tables(thd, tl) )
{
DBUG_PRINT("backup", ( "error!" ));
return backup::ERROR;
};
DBUG_PRINT("backup", ( "tables opened" ));
// add each table to backup list and save its metadata
Table_map backup_tables; // tables to be backed-up
for( TABLE_LIST *ptr= tl; ptr ; ptr= ptr->next_global )
if( ptr->table )
{
Table_ref tr(*ptr);
Table_data td(*ptr->table);
DBUG_PRINT("backup",("adding %s to backup list",tr.name().ptr()));
backup_tables.add(tr);
backup_image.add(td);
};
DBUG_PRINT("backup",("closing tables"));
close_thread_tables(thd,FALSE,TRUE);
/*
// dealocate resources
for( TABLE_LIST *ptr= tl ; ptr ; )
{
TABLE_LIST *aux= ptr;
ptr= ptr->next_global;
free(aux->table_name); // this crashes server
free(aux);
};
*/
// collect backup image data
DBUG_PRINT("backup",("getting backup images for %d tables",backup_tables.count()));
if( backup_tables.count() > 0 )
get_se_images( backup_tables, backup_image.str );
backup_image.str.close();
DBUG_PRINT("backup",("backup image has %d bytes",backup_image.str.size()));
};
ok:
send_ok(thd);
DBUG_RETURN(0);
error:
my_error(ER_NO,MYF(0),"ala","ala");
DBUG_RETURN(1);
};
/*
* Function get_se_images implements the backup protocol
*/
#define MAX_BUF_SIZE 10*1024
#define MAX_ERRORS 3
const LEX_STRING archive_str= { C_STRING_WITH_LEN("ARCHIVE") };
int get_se_images(const Table_map &tables, Backup_stream &str)
{
enum { START, INIT, SYNC, FINAL, DONE } stage= START;
DBUG_ENTER("backup: get_se_images");
// get backup engine and backup driver
handlerton *hton= ha_resolve_by_name(current_thd,&archive_str);
Engine *be;
if( !hton )
{
DBUG_PRINT("backup",("Can't get the handlerton!"));
DBUG_RETURN(1);
};
if( !(hton->backup_engine) )
{
DBUG_PRINT("backup",("Engine doesn't support backup!"));
DBUG_RETURN(1);
};
if( hton->backup_engine(be) || !be )
{
DBUG_PRINT("backup", ("Can't get backup engine!") );
DBUG_RETURN(1);
};
DBUG_PRINT("backup", ("Got backup engine") );
backup_image.version= be->version();
uint streams= tables.count()+1; // number of streams in the image
// get a backup driver
Engine::Backup *drv;
if( be->get_backup(tables,drv) || !drv )
{
DBUG_PRINT("backup", ("Can't get backup driver!") );
be->free();
DBUG_RETURN(2);
};
DBUG_PRINT("backup", ("Got backup driver") );
// allocate buffer
size_t bufsize = drv->buffer_size();
if( bufsize > MAX_BUF_SIZE ) bufsize= MAX_BUF_SIZE;
Buffer buf(bufsize);
if( !buf )
{
DBUG_PRINT("backup", ("Error allocationg buffer") );
drv->free();
be->free();
DBUG_RETURN(3);
};
DBUG_PRINT("backup", ("Got a buffer") );
// initialize creation of backup image
if( drv->start_backup() )
{
DBUG_PRINT("backup", ("Error initializing backup!") );
drv->free();
be->free();
DBUG_RETURN(3);
};
// initial stage
stage= INIT;
DBUG_PRINT("backup", ("initial stage") );
uint errors= 0, res;
while( stage == INIT )
{
buf.reset();
res= drv->get_data(buf);
DBUG_PRINT("backup", (" got data packet of size %d for stream %d (res=%d)",buf.size,buf.stream_no,res) );
switch( res )
{
case INIT_DONE:
stage= SYNC;
case OK:
str[buf.stream_no] << buf;
if( buf.last ) // should not happen!
streams--;
errors= 0;
break;
case ERROR:
errors++;
if( errors > MAX_ERRORS )
{
DBUG_PRINT("backup", ("Error getting backup image!") );
drv->cancel();
drv->free();
be->free();
return 4;
};
case WAIT: case NOT_NOW:
break; // wait a bit?
};
};
// synchronization stage
stage= SYNC;
DBUG_PRINT("backup", ("synchronization stage") );
drv->lock();
drv->cont();
// final stage
stage= FINAL;
DBUG_PRINT("backup", ("final stage") );
while( stage == FINAL )
{
buf.reset();
res= drv->get_data(buf);
DBUG_PRINT("backup", (" got data packet of size %d for stream %d (res=%d)",buf.size,buf.stream_no,res) );
switch( res )
{
case INIT_DONE: // can't happen!
case OK:
str[buf.stream_no] << buf;
// FIXME: watch for ends of all streams
if( buf.last )
{
streams--;
DBUG_PRINT("backup", (" end of stream %d (%d left)",buf.stream_no,streams) );
if( !streams ) stage= DONE;
};
errors= 0;
break;
case ERROR:
errors++;
if( errors > MAX_ERRORS )
{
DBUG_PRINT("backup", ("Error getting backup image!") );
drv->cancel();
drv->free();
be->free();
return 4;
};
case WAIT: case NOT_NOW:
break;// wait a bit?
};
};
// backup ready!
DBUG_PRINT("backup", ("all table images collected") );
drv->free();
be->free();
DBUG_RETURN(0);
};
/*************************************************
*
* RESTORE
*
*************************************************/
/*
Create a single table using info stored in Table_data structure.
Drops the table if it exists.
*/
void restore_table(THD *thd, Table_data const &td);
int do_restore(THD *thd)
{
DBUG_ENTER("backup::do_restore");
DBUG_PRINT("backup",("backup image contains %d tables, %d bytes of table data",
backup_image.howmuch,backup_image.str.size()));
// create empty tables
for( unsigned i=0; i<backup_image.howmuch ; i++ )
{
Table_data &td = backup_image.meta[i];
DBUG_PRINT("backup", ( "restoring table %s (pos %d)",
td.name, i ) );
restore_table(thd,td);
};
// Get restore engine
handlerton *hton= ha_resolve_by_name(thd,&archive_str);
Engine *be;
if( !hton )
{
DBUG_PRINT("backup",("Can't get the handlerton!"));
DBUG_RETURN(1);
};
if( !(hton->backup_engine) )
{
DBUG_PRINT("backup",("Engine doesn't support backup!"));
DBUG_RETURN(1);
};
if( hton->backup_engine(be) || !be )
{
DBUG_PRINT("backup", ("Can't get backup engine!") );
DBUG_RETURN(1);
};
DBUG_PRINT("backup", ("Got backup engine") );
// collect list of tables for the driver
uint streams= backup_image.howmuch+1; // number of streams in the image
Table_map tables;
for( uint n=0 ; n < backup_image.howmuch ; ++n )
{
tables.add(Table_ref("test",backup_image.meta[n].name));
DBUG_PRINT("backup",("Restoring table %s <- %d",
tables[n].name().ptr(),
n+1));
};
// get a restore driver
Engine::Restore *drv;
if( be->get_restore(backup_image.version,tables,drv) || !drv )
{
DBUG_PRINT("backup", ("Can't get backup driver!") );
be->free();
DBUG_RETURN(2);
};
DBUG_PRINT("backup", ("Got backup driver") );
// initialize creation of backup image
if( drv->prepare() )
{
DBUG_PRINT("backup", ("Error initializing backup!") );
drv->free();
be->free();
DBUG_RETURN(3);
};
// pump data into driver
backup_image.str.rewind();
Buffer buf;
enum { SENDING, NEXT, DONE, ERROR } stage= NEXT;
uint errors= 0;
uint repeats= 0;
while( stage != DONE && stage != ERROR )
{
if( stage == NEXT )
{
backup_image.str >>buf; // realocate buffer if too small
if( buf.size == 0 ) // end of data in the stream (or error)
{
stage= DONE;
break;
};
};
DBUG_PRINT("backup",("Got %d bytes from stream %d",buf.size,buf.stream_no));
switch( drv->send_data(buf) )
{
case backup::OK:
stage= NEXT;
repeats= 0;
break;
case backup::ERROR:
if( errors > MAX_ERRORS )
stage= ERROR;
errors++;
break;
case backup::WAIT:
case backup::NOT_NOW:
stage= SENDING;
default:
if( repeats > 7 )
stage= ERROR;
repeats++;
break;
};
}; // data sending loop
DBUG_PRINT("backup",("Data sent"));
// finalize restore
drv->cont();
drv->free();
be->free();
if( stage == ERROR )
goto error;
ok:
send_ok(thd);
DBUG_RETURN(0);
error:
my_error(ER_NO,MYF(0),"ala","ala");
DBUG_RETURN(1);
};
/*
* restore_table() -- creating empty table from data in Table_data structure.
* If table with the same name exists, it is removed.
*/
/*
Create list of table's field descriptions as required by
mysql_create_table().
*/
int setup_fields(THD *thd, const Table_data &td, List<create_field> &fields);
/*
Create list describing keys of a table as required by
mysql_create_table(). (not implemented)
*/
int setup_keys(THD *thd, const Table_data &td, List<Key> &keys);
void restore_table(THD *thd, const Table_data &td)
{
int err;
TABLE_LIST tables;
bzero( (char *)&tables, sizeof(TABLE_LIST));
tables.db= const_cast<char*>("test");
tables.alias= tables.table_name = const_cast<char*>(td.name);
// remove the table if it exists
DBUG_PRINT("backup",(" trying to drop table %s",td.name));
err= mysql_rm_table_part2_with_lock(thd, &tables, TRUE, FALSE, TRUE); // = DROP TABLE IF EXISTS
DBUG_PRINT("backup",(" result= %d",err));
HA_CREATE_INFO info;
bzero( &info, sizeof(HA_CREATE_INFO) );
List<create_field> fields;
List<Key> keys;
setup_fields(thd,td,fields);
setup_keys(thd,td,keys);
handlerton *hton= ha_resolve_by_name(thd,&archive_str);
if( !hton )
{
DBUG_PRINT("backup",("Can't find handlerton for SE 'archive'"));
return;
};
info.db_type= hton;
DBUG_PRINT("backup",(" calling mysql_create_table"));
err= mysql_create_table(thd, "test", td.name, &info, fields, keys, FALSE, 0, FALSE);
DBUG_PRINT("backup",(" result= %d",err));
};
/*
Class Field_create extends the create_field structure used by mysql_create_table
with a constructor initializing it using information stored in a Field_save object.
*/
/* TODO/Limitations:
- support enum and set type (interval_list prop, string_list: in sql_yacc.yy)
- support spatial types,
- (re)create tables so that SHOW CREATE displays the same things as in the original,
- column flags and options
*/
struct Field_create: public ::create_field
{
Field_create(THD* thd, const Field_data& data);
private:
LEX_STRING comment;
char* itoa(int n); // convert integer to C-string (dynamically alocated)
};
int setup_fields(THD *thd, const Table_data &td, List<create_field> &fields)
{
for(uint field=0; field < td.fields_no; field++)
{
Field_create* f= new Field_create(thd,td.fields[field]);
// DBUG_PRINT("backup",(" got field %d: %s",field,f->field_name));
fields.push_back(f);
};
return 0;
};
// Not implemented
int setup_keys(THD *thd, const Table_data &td, List<Key> &keys)
{
return 0;
};
// TODO: Use select condition to get only selected rows from a table
// CHECK: prepare_select_* functions (sql_help.cc)
TABLE* get_schema_table(THD *thd, ST_SCHEMA_TABLE *st)
{
TABLE *t;
TABLE_LIST arg;
bzero( &arg, sizeof(TABLE_LIST) );
/* set context for create_schema_table call */
arg.schema_table= st;
arg.alias= NULL;
arg.select_lex= NULL;
t= create_schema_table(thd,&arg); // Q: who deallocates *t ?
if( !t ) return NULL; // error!
/*
Temporarily set thd->lex->wild to NULL to keep st->fill_table
happy :)
*/
LEX local_lex= *thd->lex;
LEX *saved_lex= thd->lex;
thd->lex= &local_lex;
local_lex.wild = NULL;
local_lex.sql_command = enum_sql_command(0);
// Note: set thd->lex->sql_command to something neutral (see, make_db_list/get_index_file_values).
/* context for fill_table */
arg.table= t;
st->fill_table(thd,&arg,NULL); // NULL = no select condition
/* restore wild field */
thd->lex= saved_lex;
return t;
};
// Implementation of Field_create class
Field_create::Field_create(THD* thd, const Field_data& data)
{
char* ptr= data.pool;
char* name= ptr;
ptr+= strlen(name)+1;
comment.str= ptr;
comment.length= strlen(comment.str);
init(thd,
name,
data.type,
itoa(data.length),
itoa(data.decimals),
0, // fld_type_modifiers ?
NULL, // fld_default_value (TBD)
NULL, // fld_on_update_value (ignore)
&comment,
NULL, // fld_change (ignore)
NULL, // List<String>* fld_interval_list ?
NULL, //&charset,
0 ); // fld_geom_type ?
};
// CHECK: longlong2str
char* Field_create::itoa(int n)
{
char buf[16];
sprintf(buf,"%d",n);
// char *ptr = my_malloc(strlen(buf)+1,MYF(0));
char* ptr = new char[strlen(buf)+1];
strcpy(ptr,buf);
return ptr;
};
// Implementation of Table_map
Table_map::Table_map(): m_head(NULL),m_last(NULL),m_count(0) {};
Table_map::~Table_map()
{
for( node *ptr= m_head ; ptr ; )
{
node *n=ptr;
ptr= n->next;
delete n;
};
};
void Table_map::add(const Table_ref &t)
{
node *n= new node(t);
if( m_head == NULL )
{
m_count=1;
m_head= m_last= n;
}
else
{
m_count++;
m_last->next= n;
m_last= n;
};
};
const Table_ref &Table_map::operator[](uint pos) const
{
DBUG_ASSERT(pos < m_count);
node *ptr;
for( ptr= m_head ; ptr && pos ; ptr= ptr->next )
pos--;
if( ptr )
return ptr->tbl;
else
return m_last->tbl; // can crash if m_last == NULL
};
// Implementation of Table_ref and Db_ref
Table_ref &Table_ref::operator=(const TABLE_LIST &t)
{
m_name.set( STRING_WITH_LEN(t.table_name), table_alias_charset );
m_db= t.db;
return *this;
};
const char *Table_ref::path() const
{
static char buf[FN_REFLEN]= "";
// if( buf[0] == '\0' )
build_table_filename( buf, sizeof(buf)-1,
m_db.name().ptr(),
m_name.ptr(),
"", 0 );
return buf;
};
const String &Db_ref::catalog() const
{
return my_null_string;
};
// In memory stream implementation
static byte image_data[Backup_stream::DATA_LEN];
Backup_image::Backup_image(): str(image_data), howmuch(0) {};
Backup_image::~Backup_image()
{
while( howmuch )
meta[--howmuch].free();
};
void Backup_image::add(const Table_data &td)
{
if( howmuch >= 10 ) return;
meta[howmuch++]= td;
};
Backup_stream &Backup_stream::operator<<(const Buffer &buf)
{
DBUG_PRINT("backup",( "adding %d bytes to backup image (str %d)",
buf.size, stream ));
if( buf.size > 0 && pos+sizeof(header)+buf.size < DATA_LEN )
{
header hdr(buf);
*reinterpret_cast<header*>(data+pos)= hdr;
pos += sizeof(header);
memcpy(data+pos,buf.data,buf.size);
pos += buf.size;
};
return *this;
};
Backup_stream &Backup_stream::operator>>(Buffer &buf)
{
DBUG_PRINT("backup",( "adding %d bytes to backup image (str %d)",
buf.size, stream ));
header hdr;
uint buflen= buf.size;
buf.size= 0;
if( pos+sizeof(header) >= DATA_LEN )
return *this;
hdr= *reinterpret_cast<header*>(data+pos);
pos+= sizeof(header);
if( !hdr.size )
return *this;
if( pos+hdr.size >= DATA_LEN )
return *this;
if( buflen == 0 || buflen < hdr.size ) // reallocate buffer
buf.realloc(hdr.size);
if( !buf ) // no space for buffer
return *this;
buf.size= hdr.size;
buf.stream_no= hdr.stream;
memcpy(buf.data,data+pos,hdr.size);
pos += hdr.size;
return *this;
};
}; // backup namespace
/*
Notes
Check type: Table_ident (sql_parse.cc)
*/
| Thread |
|---|
| • bk commit into 5.1 tree (rafal:1.2351) | rsomla | 25 Oct |