Below is the list of changes that have just been committed into a local
5.1 repository of svoj. When svoj does a push these changes will
be propagated to the main repository and, within 24 hours after the
push, to the public repository.
For information on how to access the public repository
see http://dev.mysql.com/doc/mysql/en/installing-source-tree.html
ChangeSet@stripped, 2006-09-29 21:47:14+05:00, svoj@april.(none) +1 -0
Merge mysql.com:/home/svoj/devel/mysql/engines/mysql-5.0-engines
into mysql.com:/home/svoj/devel/mysql/engines/mysql-5.1-engines
MERGE: 1.1810.1697.162
storage/archive/ha_archive.cc@stripped, 2006-09-29 21:47:08+05:00, svoj@april.(none) +0 -11
Use local.
MERGE: 1.60.7.2
storage/archive/ha_archive.cc@stripped, 2006-09-29 21:47:04+05:00, svoj@april.(none) +0
-0
Merge rename: sql/ha_archive.cc -> storage/archive/ha_archive.cc
# This is a BitKeeper patch. What follows are the unified diffs for the
# set of deltas contained in the patch. The rest of the patch, the part
# that BitKeeper cares about, is below these diffs.
# User: svoj
# Host: april.(none)
# Root: /home/svoj/devel/mysql/engines/mysql-5.1-engines/RESYNC
--- 1.60.7.1/sql/ha_archive.cc 2006-09-29 21:47:20 +05:00
+++ 1.105/storage/archive/ha_archive.cc 2006-09-29 21:47:20 +05:00
@@ -19,11 +19,13 @@
#endif
#include "mysql_priv.h"
+#include <myisam.h>
-#if defined(HAVE_ARCHIVE_DB)
#include "ha_archive.h"
#include <my_dir.h>
+#include <mysql/plugin.h>
+
/*
First, if you want to understand storage engines you should look at
ha_example.cc and ha_example.h.
@@ -31,13 +33,13 @@
a storage engine without indexes that could compress data very well.
So, welcome to a completely compressed storage engine. This storage
engine only does inserts. No replace, deletes, or updates. All reads are
- complete table scans. Compression is done through gzip (bzip compresses
+ complete table scans. Compression is done through azip (bzip compresses
better, but only marginally, if someone asks I could add support for
- it too, but beaware that it costs a lot more in CPU time then gzip).
+ it too, but beaware that it costs a lot more in CPU time then azip).
We keep a file pointer open for each instance of ha_archive for each read
but for writes we keep one open file handle just for that. We flush it
- only if we have a read occur. gzip handles compressing lots of records
+ only if we have a read occur. azip handles compressing lots of records
at once much better then doing lots of little records between writes.
It is possible to not lock on writes but this would then mean we couldn't
handle bulk inserts as well (that is if someone was trying to read at
@@ -64,8 +66,7 @@
pool. For MyISAM its a question of how much the file system caches the
MyISAM file. With enough free memory MyISAM is faster. Its only when the OS
doesn't have enough memory to cache entire table that archive turns out
- to be any faster. For writes it is always a bit slower then MyISAM. It has no
- internal limits though for row length.
+ to be any faster.
Examples between MyISAM (packed) and Archive.
@@ -82,11 +83,8 @@
TODO:
Add bzip optional support.
Allow users to set compression level.
- Add truncate table command.
Implement versioning, should be easy.
Allow for errors, find a way to mark bad rows.
- Talk to the gzip guys, come up with a writable format so that updates are doable
- without switching to a block method.
Add optional feature so that rows can be flushed at interval (which will cause less
compression but may speed up ordered searches).
Checkpoint the meta file to allow for faster rebuilds.
@@ -105,6 +103,7 @@
rows - This is an unsigned long long which is the number of rows in the data
file.
check point - Reserved for future use
+ auto increment - MAX value for autoincrement
dirty - Status of the file, whether or not its values are the latest. This
flag is what causes a repair to occur
@@ -120,55 +119,38 @@ static bool archive_inited= FALSE;
/* Variables for archive share methods */
pthread_mutex_t archive_mutex;
static HASH archive_open_tables;
-static z_off_t max_zfile_size;
-static int zoffset_size;
/* The file extension */
#define ARZ ".ARZ" // The data file
#define ARN ".ARN" // Files used during an optimize call
#define ARM ".ARM" // Meta file
/*
- uchar + uchar + ulonglong + ulonglong + uchar
+ uchar + uchar + ulonglong + ulonglong + ulonglong + ulonglong + FN_REFLEN
+ + uchar
*/
-#define META_BUFFER_SIZE 19 // Size of the data used in the meta file
+#define META_BUFFER_SIZE sizeof(uchar) + sizeof(uchar) + sizeof(ulonglong) \
+ + sizeof(ulonglong) + sizeof(ulonglong) + sizeof(ulonglong) + FN_REFLEN \
+ + sizeof(uchar)
+
/*
uchar + uchar
*/
#define DATA_BUFFER_SIZE 2 // Size of the data used in the data file
#define ARCHIVE_CHECK_HEADER 254 // The number we use to determine corruption
-/*
+/* Static declarations for handerton */
+static handler *archive_create_handler(TABLE_SHARE *table, MEM_ROOT *mem_root);
+/*
Number of rows that will force a bulk insert.
*/
#define ARCHIVE_MIN_ROWS_TO_USE_BULK_INSERT 2
+handlerton *archive_hton;
-
-/* dummy handlerton - only to have something to return from archive_db_init */
-handlerton archive_hton = {
- "ARCHIVE",
- SHOW_OPTION_YES,
- "Archive storage engine",
- DB_TYPE_ARCHIVE_DB,
- archive_db_init,
- 0, /* slot */
- 0, /* savepoint size. */
- NULL, /* close_connection */
- NULL, /* savepoint */
- NULL, /* rollback to savepoint */
- NULL, /* releas savepoint */
- NULL, /* commit */
- NULL, /* rollback */
- NULL, /* prepare */
- NULL, /* recover */
- NULL, /* commit_by_xid */
- NULL, /* rollback_by_xid */
- NULL, /* create_cursor_read_view */
- NULL, /* set_cursor_read_view */
- NULL, /* close_cursor_read_view */
- HTON_NO_FLAGS
-};
-
+static handler *archive_create_handler(TABLE_SHARE *table, MEM_ROOT *mem_root)
+{
+ return new (mem_root) ha_archive(table);
+}
/*
Used for hash table that tracks open tables.
@@ -186,16 +168,25 @@ static byte* archive_get_key(ARCHIVE_SHA
SYNOPSIS
archive_db_init()
- void
+ void *
RETURN
FALSE OK
TRUE Error
*/
-bool archive_db_init()
+int archive_db_init(void *p)
{
DBUG_ENTER("archive_db_init");
+ if (archive_inited)
+ DBUG_RETURN(FALSE);
+ archive_hton= (handlerton *)p;
+ archive_hton->state=SHOW_OPTION_YES;
+ archive_hton->db_type=DB_TYPE_ARCHIVE_DB;
+ archive_hton->create=archive_create_handler;
+ archive_hton->panic=archive_db_end;
+ archive_hton->flags=HTON_NO_FLAGS;
+
if (pthread_mutex_init(&archive_mutex, MY_MUTEX_INIT_FAST))
goto error;
if (hash_init(&archive_open_tables, system_charset_info, 32, 0, 0,
@@ -205,23 +196,10 @@ bool archive_db_init()
}
else
{
- zoffset_size= 2 << ((zlibCompileFlags() >> 6) & 3);
- switch (sizeof(z_off_t)) {
- case 2:
- max_zfile_size= INT_MAX16;
- break;
- case 8:
- max_zfile_size= LONGLONG_MAX;
- break;
- case 4:
- default:
- max_zfile_size= INT_MAX32;
- }
archive_inited= TRUE;
DBUG_RETURN(FALSE);
}
error:
- have_archive_db= SHOW_OPTION_DISABLED; // If we couldn't use handler
DBUG_RETURN(TRUE);
}
@@ -229,14 +207,14 @@ error:
Release the archive handler.
SYNOPSIS
- archive_db_end()
+ archive_db_done()
void
RETURN
FALSE OK
*/
-bool archive_db_end()
+int archive_db_done(void *p)
{
if (archive_inited)
{
@@ -244,32 +222,37 @@ bool archive_db_end()
VOID(pthread_mutex_destroy(&archive_mutex));
}
archive_inited= 0;
- return FALSE;
+ return 0;
+}
+
+
+int archive_db_end(ha_panic_function type)
+{
+ return archive_db_done(NULL);
}
-ha_archive::ha_archive(TABLE *table_arg)
- :handler(&archive_hton, table_arg), delayed_insert(0), bulk_insert(0)
+ha_archive::ha_archive(TABLE_SHARE *table_arg)
+ :handler(archive_hton, table_arg), delayed_insert(0), bulk_insert(0)
{
/* Set our original buffer from pre-allocated memory */
buffer.set((char *)byte_buffer, IO_SIZE, system_charset_info);
/* The size of the offset value we will use for position() */
- ref_length = zoffset_size;
- DBUG_ASSERT(ref_length <= sizeof(z_off_t));
+ ref_length = sizeof(my_off_t);
}
/*
This method reads the header of a datafile and returns whether or not it was
successful.
*/
-int ha_archive::read_data_header(gzFile file_to_read)
+int ha_archive::read_data_header(azio_stream *file_to_read)
{
uchar data_buffer[DATA_BUFFER_SIZE];
DBUG_ENTER("ha_archive::read_data_header");
- if (gzrewind(file_to_read) == -1)
+ if (azrewind(file_to_read) == -1)
DBUG_RETURN(HA_ERR_CRASHED_ON_USAGE);
- if (gzread(file_to_read, data_buffer, DATA_BUFFER_SIZE) != DATA_BUFFER_SIZE)
+ if (azread(file_to_read, data_buffer, DATA_BUFFER_SIZE) != DATA_BUFFER_SIZE)
DBUG_RETURN(errno ? errno : -1);
DBUG_PRINT("ha_archive::read_data_header", ("Check %u", data_buffer[0]));
@@ -285,7 +268,7 @@ int ha_archive::read_data_header(gzFile
/*
This method writes out the header of a datafile and returns whether or not it was
successful.
*/
-int ha_archive::write_data_header(gzFile file_to_write)
+int ha_archive::write_data_header(azio_stream *file_to_write)
{
uchar data_buffer[DATA_BUFFER_SIZE];
DBUG_ENTER("ha_archive::write_data_header");
@@ -293,7 +276,7 @@ int ha_archive::write_data_header(gzFile
data_buffer[0]= (uchar)ARCHIVE_CHECK_HEADER;
data_buffer[1]= (uchar)ARCHIVE_VERSION;
- if (gzwrite(file_to_write, &data_buffer, DATA_BUFFER_SIZE) !=
+ if (azwrite(file_to_write, &data_buffer, DATA_BUFFER_SIZE) !=
DATA_BUFFER_SIZE)
goto error;
DBUG_PRINT("ha_archive::write_data_header", ("Check %u", (uint)data_buffer[0]));
@@ -308,9 +291,13 @@ error:
This method reads the header of a meta file and returns whether or not it was
successful.
*rows will contain the current number of rows in the data file upon success.
*/
-int ha_archive::read_meta_file(File meta_file, ha_rows *rows)
+int ha_archive::read_meta_file(File meta_file, ha_rows *rows,
+ ulonglong *auto_increment,
+ ulonglong *forced_flushes,
+ char *real_path)
{
uchar meta_buffer[META_BUFFER_SIZE];
+ uchar *ptr= meta_buffer;
ulonglong check_point;
DBUG_ENTER("ha_archive::read_meta_file");
@@ -322,17 +309,30 @@ int ha_archive::read_meta_file(File meta
/*
Parse out the meta data, we ignore version at the moment
*/
- *rows= (ha_rows)uint8korr(meta_buffer + 2);
- check_point= uint8korr(meta_buffer + 10);
+
+ ptr+= sizeof(uchar)*2; // Move past header
+ *rows= (ha_rows)uint8korr(ptr);
+ ptr+= sizeof(ulonglong); // Move past rows
+ check_point= uint8korr(ptr);
+ ptr+= sizeof(ulonglong); // Move past check_point
+ *auto_increment= uint8korr(ptr);
+ ptr+= sizeof(ulonglong); // Move past auto_increment
+ *forced_flushes= uint8korr(ptr);
+ ptr+= sizeof(ulonglong); // Move past forced_flush
+ memmove(real_path, ptr, FN_REFLEN);
+ ptr+= FN_REFLEN; // Move past the possible location of the file
DBUG_PRINT("ha_archive::read_meta_file", ("Check %d", (uint)meta_buffer[0]));
DBUG_PRINT("ha_archive::read_meta_file", ("Version %d", (uint)meta_buffer[1]));
- DBUG_PRINT("ha_archive::read_meta_file", ("Rows %lld", *rows));
- DBUG_PRINT("ha_archive::read_meta_file", ("Checkpoint %lld", check_point));
- DBUG_PRINT("ha_archive::read_meta_file", ("Dirty %d", (int)meta_buffer[18]));
+ DBUG_PRINT("ha_archive::read_meta_file", ("Rows %llu", *rows));
+ DBUG_PRINT("ha_archive::read_meta_file", ("Checkpoint %llu", check_point));
+ DBUG_PRINT("ha_archive::read_meta_file", ("Auto-Increment %llu", *auto_increment));
+ DBUG_PRINT("ha_archive::read_meta_file", ("Forced Flushes %llu", *forced_flushes));
+ DBUG_PRINT("ha_archive::read_meta_file", ("Real Path %s", real_path));
+ DBUG_PRINT("ha_archive::read_meta_file", ("Dirty %d", (int)(*ptr)));
if ((meta_buffer[0] != (uchar)ARCHIVE_CHECK_HEADER) ||
- ((bool)meta_buffer[18] == TRUE))
+ ((bool)(*ptr)== TRUE))
DBUG_RETURN(HA_ERR_CRASHED_ON_USAGE);
my_sync(meta_file, MYF(MY_WME));
@@ -345,22 +345,49 @@ int ha_archive::read_meta_file(File meta
By setting dirty you say whether or not the file represents the actual state of the
data file.
Upon ::open() we set to dirty, and upon ::close() we set to clean.
*/
-int ha_archive::write_meta_file(File meta_file, ha_rows rows, bool dirty)
+int ha_archive::write_meta_file(File meta_file, ha_rows rows,
+ ulonglong auto_increment,
+ ulonglong forced_flushes,
+ char *real_path,
+ bool dirty)
{
uchar meta_buffer[META_BUFFER_SIZE];
+ uchar *ptr= meta_buffer;
ulonglong check_point= 0; //Reserved for the future
DBUG_ENTER("ha_archive::write_meta_file");
- meta_buffer[0]= (uchar)ARCHIVE_CHECK_HEADER;
- meta_buffer[1]= (uchar)ARCHIVE_VERSION;
- int8store(meta_buffer + 2, (ulonglong)rows);
- int8store(meta_buffer + 10, check_point);
- *(meta_buffer + 18)= (uchar)dirty;
- DBUG_PRINT("ha_archive::write_meta_file", ("Check %d", (uint)ARCHIVE_CHECK_HEADER));
- DBUG_PRINT("ha_archive::write_meta_file", ("Version %d", (uint)ARCHIVE_VERSION));
+ *ptr= (uchar)ARCHIVE_CHECK_HEADER;
+ ptr += sizeof(uchar);
+ *ptr= (uchar)ARCHIVE_VERSION;
+ ptr += sizeof(uchar);
+ int8store(ptr, (ulonglong)rows);
+ ptr += sizeof(ulonglong);
+ int8store(ptr, check_point);
+ ptr += sizeof(ulonglong);
+ int8store(ptr, auto_increment);
+ ptr += sizeof(ulonglong);
+ int8store(ptr, forced_flushes);
+ ptr += sizeof(ulonglong);
+ // No matter what, we pad with nulls
+ if (real_path)
+ strncpy((char *)ptr, real_path, FN_REFLEN);
+ else
+ bzero(ptr, FN_REFLEN);
+ ptr += FN_REFLEN;
+ *ptr= (uchar)dirty;
+ DBUG_PRINT("ha_archive::write_meta_file", ("Check %d",
+ (uint)ARCHIVE_CHECK_HEADER));
+ DBUG_PRINT("ha_archive::write_meta_file", ("Version %d",
+ (uint)ARCHIVE_VERSION));
DBUG_PRINT("ha_archive::write_meta_file", ("Rows %llu", (ulonglong)rows));
DBUG_PRINT("ha_archive::write_meta_file", ("Checkpoint %llu", check_point));
+ DBUG_PRINT("ha_archive::write_meta_file", ("Auto Increment %llu",
+ auto_increment));
+ DBUG_PRINT("ha_archive::write_meta_file", ("Forced Flushes %llu",
+ forced_flushes));
+ DBUG_PRINT("ha_archive::write_meta_file", ("Real path %s",
+ real_path));
DBUG_PRINT("ha_archive::write_meta_file", ("Dirty %d", (uint)dirty));
VOID(my_seek(meta_file, 0, MY_SEEK_SET, MYF(0)));
@@ -411,8 +438,10 @@ ARCHIVE_SHARE *ha_archive::get_share(con
share->table_name= tmp_name;
share->crashed= FALSE;
share->archive_write_open= FALSE;
-
fn_format(share->data_file_name,table_name,"",ARZ,MY_REPLACE_EXT|MY_UNPACK_FILENAME);
- fn_format(meta_file_name,table_name,"",ARM,MY_REPLACE_EXT|MY_UNPACK_FILENAME);
+ fn_format(share->data_file_name, table_name, "",
+ ARZ,MY_REPLACE_EXT|MY_UNPACK_FILENAME);
+ fn_format(meta_file_name, table_name, "", ARM,
+ MY_REPLACE_EXT|MY_UNPACK_FILENAME);
strmov(share->table_name,table_name);
/*
We will use this lock for rows.
@@ -420,15 +449,24 @@ ARCHIVE_SHARE *ha_archive::get_share(con
VOID(pthread_mutex_init(&share->mutex,MY_MUTEX_INIT_FAST));
if ((share->meta_file= my_open(meta_file_name, O_RDWR, MYF(0))) == -1)
share->crashed= TRUE;
+ DBUG_PRINT("info", ("archive opening (1) up write at %s",
+ share->data_file_name));
/*
- After we read, we set the file to dirty. When we close, we will do the
- opposite. If the meta file will not open we assume it is crashed and
- leave it up to the user to fix.
+ We read the meta file, but do not mark it dirty unless we actually do
+ a write.
*/
- if (read_meta_file(share->meta_file, &share->rows_recorded))
+ if (read_meta_file(share->meta_file, &share->rows_recorded,
+ &share->auto_increment_value,
+ &share->forced_flushes,
+ share->real_path))
share->crashed= TRUE;
-
+ /*
+ Since we now possibly no real_path, we will use it instead if it exists.
+ */
+ if (*share->real_path)
+ fn_format(share->data_file_name, share->real_path, "", ARZ,
+ MY_REPLACE_EXT|MY_UNPACK_FILENAME);
VOID(my_hash_insert(&archive_open_tables, (byte*) share));
thr_lock_init(&share->lock);
}
@@ -462,12 +500,21 @@ int ha_archive::free_share(ARCHIVE_SHARE
hash_delete(&archive_open_tables, (byte*) share);
thr_lock_delete(&share->lock);
VOID(pthread_mutex_destroy(&share->mutex));
- if (share->crashed)
- (void)write_meta_file(share->meta_file, share->rows_recorded, TRUE);
- else
- (void)write_meta_file(share->meta_file, share->rows_recorded, FALSE);
+ /*
+ We need to make sure we don't reset the crashed state.
+ If we open a crashed file, wee need to close it as crashed unless
+ it has been repaired.
+ Since we will close the data down after this, we go on and count
+ the flush on close;
+ */
+ share->forced_flushes++;
+ (void)write_meta_file(share->meta_file, share->rows_recorded,
+ share->auto_increment_value,
+ share->forced_flushes,
+ share->real_path,
+ share->crashed ? TRUE :FALSE);
if (share->archive_write_open)
- if (gzclose(share->archive_write) == Z_ERRNO)
+ if (azclose(&(share->archive_write)))
rc= 1;
if (my_close(share->meta_file, MYF(0)))
rc= 1;
@@ -481,21 +528,26 @@ int ha_archive::free_share(ARCHIVE_SHARE
int ha_archive::init_archive_writer()
{
DBUG_ENTER("ha_archive::init_archive_writer");
- (void)write_meta_file(share->meta_file, share->rows_recorded, TRUE);
+ (void)write_meta_file(share->meta_file, share->rows_recorded,
+ share->auto_increment_value,
+ share->forced_flushes,
+ share->real_path,
+ TRUE);
/*
It is expensive to open and close the data files and since you can't have
a gzip file that can be both read and written we keep a writer open
that is shared amoung all open tables.
*/
- if ((share->archive_write= gzopen(share->data_file_name, "ab")) == NULL)
+ if (!(azopen(&(share->archive_write), share->data_file_name,
+ O_WRONLY|O_APPEND|O_BINARY)))
{
+ DBUG_PRINT("info", ("Could not open archive write file"));
share->crashed= TRUE;
DBUG_RETURN(1);
}
share->archive_write_open= TRUE;
- info(HA_STATUS_TIME);
- share->approx_file_size= data_file_length;
+
DBUG_RETURN(0);
}
@@ -526,7 +578,7 @@ int ha_archive::open(const char *name, i
int rc= 0;
DBUG_ENTER("ha_archive::open");
- DBUG_PRINT("info", ("archive table was opened for crash %s",
+ DBUG_PRINT("info", ("archive table was opened for crash: %s",
(open_options & HA_OPEN_FOR_REPAIR) ? "yes" : "no"));
share= get_share(name, table, &rc);
@@ -542,7 +594,8 @@ int ha_archive::open(const char *name, i
thr_lock_data_init(&share->lock,&lock,NULL);
- if ((archive= gzopen(share->data_file_name, "rb")) == NULL)
+ DBUG_PRINT("info", ("archive data_file_name %s", share->data_file_name));
+ if (!(azopen(&archive, share->data_file_name, O_RDONLY|O_BINARY)))
{
if (errno == EROFS || errno == EACCES)
DBUG_RETURN(my_errno= errno);
@@ -583,7 +636,7 @@ int ha_archive::close(void)
DBUG_ENTER("ha_archive::close");
/* First close stream */
- if (gzclose(archive) == Z_ERRNO)
+ if (azclose(&archive))
rc= 1;
/* then also close share */
rc|= free_share(share);
@@ -609,6 +662,29 @@ int ha_archive::create(const char *name,
int error;
DBUG_ENTER("ha_archive::create");
+ stats.auto_increment_value= (create_info->auto_increment_value ?
+ create_info->auto_increment_value -1 :
+ (ulonglong) 0);
+
+ for (uint key= 0; key < table_arg->s->keys; key++)
+ {
+ KEY *pos= table_arg->key_info+key;
+ KEY_PART_INFO *key_part= pos->key_part;
+ KEY_PART_INFO *key_part_end= key_part + pos->key_parts;
+
+ for (; key_part != key_part_end; key_part++)
+ {
+ Field *field= key_part->field;
+
+ if (!(field->flags & AUTO_INCREMENT_FLAG))
+ {
+ error= -1;
+ DBUG_PRINT("info", ("Index error in creating archive table"));
+ goto error;
+ }
+ }
+ }
+
if ((create_file= my_create(fn_format(name_buff,name,"",ARM,
MY_REPLACE_EXT|MY_UNPACK_FILENAME),0,
O_RDWR | O_TRUNC,MYF(MY_WME))) < 0)
@@ -616,43 +692,64 @@ int ha_archive::create(const char *name,
error= my_errno;
goto error;
}
- write_meta_file(create_file, 0, FALSE);
+
+ write_meta_file(create_file, 0, stats.auto_increment_value, 0,
+ (char *)create_info->data_file_name,
+ FALSE);
my_close(create_file,MYF(0));
/*
We reuse name_buff since it is available.
*/
- if ((create_file= my_create(fn_format(name_buff,name,"",ARZ,
- MY_REPLACE_EXT|MY_UNPACK_FILENAME),0,
- O_RDWR | O_TRUNC,MYF(MY_WME))) < 0)
+ if (create_info->data_file_name)
{
- error= my_errno;
- goto error;
+ char linkname[FN_REFLEN];
+ DBUG_PRINT("info", ("archive will create stream file %s",
+ create_info->data_file_name));
+
+ fn_format(name_buff, create_info->data_file_name, "", ARZ,
+ MY_REPLACE_EXT|MY_UNPACK_FILENAME);
+ fn_format(linkname, name, "", ARZ,
+ MY_UNPACK_FILENAME | MY_APPEND_EXT);
+ if ((create_file= my_create_with_symlink(linkname, name_buff, 0,
+ O_RDWR | O_TRUNC,MYF(MY_WME))) < 0)
+ {
+ error= my_errno;
+ goto error;
+ }
}
- if ((archive= gzdopen(dup(create_file), "wb")) == NULL)
+ else
+ {
+ if ((create_file= my_create(fn_format(name_buff, name,"", ARZ,
+ MY_REPLACE_EXT|MY_UNPACK_FILENAME),0,
+ O_RDWR | O_TRUNC,MYF(MY_WME))) < 0)
+ {
+ error= my_errno;
+ goto error;
+ }
+ }
+ if (!azdopen(&archive, create_file, O_WRONLY|O_BINARY))
{
error= errno;
goto error2;
}
- if (write_data_header(archive))
+ if (write_data_header(&archive))
{
error= errno;
goto error3;
}
- if (gzclose(archive))
+ if (azclose(&archive))
{
error= errno;
goto error2;
}
- my_close(create_file, MYF(0));
-
DBUG_RETURN(0);
error3:
- /* We already have an error, so ignore results of gzclose. */
- (void)gzclose(archive);
+ /* We already have an error, so ignore results of azclose. */
+ (void)azclose(&archive);
error2:
my_close(create_file, MYF(0));
delete_table(name);
@@ -664,29 +761,19 @@ error:
/*
This is where the actual row is written out.
*/
-int ha_archive::real_write_row(byte *buf, gzFile writer)
+int ha_archive::real_write_row(byte *buf, azio_stream *writer)
{
- z_off_t written, total_row_length;
+ my_off_t written;
uint *ptr, *end;
DBUG_ENTER("ha_archive::real_write_row");
- total_row_length= table->s->reclength;
- for (ptr= table->s->blob_field, end= ptr + table->s->blob_fields;
- ptr != end; ptr++)
- total_row_length+= ((Field_blob*) table->field[*ptr])->get_length();
- if (share->approx_file_size > max_zfile_size - total_row_length)
- {
- info(HA_STATUS_TIME);
- share->approx_file_size= data_file_length;
- if (share->approx_file_size > max_zfile_size - total_row_length)
- DBUG_RETURN(HA_ERR_RECORD_FILE_FULL);
- }
- share->approx_file_size+= total_row_length;
- written= gzwrite(writer, buf, table->s->reclength);
- DBUG_PRINT("ha_archive::real_write_row", ("Wrote %d bytes expected %d", written,
table->s->reclength));
+
+ written= azwrite(writer, buf, table->s->reclength);
+ DBUG_PRINT("ha_archive::real_write_row", ("Wrote %d bytes expected %d",
+ written, table->s->reclength));
if (!delayed_insert || !bulk_insert)
share->dirty= TRUE;
- if (written != (z_off_t)table->s->reclength)
+ if (written != (my_off_t)table->s->reclength)
DBUG_RETURN(errno ? errno : -1);
/*
We should probably mark the table as damagaged if the record is written
@@ -702,8 +789,8 @@ int ha_archive::real_write_row(byte *buf
if (size)
{
((Field_blob*) table->field[*ptr])->get_ptr(&data_ptr);
- written= gzwrite(writer, data_ptr, (unsigned)size);
- if (written != (z_off_t)size)
+ written= azwrite(writer, data_ptr, (unsigned)size);
+ if (written != (my_off_t)size)
DBUG_RETURN(errno ? errno : -1);
}
}
@@ -723,15 +810,93 @@ int ha_archive::real_write_row(byte *buf
int ha_archive::write_row(byte *buf)
{
int rc;
+ byte *read_buf= NULL;
+ ulonglong temp_auto;
DBUG_ENTER("ha_archive::write_row");
if (share->crashed)
DBUG_RETURN(HA_ERR_CRASHED_ON_USAGE);
- statistic_increment(table->in_use->status_var.ha_write_count, &LOCK_status);
+ ha_statistic_increment(&SSV::ha_write_count);
if (table->timestamp_field_type & TIMESTAMP_AUTO_SET_ON_INSERT)
table->timestamp_field->set_time();
pthread_mutex_lock(&share->mutex);
+
+ if (table->next_number_field)
+ {
+ KEY *mkey= &table->s->key_info[0]; // We only support one key right now
+ update_auto_increment();
+ temp_auto= table->next_number_field->val_int();
+
+ /*
+ Bad news, this will cause a search for the unique value which is very
+ expensive since we will have to do a table scan which will lock up
+ all other writers during this period. This could perhaps be optimized
+ in the future.
+ */
+ if (temp_auto == share->auto_increment_value &&
+ mkey->flags & HA_NOSAME)
+ {
+ rc= HA_ERR_FOUND_DUPP_KEY;
+ goto error;
+ }
+
+ if (temp_auto < share->auto_increment_value &&
+ mkey->flags & HA_NOSAME)
+ {
+ /*
+ First we create a buffer that we can use for reading rows, and can pass
+ to get_row().
+ */
+ if (!(read_buf= (byte*) my_malloc(table->s->reclength, MYF(MY_WME))))
+ {
+ rc= HA_ERR_OUT_OF_MEM;
+ goto error;
+ }
+ /*
+ All of the buffer must be written out or we won't see all of the
+ data
+ */
+ azflush(&(share->archive_write), Z_SYNC_FLUSH);
+ share->forced_flushes++;
+ /*
+ Set the position of the local read thread to the beginning postion.
+ */
+ if (read_data_header(&archive))
+ {
+ rc= HA_ERR_CRASHED_ON_USAGE;
+ goto error;
+ }
+
+ /*
+ Now we read and check all of the rows.
+ if (!memcmp(table->next_number_field->ptr, mfield->ptr,
mfield->max_length()))
+ if ((longlong)temp_auto ==
+ mfield->val_int((char*)(read_buf + mfield->offset())))
+ */
+ Field *mfield= table->next_number_field;
+
+ while (!(get_row(&archive, read_buf)))
+ {
+ if (!memcmp(read_buf + mfield->offset(), table->next_number_field->ptr,
+ mfield->max_length()))
+ {
+ rc= HA_ERR_FOUND_DUPP_KEY;
+ goto error;
+ }
+ }
+ }
+ else
+ {
+ if (temp_auto > share->auto_increment_value)
+ stats.auto_increment_value= share->auto_increment_value= temp_auto;
+ }
+ }
+
+ /*
+ Notice that the global auto_increment has been increased.
+ In case of a failed row write, we will never try to reuse the value.
+ */
if (!share->archive_write_open)
if (init_archive_writer())
DBUG_RETURN(HA_ERR_CRASHED_ON_USAGE);
@@ -759,12 +924,114 @@ int ha_archive::write_row(byte *buf)
}
share->rows_recorded++;
- rc= real_write_row(buf, share->archive_write);
+ rc= real_write_row(buf, &(share->archive_write));
+error:
pthread_mutex_unlock(&share->mutex);
+ if (read_buf)
+ my_free((gptr) read_buf, MYF(0));
DBUG_RETURN(rc);
}
+
+void ha_archive::get_auto_increment(ulonglong offset, ulonglong increment,
+ ulonglong nb_desired_values,
+ ulonglong *first_value,
+ ulonglong *nb_reserved_values)
+{
+ *nb_reserved_values= 1;
+ *first_value= share->auto_increment_value + 1;
+}
+
+/* Initialized at each key walk (called multiple times unlike rnd_init()) */
+int ha_archive::index_init(uint keynr, bool sorted)
+{
+ DBUG_ENTER("ha_archive::index_init");
+ active_index= keynr;
+ DBUG_RETURN(0);
+}
+
+
+/*
+ No indexes, so if we get a request for an index search since we tell
+ the optimizer that we have unique indexes, we scan
+*/
+int ha_archive::index_read(byte *buf, const byte *key,
+ uint key_len, enum ha_rkey_function find_flag)
+{
+ int rc;
+ DBUG_ENTER("ha_archive::index_read");
+ rc= index_read_idx(buf, active_index, key, key_len, find_flag);
+ DBUG_RETURN(rc);
+}
+
+
+int ha_archive::index_read_idx(byte *buf, uint index, const byte *key,
+ uint key_len, enum ha_rkey_function find_flag)
+{
+ int rc= 0;
+ bool found= 0;
+ KEY *mkey= &table->s->key_info[index];
+ current_k_offset= mkey->key_part->offset;
+ current_key= key;
+ current_key_len= key_len;
+
+
+ DBUG_ENTER("ha_archive::index_read_idx");
+
+ /*
+ All of the buffer must be written out or we won't see all of the
+ data
+ */
+ pthread_mutex_lock(&share->mutex);
+ azflush(&(share->archive_write), Z_SYNC_FLUSH);
+ share->forced_flushes++;
+ pthread_mutex_unlock(&share->mutex);
+
+ /*
+ Set the position of the local read thread to the beginning postion.
+ */
+ if (read_data_header(&archive))
+ {
+ rc= HA_ERR_CRASHED_ON_USAGE;
+ goto error;
+ }
+
+ while (!(get_row(&archive, buf)))
+ {
+ if (!memcmp(current_key, buf + current_k_offset, current_key_len))
+ {
+ found= 1;
+ break;
+ }
+ }
+
+ if (found)
+ DBUG_RETURN(0);
+
+error:
+ DBUG_RETURN(rc ? rc : HA_ERR_END_OF_FILE);
+}
+
+
+int ha_archive::index_next(byte * buf)
+{
+ bool found= 0;
+
+ DBUG_ENTER("ha_archive::index_next");
+
+ while (!(get_row(&archive, buf)))
+ {
+ if (!memcmp(current_key, buf+current_k_offset, current_key_len))
+ {
+ found= 1;
+ break;
+ }
+ }
+
+ DBUG_RETURN(found ? 0 : HA_ERR_END_OF_FILE);
+}
+
/*
All calls that need to scan the table start with this method. If we are told
that it is a table scan we rewind the file to the beginning, otherwise
@@ -783,11 +1050,11 @@ int ha_archive::rnd_init(bool scan)
{
scan_rows= share->rows_recorded;
DBUG_PRINT("info", ("archive will retrieve %llu rows", scan_rows));
- records= 0;
+ stats.records= 0;
/*
If dirty, we lock, and then reset/flush the data.
- I found that just calling gzflush() doesn't always work.
+ I found that just calling azflush() doesn't always work.
*/
if (share->dirty == TRUE)
{
@@ -795,13 +1062,14 @@ int ha_archive::rnd_init(bool scan)
if (share->dirty == TRUE)
{
DBUG_PRINT("info", ("archive flushing out rows for scan"));
- gzflush(share->archive_write, Z_SYNC_FLUSH);
+ azflush(&(share->archive_write), Z_SYNC_FLUSH);
+ share->forced_flushes++;
share->dirty= FALSE;
}
pthread_mutex_unlock(&share->mutex);
}
- if (read_data_header(archive))
+ if (read_data_header(&archive))
DBUG_RETURN(HA_ERR_CRASHED_ON_USAGE);
}
@@ -813,16 +1081,18 @@ int ha_archive::rnd_init(bool scan)
This is the method that is used to read a row. It assumes that the row is
positioned where you want it.
*/
-int ha_archive::get_row(gzFile file_to_read, byte *buf)
+int ha_archive::get_row(azio_stream *file_to_read, byte *buf)
{
- int read; // Bytes read, gzread() returns int
+ int read; // Bytes read, azread() returns int
uint *ptr, *end;
char *last;
size_t total_blob_length= 0;
+ MY_BITMAP *read_set= table->read_set;
DBUG_ENTER("ha_archive::get_row");
- read= gzread(file_to_read, buf, table->s->reclength);
- DBUG_PRINT("ha_archive::get_row", ("Read %d bytes expected %d", read,
table->s->reclength));
+ read= azread(file_to_read, buf, table->s->reclength);
+ DBUG_PRINT("ha_archive::get_row", ("Read %d bytes expected %d", read,
+ table->s->reclength));
if (read == Z_STREAM_ERROR)
DBUG_RETURN(HA_ERR_CRASHED_ON_USAGE);
@@ -842,7 +1112,11 @@ int ha_archive::get_row(gzFile file_to_r
for (ptr= table->s->blob_field, end=ptr + table->s->blob_fields ;
ptr != end ;
ptr++)
- total_blob_length += ((Field_blob*) table->field[*ptr])->get_length();
+ {
+ if (bitmap_is_set(read_set,
+ (((Field_blob*) table->field[*ptr])->field_index)))
+ total_blob_length += ((Field_blob*) table->field[*ptr])->get_length();
+ }
/* Adjust our row buffer if we need be */
buffer.alloc(total_blob_length);
@@ -856,11 +1130,19 @@ int ha_archive::get_row(gzFile file_to_r
size_t size= ((Field_blob*) table->field[*ptr])->get_length();
if (size)
{
- read= gzread(file_to_read, last, size);
- if ((size_t) read != size)
- DBUG_RETURN(HA_ERR_END_OF_FILE);
- ((Field_blob*) table->field[*ptr])->set_ptr(size, last);
- last += size;
+ if (bitmap_is_set(read_set,
+ ((Field_blob*) table->field[*ptr])->field_index))
+ {
+ read= azread(file_to_read, last, size);
+ if ((size_t) read != size)
+ DBUG_RETURN(HA_ERR_END_OF_FILE);
+ ((Field_blob*) table->field[*ptr])->set_ptr(size, last);
+ last += size;
+ }
+ else
+ {
+ (void)azseek(file_to_read, size, SEEK_CUR);
+ }
}
}
DBUG_RETURN(0);
@@ -884,14 +1166,13 @@ int ha_archive::rnd_next(byte *buf)
DBUG_RETURN(HA_ERR_END_OF_FILE);
scan_rows--;
- statistic_increment(table->in_use->status_var.ha_read_rnd_next_count,
- &LOCK_status);
- current_position= gztell(archive);
- rc= get_row(archive, buf);
+ ha_statistic_increment(&SSV::ha_read_rnd_next_count);
+ current_position= aztell(&archive);
+ rc= get_row(&archive, buf);
if (rc != HA_ERR_END_OF_FILE)
- records++;
+ stats.records++;
DBUG_RETURN(rc);
}
@@ -921,12 +1202,11 @@ void ha_archive::position(const byte *re
int ha_archive::rnd_pos(byte * buf, byte *pos)
{
DBUG_ENTER("ha_archive::rnd_pos");
- statistic_increment(table->in_use->status_var.ha_read_rnd_next_count,
- &LOCK_status);
- current_position= (z_off_t)my_get_ptr(pos, ref_length);
- (void)gzseek(archive, current_position, SEEK_SET);
+ ha_statistic_increment(&SSV::ha_read_rnd_next_count);
+ current_position= (my_off_t)my_get_ptr(pos, ref_length);
+ (void)azseek(&archive, current_position, SEEK_SET);
- DBUG_RETURN(get_row(archive, buf));
+ DBUG_RETURN(get_row(&archive, buf));
}
/*
@@ -954,8 +1234,8 @@ int ha_archive::repair(THD* thd, HA_CHEC
int ha_archive::optimize(THD* thd, HA_CHECK_OPT* check_opt)
{
DBUG_ENTER("ha_archive::optimize");
- int rc;
- gzFile writer;
+ int rc= 0;
+ azio_stream writer;
char writer_filename[FN_REFLEN];
/* Open up the writer if we haven't yet */
@@ -963,13 +1243,14 @@ int ha_archive::optimize(THD* thd, HA_CH
init_archive_writer();
/* Flush any waiting data */
- gzflush(share->archive_write, Z_SYNC_FLUSH);
+ azflush(&(share->archive_write), Z_SYNC_FLUSH);
+ share->forced_flushes++;
/* Lets create a file to contain the new data */
fn_format(writer_filename, share->table_name, "", ARN,
MY_REPLACE_EXT|MY_UNPACK_FILENAME);
- if ((writer= gzopen(writer_filename, "wb")) == NULL)
+ if (!(azopen(&writer, writer_filename, O_CREAT|O_WRONLY|O_TRUNC|O_BINARY)))
DBUG_RETURN(HA_ERR_CRASHED_ON_USAGE);
/*
@@ -979,6 +1260,7 @@ int ha_archive::optimize(THD* thd, HA_CH
if (check_opt->flags == T_EXTEND)
{
+ DBUG_PRINT("info", ("archive extended rebuild"));
byte *buf;
/*
@@ -995,14 +1277,14 @@ int ha_archive::optimize(THD* thd, HA_CH
Now we will rewind the archive file so that we are positioned at the
start of the file.
*/
- rc= read_data_header(archive);
+ rc= read_data_header(&archive);
/*
Assuming now error from rewinding the archive file, we now write out the
new header for out data file.
*/
if (!rc)
- rc= write_data_header(writer);
+ rc= write_data_header(&writer);
/*
On success of writing out the new header, we now fetch each row and
@@ -1011,9 +1293,19 @@ int ha_archive::optimize(THD* thd, HA_CH
if (!rc)
{
share->rows_recorded= 0;
- while (!(rc= get_row(archive, buf)))
+ stats.auto_increment_value= share->auto_increment_value= 0;
+ while (!(rc= get_row(&archive, buf)))
{
- real_write_row(buf, writer);
+ real_write_row(buf, &writer);
+ if (table->found_next_number_field)
+ {
+ Field *field= table->found_next_number_field;
+ ulonglong auto_value=
+ (ulonglong) field->val_int((char*)(buf + field->offset()));
+ if (share->auto_increment_value < auto_value)
+ stats.auto_increment_value= share->auto_increment_value=
+ auto_value;
+ }
share->rows_recorded++;
}
}
@@ -1025,43 +1317,53 @@ int ha_archive::optimize(THD* thd, HA_CH
}
else
{
+ DBUG_PRINT("info", ("archive quick rebuild"));
/*
The quick method is to just read the data raw, and then compress it directly.
*/
- int read; // Bytes read, gzread() returns int
+ int read; // Bytes read, azread() returns int
char block[IO_SIZE];
- if (gzrewind(archive) == -1)
+ if (azrewind(&archive) == -1)
{
rc= HA_ERR_CRASHED_ON_USAGE;
+ DBUG_PRINT("info", ("archive HA_ERR_CRASHED_ON_USAGE"));
goto error;
}
- while ((read= gzread(archive, block, IO_SIZE)))
- gzwrite(writer, block, read);
+ while ((read= azread(&archive, block, IO_SIZE)) > 0)
+ azwrite(&writer, block, read);
}
- gzflush(writer, Z_SYNC_FLUSH);
+ azclose(&writer);
share->dirty= FALSE;
- gzclose(share->archive_write);
- share->archive_write= writer;
+ share->forced_flushes= 0;
+
+ // now we close both our writer and our reader for the rename
+ azclose(&(share->archive_write));
+ azclose(&archive);
- my_rename(writer_filename,share->data_file_name,MYF(0));
+ // make the file we just wrote be our data file
+ rc = my_rename(writer_filename,share->data_file_name,MYF(0));
/*
- Now we need to reopen our read descriptor since it has changed.
+ now open the shared writer back up
+ we don't check rc here because we want to open the file back up even
+ if the optimize failed but we will return rc below so that we will
+ know it failed.
+ We also need to reopen our read descriptor since it has changed.
*/
- gzclose(archive);
- if ((archive= gzopen(share->data_file_name, "rb")) == NULL)
+ DBUG_PRINT("info", ("Reopening archive data file"));
+ if (!azopen(&(share->archive_write), share->data_file_name,
+ O_WRONLY|O_APPEND|O_BINARY) ||
+ !azopen(&archive, share->data_file_name, O_RDONLY|O_BINARY))
{
+ DBUG_PRINT("info", ("Could not open archive write file"));
rc= HA_ERR_CRASHED_ON_USAGE;
- goto error;
}
-
- DBUG_RETURN(0);
-
+ DBUG_RETURN(rc);
error:
- gzclose(writer);
+ azclose(&writer);
DBUG_RETURN(rc);
}
@@ -1088,8 +1390,8 @@ THR_LOCK_DATA **ha_archive::store_lock(T
*/
if ((lock_type >= TL_WRITE_CONCURRENT_INSERT &&
- lock_type <= TL_WRITE) && !thd->in_lock_tables
- && !thd->tablespace_op)
+ lock_type <= TL_WRITE) && !thd_in_lock_tables(thd)
+ && !thd_tablespace_op(thd))
lock_type = TL_WRITE_ALLOW_WRITE;
/*
@@ -1100,7 +1402,7 @@ THR_LOCK_DATA **ha_archive::store_lock(T
concurrent inserts to t2.
*/
- if (lock_type == TL_READ_NO_INSERT && !thd->in_lock_tables)
+ if (lock_type == TL_READ_NO_INSERT && !thd_in_lock_tables(thd))
lock_type = TL_READ;
lock.type=lock_type;
@@ -1111,6 +1413,17 @@ THR_LOCK_DATA **ha_archive::store_lock(T
return to;
}
+void ha_archive::update_create_info(HA_CREATE_INFO *create_info)
+{
+ ha_archive::info(HA_STATUS_AUTO | HA_STATUS_CONST);
+ if (!(create_info->used_fields & HA_CREATE_USED_AUTO))
+ {
+ create_info->auto_increment_value= stats.auto_increment_value;
+ }
+ if (*share->real_path)
+ create_info->data_file_name= share->real_path;
+}
+
/*
Hints for optimizer, see ha_tina for more information
@@ -1122,8 +1435,8 @@ void ha_archive::info(uint flag)
This should be an accurate number now, though bulk and delayed inserts can
cause the number to be inaccurate.
*/
- records= share->rows_recorded;
- deleted= 0;
+ stats.records= share->rows_recorded;
+ stats.deleted= 0;
/* Costs quite a bit more to get all information */
if (flag & HA_STATUS_TIME)
{
@@ -1131,14 +1444,17 @@ void ha_archive::info(uint flag)
VOID(my_stat(share->data_file_name, &file_stat, MYF(MY_WME)));
- mean_rec_length= table->s->reclength + buffer.alloced_length();
- data_file_length= file_stat.st_size;
- create_time= file_stat.st_ctime;
- update_time= file_stat.st_mtime;
- max_data_file_length= share->rows_recorded * mean_rec_length;
+ stats.mean_rec_length= table->s->reclength + buffer.alloced_length();
+ stats.data_file_length= file_stat.st_size;
+ stats.create_time= file_stat.st_ctime;
+ stats.update_time= file_stat.st_mtime;
+ stats.max_data_file_length= share->rows_recorded * stats.mean_rec_length;
}
- delete_length= 0;
- index_file_length=0;
+ stats.delete_length= 0;
+ stats.index_file_length=0;
+
+ if (flag & HA_STATUS_AUTO)
+ stats.auto_increment_value= share->auto_increment_value;
DBUG_VOID_RETURN;
}
@@ -1199,13 +1515,14 @@ int ha_archive::check(THD* thd, HA_CHECK
{
int rc= 0;
byte *buf;
- const char *old_proc_info=thd->proc_info;
+ const char *old_proc_info;
ha_rows count= share->rows_recorded;
DBUG_ENTER("ha_archive::check");
- thd->proc_info= "Checking table";
+ old_proc_info= thd_proc_info(thd, "Checking table");
/* Flush any waiting data */
- gzflush(share->archive_write, Z_SYNC_FLUSH);
+ azflush(&(share->archive_write), Z_SYNC_FLUSH);
+ share->forced_flushes++;
/*
First we create a buffer that we can use for reading rows, and can pass
@@ -1219,15 +1536,15 @@ int ha_archive::check(THD* thd, HA_CHECK
start of the file.
*/
if (!rc)
- read_data_header(archive);
+ read_data_header(&archive);
if (!rc)
- while (!(rc= get_row(archive, buf)))
+ while (!(rc= get_row(&archive, buf)))
count--;
my_free((char*)buf, MYF(0));
- thd->proc_info= old_proc_info;
+ thd_proc_info(thd, old_proc_info);
if ((rc && rc != HA_ERR_END_OF_FILE) || count)
{
@@ -1252,4 +1569,23 @@ bool ha_archive::check_and_repair(THD *t
DBUG_RETURN(repair(thd, &check_opt));
}
-#endif /* HAVE_ARCHIVE_DB */
+
+struct st_mysql_storage_engine archive_storage_engine=
+{ MYSQL_HANDLERTON_INTERFACE_VERSION, archive_hton };
+
+mysql_declare_plugin(archive)
+{
+ MYSQL_STORAGE_ENGINE_PLUGIN,
+ &archive_storage_engine,
+ "ARCHIVE",
+ "Brian Aker, MySQL AB",
+ "Archive storage engine",
+ archive_db_init, /* Plugin Init */
+ archive_db_done, /* Plugin Deinit */
+ 0x0100 /* 1.0 */,
+ NULL, /* status variables */
+ NULL, /* system variables */
+ NULL /* config options */
+}
+mysql_declare_plugin_end;
+
| Thread |
|---|
| • bk commit into 5.1 tree (svoj:1.2349) | Sergey Vojtovich | 29 Sep |