MySQL Lists are EOL. Please join:

List:Commits« Previous MessageNext Message »
From:antony Date:June 8 2007 2:21am
Subject:bk commit into 5.1 tree (acurtis:1.2548) BUG#25513
View as plain text  
Below is the list of changes that have just been committed into a local
5.1 repository of antony. When antony does a push these changes will
be propagated to the main repository and, within 24 hours after the
push, to the public repository.
For information on how to access the public repository
see http://dev.mysql.com/doc/mysql/en/installing-source-tree.html

ChangeSet@stripped, 2007-06-07 19:21:23-07:00, acurtis@stripped +4 -0
  Bug#25513
    "Federated Transactions Failure"
    Implement bulk-insert to relieve the common situation where a multi-row insert
    into a transactional table would result in an inconsistent commit.
    Performing a multi-row insert into the remote server preserves the user's
    intent and offers a performance boost when inserting a large number of small rows.

  mysql-test/r/federated_transactions.result@stripped, 2007-06-07 19:21:18-07:00, acurtis@stripped +25 -0
    bug25513
      test for bug

  mysql-test/t/federated_transactions.test@stripped, 2007-06-07 19:21:18-07:00, acurtis@stripped +34 -0
    bug25513
      test for bug

  storage/federated/ha_federated.cc@stripped, 2007-06-07 19:21:18-07:00, acurtis@stripped +239 -54
    bug25513
      Implement support for bulk-inserts,
      new server variable: federated_bulk_insert (default true)
      

  storage/federated/ha_federated.h@stripped, 2007-06-07 19:21:18-07:00, acurtis@stripped +12 -0
    bug25513
      Implement support for bulk-inserts,
      

# This is a BitKeeper patch.  What follows are the unified diffs for the
# set of deltas contained in the patch.  The rest of the patch, the part
# that BitKeeper cares about, is below these diffs.
# User:	acurtis
# Host:	ltamd64.xiphis.org
# Root:	/home/antony/work2/p2-bug25513.5

--- 1.4/mysql-test/r/federated_transactions.result	2007-06-07 19:21:36 -07:00
+++ 1.5/mysql-test/r/federated_transactions.result	2007-06-07 19:21:36 -07:00
@@ -44,6 +44,31 @@
 6	fig
 DELETE FROM federated.t1;
 DROP TABLE IF EXISTS federated.t1;
+CREATE TABLE federated.t1 (s1 int, UNIQUE (s1)) ENGINE=MyISAM;
+DROP TABLE IF EXISTS federated.t1;
+CREATE TABLE federated.t1 (s1 int, UNIQUE (s1)) ENGINE="FEDERATED"
+  CONNECTION='mysql://root@stripped:SLAVE_PORT/federated/t1';
+SET autocommit=1;
+INSERT INTO federated.t1 VALUES (1),(2),(1);
+ERROR HY000: Got error 10000 'Error on remote system: 1582: Duplicate entry '1' for key 's1'' from FEDERATED
+SELECT * FROM federated.t1;
+s1
+1
+2
+ALTER TABLE federated.t1 ENGINE=InnoDB;
+TRUNCATE federated.t1;
+INSERT INTO federated.t1 VALUES (1),(2),(1);
+ERROR HY000: Got error 10000 'Error on remote system: 1582: Duplicate entry '1' for key 's1'' from FEDERATED
+SELECT * FROM federated.t1;
+s1
+SET SESSION federated_bulk_insert=0;
+INSERT INTO federated.t1 VALUES (1),(2),(1);
+ERROR HY000: Got error 10000 'Error on remote system: 1582: Duplicate entry '1' for key 's1'' from FEDERATED
+SELECT * FROM federated.t1;
+s1
+1
+2
+DROP TABLE IF EXISTS federated.t1;
 DROP DATABASE IF EXISTS federated;
 DROP TABLE IF EXISTS federated.t1;
 DROP DATABASE IF EXISTS federated;

--- 1.7/mysql-test/t/federated_transactions.test	2007-06-07 19:21:36 -07:00
+++ 1.8/mysql-test/t/federated_transactions.test	2007-06-07 19:21:36 -07:00
@@ -37,4 +37,38 @@
 SELECT * FROM federated.t1;
 DELETE FROM federated.t1;
 
+#
+# Bug#25513 - Federated transaction failure on insert
+#
+connection slave;
+DROP TABLE IF EXISTS federated.t1;
+CREATE TABLE federated.t1 (s1 int, UNIQUE (s1)) ENGINE=MyISAM;
+connection master;
+DROP TABLE IF EXISTS federated.t1;
+# # correct connection, same named tables
+--replace_result $SLAVE_MYPORT SLAVE_PORT
+eval CREATE TABLE federated.t1 (s1 int, UNIQUE (s1)) ENGINE="FEDERATED"
+  CONNECTION='mysql://root@stripped:$SLAVE_MYPORT/federated/t1';
+SET autocommit=1;
+--error ER_GET_ERRMSG
+INSERT INTO federated.t1 VALUES (1),(2),(1);
+SELECT * FROM federated.t1;
+
+connection slave;
+ALTER TABLE federated.t1 ENGINE=InnoDB;
+TRUNCATE federated.t1;
+connection master;
+
+--error ER_GET_ERRMSG
+INSERT INTO federated.t1 VALUES (1),(2),(1);
+SELECT * FROM federated.t1;
+
+# turning off the bulk insert gets the old behaviour
+# until Federated learns to handle transactions properly.
+SET SESSION federated_bulk_insert=0;
+--error ER_GET_ERRMSG
+INSERT INTO federated.t1 VALUES (1),(2),(1);
+SELECT * FROM federated.t1;
+
+
 source include/federated_cleanup.inc;

--- 1.101/storage/federated/ha_federated.cc	2007-06-07 19:21:36 -07:00
+++ 1.102/storage/federated/ha_federated.cc	2007-06-07 19:21:36 -07:00
@@ -389,6 +389,10 @@
 static HASH federated_open_tables;              // To track open tables
 pthread_mutex_t federated_mutex;                // To init the hash
 
+static MYSQL_THDVAR_BOOL(bulk_insert, PLUGIN_VAR_OPCMDARG,
+  "Enable Federated bulk-inserts",
+  NULL, NULL, TRUE);
+
 /* Variables used when chopping off trailing characters */
 static const uint sizeof_trailing_comma= sizeof(", ") - 1;
 static const uint sizeof_trailing_closeparen= sizeof(") ") - 1;
@@ -907,6 +911,9 @@
   mysql(0), stored_result(0)
 {
   trx_next= 0;
+  
+  bzero(&bulk_stmt, sizeof(bulk_stmt));
+  bzero(&bulk_array, sizeof(bulk_array));
 }
 
 
@@ -1742,6 +1749,156 @@
 
 
 /*
+  Prepares the storage engine for bulk-inserts. 
+
+  SYNOPSIS
+    start_bulk_insert()
+      rows      estimated number of rows in bulk insert or 0 if unknown.
+
+    DESCRIPTION
+      This method initializes memory structures required for bulk-inserts.
+      Invoked by ha_start_bulk_insert()
+
+    RETURN VALUE
+      N/A
+*/
+void ha_federated::start_bulk_insert(ha_rows rows)
+{
+  DBUG_ENTER("ha_federated::start_bulk_insert");
+
+  /*
+    We don't bother with bulk-insert semantics when the estimated rows==1.
+    The rows value will be 0 if the server does not know how many rows
+    would be inserted. This can occur when performing INSERT-SELECT.
+  */
+  
+  if (rows == 1 || !THDVAR(ha_thd(), bulk_insert))
+    DBUG_VOID_RETURN;
+
+  bulk_stmt.str= NULL;
+  bulk_stmt.length= 0;
+  bulk_inserted= 0;
+  bulk_query_size= 0;
+  
+  my_init_dynamic_array(&bulk_array, sizeof(LEX_STRING), 0, 0);
+  init_alloc_root(&bulk_memroot, my_getpagesize(), my_getpagesize());
+
+  DBUG_VOID_RETURN;
+}
+
+
+/*
+  Transmits a single statement to the remote server.
+
+  SYNOPSIS
+    emit_bulk_insert()
+
+    DESCRIPTION
+      This method packs many rows into a single bulk-insert statement and
+      transmits it to the remote server.
+
+    RETURN VALUE
+      0    No error
+*/
+int ha_federated::emit_bulk_insert()
+{
+  char insert_buffer[FEDERATED_QUERY_BUFFER_SIZE];
+  String insert_string(insert_buffer, sizeof(insert_buffer), &my_charset_bin);
+  uint max_packet_size= mysql->net.max_packet_size;
+  int rc= 0;
+  uint idx;
+  DBUG_ENTER("ha_federated::end_bulk_insert()");
+
+  DBUG_ASSERT(bulk_array.elements);
+  
+  insert_string.length(0);
+  insert_string.append(bulk_stmt.str, bulk_stmt.length);
+  insert_string.append('(');
+  
+  max_packet_size-= 64; /* for overhead */
+  
+  for (idx= 0; idx < bulk_array.elements; idx++)
+  {
+    LEX_STRING *row= dynamic_element(&bulk_array, idx, LEX_STRING*);
+    if (idx &&
+	    insert_string.length() + row->length + 4 >= max_packet_size)
+      break;
+    insert_string.append(row->str, row->length);
+    insert_string.append(STRING_WITH_LEN("),("));
+  }
+
+  /* remove trailing comma and parentheses */
+  insert_string.length(insert_string.length() - 2);
+
+  if (idx >= bulk_array.elements)
+  {
+    /* mark the bulk_array as empty */
+    reset_dynamic(&bulk_array);
+	/* release some memory */
+	free_root(&bulk_memroot, MYF(MY_KEEP_PREALLOC));
+  }
+  else
+  {
+    /* Here we compact the list of remaining rows to transmit */
+    uint remaining= bulk_array.elements - idx;
+    memmove(bulk_array.buffer, dynamic_array_ptr(&bulk_array, idx),
+            remaining * bulk_array.size_of_element);
+    bulk_array.elements= remaining;
+  }  
+
+  if (mysql_real_query(mysql, insert_string.ptr(), insert_string.length()))
+    rc= stash_remote_error();
+  else
+  /*
+    If the table we've just written a record to contains an auto_increment
+    field, then store the last_insert_id() value from the foreign server
+  */
+  if (table->next_number_field)
+    update_auto_increment();
+
+  bulk_query_size= bulk_stmt.length + 64; /* for packet overhead */
+  for (idx= 0; idx < bulk_array.elements; idx++)
+    bulk_query_size+= 
+	  dynamic_element(&bulk_array, idx, LEX_STRING*)->length + 4;
+
+  DBUG_RETURN(rc);
+}
+
+
+/*
+  End bulk inserts
+
+  SYNOPSIS
+    end_bulk_insert()
+
+    DESCRIPTION
+      This method will repeatedly call emit_bulk_insert() to transmit all
+      rows into the remote server.
+      Finally, it will deinitialize the bulk-insert structures.
+      Invoked by ha_end_bulk_insert()
+
+    RETURN VALUE
+      0    No error
+*/
+int ha_federated::end_bulk_insert()
+{
+  int rc= 0;
+  DBUG_ENTER("ha_federated::end_bulk_insert()");
+
+  if (bulk_array.buffer)
+  {
+	while (!rc && bulk_array.elements)
+      rc= emit_bulk_insert();
+
+	delete_dynamic(&bulk_array);  
+	free_root(&bulk_memroot, MYF(0));
+  }
+  
+  DBUG_RETURN(rc);
+}
+
+
+/*
   write_row() inserts a row. No extra() hint is given currently if a bulk load
   is happeneding. buf() is a byte array of data. You can use the field
   information to extract the data from the native byte array type.
@@ -1767,52 +1924,30 @@
     in two strings
       "INSERT INTO t1 ("
       and
-      " VALUES ("
+      ""
 
     If there are fields with values, they get appended, with commas, and 
     the last loop, a trailing comma is there
 
     "INSERT INTO t1 ( col1, col2, colN, "
 
-    " VALUES ( 'val1', 'val2', 'valN', "
+    "'val1', 'val2', 'valN', "
 
     Then, if there are fields, it should decrement the string by ", " length.
 
     "INSERT INTO t1 ( col1, col2, colN"
-    " VALUES ( 'val1', 'val2', 'valN'"
+    "'val1', 'val2', 'valN'"
 
-    Then it adds a close paren to both - if there are fields
+    Then it adds a close paren to the first string - if there are fields
 
     "INSERT INTO t1 ( col1, col2, colN)"
-    " VALUES ( 'val1', 'val2', 'valN')"
+    "'val1', 'val2', 'valN'"
 
     Then appends both together
     "INSERT INTO t1 ( col1, col2, colN) VALUES ( 'val1', 'val2', 'valN')"
 
-    So... the problem, is if you have the original statement:
-
-    "INSERT INTO t1 VALUES ()"
-
-    Which is legitimate, but if the code thinks there are fields
-
-    "INSERT INTO t1 ("
-    " VALUES ( "
-
-    If the field flag is set, but there are no commas, reduces the 
-    string by strlen(", ")
-
-    "INSERT INTO t1 "
-    " VALUES "
-
-    Then adds the close parenthesis
-
-    "INSERT INTO t1  )"
-    " VALUES  )"
-
-    So, I have to use a bool as before, set in the loop where fields and commas
-    are appended to the string
   */
-  my_bool commas_added= FALSE;
+  int rc= 0;
   char insert_buffer[FEDERATED_QUERY_BUFFER_SIZE];
   char values_buffer[FEDERATED_QUERY_BUFFER_SIZE];
   char insert_field_value_buffer[STRING_BUFFER_USUAL_SIZE];
@@ -1841,11 +1976,7 @@
   */
   insert_string.append(STRING_WITH_LEN("INSERT INTO `"));
   insert_string.append(share->table_name, share->table_name_length);
-  insert_string.append('`');
-  insert_string.append(STRING_WITH_LEN(" ("));
-
-  values_string.append(STRING_WITH_LEN(" VALUES "));
-  values_string.append(STRING_WITH_LEN(" ("));
+  insert_string.append(STRING_WITH_LEN("` (`"));
 
   /*
     loop through the field pointer array, add any fields to both the values
@@ -1855,7 +1986,6 @@
   {
     if (bitmap_is_set(table->write_set, (*field)->field_index))
     {
-      commas_added= TRUE;
       if ((*field)->is_null())
         values_string.append(STRING_WITH_LEN(" NULL "));
       else
@@ -1879,7 +2009,7 @@
         make the following appends conditional as we don't know if the
         next field is in the write set
       */
-      insert_string.append(STRING_WITH_LEN(", "));
+      insert_string.append(STRING_WITH_LEN("`,`"));
       values_string.append(STRING_WITH_LEN(", "));
     }
   }
@@ -1890,37 +2020,84 @@
     AND, we don't want to chop off the last char '('
     insert will be "INSERT INTO t1 VALUES ();"
   */
-  if (commas_added)
+  if (values_string.length())
   {
     insert_string.length(insert_string.length() - sizeof_trailing_comma);
-    /* chops off leading commas */
+    /* chops off trailing comma */
     values_string.length(values_string.length() - sizeof_trailing_comma);
     insert_string.append(STRING_WITH_LEN(") "));
   }
   else
   {
-    /* chops off trailing ) */
-    insert_string.length(insert_string.length() - sizeof_trailing_closeparen);
+    /* chops off trailing "(`" */
+    insert_string.length(insert_string.length() - 2);
+  }
+
+  /*
+    Check if bulk-inserts are in effect
+  */  
+  if (bulk_array.buffer)
+  {
+    LEX_STRING row;
+    row.length= values_string.length();
+
+    if (!bulk_stmt.str)
+	{
+      insert_string.append(STRING_WITH_LEN(" VALUES "));
+	  /*
+    	We store the string in the THD memroot so that it will
+    	be automatically freed when statement is finished.
+		This is needed for each invocation of emit_bulk_insert()
+		We don't store it in the bulk_memroot as it is reset
+		each time emit_bulk_insert() is executed.
+	  */
+	  bulk_stmt.str= ha_thd()->strmake(insert_string.c_ptr_quick(),
+                                       insert_string.length());
+      bulk_stmt.length= insert_string.length();
+	  bulk_query_size+= bulk_stmt.length + 64; /* for packet overhead */
+	}
+
+	/* if the query size is going to overflow the packet, send the rows */
+	if (bulk_query_size >= mysql->net.max_packet_size - row.length)
+	  rc= emit_bulk_insert();	
+
+    while (!rc)
+	{
+      if ((row.str= strmake_root(&bulk_memroot, values_string.ptr(), 
+	                             values_string.length())) &&
+		  !insert_dynamic(&bulk_array, (uchar *) &row))
+	  {
+	    bulk_query_size+= row.length + 4;
+	    /* if we are at our last row, we might as well send them now */
+    	if (estimation_rows_to_insert == ++bulk_inserted && !rc)
+    	  rc= emit_bulk_insert();
+	    break;
+	  }
+	  rc= emit_bulk_insert();
+	}
   }
+  else
+  {
+    insert_string.append(STRING_WITH_LEN(" VALUES ("));
 
-  /* we always want to append this, even if there aren't any fields */
-  values_string.append(STRING_WITH_LEN(") "));
+    /* add the values */
+    insert_string.append(values_string);
 
-  /* add the values */
-  insert_string.append(values_string);
+    /* we always want to append this, even if there aren't any fields */
+    insert_string.append(')');
 
-  if (mysql_real_query(mysql, insert_string.ptr(), insert_string.length()))
-  {
-    DBUG_RETURN(stash_remote_error());
+    if (mysql_real_query(mysql, insert_string.ptr(), insert_string.length()))
+      rc= stash_remote_error();
+    else
+    /*
+  If the table we've just written a record to contains an auto_increment
+  field, then store the last_insert_id() value from the foreign server
+    */
+    if (table->next_number_field)
+      update_auto_increment();
   }
-  /*
-    If the table we've just written a record to contains an auto_increment
-    field, then store the last_insert_id() value from the foreign server
-  */
-  if (table->next_number_field)
-    update_auto_increment();
 
-  DBUG_RETURN(0);
+  DBUG_RETURN(rc);
 }
 
 /*
@@ -3074,6 +3251,14 @@
 struct st_mysql_storage_engine federated_storage_engine=
 { MYSQL_HANDLERTON_INTERFACE_VERSION };
 
+
+static struct st_mysql_sys_var* federated_system_variables[]=
+{
+  MYSQL_SYSVAR(bulk_insert),
+  NULL
+};
+
+
 mysql_declare_plugin(federated)
 {
   MYSQL_STORAGE_ENGINE_PLUGIN,
@@ -3086,7 +3271,7 @@
   federated_done, /* Plugin Deinit */
   0x0100 /* 1.0 */,
   NULL,                       /* status variables                */
-  NULL,                       /* system variables                */
+  federated_system_variables, /* system variables                */
   NULL                        /* config options                  */
 }
 mysql_declare_plugin_end;

--- 1.46/storage/federated/ha_federated.h	2007-06-07 19:21:36 -07:00
+++ 1.47/storage/federated/ha_federated.h	2007-06-07 19:21:36 -07:00
@@ -102,6 +102,18 @@
                              bool records_in_range, bool eq_range);
   int stash_remote_error();
 
+  /* The following members are for bulk-insert support */
+  LEX_STRING bulk_stmt;
+  MEM_ROOT bulk_memroot;
+  DYNAMIC_ARRAY bulk_array;
+  ha_rows bulk_inserted;
+  uint bulk_query_size;
+  int emit_bulk_insert();
+
+protected:
+  void start_bulk_insert(ha_rows rows);
+  int end_bulk_insert();
+
 public:
   ha_federated(handlerton *hton, TABLE_SHARE *table_arg);
   ~ha_federated() {}
Thread
bk commit into 5.1 tree (acurtis:1.2548) BUG#25513antony8 Jun