Hi Svoj,
The code looks good. I have some requests concerning documentation and error
messages - see below.
Sergey Vojtovich wrote:
> #At file:///home/svoj/devel/mysql/WL4037/mysql-6.0-backup/
>
> 2638 Sergey Vojtovich 2008-06-16
> WL#4037 - Online backup: Use zlib compression to reduce backup
> file size
>
> This patch implements backup image compression/decompression for
> backup and restore.
> added:
> mysql-test/r/backup_compression.result
> mysql-test/t/backup_compression.test
> modified:
> sql/backup/backup_kernel.h
> sql/backup/kernel.cc
> sql/backup/stream.cc
> sql/backup/stream.h
> sql/lex.h
> sql/sql_lex.h
> sql/sql_yacc.yy
>
> per-file messages:
> mysql-test/r/backup_compression.result
> A test case for WL#4037.
> mysql-test/t/backup_compression.test
> A test case for WL#4037.
> sql/backup/backup_kernel.h
> Added compression switch argument to prepare_for_backup().
> sql/backup/kernel.cc
> Added compression switch argument to prepare_for_backup().
> Pass through compression switch from lex to Output_stream
> constructor.
> sql/backup/stream.cc
> Updated stream_write() and stream_read() functions, so they
> optionally compress/decompress stream (this is done if
> m_with_compression switch is true).
>
> As Stream::rewind() is never used, compressed stream cannot
> be rewound in this version. Updated Stream::rewind() method,
> so it returns an error when attempting to rewind compressed
> stream.
>
> Updated functions that handle backup image headers, so that
> they can read/write headers from/to compressed stream.
>
> Initialize inflate/deflate when opening Input/Output
> stream.
>
> Deinitialize inflate/deflate when closing Input/Output
> stream.
> sql/backup/stream.h
> Include zlib header.
> Added new elements to fd_stream structure:
> m_header_buf - buffer for image headers
> m_with_compression - compression switch
> zstream - zlib descriptor
> zbuf - zlib input/output buffer
>
> Added compression switch argument to Output_stream
> constructor.
> sql/lex.h
> Added "COMPRESSION" and "COMPRESSION_ALGORITHM" symbols.
> sql/sql_lex.h
> Added backup_compression switch.
> sql/sql_yacc.yy
> Extended BACKUP statement syntax according to WL#4037:
> BACKUP DATABASE ...
> [ WITH COMPRESSION [ COMPRESSION_ALGORITHM = algorithm_name ] ]
> ...;
> === added file 'mysql-test/r/backup_compression.result'
> --- a/mysql-test/r/backup_compression.result 1970-01-01 00:00:00 +0000
> +++ b/mysql-test/r/backup_compression.result 2008-06-16 15:06:09 +0000
> @@ -0,0 +1,23 @@
> +CREATE DATABASE db1;
> +CREATE TABLE db1.t1(a INT);
> +BACKUP DATABASE db1 TO 'db1.bak.gz' WITH COMPRESSION;
> +backup_id
> +#
> +CREATE TABLE db1.t2(a INT);
> +BACKUP DATABASE db1 TO 'db2.bak.gz' WITH COMPRESSION COMPRESSION_ALGORITHM=gzip;
> +backup_id
> +#
> +RESTORE FROM 'db1.bak.gz';
> +backup_id
> +#
> +SHOW TABLES FROM db1;
> +Tables_in_db1
> +t1
> +RESTORE FROM 'db2.bak.gz';
> +backup_id
> +4
> +SHOW TABLES FROM db1;
> +Tables_in_db1
> +t1
> +t2
> +DROP DATABASE db1;
>
> === added file 'mysql-test/t/backup_compression.test'
> --- a/mysql-test/t/backup_compression.test 1970-01-01 00:00:00 +0000
> +++ b/mysql-test/t/backup_compression.test 2008-06-16 15:06:09 +0000
> @@ -0,0 +1,23 @@
> +--source include/not_embedded.inc
> +--source include/have_compress.inc
> +
> +CREATE DATABASE db1;
> +
> +CREATE TABLE db1.t1(a INT);
> +replace_column 1 #;
> +BACKUP DATABASE db1 TO 'db1.bak.gz' WITH COMPRESSION;
> +
> +CREATE TABLE db1.t2(a INT);
> +replace_column 1 #;
> +BACKUP DATABASE db1 TO 'db2.bak.gz' WITH COMPRESSION COMPRESSION_ALGORITHM=gzip;
> +
> +replace_column 1 #;
> +RESTORE FROM 'db1.bak.gz';
> +SHOW TABLES FROM db1;
> +
> +RESTORE FROM 'db2.bak.gz';
> +SHOW TABLES FROM db1;
> +
> +DROP DATABASE db1;
> +--remove_file $MYSQLTEST_VARDIR/master-data/db1.bak.gz
> +--remove_file $MYSQLTEST_VARDIR/master-data/db2.bak.gz
>
> === modified file 'sql/backup/backup_kernel.h'
> --- a/sql/backup/backup_kernel.h 2008-05-05 15:06:40 +0000
> +++ b/sql/backup/backup_kernel.h 2008-06-16 15:06:09 +0000
> @@ -66,7 +66,7 @@ class Backup_restore_ctx: public backup:
> bool is_valid() const;
> ulonglong op_id() const;
>
> - Backup_info* prepare_for_backup(LEX_STRING location, const char*);
> + Backup_info* prepare_for_backup(LEX_STRING location, const char*, bool);
> Restore_info* prepare_for_restore(LEX_STRING location, const char*);
>
> int do_backup();
>
> === modified file 'sql/backup/kernel.cc'
> --- a/sql/backup/kernel.cc 2008-05-21 10:45:55 +0000
> +++ b/sql/backup/kernel.cc 2008-06-16 15:06:09 +0000
> @@ -142,7 +142,8 @@ execute_backup_command(THD *thd, LEX *le
> {
> // prepare for backup operation
>
> - Backup_info *info= context.prepare_for_backup(lex->backup_dir,
> thd->query);
> + Backup_info *info= context.prepare_for_backup(lex->backup_dir,
> thd->query,
> + lex->backup_compression);
> // reports errors
>
> if (!info || !info->is_valid())
> @@ -447,6 +448,7 @@ int Backup_restore_ctx::prepare(LEX_STRI
>
> @param[in] location path to the file where backup image should be stored
> @param[in] query BACKUP query starting the operation
> + @param[in] with_compression backup image compression switch
>
> @returns Pointer to a @c Backup_info instance which can be used for selecting
> which objects to backup. NULL if an error was detected.
> @@ -459,7 +461,8 @@ int Backup_restore_ctx::prepare(LEX_STRI
> is performed using @c do_backup() method.
> */
> Backup_info*
> -Backup_restore_ctx::prepare_for_backup(LEX_STRING location, const char *query)
> +Backup_restore_ctx::prepare_for_backup(LEX_STRING location, const char *query,
> + bool with_compression)
> {
> using namespace backup;
>
> @@ -488,7 +491,7 @@ Backup_restore_ctx::prepare_for_backup(L
> Open output stream.
> */
>
> - Output_stream *s= new Output_stream(*this, path);
> + Output_stream *s= new Output_stream(*this, path, with_compression);
>
> if (!s)
> {
>
> === modified file 'sql/backup/stream.cc'
> --- a/sql/backup/stream.cc 2008-03-04 16:06:28 +0000
> +++ b/sql/backup/stream.cc 2008-06-16 15:06:09 +0000
> @@ -3,6 +3,10 @@
> #include "backup_stream.h"
> #include "stream.h"
>
> +#ifdef HAVE_COMPRESS
> +#define ZBUF_SIZE 65536
Please add some description for the constant.
> +#endif
> +
> const unsigned char backup_magic_bytes[8]=
> {
> 0xE0, // ###.....
> @@ -23,6 +27,8 @@ namespace backup {
> Pointer to this function is stored in @c backup_stream::stream structure
> and then used by other stream library function for physical writing of
> data.
> +
> + Performs stream compression if requested.
> */
> extern "C" int stream_write(void *instance, bstream_blob *buf, bstream_blob)
> {
> @@ -46,12 +52,34 @@ extern "C" int stream_write(void *instan
> DBUG_ASSERT(buf->end);
>
> size_t howmuch = buf->end - buf->begin;
> +#ifdef HAVE_COMPRESS
> + if (s->m_with_compression)
> + {
> + z_stream *zstream= &s->zstream;
> + zstream->next_in= buf->begin;
> + zstream->avail_in= howmuch;
> + do
> + {
> + if (!zstream->avail_out)
> + {
> + if (my_write(fd, s->zbuf, ZBUF_SIZE, MYF(MY_NABP)))
> + DBUG_RETURN(BSTREAM_ERROR);
> + zstream->next_out= s->zbuf;
> + zstream->avail_out= ZBUF_SIZE;
> + }
> + if (deflate(zstream, Z_NO_FLUSH) != Z_OK)
> + DBUG_RETURN(BSTREAM_ERROR);
> + } while (zstream->avail_in);
> + }
> + else
> +#endif
> + {
> + res= my_write(fd, buf->begin, howmuch,
> + MY_NABP /* error if not all bytes written */ );
>
> - res= my_write(fd, buf->begin, howmuch,
> - MY_NABP /* error if not all bytes written */ );
> -
> - if (res)
> - DBUG_RETURN(BSTREAM_ERROR);
> + if (res)
> + DBUG_RETURN(BSTREAM_ERROR);
> + }
>
> s->bytes += howmuch;
>
> @@ -65,6 +93,8 @@ extern "C" int stream_write(void *instan
> Pointer to this function is stored in @c backup_stream::stream structure
> and then used by other stream library function for physical reading of
> data.
> +
> + Performs stream decompression if requested.
> */
> extern "C" int stream_read(void *instance, bstream_blob *buf, bstream_blob)
> {
> @@ -88,8 +118,40 @@ extern "C" int stream_read(void *instanc
> DBUG_ASSERT(buf->end);
>
> howmuch= buf->end - buf->begin;
> -
> - howmuch= my_read(fd, buf->begin, howmuch, MYF(0));
> +#ifdef HAVE_COMPRESS
> + if (s->m_with_compression)
> + {
> + int zerr;
> + z_stream *zstream= &s->zstream;
> + zstream->next_out= buf->begin;
> + zstream->avail_out= howmuch;
> + do
> + {
> + if (!zstream->avail_in)
> + {
> + zstream->avail_in= my_read(fd, s->zbuf, ZBUF_SIZE, MYF(0));
> + if (zstream->avail_in == (size_t) -1)
> + DBUG_RETURN(BSTREAM_ERROR);
> + else if (!zstream->avail_in)
> + break;
> + zstream->next_in= s->zbuf;
> + }
> + zerr= inflate(zstream, Z_NO_FLUSH);
> + if (zerr == Z_STREAM_END)
> + {
> + howmuch= zstream->next_out - buf->begin;
> + break;
> + }
> + else if (zerr != Z_OK)
> + DBUG_RETURN(BSTREAM_ERROR);
> + howmuch= zstream->next_out - buf->begin;
> + } while (zstream->avail_out);
> + }
> + else
> +#endif
> + {
> + howmuch= my_read(fd, buf->begin, howmuch, MYF(0));
> + }
>
> /*
> How to detect EOF when reading bytes with my_read().
> @@ -142,13 +204,20 @@ void Stream::close()
>
> bool Stream::rewind()
> {
> +#ifdef HAVE_COMPRESS
> + /* Compressed stream cannot be rewound */
> + if (m_with_compression)
> + return FALSE;
> +#endif
> return m_fd >= 0 && my_seek(m_fd, 0, SEEK_SET, MYF(0)) == 0;
> }
>
>
> -Output_stream::Output_stream(Logger &log, const ::String &name)
> +Output_stream::Output_stream(Logger &log, const ::String &name,
> + bool with_compression)
> :Stream(log, name, O_WRONLY|O_CREAT|O_EXCL|O_TRUNC)
> {
> + m_with_compression= with_compression;
> stream.write= stream_write;
> m_block_size=0; // use default block size provided by the backup stram library
> }
> @@ -163,7 +232,7 @@ Output_stream::Output_stream(Logger &log
> int Output_stream::write_magic_and_version()
> {
> byte buf[10];
> -
> + bstream_blob blob;
> DBUG_ASSERT(m_fd >= 0);
>
> memmove(buf, backup_magic_bytes, 8);
> @@ -171,9 +240,10 @@ int Output_stream::write_magic_and_versi
> buf[8]= 0x01;
> buf[9]= 0x00;
>
> - int ret= my_write(m_fd, buf, 10,
> - MY_NABP /* error if not all bytes written */ );
> - if (ret)
> + blob.begin= buf;
> + blob.end= buf + 10;
> + int ret= stream_write((fd_stream*)this, &blob, blob);
> + if (ret != BSTREAM_OK)
> return -1; // error when writing magic bytes
> else
> return 10;
> @@ -221,6 +291,33 @@ bool Output_stream::open()
> if (!ret)
> return FALSE;
>
> + if (m_with_compression)
> + {
> +#ifdef HAVE_COMPRESS
> + int zerr;
> + if (!(zbuf= (uchar*) my_malloc(ZBUF_SIZE, MYF(0))))
> + {
> + m_log.report_error(ER_OUTOFMEMORY, ZBUF_SIZE);
> + return FALSE;
> + }
> + zstream.zalloc= 0;
> + zstream.zfree= 0;
> + zstream.opaque= 0;
> + zstream.next_out= zbuf;
> + zstream.avail_out= ZBUF_SIZE;
> + if ((zerr= deflateInit2(&zstream, Z_DEFAULT_COMPRESSION, Z_DEFLATED,
> + MAX_WBITS + 16, MAX_MEM_LEVEL,
> + Z_DEFAULT_STRATEGY) != Z_OK))
> + {
> + m_log.report_error(ER_GET_ERRMSG, zerr, zstream.msg, "deflateInit2");
Ouch, this is not a good error message for a DBA to see. Can we change it to
something like: "Could not initialize compression of backup image (function
deflateInit2 returned error code <zerr>: <zstream.msg>)".
> + return FALSE;
> + }
> +#else
> + m_log.report_error(ER_FEATURE_DISABLED, "compression", "--with-zlib-dir");
> + return FALSE;
> +#endif
> + }
> +
> return init();
> }
>
> @@ -235,6 +332,34 @@ void Output_stream::close()
> return;
>
> bstream_close(this);
> +#ifdef HAVE_COMPRESS
> + if (m_with_compression)
> + {
> + int zerr;
> + zstream.avail_in= 0;
> + zstream.next_in= 0;
> + do
> + {
> + zerr= deflate(&zstream, Z_FINISH);
> + if (zerr != Z_STREAM_END && zerr != Z_OK)
> + {
> + m_log.report_error(ER_GET_ERRMSG, zerr, zstream.msg, "deflate");
> + break;
> + }
> + if (my_write(m_fd, zbuf, ZBUF_SIZE - zstream.avail_out,
> + MYF(MY_NABP)))
> + {
> + m_log.report_error(ER_GET_ERRMSG, my_errno, "", "my_write");
> + break;
> + }
> + zstream.next_out= zbuf;
> + zstream.avail_out= ZBUF_SIZE;
> + } while (zerr != Z_STREAM_END);
> + if ((zerr= deflateEnd(&zstream)) != Z_OK)
> + m_log.report_error(ER_GET_ERRMSG, zerr, zstream.msg, "deflateEnd");
> + my_free(zbuf, MYF(0));
> + }
> +#endif
> Stream::close();
> }
>
> @@ -274,19 +399,12 @@ Input_stream::Input_stream(Logger &log,
> */
> int Input_stream::check_magic_and_version()
> {
> - byte buf[10];
> -
> DBUG_ASSERT(m_fd >= 0);
>
> - int ret= my_read(m_fd, buf, 10,
> - MY_NABP /* error if not all bytes read */ );
> - if (ret)
> - return -1; // couldn't read magic bytes
> -
> - if (memcmp(buf, backup_magic_bytes, 8))
> + if (memcmp(m_header_buf, backup_magic_bytes, 8))
> return -1; // wrong magic bytes
>
> - unsigned int ver = buf[8] + (buf[9]<<8);
> + unsigned int ver = m_header_buf[8] + (m_header_buf[9]<<8);
>
> if (ver != 1)
> return -1; // unsupported format version
> @@ -335,6 +453,39 @@ bool Input_stream::open()
Please add the following notes to the documentation of this method:
This method can detect and open compressed streams. In that case the stream is
initialized for decompression, so that the stream_read() function returns
decompressed data.
The first 10 bytes of the stream (whether compressed or not) are not available
for reading with stream_read(). Instead, they are stored in the m_header_buf
member and examined by check_magic_and_version().
> if (!ret)
> return FALSE;
>
> + if (my_read(m_fd, m_header_buf, sizeof(m_header_buf),
> + MY_NABP /* error if not all bytes read */ ))
> + return FALSE;
> +
> +#ifdef HAVE_COMPRESS
> + if (!memcmp(m_header_buf, "\x1f\x8b\x08", 3))
> + {
> + int zerr;
> + bstream_blob blob;
> + if (!(zbuf= (uchar*) my_malloc(ZBUF_SIZE, MYF(0))))
> + {
> + m_log.report_error(ER_OUTOFMEMORY, ZBUF_SIZE);
> + return FALSE;
> + }
> + zstream.zalloc= 0;
> + zstream.zfree= 0;
> + zstream.opaque= 0;
> + zstream.next_in= zbuf;
> + zstream.avail_in= 10;
> + memcpy(zbuf, m_header_buf, 10);
> + if ((zerr= inflateInit2(&zstream, MAX_WBITS + 16)) != Z_OK)
> + {
> + m_log.report_error(ER_GET_ERRMSG, zerr, zstream.msg, "inflateInit2");
> + return FALSE;
> + }
> + blob.begin= m_header_buf;
> + blob.end= m_header_buf + 10;
> + if (stream_read((fd_stream*) this, &blob, blob) != BSTREAM_OK ||
> + blob.begin != blob.end)
> + return FALSE;
> + }
> +#endif
> +
> return init();
> }
>
> @@ -349,6 +500,15 @@ void Input_stream::close()
> return;
>
> bstream_close(this);
> +#ifdef HAVE_COMPRESS
> + if (m_with_compression)
> + {
> + int zerr;
> + if ((zerr= inflateEnd(&zstream)) != Z_OK)
> + m_log.report_error(ER_GET_ERRMSG, zerr, zstream.msg, "inflateEnd");
> + my_free(zbuf, (MYF(0)));
> + }
> +#endif
> Stream::close();
> }
>
>
> === modified file 'sql/backup/stream.h'
> --- a/sql/backup/stream.h 2008-03-04 16:06:28 +0000
> +++ b/sql/backup/stream.h 2008-06-16 15:06:09 +0000
> @@ -7,6 +7,9 @@
> #include <backup/image_info.h>
> #include <backup/debug.h> // for definition of DBUG_BACKUP
> #include <backup/logger.h>
> +#ifdef HAVE_COMPRESS
> +#include <zlib.h>
> +#endif
>
> /**
> @file
> @@ -55,6 +58,12 @@ struct fd_stream: public backup_stream
> {
> int m_fd;
> size_t bytes;
> + uchar m_header_buf[10];
> + bool m_with_compression;
> +#ifdef HAVE_COMPRESS
> + z_stream zstream;
> + uchar *zbuf;
> +#endif
>
> fd_stream() :m_fd(-1), bytes(0) {}
> };
> @@ -101,7 +110,7 @@ class Output_stream:
> {
> public:
>
> - Output_stream(Logger&, const ::String&);
> + Output_stream(Logger&, const ::String&, bool);
>
> bool open();
> void close();
>
> === modified file 'sql/lex.h'
> --- a/sql/lex.h 2008-04-01 15:13:57 +0000
> +++ b/sql/lex.h 2008-06-16 15:06:09 +0000
> @@ -119,6 +119,8 @@ static SYMBOL symbols[] = {
> { "COMPACT", SYM(COMPACT_SYM)},
> { "COMPLETION", SYM(COMPLETION_SYM)},
> { "COMPRESSED", SYM(COMPRESSED_SYM)},
> + { "COMPRESSION", SYM(COMPRESSION_SYM)},
> + { "COMPRESSION_ALGORITHM", SYM(COMPRESSION_ALGORITHM_SYM)},
> { "CONCURRENT", SYM(CONCURRENT)},
> { "CONDITION", SYM(CONDITION_SYM)},
> { "CONNECTION", SYM(CONNECTION_SYM)},
>
> === modified file 'sql/sql_lex.h'
> --- a/sql/sql_lex.h 2008-06-04 13:20:03 +0000
> +++ b/sql/sql_lex.h 2008-06-16 15:06:09 +0000
> @@ -1526,6 +1526,7 @@ typedef struct st_lex : public Query_tab
> LEX_STRING name;
> char *help_arg;
> LEX_STRING backup_dir; /* For RESTORE/BACKUP */
> + bool backup_compression;
> char* to_log; /* For PURGE MASTER LOGS TO */
> char* x509_subject,*x509_issuer,*ssl_cipher;
> String *wild;
>
> === modified file 'sql/sql_yacc.yy'
> --- a/sql/sql_yacc.yy 2008-06-04 13:20:03 +0000
> +++ b/sql/sql_yacc.yy 2008-06-16 15:06:09 +0000
> @@ -684,6 +684,8 @@ bool my_yyoverflow(short **a, YYSTYPE **
> %token COMPACT_SYM
> %token COMPLETION_SYM
> %token COMPRESSED_SYM
> +%token COMPRESSION_SYM
> +%token COMPRESSION_ALGORITHM_SYM
> %token CONCURRENT
> %token CONDITION_SYM /* SQL-2003-N */
> %token CONNECTION_SYM
> @@ -1407,7 +1409,8 @@ bool my_yyoverflow(short **a, YYSTYPE **
> install uninstall partition_entry binlog_base64_event
> init_key_options key_options key_opts key_opt key_using_alg
> server_def server_options_list server_option
> - definer_opt no_definer definer
> + definer_opt no_definer definer opt_compression
> + opt_compression_algorithm
> END_OF_INPUT
>
> %type <NONE> call sp_proc_stmts sp_proc_stmts1 sp_proc_stmt
> @@ -6305,6 +6308,7 @@ backup:
> database_list
> TO_SYM
> TEXT_STRING_sys
> + opt_compression
> {
> LEX *lex= Lex;
> if (lex->sphead)
> @@ -6324,6 +6328,26 @@ backup:
> }
> ;
>
> +opt_compression:
> + /* empty */ {}
> + | WITH COMPRESSION_SYM opt_compression_algorithm
> + {
> + Lex->backup_compression= true;
> + }
> + ;
> +
> +opt_compression_algorithm:
> + /* empty */ {}
> + | COMPRESSION_ALGORITHM_SYM opt_equal IDENT_sys
> + {
> + if (my_strcasecmp(system_charset_info, $3.str, "gzip"))
> + {
> + my_error(ER_WRONG_ARGUMENTS, MYF(0), "COMPRESSION_ALGORITHM");
> + MYSQL_YYABORT;
> + }
I'm not going to request this, but I'd prefer the parser to pass whatever the
user wrote on to the next layer, which would then decide whether the value is
valid and react accordingly. Perhaps in the future, when we really support that
option...
Rafal