#At file:///home/jonas/bzr/telco-6.4/
2680 jonas oreland 2008-08-03 [merge]
merge bk-internal:telco-6.4 into eel:telco-6.4
removed:
storage/ndb/include/mgmcommon/MgmtErrorReporter.hpp
added:
storage/ndb/src/kernel/blocks/LocalProxy.cpp
storage/ndb/src/kernel/blocks/LocalProxy.hpp
storage/ndb/src/kernel/blocks/RestoreProxy.cpp
storage/ndb/src/kernel/blocks/RestoreProxy.hpp
storage/ndb/src/kernel/blocks/backup/BackupProxy.cpp
storage/ndb/src/kernel/blocks/backup/BackupProxy.hpp
storage/ndb/src/kernel/blocks/dbacc/DbaccProxy.cpp
storage/ndb/src/kernel/blocks/dbacc/DbaccProxy.hpp
storage/ndb/src/kernel/blocks/dblqh/DblqhProxy.cpp
storage/ndb/src/kernel/blocks/dblqh/DblqhProxy.hpp
storage/ndb/src/kernel/blocks/dbtup/DbtupProxy.cpp
storage/ndb/src/kernel/blocks/dbtup/DbtupProxy.hpp
storage/ndb/src/kernel/blocks/dbtux/DbtuxProxy.cpp
storage/ndb/src/kernel/blocks/dbtux/DbtuxProxy.hpp
storage/ndb/src/kernel/vm/GlobalData.cpp
storage/ndb/src/kernel/vm/dummy_nonmt.cpp
modified:
mysql-test/suite/ndb/r/ndb_restore.result
mysql-test/suite/ndb/r/ndb_restore_compressed.result
mysql-test/suite/ndb/t/ndb_restore.test
storage/ndb/include/debugger/SignalLoggerManager.hpp
storage/ndb/include/kernel/RefConvert.hpp
storage/ndb/include/kernel/kernel_config_parameters.h
storage/ndb/include/kernel/kernel_types.h
storage/ndb/include/kernel/ndb_limits.h
storage/ndb/src/common/debugger/SignalLoggerManager.cpp
storage/ndb/src/common/logger/Logger.cpp
storage/ndb/src/common/mgmcommon/ConfigRetriever.cpp
storage/ndb/src/common/transporter/TransporterRegistry.cpp
storage/ndb/src/kernel/SimBlockList.cpp
storage/ndb/src/kernel/blocks/Makefile.am
storage/ndb/src/kernel/blocks/backup/Backup.hpp
storage/ndb/src/kernel/blocks/backup/BackupInit.cpp
storage/ndb/src/kernel/blocks/dbacc/Dbacc.hpp
storage/ndb/src/kernel/blocks/dbacc/DbaccInit.cpp
storage/ndb/src/kernel/blocks/dbdict/Dbdict.cpp
storage/ndb/src/kernel/blocks/dbdict/Dbdict.hpp
storage/ndb/src/kernel/blocks/dbdih/Dbdih.hpp
storage/ndb/src/kernel/blocks/dbdih/DbdihMain.cpp
storage/ndb/src/kernel/blocks/dblqh/Dblqh.hpp
storage/ndb/src/kernel/blocks/dblqh/DblqhInit.cpp
storage/ndb/src/kernel/blocks/dblqh/DblqhMain.cpp
storage/ndb/src/kernel/blocks/dbtup/Dbtup.hpp
storage/ndb/src/kernel/blocks/dbtup/DbtupGen.cpp
storage/ndb/src/kernel/blocks/dbtux/Dbtux.hpp
storage/ndb/src/kernel/blocks/dbtux/DbtuxGen.cpp
storage/ndb/src/kernel/blocks/ndbfs/PosixAsyncFile.cpp
storage/ndb/src/kernel/blocks/restore.cpp
storage/ndb/src/kernel/blocks/restore.hpp
storage/ndb/src/kernel/main.cpp
storage/ndb/src/kernel/vm/Configuration.cpp
storage/ndb/src/kernel/vm/GlobalData.hpp
storage/ndb/src/kernel/vm/Makefile.am
storage/ndb/src/kernel/vm/SimulatedBlock.cpp
storage/ndb/src/kernel/vm/SimulatedBlock.hpp
storage/ndb/src/kernel/vm/TransporterCallback.cpp
storage/ndb/src/kernel/vm/mt.cpp
storage/ndb/src/kernel/vm/mt.hpp
storage/ndb/src/mgmapi/mgmapi.cpp
storage/ndb/src/mgmclient/CommandInterpreter.cpp
storage/ndb/src/mgmsrv/Config.cpp
storage/ndb/src/mgmsrv/Config.hpp
storage/ndb/src/mgmsrv/ConfigInfo.cpp
storage/ndb/src/mgmsrv/ConfigInfo.hpp
storage/ndb/src/mgmsrv/InitConfigFileParser.cpp
storage/ndb/src/mgmsrv/InitConfigFileParser.hpp
storage/ndb/src/mgmsrv/MgmtSrvr.cpp
storage/ndb/src/mgmsrv/MgmtSrvr.hpp
storage/ndb/src/mgmsrv/MgmtSrvrConfig.cpp
storage/ndb/src/mgmsrv/Services.cpp
storage/ndb/src/mgmsrv/Services.hpp
storage/ndb/src/mgmsrv/main.cpp
storage/ndb/src/ndbapi/SignalSender.cpp
storage/ndb/src/ndbapi/SignalSender.hpp
storage/ndb/src/ndbapi/TransporterFacade.cpp
storage/ndb/src/ndbapi/TransporterFacade.hpp
storage/ndb/test/include/NDBT_Test.hpp
storage/ndb/test/include/NdbMgmd.hpp
storage/ndb/test/ndbapi/acrt/NdbRepStress.cpp
storage/ndb/test/ndbapi/testBasic.cpp
storage/ndb/test/ndbapi/testBasicAsynch.cpp
storage/ndb/test/ndbapi/testMgm.cpp
storage/ndb/test/ndbapi/testOIBasic.cpp
storage/ndb/test/ndbapi/testPartitioning.cpp
storage/ndb/test/ndbapi/test_event.cpp
storage/ndb/test/src/NDBT_Tables.cpp
storage/ndb/test/src/NDBT_Test.cpp
storage/ndb/tools/restore/restore_main.cpp
=== modified file 'mysql-test/suite/ndb/r/ndb_restore.result'
--- a/mysql-test/suite/ndb/r/ndb_restore.result 2008-05-29 11:30:21 +0000
+++ b/mysql-test/suite/ndb/r/ndb_restore.result 2008-07-24 10:57:58 +0000
@@ -574,8 +574,29 @@ SELECT @the_backup_id:=backup_id FROM te
@the_backup_id:=backup_id
<the_backup_id>
DROP TABLE test.backup_info;
+CREATE TABLE t11_c (
+c1 int primary key, c2 char(10), c3 varchar(10)
+) ENGINE=ndbcluster DEFAULT CHARSET=latin1;
+CREATE TABLE t12_c (
+c1 int primary key, c2 char(10), c3 varchar(10)
+) ENGINE=ndbcluster DEFAULT CHARSET=latin1;
+INSERT INTO t11_c VALUES(1, "aaaaa", "bbbbb"), (2, "ccccc", "ddddd"), (3,
"eeeee","fffff");
+INSERT INTO t12_c VALUES(4, "ggggg", "hhhhh"), (5, "iiiii", "jjjjj"), (6,
"kkkkk","lllll");
+CREATE TEMPORARY TABLE IF NOT EXISTS test.backup_info (id INT, backup_id INT) ENGINE =
HEAP;
+DELETE FROM test.backup_info;
+LOAD DATA INFILE '../tmp.dat' INTO TABLE test.backup_info FIELDS TERMINATED BY ',';
+SELECT @the_backup_id:=backup_id FROM test.backup_info;
+@the_backup_id:=backup_id
+<the_backup_id>
+DROP TABLE test.backup_info;
+drop table t2_c,t11_c,t12_c;
+SELECT * FROM t11_c ORDER BY c1;
+c1 c2 c3
+1 aaaaa bbbbb
+2 ccccc ddddd
+3 eeeee fffff
drop table if exists t1,t2,t3,t4,t5,t6,t7,t8,t9,t10,t11;
-drop table if exists t2_c;
+drop table if exists t2_c,t11_c,t12_c;
520093696,<the_backup_id>
select epoch from mysql.ndb_apply_status where server_id=0;
epoch
=== modified file 'mysql-test/suite/ndb/r/ndb_restore_compressed.result'
--- a/mysql-test/suite/ndb/r/ndb_restore_compressed.result 2008-05-29 11:30:21 +0000
+++ b/mysql-test/suite/ndb/r/ndb_restore_compressed.result 2008-07-24 10:57:58 +0000
@@ -574,8 +574,29 @@ SELECT @the_backup_id:=backup_id FROM te
@the_backup_id:=backup_id
<the_backup_id>
DROP TABLE test.backup_info;
+CREATE TABLE t11_c (
+c1 int primary key, c2 char(10), c3 varchar(10)
+) ENGINE=ndbcluster DEFAULT CHARSET=latin1;
+CREATE TABLE t12_c (
+c1 int primary key, c2 char(10), c3 varchar(10)
+) ENGINE=ndbcluster DEFAULT CHARSET=latin1;
+INSERT INTO t11_c VALUES(1, "aaaaa", "bbbbb"), (2, "ccccc", "ddddd"), (3,
"eeeee","fffff");
+INSERT INTO t12_c VALUES(4, "ggggg", "hhhhh"), (5, "iiiii", "jjjjj"), (6,
"kkkkk","lllll");
+CREATE TEMPORARY TABLE IF NOT EXISTS test.backup_info (id INT, backup_id INT) ENGINE =
HEAP;
+DELETE FROM test.backup_info;
+LOAD DATA INFILE '../tmp.dat' INTO TABLE test.backup_info FIELDS TERMINATED BY ',';
+SELECT @the_backup_id:=backup_id FROM test.backup_info;
+@the_backup_id:=backup_id
+<the_backup_id>
+DROP TABLE test.backup_info;
+drop table t2_c,t11_c,t12_c;
+SELECT * FROM t11_c ORDER BY c1;
+c1 c2 c3
+1 aaaaa bbbbb
+2 ccccc ddddd
+3 eeeee fffff
drop table if exists t1,t2,t3,t4,t5,t6,t7,t8,t9,t10,t11;
-drop table if exists t2_c;
+drop table if exists t2_c,t11_c,t12_c;
520093696,<the_backup_id>
select epoch from mysql.ndb_apply_status where server_id=0;
epoch
=== modified file 'mysql-test/suite/ndb/t/ndb_restore.test'
--- a/mysql-test/suite/ndb/t/ndb_restore.test 2008-06-18 21:19:57 +0000
+++ b/mysql-test/suite/ndb/t/ndb_restore.test 2008-07-24 10:57:58 +0000
@@ -435,13 +435,31 @@ drop table t1_c,t3_c,t4_c,t5_c,t6_c,t7_c
--source include/ndb_backup.inc
--exec $NDB_TOOLS_DIR/ndb_restore --no-defaults --core=0 -b $the_backup_id -n 1 -m -r
--ndb-nodegroup_map '(0,1)' $NDB_BACKUP_DIR/BACKUP/BACKUP-$the_backup_id 2>&1 |
grep Translate || true
+CREATE TABLE t11_c (
+ c1 int primary key, c2 char(10), c3 varchar(10)
+) ENGINE=ndbcluster DEFAULT CHARSET=latin1;
+
+CREATE TABLE t12_c (
+ c1 int primary key, c2 char(10), c3 varchar(10)
+) ENGINE=ndbcluster DEFAULT CHARSET=latin1;
+
+INSERT INTO t11_c VALUES(1, "aaaaa", "bbbbb"), (2, "ccccc", "ddddd"), (3,
"eeeee","fffff");
+INSERT INTO t12_c VALUES(4, "ggggg", "hhhhh"), (5, "iiiii", "jjjjj"), (6,
"kkkkk","lllll");
+--source include/ndb_backup.inc
+drop table t2_c,t11_c,t12_c;
+# Only part of tables is restored, it should work
+--exec $NDB_TOOLS_DIR/ndb_restore --no-defaults -b $the_backup_id -n 1 -m -r --print
--print_meta $NDB_BACKUP_DIR/BACKUP/BACKUP-$the_backup_id test t11_c >>
$NDB_TOOLS_OUTPUT
+--exec $NDB_TOOLS_DIR/ndb_restore --no-defaults -b $the_backup_id -n 2 -r --print
--print_meta $NDB_BACKUP_DIR/BACKUP/BACKUP-$the_backup_id test t11_c >>
$NDB_TOOLS_OUTPUT
+#Should only t11_c is restored
+SELECT * FROM t11_c ORDER BY c1;
+
#
# Cleanup
#
--disable_warnings
drop table if exists t1,t2,t3,t4,t5,t6,t7,t8,t9,t10,t11;
-drop table if exists t2_c;
+drop table if exists t2_c,t11_c,t12_c;
--enable_warnings
#
=== modified file 'storage/ndb/include/debugger/SignalLoggerManager.hpp'
--- a/storage/ndb/include/debugger/SignalLoggerManager.hpp 2008-06-17 20:28:45 +0000
+++ b/storage/ndb/include/debugger/SignalLoggerManager.hpp 2008-07-25 05:48:32 +0000
@@ -26,6 +26,7 @@
#include <kernel_types.h>
#include <BlockNumbers.h>
#include <TransporterDefinitions.hpp>
+#include <RefConvert.hpp>
class SignalLoggerManager
{
@@ -187,11 +188,17 @@ private:
Uint32 traceId;
Uint8 logModes[NO_OF_BLOCKS];
+
+ NdbMutex* m_mutex;
+ void lock() { if (m_mutex != 0) NdbMutex_Lock(m_mutex); }
+ void unlock() { if (m_mutex != 0) NdbMutex_Unlock(m_mutex); }
public:
inline bool
logMatch(BlockNumber bno, LogMode mask)
{
+ // extract main block number
+ bno = blockToMain(bno);
// avoid addressing outside logModes
return
bno < MIN_BLOCK_NO || bno > MAX_BLOCK_NO ||
=== modified file 'storage/ndb/include/kernel/RefConvert.hpp'
--- a/storage/ndb/include/kernel/RefConvert.hpp 2006-12-23 19:20:40 +0000
+++ b/storage/ndb/include/kernel/RefConvert.hpp 2008-07-25 05:48:32 +0000
@@ -16,30 +16,82 @@
#ifndef REFCONVERT_H
#define REFCONVERT_H
+#include <assert.h>
#include "kernel_types.h"
+/*
+ * In multithreaded kernel, BlockNumber includes the main block
+ * number in lower 9 bits and the instance in upper 7 bits.
+ */
+
+inline
+BlockNumber blockToMain(Uint32 block){
+ assert(block < (1 << 16));
+ return (BlockNumber)(block & ((1 << 9) - 1));
+}
+
+inline
+BlockInstance blockToInstance(Uint32 block){
+ assert(block < (1 << 16));
+ return (BlockNumber)(block >> 9);
+}
+
+inline
+BlockNumber numberToBlock(Uint32 main, Uint32 instance)
+{
+ assert(main < (1 << 9) && instance < (1 << (16 - 9)));
+ return (BlockNumber)(main | (instance << 9));
+}
+
+/**
+ * Convert BlockReference to NodeId
+ */
+inline
+NodeId refToNode(Uint32 ref){
+ return (NodeId)(ref & ((1 << 16) - 1));
+}
+
/**
- * Convert BlockReference to BlockNumber
+ * Convert BlockReference to full 16-bit BlockNumber.
*/
inline
-BlockNumber refToBlock(BlockReference ref){
+BlockNumber refToBlock(Uint32 ref){
return (BlockNumber)(ref >> 16);
}
/**
- * Convert BlockReference to NodeId
+ * Convert BlockReference to main BlockNumber.
+ * Used in tests such as: refToMain(senderRef) == DBTC.
+ */
+inline
+BlockNumber refToMain(Uint32 ref){
+ return (BlockNumber)((ref >> 16) & ((1 << 9) - 1));
+}
+
+/**
+ * Convert BlockReference to BlockInstance.
*/
inline
-NodeId refToNode(BlockReference ref){
- return (NodeId)(ref & 0xFFFF);
+BlockInstance refToInstance(Uint32 ref){
+ return (BlockInstance)(ref >> (16 + 9));
}
/**
* Convert NodeId and BlockNumber to BlockReference
*/
inline
-BlockReference numberToRef(BlockNumber bnr, NodeId proc){
- return (((Uint32)bnr) << 16) + proc;
+BlockReference numberToRef(Uint32 block, Uint32 node){
+ assert(node < (1 << 16) && block < (1 << 16));
+ return (BlockReference)(node | (block << 16));
+}
+
+/**
+ * Convert NodeId and block main and instance to BlockReference
+ */
+inline
+BlockReference numberToRef(Uint32 main, Uint32 instance, Uint32 node){
+ assert(node < (1 << 16) && main < (1 << 9) &&
instance < (1 << (16 - 9)));
+ return (BlockReference)(node | (main << 16) | (instance << (16 + 9)));
}
#endif
=== modified file 'storage/ndb/include/kernel/kernel_config_parameters.h'
--- a/storage/ndb/include/kernel/kernel_config_parameters.h 2006-12-31 00:32:21 +0000
+++ b/storage/ndb/include/kernel/kernel_config_parameters.h 2008-07-25 05:48:32 +0000
@@ -63,4 +63,7 @@
#define CFG_TUX_ATTRIBUTE (PRIVATE_BASE + 42)
#define CFG_TUX_SCAN_OP (PRIVATE_BASE + 43)
+#define CFG_NDBMT_WORKERS (PRIVATE_BASE + 44)
+#define CFG_NDBMT_THREADS (PRIVATE_BASE + 45)
+
#endif
=== modified file 'storage/ndb/include/kernel/kernel_types.h'
--- a/storage/ndb/include/kernel/kernel_types.h 2006-12-23 19:20:40 +0000
+++ b/storage/ndb/include/kernel/kernel_types.h 2008-07-25 05:48:32 +0000
@@ -22,6 +22,7 @@
typedef Uint16 NodeId;
typedef Uint16 BlockNumber;
+typedef Uint16 BlockInstance;
typedef Uint32 BlockReference;
typedef Uint16 GlobalSignalNumber;
=== modified file 'storage/ndb/include/kernel/ndb_limits.h'
--- a/storage/ndb/include/kernel/ndb_limits.h 2008-06-18 09:01:43 +0000
+++ b/storage/ndb/include/kernel/ndb_limits.h 2008-07-25 05:48:32 +0000
@@ -178,7 +178,10 @@
*/
#define NDBMT_BLOCK_BITS 9
#define NDBMT_BLOCK_MASK 0x001FF
-#define NDBMT_BLOCK_INSTANCES_BITS 7
+#define NDBMT_BLOCK_INSTANCE_BITS 7
#define NDBMT_BLOCK_INSTANCE_MASK 0xFE00
+#define MAX_NDBMT_WORKERS 4
+#define MAX_NDBMT_THREADS 4
+
#endif
=== removed file 'storage/ndb/include/mgmcommon/MgmtErrorReporter.hpp'
--- a/storage/ndb/include/mgmcommon/MgmtErrorReporter.hpp 2006-12-23 19:20:40 +0000
+++ b/storage/ndb/include/mgmcommon/MgmtErrorReporter.hpp 1970-01-01 00:00:00 +0000
@@ -1,67 +0,0 @@
-/* Copyright (C) 2003 MySQL AB
-
- This program is free software; you can redistribute it and/or modify
- it under the terms of the GNU General Public License as published by
- the Free Software Foundation; version 2 of the License.
-
- This program is distributed in the hope that it will be useful,
- but WITHOUT ANY WARRANTY; without even the implied warranty of
- MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
- GNU General Public License for more details.
-
- You should have received a copy of the GNU General Public License
- along with this program; if not, write to the Free Software
- Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA */
-
-//******************************************************************************
-// Description: This file contains the error reporting macros to be used
-// within management server.
-//
-// Author: Peter Lind
-//******************************************************************************
-
-
-#include <ndb_global.h> // exit
-#include <NdbOut.hpp>
-
-#define REPORT_WARNING(message) \
- ndbout << "WARNING: " << message << endl
-
-//****************************************************************************
-// Description: Report a warning, the message is printed on ndbout.
-// Parameters:
-// message: A text describing the warning.
-// Returns: -
-//****************************************************************************
-
-
-#define REPORT_ERROR(message) \
- ndbout << "ERROR: " << message << endl
-
-//****************************************************************************
-// Description: Report an error, the message is printed on ndbout.
-// Parameters:
-// message: A text describing the error.
-// Returns: -
-//****************************************************************************
-
-
-#ifdef MGMT_TRACE
-
-#define TRACE(message) \
- ndbout << "MGMT_TRACE: " << message << endl
-#else
-#define TRACE(message)
-
-#endif
-
-//****************************************************************************
-// Description: Print a message on ndbout.
-// Parameters:
-// message: The message
-// Returns: -
-//****************************************************************************
-
-#define MGM_REQUIRE(x) \
- if (!(x)) { ndbout << __FILE__ << " " << __LINE__ \
- << ": Warning! Requirement failed" << endl; }
=== modified file 'storage/ndb/src/common/debugger/SignalLoggerManager.cpp'
--- a/storage/ndb/src/common/debugger/SignalLoggerManager.cpp 2008-06-17 20:28:45 +0000
+++ b/storage/ndb/src/common/debugger/SignalLoggerManager.cpp 2008-07-25 05:48:32 +0000
@@ -17,8 +17,22 @@
#include "SignalLoggerManager.hpp"
#include <LongSignal.hpp>
-
+#include <GlobalSignalNumbers.h>
#include <DebuggerNames.hpp>
+#include <NdbTick.h>
+
+// avoids MT log mixups but has some serializing effect
+static const bool g_use_mutex = true;
+
+static char* mytime()
+{
+ NDB_TICKS t = NdbTick_CurrentMillisecond();
+ uint s = (t / 1000) % 3600;
+ uint ms = t % 1000;
+ static char buf[100];
+ sprintf(buf, "%u.%03u", s, ms);
+ return buf;
+}
SignalLoggerManager::SignalLoggerManager()
{
@@ -28,6 +42,9 @@ SignalLoggerManager::SignalLoggerManager
outputStream = 0;
m_ownNodeId = 0;
m_logDistributed = false;
+ m_mutex = 0;
+ if (g_use_mutex)
+ m_mutex = NdbMutex_Create();
}
SignalLoggerManager::~SignalLoggerManager()
@@ -37,6 +54,10 @@ SignalLoggerManager::~SignalLoggerManage
fclose(outputStream);
outputStream = 0;
}
+ if (m_mutex != 0) {
+ NdbMutex_Destroy(m_mutex);
+ m_mutex = 0;
+ }
}
FILE *
@@ -133,7 +154,7 @@ SignalLoggerManager::log(LogMode logMode
count == 0){
for (int number = 0; number < NO_OF_BLOCKS; ++number){
- cnt += log(SLM_ON, number, logMode);
+ cnt += log(SLM_ON, MIN_BLOCK_NO + number, logMode);
}
} else {
for (int i = 0; i < count; ++i){
@@ -220,15 +241,17 @@ SignalLoggerManager::executeDirect(const
if(outputStream != 0 &&
(traceId == 0 || traceId == trace) &&
(logMatch(senderBlockNo, LogOut) || logMatch(receiverBlockNo, LogIn))){
+ lock();
const char* inOutStr = prio == 0 ? "In" : "Out";
#ifdef VM_TRACE_TIME
- fprintf(outputStream, "---- Direct --- Signal --- %s - %d ----\n", inOutStr,
time(0));
+ fprintf(outputStream, "---- Direct --- Signal --- %s - %s ----\n", inOutStr,
mytime());
#else
fprintf(outputStream, "---- Direct --- Signal --- %s ----------------\n", inOutStr);
#endif
// XXX pass in/out to print* function somehow
printSignalHeader(outputStream, sh, 0, node, true);
printSignalData(outputStream, sh, theData);
+ unlock();
}
}
@@ -249,8 +272,9 @@ SignalLoggerManager::executeSignal(const
(traceId == 0 || traceId == trace) &&
(logMatch(receiverBlockNo, LogOut) ||
(m_logDistributed && m_ownNodeId != senderNode))){
+ lock();
#ifdef VM_TRACE_TIME
- fprintf(outputStream, "---- Received - Signal - %d ----\n", time(0));
+ fprintf(outputStream, "---- Received - Signal - %s ----\n", mytime());
#else
fprintf(outputStream, "---- Received - Signal ----------------\n");
#endif
@@ -259,6 +283,7 @@ SignalLoggerManager::executeSignal(const
printSignalData(outputStream, sh, theData);
for (unsigned i = 0; i < secs; i++)
printSegmentedSection(outputStream, sh, ptr, i);
+ unlock();
}
}
@@ -276,8 +301,9 @@ SignalLoggerManager::executeSignal(const
(traceId == 0 || traceId == trace) &&
(logMatch(receiverBlockNo, LogOut) ||
(m_logDistributed && m_ownNodeId != senderNode))){
+ lock();
#ifdef VM_TRACE_TIME
- fprintf(outputStream, "---- Received - Signal - %d ----\n", time(0));
+ fprintf(outputStream, "---- Received - Signal - %s ----\n", mytime());
#else
fprintf(outputStream, "---- Received - Signal ----------------\n");
#endif
@@ -286,6 +312,7 @@ SignalLoggerManager::executeSignal(const
printSignalData(outputStream, sh, theData);
for (unsigned i = 0; i < secs; i++)
printLinearSection(outputStream, sh, ptr, i);
+ unlock();
}
}
@@ -306,8 +333,9 @@ SignalLoggerManager::sendSignal(const Si
(traceId == 0 || traceId == trace) &&
(logMatch(senderBlockNo, LogOut) ||
(m_logDistributed && m_ownNodeId != node))){
+ lock();
#ifdef VM_TRACE_TIME
- fprintf(outputStream, "---- Send ----- Signal - %d ----\n", time(0));
+ fprintf(outputStream, "---- Send ----- Signal - %s ----\n", mytime());
#else
fprintf(outputStream, "---- Send ----- Signal ----------------\n");
#endif
@@ -316,6 +344,7 @@ SignalLoggerManager::sendSignal(const Si
printSignalData(outputStream, sh, theData);
for (unsigned i = 0; i < secs; i++)
printLinearSection(outputStream, sh, ptr, i);
+ unlock();
}
}
@@ -335,8 +364,9 @@ SignalLoggerManager::sendSignal(const Si
(traceId == 0 || traceId == trace) &&
(logMatch(senderBlockNo, LogOut) ||
(m_logDistributed && m_ownNodeId != node))){
+ lock();
#ifdef VM_TRACE_TIME
- fprintf(outputStream, "---- Send ----- Signal - %d ----\n", time(0));
+ fprintf(outputStream, "---- Send ----- Signal - %s ----\n", mytime());
#else
fprintf(outputStream, "---- Send ----- Signal ----------------\n");
#endif
@@ -345,6 +375,7 @@ SignalLoggerManager::sendSignal(const Si
printSignalData(outputStream, sh, theData);
for (unsigned i = 0; i < secs; i++)
printSegmentedSection(outputStream, sh, ptr, i);
+ unlock();
}
}
@@ -362,8 +393,9 @@ SignalLoggerManager::sendSignal(const Si
(traceId == 0 || traceId == trace) &&
(logMatch(senderBlockNo, LogOut) ||
(m_logDistributed && m_ownNodeId != node))){
+ lock();
#ifdef VM_TRACE_TIME
- fprintf(outputStream, "---- Send ----- Signal - %d ----\n", time(0));
+ fprintf(outputStream, "---- Send ----- Signal - %s ----\n", mytime());
#else
fprintf(outputStream, "---- Send ----- Signal ----------------\n");
#endif
@@ -372,6 +404,7 @@ SignalLoggerManager::sendSignal(const Si
printSignalData(outputStream, sh, theData);
for (unsigned i = 0; i < secs; i++)
printGenericSection(outputStream, sh, ptr, i);
+ unlock();
}
}
@@ -388,11 +421,12 @@ SignalLoggerManager::sendSignalWithDelay
if(outputStream != 0 &&
(traceId == 0 || traceId == trace) &&
logMatch(senderBlockNo, LogOut)){
+ lock();
#ifdef VM_TRACE_TIME
fprintf(outputStream,
- "---- Send ----- Signal (%d ms) %d\n",
+ "---- Send ----- Signal (%d ms) %s\n",
delayInMilliSeconds,
- time(0));
+ mytime());
#else
fprintf(outputStream, "---- Send delay Signal (%d ms) ----------\n",
delayInMilliSeconds);
@@ -402,6 +436,7 @@ SignalLoggerManager::sendSignalWithDelay
printSignalData(outputStream, sh, theData);
for (unsigned i = 0; i < secs; i++)
printSegmentedSection(outputStream, sh, ptr, i);
+ unlock();
}
}
@@ -417,15 +452,40 @@ SignalLoggerManager::log(BlockNumber bno
if(outputStream != 0 &&
logModes[bno2] != LogOff){
+ lock();
va_list ap;
va_start(ap, msg);
fprintf(outputStream, "%s: ", getBlockName(bno, "API"));
vfprintf(outputStream, msg, ap);
fprintf(outputStream, "\n");
va_end(ap);
+ unlock();
}
}
+static inline bool
+isSysBlock(BlockNumber block, Uint32 gsn)
+{
+ if (block != 0)
+ return false;
+ switch (gsn) {
+ case GSN_START_ORD:
+ return true; // first sig
+ case GSN_CONNECT_REP:
+ case GSN_DISCONNECT_REP:
+ case GSN_EVENT_REP:
+ return true; // transporter
+ case GSN_STOP_FOR_CRASH:
+ return true; // mt scheduler
+ }
+ return false;
+}
+
+static inline bool
+isApiBlock(BlockNumber block)
+{
+ return block >= 0x8000 || block == 4002 || block == 2047;
+}
void
SignalLoggerManager::printSignalHeader(FILE * output,
@@ -434,36 +494,79 @@ SignalLoggerManager::printSignalHeader(F
Uint32 node,
bool printReceiversSignalId)
{
- Uint32 receiverBlockNo = sh.theReceiversBlockNumber;
+ const char* const dummy_block_name = "UUNET";
+
+ bool receiverIsApi = isApiBlock(sh.theReceiversBlockNumber);
+ Uint32 receiverBlockNo;
+ Uint32 receiverInstanceNo;
+ if (!receiverIsApi) {
+ receiverBlockNo = blockToMain(sh.theReceiversBlockNumber);
+ receiverInstanceNo = blockToInstance(sh.theReceiversBlockNumber);
+ } else {
+ receiverBlockNo = sh.theReceiversBlockNumber;
+ receiverInstanceNo = 0;
+ }
Uint32 receiverProcessor = node;
+
Uint32 gsn = sh.theVerId_signalNumber;
- Uint32 senderBlockNo = refToBlock(sh.theSendersBlockRef);
- Uint32 senderProcessor = refToNode(sh.theSendersBlockRef);
+
+ Uint32 sbref = sh.theSendersBlockRef;
+ bool senderIsSys = isSysBlock(refToBlock(sbref), gsn);
+ bool senderIsApi = isApiBlock(refToBlock(sbref));
+ Uint32 senderBlockNo;
+ Uint32 senderInstanceNo;
+ if (!senderIsSys && !senderIsApi) {
+ senderBlockNo = refToMain(sbref);
+ senderInstanceNo = refToInstance(sbref);
+ } else {
+ senderBlockNo = refToBlock(sbref);
+ senderInstanceNo = 0;
+ }
+ Uint32 senderProcessor = refToNode(sbref);
+
Uint32 length = sh.theLength;
Uint32 trace = sh.theTrace;
Uint32 rSigId = sh.theSignalId;
Uint32 sSigId = sh.theSendersSignalId;
const char * signalName = getSignalName(gsn);
- const char * rBlockName = getBlockName(receiverBlockNo, "API");
- const char * sBlockName = getBlockName(senderBlockNo, "API");
-
- if(printReceiversSignalId)
+ const char * rBlockName =
+ receiverIsApi ? "API" :
+ getBlockName(receiverBlockNo, dummy_block_name);
+ const char * sBlockName =
+ senderIsSys ? "SYS" :
+ senderIsApi ? "API" :
+ getBlockName(senderBlockNo, dummy_block_name);
+
+ char rInstanceText[20];
+ char sInstanceText[20];
+ rInstanceText[0] = 0;
+ sInstanceText[0] = 0;
+ if (receiverInstanceNo != 0)
+ sprintf(rInstanceText, "/%u", (uint)receiverInstanceNo);
+ if (senderInstanceNo != 0)
+ sprintf(sInstanceText, "/%u", (uint)senderInstanceNo);
+// wl4391_todo senders instance missing
+// assert(gsn != GSN_LQHKEYREQ || receiverProcessor == senderProcessor ||
senderInstanceNo != 0);
+ if (printReceiversSignalId)
fprintf(output,
- "r.bn: %d \"%s\", r.proc: %d, r.sigId: %d gsn: %d \"%s\" prio: %d\n"
- ,receiverBlockNo, rBlockName, receiverProcessor, rSigId,
- gsn, signalName, prio);
+ "r.bn: %d%s \"%s\", r.proc: %d, r.sigId: %d gsn: %d \"%s\" prio: %d\n"
+ ,receiverBlockNo, rInstanceText, rBlockName, receiverProcessor,
+ rSigId, gsn, signalName, prio);
else
fprintf(output,
- "r.bn: %d \"%s\", r.proc: %d, gsn: %d \"%s\" prio: %d\n",
- receiverBlockNo, rBlockName, receiverProcessor, gsn,
- signalName, prio);
+ "r.bn: %d%s \"%s\", r.proc: %d, gsn: %d \"%s\" prio: %d\n",
+ receiverBlockNo, rInstanceText, rBlockName, receiverProcessor,
+ gsn, signalName, prio);
fprintf(output,
- "s.bn: %d \"%s\", s.proc: %d, s.sigId: %d length: %d trace: %d "
+ "s.bn: %d%s \"%s\", s.proc: %d, s.sigId: %d length: %d trace: %d "
"#sec: %d fragInf: %d\n",
- senderBlockNo, sBlockName, senderProcessor, sSigId, length, trace,
- sh.m_noOfSections, sh.m_fragmentInfo);
+ senderBlockNo, sInstanceText, sBlockName, senderProcessor,
+ sSigId, length, trace, sh.m_noOfSections, sh.m_fragmentInfo);
+
+ //assert(strcmp(rBlockName, dummy_block_name) != 0);
+ //assert(strcmp(sBlockName, dummy_block_name) != 0);
}
void
=== modified file 'storage/ndb/src/common/logger/Logger.cpp'
--- a/storage/ndb/src/common/logger/Logger.cpp 2006-12-23 19:20:40 +0000
+++ b/storage/ndb/src/common/logger/Logger.cpp 2008-07-29 13:49:59 +0000
@@ -222,6 +222,7 @@ Logger::addHandler(const BaseString &log
*err= handler->getErrorCode();
if(handler->getErrorStr())
strncpy(errStr, handler->getErrorStr(), len);
+ delete handler;
DBUG_RETURN(false);
}
loghandlers.push_back(handler);
@@ -240,6 +241,13 @@ Logger::removeHandler(LogHandler* pHandl
int rc = false;
if (pHandler != NULL)
{
+ if (pHandler == m_pConsoleHandler)
+ m_pConsoleHandler= NULL;
+ if (pHandler == m_pFileHandler)
+ m_pFileHandler= NULL;
+ if (pHandler == m_pSyslogHandler)
+ m_pSyslogHandler= NULL;
+
rc = m_pHandlerList->remove(pHandler);
}
@@ -251,6 +259,10 @@ Logger::removeAllHandlers()
{
Guard g(m_mutex);
m_pHandlerList->removeAll();
+
+ m_pConsoleHandler= NULL;
+ m_pFileHandler= NULL;
+ m_pSyslogHandler= NULL;
}
bool
=== modified file 'storage/ndb/src/common/mgmcommon/ConfigRetriever.cpp'
--- a/storage/ndb/src/common/mgmcommon/ConfigRetriever.cpp 2007-03-22 11:35:31 +0000
+++ b/storage/ndb/src/common/mgmcommon/ConfigRetriever.cpp 2008-06-10 20:06:47 +0000
@@ -24,7 +24,6 @@
#include <NdbTCP.h>
#include <NdbEnv.h>
-#include "MgmtErrorReporter.hpp"
#include <uucode.h>
#include <Properties.hpp>
@@ -89,9 +88,11 @@ ConfigRetriever::~ConfigRetriever()
{
DBUG_ENTER("ConfigRetriever::~ConfigRetriever");
if (m_handle) {
- if(m_end_session)
- ndb_mgm_end_session(m_handle);
- ndb_mgm_disconnect(m_handle);
+ if (ndb_mgm_is_connected(m_handle)) {
+ if(m_end_session)
+ ndb_mgm_end_session(m_handle);
+ ndb_mgm_disconnect(m_handle);
+ }
ndb_mgm_destroy_handle(&m_handle);
}
DBUG_VOID_RETURN;
@@ -160,15 +161,15 @@ ConfigRetriever::getConfig() {
}
ndb_mgm_configuration *
-ConfigRetriever::getConfig(NdbMgmHandle m_handle_arg)
+ConfigRetriever::getConfig(NdbMgmHandle mgm_handle)
{
- ndb_mgm_configuration * conf = ndb_mgm_get_configuration(m_handle_arg,
+ ndb_mgm_configuration * conf = ndb_mgm_get_configuration(mgm_handle,
m_version);
if(conf == 0)
{
- BaseString tmp(ndb_mgm_get_latest_error_msg(m_handle_arg));
+ BaseString tmp(ndb_mgm_get_latest_error_msg(mgm_handle));
tmp.append(" : ");
- tmp.append(ndb_mgm_get_latest_error_desc(m_handle_arg));
+ tmp.append(ndb_mgm_get_latest_error_desc(mgm_handle));
setError(CR_ERROR, tmp.c_str());
return 0;
}
=== modified file 'storage/ndb/src/common/transporter/TransporterRegistry.cpp'
--- a/storage/ndb/src/common/transporter/TransporterRegistry.cpp 2008-06-17 20:28:45 +0000
+++ b/storage/ndb/src/common/transporter/TransporterRegistry.cpp 2008-07-22 13:42:54 +0000
@@ -266,12 +266,11 @@ TransporterRegistry::connect_server(NDB_
{
DBUG_ENTER("TransporterRegistry::connect_server");
- // read node id from client
- // read transporter type
+ // read node id and transporter type from client
int nodeId, remote_transporter_type= -1;
SocketInputStream s_input(sockfd);
- char buf[256];
- if (s_input.gets(buf, 256) == 0) {
+ char buf[11+1+11+1]; // <int> <int>
+ if (s_input.gets(buf, sizeof(buf)) == 0) {
DBUG_PRINT("error", ("Could not get node id from client"));
DBUG_RETURN(false);
}
=== modified file 'storage/ndb/src/kernel/SimBlockList.cpp'
--- a/storage/ndb/src/kernel/SimBlockList.cpp 2006-12-23 19:20:40 +0000
+++ b/storage/ndb/src/kernel/SimBlockList.cpp 2008-07-26 05:13:40 +0000
@@ -36,6 +36,13 @@
#include <pgman.hpp>
#include <restore.hpp>
#include <NdbEnv.h>
+#include <LocalProxy.hpp>
+#include <DblqhProxy.hpp>
+#include <DbaccProxy.hpp>
+#include <DbtupProxy.hpp>
+#include <DbtuxProxy.hpp>
+#include <BackupProxy.hpp>
+#include <RestoreProxy.hpp>
#ifndef VM_TRACE
#define NEW_BLOCK(B) new B
@@ -68,6 +75,9 @@ void * operator new (size_t sz, SIMBLOCK
#define NEW_BLOCK(B) new(A_VALUE) B
#endif
+extern bool g_ndbMt;
+extern bool g_ndbMtLqh;
+
void
SimBlockList::load(EmulatorData& data){
noOfBlocks = NO_OF_BLOCKS;
@@ -91,26 +101,44 @@ SimBlockList::load(EmulatorData& data){
fs = NEW_BLOCK(Ndbfs)(ctx);
}
}
-
+
theList[0] = pg = NEW_BLOCK(Pgman)(ctx);
theList[1] = lg = NEW_BLOCK(Lgman)(ctx);
theList[2] = ts = NEW_BLOCK(Tsman)(ctx, pg, lg);
- theList[3] = NEW_BLOCK(Dbacc)(ctx);
+ if (!g_ndbMtLqh)
+ theList[3] = NEW_BLOCK(Dbacc)(ctx);
+ else
+ theList[3] = NEW_BLOCK(DbaccProxy)(ctx);
theList[4] = NEW_BLOCK(Cmvmi)(ctx);
theList[5] = fs;
theList[6] = dbdict = NEW_BLOCK(Dbdict)(ctx);
theList[7] = dbdih = NEW_BLOCK(Dbdih)(ctx);
- theList[8] = NEW_BLOCK(Dblqh)(ctx);
+ if (!g_ndbMtLqh)
+ theList[8] = NEW_BLOCK(Dblqh)(ctx);
+ else
+ theList[8] = NEW_BLOCK(DblqhProxy)(ctx);
theList[9] = NEW_BLOCK(Dbtc)(ctx);
- theList[10] = NEW_BLOCK(Dbtup)(ctx, pg);
+ if (!g_ndbMtLqh)
+ theList[10] = NEW_BLOCK(Dbtup)(ctx, pg);
+ else
+ theList[10] = NEW_BLOCK(DbtupProxy)(ctx);//wl4391_todo pg
theList[11] = NEW_BLOCK(Ndbcntr)(ctx);
theList[12] = NEW_BLOCK(Qmgr)(ctx);
theList[13] = NEW_BLOCK(Trix)(ctx);
- theList[14] = NEW_BLOCK(Backup)(ctx);
+ if (!g_ndbMtLqh)
+ theList[14] = NEW_BLOCK(Backup)(ctx);
+ else
+ theList[14] = NEW_BLOCK(BackupProxy)(ctx);
theList[15] = NEW_BLOCK(DbUtil)(ctx);
theList[16] = NEW_BLOCK(Suma)(ctx);
- theList[17] = NEW_BLOCK(Dbtux)(ctx);
- theList[18] = NEW_BLOCK(Restore)(ctx);
+ if (!g_ndbMtLqh)
+ theList[17] = NEW_BLOCK(Dbtux)(ctx);
+ else
+ theList[17] = NEW_BLOCK(DbtuxProxy)(ctx);
+ if (!g_ndbMtLqh)
+ theList[18] = NEW_BLOCK(Restore)(ctx);
+ else
+ theList[18] = NEW_BLOCK(RestoreProxy)(ctx);
assert(NO_OF_BLOCKS == 19);
}
=== added file 'storage/ndb/src/kernel/blocks/LocalProxy.cpp'
--- a/storage/ndb/src/kernel/blocks/LocalProxy.cpp 1970-01-01 00:00:00 +0000
+++ b/storage/ndb/src/kernel/blocks/LocalProxy.cpp 2008-07-26 05:13:40 +0000
@@ -0,0 +1,121 @@
+/* Copyright (C) 2003 MySQL AB
+
+ This program is free software; you can redistribute it and/or modify
+ it under the terms of the GNU General Public License as published by
+ the Free Software Foundation; version 2 of the License.
+
+ This program is distributed in the hope that it will be useful,
+ but WITHOUT ANY WARRANTY; without even the implied warranty of
+ MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ GNU General Public License for more details.
+
+ You should have received a copy of the GNU General Public License
+ along with this program; if not, write to the Free Software
+ Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA */
+
+#include <mt.hpp>
+#include "LocalProxy.hpp"
+
+LocalProxy::LocalProxy(BlockNumber blockNumber, Block_context& ctx) :
+ SimulatedBlock(blockNumber, ctx)
+{
+ BLOCK_CONSTRUCTOR(LocalProxy);
+
+ ndbrequire(instance() == 0); // this is main block
+ c_workers = 0;
+ c_threads = 0;
+ Uint32 i;
+ for (i = 0; i < MaxWorkers; i++)
+ c_worker[i] = 0;
+
+ // GSN_READ_CONFIG_REQ
+ addRecSignal(GSN_READ_CONFIG_REQ, &LocalProxy::execREAD_CONFIG_REQ, true);
+ addRecSignal(GSN_READ_CONFIG_CONF, &LocalProxy::execREAD_CONFIG_CONF, true);
+}
+
+LocalProxy::~LocalProxy()
+{
+ // dtor of main block deletes workers
+}
+
+// GSN_READ_CONFIG_REQ
+
+void
+LocalProxy::execREAD_CONFIG_REQ(Signal* signal)
+{
+ Ss_READ_CONFIG_REQ& ss = c_ss_READ_CONFIG_REQ;
+ ndbrequire(!ss.m_active);
+ ss.m_active = true;
+
+ const ReadConfigReq* req = (const ReadConfigReq*)signal->getDataPtr();
+ ss.m_readConfigReq = *req;
+ ndbrequire(ss.m_readConfigReq.noOfParameters == 0);
+
+ const Uint32 workers = globalData.ndbmtWorkers;
+ const Uint32 threads = globalData.ndbmtThreads;
+
+ Uint32 i;
+ for (i = 0; i < workers; i++) {
+ const Uint32 instanceNo = 1 + i;
+ SimulatedBlock* worker = newWorker(instanceNo);
+ ndbrequire(worker->instance() == instanceNo);
+ ndbrequire(this->getInstance(instanceNo) == worker);
+ c_worker[i] = worker;
+
+ add_worker_thr_map(number(), instanceNo);
+ }
+
+ // set after instances are created (sendpacked)
+ c_workers = workers;
+ c_threads = threads;
+
+ // run sequentially due to big mallocs and initializations
+ sendREAD_CONFIG_REQ(signal, 0);
+}
+
+void
+LocalProxy::sendREAD_CONFIG_REQ(Signal* signal, Uint32 i)
+{
+ Ss_READ_CONFIG_REQ& ss = c_ss_READ_CONFIG_REQ;
+
+ ReadConfigReq* req = (ReadConfigReq*)signal->getDataPtrSend();
+ req->senderRef = reference();
+ req->senderData = i;
+ req->noOfParameters = 0;
+ sendSignal(workerRef(i), GSN_READ_CONFIG_REQ,
+ signal, ReadConfigReq::SignalLength, JBB);
+ // for verification only
+ ss.m_worker = i;
+}
+
+void
+LocalProxy::execREAD_CONFIG_CONF(Signal* signal)
+{
+ Ss_READ_CONFIG_REQ& ss = c_ss_READ_CONFIG_REQ;
+ ndbrequire(ss.m_active);
+
+ const ReadConfigConf* conf = (const ReadConfigConf*)signal->getDataPtr();
+ ndbrequire(ss.m_worker == conf->senderData);
+ if (ss.m_worker + 1 < c_workers) {
+ jam();
+ sendREAD_CONFIG_REQ(signal, ss.m_worker + 1);
+ return;
+ }
+
+ sendREAD_CONFIG_CONF(signal);
+ ss.m_active = false;
+}
+
+void
+LocalProxy::sendREAD_CONFIG_CONF(Signal* signal)
+{
+ Ss_READ_CONFIG_REQ& ss = c_ss_READ_CONFIG_REQ;
+
+ ReadConfigConf* conf = (ReadConfigConf*)signal->getDataPtrSend();
+ conf->senderRef = reference();
+ conf->senderData = ss.m_readConfigReq.senderData;
+ sendSignal(ss.m_readConfigReq.senderRef, GSN_READ_CONFIG_CONF,
+ signal, ReadConfigConf::SignalLength, JBB);
+}
+
+BLOCK_FUNCTIONS(LocalProxy)
=== added file 'storage/ndb/src/kernel/blocks/LocalProxy.hpp'
--- a/storage/ndb/src/kernel/blocks/LocalProxy.hpp 1970-01-01 00:00:00 +0000
+++ b/storage/ndb/src/kernel/blocks/LocalProxy.hpp 2008-07-26 05:13:40 +0000
@@ -0,0 +1,69 @@
+/* Copyright (C) 2003 MySQL AB
+
+ This program is free software; you can redistribute it and/or modify
+ it under the terms of the GNU General Public License as published by
+ the Free Software Foundation; version 2 of the License.
+
+ This program is distributed in the hope that it will be useful,
+ but WITHOUT ANY WARRANTY; without even the implied warranty of
+ MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ GNU General Public License for more details.
+
+ You should have received a copy of the GNU General Public License
+ along with this program; if not, write to the Free Software
+ Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA */
+
+#ifndef NDB_LOCAL_PROXY_HPP
+#define NDB_LOCAL_PROXY_HPP
+
+#include <pc.hpp>
+#include <SimulatedBlock.hpp>
+#include <signaldata/ReadConfig.hpp>
+
+/*
+ * Proxy blocks for MT LQH.
+ *
+ * The LQH proxy is the LQH block seen by other nodes and blocks,
+ * unless by-passed for efficiency. Real LQH instances (workers)
+ * run behind it.
+ *
+ * There are also ACC,TUP,TUX,BACKUP,RESTORE proxies and workers.
+ * All proxy classes are subclasses of LocalProxy.
+ */
+
+class LocalProxy : public SimulatedBlock {
+public:
+ LocalProxy(BlockNumber blockNumber, Block_context& ctx);
+ virtual ~LocalProxy();
+ BLOCK_DEFINES(LocalProxy);
+
+protected:
+ enum { MaxWorkers = MAX_NDBMT_WORKERS };
+ Uint32 c_workers;
+ Uint32 c_threads;
+ SimulatedBlock* c_worker[MaxWorkers];
+
+ virtual SimulatedBlock* newWorker(Uint32 instanceNo) = 0;
+
+ // worker index to worker ref
+ BlockReference workerRef(Uint32 i) {
+ return numberToRef(number(), 1 + i, getOwnNodeId());
+ }
+
+ // GSN_READ_CONFIG_REQ
+ struct Ss_READ_CONFIG_REQ {
+ bool m_active;
+ Uint32 m_worker;
+ ReadConfigReq m_readConfigReq;
+ Ss_READ_CONFIG_REQ() :
+ m_active(false)
+ {}
+ };
+ Ss_READ_CONFIG_REQ c_ss_READ_CONFIG_REQ;
+ void execREAD_CONFIG_REQ(Signal*);
+ void sendREAD_CONFIG_REQ(Signal*, Uint32 i);
+ void execREAD_CONFIG_CONF(Signal*);
+ void sendREAD_CONFIG_CONF(Signal*);
+};
+
+#endif
=== modified file 'storage/ndb/src/kernel/blocks/Makefile.am'
--- a/storage/ndb/src/kernel/blocks/Makefile.am 2008-04-25 16:35:50 +0000
+++ b/storage/ndb/src/kernel/blocks/Makefile.am 2008-07-26 05:13:40 +0000
@@ -54,7 +54,14 @@ libblocks_a_SOURCES = tsman.cpp lgman.cp
dbtux/DbtuxGen.cpp dbtux/DbtuxMeta.cpp dbtux/DbtuxMaint.cpp \
dbtux/DbtuxNode.cpp dbtux/DbtuxTree.cpp dbtux/DbtuxScan.cpp \
dbtux/DbtuxSearch.cpp dbtux/DbtuxCmp.cpp dbtux/DbtuxStat.cpp \
- dbtux/DbtuxDebug.cpp
+ dbtux/DbtuxDebug.cpp \
+ LocalProxy.cpp \
+ dblqh/DblqhProxy.cpp \
+ dbacc/DbaccProxy.cpp \
+ dbtup/DbtupProxy.cpp \
+ dbtux/DbtuxProxy.cpp \
+ backup/BackupProxy.cpp \
+ RestoreProxy.cpp
EXTRA_PROGRAMS = ndb_print_file
ndb_print_file_SOURCES = print_file.cpp diskpage.cpp dbtup/tuppage.cpp
=== added file 'storage/ndb/src/kernel/blocks/RestoreProxy.cpp'
--- a/storage/ndb/src/kernel/blocks/RestoreProxy.cpp 1970-01-01 00:00:00 +0000
+++ b/storage/ndb/src/kernel/blocks/RestoreProxy.cpp 2008-07-26 05:13:40 +0000
@@ -0,0 +1,34 @@
+/* Copyright (C) 2003 MySQL AB
+
+ This program is free software; you can redistribute it and/or modify
+ it under the terms of the GNU General Public License as published by
+ the Free Software Foundation; version 2 of the License.
+
+ This program is distributed in the hope that it will be useful,
+ but WITHOUT ANY WARRANTY; without even the implied warranty of
+ MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ GNU General Public License for more details.
+
+ You should have received a copy of the GNU General Public License
+ along with this program; if not, write to the Free Software
+ Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA */
+
+#include "RestoreProxy.hpp"
+#include "restore.hpp"
+
+RestoreProxy::RestoreProxy(Block_context& ctx) :
+ LocalProxy(RESTORE, ctx)
+{
+}
+
+RestoreProxy::~RestoreProxy()
+{
+}
+
+SimulatedBlock*
+RestoreProxy::newWorker(Uint32 instanceNo)
+{
+ return new Restore(m_ctx, instanceNo);
+}
+
+BLOCK_FUNCTIONS(RestoreProxy)
=== added file 'storage/ndb/src/kernel/blocks/RestoreProxy.hpp'
--- a/storage/ndb/src/kernel/blocks/RestoreProxy.hpp 1970-01-01 00:00:00 +0000
+++ b/storage/ndb/src/kernel/blocks/RestoreProxy.hpp 2008-07-26 05:13:40 +0000
@@ -0,0 +1,31 @@
+/* Copyright (C) 2003 MySQL AB
+
+ This program is free software; you can redistribute it and/or modify
+ it under the terms of the GNU General Public License as published by
+ the Free Software Foundation; version 2 of the License.
+
+ This program is distributed in the hope that it will be useful,
+ but WITHOUT ANY WARRANTY; without even the implied warranty of
+ MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ GNU General Public License for more details.
+
+ You should have received a copy of the GNU General Public License
+ along with this program; if not, write to the Free Software
+ Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA */
+
+#ifndef NDB_RESTORE_PROXY_HPP
+#define NDB_RESTORE_PROXY_HPP
+
+#include <LocalProxy.hpp>
+
+class RestoreProxy : public LocalProxy {
+public:
+ RestoreProxy(Block_context& ctx);
+ virtual ~RestoreProxy();
+ BLOCK_DEFINES(RestoreProxy);
+
+protected:
+ virtual SimulatedBlock* newWorker(Uint32 instanceNo);
+};
+
+#endif
=== modified file 'storage/ndb/src/kernel/blocks/backup/Backup.hpp'
--- a/storage/ndb/src/kernel/blocks/backup/Backup.hpp 2008-06-05 20:31:21 +0000
+++ b/storage/ndb/src/kernel/blocks/backup/Backup.hpp 2008-07-26 05:13:40 +0000
@@ -40,8 +40,10 @@
*/
class Backup : public SimulatedBlock
{
+ friend class BackupProxy;
+
public:
- Backup(Block_context& ctx);
+ Backup(Block_context& ctx, Uint32 instanceNumber = 0);
virtual ~Backup();
BLOCK_DEFINES(Backup);
=== modified file 'storage/ndb/src/kernel/blocks/backup/BackupInit.cpp'
--- a/storage/ndb/src/kernel/blocks/backup/BackupInit.cpp 2008-06-05 20:31:21 +0000
+++ b/storage/ndb/src/kernel/blocks/backup/BackupInit.cpp 2008-07-26 05:13:40 +0000
@@ -26,8 +26,8 @@
//extern const unsigned Ndbcntr::g_sysTableCount;
-Backup::Backup(Block_context& ctx) :
- SimulatedBlock(BACKUP, ctx),
+Backup::Backup(Block_context& ctx, Uint32 instanceNumber) :
+ SimulatedBlock(BACKUP, ctx, instanceNumber),
c_nodes(c_nodePool),
c_backups(c_backupPool)
{
=== added file 'storage/ndb/src/kernel/blocks/backup/BackupProxy.cpp'
--- a/storage/ndb/src/kernel/blocks/backup/BackupProxy.cpp 1970-01-01 00:00:00 +0000
+++ b/storage/ndb/src/kernel/blocks/backup/BackupProxy.cpp 2008-07-26 05:13:40 +0000
@@ -0,0 +1,34 @@
+/* Copyright (C) 2003 MySQL AB
+
+ This program is free software; you can redistribute it and/or modify
+ it under the terms of the GNU General Public License as published by
+ the Free Software Foundation; version 2 of the License.
+
+ This program is distributed in the hope that it will be useful,
+ but WITHOUT ANY WARRANTY; without even the implied warranty of
+ MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ GNU General Public License for more details.
+
+ You should have received a copy of the GNU General Public License
+ along with this program; if not, write to the Free Software
+ Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA */
+
+#include "BackupProxy.hpp"
+#include "Backup.hpp"
+
+BackupProxy::BackupProxy(Block_context& ctx) :
+ LocalProxy(BACKUP, ctx)
+{
+}
+
+BackupProxy::~BackupProxy()
+{
+}
+
+SimulatedBlock*
+BackupProxy::newWorker(Uint32 instanceNo)
+{
+ return new Backup(m_ctx, instanceNo);
+}
+
+BLOCK_FUNCTIONS(BackupProxy)
=== added file 'storage/ndb/src/kernel/blocks/backup/BackupProxy.hpp'
--- a/storage/ndb/src/kernel/blocks/backup/BackupProxy.hpp 1970-01-01 00:00:00 +0000
+++ b/storage/ndb/src/kernel/blocks/backup/BackupProxy.hpp 2008-07-26 05:13:40 +0000
@@ -0,0 +1,31 @@
+/* Copyright (C) 2003 MySQL AB
+
+ This program is free software; you can redistribute it and/or modify
+ it under the terms of the GNU General Public License as published by
+ the Free Software Foundation; version 2 of the License.
+
+ This program is distributed in the hope that it will be useful,
+ but WITHOUT ANY WARRANTY; without even the implied warranty of
+ MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ GNU General Public License for more details.
+
+ You should have received a copy of the GNU General Public License
+ along with this program; if not, write to the Free Software
+ Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA */
+
+#ifndef NDB_BACKUP_PROXY_HPP
+#define NDB_BACKUP_PROXY_HPP
+
+#include <LocalProxy.hpp>
+
+class BackupProxy : public LocalProxy {
+public:
+ BackupProxy(Block_context& ctx);
+ virtual ~BackupProxy();
+ BLOCK_DEFINES(BackupProxy);
+
+protected:
+ virtual SimulatedBlock* newWorker(Uint32 instanceNo);
+};
+
+#endif
=== modified file 'storage/ndb/src/kernel/blocks/dbacc/Dbacc.hpp'
--- a/storage/ndb/src/kernel/blocks/dbacc/Dbacc.hpp 2008-06-05 20:19:01 +0000
+++ b/storage/ndb/src/kernel/blocks/dbacc/Dbacc.hpp 2008-07-26 05:13:40 +0000
@@ -268,6 +268,8 @@ ElementHeader::clearScanBit(Uint32 heade
class Dbacc: public SimulatedBlock {
+ friend class DbaccProxy;
+
public:
// State values
enum State {
@@ -625,7 +627,7 @@ struct Tabrec {
typedef Ptr<Tabrec> TabrecPtr;
public:
- Dbacc(Block_context&);
+ Dbacc(Block_context&, Uint32 instanceNumber = 0);
virtual ~Dbacc();
// pointer to TUP instance in this thread
=== modified file 'storage/ndb/src/kernel/blocks/dbacc/DbaccInit.cpp'
--- a/storage/ndb/src/kernel/blocks/dbacc/DbaccInit.cpp 2008-06-05 20:19:01 +0000
+++ b/storage/ndb/src/kernel/blocks/dbacc/DbaccInit.cpp 2008-07-26 05:13:40 +0000
@@ -86,8 +86,8 @@ void Dbacc::initRecords()
ctablesize);
}//Dbacc::initRecords()
-Dbacc::Dbacc(Block_context& ctx):
- SimulatedBlock(DBACC, ctx),
+Dbacc::Dbacc(Block_context& ctx, Uint32 instanceNumber):
+ SimulatedBlock(DBACC, ctx, instanceNumber),
c_tup(0)
{
BLOCK_CONSTRUCTOR(Dbacc);
=== added file 'storage/ndb/src/kernel/blocks/dbacc/DbaccProxy.cpp'
--- a/storage/ndb/src/kernel/blocks/dbacc/DbaccProxy.cpp 1970-01-01 00:00:00 +0000
+++ b/storage/ndb/src/kernel/blocks/dbacc/DbaccProxy.cpp 2008-07-26 05:13:40 +0000
@@ -0,0 +1,34 @@
+/* Copyright (C) 2003 MySQL AB
+
+ This program is free software; you can redistribute it and/or modify
+ it under the terms of the GNU General Public License as published by
+ the Free Software Foundation; version 2 of the License.
+
+ This program is distributed in the hope that it will be useful,
+ but WITHOUT ANY WARRANTY; without even the implied warranty of
+ MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ GNU General Public License for more details.
+
+ You should have received a copy of the GNU General Public License
+ along with this program; if not, write to the Free Software
+ Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA */
+
+#include "DbaccProxy.hpp"
+#include "Dbacc.hpp"
+
+DbaccProxy::DbaccProxy(Block_context& ctx) :
+ LocalProxy(DBACC, ctx)
+{
+}
+
+DbaccProxy::~DbaccProxy()
+{
+}
+
+SimulatedBlock*
+DbaccProxy::newWorker(Uint32 instanceNo)
+{
+ return new Dbacc(m_ctx, instanceNo);
+}
+
+BLOCK_FUNCTIONS(DbaccProxy)
=== added file 'storage/ndb/src/kernel/blocks/dbacc/DbaccProxy.hpp'
--- a/storage/ndb/src/kernel/blocks/dbacc/DbaccProxy.hpp 1970-01-01 00:00:00 +0000
+++ b/storage/ndb/src/kernel/blocks/dbacc/DbaccProxy.hpp 2008-07-26 05:13:40 +0000
@@ -0,0 +1,31 @@
+/* Copyright (C) 2003 MySQL AB
+
+ This program is free software; you can redistribute it and/or modify
+ it under the terms of the GNU General Public License as published by
+ the Free Software Foundation; version 2 of the License.
+
+ This program is distributed in the hope that it will be useful,
+ but WITHOUT ANY WARRANTY; without even the implied warranty of
+ MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ GNU General Public License for more details.
+
+ You should have received a copy of the GNU General Public License
+ along with this program; if not, write to the Free Software
+ Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA */
+
+#ifndef NDB_DBACC_PROXY_HPP
+#define NDB_DBACC_PROXY_HPP
+
+#include <LocalProxy.hpp>
+
+class DbaccProxy : public LocalProxy {
+public:
+ DbaccProxy(Block_context& ctx);
+ virtual ~DbaccProxy();
+ BLOCK_DEFINES(DbaccProxy);
+
+protected:
+ virtual SimulatedBlock* newWorker(Uint32 instanceNo);
+};
+
+#endif
=== modified file 'storage/ndb/src/kernel/blocks/dbdict/Dbdict.cpp'
--- a/storage/ndb/src/kernel/blocks/dbdict/Dbdict.cpp 2008-07-01 12:35:34 +0000
+++ b/storage/ndb/src/kernel/blocks/dbdict/Dbdict.cpp 2008-07-25 05:48:32 +0000
@@ -1642,9 +1642,6 @@ Dbdict::Dbdict(Block_context& ctx):
c_opDropEvent(c_opRecordPool),
c_opSignalUtil(c_opRecordPool),
c_opRecordSequence(0)
-#ifdef VM_TRACE
- ,debugOut(*new NullOutputStream())
-#endif
{
BLOCK_CONSTRUCTOR(Dbdict);
@@ -2394,25 +2391,8 @@ void Dbdict::execREAD_CONFIG_REQ(Signal*
new (ptr.p) DictObject();
objs.release();
}
-
-#ifdef VM_TRACE
- {
- FILE* debugFile = globalSignalLoggers.getOutputStream();
- if (debugFile != NULL)
- debugOut = *new NdbOut(*new FileOutputStream(debugFile));
- }
-#endif
}//execSIZEALT_REP()
-#ifdef VM_TRACE
-bool
-Dbdict::debugOutOn() const
-{
- SignalLoggerManager::LogMode mask = SignalLoggerManager::LogInOut;
- return globalSignalLoggers.logMatch(DBDICT, mask);
-}
-#endif
-
/* ---------------------------------------------------------------- */
// Start phase signals sent by CNTR. We reply with NDB_STTORRY when
// we completed this phase.
=== modified file 'storage/ndb/src/kernel/blocks/dbdict/Dbdict.hpp'
--- a/storage/ndb/src/kernel/blocks/dbdict/Dbdict.hpp 2008-06-05 20:34:20 +0000
+++ b/storage/ndb/src/kernel/blocks/dbdict/Dbdict.hpp 2008-07-25 05:48:32 +0000
@@ -74,20 +74,6 @@
#include <LockQueue.hpp>
#include <signaldata/CopyData.hpp>
-// Debug Macros
-
-#ifdef VM_TRACE
-#define D(x) \
- do { \
- if (!debugOutOn()) break; \
- debugOut << "DBDICT:" << __LINE__ << " " << x << endl;
\
- } while (0)
-#define V(x) " " << #x << ":" << (x)
-#else
-#define D(x)
-#undef V
-#endif
-
#ifdef DBDICT_C
/*--------------------------------------------------------------*/
@@ -3422,8 +3408,6 @@ public:
int checkSingleUserMode(Uint32 senderRef);
#ifdef VM_TRACE
- NdbOut debugOut;
- bool debugOutOn() const;
friend NdbOut& operator<<(NdbOut& out, const DictObject&);
friend NdbOut& operator<<(NdbOut& out, const ErrorInfo&);
friend NdbOut& operator<<(NdbOut& out, const SchemaOp&);
=== modified file 'storage/ndb/src/kernel/blocks/dbdih/Dbdih.hpp'
--- a/storage/ndb/src/kernel/blocks/dbdih/Dbdih.hpp 2008-06-05 20:31:21 +0000
+++ b/storage/ndb/src/kernel/blocks/dbdih/Dbdih.hpp 2008-07-25 05:48:32 +0000
@@ -1773,6 +1773,16 @@ private:
bool c_sr_wait_to;
NdbNodeBitmask m_sr_nodes;
NdbNodeBitmask m_to_nodes;
+
+ // block instances
+public:
+ Uint32 dihGetInstanceKey(FragmentstorePtr tFragPtr) {
+ ndbrequire(!tFragPtr.isNull());
+ Uint32 log_part_id = tFragPtr.p->m_log_part_id;
+ Uint32 instanceKey = 1 + log_part_id % MAX_NDBMT_WORKERS;
+ return instanceKey;
+ }
+ Uint32 dihGetInstanceKey(Uint32 tabId, Uint32 fragId);
};
#if (DIH_CDATA_SIZE < _SYSFILE_SIZE32)
=== modified file 'storage/ndb/src/kernel/blocks/dbdih/DbdihMain.cpp'
--- a/storage/ndb/src/kernel/blocks/dbdih/DbdihMain.cpp 2008-06-09 14:32:01 +0000
+++ b/storage/ndb/src/kernel/blocks/dbdih/DbdihMain.cpp 2008-07-25 05:48:32 +0000
@@ -16871,3 +16871,16 @@ do_send:
}
#endif
+
+// block instances
+Uint32
+Dbdih::dihGetInstanceKey(Uint32 tabId, Uint32 fragId)
+{
+ TabRecordPtr tTabPtr;
+ tTabPtr.i = tabId;
+ ptrCheckGuard(tTabPtr, ctabFileSize, tabRecord);
+ FragmentstorePtr tFragPtr;
+ getFragstore(tTabPtr.p, fragId, tFragPtr);
+ Uint32 instanceKey = dihGetInstanceKey(tFragPtr);
+ return instanceKey;
+}
=== modified file 'storage/ndb/src/kernel/blocks/dblqh/Dblqh.hpp'
--- a/storage/ndb/src/kernel/blocks/dblqh/Dblqh.hpp 2008-06-05 20:34:20 +0000
+++ b/storage/ndb/src/kernel/blocks/dblqh/Dblqh.hpp 2008-07-26 05:13:40 +0000
@@ -400,6 +400,8 @@ class Dbtup;
* - LOG
*/
class Dblqh: public SimulatedBlock {
+ friend class DblqhProxy;
+
public:
enum LcpCloseState {
LCP_IDLE = 0,
@@ -2047,7 +2049,7 @@ public:
};
public:
- Dblqh(Block_context& ctx);
+ Dblqh(Block_context& ctx, Uint32 instanceNumber = 0);
virtual ~Dblqh();
void receive_keyinfo(Signal*, Uint32 * data, Uint32 len);
=== modified file 'storage/ndb/src/kernel/blocks/dblqh/DblqhInit.cpp'
--- a/storage/ndb/src/kernel/blocks/dblqh/DblqhInit.cpp 2008-06-05 20:19:01 +0000
+++ b/storage/ndb/src/kernel/blocks/dblqh/DblqhInit.cpp 2008-07-26 05:13:40 +0000
@@ -165,8 +165,8 @@ void Dblqh::initRecords()
bat[1].bits.v = 5;
}//Dblqh::initRecords()
-Dblqh::Dblqh(Block_context& ctx):
- SimulatedBlock(DBLQH, ctx),
+Dblqh::Dblqh(Block_context& ctx, Uint32 instanceNumber):
+ SimulatedBlock(DBLQH, ctx, instanceNumber),
c_lcp_waiting_fragments(c_fragment_pool),
c_lcp_restoring_fragments(c_fragment_pool),
c_lcp_complete_fragments(c_fragment_pool),
=== modified file 'storage/ndb/src/kernel/blocks/dblqh/DblqhMain.cpp'
--- a/storage/ndb/src/kernel/blocks/dblqh/DblqhMain.cpp 2008-06-14 07:33:14 +0000
+++ b/storage/ndb/src/kernel/blocks/dblqh/DblqhMain.cpp 2008-07-23 11:36:53 +0000
@@ -9091,7 +9091,7 @@ Dblqh::copy_bounds(Uint32 * dst, TcConne
if(len < left)
{
- offset = len;
+ offset = tcPtrP->m_offset_current_keybuf + len;
}
else
{
=== added file 'storage/ndb/src/kernel/blocks/dblqh/DblqhProxy.cpp'
--- a/storage/ndb/src/kernel/blocks/dblqh/DblqhProxy.cpp 1970-01-01 00:00:00 +0000
+++ b/storage/ndb/src/kernel/blocks/dblqh/DblqhProxy.cpp 2008-07-26 05:13:40 +0000
@@ -0,0 +1,49 @@
+/* Copyright (C) 2003 MySQL AB
+
+ This program is free software; you can redistribute it and/or modify
+ it under the terms of the GNU General Public License as published by
+ the Free Software Foundation; version 2 of the License.
+
+ This program is distributed in the hope that it will be useful,
+ but WITHOUT ANY WARRANTY; without even the implied warranty of
+ MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ GNU General Public License for more details.
+
+ You should have received a copy of the GNU General Public License
+ along with this program; if not, write to the Free Software
+ Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA */
+
+#include "DblqhProxy.hpp"
+#include "Dblqh.hpp"
+
+DblqhProxy::DblqhProxy(Block_context& ctx) :
+ LocalProxy(DBLQH, ctx)
+{
+ // GSN_SEND_PACKED
+ addRecSignal(GSN_SEND_PACKED, &DblqhProxy::execSEND_PACKED);
+}
+
+DblqhProxy::~DblqhProxy()
+{
+}
+
+SimulatedBlock*
+DblqhProxy::newWorker(Uint32 instanceNo)
+{
+ return new Dblqh(m_ctx, instanceNo);
+}
+
+// GSN_SEND_PACKED
+
+void
+DblqhProxy::execSEND_PACKED(Signal* signal)
+{
+ Uint32 i;
+ for (i = 0; i < c_workers; i++) {
+ ndbrequire(c_worker[i] != 0);
+ Dblqh* dblqh = static_cast<Dblqh*>(c_worker[i]);
+ dblqh->execSEND_PACKED(signal);
+ }
+}
+
+BLOCK_FUNCTIONS(DblqhProxy)
=== added file 'storage/ndb/src/kernel/blocks/dblqh/DblqhProxy.hpp'
--- a/storage/ndb/src/kernel/blocks/dblqh/DblqhProxy.hpp 1970-01-01 00:00:00 +0000
+++ b/storage/ndb/src/kernel/blocks/dblqh/DblqhProxy.hpp 2008-07-26 05:13:40 +0000
@@ -0,0 +1,34 @@
+/* Copyright (C) 2003 MySQL AB
+
+ This program is free software; you can redistribute it and/or modify
+ it under the terms of the GNU General Public License as published by
+ the Free Software Foundation; version 2 of the License.
+
+ This program is distributed in the hope that it will be useful,
+ but WITHOUT ANY WARRANTY; without even the implied warranty of
+ MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ GNU General Public License for more details.
+
+ You should have received a copy of the GNU General Public License
+ along with this program; if not, write to the Free Software
+ Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA */
+
+#ifndef NDB_DBLQH_PROXY_HPP
+#define NDB_DBLQH_PROXY_HPP
+
+#include <LocalProxy.hpp>
+
+class DblqhProxy : public LocalProxy {
+public:
+ DblqhProxy(Block_context& ctx);
+ virtual ~DblqhProxy();
+ BLOCK_DEFINES(DblqhProxy);
+
+ // GSN_SEND_PACKED
+ void execSEND_PACKED(Signal*);
+
+protected:
+ virtual SimulatedBlock* newWorker(Uint32 instanceNo);
+};
+
+#endif
=== modified file 'storage/ndb/src/kernel/blocks/dbtup/Dbtup.hpp'
--- a/storage/ndb/src/kernel/blocks/dbtup/Dbtup.hpp 2008-06-05 20:34:20 +0000
+++ b/storage/ndb/src/kernel/blocks/dbtup/Dbtup.hpp 2008-07-26 05:13:40 +0000
@@ -344,6 +344,7 @@ inline const Uint32* ALIGN_WORD(const vo
#endif
class Dbtup: public SimulatedBlock {
+friend class DbtupProxy;
friend class Suma;
public:
struct KeyReqStruct;
@@ -1651,7 +1652,7 @@ struct TupHeadInfo {
Uint32 terrorCode;
public:
- Dbtup(Block_context&, Pgman*);
+ Dbtup(Block_context&, Pgman*, Uint32 instanceNumber = 0);
virtual ~Dbtup();
/*
=== modified file 'storage/ndb/src/kernel/blocks/dbtup/DbtupGen.cpp'
--- a/storage/ndb/src/kernel/blocks/dbtup/DbtupGen.cpp 2008-06-05 20:19:01 +0000
+++ b/storage/ndb/src/kernel/blocks/dbtup/DbtupGen.cpp 2008-07-26 05:13:40 +0000
@@ -49,8 +49,8 @@ void Dbtup::initData()
cpackedListIndex = 0;
}//Dbtup::initData()
-Dbtup::Dbtup(Block_context& ctx, Pgman* pgman)
- : SimulatedBlock(DBTUP, ctx),
+Dbtup::Dbtup(Block_context& ctx, Pgman* pgman, Uint32 instanceNumber)
+ : SimulatedBlock(DBTUP, ctx, instanceNumber),
c_lqh(0),
m_pgman(this, pgman),
c_extent_hash(c_extent_pool),
=== added file 'storage/ndb/src/kernel/blocks/dbtup/DbtupProxy.cpp'
--- a/storage/ndb/src/kernel/blocks/dbtup/DbtupProxy.cpp 1970-01-01 00:00:00 +0000
+++ b/storage/ndb/src/kernel/blocks/dbtup/DbtupProxy.cpp 2008-07-26 05:13:40 +0000
@@ -0,0 +1,47 @@
+/* Copyright (C) 2003 MySQL AB
+
+ This program is free software; you can redistribute it and/or modify
+ it under the terms of the GNU General Public License as published by
+ the Free Software Foundation; version 2 of the License.
+
+ This program is distributed in the hope that it will be useful,
+ but WITHOUT ANY WARRANTY; without even the implied warranty of
+ MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ GNU General Public License for more details.
+
+ You should have received a copy of the GNU General Public License
+ along with this program; if not, write to the Free Software
+ Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA */
+
+#include "DbtupProxy.hpp"
+#include "Dbtup.hpp"
+
+DbtupProxy::DbtupProxy(Block_context& ctx) :
+ LocalProxy(DBTUP, ctx)
+{
+ addRecSignal(GSN_SEND_PACKED, &DbtupProxy::execSEND_PACKED);
+}
+
+DbtupProxy::~DbtupProxy()
+{
+}
+
+SimulatedBlock*
+DbtupProxy::newWorker(Uint32 instanceNo)
+{
+ return new Dbtup(m_ctx, 0, instanceNo);
+}
+
+// GSN_SEND_PACKED
+
+void
+DbtupProxy::execSEND_PACKED(Signal* signal)
+{
+ Uint32 i;
+ for (i = 0; i < c_workers; i++) {
+ Dbtup* dbtup = static_cast<Dbtup*>(c_worker[i]);
+ dbtup->execSEND_PACKED(signal);
+ }
+}
+
+BLOCK_FUNCTIONS(DbtupProxy)
=== added file 'storage/ndb/src/kernel/blocks/dbtup/DbtupProxy.hpp'
--- a/storage/ndb/src/kernel/blocks/dbtup/DbtupProxy.hpp 1970-01-01 00:00:00 +0000
+++ b/storage/ndb/src/kernel/blocks/dbtup/DbtupProxy.hpp 2008-07-26 05:13:40 +0000
@@ -0,0 +1,34 @@
+/* Copyright (C) 2003 MySQL AB
+
+ This program is free software; you can redistribute it and/or modify
+ it under the terms of the GNU General Public License as published by
+ the Free Software Foundation; version 2 of the License.
+
+ This program is distributed in the hope that it will be useful,
+ but WITHOUT ANY WARRANTY; without even the implied warranty of
+ MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ GNU General Public License for more details.
+
+ You should have received a copy of the GNU General Public License
+ along with this program; if not, write to the Free Software
+ Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA */
+
+#ifndef NDB_DBTUP_PROXY
+#define NDB_DBTUP_PROXY
+
+#include <LocalProxy.hpp>
+
+class DbtupProxy : public LocalProxy {
+public:
+ DbtupProxy(Block_context& ctx);
+ virtual ~DbtupProxy();
+ BLOCK_DEFINES(DbtupProxy);
+
+protected:
+ virtual SimulatedBlock* newWorker(Uint32 instanceNo);
+
+ // GSN_SEND_PACKED
+ void execSEND_PACKED(Signal*);
+};
+
+#endif
=== modified file 'storage/ndb/src/kernel/blocks/dbtux/Dbtux.hpp'
--- a/storage/ndb/src/kernel/blocks/dbtux/Dbtux.hpp 2008-06-05 20:19:01 +0000
+++ b/storage/ndb/src/kernel/blocks/dbtux/Dbtux.hpp 2008-07-26 05:13:40 +0000
@@ -101,8 +101,9 @@
class Configuration;
class Dbtux : public SimulatedBlock {
+ friend class DbtuxProxy;
public:
- Dbtux(Block_context& ctx);
+ Dbtux(Block_context& ctx, Uint32 instanceNumber = 0);
virtual ~Dbtux();
// pointer to TUP instance in this thread
=== modified file 'storage/ndb/src/kernel/blocks/dbtux/DbtuxGen.cpp'
--- a/storage/ndb/src/kernel/blocks/dbtux/DbtuxGen.cpp 2008-06-05 20:19:01 +0000
+++ b/storage/ndb/src/kernel/blocks/dbtux/DbtuxGen.cpp 2008-07-26 05:13:40 +0000
@@ -18,8 +18,8 @@
#include <signaldata/NodeStateSignalData.hpp>
-Dbtux::Dbtux(Block_context& ctx) :
- SimulatedBlock(DBTUX, ctx),
+Dbtux::Dbtux(Block_context& ctx, Uint32 instanceNumber) :
+ SimulatedBlock(DBTUX, ctx, instanceNumber),
c_tup(0),
c_descPageList(RNIL),
#ifdef VM_TRACE
=== added file 'storage/ndb/src/kernel/blocks/dbtux/DbtuxProxy.cpp'
--- a/storage/ndb/src/kernel/blocks/dbtux/DbtuxProxy.cpp 1970-01-01 00:00:00 +0000
+++ b/storage/ndb/src/kernel/blocks/dbtux/DbtuxProxy.cpp 2008-07-26 05:13:40 +0000
@@ -0,0 +1,34 @@
+/* Copyright (C) 2003 MySQL AB
+
+ This program is free software; you can redistribute it and/or modify
+ it under the terms of the GNU General Public License as published by
+ the Free Software Foundation; version 2 of the License.
+
+ This program is distributed in the hope that it will be useful,
+ but WITHOUT ANY WARRANTY; without even the implied warranty of
+ MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ GNU General Public License for more details.
+
+ You should have received a copy of the GNU General Public License
+ along with this program; if not, write to the Free Software
+ Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA */
+
+#include "DbtuxProxy.hpp"
+#include "Dbtux.hpp"
+
+DbtuxProxy::DbtuxProxy(Block_context& ctx) :
+ LocalProxy(DBTUX, ctx)
+{
+}
+
+DbtuxProxy::~DbtuxProxy()
+{
+}
+
+SimulatedBlock*
+DbtuxProxy::newWorker(Uint32 instanceNo)
+{
+ return new Dbtux(m_ctx, instanceNo);
+}
+
+BLOCK_FUNCTIONS(DbtuxProxy)
=== added file 'storage/ndb/src/kernel/blocks/dbtux/DbtuxProxy.hpp'
--- a/storage/ndb/src/kernel/blocks/dbtux/DbtuxProxy.hpp 1970-01-01 00:00:00 +0000
+++ b/storage/ndb/src/kernel/blocks/dbtux/DbtuxProxy.hpp 2008-07-26 05:13:40 +0000
@@ -0,0 +1,31 @@
+/* Copyright (C) 2003 MySQL AB
+
+ This program is free software; you can redistribute it and/or modify
+ it under the terms of the GNU General Public License as published by
+ the Free Software Foundation; version 2 of the License.
+
+ This program is distributed in the hope that it will be useful,
+ but WITHOUT ANY WARRANTY; without even the implied warranty of
+ MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ GNU General Public License for more details.
+
+ You should have received a copy of the GNU General Public License
+ along with this program; if not, write to the Free Software
+ Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA */
+
+#ifndef NDB_DBTUX_PROXY_HPP
+#define NDB_DBTUX_PROXY_HPP
+
+#include <LocalProxy.hpp>
+
+class DbtuxProxy : public LocalProxy {
+public:
+ DbtuxProxy(Block_context& ctx);
+ virtual ~DbtuxProxy();
+ BLOCK_DEFINES(DbtuxProxy);
+
+protected:
+ virtual SimulatedBlock* newWorker(Uint32 instanceNo);
+};
+
+#endif
=== modified file 'storage/ndb/src/kernel/blocks/ndbfs/PosixAsyncFile.cpp'
--- a/storage/ndb/src/kernel/blocks/ndbfs/PosixAsyncFile.cpp 2008-04-09 10:41:03 +0000
+++ b/storage/ndb/src/kernel/blocks/ndbfs/PosixAsyncFile.cpp 2008-07-25 05:48:32 +0000
@@ -335,10 +335,8 @@ no_odirect:
req->data.pageData[0] = m_page_ptr.i;
m_fs.EXECUTE_DIRECT(block, GSN_FSWRITEREQ, signal,
- FsReadWriteReq::FixedLength + 1
-#ifdef VM_TRACE
- , true // This EXECUTE_DIRECT is thread safe
-#endif
+ FsReadWriteReq::FixedLength + 1,
+ 0 // wl4391_todo This EXECUTE_DIRECT is thread safe
);
retry:
Uint32 size = request->par.open.page_size;
=== modified file 'storage/ndb/src/kernel/blocks/restore.cpp'
--- a/storage/ndb/src/kernel/blocks/restore.cpp 2008-03-18 07:12:39 +0000
+++ b/storage/ndb/src/kernel/blocks/restore.cpp 2008-07-26 05:13:40 +0000
@@ -32,8 +32,8 @@
#define PAGES LCP_RESTORE_BUFFER
-Restore::Restore(Block_context& ctx) :
- SimulatedBlock(RESTORE, ctx),
+Restore::Restore(Block_context& ctx, Uint32 instanceNumber) :
+ SimulatedBlock(RESTORE, ctx, instanceNumber),
m_file_list(m_file_pool),
m_file_hash(m_file_pool)
{
=== modified file 'storage/ndb/src/kernel/blocks/restore.hpp'
--- a/storage/ndb/src/kernel/blocks/restore.hpp 2008-03-18 07:12:39 +0000
+++ b/storage/ndb/src/kernel/blocks/restore.hpp 2008-07-26 05:13:40 +0000
@@ -28,8 +28,10 @@
class Restore : public SimulatedBlock
{
+ friend class RestoreProxy;
+
public:
- Restore(Block_context& ctx);
+ Restore(Block_context& ctx, Uint32 instanceNumber = 0);
virtual ~Restore();
BLOCK_DEFINES(Restore);
=== modified file 'storage/ndb/src/kernel/main.cpp'
--- a/storage/ndb/src/kernel/main.cpp 2008-04-23 13:52:37 +0000
+++ b/storage/ndb/src/kernel/main.cpp 2008-07-25 05:48:32 +0000
@@ -1,4 +1,4 @@
-/* Copyright (C) 2003 MySQL AB
+ /* Copyright (C) 2003 MySQL AB
This program is free software; you can redistribute it and/or modify
it under the terms of the GNU General Public License as published by
@@ -33,6 +33,7 @@
#include <LogLevel.hpp>
#include <EventLogger.hpp>
+#include <NdbEnv.h>
#include <NdbAutoPtr.hpp>
@@ -285,6 +286,56 @@ init_global_memory_manager(EmulatorData
return 0; // Success
}
+bool g_ndbMt = false;
+bool g_ndbMtLqh = false;
+
+static int
+get_multithreaded_config(EmulatorData& ed)
+{
+ // multithreaded is compiled in ndbd/ndbmtd
+ g_ndbMt = SimulatedBlock::isMultiThreaded();
+ if (!g_ndbMt)
+ return 0;
+
+ // mt lqh via environment during development
+ {
+ const char* p = NdbEnv_GetEnv("NDB_MT_LQH", (char*)0, 0);
+ if (p != 0 && strchr("1Y", p[0]) != 0)
+ g_ndbMtLqh = true;
+ if (!g_ndbMtLqh)
+ return 0;
+ }
+
+ const ndb_mgm_configuration_iterator * p =
+ ed.theConfiguration->getOwnConfigIterator();
+ if (p == 0)
+ {
+ abort();
+ }
+
+ Uint32 workers = 0;
+ Uint32 threads = 0;
+ if (ndb_mgm_get_int_parameter(p, CFG_NDBMT_WORKERS, &workers) ||
+ ndb_mgm_get_int_parameter(p, CFG_NDBMT_THREADS, &threads))
+ {
+ g_eventLogger->alert("Failed to get CFG_NDBMT parameters from "
+ "config, exiting.");
+ return -1;
+ }
+
+ ndbout << "NDBMT: workers=" << workers
+ << " threads=" << threads << endl;
+
+ assert(workers != 0 && workers <= MAX_NDBMT_WORKERS);
+ assert(threads != 0 && threads <= MAX_NDBMT_THREADS);
+ assert(workers % threads == 0);
+
+ globalData.ndbmtWorkers = workers;
+ globalData.ndbmtThreads = threads;
+ return 0;
+}
+
+
int main(int argc, char** argv)
{
NDB_INIT(argv[0]);
@@ -480,28 +531,41 @@ int main(int argc, char** argv)
globalEmulatorData.theWatchDog->unregisterWatchedThread(0);
}
- globalEmulatorData.theThreadConfig->init(&globalEmulatorData);
+ if (get_multithreaded_config(globalEmulatorData))
+ return -1;
- // Load blocks
- globalEmulatorData.theSimBlockList->load(globalEmulatorData);
-
- // Set thread concurrency for Solaris' light weight processes
- int status;
- status = NdbThread_SetConcurrencyLevel(30);
- assert(status == 0);
+ globalEmulatorData.theThreadConfig->init(&globalEmulatorData);
#ifdef VM_TRACE
- // Create a signal logger
+ // Create a signal logger before block constructors
char *buf= NdbConfig_SignalLogFileName(globalData.ownId);
NdbAutoPtr<char> tmp_aptr(buf);
FILE * signalLog = fopen(buf, "a");
globalSignalLoggers.setOwnNodeId(globalData.ownId);
globalSignalLoggers.setOutputStream(signalLog);
-#if 0 // to log startup
- globalSignalLoggers.log(SignalLoggerManager::LogInOut, "BLOCK=DBDICT,DBDIH");
- globalData.testOn = 1;
+#if 1 // to log startup
+ { const char* p = NdbEnv_GetEnv("NDB_SIGNAL_LOG", (char*)0, 0);
+ if (p != 0) {
+ char buf[200];
+ snprintf(buf, sizeof(buf), "BLOCK=%s", p);
+ for (char* q = buf; *q != 0; q++) *q = toupper(toascii(*q));
+ globalSignalLoggers.log(SignalLoggerManager::LogInOut, buf);
+ globalData.testOn = 1;
+ assert(signalLog != 0);
+ fprintf(signalLog, "START\n");
+ fflush(signalLog);
+ }
+ }
#endif
#endif
+
+ // Load blocks
+ globalEmulatorData.theSimBlockList->load(globalEmulatorData);
+
+ // Set thread concurrency for Solaris' light weight processes
+ int status;
+ status = NdbThread_SetConcurrencyLevel(30);
+ assert(status == 0);
catchsigs(false);
=== modified file 'storage/ndb/src/kernel/vm/Configuration.cpp'
--- a/storage/ndb/src/kernel/vm/Configuration.cpp 2008-05-29 15:06:11 +0000
+++ b/storage/ndb/src/kernel/vm/Configuration.cpp 2008-07-25 05:48:32 +0000
@@ -999,6 +999,17 @@ Configuration::calcSizeAlt(ConfigValues
cfg.put(CFG_TUX_SCAN_OP, noOfLocalScanRecords);
}
+ // NDBMT
+ {
+ uint workers = 4;
+ uint threads = 2;
+ assert(workers <= MAX_NDBMT_WORKERS);
+ assert(threads <= MAX_NDBMT_THREADS);
+ assert(workers % threads == 0);
+ cfg.put(CFG_NDBMT_WORKERS, workers);
+ cfg.put(CFG_NDBMT_THREADS, threads);
+ }
+
m_ownConfig = (ndb_mgm_configuration*)cfg.getConfigValues();
m_ownConfigIterator = ndb_mgm_create_configuration_iterator
(m_ownConfig, 0);
=== added file 'storage/ndb/src/kernel/vm/GlobalData.cpp'
--- a/storage/ndb/src/kernel/vm/GlobalData.cpp 1970-01-01 00:00:00 +0000
+++ b/storage/ndb/src/kernel/vm/GlobalData.cpp 2008-07-25 05:48:32 +0000
@@ -0,0 +1,28 @@
+/* Copyright (C) 2003 MySQL AB
+
+ This program is free software; you can redistribute it and/or modify
+ it under the terms of the GNU General Public License as published by
+ the Free Software Foundation; version 2 of the License.
+
+ This program is distributed in the hope that it will be useful,
+ but WITHOUT ANY WARRANTY; without even the implied warranty of
+ MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ GNU General Public License for more details.
+
+ You should have received a copy of the GNU General Public License
+ along with this program; if not, write to the Free Software
+ Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA */
+
+#include <ndb_global.h>
+#include <kernel_types.h>
+#include <SimulatedBlock.hpp>
+#include "GlobalData.hpp"
+
+SimulatedBlock*
+GlobalData::getBlock(BlockNumber blockNo, Uint32 instanceNo)
+{
+ SimulatedBlock* b = getBlock(blockNo);
+ if (b != 0 && instanceNo != 0)
+ b = b->getInstance(instanceNo);
+ return b;
+}
=== modified file 'storage/ndb/src/kernel/vm/GlobalData.hpp'
--- a/storage/ndb/src/kernel/vm/GlobalData.hpp 2008-01-01 12:45:11 +0000
+++ b/storage/ndb/src/kernel/vm/GlobalData.hpp 2008-07-25 05:48:32 +0000
@@ -67,11 +67,16 @@ struct GlobalData {
Uint32 sendPackedActivated;
Uint32 activateSendPacked;
+
+ Uint32 ndbmtWorkers;
+ Uint32 ndbmtThreads;
GlobalData(){
theSignalId = 0;
theStartLevel = NodeState::SL_NOTHING;
theRestartFlag = perform_start;
+ ndbmtWorkers = 0;
+ ndbmtThreads = 0;
#ifdef GCP_TIMER_HACK
gcp_timer_limit = 0;
#endif
@@ -80,6 +85,7 @@ struct GlobalData {
void setBlock(BlockNumber blockNo, SimulatedBlock * block);
SimulatedBlock * getBlock(BlockNumber blockNo);
+ SimulatedBlock * getBlock(BlockNumber blockNo, Uint32 instanceNo);
void incrementWatchDogCounter(Uint32 place);
Uint32 * getWatchDogPtr();
=== modified file 'storage/ndb/src/kernel/vm/Makefile.am'
--- a/storage/ndb/src/kernel/vm/Makefile.am 2008-04-25 16:53:41 +0000
+++ b/storage/ndb/src/kernel/vm/Makefile.am 2008-07-25 05:48:32 +0000
@@ -36,13 +36,15 @@ libkernel_a_SOURCES = \
Rope.cpp \
ndbd_malloc.cpp \
ndbd_malloc_impl.cpp \
- Pool.cpp WOPool.cpp RWPool.cpp DynArr256.cpp LockQueue.cpp
+ Pool.cpp WOPool.cpp RWPool.cpp DynArr256.cpp LockQueue.cpp \
+ GlobalData.cpp
libsched_a_SOURCES = TimeQueue.cpp \
ThreadConfig.cpp \
FastScheduler.cpp \
TransporterCallback_nonmt.cpp \
- SimulatedBlock_nonmt.cpp
+ SimulatedBlock_nonmt.cpp \
+ dummy_nonmt.cpp
libsched_mt_a_SOURCES = SimulatedBlock_mt.cpp \
TransporterCallback_mt.cpp \
=== modified file 'storage/ndb/src/kernel/vm/SimulatedBlock.cpp'
--- a/storage/ndb/src/kernel/vm/SimulatedBlock.cpp 2008-07-01 12:35:34 +0000
+++ b/storage/ndb/src/kernel/vm/SimulatedBlock.cpp 2008-07-25 05:48:32 +0000
@@ -24,6 +24,7 @@
#include "SimulatedBlock.hpp"
#include <NdbOut.hpp>
+#include <OutputStream.hpp>
#include <GlobalData.hpp>
#include <Emulator.hpp>
#include <WatchDog.hpp>
@@ -46,6 +47,8 @@
#include <AttributeDescriptor.hpp>
#include <NdbSqlUtil.hpp>
+#include "../blocks/dbdih/Dbdih.hpp"
+
#include <EventLogger.hpp>
extern EventLogger * g_eventLogger;
@@ -56,10 +59,15 @@ extern EventLogger * g_eventLogger;
// Constructor, Destructor
//
SimulatedBlock::SimulatedBlock(BlockNumber blockNumber,
- struct Block_context & ctx)
+ struct Block_context & ctx,
+ Uint32 instanceNumber)
: theNodeId(globalData.ownId),
theNumber(blockNumber),
- theReference(numberToRef(blockNumber, globalData.ownId)),
+ theInstance(instanceNumber),
+ theReference(numberToRef(blockNumber, instanceNumber, globalData.ownId)),
+ theInstanceCount(0),
+ theInstanceList(0),
+ theMainInstance(0),
m_ctx(ctx),
m_global_page_pool(globalData.m_global_page_pool),
m_shared_page_pool(globalData.m_shared_page_pool),
@@ -68,13 +76,41 @@ SimulatedBlock::SimulatedBlock(BlockNumb
c_segmentedFragmentSendList(c_fragmentSendPool),
c_mutexMgr(* this),
c_counterMgr(* this)
+#ifdef VM_TRACE
+ ,debugOut(*new NdbOut(*new FileOutputStream(globalSignalLoggers.getOutputStream())))
+#endif
{
m_threadId = 0;
m_watchDogCounter = NULL;
m_jamBuffer = (EmulatedJamBuffer *)NdbThread_GetTlsKey(NDB_THREAD_TLS_JAM);
NewVarRef = 0;
- globalData.setBlock(blockNumber, this);
+ SimulatedBlock* main = globalData.getBlock(blockNumber);
+
+ if (theInstance == 0) {
+ ndbrequire(main == 0);
+ main = this;
+ globalData.setBlock(blockNumber, main);
+ main->theInstanceCount = 1;
+ } else {
+ ndbrequire(main != 0);
+ ndbrequire(theInstance == main->theInstanceCount);
+ ndbrequire(theInstance < MaxInstances);
+ if (theInstance == 1) {
+ ndbrequire(main->theInstanceList == 0);
+ main->theInstanceList = new SimulatedBlock* [MaxInstances];
+ Uint32 i;
+ for (i = 0; i < MaxInstances; i++)
+ main->theInstanceList[i] = 0;
+ } else {
+ ndbrequire(main->theInstanceList != 0);
+ }
+ ndbrequire(main->theInstanceList[theInstance] == 0);
+ main->theInstanceList[theInstance] = this;
+ main->theInstanceCount = theInstance + 1;
+ }
+ theMainInstance = main;
+
c_fragmentIdCounter = 1;
c_fragSenderRunning = false;
@@ -130,6 +166,14 @@ SimulatedBlock::~SimulatedBlock()
#ifdef VM_TRACE
delete [] m_global_variables;
#endif
+
+ if (theInstanceList != 0) {
+ Uint32 i;
+ for (i = 0; i < theInstanceCount; i++)
+ delete theInstanceList[i];
+ delete [] theInstanceList;
+ }
+ theInstanceList = 0;
}
void
@@ -172,6 +216,15 @@ SimulatedBlock::addRecSignalImpl(GlobalS
theExecArray[gsn] = f;
}
+Uint32
+SimulatedBlock::getInstanceKey(Uint32 tabId, Uint32 fragId)
+{
+ Dbdih* dbdih = (Dbdih*)globalData.getBlock(DBDIH);
+ ndbrequire(dbdih != 0);
+ Uint32 instanceKey = dbdih->dihGetInstanceKey(tabId, fragId);
+ return instanceKey;
+}
+
void
SimulatedBlock::assignToThread(Uint32 threadId, EmulatedJamBuffer *jamBuffer,
Uint32 *watchDogCounter)
@@ -509,9 +562,9 @@ SimulatedBlock::sendSignal(BlockReferenc
signal->header.theSendersBlockRef = sendBRef;
#ifdef NDBD_MULTITHREADED
if (jobBuffer == JBB)
- sendlocal(m_threadId, recBlock, &signal->header, signal->theData, NULL);
+ sendlocal(m_threadId, &signal->header, signal->theData, NULL);
else
- sendprioa(m_threadId, recBlock, &signal->header, signal->theData, NULL);
+ sendprioa(m_threadId, &signal->header, signal->theData, NULL);
#else
globalScheduler.execute(signal, jobBuffer, recBlock,
gsn);
@@ -607,9 +660,9 @@ SimulatedBlock::sendSignal(NodeReceiverG
#ifdef NDBD_MULTITHREADED
if (jobBuffer == JBB)
- sendlocal(m_threadId, recBlock, &signal->header, signal->theData, NULL);
+ sendlocal(m_threadId, &signal->header, signal->theData, NULL);
else
- sendprioa(m_threadId, recBlock, &signal->header, signal->theData, NULL);
+ sendprioa(m_threadId, &signal->header, signal->theData, NULL);
#else
globalScheduler.execute(signal, jobBuffer, recBlock, gsn);
#endif
@@ -716,10 +769,10 @@ SimulatedBlock::sendSignal(BlockReferenc
#ifdef NDBD_MULTITHREADED
if (jobBuffer == JBB)
- sendlocal(m_threadId, recBlock, &signal->header, signal->theData,
+ sendlocal(m_threadId, &signal->header, signal->theData,
signal->theData+length);
else
- sendprioa(m_threadId, recBlock, &signal->header, signal->theData,
+ sendprioa(m_threadId, &signal->header, signal->theData,
signal->theData+length);
#else
globalScheduler.execute(signal, jobBuffer, recBlock,
@@ -831,10 +884,10 @@ SimulatedBlock::sendSignal(NodeReceiverG
#ifdef NDBD_MULTITHREADED
if (jobBuffer == JBB)
- sendlocal(m_threadId, recBlock, &signal->header, signal->theData,
+ sendlocal(m_threadId, &signal->header, signal->theData,
signal->theData + length);
else
- sendprioa(m_threadId, recBlock, &signal->header, signal->theData,
+ sendprioa(m_threadId, &signal->header, signal->theData,
signal->theData + length);
#else
globalScheduler.execute(signal, jobBuffer, recBlock, gsn);
@@ -944,10 +997,10 @@ SimulatedBlock::sendSignal(BlockReferenc
#ifdef NDBD_MULTITHREADED
if (jobBuffer == JBB)
- sendlocal(m_threadId, recBlock, &signal->header, signal->theData,
+ sendlocal(m_threadId, &signal->header, signal->theData,
signal->theData + length);
else
- sendprioa(m_threadId, recBlock, &signal->header, signal->theData,
+ sendprioa(m_threadId, &signal->header, signal->theData,
signal->theData + length);
#else
globalScheduler.execute(signal, jobBuffer, recBlock, gsn);
@@ -1059,10 +1112,10 @@ SimulatedBlock::sendSignal(NodeReceiverG
* dst ++ = sections->m_ptr[2].i;
#ifdef NDBD_MULTITHREADED
if (jobBuffer == JBB)
- sendlocal(m_threadId, recBlock, &signal->header, signal->theData,
+ sendlocal(m_threadId, &signal->header, signal->theData,
signal->theData + length);
else
- sendprioa(m_threadId, recBlock, &signal->header, signal->theData,
+ sendprioa(m_threadId, &signal->header, signal->theData,
signal->theData + length);
#else
globalScheduler.execute(signal, jobBuffer, recBlock, gsn);
@@ -1246,16 +1299,20 @@ SimulatedBlock::freeBat(){
}
const NewVARIABLE *
-SimulatedBlock::getBat(Uint16 blockNo){
+SimulatedBlock::getBat(Uint16 blockNo, Uint32 instanceNo){
SimulatedBlock * sb = globalData.getBlock(blockNo);
+ if (sb != 0 && instanceNo != 0)
+ sb = sb->getInstance(instanceNo);
if(sb == 0)
return 0;
return sb->NewVarRef;
}
Uint16
-SimulatedBlock::getBatSize(Uint16 blockNo){
+SimulatedBlock::getBatSize(Uint16 blockNo, Uint32 instanceNo){
SimulatedBlock * sb = globalData.getBlock(blockNo);
+ if (sb != 0 && instanceNo != 0)
+ sb = sb->getInstance(instanceNo);
if(sb == 0)
return 0;
return sb->theBATSize;
@@ -1418,7 +1475,7 @@ SimulatedBlock::infoEvent(const char * m
signalT.header.theLength = ((len+3)/4)+1;
#ifdef NDBD_MULTITHREADED
- sendlocal(m_threadId, signalT.header.theReceiversBlockNumber,
+ sendlocal(m_threadId,
&signalT.header, signalT.theData, signalT.m_sectionPtrI);
#else
globalScheduler.execute(&signalT.header, JBB, signalT.theData,
@@ -1463,7 +1520,7 @@ SimulatedBlock::warningEvent(const char
signalT.header.theLength = ((len+3)/4)+1;
#ifdef NDBD_MULTITHREADED
- sendlocal(m_threadId, signalT.header.theReceiversBlockNumber,
+ sendlocal(m_threadId,
&signalT.header, signalT.theData, signalT.m_sectionPtrI);
#else
globalScheduler.execute(&signalT.header, JBB, signalT.theData,
@@ -1621,7 +1678,7 @@ SimulatedBlock::printTimes(FILE * output
}
}
sum = (sum + 500)/ 1000;
- fprintf(output, "-- %s : %d --\n", getBlockName(number()), sum);
+ fprintf(output, "-- %s : %u --\n", getBlockName(number()), (Uint32)sum);
fprintf(output, "\n");
fflush(output);
}
@@ -2387,7 +2444,7 @@ SimulatedBlock::setNodeInfo(NodeId nodeI
}
bool
-SimulatedBlock::isMultiThreaded() const
+SimulatedBlock::isMultiThreaded()
{
#ifdef NDBD_MULTITHREADED
return true;
@@ -2720,3 +2777,37 @@ SimulatedBlock::create_distr_key(Uint32
}
CArray<KeyDescriptor> g_key_descriptor_pool;
+
+#ifdef VM_TRACE
+bool
+SimulatedBlock::debugOutOn() const
+{
+ SignalLoggerManager::LogMode mask = SignalLoggerManager::LogInOut;
+ return globalSignalLoggers.logMatch(number(), mask);
+}
+
+const char*
+SimulatedBlock::debugOutTag(char *buf, int line)
+{
+ char blockbuf[40];
+ char instancebuf[40];
+ char linebuf[40];
+ char timebuf[40];
+ sprintf(blockbuf, "%s", getBlockName(number(), "UNKNOWN"));
+ instancebuf[0] = 0;
+ if (instance() != 0)
+ sprintf(instancebuf, "/%u", instance());
+ sprintf(linebuf, " %d", line);
+ timebuf[0] = 0;
+#ifdef VM_TRACE_TIME
+ {
+ NDB_TICKS t = NdbTick_CurrentMillisecond();
+ uint s = (t / 1000) % 3600;
+ uint ms = t % 1000;
+ sprintf(timebuf, " - %u.%03u -", s, ms);
+ }
+#endif
+ sprintf(buf, "%s%s%s%s ", blockbuf, instancebuf, linebuf, timebuf);
+ return buf;
+}
+#endif
=== modified file 'storage/ndb/src/kernel/vm/SimulatedBlock.hpp'
--- a/storage/ndb/src/kernel/vm/SimulatedBlock.hpp 2008-06-17 20:28:45 +0000
+++ b/storage/ndb/src/kernel/vm/SimulatedBlock.hpp 2008-07-25 05:48:32 +0000
@@ -56,6 +56,19 @@
#include "ndbd_malloc_impl.hpp"
#include <blocks/record_types.hpp>
+#ifdef VM_TRACE
+#define D(x) \
+ do { \
+ char buf[200]; \
+ if (!debugOutOn()) break; \
+ debugOut << debugOutTag(buf, __LINE__) << x << endl; \
+ } while (0)
+#define V(x) " " << #x << ":" << (x)
+#else
+#define D(x)
+#undef V
+#endif
+
/**
* Something for filesystem access
*/
@@ -108,7 +121,8 @@ protected:
* Constructor
*/
SimulatedBlock(BlockNumber blockNumber,
- struct Block_context & ctx);
+ struct Block_context & ctx,
+ Uint32 instanceNumber = 0);
/**********************************************************
* Handling of execFunctions
@@ -125,12 +139,34 @@ public:
*/
inline void executeFunction(GlobalSignalNumber gsn, Signal* signal);
+ /* Multiple block instances */
+ Uint32 instance() const {
+ return theInstance;
+ }
+ Uint32 getWorkerCount() const {
+ ndbrequire(theInstance == 0); // valid only on main instance
+ ndbrequire(theInstanceCount >= 1);
+ return theInstanceCount - 1;
+ }
+ SimulatedBlock* getInstance(Uint32 instanceNumber) {
+ ndbrequire(theInstance == 0);
+ if (instanceNumber == 0)
+ return this;
+ if (instanceNumber < theInstanceCount) {
+ ndbrequire(theInstanceList != 0);
+ return theInstanceList[instanceNumber];
+ }
+ return 0;
+ }
+ Uint32 getInstanceKey(Uint32 tabId, Uint32 fragId);
+
/* Setup state of a block object for executing in a particular thread. */
void assignToThread(Uint32 threadId, EmulatedJamBuffer *jamBuffer,
Uint32 *watchDogCounter);
/* For multithreaded ndbd, get the id of owning thread. */
uint32 getThreadId() const { return m_threadId; }
- bool isMultiThreaded() const;
+ static bool isMultiThreaded();
+
public:
typedef void (SimulatedBlock::* CallbackFunction)(class Signal*,
Uint32 callbackData,
@@ -210,14 +246,15 @@ protected:
Uint32 length,
SectionHandle* sections) const;
+ /*
+ * Instance defaults to instance of sender. Using explicit
+ * instance argument asserts that the call is thread-safe.
+ */
void EXECUTE_DIRECT(Uint32 block,
Uint32 gsn,
Signal* signal,
- Uint32 len
-#ifdef VM_TRACE
- , bool is_thread_safe = false
-#endif
-);
+ Uint32 len,
+ Uint32 givenInstanceNo = ZNIL);
class SectionSegmentPool& getSectionSegmentPool();
void releaseSections(struct SectionHandle&);
@@ -398,8 +435,18 @@ private:
void signal_error(Uint32, Uint32, Uint32, const char*, int) const ;
const NodeId theNodeId;
const BlockNumber theNumber;
+ const Uint32 theInstance;
const BlockReference theReference;
/*
+ * Instance 0 is the main instance. It creates/owns other instances.
+ * In MT LQH main instance is the LQH proxy and the others ("workers")
+ * are real LQHs run by multiple threads.
+ */
+ enum { MaxInstances = 1 + MAX_NDBMT_WORKERS };
+ Uint32 theInstanceCount; // set in main
+ SimulatedBlock** theInstanceList; // set in main, indexed by instance
+ SimulatedBlock* theMainInstance; // set in all
+ /*
Thread id currently executing this block.
Not used in singlethreaded ndbd.
*/
@@ -416,8 +463,10 @@ protected:
Block_context m_ctx;
NewVARIABLE* allocateBat(int batSize);
void freeBat();
- static const NewVARIABLE* getBat (BlockNumber blockNo);
- static Uint16 getBatSize(BlockNumber blockNo);
+ static const NewVARIABLE* getBat (BlockNumber blockNo,
+ Uint32 instanceNo = 0);
+ static Uint16 getBatSize(BlockNumber blockNo,
+ Uint32 instanceNo = 0);
static BlockReference calcTcBlockRef (NodeId aNode);
static BlockReference calcLqhBlockRef (NodeId aNode);
@@ -629,6 +678,12 @@ public:
void clear_global_variables();
void init_globals_list(void ** tmp, size_t cnt);
#endif
+
+#ifdef VM_TRACE
+ NdbOut debugOut;
+ bool debugOutOn() const;
+ const char* debugOutTag(char* buf, int line);
+#endif
};
inline
@@ -806,17 +861,32 @@ void
SimulatedBlock::EXECUTE_DIRECT(Uint32 block,
Uint32 gsn,
Signal* signal,
- Uint32 len
-#ifdef VM_TRACE
- , bool is_thread_safe
-#endif
-)
+ Uint32 len,
+ Uint32 givenInstanceNo)
{
signal->setLength(len);
+ SimulatedBlock* b = globalData.getBlock(block);
+ ndbassert(b != 0);
+ /**
+ * In multithreaded NDB, blocks run in different threads, and EXECUTE_DIRECT
+ * (unlike sendSignal) is generally not thread-safe.
+ * So only allow EXECUTE_DIRECT between blocks that run in the same thread,
+ * unless caller explicitly marks it as being thread safe (eg NDBFS),
+ * by using an explicit instance argument.
+ * By default instance of sender is used. This is automatically thread-safe
+ * for worker instances (instance != 0).
+ */
+ Uint32 instanceNo = givenInstanceNo;
+ if (instanceNo == ZNIL)
+ instanceNo = instance();
+ if (instanceNo != 0)
+ b = b->getInstance(instanceNo);
+ ndbassert(b != 0);
+ ndbassert(givenInstanceNo != ZNIL || b->getThreadId() == getThreadId());
#ifdef VM_TRACE
if(globalData.testOn){
signal->header.theVerId_signalNumber = gsn;
- signal->header.theReceiversBlockNumber = block;
+ signal->header.theReceiversBlockNumber = numberToBlock(block, instanceNo);
signal->header.theSendersBlockRef = reference();
globalSignalLoggers.executeDirect(signal->header,
0, // in
@@ -824,14 +894,6 @@ SimulatedBlock::EXECUTE_DIRECT(Uint32 bl
globalData.ownId);
}
#endif
- SimulatedBlock* b = globalData.getBlock(block);
- /**
- * In multithreaded NDB, blocks run in different threads, and EXECUTE_DIRECT
- * (unlike sendSignal() is generally not thread-safe.
- * So only allow EXECUTE_DIRECT between blocks that run in the same thread,
- * unless caller explicitly marks it as being thread safe (eg NDBFS).
- */
- ndbassert(is_thread_safe || b->getThreadId() == getThreadId());
#ifdef VM_TRACE_TIME
Uint32 us1, us2;
Uint64 ms1, ms2;
@@ -854,7 +916,7 @@ SimulatedBlock::EXECUTE_DIRECT(Uint32 bl
#ifdef VM_TRACE
if(globalData.testOn){
signal->header.theVerId_signalNumber = gsn;
- signal->header.theReceiversBlockNumber = block;
+ signal->header.theReceiversBlockNumber = numberToBlock(block, instanceNo);
signal->header.theSendersBlockRef = reference();
globalSignalLoggers.executeDirect(signal->header,
1, // out
@@ -875,6 +937,8 @@ SimulatedBlock::addTime(Uint32 gsn, Uint
inline
void
SimulatedBlock::subTime(Uint32 gsn, Uint64 time){
+ // wl4391_todo got 0xf3f3f3f3 here on GSN_UPGRADE_PROTOCOL_ORD...
+ if (gsn < 0xffff)
m_timeTrace[gsn].sub += time;
}
#endif
=== modified file 'storage/ndb/src/kernel/vm/TransporterCallback.cpp'
--- a/storage/ndb/src/kernel/vm/TransporterCallback.cpp 2008-07-01 12:35:34 +0000
+++ b/storage/ndb/src/kernel/vm/TransporterCallback.cpp 2008-07-25 05:48:32 +0000
@@ -191,7 +191,14 @@ TransporterCallbackKernel::deliver_signa
const Uint32 secCount = header->m_noOfSections;
const Uint32 length = header->theLength;
+
+ /*
+ * Strip instance bits if not multithreaded. This is also
+ * done in versions prior to MT LQH, to simplify online upgrade.
+ */
+#ifndef NDBD_MULTITHREADED
header->theReceiversBlockNumber &= NDBMT_BLOCK_MASK;
+#endif
#ifdef TRACE_DISTRIBUTED
ndbout_c("recv: %s(%d) from (%s, %d)",
@@ -232,10 +239,10 @@ TransporterCallbackKernel::deliver_signa
globalScheduler.execute(header, prio, theData, secPtrI);
#else
if (prio == JBB)
- sendlocal(receiverThreadId, header->theReceiversBlockNumber,
+ sendlocal(receiverThreadId,
header, theData, secPtrI);
else
- sendprioa(receiverThreadId, header->theReceiversBlockNumber,
+ sendprioa(receiverThreadId,
header, theData, secPtrI);
#endif
@@ -271,10 +278,10 @@ TransporterCallbackKernel::deliver_signa
globalScheduler.execute(header, prio, theData, secPtrI);
#else
if (prio == JBB)
- sendlocal(receiverThreadId, header->theReceiversBlockNumber,
+ sendlocal(receiverThreadId,
header, theData, NULL);
else
- sendprioa(receiverThreadId, header->theReceiversBlockNumber,
+ sendprioa(receiverThreadId,
header, theData, NULL);
#endif
@@ -383,7 +390,7 @@ TransporterCallbackKernel::reportError(N
#else
signal.header.theVerId_signalNumber = GSN_EVENT_REP;
signal.header.theReceiversBlockNumber = CMVMI;
- sendprioa(receiverThreadId, signal.header.theReceiversBlockNumber,
+ sendprioa(receiverThreadId,
&signalT.header, signalT.theData, NULL);
#endif
@@ -436,7 +443,7 @@ TransporterCallbackKernel::reportReceive
#else
signal.header.theVerId_signalNumber = GSN_EVENT_REP;
signal.header.theReceiversBlockNumber = CMVMI;
- sendprioa(receiverThreadId, signal.header.theReceiversBlockNumber,
+ sendprioa(receiverThreadId,
&signalT.header, signalT.theData, NULL);
#endif
}
@@ -463,7 +470,7 @@ TransporterCallbackKernel::reportConnect
#else
signal.header.theVerId_signalNumber = GSN_CONNECT_REP;
signal.header.theReceiversBlockNumber = CMVMI;
- sendprioa(receiverThreadId, signal.header.theReceiversBlockNumber,
+ sendprioa(receiverThreadId,
&signalT.header, signalT.theData, NULL);
#endif
}
@@ -495,7 +502,7 @@ TransporterCallbackKernel::reportDisconn
#else
signal.header.theVerId_signalNumber = GSN_DISCONNECT_REP;
signal.header.theReceiversBlockNumber = CMVMI;
- sendprioa(receiverThreadId, signal.header.theReceiversBlockNumber,
+ sendprioa(receiverThreadId,
&signalT.header, signalT.theData, NULL);
#endif
=== added file 'storage/ndb/src/kernel/vm/dummy_nonmt.cpp'
--- a/storage/ndb/src/kernel/vm/dummy_nonmt.cpp 1970-01-01 00:00:00 +0000
+++ b/storage/ndb/src/kernel/vm/dummy_nonmt.cpp 2008-07-25 05:48:32 +0000
@@ -0,0 +1,14 @@
+#include <assert.h>
+#include <ndb_types.h>
+
+void
+add_thr_map(Uint32, Uint32, Uint32)
+{
+ assert(false);
+}
+
+void
+add_worker_thr_map(Uint32, Uint32)
+{
+ assert(false);
+}
=== modified file 'storage/ndb/src/kernel/vm/mt.cpp'
--- a/storage/ndb/src/kernel/vm/mt.cpp 2008-08-02 10:35:51 +0000
+++ b/storage/ndb/src/kernel/vm/mt.cpp 2008-08-03 18:22:23 +0000
@@ -43,9 +43,14 @@ static const Uint32 MAX_SIGNALS_PER_JB =
//#define NDB_MT_LOCK_TO_CPU
-static const Uint32 NUM_THREADS = 3;
-static const Uint32 RECEIVER_THREAD_NO = 2;
-#define MAX_THREADS 4
+#define MAX_BLOCK_INSTANCES (1 + MAX_NDBMT_WORKERS)
+#define NUM_MAIN_THREADS 2 // except receiver
+#define MAX_THREADS (NUM_MAIN_THREADS + MAX_NDBMT_THREADS + 1)
+
+static Uint32 ndbmt_workers = 0;
+static Uint32 ndbmt_threads = 0;
+static Uint32 num_threads = 0;
+static Uint32 receiver_thread_no = 0;
#ifdef NDBMTD_X86
static inline
@@ -456,8 +461,7 @@ struct thr_data
thr_wait m_waiter;
unsigned m_thr_no;
- struct SimulatedBlock* m_blocks[NO_OF_BLOCKS];
-
+
Uint64 m_time;
struct thr_tq m_tq;
@@ -499,9 +503,9 @@ struct thr_data
struct thr_jb_read_state m_read_states[MAX_THREADS];
/* Jam buffers for making trace files at crashes. */
- EmulatedJamBuffer *m_jam;
+ EmulatedJamBuffer m_jam;
/* Watchdog counter for this thread. */
- Uint32 *m_watchdog_counter;
+ Uint32 m_watchdog_counter;
/* Signal delivery statistics. */
Uint32 m_prioa_count;
Uint32 m_prioa_size;
@@ -711,7 +715,7 @@ struct trp_callback : public Transporter
Uint32 total_bytes(NodeId node) const
{
Uint32 total = 0;
- for (Uint32 i = 0; i < NUM_THREADS; i++)
+ for (Uint32 i = 0; i < num_threads; i++)
total += m_thr_buffers[i]->m_buffers[node].used_bytes();
return total;
}
@@ -919,7 +923,7 @@ scan_queue(struct thr_data* selfptr, Uin
* If they are frequent, we may want to optimize, as sending one prio A
* signal is somewhat expensive compared to sending one prio B.
*/
- sendprioa(thr_no, s->theReceiversBlockNumber, s, data,
+ sendprioa(thr_no, s, data,
data + s->theLength);
* (page + pos) = free;
free = idx;
@@ -1202,7 +1206,8 @@ thr_send_buf::updateWritePtr(NodeId node
trp_callback::trp_callback()
{
- for (Uint32 i = 0; i < NUM_THREADS; i++)
+ // number of threads not yet set so use MAX_THREADS
+ for (Uint32 i = 0; i < MAX_THREADS; i++)
m_thr_buffers[i] = new thr_send_buf(this, i, &g_thr_repository.m_free_list);
for (int i = 0; i < MAX_NTRANSPORTERS; i++)
m_send_thr[i] = ~(Uint32)0;
@@ -1223,7 +1228,7 @@ trp_callback::reportSendLen(NodeId nodeI
signal.theData[2] = (bytes/count);
signal.header.theVerId_signalNumber = GSN_EVENT_REP;
signal.header.theReceiversBlockNumber = CMVMI;
- sendprioa(m_send_thr[nodeId], signal.header.theReceiversBlockNumber,
+ sendprioa(m_send_thr[nodeId],
&signalT.header, signalT.theData, NULL);
}
@@ -1272,7 +1277,7 @@ trp_callback::get_bytes_to_send_iovec(No
if (max_iovecs == 0)
return 0;
- for (Uint32 thr = 0; thr < NUM_THREADS && iovecs < max_iovecs; thr++)
+ for (Uint32 thr = 0; thr < num_threads && iovecs < max_iovecs; thr++)
{
thr_send_buf::send_buffer *b = m_thr_buffers[thr]->m_buffers + node;
thr_send_buf::page *pg = b->m_current_page;
@@ -1357,7 +1362,7 @@ trp_callback::bytes_sent(NodeId node, co
break; // Found it
}
- curr_thr = (curr_thr + 1) % NUM_THREADS;
+ curr_thr = (curr_thr + 1) % num_threads;
#ifdef VM_TRACE
assert(curr_thr != start_thr); // Sent data was not in buffer
#endif
@@ -1390,7 +1395,7 @@ trp_callback::bytes_sent(NodeId node, co
bool
trp_callback::has_data_to_send(NodeId node)
{
- for (Uint32 thr = 0; thr < NUM_THREADS; thr++)
+ for (Uint32 thr = 0; thr < num_threads; thr++)
{
thr_send_buf::send_buffer *b = m_thr_buffers[thr]->m_buffers + node;
thr_send_buf::page *pg = b->m_current_page;
@@ -1577,15 +1582,14 @@ inline
void
sendpacked(struct thr_data* selfptr, Signal* signal, Uint32 thr_no)
{
- SimulatedBlock** blockptr = selfptr->m_blocks - MIN_BLOCK_NO;
- SimulatedBlock* b_lqh = * (blockptr + DBLQH);
- SimulatedBlock* b_tc = * (blockptr + DBTC);
- SimulatedBlock* b_tup = * (blockptr + DBTUP);
- if (b_lqh)
+ SimulatedBlock* b_lqh = globalData.getBlock(DBLQH);
+ SimulatedBlock* b_tc = globalData.getBlock(DBTC);
+ SimulatedBlock* b_tup = globalData.getBlock(DBTUP);
+ if (thr_no == 1)
b_lqh->executeFunction(GSN_SEND_PACKED, signal);
- if (b_tc)
+ if (thr_no == 0)
b_tc->executeFunction(GSN_SEND_PACKED, signal);
- if (b_tup)
+ if (thr_no == 1)
b_tup->executeFunction(GSN_SEND_PACKED, signal);
}
@@ -1736,7 +1740,6 @@ execute_signals(thr_data *selfptr, thr_j
r->m_write_pos :
q->m_buffers[read_index]->m_len);
thr_job_buffer *read_buffer = r->m_read_buffer;
- SimulatedBlock** blockptr = selfptr->m_blocks - MIN_BLOCK_NO;
while (num_signals < max_signals)
{
@@ -1779,9 +1782,11 @@ execute_signals(thr_data *selfptr, thr_j
Uint32 siglen = (sizeof(*s)>>2) + s->theLength;
if(siglen>16)
__builtin_prefetch (read_buffer->m_data + read_pos + 32, 0, 3);
- Uint32 bno = s->theReceiversBlockNumber;
+ Uint32 bno = blockToMain(s->theReceiversBlockNumber);
+ Uint32 ino = blockToInstance(s->theReceiversBlockNumber);
Uint32 gsn = s->theVerId_signalNumber;
- SimulatedBlock * block = blockptr[bno];
+ SimulatedBlock * main = globalData.getBlock(bno);
+ SimulatedBlock * block = main->getInstance(ino);
*watchDogCounter = 1;
/* Must update original buffer so signal dump will see it. */
s->theSignalId = (*signalIdCounter)++;
@@ -1799,6 +1804,16 @@ execute_signals(thr_data *selfptr, thr_j
/* Update just before execute so signal dump can know how far we are. */
r->m_read_pos = read_pos;
+#ifdef VM_TRACE
+ if (globalData.testOn) { //wl4391_todo segments
+ globalSignalLoggers.executeSignal(*s,
+ 0,
+ &sig->theData[0],
+ globalData.ownId);
+
+ }
+#endif
+
block->executeFunction(gsn, sig);
num_signals++;
@@ -1843,43 +1858,91 @@ run_job_buffers(thr_data *selfptr, Signa
return signal_count;
}
-static inline Uint32
-block2ThreadId(Uint32 block)
+struct thr_map_entry {
+ enum { NULL_THR_NO = 0xFFFF };
+ Uint32 thr_no;
+ SimulatedBlock* block;
+ thr_map_entry() : thr_no(NULL_THR_NO), block(0) {}
+};
+
+static struct thr_map_entry thr_map[NO_OF_BLOCKS][MAX_BLOCK_INSTANCES];
+
+void
+add_thr_map(Uint32 block, Uint32 instance, Uint32 thr_no)
{
- /**
- * CMVMIis special, it runs in the receive thread due to calling directly
- * into TransporterRegistry.
- */
- if (block == CMVMI)
- return 2;
+ Uint32 index = block - MIN_BLOCK_NO;
+ assert(index < NO_OF_BLOCKS);
+ assert(instance < MAX_BLOCK_INSTANCES);
- /*
- * Block assignment:
- * 0 BACKUP LOCAL
- * 1 DBTC GLOBAL
- * 2 DBDIH GLOBAL
- * 3 DBLQH LOCAL
- * 4 DBACC LOCAL
- * 5 DBTUP LOCAL
- * 6 DBDICT GLOBAL
- * 7 NDBCNTR GLOBAL
- * 8 QMGR GLOBAL
- * 9 NDBFS GLOBAL
- * 10 CMVMI [Receive thread]
- * 11 TRIX GLOBAL
- * 12 DBUTIL GLOBAL
- * 13 SUMA LOCAL
- * 14 DBTUX LOCAL
- * 15 TSMAN LOCAL
- * 16 LGMAN LOCAL
- * 17 PGMAN LOCAL
- * 18 RESTORE LOCAL
- *
- * Thread 0 is for global, thread 1 for locals.
- */
- static const Uint32 locals = 0x7e039;
+ SimulatedBlock* main = globalData.getBlock(block);
+ assert(main != 0);
+ SimulatedBlock* b = main->getInstance(instance);
+ assert(b != 0);
+
+ assert(thr_no < num_threads);
+ struct thr_repository* rep = &g_thr_repository;
+ thr_data* thr_ptr = rep->m_thread + thr_no;
+
+ b->assignToThread(thr_no, &thr_ptr->m_jam,
&thr_ptr->m_watchdog_counter);
+
+ thr_map_entry& entry = thr_map[index][instance];
+ assert(entry.thr_no == thr_map_entry::NULL_THR_NO);
+ entry.thr_no = thr_no;
+ entry.block = b;
+}
+
+// static assignment of main instances before first signal
+static void
+add_main_thr_map()
+{
+ static int done = 0;
+ if (done++)
+ return;
+
+ const Uint32 thr_GLOBAL = 0;
+ const Uint32 thr_LOCAL = 1;
+ const Uint32 thr_RECEIVER = receiver_thread_no;
+
+ add_thr_map(BACKUP, 0, thr_LOCAL);
+ add_thr_map(DBTC, 0, thr_GLOBAL);
+ add_thr_map(DBDIH, 0, thr_GLOBAL);
+ add_thr_map(DBLQH, 0, thr_LOCAL);
+ add_thr_map(DBACC, 0, thr_LOCAL);
+ add_thr_map(DBTUP, 0, thr_LOCAL);
+ add_thr_map(DBDICT, 0, thr_GLOBAL);
+ add_thr_map(NDBCNTR, 0, thr_GLOBAL);
+ add_thr_map(QMGR, 0, thr_GLOBAL);
+ add_thr_map(NDBFS, 0, thr_GLOBAL);
+ add_thr_map(CMVMI, 0, thr_RECEIVER);
+ add_thr_map(TRIX, 0, thr_GLOBAL);
+ add_thr_map(DBUTIL, 0, thr_GLOBAL);
+ add_thr_map(SUMA, 0, thr_LOCAL);
+ add_thr_map(DBTUX, 0, thr_LOCAL);
+ add_thr_map(TSMAN, 0, thr_LOCAL);
+ add_thr_map(LGMAN, 0, thr_LOCAL);
+ add_thr_map(PGMAN, 0, thr_LOCAL);
+ add_thr_map(RESTORE, 0, thr_LOCAL);
+}
+
+// workers added by LocalProxy
+void
+add_worker_thr_map(Uint32 block, Uint32 instance)
+{
+ assert(instance != 0);
+ Uint32 i = instance - 1;
+ Uint32 thr_no = NUM_MAIN_THREADS + i % ndbmt_threads;
+ add_thr_map(block, instance, thr_no);
+}
+
+static inline Uint32
+block2ThreadId(Uint32 block, Uint32 instance)
+{
assert(block >= MIN_BLOCK_NO && block <= MAX_BLOCK_NO);
- return (locals >> (block - MIN_BLOCK_NO)) & 1;
+ Uint32 index = block - MIN_BLOCK_NO;
+ assert(instance < MAX_BLOCK_INSTANCES);
+ const thr_map_entry& entry = thr_map[index][instance];
+ assert(entry.thr_no < num_threads);
+ return entry.thr_no;
}
static void reportSignalStats(Uint32 self, Uint32 a_count, Uint32 a_size,
@@ -1901,7 +1964,7 @@ static void reportSignalStats(Uint32 sel
s->theData[4] = b_count;
s->theData[5] = b_size;
/* ToDo: need this really be prio A like in old code? */
- sendlocal(self, s->header.theReceiversBlockNumber, &s->header, s->theData,
+ sendlocal(self, &s->header, s->theData,
NULL);
}
@@ -1923,18 +1986,16 @@ update_sched_stats(thr_data *selfptr)
}
static void
-init_thread(thr_data *selfptr, EmulatedJamBuffer *jam, Uint32 *watchDogCounter)
+init_thread(thr_data *selfptr)
{
selfptr->m_waiter.init();
- jam->theEmulatedJamIndex = 0;
- jam->theEmulatedJamBlockNumber = 0;
- memset(jam->theEmulatedJam, 0, sizeof(jam->theEmulatedJam));
- NdbThread_SetTlsKey(NDB_THREAD_TLS_JAM, jam);
- selfptr->m_jam = jam;
+ selfptr->m_jam.theEmulatedJamIndex = 0;
+ selfptr->m_jam.theEmulatedJamBlockNumber = 0;
+ memset(selfptr->m_jam.theEmulatedJam, 0, sizeof(selfptr->m_jam.theEmulatedJam));
+ NdbThread_SetTlsKey(NDB_THREAD_TLS_JAM, &selfptr->m_jam);
- selfptr->m_watchdog_counter = watchDogCounter;
unsigned thr_no = selfptr->m_thr_no;
- globalEmulatorData.theWatchDog->registerWatchedThread(watchDogCounter,
+
globalEmulatorData.theWatchDog->registerWatchedThread(&selfptr->m_watchdog_counter,
thr_no);
NdbThread_SetTlsKey(NDB_THREAD_TLS_THREAD, selfptr);
@@ -1954,22 +2015,6 @@ init_thread(thr_data *selfptr, EmulatedJ
#endif
}
-/* Assign to this thread all blocks that will run in this thread. */
-static void
-grab_threads(thr_data *selfptr, EmulatedJamBuffer *thread_jam,
- Uint32 *watchDogCounter)
-{
- for (Uint32 i = 0; i < NO_OF_BLOCKS; i++)
- {
- if (block2ThreadId(i + MIN_BLOCK_NO) == selfptr->m_thr_no)
- {
- require(selfptr->m_blocks[i] != 0);
- selfptr->m_blocks[i]->assignToThread(selfptr->m_thr_no, thread_jam,
- watchDogCounter);
- }
- }
-}
-
/**
* Align signal buffer for better cache performance.
* Also skew it a litte for each thread to avoid cache pollution.
@@ -2011,13 +2056,12 @@ mt_receiver_thread_main(void *thr_arg)
struct thr_data* selfptr = (struct thr_data *)thr_arg;
unsigned thr_no = selfptr->m_thr_no;
EmulatedJamBuffer thread_jam;
- Uint32 watchDogCounter;
+ Uint32& watchDogCounter = selfptr->m_watchdog_counter;
Uint32 thrSignalId = 0;
- init_thread(selfptr, &thread_jam, &watchDogCounter);
+ init_thread(selfptr);
receiverThreadId = thr_no;
signal = aligned_signal(signal_buf, thr_no);
- grab_threads(selfptr, &thread_jam, &watchDogCounter);
while (globalData.theRestartFlag != perform_stop)
{
@@ -2075,19 +2119,17 @@ mt_job_thread_main(void *thr_arg)
struct timespec nowait;
nowait.tv_sec = 0;
nowait.tv_nsec = 10 * 1000000;
- EmulatedJamBuffer thread_jam;
- Uint32 watchDogCounter;
Uint32 thrSignalId = 0;
struct thr_repository* rep = &g_thr_repository;
struct thr_data* selfptr = (struct thr_data *)thr_arg;
- init_thread(selfptr, &thread_jam, &watchDogCounter);
+ init_thread(selfptr);
+ EmulatedJamBuffer& thread_jam = selfptr->m_jam;
+ Uint32& watchDogCounter = selfptr->m_watchdog_counter;
unsigned thr_no = selfptr->m_thr_no;
signal = aligned_signal(signal_buf, thr_no);
- grab_threads(selfptr, &thread_jam, &watchDogCounter);
-
/* Avoid false watchdog alarms caused by race condition. */
watchDogCounter = 1;
@@ -2103,6 +2145,7 @@ mt_job_thread_main(void *thr_arg)
&watchDogCounter, &thrSignalId);
watchDogCounter = 1;
+ signal->header.m_noOfSections = 0; /* valgrind */
sendpacked(selfptr, signal, thr_no);
if (sum)
@@ -2140,16 +2183,23 @@ mt_job_thread_main(void *thr_arg)
}
void
-sendlocal(Uint32 self, Uint32 block, const SignalHeader *s, const Uint32 *data,
+sendlocal(Uint32 self, const SignalHeader *s, const Uint32 *data,
const Uint32 secPtr[3])
{
+ Uint32 block = blockToMain(s->theReceiversBlockNumber);
+ Uint32 instance = blockToInstance(s->theReceiversBlockNumber);
+
+ // map on receiver side
+ if (instance != 0)
+ instance = 1 + (instance - 1) % ndbmt_workers;
+
/*
* Max number of signals to put into job buffer before flushing the buffer
* to the other thread.
* This parameter found to be reasonable by benchmarking.
*/
- Uint32 MAX_SIGNALS_BEFORE_FLUSH = (self == RECEIVER_THREAD_NO ? 2 : 20);
- Uint32 dst = block2ThreadId(block);
+ Uint32 MAX_SIGNALS_BEFORE_FLUSH = (self == receiver_thread_no ? 2 : 20);
+ Uint32 dst = block2ThreadId(block, instance);
struct thr_repository* rep = &g_thr_repository;
struct thr_data * selfptr = rep->m_thread + self;
@@ -2164,15 +2214,23 @@ sendlocal(Uint32 self, Uint32 block, con
selfptr->m_next_buffer = seize_buffer(rep, self, false);
}
- if (w->m_pending_signals >= MAX_SIGNALS_BEFORE_FLUSH)
+ // wl4391_todo
+ if (true || w->m_pending_signals >= MAX_SIGNALS_BEFORE_FLUSH)
flush_write_state(dst, q, w);
}
void
-sendprioa(Uint32 self, Uint32 block, const SignalHeader *s, const uint32 *data,
+sendprioa(Uint32 self, const SignalHeader *s, const uint32 *data,
const Uint32 secPtr[3])
{
- Uint32 dst = block2ThreadId(block);
+ Uint32 block = blockToMain(s->theReceiversBlockNumber);
+ Uint32 instance = blockToInstance(s->theReceiversBlockNumber);
+
+ // map on receiver side
+ if (instance != 0)
+ instance = 1 + (instance - 1) % ndbmt_workers;
+
+ Uint32 dst = block2ThreadId(block, instance);
struct thr_repository* rep = &g_thr_repository;
struct thr_data * selfptr = rep->m_thread + self;
struct thr_data *dstptr = rep->m_thread + dst;
@@ -2260,17 +2318,26 @@ sendprioa_STOP_FOR_CRASH(Uint32 dst)
static thr_job_buffer dummy_buffer;
/*
- * Currently we have three threads with fixed block assignment.
+ * Before we had three main threads with fixed block assignment.
+ * Now there is also worker instances (we send to LQH instance).
*/
- assert(dst == 0 || dst == 1 || dst == 2); // ToDo when/if more threads.
- Uint32 bno = 0;
+ Uint32 main = 0;
+ Uint32 instance = 0;
if (dst == 0)
- bno = NDBCNTR;
+ main = NDBCNTR;
else if (dst == 1)
- bno = DBLQH;
- else if (dst == 2)
- bno = CMVMI;
- assert(block2ThreadId(bno) == dst);
+ main = DBLQH;
+ else if (dst >= NUM_MAIN_THREADS && dst < NUM_MAIN_THREADS +
ndbmt_threads)
+ {
+ main = DBLQH;
+ instance = dst - NUM_MAIN_THREADS + 1;
+ }
+ else if (dst == receiver_thread_no)
+ main = CMVMI;
+ else
+ assert(false);
+ Uint32 bno = numberToBlock(main, instance);
+ assert(block2ThreadId(main, instance) == dst);
struct thr_data * dstptr = rep->m_thread + dst;
memset(&signalT.header, 0, sizeof(SignalHeader));
@@ -2472,8 +2539,6 @@ thr_init(struct thr_repository* rep, str
queue_init(&selfptr->m_tq);
- selfptr->m_jam = NULL;
-
selfptr->m_prioa_count = 0;
selfptr->m_prioa_size = 0;
selfptr->m_priob_count = 0;
@@ -2551,7 +2616,15 @@ ThreadConfig::~ThreadConfig()
void
ThreadConfig::init(EmulatorData *emulatorData)
{
- ::rep_init(&g_thr_repository, NUM_THREADS, emulatorData->m_mem_manager);
+ ndbmt_workers = globalData.ndbmtWorkers;
+ ndbmt_threads = globalData.ndbmtThreads;
+ num_threads = NUM_MAIN_THREADS + ndbmt_threads + 1;
+ assert(num_threads <= MAX_THREADS);
+ receiver_thread_no = num_threads - 1;
+
+ ndbout << "NDBMT: num_threads=" << num_threads << endl;
+
+ ::rep_init(&g_thr_repository, num_threads, emulatorData->m_mem_manager);
}
void ThreadConfig::ipControlLoop(Uint32 thread_index)
@@ -2559,25 +2632,19 @@ void ThreadConfig::ipControlLoop(Uint32
unsigned int i;
unsigned int thr_no;
struct thr_repository* rep = &g_thr_repository;
- NdbThread *threads[NUM_THREADS];
+ NdbThread *threads[MAX_THREADS];
+
+ add_main_thr_map();
/*
* Start threads for all execution threads, except for the receiver
* thread, which runs in the main thread.
*/
- for (thr_no = 0; thr_no < NUM_THREADS; thr_no++)
+ for (thr_no = 0; thr_no < num_threads; thr_no++)
{
- for (i = 0; i<NO_OF_BLOCKS; i++)
- {
- if (block2ThreadId(i + MIN_BLOCK_NO) == thr_no)
- {
- rep->m_thread[thr_no].m_blocks[i] =
- globalData.getBlock(i + MIN_BLOCK_NO);
- }
- }
rep->m_thread[thr_no].m_time = NdbTick_CurrentMillisecond();
- if (thr_no == RECEIVER_THREAD_NO)
+ if (thr_no == receiver_thread_no)
continue; // Will run in the main thread.
/*
* The NdbThread_Create() takes void **, but that is cast to void * when
@@ -2592,12 +2659,12 @@ void ThreadConfig::ipControlLoop(Uint32
}
/* Now run the main loop for thread 0 directly. */
- mt_receiver_thread_main(&(rep->m_thread[RECEIVER_THREAD_NO]));
+ mt_receiver_thread_main(&(rep->m_thread[receiver_thread_no]));
/* Wait for all threads to shutdown. */
- for (thr_no = 0; thr_no < NUM_THREADS; thr_no++)
+ for (thr_no = 0; thr_no < num_threads; thr_no++)
{
- if (thr_no == RECEIVER_THREAD_NO)
+ if (thr_no == receiver_thread_no)
continue;
void *dummy_return_status;
NdbThread_WaitFor(threads[thr_no], &dummy_return_status);
@@ -2608,6 +2675,8 @@ void ThreadConfig::ipControlLoop(Uint32
int
ThreadConfig::doStart(NodeState::StartLevel startLevel)
{
+ add_main_thr_map();
+
SignalT<3> signalT;
memset(&signalT.header, 0, sizeof(SignalHeader));
@@ -2621,7 +2690,7 @@ ThreadConfig::doStart(NodeState::StartLe
StartOrd * const startOrd = (StartOrd *)&signalT.theData[0];
startOrd->restartInfo = 0;
- senddelay(block2ThreadId(CMVMI), &signalT.header, 1);
+ senddelay(block2ThreadId(CMVMI, 0), &signalT.header, 1);
return 0;
}
@@ -2656,7 +2725,7 @@ Uint32
FastScheduler::traceDumpGetNumThreads()
{
/* The last thread is only for receiver -> no trace file. */
- return NUM_THREADS;
+ return num_threads;
}
bool
@@ -2664,7 +2733,7 @@ FastScheduler::traceDumpGetJam(Uint32 th
const Uint32 * & thrdTheEmulatedJam,
Uint32 & thrdTheEmulatedJamIndex)
{
- if (thr_no >= NUM_THREADS)
+ if (thr_no >= num_threads)
return false;
#ifdef NO_EMULATED_JAM
@@ -2672,7 +2741,7 @@ FastScheduler::traceDumpGetJam(Uint32 th
thrdTheEmulatedJam = NULL;
thrdTheEmulatedJamIndex = 0;
#else
- const EmulatedJamBuffer *jamBuffer = g_thr_repository.m_thread[thr_no].m_jam;
+ const EmulatedJamBuffer *jamBuffer = &g_thr_repository.m_thread[thr_no].m_jam;
thrdTheEmulatedJam = jamBuffer->theEmulatedJam;
thrdTheEmulatedJamIndex = jamBuffer->theEmulatedJamIndex;
jamBlockNumber = jamBuffer->theEmulatedJamBlockNumber;
@@ -2708,7 +2777,7 @@ FastScheduler::traceDumpPrepare()
pthread_mutex_lock(&g_thr_repository.stop_for_crash_mutex);
g_thr_repository.stopped_threads = 0;
- for (Uint32 thr_no = 0; thr_no < NUM_THREADS; thr_no++)
+ for (Uint32 thr_no = 0; thr_no < num_threads; thr_no++)
{
if (selfptr != NULL && selfptr->m_thr_no == thr_no)
{
@@ -2766,7 +2835,7 @@ void
FastScheduler::dumpSignalMemory(Uint32 thr_no, FILE* out)
{
void *value= NdbThread_GetTlsKey(NDB_THREAD_TLS_THREAD);
- const thr_data *selfptr = reinterpret_cast<const thr_data *>(value);
+ thr_data *selfptr = reinterpret_cast<thr_data *>(value);
const thr_repository *rep = &g_thr_repository;
/*
* The selfptr might be NULL, or pointer to thread that is doing the crash
@@ -2775,7 +2844,7 @@ FastScheduler::dumpSignalMemory(Uint32 t
*/
Uint32 *watchDogCounter;
if (selfptr)
- watchDogCounter = selfptr->m_watchdog_counter;
+ watchDogCounter = &selfptr->m_watchdog_counter;
else
watchDogCounter = NULL;
=== modified file 'storage/ndb/src/kernel/vm/mt.hpp'
--- a/storage/ndb/src/kernel/vm/mt.hpp 2008-05-29 15:06:11 +0000
+++ b/storage/ndb/src/kernel/vm/mt.hpp 2008-07-25 05:48:32 +0000
@@ -14,9 +14,13 @@
*/
extern Uint32 receiverThreadId;
-void sendlocal(Uint32 self, Uint32 block, const struct SignalHeader *s,
+/* Assign block instance to thread */
+void add_thr_map(Uint32 block, Uint32 instance, Uint32 thr_no);
+void add_worker_thr_map(Uint32 block, Uint32 instance);
+
+void sendlocal(Uint32 self, const struct SignalHeader *s,
const Uint32 *data, const Uint32 secPtr[3]);
-void sendprioa(Uint32 self, Uint32 block, const struct SignalHeader *s,
+void sendprioa(Uint32 self, const struct SignalHeader *s,
const Uint32 *data, const Uint32 secPtr[3]);
void senddelay(Uint32 thr_no, const struct SignalHeader*, Uint32 delay);
void mt_execSTOP_FOR_CRASH();
=== modified file 'storage/ndb/src/mgmapi/mgmapi.cpp'
--- a/storage/ndb/src/mgmapi/mgmapi.cpp 2008-03-13 14:09:32 +0000
+++ b/storage/ndb/src/mgmapi/mgmapi.cpp 2008-06-09 18:30:58 +0000
@@ -218,19 +218,20 @@ ndb_mgm_set_name(NdbMgmHandle handle, co
extern "C"
int
-ndb_mgm_set_connectstring(NdbMgmHandle handle, const char * mgmsrv)
+ndb_mgm_set_connectstring(NdbMgmHandle handle, const char* connect_string)
{
DBUG_ENTER("ndb_mgm_set_connectstring");
DBUG_PRINT("info", ("handle: 0x%lx", (long) handle));
handle->cfg.~LocalConfig();
new (&(handle->cfg)) LocalConfig;
- if (!handle->cfg.init(mgmsrv, 0) ||
+ if (!handle->cfg.init(connect_string, 0) ||
handle->cfg.ids.size() == 0)
{
handle->cfg.~LocalConfig();
new (&(handle->cfg)) LocalConfig;
handle->cfg.init(0, 0); /* reset the LocalConfig */
- SET_ERROR(handle, NDB_MGM_ILLEGAL_CONNECT_STRING, mgmsrv ? mgmsrv : "");
+ SET_ERROR(handle, NDB_MGM_ILLEGAL_CONNECT_STRING,
+ connect_string ? connect_string : "");
DBUG_RETURN(-1);
}
handle->cfg_i= -1;
=== modified file 'storage/ndb/src/mgmclient/CommandInterpreter.cpp'
--- a/storage/ndb/src/mgmclient/CommandInterpreter.cpp 2008-03-14 13:34:05 +0000
+++ b/storage/ndb/src/mgmclient/CommandInterpreter.cpp 2008-06-10 20:06:47 +0000
@@ -229,7 +229,6 @@ extern "C" {
#include <NdbMem.h>
#include <EventLogger.hpp>
#include <signaldata/SetLogLevelOrd.hpp>
-#include "MgmtErrorReporter.hpp"
#include <Parser.hpp>
#include <SocketServer.hpp>
#include <util/InputStream.hpp>
=== modified file 'storage/ndb/src/mgmsrv/Config.cpp'
--- a/storage/ndb/src/mgmsrv/Config.cpp 2008-06-09 10:52:24 +0000
+++ b/storage/ndb/src/mgmsrv/Config.cpp 2008-07-29 14:04:27 +0000
@@ -14,11 +14,11 @@
Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA */
#include "Config.hpp"
-#include "MgmtErrorReporter.hpp"
-//*****************************************************************************
-// Ctor / Dtor
-//*****************************************************************************
+#include <mgmapi_config_parameters.h>
+#include <NdbOut.hpp>
+#include "ConfigInfo.hpp"
+
Config::Config(struct ndb_mgm_configuration *config_values) :
m_configValues(config_values)
@@ -32,144 +32,52 @@ Config::~Config() {
}
}
-/*****************************************************************************/
-
-void
-Config::printAllNameValuePairs(NdbOut &out,
- const Properties *prop,
- const char* s) const {
- Properties::Iterator it(prop);
- const Properties * section = m_info.getInfo(s);
- for (const char* n = it.first(); n != NULL; n = it.next()) {
- Uint32 int_value;
- const char* str_value;
- Uint64 int_64;
+unsigned sections[]=
+{
+ CFG_SECTION_SYSTEM,
+ CFG_SECTION_NODE,
+ CFG_SECTION_CONNECTION
+};
+const size_t num_sections= sizeof(sections)/sizeof(unsigned);
+
+static const ConfigInfo g_info;
+
+void
+Config::print() const {
+
+ for(unsigned i= 0; i < num_sections; i++) {
+ unsigned section= sections[i];
+ ndb_mgm_configuration_iterator it(*m_configValues, section);
- if(!section->contains(n))
- continue;
- if (m_info.getStatus(section, n) == ConfigInfo::CI_INTERNAL)
- continue;
- if (m_info.getStatus(section, n) == ConfigInfo::CI_DEPRICATED)
- continue;
- if (m_info.getStatus(section, n) == ConfigInfo::CI_NOTIMPLEMENTED)
+ if (it.first())
continue;
- out << n << ": ";
+ for(;it.valid();it.next()) {
- switch (m_info.getType(section, n)) {
- case ConfigInfo::CI_INT:
- MGM_REQUIRE(prop->get(n, &int_value));
- out << int_value;
- break;
-
- case ConfigInfo::CI_INT64:
- MGM_REQUIRE(prop->get(n, &int_64));
- out << int_64;
- break;
-
- case ConfigInfo::CI_BOOL:
- MGM_REQUIRE(prop->get(n, &int_value));
- if (int_value) {
- out << "Y";
- } else {
- out << "N";
+ Uint32 section_type;
+ if(it.get(CFG_TYPE_OF_SECTION, §ion_type) != 0)
+ continue;
+
+ const ConfigInfo::ParamInfo* pinfo= NULL;
+ ConfigInfo::ParamInfoIter param_iter(g_info,
+ section,
+ section_type);
+
+ ndbout_c("[%s]", g_info.sectionName(section, section_type));
+
+ /* Loop through the section and print those values that exist */
+ Uint32 val;
+ Uint64 val64;
+ const char* val_str;
+ while((pinfo= param_iter.next())){
+
+ if (!it.get(pinfo->_paramId, &val))
+ ndbout_c("%s=%u", pinfo->_fname, val);
+ else if (!it.get(pinfo->_paramId, &val64))
+ ndbout_c("%s=%llu", pinfo->_fname, val64);
+ else if (!it.get(pinfo->_paramId, &val_str))
+ ndbout_c("%s=%s", pinfo->_fname, val_str);
}
- break;
- case ConfigInfo::CI_STRING:
- MGM_REQUIRE(prop->get(n, &str_value));
- out << str_value;
- break;
- case ConfigInfo::CI_SECTION:
- out << "SECTION";
- break;
- }
- out << endl;
- }
-}
-
-/*****************************************************************************/
-
-void Config::printConfigFile(NdbOut &out) const {
-#if 0
- Uint32 noOfNodes, noOfConnections, noOfComputers;
- MGM_REQUIRE(get("NoOfNodes", &noOfNodes));
- MGM_REQUIRE(get("NoOfConnections", &noOfConnections));
- MGM_REQUIRE(get("NoOfComputers", &noOfComputers));
-
- out <<
- "######################################################################" <<
- endl <<
- "#" << endl <<
- "# NDB Cluster System configuration" << endl <<
- "#" << endl <<
- "######################################################################" <<
- endl <<
- "# No of nodes (DB, API or MGM): " << noOfNodes << endl <<
- "# No of connections: " << noOfConnections << endl <<
- "######################################################################" <<
- endl;
-
- /**************************
- * Print COMPUTER configs *
- **************************/
- const char * name;
- Properties::Iterator it(this);
- for(name = it.first(); name != NULL; name = it.next()){
- if(strncasecmp("Computer_", name, 9) == 0){
-
- const Properties *prop;
- out << endl << "[COMPUTER]" << endl;
- MGM_REQUIRE(get(name, &prop));
- printAllNameValuePairs(out, prop, "COMPUTER");
-
- out << endl <<
- "###################################################################" <<
- endl;
-
- } else if(strncasecmp("Node_", name, 5) == 0){
- /**********************
- * Print NODE configs *
- **********************/
- const Properties *prop;
- const char *s;
-
- MGM_REQUIRE(get(name, &prop));
- MGM_REQUIRE(prop->get("Type", &s));
- out << endl << "[" << s << "]" << endl;
- printAllNameValuePairs(out, prop, s);
-
- out << endl <<
- "###################################################################" <<
- endl;
- } else if(strncasecmp("Connection_", name, 11) == 0){
- /****************************
- * Print CONNECTION configs *
- ****************************/
- const Properties *prop;
- const char *s;
-
- MGM_REQUIRE(get(name, &prop));
- MGM_REQUIRE(prop->get("Type", &s));
- out << endl << "[" << s << "]" << endl;
- printAllNameValuePairs(out, prop, s);
-
- out << endl <<
- "###################################################################" <<
- endl;
- } else if(strncasecmp("SYSTEM", name, strlen("SYSTEM")) == 0) {
- /************************
- * Print SYSTEM configs *
- ************************/
- const Properties *prop;
-
- MGM_REQUIRE(get(name, &prop));
- out << endl << "[SYSTEM]" << endl;
- printAllNameValuePairs(out, prop, "SYSTEM");
-
- out << endl <<
- "###################################################################" <<
- endl;
}
}
-#endif
}
=== modified file 'storage/ndb/src/mgmsrv/Config.hpp'
--- a/storage/ndb/src/mgmsrv/Config.hpp 2008-06-09 10:52:24 +0000
+++ b/storage/ndb/src/mgmsrv/Config.hpp 2008-07-29 13:38:18 +0000
@@ -22,16 +22,10 @@
/**
* @class Config
- * @brief Cluster Configuration (corresponds to initial configuration file)
+ * @brief Cluster Configuration Wrapper
*
- * Contains all cluster configuration parameters.
- *
- * The information includes all configurable parameters for a NDB cluster:
- * - DB, API and MGM nodes with all their properties,
- * - Connections between nodes and computers the nodes will execute on.
- *
- * The following categories (sections) of configuration parameters exists:
- * - COMPUTER, DB, MGM, API, TCP, SCI, SHM
+ * Adds a C++ wrapper around 'ndb_mgm_configuration' which is
+ * exposed from mgmapi_configuration
*
*/
@@ -40,31 +34,15 @@ public:
Config(struct ndb_mgm_configuration *config_values = NULL);
virtual ~Config();
- /**
- * Prints the configuration in configfile format
- */
- void printConfigFile(NdbOut &out = ndbout) const;
- void printConfigFile(OutputStream &out) const {
- NdbOut ndb(out);
- printConfigFile(ndb);
- }
-
- /**
- * Info
- */
- const ConfigInfo * getConfigInfo() const { return &m_info;}
-private:
- ConfigInfo m_info;
-
- void printAllNameValuePairs(NdbOut &out,
- const Properties *prop,
- const char* section) const;
-
- /**
- * Information about parameters (min, max values etc)
- */
-public:
+ void print() const;
+
struct ndb_mgm_configuration * m_configValues;
};
+class ConfigIter : public ndb_mgm_configuration_iterator {
+public:
+ ConfigIter(const Config* conf, unsigned type) :
+ ndb_mgm_configuration_iterator(*conf->m_configValues, type) {};
+};
+
#endif // Config_H
=== modified file 'storage/ndb/src/mgmsrv/ConfigInfo.cpp'
--- a/storage/ndb/src/mgmsrv/ConfigInfo.cpp 2008-05-29 15:06:11 +0000
+++ b/storage/ndb/src/mgmsrv/ConfigInfo.cpp 2008-07-28 09:43:13 +0000
@@ -2832,6 +2832,57 @@ ConfigInfo::getAlias(const char * sectio
return 0;
}
+
+const char*
+ConfigInfo::sectionName(Uint32 section_type, Uint32 type) const {
+
+ switch (section_type){
+ case CFG_SECTION_SYSTEM:
+ return "SYSTEM";
+ break;
+
+ case CFG_SECTION_NODE:
+ switch(type){
+ case NODE_TYPE_DB:
+ return DB_TOKEN_PRINT;
+ break;
+ case NODE_TYPE_MGM:
+ return MGM_TOKEN_PRINT;
+ break;
+ case NODE_TYPE_API:
+ return API_TOKEN_PRINT;
+ break;
+ default:
+ assert(false);
+ break;
+ }
+ break;
+
+ case CFG_SECTION_CONNECTION:
+ switch(type){
+ case CONNECTION_TYPE_TCP:
+ return "TCP";
+ break;
+ case CONNECTION_TYPE_SHM:
+ return "SHM";
+ break;
+ case CONNECTION_TYPE_SCI:
+ return "SCI";
+ break;
+ default:
+ assert(false);
+ break;
+ }
+ break;
+
+ default:
+ assert(false);
+ break;
+ }
+
+ return "<unknown section>";
+}
+
bool
ConfigInfo::verify(const Properties * section, const char* fname,
Uint64 value) const {
@@ -3046,7 +3097,7 @@ fixNodeHostname(InitConfigFileParser::Co
}
if(!computer->get("HostName", &hostname)){
- ctx.reportError("HostName missing in [COMPUTER] (Id: %d) "
+ ctx.reportError("HostName missing in [COMPUTER] (Id: %s) "
" - [%s] starting at line: %d",
compId, ctx.fname, ctx.m_sectionLineno);
DBUG_RETURN(false);
@@ -3819,8 +3870,6 @@ fixDepricated(InitConfigFileParser::Cont
return true;
}
-extern int g_print_full_config;
-
static bool
saveInConfigValues(InitConfigFileParser::Context & ctx, const char * data){
const Properties * sec;
@@ -3841,12 +3890,6 @@ saveInConfigValues(InitConfigFileParser:
ndbout_c("skipping section %s", ctx.fname);
break;
}
-
- if (g_print_full_config)
- {
- const char *alias= ConfigInfo::nameToAlias(ctx.fname);
- printf("[%s]\n", alias ? alias : ctx.fname);
- }
Uint32 no = 0;
ctx.m_userProperties.get("$Section", id, &no);
@@ -3875,24 +3918,18 @@ saveInConfigValues(InitConfigFileParser:
Uint32 val;
require(ctx.m_currentSection->get(n, &val));
ok = ctx.m_configValues.put(id, val);
- if (g_print_full_config)
- printf("%s=%u\n", n, val);
break;
}
case PropertiesType_Uint64:{
Uint64 val;
require(ctx.m_currentSection->get(n, &val));
ok = ctx.m_configValues.put64(id, val);
- if (g_print_full_config)
- printf("%s=%llu\n", n, val);
break;
}
case PropertiesType_char:{
const char * val;
require(ctx.m_currentSection->get(n, &val));
ok = ctx.m_configValues.put(id, val);
- if (g_print_full_config)
- printf("%s=%s\n", n, val);
break;
}
default:
@@ -4270,5 +4307,43 @@ check_node_vs_replicas(Vector<ConfigInfo
return true;
}
+
+ConfigInfo::ParamInfoIter::ParamInfoIter(const ConfigInfo& info,
+ Uint32 section,
+ Uint32 section_type) :
+ m_info(info),
+ m_curr_param(0)
+{
+ /* Find the section's name */
+ for (int j=0; j<info.m_NoOfParams; j++) {
+ const ConfigInfo::ParamInfo & param = info.m_ParamInfo[j];
+ if (param._type == ConfigInfo::CI_SECTION &&
+ param._paramId == section &&
+ (section_type == (Uint32)~0 || param._section_type == section_type))
+ {
+ m_section_name= param._section;
+ break;
+ }
+ }
+ assert(m_section_name);
+}
+
+
+const ConfigInfo::ParamInfo*
+ConfigInfo::ParamInfoIter::next(void) {
+ assert(m_curr_param < m_info.m_NoOfParams);
+ do {
+ /* Loop through the parameter and return a pointer to the next found */
+ const ConfigInfo::ParamInfo* param = &m_info.m_ParamInfo[m_curr_param++];
+ if (strcmp(param->_section, m_section_name) == 0 &&
+ param->_type != ConfigInfo::CI_SECTION)
+ return param;
+ }
+ while (m_curr_param<m_info.m_NoOfParams);
+
+ return NULL;
+}
+
+
template class Vector<ConfigInfo::ConfigRuleSection>;
#endif /* NDB_MGMAPI */
=== modified file 'storage/ndb/src/mgmsrv/ConfigInfo.hpp'
--- a/storage/ndb/src/mgmsrv/ConfigInfo.hpp 2008-05-29 15:06:11 +0000
+++ b/storage/ndb/src/mgmsrv/ConfigInfo.hpp 2008-07-25 10:02:59 +0000
@@ -81,11 +81,26 @@ public:
* For section entries, instead the _default member gives the internal id
* of that kind of section (CONNECTION_TYPE_TCP, NODE_TYPE_MGM, etc.)
*/
- const char* _default;
+ union {
+ const char* _default;
+ Uint32 _section_type; // if _type = CI_SECTION
+ };
const char* _min;
const char* _max;
};
+ class ParamInfoIter {
+ const ConfigInfo& m_info;
+ const char* m_section_name;
+ int m_curr_param;
+ public:
+ ParamInfoIter(const ConfigInfo& info,
+ Uint32 section,
+ Uint32 section_type = ~0);
+
+ const ParamInfo* next(void);
+ };
+
#ifndef NDB_MGMAPI
struct AliasPair{
const char * name;
@@ -149,6 +164,8 @@ public:
void print(const char* section) const;
void print(const Properties * section, const char* parameter) const;
+ const char* sectionName(Uint32 section, Uint32 type) const;
+
private:
Properties m_info;
Properties m_systemDefaults;
=== modified file 'storage/ndb/src/mgmsrv/InitConfigFileParser.cpp'
--- a/storage/ndb/src/mgmsrv/InitConfigFileParser.cpp 2008-06-09 10:52:24 +0000
+++ b/storage/ndb/src/mgmsrv/InitConfigFileParser.cpp 2008-07-25 16:03:09 +0000
@@ -17,7 +17,6 @@
#include "InitConfigFileParser.hpp"
#include "Config.hpp"
-#include "MgmtErrorReporter.hpp"
#include <NdbOut.hpp>
#include "ConfigInfo.hpp"
#include <m_string.h>
@@ -319,7 +318,7 @@ InitConfigFileParser::storeNameValuePair
ctx.reportError("Illegal boolean value for parameter %s", fname);
return false;
}
- MGM_REQUIRE(ctx.m_currentSection->put(pname, value_bool));
+ require(ctx.m_currentSection->put(pname, value_bool));
break;
}
case ConfigInfo::CI_INT:
@@ -337,14 +336,14 @@ InitConfigFileParser::storeNameValuePair
return false;
}
if(type == ConfigInfo::CI_INT){
- MGM_REQUIRE(ctx.m_currentSection->put(pname, (Uint32)value_int));
+ require(ctx.m_currentSection->put(pname, (Uint32)value_int));
} else {
- MGM_REQUIRE(ctx.m_currentSection->put64(pname, value_int));
+ require(ctx.m_currentSection->put64(pname, value_int));
}
break;
}
case ConfigInfo::CI_STRING:
- MGM_REQUIRE(ctx.m_currentSection->put(pname, value));
+ require(ctx.m_currentSection->put(pname, value));
break;
case ConfigInfo::CI_SECTION:
abort();
=== modified file 'storage/ndb/src/mgmsrv/InitConfigFileParser.hpp'
--- a/storage/ndb/src/mgmsrv/InitConfigFileParser.hpp 2006-12-23 19:20:40 +0000
+++ b/storage/ndb/src/mgmsrv/InitConfigFileParser.hpp 2008-06-10 09:21:15 +0000
@@ -84,8 +84,10 @@ public:
public:
FILE * m_errstream;
- void reportError(const char * msg, ...);
- void reportWarning(const char * msg, ...);
+ void reportError(const char * msg, ...)
+ ATTRIBUTE_FORMAT(printf, 2, 3);
+ void reportWarning(const char * msg, ...)
+ ATTRIBUTE_FORMAT(printf, 2, 3);
};
static bool convertStringToUint64(const char* s, Uint64& val, Uint32 log10base =
0);
=== modified file 'storage/ndb/src/mgmsrv/MgmtSrvr.cpp'
--- a/storage/ndb/src/mgmsrv/MgmtSrvr.cpp 2008-06-09 10:52:24 +0000
+++ b/storage/ndb/src/mgmsrv/MgmtSrvr.cpp 2008-07-29 14:20:49 +0000
@@ -17,9 +17,8 @@
#include <my_pthread.h>
#include "MgmtSrvr.hpp"
-#include "MgmtErrorReporter.hpp"
#include "ndb_mgmd_error.h"
-#include <ConfigRetriever.hpp>
+#include "Services.hpp"
#include <NdbOut.hpp>
#include <NdbApiSignal.hpp>
@@ -57,13 +56,6 @@
#include <SignalSender.hpp>
-//#define MGM_SRV_DEBUG
-#ifdef MGM_SRV_DEBUG
-#define DEBUG(x) do ndbout << x << endl; while(0)
-#else
-#define DEBUG(x)
-#endif
-
int g_errorInsert;
#define ERROR_INSERTED(x) (g_errorInsert == x)
@@ -77,7 +69,6 @@ int g_errorInsert;
}\
}
-extern int g_no_nodeid_checks;
extern my_bool opt_core;
static void require(bool v)
@@ -215,56 +206,6 @@ MgmtSrvr::logLevelThreadRun()
}
}
-void
-MgmtSrvr::startEventLog()
-{
- NdbMutex_Lock(m_configMutex);
-
- g_eventLogger->setCategory("MgmSrvr");
-
- ndb_mgm_configuration_iterator
- iter(* _config->m_configValues, CFG_SECTION_NODE);
-
- if(iter.find(CFG_NODE_ID, _ownNodeId) != 0){
- NdbMutex_Unlock(m_configMutex);
- return;
- }
-
- const char * tmp;
- char errStr[100];
- int err= 0;
- BaseString logdest;
- char *clusterLog= NdbConfig_ClusterLogFileName(_ownNodeId);
- NdbAutoPtr<char> tmp_aptr(clusterLog);
-
- if(iter.get(CFG_LOG_DESTINATION, &tmp) == 0){
- logdest.assign(tmp);
- }
- NdbMutex_Unlock(m_configMutex);
-
- if(logdest.length() == 0 || logdest == "") {
- logdest.assfmt("FILE:filename=%s,maxsize=1000000,maxfiles=6",
- clusterLog);
- }
- errStr[0]='\0';
- if(!g_eventLogger->addHandler(logdest, &err, sizeof(errStr), errStr)) {
- ndbout << "Warning: could not add log destination \""
- << logdest.c_str() << "\". Reason: ";
- if(err)
- ndbout << strerror(err);
- if(err && errStr[0]!='\0')
- ndbout << ", ";
- if(errStr[0]!='\0')
- ndbout << errStr;
- ndbout << endl;
- }
-}
-
-void
-MgmtSrvr::stopEventLog()
-{
- g_eventLogger->close();
-}
bool
MgmtSrvr::setEventLogFilter(int severity, int enable)
@@ -308,279 +249,304 @@ int MgmtSrvr::translateStopRef(Uint32 er
}
-int
-MgmtSrvr::getPort() const
+MgmtSrvr::MgmtSrvr(const MgmtOpts& opts,
+ const char* connect_str) :
+ m_opts(opts),
+ _blockNumber(-1),
+ _ownNodeId(0),
+ m_port(0),
+ m_config_retriever(connect_str,
+ NDB_VERSION,
+ NDB_MGM_NODE_TYPE_MGM),
+ _ownReference(0),
+ _config(NULL),
+ theFacade(NULL),
+ _isStopThread(false),
+ _logLevelThreadSleep(500),
+ m_event_listner(this),
+ m_master_node(0),
+ _logLevelThread(NULL)
{
- if(NdbMutex_Lock(m_configMutex))
- return 0;
-
- ndb_mgm_configuration_iterator
- iter(* _config->m_configValues, CFG_SECTION_NODE);
+ DBUG_ENTER("MgmtSrvr::MgmtSrvr");
- if(iter.find(CFG_NODE_ID, getOwnNodeId()) != 0){
- ndbout << "Could not retrieve configuration for Node "
- << getOwnNodeId() << " in config file." << endl
- << "Have you set correct NodeId for this node?" << endl;
- NdbMutex_Unlock(m_configMutex);
- return 0;
+ m_configMutex = NdbMutex_Create();
+ m_node_id_mutex = NdbMutex_Create();
+ if (!m_configMutex || !m_node_id_mutex)
+ {
+ g_eventLogger->error("Failed to create MgmtSrvr mutexes");
+ require(false);
}
- unsigned type;
- if(iter.get(CFG_TYPE_OF_SECTION, &type) != 0 ||
- type != NODE_TYPE_MGM){
- ndbout << "Local node id " << getOwnNodeId()
- << " is not defined as management server" << endl
- << "Have you set correct NodeId for this node?" << endl;
- NdbMutex_Unlock(m_configMutex);
- return 0;
- }
-
- Uint32 port = 0;
- if(iter.get(CFG_MGM_PORT, &port) != 0){
- ndbout << "Could not find PortNumber in the configuration file." << endl;
- NdbMutex_Unlock(m_configMutex);
- return 0;
+ /* Init node arrays */
+ for(Uint32 i = 0; i<MAX_NODES; i++) {
+ nodeTypes[i] = (enum ndb_mgm_node_type)-1;
+ m_connect_address[i].s_addr= 0;
}
- NdbMutex_Unlock(m_configMutex);
+ /* Setup clusterlog as client[0] in m_event_listner */
+ {
+ Ndb_mgmd_event_service::Event_listener se;
+ se.m_socket = NDB_INVALID_SOCKET;
+ for(size_t t = 0; t<LogLevel::LOGLEVEL_CATEGORIES; t++){
+ se.m_logLevel.setLogLevel((LogLevel::EventCategory)t, 7);
+ }
+ se.m_logLevel.setLogLevel(LogLevel::llError, 15);
+ se.m_logLevel.setLogLevel(LogLevel::llConnection, 8);
+ se.m_logLevel.setLogLevel(LogLevel::llBackup, 15);
+ m_event_listner.m_clients.push_back(se);
+ m_event_listner.m_logLevel = se.m_logLevel;
+ }
- return port;
+ DBUG_VOID_RETURN;
}
-MgmtSrvr::MgmtSrvr(SocketServer *socket_server,
- const char *config_filename,
- const char *connect_string) :
- _blockNumber(-1),
- m_socket_server(socket_server),
- _ownReference(0),
- m_local_mgm_handle(0),
- m_event_listner(this),
- m_master_node(0)
-{
-
- DBUG_ENTER("MgmtSrvr::MgmtSrvr");
-
- _ownNodeId= 0;
-
- _config = NULL;
-
- _isStopThread = false;
- _logLevelThread = NULL;
- _logLevelThreadSleep = 500;
-
- theFacade = 0;
- if (config_filename)
- m_configFilename.assign(config_filename);
+bool
+MgmtSrvr::init()
+{
+ DBUG_ENTER("MgmtSrvr::init");
- m_config_retriever= new ConfigRetriever(connect_string,
- NDB_VERSION, NDB_MGM_NODE_TYPE_MGM);
- // if connect_string explicitly given or
- // no config filename is given then
- // first try to allocate nodeid from another management server
- if ((connect_string || config_filename == NULL) &&
- (m_config_retriever->do_connect(0,0,0) == 0))
+ if (m_opts.mycnf || m_opts.config_filename)
{
- int tmp_nodeid= 0;
- tmp_nodeid= m_config_retriever->allocNodeId(0 /*retry*/,0 /*delay*/);
- if (tmp_nodeid == 0)
+ Config* conf= NULL;
+ if (m_opts.mycnf && (conf= load_init_mycnf()) == NULL)
{
- ndbout_c(m_config_retriever->getErrorString());
- require(false);
+ g_eventLogger->error("Could not load config from 'my.cnf'");
+ DBUG_RETURN(false);
}
- // read config from other managent server
- _config= fetchConfig();
- if (_config == 0)
+ else if (m_opts.config_filename && (conf= load_init_config()) == NULL)
{
- ndbout << m_config_retriever->getErrorString() << endl;
- require(false);
+ g_eventLogger->error("Could not load initial config from '%s'",
+ m_opts.config_filename);
+ DBUG_RETURN(false);
+ }
+ setConfig(conf);
+ }
+ else
+ {
+ if (fetch_config())
+ {
+ g_eventLogger->error("Could not fetch config");
+ DBUG_RETURN(false);
}
- _ownNodeId= tmp_nodeid;
+ }
+
+ if (m_opts.print_full_config)
+ {
+ print_config();
+ DBUG_RETURN(false);
}
if (_ownNodeId == 0)
{
- // read config locally
- _config= readConfig();
- if (_config == 0) {
- if (config_filename != NULL)
- ndbout << "Invalid configuration file: " << config_filename <<
endl;
- else
- ndbout << "Invalid configuration file" << endl;
- exit(-1);
+ _ownNodeId= m_config_retriever.get_configuration_nodeid();
+
+ int error_code;
+ BaseString error_string;
+ if (!alloc_node_id(&_ownNodeId, NDB_MGM_NODE_TYPE_MGM,
+ 0, 0, error_code, error_string, 0))
+ {
+ g_eventLogger->error("Unable to obtain requested nodeid, error: '%s'",
+ error_string.c_str());
+ DBUG_RETURN(false);
+ }
+
+ if (!m_config_retriever.verifyConfig(_config->m_configValues,
+ _ownNodeId))
+ {
+ g_eventLogger->error(m_config_retriever.getErrorString());
+ DBUG_RETURN(false);
}
}
- m_configMutex = NdbMutex_Create();
+ setClusterLog();
+
+ DBUG_RETURN(true);
+}
- /**
- * Fill the nodeTypes array
- */
- for(Uint32 i = 0; i<MAX_NODES; i++) {
- nodeTypes[i] = (enum ndb_mgm_node_type)-1;
- m_connect_address[i].s_addr= 0;
- }
+bool
+MgmtSrvr::start_transporter()
+{
+ DBUG_ENTER("MgmtSrvr::start_transporter");
+
+ theFacade= new TransporterFacade(0);
+ if (theFacade == 0)
{
- ndb_mgm_configuration_iterator
- iter(* _config->m_configValues, CFG_SECTION_NODE);
+ g_eventLogger->error("Could not create TransporterFacade.");
+ DBUG_RETURN(false);
+ }
- for(iter.first(); iter.valid(); iter.next()){
- unsigned type, id;
- if(iter.get(CFG_TYPE_OF_SECTION, &type) != 0)
- continue;
-
- if(iter.get(CFG_NODE_ID, &id) != 0)
- continue;
-
- MGM_REQUIRE(id < MAX_NODES);
-
- switch(type){
- case NODE_TYPE_DB:
- nodeTypes[id] = NDB_MGM_NODE_TYPE_NDB;
- break;
- case NODE_TYPE_API:
- nodeTypes[id] = NDB_MGM_NODE_TYPE_API;
- break;
- case NODE_TYPE_MGM:
- nodeTypes[id] = NDB_MGM_NODE_TYPE_MGM;
- break;
- default:
- break;
- }
- }
+ if (theFacade->start_instance(_ownNodeId,
+ _config->m_configValues) < 0)
+ {
+ g_eventLogger->error("Failed to start transporter");
+ delete theFacade;
+ theFacade = 0;
+ DBUG_RETURN(false);
}
- BaseString error_string;
+ assert(_blockNumber == -1); // Blocknumber shouldn't been allocated yet
- if ((m_node_id_mutex = NdbMutex_Create()) == 0)
+ /*
+ Register ourself at TransporterFacade to be able to receive signals
+ and to be notified when a database process has died.
+ */
+ if ((_blockNumber= theFacade->open(this,
+ signalReceivedNotification,
+ nodeStatusNotification)) == -1)
{
- ndbout << "mutex creation failed line = " << __LINE__ << endl;
- require(false);
+ g_eventLogger->error("Failed to open block in TransporterFacade");
+ theFacade->stop_instance();
+ delete theFacade;
+ theFacade = 0;
+ DBUG_RETURN(false);
}
- if (_ownNodeId == 0) // we did not get node id from other server
+ _ownReference = numberToRef(_blockNumber, _ownNodeId);
+
+ /*
+ set api reg req frequency quite high:
+
+ 100 ms interval to make sure we have fairly up-to-date
+ info from the nodes. This to make sure that this info
+ is not dependent on heartbeat settings in the
+ configuration
+ */
+ theFacade->theClusterMgr->set_max_api_reg_req_interval(100);
+
+ DBUG_RETURN(true);
+}
+
+
+bool
+MgmtSrvr::start_mgm_service()
+{
+ DBUG_ENTER("MgmtSrvr::start_mgm_service");
+
+ assert(m_port == 0);
{
- NodeId tmp= m_config_retriever->get_configuration_nodeid();
- int error_code;
+ // Find the portnumber to use for mgm service
+ Guard g(m_configMutex);
+ ConfigIter iter(_config, CFG_SECTION_NODE);
- if (!alloc_node_id(&tmp, NDB_MGM_NODE_TYPE_MGM,
- 0, 0, error_code, error_string)){
- ndbout << "Unable to obtain requested nodeid: "
- << error_string.c_str() << endl;
- require(false);
+ if(iter.find(CFG_NODE_ID, _ownNodeId) != 0){
+ g_eventLogger->error("Could not find node %d in config", _ownNodeId);
+ DBUG_RETURN(false);
+ }
+
+ unsigned type;
+ if(iter.get(CFG_TYPE_OF_SECTION, &type) != 0 ||
+ type != NODE_TYPE_MGM){
+ g_eventLogger->error("Node %d is not defined as management server",
+ _ownNodeId);
+ return 0;
+ DBUG_RETURN(false);
}
- _ownNodeId = tmp;
+
+ if(iter.get(CFG_MGM_PORT, &m_port) != 0){
+ g_eventLogger->error("PortNumber not defined for node %d", _ownNodeId);
+ return 0;
+ DBUG_RETURN(false);
+ }
+ }
+
+ unsigned short port= m_port;
+ DBUG_PRINT("info", ("Using port %d", port));
+ if (port == 0)
+ {
+ g_eventLogger->error("Could not find out which port to use"\
+ " for management service");
+ DBUG_RETURN(false);
}
{
- DBUG_PRINT("info", ("verifyConfig"));
- if (!m_config_retriever->verifyConfig(_config->m_configValues,
- _ownNodeId))
+ int count= 5; // no of retries for tryBind
+ while(!m_socket_server.tryBind(port, m_opts.bind_address))
{
- ndbout << m_config_retriever->getErrorString() << endl;
- require(false);
+ if (--count > 0)
+ {
+ NdbSleep_SecSleep(1);
+ continue;
+ }
+ g_eventLogger->error("Unable to bind management service port: %s:%d!\n"
+ "Please check if the port is already used,\n"
+ "(perhaps a ndb_mgmd is already running),\n"
+ "and if you are executing on the correct computer",
+ (m_opts.bind_address ? m_opts.bind_address : "*"),
+ port);
+ DBUG_RETURN(false);
}
}
- // Setup clusterlog as client[0] in m_event_listner
{
- Ndb_mgmd_event_service::Event_listener se;
- se.m_socket = NDB_INVALID_SOCKET;
- for(size_t t = 0; t<LogLevel::LOGLEVEL_CATEGORIES; t++){
- se.m_logLevel.setLogLevel((LogLevel::EventCategory)t, 7);
+ MgmApiService * mapi = new MgmApiService(*this);
+ if (mapi == NULL)
+ {
+ g_eventLogger->error("Could not allocate MgmApiService");
+ DBUG_RETURN(false);
+ }
+
+ if(!m_socket_server.setup(mapi, &port, m_opts.bind_address))
+ {
+ delete mapi; // Will be deleted by SocketServer in all other cases
+ g_eventLogger->error("Unable to setup management service port: %s:%d!\n"
+ "Please check if the port is already used,\n"
+ "(perhaps a ndb_mgmd is already running),\n"
+ "and if you are executing on the correct computer",
+ (m_opts.bind_address ? m_opts.bind_address : "*"),
+ port);
+ DBUG_RETURN(false);
+ }
+
+ if (port != m_port)
+ {
+ g_eventLogger->error("Couldn't start management service on the "\
+ "requested port: %d. Got port: %d instead",
+ m_port, port);
+ DBUG_RETURN(false);
}
- se.m_logLevel.setLogLevel(LogLevel::llError, 15);
- se.m_logLevel.setLogLevel(LogLevel::llConnection, 8);
- se.m_logLevel.setLogLevel(LogLevel::llBackup, 15);
- m_event_listner.m_clients.push_back(se);
- m_event_listner.m_logLevel = se.m_logLevel;
}
-
- DBUG_VOID_RETURN;
+
+ m_socket_server.startServer();
+
+ g_eventLogger->info("Id: %d, Command port: %s:%d",
+ _ownNodeId,
+ m_opts.bind_address ? m_opts.bind_address : "*",
+ port);
+ DBUG_RETURN(true);
}
-bool
-MgmtSrvr::start(BaseString &error_string, const char * bindaddress)
+bool
+MgmtSrvr::start()
{
- int mgm_connect_result;
-
DBUG_ENTER("MgmtSrvr::start");
- theFacade= new TransporterFacade(0);
-
- if(theFacade == 0) {
- DEBUG("MgmtSrvr.cpp: theFacade is NULL.");
- error_string.append("MgmtSrvr.cpp: theFacade is NULL.");
- DBUG_RETURN(false);
- }
- if ( theFacade->start_instance
- (_ownNodeId, (ndb_mgm_configuration*)_config->m_configValues) < 0) {
- DEBUG("MgmtSrvr.cpp: TransporterFacade::start_instance < 0.");
+ /* Start transporter */
+ if(!start_transporter())
+ {
+ g_eventLogger->error("Failed to start transporter!");
DBUG_RETURN(false);
}
- assert(_blockNumber == -1);
-
- // Register ourself at TransporterFacade to be able to receive signals
- // and to be notified when a database process has died.
- _blockNumber = theFacade->open(this,
- signalReceivedNotification,
- nodeStatusNotification);
-
- if(_blockNumber == -1){
- DEBUG("MgmtSrvr.cpp: _blockNumber is -1.");
- error_string.append("MgmtSrvr.cpp: _blockNumber is -1.");
- theFacade->stop_instance();
- theFacade = 0;
+ /* Start mgm service */
+ if (!start_mgm_service())
+ {
+ g_eventLogger->error("Failed to start mangement service!");
DBUG_RETURN(false);
}
- if((mgm_connect_result= connect_to_self(bindaddress)) < 0)
+ /* Use local MGM port for TransporterRegistry */
+ if(!connect_to_self())
{
- ndbout_c("Unable to connect to our own ndb_mgmd (Error %d)",
- mgm_connect_result);
- ndbout_c("This is probably a bug.");
- }
-
- /*
- set api reg req frequency quite high:
-
- 100 ms interval to make sure we have fairly up-to-date
- info from the nodes. This to make sure that this info
- is not dependent on heart beat settings in the
- configuration
- */
- theFacade->theClusterMgr->set_max_api_reg_req_interval(100);
-
- TransporterRegistry *reg = theFacade->get_registry();
- for(unsigned int i=0;i<reg->m_transporter_interface.size();i++) {
- BaseString msg;
- DBUG_PRINT("info",("Setting dynamic port %d->%d : %d",
- reg->get_localNodeId(),
- reg->m_transporter_interface[i].m_remote_nodeId,
- reg->m_transporter_interface[i].m_s_service_port
- )
- );
- int res = setConnectionDbParameter((int)reg->get_localNodeId(),
- (int)reg->m_transporter_interface[i]
- .m_remote_nodeId,
- (int)CFG_CONNECTION_SERVER_PORT,
- reg->m_transporter_interface[i]
- .m_s_service_port,
- msg);
- DBUG_PRINT("info",("Set result: %d: %s",res,msg.c_str()));
+ g_eventLogger->error("Failed to connect to ourself!");
+ DBUG_RETURN(false);
}
- _ownReference = numberToRef(_blockNumber, _ownNodeId);
-
- startEventLog();
- // Set the initial confirmation count for subscribe requests confirm
- // from NDB nodes in the cluster.
- //
- // Loglevel thread
+ /* Loglevel thread */
+ assert(_isStopThread == false);
_logLevelThread = NdbThread_Create(logLevelThread_C,
(void**)this,
32768,
@@ -591,25 +557,100 @@ MgmtSrvr::start(BaseString &error_string
}
-//****************************************************************************
-//****************************************************************************
-MgmtSrvr::~MgmtSrvr()
+void
+MgmtSrvr::setClusterLog(void)
{
- if(theFacade != 0){
- theFacade->stop_instance();
- delete theFacade;
- theFacade = 0;
+ BaseString logdest;
+
+ g_eventLogger->close();
+
+ // Get log destination from config
+ DBUG_ASSERT(_ownNodeId);
+ ConfigIter iter(_config, CFG_SECTION_NODE);
+ require(iter.find(CFG_NODE_ID, _ownNodeId) == 0);
+
+ const char *value;
+ if(iter.get(CFG_LOG_DESTINATION, &value) == 0){
+ logdest.assign(value);
}
- stopEventLog();
+ if(logdest.length() == 0 || logdest == "") {
+ // No LogDestination set, use default settings
+ char *clusterLog= NdbConfig_ClusterLogFileName(_ownNodeId);
+ logdest.assfmt("FILE:filename=%s,maxsize=1000000,maxfiles=6",
+ clusterLog);
+ free(clusterLog);
+ }
- NdbMutex_Destroy(m_node_id_mutex);
- NdbMutex_Destroy(m_configMutex);
+ int err= 0;
+ char errStr[100]= {0};
+ if(!g_eventLogger->addHandler(logdest, &err, sizeof(errStr), errStr)) {
+ ndbout << "Warning: could not add log destination '"
+ << logdest.c_str() << "'. Reason: ";
+ if(err)
+ ndbout << strerror(err);
+ if(err && errStr[0]!='\0')
+ ndbout << ", ";
+ if(errStr[0]!='\0')
+ ndbout << errStr;
+ ndbout << endl;
+ }
+
+ if (m_opts.non_interactive)
+ g_eventLogger->createConsoleHandler();
+}
+
+
+void
+MgmtSrvr::setConfig(Config* conf)
+{
+ DBUG_ENTER("MgmtSrvr::setConfig");
+ Guard g(m_configMutex);
- if(_config != NULL)
- delete _config;
+ _config= conf;
- // End set log level thread
+ /* Rebuild node arrays */
+ ConfigIter iter(_config, CFG_SECTION_NODE);
+ for(Uint32 i = 0; i<MAX_NODES; i++) {
+
+ m_connect_address[i].s_addr= 0;
+
+ if (iter.first())
+ continue;
+
+ if (iter.find(CFG_NODE_ID, i) == 0){
+ unsigned type;
+ require(iter.get(CFG_TYPE_OF_SECTION, &type) == 0);
+
+ switch(type){
+ case NODE_TYPE_DB:
+ nodeTypes[i] = NDB_MGM_NODE_TYPE_NDB;
+ break;
+ case NODE_TYPE_API:
+ nodeTypes[i] = NDB_MGM_NODE_TYPE_API;
+ break;
+ case NODE_TYPE_MGM:
+ nodeTypes[i] = NDB_MGM_NODE_TYPE_MGM;
+ break;
+ default:
+ break;
+ }
+ }
+ else
+ {
+ nodeTypes[i] = (enum ndb_mgm_node_type)-1;
+ }
+
+ }
+
+ DBUG_VOID_RETURN;
+}
+
+
+MgmtSrvr::~MgmtSrvr()
+{
+
+ /* Stop log level thread */
void* res = 0;
_isStopThread = true;
@@ -618,10 +659,26 @@ MgmtSrvr::~MgmtSrvr()
NdbThread_Destroy(&_logLevelThread);
}
- if (m_config_retriever)
- delete m_config_retriever;
+ /* Stop mgm service, don't allow new connections */
+ m_socket_server.stopServer();
+
+ /* Stop all active session */
+ m_socket_server.stopSessions(true);
+
+ // Stop transporter
+ if(theFacade != 0){
+ theFacade->stop_instance();
+ delete theFacade;
+ theFacade = 0;
+ }
+
+ delete _config;
+
+ NdbMutex_Destroy(m_node_id_mutex);
+ NdbMutex_Destroy(m_configMutex);
}
+
//****************************************************************************
//****************************************************************************
@@ -856,12 +913,14 @@ int MgmtSrvr::sendStopMgmd(NodeId nodeId
if(ndb_mgm_connect(h,1,0,0))
{
DBUG_PRINT("info",("failed ndb_mgm_connect"));
+ ndb_mgm_destroy_handle(&h);
return SEND_OR_RECEIVE_FAILED;
}
if(!restart)
{
if(ndb_mgm_stop(h, 1, (const int*)&nodeId) < 0)
{
+ ndb_mgm_destroy_handle(&h);
return SEND_OR_RECEIVE_FAILED;
}
}
@@ -871,6 +930,7 @@ int MgmtSrvr::sendStopMgmd(NodeId nodeId
nodes[0]= (int)nodeId;
if(ndb_mgm_restart2(h, 1, nodes, initialStart, nostart, abort) < 0)
{
+ ndb_mgm_destroy_handle(&h);
return SEND_OR_RECEIVE_FAILED;
}
}
@@ -973,7 +1033,7 @@ int MgmtSrvr::sendSTOP_REQ(const Vector<
if ((getNodeType(nodeId) != NDB_MGM_NODE_TYPE_MGM)
&&(getNodeType(nodeId) != NDB_MGM_NODE_TYPE_NDB))
- return WRONG_PROCESS_TYPE;
+ DBUG_RETURN(WRONG_PROCESS_TYPE);
if (getNodeType(nodeId) != NDB_MGM_NODE_TYPE_MGM)
nodes_to_stop.set(nodeId);
@@ -1309,7 +1369,7 @@ int MgmtSrvr::restartNodes(const Vector<
Uint32 mysql_version = 0;
Uint32 connectCount = 0;
bool system;
- const char *address;
+ const char *address= NULL;
status(nodeId, &s, &version, &mysql_version, &startPhase,
&system, &dynamicId, &nodeGroup, &connectCount,
&address);
NdbSleep_MilliSleep(100);
@@ -1414,7 +1474,7 @@ int MgmtSrvr::restartDB(bool nostart, bo
continue;
int result;
result = start(nodeId);
- DEBUG("Starting node " << nodeId << " with result " << result);
+ g_eventLogger->debug("Started node %d with result %d", nodeId, result);
/**
* Errors from this call are deliberately ignored.
* Maybe the user only wanted to restart a subset of the nodes.
@@ -2156,7 +2216,7 @@ MgmtSrvr::alloc_node_id_req(NodeId free_
}
bool
-MgmtSrvr::alloc_node_id(NodeId * nodeId,
+MgmtSrvr::alloc_node_id(NodeId * nodeId,
enum ndb_mgm_node_type type,
struct sockaddr *client_addr,
SOCKET_SIZE_TYPE *client_addr_len,
@@ -2166,7 +2226,7 @@ MgmtSrvr::alloc_node_id(NodeId * nodeId,
DBUG_ENTER("MgmtSrvr::alloc_node_id");
DBUG_PRINT("enter", ("nodeid: %d type: %d client_addr: 0x%ld",
*nodeId, type, (long) client_addr));
- if (g_no_nodeid_checks) {
+ if (m_opts.no_nodeid_checks) {
if (*nodeId == 0) {
error_string.appfmt("no-nodeid-checks set in management server.\n"
"node id must be set explicitly in connectstring");
@@ -2486,6 +2546,7 @@ MgmtSrvr::alloc_node_id(NodeId * nodeId,
DBUG_RETURN(false);
}
+
bool
MgmtSrvr::getNextNodeId(NodeId * nodeId, enum ndb_mgm_node_type type) const
{
@@ -2994,23 +3055,28 @@ void MgmtSrvr::transporter_connect(NDB_S
}
}
-int MgmtSrvr::connect_to_self(const char * bindaddress)
+bool MgmtSrvr::connect_to_self()
{
- int r= 0;
- m_local_mgm_handle= ndb_mgm_create_handle();
- snprintf(m_local_mgm_connect_string,sizeof(m_local_mgm_connect_string),
- "%s:%u", bindaddress ? bindaddress : "localhost", getPort());
- ndb_mgm_set_connectstring(m_local_mgm_handle, m_local_mgm_connect_string);
+ BaseString buf;
+ NdbMgmHandle mgm_handle= ndb_mgm_create_handle();
- if((r= ndb_mgm_connect(m_local_mgm_handle, 0, 0, 0)) < 0)
- {
- ndb_mgm_destroy_handle(&m_local_mgm_handle);
- return r;
+ buf.assfmt("%s:%u",
+ m_opts.bind_address ? m_opts.bind_address : "localhost",
+ getPort());
+ ndb_mgm_set_connectstring(mgm_handle, buf.c_str());
+
+ if(ndb_mgm_connect(mgm_handle, 0, 0, 0) < 0)
+ {
+ g_eventLogger->warning("%d %s",
+ ndb_mgm_get_latest_error(mgm_handle),
+ ndb_mgm_get_latest_error_desc(mgm_handle));
+ ndb_mgm_destroy_handle(&mgm_handle);
+ return false;
}
- // TransporterRegistry now owns this NdbMgmHandle and will destroy it.
- theFacade->get_registry()->set_mgm_handle(m_local_mgm_handle);
+ // TransporterRegistry now owns the handle and will destroy it.
+ theFacade->get_registry()->set_mgm_handle(mgm_handle);
- return 0;
+ return true;
}
template class MutexVector<NodeId>;
=== modified file 'storage/ndb/src/mgmsrv/MgmtSrvr.hpp'
--- a/storage/ndb/src/mgmsrv/MgmtSrvr.hpp 2008-06-09 10:52:24 +0000
+++ b/storage/ndb/src/mgmsrv/MgmtSrvr.hpp 2008-07-29 13:38:18 +0000
@@ -96,16 +96,6 @@ public:
NdbMutex *m_node_id_mutex;
/**
- * Start/initate the event log.
- */
- void startEventLog();
-
- /**
- * Stop the event log.
- */
- void stopEventLog();
-
- /**
* Enable/disable eventlog log levels/severities.
*
* @param serverity the log level/serverity.
@@ -126,17 +116,61 @@ public:
*/
enum LogMode {In, Out, InOut, Off};
- /* Constructor */
-
- MgmtSrvr(SocketServer *socket_server,
- const char *config_filename, /* Where to save config */
- const char *connect_string);
- NodeId getOwnNodeId() const {return _ownNodeId;};
+ /**
+ @struct MgmtOpts
+ @brief Options used to control how the management server is started
+ */
+
+ struct MgmtOpts {
+ int daemon;
+ int non_interactive;
+ int interactive;
+ const char* config_filename;
+ int mycnf;
+ const char* bind_address;
+ int no_nodeid_checks;
+ int print_full_config;
+ };
- bool start(BaseString &error_string, const char * bindaddress = 0);
+ MgmtSrvr(); // Not implemented
+ MgmtSrvr(const MgmtSrvr&); // Not implemented
+ MgmtSrvr(const MgmtOpts&, const char* connect_str);
~MgmtSrvr();
+ /*
+ To be called after constructor. Loads configuration
+ from disk or fetches it from other server
+ */
+ bool init();
+
+private:
+ /* Functions used from 'init' */
+ Config* load_init_config(void);
+ Config* load_init_mycnf(void);
+ bool fetch_config(void);
+ bool save_config(const Config* conf);
+ bool save_config(void);
+
+public:
+
+ /*
+ To be called after 'init', starts up the services
+ this server will expose
+ */
+ bool start(void);
+private:
+ /* Functions used from 'start' */
+ bool start_transporter(void);
+ bool start_mgm_service(void);
+ bool connect_to_self(void);
+
+public:
+
+ NodeId getOwnNodeId() const {return _ownNodeId;};
+
+ void print_config() { _config->print(); };
+
/**
* Get status on a node.
* address may point to a common area (e.g. from inet_addr)
@@ -153,32 +187,6 @@ public:
Uint32 * nodeGroup,
Uint32 * connectCount,
const char **address);
-
- // All the functions below may return any of this error codes:
- // NO_CONTACT_WITH_PROCESS, PROCESS_NOT_CONFIGURED, WRONG_PROCESS_TYPE,
- // COULD_NOT_ALLOCATE_MEMORY, SEND_OR_RECEIVE_FAILED
-
- /**
- * Save a configuration to permanent storage
- */
- int saveConfig(const Config *);
-
- /**
- * Save the running configuration
- */
- int saveConfig() {
- return saveConfig(_config);
- };
-
- /**
- * Read configuration from file, or from another MGM server
- */
- Config *readConfig();
-
- /**
- * Fetch configuration from another MGM server
- */
- Config *fetchConfig();
/**
* Stop a node
@@ -386,12 +394,16 @@ public:
* Get configuration
*/
const Config * getConfig() const;
+private:
+ void setConfig(Config* conf);
+ void setClusterLog(void);
+public:
/**
- * Returns the port number.
+ * Returns the port number where MgmApiService is started
* @return port number.
*/
- int getPort() const;
+ int getPort() const { return m_port; };
int setDbParameter(int node, int parameter, const char * value, BaseString&);
int setConnectionDbParameter(int node1, int node2, int param, int value,
@@ -399,15 +411,11 @@ public:
int getConnectionDbParameter(int node1, int node2, int param,
int *value, BaseString& msg);
- int connect_to_self(const char* bindaddress = 0);
-
void transporter_connect(NDB_SOCKET_TYPE sockfd);
- ConfigRetriever *get_config_retriever() { return m_config_retriever; };
-
const char *get_connect_address(Uint32 node_id);
void get_connected_nodes(NodeBitmask &connected_nodes) const;
- SocketServer *get_socket_server() { return m_socket_server; }
+ SocketServer *get_socket_server() { return &m_socket_server; }
void updateStatus();
@@ -452,15 +460,17 @@ private:
int check_nodes_stopping();
//**************************************************************************
-
+
+ const MgmtOpts& m_opts;
int _blockNumber;
NodeId _ownNodeId;
- SocketServer *m_socket_server;
+ Uint32 m_port;
+ SocketServer m_socket_server;
+ ConfigRetriever m_config_retriever;
BlockReference _ownReference;
NdbMutex *m_configMutex;
const Config * _config;
- BaseString m_configFilename;
NodeBitmask m_reserved_nodes;
struct in_addr m_connect_address[MAX_NODES];
@@ -495,8 +505,6 @@ private:
*/
void eventReport(const Uint32 * theData, Uint32 len);
- NdbMgmHandle m_local_mgm_handle;
- char m_local_mgm_connect_string[20];
class TransporterFacade * theFacade;
int sendVersionReq( int processId, Uint32 &version, Uint32& mysql_version,
@@ -522,8 +530,6 @@ private:
struct NdbThread* _logLevelThread;
static void *logLevelThread_C(void *);
void logLevelThreadRun();
-
- ConfigRetriever *m_config_retriever;
};
inline
=== modified file 'storage/ndb/src/mgmsrv/MgmtSrvrConfig.cpp'
--- a/storage/ndb/src/mgmsrv/MgmtSrvrConfig.cpp 2008-06-09 10:52:24 +0000
+++ b/storage/ndb/src/mgmsrv/MgmtSrvrConfig.cpp 2008-07-29 13:38:18 +0000
@@ -13,62 +13,69 @@
along with this program; if not, write to the Free Software
Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA */
-#include <signaldata/TestOrd.hpp>
-#include <OutputStream.hpp>
-
#include "MgmtSrvr.hpp"
#include <InitConfigFileParser.hpp>
#include <ConfigRetriever.hpp>
-#include <ndb_version.h>
+#include <NdbSleep.h>
-/**
- * Save a configuration to the running configuration file
- */
-int
-MgmtSrvr::saveConfig(const Config *conf) {
- BaseString newfile;
- newfile.appfmt("%s.new", m_configFilename.c_str());
-
- /* Open and write to the new config file */
- FILE *f = fopen(newfile.c_str(), "w");
- if(f == NULL) {
- /** @todo Send something apropriate to the log */
- return -1;
- }
- FileOutputStream stream(f);
- conf->printConfigFile(stream);
- fclose(f);
+Config*
+MgmtSrvr::load_init_config(void)
+{
+ InitConfigFileParser parser;
+ g_eventLogger->info("Reading cluster configuration from '%s'",
+ m_opts.config_filename);
+ return parser.parseConfig(m_opts.config_filename);
+}
- /* Rename file to real name */
- rename(newfile.c_str(), m_configFilename.c_str());
- return 0;
+Config*
+MgmtSrvr::load_init_mycnf(void)
+{
+ InitConfigFileParser parser;
+ g_eventLogger->info("Reading cluster configuration using my.cnf");
+ return parser.parse_mycnf();
}
-Config *
-MgmtSrvr::readConfig() {
- Config *conf;
- InitConfigFileParser parser;
- if (m_configFilename.length())
- {
- conf = parser.parseConfig(m_configFilename.c_str());
+
+bool
+MgmtSrvr::fetch_config(void)
+{
+ char buf[128];
+ DBUG_ENTER("MgmtSrvr::fetch_config");
+ assert(_config == NULL);
+
+ /* Loop until config loaded from other mgmd(s) */
+ g_eventLogger->info("Trying to get configuration from other mgmd(s)"\
+ "using '%.128s'...",
+ m_config_retriever.get_connectstring(buf, sizeof(buf)));
+
+
+ int retry= 0, delay= 0, verbose= 1;
+ while (m_config_retriever.do_connect(retry, delay, verbose) != 0) {
+ g_eventLogger->info("Waiting for connection to other mgmd(s)...");
+ NdbSleep_SecSleep(1);
}
- else
- {
- ndbout_c("Reading cluster configuration using my.cnf");
- conf = parser.parse_mycnf();
+ g_eventLogger->info("Connected...");
+
+ // "login" and alloc node id from the other mgmd
+ _ownNodeId= m_config_retriever.allocNodeId(retry, delay);
+ if (_ownNodeId == 0) {
+ g_eventLogger->error(m_config_retriever.getErrorString());
+ DBUG_RETURN(NULL);
}
- return conf;
-}
-Config *
-MgmtSrvr::fetchConfig() {
- struct ndb_mgm_configuration * tmp = m_config_retriever->getConfig();
- if(tmp != 0){
- Config * conf = new Config();
- conf->m_configValues = tmp;
- return conf;
+ // read config from other managent server
+ struct ndb_mgm_configuration * tmp = m_config_retriever.getConfig();
+ if (tmp == NULL) {
+ g_eventLogger->error(m_config_retriever.getErrorString());
+ DBUG_RETURN(NULL);
}
- return 0;
+
+ setConfig(new Config(tmp));
+
+ DBUG_RETURN(true);
}
+
+
+
=== modified file 'storage/ndb/src/mgmsrv/Services.cpp'
--- a/storage/ndb/src/mgmsrv/Services.cpp 2008-06-09 10:52:24 +0000
+++ b/storage/ndb/src/mgmsrv/Services.cpp 2008-07-29 13:49:59 +0000
@@ -38,9 +38,6 @@ extern bool g_StopServer;
extern bool g_RestartServer;
extern EventLogger * g_eventLogger;
-static const unsigned int MAX_READ_TIMEOUT = 1000 ;
-static const unsigned int MAX_WRITE_TIMEOUT = 100 ;
-
/**
const char * name;
const char * realName;
@@ -308,6 +305,12 @@ MgmApiSession::MgmApiSession(class MgmtS
m_session_id= session_id;
m_mutex= NdbMutex_Create();
m_errorInsert= 0;
+
+ struct sockaddr_in addr;
+ SOCKET_SIZE_TYPE addrlen= sizeof(addr);
+ getpeername(sock, (struct sockaddr*)&addr, &addrlen);
+ m_name.assfmt("%s:%d", inet_ntoa(addr.sin_addr), ntohs(addr.sin_port));
+
DBUG_VOID_RETURN;
}
@@ -340,6 +343,8 @@ MgmApiSession::runSession()
{
DBUG_ENTER("MgmApiSession::runSession");
+ g_eventLogger->debug("%s: Connected!", name());
+
Parser_t::Context ctx;
ctx.m_mutex= m_mutex;
m_ctx= &ctx;
@@ -350,22 +355,53 @@ MgmApiSession::runSession()
m_input->reset_timeout();
m_output->reset_timeout();
- m_parser->run(ctx, *this);
-
- if(ctx.m_currentToken == 0)
+ if (m_parser->run(ctx, *this))
{
- NdbMutex_Unlock(m_mutex);
- break;
+ stop= m_stop;
+ assert(ctx.m_status == Parser_t::Ok);
}
+ else
+ {
+ const char* msg= NULL;
+ switch(ctx.m_status) {
+ case Parser_t::Eof: // Client disconnected
+ g_eventLogger->debug("%s: Disconnected!", name());
+ stop= true;
+ break;
+
+ case Parser_t::ExternalStop: // Stopped by other thread
+ stop= true;
+ break;
+
+ case Parser_t::NoLine: // Normal read timeout
+ case Parser_t::EmptyLine:
+ break;
+
+ case Parser_t::UnknownCommand: msg= "Unknown command"; break;
+ case Parser_t::UnknownArgument: msg= "Unknown argument"; break;
+ case Parser_t::TypeMismatch: msg= "Type mismatch"; break;
+ case Parser_t::InvalidArgumentFormat: msg= "Invalid arg. format"; break;
+ case Parser_t::UnknownArgumentType: msg= "Unknown argument type"; break;
+ case Parser_t::ArgumentGivenTwice: msg= "Argument given twice"; break;
+ case Parser_t::MissingMandatoryArgument: msg= "Missing arg."; break;
+
+ case Parser_t::Ok: // Should never happen here
+ case Parser_t::CommandWithoutFunction:
+ abort();
+ break;
+ }
- switch(ctx.m_status) {
- case Parser_t::UnknownCommand:
- break;
- default:
- break;
+ if (msg){
+ g_eventLogger->debug("%s: %s, '%s'",
+ name(), msg, ctx.m_currentToken);
+
+ // Send result to client
+ m_output->println("result: %s, '%s'",
+ msg, ctx.m_currentToken);
+ m_output->println("");
+ }
}
- stop= m_stop;
NdbMutex_Unlock(m_mutex);
};
=== modified file 'storage/ndb/src/mgmsrv/Services.hpp'
--- a/storage/ndb/src/mgmsrv/Services.hpp 2007-03-22 11:33:07 +0000
+++ b/storage/ndb/src/mgmsrv/Services.hpp 2008-07-29 13:38:18 +0000
@@ -48,6 +48,9 @@ private:
int m_errorInsert;
+ BaseString m_name;
+ const char* name() { return m_name.c_str(); }
+
const char *get_error_text(int err_no)
{ return m_mgmsrv.getErrorText(err_no, m_err_str, sizeof(m_err_str)); }
@@ -115,20 +118,15 @@ public:
};
class MgmApiService : public SocketServer::Service {
- class MgmtSrvr * m_mgmsrv;
+ MgmtSrvr& m_mgmsrv;
Uint64 m_next_session_id; // Protected by m_sessions mutex it SocketServer
public:
- MgmApiService(){
- m_mgmsrv = 0;
- m_next_session_id= 1;
- }
-
- void setMgm(class MgmtSrvr * mgmsrv){
- m_mgmsrv = mgmsrv;
- }
-
+ MgmApiService(MgmtSrvr& mgm):
+ m_mgmsrv(mgm),
+ m_next_session_id(1) {}
+
SocketServer::Session * newSession(NDB_SOCKET_TYPE socket){
- return new MgmApiSession(* m_mgmsrv, socket, m_next_session_id++);
+ return new MgmApiSession(m_mgmsrv, socket, m_next_session_id++);
}
};
=== modified file 'storage/ndb/src/mgmsrv/main.cpp'
--- a/storage/ndb/src/mgmsrv/main.cpp 2008-06-09 10:52:24 +0000
+++ b/storage/ndb/src/mgmsrv/main.cpp 2008-07-29 14:28:33 +0000
@@ -18,29 +18,18 @@
#include "MgmtSrvr.hpp"
#include "EventLogger.hpp"
-#include <Config.hpp>
-#include "InitConfigFileParser.hpp"
-#include <SocketServer.hpp>
-#include "Services.hpp"
+#include "Config.hpp"
+
#include <version.h>
#include <kernel_types.h>
-#include <NdbOut.hpp>
-#include <NdbMain.h>
#include <NdbDaemon.h>
#include <NdbConfig.h>
-#include <NdbHost.h>
+#include <NdbSleep.h>
#include <ndb_version.h>
-#include <ConfigRetriever.hpp>
#include <mgmapi_config_parameters.h>
-
#include <NdbAutoPtr.hpp>
-
#include <ndb_mgmclient.hpp>
-#undef DEBUG
-#define DEBUG(x) ndbout << x << endl;
-
-const char progname[] = "mgmtsrvr";
const char *load_default_groups[]= { "mysql_cluster","ndb_mgmd",0 };
// copied from mysql.cc to get readline
@@ -87,97 +76,53 @@ read_and_execute(Ndb_mgmclient* com, con
return com->execute(line_read,_try_reconnect);
}
-/**
- * @struct MgmGlobals
- * @brief Global Variables used in the management server
- *****************************************************************************/
-
-/** Command line arguments */
-static int opt_daemon; // NOT bool, bool need not be int
-static int opt_non_interactive;
-static int opt_interactive;
-static const char * opt_config_filename= 0;
-static int opt_mycnf = 0;
-static const char* _bind_address = 0;
-
-struct MgmGlobals {
- MgmGlobals();
- ~MgmGlobals();
-
- /** Stuff found in environment or in local config */
- NodeId localNodeId;
- short unsigned int port;
-
- /** The Mgmt Server */
- MgmtSrvr * mgmObject;
-
- /** The Socket Server */
- SocketServer * socketServer;
-};
+/* Global variables */
+bool g_StopServer= false;
+bool g_RestartServer= false;
+static MgmtSrvr* mgm;
+static MgmtSrvr::MgmtOpts opts;
-int g_no_nodeid_checks= 0;
-int g_print_full_config;
-static MgmGlobals *glob= 0;
-
-/******************************************************************************
- * Function prototypes
- ******************************************************************************/
-/**
- * Global variables
- */
-bool g_StopServer;
-bool g_RestartServer;
-extern EventLogger * g_eventLogger;
-
-enum ndb_mgmd_options {
- OPT_INTERACTIVE = NDB_STD_OPTIONS_LAST,
- OPT_NO_NODEID_CHECKS,
- OPT_NO_DAEMON
-};
NDB_STD_OPTS_VARS;
static struct my_option my_long_options[] =
{
NDB_STD_OPTS("ndb_mgmd"),
{ "config-file", 'f', "Specify cluster configuration file",
- (uchar**) &opt_config_filename, (uchar**) &opt_config_filename, 0,
+ (uchar**) &opts.config_filename, (uchar**) &opts.config_filename, 0,
GET_STR, REQUIRED_ARG, 0, 0, 0, 0, 0, 0 },
{ "print-full-config", 'P', "Print full config and exit",
- (uchar**) &g_print_full_config, (uchar**) &g_print_full_config, 0,
+ (uchar**) &opts.print_full_config, (uchar**) &opts.print_full_config, 0,
GET_BOOL, NO_ARG, 0, 0, 0, 0, 0, 0 },
{ "daemon", 'd', "Run ndb_mgmd in daemon mode (default)",
- (uchar**) &opt_daemon, (uchar**) &opt_daemon, 0,
+ (uchar**) &opts.daemon, (uchar**) &opts.daemon, 0,
GET_BOOL, NO_ARG, 1, 0, 0, 0, 0, 0 },
- { "interactive", OPT_INTERACTIVE,
+ { "interactive", 256,
"Run interactive. Not supported but provided for testing purposes",
- (uchar**) &opt_interactive, (uchar**) &opt_interactive, 0,
+ (uchar**) &opts.interactive, (uchar**) &opts.interactive, 0,
GET_BOOL, NO_ARG, 0, 0, 0, 0, 0, 0 },
- { "no-nodeid-checks", OPT_NO_NODEID_CHECKS,
- "Do not provide any node id checks",
- (uchar**) &g_no_nodeid_checks, (uchar**) &g_no_nodeid_checks, 0,
+ { "no-nodeid-checks", 256,
+ "Do not provide any node id checks",
+ (uchar**) &opts.no_nodeid_checks, (uchar**) &opts.no_nodeid_checks, 0,
GET_BOOL, NO_ARG, 0, 0, 0, 0, 0, 0 },
- { "nodaemon", OPT_NO_DAEMON,
+ { "nodaemon", 256,
"Don't run as daemon, but don't read from stdin",
- (uchar**) &opt_non_interactive, (uchar**) &opt_non_interactive, 0,
+ (uchar**) &opts.non_interactive, (uchar**) &opts.non_interactive, 0,
GET_BOOL, NO_ARG, 0, 0, 0, 0, 0, 0 },
{ "mycnf", 256,
"Read cluster config from my.cnf",
- (uchar**) &opt_mycnf, (uchar**) &opt_mycnf, 0,
+ (uchar**) &opts.mycnf, (uchar**) &opts.mycnf, 0,
GET_BOOL, NO_ARG, 0, 0, 0, 0, 0, 0 },
- { "bind-address", 256,
+ { "bind-address", 256,
"Local bind address",
- (uchar**) &_bind_address, (uchar**) &_bind_address, 0,
+ (uchar**) &opts.bind_address, (uchar**) &opts.bind_address, 0,
GET_STR, REQUIRED_ARG, 0, 0, 0, 0, 0, 0 },
{ 0, 0, 0, 0, 0, 0, GET_NO_ARG, NO_ARG, 0, 0, 0, 0, 0, 0}
};
-static void short_usage_sub(void)
-{
- printf("Usage: %s [OPTIONS]\n", my_progname);
-}
+
static void usage()
{
- short_usage_sub();
+ printf("Usage: %s [OPTIONS]\n", my_progname);
ndb_std_print_version();
print_defaults(MYSQL_CONFIG_NAME,load_default_groups);
puts("");
@@ -185,14 +130,13 @@ static void usage()
my_print_variables(my_long_options);
}
-/*
- * MAIN
- */
+
int main(int argc, char** argv)
{
-
NDB_INIT(argv[0]);
+ g_eventLogger->setCategory("MgmSrvr");
+
load_defaults("my",load_default_groups,&argc,&argv);
int ho_error;
@@ -203,167 +147,108 @@ int main(int argc, char** argv)
ndb_std_get_one_option)))
exit(ho_error);
-start:
- glob= new MgmGlobals;
-
- if (opt_interactive ||
- opt_non_interactive ||
- g_print_full_config) {
- opt_daemon= 0;
+ if (opts.interactive ||
+ opts.non_interactive ||
+ opts.print_full_config) {
+ opts.daemon= 0;
}
- if (opt_mycnf && opt_config_filename)
+ /* Output to console initially */
+ g_eventLogger->createConsoleHandler();
+
+ if (opts.mycnf && opts.config_filename)
{
- ndbout_c("Both --mycnf and -f is not supported");
- return 0;
+ g_eventLogger->error("Both --mycnf and -f is not supported");
+ exit(1);
}
- if (opt_mycnf == 0 && opt_config_filename == 0)
+ if (opts.mycnf == 0 && opts.config_filename == 0)
{
struct stat buf;
if (stat("config.ini", &buf) != -1)
- opt_config_filename = "config.ini";
+ opts.config_filename = "config.ini";
}
-
- glob->socketServer = new SocketServer();
-
- MgmApiService * mapi = new MgmApiService();
+start:
- glob->mgmObject = new MgmtSrvr(glob->socketServer,
- opt_config_filename,
- opt_connect_str);
+ g_eventLogger->info("NDB Cluster Management Server. %s", NDB_VERSION_STRING);
- if (g_print_full_config)
- goto the_end;
+ mgm= new MgmtSrvr(opts, opt_connect_str);
+ if (mgm == NULL) {
+ g_eventLogger->critical("Out of memory, couldn't create MgmtSrvr");
+ exit(1);
+ }
- my_setwd(NdbConfig_get_path(0), MYF(0));
+ /**
+ Install signal handler for SIGPIPE
+ Done in TransporterFacade as well.. what about Configretriever?
+ */
+#if !defined NDB_WIN32
+ signal(SIGPIPE, SIG_IGN);
+#endif
- glob->localNodeId= glob->mgmObject->getOwnNodeId();
- if (glob->localNodeId == 0) {
- goto error_end;
+ /* Init mgm, load or fetch config */
+ if (!mgm->init()) {
+ delete mgm;
+ exit(1);
}
- glob->port= glob->mgmObject->getPort();
+ my_setwd(NdbConfig_get_path(0), MYF(0));
- if (glob->port == 0)
- goto error_end;
+ if (opts.daemon) {
- {
- int count= 5; // no of retries for tryBind
- while(!glob->socketServer->tryBind(glob->port, _bind_address)){
- if (--count > 0) {
- NdbSleep_MilliSleep(1000);
- continue;
- }
- ndbout_c("Unable to setup port: %s:%d!\n"
- "Please check if the port is already used,\n"
- "(perhaps a ndb_mgmd is already running),\n"
- "and if you are executing on the correct computer",
- (_bind_address ? _bind_address : "*"), glob->port);
- goto error_end;
+ NodeId localNodeId= mgm->getOwnNodeId();
+ if (localNodeId == 0) {
+ g_eventLogger->error("Couldn't get own node id");
+ delete mgm;
+ exit(1);
}
- }
-
- if(!glob->socketServer->setup(mapi, &glob->port, _bind_address))
- {
- ndbout_c("Unable to setup management port: %s:%d!\n"
- "Please check if the port is already used,\n"
- "(perhaps a ndb_mgmd is already running),\n"
- "and if you are executing on the correct computer",
- (_bind_address ? _bind_address : "*"), glob->port);
- delete mapi;
- goto error_end;
- }
- if (opt_daemon) {
// Become a daemon
- char *lockfile= NdbConfig_PidFileName(glob->localNodeId);
- char *logfile= NdbConfig_StdoutFileName(glob->localNodeId);
+ char *lockfile= NdbConfig_PidFileName(localNodeId);
+ char *logfile= NdbConfig_StdoutFileName(localNodeId);
NdbAutoPtr<char> tmp_aptr1(lockfile), tmp_aptr2(logfile);
if (NdbDaemon_Make(lockfile, logfile, 0) == -1) {
- ndbout << "Cannot become daemon: " << NdbDaemon_ErrorText <<
endl;
- return 1;
+ g_eventLogger->error("Cannot become daemon: %s", NdbDaemon_ErrorText);
+ delete mgm;
+ exit(1);
}
}
-#ifndef NDB_WIN32
- signal(SIGPIPE, SIG_IGN);
-#endif
- {
- BaseString error_string;
- if(!glob->mgmObject->start(error_string, _bind_address)){
- ndbout_c("Unable to start management server.");
- ndbout_c("Probably caused by illegal initial configuration file.");
- ndbout_c(error_string.c_str());
- goto error_end;
- }
+ /* Start mgm services */
+ if (!mgm->start()) {
+ delete mgm;
+ exit(1);
}
- //glob->mgmObject->saveConfig();
- mapi->setMgm(glob->mgmObject);
-
- char msg[256];
- BaseString::snprintf(msg, sizeof(msg),
- "NDB Cluster Management Server. %s", NDB_VERSION_STRING);
- ndbout_c(msg);
- g_eventLogger->info(msg);
-
- BaseString::snprintf(msg, 256, "Id: %d, Command port: %s:%d",
- glob->localNodeId,
- _bind_address ? _bind_address : "*",
- glob->port);
- ndbout_c(msg);
- g_eventLogger->info(msg);
-
- g_StopServer = false;
- g_RestartServer= false;
- glob->socketServer->startServer();
-
- if(opt_interactive) {
+ if(opts.interactive) {
+ int port= mgm->getPort();
BaseString con_str;
- if(_bind_address)
- con_str.appfmt("host=%s:%d", _bind_address, glob->port);
- else
- con_str.appfmt("localhost:%d", glob->port);
+ if(opts.bind_address)
+ con_str.appfmt("host=%s:%d", opts.bind_address, port);
+ else
+ con_str.appfmt("localhost:%d", port);
Ndb_mgmclient com(con_str.c_str(), 1);
while(g_StopServer != true && read_and_execute(&com, "ndb_mgm> ", 1));
- } else
- {
+ }
+ else {
while(g_StopServer != true)
NdbSleep_MilliSleep(500);
}
- if(g_RestartServer)
- g_eventLogger->info("Restarting server...");
- else
- g_eventLogger->info("Shutting down server...");
- glob->socketServer->stopServer();
- // We disconnect from the ConfigRetreiver mgmd when we delete glob below
- glob->socketServer->stopSessions(true);
+ g_eventLogger->info("Shutting down server...");
+ delete mgm;
g_eventLogger->info("Shutdown complete");
- the_end:
- delete glob;
- if(g_RestartServer)
+
+ if(g_RestartServer){
+ g_eventLogger->info("Restarting server...");
+ g_RestartServer= g_StopServer= false;
goto start;
+ }
+
+ g_eventLogger->close();
+
ndb_end(opt_endinfo ? MY_CHECK_ERROR | MY_GIVE_INFO : 0);
return 0;
- error_end:
- delete glob;
- ndb_end(opt_endinfo ? MY_CHECK_ERROR | MY_GIVE_INFO : 0);
- return 1;
}
-MgmGlobals::MgmGlobals(){
- // Default values
- port = 0;
- socketServer = 0;
- mgmObject = 0;
-}
-
-MgmGlobals::~MgmGlobals(){
- if (socketServer)
- delete socketServer;
- if (mgmObject)
- delete mgmObject;
-}
=== modified file 'storage/ndb/src/ndbapi/SignalSender.cpp'
--- a/storage/ndb/src/ndbapi/SignalSender.cpp 2007-04-12 05:49:09 +0000
+++ b/storage/ndb/src/ndbapi/SignalSender.cpp 2008-07-25 11:28:05 +0000
@@ -70,13 +70,13 @@ SimpleSignal::print(FILE * out){
}
}
-SignalSender::SignalSender(TransporterFacade *facade)
+SignalSender::SignalSender(TransporterFacade *facade, int blockNo)
: m_lock(0)
{
m_cond = NdbCondition_Create();
theFacade = facade;
lock();
- m_blockNo = theFacade->open(this, execSignal, execNodeStatus);
+ m_blockNo = theFacade->open(this, execSignal, execNodeStatus, blockNo);
unlock();
assert(m_blockNo > 0);
}
@@ -143,7 +143,12 @@ SignalSender::waitFor(Uint32 timeOutMill
}
return s;
}
-
+
+ /* Remove old signals from usedBuffer */
+ for (int i= 0; i < m_usedBuffer.size(); i++)
+ delete m_usedBuffer[i];
+ m_usedBuffer.clear();
+
NDB_TICKS now = NdbTick_CurrentMillisecond();
NDB_TICKS stop = now + timeOutMillis;
Uint32 wait = (timeOutMillis == 0 ? 10 : timeOutMillis);
=== modified file 'storage/ndb/src/ndbapi/SignalSender.hpp'
--- a/storage/ndb/src/ndbapi/SignalSender.hpp 2007-03-02 10:52:12 +0000
+++ b/storage/ndb/src/ndbapi/SignalSender.hpp 2008-06-10 14:15:53 +0000
@@ -43,7 +43,7 @@ private:
class SignalSender {
public:
- SignalSender(TransporterFacade *facade);
+ SignalSender(TransporterFacade *facade, int blockNo = -1);
virtual ~SignalSender();
int lock();
=== modified file 'storage/ndb/src/ndbapi/TransporterFacade.cpp'
--- a/storage/ndb/src/ndbapi/TransporterFacade.cpp 2008-06-17 20:28:45 +0000
+++ b/storage/ndb/src/ndbapi/TransporterFacade.cpp 2008-07-22 13:42:54 +0000
@@ -388,7 +388,7 @@ TransporterFacade::deliver_signal(Signal
return;
} else {
; // Ignore all other block numbers.
- if(header->theVerId_signalNumber!=3) {
+ if(header->theVerId_signalNumber != GSN_API_REGREQ) {
TRP_DEBUG( "TransporterFacade received signal to unknown block no." );
ndbout << "BLOCK NO: " << tRecBlockNo << " sig "
<< header->theVerId_signalNumber << endl;
@@ -925,10 +925,11 @@ TransporterFacade::close_local(BlockNumb
int
TransporterFacade::open(void* objRef,
ExecuteFunction fun,
- NodeStatusFunction statusFun)
+ NodeStatusFunction statusFun,
+ int blockNo)
{
DBUG_ENTER("TransporterFacade::open");
- int r= m_threads.open(objRef, fun, statusFun);
+ int r= m_threads.open(objRef, fun, statusFun, blockNo);
if (r < 0)
DBUG_RETURN(r);
#if 1
@@ -1368,14 +1369,42 @@ TransporterFacade::ThreadData::expand(Ui
int
TransporterFacade::ThreadData::open(void* objRef,
ExecuteFunction fun,
- NodeStatusFunction fun2)
+ NodeStatusFunction fun2,
+ int blockNo)
{
Uint32 nextFree = m_firstFree;
if(m_statusNext.size() >= MAX_NO_THREADS && nextFree == END_OF_LIST){
return -1;
}
-
+
+ Object_Execute oe = { objRef , fun };
+
+ if (unlikely(blockNo >= 0)) {
+ // Open block with fixed number
+ Uint32 index= numberToIndex(blockNo);
+
+ if(index > m_statusNext.size()){
+ expand(index - m_statusNext.size());
+ }
+
+ m_use_cnt++;
+
+ // Single linked free list, relink the previous one that points to this
+ for(Uint32 i = 0; i < m_statusNext.size(); i++){
+ if (m_statusNext[i] == index){
+ m_statusNext[i]= m_statusNext[index];
+ break;
+ }
+ }
+
+ m_statusNext[index] = INACTIVE;
+ m_objectExecute[index] = oe;
+ m_statusFunction[index] = fun2;
+
+ return indexToNumber(index);
+ }
+
if(nextFree == END_OF_LIST){
expand(10);
nextFree = m_firstFree;
@@ -1383,8 +1412,6 @@ TransporterFacade::ThreadData::open(void
m_use_cnt++;
m_firstFree = m_statusNext[nextFree];
-
- Object_Execute oe = { objRef , fun };
m_statusNext[nextFree] = INACTIVE;
m_objectExecute[nextFree] = oe;
=== modified file 'storage/ndb/src/ndbapi/TransporterFacade.hpp'
--- a/storage/ndb/src/ndbapi/TransporterFacade.hpp 2008-06-17 20:28:45 +0000
+++ b/storage/ndb/src/ndbapi/TransporterFacade.hpp 2008-07-22 13:42:54 +0000
@@ -60,9 +60,11 @@ public:
/**
* Register this block for sending/receiving signals
+ * @blockNo block number to use, -1 => any blockNumber
* @return BlockNumber or -1 for failure
*/
- int open(void* objRef, ExecuteFunction, NodeStatusFunction);
+ int open(void* objRef, ExecuteFunction, NodeStatusFunction,
+ int blockNo = -1);
// Close this block number
int close(BlockNumber blockNumber, Uint64 trans_id);
@@ -279,7 +281,7 @@ private:
Vector<Object_Execute> m_objectExecute;
Vector<NodeStatusFunction> m_statusFunction;
- int open(void* objRef, ExecuteFunction, NodeStatusFunction);
+ int open(void* objRef, ExecuteFunction, NodeStatusFunction, int);
int close(int number);
void expand(Uint32 size);
=== modified file 'storage/ndb/test/include/NDBT_Test.hpp'
--- a/storage/ndb/test/include/NDBT_Test.hpp 2008-06-23 12:37:22 +0000
+++ b/storage/ndb/test/include/NDBT_Test.hpp 2008-07-22 13:25:43 +0000
@@ -122,13 +122,11 @@ typedef int (NDBT_TESTFUNC)(NDBT_Context
class NDBT_Step {
public:
- NDBT_Step(NDBT_TestCase* ptest,
- const char* pname,
- NDBT_TESTFUNC* pfunc);
+ NDBT_Step(NDBT_TestCase* ptest,
+ const char* pname,
+ NDBT_TESTFUNC* pfunc);
virtual ~NDBT_Step() {}
int execute(NDBT_Context*);
- virtual int setUp(Ndb_cluster_connection&) = 0;
- virtual void tearDown() = 0;
void setContext(NDBT_Context*);
NDBT_Context* getContext();
void print();
@@ -141,23 +139,18 @@ protected:
NDBT_TESTFUNC* func;
NDBT_TestCase* testcase;
int step_no;
-};
-class NDBT_NdbApiStep : public NDBT_Step {
+private:
+ int setUp(Ndb_cluster_connection&);
+ void tearDown();
+ Ndb* m_ndb;
+
public:
- NDBT_NdbApiStep(NDBT_TestCase* ptest,
- const char* pname,
- NDBT_TESTFUNC* pfunc);
- virtual ~NDBT_NdbApiStep() {}
- virtual int setUp(Ndb_cluster_connection&);
- virtual void tearDown();
+ Ndb* getNdb() const;
- Ndb* getNdb();
-protected:
- Ndb* ndb;
};
-class NDBT_ParallelStep : public NDBT_NdbApiStep {
+class NDBT_ParallelStep : public NDBT_Step {
public:
NDBT_ParallelStep(NDBT_TestCase* ptest,
const char* pname,
@@ -165,7 +158,7 @@ public:
virtual ~NDBT_ParallelStep() {}
};
-class NDBT_Verifier : public NDBT_NdbApiStep {
+class NDBT_Verifier : public NDBT_Step {
public:
NDBT_Verifier(NDBT_TestCase* ptest,
const char* name,
@@ -173,7 +166,7 @@ public:
virtual ~NDBT_Verifier() {}
};
-class NDBT_Initializer : public NDBT_NdbApiStep {
+class NDBT_Initializer : public NDBT_Step {
public:
NDBT_Initializer(NDBT_TestCase* ptest,
const char* name,
@@ -181,7 +174,7 @@ public:
virtual ~NDBT_Initializer() {}
};
-class NDBT_Finalizer : public NDBT_NdbApiStep {
+class NDBT_Finalizer : public NDBT_Step {
public:
NDBT_Finalizer(NDBT_TestCase* ptest,
const char* name,
@@ -190,6 +183,12 @@ public:
};
+enum NDBT_DriverType {
+ DummyDriver,
+ NdbApiDriver
+};
+
+
class NDBT_TestCase {
public:
NDBT_TestCase(NDBT_TestSuite* psuite,
@@ -211,10 +210,13 @@ public:
virtual bool tableExists(NdbDictionary::Table* aTable) = 0;
virtual bool isVerify(const NdbDictionary::Table* aTable) = 0;
- virtual void saveTestResult(const NdbDictionary::Table* ptab, int result) = 0;
+ virtual void saveTestResult(const char*, int result) = 0;
virtual void printTestResult() = 0;
void initBeforeTest(){ timer.doReset();};
+ void setDriverType(NDBT_DriverType type) { m_driverType= type; }
+ NDBT_DriverType getDriverType() const { return m_driverType; }
+
/**
* Get no of steps running/completed
*/
@@ -243,6 +245,7 @@ protected:
Properties props;
NdbTimer timer;
bool isVerifyTables;
+ NDBT_DriverType m_driverType;
};
static const int FAILED_TO_CREATE = 1000;
@@ -281,7 +284,7 @@ public:
virtual ~NDBT_TestCaseImpl1();
int addStep(NDBT_Step*);
int addVerifier(NDBT_Verifier*);
- int addInitializer(NDBT_Initializer*);
+ int addInitializer(NDBT_Initializer*, bool first= false);
int addFinalizer(NDBT_Finalizer*);
void addTable(const char*, bool);
bool tableExists(NdbDictionary::Table*);
@@ -300,7 +303,7 @@ public:
private:
static const int NORESULT = 999;
- void saveTestResult(const NdbDictionary::Table* ptab, int result);
+ void saveTestResult(const char*, int result);
void printTestResult();
void startStepInThread(int stepNo, NDBT_Context* ctx);
@@ -371,14 +374,24 @@ public:
void setTemporaryTables(bool val);
bool getTemporaryTables() const;
+
+ void setLogging(bool val);
+ bool getLogging() const;
+
+ int createTables(Ndb_cluster_connection&) const;
+ int dropTables(Ndb_cluster_connection&) const;
+
+ void setDriverType(NDBT_DriverType type) { m_driverType= type; }
+ NDBT_DriverType getDriverType() const { return m_driverType; }
+
private:
int executeOne(Ndb_cluster_connection&,
const char* _tabname, const char* testname = NULL);
int executeAll(Ndb_cluster_connection&,
const char* testname = NULL);
void execute(Ndb_cluster_connection&,
- Ndb*, const NdbDictionary::Table*, const char* testname = NULL);
-
+ const NdbDictionary::Table*, const char* testname = NULL);
+
int report(const char* _tcname = NULL);
int reportAllTables(const char* );
const char* name;
@@ -392,13 +405,14 @@ private:
int loops;
int timer;
NdbTimer testSuiteTimer;
- bool createTable;
+ bool m_createTable;
+ bool m_createAll;
bool diskbased;
bool runonce;
const char* tsname;
- bool createAllTables;
bool temporaryTables;
- bool nologging;
+ bool m_logging;
+ NDBT_DriverType m_driverType;
};
@@ -413,10 +427,18 @@ C##suitname():NDBT_TestSuite(#suitname){
NDBT_Initializer* pti; pti = NULL; \
NDBT_Finalizer* ptf; ptf = NULL;
+// The default driver type to use for all tests in suite
+#define DRIVER(type) \
+ setDriverType(type)
+
#define TESTCASE(testname, comment) \
pt = new NDBT_TestCaseImpl1(this, testname, comment); \
addTest(pt);
+// The driver type to use for a particular testcase
+#define TESTCASE_DRIVER(type) \
+ pt->setDriverType(type);
+
#define TC_PROPERTY(propname, propval) \
pt->setProperty(propname, propval);
@@ -464,6 +486,6 @@ C##suitname():NDBT_TestSuite(#suitname){
} } ; C##suitname suitname
// Helper functions for retrieving variables from NDBT_Step
-#define GETNDB(ps) ((NDBT_NdbApiStep*)ps)->getNdb()
+#define GETNDB(ps) ((NDBT_Step*)ps)->getNdb()
#endif
=== modified file 'storage/ndb/test/include/NdbMgmd.hpp'
--- a/storage/ndb/test/include/NdbMgmd.hpp 2008-06-23 12:37:22 +0000
+++ b/storage/ndb/test/include/NdbMgmd.hpp 2008-07-22 13:25:43 +0000
@@ -26,7 +26,7 @@ public:
m_connect_str(getenv("NDB_CONNECTSTRING"))
{
if (!m_connect_str.length()){
- fprintf(stderr, "Could not init NdbConnectCtring");
+ fprintf(stderr, "Could not init NdbConnectString");
abort();
}
}
=== modified file 'storage/ndb/test/ndbapi/acrt/NdbRepStress.cpp'
--- a/storage/ndb/test/ndbapi/acrt/NdbRepStress.cpp 2008-03-03 15:10:42 +0000
+++ b/storage/ndb/test/ndbapi/acrt/NdbRepStress.cpp 2008-07-22 13:25:43 +0000
@@ -45,10 +45,6 @@ urandom(uint m)
return r;
}
-#define GETNDB(ps) ((NDBT_NdbApiStep*)ps)->getNdb()
-/*
-*/
-
bool
syncSlaveWithMaster()
{
=== modified file 'storage/ndb/test/ndbapi/testBasic.cpp'
--- a/storage/ndb/test/ndbapi/testBasic.cpp 2007-11-02 16:23:17 +0000
+++ b/storage/ndb/test/ndbapi/testBasic.cpp 2008-07-22 13:25:43 +0000
@@ -19,8 +19,6 @@
#include <UtilTransactions.hpp>
#include <NdbRestarter.hpp>
-#define GETNDB(ps) ((NDBT_NdbApiStep*)ps)->getNdb()
-
/**
* TODO
=== modified file 'storage/ndb/test/ndbapi/testBasicAsynch.cpp'
--- a/storage/ndb/test/ndbapi/testBasicAsynch.cpp 2006-12-23 19:20:40 +0000
+++ b/storage/ndb/test/ndbapi/testBasicAsynch.cpp 2008-07-22 13:25:43 +0000
@@ -19,8 +19,6 @@
#include "HugoAsynchTransactions.hpp"
#include "UtilTransactions.hpp"
-#define GETNDB(ps) ((NDBT_NdbApiStep*)ps)->getNdb()
-
int runLoadTable(NDBT_Context* ctx, NDBT_Step* step){
int records = ctx->getNumRecords();
=== modified file 'storage/ndb/test/ndbapi/testMgm.cpp'
--- a/storage/ndb/test/ndbapi/testMgm.cpp 2008-06-23 12:52:49 +0000
+++ b/storage/ndb/test/ndbapi/testMgm.cpp 2008-07-22 13:25:43 +0000
@@ -642,6 +642,7 @@ done:
}
NDBT_TESTSUITE(testMgm);
+DRIVER(DummyDriver); /* turn off use of NdbApi */
TESTCASE("ApiSessionFailure",
"Test failures in MGMAPI session"){
INITIALIZER(runTestApiSession);
@@ -681,6 +682,8 @@ NDBT_TESTSUITE_END(testMgm);
int main(int argc, const char** argv){
ndb_init();
+ testMgm.setCreateTable(false);
+ testMgm.setRunAllTables(true);
return testMgm.execute(argc, argv);
}
=== modified file 'storage/ndb/test/ndbapi/testOIBasic.cpp'
--- a/storage/ndb/test/ndbapi/testOIBasic.cpp 2008-07-01 12:35:34 +0000
+++ b/storage/ndb/test/ndbapi/testOIBasic.cpp 2008-07-23 11:36:53 +0000
@@ -614,6 +614,10 @@ getcs(Par par)
uint n = urandom(maxcsnumber);
cs = get_charset(n, MYF(0));
if (cs != 0) {
+ // avoid dodgy internal character sets
+ // see bug# 37554
+ if (cs->state & MY_CS_HIDDEN)
+ continue;
// prefer complex charsets
if (cs->mbmaxlen != 1 || urandom(5) == 0)
break;
=== modified file 'storage/ndb/test/ndbapi/testPartitioning.cpp'
--- a/storage/ndb/test/ndbapi/testPartitioning.cpp 2008-07-01 12:35:34 +0000
+++ b/storage/ndb/test/ndbapi/testPartitioning.cpp 2008-07-22 13:27:57 +0000
@@ -19,8 +19,6 @@
#include <UtilTransactions.hpp>
#include <NdbRestarter.hpp>
-#define GETNDB(ps) ((NDBT_NdbApiStep*)ps)->getNdb()
-
static Uint32 max_dks = 0;
static
=== modified file 'storage/ndb/test/ndbapi/test_event.cpp'
--- a/storage/ndb/test/ndbapi/test_event.cpp 2008-06-18 21:25:50 +0000
+++ b/storage/ndb/test/ndbapi/test_event.cpp 2008-07-22 13:27:57 +0000
@@ -23,8 +23,6 @@
#include <NdbRestarts.hpp>
#include <signaldata/DumpStateOrd.hpp>
-#define GETNDB(ps) ((NDBT_NdbApiStep*)ps)->getNdb()
-
static int createEvent(Ndb *pNdb,
const NdbDictionary::Table &tab,
NDBT_Context* ctx)
=== modified file 'storage/ndb/test/src/NDBT_Tables.cpp'
--- a/storage/ndb/test/src/NDBT_Tables.cpp 2008-07-01 12:35:34 +0000
+++ b/storage/ndb/test/src/NDBT_Tables.cpp 2008-07-22 13:27:57 +0000
@@ -1248,6 +1248,7 @@ loop:
if(r == -1){
if(pNdb->getDictionary()->getNdbError().code == 755)
{
+ ndbout << "Error: " << pNdb->getDictionary()->getNdbError() <<
endl;
if (create_default_tablespace(pNdb) == 0)
{
goto loop;
=== modified file 'storage/ndb/test/src/NDBT_Test.cpp'
--- a/storage/ndb/test/src/NDBT_Test.cpp 2008-06-23 12:37:22 +0000
+++ b/storage/ndb/test/src/NDBT_Test.cpp 2008-07-22 13:25:43 +0000
@@ -25,7 +25,6 @@
#include <time.h>
-// No verbose outxput
NDBT_Context::NDBT_Context(Ndb_cluster_connection& con)
: m_cluster_connection(con)
@@ -258,14 +257,58 @@ void NDBT_Context::setNumLoops(int _loop
loops = _loops;
}
-NDBT_Step::NDBT_Step(NDBT_TestCase* ptest, const char* pname,
- NDBT_TESTFUNC* pfunc): name(pname){
- assert(pfunc != NULL);
- func = pfunc;
- testcase = ptest;
- step_no = -1;
+NDBT_Step::NDBT_Step(NDBT_TestCase* ptest, const char* pname,
+ NDBT_TESTFUNC* pfunc) :
+ m_ctx(NULL), name(pname), func(pfunc),
+ testcase(ptest), step_no(-1), m_ndb(NULL)
+{
}
+
+int
+NDBT_Step::setUp(Ndb_cluster_connection& con){
+
+ switch(testcase->getDriverType())
+ {
+ case DummyDriver:
+ break;
+
+ case NdbApiDriver:
+ {
+ m_ndb = new Ndb(&con, "TEST_DB" );
+ m_ndb->init(1024);
+
+ int result = m_ndb->waitUntilReady(300); // 5 minutes
+ if (result != 0){
+ g_err << "Ndb was not ready" << endl;
+ return NDBT_FAILED;
+ }
+ break;
+ }
+
+ default:
+ abort();
+ break;
+
+ }
+
+ return NDBT_OK;
+}
+
+
+void
+NDBT_Step::tearDown(){
+ delete m_ndb;
+ m_ndb = NULL;
+}
+
+
+Ndb* NDBT_Step::getNdb() const {
+ assert(m_ndb != NULL);
+ return m_ndb;
+}
+
+
int NDBT_Step::execute(NDBT_Context* ctx) {
assert(ctx != NULL);
@@ -305,58 +348,26 @@ NDBT_Context* NDBT_Step::getContext(){
return m_ctx;
}
-NDBT_NdbApiStep::NDBT_NdbApiStep(NDBT_TestCase* ptest,
- const char* pname,
- NDBT_TESTFUNC* pfunc)
- : NDBT_Step(ptest, pname, pfunc),
- ndb(NULL) {
-}
-
-
-int
-NDBT_NdbApiStep::setUp(Ndb_cluster_connection& con){
- ndb = new Ndb(&con, "TEST_DB" );
- ndb->init(1024);
-
- int result = ndb->waitUntilReady(300); // 5 minutes
- if (result != 0){
- g_err << name << ": Ndb was not ready" << endl;
- return NDBT_FAILED;
- }
- return NDBT_OK;
-}
-
-void
-NDBT_NdbApiStep::tearDown(){
- delete ndb;
- ndb = NULL;
-}
-
-Ndb* NDBT_NdbApiStep::getNdb(){
- assert(ndb != NULL);
- return ndb;
-}
-
NDBT_ParallelStep::NDBT_ParallelStep(NDBT_TestCase* ptest,
const char* pname,
NDBT_TESTFUNC* pfunc)
- : NDBT_NdbApiStep(ptest, pname, pfunc) {
+ : NDBT_Step(ptest, pname, pfunc) {
}
NDBT_Verifier::NDBT_Verifier(NDBT_TestCase* ptest,
const char* pname,
NDBT_TESTFUNC* pfunc)
- : NDBT_NdbApiStep(ptest, pname, pfunc) {
+ : NDBT_Step(ptest, pname, pfunc) {
}
NDBT_Initializer::NDBT_Initializer(NDBT_TestCase* ptest,
const char* pname,
NDBT_TESTFUNC* pfunc)
- : NDBT_NdbApiStep(ptest, pname, pfunc) {
+ : NDBT_Step(ptest, pname, pfunc) {
}
NDBT_Finalizer::NDBT_Finalizer(NDBT_TestCase* ptest,
const char* pname,
NDBT_TESTFUNC* pfunc)
- : NDBT_NdbApiStep(ptest, pname, pfunc) {
+ : NDBT_Step(ptest, pname, pfunc) {
}
NDBT_TestCase::NDBT_TestCase(NDBT_TestSuite* psuite,
@@ -386,6 +397,8 @@ NDBT_TestCaseImpl1::NDBT_TestCaseImpl1(N
numStepsCompleted = 0;
waitThreadsMutexPtr = NdbMutex_Create();
waitThreadsCondPtr = NdbCondition_Create();
+
+ m_driverType= psuite->getDriverType();
}
NDBT_TestCaseImpl1::~NDBT_TestCaseImpl1(){
@@ -429,9 +442,13 @@ int NDBT_TestCaseImpl1::addVerifier(NDBT
return 0;
}
-int NDBT_TestCaseImpl1::addInitializer(NDBT_Initializer* pInitializer){
+int NDBT_TestCaseImpl1::addInitializer(NDBT_Initializer* pInitializer,
+ bool first){
assert(pInitializer != NULL);
- initializers.push_back(pInitializer);
+ if (first)
+ initializers.push(pInitializer, 0);
+ else
+ initializers.push_back(pInitializer);
return 0;
}
@@ -722,9 +739,9 @@ int NDBT_TestCaseImpl1::runFinal(NDBT_Co
}
-void NDBT_TestCaseImpl1::saveTestResult(const NdbDictionary::Table* ptab,
+void NDBT_TestCaseImpl1::saveTestResult(const char* test_name,
int result){
- testResults.push_back(new NDBT_TestCaseResult(ptab->getName(),
+ testResults.push_back(new NDBT_TestCaseResult(test_name,
result,
timer.elapsedTime()));
}
@@ -754,18 +771,21 @@ void NDBT_TestCaseImpl1::printTestResult
-NDBT_TestSuite::NDBT_TestSuite(const char* pname):name(pname){
+NDBT_TestSuite::NDBT_TestSuite(const char* pname) :
+ name(pname),
+ m_createTable(true),
+ m_createAll(false),
+ m_logging(true),
+ m_driverType(NdbApiDriver)
+{
numTestsOk = 0;
numTestsFail = 0;
numTestsExecuted = 0;
records = 0;
loops = 0;
- createTable = true;
diskbased = false;
tsname = NULL;
- createAllTables = false;
temporaryTables = false;
- nologging = false;
}
@@ -777,14 +797,14 @@ NDBT_TestSuite::~NDBT_TestSuite(){
}
void NDBT_TestSuite::setCreateTable(bool _flag){
- createTable = _flag;
+ m_createTable = _flag;
}
void NDBT_TestSuite::setRunAllTables(bool _flag){
runonce = _flag;
}
void NDBT_TestSuite::setCreateAllTables(bool _flag){
- createAllTables = _flag;
+ m_createAll = _flag;
}
void NDBT_TestSuite::setTemporaryTables(bool val){
@@ -795,6 +815,14 @@ bool NDBT_TestSuite::getTemporaryTables(
return temporaryTables;
}
+void NDBT_TestSuite::setLogging(bool val){
+ m_logging = val;
+}
+
+bool NDBT_TestSuite::getLogging() const {
+ return m_logging;
+}
+
bool NDBT_TestSuite::timerIsOn(){
return (timer != 0);
}
@@ -819,91 +847,41 @@ int NDBT_TestSuite::executeAll(Ndb_clust
if(tests.size() == 0)
return NDBT_FAILED;
- Ndb ndb(&con, "TEST_DB");
- ndb.init(1024);
-
- int result = ndb.waitUntilReady(500); // 5 minutes
- if (result != 0){
- g_err << name <<": Ndb was not ready" << endl;
- return NDBT_FAILED;
- }
ndbout << name << " started [" << getDate() << "]" <<
endl;
-
if(!runonce)
{
testSuiteTimer.doStart();
for (int t=0; t < NDBT_Tables::getNumTables(); t++){
const NdbDictionary::Table* ptab = NDBT_Tables::getTable(t);
ndbout << "|- " << ptab->getName() << endl;
- execute(con, &ndb, ptab, _testname);
+ execute(con, ptab, _testname);
}
testSuiteTimer.doStop();
}
else
{
- NdbDictionary::Dictionary* pDict= ndb.getDictionary();
for (unsigned i = 0; i < tests.size(); i++){
if (_testname != NULL && strcasecmp(tests[i]->getName(), _testname) !=
0)
continue;
-
-
+
tests[i]->initBeforeTest();
ctx = new NDBT_Context(con);
- Uint32 t;
- for (t=0; t < (Uint32)NDBT_Tables::getNumTables(); t++)
- {
- const NdbDictionary::Table* pTab = NDBT_Tables::getTable(t);
- const NdbDictionary::Table* pTab2 = pDict->getTable(pTab->getName());
-
- if(pTab2 != 0 && pDict->dropTable(pTab->getName()) != 0)
- {
- numTestsFail++;
- numTestsExecuted++;
- g_err << "ERROR0: Failed to drop table " << pTab->getName() <<
endl;
- tests[i]->saveTestResult(pTab, FAILED_TO_CREATE);
- continue;
- }
-
- if (NDBT_Tables::createTable(&ndb, pTab->getName(),
- nologging, false,
- g_create_hook, this) != 0) {
- numTestsFail++;
- numTestsExecuted++;
- g_err << "ERROR1: Failed to create table " << pTab->getName()
- << pDict->getNdbError() << endl;
- tests[i]->saveTestResult(pTab, FAILED_TO_CREATE);
- continue;
- }
- pTab2 = pDict->getTable(pTab->getName());
-
- ctx->addTab(pTab2);
- }
-
ctx->setNumRecords(records);
ctx->setNumLoops(loops);
ctx->setSuite(this);
-
- const NdbDictionary::Table** tables= ctx->getTables();
- result = tests[i]->execute(ctx);
- tests[i]->saveTestResult(tables[0], result);
+ int result = tests[i]->execute(ctx);
+
+ tests[i]->saveTestResult("", result);
if (result != NDBT_OK)
numTestsFail++;
else
numTestsOk++;
numTestsExecuted++;
-
- if(result == NDBT_OK)
- {
- for(t = 0; tables[t] != 0; t++)
- {
- pDict->dropTable(tables[t]->getName());
- }
- }
-
+
delete ctx;
}
}
@@ -913,17 +891,9 @@ int NDBT_TestSuite::executeAll(Ndb_clust
int
NDBT_TestSuite::executeOne(Ndb_cluster_connection& con,
const char* _tabname, const char* _testname){
-
- if(tests.size() == 0)
- return NDBT_FAILED;
- Ndb ndb(&con, "TEST_DB");
- ndb.init(1024);
- int result = ndb.waitUntilReady(300); // 5 minutes
- if (result != 0){
- g_err << name <<": Ndb was not ready" << endl;
+ if(tests.size() == 0)
return NDBT_FAILED;
- }
ndbout << name << " started [" << getDate() << "]" <<
endl;
@@ -933,7 +903,7 @@ NDBT_TestSuite::executeOne(Ndb_cluster_c
ndbout << "|- " << ptab->getName() << endl;
- execute(con, &ndb, ptab, _testname);
+ execute(con, ptab, _testname);
if (numTestsFail > 0){
return NDBT_FAILED;
@@ -1026,7 +996,7 @@ NDBT_TestSuite::createHook(Ndb* ndb, Ndb
}
void NDBT_TestSuite::execute(Ndb_cluster_connection& con,
- Ndb* ndb, const NdbDictionary::Table* pTab,
+ const NdbDictionary::Table* pTab,
const char* _testname){
int result;
@@ -1047,50 +1017,20 @@ void NDBT_TestSuite::execute(Ndb_cluster
tests[t]->initBeforeTest();
- NdbDictionary::Dictionary* pDict = ndb->getDictionary();
- const NdbDictionary::Table* pTab2 = pDict->getTable(pTab->getName());
- if (createTable == true){
-
- if(pTab2 != 0 && pDict->dropTable(pTab->getName()) != 0){
- numTestsFail++;
- numTestsExecuted++;
- g_err << "ERROR0: Failed to drop table " << pTab->getName() <<
endl;
- tests[t]->saveTestResult(pTab, FAILED_TO_CREATE);
- continue;
- }
-
- if (NDBT_Tables::createTable(ndb, pTab->getName(), nologging, false,
- g_create_hook, this) != 0) {
- numTestsFail++;
- numTestsExecuted++;
- g_err << "ERROR1: Failed to create table " << pTab->getName()
- << pDict->getNdbError() << endl;
- tests[t]->saveTestResult(pTab, FAILED_TO_CREATE);
- continue;
- }
- pTab2 = pDict->getTable(pTab->getName());
- } else if(!pTab2) {
- pTab2 = pTab;
- }
-
ctx = new NDBT_Context(con);
- ctx->setTab(pTab2);
ctx->setNumRecords(records);
ctx->setNumLoops(loops);
ctx->setSuite(this);
-
+ ctx->setTab(pTab);
+
result = tests[t]->execute(ctx);
- tests[t]->saveTestResult(pTab, result);
+ tests[t]->saveTestResult(pTab->getName(), result);
if (result != NDBT_OK)
numTestsFail++;
else
numTestsOk++;
numTestsExecuted++;
- if (result == NDBT_OK && createTable == true && createAllTables ==
false){
- pDict->dropTable(pTab->getName());
- }
-
tests[t]->m_has_run = true;
delete ctx;
@@ -1099,6 +1039,133 @@ void NDBT_TestSuite::execute(Ndb_cluster
+int
+NDBT_TestSuite::createTables(Ndb_cluster_connection& con) const
+{
+ Ndb ndb(&con, "TEST_DB");
+ ndb.init(1);
+
+ NdbDictionary::Dictionary* pDict = ndb.getDictionary();
+ for(unsigned i = 0; i<m_tables_in_test.size(); i++)
+ {
+ const char *tab_name= m_tables_in_test[i].c_str();
+ if (pDict->dropTable(tab_name) != 0)
+ {
+ g_err << "runCreateTables: Failed to drop table " << tab_name
+ << pDict->getNdbError() << endl;
+ return NDBT_FAILED;
+ }
+ if(NDBT_Tables::createTable(&ndb, tab_name, getLogging()) != 0)
+ {
+ g_err << "runCreateTables: Failed to create table " << tab_name
+ << pDict->getNdbError() << endl;
+ return NDBT_FAILED;
+ }
+
+ if (i == 0){
+ // Update ctx with a pointer to the first created table
+ const NdbDictionary::Table* pTab2 = pDict->getTable(tab_name);
+ ctx->setTab(pTab2);
+ }
+ g_info << "created " << tab_name << endl;
+ }
+
+ return NDBT_OK;
+}
+
+
+static int
+runCreateTables(NDBT_Context* ctx, NDBT_Step* step)
+{
+ NDBT_TestSuite* suite= ctx->getSuite();
+ return suite->createTables(ctx->m_cluster_connection);
+}
+
+
+static int
+runCreateTable(NDBT_Context* ctx, NDBT_Step* step)
+{
+ Ndb ndb(&ctx->m_cluster_connection, "TEST_DB");
+ ndb.init(1);
+
+ NdbDictionary::Dictionary* pDict = ndb.getDictionary();
+ const NdbDictionary::Table* pTab = ctx->getTab();
+ const char *tab_name= pTab->getName();
+ if (pDict->dropTable(tab_name) > 0)
+ {
+ g_err << "runCreateTable: Failed to drop table " << tab_name
+ << pDict->getNdbError() << endl;
+ return NDBT_FAILED;
+ }
+
+ if(NDBT_Tables::createTable(&ndb, tab_name,
+ ctx->getSuite()->getLogging()) != 0)
+ {
+ g_err << "runCreateTable: Failed to create table " << tab_name
+ << pDict->getNdbError() << endl;
+ return NDBT_FAILED;
+ }
+
+ // Update ctx with a pointer to the created table
+ const NdbDictionary::Table* pTab2 = pDict->getTable(tab_name);
+ ctx->setTab(pTab2);
+
+ return NDBT_OK;
+}
+
+
+int
+NDBT_TestSuite::dropTables(Ndb_cluster_connection& con) const
+{
+ Ndb ndb(&con, "TEST_DB");
+ ndb.init(1);
+
+ int res= NDBT_OK;
+ NdbDictionary::Dictionary* pDict = ndb.getDictionary();
+ for(unsigned i = 0; i<m_tables_in_test.size(); i++)
+ {
+ const char *tab_name= m_tables_in_test[i].c_str();
+ if (pDict->dropTable(tab_name) != 0)
+ {
+ g_err << "runDropTables: Failed to drop table " << tab_name
+ << pDict->getNdbError() << endl;
+ res= NDBT_FAILED;
+ // Continue, try to drop all tables...
+ }
+
+ g_info << "dropped " << tab_name << endl;
+ }
+ return NDBT_OK;
+}
+
+
+static int
+runDropTables(NDBT_Context* ctx, NDBT_Step* step)
+{
+ NDBT_TestSuite* suite= ctx->getSuite();
+ return suite->dropTables(ctx->m_cluster_connection);
+}
+
+
+static int
+runDropTable(NDBT_Context* ctx, NDBT_Step* step)
+{
+ Ndb ndb(&ctx->m_cluster_connection, "TEST_DB");
+ ndb.init(1);
+
+ NdbDictionary::Dictionary* pDict = ndb.getDictionary();
+ const NdbDictionary::Table* pTab = ctx->getTab();
+ const char *tab_name= pTab->getName();
+
+ if (pDict->dropTable(tab_name) != 0)
+ {
+ g_err << "runDropTables: Failed to drop table " << tab_name
+ << pDict->getNdbError() << endl;
+ return NDBT_FAILED;
+ }
+ return NDBT_OK;
+}
+
int
NDBT_TestSuite::report(const char* _tcname){
@@ -1269,21 +1336,6 @@ int NDBT_TestSuite::execute(int argc, co
return NDBT_ProgramExit(NDBT_WRONGARGS);
}
- if (opt_print == true){
- printExecutionTree();
- return 0;
- }
-
- if (opt_print_html == true){
- printExecutionTreeHTML();
- return 0;
- }
-
- if (opt_print_cases == true){
- printCases();
- return 0;
- }
-
if (opt_verbose)
setOutputLevel(2); // Show g_info
else
@@ -1292,15 +1344,10 @@ int NDBT_TestSuite::execute(int argc, co
records = opt_records;
loops = opt_loops;
timer = opt_timer;
- nologging = opt_nologging;
+ if (opt_nologging)
+ setLogging(false);
temporaryTables = opt_temporary;
- Ndb_cluster_connection con;
- if(con.connect(12, 5, 1))
- {
- return NDBT_ProgramExit(NDBT_FAILED);
- }
-
if (opt_seed == 0)
{
opt_seed = NdbTick_CurrentMillisecond();
@@ -1310,46 +1357,58 @@ int NDBT_TestSuite::execute(int argc, co
srandom(opt_seed);
global_flag_skip_invalidate_cache = 1;
-
- {
- Ndb ndb(&con, "TEST_DB");
- ndb.init(1024);
- if (ndb.waitUntilReady(500)){
- g_err << "Ndb was not ready" << endl;
- return NDBT_ProgramExit(NDBT_FAILED);
- }
- NdbDictionary::Dictionary* pDict = ndb.getDictionary();
- int num_tables= argc;
+ int num_tables= argc;
+ if (argc == 0)
+ num_tables = NDBT_Tables::getNumTables();
+
+ for(int i = 0; i<num_tables; i++)
+ {
if (argc == 0)
- num_tables = NDBT_Tables::getNumTables();
+ m_tables_in_test.push_back(NDBT_Tables::getTable(i)->getName());
+ else
+ m_tables_in_test.push_back(_argv[i]);
+ }
- for(int i = 0; i<num_tables; i++)
+ if (m_createTable)
+ {
+ for (unsigned t = 0; t < tests.size(); t++)
{
- if (argc == 0)
- m_tables_in_test.push_back(NDBT_Tables::getTable(i)->getName());
- else
- m_tables_in_test.push_back(_argv[i]);
- if (createAllTables == true)
- {
- const char *tab_name= m_tables_in_test[i].c_str();
- const NdbDictionary::Table* pTab = pDict->getTable(tab_name);
- if (pTab && pDict->dropTable(tab_name) != 0)
- {
- g_err << "ERROR0: Failed to drop table " << tab_name
- << pDict->getNdbError() << endl;
- return NDBT_ProgramExit(NDBT_FAILED);
- }
- if(NDBT_Tables::createTable(&ndb, tab_name, nologging) != 0)
- {
- g_err << "ERROR1: Failed to create table " << tab_name
- << pDict->getNdbError() << endl;
- return NDBT_ProgramExit(NDBT_FAILED);
- }
- }
+ NDBT_TestCaseImpl1* pt= (NDBT_TestCaseImpl1*)tests[t];
+ NDBT_Initializer* pti =
+ new NDBT_Initializer(pt,
+ m_createAll ? "runCreateTables" : "runCreateTable",
+ m_createAll ? runCreateTables : runCreateTable);
+ pt->addInitializer(pti, true);
+ NDBT_Finalizer* ptf =
+ new NDBT_Finalizer(pt,
+ m_createAll ? "runDropTables" : "runDropTable",
+ m_createAll ? runDropTables : runDropTable);
+ pt->addFinalizer(ptf);
}
}
+ if (opt_print == true){
+ printExecutionTree();
+ return 0;
+ }
+
+ if (opt_print_html == true){
+ printExecutionTreeHTML();
+ return 0;
+ }
+
+ if (opt_print_cases == true){
+ printCases();
+ return 0;
+ }
+
+ Ndb_cluster_connection con;
+ if(con.connect(12, 5, 1))
+ {
+ return NDBT_ProgramExit(NDBT_FAILED);
+ }
+
if(argc == 0){
// No table specified
res = executeAll(con, opt_testname);
@@ -1362,24 +1421,8 @@ int NDBT_TestSuite::execute(int argc, co
res = report(opt_testname);
}
- if (res == NDBT_OK && createAllTables == true)
- {
- Ndb ndb(&con, "TEST_DB");
- ndb.init(1024);
- if (ndb.waitUntilReady(500)){
- g_err << "Ndb was not ready" << endl;
- return NDBT_ProgramExit(NDBT_FAILED);
- }
- NdbDictionary::Dictionary* pDict = ndb.getDictionary();
- for(unsigned i = 0; i<m_tables_in_test.size(); i++)
- {
- pDict->dropTable(m_tables_in_test[i].c_str());
- }
- }
-
return NDBT_ProgramExit(res);
}
-
void NDBT_TestSuite::printExecutionTree(){
=== modified file 'storage/ndb/tools/restore/restore_main.cpp'
--- a/storage/ndb/tools/restore/restore_main.cpp 2008-06-18 21:19:57 +0000
+++ b/storage/ndb/tools/restore/restore_main.cpp 2008-07-24 10:57:58 +0000
@@ -929,7 +929,8 @@ main(int argc, char** argv)
{
for(i=0; i < metaData.getNoOfTables(); i++)
{
- if (checkSysTable(metaData, i))
+ if (checkSysTable(metaData, i) &&
+ checkDbAndTableName(metaData[i]))
{
for(Uint32 j= 0; j < g_consumers.size(); j++)
if (!g_consumers[j]->table_equal(* metaData[i]))
@@ -945,7 +946,8 @@ main(int argc, char** argv)
{
//if want to promote attributes, compability check is done firstly
for (i=0; i < metaData.getNoOfTables(); i++){
- if (checkSysTable(metaData, i))
+ if (checkSysTable(metaData, i) &&
+ checkDbAndTableName(metaData[i]))
{
for(Uint32 j= 0; j < g_consumers.size(); j++)
{
| Thread |
|---|
| • bzr commit into mysql-5.1-telco-6.4 branch (jonas:2680) | jonas oreland | 3 Aug |