List: Commits
From: Mattias Jonsson  Date: January 14 2009 10:58am
Subject: bzr commit into mysql-5.1 branch (mattias.jonsson:2735)
#At file:///Users/mattiasj/clones/bzrroot/wl2550-51-bugteam/ based on revid:kgeorge@stripped

 2735 Mattias Jonsson	2009-01-14
      still in development... just to share
modified:
  sql/ha_partition.cc
  sql/ha_partition.h
  sql/handler.cc
  sql/handler.h
  sql/mysqld.cc
  sql/set_var.cc
  sql/sql_base.cc
  sql/sql_class.h
  sql/sql_select.cc
  sql/sql_table.cc
  sql/table.cc
  sql/unireg.cc

=== modified file 'sql/ha_partition.cc'
--- a/sql/ha_partition.cc	2008-08-20 15:29:14 +0000
+++ b/sql/ha_partition.cc	2009-01-14 10:58:32 +0000
@@ -1643,6 +1643,22 @@ int ha_partition::copy_partitions(ulongl
       set_linear_hash_mask(m_part_info, m_part_info->no_subparts);
   }
 
+  /*
+    TODO: To parallelize this:
+          Start m_reorged_parts scanning threads and add a check mechanism
+          that starts a new write thread whenever one is needed.
+          Keep a global area with the write threads of all partitions, and
+          a thread-specific area with the write threads the scan thread
+          already knows about.
+          When a scan thread needs a partition not marked in its own area,
+          it takes a lock on the global write_thread bitmap and signals
+          that a write thread should be started for part_N on behalf of
+          scan thread I.  If that write thread is already started, just
+          copy the global area to the local thread area; otherwise start
+          it, mark it started in the global area, and copy that to the
+          thread.
+          In the future, this could also provide interesting statistics.
+  */
   while (reorg_part < m_reorged_parts)
   {
     handler *file= m_reorged_file[reorg_part];
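
The TODO above describes an on-demand writer start. Here is a minimal
stand-alone model of that handshake, in which each scan thread keeps a
private copy of the global writer bitmap and only takes the global lock on
a miss. All names are invented and "starting a writer" is reduced to a
printf; this is a sketch of the idea, not code from the patch.

  #include <cstdio>
  #include <pthread.h>
  #include <vector>

  static pthread_mutex_t global_lock= PTHREAD_MUTEX_INITIALIZER;
  static std::vector<bool> global_writers(8, false);

  /* called by a scan thread before handing a row to the writer of part_n */
  static void ensure_writer(std::vector<bool> &local_writers, unsigned part_n)
  {
    if (local_writers[part_n])
      return;                                 /* fast path, no lock taken */
    pthread_mutex_lock(&global_lock);
    if (!global_writers[part_n])
    {
      printf("starting write thread for part_%u\n", part_n);
      global_writers[part_n]= true;           /* mark it started globally */
    }
    local_writers= global_writers;            /* refresh the local copy */
    pthread_mutex_unlock(&global_lock);
  }

  int main()
  {
    std::vector<bool> local(8, false);
    unsigned parts[]= { 1, 1, 5, 1, 5 };
    for (unsigned i= 0; i < 5; i++)
      ensure_writer(local, parts[i]);         /* starts writers 1 and 5 once */
    return 0;
  }
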
@@ -1653,6 +1669,7 @@ int ha_partition::copy_partitions(ulongl
       goto error;
     while (TRUE)
     {
+      //if ((result= file->rnd_next(table->record[0])))
       if ((result= file->rnd_next(m_rec0)))
       {
         if (result == HA_ERR_RECORD_DELETED)
@@ -1674,14 +1691,15 @@ int ha_partition::copy_partitions(ulongl
            table since it doesn't fit into any partition any longer due to
            changed partitioning ranges or list values.
         */
-        deleted++;
+        (*deleted)++;
       }
       else
       {
         THD *thd= ha_thd();
         /* Copy record to new handler */
-        copied++;
+        (*copied)++;
         tmp_disable_binlog(thd); /* Do not replicate the low-level changes. */
+        //result= m_new_file[new_part]->ha_write_row(table->record[0]);
         result= m_new_file[new_part]->ha_write_row(m_rec0);
         reenable_binlog(thd);
         if (result)
@@ -2436,11 +2454,17 @@ int ha_partition::open(const char *name,
     }
   }
 
+  /* Initialise the bitmap we use to minimize ha_start_bulk_insert calls */
+  if (bitmap_init(&m_bulk_insert_started, NULL, m_tot_parts + 1, FALSE))
+    DBUG_RETURN(1);
   /* Initialise the bitmap we use to determine what partitions are used */
   if (!is_clone)
   {
     if (bitmap_init(&(m_part_info->used_partitions), NULL, m_tot_parts, TRUE))
+    {
+      bitmap_free(&m_bulk_insert_started);
       DBUG_RETURN(1);
+    }
     bitmap_set_all(&(m_part_info->used_partitions));
   }
 
@@ -2494,6 +2518,7 @@ int ha_partition::open(const char *name,
 err_handler:
   while (file-- != m_file)
     (*file)->close();
+  bitmap_free(&m_bulk_insert_started);
   if (!is_clone)
     bitmap_free(&(m_part_info->used_partitions));
 
@@ -2540,6 +2565,7 @@ int ha_partition::close(void)
   DBUG_ENTER("ha_partition::close");
 
   delete_queue(&m_queue);
+  bitmap_free(&m_bulk_insert_started);
   if (!is_clone)
     bitmap_free(&(m_part_info->used_partitions));
   file= m_file;
@@ -2831,9 +2857,12 @@ int ha_partition::write_row(uchar * buf)
   timestamp_auto_set_type orig_timestamp_type= table->timestamp_field_type;
 #ifdef NOT_NEEDED
   uchar *rec0= m_rec0;
+  //uchar *rec0= table->record[0];
 #endif
   DBUG_ENTER("ha_partition::write_row");
+  /* needed for some engines (MyISAM...) */
   DBUG_ASSERT(buf == m_rec0);
+  //DBUG_ASSERT(buf == table->record[0]);
 
   /* If we have a timestamp column, update it to the current time */
   if (table->timestamp_field_type & TIMESTAMP_AUTO_SET_ON_INSERT)
@@ -2901,6 +2930,14 @@ int ha_partition::write_row(uchar * buf)
   }
   m_last_part= part_id;
   DBUG_PRINT("info", ("Insert in partition %d", part_id));
+  if (!bitmap_is_set(&m_bulk_insert_started, part_id) &&
+      bitmap_is_set(&m_bulk_insert_started, m_tot_parts))
+  {
+    m_file[part_id]->ha_start_bulk_insert(guess_bulk_insert_rows());
+    bitmap_set_bit(&m_bulk_insert_started, part_id);
+  }
+  m_bulk_inserted_rows++;
+
   tmp_disable_binlog(thd); /* Do not replicate the low-level changes. */
   error= m_file[part_id]->ha_write_row(buf);
   reenable_binlog(thd);
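
The bitmap check above is the heart of the optimization: ha_start_bulk_insert()
is no longer broadcast to every partition up front but issued lazily on the
first row that actually hits a partition, with the last bit of
m_bulk_insert_started acting as a sentinel for "bulk mode requested". A
minimal model of that pattern, with MY_BITMAP replaced by std::vector<bool>
purely for illustration:

  #include <cstdio>
  #include <vector>

  struct bulk_state
  {
    std::vector<bool> started;                /* one bit per part + sentinel */
    unsigned parts;
    explicit bulk_state(unsigned n) : started(n + 1, false), parts(n) {}

    void start_bulk_insert()                  /* like the patched version */
    {
      started.assign(parts + 1, false);
      started[parts]= true;                   /* sentinel: bulk requested */
    }

    void write_row(unsigned part)
    {
      if (started[parts] && !started[part])
      {
        printf("ha_start_bulk_insert on partition %u\n", part);
        started[part]= true;
      }
      /* ... forward the row to the partition's handler ... */
    }
  };

  int main()
  {
    bulk_state s(4);
    s.start_bulk_insert();
    unsigned target[]= { 2, 2, 0, 3, 2 };     /* partition hit by each row */
    for (unsigned i= 0; i < 5; i++)
      s.write_row(target[i]);                 /* starts 2, 0 and 3 once each */
    return 0;
  }
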
@@ -2975,6 +3012,13 @@ int ha_partition::update_row(const uchar
         max(set_internal_auto_increment, new_data->auto_increment)
   */
   m_last_part= new_part_id;
+  if (bitmap_is_set(&m_bulk_insert_started, m_tot_parts) &&
+      !bitmap_is_set(&m_bulk_insert_started, new_part_id))
+  {
+    m_file[new_part_id]->ha_start_bulk_insert(guess_bulk_insert_rows());
+    bitmap_set_bit(&m_bulk_insert_started, new_part_id);
+  }
+  m_bulk_inserted_rows++;
   if (new_part_id == old_part_id)
   {
     DBUG_PRINT("info", ("Update in partition %d", new_part_id));
@@ -3046,6 +3090,7 @@ int ha_partition::delete_row(const uchar
   THD *thd= ha_thd();
   DBUG_ENTER("ha_partition::delete_row");
 
+  //if ((error= get_part_for_delete(buf, table->record[0], m_part_info,
   if ((error= get_part_for_delete(buf, m_rec0, m_part_info, &part_id)))
   {
     DBUG_RETURN(error);
@@ -3109,23 +3154,58 @@ int ha_partition::delete_all_rows()
   DESCRIPTION
     rows == 0 means we will probably insert many rows
 */
-
 void ha_partition::start_bulk_insert(ha_rows rows)
 {
-  handler **file;
   DBUG_ENTER("ha_partition::start_bulk_insert");
 
-  rows= rows ? rows/m_tot_parts + 1 : 0;
-  file= m_file;
-  do
-  {
-    (*file)->ha_start_bulk_insert(rows);
-  } while (*(++file));
+  m_bulk_total_rows= rows;
+  m_bulk_inserted_rows= 0;
+  bitmap_clear_all(&m_bulk_insert_started);
+  /* use the last bit for marking if bulk_insert_started was called */
+  bitmap_set_bit(&m_bulk_insert_started, m_tot_parts);
   DBUG_VOID_RETURN;
 }
 
 
 /*
+  Try to predict the number of inserts into this partition.
+
+  If hash or key partitioning is used and the number of inserts so far is
+  less than twice the expected average number of rows per partition:
+    guess that the rows are evenly distributed over all partitions;
+  otherwise:
+    guess that the number of rows decreases for each newly used partition
+    (i.e. the guess is 50% of the remaining inserts for the first used
+     partition, 33% of the remainder for the second used partition, 25%
+     for the third, and so on).
+*/
+ha_rows ha_partition::guess_bulk_insert_rows()
+{
+  uint call_num;
+  ha_rows average_rows_per_partition;
+  DBUG_ENTER("guess_bulk_insert_rows");
+  if (m_bulk_total_rows == 0)
+    DBUG_RETURN(0);
+  average_rows_per_partition= (m_bulk_total_rows / m_tot_parts);
+  if (!average_rows_per_partition)
+    average_rows_per_partition= 1;
+  call_num= bitmap_bits_set(&m_bulk_insert_started);
+  if (m_part_info->part_type != HASH_PARTITION ||
+      m_bulk_inserted_rows > (2 * call_num * average_rows_per_partition))
+  {
+    average_rows_per_partition= (m_bulk_total_rows - m_bulk_inserted_rows) /
+                                (call_num + 1);
+  }
+  if (!average_rows_per_partition)
+    average_rows_per_partition= 1;
+  DBUG_PRINT("info", ("tot %lu done %lu call_num %u guess %lu",
+                      m_bulk_total_rows, m_bulk_inserted_rows, call_num,
+                      average_rows_per_partition));
+  DBUG_RETURN(average_rows_per_partition);
+}
+
+
+/*
   Finish a large batch of insert rows
 
   SYNOPSIS
@@ -3139,16 +3219,18 @@ void ha_partition::start_bulk_insert(ha_
 int ha_partition::end_bulk_insert()
 {
   int error= 0;
-  handler **file;
+  uint i;
   DBUG_ENTER("ha_partition::end_bulk_insert");
 
-  file= m_file;
-  do
+  DBUG_ASSERT(bitmap_is_set(&m_bulk_insert_started, m_tot_parts));
+  for (i= 0; i < m_tot_parts; i++)
   {
     int tmp;
-    if ((tmp= (*file)->ha_end_bulk_insert()))
+    if (bitmap_is_set(&m_bulk_insert_started, i) &&
+        (tmp= m_file[i]->ha_end_bulk_insert()))
       error= tmp;
-  } while (*(++file));
+  }
+  bitmap_clear_all(&m_bulk_insert_started);
   DBUG_RETURN(error);
 }
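
To make the heuristic concrete, here is a stand-alone model of
guess_bulk_insert_rows(), assuming the skew test compares against
2 * call_num * average as in the corrected condition above; function and
variable names are paraphrased and the sentinel bit is folded into call_num.

  #include <cstdio>

  /* total: rows announced to start_bulk_insert; inserted: rows so far;
     started: partitions already in bulk mode */
  static unsigned long guess_rows(unsigned long total, unsigned long inserted,
                                  unsigned tot_parts, unsigned started,
                                  bool is_hash)
  {
    if (total == 0)
      return 0;                               /* unknown: let engine decide */
    unsigned long avg= total / tot_parts;
    if (avg == 0)
      avg= 1;
    unsigned call_num= started + 1;           /* sentinel bit counted too */
    if (!is_hash || inserted > 2 * call_num * avg)
      avg= (total - inserted) / (call_num + 1);
    return avg ? avg : 1;
  }

  int main()
  {
    /* 1000 rows over 4 partitions, list partitioning: 50%, 33%, 25%, ... */
    unsigned long inserted= 0;
    for (unsigned started= 0; started < 4; started++)
    {
      unsigned long g= guess_rows(1000, inserted, 4, started, false);
      printf("partition #%u: guess %lu rows\n", started + 1, g);
      inserted+= g;                           /* pretend the guess was exact */
    }
    return 0;
  }
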
 
@@ -3356,6 +3438,7 @@ int ha_partition::rnd_next(uchar *buf)
   
   while (TRUE)
   {
+    DBUG_PRINT("info", ("rnd_next on partition %d", part_id));
     int result= file->rnd_next(buf);
     if (!result)
     {
@@ -3516,7 +3599,9 @@ int ha_partition::rnd_pos_by_record(ucha
 {
   DBUG_ENTER("ha_partition::rnd_pos_by_record");
 
-  if (unlikely(get_part_for_delete(record, m_rec0, m_part_info, &m_last_part)))
+  //if (unlikely(get_part_for_delete(record, table->record[0], m_part_info,
+  if (unlikely(get_part_for_delete(record, m_rec0, m_part_info,
+                                   &m_last_part)))
     DBUG_RETURN(1);
 
   DBUG_RETURN(handler::rnd_pos_by_record(record));
@@ -3998,11 +4083,12 @@ int ha_partition::read_range_first(const
       m_index_scan_type= partition_index_first;
     else
       m_index_scan_type= partition_index_first_unordered;
-    error= common_first_last(m_rec0);
+    error= common_first_last(table->record[0]);
   }
   else
   {
     m_index_scan_type= partition_index_read;
+    //error= common_index_read(table->record[0],
     error= common_index_read(m_rec0,
 			     start_key->key,
                              start_key->keypart_map, start_key->flag);
@@ -4030,7 +4116,7 @@ int ha_partition::read_range_next()
   {
     DBUG_RETURN(handler::read_range_next());
   }
-  DBUG_RETURN(handle_unordered_next(m_rec0, eq_range));
+  DBUG_RETURN(handle_unordered_next(table->record[0], eq_range));
 }
 
 
@@ -4190,6 +4276,7 @@ int ha_partition::handle_unordered_next(
 int ha_partition::handle_unordered_scan_next_partition(uchar * buf)
 {
   uint i;
+  uchar *old_rec0;
   DBUG_ENTER("ha_partition::handle_unordered_scan_next_partition");
 
   for (i= m_part_spec.start_part; i <= m_part_spec.end_part; i++)
@@ -4220,9 +4307,11 @@ int ha_partition::handle_unordered_scan_
         order.
       */
       DBUG_PRINT("info", ("read_range_first on partition %d", i));
+      //old_rec0= table->record[0];
       table->record[0]= buf;
       error= file->read_range_first(0, end_range, eq_range, 0);
       table->record[0]= m_rec0;
+      //table->record[0]= old_rec0;
       break;
     default:
       DBUG_ASSERT(FALSE);

=== modified file 'sql/ha_partition.h'
--- a/sql/ha_partition.h	2008-08-11 18:06:08 +0000
+++ b/sql/ha_partition.h	2009-01-14 10:58:32 +0000
@@ -141,6 +141,12 @@ private:
     "own" the m_part_info structure.
   */
   bool is_clone;
+  /** for support of delayed open of partitions */
+  MY_BITMAP m_opened_parts;
+  /** For optimizing ha_start_bulk_insert calls */
+  MY_BITMAP m_bulk_insert_started;
+  ha_rows   m_bulk_total_rows;
+  ha_rows   m_bulk_inserted_rows;
 public:
   handler *clone(MEM_ROOT *mem_root);
   virtual void set_part_info(partition_info *part_info)
@@ -308,7 +314,6 @@ public:
     Bulk inserts are supported if all underlying handlers support it.
     start_bulk_insert and end_bulk_insert is called before and after a
     number of calls to write_row.
-    Not yet though.
   */
   virtual int write_row(uchar * buf);
   virtual int update_row(const uchar * old_data, uchar * new_data);
@@ -316,6 +321,9 @@ public:
   virtual int delete_all_rows(void);
   virtual void start_bulk_insert(ha_rows rows);
   virtual int end_bulk_insert();
+private:
+  ha_rows guess_bulk_insert_rows();
+public:
 
   virtual bool is_fatal_error(int error, uint flags)
   {

=== modified file 'sql/handler.cc'
--- a/sql/handler.cc	2008-08-12 10:26:23 +0000
+++ b/sql/handler.cc	2009-01-14 10:58:32 +0000
@@ -3566,7 +3566,7 @@ int ha_resize_key_cache(KEY_CACHE *key_c
 
 
 /**
-  Change parameters for key cache (like size)
+  Change parameters for key cache (like age_threshold/division_limit)
 */
 int ha_change_key_cache_param(KEY_CACHE *key_cache)
 {

=== modified file 'sql/handler.h'
--- a/sql/handler.h	2008-08-11 18:02:03 +0000
+++ b/sql/handler.h	2009-01-14 10:58:32 +0000
@@ -229,6 +229,7 @@
 #define HA_ABORT_IF_LOCKED	128	/* skip if locked on open.*/
 #define HA_BLOCK_LOCK		256	/* unlock when reading some records */
 #define HA_OPEN_TEMPORARY	512
+#define HA_OPEN_WAIT_FOR_USAGE  1024    /* delayed open for partitions */
 
 	/* Some key definitions */
 #define HA_KEY_NULL_LENGTH	1

=== modified file 'sql/mysqld.cc'
--- a/sql/mysqld.cc	2008-08-26 09:31:17 +0000
+++ b/sql/mysqld.cc	2009-01-14 10:58:32 +0000
@@ -5536,7 +5536,8 @@ enum options_mysqld
   OPT_MIN_EXAMINED_ROW_LIMIT,
   OPT_LOG_SLOW_SLAVE_STATEMENTS,
   OPT_OLD_MODE,
-  OPT_SLAVE_EXEC_MODE
+  OPT_SLAVE_EXEC_MODE,
+  OPT_ALTER_THREADS
 };
 
 
@@ -5935,6 +5936,21 @@ master-ssl",
 #endif /* HAVE_REPLICATION */
   {"memlock", OPT_MEMLOCK, "Lock mysqld in memory.", (uchar**) &locked_in_memory,
    (uchar**) &locked_in_memory, 0, GET_BOOL, NO_ARG, 0, 0, 0, 0, 0, 0},
+  {"max-query-memory", OPT_ALTER_THREADS,
+   "Max extra memory allocation allowed for parallel query execution (MB)",
+   (uchar**) &global_system_variables.max_query_memory,
+   (uchar**) &max_system_variables.max_query_memory, 0, GET_UINT,
+   OPT_ARG, 1, 1, 64 * 1024 - 1, 0, 1, 0 },
+  {"max-query-threads", OPT_ALTER_THREADS,
+   "Max number of parallel threads to use in a query",
+   (uchar**) &global_system_variables.max_query_threads,
+   (uchar**) &max_system_variables.max_query_threads, 0, GET_UINT,
+   OPT_ARG, 1, 1, 4096, 0, 1, 0 },
+  {"query-thread-bias", OPT_ALTER_THREADS,
+   "Percentage of threads in read operation (vs write)",
+   (uchar**) &global_system_variables.query_thread_bias,
+   (uchar**) &max_system_variables.query_thread_bias, 0, GET_UINT,
+   OPT_ARG, 50, 1, 99, 0, 1, 0 },
   {"myisam-recover", OPT_MYISAM_RECOVER,
    "Syntax: myisam-recover[=option[,option...]], where option can be DEFAULT, BACKUP, FORCE or QUICK.",
    (uchar**) &myisam_recover_options_str, (uchar**) &myisam_recover_options_str, 0,

=== modified file 'sql/set_var.cc'
--- a/sql/set_var.cc	2008-08-25 12:44:05 +0000
+++ b/sql/set_var.cc	2009-01-14 10:58:32 +0000
@@ -331,6 +331,15 @@ static sys_var_thd_ulong	sys_max_seeks_f
 					      &SV::max_seeks_for_key);
 static sys_var_thd_ulong   sys_max_length_for_sort_data(&vars, "max_length_for_sort_data",
                                                  &SV::max_length_for_sort_data);
+static sys_var_thd_ulong sys_max_query_threads(&vars,
+                                               "max_query_threads",
+                                               &SV::max_query_threads);
+static sys_var_thd_ulong sys_max_query_memory(&vars,
+                                              "max_query_memory",
+                                              &SV::max_query_memory);
+static sys_var_thd_ulong sys_query_thread_bias(&vars,
+                                               "query_thread_bias",
+                                               &SV::query_thread_bias);
 #ifndef TO_BE_DELETED	/* Alias for max_join_size */
 static sys_var_thd_ha_rows	sys_sql_max_join_size(&vars, "sql_max_join_size",
 					      &SV::max_join_size,
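
Taken together, the three knobs above are registered both as command-line
options (mysqld.cc) and as session-scoped system variables
(sys_var_thd_ulong, just above), so with the patch applied they could
presumably be set either at server startup, e.g.
mysqld --max-query-threads=8 --query-thread-bias=75, or per connection with
something like SET SESSION max_query_threads= 8. The values here are only
examples inside the declared ranges (1..4096 threads, 1..99 percent bias,
1..65535 MB extra memory).
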

=== modified file 'sql/sql_base.cc'
--- a/sql/sql_base.cc	2008-08-21 11:47:23 +0000
+++ b/sql/sql_base.cc	2009-01-14 10:58:32 +0000
@@ -2767,7 +2767,8 @@ TABLE *open_table(THD *thd, TABLE_LIST *
       c1: name lock t2; -- blocks
       c2: open t1; -- blocks
     */
-    if (table->needs_reopen_or_name_lock())
+    if (table->needs_reopen_or_name_lock() &&
+        !(table->s->version == 0 && thd->ignore_name_lock))
     {
       DBUG_PRINT("note",
                  ("Found table '%s.%s' with different refresh version",

=== modified file 'sql/sql_class.h'
--- a/sql/sql_class.h	2008-08-26 10:01:49 +0000
+++ b/sql/sql_class.h	2009-01-14 10:58:32 +0000
@@ -396,6 +396,10 @@ struct system_variables
   DATE_TIME_FORMAT *datetime_format;
   DATE_TIME_FORMAT *time_format;
   my_bool sysdate_is_now;
+
+  ulong max_query_memory;
+  ulong max_query_threads;
+  ulong query_thread_bias;
 };
 
 
@@ -931,6 +935,11 @@ public:
     Flags with information about the open tables state.
   */
   uint state_flags;
+  /*
+    Flag to indicate if it's ok to open more instances even in the presence
+    of a name lock
+  */
+  bool ignore_name_lock;
 
   /*
     This constructor serves for creation of Open_tables_state instances

=== modified file 'sql/sql_select.cc'
--- a/sql/sql_select.cc	2008-08-28 09:54:50 +0000
+++ b/sql/sql_select.cc	2009-01-14 10:58:32 +0000
@@ -9808,6 +9808,7 @@ create_tmp_table(THD *thd,TMP_TABLE_PARA
 	    goto err;					// Should be OOM
 	  tmp_from_field++;
 	  reclength+=new_field->pack_length();
+          DBUG_PRINT("info", ("new reclength: %u", reclength));
 	  if (new_field->flags & BLOB_FLAG)
 	  {
 	    *blob_field++= fieldnr;
@@ -9873,6 +9874,7 @@ create_tmp_table(THD *thd,TMP_TABLE_PARA
 	((Item_sum *) item)->result_field= new_field;
       tmp_from_field++;
       reclength+=new_field->pack_length();
+      DBUG_PRINT("info", ("new reclength: %u", reclength));
       if (!(new_field->flags & NOT_NULL_FLAG))
 	null_count++;
       if (new_field->type() == MYSQL_TYPE_BIT)
@@ -9938,6 +9940,7 @@ create_tmp_table(THD *thd,TMP_TABLE_PARA
 
   if (!using_unique_constraint)
     reclength+= group_null_items;	// null flag is stored separately
+  DBUG_PRINT("info", ("new reclength: %u", reclength));
 
   share->blob_fields= blob_count;
   if (blob_count == 0)
@@ -9952,6 +9955,7 @@ create_tmp_table(THD *thd,TMP_TABLE_PARA
   null_pack_length= (hidden_null_pack_length +
                      (null_count + total_uneven_bit_length + 7) / 8);
   reclength+=null_pack_length;
+  DBUG_PRINT("info", ("new reclength: %u", reclength));
   if (!reclength)
     reclength=1;				// Dummy select
   /* Use packed rows if there is blobs or a lot of space to gain */
@@ -9960,6 +9964,7 @@ create_tmp_table(THD *thd,TMP_TABLE_PARA
       (reclength / string_total_length <= RATIO_TO_PACK_ROWS ||
        string_total_length / string_count >= AVG_STRING_LENGTH_TO_PACK_ROWS))
     use_packed_rows= 1;
+  DBUG_PRINT("info", ("use_packed_rows: %u", use_packed_rows));
 
   share->reclength= reclength;
   {
@@ -14769,8 +14774,6 @@ test_if_group_changed(List<Cached_item> 
 /**
   Setup copy_fields to save fields at start of new group.
 
-  Setup copy_fields to save fields at start of new group
-
   Only FIELD_ITEM:s and FUNC_ITEM:s needs to be saved between groups.
   Change old item_field to use a new field with points at saved fieldvalue
   This function is only called before use of send_fields.

=== modified file 'sql/sql_table.cc'
--- a/sql/sql_table.cc	2008-08-11 18:02:03 +0000
+++ b/sql/sql_table.cc	2009-01-14 10:58:32 +0000
@@ -34,7 +34,11 @@ const char *primary_key_name="PRIMARY";
 static bool check_if_keyname_exists(const char *name,KEY *start, KEY *end);
 static char *make_unique_key_name(const char *field_name,KEY *start,KEY *end);
 static int copy_data_between_tables(TABLE *from,TABLE *to,
-                                    List<Create_field> &create, bool ignore,
+                                    TABLE_LIST *table_list,
+                                    List<Create_field> *create, THD *thd,
+                                    uint alter_flags, char *tmp_name,
+                                    char *new_db,
+                                    bool ignore,
 				    uint order_num, ORDER *order,
 				    ha_rows *copied,ha_rows *deleted,
                                     enum enum_enable_or_disable keys_onoff,
@@ -3185,8 +3189,7 @@ void sp_prepare_create_field(THD *thd, C
     db			Database
     table_name		Table name
     create_info	        Create information (like MAX_ROWS)
-    fields		List of fields to create
-    keys		List of keys to create
+    alter_info          Alter information (like altered fields)
     internal_tmp_table  Set to 1 if this is an internal temporary table
 			(From ALTER TABLE)
     select_field_count
@@ -5916,6 +5919,45 @@ err:
 }
 
 
+/**
+  Get a tmp table for alter use
+
+  table      original table, used to determine if the copy ...
+TODO: Fill in...
+*/
+static TABLE*
+get_alter_tmp_table(TABLE *table, THD *thd,
+                    char *tmp_name, char *new_db)
+{
+  TABLE *new_table;
+  DBUG_ENTER("get_alter_tmp_table");
+  DBUG_PRINT("info", ("tmp_table %u", table->s->tmp_table));
+  if (table->s->tmp_table != NO_TMP_TABLE)
+  {
+    TABLE_LIST tbl;
+    bzero((void*) &tbl, sizeof(tbl));
+    tbl.db= new_db;
+    tbl.table_name= tbl.alias= tmp_name;
+    /* Table is in thd->temporary_tables */
+    DBUG_PRINT("info", ("opening a copy of the tmp table '%s/%s' (%p)",
+                        new_db, tmp_name, table));
+    new_table= open_table(thd, &tbl, thd->mem_root, (bool*) 0,
+                          MYSQL_LOCK_IGNORE_FLUSH);
+  }
+  else
+  {
+    char path[FN_REFLEN];
+    /* table is a normal table: Create temporary table in same directory */
+    build_table_filename(path, sizeof(path), new_db, tmp_name, "",
+                         FN_IS_TMP);
+    /* Open our intermediate table */
+    DBUG_PRINT("info", ("opening tmp table '%s/%s' as a copy of norm tbl (%p)",
+                        new_db, tmp_name, table));
+    new_table= open_temporary_table(thd, path, new_db, tmp_name,0);
+  }
+  DBUG_RETURN(new_table);
+}
+
 /*
   Alter table
 
@@ -6607,25 +6649,7 @@ view_err:
   DBUG_PRINT("info", ("need_copy_table: %u", need_copy_table));
   if (need_copy_table != ALTER_TABLE_METADATA_ONLY)
   {
-    if (table->s->tmp_table)
-    {
-      TABLE_LIST tbl;
-      bzero((void*) &tbl, sizeof(tbl));
-      tbl.db= new_db;
-      tbl.table_name= tbl.alias= tmp_name;
-      /* Table is in thd->temporary_tables */
-      new_table= open_table(thd, &tbl, thd->mem_root, (bool*) 0,
-                            MYSQL_LOCK_IGNORE_FLUSH);
-    }
-    else
-    {
-      char path[FN_REFLEN];
-      /* table is a normal table: Create temporary table in same directory */
-      build_table_filename(path, sizeof(path), new_db, tmp_name, "",
-                           FN_IS_TMP);
-      /* Open our intermediate table */
-      new_table=open_temporary_table(thd, path, new_db, tmp_name,0);
-    }
+    new_table= get_alter_tmp_table(table, thd, tmp_name, new_db);
     if (!new_table)
       goto err1;
     /*
@@ -6648,8 +6672,10 @@ view_err:
     /* We don't want update TIMESTAMP fields during ALTER TABLE. */
     new_table->timestamp_field_type= TIMESTAMP_NO_AUTO_SET;
     new_table->next_number_field=new_table->found_next_number_field;
-    error= copy_data_between_tables(table, new_table,
-                                    alter_info->create_list, ignore,
+    error= copy_data_between_tables(table, new_table, table_list,
+                                    &alter_info->create_list, thd,
+                                    alter_info->flags,
+                                    tmp_name, new_db, ignore,
                                     order_num, order, &copied, &deleted,
                                     alter_info->keys_onoff,
                                     alter_info->error_if_not_empty);
@@ -7033,6 +7059,7 @@ err:
     unlink_open_table(thd, name_lock, FALSE);
     pthread_mutex_unlock(&LOCK_open);
   }
+  /* TODO: thd->main_da.m_status must be set before leaving the function! */
   DBUG_RETURN(TRUE);
 
 err_with_placeholders:
@@ -7049,31 +7076,108 @@ err_with_placeholders:
 }
 /* mysql_alter_table */
 
+/* Parallel alter module start */
+struct free_record_buffer_st
+{
+  pthread_mutex_t lock;
+  /* array of free record numbers in record_buffer */
+  ulong* list;
+  ulonglong num_freed_records;
+  ulonglong num_taken_records;
+  ulong num_wake_calls;
+  ulong num_tot_waits;
+  uint* sleeping; /* array of size num_read_threads, used to wake readers */
+};
+typedef struct free_record_buffer_st FREE_RECORDS;
+
+struct record_queue_st
+{
+  /* uses write_data_thread->lock */
+  uint *records_in_queue; /* records to write */
+  ulonglong current_record; /* num of records taken to this queue */
+  ulonglong record_count; /* num of records inserted to this queue */
+  bool sleeping; /* set at start, and when the writer waits for records */
+  ulong num_wake_calls;
+};
+typedef struct record_queue_st RECORD_QUEUE;
+
+enum enum_alter_thread_type {
+  COPY_THREAD,
+  READ_THREAD,
+  WRITE_THREAD
+};
+
+/* shared array of 'static' thread data, READ ONLY after initialization */
+struct alter_data_share_st
+{
+  TABLE *from;                               // From mysql_alter_table
+  TABLE *to;                                 // From mysql_alter_table
+  TABLE_LIST *table_list;
+  volatile THD::killed_state *kill_flag;
+  List<Create_field> *create;
+  FREE_RECORDS *free_records;                // List/Queue of free record ids
+  RECORD_QUEUE *record_queues;               // Queues to writers
+  uchar* record_buffer;                      // Shared record buffer
+  ulong buf_reclength;                       // Aligned length of one record
+  ha_rows estimated_rows;                    // Estimated rows in from table
+  uint order_num;
+  ORDER *order;
+  pthread_cond_t *read_com_cond;             // communication cond
+  pthread_cond_t *write_com_cond;            // communication cond
+  pthread_cond_t *read_ctrl_cond;            // thread control cond
+  pthread_cond_t *write_ctrl_cond;           // thread control cond
+  pthread_mutex_t *read_mutex;
+  pthread_mutex_t *write_mutex;
+  ulong sql_mode;
+  uint num_read_parts;
+  uint num_write_parts;
+  uint num_read_threads;
+  uint num_write_threads;
+  ulong max_rec_in_buffer;
+  char *new_db;
+  char *tmp_name;
+  enum enum_enable_or_disable keys_onoff;
+  int indexes_are_disabled;
+  bool error_if_not_empty;
+  bool ignore;
+};
+typedef struct alter_data_share_st ALTER_DATA_SHARE;
+
+/* Thread specific data */
+struct alter_data_thread_st
+{
+  TABLE *to;
+  TABLE *from;
+  TABLE_LIST *table_list;
+  THD *thd;
+  pthread_t pthread_id;
+  uint thread_num;
+  ha_rows copied;
+  ha_rows deleted;
+  int ha_error;                              // Handler error code
+  bool killed;
+  bool done;                                 // Thread has ended
+  bool ok;                                   // No non handler error
+  bool wait_for_signal;
+  ALTER_DATA_SHARE *alter_data_share;
+  /* pointers to corresponding cond/mutex in alter_data_share */
+  pthread_cond_t *com_cond;                  // communication cond
+  pthread_cond_t *ctrl_cond;                 // thread control cond
+  pthread_mutex_t *lock;
+};
+typedef struct alter_data_thread_st ALTER_DATA_THREAD;
+
+
 static int
-copy_data_between_tables(TABLE *from,TABLE *to,
-			 List<Create_field> &create,
-                         bool ignore,
-			 uint order_num, ORDER *order,
-			 ha_rows *copied,
-			 ha_rows *deleted,
-                         enum enum_enable_or_disable keys_onoff,
-                         bool error_if_not_empty)
+pre_trans_part(TABLE *to,
+               THD *thd,
+               bool ignore,
+               int indexes_are_disabled,
+               enum enum_enable_or_disable keys_onoff,
+               ha_rows estimated_rows, bool only_read)
 {
   int error;
-  Copy_field *copy,*copy_end;
-  ulong found_count,delete_count;
-  THD *thd= current_thd;
-  uint length= 0;
-  SORT_FIELD *sortorder;
-  READ_RECORD info;
-  TABLE_LIST   tables;
-  List<Item>   fields;
-  List<Item>   all_fields;
-  ha_rows examined_rows;
-  bool auto_increment_field_copied= 0;
-  ulong save_sql_mode;
-  ulonglong prev_insert_id;
-  DBUG_ENTER("copy_data_between_tables");
+  DBUG_ENTER("pre_trans_part");
 
   /*
     Turn off recovery logging since rollback of an alter table is to
@@ -7085,85 +7189,277 @@ copy_data_between_tables(TABLE *from,TAB
   if (error)
     DBUG_RETURN(-1);
   
-  if (!(copy= new Copy_field[to->s->fields]))
-    DBUG_RETURN(-1);				/* purecov: inspected */
-
-  if (to->file->ha_external_lock(thd, F_WRLCK))
-    DBUG_RETURN(-1);
-
-  /* We need external lock before we can disable/enable keys */
-  alter_table_manage_keys(to, from->file->indexes_are_disabled(), keys_onoff);
+  if (only_read)
+  {
+    if (to->file->ha_external_lock(thd, F_RDLCK))
+      DBUG_RETURN(-1);
+  }
+  else
+  {
+    if (to->file->ha_external_lock(thd, F_WRLCK))
+      DBUG_RETURN(-1);
+  }
 
   /* We can abort alter table for any table type */
   thd->abort_on_warning= !ignore && test(thd->variables.sql_mode &
                                          (MODE_STRICT_TRANS_TABLES |
                                           MODE_STRICT_ALL_TABLES));
 
-  from->file->info(HA_STATUS_VARIABLE);
-  to->file->ha_start_bulk_insert(from->file->stats.records);
+  /* We need external lock before we can disable/enable keys */
+  if (!only_read)
+  {
+    alter_table_manage_keys(to, indexes_are_disabled, keys_onoff);
+    to->file->ha_start_bulk_insert(estimated_rows);
+  }
+  DBUG_RETURN(0);
+}
 
-  save_sql_mode= thd->variables.sql_mode;
 
-  List_iterator<Create_field> it(create);
-  Create_field *def;
-  copy_end=copy;
-  for (Field **ptr=to->field ; *ptr ; ptr++)
+static void
+post_trans_part(THD *thd, TABLE *to, int *ha_error, bool only_read)
+{
+  int error;
+  DBUG_ENTER("post_trans_part");
+  if (!only_read)
+  {
+    to->file->ha_release_auto_increment();
+    if ((error= to->file->ha_end_bulk_insert()) && !(*ha_error))
+      *ha_error= error;
+    to->file->extra(HA_EXTRA_NO_IGNORE_DUP_KEY);
+  }
+
+  /* Note that not all return values are HA_ERR... */
+  if ((error= ha_enable_transaction(thd, TRUE)))
+  {
+    if (!*ha_error)
+      *ha_error= error;
+    DBUG_VOID_RETURN;
+  }
+  /*
+    Ensure that the new table is saved properly to disk so that we
+    can do a rename
+  */
+  if ((error= ha_autocommit_or_rollback(thd, 0)) && !(*ha_error))
+    *ha_error= error;
+  if ((error= end_active_trans(thd)) && !(*ha_error))
+    *ha_error= error;
+  if ((error= to->file->ha_external_lock(thd,F_UNLCK)) && !(*ha_error))
+    *ha_error= error;
+  DBUG_VOID_RETURN;
+}
+
+static void
+close_down_thread(ALTER_DATA_THREAD *thread)
+{
+  if (thread->to)
   {
-    def=it++;
+    /* TODO: I got stuck on an assert here on LOCK_open */
+    intern_close_table(thread->to);
+    my_free(thread->to, MYF(0));
+  }
+  if (thread->from)
+  {
+    mysql_unlock_tables(thread->thd, thread->thd->lock);
+    thread->thd->lock= 0;
+    safe_mutex_assert_not_owner(&LOCK_open);
+    pthread_mutex_lock(&LOCK_open);
+    close_thread_table(thread->thd,
+                       &thread->thd->open_tables);
+    pthread_mutex_unlock(&LOCK_open);
+  }
+  delete thread->thd;
+  thread->to= 0;
+  thread->from= 0;
+  thread->thd= 0;
+}
+
+
+static int
+create_copy_fields(TABLE *to,
+                   TABLE *from,
+                   TABLE *orig_from,
+                   List<Create_field> *create,
+                   THD *thd,
+                   bool *auto_increment_field_copied,
+                   bool *blob_field_copied,
+                   Copy_field **copy,
+                   Copy_field **copy_end)
+{
+  Create_field *def;
+  uint skipped_fields= 0;
+  List_iterator<Create_field> it(*create);
+  DBUG_ENTER("create_copy_fields");
+
+  if (!(*copy= new Copy_field[to->s->fields]))
+    DBUG_RETURN(-1);                            /* purecov: inspected */
+
+  *copy_end= *copy;
+  for (Field **ptr= to->field ; *ptr ; ptr++)
+  {
+    //Field *from_field;
+    DBUG_PRINT("info", ("to field '%s'", (*ptr)->field_name));
+    /* matching create field */
+    def= it++;
     if (def->field)
     {
+      /* def->field is the corresponding 'from' field */
+      DBUG_PRINT("info",
+                 ("create field '%s' change '%s' old field (%p) old index (%u)",
+                  def->field_name, def->change, def->field,
+                  def->field->field_index));
+      bitmap_set_bit(from->read_set, def->field->field_index);
+      if (def->field->flags & BLOB_FLAG)
+        *blob_field_copied= TRUE;
+
       if (*ptr == to->next_number_field)
       {
-        auto_increment_field_copied= TRUE;
+        *auto_increment_field_copied= TRUE;
         /*
           If we are going to copy contents of one auto_increment column to
           another auto_increment column it is sensible to preserve zeroes.
-          This condition also covers case when we are don't actually alter
+          This condition also covers the case when we don't actually alter
           auto_increment column.
+          NOTE: This should always be done, i.e. only substitute a value
+          in a 'from'-table if it is NULL
         */
-        if (def->field == from->found_next_number_field)
-          thd->variables.sql_mode|= MODE_NO_AUTO_VALUE_ON_ZERO;
+        /*
+        if (def->field == orig_from->found_next_number_field)
+        */
+        thd->variables.sql_mode|= MODE_NO_AUTO_VALUE_ON_ZERO;
       }
-      (copy_end++)->set(*ptr,def->field,0);
+      ((*copy_end)++)->set(*ptr,from->field[def->field->field_index],0);
+    }
+    else
+    {
+      DBUG_PRINT("info", ("Added field '%s' (not in from-table)",
+                          def->field_name));
+      skipped_fields++;
     }
+  }
+  DBUG_ASSERT(*copy_end == (*copy + to->s->fields - skipped_fields));
+  DBUG_RETURN(0);
+}
+
+static int
+handle_scan_order(TABLE *from,
+                  THD *thd,
+                  int order_num,
+                  ORDER *order,
+                  ha_rows *examined_rows)
+{
+  int error;
+  uint length= 0;
+  SORT_FIELD *sortorder;
+  List<Item>   fields;
+  List<Item>   all_fields;
+  TABLE_LIST   tables;
+  DBUG_ENTER("handle_order");
 
+  from->sort.io_cache=(IO_CACHE*) my_malloc(sizeof(IO_CACHE),
+                                            MYF(MY_FAE | MY_ZEROFILL));
+  bzero((char *) &tables, sizeof(tables));
+  tables.table= from;
+  tables.alias= tables.table_name= from->s->table_name.str;
+  tables.db= from->s->db.str;
+  error= 1;
+
+  if (thd->lex->select_lex.setup_ref_array(thd, order_num) ||
+      setup_order(thd, thd->lex->select_lex.ref_pointer_array,
+                  &tables, fields, all_fields, order) ||
+      !(sortorder= make_unireg_sortorder(order, &length, NULL)) ||
+      (from->sort.found_records= filesort(thd, from, sortorder, length,
+                                          (SQL_SELECT *) 0, HA_POS_ERROR,
+                                          1, examined_rows)) ==
+      HA_POS_ERROR)
+  {
+    DBUG_RETURN(1);
   }
+  DBUG_RETURN(0);
+}
+
+
+static THD*
+get_new_thd()
+{
+  THD *new_thd;
+  new_thd= new THD;
+  new_thd->thread_stack= (char*)&new_thd;
+  new_thd->store_globals();
+  lex_start(new_thd);
+  return new_thd;
+}
+
+#define MAX_NUM_THREADS 64
 
-  found_count=delete_count=0;
+#define RECORD_QUEUE_DONE ((ulong) -1L)
 
-  if (order)
+/*
+  Copies a record from the read table to the write table
+  using Copy_fields.
+*/
+static void
+copy_altered_record(TABLE *to, bool auto_increment_field_copied,
+                    Copy_field *copy, Copy_field *copy_end, uchar *old_ptr)
+{
+  DBUG_ENTER("copy_altered_record");
+  if (to->next_number_field)
   {
-    if (to->s->primary_key != MAX_KEY && to->file->primary_key_is_clustered())
-    {
-      char warn_buff[MYSQL_ERRMSG_SIZE];
-      my_snprintf(warn_buff, sizeof(warn_buff), 
-                  "ORDER BY ignored as there is a user-defined clustered index"
-                  " in the table '%-.192s'", from->s->table_name.str);
-      push_warning(thd, MYSQL_ERROR::WARN_LEVEL_WARN, ER_UNKNOWN_ERROR,
-                   warn_buff);
-    }
+    if (auto_increment_field_copied)
+      to->auto_increment_field_not_null= TRUE;
     else
-    {
-      from->sort.io_cache=(IO_CACHE*) my_malloc(sizeof(IO_CACHE),
-                                                MYF(MY_FAE | MY_ZEROFILL));
-      bzero((char *) &tables, sizeof(tables));
-      tables.table= from;
-      tables.alias= tables.table_name= from->s->table_name.str;
-      tables.db= from->s->db.str;
-      error= 1;
-
-      if (thd->lex->select_lex.setup_ref_array(thd, order_num) ||
-          setup_order(thd, thd->lex->select_lex.ref_pointer_array,
-                      &tables, fields, all_fields, order) ||
-          !(sortorder= make_unireg_sortorder(order, &length, NULL)) ||
-          (from->sort.found_records= filesort(thd, from, sortorder, length,
-                                              (SQL_SELECT *) 0, HA_POS_ERROR,
-                                              1, &examined_rows)) ==
-          HA_POS_ERROR)
-        goto err;
+      to->next_number_field->reset();
+  }
+  
+  for (Copy_field *copy_ptr= copy ; copy_ptr != copy_end ; copy_ptr++)
+  {
+    /*
+      do_copy only copies the record, not the content of the pointers
+      in the record. (Affects blobs.)
+    */
+    DBUG_PRINT("info", ("from field (%p) to (%p)", copy_ptr->from_ptr,
+                        copy_ptr->to_ptr));
+    /* First, use the correct pointers */
+    if (old_ptr)
+    {
+      //copy_ptr->set(copy_ptr->from_ptr,
+      //              copy_ptr->to_ptr - old_ptr + to->record[0], 0);
+      copy_ptr->to_ptr= copy_ptr->to_ptr - old_ptr + to->record[0];
+      copy_ptr->to_null_ptr= copy_ptr->to_null_ptr - old_ptr + to->record[0];
+      DBUG_PRINT("info", ("from field (%p) to (%p)", copy_ptr->from_ptr,
+                          copy_ptr->to_ptr));
     }
-  };
+    copy_ptr->do_copy(copy_ptr);
+  }
+  DBUG_VOID_RETURN;
+}
+
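
The pointer arithmetic above rebases a Copy_field destination from the old
record[0] to the buffer currently in use: same offset, different base. A
tiny stand-alone illustration of the idiom (buffer sizes and offsets
invented):

  #include <cassert>
  #include <cstring>

  int main()
  {
    unsigned char old_rec[16], new_rec[16];
    memset(old_rec, 0, sizeof(old_rec));
    old_rec[5]= 42;                           /* a field at offset 5 */
    unsigned char *field_ptr= old_rec + 5;

    memcpy(new_rec, old_rec, sizeof(new_rec));
    /* rebase: keep the offset, swap the record buffer */
    field_ptr= field_ptr - old_rec + new_rec;
    assert(field_ptr == new_rec + 5 && *field_ptr == 42);
    return 0;
  }
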
+/*
+  Reads from one partition set and writes to another partition set,
+  without using the row queue.
+*/
+static int
+copy_data_part(TABLE *to,
+               TABLE *from,
+               THD *thd,
+               volatile THD::killed_state *kill_flag,
+               bool *killed,
+               bool error_if_not_empty,
+               bool auto_increment_field_copied,
+               bool ignore,
+               ha_rows *copied,
+               ha_rows *deleted,
+               Copy_field *copy,
+               Copy_field *copy_end,
+               int *ha_error)
+{
+  int error= 0;
+  READ_RECORD info;
+  ulong found_count,delete_count;
+  ulonglong prev_insert_id;
+  DBUG_ENTER("copy_data_part");
+  DBUG_PRINT("info", ("from %p to %p thd %p", from, to, thd));
 
+  found_count= delete_count= 0;
   /* Tell handler that we have values for all columns in the to table */
   to->use_all_columns();
   init_read_record(&info, thd, from, (SQL_SELECT *) 0, 1, 1, FALSE);
@@ -7171,12 +7467,15 @@ copy_data_between_tables(TABLE *from,TAB
     to->file->extra(HA_EXTRA_IGNORE_DUP_KEY);
   thd->row_count= 0;
   restore_record(to, s->default_values);        // Create empty record
-  while (!(error=info.read_record(&info)))
+  while (!(error= info.read_record(&info)))
   {
-    if (thd->killed)
+    /* FIXME: This kill does not appear to work, verify... */
+    if (*kill_flag)
     {
-      thd->send_kill_message();
+      DBUG_PRINT("info", ("kill_flag was set to %u",
+                          *kill_flag));
       error= 1;
+      *killed= TRUE;
       break;
     }
     thd->row_count++;
@@ -7186,45 +7485,27 @@ copy_data_between_tables(TABLE *from,TAB
       error= 1;
       break;
     }
-    if (to->next_number_field)
-    {
-      if (auto_increment_field_copied)
-        to->auto_increment_field_not_null= TRUE;
-      else
-        to->next_number_field->reset();
-    }
-    
-    for (Copy_field *copy_ptr=copy ; copy_ptr != copy_end ; copy_ptr++)
-    {
-      copy_ptr->do_copy(copy_ptr);
-    }
+    copy_altered_record(to, auto_increment_field_copied, copy, copy_end, NULL);
     prev_insert_id= to->file->next_insert_id;
-    error=to->file->ha_write_row(to->record[0]);
+    (*ha_error)= to->file->ha_write_row(to->record[0]);
     to->auto_increment_field_not_null= FALSE;
-    if (error)
+    if (*ha_error)
     {
-      if (!ignore ||
-          to->file->is_fatal_error(error, HA_CHECK_DUP))
-      {
-         if (!to->file->is_fatal_error(error, HA_CHECK_DUP))
-         {
-           uint key_nr= to->file->get_dup_key(error);
-           if ((int) key_nr >= 0)
-           {
-             const char *err_msg= ER(ER_DUP_ENTRY_WITH_KEY_NAME);
-             if (key_nr == 0 &&
-                 (to->key_info[0].key_part[0].field->flags &
-                  AUTO_INCREMENT_FLAG))
-               err_msg= ER(ER_DUP_ENTRY_AUTOINCREMENT_CASE);
-             to->file->print_keydup_error(key_nr, err_msg);
-             break;
-           }
-         }
+      /* Only allow duplicate errors if non fatal and ignore is set */
 
-	to->file->print_error(error,MYF(0));
-	break;
+      if (!ignore || to->file->is_fatal_error(error, HA_CHECK_DUP))
+      {
+        /* Don't report errors here, do it in the main thread later! */
+        /*
+          TODO: may we set kill_flag (main thread THD->killed) here? 
+          to stop the other running threads.
+        */
+        DBUG_PRINT("info", ("got error %d from ha_write_row", error));
+        error= 1;
+        break;
       }
       to->file->restore_auto_increment(prev_insert_id);
+      (*ha_error)= 0;
       delete_count++;
     }
     else
@@ -7232,42 +7513,1581 @@ copy_data_between_tables(TABLE *from,TAB
   }
   end_read_record(&info);
   free_io_cache(from);
-  delete [] copy;				// This is never 0
+  delete [] copy;                               // This is never 0
+  *copied= found_count;
+  *deleted= delete_count;
+  DBUG_RETURN(error > 0 ? -1 : 0);
+}
+
+static ulong
+get_free_rec_id(ALTER_DATA_THREAD *loc_thread)
+{
+  DBUG_ENTER("get_free_rec_id");
+  ulong rec_id;
+  ulong thread_num;
+  FREE_RECORDS *f;
+  ALTER_DATA_SHARE *s;
+  /* test using the alter_data_share as an argument */
+  pthread_mutex_lock(loc_thread->lock);
+  s= loc_thread->alter_data_share;
+  thread_num= loc_thread->thread_num;
+  pthread_mutex_unlock(loc_thread->lock);
+  f= s->free_records;
 
-  if (to->file->ha_end_bulk_insert() && error <= 0)
+  /* TODO: use a list of free records */
+  do
   {
-    to->file->print_error(my_errno,MYF(0));
-    error=1;
+    pthread_mutex_lock(&f->lock);
+    if (f->num_taken_records < f->num_freed_records)
+    {
+      rec_id= f->list[f->num_taken_records++ % s->max_rec_in_buffer];
+      pthread_mutex_unlock(&f->lock);
+      DBUG_RETURN(rec_id);
+    }
+    /* add to the sleeping list */
+    f->sleeping[f->num_tot_waits++ % s->num_read_threads]= thread_num;
+    /* add loop */
+    pthread_cond_wait(loc_thread->com_cond, &f->lock);
+    /* add loc_thread->waits and use that for priority of get_free_rec_id */
+    pthread_mutex_unlock(&f->lock);
+  } while (1);
+  /* should never come here */
+  DBUG_ASSERT(0);
+  DBUG_RETURN((ulong) -1L);
+}
+
+static void
+free_rec_id(ALTER_DATA_THREAD *loc_thread, ulong rec_id)
+{
+  DBUG_ENTER("free_one_rec_in_buf");
+  FREE_RECORDS *f;
+  ALTER_DATA_SHARE *s;
+  uint wake_thread_num;
+  pthread_mutex_lock(loc_thread->lock);
+  s= loc_thread->alter_data_share;
+  pthread_mutex_unlock(loc_thread->lock);
+  f= s->free_records;
+
+  /* TODO: use a list of free records */
+  pthread_mutex_lock(&f->lock);
+  DBUG_ASSERT(f->num_taken_records <= f->num_freed_records);
+  f->list[f->num_freed_records++ % s->max_rec_in_buffer]= rec_id;
+  /* check the sleeping list */
+  if (f->num_wake_calls < f->num_tot_waits)
+  {
+    wake_thread_num= f->sleeping[f->num_wake_calls++ % s->num_read_threads];
+    pthread_cond_signal(&s->read_com_cond[wake_thread_num]);
+  }
+  pthread_mutex_unlock(&f->lock);
+  DBUG_VOID_RETURN;
+}
+
+/*
+  multiple writers to the queue, single reader (always called by read threads)
+*/
+static void
+put_in_rec_queue(ALTER_DATA_THREAD *loc_thread, uint to_part_id, uint rec_id)
+{
+  uint write_thr_num;
+  RECORD_QUEUE *q;
+  ALTER_DATA_SHARE *s;
+
+  DBUG_ENTER("put_in_rec_queue");
+  pthread_mutex_lock(loc_thread->lock);
+  s= loc_thread->alter_data_share;
+  pthread_mutex_unlock(loc_thread->lock);
+
+  write_thr_num= to_part_id % s->num_write_threads;
+  q= &s->record_queues[write_thr_num];
+  pthread_mutex_lock(&s->write_mutex[write_thr_num]);
+  q->records_in_queue[q->record_count++ % s->max_rec_in_buffer]= rec_id;
+  if (q->sleeping)
+  {
+    pthread_cond_signal(&s->write_com_cond[write_thr_num]);
+    q->sleeping= FALSE;
+    q->num_wake_calls++;
   }
-  to->file->extra(HA_EXTRA_NO_IGNORE_DUP_KEY);
+  pthread_mutex_unlock(&s->write_mutex[write_thr_num]);
+  DBUG_VOID_RETURN;
+}
 
-  if (ha_enable_transaction(thd, TRUE))
+/*
+  multiple writers to the queue, single reader (always called by writer)
+*/
+static ulong
+get_from_rec_queue(ALTER_DATA_THREAD *loc_thread)
+{
+  ulong rec_id= RECORD_QUEUE_DONE;
+  RECORD_QUEUE *q;
+  ALTER_DATA_SHARE *s;
+  DBUG_ENTER("get_from_rec_queue");
+  pthread_mutex_lock(loc_thread->lock);
+  s= loc_thread->alter_data_share;
+  q= &s->record_queues[loc_thread->thread_num];
+  if (q->current_record < q->record_count)
   {
-    error= 1;
-    goto err;
+    rec_id= q->records_in_queue[q->current_record++ % s->max_rec_in_buffer];
   }
-  
+  else
+  {
+    /*
+      This can only be done if there is a single reader.
+      It is also signaled when all readers are done.
+    */
+    q->sleeping= TRUE;
+    /* add loop */
+    pthread_cond_wait(loc_thread->com_cond, loc_thread->lock);
+    /* will always be true, since we add an end record */
+    if (q->current_record < q->record_count)
+      rec_id= q->records_in_queue[q->current_record++ % s->max_rec_in_buffer];
+  }
+  pthread_mutex_unlock(loc_thread->lock);
+  DBUG_RETURN(rec_id);
+}
+
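
The queue primitives above all share one shape: a fixed-size ring indexed by
ever-growing counters modulo the buffer size, a mutex, and a condition
variable to park the single consumer. Below is a compilable stand-alone
model of put/get that also adds the retry loop the patch marks with
"/* add loop */"; the wake/wait statistics of the patch are left out.

  #include <cstdio>
  #include <pthread.h>

  static const unsigned RING_SIZE= 4;
  static const unsigned long QUEUE_DONE= (unsigned long) -1L;

  struct rec_queue
  {
    pthread_mutex_t lock;
    pthread_cond_t  cond;
    unsigned long ring[RING_SIZE];
    unsigned long current_record;             /* taken from the queue */
    unsigned long record_count;               /* put into the queue */
    bool sleeping;
  };

  static void put_rec(rec_queue *q, unsigned long rec_id)  /* producers */
  {
    pthread_mutex_lock(&q->lock);
    q->ring[q->record_count++ % RING_SIZE]= rec_id;
    if (q->sleeping)                          /* wake the parked consumer */
    {
      q->sleeping= false;
      pthread_cond_signal(&q->cond);
    }
    pthread_mutex_unlock(&q->lock);
  }

  static unsigned long get_rec(rec_queue *q)  /* the single consumer */
  {
    pthread_mutex_lock(&q->lock);
    while (q->current_record >= q->record_count)
    {
      q->sleeping= true;
      pthread_cond_wait(&q->cond, &q->lock);  /* loop guards spurious wakeup */
    }
    unsigned long id= q->ring[q->current_record++ % RING_SIZE];
    pthread_mutex_unlock(&q->lock);
    return id;
  }

  int main()
  {
    rec_queue q= { PTHREAD_MUTEX_INITIALIZER, PTHREAD_COND_INITIALIZER,
                   {0}, 0, 0, false };
    put_rec(&q, 7);
    put_rec(&q, QUEUE_DONE);                  /* end marker, as in the patch */
    for (unsigned long id; (id= get_rec(&q)) != QUEUE_DONE; )
      printf("got record id %lu\n", id);
    return 0;
  }
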
+/*
+static void print_records(TABLE *table, const uchar *record)
+{
+  DBUG_PRINT("info", ("record[0] (%p) record (%p)", table->record[0],
+                      record));
+  for (uint j= 0; j < table->s->fields; j++)
+  {
+    char buf[40];
+    int pos= 0;
+    Field *field= table->field[j];
+    const uchar* field_ptr= field->ptr - table->record[0] + record;
+    int pack_len= field->pack_length();
+    int n= pack_len < 10 ? pack_len : 10;
+
+    DBUG_PRINT("info", ("field_ptr (%p)", field_ptr));
+    for (int i= 0; i < n && pos < 80; i++)
+    {
+      pos+= sprintf(&buf[pos],"%x'%c'", (int) (uchar) field_ptr[i],
+                                         (uchar) field_ptr[i]);
+    }
+    buf[pos]= 0;
+    DBUG_PRINT("info",("[%u]field_ptr[0->%d]: %s", j, n, buf));
+  }
+}
+*/
+
+
+/* no real reason not to just inline this in parallel_read_data_thread */
+static int
+do_read_data_from_table(ALTER_DATA_THREAD *loc_thread)
+{
+  int error= 0;
+  int ha_error= 0;
+  READ_RECORD info;
+  TABLE *to, *from;
+  ALTER_DATA_SHARE *data_share;
+  THD *thd;
+  Copy_field *copy,*copy_end;
+  partition_info *part_info;
+  bool auto_increment_field_copied= FALSE;
+  bool blob_field_copied= FALSE;
+  bool killed= FALSE;
+  uchar *buf; /* pointer where to copy the new record */
+  uchar *record_buffer;
+  ulong free_record_id;
+  uint32 to_part_id= 0; /* 0 if non-partitioned table */
+  longlong func_value; /* not really used */
+  ulong save_sql_mode;
+  ha_rows copied= 0;
+  uchar *saved_rec0, *prev_rec0;
+
+  DBUG_ENTER("do_read_data_from_table");
+  DBUG_ASSERT(loc_thread);
+
+  pthread_mutex_lock(loc_thread->lock);
+  data_share= loc_thread->alter_data_share;
+  to= loc_thread->to;
+  from= loc_thread->from;
+  thd= loc_thread->thd;
+  DBUG_PRINT("info", ("from %p thd %p (reclen: %lu)", loc_thread->from,
+                      loc_thread->thd, loc_thread->from->s->reclength));
+  DBUG_ASSERT(!data_share->order);
+  record_buffer= data_share->record_buffer;
+  pthread_mutex_unlock(loc_thread->lock);
+
+  part_info= to->part_info;
+
+  save_sql_mode= thd->variables.sql_mode;
   /*
-    Ensure that the new table is saved properly to disk so that we
-    can do a rename
+    Copy only the read part of the row: use marked columns and a new
+    record (internal row format) from the previously allocated
+    record_buffer (TODO: further check how to minimize read and copy).
+    Use the same copy method as delayed insert does.
+    In the write thread, just use the ready-to-insert row as record[0].
   */
-  if (ha_autocommit_or_rollback(thd, 0))
-    error=1;
-  if (end_active_trans(thd))
-    error=1;
+  to->use_all_columns();
 
- err:
+  if (create_copy_fields(to, from, data_share->from, data_share->create,
+                         thd, &auto_increment_field_copied,
+                         &blob_field_copied, &copy, &copy_end))
+    goto early_error;
+
+  /* TODO: lift this limitation, instead, wait for the row to be written */
+  if (blob_field_copied)
+    goto early_error;
+
+  /* now like copy_data_part */
+  init_read_record(&info, thd, from, (SQL_SELECT *) 0, 1, 1, FALSE);
+  if (data_share->ignore)
+    to->file->extra(HA_EXTRA_IGNORE_DUP_KEY);
+  thd->row_count= 0;
+  saved_rec0= to->record[0];
+
+  while (!(error= info.read_record(&info)))
+  {
+    if (*(data_share->kill_flag))
+    {
+      DBUG_PRINT("info", ("kill_flag was set to %u",
+                          *(data_share->kill_flag)));
+      error= 1;
+      killed= TRUE;
+      break;
+    }
+    /* Return error if source table isn't empty. */
+    if (data_share->error_if_not_empty)
+    {
+      DBUG_PRINT("info", ("marking error since non empty table"));
+      error= 1;
+      break;
+    }
+    free_record_id= get_free_rec_id(loc_thread);
+    buf= &record_buffer[free_record_id * data_share->buf_reclength];
+    /* set to->record[0] to the new record in buf */
+    DBUG_PRINT("info", ("to->field (%p) buf (%p) to->rec (%p)", to->field, buf,
+                        to->record[0]));
+    set_field_ptr(to->field, buf, to->record[0]);
+    prev_rec0= to->record[0];
+    to->record[0]= buf;
+
+    /*
+      Maybe this should be done when initializing record_buffer?
+      It must be done once for every record that is copied into, in case
+      not all columns are written/copied.
+    */
+    DBUG_ASSERT(buf);
+    DBUG_ASSERT(to->record[0]);
+    DBUG_ASSERT(to);
+    DBUG_ASSERT(to->s->default_values);
+    restore_record(to, s->default_values);
+
+    DBUG_PRINT("info", ("read from record"));
+    //print_records(from, from->record[0]);
+    copy_altered_record(to, auto_increment_field_copied, copy, copy_end,
+                        prev_rec0);
+    DBUG_PRINT("info", ("copied to record"));
+    //print_records(to, buf);
+
+    /*
+      TODO: If all fields in [sub]part_expr exists in 'from' table one could
+      do this already on the 'from' table, and then choose a buffer 'local' to
+      that write thread, this would probably increase throughput
+    */
+    if (part_info && (ha_error= part_info->get_partition_id(part_info,
+                                                            &to_part_id,
+                                                            &func_value)))
+      break;
+    put_in_rec_queue(loc_thread, to_part_id, free_record_id);
+    copied++;
+  }
+  to->record[0]= saved_rec0;
+  end_read_record(&info);
+
+  pthread_mutex_lock(loc_thread->lock);
+  loc_thread->killed= killed;
+  loc_thread->ha_error= ha_error;
+  loc_thread->copied= copied;
+  /* read_record returns -1 if HA_ERR_END_OF_FILE, else > 0 */
+  if (error > 0)
+  {
+    DBUG_PRINT("info", ("error %u in read thread", error));
+    loc_thread->ok= FALSE;
+  }
+  pthread_mutex_unlock(loc_thread->lock);
+
+  /* now like do_copy_data_between_tables */
   thd->variables.sql_mode= save_sql_mode;
   thd->abort_on_warning= 0;
+  /* Scan end handling */
   free_io_cache(from);
-  *copied= found_count;
-  *deleted=delete_count;
-  to->file->ha_release_auto_increment();
-  if (to->file->ha_external_lock(thd,F_UNLCK))
-    error=1;
   DBUG_RETURN(error > 0 ? -1 : 0);
-}
 
+ early_error:
+  thd->variables.sql_mode= save_sql_mode;
+  thd->abort_on_warning= 0;
+  DBUG_RETURN(-1);                              /* purecov: inspected */
+}
+
+/*
+  create new THD and open the table to read from
+  thread->lock must be locked.
+*/
+static bool
+init_alter_data_thread(ALTER_DATA_THREAD *thread,
+                       enum enum_alter_thread_type type)
+{
+  ALTER_DATA_SHARE *s= thread->alter_data_share;
+  ha_rows rows= s->estimated_rows;
+  DBUG_ENTER("init_alter_data_thread");
+
+  thread->ok= FALSE;
+  if (!(thread->thd= get_new_thd()))
+    DBUG_RETURN(TRUE);
+
+  thread->thd->variables.sql_mode= s->sql_mode;
+
+  /*
+    Both read/write and copy threads need an own 'to' table, because it is
+    used for copying/converting to the new column formats.
+  */
+  DBUG_PRINT("info", ("to (%p) from (%p)", s->to, s->from));
+  /* TODO: Why should it be 'from' and not 'to'? */
+  /* 'from' table only used to determine if original table is TMP or not */
+  if (!(thread->to= get_alter_tmp_table(s->from,
+                                        thread->thd,
+                                        s->tmp_name,
+                                        s->new_db)))
+    DBUG_RETURN(TRUE);
+
+  DBUG_PRINT("info", ("Thread %u thd (%p) to->in_use (%p) to (%p)",
+                      thread->thread_num, thread->thd, thread->to->in_use,
+                      thread->to));
+  if (type != WRITE_THREAD)
+  {
+    /* Only read/copy threads need a 'from' table. */
+    thread->table_list->table= NULL;
+    /* Allow the table to be opened and locked multiple times. */
+    thread->thd->ignore_name_lock= TRUE;
+    if (!(thread->from= open_n_lock_single_table(thread->thd,
+                                                 thread->table_list,
+                                                 TL_READ)))
+      DBUG_RETURN(TRUE);
+
+    thread->thd->ignore_name_lock= FALSE;
+    thread->from= thread->table_list->table;
+
+    DBUG_PRINT("info", ("Thread %u thd (%p) from->in_use (%p)",
+                        thread->thread_num, thread->thd, thread->from->in_use));
+  }
+  
+  thread->table_list->table= NULL;
+
+  /*
+    Now it's time to set-up partition bitmap if there are more than one read
+    thread active. In this case the threads should only scan a subset of
+    the table. (Only for read/copy threads).
+  */
+  if (type != WRITE_THREAD)
+  {
+    uint next_part_id= thread->thread_num;
+    DBUG_ASSERT(next_part_id < s->num_read_parts);
+    bitmap_clear_all(&thread->from->part_info->used_partitions);
+    do
+    {
+      bitmap_set_bit(&thread->from->part_info->used_partitions,
+                     next_part_id);
+      next_part_id+= s->num_read_threads;
+    } while (next_part_id < s->num_read_parts);
+  }
+
+  if (type == COPY_THREAD)
+  {
+    /* 1:1 between read partitions and write partitions */
+    thread->from->file->info(HA_STATUS_VARIABLE);
+    rows= thread->from->file->stats.records;
+  }
+  if (type == READ_THREAD)
+  {
+    if (pre_trans_part(thread->to, thread->thd, s->ignore,
+                       s->indexes_are_disabled, s->keys_onoff, rows, TRUE))
+      DBUG_RETURN(TRUE);
+  }
+  else
+  {
+    if (pre_trans_part(thread->to, thread->thd, s->ignore,
+                       s->indexes_are_disabled, s->keys_onoff, rows, FALSE))
+      DBUG_RETURN(TRUE);
+  }
+
+  /*
+    TODO: create an own KEY_CACHE per thread, at least for MyISAM
+    The keys are the main performance problem, just copying data is OK
+  */
+  thread->ok= TRUE;
+  DBUG_RETURN(FALSE);
+}
+
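
The used_partitions loop above stripes partitions over the read threads
round-robin: thread k scans partitions k, k+T, k+2T, ... for T read threads.
A trivial stand-alone check of that assignment (sizes invented):

  #include <cstdio>

  int main()
  {
    const unsigned num_read_parts= 10, num_read_threads= 3;
    for (unsigned thr= 0; thr < num_read_threads; thr++)
    {
      printf("read thread %u scans partitions:", thr);
      for (unsigned p= thr; p < num_read_parts; p+= num_read_threads)
        printf(" %u", p);
      printf("\n");
    }
    return 0;
  }
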
+/*
+  Reads data from its assigned partitions and puts the rows in the correct
+  'to' partition row queue.
+*/
+pthread_handler_t
+parallel_read_data_thread(void *arg)
+{
+  ALTER_DATA_THREAD *read_thread= (ALTER_DATA_THREAD*)arg;
+  bool ok;
+  int error= 0;
+  int ha_error= 0;
+
+  my_thread_init();
+  DBUG_ENTER("parallel_read_data_thread");
+  /* We start by allocating a THD object and table objects */
+  pthread_mutex_lock(read_thread->lock);
+  if (!read_thread->ok)
+    goto end;
+  /* Thread local tmp table setup etc */
+  if (init_alter_data_thread(read_thread, READ_THREAD))
+    goto end;
+  /*
+    Initial phase done, signal to start_new_alter_thread to continue with next
+    thread or start of next phase
+  */
+  pthread_cond_signal(read_thread->ctrl_cond);
+
+  /* Wait for connection thread to start copying phase */
+  /* add loop */
+  pthread_cond_wait(read_thread->ctrl_cond, read_thread->lock);
+  ok= read_thread->ok;
+  pthread_mutex_unlock(read_thread->lock);
+
+  if (ok)
+  {
+    /* Now it is time to perform the actual reading phase */
+    error= do_read_data_from_table(read_thread);
+  }
+  pthread_mutex_lock(read_thread->lock);
+  if (error)
+  {
+    DBUG_PRINT("info", ("Got error %u from do_read_data_from_table", error));
+    read_thread->ok= FALSE;
+  }
+
+  /* Signal to connection thread that we're done with the copying phase */
+ end:
+  /* must end transaction, even if we did not change anything */
+  post_trans_part(read_thread->thd, read_thread->to, &ha_error, TRUE);
+  read_thread->ha_error= ha_error;
+  close_down_thread(read_thread);
+  if (read_thread->wait_for_signal)
+    pthread_cond_signal(read_thread->ctrl_cond);
+  read_thread->done= TRUE;
+  pthread_mutex_unlock(read_thread->lock);
+  my_thread_end();
+  pthread_exit(0);
+  DBUG_RETURN(0);
+}
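+
+/*
+  Example only, not used by this patch: the "add loop" TODOs in the thread
+  functions should wrap pthread_cond_wait() in a predicate loop, since
+  POSIX allows spurious wakeups. A minimal sketch using this patch's
+  ALTER_DATA_THREAD members:
+*/
+#ifdef NOT_YET
+static void wait_until_done(ALTER_DATA_THREAD *thread)
+{
+  pthread_mutex_lock(thread->lock);
+  while (!thread->done)                    /* re-check after every wakeup */
+    pthread_cond_wait(thread->ctrl_cond, thread->lock);
+  pthread_mutex_unlock(thread->lock);
+}
+#endif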
+
+
+/*
+  Read rows from the row queue and write them to the assigned partition.
+*/
+pthread_handler_t
+parallel_write_data_thread(void *arg)
+{
+  ALTER_DATA_THREAD *write_thread= (ALTER_DATA_THREAD*)arg;
+  ALTER_DATA_SHARE *data_share;
+  TABLE *to;
+  THD *thd;
+  ha_rows copied, deleted;
+  ulonglong prev_insert_id;
+  ulong record_id;
+  uchar *buf, *saved_record0, *record_buffer;
+  int ha_error= 0;
+
+  my_thread_init();
+  DBUG_ENTER("parallel_write_data_thread");
+  /* We start by allocating a THD object and table objects */
+  pthread_mutex_lock(write_thread->lock);
+  /* TODO: check if we really should 'goto end' */
+  if (!write_thread->ok)
+    goto end;
+  /* Thread local tmp table setup, start transaction etc */
+  if (init_alter_data_thread(write_thread, WRITE_THREAD))
+    goto end;
+  to= write_thread->to;
+  saved_record0= to->record[0];
+  copied= write_thread->copied;
+  deleted= write_thread->deleted;
+  thd= write_thread->thd;
+  DBUG_PRINT("info", ("Write thread %u thd (%p) signalling 'started'",
+                      write_thread->thread_num, thd));
+  /*
+    Initial phase done, signal to start_new_alter_thread to continue with next
+    thread or start of next phase
+  */
+  pthread_cond_signal(write_thread->ctrl_cond);
+
+  /* Wait for connection thread to start copying phase */
+  DBUG_PRINT("info", ("Write thread %u waiting for start signal",
+                      write_thread->thread_num));
+  /* TODO: loop on a predicate here (see wait_until_done() above). */
+  pthread_cond_wait(write_thread->ctrl_cond, write_thread->lock);
+  data_share= write_thread->alter_data_share;
+  record_buffer= data_share->record_buffer;
+  pthread_mutex_unlock(write_thread->lock);
+
+  /* Tell handler that we have values for all columns in the to table */
+  to->use_all_columns();
+  if (data_share->ignore)
+    to->file->extra(HA_EXTRA_IGNORE_DUP_KEY);
+  /* WHILE (get record from a scan thread) */
+  while (1)
+  {
+    if (*(data_share->kill_flag))
+    {
+      DBUG_PRINT("info", ("kill_flag was set to %u",
+                          *(data_share->kill_flag)));
+      pthread_mutex_lock(write_thread->lock);
+      write_thread->ok= FALSE;
+      write_thread->killed= TRUE;
+      pthread_mutex_unlock(write_thread->lock);
+      break;
+    }
+    /* TODO: use prefetch to increase throughput */
+    record_id= get_from_rec_queue(write_thread);
+    if (record_id == RECORD_QUEUE_DONE)
+      break;
+    buf= &record_buffer[record_id * data_share->buf_reclength];
+    /* Call set_field_ptr */
+    //set_field_ptr(to->field, buf, to->record[0]);
+    //to->record[0]= buf;
+    /*
+      Currently ha_partition uses a local m_rec0 as table->record[0],
+      which forces us to copy the data rather than simply pointing
+      record[0] at buf.
+    */
+    /* TODO: do prefetch here. */
+    bcopy(buf, to->record[0], to->s->reclength);
+    DBUG_PRINT("info", ("write to record bcopy (%p) -> (%p)",
+                        buf, to->record[0]));
+    //print_records(to, to->record[0]);
+    /* auto_increment/next_number_field was reset in copy_altered_record */
+    prev_insert_id= to->file->next_insert_id;
+    DBUG_PRINT("info", ("write_thread: thd (%p) to->in_use (%p) to (%p)",
+                        thd, to->in_use, to));
+    ha_error= to->file->ha_write_row(to->record[0]);
+    free_rec_id(write_thread, record_id);
+    /* Post 'record written' back to the scan threads (frees the slot) */
+    to->auto_increment_field_not_null= FALSE;
+    if (ha_error)
+    {
+      /* Only allow duplicate-key errors if non-fatal and ignore is set */
+
+      if (!data_share->ignore ||
+          to->file->is_fatal_error(ha_error, HA_CHECK_DUP))
+      {
+        /* Don't report errors here, do it in the main thread later! */
+        /*
+          TODO: may we set kill_flag (the main thread's THD->killed) here
+          to stop the other running threads?
+          How do we stop all other threads? Via the record/free queues?
+        */
+        DBUG_PRINT("info", ("got ha_error %d from ha_write_row", ha_error));
+        break;
+      }
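+      /*
+        Ignored duplicate: hand back the reserved auto_increment value so
+        the sequence does not get a gap for this skipped row.
+      */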
+      to->file->restore_auto_increment(prev_insert_id);
+      ha_error= 0;
+      deleted++;
+    }
+    else
+      copied++;
+  }
+  /* TODO: is set_field_ptr needed here? */
+  set_field_ptr(to->field, saved_record0, to->record[0]);
+  to->record[0]= saved_record0;
+
+  /* End of write session */
+  post_trans_part(thd, to, &ha_error, FALSE);
+
+  pthread_mutex_lock(write_thread->lock);
+  write_thread->copied= copied;
+  write_thread->deleted= deleted;
+  write_thread->ha_error= ha_error;
+
+  /* Signal to connection thread that we're done writing */
+end:
+  close_down_thread(write_thread);
+  if (write_thread->wait_for_signal)
+    pthread_cond_signal(write_thread->ctrl_cond);
+  write_thread->done= TRUE;
+  pthread_mutex_unlock(write_thread->lock);
+  my_thread_end();
+  pthread_exit(0);
+  DBUG_RETURN(0);
+}
+
+
+static int
+do_copy_data_between_tables(TABLE *from,
+                            TABLE *to,
+                            TABLE *orig_from,
+                            List<Create_field> *create,
+                            THD *thd,
+                            volatile THD::killed_state *kill_flag,
+                            bool *killed,
+                            bool ignore,
+                            uint order_num,
+                            ORDER *order,
+                            ha_rows *copied,
+                            ha_rows *deleted,
+                            enum enum_enable_or_disable keys_onoff,
+                            bool error_if_not_empty,
+                            int *ha_error,
+                            ha_rows estimated_rows)
+{
+  int error= -1;
+  Copy_field *copy,*copy_end;
+  bool auto_increment_field_copied= FALSE;
+  bool blob_field_copied= FALSE;
+  ha_rows examined_rows;
+  ulong save_sql_mode;
+  DBUG_ENTER("do_copy_data_between_tables");
+  /* DBUG_PRINT("info", ("from %p to %p orig_from %p thd %p", from, to, orig_from,
+                      thd)); */
+  DBUG_PRINT("info", ("from %p to %p thd %p", from, to, thd));
+
+  to->timestamp_field_type= TIMESTAMP_NO_AUTO_SET;
+  to->next_number_field= to->found_next_number_field;
+  save_sql_mode= thd->variables.sql_mode;
+  *copied= *deleted= 0;
+
+  if ((create_copy_fields(to, from, orig_from, create, thd,
+                          &auto_increment_field_copied,
+                          &blob_field_copied, &copy, &copy_end)))
+    goto early_error;
+  
+  if (order && (error= handle_scan_order(from, thd, order_num,
+                                         order, &examined_rows)))
+    goto err;
+
+  error= copy_data_part(to, from, thd,
+                        kill_flag, killed, error_if_not_empty,
+                        auto_increment_field_copied, ignore,
+                        copied, deleted,
+                        copy, copy_end, ha_error);
+
+ err:
+  thd->variables.sql_mode= save_sql_mode;
+  thd->abort_on_warning= 0;
+  /* Scan end handling */
+  free_io_cache(from);
+  DBUG_RETURN(error);
+
+ early_error:
+  thd->variables.sql_mode= save_sql_mode;
+  thd->abort_on_warning= 0;
+  DBUG_RETURN(-1);                              /* purecov: inspected */
+}
+
+pthread_handler_t
+parallel_copy_data_thread(void *arg)
+{
+  ALTER_DATA_THREAD *loc_thread= (ALTER_DATA_THREAD*)arg;
+  ALTER_DATA_SHARE *data_share;
+  THD *thd;
+  TABLE *to;
+  int error= 0;                     /* stays 0 if copying is never started */
+  int ha_error= 0;
+
+  /* We start by allocating a THD object and table objects */
+  my_thread_init();
+  DBUG_ENTER("parallel_copy_data_thread");
+  /*
+    Keep the mutex until the end (open during cond_wait),
+    since we use the loc_thread variables
+  */
+  pthread_mutex_lock(loc_thread->lock);
+  if (!loc_thread->ok)
+    goto end;
+  /* Thread local tmp table setup, start transaction etc */
+  if (init_alter_data_thread(loc_thread, COPY_THREAD))
+    goto end;
+  DBUG_ASSERT(loc_thread->ok);
+  to= loc_thread->to;
+  thd= loc_thread->thd;
+  /*
+    Initial phase done, signal to start_new_alter_thread to continue with next
+    thread or start of next phase
+  */
+  pthread_cond_signal(loc_thread->ctrl_cond);
+
+  /* Wait for connection thread to start copying phase */
+  /* TODO: loop on a predicate here (see wait_until_done() above). */
+  pthread_cond_wait(loc_thread->ctrl_cond, loc_thread->lock);
+  data_share= loc_thread->alter_data_share;
+
+  if (loc_thread->ok)
+  {
+    /* Now it is time to perform the actual copying phase */
+    error= do_copy_data_between_tables(loc_thread->from,
+                                       to,
+                                       data_share->from,
+                                       data_share->create,
+                                       thd,
+                                       data_share->kill_flag,
+                                       &loc_thread->killed,
+                                       data_share->ignore,
+                                       data_share->order_num,
+                                       data_share->order,
+                                       &loc_thread->copied,
+                                       &loc_thread->deleted,
+                                       data_share->keys_onoff,
+                                       data_share->error_if_not_empty,
+                                       &ha_error,
+                                       data_share->estimated_rows);
+  }
+  loc_thread->ha_error= ha_error;
+  if (error)
+    loc_thread->ok= FALSE;
+
+  post_trans_part(thd, to, &ha_error, FALSE);
+
+  /* Signal to connection thread that we're done with the copying phase */
+end:
+  close_down_thread(loc_thread);
+  if (loc_thread->wait_for_signal)
+    pthread_cond_signal(loc_thread->ctrl_cond);
+  loc_thread->done= TRUE;
+  pthread_mutex_unlock(loc_thread->lock);
+  my_thread_end();
+  pthread_exit(0);
+  DBUG_RETURN(0);
+}
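+
+/*
+  Note, summarising the thread types above: COPY_THREAD instances both
+  read and write when the partitioning is unchanged (1:1 partition
+  mapping), while READ_THREAD and WRITE_THREAD instances form a pipeline
+  over the shared record buffer when the partitioning changes.
+*/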
+
+/*
+  Locks the thread's mutex, creates the thread and waits for its
+  initialisation signal.
+  Regardless of failure and status, it always returns with the mutex
+  unlocked.
+*/
+
+static bool
+start_new_alter_thread(ALTER_DATA_THREAD *loc_thread,
+                       void* (*loc_thread_function)(void *))
+{
+  DBUG_ENTER("start_new_alter_thread");
+  DBUG_PRINT("info", ("Thread %u to be started", loc_thread->thread_num));
+
+  if (pthread_mutex_lock(loc_thread->lock))
+  {
+    loc_thread->ok= FALSE;
+    DBUG_PRINT("info", ("Failed to init/lock cond/mutex for thread %u",
+                        loc_thread->thread_num));
+    DBUG_RETURN(TRUE);
+  }
+
+  /* Create new thread */
+  if (pthread_create(&loc_thread->pthread_id, &connection_attrib,
+                     loc_thread_function,
+                     (void*)loc_thread) || !loc_thread->pthread_id)
+  {
+    DBUG_ASSERT(!loc_thread->pthread_id);
+    loc_thread->pthread_id= 0;
+    loc_thread->ok= FALSE;
+    DBUG_PRINT("info", ("Thread %u failed to start", loc_thread->thread_num));
+    /* release the mutex to allow later destroy */
+    pthread_mutex_unlock(loc_thread->lock);
+    DBUG_RETURN(TRUE);
+  }
+  /* Wait until new thread has created THD and the two table objects */
+  DBUG_PRINT("info", ("Thread %u have started, waiting for its signal",
+                      loc_thread->thread_num));
+  /* TODO: loop on a predicate here (see wait_until_done() above). */
+  if (pthread_cond_wait(loc_thread->ctrl_cond, loc_thread->lock))
+  {
+    /* Handle errors in initialisation of threads */
+    DBUG_PRINT("info", ("Thread %d failed to initiate",
+                        loc_thread->thread_num));
+    loc_thread->ok= FALSE;
+    /* release the mutex to allow the thread to handle the error */
+    pthread_mutex_unlock(loc_thread->lock);
+    DBUG_RETURN(TRUE);
+  }
+  DBUG_PRINT("info", ("Thread %u got signal", loc_thread->thread_num));
+
+  if (!loc_thread->ok)
+  {
+    /* Handle errors in initialisation of threads */
+    DBUG_PRINT("info", ("Thread %d failed to initiate",
+                        loc_thread->thread_num));
+    /* release the mutex to allow the thread to handle the error */
+    pthread_mutex_unlock(loc_thread->lock);
+    DBUG_RETURN(TRUE);
+  }
+  DBUG_PRINT("info", ("Thread %u have created its object and is ready to run",
+                      loc_thread->thread_num));
+  /* TODO: if myisam, assign key cache for better concurrency */
+  /*
+    TODO: assign new/tmp keycache to every thread
+    1) create keycache X bytes (get_or_create)
+    2) assign_to_key_cache (all partitions for the thread)
+    3) set the keycache to 0 bytes (there is no delete of key caches)
+  */
+  pthread_mutex_unlock(loc_thread->lock);
+  DBUG_RETURN(FALSE);
+}
+
+
+static bool
+allocate_alter_copy_structs(THD *thd, ALTER_DATA_SHARE *s,
+                           ALTER_DATA_THREAD **read_data_threads,
+                           ALTER_DATA_THREAD **write_data_threads)
+{
+  /* TODO: Maybe rearrange this to avoid false sharing of cache lines */
+  if (!(*read_data_threads= (ALTER_DATA_THREAD*)
+                           thd->alloc(sizeof(ALTER_DATA_THREAD) *
+                                      s->num_read_threads)) ||
+      !(s->read_ctrl_cond= (pthread_cond_t*)
+                            thd->alloc(sizeof(pthread_cond_t) *
+                                       s->num_read_threads)) ||
+      !(s->read_mutex= (pthread_mutex_t*)
+                          thd->alloc(sizeof(pthread_mutex_t) *
+                                       s->num_read_threads)))
+    return TRUE;
+
+  if (write_data_threads)
+  {
+    /* TODO: Maybe rearrange this to avoid false sharing of cache lines */
+    if (!(s->read_com_cond= (pthread_cond_t*)
+                            thd->alloc(sizeof(pthread_cond_t) *
+                                         s->num_read_threads)) ||
+        !(*write_data_threads= (ALTER_DATA_THREAD*)
+                              thd->alloc(sizeof(ALTER_DATA_THREAD) *
+                                         s->num_write_threads)) ||
+        !(s->write_com_cond= (pthread_cond_t*)
+                              thd->alloc(sizeof(pthread_cond_t) *
+                                         s->num_write_threads)) ||
+        !(s->write_ctrl_cond= (pthread_cond_t*)
+                              thd->alloc(sizeof(pthread_cond_t) *
+                                         s->num_write_threads)) ||
+        !(s->write_mutex= (pthread_mutex_t*)
+                          thd->alloc(sizeof(pthread_mutex_t) *
+                                         s->num_write_threads)))
+      return TRUE;
+  }
+  return FALSE;
+}
+
+/* If 'writer' is TRUE, also init the com conds and the write mutexes/conds */
+static bool
+init_alter_copy_conds(ALTER_DATA_SHARE *s, bool writer)
+{
+  uint i;
+  DBUG_ENTER("init_alter_copy_conds");
+  for (i= 0; i < s->num_read_threads; i++)
+  {
+    if (pthread_mutex_init(&s->read_mutex[i], MY_MUTEX_INIT_FAST) ||
+        pthread_cond_init(&s->read_ctrl_cond[i], NULL) ||
+        (writer && pthread_cond_init(&s->read_com_cond[i], NULL)))
+    {
+      s->num_read_threads= i;
+      s->num_write_threads= 0;
+      DBUG_RETURN(TRUE);
+    }
+  }
+  if (writer)
+  {
+    for (i= 0; i < s->num_write_threads; i++)
+    {
+      if (pthread_mutex_init(&s->write_mutex[i], MY_MUTEX_INIT_FAST) ||
+          pthread_cond_init(&s->write_com_cond[i], NULL) ||
+          pthread_cond_init(&s->write_ctrl_cond[i], NULL))
+      {
+        s->num_write_threads= i;
+        DBUG_RETURN(TRUE);
+      }
+    }
+  }
+  DBUG_RETURN(FALSE);
+}
+
+static void
+destroy_alter_copy_conds(ALTER_DATA_SHARE *s, bool writer)
+{
+  uint i;
+  DBUG_ENTER("destroy_alter_copy_cond");
+  for (i= 0; i < s->num_read_threads; i++)
+  {
+    pthread_mutex_destroy(&s->read_mutex[i]);
+    pthread_cond_destroy(&s->read_ctrl_cond[i]);
+    if (writer)
+      pthread_cond_destroy(&s->read_com_cond[i]);
+  }
+  if (writer)
+  {
+    for (i= 0; i < s->num_write_threads; i++)
+    {
+      pthread_mutex_destroy(&s->write_mutex[i]);
+      pthread_cond_destroy(&s->write_com_cond[i]);
+      pthread_cond_destroy(&s->write_ctrl_cond[i]);
+    }
+  }
+  DBUG_VOID_RETURN;
+}
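+
+/*
+  Note on the pairing above: when init_alter_copy_conds() fails it shrinks
+  num_read_threads/num_write_threads to the count of successfully
+  initialised entries, so destroy_alter_copy_conds() can always be called
+  afterwards and destroys only what was actually initialised.
+*/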
+
+
+/* Minimum number of rows before the overhead of extra threads pays off */
+#define MIN_ROWS_FOR_PARALLEL_ALTER 1
+//#define MIN_ROWS_FOR_PARALLEL_ALTER 1000
+
+static int
+copy_data_between_tables(TABLE *from,TABLE *to, TABLE_LIST *table_list,
+                         List<Create_field> *create,
+                         THD *thd,
+                         uint alter_flags,
+                         char *tmp_name,
+                         char *new_db,
+                         bool ignore,
+                         uint order_num, ORDER *order,
+                         ha_rows *copied,
+                         ha_rows *deleted,
+                         enum enum_enable_or_disable keys_onoff,
+                         bool error_if_not_empty)
+{
+  ha_rows estimated_rows;
+  uint num_read_parts, num_write_parts, i;
+  int error= 0;
+  int ha_error= 0;
+  uint num_threads= thd->variables.max_query_threads;
+  bool killed= FALSE;
+  bool readers_ok= TRUE;
+  bool writers_ok= TRUE;
+  int indexes_are_disabled;
+  DBUG_ENTER("copy_data_between_tables");
+  DBUG_PRINT("info", ("to (%p) from (%p)", to, from));
+  if (order &&
+      to->s->primary_key != MAX_KEY && to->file->primary_key_is_clustered())
+  {
+    char warn_buff[MYSQL_ERRMSG_SIZE];
+    my_snprintf(warn_buff, sizeof(warn_buff), 
+                "ORDER BY ignored as there is a user-defined clustered index"
+                " in the table '%-.192s'", from->s->table_name.str);
+    push_warning(thd, MYSQL_ERROR::WARN_LEVEL_WARN, ER_UNKNOWN_ERROR,
+                 warn_buff);
+    order= NULL;
+  }
+  indexes_are_disabled= from->file->indexes_are_disabled();
+  from->file->info(HA_STATUS_VARIABLE);
+  estimated_rows= from->file->stats.records;
+  DBUG_PRINT("info", ("estimated_rows %lu", estimated_rows));
+  num_threads= min(num_threads, MAX_NUM_THREADS);
+  /* TODO: remove this test!!! */
+  if (num_threads < 5)
+    num_threads= 5;
+  DBUG_PRINT("info", ("num_threads = %d", num_threads));
+  /*
+    TODO:
+    Check if the new table includes BLOBs and, if so, add limitations.
+    Remove the 'order' limit, to allow parallel alter to work with it too.
+    It will work like this:
+    The order will be in read order, so with no partitioning changes each
+    partition will be ordered, but if the table is not partitioned the
+    same way as before, the result will be multiplexed streams of sorted
+    rows (one stream for each 'from' partition).
+    Alternative guard conditions kept for reference:
+      ((num_read_parts <= 1) && (num_write_parts <= 1)) ||
+      from->s->blob_field || to->s->blob_field)
+      (num_read_parts == 0) || (num_write_parts == 0) ||
+  */
+  if (estimated_rows < MIN_ROWS_FOR_PARALLEL_ALTER ||
+      to->file->get_no_parts(tmp_name, &num_write_parts) ||
+      from->file->get_no_parts(tmp_name, &num_read_parts) ||
+      (num_read_parts == 0) || (num_write_parts == 0) ||
+      (num_threads <= 1) ||
+      (order != NULL))
+  {
+    /* This is the 'old' single threaded path, no threads are created */
+    num_threads= 0;
+    DBUG_PRINT("info", ("doing old, single threaded copy"));
+    if (pre_trans_part(to, thd, ignore, indexes_are_disabled, keys_onoff,
+                       estimated_rows, FALSE))
+      error= 1;
+    else
+    {
+
+      error= do_copy_data_between_tables(from, to, from, create, thd,
+                                         &thd->killed, &killed,
+                                         ignore, order_num,
+                                         order, copied, deleted, keys_onoff,
+                                         error_if_not_empty, &ha_error,
+                                         estimated_rows);
+      DBUG_PRINT("info", ("got error %d (%d) from do_copy_data_between_tables",
+                          error, ha_error));
+
+      post_trans_part(thd, to, &ha_error, FALSE);
+    }
+  }
+  else if (alter_flags & ALTER_PARTITION || num_write_parts != num_read_parts)
+  {
+    /* TEST to always do parallel alter !!! */
+    /*
+      Changes of partitioning always set ALTER_PARTITION, but this branch
+      need not be an ALTER_PARTITION; it can also be e.g. addition of an
+      index, so the assert below is disabled.
+    */
+    //DBUG_ASSERT(alter_flags & ALTER_PARTITION);
+    ALTER_DATA_SHARE alter_data_share;
+    ulong free_list_size;
+    ulong record_queue_size;
+    RECORD_QUEUE *record_queue;
+    FREE_RECORDS free_records;
+    ALTER_DATA_THREAD *read_data_threads;
+    ALTER_DATA_THREAD *write_data_threads;
+    /* just for checking... */
+    ulonglong writer_copied= 0;
+    ulonglong writer_deleted= 0;
+    uint buf_reclength;
+    size_t max_mem_for_record_buffer;
+    uint num_read_threads;
+    uint num_write_threads;
+
+    DBUG_PRINT("info", ("doing new, multi-threaded read/write splitted copy"));
+    /* Always round in favour of number of read threads */
+    num_write_threads= (num_threads * (100 - thd->variables.query_thread_bias))
+                       / 100;
+    if (num_write_threads >= num_threads)
+      num_write_threads= num_threads - 1;
+    num_write_threads= min(num_write_threads, num_write_parts);
+    if (!num_write_threads)
+      num_write_threads= 1;
+    num_read_threads= num_threads - num_write_threads;
+    num_read_threads= min(num_read_threads, num_read_parts);
+    DBUG_ASSERT(num_read_threads && num_write_threads);
+    DBUG_ASSERT((num_read_threads + num_write_threads) <= num_threads);
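+    /*
+      Example with illustrative numbers: num_threads= 8 and
+      query_thread_bias= 75 give num_write_threads= 8 * 25 / 100 = 2 and
+      num_read_threads= 6, before both are capped by the number of
+      write/read partitions.
+    */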
+    alter_data_share.num_read_threads= num_read_threads;
+    alter_data_share.num_write_threads= num_write_threads;
+    alter_data_share.num_read_parts= num_read_parts;
+    alter_data_share.num_write_parts= num_write_parts;
+    alter_data_share.estimated_rows= estimated_rows;
+    alter_data_share.sql_mode= thd->variables.sql_mode;
+    alter_data_share.indexes_are_disabled= indexes_are_disabled;
+
+    /*
+      What would the performance and problems be if we used temporary tables
+      for queue for records from read thread to write thread instead?
+    */
+
+    /*
+      max_mem_used_for_parallel_alter=
+          B * (reclength + 2 * sizeof(uint))    the record buffer
+        + B * 2 * sizeof(uint)                  each read thread has its own
+                                                free list
+        + num_write_threads * B * sizeof(uint)  each writer's queue
+    */
+    bzero(&free_records, sizeof(FREE_RECORDS));
+    free_records.sleeping= (uint*) thd->alloc(num_read_threads * sizeof(uint));
+    if (!free_records.sleeping)
+      DBUG_RETURN(1);
+
+    /*
+      Should we align the data to cache-line size to avoid concurrency
+      problems (false sharing) when lifting the mutex?
+    */
+    buf_reclength= ALIGN_SIZE(to->s->reclength);
+    /*
+      TODO: Be more exact...
+      This is the first version; in the next version, give the readers
+      free buffers and 'to' buffers and delay freeing in the writers, to
+      decrease locking.
+    */
+
+    max_mem_for_record_buffer= ((thd->variables.max_query_memory * 1024 * 1024)
+        - (sizeof(FREE_RECORDS) + num_write_threads * sizeof(RECORD_QUEUE)));
+    if (max_mem_for_record_buffer < 1024)
+      DBUG_RETURN(1);
+
+    /*
+      Every buffered record takes:
+      buf_reclength (size of record in the shared record buffer)
+      sizeof(ulong) for the free_record list
+      sizeof(ulong) per write_thread for the record_queues.
+    */
+    alter_data_share.max_rec_in_buffer=
+      max_mem_for_record_buffer /
+      (buf_reclength + (1 + num_write_threads) * sizeof(ulong));
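+    /*
+      Example with illustrative numbers (assuming 4-byte ulong): a 16 MB
+      budget and buf_reclength= 256 cost 256 + (1 + num_write_threads) * 4
+      bytes per record; with 2 write threads that is 268 bytes, i.e.
+      roughly 62000 records in the buffer.
+    */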
+
+
+    /* TODO: fall back to single threaded, use fewer threads, or raise the
+       memory limit? */
+    if (alter_data_share.max_rec_in_buffer < min(num_read_threads,
+                                                 num_write_threads))
+      DBUG_RETURN(1);
+
+    alter_data_share.record_buffer=
+                   (uchar*) thd->alloc(alter_data_share.max_rec_in_buffer *
+                                       buf_reclength);
+    alter_data_share.buf_reclength= buf_reclength;
+    if (!alter_data_share.record_buffer)
+      DBUG_RETURN(1);
+
+    free_list_size= alter_data_share.max_rec_in_buffer * sizeof(ulong);
+    free_records.list= (ulong*) thd->alloc(free_list_size);
+    if (!free_records.list)
+      DBUG_RETURN(1);
+    bzero((uchar*) free_records.list, free_list_size);
+    for (i= 0; i < alter_data_share.max_rec_in_buffer; i++)
+      free_records.list[i]= i;
+    free_records.num_freed_records= alter_data_share.max_rec_in_buffer;
+    if (pthread_mutex_init(&free_records.lock, MY_MUTEX_INIT_FAST))
+      DBUG_RETURN(1);
+    alter_data_share.free_records= &free_records;
+
+    record_queue_size= num_write_threads * sizeof(RECORD_QUEUE);
+    record_queue= (RECORD_QUEUE*) thd->alloc(record_queue_size);
+    if (!record_queue)
+      DBUG_RETURN(1);
+    bzero((uchar*) record_queue, record_queue_size);
+    for (i= 0; i < num_write_threads; i++)
+    {
+      record_queue[i].records_in_queue= (uint*) thd->alloc(free_list_size);
+      if (!record_queue[i].records_in_queue)
+      {
+        pthread_mutex_destroy(&free_records.lock);
+        DBUG_RETURN(1);
+      }
+      bzero((uchar*) record_queue[i].records_in_queue, free_list_size);
+    }
+    alter_data_share.record_queues= record_queue;
+    /*
+      Some thoughts about how to copy the row:
+      - Every read thread has its own set of partitions to read from and
+        goes through them serially (i.e. no ordered alter between
+        partitions yet).
+        For each row:
+          - take a free to-record for buffering to the writer
+          - copy/convert the read row to the new write record format
+          - calculate which partition it should go to
+          - put it into the row queue of that writer thread
+        This increases the copying in the server, but is unavoidable when
+        using the internal record format.
+        Optimizations:
+         - use chunks or lists when taking free records / putting records
+           in the writer threads' row queues.
+         - Calculate which partition to write to BEFORE copying/converting
+           the record, and then use a consecutive block/list for each
+           write thread (all fields in the partitioning expressions map
+           either to a 'from' field or to a default).
+      - Calling read_record (rnd_next) frees the allocated blob data that
+        is not stored directly in the record (only pointers).
+        So either we have to copy the blob data to be able to read the
+        next row, or simply wait until it has been written.
+        Conclusion: when copying a table including BLOB fields, we MUST
+        wait for the row to be written before we can read a new record
+        from the same partition. We do not support this in this version...
+
+      The record queue works like this:
+      1) a reader acquires the mutex and puts all its rows (1 or more)
+
+      Discarded ideas:
+      - create a broker which takes rows from the reader, converts them
+        to the new record format, and determines which writer thread they
+        belong to.
+        (This would probably be a bottleneck when scaling over many CPUs,
+        plus extra copying of records.)
+        -> the conversion and mapping to writers must be done in the
+           reader.
+           (This also avoids locking/changing table->record[0] between
+            reads.)
+    */
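+    /*
+      Sketch of the resulting data flow (get_free_rec is an assumed name;
+      the other helpers exist in this patch):
+
+        reader:  rec_id= get_free_rec(...);           // blocks when full
+                 copy/convert the row into
+                 record_buffer[rec_id * buf_reclength];
+                 put_in_rec_queue(writer, part_id, rec_id);
+        writer:  rec_id= get_from_rec_queue(thread);  // blocks when empty
+                 ha_write_row() from that buffer slot;
+                 free_rec_id(thread, rec_id);         // back to free list
+        finish:  when all readers are done, the connection thread puts
+                 RECORD_QUEUE_DONE in every writer's queue.
+    */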
+
+    /* Allocate all shared and thread specific structs */
+    if (allocate_alter_copy_structs(thd, &alter_data_share, &read_data_threads,
+                                    &write_data_threads))
+    {
+      pthread_mutex_destroy(&free_records.lock);
+      DBUG_RETURN(1);
+    }
+    if (init_alter_copy_conds(&alter_data_share, TRUE))
+    {
+      destroy_alter_copy_conds(&alter_data_share, TRUE);
+      pthread_mutex_destroy(&free_records.lock);
+      DBUG_RETURN(1);
+    }
+    alter_data_share.new_db= new_db;
+    alter_data_share.tmp_name= tmp_name;
+    alter_data_share.ignore= ignore;
+    alter_data_share.order= order;
+    alter_data_share.order_num= order_num;
+    alter_data_share.keys_onoff= keys_onoff;
+    alter_data_share.error_if_not_empty= error_if_not_empty;
+    alter_data_share.create= create;
+    alter_data_share.kill_flag= &thd->killed;
+    alter_data_share.to= to;
+    alter_data_share.from= from;
+    alter_data_share.table_list= table_list;
+
+    /* initialize template thread */
+    read_data_threads[0].alter_data_share= &alter_data_share;
+    read_data_threads[0].ok= TRUE;
+    read_data_threads[0].done= FALSE;
+    read_data_threads[0].killed= FALSE;
+    read_data_threads[0].wait_for_signal= FALSE;
+    read_data_threads[0].copied= 0;
+    read_data_threads[0].deleted= 0;
+    read_data_threads[0].ha_error= 0;
+    read_data_threads[0].table_list= table_list;
+    /* 'from' will be replaced with a new instance for reading */
+    read_data_threads[0].from= from;
+    /* only set if initialized */
+    read_data_threads[0].thread_num= 0;
+
+    /* Create the reader threads */
+    DBUG_PRINT("info", ("Starting %u read threads", num_read_threads));
+    for (i= 0; i < num_read_threads; i++)
+    {
+      if (i)
+        bcopy(&read_data_threads[0], &read_data_threads[i],
+              sizeof(ALTER_DATA_THREAD));
+      read_data_threads[i].thread_num= i;
+      read_data_threads[i].ctrl_cond= &alter_data_share.read_ctrl_cond[i];
+      read_data_threads[i].com_cond= &alter_data_share.read_com_cond[i];
+      read_data_threads[i].lock= &alter_data_share.read_mutex[i];
+      if (start_new_alter_thread(&read_data_threads[i],
+                                 parallel_read_data_thread))
+      {
+        readers_ok= FALSE;
+        break;
+      }
+    }
+    if (readers_ok)
+    {
+      /* Create the writer threads */
+      DBUG_PRINT("info", ("Starting %u write threads", num_write_threads));
+      for (i= 0; i < num_write_threads; i++)
+      {
+        bcopy(&read_data_threads[0], &write_data_threads[i],
+              sizeof(ALTER_DATA_THREAD));
+        write_data_threads[i].thread_num= i;
+        write_data_threads[i].from= NULL;
+        write_data_threads[i].ctrl_cond= &alter_data_share.write_ctrl_cond[i];
+        write_data_threads[i].com_cond= &alter_data_share.write_com_cond[i];
+        write_data_threads[i].lock= &alter_data_share.write_mutex[i];
+        if (start_new_alter_thread(&write_data_threads[i],
+                                   parallel_write_data_thread))
+        {
+          writers_ok= FALSE;
+          break;
+        }
+      }
+    }
+    /* TODO: Verify that the killing mechanism works */
+    DBUG_PRINT("info", ("All %d + %d threads have started",
+                        num_read_threads, num_write_threads));
+    for (i= 0; i < num_read_threads; i++)
+    {
+      pthread_mutex_lock(read_data_threads[i].lock);
+      if (read_data_threads[i].pthread_id)
+      {
+        /* Wake up the read threads to read/error handling one by one */
+        DBUG_PRINT("info", ("Sends signal to read thread %d for start reading",
+                            i));
+        read_data_threads[i].ok= (readers_ok && writers_ok);
+        pthread_cond_signal(read_data_threads[i].ctrl_cond);
+      }
+      pthread_mutex_unlock(read_data_threads[i].lock);
+    }
+    if (readers_ok)
+    {
+      for (i= 0; i < num_write_threads; i++)
+      {
+        pthread_mutex_lock(write_data_threads[i].lock);
+        if (write_data_threads[i].pthread_id)
+        {
+          /* Wake up the write threads to write/error handling one by one */
+          DBUG_PRINT("info", ("Sends signal to write thread %d for start writing",
+                              i));
+          write_data_threads[i].ok= writers_ok;
+          pthread_cond_signal(write_data_threads[i].ctrl_cond);
+        }
+        pthread_mutex_unlock(write_data_threads[i].lock);
+      }
+    }
+    /* Wait for the copy/error handling processes to be completed */
+    /* TODO: Add quit flag to free records / record queues */
+    for (i= 0; i < num_read_threads; i++)
+    {
+      DBUG_PRINT("info", ("Checking if read thread %d is done", i));
+      pthread_mutex_lock(read_data_threads[i].lock);
+      if (read_data_threads[i].pthread_id)
+      {
+        if (!read_data_threads[i].done)
+        {
+          read_data_threads[i].wait_for_signal= TRUE;
+          DBUG_PRINT("info", ("Read thread %d is not done, waiting", i));
+          /* TODO: loop on a predicate (see wait_until_done() above). */
+          pthread_cond_wait(read_data_threads[i].ctrl_cond,
+                            read_data_threads[i].lock);
+        }
+        DBUG_PRINT("info", ("Read thread %d is done, continuing", i));
+      }
+      pthread_mutex_unlock(read_data_threads[i].lock);
+    }
+    /* Release all record queues by adding an end record to each queue */
+    if (readers_ok)
+      for (i= 0; i < num_write_threads; i++)
+        put_in_rec_queue(&write_data_threads[i], i, RECORD_QUEUE_DONE);
+
+    for (i= 0; i < num_write_threads; i++)
+    {
+      DBUG_PRINT("info", ("Checking if write thread %d is done", i));
+      pthread_mutex_lock(write_data_threads[i].lock);
+      if (!write_data_threads[i].done)
+      {
+        write_data_threads[i].wait_for_signal= TRUE;
+        DBUG_PRINT("info", ("Write thread %d is not done, waiting", i));
+        /* TODO: loop on a predicate (see wait_until_done() above). */
+        pthread_cond_wait(write_data_threads[i].ctrl_cond,
+                          write_data_threads[i].lock);
+      }
+      DBUG_PRINT("info", ("Write thread %d is done, continueing", i));
+      pthread_mutex_unlock(write_data_threads[i].lock);
+    }
+
+    /*
+      All threads have completed copying/error handling, we can finish up now
+    */
+    DBUG_PRINT("info", ("All %u + %u threads are done", num_read_threads,
+                        num_write_threads));
+    /* No need to lock the thread mutexes any longer */
+    for (i= 0; i < num_read_threads; i++)
+    {
+      if (ha_error == 0)
+        ha_error= read_data_threads[i].ha_error;
+      if (read_data_threads[i].ha_error)
+      {
+        DBUG_PRINT("info", ("Read thread %u had ha_error %u", i,
+                            read_data_threads[i].ha_error));
+      }
+      if (error == 0 && !read_data_threads[i].ok)
+        error= 1;
+      if (!read_data_threads[i].ok)
+        DBUG_PRINT("info", ("Read thread %u was NOT OK", i));
+      if (read_data_threads[i].killed)
+      {
+        DBUG_PRINT("info", ("Read thread %u was marked killed", i));
+        killed= TRUE;
+      }
+      DBUG_PRINT("info", ("Read thread %d copied %lu rows, deleted %lu rows",
+                          i, (ulong) read_data_threads[i].copied,
+                          (ulong) read_data_threads[i].deleted));
+      (*copied)+= read_data_threads[i].copied;
+      (*deleted)+= read_data_threads[i].deleted;
+      /* probably not needed, since allocated through thd. */
+    }
+    for (i= 0; i < num_write_threads; i++)
+    {
+      if (ha_error == 0)
+        ha_error= write_data_threads[i].ha_error;
+      if (write_data_threads[i].ha_error)
+      {
+        DBUG_PRINT("info", ("Write thread %u had ha_error %u", i,
+                            write_data_threads[i].ha_error));
+      }
+      if (error == 0 && !write_data_threads[i].ok)
+        error= 1;
+      if (!write_data_threads[i].ok)
+        DBUG_PRINT("info", ("Write thread %u was NOT OK", i));
+      if (write_data_threads[i].killed)
+      {
+        DBUG_PRINT("info", ("Write thread %u was marked killed", i));
+        killed= TRUE;
+      }
+      DBUG_PRINT("info", ("Write thread %d copied %lu rows, deleted %lu rows",
+                          i, (ulong) write_data_threads[i].copied,
+                          (ulong) write_data_threads[i].deleted));
+      writer_copied+= write_data_threads[i].copied;
+      writer_deleted+= write_data_threads[i].deleted;
+      /* TODO: fix the correct count between readers and writers */
+      /* probably not needed, since allocated through thd. */
+    }
+    pthread_mutex_destroy(&free_records.lock);
+    destroy_alter_copy_conds(&alter_data_share, TRUE);
+    /* DBUG_ASSERT(writer_copied == *copied); */
+    (*copied)= writer_copied;
+    thd->row_count= writer_copied;
+    DBUG_ASSERT(!writer_deleted);
+    /*
+      NOTE: pthread_join is not needed because PTHREAD_CREATE_DETACHED is
+      set in mysqld.cc
+    */
+  }
+  else
+  {
+    /*
+      We have a partitioned table, we're not changing the partitioning,
+      we have at least two partitions and the user has set the maximum
+      number of threads to be larger than one.
+      Thus we start a number of threads that perform the copying between
+      the original table and the new table, partition to partition.
+      To start a new thread we need to create a new to table, a new from
+      table and a new THD object.
+    */
+    DBUG_ASSERT(!(alter_flags & ALTER_PARTITION));
+    DBUG_ASSERT(num_write_parts == num_read_parts && num_threads);
+    ALTER_DATA_THREAD *loc_threads;
+    ALTER_DATA_SHARE alter_data_share;
+    bool ok= TRUE;
+    uint saved_from_db_stat;
+    uint saved_to_db_stat;
+
+    DBUG_PRINT("info",
+               ("doing new, multi-threaded copy (no partition changes)"));
+    num_threads= min(num_threads, num_read_parts);
+    alter_data_share.num_read_threads= num_threads;
+    alter_data_share.num_write_threads= 0;   // read thread also writes
+    alter_data_share.num_read_parts= num_read_parts;
+    alter_data_share.num_write_parts= num_write_parts;
+    alter_data_share.new_db= new_db;
+    alter_data_share.tmp_name= tmp_name;
+    alter_data_share.ignore= ignore;
+    alter_data_share.order= order;
+    alter_data_share.order_num= order_num;
+    alter_data_share.keys_onoff= keys_onoff;
+    alter_data_share.error_if_not_empty= error_if_not_empty;
+    alter_data_share.create= create;
+    alter_data_share.kill_flag= &thd->killed;
+    alter_data_share.to= to;
+    alter_data_share.from= from;
+    alter_data_share.table_list= table_list;
+    alter_data_share.estimated_rows= estimated_rows;
+    alter_data_share.sql_mode= thd->variables.sql_mode;
+    alter_data_share.indexes_are_disabled= indexes_are_disabled;
+    if (allocate_alter_copy_structs(thd, &alter_data_share, &loc_threads,
+                                    NULL))
+      DBUG_RETURN(-1);
+
+    if (init_alter_copy_conds(&alter_data_share, FALSE))
+    {
+      destroy_alter_copy_conds(&alter_data_share, FALSE);
+      DBUG_RETURN(-1);
+    }
+
+    loc_threads[0].alter_data_share= &alter_data_share;
+    loc_threads[0].ok= TRUE;
+    loc_threads[0].done= FALSE;
+    loc_threads[0].killed= FALSE;
+    loc_threads[0].wait_for_signal= FALSE;
+    loc_threads[0].copied= 0;
+    loc_threads[0].deleted= 0;
+    loc_threads[0].ha_error= 0;
+    loc_threads[0].table_list= table_list;
+    loc_threads[0].from= from;
+    loc_threads[0].to= to;
+    loc_threads[0].thread_num= 0;
+
+    /*
+      Force partitioning handler to not open/lock partitions until they are
+      actually used.
+    */
+    saved_from_db_stat= from->db_stat;
+    saved_to_db_stat= to->db_stat;
+    from->db_stat|= HA_OPEN_WAIT_FOR_USAGE;
+    to->db_stat|= HA_OPEN_WAIT_FOR_USAGE;
+
+    for (i= 0; i < num_threads; i++)
+    {
+      DBUG_PRINT("info", ("Thread %d to be started", i));
+      if (i)
+      {
+        /* Thread 0 already done */
+        bcopy(&loc_threads[0], &loc_threads[i], sizeof(ALTER_DATA_THREAD));
+        loc_threads[i].thread_num= i;
+      }
+      loc_threads[i].ctrl_cond= &alter_data_share.read_ctrl_cond[i];
+      loc_threads[i].lock= &alter_data_share.read_mutex[i];
+      if (start_new_alter_thread(&loc_threads[i],
+                                 parallel_copy_data_thread))
+      {
+        ok= FALSE;
+        break;
+      }
+    }
+    DBUG_PRINT("info", ("All %d threads have started", num_threads));
+    for (i= 0; i < num_threads && loc_threads[i].pthread_id; i++)
+    {
+      /* Wake up the copy threads to copy/error handling one by one */
+      DBUG_PRINT("info", ("Sends signal to thread %d for start copying", i));
+      loc_threads[i].ok= ok;
+      pthread_cond_signal(loc_threads[i].ctrl_cond);
+    }
+    /* Wait for the copy/error handling processes to be completed */
+    for (i= 0; i < num_threads && loc_threads[i].pthread_id; i++)
+    {
+      DBUG_PRINT("info", ("Checking if thread %d is done copying", i));
+      pthread_mutex_lock(loc_threads[i].lock);
+      if (!loc_threads[i].done)
+      {
+        loc_threads[i].wait_for_signal= TRUE;
+        DBUG_PRINT("info", ("Thread %d is not done copying, waiting", i));
+        /* TODO: loop on a predicate (see wait_until_done() above). */
+        pthread_cond_wait(loc_threads[i].ctrl_cond, loc_threads[i].lock);
+      }
+      DBUG_PRINT("info", ("Thread %d is done copying, continueing", i));
+      pthread_mutex_unlock(loc_threads[i].lock);
+    }
+    /*
+      We need to collect the results and check if all threads were successful.
+    */
+    for (i= 0; i < num_threads && loc_threads[i].pthread_id; i++)
+    {
+      if (ha_error == 0)
+        ha_error= loc_threads[i].ha_error;
+      if (loc_threads[i].ha_error)
+      {
+        DBUG_PRINT("info", ("Copy thread %u had ha_error %u", i,
+                            loc_threads[i].ha_error));
+      }
+      if (error == 0 && !loc_threads[i].ok)
+        error= 1;
+      if (!loc_threads[i].ok)
+        DBUG_PRINT("info", ("Copy thread %u was NOT OK", i));
+      if (loc_threads[i].killed)
+        killed= TRUE;
+      DBUG_PRINT("info", ("Thread %d copied %lu rows, deleted %lu rows", i,
+                          (ulong) loc_threads[i].copied,
+                          (ulong) loc_threads[i].deleted));
+      (*copied)+= loc_threads[i].copied;
+      (*deleted)+= loc_threads[i].deleted;
+    }
+    thd->row_count= *copied;
+    destroy_alter_copy_conds(&alter_data_share, FALSE);
+    /*
+      NOTE: pthread_join is not needed because PTHREAD_CREATE_DETACHED is
+      set in mysqld.cc
+    */
+    from->db_stat= saved_from_db_stat;
+    to->db_stat= saved_to_db_stat;
+  }
+  if (ha_error)
+  {
+    DBUG_PRINT("info", ("Reporting ha_error %d", ha_error));
+    if (!to->file->is_fatal_error(ha_error, HA_CHECK_DUP_KEY))
+    {
+      uint key_nr= to->file->get_dup_key(ha_error);
+      if ((int) key_nr >= 0)
+      {
+        const char *err_msg= ER(ER_DUP_ENTRY_WITH_KEY_NAME);
+        if (key_nr == 0 &&
+            (to->key_info[0].key_part[0].field->flags &
+             AUTO_INCREMENT_FLAG))
+          err_msg= ER(ER_DUP_ENTRY_AUTOINCREMENT_CASE);
+        to->file->print_keydup_error(key_nr, err_msg);
+        DBUG_PRINT("info", ("%d was keydup error", ha_error));
+      }
+      else
+        to->file->print_error(ha_error,MYF(0));
+    }
+    else
+      to->file->print_error(ha_error,MYF(0));
+    if (!error)
+      error= 1;
+  }
+  /* All threads have completed copying/error handling, we can finish up now */
+  if (killed)
+    thd->killed= THD::KILLED_NO_VALUE;
+    //thd->killed= THD::KILL_BAD_DATA;
+  if (thd->killed)
+  {
+    DBUG_PRINT("info", ("Sending kill message"));
+    thd->send_kill_message();
+  }
+  DBUG_PRINT("info", ("copy_data_between_tables returns %d", error));
+  DBUG_RETURN(error);
+}
+/* Parallel alter module end */
 
 /*
   Recreates tables by calling mysql_alter_table().
@@ -7295,7 +9115,7 @@ bool mysql_recreate_table(THD *thd, TABL
 
   bzero((char*) &create_info, sizeof(create_info));
   create_info.row_type=ROW_TYPE_NOT_USED;
-  create_info.default_table_charset=default_charset_info;
+  create_info.default_table_charset= default_charset_info;
   /* Force alter table to recreate table */
   alter_info.flags= (ALTER_CHANGE_COLUMN | ALTER_RECREATE);
   DBUG_RETURN(mysql_alter_table(thd, NullS, NullS, &create_info,

=== modified file 'sql/table.cc'
--- a/sql/table.cc	2008-07-24 20:38:44 +0000
+++ b/sql/table.cc	2009-01-14 10:58:32 +0000
@@ -824,6 +824,7 @@ static int open_binary_frm(THD *thd, TAB
   strpos+= (strmov(keynames, (char *) strpos) - keynames)+1;
 
   share->reclength = uint2korr((head+16));
+  DBUG_PRINT("info", ("reclength from frm: %u", share->reclength));
   if (*(head+26) == 1)
     share->system= 1;				/* one-record-database */
 #ifdef HAVE_CRYPTED_FRM
@@ -2475,6 +2476,7 @@ File create_frm(THD *thd, const char *na
     tmp_key_length= (key_length < 0xffff) ? key_length : 0xffff;
     int2store(fileinfo+14,tmp_key_length);
     int2store(fileinfo+16,reclength);
+    DBUG_PRINT("info", ("stored reclength %lu in frm", reclength));
     int4store(fileinfo+18,create_info->max_rows);
     int4store(fileinfo+22,create_info->min_rows);
     /* fileinfo[26] is set in mysql_create_frm() */

=== modified file 'sql/unireg.cc'
--- a/sql/unireg.cc	2008-03-28 10:14:27 +0000
+++ b/sql/unireg.cc	2009-01-14 10:58:32 +0000
@@ -586,6 +586,7 @@ static bool pack_header(uchar *forminfo,
 
   totlength= 0L;
   reclength= data_offset;
+  DBUG_PRINT("info", ("reclength %lu from data_offset", reclength));
   no_empty=int_count=int_parts=int_length=time_stamp_pos=null_fields=
     com_length=0;
   n_length=2L;
@@ -618,6 +619,7 @@ static bool pack_header(uchar *forminfo,
     }
 
     totlength+= field->length;
+    DBUG_PRINT("info", ("totlength %lu", totlength));
     com_length+= field->comment.length;
     if (MTYP_TYPENR(field->unireg_check) == Field::NOEMPTY ||
 	field->unireg_check & MTYP_NOEMPTY_BIT)
@@ -639,6 +641,7 @@ static bool pack_header(uchar *forminfo,
     DBUG_ASSERT(reclength == field->offset + data_offset);
     if ((uint) field->offset+ (uint) data_offset+ length > reclength)
       reclength=(uint) (field->offset+ data_offset + length);
+    DBUG_PRINT("info", ("new reclength in pack_header %lu", reclength));
     n_length+= (ulong) strlen(field->field_name)+1;
     field->interval_id=0;
     field->save_interval= 0;
@@ -719,6 +722,7 @@ static bool pack_header(uchar *forminfo,
   int2store(forminfo+262,totlength);
   int2store(forminfo+264,no_empty);
   int2store(forminfo+266,reclength);
+  DBUG_PRINT("info", ("Stored reclength %lu in forminfo", reclength));
   int2store(forminfo+268,n_length);
   int2store(forminfo+270,int_count);
   int2store(forminfo+272,int_parts);
