MySQL Lists are EOL. Please join:

List:Commits« Previous MessageNext Message »
From:antony Date:April 27 2007 10:36pm
Subject:bk commit into 5.1 tree (acurtis:1.2489) BUG#25513
View as plain text  
Below is the list of changes that have just been committed into a local
5.1 repository of antony. When antony does a push these changes will
be propagated to the main repository and, within 24 hours after the
push, to the public repository.
For information on how to access the public repository
see http://dev.mysql.com/doc/mysql/en/installing-source-tree.html

ChangeSet@stripped, 2007-04-27 15:35:51-07:00, acurtis@stripped +4 -0
  Bug#25513
    "Federated: transaction failures"
    Fix case where using bulk inserts, autocommit=1 and the remote storage engine has
    single statement rollback, make the user-perceived behaviour the same where possible.
    (limitation is server memory and max packet size between server and federated server) 

  mysql-test/r/federated_transactions.result@stripped, 2007-04-27 15:35:47-07:00, acurtis@stripped +10 -0
    bug25513 - test for bulk-insert bug

  mysql-test/t/federated_transactions.test@stripped, 2007-04-27 15:35:47-07:00, acurtis@stripped +17 -0
    bug25513 - test for bulk-insert bug

  storage/federated/ha_federated.cc@stripped, 2007-04-27 15:35:47-07:00, acurtis@stripped +225 -53
    bug25513
      "federated transaction failure"
      Implement support for bulk-inserts

  storage/federated/ha_federated.h@stripped, 2007-04-27 15:35:47-07:00, acurtis@stripped +7 -0
    bug25513
      "federated transaction failure"
      Implement support for bulk-inserts

# This is a BitKeeper patch.  What follows are the unified diffs for the
# set of deltas contained in the patch.  The rest of the patch, the part
# that BitKeeper cares about, is below these diffs.
# User:	acurtis
# Host:	ltamd64.xiphis.org
# Root:	/home/antony/work2/p2-bug25513.1

--- 1.4/mysql-test/r/federated_transactions.result	2007-04-27 15:36:04 -07:00
+++ 1.5/mysql-test/r/federated_transactions.result	2007-04-27 15:36:04 -07:00
@@ -44,6 +44,16 @@
 6	fig
 DELETE FROM federated.t1;
 DROP TABLE IF EXISTS federated.t1;
+CREATE TABLE federated.t1 (s1 int, UNIQUE (s1)) ENGINE=InnoDB;
+DROP TABLE IF EXISTS federated.t1;
+CREATE TABLE federated.t1 (s1 int, UNIQUE (s1)) ENGINE="FEDERATED"
+  CONNECTION='mysql://root@stripped:SLAVE_PORT/federated/t1';
+SET autocommit=1;
+INSERT INTO federated.t1 VALUES (1),(2),(1);
+ERROR HY000: Got error 10000 'Error on remote system: 1582: Duplicate entry '1' for key 's1'' from FEDERATED
+SELECT * FROM federated.t1;
+s1
+DROP TABLE IF EXISTS federated.t1;
 DROP DATABASE IF EXISTS federated;
 DROP TABLE IF EXISTS federated.t1;
 DROP DATABASE IF EXISTS federated;

--- 1.7/mysql-test/t/federated_transactions.test	2007-04-27 15:36:04 -07:00
+++ 1.8/mysql-test/t/federated_transactions.test	2007-04-27 15:36:04 -07:00
@@ -37,4 +37,21 @@
 SELECT * FROM federated.t1;
 DELETE FROM federated.t1;
 
+#
+# Bug#25513 - Federated transaction failure on insert
+#
+connection slave;
+DROP TABLE IF EXISTS federated.t1;
+CREATE TABLE federated.t1 (s1 int, UNIQUE (s1)) ENGINE=InnoDB;
+connection master;
+DROP TABLE IF EXISTS federated.t1;
+# # correct connection, same named tables
+--replace_result $SLAVE_MYPORT SLAVE_PORT
+eval CREATE TABLE federated.t1 (s1 int, UNIQUE (s1)) ENGINE="FEDERATED"
+  CONNECTION='mysql://root@stripped:$SLAVE_MYPORT/federated/t1';
+SET autocommit=1;
+--error ER_GET_ERRMSG
+INSERT INTO federated.t1 VALUES (1),(2),(1);
+SELECT * FROM federated.t1;
+
 source include/federated_cleanup.inc;

--- 1.99/storage/federated/ha_federated.cc	2007-04-27 15:36:04 -07:00
+++ 1.100/storage/federated/ha_federated.cc	2007-04-27 15:36:04 -07:00
@@ -907,6 +907,9 @@
   mysql(0), stored_result(0)
 {
   trx_next= 0;
+  
+  bzero(&bulk_stmt, sizeof(bulk_stmt));
+  bzero(&bulk_array, sizeof(bulk_array));
 }
 
 
@@ -1742,6 +1745,183 @@
 
 
 /*
+  Prepares the storage engine for bulk-inserts. 
+
+  SYNOPSIS
+    start_bulk_insert()
+      rows      estimated number of rows in bulk insert or 0 if unknown.
+
+    DESCRIPTION
+      This method initializes memory structures required for bulk-inserts.
+      Invoked by ha_start_bulk_insert()
+
+    RETURN VALUE
+      N/A
+*/
+void ha_federated::start_bulk_insert(ha_rows rows)
+{
+  char insert_buffer[FEDERATED_QUERY_BUFFER_SIZE];
+  Field **field;
+  uint initial, old_length;
+  THD *thd= current_thd;
+
+  /* The main insert query string */
+  String insert_string(insert_buffer, sizeof(insert_buffer), &my_charset_bin);
+  DBUG_ENTER("ha_federated::start_bulk_insert");
+
+  /*
+    We don't bother with bulk-insert semantics when the estimated rows==1.
+    The rows value will be 0 if the server does not know how many rows
+    would be inserted. This occurs when performing INSERT-SELECT.
+  */
+  
+  if (rows == 1)
+    DBUG_VOID_RETURN;
+
+  /* this is just a sanity-cap */
+  initial= (rows > 32767) ? 32767 : (uint) rows;
+  
+  insert_string.length(0);
+  /*
+    start both our field and field values strings
+  */
+  insert_string.append(STRING_WITH_LEN("INSERT INTO `"));
+  insert_string.append(share->table_name, share->table_name_length);
+  insert_string.append(STRING_WITH_LEN("` (`"));
+  old_length= insert_string.length() - 2;
+
+  for (field= table->field; *field; field++)
+    if (bitmap_is_set(table->write_set, (*field)->field_index))
+    {
+      insert_string.append((*field)->field_name);
+      insert_string.append(STRING_WITH_LEN("`,`"));
+    }
+
+  insert_string.length(insert_string.length() - 2);
+  if (insert_string.length() != old_length)
+    insert_string.append(')');
+  insert_string.append(STRING_WITH_LEN(" VALUES "));
+
+  /*
+    We store the string in the THD memroot so that it will
+    be automatically freed later. This is ok because we only need
+    this string near the end of statement execution anyways.
+  */
+
+  bulk_stmt.str= thd->strmake(insert_string.c_ptr_quick(),
+                              insert_string.length());
+  bulk_stmt.length= insert_string.length();
+  my_init_dynamic_array(&bulk_array, sizeof(LEX_STRING), initial, 0);
+
+  DBUG_VOID_RETURN;
+}
+
+
+/*
+  Transmits a single statement to the remote server.
+
+  SYNOPSIS
+    emit_bulk_insert()
+
+    DESCRIPTION
+      This method packs many rows into a single bulk-insert statement and
+      transmits it to the remote server.
+
+    RETURN VALUE
+      0    No error
+*/
+int ha_federated::emit_bulk_insert()
+{
+  char insert_buffer[FEDERATED_QUERY_BUFFER_SIZE];
+  String insert_string(insert_buffer, sizeof(insert_buffer), &my_charset_bin);
+  int rc= 0;
+  uint idx, max_bulk_query_size;
+  DBUG_ENTER("ha_federated::end_bulk_insert()");
+
+  DBUG_ASSERT(bulk_array.elements);
+  
+  /*
+    We try to limit the packet size to be less than 3/4 of the max
+    packet size but at least 1 row will be emitted per statement.
+    When the bulk-insert is split into multiple statements, then it is
+    possible for the remote server to have a partially complete insert
+    when AUTOCOMMIT=1 on the local server.
+  */
+  max_bulk_query_size= (mysql->net.max_packet_size * 3) / 4;
+
+  insert_string.length(0);
+  insert_string.append(bulk_stmt.str, bulk_stmt.length);
+  for (idx= 0; idx < bulk_array.elements; idx++)
+  {
+    LEX_STRING *row= dynamic_element(&bulk_array, idx, LEX_STRING*);
+    if (idx && (insert_string.length() + row->length > max_bulk_query_size))
+      break;
+    insert_string.append('(');
+    insert_string.append(row->str, row->length);
+    insert_string.append(STRING_WITH_LEN("),"));
+  }
+
+  /* remove trailing comma */
+  insert_string.length(insert_string.length() - 1);
+
+  if (idx >= bulk_array.elements)
+    /* mark the bulk_array as empty */
+    reset_dynamic(&bulk_array);
+  else
+  {
+    /* Here we compact the list of remaining rows to transmit */
+    uint remaining= bulk_array.elements - idx;
+    memmove(bulk_array.buffer, dynamic_array_ptr(&bulk_array, idx),
+            remaining * bulk_array.size_of_element);
+    bulk_array.elements= remaining;
+  }  
+
+  if (mysql_real_query(mysql, insert_string.ptr(), insert_string.length()))
+    rc= stash_remote_error();
+  else
+  /*
+    If the table we've just written a record to contains an auto_increment
+    field, then store the last_insert_id() value from the foreign server
+  */
+  if (table->next_number_field)
+    update_auto_increment();
+
+  DBUG_RETURN(rc);
+}
+
+
+/*
+  End bulk inserts
+
+  SYNOPSIS
+    end_bulk_insert()
+
+    DESCRIPTION
+      This method will repeatedly call emit_bulk_insert() to transmit all
+      rows into the remote server.
+      Finally, it will deinitialize the bulk-insert structures.
+      Invoked by ha_end_bulk_insert()
+
+    RETURN VALUE
+      0    No error
+*/
+int ha_federated::end_bulk_insert()
+{
+  char insert_buffer[FEDERATED_QUERY_BUFFER_SIZE];
+  String insert_string(insert_buffer, sizeof(insert_buffer), &my_charset_bin);
+  int rc= 0;
+  DBUG_ENTER("ha_federated::end_bulk_insert()");
+
+  while (!rc && bulk_array.elements)
+    rc= emit_bulk_insert();
+
+  delete_dynamic(&bulk_array);  
+  
+  DBUG_RETURN(rc);
+}
+
+
+/*
   write_row() inserts a row. No extra() hint is given currently if a bulk load
   is happeneding. buf() is a byte array of data. You can use the field
   information to extract the data from the native byte array type.
@@ -1767,52 +1947,30 @@
     in two strings
       "INSERT INTO t1 ("
       and
-      " VALUES ("
+      ""
 
     If there are fields with values, they get appended, with commas, and 
     the last loop, a trailing comma is there
 
     "INSERT INTO t1 ( col1, col2, colN, "
 
-    " VALUES ( 'val1', 'val2', 'valN', "
+    "'val1', 'val2', 'valN', "
 
     Then, if there are fields, it should decrement the string by ", " length.
 
     "INSERT INTO t1 ( col1, col2, colN"
-    " VALUES ( 'val1', 'val2', 'valN'"
+    "'val1', 'val2', 'valN'"
 
-    Then it adds a close paren to both - if there are fields
+    Then it adds a close paren to the first string - if there are fields
 
     "INSERT INTO t1 ( col1, col2, colN)"
-    " VALUES ( 'val1', 'val2', 'valN')"
+    "'val1', 'val2', 'valN'"
 
     Then appends both together
     "INSERT INTO t1 ( col1, col2, colN) VALUES ( 'val1', 'val2', 'valN')"
 
-    So... the problem, is if you have the original statement:
-
-    "INSERT INTO t1 VALUES ()"
-
-    Which is legitimate, but if the code thinks there are fields
-
-    "INSERT INTO t1 ("
-    " VALUES ( "
-
-    If the field flag is set, but there are no commas, reduces the 
-    string by strlen(", ")
-
-    "INSERT INTO t1 "
-    " VALUES "
-
-    Then adds the close parenthesis
-
-    "INSERT INTO t1  )"
-    " VALUES  )"
-
-    So, I have to use a bool as before, set in the loop where fields and commas
-    are appended to the string
   */
-  my_bool commas_added= FALSE;
+  int rc= 0;
   char insert_buffer[FEDERATED_QUERY_BUFFER_SIZE];
   char values_buffer[FEDERATED_QUERY_BUFFER_SIZE];
   char insert_field_value_buffer[STRING_BUFFER_USUAL_SIZE];
@@ -1841,11 +1999,7 @@
   */
   insert_string.append(STRING_WITH_LEN("INSERT INTO `"));
   insert_string.append(share->table_name, share->table_name_length);
-  insert_string.append('`');
-  insert_string.append(STRING_WITH_LEN(" ("));
-
-  values_string.append(STRING_WITH_LEN(" VALUES "));
-  values_string.append(STRING_WITH_LEN(" ("));
+  insert_string.append(STRING_WITH_LEN("` (`"));
 
   /*
     loop through the field pointer array, add any fields to both the values
@@ -1855,7 +2009,6 @@
   {
     if (bitmap_is_set(table->write_set, (*field)->field_index))
     {
-      commas_added= TRUE;
       if ((*field)->is_null())
         values_string.append(STRING_WITH_LEN(" NULL "));
       else
@@ -1879,7 +2032,7 @@
         make the following appends conditional as we don't know if the
         next field is in the write set
       */
-      insert_string.append(STRING_WITH_LEN(", "));
+      insert_string.append(STRING_WITH_LEN("`,`"));
       values_string.append(STRING_WITH_LEN(", "));
     }
   }
@@ -1890,37 +2043,56 @@
     AND, we don't want to chop off the last char '('
     insert will be "INSERT INTO t1 VALUES ();"
   */
-  if (commas_added)
+  if (values_string.length())
   {
     insert_string.length(insert_string.length() - sizeof_trailing_comma);
-    /* chops off leading commas */
+    /* chops off trailing comma */
     values_string.length(values_string.length() - sizeof_trailing_comma);
     insert_string.append(STRING_WITH_LEN(") "));
   }
   else
   {
-    /* chops off trailing ) */
-    insert_string.length(insert_string.length() - sizeof_trailing_closeparen);
+    /* chops off trailing "(`" */
+    insert_string.length(insert_string.length() - 2);
   }
 
-  /* we always want to append this, even if there aren't any fields */
-  values_string.append(STRING_WITH_LEN(") "));
+  /*
+    Check if bulk-inserts are in effect
+  */  
+  if (bulk_array.buffer)
+  {
+    LEX_STRING row;
+    THD *thd= current_thd;
+    row.str= thd->strmake(values_string.ptr(), values_string.length());
+    row.length= values_string.length();
+    if (insert_dynamic(&bulk_array, (gptr) &row))
+      rc= HA_ERR_OUT_OF_MEM;
+    else
+    if (estimation_rows_to_insert == (ha_rows) bulk_array.elements)
+      rc= emit_bulk_insert();
+  }
+  else
+  {
+    insert_string.append(STRING_WITH_LEN(" VALUES ("));
 
-  /* add the values */
-  insert_string.append(values_string);
+    /* add the values */
+    insert_string.append(values_string);
 
-  if (mysql_real_query(mysql, insert_string.ptr(), insert_string.length()))
-  {
-    DBUG_RETURN(stash_remote_error());
+    /* we always want to append this, even if there aren't any fields */
+    insert_string.append(')');
+
+    if (mysql_real_query(mysql, insert_string.ptr(), insert_string.length()))
+      rc= stash_remote_error();
+    else
+    /*
+  If the table we've just written a record to contains an auto_increment
+  field, then store the last_insert_id() value from the foreign server
+    */
+    if (table->next_number_field)
+      update_auto_increment();
   }
-  /*
-    If the table we've just written a record to contains an auto_increment
-    field, then store the last_insert_id() value from the foreign server
-  */
-  if (table->next_number_field)
-    update_auto_increment();
 
-  DBUG_RETURN(0);
+  DBUG_RETURN(rc);
 }
 
 /*

--- 1.45/storage/federated/ha_federated.h	2007-04-27 15:36:04 -07:00
+++ 1.46/storage/federated/ha_federated.h	2007-04-27 15:36:04 -07:00
@@ -102,6 +102,13 @@
                              bool records_in_range, bool eq_range);
   int stash_remote_error();
 
+  /* The following members are for bulk-insert support */
+  LEX_STRING bulk_stmt;
+  DYNAMIC_ARRAY bulk_array;
+  void start_bulk_insert(ha_rows rows);
+  int emit_bulk_insert();
+  int end_bulk_insert();
+
 public:
   ha_federated(handlerton *hton, TABLE_SHARE *table_arg);
   ~ha_federated() {}
Thread
bk commit into 5.1 tree (acurtis:1.2489) BUG#25513antony28 Apr