MySQL Lists are EOL. Please join:

List:Commits« Previous MessageNext Message »
From:antony Date:May 30 2007 11:43pm
Subject:bk commit into 5.1 tree (antony:1.2516) BUG#25513
View as plain text  
Below is the list of changes that have just been committed into a local
5.1 repository of antony. When antony does a push these changes will
be propagated to the main repository and, within 24 hours after the
push, to the public repository.
For information on how to access the public repository
see http://dev.mysql.com/doc/mysql/en/installing-source-tree.html

ChangeSet@stripped, 2007-05-30 16:43:01-07:00, antony@stripped +4 -0
  Bug#25513
    "Federated transaction failure"
    When the user performs a bulk insert, perform a bulk insert at the
    storage engine to the remote server so that when operating on a
    table which us transactional, we do not get partially complete inserts.
    However, if the size of the query exceeds the maximum packet size
    which the remote server would accept, attempt to mark a savepoint and
    use that to make the bulk-insert statement 'atomic'.
    New system variables:
      federated-min-bulk-insert
        The minimum number of rows which is required to trigger bulk inserts.
        (When mysqld does not know how many rows will be inserted, this is
        ignored, except for when it is zero, then bulk-insert is disabled.
      federated-max-bulk-insert
        The maximum number of rows which will bulk-insert will be used.
        (When zero, it is unlimited)
    Note - Bulk-inserts will consume memory as it stores in memory
           all rows which will be inserted and would only act upon them 
           when complete.
  TODO:
    make Federated properly transaction aware.

  mysql-test/r/federated_transactions.result@stripped, 2007-05-30 16:42:57-07:00, antony@stripped +17 -0
    Bug25513
      test for bug
      We test using a transactional backing table and non-transactional,
      illustrating the disparity in the results.

  mysql-test/t/federated_transactions.test@stripped, 2007-05-30 16:42:57-07:00, antony@stripped +30 -0
    Bug25513
      test for bug
      We test using a transactional backing table and non-transactional,
      illustrating the disparity in the results.

  storage/federated/ha_federated.cc@stripped, 2007-05-30 16:42:58-07:00, antony@stripped +310 -56
    bug25513
      Implement variables min_bulk_insert and max_bulk_insert to control
      behaviour.
      Implement bulk insert.
    TODO:
      Make this storage engine properly transaction aware, using XA etc.

  storage/federated/ha_federated.h@stripped, 2007-05-30 16:42:58-07:00, antony@stripped +9 -0
    Bug25513
      New member variables and methods for bulk insert

# This is a BitKeeper patch.  What follows are the unified diffs for the
# set of deltas contained in the patch.  The rest of the patch, the part
# that BitKeeper cares about, is below these diffs.
# User:	antony
# Host:	ppcg5.local
# Root:	/Users/antony/Work/p2-bug25513.2

--- 1.4/mysql-test/r/federated_transactions.result	2007-05-30 16:43:11 -07:00
+++ 1.5/mysql-test/r/federated_transactions.result	2007-05-30 16:43:11 -07:00
@@ -44,6 +44,23 @@ id	name
 6	fig
 DELETE FROM federated.t1;
 DROP TABLE IF EXISTS federated.t1;
+CREATE TABLE federated.t1 (s1 int, UNIQUE (s1)) ENGINE=InnoDB;
+DROP TABLE IF EXISTS federated.t1;
+CREATE TABLE federated.t1 (s1 int, UNIQUE (s1)) ENGINE="FEDERATED"
+  CONNECTION='mysql://root@stripped:SLAVE_PORT/federated/t1';
+SET autocommit=1;
+INSERT INTO federated.t1 VALUES (1),(2),(1);
+ERROR HY000: Got error 10000 'Error on remote system: 1582: Duplicate entry '1' for key 's1'' from FEDERATED
+SELECT * FROM federated.t1;
+s1
+ALTER TABLE federated.t1 ENGINE=MyISAM;
+INSERT INTO federated.t1 VALUES (1),(2),(1);
+ERROR HY000: Got error 10000 'Error on remote system: 1582: Duplicate entry '1' for key 's1'' from FEDERATED
+SELECT * FROM federated.t1;
+s1
+1
+2
+DROP TABLE IF EXISTS federated.t1;
 DROP DATABASE IF EXISTS federated;
 DROP TABLE IF EXISTS federated.t1;
 DROP DATABASE IF EXISTS federated;

--- 1.7/mysql-test/t/federated_transactions.test	2007-05-30 16:43:11 -07:00
+++ 1.8/mysql-test/t/federated_transactions.test	2007-05-30 16:43:11 -07:00
@@ -37,4 +37,34 @@ INSERT INTO federated.t1 (id, name) VALU
 SELECT * FROM federated.t1;
 DELETE FROM federated.t1;
 
+#
+# Bug#25513 - Federated transaction failure on insert
+#
+connection slave;
+DROP TABLE IF EXISTS federated.t1;
+CREATE TABLE federated.t1 (s1 int, UNIQUE (s1)) ENGINE=InnoDB;
+connection master;
+DROP TABLE IF EXISTS federated.t1;
+# # correct connection, same named tables
+--replace_result $SLAVE_MYPORT SLAVE_PORT
+eval CREATE TABLE federated.t1 (s1 int, UNIQUE (s1)) ENGINE="FEDERATED"
+  CONNECTION='mysql://root@stripped:$SLAVE_MYPORT/federated/t1';
+SET autocommit=1;
+
+# t1 is backed by a transactional storage engine.
+--error ER_GET_ERRMSG
+INSERT INTO federated.t1 VALUES (1),(2),(1);
+SELECT * FROM federated.t1;
+
+connection slave;
+ALTER TABLE federated.t1 ENGINE=MyISAM;
+connection master;
+
+# t1 is not backed by a transactional storage engine.
+--error ER_GET_ERRMSG
+INSERT INTO federated.t1 VALUES (1),(2),(1);
+SELECT * FROM federated.t1;
+
+
+
 source include/federated_cleanup.inc;

--- 1.100/storage/federated/ha_federated.cc	2007-05-30 16:43:11 -07:00
+++ 1.101/storage/federated/ha_federated.cc	2007-05-30 16:43:11 -07:00
@@ -112,7 +112,14 @@
     table! You know  and have heard the screaching of audio feedback? You
     know putting two mirror in front of each other how the reflection
     continues for eternity? Well, need I say more?!
-  * There will not be support for transactions.
+  * Bulk inserts may be performed by the storage engine. This is limited to
+    the maximum packet size used for the connection. If the storage engine
+    has to split the insert operation in to multiple INSERT statements, it
+    will attempt to create a savepoint on the remote server in order to
+    preserve the illusion of atomicity. No checks are performed to verify
+    that the data integrity is good, so if the remote storage engine does
+    not have statement rollback, an incomplete insert will occur.
+  * There is currently limited support for transactions.
   * There is no way for the handler to know if the foreign database or table
     has changed. The reason for this is that this database has to work like a
     data file that would never be written to by anything other than the
@@ -389,6 +396,17 @@
 static HASH federated_open_tables;              // To track open tables
 pthread_mutex_t federated_mutex;                // To init the hash
 
+static MYSQL_THDVAR_UINT(min_bulk_insert, PLUGIN_VAR_RQCMDARG,
+  "Minimum number of rows for which Federated will use bulk inserts. "
+  "Use 0 to disable bulk inserts.",
+  NULL, NULL, 1, 0, INT_MAX24, 0);
+
+static MYSQL_THDVAR_UINT(max_bulk_insert, PLUGIN_VAR_RQCMDARG,
+  "Maximum number of rows for which Federated will use bulk inserts. "
+  "Use 0 to indicate that there is no maximum.",
+  NULL, NULL, 0, 0, INT_MAX24, 0);
+
+
 /* Variables used when chopping off trailing characters */
 static const uint sizeof_trailing_comma= sizeof(", ") - 1;
 static const uint sizeof_trailing_closeparen= sizeof(") ") - 1;
@@ -904,9 +922,13 @@ error:
 ha_federated::ha_federated(handlerton *hton,
                            TABLE_SHARE *table_arg)
   :handler(hton, table_arg),
-  mysql(0), stored_result(0)
+  mysql(0), stored_result(0), bulk_savepoint(0)
 {
   trx_next= 0;
+  
+  bzero(&bulk_root, sizeof(bulk_root));
+  bzero(&bulk_stmt, sizeof(bulk_stmt));
+  bzero(&bulk_array, sizeof(bulk_array));
 }
 
 
@@ -1742,6 +1764,241 @@ static inline uint field_in_record_is_nu
 
 
 /*
+  Prepares the storage engine for bulk-inserts. 
+
+  SYNOPSIS
+    start_bulk_insert()
+      rows      estimated number of rows in bulk insert or 0 if unknown.
+
+    DESCRIPTION
+      This method initializes memory structures required for bulk-inserts.
+      Invoked by ha_start_bulk_insert()
+
+    RETURN VALUE
+      N/A
+*/
+void ha_federated::start_bulk_insert(ha_rows rows)
+{
+  char insert_buffer[FEDERATED_QUERY_BUFFER_SIZE];
+  Field **field;
+  uint initial= 4096, old_length;
+  THD *thd= current_thd;
+
+  /* The main insert query string */
+  String insert_string(insert_buffer, sizeof(insert_buffer), &my_charset_bin);
+  DBUG_ENTER("ha_federated::start_bulk_insert");
+
+  /*
+    We don't bother with bulk-insert semantics when the estimated rows==1.
+    The rows value will be 0 if the server does not know how many rows
+    would be inserted. This occurs when performing INSERT-SELECT.
+  */
+  
+  if (rows == 1)
+    DBUG_VOID_RETURN;
+  
+  init_alloc_root(&bulk_root, 4096, 4096);
+
+  if (!THDVAR(thd, min_bulk_insert) || 
+      (rows && rows < THDVAR(thd, min_bulk_insert)) ||
+      (THDVAR(thd, max_bulk_insert) && rows > THDVAR(thd, max_bulk_insert)))
+    goto fallback;
+
+  if (rows && rows == (uint) rows)
+    initial= (uint) rows;
+
+  insert_string.length(0);
+  /*
+    start both our field and field values strings
+  */
+  insert_string.append(STRING_WITH_LEN("INSERT INTO `"));
+  insert_string.append(share->table_name, share->table_name_length);
+  insert_string.append(STRING_WITH_LEN("` (`"));
+  
+  /*
+    We subtract 2 because we are temporarily storing the expected
+    value of the string length iff no fields have the write bit
+    set in the bitmap.
+  */
+  old_length= insert_string.length() - 2;
+
+  for (field= table->field; *field; field++)
+    if (bitmap_is_set(table->write_set, (*field)->field_index))
+    {
+      insert_string.append((*field)->field_name);
+      insert_string.append(STRING_WITH_LEN("`,`"));
+    }
+
+  insert_string.length(insert_string.length() - 2);
+  if (insert_string.length() != old_length)
+    insert_string.append(')');
+  insert_string.append(STRING_WITH_LEN(" VALUES "));
+
+  /*
+    We store the string in the THD memroot so that it will
+    be automatically freed later. This is ok because we only need
+    this string near the end of statement execution anyways.
+  */
+
+  bulk_stmt.str= strmake_root(&bulk_root, insert_string.c_ptr_quick(),
+                              insert_string.length());
+  bulk_stmt.length= insert_string.length();
+  if (my_init_dynamic_array(&bulk_array, sizeof(LEX_STRING), initial, 0))
+    goto fallback;
+
+  DBUG_VOID_RETURN;
+
+fallback:
+  /* fallback to not using bulk inserts */    
+  delete_dynamic(&bulk_array);
+  free_root(&bulk_root, MYF(0));
+
+  int len= my_snprintf(insert_buffer, sizeof(insert_buffer),
+                       "SAVEPOINT stmt%x", table->query_id);
+  bulk_savepoint= strmake_root(ha_thd()->mem_root, insert_buffer, len);      
+  if (mysql_real_query(mysql, insert_buffer, len))
+    bulk_savepoint= NULL;
+
+  DBUG_VOID_RETURN;
+}
+
+
+/*
+  Transmits a single statement to the remote server.
+
+  SYNOPSIS
+    emit_bulk_insert()
+
+    DESCRIPTION
+      This method packs many rows into a single bulk-insert statement and
+      transmits it to the remote server.
+
+    RETURN VALUE
+      0    No error
+*/
+int ha_federated::emit_bulk_insert()
+{
+  char insert_buffer[FEDERATED_QUERY_BUFFER_SIZE];
+  String insert_string(insert_buffer, sizeof(insert_buffer), &my_charset_bin);
+  int rc= 0;
+  uint idx, max_bulk_query_size;
+  DBUG_ENTER("ha_federated::end_bulk_insert()");
+
+  DBUG_ASSERT(bulk_array.elements);
+  
+  /*
+    We try to limit the packet size to be less than that of the max
+    packet size but at least 1 row will be emitted per statement.
+    When the bulk-insert is split into multiple statements, then it is
+    possible for the remote server to have a partially complete insert
+    when AUTOCOMMIT=1 on the local server.
+  */
+  max_bulk_query_size= mysql->net.max_packet_size - 512 /* overhead */;
+
+  insert_string.length(0);
+  insert_string.append(bulk_stmt.str, bulk_stmt.length);
+  for (idx= 0; idx < bulk_array.elements; idx++)
+  {
+    LEX_STRING *row= dynamic_element(&bulk_array, idx, LEX_STRING*);
+    if (idx && (insert_string.length() + row->length > max_bulk_query_size))
+      break;
+    insert_string.append('(');
+    insert_string.append(row->str, row->length);
+    insert_string.append(STRING_WITH_LEN("),"));
+  }
+
+  /* remove trailing comma */
+  insert_string.length(insert_string.length() - 1);
+
+  if (idx >= bulk_array.elements)
+    /* mark the bulk_array as empty */
+    reset_dynamic(&bulk_array);
+  else
+  {
+    /* Here we compact the list of remaining rows to transmit */
+    uint remaining= bulk_array.elements - idx;
+    memmove(bulk_array.buffer, dynamic_array_ptr(&bulk_array, idx),
+            remaining * bulk_array.size_of_element);
+    bulk_array.elements= remaining;
+    
+    /* put down a savepoint, we're passing the buck to the remote server */
+    
+    if (!bulk_savepoint)
+    {
+      char buffer[STRING_BUFFER_USUAL_SIZE];
+      int len= my_snprintf(buffer, STRING_BUFFER_USUAL_SIZE,
+                          "SAVEPOINT stmt%x", table->query_id);
+      bulk_savepoint= strdup_root(ha_thd()->mem_root, buffer);      
+      if (mysql_real_query(mysql, buffer, len))
+        DBUG_RETURN(stash_remote_error());
+    }
+  }  
+
+  if (mysql_real_query(mysql, insert_string.ptr(), insert_string.length()))
+    rc= stash_remote_error();
+  else
+  /*
+    If the table we've just written a record to contains an auto_increment
+    field, then store the last_insert_id() value from the foreign server
+  */
+  if (table->next_number_field)
+    update_auto_increment();
+
+  DBUG_RETURN(rc);
+}
+
+
+/*
+  End bulk inserts
+
+  SYNOPSIS
+    end_bulk_insert()
+
+    DESCRIPTION
+      This method will repeatedly call emit_bulk_insert() to transmit all
+      rows into the remote server.
+      Finally, it will deinitialize the bulk-insert structures.
+      Invoked by ha_end_bulk_insert()
+
+    RETURN VALUE
+      0    No error
+*/
+int ha_federated::end_bulk_insert()
+{
+  char insert_buffer[FEDERATED_QUERY_BUFFER_SIZE];
+  String insert_string(insert_buffer, sizeof(insert_buffer), &my_charset_bin);
+  int rc= 0;
+  DBUG_ENTER("ha_federated::end_bulk_insert()");
+
+  while (!rc && bulk_array.elements)
+    rc= emit_bulk_insert();
+
+  if (bulk_savepoint)
+  {
+    char buffer[STRING_BUFFER_USUAL_SIZE];
+    int len;
+
+    if (rc)
+    {
+      len= my_snprintf(buffer, STRING_BUFFER_USUAL_SIZE,
+                      "ROLLBACK %s", bulk_savepoint);
+      mysql_real_query(mysql, buffer, len);
+    }
+
+    len= my_snprintf(buffer, STRING_BUFFER_USUAL_SIZE,
+                    "RELEASE %s", bulk_savepoint);
+    mysql_real_query(mysql, buffer, len);
+    bulk_savepoint= NULL;
+  }
+
+  delete_dynamic(&bulk_array);  
+  free_root(&bulk_root, MYF(0));
+  
+  DBUG_RETURN(rc);
+}
+
+
+/*
   write_row() inserts a row. No extra() hint is given currently if a bulk load
   is happeneding. buf() is a byte array of data. You can use the field
   information to extract the data from the native byte array type.
@@ -1767,52 +2024,30 @@ int ha_federated::write_row(byte *buf)
     in two strings
       "INSERT INTO t1 ("
       and
-      " VALUES ("
+      ""
 
     If there are fields with values, they get appended, with commas, and 
     the last loop, a trailing comma is there
 
     "INSERT INTO t1 ( col1, col2, colN, "
 
-    " VALUES ( 'val1', 'val2', 'valN', "
+    "'val1', 'val2', 'valN', "
 
     Then, if there are fields, it should decrement the string by ", " length.
 
     "INSERT INTO t1 ( col1, col2, colN"
-    " VALUES ( 'val1', 'val2', 'valN'"
+    "'val1', 'val2', 'valN'"
 
-    Then it adds a close paren to both - if there are fields
+    Then it adds a close paren to the first string - if there are fields
 
     "INSERT INTO t1 ( col1, col2, colN)"
-    " VALUES ( 'val1', 'val2', 'valN')"
+    "'val1', 'val2', 'valN'"
 
     Then appends both together
     "INSERT INTO t1 ( col1, col2, colN) VALUES ( 'val1', 'val2', 'valN')"
 
-    So... the problem, is if you have the original statement:
-
-    "INSERT INTO t1 VALUES ()"
-
-    Which is legitimate, but if the code thinks there are fields
-
-    "INSERT INTO t1 ("
-    " VALUES ( "
-
-    If the field flag is set, but there are no commas, reduces the 
-    string by strlen(", ")
-
-    "INSERT INTO t1 "
-    " VALUES "
-
-    Then adds the close parenthesis
-
-    "INSERT INTO t1  )"
-    " VALUES  )"
-
-    So, I have to use a bool as before, set in the loop where fields and commas
-    are appended to the string
   */
-  my_bool commas_added= FALSE;
+  int rc= 0;
   char insert_buffer[FEDERATED_QUERY_BUFFER_SIZE];
   char values_buffer[FEDERATED_QUERY_BUFFER_SIZE];
   char insert_field_value_buffer[STRING_BUFFER_USUAL_SIZE];
@@ -1841,11 +2076,7 @@ int ha_federated::write_row(byte *buf)
   */
   insert_string.append(STRING_WITH_LEN("INSERT INTO `"));
   insert_string.append(share->table_name, share->table_name_length);
-  insert_string.append('`');
-  insert_string.append(STRING_WITH_LEN(" ("));
-
-  values_string.append(STRING_WITH_LEN(" VALUES "));
-  values_string.append(STRING_WITH_LEN(" ("));
+  insert_string.append(STRING_WITH_LEN("` (`"));
 
   /*
     loop through the field pointer array, add any fields to both the values
@@ -1855,7 +2086,6 @@ int ha_federated::write_row(byte *buf)
   {
     if (bitmap_is_set(table->write_set, (*field)->field_index))
     {
-      commas_added= TRUE;
       if ((*field)->is_null())
         values_string.append(STRING_WITH_LEN(" NULL "));
       else
@@ -1879,7 +2109,7 @@ int ha_federated::write_row(byte *buf)
         make the following appends conditional as we don't know if the
         next field is in the write set
       */
-      insert_string.append(STRING_WITH_LEN(", "));
+      insert_string.append(STRING_WITH_LEN("`,`"));
       values_string.append(STRING_WITH_LEN(", "));
     }
   }
@@ -1890,37 +2120,55 @@ int ha_federated::write_row(byte *buf)
     AND, we don't want to chop off the last char '('
     insert will be "INSERT INTO t1 VALUES ();"
   */
-  if (commas_added)
+  if (values_string.length())
   {
     insert_string.length(insert_string.length() - sizeof_trailing_comma);
-    /* chops off leading commas */
+    /* chops off trailing comma */
     values_string.length(values_string.length() - sizeof_trailing_comma);
     insert_string.append(STRING_WITH_LEN(") "));
   }
   else
   {
-    /* chops off trailing ) */
-    insert_string.length(insert_string.length() - sizeof_trailing_closeparen);
+    /* chops off trailing "(`" */
+    insert_string.length(insert_string.length() - 2);
+  }
+
+  /*
+    Check if bulk-inserts are in effect
+  */  
+  if (bulk_array.buffer)
+  {
+    LEX_STRING row;
+    row.str= strmake_root(&bulk_root, values_string.ptr(), values_string.length());
+    row.length= values_string.length();
+    if (insert_dynamic(&bulk_array, (gptr) &row))
+      rc= HA_ERR_OUT_OF_MEM;
+    else
+    if (estimation_rows_to_insert == (ha_rows) bulk_array.elements)
+      rc= emit_bulk_insert();
   }
+  else
+  {
+    insert_string.append(STRING_WITH_LEN(" VALUES ("));
 
-  /* we always want to append this, even if there aren't any fields */
-  values_string.append(STRING_WITH_LEN(") "));
+    /* add the values */
+    insert_string.append(values_string);
 
-  /* add the values */
-  insert_string.append(values_string);
+    /* we always want to append this, even if there aren't any fields */
+    insert_string.append(')');
 
-  if (mysql_real_query(mysql, insert_string.ptr(), insert_string.length()))
-  {
-    DBUG_RETURN(stash_remote_error());
+    if (mysql_real_query(mysql, insert_string.ptr(), insert_string.length()))
+      rc= stash_remote_error();
+    else
+    /*
+  If the table we've just written a record to contains an auto_increment
+  field, then store the last_insert_id() value from the foreign server
+    */
+    if (table->next_number_field)
+      update_auto_increment();
   }
-  /*
-    If the table we've just written a record to contains an auto_increment
-    field, then store the last_insert_id() value from the foreign server
-  */
-  if (table->next_number_field)
-    update_auto_increment();
 
-  DBUG_RETURN(0);
+  DBUG_RETURN(rc);
 }
 
 /*
@@ -3074,6 +3322,12 @@ int ha_federated::execute_simple_query(c
 struct st_mysql_storage_engine federated_storage_engine=
 { MYSQL_HANDLERTON_INTERFACE_VERSION };
 
+static struct st_mysql_sys_var* federated_system_variables[]= {
+  MYSQL_SYSVAR(min_bulk_insert),
+  MYSQL_SYSVAR(max_bulk_insert),
+  NULL
+};
+
 mysql_declare_plugin(federated)
 {
   MYSQL_STORAGE_ENGINE_PLUGIN,
@@ -3086,7 +3340,7 @@ mysql_declare_plugin(federated)
   federated_done, /* Plugin Deinit */
   0x0100 /* 1.0 */,
   NULL,                       /* status variables                */
-  NULL,                       /* system variables                */
+  federated_system_variables, /* system variables */
   NULL                        /* config options                  */
 }
 mysql_declare_plugin_end;

--- 1.45/storage/federated/ha_federated.h	2007-05-30 16:43:11 -07:00
+++ 1.46/storage/federated/ha_federated.h	2007-05-30 16:43:11 -07:00
@@ -102,6 +102,15 @@ private:
                              bool records_in_range, bool eq_range);
   int stash_remote_error();
 
+  /* The following members are for bulk-insert support */
+  MEM_ROOT bulk_root;
+  LEX_STRING bulk_stmt;
+  DYNAMIC_ARRAY bulk_array;
+  const char *bulk_savepoint;
+  void start_bulk_insert(ha_rows rows);
+  int emit_bulk_insert();
+  int end_bulk_insert();
+
 public:
   ha_federated(handlerton *hton, TABLE_SHARE *table_arg);
   ~ha_federated() {}
Thread
bk commit into 5.1 tree (antony:1.2516) BUG#25513antony31 May