List:Commits« Previous MessageNext Message »
From:Mattias Jonsson Date:February 17 2009 2:04pm
Subject:bzr commit into mysql-5.1 branch (mattias.jonsson:2799)
View as plain text  
#At file:///Users/mattiasj/clones/bzrroot/wl3513-51-pv_part2/ based on revid:mattias.jonsson@stripped

 2799 Mattias Jonsson	2009-02-17
      Work in progress
      
      Added handler::open_handler + handler::open_files to lower
      the number of open files in partitioned tables.
modified:
  include/myisam.h
  mysql-test/r/partition_1024_parts.result
  mysql-test/t/partition_1024_parts.test
  sql/ha_partition.cc
  sql/ha_partition.h
  sql/handler.cc
  sql/handler.h
  sql/mysqld.cc
  sql/set_var.cc
  sql/sql_class.h
  sql/sql_partition.cc
  storage/myisam/ha_myisam.cc
  storage/myisam/ha_myisam.h
  storage/myisam/mi_close.c
  storage/myisam/mi_extra.c
  storage/myisam/mi_locking.c
  storage/myisam/mi_open.c
  storage/myisam/myisamdef.h

=== modified file 'include/myisam.h'
--- a/include/myisam.h	2008-08-23 02:47:43 +0000
+++ b/include/myisam.h	2009-02-17 14:04:29 +0000
@@ -263,9 +263,16 @@ extern int (*myisam_test_invalid_symlink
 	/* Prototypes for myisam-functions */
 
 extern int mi_close(struct st_myisam_info *file);
+extern int mi_close_files(struct st_myisam_info *file);
+extern int mi_close_handler(struct st_myisam_info *file);
 extern int mi_delete(struct st_myisam_info *file,const uchar *buff);
 extern struct st_myisam_info *mi_open(const char *name,int mode,
 				      uint wait_if_locked);
+extern struct st_myisam_info *mi_open_handler(const char *name,int mode,
+                                              uint wait_if_locked);
+extern struct st_myisam_info *mi_open_files(struct st_myisam_info *file,
+                                            const char *name,int mode,
+                                            uint wait_if_locked);
 extern int mi_panic(enum ha_panic_function function);
 extern int mi_rfirst(struct st_myisam_info *file,uchar *buf,int inx);
 extern int mi_rkey(MI_INFO *info, uchar *buf, int inx, const uchar *key,

=== modified file 'mysql-test/r/partition_1024_parts.result'
--- a/mysql-test/r/partition_1024_parts.result	2009-02-13 08:45:03 +0000
+++ b/mysql-test/r/partition_1024_parts.result	2009-02-17 14:04:29 +0000
@@ -1,4 +1,9 @@
 drop table if exists t1, t2;
+SET @old_max:= @@max_open_partition_files;
+SET @@max_open_partition_files= 512;
+show variables like 'max_open_partition_files';
+Variable_name	Value
+max_open_partition_files	512
 CREATE TABLE t1
 (a INT PRIMARY KEY,
 b varchar(63),
@@ -5146,3 +5151,4 @@ sum(c)
 99805
 # clean up
 DROP TABLE t1;
+SET @@max_open_partition_files= @old_max;

=== modified file 'mysql-test/t/partition_1024_parts.test'
--- a/mysql-test/t/partition_1024_parts.test	2009-02-13 08:45:03 +0000
+++ b/mysql-test/t/partition_1024_parts.test	2009-02-17 14:04:29 +0000
@@ -7,11 +7,14 @@ drop table if exists t1, t2;
 # Verify that it does not deadlock when open/locking the partitions
 # concurrently in different orders.
 
+SET @old_max:= @@max_open_partition_files;
+SET @@max_open_partition_files= 512;
+SHOW VARIABLES LIKE 'max_open_partition_files';
 CREATE TABLE t1
 (a INT PRIMARY KEY,
- b varchar(63),
- c int,
- key(c, b))
+ b VARCHAR(63),
+ c INT,
+ KEY(c, b))
 PARTITION BY HASH (a)
 PARTITIONS 1024;
 FLUSH TABLES;
@@ -1054,7 +1057,7 @@ reap;
 connection default;
 # Will also chech the index_* functions
 --echo # verify result (via index)
-SELECT * FROM t1 order by a;
+SELECT * FROM t1 ORDER BY a;
 FLUSH TABLES;
 connection decrease_conn;
 --echo # fork off a SELECT sum in decrease_conn
@@ -1071,3 +1074,4 @@ disconnect decrease_conn;
 connection default;
 --echo # clean up
 DROP TABLE t1;
+SET @@max_open_partition_files= @old_max;

=== modified file 'sql/ha_partition.cc'
--- a/sql/ha_partition.cc	2009-02-13 11:04:15 +0000
+++ b/sql/ha_partition.cc	2009-02-17 14:04:29 +0000
@@ -247,6 +247,9 @@ void ha_partition::init_handler_variable
     this allows blackhole to work properly
   */
   m_no_locks= 0;
+  m_have_open_files= FALSE;
+  m_open_lru.entry= NULL;
+  m_open_lru.first= NO_CURRENT_PART_ID;
   m_index_inited= MAX_KEY;
 
 #ifdef DONT_HAVE_TO_BE_INITALIZED
@@ -284,6 +287,7 @@ ha_partition::~ha_partition()
       delete m_file[i];
   }
   my_free((char*) m_ordered_rec_buffer, MYF(MY_ALLOW_ZERO_PTR));
+  my_free((char*) m_open_lru.entry, MYF(MY_ALLOW_ZERO_PTR));
 
   clear_handler_file();
   DBUG_VOID_RETURN;
@@ -1092,7 +1096,7 @@ int ha_partition::handle_opt_partitions(
                           sub_elem->partition_name))
             DBUG_RETURN(HA_ADMIN_INTERNAL_ERROR);
 #endif
-          if ((error= prepare_use_partition(part, FALSE)) ||
+          if ((error= open_partition(part, FALSE)) ||
               (error= handle_opt_part(thd, check_opt, m_file[part], flag)))
           {
             /* print a line which partition the error belongs to */
@@ -1120,7 +1124,7 @@ int ha_partition::handle_opt_partitions(
                         part_elem->partition_name))
           DBUG_RETURN(HA_ADMIN_INTERNAL_ERROR);
 #endif
-        if ((error= prepare_use_partition(i, FALSE)) ||
+        if ((error= open_partition(i, FALSE)) ||
             (error= handle_opt_part(thd, check_opt, m_file[i], flag)))
         {
           /* print a line which partition the error belongs to */
@@ -1157,7 +1161,7 @@ bool ha_partition::check_and_repair(THD 
 
   do
   {
-    if (prepare_use_partition(file - m_file, FALSE) ||
+    if (open_partition(file - m_file, FALSE) ||
         (*file)->ha_check_and_repair(thd))
       DBUG_RETURN(TRUE);
   } while (*(++file));
@@ -1235,6 +1239,10 @@ int ha_partition::prepare_new_partition(
   if ((error= file->ha_create(part_name, tbl, create_info)))
     goto error;
   create_flag= TRUE;
+  /* The new files is fully opened, to avoid excessive open/close when copy */
+  if (m_have_open_files)
+    table->db_stat&= ~HA_OPEN_ONLY_FILES;
+
   if ((error= file->ha_open(tbl, part_name, m_mode, m_open_test_lock)))
     goto error;
   /*
@@ -1246,8 +1254,12 @@ int ha_partition::prepare_new_partition(
   if ((error= file->ha_external_lock(ha_thd(), m_lock_type)))
     goto error;
 
+  if (m_have_open_files)
+    table->db_stat|= HA_OPEN_ONLY_FILES;
   DBUG_RETURN(0);
 error:
+  if (m_have_open_files)
+    table->db_stat|= HA_OPEN_ONLY_FILES;
   if (create_flag)
     VOID(file->ha_delete_table(part_name));
   DBUG_RETURN(error);
@@ -1288,12 +1300,15 @@ void ha_partition::cleanup_new_partition
 
   if (m_added_file && m_added_file[0])
   {
+    bool saved_have_open_files= m_have_open_files;
     m_file= m_added_file;
     m_added_file= NULL;
+    m_have_open_files= FALSE;
 
     /* delete_table also needed, a bit more complex */
     close();
 
+    m_have_open_files= saved_have_open_files;
     m_added_file= m_file;
     m_file= save_m_file;
   }
@@ -1621,6 +1636,9 @@ int ha_partition::change_partitions(HA_C
     change_partitions has done all the preparations, now it is time to
     actually copy the data from the reorganised partitions to the new
     partitions.
+
+  NOTE
+    This function will not use separate open for partitions handler and files
 */
 
 int ha_partition::copy_partitions(ulonglong * const copied,
@@ -1652,7 +1670,7 @@ int ha_partition::copy_partitions(ulongl
     */
     for (from_part_id= 0; file != m_file[from_part_id]; from_part_id++)
       ;
-    if ((result= prepare_use_partition(from_part_id, FALSE)))
+    if ((result= open_partition(from_part_id, FALSE)))
       goto error;
     if ((result= file->ha_rnd_init(1)))
       goto error;
@@ -1687,6 +1705,7 @@ int ha_partition::copy_partitions(ulongl
         /* Copy record to new handler */
         (*copied)++;
         tmp_disable_binlog(thd); /* Do not replicate the low-level changes. */
+        /* m_new_file is always fully opened (not only open_handler) */
         result= m_new_file[new_part]->ha_write_row(m_rec0);
         reenable_binlog(thd);
         if (result)
@@ -1819,6 +1838,7 @@ uint ha_partition::del_ren_cre_table(con
 
   if (get_from_handler_file(from, ha_thd()->mem_root))
     DBUG_RETURN(TRUE);
+  DBUG_ASSERT(!m_have_open_files);
   DBUG_ASSERT(m_file_buffer);
   DBUG_PRINT("enter", ("from: (%s) to: (%s)", from, to));
   name_buffer_ptr= m_name_buffer_ptr;
@@ -1910,7 +1930,7 @@ partition_element *ha_partition::find_pa
   }
   DBUG_ASSERT(0);
   my_error(ER_OUT_OF_RESOURCES, MYF(0));
-  current_thd->fatal_error();                   // Abort
+  ha_thd()->fatal_error();                      // Abort
   return NULL;
 }
 
@@ -2410,7 +2430,7 @@ int ha_partition::open(const char *name,
 {
   char *name_buffer_ptr= m_name_buffer_ptr;
   int error= HA_ERR_INITIALIZATION;
-  uint alloc_len;
+  uint i, alloc_len, curr_part_id= 0;
   handler **file;
   char name_buff[FN_REFLEN];
   bool is_not_tmp_table= (table_share->tmp_table == NO_TMP_TABLE);
@@ -2454,7 +2474,10 @@ int ha_partition::open(const char *name,
     }
   }
 
-  /* Initialize the bitmap we use to minimize ha_start_bulk_insert calls */
+  if (!(m_name_buffer_idx= (uint*)my_malloc(m_tot_parts * sizeof(uint),
+                                             MYF(MY_WME))))
+    DBUG_RETURN(1);
+  /* Initialize the bitmap we use to minimize ha_start_bulk_insert calls. */
   if (bitmap_init(&m_bulk_insert_started, NULL, m_tot_parts, FALSE))
     DBUG_RETURN(1);
   m_bulk_insert_active= FALSE;
@@ -2487,13 +2510,74 @@ int ha_partition::open(const char *name,
     goto err_handler;
 
   file= m_file;
+  m_open_lru.first= NO_CURRENT_PART_ID;
+  if (!(m_open_lru.entry= (struct lru_entry_st*)
+                          my_malloc(sizeof(struct lru_entry_st) * m_tot_parts,
+                          MYF(MY_WME))))
+    goto err_handler;
+  for (i= 0; i < m_tot_parts; i++)
+  {
+    m_open_lru.entry[i].next= i + 1;
+    m_open_lru.entry[i].part_id= NO_CURRENT_PART_ID;
+  }
+  m_open_lru.entry[m_tot_parts - 1].next= NO_CURRENT_PART_ID;
+  m_open_lru.empty_list= 0;
+  if (table->s->next_number_keypart)
+  {
+    /* Would give very bad performance, and hard to handle */
+    m_have_open_files= FALSE;
+  }
+  else
+  {
+    m_have_open_files= TRUE;
+    table->db_stat|= HA_OPEN_NO_FILES;
+  }
+  strmov(m_name, name);
   do
   {
+    /*
+      TODO: Could gain speed for some memory cost by save this conversion 
+      if m_have_open_files
+    */
     create_partition_name(name_buff, name, name_buffer_ptr, NORMAL_PART_NAME,
                           FALSE);
     if ((error= (*file)->ha_open(table, (const char*) name_buff, mode,
                                  test_if_locked)))
-      goto err_handler;
+    {
+      /*
+        If the first partition does not support separate open functions for
+        initialize the handler and opening the files, then don't use it
+      */
+      if (error == HA_ERR_WRONG_COMMAND && m_have_open_files && file == m_file)
+      {
+        table->db_stat&= ~HA_OPEN_NO_FILES;
+        m_have_open_files= FALSE;
+        if ((error= (*file)->ha_open(table, (const char*) name_buff, mode,
+                                     test_if_locked)))
+          goto err_handler;
+      }
+      else
+        goto err_handler;
+    }
+    /* open the first partitions file for accurate data for ref_length etc. */
+    if (m_have_open_files)
+    {
+      m_name_buffer_idx[curr_part_id]= name_buffer_ptr - m_name_buffer_ptr;
+      DBUG_PRINT("info", ("m_name_buffer_idx %p [%d] %d", m_name_buffer_idx,
+                          curr_part_id, m_name_buffer_idx[curr_part_id]));
+      curr_part_id++;
+      if (file == m_file)
+      {
+        table->db_stat&= ~HA_OPEN_NO_FILES;
+        table->db_stat|= HA_OPEN_ONLY_FILES;
+        if (bitmap_init(&m_open_partitions, NULL, m_tot_parts, TRUE))
+          goto err_handler;
+        if ((error= open_partition(0, FALSE)))
+          goto err_handler;
+        table->db_stat&= ~HA_OPEN_ONLY_FILES;
+        table->db_stat|= HA_OPEN_NO_FILES;
+      }
+    }
     m_no_locks+= (*file)->lock_count();
     name_buffer_ptr+= strlen(name_buffer_ptr) + 1;
     set_if_bigger(ref_length, ((*file)->ref_length));
@@ -2514,6 +2598,25 @@ int ha_partition::open(const char *name,
       goto err_handler;
     }
   } while (*(++file));
+  if (m_have_open_files)
+  {
+    char *tmp;
+    uint part_name_tot_size;
+    DBUG_PRINT("info", ("All partitions support open_files"));
+    table->db_stat&= ~HA_OPEN_NO_FILES;
+    table->db_stat|= HA_OPEN_ONLY_FILES;
+    /*
+      Save a copy of all partitioning names in m_name_buffer_ptr,
+      since it is freed in clear_handle_file below.
+      This is to avoid walking in the part_info list for finding the
+      partitioning name in open_partition.
+    */
+    part_name_tot_size= name_buffer_ptr - m_name_buffer_ptr;
+    if (!(tmp= (char*) my_malloc(part_name_tot_size, MYF(MY_WME))))
+      goto err_handler;
+    memcpy(tmp, m_name_buffer_ptr, part_name_tot_size);
+    m_name_buffer_ptr= tmp;
+  }
   key_used_on_scan= m_file[0]->key_used_on_scan;
   implicit_emptied= m_file[0]->implicit_emptied;
   /*
@@ -2571,12 +2674,34 @@ int ha_partition::open(const char *name,
                             m_part_info->part_expr->get_monotonicity_info();
   else if (m_part_info->list_of_part_fields)
     m_part_func_monotonicity_info= MONOTONIC_STRICT_INCREASING;
+  /*
+    HA_STATUS_VARIABLE forces open of all partitions, must be done to
+    support myisam auto-repair.
+  */
   info(HA_STATUS_VARIABLE | HA_STATUS_CONST);
   DBUG_RETURN(0);
 
 err_handler:
+  if (m_have_open_files)
+  {
+    table->db_stat&= ~HA_OPEN_NO_FILES;
+    table->db_stat|= HA_OPEN_ONLY_FILES;
+    close_partitions(TRUE);
+    if (file == m_file && m_name_buffer_ptr != name_buffer_ptr)
+    {
+      table->db_stat&= ~HA_OPEN_ONLY_FILES;
+      table->db_stat|= HA_OPEN_NO_FILES;
+      (*file)->close_handler();
+    }
+  }
+
   while (file-- != m_file)
-    (*file)->close();
+  {
+    if (m_have_open_files)
+      (*file)->close_handler();
+    else
+      (*file)->close();
+  }
   bitmap_free(&m_bulk_insert_started);
   bitmap_free(&m_reset_partitions);
   bitmap_free(&m_rnd_partitions);
@@ -2584,6 +2709,13 @@ err_handler:
   bitmap_free(&m_cache_partitions);
   if (!is_clone)
     bitmap_free(&(m_part_info->used_partitions));
+  if (m_have_open_files)
+  {
+    bitmap_free(&m_open_partitions);
+    table->db_stat&= ~HA_OPEN_ONLY_FILES;
+    table->db_stat&= ~HA_OPEN_NO_FILES;
+  }
+  my_free((char*) m_name_buffer_idx, MYF(MY_ALLOW_ZERO_PTR));
 
   DBUG_RETURN(error);
 }
@@ -2603,6 +2735,57 @@ handler *ha_partition::clone(MEM_ROOT *m
 }
 
 
+int ha_partition::close_partitions(bool close_all)
+{
+  uint part_id, i= m_open_lru.first, part_count= 0, prev= NO_CURRENT_PART_ID;
+  uint max_open_partitions= ha_thd()->variables.max_open_partition_files;
+  int error, saved_error= 0;
+  DBUG_ENTER("ha_partition::close_partitions");
+  DBUG_ASSERT(m_have_open_files);
+  table->db_stat|= HA_OPEN_ONLY_FILES;
+  table->db_stat&= ~HA_OPEN_NO_FILES;
+  while (i != NO_CURRENT_PART_ID)
+  {
+    part_id= m_open_lru.entry[i].part_id;
+    if (m_bulk_insert_active &&
+        bitmap_is_set(&m_bulk_insert_started, part_id))
+    {
+      (void) m_file[part_id]->ha_end_bulk_insert();
+      bitmap_clear_bit(&m_bulk_insert_started, part_id);
+    }
+    if (close_all || part_count >= max_open_partitions)
+    {
+      uint next_i= i;
+      DBUG_PRINT("info", ("closing partition %u (i %u next %u count %u)",
+                          part_id, i, m_open_lru.entry[i].next, part_count));
+      if ((error= m_file[part_id]->close_files() && !saved_error))
+        saved_error= error;
+      if (prev != NO_CURRENT_PART_ID)
+        m_open_lru.entry[prev].next= m_open_lru.entry[i].next;
+      else
+        m_open_lru.first= NO_CURRENT_PART_ID;
+      next_i= m_open_lru.entry[i].next;
+      m_open_lru.entry[i].next= m_open_lru.empty_list;
+      m_open_lru.empty_list= i;
+      m_open_lru.entry[i].part_id= NO_CURRENT_PART_ID;
+      bitmap_clear_bit(&m_open_partitions, part_id);
+      i= next_i;
+    }
+    else
+    {
+      prev= i;
+      i= m_open_lru.entry[i].next;
+    }
+    part_count++;
+  }
+#ifndef DBUG_OFF
+  if (close_all)
+    DBUG_ASSERT(bitmap_is_clear_all(&m_open_partitions));
+#endif
+  DBUG_RETURN(saved_error);
+}
+
+
 /*
   Close handler object
 
@@ -2629,6 +2812,13 @@ int ha_partition::close(void)
 
   DBUG_ASSERT(table->s == table_share);
   delete_queue(&m_queue);
+  if (m_have_open_files)
+  {
+    (void) close_partitions(TRUE);
+    table->db_stat&= ~HA_OPEN_ONLY_FILES;
+    table->db_stat|= HA_OPEN_NO_FILES;
+    bitmap_free(&m_open_partitions);
+  }
   bitmap_free(&m_bulk_insert_started);
   bitmap_free(&m_reset_partitions);
   bitmap_free(&m_rnd_partitions);
@@ -2640,9 +2830,15 @@ int ha_partition::close(void)
 repeat:
   do
   {
-    (*file)->close();
+    if (m_have_open_files && first)
+      (*file)->close_handler();
+    else
+      (*file)->close();
   } while (*(++file));
 
+  if (m_have_open_files && first)
+    table->db_stat&= ~HA_OPEN_NO_FILES;
+
   if (first && m_added_file && m_added_file[0])
   {
     file= m_added_file;
@@ -2650,15 +2846,15 @@ repeat:
     goto repeat;
   }
 
+  m_have_open_files= FALSE;
   m_handler_status= handler_closed;
   DBUG_RETURN(0);
 }
 
 
 /*
-  prepare_use_partition called directly before use of a partition,
-  to initialize index_init, rnd_init, bulk_start_insert etc.
-  if not done previously.
+  prepare_use_partition called directly after open of a partition,
+  or before use of the partition, to initiate it if not previously used.
 */
 int ha_partition::prepare_use_partition(uint part_id, bool insert)
 {
@@ -2712,10 +2908,6 @@ int ha_partition::prepare_use_partition(
 }
 
 
-#ifdef NOT_USED
-/*
-  This will be enabled when one can close the partitions files without
-  closing the partitions handler.
 /*
   prepare_close_partition, called before close partition, to end started
   operations.
@@ -2746,8 +2938,158 @@ int ha_partition::prepare_close_partitio
   }
   DBUG_RETURN(error);
 }
+
+
+int ha_partition::open_partition(uint part_id, bool is_insert)
+{
+  uint i, prev= NO_CURRENT_PART_ID;
+  int error= 0;
+  DBUG_ENTER("open_partition");
+  if (!m_have_open_files)
+    DBUG_RETURN(prepare_use_partition(part_id, is_insert));
+
+  table->db_stat|= HA_OPEN_ONLY_FILES;
+  table->db_stat&= ~HA_OPEN_NO_FILES;
+  /*
+    Use a LRU with max_open_partitions entries, if open then continue,
+    if needed close the last used partition, then open the partition.
+    (Another way would be to record the first max_open_partitions used
+    partitions and close all the other in reset/statement end.)
+  */
+  i= m_open_lru.first;
+  if (i == NO_CURRENT_PART_ID)
+  {
+    DBUG_PRINT("info", ("no previously opened partition, first %u",
+                        part_id));
+    /* empty LRU, open and and set as first/last in lru */
+    char name_buff[FN_REFLEN];
+    create_partition_name(name_buff, m_name,
+                          m_name_buffer_ptr + m_name_buffer_idx[part_id],
+                          NORMAL_PART_NAME, FALSE);
+    if ((error= m_file[part_id]->ha_open(table, name_buff, m_mode,
+                                         m_open_test_lock)))
+      DBUG_RETURN(error);
+    bitmap_set_bit(&m_open_partitions, part_id);
+    m_open_lru.entry[m_open_lru.empty_list].part_id= part_id;
+    m_open_lru.first= m_open_lru.empty_list;
+    m_open_lru.empty_list= m_open_lru.entry[m_open_lru.empty_list].next;
+    m_open_lru.entry[m_open_lru.first].next= NO_CURRENT_PART_ID;
+  }
+  else if (m_open_lru.entry[i].part_id != part_id)
+  {
+    bool found= FALSE;
+    /* NOT first in lru, so we need to look it up */
+    DBUG_PRINT("info", ("first %u part_id %u first part_id %u empty %u",
+                        i, part_id, m_open_lru.entry[i].part_id,
+                        m_open_lru.empty_list));
+    while (m_open_lru.entry[i].next != NO_CURRENT_PART_ID)
+    {
+      prev= i;
+      i= m_open_lru.entry[i].next;
+      DBUG_PRINT("info", ("i %u prev %u part_id %u next %u",
+                          i, prev, m_open_lru.entry[i].part_id,
+                          m_open_lru.entry[i].next));
+      if (m_open_lru.entry[i].part_id == part_id)
+      {
+        found= TRUE;
+        break;
+      }
+    }
+    if (found)
+    {
+      DBUG_ASSERT(prev != NO_CURRENT_PART_ID);
+      DBUG_PRINT("info", ("found part %u, reorder LRU first %u empty %u",
+                          part_id, m_open_lru.first, m_open_lru.empty_list));
+      /* already opened, just reorder the LRU */
+      m_open_lru.entry[prev].next= m_open_lru.entry[i].next;
+      m_open_lru.entry[i].next= m_open_lru.first;
+      m_open_lru.first= i;
+    }
+    else
+    {
+      char name_buff[FN_REFLEN];
+      uint index_to_use;
+      /*
+        We could close partitions if not index search, but we wait until
+        reset is called.
+      */
+#ifdef NOT_USED
+      /*
+        This is if we would limit the number of concurrently open partitions
+        during a statement. This will seriously affect the performance of
+        index searches (which uses all partitions) and sorting after scanning
+        (rnd_pos).
+
+        TODO:
+        (this code is a little bit out of sync...)
+        num_open_partitions and max_open_partitions must be added.
+        decrease the size of the m_open_lru.entry array to max_open..
+        reinit index at last position, re open on rnd_pos.
+      */
+      if (num_open_partitions >= max_open_partitions)
+      {
+        /* LRU is full, close and reuse the last entry */
+        uint close_id= m_open_lru.entry[i].part_id;
+        DBUG_PRINT("info", ("partition LRU list full, closing %u", close_id));
+        /* remove assertion to allow only one open partition at a time */
+        DBUG_ASSERT(prev != NO_CURRENT_PART_ID);
+        if (prev != NO_CURRENT_PART_ID)
+        {
+          m_open_lru.entry[prev].next= NO_CURRENT_PART_ID;
+        }
+        if ((error= prepare_close_partition(close_id)))
+        {
+          DBUG_PRINT("error", ("prepare_close_partition %d failed (%d)",
+                               close_id, error));
+          DBUG_RETURN(error);
+        }
+        if ((error= m_file[close_id]->close_files()))
+        {
+          DBUG_PRINT("error", ("close_files on part %d failed", close_id));
+          DBUG_RETURN(error);
+        }
+        index_to_use= i;
+      }
 #endif /* NOT_USED */
+      create_partition_name(name_buff, m_name,
+                            m_name_buffer_ptr + m_name_buffer_idx[part_id],
+                            NORMAL_PART_NAME, FALSE);
+      DBUG_PRINT("info", ("adding part %u in entry %u",
+                          part_id, index_to_use));
+      if ((error= m_file[part_id]->ha_open(table, name_buff, m_mode,
+                                           m_open_test_lock)))
+      {
+        DBUG_RETURN(error);
+      }
+      bitmap_set_bit(&m_open_partitions, part_id);
+      index_to_use= m_open_lru.empty_list;
+      m_open_lru.empty_list= m_open_lru.entry[m_open_lru.empty_list].next;
+      m_open_lru.entry[index_to_use].part_id= part_id;
+      m_open_lru.entry[index_to_use].next= m_open_lru.first;
+      m_open_lru.first= index_to_use;
+    }
+  }
+  else
+  {
+    DBUG_PRINT("info", ("partition %u was already first in LRU", part_id));
+  }
+  DBUG_RETURN(prepare_use_partition(part_id, is_insert));
+}
+
 
+bool ha_partition::partition_is_open(uint part_id)
+{
+  DBUG_ENTER("ha_partition::partition_is_open");
+  if (!m_have_open_files)
+    DBUG_RETURN(TRUE);
+  if (bitmap_is_set(&m_open_partitions, part_id))
+  {
+    DBUG_PRINT("info", ("partition %u is open", part_id));
+    DBUG_RETURN(TRUE);
+  }
+  DBUG_PRINT("info", ("partition %u is not open", part_id));
+  DBUG_RETURN(FALSE);
+}
 /****************************************************************************
                 MODULE start/end statement
 ****************************************************************************/
@@ -2905,6 +3247,9 @@ THR_LOCK_DATA **ha_partition::store_lock
   DESCRIPTION
     This method is called instead of external lock when the table is locked
     before the statement is executed.
+
+  NOTE
+    Not supported when separated open handler/file
 */
 
 int ha_partition::start_stmt(THD *thd, thr_lock_type lock_type)
@@ -2964,6 +3309,7 @@ uint ha_partition::lock_count() const
 void ha_partition::unlock_row()
 {
   DBUG_ENTER("ha_partition::unlock_row");
+  /* m_last_part must already be open! */
   m_file[m_last_part]->unlock_row();
   DBUG_VOID_RETURN;
 }
@@ -2987,6 +3333,9 @@ void ha_partition::unlock_row()
     the last committed row value under the cursor.
     Note: prune_partitions are already called before this call, so using
     pruning is OK.
+
+  NOTE
+    Not used/supported when separated open handler/file
 */
 void ha_partition::try_semi_consistent_read(bool yes)
 {
@@ -3114,7 +3463,7 @@ int ha_partition::write_row(uchar * buf)
   m_last_part= part_id;
   DBUG_PRINT("info", ("Insert in partition %d", part_id));
 
-  error= prepare_use_partition(part_id, TRUE);
+  error= open_partition(part_id, TRUE);
   if (!error)
   {
     tmp_disable_binlog(thd); /* Do not replicate the low-level changes. */
@@ -3180,7 +3529,7 @@ int ha_partition::update_row(const uchar
   }
 
   m_last_part= new_part_id;
-  if ((error= prepare_use_partition(new_part_id, TRUE)))
+  if ((error= open_partition(new_part_id, TRUE)))
     goto exit;
   if (new_part_id == old_part_id)
   {
@@ -3200,7 +3549,7 @@ int ha_partition::update_row(const uchar
     if (error)
       goto exit;
 
-    if ((error= prepare_use_partition(old_part_id, FALSE)))
+    if ((error= open_partition(old_part_id, FALSE)))
       goto exit;
     tmp_disable_binlog(thd); /* Do not replicate the low-level changes. */
     error= m_file[old_part_id]->ha_delete_row(old_data);
@@ -3276,7 +3625,7 @@ int ha_partition::delete_row(const uchar
     DBUG_RETURN(error);
   }
   m_last_part= part_id;
-  if ((error= prepare_use_partition(part_id, FALSE)))
+  if ((error= open_partition(part_id, FALSE)))
     DBUG_RETURN(error);
   tmp_disable_binlog(thd);
   error= m_file[part_id]->ha_delete_row(buf);
@@ -3325,7 +3674,7 @@ int ha_partition::delete_all_rows()
   file= m_file;
   do
   {
-    if ((error= prepare_use_partition(file - m_file, FALSE)))
+    if (m_have_open_files && (error= open_partition(file - m_file, FALSE)))
       DBUG_RETURN(error);
     if ((error= (*file)->ha_delete_all_rows()))
       DBUG_RETURN(error);
@@ -3440,7 +3789,7 @@ int ha_partition::end_bulk_insert()
     table scan or when the server wants to access data through rnd_pos.
 
     When scan is used we will scan one handler partition at a time.
-    When preparing for rnd_pos we will init all handler partitions.
+    When preparing for rnd_pos we will init all [open] handler partitions.
     No extra cache handling is needed when scannning is not performed.
 
     Before initialising we will call rnd_end to ensure that we clean up from
@@ -3508,7 +3857,7 @@ int ha_partition::rnd_init(bool scan)
     */
     rnd_end();
     /* for scanning, only use cache/rnd_init on one partition at a time */
-    if ((error= prepare_use_partition((uint) part_id, FALSE)))
+    if ((error= open_partition((uint) part_id, FALSE)))
       goto err;
     if ((error= m_file[part_id]->ha_rnd_init(scan)))
       goto err;
@@ -3519,7 +3868,7 @@ int ha_partition::rnd_init(bool scan)
   {
     /*
       no scan, don't use cache, and delay the ha_rnd_init(0) call to
-      be done in prepare_use_partition
+      be done in open_partition
     */
   }
   m_scan_value= scan ? 1 : 0;
@@ -3558,6 +3907,9 @@ int ha_partition::rnd_end()
   case 1:
     if (NO_CURRENT_PART_ID != m_part_spec.start_part)         // Table scan
     {
+      DBUG_ASSERT(!m_have_open_files ||
+                  (m_open_lru.entry[m_open_lru.first].part_id
+                   == m_part_spec.start_part));
       late_extra_no_cache(m_part_spec.start_part);
       DBUG_PRINT("info", ("rnd_end() on partition %u",
                           m_part_spec.start_part));
@@ -3621,7 +3973,8 @@ int ha_partition::rnd_next(uchar *buf)
   }
   
   DBUG_ASSERT(m_scan_value == 1);
-  if ((result= prepare_use_partition(part_id, FALSE)))
+  DBUG_ASSERT(partition_is_open(part_id));
+  if ((result= open_partition(part_id, FALSE)))
   {
     m_part_spec.start_part= NO_CURRENT_PART_ID;
     table->status= STATUS_NOT_FOUND;
@@ -3631,6 +3984,7 @@ int ha_partition::rnd_next(uchar *buf)
   
   while (TRUE)
   {
+    /* file should already be opened in rnd_init or previous call */
     result= file->rnd_next(buf);
     if (!result)
     {
@@ -3667,7 +4021,7 @@ int ha_partition::rnd_next(uchar *buf)
     m_last_part= part_id;
     m_part_spec.start_part= part_id;
     file= m_file[part_id];
-    if ((result= prepare_use_partition(part_id, FALSE)))
+    if ((result= open_partition(part_id, FALSE)))
       break;
     DBUG_PRINT("info", ("rnd_init(1) on partition %d", part_id));
     if ((result= file->ha_rnd_init(1)))
@@ -3713,6 +4067,7 @@ void ha_partition::position(const uchar 
   handler *file= m_file[m_last_part];
   DBUG_ENTER("ha_partition::position");
 
+  DBUG_ASSERT(partition_is_open(m_last_part));
   file->position(record);
   int2store(ref, m_last_part);
   memcpy((ref + PARTITION_BYTES_IN_POS), file->ref,
@@ -3758,15 +4113,21 @@ void ha_partition::column_bitmaps_signal
 
 int ha_partition::rnd_pos(uchar * buf, uchar *pos)
 {
+  int error;
   uint part_id;
   DBUG_ENTER("ha_partition::rnd_pos");
 
   part_id= uint2korr((const uchar *) pos);
   DBUG_ASSERT(part_id < m_tot_parts);
-  /* rnd_init(0) must have been called (done in prepare_use_partition) */
-  DBUG_ASSERT(bitmap_is_set(&m_rnd_partitions, part_id));
-  m_last_part= part_id;
-  DBUG_RETURN(m_file[part_id]->rnd_pos(buf, (pos + PARTITION_BYTES_IN_POS)));
+  /* rnd_init(0) will be called in open_partition if not already done */
+  if (!(error= open_partition(part_id, FALSE)))
+  {
+    m_last_part= part_id;
+    DBUG_ASSERT(m_scan_value == 0 &&
+                bitmap_is_set(&m_rnd_partitions, part_id));
+    error= m_file[part_id]->rnd_pos(buf, (pos + PARTITION_BYTES_IN_POS));
+  }
+  DBUG_RETURN(error);
 }
 
 
@@ -3891,10 +4252,7 @@ int ha_partition::index_init(uint inx, b
                        (*key_info)->key_part[i].field->field_index);
     } while (*(++key_info));
   }
-  /*
-    The real ha_index_init call is delayed until prepare_use_partition
-    is called.
-  */
+  /* The real ha_index_init call is delayed until open_partition is called */
   DBUG_ASSERT(m_index_inited == MAX_KEY);
   m_index_inited= active_index;
   DBUG_RETURN(error);
@@ -3933,7 +4291,7 @@ int ha_partition::index_end()
     part_id= file - m_file;
     if (bitmap_is_set(&m_index_partitions, part_id))
     {
-      if ((tmp= (*file)->ha_index_end()))
+      if ((tmp= m_file[part_id]->ha_index_end()))
         error= tmp;
       bitmap_clear_bit(&m_index_partitions, part_id);
     }
@@ -4538,7 +4896,7 @@ int ha_partition::handle_unordered_scan_
 
     if (!(bitmap_is_set(&(m_part_info->used_partitions), i)))
       continue;
-    if ((error= prepare_use_partition(i, FALSE)))
+    if ((error= open_partition(i, FALSE)))
       DBUG_RETURN(error);
     file= m_file[i];
     m_part_spec.start_part= i;
@@ -4646,7 +5004,7 @@ int ha_partition::handle_ordered_index_s
     int error;
     handler *file= m_file[i];
     DBUG_PRINT("info", ("looking for a record in part %u", i));
-    if ((error= prepare_use_partition(i, FALSE)))
+    if ((error= open_partition(i, FALSE)))
       DBUG_RETURN(error);
 
     switch (m_index_scan_type) {
@@ -4786,7 +5144,7 @@ int ha_partition::handle_ordered_next(uc
   handler *file= m_file[part_id];
   DBUG_ENTER("ha_partition::handle_ordered_next");
   
-  if ((error= prepare_use_partition(part_id, FALSE)))
+  if ((error= open_partition(part_id, FALSE)))
     DBUG_RETURN(error);
   if (m_index_scan_type == partition_read_range)
   {
@@ -4842,7 +5200,7 @@ int ha_partition::handle_ordered_prev(uc
   handler *file= m_file[part_id];
   DBUG_ENTER("ha_partition::handle_ordered_prev");
 
-  if ((error= prepare_use_partition(part_id, FALSE)))
+  if ((error= open_partition(part_id, FALSE)))
     DBUG_RETURN(error);
   if ((error= file->index_prev(rec_buf(part_id))))
   {
@@ -4974,10 +5332,10 @@ int ha_partition::info(uint flag)
         do
         {
           file= *file_array;
-          if (prepare_use_partition(file_array - m_file, FALSE))
+          if (open_partition(file_array - m_file, FALSE))
           {
-            sql_print_error("failed to use partition %d in info/auto_increment",
-                            (int) (file_array - m_file));
+            sql_print_error("info(STATUS_AUTO): failed to open partition %u",
+                            (uint) (file_array - m_file));
             auto_increment_value= ~(ulonglong)(0);
             break;
           }
@@ -5036,7 +5394,7 @@ int ha_partition::info(uint flag)
                         (file_array - m_file)))
       {
         file= *file_array;
-        if (!prepare_use_partition(file_array - m_file, FALSE))
+        if (!open_partition(file_array - m_file, FALSE))
         {
           file->info(HA_STATUS_VARIABLE);
           stats.records+= file->stats.records;
@@ -5049,8 +5407,8 @@ int ha_partition::info(uint flag)
         }
         else
         {
-          sql_print_error("failed to use partition %d in info/STATUS_VARIABLE",
-                          (int) (file_array - m_file));
+          sql_print_error("info(STATUS_VARIABLE): failed to open partition %u",
+                          (uint) (file_array - m_file));
         }
       }
     } while (*(++file_array));
@@ -5116,18 +5474,19 @@ int ha_partition::info(uint flag)
     handler *file;
 
     file= m_file[0];
-    if (!prepare_use_partition(0, FALSE))
+    if (!open_partition(0, FALSE))
     {
       file->info(HA_STATUS_CONST);
       stats.create_time= file->stats.create_time;
     }
     else
-      sql_print_error("failed to use partition 0 in info/STATUS_CONST");
+      sql_print_error("info(STATUS_CONST): failed to open partition 0");
     ref_length= m_ref_length;
   }
   if (flag & HA_STATUS_ERRKEY)
   {
     handler *file= m_file[m_last_part];
+    DBUG_ASSERT(partition_is_open(m_last_part));
     DBUG_PRINT("info", ("info: HA_STATUS_ERRKEY"));
     /*
       This flag is used to get index number of the unique index that
@@ -5152,15 +5511,15 @@ int ha_partition::info(uint flag)
     do
     {
       file= *file_array;
-      if (!prepare_use_partition(file_array - m_file, FALSE))
+      if (!open_partition(file_array - m_file, FALSE))
       {
         file->info(HA_STATUS_TIME);
         if (file->stats.update_time > stats.update_time)
           stats.update_time= file->stats.update_time;
       }
       else
-        sql_print_error("failed to use partition %d in info/STATUS_CONST",
-                        (int) (file_array - m_file));
+        sql_print_error("info(STATUS_CONST): failed to open partition %u",
+                        (uint) (file_array - m_file));
     } while (*(++file_array));
   }
   DBUG_RETURN(0);
@@ -5171,7 +5530,7 @@ void ha_partition::get_dynamic_partition
                                               uint part_id)
 {
   handler *file= m_file[part_id];
-  if (!prepare_use_partition(part_id, FALSE))
+  if (!open_partition(part_id, FALSE))
   {
     file->info(HA_STATUS_CONST | HA_STATUS_TIME | HA_STATUS_VARIABLE |
                HA_STATUS_NO_LOCK);
@@ -5190,7 +5549,7 @@ void ha_partition::get_dynamic_partition
       stat_info->check_sum= file->checksum();
   }
   else
-    sql_print_error("failed to open partition %u in info/STATUS_CONST",
+    sql_print_error("get_dynamic_partition_info: failed to open partition %u",
                     part_id);
   return;
 }
@@ -5659,6 +6018,9 @@ int ha_partition::reset(void)
         result= tmp;
     }
   } while (*(++file));
+  /* Decrease the number of open files used by partitions. */
+  if (m_have_open_files)
+    close_partitions(FALSE);
   DBUG_ASSERT(bitmap_is_clear_all(&m_rnd_partitions));
   DBUG_ASSERT(bitmap_is_clear_all(&m_index_partitions));
   DBUG_ASSERT(bitmap_is_clear_all(&m_cache_partitions));
@@ -5737,8 +6099,11 @@ int ha_partition::prepare_for_rename()
       if ((tmp= (*file)->extra(HA_EXTRA_PREPARE_FOR_RENAME)))
         result= tmp;      
     for (file= m_reorged_file; *file; file++)
+    {
+      DBUG_ASSERT(!m_have_open_files);
       if ((tmp= (*file)->extra(HA_EXTRA_PREPARE_FOR_RENAME)))
         result= tmp;   
+    }
     DBUG_RETURN(result);   
   }
   
@@ -5758,26 +6123,26 @@ int ha_partition::prepare_for_rename()
 
   DESCRIPTION
     There are different categories of flags:
-    - Can be called without prior use of the partition.
+    - Can be called without open_files have been called.
     - Only to be used on the partitions left after pruning.
     - Can be delayed until the next handler call
-      (will be handled/enabled/disabled in prepare_use_partition)
+      (will be handled/enabled/disabled in open_partition)
 */
 
 int ha_partition::loop_extra(enum ha_extra_function operation)
 {
   int result= 0, tmp;
   uint i;
-  bool only_used_partitions= FALSE, use_pruning= TRUE, use_delayed= TRUE;
-  bool do_use= TRUE;
+  bool only_opened_partitions= FALSE, use_pruning= TRUE, use_delayed= TRUE;
+  bool do_open= TRUE;
   DBUG_ENTER("ha_partition::loop_extra()");
   
   switch (operation) {
     case HA_EXTRA_KEYREAD:
     case HA_EXTRA_NO_KEYREAD:
-      /* when using keys, all partition will be used, so no need to delay */
+      /* when using keys, all partitions will be used, so no need to delay */
       use_delayed= FALSE;
-      do_use= FALSE;
+      do_open= FALSE;
       break;
     case HA_EXTRA_WRITE_CACHE:
       /* currently no pruning on write operations */
@@ -5789,8 +6154,8 @@ int ha_partition::loop_extra(enum ha_ext
     case HA_EXTRA_KEYREAD_PRESERVE_FIELDS:
     case HA_EXTRA_NORMAL:
     case HA_EXTRA_QUICK:
-      /* only neccesary on already used partitions */
-      only_used_partitions= TRUE;
+      /* only neccesary on already opened partitions */
+      only_opened_partitions= TRUE;
       break;
     case HA_EXTRA_PREPARE_FOR_RENAME:
       /* must be done on all partitions directly */
@@ -5801,7 +6166,7 @@ int ha_partition::loop_extra(enum ha_ext
     case HA_EXTRA_CACHE:
       /* How is it used, check in debug */
       DBUG_ASSERT(0);
-      only_used_partitions= TRUE;
+      only_opened_partitions= TRUE;
       use_pruning= FALSE;
       use_delayed= FALSE;
       break;
@@ -5815,20 +6180,28 @@ int ha_partition::loop_extra(enum ha_ext
   {
     if (use_pruning && !bitmap_is_set(&(m_part_info->used_partitions), i))
       continue;
-    if (!only_used_partitions &&
-        do_use &&
-        (tmp= prepare_use_partition(i, FALSE)))
+    if (!only_opened_partitions &&
+        do_open &&
+        (tmp= open_partition(i, FALSE)))
     {
-      sql_print_error("failed to use partition in loop_extra");
+      sql_print_error("failed to open partition %u in loop_extra(%u)",
+                      i, (uint) operation);
       result= tmp;
       continue;
     }
-    DBUG_PRINT("info", ("extra (%u) on part %u", operation, i));
-    if ((tmp= m_file[i]->extra(operation)))
-      result= tmp;
-    bitmap_set_bit(&m_reset_partitions, i);
-    if (operation == HA_EXTRA_NO_CACHE)
-       bitmap_clear_bit(&m_cache_partitions, i);
+    if (!only_opened_partitions || partition_is_open(i))
+    {
+      DBUG_PRINT("info", ("extra (%u) on part %u", operation, i));
+      if ((tmp= m_file[i]->extra(operation)))
+        result= tmp;
+      if (m_have_open_files)
+      {
+        bitmap_set_bit(&m_reset_partitions, i);
+        /* HA_EXTRA_CACHE is handled in prepare_extra_cache */
+        if (operation == HA_EXTRA_NO_CACHE)
+          bitmap_clear_bit(&m_cache_partitions, i);
+      }
+    }
   }
   DBUG_RETURN(result);
 }
@@ -5851,7 +6224,9 @@ void ha_partition::late_extra_cache(uint
 
   if (!m_extra_cache)
     DBUG_VOID_RETURN;
-  if (bitmap_is_set(&m_cache_partitions, partition_id))
+  DBUG_ASSERT(partition_is_open(partition_id));
+  if (!partition_is_open(partition_id) ||
+      bitmap_is_set(&m_cache_partitions, partition_id))
     DBUG_VOID_RETURN;
   if (m_extra_cache_size == 0)
     VOID(m_file[partition_id]->extra(HA_EXTRA_CACHE));
@@ -5879,6 +6254,9 @@ void ha_partition::late_extra_no_cache(u
 
   if (!m_extra_cache)
     DBUG_VOID_RETURN;
+  DBUG_ASSERT(partition_is_open(partition_id));
+  if (!partition_is_open(partition_id))
+    DBUG_VOID_RETURN;
   VOID(m_file[partition_id]->extra(HA_EXTRA_NO_CACHE));
   bitmap_clear_bit(&m_cache_partitions, partition_id);
   DBUG_VOID_RETURN;
@@ -5907,6 +6285,157 @@ const key_map *ha_partition::keys_to_use
 }
 
 
+#define MAX_CHECK_PARTS_FOR_OPTIMIZE 10
+struct st_records_in_range_args
+{
+  uint inx;
+  key_range *min_key;
+  key_range *max_key;
+  ha_rows rows;                            /* return value */
+};
+typedef st_records_in_range_args RECORDS_IN_RANGE_ARGS;
+/*
+  Get a estimate of optimizer data for the table
+
+  SYNOPSIS
+    get_optimizer_data()
+
+  RETURN VALUE
+    FALSE ok
+    TRUE  failure
+*/
+bool ha_partition::get_optimizer_data(enum enum_partition_optimizer_func func,
+                                      void *data)
+{
+  double total_time= 0, *ret_time;
+  ha_rows rows, total_rows= 0, *ret_rows;
+  RECORDS_IN_RANGE_ARGS *recs_arg;
+  uint first, last, part_id, num_used_parts, partitions_called, check_min_num;
+  bool force_open= FALSE;
+  DBUG_ENTER("ha_partition::get_optimizer_data");
+  partitions_called= num_used_parts= 0;
+
+  /* This might even be optimized also when not having open_files */
+  check_min_num= m_tot_parts;                /* lower when forced open */
+  if (func == func_records_in_range)
+    recs_arg= (RECORDS_IN_RANGE_ARGS*) data;
+
+  /*
+    First check if any of the already opened partitions can be used,
+    otherwise open and check a third of the used partitions, up to 10
+    and the result is greater than 0.
+  */
+  first= bitmap_get_first_set(&(m_part_info->used_partitions));
+  last= bitmap_get_last_set(&(m_part_info->used_partitions));
+restart_with_open:
+  for (part_id= first; part_id <= last ; part_id++)
+  {
+    if (bitmap_is_set(&(m_part_info->used_partitions), part_id))
+    {
+      num_used_parts++;
+      /*
+        For the first round, only use opened partitions,
+        on the other round, only use non opened partitions.
+      */
+      if (!(force_open ^ partition_is_open(part_id)))
+        continue;
+      switch (func) {
+      case func_scan_time:
+        total_time+= m_file[part_id]->scan_time();
+        break;
+      case func_records_in_range:
+        rows= m_file[part_id]->records_in_range(recs_arg->inx,
+                                                recs_arg->min_key,
+                                                recs_arg->max_key);
+        if (rows == HA_POS_ERROR)
+        {
+          recs_arg->rows= HA_POS_ERROR;
+          DBUG_RETURN(TRUE);
+        }
+        total_rows+= rows;
+        break;
+      case func_estimate_rows_upper_bound:
+        rows= m_file[part_id]->estimate_rows_upper_bound();
+        if (rows == HA_POS_ERROR)
+        {
+          ha_rows *ret= (ha_rows*) data;
+          *ret= HA_POS_ERROR;
+          DBUG_RETURN(TRUE);
+        }
+        total_rows+= rows;
+        break;
+      default:
+        DBUG_ASSERT(0);
+      }
+      partitions_called++;
+      if (partitions_called >= check_min_num)
+      {
+        /* 0 for result has a special meaning (i.e. empty), so assure this */
+        bool can_leave= TRUE;
+        switch (func) {
+          case func_scan_time:
+            if (total_time == 0)
+              can_leave= FALSE;
+            break;
+          case func_records_in_range:
+          case func_estimate_rows_upper_bound:
+            if (total_rows == 0)
+              can_leave= FALSE;
+            break;
+          default:
+            DBUG_ASSERT(0);
+        }
+        if (can_leave)
+          break;
+      }
+    }
+  }
+  if (!force_open && m_have_open_files)
+  {
+    /* 0 for result has a special meaning (i.e. empty), so assure this */
+    bool can_leave= TRUE;
+    switch (func) {
+      case func_scan_time:
+        if (total_time == 0)
+          can_leave= FALSE;
+        break;
+      case func_records_in_range:
+      case func_estimate_rows_upper_bound:
+        if (total_rows == 0)
+          can_leave= FALSE;
+        break;
+      default:
+        DBUG_ASSERT(0);
+    }
+    check_min_num= (num_used_parts + 2) / 3;
+    check_min_num= min(MAX_CHECK_PARTS_FOR_OPTIMIZE, check_min_num);
+    if (!can_leave && partitions_called < check_min_num)
+    {
+      force_open= TRUE;
+      goto restart_with_open;
+    }
+  }
+  switch (func) {
+    case func_scan_time:
+      ret_time= (double*) data;
+      *ret_time= total_time;
+      break;
+    case func_records_in_range:
+      recs_arg->rows= total_rows;
+      break;
+    case func_estimate_rows_upper_bound:
+      ret_rows= (ha_rows*) data;
+      *ret_rows= total_rows;
+      break;
+    default:
+      DBUG_ASSERT(0);
+      /* no return value */
+  }
+
+  DBUG_RETURN(FALSE);
+}
+
+
 /*
   Return time for a scan of the table
 
@@ -5920,12 +6449,9 @@ const key_map *ha_partition::keys_to_use
 double ha_partition::scan_time()
 {
   double scan_time= 0;
-  uint i;
   DBUG_ENTER("ha_partition::scan_time");
 
-  for (i= 0; m_file[i]; i++)
-    if (bitmap_is_set(&(m_part_info->used_partitions), i))
-      scan_time+= m_file[i]->scan_time();
+  get_optimizer_data(func_scan_time, (void*) &scan_time);
   DBUG_RETURN(scan_time);
 }
 
@@ -5951,9 +6477,16 @@ double ha_partition::scan_time()
 
 double ha_partition::read_time(uint index, uint ranges, ha_rows rows)
 {
+  uint part_id= 0;
   DBUG_ENTER("ha_partition::read_time");
 
-  DBUG_RETURN(m_file[0]->read_time(index, ranges, rows));
+  if (m_have_open_files)
+  {
+    DBUG_ASSERT(m_open_lru.first != NO_CURRENT_PART_ID);
+    part_id= m_open_lru.entry[m_open_lru.first].part_id;
+    DBUG_ASSERT(partition_is_open(part_id));
+  }
+  DBUG_RETURN(m_file[part_id]->read_time(index, ranges, rows));
 }
 
 /*
@@ -5983,22 +6516,17 @@ double ha_partition::read_time(uint inde
 ha_rows ha_partition::records_in_range(uint inx, key_range *min_key,
 				       key_range *max_key)
 {
-  ha_rows rows, in_range= 0;
-  handler **file;
+  struct st_records_in_range_args func_arg;
   DBUG_ENTER("ha_partition::records_in_range");
 
-  file= m_file;
-  do
-  {
-    if (bitmap_is_set(&(m_part_info->used_partitions), (file - m_file)))
-    {
-      rows= (*file)->records_in_range(inx, min_key, max_key);
-      if (rows == HA_POS_ERROR)
-        DBUG_RETURN(HA_POS_ERROR);
-      in_range+= rows;
-    }
-  } while (*(++file));
-  DBUG_RETURN(in_range);
+  func_arg.inx= inx;
+  func_arg.min_key= min_key;
+  func_arg.max_key= max_key;
+  func_arg.rows= 0;
+
+  if (get_optimizer_data(func_records_in_range, (void*) &func_arg))
+    DBUG_RETURN(HA_POS_ERROR);
+  DBUG_RETURN(func_arg.rows);
 }
 
 
@@ -6014,21 +6542,11 @@ ha_rows ha_partition::records_in_range(u
 
 ha_rows ha_partition::estimate_rows_upper_bound()
 {
-  ha_rows rows, tot_rows= 0;
-  handler **file;
+  ha_rows tot_rows= 0;
   DBUG_ENTER("ha_partition::estimate_rows_upper_bound");
 
-  file= m_file;
-  do
-  {
-    if (bitmap_is_set(&(m_part_info->used_partitions), (file - m_file)))
-    {
-      rows= (*file)->estimate_rows_upper_bound();
-      if (rows == HA_POS_ERROR)
-        DBUG_RETURN(HA_POS_ERROR);
-      tot_rows+= rows;
-    }
-  } while (*(++file));
+  if (get_optimizer_data(func_estimate_rows_upper_bound, (void*) &tot_rows))
+    DBUG_RETURN(HA_POS_ERROR);
   DBUG_RETURN(tot_rows);
 }
 
@@ -6053,6 +6571,8 @@ ha_rows ha_partition::records()
   file= m_file;
   do
   {
+    if (open_partition(file - m_file, FALSE))
+      DBUG_RETURN(HA_POS_ERROR);
     rows= (*file)->records();
     if (rows == HA_POS_ERROR)
       DBUG_RETURN(HA_POS_ERROR);
@@ -6085,6 +6605,8 @@ bool ha_partition::can_switch_engines()
   file= m_file;
   do
   {
+    if (open_partition(file - m_file, FALSE))
+      DBUG_RETURN(FALSE);
     if (!(*file)->can_switch_engines())
       DBUG_RETURN(FALSE);
   } while (*(++file));
@@ -6102,9 +6624,16 @@ bool ha_partition::can_switch_engines()
 
 uint8 ha_partition::table_cache_type()
 {
+  uint part_id= 0;
   DBUG_ENTER("ha_partition::table_cache_type");
 
-  DBUG_RETURN(m_file[0]->table_cache_type());
+  if (m_have_open_files)
+  {
+    DBUG_ASSERT(m_open_lru.first != NO_CURRENT_PART_ID);
+    part_id= m_open_lru.entry[m_open_lru.first].part_id;
+    DBUG_ASSERT(partition_is_open(part_id));
+  }
+  DBUG_RETURN(m_file[part_id]->table_cache_type());
 }
 
 
@@ -6114,9 +6643,16 @@ uint8 ha_partition::table_cache_type()
 
 const char *ha_partition::index_type(uint inx)
 {
+  uint part_id= 0;
   DBUG_ENTER("ha_partition::index_type");
 
-  DBUG_RETURN(m_file[0]->index_type(inx));
+  if (m_have_open_files)
+  {
+    DBUG_ASSERT(m_open_lru.first != NO_CURRENT_PART_ID);
+    part_id= m_open_lru.entry[m_open_lru.first].part_id;
+    DBUG_ASSERT(partition_is_open(part_id));
+  }
+  DBUG_RETURN(m_file[part_id]->index_type(inx));
 }
 
 
@@ -6153,7 +6689,10 @@ void ha_partition::print_error(int error
   if (error == HA_ERR_NO_PARTITION_FOUND)
     m_part_info->print_no_partition_found(table);
   else
+  {
+    DBUG_ASSERT(partition_is_open(m_last_part));
     m_file[m_last_part]->print_error(error, errflag);
+  }
   DBUG_VOID_RETURN;
 }
 
@@ -6163,6 +6702,7 @@ bool ha_partition::get_error_message(int
   DBUG_ENTER("ha_partition::get_error_message");
 
   /* Should probably look for my own errors first */
+  DBUG_ASSERT(partition_is_open(m_last_part));
   DBUG_RETURN(m_file[m_last_part]->get_error_message(error, buf));
 }
 
@@ -6197,10 +6737,17 @@ bool ha_partition::check_if_incompatible
     the underlying handlers.
   */
   for (file= m_file; *file; file++)
+  {
+    if (open_partition(file - m_file, FALSE))
+    {
+      ret= COMPATIBLE_DATA_NO;
+      break;
+    }
     if ((ret=  (*file)->check_if_incompatible_data(create_info,
                                                    table_changes)) !=
         COMPATIBLE_DATA_YES)
       break;
+  }
   return ret;
 }
 
@@ -6219,8 +6766,12 @@ int ha_partition::add_index(TABLE *table
     partitioning function. So no need for extra check here.
   */
   for (file= m_file; *file; file++)
+  {
+    if (m_have_open_files && (ret= open_partition(file - m_file, FALSE)))
+      break;
     if ((ret=  (*file)->add_index(table_arg, key_info, num_of_keys)))
       break;
+  }
   return ret;
 }
 
@@ -6235,8 +6786,12 @@ int ha_partition::prepare_drop_index(TAB
     DROP INDEX does not affect partitioning.
   */
   for (file= m_file; *file; file++)
+  {
+    if (m_have_open_files && (ret= open_partition(file - m_file, FALSE)))
+      break;
     if ((ret=  (*file)->prepare_drop_index(table_arg, key_num, num_of_keys)))
       break;
+  }
   return ret;
 }
 
@@ -6247,8 +6802,12 @@ int ha_partition::final_drop_index(TABLE
   int ret= HA_ERR_WRONG_COMMAND;
 
   for (file= m_file; *file; file++)
+  {
+    if (m_have_open_files && (ret= open_partition(file - m_file, FALSE)))
+      break;
     if ((ret=  (*file)->final_drop_index(table_arg)))
       break;
+  }
   return ret;
 }
 
@@ -6375,6 +6934,7 @@ int ha_partition::cmp_ref(const uchar *r
     part_id= uint2korr(ref1);
     file= m_file[part_id];
     DBUG_ASSERT(part_id < m_tot_parts);
+    DBUG_ASSERT(partition_is_open(part_id));
     DBUG_RETURN(file->cmp_ref((ref1 + PARTITION_BYTES_IN_POS),
 			      (ref2 + PARTITION_BYTES_IN_POS)));
   }
@@ -6412,6 +6972,8 @@ int ha_partition::reset_auto_increment(u
   ha_data->next_auto_inc_val= 0;
   do
   {
+    if ((res= open_partition(file - m_file, FALSE)))
+      break;
     if ((res= (*file)->ha_reset_auto_increment(value)) != 0)
       break;
   } while (*(++file));
@@ -6455,6 +7017,18 @@ void ha_partition::get_auto_increment(ul
     lock_auto_increment();
     do
     {
+#ifdef NOT_USED
+      /* m_have_open_files is never set when next_number_keypart is */
+      if (open_partition(file - m_file, FALSE))
+      {
+        *first_value= ~(ulonglong)(0);
+        /* log that the error was between table/partition handler */
+        sql_print_error("failed to open partition in get_auto_increment");
+        unlock_auto_increment();
+        DBUG_VOID_RETURN;
+      }
+#endif
+      DBUG_ASSERT(!m_have_open_files);
       /* Only nb_desired_values = 1 makes sense */
       (*file)->get_auto_increment(offset, increment, 1,
                                  &first_value_part, &nb_reserved_values_part);
@@ -6525,6 +7099,7 @@ void ha_partition::release_auto_incremen
 
   if (table->s->next_number_keypart)
   {
+    DBUG_ASSERT(!m_have_open_files);
     for (uint i= 0; i < m_tot_parts; i++)
       m_file[i]->ha_release_auto_increment();
   }
@@ -6564,6 +7139,10 @@ void ha_partition::init_table_handle_for
 
 /****************************************************************************
                 MODULE enable/disable indexes
+
+  NOTE
+    These are altering the table properties,
+    so we need to go through and open all partitions.
 ****************************************************************************/
 
 /*
@@ -6583,6 +7162,8 @@ int ha_partition::disable_indexes(uint m
 
   for (file= m_file; *file; file++)
   {
+    if (m_have_open_files && (error= open_partition(file - m_file, FALSE)))
+      break;
     if ((error= (*file)->ha_disable_indexes(mode)))
       break;
   }
@@ -6607,6 +7188,8 @@ int ha_partition::enable_indexes(uint mo
 
   for (file= m_file; *file; file++)
   {
+    if (m_have_open_files && (error= open_partition(file - m_file, FALSE)))
+      break;
     if ((error= (*file)->ha_enable_indexes(mode)))
       break;
   }
@@ -6630,8 +7213,12 @@ int ha_partition::indexes_are_disabled(v
   int error= 0;
 
   for (file= m_file; *file; file++)
+  {
+    if (m_have_open_files && (error= open_partition(file - m_file, FALSE)))
+      break;
     if ((error= (*file)->indexes_are_disabled()))
       break;
+  }
   return error;
 }
 

=== modified file 'sql/ha_partition.h'
--- a/sql/ha_partition.h	2009-02-13 08:45:03 +0000
+++ b/sql/ha_partition.h	2009-02-17 14:04:29 +0000
@@ -74,6 +74,8 @@ private:
   uint m_open_test_lock;                // Open test_if_locked
   char *m_file_buffer;                  // Buffer with names
   char *m_name_buffer_ptr;		// Pointer to first partition name
+  uint *m_name_buffer_idx;              // Array of offset to partition names
+  char m_name[FN_REFLEN];               // Current opened table path name
   plugin_ref *m_engine_array;           // Array of types of the handlers
   handler **m_file;                     // Array of references to handler inst.
   uint m_file_tot_parts;                // Debug
@@ -183,6 +185,11 @@ private:
   /* used for prediction of start_bulk_insert rows */
   enum_monotonicity_info m_part_func_monotonicity_info;
   /*
+    The partitions support ::open_files, i.e. can open the handler and
+    the associated files separatly.
+  */
+  bool m_have_open_files;
+  /*
     Record every partition which have got a rnd_init(0) call, and use those in
     rnd_end, rnd_init(1) - scan is only set on the currently scanned partition.
   */
@@ -202,7 +209,25 @@ private:
     index_end.
   */
   MY_BITMAP m_index_partitions;
+  /* Every currently open partition, only used if m_have_open_files */
+  MY_BITMAP m_open_partitions;
 
+  /*
+    LRU structures for be able to close all handler files that has not been
+    recently used in the statement.
+    (Maybe we should keep the first used instead.)
+  */
+  struct lru_entry_st
+  {
+    uint part_id;
+    uint next;
+  };
+  struct lru_st
+  {
+    uint first;
+    uint empty_list;
+    struct lru_entry_st *entry;
+  } m_open_lru;
 public:
   handler *clone(MEM_ROOT *mem_root);
   virtual void set_part_info(partition_info *part_info)
@@ -322,10 +347,11 @@ public:
   virtual int open(const char *name, int mode, uint test_if_locked);
   virtual int close(void);
 private:
+  int  open_partition(uint part_id, bool insert);
+  bool partition_is_open(uint part_id);
   int  prepare_use_partition(uint part_id, bool insert);
-#ifdef NOT_YET_USED
   int  prepare_close_partition(uint close_id);
-#endif
+  int  close_partitions(bool close_all);
 public:
 
   /*
@@ -569,6 +595,20 @@ public:
      set by calling info(HA_STATUS_INFO) ?
      -------------------------------------------------------------------------
   */
+private:
+  enum enum_partition_optimizer_func
+  {
+    func_scan_time,
+    func_records_in_range,
+    func_estimate_rows_upper_bound,
+    func_records
+  };
+  /*
+    Helper function since the partitioning code is very similar for every
+    the optimizer hints/cost calls
+  */
+  bool get_optimizer_data(enum_partition_optimizer_func func, void *data);
+public:
 
   /*
     keys_to_use_for_scanning can probably be implemented as the

=== modified file 'sql/handler.cc'
--- a/sql/handler.cc	2009-02-13 11:04:15 +0000
+++ b/sql/handler.cc	2009-02-17 14:04:29 +0000
@@ -2028,14 +2028,23 @@ int handler::ha_open(TABLE *table_arg, c
   DBUG_ASSERT(table->s == table_share);
   DBUG_ASSERT(alloc_root_inited(&table->mem_root));
 
-  if ((error=open(name,mode,test_if_locked)))
+  if (table->db_stat & HA_OPEN_NO_FILES)
+    error= open_handler(name,mode,test_if_locked);
+  else if (table->db_stat & HA_OPEN_ONLY_FILES)
+    error= open_files(name,mode,test_if_locked);
+  else
+    error= open(name,mode,test_if_locked);
+
+  if (error && (error == EACCES || error == EROFS) && mode == O_RDWR &&
+      (table->db_stat & HA_TRY_READ_ONLY))
   {
-    if ((error == EACCES || error == EROFS) && mode == O_RDWR &&
-	(table->db_stat & HA_TRY_READ_ONLY))
-    {
-      table->db_stat|=HA_READ_ONLY;
-      error=open(name,O_RDONLY,test_if_locked);
-    }
+    table->db_stat|=HA_READ_ONLY;
+    if (table->db_stat & HA_OPEN_NO_FILES)
+      error= open_handler(name,mode,test_if_locked);
+    else if (table->db_stat & HA_OPEN_ONLY_FILES)
+      error= open_files(name,mode,test_if_locked);
+    else
+      error= open(name,O_RDONLY,test_if_locked);
   }
   if (error)
   {

=== modified file 'sql/handler.h'
--- a/sql/handler.h	2008-11-25 09:55:30 +0000
+++ b/sql/handler.h	2009-02-17 14:04:29 +0000
@@ -218,17 +218,19 @@
   HA_GET_INFO does an implicit HA_ABORT_IF_LOCKED
 */
 
-#define HA_OPEN_KEYFILE		1
-#define HA_OPEN_RNDFILE		2
-#define HA_GET_INDEX		4
-#define HA_GET_INFO		8	/* do a ha_info() after open */
-#define HA_READ_ONLY		16	/* File opened as readonly */
+#define HA_OPEN_KEYFILE      1
+#define HA_OPEN_RNDFILE      (1L << 1)
+#define HA_GET_INDEX         (1L << 2)
+#define HA_GET_INFO          (1L << 3) /* do a ha_info() after open */
+#define HA_READ_ONLY         (1L << 4) /* File opened as readonly */
 /* Try readonly if can't open with read and write */
-#define HA_TRY_READ_ONLY	32
-#define HA_WAIT_IF_LOCKED	64	/* Wait if locked on open */
-#define HA_ABORT_IF_LOCKED	128	/* skip if locked on open.*/
-#define HA_BLOCK_LOCK		256	/* unlock when reading some records */
-#define HA_OPEN_TEMPORARY	512
+#define HA_TRY_READ_ONLY     (1L << 5)
+#define HA_WAIT_IF_LOCKED    (1L << 6) /* Wait if locked on open */
+#define HA_ABORT_IF_LOCKED   (1L << 7) /* skip if locked on open */
+#define HA_BLOCK_LOCK        (1L << 8) /* unlock when reading some records */
+#define HA_OPEN_TEMPORARY    (1L << 9) 
+#define HA_OPEN_NO_FILES     (1L << 10) /* use open_handler */
+#define HA_OPEN_ONLY_FILES   (1L << 11) /* use open_files */ 
 
 	/* Some key definitions */
 #define HA_KEY_NULL_LENGTH	1
@@ -1158,7 +1160,7 @@ public:
   {
     cached_table_flags= table_flags();
   }
-  /* ha_ methods: pubilc wrappers for private virtual API */
+  /* ha_ methods: public wrappers for private virtual API */
 
   int ha_open(TABLE *table, const char *name, int mode, int test_if_locked);
   int ha_index_init(uint idx, bool sorted)
@@ -1333,6 +1335,16 @@ public:
   virtual void column_bitmaps_signal();
   uint get_index(void) const { return active_index; }
   virtual int close(void)=0;
+  /* See open_files */
+  virtual int close_files(void)
+  {
+    return HA_ERR_WRONG_COMMAND;
+  }
+  /* See open_handler */
+  virtual int close_handler(void)
+  {
+    return HA_ERR_WRONG_COMMAND;
+  }
 
   /**
     @retval  0   Bulk update used by handler
@@ -1773,6 +1785,22 @@ private:
   */
 
   virtual int open(const char *name, int mode, uint test_if_locked)=0;
+  /*
+    open_handler is the part of open that does not opens any table specific
+    files, i.e. initalizations etc. If open_handler is supported, open_files,
+    close_files and close_hanlder MUST also be supported.
+  */
+  virtual int open_handler(const char *name, int mode, uint test_if_locked)
+  {
+    return HA_ERR_WRONG_COMMAND;
+  }
+  /*
+    open_files is the part of open that does opens any table specific files.
+  */
+  virtual int open_files(const char *name, int mode, uint test_if_locked)
+  {
+    return HA_ERR_WRONG_COMMAND;
+  }
   virtual int index_init(uint idx, bool sorted) { active_index= idx; return 0; }
   virtual int index_end() { active_index= MAX_KEY; return 0; }
   /**

=== modified file 'sql/mysqld.cc'
--- a/sql/mysqld.cc	2008-12-22 23:56:44 +0000
+++ b/sql/mysqld.cc	2009-02-17 14:04:29 +0000
@@ -5610,7 +5610,8 @@ enum options_mysqld
   OPT_OLD_MODE,
   OPT_SLAVE_EXEC_MODE,
   OPT_GENERAL_LOG_FILE,
-  OPT_SLOW_QUERY_LOG_FILE
+  OPT_SLOW_QUERY_LOG_FILE,
+  OPT_PARTITION_FILES
 };
 
 
@@ -6156,6 +6157,13 @@ master-ssl",
    "Enable old-style user limits (before 5.0.3 user resources were counted per each user+host vs. per account)",
    (uchar**) &opt_old_style_user_limits, (uchar**) &opt_old_style_user_limits,
    0, GET_BOOL, NO_ARG, 0, 0, 0, 0, 0, 0},
+#ifdef WITH_PARTITION_STORAGE_ENGINE
+  {"max_open_partition_files", OPT_PARTITION_FILES,
+   "Maximum partition handlers per table that still have its files opened.",
+   (uchar**) &global_system_variables.max_open_partition_files,
+   (uchar**) &max_system_variables.max_open_partition_files, 0, GET_ULONG,
+   REQUIRED_ARG, 128, 0, 1024, 0, 0, 0},
+#endif
   {"pid-file", OPT_PID_FILE, "Pid file used by safe_mysqld.",
    (uchar**) &pidfile_name_ptr, (uchar**) &pidfile_name_ptr, 0, GET_STR,
    REQUIRED_ARG, 0, 0, 0, 0, 0, 0},

=== modified file 'sql/set_var.cc'
--- a/sql/set_var.cc	2008-11-28 15:27:12 +0000
+++ b/sql/set_var.cc	2009-02-17 14:04:29 +0000
@@ -393,6 +393,11 @@ static sys_var_thd_ha_rows	sys_sql_max_j
 					      &SV::max_join_size,
 					      fix_max_join_size);
 #endif
+#ifdef WITH_PARTITION_STORAGE_ENGINE
+static sys_var_thd_ulong
+sys_max_open_partition_files(&vars, "max_open_partition_files",
+                             &SV::max_open_partition_files);
+#endif /* WITH_PARTITION_STORAGE_ENGINE */
 static sys_var_long_ptr_global
 sys_max_prepared_stmt_count(&vars, "max_prepared_stmt_count",
                             &max_prepared_stmt_count,

=== modified file 'sql/sql_class.h'
--- a/sql/sql_class.h	2008-12-20 10:53:27 +0000
+++ b/sql/sql_class.h	2009-02-17 14:04:29 +0000
@@ -396,6 +396,11 @@ struct system_variables
   DATE_TIME_FORMAT *datetime_format;
   DATE_TIME_FORMAT *time_format;
   my_bool sysdate_is_now;
+
+#ifdef WITH_PARTITION_STORAGE_ENGINE
+  /* Maximum of partitions that have called the subhandler with open_files */
+  ulong max_open_partition_files;
+#endif
 };
 
 

=== modified file 'sql/sql_partition.cc'
--- a/sql/sql_partition.cc	2008-11-04 08:36:56 +0000
+++ b/sql/sql_partition.cc	2009-02-17 14:04:29 +0000
@@ -6102,6 +6102,8 @@ uint fast_alter_partition_table(THD *thd
       Thus the need to downgrade the lock disappears.
       1) Write the new frm, pack it and then delete it
       2) Perform the change within the handler
+
+      NOTE: No handler supports HA_PARTITION_ONE_PHASE yet, not even NDB.
     */
     if (mysql_write_frm(lpt, WFRM_WRITE_SHADOW | WFRM_PACK_FRM) ||
         mysql_change_partitions(lpt))

=== modified file 'storage/myisam/ha_myisam.cc'
--- a/storage/myisam/ha_myisam.cc	2008-12-20 10:53:27 +0000
+++ b/storage/myisam/ha_myisam.cc	2009-02-17 14:04:29 +0000
@@ -631,15 +631,37 @@ err:
 }
 #endif /* HAVE_REPLICATION */
 
+/*
+  Name is here without an extension
+  ::open_handler is the non file related part of ::open
+*/
+int ha_myisam::open_handler(const char *name, int mode, uint test_if_locked)
+{
+  DBUG_ENTER("ha_myisam::open_handler");
+  if (!(file=mi_open_handler(name, mode,
+                             test_if_locked | HA_OPEN_FROM_SQL_LAYER)))
+    DBUG_RETURN(my_errno ? my_errno : -1);
 
-/* Name is here without an extension */
-int ha_myisam::open(const char *name, int mode, uint test_if_locked)
+  if (test_if_locked & (HA_OPEN_IGNORE_IF_LOCKED | HA_OPEN_TMP_TABLE))
+    VOID(mi_extra(file, HA_EXTRA_NO_WAIT_LOCK, 0));
+  if (!(test_if_locked & HA_OPEN_WAIT_IF_LOCKED))
+    VOID(mi_extra(file, HA_EXTRA_WAIT_LOCK, 0));
+
+  DBUG_RETURN(0);
+}
+
+
+/*
+  ::open_files is the file related part of ::open
+*/
+int ha_myisam::open_files(const char *name, int mode, uint test_if_locked)
 {
   MI_KEYDEF *keyinfo;
   MI_COLUMNDEF *recinfo= 0;
   uint recs;
   uint i;
 
+  DBUG_ENTER("ha_myisam::open_files");
   /*
     If the user wants to have memory mapped data files, add an
     open_flag. Do not memory map temporary tables because they are
@@ -658,8 +680,9 @@ int ha_myisam::open(const char *name, in
   if (!(test_if_locked & HA_OPEN_TMP_TABLE) && opt_myisam_use_mmap)
     test_if_locked|= HA_OPEN_MMAP;
 
-  if (!(file=mi_open(name, mode, test_if_locked | HA_OPEN_FROM_SQL_LAYER)))
-    return (my_errno ? my_errno : -1);
+  if (file != mi_open_files(file, name, mode,
+                            test_if_locked | HA_OPEN_FROM_SQL_LAYER))
+    DBUG_RETURN(my_errno ? my_errno : -1);
   if (!table->s->tmp_table) /* No need to perform a check for tmp table */
   {
     if ((my_errno= table2myisam(table, &keyinfo, &recinfo, &recs)))
@@ -681,12 +704,7 @@ int ha_myisam::open(const char *name, in
     }
   }
   
-  if (test_if_locked & (HA_OPEN_IGNORE_IF_LOCKED | HA_OPEN_TMP_TABLE))
-    VOID(mi_extra(file, HA_EXTRA_NO_WAIT_LOCK, 0));
-
   info(HA_STATUS_NO_LOCK | HA_STATUS_VARIABLE | HA_STATUS_CONST);
-  if (!(test_if_locked & HA_OPEN_WAIT_IF_LOCKED))
-    VOID(mi_extra(file, HA_EXTRA_WAIT_LOCK, 0));
   if (!table->s->db_record_offset)
     int_table_flags|=HA_REC_NOT_IN_SEQ;
   if (file->s->options & (HA_OPTION_CHECKSUM | HA_OPTION_COMPRESS_RECORD))
@@ -703,7 +721,7 @@ int ha_myisam::open(const char *name, in
   my_errno= 0;
   goto end;
  err:
-  this->close();
+  this->close_files();
  end:
   /*
     Both recinfo and keydef are allocated by my_multi_malloc(), thus only
@@ -711,14 +729,47 @@ int ha_myisam::open(const char *name, in
   */
   if (recinfo)
     my_free((uchar*) recinfo, MYF(0));
-  return my_errno;
+  DBUG_RETURN(my_errno);
 }
 
-int ha_myisam::close(void)
+
+
+/* Name is here without an extension */
+int ha_myisam::open(const char *name, int mode, uint test_if_locked)
+{
+  int error;
+  DBUG_ENTER("ha_myisam::open");
+  if ((error= open_handler(name, mode, test_if_locked)))
+    DBUG_RETURN(error);
+  if ((error= open_files(name, mode, test_if_locked)))
+  {
+    /* Revert the open_handler in case of error in open_files */
+    close_handler();
+  }
+  DBUG_RETURN(error);
+}
+
+int ha_myisam::close_files(void)
+{
+  return mi_close_files(file);
+}
+
+int ha_myisam::close_handler(void)
 {
   MI_INFO *tmp=file;
   file=0;
-  return mi_close(tmp);
+  return mi_close_handler(tmp);
+}
+
+int ha_myisam::close(void)
+{
+  int error, saved_error= 0;
+  if ((error= close_files()))
+    saved_error= error;
+  if ((error= close_handler()))
+    saved_error= error;
+
+  return saved_error;
 }
 
 int ha_myisam::write_row(uchar *buf)

=== modified file 'storage/myisam/ha_myisam.h'
--- a/storage/myisam/ha_myisam.h	2007-11-15 19:25:43 +0000
+++ b/storage/myisam/ha_myisam.h	2009-02-17 14:04:29 +0000
@@ -61,7 +61,11 @@ class ha_myisam: public handler
   uint checksum() const;
 
   int open(const char *name, int mode, uint test_if_locked);
+  int open_handler(const char *name, int mode, uint test_if_locked);
+  int open_files(const char *name, int mode, uint test_if_locked);
   int close(void);
+  int close_files(void);
+  int close_handler(void);
   int write_row(uchar * buf);
   int update_row(const uchar * old_data, uchar * new_data);
   int delete_row(const uchar * buf);

=== modified file 'storage/myisam/mi_close.c'
--- a/storage/myisam/mi_close.c	2007-05-10 09:59:39 +0000
+++ b/storage/myisam/mi_close.c	2009-02-17 14:04:29 +0000
@@ -22,14 +22,14 @@
 
 #include "myisamdef.h"
 
-int mi_close(register MI_INFO *info)
+int mi_close_files(register MI_INFO *info)
 {
   int error=0,flag;
   MYISAM_SHARE *share=info->s;
-  DBUG_ENTER("mi_close");
-  DBUG_PRINT("enter",("base: 0x%lx  reopen: %u  locks: %u",
+  DBUG_ENTER("mi_close_files");
+  DBUG_PRINT("enter",("base: 0x%lx  reopen: %u  locks: %u name %s",
 		      (long) info, (uint) share->reopen,
-                      (uint) share->tot_locks));
+                      (uint) share->tot_locks, share->unique_file_name));
 
   pthread_mutex_lock(&THR_LOCK_myisam);
   if (info->lock_type == F_EXTRA_LCK)
@@ -57,10 +57,10 @@ int mi_close(register MI_INFO *info)
     info->opt_flag&= ~(READ_CACHE_USED | WRITE_CACHE_USED);
   }
   flag= !--share->reopen;
-  myisam_open_list=list_delete(myisam_open_list,&info->open_list);
   pthread_mutex_unlock(&share->intern_lock);
 
   my_free(mi_get_rec_buff_ptr(info, info->rec_buff), MYF(MY_ALLOW_ZERO_PTR));
+  info->rec_buff= NULL;
   if (flag)
   {
     if (share->kfile >= 0 &&
@@ -80,29 +80,21 @@ int mi_close(register MI_INFO *info)
 	mi_state_info_write(share->kfile, &share->state, 1);
       if (my_close(share->kfile,MYF(0)))
         error = my_errno;
+      share->kfile= -1;
     }
 #ifdef HAVE_MMAP
     if (share->file_map)
+    {
       _mi_unmap_file(info);
+      share->file_map= NULL;
+    }
 #endif
     if (share->decode_trees)
     {
       my_free((uchar*) share->decode_trees,MYF(0));
       my_free((uchar*) share->decode_tables,MYF(0));
+      share->decode_trees= NULL;
     }
-#ifdef THREAD
-    thr_lock_delete(&share->lock);
-    VOID(pthread_mutex_destroy(&share->intern_lock));
-    {
-      int i,keys;
-      keys = share->state.header.keys;
-      VOID(rwlock_destroy(&share->mmap_lock));
-      for(i=0; i<keys; i++) {
-	VOID(rwlock_destroy(&share->key_root_lock[i]));
-      }
-    }
-#endif
-    my_free((uchar*) info->s,MYF(0));
   }
   pthread_mutex_unlock(&THR_LOCK_myisam);
   if (info->ftparser_param)
@@ -112,13 +104,65 @@ int mi_close(register MI_INFO *info)
   }
   if (info->dfile >= 0 && my_close(info->dfile,MYF(0)))
     error = my_errno;
+  info->dfile= -1;
 
   myisam_log_command(MI_LOG_CLOSE,info,NULL,0,error);
-  my_free((uchar*) info,MYF(0));
 
   if (error)
   {
     DBUG_RETURN(my_errno=error);
   }
   DBUG_RETURN(0);
+} /* mi_close_files */
+
+
+int mi_close_handler(register MI_INFO *info)
+{
+  MYISAM_SHARE *share=info->s;
+  int error=0, flag;
+  DBUG_ENTER("mi_close_handler");
+  DBUG_PRINT("enter",("base: 0x%lx  reopen: %u refs: %u locks: %u name %s",
+		      (long) info, (uint) share->reopen,
+                      (uint) share->ref_count,
+                      (uint) share->tot_locks, share->unique_file_name));
+
+  pthread_mutex_lock(&THR_LOCK_myisam);
+  pthread_mutex_lock(&share->intern_lock);
+  flag= !--share->ref_count;
+  myisam_open_list=list_delete(myisam_open_list,&info->open_list);
+  pthread_mutex_unlock(&share->intern_lock);
+  if (flag)
+  {
+
+#ifdef THREAD
+    thr_lock_delete(&share->lock);
+    VOID(pthread_mutex_destroy(&share->intern_lock));
+    {
+      int i,keys;
+      keys = share->state.header.keys;
+      VOID(rwlock_destroy(&share->mmap_lock));
+      for(i=0; i<keys; i++) {
+        VOID(rwlock_destroy(&share->key_root_lock[i]));
+      }
+    }
+#endif
+    my_free((uchar*) info->s,MYF(0));
+  }
+  pthread_mutex_unlock(&THR_LOCK_myisam);
+  myisam_log_command(MI_LOG_CLOSE,info,NULL,0,error);
+  my_free((uchar*) info,MYF(0));
+  DBUG_RETURN(0);
+}
+
+int mi_close(register MI_INFO *info)
+{
+  int error,saved_error=0;
+  DBUG_ENTER("mi_close");
+  if ((error= mi_close_files(info)))
+    saved_error= error;
+  if ((error= mi_close_handler(info)))
+    DBUG_RETURN(my_errno= error);
+  if (saved_error)
+    my_errno= saved_error;
+  DBUG_RETURN(saved_error);
 } /* mi_close */

=== modified file 'storage/myisam/mi_extra.c'
--- a/storage/myisam/mi_extra.c	2007-10-11 15:07:40 +0000
+++ b/storage/myisam/mi_extra.c	2009-02-17 14:04:29 +0000
@@ -431,11 +431,12 @@ int mi_reset(MI_INFO *info)
   {
     info->opt_flag&= ~(READ_CACHE_USED | WRITE_CACHE_USED);
     error= end_io_cache(&info->rec_cache);
+    DBUG_PRINT("info", ("end_io_cache returned %d", error));
   }
-  if (share->base.blobs)
+  if (share->base.blobs && info->rec_buff)
     mi_alloc_rec_buff(info, -1, &info->rec_buff);
 #if defined(HAVE_MMAP) && defined(HAVE_MADVISE)
-  if (info->opt_flag & MEMMAP_USED)
+  if ((info->opt_flag & MEMMAP_USED) && share->file_map)
     madvise((char*) share->file_map, share->state.state.data_file_length,
             MADV_RANDOM);
 #endif

=== modified file 'storage/myisam/mi_locking.c'
--- a/storage/myisam/mi_locking.c	2007-08-13 13:11:25 +0000
+++ b/storage/myisam/mi_locking.c	2009-02-17 14:04:29 +0000
@@ -284,8 +284,11 @@ void mi_get_status(void* param, int conc
 		     (long) info->s->state.state.data_file_length,
                      concurrent_insert));
 #ifndef DBUG_OFF
-  if (info->state->key_file_length > info->s->state.state.key_file_length ||
-      info->state->data_file_length > info->s->state.state.data_file_length)
+  if (!info->state)
+    DBUG_PRINT("info", ("mi_get_status called when never used"));
+  if (info->state &&
+      (info->state->key_file_length > info->s->state.state.key_file_length ||
+       info->state->data_file_length > info->s->state.state.data_file_length))
     DBUG_PRINT("warning",("old info:  key_file: %ld  data_file: %ld",
 			  (long) info->state->key_file_length,
 			  (long) info->state->data_file_length));

=== modified file 'storage/myisam/mi_open.c'
--- a/storage/myisam/mi_open.c	2009-02-13 11:04:15 +0000
+++ b/storage/myisam/mi_open.c	2009-02-17 14:04:29 +0000
@@ -65,14 +65,165 @@ MI_INFO *test_if_reopen(char *filename)
 
 
 /******************************************************************************
-  open a MyISAM database.
+  open a MyISAM database without touching its file, i.e. initialization only.
   See my_base.h for the handle_locking argument
   if handle_locking and HA_OPEN_ABORT_IF_CRASHED then abort if the table
   is marked crashed or if we are not using locking and the table doesn't
   have an open count of 0.
 ******************************************************************************/
 
-MI_INFO *mi_open(const char *name, int mode, uint open_flags)
+MI_INFO *mi_open_handler(const char *name, int mode, uint open_flags)
+{
+  int save_errno, realpath_err;
+  uint errpos;
+  char name_buff[FN_REFLEN], org_name[FN_REFLEN];
+  MI_INFO info,*m_info,*old_info;
+  MYISAM_SHARE share_buff,*share;
+  DBUG_ENTER("mi_open_handler");
+
+  LINT_INIT(m_info);
+  LINT_INIT(open_flags);
+  errpos=0;
+  bzero((uchar*) &info,sizeof(info));
+
+  realpath_err= my_realpath(name_buff,
+                  fn_format(org_name,name,"",MI_NAME_IEXT,4),MYF(0));
+
+  pthread_mutex_lock(&THR_LOCK_myisam);
+  if (!(old_info=test_if_reopen(name_buff)))
+  {
+    share= &share_buff;
+    bzero((uchar*) &share_buff,sizeof(share_buff));
+
+    errpos=3;
+    if (!my_multi_malloc(MY_WME,
+			 &share,sizeof(*share),
+			 &share->unique_file_name,strlen(name_buff)+1,
+			 &share->mmap_lock,sizeof(rw_lock_t),
+			 NullS))
+      goto err;
+    errpos=4;
+    *share=share_buff;
+ 
+    strmov(share->unique_file_name, name_buff);
+    share->unique_name_length= strlen(name_buff);
+
+    share->this_process=(ulong) getpid();
+    share->last_process= share->state.process;
+    share->last_version=1;                          /* Mark as opened */
+    share->blocksize=min(IO_SIZE,myisam_block_size);
+    share->is_log_table= FALSE;
+#ifdef THREAD
+    thr_lock_init(&share->lock);
+    VOID(pthread_mutex_init(&share->intern_lock,MY_MUTEX_INIT_FAST));
+    VOID(my_rwlock_init(&share->mmap_lock, NULL));
+    if (!thr_lock_inited)
+    {
+      /* Probably a single threaded program; Don't use concurrent inserts */
+      myisam_concurrent_insert=0;
+    }
+#endif
+    share->mode=O_RDWR;
+  }
+  else
+  {
+    share= old_info->s;
+    if (mode == O_RDWR && share->mode == O_RDONLY)
+    {
+      my_errno=EACCES;				/* Can't open in write mode */
+      goto err;
+    }
+    errpos=5;
+  }
+
+  /* alloc and set up private structure parts */
+  if (!my_multi_malloc(MY_WME,
+		       &m_info,sizeof(MI_INFO),
+		       &info.filename,strlen(name)+1,
+		       NullS))
+    goto err;
+  errpos=6;
+
+  strmov(info.filename,name);
+
+  info.s=share;
+  info.lastpos= HA_OFFSET_ERROR;
+  info.update= (short) (HA_STATE_NEXT_FOUND+HA_STATE_PREV_FOUND);
+  info.opt_flag=0;
+  info.this_loop=0;				/* Update counter */
+  if (mode == O_RDONLY)
+  {
+    share->options|=HA_OPTION_READ_ONLY_DATA;
+    share->mode= O_RDONLY;
+  }
+  info.lock_type=F_UNLCK;
+  info.quick_mode=0;
+  info.bulk_insert=0;
+  info.ft1_to_ft2=0;
+  info.errkey= -1;
+  info.page_changed=1;
+  pthread_mutex_lock(&share->intern_lock);
+#ifdef NOT_USED
+  /* only incremented when opening a file */
+  share->reopen++;
+#endif /* NOT_USED */
+  /* increase share reference count, under THR_LOCK_myisam */
+  share->ref_count++;
+  share->write_flag=MYF(MY_NABP | MY_WAIT_IF_FULL);
+  pthread_mutex_unlock(&share->intern_lock);
+
+  *m_info=info;
+#ifdef THREAD
+  thr_lock_data_init(&share->lock,&m_info->lock,(void*) m_info);
+#endif
+  m_info->open_list.data=(void*) m_info;
+  myisam_open_list=list_add(myisam_open_list,&m_info->open_list);
+
+  pthread_mutex_unlock(&THR_LOCK_myisam);
+  if (myisam_log_file >= 0)
+  {
+    intern_filename(name_buff,share->index_file_name);
+    _myisam_log(MI_LOG_OPEN, m_info, (uchar*) name_buff, strlen(name_buff));
+  }
+  DBUG_RETURN(m_info);
+
+err:
+  save_errno=my_errno ? my_errno : HA_ERR_END_OF_FILE;
+  if ((save_errno == HA_ERR_CRASHED) ||
+      (save_errno == HA_ERR_CRASHED_ON_USAGE) ||
+      (save_errno == HA_ERR_CRASHED_ON_REPAIR))
+    mi_report_error(save_errno, name);
+  switch (errpos) {
+  case 6:
+    my_free((uchar*) m_info,MYF(0));
+    /* fall through */
+  case 5:
+    if (old_info)
+      break;					/* Don't remove open table */
+    /* fall through */
+  case 4:
+    my_free((uchar*) share,MYF(0));
+    /* fall through */
+  case 0:
+  default:
+    break;
+  }
+  pthread_mutex_unlock(&THR_LOCK_myisam);
+  my_errno=save_errno;
+  DBUG_RETURN (NULL);
+} /* mi_open_handler */
+
+
+/******************************************************************************
+  open MyISAM database files (data + index files).
+  See my_base.h for the handle_locking argument
+  if handle_locking and HA_OPEN_ABORT_IF_CRASHED then abort if the table
+  is marked crashed or if we are not using locking and the table doesn't
+  have an open count of 0.
+******************************************************************************/
+
+MI_INFO *mi_open_files(MI_INFO *m_info, const char *name,
+                       int mode, uint open_flags)
 {
   int lock_error,kfile,open_mode,save_errno,have_rtree=0, realpath_err;
   uint i,j,len,errpos,head_length,base_pos,offset,info_length,keys,
@@ -80,19 +231,24 @@ MI_INFO *mi_open(const char *name, int m
   char name_buff[FN_REFLEN], org_name[FN_REFLEN], index_name[FN_REFLEN],
        data_name[FN_REFLEN];
   uchar *disk_cache, *disk_pos, *end_pos;
-  MI_INFO info,*m_info,*old_info;
-  MYISAM_SHARE share_buff,*share;
+  MYISAM_SHARE *share;
   ulong rec_per_key_part[HA_MAX_POSSIBLE_KEY*MI_MAX_KEY_SEG];
   my_off_t key_root[HA_MAX_POSSIBLE_KEY],key_del[MI_MAX_KEY_BLOCK_SIZE];
   ulonglong max_key_file_length, max_data_file_length;
-  DBUG_ENTER("mi_open");
+  DBUG_ENTER("mi_open_files");
 
-  LINT_INIT(m_info);
+  if (!m_info)
+  {
+    /* It must have been opened by mi_open_handler first! */
+    DBUG_ASSERT(0);
+    DBUG_PRINT("error",("mi_open_files without mi_open_handler beeing called"));
+    my_errno=HA_ERR_UNSUPPORTED;
+    DBUG_RETURN(NULL);
+  }
   kfile= -1;
   lock_error=1;
   errpos=0;
-  head_length=sizeof(share_buff.state.header);
-  bzero((uchar*) &info,sizeof(info));
+  head_length=sizeof(share->state.header);
 
   realpath_err= my_realpath(name_buff,
                   fn_format(org_name,name,"",MI_NAME_IEXT,4),MYF(0));
@@ -104,517 +260,505 @@ MI_INFO *mi_open(const char *name, int m
   }
 
   pthread_mutex_lock(&THR_LOCK_myisam);
-  if (!(old_info=test_if_reopen(name_buff)))
+  if (!test_if_reopen(name_buff))
   {
-    share= &share_buff;
-    bzero((uchar*) &share_buff,sizeof(share_buff));
-    share_buff.state.rec_per_key_part=rec_per_key_part;
-    share_buff.state.key_root=key_root;
-    share_buff.state.key_del=key_del;
-    share_buff.key_cache= multi_key_cache_search((uchar*) name_buff,
-                                                 strlen(name_buff));
-
-    DBUG_EXECUTE_IF("myisam_pretend_crashed_table_on_open",
-                    if (strstr(name, "/t1"))
-                    {
-                      my_errno= HA_ERR_CRASHED;
-                      goto err;
-                    });
-    if ((kfile=my_open(name_buff,(open_mode=O_RDWR) | O_SHARE,MYF(0))) < 0)
-    {
-      if ((errno != EROFS && errno != EACCES) ||
-	  mode != O_RDONLY ||
-	  (kfile=my_open(name_buff,(open_mode=O_RDONLY) | O_SHARE,MYF(0))) < 0)
-	goto err;
-    }
-    share->mode=open_mode;
-    errpos=1;
-    if (my_read(kfile, share->state.header.file_version, head_length,
-		MYF(MY_NABP)))
-    {
-      my_errno= HA_ERR_NOT_A_TABLE;
-      goto err;
-    }
-    if (memcmp((uchar*) share->state.header.file_version,
-	       (uchar*) myisam_file_magic, 4))
-    {
-      DBUG_PRINT("error",("Wrong header in %s",name_buff));
-      DBUG_DUMP("error_dump",(char*) share->state.header.file_version,
-		head_length);
-      my_errno=HA_ERR_NOT_A_TABLE;
-      goto err;
-    }
-    share->options= mi_uint2korr(share->state.header.options);
-    if (share->options &
-	~(HA_OPTION_PACK_RECORD | HA_OPTION_PACK_KEYS |
-	  HA_OPTION_COMPRESS_RECORD | HA_OPTION_READ_ONLY_DATA |
-	  HA_OPTION_TEMP_COMPRESS_RECORD | HA_OPTION_CHECKSUM |
-          HA_OPTION_TMP_TABLE | HA_OPTION_DELAY_KEY_WRITE |
-          HA_OPTION_RELIES_ON_SQL_LAYER))
-    {
-      DBUG_PRINT("error",("wrong options: 0x%lx", share->options));
-      my_errno=HA_ERR_OLD_FILE;
-      goto err;
-    }
-    if ((share->options & HA_OPTION_RELIES_ON_SQL_LAYER) &&
-        ! (open_flags & HA_OPEN_FROM_SQL_LAYER))
-    {
-      DBUG_PRINT("error", ("table cannot be openned from non-sql layer"));
-      my_errno= HA_ERR_UNSUPPORTED;
-      goto err;
-    }
-    /* Don't call realpath() if the name can't be a link */
-    if (!strcmp(name_buff, org_name) ||
-        my_readlink(index_name, org_name, MYF(0)) == -1)
-      (void) strmov(index_name, org_name);
-    *strrchr(org_name, '.')= '\0';
-    (void) fn_format(data_name,org_name,"",MI_NAME_DEXT,
-                     MY_APPEND_EXT|MY_UNPACK_FILENAME|MY_RESOLVE_SYMLINKS);
-
-    info_length=mi_uint2korr(share->state.header.header_length);
-    base_pos=mi_uint2korr(share->state.header.base_pos);
-    if (!(disk_cache= (uchar*) my_alloca(info_length+128)))
+    /* It must have been opened by mi_open_handler first! */
+    DBUG_ASSERT(0);
+    DBUG_PRINT("error",("mi_open_files without mi_open_handler beeing called"));
+    my_errno=HA_ERR_UNSUPPORTED;
+    DBUG_RETURN(NULL);
+  }
+  else
+  {
+    share= m_info->s;
+    DBUG_ASSERT(share);
+    DBUG_PRINT("info",("base: 0x%lx  reopen: %u  locks: %u name %s",
+                       (long) m_info, (uint) share->reopen,
+                       (uint) share->tot_locks, share->unique_file_name));
+    if (mode == O_RDWR && share->mode == O_RDONLY)
     {
-      my_errno=ENOMEM;
+      my_errno=EACCES;				/* Can't open in write mode */
       goto err;
     }
-    end_pos=disk_cache+info_length;
-    errpos=2;
 
-    VOID(my_seek(kfile,0L,MY_SEEK_SET,MYF(0)));
-    if (!(open_flags & HA_OPEN_TMP_TABLE))
+    if (!share->reopen)
     {
-      if ((lock_error=my_lock(kfile,F_RDLCK,0L,F_TO_EOF,
-			      MYF(open_flags & HA_OPEN_WAIT_IF_LOCKED ?
-				  0 : MY_DONT_WAIT))) &&
-	  !(open_flags & HA_OPEN_IGNORE_IF_LOCKED))
-	goto err;
-    }
-    errpos=3;
-    if (my_read(kfile,disk_cache,info_length,MYF(MY_NABP)))
-    {
-      my_errno=HA_ERR_CRASHED;
-      goto err;
-    }
-    len=mi_uint2korr(share->state.header.state_info_length);
-    keys=    (uint) share->state.header.keys;
-    uniques= (uint) share->state.header.uniques;
-    fulltext_keys= (uint) share->state.header.fulltext_keys;
-    key_parts= mi_uint2korr(share->state.header.key_parts);
-    unique_key_parts= mi_uint2korr(share->state.header.unique_key_parts);
-    if (len != MI_STATE_INFO_SIZE)
-    {
-      DBUG_PRINT("warning",
-		 ("saved_state_info_length: %d  state_info_length: %d",
-		  len,MI_STATE_INFO_SIZE));
-    }
-    share->state_diff_length=len-MI_STATE_INFO_SIZE;
+      share->state.rec_per_key_part=rec_per_key_part;
+      share->state.key_root=key_root;
+      share->state.key_del=key_del;
+      share->key_cache= multi_key_cache_search((uchar*) name_buff,
+                                               strlen(name_buff));
+
+      DBUG_EXECUTE_IF("myisam_pretend_crashed_table_on_open",
+                      if (strstr(name, "/t1"))
+                      {
+                        my_errno= HA_ERR_CRASHED;
+                        goto err;
+                      });
+      if ((kfile=my_open(name_buff,(open_mode=O_RDWR) | O_SHARE,MYF(0))) < 0)
+      {
+        if ((errno != EROFS && errno != EACCES) ||
+            mode != O_RDONLY ||
+            (kfile=my_open(name_buff,(open_mode=O_RDONLY) | O_SHARE,MYF(0))) < 0)
+          goto err;
+      }
+      share->mode=open_mode;
+      errpos=1;
+      if (my_read(kfile, share->state.header.file_version, head_length,
+                  MYF(MY_NABP)))
+      {
+        my_errno= HA_ERR_NOT_A_TABLE;
+        goto err;
+      }
+      if (memcmp((uchar*) share->state.header.file_version,
+                 (uchar*) myisam_file_magic, 4))
+      {
+        DBUG_PRINT("error",("Wrong header in %s",name_buff));
+        DBUG_DUMP("error_dump",(uchar*) share->state.header.file_version,
+                  head_length);
+        my_errno=HA_ERR_NOT_A_TABLE;
+        goto err;
+      }
+      share->options= mi_uint2korr(share->state.header.options);
+      if (share->options &
+          ~(HA_OPTION_PACK_RECORD | HA_OPTION_PACK_KEYS |
+            HA_OPTION_COMPRESS_RECORD | HA_OPTION_READ_ONLY_DATA |
+            HA_OPTION_TEMP_COMPRESS_RECORD | HA_OPTION_CHECKSUM |
+            HA_OPTION_TMP_TABLE | HA_OPTION_DELAY_KEY_WRITE |
+            HA_OPTION_RELIES_ON_SQL_LAYER))
+      {
+        DBUG_PRINT("error",("wrong options: 0x%lx", share->options));
+        my_errno=HA_ERR_OLD_FILE;
+        goto err;
+      }
+      if ((share->options & HA_OPTION_RELIES_ON_SQL_LAYER) &&
+          ! (open_flags & HA_OPEN_FROM_SQL_LAYER))
+      {
+        DBUG_PRINT("error", ("table cannot be openned from non-sql layer"));
+        my_errno= HA_ERR_UNSUPPORTED;
+        goto err;
+      }
+      /* Don't call realpath() if the name can't be a link */
+      if (!strcmp(name_buff, org_name) ||
+          my_readlink(index_name, org_name, MYF(0)) == -1)
+        (void) strmov(index_name, org_name);
+      *strrchr(org_name, '.')= '\0';
+      (void) fn_format(data_name,org_name,"",MI_NAME_DEXT,
+                       MY_APPEND_EXT|MY_UNPACK_FILENAME|MY_RESOLVE_SYMLINKS);
+
+      info_length=mi_uint2korr(share->state.header.header_length);
+      base_pos=mi_uint2korr(share->state.header.base_pos);
+      if (!(disk_cache= (uchar*) my_alloca(info_length+128)))
+      {
+        my_errno=ENOMEM;
+        goto err;
+      }
+      end_pos=disk_cache+info_length;
+      errpos=2;
 
-    mi_state_info_read(disk_cache, &share->state);
-    len= mi_uint2korr(share->state.header.base_info_length);
-    if (len != MI_BASE_INFO_SIZE)
-    {
-      DBUG_PRINT("warning",("saved_base_info_length: %d  base_info_length: %d",
-			    len,MI_BASE_INFO_SIZE));
-    }
-    disk_pos= my_n_base_info_read(disk_cache + base_pos, &share->base);
-    share->state.state_length=base_pos;
+      VOID(my_seek(kfile,0L,MY_SEEK_SET,MYF(0)));
+      if (!(open_flags & HA_OPEN_TMP_TABLE))
+      {
+        if ((lock_error=my_lock(kfile,F_RDLCK,0L,F_TO_EOF,
+                                MYF(open_flags & HA_OPEN_WAIT_IF_LOCKED ?
+                                    0 : MY_DONT_WAIT))) &&
+            !(open_flags & HA_OPEN_IGNORE_IF_LOCKED))
+          goto err;
+      }
+      errpos=3;
+      if (my_read(kfile,disk_cache,info_length,MYF(MY_NABP)))
+      {
+        my_errno=HA_ERR_CRASHED;
+        goto err;
+      }
+      len=mi_uint2korr(share->state.header.state_info_length);
+      keys=    (uint) share->state.header.keys;
+      uniques= (uint) share->state.header.uniques;
+      fulltext_keys= (uint) share->state.header.fulltext_keys;
+      key_parts= mi_uint2korr(share->state.header.key_parts);
+      unique_key_parts= mi_uint2korr(share->state.header.unique_key_parts);
+      if (len != MI_STATE_INFO_SIZE)
+      {
+        DBUG_PRINT("warning",
+                   ("saved_state_info_length: %d  state_info_length: %d",
+                    len,MI_STATE_INFO_SIZE));
+      }
+      share->state_diff_length=len-MI_STATE_INFO_SIZE;
 
-    if (!(open_flags & HA_OPEN_FOR_REPAIR) &&
-	((share->state.changed & STATE_CRASHED) ||
-	 ((open_flags & HA_OPEN_ABORT_IF_CRASHED) &&
-	  (my_disable_locking && share->state.open_count))))
-    {
-      DBUG_PRINT("error",("Table is marked as crashed. open_flags: %u  "
-                          "changed: %u  open_count: %u  !locking: %d",
-                          open_flags, share->state.changed,
-                          share->state.open_count, my_disable_locking));
-      my_errno=((share->state.changed & STATE_CRASHED_ON_REPAIR) ?
-		HA_ERR_CRASHED_ON_REPAIR : HA_ERR_CRASHED_ON_USAGE);
-      goto err;
-    }
+      mi_state_info_read(disk_cache, &share->state);
+      len= mi_uint2korr(share->state.header.base_info_length);
+      if (len != MI_BASE_INFO_SIZE)
+      {
+        DBUG_PRINT("warning",("saved_base_info_length: %d  base_info_length: %d",
+                              len,MI_BASE_INFO_SIZE));
+      }
+      disk_pos= my_n_base_info_read(disk_cache + base_pos, &share->base);
+      share->state.state_length=base_pos;
 
-    /* sanity check */
-    if (share->base.keystart > 65535 || share->base.rec_reflength > 8)
-    {
-      my_errno=HA_ERR_CRASHED;
-      goto err;
-    }
+      if (!(open_flags & HA_OPEN_FOR_REPAIR) &&
+          ((share->state.changed & STATE_CRASHED) ||
+           ((open_flags & HA_OPEN_ABORT_IF_CRASHED) &&
+            (my_disable_locking && share->state.open_count))))
+      {
+        DBUG_PRINT("error",("Table is marked as crashed. open_flags: %u  "
+                            "changed: %u  open_count: %u  !locking: %d",
+                            open_flags, share->state.changed,
+                            share->state.open_count, my_disable_locking));
+        my_errno=((share->state.changed & STATE_CRASHED_ON_REPAIR) ?
+                  HA_ERR_CRASHED_ON_REPAIR : HA_ERR_CRASHED_ON_USAGE);
+        goto err;
+      }
 
-    key_parts+=fulltext_keys*FT_SEGS;
-    if (share->base.max_key_length > MI_MAX_KEY_BUFF || keys > MI_MAX_KEY ||
-	key_parts > MI_MAX_KEY * MI_MAX_KEY_SEG)
-    {
-      DBUG_PRINT("error",("Wrong key info:  Max_key_length: %d  keys: %d  key_parts: %d", share->base.max_key_length, keys, key_parts));
-      my_errno=HA_ERR_UNSUPPORTED;
-      goto err;
-    }
+      /* sanity check */
+      if (share->base.keystart > 65535 || share->base.rec_reflength > 8)
+      {
+        my_errno=HA_ERR_CRASHED;
+        goto err;
+      }
+
+      key_parts+=fulltext_keys*FT_SEGS;
+      if (share->base.max_key_length > MI_MAX_KEY_BUFF || keys > MI_MAX_KEY ||
+          key_parts > MI_MAX_KEY * MI_MAX_KEY_SEG)
+      {
+        DBUG_PRINT("error",("Wrong key info:  Max_key_length: %d  keys: %d  key_parts: %d", share->base.max_key_length, keys, key_parts));
+        my_errno=HA_ERR_UNSUPPORTED;
+        goto err;
+      }
 
-    /* Correct max_file_length based on length of sizeof(off_t) */
-    max_data_file_length=
-      (share->options & (HA_OPTION_PACK_RECORD | HA_OPTION_COMPRESS_RECORD)) ?
-      (((ulonglong) 1 << (share->base.rec_reflength*8))-1) :
-      (mi_safe_mul(share->base.pack_reclength,
-		   (ulonglong) 1 << (share->base.rec_reflength*8))-1);
-    max_key_file_length=
-      mi_safe_mul(MI_MIN_KEY_BLOCK_LENGTH,
-		  ((ulonglong) 1 << (share->base.key_reflength*8))-1);
+      /* Correct max_file_length based on length of sizeof(off_t) */
+      max_data_file_length=
+        (share->options & (HA_OPTION_PACK_RECORD | HA_OPTION_COMPRESS_RECORD)) ?
+        (((ulonglong) 1 << (share->base.rec_reflength*8))-1) :
+        (mi_safe_mul(share->base.pack_reclength,
+                     (ulonglong) 1 << (share->base.rec_reflength*8))-1);
+      max_key_file_length=
+        mi_safe_mul(MI_MIN_KEY_BLOCK_LENGTH,
+                    ((ulonglong) 1 << (share->base.key_reflength*8))-1);
 #if SIZEOF_OFF_T == 4
-    set_if_smaller(max_data_file_length, INT_MAX32);
-    set_if_smaller(max_key_file_length, INT_MAX32);
+      set_if_smaller(max_data_file_length, INT_MAX32);
+      set_if_smaller(max_key_file_length, INT_MAX32);
 #endif
 #if USE_RAID && SYSTEM_SIZEOF_OFF_T == 4
-    set_if_smaller(max_key_file_length, INT_MAX32);
-    if (!share->base.raid_type)
-    {
-      set_if_smaller(max_data_file_length, INT_MAX32);
-    }
-    else
-    {
-      set_if_smaller(max_data_file_length,
-		     (ulonglong) share->base.raid_chunks << 31);
-    }
+      set_if_smaller(max_key_file_length, INT_MAX32);
+      if (!share->base.raid_type)
+      {
+        set_if_smaller(max_data_file_length, INT_MAX32);
+      }
+      else
+      {
+        set_if_smaller(max_data_file_length,
+                       (ulonglong) share->base.raid_chunks << 31);
+      }
 #elif !defined(USE_RAID)
-    if (share->base.raid_type)
-    {
-      DBUG_PRINT("error",("Table uses RAID but we don't have RAID support"));
-      my_errno=HA_ERR_UNSUPPORTED;
-      goto err;
-    }
+      if (share->base.raid_type)
+      {
+        DBUG_PRINT("error",("Table uses RAID but we don't have RAID support"));
+        my_errno=HA_ERR_UNSUPPORTED;
+        goto err;
+      }
 #endif
-    share->base.max_data_file_length=(my_off_t) max_data_file_length;
-    share->base.max_key_file_length=(my_off_t) max_key_file_length;
+      share->base.max_data_file_length=(my_off_t) max_data_file_length;
+      share->base.max_key_file_length=(my_off_t) max_key_file_length;
 
-    if (share->options & HA_OPTION_COMPRESS_RECORD)
-      share->base.max_key_length+=2;	/* For safety */
+      if (share->options & HA_OPTION_COMPRESS_RECORD)
+        share->base.max_key_length+=2;	/* For safety */
 
-    /* Add space for node pointer */
-    share->base.max_key_length+= share->base.key_reflength;
+      /* Add space for node pointer */
+      share->base.max_key_length+= share->base.key_reflength;
 
-    if (!my_multi_malloc(MY_WME,
-			 &share,sizeof(*share),
-			 &share->state.rec_per_key_part,sizeof(long)*key_parts,
-			 &share->keyinfo,keys*sizeof(MI_KEYDEF),
-			 &share->uniqueinfo,uniques*sizeof(MI_UNIQUEDEF),
-			 &share->keyparts,
-			 (key_parts+unique_key_parts+keys+uniques) *
-			 sizeof(HA_KEYSEG),
-			 &share->rec,
-			 (share->base.fields+1)*sizeof(MI_COLUMNDEF),
-			 &share->blobs,sizeof(MI_BLOB)*share->base.blobs,
-			 &share->unique_file_name,strlen(name_buff)+1,
-			 &share->index_file_name,strlen(index_name)+1,
-			 &share->data_file_name,strlen(data_name)+1,
-			 &share->state.key_root,keys*sizeof(my_off_t),
-			 &share->state.key_del,
-			 (share->state.header.max_block_size_index*sizeof(my_off_t)),
+      if (!my_multi_malloc(MY_WME,
+                           &share->state.rec_per_key_part,sizeof(long)*key_parts,
+                           &share->keyinfo,keys*sizeof(MI_KEYDEF),
+                           &share->uniqueinfo,uniques*sizeof(MI_UNIQUEDEF),
+                           &share->keyparts,
+                           (key_parts+unique_key_parts+keys+uniques) *
+                           sizeof(HA_KEYSEG),
+                           &share->rec,
+                           (share->base.fields+1)*sizeof(MI_COLUMNDEF),
+                           &share->blobs,sizeof(MI_BLOB)*share->base.blobs,
+                           &share->index_file_name,strlen(index_name)+1,
+                           &share->data_file_name,strlen(data_name)+1,
+                           &share->state.key_root,keys*sizeof(my_off_t),
+                           &share->state.key_del,
+                           (share->state.header.max_block_size_index*sizeof(my_off_t)),
 #ifdef THREAD
-			 &share->key_root_lock,sizeof(rw_lock_t)*keys,
+                           &share->key_root_lock,sizeof(rw_lock_t)*keys,
 #endif
-			 &share->mmap_lock,sizeof(rw_lock_t),
-			 NullS))
-      goto err;
-    errpos=4;
-    *share=share_buff;
-    memcpy((char*) share->state.rec_per_key_part,
-	   (char*) rec_per_key_part, sizeof(long)*key_parts);
-    memcpy((char*) share->state.key_root,
-	   (char*) key_root, sizeof(my_off_t)*keys);
-    memcpy((char*) share->state.key_del,
-	   (char*) key_del, (sizeof(my_off_t) *
-			     share->state.header.max_block_size_index));
-    strmov(share->unique_file_name, name_buff);
-    share->unique_name_length= strlen(name_buff);
-    strmov(share->index_file_name,  index_name);
-    strmov(share->data_file_name,   data_name);
+                           NullS))
+        goto err;
+      errpos=4;
+      memcpy((char*) share->state.rec_per_key_part,
+             (char*) rec_per_key_part, sizeof(long)*key_parts);
+      memcpy((char*) share->state.key_root,
+             (char*) key_root, sizeof(my_off_t)*keys);
+      memcpy((char*) share->state.key_del,
+             (char*) key_del, (sizeof(my_off_t) *
+                               share->state.header.max_block_size_index));
+      strmov(share->index_file_name,  index_name);
+      strmov(share->data_file_name,   data_name);
 
-    share->blocksize=min(IO_SIZE,myisam_block_size);
-    {
-      HA_KEYSEG *pos=share->keyparts;
-      for (i=0 ; i < keys ; i++)
       {
-        share->keyinfo[i].share= share;
-	disk_pos=mi_keydef_read(disk_pos, &share->keyinfo[i]);
-        disk_pos_assert(disk_pos + share->keyinfo[i].keysegs * HA_KEYSEG_SIZE,
- 			end_pos);
-        if (share->keyinfo[i].key_alg == HA_KEY_ALG_RTREE)
-          have_rtree=1;
-	set_if_smaller(share->blocksize,share->keyinfo[i].block_length);
-	share->keyinfo[i].seg=pos;
-	for (j=0 ; j < share->keyinfo[i].keysegs; j++,pos++)
-	{
-	  disk_pos=mi_keyseg_read(disk_pos, pos);
-          if (pos->flag & HA_BLOB_PART &&
-              ! (share->options & (HA_OPTION_COMPRESS_RECORD |
-                                   HA_OPTION_PACK_RECORD)))
+        HA_KEYSEG *pos=share->keyparts;
+        for (i=0 ; i < keys ; i++)
+        {
+          share->keyinfo[i].share= share;
+          disk_pos=mi_keydef_read(disk_pos, &share->keyinfo[i]);
+          disk_pos_assert(disk_pos + share->keyinfo[i].keysegs * HA_KEYSEG_SIZE,
+                          end_pos);
+          if (share->keyinfo[i].key_alg == HA_KEY_ALG_RTREE)
+              have_rtree=1;
+          set_if_smaller(share->blocksize,share->keyinfo[i].block_length);
+          share->keyinfo[i].seg=pos;
+          for (j=0 ; j < share->keyinfo[i].keysegs; j++,pos++)
           {
-            my_errno= HA_ERR_CRASHED;
-            goto err;
+            disk_pos=mi_keyseg_read(disk_pos, pos);
+            if (pos->flag & HA_BLOB_PART &&
+                ! (share->options & (HA_OPTION_COMPRESS_RECORD |
+                                     HA_OPTION_PACK_RECORD)))
+            {
+              my_errno= HA_ERR_CRASHED;
+              goto err;
+            }
+            if (pos->type == HA_KEYTYPE_TEXT ||
+                pos->type == HA_KEYTYPE_VARTEXT1 ||
+                pos->type == HA_KEYTYPE_VARTEXT2)
+            {
+              if (!pos->language)
+                pos->charset=default_charset_info;
+              else if (!(pos->charset= get_charset(pos->language, MYF(MY_WME))))
+              {
+                my_errno=HA_ERR_UNKNOWN_CHARSET;
+                goto err;
+              }
+            }
+            else if (pos->type == HA_KEYTYPE_BINARY)
+              pos->charset= &my_charset_bin;
           }
-	  if (pos->type == HA_KEYTYPE_TEXT ||
-              pos->type == HA_KEYTYPE_VARTEXT1 ||
-              pos->type == HA_KEYTYPE_VARTEXT2)
-	  {
-	    if (!pos->language)
-	      pos->charset=default_charset_info;
-	    else if (!(pos->charset= get_charset(pos->language, MYF(MY_WME))))
-	    {
-	      my_errno=HA_ERR_UNKNOWN_CHARSET;
-	      goto err;
-	    }
-	  }
-	  else if (pos->type == HA_KEYTYPE_BINARY)
-	    pos->charset= &my_charset_bin;
-	}
-	if (share->keyinfo[i].flag & HA_SPATIAL)
-	{
+          if (share->keyinfo[i].flag & HA_SPATIAL)
+          {
 #ifdef HAVE_SPATIAL
-	  uint sp_segs=SPDIMS*2;
-	  share->keyinfo[i].seg=pos-sp_segs;
-	  share->keyinfo[i].keysegs--;
+            uint sp_segs=SPDIMS*2;
+            share->keyinfo[i].seg=pos-sp_segs;
+            share->keyinfo[i].keysegs--;
 #else
-	  my_errno=HA_ERR_UNSUPPORTED;
-	  goto err;
+            my_errno=HA_ERR_UNSUPPORTED;
+            goto err;
 #endif
-	}
-        else if (share->keyinfo[i].flag & HA_FULLTEXT)
-	{
-          if (!fulltext_keys)
-          { /* 4.0 compatibility code, to be removed in 5.0 */
-            share->keyinfo[i].seg=pos-FT_SEGS;
-            share->keyinfo[i].keysegs-=FT_SEGS;
           }
-          else
+          else if (share->keyinfo[i].flag & HA_FULLTEXT)
           {
-            uint k;
-            share->keyinfo[i].seg=pos;
-            for (k=0; k < FT_SEGS; k++)
+            if (!fulltext_keys)
+            { /* 4.0 compatibility code, to be removed in 5.0 */
+              share->keyinfo[i].seg=pos-FT_SEGS;
+              share->keyinfo[i].keysegs-=FT_SEGS;
+            }
+            else
             {
-              *pos= ft_keysegs[k];
-              pos[0].language= pos[-1].language;
-              if (!(pos[0].charset= pos[-1].charset))
+              uint k;
+              share->keyinfo[i].seg=pos;
+              for (k=0; k < FT_SEGS; k++)
               {
-                my_errno=HA_ERR_CRASHED;
-                goto err;
+                *pos= ft_keysegs[k];
+                pos[0].language= pos[-1].language;
+                if (!(pos[0].charset= pos[-1].charset))
+                {
+                  my_errno=HA_ERR_CRASHED;
+                  goto err;
+                }
+                pos++;
               }
-              pos++;
+            }
+            if (!share->ft2_keyinfo.seg)
+            {
+              memcpy(& share->ft2_keyinfo, & share->keyinfo[i], sizeof(MI_KEYDEF));
+              share->ft2_keyinfo.keysegs=1;
+              share->ft2_keyinfo.flag=0;
+              share->ft2_keyinfo.keylength=
+              share->ft2_keyinfo.minlength=
+              share->ft2_keyinfo.maxlength=HA_FT_WLEN+share->base.rec_reflength;
+              share->ft2_keyinfo.seg=pos-1;
+              share->ft2_keyinfo.end=pos;
+              setup_key_functions(& share->ft2_keyinfo);
             }
           }
-          if (!share->ft2_keyinfo.seg)
+          setup_key_functions(share->keyinfo+i);
+          share->keyinfo[i].end=pos;
+          pos->type=HA_KEYTYPE_END;			/* End */
+          pos->length=share->base.rec_reflength;
+          pos->null_bit=0;
+          pos->flag=0;					/* For purify */
+          pos++;
+        }
+        for (i=0 ; i < uniques ; i++)
+        {
+          disk_pos=mi_uniquedef_read(disk_pos, &share->uniqueinfo[i]);
+          disk_pos_assert(disk_pos + share->uniqueinfo[i].keysegs *
+                          HA_KEYSEG_SIZE, end_pos);
+          share->uniqueinfo[i].seg=pos;
+          for (j=0 ; j < share->uniqueinfo[i].keysegs; j++,pos++)
           {
-            memcpy(& share->ft2_keyinfo, & share->keyinfo[i], sizeof(MI_KEYDEF));
-            share->ft2_keyinfo.keysegs=1;
-            share->ft2_keyinfo.flag=0;
-            share->ft2_keyinfo.keylength=
-            share->ft2_keyinfo.minlength=
-            share->ft2_keyinfo.maxlength=HA_FT_WLEN+share->base.rec_reflength;
-            share->ft2_keyinfo.seg=pos-1;
-            share->ft2_keyinfo.end=pos;
-            setup_key_functions(& share->ft2_keyinfo);
+            disk_pos=mi_keyseg_read(disk_pos, pos);
+            if (pos->type == HA_KEYTYPE_TEXT ||
+                pos->type == HA_KEYTYPE_VARTEXT1 ||
+                pos->type == HA_KEYTYPE_VARTEXT2)
+            {
+              if (!pos->language)
+                pos->charset=default_charset_info;
+              else if (!(pos->charset= get_charset(pos->language, MYF(MY_WME))))
+              {
+                my_errno=HA_ERR_UNKNOWN_CHARSET;
+                goto err;
+              }
+            }
           }
-	}
-        setup_key_functions(share->keyinfo+i);
-	share->keyinfo[i].end=pos;
-	pos->type=HA_KEYTYPE_END;			/* End */
-	pos->length=share->base.rec_reflength;
-	pos->null_bit=0;
-	pos->flag=0;					/* For purify */
-	pos++;
-      }
-      for (i=0 ; i < uniques ; i++)
-      {
-	disk_pos=mi_uniquedef_read(disk_pos, &share->uniqueinfo[i]);
-        disk_pos_assert(disk_pos + share->uniqueinfo[i].keysegs *
-			HA_KEYSEG_SIZE, end_pos);
-	share->uniqueinfo[i].seg=pos;
-	for (j=0 ; j < share->uniqueinfo[i].keysegs; j++,pos++)
-	{
-	  disk_pos=mi_keyseg_read(disk_pos, pos);
-	  if (pos->type == HA_KEYTYPE_TEXT ||
-              pos->type == HA_KEYTYPE_VARTEXT1 ||
-              pos->type == HA_KEYTYPE_VARTEXT2)
-	  {
-	    if (!pos->language)
-	      pos->charset=default_charset_info;
-	    else if (!(pos->charset= get_charset(pos->language, MYF(MY_WME))))
-	    {
-	      my_errno=HA_ERR_UNKNOWN_CHARSET;
-	      goto err;
-	    }
-	  }
-	}
-	share->uniqueinfo[i].end=pos;
-	pos->type=HA_KEYTYPE_END;			/* End */
-	pos->null_bit=0;
-	pos->flag=0;
-	pos++;
+          share->uniqueinfo[i].end=pos;
+          pos->type=HA_KEYTYPE_END;			/* End */
+          pos->null_bit=0;
+          pos->flag=0;
+          pos++;
+        }
+        share->ftparsers= 0;
       }
-      share->ftparsers= 0;
-    }
 
-    disk_pos_assert(disk_pos + share->base.fields *MI_COLUMNDEF_SIZE, end_pos);
-    for (i=j=offset=0 ; i < share->base.fields ; i++)
-    {
-      disk_pos=mi_recinfo_read(disk_pos,&share->rec[i]);
-      share->rec[i].pack_type=0;
-      share->rec[i].huff_tree=0;
-      share->rec[i].offset=offset;
-      if (share->rec[i].type == (int) FIELD_BLOB)
-      {
-	share->blobs[j].pack_length=
-	  share->rec[i].length-portable_sizeof_char_ptr;
-	share->blobs[j].offset=offset;
-	j++;
+      disk_pos_assert(disk_pos + share->base.fields *MI_COLUMNDEF_SIZE, end_pos);
+      for (i=j=offset=0 ; i < share->base.fields ; i++)
+      {
+        disk_pos=mi_recinfo_read(disk_pos,&share->rec[i]);
+        share->rec[i].pack_type=0;
+        share->rec[i].huff_tree=0;
+        share->rec[i].offset=offset;
+        if (share->rec[i].type == (int) FIELD_BLOB)
+        {
+          share->blobs[j].pack_length=
+            share->rec[i].length-portable_sizeof_char_ptr;
+          share->blobs[j].offset=offset;
+          j++;
+        }
+        offset+=share->rec[i].length;
+      }
+      share->rec[i].type=(int) FIELD_LAST;	/* End marker */
+      if (offset > share->base.reclength)
+      {
+        /* purecov: begin inspected */
+        my_errno= HA_ERR_CRASHED;
+        goto err;
+        /* purecov: end */
       }
-      offset+=share->rec[i].length;
-    }
-    share->rec[i].type=(int) FIELD_LAST;	/* End marker */
-    if (offset > share->base.reclength)
-    {
-      /* purecov: begin inspected */
-      my_errno= HA_ERR_CRASHED;
-      goto err;
-      /* purecov: end */
-    }
-
-    if (! lock_error)
-    {
-      VOID(my_lock(kfile,F_UNLCK,0L,F_TO_EOF,MYF(MY_SEEK_NOT_DONE)));
-      lock_error=1;			/* Database unlocked */
-    }
 
-    if (mi_open_datafile(&info, share, name, -1))
-      goto err;
-    errpos=5;
+      if (! lock_error)
+      {
+        VOID(my_lock(kfile,F_UNLCK,0L,F_TO_EOF,MYF(MY_SEEK_NOT_DONE)));
+        lock_error=1;			/* Database unlocked */
+      }
 
-    share->kfile=kfile;
-    share->this_process=(ulong) getpid();
-    share->last_process= share->state.process;
-    share->base.key_parts=key_parts;
-    share->base.all_key_parts=key_parts+unique_key_parts;
-    if (!(share->last_version=share->state.version))
-      share->last_version=1;			/* Safety */
-    share->rec_reflength=share->base.rec_reflength; /* May be changed */
-    share->base.margin_key_file_length=(share->base.max_key_file_length -
-					(keys ? MI_INDEX_BLOCK_MARGIN *
-					 share->blocksize * keys : 0));
-    share->blocksize=min(IO_SIZE,myisam_block_size);
-    share->data_file_type=STATIC_RECORD;
-    if (share->options & HA_OPTION_COMPRESS_RECORD)
-    {
-      share->data_file_type = COMPRESSED_RECORD;
-      share->options|= HA_OPTION_READ_ONLY_DATA;
-      info.s=share;
-      if (_mi_read_pack_info(&info,
-			     (pbool)
-			     test(!(share->options &
-				    (HA_OPTION_PACK_RECORD |
-				     HA_OPTION_TEMP_COMPRESS_RECORD)))))
-	goto err;
-    }
-    else if (share->options & HA_OPTION_PACK_RECORD)
-      share->data_file_type = DYNAMIC_RECORD;
-    my_afree(disk_cache);
-    mi_setup_functions(share);
-    share->is_log_table= FALSE;
+      if (mi_open_datafile(m_info, share, name, -1))
+        goto err;
+      errpos=5;
+
+      share->kfile=kfile;
+      share->last_process= share->state.process;
+      share->base.key_parts=key_parts;
+      share->base.all_key_parts=key_parts+unique_key_parts;
+      if (!(share->last_version=share->state.version))
+        share->last_version=1;			/* Safety */
+      share->rec_reflength=share->base.rec_reflength; /* May be changed */
+      share->base.margin_key_file_length=(share->base.max_key_file_length -
+                                          (keys ? MI_INDEX_BLOCK_MARGIN *
+                                           share->blocksize * keys : 0));
+      share->data_file_type=STATIC_RECORD;
+      if (share->options & HA_OPTION_COMPRESS_RECORD)
+      {
+        share->data_file_type = COMPRESSED_RECORD;
+        share->options|= HA_OPTION_READ_ONLY_DATA;
+        if (_mi_read_pack_info(m_info,
+                               (pbool)
+                               test(!(share->options &
+                                      (HA_OPTION_PACK_RECORD |
+                                       HA_OPTION_TEMP_COMPRESS_RECORD)))))
+          goto err;
+      }
+      else if (share->options & HA_OPTION_PACK_RECORD)
+        share->data_file_type = DYNAMIC_RECORD;
+      my_afree(disk_cache);
+      mi_setup_functions(share);
+      share->is_log_table= FALSE;
 #ifdef THREAD
-    thr_lock_init(&share->lock);
-    VOID(pthread_mutex_init(&share->intern_lock,MY_MUTEX_INIT_FAST));
-    for (i=0; i<keys; i++)
-      VOID(my_rwlock_init(&share->key_root_lock[i], NULL));
-    VOID(my_rwlock_init(&share->mmap_lock, NULL));
-    if (!thr_lock_inited)
-    {
-      /* Probably a single threaded program; Don't use concurrent inserts */
-      myisam_concurrent_insert=0;
-    }
-    else if (myisam_concurrent_insert)
-    {
-      share->concurrent_insert=
-	((share->options & (HA_OPTION_READ_ONLY_DATA | HA_OPTION_TMP_TABLE |
-			   HA_OPTION_COMPRESS_RECORD |
-			   HA_OPTION_TEMP_COMPRESS_RECORD)) ||
-	 (open_flags & HA_OPEN_TMP_TABLE) ||
-	 have_rtree) ? 0 : 1;
-      if (share->concurrent_insert)
-      {
-	share->lock.get_status=mi_get_status;
-	share->lock.copy_status=mi_copy_status;
-	share->lock.update_status=mi_update_status;
-        share->lock.restore_status= mi_restore_status;
-	share->lock.check_status=mi_check_status;
+      for (i=0; i<keys; i++)
+        VOID(my_rwlock_init(&share->key_root_lock[i], NULL));
+      if (myisam_concurrent_insert)
+      {
+        share->concurrent_insert=
+          ((share->options & (HA_OPTION_READ_ONLY_DATA | HA_OPTION_TMP_TABLE |
+                             HA_OPTION_COMPRESS_RECORD |
+                             HA_OPTION_TEMP_COMPRESS_RECORD)) ||
+           (open_flags & HA_OPEN_TMP_TABLE) ||
+           have_rtree) ? 0 : 1;
+        if (share->concurrent_insert)
+        {
+          share->lock.get_status=mi_get_status;
+          share->lock.copy_status=mi_copy_status;
+          share->lock.update_status=mi_update_status;
+          share->lock.restore_status= mi_restore_status;
+          share->lock.check_status=mi_check_status;
+        }
       }
-    }
 #endif
-    /*
-      Memory mapping can only be requested after initializing intern_lock.
-    */
-    if (open_flags & HA_OPEN_MMAP)
-    {
-      info.s= share;
-      mi_extra(&info, HA_EXTRA_MMAP, 0);
+      /*
+        Memory mapping can only be requested after initializing intern_lock.
+      */
+      if (open_flags & HA_OPEN_MMAP)
+        mi_extra(m_info, HA_EXTRA_MMAP, 0);
     }
-  }
-  else
-  {
-    share= old_info->s;
-    if (mode == O_RDWR && share->mode == O_RDONLY)
+    else
     {
-      my_errno=EACCES;				/* Can't open in write mode */
-      goto err;
+      if (mi_open_datafile(m_info, share, name, -1))
+        goto err;
+      errpos=5;
     }
-    if (mi_open_datafile(&info, share, name, old_info->dfile))
-      goto err;
-    errpos=5;
-    have_rtree= old_info->rtree_recursion_state != NULL;
   }
 
   /* alloc and set up private structure parts */
   if (!my_multi_malloc(MY_WME,
-		       &m_info,sizeof(MI_INFO),
-		       &info.blobs,sizeof(MI_BLOB)*share->base.blobs,
-		       &info.buff,(share->base.max_key_block_length*2+
-				   share->base.max_key_length),
-		       &info.lastkey,share->base.max_key_length*3+1,
-		       &info.first_mbr_key, share->base.max_key_length,
-		       &info.filename,strlen(name)+1,
-		       &info.rtree_recursion_state,have_rtree ? 1024 : 0,
+		       &m_info->blobs,sizeof(MI_BLOB)*share->base.blobs,
+		       &m_info->buff,(share->base.max_key_block_length*2+
+                                     share->base.max_key_length),
+		       &m_info->lastkey,share->base.max_key_length*3+1,
+		       &m_info->first_mbr_key, share->base.max_key_length,
+		       &m_info->rtree_recursion_state,have_rtree ? 1024 : 0,
 		       NullS))
     goto err;
   errpos=6;
 
   if (!have_rtree)
-    info.rtree_recursion_state= NULL;
+    m_info->rtree_recursion_state= NULL;
 
-  strmov(info.filename,name);
-  memcpy(info.blobs,share->blobs,sizeof(MI_BLOB)*share->base.blobs);
-  info.lastkey2=info.lastkey+share->base.max_key_length;
+  memcpy(m_info->blobs,share->blobs,sizeof(MI_BLOB)*share->base.blobs);
+  m_info->lastkey2=m_info->lastkey+share->base.max_key_length;
 
-  info.s=share;
-  info.lastpos= HA_OFFSET_ERROR;
-  info.update= (short) (HA_STATE_NEXT_FOUND+HA_STATE_PREV_FOUND);
-  info.opt_flag=0;                              /* READ_CHECK is NOT default */
-  info.this_unique= (ulong) info.dfile; /* Uniq number in process */
+  m_info->lastpos= HA_OFFSET_ERROR;
+  m_info->update= (short) (HA_STATE_NEXT_FOUND+HA_STATE_PREV_FOUND);
+  m_info->opt_flag=0;
+  m_info->this_unique= (ulong) m_info->dfile; /* Uniq number in process */
   if (share->data_file_type == COMPRESSED_RECORD)
-    info.this_unique= share->state.unique;
-  info.this_loop=0;				/* Update counter */
-  info.last_unique= share->state.unique;
-  info.last_loop=   share->state.update_count;
+    m_info->this_unique= share->state.unique;
+  m_info->this_loop=0;				/* Update counter */
+  m_info->last_unique= share->state.unique;
+  m_info->last_loop=   share->state.update_count;
   if (mode == O_RDONLY)
     share->options|=HA_OPTION_READ_ONLY_DATA;
-  info.lock_type=F_UNLCK;
-  info.quick_mode=0;
-  info.bulk_insert=0;
-  info.ft1_to_ft2=0;
-  info.errkey= -1;
-  info.page_changed=1;
+  m_info->quick_mode=0;
+  m_info->bulk_insert=0;
+  m_info->ft1_to_ft2=0;
+  m_info->errkey= -1;
+  m_info->page_changed=1;
   pthread_mutex_lock(&share->intern_lock);
-  info.read_record=share->read_record;
+  m_info->read_record=share->read_record;
   share->reopen++;
+  DBUG_PRINT("info", ("reopen %d", (uint) share->reopen));
   share->write_flag=MYF(MY_NABP | MY_WAIT_IF_FULL);
   if (share->options & HA_OPTION_READ_ONLY_DATA)
   {
-    info.lock_type=F_RDLCK;
+    m_info->lock_type=F_RDLCK;
     share->r_locks++;
     share->tot_locks++;
   }
@@ -625,28 +769,21 @@ MI_INFO *mi_open(const char *name, int m
     share->write_flag=MYF(MY_NABP);
     share->w_locks++;			/* We don't have to update status */
     share->tot_locks++;
-    info.lock_type=F_WRLCK;
+    m_info->lock_type=F_WRLCK;
   }
   if (((open_flags & HA_OPEN_DELAY_KEY_WRITE) ||
       (share->options & HA_OPTION_DELAY_KEY_WRITE)) &&
       myisam_delay_key_write)
     share->delay_key_write=1;
-  info.state= &share->state.state;	/* Change global values by default */
+  m_info->state= &share->state.state;	/* Change global values by default */
   pthread_mutex_unlock(&share->intern_lock);
 
   /* Allocate buffer for one record */
 
   /* prerequisites: bzero(info) && info->s=share; are met. */
-  if (!mi_alloc_rec_buff(&info, -1, &info.rec_buff))
+  if (!mi_alloc_rec_buff(m_info, -1, &m_info->rec_buff))
     goto err;
-  bzero(info.rec_buff, mi_get_rec_buff_len(&info, info.rec_buff));
-
-  *m_info=info;
-#ifdef THREAD
-  thr_lock_data_init(&share->lock,&m_info->lock,(void*) m_info);
-#endif
-  m_info->open_list.data=(void*) m_info;
-  myisam_open_list=list_add(myisam_open_list,&m_info->open_list);
+  bzero(m_info->rec_buff, mi_get_rec_buff_len(m_info, m_info->rec_buff));
 
   pthread_mutex_unlock(&THR_LOCK_myisam);
   if (myisam_log_file >= 0)
@@ -664,15 +801,13 @@ err:
     mi_report_error(save_errno, name);
   switch (errpos) {
   case 6:
-    my_free((uchar*) m_info,MYF(0));
+    my_free((uchar*) m_info->blobs,MYF(0));
     /* fall through */
   case 5:
-    VOID(my_close(info.dfile,MYF(0)));
-    if (old_info)
-      break;					/* Don't remove open table */
+    VOID(my_close(m_info->dfile,MYF(0)));
     /* fall through */
   case 4:
-    my_free((uchar*) share,MYF(0));
+    my_free((uchar*) share->state.rec_per_key_part,MYF(0));
     /* fall through */
   case 3:
     if (! lock_error)
@@ -691,6 +826,30 @@ err:
   pthread_mutex_unlock(&THR_LOCK_myisam);
   my_errno=save_errno;
   DBUG_RETURN (NULL);
+} /* mi_open_files */
+
+
+
+
+/******************************************************************************
+  open a MyISAM database.
+  See my_base.h for the handle_locking argument
+  if handle_locking and HA_OPEN_ABORT_IF_CRASHED then abort if the table
+  is marked crashed or if we are not using locking and the table doesn't
+  have an open count of 0.
+******************************************************************************/
+
+MI_INFO *mi_open(const char *name, int mode, uint open_flags)
+{
+  MI_INFO *m_info, *ret;
+  DBUG_ENTER("mi_open");
+  if (!(m_info= mi_open_handler(name, mode, open_flags)))
+    DBUG_RETURN(NULL);
+  if (!(ret= mi_open_files(m_info, name, mode, open_flags)))
+  {
+    mi_close_handler(m_info);
+  }
+  DBUG_RETURN(ret);
 } /* mi_open */
 
 

=== modified file 'storage/myisam/myisamdef.h'
--- a/storage/myisam/myisamdef.h	2008-08-23 02:47:43 +0000
+++ b/storage/myisam/myisamdef.h	2009-02-17 14:04:29 +0000
@@ -198,6 +198,7 @@ typedef struct st_mi_isam_share {	/* Sha
   File	data_file;			/* Shared data file */
   int	mode;				/* mode of file on open */
   uint	reopen;				/* How many times reopened */
+  uint  ref_count;                      /* How many references to the share */
   uint	w_locks,r_locks,tot_locks;	/* Number of read/write locks */
   uint	blocksize;			/* blocksize of keyfile */
   myf write_flag;

Thread
bzr commit into mysql-5.1 branch (mattias.jonsson:2799)Mattias Jonsson17 Feb