List:Commits« Previous MessageNext Message »
From:Guilhem Bichot Date:November 20 2008 8:23pm
Subject:bzr push into mysql-6.0 branch (guilhem:2756 to 2757)
View as plain text  
 2757 Guilhem Bichot	2008-11-20 [merge]
      Merge of 5.1-maria (guilhem@stripped) into 6.0-maria
modified:
  mysys/waiting_threads.c
  storage/maria/ha_maria.cc
  storage/maria/ha_maria.h
  storage/maria/ma_control_file.c
  storage/maria/ma_delete.c
  storage/maria/ma_key.c
  storage/maria/ma_loghandler.c
  storage/maria/ma_open.c
  storage/maria/ma_pagecache.c
  storage/maria/ma_state.c
  storage/maria/ma_state.h
  storage/maria/ma_update.c
  storage/maria/ma_write.c
  storage/maria/maria_def.h
  storage/maria/trnman.c

 2756 Sergei Golubchik	2008-11-07 [merge]
      merge
modified:
  include/waiting_threads.h
  mysys/lf_hash.c
  mysys/waiting_threads.c

=== modified file 'mysys/waiting_threads.c'
--- a/mysys/waiting_threads.c	2008-11-03 19:33:34 +0000
+++ b/mysys/waiting_threads.c	2008-11-10 19:11:27 +0000
@@ -396,6 +396,8 @@ void wt_thd_destroy(WT_THD *thd)
 */
 int wt_resource_id_memcmp(void *a, void *b)
 {
+  /* we use the fact that there's no padding in the middle of WT_RESOURCE_ID */
+  compile_time_assert(offsetof(WT_RESOURCE_ID, type) == sizeof(ulonglong));
   return memcmp(a, b, sizeof_WT_RESOURCE_ID);
 }
 

=== modified file 'storage/maria/ha_maria.cc'
--- a/storage/maria/ha_maria.cc	2008-10-20 13:03:34 +0000
+++ b/storage/maria/ha_maria.cc	2008-11-20 19:18:59 +0000
@@ -2200,7 +2200,8 @@ int ha_maria::external_lock(THD *thd, in
         trnman_new_statement(trn);
       }
 
-      if (file->s->lock.get_status)
+      /* If handler uses versioning */
+      if (file->s->lock_key_trees)
       {
         if (_ma_setup_live_state(file))
           DBUG_RETURN(HA_ERR_OUT_OF_MEM);
@@ -2397,7 +2398,8 @@ int ha_maria::implicit_commit(THD *thd, 
         if (handler->s->base.born_transactional)
         {
           _ma_set_trn_for_table(handler, trn);
-          if (handler->s->lock.get_status)
+          /* If handler uses versioning */
+          if (handler->s->lock_key_trees)
           {
             if (_ma_setup_live_state(handler))
               error= HA_ERR_OUT_OF_MEM;
@@ -2958,6 +2960,16 @@ static int mark_recovery_success(void)
 }
 
 
+/*
+  Return 1 if table has changed during the current transaction
+*/
+
+bool ha_maria::is_changed() const
+{
+  return file->state->changed;
+}
+
+
 static int ha_maria_init(void *p)
 {
   int res;

=== modified file 'storage/maria/ha_maria.h'
--- a/storage/maria/ha_maria.h	2008-07-09 21:25:29 +0000
+++ b/storage/maria/ha_maria.h	2008-11-20 19:18:59 +0000
@@ -139,6 +139,7 @@ public:
   int repair(THD * thd, HA_CHECK_OPT * check_opt);
   bool check_and_repair(THD * thd);
   bool is_crashed() const;
+  bool is_changed() const;
   bool auto_repair() const { return 1; }
   int optimize(THD * thd, HA_CHECK_OPT * check_opt);
   int assign_to_keycache(THD * thd, HA_CHECK_OPT * check_opt);

=== modified file 'storage/maria/ma_control_file.c'
--- a/storage/maria/ma_control_file.c	2008-06-05 16:11:22 +0000
+++ b/storage/maria/ma_control_file.c	2008-11-20 19:18:59 +0000
@@ -185,8 +185,12 @@ static CONTROL_FILE_ERROR create_control
     files around (indeed it could be that the control file alone was deleted
     or not restored, and we should not go on with life at this point).
 
-    TODO: For now we trust (this is alpha version), but for beta if would
-    be great to verify.
+    Things should still be relatively safe as if someone tries to use
+    an old table with a new control file the different uuid:s between
+    the files will cause ma_open() to generate an HA_ERR_OLD_FILE
+    error. When used from mysqld this will cause the table to be opened
+    in repair mode which will remove all dependencies between the
+    table and the old control file.
 
     We could have a tool which can rebuild the control file, by reading the
     directory of logs, finding the newest log, reading it to find last

=== modified file 'storage/maria/ma_delete.c'
--- a/storage/maria/ma_delete.c	2008-10-20 09:16:47 +0000
+++ b/storage/maria/ma_delete.c	2008-11-20 19:18:59 +0000
@@ -116,6 +116,7 @@ int maria_delete(MARIA_HA *info,const uc
   info->update= HA_STATE_CHANGED+HA_STATE_DELETED+HA_STATE_ROW_CHANGED;
   share->state.changed|= (STATE_NOT_OPTIMIZED_ROWS | STATE_NOT_MOVABLE |
                           STATE_NOT_ZEROFILLED);
+  info->state->changed=1;
 
   mi_sizestore(lastpos, info->cur_row.lastpos);
   (void)(_ma_writeinfo(info,WRITEINFO_UPDATE_KEYFILE));

=== modified file 'storage/maria/ma_key.c'
--- a/storage/maria/ma_key.c	2008-06-26 15:10:11 +0000
+++ b/storage/maria/ma_key.c	2008-11-04 10:54:04 +0000
@@ -61,7 +61,7 @@ static int _ma_put_key_in_record(MARIA_H
   if trid < 256-12
     one byte
   else
-    one byte prefix (256-length_of_trid_in_bytes) followed by data
+    one byte prefix length_of_trid_in_bytes + 249 followed by data
     in high-byte-first order
 
   Prefix bytes 244 to 249 are reserved for negative transid, that can be used
@@ -69,6 +69,25 @@ static int _ma_put_key_in_record(MARIA_H
 
   We have to store transid in high-byte-first order to be able to do a
   fast byte-per-byte comparision of them without packing them up.
+
+  For example, assuming we have the following data:
+
+  key_data:               1                (4 byte integer)
+  pointer_to_row:         (2 << 8) + 3 = 515 (page 2, row 3)
+  table_create_transid    1000             Defined at create table time
+  transid                 1010	           Transaction that created row
+  delete_transid          2011             Transaction that deleted row
+
+  In addition we assume the table is created with a data pointer length
+  of 4 bytes (this is automatically calculated based on the medium
+  length of rows and the given max number of rows)
+
+  The binary data for the key would then look like this in hex:
+
+  00 00 00 01     Key data (1 stored high byte first)
+  00 00 04 07	  (515 << 1) + 1         ;  The last 1 is marker that key cont.
+  15              ((1010-1000) << 1) + 1 ;  The last 1 is marker that key cont.
+  FB 07 E6        length byte and  ((2011 - 1000) << 1) = 07 E6
 */
 
 uint transid_store_packed(MARIA_HA *info, uchar *to, ulonglong trid)
@@ -76,7 +95,7 @@ uint transid_store_packed(MARIA_HA *info
   uchar *start;
   uint length;
   uchar buff[8];
-  DBUG_ASSERT(trid < (LL(1) << (MAX_PACK_TRANSID_SIZE*8)));
+  DBUG_ASSERT(trid < (LL(1) << (MARIA_MAX_PACK_TRANSID_SIZE*8)));
   DBUG_ASSERT(trid >= info->s->state.create_trid);
 
   trid= (trid - info->s->state.create_trid) << 1;
@@ -84,7 +103,7 @@ uint transid_store_packed(MARIA_HA *info
   /* Mark that key contains transid */
   to[-1]|= 1;
 
-  if (trid < MIN_TRANSID_PACK_PREFIX)
+  if (trid < MARIA_MIN_TRANSID_PACK_OFFSET)
   {
     to[0]= (uchar) trid;
     return 1;
@@ -100,7 +119,8 @@ uint transid_store_packed(MARIA_HA *info
   } while (trid);
 
   length= (uint) (to - buff);
-  start[0]= (uchar) (256 - length);             /* Store length prefix */
+  /* Store length prefix */
+  start[0]= (uchar) (length + MARIA_TRANSID_PACK_OFFSET);
   start++;
   /* Copy things in high-byte-first order to output buffer */
   do
@@ -127,12 +147,13 @@ ulonglong transid_get_packed(MARIA_SHARE
   ulonglong value;
   uint length;
 
-  if (from[0] < MIN_TRANSID_PACK_PREFIX)
+  if (from[0] < MARIA_MIN_TRANSID_PACK_OFFSET)
     value= (ulonglong) from[0];
   else
   {
     value= 0;
-    for (length= (uint) (256 - from[0]), value= (ulonglong) from[1], from+=2;
+    for (length= (uint) (from[0] - MARIA_TRANSID_PACK_OFFSET),
+           value= (ulonglong) from[1], from+=2;
          --length ;
          from++)
       value= (value << 8) + ((ulonglong) *from);

=== modified file 'storage/maria/ma_loghandler.c'
--- a/storage/maria/ma_loghandler.c	2008-10-20 13:03:34 +0000
+++ b/storage/maria/ma_loghandler.c	2008-11-20 19:18:59 +0000
@@ -121,6 +121,8 @@ struct st_translog_buffer
     in case of flush by LSN it can be offset + size - TRANSLOG_PAGE_SIZE)
   */
   TRANSLOG_ADDRESS next_buffer_offset;
+  /* Previous buffer offset, used to detect when its flush has finished */
+  TRANSLOG_ADDRESS prev_buffer_offset;
   /*
      How much is written (or will be written when copy_to_buffer_in_progress
      become 0) to this buffer
@@ -135,12 +137,12 @@ struct st_translog_buffer
   /* list of waiting buffer ready threads */
   struct st_my_thread_var *waiting_flush;
   /*
-    Pointer on the buffer which overlap with this one (due to flush of
+    If true then previous buffer overlap with this one (due to flush of
     loghandler, the last page of that buffer is the same as the first page
     of this buffer) and have to be written first (because contain old
     content of page which present in both buffers)
   */
-  struct st_translog_buffer *overlay;
+  my_bool overlay;
   uint buffer_no;
   /*
     Lock for the buffer.
@@ -175,6 +177,14 @@ struct st_translog_buffer
     With file and offset it allow detect buffer changes
   */
   uint8 ver;
+
+  /*
+    When the previous buffer has been sent to disk it stores its address
+    here so that we can detect when that is done
+    (we have to keep it in this buffer to lock buffers only in one direction).
+  */
+  TRANSLOG_ADDRESS prev_sent_to_disk;
+  pthread_cond_t prev_sent_to_disk_cond;
 };
 
 
@@ -1421,9 +1431,12 @@ static my_bool translog_buffer_init(stru
   /* list of waiting buffer ready threads */
   buffer->waiting_flush= 0;
   /* lock for the buffer. Current buffer also lock the handler */
-  if (pthread_mutex_init(&buffer->mutex, MY_MUTEX_INIT_FAST))
+  if (pthread_mutex_init(&buffer->mutex, MY_MUTEX_INIT_FAST) ||
+      pthread_cond_init(&buffer->prev_sent_to_disk_cond, 0))
     DBUG_RETURN(1);
   buffer->is_closing_buffer= 0;
+  buffer->prev_sent_to_disk= LSN_IMPOSSIBLE;
+  buffer->prev_buffer_offset= LSN_IMPOSSIBLE;
   buffer->ver= 0;
   DBUG_RETURN(0);
 }
@@ -2100,10 +2113,12 @@ static my_bool translog_buffer_next(TRAN
   {
     translog_lock_assert_owner();
     translog_start_buffer(new_buffer, cursor, new_buffer_no);
+    new_buffer->prev_buffer_offset=
+      log_descriptor.buffers[old_buffer_no].offset;
+    new_buffer->prev_last_lsn=
+      BUFFER_MAX_LSN(log_descriptor.buffers + old_buffer_no);
   }
   log_descriptor.buffers[old_buffer_no].next_buffer_offset= new_buffer->offset;
-  new_buffer->prev_last_lsn=
-    BUFFER_MAX_LSN(log_descriptor.buffers + old_buffer_no);
   DBUG_PRINT("info", ("prev_last_lsn set to (%lu,0x%lx)  buffer: 0x%lx",
                       LSN_IN_PARTS(new_buffer->prev_last_lsn),
                       (ulong) new_buffer));
@@ -2117,14 +2132,16 @@ static my_bool translog_buffer_next(TRAN
 
   SYNOPSIS
     translog_set_sent_to_disk()
-    lsn                  LSN to assign
-    in_buffers           to assign to in_buffers_only
+    buffer               buffer which we have sent to disk
 
   TODO: use atomic operations if possible (64bit architectures?)
 */
 
-static void translog_set_sent_to_disk(LSN lsn, TRANSLOG_ADDRESS in_buffers)
+static void translog_set_sent_to_disk(struct st_translog_buffer *buffer)
 {
+  LSN lsn= buffer->last_lsn;
+  TRANSLOG_ADDRESS in_buffers= buffer->next_buffer_offset;
+
   DBUG_ENTER("translog_set_sent_to_disk");
   pthread_mutex_lock(&log_descriptor.sent_to_disk_lock);
   DBUG_PRINT("enter", ("lsn: (%lu,0x%lx) in_buffers: (%lu,0x%lx)  "
@@ -2415,6 +2432,51 @@ static uint16 translog_get_total_chunk_l
   }
 }
 
+/*
+  @brief Waits for the flush of the previous buffer to finish
+
+  @param buffer          buffer for check
+
+  @retval 0 previous buffer flushed and this thread has to flush this one
+  @retval 1 previous buffer flushed and this buffer flushed by another thread
+*/
+
+my_bool translog_prev_buffer_flush_wait(struct st_translog_buffer *buffer)
+{
+  TRANSLOG_ADDRESS offset= buffer->offset;
+  TRANSLOG_FILE *file= buffer->file;
+  uint8 ver= buffer->ver;
+  DBUG_ENTER("translog_prev_buffer_flush_wait");
+  DBUG_PRINT("enter", ("buffer: 0x%lx  #%u  offset: (%lu,0x%lx)  "
+                       "prev sent: (%lu,0x%lx) prev offset: (%lu,0x%lx)",
+                       (ulong) buffer, (uint) buffer->buffer_no,
+                       LSN_IN_PARTS(buffer->offset),
+                       LSN_IN_PARTS(buffer->prev_sent_to_disk),
+                       LSN_IN_PARTS(buffer->prev_buffer_offset)));
+  translog_buffer_lock_assert_owner(buffer);
+  /*
+    if prev_sent_to_disk == LSN_IMPOSSIBLE then
+    prev_buffer_offset should be LSN_IMPOSSIBLE
+    because it means that this buffer was never used
+  */
+  DBUG_ASSERT((buffer->prev_sent_to_disk == LSN_IMPOSSIBLE &&
+               buffer->prev_buffer_offset == LSN_IMPOSSIBLE) ||
+              buffer->prev_sent_to_disk != LSN_IMPOSSIBLE);
+  if (buffer->prev_buffer_offset != buffer->prev_sent_to_disk)
+  {
+    do {
+      pthread_cond_wait(&buffer->prev_sent_to_disk_cond, &buffer->mutex);
+      if (buffer->file != file || buffer->offset != offset ||
+          buffer->ver != ver)
+      {
+        translog_buffer_unlock(buffer);
+        DBUG_RETURN(1); /* some the thread flushed the buffer already */
+      }
+    } while(buffer->prev_buffer_offset != buffer->prev_sent_to_disk);
+  }
+  DBUG_RETURN(0);
+}
+
 
 /*
   Flush given buffer
@@ -2460,39 +2522,8 @@ static my_bool translog_buffer_flush(str
   if (buffer->file != file || buffer->offset != offset || buffer->ver != ver)
     DBUG_RETURN(0); /* some the thread flushed the buffer already */
 
-  if (buffer->overlay && buffer->overlay->file == buffer->file &&
-      cmp_translog_addr(buffer->overlay->offset + buffer->overlay->size,
-                        buffer->offset) > 0)
-  {
-    /*
-      This can't happen for normal translog_flush,
-      only during destroying the loghandler
-    */
-    struct st_translog_buffer *overlay= buffer->overlay;
-    TRANSLOG_ADDRESS buffer_offset= buffer->offset;
-    TRANSLOG_FILE *fl= buffer->file;
-    uint8 ver= buffer->ver;
-    translog_buffer_unlock(buffer);
-    translog_buffer_lock(overlay);
-    /* rechecks under mutex protection that overlay is still our overlay */
-    if (buffer->overlay->file == fl &&
-        cmp_translog_addr(buffer->overlay->offset + buffer->overlay->size,
-                          buffer_offset) > 0)
-    {
-      translog_wait_for_buffer_free(overlay);
-    }
-    translog_buffer_unlock(overlay);
-    translog_buffer_lock(buffer);
-    if (buffer->file != fl || buffer_offset != buffer->offset ||
-        ver != buffer->ver)
-    {
-      /*
-        This means that somebody else flushed the buffer while we was
-        waiting for overlay then for locking buffer again.
-      */
-      DBUG_RETURN(0);
-    }
-  }
+  if (buffer->overlay && translog_prev_buffer_flush_wait(buffer))
+    DBUG_RETURN(0); /* some the thread flushed the buffer already */
 
   /*
     Send page by page in the pagecache what we are going to write on the
@@ -2553,10 +2584,34 @@ static my_bool translog_buffer_flush(str
   file->is_sync= 0;
 
   if (LSN_OFFSET(buffer->last_lsn) != 0)    /* if buffer->last_lsn is set */
-    translog_set_sent_to_disk(buffer->last_lsn,
-                              buffer->next_buffer_offset);
+  {
+    if (translog_prev_buffer_flush_wait(buffer))
+      DBUG_RETURN(0); /* some the thread flushed the buffer already */
+    translog_set_sent_to_disk(buffer);
+  }
   else
     translog_set_only_in_buffers(buffer->next_buffer_offset);
+
+  /* say to next buffer that we are finished */
+  {
+    struct st_translog_buffer *next_buffer=
+      log_descriptor.buffers + ((buffer->buffer_no + 1) % TRANSLOG_BUFFERS_NO);
+    if (likely(translog_status == TRANSLOG_OK)){
+      translog_buffer_lock(next_buffer);
+      next_buffer->prev_sent_to_disk= buffer->offset;
+      translog_buffer_unlock(next_buffer);
+      pthread_cond_broadcast(&next_buffer->prev_sent_to_disk_cond);
+    }
+    else
+    {
+      /*
+        It is shutdown =>
+          1) there is only one thread
+          2) mutexes of other buffers can be destroyed => we can't use them
+      */
+      next_buffer->prev_sent_to_disk= buffer->offset;
+    }
+  }
   /* Free buffer */
   buffer->file= NULL;
   buffer->overlay= 0;
@@ -4640,6 +4695,7 @@ static my_bool translog_advance_pointer(
     }
     translog_start_buffer(new_buffer, &log_descriptor.bc, new_buffer_no);
     old_buffer->next_buffer_offset= new_buffer->offset;
+    new_buffer->prev_buffer_offset= old_buffer->offset;
     translog_buffer_unlock(old_buffer);
     offset-= min_offset;
   }
@@ -7355,7 +7411,7 @@ static void translog_force_current_buffe
     log_descriptor.bc.ptr+= current_page_fill;
     log_descriptor.bc.buffer->size= log_descriptor.bc.current_page_fill=
       current_page_fill;
-    new_buffer->overlay= old_buffer;
+    new_buffer->overlay= 1;
   }
   else
     translog_new_page_header(&log_descriptor.horizon, &log_descriptor.bc);
@@ -7428,8 +7484,8 @@ static void translog_force_current_buffe
     memcpy(new_buffer->buffer, data, current_page_fill);
   }
   old_buffer->next_buffer_offset= new_buffer->offset;
-
   translog_buffer_lock(new_buffer);
+  new_buffer->prev_buffer_offset= old_buffer->offset;
   translog_buffer_decrease_writers(new_buffer);
   translog_buffer_unlock(new_buffer);
 

=== modified file 'storage/maria/ma_open.c'
--- a/storage/maria/ma_open.c	2008-10-23 16:29:52 +0000
+++ b/storage/maria/ma_open.c	2008-11-20 19:18:59 +0000
@@ -449,7 +449,7 @@ MARIA_HA *maria_open(const char *name, i
     /* Ensure we have space in the key buffer for transaction id's */
     if (share->base.born_transactional)
       share->base.max_key_length= ALIGN_SIZE(share->base.max_key_length +
-                                             MAX_PACK_TRANSID_SIZE);
+                                             MARIA_MAX_PACK_TRANSID_SIZE);
 
     /*
       If page cache is not initialized, then assume we will create the
@@ -824,6 +824,7 @@ MARIA_HA *maria_open(const char *name, i
     (void)(my_rwlock_init(&share->mmap_lock, NULL));
 
     share->row_is_visible= _ma_row_visible_always;
+    share->lock.get_status= _ma_reset_update_flag;
     if (!thr_lock_inited)
     {
       /* Probably a single threaded program; Don't use concurrent inserts */

=== modified file 'storage/maria/ma_pagecache.c'
--- a/storage/maria/ma_pagecache.c	2008-10-20 13:03:34 +0000
+++ b/storage/maria/ma_pagecache.c	2008-11-20 19:18:59 +0000
@@ -97,8 +97,9 @@
 
 #define PCBLOCK_INFO(B) \
   DBUG_PRINT("info", \
-             ("block: 0x%lx  fd: %lu  page: %lu  s: %0x  hshL: 0x%lx  req: %u/%u " \
-              "wrlocks: %u  rdlocks %u  rdlocks_q: %u  pins: %u", \
+             ("block: 0x%lx  fd: %lu  page: %lu  s: %0x  hshL: " \
+              " 0x%lx  req: %u/%u wrlocks: %u  rdlocks %u  " \
+              "rdlocks_q: %u  pins: %u  status: %u", \
               (ulong)(B), \
               (ulong)((B)->hash_link ? \
                       (B)->hash_link->file.file : \
@@ -113,7 +114,7 @@
                      (B)->hash_link->requests : \
                        0), \
               block->wlocks, block->rlocks, block->rlocks_queue, \
-              (uint)(B)->pins))
+              (uint)(B)->pins, (uint)(B)->status))
 
 /* TODO: put it to my_static.c */
 my_bool my_disable_flush_pagecache_blocks= 0;
@@ -2598,6 +2599,8 @@ static void read_block(PAGECACHE *pageca
 {
 
   DBUG_ENTER("read_block");
+  DBUG_PRINT("enter", ("read block: 0x%lx  primary: %d",
+                       (ulong)block, primary));
   if (primary)
   {
     size_t error;
@@ -2606,9 +2609,6 @@ static void read_block(PAGECACHE *pageca
       that submitted primary requests
     */
 
-    DBUG_PRINT("read_block",
-               ("page to be read by primary request"));
-
     pagecache->global_cache_read++;
     /* Page is not in buffer yet, is to be read from disk */
     pagecache_pthread_mutex_unlock(&pagecache->cache_lock);
@@ -2655,9 +2655,7 @@ static void read_block(PAGECACHE *pageca
       This code is executed only by threads
       that submitted secondary requests
     */
-    DBUG_PRINT("read_block",
-               ("secondary request waiting for new page to be read"));
-    {
+
 #ifdef THREAD
       struct st_my_thread_var *thread= my_thread_var;
       /* Put the request into a queue and wait until it can be processed */
@@ -2674,7 +2672,6 @@ static void read_block(PAGECACHE *pageca
       KEYCACHE_DBUG_ASSERT(0);
       /* No parallel requests in single-threaded case */
 #endif
-    }
     DBUG_PRINT("read_block",
                ("secondary request: new page in cache"));
   }
@@ -3310,7 +3307,6 @@ restart:
                         page_cache_page_type_str[type]));
     if (((block->status & PCBLOCK_ERROR) == 0) && (page_st != PAGE_READ))
     {
-      DBUG_PRINT("info", ("read block 0x%lx", (ulong)block));
       /* The requested page is to be read into the block buffer */
       read_block(pagecache, block,
                  (my_bool)(page_st == PAGE_TO_BE_READ));
@@ -3845,6 +3841,7 @@ restart:
   {
     /* Key cache is used */
     int page_st;
+    my_bool need_page_ready_signal= FALSE;
 
     pagecache_pthread_mutex_lock(&pagecache->cache_lock);
     if (!pagecache->can_be_used)
@@ -3859,10 +3856,7 @@ restart:
     reg_request= ((pin == PAGECACHE_PIN_LEFT_UNPINNED) ||
                   (pin == PAGECACHE_PIN));
     block= find_block(pagecache, file, pageno, level,
-                      (write_mode != PAGECACHE_WRITE_DONE &&
-                       lock != PAGECACHE_LOCK_LEFT_WRITELOCKED &&
-                       lock != PAGECACHE_LOCK_WRITE_UNLOCK &&
-                       lock != PAGECACHE_LOCK_WRITE_TO_READ),
+                      TRUE,
                       reg_request, &page_st);
     if (!block)
     {
@@ -3873,6 +3867,21 @@ restart:
       /* Write to the disk key cache is in resize at the moment*/
       goto no_key_cache;
     }
+    DBUG_PRINT("info", ("page status: %d", page_st));
+    if (!(block->status & PCBLOCK_ERROR) &&
+        ((page_st == PAGE_TO_BE_READ &&
+          (offset || size < pagecache->block_size)) ||
+         (page_st == PAGE_WAIT_TO_BE_READ)))
+    {
+      /* The requested page is to be read into the block buffer */
+      read_block(pagecache, block,
+                 (my_bool)(page_st == PAGE_TO_BE_READ));
+      DBUG_PRINT("info", ("read is done"));
+    }
+    else if (page_st == PAGE_TO_BE_READ)
+    {
+      need_page_ready_signal= TRUE;
+    }
 
     DBUG_ASSERT(block->type == PAGECACHE_EMPTY_PAGE ||
                 block->type == PAGECACHE_READ_UNKNOWN_PAGE ||
@@ -3959,6 +3968,12 @@ restart:
         block->status&= ~PCBLOCK_ERROR;
     }
 
+#ifdef THREAD
+    if (need_page_ready_signal &&
+        block->wqueue[COND_FOR_REQUESTED].last_thread)
+      wqueue_release_queue(&block->wqueue[COND_FOR_REQUESTED]);
+#endif
+
     if (first_REDO_LSN_for_page)
     {
       /* single write action of the last write action */

=== modified file 'storage/maria/ma_state.c'
--- a/storage/maria/ma_state.c	2008-08-28 18:52:23 +0000
+++ b/storage/maria/ma_state.c	2008-11-03 13:53:22 +0000
@@ -96,6 +96,8 @@ my_bool _ma_setup_live_state(MARIA_HA *i
   pthread_mutex_unlock(&share->intern_lock);
   /* The current item can't be deleted as it's the first one visible for us */
   tables->state_start=  tables->state_current= history->state;
+  tables->state_current.changed= 0;
+
   DBUG_PRINT("info", ("records: %ld", (ulong) tables->state_start.records));
 
 end:
@@ -262,6 +264,7 @@ void _ma_get_status(void* param, my_bool
 #endif
   info->state_save= info->s->state.state;
   info->state= &info->state_save;
+  info->state->changed= 0;
   info->append_insert_at_end= concurrent_insert;
   DBUG_VOID_RETURN;
 }
@@ -315,6 +318,14 @@ void _ma_copy_status(void* to, void *fro
 }
 
 
+void _ma_reset_update_flag(void *param,
+                           my_bool concurrent_insert __attribute__((unused)))
+{
+  MARIA_HA *info=(MARIA_HA*) param;
+  info->state->changed= 0;
+}
+
+
 /**
    @brief Check if should allow concurrent inserts
 

=== modified file 'storage/maria/ma_state.h'
--- a/storage/maria/ma_state.h	2008-07-12 14:14:28 +0000
+++ b/storage/maria/ma_state.h	2008-11-03 13:53:22 +0000
@@ -24,6 +24,7 @@ typedef struct st_maria_status_info
   my_off_t key_file_length;
   my_off_t data_file_length;
   ha_checksum checksum;
+  my_bool     changed;
 } MARIA_STATUS_INFO;
 
 
@@ -62,6 +63,7 @@ void _ma_get_status(void* param, my_bool
 void _ma_update_status(void* param);
 void _ma_restore_status(void *param);
 void _ma_copy_status(void* to, void *from);
+void _ma_reset_update_flag(void *param, my_bool concurrent_insert);
 my_bool _ma_check_status(void *param);
 void _ma_block_get_status(void* param, my_bool concurrent_insert);
 void _ma_block_update_status(void *param);

=== modified file 'storage/maria/ma_update.c'
--- a/storage/maria/ma_update.c	2008-08-25 18:23:18 +0000
+++ b/storage/maria/ma_update.c	2008-11-20 19:18:59 +0000
@@ -173,6 +173,7 @@ int maria_update(register MARIA_HA *info
   */
   info->update= (HA_STATE_CHANGED | HA_STATE_ROW_CHANGED | key_changed);
   share->state.changed|= STATE_NOT_MOVABLE | STATE_NOT_ZEROFILLED;
+  info->state->changed= 1;
 
   /*
     Every Maria function that updates Maria table must end with

=== modified file 'storage/maria/ma_write.c'
--- a/storage/maria/ma_write.c	2008-10-20 09:16:47 +0000
+++ b/storage/maria/ma_write.c	2008-11-20 19:18:59 +0000
@@ -289,6 +289,7 @@ int maria_write(MARIA_HA *info, uchar *r
   info->update= (HA_STATE_CHANGED | HA_STATE_AKTIV | HA_STATE_WRITTEN |
 		 HA_STATE_ROW_CHANGED);
   share->state.changed|= STATE_NOT_MOVABLE | STATE_NOT_ZEROFILLED;
+  info->state->changed= 1;
 
   info->cur_row.lastpos= filepos;
   (void)(_ma_writeinfo(info, WRITEINFO_UPDATE_KEYFILE));

=== modified file 'storage/maria/maria_def.h'
--- a/storage/maria/maria_def.h	2008-10-14 21:23:33 +0000
+++ b/storage/maria/maria_def.h	2008-11-04 10:54:04 +0000
@@ -146,14 +146,15 @@ typedef struct st_maria_state_info
 #define MARIA_KEYDEF_SIZE	(2+ 5*2)
 #define MARIA_UNIQUEDEF_SIZE	(2+1+1)
 #define HA_KEYSEG_SIZE		(6+ 2*2 + 4*2)
-#define MARIA_MAX_KEY_BUFF	(HA_MAX_KEY_BUFF + MAX_PACK_TRANSID_SIZE)
+#define MARIA_MAX_KEY_BUFF	(HA_MAX_KEY_BUFF + MARIA_MAX_PACK_TRANSID_SIZE)
 #define MARIA_COLUMNDEF_SIZE	(2*7+1+1+4)
 #define MARIA_BASE_INFO_SIZE	(MY_UUID_SIZE + 5*8 + 6*4 + 11*2 + 6 + 5*2 + 1 + 16)
 #define MARIA_INDEX_BLOCK_MARGIN 16	/* Safety margin for .MYI tables */
-/* Internal management bytes needed to store 2 keys on an index page */
-#define MAX_PACK_TRANSID_SIZE (TRANSID_SIZE+1)
-#define MIN_TRANSID_PACK_PREFIX (256-TRANSID_SIZE*2)
-#define MARIA_INDEX_OVERHEAD_SIZE (MAX_PACK_TRANSID_SIZE * 2)
+/* Internal management bytes needed to store 2 transid/key on an index page */
+#define MARIA_MAX_PACK_TRANSID_SIZE   (TRANSID_SIZE+1)
+#define MARIA_TRANSID_PACK_OFFSET     (256- TRANSID_SIZE - 1)
+#define MARIA_MIN_TRANSID_PACK_OFFSET (MARIA_TRANSID_PACK_OFFSET-TRANSID_SIZE)
+#define MARIA_INDEX_OVERHEAD_SIZE     (MARIA_MAX_PACK_TRANSID_SIZE * 2)
 #define MARIA_DELETE_KEY_NR  255	/* keynr for deleted blocks */
 
 /*
@@ -941,8 +942,8 @@ extern my_bool _ma_compact_keypage(MARIA
 extern uint transid_store_packed(MARIA_HA *info, uchar *to, ulonglong trid);
 extern ulonglong transid_get_packed(MARIA_SHARE *share, const uchar *from);
 #define transid_packed_length(data) \
-  ((data)[0] < MIN_TRANSID_PACK_PREFIX ? 1 : \
-   (uint) (257 - (uchar) (data)[0]))
+  ((data)[0] < MARIA_MIN_TRANSID_PACK_OFFSET ? 1 : \
+   (uint) ((uchar) (data)[0]) - (MARIA_TRANSID_PACK_OFFSET - 1))
 #define key_has_transid(key) (*(key) & 1)
 
 extern MARIA_KEY *_ma_make_key(MARIA_HA *info, MARIA_KEY *int_key, uint keynr,

=== modified file 'storage/maria/trnman.c'
--- a/storage/maria/trnman.c	2008-10-30 11:21:05 +0000
+++ b/storage/maria/trnman.c	2008-11-20 14:11:00 +0000
@@ -37,6 +37,13 @@ static TRN committed_list_min, committed
 /* a counter, used to generate transaction ids */
 static TrID global_trid_generator;
 
+/*
+  The minimum existing transaction id, returned by trnman_get_min_trid().
+  The default value is used when the transaction manager is not initialized
+  (probably because we are called from maria_chk).
+*/
+static TrID trid_min_read_from= ~(TrID) 0;
+
 /* the mutex for everything above */
 static pthread_mutex_t LOCK_trn_list;
 
@@ -158,6 +165,7 @@ int trnman_init(TrID initial_trid)
 
   pool= 0;
   global_trid_generator= initial_trid;
+  trid_min_read_from= initial_trid;
   lf_hash_init(&trid_to_trn, sizeof(TRN*), LF_HASH_UNIQUE,
                0, 0, trn_get_hash_key, 0);
   DBUG_PRINT("info", ("pthread_mutex_init LOCK_trn_list"));
@@ -303,6 +311,7 @@ TRN *trnman_new_trn(WT_THD *wt)
   if (!trn->pins)
   {
     trnman_free_trn(trn);
+    pthread_mutex_unlock(&LOCK_trn_list);
     return 0;
   }
 
@@ -315,6 +324,7 @@ TRN *trnman_new_trn(WT_THD *wt)
   trn->next= &active_list_max;
   trn->prev= active_list_max.prev;
   active_list_max.prev= trn->prev->next= trn;
+  trid_min_read_from= active_list_min.next->min_read_from;
   DBUG_PRINT("info", ("pthread_mutex_unlock LOCK_trn_list"));
   pthread_mutex_unlock(&LOCK_trn_list);
 
@@ -437,6 +447,8 @@ my_bool trnman_end_trn(TRN *trn, my_bool
     trn->next= free_me;
     free_me= trn;
   }
+  trid_min_read_from= active_list_min.next->min_read_from;
+
   if ((*trnman_end_trans_hook)(trn, commit,
                                active_list_min.next != &active_list_max))
     res= -1;
@@ -670,7 +682,10 @@ my_bool trnman_collect_transactions(LEX_
     */
     uint sid;
     LSN rec_lsn, undo_lsn, first_undo_lsn;
-    if ((sid= trn->short_id) == 0)
+    pthread_mutex_lock(&trn->state_lock);
+    sid= trn->short_id;
+    pthread_mutex_unlock(&trn->state_lock);
+    if (sid == 0)
     {
       /*
         Not even inited, has done nothing. Or it is the
@@ -787,25 +802,14 @@ TRN *trnman_get_any_trn()
 
 
 /**
-  Returns the minimum existing transaction id
-
-  @notes
-    This can only be called when we have at least one running transaction.
+  Returns the minimum existing transaction id. May return a number that
+  is too small under race conditions, but this is ok as the value is used
+  to remove non-visible transids from index/rows.
 */
 
 TrID trnman_get_min_trid()
 {
-  TrID min_read_from;
-  if (short_trid_to_active_trn == NULL)
-  {
-    /* Transaction manager not initialize; Probably called from maria_chk */
-    return ~(TrID) 0;
-  }
-
-  pthread_mutex_lock(&LOCK_trn_list);
-  min_read_from= active_list_min.next->min_read_from;
-  pthread_mutex_unlock(&LOCK_trn_list);
-  return min_read_from;
+  return trid_min_read_from;
 }
 
 

Thread
bzr push into mysql-6.0 branch (guilhem:2756 to 2757) Guilhem Bichot20 Nov