Below is the list of changes that have just been committed into a local
5.0 repository of antony. When antony does a push these changes will
be propagated to the main repository and, within 24 hours after the
push, to the public repository.
For information on how to access the public repository
see http://dev.mysql.com/doc/mysql/en/installing-source-tree.html
ChangeSet@stripped, 2007-06-19 23:50:17-07:00, antony@stripped +7 -0
Bug#25513
"Federared Transactions Failure"
Bug occurs when the user performs an operation which inserts more than
one row into the federated table and the federated table references a
remote table stored within a transactional storage engine. When the insert
operation for any one row in the statement fails due to constraint
violation, the federated engine is unable to perform statement rollback
and so the remote table contains a partial commit. The user would expect
a statement to perform the same so a statement rollback is expected.
This bug was fixed by implementing bulk-insert handling into the
federated storage engine. This will relieve the bug for most common
situations by enabling the generation of a multi-row insert into the
remote table and thus permitting the remote table to perform statement
rollback when neccessary.
The multi-row insert is limited to the maximum packet size between servers
and should the size overflow, more than one insert statement will be sent
and this bug will reappear.
The bulk-insert handling will offer a significant performance boost when
inserting a large number of small rows.
This patch builds on Bug29019 and Bug25511
mysql-test/r/federated.result@stripped, 2007-06-19 23:50:13-07:00, antony@stripped +2 -2
fix tests
mysql-test/r/federated_innodb.result@stripped, 2007-06-19 23:50:14-07:00, antony@stripped
+34 -0
New BitKeeper file ``mysql-test/r/federated_innodb.result''
mysql-test/r/federated_innodb.result@stripped, 2007-06-19 23:50:14-07:00, antony@stripped
+0 -0
mysql-test/t/federated.test@stripped, 2007-06-19 23:50:13-07:00, antony@stripped +3 -0
fix tests
mysql-test/t/federated_innodb-slave.opt@stripped, 2007-06-19 23:50:14-07:00,
antony@stripped +1 -0
New BitKeeper file ``mysql-test/t/federated_innodb-slave.opt''
mysql-test/t/federated_innodb-slave.opt@stripped, 2007-06-19 23:50:14-07:00,
antony@stripped +0 -0
mysql-test/t/federated_innodb.test@stripped, 2007-06-19 23:50:14-07:00, antony@stripped
+34 -0
New BitKeeper file ``mysql-test/t/federated_innodb.test''
mysql-test/t/federated_innodb.test@stripped, 2007-06-19 23:50:14-07:00, antony@stripped +0
-0
sql/ha_federated.cc@stripped, 2007-06-19 23:50:13-07:00, antony@stripped +184 -28
bug25513
new member method: append_stmt_insert()
implement bulk insert
sql/ha_federated.h@stripped, 2007-06-19 23:50:14-07:00, antony@stripped +12 -8
bug25513
new member value:
bulk_insert
new member methods:
start_bulk_insert(), end_bulk_insert(), append_stmt_insert
make member methods private:
read_next(), index_read_idx_with_result_set()
# This is a BitKeeper patch. What follows are the unified diffs for the
# set of deltas contained in the patch. The rest of the patch, the part
# that BitKeeper cares about, is below these diffs.
# User: antony
# Host: ppcg5.local
# Root: /private/Network/Servers/anubis.xiphis.org/home/antony/work/p2-bug25513.1
--- New file ---
+++ mysql-test/r/federated_innodb.result 07/06/19 23:50:14
stop slave;
drop table if exists t1,t2,t3,t4,t5,t6,t7,t8,t9;
reset master;
reset slave;
drop table if exists t1,t2,t3,t4,t5,t6,t7,t8,t9;
start slave;
stop slave;
DROP DATABASE IF EXISTS federated;
CREATE DATABASE federated;
DROP DATABASE IF EXISTS federated;
CREATE DATABASE federated;
create table federated.t1 (a int primary key, b varchar(64))
engine=myisam;
create table federated.t1 (a int primary key, b varchar(64))
engine=federated
connection='mysql://root@stripped:SLAVE_PORT/federated/t1';
insert into federated.t1 values (1,"Larry"), (2,"Curly"), (1,"Moe");
ERROR 23000: Duplicate entry '1' for key 1
select * from federated.t1;
a b
1 Larry
2 Curly
truncate federated.t1;
alter table federated.t1 engine=innodb;
insert into federated.t1 values (1,"Larry"), (2,"Curly"), (1,"Moe");
ERROR 23000: Duplicate entry '1' for key 1
select * from federated.t1;
a b
drop table federated.t1;
drop table federated.t1;
DROP TABLE IF EXISTS federated.t1;
DROP DATABASE IF EXISTS federated;
DROP TABLE IF EXISTS federated.t1;
DROP DATABASE IF EXISTS federated;
--- New file ---
+++ mysql-test/t/federated_innodb-slave.opt 07/06/19 23:50:14
--innodb
--- New file ---
+++ mysql-test/t/federated_innodb.test 07/06/19 23:50:14
source include/federated.inc;
source include/have_innodb.inc;
#
# Bug#25513 Federated transaction failures
#
connection slave;
create table federated.t1 (a int primary key, b varchar(64))
engine=myisam;
connection master;
--replace_result $SLAVE_MYPORT SLAVE_PORT
eval create table federated.t1 (a int primary key, b varchar(64))
engine=federated
connection='mysql://root@stripped:$SLAVE_MYPORT/federated/t1';
--error ER_DUP_ENTRY
insert into federated.t1 values (1,"Larry"), (2,"Curly"), (1,"Moe");
select * from federated.t1;
connection slave;
truncate federated.t1;
alter table federated.t1 engine=innodb;
connection master;
--error ER_DUP_ENTRY
insert into federated.t1 values (1,"Larry"), (2,"Curly"), (1,"Moe");
select * from federated.t1;
drop table federated.t1;
connection slave;
drop table federated.t1;
source include/federated_cleanup.inc;
--- 1.42/mysql-test/r/federated.result 2007-06-19 19:36:42 -07:00
+++ 1.43/mysql-test/r/federated.result 2007-06-19 23:50:13 -07:00
@@ -1847,7 +1847,7 @@
DEFAULT CHARSET=utf8;
create table federated.t1 (a int primary key, b varchar(64))
ENGINE=FEDERATED
-connection='mysql://root@stripped:12002/federated/t1'
+connection='mysql://root@stripped:SLAVE_PORT/federated/t1'
DEFAULT CHARSET=utf8;
insert ignore into federated.t1 values (1,"Larry"), (2,"Curly"), (1,"Moe");
select * from federated.t1;
@@ -1871,7 +1871,7 @@
insert into federated.t1 values (1,2,"original");
create table federated.t1 (a int primary key, b int, c varchar(64))
ENGINE=FEDERATED
-connection='mysql://root@stripped:12002/federated/t1';
+connection='mysql://root@stripped:SLAVE_PORT/federated/t1';
insert into federated.t1 values(1,3,"new") on duplicate key update b=b+100;
select * from federated.t1;
a b c
--- 1.37/mysql-test/t/federated.test 2007-06-19 19:36:43 -07:00
+++ 1.38/mysql-test/t/federated.test 2007-06-19 23:50:13 -07:00
@@ -1,4 +1,5 @@
source include/federated.inc;
+source include/have_innodb.inc;
connection slave;
DROP TABLE IF EXISTS federated.t1;
@@ -1583,6 +1584,7 @@
create table federated.t1 (a int primary key, b varchar(64))
DEFAULT CHARSET=utf8;
connection master;
+--replace_result $SLAVE_MYPORT SLAVE_PORT
eval create table federated.t1 (a int primary key, b varchar(64))
ENGINE=FEDERATED
connection='mysql://root@stripped:$SLAVE_MYPORT/federated/t1'
@@ -1610,6 +1612,7 @@
create table federated.t1 (a int primary key, b int, c varchar(64));
insert into federated.t1 values (1,2,"original");
connection master;
+--replace_result $SLAVE_MYPORT SLAVE_PORT
eval create table federated.t1 (a int primary key, b int, c varchar(64))
ENGINE=FEDERATED
connection='mysql://root@stripped:$SLAVE_MYPORT/federated/t1';
--- 1.77/sql/ha_federated.cc 2007-06-19 19:36:43 -07:00
+++ 1.78/sql/ha_federated.cc 2007-06-19 23:50:13 -07:00
@@ -348,6 +348,7 @@
// init the hash
static int federated_init= FALSE; // Variable for checking the
// init state of hash
+static const int bulk_padding= 64; // bytes "overhead" in packet
/* Federated storage engine handlerton */
@@ -762,7 +763,9 @@
ha_federated::ha_federated(TABLE *table_arg)
:handler(&federated_hton, table_arg),
mysql(0), stored_result(0)
-{}
+{
+ bzero(&bulk_insert, sizeof(bulk_insert));
+}
/*
@@ -1570,6 +1573,82 @@
DBUG_RETURN(0);
}
+
+/**
+ @brief Construct the INSERT statement.
+
+ @details This method will construct the INSERT statement and appends it to
+ the supplied query string buffer.
+
+ @return
+ @retval FALSE No error
+ @retval TRUE Failure
+*/
+
+bool ha_federated::append_stmt_insert(String *query)
+{
+ char insert_buffer[FEDERATED_QUERY_BUFFER_SIZE];
+ Field **field;
+ uint tmp_length;
+
+ /* The main insert query string */
+ String insert_string(insert_buffer, sizeof(insert_buffer), &my_charset_bin);
+ DBUG_ENTER("ha_federated::append_stmt_insert");
+
+ insert_string.length(0);
+
+ if (replace_duplicates && !retrieve_primary_key)
+ insert_string.append(STRING_WITH_LEN("REPLACE INTO "));
+ else if (ignore_duplicates && !retrieve_primary_key)
+ insert_string.append(STRING_WITH_LEN("INSERT IGNORE INTO "));
+ else
+ insert_string.append(STRING_WITH_LEN("INSERT INTO "));
+ append_ident(&insert_string, share->table_name, share->table_name_length);
+ insert_string.append(FEDERATED_OPENPAREN);
+ tmp_length= insert_string.length() - strlen(FEDERATED_COMMA);
+
+ /*
+ loop through the field pointer array, add any fields to both the values
+ list and the fields list that match the current query id
+ */
+ for (field= table->field; *field; field++)
+ {
+ /* append the field name */
+ append_ident(&insert_string, (*field)->field_name,
+ strlen((*field)->field_name));
+
+ /* append commas between both fields and fieldnames */
+ /*
+ unfortunately, we can't use the logic
+ if *(fields + 1) to make the following
+ appends conditional because we may not append
+ if the next field doesn't match the condition:
+ (((*field)->query_id && (*field)->query_id == current_query_id)
+ */
+ insert_string.append(FEDERATED_COMMA);
+ }
+
+ /*
+ remove trailing comma
+ */
+ insert_string.length(insert_string.length() - strlen(FEDERATED_COMMA));
+
+ /*
+ if there were no fields, we don't want to add a closing paren
+ AND, we don't want to chop off the last char '('
+ insert will be "INSERT INTO t1 VALUES ();"
+ */
+ if (insert_string.length() > tmp_length)
+ {
+ insert_string.append(FEDERATED_CLOSEPAREN);
+ }
+
+ insert_string.append(FEDERATED_VALUES);
+
+ DBUG_RETURN(query->append(insert_string));
+}
+
+
/*
write_row() inserts a row. No extra() hint is given currently if a bulk load
is happeneding. buf() is a byte array of data. You can use the field
@@ -1586,13 +1665,13 @@
int ha_federated::write_row(byte *buf)
{
- char insert_buffer[FEDERATED_QUERY_BUFFER_SIZE];
char values_buffer[FEDERATED_QUERY_BUFFER_SIZE];
char insert_field_value_buffer[STRING_BUFFER_USUAL_SIZE];
Field **field;
+ uint tmp_length;
+ int error= 0;
+ bool use_bulk_insert, have_auto_increment= table->next_number_field;
- /* The main insert query string */
- String insert_string(insert_buffer, sizeof(insert_buffer), &my_charset_bin);
/* The string containing the values to be added to the insert */
String values_string(values_buffer, sizeof(values_buffer), &my_charset_bin);
/* The actual value of the field, to be added to the values_string */
@@ -1600,7 +1679,6 @@
sizeof(insert_field_value_buffer),
&my_charset_bin);
values_string.length(0);
- insert_string.length(0);
insert_field_value_string.length(0);
DBUG_ENTER("ha_federated::write_row");
@@ -1608,20 +1686,14 @@
if (table->timestamp_field_type & TIMESTAMP_AUTO_SET_ON_INSERT)
table->timestamp_field->set_time();
+ if (!(use_bulk_insert= bulk_insert.str && !retrieve_primary_key))
+ append_stmt_insert(&values_string);
+
/*
start both our field and field values strings
*/
- if (replace_duplicates && !retrieve_primary_key)
- insert_string.append(STRING_WITH_LEN("REPLACE INTO "));
- else if (ignore_duplicates && !retrieve_primary_key)
- insert_string.append(STRING_WITH_LEN("INSERT IGNORE INTO "));
- else
- insert_string.append(STRING_WITH_LEN("INSERT INTO "));
- append_ident(&insert_string, share->table_name, share->table_name_length);
- insert_string.append(FEDERATED_OPENPAREN);
-
- values_string.append(FEDERATED_VALUES);
values_string.append(FEDERATED_OPENPAREN);
+ tmp_length= values_string.length();
/*
loop through the field pointer array, add any fields to both the values
@@ -1640,8 +1712,6 @@
insert_field_value_string.length(0);
}
- /* append the field name */
- insert_string.append((*field)->field_name);
/* append the value */
values_string.append(insert_field_value_string);
@@ -1655,32 +1725,52 @@
if the next field doesn't match the condition:
(((*field)->query_id && (*field)->query_id == current_query_id)
*/
- insert_string.append(FEDERATED_COMMA);
values_string.append(FEDERATED_COMMA);
}
/*
- remove trailing comma
- */
- insert_string.length(insert_string.length() - strlen(FEDERATED_COMMA));
- /*
if there were no fields, we don't want to add a closing paren
AND, we don't want to chop off the last char '('
insert will be "INSERT INTO t1 VALUES ();"
*/
- if (table->s->fields)
+ if (values_string.length() > tmp_length)
{
/* chops off leading commas */
values_string.length(values_string.length() - strlen(FEDERATED_COMMA));
- insert_string.append(FEDERATED_CLOSEPAREN);
}
/* we always want to append this, even if there aren't any fields */
values_string.append(FEDERATED_CLOSEPAREN);
- /* add the values */
- insert_string.append(values_string);
+ if (use_bulk_insert)
+ {
+ if (bulk_insert.length + values_string.length() + bulk_padding >
+ mysql->net.max_packet_size && bulk_insert.length)
+ {
+ error= mysql_real_query(mysql, bulk_insert.str, bulk_insert.length);
+ bulk_insert.length= 0;
+ }
+ else
+ have_auto_increment= FALSE;
+
+ if (bulk_insert.length == 0)
+ {
+ char insert_buffer[FEDERATED_QUERY_BUFFER_SIZE];
+ String insert_string(insert_buffer, sizeof(insert_buffer), &my_charset_bin);
+ insert_string.length(0);
+ append_stmt_insert(&insert_string);
+ dynstr_append_mem(&bulk_insert, insert_string.ptr(), insert_string.length());
+ }
+ else
+ dynstr_append_mem(&bulk_insert, ",", 1);
- if (mysql_real_query(mysql, insert_string.ptr(), insert_string.length()))
+ dynstr_append_mem(&bulk_insert, values_string.ptr(), values_string.length());
+ }
+ else
+ {
+ error= mysql_real_query(mysql, values_string.ptr(), values_string.length());
+ }
+
+ if (error)
{
DBUG_RETURN(stash_remote_error());
}
@@ -1688,11 +1778,77 @@
If the table we've just written a record to contains an auto_increment
field, then store the last_insert_id() value from the foreign server
*/
- if (table->next_number_field)
+ if (have_auto_increment)
update_auto_increment();
DBUG_RETURN(0);
}
+
+
+/**
+ @brief Prepares the storage engine for bulk inserts.
+
+ @param rows estimated number of rows in bulk insert or 0 if unknown.
+
+ @details Initializes memory structures required for bulk insert.
+*/
+
+void ha_federated::start_bulk_insert(ha_rows rows)
+{
+ uint page_size;
+ DBUG_ENTER("ha_federated::start_bulk_insert");
+
+ dynstr_free(&bulk_insert);
+
+ /**
+ We don't bother with bulk-insert semantics when the estimated rows == 1
+ The rows value will be 0 if the server does not know how many rows
+ would be inserted. This can occur when performing INSERT...SELECT
+ */
+
+ if (rows == 1)
+ DBUG_VOID_RETURN;
+
+ page_size= (uint) my_getpagesize();
+
+ if (init_dynamic_string(&bulk_insert, NULL, page_size, page_size))
+ DBUG_VOID_RETURN;
+
+ bulk_insert.length= 0;
+ DBUG_VOID_RETURN;
+}
+
+
+/**
+ @brief End bulk insert.
+
+ @details This method will send any remaining rows to the remote server.
+ Finally, it will deinitialize the bulk insert data structure.
+
+ @return
+ @retval 0 No error
+ @retval != 0 Error occured at remote server. Also sets my_errno.
+*/
+
+int ha_federated::end_bulk_insert()
+{
+ int error= 0;
+ DBUG_ENTER("ha_federated::end_bulk_insert");
+
+ if (bulk_insert.str && bulk_insert.length)
+ {
+ if (mysql_real_query(mysql, bulk_insert.str, bulk_insert.length))
+ error= stash_remote_error();
+ else
+ if (table->next_number_field)
+ update_auto_increment();
+ }
+
+ dynstr_free(&bulk_insert);
+
+ DBUG_RETURN(my_errno= error);
+}
+
/*
ha_federated::update_auto_increment
--- 1.32/sql/ha_federated.h 2007-06-19 19:36:43 -07:00
+++ 1.33/sql/ha_federated.h 2007-06-19 23:50:14 -07:00
@@ -156,6 +156,7 @@
char remote_error_buf[FEDERATED_QUERY_BUFFER_SIZE];
bool retrieve_primary_key;
bool ignore_duplicates, replace_duplicates;
+ DYNAMIC_STRING bulk_insert;
private:
/*
@@ -170,6 +171,14 @@
bool records_in_range);
int stash_remote_error();
+ bool append_stmt_insert(String *query);
+
+ int read_next(byte *buf, MYSQL_RES *result);
+ int index_read_idx_with_result_set(byte *buf, uint index,
+ const byte *key,
+ uint key_len,
+ ha_rkey_function find_flag,
+ MYSQL_RES **result);
public:
ha_federated(TABLE *table_arg);
~ha_federated()
@@ -255,6 +264,8 @@
int open(const char *name, int mode, uint test_if_locked); // required
int close(void); // required
+ void start_bulk_insert(ha_rows rows);
+ int end_bulk_insert();
int write_row(byte *buf);
int update_row(const byte *old_data, byte *new_data);
int delete_row(const byte *buf);
@@ -298,14 +309,7 @@
THR_LOCK_DATA **store_lock(THD *thd, THR_LOCK_DATA **to,
enum thr_lock_type lock_type); //required
- virtual bool get_error_message(int error, String *buf);
-
- int read_next(byte *buf, MYSQL_RES *result);
- int index_read_idx_with_result_set(byte *buf, uint index,
- const byte *key,
- uint key_len,
- ha_rkey_function find_flag,
- MYSQL_RES **result);
+ bool get_error_message(int error, String *buf);
};
bool federated_db_init(void);