From: Date: August 3 2008 8:22pm Subject: bzr commit into mysql-5.1-telco-6.4 branch (jonas:2680) List-Archive: http://lists.mysql.com/commits/50846 Message-Id: <20080803182245.4C9A16C454@eel.hemma.oreland.se> MIME-Version: 1.0 Content-Type: text/plain; charset="us-ascii" Content-Transfer-Encoding: 7bit #At file:///home/jonas/bzr/telco-6.4/ 2680 jonas oreland 2008-08-03 [merge] merge bk-internal:telco-6.4 into eel:telco-6.4 removed: storage/ndb/include/mgmcommon/MgmtErrorReporter.hpp added: storage/ndb/src/kernel/blocks/LocalProxy.cpp storage/ndb/src/kernel/blocks/LocalProxy.hpp storage/ndb/src/kernel/blocks/RestoreProxy.cpp storage/ndb/src/kernel/blocks/RestoreProxy.hpp storage/ndb/src/kernel/blocks/backup/BackupProxy.cpp storage/ndb/src/kernel/blocks/backup/BackupProxy.hpp storage/ndb/src/kernel/blocks/dbacc/DbaccProxy.cpp storage/ndb/src/kernel/blocks/dbacc/DbaccProxy.hpp storage/ndb/src/kernel/blocks/dblqh/DblqhProxy.cpp storage/ndb/src/kernel/blocks/dblqh/DblqhProxy.hpp storage/ndb/src/kernel/blocks/dbtup/DbtupProxy.cpp storage/ndb/src/kernel/blocks/dbtup/DbtupProxy.hpp storage/ndb/src/kernel/blocks/dbtux/DbtuxProxy.cpp storage/ndb/src/kernel/blocks/dbtux/DbtuxProxy.hpp storage/ndb/src/kernel/vm/GlobalData.cpp storage/ndb/src/kernel/vm/dummy_nonmt.cpp modified: mysql-test/suite/ndb/r/ndb_restore.result mysql-test/suite/ndb/r/ndb_restore_compressed.result mysql-test/suite/ndb/t/ndb_restore.test storage/ndb/include/debugger/SignalLoggerManager.hpp storage/ndb/include/kernel/RefConvert.hpp storage/ndb/include/kernel/kernel_config_parameters.h storage/ndb/include/kernel/kernel_types.h storage/ndb/include/kernel/ndb_limits.h storage/ndb/src/common/debugger/SignalLoggerManager.cpp storage/ndb/src/common/logger/Logger.cpp storage/ndb/src/common/mgmcommon/ConfigRetriever.cpp storage/ndb/src/common/transporter/TransporterRegistry.cpp storage/ndb/src/kernel/SimBlockList.cpp storage/ndb/src/kernel/blocks/Makefile.am storage/ndb/src/kernel/blocks/backup/Backup.hpp storage/ndb/src/kernel/blocks/backup/BackupInit.cpp storage/ndb/src/kernel/blocks/dbacc/Dbacc.hpp storage/ndb/src/kernel/blocks/dbacc/DbaccInit.cpp storage/ndb/src/kernel/blocks/dbdict/Dbdict.cpp storage/ndb/src/kernel/blocks/dbdict/Dbdict.hpp storage/ndb/src/kernel/blocks/dbdih/Dbdih.hpp storage/ndb/src/kernel/blocks/dbdih/DbdihMain.cpp storage/ndb/src/kernel/blocks/dblqh/Dblqh.hpp storage/ndb/src/kernel/blocks/dblqh/DblqhInit.cpp storage/ndb/src/kernel/blocks/dblqh/DblqhMain.cpp storage/ndb/src/kernel/blocks/dbtup/Dbtup.hpp storage/ndb/src/kernel/blocks/dbtup/DbtupGen.cpp storage/ndb/src/kernel/blocks/dbtux/Dbtux.hpp storage/ndb/src/kernel/blocks/dbtux/DbtuxGen.cpp storage/ndb/src/kernel/blocks/ndbfs/PosixAsyncFile.cpp storage/ndb/src/kernel/blocks/restore.cpp storage/ndb/src/kernel/blocks/restore.hpp storage/ndb/src/kernel/main.cpp storage/ndb/src/kernel/vm/Configuration.cpp storage/ndb/src/kernel/vm/GlobalData.hpp storage/ndb/src/kernel/vm/Makefile.am storage/ndb/src/kernel/vm/SimulatedBlock.cpp storage/ndb/src/kernel/vm/SimulatedBlock.hpp storage/ndb/src/kernel/vm/TransporterCallback.cpp storage/ndb/src/kernel/vm/mt.cpp storage/ndb/src/kernel/vm/mt.hpp storage/ndb/src/mgmapi/mgmapi.cpp storage/ndb/src/mgmclient/CommandInterpreter.cpp storage/ndb/src/mgmsrv/Config.cpp storage/ndb/src/mgmsrv/Config.hpp storage/ndb/src/mgmsrv/ConfigInfo.cpp storage/ndb/src/mgmsrv/ConfigInfo.hpp storage/ndb/src/mgmsrv/InitConfigFileParser.cpp storage/ndb/src/mgmsrv/InitConfigFileParser.hpp storage/ndb/src/mgmsrv/MgmtSrvr.cpp storage/ndb/src/mgmsrv/MgmtSrvr.hpp storage/ndb/src/mgmsrv/MgmtSrvrConfig.cpp storage/ndb/src/mgmsrv/Services.cpp storage/ndb/src/mgmsrv/Services.hpp storage/ndb/src/mgmsrv/main.cpp storage/ndb/src/ndbapi/SignalSender.cpp storage/ndb/src/ndbapi/SignalSender.hpp storage/ndb/src/ndbapi/TransporterFacade.cpp storage/ndb/src/ndbapi/TransporterFacade.hpp storage/ndb/test/include/NDBT_Test.hpp storage/ndb/test/include/NdbMgmd.hpp storage/ndb/test/ndbapi/acrt/NdbRepStress.cpp storage/ndb/test/ndbapi/testBasic.cpp storage/ndb/test/ndbapi/testBasicAsynch.cpp storage/ndb/test/ndbapi/testMgm.cpp storage/ndb/test/ndbapi/testOIBasic.cpp storage/ndb/test/ndbapi/testPartitioning.cpp storage/ndb/test/ndbapi/test_event.cpp storage/ndb/test/src/NDBT_Tables.cpp storage/ndb/test/src/NDBT_Test.cpp storage/ndb/tools/restore/restore_main.cpp === modified file 'mysql-test/suite/ndb/r/ndb_restore.result' --- a/mysql-test/suite/ndb/r/ndb_restore.result 2008-05-29 11:30:21 +0000 +++ b/mysql-test/suite/ndb/r/ndb_restore.result 2008-07-24 10:57:58 +0000 @@ -574,8 +574,29 @@ SELECT @the_backup_id:=backup_id FROM te @the_backup_id:=backup_id DROP TABLE test.backup_info; +CREATE TABLE t11_c ( +c1 int primary key, c2 char(10), c3 varchar(10) +) ENGINE=ndbcluster DEFAULT CHARSET=latin1; +CREATE TABLE t12_c ( +c1 int primary key, c2 char(10), c3 varchar(10) +) ENGINE=ndbcluster DEFAULT CHARSET=latin1; +INSERT INTO t11_c VALUES(1, "aaaaa", "bbbbb"), (2, "ccccc", "ddddd"), (3, "eeeee","fffff"); +INSERT INTO t12_c VALUES(4, "ggggg", "hhhhh"), (5, "iiiii", "jjjjj"), (6, "kkkkk","lllll"); +CREATE TEMPORARY TABLE IF NOT EXISTS test.backup_info (id INT, backup_id INT) ENGINE = HEAP; +DELETE FROM test.backup_info; +LOAD DATA INFILE '../tmp.dat' INTO TABLE test.backup_info FIELDS TERMINATED BY ','; +SELECT @the_backup_id:=backup_id FROM test.backup_info; +@the_backup_id:=backup_id + +DROP TABLE test.backup_info; +drop table t2_c,t11_c,t12_c; +SELECT * FROM t11_c ORDER BY c1; +c1 c2 c3 +1 aaaaa bbbbb +2 ccccc ddddd +3 eeeee fffff drop table if exists t1,t2,t3,t4,t5,t6,t7,t8,t9,t10,t11; -drop table if exists t2_c; +drop table if exists t2_c,t11_c,t12_c; 520093696, select epoch from mysql.ndb_apply_status where server_id=0; epoch === modified file 'mysql-test/suite/ndb/r/ndb_restore_compressed.result' --- a/mysql-test/suite/ndb/r/ndb_restore_compressed.result 2008-05-29 11:30:21 +0000 +++ b/mysql-test/suite/ndb/r/ndb_restore_compressed.result 2008-07-24 10:57:58 +0000 @@ -574,8 +574,29 @@ SELECT @the_backup_id:=backup_id FROM te @the_backup_id:=backup_id DROP TABLE test.backup_info; +CREATE TABLE t11_c ( +c1 int primary key, c2 char(10), c3 varchar(10) +) ENGINE=ndbcluster DEFAULT CHARSET=latin1; +CREATE TABLE t12_c ( +c1 int primary key, c2 char(10), c3 varchar(10) +) ENGINE=ndbcluster DEFAULT CHARSET=latin1; +INSERT INTO t11_c VALUES(1, "aaaaa", "bbbbb"), (2, "ccccc", "ddddd"), (3, "eeeee","fffff"); +INSERT INTO t12_c VALUES(4, "ggggg", "hhhhh"), (5, "iiiii", "jjjjj"), (6, "kkkkk","lllll"); +CREATE TEMPORARY TABLE IF NOT EXISTS test.backup_info (id INT, backup_id INT) ENGINE = HEAP; +DELETE FROM test.backup_info; +LOAD DATA INFILE '../tmp.dat' INTO TABLE test.backup_info FIELDS TERMINATED BY ','; +SELECT @the_backup_id:=backup_id FROM test.backup_info; +@the_backup_id:=backup_id + +DROP TABLE test.backup_info; +drop table t2_c,t11_c,t12_c; +SELECT * FROM t11_c ORDER BY c1; +c1 c2 c3 +1 aaaaa bbbbb +2 ccccc ddddd +3 eeeee fffff drop table if exists t1,t2,t3,t4,t5,t6,t7,t8,t9,t10,t11; -drop table if exists t2_c; +drop table if exists t2_c,t11_c,t12_c; 520093696, select epoch from mysql.ndb_apply_status where server_id=0; epoch === modified file 'mysql-test/suite/ndb/t/ndb_restore.test' --- a/mysql-test/suite/ndb/t/ndb_restore.test 2008-06-18 21:19:57 +0000 +++ b/mysql-test/suite/ndb/t/ndb_restore.test 2008-07-24 10:57:58 +0000 @@ -435,13 +435,31 @@ drop table t1_c,t3_c,t4_c,t5_c,t6_c,t7_c --source include/ndb_backup.inc --exec $NDB_TOOLS_DIR/ndb_restore --no-defaults --core=0 -b $the_backup_id -n 1 -m -r --ndb-nodegroup_map '(0,1)' $NDB_BACKUP_DIR/BACKUP/BACKUP-$the_backup_id 2>&1 | grep Translate || true +CREATE TABLE t11_c ( + c1 int primary key, c2 char(10), c3 varchar(10) +) ENGINE=ndbcluster DEFAULT CHARSET=latin1; + +CREATE TABLE t12_c ( + c1 int primary key, c2 char(10), c3 varchar(10) +) ENGINE=ndbcluster DEFAULT CHARSET=latin1; + +INSERT INTO t11_c VALUES(1, "aaaaa", "bbbbb"), (2, "ccccc", "ddddd"), (3, "eeeee","fffff"); +INSERT INTO t12_c VALUES(4, "ggggg", "hhhhh"), (5, "iiiii", "jjjjj"), (6, "kkkkk","lllll"); +--source include/ndb_backup.inc +drop table t2_c,t11_c,t12_c; +# Only part of tables is restored, it should work +--exec $NDB_TOOLS_DIR/ndb_restore --no-defaults -b $the_backup_id -n 1 -m -r --print --print_meta $NDB_BACKUP_DIR/BACKUP/BACKUP-$the_backup_id test t11_c >> $NDB_TOOLS_OUTPUT +--exec $NDB_TOOLS_DIR/ndb_restore --no-defaults -b $the_backup_id -n 2 -r --print --print_meta $NDB_BACKUP_DIR/BACKUP/BACKUP-$the_backup_id test t11_c >> $NDB_TOOLS_OUTPUT +#Should only t11_c is restored +SELECT * FROM t11_c ORDER BY c1; + # # Cleanup # --disable_warnings drop table if exists t1,t2,t3,t4,t5,t6,t7,t8,t9,t10,t11; -drop table if exists t2_c; +drop table if exists t2_c,t11_c,t12_c; --enable_warnings # === modified file 'storage/ndb/include/debugger/SignalLoggerManager.hpp' --- a/storage/ndb/include/debugger/SignalLoggerManager.hpp 2008-06-17 20:28:45 +0000 +++ b/storage/ndb/include/debugger/SignalLoggerManager.hpp 2008-07-25 05:48:32 +0000 @@ -26,6 +26,7 @@ #include #include #include +#include class SignalLoggerManager { @@ -187,11 +188,17 @@ private: Uint32 traceId; Uint8 logModes[NO_OF_BLOCKS]; + + NdbMutex* m_mutex; + void lock() { if (m_mutex != 0) NdbMutex_Lock(m_mutex); } + void unlock() { if (m_mutex != 0) NdbMutex_Unlock(m_mutex); } public: inline bool logMatch(BlockNumber bno, LogMode mask) { + // extract main block number + bno = blockToMain(bno); // avoid addressing outside logModes return bno < MIN_BLOCK_NO || bno > MAX_BLOCK_NO || === modified file 'storage/ndb/include/kernel/RefConvert.hpp' --- a/storage/ndb/include/kernel/RefConvert.hpp 2006-12-23 19:20:40 +0000 +++ b/storage/ndb/include/kernel/RefConvert.hpp 2008-07-25 05:48:32 +0000 @@ -16,30 +16,82 @@ #ifndef REFCONVERT_H #define REFCONVERT_H +#include #include "kernel_types.h" +/* + * In multithreaded kernel, BlockNumber includes the main block + * number in lower 9 bits and the instance in upper 7 bits. + */ + +inline +BlockNumber blockToMain(Uint32 block){ + assert(block < (1 << 16)); + return (BlockNumber)(block & ((1 << 9) - 1)); +} + +inline +BlockInstance blockToInstance(Uint32 block){ + assert(block < (1 << 16)); + return (BlockNumber)(block >> 9); +} + +inline +BlockNumber numberToBlock(Uint32 main, Uint32 instance) +{ + assert(main < (1 << 9) && instance < (1 << (16 - 9))); + return (BlockNumber)(main | (instance << 9)); +} + +/** + * Convert BlockReference to NodeId + */ +inline +NodeId refToNode(Uint32 ref){ + return (NodeId)(ref & ((1 << 16) - 1)); +} + /** - * Convert BlockReference to BlockNumber + * Convert BlockReference to full 16-bit BlockNumber. */ inline -BlockNumber refToBlock(BlockReference ref){ +BlockNumber refToBlock(Uint32 ref){ return (BlockNumber)(ref >> 16); } /** - * Convert BlockReference to NodeId + * Convert BlockReference to main BlockNumber. + * Used in tests such as: refToMain(senderRef) == DBTC. + */ +inline +BlockNumber refToMain(Uint32 ref){ + return (BlockNumber)((ref >> 16) & ((1 << 9) - 1)); +} + +/** + * Convert BlockReference to BlockInstance. */ inline -NodeId refToNode(BlockReference ref){ - return (NodeId)(ref & 0xFFFF); +BlockInstance refToInstance(Uint32 ref){ + return (BlockInstance)(ref >> (16 + 9)); } /** * Convert NodeId and BlockNumber to BlockReference */ inline -BlockReference numberToRef(BlockNumber bnr, NodeId proc){ - return (((Uint32)bnr) << 16) + proc; +BlockReference numberToRef(Uint32 block, Uint32 node){ + assert(node < (1 << 16) && block < (1 << 16)); + return (BlockReference)(node | (block << 16)); +} + +/** + * Convert NodeId and block main and instance to BlockReference + */ +inline +BlockReference numberToRef(Uint32 main, Uint32 instance, Uint32 node){ + assert(node < (1 << 16) && main < (1 << 9) && instance < (1 << (16 - 9))); + return (BlockReference)(node | (main << 16) | (instance << (16 + 9))); } #endif === modified file 'storage/ndb/include/kernel/kernel_config_parameters.h' --- a/storage/ndb/include/kernel/kernel_config_parameters.h 2006-12-31 00:32:21 +0000 +++ b/storage/ndb/include/kernel/kernel_config_parameters.h 2008-07-25 05:48:32 +0000 @@ -63,4 +63,7 @@ #define CFG_TUX_ATTRIBUTE (PRIVATE_BASE + 42) #define CFG_TUX_SCAN_OP (PRIVATE_BASE + 43) +#define CFG_NDBMT_WORKERS (PRIVATE_BASE + 44) +#define CFG_NDBMT_THREADS (PRIVATE_BASE + 45) + #endif === modified file 'storage/ndb/include/kernel/kernel_types.h' --- a/storage/ndb/include/kernel/kernel_types.h 2006-12-23 19:20:40 +0000 +++ b/storage/ndb/include/kernel/kernel_types.h 2008-07-25 05:48:32 +0000 @@ -22,6 +22,7 @@ typedef Uint16 NodeId; typedef Uint16 BlockNumber; +typedef Uint16 BlockInstance; typedef Uint32 BlockReference; typedef Uint16 GlobalSignalNumber; === modified file 'storage/ndb/include/kernel/ndb_limits.h' --- a/storage/ndb/include/kernel/ndb_limits.h 2008-06-18 09:01:43 +0000 +++ b/storage/ndb/include/kernel/ndb_limits.h 2008-07-25 05:48:32 +0000 @@ -178,7 +178,10 @@ */ #define NDBMT_BLOCK_BITS 9 #define NDBMT_BLOCK_MASK 0x001FF -#define NDBMT_BLOCK_INSTANCES_BITS 7 +#define NDBMT_BLOCK_INSTANCE_BITS 7 #define NDBMT_BLOCK_INSTANCE_MASK 0xFE00 +#define MAX_NDBMT_WORKERS 4 +#define MAX_NDBMT_THREADS 4 + #endif === removed file 'storage/ndb/include/mgmcommon/MgmtErrorReporter.hpp' --- a/storage/ndb/include/mgmcommon/MgmtErrorReporter.hpp 2006-12-23 19:20:40 +0000 +++ b/storage/ndb/include/mgmcommon/MgmtErrorReporter.hpp 1970-01-01 00:00:00 +0000 @@ -1,67 +0,0 @@ -/* Copyright (C) 2003 MySQL AB - - This program is free software; you can redistribute it and/or modify - it under the terms of the GNU General Public License as published by - the Free Software Foundation; version 2 of the License. - - This program is distributed in the hope that it will be useful, - but WITHOUT ANY WARRANTY; without even the implied warranty of - MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the - GNU General Public License for more details. - - You should have received a copy of the GNU General Public License - along with this program; if not, write to the Free Software - Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA */ - -//****************************************************************************** -// Description: This file contains the error reporting macros to be used -// within management server. -// -// Author: Peter Lind -//****************************************************************************** - - -#include // exit -#include - -#define REPORT_WARNING(message) \ - ndbout << "WARNING: " << message << endl - -//**************************************************************************** -// Description: Report a warning, the message is printed on ndbout. -// Parameters: -// message: A text describing the warning. -// Returns: - -//**************************************************************************** - - -#define REPORT_ERROR(message) \ - ndbout << "ERROR: " << message << endl - -//**************************************************************************** -// Description: Report an error, the message is printed on ndbout. -// Parameters: -// message: A text describing the error. -// Returns: - -//**************************************************************************** - - -#ifdef MGMT_TRACE - -#define TRACE(message) \ - ndbout << "MGMT_TRACE: " << message << endl -#else -#define TRACE(message) - -#endif - -//**************************************************************************** -// Description: Print a message on ndbout. -// Parameters: -// message: The message -// Returns: - -//**************************************************************************** - -#define MGM_REQUIRE(x) \ - if (!(x)) { ndbout << __FILE__ << " " << __LINE__ \ - << ": Warning! Requirement failed" << endl; } === modified file 'storage/ndb/src/common/debugger/SignalLoggerManager.cpp' --- a/storage/ndb/src/common/debugger/SignalLoggerManager.cpp 2008-06-17 20:28:45 +0000 +++ b/storage/ndb/src/common/debugger/SignalLoggerManager.cpp 2008-07-25 05:48:32 +0000 @@ -17,8 +17,22 @@ #include "SignalLoggerManager.hpp" #include - +#include #include +#include + +// avoids MT log mixups but has some serializing effect +static const bool g_use_mutex = true; + +static char* mytime() +{ + NDB_TICKS t = NdbTick_CurrentMillisecond(); + uint s = (t / 1000) % 3600; + uint ms = t % 1000; + static char buf[100]; + sprintf(buf, "%u.%03u", s, ms); + return buf; +} SignalLoggerManager::SignalLoggerManager() { @@ -28,6 +42,9 @@ SignalLoggerManager::SignalLoggerManager outputStream = 0; m_ownNodeId = 0; m_logDistributed = false; + m_mutex = 0; + if (g_use_mutex) + m_mutex = NdbMutex_Create(); } SignalLoggerManager::~SignalLoggerManager() @@ -37,6 +54,10 @@ SignalLoggerManager::~SignalLoggerManage fclose(outputStream); outputStream = 0; } + if (m_mutex != 0) { + NdbMutex_Destroy(m_mutex); + m_mutex = 0; + } } FILE * @@ -133,7 +154,7 @@ SignalLoggerManager::log(LogMode logMode count == 0){ for (int number = 0; number < NO_OF_BLOCKS; ++number){ - cnt += log(SLM_ON, number, logMode); + cnt += log(SLM_ON, MIN_BLOCK_NO + number, logMode); } } else { for (int i = 0; i < count; ++i){ @@ -220,15 +241,17 @@ SignalLoggerManager::executeDirect(const if(outputStream != 0 && (traceId == 0 || traceId == trace) && (logMatch(senderBlockNo, LogOut) || logMatch(receiverBlockNo, LogIn))){ + lock(); const char* inOutStr = prio == 0 ? "In" : "Out"; #ifdef VM_TRACE_TIME - fprintf(outputStream, "---- Direct --- Signal --- %s - %d ----\n", inOutStr, time(0)); + fprintf(outputStream, "---- Direct --- Signal --- %s - %s ----\n", inOutStr, mytime()); #else fprintf(outputStream, "---- Direct --- Signal --- %s ----------------\n", inOutStr); #endif // XXX pass in/out to print* function somehow printSignalHeader(outputStream, sh, 0, node, true); printSignalData(outputStream, sh, theData); + unlock(); } } @@ -249,8 +272,9 @@ SignalLoggerManager::executeSignal(const (traceId == 0 || traceId == trace) && (logMatch(receiverBlockNo, LogOut) || (m_logDistributed && m_ownNodeId != senderNode))){ + lock(); #ifdef VM_TRACE_TIME - fprintf(outputStream, "---- Received - Signal - %d ----\n", time(0)); + fprintf(outputStream, "---- Received - Signal - %s ----\n", mytime()); #else fprintf(outputStream, "---- Received - Signal ----------------\n"); #endif @@ -259,6 +283,7 @@ SignalLoggerManager::executeSignal(const printSignalData(outputStream, sh, theData); for (unsigned i = 0; i < secs; i++) printSegmentedSection(outputStream, sh, ptr, i); + unlock(); } } @@ -276,8 +301,9 @@ SignalLoggerManager::executeSignal(const (traceId == 0 || traceId == trace) && (logMatch(receiverBlockNo, LogOut) || (m_logDistributed && m_ownNodeId != senderNode))){ + lock(); #ifdef VM_TRACE_TIME - fprintf(outputStream, "---- Received - Signal - %d ----\n", time(0)); + fprintf(outputStream, "---- Received - Signal - %s ----\n", mytime()); #else fprintf(outputStream, "---- Received - Signal ----------------\n"); #endif @@ -286,6 +312,7 @@ SignalLoggerManager::executeSignal(const printSignalData(outputStream, sh, theData); for (unsigned i = 0; i < secs; i++) printLinearSection(outputStream, sh, ptr, i); + unlock(); } } @@ -306,8 +333,9 @@ SignalLoggerManager::sendSignal(const Si (traceId == 0 || traceId == trace) && (logMatch(senderBlockNo, LogOut) || (m_logDistributed && m_ownNodeId != node))){ + lock(); #ifdef VM_TRACE_TIME - fprintf(outputStream, "---- Send ----- Signal - %d ----\n", time(0)); + fprintf(outputStream, "---- Send ----- Signal - %s ----\n", mytime()); #else fprintf(outputStream, "---- Send ----- Signal ----------------\n"); #endif @@ -316,6 +344,7 @@ SignalLoggerManager::sendSignal(const Si printSignalData(outputStream, sh, theData); for (unsigned i = 0; i < secs; i++) printLinearSection(outputStream, sh, ptr, i); + unlock(); } } @@ -335,8 +364,9 @@ SignalLoggerManager::sendSignal(const Si (traceId == 0 || traceId == trace) && (logMatch(senderBlockNo, LogOut) || (m_logDistributed && m_ownNodeId != node))){ + lock(); #ifdef VM_TRACE_TIME - fprintf(outputStream, "---- Send ----- Signal - %d ----\n", time(0)); + fprintf(outputStream, "---- Send ----- Signal - %s ----\n", mytime()); #else fprintf(outputStream, "---- Send ----- Signal ----------------\n"); #endif @@ -345,6 +375,7 @@ SignalLoggerManager::sendSignal(const Si printSignalData(outputStream, sh, theData); for (unsigned i = 0; i < secs; i++) printSegmentedSection(outputStream, sh, ptr, i); + unlock(); } } @@ -362,8 +393,9 @@ SignalLoggerManager::sendSignal(const Si (traceId == 0 || traceId == trace) && (logMatch(senderBlockNo, LogOut) || (m_logDistributed && m_ownNodeId != node))){ + lock(); #ifdef VM_TRACE_TIME - fprintf(outputStream, "---- Send ----- Signal - %d ----\n", time(0)); + fprintf(outputStream, "---- Send ----- Signal - %s ----\n", mytime()); #else fprintf(outputStream, "---- Send ----- Signal ----------------\n"); #endif @@ -372,6 +404,7 @@ SignalLoggerManager::sendSignal(const Si printSignalData(outputStream, sh, theData); for (unsigned i = 0; i < secs; i++) printGenericSection(outputStream, sh, ptr, i); + unlock(); } } @@ -388,11 +421,12 @@ SignalLoggerManager::sendSignalWithDelay if(outputStream != 0 && (traceId == 0 || traceId == trace) && logMatch(senderBlockNo, LogOut)){ + lock(); #ifdef VM_TRACE_TIME fprintf(outputStream, - "---- Send ----- Signal (%d ms) %d\n", + "---- Send ----- Signal (%d ms) %s\n", delayInMilliSeconds, - time(0)); + mytime()); #else fprintf(outputStream, "---- Send delay Signal (%d ms) ----------\n", delayInMilliSeconds); @@ -402,6 +436,7 @@ SignalLoggerManager::sendSignalWithDelay printSignalData(outputStream, sh, theData); for (unsigned i = 0; i < secs; i++) printSegmentedSection(outputStream, sh, ptr, i); + unlock(); } } @@ -417,15 +452,40 @@ SignalLoggerManager::log(BlockNumber bno if(outputStream != 0 && logModes[bno2] != LogOff){ + lock(); va_list ap; va_start(ap, msg); fprintf(outputStream, "%s: ", getBlockName(bno, "API")); vfprintf(outputStream, msg, ap); fprintf(outputStream, "\n"); va_end(ap); + unlock(); } } +static inline bool +isSysBlock(BlockNumber block, Uint32 gsn) +{ + if (block != 0) + return false; + switch (gsn) { + case GSN_START_ORD: + return true; // first sig + case GSN_CONNECT_REP: + case GSN_DISCONNECT_REP: + case GSN_EVENT_REP: + return true; // transporter + case GSN_STOP_FOR_CRASH: + return true; // mt scheduler + } + return false; +} + +static inline bool +isApiBlock(BlockNumber block) +{ + return block >= 0x8000 || block == 4002 || block == 2047; +} void SignalLoggerManager::printSignalHeader(FILE * output, @@ -434,36 +494,79 @@ SignalLoggerManager::printSignalHeader(F Uint32 node, bool printReceiversSignalId) { - Uint32 receiverBlockNo = sh.theReceiversBlockNumber; + const char* const dummy_block_name = "UUNET"; + + bool receiverIsApi = isApiBlock(sh.theReceiversBlockNumber); + Uint32 receiverBlockNo; + Uint32 receiverInstanceNo; + if (!receiverIsApi) { + receiverBlockNo = blockToMain(sh.theReceiversBlockNumber); + receiverInstanceNo = blockToInstance(sh.theReceiversBlockNumber); + } else { + receiverBlockNo = sh.theReceiversBlockNumber; + receiverInstanceNo = 0; + } Uint32 receiverProcessor = node; + Uint32 gsn = sh.theVerId_signalNumber; - Uint32 senderBlockNo = refToBlock(sh.theSendersBlockRef); - Uint32 senderProcessor = refToNode(sh.theSendersBlockRef); + + Uint32 sbref = sh.theSendersBlockRef; + bool senderIsSys = isSysBlock(refToBlock(sbref), gsn); + bool senderIsApi = isApiBlock(refToBlock(sbref)); + Uint32 senderBlockNo; + Uint32 senderInstanceNo; + if (!senderIsSys && !senderIsApi) { + senderBlockNo = refToMain(sbref); + senderInstanceNo = refToInstance(sbref); + } else { + senderBlockNo = refToBlock(sbref); + senderInstanceNo = 0; + } + Uint32 senderProcessor = refToNode(sbref); + Uint32 length = sh.theLength; Uint32 trace = sh.theTrace; Uint32 rSigId = sh.theSignalId; Uint32 sSigId = sh.theSendersSignalId; const char * signalName = getSignalName(gsn); - const char * rBlockName = getBlockName(receiverBlockNo, "API"); - const char * sBlockName = getBlockName(senderBlockNo, "API"); - - if(printReceiversSignalId) + const char * rBlockName = + receiverIsApi ? "API" : + getBlockName(receiverBlockNo, dummy_block_name); + const char * sBlockName = + senderIsSys ? "SYS" : + senderIsApi ? "API" : + getBlockName(senderBlockNo, dummy_block_name); + + char rInstanceText[20]; + char sInstanceText[20]; + rInstanceText[0] = 0; + sInstanceText[0] = 0; + if (receiverInstanceNo != 0) + sprintf(rInstanceText, "/%u", (uint)receiverInstanceNo); + if (senderInstanceNo != 0) + sprintf(sInstanceText, "/%u", (uint)senderInstanceNo); +// wl4391_todo senders instance missing +// assert(gsn != GSN_LQHKEYREQ || receiverProcessor == senderProcessor || senderInstanceNo != 0); + if (printReceiversSignalId) fprintf(output, - "r.bn: %d \"%s\", r.proc: %d, r.sigId: %d gsn: %d \"%s\" prio: %d\n" - ,receiverBlockNo, rBlockName, receiverProcessor, rSigId, - gsn, signalName, prio); + "r.bn: %d%s \"%s\", r.proc: %d, r.sigId: %d gsn: %d \"%s\" prio: %d\n" + ,receiverBlockNo, rInstanceText, rBlockName, receiverProcessor, + rSigId, gsn, signalName, prio); else fprintf(output, - "r.bn: %d \"%s\", r.proc: %d, gsn: %d \"%s\" prio: %d\n", - receiverBlockNo, rBlockName, receiverProcessor, gsn, - signalName, prio); + "r.bn: %d%s \"%s\", r.proc: %d, gsn: %d \"%s\" prio: %d\n", + receiverBlockNo, rInstanceText, rBlockName, receiverProcessor, + gsn, signalName, prio); fprintf(output, - "s.bn: %d \"%s\", s.proc: %d, s.sigId: %d length: %d trace: %d " + "s.bn: %d%s \"%s\", s.proc: %d, s.sigId: %d length: %d trace: %d " "#sec: %d fragInf: %d\n", - senderBlockNo, sBlockName, senderProcessor, sSigId, length, trace, - sh.m_noOfSections, sh.m_fragmentInfo); + senderBlockNo, sInstanceText, sBlockName, senderProcessor, + sSigId, length, trace, sh.m_noOfSections, sh.m_fragmentInfo); + + //assert(strcmp(rBlockName, dummy_block_name) != 0); + //assert(strcmp(sBlockName, dummy_block_name) != 0); } void === modified file 'storage/ndb/src/common/logger/Logger.cpp' --- a/storage/ndb/src/common/logger/Logger.cpp 2006-12-23 19:20:40 +0000 +++ b/storage/ndb/src/common/logger/Logger.cpp 2008-07-29 13:49:59 +0000 @@ -222,6 +222,7 @@ Logger::addHandler(const BaseString &log *err= handler->getErrorCode(); if(handler->getErrorStr()) strncpy(errStr, handler->getErrorStr(), len); + delete handler; DBUG_RETURN(false); } loghandlers.push_back(handler); @@ -240,6 +241,13 @@ Logger::removeHandler(LogHandler* pHandl int rc = false; if (pHandler != NULL) { + if (pHandler == m_pConsoleHandler) + m_pConsoleHandler= NULL; + if (pHandler == m_pFileHandler) + m_pFileHandler= NULL; + if (pHandler == m_pSyslogHandler) + m_pSyslogHandler= NULL; + rc = m_pHandlerList->remove(pHandler); } @@ -251,6 +259,10 @@ Logger::removeAllHandlers() { Guard g(m_mutex); m_pHandlerList->removeAll(); + + m_pConsoleHandler= NULL; + m_pFileHandler= NULL; + m_pSyslogHandler= NULL; } bool === modified file 'storage/ndb/src/common/mgmcommon/ConfigRetriever.cpp' --- a/storage/ndb/src/common/mgmcommon/ConfigRetriever.cpp 2007-03-22 11:35:31 +0000 +++ b/storage/ndb/src/common/mgmcommon/ConfigRetriever.cpp 2008-06-10 20:06:47 +0000 @@ -24,7 +24,6 @@ #include #include -#include "MgmtErrorReporter.hpp" #include #include @@ -89,9 +88,11 @@ ConfigRetriever::~ConfigRetriever() { DBUG_ENTER("ConfigRetriever::~ConfigRetriever"); if (m_handle) { - if(m_end_session) - ndb_mgm_end_session(m_handle); - ndb_mgm_disconnect(m_handle); + if (ndb_mgm_is_connected(m_handle)) { + if(m_end_session) + ndb_mgm_end_session(m_handle); + ndb_mgm_disconnect(m_handle); + } ndb_mgm_destroy_handle(&m_handle); } DBUG_VOID_RETURN; @@ -160,15 +161,15 @@ ConfigRetriever::getConfig() { } ndb_mgm_configuration * -ConfigRetriever::getConfig(NdbMgmHandle m_handle_arg) +ConfigRetriever::getConfig(NdbMgmHandle mgm_handle) { - ndb_mgm_configuration * conf = ndb_mgm_get_configuration(m_handle_arg, + ndb_mgm_configuration * conf = ndb_mgm_get_configuration(mgm_handle, m_version); if(conf == 0) { - BaseString tmp(ndb_mgm_get_latest_error_msg(m_handle_arg)); + BaseString tmp(ndb_mgm_get_latest_error_msg(mgm_handle)); tmp.append(" : "); - tmp.append(ndb_mgm_get_latest_error_desc(m_handle_arg)); + tmp.append(ndb_mgm_get_latest_error_desc(mgm_handle)); setError(CR_ERROR, tmp.c_str()); return 0; } === modified file 'storage/ndb/src/common/transporter/TransporterRegistry.cpp' --- a/storage/ndb/src/common/transporter/TransporterRegistry.cpp 2008-06-17 20:28:45 +0000 +++ b/storage/ndb/src/common/transporter/TransporterRegistry.cpp 2008-07-22 13:42:54 +0000 @@ -266,12 +266,11 @@ TransporterRegistry::connect_server(NDB_ { DBUG_ENTER("TransporterRegistry::connect_server"); - // read node id from client - // read transporter type + // read node id and transporter type from client int nodeId, remote_transporter_type= -1; SocketInputStream s_input(sockfd); - char buf[256]; - if (s_input.gets(buf, 256) == 0) { + char buf[11+1+11+1]; // + if (s_input.gets(buf, sizeof(buf)) == 0) { DBUG_PRINT("error", ("Could not get node id from client")); DBUG_RETURN(false); } === modified file 'storage/ndb/src/kernel/SimBlockList.cpp' --- a/storage/ndb/src/kernel/SimBlockList.cpp 2006-12-23 19:20:40 +0000 +++ b/storage/ndb/src/kernel/SimBlockList.cpp 2008-07-26 05:13:40 +0000 @@ -36,6 +36,13 @@ #include #include #include +#include +#include +#include +#include +#include +#include +#include #ifndef VM_TRACE #define NEW_BLOCK(B) new B @@ -68,6 +75,9 @@ void * operator new (size_t sz, SIMBLOCK #define NEW_BLOCK(B) new(A_VALUE) B #endif +extern bool g_ndbMt; +extern bool g_ndbMtLqh; + void SimBlockList::load(EmulatorData& data){ noOfBlocks = NO_OF_BLOCKS; @@ -91,26 +101,44 @@ SimBlockList::load(EmulatorData& data){ fs = NEW_BLOCK(Ndbfs)(ctx); } } - + theList[0] = pg = NEW_BLOCK(Pgman)(ctx); theList[1] = lg = NEW_BLOCK(Lgman)(ctx); theList[2] = ts = NEW_BLOCK(Tsman)(ctx, pg, lg); - theList[3] = NEW_BLOCK(Dbacc)(ctx); + if (!g_ndbMtLqh) + theList[3] = NEW_BLOCK(Dbacc)(ctx); + else + theList[3] = NEW_BLOCK(DbaccProxy)(ctx); theList[4] = NEW_BLOCK(Cmvmi)(ctx); theList[5] = fs; theList[6] = dbdict = NEW_BLOCK(Dbdict)(ctx); theList[7] = dbdih = NEW_BLOCK(Dbdih)(ctx); - theList[8] = NEW_BLOCK(Dblqh)(ctx); + if (!g_ndbMtLqh) + theList[8] = NEW_BLOCK(Dblqh)(ctx); + else + theList[8] = NEW_BLOCK(DblqhProxy)(ctx); theList[9] = NEW_BLOCK(Dbtc)(ctx); - theList[10] = NEW_BLOCK(Dbtup)(ctx, pg); + if (!g_ndbMtLqh) + theList[10] = NEW_BLOCK(Dbtup)(ctx, pg); + else + theList[10] = NEW_BLOCK(DbtupProxy)(ctx);//wl4391_todo pg theList[11] = NEW_BLOCK(Ndbcntr)(ctx); theList[12] = NEW_BLOCK(Qmgr)(ctx); theList[13] = NEW_BLOCK(Trix)(ctx); - theList[14] = NEW_BLOCK(Backup)(ctx); + if (!g_ndbMtLqh) + theList[14] = NEW_BLOCK(Backup)(ctx); + else + theList[14] = NEW_BLOCK(BackupProxy)(ctx); theList[15] = NEW_BLOCK(DbUtil)(ctx); theList[16] = NEW_BLOCK(Suma)(ctx); - theList[17] = NEW_BLOCK(Dbtux)(ctx); - theList[18] = NEW_BLOCK(Restore)(ctx); + if (!g_ndbMtLqh) + theList[17] = NEW_BLOCK(Dbtux)(ctx); + else + theList[17] = NEW_BLOCK(DbtuxProxy)(ctx); + if (!g_ndbMtLqh) + theList[18] = NEW_BLOCK(Restore)(ctx); + else + theList[18] = NEW_BLOCK(RestoreProxy)(ctx); assert(NO_OF_BLOCKS == 19); } === added file 'storage/ndb/src/kernel/blocks/LocalProxy.cpp' --- a/storage/ndb/src/kernel/blocks/LocalProxy.cpp 1970-01-01 00:00:00 +0000 +++ b/storage/ndb/src/kernel/blocks/LocalProxy.cpp 2008-07-26 05:13:40 +0000 @@ -0,0 +1,121 @@ +/* Copyright (C) 2003 MySQL AB + + This program is free software; you can redistribute it and/or modify + it under the terms of the GNU General Public License as published by + the Free Software Foundation; version 2 of the License. + + This program is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + GNU General Public License for more details. + + You should have received a copy of the GNU General Public License + along with this program; if not, write to the Free Software + Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA */ + +#include +#include "LocalProxy.hpp" + +LocalProxy::LocalProxy(BlockNumber blockNumber, Block_context& ctx) : + SimulatedBlock(blockNumber, ctx) +{ + BLOCK_CONSTRUCTOR(LocalProxy); + + ndbrequire(instance() == 0); // this is main block + c_workers = 0; + c_threads = 0; + Uint32 i; + for (i = 0; i < MaxWorkers; i++) + c_worker[i] = 0; + + // GSN_READ_CONFIG_REQ + addRecSignal(GSN_READ_CONFIG_REQ, &LocalProxy::execREAD_CONFIG_REQ, true); + addRecSignal(GSN_READ_CONFIG_CONF, &LocalProxy::execREAD_CONFIG_CONF, true); +} + +LocalProxy::~LocalProxy() +{ + // dtor of main block deletes workers +} + +// GSN_READ_CONFIG_REQ + +void +LocalProxy::execREAD_CONFIG_REQ(Signal* signal) +{ + Ss_READ_CONFIG_REQ& ss = c_ss_READ_CONFIG_REQ; + ndbrequire(!ss.m_active); + ss.m_active = true; + + const ReadConfigReq* req = (const ReadConfigReq*)signal->getDataPtr(); + ss.m_readConfigReq = *req; + ndbrequire(ss.m_readConfigReq.noOfParameters == 0); + + const Uint32 workers = globalData.ndbmtWorkers; + const Uint32 threads = globalData.ndbmtThreads; + + Uint32 i; + for (i = 0; i < workers; i++) { + const Uint32 instanceNo = 1 + i; + SimulatedBlock* worker = newWorker(instanceNo); + ndbrequire(worker->instance() == instanceNo); + ndbrequire(this->getInstance(instanceNo) == worker); + c_worker[i] = worker; + + add_worker_thr_map(number(), instanceNo); + } + + // set after instances are created (sendpacked) + c_workers = workers; + c_threads = threads; + + // run sequentially due to big mallocs and initializations + sendREAD_CONFIG_REQ(signal, 0); +} + +void +LocalProxy::sendREAD_CONFIG_REQ(Signal* signal, Uint32 i) +{ + Ss_READ_CONFIG_REQ& ss = c_ss_READ_CONFIG_REQ; + + ReadConfigReq* req = (ReadConfigReq*)signal->getDataPtrSend(); + req->senderRef = reference(); + req->senderData = i; + req->noOfParameters = 0; + sendSignal(workerRef(i), GSN_READ_CONFIG_REQ, + signal, ReadConfigReq::SignalLength, JBB); + // for verification only + ss.m_worker = i; +} + +void +LocalProxy::execREAD_CONFIG_CONF(Signal* signal) +{ + Ss_READ_CONFIG_REQ& ss = c_ss_READ_CONFIG_REQ; + ndbrequire(ss.m_active); + + const ReadConfigConf* conf = (const ReadConfigConf*)signal->getDataPtr(); + ndbrequire(ss.m_worker == conf->senderData); + if (ss.m_worker + 1 < c_workers) { + jam(); + sendREAD_CONFIG_REQ(signal, ss.m_worker + 1); + return; + } + + sendREAD_CONFIG_CONF(signal); + ss.m_active = false; +} + +void +LocalProxy::sendREAD_CONFIG_CONF(Signal* signal) +{ + Ss_READ_CONFIG_REQ& ss = c_ss_READ_CONFIG_REQ; + + ReadConfigConf* conf = (ReadConfigConf*)signal->getDataPtrSend(); + conf->senderRef = reference(); + conf->senderData = ss.m_readConfigReq.senderData; + sendSignal(ss.m_readConfigReq.senderRef, GSN_READ_CONFIG_CONF, + signal, ReadConfigConf::SignalLength, JBB); +} + +BLOCK_FUNCTIONS(LocalProxy) === added file 'storage/ndb/src/kernel/blocks/LocalProxy.hpp' --- a/storage/ndb/src/kernel/blocks/LocalProxy.hpp 1970-01-01 00:00:00 +0000 +++ b/storage/ndb/src/kernel/blocks/LocalProxy.hpp 2008-07-26 05:13:40 +0000 @@ -0,0 +1,69 @@ +/* Copyright (C) 2003 MySQL AB + + This program is free software; you can redistribute it and/or modify + it under the terms of the GNU General Public License as published by + the Free Software Foundation; version 2 of the License. + + This program is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + GNU General Public License for more details. + + You should have received a copy of the GNU General Public License + along with this program; if not, write to the Free Software + Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA */ + +#ifndef NDB_LOCAL_PROXY_HPP +#define NDB_LOCAL_PROXY_HPP + +#include +#include +#include + +/* + * Proxy blocks for MT LQH. + * + * The LQH proxy is the LQH block seen by other nodes and blocks, + * unless by-passed for efficiency. Real LQH instances (workers) + * run behind it. + * + * There are also ACC,TUP,TUX,BACKUP,RESTORE proxies and workers. + * All proxy classes are subclasses of LocalProxy. + */ + +class LocalProxy : public SimulatedBlock { +public: + LocalProxy(BlockNumber blockNumber, Block_context& ctx); + virtual ~LocalProxy(); + BLOCK_DEFINES(LocalProxy); + +protected: + enum { MaxWorkers = MAX_NDBMT_WORKERS }; + Uint32 c_workers; + Uint32 c_threads; + SimulatedBlock* c_worker[MaxWorkers]; + + virtual SimulatedBlock* newWorker(Uint32 instanceNo) = 0; + + // worker index to worker ref + BlockReference workerRef(Uint32 i) { + return numberToRef(number(), 1 + i, getOwnNodeId()); + } + + // GSN_READ_CONFIG_REQ + struct Ss_READ_CONFIG_REQ { + bool m_active; + Uint32 m_worker; + ReadConfigReq m_readConfigReq; + Ss_READ_CONFIG_REQ() : + m_active(false) + {} + }; + Ss_READ_CONFIG_REQ c_ss_READ_CONFIG_REQ; + void execREAD_CONFIG_REQ(Signal*); + void sendREAD_CONFIG_REQ(Signal*, Uint32 i); + void execREAD_CONFIG_CONF(Signal*); + void sendREAD_CONFIG_CONF(Signal*); +}; + +#endif === modified file 'storage/ndb/src/kernel/blocks/Makefile.am' --- a/storage/ndb/src/kernel/blocks/Makefile.am 2008-04-25 16:35:50 +0000 +++ b/storage/ndb/src/kernel/blocks/Makefile.am 2008-07-26 05:13:40 +0000 @@ -54,7 +54,14 @@ libblocks_a_SOURCES = tsman.cpp lgman.cp dbtux/DbtuxGen.cpp dbtux/DbtuxMeta.cpp dbtux/DbtuxMaint.cpp \ dbtux/DbtuxNode.cpp dbtux/DbtuxTree.cpp dbtux/DbtuxScan.cpp \ dbtux/DbtuxSearch.cpp dbtux/DbtuxCmp.cpp dbtux/DbtuxStat.cpp \ - dbtux/DbtuxDebug.cpp + dbtux/DbtuxDebug.cpp \ + LocalProxy.cpp \ + dblqh/DblqhProxy.cpp \ + dbacc/DbaccProxy.cpp \ + dbtup/DbtupProxy.cpp \ + dbtux/DbtuxProxy.cpp \ + backup/BackupProxy.cpp \ + RestoreProxy.cpp EXTRA_PROGRAMS = ndb_print_file ndb_print_file_SOURCES = print_file.cpp diskpage.cpp dbtup/tuppage.cpp === added file 'storage/ndb/src/kernel/blocks/RestoreProxy.cpp' --- a/storage/ndb/src/kernel/blocks/RestoreProxy.cpp 1970-01-01 00:00:00 +0000 +++ b/storage/ndb/src/kernel/blocks/RestoreProxy.cpp 2008-07-26 05:13:40 +0000 @@ -0,0 +1,34 @@ +/* Copyright (C) 2003 MySQL AB + + This program is free software; you can redistribute it and/or modify + it under the terms of the GNU General Public License as published by + the Free Software Foundation; version 2 of the License. + + This program is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + GNU General Public License for more details. + + You should have received a copy of the GNU General Public License + along with this program; if not, write to the Free Software + Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA */ + +#include "RestoreProxy.hpp" +#include "restore.hpp" + +RestoreProxy::RestoreProxy(Block_context& ctx) : + LocalProxy(RESTORE, ctx) +{ +} + +RestoreProxy::~RestoreProxy() +{ +} + +SimulatedBlock* +RestoreProxy::newWorker(Uint32 instanceNo) +{ + return new Restore(m_ctx, instanceNo); +} + +BLOCK_FUNCTIONS(RestoreProxy) === added file 'storage/ndb/src/kernel/blocks/RestoreProxy.hpp' --- a/storage/ndb/src/kernel/blocks/RestoreProxy.hpp 1970-01-01 00:00:00 +0000 +++ b/storage/ndb/src/kernel/blocks/RestoreProxy.hpp 2008-07-26 05:13:40 +0000 @@ -0,0 +1,31 @@ +/* Copyright (C) 2003 MySQL AB + + This program is free software; you can redistribute it and/or modify + it under the terms of the GNU General Public License as published by + the Free Software Foundation; version 2 of the License. + + This program is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + GNU General Public License for more details. + + You should have received a copy of the GNU General Public License + along with this program; if not, write to the Free Software + Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA */ + +#ifndef NDB_RESTORE_PROXY_HPP +#define NDB_RESTORE_PROXY_HPP + +#include + +class RestoreProxy : public LocalProxy { +public: + RestoreProxy(Block_context& ctx); + virtual ~RestoreProxy(); + BLOCK_DEFINES(RestoreProxy); + +protected: + virtual SimulatedBlock* newWorker(Uint32 instanceNo); +}; + +#endif === modified file 'storage/ndb/src/kernel/blocks/backup/Backup.hpp' --- a/storage/ndb/src/kernel/blocks/backup/Backup.hpp 2008-06-05 20:31:21 +0000 +++ b/storage/ndb/src/kernel/blocks/backup/Backup.hpp 2008-07-26 05:13:40 +0000 @@ -40,8 +40,10 @@ */ class Backup : public SimulatedBlock { + friend class BackupProxy; + public: - Backup(Block_context& ctx); + Backup(Block_context& ctx, Uint32 instanceNumber = 0); virtual ~Backup(); BLOCK_DEFINES(Backup); === modified file 'storage/ndb/src/kernel/blocks/backup/BackupInit.cpp' --- a/storage/ndb/src/kernel/blocks/backup/BackupInit.cpp 2008-06-05 20:31:21 +0000 +++ b/storage/ndb/src/kernel/blocks/backup/BackupInit.cpp 2008-07-26 05:13:40 +0000 @@ -26,8 +26,8 @@ //extern const unsigned Ndbcntr::g_sysTableCount; -Backup::Backup(Block_context& ctx) : - SimulatedBlock(BACKUP, ctx), +Backup::Backup(Block_context& ctx, Uint32 instanceNumber) : + SimulatedBlock(BACKUP, ctx, instanceNumber), c_nodes(c_nodePool), c_backups(c_backupPool) { === added file 'storage/ndb/src/kernel/blocks/backup/BackupProxy.cpp' --- a/storage/ndb/src/kernel/blocks/backup/BackupProxy.cpp 1970-01-01 00:00:00 +0000 +++ b/storage/ndb/src/kernel/blocks/backup/BackupProxy.cpp 2008-07-26 05:13:40 +0000 @@ -0,0 +1,34 @@ +/* Copyright (C) 2003 MySQL AB + + This program is free software; you can redistribute it and/or modify + it under the terms of the GNU General Public License as published by + the Free Software Foundation; version 2 of the License. + + This program is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + GNU General Public License for more details. + + You should have received a copy of the GNU General Public License + along with this program; if not, write to the Free Software + Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA */ + +#include "BackupProxy.hpp" +#include "Backup.hpp" + +BackupProxy::BackupProxy(Block_context& ctx) : + LocalProxy(BACKUP, ctx) +{ +} + +BackupProxy::~BackupProxy() +{ +} + +SimulatedBlock* +BackupProxy::newWorker(Uint32 instanceNo) +{ + return new Backup(m_ctx, instanceNo); +} + +BLOCK_FUNCTIONS(BackupProxy) === added file 'storage/ndb/src/kernel/blocks/backup/BackupProxy.hpp' --- a/storage/ndb/src/kernel/blocks/backup/BackupProxy.hpp 1970-01-01 00:00:00 +0000 +++ b/storage/ndb/src/kernel/blocks/backup/BackupProxy.hpp 2008-07-26 05:13:40 +0000 @@ -0,0 +1,31 @@ +/* Copyright (C) 2003 MySQL AB + + This program is free software; you can redistribute it and/or modify + it under the terms of the GNU General Public License as published by + the Free Software Foundation; version 2 of the License. + + This program is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + GNU General Public License for more details. + + You should have received a copy of the GNU General Public License + along with this program; if not, write to the Free Software + Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA */ + +#ifndef NDB_BACKUP_PROXY_HPP +#define NDB_BACKUP_PROXY_HPP + +#include + +class BackupProxy : public LocalProxy { +public: + BackupProxy(Block_context& ctx); + virtual ~BackupProxy(); + BLOCK_DEFINES(BackupProxy); + +protected: + virtual SimulatedBlock* newWorker(Uint32 instanceNo); +}; + +#endif === modified file 'storage/ndb/src/kernel/blocks/dbacc/Dbacc.hpp' --- a/storage/ndb/src/kernel/blocks/dbacc/Dbacc.hpp 2008-06-05 20:19:01 +0000 +++ b/storage/ndb/src/kernel/blocks/dbacc/Dbacc.hpp 2008-07-26 05:13:40 +0000 @@ -268,6 +268,8 @@ ElementHeader::clearScanBit(Uint32 heade class Dbacc: public SimulatedBlock { + friend class DbaccProxy; + public: // State values enum State { @@ -625,7 +627,7 @@ struct Tabrec { typedef Ptr TabrecPtr; public: - Dbacc(Block_context&); + Dbacc(Block_context&, Uint32 instanceNumber = 0); virtual ~Dbacc(); // pointer to TUP instance in this thread === modified file 'storage/ndb/src/kernel/blocks/dbacc/DbaccInit.cpp' --- a/storage/ndb/src/kernel/blocks/dbacc/DbaccInit.cpp 2008-06-05 20:19:01 +0000 +++ b/storage/ndb/src/kernel/blocks/dbacc/DbaccInit.cpp 2008-07-26 05:13:40 +0000 @@ -86,8 +86,8 @@ void Dbacc::initRecords() ctablesize); }//Dbacc::initRecords() -Dbacc::Dbacc(Block_context& ctx): - SimulatedBlock(DBACC, ctx), +Dbacc::Dbacc(Block_context& ctx, Uint32 instanceNumber): + SimulatedBlock(DBACC, ctx, instanceNumber), c_tup(0) { BLOCK_CONSTRUCTOR(Dbacc); === added file 'storage/ndb/src/kernel/blocks/dbacc/DbaccProxy.cpp' --- a/storage/ndb/src/kernel/blocks/dbacc/DbaccProxy.cpp 1970-01-01 00:00:00 +0000 +++ b/storage/ndb/src/kernel/blocks/dbacc/DbaccProxy.cpp 2008-07-26 05:13:40 +0000 @@ -0,0 +1,34 @@ +/* Copyright (C) 2003 MySQL AB + + This program is free software; you can redistribute it and/or modify + it under the terms of the GNU General Public License as published by + the Free Software Foundation; version 2 of the License. + + This program is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + GNU General Public License for more details. + + You should have received a copy of the GNU General Public License + along with this program; if not, write to the Free Software + Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA */ + +#include "DbaccProxy.hpp" +#include "Dbacc.hpp" + +DbaccProxy::DbaccProxy(Block_context& ctx) : + LocalProxy(DBACC, ctx) +{ +} + +DbaccProxy::~DbaccProxy() +{ +} + +SimulatedBlock* +DbaccProxy::newWorker(Uint32 instanceNo) +{ + return new Dbacc(m_ctx, instanceNo); +} + +BLOCK_FUNCTIONS(DbaccProxy) === added file 'storage/ndb/src/kernel/blocks/dbacc/DbaccProxy.hpp' --- a/storage/ndb/src/kernel/blocks/dbacc/DbaccProxy.hpp 1970-01-01 00:00:00 +0000 +++ b/storage/ndb/src/kernel/blocks/dbacc/DbaccProxy.hpp 2008-07-26 05:13:40 +0000 @@ -0,0 +1,31 @@ +/* Copyright (C) 2003 MySQL AB + + This program is free software; you can redistribute it and/or modify + it under the terms of the GNU General Public License as published by + the Free Software Foundation; version 2 of the License. + + This program is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + GNU General Public License for more details. + + You should have received a copy of the GNU General Public License + along with this program; if not, write to the Free Software + Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA */ + +#ifndef NDB_DBACC_PROXY_HPP +#define NDB_DBACC_PROXY_HPP + +#include + +class DbaccProxy : public LocalProxy { +public: + DbaccProxy(Block_context& ctx); + virtual ~DbaccProxy(); + BLOCK_DEFINES(DbaccProxy); + +protected: + virtual SimulatedBlock* newWorker(Uint32 instanceNo); +}; + +#endif === modified file 'storage/ndb/src/kernel/blocks/dbdict/Dbdict.cpp' --- a/storage/ndb/src/kernel/blocks/dbdict/Dbdict.cpp 2008-07-01 12:35:34 +0000 +++ b/storage/ndb/src/kernel/blocks/dbdict/Dbdict.cpp 2008-07-25 05:48:32 +0000 @@ -1642,9 +1642,6 @@ Dbdict::Dbdict(Block_context& ctx): c_opDropEvent(c_opRecordPool), c_opSignalUtil(c_opRecordPool), c_opRecordSequence(0) -#ifdef VM_TRACE - ,debugOut(*new NullOutputStream()) -#endif { BLOCK_CONSTRUCTOR(Dbdict); @@ -2394,25 +2391,8 @@ void Dbdict::execREAD_CONFIG_REQ(Signal* new (ptr.p) DictObject(); objs.release(); } - -#ifdef VM_TRACE - { - FILE* debugFile = globalSignalLoggers.getOutputStream(); - if (debugFile != NULL) - debugOut = *new NdbOut(*new FileOutputStream(debugFile)); - } -#endif }//execSIZEALT_REP() -#ifdef VM_TRACE -bool -Dbdict::debugOutOn() const -{ - SignalLoggerManager::LogMode mask = SignalLoggerManager::LogInOut; - return globalSignalLoggers.logMatch(DBDICT, mask); -} -#endif - /* ---------------------------------------------------------------- */ // Start phase signals sent by CNTR. We reply with NDB_STTORRY when // we completed this phase. === modified file 'storage/ndb/src/kernel/blocks/dbdict/Dbdict.hpp' --- a/storage/ndb/src/kernel/blocks/dbdict/Dbdict.hpp 2008-06-05 20:34:20 +0000 +++ b/storage/ndb/src/kernel/blocks/dbdict/Dbdict.hpp 2008-07-25 05:48:32 +0000 @@ -74,20 +74,6 @@ #include #include -// Debug Macros - -#ifdef VM_TRACE -#define D(x) \ - do { \ - if (!debugOutOn()) break; \ - debugOut << "DBDICT:" << __LINE__ << " " << x << endl; \ - } while (0) -#define V(x) " " << #x << ":" << (x) -#else -#define D(x) -#undef V -#endif - #ifdef DBDICT_C /*--------------------------------------------------------------*/ @@ -3422,8 +3408,6 @@ public: int checkSingleUserMode(Uint32 senderRef); #ifdef VM_TRACE - NdbOut debugOut; - bool debugOutOn() const; friend NdbOut& operator<<(NdbOut& out, const DictObject&); friend NdbOut& operator<<(NdbOut& out, const ErrorInfo&); friend NdbOut& operator<<(NdbOut& out, const SchemaOp&); === modified file 'storage/ndb/src/kernel/blocks/dbdih/Dbdih.hpp' --- a/storage/ndb/src/kernel/blocks/dbdih/Dbdih.hpp 2008-06-05 20:31:21 +0000 +++ b/storage/ndb/src/kernel/blocks/dbdih/Dbdih.hpp 2008-07-25 05:48:32 +0000 @@ -1773,6 +1773,16 @@ private: bool c_sr_wait_to; NdbNodeBitmask m_sr_nodes; NdbNodeBitmask m_to_nodes; + + // block instances +public: + Uint32 dihGetInstanceKey(FragmentstorePtr tFragPtr) { + ndbrequire(!tFragPtr.isNull()); + Uint32 log_part_id = tFragPtr.p->m_log_part_id; + Uint32 instanceKey = 1 + log_part_id % MAX_NDBMT_WORKERS; + return instanceKey; + } + Uint32 dihGetInstanceKey(Uint32 tabId, Uint32 fragId); }; #if (DIH_CDATA_SIZE < _SYSFILE_SIZE32) === modified file 'storage/ndb/src/kernel/blocks/dbdih/DbdihMain.cpp' --- a/storage/ndb/src/kernel/blocks/dbdih/DbdihMain.cpp 2008-06-09 14:32:01 +0000 +++ b/storage/ndb/src/kernel/blocks/dbdih/DbdihMain.cpp 2008-07-25 05:48:32 +0000 @@ -16871,3 +16871,16 @@ do_send: } #endif + +// block instances +Uint32 +Dbdih::dihGetInstanceKey(Uint32 tabId, Uint32 fragId) +{ + TabRecordPtr tTabPtr; + tTabPtr.i = tabId; + ptrCheckGuard(tTabPtr, ctabFileSize, tabRecord); + FragmentstorePtr tFragPtr; + getFragstore(tTabPtr.p, fragId, tFragPtr); + Uint32 instanceKey = dihGetInstanceKey(tFragPtr); + return instanceKey; +} === modified file 'storage/ndb/src/kernel/blocks/dblqh/Dblqh.hpp' --- a/storage/ndb/src/kernel/blocks/dblqh/Dblqh.hpp 2008-06-05 20:34:20 +0000 +++ b/storage/ndb/src/kernel/blocks/dblqh/Dblqh.hpp 2008-07-26 05:13:40 +0000 @@ -400,6 +400,8 @@ class Dbtup; * - LOG */ class Dblqh: public SimulatedBlock { + friend class DblqhProxy; + public: enum LcpCloseState { LCP_IDLE = 0, @@ -2047,7 +2049,7 @@ public: }; public: - Dblqh(Block_context& ctx); + Dblqh(Block_context& ctx, Uint32 instanceNumber = 0); virtual ~Dblqh(); void receive_keyinfo(Signal*, Uint32 * data, Uint32 len); === modified file 'storage/ndb/src/kernel/blocks/dblqh/DblqhInit.cpp' --- a/storage/ndb/src/kernel/blocks/dblqh/DblqhInit.cpp 2008-06-05 20:19:01 +0000 +++ b/storage/ndb/src/kernel/blocks/dblqh/DblqhInit.cpp 2008-07-26 05:13:40 +0000 @@ -165,8 +165,8 @@ void Dblqh::initRecords() bat[1].bits.v = 5; }//Dblqh::initRecords() -Dblqh::Dblqh(Block_context& ctx): - SimulatedBlock(DBLQH, ctx), +Dblqh::Dblqh(Block_context& ctx, Uint32 instanceNumber): + SimulatedBlock(DBLQH, ctx, instanceNumber), c_lcp_waiting_fragments(c_fragment_pool), c_lcp_restoring_fragments(c_fragment_pool), c_lcp_complete_fragments(c_fragment_pool), === modified file 'storage/ndb/src/kernel/blocks/dblqh/DblqhMain.cpp' --- a/storage/ndb/src/kernel/blocks/dblqh/DblqhMain.cpp 2008-06-14 07:33:14 +0000 +++ b/storage/ndb/src/kernel/blocks/dblqh/DblqhMain.cpp 2008-07-23 11:36:53 +0000 @@ -9091,7 +9091,7 @@ Dblqh::copy_bounds(Uint32 * dst, TcConne if(len < left) { - offset = len; + offset = tcPtrP->m_offset_current_keybuf + len; } else { === added file 'storage/ndb/src/kernel/blocks/dblqh/DblqhProxy.cpp' --- a/storage/ndb/src/kernel/blocks/dblqh/DblqhProxy.cpp 1970-01-01 00:00:00 +0000 +++ b/storage/ndb/src/kernel/blocks/dblqh/DblqhProxy.cpp 2008-07-26 05:13:40 +0000 @@ -0,0 +1,49 @@ +/* Copyright (C) 2003 MySQL AB + + This program is free software; you can redistribute it and/or modify + it under the terms of the GNU General Public License as published by + the Free Software Foundation; version 2 of the License. + + This program is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + GNU General Public License for more details. + + You should have received a copy of the GNU General Public License + along with this program; if not, write to the Free Software + Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA */ + +#include "DblqhProxy.hpp" +#include "Dblqh.hpp" + +DblqhProxy::DblqhProxy(Block_context& ctx) : + LocalProxy(DBLQH, ctx) +{ + // GSN_SEND_PACKED + addRecSignal(GSN_SEND_PACKED, &DblqhProxy::execSEND_PACKED); +} + +DblqhProxy::~DblqhProxy() +{ +} + +SimulatedBlock* +DblqhProxy::newWorker(Uint32 instanceNo) +{ + return new Dblqh(m_ctx, instanceNo); +} + +// GSN_SEND_PACKED + +void +DblqhProxy::execSEND_PACKED(Signal* signal) +{ + Uint32 i; + for (i = 0; i < c_workers; i++) { + ndbrequire(c_worker[i] != 0); + Dblqh* dblqh = static_cast(c_worker[i]); + dblqh->execSEND_PACKED(signal); + } +} + +BLOCK_FUNCTIONS(DblqhProxy) === added file 'storage/ndb/src/kernel/blocks/dblqh/DblqhProxy.hpp' --- a/storage/ndb/src/kernel/blocks/dblqh/DblqhProxy.hpp 1970-01-01 00:00:00 +0000 +++ b/storage/ndb/src/kernel/blocks/dblqh/DblqhProxy.hpp 2008-07-26 05:13:40 +0000 @@ -0,0 +1,34 @@ +/* Copyright (C) 2003 MySQL AB + + This program is free software; you can redistribute it and/or modify + it under the terms of the GNU General Public License as published by + the Free Software Foundation; version 2 of the License. + + This program is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + GNU General Public License for more details. + + You should have received a copy of the GNU General Public License + along with this program; if not, write to the Free Software + Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA */ + +#ifndef NDB_DBLQH_PROXY_HPP +#define NDB_DBLQH_PROXY_HPP + +#include + +class DblqhProxy : public LocalProxy { +public: + DblqhProxy(Block_context& ctx); + virtual ~DblqhProxy(); + BLOCK_DEFINES(DblqhProxy); + + // GSN_SEND_PACKED + void execSEND_PACKED(Signal*); + +protected: + virtual SimulatedBlock* newWorker(Uint32 instanceNo); +}; + +#endif === modified file 'storage/ndb/src/kernel/blocks/dbtup/Dbtup.hpp' --- a/storage/ndb/src/kernel/blocks/dbtup/Dbtup.hpp 2008-06-05 20:34:20 +0000 +++ b/storage/ndb/src/kernel/blocks/dbtup/Dbtup.hpp 2008-07-26 05:13:40 +0000 @@ -344,6 +344,7 @@ inline const Uint32* ALIGN_WORD(const vo #endif class Dbtup: public SimulatedBlock { +friend class DbtupProxy; friend class Suma; public: struct KeyReqStruct; @@ -1651,7 +1652,7 @@ struct TupHeadInfo { Uint32 terrorCode; public: - Dbtup(Block_context&, Pgman*); + Dbtup(Block_context&, Pgman*, Uint32 instanceNumber = 0); virtual ~Dbtup(); /* === modified file 'storage/ndb/src/kernel/blocks/dbtup/DbtupGen.cpp' --- a/storage/ndb/src/kernel/blocks/dbtup/DbtupGen.cpp 2008-06-05 20:19:01 +0000 +++ b/storage/ndb/src/kernel/blocks/dbtup/DbtupGen.cpp 2008-07-26 05:13:40 +0000 @@ -49,8 +49,8 @@ void Dbtup::initData() cpackedListIndex = 0; }//Dbtup::initData() -Dbtup::Dbtup(Block_context& ctx, Pgman* pgman) - : SimulatedBlock(DBTUP, ctx), +Dbtup::Dbtup(Block_context& ctx, Pgman* pgman, Uint32 instanceNumber) + : SimulatedBlock(DBTUP, ctx, instanceNumber), c_lqh(0), m_pgman(this, pgman), c_extent_hash(c_extent_pool), === added file 'storage/ndb/src/kernel/blocks/dbtup/DbtupProxy.cpp' --- a/storage/ndb/src/kernel/blocks/dbtup/DbtupProxy.cpp 1970-01-01 00:00:00 +0000 +++ b/storage/ndb/src/kernel/blocks/dbtup/DbtupProxy.cpp 2008-07-26 05:13:40 +0000 @@ -0,0 +1,47 @@ +/* Copyright (C) 2003 MySQL AB + + This program is free software; you can redistribute it and/or modify + it under the terms of the GNU General Public License as published by + the Free Software Foundation; version 2 of the License. + + This program is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + GNU General Public License for more details. + + You should have received a copy of the GNU General Public License + along with this program; if not, write to the Free Software + Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA */ + +#include "DbtupProxy.hpp" +#include "Dbtup.hpp" + +DbtupProxy::DbtupProxy(Block_context& ctx) : + LocalProxy(DBTUP, ctx) +{ + addRecSignal(GSN_SEND_PACKED, &DbtupProxy::execSEND_PACKED); +} + +DbtupProxy::~DbtupProxy() +{ +} + +SimulatedBlock* +DbtupProxy::newWorker(Uint32 instanceNo) +{ + return new Dbtup(m_ctx, 0, instanceNo); +} + +// GSN_SEND_PACKED + +void +DbtupProxy::execSEND_PACKED(Signal* signal) +{ + Uint32 i; + for (i = 0; i < c_workers; i++) { + Dbtup* dbtup = static_cast(c_worker[i]); + dbtup->execSEND_PACKED(signal); + } +} + +BLOCK_FUNCTIONS(DbtupProxy) === added file 'storage/ndb/src/kernel/blocks/dbtup/DbtupProxy.hpp' --- a/storage/ndb/src/kernel/blocks/dbtup/DbtupProxy.hpp 1970-01-01 00:00:00 +0000 +++ b/storage/ndb/src/kernel/blocks/dbtup/DbtupProxy.hpp 2008-07-26 05:13:40 +0000 @@ -0,0 +1,34 @@ +/* Copyright (C) 2003 MySQL AB + + This program is free software; you can redistribute it and/or modify + it under the terms of the GNU General Public License as published by + the Free Software Foundation; version 2 of the License. + + This program is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + GNU General Public License for more details. + + You should have received a copy of the GNU General Public License + along with this program; if not, write to the Free Software + Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA */ + +#ifndef NDB_DBTUP_PROXY +#define NDB_DBTUP_PROXY + +#include + +class DbtupProxy : public LocalProxy { +public: + DbtupProxy(Block_context& ctx); + virtual ~DbtupProxy(); + BLOCK_DEFINES(DbtupProxy); + +protected: + virtual SimulatedBlock* newWorker(Uint32 instanceNo); + + // GSN_SEND_PACKED + void execSEND_PACKED(Signal*); +}; + +#endif === modified file 'storage/ndb/src/kernel/blocks/dbtux/Dbtux.hpp' --- a/storage/ndb/src/kernel/blocks/dbtux/Dbtux.hpp 2008-06-05 20:19:01 +0000 +++ b/storage/ndb/src/kernel/blocks/dbtux/Dbtux.hpp 2008-07-26 05:13:40 +0000 @@ -101,8 +101,9 @@ class Configuration; class Dbtux : public SimulatedBlock { + friend class DbtuxProxy; public: - Dbtux(Block_context& ctx); + Dbtux(Block_context& ctx, Uint32 instanceNumber = 0); virtual ~Dbtux(); // pointer to TUP instance in this thread === modified file 'storage/ndb/src/kernel/blocks/dbtux/DbtuxGen.cpp' --- a/storage/ndb/src/kernel/blocks/dbtux/DbtuxGen.cpp 2008-06-05 20:19:01 +0000 +++ b/storage/ndb/src/kernel/blocks/dbtux/DbtuxGen.cpp 2008-07-26 05:13:40 +0000 @@ -18,8 +18,8 @@ #include -Dbtux::Dbtux(Block_context& ctx) : - SimulatedBlock(DBTUX, ctx), +Dbtux::Dbtux(Block_context& ctx, Uint32 instanceNumber) : + SimulatedBlock(DBTUX, ctx, instanceNumber), c_tup(0), c_descPageList(RNIL), #ifdef VM_TRACE === added file 'storage/ndb/src/kernel/blocks/dbtux/DbtuxProxy.cpp' --- a/storage/ndb/src/kernel/blocks/dbtux/DbtuxProxy.cpp 1970-01-01 00:00:00 +0000 +++ b/storage/ndb/src/kernel/blocks/dbtux/DbtuxProxy.cpp 2008-07-26 05:13:40 +0000 @@ -0,0 +1,34 @@ +/* Copyright (C) 2003 MySQL AB + + This program is free software; you can redistribute it and/or modify + it under the terms of the GNU General Public License as published by + the Free Software Foundation; version 2 of the License. + + This program is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + GNU General Public License for more details. + + You should have received a copy of the GNU General Public License + along with this program; if not, write to the Free Software + Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA */ + +#include "DbtuxProxy.hpp" +#include "Dbtux.hpp" + +DbtuxProxy::DbtuxProxy(Block_context& ctx) : + LocalProxy(DBTUX, ctx) +{ +} + +DbtuxProxy::~DbtuxProxy() +{ +} + +SimulatedBlock* +DbtuxProxy::newWorker(Uint32 instanceNo) +{ + return new Dbtux(m_ctx, instanceNo); +} + +BLOCK_FUNCTIONS(DbtuxProxy) === added file 'storage/ndb/src/kernel/blocks/dbtux/DbtuxProxy.hpp' --- a/storage/ndb/src/kernel/blocks/dbtux/DbtuxProxy.hpp 1970-01-01 00:00:00 +0000 +++ b/storage/ndb/src/kernel/blocks/dbtux/DbtuxProxy.hpp 2008-07-26 05:13:40 +0000 @@ -0,0 +1,31 @@ +/* Copyright (C) 2003 MySQL AB + + This program is free software; you can redistribute it and/or modify + it under the terms of the GNU General Public License as published by + the Free Software Foundation; version 2 of the License. + + This program is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + GNU General Public License for more details. + + You should have received a copy of the GNU General Public License + along with this program; if not, write to the Free Software + Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA */ + +#ifndef NDB_DBTUX_PROXY_HPP +#define NDB_DBTUX_PROXY_HPP + +#include + +class DbtuxProxy : public LocalProxy { +public: + DbtuxProxy(Block_context& ctx); + virtual ~DbtuxProxy(); + BLOCK_DEFINES(DbtuxProxy); + +protected: + virtual SimulatedBlock* newWorker(Uint32 instanceNo); +}; + +#endif === modified file 'storage/ndb/src/kernel/blocks/ndbfs/PosixAsyncFile.cpp' --- a/storage/ndb/src/kernel/blocks/ndbfs/PosixAsyncFile.cpp 2008-04-09 10:41:03 +0000 +++ b/storage/ndb/src/kernel/blocks/ndbfs/PosixAsyncFile.cpp 2008-07-25 05:48:32 +0000 @@ -335,10 +335,8 @@ no_odirect: req->data.pageData[0] = m_page_ptr.i; m_fs.EXECUTE_DIRECT(block, GSN_FSWRITEREQ, signal, - FsReadWriteReq::FixedLength + 1 -#ifdef VM_TRACE - , true // This EXECUTE_DIRECT is thread safe -#endif + FsReadWriteReq::FixedLength + 1, + 0 // wl4391_todo This EXECUTE_DIRECT is thread safe ); retry: Uint32 size = request->par.open.page_size; === modified file 'storage/ndb/src/kernel/blocks/restore.cpp' --- a/storage/ndb/src/kernel/blocks/restore.cpp 2008-03-18 07:12:39 +0000 +++ b/storage/ndb/src/kernel/blocks/restore.cpp 2008-07-26 05:13:40 +0000 @@ -32,8 +32,8 @@ #define PAGES LCP_RESTORE_BUFFER -Restore::Restore(Block_context& ctx) : - SimulatedBlock(RESTORE, ctx), +Restore::Restore(Block_context& ctx, Uint32 instanceNumber) : + SimulatedBlock(RESTORE, ctx, instanceNumber), m_file_list(m_file_pool), m_file_hash(m_file_pool) { === modified file 'storage/ndb/src/kernel/blocks/restore.hpp' --- a/storage/ndb/src/kernel/blocks/restore.hpp 2008-03-18 07:12:39 +0000 +++ b/storage/ndb/src/kernel/blocks/restore.hpp 2008-07-26 05:13:40 +0000 @@ -28,8 +28,10 @@ class Restore : public SimulatedBlock { + friend class RestoreProxy; + public: - Restore(Block_context& ctx); + Restore(Block_context& ctx, Uint32 instanceNumber = 0); virtual ~Restore(); BLOCK_DEFINES(Restore); === modified file 'storage/ndb/src/kernel/main.cpp' --- a/storage/ndb/src/kernel/main.cpp 2008-04-23 13:52:37 +0000 +++ b/storage/ndb/src/kernel/main.cpp 2008-07-25 05:48:32 +0000 @@ -1,4 +1,4 @@ -/* Copyright (C) 2003 MySQL AB + /* Copyright (C) 2003 MySQL AB This program is free software; you can redistribute it and/or modify it under the terms of the GNU General Public License as published by @@ -33,6 +33,7 @@ #include #include +#include #include @@ -285,6 +286,56 @@ init_global_memory_manager(EmulatorData return 0; // Success } +bool g_ndbMt = false; +bool g_ndbMtLqh = false; + +static int +get_multithreaded_config(EmulatorData& ed) +{ + // multithreaded is compiled in ndbd/ndbmtd + g_ndbMt = SimulatedBlock::isMultiThreaded(); + if (!g_ndbMt) + return 0; + + // mt lqh via environment during development + { + const char* p = NdbEnv_GetEnv("NDB_MT_LQH", (char*)0, 0); + if (p != 0 && strchr("1Y", p[0]) != 0) + g_ndbMtLqh = true; + if (!g_ndbMtLqh) + return 0; + } + + const ndb_mgm_configuration_iterator * p = + ed.theConfiguration->getOwnConfigIterator(); + if (p == 0) + { + abort(); + } + + Uint32 workers = 0; + Uint32 threads = 0; + if (ndb_mgm_get_int_parameter(p, CFG_NDBMT_WORKERS, &workers) || + ndb_mgm_get_int_parameter(p, CFG_NDBMT_THREADS, &threads)) + { + g_eventLogger->alert("Failed to get CFG_NDBMT parameters from " + "config, exiting."); + return -1; + } + + ndbout << "NDBMT: workers=" << workers + << " threads=" << threads << endl; + + assert(workers != 0 && workers <= MAX_NDBMT_WORKERS); + assert(threads != 0 && threads <= MAX_NDBMT_THREADS); + assert(workers % threads == 0); + + globalData.ndbmtWorkers = workers; + globalData.ndbmtThreads = threads; + return 0; +} + + int main(int argc, char** argv) { NDB_INIT(argv[0]); @@ -480,28 +531,41 @@ int main(int argc, char** argv) globalEmulatorData.theWatchDog->unregisterWatchedThread(0); } - globalEmulatorData.theThreadConfig->init(&globalEmulatorData); + if (get_multithreaded_config(globalEmulatorData)) + return -1; - // Load blocks - globalEmulatorData.theSimBlockList->load(globalEmulatorData); - - // Set thread concurrency for Solaris' light weight processes - int status; - status = NdbThread_SetConcurrencyLevel(30); - assert(status == 0); + globalEmulatorData.theThreadConfig->init(&globalEmulatorData); #ifdef VM_TRACE - // Create a signal logger + // Create a signal logger before block constructors char *buf= NdbConfig_SignalLogFileName(globalData.ownId); NdbAutoPtr tmp_aptr(buf); FILE * signalLog = fopen(buf, "a"); globalSignalLoggers.setOwnNodeId(globalData.ownId); globalSignalLoggers.setOutputStream(signalLog); -#if 0 // to log startup - globalSignalLoggers.log(SignalLoggerManager::LogInOut, "BLOCK=DBDICT,DBDIH"); - globalData.testOn = 1; +#if 1 // to log startup + { const char* p = NdbEnv_GetEnv("NDB_SIGNAL_LOG", (char*)0, 0); + if (p != 0) { + char buf[200]; + snprintf(buf, sizeof(buf), "BLOCK=%s", p); + for (char* q = buf; *q != 0; q++) *q = toupper(toascii(*q)); + globalSignalLoggers.log(SignalLoggerManager::LogInOut, buf); + globalData.testOn = 1; + assert(signalLog != 0); + fprintf(signalLog, "START\n"); + fflush(signalLog); + } + } #endif #endif + + // Load blocks + globalEmulatorData.theSimBlockList->load(globalEmulatorData); + + // Set thread concurrency for Solaris' light weight processes + int status; + status = NdbThread_SetConcurrencyLevel(30); + assert(status == 0); catchsigs(false); === modified file 'storage/ndb/src/kernel/vm/Configuration.cpp' --- a/storage/ndb/src/kernel/vm/Configuration.cpp 2008-05-29 15:06:11 +0000 +++ b/storage/ndb/src/kernel/vm/Configuration.cpp 2008-07-25 05:48:32 +0000 @@ -999,6 +999,17 @@ Configuration::calcSizeAlt(ConfigValues cfg.put(CFG_TUX_SCAN_OP, noOfLocalScanRecords); } + // NDBMT + { + uint workers = 4; + uint threads = 2; + assert(workers <= MAX_NDBMT_WORKERS); + assert(threads <= MAX_NDBMT_THREADS); + assert(workers % threads == 0); + cfg.put(CFG_NDBMT_WORKERS, workers); + cfg.put(CFG_NDBMT_THREADS, threads); + } + m_ownConfig = (ndb_mgm_configuration*)cfg.getConfigValues(); m_ownConfigIterator = ndb_mgm_create_configuration_iterator (m_ownConfig, 0); === added file 'storage/ndb/src/kernel/vm/GlobalData.cpp' --- a/storage/ndb/src/kernel/vm/GlobalData.cpp 1970-01-01 00:00:00 +0000 +++ b/storage/ndb/src/kernel/vm/GlobalData.cpp 2008-07-25 05:48:32 +0000 @@ -0,0 +1,28 @@ +/* Copyright (C) 2003 MySQL AB + + This program is free software; you can redistribute it and/or modify + it under the terms of the GNU General Public License as published by + the Free Software Foundation; version 2 of the License. + + This program is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + GNU General Public License for more details. + + You should have received a copy of the GNU General Public License + along with this program; if not, write to the Free Software + Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA */ + +#include +#include +#include +#include "GlobalData.hpp" + +SimulatedBlock* +GlobalData::getBlock(BlockNumber blockNo, Uint32 instanceNo) +{ + SimulatedBlock* b = getBlock(blockNo); + if (b != 0 && instanceNo != 0) + b = b->getInstance(instanceNo); + return b; +} === modified file 'storage/ndb/src/kernel/vm/GlobalData.hpp' --- a/storage/ndb/src/kernel/vm/GlobalData.hpp 2008-01-01 12:45:11 +0000 +++ b/storage/ndb/src/kernel/vm/GlobalData.hpp 2008-07-25 05:48:32 +0000 @@ -67,11 +67,16 @@ struct GlobalData { Uint32 sendPackedActivated; Uint32 activateSendPacked; + + Uint32 ndbmtWorkers; + Uint32 ndbmtThreads; GlobalData(){ theSignalId = 0; theStartLevel = NodeState::SL_NOTHING; theRestartFlag = perform_start; + ndbmtWorkers = 0; + ndbmtThreads = 0; #ifdef GCP_TIMER_HACK gcp_timer_limit = 0; #endif @@ -80,6 +85,7 @@ struct GlobalData { void setBlock(BlockNumber blockNo, SimulatedBlock * block); SimulatedBlock * getBlock(BlockNumber blockNo); + SimulatedBlock * getBlock(BlockNumber blockNo, Uint32 instanceNo); void incrementWatchDogCounter(Uint32 place); Uint32 * getWatchDogPtr(); === modified file 'storage/ndb/src/kernel/vm/Makefile.am' --- a/storage/ndb/src/kernel/vm/Makefile.am 2008-04-25 16:53:41 +0000 +++ b/storage/ndb/src/kernel/vm/Makefile.am 2008-07-25 05:48:32 +0000 @@ -36,13 +36,15 @@ libkernel_a_SOURCES = \ Rope.cpp \ ndbd_malloc.cpp \ ndbd_malloc_impl.cpp \ - Pool.cpp WOPool.cpp RWPool.cpp DynArr256.cpp LockQueue.cpp + Pool.cpp WOPool.cpp RWPool.cpp DynArr256.cpp LockQueue.cpp \ + GlobalData.cpp libsched_a_SOURCES = TimeQueue.cpp \ ThreadConfig.cpp \ FastScheduler.cpp \ TransporterCallback_nonmt.cpp \ - SimulatedBlock_nonmt.cpp + SimulatedBlock_nonmt.cpp \ + dummy_nonmt.cpp libsched_mt_a_SOURCES = SimulatedBlock_mt.cpp \ TransporterCallback_mt.cpp \ === modified file 'storage/ndb/src/kernel/vm/SimulatedBlock.cpp' --- a/storage/ndb/src/kernel/vm/SimulatedBlock.cpp 2008-07-01 12:35:34 +0000 +++ b/storage/ndb/src/kernel/vm/SimulatedBlock.cpp 2008-07-25 05:48:32 +0000 @@ -24,6 +24,7 @@ #include "SimulatedBlock.hpp" #include +#include #include #include #include @@ -46,6 +47,8 @@ #include #include +#include "../blocks/dbdih/Dbdih.hpp" + #include extern EventLogger * g_eventLogger; @@ -56,10 +59,15 @@ extern EventLogger * g_eventLogger; // Constructor, Destructor // SimulatedBlock::SimulatedBlock(BlockNumber blockNumber, - struct Block_context & ctx) + struct Block_context & ctx, + Uint32 instanceNumber) : theNodeId(globalData.ownId), theNumber(blockNumber), - theReference(numberToRef(blockNumber, globalData.ownId)), + theInstance(instanceNumber), + theReference(numberToRef(blockNumber, instanceNumber, globalData.ownId)), + theInstanceCount(0), + theInstanceList(0), + theMainInstance(0), m_ctx(ctx), m_global_page_pool(globalData.m_global_page_pool), m_shared_page_pool(globalData.m_shared_page_pool), @@ -68,13 +76,41 @@ SimulatedBlock::SimulatedBlock(BlockNumb c_segmentedFragmentSendList(c_fragmentSendPool), c_mutexMgr(* this), c_counterMgr(* this) +#ifdef VM_TRACE + ,debugOut(*new NdbOut(*new FileOutputStream(globalSignalLoggers.getOutputStream()))) +#endif { m_threadId = 0; m_watchDogCounter = NULL; m_jamBuffer = (EmulatedJamBuffer *)NdbThread_GetTlsKey(NDB_THREAD_TLS_JAM); NewVarRef = 0; - globalData.setBlock(blockNumber, this); + SimulatedBlock* main = globalData.getBlock(blockNumber); + + if (theInstance == 0) { + ndbrequire(main == 0); + main = this; + globalData.setBlock(blockNumber, main); + main->theInstanceCount = 1; + } else { + ndbrequire(main != 0); + ndbrequire(theInstance == main->theInstanceCount); + ndbrequire(theInstance < MaxInstances); + if (theInstance == 1) { + ndbrequire(main->theInstanceList == 0); + main->theInstanceList = new SimulatedBlock* [MaxInstances]; + Uint32 i; + for (i = 0; i < MaxInstances; i++) + main->theInstanceList[i] = 0; + } else { + ndbrequire(main->theInstanceList != 0); + } + ndbrequire(main->theInstanceList[theInstance] == 0); + main->theInstanceList[theInstance] = this; + main->theInstanceCount = theInstance + 1; + } + theMainInstance = main; + c_fragmentIdCounter = 1; c_fragSenderRunning = false; @@ -130,6 +166,14 @@ SimulatedBlock::~SimulatedBlock() #ifdef VM_TRACE delete [] m_global_variables; #endif + + if (theInstanceList != 0) { + Uint32 i; + for (i = 0; i < theInstanceCount; i++) + delete theInstanceList[i]; + delete [] theInstanceList; + } + theInstanceList = 0; } void @@ -172,6 +216,15 @@ SimulatedBlock::addRecSignalImpl(GlobalS theExecArray[gsn] = f; } +Uint32 +SimulatedBlock::getInstanceKey(Uint32 tabId, Uint32 fragId) +{ + Dbdih* dbdih = (Dbdih*)globalData.getBlock(DBDIH); + ndbrequire(dbdih != 0); + Uint32 instanceKey = dbdih->dihGetInstanceKey(tabId, fragId); + return instanceKey; +} + void SimulatedBlock::assignToThread(Uint32 threadId, EmulatedJamBuffer *jamBuffer, Uint32 *watchDogCounter) @@ -509,9 +562,9 @@ SimulatedBlock::sendSignal(BlockReferenc signal->header.theSendersBlockRef = sendBRef; #ifdef NDBD_MULTITHREADED if (jobBuffer == JBB) - sendlocal(m_threadId, recBlock, &signal->header, signal->theData, NULL); + sendlocal(m_threadId, &signal->header, signal->theData, NULL); else - sendprioa(m_threadId, recBlock, &signal->header, signal->theData, NULL); + sendprioa(m_threadId, &signal->header, signal->theData, NULL); #else globalScheduler.execute(signal, jobBuffer, recBlock, gsn); @@ -607,9 +660,9 @@ SimulatedBlock::sendSignal(NodeReceiverG #ifdef NDBD_MULTITHREADED if (jobBuffer == JBB) - sendlocal(m_threadId, recBlock, &signal->header, signal->theData, NULL); + sendlocal(m_threadId, &signal->header, signal->theData, NULL); else - sendprioa(m_threadId, recBlock, &signal->header, signal->theData, NULL); + sendprioa(m_threadId, &signal->header, signal->theData, NULL); #else globalScheduler.execute(signal, jobBuffer, recBlock, gsn); #endif @@ -716,10 +769,10 @@ SimulatedBlock::sendSignal(BlockReferenc #ifdef NDBD_MULTITHREADED if (jobBuffer == JBB) - sendlocal(m_threadId, recBlock, &signal->header, signal->theData, + sendlocal(m_threadId, &signal->header, signal->theData, signal->theData+length); else - sendprioa(m_threadId, recBlock, &signal->header, signal->theData, + sendprioa(m_threadId, &signal->header, signal->theData, signal->theData+length); #else globalScheduler.execute(signal, jobBuffer, recBlock, @@ -831,10 +884,10 @@ SimulatedBlock::sendSignal(NodeReceiverG #ifdef NDBD_MULTITHREADED if (jobBuffer == JBB) - sendlocal(m_threadId, recBlock, &signal->header, signal->theData, + sendlocal(m_threadId, &signal->header, signal->theData, signal->theData + length); else - sendprioa(m_threadId, recBlock, &signal->header, signal->theData, + sendprioa(m_threadId, &signal->header, signal->theData, signal->theData + length); #else globalScheduler.execute(signal, jobBuffer, recBlock, gsn); @@ -944,10 +997,10 @@ SimulatedBlock::sendSignal(BlockReferenc #ifdef NDBD_MULTITHREADED if (jobBuffer == JBB) - sendlocal(m_threadId, recBlock, &signal->header, signal->theData, + sendlocal(m_threadId, &signal->header, signal->theData, signal->theData + length); else - sendprioa(m_threadId, recBlock, &signal->header, signal->theData, + sendprioa(m_threadId, &signal->header, signal->theData, signal->theData + length); #else globalScheduler.execute(signal, jobBuffer, recBlock, gsn); @@ -1059,10 +1112,10 @@ SimulatedBlock::sendSignal(NodeReceiverG * dst ++ = sections->m_ptr[2].i; #ifdef NDBD_MULTITHREADED if (jobBuffer == JBB) - sendlocal(m_threadId, recBlock, &signal->header, signal->theData, + sendlocal(m_threadId, &signal->header, signal->theData, signal->theData + length); else - sendprioa(m_threadId, recBlock, &signal->header, signal->theData, + sendprioa(m_threadId, &signal->header, signal->theData, signal->theData + length); #else globalScheduler.execute(signal, jobBuffer, recBlock, gsn); @@ -1246,16 +1299,20 @@ SimulatedBlock::freeBat(){ } const NewVARIABLE * -SimulatedBlock::getBat(Uint16 blockNo){ +SimulatedBlock::getBat(Uint16 blockNo, Uint32 instanceNo){ SimulatedBlock * sb = globalData.getBlock(blockNo); + if (sb != 0 && instanceNo != 0) + sb = sb->getInstance(instanceNo); if(sb == 0) return 0; return sb->NewVarRef; } Uint16 -SimulatedBlock::getBatSize(Uint16 blockNo){ +SimulatedBlock::getBatSize(Uint16 blockNo, Uint32 instanceNo){ SimulatedBlock * sb = globalData.getBlock(blockNo); + if (sb != 0 && instanceNo != 0) + sb = sb->getInstance(instanceNo); if(sb == 0) return 0; return sb->theBATSize; @@ -1418,7 +1475,7 @@ SimulatedBlock::infoEvent(const char * m signalT.header.theLength = ((len+3)/4)+1; #ifdef NDBD_MULTITHREADED - sendlocal(m_threadId, signalT.header.theReceiversBlockNumber, + sendlocal(m_threadId, &signalT.header, signalT.theData, signalT.m_sectionPtrI); #else globalScheduler.execute(&signalT.header, JBB, signalT.theData, @@ -1463,7 +1520,7 @@ SimulatedBlock::warningEvent(const char signalT.header.theLength = ((len+3)/4)+1; #ifdef NDBD_MULTITHREADED - sendlocal(m_threadId, signalT.header.theReceiversBlockNumber, + sendlocal(m_threadId, &signalT.header, signalT.theData, signalT.m_sectionPtrI); #else globalScheduler.execute(&signalT.header, JBB, signalT.theData, @@ -1621,7 +1678,7 @@ SimulatedBlock::printTimes(FILE * output } } sum = (sum + 500)/ 1000; - fprintf(output, "-- %s : %d --\n", getBlockName(number()), sum); + fprintf(output, "-- %s : %u --\n", getBlockName(number()), (Uint32)sum); fprintf(output, "\n"); fflush(output); } @@ -2387,7 +2444,7 @@ SimulatedBlock::setNodeInfo(NodeId nodeI } bool -SimulatedBlock::isMultiThreaded() const +SimulatedBlock::isMultiThreaded() { #ifdef NDBD_MULTITHREADED return true; @@ -2720,3 +2777,37 @@ SimulatedBlock::create_distr_key(Uint32 } CArray g_key_descriptor_pool; + +#ifdef VM_TRACE +bool +SimulatedBlock::debugOutOn() const +{ + SignalLoggerManager::LogMode mask = SignalLoggerManager::LogInOut; + return globalSignalLoggers.logMatch(number(), mask); +} + +const char* +SimulatedBlock::debugOutTag(char *buf, int line) +{ + char blockbuf[40]; + char instancebuf[40]; + char linebuf[40]; + char timebuf[40]; + sprintf(blockbuf, "%s", getBlockName(number(), "UNKNOWN")); + instancebuf[0] = 0; + if (instance() != 0) + sprintf(instancebuf, "/%u", instance()); + sprintf(linebuf, " %d", line); + timebuf[0] = 0; +#ifdef VM_TRACE_TIME + { + NDB_TICKS t = NdbTick_CurrentMillisecond(); + uint s = (t / 1000) % 3600; + uint ms = t % 1000; + sprintf(timebuf, " - %u.%03u -", s, ms); + } +#endif + sprintf(buf, "%s%s%s%s ", blockbuf, instancebuf, linebuf, timebuf); + return buf; +} +#endif === modified file 'storage/ndb/src/kernel/vm/SimulatedBlock.hpp' --- a/storage/ndb/src/kernel/vm/SimulatedBlock.hpp 2008-06-17 20:28:45 +0000 +++ b/storage/ndb/src/kernel/vm/SimulatedBlock.hpp 2008-07-25 05:48:32 +0000 @@ -56,6 +56,19 @@ #include "ndbd_malloc_impl.hpp" #include +#ifdef VM_TRACE +#define D(x) \ + do { \ + char buf[200]; \ + if (!debugOutOn()) break; \ + debugOut << debugOutTag(buf, __LINE__) << x << endl; \ + } while (0) +#define V(x) " " << #x << ":" << (x) +#else +#define D(x) +#undef V +#endif + /** * Something for filesystem access */ @@ -108,7 +121,8 @@ protected: * Constructor */ SimulatedBlock(BlockNumber blockNumber, - struct Block_context & ctx); + struct Block_context & ctx, + Uint32 instanceNumber = 0); /********************************************************** * Handling of execFunctions @@ -125,12 +139,34 @@ public: */ inline void executeFunction(GlobalSignalNumber gsn, Signal* signal); + /* Multiple block instances */ + Uint32 instance() const { + return theInstance; + } + Uint32 getWorkerCount() const { + ndbrequire(theInstance == 0); // valid only on main instance + ndbrequire(theInstanceCount >= 1); + return theInstanceCount - 1; + } + SimulatedBlock* getInstance(Uint32 instanceNumber) { + ndbrequire(theInstance == 0); + if (instanceNumber == 0) + return this; + if (instanceNumber < theInstanceCount) { + ndbrequire(theInstanceList != 0); + return theInstanceList[instanceNumber]; + } + return 0; + } + Uint32 getInstanceKey(Uint32 tabId, Uint32 fragId); + /* Setup state of a block object for executing in a particular thread. */ void assignToThread(Uint32 threadId, EmulatedJamBuffer *jamBuffer, Uint32 *watchDogCounter); /* For multithreaded ndbd, get the id of owning thread. */ uint32 getThreadId() const { return m_threadId; } - bool isMultiThreaded() const; + static bool isMultiThreaded(); + public: typedef void (SimulatedBlock::* CallbackFunction)(class Signal*, Uint32 callbackData, @@ -210,14 +246,15 @@ protected: Uint32 length, SectionHandle* sections) const; + /* + * Instance defaults to instance of sender. Using explicit + * instance argument asserts that the call is thread-safe. + */ void EXECUTE_DIRECT(Uint32 block, Uint32 gsn, Signal* signal, - Uint32 len -#ifdef VM_TRACE - , bool is_thread_safe = false -#endif -); + Uint32 len, + Uint32 givenInstanceNo = ZNIL); class SectionSegmentPool& getSectionSegmentPool(); void releaseSections(struct SectionHandle&); @@ -398,8 +435,18 @@ private: void signal_error(Uint32, Uint32, Uint32, const char*, int) const ; const NodeId theNodeId; const BlockNumber theNumber; + const Uint32 theInstance; const BlockReference theReference; /* + * Instance 0 is the main instance. It creates/owns other instances. + * In MT LQH main instance is the LQH proxy and the others ("workers") + * are real LQHs run by multiple threads. + */ + enum { MaxInstances = 1 + MAX_NDBMT_WORKERS }; + Uint32 theInstanceCount; // set in main + SimulatedBlock** theInstanceList; // set in main, indexed by instance + SimulatedBlock* theMainInstance; // set in all + /* Thread id currently executing this block. Not used in singlethreaded ndbd. */ @@ -416,8 +463,10 @@ protected: Block_context m_ctx; NewVARIABLE* allocateBat(int batSize); void freeBat(); - static const NewVARIABLE* getBat (BlockNumber blockNo); - static Uint16 getBatSize(BlockNumber blockNo); + static const NewVARIABLE* getBat (BlockNumber blockNo, + Uint32 instanceNo = 0); + static Uint16 getBatSize(BlockNumber blockNo, + Uint32 instanceNo = 0); static BlockReference calcTcBlockRef (NodeId aNode); static BlockReference calcLqhBlockRef (NodeId aNode); @@ -629,6 +678,12 @@ public: void clear_global_variables(); void init_globals_list(void ** tmp, size_t cnt); #endif + +#ifdef VM_TRACE + NdbOut debugOut; + bool debugOutOn() const; + const char* debugOutTag(char* buf, int line); +#endif }; inline @@ -806,17 +861,32 @@ void SimulatedBlock::EXECUTE_DIRECT(Uint32 block, Uint32 gsn, Signal* signal, - Uint32 len -#ifdef VM_TRACE - , bool is_thread_safe -#endif -) + Uint32 len, + Uint32 givenInstanceNo) { signal->setLength(len); + SimulatedBlock* b = globalData.getBlock(block); + ndbassert(b != 0); + /** + * In multithreaded NDB, blocks run in different threads, and EXECUTE_DIRECT + * (unlike sendSignal) is generally not thread-safe. + * So only allow EXECUTE_DIRECT between blocks that run in the same thread, + * unless caller explicitly marks it as being thread safe (eg NDBFS), + * by using an explicit instance argument. + * By default instance of sender is used. This is automatically thread-safe + * for worker instances (instance != 0). + */ + Uint32 instanceNo = givenInstanceNo; + if (instanceNo == ZNIL) + instanceNo = instance(); + if (instanceNo != 0) + b = b->getInstance(instanceNo); + ndbassert(b != 0); + ndbassert(givenInstanceNo != ZNIL || b->getThreadId() == getThreadId()); #ifdef VM_TRACE if(globalData.testOn){ signal->header.theVerId_signalNumber = gsn; - signal->header.theReceiversBlockNumber = block; + signal->header.theReceiversBlockNumber = numberToBlock(block, instanceNo); signal->header.theSendersBlockRef = reference(); globalSignalLoggers.executeDirect(signal->header, 0, // in @@ -824,14 +894,6 @@ SimulatedBlock::EXECUTE_DIRECT(Uint32 bl globalData.ownId); } #endif - SimulatedBlock* b = globalData.getBlock(block); - /** - * In multithreaded NDB, blocks run in different threads, and EXECUTE_DIRECT - * (unlike sendSignal() is generally not thread-safe. - * So only allow EXECUTE_DIRECT between blocks that run in the same thread, - * unless caller explicitly marks it as being thread safe (eg NDBFS). - */ - ndbassert(is_thread_safe || b->getThreadId() == getThreadId()); #ifdef VM_TRACE_TIME Uint32 us1, us2; Uint64 ms1, ms2; @@ -854,7 +916,7 @@ SimulatedBlock::EXECUTE_DIRECT(Uint32 bl #ifdef VM_TRACE if(globalData.testOn){ signal->header.theVerId_signalNumber = gsn; - signal->header.theReceiversBlockNumber = block; + signal->header.theReceiversBlockNumber = numberToBlock(block, instanceNo); signal->header.theSendersBlockRef = reference(); globalSignalLoggers.executeDirect(signal->header, 1, // out @@ -875,6 +937,8 @@ SimulatedBlock::addTime(Uint32 gsn, Uint inline void SimulatedBlock::subTime(Uint32 gsn, Uint64 time){ + // wl4391_todo got 0xf3f3f3f3 here on GSN_UPGRADE_PROTOCOL_ORD... + if (gsn < 0xffff) m_timeTrace[gsn].sub += time; } #endif === modified file 'storage/ndb/src/kernel/vm/TransporterCallback.cpp' --- a/storage/ndb/src/kernel/vm/TransporterCallback.cpp 2008-07-01 12:35:34 +0000 +++ b/storage/ndb/src/kernel/vm/TransporterCallback.cpp 2008-07-25 05:48:32 +0000 @@ -191,7 +191,14 @@ TransporterCallbackKernel::deliver_signa const Uint32 secCount = header->m_noOfSections; const Uint32 length = header->theLength; + + /* + * Strip instance bits if not multithreaded. This is also + * done in versions prior to MT LQH, to simplify online upgrade. + */ +#ifndef NDBD_MULTITHREADED header->theReceiversBlockNumber &= NDBMT_BLOCK_MASK; +#endif #ifdef TRACE_DISTRIBUTED ndbout_c("recv: %s(%d) from (%s, %d)", @@ -232,10 +239,10 @@ TransporterCallbackKernel::deliver_signa globalScheduler.execute(header, prio, theData, secPtrI); #else if (prio == JBB) - sendlocal(receiverThreadId, header->theReceiversBlockNumber, + sendlocal(receiverThreadId, header, theData, secPtrI); else - sendprioa(receiverThreadId, header->theReceiversBlockNumber, + sendprioa(receiverThreadId, header, theData, secPtrI); #endif @@ -271,10 +278,10 @@ TransporterCallbackKernel::deliver_signa globalScheduler.execute(header, prio, theData, secPtrI); #else if (prio == JBB) - sendlocal(receiverThreadId, header->theReceiversBlockNumber, + sendlocal(receiverThreadId, header, theData, NULL); else - sendprioa(receiverThreadId, header->theReceiversBlockNumber, + sendprioa(receiverThreadId, header, theData, NULL); #endif @@ -383,7 +390,7 @@ TransporterCallbackKernel::reportError(N #else signal.header.theVerId_signalNumber = GSN_EVENT_REP; signal.header.theReceiversBlockNumber = CMVMI; - sendprioa(receiverThreadId, signal.header.theReceiversBlockNumber, + sendprioa(receiverThreadId, &signalT.header, signalT.theData, NULL); #endif @@ -436,7 +443,7 @@ TransporterCallbackKernel::reportReceive #else signal.header.theVerId_signalNumber = GSN_EVENT_REP; signal.header.theReceiversBlockNumber = CMVMI; - sendprioa(receiverThreadId, signal.header.theReceiversBlockNumber, + sendprioa(receiverThreadId, &signalT.header, signalT.theData, NULL); #endif } @@ -463,7 +470,7 @@ TransporterCallbackKernel::reportConnect #else signal.header.theVerId_signalNumber = GSN_CONNECT_REP; signal.header.theReceiversBlockNumber = CMVMI; - sendprioa(receiverThreadId, signal.header.theReceiversBlockNumber, + sendprioa(receiverThreadId, &signalT.header, signalT.theData, NULL); #endif } @@ -495,7 +502,7 @@ TransporterCallbackKernel::reportDisconn #else signal.header.theVerId_signalNumber = GSN_DISCONNECT_REP; signal.header.theReceiversBlockNumber = CMVMI; - sendprioa(receiverThreadId, signal.header.theReceiversBlockNumber, + sendprioa(receiverThreadId, &signalT.header, signalT.theData, NULL); #endif === added file 'storage/ndb/src/kernel/vm/dummy_nonmt.cpp' --- a/storage/ndb/src/kernel/vm/dummy_nonmt.cpp 1970-01-01 00:00:00 +0000 +++ b/storage/ndb/src/kernel/vm/dummy_nonmt.cpp 2008-07-25 05:48:32 +0000 @@ -0,0 +1,14 @@ +#include +#include + +void +add_thr_map(Uint32, Uint32, Uint32) +{ + assert(false); +} + +void +add_worker_thr_map(Uint32, Uint32) +{ + assert(false); +} === modified file 'storage/ndb/src/kernel/vm/mt.cpp' --- a/storage/ndb/src/kernel/vm/mt.cpp 2008-08-02 10:35:51 +0000 +++ b/storage/ndb/src/kernel/vm/mt.cpp 2008-08-03 18:22:23 +0000 @@ -43,9 +43,14 @@ static const Uint32 MAX_SIGNALS_PER_JB = //#define NDB_MT_LOCK_TO_CPU -static const Uint32 NUM_THREADS = 3; -static const Uint32 RECEIVER_THREAD_NO = 2; -#define MAX_THREADS 4 +#define MAX_BLOCK_INSTANCES (1 + MAX_NDBMT_WORKERS) +#define NUM_MAIN_THREADS 2 // except receiver +#define MAX_THREADS (NUM_MAIN_THREADS + MAX_NDBMT_THREADS + 1) + +static Uint32 ndbmt_workers = 0; +static Uint32 ndbmt_threads = 0; +static Uint32 num_threads = 0; +static Uint32 receiver_thread_no = 0; #ifdef NDBMTD_X86 static inline @@ -456,8 +461,7 @@ struct thr_data thr_wait m_waiter; unsigned m_thr_no; - struct SimulatedBlock* m_blocks[NO_OF_BLOCKS]; - + Uint64 m_time; struct thr_tq m_tq; @@ -499,9 +503,9 @@ struct thr_data struct thr_jb_read_state m_read_states[MAX_THREADS]; /* Jam buffers for making trace files at crashes. */ - EmulatedJamBuffer *m_jam; + EmulatedJamBuffer m_jam; /* Watchdog counter for this thread. */ - Uint32 *m_watchdog_counter; + Uint32 m_watchdog_counter; /* Signal delivery statistics. */ Uint32 m_prioa_count; Uint32 m_prioa_size; @@ -711,7 +715,7 @@ struct trp_callback : public Transporter Uint32 total_bytes(NodeId node) const { Uint32 total = 0; - for (Uint32 i = 0; i < NUM_THREADS; i++) + for (Uint32 i = 0; i < num_threads; i++) total += m_thr_buffers[i]->m_buffers[node].used_bytes(); return total; } @@ -919,7 +923,7 @@ scan_queue(struct thr_data* selfptr, Uin * If they are frequent, we may want to optimize, as sending one prio A * signal is somewhat expensive compared to sending one prio B. */ - sendprioa(thr_no, s->theReceiversBlockNumber, s, data, + sendprioa(thr_no, s, data, data + s->theLength); * (page + pos) = free; free = idx; @@ -1202,7 +1206,8 @@ thr_send_buf::updateWritePtr(NodeId node trp_callback::trp_callback() { - for (Uint32 i = 0; i < NUM_THREADS; i++) + // number of threads not yet set so use MAX_THREADS + for (Uint32 i = 0; i < MAX_THREADS; i++) m_thr_buffers[i] = new thr_send_buf(this, i, &g_thr_repository.m_free_list); for (int i = 0; i < MAX_NTRANSPORTERS; i++) m_send_thr[i] = ~(Uint32)0; @@ -1223,7 +1228,7 @@ trp_callback::reportSendLen(NodeId nodeI signal.theData[2] = (bytes/count); signal.header.theVerId_signalNumber = GSN_EVENT_REP; signal.header.theReceiversBlockNumber = CMVMI; - sendprioa(m_send_thr[nodeId], signal.header.theReceiversBlockNumber, + sendprioa(m_send_thr[nodeId], &signalT.header, signalT.theData, NULL); } @@ -1272,7 +1277,7 @@ trp_callback::get_bytes_to_send_iovec(No if (max_iovecs == 0) return 0; - for (Uint32 thr = 0; thr < NUM_THREADS && iovecs < max_iovecs; thr++) + for (Uint32 thr = 0; thr < num_threads && iovecs < max_iovecs; thr++) { thr_send_buf::send_buffer *b = m_thr_buffers[thr]->m_buffers + node; thr_send_buf::page *pg = b->m_current_page; @@ -1357,7 +1362,7 @@ trp_callback::bytes_sent(NodeId node, co break; // Found it } - curr_thr = (curr_thr + 1) % NUM_THREADS; + curr_thr = (curr_thr + 1) % num_threads; #ifdef VM_TRACE assert(curr_thr != start_thr); // Sent data was not in buffer #endif @@ -1390,7 +1395,7 @@ trp_callback::bytes_sent(NodeId node, co bool trp_callback::has_data_to_send(NodeId node) { - for (Uint32 thr = 0; thr < NUM_THREADS; thr++) + for (Uint32 thr = 0; thr < num_threads; thr++) { thr_send_buf::send_buffer *b = m_thr_buffers[thr]->m_buffers + node; thr_send_buf::page *pg = b->m_current_page; @@ -1577,15 +1582,14 @@ inline void sendpacked(struct thr_data* selfptr, Signal* signal, Uint32 thr_no) { - SimulatedBlock** blockptr = selfptr->m_blocks - MIN_BLOCK_NO; - SimulatedBlock* b_lqh = * (blockptr + DBLQH); - SimulatedBlock* b_tc = * (blockptr + DBTC); - SimulatedBlock* b_tup = * (blockptr + DBTUP); - if (b_lqh) + SimulatedBlock* b_lqh = globalData.getBlock(DBLQH); + SimulatedBlock* b_tc = globalData.getBlock(DBTC); + SimulatedBlock* b_tup = globalData.getBlock(DBTUP); + if (thr_no == 1) b_lqh->executeFunction(GSN_SEND_PACKED, signal); - if (b_tc) + if (thr_no == 0) b_tc->executeFunction(GSN_SEND_PACKED, signal); - if (b_tup) + if (thr_no == 1) b_tup->executeFunction(GSN_SEND_PACKED, signal); } @@ -1736,7 +1740,6 @@ execute_signals(thr_data *selfptr, thr_j r->m_write_pos : q->m_buffers[read_index]->m_len); thr_job_buffer *read_buffer = r->m_read_buffer; - SimulatedBlock** blockptr = selfptr->m_blocks - MIN_BLOCK_NO; while (num_signals < max_signals) { @@ -1779,9 +1782,11 @@ execute_signals(thr_data *selfptr, thr_j Uint32 siglen = (sizeof(*s)>>2) + s->theLength; if(siglen>16) __builtin_prefetch (read_buffer->m_data + read_pos + 32, 0, 3); - Uint32 bno = s->theReceiversBlockNumber; + Uint32 bno = blockToMain(s->theReceiversBlockNumber); + Uint32 ino = blockToInstance(s->theReceiversBlockNumber); Uint32 gsn = s->theVerId_signalNumber; - SimulatedBlock * block = blockptr[bno]; + SimulatedBlock * main = globalData.getBlock(bno); + SimulatedBlock * block = main->getInstance(ino); *watchDogCounter = 1; /* Must update original buffer so signal dump will see it. */ s->theSignalId = (*signalIdCounter)++; @@ -1799,6 +1804,16 @@ execute_signals(thr_data *selfptr, thr_j /* Update just before execute so signal dump can know how far we are. */ r->m_read_pos = read_pos; +#ifdef VM_TRACE + if (globalData.testOn) { //wl4391_todo segments + globalSignalLoggers.executeSignal(*s, + 0, + &sig->theData[0], + globalData.ownId); + + } +#endif + block->executeFunction(gsn, sig); num_signals++; @@ -1843,43 +1858,91 @@ run_job_buffers(thr_data *selfptr, Signa return signal_count; } -static inline Uint32 -block2ThreadId(Uint32 block) +struct thr_map_entry { + enum { NULL_THR_NO = 0xFFFF }; + Uint32 thr_no; + SimulatedBlock* block; + thr_map_entry() : thr_no(NULL_THR_NO), block(0) {} +}; + +static struct thr_map_entry thr_map[NO_OF_BLOCKS][MAX_BLOCK_INSTANCES]; + +void +add_thr_map(Uint32 block, Uint32 instance, Uint32 thr_no) { - /** - * CMVMIis special, it runs in the receive thread due to calling directly - * into TransporterRegistry. - */ - if (block == CMVMI) - return 2; + Uint32 index = block - MIN_BLOCK_NO; + assert(index < NO_OF_BLOCKS); + assert(instance < MAX_BLOCK_INSTANCES); - /* - * Block assignment: - * 0 BACKUP LOCAL - * 1 DBTC GLOBAL - * 2 DBDIH GLOBAL - * 3 DBLQH LOCAL - * 4 DBACC LOCAL - * 5 DBTUP LOCAL - * 6 DBDICT GLOBAL - * 7 NDBCNTR GLOBAL - * 8 QMGR GLOBAL - * 9 NDBFS GLOBAL - * 10 CMVMI [Receive thread] - * 11 TRIX GLOBAL - * 12 DBUTIL GLOBAL - * 13 SUMA LOCAL - * 14 DBTUX LOCAL - * 15 TSMAN LOCAL - * 16 LGMAN LOCAL - * 17 PGMAN LOCAL - * 18 RESTORE LOCAL - * - * Thread 0 is for global, thread 1 for locals. - */ - static const Uint32 locals = 0x7e039; + SimulatedBlock* main = globalData.getBlock(block); + assert(main != 0); + SimulatedBlock* b = main->getInstance(instance); + assert(b != 0); + + assert(thr_no < num_threads); + struct thr_repository* rep = &g_thr_repository; + thr_data* thr_ptr = rep->m_thread + thr_no; + + b->assignToThread(thr_no, &thr_ptr->m_jam, &thr_ptr->m_watchdog_counter); + + thr_map_entry& entry = thr_map[index][instance]; + assert(entry.thr_no == thr_map_entry::NULL_THR_NO); + entry.thr_no = thr_no; + entry.block = b; +} + +// static assignment of main instances before first signal +static void +add_main_thr_map() +{ + static int done = 0; + if (done++) + return; + + const Uint32 thr_GLOBAL = 0; + const Uint32 thr_LOCAL = 1; + const Uint32 thr_RECEIVER = receiver_thread_no; + + add_thr_map(BACKUP, 0, thr_LOCAL); + add_thr_map(DBTC, 0, thr_GLOBAL); + add_thr_map(DBDIH, 0, thr_GLOBAL); + add_thr_map(DBLQH, 0, thr_LOCAL); + add_thr_map(DBACC, 0, thr_LOCAL); + add_thr_map(DBTUP, 0, thr_LOCAL); + add_thr_map(DBDICT, 0, thr_GLOBAL); + add_thr_map(NDBCNTR, 0, thr_GLOBAL); + add_thr_map(QMGR, 0, thr_GLOBAL); + add_thr_map(NDBFS, 0, thr_GLOBAL); + add_thr_map(CMVMI, 0, thr_RECEIVER); + add_thr_map(TRIX, 0, thr_GLOBAL); + add_thr_map(DBUTIL, 0, thr_GLOBAL); + add_thr_map(SUMA, 0, thr_LOCAL); + add_thr_map(DBTUX, 0, thr_LOCAL); + add_thr_map(TSMAN, 0, thr_LOCAL); + add_thr_map(LGMAN, 0, thr_LOCAL); + add_thr_map(PGMAN, 0, thr_LOCAL); + add_thr_map(RESTORE, 0, thr_LOCAL); +} + +// workers added by LocalProxy +void +add_worker_thr_map(Uint32 block, Uint32 instance) +{ + assert(instance != 0); + Uint32 i = instance - 1; + Uint32 thr_no = NUM_MAIN_THREADS + i % ndbmt_threads; + add_thr_map(block, instance, thr_no); +} + +static inline Uint32 +block2ThreadId(Uint32 block, Uint32 instance) +{ assert(block >= MIN_BLOCK_NO && block <= MAX_BLOCK_NO); - return (locals >> (block - MIN_BLOCK_NO)) & 1; + Uint32 index = block - MIN_BLOCK_NO; + assert(instance < MAX_BLOCK_INSTANCES); + const thr_map_entry& entry = thr_map[index][instance]; + assert(entry.thr_no < num_threads); + return entry.thr_no; } static void reportSignalStats(Uint32 self, Uint32 a_count, Uint32 a_size, @@ -1901,7 +1964,7 @@ static void reportSignalStats(Uint32 sel s->theData[4] = b_count; s->theData[5] = b_size; /* ToDo: need this really be prio A like in old code? */ - sendlocal(self, s->header.theReceiversBlockNumber, &s->header, s->theData, + sendlocal(self, &s->header, s->theData, NULL); } @@ -1923,18 +1986,16 @@ update_sched_stats(thr_data *selfptr) } static void -init_thread(thr_data *selfptr, EmulatedJamBuffer *jam, Uint32 *watchDogCounter) +init_thread(thr_data *selfptr) { selfptr->m_waiter.init(); - jam->theEmulatedJamIndex = 0; - jam->theEmulatedJamBlockNumber = 0; - memset(jam->theEmulatedJam, 0, sizeof(jam->theEmulatedJam)); - NdbThread_SetTlsKey(NDB_THREAD_TLS_JAM, jam); - selfptr->m_jam = jam; + selfptr->m_jam.theEmulatedJamIndex = 0; + selfptr->m_jam.theEmulatedJamBlockNumber = 0; + memset(selfptr->m_jam.theEmulatedJam, 0, sizeof(selfptr->m_jam.theEmulatedJam)); + NdbThread_SetTlsKey(NDB_THREAD_TLS_JAM, &selfptr->m_jam); - selfptr->m_watchdog_counter = watchDogCounter; unsigned thr_no = selfptr->m_thr_no; - globalEmulatorData.theWatchDog->registerWatchedThread(watchDogCounter, + globalEmulatorData.theWatchDog->registerWatchedThread(&selfptr->m_watchdog_counter, thr_no); NdbThread_SetTlsKey(NDB_THREAD_TLS_THREAD, selfptr); @@ -1954,22 +2015,6 @@ init_thread(thr_data *selfptr, EmulatedJ #endif } -/* Assign to this thread all blocks that will run in this thread. */ -static void -grab_threads(thr_data *selfptr, EmulatedJamBuffer *thread_jam, - Uint32 *watchDogCounter) -{ - for (Uint32 i = 0; i < NO_OF_BLOCKS; i++) - { - if (block2ThreadId(i + MIN_BLOCK_NO) == selfptr->m_thr_no) - { - require(selfptr->m_blocks[i] != 0); - selfptr->m_blocks[i]->assignToThread(selfptr->m_thr_no, thread_jam, - watchDogCounter); - } - } -} - /** * Align signal buffer for better cache performance. * Also skew it a litte for each thread to avoid cache pollution. @@ -2011,13 +2056,12 @@ mt_receiver_thread_main(void *thr_arg) struct thr_data* selfptr = (struct thr_data *)thr_arg; unsigned thr_no = selfptr->m_thr_no; EmulatedJamBuffer thread_jam; - Uint32 watchDogCounter; + Uint32& watchDogCounter = selfptr->m_watchdog_counter; Uint32 thrSignalId = 0; - init_thread(selfptr, &thread_jam, &watchDogCounter); + init_thread(selfptr); receiverThreadId = thr_no; signal = aligned_signal(signal_buf, thr_no); - grab_threads(selfptr, &thread_jam, &watchDogCounter); while (globalData.theRestartFlag != perform_stop) { @@ -2075,19 +2119,17 @@ mt_job_thread_main(void *thr_arg) struct timespec nowait; nowait.tv_sec = 0; nowait.tv_nsec = 10 * 1000000; - EmulatedJamBuffer thread_jam; - Uint32 watchDogCounter; Uint32 thrSignalId = 0; struct thr_repository* rep = &g_thr_repository; struct thr_data* selfptr = (struct thr_data *)thr_arg; - init_thread(selfptr, &thread_jam, &watchDogCounter); + init_thread(selfptr); + EmulatedJamBuffer& thread_jam = selfptr->m_jam; + Uint32& watchDogCounter = selfptr->m_watchdog_counter; unsigned thr_no = selfptr->m_thr_no; signal = aligned_signal(signal_buf, thr_no); - grab_threads(selfptr, &thread_jam, &watchDogCounter); - /* Avoid false watchdog alarms caused by race condition. */ watchDogCounter = 1; @@ -2103,6 +2145,7 @@ mt_job_thread_main(void *thr_arg) &watchDogCounter, &thrSignalId); watchDogCounter = 1; + signal->header.m_noOfSections = 0; /* valgrind */ sendpacked(selfptr, signal, thr_no); if (sum) @@ -2140,16 +2183,23 @@ mt_job_thread_main(void *thr_arg) } void -sendlocal(Uint32 self, Uint32 block, const SignalHeader *s, const Uint32 *data, +sendlocal(Uint32 self, const SignalHeader *s, const Uint32 *data, const Uint32 secPtr[3]) { + Uint32 block = blockToMain(s->theReceiversBlockNumber); + Uint32 instance = blockToInstance(s->theReceiversBlockNumber); + + // map on receiver side + if (instance != 0) + instance = 1 + (instance - 1) % ndbmt_workers; + /* * Max number of signals to put into job buffer before flushing the buffer * to the other thread. * This parameter found to be reasonable by benchmarking. */ - Uint32 MAX_SIGNALS_BEFORE_FLUSH = (self == RECEIVER_THREAD_NO ? 2 : 20); - Uint32 dst = block2ThreadId(block); + Uint32 MAX_SIGNALS_BEFORE_FLUSH = (self == receiver_thread_no ? 2 : 20); + Uint32 dst = block2ThreadId(block, instance); struct thr_repository* rep = &g_thr_repository; struct thr_data * selfptr = rep->m_thread + self; @@ -2164,15 +2214,23 @@ sendlocal(Uint32 self, Uint32 block, con selfptr->m_next_buffer = seize_buffer(rep, self, false); } - if (w->m_pending_signals >= MAX_SIGNALS_BEFORE_FLUSH) + // wl4391_todo + if (true || w->m_pending_signals >= MAX_SIGNALS_BEFORE_FLUSH) flush_write_state(dst, q, w); } void -sendprioa(Uint32 self, Uint32 block, const SignalHeader *s, const uint32 *data, +sendprioa(Uint32 self, const SignalHeader *s, const uint32 *data, const Uint32 secPtr[3]) { - Uint32 dst = block2ThreadId(block); + Uint32 block = blockToMain(s->theReceiversBlockNumber); + Uint32 instance = blockToInstance(s->theReceiversBlockNumber); + + // map on receiver side + if (instance != 0) + instance = 1 + (instance - 1) % ndbmt_workers; + + Uint32 dst = block2ThreadId(block, instance); struct thr_repository* rep = &g_thr_repository; struct thr_data * selfptr = rep->m_thread + self; struct thr_data *dstptr = rep->m_thread + dst; @@ -2260,17 +2318,26 @@ sendprioa_STOP_FOR_CRASH(Uint32 dst) static thr_job_buffer dummy_buffer; /* - * Currently we have three threads with fixed block assignment. + * Before we had three main threads with fixed block assignment. + * Now there is also worker instances (we send to LQH instance). */ - assert(dst == 0 || dst == 1 || dst == 2); // ToDo when/if more threads. - Uint32 bno = 0; + Uint32 main = 0; + Uint32 instance = 0; if (dst == 0) - bno = NDBCNTR; + main = NDBCNTR; else if (dst == 1) - bno = DBLQH; - else if (dst == 2) - bno = CMVMI; - assert(block2ThreadId(bno) == dst); + main = DBLQH; + else if (dst >= NUM_MAIN_THREADS && dst < NUM_MAIN_THREADS + ndbmt_threads) + { + main = DBLQH; + instance = dst - NUM_MAIN_THREADS + 1; + } + else if (dst == receiver_thread_no) + main = CMVMI; + else + assert(false); + Uint32 bno = numberToBlock(main, instance); + assert(block2ThreadId(main, instance) == dst); struct thr_data * dstptr = rep->m_thread + dst; memset(&signalT.header, 0, sizeof(SignalHeader)); @@ -2472,8 +2539,6 @@ thr_init(struct thr_repository* rep, str queue_init(&selfptr->m_tq); - selfptr->m_jam = NULL; - selfptr->m_prioa_count = 0; selfptr->m_prioa_size = 0; selfptr->m_priob_count = 0; @@ -2551,7 +2616,15 @@ ThreadConfig::~ThreadConfig() void ThreadConfig::init(EmulatorData *emulatorData) { - ::rep_init(&g_thr_repository, NUM_THREADS, emulatorData->m_mem_manager); + ndbmt_workers = globalData.ndbmtWorkers; + ndbmt_threads = globalData.ndbmtThreads; + num_threads = NUM_MAIN_THREADS + ndbmt_threads + 1; + assert(num_threads <= MAX_THREADS); + receiver_thread_no = num_threads - 1; + + ndbout << "NDBMT: num_threads=" << num_threads << endl; + + ::rep_init(&g_thr_repository, num_threads, emulatorData->m_mem_manager); } void ThreadConfig::ipControlLoop(Uint32 thread_index) @@ -2559,25 +2632,19 @@ void ThreadConfig::ipControlLoop(Uint32 unsigned int i; unsigned int thr_no; struct thr_repository* rep = &g_thr_repository; - NdbThread *threads[NUM_THREADS]; + NdbThread *threads[MAX_THREADS]; + + add_main_thr_map(); /* * Start threads for all execution threads, except for the receiver * thread, which runs in the main thread. */ - for (thr_no = 0; thr_no < NUM_THREADS; thr_no++) + for (thr_no = 0; thr_no < num_threads; thr_no++) { - for (i = 0; im_thread[thr_no].m_blocks[i] = - globalData.getBlock(i + MIN_BLOCK_NO); - } - } rep->m_thread[thr_no].m_time = NdbTick_CurrentMillisecond(); - if (thr_no == RECEIVER_THREAD_NO) + if (thr_no == receiver_thread_no) continue; // Will run in the main thread. /* * The NdbThread_Create() takes void **, but that is cast to void * when @@ -2592,12 +2659,12 @@ void ThreadConfig::ipControlLoop(Uint32 } /* Now run the main loop for thread 0 directly. */ - mt_receiver_thread_main(&(rep->m_thread[RECEIVER_THREAD_NO])); + mt_receiver_thread_main(&(rep->m_thread[receiver_thread_no])); /* Wait for all threads to shutdown. */ - for (thr_no = 0; thr_no < NUM_THREADS; thr_no++) + for (thr_no = 0; thr_no < num_threads; thr_no++) { - if (thr_no == RECEIVER_THREAD_NO) + if (thr_no == receiver_thread_no) continue; void *dummy_return_status; NdbThread_WaitFor(threads[thr_no], &dummy_return_status); @@ -2608,6 +2675,8 @@ void ThreadConfig::ipControlLoop(Uint32 int ThreadConfig::doStart(NodeState::StartLevel startLevel) { + add_main_thr_map(); + SignalT<3> signalT; memset(&signalT.header, 0, sizeof(SignalHeader)); @@ -2621,7 +2690,7 @@ ThreadConfig::doStart(NodeState::StartLe StartOrd * const startOrd = (StartOrd *)&signalT.theData[0]; startOrd->restartInfo = 0; - senddelay(block2ThreadId(CMVMI), &signalT.header, 1); + senddelay(block2ThreadId(CMVMI, 0), &signalT.header, 1); return 0; } @@ -2656,7 +2725,7 @@ Uint32 FastScheduler::traceDumpGetNumThreads() { /* The last thread is only for receiver -> no trace file. */ - return NUM_THREADS; + return num_threads; } bool @@ -2664,7 +2733,7 @@ FastScheduler::traceDumpGetJam(Uint32 th const Uint32 * & thrdTheEmulatedJam, Uint32 & thrdTheEmulatedJamIndex) { - if (thr_no >= NUM_THREADS) + if (thr_no >= num_threads) return false; #ifdef NO_EMULATED_JAM @@ -2672,7 +2741,7 @@ FastScheduler::traceDumpGetJam(Uint32 th thrdTheEmulatedJam = NULL; thrdTheEmulatedJamIndex = 0; #else - const EmulatedJamBuffer *jamBuffer = g_thr_repository.m_thread[thr_no].m_jam; + const EmulatedJamBuffer *jamBuffer = &g_thr_repository.m_thread[thr_no].m_jam; thrdTheEmulatedJam = jamBuffer->theEmulatedJam; thrdTheEmulatedJamIndex = jamBuffer->theEmulatedJamIndex; jamBlockNumber = jamBuffer->theEmulatedJamBlockNumber; @@ -2708,7 +2777,7 @@ FastScheduler::traceDumpPrepare() pthread_mutex_lock(&g_thr_repository.stop_for_crash_mutex); g_thr_repository.stopped_threads = 0; - for (Uint32 thr_no = 0; thr_no < NUM_THREADS; thr_no++) + for (Uint32 thr_no = 0; thr_no < num_threads; thr_no++) { if (selfptr != NULL && selfptr->m_thr_no == thr_no) { @@ -2766,7 +2835,7 @@ void FastScheduler::dumpSignalMemory(Uint32 thr_no, FILE* out) { void *value= NdbThread_GetTlsKey(NDB_THREAD_TLS_THREAD); - const thr_data *selfptr = reinterpret_cast(value); + thr_data *selfptr = reinterpret_cast(value); const thr_repository *rep = &g_thr_repository; /* * The selfptr might be NULL, or pointer to thread that is doing the crash @@ -2775,7 +2844,7 @@ FastScheduler::dumpSignalMemory(Uint32 t */ Uint32 *watchDogCounter; if (selfptr) - watchDogCounter = selfptr->m_watchdog_counter; + watchDogCounter = &selfptr->m_watchdog_counter; else watchDogCounter = NULL; === modified file 'storage/ndb/src/kernel/vm/mt.hpp' --- a/storage/ndb/src/kernel/vm/mt.hpp 2008-05-29 15:06:11 +0000 +++ b/storage/ndb/src/kernel/vm/mt.hpp 2008-07-25 05:48:32 +0000 @@ -14,9 +14,13 @@ */ extern Uint32 receiverThreadId; -void sendlocal(Uint32 self, Uint32 block, const struct SignalHeader *s, +/* Assign block instance to thread */ +void add_thr_map(Uint32 block, Uint32 instance, Uint32 thr_no); +void add_worker_thr_map(Uint32 block, Uint32 instance); + +void sendlocal(Uint32 self, const struct SignalHeader *s, const Uint32 *data, const Uint32 secPtr[3]); -void sendprioa(Uint32 self, Uint32 block, const struct SignalHeader *s, +void sendprioa(Uint32 self, const struct SignalHeader *s, const Uint32 *data, const Uint32 secPtr[3]); void senddelay(Uint32 thr_no, const struct SignalHeader*, Uint32 delay); void mt_execSTOP_FOR_CRASH(); === modified file 'storage/ndb/src/mgmapi/mgmapi.cpp' --- a/storage/ndb/src/mgmapi/mgmapi.cpp 2008-03-13 14:09:32 +0000 +++ b/storage/ndb/src/mgmapi/mgmapi.cpp 2008-06-09 18:30:58 +0000 @@ -218,19 +218,20 @@ ndb_mgm_set_name(NdbMgmHandle handle, co extern "C" int -ndb_mgm_set_connectstring(NdbMgmHandle handle, const char * mgmsrv) +ndb_mgm_set_connectstring(NdbMgmHandle handle, const char* connect_string) { DBUG_ENTER("ndb_mgm_set_connectstring"); DBUG_PRINT("info", ("handle: 0x%lx", (long) handle)); handle->cfg.~LocalConfig(); new (&(handle->cfg)) LocalConfig; - if (!handle->cfg.init(mgmsrv, 0) || + if (!handle->cfg.init(connect_string, 0) || handle->cfg.ids.size() == 0) { handle->cfg.~LocalConfig(); new (&(handle->cfg)) LocalConfig; handle->cfg.init(0, 0); /* reset the LocalConfig */ - SET_ERROR(handle, NDB_MGM_ILLEGAL_CONNECT_STRING, mgmsrv ? mgmsrv : ""); + SET_ERROR(handle, NDB_MGM_ILLEGAL_CONNECT_STRING, + connect_string ? connect_string : ""); DBUG_RETURN(-1); } handle->cfg_i= -1; === modified file 'storage/ndb/src/mgmclient/CommandInterpreter.cpp' --- a/storage/ndb/src/mgmclient/CommandInterpreter.cpp 2008-03-14 13:34:05 +0000 +++ b/storage/ndb/src/mgmclient/CommandInterpreter.cpp 2008-06-10 20:06:47 +0000 @@ -229,7 +229,6 @@ extern "C" { #include #include #include -#include "MgmtErrorReporter.hpp" #include #include #include === modified file 'storage/ndb/src/mgmsrv/Config.cpp' --- a/storage/ndb/src/mgmsrv/Config.cpp 2008-06-09 10:52:24 +0000 +++ b/storage/ndb/src/mgmsrv/Config.cpp 2008-07-29 14:04:27 +0000 @@ -14,11 +14,11 @@ Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA */ #include "Config.hpp" -#include "MgmtErrorReporter.hpp" -//***************************************************************************** -// Ctor / Dtor -//***************************************************************************** +#include +#include +#include "ConfigInfo.hpp" + Config::Config(struct ndb_mgm_configuration *config_values) : m_configValues(config_values) @@ -32,144 +32,52 @@ Config::~Config() { } } -/*****************************************************************************/ - -void -Config::printAllNameValuePairs(NdbOut &out, - const Properties *prop, - const char* s) const { - Properties::Iterator it(prop); - const Properties * section = m_info.getInfo(s); - for (const char* n = it.first(); n != NULL; n = it.next()) { - Uint32 int_value; - const char* str_value; - Uint64 int_64; +unsigned sections[]= +{ + CFG_SECTION_SYSTEM, + CFG_SECTION_NODE, + CFG_SECTION_CONNECTION +}; +const size_t num_sections= sizeof(sections)/sizeof(unsigned); + +static const ConfigInfo g_info; + +void +Config::print() const { + + for(unsigned i= 0; i < num_sections; i++) { + unsigned section= sections[i]; + ndb_mgm_configuration_iterator it(*m_configValues, section); - if(!section->contains(n)) - continue; - if (m_info.getStatus(section, n) == ConfigInfo::CI_INTERNAL) - continue; - if (m_info.getStatus(section, n) == ConfigInfo::CI_DEPRICATED) - continue; - if (m_info.getStatus(section, n) == ConfigInfo::CI_NOTIMPLEMENTED) + if (it.first()) continue; - out << n << ": "; + for(;it.valid();it.next()) { - switch (m_info.getType(section, n)) { - case ConfigInfo::CI_INT: - MGM_REQUIRE(prop->get(n, &int_value)); - out << int_value; - break; - - case ConfigInfo::CI_INT64: - MGM_REQUIRE(prop->get(n, &int_64)); - out << int_64; - break; - - case ConfigInfo::CI_BOOL: - MGM_REQUIRE(prop->get(n, &int_value)); - if (int_value) { - out << "Y"; - } else { - out << "N"; + Uint32 section_type; + if(it.get(CFG_TYPE_OF_SECTION, §ion_type) != 0) + continue; + + const ConfigInfo::ParamInfo* pinfo= NULL; + ConfigInfo::ParamInfoIter param_iter(g_info, + section, + section_type); + + ndbout_c("[%s]", g_info.sectionName(section, section_type)); + + /* Loop through the section and print those values that exist */ + Uint32 val; + Uint64 val64; + const char* val_str; + while((pinfo= param_iter.next())){ + + if (!it.get(pinfo->_paramId, &val)) + ndbout_c("%s=%u", pinfo->_fname, val); + else if (!it.get(pinfo->_paramId, &val64)) + ndbout_c("%s=%llu", pinfo->_fname, val64); + else if (!it.get(pinfo->_paramId, &val_str)) + ndbout_c("%s=%s", pinfo->_fname, val_str); } - break; - case ConfigInfo::CI_STRING: - MGM_REQUIRE(prop->get(n, &str_value)); - out << str_value; - break; - case ConfigInfo::CI_SECTION: - out << "SECTION"; - break; - } - out << endl; - } -} - -/*****************************************************************************/ - -void Config::printConfigFile(NdbOut &out) const { -#if 0 - Uint32 noOfNodes, noOfConnections, noOfComputers; - MGM_REQUIRE(get("NoOfNodes", &noOfNodes)); - MGM_REQUIRE(get("NoOfConnections", &noOfConnections)); - MGM_REQUIRE(get("NoOfComputers", &noOfComputers)); - - out << - "######################################################################" << - endl << - "#" << endl << - "# NDB Cluster System configuration" << endl << - "#" << endl << - "######################################################################" << - endl << - "# No of nodes (DB, API or MGM): " << noOfNodes << endl << - "# No of connections: " << noOfConnections << endl << - "######################################################################" << - endl; - - /************************** - * Print COMPUTER configs * - **************************/ - const char * name; - Properties::Iterator it(this); - for(name = it.first(); name != NULL; name = it.next()){ - if(strncasecmp("Computer_", name, 9) == 0){ - - const Properties *prop; - out << endl << "[COMPUTER]" << endl; - MGM_REQUIRE(get(name, &prop)); - printAllNameValuePairs(out, prop, "COMPUTER"); - - out << endl << - "###################################################################" << - endl; - - } else if(strncasecmp("Node_", name, 5) == 0){ - /********************** - * Print NODE configs * - **********************/ - const Properties *prop; - const char *s; - - MGM_REQUIRE(get(name, &prop)); - MGM_REQUIRE(prop->get("Type", &s)); - out << endl << "[" << s << "]" << endl; - printAllNameValuePairs(out, prop, s); - - out << endl << - "###################################################################" << - endl; - } else if(strncasecmp("Connection_", name, 11) == 0){ - /**************************** - * Print CONNECTION configs * - ****************************/ - const Properties *prop; - const char *s; - - MGM_REQUIRE(get(name, &prop)); - MGM_REQUIRE(prop->get("Type", &s)); - out << endl << "[" << s << "]" << endl; - printAllNameValuePairs(out, prop, s); - - out << endl << - "###################################################################" << - endl; - } else if(strncasecmp("SYSTEM", name, strlen("SYSTEM")) == 0) { - /************************ - * Print SYSTEM configs * - ************************/ - const Properties *prop; - - MGM_REQUIRE(get(name, &prop)); - out << endl << "[SYSTEM]" << endl; - printAllNameValuePairs(out, prop, "SYSTEM"); - - out << endl << - "###################################################################" << - endl; } } -#endif } === modified file 'storage/ndb/src/mgmsrv/Config.hpp' --- a/storage/ndb/src/mgmsrv/Config.hpp 2008-06-09 10:52:24 +0000 +++ b/storage/ndb/src/mgmsrv/Config.hpp 2008-07-29 13:38:18 +0000 @@ -22,16 +22,10 @@ /** * @class Config - * @brief Cluster Configuration (corresponds to initial configuration file) + * @brief Cluster Configuration Wrapper * - * Contains all cluster configuration parameters. - * - * The information includes all configurable parameters for a NDB cluster: - * - DB, API and MGM nodes with all their properties, - * - Connections between nodes and computers the nodes will execute on. - * - * The following categories (sections) of configuration parameters exists: - * - COMPUTER, DB, MGM, API, TCP, SCI, SHM + * Adds a C++ wrapper around 'ndb_mgm_configuration' which is + * exposed from mgmapi_configuration * */ @@ -40,31 +34,15 @@ public: Config(struct ndb_mgm_configuration *config_values = NULL); virtual ~Config(); - /** - * Prints the configuration in configfile format - */ - void printConfigFile(NdbOut &out = ndbout) const; - void printConfigFile(OutputStream &out) const { - NdbOut ndb(out); - printConfigFile(ndb); - } - - /** - * Info - */ - const ConfigInfo * getConfigInfo() const { return &m_info;} -private: - ConfigInfo m_info; - - void printAllNameValuePairs(NdbOut &out, - const Properties *prop, - const char* section) const; - - /** - * Information about parameters (min, max values etc) - */ -public: + void print() const; + struct ndb_mgm_configuration * m_configValues; }; +class ConfigIter : public ndb_mgm_configuration_iterator { +public: + ConfigIter(const Config* conf, unsigned type) : + ndb_mgm_configuration_iterator(*conf->m_configValues, type) {}; +}; + #endif // Config_H === modified file 'storage/ndb/src/mgmsrv/ConfigInfo.cpp' --- a/storage/ndb/src/mgmsrv/ConfigInfo.cpp 2008-05-29 15:06:11 +0000 +++ b/storage/ndb/src/mgmsrv/ConfigInfo.cpp 2008-07-28 09:43:13 +0000 @@ -2832,6 +2832,57 @@ ConfigInfo::getAlias(const char * sectio return 0; } + +const char* +ConfigInfo::sectionName(Uint32 section_type, Uint32 type) const { + + switch (section_type){ + case CFG_SECTION_SYSTEM: + return "SYSTEM"; + break; + + case CFG_SECTION_NODE: + switch(type){ + case NODE_TYPE_DB: + return DB_TOKEN_PRINT; + break; + case NODE_TYPE_MGM: + return MGM_TOKEN_PRINT; + break; + case NODE_TYPE_API: + return API_TOKEN_PRINT; + break; + default: + assert(false); + break; + } + break; + + case CFG_SECTION_CONNECTION: + switch(type){ + case CONNECTION_TYPE_TCP: + return "TCP"; + break; + case CONNECTION_TYPE_SHM: + return "SHM"; + break; + case CONNECTION_TYPE_SCI: + return "SCI"; + break; + default: + assert(false); + break; + } + break; + + default: + assert(false); + break; + } + + return ""; +} + bool ConfigInfo::verify(const Properties * section, const char* fname, Uint64 value) const { @@ -3046,7 +3097,7 @@ fixNodeHostname(InitConfigFileParser::Co } if(!computer->get("HostName", &hostname)){ - ctx.reportError("HostName missing in [COMPUTER] (Id: %d) " + ctx.reportError("HostName missing in [COMPUTER] (Id: %s) " " - [%s] starting at line: %d", compId, ctx.fname, ctx.m_sectionLineno); DBUG_RETURN(false); @@ -3819,8 +3870,6 @@ fixDepricated(InitConfigFileParser::Cont return true; } -extern int g_print_full_config; - static bool saveInConfigValues(InitConfigFileParser::Context & ctx, const char * data){ const Properties * sec; @@ -3841,12 +3890,6 @@ saveInConfigValues(InitConfigFileParser: ndbout_c("skipping section %s", ctx.fname); break; } - - if (g_print_full_config) - { - const char *alias= ConfigInfo::nameToAlias(ctx.fname); - printf("[%s]\n", alias ? alias : ctx.fname); - } Uint32 no = 0; ctx.m_userProperties.get("$Section", id, &no); @@ -3875,24 +3918,18 @@ saveInConfigValues(InitConfigFileParser: Uint32 val; require(ctx.m_currentSection->get(n, &val)); ok = ctx.m_configValues.put(id, val); - if (g_print_full_config) - printf("%s=%u\n", n, val); break; } case PropertiesType_Uint64:{ Uint64 val; require(ctx.m_currentSection->get(n, &val)); ok = ctx.m_configValues.put64(id, val); - if (g_print_full_config) - printf("%s=%llu\n", n, val); break; } case PropertiesType_char:{ const char * val; require(ctx.m_currentSection->get(n, &val)); ok = ctx.m_configValues.put(id, val); - if (g_print_full_config) - printf("%s=%s\n", n, val); break; } default: @@ -4270,5 +4307,43 @@ check_node_vs_replicas(Vector_section, m_section_name) == 0 && + param->_type != ConfigInfo::CI_SECTION) + return param; + } + while (m_curr_param; #endif /* NDB_MGMAPI */ === modified file 'storage/ndb/src/mgmsrv/ConfigInfo.hpp' --- a/storage/ndb/src/mgmsrv/ConfigInfo.hpp 2008-05-29 15:06:11 +0000 +++ b/storage/ndb/src/mgmsrv/ConfigInfo.hpp 2008-07-25 10:02:59 +0000 @@ -81,11 +81,26 @@ public: * For section entries, instead the _default member gives the internal id * of that kind of section (CONNECTION_TYPE_TCP, NODE_TYPE_MGM, etc.) */ - const char* _default; + union { + const char* _default; + Uint32 _section_type; // if _type = CI_SECTION + }; const char* _min; const char* _max; }; + class ParamInfoIter { + const ConfigInfo& m_info; + const char* m_section_name; + int m_curr_param; + public: + ParamInfoIter(const ConfigInfo& info, + Uint32 section, + Uint32 section_type = ~0); + + const ParamInfo* next(void); + }; + #ifndef NDB_MGMAPI struct AliasPair{ const char * name; @@ -149,6 +164,8 @@ public: void print(const char* section) const; void print(const Properties * section, const char* parameter) const; + const char* sectionName(Uint32 section, Uint32 type) const; + private: Properties m_info; Properties m_systemDefaults; === modified file 'storage/ndb/src/mgmsrv/InitConfigFileParser.cpp' --- a/storage/ndb/src/mgmsrv/InitConfigFileParser.cpp 2008-06-09 10:52:24 +0000 +++ b/storage/ndb/src/mgmsrv/InitConfigFileParser.cpp 2008-07-25 16:03:09 +0000 @@ -17,7 +17,6 @@ #include "InitConfigFileParser.hpp" #include "Config.hpp" -#include "MgmtErrorReporter.hpp" #include #include "ConfigInfo.hpp" #include @@ -319,7 +318,7 @@ InitConfigFileParser::storeNameValuePair ctx.reportError("Illegal boolean value for parameter %s", fname); return false; } - MGM_REQUIRE(ctx.m_currentSection->put(pname, value_bool)); + require(ctx.m_currentSection->put(pname, value_bool)); break; } case ConfigInfo::CI_INT: @@ -337,14 +336,14 @@ InitConfigFileParser::storeNameValuePair return false; } if(type == ConfigInfo::CI_INT){ - MGM_REQUIRE(ctx.m_currentSection->put(pname, (Uint32)value_int)); + require(ctx.m_currentSection->put(pname, (Uint32)value_int)); } else { - MGM_REQUIRE(ctx.m_currentSection->put64(pname, value_int)); + require(ctx.m_currentSection->put64(pname, value_int)); } break; } case ConfigInfo::CI_STRING: - MGM_REQUIRE(ctx.m_currentSection->put(pname, value)); + require(ctx.m_currentSection->put(pname, value)); break; case ConfigInfo::CI_SECTION: abort(); === modified file 'storage/ndb/src/mgmsrv/InitConfigFileParser.hpp' --- a/storage/ndb/src/mgmsrv/InitConfigFileParser.hpp 2006-12-23 19:20:40 +0000 +++ b/storage/ndb/src/mgmsrv/InitConfigFileParser.hpp 2008-06-10 09:21:15 +0000 @@ -84,8 +84,10 @@ public: public: FILE * m_errstream; - void reportError(const char * msg, ...); - void reportWarning(const char * msg, ...); + void reportError(const char * msg, ...) + ATTRIBUTE_FORMAT(printf, 2, 3); + void reportWarning(const char * msg, ...) + ATTRIBUTE_FORMAT(printf, 2, 3); }; static bool convertStringToUint64(const char* s, Uint64& val, Uint32 log10base = 0); === modified file 'storage/ndb/src/mgmsrv/MgmtSrvr.cpp' --- a/storage/ndb/src/mgmsrv/MgmtSrvr.cpp 2008-06-09 10:52:24 +0000 +++ b/storage/ndb/src/mgmsrv/MgmtSrvr.cpp 2008-07-29 14:20:49 +0000 @@ -17,9 +17,8 @@ #include #include "MgmtSrvr.hpp" -#include "MgmtErrorReporter.hpp" #include "ndb_mgmd_error.h" -#include +#include "Services.hpp" #include #include @@ -57,13 +56,6 @@ #include -//#define MGM_SRV_DEBUG -#ifdef MGM_SRV_DEBUG -#define DEBUG(x) do ndbout << x << endl; while(0) -#else -#define DEBUG(x) -#endif - int g_errorInsert; #define ERROR_INSERTED(x) (g_errorInsert == x) @@ -77,7 +69,6 @@ int g_errorInsert; }\ } -extern int g_no_nodeid_checks; extern my_bool opt_core; static void require(bool v) @@ -215,56 +206,6 @@ MgmtSrvr::logLevelThreadRun() } } -void -MgmtSrvr::startEventLog() -{ - NdbMutex_Lock(m_configMutex); - - g_eventLogger->setCategory("MgmSrvr"); - - ndb_mgm_configuration_iterator - iter(* _config->m_configValues, CFG_SECTION_NODE); - - if(iter.find(CFG_NODE_ID, _ownNodeId) != 0){ - NdbMutex_Unlock(m_configMutex); - return; - } - - const char * tmp; - char errStr[100]; - int err= 0; - BaseString logdest; - char *clusterLog= NdbConfig_ClusterLogFileName(_ownNodeId); - NdbAutoPtr tmp_aptr(clusterLog); - - if(iter.get(CFG_LOG_DESTINATION, &tmp) == 0){ - logdest.assign(tmp); - } - NdbMutex_Unlock(m_configMutex); - - if(logdest.length() == 0 || logdest == "") { - logdest.assfmt("FILE:filename=%s,maxsize=1000000,maxfiles=6", - clusterLog); - } - errStr[0]='\0'; - if(!g_eventLogger->addHandler(logdest, &err, sizeof(errStr), errStr)) { - ndbout << "Warning: could not add log destination \"" - << logdest.c_str() << "\". Reason: "; - if(err) - ndbout << strerror(err); - if(err && errStr[0]!='\0') - ndbout << ", "; - if(errStr[0]!='\0') - ndbout << errStr; - ndbout << endl; - } -} - -void -MgmtSrvr::stopEventLog() -{ - g_eventLogger->close(); -} bool MgmtSrvr::setEventLogFilter(int severity, int enable) @@ -308,279 +249,304 @@ int MgmtSrvr::translateStopRef(Uint32 er } -int -MgmtSrvr::getPort() const +MgmtSrvr::MgmtSrvr(const MgmtOpts& opts, + const char* connect_str) : + m_opts(opts), + _blockNumber(-1), + _ownNodeId(0), + m_port(0), + m_config_retriever(connect_str, + NDB_VERSION, + NDB_MGM_NODE_TYPE_MGM), + _ownReference(0), + _config(NULL), + theFacade(NULL), + _isStopThread(false), + _logLevelThreadSleep(500), + m_event_listner(this), + m_master_node(0), + _logLevelThread(NULL) { - if(NdbMutex_Lock(m_configMutex)) - return 0; - - ndb_mgm_configuration_iterator - iter(* _config->m_configValues, CFG_SECTION_NODE); + DBUG_ENTER("MgmtSrvr::MgmtSrvr"); - if(iter.find(CFG_NODE_ID, getOwnNodeId()) != 0){ - ndbout << "Could not retrieve configuration for Node " - << getOwnNodeId() << " in config file." << endl - << "Have you set correct NodeId for this node?" << endl; - NdbMutex_Unlock(m_configMutex); - return 0; + m_configMutex = NdbMutex_Create(); + m_node_id_mutex = NdbMutex_Create(); + if (!m_configMutex || !m_node_id_mutex) + { + g_eventLogger->error("Failed to create MgmtSrvr mutexes"); + require(false); } - unsigned type; - if(iter.get(CFG_TYPE_OF_SECTION, &type) != 0 || - type != NODE_TYPE_MGM){ - ndbout << "Local node id " << getOwnNodeId() - << " is not defined as management server" << endl - << "Have you set correct NodeId for this node?" << endl; - NdbMutex_Unlock(m_configMutex); - return 0; - } - - Uint32 port = 0; - if(iter.get(CFG_MGM_PORT, &port) != 0){ - ndbout << "Could not find PortNumber in the configuration file." << endl; - NdbMutex_Unlock(m_configMutex); - return 0; + /* Init node arrays */ + for(Uint32 i = 0; ido_connect(0,0,0) == 0)) + if (m_opts.mycnf || m_opts.config_filename) { - int tmp_nodeid= 0; - tmp_nodeid= m_config_retriever->allocNodeId(0 /*retry*/,0 /*delay*/); - if (tmp_nodeid == 0) + Config* conf= NULL; + if (m_opts.mycnf && (conf= load_init_mycnf()) == NULL) { - ndbout_c(m_config_retriever->getErrorString()); - require(false); + g_eventLogger->error("Could not load config from 'my.cnf'"); + DBUG_RETURN(false); } - // read config from other managent server - _config= fetchConfig(); - if (_config == 0) + else if (m_opts.config_filename && (conf= load_init_config()) == NULL) { - ndbout << m_config_retriever->getErrorString() << endl; - require(false); + g_eventLogger->error("Could not load initial config from '%s'", + m_opts.config_filename); + DBUG_RETURN(false); + } + setConfig(conf); + } + else + { + if (fetch_config()) + { + g_eventLogger->error("Could not fetch config"); + DBUG_RETURN(false); } - _ownNodeId= tmp_nodeid; + } + + if (m_opts.print_full_config) + { + print_config(); + DBUG_RETURN(false); } if (_ownNodeId == 0) { - // read config locally - _config= readConfig(); - if (_config == 0) { - if (config_filename != NULL) - ndbout << "Invalid configuration file: " << config_filename << endl; - else - ndbout << "Invalid configuration file" << endl; - exit(-1); + _ownNodeId= m_config_retriever.get_configuration_nodeid(); + + int error_code; + BaseString error_string; + if (!alloc_node_id(&_ownNodeId, NDB_MGM_NODE_TYPE_MGM, + 0, 0, error_code, error_string, 0)) + { + g_eventLogger->error("Unable to obtain requested nodeid, error: '%s'", + error_string.c_str()); + DBUG_RETURN(false); + } + + if (!m_config_retriever.verifyConfig(_config->m_configValues, + _ownNodeId)) + { + g_eventLogger->error(m_config_retriever.getErrorString()); + DBUG_RETURN(false); } } - m_configMutex = NdbMutex_Create(); + setClusterLog(); + + DBUG_RETURN(true); +} - /** - * Fill the nodeTypes array - */ - for(Uint32 i = 0; im_configValues, CFG_SECTION_NODE); + g_eventLogger->error("Could not create TransporterFacade."); + DBUG_RETURN(false); + } - for(iter.first(); iter.valid(); iter.next()){ - unsigned type, id; - if(iter.get(CFG_TYPE_OF_SECTION, &type) != 0) - continue; - - if(iter.get(CFG_NODE_ID, &id) != 0) - continue; - - MGM_REQUIRE(id < MAX_NODES); - - switch(type){ - case NODE_TYPE_DB: - nodeTypes[id] = NDB_MGM_NODE_TYPE_NDB; - break; - case NODE_TYPE_API: - nodeTypes[id] = NDB_MGM_NODE_TYPE_API; - break; - case NODE_TYPE_MGM: - nodeTypes[id] = NDB_MGM_NODE_TYPE_MGM; - break; - default: - break; - } - } + if (theFacade->start_instance(_ownNodeId, + _config->m_configValues) < 0) + { + g_eventLogger->error("Failed to start transporter"); + delete theFacade; + theFacade = 0; + DBUG_RETURN(false); } - BaseString error_string; + assert(_blockNumber == -1); // Blocknumber shouldn't been allocated yet - if ((m_node_id_mutex = NdbMutex_Create()) == 0) + /* + Register ourself at TransporterFacade to be able to receive signals + and to be notified when a database process has died. + */ + if ((_blockNumber= theFacade->open(this, + signalReceivedNotification, + nodeStatusNotification)) == -1) { - ndbout << "mutex creation failed line = " << __LINE__ << endl; - require(false); + g_eventLogger->error("Failed to open block in TransporterFacade"); + theFacade->stop_instance(); + delete theFacade; + theFacade = 0; + DBUG_RETURN(false); } - if (_ownNodeId == 0) // we did not get node id from other server + _ownReference = numberToRef(_blockNumber, _ownNodeId); + + /* + set api reg req frequency quite high: + + 100 ms interval to make sure we have fairly up-to-date + info from the nodes. This to make sure that this info + is not dependent on heartbeat settings in the + configuration + */ + theFacade->theClusterMgr->set_max_api_reg_req_interval(100); + + DBUG_RETURN(true); +} + + +bool +MgmtSrvr::start_mgm_service() +{ + DBUG_ENTER("MgmtSrvr::start_mgm_service"); + + assert(m_port == 0); { - NodeId tmp= m_config_retriever->get_configuration_nodeid(); - int error_code; + // Find the portnumber to use for mgm service + Guard g(m_configMutex); + ConfigIter iter(_config, CFG_SECTION_NODE); - if (!alloc_node_id(&tmp, NDB_MGM_NODE_TYPE_MGM, - 0, 0, error_code, error_string)){ - ndbout << "Unable to obtain requested nodeid: " - << error_string.c_str() << endl; - require(false); + if(iter.find(CFG_NODE_ID, _ownNodeId) != 0){ + g_eventLogger->error("Could not find node %d in config", _ownNodeId); + DBUG_RETURN(false); + } + + unsigned type; + if(iter.get(CFG_TYPE_OF_SECTION, &type) != 0 || + type != NODE_TYPE_MGM){ + g_eventLogger->error("Node %d is not defined as management server", + _ownNodeId); + return 0; + DBUG_RETURN(false); } - _ownNodeId = tmp; + + if(iter.get(CFG_MGM_PORT, &m_port) != 0){ + g_eventLogger->error("PortNumber not defined for node %d", _ownNodeId); + return 0; + DBUG_RETURN(false); + } + } + + unsigned short port= m_port; + DBUG_PRINT("info", ("Using port %d", port)); + if (port == 0) + { + g_eventLogger->error("Could not find out which port to use"\ + " for management service"); + DBUG_RETURN(false); } { - DBUG_PRINT("info", ("verifyConfig")); - if (!m_config_retriever->verifyConfig(_config->m_configValues, - _ownNodeId)) + int count= 5; // no of retries for tryBind + while(!m_socket_server.tryBind(port, m_opts.bind_address)) { - ndbout << m_config_retriever->getErrorString() << endl; - require(false); + if (--count > 0) + { + NdbSleep_SecSleep(1); + continue; + } + g_eventLogger->error("Unable to bind management service port: %s:%d!\n" + "Please check if the port is already used,\n" + "(perhaps a ndb_mgmd is already running),\n" + "and if you are executing on the correct computer", + (m_opts.bind_address ? m_opts.bind_address : "*"), + port); + DBUG_RETURN(false); } } - // Setup clusterlog as client[0] in m_event_listner { - Ndb_mgmd_event_service::Event_listener se; - se.m_socket = NDB_INVALID_SOCKET; - for(size_t t = 0; terror("Could not allocate MgmApiService"); + DBUG_RETURN(false); + } + + if(!m_socket_server.setup(mapi, &port, m_opts.bind_address)) + { + delete mapi; // Will be deleted by SocketServer in all other cases + g_eventLogger->error("Unable to setup management service port: %s:%d!\n" + "Please check if the port is already used,\n" + "(perhaps a ndb_mgmd is already running),\n" + "and if you are executing on the correct computer", + (m_opts.bind_address ? m_opts.bind_address : "*"), + port); + DBUG_RETURN(false); + } + + if (port != m_port) + { + g_eventLogger->error("Couldn't start management service on the "\ + "requested port: %d. Got port: %d instead", + m_port, port); + DBUG_RETURN(false); } - se.m_logLevel.setLogLevel(LogLevel::llError, 15); - se.m_logLevel.setLogLevel(LogLevel::llConnection, 8); - se.m_logLevel.setLogLevel(LogLevel::llBackup, 15); - m_event_listner.m_clients.push_back(se); - m_event_listner.m_logLevel = se.m_logLevel; } - - DBUG_VOID_RETURN; + + m_socket_server.startServer(); + + g_eventLogger->info("Id: %d, Command port: %s:%d", + _ownNodeId, + m_opts.bind_address ? m_opts.bind_address : "*", + port); + DBUG_RETURN(true); } -bool -MgmtSrvr::start(BaseString &error_string, const char * bindaddress) +bool +MgmtSrvr::start() { - int mgm_connect_result; - DBUG_ENTER("MgmtSrvr::start"); - theFacade= new TransporterFacade(0); - - if(theFacade == 0) { - DEBUG("MgmtSrvr.cpp: theFacade is NULL."); - error_string.append("MgmtSrvr.cpp: theFacade is NULL."); - DBUG_RETURN(false); - } - if ( theFacade->start_instance - (_ownNodeId, (ndb_mgm_configuration*)_config->m_configValues) < 0) { - DEBUG("MgmtSrvr.cpp: TransporterFacade::start_instance < 0."); + /* Start transporter */ + if(!start_transporter()) + { + g_eventLogger->error("Failed to start transporter!"); DBUG_RETURN(false); } - assert(_blockNumber == -1); - - // Register ourself at TransporterFacade to be able to receive signals - // and to be notified when a database process has died. - _blockNumber = theFacade->open(this, - signalReceivedNotification, - nodeStatusNotification); - - if(_blockNumber == -1){ - DEBUG("MgmtSrvr.cpp: _blockNumber is -1."); - error_string.append("MgmtSrvr.cpp: _blockNumber is -1."); - theFacade->stop_instance(); - theFacade = 0; + /* Start mgm service */ + if (!start_mgm_service()) + { + g_eventLogger->error("Failed to start mangement service!"); DBUG_RETURN(false); } - if((mgm_connect_result= connect_to_self(bindaddress)) < 0) + /* Use local MGM port for TransporterRegistry */ + if(!connect_to_self()) { - ndbout_c("Unable to connect to our own ndb_mgmd (Error %d)", - mgm_connect_result); - ndbout_c("This is probably a bug."); - } - - /* - set api reg req frequency quite high: - - 100 ms interval to make sure we have fairly up-to-date - info from the nodes. This to make sure that this info - is not dependent on heart beat settings in the - configuration - */ - theFacade->theClusterMgr->set_max_api_reg_req_interval(100); - - TransporterRegistry *reg = theFacade->get_registry(); - for(unsigned int i=0;im_transporter_interface.size();i++) { - BaseString msg; - DBUG_PRINT("info",("Setting dynamic port %d->%d : %d", - reg->get_localNodeId(), - reg->m_transporter_interface[i].m_remote_nodeId, - reg->m_transporter_interface[i].m_s_service_port - ) - ); - int res = setConnectionDbParameter((int)reg->get_localNodeId(), - (int)reg->m_transporter_interface[i] - .m_remote_nodeId, - (int)CFG_CONNECTION_SERVER_PORT, - reg->m_transporter_interface[i] - .m_s_service_port, - msg); - DBUG_PRINT("info",("Set result: %d: %s",res,msg.c_str())); + g_eventLogger->error("Failed to connect to ourself!"); + DBUG_RETURN(false); } - _ownReference = numberToRef(_blockNumber, _ownNodeId); - - startEventLog(); - // Set the initial confirmation count for subscribe requests confirm - // from NDB nodes in the cluster. - // - // Loglevel thread + /* Loglevel thread */ + assert(_isStopThread == false); _logLevelThread = NdbThread_Create(logLevelThread_C, (void**)this, 32768, @@ -591,25 +557,100 @@ MgmtSrvr::start(BaseString &error_string } -//**************************************************************************** -//**************************************************************************** -MgmtSrvr::~MgmtSrvr() +void +MgmtSrvr::setClusterLog(void) { - if(theFacade != 0){ - theFacade->stop_instance(); - delete theFacade; - theFacade = 0; + BaseString logdest; + + g_eventLogger->close(); + + // Get log destination from config + DBUG_ASSERT(_ownNodeId); + ConfigIter iter(_config, CFG_SECTION_NODE); + require(iter.find(CFG_NODE_ID, _ownNodeId) == 0); + + const char *value; + if(iter.get(CFG_LOG_DESTINATION, &value) == 0){ + logdest.assign(value); } - stopEventLog(); + if(logdest.length() == 0 || logdest == "") { + // No LogDestination set, use default settings + char *clusterLog= NdbConfig_ClusterLogFileName(_ownNodeId); + logdest.assfmt("FILE:filename=%s,maxsize=1000000,maxfiles=6", + clusterLog); + free(clusterLog); + } - NdbMutex_Destroy(m_node_id_mutex); - NdbMutex_Destroy(m_configMutex); + int err= 0; + char errStr[100]= {0}; + if(!g_eventLogger->addHandler(logdest, &err, sizeof(errStr), errStr)) { + ndbout << "Warning: could not add log destination '" + << logdest.c_str() << "'. Reason: "; + if(err) + ndbout << strerror(err); + if(err && errStr[0]!='\0') + ndbout << ", "; + if(errStr[0]!='\0') + ndbout << errStr; + ndbout << endl; + } + + if (m_opts.non_interactive) + g_eventLogger->createConsoleHandler(); +} + + +void +MgmtSrvr::setConfig(Config* conf) +{ + DBUG_ENTER("MgmtSrvr::setConfig"); + Guard g(m_configMutex); - if(_config != NULL) - delete _config; + _config= conf; - // End set log level thread + /* Rebuild node arrays */ + ConfigIter iter(_config, CFG_SECTION_NODE); + for(Uint32 i = 0; istop_instance(); + delete theFacade; + theFacade = 0; + } + + delete _config; + + NdbMutex_Destroy(m_node_id_mutex); + NdbMutex_Destroy(m_configMutex); } + //**************************************************************************** //**************************************************************************** @@ -856,12 +913,14 @@ int MgmtSrvr::sendStopMgmd(NodeId nodeId if(ndb_mgm_connect(h,1,0,0)) { DBUG_PRINT("info",("failed ndb_mgm_connect")); + ndb_mgm_destroy_handle(&h); return SEND_OR_RECEIVE_FAILED; } if(!restart) { if(ndb_mgm_stop(h, 1, (const int*)&nodeId) < 0) { + ndb_mgm_destroy_handle(&h); return SEND_OR_RECEIVE_FAILED; } } @@ -871,6 +930,7 @@ int MgmtSrvr::sendStopMgmd(NodeId nodeId nodes[0]= (int)nodeId; if(ndb_mgm_restart2(h, 1, nodes, initialStart, nostart, abort) < 0) { + ndb_mgm_destroy_handle(&h); return SEND_OR_RECEIVE_FAILED; } } @@ -973,7 +1033,7 @@ int MgmtSrvr::sendSTOP_REQ(const Vector< if ((getNodeType(nodeId) != NDB_MGM_NODE_TYPE_MGM) &&(getNodeType(nodeId) != NDB_MGM_NODE_TYPE_NDB)) - return WRONG_PROCESS_TYPE; + DBUG_RETURN(WRONG_PROCESS_TYPE); if (getNodeType(nodeId) != NDB_MGM_NODE_TYPE_MGM) nodes_to_stop.set(nodeId); @@ -1309,7 +1369,7 @@ int MgmtSrvr::restartNodes(const Vector< Uint32 mysql_version = 0; Uint32 connectCount = 0; bool system; - const char *address; + const char *address= NULL; status(nodeId, &s, &version, &mysql_version, &startPhase, &system, &dynamicId, &nodeGroup, &connectCount, &address); NdbSleep_MilliSleep(100); @@ -1414,7 +1474,7 @@ int MgmtSrvr::restartDB(bool nostart, bo continue; int result; result = start(nodeId); - DEBUG("Starting node " << nodeId << " with result " << result); + g_eventLogger->debug("Started node %d with result %d", nodeId, result); /** * Errors from this call are deliberately ignored. * Maybe the user only wanted to restart a subset of the nodes. @@ -2156,7 +2216,7 @@ MgmtSrvr::alloc_node_id_req(NodeId free_ } bool -MgmtSrvr::alloc_node_id(NodeId * nodeId, +MgmtSrvr::alloc_node_id(NodeId * nodeId, enum ndb_mgm_node_type type, struct sockaddr *client_addr, SOCKET_SIZE_TYPE *client_addr_len, @@ -2166,7 +2226,7 @@ MgmtSrvr::alloc_node_id(NodeId * nodeId, DBUG_ENTER("MgmtSrvr::alloc_node_id"); DBUG_PRINT("enter", ("nodeid: %d type: %d client_addr: 0x%ld", *nodeId, type, (long) client_addr)); - if (g_no_nodeid_checks) { + if (m_opts.no_nodeid_checks) { if (*nodeId == 0) { error_string.appfmt("no-nodeid-checks set in management server.\n" "node id must be set explicitly in connectstring"); @@ -2486,6 +2546,7 @@ MgmtSrvr::alloc_node_id(NodeId * nodeId, DBUG_RETURN(false); } + bool MgmtSrvr::getNextNodeId(NodeId * nodeId, enum ndb_mgm_node_type type) const { @@ -2994,23 +3055,28 @@ void MgmtSrvr::transporter_connect(NDB_S } } -int MgmtSrvr::connect_to_self(const char * bindaddress) +bool MgmtSrvr::connect_to_self() { - int r= 0; - m_local_mgm_handle= ndb_mgm_create_handle(); - snprintf(m_local_mgm_connect_string,sizeof(m_local_mgm_connect_string), - "%s:%u", bindaddress ? bindaddress : "localhost", getPort()); - ndb_mgm_set_connectstring(m_local_mgm_handle, m_local_mgm_connect_string); + BaseString buf; + NdbMgmHandle mgm_handle= ndb_mgm_create_handle(); - if((r= ndb_mgm_connect(m_local_mgm_handle, 0, 0, 0)) < 0) - { - ndb_mgm_destroy_handle(&m_local_mgm_handle); - return r; + buf.assfmt("%s:%u", + m_opts.bind_address ? m_opts.bind_address : "localhost", + getPort()); + ndb_mgm_set_connectstring(mgm_handle, buf.c_str()); + + if(ndb_mgm_connect(mgm_handle, 0, 0, 0) < 0) + { + g_eventLogger->warning("%d %s", + ndb_mgm_get_latest_error(mgm_handle), + ndb_mgm_get_latest_error_desc(mgm_handle)); + ndb_mgm_destroy_handle(&mgm_handle); + return false; } - // TransporterRegistry now owns this NdbMgmHandle and will destroy it. - theFacade->get_registry()->set_mgm_handle(m_local_mgm_handle); + // TransporterRegistry now owns the handle and will destroy it. + theFacade->get_registry()->set_mgm_handle(mgm_handle); - return 0; + return true; } template class MutexVector; === modified file 'storage/ndb/src/mgmsrv/MgmtSrvr.hpp' --- a/storage/ndb/src/mgmsrv/MgmtSrvr.hpp 2008-06-09 10:52:24 +0000 +++ b/storage/ndb/src/mgmsrv/MgmtSrvr.hpp 2008-07-29 13:38:18 +0000 @@ -96,16 +96,6 @@ public: NdbMutex *m_node_id_mutex; /** - * Start/initate the event log. - */ - void startEventLog(); - - /** - * Stop the event log. - */ - void stopEventLog(); - - /** * Enable/disable eventlog log levels/severities. * * @param serverity the log level/serverity. @@ -126,17 +116,61 @@ public: */ enum LogMode {In, Out, InOut, Off}; - /* Constructor */ - - MgmtSrvr(SocketServer *socket_server, - const char *config_filename, /* Where to save config */ - const char *connect_string); - NodeId getOwnNodeId() const {return _ownNodeId;}; + /** + @struct MgmtOpts + @brief Options used to control how the management server is started + */ + + struct MgmtOpts { + int daemon; + int non_interactive; + int interactive; + const char* config_filename; + int mycnf; + const char* bind_address; + int no_nodeid_checks; + int print_full_config; + }; - bool start(BaseString &error_string, const char * bindaddress = 0); + MgmtSrvr(); // Not implemented + MgmtSrvr(const MgmtSrvr&); // Not implemented + MgmtSrvr(const MgmtOpts&, const char* connect_str); ~MgmtSrvr(); + /* + To be called after constructor. Loads configuration + from disk or fetches it from other server + */ + bool init(); + +private: + /* Functions used from 'init' */ + Config* load_init_config(void); + Config* load_init_mycnf(void); + bool fetch_config(void); + bool save_config(const Config* conf); + bool save_config(void); + +public: + + /* + To be called after 'init', starts up the services + this server will expose + */ + bool start(void); +private: + /* Functions used from 'start' */ + bool start_transporter(void); + bool start_mgm_service(void); + bool connect_to_self(void); + +public: + + NodeId getOwnNodeId() const {return _ownNodeId;}; + + void print_config() { _config->print(); }; + /** * Get status on a node. * address may point to a common area (e.g. from inet_addr) @@ -153,32 +187,6 @@ public: Uint32 * nodeGroup, Uint32 * connectCount, const char **address); - - // All the functions below may return any of this error codes: - // NO_CONTACT_WITH_PROCESS, PROCESS_NOT_CONFIGURED, WRONG_PROCESS_TYPE, - // COULD_NOT_ALLOCATE_MEMORY, SEND_OR_RECEIVE_FAILED - - /** - * Save a configuration to permanent storage - */ - int saveConfig(const Config *); - - /** - * Save the running configuration - */ - int saveConfig() { - return saveConfig(_config); - }; - - /** - * Read configuration from file, or from another MGM server - */ - Config *readConfig(); - - /** - * Fetch configuration from another MGM server - */ - Config *fetchConfig(); /** * Stop a node @@ -386,12 +394,16 @@ public: * Get configuration */ const Config * getConfig() const; +private: + void setConfig(Config* conf); + void setClusterLog(void); +public: /** - * Returns the port number. + * Returns the port number where MgmApiService is started * @return port number. */ - int getPort() const; + int getPort() const { return m_port; }; int setDbParameter(int node, int parameter, const char * value, BaseString&); int setConnectionDbParameter(int node1, int node2, int param, int value, @@ -399,15 +411,11 @@ public: int getConnectionDbParameter(int node1, int node2, int param, int *value, BaseString& msg); - int connect_to_self(const char* bindaddress = 0); - void transporter_connect(NDB_SOCKET_TYPE sockfd); - ConfigRetriever *get_config_retriever() { return m_config_retriever; }; - const char *get_connect_address(Uint32 node_id); void get_connected_nodes(NodeBitmask &connected_nodes) const; - SocketServer *get_socket_server() { return m_socket_server; } + SocketServer *get_socket_server() { return &m_socket_server; } void updateStatus(); @@ -452,15 +460,17 @@ private: int check_nodes_stopping(); //************************************************************************** - + + const MgmtOpts& m_opts; int _blockNumber; NodeId _ownNodeId; - SocketServer *m_socket_server; + Uint32 m_port; + SocketServer m_socket_server; + ConfigRetriever m_config_retriever; BlockReference _ownReference; NdbMutex *m_configMutex; const Config * _config; - BaseString m_configFilename; NodeBitmask m_reserved_nodes; struct in_addr m_connect_address[MAX_NODES]; @@ -495,8 +505,6 @@ private: */ void eventReport(const Uint32 * theData, Uint32 len); - NdbMgmHandle m_local_mgm_handle; - char m_local_mgm_connect_string[20]; class TransporterFacade * theFacade; int sendVersionReq( int processId, Uint32 &version, Uint32& mysql_version, @@ -522,8 +530,6 @@ private: struct NdbThread* _logLevelThread; static void *logLevelThread_C(void *); void logLevelThreadRun(); - - ConfigRetriever *m_config_retriever; }; inline === modified file 'storage/ndb/src/mgmsrv/MgmtSrvrConfig.cpp' --- a/storage/ndb/src/mgmsrv/MgmtSrvrConfig.cpp 2008-06-09 10:52:24 +0000 +++ b/storage/ndb/src/mgmsrv/MgmtSrvrConfig.cpp 2008-07-29 13:38:18 +0000 @@ -13,62 +13,69 @@ along with this program; if not, write to the Free Software Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA */ -#include -#include - #include "MgmtSrvr.hpp" #include #include -#include +#include -/** - * Save a configuration to the running configuration file - */ -int -MgmtSrvr::saveConfig(const Config *conf) { - BaseString newfile; - newfile.appfmt("%s.new", m_configFilename.c_str()); - - /* Open and write to the new config file */ - FILE *f = fopen(newfile.c_str(), "w"); - if(f == NULL) { - /** @todo Send something apropriate to the log */ - return -1; - } - FileOutputStream stream(f); - conf->printConfigFile(stream); - fclose(f); +Config* +MgmtSrvr::load_init_config(void) +{ + InitConfigFileParser parser; + g_eventLogger->info("Reading cluster configuration from '%s'", + m_opts.config_filename); + return parser.parseConfig(m_opts.config_filename); +} - /* Rename file to real name */ - rename(newfile.c_str(), m_configFilename.c_str()); - return 0; +Config* +MgmtSrvr::load_init_mycnf(void) +{ + InitConfigFileParser parser; + g_eventLogger->info("Reading cluster configuration using my.cnf"); + return parser.parse_mycnf(); } -Config * -MgmtSrvr::readConfig() { - Config *conf; - InitConfigFileParser parser; - if (m_configFilename.length()) - { - conf = parser.parseConfig(m_configFilename.c_str()); + +bool +MgmtSrvr::fetch_config(void) +{ + char buf[128]; + DBUG_ENTER("MgmtSrvr::fetch_config"); + assert(_config == NULL); + + /* Loop until config loaded from other mgmd(s) */ + g_eventLogger->info("Trying to get configuration from other mgmd(s)"\ + "using '%.128s'...", + m_config_retriever.get_connectstring(buf, sizeof(buf))); + + + int retry= 0, delay= 0, verbose= 1; + while (m_config_retriever.do_connect(retry, delay, verbose) != 0) { + g_eventLogger->info("Waiting for connection to other mgmd(s)..."); + NdbSleep_SecSleep(1); } - else - { - ndbout_c("Reading cluster configuration using my.cnf"); - conf = parser.parse_mycnf(); + g_eventLogger->info("Connected..."); + + // "login" and alloc node id from the other mgmd + _ownNodeId= m_config_retriever.allocNodeId(retry, delay); + if (_ownNodeId == 0) { + g_eventLogger->error(m_config_retriever.getErrorString()); + DBUG_RETURN(NULL); } - return conf; -} -Config * -MgmtSrvr::fetchConfig() { - struct ndb_mgm_configuration * tmp = m_config_retriever->getConfig(); - if(tmp != 0){ - Config * conf = new Config(); - conf->m_configValues = tmp; - return conf; + // read config from other managent server + struct ndb_mgm_configuration * tmp = m_config_retriever.getConfig(); + if (tmp == NULL) { + g_eventLogger->error(m_config_retriever.getErrorString()); + DBUG_RETURN(NULL); } - return 0; + + setConfig(new Config(tmp)); + + DBUG_RETURN(true); } + + + === modified file 'storage/ndb/src/mgmsrv/Services.cpp' --- a/storage/ndb/src/mgmsrv/Services.cpp 2008-06-09 10:52:24 +0000 +++ b/storage/ndb/src/mgmsrv/Services.cpp 2008-07-29 13:49:59 +0000 @@ -38,9 +38,6 @@ extern bool g_StopServer; extern bool g_RestartServer; extern EventLogger * g_eventLogger; -static const unsigned int MAX_READ_TIMEOUT = 1000 ; -static const unsigned int MAX_WRITE_TIMEOUT = 100 ; - /** const char * name; const char * realName; @@ -308,6 +305,12 @@ MgmApiSession::MgmApiSession(class MgmtS m_session_id= session_id; m_mutex= NdbMutex_Create(); m_errorInsert= 0; + + struct sockaddr_in addr; + SOCKET_SIZE_TYPE addrlen= sizeof(addr); + getpeername(sock, (struct sockaddr*)&addr, &addrlen); + m_name.assfmt("%s:%d", inet_ntoa(addr.sin_addr), ntohs(addr.sin_port)); + DBUG_VOID_RETURN; } @@ -340,6 +343,8 @@ MgmApiSession::runSession() { DBUG_ENTER("MgmApiSession::runSession"); + g_eventLogger->debug("%s: Connected!", name()); + Parser_t::Context ctx; ctx.m_mutex= m_mutex; m_ctx= &ctx; @@ -350,22 +355,53 @@ MgmApiSession::runSession() m_input->reset_timeout(); m_output->reset_timeout(); - m_parser->run(ctx, *this); - - if(ctx.m_currentToken == 0) + if (m_parser->run(ctx, *this)) { - NdbMutex_Unlock(m_mutex); - break; + stop= m_stop; + assert(ctx.m_status == Parser_t::Ok); } + else + { + const char* msg= NULL; + switch(ctx.m_status) { + case Parser_t::Eof: // Client disconnected + g_eventLogger->debug("%s: Disconnected!", name()); + stop= true; + break; + + case Parser_t::ExternalStop: // Stopped by other thread + stop= true; + break; + + case Parser_t::NoLine: // Normal read timeout + case Parser_t::EmptyLine: + break; + + case Parser_t::UnknownCommand: msg= "Unknown command"; break; + case Parser_t::UnknownArgument: msg= "Unknown argument"; break; + case Parser_t::TypeMismatch: msg= "Type mismatch"; break; + case Parser_t::InvalidArgumentFormat: msg= "Invalid arg. format"; break; + case Parser_t::UnknownArgumentType: msg= "Unknown argument type"; break; + case Parser_t::ArgumentGivenTwice: msg= "Argument given twice"; break; + case Parser_t::MissingMandatoryArgument: msg= "Missing arg."; break; + + case Parser_t::Ok: // Should never happen here + case Parser_t::CommandWithoutFunction: + abort(); + break; + } - switch(ctx.m_status) { - case Parser_t::UnknownCommand: - break; - default: - break; + if (msg){ + g_eventLogger->debug("%s: %s, '%s'", + name(), msg, ctx.m_currentToken); + + // Send result to client + m_output->println("result: %s, '%s'", + msg, ctx.m_currentToken); + m_output->println(""); + } } - stop= m_stop; NdbMutex_Unlock(m_mutex); }; === modified file 'storage/ndb/src/mgmsrv/Services.hpp' --- a/storage/ndb/src/mgmsrv/Services.hpp 2007-03-22 11:33:07 +0000 +++ b/storage/ndb/src/mgmsrv/Services.hpp 2008-07-29 13:38:18 +0000 @@ -48,6 +48,9 @@ private: int m_errorInsert; + BaseString m_name; + const char* name() { return m_name.c_str(); } + const char *get_error_text(int err_no) { return m_mgmsrv.getErrorText(err_no, m_err_str, sizeof(m_err_str)); } @@ -115,20 +118,15 @@ public: }; class MgmApiService : public SocketServer::Service { - class MgmtSrvr * m_mgmsrv; + MgmtSrvr& m_mgmsrv; Uint64 m_next_session_id; // Protected by m_sessions mutex it SocketServer public: - MgmApiService(){ - m_mgmsrv = 0; - m_next_session_id= 1; - } - - void setMgm(class MgmtSrvr * mgmsrv){ - m_mgmsrv = mgmsrv; - } - + MgmApiService(MgmtSrvr& mgm): + m_mgmsrv(mgm), + m_next_session_id(1) {} + SocketServer::Session * newSession(NDB_SOCKET_TYPE socket){ - return new MgmApiSession(* m_mgmsrv, socket, m_next_session_id++); + return new MgmApiSession(m_mgmsrv, socket, m_next_session_id++); } }; === modified file 'storage/ndb/src/mgmsrv/main.cpp' --- a/storage/ndb/src/mgmsrv/main.cpp 2008-06-09 10:52:24 +0000 +++ b/storage/ndb/src/mgmsrv/main.cpp 2008-07-29 14:28:33 +0000 @@ -18,29 +18,18 @@ #include "MgmtSrvr.hpp" #include "EventLogger.hpp" -#include -#include "InitConfigFileParser.hpp" -#include -#include "Services.hpp" +#include "Config.hpp" + #include #include -#include -#include #include #include -#include +#include #include -#include #include - #include - #include -#undef DEBUG -#define DEBUG(x) ndbout << x << endl; - -const char progname[] = "mgmtsrvr"; const char *load_default_groups[]= { "mysql_cluster","ndb_mgmd",0 }; // copied from mysql.cc to get readline @@ -87,97 +76,53 @@ read_and_execute(Ndb_mgmclient* com, con return com->execute(line_read,_try_reconnect); } -/** - * @struct MgmGlobals - * @brief Global Variables used in the management server - *****************************************************************************/ - -/** Command line arguments */ -static int opt_daemon; // NOT bool, bool need not be int -static int opt_non_interactive; -static int opt_interactive; -static const char * opt_config_filename= 0; -static int opt_mycnf = 0; -static const char* _bind_address = 0; - -struct MgmGlobals { - MgmGlobals(); - ~MgmGlobals(); - - /** Stuff found in environment or in local config */ - NodeId localNodeId; - short unsigned int port; - - /** The Mgmt Server */ - MgmtSrvr * mgmObject; - - /** The Socket Server */ - SocketServer * socketServer; -}; +/* Global variables */ +bool g_StopServer= false; +bool g_RestartServer= false; +static MgmtSrvr* mgm; +static MgmtSrvr::MgmtOpts opts; -int g_no_nodeid_checks= 0; -int g_print_full_config; -static MgmGlobals *glob= 0; - -/****************************************************************************** - * Function prototypes - ******************************************************************************/ -/** - * Global variables - */ -bool g_StopServer; -bool g_RestartServer; -extern EventLogger * g_eventLogger; - -enum ndb_mgmd_options { - OPT_INTERACTIVE = NDB_STD_OPTIONS_LAST, - OPT_NO_NODEID_CHECKS, - OPT_NO_DAEMON -}; NDB_STD_OPTS_VARS; static struct my_option my_long_options[] = { NDB_STD_OPTS("ndb_mgmd"), { "config-file", 'f', "Specify cluster configuration file", - (uchar**) &opt_config_filename, (uchar**) &opt_config_filename, 0, + (uchar**) &opts.config_filename, (uchar**) &opts.config_filename, 0, GET_STR, REQUIRED_ARG, 0, 0, 0, 0, 0, 0 }, { "print-full-config", 'P', "Print full config and exit", - (uchar**) &g_print_full_config, (uchar**) &g_print_full_config, 0, + (uchar**) &opts.print_full_config, (uchar**) &opts.print_full_config, 0, GET_BOOL, NO_ARG, 0, 0, 0, 0, 0, 0 }, { "daemon", 'd', "Run ndb_mgmd in daemon mode (default)", - (uchar**) &opt_daemon, (uchar**) &opt_daemon, 0, + (uchar**) &opts.daemon, (uchar**) &opts.daemon, 0, GET_BOOL, NO_ARG, 1, 0, 0, 0, 0, 0 }, - { "interactive", OPT_INTERACTIVE, + { "interactive", 256, "Run interactive. Not supported but provided for testing purposes", - (uchar**) &opt_interactive, (uchar**) &opt_interactive, 0, + (uchar**) &opts.interactive, (uchar**) &opts.interactive, 0, GET_BOOL, NO_ARG, 0, 0, 0, 0, 0, 0 }, - { "no-nodeid-checks", OPT_NO_NODEID_CHECKS, - "Do not provide any node id checks", - (uchar**) &g_no_nodeid_checks, (uchar**) &g_no_nodeid_checks, 0, + { "no-nodeid-checks", 256, + "Do not provide any node id checks", + (uchar**) &opts.no_nodeid_checks, (uchar**) &opts.no_nodeid_checks, 0, GET_BOOL, NO_ARG, 0, 0, 0, 0, 0, 0 }, - { "nodaemon", OPT_NO_DAEMON, + { "nodaemon", 256, "Don't run as daemon, but don't read from stdin", - (uchar**) &opt_non_interactive, (uchar**) &opt_non_interactive, 0, + (uchar**) &opts.non_interactive, (uchar**) &opts.non_interactive, 0, GET_BOOL, NO_ARG, 0, 0, 0, 0, 0, 0 }, { "mycnf", 256, "Read cluster config from my.cnf", - (uchar**) &opt_mycnf, (uchar**) &opt_mycnf, 0, + (uchar**) &opts.mycnf, (uchar**) &opts.mycnf, 0, GET_BOOL, NO_ARG, 0, 0, 0, 0, 0, 0 }, - { "bind-address", 256, + { "bind-address", 256, "Local bind address", - (uchar**) &_bind_address, (uchar**) &_bind_address, 0, + (uchar**) &opts.bind_address, (uchar**) &opts.bind_address, 0, GET_STR, REQUIRED_ARG, 0, 0, 0, 0, 0, 0 }, { 0, 0, 0, 0, 0, 0, GET_NO_ARG, NO_ARG, 0, 0, 0, 0, 0, 0} }; -static void short_usage_sub(void) -{ - printf("Usage: %s [OPTIONS]\n", my_progname); -} + static void usage() { - short_usage_sub(); + printf("Usage: %s [OPTIONS]\n", my_progname); ndb_std_print_version(); print_defaults(MYSQL_CONFIG_NAME,load_default_groups); puts(""); @@ -185,14 +130,13 @@ static void usage() my_print_variables(my_long_options); } -/* - * MAIN - */ + int main(int argc, char** argv) { - NDB_INIT(argv[0]); + g_eventLogger->setCategory("MgmSrvr"); + load_defaults("my",load_default_groups,&argc,&argv); int ho_error; @@ -203,167 +147,108 @@ int main(int argc, char** argv) ndb_std_get_one_option))) exit(ho_error); -start: - glob= new MgmGlobals; - - if (opt_interactive || - opt_non_interactive || - g_print_full_config) { - opt_daemon= 0; + if (opts.interactive || + opts.non_interactive || + opts.print_full_config) { + opts.daemon= 0; } - if (opt_mycnf && opt_config_filename) + /* Output to console initially */ + g_eventLogger->createConsoleHandler(); + + if (opts.mycnf && opts.config_filename) { - ndbout_c("Both --mycnf and -f is not supported"); - return 0; + g_eventLogger->error("Both --mycnf and -f is not supported"); + exit(1); } - if (opt_mycnf == 0 && opt_config_filename == 0) + if (opts.mycnf == 0 && opts.config_filename == 0) { struct stat buf; if (stat("config.ini", &buf) != -1) - opt_config_filename = "config.ini"; + opts.config_filename = "config.ini"; } - - glob->socketServer = new SocketServer(); - - MgmApiService * mapi = new MgmApiService(); +start: - glob->mgmObject = new MgmtSrvr(glob->socketServer, - opt_config_filename, - opt_connect_str); + g_eventLogger->info("NDB Cluster Management Server. %s", NDB_VERSION_STRING); - if (g_print_full_config) - goto the_end; + mgm= new MgmtSrvr(opts, opt_connect_str); + if (mgm == NULL) { + g_eventLogger->critical("Out of memory, couldn't create MgmtSrvr"); + exit(1); + } - my_setwd(NdbConfig_get_path(0), MYF(0)); + /** + Install signal handler for SIGPIPE + Done in TransporterFacade as well.. what about Configretriever? + */ +#if !defined NDB_WIN32 + signal(SIGPIPE, SIG_IGN); +#endif - glob->localNodeId= glob->mgmObject->getOwnNodeId(); - if (glob->localNodeId == 0) { - goto error_end; + /* Init mgm, load or fetch config */ + if (!mgm->init()) { + delete mgm; + exit(1); } - glob->port= glob->mgmObject->getPort(); + my_setwd(NdbConfig_get_path(0), MYF(0)); - if (glob->port == 0) - goto error_end; + if (opts.daemon) { - { - int count= 5; // no of retries for tryBind - while(!glob->socketServer->tryBind(glob->port, _bind_address)){ - if (--count > 0) { - NdbSleep_MilliSleep(1000); - continue; - } - ndbout_c("Unable to setup port: %s:%d!\n" - "Please check if the port is already used,\n" - "(perhaps a ndb_mgmd is already running),\n" - "and if you are executing on the correct computer", - (_bind_address ? _bind_address : "*"), glob->port); - goto error_end; + NodeId localNodeId= mgm->getOwnNodeId(); + if (localNodeId == 0) { + g_eventLogger->error("Couldn't get own node id"); + delete mgm; + exit(1); } - } - - if(!glob->socketServer->setup(mapi, &glob->port, _bind_address)) - { - ndbout_c("Unable to setup management port: %s:%d!\n" - "Please check if the port is already used,\n" - "(perhaps a ndb_mgmd is already running),\n" - "and if you are executing on the correct computer", - (_bind_address ? _bind_address : "*"), glob->port); - delete mapi; - goto error_end; - } - if (opt_daemon) { // Become a daemon - char *lockfile= NdbConfig_PidFileName(glob->localNodeId); - char *logfile= NdbConfig_StdoutFileName(glob->localNodeId); + char *lockfile= NdbConfig_PidFileName(localNodeId); + char *logfile= NdbConfig_StdoutFileName(localNodeId); NdbAutoPtr tmp_aptr1(lockfile), tmp_aptr2(logfile); if (NdbDaemon_Make(lockfile, logfile, 0) == -1) { - ndbout << "Cannot become daemon: " << NdbDaemon_ErrorText << endl; - return 1; + g_eventLogger->error("Cannot become daemon: %s", NdbDaemon_ErrorText); + delete mgm; + exit(1); } } -#ifndef NDB_WIN32 - signal(SIGPIPE, SIG_IGN); -#endif - { - BaseString error_string; - if(!glob->mgmObject->start(error_string, _bind_address)){ - ndbout_c("Unable to start management server."); - ndbout_c("Probably caused by illegal initial configuration file."); - ndbout_c(error_string.c_str()); - goto error_end; - } + /* Start mgm services */ + if (!mgm->start()) { + delete mgm; + exit(1); } - //glob->mgmObject->saveConfig(); - mapi->setMgm(glob->mgmObject); - - char msg[256]; - BaseString::snprintf(msg, sizeof(msg), - "NDB Cluster Management Server. %s", NDB_VERSION_STRING); - ndbout_c(msg); - g_eventLogger->info(msg); - - BaseString::snprintf(msg, 256, "Id: %d, Command port: %s:%d", - glob->localNodeId, - _bind_address ? _bind_address : "*", - glob->port); - ndbout_c(msg); - g_eventLogger->info(msg); - - g_StopServer = false; - g_RestartServer= false; - glob->socketServer->startServer(); - - if(opt_interactive) { + if(opts.interactive) { + int port= mgm->getPort(); BaseString con_str; - if(_bind_address) - con_str.appfmt("host=%s:%d", _bind_address, glob->port); - else - con_str.appfmt("localhost:%d", glob->port); + if(opts.bind_address) + con_str.appfmt("host=%s:%d", opts.bind_address, port); + else + con_str.appfmt("localhost:%d", port); Ndb_mgmclient com(con_str.c_str(), 1); while(g_StopServer != true && read_and_execute(&com, "ndb_mgm> ", 1)); - } else - { + } + else { while(g_StopServer != true) NdbSleep_MilliSleep(500); } - if(g_RestartServer) - g_eventLogger->info("Restarting server..."); - else - g_eventLogger->info("Shutting down server..."); - glob->socketServer->stopServer(); - // We disconnect from the ConfigRetreiver mgmd when we delete glob below - glob->socketServer->stopSessions(true); + g_eventLogger->info("Shutting down server..."); + delete mgm; g_eventLogger->info("Shutdown complete"); - the_end: - delete glob; - if(g_RestartServer) + + if(g_RestartServer){ + g_eventLogger->info("Restarting server..."); + g_RestartServer= g_StopServer= false; goto start; + } + + g_eventLogger->close(); + ndb_end(opt_endinfo ? MY_CHECK_ERROR | MY_GIVE_INFO : 0); return 0; - error_end: - delete glob; - ndb_end(opt_endinfo ? MY_CHECK_ERROR | MY_GIVE_INFO : 0); - return 1; } -MgmGlobals::MgmGlobals(){ - // Default values - port = 0; - socketServer = 0; - mgmObject = 0; -} - -MgmGlobals::~MgmGlobals(){ - if (socketServer) - delete socketServer; - if (mgmObject) - delete mgmObject; -} === modified file 'storage/ndb/src/ndbapi/SignalSender.cpp' --- a/storage/ndb/src/ndbapi/SignalSender.cpp 2007-04-12 05:49:09 +0000 +++ b/storage/ndb/src/ndbapi/SignalSender.cpp 2008-07-25 11:28:05 +0000 @@ -70,13 +70,13 @@ SimpleSignal::print(FILE * out){ } } -SignalSender::SignalSender(TransporterFacade *facade) +SignalSender::SignalSender(TransporterFacade *facade, int blockNo) : m_lock(0) { m_cond = NdbCondition_Create(); theFacade = facade; lock(); - m_blockNo = theFacade->open(this, execSignal, execNodeStatus); + m_blockNo = theFacade->open(this, execSignal, execNodeStatus, blockNo); unlock(); assert(m_blockNo > 0); } @@ -143,7 +143,12 @@ SignalSender::waitFor(Uint32 timeOutMill } return s; } - + + /* Remove old signals from usedBuffer */ + for (int i= 0; i < m_usedBuffer.size(); i++) + delete m_usedBuffer[i]; + m_usedBuffer.clear(); + NDB_TICKS now = NdbTick_CurrentMillisecond(); NDB_TICKS stop = now + timeOutMillis; Uint32 wait = (timeOutMillis == 0 ? 10 : timeOutMillis); === modified file 'storage/ndb/src/ndbapi/SignalSender.hpp' --- a/storage/ndb/src/ndbapi/SignalSender.hpp 2007-03-02 10:52:12 +0000 +++ b/storage/ndb/src/ndbapi/SignalSender.hpp 2008-06-10 14:15:53 +0000 @@ -43,7 +43,7 @@ private: class SignalSender { public: - SignalSender(TransporterFacade *facade); + SignalSender(TransporterFacade *facade, int blockNo = -1); virtual ~SignalSender(); int lock(); === modified file 'storage/ndb/src/ndbapi/TransporterFacade.cpp' --- a/storage/ndb/src/ndbapi/TransporterFacade.cpp 2008-06-17 20:28:45 +0000 +++ b/storage/ndb/src/ndbapi/TransporterFacade.cpp 2008-07-22 13:42:54 +0000 @@ -388,7 +388,7 @@ TransporterFacade::deliver_signal(Signal return; } else { ; // Ignore all other block numbers. - if(header->theVerId_signalNumber!=3) { + if(header->theVerId_signalNumber != GSN_API_REGREQ) { TRP_DEBUG( "TransporterFacade received signal to unknown block no." ); ndbout << "BLOCK NO: " << tRecBlockNo << " sig " << header->theVerId_signalNumber << endl; @@ -925,10 +925,11 @@ TransporterFacade::close_local(BlockNumb int TransporterFacade::open(void* objRef, ExecuteFunction fun, - NodeStatusFunction statusFun) + NodeStatusFunction statusFun, + int blockNo) { DBUG_ENTER("TransporterFacade::open"); - int r= m_threads.open(objRef, fun, statusFun); + int r= m_threads.open(objRef, fun, statusFun, blockNo); if (r < 0) DBUG_RETURN(r); #if 1 @@ -1368,14 +1369,42 @@ TransporterFacade::ThreadData::expand(Ui int TransporterFacade::ThreadData::open(void* objRef, ExecuteFunction fun, - NodeStatusFunction fun2) + NodeStatusFunction fun2, + int blockNo) { Uint32 nextFree = m_firstFree; if(m_statusNext.size() >= MAX_NO_THREADS && nextFree == END_OF_LIST){ return -1; } - + + Object_Execute oe = { objRef , fun }; + + if (unlikely(blockNo >= 0)) { + // Open block with fixed number + Uint32 index= numberToIndex(blockNo); + + if(index > m_statusNext.size()){ + expand(index - m_statusNext.size()); + } + + m_use_cnt++; + + // Single linked free list, relink the previous one that points to this + for(Uint32 i = 0; i < m_statusNext.size(); i++){ + if (m_statusNext[i] == index){ + m_statusNext[i]= m_statusNext[index]; + break; + } + } + + m_statusNext[index] = INACTIVE; + m_objectExecute[index] = oe; + m_statusFunction[index] = fun2; + + return indexToNumber(index); + } + if(nextFree == END_OF_LIST){ expand(10); nextFree = m_firstFree; @@ -1383,8 +1412,6 @@ TransporterFacade::ThreadData::open(void m_use_cnt++; m_firstFree = m_statusNext[nextFree]; - - Object_Execute oe = { objRef , fun }; m_statusNext[nextFree] = INACTIVE; m_objectExecute[nextFree] = oe; === modified file 'storage/ndb/src/ndbapi/TransporterFacade.hpp' --- a/storage/ndb/src/ndbapi/TransporterFacade.hpp 2008-06-17 20:28:45 +0000 +++ b/storage/ndb/src/ndbapi/TransporterFacade.hpp 2008-07-22 13:42:54 +0000 @@ -60,9 +60,11 @@ public: /** * Register this block for sending/receiving signals + * @blockNo block number to use, -1 => any blockNumber * @return BlockNumber or -1 for failure */ - int open(void* objRef, ExecuteFunction, NodeStatusFunction); + int open(void* objRef, ExecuteFunction, NodeStatusFunction, + int blockNo = -1); // Close this block number int close(BlockNumber blockNumber, Uint64 trans_id); @@ -279,7 +281,7 @@ private: Vector m_objectExecute; Vector m_statusFunction; - int open(void* objRef, ExecuteFunction, NodeStatusFunction); + int open(void* objRef, ExecuteFunction, NodeStatusFunction, int); int close(int number); void expand(Uint32 size); === modified file 'storage/ndb/test/include/NDBT_Test.hpp' --- a/storage/ndb/test/include/NDBT_Test.hpp 2008-06-23 12:37:22 +0000 +++ b/storage/ndb/test/include/NDBT_Test.hpp 2008-07-22 13:25:43 +0000 @@ -122,13 +122,11 @@ typedef int (NDBT_TESTFUNC)(NDBT_Context class NDBT_Step { public: - NDBT_Step(NDBT_TestCase* ptest, - const char* pname, - NDBT_TESTFUNC* pfunc); + NDBT_Step(NDBT_TestCase* ptest, + const char* pname, + NDBT_TESTFUNC* pfunc); virtual ~NDBT_Step() {} int execute(NDBT_Context*); - virtual int setUp(Ndb_cluster_connection&) = 0; - virtual void tearDown() = 0; void setContext(NDBT_Context*); NDBT_Context* getContext(); void print(); @@ -141,23 +139,18 @@ protected: NDBT_TESTFUNC* func; NDBT_TestCase* testcase; int step_no; -}; -class NDBT_NdbApiStep : public NDBT_Step { +private: + int setUp(Ndb_cluster_connection&); + void tearDown(); + Ndb* m_ndb; + public: - NDBT_NdbApiStep(NDBT_TestCase* ptest, - const char* pname, - NDBT_TESTFUNC* pfunc); - virtual ~NDBT_NdbApiStep() {} - virtual int setUp(Ndb_cluster_connection&); - virtual void tearDown(); + Ndb* getNdb() const; - Ndb* getNdb(); -protected: - Ndb* ndb; }; -class NDBT_ParallelStep : public NDBT_NdbApiStep { +class NDBT_ParallelStep : public NDBT_Step { public: NDBT_ParallelStep(NDBT_TestCase* ptest, const char* pname, @@ -165,7 +158,7 @@ public: virtual ~NDBT_ParallelStep() {} }; -class NDBT_Verifier : public NDBT_NdbApiStep { +class NDBT_Verifier : public NDBT_Step { public: NDBT_Verifier(NDBT_TestCase* ptest, const char* name, @@ -173,7 +166,7 @@ public: virtual ~NDBT_Verifier() {} }; -class NDBT_Initializer : public NDBT_NdbApiStep { +class NDBT_Initializer : public NDBT_Step { public: NDBT_Initializer(NDBT_TestCase* ptest, const char* name, @@ -181,7 +174,7 @@ public: virtual ~NDBT_Initializer() {} }; -class NDBT_Finalizer : public NDBT_NdbApiStep { +class NDBT_Finalizer : public NDBT_Step { public: NDBT_Finalizer(NDBT_TestCase* ptest, const char* name, @@ -190,6 +183,12 @@ public: }; +enum NDBT_DriverType { + DummyDriver, + NdbApiDriver +}; + + class NDBT_TestCase { public: NDBT_TestCase(NDBT_TestSuite* psuite, @@ -211,10 +210,13 @@ public: virtual bool tableExists(NdbDictionary::Table* aTable) = 0; virtual bool isVerify(const NdbDictionary::Table* aTable) = 0; - virtual void saveTestResult(const NdbDictionary::Table* ptab, int result) = 0; + virtual void saveTestResult(const char*, int result) = 0; virtual void printTestResult() = 0; void initBeforeTest(){ timer.doReset();}; + void setDriverType(NDBT_DriverType type) { m_driverType= type; } + NDBT_DriverType getDriverType() const { return m_driverType; } + /** * Get no of steps running/completed */ @@ -243,6 +245,7 @@ protected: Properties props; NdbTimer timer; bool isVerifyTables; + NDBT_DriverType m_driverType; }; static const int FAILED_TO_CREATE = 1000; @@ -281,7 +284,7 @@ public: virtual ~NDBT_TestCaseImpl1(); int addStep(NDBT_Step*); int addVerifier(NDBT_Verifier*); - int addInitializer(NDBT_Initializer*); + int addInitializer(NDBT_Initializer*, bool first= false); int addFinalizer(NDBT_Finalizer*); void addTable(const char*, bool); bool tableExists(NdbDictionary::Table*); @@ -300,7 +303,7 @@ public: private: static const int NORESULT = 999; - void saveTestResult(const NdbDictionary::Table* ptab, int result); + void saveTestResult(const char*, int result); void printTestResult(); void startStepInThread(int stepNo, NDBT_Context* ctx); @@ -371,14 +374,24 @@ public: void setTemporaryTables(bool val); bool getTemporaryTables() const; + + void setLogging(bool val); + bool getLogging() const; + + int createTables(Ndb_cluster_connection&) const; + int dropTables(Ndb_cluster_connection&) const; + + void setDriverType(NDBT_DriverType type) { m_driverType= type; } + NDBT_DriverType getDriverType() const { return m_driverType; } + private: int executeOne(Ndb_cluster_connection&, const char* _tabname, const char* testname = NULL); int executeAll(Ndb_cluster_connection&, const char* testname = NULL); void execute(Ndb_cluster_connection&, - Ndb*, const NdbDictionary::Table*, const char* testname = NULL); - + const NdbDictionary::Table*, const char* testname = NULL); + int report(const char* _tcname = NULL); int reportAllTables(const char* ); const char* name; @@ -392,13 +405,14 @@ private: int loops; int timer; NdbTimer testSuiteTimer; - bool createTable; + bool m_createTable; + bool m_createAll; bool diskbased; bool runonce; const char* tsname; - bool createAllTables; bool temporaryTables; - bool nologging; + bool m_logging; + NDBT_DriverType m_driverType; }; @@ -413,10 +427,18 @@ C##suitname():NDBT_TestSuite(#suitname){ NDBT_Initializer* pti; pti = NULL; \ NDBT_Finalizer* ptf; ptf = NULL; +// The default driver type to use for all tests in suite +#define DRIVER(type) \ + setDriverType(type) + #define TESTCASE(testname, comment) \ pt = new NDBT_TestCaseImpl1(this, testname, comment); \ addTest(pt); +// The driver type to use for a particular testcase +#define TESTCASE_DRIVER(type) \ + pt->setDriverType(type); + #define TC_PROPERTY(propname, propval) \ pt->setProperty(propname, propval); @@ -464,6 +486,6 @@ C##suitname():NDBT_TestSuite(#suitname){ } } ; C##suitname suitname // Helper functions for retrieving variables from NDBT_Step -#define GETNDB(ps) ((NDBT_NdbApiStep*)ps)->getNdb() +#define GETNDB(ps) ((NDBT_Step*)ps)->getNdb() #endif === modified file 'storage/ndb/test/include/NdbMgmd.hpp' --- a/storage/ndb/test/include/NdbMgmd.hpp 2008-06-23 12:37:22 +0000 +++ b/storage/ndb/test/include/NdbMgmd.hpp 2008-07-22 13:25:43 +0000 @@ -26,7 +26,7 @@ public: m_connect_str(getenv("NDB_CONNECTSTRING")) { if (!m_connect_str.length()){ - fprintf(stderr, "Could not init NdbConnectCtring"); + fprintf(stderr, "Could not init NdbConnectString"); abort(); } } === modified file 'storage/ndb/test/ndbapi/acrt/NdbRepStress.cpp' --- a/storage/ndb/test/ndbapi/acrt/NdbRepStress.cpp 2008-03-03 15:10:42 +0000 +++ b/storage/ndb/test/ndbapi/acrt/NdbRepStress.cpp 2008-07-22 13:25:43 +0000 @@ -45,10 +45,6 @@ urandom(uint m) return r; } -#define GETNDB(ps) ((NDBT_NdbApiStep*)ps)->getNdb() -/* -*/ - bool syncSlaveWithMaster() { === modified file 'storage/ndb/test/ndbapi/testBasic.cpp' --- a/storage/ndb/test/ndbapi/testBasic.cpp 2007-11-02 16:23:17 +0000 +++ b/storage/ndb/test/ndbapi/testBasic.cpp 2008-07-22 13:25:43 +0000 @@ -19,8 +19,6 @@ #include #include -#define GETNDB(ps) ((NDBT_NdbApiStep*)ps)->getNdb() - /** * TODO === modified file 'storage/ndb/test/ndbapi/testBasicAsynch.cpp' --- a/storage/ndb/test/ndbapi/testBasicAsynch.cpp 2006-12-23 19:20:40 +0000 +++ b/storage/ndb/test/ndbapi/testBasicAsynch.cpp 2008-07-22 13:25:43 +0000 @@ -19,8 +19,6 @@ #include "HugoAsynchTransactions.hpp" #include "UtilTransactions.hpp" -#define GETNDB(ps) ((NDBT_NdbApiStep*)ps)->getNdb() - int runLoadTable(NDBT_Context* ctx, NDBT_Step* step){ int records = ctx->getNumRecords(); === modified file 'storage/ndb/test/ndbapi/testMgm.cpp' --- a/storage/ndb/test/ndbapi/testMgm.cpp 2008-06-23 12:52:49 +0000 +++ b/storage/ndb/test/ndbapi/testMgm.cpp 2008-07-22 13:25:43 +0000 @@ -642,6 +642,7 @@ done: } NDBT_TESTSUITE(testMgm); +DRIVER(DummyDriver); /* turn off use of NdbApi */ TESTCASE("ApiSessionFailure", "Test failures in MGMAPI session"){ INITIALIZER(runTestApiSession); @@ -681,6 +682,8 @@ NDBT_TESTSUITE_END(testMgm); int main(int argc, const char** argv){ ndb_init(); + testMgm.setCreateTable(false); + testMgm.setRunAllTables(true); return testMgm.execute(argc, argv); } === modified file 'storage/ndb/test/ndbapi/testOIBasic.cpp' --- a/storage/ndb/test/ndbapi/testOIBasic.cpp 2008-07-01 12:35:34 +0000 +++ b/storage/ndb/test/ndbapi/testOIBasic.cpp 2008-07-23 11:36:53 +0000 @@ -614,6 +614,10 @@ getcs(Par par) uint n = urandom(maxcsnumber); cs = get_charset(n, MYF(0)); if (cs != 0) { + // avoid dodgy internal character sets + // see bug# 37554 + if (cs->state & MY_CS_HIDDEN) + continue; // prefer complex charsets if (cs->mbmaxlen != 1 || urandom(5) == 0) break; === modified file 'storage/ndb/test/ndbapi/testPartitioning.cpp' --- a/storage/ndb/test/ndbapi/testPartitioning.cpp 2008-07-01 12:35:34 +0000 +++ b/storage/ndb/test/ndbapi/testPartitioning.cpp 2008-07-22 13:27:57 +0000 @@ -19,8 +19,6 @@ #include #include -#define GETNDB(ps) ((NDBT_NdbApiStep*)ps)->getNdb() - static Uint32 max_dks = 0; static === modified file 'storage/ndb/test/ndbapi/test_event.cpp' --- a/storage/ndb/test/ndbapi/test_event.cpp 2008-06-18 21:25:50 +0000 +++ b/storage/ndb/test/ndbapi/test_event.cpp 2008-07-22 13:27:57 +0000 @@ -23,8 +23,6 @@ #include #include -#define GETNDB(ps) ((NDBT_NdbApiStep*)ps)->getNdb() - static int createEvent(Ndb *pNdb, const NdbDictionary::Table &tab, NDBT_Context* ctx) === modified file 'storage/ndb/test/src/NDBT_Tables.cpp' --- a/storage/ndb/test/src/NDBT_Tables.cpp 2008-07-01 12:35:34 +0000 +++ b/storage/ndb/test/src/NDBT_Tables.cpp 2008-07-22 13:27:57 +0000 @@ -1248,6 +1248,7 @@ loop: if(r == -1){ if(pNdb->getDictionary()->getNdbError().code == 755) { + ndbout << "Error: " << pNdb->getDictionary()->getNdbError() << endl; if (create_default_tablespace(pNdb) == 0) { goto loop; === modified file 'storage/ndb/test/src/NDBT_Test.cpp' --- a/storage/ndb/test/src/NDBT_Test.cpp 2008-06-23 12:37:22 +0000 +++ b/storage/ndb/test/src/NDBT_Test.cpp 2008-07-22 13:25:43 +0000 @@ -25,7 +25,6 @@ #include -// No verbose outxput NDBT_Context::NDBT_Context(Ndb_cluster_connection& con) : m_cluster_connection(con) @@ -258,14 +257,58 @@ void NDBT_Context::setNumLoops(int _loop loops = _loops; } -NDBT_Step::NDBT_Step(NDBT_TestCase* ptest, const char* pname, - NDBT_TESTFUNC* pfunc): name(pname){ - assert(pfunc != NULL); - func = pfunc; - testcase = ptest; - step_no = -1; +NDBT_Step::NDBT_Step(NDBT_TestCase* ptest, const char* pname, + NDBT_TESTFUNC* pfunc) : + m_ctx(NULL), name(pname), func(pfunc), + testcase(ptest), step_no(-1), m_ndb(NULL) +{ } + +int +NDBT_Step::setUp(Ndb_cluster_connection& con){ + + switch(testcase->getDriverType()) + { + case DummyDriver: + break; + + case NdbApiDriver: + { + m_ndb = new Ndb(&con, "TEST_DB" ); + m_ndb->init(1024); + + int result = m_ndb->waitUntilReady(300); // 5 minutes + if (result != 0){ + g_err << "Ndb was not ready" << endl; + return NDBT_FAILED; + } + break; + } + + default: + abort(); + break; + + } + + return NDBT_OK; +} + + +void +NDBT_Step::tearDown(){ + delete m_ndb; + m_ndb = NULL; +} + + +Ndb* NDBT_Step::getNdb() const { + assert(m_ndb != NULL); + return m_ndb; +} + + int NDBT_Step::execute(NDBT_Context* ctx) { assert(ctx != NULL); @@ -305,58 +348,26 @@ NDBT_Context* NDBT_Step::getContext(){ return m_ctx; } -NDBT_NdbApiStep::NDBT_NdbApiStep(NDBT_TestCase* ptest, - const char* pname, - NDBT_TESTFUNC* pfunc) - : NDBT_Step(ptest, pname, pfunc), - ndb(NULL) { -} - - -int -NDBT_NdbApiStep::setUp(Ndb_cluster_connection& con){ - ndb = new Ndb(&con, "TEST_DB" ); - ndb->init(1024); - - int result = ndb->waitUntilReady(300); // 5 minutes - if (result != 0){ - g_err << name << ": Ndb was not ready" << endl; - return NDBT_FAILED; - } - return NDBT_OK; -} - -void -NDBT_NdbApiStep::tearDown(){ - delete ndb; - ndb = NULL; -} - -Ndb* NDBT_NdbApiStep::getNdb(){ - assert(ndb != NULL); - return ndb; -} - NDBT_ParallelStep::NDBT_ParallelStep(NDBT_TestCase* ptest, const char* pname, NDBT_TESTFUNC* pfunc) - : NDBT_NdbApiStep(ptest, pname, pfunc) { + : NDBT_Step(ptest, pname, pfunc) { } NDBT_Verifier::NDBT_Verifier(NDBT_TestCase* ptest, const char* pname, NDBT_TESTFUNC* pfunc) - : NDBT_NdbApiStep(ptest, pname, pfunc) { + : NDBT_Step(ptest, pname, pfunc) { } NDBT_Initializer::NDBT_Initializer(NDBT_TestCase* ptest, const char* pname, NDBT_TESTFUNC* pfunc) - : NDBT_NdbApiStep(ptest, pname, pfunc) { + : NDBT_Step(ptest, pname, pfunc) { } NDBT_Finalizer::NDBT_Finalizer(NDBT_TestCase* ptest, const char* pname, NDBT_TESTFUNC* pfunc) - : NDBT_NdbApiStep(ptest, pname, pfunc) { + : NDBT_Step(ptest, pname, pfunc) { } NDBT_TestCase::NDBT_TestCase(NDBT_TestSuite* psuite, @@ -386,6 +397,8 @@ NDBT_TestCaseImpl1::NDBT_TestCaseImpl1(N numStepsCompleted = 0; waitThreadsMutexPtr = NdbMutex_Create(); waitThreadsCondPtr = NdbCondition_Create(); + + m_driverType= psuite->getDriverType(); } NDBT_TestCaseImpl1::~NDBT_TestCaseImpl1(){ @@ -429,9 +442,13 @@ int NDBT_TestCaseImpl1::addVerifier(NDBT return 0; } -int NDBT_TestCaseImpl1::addInitializer(NDBT_Initializer* pInitializer){ +int NDBT_TestCaseImpl1::addInitializer(NDBT_Initializer* pInitializer, + bool first){ assert(pInitializer != NULL); - initializers.push_back(pInitializer); + if (first) + initializers.push(pInitializer, 0); + else + initializers.push_back(pInitializer); return 0; } @@ -722,9 +739,9 @@ int NDBT_TestCaseImpl1::runFinal(NDBT_Co } -void NDBT_TestCaseImpl1::saveTestResult(const NdbDictionary::Table* ptab, +void NDBT_TestCaseImpl1::saveTestResult(const char* test_name, int result){ - testResults.push_back(new NDBT_TestCaseResult(ptab->getName(), + testResults.push_back(new NDBT_TestCaseResult(test_name, result, timer.elapsedTime())); } @@ -754,18 +771,21 @@ void NDBT_TestCaseImpl1::printTestResult -NDBT_TestSuite::NDBT_TestSuite(const char* pname):name(pname){ +NDBT_TestSuite::NDBT_TestSuite(const char* pname) : + name(pname), + m_createTable(true), + m_createAll(false), + m_logging(true), + m_driverType(NdbApiDriver) +{ numTestsOk = 0; numTestsFail = 0; numTestsExecuted = 0; records = 0; loops = 0; - createTable = true; diskbased = false; tsname = NULL; - createAllTables = false; temporaryTables = false; - nologging = false; } @@ -777,14 +797,14 @@ NDBT_TestSuite::~NDBT_TestSuite(){ } void NDBT_TestSuite::setCreateTable(bool _flag){ - createTable = _flag; + m_createTable = _flag; } void NDBT_TestSuite::setRunAllTables(bool _flag){ runonce = _flag; } void NDBT_TestSuite::setCreateAllTables(bool _flag){ - createAllTables = _flag; + m_createAll = _flag; } void NDBT_TestSuite::setTemporaryTables(bool val){ @@ -795,6 +815,14 @@ bool NDBT_TestSuite::getTemporaryTables( return temporaryTables; } +void NDBT_TestSuite::setLogging(bool val){ + m_logging = val; +} + +bool NDBT_TestSuite::getLogging() const { + return m_logging; +} + bool NDBT_TestSuite::timerIsOn(){ return (timer != 0); } @@ -819,91 +847,41 @@ int NDBT_TestSuite::executeAll(Ndb_clust if(tests.size() == 0) return NDBT_FAILED; - Ndb ndb(&con, "TEST_DB"); - ndb.init(1024); - - int result = ndb.waitUntilReady(500); // 5 minutes - if (result != 0){ - g_err << name <<": Ndb was not ready" << endl; - return NDBT_FAILED; - } ndbout << name << " started [" << getDate() << "]" << endl; - if(!runonce) { testSuiteTimer.doStart(); for (int t=0; t < NDBT_Tables::getNumTables(); t++){ const NdbDictionary::Table* ptab = NDBT_Tables::getTable(t); ndbout << "|- " << ptab->getName() << endl; - execute(con, &ndb, ptab, _testname); + execute(con, ptab, _testname); } testSuiteTimer.doStop(); } else { - NdbDictionary::Dictionary* pDict= ndb.getDictionary(); for (unsigned i = 0; i < tests.size(); i++){ if (_testname != NULL && strcasecmp(tests[i]->getName(), _testname) != 0) continue; - - + tests[i]->initBeforeTest(); ctx = new NDBT_Context(con); - Uint32 t; - for (t=0; t < (Uint32)NDBT_Tables::getNumTables(); t++) - { - const NdbDictionary::Table* pTab = NDBT_Tables::getTable(t); - const NdbDictionary::Table* pTab2 = pDict->getTable(pTab->getName()); - - if(pTab2 != 0 && pDict->dropTable(pTab->getName()) != 0) - { - numTestsFail++; - numTestsExecuted++; - g_err << "ERROR0: Failed to drop table " << pTab->getName() << endl; - tests[i]->saveTestResult(pTab, FAILED_TO_CREATE); - continue; - } - - if (NDBT_Tables::createTable(&ndb, pTab->getName(), - nologging, false, - g_create_hook, this) != 0) { - numTestsFail++; - numTestsExecuted++; - g_err << "ERROR1: Failed to create table " << pTab->getName() - << pDict->getNdbError() << endl; - tests[i]->saveTestResult(pTab, FAILED_TO_CREATE); - continue; - } - pTab2 = pDict->getTable(pTab->getName()); - - ctx->addTab(pTab2); - } - ctx->setNumRecords(records); ctx->setNumLoops(loops); ctx->setSuite(this); - - const NdbDictionary::Table** tables= ctx->getTables(); - result = tests[i]->execute(ctx); - tests[i]->saveTestResult(tables[0], result); + int result = tests[i]->execute(ctx); + + tests[i]->saveTestResult("", result); if (result != NDBT_OK) numTestsFail++; else numTestsOk++; numTestsExecuted++; - - if(result == NDBT_OK) - { - for(t = 0; tables[t] != 0; t++) - { - pDict->dropTable(tables[t]->getName()); - } - } - + delete ctx; } } @@ -913,17 +891,9 @@ int NDBT_TestSuite::executeAll(Ndb_clust int NDBT_TestSuite::executeOne(Ndb_cluster_connection& con, const char* _tabname, const char* _testname){ - - if(tests.size() == 0) - return NDBT_FAILED; - Ndb ndb(&con, "TEST_DB"); - ndb.init(1024); - int result = ndb.waitUntilReady(300); // 5 minutes - if (result != 0){ - g_err << name <<": Ndb was not ready" << endl; + if(tests.size() == 0) return NDBT_FAILED; - } ndbout << name << " started [" << getDate() << "]" << endl; @@ -933,7 +903,7 @@ NDBT_TestSuite::executeOne(Ndb_cluster_c ndbout << "|- " << ptab->getName() << endl; - execute(con, &ndb, ptab, _testname); + execute(con, ptab, _testname); if (numTestsFail > 0){ return NDBT_FAILED; @@ -1026,7 +996,7 @@ NDBT_TestSuite::createHook(Ndb* ndb, Ndb } void NDBT_TestSuite::execute(Ndb_cluster_connection& con, - Ndb* ndb, const NdbDictionary::Table* pTab, + const NdbDictionary::Table* pTab, const char* _testname){ int result; @@ -1047,50 +1017,20 @@ void NDBT_TestSuite::execute(Ndb_cluster tests[t]->initBeforeTest(); - NdbDictionary::Dictionary* pDict = ndb->getDictionary(); - const NdbDictionary::Table* pTab2 = pDict->getTable(pTab->getName()); - if (createTable == true){ - - if(pTab2 != 0 && pDict->dropTable(pTab->getName()) != 0){ - numTestsFail++; - numTestsExecuted++; - g_err << "ERROR0: Failed to drop table " << pTab->getName() << endl; - tests[t]->saveTestResult(pTab, FAILED_TO_CREATE); - continue; - } - - if (NDBT_Tables::createTable(ndb, pTab->getName(), nologging, false, - g_create_hook, this) != 0) { - numTestsFail++; - numTestsExecuted++; - g_err << "ERROR1: Failed to create table " << pTab->getName() - << pDict->getNdbError() << endl; - tests[t]->saveTestResult(pTab, FAILED_TO_CREATE); - continue; - } - pTab2 = pDict->getTable(pTab->getName()); - } else if(!pTab2) { - pTab2 = pTab; - } - ctx = new NDBT_Context(con); - ctx->setTab(pTab2); ctx->setNumRecords(records); ctx->setNumLoops(loops); ctx->setSuite(this); - + ctx->setTab(pTab); + result = tests[t]->execute(ctx); - tests[t]->saveTestResult(pTab, result); + tests[t]->saveTestResult(pTab->getName(), result); if (result != NDBT_OK) numTestsFail++; else numTestsOk++; numTestsExecuted++; - if (result == NDBT_OK && createTable == true && createAllTables == false){ - pDict->dropTable(pTab->getName()); - } - tests[t]->m_has_run = true; delete ctx; @@ -1099,6 +1039,133 @@ void NDBT_TestSuite::execute(Ndb_cluster +int +NDBT_TestSuite::createTables(Ndb_cluster_connection& con) const +{ + Ndb ndb(&con, "TEST_DB"); + ndb.init(1); + + NdbDictionary::Dictionary* pDict = ndb.getDictionary(); + for(unsigned i = 0; idropTable(tab_name) != 0) + { + g_err << "runCreateTables: Failed to drop table " << tab_name + << pDict->getNdbError() << endl; + return NDBT_FAILED; + } + if(NDBT_Tables::createTable(&ndb, tab_name, getLogging()) != 0) + { + g_err << "runCreateTables: Failed to create table " << tab_name + << pDict->getNdbError() << endl; + return NDBT_FAILED; + } + + if (i == 0){ + // Update ctx with a pointer to the first created table + const NdbDictionary::Table* pTab2 = pDict->getTable(tab_name); + ctx->setTab(pTab2); + } + g_info << "created " << tab_name << endl; + } + + return NDBT_OK; +} + + +static int +runCreateTables(NDBT_Context* ctx, NDBT_Step* step) +{ + NDBT_TestSuite* suite= ctx->getSuite(); + return suite->createTables(ctx->m_cluster_connection); +} + + +static int +runCreateTable(NDBT_Context* ctx, NDBT_Step* step) +{ + Ndb ndb(&ctx->m_cluster_connection, "TEST_DB"); + ndb.init(1); + + NdbDictionary::Dictionary* pDict = ndb.getDictionary(); + const NdbDictionary::Table* pTab = ctx->getTab(); + const char *tab_name= pTab->getName(); + if (pDict->dropTable(tab_name) > 0) + { + g_err << "runCreateTable: Failed to drop table " << tab_name + << pDict->getNdbError() << endl; + return NDBT_FAILED; + } + + if(NDBT_Tables::createTable(&ndb, tab_name, + ctx->getSuite()->getLogging()) != 0) + { + g_err << "runCreateTable: Failed to create table " << tab_name + << pDict->getNdbError() << endl; + return NDBT_FAILED; + } + + // Update ctx with a pointer to the created table + const NdbDictionary::Table* pTab2 = pDict->getTable(tab_name); + ctx->setTab(pTab2); + + return NDBT_OK; +} + + +int +NDBT_TestSuite::dropTables(Ndb_cluster_connection& con) const +{ + Ndb ndb(&con, "TEST_DB"); + ndb.init(1); + + int res= NDBT_OK; + NdbDictionary::Dictionary* pDict = ndb.getDictionary(); + for(unsigned i = 0; idropTable(tab_name) != 0) + { + g_err << "runDropTables: Failed to drop table " << tab_name + << pDict->getNdbError() << endl; + res= NDBT_FAILED; + // Continue, try to drop all tables... + } + + g_info << "dropped " << tab_name << endl; + } + return NDBT_OK; +} + + +static int +runDropTables(NDBT_Context* ctx, NDBT_Step* step) +{ + NDBT_TestSuite* suite= ctx->getSuite(); + return suite->dropTables(ctx->m_cluster_connection); +} + + +static int +runDropTable(NDBT_Context* ctx, NDBT_Step* step) +{ + Ndb ndb(&ctx->m_cluster_connection, "TEST_DB"); + ndb.init(1); + + NdbDictionary::Dictionary* pDict = ndb.getDictionary(); + const NdbDictionary::Table* pTab = ctx->getTab(); + const char *tab_name= pTab->getName(); + + if (pDict->dropTable(tab_name) != 0) + { + g_err << "runDropTables: Failed to drop table " << tab_name + << pDict->getNdbError() << endl; + return NDBT_FAILED; + } + return NDBT_OK; +} + int NDBT_TestSuite::report(const char* _tcname){ @@ -1269,21 +1336,6 @@ int NDBT_TestSuite::execute(int argc, co return NDBT_ProgramExit(NDBT_WRONGARGS); } - if (opt_print == true){ - printExecutionTree(); - return 0; - } - - if (opt_print_html == true){ - printExecutionTreeHTML(); - return 0; - } - - if (opt_print_cases == true){ - printCases(); - return 0; - } - if (opt_verbose) setOutputLevel(2); // Show g_info else @@ -1292,15 +1344,10 @@ int NDBT_TestSuite::execute(int argc, co records = opt_records; loops = opt_loops; timer = opt_timer; - nologging = opt_nologging; + if (opt_nologging) + setLogging(false); temporaryTables = opt_temporary; - Ndb_cluster_connection con; - if(con.connect(12, 5, 1)) - { - return NDBT_ProgramExit(NDBT_FAILED); - } - if (opt_seed == 0) { opt_seed = NdbTick_CurrentMillisecond(); @@ -1310,46 +1357,58 @@ int NDBT_TestSuite::execute(int argc, co srandom(opt_seed); global_flag_skip_invalidate_cache = 1; - - { - Ndb ndb(&con, "TEST_DB"); - ndb.init(1024); - if (ndb.waitUntilReady(500)){ - g_err << "Ndb was not ready" << endl; - return NDBT_ProgramExit(NDBT_FAILED); - } - NdbDictionary::Dictionary* pDict = ndb.getDictionary(); - int num_tables= argc; + int num_tables= argc; + if (argc == 0) + num_tables = NDBT_Tables::getNumTables(); + + for(int i = 0; igetName()); + else + m_tables_in_test.push_back(_argv[i]); + } - for(int i = 0; igetName()); - else - m_tables_in_test.push_back(_argv[i]); - if (createAllTables == true) - { - const char *tab_name= m_tables_in_test[i].c_str(); - const NdbDictionary::Table* pTab = pDict->getTable(tab_name); - if (pTab && pDict->dropTable(tab_name) != 0) - { - g_err << "ERROR0: Failed to drop table " << tab_name - << pDict->getNdbError() << endl; - return NDBT_ProgramExit(NDBT_FAILED); - } - if(NDBT_Tables::createTable(&ndb, tab_name, nologging) != 0) - { - g_err << "ERROR1: Failed to create table " << tab_name - << pDict->getNdbError() << endl; - return NDBT_ProgramExit(NDBT_FAILED); - } - } + NDBT_TestCaseImpl1* pt= (NDBT_TestCaseImpl1*)tests[t]; + NDBT_Initializer* pti = + new NDBT_Initializer(pt, + m_createAll ? "runCreateTables" : "runCreateTable", + m_createAll ? runCreateTables : runCreateTable); + pt->addInitializer(pti, true); + NDBT_Finalizer* ptf = + new NDBT_Finalizer(pt, + m_createAll ? "runDropTables" : "runDropTable", + m_createAll ? runDropTables : runDropTable); + pt->addFinalizer(ptf); } } + if (opt_print == true){ + printExecutionTree(); + return 0; + } + + if (opt_print_html == true){ + printExecutionTreeHTML(); + return 0; + } + + if (opt_print_cases == true){ + printCases(); + return 0; + } + + Ndb_cluster_connection con; + if(con.connect(12, 5, 1)) + { + return NDBT_ProgramExit(NDBT_FAILED); + } + if(argc == 0){ // No table specified res = executeAll(con, opt_testname); @@ -1362,24 +1421,8 @@ int NDBT_TestSuite::execute(int argc, co res = report(opt_testname); } - if (res == NDBT_OK && createAllTables == true) - { - Ndb ndb(&con, "TEST_DB"); - ndb.init(1024); - if (ndb.waitUntilReady(500)){ - g_err << "Ndb was not ready" << endl; - return NDBT_ProgramExit(NDBT_FAILED); - } - NdbDictionary::Dictionary* pDict = ndb.getDictionary(); - for(unsigned i = 0; idropTable(m_tables_in_test[i].c_str()); - } - } - return NDBT_ProgramExit(res); } - void NDBT_TestSuite::printExecutionTree(){ === modified file 'storage/ndb/tools/restore/restore_main.cpp' --- a/storage/ndb/tools/restore/restore_main.cpp 2008-06-18 21:19:57 +0000 +++ b/storage/ndb/tools/restore/restore_main.cpp 2008-07-24 10:57:58 +0000 @@ -929,7 +929,8 @@ main(int argc, char** argv) { for(i=0; i < metaData.getNoOfTables(); i++) { - if (checkSysTable(metaData, i)) + if (checkSysTable(metaData, i) && + checkDbAndTableName(metaData[i])) { for(Uint32 j= 0; j < g_consumers.size(); j++) if (!g_consumers[j]->table_equal(* metaData[i])) @@ -945,7 +946,8 @@ main(int argc, char** argv) { //if want to promote attributes, compability check is done firstly for (i=0; i < metaData.getNoOfTables(); i++){ - if (checkSysTable(metaData, i)) + if (checkSysTable(metaData, i) && + checkDbAndTableName(metaData[i])) { for(Uint32 j= 0; j < g_consumers.size(); j++) {