#At file:///home/tomas/mysql_src/mysql-5.1-telco-6.4/
3069 Tomas Ulin 2008-11-13 [merge]
merge
modified:
mysql-test/suite/ndb/r/ndb_blob.result
mysql-test/suite/ndb/t/ndb_blob.test
storage/ndb/config/common.mk.am
storage/ndb/include/kernel/signaldata/DbinfoScan.hpp
storage/ndb/include/mgmcommon/IPCConfig.hpp
storage/ndb/include/ndb_version.h.in
storage/ndb/include/ndbinfo.h
storage/ndb/include/transporter/TransporterDefinitions.hpp
storage/ndb/include/transporter/TransporterRegistry.hpp
storage/ndb/src/common/mgmcommon/IPCConfig.cpp
storage/ndb/src/common/transporter/SCI_Transporter.cpp
storage/ndb/src/common/transporter/SCI_Transporter.hpp
storage/ndb/src/common/transporter/SHM_Transporter.cpp
storage/ndb/src/common/transporter/SHM_Transporter.hpp
storage/ndb/src/common/transporter/TCP_Transporter.cpp
storage/ndb/src/common/transporter/TCP_Transporter.hpp
storage/ndb/src/common/transporter/Transporter.cpp
storage/ndb/src/common/transporter/Transporter.hpp
storage/ndb/src/common/transporter/TransporterRegistry.cpp
storage/ndb/src/common/util/ConfigValues.cpp
storage/ndb/src/kernel/blocks/dbdict/Dbdict.cpp
storage/ndb/src/kernel/blocks/dbdict/Dbdict.hpp
storage/ndb/src/kernel/blocks/dbinfo/Dbinfo.cpp
storage/ndb/src/kernel/blocks/dbinfo/ndbinfo_tables.h
storage/ndb/src/kernel/blocks/dbtc/DbtcMain.cpp
storage/ndb/src/kernel/blocks/qmgr/Qmgr.hpp
storage/ndb/src/kernel/blocks/qmgr/QmgrInit.cpp
storage/ndb/src/kernel/blocks/qmgr/QmgrMain.cpp
storage/ndb/src/kernel/vm/Configuration.cpp
storage/ndb/src/kernel/vm/TransporterCallback.cpp
storage/ndb/src/mgmsrv/Config.cpp
storage/ndb/src/mgmsrv/ConfigInfo.cpp
storage/ndb/src/mgmsrv/ConfigInfo.hpp
storage/ndb/src/mgmsrv/ConfigManager.cpp
storage/ndb/src/mgmsrv/MgmtSrvr.cpp
storage/ndb/src/mgmsrv/MgmtSrvr.hpp
storage/ndb/src/mgmsrv/main.cpp
storage/ndb/src/ndbapi/ClusterMgr.cpp
storage/ndb/src/ndbapi/ClusterMgr.hpp
storage/ndb/src/ndbapi/Ndbif.cpp
storage/ndb/src/ndbapi/TransporterFacade.cpp
storage/ndb/src/ndbapi/TransporterFacade.hpp
storage/ndb/src/ndbapi/ndb_cluster_connection.cpp
storage/ndb/test/ndbapi/testScanFilter.cpp
storage/ndb/test/ndbapi/test_event.cpp
storage/ndb/test/run-test/db.cpp
storage/ndb/test/src/DbUtil.cpp
storage/ndb/test/src/NDBT_Tables.cpp
storage/ndb/test/tools/rep_latency.cpp
=== modified file 'mysql-test/suite/ndb/r/ndb_blob.result'
--- a/mysql-test/suite/ndb/r/ndb_blob.result 2008-11-08 21:22:57 +0000
+++ b/mysql-test/suite/ndb/r/ndb_blob.result 2008-11-10 11:55:47 +0000
@@ -636,16 +636,3 @@ select (giga = repeat(@stuff, 2000)) fro
(giga = repeat(@stuff, 2000))
1
drop table t1;
-create table t1 (
-id int,
-data longblob,
-primary key(id))
-engine=ndb;
-set @blurb= repeat('B', 1000000);
-set @blurbHash= sha1(@blurb);
-set autocommit = 1;
-insert into t1 values (1, @blurb);
-create table result_space (k int primary key, result int) engine=myisam;
-set autocommit = 0;
-drop table result_space;
-drop table t1;
=== modified file 'mysql-test/suite/ndb/t/ndb_blob.test'
--- a/mysql-test/suite/ndb/t/ndb_blob.test 2008-11-08 21:22:57 +0000
+++ b/mysql-test/suite/ndb/t/ndb_blob.test 2008-11-10 11:55:47 +0000
@@ -588,114 +588,3 @@ select sha1(giga) from t1;
select (giga = repeat(@stuff, 2000)) from t1 where a=0;
drop table t1;
-
-# bug# 39867 : Blob part operation error handling
-# We attempt to generate a transporter overload
-# and check that an error is generated, rather than
-# rubbish being silently returned.
-# Since transporter overload is not easy to reliably
-# reproduce, this testcase passes when all SELECTS
-# return either the correct result, or fail with
-# MySQL error 1297, and warnings about transporter
-# overload from Ndb. (1218)
-# The testcase will fail if there is no error, and
-# incorrect data/NULL, or if there is an error other
-# than 1297 due to 1218.
-#
-
-create table t1 (
- id int,
- data longblob,
- primary key(id))
- engine=ndb;
-
-set @blurb= repeat('B', 1000000); # ~1MB of stuff
-set @blurbHash= sha1(@blurb);
-
-set autocommit = 1;
-
-let $mysqltest_loopcounter= 500;
-let $success_count=0;
-let $fail_count=0;
-
-
-insert into t1 values (1, @blurb);
-
-create table result_space (k int primary key, result int) engine=myisam;
-
---disable_result_log
---disable_query_log
-
-while ($mysqltest_loopcounter)
-{
- # Allow success or unmapped error type (1297).
- --error 0,1297
- insert into result_space select 1, (sha1(data) = @blurbHash) from t1 where id=1;
-
- let $orig_errno= $mysql_errno;
-
- # Statement execution success path
- if (!$orig_errno)
- {
- # Test that the SHA1 of the data read was as expected
- # (e.g. comparison returned true)
- let $is_right_answer=`SELECT result from result_space where k=1`;
-
- if (!$is_right_answer)
- {
- --enable_result_log
- echo SELECT succeeded but gave bad result.
- SHOW WARNINGS;
- die "Bad data returned";
- }
- #echo Success gave correct result;
- inc $success_count;
- }
-
- # Statement execution failure path
- if ($orig_errno)
- {
- inc $fail_count;
- #echo FAIL;
-
- # For the error we are interested in (1218), MySQLD returns a generic error
- # code (1297). We use the warning info to check the Ndb error.
- # Note that sometimes the end of the warning states 'from NDB', sometimes
- # 'from NDBCLUSTER'. Probably it should be made consistent.
- #
- let $warning_message= query_get_value("SHOW WARNINGS", Message, 1);
- let $is_correct_message= `SELECT "$warning_message" LIKE "Got temporary error 1218 'Send Buffers overloaded in NDB kernel' from NDB%"`;
-
- if (!$is_correct_message)
- {
- --enable_result_log
- echo SELECT failed with incorrect error ($mysql_errno);
- SHOW WARNINGS;
- die "Incorrect error from statement";
- }
- #echo Failed as expected;
- }
-
- delete from result_space;
-
- #echo $mysqltest_loopcounter;
-
- dec $mysqltest_loopcounter;
-
- if ($fail_count = 20)
- {
- # That's enough punishment
- #echo Exiting loop early at iteration $mysqltest_loopcounter;
- let $mysqltest_loopcounter= 0;
- }
-}
-
---enable_result_log
---enable_query_log
-
-# Interesting, but can't output as it's different every time
-# echo Successes $success_count;
-# echo Failures $fail_count;
-set autocommit = 0;
-drop table result_space;
-drop table t1;
=== modified file 'storage/ndb/config/common.mk.am'
--- a/storage/ndb/config/common.mk.am 2008-03-11 15:27:35 +0000
+++ b/storage/ndb/config/common.mk.am 2008-11-11 07:21:58 +0000
@@ -25,6 +25,4 @@ mgmapiincludedir = "$(pkgincludedir)/sto
INCLUDES = $(INCLUDES_LOC)
LDADD = $(LDADD_LOC)
DEFS = @DEFS@ @NDB_DEFS@ $(DEFS_LOC) $(NDB_EXTRA_FLAGS)
-NDB_CXXFLAGS=@ndb_cxxflags_fix@ $(NDB_CXXFLAGS_LOC)
-NDB_AM_CXXFLAGS:= $(AM_CXXFLAGS)
-AM_CXXFLAGS=$(NDB_AM_CXXFLAGS) $(NDB_CXXFLAGS)
+AM_CXXFLAGS= @ndb_cxxflags_fix@ $(NDB_CXXFLAGS_LOC)
=== modified file 'storage/ndb/include/kernel/signaldata/DbinfoScan.hpp'
--- a/storage/ndb/include/kernel/signaldata/DbinfoScan.hpp 2008-10-06 08:16:43 +0000
+++ b/storage/ndb/include/kernel/signaldata/DbinfoScan.hpp 2008-11-10 11:44:02 +0000
@@ -18,6 +18,14 @@
#include "SignalData.hpp"
+struct DbinfoScanCursor
+{
+ Uint32 cur_requestInfo;
+ Uint32 cur_node;
+ Uint32 cur_block;
+ Uint32 cur_item;
+};
+
/**
* SENDER: API,MGM
* RECIVER: DBINFO
@@ -49,11 +57,11 @@ struct DbinfoScanReq
Uint32 rows_total;
Uint32 word_total;
- Uint32 cur_requestInfo;
- Uint32 cur_node;
- Uint32 cur_block;
- Uint32 cur_item;
- Uint32 cursor[0];
+ union
+ {
+ Uint32 cursordata[1];
+ struct DbinfoScanCursor cursor;
+ };
};
/**
@@ -87,11 +95,11 @@ public:
Uint32 rows_total;
Uint32 word_total;
- Uint32 cur_requestInfo;
- Uint32 cur_node;
- Uint32 cur_block;
- Uint32 cur_item;
- Uint32 cursor[0];
+ union
+ {
+ Uint32 cursordata[1];
+ struct DbinfoScanCursor cursor;
+ };
};
/**
=== modified file 'storage/ndb/include/mgmcommon/IPCConfig.hpp'
--- a/storage/ndb/include/mgmcommon/IPCConfig.hpp 2006-12-23 19:20:40 +0000
+++ b/storage/ndb/include/mgmcommon/IPCConfig.hpp 2008-11-12 08:17:14 +0000
@@ -1,4 +1,4 @@
-/* Copyright (C) 2003 MySQL AB
+/* Copyright (C) 2003-2008 MySQL AB, Sun Microsystems Inc.
This program is free software; you can redistribute it and/or modify
it under the terms of the GNU General Public License as published by
@@ -16,67 +16,21 @@
#ifndef IPCConfig_H
#define IPCConfig_H
-#include <ndb_types.h>
-#include <ndb_limits.h>
-#include <kernel_types.h>
-#include <Properties.hpp>
-
-/**
- * @class IPCConfig
- * @brief Config transporters in TransporterRegistry using Properties config
- */
-class IPCConfig
+struct IPCConfig
{
-public:
- IPCConfig(Properties * props);
- ~IPCConfig();
-
- /** @return 0 for OK */
- int init();
-
- NodeId ownId() const;
-
- /** @return No of transporters configured */
- int configureTransporters(class TransporterRegistry * theTransporterRegistry);
-
- /**
- * Supply a nodeId,
- * and get next higher node id
- * @return false if none found, true otherwise
- *
- * getREPHBFrequency and getNodeType uses the last Id supplied to
- * getNextRemoteNodeId.
- */
- bool getNextRemoteNodeId(NodeId & nodeId) const;
- Uint32 getREPHBFrequency(NodeId id) const;
- const char* getNodeType(NodeId id) const;
-
- NodeId getNoOfRemoteNodes() const {
- return theNoOfRemoteNodes;
- }
-
- void print() const { props->print(); }
-
- static Uint32 configureTransporters(Uint32 nodeId,
- const struct ndb_mgm_configuration &,
- class TransporterRegistry &);
-
-private:
- NodeId the_ownId;
- Properties * props;
-
- bool addRemoteNodeId(NodeId nodeId);
- NodeId theNoOfRemoteNodes;
- NodeId theRemoteNodeIds[MAX_NODES];
-};
-
-inline
-NodeId
-IPCConfig::ownId() const
-{
- return the_ownId;
-}
+ /*
+ configure_transporters
+ Create and configure transporters in TransporterRegistry
+ Returns:
+ true - sucessfully created and (re)configured transporters
+ false - at least one transporter could not be created
+ or (re)configured
+ */
+ static bool configureTransporters(Uint32 nodeId,
+ const struct ndb_mgm_configuration &,
+ class TransporterRegistry &);
+};
#endif // IPCConfig_H
=== modified file 'storage/ndb/include/ndb_version.h.in'
--- a/storage/ndb/include/ndb_version.h.in 2008-11-07 14:50:21 +0000
+++ b/storage/ndb/include/ndb_version.h.in 2008-11-13 13:36:29 +0000
@@ -282,4 +282,28 @@ ndbd_suma_dictlock_handover(Uint32 x)
return (x >= NDBD_SUMA_DICTLOCK_HANDOVER);
}
+#define NDBD_API_TAKE_OVERTCCONF_60 NDB_MAKE_VERSION(5,2,4)
+#define NDBD_API_TAKE_OVERTCCONF_62 NDB_MAKE_VERSION(6,2,17)
+#define NDBD_API_TAKE_OVERTCCONF_63 NDB_MAKE_VERSION(6,3,19)
+
+static
+inline
+int
+ndb_takeovertc(Uint32 x)
+{
+ if (x >= NDB_VERSION_D)
+ return 1;
+
+ const Uint32 major = (x >> 16) & 0xFF;
+ const Uint32 minor = (x >> 8) & 0xFF;
+
+ if (major >= 6)
+ {
+ if (minor == 2)
+ return x >= NDBD_API_TAKE_OVERTCCONF_62;
+ }
+
+ return x >= NDBD_API_TAKE_OVERTCCONF_63;
+}
+
#endif
=== modified file 'storage/ndb/include/ndbinfo.h'
--- a/storage/ndb/include/ndbinfo.h 2008-11-07 08:52:22 +0000
+++ b/storage/ndb/include/ndbinfo.h 2008-11-10 11:44:02 +0000
@@ -39,17 +39,12 @@ struct ndbinfo_column {
struct ndbinfo_table {
NDBINFO_TABLE_MEMBERS
- struct ndbinfo_column col[0];
-};
-
-/* because MSVC compiler hates you (or rather, zero length arrays) */
-struct ndbinfo_table_internal {
- NDBINFO_TABLE_MEMBERS
+ struct ndbinfo_column col[1];
};
#define DECLARE_NDBINFO_TABLE(var, num) \
-struct ndbinfostruct##var { \
- struct ndbinfo_table_internal t; \
+struct ndbinfostruct##var { \
+ NDBINFO_TABLE_MEMBERS \
struct ndbinfo_column col[num]; \
} var
@@ -105,10 +100,10 @@ int dbinfo_write_row_column_uint32(struc
conf->colBitmapLo= (req).colBitmapLo; \
conf->colBitmapHi= (req).colBitmapHi; \
conf->requestInfo= (req).requestInfo | DbinfoScanConf::MoreData; \
- conf->cur_requestInfo= 0; \
- conf->cur_node= getOwnNodeId(); \
- conf->cur_block= number(); \
- conf->cur_item= (itemnumber); \
+ conf->cursor.cur_requestInfo= 0; \
+ conf->cursor.cur_node= getOwnNodeId(); \
+ conf->cursor.cur_block= number(); \
+ conf->cursor.cur_item= (itemnumber); \
conf->maxRows= (rl).maxRows; \
conf->maxBytes= (rl).maxBytes; \
conf->rows_total= (rl).rows_total + (rl).rows; \
=== modified file 'storage/ndb/include/transporter/TransporterDefinitions.hpp'
--- a/storage/ndb/include/transporter/TransporterDefinitions.hpp 2008-08-27 14:34:22 +0000
+++ b/storage/ndb/include/transporter/TransporterDefinitions.hpp 2008-11-12 08:17:14 +0000
@@ -40,6 +40,12 @@ enum SendStatus {
SEND_UNKNOWN_NODE = 5
};
+enum TransporterType {
+ tt_TCP_TRANSPORTER = 1,
+ tt_SCI_TRANSPORTER = 2,
+ tt_SHM_TRANSPORTER = 3
+};
+
/**
* Maximum message sizes
* ---------------------
@@ -75,6 +81,7 @@ struct TransporterConfiguration {
bool checksum;
bool signalId;
bool isMgmConnection; // is a mgm connection, requires transforming
+ TransporterType type;
union { // Transporter specific configuration information
@@ -92,11 +99,6 @@ struct TransporterConfiguration {
Uint32 shmSize;
int signum;
} shm;
-
- struct {
- Uint32 prioASignalSize;
- Uint32 prioBSignalSize;
- } ose;
struct {
Uint32 sendLimit; // Packet size
=== modified file 'storage/ndb/include/transporter/TransporterRegistry.hpp'
--- a/storage/ndb/include/transporter/TransporterRegistry.hpp 2008-11-10 09:28:28 +0000
+++ b/storage/ndb/include/transporter/TransporterRegistry.hpp 2008-11-12 08:26:18 +0000
@@ -51,12 +51,6 @@ enum IOState {
HaltIO = 3
};
-enum TransporterType {
- tt_TCP_TRANSPORTER = 1,
- tt_SCI_TRANSPORTER = 2,
- tt_SHM_TRANSPORTER = 3
- // ID 4 was OSE Transporter which has been removed. Don't use ID 4.
-};
static const char *performStateString[] =
{ "is connected",
@@ -198,17 +192,21 @@ public:
IOState ioState(NodeId nodeId);
void setIOState(NodeId nodeId, IOState state);
- /**
- * createTransporter
+private:
+
+ bool createTCPTransporter(TransporterConfiguration * config);
+ bool createSCITransporter(TransporterConfiguration * config);
+ bool createSHMTransporter(TransporterConfiguration * config);
+
+public:
+ /**
+ * configureTransporter
+ *
+ * Configure a transporter, ie. create new if it
+ * does not exist otherwise try to reconfigure it
*
- * If the config object indicates that the transporter
- * to be created will act as a server and no server is
- * started, startServer is called. A transporter of the selected kind
- * is created and it is put in the transporter arrays.
- */
- bool createTCPTransporter(struct TransporterConfiguration * config);
- bool createSCITransporter(struct TransporterConfiguration * config);
- bool createSHMTransporter(struct TransporterConfiguration * config);
+ */
+ bool configureTransporter(TransporterConfiguration * config);
/**
* Allocate send buffer for default send buffer handling.
@@ -324,8 +322,6 @@ public:
void add_transporter_interface(NodeId remoteNodeId, const char *interf,
int s_port); // signed port. <0 is dynamic
Transporter* get_transporter(NodeId nodeId);
- NodeId get_localNodeId() { return localNodeId; };
-
struct in_addr get_connect_address(NodeId node_id) const;
protected:
@@ -339,7 +335,6 @@ private:
int sendCounter;
NodeId localNodeId;
- bool nodeIdSpecified;
unsigned maxTransporters;
int nTransporters;
int nTCPTransporters;
@@ -498,6 +493,9 @@ public:
bool has_data_to_send(NodeId node);
void reset_send_buffer(NodeId node);
+
+ void print_transporters(const char* where, NdbOut& out = ndbout);
+
};
inline void
=== modified file 'storage/ndb/src/common/mgmcommon/IPCConfig.cpp'
--- a/storage/ndb/src/common/mgmcommon/IPCConfig.cpp 2008-08-21 06:33:53 +0000
+++ b/storage/ndb/src/common/mgmcommon/IPCConfig.cpp 2008-11-12 08:22:03 +0000
@@ -1,4 +1,4 @@
-/* Copyright (C) 2003 MySQL AB
+/* Copyright (C) 2003-2008 MySQL AB, Sun Microsystems Inc.
This program is free software; you can redistribute it and/or modify
it under the terms of the GNU General Public License as published by
@@ -14,166 +14,47 @@
Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA */
#include <ndb_global.h>
-#include <ndb_opt_defaults.h>
#include <IPCConfig.hpp>
-#include <NdbOut.hpp>
-#include <NdbHost.h>
-#include <TransporterDefinitions.hpp>
#include <TransporterRegistry.hpp>
-#include <Properties.hpp>
+#include <mgmapi.h>
#include <mgmapi_configuration.hpp>
-#include <mgmapi_config_parameters.h>
-#if defined DEBUG_TRANSPORTER
-#define DEBUG(t) ndbout << __FILE__ << ":" << __LINE__ << ":" << t << endl;
-#else
-#define DEBUG(t)
-#endif
-IPCConfig::IPCConfig(Properties * p)
+/* Return true if node with "nodeId" is a MGM node */
+static bool is_mgmd(Uint32 nodeId,
+ const struct ndb_mgm_configuration & config)
{
- theNoOfRemoteNodes = 0;
- the_ownId = 0;
- if(p != 0)
- props = new Properties(* p);
- else
- props = 0;
-}
-
-
-IPCConfig::~IPCConfig()
-{
- if(props != 0){
- delete props;
- }
-}
-
-int
-IPCConfig::init(){
- Uint32 nodeId;
-
- if(props == 0) return -1;
- if(!props->get("LocalNodeId", &nodeId)) {
- DEBUG( "Did not find local node id." );
- return -1;
- }
- the_ownId = nodeId;
-
- Uint32 noOfConnections;
- if(!props->get("NoOfConnections", &noOfConnections)) {
- DEBUG( "Did not find noOfConnections." );
- return -1;
- }
-
- for(Uint32 i = 0; i<noOfConnections; i++){
- const Properties * tmp;
- Uint32 node1, node2;
-
- if(!props->get("Connection", i, &tmp)) {
- DEBUG( "Did not find Connection." );
- return -1;
- }
- if(!tmp->get("NodeId1", &node1)) {
- DEBUG( "Did not find NodeId1." );
- return -1;
- }
- if(!tmp->get("NodeId2", &node2)) {
- DEBUG( "Did not find NodeId2." );
- return -1;
- }
-
- if(node1 == the_ownId && node2 != the_ownId)
- if(!addRemoteNodeId(node2)) {
- DEBUG( "addRemoteNodeId(node2) failed." );
- return -1;
- }
+ ndb_mgm_configuration_iterator iter(config, CFG_SECTION_NODE);
+ if (iter.find(CFG_NODE_ID, nodeId))
+ abort();
+ Uint32 type;
+ if(iter.get(CFG_TYPE_OF_SECTION, &type))
+ abort();
- if(node1 != the_ownId && node2 == the_ownId)
- if(!addRemoteNodeId(node1)) {
- DEBUG( "addRemoteNodeId(node2) failed." );
- return -1;
- }
- }
- return 0;
+ return (type == NODE_TYPE_MGM);
}
-bool
-IPCConfig::addRemoteNodeId(NodeId nodeId){
- for(int i = 0; i<theNoOfRemoteNodes; i++)
- if(theRemoteNodeIds[i] == nodeId)
- return false;
- theRemoteNodeIds[theNoOfRemoteNodes] = nodeId;
- theNoOfRemoteNodes++;
- return true;
-}
-/**
- * Supply a nodeId,
- * and get next higher node id
- * Returns false if none found
- */
bool
-IPCConfig::getNextRemoteNodeId(NodeId & nodeId) const {
- NodeId returnNode = MAX_NODES + 1;
- for(int i = 0; i<theNoOfRemoteNodes; i++)
- if(theRemoteNodeIds[i] > nodeId){
- if(theRemoteNodeIds[i] < returnNode){
- returnNode = theRemoteNodeIds[i];
- }
- }
- if(returnNode == (MAX_NODES + 1))
- return false;
- nodeId = returnNode;
- return true;
-}
-
-
-Uint32
-IPCConfig::getREPHBFrequency(NodeId id) const {
- const Properties * tmp;
- Uint32 out;
-
- /**
- * Todo: Fix correct heartbeat
- */
- if (!props->get("Node", id, &tmp) ||
- !tmp->get("HeartbeatIntervalRepRep", &out)) {
- DEBUG("Illegal Node or HeartbeatIntervalRepRep in config.");
- out = 10000;
- }
-
- return out;
-}
-
-const char*
-IPCConfig::getNodeType(NodeId id) const {
- const char * out;
- const Properties * tmp;
-
- if (!props->get("Node", id, &tmp) || !tmp->get("Type", &out)) {
- DEBUG("Illegal Node or NodeType in config.");
- out = "Unknown";
- }
-
- return out;
-}
-
-#include <mgmapi.h>
-Uint32
IPCConfig::configureTransporters(Uint32 nodeId,
- const struct ndb_mgm_configuration & config,
- class TransporterRegistry & tr){
- TransporterConfiguration conf;
+ const struct ndb_mgm_configuration & config,
+ class TransporterRegistry & tr)
+{
+ bool result= true;
DBUG_ENTER("IPCConfig::configureTransporters");
- /**
- * Iterate over all MGM's an construct a connectstring
- * create mgm_handle and give it to the Transporter Registry
- */
+
+ if (!is_mgmd(nodeId, config))
{
+
+ /**
+ * Iterate over all MGM's and construct a connectstring
+ * create mgm_handle and give it to the Transporter Registry
+ */
+
const char *separator= "";
BaseString connect_string;
ndb_mgm_configuration_iterator iter(config, CFG_SECTION_NODE);
@@ -198,9 +79,23 @@ IPCConfig::configureTransporters(Uint32
}
}
- Uint32 noOfTransportersCreated= 0;
+
+ /* Remove transporter to nodes that does not exist anymore */
+ for (int i= 1; i < MAX_NODES; i++)
+ {
+ ndb_mgm_configuration_iterator iter(config, CFG_SECTION_NODE);
+ if (tr.get_transporter(i) && iter.find(CFG_NODE_ID, i))
+ {
+ // Transporter exist in TransporterResgistry but not
+ // in configuration
+ ndbout_c("The connection to node %d could not "
+ "be removed at this time", i);
+ result= false; // Need restart
+ }
+ }
+
+ TransporterConfiguration conf;
ndb_mgm_configuration_iterator iter(config, CFG_SECTION_CONNECTION);
-
for(iter.first(); iter.valid(); iter.next()){
bzero(&conf, sizeof(conf));
@@ -234,19 +129,11 @@ IPCConfig::configureTransporters(Uint32
Uint32 nodeIdServer= 0;
if(iter.get(CFG_CONNECTION_NODE_ID_SERVER, &nodeIdServer)) break;
- /*
- We check the node type.
- */
- Uint32 node1type, node2type;
- ndb_mgm_configuration_iterator node1iter(config, CFG_SECTION_NODE);
- ndb_mgm_configuration_iterator node2iter(config, CFG_SECTION_NODE);
- node1iter.find(CFG_NODE_ID,nodeId1);
- node2iter.find(CFG_NODE_ID,nodeId2);
- node1iter.get(CFG_TYPE_OF_SECTION,&node1type);
- node2iter.get(CFG_TYPE_OF_SECTION,&node2type);
-
- if(node1type==NODE_TYPE_MGM || node2type==NODE_TYPE_MGM)
+ if(is_mgmd(nodeId1, config) || is_mgmd(nodeId2, config))
+ {
+ // All connections with MGM uses the mgm port as server
conf.isMgmConnection= true;
+ }
else
conf.isMgmConnection= false;
@@ -286,21 +173,22 @@ IPCConfig::configureTransporters(Uint32
if(iter.get(CFG_SHM_KEY, &conf.shm.shmKey)) break;
if(iter.get(CFG_SHM_BUFFER_MEM, &conf.shm.shmSize)) break;
- Uint32 tmp;
- if(iter.get(CFG_SHM_SIGNUM, &tmp)) break;
- conf.shm.signum= tmp;
-
- if(!tr.createSHMTransporter(&conf)){
- DBUG_PRINT("error", ("Failed to create SHM Transporter from %d to %d",
+ Uint32 signum;
+ if(iter.get(CFG_SHM_SIGNUM, &signum)) break;
+ conf.shm.signum= signum;
+
+ conf.type = tt_SHM_TRANSPORTER;
+
+ if(!tr.configureTransporter(&conf)){
+ DBUG_PRINT("error", ("Failed to configure SHM Transporter "
+ "from %d to %d",
conf.localNodeId, conf.remoteNodeId));
- ndbout << "Failed to create SHM Transporter from: "
- << conf.localNodeId << " to: " << conf.remoteNodeId << endl;
- } else {
- noOfTransportersCreated++;
+ ndbout_c("Failed to configure SHM Transporter to node %d",
+ conf.remoteNodeId);
+ result = false;
}
- DBUG_PRINT("info", ("Created SHM Transporter using shmkey %d, "
+ DBUG_PRINT("info", ("Configured SHM Transporter using shmkey %d, "
"buf size = %d", conf.shm.shmKey, conf.shm.shmSize));
-
break;
case CONNECTION_TYPE_SCI:
@@ -318,13 +206,16 @@ IPCConfig::configureTransporters(Uint32
} else {
conf.sci.nLocalAdapters = 2;
}
- if(!tr.createSCITransporter(&conf)){
- DBUG_PRINT("error", ("Failed to create SCI Transporter from %d to %d",
+ conf.type = tt_SCI_TRANSPORTER;
+ if(!tr.configureTransporter(&conf)){
+ DBUG_PRINT("error", ("Failed to configure SCI Transporter "
+ "from %d to %d",
conf.localNodeId, conf.remoteNodeId));
- ndbout << "Failed to create SCI Transporter from: "
- << conf.localNodeId << " to: " << conf.remoteNodeId << endl;
+ ndbout_c("Failed to configure SCI Transporter to node %d",
+ conf.remoteNodeId);
+ result = false;
} else {
- DBUG_PRINT("info", ("Created SCI Transporter: Adapters = %d, "
+ DBUG_PRINT("info", ("Configured SCI Transporter: Adapters = %d, "
"remote SCI node id %d",
conf.sci.nLocalAdapters, conf.sci.remoteSciNodeId0));
DBUG_PRINT("info", ("Host 1 = %s, Host 2 = %s, sendLimit = %d, "
@@ -336,8 +227,6 @@ IPCConfig::configureTransporters(Uint32
"second remote SCI node id = %d",
conf.sci.remoteSciNodeId1));
}
- noOfTransportersCreated++;
- continue;
}
break;
@@ -357,14 +246,15 @@ IPCConfig::configureTransporters(Uint32
iter.get(CFG_TCP_RCV_BUF_SIZE, &conf.tcp.tcpRcvBufSize);
iter.get(CFG_TCP_MAXSEG_SIZE, &conf.tcp.tcpMaxsegSize);
iter.get(CFG_CONNECTION_OVERLOAD, &conf.tcp.tcpOverloadLimit);
+
+ conf.type = tt_TCP_TRANSPORTER;
- if(!tr.createTCPTransporter(&conf)){
- ndbout << "Failed to create TCP Transporter from: "
- << nodeId << " to: " << remoteNodeId << endl;
- } else {
- noOfTransportersCreated++;
+ if(!tr.configureTransporter(&conf)){
+ ndbout_c("Failed to configure TCP Transporter to node %d",
+ conf.remoteNodeId);
+ result= false;
}
- DBUG_PRINT("info", ("Created TCP Transporter: sendBufferSize = %d, "
+ DBUG_PRINT("info", ("Configured TCP Transporter: sendBufferSize = %d, "
"maxReceiveSize = %d", conf.tcp.sendBufferSize,
conf.tcp.maxReceiveSize));
break;
@@ -375,6 +265,6 @@ IPCConfig::configureTransporters(Uint32
} // switch
} // for
- DBUG_RETURN(noOfTransportersCreated);
+ DBUG_RETURN(result);
}
=== modified file 'storage/ndb/src/common/transporter/SCI_Transporter.cpp'
--- a/storage/ndb/src/common/transporter/SCI_Transporter.cpp 2008-05-29 15:06:11 +0000
+++ b/storage/ndb/src/common/transporter/SCI_Transporter.cpp 2008-11-12 08:17:14 +0000
@@ -99,6 +99,20 @@ SCI_Transporter::SCI_Transporter(Transpo
DBUG_VOID_RETURN;
}
+
+bool
+SCI_Transporter::configure_derived(const TransporterConfiguration* conf)
+{
+ if (conf->sci.sendLimit == (m_PacketSize + 3)/4 &&
+ conf->sci.bufferSize == m_buffersize &&
+ conf->sci.nLocalAdapters == m_adapters &&
+ conf->sci.remoteSciNodeId0 == m_remoteNodes[0] &&
+ conf->sci.remoteSciNodeId1 == m_remoteNodes[1])
+ return true; // No change
+ return false; // Can't reconfigure
+}
+
+
void SCI_Transporter::disconnectImpl()
{
DBUG_ENTER("SCI_Transporter::disconnectImpl");
=== modified file 'storage/ndb/src/common/transporter/SCI_Transporter.hpp'
--- a/storage/ndb/src/common/transporter/SCI_Transporter.hpp 2008-05-29 15:06:11 +0000
+++ b/storage/ndb/src/common/transporter/SCI_Transporter.hpp 2008-11-12 08:17:14 +0000
@@ -144,6 +144,9 @@ private:
* Destructor. Disconnects the transporter.
*/
~SCI_Transporter();
+
+ virtual bool configure_derived(const TransporterConfiguration* conf);
+
bool m_mapped;
bool m_initLocal;
bool m_sciinit;
=== modified file 'storage/ndb/src/common/transporter/SHM_Transporter.cpp'
--- a/storage/ndb/src/common/transporter/SHM_Transporter.cpp 2008-10-27 17:56:57 +0000
+++ b/storage/ndb/src/common/transporter/SHM_Transporter.cpp 2008-11-12 08:17:14 +0000
@@ -64,6 +64,18 @@ SHM_Transporter::SHM_Transporter(Transpo
m_signal_threshold = 4096;
}
+
+bool
+SHM_Transporter::configure_derived(const TransporterConfiguration* conf)
+{
+ if ((key_t)conf->shm.shmKey == shmKey &&
+ (int)conf->shm.shmSize == shmSize &&
+ conf->shm.signum == g_ndb_shm_signum)
+ return true; // No change
+ return false; // Can't reconfigure
+}
+
+
SHM_Transporter::~SHM_Transporter(){
doDisconnect();
}
@@ -131,19 +143,19 @@ SHM_Transporter::setupBuffers(){
#ifdef DEBUG_TRANSPORTER
printf("-- (%d - %d) - Server -\n", localNodeId, remoteNodeId);
- printf("Reader at: %d (%p)\n", startOfBuf1 - shmBuf, startOfBuf1);
- printf("sharedReadIndex1 at %d (%p) = %d\n",
+ printf("Reader at: %ld (%p)\n", startOfBuf1 - shmBuf, startOfBuf1);
+ printf("sharedReadIndex1 at %ld (%p) = %d\n",
(char*)sharedReadIndex1-shmBuf,
sharedReadIndex1, *sharedReadIndex1);
- printf("sharedWriteIndex1 at %d (%p) = %d\n",
+ printf("sharedWriteIndex1 at %ld (%p) = %d\n",
(char*)sharedWriteIndex1-shmBuf,
sharedWriteIndex1, *sharedWriteIndex1);
- printf("Writer at: %d (%p)\n", startOfBuf2 - shmBuf, startOfBuf2);
- printf("sharedReadIndex2 at %d (%p) = %d\n",
+ printf("Writer at: %ld (%p)\n", startOfBuf2 - shmBuf, startOfBuf2);
+ printf("sharedReadIndex2 at %ld (%p) = %d\n",
(char*)sharedReadIndex2-shmBuf,
sharedReadIndex2, *sharedReadIndex2);
- printf("sharedWriteIndex2 at %d (%p) = %d\n",
+ printf("sharedWriteIndex2 at %ld (%p) = %d\n",
(char*)sharedWriteIndex2-shmBuf,
sharedWriteIndex2, *sharedWriteIndex2);
@@ -171,19 +183,19 @@ SHM_Transporter::setupBuffers(){
* clientStatusFlag = 1;
#ifdef DEBUG_TRANSPORTER
printf("-- (%d - %d) - Client -\n", localNodeId, remoteNodeId);
- printf("Reader at: %d (%p)\n", startOfBuf2 - shmBuf, startOfBuf2);
- printf("sharedReadIndex2 at %d (%p) = %d\n",
+ printf("Reader at: %ld (%p)\n", startOfBuf2 - shmBuf, startOfBuf2);
+ printf("sharedReadIndex2 at %ld (%p) = %d\n",
(char*)sharedReadIndex2-shmBuf,
sharedReadIndex2, *sharedReadIndex2);
- printf("sharedWriteIndex2 at %d (%p) = %d\n",
+ printf("sharedWriteIndex2 at %ld (%p) = %d\n",
(char*)sharedWriteIndex2-shmBuf,
sharedWriteIndex2, *sharedWriteIndex2);
- printf("Writer at: %d (%p)\n", startOfBuf1 - shmBuf, startOfBuf1);
- printf("sharedReadIndex1 at %d (%p) = %d\n",
+ printf("Writer at: %ld (%p)\n", startOfBuf1 - shmBuf, startOfBuf1);
+ printf("sharedReadIndex1 at %ld (%p) = %d\n",
(char*)sharedReadIndex1-shmBuf,
sharedReadIndex1, *sharedReadIndex1);
- printf("sharedWriteIndex1 at %d (%p) = %d\n",
+ printf("sharedWriteIndex1 at %ld (%p) = %d\n",
(char*)sharedWriteIndex1-shmBuf,
sharedWriteIndex1, *sharedWriteIndex1);
=== modified file 'storage/ndb/src/common/transporter/SHM_Transporter.hpp'
--- a/storage/ndb/src/common/transporter/SHM_Transporter.hpp 2008-10-27 17:56:57 +0000
+++ b/storage/ndb/src/common/transporter/SHM_Transporter.hpp 2008-11-12 08:17:14 +0000
@@ -49,6 +49,8 @@ public:
*/
virtual ~SHM_Transporter();
+ virtual bool configure_derived(const TransporterConfiguration* conf);
+
/**
* Do initialization
*/
=== modified file 'storage/ndb/src/common/transporter/TCP_Transporter.cpp'
--- a/storage/ndb/src/common/transporter/TCP_Transporter.cpp 2008-10-27 17:56:57 +0000
+++ b/storage/ndb/src/common/transporter/TCP_Transporter.cpp 2008-11-12 13:01:03 +0000
@@ -69,6 +69,16 @@ setIf(int& ref, Uint32 val, Uint32 def)
ref = def;
}
+
+static
+Uint32 overload_limit(const TransporterConfiguration* conf)
+{
+ return (conf->tcp.tcpOverloadLimit ?
+ conf->tcp.tcpOverloadLimit :
+ conf->tcp.sendBufferSize*4/5);
+}
+
+
TCP_Transporter::TCP_Transporter(TransporterRegistry &t_reg,
const TransporterConfiguration* conf)
:
@@ -99,9 +109,24 @@ TCP_Transporter::TCP_Transporter(Transpo
setIf(sockOptSndBufSize, conf->tcp.tcpSndBufSize, 71540);
setIf(sockOptTcpMaxSeg, conf->tcp.tcpMaxsegSize, 0);
- m_overload_limit = conf->tcp.tcpOverloadLimit ?
- conf->tcp.tcpOverloadLimit : conf->tcp.sendBufferSize*4/5;
+ m_overload_limit = overload_limit(conf);
}
+
+
+bool
+TCP_Transporter::configure_derived(const TransporterConfiguration* conf)
+{
+ if (conf->tcp.sendBufferSize == m_max_send_buffer &&
+ conf->tcp.maxReceiveSize == maxReceiveSize &&
+ (int)conf->tcp.tcpSndBufSize == sockOptSndBufSize &&
+ (int)conf->tcp.tcpRcvBufSize == sockOptRcvBufSize &&
+ (int)conf->tcp.tcpMaxsegSize == sockOptTcpMaxSeg &&
+ overload_limit(conf) == m_overload_limit)
+ return true; // No change
+ndbout_c("configure_derived, can't reconfigure");
+ return false; // Can't reconfigure
+}
+
TCP_Transporter::~TCP_Transporter() {
=== modified file 'storage/ndb/src/common/transporter/TCP_Transporter.hpp'
--- a/storage/ndb/src/common/transporter/TCP_Transporter.hpp 2008-10-27 17:56:57 +0000
+++ b/storage/ndb/src/common/transporter/TCP_Transporter.hpp 2008-11-12 08:17:14 +0000
@@ -45,9 +45,11 @@ class TCP_Transporter : public Transport
private:
// Initialize member variables
TCP_Transporter(TransporterRegistry&, const TransporterConfiguration* conf);
-
+
// Disconnect, delete send buffers and receive buffer
virtual ~TCP_Transporter();
+
+ virtual bool configure_derived(const TransporterConfiguration* conf);
/**
* Allocate buffers for sending and receiving
=== modified file 'storage/ndb/src/common/transporter/Transporter.cpp'
--- a/storage/ndb/src/common/transporter/Transporter.cpp 2008-10-27 17:56:57 +0000
+++ b/storage/ndb/src/common/transporter/Transporter.cpp 2008-11-12 08:17:14 +0000
@@ -102,9 +102,28 @@ Transporter::Transporter(TransporterRegi
}
Transporter::~Transporter(){
- if (m_socket_client)
- delete m_socket_client;
+ delete m_socket_client;
}
+
+
+bool
+Transporter::configure(const TransporterConfiguration* conf)
+{
+ if (configure_derived(conf) &&
+ conf->s_port == m_s_port &&
+ strcmp(conf->remoteHostName, remoteHostName) == 0 &&
+ strcmp(conf->localHostName, localHostName) == 0 &&
+ conf->remoteNodeId == remoteNodeId &&
+ conf->localNodeId == localNodeId &&
+ (conf->serverNodeId == conf->localNodeId) == isServer &&
+ conf->checksum == checksumUsed &&
+ conf->signalId == signalIdUsed &&
+ conf->isMgmConnection == isMgmConnection &&
+ conf->type == m_type)
+ return true; // No change
+ return false; // Can't reconfigure
+}
+
bool
Transporter::connect_server(NDB_SOCKET_TYPE sockfd) {
=== modified file 'storage/ndb/src/common/transporter/Transporter.hpp'
--- a/storage/ndb/src/common/transporter/Transporter.hpp 2008-10-27 17:56:57 +0000
+++ b/storage/ndb/src/common/transporter/Transporter.hpp 2008-11-12 08:17:14 +0000
@@ -116,6 +116,9 @@ protected:
bool signalId,
Uint32 max_send_buffer);
+ virtual bool configure(const TransporterConfiguration* conf);
+ virtual bool configure_derived(const TransporterConfiguration* conf) = 0;
+
/**
* Blocking, for max timeOut milli seconds
* Returns true if connect succeded
=== modified file 'storage/ndb/src/common/transporter/TransporterRegistry.cpp'
--- a/storage/ndb/src/common/transporter/TransporterRegistry.cpp 2008-11-10 09:28:28 +0000
+++ b/storage/ndb/src/common/transporter/TransporterRegistry.cpp 2008-11-12 08:26:18 +0000
@@ -76,6 +76,7 @@ TransporterRegistry::TransporterRegistry
unsigned _maxTransporters,
unsigned sizeOfLongSignalMemory) :
m_mgm_handle(0),
+ localNodeId(0),
m_transp_count(0),
m_use_default_send_buffer(use_default_send_buffer),
m_send_buffers(0), m_page_freelist(0), m_send_buffer_memory(0),
@@ -83,7 +84,6 @@ TransporterRegistry::TransporterRegistry
{
DBUG_ENTER("TransporterRegistry::TransporterRegistry");
- nodeIdSpecified = false;
maxTransporters = _maxTransporters;
sendCounter = 1;
@@ -154,6 +154,19 @@ TransporterRegistry::allocate_send_buffe
if (!m_use_default_send_buffer)
return;
+ if (total_send_buffer == 0)
+ total_send_buffer = get_total_max_send_buffer();
+
+ if (m_send_buffers)
+ {
+ /* Send buffers already allocated -> resize the buffer pages */
+ assert(m_send_buffer_memory);
+
+ // TODO resize send buffer pages
+
+ return;
+ }
+
/* Initialize transporter send buffers (initially empty). */
m_send_buffers = new SendBuffer[maxTransporters];
for (unsigned i = 0; i < maxTransporters; i++)
@@ -264,11 +277,13 @@ TransporterRegistry::disconnectAll(){
bool
TransporterRegistry::init(NodeId nodeId) {
DBUG_ENTER("TransporterRegistry::init");
- nodeIdSpecified = true;
+ assert(localNodeId == 0 ||
+ localNodeId == nodeId);
+
localNodeId = nodeId;
-
+
DEBUG("TransporterRegistry started node: " << localNodeId);
-
+
DBUG_RETURN(true);
}
@@ -361,20 +376,47 @@ TransporterRegistry::connect_server(NDB_
DBUG_RETURN(res);
}
+
+bool
+TransporterRegistry::configureTransporter(TransporterConfiguration *config)
+{
+ NodeId remoteNodeId = config->remoteNodeId;
+
+ assert(localNodeId);
+ assert(config->localNodeId == localNodeId);
+
+ if (remoteNodeId >= maxTransporters)
+ return false;
+
+ Transporter* t = theTransporters[remoteNodeId];
+ if(t != NULL)
+ {
+ // Transporter already exist, try to reconfigure it
+ return t->configure(config);
+ }
+
+ DEBUG("Configuring transporter from " << localNodeId
+ << " to " << remoteNodeId);
+
+ switch (config->type){
+ case tt_TCP_TRANSPORTER:
+ return createTCPTransporter(config);
+ case tt_SHM_TRANSPORTER:
+ return createSHMTransporter(config);
+ case tt_SCI_TRANSPORTER:
+ return createSCITransporter(config);
+ default:
+ abort();
+ break;
+ }
+ return false;
+}
+
+
bool
TransporterRegistry::createTCPTransporter(TransporterConfiguration *config) {
#ifdef NDB_TCP_TRANSPORTER
- if(!nodeIdSpecified){
- init(config->localNodeId);
- }
-
- if(config->localNodeId != localNodeId)
- return false;
-
- if(theTransporters[config->remoteNodeId] != NULL)
- return false;
-
TCP_Transporter * t = new TCP_Transporter(*this, config);
if (t == NULL)
@@ -405,17 +447,7 @@ TransporterRegistry::createSCITransporte
if(!SCI_Transporter::initSCI())
abort();
-
- if(!nodeIdSpecified){
- init(config->localNodeId);
- }
-
- if(config->localNodeId != localNodeId)
- return false;
-
- if(theTransporters[config->remoteNodeId] != NULL)
- return false;
-
+
SCI_Transporter * t = new SCI_Transporter(*this,
config->localHostName,
config->remoteHostName,
@@ -457,13 +489,7 @@ bool
TransporterRegistry::createSHMTransporter(TransporterConfiguration *config) {
DBUG_ENTER("TransporterRegistry::createTransporter SHM");
#ifdef NDB_SHM_TRANSPORTER
- if(!nodeIdSpecified){
- init(config->localNodeId);
- }
-
- if(config->localNodeId != localNodeId)
- return false;
-
+
if (!g_ndb_shm_signum) {
g_ndb_shm_signum= config->shm.signum;
DBUG_PRINT("info",("Block signum %d",g_ndb_shm_signum));
@@ -476,9 +502,6 @@ TransporterRegistry::createSHMTransporte
if(config->shm.signum != g_ndb_shm_signum)
return false;
-
- if(theTransporters[config->remoteNodeId] != NULL)
- return false;
SHM_Transporter * t = new SHM_Transporter(*this,
config->localHostName,
@@ -1270,8 +1293,12 @@ TransporterRegistry::ioState(NodeId node
void
TransporterRegistry::setIOState(NodeId nodeId, IOState state) {
+ if (ioStates[nodeId] == state)
+ return;
+
DEBUG("TransporterRegistry::setIOState("
- << nodeId << ", " << state << ")");
+ << nodeId << ", " << state << ")");
+
ioStates[nodeId] = state;
}
@@ -1609,9 +1636,10 @@ bool
TransporterRegistry::start_service(SocketServer& socket_server)
{
DBUG_ENTER("TransporterRegistry::start_service");
- if (m_transporter_interface.size() > 0 && !nodeIdSpecified)
+ if (m_transporter_interface.size() > 0 &&
+ localNodeId == 0)
{
- g_eventLogger->error("TransporterRegistry::startReceiving: localNodeId not specified");
+ g_eventLogger->error("INTERNAL ERROR: not initialized");
DBUG_RETURN(false);
}
@@ -1723,6 +1751,7 @@ NdbOut & operator <<(NdbOut & out, Signa
Transporter*
TransporterRegistry::get_transporter(NodeId nodeId) {
+ assert(nodeId < maxTransporters);
return theTransporters[nodeId];
}
@@ -1772,7 +1801,7 @@ NDB_SOCKET_TYPE TransporterRegistry::con
for(unsigned int i=0;i < m_transporter_interface.size();i++)
if (m_transporter_interface[i].m_s_service_port < 0
&& ndb_mgm_set_connection_int_parameter(*h,
- get_localNodeId(),
+ localNodeId,
m_transporter_interface[i].m_remote_nodeId,
CFG_CONNECTION_SERVER_PORT,
m_transporter_interface[i].m_s_service_port,
@@ -2077,5 +2106,36 @@ TransporterRegistry::forceSend(NodeId no
else
return false;
}
+
+
+void
+TransporterRegistry::print_transporters(const char* where, NdbOut& out)
+{
+ out << where << " >>" << endl;
+
+ for(unsigned i = 0; i < maxTransporters; i++){
+ if(theTransporters[i] == NULL)
+ continue;
+
+ const NodeId remoteNodeId = theTransporters[i]->getRemoteNodeId();
+
+ out << i << " "
+ << getPerformStateString(remoteNodeId) << " to node: "
+ << remoteNodeId << " at "
+ << inet_ntoa(get_connect_address(remoteNodeId)) << endl;
+ }
+
+ out << "<<" << endl;
+
+ for (size_t i= 0; i < m_transporter_interface.size(); i++){
+ Transporter_interface tf= m_transporter_interface[i];
+
+ out << i
+ << " remote node: " << tf.m_remote_nodeId
+ << " port: " << tf.m_s_service_port
+ << " interface: " << tf.m_interface << endl;
+ }
+}
+
template class Vector<TransporterRegistry::Transporter_interface>;
=== modified file 'storage/ndb/src/common/util/ConfigValues.cpp'
--- a/storage/ndb/src/common/util/ConfigValues.cpp 2008-10-08 13:51:23 +0000
+++ b/storage/ndb/src/common/util/ConfigValues.cpp 2008-11-13 08:02:28 +0000
@@ -307,7 +307,10 @@ ConfigValuesFactory::ConfigValuesFactory
ConfigValuesFactory::~ConfigValuesFactory()
{
if(m_cfg)
+ {
+ m_cfg->~ConfigValues();
free(m_cfg);
+ }
}
ConfigValues *
=== modified file 'storage/ndb/src/kernel/blocks/dbdict/Dbdict.cpp'
--- a/storage/ndb/src/kernel/blocks/dbdict/Dbdict.cpp 2008-11-07 14:50:21 +0000
+++ b/storage/ndb/src/kernel/blocks/dbdict/Dbdict.cpp 2008-11-11 07:36:01 +0000
@@ -3304,6 +3304,7 @@ Dbdict::restart_checkSchemaStatusComplet
return;
}
+ ndbrequire(c_restartRecord.m_op_cnt == 0);
ndbrequire(c_nodeRestart || c_initialNodeRestart);
activateIndexes(signal, 0);
return;
@@ -3478,8 +3479,10 @@ void Dbdict::checkSchemaStatus(Signal* s
continue;
if (masterState != SchemaFile::SF_IN_USE)
+ {
+ ownEntry->init();
continue;
-
+ }
/**
* handle table(index) special as DIH has already copied
* table (using COPY_TABREQ)
@@ -3511,6 +3514,9 @@ void Dbdict::checkSchemaStatus(Signal* s
else
{
jam();
+
+ c_restartRecord.m_op_cnt = 0;
+
TxHandlePtr tx_ptr;
c_txHandleHash.getPtr(tx_ptr, c_restartRecord.m_tx_ptr_i);
@@ -3640,6 +3646,7 @@ Dbdict::restart_fromEndTrans(Signal* sig
releaseTxHandle(tx_ptr);
+ ndbrequire(c_restartRecord.m_op_cnt == 0);
c_restartRecord.activeTable++;
seizeTxHandle(tx_ptr);
@@ -3705,10 +3712,35 @@ Dbdict::restartNextPass(Signal* signal)
return;
}
}
+ else if (c_restartRecord.m_tx_ptr_i != RNIL)
+ {
+ /**
+ * Complete last trans
+ */
+ jam();
+
+ c_restartRecord.m_pass--;
+ c_restartRecord.m_op_cnt = 0;
+
+ Ptr<TxHandle> tx_ptr;
+ c_txHandleHash.getPtr(tx_ptr, c_restartRecord.m_tx_ptr_i);
+
+ Callback c = {
+ safe_cast(&Dbdict::restartEndPass_fromEndTrans),
+ tx_ptr.p->tx_key
+ };
+ tx_ptr.p->m_callback = c;
+
+ Uint32 flags = 0;
+ endSchemaTrans(signal, tx_ptr, flags);
+ return;
+ }
else
{
jam();
+ ndbrequire(c_restartRecord.m_op_cnt == 0);
+
/**
* Write schema file at-end of checkSchemaStatus
*/
@@ -4084,11 +4116,12 @@ void Dbdict::execAPI_FAILREQ(Signal* sig
void Dbdict::execNODE_FAILREP(Signal* signal)
{
jamEntry();
- NodeFailRep * const nodeFail = (NodeFailRep *)&signal->theData[0];
+ NodeFailRep nodeFailRep = *(NodeFailRep *)&signal->theData[0];
+ NodeFailRep * nodeFail = &nodeFailRep;
NodeRecordPtr ownNodePtr;
c_nodes.getPtr(ownNodePtr, getOwnNodeId());
- c_failureNr = nodeFail->failNo;
+ c_failureNr = nodeFail->failNo;
const Uint32 numberOfFailedNodes = nodeFail->noOfNodes;
const bool masterFailed = (c_masterNodeId != nodeFail->masterNodeId);
c_masterNodeId = nodeFail->masterNodeId;
@@ -4110,20 +4143,19 @@ void Dbdict::execNODE_FAILREP(Signal* si
*/
jam();
ownNodePtr.p->nodeState = NodeRecord::NDB_MASTER_TAKEOVER;
- ownNodePtr.p->nodeFailRep = *nodeFail;
+ ownNodePtr.p->nodeFailRep = nodeFailRep;
infoEvent("Node %u taking over as DICT master", c_masterNodeId);
handle_master_takeover(signal);
return;
}
- send_nf_complete_rep(signal);
+ send_nf_complete_rep(signal, &nodeFailRep);
return;
}//execNODE_FAILREP()
-void Dbdict::send_nf_complete_rep(Signal* signal)
+void Dbdict::send_nf_complete_rep(Signal* signal, const NodeFailRep* nodeFail)
{
jam();
- NodeFailRep * const nodeFail = (NodeFailRep *)&signal->theData[0];
Uint32 theFailedNodes[NdbNodeBitmask::Size];
memcpy(theFailedNodes, nodeFail->theNodes, sizeof(theFailedNodes));
NdbNodeBitmask tmp;
@@ -9093,8 +9125,8 @@ void Dbdict::execGET_TABINFOREQ(Signal*
jam();
// see own trans always
}
- else if (refToBlock(req->senderRef) != DBUTIL && /** XXX cheat */
- refToBlock(req->senderRef) != SUMA)
+ else if (refToBlock(req->senderRef) != DBUTIL && /** XXX cheat */
+ refToBlock(req->senderRef) != SUMA)
{
Uint32 err;
if ((err = check_read_obj(objEntry)))
@@ -17471,9 +17503,7 @@ void Dbdict::check_takeover_replies(Sign
No slave found any pending transactions, we are done
*/
jam();
- memcpy(signal->theData, &masterNodePtr.p->nodeFailRep,
- sizeof(masterNodePtr.p->nodeFailRep));
- send_nf_complete_rep(signal);
+ send_nf_complete_rep(signal, &masterNodePtr.p->nodeFailRep);
return;
}
/*
@@ -19213,6 +19243,17 @@ Dbdict::createFilegroup_parse(Signal* si
obj_ptr.p->m_type = fg.FilegroupType;
obj_ptr.p->m_ref_count = 0;
+ if (master)
+ {
+ jam();
+ releaseSections(handle);
+ SimplePropertiesSectionWriter w(*this);
+ packFilegroupIntoPages(w, fg_ptr, 0, 0);
+ w.getPtr(objInfoPtr);
+ handle.m_ptr[0] = objInfoPtr;
+ handle.m_cnt = 1;
+ }
+
{
SchemaFile::TableEntry te; te.init();
te.m_tableState = SchemaFile::SF_CREATE;
@@ -25141,9 +25182,7 @@ send_node_fail_rep:
/*
Continue with NODE_FAILREP
*/
- NodeFailRep * const nodeFailRep = (NodeFailRep *)&signal->theData[0];
- *nodeFailRep = ownNodePtr.p->nodeFailRep;
- send_nf_complete_rep(signal);
+ send_nf_complete_rep(signal, &ownNodePtr.p->nodeFailRep);
}
}
=== modified file 'storage/ndb/src/kernel/blocks/dbdict/Dbdict.hpp'
--- a/storage/ndb/src/kernel/blocks/dbdict/Dbdict.hpp 2008-10-29 14:25:59 +0000
+++ b/storage/ndb/src/kernel/blocks/dbdict/Dbdict.hpp 2008-11-11 07:36:01 +0000
@@ -729,7 +729,7 @@ private:
void execTC_SCHVERCONF(Signal* signal);
void execNODE_FAILREP(Signal* signal);
- void send_nf_complete_rep(Signal* signal);
+ void send_nf_complete_rep(Signal* signal, const NodeFailRep*);
void execINCL_NODEREQ(Signal* signal);
void execAPI_FAILREQ(Signal* signal);
=== modified file 'storage/ndb/src/kernel/blocks/dbinfo/Dbinfo.cpp'
--- a/storage/ndb/src/kernel/blocks/dbinfo/Dbinfo.cpp 2008-10-08 14:08:34 +0000
+++ b/storage/ndb/src/kernel/blocks/dbinfo/Dbinfo.cpp 2008-11-10 11:44:02 +0000
@@ -65,7 +65,8 @@ void Dbinfo::execSTTOR(Signal *signal)
const Uint32 startphase = signal->theData[1];
- if (startphase == 3) {
+ if (startphase == 3)
+ {
jam();
signal->theData[0] = reference();
sendSignal(NDBCNTR_REF, GSN_READ_NODESREQ, signal, 1, JBB);
@@ -126,7 +127,7 @@ void Dbinfo::execDBINFO_TRANSID_AI(Signa
char *row= rowbuf;
copy((Uint32*)rowbuf, ptr);
- Uint32 rowsz= ptr.sz;
+ //Uint32 rowsz= ptr.sz;
int len;
for(int i=0; i<ncols; i++)
@@ -227,9 +228,9 @@ void Dbinfo::execDBINFO_SCANREQ(Signal *
const Uint32 tableId= req.tableId;
const Uint32 senderRef= req.senderRef;
const Uint32 apiTxnId= req.apiTxnId;
- const Uint32 colBitmapLo= req.colBitmapLo;
- const Uint32 colBitmapHi= req.colBitmapHi;
- const Uint32 requestInfo= req.requestInfo;
+ //const Uint32 colBitmapLo= req.colBitmapLo;
+ //const Uint32 colBitmapHi= req.colBitmapHi;
+ //const Uint32 requestInfo= req.requestInfo;
Uint32 i;
int j;
@@ -253,7 +254,10 @@ void Dbinfo::execDBINFO_SCANREQ(Signal *
dbinfo_ratelimit_init(&rl, &req);
if(!(req.requestInfo & DbinfoScanReq::StartScan))
- startid= req.cur_item;
+ {
+ jam();
+ startid= req.cursor.cur_item;
+ }
for(i=startid;dbinfo_ratelimit_continue(&rl) && i<number_ndbinfo_tables;i++)
{
@@ -277,6 +281,7 @@ void Dbinfo::execDBINFO_SCANREQ(Signal *
}
else
{
+ jam();
DbinfoScanConf *conf= (DbinfoScanConf*)signal->getDataPtrSend();
conf->tableId= req.tableId;
conf->senderRef= req.senderRef;
@@ -296,8 +301,9 @@ void Dbinfo::execDBINFO_SCANREQ(Signal *
if(!(req.requestInfo & DbinfoScanReq::StartScan))
{
- startTableId= req.cur_item >> 8;
- startColumnId= req.cur_item & 0xFF;
+ jam();
+ startTableId= req.cursor.cur_item >> 8;
+ startColumnId= req.cursor.cur_item & 0xFF;
}
struct ndbinfo_table *t;
@@ -313,6 +319,7 @@ void Dbinfo::execDBINFO_SCANREQ(Signal *
for(j=startColumnId; continue_sending && j<t->ncols;j++)
{
+ jam();
dbinfo_write_row_init(&r, buf, sizeof(buf));
dbinfo_write_row_column(&r, (char*)&i, sizeof(i));
dbinfo_write_row_column(&r, (char*)&j, sizeof(j));
@@ -334,6 +341,7 @@ void Dbinfo::execDBINFO_SCANREQ(Signal *
}
else
{
+ jam();
DbinfoScanConf *conf= (DbinfoScanConf*)signal->getDataPtrSend();
conf->tableId= req.tableId;
conf->senderRef= req.senderRef;
@@ -375,17 +383,17 @@ void Dbinfo::execDBINFO_SCANREQ(Signal *
DbinfoScanReq *oreq= (DbinfoScanReq*)signal->getDataPtrSend();
memcpy(signal->getDataPtrSend(),&ireq,DbinfoScanReq::SignalLength*sizeof(Uint32));
- oreq->cur_requestInfo= 0;
- oreq->cur_node= 0;
- oreq->cur_block= DBINFO;
- oreq->cur_item= 0;
-
- for(oreq->cur_node= 0;
- !c_aliveNodes.get(oreq->cur_node);
- oreq->cur_node++)
+ oreq->cursor.cur_requestInfo= 0;
+ oreq->cursor.cur_node= 0;
+ oreq->cursor.cur_block= DBINFO;
+ oreq->cursor.cur_item= 0;
+
+ for(oreq->cursor.cur_node= 0;
+ !c_aliveNodes.get(oreq->cursor.cur_node);
+ oreq->cursor.cur_node++)
;
- sendSignal(numberToRef(DBINFO,oreq->cur_node), GSN_DBINFO_SCANREQ,
+ sendSignal(numberToRef(DBINFO,oreq->cursor.cur_node), GSN_DBINFO_SCANREQ,
signal, DbinfoScanReq::SignalLengthWithCursor, JBB);
}
else
@@ -393,10 +401,11 @@ void Dbinfo::execDBINFO_SCANREQ(Signal *
/**
* We have a cursor, so we need to continue scanning.
*/
+ jam();
int next_dbinfo_block= 0;
- if(req.cur_block != DBINFO)
+ if(req.cursor.cur_block != DBINFO)
{
- while(dbinfo_blocks[next_dbinfo_block] != req.cur_block
+ while(dbinfo_blocks[next_dbinfo_block] != req.cursor.cur_block
&& dbinfo_blocks[next_dbinfo_block] != 0)
{
jam();
@@ -409,9 +418,9 @@ void Dbinfo::execDBINFO_SCANREQ(Signal *
memcpy(signal->getDataPtrSend(),&ireq,signal->getLength()*sizeof(Uint32));
- oreq->cur_block= dbinfo_blocks[next_dbinfo_block];
+ oreq->cursor.cur_block= dbinfo_blocks[next_dbinfo_block];
- sendSignal(numberToRef(oreq->cur_block,oreq->cur_node),
+ sendSignal(numberToRef(oreq->cursor.cur_block, oreq->cursor.cur_node),
GSN_DBINFO_SCANREQ,
signal, signal->getLength(), JBB);
}
@@ -428,8 +437,8 @@ void Dbinfo::execDBINFO_SCANCONF(Signal
const Uint32 tableId= conf.tableId;
const Uint32 senderRef= conf.senderRef;
const Uint32 apiTxnId= conf.apiTxnId;
- const Uint32 colBitmapLo= conf.colBitmapLo;
- const Uint32 colBitmapHi= conf.colBitmapHi;
+ //const Uint32 colBitmapLo= conf.colBitmapLo;
+ //const Uint32 colBitmapHi= conf.colBitmapHi;
DbinfoScanReq *oreq= (DbinfoScanReq*)signal->getDataPtrSend();
@@ -442,7 +451,7 @@ void Dbinfo::execDBINFO_SCANCONF(Signal
*/
jam();
oreq->requestInfo &= ~(DbinfoScanReq::StartScan);
- sendSignal(numberToRef(oreq->cur_block,oreq->cur_node),
+ sendSignal(numberToRef(oreq->cursor.cur_block, oreq->cursor.cur_node),
GSN_DBINFO_SCANREQ,
signal, signal->getLength(), JBB);
return;
@@ -457,9 +466,10 @@ void Dbinfo::execDBINFO_SCANCONF(Signal
return;
}
- if(conf.cur_block != DBINFO)
+ if(conf.cursor.cur_block != DBINFO)
{
- while(dbinfo_blocks[next_dbinfo_block] != conf.cur_block
+ jam();
+ while(dbinfo_blocks[next_dbinfo_block] != conf.cursor.cur_block
&& dbinfo_blocks[next_dbinfo_block] != 0)
{
jam();
@@ -471,21 +481,23 @@ void Dbinfo::execDBINFO_SCANCONF(Signal
if(dbinfo_blocks[next_dbinfo_block]!=0)
{
- oreq->cur_block= dbinfo_blocks[next_dbinfo_block];
+ jam();
+ oreq->cursor.cur_block= dbinfo_blocks[next_dbinfo_block];
}
else
{
- for(oreq->cur_node++;
- !c_aliveNodes.get(oreq->cur_node)
- && oreq->cur_node < MAX_NDB_NODES;
- oreq->cur_node++)
+ for(oreq->cursor.cur_node++;
+ !c_aliveNodes.get(oreq->cursor.cur_node)
+ && oreq->cursor.cur_node < MAX_NDB_NODES;
+ oreq->cursor.cur_node++)
;
- if(oreq->cur_node < MAX_NDB_NODES)
+ if(oreq->cursor.cur_node < MAX_NDB_NODES)
{
- oreq->cur_requestInfo= 0;
- oreq->cur_block= DBINFO;
- oreq->cur_item= 0;
+ jam();
+ oreq->cursor.cur_requestInfo= 0;
+ oreq->cursor.cur_block= DBINFO;
+ oreq->cursor.cur_item= 0;
}
else
{
@@ -499,7 +511,7 @@ void Dbinfo::execDBINFO_SCANCONF(Signal
}
}
- sendSignal(numberToRef(oreq->cur_block,oreq->cur_node),
+ sendSignal(numberToRef(oreq->cursor.cur_block, oreq->cursor.cur_node),
GSN_DBINFO_SCANREQ,
signal, signal->getLength(), JBB);
}
=== modified file 'storage/ndb/src/kernel/blocks/dbinfo/ndbinfo_tables.h'
--- a/storage/ndb/src/kernel/blocks/dbinfo/ndbinfo_tables.h 2008-10-08 10:52:32 +0000
+++ b/storage/ndb/src/kernel/blocks/dbinfo/ndbinfo_tables.h 2008-11-10 11:44:02 +0000
@@ -24,7 +24,7 @@ extern "C" {
/** Reserved for DBINFO only */
DECLARE_NDBINFO_TABLE(ndbinfo_TABLES,3)
- = {{"TABLES",3,0},
+ = { "TABLES", 3, 0,
{
{"TABLE_ID", NDBINFO_TYPE_NUMBER},
{"TABLE_NAME",NDBINFO_TYPE_STRING},
@@ -33,7 +33,7 @@ DECLARE_NDBINFO_TABLE(ndbinfo_TABLES,3)
/** Reserved for DBINFO only */
DECLARE_NDBINFO_TABLE(ndbinfo_COLUMNS,4)
- = {{"COLUMNS",4,0},
+ = { "COLUMNS", 4, 0,
{
{"TABLE_ID", NDBINFO_TYPE_NUMBER},
{"COLUMN_ID", NDBINFO_TYPE_NUMBER},
@@ -42,7 +42,7 @@ DECLARE_NDBINFO_TABLE(ndbinfo_COLUMNS,4)
}};
DECLARE_NDBINFO_TABLE(ndbinfo_MEMUSAGE,6)
- = {{"MEMUSAGE",6,0},
+ = { "MEMUSAGE", 6, 0,
{
{"RESOURCE_NAME", NDBINFO_TYPE_STRING},
{"NODE_ID", NDBINFO_TYPE_NUMBER},
@@ -53,7 +53,7 @@ DECLARE_NDBINFO_TABLE(ndbinfo_MEMUSAGE,6
}};
DECLARE_NDBINFO_TABLE(ndbinfo_LOGDESTINATION,5) =
-{{"LOGDESTINATION",5,0},
+{ "LOGDESTINATION", 5, 0,
{
{"NODE_ID",NDBINFO_TYPE_NUMBER},
{"TYPE",NDBINFO_TYPE_STRING},
@@ -64,7 +64,7 @@ DECLARE_NDBINFO_TABLE(ndbinfo_LOGDESTINA
};
DECLARE_NDBINFO_TABLE(ndbinfo_BACKUP_RECORDS,11)
-= {{"BACKUP_RECORDS",11,0},
+= { "BACKUP_RECORDS", 11, 0,
{
{"NODE_ID", NDBINFO_TYPE_NUMBER},
{"BACKUP_RECORD", NDBINFO_TYPE_NUMBER},
@@ -81,7 +81,7 @@ DECLARE_NDBINFO_TABLE(ndbinfo_BACKUP_REC
};
DECLARE_NDBINFO_TABLE(ndbinfo_BACKUP_PARAMETERS,14)
-= {{"BACKUP_PARAMETERS",14,0},
+= { "BACKUP_PARAMETERS", 14, 0,
{
{"NODE_ID", NDBINFO_TYPE_NUMBER},
{"CURRENT_DISK_WRITE_SPEED", NDBINFO_TYPE_NUMBER},
@@ -101,7 +101,7 @@ DECLARE_NDBINFO_TABLE(ndbinfo_BACKUP_PAR
};
DECLARE_NDBINFO_TABLE(ndbinfo_POOLS,5)
-= {{"POOLS",5 ,0},
+= { "POOLS", 5, 0,
{
{"NODE_ID", NDBINFO_TYPE_NUMBER},
{"BLOCK", NDBINFO_TYPE_STRING},
@@ -113,7 +113,7 @@ DECLARE_NDBINFO_TABLE(ndbinfo_POOLS,5)
static Uint32 number_ndbinfo_tables= 7;
-#define DBINFOTBL(x) (struct ndbinfo_table*)&(x).t
+#define DBINFOTBL(x) (struct ndbinfo_table*)&x
struct ndbinfo_table *ndbinfo_tables[] = {
DBINFOTBL(ndbinfo_TABLES),
=== modified file 'storage/ndb/src/kernel/blocks/dbtc/DbtcMain.cpp'
--- a/storage/ndb/src/kernel/blocks/dbtc/DbtcMain.cpp 2008-11-13 14:16:21 +0000
+++ b/storage/ndb/src/kernel/blocks/dbtc/DbtcMain.cpp 2008-11-13 15:05:07 +0000
@@ -3106,6 +3106,7 @@ void Dbtc::tckeyreq050Lab(Signal* signal
Uint8 Toperation = regTcPtr->operation;
Uint8 TopSimple = regTcPtr->opSimple;
+ Uint8 TopDirty = regTcPtr->dirtyOp;
tnoOfBackup = tnodeinfo & 3;
tnoOfStandby = (tnodeinfo >> 8) & 3;
@@ -3113,7 +3114,7 @@ void Dbtc::tckeyreq050Lab(Signal* signal
if (Toperation == ZREAD || Toperation == ZREAD_EX)
{
regTcPtr->m_special_op_flags &= ~TcConnectRecord::SOF_REORG_MOVING;
- if (TopSimple == 1){
+ if (TopSimple == 1 && TopDirty == 0){
jam();
/*-------------------------------------------------------------*/
/* A SIMPLE READ CAN SELECT ANY OF THE PRIMARY AND */
@@ -7722,6 +7723,9 @@ Dbtc::checkNodeFailComplete(Signal* sign
nfRep->nodeId = cownNodeid;
nfRep->failedNodeId = hostptr.i;
sendSignal(cdihblockref, GSN_NF_COMPLETEREP, signal,
+ NFCompleteRep::SignalLength, JBB);
+
+ sendSignal(QMGR_REF, GSN_NF_COMPLETEREP, signal,
NFCompleteRep::SignalLength, JBB);
}
=== modified file 'storage/ndb/src/kernel/blocks/qmgr/Qmgr.hpp'
--- a/storage/ndb/src/kernel/blocks/qmgr/Qmgr.hpp 2008-08-11 11:30:18 +0000
+++ b/storage/ndb/src/kernel/blocks/qmgr/Qmgr.hpp 2008-11-13 13:36:29 +0000
@@ -250,6 +250,7 @@ private:
void execDUMP_STATE_ORD(Signal* signal);
void execCONNECT_REP(Signal* signal);
void execNDB_FAILCONF(Signal* signal);
+ void execNF_COMPLETEREP(Signal*);
void execREAD_CONFIG_REQ(Signal* signal);
void execSTTOR(Signal* signal);
void execCM_INFOCONF(Signal* signal);
=== modified file 'storage/ndb/src/kernel/blocks/qmgr/QmgrInit.cpp'
--- a/storage/ndb/src/kernel/blocks/qmgr/QmgrInit.cpp 2008-05-23 09:26:56 +0000
+++ b/storage/ndb/src/kernel/blocks/qmgr/QmgrInit.cpp 2008-11-13 13:36:29 +0000
@@ -100,6 +100,7 @@ Qmgr::Qmgr(Block_context& ctx)
// Received signals
addRecSignal(GSN_CONNECT_REP, &Qmgr::execCONNECT_REP);
addRecSignal(GSN_NDB_FAILCONF, &Qmgr::execNDB_FAILCONF);
+ addRecSignal(GSN_NF_COMPLETEREP, &Qmgr::execNF_COMPLETEREP);
addRecSignal(GSN_READ_CONFIG_REQ, &Qmgr::execREAD_CONFIG_REQ);
addRecSignal(GSN_STTOR, &Qmgr::execSTTOR);
addRecSignal(GSN_CLOSE_COMCONF, &Qmgr::execCLOSE_COMCONF);
=== modified file 'storage/ndb/src/kernel/blocks/qmgr/QmgrMain.cpp'
--- a/storage/ndb/src/kernel/blocks/qmgr/QmgrMain.cpp 2008-08-27 20:27:20 +0000
+++ b/storage/ndb/src/kernel/blocks/qmgr/QmgrMain.cpp 2008-11-13 13:36:29 +0000
@@ -2793,11 +2793,46 @@ void Qmgr::execNDB_FAILCONF(Signal* sign
if (nodePtr.p->phase == ZAPI_ACTIVE){
jam();
sendSignal(nodePtr.p->blockRef, GSN_NF_COMPLETEREP, signal,
- NFCompleteRep::SignalLength, JBA);
+ NFCompleteRep::SignalLength, JBB);
}//if
}//for
return;
}//Qmgr::execNDB_FAILCONF()
+
+void
+Qmgr::execNF_COMPLETEREP(Signal* signal)
+{
+ jamEntry();
+ NFCompleteRep rep = *(NFCompleteRep*)signal->getDataPtr();
+ if (rep.blockNo != DBTC)
+ {
+ jam();
+ ndbassert(false);
+ return;
+ }
+
+ /**
+ * This is a disgrace...but execNF_COMPLETEREP in ndbapi is a mess
+ * actually equally messy as it is in ndbd...
+ * this is therefore a simple way of having ndbapi to get
+ * earlier information that transactions can be aborted
+ */
+ signal->theData[0] = rep.failedNodeId;
+ NodeRecPtr nodePtr;
+ for (nodePtr.i = 1; nodePtr.i < MAX_NODES; nodePtr.i++)
+ {
+ jam();
+ ptrAss(nodePtr, nodeRec);
+ if (nodePtr.p->phase == ZAPI_ACTIVE &&
+ ndb_takeovertc(getNodeInfo(nodePtr.i).m_version))
+ {
+ jam();
+ sendSignal(nodePtr.p->blockRef, GSN_TAKE_OVERTCCONF, signal,
+ NFCompleteRep::SignalLength, JBB);
+ }//if
+ }//for
+ return;
+}
/*******************************/
/* DISCONNECT_REP */
=== modified file 'storage/ndb/src/kernel/vm/Configuration.cpp'
--- a/storage/ndb/src/kernel/vm/Configuration.cpp 2008-11-07 19:50:20 +0000
+++ b/storage/ndb/src/kernel/vm/Configuration.cpp 2008-11-12 08:26:18 +0000
@@ -409,14 +409,20 @@ Configuration::setupConfiguration(){
/**
* Configure transporters
*/
- {
- int res = IPCConfig::configureTransporters(globalData.ownId,
- * p,
- globalTransporterRegistry);
- if(res <= 0){
- ERROR_SET(fatal, NDBD_EXIT_INVALID_CONFIG, "Invalid configuration fetched",
- "No transporters configured");
- }
+ if (!globalTransporterRegistry.init(globalData.ownId))
+ {
+ ERROR_SET(fatal, NDBD_EXIT_INVALID_CONFIG,
+ "Invalid configuration fetched",
+ "Could not init transporter registry");
+ }
+
+ if (!IPCConfig::configureTransporters(globalData.ownId,
+ * p,
+ globalTransporterRegistry))
+ {
+ ERROR_SET(fatal, NDBD_EXIT_INVALID_CONFIG,
+ "Invalid configuration fetched",
+ "Could not configure transporters");
}
/**
@@ -434,11 +440,7 @@ Configuration::setupConfiguration(){
}
Uint32 total_send_buffer = 0;
- if(iter.get(CFG_TOTAL_SEND_BUFFER_MEMORY, &total_send_buffer) ||
- total_send_buffer == 0)
- {
- total_send_buffer = globalTransporterRegistry.get_total_max_send_buffer();
- }
+ iter.get(CFG_TOTAL_SEND_BUFFER_MEMORY, &total_send_buffer);
globalTransporterRegistry.allocate_send_buffers(total_send_buffer);
if(iter.get(CFG_DB_NO_SAVE_MSGS, &_maxErrorLogs)){
=== modified file 'storage/ndb/src/kernel/vm/TransporterCallback.cpp'
--- a/storage/ndb/src/kernel/vm/TransporterCallback.cpp 2008-10-30 20:30:06 +0000
+++ b/storage/ndb/src/kernel/vm/TransporterCallback.cpp 2008-11-11 07:27:00 +0000
@@ -218,7 +218,7 @@ TransporterCallbackKernel::reportError(N
const char *info)
{
#ifdef DEBUG_TRANSPORTER
- ndbout_c("reportError (%d, 0x%x) %s", nodeId, errorCode, info ? info : "")
+ ndbout_c("reportError (%d, 0x%x) %s", nodeId, errorCode, info ? info : "");
#endif
DBUG_ENTER("reportError");
=== modified file 'storage/ndb/src/mgmsrv/Config.cpp'
--- a/storage/ndb/src/mgmsrv/Config.cpp 2008-10-21 12:41:59 +0000
+++ b/storage/ndb/src/mgmsrv/Config.cpp 2008-11-13 07:57:54 +0000
@@ -15,7 +15,7 @@
#include "Config.hpp"
-#include <mgmapi_config_parameters.h>
+#include <mgmapi.h>
#include <NdbOut.hpp>
#include "ConfigInfo.hpp"
@@ -50,9 +50,7 @@ Config::Config(const Config* conf)
Config::~Config() {
- if(m_configValues != 0){
- free(m_configValues);
- }
+ ndb_mgm_destroy_configuration(m_configValues);
}
unsigned sections[]=
=== modified file 'storage/ndb/src/mgmsrv/ConfigInfo.cpp'
--- a/storage/ndb/src/mgmsrv/ConfigInfo.cpp 2008-11-07 13:23:15 +0000
+++ b/storage/ndb/src/mgmsrv/ConfigInfo.cpp 2008-11-10 14:43:52 +0000
@@ -4521,13 +4521,14 @@ ConfigInfo::ParamInfoIter::ParamInfoIter
const ConfigInfo::ParamInfo & param = info.m_ParamInfo[j];
if (param._type == ConfigInfo::CI_SECTION &&
param._paramId == section &&
- (section_type == (Uint32)~0 || param._section_type == section_type))
+ (section_type == ~(Uint32)0 ||
+ Uint32(param._section_type) == section_type))
{
m_section_name= param._section;
- break;
+ return;
}
}
- assert(m_section_name);
+ abort();
}
=== modified file 'storage/ndb/src/mgmsrv/ConfigInfo.hpp'
--- a/storage/ndb/src/mgmsrv/ConfigInfo.hpp 2008-10-21 12:41:59 +0000
+++ b/storage/ndb/src/mgmsrv/ConfigInfo.hpp 2008-11-10 14:43:52 +0000
@@ -83,7 +83,8 @@ public:
*/
union {
const char* _default;
- Uint32 _section_type; // if _type = CI_SECTION
+ UintPtr _section_type; // if _type = CI_SECTION
+ /** NOTE must be UintPtr to be of same size as _default */
};
const char* _min;
const char* _max;
=== modified file 'storage/ndb/src/mgmsrv/ConfigManager.cpp'
--- a/storage/ndb/src/mgmsrv/ConfigManager.cpp 2008-11-06 10:54:21 +0000
+++ b/storage/ndb/src/mgmsrv/ConfigManager.cpp 2008-11-13 07:52:42 +0000
@@ -529,7 +529,7 @@ ConfigManager::prepareConfigChange(const
#endif
fclose(f);
- m_prepared_config = new Config(config->m_configValues);
+ m_prepared_config = new Config(config); // Copy
g_eventLogger->debug("Configuration prepared");
return true;
@@ -699,13 +699,13 @@ ConfigManager::execCONFIG_CHANGE_IMPL_RE
return;
}
- Config* new_config = new Config(cf.getConfigValues());
- Uint32 new_generation = new_config->getGeneration();
+ Config new_config(cf.getConfigValues());
+ Uint32 new_generation = new_config.getGeneration();
Uint32 curr_generation = m_config->getGeneration();
- const char* new_name = new_config->getName();
+ const char* new_name = new_config.getName();
const char* curr_name = m_config->getName();
- if (m_config->illegal_change(new_config))
+ if (m_config->illegal_change(&new_config))
{
sendConfigChangeImplRef(ss, nodeId, ConfigChangeRef::IllegalConfigChange);
return;
@@ -738,7 +738,7 @@ ConfigManager::execCONFIG_CHANGE_IMPL_RE
// Check config is equal to our initial config
{
- Config new_config_copy(new_config);
+ Config new_config_copy(&new_config);
require(new_config_copy.setName(new_name));
unsigned exclude[]= {CFG_SECTION_SYSTEM, 0};
if (!new_config_copy.equal(m_new_config, exclude))
@@ -774,7 +774,7 @@ ConfigManager::execCONFIG_CHANGE_IMPL_RE
"generation: %d. Our generation: %d\n" \
"This is the actual diff:\n%s",
new_generation, curr_generation,
- new_config->diff2str(m_config, buf));
+ new_config.diff2str(m_config, buf));
sendConfigChangeImplRef(ss, nodeId, ConfigChangeRef::InvalidGeneration);
return;
}
@@ -789,14 +789,14 @@ ConfigManager::execCONFIG_CHANGE_IMPL_RE
"name: '%s'. Our name: '%s'\n" \
"This is the actual diff:\n%s",
new_name, curr_name,
- new_config->diff2str(m_config, buf));
+ new_config.diff2str(m_config, buf));
sendConfigChangeImplRef(ss, nodeId, ConfigChangeRef::InvalidConfigName);
return;
}
}
// Set new generation
- if(!new_config->setGeneration(new_generation))
+ if(!new_config.setGeneration(new_generation))
{
g_eventLogger->error("Failed to set new generation to %d",
new_generation);
@@ -804,7 +804,7 @@ ConfigManager::execCONFIG_CHANGE_IMPL_RE
return;
}
- if (!prepareConfigChange(new_config))
+ if (!prepareConfigChange(&new_config))
{
sendConfigChangeImplRef(ss, nodeId, ConfigChangeRef::PrepareFailed);
return;
=== modified file 'storage/ndb/src/mgmsrv/MgmtSrvr.cpp'
--- a/storage/ndb/src/mgmsrv/MgmtSrvr.cpp 2008-11-06 16:39:26 +0000
+++ b/storage/ndb/src/mgmsrv/MgmtSrvr.cpp 2008-11-13 13:36:29 +0000
@@ -265,6 +265,7 @@ MgmtSrvr::MgmtSrvr(const MgmtOpts& opts,
m_local_config(NULL),
_ownReference(0),
m_config_manager(NULL),
+ m_need_restart(false),
theFacade(NULL),
_isStopThread(false),
_logLevelThreadSleep(500),
@@ -743,7 +744,16 @@ MgmtSrvr::config_changed(NodeId node_id,
setClusterLog(m_local_config);
- // TODO Magnus, Reload ClusterMgr::theNodes
+ if (theFacade)
+ {
+ if (!theFacade->configure(_ownNodeId,
+ m_local_config->m_configValues))
+ {
+ g_eventLogger->warning("Could not reconfigure everything online, "
+ "this node need a restart");
+ m_need_restart= true;
+ }
+ }
DBUG_VOID_RETURN;
}
@@ -973,11 +983,10 @@ MgmtSrvr::sendVersionReq(int v_nodeId,
do_send = 1; // retry with other node
continue;
}
-
case GSN_API_REGCONF:
+ case GSN_TAKE_OVERTCCONF:
// Ignore
continue;
-
default:
report_unknown_signal(signal);
return SEND_OR_RECEIVE_FAILED;
@@ -1294,7 +1303,8 @@ int MgmtSrvr::sendSTOP_REQ(const Vector<
break;
}
case GSN_API_REGCONF:
- break;
+ case GSN_TAKE_OVERTCCONF:
+ continue;
default:
report_unknown_signal(signal);
#ifdef VM_TRACE
@@ -1857,7 +1867,8 @@ MgmtSrvr::setEventReportingLevelImpl(int
break;
}
case GSN_API_REGCONF:
- break;
+ case GSN_TAKE_OVERTCCONF:
+ continue;
default:
report_unknown_signal(signal);
return SEND_OR_RECEIVE_FAILED;
@@ -1993,6 +2004,7 @@ retry:
break;
}
case GSN_API_REGCONF:
+ case GSN_TAKE_OVERTCCONF:
break;
default:
report_unknown_signal(signal);
@@ -2051,6 +2063,7 @@ MgmtSrvr::endSchemaTrans(SignalSender& s
break;
}
case GSN_API_REGCONF:
+ case GSN_TAKE_OVERTCCONF:
break;
default:
report_unknown_signal(signal);
@@ -2144,6 +2157,7 @@ MgmtSrvr::createNodegroup(int *nodes, in
break;
}
case GSN_API_REGCONF:
+ case GSN_TAKE_OVERTCCONF:
break;
default:
report_unknown_signal(signal);
@@ -2218,6 +2232,7 @@ MgmtSrvr::dropNodegroup(int ng)
break;
}
case GSN_API_REGCONF:
+ case GSN_TAKE_OVERTCCONF:
break;
default:
report_unknown_signal(signal);
@@ -2515,6 +2530,7 @@ MgmtSrvr::handleReceivedSignal(NdbApiSig
ndbout << "TAMPER ORD" << endl;
break;
case GSN_API_REGCONF:
+ case GSN_TAKE_OVERTCCONF:
break;
case GSN_DBINFO_SCANREQ:
@@ -2711,7 +2727,8 @@ MgmtSrvr::alloc_node_id_req(NodeId free_
continue;
}
case GSN_API_REGCONF:
- break;
+ case GSN_TAKE_OVERTCCONF:
+ continue;
default:
report_unknown_signal(signal);
return SEND_OR_RECEIVE_FAILED;
@@ -3232,7 +3249,8 @@ MgmtSrvr::startBackup(Uint32& backupId,
break;
}
case GSN_API_REGCONF:
- break;
+ case GSN_TAKE_OVERTCCONF:
+ continue;
default:
report_unknown_signal(signal);
return SEND_OR_RECEIVE_FAILED;
@@ -3770,9 +3788,9 @@ int MgmtSrvr::ndbinfo(Uint32 tableId,
{
memcpy(req,conf,signal->header.theLength*sizeof(Uint32));
req->requestInfo &= ~(DbinfoScanReq::StartScan);
- ssig.set(ss, TestOrd::TraceAPI, req->cur_block, GSN_DBINFO_SCANREQ,
- DbinfoScanReq::SignalLengthWithCursor);
- nodeId= req->cur_node;
+ ssig.set(ss, TestOrd::TraceAPI, req->cursor.cur_block,
+ GSN_DBINFO_SCANREQ, DbinfoScanReq::SignalLengthWithCursor);
+ nodeId= req->cursor.cur_node;
do_send= 1;
continue;
@@ -3783,6 +3801,7 @@ int MgmtSrvr::ndbinfo(Uint32 tableId,
}
break;
case GSN_API_REGCONF:
+ case GSN_TAKE_OVERTCCONF:
// Ignore;
break;
default:
@@ -3862,6 +3881,7 @@ MgmtSrvr::change_config(Config& new_conf
}
case GSN_API_REGCONF:
+ case GSN_TAKE_OVERTCCONF:
// Ignore;
break;
=== modified file 'storage/ndb/src/mgmsrv/MgmtSrvr.hpp'
--- a/storage/ndb/src/mgmsrv/MgmtSrvr.hpp 2008-11-06 10:56:21 +0000
+++ b/storage/ndb/src/mgmsrv/MgmtSrvr.hpp 2008-11-12 08:17:14 +0000
@@ -484,6 +484,8 @@ private:
class ConfigManager* m_config_manager;
+ bool m_need_restart;
+
NodeBitmask m_reserved_nodes;
struct in_addr m_connect_address[MAX_NODES];
=== modified file 'storage/ndb/src/mgmsrv/main.cpp'
--- a/storage/ndb/src/mgmsrv/main.cpp 2008-11-07 15:20:50 +0000
+++ b/storage/ndb/src/mgmsrv/main.cpp 2008-11-13 08:00:21 +0000
@@ -136,6 +136,18 @@ static void usage()
ndb_usage(short_usage_sub, load_default_groups, my_long_options);
}
+
+static void
+mgmd_exit(int result)
+{
+ g_eventLogger->close();
+
+ ndb_end(opt_ndb_endinfo ? MY_CHECK_ERROR | MY_GIVE_INFO : 0);
+
+ exit(result);
+}
+
+
int main(int argc, char** argv)
{
NDB_INIT(argv[0]);
@@ -152,7 +164,7 @@ int main(int argc, char** argv)
#endif
if ((ho_error=handle_options(&argc, &argv, my_long_options,
ndb_std_get_one_option)))
- exit(ho_error);
+ mgmd_exit(ho_error);
if (opts.interactive ||
opts.non_interactive ||
@@ -169,7 +181,7 @@ int main(int argc, char** argv)
if (opts.mycnf && opts.config_filename)
{
g_eventLogger->error("Both --mycnf and -f is not supported");
- exit(1);
+ mgmd_exit(1);
}
/**
@@ -187,13 +199,13 @@ start:
mgm= new MgmtSrvr(opts, opt_connect_str);
if (mgm == NULL) {
g_eventLogger->critical("Out of memory, couldn't create MgmtSrvr");
- exit(1);
+ mgmd_exit(1);
}
/* Init mgm, load or fetch config */
if (!mgm->init()) {
delete mgm;
- exit(1);
+ mgmd_exit(1);
}
my_setwd(NdbConfig_get_path(0), MYF(0));
@@ -204,7 +216,7 @@ start:
if (localNodeId == 0) {
g_eventLogger->error("Couldn't get own node id");
delete mgm;
- exit(1);
+ mgmd_exit(1);
}
// Become a daemon
@@ -215,14 +227,14 @@ start:
if (NdbDaemon_Make(lockfile, logfile, 0) == -1) {
g_eventLogger->error("Cannot become daemon: %s", NdbDaemon_ErrorText);
delete mgm;
- exit(1);
+ mgmd_exit(1);
}
}
/* Start mgm services */
if (!mgm->start()) {
delete mgm;
- exit(1);
+ mgmd_exit(1);
}
if(opts.interactive) {
@@ -250,9 +262,6 @@ start:
goto start;
}
- g_eventLogger->close();
-
- ndb_end(opt_ndb_endinfo ? MY_CHECK_ERROR | MY_GIVE_INFO : 0);
- return 0;
+ mgmd_exit(0);
}
=== modified file 'storage/ndb/src/ndbapi/ClusterMgr.cpp'
--- a/storage/ndb/src/ndbapi/ClusterMgr.cpp 2008-11-06 12:00:21 +0000
+++ b/storage/ndb/src/ndbapi/ClusterMgr.cpp 2008-11-12 08:17:14 +0000
@@ -79,7 +79,8 @@ ClusterMgr::~ClusterMgr()
}
void
-ClusterMgr::init(ndb_mgm_configuration_iterator & iter){
+ClusterMgr::configure(const ndb_mgm_configuration* config){
+ ndb_mgm_configuration_iterator iter(* config, CFG_SECTION_NODE);
for(iter.first(); iter.valid(); iter.next()){
Uint32 nodeId = 0;
if(iter.get(CFG_NODE_ID, &nodeId))
@@ -107,6 +108,15 @@ ClusterMgr::init(ndb_mgm_configuration_i
type = type;
break;
}
+ }
+
+ /* Mark all non existing nodes as not defined */
+ for(Uint32 i = 0; i<MAX_NODES; i++) {
+ if (iter.first())
+ continue;
+
+ if (iter.find(CFG_NODE_ID, i))
+ theNodes[i]= Node();
}
/* Init own node info */
=== modified file 'storage/ndb/src/ndbapi/ClusterMgr.hpp'
--- a/storage/ndb/src/ndbapi/ClusterMgr.hpp 2008-11-06 17:34:29 +0000
+++ b/storage/ndb/src/ndbapi/ClusterMgr.hpp 2008-11-12 08:17:14 +0000
@@ -38,7 +38,7 @@ class ClusterMgr {
public:
ClusterMgr(class TransporterFacade &);
~ClusterMgr();
- void init(struct ndb_mgm_configuration_iterator & config);
+ void configure(const ndb_mgm_configuration* config);
void reportConnected(NodeId nodeId);
void reportDisconnected(NodeId nodeId);
=== modified file 'storage/ndb/src/ndbapi/Ndbif.cpp'
--- a/storage/ndb/src/ndbapi/Ndbif.cpp 2008-11-06 16:52:59 +0000
+++ b/storage/ndb/src/ndbapi/Ndbif.cpp 2008-11-13 13:36:29 +0000
@@ -915,6 +915,9 @@ Ndb::handleReceivedSignal(NdbApiSignal*
case GSN_API_REGCONF:{
return; // Ignore
}
+ case GSN_TAKE_OVERTCCONF:
+ abortTransactionsAfterNodeFailure(tFirstData); // theData[0]
+ break;
default:
tFirstDataPtr = NULL;
goto InvalidSignal;
=== modified file 'storage/ndb/src/ndbapi/TransporterFacade.cpp'
--- a/storage/ndb/src/ndbapi/TransporterFacade.cpp 2008-11-06 13:52:28 +0000
+++ b/storage/ndb/src/ndbapi/TransporterFacade.cpp 2008-11-13 13:36:29 +0000
@@ -372,6 +372,16 @@ TransporterFacade::deliver_signal(Signal
}
break;
}
+ case GSN_TAKE_OVERTCCONF:
+ {
+ /**
+ * Report
+ */
+ NdbApiSignal tSignal(* header);
+ tSignal.setDataPtr(theData);
+ for_each(&tSignal, ptr);
+ return;
+ }
default:
break;
@@ -420,13 +430,43 @@ copy(Uint32 * & insertPtr,
*/
int
-TransporterFacade::start_instance(int nodeId,
- const ndb_mgm_configuration* props)
+TransporterFacade::start_instance(NodeId nodeId,
+ const ndb_mgm_configuration* conf)
{
- if (! init(nodeId, props)) {
+ assert(theOwnId == 0);
+ theOwnId = nodeId;
+
+ theTransporterRegistry = new TransporterRegistry(this);
+ if (theTransporterRegistry == NULL)
return -1;
- }
-
+
+ if (!theTransporterRegistry->init(nodeId))
+ return -1;
+
+ theClusterMgr = new ClusterMgr(*this);
+ if (theClusterMgr == NULL)
+ return -1;
+
+ if (!configure(nodeId, conf))
+ return -1;
+
+ if (!theTransporterRegistry->start_service(m_socket_server))
+ return -1;
+
+ theReceiveThread = NdbThread_Create(runReceiveResponse_C,
+ (void**)this,
+ 32768,
+ "ndb_receive",
+ NDB_THREAD_PRIO_LOW);
+
+ theSendThread = NdbThread_Create(runSendRequest_C,
+ (void**)this,
+ 32768,
+ "ndb_send",
+ NDB_THREAD_PRIO_LOW);
+
+ theClusterMgr->startThread();
+
/**
* Install signal handler for SIGPIPE
*
@@ -679,82 +719,89 @@ void TransporterFacade::init_cond_wait_q
TransporterFacade::TransporterFacade(GlobalDictCache *cache) :
theTransporterRegistry(0),
+ theOwnId(0),
+ theStartNodeId(1),
+ theClusterMgr(NULL),
+ theArbitMgr(NULL),
+ checkCounter(4),
+ currentSendLimit(1),
+ m_scan_batch_size(MAX_SCAN_BATCH_SIZE),
+ m_batch_byte_size(SCAN_BATCH_SIZE),
+ m_batch_size(DEF_BATCH_SIZE),
theStopReceive(0),
theSendThread(NULL),
theReceiveThread(NULL),
+ m_max_trans_id(0),
m_fragmented_signal_id(0),
m_globalDictCache(cache)
{
DBUG_ENTER("TransporterFacade::TransporterFacade");
init_cond_wait_queue();
poll_owner = NULL;
- theOwnId = 0;
theMutexPtr = NdbMutex_Create();
sendPerformedLastInterval = 0;
- checkCounter = 4;
- currentSendLimit = 1;
- theClusterMgr = NULL;
- theArbitMgr = NULL;
- theStartNodeId = 1;
- m_scan_batch_size= MAX_SCAN_BATCH_SIZE;
- m_batch_byte_size= SCAN_BATCH_SIZE;
- m_batch_size= DEF_BATCH_SIZE;
- m_max_trans_id = 0;
-
for (int i = 0; i < NO_API_FIXED_BLOCKS; i++)
m_fixed2dynamic[i]= RNIL;
- theClusterMgr = new ClusterMgr(* this);
-
#ifdef API_TRACE
apiSignalLog = 0;
#endif
DBUG_VOID_RETURN;
}
+
bool
-TransporterFacade::init(Uint32 nodeId, const ndb_mgm_configuration* props)
+TransporterFacade::configure(NodeId nodeId,
+ const ndb_mgm_configuration* conf)
{
- DBUG_ENTER("TransporterFacade::init");
-
- theOwnId = nodeId;
- theTransporterRegistry = new TransporterRegistry(this);
+ DBUG_ENTER("TransporterFacade::configure");
- const int res = IPCConfig::configureTransporters(nodeId,
- * props,
- * theTransporterRegistry);
- if(res <= 0){
- TRP_DEBUG( "configureTransporters returned 0 or less" );
+ assert(theOwnId == nodeId);
+ assert(theTransporterRegistry);
+ assert(theClusterMgr);
+
+ // Configure transporters
+ if (!IPCConfig::configureTransporters(nodeId,
+ * conf,
+ * theTransporterRegistry))
DBUG_RETURN(false);
- }
-
- ndb_mgm_configuration_iterator iter(* props, CFG_SECTION_NODE);
- iter.first();
- theClusterMgr->init(iter);
-
- iter.first();
- if(iter.find(CFG_NODE_ID, nodeId)){
- TRP_DEBUG( "Node info missing from config." );
+
+ // Configure cluster manager
+ theClusterMgr->configure(conf);
+
+ ndb_mgm_configuration_iterator iter(* conf, CFG_SECTION_NODE);
+ if(iter.find(CFG_NODE_ID, nodeId))
DBUG_RETURN(false);
- }
-
+
+ // Configure send buffers
Uint32 total_send_buffer = 0;
- if(iter.get(CFG_TOTAL_SEND_BUFFER_MEMORY, &total_send_buffer) ||
- total_send_buffer == 0)
- {
- total_send_buffer = theTransporterRegistry->get_total_max_send_buffer();
- }
+ iter.get(CFG_TOTAL_SEND_BUFFER_MEMORY, &total_send_buffer);
theTransporterRegistry->allocate_send_buffers(total_send_buffer);
+ // Configure arbitrator
Uint32 rank = 0;
- if(!iter.get(CFG_NODE_ARBIT_RANK, &rank) && rank>0){
- theArbitMgr = new ArbitMgr(* this);
+ iter.get(CFG_NODE_ARBIT_RANK, &rank);
+ if (rank > 0)
+ {
+ // The arbitrator should be active
+ if (!theArbitMgr)
+ theArbitMgr = new ArbitMgr(* this);
theArbitMgr->setRank(rank);
+
Uint32 delay = 0;
iter.get(CFG_NODE_ARBIT_DELAY, &delay);
theArbitMgr->setDelay(delay);
}
+ else if (theArbitMgr)
+ {
+ // No arbitrator should be started
+ theArbitMgr->doStop(NULL);
+ delete theArbitMgr;
+ theArbitMgr= NULL;
+ }
+
+ // Configure scan settings
Uint32 scan_batch_size= 0;
if (!iter.get(CFG_MAX_SCAN_BATCH_SIZE, &scan_batch_size)) {
m_scan_batch_size= scan_batch_size;
@@ -767,9 +814,9 @@ TransporterFacade::init(Uint32 nodeId, c
if (!iter.get(CFG_BATCH_SIZE, &batch_size)) {
m_batch_size= batch_size;
}
-
+
+ // Configure timeouts
Uint32 timeout = 120000;
- iter.first();
for (iter.first(); iter.valid(); iter.next())
{
Uint32 tmp1 = 0, tmp2 = 0;
@@ -780,24 +827,6 @@ TransporterFacade::init(Uint32 nodeId, c
timeout = tmp1;
}
m_waitfor_timeout = timeout;
-
- if (!theTransporterRegistry->start_service(m_socket_server)){
- ndbout_c("Unable to start theTransporterRegistry->start_service");
- DBUG_RETURN(false);
- }
-
- theReceiveThread = NdbThread_Create(runReceiveResponse_C,
- (void**)this,
- 32768,
- "ndb_receive",
- NDB_THREAD_PRIO_LOW);
-
- theSendThread = NdbThread_Create(runSendRequest_C,
- (void**)this,
- 32768,
- "ndb_send",
- NDB_THREAD_PRIO_LOW);
- theClusterMgr->startThread();
#ifdef API_TRACE
signalLogger.logOn(true, 0, SignalLoggerManager::LogInOut);
=== modified file 'storage/ndb/src/ndbapi/TransporterFacade.hpp'
--- a/storage/ndb/src/ndbapi/TransporterFacade.hpp 2008-11-04 08:43:06 +0000
+++ b/storage/ndb/src/ndbapi/TransporterFacade.hpp 2008-11-12 08:17:14 +0000
@@ -53,11 +53,16 @@ public:
STATIC_CONST( MAX_NO_THREADS = 4711 );
TransporterFacade(GlobalDictCache *cache);
virtual ~TransporterFacade();
- bool init(Uint32, const ndb_mgm_configuration *);
- int start_instance(int, const ndb_mgm_configuration*);
+ int start_instance(NodeId, const ndb_mgm_configuration*);
void stop_instance();
-
+
+ /*
+ (Re)configure the TransporterFacade
+ to a specific configuration
+ */
+ bool configure(NodeId, const ndb_mgm_configuration *);
+
/**
* Register this block for sending/receiving signals
* @blockNo block number to use, -1 => any blockNumber
@@ -222,8 +227,7 @@ private:
TransporterRegistry* theTransporterRegistry;
SocketServer m_socket_server;
int sendPerformedLastInterval;
- int theOwnId;
-
+ NodeId theOwnId;
NodeId theStartNodeId;
ClusterMgr* theClusterMgr;
=== modified file 'storage/ndb/src/ndbapi/ndb_cluster_connection.cpp'
--- a/storage/ndb/src/ndbapi/ndb_cluster_connection.cpp 2008-09-09 12:04:17 +0000
+++ b/storage/ndb/src/ndbapi/ndb_cluster_connection.cpp 2008-11-12 08:17:14 +0000
@@ -711,7 +711,9 @@ int Ndb_cluster_connection::connect(int
if(props == 0)
break;
- m_impl.m_transporter_facade->start_instance(nodeId, props);
+ if (m_impl.m_transporter_facade->start_instance(nodeId, props) < 0)
+ DBUG_RETURN(-1);
+
if (m_impl.init_nodes_vector(nodeId, *props))
{
ndbout_c("Ndb_cluster_connection::connect: malloc failure");
=== modified file 'storage/ndb/test/ndbapi/testScanFilter.cpp'
--- a/storage/ndb/test/ndbapi/testScanFilter.cpp 2008-11-03 08:38:27 +0000
+++ b/storage/ndb/test/ndbapi/testScanFilter.cpp 2008-11-10 11:45:05 +0000
@@ -49,15 +49,7 @@ const char COL_LEN = 7;
* there are six columns, 'i', 'j', 'k', 'l', 'm', 'n', and each on is equal to 1 or 1,
* Since each tuple should be unique in this case, then TUPLE_NUM = 2 power 6 = 64
*/
-#if _AIX || NDB_WIN
-/*
- IBM xlC_r breaks on the initialization with pow():
- "The expression must be an integral constant expression."
-*/
-const int TUPLE_NUM = 64;
-#else
-const int TUPLE_NUM = (int)pow(2, COL_LEN-1);
-#endif
+const int TUPLE_NUM = 1 << (COL_LEN - 1);
/*
* the recursive level of random scan filter, can
@@ -487,7 +479,7 @@ int get_column_id(char ch)
*/
bool check_col_equal_one(int tuple_no, int col_id)
{
- int i = (int)pow((double)2, (double)(6 - col_id));
+ int i = 1 << (6 - col_id);
int j = tuple_no / i;
if(j % 2)
return true;
=== modified file 'storage/ndb/test/ndbapi/test_event.cpp'
--- a/storage/ndb/test/ndbapi/test_event.cpp 2008-10-31 15:20:07 +0000
+++ b/storage/ndb/test/ndbapi/test_event.cpp 2008-11-11 09:09:59 +0000
@@ -40,6 +40,8 @@ static int createEvent(Ndb *pNdb,
<< pNdb->getNdbError().message << endl;
return NDBT_FAILED;
}
+
+ myDict->dropEvent(eventName);
NdbDictionary::Event myEvent(eventName);
myEvent.setTable(tab.getName());
=== modified file 'storage/ndb/test/run-test/db.cpp'
--- a/storage/ndb/test/run-test/db.cpp 2008-09-25 10:21:14 +0000
+++ b/storage/ndb/test/run-test/db.cpp 2008-11-10 11:41:44 +0000
@@ -206,7 +206,6 @@ BINDS(MYSQL_BIND& bind, const char * s,
}
template <typename T>
-static
int
find(T* obj, Vector<T*>& arr)
{
@@ -472,6 +471,6 @@ setup_repl(atrt_config& config)
return true;
}
-template static int find(atrt_host* obj, Vector<atrt_host*>& arr);
-template static int find(atrt_cluster* obj, Vector<atrt_cluster*>& arr);
+template int find(atrt_host* obj, Vector<atrt_host*>& arr);
+template int find(atrt_cluster* obj, Vector<atrt_cluster*>& arr);
=== modified file 'storage/ndb/test/src/DbUtil.cpp'
--- a/storage/ndb/test/src/DbUtil.cpp 2008-10-30 15:15:48 +0000
+++ b/storage/ndb/test/src/DbUtil.cpp 2008-11-10 10:55:33 +0000
@@ -17,7 +17,7 @@
#include "DbUtil.hpp"
#include <NdbSleep.h>
-
+#include <NdbAutoPtr.hpp>
/* Constructors */
@@ -357,7 +357,9 @@ DbUtil::runQuery(const char* sql,
}
uint params= mysql_stmt_param_count(stmt);
- MYSQL_BIND bind_param[params];
+ MYSQL_BIND *bind_param = new MYSQL_BIND[params];
+ NdbAutoObjArrayPtr<MYSQL_BIND> _guard(bind_param);
+
bzero(bind_param, sizeof(bind_param));
for(uint i= 0; i < mysql_stmt_param_count(stmt); i++)
@@ -427,7 +429,8 @@ DbUtil::runQuery(const char* sql,
{
MYSQL_FIELD *fields= mysql_fetch_fields(res);
uint num_fields= mysql_num_fields(res);
- MYSQL_BIND bind_result[num_fields];
+ MYSQL_BIND *bind_result = new MYSQL_BIND[num_fields];
+ NdbAutoObjArrayPtr<MYSQL_BIND> _guard1(bind_result);
bzero(bind_result, sizeof(bind_result));
for (uint i= 0; i < num_fields; i++)
=== modified file 'storage/ndb/test/src/NDBT_Tables.cpp'
--- a/storage/ndb/test/src/NDBT_Tables.cpp 2008-10-10 09:32:12 +0000
+++ b/storage/ndb/test/src/NDBT_Tables.cpp 2008-11-11 12:54:17 +0000
@@ -413,6 +413,8 @@ NDBT_Attribute D2Attribs[] = {
NDBT_Attribute("KOL5", NdbDictionary::Column::Char, 199, false, true, 0, NdbDictionary::Column::StorageTypeDisk),
NDBT_Attribute("KOL6", NdbDictionary::Column::Bit, 21, false, false, 0, NdbDictionary::Column::StorageTypeDisk),
NDBT_Attribute("KOL7", NdbDictionary::Column::Longvarbinary, 384, false, true, 0, NdbDictionary::Column::StorageTypeDisk),
+ NDBT_Attribute("KOL8", NdbDictionary::Column::Varbinary, 88, false, true, 0, NdbDictionary::Column::StorageTypeDisk)
+
};
static
const
=== modified file 'storage/ndb/test/tools/rep_latency.cpp'
--- a/storage/ndb/test/tools/rep_latency.cpp 2007-04-23 18:27:43 +0000
+++ b/storage/ndb/test/tools/rep_latency.cpp 2008-11-10 11:41:44 +0000
@@ -18,6 +18,7 @@
*
*/
+#include <ndb_global.h>
#include <NdbApi.hpp>
#include <NdbSleep.h>
#include <sys/time.h>
| Thread |
|---|
| • bzr commit into mysql-5.1 branch (tomas.ulin:3069) | Tomas Ulin | 13 Nov |