List:Commits« Previous MessageNext Message »
From:mikael Date:February 21 2006 6:45am
Subject:bk commit into 5.1 tree (mikron:1.2149)
View as plain text  
Below is the list of changes that have just been committed into a local
5.1 repository of mikron. When mikron does a push these changes will
be propagated to the main repository and, within 24 hours after the
push, to the public repository.
For information on how to access the public repository
see http://dev.mysql.com/doc/mysql/en/installing-source-tree.html

ChangeSet
  1.2149 06/02/21 07:45:20 mikron@stripped +23 -0
  WL 2826: Error handling of ALTER TABLE for partitioning
  Introduced a new table log, can later be reused for many other
  meta data operations

  sql/unireg.cc
    1.75 06/02/21 07:45:05 mikron@stripped +1 -8
    Removed partition state from frm file

  sql/table.cc
    1.210 06/02/21 07:45:05 mikron@stripped +7 -28
    Removed partition state from frm file and made it
    able to read 5.1.6 frm files

  sql/sql_yacc.yy
    1.465 06/02/21 07:45:05 mikron@stripped +5 -66
    Removed partition state from parser

  sql/sql_table.cc
    1.309 06/02/21 07:45:04 mikron@stripped +968 -73
    A new module table log
    Updated mysql_write_frm to adapt it to table log

  sql/sql_partition.cc
    1.38 06/02/21 07:45:04 mikron@stripped +1049 -158
    Remove partition state from generate_partition_syntax
    Removed write of temporary partitions
    New error handling routines of ALTER TABLE for partitioning
    Fixed missing update of no_parts
    Fixed proper error printouts from a few routines
    Lots of new table log code
    Fixed code to use table log code and also injection of errors for
    testability of new code
    Some routines moved here from ha_partition.cc

  sql/sql_class.h
    1.285 06/02/21 07:45:04 mikron@stripped +3 -0
    Introduce new error inject variable

  sql/sql_class.cc
    1.246 06/02/21 07:45:04 mikron@stripped +3 -0
    Introduce new error inject variable

  sql/sql_base.cc
    1.304 06/02/21 07:45:03 mikron@stripped +3 -2
    Added option for abort_and_upgrade_lock to be not killable since
    killing the process in certain situations can lead to unwanted
    complications.

  sql/share/errmsg.txt
    1.85 06/02/21 07:45:03 mikron@stripped +2 -1
    Added new error message

  sql/mysqld.cc
    1.538 06/02/21 07:45:03 mikron@stripped +2 -1
    Added execute of table log in start-up phase and release of
    table log memory in exiting the process.

  sql/mysql_priv.h
    1.379 06/02/21 07:45:02 mikron@stripped +125 -3
    Added error inject support
    Added interface to table log plus some definitions for it
    Updated some interfaces to mysql_write_frm and
    abort_and_upgrade_lock

  sql/log.cc
    1.193 06/02/21 07:45:02 mikron@stripped +3 -1
    Compiler warnings

  sql/handler.h
    1.195 06/02/21 07:45:02 mikron@stripped +15 -3
    Keep track of log entry for a partition element to be able to
    inactivate the log entry after it's done
    Added list of log entries to partition info struct, also
    pointer to an frm log entry and the execute log entry
    New interface to create_handler_files

  sql/ha_partition.h
    1.12 06/02/21 07:45:02 mikron@stripped +2 -1
    New interface

  sql/ha_partition.cc
    1.32 06/02/21 07:45:01 mikron@stripped +111 -159
    Compiler warnings
    Moved code to sql server part
    Adapt create_handler_files to also support rename of handler file
    Rewrote change_partitions and rename_partitions to be in line with
    the new error handling code, both simplified it and made it more
    efficient
    Inserted calls to table log at proper places to ensure that recovery
    is consistent

  sql/ha_ndbcluster.h
    1.120 06/02/21 07:45:01 mikron@stripped +2 -1
    Adapt to new interface

  sql/ha_ndbcluster.cc
    1.275 06/02/21 07:45:01 mikron@stripped +7 -1
    Adapt to new interface

  sql/ha_myisammrg.cc
    1.81 06/02/21 07:45:01 mikron@stripped +3 -1
    Compiler warnings

  sql/ha_heap.cc
    1.83 06/02/21 07:45:00 mikron@stripped +3 -1
    Compiler warnings

  include/my_sys.h
    1.188 06/02/21 07:45:00 mikron@stripped +1 -1
    Corrected spelling error

  configure.in
    1.342 06/02/21 07:45:00 mikron@stripped +13 -0
    New error inject configure option

  BUILD/compile-pentium-debug-max
    1.18 06/02/21 07:45:00 mikron@stripped +1 -1
    Use error inject code in pentium debug max build

  BUILD/SETUP.sh
    1.54 06/02/21 07:45:00 mikron@stripped +1 -0
    New config to use error injection

# This is a BitKeeper patch.  What follows are the unified diffs for the
# set of deltas contained in the patch.  The rest of the patch, the part
# that BitKeeper cares about, is below these diffs.
# User:	mikron
# Host:	c-c50ae253.1238-1-64736c10.cust.bredbandsbolaget.se
# Root:	/Users/mikron/patch-wl2826

--- 1.341/configure.in	2006-02-17 14:53:29 +01:00
+++ 1.342/configure.in	2006-02-21 07:45:00 +01:00
@@ -663,6 +663,7 @@
   AC_MSG_RESULT([no])
 fi
 
+  
 MYSQL_SYS_LARGEFILE
 
 # Types that must be checked AFTER large file support is checked
@@ -1561,6 +1562,18 @@
 then
   OPTIMIZE_CXXFLAGS=""
   DEBUG_OPTIMIZE_CXX=""
+fi
+
+# If we should allow error injection tests
+AC_ARG_WITH(error-inject,
+    [ --with-error-inject    Enable error injection in MySQL Server],
+    [ with_error_inject=$withval ],
+    [ with_error_inject=no ])
+
+if test "$with_error_inject" = "yes"
+then
+  CFLAGS="-DERROR_INJECT_SUPPORT $CFLAGS"
+  CXXFLAGS="-DERROR_INJECT_SUPPORT $CXXFLAGS"
 fi
 
 AC_ARG_WITH(debug,

--- 1.187/include/my_sys.h	2006-01-26 04:54:37 +01:00
+++ 1.188/include/my_sys.h	2006-02-21 07:45:00 +01:00
@@ -559,7 +559,7 @@
 				 enum file_type type_of_file,
 				 uint error_message_number, myf MyFlags);
 extern File my_create(const char *FileName,int CreateFlags,
-		      int AccsesFlags, myf MyFlags);
+		      int AccessFlags, myf MyFlags);
 extern int my_close(File Filedes,myf MyFlags);
 extern File my_dup(File file, myf MyFlags);
 extern int my_mkdir(const char *dir, int Flags, myf MyFlags);

--- 1.82/sql/ha_heap.cc	2006-02-08 22:21:14 +01:00
+++ 1.83/sql/ha_heap.cc	2006-02-21 07:45:00 +01:00
@@ -58,7 +58,9 @@
   NULL,    /* Alter table flags */
   NULL,    /* Alter Tablespace */
   NULL,    /* Fill Files Table */
-  HTON_CAN_RECREATE
+  HTON_CAN_RECREATE,
+  NULL,    /* binlog func */
+  NULL     /* binlog query */ 
 };
 
 static handler *heap_create_handler(TABLE_SHARE *table)

--- 1.80/sql/ha_myisammrg.cc	2006-02-14 22:35:52 +01:00
+++ 1.81/sql/ha_myisammrg.cc	2006-02-21 07:45:01 +01:00
@@ -68,7 +68,9 @@
   NULL,    /* Alter table flags */
   NULL,    /* Alter Tablespace */
   NULL,    /* Fill Files Table */
-  HTON_CAN_RECREATE
+  HTON_CAN_RECREATE,
+  NULL,    /* binlog func */
+  NULL     /* binlog query */
 };
 
 static handler *myisammrg_create_handler(TABLE_SHARE *table)

--- 1.194/sql/handler.h	2006-02-17 17:12:18 +01:00
+++ 1.195/sql/handler.h	2006-02-21 07:45:02 +01:00
@@ -665,6 +665,7 @@
 
 #define UNDEF_NODEGROUP 65535
 class Item;
+struct st_table_log_memory_entry;
 
 class partition_element :public Sql_alloc {
 public:
@@ -674,6 +675,7 @@
   ulonglong part_min_rows;
   char *partition_name;
   char *tablespace_name;
+  st_table_log_memory_entry *log_entry;
   longlong range_value;
   char* part_comment;
   char* data_file_name;
@@ -684,7 +686,8 @@
   
   partition_element()
   : part_max_rows(0), part_min_rows(0), partition_name(NULL),
-    tablespace_name(NULL), range_value(0), part_comment(NULL),
+    tablespace_name(NULL), log_entry(0),
+    range_value(0), part_comment(NULL),
     data_file_name(NULL), index_file_name(NULL),
     engine_type(NULL),part_state(PART_NORMAL),
     nodegroup_id(UNDEF_NODEGROUP)
@@ -806,6 +809,7 @@
                                             PARTITION_ITERATOR *part_iter);
 
 
+
 class partition_info : public Sql_alloc
 {
 public:
@@ -852,7 +856,10 @@
   Item *subpart_expr;
 
   Item *item_free_list;
-  
+
+  st_table_log_memory_entry *first_log_entry;
+  st_table_log_memory_entry *exec_log_entry;
+  st_table_log_memory_entry *frm_log_entry;
   /* 
     A bitmap of partitions used by the current query. 
     Usage pattern:
@@ -964,6 +971,7 @@
     part_field_array(NULL), subpart_field_array(NULL),
     full_part_field_array(NULL),
     part_expr(NULL), subpart_expr(NULL), item_free_list(NULL),
+    first_log_entry(NULL), exec_log_entry(NULL), frm_log_entry(NULL),
     list_array(NULL),
     part_info_string(NULL),
     part_func_string(NULL), subpart_func_string(NULL),
@@ -1804,7 +1812,11 @@
   virtual void drop_table(const char *name);
   
   virtual int create(const char *name, TABLE *form, HA_CREATE_INFO *info)=0;
-  virtual int create_handler_files(const char *name) { return FALSE;}
+  virtual int create_handler_files(const char *name, const char *old_name,
+                                   bool rename_flag)
+  {
+    return FALSE;
+  }
 
   virtual int change_partitions(HA_CREATE_INFO *create_info,
                                 const char *path,

--- 1.192/sql/log.cc	2006-02-18 19:07:28 +01:00
+++ 1.193/sql/log.cc	2006-02-21 07:45:02 +01:00
@@ -102,7 +102,9 @@
   NULL,                         /* Alter table flags */
   NULL,                         /* Alter Tablespace */
   NULL,                         /* Fill FILES table */
-  HTON_NOT_USER_SELECTABLE | HTON_HIDDEN
+  HTON_NOT_USER_SELECTABLE | HTON_HIDDEN,
+  NULL,                         /* binlog func */
+  NULL                          /* binlog query */
 };
 
 

--- 1.378/sql/mysql_priv.h	2006-02-13 08:53:41 +01:00
+++ 1.379/sql/mysql_priv.h	2006-02-21 07:45:02 +01:00
@@ -603,6 +603,67 @@
 #define query_cache_invalidate_by_MyISAM_filename_ref NULL
 #endif /*HAVE_QUERY_CACHE*/
 
+/*
+  Error injector Macros to enable easy testing of recovery after failures
+  in various error cases.
+*/
+#ifndef ERROR_INJECT_SUPPORT
+
+#define ERROR_INJECT(x) 0
+#define ERROR_INJECT_ACTION(x,action) 0
+#define ERROR_INJECT_CRASH(x) 0
+#define ERROR_INJECT_VALUE(x) 0
+#define ERROR_INJECT_VALUE_ACTION(x,action) 0
+#define ERROR_INJECT_VALUE_CRASH(x) 0
+#define SET_ERROR_INJECT_VALUE(x)
+
+#else
+
+#define SET_ERROR_INJECT_VALUE(x) \
+  current_thd->error_inject_value= (x)
+
+inline bool
+my_error_inject_name(const char *dbug_str)
+{
+  const char *extra_str= "-d,";
+  char total_str[200];
+  if (_db_strict_keyword_ (dbug_str))
+  {
+    strxmov(total_str, extra_str, dbug_str, NullS);
+    DBUG_SET(total_str);
+    return 1;
+  }
+  return 0;
+}
+
+
+inline bool
+my_error_inject(int value)
+{
+  THD *thd= current_thd;
+  if (thd->error_inject_value == (uint)value)
+  {
+    thd->error_inject_value= 0;
+    return 1;
+  }
+  return 0;
+}
+
+#define ERROR_INJECT_CRASH(code) \
+  DBUG_EVALUATE_IF(code, (abort(), 0), 0)
+#define ERROR_INJECT_ACTION(code, action) \
+  (my_error_inject_name(code) ? ((action), 0) : 0)
+#define ERROR_INJECT(code) \
+  my_error_inject_name(code)
+#define ERROR_INJECT_VALUE(value) \
+  my_error_inject(value)
+#define ERROR_INJECT_VALUE_ACTION(value,action) \
+  (my_error_inject(value) ? (action) : 0)
+#define ERROR_INJECT_VALUE_CRASH(value) \
+  (my_error_inject(value) ? abort() : 0)
+
+#endif
+
 uint build_table_path(char *buff, size_t bufflen, const char *db,
                       const char *table, const char *ext);
 void write_bin_log(THD *thd, bool clear_error,
@@ -1078,6 +1139,16 @@
 bool remove_table_from_cache(THD *thd, const char *db, const char *table,
                              uint flags);
 
+#define NORMAL_PART_NAME 0
+#define TEMP_PART_NAME 1
+#define RENAMED_PART_NAME 2
+void create_partition_name(char *out, const char *in1,
+                           const char *in2, uint name_variant,
+                           bool translate);
+void create_subpartition_name(char *out, const char *in1,
+                              const char *in2, const char *in3,
+                              uint name_variant);
+
 typedef struct st_lock_param_type
 {
   ulonglong copied;
@@ -1097,14 +1168,62 @@
   uint key_count;
   uint db_options;
   uint pack_frm_len;
+  partition_info *part_info;
 } ALTER_PARTITION_PARAM_TYPE;
 
 void mem_alloc_error(size_t size);
-#define WFRM_INITIAL_WRITE 1
-#define WFRM_CREATE_HANDLER_FILES 2
+
+typedef struct st_table_log_entry
+{
+  const char *name;
+  const char *from_name;
+  const char *handler_type;
+  uint next_entry;
+  uint entry_pos;
+  char action_type;
+  char entry_type;
+  char phase;
+  char not_used;
+} TABLE_LOG_ENTRY;
+
+typedef struct st_table_log_memory_entry
+{
+  uint entry_pos;
+  struct st_table_log_memory_entry *next_log_entry;
+  struct st_table_log_memory_entry *prev_log_entry;
+  struct st_table_log_memory_entry *next_active_log_entry;
+} TABLE_LOG_MEMORY_ENTRY;
+
+#define TLOG_EXECUTE_CODE 'e'
+#define TLOG_LOG_ENTRY_CODE 'l'
+#define TLOG_IGNORE_LOG_ENTRY_CODE 'i'
+
+#define TLOG_DELETE_ACTION_CODE 'd'
+#define TLOG_RENAME_ACTION_CODE 'r'
+#define TLOG_REPLACE_ACTION_CODE 's'
+
+#define TLOG_HANDLER_TYPE_LEN 32
+
+bool write_table_log_entry(TABLE_LOG_ENTRY *table_log_entry,
+                           TABLE_LOG_MEMORY_ENTRY **active_entry);
+bool write_execute_table_log_entry(uint first_entry,
+                                   bool complete,
+                                   TABLE_LOG_MEMORY_ENTRY **active_entry);
+bool inactivate_table_log_entry(uint entry_no);
+void release_table_log_memory_entry(TABLE_LOG_MEMORY_ENTRY *log_entry);
+bool sync_table_log();
+void release_table_log();
+void execute_table_log_recovery();
+bool execute_table_log_entry(uint first_entry);
+void lock_global_table_log();
+void unlock_global_table_log();
+
+#define WFRM_WRITE_SHADOW 1
+#define WFRM_INSTALL_SHADOW 2
 #define WFRM_PACK_FRM 4
 bool mysql_write_frm(ALTER_PARTITION_PARAM_TYPE *lpt, uint flags);
-bool abort_and_upgrade_lock(ALTER_PARTITION_PARAM_TYPE *lpt);
+bool abort_and_upgrade_lock(ALTER_PARTITION_PARAM_TYPE *lpt,
+                            bool can_be_killed);
 void close_open_tables_and_downgrade(ALTER_PARTITION_PARAM_TYPE *lpt);
 void mysql_wait_completed_table(ALTER_PARTITION_PARAM_TYPE *lpt, TABLE *my_table);
 
@@ -1275,6 +1394,9 @@
 extern ulong delayed_insert_limit, delayed_queue_size;
 extern ulong delayed_insert_threads, delayed_insert_writes;
 extern ulong delayed_rows_in_use,delayed_insert_errors;
+#ifdef ERROR_INJECT_SUPPORT
+extern ulong error_inject_value;
+#endif
 extern ulong slave_open_temp_tables;
 extern ulong query_cache_size, query_cache_min_res_unit;
 extern ulong slow_launch_threads, slow_launch_time;

--- 1.537/sql/mysqld.cc	2006-02-18 23:37:51 +01:00
+++ 1.538/sql/mysqld.cc	2006-02-21 07:45:03 +01:00
@@ -3697,7 +3697,7 @@
       unireg_abort(1);
     }
   }
-
+  execute_table_log_recovery();
   init_events();
 
   create_shutdown_thread();
@@ -3729,6 +3729,7 @@
   /* (void) pthread_attr_destroy(&connection_attrib); */
   
   DBUG_PRINT("quit",("Exiting main thread"));
+  release_table_log();
 
 #ifndef __WIN__
 #ifdef EXTRA_DEBUG2

--- 1.303/sql/sql_base.cc	2006-02-13 20:34:30 +01:00
+++ 1.304/sql/sql_base.cc	2006-02-21 07:45:03 +01:00
@@ -6145,7 +6145,8 @@
     old_lock_level                Old lock level
 */
 
-bool abort_and_upgrade_lock(ALTER_PARTITION_PARAM_TYPE *lpt)
+bool abort_and_upgrade_lock(ALTER_PARTITION_PARAM_TYPE *lpt,
+                            bool can_be_killed)
 {
   uint flags= RTFC_WAIT_OTHER_THREAD_FLAG | RTFC_CHECK_KILLED_FLAG;
   int error= FALSE;
@@ -6155,7 +6156,7 @@
   VOID(pthread_mutex_lock(&LOCK_open));
   mysql_lock_abort(lpt->thd, lpt->table, TRUE);
   VOID(remove_table_from_cache(lpt->thd, lpt->db, lpt->table_name, flags));
-  if (lpt->thd->killed)
+  if (can_be_killed && lpt->thd->killed)
   {
     lpt->thd->no_warnings_for_error= 0;
     error= TRUE;

--- 1.245/sql/sql_class.cc	2006-02-14 06:41:15 +01:00
+++ 1.246/sql/sql_class.cc	2006-02-21 07:45:04 +01:00
@@ -222,6 +222,9 @@
   cuted_fields= sent_row_count= 0L;
   limit_found_rows= 0;
   statement_id_counter= 0UL;
+#ifdef ERROR_INJECT_SUPPORT
+  error_inject_value= 0UL;
+#endif
   // Must be reset to handle error with THD's created for init of mysqld
   lex->current_select= 0;
   start_time=(time_t) 0;

--- 1.284/sql/sql_class.h	2006-02-14 06:41:19 +01:00
+++ 1.285/sql/sql_class.h	2006-02-21 07:45:04 +01:00
@@ -1099,6 +1099,9 @@
   query_id_t query_id, warn_id;
   ulong      thread_id, col_access;
 
+#ifdef ERROR_INJECT_SUPPORT
+  ulong      error_inject_value;
+#endif
   /* Statement id is thread-wide. This counter is used to generate ids */
   ulong      statement_id_counter;
   ulong	     rand_saved_seed1, rand_saved_seed2;

--- 1.308/sql/sql_table.cc	2006-02-17 19:55:38 +01:00
+++ 1.309/sql/sql_table.cc	2006-02-21 07:45:04 +01:00
@@ -241,6 +241,886 @@
   DBUG_RETURN(FALSE);
 }
 
+/*
+--------------------------------------------------------------------------
+
+   MODULE: Table log
+   -----------------
+
+   This module is used to ensure that we can recover from crashes that occur
+   in the middle of a meta-data operation in MySQL. E.g. DROP TABLE t1, t2;
+   We need to ensure that both t1 and t2 are dropped and not only t1 and
+   also that each table drop is entirely done and not "half-baked".
+
+   To support this we create log entries for each meta-data statement in the
+   table log while we are executing. These entries are dropped when the
+   operation is completed.
+
+   At recovery those entries that were not completed will be executed.
+
+   There is only one table log in the system and it is protected by a mutex
+   and there is a global struct that contains information about its current
+   state.
+
+   History:
+   First version written in 2006 by Mikael Ronstrom
+--------------------------------------------------------------------------
+*/
+
+
+typedef struct st_global_table_log
+{
+  char file_entry[IO_SIZE];
+  char file_name_str[FN_REFLEN];
+  char *file_name;
+  TABLE_LOG_MEMORY_ENTRY *first_free;
+  TABLE_LOG_MEMORY_ENTRY *first_used;
+  uint no_entries;
+  File file_id;
+  uint name_len;
+  uint handler_type_len;
+  uint io_size;
+} GLOBAL_TABLE_LOG;
+
+GLOBAL_TABLE_LOG global_table_log;
+
+pthread_mutex_t LOCK_gtl;
+
+#define TLOG_ENTRY_TYPE_POS 0
+#define TLOG_ACTION_TYPE_POS 1
+#define TLOG_PHASE_POS 2
+#define TLOG_NEXT_ENTRY_POS 4
+#define TLOG_NAME_POS 8
+
+#define TLOG_NO_ENTRY_POS 0
+#define TLOG_NAME_LEN_POS 4
+#define TLOG_HANDLER_TYPE_POS 8
+#define TLOG_IO_SIZE_POS 12
+
+/*
+  Read one entry from table log file
+  SYNOPSIS
+    read_table_log_file_entry()
+    entry_no                     Entry number to read
+  RETURN VALUES
+    TRUE                         Error
+    FALSE                        Success
+*/
+
+static
+bool
+read_table_log_file_entry(uint entry_no)
+{
+  bool error= FALSE;
+  File file_id= global_table_log.file_id;
+  char *file_entry= (char*)global_table_log.file_entry;
+  uint io_size= global_table_log.io_size;
+  DBUG_ENTER("read_table_log_file_entry");
+
+  if (my_pread(file_id, file_entry, io_size, io_size * entry_no,
+               MYF(MY_WME)) != IO_SIZE)
+    error= TRUE;
+  DBUG_RETURN(error);
+}
+
+
+/*
+  Write one entry from table log file
+  SYNOPSIS
+    write_table_log_file_entry()
+    entry_no                     Entry number to read
+  RETURN VALUES
+    TRUE                         Error
+    FALSE                        Success
+*/
+
+static
+bool
+write_table_log_file_entry(uint entry_no)
+{
+  bool error= FALSE;
+  File file_id= global_table_log.file_id;
+  char *file_entry= (char*)global_table_log.file_entry;
+  DBUG_ENTER("write_table_log_file_entry");
+
+  if (my_pwrite(file_id, file_entry,
+                IO_SIZE, IO_SIZE * entry_no, MYF(MY_WME)) != IO_SIZE)
+    error= TRUE;
+  DBUG_RETURN(error);
+}
+
+
+/*
+  Write table log header
+  SYNOPSIS
+    write_table_log_header()
+  RETURN VALUES
+    TRUE                      Error
+    FALSE                     Success
+*/
+
+static
+bool
+write_table_log_header()
+{
+  uint16 const_var;
+  bool error= FALSE;
+  DBUG_ENTER("write_table_log_header");
+
+  int4store(&global_table_log.file_entry[TLOG_NO_ENTRY_POS],
+            global_table_log.no_entries);
+  const_var= FN_LEN;
+  int4store(&global_table_log.file_entry[TLOG_NAME_LEN_POS],
+            const_var);
+  const_var= TLOG_HANDLER_TYPE_LEN;
+  int4store(&global_table_log.file_entry[TLOG_HANDLER_TYPE_POS],
+            const_var);
+  const_var= IO_SIZE;
+  int4store(&global_table_log.file_entry[TLOG_IO_SIZE_POS],
+            const_var);
+  if (write_table_log_file_entry(0UL))
+    error= TRUE;
+  if (!error)
+    VOID(sync_table_log());
+  DBUG_RETURN(error);
+}
+
+
+/*
+  Create table log file name
+  SYNOPSIS
+    create_table_log_file_name()
+    file_name                   Filename setup
+  RETURN VALUES
+    NONE
+*/
+
+static
+void
+create_table_log_file_name(char *file_name)
+{
+  strxmov(file_name, mysql_data_home, "/", "table_log.log", NullS);
+}
+
+
+/*
+  Read header of table log file
+  SYNOPSIS
+    read_table_log_header()
+  RETURN VALUES
+    > 0                  Last entry in table log
+    0                    No entries in table log
+  DESCRIPTION
+    When we read the table log header we get information about maximum sizes
+    of names in the table log and we also get information about the number
+    of entries in the table log.
+*/
+
+static
+uint
+read_table_log_header()
+{
+  char *file_entry= (char*)global_table_log.file_entry;
+  char file_name[FN_REFLEN];
+  uint entry_no;
+  bool successful_open= FALSE;
+  DBUG_ENTER("read_table_log_header");
+
+  bzero(file_entry, sizeof(global_table_log.file_entry));
+  create_table_log_file_name(file_name);
+  if (!(my_open(file_name, O_RDWR | O_TRUNC | O_BINARY, MYF(MY_WME))))
+  {
+    if (read_table_log_file_entry(0UL))
+    {
+      ; /* Write message into error log */
+    }
+    else
+      successful_open= TRUE;
+  }
+  entry_no= uint4korr(&file_entry[TLOG_NO_ENTRY_POS]);
+  global_table_log.name_len= uint4korr(&file_entry[TLOG_NAME_LEN_POS]);
+  global_table_log.handler_type_len=
+        uint4korr(&file_entry[TLOG_HANDLER_TYPE_POS]);
+  if (successful_open) 
+    global_table_log.io_size= uint4korr(&file_entry[TLOG_IO_SIZE_POS]);
+  else
+    global_table_log.io_size= IO_SIZE;
+  global_table_log.first_free= NULL;
+  global_table_log.first_used= NULL;
+  global_table_log.no_entries= 0;
+  VOID(pthread_mutex_init(&LOCK_gtl, MY_MUTEX_INIT_FAST));
+  DBUG_RETURN(entry_no);
+}
+
+
+/*
+  Read a table log entry
+  SYNOPSIS
+    read_table_log_entry()
+    read_entry               Number of entry to read
+    out:entry_info           Information from entry
+  RETURN VALUES
+    TRUE                     Error
+    FALSE                    Success
+  DESCRIPTION
+    Read a specified entry in the table log
+*/
+
+bool
+read_table_log_entry(uint read_entry, TABLE_LOG_ENTRY *table_log_entry)
+{
+  char *file_entry= (char*)&global_table_log.file_entry;
+  uint inx;
+  DBUG_ENTER("read_table_log_entry");
+
+  if (read_table_log_file_entry(read_entry))
+  {
+    /* Error handling */
+    DBUG_RETURN(TRUE);
+  }
+  table_log_entry->entry_pos= read_entry;
+  table_log_entry->entry_type= file_entry[TLOG_ENTRY_TYPE_POS];
+  table_log_entry->action_type= file_entry[TLOG_ACTION_TYPE_POS];
+  table_log_entry->phase= file_entry[TLOG_PHASE_POS];
+  table_log_entry->next_entry= uint4korr(&file_entry[TLOG_NEXT_ENTRY_POS]);
+  table_log_entry->name= &file_entry[TLOG_NAME_POS];
+  inx= TLOG_NAME_POS + global_table_log.name_len;
+  table_log_entry->from_name= &file_entry[inx];
+  inx+= global_table_log.name_len;
+  table_log_entry->handler_type= &file_entry[inx];
+  DBUG_RETURN(FALSE);
+}
+
+
+/*
+  Initialise table log
+  SYNOPSIS
+    init_table_log()
+  RETURN VALUES
+    TRUE                     Error
+    FALSE                    Success
+  DESCRIPTION
+    Write the header of the table log file and length of names. Also set
+    number of entries to zero.
+*/
+
+static
+bool
+init_table_log()
+{
+  bool error= FALSE;
+  char file_name[FN_REFLEN];
+  DBUG_ENTER("init_table_log");
+
+  global_table_log.io_size= IO_SIZE;
+  create_table_log_file_name(file_name);
+  VOID(my_delete(file_name, MYF(0)));
+  if ((global_table_log.file_id= my_create(file_name,
+                                           CREATE_MODE,
+                                           O_RDWR | O_TRUNC | O_BINARY,
+                                           MYF(MY_WME))) < 0)
+  {
+    /* Couldn't create table log file, this is serious error */
+    abort();
+  }
+  if (write_table_log_header())
+  {
+    /* Write to error log */
+    error= TRUE;
+  }
+  DBUG_RETURN(error);
+}
+
+
+/*
+  Execute one action in a table log entry
+  SYNOPSIS
+    execute_table_log_action()
+    table_log_entry              Information in action entry to execute
+  RETURN VALUES
+    TRUE                       Error
+    FALSE                      Success
+*/
+
+static
+bool
+execute_table_log_action(TABLE_LOG_ENTRY *table_log_entry)
+{
+  bool frm_action= FALSE;
+  LEX_STRING handler_name;
+  handler *file;
+  MEM_ROOT mem_root;
+  bool error= TRUE;
+  char path[FN_REFLEN];
+  char from_path[FN_REFLEN];
+  char *par_ext= (char*)".par";
+  handlerton *hton;
+  DBUG_ENTER("execute_table_log_action");
+
+  if (table_log_entry->entry_type == TLOG_IGNORE_LOG_ENTRY_CODE)
+  {
+    DBUG_RETURN(FALSE);
+  }
+  handler_name.str= (char*)table_log_entry->handler_type;
+  handler_name.length= strlen(table_log_entry->handler_type);
+  hton= ha_resolve_by_name(current_thd, &handler_name);
+  if (!hton)
+  {
+    my_error(ER_ILLEGAL_HA, MYF(0), table_log_entry->handler_type);
+    DBUG_RETURN(TRUE);
+  }
+  init_sql_alloc(&mem_root, TABLE_ALLOC_BLOCK_SIZE, 0); 
+  if (strcmp("frm", table_log_entry->handler_type))
+    frm_action= TRUE;
+  else
+  {
+    TABLE_SHARE dummy;
+    bzero(&dummy, sizeof(TABLE_SHARE));
+    file= get_new_handler(&dummy, &mem_root, hton);
+    if (!file)
+    {
+      mem_alloc_error(sizeof(handler));
+      goto error;
+    }
+  }
+  switch (table_log_entry->action_type)
+  {
+    case TLOG_DELETE_ACTION_CODE:
+    case TLOG_REPLACE_ACTION_CODE:
+    {
+      if (table_log_entry->action_type == TLOG_DELETE_ACTION_CODE ||
+          (table_log_entry->action_type == TLOG_REPLACE_ACTION_CODE &&
+           table_log_entry->phase == 0UL))
+      {
+        if (frm_action)
+        {
+          strxmov(path, table_log_entry->name, reg_ext, NullS);
+          if (my_delete(path, MYF(MY_WME)))
+            break;
+          strxmov(path, table_log_entry->name, par_ext, NullS);
+          if (my_delete(path, MYF(MY_WME)))
+            break;
+        }
+        else
+        {
+          if (file->delete_table(table_log_entry->name))
+            break;
+        }
+        if ((!inactivate_table_log_entry(table_log_entry->entry_pos)))
+          ;
+        else
+        {
+          VOID(sync_table_log());
+          error= FALSE;
+        }
+        break;
+      }
+      if (table_log_entry->action_type == TLOG_DELETE_ACTION_CODE)
+        break;
+    }
+    case TLOG_RENAME_ACTION_CODE:
+    {
+      error= TRUE;
+      if (frm_action)
+      {
+        strxmov(path, table_log_entry->name, reg_ext, NullS);
+        strxmov(from_path, table_log_entry->from_name, reg_ext, NullS);
+        if (my_rename(path, from_path, MYF(MY_WME)))
+          break;
+        strxmov(path, table_log_entry->name, par_ext, NullS);
+        strxmov(from_path, table_log_entry->from_name, par_ext, NullS);
+        if (my_rename(path, from_path, MYF(MY_WME)))
+          break;
+      }
+      else
+      {
+        if (file->rename_table(table_log_entry->name,
+                               table_log_entry->from_name))
+          break;
+        if ((!inactivate_table_log_entry(table_log_entry->entry_pos)))
+          ;
+        else
+        {
+          VOID(sync_table_log());
+          error= FALSE;
+        }
+      }
+      break;
+    }
+    default:
+      DBUG_ASSERT(0);
+      break;
+  }
+  delete file;
+error:
+  free_root(&mem_root, MYF(0)); 
+  DBUG_RETURN(error);
+}
+
+
+/*
+  Get a free entry in the table log
+  SYNOPSIS
+    get_free_table_log_entry()
+    out:active_entry                A table log memory entry returned
+  RETURN VALUES
+    TRUE                       Error
+    FALSE                      Success
+*/
+
+static
+bool
+get_free_table_log_entry(TABLE_LOG_MEMORY_ENTRY **active_entry,
+                         bool *write_header)
+{
+  uint entry_no;
+  TABLE_LOG_MEMORY_ENTRY *used_entry;
+  TABLE_LOG_MEMORY_ENTRY *first_used= global_table_log.first_used;
+  DBUG_ENTER("get_free_table_log_entry");
+
+  if (global_table_log.first_free == NULL)
+  {
+    if (!(used_entry= (TABLE_LOG_MEMORY_ENTRY*)my_malloc(
+                              sizeof(TABLE_LOG_MEMORY_ENTRY), MYF(MY_WME))))
+    {
+      DBUG_RETURN(TRUE);
+    }
+    global_table_log.no_entries++;
+    used_entry->entry_pos= entry_no= global_table_log.no_entries;
+    *write_header= TRUE;
+  }
+  else
+  {
+    used_entry= global_table_log.first_free;
+    global_table_log.first_free= used_entry->next_log_entry;
+    entry_no= used_entry->entry_pos;
+    *write_header= FALSE;
+  }
+  /*
+    Move from free list to used list
+  */
+  used_entry->next_log_entry= first_used;
+  used_entry->prev_log_entry= NULL;
+  global_table_log.first_used= used_entry;
+  if (first_used)
+    first_used->prev_log_entry= used_entry;
+
+  *active_entry= used_entry;
+  DBUG_RETURN(FALSE);
+}
+
+
+/*
+  External interface methods for the Table log Module
+  ---------------------------------------------------
+*/
+
+/*
+  SYNOPSIS
+    write_table_log_entry()
+    table_log_entry         Information about log entry
+    out:entry_written       Entry information written into   
+
+  RETURN VALUES
+    TRUE                      Error
+    FALSE                     Success
+
+  DESCRIPTION
+    A careful write of the table log is performed to ensure that we can
+    handle crashes occurring during CREATE and ALTER TABLE processing.
+*/
+
+bool
+write_table_log_entry(TABLE_LOG_ENTRY *table_log_entry,
+                      TABLE_LOG_MEMORY_ENTRY **active_entry)
+{
+  bool error, write_header;
+  DBUG_ENTER("write_table_log_entry");
+
+  global_table_log.file_entry[TLOG_ENTRY_TYPE_POS]= TLOG_LOG_ENTRY_CODE;
+  global_table_log.file_entry[TLOG_ACTION_TYPE_POS]=
+                                    table_log_entry->action_type;
+  global_table_log.file_entry[TLOG_PHASE_POS]= 0;
+  int4store(&global_table_log.file_entry[TLOG_NEXT_ENTRY_POS],
+            table_log_entry->next_entry);
+  DBUG_ASSERT(strlen(table_log_entry->name) < FN_LEN);
+  strncpy(&global_table_log.file_entry[TLOG_NAME_POS],
+          table_log_entry->name, FN_LEN);
+  if (table_log_entry->action_type == TLOG_RENAME_ACTION_CODE ||
+      table_log_entry->action_type == TLOG_REPLACE_ACTION_CODE)
+  {
+    DBUG_ASSERT(strlen(table_log_entry->from_name) < FN_LEN);
+    strncpy(&global_table_log.file_entry[TLOG_NAME_POS + FN_LEN],
+          table_log_entry->from_name, FN_LEN);
+  }
+  else
+    global_table_log.file_entry[TLOG_NAME_POS + FN_LEN]= 0;
+  DBUG_ASSERT(strlen(table_log_entry->handler_type) < FN_LEN);
+  strncpy(&global_table_log.file_entry[TLOG_NAME_POS + (2*FN_LEN)],
+          table_log_entry->handler_type, FN_LEN);
+  if (get_free_table_log_entry(active_entry, &write_header))
+  {
+    DBUG_RETURN(TRUE);
+  }
+  error= FALSE;
+  if (write_table_log_file_entry((*active_entry)->entry_pos))
+    error= TRUE;
+  if (write_header && !error)
+  {
+    VOID(sync_table_log());
+    if (write_table_log_header())
+      error= TRUE;
+  }
+  if (error)
+    release_table_log_memory_entry(*active_entry);
+  DBUG_RETURN(error);
+}
+
+
+/*
+  Write final entry in the table log
+  SYNOPSIS
+    write_execute_table_log_entry()
+    first_entry                    First entry in linked list of entries
+                                   to execute, if 0 = NULL it means that
+                                   the entry is removed and the entries
+                                   are put into the free list.
+    in:out:exec_entry              Entry to execute, 0 = NULL if the entry
+                                   is written first time and needs to be
+                                   returned. In this case the entry written
+                                   is returned in this parameter
+  RETURN VALUES
+    TRUE                           Error
+    FALSE                          Success
+
+  DESCRIPTION
+    This is the last write in the table log. The previous log entries have
+    already been written but not yet synched to disk.
+*/ 
+
+bool
+write_execute_table_log_entry(uint first_entry,
+                              bool complete,
+                              TABLE_LOG_MEMORY_ENTRY **active_entry)
+{
+  bool write_header= FALSE;
+  char *file_entry= (char*)global_table_log.file_entry;
+  DBUG_ENTER("write_execute_table_log_entry");
+
+  if (!complete)
+  {
+    VOID(sync_table_log());
+    file_entry[TLOG_ENTRY_TYPE_POS]= TLOG_EXECUTE_CODE;
+  }
+  else
+    file_entry[TLOG_ENTRY_TYPE_POS]= TLOG_IGNORE_LOG_ENTRY_CODE;
+  file_entry[TLOG_ACTION_TYPE_POS]= 0; /* Ignored for execute entries */
+  file_entry[TLOG_PHASE_POS]= 0;
+  int4store(&file_entry[TLOG_NEXT_ENTRY_POS], first_entry);
+  file_entry[TLOG_NAME_POS]= 0;
+  file_entry[TLOG_NAME_POS + FN_LEN]= 0;
+  file_entry[TLOG_NAME_POS + 2*FN_LEN]= 0;
+  if (!(*active_entry))
+  {
+    if (get_free_table_log_entry(active_entry, &write_header))
+    {
+      DBUG_RETURN(TRUE);
+    }
+  }
+  if (write_table_log_file_entry((*active_entry)->entry_pos))
+  {
+    release_table_log_memory_entry(*active_entry);
+    DBUG_RETURN(TRUE);
+  }
+  VOID(sync_table_log());
+  if (write_header)
+  {
+    if (write_table_log_header())
+    {
+      release_table_log_memory_entry(*active_entry);
+      DBUG_RETURN(TRUE);
+    }
+  }
+  DBUG_RETURN(FALSE);
+}
+
+
+/*
+  For complex rename operations we need to inactivate individual entries.
+  SYNOPSIS
+    inactivate_table_log_entry()
+    entry_no                      Entry position of record to change
+  RETURN VALUES
+    TRUE                         Error
+    FALSE                        Success
+  DESCRIPTION
+    During replace operations where we start with an existing table called
+    t1 and a replacement table called t1#temp or something else and where
+    we want to delete t1 and rename t1#temp to t1 this is not possible to
+    do in a safe manner unless the table log is informed of the phases in
+    the change.
+
+    Delete actions are 1-phase actions that can be ignored immediately after
+    being executed.
+    Rename actions from x to y is also a 1-phase action since there is no
+    interaction with any other handlers named x and y.
+    Replace action where drop y and x -> y happens needs to be a two-phase
+    action. Thus the first phase will drop y and the second phase will
+    rename x -> y.
+*/
+
+bool
+inactivate_table_log_entry(uint entry_no)
+{
+  bool error= TRUE;
+  char *file_entry= (char*)global_table_log.file_entry;
+  DBUG_ENTER("inactivate_table_log_entry");
+
+  if (!read_table_log_file_entry(entry_no))
+  {
+    if (file_entry[TLOG_ENTRY_TYPE_POS] == TLOG_LOG_ENTRY_CODE)
+    {
+      if (file_entry[TLOG_ACTION_TYPE_POS] == TLOG_DELETE_ACTION_CODE ||
+          file_entry[TLOG_ACTION_TYPE_POS] == TLOG_RENAME_ACTION_CODE ||
+          (file_entry[TLOG_ACTION_TYPE_POS] == TLOG_REPLACE_ACTION_CODE &&
+           file_entry[TLOG_PHASE_POS] == 1))
+        file_entry[TLOG_ENTRY_TYPE_POS]= TLOG_IGNORE_LOG_ENTRY_CODE;
+      else if (file_entry[TLOG_ACTION_TYPE_POS] == TLOG_REPLACE_ACTION_CODE)
+      {
+        DBUG_ASSERT(file_entry[TLOG_PHASE_POS] == 0);
+        file_entry[TLOG_PHASE_POS]= 1;
+      }
+      else
+      {
+        DBUG_ASSERT(0);
+      }
+      if (!write_table_log_file_entry(entry_no))
+        error= FALSE;
+    }
+  }
+  DBUG_RETURN(error);
+}
+
+
+/*
+  Sync table log file
+  SYNOPSIS
+    sync_table_log()
+  RETURN VALUES
+    TRUE                      Error
+    FALSE                     Success
+*/
+
+bool
+sync_table_log()
+{
+  bool error= FALSE;
+  DBUG_ENTER("sync_table_log");
+
+  if (my_sync(global_table_log.file_id, MYF(0)))
+  {
+    /* Write to error log */
+    error= TRUE;
+  }
+  DBUG_RETURN(error);
+}
+
+
+/*
+  Release a log memory entry
+  SYNOPSIS
+    release_table_log_memory_entry()
+    log_memory_entry                Log memory entry to release
+  RETURN VALUES
+    NONE
+*/
+
+void
+release_table_log_memory_entry(TABLE_LOG_MEMORY_ENTRY *log_entry)
+{
+  TABLE_LOG_MEMORY_ENTRY *first_free= global_table_log.first_free;
+  TABLE_LOG_MEMORY_ENTRY *next_log_entry= log_entry->next_log_entry;
+  TABLE_LOG_MEMORY_ENTRY *prev_log_entry= log_entry->prev_log_entry;
+  DBUG_ENTER("release_table_log_memory_entry");
+
+  global_table_log.first_free= log_entry;
+  log_entry->next_log_entry= first_free;
+
+  if (prev_log_entry)
+    prev_log_entry->next_log_entry= next_log_entry;
+  else
+    global_table_log.first_used= next_log_entry;
+  if (next_log_entry)
+    next_log_entry->prev_log_entry= prev_log_entry;
+  DBUG_VOID_RETURN;
+}
+
+
+/*
+  Execute one entry in the table log. Executing an entry means executing
+  a linked list of actions.
+  SYNOPSIS
+    execute_table_log_entry()
+    first_entry                Reference to first action in entry
+  RETURN VALUES
+    TRUE                       Error
+    FALSE                      Success
+*/
+
+bool
+execute_table_log_entry(uint first_entry)
+{
+  TABLE_LOG_ENTRY table_log_entry;
+  uint read_entry= first_entry;
+  DBUG_ENTER("execute_table_log_entry");
+
+  lock_global_table_log();
+  do
+  {
+    if (read_table_log_entry(read_entry, &table_log_entry))
+    {
+      DBUG_ASSERT(0);
+      /* Write to error log and continue with next log entry */
+      break;
+    }
+    DBUG_ASSERT(table_log_entry.entry_type == TLOG_LOG_ENTRY_CODE ||
+                table_log_entry.entry_type == TLOG_IGNORE_LOG_ENTRY_CODE);
+
+    if (execute_table_log_action(&table_log_entry))
+    {
+      DBUG_ASSERT(0);
+      /* Write to error log and continue with next log entry */
+      break;
+    }
+    read_entry= table_log_entry.next_entry;
+  } while (read_entry);
+  unlock_global_table_log();
+  DBUG_RETURN(FALSE);
+}
+
+
+/*
+  Execute the table log at recovery of MySQL Server
+  SYNOPSIS
+    execute_table_log_recovery()
+  RETURN VALUES
+    NONE
+*/
+
+void
+execute_table_log_recovery()
+{
+  uint no_entries, i;
+  TABLE_LOG_ENTRY table_log_entry;
+  DBUG_ENTER("execute_table_log_recovery");
+
+  no_entries= read_table_log_header();
+  for (i= 0; i < no_entries; i++)
+  {
+    if (read_table_log_entry(i, &table_log_entry))
+    {
+      DBUG_ASSERT(0);
+      /* Write to error log */
+      break;
+    }
+    if (table_log_entry.entry_type == TLOG_EXECUTE_CODE)
+    {
+      if (execute_table_log_entry(table_log_entry.next_entry))
+      {
+        /*
+           Currently errors are either crashing or ignored so we should
+           never end up here
+        */
+        abort();
+        DBUG_VOID_RETURN;
+      }
+    }
+  }
+  VOID(init_table_log());
+  DBUG_VOID_RETURN;
+}
+
+
+/*
+  Release all memory allocated to the table log
+  SYNOPSIS
+    release_table_log()
+  RETURN VALUES
+    NONE
+*/
+
+void
+release_table_log()
+{
+  TABLE_LOG_MEMORY_ENTRY *free_list= global_table_log.first_free;
+  TABLE_LOG_MEMORY_ENTRY *used_list= global_table_log.first_used;
+  DBUG_ENTER("release_table_log");
+
+  lock_global_table_log();
+  while (used_list)
+  {
+    TABLE_LOG_MEMORY_ENTRY *tmp= used_list->next_log_entry;
+    my_free((char*)used_list, MYF(0));
+    used_list= tmp;
+  }
+  while (free_list)
+  {
+    TABLE_LOG_MEMORY_ENTRY *tmp= free_list->next_log_entry;
+    my_free((char*)free_list, MYF(0));
+    free_list= tmp;
+  }
+  VOID(my_close(global_table_log.file_id, MYF(0)));
+  unlock_global_table_log();
+  VOID(pthread_mutex_destroy(&LOCK_gtl));
+  DBUG_VOID_RETURN;
+}
+
+
+/*
+  Lock mutex for global table log
+  SYNOPSIS
+    lock_global_table_log()
+  RETURN VALUES
+    NONE
+*/
+
+void
+lock_global_table_log()
+{
+  DBUG_ENTER("lock_global_table_log");
+
+  VOID(pthread_mutex_lock(&LOCK_gtl));
+  DBUG_VOID_RETURN;
+}
+
+
+/*
+  Unlock mutex for global table log
+  SYNOPSIS
+    unlock_global_table_log()
+  RETURN VALUES
+    NONE
+*/
+
+void
+unlock_global_table_log()
+{
+  DBUG_ENTER("unlock_global_table_log");
+
+  VOID(pthread_mutex_unlock(&LOCK_gtl));
+  DBUG_VOID_RETURN;
+}
+
+
+/*
+---------------------------------------------------------------------------
+
+  END MODULE Table log
+  --------------------
+
+---------------------------------------------------------------------------
+*/
+
 
 /*
   SYNOPSIS
@@ -274,83 +1154,66 @@
   */
   int error= 0;
   char path[FN_REFLEN+1];
+  char shadow_path[FN_REFLEN+1];
+  char shadow_frm_name[FN_REFLEN+1];
   char frm_name[FN_REFLEN+1];
   DBUG_ENTER("mysql_write_frm");
 
-  if (flags & WFRM_INITIAL_WRITE)
-  {
-    error= mysql_copy_create_list(lpt->create_list,
-                                  &lpt->new_create_list);
-    error+= mysql_copy_key_list(lpt->key_list,
-                                &lpt->new_key_list);
-    if (error)
+  /*
+    Build shadow frm file name
+  */
+  build_table_filename(shadow_path, sizeof(shadow_path), lpt->db,
+                       lpt->table_name, "#");
+  strxmov(shadow_frm_name, shadow_path, reg_ext, NullS);
+  if (flags & WFRM_WRITE_SHADOW)
+  {
+    if (mysql_copy_create_list(lpt->create_list,
+                               &lpt->new_create_list) ||
+        mysql_copy_key_list(lpt->key_list,
+                            &lpt->new_key_list) ||
+        mysql_prepare_table(lpt->thd, lpt->create_info,
+                            &lpt->new_create_list,
+                            &lpt->new_key_list,
+                            /*tmp_table*/ 1,
+                            &lpt->db_options,
+                            lpt->table->file,
+                            &lpt->key_info_buffer,
+                            &lpt->key_count,
+                            /*select_field_count*/ 0))
     {
       DBUG_RETURN(TRUE);
     }
-  }
-  build_table_filename(path, sizeof(path), lpt->db, lpt->table_name, "");
-  strxmov(frm_name, path, reg_ext, NullS);
-  if ((flags & WFRM_INITIAL_WRITE) &&
-      (mysql_prepare_table(lpt->thd, lpt->create_info, &lpt->new_create_list,
-                           &lpt->new_key_list,/*tmp_table*/ 1, &lpt->db_options,
-                           lpt->table->file, &lpt->key_info_buffer,
-                           &lpt->key_count, /*select_field_count*/ 0)))
-  {
-    DBUG_RETURN(TRUE);
-  }
 #ifdef WITH_PARTITION_STORAGE_ENGINE
-  {
-    partition_info *part_info= lpt->table->part_info;
-    char *part_syntax_buf;
-    uint syntax_len, i;
-    bool any_unnormal_state= FALSE;
-
-    if (part_info)
     {
-      uint max_part_state_len= part_info->partitions.elements +
-                           part_info->temp_partitions.elements;
-      if (!(part_info->part_state= (uchar*)sql_alloc(max_part_state_len)))
-      {
-        DBUG_RETURN(TRUE);
-      }
-      part_info->part_state_len= 0;
-      if (!(part_syntax_buf= generate_partition_syntax(part_info,
-                                                       &syntax_len,
-                                                       TRUE, FALSE)))
-      {
-        DBUG_RETURN(TRUE);
-      }
-      for (i= 0; i < part_info->part_state_len; i++)
-      {
-        enum partition_state part_state=
-                        (enum partition_state)part_info->part_state[i];
-        if (part_state != PART_NORMAL && part_state != PART_IS_ADDED)
-          any_unnormal_state= TRUE;
-      }
-      if (!any_unnormal_state)
-      {
-        part_info->part_state= NULL;
-        part_info->part_state_len= 0;
+      partition_info *part_info= lpt->table->part_info;
+      char *part_syntax_buf;
+      uint syntax_len;
+
+      if (part_info)
+      {
+        if (!(part_syntax_buf= generate_partition_syntax(part_info,
+                                                         &syntax_len,
+                                                         TRUE, FALSE)))
+        {
+          DBUG_RETURN(TRUE);
+        }
+        part_info->part_info_string= part_syntax_buf;
+        part_info->part_info_len= syntax_len;
       }
-      part_info->part_info_string= part_syntax_buf;
-      part_info->part_info_len= syntax_len;
     }
-  }
 #endif
-  /*
-    We write the frm file with the LOCK_open mutex since otherwise we could
-    overwrite the frm file as another is reading it in open_table.
-  */
-  lpt->create_info->table_options= lpt->db_options;
-  VOID(pthread_mutex_lock(&LOCK_open));
-  if ((mysql_create_frm(lpt->thd, frm_name, lpt->db, lpt->table_name,
-                        lpt->create_info, lpt->new_create_list, lpt->key_count,
-                        lpt->key_info_buffer, lpt->table->file)) ||
-      ((flags & WFRM_CREATE_HANDLER_FILES) &&
-       lpt->table->file->create_handler_files(path)))
-  {
-    error= 1;
-    goto end;
+    /* Write shadow frm file */
+    lpt->create_info->table_options= lpt->db_options;
+    if ((mysql_create_frm(lpt->thd, shadow_frm_name, lpt->db,
+                          lpt->table_name, lpt->create_info,
+                          lpt->new_create_list, lpt->key_count,
+                          lpt->key_info_buffer, lpt->table->file)) ||
+         lpt->table->file->create_handler_files(shadow_path, NULL, FALSE))
+    {
+      my_delete(shadow_frm_name, MYF(0));
+      error= 1;
+      goto end;
+    }
   }
   if (flags & WFRM_PACK_FRM)
   {
@@ -362,7 +1225,7 @@
     */
     const void *data= 0;
     uint length= 0;
-    if (readfrm(path, &data, &length) ||
+    if (readfrm(shadow_path, &data, &length) ||
         packfrm(data, length, &lpt->pack_frm_data, &lpt->pack_frm_len))
     {
       my_free((char*)data, MYF(MY_ALLOW_ZERO_PTR));
@@ -371,11 +1234,43 @@
       error= 1;
       goto end;
     }
-    error= my_delete(frm_name, MYF(MY_WME));
+    error= my_delete(shadow_frm_name, MYF(MY_WME));
+  }
+  if (flags & WFRM_INSTALL_SHADOW)
+  {
+#ifdef WITH_PARTITION_STORAGE_ENGINE
+    partition_info *part_info= lpt->part_info;
+#endif
+    /*
+      Build frm file name
+    */
+    build_table_filename(path, sizeof(path), lpt->db,
+                         lpt->table_name, "");
+    strxmov(frm_name, path, reg_ext, NullS);
+    /*
+      When we are changing to use new frm file we need to ensure that we
+      don't collide with another thread in process to open the frm file.
+    */
+    VOID(pthread_mutex_lock(&LOCK_open));
+    if (my_delete(frm_name, MYF(MY_WME)) ||
+#ifdef WITH_PARTITION_STORAGE_ENGINE
+        inactivate_table_log_entry(part_info->frm_log_entry->entry_pos) ||
+        (sync_table_log(), FALSE) ||
+#endif
+        my_rename(shadow_frm_name, frm_name, MYF(MY_WME)) ||
+        lpt->table->file->create_handler_files(path, shadow_path, TRUE))
+    {
+      error= 1;
+    }
+    VOID(pthread_mutex_unlock(&LOCK_open));
+#ifdef WITH_PARTITION_STORAGE_ENGINE
+    inactivate_table_log_entry(part_info->frm_log_entry->entry_pos);
+    part_info->frm_log_entry= NULL;
+    VOID(sync_table_log());
+#endif
   }
-  /* Frm file have been updated to reflect the change about to happen.  */
+
 end:
-  VOID(pthread_mutex_unlock(&LOCK_open));
   DBUG_RETURN(error);
 }
 
@@ -4693,7 +5588,7 @@
       error= (mysql_create_frm(thd, reg_path, db, table_name,
                                create_info, prepared_create_list, key_count,
                                key_info_buffer, table->file) ||
-              table->file->create_handler_files(reg_path));
+              table->file->create_handler_files(reg_path, NULL, FALSE));
       VOID(pthread_mutex_unlock(&LOCK_open));
       if (error)
         goto err;
@@ -4739,7 +5634,7 @@
       error= (mysql_create_frm(thd, reg_path, db, table_name,
                                create_info, prepared_create_list, key_count,
                                key_info_buffer, table->file) ||
-              table->file->create_handler_files(reg_path));
+              table->file->create_handler_files(reg_path, NULL, FALSE));
       VOID(pthread_mutex_unlock(&LOCK_open));
       if (error)
         goto err;
@@ -4962,7 +5857,7 @@
       VOID(pthread_mutex_lock(&LOCK_open));
     }
     /* Tell the handler that a new frm file is in place. */
-    if (table->file->create_handler_files(reg_path))
+    if (table->file->create_handler_files(reg_path, NULL, FALSE))
     {
       VOID(pthread_mutex_unlock(&LOCK_open));
       goto err;

--- 1.464/sql/sql_yacc.yy	2006-02-20 20:51:26 +01:00
+++ 1.465/sql/sql_yacc.yy	2006-02-21 07:45:05 +01:00
@@ -3608,75 +3608,14 @@
           LEX *lex= Lex;
           partition_info *part_info= lex->part_info;
           partition_element *p_elem= new partition_element();
-          uint part_id= part_info->partitions.elements +
-                        part_info->temp_partitions.elements;
-          enum partition_state part_state;
+          uint part_id= part_info->partitions.elements;
 
-          if (part_info->part_state)
-            part_state= (enum partition_state)part_info->part_state[part_id];
-          else
-            part_state= PART_NORMAL;
-          switch (part_state)
+          if (!p_elem || part_info->partitions.push_back(p_elem))
           {
-            case PART_TO_BE_DROPPED:
-            /*
-              This part is currently removed so we keep it in a
-              temporary list for REPAIR TABLE to be able to handle
-              failures during drop partition process.
-            */
-            case PART_TO_BE_ADDED:
-            /*
-              This part is currently being added so we keep it in a
-              temporary list for REPAIR TABLE to be able to handle
-              failures during add partition process.
-            */
-              if (!p_elem || part_info->temp_partitions.push_back(p_elem))
-              {
-                mem_alloc_error(sizeof(partition_element));
-                YYABORT;
-              }
-              break;
-            case PART_IS_ADDED:
-            /*
-              Part has been added and is now a normal partition
-            */
-            case PART_TO_BE_REORGED:
-            /*
-              This part is currently reorganised, it is still however
-              used so we keep it in the list of partitions. We do
-              however need the state to be able to handle REPAIR TABLE
-              after failures in the reorganisation process.
-            */
-            case PART_REORGED_DROPPED:
-            /*
-              This part is currently reorganised as part of a
-              COALESCE PARTITION and it will be dropped without a new
-              replacement partition after completing the reorganisation.
-            */
-            case PART_CHANGED:
-            /*
-              This part is currently split or merged as part of ADD
-              PARTITION for a hash partition or as part of COALESCE
-              PARTITION for a hash partitioned table.
-            */
-            case PART_IS_CHANGED:
-            /*
-              This part has been split or merged as part of ADD
-              PARTITION for a hash partition or as part of COALESCE
-              PARTITION for a hash partitioned table.
-            */
-            case PART_NORMAL:
-              if (!p_elem || part_info->partitions.push_back(p_elem))
-              {
-                mem_alloc_error(sizeof(partition_element));
-                YYABORT;
-              }
-              break;
-            default:
-              mem_alloc_error((part_id * 1000) + part_state);
-              YYABORT;
+            mem_alloc_error(sizeof(partition_element));
+            YYABORT;
           }
-          p_elem->part_state= part_state;
+          p_elem->part_state= PART_NORMAL;
           part_info->curr_part_elem= p_elem;
           part_info->current_partition= p_elem;
           part_info->use_default_partitions= FALSE;

--- 1.209/sql/table.cc	2006-02-14 20:10:43 +01:00
+++ 1.210/sql/table.cc	2006-02-21 07:45:05 +01:00
@@ -633,36 +633,15 @@
 #endif
       next_chunk+= 5 + partition_info_len;
     }
-    if (share->mysql_version > 50105 && next_chunk + 5 < buff_end)
+#if 0
+    if (share->mysql_version == 50106)
     {
       /*
-        Partition state was introduced to support partition management in version 5.1.5
-      */
-      uint32 part_state_len= uint4korr(next_chunk);
-#ifdef WITH_PARTITION_STORAGE_ENGINE
-      if ((share->part_state_len= part_state_len))
-        if (!(share->part_state=
-              (uchar*) memdup_root(&share->mem_root, next_chunk + 4,
-                                   part_state_len)))
-        {
-          my_free(buff, MYF(0));
-          goto err;
-        }
-#else
-      if (part_state_len)
-      {
-        DBUG_PRINT("info", ("WITH_PARTITION_STORAGE_ENGINE is not defined"));
-        my_free(buff, MYF(0));
-        goto err;
-      }
-#endif
-      next_chunk+= 4 + part_state_len;
-    }
-#ifdef WITH_PARTITION_STORAGE_ENGINE
-    else
-    {
-      share->part_state_len= 0;
-      share->part_state= NULL;
+         Partition state array was here in version 5.1.6, this code makes
+         it possible to load a 5.1.6 table in later versions. Can most
+         likely be removed at some point in time.
+       */
+      next_chunk+= 4;
     }
 #endif
     keyinfo= share->key_info;

--- 1.74/sql/unireg.cc	2006-01-28 19:21:14 +01:00
+++ 1.75/sql/unireg.cc	2006-02-21 07:45:05 +01:00
@@ -136,7 +136,6 @@
   if (part_info)
   {
     create_info->extra_size+= part_info->part_info_len;
-    create_info->extra_size+= part_info->part_state_len;
   }
 #endif
 
@@ -209,12 +208,6 @@
         my_write(file, (const byte*)part_info->part_info_string,
                  part_info->part_info_len + 1, MYF_RW))
       goto err;
-    DBUG_PRINT("info", ("Part state len = %d", part_info->part_state_len));
-    int4store(buff, part_info->part_state_len);
-    if (my_write(file, (const byte*)buff, 4, MYF_RW) ||
-        my_write(file, (const byte*)part_info->part_state,
-                 part_info->part_state_len, MYF_RW))
-      goto err;
   }
   else
 #endif
@@ -330,7 +323,7 @@
 
   // Make sure mysql_create_frm din't remove extension
   DBUG_ASSERT(*fn_rext(frm_name));
-  if (file->create_handler_files(path))
+  if (file->create_handler_files(path, NULL, FALSE))
     goto err_handler;
   if (!create_info->frm_only && ha_create_table(thd, path, db, table_name,
                                                 create_info,0))

--- 1.84/sql/share/errmsg.txt	2006-02-20 20:51:26 +01:00
+++ 1.85/sql/share/errmsg.txt	2006-02-21 07:45:03 +01:00
@@ -5812,4 +5812,5 @@
 	eng "Transaction isolation level can't be changed while a transaction is in progress"
 ER_WARN_DEPRECATED_STATEMENT
         eng  "The '%s' statement is deprecated and will be removed in MySQL %s. Please use client programs (e.g. %s) instead."
-
+ER_TABLE_LOG_ERROR
+        eng "Error in table log"

--- 1.274/sql/ha_ndbcluster.cc	2006-02-20 12:19:33 +01:00
+++ 1.275/sql/ha_ndbcluster.cc	2006-02-21 07:45:01 +01:00
@@ -4463,7 +4463,9 @@
   DBUG_RETURN(my_errno);
 }
 
-int ha_ndbcluster::create_handler_files(const char *file) 
+int ha_ndbcluster::create_handler_files(const char *file,
+                                        const char *old_name,
+                                        bool rename_flag) 
 { 
   const char *name;
   Ndb* ndb;
@@ -4474,6 +4476,10 @@
 
   DBUG_ENTER("create_handler_files");
 
+  if (rename_flag)
+  {
+    DBUG_RETURN(FALSE);
+  }
   if (!(ndb= get_ndb()))
     DBUG_RETURN(HA_ERR_NO_CONNECTION);
 

--- 1.119/sql/ha_ndbcluster.h	2006-02-13 21:55:41 +01:00
+++ 1.120/sql/ha_ndbcluster.h	2006-02-21 07:45:01 +01:00
@@ -611,7 +611,8 @@
   int rename_table(const char *from, const char *to);
   int delete_table(const char *name);
   int create(const char *name, TABLE *form, HA_CREATE_INFO *info);
-  int create_handler_files(const char *file);
+  int create_handler_files(const char *file, const char *old_name,
+                           bool rename_flag);
   int get_default_no_partitions(ulonglong max_rows);
   bool get_no_parts(const char *name, uint *no_parts);
   void set_auto_partitions(partition_info *part_info);

--- 1.31/sql/ha_partition.cc	2006-02-17 10:33:59 +01:00
+++ 1.32/sql/ha_partition.cc	2006-02-21 07:45:01 +01:00
@@ -102,7 +102,9 @@
   alter_table_flags, /* Partition flags */
   NULL, /* Alter Tablespace */
   NULL, /* Fill FILES table */
-  HTON_NOT_USER_SELECTABLE | HTON_HIDDEN
+  HTON_NOT_USER_SELECTABLE | HTON_HIDDEN,
+  NULL, /* binlog function */
+  NULL  /* binlog query */
 };
 
 /*
@@ -393,88 +395,6 @@
                 MODULE meta data changes
 ****************************************************************************/
 /*
-  Create partition names
-
-  SYNOPSIS
-    create_partition_name()
-    out:out                   Created partition name string
-    in1                       First part
-    in2                       Second part
-    name_variant              Normal, temporary or renamed partition name
-
-  RETURN VALUE
-    NONE
-
-  DESCRIPTION
-    This method is used to calculate the partition name, service routine to
-    the del_ren_cre_table method.
-*/
-
-#define NORMAL_PART_NAME 0
-#define TEMP_PART_NAME 1
-#define RENAMED_PART_NAME 2
-static void create_partition_name(char *out, const char *in1,
-                                  const char *in2, uint name_variant,
-                                  bool translate)
-{
-  char transl_part_name[FN_REFLEN];
-  const char *transl_part;
-
-  if (translate)
-  {
-    tablename_to_filename(in2, transl_part_name, FN_REFLEN);
-    transl_part= transl_part_name;
-  }
-  else
-    transl_part= in2;
-  if (name_variant == NORMAL_PART_NAME)
-    strxmov(out, in1, "#P#", transl_part, NullS);
-  else if (name_variant == TEMP_PART_NAME)
-    strxmov(out, in1, "#P#", transl_part, "#TMP#", NullS);
-  else if (name_variant == RENAMED_PART_NAME)
-    strxmov(out, in1, "#P#", transl_part, "#REN#", NullS);
-}
-
-/*
-  Create subpartition name
-
-  SYNOPSIS
-    create_subpartition_name()
-    out:out                   Created partition name string
-    in1                       First part
-    in2                       Second part
-    in3                       Third part
-    name_variant              Normal, temporary or renamed partition name
-
-  RETURN VALUE
-    NONE
-
-  DESCRIPTION
-  This method is used to calculate the subpartition name, service routine to
-  the del_ren_cre_table method.
-*/
-
-static void create_subpartition_name(char *out, const char *in1,
-                                     const char *in2, const char *in3,
-                                     uint name_variant)
-{
-  char transl_part_name[FN_REFLEN], transl_subpart_name[FN_REFLEN];
-
-  tablename_to_filename(in2, transl_part_name, FN_REFLEN);
-  tablename_to_filename(in3, transl_subpart_name, FN_REFLEN);
-  if (name_variant == NORMAL_PART_NAME)
-    strxmov(out, in1, "#P#", transl_part_name,
-            "#SP#", transl_subpart_name, NullS);
-  else if (name_variant == TEMP_PART_NAME)
-    strxmov(out, in1, "#P#", transl_part_name,
-            "#SP#", transl_subpart_name, "#TMP#", NullS);
-  else if (name_variant == RENAMED_PART_NAME)
-    strxmov(out, in1, "#P#", transl_part_name,
-            "#SP#", transl_subpart_name, "#REN#", NullS);
-}
-
-
-/*
   Delete a table
 
   SYNOPSIS
@@ -564,7 +484,9 @@
     and types of engines in the partitions.
 */
 
-int ha_partition::create_handler_files(const char *name)
+int ha_partition::create_handler_files(const char *path,
+                                       const char *old_path,
+                                       bool rename_flag)
 {
   DBUG_ENTER("ha_partition::create_handler_files()");
 
@@ -572,10 +494,26 @@
     We need to update total number of parts since we might write the handler
     file as part of a partition management command
   */
-  if (create_handler_file(name))
+  if (rename_flag)
   {
-    my_error(ER_CANT_CREATE_HANDLER_FILE, MYF(0));
-    DBUG_RETURN(1);
+    char name[FN_REFLEN];
+    char old_name[FN_REFLEN];
+
+    strxmov(name, path, ha_par_ext, NullS);
+    strxmov(old_name, old_path, ha_par_ext, NullS);
+    if (my_delete(name, MYF(MY_WME)) ||
+        my_rename(old_name, name, MYF(MY_WME)))
+    {
+      DBUG_RETURN(TRUE);
+    }
+  }
+  else
+  {
+    if (create_handler_file(path))
+    {
+      my_error(ER_CANT_CREATE_HANDLER_FILE, MYF(0));
+      DBUG_RETURN(1);
+    }
   }
   DBUG_RETURN(0);
 }
@@ -641,45 +579,26 @@
 int ha_partition::drop_partitions(const char *path)
 {
   List_iterator<partition_element> part_it(m_part_info->partitions);
-  List_iterator<partition_element> temp_it(m_part_info->temp_partitions);
   char part_name_buff[FN_REFLEN];
   uint no_parts= m_part_info->partitions.elements;
   uint part_count= 0;
   uint no_subparts= m_part_info->no_subparts;
   uint i= 0;
   uint name_variant;
-  int  error= 1;
-  bool reorged_parts= (m_reorged_parts > 0);
-  bool temp_partitions= (m_part_info->temp_partitions.elements > 0);
+  int  ret_error;
+  int  error= 0;
   DBUG_ENTER("ha_partition::drop_partitions");
 
-  if (temp_partitions)
-    no_parts= m_part_info->temp_partitions.elements;
   do
   {
-    partition_element *part_elem;
-    if (temp_partitions)
-    {
-      /*
-        We need to remove the reorganised partitions that were put in the
-        temp_partitions-list.
-      */
-      part_elem= temp_it++;
-      DBUG_ASSERT(part_elem->part_state == PART_TO_BE_DROPPED);
-    }
-    else
-      part_elem= part_it++;
-    if (part_elem->part_state == PART_TO_BE_DROPPED ||
-        part_elem->part_state == PART_IS_CHANGED)
+    partition_element *part_elem= part_it++;
+    if (part_elem->part_state == PART_TO_BE_DROPPED)
     {
       handler *file;
       /*
         This part is to be dropped, meaning the part or all its subparts.
       */
       name_variant= NORMAL_PART_NAME;
-      if (part_elem->part_state == PART_IS_CHANGED ||
-          (part_elem->part_state == PART_TO_BE_DROPPED && temp_partitions))
-        name_variant= RENAMED_PART_NAME;
       if (m_is_sub_partitioned)
       {
         List_iterator<partition_element> sub_it(part_elem->subpartitions);
@@ -691,12 +610,10 @@
           create_subpartition_name(part_name_buff, path,
                                    part_elem->partition_name,
                                    sub_elem->partition_name, name_variant);
-          if (reorged_parts)
-            file= m_reorged_file[part_count++];
-          else
-            file= m_file[part];
+          file= m_file[part];
           DBUG_PRINT("info", ("Drop subpartition %s", part_name_buff));
-          error= file->delete_table((const char *) part_name_buff);
+          if ((ret_error= file->delete_table((const char *) part_name_buff)))
+            error= ret_error;
         } while (++j < no_subparts);
       }
       else
@@ -704,12 +621,10 @@
         create_partition_name(part_name_buff, path,
                               part_elem->partition_name, name_variant,
                               TRUE);
-        if (reorged_parts)
-          file= m_reorged_file[part_count++];
-        else
-          file= m_file[i];
+        file= m_file[i];
         DBUG_PRINT("info", ("Drop partition %s", part_name_buff));
-        error= file->delete_table((const char *) part_name_buff);
+        if ((ret_error= file->delete_table((const char *) part_name_buff)))
+          error= ret_error;
       }
       if (part_elem->part_state == PART_IS_CHANGED)
         part_elem->part_state= PART_NORMAL;
@@ -751,7 +666,8 @@
   uint no_subparts= m_part_info->no_subparts;
   uint i= 0;
   uint j= 0;
-  int error= 1;
+  int error= 0;
+  int ret_error;
   uint temp_partitions= m_part_info->temp_partitions.elements;
   handler *file;
   partition_element *part_elem, *sub_elem;
@@ -759,6 +675,14 @@
 
   if (temp_partitions)
   {
+    /*
+      These are the reorganised partitions that have already been copied.
+      We delete the partitions and log the delete by inactivating the
+      delete log entry in the table log. We only need to synchronise
+      these writes before moving to the next loop since there is no
+      interaction among reorganised partitions, they cannot have the
+      same name.
+    */
     do
     {
       part_elem= temp_it++;
@@ -769,39 +693,59 @@
         {
           sub_elem= sub_it++;
           file= m_reorged_file[part_count++];
-          create_subpartition_name(part_name_buff, path,
-                                   part_elem->partition_name,
-                                   sub_elem->partition_name,
-                                   RENAMED_PART_NAME);
           create_subpartition_name(norm_name_buff, path,
                                    part_elem->partition_name,
                                    sub_elem->partition_name,
                                    NORMAL_PART_NAME);
-          DBUG_PRINT("info", ("Rename subpartition from %s to %s",
-                     norm_name_buff, part_name_buff));
-          error= file->rename_table((const char *) norm_name_buff,
-                                    (const char *) part_name_buff);
+          DBUG_PRINT("info", ("Delete subpartition %s", norm_name_buff));
+          if ((ret_error= file->delete_table((const char *) norm_name_buff)))
+            error= ret_error;
+          else if (inactivate_table_log_entry(sub_elem->log_entry->entry_pos))
+            error= 1;
+          else
+            sub_elem->log_entry= NULL; /* Indicate success */
         } while (++j < no_subparts);
       }
       else
       {
         file= m_reorged_file[part_count++];
-        create_partition_name(part_name_buff, path,
-                              part_elem->partition_name, RENAMED_PART_NAME,
-                              TRUE);
         create_partition_name(norm_name_buff, path,
                               part_elem->partition_name, NORMAL_PART_NAME,
                               TRUE);
-        DBUG_PRINT("info", ("Rename partition from %s to %s",
-                   norm_name_buff, part_name_buff));
-        error= file->rename_table((const char *) norm_name_buff,
-                                  (const char *) part_name_buff);
+        DBUG_PRINT("info", ("Delete partition %s", norm_name_buff));
+        if ((ret_error= file->delete_table((const char *) norm_name_buff)))
+          error= ret_error;
+        else if (inactivate_table_log_entry(part_elem->log_entry->entry_pos))
+          error= 1;
+        else
+          part_elem->log_entry= NULL; /* Indicate success */
       }
     } while (++i < temp_partitions);
+    VOID(sync_table_log());
   }
   i= 0;
   do
   {
+    /*
+       When state is PART_IS_CHANGED it means that we have created a new
+       TEMP partition that is to be renamed to normal partition name and
+       we are to delete the old partition with currently the normal name.
+       
+       We perform this operation by
+       1) Delete old partition with normal partition name
+       2) Signal this in table log entry
+       3) Synch table log to ensure we have consistency in crashes
+       4) Rename temporary partition name to normal partition name
+       5) Signal this to table log entry
+       It is not necessary to synch the last state since a new rename
+       should not corrupt things if there was no temporary partition.
+
+       The only other parts we need to cater for are new parts that
+       replace reorganised parts. The reorganised parts were deleted
+       by the code above that goes through the temp_partitions list.
+       Thus the synch above makes it safe to simply perform step 4 and 5
+       for those entries.
+    */
     part_elem= part_it++;
     if (part_elem->part_state == PART_IS_CHANGED ||
         (part_elem->part_state == PART_IS_ADDED && temp_partitions))
@@ -823,14 +767,12 @@
           if (part_elem->part_state == PART_IS_CHANGED)
           {
             file= m_reorged_file[part_count++];
-            create_subpartition_name(part_name_buff, path,
-                                     part_elem->partition_name,
-                                     sub_elem->partition_name,
-                                     RENAMED_PART_NAME);
-            DBUG_PRINT("info", ("Rename subpartition from %s to %s",
-                       norm_name_buff, part_name_buff));
-            error= file->rename_table((const char *) norm_name_buff,
-                                      (const char *) part_name_buff);
+            DBUG_PRINT("info", ("Delete subpartition %s", norm_name_buff));
+            if ((ret_error= file->delete_table((const char *) norm_name_buff)))
+              error= ret_error;
+            else if (inactivate_table_log_entry(sub_elem->log_entry->entry_pos))
+              error= 1;
+            VOID(sync_table_log());
           }
           file= m_new_file[part];
           create_subpartition_name(part_name_buff, path,
@@ -839,8 +781,13 @@
                                    TEMP_PART_NAME);
           DBUG_PRINT("info", ("Rename subpartition from %s to %s",
                      part_name_buff, norm_name_buff));
-          error= file->rename_table((const char *) part_name_buff,
-                                    (const char *) norm_name_buff);
+          if ((ret_error= file->rename_table((const char *) part_name_buff,
+                                             (const char *) norm_name_buff)))
+            error= ret_error;
+          else if (inactivate_table_log_entry(sub_elem->log_entry->entry_pos))
+            error= 1;
+          else
+            sub_elem->log_entry= NULL;
         } while (++j < no_subparts);
       }
       else
@@ -851,13 +798,12 @@
         if (part_elem->part_state == PART_IS_CHANGED)
         {
           file= m_reorged_file[part_count++];
-          create_partition_name(part_name_buff, path,
-                                part_elem->partition_name, RENAMED_PART_NAME,
-                                TRUE);
-          DBUG_PRINT("info", ("Rename partition from %s to %s",
-                     norm_name_buff, part_name_buff));
-          error= file->rename_table((const char *) norm_name_buff,
-                                    (const char *) part_name_buff);
+          DBUG_PRINT("info", ("Delete partition %s", norm_name_buff));
+          if ((ret_error= file->delete_table((const char *) norm_name_buff)))
+            error= ret_error;
+          else if (inactivate_table_log_entry(part_elem->log_entry->entry_pos))
+            error= 1;
+          VOID(sync_table_log());
         }
         file= m_new_file[i];
         create_partition_name(part_name_buff, path,
@@ -865,11 +811,17 @@
                               TRUE);
         DBUG_PRINT("info", ("Rename partition from %s to %s",
                    part_name_buff, norm_name_buff));
-        error= file->rename_table((const char *) part_name_buff,
-                                  (const char *) norm_name_buff);
+        if ((ret_error= file->rename_table((const char *) part_name_buff,
+                                           (const char *) norm_name_buff)))
+          error= ret_error;
+        else if (inactivate_table_log_entry(part_elem->log_entry->entry_pos))
+          error= 1;
+        else
+          part_elem->log_entry= NULL;
       }
     }
   } while (++i < no_parts);
+  VOID(sync_table_log());
   DBUG_RETURN(error);
 }
 
@@ -1861,8 +1813,8 @@
   {
     part_elem= part_it++;
     if (part_elem->part_state != PART_NORMAL &&
-        part_elem->part_state != PART_IS_ADDED &&
-        part_elem->part_state != PART_IS_CHANGED)
+        part_elem->part_state != PART_TO_BE_ADDED &&
+        part_elem->part_state != PART_CHANGED)
       continue;
     tablename_to_filename(part_elem->partition_name, part_name,
                           FN_REFLEN);
@@ -1913,8 +1865,8 @@
   {
     part_elem= part_it++;
     if (part_elem->part_state != PART_NORMAL &&
-        part_elem->part_state != PART_IS_ADDED &&
-        part_elem->part_state != PART_IS_CHANGED)
+        part_elem->part_state != PART_TO_BE_ADDED &&
+        part_elem->part_state != PART_CHANGED)
       continue;
     if (!m_is_sub_partitioned)
     {

--- 1.11/sql/ha_partition.h	2006-01-29 01:22:28 +01:00
+++ 1.12/sql/ha_partition.h	2006-02-21 07:45:02 +01:00
@@ -179,7 +179,8 @@
   virtual int rename_table(const char *from, const char *to);
   virtual int create(const char *name, TABLE *form,
 		     HA_CREATE_INFO *create_info);
-  virtual int create_handler_files(const char *name);
+  virtual int create_handler_files(const char *name,
+                                   const char *old_name, bool rename_flag);
   virtual void update_create_info(HA_CREATE_INFO *create_info);
   virtual char *update_table_comment(const char *comment);
   virtual int change_partitions(HA_CREATE_INFO *create_info,

--- 1.37/sql/sql_partition.cc	2006-02-15 12:52:22 +01:00
+++ 1.38/sql/sql_partition.cc	2006-02-21 07:45:04 +01:00
@@ -2390,15 +2390,10 @@
   char path[FN_REFLEN];
   int err= 0;
   List_iterator<partition_element> part_it(part_info->partitions);
-  List_iterator<partition_element> temp_it(part_info->temp_partitions);
   File fptr;
   char *buf= NULL; //Return buffer
-  uint use_temp= 0;
-  uint no_temp_parts= part_info->temp_partitions.elements;
-  bool write_part_state;
   DBUG_ENTER("generate_partition_syntax");
 
-  write_part_state= (part_info->part_state && !part_info->part_state_len);
   if (unlikely(((fptr= create_temp_file(path,mysql_tmpdir,"psy", 
                                         O_RDWR | O_BINARY | O_TRUNC |  
                                         O_TEMPORARY, MYF(MY_WME)))) < 0))
@@ -2463,67 +2458,26 @@
       err+= add_space(fptr);
     }
   }
-  no_parts= part_info->no_parts;
-  tot_no_parts= no_parts + no_temp_parts;
+  tot_no_parts= part_info->partitions.elements;
   no_subparts= part_info->no_subparts;
 
   if (write_all || (!part_info->use_default_partitions))
   {
+    bool first= TRUE;
     err+= add_begin_parenthesis(fptr);
     i= 0;
     do
     {
-      /*
-        We need to do some clever list manipulation here since we have two
-        different needs for our list processing and here we take some of the
-        cost of using a simpler list processing for the other parts of the
-        code.
-
-        ALTER TABLE REORGANIZE PARTITIONS has the list of partitions to be
-        the final list as the main list and the reorganised partitions is in
-        the temporary partition list. Thus when finding the first part added
-        we insert the temporary list if there is such a list. If there is no
-        temporary list we are performing an ADD PARTITION.
-      */
-      if (use_temp && use_temp <= no_temp_parts)
-      {
-        part_elem= temp_it++;
-        DBUG_ASSERT(no_temp_parts);
-        no_temp_parts--;
-      }
-      else if (use_temp)
-      {
-        DBUG_ASSERT(no_parts);
-        part_elem= save_part_elem;
-        use_temp= 0;
-        no_parts--;
-      }
-      else
-      {
-        part_elem= part_it++;
-        if ((part_elem->part_state == PART_TO_BE_ADDED ||
-             part_elem->part_state == PART_IS_ADDED) && no_temp_parts)
-        {
-          save_part_elem= part_elem;
-          part_elem= temp_it++;
-          no_temp_parts--;
-          use_temp= 1;
-        }
-        else
-        {
-          DBUG_ASSERT(no_parts);
-          no_parts--;
-        }
-      }
-
-      if (part_elem->part_state != PART_IS_DROPPED)
+      part_elem= part_it++;
+      if (part_elem->part_state != PART_TO_BE_DROPPED &&
+          part_elem->part_state != PART_REORGED_DROPPED)
       {
-        if (write_part_state)
+        if (!first)
         {
-          uint32 part_state_id= part_info->part_state_len;
-          part_info->part_state[part_state_id]= (uchar)part_elem->part_state;
-          part_info->part_state_len= part_state_id+1;
+          err+= add_comma(fptr);
+          err+= add_space(fptr);
         }
+        first= FALSE;
         err+= add_partition(fptr);
         err+= add_string(fptr, part_elem->partition_name);
         err+= add_space(fptr);
@@ -2553,16 +2507,10 @@
               err+= add_end_parenthesis(fptr);
           } while (++j < no_subparts);
         }
-        if (i != (tot_no_parts-1))
-        {
-          err+= add_comma(fptr);
-          err+= add_space(fptr);
-        }
       }
       if (i == (tot_no_parts-1))
         err+= add_end_parenthesis(fptr);
     } while (++i < tot_no_parts);
-    DBUG_ASSERT(!no_parts && !no_temp_parts);
   }
   if (err)
     goto close_file;
@@ -4030,27 +3978,6 @@
 
 /*
   SYNOPSIS
-    fast_alter_partition_error_handler()
-    lpt                           Container for parameters
-
-  RETURN VALUES
-    None
-
-  DESCRIPTION
-    Support routine to clean up after failures of on-line ALTER TABLE
-    for partition management.
-*/
-
-static void fast_alter_partition_error_handler(ALTER_PARTITION_PARAM_TYPE *lpt)
-{
-  DBUG_ENTER("fast_alter_partition_error_handler");
-  /* TODO: WL 2826 Error handling */
-  DBUG_VOID_RETURN;
-}
-
-
-/*
-  SYNOPSIS
     fast_end_partition()
     thd                           Thread object
     out:copied                    Number of records copied
@@ -4070,6 +3997,7 @@
 
 static int fast_end_partition(THD *thd, ulonglong copied,
                               ulonglong deleted,
+                              TABLE *table,
                               TABLE_LIST *table_list, bool is_empty,
                               ALTER_PARTITION_PARAM_TYPE *lpt,
                               bool written_bin_log)
@@ -4097,7 +4025,7 @@
     send_ok(thd,copied+deleted,0L,tmp_name);
     DBUG_RETURN(FALSE);
   }
-  fast_alter_partition_error_handler(lpt);
+  table->file->print_error(error, MYF(0));
   DBUG_RETURN(TRUE);
 }
 
@@ -4320,7 +4248,8 @@
           after the change as before. Thus we can reply ok immediately
           without any changes at all.
         */
-        DBUG_RETURN(fast_end_partition(thd, ULL(0), ULL(0), NULL,
+        DBUG_RETURN(fast_end_partition(thd, ULL(0), ULL(0),
+                                       table, NULL,
                                        TRUE, NULL, FALSE));
       }
       else if (new_part_no > curr_part_no)
@@ -4642,6 +4571,7 @@
         my_error(ER_ROW_IS_REFERENCED, MYF(0));
         DBUG_RETURN(TRUE);
       }
+      tab_part_info->no_parts-= no_parts_dropped;
     }
     else if ((alter_info->flags & ALTER_OPTIMIZE_PARTITION) ||
              (alter_info->flags & ALTER_ANALYZE_PARTITION) ||
@@ -5092,14 +5022,17 @@
 static bool mysql_change_partitions(ALTER_PARTITION_PARAM_TYPE *lpt)
 {
   char path[FN_REFLEN+1];
+  int error;
+  handler *file= lpt->table->file;
   DBUG_ENTER("mysql_change_partitions");
 
   build_table_filename(path, sizeof(path), lpt->db, lpt->table_name, "");
-  DBUG_RETURN(lpt->table->file->change_partitions(lpt->create_info, path,
-                                                  &lpt->copied,
-                                                  &lpt->deleted,
-                                                  lpt->pack_frm_data,
-                                                  lpt->pack_frm_len));
+  DBUG_RETURN(file->change_partitions(lpt->create_info,
+                                      path,
+                                      &lpt->copied,
+                                      &lpt->deleted,
+                                      lpt->pack_frm_data,
+                                      lpt->pack_frm_len));
 }
 
 
@@ -5125,10 +5058,17 @@
 static bool mysql_rename_partitions(ALTER_PARTITION_PARAM_TYPE *lpt)
 {
   char path[FN_REFLEN+1];
+  int error;
   DBUG_ENTER("mysql_rename_partitions");
 
   build_table_filename(path, sizeof(path), lpt->db, lpt->table_name, "");
-  DBUG_RETURN(lpt->table->file->rename_partitions(path));
+  if ((error= lpt->table->file->rename_partitions(path)))
+  {
+    if (error != 1)
+      lpt->table->file->print_error(error, MYF(0));
+    DBUG_RETURN(TRUE);
+  }
+  DBUG_RETURN(FALSE);
 }
 
 
@@ -5159,11 +5099,13 @@
   List_iterator<partition_element> part_it(part_info->partitions);
   uint i= 0;
   uint remove_count= 0;
+  int error;
   DBUG_ENTER("mysql_drop_partitions");
 
   build_table_filename(path, sizeof(path), lpt->db, lpt->table_name, "");
-  if (lpt->table->file->drop_partitions(path))
+  if ((error= lpt->table->file->drop_partitions(path)))
   {
+    lpt->table->file->print_error(error, MYF(0));
     DBUG_RETURN(TRUE);
   }
   do
@@ -5181,6 +5123,810 @@
 
 
 /*
+  Insert log entry into list
+  SYNOPSIS
+    insert_part_info_log_entry_list()
+    log_entry
+  RETURN VALUES
+    NONE
+*/
+
+static
+void
+insert_part_info_log_entry_list(partition_info *part_info,
+                                TABLE_LOG_MEMORY_ENTRY *log_entry)
+{
+  log_entry->next_active_log_entry= part_info->first_log_entry;
+  part_info->first_log_entry= log_entry;
+}
+
+
+/*
+  Release all log entries for this partition info struct
+  SYNOPSIS
+    release_part_info_log_entries()
+    first_log_entry                 First log entry in list to release
+  RETURN VALUES
+    NONE
+*/
+
+static
+void
+release_part_info_log_entries(TABLE_LOG_MEMORY_ENTRY *log_entry)
+{
+  DBUG_ENTER("release_part_info_log_entries");
+
+  while (log_entry)
+  {
+    release_table_log_memory_entry(log_entry);
+    log_entry= log_entry->next_active_log_entry;
+  }
+  DBUG_VOID_RETURN;
+}
+
+
+/*
+  Log an delete/rename frm file
+  SYNOPSIS
+    write_log_replace_delete_frm()
+    lpt                            Struct for parameters
+    next_entry                     Next reference to use in log record
+    path                           Name to rename from
+    rename_flag                    TRUE if rename, else delete
+  RETURN VALUES
+    TRUE                           Error
+    FALSE                          Success
+  DESCRIPTION
+    Support routine that writes a replace or delete of an frm file into the
+    table log. It also inserts an entry that keeps track of used space into
+    the partition info object
+*/
+
+static
+bool
+write_log_replace_delete_frm(ALTER_PARTITION_PARAM_TYPE *lpt,
+                            uint next_entry,
+                            const char *from_path,
+                            const char *to_path,
+                            bool replace_flag)
+{
+  TABLE_LOG_ENTRY table_log_entry;
+  TABLE_LOG_MEMORY_ENTRY *log_entry;
+  DBUG_ENTER("write_log_replace_delete_frm");
+
+  if (replace_flag)
+    table_log_entry.action_type= TLOG_REPLACE_ACTION_CODE;
+  else
+    table_log_entry.action_type= TLOG_DELETE_ACTION_CODE;
+  table_log_entry.next_entry= next_entry;
+  table_log_entry.handler_type= "frm";
+  table_log_entry.name= to_path;
+  if (replace_flag)
+    table_log_entry.from_name= from_path;
+  if (write_table_log_entry(&table_log_entry, &log_entry))
+  {
+    DBUG_RETURN(TRUE);
+  }
+  insert_part_info_log_entry_list(lpt->part_info, log_entry);
+  DBUG_RETURN(FALSE);
+}
+
+
+/*
+  Log final partition changes in change partition
+  SYNOPSIS
+    write_log_changed_partitions()
+    lpt                      Struct containing parameters
+  RETURN VALUES
+    TRUE                     Error
+    FALSE                    Success
+  DESCRIPTION
+    This code is used to perform safe ADD PARTITION for HASH partitions
+    and COALESCE for HASH partitions and REORGANIZE for any type of
+    partitions.
+    We prepare entries for all partitions except the reorganised partitions
+    in REORGANIZE partition, those are handled by
+    write_log_dropped_partitions. For those partitions that are replaced
+    special care is needed to ensure that this is performed correctly and
+    this requires a two-phased approach with this log as a helper for this.
+
+    This code is closely intertwined with the code in rename_partitions in
+    the partition handler.
+*/
+
+static
+bool
+write_log_changed_partitions(ALTER_PARTITION_PARAM_TYPE *lpt,
+                             uint *next_entry, const char *path)
+{
+  TABLE_LOG_ENTRY table_log_entry;
+  partition_info *part_info= lpt->part_info;
+  TABLE_LOG_MEMORY_ENTRY *log_entry;
+  char tmp_path[FN_LEN];
+  char normal_path[FN_LEN];
+  List_iterator<partition_element> part_it(part_info->partitions);
+  uint temp_partitions= part_info->temp_partitions.elements;
+  uint no_elements= part_info->partitions.elements;
+  uint i= 0;
+  DBUG_ENTER("write_log_changed_partitions");
+
+  do
+  {
+    partition_element *part_elem= part_it++;
+    if (part_elem->part_state == PART_IS_CHANGED ||
+        (part_elem->part_state == PART_IS_ADDED && temp_partitions))
+    {
+      if (is_sub_partitioned(part_info))
+      {
+        List_iterator<partition_element> sub_it(part_elem->subpartitions);
+        uint no_subparts= part_info->no_subparts;
+        uint j= 0;
+        do
+        {
+          partition_element *sub_elem= sub_it++;
+          table_log_entry.next_entry= *next_entry;
+          table_log_entry.handler_type=
+               ha_resolve_storage_engine_name(sub_elem->engine_type);
+          create_subpartition_name(tmp_path, path,
+                                   part_elem->partition_name,
+                                   sub_elem->partition_name,
+                                   TEMP_PART_NAME);
+          create_subpartition_name(normal_path, path,
+                                   part_elem->partition_name,
+                                   sub_elem->partition_name,
+                                   NORMAL_PART_NAME);
+          table_log_entry.name= normal_path;
+          table_log_entry.from_name= tmp_path;
+          if (part_elem->part_state == PART_IS_CHANGED)
+            table_log_entry.action_type= TLOG_REPLACE_ACTION_CODE;
+          else
+            table_log_entry.action_type= TLOG_RENAME_ACTION_CODE;
+          if (write_table_log_entry(&table_log_entry, &log_entry))
+          {
+            DBUG_RETURN(TRUE);
+          }
+          *next_entry= log_entry->entry_pos;
+          sub_elem->log_entry= log_entry;
+          insert_part_info_log_entry_list(part_info, log_entry);
+        } while (++j < no_subparts);
+      }
+      else
+      {
+        table_log_entry.next_entry= *next_entry;
+        table_log_entry.handler_type=
+               ha_resolve_storage_engine_name(part_elem->engine_type);
+        create_partition_name(tmp_path, path,
+                              part_elem->partition_name,
+                              TEMP_PART_NAME, TRUE);
+        create_partition_name(normal_path, path,
+                              part_elem->partition_name,
+                              NORMAL_PART_NAME, TRUE);
+        table_log_entry.name= normal_path;
+        table_log_entry.from_name= tmp_path;
+        if (part_elem->part_state == PART_IS_CHANGED)
+          table_log_entry.action_type= TLOG_REPLACE_ACTION_CODE;
+        else
+          table_log_entry.action_type= TLOG_RENAME_ACTION_CODE;
+        if (write_table_log_entry(&table_log_entry, &log_entry))
+        {
+          DBUG_RETURN(TRUE);
+        }
+        *next_entry= log_entry->entry_pos;
+        part_elem->log_entry= log_entry;
+        insert_part_info_log_entry_list(part_info, log_entry);
+      }
+    }
+  } while (++i < no_elements);
+  DBUG_RETURN(FALSE);
+}
+
+
+/*
+  Log dropped partitions
+  SYNOPSIS
+    write_log_dropped_partitions()
+    lpt                      Struct containing parameters
+  RETURN VALUES
+    TRUE                     Error
+    FALSE                    Success
+*/
+
+static
+bool
+write_log_dropped_partitions(ALTER_PARTITION_PARAM_TYPE *lpt,
+                             uint *next_entry,
+                             const char *path,
+                             bool temp_list)
+{
+  TABLE_LOG_ENTRY table_log_entry;
+  partition_info *part_info= lpt->part_info;
+  TABLE_LOG_MEMORY_ENTRY *log_entry;
+  char tmp_path[FN_LEN];
+  List_iterator<partition_element> part_it(part_info->partitions);
+  List_iterator<partition_element> temp_it(part_info->temp_partitions);
+  uint no_temp_partitions= part_info->temp_partitions.elements;
+  uint no_elements= part_info->partitions.elements;
+  uint i= 0;
+  DBUG_ENTER("write_log_dropped_partitions");
+
+  table_log_entry.action_type= TLOG_DELETE_ACTION_CODE;
+  if (temp_list)
+    no_elements= no_temp_partitions;
+  while (no_elements--)
+  {
+    partition_element *part_elem;
+    if (temp_list)
+      part_elem= temp_it++;
+    else
+      part_elem= part_it++;
+    if (part_elem->part_state == PART_TO_BE_DROPPED ||
+        part_elem->part_state == PART_TO_BE_ADDED ||
+        part_elem->part_state == PART_CHANGED)
+    {
+      uint name_variant;
+      if (part_elem->part_state == PART_CHANGED ||
+          (part_elem->part_state == PART_TO_BE_ADDED &&
+           no_temp_partitions))
+        name_variant= TEMP_PART_NAME;
+      else
+        name_variant= NORMAL_PART_NAME;
+      if (is_sub_partitioned(part_info))
+      {
+        List_iterator<partition_element> sub_it(part_elem->subpartitions);
+        uint no_subparts= part_info->no_subparts;
+        uint j= 0;
+        do
+        {
+          partition_element *sub_elem= sub_it++;
+          table_log_entry.next_entry= *next_entry;
+          table_log_entry.handler_type=
+               ha_resolve_storage_engine_name(sub_elem->engine_type);
+          create_subpartition_name(tmp_path, path,
+                                   part_elem->partition_name,
+                                   sub_elem->partition_name,
+                                   name_variant);
+          table_log_entry.name= tmp_path;
+          if (write_table_log_entry(&table_log_entry, &log_entry))
+          {
+            DBUG_RETURN(TRUE);
+          }
+          *next_entry= log_entry->entry_pos;
+          if (temp_list)
+            sub_elem->log_entry= log_entry;
+          insert_part_info_log_entry_list(part_info, log_entry);
+        } while (++j < no_subparts);
+      }
+      else
+      {
+        table_log_entry.next_entry= *next_entry;
+        table_log_entry.handler_type=
+               ha_resolve_storage_engine_name(part_elem->engine_type);
+        create_partition_name(tmp_path, path,
+                              part_elem->partition_name,
+                              name_variant, TRUE);
+        table_log_entry.name= tmp_path;
+        if (write_table_log_entry(&table_log_entry, &log_entry))
+        {
+          DBUG_RETURN(TRUE);
+        }
+        *next_entry= log_entry->entry_pos;
+        if (temp_list)
+          part_elem->log_entry= log_entry;
+        insert_part_info_log_entry_list(part_info, log_entry);
+      }
+    }
+  }
+  DBUG_RETURN(FALSE);
+}
+
+
+/*
+  Set execute log entry in table log for this partitioned table
+  SYNOPSIS
+    set_part_info_exec_log_entry()
+    part_info                      Partition info object
+    exec_log_entry                 Log entry
+  RETURN VALUES
+    NONE
+*/
+
+static
+void
+set_part_info_exec_log_entry(partition_info *part_info,
+                             TABLE_LOG_MEMORY_ENTRY *exec_log_entry)
+{
+  part_info->exec_log_entry= exec_log_entry;
+  exec_log_entry->next_active_log_entry= NULL;
+}
+
+
+/*
+  Write the log entry to ensure that the shadow frm file is removed at
+  crash.
+  SYNOPSIS
+    write_log_drop_shadow_frm()
+    lpt                      Struct containing parameters
+    install_frm              Should we log action to install shadow frm or should
+                             the action be to remove the shadow frm file.
+  RETURN VALUES
+    TRUE                     Error
+    FALSE                    Success
+  DESCRIPTION
+    Prepare an entry to the table log indicating a drop/install of the shadow frm
+    file and its corresponding handler file.
+*/
+
+static
+bool
+write_log_drop_shadow_frm(ALTER_PARTITION_PARAM_TYPE *lpt)
+{
+  TABLE_LOG_ENTRY table_log_entry;
+  partition_info *part_info= lpt->part_info;
+  TABLE_LOG_MEMORY_ENTRY *log_entry;
+  TABLE_LOG_MEMORY_ENTRY *exec_log_entry= NULL;
+  char shadow_path[FN_LEN];
+  DBUG_ENTER("write_log_drop_shadow_frm");
+
+  build_table_filename(shadow_path, sizeof(shadow_path), lpt->db,
+                       lpt->table_name, "#");
+  lock_global_table_log();
+  do
+  {
+    if (write_log_replace_delete_frm(lpt, 0UL, NULL,
+                                    (const char*)shadow_path, FALSE))
+      break;
+    log_entry= part_info->first_log_entry;
+    if (write_execute_table_log_entry(log_entry->entry_pos,
+                                      FALSE, &exec_log_entry))
+      break;
+    unlock_global_table_log();
+    set_part_info_exec_log_entry(part_info, exec_log_entry);
+    DBUG_RETURN(FALSE);
+  } while (TRUE);
+  release_part_info_log_entries(part_info->first_log_entry);
+  unlock_global_table_log();
+  part_info->first_log_entry= NULL;
+  my_error(ER_TABLE_LOG_ERROR, MYF(0));
+  DBUG_RETURN(TRUE);
+}
+
+
+/*
+  Log renaming of shadow frm to real frm name and dropping of old frm
+  SYNOPSIS
+    write_log_rename_frm()
+    lpt                      Struct containing parameters
+  RETURN VALUES
+    TRUE                     Error
+    FALSE                    Success
+  DESCRIPTION
+    Prepare an entry to ensure that we complete the renaming of the frm
+    file if failure occurs in the middle of the rename process.
+*/
+
+static
+bool
+write_log_rename_frm(ALTER_PARTITION_PARAM_TYPE *lpt)
+{
+  TABLE_LOG_ENTRY table_log_entry;
+  partition_info *part_info= lpt->part_info;
+  TABLE_LOG_MEMORY_ENTRY *log_entry;
+  TABLE_LOG_MEMORY_ENTRY *exec_log_entry= part_info->exec_log_entry;
+  char path[FN_LEN];
+  char shadow_path[FN_LEN];
+  TABLE_LOG_MEMORY_ENTRY *old_first_log_entry= part_info->first_log_entry;
+  DBUG_ENTER("write_log_rename_frm");
+
+  part_info->first_log_entry= NULL;
+  build_table_filename(path, sizeof(path), lpt->db,
+                       lpt->table_name, "");
+  build_table_filename(shadow_path, sizeof(shadow_path), lpt->db,
+                       lpt->table_name, "#");
+  lock_global_table_log();
+  do
+  {
+    if (write_log_replace_delete_frm(lpt, 0UL, path, shadow_path, FALSE))
+      break;
+    log_entry= part_info->first_log_entry;
+    part_info->frm_log_entry= log_entry;
+    if (write_execute_table_log_entry(log_entry->entry_pos,
+                                      FALSE, &exec_log_entry))
+      break;
+    release_part_info_log_entries(old_first_log_entry);
+    unlock_global_table_log();
+    DBUG_RETURN(FALSE);
+  } while (TRUE);
+  release_part_info_log_entries(part_info->first_log_entry);
+  unlock_global_table_log();
+  part_info->first_log_entry= old_first_log_entry;
+  part_info->frm_log_entry= NULL;
+  my_error(ER_TABLE_LOG_ERROR, MYF(0));
+  DBUG_RETURN(TRUE);
+}
+
+
+/*
+  Write the log entries to ensure that the drop partition command is completed
+  even in the presence of a crash.
+
+  SYNOPSIS
+    write_log_drop_partition()
+    lpt                      Struct containing parameters
+  RETURN VALUES
+    TRUE                     Error
+    FALSE                    Success
+  DESCRIPTION
+    Prepare entries to the table log indicating all partitions to drop and to
+    install the shadow frm file and remove the old frm file.
+*/
+
+static
+bool
+write_log_drop_partition(ALTER_PARTITION_PARAM_TYPE *lpt)
+{
+  TABLE_LOG_ENTRY table_log_entry;
+  partition_info *part_info= lpt->part_info;
+  TABLE_LOG_MEMORY_ENTRY *log_entry;
+  TABLE_LOG_MEMORY_ENTRY *exec_log_entry= part_info->exec_log_entry;
+  char tmp_path[FN_LEN];
+  char path[FN_LEN];
+  uint next_entry= 0;
+  TABLE_LOG_MEMORY_ENTRY *old_first_log_entry= part_info->first_log_entry;
+  DBUG_ENTER("write_log_drop_partition");
+
+  part_info->first_log_entry= NULL;
+  build_table_filename(path, sizeof(path), lpt->db,
+                       lpt->table_name, "");
+  build_table_filename(tmp_path, sizeof(tmp_path), lpt->db,
+                       lpt->table_name, "#");
+  lock_global_table_log();
+  do
+  {
+    if (write_log_dropped_partitions(lpt, &next_entry, (const char*)path,
+                                     FALSE))
+      break;
+    if (write_log_replace_delete_frm(lpt, next_entry, (const char*)path,
+                                    (const char*)tmp_path, TRUE))
+      break;
+    log_entry= part_info->first_log_entry;
+    part_info->frm_log_entry= log_entry;
+    if (write_execute_table_log_entry(log_entry->entry_pos,
+                                      FALSE, &exec_log_entry))
+      break;
+    release_part_info_log_entries(old_first_log_entry);
+    unlock_global_table_log();
+    DBUG_RETURN(FALSE); 
+  } while (TRUE);
+  release_part_info_log_entries(part_info->first_log_entry);
+  unlock_global_table_log();
+  part_info->first_log_entry= old_first_log_entry;
+  part_info->frm_log_entry= NULL;
+  my_error(ER_TABLE_LOG_ERROR, MYF(0));
+  DBUG_RETURN(TRUE);
+}
+
+
+/*
+  Write the log entries to ensure that the add partition command is not
+  executed at all if a crash before it has completed
+
+  SYNOPSIS
+    write_log_add_change_partition()
+    lpt                      Struct containing parameters
+  RETURN VALUES
+    TRUE                     Error
+    FALSE                    Success
+  DESCRIPTION
+    Prepare entries to the table log indicating all partitions to drop and to
+    remove the shadow frm file.
+    We always inject entries backwards in the list in the table log since we
+    don't know the entry position until we have written it.
+*/
+
+static
+bool
+write_log_add_change_partition(ALTER_PARTITION_PARAM_TYPE *lpt)
+{
+  partition_info *part_info= lpt->part_info;
+  TABLE_LOG_MEMORY_ENTRY *log_entry;
+  TABLE_LOG_MEMORY_ENTRY *exec_log_entry= NULL;
+  char tmp_path[FN_LEN];
+  char path[FN_LEN];
+  uint next_entry= 0;
+  DBUG_ENTER("write_log_add_change_partition");
+
+  build_table_filename(path, sizeof(path), lpt->db,
+                       lpt->table_name, "");
+  build_table_filename(tmp_path, sizeof(tmp_path), lpt->db,
+                       lpt->table_name, "#");
+  lock_global_table_log();
+  do
+  {
+    if (write_log_dropped_partitions(lpt, &next_entry, (const char*)path,
+                                     FALSE))
+      break;
+    if (write_log_replace_delete_frm(lpt, next_entry, NULL, tmp_path,
+                                    FALSE))
+      break;
+    log_entry= part_info->first_log_entry;
+    if (write_execute_table_log_entry(log_entry->entry_pos,
+                                      FALSE, &exec_log_entry))
+      break;
+    unlock_global_table_log();
+    set_part_info_exec_log_entry(part_info, exec_log_entry);
+    DBUG_RETURN(FALSE); 
+  } while (TRUE);
+  release_part_info_log_entries(part_info->first_log_entry);
+  unlock_global_table_log();
+  part_info->first_log_entry= NULL;
+  my_error(ER_TABLE_LOG_ERROR, MYF(0));
+  DBUG_RETURN(TRUE);
+}
+
+
+/*
+  Write description of how to complete the operation after first phase of
+  change partitions.
+
+  SYNOPSIS
+    write_log_final_change_partition()
+    lpt                      Struct containing parameters
+  RETURN VALUES
+    TRUE                     Error
+    FALSE                    Success
+  DESCRIPTION
+    We will write log entries that specify to remove all partitions reorganised,
+    to rename others to reflect the new naming scheme and to install the shadow
+    frm file.
+*/
+
+static
+bool
+write_log_final_change_partition(ALTER_PARTITION_PARAM_TYPE *lpt)
+{
+  TABLE_LOG_ENTRY table_log_entry;
+  partition_info *part_info= lpt->part_info;
+  TABLE_LOG_MEMORY_ENTRY *log_entry;
+  TABLE_LOG_MEMORY_ENTRY *exec_log_entry= part_info->exec_log_entry;
+  char path[FN_LEN];
+  char shadow_path[FN_LEN];
+  TABLE_LOG_MEMORY_ENTRY *old_first_log_entry= part_info->first_log_entry;
+  uint next_entry= 0;
+  DBUG_ENTER("write_log_final_change_partition");
+
+  part_info->first_log_entry= NULL;
+  build_table_filename(path, sizeof(path), lpt->db,
+                       lpt->table_name, "");
+  build_table_filename(shadow_path, sizeof(shadow_path), lpt->db,
+                       lpt->table_name, "#");
+  lock_global_table_log();
+  do
+  {
+    if (write_log_dropped_partitions(lpt, &next_entry, (const char*)path,
+                                     TRUE))
+      break;
+    if (write_log_changed_partitions(lpt, &next_entry, (const char*)path))
+      break;
+    if (write_log_replace_delete_frm(lpt, 0UL, path, shadow_path, FALSE))
+      break;
+    log_entry= part_info->first_log_entry;
+    part_info->frm_log_entry= log_entry;
+    if (write_execute_table_log_entry(log_entry->entry_pos,
+                                      FALSE, &exec_log_entry))
+      break;
+    release_part_info_log_entries(old_first_log_entry);
+    unlock_global_table_log();
+    DBUG_RETURN(FALSE);
+  } while (TRUE);
+  release_part_info_log_entries(part_info->first_log_entry);
+  unlock_global_table_log();
+  part_info->first_log_entry= old_first_log_entry;
+  part_info->frm_log_entry= NULL;
+  my_error(ER_TABLE_LOG_ERROR, MYF(0));
+  DBUG_RETURN(TRUE);
+}
+
+
+/*
+  Remove entry from table log and release resources for others to use
+
+  SYNOPSIS
+    write_log_completed()
+    lpt                      Struct containing parameters
+  RETURN VALUES
+    TRUE                     Error
+    FALSE                    Success
+*/
+
+static
+void
+write_log_completed(ALTER_PARTITION_PARAM_TYPE *lpt, bool dont_crash)
+{
+  partition_info *part_info= lpt->part_info;
+  uint count_loop= 0;
+  bool not_success;
+  TABLE_LOG_MEMORY_ENTRY *log_entry= part_info->exec_log_entry;
+  DBUG_ENTER("write_log_completed");
+
+  DBUG_ASSERT(log_entry);
+  lock_global_table_log();
+  do
+  {
+    if (!(not_success= write_execute_table_log_entry(0UL, TRUE, &log_entry)))
+      break;
+    my_sleep(1); 
+  } while (count_loop++ < 20);
+  if (not_success && !dont_crash)
+  {
+    /*
+      Failed to write 20 consecutive attempts to write. Bad...
+      We have completed the operation but have log records to REMOVE
+      stuff that shouldn't be removed. What clever things could one do
+      here?
+    */
+    abort();
+  }
+  release_part_info_log_entries(part_info->first_log_entry);
+  release_part_info_log_entries(part_info->exec_log_entry);
+  unlock_global_table_log();
+  part_info->exec_log_entry= NULL;
+  part_info->first_log_entry= NULL;
+  DBUG_VOID_RETURN;
+}
+
+
+/*
+   Release all log entries
+   SYNOPSIS
+     release_log_entries()
+     part_info                  Partition info struct
+   RETURN VALUES
+     NONE
+*/
+
+static
+void
+release_log_entries(partition_info *part_info)
+{
+  lock_global_table_log();
+  release_part_info_log_entries(part_info->first_log_entry);
+  release_part_info_log_entries(part_info->exec_log_entry);
+  unlock_global_table_log();
+  part_info->first_log_entry= NULL;
+  part_info->exec_log_entry= NULL;
+}
+
+
+/*
+  Handle errors for ALTER TABLE for partitioning
+  SYNOPSIS
+    handle_alter_part_error()
+    lpt                        Struct carrying parameters
+    not_completed              Was request in complete phase when error occurred
+  RETURN VALUES
+    NONE
+*/
+
+void
+handle_alter_part_error(ALTER_PARTITION_PARAM_TYPE *lpt, bool not_completed,
+                        bool drop_partition, bool frm_install)
+{
+  partition_info *part_info= lpt->part_info;
+  DBUG_ENTER("handle_alter_part_error");
+
+  if (!part_info->first_log_entry &&
+      execute_table_log_entry(part_info->first_log_entry->entry_pos))
+  {
+    /*
+      We couldn't recover from error, most likely manual interaction is required.
+    */
+    write_log_completed(lpt, FALSE);
+    release_log_entries(part_info);
+    if (not_completed)
+    {
+      char *text1= 
+      (char*)"Operation was unsuccessful, table is still intact, ";
+      if (drop_partition)
+      {
+        /* Table is still ok, but we left a shadow frm file behind. */
+        char *text2=
+      (char*)"but it is possible that a shadow frm file was left behind";
+        push_warning_printf(lpt->thd, MYSQL_ERROR::WARN_LEVEL_WARN, 1,
+                            "%s \n %s", text1, text2);
+      }
+      else
+      {
+        char *text2=
+      (char*)"but it is possible that a shadow frm file was left behind.";
+        char *text3=
+      (char*)"It is also possible that temporary partitions are left behind, ";
+        char *text4=
+      (char*)"these could be empty or more or less filled with records";
+        push_warning_printf(lpt->thd, MYSQL_ERROR::WARN_LEVEL_WARN, 1,
+                            "%s \n %s \n %s \n %s", text1, text2, text3, text4);
+      }
+    }
+    else
+    {
+      if (frm_install)
+      {
+        /*
+           Failed during install of shadow frm file, table isn't intact
+           and dropped partitions are still there
+        */
+        char *text1=
+      (char*)"Failed during alter of partitions, table is no longer intact, ";
+        char *text2=
+      (char*)"The frm file is in an unknown state, and a backup";
+        char *text3=
+      (char*)" is required.";
+        push_warning_printf(lpt->thd, MYSQL_ERROR::WARN_LEVEL_WARN, 1,
+                            "%s \n %s%s\n", text1, text2, text3);
+      }
+      else if (drop_partition)
+      {
+        /*
+          Table is ok, we have switched to new table but left dropped partitions
+          still in their places. We remove the log records and ask the user to
+          perform the action manually. We remove the log records and ask the user
+          to perform the action manually.
+        */
+        char *text1=
+      (char*)"Failed during drop of partitions, table is intact, ";
+        char *text2=
+      (char*)"Manual drop of remaining partitions is required";
+        push_warning_printf(lpt->thd, MYSQL_ERROR::WARN_LEVEL_WARN, 1,
+                            "%s\n%s", text1, text2);
+      }
+      else
+      {
+        /*
+          We failed during renaming of partitions. The table is most certainly in
+          a very bad state so we give user warning and disable the table by
+          writing an ancient frm version into it.
+        */
+        char *text1=
+      (char*)"Failed during renaming of partitions. We are now in a position";
+        char *text2=
+      (char*)" where table is not reusable";
+        char *text3=
+      (char*)"Table is disabled by writing ancient frm file version into it";
+        push_warning_printf(lpt->thd, MYSQL_ERROR::WARN_LEVEL_WARN, 1,
+                            "%s%s\n%s", text1, text2, text3);
+      }
+    }
+    
+  }
+  else
+  {
+    release_log_entries(part_info);
+    if (not_completed)
+    {
+      /*
+        We hit an error before things were completed but managed
+        to recover from the error. An error occurred and we have
+        restored things to original so no need for further action.
+      */
+      ;
+    }
+    else
+    {
+      /*
+        We hit an error after we had completed most of the operation
+        and were successful in a second attempt so the operation
+        actually is successful now. We need to issue a warning that
+        even though we reported an error the operation was successfully
+        completed.
+      */
+      push_warning(lpt->thd, MYSQL_ERROR::WARN_LEVEL_WARN, 1,
+   "Operation was successfully completed after failure of normal operation");
+    }
+  }
+  DBUG_VOID_RETURN;
+}
+
+
+/*
   Actually perform the change requested by ALTER TABLE of partitions
   previously prepared.
 
@@ -5221,9 +5967,12 @@
   ALTER_PARTITION_PARAM_TYPE lpt_obj;
   ALTER_PARTITION_PARAM_TYPE *lpt= &lpt_obj;
   bool written_bin_log= TRUE;
+  bool not_completed= TRUE;
+  bool frm_install= FALSE;
   DBUG_ENTER("fast_alter_partition_table");
 
   lpt->thd= thd;
+  lpt->part_info= part_info;
   lpt->create_info= create_info;
   lpt->create_list= create_list;
   lpt->key_list= key_list;
@@ -5255,17 +6004,18 @@
       In this case it is enough to call optimise_partitions, there is no
       need to change frm files or anything else.
     */
+    int error;
     written_bin_log= FALSE;
     if (((alter_info->flags & ALTER_OPTIMIZE_PARTITION) &&
-         (table->file->optimize_partitions(thd))) ||
+         (error= table->file->optimize_partitions(thd))) ||
         ((alter_info->flags & ALTER_ANALYZE_PARTITION) &&
-         (table->file->analyze_partitions(thd))) ||
+         (error= table->file->analyze_partitions(thd))) ||
         ((alter_info->flags & ALTER_CHECK_PARTITION) &&
-         (table->file->check_partitions(thd))) ||
+         (error= table->file->check_partitions(thd))) ||
         ((alter_info->flags & ALTER_REPAIR_PARTITION) &&
-         (table->file->repair_partitions(thd))))
+         (error= table->file->repair_partitions(thd))))
     {
-      fast_alter_partition_error_handler(lpt);
+      table->file->print_error(error, MYF(0));
       DBUG_RETURN(TRUE);
     }
   }
@@ -5310,10 +6060,9 @@
       1) Write the new frm, pack it and then delete it
       2) Perform the change within the handler
     */
-    if ((mysql_write_frm(lpt, WFRM_INITIAL_WRITE | WFRM_PACK_FRM)) ||
-        (mysql_change_partitions(lpt)))
+    if (mysql_write_frm(lpt, WFRM_WRITE_SHADOW | WFRM_PACK_FRM) ||
+        mysql_change_partitions(lpt))
     {
-      fast_alter_partition_error_handler(lpt);
       DBUG_RETURN(TRUE);
     }
   }
@@ -5341,32 +6090,62 @@
       after a DROP PARTITION) if one ensured that failed accesses to the
       dropped partitions was aborted for sure (thus only possible for
       transactional engines).
-      
-      1) Lock the table in TL_WRITE_ONLY to ensure all other accesses to
+
+      0) Write an entry that removes the shadow frm file if crash occurs 
+      1) Write the new frm file as a shadow frm
+      2) Write the table log to ensure that the operation is completed
+         even in the presence of a MySQL Server crash
+      3) Lock the table in TL_WRITE_ONLY to ensure all other accesses to
          the table have completed
-      2) Write the new frm file where the partitions have changed but are
-         still remaining with the state PART_TO_BE_DROPPED
-      3) Write the bin log
-      4) Prepare MyISAM handlers for drop of partitions
-      5) Ensure that any users that has opened the table but not yet
+      4) Write the bin log
+         Unfortunately the writing of the binlog is not synchronised with
+         other logging activities. So no matter in which order the binlog
+         is written compared to other activities there will always be cases
+         where crashes make strange things occur. In this placement it can
+         happen that the ALTER TABLE DROP PARTITION gets performed in the
+         master but not in the slaves if we have a crash, after writing the
+         table log but before writing the binlog. A solution to this would
+         require writing the statement first in the table log and then
+         when recovering from the crash read the binlog and insert it into
+         the binlog if not written already.
+      5) Install the previously written shadow frm file
+      6) Ensure that any users that has opened the table but not yet
          reached the abort lock do that before downgrading the lock.
-      6) Drop the partitions
-      7) Write the frm file that the partition has been dropped
-      8) Wait until all accesses using the old frm file has completed
-      9) Complete query
+      7) Prepare MyISAM handlers for drop of partitions
+      8) Drop the partitions
+      9) Remove entries from table log
+      10) Wait until all accesses using the old frm file has completed
+      11) Complete query
+
+      We insert Error injections at all places where it could be interesting
+      to test if recovery is properly done.
     */
-    if ((abort_and_upgrade_lock(lpt)) ||
-        (mysql_write_frm(lpt, WFRM_INITIAL_WRITE)) ||
+    if (write_log_drop_shadow_frm(lpt) ||
+        ERROR_INJECT_CRASH("crash_drop_partition_1") ||
+        mysql_write_frm(lpt, WFRM_WRITE_SHADOW) ||
+        ERROR_INJECT_CRASH("crash_drop_partition_2") ||
+        write_log_drop_partition(lpt) ||
+        ERROR_INJECT_CRASH("crash_drop_partition_3") ||
+        ((not_completed= FALSE), FALSE) ||
+        (abort_and_upgrade_lock(lpt, FALSE), FALSE) ||
         ((!thd->lex->no_write_to_binlog) &&
          (write_bin_log(thd, FALSE,
-                       thd->query, thd->query_length), FALSE)) ||
-        (table->file->extra(HA_EXTRA_PREPARE_FOR_DELETE)) ||
+                        thd->query, thd->query_length), FALSE)) ||
+        ERROR_INJECT_CRASH("crash_drop_partition_4") ||
+        (table->file->extra(HA_EXTRA_PREPARE_FOR_DELETE), FALSE) ||
+        ERROR_INJECT_CRASH("crash_drop_partition_5") ||
+        ((frm_install= TRUE), FALSE) ||
+        mysql_write_frm(lpt, WFRM_INSTALL_SHADOW) ||
+        ((frm_install= FALSE), FALSE) ||
         (close_open_tables_and_downgrade(lpt), FALSE) || 
-        (mysql_drop_partitions(lpt)) ||
-        (mysql_write_frm(lpt, WFRM_CREATE_HANDLER_FILES)) ||
+        ERROR_INJECT_CRASH("crash_drop_partition_6") ||
+        mysql_drop_partitions(lpt) ||
+        ERROR_INJECT_CRASH("crash_drop_partition_7") ||
+        (write_log_completed(lpt, FALSE), FALSE) ||
+        ERROR_INJECT_CRASH("crash_drop_partition_8") ||
         (mysql_wait_completed_table(lpt, table), FALSE))
     {
-      fast_alter_partition_error_handler(lpt);
+      handle_alter_part_error(lpt, not_completed, TRUE, frm_install);
       DBUG_RETURN(TRUE);
     }
   }
@@ -5383,28 +6162,45 @@
       miss updates made by a transaction serialised before it that are
       inserted into the new partition.
 
-      1) Write the new frm file where state of added partitions is
-         changed to PART_TO_BE_ADDED
+      0) Write an entry that removes the shadow frm file if crash occurs 
+      1) Write the new frm file as a shadow frm file
+      2) Log the changes to happen in table log
       2) Add the new partitions
       3) Lock all partitions in TL_WRITE_ONLY to ensure that no users
          are still using the old partitioning scheme. Wait until all
          ongoing users have completed before progressing.
-      4) Write a new frm file of the table where the partitions are added
-         to the table.
-      5) Write binlog
-      6) Wait until all accesses using the old frm file has completed
-      7) Complete query
+      4) Write binlog
+      5) Now the change is completed except for the installation of the
+         new frm file. We thus write an action in the log to change to
+         the shadow frm file
+      6) Install the new frm file of the table where the partitions are
+         added to the table.
+      7) Wait until all accesses using the old frm file has completed
+      8) Remove entries from table log
+      9) Complete query
     */
-    if ((mysql_write_frm(lpt, WFRM_INITIAL_WRITE)) ||
-        (mysql_change_partitions(lpt)) ||
-        (abort_and_upgrade_lock(lpt)) ||
-        (mysql_write_frm(lpt, WFRM_CREATE_HANDLER_FILES)) ||
+    if (write_log_add_change_partition(lpt) ||
+        ERROR_INJECT_CRASH("crash_add_partition_1") ||
+        mysql_write_frm(lpt, WFRM_WRITE_SHADOW) ||
+        ERROR_INJECT_CRASH("crash_add_partition_2") ||
+        mysql_change_partitions(lpt) ||
+        ERROR_INJECT_CRASH("crash_add_partition_3") ||
+        (abort_and_upgrade_lock(lpt, FALSE), FALSE) ||
         ((!thd->lex->no_write_to_binlog) &&
          (write_bin_log(thd, FALSE,
                         thd->query, thd->query_length), FALSE)) ||
-        (close_open_tables_and_downgrade(lpt), FALSE))
+        ERROR_INJECT_CRASH("crash_add_partition_4") ||
+        write_log_rename_frm(lpt) ||
+        ((not_completed= FALSE), FALSE) ||
+        ERROR_INJECT_CRASH("crash_add_partition_5") ||
+        ((frm_install= TRUE), FALSE) ||
+        mysql_write_frm(lpt, WFRM_INSTALL_SHADOW) ||
+        ERROR_INJECT_CRASH("crash_add_partition_6") ||
+        (close_open_tables_and_downgrade(lpt), FALSE) ||
+        (write_log_completed(lpt, FALSE), FALSE) ||
+        ERROR_INJECT_CRASH("crash_add_partition_7")) 
     {
-      fast_alter_partition_error_handler(lpt);
+      handle_alter_part_error(lpt, not_completed, FALSE, frm_install);
       DBUG_RETURN(TRUE);
     }
   }
@@ -5441,44 +6237,57 @@
       use a lower lock level. This can be handled inside store_lock in the
       respective handler.
 
-      1) Write the new frm file where state of added partitions is
-         changed to PART_TO_BE_ADDED and the reorganised partitions
-         are set in state PART_TO_BE_REORGED.
-      2) Add the new partitions
+      0) Write an entry that removes the shadow frm file if crash occurs 
+      1) Write the shadow frm file of new partitioning
+      2) Log such that temporary partitions added in change phase are
+         removed in a crash situation
+      3) Add the new partitions
          Copy from the reorganised partitions to the new partitions
-      3) Lock all partitions in TL_WRITE_ONLY to ensure that no users
+      4) Log that operation is completed and log all complete actions
+         needed to complete operation from here
+      5) Lock all partitions in TL_WRITE_ONLY to ensure that no users
          are still using the old partitioning scheme. Wait until all
          ongoing users have completed before progressing.
-      4) Prepare MyISAM handlers for rename and delete of partitions
-      5) Write a new frm file of the table where the partitions are
-         reorganised.
-      6) Rename the reorged partitions such that they are no longer
+      6) Prepare MyISAM handlers for rename and delete of partitions
+      7) Rename the reorged partitions such that they are no longer
          used and rename those added to their real new names.
-      7) Write bin log
-      8) Wait until all accesses using the old frm file has completed
-      9) Drop the reorganised partitions
-      10)Write a new frm file of the table where the partitions are
-         reorganised.
-      11)Wait until all accesses using the old frm file has completed
-      12)Complete query
+      8) Write bin log
+      9) Install the shadow frm file
+      10) Wait until all accesses using the old frm file has completed
+      11) Drop the reorganised partitions
+      12) Remove log entry
+      13)Wait until all accesses using the old frm file has completed
+      14)Complete query
     */
-
-    if ((mysql_write_frm(lpt, WFRM_INITIAL_WRITE)) ||
-        (mysql_change_partitions(lpt)) ||
-        (abort_and_upgrade_lock(lpt)) ||
-        (mysql_write_frm(lpt, WFRM_CREATE_HANDLER_FILES)) ||
-        (table->file->extra(HA_EXTRA_PREPARE_FOR_DELETE)) ||
-        (mysql_rename_partitions(lpt)) ||
+    if (write_log_add_change_partition(lpt) ||
+        ERROR_INJECT_CRASH("crash_change_partition_1") ||
+        mysql_write_frm(lpt, WFRM_WRITE_SHADOW) ||
+        ERROR_INJECT_CRASH("crash_change_partition_2") ||
+        mysql_change_partitions(lpt) ||
+        ERROR_INJECT_CRASH("crash_change_partition_3") ||
+        write_log_final_change_partition(lpt) ||
+        ERROR_INJECT_CRASH("crash_change_partition_4") ||
+        ((not_completed= FALSE), FALSE) ||
+        (abort_and_upgrade_lock(lpt, FALSE), FALSE) ||
         ((!thd->lex->no_write_to_binlog) &&
          (write_bin_log(thd, FALSE,
                         thd->query, thd->query_length), FALSE)) ||
+        ERROR_INJECT_CRASH("crash_change_partition_5") ||
+        (table->file->extra(HA_EXTRA_PREPARE_FOR_DELETE), FALSE) ||
+        ERROR_INJECT_CRASH("crash_change_partition_6") ||
+        mysql_rename_partitions(lpt) ||
+        ((frm_install= TRUE), FALSE) ||
+        ERROR_INJECT_CRASH("crash_change_partition_7") ||
+        mysql_write_frm(lpt, WFRM_INSTALL_SHADOW) ||
+        ERROR_INJECT_CRASH("crash_change_partition_8") ||
         (close_open_tables_and_downgrade(lpt), FALSE) ||
-        (mysql_drop_partitions(lpt)) ||
-        (mysql_write_frm(lpt, 0UL)) ||
+        ERROR_INJECT_CRASH("crash_change_partition_9") ||
+        (write_log_completed(lpt, FALSE), FALSE) ||
+        ERROR_INJECT_CRASH("crash_change_partition_10") ||
         (mysql_wait_completed_table(lpt, table), FALSE))
     {
-        fast_alter_partition_error_handler(lpt);
-        DBUG_RETURN(TRUE);
+      handle_alter_part_error(lpt, not_completed, FALSE, frm_install);
+      DBUG_RETURN(TRUE);
     }
   }
   /*
@@ -5486,7 +6295,7 @@
     user
   */
   DBUG_RETURN(fast_end_partition(thd, lpt->copied, lpt->deleted,
-                                 table_list, FALSE, lpt,
+                                 table, table_list, FALSE, lpt,
                                  written_bin_log));
 }
 #endif
@@ -6087,6 +6896,88 @@
   field->store(part_iter->field_vals.start, FALSE);
   part_iter->field_vals.start++;
   return part_iter->part_info->get_subpartition_id(part_iter->part_info);
+}
+
+
+/*
+  Create partition names
+
+  SYNOPSIS
+    create_partition_name()
+    out:out                   Created partition name string
+    in1                       First part
+    in2                       Second part
+    name_variant              Normal, temporary or renamed partition name
+
+  RETURN VALUE
+    NONE
+
+  DESCRIPTION
+    This method is used to calculate the partition name, service routine to
+    the del_ren_cre_table method.
+*/
+
+void
+create_partition_name(char *out, const char *in1,
+                      const char *in2, uint name_variant,
+                      bool translate)
+{
+  char transl_part_name[FN_REFLEN];
+  const char *transl_part;
+
+  if (translate)
+  {
+    tablename_to_filename(in2, transl_part_name, FN_REFLEN);
+    transl_part= transl_part_name;
+  }
+  else
+    transl_part= in2;
+  if (name_variant == NORMAL_PART_NAME)
+    strxmov(out, in1, "#P#", transl_part, NullS);
+  else if (name_variant == TEMP_PART_NAME)
+    strxmov(out, in1, "#P#", transl_part, "#TMP#", NullS);
+  else if (name_variant == RENAMED_PART_NAME)
+    strxmov(out, in1, "#P#", transl_part, "#REN#", NullS);
+}
+
+
+/*
+  Create subpartition name
+
+  SYNOPSIS
+    create_subpartition_name()
+    out:out                   Created partition name string
+    in1                       First part
+    in2                       Second part
+    in3                       Third part
+    name_variant              Normal, temporary or renamed partition name
+
+  RETURN VALUE
+    NONE
+
+  DESCRIPTION
+  This method is used to calculate the subpartition name, service routine to
+  the del_ren_cre_table method.
+*/
+
+void
+create_subpartition_name(char *out, const char *in1,
+                         const char *in2, const char *in3,
+                         uint name_variant)
+{
+  char transl_part_name[FN_REFLEN], transl_subpart_name[FN_REFLEN];
+
+  tablename_to_filename(in2, transl_part_name, FN_REFLEN);
+  tablename_to_filename(in3, transl_subpart_name, FN_REFLEN);
+  if (name_variant == NORMAL_PART_NAME)
+    strxmov(out, in1, "#P#", transl_part_name,
+            "#SP#", transl_subpart_name, NullS);
+  else if (name_variant == TEMP_PART_NAME)
+    strxmov(out, in1, "#P#", transl_part_name,
+            "#SP#", transl_subpart_name, "#TMP#", NullS);
+  else if (name_variant == RENAMED_PART_NAME)
+    strxmov(out, in1, "#P#", transl_part_name,
+            "#SP#", transl_subpart_name, "#REN#", NullS);
 }
 #endif
 

--- 1.17/BUILD/compile-pentium-debug-max	2005-02-05 15:05:42 +01:00
+++ 1.18/BUILD/compile-pentium-debug-max	2006-02-21 07:45:00 +01:00
@@ -6,6 +6,6 @@
 extra_flags="$pentium_cflags $debug_cflags $max_cflags"
 c_warnings="$c_warnings $debug_extra_warnings"
 cxx_warnings="$cxx_warnings $debug_extra_warnings"
-extra_configs="$pentium_configs $debug_configs $max_configs"
+extra_configs="$pentium_configs $debug_configs $max_configs $error_inject"
 
 . "$path/FINISH.sh"

--- 1.53/BUILD/SETUP.sh	2006-01-23 22:17:07 +01:00
+++ 1.54/BUILD/SETUP.sh	2006-02-21 07:45:00 +01:00
@@ -71,6 +71,7 @@
 pentium64_cflags="$check_cpu_cflags -m64"
 ppc_cflags="$check_cpu_cflags"
 sparc_cflags=""
+error_inject="--with-error-inject "
 
 # be as fast as we can be without losing our ability to backtrace
 fast_cflags="-O3 -fno-omit-frame-pointer"
Thread
bk commit into 5.1 tree (mikron:1.2149)mikael21 Feb