List:Commits« Previous MessageNext Message »
From:Pekka Nousiainen Date:November 16 2008 9:22am
Subject:bzr push into mysql-5.1 branch (pekka:3084 to 3086) WL#4391
View as plain text  
 3086 Pekka Nousiainen	2008-11-16
      wl#4391 26b_ddmx.diff
      DD - tsman, lgman client mutex
modified:
  storage/ndb/src/kernel/blocks/dbtup/DbtupAbort.cpp
  storage/ndb/src/kernel/blocks/dbtup/DbtupCommit.cpp
  storage/ndb/src/kernel/blocks/dbtup/DbtupDiskAlloc.cpp
  storage/ndb/src/kernel/blocks/dbtup/DbtupExecQuery.cpp
  storage/ndb/src/kernel/blocks/dbtup/DbtupMeta.cpp
  storage/ndb/src/kernel/blocks/dbtup/DbtupScan.cpp
  storage/ndb/src/kernel/blocks/lgman.cpp
  storage/ndb/src/kernel/blocks/lgman.hpp
  storage/ndb/src/kernel/blocks/pgman.cpp
  storage/ndb/src/kernel/blocks/tsman.cpp
  storage/ndb/src/kernel/blocks/tsman.hpp
  storage/ndb/src/kernel/vm/SimulatedBlock.hpp

 3085 Pekka Nousiainen	2008-11-16
      wl#4391 26a_ddmx.diff
      DD - separate mutex implementation
added:
  storage/ndb/src/kernel/vm/SafeMutex.cpp
  storage/ndb/src/kernel/vm/SafeMutex.hpp
modified:
  storage/ndb/src/kernel/blocks/CMakeLists.txt
  storage/ndb/src/kernel/vm/CMakeLists.txt
  storage/ndb/src/kernel/vm/Makefile.am

 3084 Pekka Nousiainen	2008-11-15
      wl#4391 25e_pgman.diff
      DD - mt pgman: data file mt-safety
added:
  storage/ndb/include/kernel/signaldata/DataFileOrd.hpp
modified:
  storage/ndb/include/kernel/GlobalSignalNumbers.h
  storage/ndb/src/common/debugger/signaldata/SignalNames.cpp
  storage/ndb/src/kernel/blocks/PgmanProxy.cpp
  storage/ndb/src/kernel/blocks/PgmanProxy.hpp
  storage/ndb/src/kernel/blocks/pgman.cpp
  storage/ndb/src/kernel/blocks/pgman.hpp
  storage/ndb/src/kernel/blocks/tsman.cpp

=== modified file 'storage/ndb/src/kernel/blocks/CMakeLists.txt'
--- a/storage/ndb/src/kernel/blocks/CMakeLists.txt	2008-10-10 07:05:48 +0000
+++ b/storage/ndb/src/kernel/blocks/CMakeLists.txt	2008-11-16 09:15:35 +0000
@@ -41,6 +41,7 @@ ADD_LIBRARY(ndbblocks  STATIC
             restore.cpp
             tsman.cpp
 	    LocalProxy.cpp
+	    PgmanProxy.cpp
            )
 
 ADD_EXECUTABLE(ndb_print_file

=== modified file 'storage/ndb/src/kernel/blocks/dbtup/DbtupAbort.cpp'
--- a/storage/ndb/src/kernel/blocks/dbtup/DbtupAbort.cpp	2008-09-30 15:11:30 +0000
+++ b/storage/ndb/src/kernel/blocks/dbtup/DbtupAbort.cpp	2008-11-16 09:16:35 +0000
@@ -184,6 +184,7 @@ void Dbtup::do_tup_abortreq(Signal* sign
     if (regOperPtr.p->m_undo_buffer_space)
     {
       jam();
+      D("Logfile_client - do_tup_abortreq");
       Logfile_client lgman(this, c_lgman, regFragPtr.p->m_logfile_group_id);
       lgman.free_log_space(regOperPtr.p->m_undo_buffer_space);
     }
@@ -350,6 +351,7 @@ void Dbtup::tupkeyErrorLab(Signal* signa
       (regOperPtr->is_first_operation() && regOperPtr->is_last_operation()))
   {
     jam();
+    D("Logfile_client - tupkeyErrorLab");
     Logfile_client lgman(this, c_lgman, fragPtr.p->m_logfile_group_id);
     lgman.free_log_space(regOperPtr->m_undo_buffer_space);
   }

=== modified file 'storage/ndb/src/kernel/blocks/dbtup/DbtupCommit.cpp'
--- a/storage/ndb/src/kernel/blocks/dbtup/DbtupCommit.cpp	2008-10-03 08:22:13 +0000
+++ b/storage/ndb/src/kernel/blocks/dbtup/DbtupCommit.cpp	2008-11-16 09:16:35 +0000
@@ -595,6 +595,7 @@ void Dbtup::execTUP_COMMITREQ(Signal* si
         disk_page_abort_prealloc(signal, regFragPtr.p, 
 				 &req.m_page, req.m_page.m_page_idx);
         
+        D("Logfile_client - execTUP_COMMITREQ");
         Logfile_client lgman(this, c_lgman, regFragPtr.p->m_logfile_group_id);
         lgman.free_log_space(regOperPtr.p->m_undo_buffer_space);
 	goto skip_disk;
@@ -668,6 +669,7 @@ void Dbtup::execTUP_COMMITREQ(Signal* si
       safe_cast(&Dbtup::disk_page_log_buffer_callback);
     Uint32 sz= regOperPtr.p->m_undo_buffer_space;
     
+    D("Logfile_client - execTUP_COMMITREQ");
     Logfile_client lgman(this, c_lgman, regFragPtr.p->m_logfile_group_id);
     int res= lgman.get_log_buffer(signal, sz, &cb);
     jamEntry();

=== modified file 'storage/ndb/src/kernel/blocks/dbtup/DbtupDiskAlloc.cpp'
--- a/storage/ndb/src/kernel/blocks/dbtup/DbtupDiskAlloc.cpp	2008-09-30 15:10:29 +0000
+++ b/storage/ndb/src/kernel/blocks/dbtup/DbtupDiskAlloc.cpp	2008-11-16 09:16:35 +0000
@@ -314,7 +314,8 @@ Dbtup::restart_setup_page(Disk_alloc_inf
     page.m_file_no = pagePtr.p->m_file_no;
     page.m_page_no = pagePtr.p->m_page_no;
 
-    Tablespace_client tsman(0, c_tsman,
+    D("Tablespace_client - restart_setup_page");
+    Tablespace_client tsman(0, this, c_tsman,
 			    0, 0, 0);
     unsigned uncommitted, committed;
     uncommitted = committed = ~(unsigned)0;
@@ -349,7 +350,8 @@ Dbtup::disk_page_prealloc(Signal* signal
   Fragrecord* fragPtrP = fragPtr.p; 
   Disk_alloc_info& alloc= fragPtrP->m_disk_alloc_info;
   Uint32 idx= alloc.calc_page_free_bits(sz);
-  Tablespace_client tsman(signal, c_tsman,
+  D("Tablespace_client - disk_page_prealloc");
+  Tablespace_client tsman(signal, this, c_tsman,
 			  fragPtrP->fragTableId,
 			  fragPtrP->fragmentId,
 			  fragPtrP->m_tablespace_id);
@@ -912,7 +914,8 @@ Dbtup::disk_page_set_dirty(PagePtr pageP
   
   ddassert(free >= used);
   
-  Tablespace_client tsman(0, c_tsman,
+  D("Tablespace_client - disk_page_set_dirty");
+  Tablespace_client tsman(0, this, c_tsman,
 			  fragPtr.p->fragTableId,
 			  fragPtr.p->fragmentId,
 			  fragPtr.p->m_tablespace_id);
@@ -996,7 +999,8 @@ Dbtup::disk_page_unmap_callback(Uint32 w
       ddassert(free >= used);
       ddassert(alloc.calc_page_free_bits(free - used) == idx);
       
-      Tablespace_client tsman(0, c_tsman,
+      D("Tablespace_client - disk_page_unmap_callback");
+      Tablespace_client tsman(0, this, c_tsman,
 			      fragPtr.p->fragTableId,
 			      fragPtr.p->fragmentId,
 			      fragPtr.p->m_tablespace_id);
@@ -1027,7 +1031,8 @@ Dbtup::disk_page_unmap_callback(Uint32 w
     LocalDLList<Page> list(*pool, alloc.m_unmap_pages);
     list.remove(pagePtr);
 
-    Tablespace_client tsman(0, c_tsman,
+    D("Tablespace_client - disk_page_unmap_callback");
+    Tablespace_client tsman(0, this, c_tsman,
 			    fragPtr.p->fragTableId,
 			    fragPtr.p->fragmentId,
 			    fragPtr.p->m_tablespace_id);
@@ -1313,6 +1318,7 @@ Dbtup::disk_page_undo_alloc(Page* page, 
 			    Uint32 sz, Uint32 gci, Uint32 logfile_group_id)
 {
   jam();
+  D("Logfile_client - disk_page_undo_alloc");
   Logfile_client lgman(this, c_lgman, logfile_group_id);
   
   Disk_undo::Alloc alloc;
@@ -1336,6 +1342,7 @@ Dbtup::disk_page_undo_update(Page* page,
 			     Uint32 gci, Uint32 logfile_group_id)
 {
   jam();
+  D("Logfile_client - disk_page_undo_update");
   Logfile_client lgman(this, c_lgman, logfile_group_id);
 
   Disk_undo::Update update;
@@ -1368,6 +1375,7 @@ Dbtup::disk_page_undo_free(Page* page, c
 			   Uint32 gci, Uint32 logfile_group_id)
 {
   jam();
+  D("Logfile_client - disk_page_undo_free");
   Logfile_client lgman(this, c_lgman, logfile_group_id);
 
   Disk_undo::Free free;
@@ -1818,7 +1826,8 @@ Dbtup::disk_restart_undo_page_bits(Signa
   Uint32 new_bits = alloc.calc_page_free_bits(free);
   pageP->list_index = 0x8000 | new_bits;
 
-  Tablespace_client tsman(signal, c_tsman,
+  D("Tablespace_client - disk_restart_undo_page_bits");
+  Tablespace_client tsman(signal, this, c_tsman,
 			  fragPtrP->fragTableId,
 			  fragPtrP->fragmentId,
 			  fragPtrP->m_tablespace_id);

=== modified file 'storage/ndb/src/kernel/blocks/dbtup/DbtupExecQuery.cpp'
--- a/storage/ndb/src/kernel/blocks/dbtup/DbtupExecQuery.cpp	2008-10-01 07:57:51 +0000
+++ b/storage/ndb/src/kernel/blocks/dbtup/DbtupExecQuery.cpp	2008-11-16 09:16:35 +0000
@@ -980,6 +980,7 @@ int Dbtup::handleUpdateReq(Signal* signa
       Uint32 sz= operPtrP->m_undo_buffer_space= 
 	(sizeof(Dbtup::Disk_undo::Update) >> 2) + sizes[DD] - 1;
       
+      D("Logfile_client - handleUpdateReq");
       Logfile_client lgman(this, c_lgman, regFragPtr->m_logfile_group_id);
       terrorCode= lgman.alloc_log_space(sz);
       if(unlikely(terrorCode))
@@ -1378,6 +1379,7 @@ int Dbtup::handleInsertReq(Signal* signa
       goto log_space_error;
     }
 
+    D("Logfile_client - handleInsertReq");
     Logfile_client lgman(this, c_lgman, regFragPtr->m_logfile_group_id);
     res= lgman.alloc_log_space(regOperPtr.p->m_undo_buffer_space);
     if(unlikely(res))
@@ -1678,6 +1680,7 @@ int Dbtup::handleDeleteReq(Signal* signa
       (sizeof(Dbtup::Disk_undo::Free) >> 2) + 
       regTabPtr->m_offsets[DD].m_fix_header_size - 1;
     
+    D("Logfile_client - handleDeleteReq");
     Logfile_client lgman(this, c_lgman, regFragPtr->m_logfile_group_id);
     terrorCode= lgman.alloc_log_space(sz);
     if(unlikely(terrorCode))
@@ -3677,6 +3680,7 @@ Dbtup::nr_delete(Signal* signal, Uint32 
     Uint32 sz = (sizeof(Dbtup::Disk_undo::Free) >> 2) + 
       tablePtr.p->m_offsets[DD].m_fix_header_size - 1;
     
+    D("Logfile_client - nr_delete");
     Logfile_client lgman(this, c_lgman, fragPtr.p->m_logfile_group_id);
     int res = lgman.alloc_log_space(sz);
     ndbrequire(res == 0);
@@ -3791,6 +3795,7 @@ Dbtup::nr_delete_page_callback(Signal* s
   cb.m_callbackData = userpointer;
   cb.m_callbackFunction =
     safe_cast(&Dbtup::nr_delete_log_buffer_callback);      
+  D("Logfile_client - nr_delete_page_callback");
   Logfile_client lgman(this, c_lgman, fragPtr.p->m_logfile_group_id);
   int res= lgman.get_log_buffer(signal, sz, &cb);
   switch(res){

=== modified file 'storage/ndb/src/kernel/blocks/dbtup/DbtupMeta.cpp'
--- a/storage/ndb/src/kernel/blocks/dbtup/DbtupMeta.cpp	2008-10-10 09:32:12 +0000
+++ b/storage/ndb/src/kernel/blocks/dbtup/DbtupMeta.cpp	2008-11-16 09:16:35 +0000
@@ -345,6 +345,7 @@ void Dbtup::execTUP_ADD_ATTRREQ(Signal* 
 	safe_cast(&Dbtup::undo_createtable_callback);
       Uint32 sz= sizeof(Disk_undo::Create) >> 2;
       
+      D("Logfile_client - execTUP_ADD_ATTRREQ");
       Logfile_client lgman(this, c_lgman, regFragPtr.p->m_logfile_group_id);
       if((terrorCode = lgman.alloc_log_space(sz)))
       {
@@ -511,7 +512,8 @@ void Dbtup::execTUPFRAGREQ(Signal* signa
   bzero(&rep,sizeof(rep));
   if(regTabPtr.p->m_no_of_disk_attributes)
   {
-    Tablespace_client tsman(0, c_tsman, 0, 0,
+    D("Tablespace_client - execTUPFRAGREQ");
+    Tablespace_client tsman(0, this, c_tsman, 0, 0,
                             regFragPtr.p->m_tablespace_id);
     ndbrequire(tsman.get_tablespace_info(&rep) == 0);
     regFragPtr.p->m_logfile_group_id= rep.tablespace.logfile_group_id;
@@ -1306,6 +1308,7 @@ Dbtup::undo_createtable_callback(Signal*
   getFragmentrec(regFragPtr, fragOperPtr.p->fragidFrag, regTabPtr.p);
   ndbrequire(regFragPtr.i != RNIL);
   
+  D("Logfile_client - undo_createtable_callback");
   Logfile_client lgman(this, c_lgman, regFragPtr.p->m_logfile_group_id);
 
   Disk_undo::Create create;
@@ -1616,6 +1619,7 @@ void Dbtup::releaseFragment(Signal* sign
     cb.m_callbackFunction = 
       safe_cast(&Dbtup::drop_table_log_buffer_callback);
     Uint32 sz= sizeof(Disk_undo::Drop) >> 2;
+    D("Logfile_client - releaseFragment");
     Logfile_client lgman(this, c_lgman, logfile_group_id);
     int r0 = lgman.alloc_log_space(sz);
     if (r0)
@@ -1824,6 +1828,7 @@ Dbtup::drop_table_log_buffer_callback(Si
   drop.m_table = tabPtr.i;
   drop.m_type_length = 
     (Disk_undo::UNDO_DROP << 16) | (sizeof(drop) >> 2);
+  D("Logfile_client - drop_table_log_buffer_callback");
   Logfile_client lgman(this, c_lgman, logfile_group_id);
   
   Logfile_client::Change c[1] = {{ &drop, sizeof(drop) >> 2 } };
@@ -1909,7 +1914,8 @@ Dbtup::drop_fragment_free_extent_log_buf
       Uint64 lsn = 0;
 #endif
       
-      Tablespace_client tsman(signal, c_tsman, tabPtr.i, 
+      D("Tablespace_client - drop_fragment_free_extent_log_buffer_callback");
+      Tablespace_client tsman(signal, this, c_tsman, tabPtr.i, 
 			      fragPtr.p->fragmentId,
 			      fragPtr.p->m_tablespace_id);
       

=== modified file 'storage/ndb/src/kernel/blocks/dbtup/DbtupScan.cpp'
--- a/storage/ndb/src/kernel/blocks/dbtup/DbtupScan.cpp	2008-09-30 15:10:29 +0000
+++ b/storage/ndb/src/kernel/blocks/dbtup/DbtupScan.cpp	2008-11-16 09:16:35 +0000
@@ -827,7 +827,8 @@ Dbtup::scanNext(Signal* signal, ScanOpPt
         // check if page is un-allocated or empty
 	if (likely(! (bits & ScanOp::SCAN_NR)))
 	{
-	  Tablespace_client tsman(signal, c_tsman,
+          D("Tablespace_client - scanNext");
+	  Tablespace_client tsman(signal, this, c_tsman,
 				  frag.fragTableId, 
 				  frag.fragmentId, 
 				  frag.m_tablespace_id);

=== modified file 'storage/ndb/src/kernel/blocks/lgman.cpp'
--- a/storage/ndb/src/kernel/blocks/lgman.cpp	2008-10-08 14:14:43 +0000
+++ b/storage/ndb/src/kernel/blocks/lgman.cpp	2008-11-16 09:16:35 +0000
@@ -54,7 +54,8 @@ extern EventLogger * g_eventLogger;
 Lgman::Lgman(Block_context & ctx) :
   SimulatedBlock(LGMAN, ctx),
   m_logfile_group_list(m_logfile_group_pool),
-  m_logfile_group_hash(m_logfile_group_pool)
+  m_logfile_group_hash(m_logfile_group_pool),
+  m_client_mutex(2, true)
 {
   BLOCK_CONSTRUCTOR(Lgman);
   
@@ -95,10 +96,40 @@ Lgman::Lgman(Block_context & ctx) :
 
   m_last_lsn = 1;
   m_logfile_group_hash.setSize(10);
+
+  if (isNdbMtLqh()) {
+    jam();
+    int ret = m_client_mutex.create();
+    ndbrequire(ret == 0);
+  }
 }
   
 Lgman::~Lgman()
 {
+  if (isNdbMtLqh()) {
+    (void)m_client_mutex.destroy();
+  }
+}
+
+void
+Lgman::client_lock(BlockNumber block, int line)
+{
+  if (isNdbMtLqh()) {
+    D("try lock" << hex << V(block) << dec << V(line));
+    int ret = m_client_mutex.lock();
+    ndbrequire(ret == 0);
+    D("got lock" << hex << V(block) << dec << V(line));
+  }
+}
+
+void
+Lgman::client_unlock(BlockNumber block, int line)
+{
+  if (isNdbMtLqh()) {
+    D("unlock" << hex << V(block) << dec << V(line));
+    int ret = m_client_mutex.unlock();
+    ndbrequire(ret == 0);
+  }
 }
 
 BLOCK_FUNCTIONS(Lgman)
@@ -160,6 +191,7 @@ Lgman::execCONTINUEB(Signal* signal){
 
   Uint32 type= signal->theData[0];
   Uint32 ptrI = signal->theData[1];
+  client_lock(number(), __LINE__);
   switch(type){
   case LgmanContinueB::FILTER_LOG:
     jam();
@@ -170,7 +202,7 @@ Lgman::execCONTINUEB(Signal* signal){
     Ptr<Logfile_group> ptr;
     m_logfile_group_pool.getPtr(ptr, ptrI);
     cut_log_tail(signal, ptr);
-    return;
+    break;
   }
   case LgmanContinueB::FLUSH_LOG:
   {
@@ -178,7 +210,7 @@ Lgman::execCONTINUEB(Signal* signal){
     Ptr<Logfile_group> ptr;
     m_logfile_group_pool.getPtr(ptr, ptrI);
     flush_log(signal, ptr, signal->theData[2]);
-    return;
+    break;
   }
   case LgmanContinueB::PROCESS_LOG_BUFFER_WAITERS:
   {
@@ -186,7 +218,7 @@ Lgman::execCONTINUEB(Signal* signal){
     Ptr<Logfile_group> ptr;
     m_logfile_group_pool.getPtr(ptr, ptrI);
     process_log_buffer_waiters(signal, ptr);
-    return;
+    break;
   }
   case LgmanContinueB::FIND_LOG_HEAD:
     jam();
@@ -200,22 +232,22 @@ Lgman::execCONTINUEB(Signal* signal){
     {
       init_run_undo_log(signal);
     }
-    return;
+    break;
   case LgmanContinueB::EXECUTE_UNDO_RECORD:
     jam();
     execute_undo_record(signal);
-    return;
+    break;
   case LgmanContinueB::STOP_UNDO_LOG:
     jam();
     stop_run_undo_log(signal);
-    return;
+    break;
   case LgmanContinueB::READ_UNDO_LOG:
   {
     jam();
     Ptr<Logfile_group> ptr;
     m_logfile_group_pool.getPtr(ptr, ptrI);
     read_undo_log(signal, ptr);
-    return;
+    break;
   }
   case LgmanContinueB::PROCESS_LOG_SYNC_WAITERS:
   {
@@ -223,7 +255,7 @@ Lgman::execCONTINUEB(Signal* signal){
     Ptr<Logfile_group> ptr;
     m_logfile_group_pool.getPtr(ptr, ptrI);
     process_log_sync_waiters(signal, ptr);
-    return;
+    break;
   }
   case LgmanContinueB::FORCE_LOG_SYNC:
   {
@@ -231,7 +263,7 @@ Lgman::execCONTINUEB(Signal* signal){
     Ptr<Logfile_group> ptr;
     m_logfile_group_pool.getPtr(ptr, ptrI);
     force_log_sync(signal, ptr, signal->theData[2], signal->theData[3]);
-    return;
+    break;
   }
   case LgmanContinueB::DROP_FILEGROUP:
   {
@@ -243,14 +275,15 @@ Lgman::execCONTINUEB(Signal* signal){
       jam();
       sendSignalWithDelay(reference(), GSN_CONTINUEB, signal, 100, 
 			  signal->length());
-      return;
+      break;
     }
     Uint32 ref = signal->theData[2];
     Uint32 data = signal->theData[3];
     drop_filegroup_drop_files(signal, ptr, ref, data);
-    return;
+    break;
   }
   }
+  client_unlock(number(), __LINE__);
 }
 
 void
@@ -1071,11 +1104,25 @@ Lgman::Undofile::Undofile(const struct C
 }
 
 Logfile_client::Logfile_client(SimulatedBlock* block, 
-			       Lgman* lgman, Uint32 logfile_group_id)
+			       Lgman* lgman, Uint32 logfile_group_id,
+                               bool lock)
 {
-  m_block= numberToBlock(block->number(), block->instance());
+  Uint32 bno = block->number();
+  Uint32 ino = block->instance();
+  m_block= numberToBlock(bno, ino);
   m_lgman= lgman;
+  m_lock = lock;
   m_logfile_group_id= logfile_group_id;
+  D("client ctor" << hex << V(m_block));
+  if (m_lock)
+    m_lgman->client_lock(m_block, 0);
+}
+
+Logfile_client::~Logfile_client()
+{
+  D("client dtor" << hex << V(m_block));
+  if (m_lock)
+    m_lgman->client_unlock(m_block, 0);
 }
 
 int
@@ -1717,6 +1764,7 @@ void
 Lgman::execFSWRITECONF(Signal* signal)
 {
   jamEntry();
+  client_lock(number(), __LINE__);
   FsConf * conf = (FsConf*)signal->getDataPtr();
   Ptr<Undofile> ptr;
   m_file_pool.getPtr(ptr, conf->userPointer);
@@ -1773,6 +1821,7 @@ Lgman::execFSWRITECONF(Signal* signal)
   {
     ndbout_c("miss matched writes");
   }
+  client_unlock(number(), __LINE__);
   
   return;
 }
@@ -1882,6 +1931,7 @@ Lgman::execEND_LCP_REQ(Signal* signal)
       wait= true;
       if(signal->getSendersBlockRef() != reference())
       {
+        D("Logfile_client - execEND_LCP_REQ");
 	Logfile_client tmp(this, this, ptr.p->m_logfile_group_id);
 	Logfile_client::Request req;
 	req.m_callback.m_callbackData = ptr.i;
@@ -2216,6 +2266,7 @@ void
 Lgman::execFSREADCONF(Signal* signal)
 {
   jamEntry();
+  client_lock(number(), __LINE__);
 
   Ptr<Undofile> ptr;  
   Ptr<Logfile_group> lg_ptr;
@@ -2256,6 +2307,7 @@ Lgman::execFSREADCONF(Signal* signal)
       lg_ptr.p->m_pos[PRODUCER].m_current_pos.m_idx += tot;
       lg_ptr.p->m_next_reply_ptr_i = ptr.i;
     }
+    client_unlock(number(), __LINE__);
     return;
   }
   
@@ -2279,6 +2331,7 @@ Lgman::execFSREADCONF(Signal* signal)
   case Undofile::FS_SEARCHING:
     jam();
     find_log_head_in_file(signal, lg_ptr, ptr, lsn);
+    client_unlock(number(), __LINE__);
     return;
   default:
   case Undofile::FS_EXECUTING:
@@ -2327,6 +2380,7 @@ Lgman::execFSREADCONF(Signal* signal)
     }
   }
   find_log_head(signal, lg_ptr);
+  client_unlock(number(), __LINE__);
 }
   
 void

=== modified file 'storage/ndb/src/kernel/blocks/lgman.hpp'
--- a/storage/ndb/src/kernel/blocks/lgman.hpp	2008-09-30 15:11:30 +0000
+++ b/storage/ndb/src/kernel/blocks/lgman.hpp	2008-11-16 09:16:35 +0000
@@ -29,6 +29,7 @@
 
 #include <WOPool.hpp>
 #include <SLFifoList.hpp>
+#include <SafeMutex.hpp>
 
 class Lgman : public SimulatedBlock
 {
@@ -259,6 +260,10 @@ private:
   Logfile_group_list m_logfile_group_list;
   Logfile_group_hash m_logfile_group_hash;
 
+  SafeMutex m_client_mutex;
+  void client_lock(BlockNumber block, int line);
+  void client_unlock(BlockNumber block, int line);
+
   bool alloc_logbuffer_memory(Ptr<Logfile_group>, Uint32 pages);
   void init_logbuffer_pointers(Ptr<Logfile_group>);
   void free_logbuffer_memory(Ptr<Logfile_group>);
@@ -308,12 +313,14 @@ private:
 class Logfile_client {
   Uint32 m_block; // includes instance
   Lgman * m_lgman;
+  bool m_lock;
   DEBUG_OUT_DEFINES(LGMAN);
 public:
   Uint32 m_logfile_group_id;
 
-  Logfile_client() {}
-  Logfile_client(SimulatedBlock* block, Lgman*, Uint32 logfile_group_id);
+  Logfile_client(SimulatedBlock* block, Lgman*, Uint32 logfile_group_id,
+                 bool lock = true);
+  ~Logfile_client();
 
   struct Request
   {

=== modified file 'storage/ndb/src/kernel/blocks/pgman.cpp'
--- a/storage/ndb/src/kernel/blocks/pgman.cpp	2008-11-15 16:01:37 +0000
+++ b/storage/ndb/src/kernel/blocks/pgman.cpp	2008-11-16 09:16:35 +0000
@@ -1390,6 +1390,7 @@ Pgman::pageout(Signal* signal, Ptr<Page_
   Logfile_client::Request req;
   req.m_callback.m_callbackData = ptr.i;
   req.m_callback.m_callbackFunction = safe_cast(&Pgman::logsync_callback);
+  D("Logfile_client - pageout");
   Logfile_client lgman(this, c_lgman, RNIL);
   int ret = lgman.sync_lsn(signal, ptr.p->m_lsn, &req, 0);
   if (ret > 0)

=== modified file 'storage/ndb/src/kernel/blocks/tsman.cpp'
--- a/storage/ndb/src/kernel/blocks/tsman.cpp	2008-11-15 16:01:37 +0000
+++ b/storage/ndb/src/kernel/blocks/tsman.cpp	2008-11-16 09:16:35 +0000
@@ -43,7 +43,8 @@ Tsman::Tsman(Block_context& ctx) :
   m_tablespace_list(m_tablespace_pool),
   m_tablespace_hash(m_tablespace_pool),
   m_pgman(0),
-  m_lgman(0)
+  m_lgman(0),
+  m_client_mutex(2, true)
 {
   BLOCK_CONSTRUCTOR(Tsman);
 
@@ -85,10 +86,40 @@ Tsman::Tsman(Block_context& ctx) :
   m_tablespace_hash.setSize(10);
   m_file_hash.setSize(10);
   m_lcp_ongoing = false;
+
+  if (isNdbMtLqh()) {
+    jam();
+    int ret = m_client_mutex.create();
+    ndbrequire(ret == 0);
+  }
 }
   
 Tsman::~Tsman()
 {
+  if (isNdbMtLqh()) {
+    (void)m_client_mutex.destroy();
+  }
+}
+
+void
+Tsman::client_lock(BlockNumber block, int line)
+{
+  if (isNdbMtLqh()) {
+    D("try lock" << hex << V(block) << dec << V(line));
+    int ret = m_client_mutex.lock();
+    ndbrequire(ret == 0);
+    D("got lock" << hex << V(block) << dec << V(line));
+  }
+}
+
+void
+Tsman::client_unlock(BlockNumber block, int line)
+{
+  if (isNdbMtLqh()) {
+    D("unlock" << hex << V(block) << dec << V(line));
+    int ret = m_client_mutex.unlock();
+    ndbrequire(ret == 0);
+  }
 }
 
 BLOCK_FUNCTIONS(Tsman)
@@ -2181,7 +2212,9 @@ Tablespace_client::get_tablespace_info(C
   if(m_tsman->m_tablespace_hash.find(ts_ptr, m_tablespace_id))
   {
     Uint32 logfile_group_id = ts_ptr.p->m_logfile_group_id;
-    Logfile_client lgman(m_tsman, m_tsman->m_lgman, logfile_group_id);
+    // ctor is used here only for logging
+    D("Logfile_client - get_tablespace_info");
+    Logfile_client lgman(m_tsman, m_tsman->m_lgman, logfile_group_id, false);
     rep->tablespace.extent_size = ts_ptr.p->m_extent_size;
     rep->tablespace.logfile_group_id = lgman.m_logfile_group_id;
     return 0;

=== modified file 'storage/ndb/src/kernel/blocks/tsman.hpp'
--- a/storage/ndb/src/kernel/blocks/tsman.hpp	2008-11-15 16:00:08 +0000
+++ b/storage/ndb/src/kernel/blocks/tsman.hpp	2008-11-16 09:16:35 +0000
@@ -22,6 +22,7 @@
 #include <DLList.hpp>
 #include <NodeBitmask.hpp>
 #include <signaldata/GetTabInfo.hpp>
+#include <SafeMutex.hpp>
 
 #include "lgman.hpp"
 #include "pgman.hpp"
@@ -204,6 +205,10 @@ private:
   Tablespace_hash m_tablespace_hash;
   SimulatedBlock * m_pgman;
   Lgman * m_lgman;
+
+  SafeMutex m_client_mutex;
+  void client_lock(BlockNumber block, int line);
+  void client_unlock(BlockNumber block, int line);
   
   int open_file(Signal*, Ptr<Tablespace>, Ptr<Datafile>, CreateFileImplReq*,
 		SectionHandle* handle);
@@ -270,24 +275,42 @@ Tsman::calc_page_no_in_extent(Uint32 pag
 class Tablespace_client
 {
 public:
+  Uint32 m_block;
   Tsman * m_tsman;
   Signal* m_signal;
   Uint32 m_table_id;
   Uint32 m_fragment_id;
   Uint32 m_tablespace_id;
+  bool m_lock;
   DEBUG_OUT_DEFINES(TSMAN);
 
 public:
-  Tablespace_client(Signal* signal, Tsman* tsman, 
-		    Uint32 table, Uint32 fragment, Uint32 tablespaceId) {
+  Tablespace_client(Signal* signal, SimulatedBlock* block, Tsman* tsman, 
+		    Uint32 table, Uint32 fragment, Uint32 tablespaceId,
+                    bool lock = true) {
+    Uint32 bno = block->number();
+    Uint32 ino = block->instance();
+    m_block= numberToBlock(bno, ino);
     m_tsman= tsman;
     m_signal= signal;
     m_table_id= table;
     m_fragment_id= fragment;
     m_tablespace_id= tablespaceId;
+    m_lock = lock;
+
+    D("client ctor" << hex << V(m_block) << dec
+      << V(m_table_id) << V(m_fragment_id) << V(m_tablespace_id));
+    if (m_lock)
+      m_tsman->client_lock(m_block, 0);
   }
 
-  Tablespace_client(Signal* signal, Tsman* tsman, Local_key* key);
+  Tablespace_client(Signal* signal, Tsman* tsman, Local_key* key);//undef
+
+  ~Tablespace_client() {
+    D("client dtor" << hex << V(m_block));
+    if (m_lock)
+      m_tsman->client_unlock(m_block, 0);
+  }
   
   /**
    * Return >0 if success, no of pages in extent, sets key

=== modified file 'storage/ndb/src/kernel/vm/CMakeLists.txt'
--- a/storage/ndb/src/kernel/vm/CMakeLists.txt	2008-10-09 09:34:24 +0000
+++ b/storage/ndb/src/kernel/vm/CMakeLists.txt	2008-11-16 09:15:35 +0000
@@ -35,6 +35,7 @@ ADD_LIBRARY(ndbkernel STATIC
             Rope.cpp
             WOPool.cpp
 	    GlobalData.cpp
+	    SafeMutex.cpp
 )
 ADD_LIBRARY(ndbsched STATIC
             TimeQueue.cpp

=== modified file 'storage/ndb/src/kernel/vm/Makefile.am'
--- a/storage/ndb/src/kernel/vm/Makefile.am	2008-10-08 21:20:38 +0000
+++ b/storage/ndb/src/kernel/vm/Makefile.am	2008-11-16 09:15:35 +0000
@@ -36,7 +36,8 @@ libkernel_a_SOURCES = \
         ndbd_malloc.cpp \
 	ndbd_malloc_impl.cpp \
 	Pool.cpp WOPool.cpp RWPool.cpp DynArr256.cpp LockQueue.cpp \
-	GlobalData.cpp
+	GlobalData.cpp \
+	SafeMutex.cpp
 
 libsched_a_SOURCES = TimeQueue.cpp		\
                      ThreadConfig.cpp \
@@ -58,13 +59,14 @@ EXTRA_DIST=SimulatedBlock.cpp Transporte
 
 INCLUDES_LOC = -I$(top_srcdir)/storage/ndb/src/mgmapi
 
-LDADD_LOC = $(top_builddir)/ndb/src/common/util/libgeneral.la
+LDADD_LOC = $(top_builddir)/storage/ndb/src/common/util/libgeneral.la
 
 include $(top_srcdir)/storage/ndb/config/common.mk.am
 include $(top_srcdir)/storage/ndb/config/type_kernel.mk.am
 
 EXTRA_PROGRAMS = ndbd_malloc_impl_test bench_pool testDynArr256 \
-                 testSectionReader testLongSignals
+                 testSectionReader testLongSignals \
+		 testSafeMutex
 
 ndbd_malloc_impl_test_CXXFLAGS = -DUNIT_TEST
 ndbd_malloc_impl_test_SOURCES = ndbd_malloc_impl.cpp
@@ -108,3 +110,13 @@ testLongSignals_LDFLAGS = @ndb_bin_am_ld
   $(top_builddir)/strings/libmystringslt.la \
   @readline_link@ @TERMCAP_LIB@
 
+testSafeMutex_CXXFLAGS = -DUNIT_TEST
+testSafeMutex_SOURCES = SafeMutex.cpp
+testSafeMutex_LDFLAGS = @ndb_bin_am_ldflags@ \
+  $(top_builddir)/storage/ndb/src/libndbclient.la \
+  $(top_builddir)/mysys/libmysyslt.la \
+  $(top_builddir)/dbug/libdbuglt.la \
+  $(top_builddir)/strings/libmystringslt.la \
+  @readline_link@ @TERMCAP_LIB@
+
+

=== added file 'storage/ndb/src/kernel/vm/SafeMutex.cpp'
--- a/storage/ndb/src/kernel/vm/SafeMutex.cpp	1970-01-01 00:00:00 +0000
+++ b/storage/ndb/src/kernel/vm/SafeMutex.cpp	2008-11-16 09:15:35 +0000
@@ -0,0 +1,207 @@
+/* Copyright (C) 2003 MySQL AB
+
+   This program is free software; you can redistribute it and/or modify
+   it under the terms of the GNU General Public License as published by
+   the Free Software Foundation; version 2 of the License.
+
+   This program is distributed in the hope that it will be useful,
+   but WITHOUT ANY WARRANTY; without even the implied warranty of
+   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+   GNU General Public License for more details.
+
+   You should have received a copy of the GNU General Public License
+   along with this program; if not, write to the Free Software
+   Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA */
+
+#include "SafeMutex.hpp"
+
+//wl4391_todo
+#define HAVE_PTHREAD_MUTEX_RECURSIVE 1
+
+NdbOut&
+operator<<(NdbOut& out, const SafeMutex& dm)
+{
+  out << "level=" << dm.m_level << "," << "usage=" << dm.m_usage;
+  return out;
+}
+
+int
+SafeMutex::create()
+{
+  if (m_init)
+    return ErrState;
+  int ret = -1;
+#ifdef HAVE_PTHREAD_MUTEX_RECURSIVE
+  if (m_limit > 1 || m_debug) {
+    pthread_mutexattr_t attr;
+    pthread_mutexattr_settype(&attr, PTHREAD_MUTEX_RECURSIVE);
+    ret = pthread_mutex_init(&m_mutex, &attr);
+  } else {
+    // error-check mutex does not work right on my linux, skip it
+    ret = pthread_mutex_init(&m_mutex, 0);
+  }
+#else
+  if (m_limit > 1 || m_debug)
+    return ErrUnsupp;
+  ret = pthread_mutex_init(&m_mutex, 0);
+#endif
+  if (ret != 0)
+    return ret;
+  m_init = true;
+  return 0;
+}
+
+int
+SafeMutex::destroy()
+{
+  if (!m_init)
+    return ErrState;
+  int ret = pthread_mutex_destroy(&m_mutex);
+  if (ret != 0)
+    return ret;
+  m_init = false;
+  return 0;
+}
+
+int
+SafeMutex::lock()
+{
+  pthread_t self = pthread_self();
+  int ret = pthread_mutex_lock(&m_mutex);
+  /* have mutex */
+  if (ret != 0)
+    return ret;
+  if (!(m_level < m_limit))
+    return ErrLevel;
+  m_level++;
+  if (m_level > m_usage)
+    m_usage = m_level;
+  if (m_level == 1 && m_owner != 0)
+    return ErrOwner1;
+  if (m_level >= 2 && m_owner != self)
+    return ErrOwner2;
+  m_owner = self;
+  return 0;
+}
+
+int
+SafeMutex::unlock()
+{
+  pthread_t self = pthread_self();
+  if (!(m_level > 0))
+    return ErrState;
+  if (m_owner != self)
+    return ErrOwner3;
+  if (m_level == 1)
+    m_owner = 0;
+  m_level--;
+  int ret = pthread_mutex_unlock(&m_mutex);
+  /* lose mutex */
+  if (ret != 0)
+    return ret;
+  return 0;
+}
+
+#ifdef UNIT_TEST
+
+struct sm_thr {
+  SafeMutex* sm_ptr;
+  uint index;
+  uint loops;
+  uint limit;
+  pthread_t id;
+  sm_thr() : sm_ptr(0), index(0), loops(0), limit(0), id(0) {}
+  ~sm_thr() {}
+};
+
+extern "C" { static void* sm_run(void* arg); }
+
+static void*
+sm_run(void* arg)
+{
+  sm_thr& thr = *(sm_thr*)arg;
+  assert(thr.sm_ptr != 0);
+  SafeMutex& sm = *thr.sm_ptr;
+  uint level = 0;
+  int dir = 0;
+  uint i;
+  for (i = 0; i < thr.loops; i++) {
+    int op = 0;
+    uint sel = uint(random()) % 10;
+    if (level == 0) {
+      dir = +1;
+      op = +1;
+    } else if (level == thr.limit) {
+      dir = -1;
+      op = -1;
+    } else if (dir == +1) {
+      op = sel != 0 ? +1 : -1;
+    } else if (dir == -1) {
+      op = sel != 0 ? -1 : +1;
+    } else {
+      assert(false);
+    }
+    if (op == +1) {
+      assert(level < thr.limit);
+      int ret = sm.lock();
+      assert(ret == 0);
+      level++;
+    } else if (op == -1) {
+      int ret = sm.unlock();
+      assert(ret == 0);
+      assert(level != 0);
+      level--;
+    } else {
+      assert(false);
+    }
+  }
+  while (level > 0) {
+    int ret = sm.unlock();
+    assert(ret == 0);
+    level--;
+  }
+}
+
+int
+main(int argc, char** argv)
+{
+  const uint max_thr = 128;
+  struct sm_thr thr[max_thr];
+
+  // threads - loops - max level
+  uint num_thr = argc > 1 ? atoi(argv[1]) : 4;
+  assert(num_thr != 0 && num_thr <= max_thr);
+  uint loops = argc > 2 ? atoi(argv[2]) : 1000000;
+  uint limit = argc > 3 ? atoi(argv[3]) : 10;
+  assert(limit != 0);
+
+  ndbout << "threads=" << num_thr;
+  ndbout << " loops=" << loops;
+  ndbout << " max level=" << limit << endl;
+
+  SafeMutex sm(limit, true);
+  int ret;
+  ret = sm.create();
+  assert(ret == 0);
+
+  uint i;
+  for (i = 0; i < num_thr; i++) {
+    thr[i].sm_ptr = &sm;
+    thr[i].index = i;
+    thr[i].loops = loops;
+    thr[i].limit = limit;
+    pthread_create(&thr[i].id, 0, &sm_run, &thr[i]);
+    ndbout << "create " << i << " id=" << thr[i].id << endl;
+  }
+  for (i = 0; i < num_thr; i++) {
+    void* value;
+    pthread_join(thr[i].id, &value);
+    ndbout << "join " << i << " id=" << thr[i].id << endl;
+  }
+
+  ret = sm.destroy();
+  assert(ret == 0);
+  return 0;
+}
+
+#endif

=== added file 'storage/ndb/src/kernel/vm/SafeMutex.hpp'
--- a/storage/ndb/src/kernel/vm/SafeMutex.hpp	1970-01-01 00:00:00 +0000
+++ b/storage/ndb/src/kernel/vm/SafeMutex.hpp	2008-11-16 09:15:35 +0000
@@ -0,0 +1,76 @@
+/* Copyright (C) 2003 MySQL AB
+
+   This program is free software; you can redistribute it and/or modify
+   it under the terms of the GNU General Public License as published by
+   the Free Software Foundation; version 2 of the License.
+
+   This program is distributed in the hope that it will be useful,
+   but WITHOUT ANY WARRANTY; without even the implied warranty of
+   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+   GNU General Public License for more details.
+
+   You should have received a copy of the GNU General Public License
+   along with this program; if not, write to the Free Software
+   Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA */
+
+#ifndef NDB_SAFE_MUTEX_HPP
+#define NDB_SAFE_MUTEX_HPP
+
+#include <pthread.h>
+#include <assert.h>
+#include <ndb_types.h>
+#include <NdbOut.hpp>
+
+/*
+ * Recursive mutex with a recursion limit >= 1.  Can be useful for
+ * debugging.  If a recursive mutex is not wanted, one must rewrite
+ * caller code until limit 1 works.
+ *
+ * Implementation for limit > 1 uses a real OS recursive mutex.  Should
+ * work on linux and solaris 10.  There is a unit test testSafeMutex.
+ *
+ * The caller currently is multi-threaded disk data.  Here it is easy
+ * to verify that the mutex is released within a time-slice.
+ */
+
+class SafeMutex {
+  pthread_mutex_t m_mutex;
+  pthread_t m_owner;
+  bool m_init;
+  Uint32 m_level;
+  Uint32 m_usage;       // max level used so far
+  const Uint32 m_limit; // error if usage exceeds this
+  const bool m_debug;   // use recursive mutex even for limit 1
+  friend class NdbOut& operator<<(NdbOut&, const SafeMutex&);
+
+public:
+  SafeMutex(Uint32 limit, bool debug) :
+    m_limit(limit),
+    m_debug(debug)
+  {
+    assert(m_limit >= 1),
+    m_owner = 0;        // wl4391_todo assuming numeric non-zero
+    m_init = false;
+    m_level = 0;
+    m_usage = 0;
+  };
+  ~SafeMutex() {
+    (void)destroy();
+  }
+
+  enum {
+    // caller must crash on any error
+    ErrUnsupp = -101,   // limit > 1 or debug, and not supported by OS
+    ErrState = -102,    // user error
+    ErrLevel = -103,    // level exceeded limit
+    ErrOwner1 = -104,   // owner not 0 at first lock (OS error)
+    ErrOwner2 = -105,   // owner not self at recursive lock (OS error)
+    ErrOwner3 = -106    // owner not self at unlock (OS error)
+  };
+  int create();
+  int destroy();
+  int lock();
+  int unlock();
+};
+
+#endif

=== modified file 'storage/ndb/src/kernel/vm/SimulatedBlock.hpp'
--- a/storage/ndb/src/kernel/vm/SimulatedBlock.hpp	2008-11-15 15:43:59 +0000
+++ b/storage/ndb/src/kernel/vm/SimulatedBlock.hpp	2008-11-16 09:16:35 +0000
@@ -110,6 +110,7 @@ class SimulatedBlock {
   friend class Page_cache_client;
   friend class Lgman;
   friend class Logfile_client;
+  friend class Tablespace_client;
   friend struct Pool_context;
   friend struct SectionHandle;
   friend class LockQueue;

Thread
bzr push into mysql-5.1 branch (pekka:3084 to 3086) WL#4391Pekka Nousiainen16 Nov