List:Commits« Previous MessageNext Message »
From:jonas Date:February 3 2006 10:53am
Subject:bk commit into 5.1 tree (jonas:1.2130)
View as plain text  
Below is the list of changes that have just been committed into a local
5.1 repository of jonas. When jonas does a push these changes will
be propagated to the main repository and, within 24 hours after the
push, to the public repository.
For information on how to access the public repository
see http://dev.mysql.com/doc/mysql/en/installing-source-tree.html

ChangeSet
  1.2130 06/02/03 10:53:43 jonas@eel.(none) +2 -0
  ndb dd -
    Fix possible deadlock when running with small log buffer, causing buffer to be full,
but not page getting flushed

  storage/ndb/src/kernel/blocks/lgman.hpp
    1.4 06/02/03 10:53:38 jonas@eel.(none) +1 -1
    Fix possible deadlock when running with small log buffer, causing buffer to be full,
but not page getting flushed

  storage/ndb/src/kernel/blocks/lgman.cpp
    1.7 06/02/03 10:53:38 jonas@eel.(none) +128 -67
    Fix possible deadlock when running with small log buffer, causing buffer to be full,
but not page getting flushed

# This is a BitKeeper patch.  What follows are the unified diffs for the
# set of deltas contained in the patch.  The rest of the patch, the part
# that BitKeeper cares about, is below these diffs.
# User:	jonas
# Host:	eel.(none)
# Root:	/home/jonas/src/mysql-5.1-new

--- 1.6/storage/ndb/src/kernel/blocks/lgman.cpp	2006-02-02 12:23:20 +01:00
+++ 1.7/storage/ndb/src/kernel/blocks/lgman.cpp	2006-02-03 10:53:38 +01:00
@@ -175,7 +175,7 @@
     jam();
     Ptr<Logfile_group> ptr;
     m_logfile_group_pool.getPtr(ptr, ptrI);
-    flush_log(signal, ptr);
+    flush_log(signal, ptr, signal->theData[2]);
     return;
   }
   case LgmanContinueB::PROCESS_LOG_BUFFER_WAITERS:
@@ -261,12 +261,25 @@
     while(!ptr.isNull())
     {
       infoEvent("lfg %d state: %x fs: %d lsn "
-		"[ last: %lld s(req): %lld s:ed: %lld lcp: %lld ] waiters: %d",
+		"[ last: %lld s(req): %lld s:ed: %lld lcp: %lld ] waiters: %d %d",
 		ptr.p->m_logfile_group_id, ptr.p->m_state, 
 		ptr.p->m_outstanding_fs,
 		ptr.p->m_last_lsn, ptr.p->m_last_sync_req_lsn,
 		ptr.p->m_last_synced_lsn, ptr.p->m_last_lcp_lsn,
+		!ptr.p->m_log_buffer_waiters.isEmpty(),
 		!ptr.p->m_log_sync_waiters.isEmpty());
+      if (!ptr.p->m_log_buffer_waiters.isEmpty())
+      {
+	Uint32 free_buffer= ptr.p->m_free_buffer_words;
+	LocalDLFifoList<Log_waiter> 
+	  list(m_log_waiter_pool, ptr.p->m_log_buffer_waiters);
+	Ptr<Log_waiter> waiter;
+	list.first(waiter);
+	infoEvent("  free_buffer_words: %d head(waiters).sz: %d %d",
+		  ptr.p->m_free_buffer_words,
+		  waiter.p->m_size,
+		  2*File_formats::UNDO_PAGE_WORDS);
+      }
       m_logfile_group_list.next(ptr);
     }
   }
@@ -398,7 +411,7 @@
 				 Uint32 ref, Uint32 data)
 {
   jam();
-  ndbassert(! (ptr.p->m_state & Logfile_group::LG_THREAD_MASK));
+  ndbrequire(! (ptr.p->m_state & Logfile_group::LG_THREAD_MASK));
   ndbrequire(ptr.p->m_meta_files.isEmpty());
   ndbrequire(ptr.p->m_outstanding_fs == 0);
 
@@ -696,7 +709,8 @@
       lg_ptr.p->m_state |= Logfile_group::LG_FLUSH_THREAD;
       signal->theData[0] = LgmanContinueB::FLUSH_LOG;
       signal->theData[1] = lg_ptr.i;
-      sendSignal(reference(), GSN_CONTINUEB, signal, 2, JBB);
+      signal->theData[2] = 0;
+      sendSignal(reference(), GSN_CONTINUEB, signal, 3, JBB);
     }
   }
   else
@@ -1072,8 +1086,8 @@
       /**
        * Update free space with extra NOOP
        */
-      ndbassert(ptr.p->m_free_file_words >= free);
-      ndbassert(ptr.p->m_free_buffer_words >= free);
+      ndbrequire(ptr.p->m_free_file_words >= free);
+      ndbrequire(ptr.p->m_free_buffer_words >= free);
       ptr.p->m_free_file_words -= free;
       ptr.p->m_free_buffer_words -= free;
       
@@ -1090,7 +1104,7 @@
      last.p->m_sync_lsn > force_lsn && 
      ptr.p->m_last_sync_req_lsn < last.p->m_sync_lsn)
   {
-    ndbassert(ptr.p->m_state & Lgman::Logfile_group::LG_FORCE_SYNC_THREAD);
+    ndbrequire(ptr.p->m_state & Lgman::Logfile_group::LG_FORCE_SYNC_THREAD);
     signal->theData[0] = LgmanContinueB::FORCE_LOG_SYNC;
     signal->theData[1] = ptr.i;
     signal->theData[2] = last.p->m_sync_lsn >> 32;
@@ -1157,7 +1171,7 @@
   {
 next:
     // fits this page wo/ problem
-    ndbassert(total_free >= sz);
+    ndbrequire(total_free >= sz);
     ptr.p->m_free_buffer_words = total_free - sz;
     ptr.p->m_pos[PRODUCER].m_current_pos.m_idx = pos + sz;
     return ((File_formats::Undofile::Undo_page*)page)->m_data + pos;
@@ -1176,7 +1190,7 @@
   /**
    * Update free space with extra NOOP
    */
-  ndbassert(ptr.p->m_free_file_words >= free);
+  ndbrequire(ptr.p->m_free_file_words >= free);
   ptr.p->m_free_file_words -= free;
 
   validate_logfile_group(ptr, "get_log_buffer");
@@ -1246,17 +1260,15 @@
       
       empty= list.isEmpty();
       if(!list.seize(wait))
+      {
 	return -1;
-      
+      }      
+
       wait.p->m_size= sz;
       wait.p->m_block= m_block;
       memcpy(&wait.p->m_callback, callback,sizeof(SimulatedBlock::Callback));
     }
-	
-    if(empty)
-    { // Start ContinueB
-      m_lgman->process_log_buffer_waiters(signal, ptr);
-    }
+
     return 0;
   }
   return -1;
@@ -1283,7 +1295,7 @@
 }
 
 void
-Lgman::flush_log(Signal* signal, Ptr<Logfile_group> ptr)
+Lgman::flush_log(Signal* signal, Ptr<Logfile_group> ptr, Uint32 force)
 {
   Logfile_group::Position consumer= ptr.p->m_pos[CONSUMER];
   Logfile_group::Position producer= ptr.p->m_pos[PRODUCER];
@@ -1292,26 +1304,75 @@
 
   if(consumer.m_current_page == producer.m_current_page)
   {
+
 #if 0
-    ndbout_c("ptr.p->m_file_pos[HEAD].m_ptr_i= %x", 
-	     ptr.p->m_file_pos[HEAD].m_ptr_i);
-    ndbout_c("consumer.m_current_page: %d %d producer.m_current_page: %d %d",
-	     consumer.m_current_page.m_ptr_i, consumer.m_current_page.m_idx,
-	     producer.m_current_page.m_ptr_i, producer.m_current_page.m_idx);
+    if (force)
+    {
+      ndbout_c("force: %d ptr.p->m_file_pos[HEAD].m_ptr_i= %x", 
+	       force, ptr.p->m_file_pos[HEAD].m_ptr_i);
+      ndbout_c("consumer.m_current_page: %d %d producer.m_current_page: %d %d",
+	       consumer.m_current_page.m_ptr_i, consumer.m_current_page.m_idx,
+	       producer.m_current_page.m_ptr_i, producer.m_current_page.m_idx);
+    }
 #endif
     if (! (ptr.p->m_state & Logfile_group::LG_DROPPING))
     {
       jam();
-      signal->theData[0] = LgmanContinueB::FLUSH_LOG;
-      signal->theData[1] = ptr.i;
-      sendSignalWithDelay(reference(), GSN_CONTINUEB, signal, 100, 2);
+
+      if (ptr.p->m_log_buffer_waiters.isEmpty() || ptr.p->m_outstanding_fs)
+      {
+	force =  0;
+      }
+      
+      if (force < 2)
+      {
+	signal->theData[0] = LgmanContinueB::FLUSH_LOG;
+	signal->theData[1] = ptr.i;
+	signal->theData[2] = force + 1;
+	sendSignalWithDelay(reference(), GSN_CONTINUEB, signal, 
+			    force ? 10 : 100, 3);
+	return;
+      }
+      else
+      {
+	Buffer_idx pos= producer.m_current_pos;
+	GlobalPage *page = m_global_page_pool.getPtr(pos.m_ptr_i);
+	
+	Uint32 free= File_formats::UNDO_PAGE_WORDS - pos.m_idx;
+
+	ndbout_c("force flush %d", free);
+	
+	ndbrequire(pos.m_idx); // don't flush empty page...
+	Uint64 lsn= ptr.p->m_last_lsn - 1;
+	
+	File_formats::Undofile::Undo_page* undo= 
+	  (File_formats::Undofile::Undo_page*)page;
+	undo->m_page_header.m_page_lsn_lo = lsn & 0xFFFFFFFF;
+	undo->m_page_header.m_page_lsn_hi = lsn >> 32;
+	undo->m_words_used= File_formats::UNDO_PAGE_WORDS - free;
+	
+	/**
+	 * Update free space with extra NOOP
+	 */
+	ndbrequire(ptr.p->m_free_file_words >= free);
+	ndbrequire(ptr.p->m_free_buffer_words >= free);
+	ptr.p->m_free_file_words -= free;
+	ptr.p->m_free_buffer_words -= free;
+	
+	validate_logfile_group(ptr, "force_log_flush");
+	
+	next_page(ptr.p, PRODUCER);
+	ptr.p->m_pos[PRODUCER].m_current_pos.m_idx = 0;
+	producer = ptr.p->m_pos[PRODUCER];
+	// break through
+      }
     }
     else
     {
       jam();
       ptr.p->m_state &= ~(Uint32)Logfile_group::LG_FLUSH_THREAD;
+      return;
     }
-    return;
   }
   
   bool full= false;
@@ -1339,7 +1400,7 @@
       else
       {
 	// Only 1 chunk
-	ndbassert(ptr.p->m_buffer_pages.getSize() == 2); 
+	ndbrequire(ptr.p->m_buffer_pages.getSize() == 2); 
 	Uint32 tmp= consumer.m_current_page.m_idx + 1;
 	cnt= write_log_pages(signal, ptr, page, tmp);
 	assert(cnt <= tmp);
@@ -1407,19 +1468,13 @@
   {
     signal->theData[0] = LgmanContinueB::FLUSH_LOG;
     signal->theData[1] = ptr.i;
-    sendSignal(reference(), GSN_CONTINUEB, signal, 2, JBB);
+    signal->theData[2] = 0;
+    sendSignal(reference(), GSN_CONTINUEB, signal, 3, JBB);
   }
   else
   {
     ptr.p->m_state &= ~(Uint32)Logfile_group::LG_FLUSH_THREAD;
   }
-  
-  if(tot > 0 && !ptr.p->m_log_buffer_waiters.isEmpty() &&
-     !(ptr.p->m_state & Logfile_group::LG_WAITERS_THREAD))
-  {
-    jam();
-    process_log_buffer_waiters(signal, ptr);
-  }
 }
 
 void
@@ -1457,7 +1512,7 @@
   }
   else
   {
-    ptr.p->m_state &= (Uint32)Logfile_group::LG_WAITERS_THREAD;
+    ptr.p->m_state &= ~(Uint32)Logfile_group::LG_WAITERS_THREAD;
   }
 }
 
@@ -1593,14 +1648,14 @@
   Ptr<Undofile> ptr;
   m_file_pool.getPtr(ptr, conf->userPointer);
 
-  ndbassert(ptr.p->m_state & Undofile::FS_OUTSTANDING);
+  ndbrequire(ptr.p->m_state & Undofile::FS_OUTSTANDING);
   ptr.p->m_state &= ~(Uint32)Undofile::FS_OUTSTANDING;
 
   Ptr<Logfile_group> lg_ptr;
   m_logfile_group_pool.getPtr(lg_ptr, ptr.p->m_logfile_group_ptr_i);
   
   Uint32 cnt= lg_ptr.p->m_outstanding_fs;
-  ndbassert(cnt);
+  ndbrequire(cnt);
   
   if(lg_ptr.p->m_next_reply_ptr_i == ptr.i)
   {
@@ -1611,7 +1666,7 @@
     {
       Uint32 state= ptr.p->m_state;
       Uint32 pages= ptr.p->m_online.m_outstanding;
-      ndbassert(pages);
+      ndbrequire(pages);
       ptr.p->m_online.m_outstanding= 0;
       ptr.p->m_state &= ~(Uint32)Undofile::FS_MOVE_NEXT;
       tot += pages;
@@ -1632,6 +1687,11 @@
     {
       process_log_sync_waiters(signal, lg_ptr);
     }
+
+    if(! (lg_ptr.p->m_state & Logfile_group::LG_WAITERS_THREAD))
+    {
+      process_log_buffer_waiters(signal, lg_ptr);
+    }
   }
   
   return;
@@ -1679,7 +1739,7 @@
       undo[2] |= File_formats::Undofile::UNDO_NEXT_LSN << 16;
       Uint32 *dst= get_log_buffer(ptr, sizeof(undo) >> 2);
       memcpy(dst, undo, sizeof(undo));
-      ndbassert(ptr.p->m_free_file_words >= (sizeof(undo) >> 2));
+      ndbrequire(ptr.p->m_free_file_words >= (sizeof(undo) >> 2));
       ptr.p->m_free_file_words -= (sizeof(undo) >> 2);
     }
     else
@@ -1688,7 +1748,7 @@
       * dst++ = last_lsn >> 32;
       * dst++ = last_lsn & 0xFFFFFFFF;
       memcpy(dst, undo, sizeof(undo));
-      ndbassert(ptr.p->m_free_file_words >= (sizeof(undo) >> 2));
+      ndbrequire(ptr.p->m_free_file_words >= (sizeof(undo) >> 2));
       ptr.p->m_free_file_words -= ((sizeof(undo) >> 2) + 2);
     }
     ptr.p->m_last_lcp_lsn = last_lsn;
@@ -1802,13 +1862,13 @@
 	Ptr<Undofile> next = filePtr;
 	LocalDLFifoList<Undofile> files(m_file_pool, ptr.p->m_files);
 	while(files.next(next) && (next.p->m_state & Undofile::FS_EMPTY))
-	  ndbassert(next.i != filePtr.i);
+	  ndbrequire(next.i != filePtr.i);
 	if(next.isNull())
 	{
 	  jam();
 	  files.first(next);
 	  while((next.p->m_state & Undofile::FS_EMPTY) && files.next(next))
-	    ndbassert(next.i != filePtr.i);
+	    ndbrequire(next.i != filePtr.i);
 	}
 	
 	tmp.m_idx= 0;
@@ -1862,7 +1922,7 @@
 int
 Lgman::alloc_log_space(Uint32 ref, Uint32 words)
 {
-  ndbassert(words);
+  ndbrequire(words);
   words += 2; // lsn
   Logfile_group key;
   key.m_logfile_group_id= ref;
@@ -1886,7 +1946,7 @@
 int
 Lgman::free_log_space(Uint32 ref, Uint32 words)
 {
-  ndbassert(words);
+  ndbrequire(words);
   Logfile_group key;
   key.m_logfile_group_id= ref;
   Ptr<Logfile_group> ptr;
@@ -1896,7 +1956,7 @@
     validate_logfile_group(ptr, "free_log_space");
     return 0;
   }
-  ndbassert(false);
+  ndbrequire(false);
   return -1;
 }
 
@@ -2031,7 +2091,7 @@
      * All files have read first page
      *   and m_files is sorted acording to lsn
      */
-    ndbassert(!ptr.p->m_files.isEmpty());
+    ndbrequire(!ptr.p->m_files.isEmpty());
     LocalDLFifoList<Undofile> read_files(m_file_pool, ptr.p->m_files);
     read_files.last(file_ptr);
     
@@ -2080,11 +2140,11 @@
   m_file_pool.getPtr(ptr, conf->userPointer);
   m_logfile_group_pool.getPtr(lg_ptr, ptr.p->m_logfile_group_ptr_i);
 
-  ndbassert(ptr.p->m_state & Undofile::FS_OUTSTANDING);
+  ndbrequire(ptr.p->m_state & Undofile::FS_OUTSTANDING);
   ptr.p->m_state &= ~(Uint32)Undofile::FS_OUTSTANDING;
   
   Uint32 cnt= lg_ptr.p->m_outstanding_fs;
-  ndbassert(cnt);
+  ndbrequire(cnt);
   
   if((ptr.p->m_state & Undofile::FS_EXECUTING)== Undofile::FS_EXECUTING)
   {
@@ -2098,7 +2158,7 @@
       {
 	Uint32 state= ptr.p->m_state;
 	Uint32 pages= ptr.p->m_online.m_outstanding;
-	ndbassert(pages);
+	ndbrequire(pages);
 	ptr.p->m_online.m_outstanding= 0;
 	ptr.p->m_state &= ~(Uint32)Undofile::FS_MOVE_NEXT;
 	tot += pages;
@@ -2205,7 +2265,7 @@
   Uint32 head= ptr.p->m_file_pos[HEAD].m_idx;
   Uint32 tail= ptr.p->m_file_pos[TAIL].m_idx;
 
-  ndbassert(head > tail);
+  ndbrequire(head > tail);
   Uint32 diff = head - tail;
   
   if(DEBUG_SEARCH_LOG_HEAD)
@@ -2442,23 +2502,23 @@
   {
     Uint32 max= 
       producer.m_current_pos.m_ptr_i - consumer.m_current_pos.m_ptr_i - 1;
-    ndbassert(free >= max * File_formats::UNDO_PAGE_WORDS);
+    ndbrequire(free >= max * File_formats::UNDO_PAGE_WORDS);
     cnt= read_undo_pages(signal, ptr, producer.m_current_pos.m_ptr_i, max);
-    ndbassert(cnt <= max);    
+    ndbrequire(cnt <= max);    
     producer.m_current_pos.m_ptr_i -= cnt;
     producer.m_current_page.m_idx -= cnt;
   } 
   else
   {
     Uint32 max= producer.m_current_page.m_idx;
-    ndbassert(free >= max * File_formats::UNDO_PAGE_WORDS);
+    ndbrequire(free >= max * File_formats::UNDO_PAGE_WORDS);
     cnt= read_undo_pages(signal, ptr, producer.m_current_pos.m_ptr_i, max);
-    ndbassert(cnt <= max);
+    ndbrequire(cnt <= max);
     producer.m_current_pos.m_ptr_i -= cnt;
     producer.m_current_page.m_idx -= cnt;
   } 
   
-  ndbassert(free >= cnt * File_formats::UNDO_PAGE_WORDS);
+  ndbrequire(free >= cnt * File_formats::UNDO_PAGE_WORDS);
   free -= (cnt * File_formats::UNDO_PAGE_WORDS);
   ptr.p->m_free_buffer_words = free;
   ptr.p->m_pos[PRODUCER] = producer;  
@@ -2476,7 +2536,7 @@
 Lgman::read_undo_pages(Signal* signal, Ptr<Logfile_group> ptr, 
 		       Uint32 pageId, Uint32 pages)
 {
-  ndbassert(pages);
+  ndbrequire(pages);
   Ptr<Undofile> filePtr;
   Buffer_idx tail= ptr.p->m_file_pos[TAIL];
   m_file_pool.getPtr(filePtr, tail.m_ptr_i);
@@ -2525,7 +2585,7 @@
   {
     jam();
 
-    ndbassert(tail.m_idx - max == 0);
+    ndbrequire(tail.m_idx - max == 0);
     req->varIndex = 1;
     req->numberOfPages = max;
     req->data.pageData[0] = pageId - max;
@@ -2651,7 +2711,7 @@
 
   Uint32 *record= pageP->m_data + pos - 1;
   Uint32 len= (* record) & 0xFFFF;
-  ndbassert(len);
+  ndbrequire(len);
   Uint32 *prev= record - len;
   Uint64 lsn = 0;
 
@@ -2663,7 +2723,7 @@
   }
   else
   {
-    ndbassert(pos >= 3);
+    ndbrequire(pos >= 3);
     lsn += * (prev - 1); lsn <<= 32;
     lsn += * (prev - 0);
     len += 2;
@@ -2671,14 +2731,14 @@
   }
   
 
-  ndbassert(pos >= len);
+  ndbrequire(pos >= len);
   
   if(pos == len)
   {
     /**
      * Switching page
      */
-    ndbassert(producer.m_current_pos.m_idx);
+    ndbrequire(producer.m_current_pos.m_idx);
     ptr.p->m_pos[PRODUCER].m_current_pos.m_idx --;
 
     if(consumer.m_current_page.m_idx)
@@ -2723,7 +2783,7 @@
     prev = pageP->m_data + pos - 1;
     
     if(((* prev) >> 16) & File_formats::Undofile::UNDO_NEXT_LSN)
-      ndbassert(lsn + 1 == ptr.p->m_last_read_lsn);
+      ndbrequire(lsn + 1 == ptr.p->m_last_read_lsn);
 
     ptr.p->m_pos[CONSUMER] = consumer;
     ptr.p->m_free_buffer_words += File_formats::UNDO_PAGE_WORDS;
@@ -2785,7 +2845,7 @@
       /**
        * Fix log TAIL
        */
-      ndbassert(ptr.p->m_state == 0);
+      ndbrequire(ptr.p->m_state == 0);
       ptr.p->m_state = Logfile_group::LG_ONLINE;
       Buffer_idx tail= ptr.p->m_file_pos[TAIL];
       Uint32 pages= ptr.p->m_pos[PRODUCER].m_current_pos.m_idx;
@@ -2796,7 +2856,7 @@
 	m_file_pool.getPtr(file, tail.m_ptr_i);
 	Uint32 page= tail.m_idx;
 	Uint32 size= file.p->m_file_size;
-	ndbassert(size >= page);
+	ndbrequire(size >= page);
 	Uint32 diff= size - page;
 	
 	if(pages >= diff)
@@ -2827,7 +2887,8 @@
       ptr.p->m_state |= Logfile_group::LG_FLUSH_THREAD;
       signal->theData[0] = LgmanContinueB::FLUSH_LOG;
       signal->theData[1] = ptr.i;
-      sendSignal(reference(), GSN_CONTINUEB, signal, 2, JBB);
+      signal->theData[2] = 0;
+      sendSignal(reference(), GSN_CONTINUEB, signal, 3, JBB);
 
       if(1)
       {
@@ -2914,7 +2975,7 @@
     undo[2] |= File_formats::Undofile::UNDO_NEXT_LSN << 16;
     Uint32 *dst= get_log_buffer(ptr, sizeof(undo) >> 2);
     memcpy(dst, undo, sizeof(undo));
-    ndbassert(ptr.p->m_free_file_words >= (sizeof(undo) >> 2));
+    ndbrequire(ptr.p->m_free_file_words >= (sizeof(undo) >> 2));
     ptr.p->m_free_file_words -= (sizeof(undo) >> 2);
   }
   else
@@ -2923,7 +2984,7 @@
     * dst++ = last_lsn >> 32;
     * dst++ = last_lsn & 0xFFFFFFFF;
     memcpy(dst, undo, sizeof(undo));
-    ndbassert(ptr.p->m_free_file_words >= ((sizeof(undo) >> 2) + 2));
+    ndbrequire(ptr.p->m_free_file_words >= ((sizeof(undo) >> 2) + 2));
     ptr.p->m_free_file_words -= ((sizeof(undo) >> 2) + 2);
   }
   m_last_lsn = ptr.p->m_last_lsn = last_lsn + 1;

--- 1.3/storage/ndb/src/kernel/blocks/lgman.hpp	2006-01-11 09:26:03 +01:00
+++ 1.4/storage/ndb/src/kernel/blocks/lgman.hpp	2006-02-03 10:53:38 +01:00
@@ -258,7 +258,7 @@
   void endlcp_callback(Signal*, Uint32, Uint32);
   void open_file(Signal*, Ptr<Undofile>, Uint32 requestInfo);
 
-  void flush_log(Signal*, Ptr<Logfile_group>);
+  void flush_log(Signal*, Ptr<Logfile_group>, Uint32 force);
   Uint32 write_log_pages(Signal*, Ptr<Logfile_group>, 
 			 Uint32 pageId, Uint32 pages);
 
Thread
bk commit into 5.1 tree (jonas:1.2130)jonas3 Feb