List: Internals    « Previous Message | Next Message »
From: jonas.oreland    Date: April 19 2005 3:21pm
Subject: bk commit into 5.1-ndb tree (joreland:1.1860)
View as plain text  
Below is the list of changes that have just been committed into a local
5.1-ndb repository of jonas. When jonas does a push these changes will
be propagated to the main repository and, within 24 hours after the
push, to the public repository.
For information on how to access the public repository
see http://dev.mysql.com/doc/mysql/en/installing-source-tree.html

ChangeSet
  1.1860 05/04/19 15:21:49 joreland@stripped +7 -0
  wl1870 - ndb diskdata
    initial support for traversing undo log

  ndb/src/kernel/blocks/tsman.cpp
    1.31 05/04/19 15:21:45 joreland@stripped +6 -0
    #ifdef incomplete undoing in tsman

  ndb/src/kernel/blocks/print_file.cpp
    1.4 05/04/19 15:21:45 joreland@stripped +31 -8
    Update undo printer

  ndb/src/kernel/blocks/lgman.hpp
    1.26 05/04/19 15:21:45 joreland@stripped +12 -27
    Add support for executing undo log

  ndb/src/kernel/blocks/lgman.cpp
    1.33 05/04/19 15:21:45 joreland@stripped +494 -140
    Add support for executing undo log

  ndb/src/kernel/blocks/diskpage.hpp
    1.18 05/04/19 15:21:45 joreland@stripped +2 -0
    Add bit to indicate no extra LSN in undo record

  ndb/src/kernel/blocks/dbtup/DbtupDiskAlloc.cpp
    1.17 05/04/19 15:21:45 joreland@stripped +10 -8
    Fix size of undo records

  ndb/include/kernel/signaldata/LgmanContinueB.hpp
    1.4 05/04/19 15:21:45 joreland@stripped +2 -0
    execute/read undo

# This is a BitKeeper patch.  What follows are the unified diffs for the
# set of deltas contained in the patch.  The rest of the patch, the part
# that BitKeeper cares about, is below these diffs.
# User:	joreland
# Host:	eel.hemma.oreland.se.ndb.mysql.com.ndb.mysql.com.ndb.mysql.com
# Root:	/home/jonas/src/mysql-5.1-ndb-dd

--- 1.3/ndb/include/kernel/signaldata/LgmanContinueB.hpp	Wed Apr 13 16:10:00 2005
+++ 1.4/ndb/include/kernel/signaldata/LgmanContinueB.hpp	Tue Apr 19 15:21:45 2005
@@ -31,6 +31,8 @@
     ,FLUSH_LOG = 2
     ,PROCESS_LOG_BUFFER_WAITERS = 3
     ,FIND_LOG_HEAD = 4
+    ,EXECUTE_UNDO_RECORD = 5
+    ,READ_UNDO_LOG = 6
   };
 };
 

--- 1.16/ndb/src/kernel/blocks/dbtup/DbtupDiskAlloc.cpp	Tue Apr  5 08:57:51 2005
+++ 1.17/ndb/src/kernel/blocks/dbtup/DbtupDiskAlloc.cpp	Tue Apr 19 15:21:45 2005
@@ -705,11 +705,13 @@
   Logfile_client lsman(this, c_lgman, logfile_group_id);
 
   Disk_undo::Alloc alloc;
-  alloc.m_type_length= Disk_undo::UNDO_ALLOC << 16 | sizeof(alloc);
+  alloc.m_type_length= Disk_undo::UNDO_ALLOC << 16 | (sizeof(alloc) >> 2);
   alloc.m_page_no = key->m_page_no;
   alloc.m_file_no_page_idx= key->m_file_no << 16 | key->m_page_idx;
-
-  Uint64 lsn= lsman.add_entry(&alloc, sizeof(alloc) >> 2);
+  
+  Logfile_client::Change c[1] = {{ &alloc, sizeof(alloc) >> 2 } };
+  
+  Uint64 lsn= lsman.add_entry<1>(c);
   page->m_page_header.m_page_lsn_hi= lsn >> 32;
   page->m_page_header.m_page_lsn_lo= lsn & 0xFFFFFFFF;
 }
@@ -726,7 +728,7 @@
   update.m_file_no_page_idx= key->m_file_no << 16 | key->m_page_idx;
   update.m_gci= gci;
   
-  update.m_type_length= Disk_undo::UNDO_UPDATE << 16 | (sizeof(update)+sz-4);
+  update.m_type_length= Disk_undo::UNDO_UPDATE << 16 | (sz + (sizeof(update) >> 2) - 1);
 
   Logfile_client::Change c[3] = {
     { &update, 3 },
@@ -753,16 +755,16 @@
   free.m_file_no_page_idx= key->m_file_no << 16 | key->m_page_idx;
   free.m_gci= gci;
   
-  free.m_type_length= Disk_undo::UNDO_FREE << 16 | (sizeof(free)+4*sz-4);
-
+  free.m_type_length= Disk_undo::UNDO_FREE << 16 | (sz + (sizeof(free) >> 2) - 1);
+  
   Logfile_client::Change c[3] = {
     { &free, 3 },
     { src, sz },
     { &free.m_type_length, 1 }
   };
-
+  
   ndbassert(4*(3 + sz + 1) == (sizeof(free) + 4*sz - 4));
-    
+  
   Uint64 lsn= lsman.add_entry<3>(c);
   page->m_page_header.m_page_lsn_hi= lsn >> 32;
   page->m_page_header.m_page_lsn_lo= lsn & 0xFFFFFFFF;

--- 1.17/ndb/src/kernel/blocks/diskpage.hpp	Wed Mar 16 23:42:35 2005
+++ 1.18/ndb/src/kernel/blocks/diskpage.hpp	Tue Apr 19 15:21:45 2005
@@ -144,6 +144,8 @@
       ,UNDO_TUP_ALLOC  = 3
       ,UNDO_TUP_UPDATE = 4
       ,UNDO_TUP_FREE   = 5
+      
+      ,UNDO_NEXT_LSN   = 0x8000
     };
 
     struct Undo_lcp

--- 1.32/ndb/src/kernel/blocks/lgman.cpp	Fri Apr 15 07:58:20 2005
+++ 1.33/ndb/src/kernel/blocks/lgman.cpp	Tue Apr 19 15:21:45 2005
@@ -73,6 +73,7 @@
   addRecSignal(GSN_SUB_GCP_COMPLETE_REP, &Lgman::execSUB_GCP_COMPLETE_REP);
   addRecSignal(GSN_START_RECREQ, &Lgman::execSTART_RECREQ);
 
+  m_last_lsn = 0;
   m_logfile_group_pool.setSize(10);
   m_logfile_group_hash.setSize(10);
   m_file_pool.setSize(10);
@@ -156,6 +157,16 @@
       init_run_undo_log(signal);
     }
     return;
+  case LgmanContinueB::EXECUTE_UNDO_RECORD:
+    execute_undo_record(signal);
+    return;
+  case LgmanContinueB::READ_UNDO_LOG:
+  {
+    Ptr<Logfile_group> ptr;
+    m_logfile_group_pool.getPtr(ptr, ptrI);
+    read_undo_log(signal, ptr);
+    return;
+  }
   }
 }
 
@@ -202,10 +213,6 @@
     m_logfile_group_hash.add(ptr);
     m_logfile_group_list.add(ptr);
     
-    signal->theData[0] = LgmanContinueB::FLUSH_LOG;
-    signal->theData[1] = ptr.i;
-    sendSignal(reference(), GSN_CONTINUEB, signal, 2, JBB);
-
     CreateFilegroupImplConf* conf= 
       (CreateFilegroupImplConf*)signal->getDataPtr();
     conf->senderData = senderData;
@@ -443,6 +450,9 @@
        */
       free.add(ptr);
       ptr.p->m_state = Undofile::FS_ONLINE;
+      signal->theData[0] = LgmanContinueB::FLUSH_LOG;
+      signal->theData[1] = ptr.i;
+      sendSignal(reference(), GSN_CONTINUEB, signal, 2, JBB);
     }
   }
   else
@@ -487,8 +497,8 @@
   ndbrequire(false);
 }
 
-#define READER 0
-#define WRITER 1
+#define CONSUMER 0
+#define PRODUCER 1
 
 Lgman::Logfile_group::Logfile_group(const CreateFilegroupImplReq* req)
 {
@@ -503,10 +513,10 @@
 
   m_free_file_words = 0;
   m_free_buffer_words = 0;
-  m_pos[READER].m_current_page.m_ptr_i = RNIL;   // { m_buffer_pages, idx }
-  m_pos[READER].m_current_pos.m_ptr_i = RNIL;    // { page ptr.i, m_words_used}
-  m_pos[WRITER].m_current_page.m_ptr_i = RNIL;   // { m_buffer_pages, idx }
-  m_pos[WRITER].m_current_pos.m_ptr_i = RNIL;    // { page ptr.i, m_words_used}
+  m_pos[CONSUMER].m_current_page.m_ptr_i = RNIL;// { m_buffer_pages, idx }
+  m_pos[CONSUMER].m_current_pos.m_ptr_i = RNIL; // { page ptr.i, m_words_used}
+  m_pos[PRODUCER].m_current_page.m_ptr_i = RNIL;// { m_buffer_pages, idx }
+  m_pos[PRODUCER].m_current_pos.m_ptr_i = RNIL; // { page ptr.i, m_words_used}
 
   m_tail_pos[2].m_ptr_i= RNIL;
   m_tail_pos[2].m_idx= ~0;
@@ -573,15 +583,15 @@
   ndbrequire(map.next(it));
   tmp[1] = *it.data;
   
-  ptr.p->m_pos[READER].m_current_page.m_ptr_i = 0;        // Index in page map
-  ptr.p->m_pos[READER].m_current_page.m_idx = range.m_idx - 1; // left in range
-  ptr.p->m_pos[READER].m_current_pos.m_ptr_i = range.m_ptr_i;  // Which page
-  ptr.p->m_pos[READER].m_current_pos.m_idx = 0;                // Where on page
-
-  ptr.p->m_pos[WRITER].m_current_page.m_ptr_i = 0;         // Index in page map
-  ptr.p->m_pos[WRITER].m_current_page.m_idx = range.m_idx - 1; // left in range
-  ptr.p->m_pos[WRITER].m_current_pos.m_ptr_i = range.m_ptr_i;  // Which page
-  ptr.p->m_pos[WRITER].m_current_pos.m_idx = 0;                // Where on page
+  ptr.p->m_pos[CONSUMER].m_current_page.m_ptr_i = 0;      // Index in page map
+  ptr.p->m_pos[CONSUMER].m_current_page.m_idx = range.m_idx - 1;// left range
+  ptr.p->m_pos[CONSUMER].m_current_pos.m_ptr_i = range.m_ptr_i; // Which page
+  ptr.p->m_pos[CONSUMER].m_current_pos.m_idx = 0;               // Page pos
+
+  ptr.p->m_pos[PRODUCER].m_current_page.m_ptr_i = 0;      // Index in page map
+  ptr.p->m_pos[PRODUCER].m_current_page.m_idx = range.m_idx - 1;// left range
+  ptr.p->m_pos[PRODUCER].m_current_pos.m_ptr_i = range.m_ptr_i; // Which page
+  ptr.p->m_pos[PRODUCER].m_current_pos.m_idx = 0;               // Page pos
 
   return true;
 }
@@ -642,69 +652,15 @@
   return 0;
 }
 
-Uint64
-Logfile_client::add_entry(Local_key key, void * base, Change* c)
-{
-#if 0
-  Uint32 len = c->len;
-  Uint32 off = (char*)c->ptr - (char*)base;
-  File_formats::Undofile::Undo_entry* ptr;
-  if((ptr = get_log_buffer(4+len)))
-  {
-    ptr->m_file_no = key.m_file_no;
-    ptr->m_page_no = key.m_page_no;
-    ptr->m_changes[0].m_len_offset = (len << 16) | off;
-    memcpy(ptr->m_changes[0].m_data, c->ptr, 4 * len);
-    ptr->m_changes[0].m_data[len] = 4 + len;
-    return 1;
-  }
-#endif
-  return 0;
-}
-
-Uint64
-Logfile_client::add_entry(Local_key key, Uint32 off, Uint32 word)
-{
-#if 0
-  File_formats::Undofile::Undo_entry* ptr = 0;
-  
-  if((ptr = get_log_buffer(5)))
-  {
-    ptr->m_file_no = key.m_file_no;
-    ptr->m_page_no = key.m_page_no;
-    ptr->m_changes[0].m_len_offset = (1 << 16) | off;
-    ptr->m_changes[0].m_data[0] = word;
-    ptr->m_changes[0].m_data[1] = 5;
-    return 1;
-  }
-#endif
-  return 0;
-}
-
-Uint64
-Logfile_client::add_entry(const void* src, Uint32 len)
-{
-  Uint32 *dst= get_log_buffer(len);
-  memcpy(dst, src, 4*len);
-  
-#if 0
-  const Uint32 *data= (Uint32*)src;
-  for(Uint32 i= 0; i<len >> 2; i++)
-    printf("%.8x ", data[i]);
-  printf("\n");
-#endif
-  return m_latest_lsn;
-}
-
 Uint32*
 Lgman::get_log_buffer(Ptr<Logfile_group> ptr, Uint32 sz)
 {
-  GlobalPage *pageP;
-  pageP= m_global_page_pool.getPtr(ptr.p->m_pos[WRITER].m_current_pos.m_ptr_i);
+  GlobalPage *page;
+  page=m_global_page_pool.getPtr(ptr.p->m_pos[PRODUCER].m_current_pos.m_ptr_i);
   
   Uint32 total_free= ptr.p->m_free_buffer_words;
   assert(total_free >= sz);
-  Uint32 pos= ptr.p->m_pos[WRITER].m_current_pos.m_idx;
+  Uint32 pos= ptr.p->m_pos[PRODUCER].m_current_pos.m_idx;
   Uint32 free= File_formats::UNDO_PAGE_WORDS - pos;
 
   if(sz <= free)
@@ -713,8 +669,8 @@
     // fits this page wo/ problem
     ndbassert(total_free >= sz);
     ptr.p->m_free_buffer_words = total_free - sz;
-    ptr.p->m_pos[WRITER].m_current_pos.m_idx = pos + sz;
-    return ((File_formats::Undofile::Undo_page*)pageP)->m_data + pos;
+    ptr.p->m_pos[PRODUCER].m_current_pos.m_idx = pos + sz;
+    return ((File_formats::Undofile::Undo_page*)page)->m_data + pos;
   }
   
   /**
@@ -722,7 +678,7 @@
    */
   Uint64 lsn= ptr.p->m_last_lsn;
   File_formats::Undofile::Undo_page* undo= 
-    (File_formats::Undofile::Undo_page*)pageP;
+    (File_formats::Undofile::Undo_page*)page;
   undo->m_page_header.m_page_lsn_lo = lsn & 0xFFFFFFFF;
   undo->m_page_header.m_page_lsn_hi = lsn >> 32;
   undo->m_words_used= File_formats::UNDO_PAGE_WORDS - free;
@@ -736,7 +692,7 @@
   pos= 0;
   assert(total_free >= free);
   total_free -= free;
-  pageP= m_global_page_pool.getPtr(next_page(ptr.p, WRITER));
+  page= m_global_page_pool.getPtr(next_page(ptr.p, PRODUCER));
   goto next;
 }
 
@@ -774,24 +730,11 @@
   }
 }
 
-Uint32*
-Logfile_client::get_log_buffer(Uint32 sz)
-{
-  Lgman::Logfile_group key;
-  key.m_logfile_group_id= m_logfile_group_id;
-  Ptr<Lgman::Logfile_group> ptr;
-  if(m_lgman->m_logfile_group_hash.find(ptr, key))
-  {
-    m_latest_lsn = ptr.p->m_last_lsn++;
-    return m_lgman->get_log_buffer(ptr, sz);
-  }
-  return 0;
-}
-
 int
 Logfile_client::get_log_buffer(Signal* signal, Uint32 sz, 
 			       SimulatedBlock::Callback* callback)
 {
+  sz += 2; // lsn
   Lgman::Logfile_group key;
   key.m_logfile_group_id= m_logfile_group_id;
   Ptr<Lgman::Logfile_group> ptr;
@@ -841,20 +784,19 @@
 void
 Lgman::flush_log(Signal* signal, Ptr<Logfile_group> ptr)
 {
-  Logfile_group::Position reader= ptr.p->m_pos[READER];
-  Logfile_group::Position writer= ptr.p->m_pos[WRITER];
+  Logfile_group::Position consumer= ptr.p->m_pos[CONSUMER];
+  Logfile_group::Position producer= ptr.p->m_pos[PRODUCER];
  
   jamEntry();
 
-  if(ptr.p->m_file_pos[HEAD].m_ptr_i == RNIL ||
-     reader.m_current_page == writer.m_current_page)
+  if(consumer.m_current_page == producer.m_current_page)
   {
 #if 0
     ndbout_c("ptr.p->m_file_pos[HEAD].m_ptr_i= %x", 
 	     ptr.p->m_file_pos[HEAD].m_ptr_i);
-    ndbout_c("reader.m_current_page: %d %d writer.m_current_page: %d %d",
-	     reader.m_current_page.m_ptr_i, reader.m_current_page.m_idx,
-	     writer.m_current_page.m_ptr_i, writer.m_current_page.m_idx);
+    ndbout_c("consumer.m_current_page: %d %d producer.m_current_page: %d %d",
+	     consumer.m_current_page.m_ptr_i, consumer.m_current_page.m_idx,
+	     producer.m_current_page.m_ptr_i, producer.m_current_page.m_idx);
 #endif
 
     signal->theData[0] = LgmanContinueB::FLUSH_LOG;
@@ -865,28 +807,29 @@
   
   bool full= false;
   Uint32 tot= 0;
-  while(!(reader.m_current_page == writer.m_current_page) && !full)
+  while(!(consumer.m_current_page == producer.m_current_page) && !full)
   {
     Uint32 cnt; // pages written
-    Uint32 page= reader.m_current_pos.m_ptr_i;
-    if(reader.m_current_page.m_ptr_i == writer.m_current_page.m_ptr_i)
+    Uint32 page= consumer.m_current_pos.m_ptr_i;
+    if(consumer.m_current_page.m_ptr_i == producer.m_current_page.m_ptr_i)
     {
-      if(reader.m_current_page.m_idx > writer.m_current_page.m_idx)
+      if(consumer.m_current_page.m_idx > producer.m_current_page.m_idx)
       {
 	jam();
-	Uint32 tmp= reader.m_current_page.m_idx - writer.m_current_page.m_idx;
+	Uint32 tmp= 
+	  consumer.m_current_page.m_idx - producer.m_current_page.m_idx;
 	cnt= write_log_pages(signal, ptr, page, tmp);
 	assert(cnt <= tmp);
 	
-	reader.m_current_pos.m_ptr_i += cnt;
-	reader.m_current_page.m_idx -= cnt;
+	consumer.m_current_pos.m_ptr_i += cnt;
+	consumer.m_current_page.m_idx -= cnt;
 	full= (tmp > cnt);
       }
       else
       {
 	// Only 1 chunk
 	ndbassert(ptr.p->m_buffer_pages.getSize() == 2); 
-	Uint32 tmp= reader.m_current_page.m_idx + 1;
+	Uint32 tmp= consumer.m_current_page.m_idx + 1;
 	cnt= write_log_pages(signal, ptr, page, tmp);
 	assert(cnt <= tmp);
 
@@ -897,9 +840,9 @@
 	   * Entire chunk is written
 	   *   move to next
 	   */
-	  ptr.p->m_pos[READER].m_current_page.m_idx= 0;
-	  next_page(ptr.p, READER);
-	  reader = ptr.p->m_pos[READER];
+	  ptr.p->m_pos[CONSUMER].m_current_page.m_idx= 0;
+	  next_page(ptr.p, CONSUMER);
+	  consumer = ptr.p->m_pos[CONSUMER];
  	}
 	else
 	{
@@ -908,14 +851,14 @@
 	   * Failed to write entire chunk...
 	   */
 	  full= true;
-	  reader.m_current_page.m_idx -= cnt;
-	  reader.m_current_pos.m_ptr_i += cnt;
+	  consumer.m_current_page.m_idx -= cnt;
+	  consumer.m_current_pos.m_ptr_i += cnt;
 	}
       }
     }
     else
     {
-      Uint32 tmp= reader.m_current_page.m_idx + 1;
+      Uint32 tmp= consumer.m_current_page.m_idx + 1;
       cnt= write_log_pages(signal, ptr, page, tmp);
       assert(cnt <= tmp);
 
@@ -926,9 +869,9 @@
 	 * Entire chunk is written
 	 *   move to next
 	 */
-	ptr.p->m_pos[READER].m_current_page.m_idx= 0;
-	next_page(ptr.p, READER);
-	reader = ptr.p->m_pos[READER];
+	ptr.p->m_pos[CONSUMER].m_current_page.m_idx= 0;
+	next_page(ptr.p, CONSUMER);
+	consumer = ptr.p->m_pos[CONSUMER];
       }
       else
       {
@@ -937,16 +880,15 @@
 	 * Failed to write entire chunk...
 	 */
 	full= true;
-	reader.m_current_page.m_idx -= cnt;
-	reader.m_current_pos.m_ptr_i += cnt;
+	consumer.m_current_page.m_idx -= cnt;
+	consumer.m_current_pos.m_ptr_i += cnt;
       }
     }
 
     tot += cnt;
-    ptr.p->m_free_buffer_words += (cnt * File_formats::UNDO_PAGE_WORDS);
   }
 
-  ptr.p->m_pos[READER]= reader;
+  ptr.p->m_pos[CONSUMER]= consumer;
   
   signal->theData[0] = LgmanContinueB::FLUSH_LOG;
   signal->theData[1] = ptr.i;
@@ -1008,8 +950,6 @@
     return 0;
   }
   
-  filePtr.p->m_online.m_outstanding++;
-  
   Uint32 sz= filePtr.p->m_file_size - 1; // skip zero
   Uint32 max, pages= in_pages;
 
@@ -1042,7 +982,8 @@
     
     sendSignal(NDBFS_REF, GSN_FSWRITEREQ, signal, 
 	       FsReadWriteReq::FixedLength + 1, JBA);
-    
+
+    filePtr.p->m_online.m_outstanding = max;
   }
   else
   {
@@ -1052,6 +993,8 @@
     
     sendSignal(NDBFS_REF, GSN_FSWRITEREQ, signal, 
 	       FsReadWriteReq::FixedLength + 1, JBA);
+
+    filePtr.p->m_online.m_outstanding = max;
     
     Ptr<Undofile> next = filePtr;
     LocalDLFifoList<Undofile> files(m_file_pool, ptr.p->m_files);
@@ -1088,10 +1031,13 @@
   Ptr<Undofile> filePtr;
   m_file_pool.getPtr(filePtr, conf->userPointer);
 
-  ndbassert(filePtr.p->m_online.m_outstanding);
-  filePtr.p->m_online.m_outstanding--;
-
+  Uint32 cnt= filePtr.p->m_online.m_outstanding;
+  ndbassert(cnt);
+  filePtr.p->m_online.m_outstanding= 0;
   
+  Ptr<Logfile_group> lg_ptr;
+  m_logfile_group_pool.getPtr(lg_ptr, filePtr.p->m_logfile_group_ptr_i);
+  lg_ptr.p->m_free_buffer_words += (cnt * File_formats::UNDO_PAGE_WORDS);
 }
 
 void
@@ -1260,6 +1206,57 @@
   return -1;
 }
 
+template<Uint32 cnt>
+Uint64
+Logfile_client::add_entry(const Change* src)
+{
+  Uint32 i, tot= 0;
+  for(i= 0; i<cnt; i++)
+  {
+    tot += src[i].len;
+  }
+  
+  Uint32 *dst;
+  Uint64 last_lsn= m_lgman->m_last_lsn;
+  {
+    Lgman::Logfile_group key;
+    key.m_logfile_group_id= m_logfile_group_id;
+    Ptr<Lgman::Logfile_group> ptr;
+    if(m_lgman->m_logfile_group_hash.find(ptr, key))
+    {
+      Uint64 last_lsn_filegroup= ptr.p->m_last_lsn;
+      if(last_lsn_filegroup == last_lsn)
+      {
+	dst= m_lgman->get_log_buffer(ptr, tot);
+	for(i= 0; i<cnt; i++)
+	{
+	  memcpy(dst, src[i].ptr, 4*src[i].len);
+	  dst += src[i].len;
+	}
+	* (dst - 1) |= File_formats::Undofile::UNDO_NEXT_LSN << 16;
+	ptr.p->m_free_buffer_words += 2;
+      }
+      else
+      {
+	assert(false);
+	dst= m_lgman->get_log_buffer(ptr, tot + 2);
+	* dst++ = last_lsn >> 32;
+	* dst++ = last_lsn & 0xFFFFFFFF;
+	for(i= 0; i<cnt; i++)
+	{
+	  memcpy(dst, src[i].ptr, 4*src[i].len);
+	  dst += src[i].len;
+	}
+      }
+    }
+    
+    m_lgman->m_last_lsn = ptr.p->m_last_lsn = last_lsn + 1;
+    
+    return last_lsn;
+  }
+}
+
+template Uint64 Logfile_client::add_entry<1>(const Change*);
 template Uint64 Logfile_client::add_entry<3>(const Change*);
 
 void
@@ -1307,7 +1304,7 @@
     /**
      * Use log buffer memory when reading
      */
-    Uint32 page_id = ptr.p->m_pos[READER].m_current_pos.m_ptr_i;
+    Uint32 page_id = ptr.p->m_pos[CONSUMER].m_current_pos.m_ptr_i;
     file_ptr.p->m_online.m_outstanding= page_id;
     
     FsReadWriteReq* req= (FsReadWriteReq*)signal->getDataPtrSend();
@@ -1344,7 +1341,7 @@
     ptr.p->m_file_pos[HEAD].m_idx = file_ptr.p->m_file_size;
     ptr.p->m_file_pos[HEAD].m_ptr_i = ((file_ptr.p->m_file_size - 1) >> 1) + 1;
     
-    Uint32 page_id = ptr.p->m_pos[READER].m_current_pos.m_ptr_i;
+    Uint32 page_id = ptr.p->m_pos[CONSUMER].m_current_pos.m_ptr_i;
     file_ptr.p->m_online.m_outstanding= page_id;
 
     FsReadWriteReq* req= (FsReadWriteReq*)signal->getDataPtrSend();
@@ -1376,11 +1373,22 @@
   
   m_file_pool.getPtr(ptr, conf->userPointer);
   m_logfile_group_pool.getPtr(lg_ptr, ptr.p->m_logfile_group_ptr_i);
-  
+
+  if(ptr.p->m_state == Undofile::FS_EXECUTING)
+  {
+    jam();
+    Uint32 pages= ptr.p->m_online.m_outstanding;
+    ndbassert(pages);
+    lg_ptr.p->m_pos[PRODUCER].m_current_pos.m_idx +=
+      (pages * File_formats::UNDO_PAGE_WORDS);
+    ptr.p->m_online.m_outstanding= 0;
+    return;
+  }
+
   Ptr<GlobalPage> page_ptr;
   m_global_page_pool.getPtr(page_ptr, ptr.p->m_online.m_outstanding);
   ptr.p->m_online.m_outstanding= 0;
-
+  
   File_formats::Undofile::Undo_page* page = 
     (File_formats::Undofile::Undo_page*)page_ptr.p;
   
@@ -1397,9 +1405,8 @@
     jam();
     find_log_head_in_file(signal, lg_ptr, ptr, lsn);
     return;
-  case Undofile::FS_EXECUTING:
-    ndbrequire(false);
   default:
+  case Undofile::FS_EXECUTING:
   case Undofile::FS_CREATING:
   case Undofile::FS_DROPPING:
   case Undofile::FS_ONLINE:
@@ -1409,6 +1416,9 @@
     ndbrequire(false);
   }
 
+  /**
+   * Prepare for execution
+   */
   ptr.p->m_state = Undofile::FS_EXECUTING;
   ptr.p->m_online.m_first_lsn = lsn;
   
@@ -1497,7 +1507,7 @@
 	     tail, file_ptr.p->m_online.m_first_lsn,
 	     head, curr);
 #endif
-    Uint32 page_id = ptr.p->m_pos[READER].m_current_pos.m_ptr_i;
+    Uint32 page_id = ptr.p->m_pos[CONSUMER].m_current_pos.m_ptr_i;
     file_ptr.p->m_online.m_outstanding= page_id;
     
     FsReadWriteReq* req= (FsReadWriteReq*)signal->getDataPtrSend();
@@ -1521,10 +1531,11 @@
 #ifdef PRINT_SEARCH    
   ndbout_c("-> found last page: %d", tail);
 #endif
-
+  
+  file_ptr.p->m_state = Undofile::FS_EXECUTING;
   ptr.p->m_last_lsn = file_ptr.p->m_online.m_first_lsn;
-  ptr.p->m_last_written_lsn = file_ptr.p->m_online.m_first_lsn;
-
+  ptr.p->m_last_read_lsn = file_ptr.p->m_online.m_first_lsn;
+  
   /**
    * Set HEAD position
    */
@@ -1565,7 +1576,7 @@
 		file_ptr.p->m_online.m_first_lsn);
     }
   }
-
+  
   /**
    * Start next logfile group
    */
@@ -1592,9 +1603,57 @@
     list.next(group);
     list.remove(ptr);
 
+    {
+      /**
+       * Init buffer pointers
+       */
+      Lgman::Page_map::Iterator it;
+      Page_map map(m_data_buffer_pool, ptr.p->m_buffer_pages);
+      ptr.p->m_free_buffer_words -= File_formats::UNDO_PAGE_WORDS;
+      ptr.p->m_pos[CONSUMER].m_current_page.m_idx = 0; // 0 more pages read
+      Uint32 idx= 
+	ptr.p->m_pos[PRODUCER].m_current_page.m_ptr_i = map.getSize()-2;
+      map.position(it, idx);
+      
+      union {
+	Uint32 _tmp[2];
+	Lgman::Buffer_idx range;
+      };
+      _tmp[0] = *it.data; map.next(it); _tmp[1] = *it.data;
+      
+      if(idx == ptr.p->m_pos[CONSUMER].m_current_page.m_ptr_i)
+      {
+	ndbassert(map.getSize() == 2);
+	/**
+	 * Only 1 page_range
+	 */
+	range.m_idx --;
+	range.m_ptr_i ++;
+      }
+      ptr.p->m_pos[PRODUCER].m_current_pos.m_ptr_i= range.m_ptr_i+ range.m_idx;
+      ptr.p->m_pos[PRODUCER].m_current_page.m_idx = range.m_idx;
+      
+      Uint32 page = ptr.p->m_pos[CONSUMER].m_current_pos.m_ptr_i;
+      File_formats::Undofile::Undo_page* pageP = 
+	(File_formats::Undofile::Undo_page*)m_global_page_pool.getPtr(page);
+      
+      ptr.p->m_pos[CONSUMER].m_current_pos.m_idx = pageP->m_words_used;
+      ptr.p->m_pos[PRODUCER].m_current_pos.m_idx = pageP->m_words_used;
+    }
+    
+    /**
+     * Start consumer thread
+     */
+    signal->theData[0] = LgmanContinueB::READ_UNDO_LOG;
+    signal->theData[1] = ptr.i;
+    sendSignal(reference(), GSN_CONTINUEB, signal, 2, JBB);
+
+    /**
+     * Insert in correct position in list of logfile_group's
+     */
     Ptr<Logfile_group> pos;
     for(tmp.first(pos); !pos.isNull(); tmp.next(pos))
-      if(ptr.p->m_last_lsn >= pos.p->m_last_lsn)
+      if(ptr.p->m_last_read_lsn >= pos.p->m_last_read_lsn)
 	break;
     
     if(pos.isNull())
@@ -1603,4 +1662,299 @@
       tmp.insert(ptr, pos);
   }
   list = tmp;
+
+  execute_undo_record(signal);
+}
+
+void
+Lgman::read_undo_log(Signal* signal, Ptr<Logfile_group> ptr)
+{
+  Uint32 cnt, free= ptr.p->m_free_buffer_words;
+  if(free <= File_formats::UNDO_PAGE_WORDS)
+  {
+    ndbout_c("free: %d", free);
+    signal->theData[0] = LgmanContinueB::READ_UNDO_LOG;
+    signal->theData[1] = ptr.i;
+    sendSignalWithDelay(reference(), GSN_CONTINUEB, signal, 100, 2);
+    return;
+  }
+
+  Logfile_group::Position producer= ptr.p->m_pos[PRODUCER];
+  Logfile_group::Position consumer= ptr.p->m_pos[CONSUMER];
+
+  if(producer.m_current_page.m_ptr_i == consumer.m_current_page.m_ptr_i &&
+     producer.m_current_pos.m_ptr_i > consumer.m_current_pos.m_ptr_i)
+  {
+    Uint32 max= 
+      producer.m_current_pos.m_ptr_i - consumer.m_current_pos.m_ptr_i - 1;
+    ndbassert(free >= max * File_formats::UNDO_PAGE_WORDS);
+    
+    cnt= read_undo_pages(signal, ptr, producer.m_current_pos.m_ptr_i, max);
+    ndbassert(cnt <= max);    
+    producer.m_current_pos.m_ptr_i -= cnt;
+    producer.m_current_page.m_idx -= cnt;
+  }
+  else
+  {
+    Uint32 max= producer.m_current_page.m_idx;
+    ndbassert(free >= max * File_formats::UNDO_PAGE_WORDS);
+    cnt= read_undo_pages(signal, ptr, producer.m_current_pos.m_ptr_i, max);
+    ndbassert(cnt <= max);
+    producer.m_current_pos.m_ptr_i -= cnt;
+    producer.m_current_page.m_idx -= cnt;
+    if(cnt == max)
+    {
+      // Switch range
+      Lgman::Page_map::Iterator it;
+      Page_map map(m_data_buffer_pool, ptr.p->m_buffer_pages);
+      Uint32 sz = map.getSize();
+      Uint32 pos= (producer.m_current_page.m_ptr_i + sz - 2) % sz;
+      map.position(it, pos);
+      union {
+	Uint32 _tmp[2];
+	Lgman::Buffer_idx range;
+      };
+      
+      _tmp[0] = *it.data; map.next(it); _tmp[1] = *it.data;
+      producer.m_current_page.m_ptr_i = pos;
+      producer.m_current_page.m_idx = range.m_idx;
+      producer.m_current_pos.m_ptr_i = range.m_ptr_i;
+    }
+  } 
+  
+  free -= cnt *= File_formats::UNDO_PAGE_WORDS;
+  ptr.p->m_free_buffer_words = free;
+  ptr.p->m_pos[PRODUCER] = producer;  
+
+  signal->theData[0] = LgmanContinueB::READ_UNDO_LOG;
+  signal->theData[1] = ptr.i;
+
+  if(free > File_formats::UNDO_PAGE_WORDS)
+    sendSignal(reference(), GSN_CONTINUEB, signal, 2, JBB);
+  else
+    sendSignalWithDelay(reference(), GSN_CONTINUEB, signal, 100, 2);
+}
+
+Uint32
+Lgman::read_undo_pages(Signal* signal, Ptr<Logfile_group> ptr, 
+		       Uint32 pageId, Uint32 pages)
+{
+  Ptr<Undofile> filePtr;
+  Buffer_idx tail= ptr.p->m_file_pos[TAIL];
+  m_file_pool.getPtr(filePtr, tail.m_ptr_i);
+  
+  if(filePtr.p->m_online.m_outstanding > 0)
+  {
+    jam();
+    return 0;
+  }
+  
+  Uint32 max= tail.m_idx;
+
+  FsReadWriteReq* req= (FsReadWriteReq*)signal->getDataPtrSend();
+  req->filePointer = filePtr.p->m_online.m_fd;
+  req->userReference = reference();
+  req->userPointer = filePtr.i;
+  req->operationFlag = 0;
+  FsReadWriteReq::setFormatFlag(req->operationFlag, 
+				FsReadWriteReq::fsFormatGlobalPage);
+
+  if(max > pages)
+  {
+    jam();
+    tail.m_idx -= pages;
+
+    ndbout_c("reading %d pages from %d (fileSize: %d)",
+	     pages, 1+tail.m_idx, filePtr.p->m_file_size);
+    req->varIndex = 1 + tail.m_idx;
+    req->numberOfPages = pages;
+    req->data.pageData[0] = pageId - pages;
+    ptr.p->m_file_pos[TAIL] = tail;
+    
+    sendSignal(NDBFS_REF, GSN_FSREADREQ, signal, 
+	       FsReadWriteReq::FixedLength + 1, JBA);
+    
+    filePtr.p->m_online.m_outstanding = pages;
+    max = pages;
+  }
+  else
+  {
+    jam();
+
+    ndbassert(tail.m_idx - max == 0);
+    req->varIndex = 1;
+    req->numberOfPages = max;
+    req->data.pageData[0] = pageId - max;
+
+    ndbout_c("reading %d pages from %d (fileSize: %d)",
+	     max, 1, filePtr.p->m_file_size);
+    
+    sendSignal(NDBFS_REF, GSN_FSREADREQ, signal, 
+	       FsReadWriteReq::FixedLength + 1, JBA);
+    
+    filePtr.p->m_online.m_outstanding = max;
+    
+    Ptr<Undofile> prev = filePtr;
+    LocalDLFifoList<Undofile> files(m_file_pool, ptr.p->m_files);
+    if(!files.prev(prev))
+    {
+      jam();
+      files.last(prev);
+    }
+    ndbout_c("changing file from %d to %d", filePtr.i, prev.i);
+
+    tail.m_idx= prev.p->m_file_size - 1;
+    tail.m_ptr_i= prev.i;
+    ptr.p->m_file_pos[TAIL] = tail;
+    if(max < pages && filePtr.i != prev.i)
+      max += read_undo_pages(signal, ptr, pageId - max, pages - max);
+  }
+  
+  return max;
+
+}
+
+void
+Lgman::execute_undo_record(Signal* signal)
+{
+  const Uint32* ptr;
+  if((ptr = get_next_undo_record()))
+  {
+    Uint32 len= (* ptr) & 0xFFFF;
+    Uint32 type= (* ptr) >> 16;
+    switch(type){
+    default:
+      ndbout_c("Found undo: %d len: %d", type & 0x7FFF, len);
+    }
+  }
+  
+  signal->theData[0] = LgmanContinueB::EXECUTE_UNDO_RECORD;
+  sendSignal(reference(), GSN_CONTINUEB, signal, 1, JBB);
+  return;
+}
+
+const Uint32*
+Lgman::get_next_undo_record()
+{
+  Ptr<Logfile_group> ptr;
+  m_logfile_group_list.first(ptr);
+
+  Logfile_group::Position consumer= ptr.p->m_pos[CONSUMER];
+  Logfile_group::Position producer= ptr.p->m_pos[PRODUCER];
+  if(producer.m_current_pos.m_idx < File_formats::UNDO_PAGE_WORDS + 1)
+  {
+    jam();
+    //ndbout_c("waiting for fetch");
+    /**
+     * Wait for fetching pages...
+     */
+    return 0;
+  }
+  
+  Uint32 pos = consumer.m_current_pos.m_idx;
+  Uint32 page = consumer.m_current_pos.m_ptr_i;
+  
+  File_formats::Undofile::Undo_page* pageP=(File_formats::Undofile::Undo_page*)
+    m_global_page_pool.getPtr(page);
+  
+  ndbrequire(pos > 0);
+  Uint32 *record= pageP->m_data + pos - 1;
+  Uint32 len= (* record) & 0xFFFF;
+  ndbassert(len);
+
+  ndbassert(pos >= len);
+  pos -= len;
+  
+  ndbassert(producer.m_current_pos.m_idx > len);
+  producer.m_current_pos.m_idx -= len;
+  
+  Uint32 *prev= record - len;
+  Uint64 lsn = 0;
+  if(pos == 0)
+  {
+    /**
+     * Switching page
+     */
+    if(consumer.m_current_page.m_idx)
+    {
+      consumer.m_current_page.m_idx--;   // left in range
+      consumer.m_current_pos.m_ptr_i --; // page
+    }
+    else
+    {
+      // 0 pages left in range...switch range
+      Lgman::Page_map::Iterator it;
+      Page_map map(m_data_buffer_pool, ptr.p->m_buffer_pages);
+      Uint32 sz = map.getSize();
+      Uint32 tmp = (consumer.m_current_page.m_ptr_i + sz - 2) % sz;
+      
+      map.position(it, tmp);
+      union {
+	Uint32 _tmp[2];
+	Lgman::Buffer_idx range;
+      };
+      
+      _tmp[0] = *it.data; map.next(it); _tmp[1] = *it.data;
+      
+      consumer.m_current_page.m_idx = range.m_idx - 1; // left in range
+      consumer.m_current_page.m_ptr_i = tmp;           // pos in map
+
+      consumer.m_current_pos.m_ptr_i = range.m_ptr_i + range.m_idx - 1; // page
+    }
+
+    pageP=(File_formats::Undofile::Undo_page*)
+      m_global_page_pool.getPtr(consumer.m_current_pos.m_ptr_i);
+    
+    pos= consumer.m_current_pos.m_idx= pageP->m_words_used;
+    
+    lsn += pageP->m_page_header.m_page_lsn_hi; lsn <<= 32;
+    lsn += pageP->m_page_header.m_page_lsn_lo;
+    
+    prev = pageP->m_data + pos - 1;
+    
+    if(((* prev) >> 16) & File_formats::Undofile::Undofile::UNDO_NEXT_LSN)
+      ndbassert(lsn + 1 == ptr.p->m_last_read_lsn);
+
+    if(pos == 0)
+      ndbout_c("FOUND EMPTY PAGE");
+    ptr.p->m_pos[CONSUMER] = consumer;
+  }
+  else
+  {
+    // Same page
+    if(((* prev) >> 16) & File_formats::Undofile::Undofile::UNDO_NEXT_LSN)
+    {
+      lsn = ptr.p->m_last_read_lsn - 1;
+    }
+    else
+    {
+      ndbassert(false);
+      ndbassert(pos >= 3);
+      lsn += * (prev - 2); lsn <<= 32;
+      lsn += * (prev - 1);
+    }
+    ptr.p->m_pos[CONSUMER].m_current_pos.m_idx -= len;
+  }
+  
+  ptr.p->m_last_read_lsn = lsn;
+  
+  /**
+   * Re-sort log file groups
+   */
+  Ptr<Logfile_group> sort = ptr;
+  if(m_logfile_group_list.next(sort))
+  {
+    while(!sort.isNull() && sort.p->m_last_read_lsn > lsn)
+      m_logfile_group_list.next(sort);
+    
+    if(sort.i != ptr.p->nextList)
+    {
+      m_logfile_group_list.remove(ptr);
+      if(sort.isNull())
+	m_logfile_group_list.add(ptr);
+      else
+	m_logfile_group_list.insert(ptr, sort);
+    }
+  }
+  return record;
 }

--- 1.25/ndb/src/kernel/blocks/lgman.hpp	Fri Apr 15 07:58:20 2005
+++ 1.26/ndb/src/kernel/blocks/lgman.hpp	Tue Apr 19 15:21:45 2005
@@ -151,7 +151,10 @@
 
     Uint64 m_last_lsn;
     Uint64 m_last_synced_lsn;
-    Uint64 m_last_written_lsn;
+    union {
+      Uint64 m_last_written_lsn;
+      Uint64 m_last_read_lsn;
+    };
     Uint64 m_current_sync_lsn;
 
     Buffer_idx m_tail_pos[3]; // 0 is cut, 1 is saved, 2 is current
@@ -165,7 +168,7 @@
     DLFifoList<Log_buffer_waiter>::Head m_log_buffer_waiters;
     Page_map::Head m_buffer_pages; // Pairs of { ptr.i, count }
     struct Position {
-      Buffer_idx m_current_page;   // { m_buffer_pages, left in range }
+      Buffer_idx m_current_page;   // { m_buffer_pages.i, left in range }
       Buffer_idx m_current_pos;    // { page ptr.i, m_words_used }
     } m_pos[2]; // 0 is reader (lgman) 1 is writer (tup)
 
@@ -201,6 +204,7 @@
 
   Page_map::DataBufferPool m_data_buffer_pool;
 
+  Uint64 m_last_lsn;
   Uint32 m_latest_lcp;
   DLHashTable<Undofile> m_file_hash;
   DLFifoList<Logfile_group> m_logfile_group_list;
@@ -223,6 +227,12 @@
   void find_log_head_in_file(Signal*, Ptr<Logfile_group>,Ptr<Undofile>,Uint64);
 
   void init_run_undo_log(Signal*);
+  void read_undo_log(Signal*, Ptr<Logfile_group> ptr);
+  Uint32 read_undo_pages(Signal*, Ptr<Logfile_group>, 
+			 Uint32 pageId, Uint32 pages);
+  
+  void execute_undo_record(Signal*);
+  const Uint32* get_next_undo_record();
 };
 
 class Logfile_client {
@@ -279,33 +289,8 @@
   int get_log_buffer(Signal*, Uint32 sz, SimulatedBlock::Callback* m_callback);
   
 private:
-  Uint64 m_latest_lsn;
   Uint32* get_log_buffer(Uint32 sz);
 };
 
-template<Uint32 cnt>
-Uint64
-Logfile_client::add_entry(const Change* src)
-{
-  Uint32 i, tot= 0;
-  for(i= 0; i<cnt; i++)
-  {
-    tot += src[i].len;
-  }
-  Uint32 *dst= get_log_buffer(tot);
-  for(i= 0; i<cnt; i++)
-  {
-    memcpy(dst, src[i].ptr, 4*src[i].len);
-    dst += src[i].len;
-  }
-  
-#if 0
-  const Uint32 *data= (Uint32*)src[j].ptr;
-  for(Uint32 i= 0; i<src[j].len >> 2; i++)
-    printf("%.8x ", data[i]);
-  printf("\n");
-#endif
-  return m_latest_lsn;
-}
 
 #endif

--- 1.3/ndb/src/kernel/blocks/print_file.cpp	Tue Mar 29 11:57:36 2005
+++ 1.4/ndb/src/kernel/blocks/print_file.cpp	Tue Apr 19 15:21:45 2005
@@ -227,7 +227,13 @@
 	   << page->m_page_header.m_page_lsn_hi << " " 
 	   << page->m_page_header.m_page_lsn_lo << "] " 
 	   << "words used: " << page->m_words_used << endl;
-    
+
+    Uint64 lsn= 0;
+    lsn += page->m_page_header.m_page_lsn_hi;
+    lsn <<= 32;
+    lsn += page->m_page_header.m_page_lsn_lo;
+    lsn++;
+
     if(g_verbosity >= 3)
     {
       Uint32 pos= page->m_words_used - 1;
@@ -235,22 +241,37 @@
       {
 	Uint32 len= page->m_data[pos] & 0xFFFF;
 	Uint32 type= page->m_data[pos] >> 16;
-	const Uint32* src= page->m_data + pos - (len >> 2) + 1;
+	const Uint32* src= page->m_data + pos - len + 1;
+	Uint32 next_pos= pos - len;
+	if(type & File_formats::Undofile::UNDO_NEXT_LSN)
+	{
+	  type &= ~(Uint32)File_formats::Undofile::UNDO_NEXT_LSN;
+	  lsn--;
+	}
+	else
+	{
+	  lsn = 0;
+	  lsn += * (src - 2);
+	  lsn <<= 32;
+	  lsn += * (src - 1);
+	  next_pos -= 2;
+	}
 	if(g_verbosity > 3)
-	  printf(" %.4d - %.4d : ", pos - (len >> 2) + 1, pos);
+	  printf(" %.4d - %.4d : ", pos - len + 1, pos);
 	switch(type){
 	case File_formats::Undofile::UNDO_NOOP:
 	  if(g_verbosity > 3)
 	    printf("[ NOOP %d ]", len);
 	  break;
 	case File_formats::Undofile::UNDO_LCP:
-	  printf("[ LCP %d ]",page->m_data[pos-1]);
+	  printf("[ %lld LCP %d ]", lsn, page->m_data[pos-1]);
 	  break;
 	case File_formats::Undofile::UNDO_TUP_ALLOC:
 	  if(g_verbosity > 3)
 	  {
 	    Dbtup::Disk_undo::Alloc *req= (Dbtup::Disk_undo::Alloc*)src;
-	    printf("[ A %d %d %d ]",
+	    printf("[ %lld A %d %d %d ]",
+		   lsn, /* 64-bit LSN: needs %lld like the LCP/U/F printers */
 		   req->m_file_no_page_idx >> 16,
 		   req->m_file_no_page_idx & 0xFFFF,
 		   req->m_page_no);
@@ -260,7 +281,8 @@
 	  if(g_verbosity > 3)
 	  {
 	    Dbtup::Disk_undo::Update *req= (Dbtup::Disk_undo::Update*)src;
-	    printf("[ U %d %d %d gci: %d ]",
+	    printf("[ %lld U %d %d %d gci: %d ]",
+		   lsn,
 		   req->m_file_no_page_idx >> 16,
 		   req->m_file_no_page_idx & 0xFFFF,
 		   req->m_page_no,
@@ -271,7 +293,8 @@
 	  if(g_verbosity > 3)
 	  {
 	    Dbtup::Disk_undo::Free *req= (Dbtup::Disk_undo::Free*)src;
-	    printf("[ F %d %d %d gci: %d ]",
+	    printf("[ %lld F %d %d %d gci: %d ]",
+		   lsn,
 		   req->m_file_no_page_idx >> 16,
 		   req->m_file_no_page_idx & 0xFFFF,
 		   req->m_page_no,
@@ -293,7 +316,7 @@
 	  }
 	  assert(len && type);
 	}
-	pos -= (len >> 2);
+	pos = next_pos;
 	if(g_verbosity > 3)
 	  printf("\n");
       }

--- 1.30/ndb/src/kernel/blocks/tsman.cpp	Wed Apr 13 13:42:51 2005
+++ 1.31/ndb/src/kernel/blocks/tsman.cpp	Tue Apr 19 15:21:45 2005
@@ -931,9 +931,12 @@
       /**
        * Undo
        */
+      Uint64 lsn = 0;
+#if 0
       Logfile_client::Change entry = { header, eh_words };
       Uint64 lsn = ts_ptr.p->m_logfile_client.add_entry(preq.m_page, 
 							ptr_p, &entry);
+#endif
       
       /**
        * Init header
@@ -1129,8 +1132,11 @@
     /**
      * Undo
      */
+    Uint64 lsn = 0;
+#if 0
     Uint64 lsn = ts_ptr.p->m_logfile_client.add_entry(preq.m_page, 
 						      offset, old_value);
+#endif
     
     /**
      * Update lsn
Thread
bk commit into 5.1-ndb tree (joreland:1.1860)jonas.oreland19 Apr