3086 Pekka Nousiainen 2008-11-16
wl#4391 26b_ddmx.diff
DD - tsman, lgman client mutex
modified:
storage/ndb/src/kernel/blocks/dbtup/DbtupAbort.cpp
storage/ndb/src/kernel/blocks/dbtup/DbtupCommit.cpp
storage/ndb/src/kernel/blocks/dbtup/DbtupDiskAlloc.cpp
storage/ndb/src/kernel/blocks/dbtup/DbtupExecQuery.cpp
storage/ndb/src/kernel/blocks/dbtup/DbtupMeta.cpp
storage/ndb/src/kernel/blocks/dbtup/DbtupScan.cpp
storage/ndb/src/kernel/blocks/lgman.cpp
storage/ndb/src/kernel/blocks/lgman.hpp
storage/ndb/src/kernel/blocks/pgman.cpp
storage/ndb/src/kernel/blocks/tsman.cpp
storage/ndb/src/kernel/blocks/tsman.hpp
storage/ndb/src/kernel/vm/SimulatedBlock.hpp
3085 Pekka Nousiainen 2008-11-16
wl#4391 26a_ddmx.diff
DD - separate mutex implementation
added:
storage/ndb/src/kernel/vm/SafeMutex.cpp
storage/ndb/src/kernel/vm/SafeMutex.hpp
modified:
storage/ndb/src/kernel/blocks/CMakeLists.txt
storage/ndb/src/kernel/vm/CMakeLists.txt
storage/ndb/src/kernel/vm/Makefile.am
3084 Pekka Nousiainen 2008-11-15
wl#4391 25e_pgman.diff
DD - mt pgman: data file mt-safety
added:
storage/ndb/include/kernel/signaldata/DataFileOrd.hpp
modified:
storage/ndb/include/kernel/GlobalSignalNumbers.h
storage/ndb/src/common/debugger/signaldata/SignalNames.cpp
storage/ndb/src/kernel/blocks/PgmanProxy.cpp
storage/ndb/src/kernel/blocks/PgmanProxy.hpp
storage/ndb/src/kernel/blocks/pgman.cpp
storage/ndb/src/kernel/blocks/pgman.hpp
storage/ndb/src/kernel/blocks/tsman.cpp
=== modified file 'storage/ndb/src/kernel/blocks/CMakeLists.txt'
--- a/storage/ndb/src/kernel/blocks/CMakeLists.txt 2008-10-10 07:05:48 +0000
+++ b/storage/ndb/src/kernel/blocks/CMakeLists.txt 2008-11-16 09:15:35 +0000
@@ -41,6 +41,7 @@ ADD_LIBRARY(ndbblocks STATIC
restore.cpp
tsman.cpp
LocalProxy.cpp
+ PgmanProxy.cpp
)
ADD_EXECUTABLE(ndb_print_file
=== modified file 'storage/ndb/src/kernel/blocks/dbtup/DbtupAbort.cpp'
--- a/storage/ndb/src/kernel/blocks/dbtup/DbtupAbort.cpp 2008-09-30 15:11:30 +0000
+++ b/storage/ndb/src/kernel/blocks/dbtup/DbtupAbort.cpp 2008-11-16 09:16:35 +0000
@@ -184,6 +184,7 @@ void Dbtup::do_tup_abortreq(Signal* sign
if (regOperPtr.p->m_undo_buffer_space)
{
jam();
+ D("Logfile_client - do_tup_abortreq");
Logfile_client lgman(this, c_lgman, regFragPtr.p->m_logfile_group_id);
lgman.free_log_space(regOperPtr.p->m_undo_buffer_space);
}
@@ -350,6 +351,7 @@ void Dbtup::tupkeyErrorLab(Signal* signa
(regOperPtr->is_first_operation() && regOperPtr->is_last_operation()))
{
jam();
+ D("Logfile_client - tupkeyErrorLab");
Logfile_client lgman(this, c_lgman, fragPtr.p->m_logfile_group_id);
lgman.free_log_space(regOperPtr->m_undo_buffer_space);
}
=== modified file 'storage/ndb/src/kernel/blocks/dbtup/DbtupCommit.cpp'
--- a/storage/ndb/src/kernel/blocks/dbtup/DbtupCommit.cpp 2008-10-03 08:22:13 +0000
+++ b/storage/ndb/src/kernel/blocks/dbtup/DbtupCommit.cpp 2008-11-16 09:16:35 +0000
@@ -595,6 +595,7 @@ void Dbtup::execTUP_COMMITREQ(Signal* si
disk_page_abort_prealloc(signal, regFragPtr.p,
&req.m_page, req.m_page.m_page_idx);
+ D("Logfile_client - execTUP_COMMITREQ");
Logfile_client lgman(this, c_lgman, regFragPtr.p->m_logfile_group_id);
lgman.free_log_space(regOperPtr.p->m_undo_buffer_space);
goto skip_disk;
@@ -668,6 +669,7 @@ void Dbtup::execTUP_COMMITREQ(Signal* si
safe_cast(&Dbtup::disk_page_log_buffer_callback);
Uint32 sz= regOperPtr.p->m_undo_buffer_space;
+ D("Logfile_client - execTUP_COMMITREQ");
Logfile_client lgman(this, c_lgman, regFragPtr.p->m_logfile_group_id);
int res= lgman.get_log_buffer(signal, sz, &cb);
jamEntry();
=== modified file 'storage/ndb/src/kernel/blocks/dbtup/DbtupDiskAlloc.cpp'
--- a/storage/ndb/src/kernel/blocks/dbtup/DbtupDiskAlloc.cpp 2008-09-30 15:10:29 +0000
+++ b/storage/ndb/src/kernel/blocks/dbtup/DbtupDiskAlloc.cpp 2008-11-16 09:16:35 +0000
@@ -314,7 +314,8 @@ Dbtup::restart_setup_page(Disk_alloc_inf
page.m_file_no = pagePtr.p->m_file_no;
page.m_page_no = pagePtr.p->m_page_no;
- Tablespace_client tsman(0, c_tsman,
+ D("Tablespace_client - restart_setup_page");
+ Tablespace_client tsman(0, this, c_tsman,
0, 0, 0);
unsigned uncommitted, committed;
uncommitted = committed = ~(unsigned)0;
@@ -349,7 +350,8 @@ Dbtup::disk_page_prealloc(Signal* signal
Fragrecord* fragPtrP = fragPtr.p;
Disk_alloc_info& alloc= fragPtrP->m_disk_alloc_info;
Uint32 idx= alloc.calc_page_free_bits(sz);
- Tablespace_client tsman(signal, c_tsman,
+ D("Tablespace_client - disk_page_prealloc");
+ Tablespace_client tsman(signal, this, c_tsman,
fragPtrP->fragTableId,
fragPtrP->fragmentId,
fragPtrP->m_tablespace_id);
@@ -912,7 +914,8 @@ Dbtup::disk_page_set_dirty(PagePtr pageP
ddassert(free >= used);
- Tablespace_client tsman(0, c_tsman,
+ D("Tablespace_client - disk_page_set_dirty");
+ Tablespace_client tsman(0, this, c_tsman,
fragPtr.p->fragTableId,
fragPtr.p->fragmentId,
fragPtr.p->m_tablespace_id);
@@ -996,7 +999,8 @@ Dbtup::disk_page_unmap_callback(Uint32 w
ddassert(free >= used);
ddassert(alloc.calc_page_free_bits(free - used) == idx);
- Tablespace_client tsman(0, c_tsman,
+ D("Tablespace_client - disk_page_unmap_callback");
+ Tablespace_client tsman(0, this, c_tsman,
fragPtr.p->fragTableId,
fragPtr.p->fragmentId,
fragPtr.p->m_tablespace_id);
@@ -1027,7 +1031,8 @@ Dbtup::disk_page_unmap_callback(Uint32 w
LocalDLList<Page> list(*pool, alloc.m_unmap_pages);
list.remove(pagePtr);
- Tablespace_client tsman(0, c_tsman,
+ D("Tablespace_client - disk_page_unmap_callback");
+ Tablespace_client tsman(0, this, c_tsman,
fragPtr.p->fragTableId,
fragPtr.p->fragmentId,
fragPtr.p->m_tablespace_id);
@@ -1313,6 +1318,7 @@ Dbtup::disk_page_undo_alloc(Page* page,
Uint32 sz, Uint32 gci, Uint32 logfile_group_id)
{
jam();
+ D("Logfile_client - disk_page_undo_alloc");
Logfile_client lgman(this, c_lgman, logfile_group_id);
Disk_undo::Alloc alloc;
@@ -1336,6 +1342,7 @@ Dbtup::disk_page_undo_update(Page* page,
Uint32 gci, Uint32 logfile_group_id)
{
jam();
+ D("Logfile_client - disk_page_undo_update");
Logfile_client lgman(this, c_lgman, logfile_group_id);
Disk_undo::Update update;
@@ -1368,6 +1375,7 @@ Dbtup::disk_page_undo_free(Page* page, c
Uint32 gci, Uint32 logfile_group_id)
{
jam();
+ D("Logfile_client - disk_page_undo_free");
Logfile_client lgman(this, c_lgman, logfile_group_id);
Disk_undo::Free free;
@@ -1818,7 +1826,8 @@ Dbtup::disk_restart_undo_page_bits(Signa
Uint32 new_bits = alloc.calc_page_free_bits(free);
pageP->list_index = 0x8000 | new_bits;
- Tablespace_client tsman(signal, c_tsman,
+ D("Tablespace_client - disk_restart_undo_page_bits");
+ Tablespace_client tsman(signal, this, c_tsman,
fragPtrP->fragTableId,
fragPtrP->fragmentId,
fragPtrP->m_tablespace_id);
=== modified file 'storage/ndb/src/kernel/blocks/dbtup/DbtupExecQuery.cpp'
--- a/storage/ndb/src/kernel/blocks/dbtup/DbtupExecQuery.cpp 2008-10-01 07:57:51 +0000
+++ b/storage/ndb/src/kernel/blocks/dbtup/DbtupExecQuery.cpp 2008-11-16 09:16:35 +0000
@@ -980,6 +980,7 @@ int Dbtup::handleUpdateReq(Signal* signa
Uint32 sz= operPtrP->m_undo_buffer_space=
(sizeof(Dbtup::Disk_undo::Update) >> 2) + sizes[DD] - 1;
+ D("Logfile_client - handleUpdateReq");
Logfile_client lgman(this, c_lgman, regFragPtr->m_logfile_group_id);
terrorCode= lgman.alloc_log_space(sz);
if(unlikely(terrorCode))
@@ -1378,6 +1379,7 @@ int Dbtup::handleInsertReq(Signal* signa
goto log_space_error;
}
+ D("Logfile_client - handleInsertReq");
Logfile_client lgman(this, c_lgman, regFragPtr->m_logfile_group_id);
res= lgman.alloc_log_space(regOperPtr.p->m_undo_buffer_space);
if(unlikely(res))
@@ -1678,6 +1680,7 @@ int Dbtup::handleDeleteReq(Signal* signa
(sizeof(Dbtup::Disk_undo::Free) >> 2) +
regTabPtr->m_offsets[DD].m_fix_header_size - 1;
+ D("Logfile_client - handleDeleteReq");
Logfile_client lgman(this, c_lgman, regFragPtr->m_logfile_group_id);
terrorCode= lgman.alloc_log_space(sz);
if(unlikely(terrorCode))
@@ -3677,6 +3680,7 @@ Dbtup::nr_delete(Signal* signal, Uint32
Uint32 sz = (sizeof(Dbtup::Disk_undo::Free) >> 2) +
tablePtr.p->m_offsets[DD].m_fix_header_size - 1;
+ D("Logfile_client - nr_delete");
Logfile_client lgman(this, c_lgman, fragPtr.p->m_logfile_group_id);
int res = lgman.alloc_log_space(sz);
ndbrequire(res == 0);
@@ -3791,6 +3795,7 @@ Dbtup::nr_delete_page_callback(Signal* s
cb.m_callbackData = userpointer;
cb.m_callbackFunction =
safe_cast(&Dbtup::nr_delete_log_buffer_callback);
+ D("Logfile_client - nr_delete_page_callback");
Logfile_client lgman(this, c_lgman, fragPtr.p->m_logfile_group_id);
int res= lgman.get_log_buffer(signal, sz, &cb);
switch(res){
=== modified file 'storage/ndb/src/kernel/blocks/dbtup/DbtupMeta.cpp'
--- a/storage/ndb/src/kernel/blocks/dbtup/DbtupMeta.cpp 2008-10-10 09:32:12 +0000
+++ b/storage/ndb/src/kernel/blocks/dbtup/DbtupMeta.cpp 2008-11-16 09:16:35 +0000
@@ -345,6 +345,7 @@ void Dbtup::execTUP_ADD_ATTRREQ(Signal*
safe_cast(&Dbtup::undo_createtable_callback);
Uint32 sz= sizeof(Disk_undo::Create) >> 2;
+ D("Logfile_client - execTUP_ADD_ATTRREQ");
Logfile_client lgman(this, c_lgman, regFragPtr.p->m_logfile_group_id);
if((terrorCode = lgman.alloc_log_space(sz)))
{
@@ -511,7 +512,8 @@ void Dbtup::execTUPFRAGREQ(Signal* signa
bzero(&rep,sizeof(rep));
if(regTabPtr.p->m_no_of_disk_attributes)
{
- Tablespace_client tsman(0, c_tsman, 0, 0,
+ D("Tablespace_client - execTUPFRAGREQ");
+ Tablespace_client tsman(0, this, c_tsman, 0, 0,
regFragPtr.p->m_tablespace_id);
ndbrequire(tsman.get_tablespace_info(&rep) == 0);
regFragPtr.p->m_logfile_group_id= rep.tablespace.logfile_group_id;
@@ -1306,6 +1308,7 @@ Dbtup::undo_createtable_callback(Signal*
getFragmentrec(regFragPtr, fragOperPtr.p->fragidFrag, regTabPtr.p);
ndbrequire(regFragPtr.i != RNIL);
+ D("Logfile_client - undo_createtable_callback");
Logfile_client lgman(this, c_lgman, regFragPtr.p->m_logfile_group_id);
Disk_undo::Create create;
@@ -1616,6 +1619,7 @@ void Dbtup::releaseFragment(Signal* sign
cb.m_callbackFunction =
safe_cast(&Dbtup::drop_table_log_buffer_callback);
Uint32 sz= sizeof(Disk_undo::Drop) >> 2;
+ D("Logfile_client - releaseFragment");
Logfile_client lgman(this, c_lgman, logfile_group_id);
int r0 = lgman.alloc_log_space(sz);
if (r0)
@@ -1824,6 +1828,7 @@ Dbtup::drop_table_log_buffer_callback(Si
drop.m_table = tabPtr.i;
drop.m_type_length =
(Disk_undo::UNDO_DROP << 16) | (sizeof(drop) >> 2);
+ D("Logfile_client - drop_table_log_buffer_callback");
Logfile_client lgman(this, c_lgman, logfile_group_id);
Logfile_client::Change c[1] = {{ &drop, sizeof(drop) >> 2 } };
@@ -1909,7 +1914,8 @@ Dbtup::drop_fragment_free_extent_log_buf
Uint64 lsn = 0;
#endif
- Tablespace_client tsman(signal, c_tsman, tabPtr.i,
+ D("Tablespace_client - drop_fragment_free_extent_log_buffer_callback");
+ Tablespace_client tsman(signal, this, c_tsman, tabPtr.i,
fragPtr.p->fragmentId,
fragPtr.p->m_tablespace_id);
=== modified file 'storage/ndb/src/kernel/blocks/dbtup/DbtupScan.cpp'
--- a/storage/ndb/src/kernel/blocks/dbtup/DbtupScan.cpp 2008-09-30 15:10:29 +0000
+++ b/storage/ndb/src/kernel/blocks/dbtup/DbtupScan.cpp 2008-11-16 09:16:35 +0000
@@ -827,7 +827,8 @@ Dbtup::scanNext(Signal* signal, ScanOpPt
// check if page is un-allocated or empty
if (likely(! (bits & ScanOp::SCAN_NR)))
{
- Tablespace_client tsman(signal, c_tsman,
+ D("Tablespace_client - scanNext");
+ Tablespace_client tsman(signal, this, c_tsman,
frag.fragTableId,
frag.fragmentId,
frag.m_tablespace_id);
=== modified file 'storage/ndb/src/kernel/blocks/lgman.cpp'
--- a/storage/ndb/src/kernel/blocks/lgman.cpp 2008-10-08 14:14:43 +0000
+++ b/storage/ndb/src/kernel/blocks/lgman.cpp 2008-11-16 09:16:35 +0000
@@ -54,7 +54,8 @@ extern EventLogger * g_eventLogger;
Lgman::Lgman(Block_context & ctx) :
SimulatedBlock(LGMAN, ctx),
m_logfile_group_list(m_logfile_group_pool),
- m_logfile_group_hash(m_logfile_group_pool)
+ m_logfile_group_hash(m_logfile_group_pool),
+ m_client_mutex(2, true)
{
BLOCK_CONSTRUCTOR(Lgman);
@@ -95,10 +96,40 @@ Lgman::Lgman(Block_context & ctx) :
m_last_lsn = 1;
m_logfile_group_hash.setSize(10);
+
+ if (isNdbMtLqh()) {
+ jam();
+ int ret = m_client_mutex.create();
+ ndbrequire(ret == 0);
+ }
}
Lgman::~Lgman()
{
+ if (isNdbMtLqh()) {
+ (void)m_client_mutex.destroy();
+ }
+}
+
+void
+Lgman::client_lock(BlockNumber block, int line)
+{
+ if (isNdbMtLqh()) {
+ D("try lock" << hex << V(block) << dec << V(line));
+ int ret = m_client_mutex.lock();
+ ndbrequire(ret == 0);
+ D("got lock" << hex << V(block) << dec << V(line));
+ }
+}
+
+void
+Lgman::client_unlock(BlockNumber block, int line)
+{
+ if (isNdbMtLqh()) {
+ D("unlock" << hex << V(block) << dec << V(line));
+ int ret = m_client_mutex.unlock();
+ ndbrequire(ret == 0);
+ }
}
BLOCK_FUNCTIONS(Lgman)
@@ -160,6 +191,7 @@ Lgman::execCONTINUEB(Signal* signal){
Uint32 type= signal->theData[0];
Uint32 ptrI = signal->theData[1];
+ client_lock(number(), __LINE__);
switch(type){
case LgmanContinueB::FILTER_LOG:
jam();
@@ -170,7 +202,7 @@ Lgman::execCONTINUEB(Signal* signal){
Ptr<Logfile_group> ptr;
m_logfile_group_pool.getPtr(ptr, ptrI);
cut_log_tail(signal, ptr);
- return;
+ break;
}
case LgmanContinueB::FLUSH_LOG:
{
@@ -178,7 +210,7 @@ Lgman::execCONTINUEB(Signal* signal){
Ptr<Logfile_group> ptr;
m_logfile_group_pool.getPtr(ptr, ptrI);
flush_log(signal, ptr, signal->theData[2]);
- return;
+ break;
}
case LgmanContinueB::PROCESS_LOG_BUFFER_WAITERS:
{
@@ -186,7 +218,7 @@ Lgman::execCONTINUEB(Signal* signal){
Ptr<Logfile_group> ptr;
m_logfile_group_pool.getPtr(ptr, ptrI);
process_log_buffer_waiters(signal, ptr);
- return;
+ break;
}
case LgmanContinueB::FIND_LOG_HEAD:
jam();
@@ -200,22 +232,22 @@ Lgman::execCONTINUEB(Signal* signal){
{
init_run_undo_log(signal);
}
- return;
+ break;
case LgmanContinueB::EXECUTE_UNDO_RECORD:
jam();
execute_undo_record(signal);
- return;
+ break;
case LgmanContinueB::STOP_UNDO_LOG:
jam();
stop_run_undo_log(signal);
- return;
+ break;
case LgmanContinueB::READ_UNDO_LOG:
{
jam();
Ptr<Logfile_group> ptr;
m_logfile_group_pool.getPtr(ptr, ptrI);
read_undo_log(signal, ptr);
- return;
+ break;
}
case LgmanContinueB::PROCESS_LOG_SYNC_WAITERS:
{
@@ -223,7 +255,7 @@ Lgman::execCONTINUEB(Signal* signal){
Ptr<Logfile_group> ptr;
m_logfile_group_pool.getPtr(ptr, ptrI);
process_log_sync_waiters(signal, ptr);
- return;
+ break;
}
case LgmanContinueB::FORCE_LOG_SYNC:
{
@@ -231,7 +263,7 @@ Lgman::execCONTINUEB(Signal* signal){
Ptr<Logfile_group> ptr;
m_logfile_group_pool.getPtr(ptr, ptrI);
force_log_sync(signal, ptr, signal->theData[2], signal->theData[3]);
- return;
+ break;
}
case LgmanContinueB::DROP_FILEGROUP:
{
@@ -243,14 +275,15 @@ Lgman::execCONTINUEB(Signal* signal){
jam();
sendSignalWithDelay(reference(), GSN_CONTINUEB, signal, 100,
signal->length());
- return;
+ break;
}
Uint32 ref = signal->theData[2];
Uint32 data = signal->theData[3];
drop_filegroup_drop_files(signal, ptr, ref, data);
- return;
+ break;
}
}
+ client_unlock(number(), __LINE__);
}
void
@@ -1071,11 +1104,25 @@ Lgman::Undofile::Undofile(const struct C
}
Logfile_client::Logfile_client(SimulatedBlock* block,
- Lgman* lgman, Uint32 logfile_group_id)
+ Lgman* lgman, Uint32 logfile_group_id,
+ bool lock)
{
- m_block= numberToBlock(block->number(), block->instance());
+ Uint32 bno = block->number();
+ Uint32 ino = block->instance();
+ m_block= numberToBlock(bno, ino);
m_lgman= lgman;
+ m_lock = lock;
m_logfile_group_id= logfile_group_id;
+ D("client ctor" << hex << V(m_block));
+ if (m_lock)
+ m_lgman->client_lock(m_block, 0);
+}
+
+Logfile_client::~Logfile_client()
+{
+ D("client dtor" << hex << V(m_block));
+ if (m_lock)
+ m_lgman->client_unlock(m_block, 0);
}
int
@@ -1717,6 +1764,7 @@ void
Lgman::execFSWRITECONF(Signal* signal)
{
jamEntry();
+ client_lock(number(), __LINE__);
FsConf * conf = (FsConf*)signal->getDataPtr();
Ptr<Undofile> ptr;
m_file_pool.getPtr(ptr, conf->userPointer);
@@ -1773,6 +1821,7 @@ Lgman::execFSWRITECONF(Signal* signal)
{
ndbout_c("miss matched writes");
}
+ client_unlock(number(), __LINE__);
return;
}
@@ -1882,6 +1931,7 @@ Lgman::execEND_LCP_REQ(Signal* signal)
wait= true;
if(signal->getSendersBlockRef() != reference())
{
+ D("Logfile_client - execEND_LCP_REQ");
Logfile_client tmp(this, this, ptr.p->m_logfile_group_id);
Logfile_client::Request req;
req.m_callback.m_callbackData = ptr.i;
@@ -2216,6 +2266,7 @@ void
Lgman::execFSREADCONF(Signal* signal)
{
jamEntry();
+ client_lock(number(), __LINE__);
Ptr<Undofile> ptr;
Ptr<Logfile_group> lg_ptr;
@@ -2256,6 +2307,7 @@ Lgman::execFSREADCONF(Signal* signal)
lg_ptr.p->m_pos[PRODUCER].m_current_pos.m_idx += tot;
lg_ptr.p->m_next_reply_ptr_i = ptr.i;
}
+ client_unlock(number(), __LINE__);
return;
}
@@ -2279,6 +2331,7 @@ Lgman::execFSREADCONF(Signal* signal)
case Undofile::FS_SEARCHING:
jam();
find_log_head_in_file(signal, lg_ptr, ptr, lsn);
+ client_unlock(number(), __LINE__);
return;
default:
case Undofile::FS_EXECUTING:
@@ -2327,6 +2380,7 @@ Lgman::execFSREADCONF(Signal* signal)
}
}
find_log_head(signal, lg_ptr);
+ client_unlock(number(), __LINE__);
}
void
=== modified file 'storage/ndb/src/kernel/blocks/lgman.hpp'
--- a/storage/ndb/src/kernel/blocks/lgman.hpp 2008-09-30 15:11:30 +0000
+++ b/storage/ndb/src/kernel/blocks/lgman.hpp 2008-11-16 09:16:35 +0000
@@ -29,6 +29,7 @@
#include <WOPool.hpp>
#include <SLFifoList.hpp>
+#include <SafeMutex.hpp>
class Lgman : public SimulatedBlock
{
@@ -259,6 +260,10 @@ private:
Logfile_group_list m_logfile_group_list;
Logfile_group_hash m_logfile_group_hash;
+ SafeMutex m_client_mutex;
+ void client_lock(BlockNumber block, int line);
+ void client_unlock(BlockNumber block, int line);
+
bool alloc_logbuffer_memory(Ptr<Logfile_group>, Uint32 pages);
void init_logbuffer_pointers(Ptr<Logfile_group>);
void free_logbuffer_memory(Ptr<Logfile_group>);
@@ -308,12 +313,14 @@ private:
class Logfile_client {
Uint32 m_block; // includes instance
Lgman * m_lgman;
+ bool m_lock;
DEBUG_OUT_DEFINES(LGMAN);
public:
Uint32 m_logfile_group_id;
- Logfile_client() {}
- Logfile_client(SimulatedBlock* block, Lgman*, Uint32 logfile_group_id);
+ Logfile_client(SimulatedBlock* block, Lgman*, Uint32 logfile_group_id,
+ bool lock = true);
+ ~Logfile_client();
struct Request
{
=== modified file 'storage/ndb/src/kernel/blocks/pgman.cpp'
--- a/storage/ndb/src/kernel/blocks/pgman.cpp 2008-11-15 16:01:37 +0000
+++ b/storage/ndb/src/kernel/blocks/pgman.cpp 2008-11-16 09:16:35 +0000
@@ -1390,6 +1390,7 @@ Pgman::pageout(Signal* signal, Ptr<Page_
Logfile_client::Request req;
req.m_callback.m_callbackData = ptr.i;
req.m_callback.m_callbackFunction = safe_cast(&Pgman::logsync_callback);
+ D("Logfile_client - pageout");
Logfile_client lgman(this, c_lgman, RNIL);
int ret = lgman.sync_lsn(signal, ptr.p->m_lsn, &req, 0);
if (ret > 0)
=== modified file 'storage/ndb/src/kernel/blocks/tsman.cpp'
--- a/storage/ndb/src/kernel/blocks/tsman.cpp 2008-11-15 16:01:37 +0000
+++ b/storage/ndb/src/kernel/blocks/tsman.cpp 2008-11-16 09:16:35 +0000
@@ -43,7 +43,8 @@ Tsman::Tsman(Block_context& ctx) :
m_tablespace_list(m_tablespace_pool),
m_tablespace_hash(m_tablespace_pool),
m_pgman(0),
- m_lgman(0)
+ m_lgman(0),
+ m_client_mutex(2, true)
{
BLOCK_CONSTRUCTOR(Tsman);
@@ -85,10 +86,40 @@ Tsman::Tsman(Block_context& ctx) :
m_tablespace_hash.setSize(10);
m_file_hash.setSize(10);
m_lcp_ongoing = false;
+
+ if (isNdbMtLqh()) {
+ jam();
+ int ret = m_client_mutex.create();
+ ndbrequire(ret == 0);
+ }
}
Tsman::~Tsman()
{
+ if (isNdbMtLqh()) {
+ (void)m_client_mutex.destroy();
+ }
+}
+
+void
+Tsman::client_lock(BlockNumber block, int line)
+{
+ if (isNdbMtLqh()) {
+ D("try lock" << hex << V(block) << dec << V(line));
+ int ret = m_client_mutex.lock();
+ ndbrequire(ret == 0);
+ D("got lock" << hex << V(block) << dec << V(line));
+ }
+}
+
+void
+Tsman::client_unlock(BlockNumber block, int line)
+{
+ if (isNdbMtLqh()) {
+ D("unlock" << hex << V(block) << dec << V(line));
+ int ret = m_client_mutex.unlock();
+ ndbrequire(ret == 0);
+ }
}
BLOCK_FUNCTIONS(Tsman)
@@ -2181,7 +2212,9 @@ Tablespace_client::get_tablespace_info(C
if(m_tsman->m_tablespace_hash.find(ts_ptr, m_tablespace_id))
{
Uint32 logfile_group_id = ts_ptr.p->m_logfile_group_id;
- Logfile_client lgman(m_tsman, m_tsman->m_lgman, logfile_group_id);
+ // ctor is used here only for logging
+ D("Logfile_client - get_tablespace_info");
+ Logfile_client lgman(m_tsman, m_tsman->m_lgman, logfile_group_id, false);
rep->tablespace.extent_size = ts_ptr.p->m_extent_size;
rep->tablespace.logfile_group_id = lgman.m_logfile_group_id;
return 0;
=== modified file 'storage/ndb/src/kernel/blocks/tsman.hpp'
--- a/storage/ndb/src/kernel/blocks/tsman.hpp 2008-11-15 16:00:08 +0000
+++ b/storage/ndb/src/kernel/blocks/tsman.hpp 2008-11-16 09:16:35 +0000
@@ -22,6 +22,7 @@
#include <DLList.hpp>
#include <NodeBitmask.hpp>
#include <signaldata/GetTabInfo.hpp>
+#include <SafeMutex.hpp>
#include "lgman.hpp"
#include "pgman.hpp"
@@ -204,6 +205,10 @@ private:
Tablespace_hash m_tablespace_hash;
SimulatedBlock * m_pgman;
Lgman * m_lgman;
+
+ SafeMutex m_client_mutex;
+ void client_lock(BlockNumber block, int line);
+ void client_unlock(BlockNumber block, int line);
int open_file(Signal*, Ptr<Tablespace>, Ptr<Datafile>, CreateFileImplReq*,
SectionHandle* handle);
@@ -270,24 +275,42 @@ Tsman::calc_page_no_in_extent(Uint32 pag
class Tablespace_client
{
public:
+ Uint32 m_block;
Tsman * m_tsman;
Signal* m_signal;
Uint32 m_table_id;
Uint32 m_fragment_id;
Uint32 m_tablespace_id;
+ bool m_lock;
DEBUG_OUT_DEFINES(TSMAN);
public:
- Tablespace_client(Signal* signal, Tsman* tsman,
- Uint32 table, Uint32 fragment, Uint32 tablespaceId) {
+ Tablespace_client(Signal* signal, SimulatedBlock* block, Tsman* tsman,
+ Uint32 table, Uint32 fragment, Uint32 tablespaceId,
+ bool lock = true) {
+ Uint32 bno = block->number();
+ Uint32 ino = block->instance();
+ m_block= numberToBlock(bno, ino);
m_tsman= tsman;
m_signal= signal;
m_table_id= table;
m_fragment_id= fragment;
m_tablespace_id= tablespaceId;
+ m_lock = lock;
+
+ D("client ctor" << hex << V(m_block) << dec
+ << V(m_table_id) << V(m_fragment_id) << V(m_tablespace_id));
+ if (m_lock)
+ m_tsman->client_lock(m_block, 0);
}
- Tablespace_client(Signal* signal, Tsman* tsman, Local_key* key);
+ Tablespace_client(Signal* signal, Tsman* tsman, Local_key* key);//undef
+
+ ~Tablespace_client() {
+ D("client dtor" << hex << V(m_block));
+ if (m_lock)
+ m_tsman->client_unlock(m_block, 0);
+ }
/**
* Return >0 if success, no of pages in extent, sets key
=== modified file 'storage/ndb/src/kernel/vm/CMakeLists.txt'
--- a/storage/ndb/src/kernel/vm/CMakeLists.txt 2008-10-09 09:34:24 +0000
+++ b/storage/ndb/src/kernel/vm/CMakeLists.txt 2008-11-16 09:15:35 +0000
@@ -35,6 +35,7 @@ ADD_LIBRARY(ndbkernel STATIC
Rope.cpp
WOPool.cpp
GlobalData.cpp
+ SafeMutex.cpp
)
ADD_LIBRARY(ndbsched STATIC
TimeQueue.cpp
=== modified file 'storage/ndb/src/kernel/vm/Makefile.am'
--- a/storage/ndb/src/kernel/vm/Makefile.am 2008-10-08 21:20:38 +0000
+++ b/storage/ndb/src/kernel/vm/Makefile.am 2008-11-16 09:15:35 +0000
@@ -36,7 +36,8 @@ libkernel_a_SOURCES = \
ndbd_malloc.cpp \
ndbd_malloc_impl.cpp \
Pool.cpp WOPool.cpp RWPool.cpp DynArr256.cpp LockQueue.cpp \
- GlobalData.cpp
+ GlobalData.cpp \
+ SafeMutex.cpp
libsched_a_SOURCES = TimeQueue.cpp \
ThreadConfig.cpp \
@@ -58,13 +59,14 @@ EXTRA_DIST=SimulatedBlock.cpp Transporte
INCLUDES_LOC = -I$(top_srcdir)/storage/ndb/src/mgmapi
-LDADD_LOC = $(top_builddir)/ndb/src/common/util/libgeneral.la
+LDADD_LOC = $(top_builddir)/storage/ndb/src/common/util/libgeneral.la
include $(top_srcdir)/storage/ndb/config/common.mk.am
include $(top_srcdir)/storage/ndb/config/type_kernel.mk.am
EXTRA_PROGRAMS = ndbd_malloc_impl_test bench_pool testDynArr256 \
- testSectionReader testLongSignals
+ testSectionReader testLongSignals \
+ testSafeMutex
ndbd_malloc_impl_test_CXXFLAGS = -DUNIT_TEST
ndbd_malloc_impl_test_SOURCES = ndbd_malloc_impl.cpp
@@ -108,3 +110,13 @@ testLongSignals_LDFLAGS = @ndb_bin_am_ld
$(top_builddir)/strings/libmystringslt.la \
@readline_link@ @TERMCAP_LIB@
+testSafeMutex_CXXFLAGS = -DUNIT_TEST
+testSafeMutex_SOURCES = SafeMutex.cpp
+testSafeMutex_LDFLAGS = @ndb_bin_am_ldflags@ \
+ $(top_builddir)/storage/ndb/src/libndbclient.la \
+ $(top_builddir)/mysys/libmysyslt.la \
+ $(top_builddir)/dbug/libdbuglt.la \
+ $(top_builddir)/strings/libmystringslt.la \
+ @readline_link@ @TERMCAP_LIB@
+
+
=== added file 'storage/ndb/src/kernel/vm/SafeMutex.cpp'
--- a/storage/ndb/src/kernel/vm/SafeMutex.cpp 1970-01-01 00:00:00 +0000
+++ b/storage/ndb/src/kernel/vm/SafeMutex.cpp 2008-11-16 09:15:35 +0000
@@ -0,0 +1,207 @@
+/* Copyright (C) 2003 MySQL AB
+
+ This program is free software; you can redistribute it and/or modify
+ it under the terms of the GNU General Public License as published by
+ the Free Software Foundation; version 2 of the License.
+
+ This program is distributed in the hope that it will be useful,
+ but WITHOUT ANY WARRANTY; without even the implied warranty of
+ MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ GNU General Public License for more details.
+
+ You should have received a copy of the GNU General Public License
+ along with this program; if not, write to the Free Software
+ Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA */
+
+#include "SafeMutex.hpp"
+
+//wl4391_todo
+#define HAVE_PTHREAD_MUTEX_RECURSIVE 1
+
+NdbOut&
+operator<<(NdbOut& out, const SafeMutex& dm)
+{
+ out << "level=" << dm.m_level << "," << "usage=" << dm.m_usage;
+ return out;
+}
+
+int
+SafeMutex::create()
+{
+ if (m_init)
+ return ErrState;
+ int ret = -1;
+#ifdef HAVE_PTHREAD_MUTEX_RECURSIVE
+ if (m_limit > 1 || m_debug) {
+ pthread_mutexattr_t attr;
+ pthread_mutexattr_settype(&attr, PTHREAD_MUTEX_RECURSIVE);
+ ret = pthread_mutex_init(&m_mutex, &attr);
+ } else {
+ // error-check mutex does not work right on my linux, skip it
+ ret = pthread_mutex_init(&m_mutex, 0);
+ }
+#else
+ if (m_limit > 1 || m_debug)
+ return ErrUnsupp;
+ ret = pthread_mutex_init(&m_mutex, 0);
+#endif
+ if (ret != 0)
+ return ret;
+ m_init = true;
+ return 0;
+}
+
+int
+SafeMutex::destroy()
+{
+ if (!m_init)
+ return ErrState;
+ int ret = pthread_mutex_destroy(&m_mutex);
+ if (ret != 0)
+ return ret;
+ m_init = false;
+ return 0;
+}
+
+int
+SafeMutex::lock()
+{
+ pthread_t self = pthread_self();
+ int ret = pthread_mutex_lock(&m_mutex);
+ /* have mutex */
+ if (ret != 0)
+ return ret;
+ if (!(m_level < m_limit))
+ return ErrLevel;
+ m_level++;
+ if (m_level > m_usage)
+ m_usage = m_level;
+ if (m_level == 1 && m_owner != 0)
+ return ErrOwner1;
+ if (m_level >= 2 && m_owner != self)
+ return ErrOwner2;
+ m_owner = self;
+ return 0;
+}
+
+int
+SafeMutex::unlock()
+{
+ pthread_t self = pthread_self();
+ if (!(m_level > 0))
+ return ErrState;
+ if (m_owner != self)
+ return ErrOwner3;
+ if (m_level == 1)
+ m_owner = 0;
+ m_level--;
+ int ret = pthread_mutex_unlock(&m_mutex);
+ /* lose mutex */
+ if (ret != 0)
+ return ret;
+ return 0;
+}
+
+#ifdef UNIT_TEST
+
+struct sm_thr {
+ SafeMutex* sm_ptr;
+ uint index;
+ uint loops;
+ uint limit;
+ pthread_t id;
+ sm_thr() : sm_ptr(0), index(0), loops(0), limit(0), id(0) {}
+ ~sm_thr() {}
+};
+
+extern "C" { static void* sm_run(void* arg); }
+
+static void*
+sm_run(void* arg)
+{
+ sm_thr& thr = *(sm_thr*)arg;
+ assert(thr.sm_ptr != 0);
+ SafeMutex& sm = *thr.sm_ptr;
+ uint level = 0;
+ int dir = 0;
+ uint i;
+ for (i = 0; i < thr.loops; i++) {
+ int op = 0;
+ uint sel = uint(random()) % 10;
+ if (level == 0) {
+ dir = +1;
+ op = +1;
+ } else if (level == thr.limit) {
+ dir = -1;
+ op = -1;
+ } else if (dir == +1) {
+ op = sel != 0 ? +1 : -1;
+ } else if (dir == -1) {
+ op = sel != 0 ? -1 : +1;
+ } else {
+ assert(false);
+ }
+ if (op == +1) {
+ assert(level < thr.limit);
+ int ret = sm.lock();
+ assert(ret == 0);
+ level++;
+ } else if (op == -1) {
+ int ret = sm.unlock();
+ assert(ret == 0);
+ assert(level != 0);
+ level--;
+ } else {
+ assert(false);
+ }
+ }
+ while (level > 0) {
+ int ret = sm.unlock();
+ assert(ret == 0);
+ level--;
+ }
+}
+
+int
+main(int argc, char** argv)
+{
+ const uint max_thr = 128;
+ struct sm_thr thr[max_thr];
+
+ // threads - loops - max level
+ uint num_thr = argc > 1 ? atoi(argv[1]) : 4;
+ assert(num_thr != 0 && num_thr <= max_thr);
+ uint loops = argc > 2 ? atoi(argv[2]) : 1000000;
+ uint limit = argc > 3 ? atoi(argv[3]) : 10;
+ assert(limit != 0);
+
+ ndbout << "threads=" << num_thr;
+ ndbout << " loops=" << loops;
+ ndbout << " max level=" << limit << endl;
+
+ SafeMutex sm(limit, true);
+ int ret;
+ ret = sm.create();
+ assert(ret == 0);
+
+ uint i;
+ for (i = 0; i < num_thr; i++) {
+ thr[i].sm_ptr = &sm;
+ thr[i].index = i;
+ thr[i].loops = loops;
+ thr[i].limit = limit;
+ pthread_create(&thr[i].id, 0, &sm_run, &thr[i]);
+ ndbout << "create " << i << " id=" << thr[i].id << endl;
+ }
+ for (i = 0; i < num_thr; i++) {
+ void* value;
+ pthread_join(thr[i].id, &value);
+ ndbout << "join " << i << " id=" << thr[i].id << endl;
+ }
+
+ ret = sm.destroy();
+ assert(ret == 0);
+ return 0;
+}
+
+#endif
=== added file 'storage/ndb/src/kernel/vm/SafeMutex.hpp'
--- a/storage/ndb/src/kernel/vm/SafeMutex.hpp 1970-01-01 00:00:00 +0000
+++ b/storage/ndb/src/kernel/vm/SafeMutex.hpp 2008-11-16 09:15:35 +0000
@@ -0,0 +1,76 @@
+/* Copyright (C) 2003 MySQL AB
+
+ This program is free software; you can redistribute it and/or modify
+ it under the terms of the GNU General Public License as published by
+ the Free Software Foundation; version 2 of the License.
+
+ This program is distributed in the hope that it will be useful,
+ but WITHOUT ANY WARRANTY; without even the implied warranty of
+ MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ GNU General Public License for more details.
+
+ You should have received a copy of the GNU General Public License
+ along with this program; if not, write to the Free Software
+ Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA */
+
+#ifndef NDB_SAFE_MUTEX_HPP
+#define NDB_SAFE_MUTEX_HPP
+
+#include <pthread.h>
+#include <assert.h>
+#include <ndb_types.h>
+#include <NdbOut.hpp>
+
+/*
+ * Recursive mutex with a recursion limit >= 1. Can be useful for
+ * debugging. If a recursive mutex is not wanted, one must rewrite
+ * caller code until limit 1 works.
+ *
+ * Implementation for limit > 1 uses a real OS recursive mutex. Should
+ * work on linux and solaris 10. There is a unit test testSafeMutex.
+ *
+ * The caller currently is multi-threaded disk data. Here it is easy
+ * to verify that the mutex is released within a time-slice.
+ */
+
+class SafeMutex {
+ pthread_mutex_t m_mutex;
+ pthread_t m_owner;
+ bool m_init;
+ Uint32 m_level;
+ Uint32 m_usage; // max level used so far
+ const Uint32 m_limit; // error if usage exceeds this
+ const bool m_debug; // use recursive mutex even for limit 1
+ friend class NdbOut& operator<<(NdbOut&, const SafeMutex&);
+
+public:
+ SafeMutex(Uint32 limit, bool debug) :
+ m_limit(limit),
+ m_debug(debug)
+ {
+ assert(m_limit >= 1),
+ m_owner = 0; // wl4391_todo assuming numeric non-zero
+ m_init = false;
+ m_level = 0;
+ m_usage = 0;
+ };
+ ~SafeMutex() {
+ (void)destroy();
+ }
+
+ enum {
+ // caller must crash on any error
+ ErrUnsupp = -101, // limit > 1 or debug, and not supported by OS
+ ErrState = -102, // user error
+ ErrLevel = -103, // level exceeded limit
+ ErrOwner1 = -104, // owner not 0 at first lock (OS error)
+ ErrOwner2 = -105, // owner not self at recursive lock (OS error)
+ ErrOwner3 = -106 // owner not self at unlock (OS error)
+ };
+ int create();
+ int destroy();
+ int lock();
+ int unlock();
+};
+
+#endif
=== modified file 'storage/ndb/src/kernel/vm/SimulatedBlock.hpp'
--- a/storage/ndb/src/kernel/vm/SimulatedBlock.hpp 2008-11-15 15:43:59 +0000
+++ b/storage/ndb/src/kernel/vm/SimulatedBlock.hpp 2008-11-16 09:16:35 +0000
@@ -110,6 +110,7 @@ class SimulatedBlock {
friend class Page_cache_client;
friend class Lgman;
friend class Logfile_client;
+ friend class Tablespace_client;
friend struct Pool_context;
friend struct SectionHandle;
friend class LockQueue;
| Thread |
|---|
| • bzr push into mysql-5.1 branch (pekka:3084 to 3086) WL#4391 | Pekka Nousiainen | 16 Nov |