#At file:///home/svoj/devel/mysql/WL4037/mysql-6.0-backup/
2639 Sergey Vojtovich 2008-06-25
WL#4037 - Online backup: Use zlib compression to reduce backup
file size
This patch implements backup image compression/decompression for
backup and restore.
added:
mysql-test/r/backup_compression.result
mysql-test/t/backup_compression.test
modified:
sql/backup/Makefile.am
sql/backup/backup_kernel.h
sql/backup/kernel.cc
sql/backup/stream.cc
sql/backup/stream.h
sql/lex.h
sql/share/errmsg.txt
sql/sql_lex.h
sql/sql_yacc.yy
per-file messages:
mysql-test/r/backup_compression.result
A test case for WL#4037.
mysql-test/t/backup_compression.test
A test case for WL#4037.
sql/backup/Makefile.am
Added zlib headers path to INCLUDES.
sql/backup/backup_kernel.h
Added compression switch argument to prepare_for_backup().
sql/backup/kernel.cc
Added compression switch argument to prepare_for_backup().
Pass through compression switch from lex to Output_stream
constructor.
sql/backup/stream.cc
Updated stream_write() and stream_read() functions, so they
optionally compress/decompress stream (this is done if
m_with_compression switch is true).
As Stream::rewind() is never used, compressed stream cannot
be rewound in this version. Updated Stream::rewind() method,
so it returns an error when attempting to rewind compressed
stream.
Updated functions that handle backup image headers, so that
they can read/write headers from/to compressed stream.
Initialize inflate/deflate when opening Input/Output
stream.
Deinitialize inflate/deflate when closing Input/Output
stream.
sql/backup/stream.h
Include zlib header.
Added new elements to fd_stream structure:
m_header_buf - buffer for image headers
m_with_compression - compression switch
zstream - zlib descriptor
zbuf - zlib input/output buffer
Added compression switch argument to Output_stream
constructor.
sql/lex.h
Added "COMPRESSION" and "COMPRESSION_ALGORITHM" symbols.
sql/share/errmsg.txt
Added error message for failure to initialize compression
library in backup stream.
sql/sql_lex.h
Added backup_compression switch.
sql/sql_yacc.yy
Extended BACKUP statement syntax according to WL#4037:
BACKUP DATABASE ...
[ WITH COMPRESSION [ COMPRESSION_ALGORITHM = algorithm_name ] ]
...;
=== added file 'mysql-test/r/backup_compression.result'
--- a/mysql-test/r/backup_compression.result 1970-01-01 00:00:00 +0000
+++ b/mysql-test/r/backup_compression.result 2008-06-25 13:30:04 +0000
@@ -0,0 +1,23 @@
+CREATE DATABASE db1;
+CREATE TABLE db1.t1(a INT);
+BACKUP DATABASE db1 TO 'db1.bak.gz' WITH COMPRESSION;
+backup_id
+#
+CREATE TABLE db1.t2(a INT);
+BACKUP DATABASE db1 TO 'db2.bak.gz' WITH COMPRESSION COMPRESSION_ALGORITHM=gzip;
+backup_id
+#
+RESTORE FROM 'db1.bak.gz';
+backup_id
+#
+SHOW TABLES FROM db1;
+Tables_in_db1
+t1
+RESTORE FROM 'db2.bak.gz';
+backup_id
+#
+SHOW TABLES FROM db1;
+Tables_in_db1
+t1
+t2
+DROP DATABASE db1;
=== added file 'mysql-test/t/backup_compression.test'
--- a/mysql-test/t/backup_compression.test 1970-01-01 00:00:00 +0000
+++ b/mysql-test/t/backup_compression.test 2008-06-25 13:30:04 +0000
@@ -0,0 +1,24 @@
+--source include/not_embedded.inc
+--source include/have_compress.inc
+
+CREATE DATABASE db1;
+
+CREATE TABLE db1.t1(a INT);
+replace_column 1 #;
+BACKUP DATABASE db1 TO 'db1.bak.gz' WITH COMPRESSION;
+
+CREATE TABLE db1.t2(a INT);
+replace_column 1 #;
+BACKUP DATABASE db1 TO 'db2.bak.gz' WITH COMPRESSION COMPRESSION_ALGORITHM=gzip;
+
+replace_column 1 #;
+RESTORE FROM 'db1.bak.gz';
+SHOW TABLES FROM db1;
+
+replace_column 1 #;
+RESTORE FROM 'db2.bak.gz';
+SHOW TABLES FROM db1;
+
+DROP DATABASE db1;
+--remove_file $MYSQLTEST_VARDIR/master-data/db1.bak.gz
+--remove_file $MYSQLTEST_VARDIR/master-data/db2.bak.gz
=== modified file 'sql/backup/Makefile.am'
--- a/sql/backup/Makefile.am 2008-05-14 00:24:06 +0000
+++ b/sql/backup/Makefile.am 2008-06-25 13:30:04 +0000
@@ -20,6 +20,7 @@
noinst_LTLIBRARIES = libbackup.la libbackupstream.la
INCLUDES = \
+ @ZLIB_INCLUDES@ \
-I$(top_builddir)/include \
-I$(top_srcdir)/include \
-I$(top_srcdir)/sql \
=== modified file 'sql/backup/backup_kernel.h'
--- a/sql/backup/backup_kernel.h 2008-05-05 15:06:40 +0000
+++ b/sql/backup/backup_kernel.h 2008-06-25 13:30:04 +0000
@@ -66,7 +66,7 @@ class Backup_restore_ctx: public backup:
bool is_valid() const;
ulonglong op_id() const;
- Backup_info* prepare_for_backup(LEX_STRING location, const char*);
+ Backup_info* prepare_for_backup(LEX_STRING location, const char*, bool);
Restore_info* prepare_for_restore(LEX_STRING location, const char*);
int do_backup();
=== modified file 'sql/backup/kernel.cc'
--- a/sql/backup/kernel.cc 2008-05-21 10:45:55 +0000
+++ b/sql/backup/kernel.cc 2008-06-25 13:30:04 +0000
@@ -142,7 +142,8 @@ execute_backup_command(THD *thd, LEX *le
{
// prepare for backup operation
- Backup_info *info= context.prepare_for_backup(lex->backup_dir, thd->query);
+ Backup_info *info= context.prepare_for_backup(lex->backup_dir, thd->query,
+ lex->backup_compression);
// reports errors
if (!info || !info->is_valid())
@@ -447,6 +448,7 @@ int Backup_restore_ctx::prepare(LEX_STRI
@param[in] location path to the file where backup image should be stored
@param[in] query BACKUP query starting the operation
+ @param[in] with_compression backup image compression switch
@returns Pointer to a @c Backup_info instance which can be used for selecting
which objects to backup. NULL if an error was detected.
@@ -459,7 +461,8 @@ int Backup_restore_ctx::prepare(LEX_STRI
is performed using @c do_backup() method.
*/
Backup_info*
-Backup_restore_ctx::prepare_for_backup(LEX_STRING location, const char *query)
+Backup_restore_ctx::prepare_for_backup(LEX_STRING location, const char *query,
+ bool with_compression)
{
using namespace backup;
@@ -488,7 +491,7 @@ Backup_restore_ctx::prepare_for_backup(L
Open output stream.
*/
- Output_stream *s= new Output_stream(*this, path);
+ Output_stream *s= new Output_stream(*this, path, with_compression);
if (!s)
{
=== modified file 'sql/backup/stream.cc'
--- a/sql/backup/stream.cc 2008-03-04 16:06:28 +0000
+++ b/sql/backup/stream.cc 2008-06-25 13:30:04 +0000
@@ -3,6 +3,10 @@
#include "backup_stream.h"
#include "stream.h"
+#ifdef HAVE_COMPRESS
+#define ZBUF_SIZE 65536 // compression I/O buffer size
+#endif
+
const unsigned char backup_magic_bytes[8]=
{
0xE0, // ###.....
@@ -23,6 +27,8 @@ namespace backup {
Pointer to this function is stored in @c backup_stream::stream structure
and then used by other stream library function for physical writing of
data.
+
+ Performs stream compression if requested.
*/
extern "C" int stream_write(void *instance, bstream_blob *buf, bstream_blob)
{
@@ -46,12 +52,34 @@ extern "C" int stream_write(void *instan
DBUG_ASSERT(buf->end);
size_t howmuch = buf->end - buf->begin;
+#ifdef HAVE_COMPRESS
+ if (s->m_with_compression)
+ {
+ z_stream *zstream= &s->zstream;
+ zstream->next_in= buf->begin;
+ zstream->avail_in= howmuch;
+ do
+ {
+ if (!zstream->avail_out)
+ {
+ if (my_write(fd, s->zbuf, ZBUF_SIZE, MYF(MY_NABP)))
+ DBUG_RETURN(BSTREAM_ERROR);
+ zstream->next_out= s->zbuf;
+ zstream->avail_out= ZBUF_SIZE;
+ }
+ if (deflate(zstream, Z_NO_FLUSH) != Z_OK)
+ DBUG_RETURN(BSTREAM_ERROR);
+ } while (zstream->avail_in);
+ }
+ else
+#endif
+ {
+ res= my_write(fd, buf->begin, howmuch,
+ MY_NABP /* error if not all bytes written */ );
- res= my_write(fd, buf->begin, howmuch,
- MY_NABP /* error if not all bytes written */ );
-
- if (res)
- DBUG_RETURN(BSTREAM_ERROR);
+ if (res)
+ DBUG_RETURN(BSTREAM_ERROR);
+ }
s->bytes += howmuch;
@@ -65,6 +93,8 @@ extern "C" int stream_write(void *instan
Pointer to this function is stored in @c backup_stream::stream structure
and then used by other stream library function for physical reading of
data.
+
+ Performs stream decompression if requested.
*/
extern "C" int stream_read(void *instance, bstream_blob *buf, bstream_blob)
{
@@ -88,8 +118,40 @@ extern "C" int stream_read(void *instanc
DBUG_ASSERT(buf->end);
howmuch= buf->end - buf->begin;
-
- howmuch= my_read(fd, buf->begin, howmuch, MYF(0));
+#ifdef HAVE_COMPRESS
+ if (s->m_with_compression)
+ {
+ int zerr;
+ z_stream *zstream= &s->zstream;
+ zstream->next_out= buf->begin;
+ zstream->avail_out= howmuch;
+ do
+ {
+ if (!zstream->avail_in)
+ {
+ zstream->avail_in= my_read(fd, s->zbuf, ZBUF_SIZE, MYF(0));
+ if (zstream->avail_in == (size_t) -1)
+ DBUG_RETURN(BSTREAM_ERROR);
+ else if (!zstream->avail_in)
+ break;
+ zstream->next_in= s->zbuf;
+ }
+ zerr= inflate(zstream, Z_NO_FLUSH);
+ if (zerr == Z_STREAM_END)
+ {
+ howmuch= zstream->next_out - buf->begin;
+ break;
+ }
+ else if (zerr != Z_OK)
+ DBUG_RETURN(BSTREAM_ERROR);
+ howmuch= zstream->next_out - buf->begin;
+ } while (zstream->avail_out);
+ }
+ else
+#endif
+ {
+ howmuch= my_read(fd, buf->begin, howmuch, MYF(0));
+ }
/*
How to detect EOF when reading bytes with my_read().
@@ -142,13 +204,20 @@ void Stream::close()
bool Stream::rewind()
{
+#ifdef HAVE_COMPRESS
+ /* Compressed stream cannot be rewound */
+ if (m_with_compression)
+ return FALSE;
+#endif
return m_fd >= 0 && my_seek(m_fd, 0, SEEK_SET, MYF(0)) == 0;
}
-Output_stream::Output_stream(Logger &log, const ::String &name)
+Output_stream::Output_stream(Logger &log, const ::String &name,
+ bool with_compression)
:Stream(log, name, O_WRONLY|O_CREAT|O_EXCL|O_TRUNC)
{
+ m_with_compression= with_compression;
stream.write= stream_write;
m_block_size=0; // use default block size provided by the backup stram library
}
@@ -163,7 +232,7 @@ Output_stream::Output_stream(Logger &log
int Output_stream::write_magic_and_version()
{
byte buf[10];
-
+ bstream_blob blob;
DBUG_ASSERT(m_fd >= 0);
memmove(buf, backup_magic_bytes, 8);
@@ -171,9 +240,10 @@ int Output_stream::write_magic_and_versi
buf[8]= 0x01;
buf[9]= 0x00;
- int ret= my_write(m_fd, buf, 10,
- MY_NABP /* error if not all bytes written */ );
- if (ret)
+ blob.begin= buf;
+ blob.end= buf + 10;
+ int ret= stream_write((fd_stream*)this, &blob, blob);
+ if (ret != BSTREAM_OK)
return -1; // error when writing magic bytes
else
return 10;
@@ -221,6 +291,35 @@ bool Output_stream::open()
if (!ret)
return FALSE;
+ if (m_with_compression)
+ {
+#ifdef HAVE_COMPRESS
+ int zerr;
+ if (!(zbuf= (uchar*) my_malloc(ZBUF_SIZE, MYF(0))))
+ {
+ m_log.report_error(ER_OUTOFMEMORY, ZBUF_SIZE);
+ return FALSE;
+ }
+ zstream.zalloc= 0;
+ zstream.zfree= 0;
+ zstream.opaque= 0;
+ zstream.msg= 0;
+ zstream.next_out= zbuf;
+ zstream.avail_out= ZBUF_SIZE;
+ if ((zerr= deflateInit2(&zstream, Z_DEFAULT_COMPRESSION, Z_DEFLATED,
+ MAX_WBITS + 16, MAX_MEM_LEVEL,
+ Z_DEFAULT_STRATEGY) != Z_OK))
+ {
+ m_log.report_error(ER_BACKUP_FAILED_TO_INIT_COMPRESSION,
+ zerr, zstream.msg);
+ return FALSE;
+ }
+#else
+ m_log.report_error(ER_FEATURE_DISABLED, "compression", "--with-zlib-dir");
+ return FALSE;
+#endif
+ }
+
return init();
}
@@ -235,6 +334,34 @@ void Output_stream::close()
return;
bstream_close(this);
+#ifdef HAVE_COMPRESS
+ if (m_with_compression)
+ {
+ int zerr;
+ zstream.avail_in= 0;
+ zstream.next_in= 0;
+ do
+ {
+ zerr= deflate(&zstream, Z_FINISH);
+ if (zerr != Z_STREAM_END && zerr != Z_OK)
+ {
+ m_log.report_error(ER_GET_ERRMSG, zerr, zstream.msg, "deflate");
+ break;
+ }
+ if (my_write(m_fd, zbuf, ZBUF_SIZE - zstream.avail_out,
+ MYF(MY_NABP)))
+ {
+ m_log.report_error(ER_GET_ERRMSG, my_errno, "", "my_write");
+ break;
+ }
+ zstream.next_out= zbuf;
+ zstream.avail_out= ZBUF_SIZE;
+ } while (zerr != Z_STREAM_END);
+ if ((zerr= deflateEnd(&zstream)) != Z_OK)
+ m_log.report_error(ER_GET_ERRMSG, zerr, zstream.msg, "deflateEnd");
+ my_free(zbuf, MYF(0));
+ }
+#endif
Stream::close();
}
@@ -274,19 +401,12 @@ Input_stream::Input_stream(Logger &log,
*/
int Input_stream::check_magic_and_version()
{
- byte buf[10];
-
DBUG_ASSERT(m_fd >= 0);
- int ret= my_read(m_fd, buf, 10,
- MY_NABP /* error if not all bytes read */ );
- if (ret)
- return -1; // couldn't read magic bytes
-
- if (memcmp(buf, backup_magic_bytes, 8))
+ if (memcmp(m_header_buf, backup_magic_bytes, 8))
return -1; // wrong magic bytes
- unsigned int ver = buf[8] + (buf[9]<<8);
+ unsigned int ver = m_header_buf[8] + (m_header_buf[9]<<8);
if (ver != 1)
return -1; // unsupported format version
@@ -321,6 +441,14 @@ bool Input_stream::init()
/**
Open backup stream for reading.
+ @details This method can detect and open compressed streams. In that case
+ stream is initialized for decompression so that stream_read() function will
+ return decompressed data.
+
+ The first 10 bytes in the stream (whether compressed or not) are not
+ available for reading with stream_read(). Instead, they are stored in
+ m_header_buf member and examined by check_magic_and_version().
+
@retval TRUE operation succeeded
@retval FALSE operation failed
@@ -335,6 +463,40 @@ bool Input_stream::open()
if (!ret)
return FALSE;
+ if (my_read(m_fd, m_header_buf, sizeof(m_header_buf),
+ MY_NABP /* error if not all bytes read */ ))
+ return FALSE;
+
+#ifdef HAVE_COMPRESS
+ if (!memcmp(m_header_buf, "\x1f\x8b\x08", 3))
+ {
+ int zerr;
+ bstream_blob blob;
+ if (!(zbuf= (uchar*) my_malloc(ZBUF_SIZE, MYF(0))))
+ {
+ m_log.report_error(ER_OUTOFMEMORY, ZBUF_SIZE);
+ return FALSE;
+ }
+ zstream.zalloc= 0;
+ zstream.zfree= 0;
+ zstream.opaque= 0;
+ zstream.msg= 0;
+ zstream.next_in= zbuf;
+ zstream.avail_in= 10;
+ memcpy(zbuf, m_header_buf, 10);
+ if ((zerr= inflateInit2(&zstream, MAX_WBITS + 16)) != Z_OK)
+ {
+ m_log.report_error(ER_GET_ERRMSG, zerr, zstream.msg, "inflateInit2");
+ return FALSE;
+ }
+ blob.begin= m_header_buf;
+ blob.end= m_header_buf + 10;
+ if (stream_read((fd_stream*) this, &blob, blob) != BSTREAM_OK ||
+ blob.begin != blob.end)
+ return FALSE;
+ }
+#endif
+
return init();
}
@@ -349,6 +511,15 @@ void Input_stream::close()
return;
bstream_close(this);
+#ifdef HAVE_COMPRESS
+ if (m_with_compression)
+ {
+ int zerr;
+ if ((zerr= inflateEnd(&zstream)) != Z_OK)
+ m_log.report_error(ER_GET_ERRMSG, zerr, zstream.msg, "inflateEnd");
+ my_free(zbuf, (MYF(0)));
+ }
+#endif
Stream::close();
}
=== modified file 'sql/backup/stream.h'
--- a/sql/backup/stream.h 2008-03-04 16:06:28 +0000
+++ b/sql/backup/stream.h 2008-06-25 13:30:04 +0000
@@ -7,6 +7,9 @@
#include <backup/image_info.h>
#include <backup/debug.h> // for definition of DBUG_BACKUP
#include <backup/logger.h>
+#ifdef HAVE_COMPRESS
+#include <zlib.h>
+#endif
/**
@file
@@ -55,6 +58,12 @@ struct fd_stream: public backup_stream
{
int m_fd;
size_t bytes;
+ uchar m_header_buf[10];
+ bool m_with_compression;
+#ifdef HAVE_COMPRESS
+ z_stream zstream;
+ uchar *zbuf;
+#endif
fd_stream() :m_fd(-1), bytes(0) {}
};
@@ -101,7 +110,7 @@ class Output_stream:
{
public:
- Output_stream(Logger&, const ::String&);
+ Output_stream(Logger&, const ::String&, bool);
bool open();
void close();
=== modified file 'sql/lex.h'
--- a/sql/lex.h 2008-04-01 15:13:57 +0000
+++ b/sql/lex.h 2008-06-25 13:30:04 +0000
@@ -119,6 +119,8 @@ static SYMBOL symbols[] = {
{ "COMPACT", SYM(COMPACT_SYM)},
{ "COMPLETION", SYM(COMPLETION_SYM)},
{ "COMPRESSED", SYM(COMPRESSED_SYM)},
+ { "COMPRESSION", SYM(COMPRESSION_SYM)},
+ { "COMPRESSION_ALGORITHM", SYM(COMPRESSION_ALGORITHM_SYM)},
{ "CONCURRENT", SYM(CONCURRENT)},
{ "CONDITION", SYM(CONDITION_SYM)},
{ "CONNECTION", SYM(CONNECTION_SYM)},
=== modified file 'sql/share/errmsg.txt'
--- a/sql/share/errmsg.txt 2008-06-04 13:20:03 +0000
+++ b/sql/share/errmsg.txt 2008-06-25 13:30:04 +0000
@@ -6359,4 +6359,5 @@ ER_DEBUG_SYNC_TIMEOUT
ER_DEBUG_SYNC_HIT_LIMIT
eng "debug sync point hit limit reached"
ger "Debug Sync Point Hit Limit erreicht"
-
+ER_BACKUP_FAILED_TO_INIT_COMPRESSION
+ eng "Could not initialize compression of backup image (function deflateInit2 returned error code %d: %-.64s)"
=== modified file 'sql/sql_lex.h'
--- a/sql/sql_lex.h 2008-06-04 13:20:03 +0000
+++ b/sql/sql_lex.h 2008-06-25 13:30:04 +0000
@@ -1526,6 +1526,7 @@ typedef struct st_lex : public Query_tab
LEX_STRING name;
char *help_arg;
LEX_STRING backup_dir; /* For RESTORE/BACKUP */
+ bool backup_compression;
char* to_log; /* For PURGE MASTER LOGS TO */
char* x509_subject,*x509_issuer,*ssl_cipher;
String *wild;
=== modified file 'sql/sql_yacc.yy'
--- a/sql/sql_yacc.yy 2008-06-04 13:20:03 +0000
+++ b/sql/sql_yacc.yy 2008-06-25 13:30:04 +0000
@@ -684,6 +684,8 @@ bool my_yyoverflow(short **a, YYSTYPE **
%token COMPACT_SYM
%token COMPLETION_SYM
%token COMPRESSED_SYM
+%token COMPRESSION_SYM
+%token COMPRESSION_ALGORITHM_SYM
%token CONCURRENT
%token CONDITION_SYM /* SQL-2003-N */
%token CONNECTION_SYM
@@ -1407,7 +1409,8 @@ bool my_yyoverflow(short **a, YYSTYPE **
install uninstall partition_entry binlog_base64_event
init_key_options key_options key_opts key_opt key_using_alg
server_def server_options_list server_option
- definer_opt no_definer definer
+ definer_opt no_definer definer opt_compression
+ opt_compression_algorithm
END_OF_INPUT
%type <NONE> call sp_proc_stmts sp_proc_stmts1 sp_proc_stmt
@@ -6305,6 +6308,7 @@ backup:
database_list
TO_SYM
TEXT_STRING_sys
+ opt_compression
{
LEX *lex= Lex;
if (lex->sphead)
@@ -6324,6 +6328,26 @@ backup:
}
;
+opt_compression:
+ /* empty */ {}
+ | WITH COMPRESSION_SYM opt_compression_algorithm
+ {
+ Lex->backup_compression= true;
+ }
+ ;
+
+opt_compression_algorithm:
+ /* empty */ {}
+ | COMPRESSION_ALGORITHM_SYM opt_equal IDENT_sys
+ {
+ if (my_strcasecmp(system_charset_info, $3.str, "gzip"))
+ {
+ my_error(ER_WRONG_ARGUMENTS, MYF(0), "COMPRESSION_ALGORITHM");
+ MYSQL_YYABORT;
+ }
+ }
+ ;
+
database_list:
'*'
{
@@ -10933,6 +10957,8 @@ keyword_sp:
| COMPACT_SYM {}
| COMPLETION_SYM {}
| COMPRESSED_SYM {}
+ | COMPRESSION_SYM {}
+ | COMPRESSION_ALGORITHM_SYM{}
| CONCURRENT {}
| CONNECTION_SYM {}
| CONSISTENT_SYM {}
| Thread |
|---|
| • bzr commit into mysql-6.0-backup branch (svoj:2639) WL#4037 | Sergey Vojtovich | 25 Jun |