List:Commits« Previous MessageNext Message »
From:Pekka Nousiainen Date:November 16 2008 9:16am
Subject:bzr commit into mysql-5.1 branch (pekka:3086) WL#4391
View as plain text  
#At file:///export/space/pekka/ndb/version/my51-wl4391/

 3086 Pekka Nousiainen	2008-11-16
      wl#4391 26b_ddmx.diff
      DD - tsman, lgman client mutex
modified:
  storage/ndb/src/kernel/blocks/dbtup/DbtupAbort.cpp
  storage/ndb/src/kernel/blocks/dbtup/DbtupCommit.cpp
  storage/ndb/src/kernel/blocks/dbtup/DbtupDiskAlloc.cpp
  storage/ndb/src/kernel/blocks/dbtup/DbtupExecQuery.cpp
  storage/ndb/src/kernel/blocks/dbtup/DbtupMeta.cpp
  storage/ndb/src/kernel/blocks/dbtup/DbtupScan.cpp
  storage/ndb/src/kernel/blocks/lgman.cpp
  storage/ndb/src/kernel/blocks/lgman.hpp
  storage/ndb/src/kernel/blocks/pgman.cpp
  storage/ndb/src/kernel/blocks/tsman.cpp
  storage/ndb/src/kernel/blocks/tsman.hpp
  storage/ndb/src/kernel/vm/SimulatedBlock.hpp

=== modified file 'storage/ndb/src/kernel/blocks/dbtup/DbtupAbort.cpp'
--- a/storage/ndb/src/kernel/blocks/dbtup/DbtupAbort.cpp	2008-09-30 15:11:30 +0000
+++ b/storage/ndb/src/kernel/blocks/dbtup/DbtupAbort.cpp	2008-11-16 09:16:35 +0000
@@ -184,6 +184,7 @@ void Dbtup::do_tup_abortreq(Signal* sign
     if (regOperPtr.p->m_undo_buffer_space)
     {
       jam();
+      D("Logfile_client - do_tup_abortreq");
       Logfile_client lgman(this, c_lgman, regFragPtr.p->m_logfile_group_id);
       lgman.free_log_space(regOperPtr.p->m_undo_buffer_space);
     }
@@ -350,6 +351,7 @@ void Dbtup::tupkeyErrorLab(Signal* signa
       (regOperPtr->is_first_operation() && regOperPtr->is_last_operation()))
   {
     jam();
+    D("Logfile_client - tupkeyErrorLab");
     Logfile_client lgman(this, c_lgman, fragPtr.p->m_logfile_group_id);
     lgman.free_log_space(regOperPtr->m_undo_buffer_space);
   }

=== modified file 'storage/ndb/src/kernel/blocks/dbtup/DbtupCommit.cpp'
--- a/storage/ndb/src/kernel/blocks/dbtup/DbtupCommit.cpp	2008-10-03 08:22:13 +0000
+++ b/storage/ndb/src/kernel/blocks/dbtup/DbtupCommit.cpp	2008-11-16 09:16:35 +0000
@@ -595,6 +595,7 @@ void Dbtup::execTUP_COMMITREQ(Signal* si
         disk_page_abort_prealloc(signal, regFragPtr.p, 
 				 &req.m_page, req.m_page.m_page_idx);
         
+        D("Logfile_client - execTUP_COMMITREQ");
         Logfile_client lgman(this, c_lgman, regFragPtr.p->m_logfile_group_id);
         lgman.free_log_space(regOperPtr.p->m_undo_buffer_space);
 	goto skip_disk;
@@ -668,6 +669,7 @@ void Dbtup::execTUP_COMMITREQ(Signal* si
       safe_cast(&Dbtup::disk_page_log_buffer_callback);
     Uint32 sz= regOperPtr.p->m_undo_buffer_space;
     
+    D("Logfile_client - execTUP_COMMITREQ");
     Logfile_client lgman(this, c_lgman, regFragPtr.p->m_logfile_group_id);
     int res= lgman.get_log_buffer(signal, sz, &cb);
     jamEntry();

=== modified file 'storage/ndb/src/kernel/blocks/dbtup/DbtupDiskAlloc.cpp'
--- a/storage/ndb/src/kernel/blocks/dbtup/DbtupDiskAlloc.cpp	2008-09-30 15:10:29 +0000
+++ b/storage/ndb/src/kernel/blocks/dbtup/DbtupDiskAlloc.cpp	2008-11-16 09:16:35 +0000
@@ -314,7 +314,8 @@ Dbtup::restart_setup_page(Disk_alloc_inf
     page.m_file_no = pagePtr.p->m_file_no;
     page.m_page_no = pagePtr.p->m_page_no;
 
-    Tablespace_client tsman(0, c_tsman,
+    D("Tablespace_client - restart_setup_page");
+    Tablespace_client tsman(0, this, c_tsman,
 			    0, 0, 0);
     unsigned uncommitted, committed;
     uncommitted = committed = ~(unsigned)0;
@@ -349,7 +350,8 @@ Dbtup::disk_page_prealloc(Signal* signal
   Fragrecord* fragPtrP = fragPtr.p; 
   Disk_alloc_info& alloc= fragPtrP->m_disk_alloc_info;
   Uint32 idx= alloc.calc_page_free_bits(sz);
-  Tablespace_client tsman(signal, c_tsman,
+  D("Tablespace_client - disk_page_prealloc");
+  Tablespace_client tsman(signal, this, c_tsman,
 			  fragPtrP->fragTableId,
 			  fragPtrP->fragmentId,
 			  fragPtrP->m_tablespace_id);
@@ -912,7 +914,8 @@ Dbtup::disk_page_set_dirty(PagePtr pageP
   
   ddassert(free >= used);
   
-  Tablespace_client tsman(0, c_tsman,
+  D("Tablespace_client - disk_page_set_dirty");
+  Tablespace_client tsman(0, this, c_tsman,
 			  fragPtr.p->fragTableId,
 			  fragPtr.p->fragmentId,
 			  fragPtr.p->m_tablespace_id);
@@ -996,7 +999,8 @@ Dbtup::disk_page_unmap_callback(Uint32 w
       ddassert(free >= used);
       ddassert(alloc.calc_page_free_bits(free - used) == idx);
       
-      Tablespace_client tsman(0, c_tsman,
+      D("Tablespace_client - disk_page_unmap_callback");
+      Tablespace_client tsman(0, this, c_tsman,
 			      fragPtr.p->fragTableId,
 			      fragPtr.p->fragmentId,
 			      fragPtr.p->m_tablespace_id);
@@ -1027,7 +1031,8 @@ Dbtup::disk_page_unmap_callback(Uint32 w
     LocalDLList<Page> list(*pool, alloc.m_unmap_pages);
     list.remove(pagePtr);
 
-    Tablespace_client tsman(0, c_tsman,
+    D("Tablespace_client - disk_page_unmap_callback");
+    Tablespace_client tsman(0, this, c_tsman,
 			    fragPtr.p->fragTableId,
 			    fragPtr.p->fragmentId,
 			    fragPtr.p->m_tablespace_id);
@@ -1313,6 +1318,7 @@ Dbtup::disk_page_undo_alloc(Page* page, 
 			    Uint32 sz, Uint32 gci, Uint32 logfile_group_id)
 {
   jam();
+  D("Logfile_client - disk_page_undo_alloc");
   Logfile_client lgman(this, c_lgman, logfile_group_id);
   
   Disk_undo::Alloc alloc;
@@ -1336,6 +1342,7 @@ Dbtup::disk_page_undo_update(Page* page,
 			     Uint32 gci, Uint32 logfile_group_id)
 {
   jam();
+  D("Logfile_client - disk_page_undo_update");
   Logfile_client lgman(this, c_lgman, logfile_group_id);
 
   Disk_undo::Update update;
@@ -1368,6 +1375,7 @@ Dbtup::disk_page_undo_free(Page* page, c
 			   Uint32 gci, Uint32 logfile_group_id)
 {
   jam();
+  D("Logfile_client - disk_page_undo_free");
   Logfile_client lgman(this, c_lgman, logfile_group_id);
 
   Disk_undo::Free free;
@@ -1818,7 +1826,8 @@ Dbtup::disk_restart_undo_page_bits(Signa
   Uint32 new_bits = alloc.calc_page_free_bits(free);
   pageP->list_index = 0x8000 | new_bits;
 
-  Tablespace_client tsman(signal, c_tsman,
+  D("Tablespace_client - disk_restart_undo_page_bits");
+  Tablespace_client tsman(signal, this, c_tsman,
 			  fragPtrP->fragTableId,
 			  fragPtrP->fragmentId,
 			  fragPtrP->m_tablespace_id);

=== modified file 'storage/ndb/src/kernel/blocks/dbtup/DbtupExecQuery.cpp'
--- a/storage/ndb/src/kernel/blocks/dbtup/DbtupExecQuery.cpp	2008-10-01 07:57:51 +0000
+++ b/storage/ndb/src/kernel/blocks/dbtup/DbtupExecQuery.cpp	2008-11-16 09:16:35 +0000
@@ -980,6 +980,7 @@ int Dbtup::handleUpdateReq(Signal* signa
       Uint32 sz= operPtrP->m_undo_buffer_space= 
 	(sizeof(Dbtup::Disk_undo::Update) >> 2) + sizes[DD] - 1;
       
+      D("Logfile_client - handleUpdateReq");
       Logfile_client lgman(this, c_lgman, regFragPtr->m_logfile_group_id);
       terrorCode= lgman.alloc_log_space(sz);
       if(unlikely(terrorCode))
@@ -1378,6 +1379,7 @@ int Dbtup::handleInsertReq(Signal* signa
       goto log_space_error;
     }
 
+    D("Logfile_client - handleInsertReq");
     Logfile_client lgman(this, c_lgman, regFragPtr->m_logfile_group_id);
     res= lgman.alloc_log_space(regOperPtr.p->m_undo_buffer_space);
     if(unlikely(res))
@@ -1678,6 +1680,7 @@ int Dbtup::handleDeleteReq(Signal* signa
       (sizeof(Dbtup::Disk_undo::Free) >> 2) + 
       regTabPtr->m_offsets[DD].m_fix_header_size - 1;
     
+    D("Logfile_client - handleDeleteReq");
     Logfile_client lgman(this, c_lgman, regFragPtr->m_logfile_group_id);
     terrorCode= lgman.alloc_log_space(sz);
     if(unlikely(terrorCode))
@@ -3677,6 +3680,7 @@ Dbtup::nr_delete(Signal* signal, Uint32 
     Uint32 sz = (sizeof(Dbtup::Disk_undo::Free) >> 2) + 
       tablePtr.p->m_offsets[DD].m_fix_header_size - 1;
     
+    D("Logfile_client - nr_delete");
     Logfile_client lgman(this, c_lgman, fragPtr.p->m_logfile_group_id);
     int res = lgman.alloc_log_space(sz);
     ndbrequire(res == 0);
@@ -3791,6 +3795,7 @@ Dbtup::nr_delete_page_callback(Signal* s
   cb.m_callbackData = userpointer;
   cb.m_callbackFunction =
     safe_cast(&Dbtup::nr_delete_log_buffer_callback);      
+  D("Logfile_client - nr_delete_page_callback");
   Logfile_client lgman(this, c_lgman, fragPtr.p->m_logfile_group_id);
   int res= lgman.get_log_buffer(signal, sz, &cb);
   switch(res){

=== modified file 'storage/ndb/src/kernel/blocks/dbtup/DbtupMeta.cpp'
--- a/storage/ndb/src/kernel/blocks/dbtup/DbtupMeta.cpp	2008-10-10 09:32:12 +0000
+++ b/storage/ndb/src/kernel/blocks/dbtup/DbtupMeta.cpp	2008-11-16 09:16:35 +0000
@@ -345,6 +345,7 @@ void Dbtup::execTUP_ADD_ATTRREQ(Signal* 
 	safe_cast(&Dbtup::undo_createtable_callback);
       Uint32 sz= sizeof(Disk_undo::Create) >> 2;
       
+      D("Logfile_client - execTUP_ADD_ATTRREQ");
       Logfile_client lgman(this, c_lgman, regFragPtr.p->m_logfile_group_id);
       if((terrorCode = lgman.alloc_log_space(sz)))
       {
@@ -511,7 +512,8 @@ void Dbtup::execTUPFRAGREQ(Signal* signa
   bzero(&rep,sizeof(rep));
   if(regTabPtr.p->m_no_of_disk_attributes)
   {
-    Tablespace_client tsman(0, c_tsman, 0, 0,
+    D("Tablespace_client - execTUPFRAGREQ");
+    Tablespace_client tsman(0, this, c_tsman, 0, 0,
                             regFragPtr.p->m_tablespace_id);
     ndbrequire(tsman.get_tablespace_info(&rep) == 0);
     regFragPtr.p->m_logfile_group_id= rep.tablespace.logfile_group_id;
@@ -1306,6 +1308,7 @@ Dbtup::undo_createtable_callback(Signal*
   getFragmentrec(regFragPtr, fragOperPtr.p->fragidFrag, regTabPtr.p);
   ndbrequire(regFragPtr.i != RNIL);
   
+  D("Logfile_client - undo_createtable_callback");
   Logfile_client lgman(this, c_lgman, regFragPtr.p->m_logfile_group_id);
 
   Disk_undo::Create create;
@@ -1616,6 +1619,7 @@ void Dbtup::releaseFragment(Signal* sign
     cb.m_callbackFunction = 
       safe_cast(&Dbtup::drop_table_log_buffer_callback);
     Uint32 sz= sizeof(Disk_undo::Drop) >> 2;
+    D("Logfile_client - releaseFragment");
     Logfile_client lgman(this, c_lgman, logfile_group_id);
     int r0 = lgman.alloc_log_space(sz);
     if (r0)
@@ -1824,6 +1828,7 @@ Dbtup::drop_table_log_buffer_callback(Si
   drop.m_table = tabPtr.i;
   drop.m_type_length = 
     (Disk_undo::UNDO_DROP << 16) | (sizeof(drop) >> 2);
+  D("Logfile_client - drop_table_log_buffer_callback");
   Logfile_client lgman(this, c_lgman, logfile_group_id);
   
   Logfile_client::Change c[1] = {{ &drop, sizeof(drop) >> 2 } };
@@ -1909,7 +1914,8 @@ Dbtup::drop_fragment_free_extent_log_buf
       Uint64 lsn = 0;
 #endif
       
-      Tablespace_client tsman(signal, c_tsman, tabPtr.i, 
+      D("Tablespace_client - drop_fragment_free_extent_log_buffer_callback");
+      Tablespace_client tsman(signal, this, c_tsman, tabPtr.i, 
 			      fragPtr.p->fragmentId,
 			      fragPtr.p->m_tablespace_id);
       

=== modified file 'storage/ndb/src/kernel/blocks/dbtup/DbtupScan.cpp'
--- a/storage/ndb/src/kernel/blocks/dbtup/DbtupScan.cpp	2008-09-30 15:10:29 +0000
+++ b/storage/ndb/src/kernel/blocks/dbtup/DbtupScan.cpp	2008-11-16 09:16:35 +0000
@@ -827,7 +827,8 @@ Dbtup::scanNext(Signal* signal, ScanOpPt
         // check if page is un-allocated or empty
 	if (likely(! (bits & ScanOp::SCAN_NR)))
 	{
-	  Tablespace_client tsman(signal, c_tsman,
+          D("Tablespace_client - scanNext");
+	  Tablespace_client tsman(signal, this, c_tsman,
 				  frag.fragTableId, 
 				  frag.fragmentId, 
 				  frag.m_tablespace_id);

=== modified file 'storage/ndb/src/kernel/blocks/lgman.cpp'
--- a/storage/ndb/src/kernel/blocks/lgman.cpp	2008-10-08 14:14:43 +0000
+++ b/storage/ndb/src/kernel/blocks/lgman.cpp	2008-11-16 09:16:35 +0000
@@ -54,7 +54,8 @@ extern EventLogger * g_eventLogger;
 Lgman::Lgman(Block_context & ctx) :
   SimulatedBlock(LGMAN, ctx),
   m_logfile_group_list(m_logfile_group_pool),
-  m_logfile_group_hash(m_logfile_group_pool)
+  m_logfile_group_hash(m_logfile_group_pool),
+  m_client_mutex(2, true)
 {
   BLOCK_CONSTRUCTOR(Lgman);
   
@@ -95,10 +96,40 @@ Lgman::Lgman(Block_context & ctx) :
 
   m_last_lsn = 1;
   m_logfile_group_hash.setSize(10);
+
+  if (isNdbMtLqh()) {
+    jam();
+    int ret = m_client_mutex.create();
+    ndbrequire(ret == 0);
+  }
 }
   
 Lgman::~Lgman()
 {
+  if (isNdbMtLqh()) {
+    (void)m_client_mutex.destroy();
+  }
+}
+
+void
+Lgman::client_lock(BlockNumber block, int line)
+{
+  if (isNdbMtLqh()) {
+    D("try lock" << hex << V(block) << dec << V(line));
+    int ret = m_client_mutex.lock();
+    ndbrequire(ret == 0);
+    D("got lock" << hex << V(block) << dec << V(line));
+  }
+}
+
+void
+Lgman::client_unlock(BlockNumber block, int line)
+{
+  if (isNdbMtLqh()) {
+    D("unlock" << hex << V(block) << dec << V(line));
+    int ret = m_client_mutex.unlock();
+    ndbrequire(ret == 0);
+  }
 }
 
 BLOCK_FUNCTIONS(Lgman)
@@ -160,6 +191,7 @@ Lgman::execCONTINUEB(Signal* signal){
 
   Uint32 type= signal->theData[0];
   Uint32 ptrI = signal->theData[1];
+  client_lock(number(), __LINE__);
   switch(type){
   case LgmanContinueB::FILTER_LOG:
     jam();
@@ -170,7 +202,7 @@ Lgman::execCONTINUEB(Signal* signal){
     Ptr<Logfile_group> ptr;
     m_logfile_group_pool.getPtr(ptr, ptrI);
     cut_log_tail(signal, ptr);
-    return;
+    break;
   }
   case LgmanContinueB::FLUSH_LOG:
   {
@@ -178,7 +210,7 @@ Lgman::execCONTINUEB(Signal* signal){
     Ptr<Logfile_group> ptr;
     m_logfile_group_pool.getPtr(ptr, ptrI);
     flush_log(signal, ptr, signal->theData[2]);
-    return;
+    break;
   }
   case LgmanContinueB::PROCESS_LOG_BUFFER_WAITERS:
   {
@@ -186,7 +218,7 @@ Lgman::execCONTINUEB(Signal* signal){
     Ptr<Logfile_group> ptr;
     m_logfile_group_pool.getPtr(ptr, ptrI);
     process_log_buffer_waiters(signal, ptr);
-    return;
+    break;
   }
   case LgmanContinueB::FIND_LOG_HEAD:
     jam();
@@ -200,22 +232,22 @@ Lgman::execCONTINUEB(Signal* signal){
     {
       init_run_undo_log(signal);
     }
-    return;
+    break;
   case LgmanContinueB::EXECUTE_UNDO_RECORD:
     jam();
     execute_undo_record(signal);
-    return;
+    break;
   case LgmanContinueB::STOP_UNDO_LOG:
     jam();
     stop_run_undo_log(signal);
-    return;
+    break;
   case LgmanContinueB::READ_UNDO_LOG:
   {
     jam();
     Ptr<Logfile_group> ptr;
     m_logfile_group_pool.getPtr(ptr, ptrI);
     read_undo_log(signal, ptr);
-    return;
+    break;
   }
   case LgmanContinueB::PROCESS_LOG_SYNC_WAITERS:
   {
@@ -223,7 +255,7 @@ Lgman::execCONTINUEB(Signal* signal){
     Ptr<Logfile_group> ptr;
     m_logfile_group_pool.getPtr(ptr, ptrI);
     process_log_sync_waiters(signal, ptr);
-    return;
+    break;
   }
   case LgmanContinueB::FORCE_LOG_SYNC:
   {
@@ -231,7 +263,7 @@ Lgman::execCONTINUEB(Signal* signal){
     Ptr<Logfile_group> ptr;
     m_logfile_group_pool.getPtr(ptr, ptrI);
     force_log_sync(signal, ptr, signal->theData[2], signal->theData[3]);
-    return;
+    break;
   }
   case LgmanContinueB::DROP_FILEGROUP:
   {
@@ -243,14 +275,15 @@ Lgman::execCONTINUEB(Signal* signal){
       jam();
       sendSignalWithDelay(reference(), GSN_CONTINUEB, signal, 100, 
 			  signal->length());
-      return;
+      break;
     }
     Uint32 ref = signal->theData[2];
     Uint32 data = signal->theData[3];
     drop_filegroup_drop_files(signal, ptr, ref, data);
-    return;
+    break;
   }
   }
+  client_unlock(number(), __LINE__);
 }
 
 void
@@ -1071,11 +1104,25 @@ Lgman::Undofile::Undofile(const struct C
 }
 
 Logfile_client::Logfile_client(SimulatedBlock* block, 
-			       Lgman* lgman, Uint32 logfile_group_id)
+			       Lgman* lgman, Uint32 logfile_group_id,
+                               bool lock)
 {
-  m_block= numberToBlock(block->number(), block->instance());
+  Uint32 bno = block->number();
+  Uint32 ino = block->instance();
+  m_block= numberToBlock(bno, ino);
   m_lgman= lgman;
+  m_lock = lock;
   m_logfile_group_id= logfile_group_id;
+  D("client ctor" << hex << V(m_block));
+  if (m_lock)
+    m_lgman->client_lock(m_block, 0);
+}
+
+Logfile_client::~Logfile_client()
+{
+  D("client dtor" << hex << V(m_block));
+  if (m_lock)
+    m_lgman->client_unlock(m_block, 0);
 }
 
 int
@@ -1717,6 +1764,7 @@ void
 Lgman::execFSWRITECONF(Signal* signal)
 {
   jamEntry();
+  client_lock(number(), __LINE__);
   FsConf * conf = (FsConf*)signal->getDataPtr();
   Ptr<Undofile> ptr;
   m_file_pool.getPtr(ptr, conf->userPointer);
@@ -1773,6 +1821,7 @@ Lgman::execFSWRITECONF(Signal* signal)
   {
     ndbout_c("miss matched writes");
   }
+  client_unlock(number(), __LINE__);
   
   return;
 }
@@ -1882,6 +1931,7 @@ Lgman::execEND_LCP_REQ(Signal* signal)
       wait= true;
       if(signal->getSendersBlockRef() != reference())
       {
+        D("Logfile_client - execEND_LCP_REQ");
 	Logfile_client tmp(this, this, ptr.p->m_logfile_group_id);
 	Logfile_client::Request req;
 	req.m_callback.m_callbackData = ptr.i;
@@ -2216,6 +2266,7 @@ void
 Lgman::execFSREADCONF(Signal* signal)
 {
   jamEntry();
+  client_lock(number(), __LINE__);
 
   Ptr<Undofile> ptr;  
   Ptr<Logfile_group> lg_ptr;
@@ -2256,6 +2307,7 @@ Lgman::execFSREADCONF(Signal* signal)
       lg_ptr.p->m_pos[PRODUCER].m_current_pos.m_idx += tot;
       lg_ptr.p->m_next_reply_ptr_i = ptr.i;
     }
+    client_unlock(number(), __LINE__);
     return;
   }
   
@@ -2279,6 +2331,7 @@ Lgman::execFSREADCONF(Signal* signal)
   case Undofile::FS_SEARCHING:
     jam();
     find_log_head_in_file(signal, lg_ptr, ptr, lsn);
+    client_unlock(number(), __LINE__);
     return;
   default:
   case Undofile::FS_EXECUTING:
@@ -2327,6 +2380,7 @@ Lgman::execFSREADCONF(Signal* signal)
     }
   }
   find_log_head(signal, lg_ptr);
+  client_unlock(number(), __LINE__);
 }
   
 void

=== modified file 'storage/ndb/src/kernel/blocks/lgman.hpp'
--- a/storage/ndb/src/kernel/blocks/lgman.hpp	2008-09-30 15:11:30 +0000
+++ b/storage/ndb/src/kernel/blocks/lgman.hpp	2008-11-16 09:16:35 +0000
@@ -29,6 +29,7 @@
 
 #include <WOPool.hpp>
 #include <SLFifoList.hpp>
+#include <SafeMutex.hpp>
 
 class Lgman : public SimulatedBlock
 {
@@ -259,6 +260,10 @@ private:
   Logfile_group_list m_logfile_group_list;
   Logfile_group_hash m_logfile_group_hash;
 
+  SafeMutex m_client_mutex;
+  void client_lock(BlockNumber block, int line);
+  void client_unlock(BlockNumber block, int line);
+
   bool alloc_logbuffer_memory(Ptr<Logfile_group>, Uint32 pages);
   void init_logbuffer_pointers(Ptr<Logfile_group>);
   void free_logbuffer_memory(Ptr<Logfile_group>);
@@ -308,12 +313,14 @@ private:
 class Logfile_client {
   Uint32 m_block; // includes instance
   Lgman * m_lgman;
+  bool m_lock;
   DEBUG_OUT_DEFINES(LGMAN);
 public:
   Uint32 m_logfile_group_id;
 
-  Logfile_client() {}
-  Logfile_client(SimulatedBlock* block, Lgman*, Uint32 logfile_group_id);
+  Logfile_client(SimulatedBlock* block, Lgman*, Uint32 logfile_group_id,
+                 bool lock = true);
+  ~Logfile_client();
 
   struct Request
   {

=== modified file 'storage/ndb/src/kernel/blocks/pgman.cpp'
--- a/storage/ndb/src/kernel/blocks/pgman.cpp	2008-11-15 16:01:37 +0000
+++ b/storage/ndb/src/kernel/blocks/pgman.cpp	2008-11-16 09:16:35 +0000
@@ -1390,6 +1390,7 @@ Pgman::pageout(Signal* signal, Ptr<Page_
   Logfile_client::Request req;
   req.m_callback.m_callbackData = ptr.i;
   req.m_callback.m_callbackFunction = safe_cast(&Pgman::logsync_callback);
+  D("Logfile_client - pageout");
   Logfile_client lgman(this, c_lgman, RNIL);
   int ret = lgman.sync_lsn(signal, ptr.p->m_lsn, &req, 0);
   if (ret > 0)

=== modified file 'storage/ndb/src/kernel/blocks/tsman.cpp'
--- a/storage/ndb/src/kernel/blocks/tsman.cpp	2008-11-15 16:01:37 +0000
+++ b/storage/ndb/src/kernel/blocks/tsman.cpp	2008-11-16 09:16:35 +0000
@@ -43,7 +43,8 @@ Tsman::Tsman(Block_context& ctx) :
   m_tablespace_list(m_tablespace_pool),
   m_tablespace_hash(m_tablespace_pool),
   m_pgman(0),
-  m_lgman(0)
+  m_lgman(0),
+  m_client_mutex(2, true)
 {
   BLOCK_CONSTRUCTOR(Tsman);
 
@@ -85,10 +86,40 @@ Tsman::Tsman(Block_context& ctx) :
   m_tablespace_hash.setSize(10);
   m_file_hash.setSize(10);
   m_lcp_ongoing = false;
+
+  if (isNdbMtLqh()) {
+    jam();
+    int ret = m_client_mutex.create();
+    ndbrequire(ret == 0);
+  }
 }
   
 Tsman::~Tsman()
 {
+  if (isNdbMtLqh()) {
+    (void)m_client_mutex.destroy();
+  }
+}
+
+void
+Tsman::client_lock(BlockNumber block, int line)
+{
+  if (isNdbMtLqh()) {
+    D("try lock" << hex << V(block) << dec << V(line));
+    int ret = m_client_mutex.lock();
+    ndbrequire(ret == 0);
+    D("got lock" << hex << V(block) << dec << V(line));
+  }
+}
+
+void
+Tsman::client_unlock(BlockNumber block, int line)
+{
+  if (isNdbMtLqh()) {
+    D("unlock" << hex << V(block) << dec << V(line));
+    int ret = m_client_mutex.unlock();
+    ndbrequire(ret == 0);
+  }
 }
 
 BLOCK_FUNCTIONS(Tsman)
@@ -2181,7 +2212,9 @@ Tablespace_client::get_tablespace_info(C
   if(m_tsman->m_tablespace_hash.find(ts_ptr, m_tablespace_id))
   {
     Uint32 logfile_group_id = ts_ptr.p->m_logfile_group_id;
-    Logfile_client lgman(m_tsman, m_tsman->m_lgman, logfile_group_id);
+    // ctor is used here only for logging
+    D("Logfile_client - get_tablespace_info");
+    Logfile_client lgman(m_tsman, m_tsman->m_lgman, logfile_group_id, false);
     rep->tablespace.extent_size = ts_ptr.p->m_extent_size;
     rep->tablespace.logfile_group_id = lgman.m_logfile_group_id;
     return 0;

=== modified file 'storage/ndb/src/kernel/blocks/tsman.hpp'
--- a/storage/ndb/src/kernel/blocks/tsman.hpp	2008-11-15 16:00:08 +0000
+++ b/storage/ndb/src/kernel/blocks/tsman.hpp	2008-11-16 09:16:35 +0000
@@ -22,6 +22,7 @@
 #include <DLList.hpp>
 #include <NodeBitmask.hpp>
 #include <signaldata/GetTabInfo.hpp>
+#include <SafeMutex.hpp>
 
 #include "lgman.hpp"
 #include "pgman.hpp"
@@ -204,6 +205,10 @@ private:
   Tablespace_hash m_tablespace_hash;
   SimulatedBlock * m_pgman;
   Lgman * m_lgman;
+
+  SafeMutex m_client_mutex;
+  void client_lock(BlockNumber block, int line);
+  void client_unlock(BlockNumber block, int line);
   
   int open_file(Signal*, Ptr<Tablespace>, Ptr<Datafile>, CreateFileImplReq*,
 		SectionHandle* handle);
@@ -270,24 +275,42 @@ Tsman::calc_page_no_in_extent(Uint32 pag
 class Tablespace_client
 {
 public:
+  Uint32 m_block;
   Tsman * m_tsman;
   Signal* m_signal;
   Uint32 m_table_id;
   Uint32 m_fragment_id;
   Uint32 m_tablespace_id;
+  bool m_lock;
   DEBUG_OUT_DEFINES(TSMAN);
 
 public:
-  Tablespace_client(Signal* signal, Tsman* tsman, 
-		    Uint32 table, Uint32 fragment, Uint32 tablespaceId) {
+  Tablespace_client(Signal* signal, SimulatedBlock* block, Tsman* tsman, 
+		    Uint32 table, Uint32 fragment, Uint32 tablespaceId,
+                    bool lock = true) {
+    Uint32 bno = block->number();
+    Uint32 ino = block->instance();
+    m_block= numberToBlock(bno, ino);
     m_tsman= tsman;
     m_signal= signal;
     m_table_id= table;
     m_fragment_id= fragment;
     m_tablespace_id= tablespaceId;
+    m_lock = lock;
+
+    D("client ctor" << hex << V(m_block) << dec
+      << V(m_table_id) << V(m_fragment_id) << V(m_tablespace_id));
+    if (m_lock)
+      m_tsman->client_lock(m_block, 0);
   }
 
-  Tablespace_client(Signal* signal, Tsman* tsman, Local_key* key);
+  Tablespace_client(Signal* signal, Tsman* tsman, Local_key* key);//undef
+
+  ~Tablespace_client() {
+    D("client dtor" << hex << V(m_block));
+    if (m_lock)
+      m_tsman->client_unlock(m_block, 0);
+  }
   
   /**
    * Return >0 if success, no of pages in extent, sets key

=== modified file 'storage/ndb/src/kernel/vm/SimulatedBlock.hpp'
--- a/storage/ndb/src/kernel/vm/SimulatedBlock.hpp	2008-11-15 15:43:59 +0000
+++ b/storage/ndb/src/kernel/vm/SimulatedBlock.hpp	2008-11-16 09:16:35 +0000
@@ -110,6 +110,7 @@ class SimulatedBlock {
   friend class Page_cache_client;
   friend class Lgman;
   friend class Logfile_client;
+  friend class Tablespace_client;
   friend struct Pool_context;
   friend struct SectionHandle;
   friend class LockQueue;

Thread
bzr commit into mysql-5.1 branch (pekka:3086) WL#4391Pekka Nousiainen16 Nov