Below is the list of changes that have just been committed into a local
5.2 repository of stewart. When stewart does a push these changes will
be propagated to the main repository and, within 24 hours after the
push, to the public repository.
For information on how to access the public repository
see http://dev.mysql.com/doc/mysql/en/installing-source-tree.html
ChangeSet@stripped, 2006-11-23 23:49:58+11:00, stewart@willster.(none) +21 -0
Merge willster.(none):/home/stewart/Documents/MySQL/5.2/main
into willster.(none):/home/stewart/Documents/MySQL/5.2/wl1504-add-node
Merge latest 5.2 into add node tree
MERGE: 1.2098.11.10
storage/ndb/include/kernel/GlobalSignalNumbers.h@stripped, 2006-11-23 23:04:14+11:00,
stewart@willster.(none) +0 -0
Auto merged
MERGE: 1.16.1.11
storage/ndb/include/mgmapi/mgmapi.h@stripped, 2006-11-23 23:04:14+11:00,
stewart@willster.(none) +0 -0
Auto merged
MERGE: 1.50.1.3
storage/ndb/src/common/debugger/signaldata/SignalNames.cpp@stripped, 2006-11-23
23:49:54+11:00, stewart@willster.(none) +0 -1
merge
MERGE: 1.11.1.5
storage/ndb/src/common/transporter/TransporterRegistry.cpp@stripped, 2006-11-23
23:04:14+11:00, stewart@willster.(none) +0 -0
Auto merged
MERGE: 1.63.1.2
storage/ndb/src/kernel/blocks/dbdict/Dbdict.cpp@stripped, 2006-11-23 23:49:54+11:00,
stewart@willster.(none) +6 -5
merge
MERGE: 1.77.3.2
storage/ndb/src/kernel/blocks/dbdict/Dbdict.hpp@stripped, 2006-11-23 23:49:54+11:00,
stewart@willster.(none) +2 -3
merge
MERGE: 1.28.1.1
storage/ndb/src/kernel/blocks/dbdih/Dbdih.hpp@stripped, 2006-11-23 23:04:16+11:00,
stewart@willster.(none) +0 -0
Auto merged
MERGE: 1.13.3.1
storage/ndb/src/kernel/blocks/dbdih/DbdihInit.cpp@stripped, 2006-11-23 23:49:54+11:00,
stewart@willster.(none) +0 -0
merge
MERGE: 1.11.1.1
storage/ndb/src/kernel/blocks/dbdih/DbdihMain.cpp@stripped, 2006-11-23 23:49:54+11:00,
stewart@willster.(none) +0 -0
merge
MERGE: 1.38.1.44
storage/ndb/src/kernel/blocks/ndbcntr/Ndbcntr.hpp@stripped, 2006-11-23 23:04:16+11:00,
stewart@willster.(none) +0 -0
Auto merged
MERGE: 1.6.1.1
storage/ndb/src/kernel/blocks/ndbcntr/NdbcntrMain.cpp@stripped, 2006-11-23 23:49:54+11:00,
stewart@willster.(none) +1 -4
merge
MERGE: 1.29.1.1
storage/ndb/src/kernel/blocks/suma/Suma.cpp@stripped, 2006-11-23 23:04:16+11:00,
stewart@willster.(none) +2 -2
Auto merged
MERGE: 1.28.1.20
storage/ndb/src/mgmapi/mgmapi.cpp@stripped, 2006-11-23 23:04:17+11:00,
stewart@willster.(none) +0 -0
Auto merged
MERGE: 1.60.1.10
storage/ndb/src/mgmclient/CommandInterpreter.cpp@stripped, 2006-11-23 23:04:17+11:00,
stewart@willster.(none) +0 -0
Auto merged
MERGE: 1.59.1.6
storage/ndb/src/mgmsrv/ConfigInfo.cpp@stripped, 2006-11-23 23:04:17+11:00,
stewart@willster.(none) +0 -0
Auto merged
MERGE: 1.72.1.1
storage/ndb/src/mgmsrv/MgmtSrvr.cpp@stripped, 2006-11-23 23:04:17+11:00,
stewart@willster.(none) +0 -0
Auto merged
MERGE: 1.90.1.13
storage/ndb/src/mgmsrv/MgmtSrvr.hpp@stripped, 2006-11-23 23:04:17+11:00,
stewart@willster.(none) +0 -0
Auto merged
MERGE: 1.41.1.8
storage/ndb/src/mgmsrv/Services.cpp@stripped, 2006-11-23 23:04:17+11:00,
stewart@willster.(none) +0 -0
Auto merged
MERGE: 1.59.1.13
storage/ndb/src/mgmsrv/Services.hpp@stripped, 2006-11-23 23:04:17+11:00,
stewart@willster.(none) +0 -0
Auto merged
MERGE: 1.18.1.3
storage/ndb/src/ndbapi/TransporterFacade.hpp@stripped, 2006-11-23 23:04:17+11:00,
stewart@willster.(none) +0 -0
Auto merged
MERGE: 1.27.1.1
storage/ndb/tools/waiter.cpp@stripped, 2006-11-23 23:49:54+11:00, stewart@willster.(none) +1
-2
merge
MERGE: 1.24.1.2
# This is a BitKeeper patch. What follows are the unified diffs for the
# set of deltas contained in the patch. The rest of the patch, the part
# that BitKeeper cares about, is below these diffs.
# User: stewart
# Host: willster.(none)
# Root: /home/stewart/Documents/MySQL/5.2/wl1504-add-node/RESYNC
--- 1.17/storage/ndb/include/kernel/GlobalSignalNumbers.h 2006-03-11 02:00:13 +11:00
+++ 1.18/storage/ndb/include/kernel/GlobalSignalNumbers.h 2006-11-23 23:04:14 +11:00
@@ -517,16 +517,12 @@
#define GSN_TEST_ORD 407
#define GSN_TESTSIG 408
#define GSN_TIME_SIGNAL 409
-/* 410 unused */
-/* 411 unused */
-/* 412 unused */
#define GSN_TUP_ABORTREQ 414
#define GSN_TUP_ADD_ATTCONF 415
#define GSN_TUP_ADD_ATTRREF 416
#define GSN_TUP_ADD_ATTRREQ 417
#define GSN_TUP_ATTRINFO 418
#define GSN_TUP_COMMITREQ 419
-/* 420 unused */
/* 421 unused */
/* 422 unused */
@@ -695,6 +691,8 @@
#define GSN_BACKUP_FRAGMENT_REF 546
#define GSN_BACKUP_FRAGMENT_CONF 547
+#define GSN_BACKUP_FRAGMENT_COMPLETE_REP 575
+
#define GSN_STOP_BACKUP_REQ 548
#define GSN_STOP_BACKUP_REF 549
#define GSN_STOP_BACKUP_CONF 550
@@ -744,7 +742,7 @@
#define GSN_SUB_STOP_REQ 572
#define GSN_SUB_STOP_REF 573
#define GSN_SUB_STOP_CONF 574
-/* 575 unused */
+/* 575 used */
#define GSN_SUB_CREATE_REQ 576
#define GSN_SUB_CREATE_REF 577
#define GSN_SUB_CREATE_CONF 578
@@ -982,5 +980,11 @@
#define GSN_DICT_ABORT_REQ 667
#define GSN_DICT_ABORT_REF 668
#define GSN_DICT_ABORT_CONF 669
+
+/* DICT LOCK signals */
+#define GSN_DICT_LOCK_REQ 410
+#define GSN_DICT_LOCK_CONF 411
+#define GSN_DICT_LOCK_REF 412
+#define GSN_DICT_UNLOCK_ORD 420
#endif
--- 1.51/storage/ndb/include/mgmapi/mgmapi.h 2006-02-21 22:51:14 +11:00
+++ 1.52/storage/ndb/include/mgmapi/mgmapi.h 2006-11-23 23:04:14 +11:00
@@ -231,6 +231,12 @@
/** Could not connect to socker */
NDB_MGM_COULD_NOT_CONNECT_TO_SOCKET = 1011,
+ /* Alloc node id failures */
+ /** Generic error, retry may succeed */
+ NDB_MGM_ALLOCID_ERROR = 1101,
+ /** Non retriable error */
+ NDB_MGM_ALLOCID_CONFIG_MISMATCH = 1102,
+
/* Service errors - Start/Stop Node or System */
/** Start failed */
NDB_MGM_START_FAILED = 2001,
@@ -694,6 +700,28 @@
const int * node_list, int abort);
/**
+ * Stops cluster nodes
+ *
+ * @param handle Management handle.
+ * @param no_of_nodes Number of database nodes to stop<br>
+ * -1: All database and management nodes<br>
+ * 0: All database nodes in cluster<br>
+ * n: Stop the <var>n</var> node(s) specified in
+ * the array node_list
+ * @param node_list List of node IDs of database nodes to be stopped
+ * @param abort Don't perform graceful stop,
+ * but rather stop immediately
+ * @param disconnect Returns true if you need to disconnect to apply
+ * the stop command (e.g. stopping the mgm server
+ * that handle is connected to)
+ *
+ * @return Number of nodes stopped (-1 on error).
+ */
+ int ndb_mgm_stop3(NdbMgmHandle handle, int no_of_nodes,
+ const int * node_list, int abort, int *disconnect);
+
+
+ /**
* Restart database nodes
*
* @param handle Management handle.
@@ -733,6 +761,31 @@
int nostart, int abort);
/**
+ * Restart nodes
+ *
+ * @param handle Management handle.
+ * @param no_of_nodes Number of database nodes to be restarted:<br>
+ * 0: Restart all database nodes in the cluster<br>
+ * n: Restart the <var>n</var> node(s) specified
in the
+ * array node_list
+ * @param node_list List of node IDs of database nodes to be restarted
+ * @param initial Remove filesystem from restarting node(s)
+ * @param nostart Don't actually start node(s) but leave them
+ * waiting for start command
+ * @param abort Don't perform graceful restart,
+ * but rather restart immediately
+ * @param disconnect Returns true if mgmapi client must disconnect from
+ * server to apply the requested operation. (e.g.
+ * restart the management server)
+ *
+ *
+ * @return Number of nodes stopped (-1 on error).
+ */
+ int ndb_mgm_restart3(NdbMgmHandle handle, int no_of_nodes,
+ const int * node_list, int initial,
+ int nostart, int abort, int *disconnect);
+
+ /**
* Start database nodes
*
* @param handle Management handle.
@@ -843,16 +896,6 @@
enum ndb_mgm_event_category category,
int level,
struct ndb_mgm_reply* reply);
-
- /**
- * Returns the port number where statistics information is sent
- *
- * @param handle NDB management handle.
- * @param reply Reply message.
- * @return -1 on error.
- */
- int ndb_mgm_get_stat_port(NdbMgmHandle handle,
- struct ndb_mgm_reply* reply);
#endif
/**
@@ -998,7 +1041,7 @@
void ndb_mgm_destroy_configuration(struct ndb_mgm_configuration *);
int ndb_mgm_alloc_nodeid(NdbMgmHandle handle,
- unsigned version, int nodetype);
+ unsigned version, int nodetype, int log_event);
/**
* End Session
@@ -1021,6 +1064,16 @@
* Get the node id of the mgm server we're connected to
*/
Uint32 ndb_mgm_get_mgmd_nodeid(NdbMgmHandle handle);
+
+ /**
+ * Get the version of the mgm server we're talking to.
+ * Designed to allow switching of protocol depending on version
+ * so that new clients can speak to old servers in a compat mode
+ */
+ int ndb_mgm_get_version(NdbMgmHandle handle,
+ int *major, int *minor, int* build,
+ int len, char* str);
+
/**
* Config iterator
--- 1.12/storage/ndb/src/common/debugger/signaldata/SignalNames.cpp 2006-03-11 02:00:13
+11:00
+++ 1.13/storage/ndb/src/common/debugger/signaldata/SignalNames.cpp 2006-11-23 23:49:54
+11:00
@@ -365,6 +365,8 @@
,{ GSN_EVENT_SUBSCRIBE_REF, "EVENT_SUBSCRIBE_REF" }
,{ GSN_DUMP_STATE_ORD, "DUMP_STATE_ORD" }
+ ,{ GSN_NODE_START_REP, "NODE_START_REP" }
+
,{ GSN_START_INFOREQ, "START_INFOREQ" }
,{ GSN_START_INFOREF, "START_INFOREF" }
,{ GSN_START_INFOCONF, "START_INFOCONF" }
@@ -624,6 +626,12 @@
,{ GSN_LCP_PREPARE_REQ, "LCP_PREPARE_REQ" }
,{ GSN_LCP_PREPARE_REF, "LCP_PREPARE_REF" }
,{ GSN_LCP_PREPARE_CONF, "LCP_PREPARE_CONF" }
+
+ /* DICT LOCK */
+ ,{ GSN_DICT_LOCK_REQ, "DICT_LOCK_REQ" }
+ ,{ GSN_DICT_LOCK_CONF, "DICT_LOCK_CONF" }
+ ,{ GSN_DICT_LOCK_REF, "DICT_LOCK_REF" }
+ ,{ GSN_DICT_UNLOCK_ORD, "DICT_UNLOCK_ORD" }
,{ GSN_ALTER_NODEGROUP_REQ, "ALTER_NODEGROUP_REQ" }
,{ GSN_ALTER_NODEGROUP_REF, "ALTER_NODEGROUP_REF" }
--- 1.86/storage/ndb/src/mgmsrv/ConfigInfo.cpp 2006-08-23 04:59:17 +10:00
+++ 1.87/storage/ndb/src/mgmsrv/ConfigInfo.cpp 2006-11-23 23:04:17 +11:00
@@ -3732,9 +3732,10 @@
Uint32 db_host_count= 0;
ctx.m_userProperties.get(DB_TOKEN, &db_nodes);
ctx.m_userProperties.get("NoOfReplicas", &replicas);
- if((db_nodes % replicas) != 0){
+ if(db_nodes < replicas)
+ {
ctx.reportError("Invalid no of db nodes wrt no of replicas.\n"
- "No of nodes must be dividable with no or replicas");
+ "No of nodes must be at least the number of replicas");
return false;
}
// check that node groups and arbitrators are ok
--- 1.64/storage/ndb/src/common/transporter/TransporterRegistry.cpp 2005-11-30 04:25:29
+11:00
+++ 1.65/storage/ndb/src/common/transporter/TransporterRegistry.cpp 2006-11-23 23:04:14
+11:00
@@ -80,14 +80,15 @@
TransporterRegistry::TransporterRegistry(void * callback,
unsigned _maxTransporters,
- unsigned sizeOfLongSignalMemory)
+ unsigned sizeOfLongSignalMemory) :
+ m_mgm_handle(0),
+ m_transp_count(0)
{
DBUG_ENTER("TransporterRegistry::TransporterRegistry");
nodeIdSpecified = false;
maxTransporters = _maxTransporters;
sendCounter = 1;
- m_mgm_handle= 0;
callbackObj=callback;
@@ -1002,7 +1003,6 @@
#endif
}
-static int x = 0;
void
TransporterRegistry::performSend()
{
@@ -1070,7 +1070,7 @@
}
#endif
#ifdef NDB_TCP_TRANSPORTER
- for (i = x; i < nTCPTransporters; i++)
+ for (i = m_transp_count; i < nTCPTransporters; i++)
{
TCP_Transporter *t = theTCPTransporters[i];
if (t && t->hasDataToSend() && t->isConnected() &&
@@ -1079,7 +1079,7 @@
t->doSend();
}
}
- for (i = 0; i < x && i < nTCPTransporters; i++)
+ for (i = 0; i < m_transp_count && i < nTCPTransporters; i++)
{
TCP_Transporter *t = theTCPTransporters[i];
if (t && t->hasDataToSend() && t->isConnected() &&
@@ -1088,8 +1088,8 @@
t->doSend();
}
}
- x++;
- if (x == nTCPTransporters) x = 0;
+ m_transp_count++;
+ if (m_transp_count == nTCPTransporters) m_transp_count = 0;
#endif
#endif
#ifdef NDB_SCI_TRANSPORTER
@@ -1319,7 +1319,7 @@
else
{
ndbout_c("Management server closed connection early. "
- "It is probably being shut down (or has crashed). "
+ "It is probably being shut down (or has problems). "
"We will retry the connection.");
}
}
--- 1.101/storage/ndb/src/kernel/blocks/dbdict/Dbdict.cpp 2006-09-03 19:11:35 +10:00
+++ 1.102/storage/ndb/src/kernel/blocks/dbdict/Dbdict.cpp 2006-11-23 23:49:54 +11:00
@@ -86,6 +86,11 @@
#include <signaldata/CreateObj.hpp>
#include <SLList.hpp>
+#include <signaldata/AlterNodegroup.hpp>
+#include <signaldata/AlterNodegroupImpl.hpp>
+
+#include <signaldata/GetNodegroup.hpp>
+
#define ZNOT_FOUND 626
#define ZALREADYEXIST 630
@@ -190,7 +195,22 @@
0,
0, 0,
0, 0
- }
+ },
+ /**
+ * Alter Nodegroup
+ */
+ {
+ GSN_ALTER_NODEGROUP_REQ,
+ GSN_ALTER_NG_REQ, GSN_ALTER_NG_REF, GSN_ALTER_NG_CONF,
+ 0, 0, 0, 0,
+ &Dbdict::alter_ng_prepare_start, 0,
+ &Dbdict::alter_ng_commit,
+ 0, &Dbdict::alter_ng_commit_complete,
+/* &Dbdict::alterNG_abort,
+ &Dbdict::alter_ng_abort_start, &Dbdict::alter_ng_abort_complete,*/
+ 0,
+ 0, 0
+ },
};
Uint32
@@ -464,7 +484,7 @@
if(signal)
{
- /* Denna branch körs vid GET_TABINFOREQ */
+ /* Denna branch körs vid GET_TABINFOREQ */
Uint32 * theData = signal->getDataPtrSend();
CreateFragmentationReq * const req = (CreateFragmentationReq*)theData;
@@ -485,7 +505,7 @@
}
else
{
- /* Denna del körs vid CREATE_TABLEREQ, ALTER_TABLEREQ */
+ /* Denna del körs vid CREATE_TABLEREQ, ALTER_TABLEREQ */
;
}
@@ -1511,6 +1531,7 @@
c_opRecordSequence(0),
c_dictLockQueue(c_dictLockPool),
c_dictLockPoll(false)
+ c_opAddNG(c_schemaOp),
{
BLOCK_CONSTRUCTOR(Dbdict);
@@ -1681,6 +1702,11 @@
addRecSignal(GSN_DICT_LOCK_REQ, &Dbdict::execDICT_LOCK_REQ);
addRecSignal(GSN_DICT_UNLOCK_ORD, &Dbdict::execDICT_UNLOCK_ORD);
+
+ addRecSignal(GSN_ALTER_NODEGROUP_REQ, &Dbdict::execALTER_NODEGROUP_REQ);
+ addRecSignal(GSN_ALTER_NG_REQ, &Dbdict::execALTER_NG_REQ);
+ addRecSignal(GSN_ALTER_NG_REF, &Dbdict::execALTER_NG_REF);
+ addRecSignal(GSN_ALTER_NG_CONF, &Dbdict::execALTER_NG_CONF);
}//Dbdict::Dbdict()
Dbdict::~Dbdict()
@@ -4619,7 +4645,7 @@
}
/*
- TODO RONM: Lite ny kod för FragmentData och RangeOrListData
+ TODO RONM: Lite ny kod för FragmentData och RangeOrListData
*/
if (supportedAlteration)
{
@@ -5209,7 +5235,7 @@
req->primaryTableId = tabPtr.p->primaryTableId;
/*
- Behöver fiska upp fragDataPtr från table object istället
+ Behöver fiska upp fragDataPtr från table object istället
*/
if(!fragDataPtr.isNull()){
signal->setSection(fragDataPtr, DiAddTabReq::FRAGMENTATION);
@@ -14230,6 +14256,8 @@
SchemaTransaction * trans_ptr_p,
Uint32 nodeId)
{
+ ndbout_c("schemaOp_reply: trans: %p node: %d", trans_ptr_p, nodeId);
+
{
SafeCounter tmp(c_counterMgr, trans_ptr_p->m_counter);
if(!tmp.clearWaitingFor(nodeId)){
@@ -14403,6 +14431,9 @@
DropFilegroupConf::SignalLength, JBB);
break;
}
+ case GSN_ALTER_NODEGROUP_REQ:{
+ break; // FIXME tell the managementserver something useful
+ }
default:
ndbrequire(false);
}
@@ -16083,3 +16114,311 @@
DropFilegroupImplReq::SignalLength, JBB);
}
+/*
+ ALTER_NODEGROUP_REQ
+ -------------------
+
+ Parse API request
+
+ trans_key
+
+ start internal protocols
+ */
+void
+Dbdict::execALTER_NODEGROUP_REQ(Signal* signal)
+{
+ Uint32 i;
+
+ jamEntry();
+
+ if(!assembleFragments(signal))
+ {
+ jam();
+ return;
+ }
+
+ AlterNodegroupReq *req= (AlterNodegroupReq*)signal->getDataPtr();
+ AlterNodegroupRef *ref= (AlterNodegroupRef*)signal->getDataPtrSend();
+ Uint32 senderRef= req->senderRef;
+ Uint32 senderData= req->senderData;
+ Uint32 op= req->operation;
+
+ Uint32 nrNodes= req->nrNodes;
+
+ Uint32 nodes[4]; // FIXME should be max no replicas
+
+ for(i=0;i<nrNodes;i++) // FIXME signed and unsigned
+ nodes[i]= req->nodes[i];
+
+ do {
+ if(op!=AlterNodegroupReq::Add) // only support add at the moment
+ {
+ jam();
+ ref->errorCode = AlterNodegroupRef::Unsupported;
+ ref->errorKey = 0;
+ ref->errorLine = __LINE__;
+ break;
+ }
+
+ Ptr<SchemaTransaction> trans_ptr;
+ if (! c_Trans.seize(trans_ptr)){
+ ref->errorCode = AlterNodegroupRef::Busy;
+ ref->errorKey = 0;
+ ref->errorLine = __LINE__;
+ break;
+ }
+ if(getOwnNodeId() != c_masterNodeId){
+ jam();
+ ref->errorCode = AlterNodegroupRef::NotMaster;
+ ref->errorKey = 0;
+ ref->errorLine = __LINE__;
+ break;
+ }
+
+ if (c_blockState != BS_IDLE){
+ jam();
+ ref->errorCode = AlterNodegroupRef::Busy;
+ ref->errorKey = 0;
+ ref->errorLine = __LINE__;
+ break;
+ }
+
+ ndbout_c("execALTER_NODEGROUP_REQ: trans_ptr.p: %p", trans_ptr.p);
+
+ const Uint32 trans_key = ++c_opRecordSequence;
+ trans_ptr.p->key = trans_key;
+ trans_ptr.p->m_senderRef = senderRef;
+ trans_ptr.p->m_senderData = senderData;
+ trans_ptr.p->m_nodes = c_aliveNodes;
+ trans_ptr.p->m_errorCode = 0;
+ c_Trans.add(trans_ptr);
+
+ const Uint32 op_key = ++c_opRecordSequence;
+ trans_ptr.p->m_op.m_key = op_key;
+ trans_ptr.p->m_op.m_vt_index = 5;
+ trans_ptr.p->m_op.m_state = DictObjOp::Preparing; // FIXME
+
+ AlterNGReq *ngreq = (AlterNGReq*)signal->getDataPtrSend();
+ ngreq->op_key= trans_ptr.p->m_op.m_key;
+ ngreq->senderRef= reference();
+ ngreq->senderData= trans_key;
+ ngreq->clientRef= trans_ptr.p->m_senderRef;//senderRef;
+ ngreq->clientData= trans_ptr.p->m_senderData;// senderData;
+
+ ndbassert(nrNodes <= 4);
+ ngreq->nrNodes= nrNodes;
+ for(i=0; i<nrNodes; i++) // FIXME sign and unsigned
+ ngreq->nodes[i]= nodes[i];
+
+ // first, send to DIH to work out what NG we're adding
+ ngreq->operation= AlterNGReq::GetNextNodegroupID;
+ sendSignal(DBDIH_REF, GSN_ALTER_NG_REQ, signal,
+ AlterNGReq::SignalLength+nrNodes, JBB);
+ return;
+ } while(0);
+
+ ref->senderData = senderData;
+ ref->masterNodeId = c_masterNodeId;
+ sendSignal(senderRef, GSN_ALTER_NODEGROUP_REF,signal,
+ CreateFileRef::SignalLength, JBB);
+}
+
+void
+Dbdict::execALTER_NG_REQ(Signal *signal)
+{
+ jamEntry();
+
+ if(!assembleFragments(signal)){
+ jam();
+ return;
+ }
+
+ AlterNGReq *req= (AlterNGReq*)signal->getDataPtr();
+
+ AddNGRecordPtr opPtr;
+ ndbrequire(c_opAddNG.seize(opPtr));
+ jam();
+ const Uint32 key= req->op_key;
+ opPtr.p->key= key;
+ c_opAddNG.add(opPtr);
+ jam();
+ opPtr.p->m_errorCode = 0;
+ opPtr.p->m_senderRef = req->senderRef;
+ opPtr.p->m_senderData = req->senderData;
+ opPtr.p->m_clientRef = req->clientRef;
+ opPtr.p->m_clientData = req->clientData;
+
+ // 3 unused values
+ opPtr.p->m_obj_id = 0;
+ opPtr.p->m_obj_type = 0;
+ opPtr.p->m_obj_version = 0;
+
+ opPtr.p->nrNodes= req->nrNodes;
+ for(int i=0; i< req->nrNodes; i++) // FIXME signed and unsigned
+ opPtr.p->nodes[i]= req->nodes[i];
+ jam();
+ opPtr.p->m_vt_index= 5;
+ opPtr.p->m_callback.m_callbackData = key;
+ opPtr.p->m_callback.m_callbackFunction=
+ safe_cast(&Dbdict::alter_ng_prepare_complete);
+ jam();
+ (this->*f_dict_op[opPtr.p->m_vt_index].m_prepare_start) (signal, opPtr.p);
+}
+
+void
+Dbdict::execALTER_NG_REF(Signal *signal)
+{
+ ndbassert(false);
+}
+
+void Dbdict::alter_ng_prepare_start(Signal* signal, SchemaOp *op)
+{
+ jam();
+ AlterNGReq *req= (AlterNGReq*)signal->getDataPtr();
+ AlterNGRef *ref= (AlterNGRef*)signal->getDataPtrSend();
+ struct OpAddNG *aop= (struct OpAddNG*) op;
+
+ do
+ {
+ if (c_blockState != BS_IDLE){
+ jam();
+ ref->errorCode = AlterNodegroupRef::Busy;
+ ref->errorLine = __LINE__;
+ break;
+ }
+
+ c_blockState= BS_ALTER_NG;
+
+ // do prepare
+ // set up the structure in DBDIH
+
+ req->operation= AlterNGReq::Add;
+
+ EXECUTE_DIRECT(DBDIH, GSN_ALTER_NG_REQ, signal,
+ AlterNGReq::SignalLength+aop->nrNodes);
+
+ jamEntry();
+ AlterNGConf* const conf= (AlterNGConf*)&signal->theData[0];
+
+ // now we know what' nodegroup we're creating
+ jam();
+ aop->nodegroupId= conf->nodegroupId;
+ } while (0);
+
+ jam();
+
+ execute(signal, op->m_callback, 0);
+
+}
+
+void Dbdict::alter_ng_prepare_complete(Signal* signal,
+ Uint32 callbackData,
+ Uint32 returnCode)
+{
+ jam();
+ ndbrequire(returnCode == 0);
+
+ AddNGRecordPtr opPtr;
+ ndbrequire(c_opAddNG.find(opPtr, callbackData));
+
+ if(opPtr.p->m_errorCode == 0)
+ {
+ jam();
+
+ AlterNGConf* const conf= (AlterNGConf*)signal->getDataPtr();
+ conf->senderData= opPtr.p->m_senderData;
+ conf->senderRef= reference();
+ conf->clientData= opPtr.p->m_clientData;
+ conf->clientRef= opPtr.p->m_clientRef;
+ sendSignal(opPtr.p->m_senderRef, GSN_ALTER_NG_CONF,
+ signal, AlterNGConf::SignalLength, JBB);
+ return;
+ }
+
+ // FIXME send REF on failed
+ jam();
+ ndbassert(false);
+}
+
+/*
+ * synchronisation point. here PREPARE is done (when all sigs received)
+ */
+void Dbdict::execALTER_NG_CONF(Signal *signal)
+{
+ jamEntry();
+ int i;
+
+ AlterNGConf* const conf = (AlterNGConf*)signal->getDataPtr();
+
+ Ptr<SchemaTransaction> trans_ptr;
+ ndbrequire(c_Trans.find(trans_ptr, conf->senderData));
+
+ ndbout_c("execALTER_NG_CONF: trans_ptr.p: %p", trans_ptr.p);
+
+ const Uint32 senderRef= conf->senderRef;
+ const Uint32 senderData= conf->senderData;
+ const Uint32 enabled= conf->enabled;
+
+ if(refToBlock(signal->senderBlockRef()) == DBDIH && !enabled) // not commit
+ {
+ const Uint32 nrNodes= conf->nrNodes;
+ ndbassert(nrNodes <= 4);
+ Uint32 nodes[4]; // FIXME (should be MAX)
+ for(i=0;i<nrNodes;i++)
+ nodes[i]= conf->nodes[i];
+
+ jam();
+ AlterNGReq *ngreq = (AlterNGReq*)signal->getDataPtrSend();
+ ngreq->op_key= trans_ptr.p->m_op.m_key;
+ ngreq->senderRef= reference();
+ ngreq->senderData= senderData; //trans_key;
+ ngreq->clientRef= trans_ptr.p->m_senderRef;//senderRef;
+ ngreq->clientData=trans_ptr.p->m_senderData;// senderData;
+
+ ndbassert(nrNodes <= 4);
+ ngreq->nrNodes= nrNodes;
+ for(i=0; i<nrNodes; i++) // FIXME sign and unsigned
+ ngreq->nodes[i]= nodes[i];
+
+ char buf[100];
+ trans_ptr.p->m_nodes.getText(buf);
+ ndbout_c("prepare towards: %s", buf);
+
+ NodeReceiverGroup rg(DBDICT, trans_ptr.p->m_nodes);
+ SafeCounter tmp(c_counterMgr, trans_ptr.p->m_counter);
+ tmp.init<AlterNGRef>(rg, GSN_ALTER_NG_REF, trans_ptr.p->key);
+ sendSignal(rg, GSN_ALTER_NG_REQ, signal,
+ AlterNGReq::SignalLength+nrNodes, JBB);
+ return;
+ }
+
+ jam();
+ schemaOp_reply(signal, trans_ptr.p, refToNode(senderRef));
+}
+
+void Dbdict::alter_ng_commit(Signal *signal, SchemaOp *op)
+{
+ jam();
+
+ ndbout << "ALTER_NG COMMIT" << endl;
+
+ jam();
+
+ AlterNGReq* const req= (AlterNGReq*)signal->getDataPtr();
+
+ req->operation= AlterNGReq::Enable;
+ req->senderRef= op->m_senderRef;
+ req->senderData= op->m_senderData;
+
+ sendSignal(DBDIH_REF, GSN_ALTER_NG_REQ, signal,
+ AlterNGReq::SignalLength, JBB);
+}
+
+void Dbdict::alter_ng_commit_complete(Signal *signal, SchemaOp *op)
+{
+ jam();
+ c_blockState= BS_IDLE;
+ ndbout << "ALTER_NG COMMIT DONE" << endl;
+
+ execute(signal, op->m_callback, 0);
+}
--- 1.42/storage/ndb/src/kernel/blocks/dbdict/Dbdict.hpp 2006-08-22 07:41:15 +10:00
+++ 1.43/storage/ndb/src/kernel/blocks/dbdict/Dbdict.hpp 2006-11-23 23:49:54 +11:00
@@ -1067,7 +1067,8 @@
BS_CREATE_TAB = 1,
BS_BUSY = 2,
BS_NODE_FAILURE = 3,
- BS_NODE_RESTART = 4
+ BS_NODE_RESTART = 4,
+ BS_ALTER_NG = 5
};
struct BlockState;
friend struct BlockState;
@@ -1991,7 +1992,15 @@
{
};
typedef Ptr<OpDropObj> DropObjRecordPtr;
-
+
+ struct OpAddNG : public SchemaOp
+ {
+ int nodegroupId;
+ int nrNodes;
+ int nodes[4]; // FIXME
+ };
+ typedef Ptr<OpAddNG> AddNGRecordPtr;
+
/**
* Only used at coordinator/master
*/
@@ -2050,6 +2059,7 @@
KeyTable2<SchemaTransaction, OpRecordUnion> c_Trans;
KeyTable2Ref<OpCreateObj, SchemaOp, OpRecordUnion> c_opCreateObj;
KeyTable2Ref<OpDropObj, SchemaOp, OpRecordUnion> c_opDropObj;
+ KeyTable2Ref<OpAddNG, SchemaOp, OpRecordUnion> c_opAddNG;
// Unique key for operation XXX move to some system table
Uint32 c_opRecordSequence;
@@ -2608,6 +2618,13 @@
void execDICT_ABORT_REF(Signal*);
void execDICT_ABORT_CONF(Signal*);
+ void execALTER_NODEGROUP_REQ(Signal*);
+ void execALTER_NG_REQ(Signal*);
+ void execALTER_NG_REF(Signal*);
+ void execALTER_NG_CONF(Signal*);
+
+ void execGET_NODEGROUP_CONF(Signal*);
+
public:
void createObj_commit(Signal*, struct SchemaOp*);
void createObj_abort(Signal*, struct SchemaOp*);
@@ -2638,6 +2655,14 @@
void send_drop_fg(Signal*, SchemaOp*, DropFilegroupImplReq::RequestInfo);
void drop_undofile_prepare_start(Signal* signal, SchemaOp*);
+
+ void alter_ng_prepare_start(Signal* signal, SchemaOp* op);
+ void alter_ng_prepare_complete(Signal *signal, Uint32,Uint32);//SchemaOp* op);
+ void alter_ng_commit(Signal *signal, SchemaOp* op);
+ void alter_ng_commit_complete(Signal *signal, SchemaOp* op);
+ void alter_ng_abort(Signal *signal, SchemaOp* op);
+ void alter_ng_abort_start(Signal *signal, SchemaOp* op);
+ void alter_ng_abort_complete(Signal *signal, SchemaOp* op);
};
inline bool
--- 1.20/storage/ndb/src/kernel/blocks/dbdih/Dbdih.hpp 2006-07-10 22:01:53 +10:00
+++ 1.21/storage/ndb/src/kernel/blocks/dbdih/Dbdih.hpp 2006-11-23 23:04:16 +11:00
@@ -714,6 +714,9 @@
void execALTER_TAB_REQ(Signal* signal);
void execCREATE_FRAGMENTATION_REQ(Signal*);
+
+ void execALTER_NG_REQ(Signal*);
+ void execGET_NODEGROUP_REQ(Signal*);
void waitDropTabWritingToFile(Signal *, TabRecordPtr tabPtr);
void checkPrepDropTabComplete(Signal *, TabRecordPtr tabPtr);
--- 1.15/storage/ndb/src/kernel/blocks/dbdih/DbdihInit.cpp 2006-06-28 16:47:40 +10:00
+++ 1.16/storage/ndb/src/kernel/blocks/dbdih/DbdihInit.cpp 2006-11-23 23:49:54 +11:00
@@ -260,6 +260,8 @@
addRecSignal(GSN_START_FRAGREF,
&Dbdih::execSTART_FRAGREF);
+ addRecSignal(GSN_ALTER_NG_REQ, &Dbdih::execALTER_NG_REQ);
+
apiConnectRecord = 0;
connectRecord = 0;
fileRecord = 0;
--- 1.45/storage/ndb/src/kernel/blocks/dbdih/DbdihMain.cpp 2006-03-11 02:00:14 +11:00
+++ 1.46/storage/ndb/src/kernel/blocks/dbdih/DbdihMain.cpp 2006-11-23 23:49:54 +11:00
@@ -68,6 +68,7 @@
#include <signaldata/LqhFrag.hpp>
#include <signaldata/FsOpenReq.hpp>
#include <signaldata/DihFragCount.hpp>
+#include <signaldata/DictLock.hpp>
#include <signaldata/AlterNodegroupImpl.hpp>
#include <DebuggerNames.hpp>
@@ -546,7 +547,7 @@
break;
case DihContinueB::ZSTART_PERMREQ_AGAIN:
jam();
- nodeRestartPh2Lab(signal);
+ nodeRestartPh2Lab2(signal);
return;
break;
case DihContinueB::SwitchReplica:
@@ -1091,7 +1092,7 @@
jamEntry();
const ndb_mgm_configuration_iterator * p =
- theConfiguration.getOwnConfigIterator();
+ m_ctx.m_config.getOwnConfigIterator();
ndbrequireErr(p != 0, NDBD_EXIT_INVALID_CONFIG);
initData();
@@ -1134,6 +1135,26 @@
return;
}//Dbdih::execSTART_FRAGCONF()
+void Dbdih::execSTART_FRAGREF(Signal* signal)
+{
+ jamEntry();
+
+ /**
+ * Kill starting node
+ */
+ Uint32 errCode = signal->theData[1];
+ Uint32 nodeId = signal->theData[2];
+
+ SystemError * const sysErr = (SystemError*)&signal->theData[0];
+ sysErr->errorCode = SystemError::StartFragRefError;
+ sysErr->errorRef = reference();
+ sysErr->data1 = errCode;
+ sysErr->data2 = 0;
+ sendSignal(calcNdbCntrBlockRef(nodeId), GSN_SYSTEM_ERROR, signal,
+ SystemError::SignalLength, JBB);
+ return;
+}//Dbdih::execSTART_FRAGCONF()
+
void Dbdih::execSTART_MEREF(Signal* signal)
{
jamEntry();
@@ -1177,7 +1198,7 @@
{
jamEntry();
cntrlblockref = signal->theData[0];
- if(theConfiguration.getInitialStart()){
+ if(m_ctx.m_config.getInitialStart()){
sendSignal(cntrlblockref, GSN_DIH_RESTARTREF, signal, 1, JBB);
} else {
readGciFileLab(signal);
@@ -1323,6 +1344,7 @@
case NodeState::ST_INITIAL_NODE_RESTART:
case NodeState::ST_NODE_RESTART:
jam();
+
/***********************************************************************
* When starting nodes while system is operational we must be controlled
* by the master since only one node restart is allowed at a time.
@@ -1333,7 +1355,7 @@
req->startingRef = reference();
req->startingVersion = 0; // Obsolete
sendSignal(cmasterdihref, GSN_START_MEREQ, signal,
- StartMeReq::SignalLength, JBB);
+ StartMeReq::SignalLength, JBB);
return;
}
ndbrequire(false);
@@ -1401,6 +1423,27 @@
}//Dbdih::execNDB_STTOR()
void
+Dbdih::exec_node_start_rep(Signal* signal)
+{
+ /*
+ * Send DICT_UNLOCK_ORD when this node is SL_STARTED.
+ *
+ * Sending it before (sp 7) conflicts with code which assumes
+ * SL_STARTING means we are in copy phase of NR.
+ *
+ * NodeState::starting.restartType is not supposed to be used
+ * when SL_STARTED. Also it seems NODE_START_REP can arrive twice.
+ *
+ * For these reasons there are no consistency checks and
+ * we rely on c_dictLockSlavePtrI_nodeRestart alone.
+ */
+ if (c_dictLockSlavePtrI_nodeRestart != RNIL) {
+ sendDictUnlockOrd(signal, c_dictLockSlavePtrI_nodeRestart);
+ c_dictLockSlavePtrI_nodeRestart = RNIL;
+ }
+}
+
+void
Dbdih::createMutexes(Signal * signal, Uint32 count){
Callback c = { safe_cast(&Dbdih::createMutex_done), count };
@@ -1467,6 +1510,33 @@
return;
}
+ NodeRecordPtr nodePtr;
+ Uint32 gci = SYSFILE->lastCompletedGCI[getOwnNodeId()];
+ for (nodePtr.i = 1; nodePtr.i < MAX_NDB_NODES; nodePtr.i++)
+ {
+ jam();
+ ptrAss(nodePtr, nodeRecord);
+ if (SYSFILE->lastCompletedGCI[nodePtr.i] > gci)
+ {
+ jam();
+ /**
+ * Since we're starting(is master) and there
+ * there are other nodes with higher GCI...
+ * there gci's must be invalidated...
+ * and they _must_ do an initial start
+ * indicate this by setting lastCompletedGCI = 0
+ */
+ SYSFILE->lastCompletedGCI[nodePtr.i] = 0;
+ ndbrequire(nodePtr.p->nodeStatus != NodeRecord::ALIVE);
+ warningEvent("Making filesystem for node %d unusable",
+ nodePtr.i);
+ }
+ }
+ /**
+ * This set which GCI we will try to restart to
+ */
+ SYSFILE->newestRestorableGCI = gci;
+
ndbrequire(isMaster());
copyGciLab(signal, CopyGCIReq::RESTART); // We have already read the file!
}//Dbdih::ndbStartReqLab()
@@ -1587,6 +1657,35 @@
/*---------------------------------------------------------------------------*/
void Dbdih::nodeRestartPh2Lab(Signal* signal)
{
+ /*
+ * Lock master DICT to avoid metadata operations during INR/NR.
+ * Done just before START_PERMREQ.
+ *
+ * It would be more elegant to do this just before START_MEREQ.
+ * The problem is, on INR we end up in massive invalidateNodeLCP
+ * which is not fully protected against metadata ops.
+ */
+ ndbrequire(c_dictLockSlavePtrI_nodeRestart == RNIL);
+
+ // check that we are not yet taking part in schema ops
+ CRASH_INSERTION(7174);
+
+ Uint32 lockType = DictLockReq::NodeRestartLock;
+ Callback c = { safe_cast(&Dbdih::recvDictLockConf_nodeRestart), 0 };
+ sendDictLockReq(signal, lockType, c);
+}
+
+void Dbdih::recvDictLockConf_nodeRestart(Signal* signal, Uint32 data, Uint32 ret)
+{
+ ndbrequire(c_dictLockSlavePtrI_nodeRestart == RNIL);
+ ndbrequire(data != RNIL);
+ c_dictLockSlavePtrI_nodeRestart = data;
+
+ nodeRestartPh2Lab2(signal);
+}
+
+void Dbdih::nodeRestartPh2Lab2(Signal* signal)
+{
/*------------------------------------------------------------------------*/
// REQUEST FOR PERMISSION FROM MASTER TO START A NODE IN AN ALREADY
// RUNNING SYSTEM.
@@ -1597,7 +1696,7 @@
req->nodeId = cownNodeId;
req->startType = cstarttype;
sendSignal(cmasterdihref, GSN_START_PERMREQ, signal, 3, JBB);
-}//Dbdih::nodeRestartPh2Lab()
+}
void Dbdih::execSTART_PERMCONF(Signal* signal)
{
@@ -1613,7 +1712,7 @@
{
jamEntry();
Uint32 errorCode = signal->theData[1];
- if (errorCode == ZNODE_ALREADY_STARTING_ERROR) {
+ if (errorCode == StartPermRef::ZNODE_ALREADY_STARTING_ERROR) {
jam();
/*-----------------------------------------------------------------------*/
// The master was busy adding another node. We will wait for a second and
@@ -1623,6 +1722,20 @@
sendSignalWithDelay(reference(), GSN_CONTINUEB, signal, 3000, 1);
return;
}//if
+
+ if (errorCode == StartPermRef::InitialStartRequired)
+ {
+ CRASH_INSERTION(7170);
+ char buf[255];
+ BaseString::snprintf(buf, sizeof(buf),
+ "Cluster requires this node to be started "
+ " with --initial as partial start has been performed"
+ " and this filesystem is unusable");
+ progError(__LINE__,
+ NDBD_EXIT_SR_RESTARTCONFLICT,
+ buf);
+ ndbrequire(false);
+ }
/*------------------------------------------------------------------------*/
// Some node process in another node involving our node was still active. We
// will recover from this by crashing here.
@@ -1725,15 +1838,15 @@
const BlockReference retRef = req->blockRef;
const Uint32 nodeId = req->nodeId;
const Uint32 typeStart = req->startType;
-
CRASH_INSERTION(7122);
ndbrequire(isMaster());
ndbrequire(refToNode(retRef) == nodeId);
if ((c_nodeStartMaster.activeState) ||
- (c_nodeStartMaster.wait != ZFALSE)) {
+ (c_nodeStartMaster.wait != ZFALSE) ||
+ ERROR_INSERTED_CLEAR(7175)) {
jam();
signal->theData[0] = nodeId;
- signal->theData[1] = ZNODE_ALREADY_STARTING_ERROR;
+ signal->theData[1] = StartPermRef::ZNODE_ALREADY_STARTING_ERROR;
sendSignal(retRef, GSN_START_PERMREF, signal, 2, JBB);
return;
}//if
@@ -1743,6 +1856,16 @@
ndbrequire(false);
}//if
+ if (SYSFILE->lastCompletedGCI[nodeId] == 0 &&
+ typeStart != NodeState::ST_INITIAL_NODE_RESTART)
+ {
+ jam();
+ signal->theData[0] = nodeId;
+ signal->theData[1] = StartPermRef::InitialStartRequired;
+ sendSignal(retRef, GSN_START_PERMREF, signal, 2, JBB);
+ return;
+ }
+
/*----------------------------------------------------------------------
* WE START THE INCLUSION PROCEDURE
* ---------------------------------------------------------------------*/
@@ -3755,24 +3878,12 @@
/* ------------------------------------------------------------------------- */
void Dbdih::selectMasterCandidateAndSend(Signal* signal)
{
- Uint32 gci = 0;
- Uint32 masterCandidateId = 0;
- NodeRecordPtr nodePtr;
- for (nodePtr.i = 1; nodePtr.i < MAX_NDB_NODES; nodePtr.i++) {
- jam();
- ptrAss(nodePtr, nodeRecord);
- if (SYSFILE->lastCompletedGCI[nodePtr.i] > gci) {
- jam();
- masterCandidateId = nodePtr.i;
- gci = SYSFILE->lastCompletedGCI[nodePtr.i];
- }//if
- }//for
- ndbrequire(masterCandidateId != 0);
setNodeGroups();
- signal->theData[0] = masterCandidateId;
- signal->theData[1] = gci;
+ signal->theData[0] = getOwnNodeId();
+ signal->theData[1] = SYSFILE->lastCompletedGCI[getOwnNodeId()];
sendSignal(cntrlblockref, GSN_DIH_RESTARTCONF, signal, 2, JBB);
-
+
+ NodeRecordPtr nodePtr;
Uint32 node_groups[MAX_NDB_NODES];
memset(node_groups, 0, sizeof(node_groups));
for (nodePtr.i = 1; nodePtr.i < MAX_NDB_NODES; nodePtr.i++) {
@@ -3790,10 +3901,10 @@
if(count != 0 && count != cnoReplicas){
char buf[255];
BaseString::snprintf(buf, sizeof(buf),
- "Illegal configuration change."
- " Initial start needs to be performed "
- " when changing no of replicas (%d != %d)",
- node_groups[nodePtr.i], cnoReplicas);
+ "Illegal configuration change."
+ " Initial start needs to be performed "
+ " when changing no of replicas (%d != %d)",
+ node_groups[nodePtr.i], cnoReplicas);
progError(__LINE__, NDBD_EXIT_INVALID_CONFIG, buf);
}
}
@@ -5466,6 +5577,7 @@
//const Uint32 lcpId = SYSFILE->latestLCP_ID;
const bool lcpOngoingFlag = (tabPtr.p->tabLcpStatus== TabRecord::TLS_ACTIVE);
+ const bool temporary = !tabPtr.p->storedTable;
FragmentstorePtr fragPtr;
for(Uint32 fragNo = 0; fragNo < tabPtr.p->totalfragments; fragNo++){
@@ -5486,7 +5598,7 @@
jam();
found = true;
noOfRemovedReplicas++;
- removeNodeFromStored(nodeId, fragPtr, replicaPtr);
+ removeNodeFromStored(nodeId, fragPtr, replicaPtr, temporary);
if(replicaPtr.p->lcpOngoingFlag){
jam();
/**
@@ -5631,6 +5743,12 @@
return;
}
+ if (ERROR_INSERTED(7030))
+ {
+ ndbout_c("Reenable GCP_PREPARE");
+ CLEAR_ERROR_INSERT_VALUE;
+ }
+
NFCompleteRep * const nf = (NFCompleteRep *)&signal->theData[0];
nf->blockNo = DBDIH;
nf->nodeId = cownNodeId;
@@ -6191,9 +6309,6 @@
signal->theData[0] = 7012;
execDUMP_STATE_ORD(signal);
- signal->theData[0] = 7015;
- execDUMP_STATE_ORD(signal);
-
c_lcpMasterTakeOverState.set(LMTOS_IDLE, __LINE__);
checkLocalNodefailComplete(signal, failedNodePtr.i, NF_LCP_TAKE_OVER);
@@ -6583,6 +6698,7 @@
ptrCheckGuard(NGPtr, MAX_NDB_NODES, nodeGroupRecord);
const Uint32 max = NGPtr.p->nodeCount;
+ fragments[count++] = c_nextLogPart++; // Store logpart first
Uint32 tmp= next_replica_node[NGPtr.i];
for(Uint32 replicaNo = 0; replicaNo < noOfReplicas; replicaNo++)
{
@@ -6629,7 +6745,8 @@
FragmentstorePtr fragPtr;
ReplicaRecordPtr replicaPtr;
getFragstore(primTabPtr.p, fragNo, fragPtr);
- fragments[count++]= fragPtr.p->preferredPrimary;
+ fragments[count++] = c_nextLogPart++;
+ fragments[count++] = fragPtr.p->preferredPrimary;
for (replicaPtr.i = fragPtr.p->storedReplicas;
replicaPtr.i != RNIL;
replicaPtr.i = replicaPtr.p->nextReplica) {
@@ -6652,7 +6769,7 @@
}
}
}
- ndbrequire(count == (2U + noOfReplicas * noOfFragments));
+ ndbrequire(count == (2U + (1 + noOfReplicas) * noOfFragments));
CreateFragmentationConf * const conf =
(CreateFragmentationConf*)signal->getDataPtrSend();
@@ -6825,8 +6942,8 @@
FragmentstorePtr fragPtr;
Uint32 activeIndex = 0;
getFragstore(tabPtr.p, fragId, fragPtr);
+ fragPtr.p->m_log_part_id = fragments[index++];
fragPtr.p->preferredPrimary = fragments[index];
- fragPtr.p->m_log_part_id = c_nextLogPart++;
for (Uint32 i = 0; i<noReplicas; i++) {
const Uint32 nodeId = fragments[index++];
@@ -7766,6 +7883,16 @@
{
jamEntry();
CRASH_INSERTION(7005);
+
+ if (ERROR_INSERTED(7030))
+ {
+ cgckptflag = true;
+ ndbout_c("Delayed GCP_PREPARE 5s");
+ sendSignalWithDelay(reference(), GSN_GCP_PREPARE, signal, 5000,
+ signal->getLength());
+ return;
+ }
+
Uint32 masterNodeId = signal->theData[0];
Uint32 gci = signal->theData[1];
BlockReference retRef = calcDihBlockRef(masterNodeId);
@@ -7778,6 +7905,14 @@
cgcpParticipantState = GCP_PARTICIPANT_PREPARE_RECEIVED;
cnewgcp = gci;
+ if (ERROR_INSERTED(7031))
+ {
+ ndbout_c("Crashing delayed in GCP_PREPARE 3s");
+ signal->theData[0] = 9999;
+ sendSignalWithDelay(CMVMI_REF, GSN_NDB_TAMPER, signal, 3000, 1);
+ return;
+ }
+
signal->theData[0] = cownNodeId;
signal->theData[1] = gci;
sendSignal(retRef, GSN_GCP_PREPARECONF, signal, 2, JBA);
@@ -8470,11 +8605,21 @@
/* WE FAILED IN OPENING A FILE. IF THE FIRST FILE THEN TRY WITH THE */
/* DUPLICATE FILE, OTHERWISE WE REPORT AN ERROR IN THE SYSTEM RESTART. */
/* ---------------------------------------------------------------------- */
- ndbrequire(filePtr.i == tabPtr.p->tabFile[0]);
- filePtr.i = tabPtr.p->tabFile[1];
- ptrCheckGuard(filePtr, cfileFileSize, fileRecord);
- openFileRw(signal, filePtr);
- filePtr.p->reqStatus = FileRecord::OPENING_TABLE;
+ if (filePtr.i == tabPtr.p->tabFile[0])
+ {
+ filePtr.i = tabPtr.p->tabFile[1];
+ ptrCheckGuard(filePtr, cfileFileSize, fileRecord);
+ openFileRw(signal, filePtr);
+ filePtr.p->reqStatus = FileRecord::OPENING_TABLE;
+ }
+ else
+ {
+ char buf[256];
+ BaseString::snprintf(buf, sizeof(buf),
+ "Error opening DIH schema files for table: %d",
+ tabPtr.i);
+ progError(__LINE__, NDBD_EXIT_AFS_NO_SUCH_FILE, buf);
+ }
}//Dbdih::openingTableErrorLab()
void Dbdih::readingTableLab(Signal* signal, FileRecordPtr filePtr)
@@ -8615,7 +8760,7 @@
*--------_----------------------------------------------------- */
const Uint32 nextCrashed = noCrashedReplicas + 1;
replicaPtr.p->noCrashedReplicas = nextCrashed;
- arrGuard(nextCrashed, 8);
+ arrGuardErr(nextCrashed, 8, NDBD_EXIT_MAX_CRASHED_REPLICAS);
replicaPtr.p->createGci[nextCrashed] = newestRestorableGCI + 1;
ndbrequire(newestRestorableGCI + 1 != 0xF1F1F1F1);
replicaPtr.p->replicaLastGci[nextCrashed] = (Uint32)-1;
@@ -8640,6 +8785,7 @@
}
replicaPtr.i = nextReplicaPtrI;
}//while
+ updateNodeInfo(fragPtr);
}
}
@@ -10789,6 +10935,10 @@
c_copyGCIMaster.m_copyReason,
c_copyGCIMaster.m_waiting);
break;
+ case GCP_READY: // shut up lint
+ case GCP_PREPARE_SENT:
+ case GCP_COMMIT_SENT:
+ break;
}
ndbout_c("c_copyGCISlave: sender{Data, Ref} %d %x reason: %d nextWord: %d",
@@ -11533,7 +11683,7 @@
memset(&sysfileData[0], 0, sizeof(sysfileData));
const ndb_mgm_configuration_iterator * p =
- theConfiguration.getOwnConfigIterator();
+ m_ctx.m_config.getOwnConfigIterator();
ndbrequire(p != 0);
c_lcpState.clcpDelay = 20;
@@ -12030,7 +12180,7 @@
Uint32 tmngNode;
Uint32 tmngNodeGroup;
Uint32 tmngLimit;
- Uint32 i;
+ Uint32 i, j;
/**-----------------------------------------------------------------------
* ASSIGN ALL ACTIVE NODES INTO NODE GROUPS. HOT SPARE NODES ARE ASSIGNED
@@ -12076,6 +12226,38 @@
Sysfile::setNodeGroup(mngNodeptr.i, SYSFILE->nodeGroups,
mngNodeptr.p->nodeGroup);
}//if
}//for
+
+ for (i = 0; i<cnoOfNodeGroups; i++)
+ {
+ jam();
+ bool alive = false;
+ NodeGroupRecordPtr NGPtr;
+ NGPtr.i = i;
+ ptrCheckGuard(NGPtr, MAX_NDB_NODES, nodeGroupRecord);
+ for (j = 0; j<NGPtr.p->nodeCount; j++)
+ {
+ jam();
+ mngNodeptr.i = NGPtr.p->nodesInGroup[j];
+ ptrCheckGuard(mngNodeptr, MAX_NDB_NODES, nodeRecord);
+ if (checkNodeAlive(NGPtr.p->nodesInGroup[j]))
+ {
+ alive = true;
+ break;
+ }
+ }
+
+ if (!alive)
+ {
+ char buf[255];
+ BaseString::snprintf
+ (buf, sizeof(buf),
+ "Illegal initial start, no alive node in nodegroup %u", i);
+ progError(__LINE__,
+ NDBD_EXIT_SR_RESTARTCONFLICT,
+ buf);
+
+ }
+ }
}//Dbdih::makeNodeGroups()
/**
@@ -12212,7 +12394,8 @@
/* THAT THE NEW REPLICA IS NOT STARTED YET AND REPLICA_LAST_GCI IS*/
/* SET TO -1 TO INDICATE THAT IT IS NOT DEAD YET. */
/*----------------------------------------------------------------------*/
- arrGuard(ncrReplicaPtr.p->noCrashedReplicas + 1, 8);
+ arrGuardErr(ncrReplicaPtr.p->noCrashedReplicas + 1, 8,
+ NDBD_EXIT_MAX_CRASHED_REPLICAS);
ncrReplicaPtr.p->replicaLastGci[ncrReplicaPtr.p->noCrashedReplicas] =
SYSFILE->lastCompletedGCI[nodeId];
ncrReplicaPtr.p->noCrashedReplicas = ncrReplicaPtr.p->noCrashedReplicas + 1;
@@ -12575,9 +12758,18 @@
/*---------------------------------------------------------------*/
void Dbdih::removeNodeFromStored(Uint32 nodeId,
FragmentstorePtr fragPtr,
- ReplicaRecordPtr replicatePtr)
+ ReplicaRecordPtr replicatePtr,
+ bool temporary)
{
- newCrashedReplica(nodeId, replicatePtr);
+ if (!temporary)
+ {
+ jam();
+ newCrashedReplica(nodeId, replicatePtr);
+ }
+ else
+ {
+ jam();
+ }
removeStoredReplica(fragPtr, replicatePtr);
linkOldStoredReplica(fragPtr, replicatePtr);
ndbrequire(fragPtr.p->storedReplicas != RNIL);
@@ -12877,7 +13069,6 @@
void Dbdih::setInitialActiveStatus()
{
NodeRecordPtr siaNodeptr;
- Uint32 tsiaNodeActiveStatus;
Uint32 tsiaNoActiveNodes;
tsiaNoActiveNodes = csystemnodes - cnoHotSpare;
@@ -12885,39 +13076,34 @@
SYSFILE->nodeStatus[i] = 0;
for (siaNodeptr.i = 1; siaNodeptr.i < MAX_NDB_NODES; siaNodeptr.i++) {
ptrAss(siaNodeptr, nodeRecord);
- if (siaNodeptr.p->nodeStatus == NodeRecord::ALIVE) {
+ switch(siaNodeptr.p->nodeStatus){
+ case NodeRecord::ALIVE:
+ case NodeRecord::DEAD:
if (tsiaNoActiveNodes == 0) {
jam();
siaNodeptr.p->activeStatus = Sysfile::NS_HotSpare;
} else {
jam();
tsiaNoActiveNodes = tsiaNoActiveNodes - 1;
- siaNodeptr.p->activeStatus = Sysfile::NS_Active;
- }//if
- } else {
- jam();
- siaNodeptr.p->activeStatus = Sysfile::NS_NotDefined;
- }//if
- switch (siaNodeptr.p->activeStatus) {
- case Sysfile::NS_Active:
- jam();
- tsiaNodeActiveStatus = Sysfile::NS_Active;
- break;
- case Sysfile::NS_HotSpare:
- jam();
- tsiaNodeActiveStatus = Sysfile::NS_HotSpare;
- break;
- case Sysfile::NS_NotDefined:
- jam();
- tsiaNodeActiveStatus = Sysfile::NS_NotDefined;
+ if (siaNodeptr.p->nodeStatus == NodeRecord::ALIVE)
+ {
+ jam();
+ siaNodeptr.p->activeStatus = Sysfile::NS_Active;
+ }
+ else
+ {
+ siaNodeptr.p->activeStatus = Sysfile::NS_NotActive_NotTakenOver;
+ }
+ }
break;
default:
- ndbrequire(false);
- return;
+ jam();
+ siaNodeptr.p->activeStatus = Sysfile::NS_NotDefined;
break;
- }//switch
- Sysfile::setNodeStatus(siaNodeptr.i, SYSFILE->nodeStatus,
- tsiaNodeActiveStatus);
+ }//if
+ Sysfile::setNodeStatus(siaNodeptr.i,
+ SYSFILE->nodeStatus,
+ siaNodeptr.p->activeStatus);
}//for
}//Dbdih::setInitialActiveStatus()
@@ -13537,7 +13723,8 @@
Dbdih::execDUMP_STATE_ORD(Signal* signal)
{
DumpStateOrd * const & dumpState = (DumpStateOrd *)&signal->theData[0];
- if (dumpState->args[0] == DumpStateOrd::DihDumpNodeRestartInfo) {
+ Uint32 arg = dumpState->args[0];
+ if (arg == DumpStateOrd::DihDumpNodeRestartInfo) {
infoEvent("c_nodeStartMaster.blockLcp = %d, c_nodeStartMaster.blockGcp = %d,
c_nodeStartMaster.wait = %d",
c_nodeStartMaster.blockLcp, c_nodeStartMaster.blockGcp, c_nodeStartMaster.wait);
infoEvent("cstartGcpNow = %d, cgcpStatus = %d",
@@ -13547,7 +13734,7 @@
infoEvent("cgcpOrderBlocked = %d, cgcpStartCounter = %d",
cgcpOrderBlocked, cgcpStartCounter);
}//if
- if (dumpState->args[0] == DumpStateOrd::DihDumpNodeStatusInfo) {
+ if (arg == DumpStateOrd::DihDumpNodeStatusInfo) {
NodeRecordPtr localNodePtr;
infoEvent("Printing nodeStatus of all nodes");
for (localNodePtr.i = 1; localNodePtr.i < MAX_NDB_NODES; localNodePtr.i++) {
@@ -13559,7 +13746,7 @@
}//for
}//if
- if (dumpState->args[0] == DumpStateOrd::DihPrintFragmentation){
+ if (arg == DumpStateOrd::DihPrintFragmentation){
infoEvent("Printing fragmentation of all tables --");
for(Uint32 i = 0; i<ctabFileSize; i++){
TabRecordPtr tabPtr;
@@ -13734,7 +13921,7 @@
}
}
- if(dumpState->args[0] == 7019 && signal->getLength() == 2)
+ if(arg == 7019 && signal->getLength() == 2)
{
char buf2[8+1];
NodeRecordPtr nodePtr;
@@ -13752,7 +13939,7 @@
nodePtr.p->m_nodefailSteps.getText(buf2));
}
- if(dumpState->args[0] == 7020 && signal->getLength() > 3)
+ if(arg == 7020 && signal->getLength() > 3)
{
Uint32 gsn= signal->theData[1];
Uint32 block= signal->theData[2];
@@ -13776,7 +13963,7 @@
gsn, getBlockName(block, "UNKNOWN"), length, buf);
}
- if(dumpState->args[0] == DumpStateOrd::DihDumpLCPState){
+ if(arg == DumpStateOrd::DihDumpLCPState){
infoEvent("-- Node %d LCP STATE --", getOwnNodeId());
infoEvent("lcpStatus = %d (update place = %d) ",
c_lcpState.lcpStatus, c_lcpState.lcpStatusUpdatedPlace);
@@ -13792,7 +13979,7 @@
infoEvent("-- Node %d LCP STATE --", getOwnNodeId());
}
- if(dumpState->args[0] == DumpStateOrd::DihDumpLCPMasterTakeOver){
+ if(arg == DumpStateOrd::DihDumpLCPMasterTakeOver){
infoEvent("-- Node %d LCP MASTER TAKE OVER STATE --", getOwnNodeId());
infoEvent
("c_lcpMasterTakeOverState.state = %d updatePlace = %d failedNodeId = %d",
@@ -13807,52 +13994,25 @@
infoEvent("-- Node %d LCP MASTER TAKE OVER STATE --", getOwnNodeId());
}
- if (signal->theData[0] == 7015){
- for(Uint32 i = 0; i<ctabFileSize; i++){
- TabRecordPtr tabPtr;
- tabPtr.i = i;
- ptrCheckGuard(tabPtr, ctabFileSize, tabRecord);
-
- if(tabPtr.p->tabStatus != TabRecord::TS_ACTIVE)
- continue;
-
- infoEvent
- ("Table %d: TabCopyStatus: %d TabUpdateStatus: %d TabLcpStatus: %d",
- tabPtr.i,
- tabPtr.p->tabCopyStatus,
- tabPtr.p->tabUpdateState,
- tabPtr.p->tabLcpStatus);
+ if (signal->theData[0] == 7015)
+ {
+ if (signal->getLength() == 1)
+ {
+ signal->theData[1] = 0;
+ }
- FragmentstorePtr fragPtr;
- for (Uint32 fid = 0; fid < tabPtr.p->totalfragments; fid++) {
- jam();
- getFragstore(tabPtr.p, fid, fragPtr);
-
- char buf[100], buf2[100];
- BaseString::snprintf(buf, sizeof(buf), " Fragment %d: noLcpReplicas==%d ",
- fid, fragPtr.p->noLcpReplicas);
-
- Uint32 num=0;
- ReplicaRecordPtr replicaPtr;
- replicaPtr.i = fragPtr.p->storedReplicas;
- do {
- ptrCheckGuard(replicaPtr, creplicaFileSize, replicaRecord);
- BaseString::snprintf(buf2, sizeof(buf2), "%s %d(on %d)=%d(%s)",
- buf, num,
- replicaPtr.p->procNode,
- replicaPtr.p->lcpIdStarted,
- replicaPtr.p->lcpOngoingFlag ? "Ongoing" : "Idle");
- BaseString::snprintf(buf, sizeof(buf), "%s", buf2);
-
- num++;
- replicaPtr.i = replicaPtr.p->nextReplica;
- } while (replicaPtr.i != RNIL);
- infoEvent(buf);
- }
+ Uint32 tableId = signal->theData[1];
+ if (tableId < ctabFileSize)
+ {
+ signal->theData[0] = 7021;
+ execDUMP_STATE_ORD(signal);
+ signal->theData[0] = 7015;
+ signal->theData[1] = tableId + 1;
+ sendSignal(reference(), GSN_DUMP_STATE_ORD, signal, 2, JBB);
}
}
- if(dumpState->args[0] == DumpStateOrd::EnableUndoDelayDataWrite){
+ if(arg == DumpStateOrd::EnableUndoDelayDataWrite){
ndbout << "Dbdih:: delay write of datapages for table = "
<< dumpState->args[1]<< endl;
// Send this dump to ACC and TUP
@@ -13882,7 +14042,7 @@
return;
}
- if(dumpState->args[0] == 7098){
+ if(arg == 7098){
if(signal->length() == 3){
jam();
infoEvent("startLcpRoundLoopLab(tabel=%d, fragment=%d)",
@@ -13895,10 +14055,73 @@
}
}
- if(dumpState->args[0] == DumpStateOrd::DihStartLcpImmediately){
+ if(arg == DumpStateOrd::DihStartLcpImmediately){
c_lcpState.ctimer += (1 << c_lcpState.clcpDelay);
return;
}
+
+ if (arg == DumpStateOrd::DihSetTimeBetweenGcp)
+ {
+ if (signal->getLength() == 1)
+ {
+ const ndb_mgm_configuration_iterator * p =
+ m_ctx.m_config.getOwnConfigIterator();
+ ndbrequire(p != 0);
+ ndb_mgm_get_int_parameter(p, CFG_DB_GCP_INTERVAL, &cgcpDelay);
+ }
+ else
+ {
+ cgcpDelay = signal->theData[1];
+ }
+ ndbout_c("Setting time between gcp : %d", cgcpDelay);
+ }
+
+ if (arg == 7021 && signal->getLength() == 2)
+ {
+ TabRecordPtr tabPtr;
+ tabPtr.i = signal->theData[1];
+ if (tabPtr.i >= ctabFileSize)
+ return;
+
+ ptrCheckGuard(tabPtr, ctabFileSize, tabRecord);
+
+ if(tabPtr.p->tabStatus != TabRecord::TS_ACTIVE)
+ return;
+
+ infoEvent
+ ("Table %d: TabCopyStatus: %d TabUpdateStatus: %d TabLcpStatus: %d",
+ tabPtr.i,
+ tabPtr.p->tabCopyStatus,
+ tabPtr.p->tabUpdateState,
+ tabPtr.p->tabLcpStatus);
+
+ FragmentstorePtr fragPtr;
+ for (Uint32 fid = 0; fid < tabPtr.p->totalfragments; fid++) {
+ jam();
+ getFragstore(tabPtr.p, fid, fragPtr);
+
+ char buf[100], buf2[100];
+ BaseString::snprintf(buf, sizeof(buf), " Fragment %d: noLcpReplicas==%d ",
+ fid, fragPtr.p->noLcpReplicas);
+
+ Uint32 num=0;
+ ReplicaRecordPtr replicaPtr;
+ replicaPtr.i = fragPtr.p->storedReplicas;
+ do {
+ ptrCheckGuard(replicaPtr, creplicaFileSize, replicaRecord);
+ BaseString::snprintf(buf2, sizeof(buf2), "%s %d(on %d)=%d(%s)",
+ buf, num,
+ replicaPtr.p->procNode,
+ replicaPtr.p->lcpIdStarted,
+ replicaPtr.p->lcpOngoingFlag ? "Ongoing" : "Idle");
+ BaseString::snprintf(buf, sizeof(buf), "%s", buf2);
+
+ num++;
+ replicaPtr.i = replicaPtr.p->nextReplica;
+ } while (replicaPtr.i != RNIL);
+ infoEvent(buf);
+ }
+ }
}//Dbdih::execDUMP_STATE_ORD()
void
@@ -14627,11 +14850,36 @@
jam();
conf->senderData = senderData;
conf->gcp = cnewgcp;
+ conf->blockStatus = cgcpOrderBlocked;
sendSignal(senderRef, GSN_WAIT_GCP_CONF, signal,
WaitGCPConf::SignalLength, JBB);
return;
}//if
+ if (requestType == WaitGCPReq::BlockStartGcp)
+ {
+ jam();
+ conf->senderData = senderData;
+ conf->gcp = cnewgcp;
+ conf->blockStatus = cgcpOrderBlocked;
+ sendSignal(senderRef, GSN_WAIT_GCP_CONF, signal,
+ WaitGCPConf::SignalLength, JBB);
+ cgcpOrderBlocked = 1;
+ return;
+ }
+
+ if (requestType == WaitGCPReq::UnblockStartGcp)
+ {
+ jam();
+ conf->senderData = senderData;
+ conf->gcp = cnewgcp;
+ conf->blockStatus = cgcpOrderBlocked;
+ sendSignal(senderRef, GSN_WAIT_GCP_CONF, signal,
+ WaitGCPConf::SignalLength, JBB);
+ cgcpOrderBlocked = 0;
+ return;
+ }
+
if(isMaster()) {
/**
* Master
@@ -14643,6 +14891,7 @@
jam();
conf->senderData = senderData;
conf->gcp = coldgcp;
+ conf->blockStatus = cgcpOrderBlocked;
sendSignal(senderRef, GSN_WAIT_GCP_CONF, signal,
WaitGCPConf::SignalLength, JBB);
return;
@@ -14729,6 +14978,7 @@
conf->senderData = ptr.p->clientData;
conf->gcp = gcp;
+ conf->blockStatus = cgcpOrderBlocked;
sendSignal(ptr.p->clientRef, GSN_WAIT_GCP_CONF, signal,
WaitGCPConf::SignalLength, JBB);
@@ -14796,6 +15046,7 @@
c_waitGCPMasterList.next(ptr);
conf->senderData = clientData;
+ conf->blockStatus = cgcpOrderBlocked;
sendSignal(clientRef, GSN_WAIT_GCP_CONF, signal,
WaitGCPConf::SignalLength, JBB);
@@ -14913,6 +15164,125 @@
useInTransactions = false;
copyCompleted = false;
allowNodeStart = true;
+}
+
+// DICT lock slave
+
+void
+Dbdih::sendDictLockReq(Signal* signal, Uint32 lockType, Callback c)
+{
+ DictLockReq* req = (DictLockReq*)&signal->theData[0];
+ DictLockSlavePtr lockPtr;
+
+ c_dictLockSlavePool.seize(lockPtr);
+ ndbrequire(lockPtr.i != RNIL);
+
+ req->userPtr = lockPtr.i;
+ req->lockType = lockType;
+ req->userRef = reference();
+
+ lockPtr.p->lockPtr = RNIL;
+ lockPtr.p->lockType = lockType;
+ lockPtr.p->locked = false;
+ lockPtr.p->callback = c;
+
+ // handle rolling upgrade
+ {
+ Uint32 masterVersion = getNodeInfo(cmasterNodeId).m_version;
+
+ const unsigned int get_major = getMajor(masterVersion);
+ const unsigned int get_minor = getMinor(masterVersion);
+ const unsigned int get_build = getBuild(masterVersion);
+ ndbrequire(get_major >= 4);
+
+ if (masterVersion < NDBD_DICT_LOCK_VERSION_5 ||
+ masterVersion < NDBD_DICT_LOCK_VERSION_5_1 &&
+ get_major == 5 && get_minor == 1 ||
+ ERROR_INSERTED(7176)) {
+ jam();
+
+ infoEvent("DIH: detect upgrade: master node %u old version %u.%u.%u",
+ (unsigned int)cmasterNodeId, get_major, get_minor, get_build);
+
+ DictLockConf* conf = (DictLockConf*)&signal->theData[0];
+ conf->userPtr = lockPtr.i;
+ conf->lockType = lockType;
+ conf->lockPtr = ZNIL;
+
+ sendSignal(reference(), GSN_DICT_LOCK_CONF, signal,
+ DictLockConf::SignalLength, JBB);
+ return;
+ }
+ }
+
+ BlockReference dictMasterRef = calcDictBlockRef(cmasterNodeId);
+ sendSignal(dictMasterRef, GSN_DICT_LOCK_REQ, signal,
+ DictLockReq::SignalLength, JBB);
+}
+
+void
+Dbdih::execDICT_LOCK_CONF(Signal* signal)
+{
+ jamEntry();
+ recvDictLockConf(signal);
+}
+
+void
+Dbdih::execDICT_LOCK_REF(Signal* signal)
+{
+ jamEntry();
+ ndbrequire(false);
+}
+
+void
+Dbdih::recvDictLockConf(Signal* signal)
+{
+ const DictLockConf* conf = (const DictLockConf*)&signal->theData[0];
+
+ DictLockSlavePtr lockPtr;
+ c_dictLockSlavePool.getPtr(lockPtr, conf->userPtr);
+
+ lockPtr.p->lockPtr = conf->lockPtr;
+ ndbrequire(lockPtr.p->lockType == conf->lockType);
+ ndbrequire(lockPtr.p->locked == false);
+ lockPtr.p->locked = true;
+
+ lockPtr.p->callback.m_callbackData = lockPtr.i;
+ execute(signal, lockPtr.p->callback, 0);
+}
+
+void
+Dbdih::sendDictUnlockOrd(Signal* signal, Uint32 lockSlavePtrI)
+{
+ DictUnlockOrd* ord = (DictUnlockOrd*)&signal->theData[0];
+
+ DictLockSlavePtr lockPtr;
+ c_dictLockSlavePool.getPtr(lockPtr, lockSlavePtrI);
+
+ ord->lockPtr = lockPtr.p->lockPtr;
+ ord->lockType = lockPtr.p->lockType;
+
+ c_dictLockSlavePool.release(lockPtr);
+
+ // handle rolling upgrade
+ {
+ Uint32 masterVersion = getNodeInfo(cmasterNodeId).m_version;
+
+ const unsigned int get_major = getMajor(masterVersion);
+ const unsigned int get_minor = getMinor(masterVersion);
+ ndbrequire(get_major >= 4);
+
+ if (masterVersion < NDBD_DICT_LOCK_VERSION_5 ||
+ masterVersion < NDBD_DICT_LOCK_VERSION_5_1 &&
+ get_major == 5 && get_minor == 1 ||
+ ERROR_INSERTED(7176)) {
+ return;
+ }
+ }
+
+ BlockReference dictMasterRef = calcDictBlockRef(cmasterNodeId);
+ sendSignal(dictMasterRef, GSN_DICT_UNLOCK_ORD, signal,
+ DictUnlockOrd::SignalLength, JBB);
}
void Dbdih::execALTER_NG_REQ(Signal *signal)
--- 1.8/storage/ndb/src/kernel/blocks/ndbcntr/Ndbcntr.hpp 2006-04-06 20:24:09 +10:00
+++ 1.9/storage/ndb/src/kernel/blocks/ndbcntr/Ndbcntr.hpp 2006-11-23 23:04:16 +11:00
@@ -264,8 +264,8 @@
void waitpoint61Lab(Signal* signal);
void waitpoint71Lab(Signal* signal);
- void updateNodeState(Signal* signal, const NodeState & newState) const ;
- void getNodeGroup(Signal* signal);
+ void updateNodeState(Signal* signal, const NodeState & newState);
+ Uint32 getNodeGroup(Signal* signal);
// Initialisation
void initData();
@@ -310,7 +310,7 @@
Uint16 cdynamicNodeId;
Uint32 c_fsRemoveCount;
- Uint32 c_nodeGroup;
+
void clearFilesystem(Signal* signal);
void execFSREMOVECONF(Signal* signal);
--- 1.39/storage/ndb/src/kernel/blocks/ndbcntr/NdbcntrMain.cpp 2006-08-14 22:00:27 +10:00
+++ 1.40/storage/ndb/src/kernel/blocks/ndbcntr/NdbcntrMain.cpp 2006-11-23 23:49:54 +11:00
@@ -297,8 +297,8 @@
break;
case 6:
jam();
- getNodeGroup(signal);
- // Fall through
+ //getNodeGroup(signal);
+ sendSttorry(signal);
break;
case ZSTART_PHASE_8:
jam();
@@ -315,7 +315,7 @@
}//switch
}//Ndbcntr::execSTTOR()
-void
+Uint32
Ndbcntr::getNodeGroup(Signal* signal){
jam();
CheckNodeGroups * sd = (CheckNodeGroups*)signal->getDataPtrSend();
@@ -323,8 +323,9 @@
EXECUTE_DIRECT(DBDIH, GSN_CHECKNODEGROUPSREQ, signal,
CheckNodeGroups::SignalLength);
jamEntry();
- c_nodeGroup = sd->output;
- sendSttorry(signal);
+ return sd->output;
+// c_nodeGroup = sd->output;
+// sendSttorry(signal);
}
/*******************************/
@@ -2082,9 +2083,10 @@
#endif
}//Ndbcntr::execSET_VAR_REQ()
-void Ndbcntr::updateNodeState(Signal* signal, const NodeState& newState) const{
+void Ndbcntr::updateNodeState(Signal* signal, const NodeState& newState) {
NodeStateRep * const stateRep = (NodeStateRep *)&signal->theData[0];
+ Uint32 nodeGroup= getNodeGroup(signal);
if (newState.startLevel == NodeState::SL_STARTED)
{
CRASH_INSERTION(1000);
@@ -2092,7 +2094,7 @@
stateRep->nodeState = newState;
stateRep->nodeState.masterNodeId = cmasterNodeId;
- stateRep->nodeState.setNodeGroup(c_nodeGroup);
+ stateRep->nodeState.setNodeGroup(nodeGroup);
for(Uint32 i = 0; i<ALL_BLOCKS_SZ; i++){
sendSignal(ALL_BLOCKS[i].Ref, GSN_NODE_STATE_REP, signal,
@@ -2583,8 +2585,9 @@
void Ndbcntr::execWAIT_GCP_CONF(Signal* signal){
jamEntry();
- WaitGCPConf* conf = (WaitGCPConf*)signal->getDataPtr();
+ Uint32 nodeGroup= getNodeGroup(signal);
+ WaitGCPConf* conf = (WaitGCPConf*)signal->getDataPtr();
switch(conf->senderData){
case StopRecord::SR_BLOCK_GCP_START_GCP:
{
@@ -2649,7 +2652,7 @@
NodeStateRep * rep = (NodeStateRep *)&signal->theData[0];
rep->nodeState = newState;
rep->nodeState.masterNodeId = cmasterNodeId;
- rep->nodeState.setNodeGroup(c_nodeGroup);
+ rep->nodeState.setNodeGroup(nodeGroup);
EXECUTE_DIRECT(QMGR, GSN_NODE_STATE_REP, signal,
NodeStateRep::SignalLength);
--- 1.31/storage/ndb/src/kernel/blocks/suma/Suma.cpp 2006-02-17 15:07:13 +11:00
+++ 1.32/storage/ndb/src/kernel/blocks/suma/Suma.cpp 2006-11-23 23:04:16 +11:00
@@ -42,6 +42,7 @@
#include <signaldata/GCPSave.hpp>
#include <signaldata/CreateTab.hpp>
#include <signaldata/DropTab.hpp>
+#include <signaldata/AlterTable.hpp>
#include <signaldata/AlterTab.hpp>
#include <signaldata/DihFragCount.hpp>
#include <signaldata/SystemError.hpp>
@@ -162,7 +163,7 @@
Uint32 senderData = req->senderData;
const ndb_mgm_configuration_iterator * p =
- theConfiguration.getOwnConfigIterator();
+ m_ctx.m_config.getOwnConfigIterator();
ndbrequire(p != 0);
// SumaParticipant
@@ -409,7 +410,22 @@
jam();
if (ref != NULL)
+ {
+ switch ((UtilSequenceRef::ErrorCode)ref->errorCode)
+ {
+ case UtilSequenceRef::NoSuchSequence:
+ ndbrequire(false);
+ case UtilSequenceRef::TCError:
+ {
+ char buf[128];
+ snprintf(buf, sizeof(buf),
+ "Startup failed during sequence creation. TC error %d",
+ ref->TCErrorCode);
+ progError(__LINE__, NDBD_EXIT_RESOURCE_ALLOC_ERROR, buf);
+ }
+ }
ndbrequire(false);
+ }
sendSTTORRY(signal);
}
@@ -1046,6 +1062,15 @@
const Uint32 reportSubscribe = (flags & SubCreateReq::ReportSubscribe) ?
Subscription::REPORT_SUBSCRIBE : 0;
const Uint32 tableId = req.tableId;
+ Subscription::State state = (Subscription::State) req.state;
+ if (signal->getLength() != SubCreateReq::SignalLength2)
+ {
+ /*
+ api or restarted by older version
+ if restarted by old version, do the best we can
+ */
+ state = Subscription::DEFINED;
+ }
Subscription key;
key.m_subscriptionId = subId;
@@ -1073,6 +1098,17 @@
addTableId(req.tableId, subPtr, 0);
}
} else {
+ if (c_startup.m_restart_server_node_id &&
+ refToNode(subRef) != c_startup.m_restart_server_node_id)
+ {
+ /**
+ * only allow "restart_server" Suma's to come through
+ * for restart purposes
+ */
+ jam();
+ sendSubStartRef(signal, 1405);
+ DBUG_VOID_RETURN;
+ }
// Check that id/key is unique
if(c_subscriptions.find(subPtr, key)) {
jam();
@@ -1096,8 +1132,9 @@
subPtr.p->m_options = reportSubscribe | reportAll;
subPtr.p->m_tableId = tableId;
subPtr.p->m_table_ptrI = RNIL;
- subPtr.p->m_state = Subscription::DEFINED;
+ subPtr.p->m_state = state;
subPtr.p->n_subscribers = 0;
+ subPtr.p->m_current_sync_ptrI = RNIL;
fprintf(stderr, "table %d options %x\n", subPtr.p->m_tableId,
subPtr.p->m_options);
DBUG_PRINT("info",("Added: key.m_subscriptionId: %u, key.m_subscriptionKey: %u",
@@ -1169,13 +1206,15 @@
DBUG_PRINT("info",("c_syncPool size: %d free: %d",
c_syncPool.getSize(),
c_syncPool.getNoOfFree()));
- new (syncPtr.p) Ptr<SyncRecord>;
+
syncPtr.p->m_senderRef = req->senderRef;
syncPtr.p->m_senderData = req->senderData;
syncPtr.p->m_subscriptionPtrI = subPtr.i;
syncPtr.p->ptrI = syncPtr.i;
syncPtr.p->m_error = 0;
+ subPtr.p->m_current_sync_ptrI = syncPtr.i;
+
{
jam();
syncPtr.p->m_tableList.append(&subPtr.p->m_tableId, 1);
@@ -1413,12 +1452,13 @@
tabPtr.p->m_error = 0;
tabPtr.p->m_schemaVersion = RNIL;
tabPtr.p->m_state = Table::DEFINING;
- tabPtr.p->m_hasTriggerDefined[0] = 0;
- tabPtr.p->m_hasTriggerDefined[1] = 0;
- tabPtr.p->m_hasTriggerDefined[2] = 0;
- tabPtr.p->m_triggerIds[0] = ILLEGAL_TRIGGER_ID;
- tabPtr.p->m_triggerIds[1] = ILLEGAL_TRIGGER_ID;
- tabPtr.p->m_triggerIds[2] = ILLEGAL_TRIGGER_ID;
+ tabPtr.p->m_drop_subbPtr.p = 0;
+ for (int j= 0; j < 3; j++)
+ {
+ tabPtr.p->m_hasTriggerDefined[j] = 0;
+ tabPtr.p->m_hasOutstandingTriggerReq[j] = 0;
+ tabPtr.p->m_triggerIds[j] = ILLEGAL_TRIGGER_ID;
+ }
c_tables.add(tabPtr);
@@ -1449,7 +1489,9 @@
jam();
DBUG_ENTER("Suma::completeOneSubscriber");
- if (tabPtr.p->m_error)
+ if (tabPtr.p->m_error &&
+ (c_startup.m_restart_server_node_id == 0 ||
+ tabPtr.p->m_state != Table::DROPPED))
{
sendSubStartRef(signal,subbPtr,tabPtr.p->m_error,
SubscriptionData::TableData);
@@ -1534,8 +1576,44 @@
void
Suma::execGET_TABINFOREF(Signal* signal){
jamEntry();
- /* ToDo handle this */
- ndbrequire(false);
+ GetTabInfoRef* ref = (GetTabInfoRef*)signal->getDataPtr();
+ Uint32 tableId = ref->tableId;
+ Uint32 senderData = ref->senderData;
+ GetTabInfoRef::ErrorCode errorCode =
+ (GetTabInfoRef::ErrorCode) ref->errorCode;
+ int do_resend_request = 0;
+ TablePtr tabPtr;
+ c_tablePool.getPtr(tabPtr, senderData);
+ switch (errorCode)
+ {
+ case GetTabInfoRef::TableNotDefined:
+ // wrong state
+ break;
+ case GetTabInfoRef::InvalidTableId:
+ // no such table
+ break;
+ case GetTabInfoRef::Busy:
+ do_resend_request = 1;
+ break;
+ case GetTabInfoRef::TableNameTooLong:
+ ndbrequire(false);
+ }
+ if (do_resend_request)
+ {
+ GetTabInfoReq * req = (GetTabInfoReq *)signal->getDataPtrSend();
+ req->senderRef = reference();
+ req->senderData = senderData;
+ req->requestType =
+ GetTabInfoReq::RequestById | GetTabInfoReq::LongSignalConf;
+ req->tableId = tableId;
+ sendSignalWithDelay(DBDICT_REF, GSN_GET_TABINFOREQ, signal,
+ 30, GetTabInfoReq::SignalLength);
+ return;
+ }
+ tabPtr.p->m_state = Table::DROPPED;
+ tabPtr.p->m_error = errorCode;
+ completeAllSubscribers(signal, tabPtr);
+ completeInitTable(signal, tabPtr);
}
void
@@ -2065,7 +2143,7 @@
ndbrequire(c_subscriptions.find(subPtr, key));
ScanFragNextReq * req = (ScanFragNextReq *)signal->getDataPtrSend();
- req->senderData = subPtr.i;
+ req->senderData = subPtr.p->m_current_sync_ptrI;
req->closeFlag = 0;
req->transId1 = 0;
req->transId2 = (SUMA << 20) + (getOwnNodeId() << 8);
@@ -2104,6 +2182,12 @@
#endif
release();
+
+ Ptr<Subscription> subPtr;
+ suma.c_subscriptions.getPtr(subPtr, m_subscriptionPtrI);
+ ndbrequire(subPtr.p->m_current_sync_ptrI == ptrI);
+ subPtr.p->m_current_sync_ptrI = RNIL;
+
suma.c_syncPool.release(ptrI);
DBUG_PRINT("info",("c_syncPool size: %d free: %d",
suma.c_syncPool.getSize(),
@@ -2150,7 +2234,7 @@
Subscription key;
key.m_subscriptionId = req->subscriptionId;
key.m_subscriptionKey = req->subscriptionKey;
-
+
if (c_startup.m_restart_server_node_id &&
refToNode(senderRef) != c_startup.m_restart_server_node_id)
{
@@ -2170,13 +2254,24 @@
DBUG_VOID_RETURN;
}
- if (subPtr.p->m_state != Subscription::DEFINED) {
+ if (subPtr.p->m_state == Subscription::LOCKED) {
jam();
DBUG_PRINT("info",("Locked"));
sendSubStartRef(signal, 1411);
DBUG_VOID_RETURN;
}
+ if (subPtr.p->m_state == Subscription::DROPPED &&
+ c_startup.m_restart_server_node_id == 0) {
+ jam();
+ DBUG_PRINT("info",("Dropped"));
+ sendSubStartRef(signal, 1418);
+ DBUG_VOID_RETURN;
+ }
+
+ ndbrequire(subPtr.p->m_state == Subscription::DEFINED ||
+ c_startup.m_restart_server_node_id);
+
SubscriberPtr subbPtr;
if(!c_subscriberPool.seize(subbPtr)){
jam();
@@ -2190,7 +2285,8 @@
c_subscriber_nodes.set(refToNode(subscriberRef));
// setup subscription record
- subPtr.p->m_state = Subscription::LOCKED;
+ if (subPtr.p->m_state == Subscription::DEFINED)
+ subPtr.p->m_state = Subscription::LOCKED;
// store these here for later use
subPtr.p->m_senderRef = senderRef;
subPtr.p->m_senderData = senderData;
@@ -2238,8 +2334,14 @@
SubscriptionPtr subPtr;
c_subscriptions.getPtr(subPtr, subbPtr.p->m_subPtrI);
- ndbrequire( subPtr.p->m_state == Subscription::LOCKED )
- subPtr.p->m_state = Subscription::DEFINED;
+ ndbrequire(subPtr.p->m_state == Subscription::LOCKED ||
+ (subPtr.p->m_state == Subscription::DROPPED &&
+ c_startup.m_restart_server_node_id));
+ if (subPtr.p->m_state == Subscription::LOCKED)
+ {
+ jam();
+ subPtr.p->m_state = Subscription::DEFINED;
+ }
subPtr.p->n_subscribers++;
DBUG_PRINT("info",("subscriber: %u[%u,%u] subscription: %u[%u,%u] "
@@ -2290,8 +2392,14 @@
SubscriptionPtr subPtr;
c_subscriptions.getPtr(subPtr, subbPtr.p->m_subPtrI);
- ndbrequire( subPtr.p->m_state == Subscription::LOCKED );
- subPtr.p->m_state = Subscription::DEFINED;
+ ndbrequire(subPtr.p->m_state == Subscription::LOCKED ||
+ (subPtr.p->m_state == Subscription::DROPPED &&
+ c_startup.m_restart_server_node_id));
+ if (subPtr.p->m_state == Subscription::LOCKED)
+ {
+ jam();
+ subPtr.p->m_state = Subscription::DEFINED;
+ }
SubStartRef * ref= (SubStartRef *)signal->getDataPtrSend();
ref->senderRef = reference();
@@ -2357,6 +2465,18 @@
DBUG_VOID_RETURN;
}
+ if (c_startup.m_restart_server_node_id &&
+ refToNode(senderRef) != c_startup.m_restart_server_node_id)
+ {
+ /**
+ * only allow "restart_server" Suma's to come through
+ * for restart purposes
+ */
+ jam();
+ sendSubStopRef(signal, 1405);
+ DBUG_VOID_RETURN;
+ }
+
if (subPtr.p->m_state == Subscription::LOCKED) {
jam();
DBUG_PRINT("error", ("locked"));
@@ -2368,7 +2488,8 @@
TablePtr tabPtr;
tabPtr.i = subPtr.p->m_table_ptrI;
- if (!(tabPtr.p = c_tables.getPtr(tabPtr.i)) ||
+ if (tabPtr.i == RNIL ||
+ !(tabPtr.p = c_tables.getPtr(tabPtr.i)) ||
tabPtr.p->m_tableId != subPtr.p->m_tableId)
{
jam();
@@ -2378,6 +2499,13 @@
DBUG_VOID_RETURN;
}
+ if (tabPtr.p->m_drop_subbPtr.p != 0) {
+ jam();
+ DBUG_PRINT("error", ("table locked"));
+ sendSubStopRef(signal, 1420);
+ DBUG_VOID_RETURN;
+ }
+
DBUG_PRINT("info",("subscription: %u tableId: %u[i=%u] id: %u key: %u",
subPtr.i, subPtr.p->m_tableId, tabPtr.i,
subPtr.p->m_subscriptionId,subPtr.p->m_subscriptionKey));
@@ -2430,7 +2558,7 @@
subPtr.p->m_senderRef = senderRef; // store ref to requestor
subPtr.p->m_senderData = senderData; // store ref to requestor
- tabPtr.p->m_drop_subbPtr= subbPtr;
+ tabPtr.p->m_drop_subbPtr = subbPtr;
if (subPtr.p->m_state == Subscription::DEFINED)
{
@@ -2447,6 +2575,7 @@
tabPtr.p->m_tableId, tabPtr.p->n_subscribers));
tabPtr.p->checkRelease(*this);
sendSubStopComplete(signal, tabPtr.p->m_drop_subbPtr);
+ tabPtr.p->m_drop_subbPtr.p = 0;
}
else
{
@@ -2520,12 +2649,16 @@
SubTableData * data = (SubTableData*)signal->getDataPtrSend();
data->gci = m_last_complete_gci + 1; // XXX ???
data->tableId = 0;
- data->operation = NdbDictionary::Event::_TE_STOP;
+ data->requestInfo = 0;
+ SubTableData::setOperation(data->requestInfo,
+ NdbDictionary::Event::_TE_STOP);
+ SubTableData::setNdbdNodeId(data->requestInfo,
+ getOwnNodeId());
data->senderData = subbPtr.p->m_senderData;
sendSignal(subbPtr.p->m_senderRef, GSN_SUB_TABLE_DATA, signal,
SubTableData::SignalLength, JBB);
}
-
+
SubStopConf * const conf = (SubStopConf*)signal->getDataPtrSend();
conf->senderRef= reference();
@@ -2552,6 +2685,26 @@
SubscriptionPtr subPtr,
SubscriberPtr subbPtr)
{
+ SubTableData * data = (SubTableData*)signal->getDataPtrSend();
+
+ if (table_event == NdbDictionary::Event::_TE_SUBSCRIBE &&
+ !c_startup.m_restart_server_node_id)
+ {
+ data->gci = m_last_complete_gci + 1;
+ data->tableId = subPtr.p->m_tableId;
+ data->requestInfo = 0;
+ SubTableData::setOperation(data->requestInfo,
+ NdbDictionary::Event::_TE_ACTIVE);
+ SubTableData::setNdbdNodeId(data->requestInfo, getOwnNodeId());
+ SubTableData::setReqNodeId(data->requestInfo,
+ refToNode(subbPtr.p->m_senderRef));
+ data->changeMask = 0;
+ data->totalLen = 0;
+ data->senderData = subbPtr.p->m_senderData;
+ sendSignal(subbPtr.p->m_senderRef, GSN_SUB_TABLE_DATA, signal,
+ SubTableData::SignalLength, JBB);
+ }
+
if (!(subPtr.p->m_options & Subscription::REPORT_SUBSCRIBE))
{
return;
@@ -2562,11 +2715,15 @@
return;
}
- SubTableData * data = (SubTableData*)signal->getDataPtrSend();
+//#ifdef VM_TRACE
+ ndbout_c("reportAllSubscribers subPtr.i: %d subPtr.p->n_subscribers: %d",
+ subPtr.i, subPtr.p->n_subscribers);
+//#endif
data->gci = m_last_complete_gci + 1;
data->tableId = subPtr.p->m_tableId;
- data->operation = table_event;
- data->ndbd_nodeid = refToNode(reference());
+ data->requestInfo = 0;
+ SubTableData::setOperation(data->requestInfo, table_event);
+ SubTableData::setNdbdNodeId(data->requestInfo, getOwnNodeId());
data->changeMask = 0;
data->totalLen = 0;
@@ -2578,16 +2735,35 @@
{
if (i_subbPtr.p->m_subPtrI == subPtr.i)
{
- data->req_nodeid = refToNode(subbPtr.p->m_senderRef);
+ SubTableData::setReqNodeId(data->requestInfo,
+ refToNode(subbPtr.p->m_senderRef));
data->senderData = i_subbPtr.p->m_senderData;
sendSignal(i_subbPtr.p->m_senderRef, GSN_SUB_TABLE_DATA, signal,
SubTableData::SignalLength, JBB);
+//#ifdef VM_TRACE
+ ndbout_c("sent %s(%d) to node %d, req_nodeid: %d senderData: %d",
+ table_event == NdbDictionary::Event::_TE_SUBSCRIBE ?
+ "SUBSCRIBE" : "UNSUBSCRIBE", (int) table_event,
+ refToNode(i_subbPtr.p->m_senderRef),
+ refToNode(subbPtr.p->m_senderRef), data->senderData
+ );
+//#endif
if (i_subbPtr.i != subbPtr.i)
{
- data->req_nodeid = refToNode(i_subbPtr.p->m_senderRef);
+ SubTableData::setReqNodeId(data->requestInfo,
+ refToNode(i_subbPtr.p->m_senderRef));
+
data->senderData = subbPtr.p->m_senderData;
sendSignal(subbPtr.p->m_senderRef, GSN_SUB_TABLE_DATA, signal,
SubTableData::SignalLength, JBB);
+//#ifdef VM_TRACE
+ ndbout_c("sent %s(%d) to node %d, req_nodeid: %d senderData: %d",
+ table_event == NdbDictionary::Event::_TE_SUBSCRIBE ?
+ "SUBSCRIBE" : "UNSUBSCRIBE", (int) table_event,
+ refToNode(subbPtr.p->m_senderRef),
+ refToNode(i_subbPtr.p->m_senderRef), data->senderData
+ );
+//#endif
}
}
}
@@ -2745,6 +2921,9 @@
jam();
DBUG_ENTER("Suma::dropTrigger");
+ m_hasOutstandingTriggerReq[0] =
+ m_hasOutstandingTriggerReq[1] =
+ m_hasOutstandingTriggerReq[2] = 1;
for(Uint32 j = 0; j<3; j++){
jam();
suma.suma_ndbrequire(m_triggerIds[j] != ILLEGAL_TRIGGER_ID);
@@ -2823,14 +3002,18 @@
suma.suma_ndbrequire(type < 3);
suma.suma_ndbrequire(m_triggerIds[type] == triggerId);
+ suma.suma_ndbrequire(m_hasTriggerDefined[type] > 0);
+ suma.suma_ndbrequire(m_hasOutstandingTriggerReq[type] == 1);
m_hasTriggerDefined[type]--;
+ m_hasOutstandingTriggerReq[type] = 0;
if (m_hasTriggerDefined[type] == 0)
{
jam();
m_triggerIds[type] = ILLEGAL_TRIGGER_ID;
}
- if( m_hasTriggerDefined[0] != m_hasTriggerDefined[1] ||
- m_hasTriggerDefined[0] != m_hasTriggerDefined[2])
+ if( m_hasOutstandingTriggerReq[0] ||
+ m_hasOutstandingTriggerReq[1] ||
+ m_hasOutstandingTriggerReq[2])
{
// more to come
jam();
@@ -2848,6 +3031,7 @@
checkRelease(suma);
suma.sendSubStopComplete(signal, m_drop_subbPtr);
+ m_drop_subbPtr.p = 0;
}
void Suma::suma_ndbrequire(bool v) { ndbrequire(v); }
@@ -2980,7 +3164,9 @@
Uint32 ref = subPtr.p->m_senderRef;
sdata->tableId = syncPtr.p->m_currentTableId;
sdata->senderData = subPtr.p->m_senderData;
- sdata->operation = NdbDictionary::Event::_TE_SCAN; // Scan
+ sdata->requestInfo = 0;
+ SubTableData::setOperation(sdata->requestInfo,
+ NdbDictionary::Event::_TE_SCAN); // Scan
sdata->gci = 0; // Undefined
#if PRINT_ONLY
ndbout_c("GSN_SUB_TABLE_DATA (scan) #attr: %d len: %d", attribs, sum);
@@ -3196,7 +3382,8 @@
SubTableData * data = (SubTableData*)signal->getDataPtrSend();//trg;
data->gci = gci;
data->tableId = tabPtr.p->m_tableId;
- data->operation = event;
+ data->requestInfo = 0;
+ SubTableData::setOperation(data->requestInfo, event);
data->logType = 0;
data->changeMask = 0;
data->totalLen = ptrLen;
@@ -3402,13 +3589,17 @@
DBUG_PRINT("info",("drop table id: %d[i=%u]", tableId, tabPtr.i));
tabPtr.p->m_state = Table::DROPPED;
- tabPtr.p->m_hasTriggerDefined[0] = 0;
- tabPtr.p->m_hasTriggerDefined[1] = 0;
- tabPtr.p->m_hasTriggerDefined[2] = 0;
- tabPtr.p->m_triggerIds[0] = ILLEGAL_TRIGGER_ID;
- tabPtr.p->m_triggerIds[1] = ILLEGAL_TRIGGER_ID;
- tabPtr.p->m_triggerIds[2] = ILLEGAL_TRIGGER_ID;
-
+ for (int j= 0; j < 3; j++)
+ {
+ if (!tabPtr.p->m_hasOutstandingTriggerReq[j])
+ {
+ tabPtr.p->m_hasTriggerDefined[j] = 0;
+ tabPtr.p->m_hasOutstandingTriggerReq[j] = 0;
+ tabPtr.p->m_triggerIds[j] = ILLEGAL_TRIGGER_ID;
+ }
+ else
+ tabPtr.p->m_hasTriggerDefined[j] = 1;
+ }
if (senderRef == 0)
{
DBUG_VOID_RETURN;
@@ -3418,8 +3609,9 @@
SubTableData * data = (SubTableData*)signal->getDataPtrSend();
data->gci = m_last_complete_gci+1;
data->tableId = tableId;
- data->operation = NdbDictionary::Event::_TE_DROP;
- data->req_nodeid = refToNode(senderRef);
+ data->requestInfo = 0;
+ SubTableData::setOperation(data->requestInfo,NdbDictionary::Event::_TE_DROP);
+ SubTableData::setReqNodeId(data->requestInfo, refToNode(senderRef));
{
LocalDLList<Subscriber> subbs(c_subscriberPool,tabPtr.p->c_subscribers);
@@ -3447,7 +3639,7 @@
DBUG_VOID_RETURN;
}
-static Uint32 b_dti_buf[10000];
+static Uint32 b_dti_buf[MAX_WORDS_META_FILE];
void
Suma::execALTER_TAB_REQ(Signal *signal)
@@ -3469,7 +3661,7 @@
}
DBUG_PRINT("info",("alter table id: %d[i=%u]", tableId, tabPtr.i));
-
+ Table::State old_state = tabPtr.p->m_state;
tabPtr.p->m_state = Table::ALTERED;
// triggers must be removed, waiting for sub stop req for that
@@ -3497,8 +3689,10 @@
SubTableData * data = (SubTableData*)signal->getDataPtrSend();
data->gci = m_last_complete_gci+1;
data->tableId = tableId;
- data->operation = NdbDictionary::Event::_TE_ALTER;
- data->req_nodeid = refToNode(senderRef);
+ data->requestInfo = 0;
+ SubTableData::setOperation(data->requestInfo,
+ NdbDictionary::Event::_TE_ALTER);
+ SubTableData::setReqNodeId(data->requestInfo, refToNode(senderRef));
data->logType = 0;
data->changeMask = changeMask;
data->totalLen = tabInfoPtr.sz;
@@ -3527,6 +3721,11 @@
DBUG_PRINT("info",("sent to subscriber %d", subbPtr.i));
}
}
+ if (AlterTableReq::getFrmFlag(changeMask))
+ {
+ // Frm changes only are handled on-line
+ tabPtr.p->m_state = old_state;
+ }
DBUG_VOID_RETURN;
}
@@ -3640,7 +3839,17 @@
sendSubRemoveRef(signal, req, 1413);
DBUG_VOID_RETURN;
}
-
+ if (subPtr.p->m_state == Subscription::DROPPED)
+ {
+ /**
+ * already dropped
+ */
+ jam();
+ sendSubRemoveRef(signal, req, 1419);
+ DBUG_VOID_RETURN;
+ }
+
+ ndbrequire(subPtr.p->m_state == Subscription::DEFINED);
DBUG_PRINT("info",("n_subscribers: %u", subPtr.p->n_subscribers));
if (subPtr.p->n_subscribers == 0)
@@ -3953,8 +4162,9 @@
case SubCreateReq::TableEvent:
jam();
req->tableId = subPtr.p->m_tableId;
+ req->state = subPtr.p->m_state;
suma.sendSignal(sumaRef, GSN_SUB_CREATE_REQ, signal,
- SubCreateReq::SignalLength, JBB);
+ SubCreateReq::SignalLength2, JBB);
DBUG_VOID_RETURN;
case SubCreateReq::SingleTableScan:
jam();
@@ -4712,7 +4922,8 @@
SubTableData * data = (SubTableData*)signal->getDataPtrSend();//trg;
data->gci = last_gci;
data->tableId = tabPtr.p->m_tableId;
- data->operation = event;
+ data->requestInfo = 0;
+ SubTableData::setOperation(data->requestInfo, event);
data->logType = 0;
data->changeMask = 0;
data->totalLen = ptrLen;
--- 1.61/storage/ndb/src/mgmapi/mgmapi.cpp 2006-02-21 22:51:14 +11:00
+++ 1.62/storage/ndb/src/mgmapi/mgmapi.cpp 2006-11-23 23:04:17 +11:00
@@ -27,6 +27,7 @@
#include <mgmapi_debug.h>
#include "mgmapi_configuration.hpp"
#include <socket_io.h>
+#include <version.h>
#include <NdbOut.hpp>
#include <SocketServer.hpp>
@@ -103,6 +104,9 @@
#endif
FILE *errstream;
char *m_name;
+ int mgmd_version_major;
+ int mgmd_version_minor;
+ int mgmd_version_build;
};
#define SET_ERROR(h, e, s) setError(h, e, __LINE__, s)
@@ -138,6 +142,12 @@
return ret; \
}
+#define DBUG_CHECK_REPLY(reply, ret) \
+ if (reply == NULL) { \
+ SET_ERROR(handle, NDB_MGM_ILLEGAL_SERVER_REPLY, ""); \
+ DBUG_RETURN(ret); \
+ }
+
/*****************************************************************************
* Handles
*****************************************************************************/
@@ -168,6 +178,10 @@
h->logfile = 0;
#endif
+ h->mgmd_version_major= -1;
+ h->mgmd_version_minor= -1;
+ h->mgmd_version_build= -1;
+
DBUG_PRINT("info", ("handle=0x%x", (UintPtr)h));
DBUG_RETURN(h);
}
@@ -361,8 +375,9 @@
* Print some info about why the parser returns NULL
*/
fprintf(handle->errstream,
- "Error in mgm protocol parser. cmd: >%s< status: %d curr: %d\n",
- cmd, (Uint32)ctx.m_status, ctx.m_currentToken);
+ "Error in mgm protocol parser. cmd: >%s< status: %d curr: %s\n",
+ cmd, (Uint32)ctx.m_status,
+ (ctx.m_currentToken)?ctx.m_currentToken:"NULL");
DBUG_PRINT("info",("ctx.status: %d, ctx.m_currentToken: %s",
ctx.m_status, ctx.m_currentToken));
}
@@ -678,7 +693,11 @@
out.println("");
char buf[1024];
- in.gets(buf, sizeof(buf));
+ if(!in.gets(buf, sizeof(buf)))
+ {
+ SET_ERROR(handle, NDB_MGM_ILLEGAL_SERVER_REPLY, "Probably disconnected");
+ return NULL;
+ }
if(buf[strlen(buf)-1] == '\n')
buf[strlen(buf)-1] = '\0';
@@ -687,7 +706,11 @@
return NULL;
}
- in.gets(buf, sizeof(buf));
+ if(!in.gets(buf, sizeof(buf)))
+ {
+ SET_ERROR(handle, NDB_MGM_ILLEGAL_SERVER_REPLY, "Probably disconnected");
+ return NULL;
+ }
if(buf[strlen(buf)-1] == '\n')
buf[strlen(buf)-1] = '\0';
@@ -695,10 +718,12 @@
Vector<BaseString> split;
tmp.split(split, ":");
if(split.size() != 2){
+ SET_ERROR(handle, NDB_MGM_ILLEGAL_NODE_STATUS, buf);
return NULL;
}
if(!(split[0].trim() == "nodes")){
+ SET_ERROR(handle, NDB_MGM_ILLEGAL_NODE_STATUS, buf);
return NULL;
}
@@ -708,6 +733,13 @@
malloc(sizeof(ndb_mgm_cluster_state)+
noOfNodes*(sizeof(ndb_mgm_node_state)+sizeof("000.000.000.000#")));
+ if(!state)
+ {
+ SET_ERROR(handle, NDB_MGM_OUT_OF_MEMORY,
+ "Allocating ndb_mgm_cluster_state");
+ return NULL;
+ }
+
state->no_of_nodes= noOfNodes;
ndb_mgm_node_state * ptr = &state->node_states[0];
int nodeId = 0;
@@ -717,7 +749,13 @@
}
i = -1; ptr--;
for(; i<noOfNodes; ){
- in.gets(buf, sizeof(buf));
+ if(!in.gets(buf, sizeof(buf)))
+ {
+ free(state);
+ SET_ERROR(handle, NDB_MGM_ILLEGAL_SERVER_REPLY,
+ "Probably disconnected");
+ return NULL;
+ }
tmp.assign(buf);
if(tmp.trim() == ""){
@@ -746,6 +784,7 @@
if(i+1 != noOfNodes){
free(state);
+ SET_ERROR(handle, NDB_MGM_ILLEGAL_NODE_STATUS, "Node count mismatch");
return NULL;
}
@@ -826,37 +865,81 @@
return ndb_mgm_stop2(handle, no_of_nodes, node_list, 0);
}
-
extern "C"
-int
+int
ndb_mgm_stop2(NdbMgmHandle handle, int no_of_nodes, const int * node_list,
int abort)
{
- SET_ERROR(handle, NDB_MGM_NO_ERROR, "Executing: ndb_mgm_stop2");
- const ParserRow<ParserDummy> stop_reply[] = {
+ int disconnect;
+ return ndb_mgm_stop3(handle, no_of_nodes, node_list, abort, &disconnect);
+}
+
+
+extern "C"
+int
+ndb_mgm_stop3(NdbMgmHandle handle, int no_of_nodes, const int * node_list,
+ int abort, int *disconnect)
+{
+ SET_ERROR(handle, NDB_MGM_NO_ERROR, "Executing: ndb_mgm_stop3");
+ const ParserRow<ParserDummy> stop_reply_v1[] = {
MGM_CMD("stop reply", NULL, ""),
MGM_ARG("stopped", Int, Optional, "No of stopped nodes"),
MGM_ARG("result", String, Mandatory, "Error message"),
MGM_END()
};
+ const ParserRow<ParserDummy> stop_reply_v2[] = {
+ MGM_CMD("stop reply", NULL, ""),
+ MGM_ARG("stopped", Int, Optional, "No of stopped nodes"),
+ MGM_ARG("result", String, Mandatory, "Error message"),
+ MGM_ARG("disconnect", Int, Mandatory, "Need to disconnect"),
+ MGM_END()
+ };
+
CHECK_HANDLE(handle, -1);
CHECK_CONNECTED(handle, -1);
- if(no_of_nodes < 0){
+ if(handle->mgmd_version_build==-1)
+ {
+ char verstr[50];
+ if(!ndb_mgm_get_version(handle,
+ &(handle->mgmd_version_major),
+ &(handle->mgmd_version_minor),
+ &(handle->mgmd_version_build),
+ sizeof(verstr),
+ verstr))
+ {
+ return -1;
+ }
+ }
+ int use_v2= ((handle->mgmd_version_major==5)
+ && (
+ (handle->mgmd_version_minor==0 &&
handle->mgmd_version_build>=21)
+ ||(handle->mgmd_version_minor==1 &&
handle->mgmd_version_build>=12)
+ ||(handle->mgmd_version_minor>1)
+ )
+ )
+ || (handle->mgmd_version_major>5);
+
+ if(no_of_nodes < -1){
SET_ERROR(handle, NDB_MGM_ILLEGAL_NUMBER_OF_NODES,
"Negative number of nodes requested to stop");
return -1;
}
Uint32 stoppedNoOfNodes = 0;
- if(no_of_nodes == 0){
+ if(no_of_nodes <= 0){
/**
- * All database nodes should be stopped
+ * All nodes should be stopped (all or just db)
*/
Properties args;
args.put("abort", abort);
+ if(use_v2)
+ args.put("stop", (no_of_nodes==-1)?"mgm,db":"db");
const Properties *reply;
- reply = ndb_mgm_call(handle, stop_reply, "stop all", &args);
+ if(use_v2)
+ reply = ndb_mgm_call(handle, stop_reply_v2, "stop all", &args);
+ else
+ reply = ndb_mgm_call(handle, stop_reply_v1, "stop all", &args);
CHECK_REPLY(reply, -1);
if(!reply->get("stopped", &stoppedNoOfNodes)){
@@ -865,6 +948,10 @@
delete reply;
return -1;
}
+ if(use_v2)
+ reply->get("disconnect", (Uint32*)disconnect);
+ else
+ *disconnect= 0;
BaseString result;
reply->get("result", result);
if(strcmp(result.c_str(), "Ok") != 0) {
@@ -890,7 +977,11 @@
args.put("abort", abort);
const Properties *reply;
- reply = ndb_mgm_call(handle, stop_reply, "stop", &args);
+ if(use_v2)
+ reply = ndb_mgm_call(handle, stop_reply_v2, "stop v2", &args);
+ else
+ reply = ndb_mgm_call(handle, stop_reply_v1, "stop", &args);
+
CHECK_REPLY(reply, stoppedNoOfNodes);
if(!reply->get("stopped", &stoppedNoOfNodes)){
SET_ERROR(handle, NDB_MGM_STOP_FAILED,
@@ -898,6 +989,10 @@
delete reply;
return -1;
}
+ if(use_v2)
+ reply->get("disconnect", (Uint32*)disconnect);
+ else
+ *disconnect= 0;
BaseString result;
reply->get("result", result);
if(strcmp(result.c_str(), "Ok") != 0) {
@@ -911,20 +1006,69 @@
extern "C"
int
+ndb_mgm_restart(NdbMgmHandle handle, int no_of_nodes, const int *node_list)
+{
+ SET_ERROR(handle, NDB_MGM_NO_ERROR, "Executing: ndb_mgm_restart");
+ return ndb_mgm_restart2(handle, no_of_nodes, node_list, 0, 0, 0);
+}
+
+extern "C"
+int
ndb_mgm_restart2(NdbMgmHandle handle, int no_of_nodes, const int * node_list,
int initial, int nostart, int abort)
{
- SET_ERROR(handle, NDB_MGM_NO_ERROR, "Executing: ndb_mgm_restart2");
+ int disconnect;
+
+ return ndb_mgm_restart3(handle, no_of_nodes, node_list, initial, nostart,
+ abort, &disconnect);
+}
+
+extern "C"
+int
+ndb_mgm_restart3(NdbMgmHandle handle, int no_of_nodes, const int * node_list,
+ int initial, int nostart, int abort, int *disconnect)
+{
+ SET_ERROR(handle, NDB_MGM_NO_ERROR, "Executing: ndb_mgm_restart3");
Uint32 restarted = 0;
- const ParserRow<ParserDummy> restart_reply[] = {
+ const ParserRow<ParserDummy> restart_reply_v1[] = {
MGM_CMD("restart reply", NULL, ""),
MGM_ARG("result", String, Mandatory, "Error message"),
MGM_ARG("restarted", Int, Optional, "No of restarted nodes"),
MGM_END()
};
+ const ParserRow<ParserDummy> restart_reply_v2[] = {
+ MGM_CMD("restart reply", NULL, ""),
+ MGM_ARG("result", String, Mandatory, "Error message"),
+ MGM_ARG("restarted", Int, Optional, "No of restarted nodes"),
+ MGM_ARG("disconnect", Int, Optional, "Disconnect to apply"),
+ MGM_END()
+ };
+
CHECK_HANDLE(handle, -1);
CHECK_CONNECTED(handle, -1);
-
+
+ if(handle->mgmd_version_build==-1)
+ {
+ char verstr[50];
+ if(!ndb_mgm_get_version(handle,
+ &(handle->mgmd_version_major),
+ &(handle->mgmd_version_minor),
+ &(handle->mgmd_version_build),
+ sizeof(verstr),
+ verstr))
+ {
+ return -1;
+ }
+ }
+ int use_v2= ((handle->mgmd_version_major==5)
+ && (
+ (handle->mgmd_version_minor==0 &&
handle->mgmd_version_build>=21)
+ ||(handle->mgmd_version_minor==1 &&
handle->mgmd_version_build>=12)
+ ||(handle->mgmd_version_minor>1)
+ )
+ )
+ || (handle->mgmd_version_major>5);
+
if(no_of_nodes < 0){
SET_ERROR(handle, NDB_MGM_RESTART_FAILED,
"Restart requested of negative number of nodes");
@@ -939,7 +1083,7 @@
const Properties *reply;
const int timeout = handle->read_timeout;
handle->read_timeout= 5*60*1000; // 5 minutes
- reply = ndb_mgm_call(handle, restart_reply, "restart all", &args);
+ reply = ndb_mgm_call(handle, restart_reply_v1, "restart all", &args);
handle->read_timeout= timeout;
CHECK_REPLY(reply, -1);
@@ -975,7 +1119,10 @@
const Properties *reply;
const int timeout = handle->read_timeout;
handle->read_timeout= 5*60*1000; // 5 minutes
- reply = ndb_mgm_call(handle, restart_reply, "restart node", &args);
+ if(use_v2)
+ reply = ndb_mgm_call(handle, restart_reply_v2, "restart node v2", &args);
+ else
+ reply = ndb_mgm_call(handle, restart_reply_v1, "restart node", &args);
handle->read_timeout= timeout;
if(reply != NULL) {
BaseString result;
@@ -986,20 +1133,16 @@
return -1;
}
reply->get("restarted", &restarted);
+ if(use_v2)
+ reply->get("disconnect", (Uint32*)disconnect);
+ else
+ *disconnect= 0;
delete reply;
}
return restarted;
}
-extern "C"
-int
-ndb_mgm_restart(NdbMgmHandle handle, int no_of_nodes, const int *node_list)
-{
- SET_ERROR(handle, NDB_MGM_NO_ERROR, "Executing: ndb_mgm_restart");
- return ndb_mgm_restart2(handle, no_of_nodes, node_list, 0, 0, 0);
-}
-
static const char *clusterlog_severity_names[]=
{ "enabled", "debug", "info", "warning", "error", "critical", "alert" };
@@ -1051,7 +1194,7 @@
ndb_mgm_get_clusterlog_severity_filter(NdbMgmHandle handle)
{
SET_ERROR(handle, NDB_MGM_NO_ERROR, "Executing:
ndb_mgm_get_clusterlog_severity_filter");
- static unsigned int enabled[(int)NDB_MGM_EVENT_SEVERITY_ALL]=
+ unsigned int enabled[(int)NDB_MGM_EVENT_SEVERITY_ALL]=
{0,0,0,0,0,0,0};
const ParserRow<ParserDummy> getinfo_reply[] = {
MGM_CMD("clusterlog", NULL, ""),
@@ -1252,7 +1395,7 @@
MGM_END()
};
CHECK_HANDLE(handle, -1);
-
+
const char *hostname= ndb_mgm_get_connected_host(handle);
int port= ndb_mgm_get_connected_port(handle);
SocketClient s(hostname, port);
@@ -1274,19 +1417,20 @@
}
args.put("filter", tmp.c_str());
}
-
+
int tmp = handle->socket;
handle->socket = sockfd;
-
+
const Properties *reply;
reply = ndb_mgm_call(handle, stat_reply, "listen event", &args);
-
+
handle->socket = tmp;
-
+
if(reply == NULL) {
close(sockfd);
CHECK_REPLY(reply, -1);
}
+ delete reply;
return sockfd;
}
@@ -1299,34 +1443,7 @@
extern "C"
int
-ndb_mgm_get_stat_port(NdbMgmHandle handle, struct ndb_mgm_reply* /*reply*/)
-{
- SET_ERROR(handle, NDB_MGM_NO_ERROR, "Executing: ndb_mgm_get_stat_port");
- const ParserRow<ParserDummy> stat_reply[] = {
- MGM_CMD("error", NULL, ""),
- MGM_ARG("result", String, Mandatory, "Error message"),
- MGM_CMD("get statport reply", NULL, ""),
- MGM_ARG("tcpport", Int, Mandatory, "TCP port for statistics"),
- MGM_END()
- };
- CHECK_HANDLE(handle, -1);
- CHECK_CONNECTED(handle, -1);
-
- Properties args;
- const Properties *reply;
- reply = ndb_mgm_call(handle, stat_reply, "get statport", &args);
- CHECK_REPLY(reply, -1);
-
- Uint32 port;
- reply->get("tcpport", &port);
-
- delete reply;
- return port;
-}
-
-extern "C"
-int
-ndb_mgm_dump_state(NdbMgmHandle handle, int nodeId, int* _args,
+ndb_mgm_dump_state(NdbMgmHandle handle, int nodeId, const int * _args,
int _num_args, struct ndb_mgm_reply* /* reply */)
{
SET_ERROR(handle, NDB_MGM_NO_ERROR, "Executing: ndb_mgm_dump_state");
@@ -1808,7 +1925,7 @@
}
delete prop;
- return (ndb_mgm_configuration*)cvf.m_cfg;
+ return (ndb_mgm_configuration*)cvf.getConfigValues();
} while(0);
delete prop;
@@ -1868,7 +1985,8 @@
extern "C"
int
-ndb_mgm_alloc_nodeid(NdbMgmHandle handle, unsigned int version, int nodetype)
+ndb_mgm_alloc_nodeid(NdbMgmHandle handle, unsigned int version, int nodetype,
+ int log_event)
{
CHECK_HANDLE(handle, 0);
CHECK_CONNECTED(handle, 0);
@@ -1888,9 +2006,11 @@
args.put("endian", (endian_check.c[sizeof(long)-1])?"big":"little");
if (handle->m_name)
args.put("name", handle->m_name);
+ args.put("log_event", log_event);
const ParserRow<ParserDummy> reply[]= {
MGM_CMD("get nodeid reply", NULL, ""),
+ MGM_ARG("error_code", Int, Optional, "Error code"),
MGM_ARG("nodeid", Int, Optional, "Error message"),
MGM_ARG("result", String, Mandatory, "Error message"),
MGM_END()
@@ -1903,14 +2023,16 @@
nodeid= -1;
do {
const char * buf;
- if(!prop->get("result", &buf) || strcmp(buf, "Ok") != 0){
+ if (!prop->get("result", &buf) || strcmp(buf, "Ok") != 0)
+ {
const char *hostname= ndb_mgm_get_connected_host(handle);
unsigned port= ndb_mgm_get_connected_port(handle);
BaseString err;
+ Uint32 error_code= NDB_MGM_ALLOCID_ERROR;
err.assfmt("Could not alloc node id at %s port %d: %s",
hostname, port, buf);
- setError(handle, NDB_MGM_COULD_NOT_CONNECT_TO_SOCKET, __LINE__,
- err.c_str());
+ prop->get("error_code", &error_code);
+ setError(handle, error_code, __LINE__, err.c_str());
break;
}
Uint32 _nodeid;
@@ -2171,9 +2293,9 @@
int param,
int value,
struct ndb_mgm_reply* mgmreply){
- DBUG_ENTER("ndb_mgm_set_connection_int_parameter");
CHECK_HANDLE(handle, 0);
CHECK_CONNECTED(handle, 0);
+ DBUG_ENTER("ndb_mgm_set_connection_int_parameter");
Properties args;
args.put("node1", node1);
@@ -2190,7 +2312,7 @@
const Properties *prop;
prop= ndb_mgm_call(handle, reply, "set connection parameter", &args);
- CHECK_REPLY(prop, -1);
+ DBUG_CHECK_REPLY(prop, -1);
int res= -1;
do {
@@ -2214,9 +2336,9 @@
int param,
int *value,
struct ndb_mgm_reply* mgmreply){
- DBUG_ENTER("ndb_mgm_get_connection_int_parameter");
CHECK_HANDLE(handle, -1);
CHECK_CONNECTED(handle, -2);
+ DBUG_ENTER("ndb_mgm_get_connection_int_parameter");
Properties args;
args.put("node1", node1);
@@ -2232,7 +2354,7 @@
const Properties *prop;
prop = ndb_mgm_call(handle, reply, "get connection parameter", &args);
- CHECK_REPLY(prop, -3);
+ DBUG_CHECK_REPLY(prop, -3);
int res= -1;
do {
@@ -2280,9 +2402,9 @@
{
Uint32 nodeid=0;
- DBUG_ENTER("ndb_mgm_get_mgmd_nodeid");
CHECK_HANDLE(handle, 0);
CHECK_CONNECTED(handle, 0);
+ DBUG_ENTER("ndb_mgm_get_mgmd_nodeid");
Properties args;
@@ -2294,7 +2416,7 @@
const Properties *prop;
prop = ndb_mgm_call(handle, reply, "get mgmd nodeid", &args);
- CHECK_REPLY(prop, 0);
+ DBUG_CHECK_REPLY(prop, 0);
if(!prop->get("nodeid",&nodeid)){
fprintf(handle->errstream, "Unable to get value\n");
@@ -2308,9 +2430,9 @@
extern "C"
int ndb_mgm_report_event(NdbMgmHandle handle, Uint32 *data, Uint32 length)
{
- DBUG_ENTER("ndb_mgm_report_event");
CHECK_HANDLE(handle, 0);
CHECK_CONNECTED(handle, 0);
+ DBUG_ENTER("ndb_mgm_report_event");
Properties args;
args.put("length", length);
@@ -2329,7 +2451,7 @@
const Properties *prop;
prop = ndb_mgm_call(handle, reply, "report event", &args);
- CHECK_REPLY(prop, -1);
+ DBUG_CHECK_REPLY(prop, -1);
DBUG_RETURN(0);
}
@@ -2337,9 +2459,9 @@
extern "C"
int ndb_mgm_end_session(NdbMgmHandle handle)
{
- DBUG_ENTER("ndb_mgm_end_session");
CHECK_HANDLE(handle, 0);
CHECK_CONNECTED(handle, 0);
+ DBUG_ENTER("ndb_mgm_end_session");
SocketOutputStream s_output(handle->socket);
s_output.println("end session");
@@ -2351,6 +2473,58 @@
in.gets(buf, sizeof(buf));
DBUG_RETURN(0);
+}
+
+extern "C"
+int ndb_mgm_get_version(NdbMgmHandle handle,
+ int *major, int *minor, int *build, int len, char* str)
+{
+ DBUG_ENTER("ndb_mgm_get_version");
+ CHECK_HANDLE(handle, 0);
+ CHECK_CONNECTED(handle, 0);
+
+ Properties args;
+
+ const ParserRow<ParserDummy> reply[]= {
+ MGM_CMD("version", NULL, ""),
+ MGM_ARG("id", Int, Mandatory, "ID"),
+ MGM_ARG("major", Int, Mandatory, "Major"),
+ MGM_ARG("minor", Int, Mandatory, "Minor"),
+ MGM_ARG("string", String, Mandatory, "String"),
+ MGM_END()
+ };
+
+ const Properties *prop;
+ prop = ndb_mgm_call(handle, reply, "get version", &args);
+ CHECK_REPLY(prop, 0);
+
+ Uint32 id;
+ if(!prop->get("id",&id)){
+ fprintf(handle->errstream, "Unable to get value\n");
+ return 0;
+ }
+ *build= getBuild(id);
+
+ if(!prop->get("major",(Uint32*)major)){
+ fprintf(handle->errstream, "Unable to get value\n");
+ return 0;
+ }
+
+ if(!prop->get("minor",(Uint32*)minor)){
+ fprintf(handle->errstream, "Unable to get value\n");
+ return 0;
+ }
+
+ BaseString result;
+ if(!prop->get("string", result)){
+ fprintf(handle->errstream, "Unable to get value\n");
+ return 0;
+ }
+
+ strncpy(str, result.c_str(), len);
+
+ delete prop;
+ DBUG_RETURN(1);
}
int ndb_mgm_add_nodegroup(NdbMgmHandle handle,
--- 1.60/storage/ndb/src/mgmclient/CommandInterpreter.cpp 2006-02-21 22:51:15 +11:00
+++ 1.61/storage/ndb/src/mgmclient/CommandInterpreter.cpp 2006-11-23 23:04:17 +11:00
@@ -18,6 +18,7 @@
#include <my_sys.h>
#include <Vector.hpp>
#include <mgmapi.h>
+#include <util/BaseString.hpp>
class MgmtSrvr;
@@ -63,6 +64,9 @@
*/
void analyseAfterFirstToken(int processId, char* allAfterFirstTokenCstr);
+ void executeCommand(Vector<BaseString> &command_list,
+ unsigned command_pos,
+ int *node_ids, int no_of_nodes);
/**
* Parse the block specification part of the LOG* commands,
* things after LOG*: [BLOCK = {ALL|<blockName>+}]
@@ -97,10 +101,14 @@
public:
void executeStop(int processId, const char* parameters, bool all);
+ void executeStop(Vector<BaseString> &command_list, unsigned command_pos,
+ int *node_ids, int no_of_nodes);
void executeEnterSingleUser(char* parameters);
void executeExitSingleUser(char* parameters);
void executeStart(int processId, const char* parameters, bool all);
void executeRestart(int processId, const char* parameters, bool all);
+ void executeRestart(Vector<BaseString> &command_list, unsigned command_pos,
+ int *node_ids, int no_of_nodes);
void executeLogLevel(int processId, const char* parameters, bool all);
void executeError(int processId, const char* parameters, bool all);
void executeLog(int processId, const char* parameters, bool all);
@@ -150,13 +158,21 @@
NdbMgmHandle m_mgmsrv;
NdbMgmHandle m_mgmsrv2;
+ const char *m_constr;
bool m_connected;
int m_verbose;
int try_reconnect;
int m_error;
struct NdbThread* m_event_thread;
+ NdbMutex *m_print_mutex;
};
+struct event_thread_param {
+ NdbMgmHandle *m;
+ NdbMutex **p;
+};
+
+NdbMutex* print_mutex;
/*
* Facade object for CommandInterpreter
@@ -334,25 +350,11 @@
CommandInterpreter::CommandInterpreter(const char *_host,int verbose)
: m_verbose(verbose)
{
- m_mgmsrv = ndb_mgm_create_handle();
- if(m_mgmsrv == NULL) {
- ndbout_c("Cannot create handle to management server.");
- exit(-1);
- }
- m_mgmsrv2 = ndb_mgm_create_handle();
- if(m_mgmsrv2 == NULL) {
- ndbout_c("Cannot create 2:nd handle to management server.");
- exit(-1);
- }
- if (ndb_mgm_set_connectstring(m_mgmsrv, _host))
- {
- printError();
- exit(-1);
- }
-
+ m_constr= _host;
m_connected= false;
m_event_thread= 0;
try_reconnect = 0;
+ m_print_mutex= NdbMutex_Create();
}
/*
@@ -361,8 +363,7 @@
CommandInterpreter::~CommandInterpreter()
{
disconnect();
- ndb_mgm_destroy_handle(&m_mgmsrv);
- ndb_mgm_destroy_handle(&m_mgmsrv2);
+ NdbMutex_Destroy(m_print_mutex);
}
static bool
@@ -384,15 +385,14 @@
void
CommandInterpreter::printError()
{
- if (ndb_mgm_check_connection(m_mgmsrv))
- {
- m_connected= false;
- disconnect();
- }
ndbout_c("* %5d: %s",
ndb_mgm_get_latest_error(m_mgmsrv),
ndb_mgm_get_latest_error_msg(m_mgmsrv));
ndbout_c("* %s", ndb_mgm_get_latest_error_desc(m_mgmsrv));
+ if (ndb_mgm_check_connection(m_mgmsrv))
+ {
+ disconnect();
+ }
}
//*****************************************************************************
@@ -400,11 +400,13 @@
static int do_event_thread;
static void*
-event_thread_run(void* m)
+event_thread_run(void* p)
{
DBUG_ENTER("event_thread_run");
- NdbMgmHandle handle= *(NdbMgmHandle*)m;
+ struct event_thread_param param= *(struct event_thread_param*)p;
+ NdbMgmHandle handle= *(param.m);
+ NdbMutex* printmutex= *(param.p);
int filter[] = { 15, NDB_MGM_EVENT_CATEGORY_BACKUP,
1, NDB_MGM_EVENT_CATEGORY_STARTUP,
@@ -422,7 +424,11 @@
{
const char ping_token[]= "<PING>";
if (memcmp(ping_token,tmp,sizeof(ping_token)-1))
- ndbout << tmp;
+ if(tmp && strlen(tmp))
+ {
+ Guard g(printmutex);
+ ndbout << tmp;
+ }
}
} while(do_event_thread);
NDB_CLOSE_SOCKET(fd);
@@ -436,78 +442,100 @@
}
bool
-CommandInterpreter::connect()
+CommandInterpreter::connect()
{
DBUG_ENTER("CommandInterpreter::connect");
- if(!m_connected)
+
+ if(m_connected)
+ DBUG_RETURN(m_connected);
+
+ m_mgmsrv = ndb_mgm_create_handle();
+ if(m_mgmsrv == NULL) {
+ ndbout_c("Cannot create handle to management server.");
+ exit(-1);
+ }
+ m_mgmsrv2 = ndb_mgm_create_handle();
+ if(m_mgmsrv2 == NULL) {
+ ndbout_c("Cannot create 2:nd handle to management server.");
+ exit(-1);
+ }
+
+ if (ndb_mgm_set_connectstring(m_mgmsrv, m_constr))
+ {
+ printError();
+ exit(-1);
+ }
+
+ if(ndb_mgm_connect(m_mgmsrv, try_reconnect-1, 5, 1))
+ DBUG_RETURN(m_connected); // couldn't connect, always false
+
+ const char *host= ndb_mgm_get_connected_host(m_mgmsrv);
+ unsigned port= ndb_mgm_get_connected_port(m_mgmsrv);
+ BaseString constr;
+ constr.assfmt("%s:%d",host,port);
+ if(!ndb_mgm_set_connectstring(m_mgmsrv2, constr.c_str()) &&
+ !ndb_mgm_connect(m_mgmsrv2, try_reconnect-1, 5, 1))
{
- if(!ndb_mgm_connect(m_mgmsrv, try_reconnect-1, 5, 1))
+ DBUG_PRINT("info",("2:ndb connected to Management Server ok at: %s:%d",
+ host, port));
+ assert(m_event_thread == 0);
+ assert(do_event_thread == 0);
+ do_event_thread= 0;
+ struct event_thread_param p;
+ p.m= &m_mgmsrv2;
+ p.p= &m_print_mutex;
+ m_event_thread = NdbThread_Create(event_thread_run,
+ (void**)&p,
+ 32768,
+ "CommandInterpreted_event_thread",
+ NDB_THREAD_PRIO_LOW);
+ if (m_event_thread != 0)
{
- const char *host= ndb_mgm_get_connected_host(m_mgmsrv);
- unsigned port= ndb_mgm_get_connected_port(m_mgmsrv);
- BaseString constr;
- constr.assfmt("%s:%d",host,port);
- if(!ndb_mgm_set_connectstring(m_mgmsrv2, constr.c_str()) &&
- !ndb_mgm_connect(m_mgmsrv2, try_reconnect-1, 5, 1))
- {
- DBUG_PRINT("info",("2:ndb connected to Management Server ok at: %s:%d",
- host, port));
- assert(m_event_thread == 0);
- assert(do_event_thread == 0);
- do_event_thread= 0;
- m_event_thread = NdbThread_Create(event_thread_run,
- (void**)&m_mgmsrv2,
- 32768,
- "CommandInterpreted_event_thread",
- NDB_THREAD_PRIO_LOW);
- if (m_event_thread != 0)
- {
- DBUG_PRINT("info",("Thread created ok, waiting for started..."));
- int iter= 1000; // try for 30 seconds
- while(do_event_thread == 0 &&
- iter-- > 0)
- NdbSleep_MilliSleep(30);
- }
- if (m_event_thread == 0 ||
- do_event_thread == 0 ||
- do_event_thread == -1)
- {
- DBUG_PRINT("info",("Warning, event thread startup failed, "
- "degraded printouts as result, errno=%d",
- errno));
- printf("Warning, event thread startup failed, "
- "degraded printouts as result, errno=%d\n", errno);
- do_event_thread= 0;
- if (m_event_thread)
- {
- void *res;
- NdbThread_WaitFor(m_event_thread, &res);
- NdbThread_Destroy(&m_event_thread);
- }
- ndb_mgm_disconnect(m_mgmsrv2);
- }
- }
- else
- {
- DBUG_PRINT("warning",
- ("Could not do 2:nd connect to mgmtserver for event listening"));
- DBUG_PRINT("info", ("code: %d, msg: %s",
- ndb_mgm_get_latest_error(m_mgmsrv2),
- ndb_mgm_get_latest_error_msg(m_mgmsrv2)));
- printf("Warning, event connect failed, degraded printouts as result\n");
- printf("code: %d, msg: %s\n",
- ndb_mgm_get_latest_error(m_mgmsrv2),
- ndb_mgm_get_latest_error_msg(m_mgmsrv2));
- }
- m_connected= true;
- DBUG_PRINT("info",("Connected to Management Server at: %s:%d", host, port));
- if (m_verbose)
+ DBUG_PRINT("info",("Thread created ok, waiting for started..."));
+ int iter= 1000; // try for 30 seconds
+ while(do_event_thread == 0 &&
+ iter-- > 0)
+ NdbSleep_MilliSleep(30);
+ }
+ if (m_event_thread == 0 ||
+ do_event_thread == 0 ||
+ do_event_thread == -1)
+ {
+ DBUG_PRINT("info",("Warning, event thread startup failed, "
+ "degraded printouts as result, errno=%d",
+ errno));
+ printf("Warning, event thread startup failed, "
+ "degraded printouts as result, errno=%d\n", errno);
+ do_event_thread= 0;
+ if (m_event_thread)
{
- printf("Connected to Management Server at: %s:%d\n",
- host, port);
+ void *res;
+ NdbThread_WaitFor(m_event_thread, &res);
+ NdbThread_Destroy(&m_event_thread);
}
+ ndb_mgm_disconnect(m_mgmsrv2);
}
}
+ else
+ {
+ DBUG_PRINT("warning",
+ ("Could not do 2:nd connect to mgmtserver for event listening"));
+ DBUG_PRINT("info", ("code: %d, msg: %s",
+ ndb_mgm_get_latest_error(m_mgmsrv2),
+ ndb_mgm_get_latest_error_msg(m_mgmsrv2)));
+ printf("Warning, event connect failed, degraded printouts as result\n");
+ printf("code: %d, msg: %s\n",
+ ndb_mgm_get_latest_error(m_mgmsrv2),
+ ndb_mgm_get_latest_error_msg(m_mgmsrv2));
+ }
+ m_connected= true;
+ DBUG_PRINT("info",("Connected to Management Server at: %s:%d", host, port));
+ if (m_verbose)
+ {
+ printf("Connected to Management Server at: %s:%d\n",
+ host, port);
+ }
+
DBUG_RETURN(m_connected);
}
@@ -515,20 +543,18 @@
CommandInterpreter::disconnect()
{
DBUG_ENTER("CommandInterpreter::disconnect");
+
if (m_event_thread) {
void *res;
do_event_thread= 0;
NdbThread_WaitFor(m_event_thread, &res);
NdbThread_Destroy(&m_event_thread);
m_event_thread= 0;
- ndb_mgm_disconnect(m_mgmsrv2);
+ ndb_mgm_destroy_handle(&m_mgmsrv2);
}
if (m_connected)
{
- if (ndb_mgm_disconnect(m_mgmsrv) == -1) {
- ndbout_c("Could not disconnect from management server");
- printError();
- }
+ ndb_mgm_destroy_handle(&m_mgmsrv);
m_connected= false;
}
DBUG_RETURN(true);
@@ -546,6 +572,7 @@
int result= execute_impl(_line);
if (error)
*error= m_error;
+
return result;
}
@@ -590,9 +617,16 @@
}
} while (do_continue);
// if there is anything in the line proceed
+ Vector<BaseString> command_list;
+ {
+ BaseString tmp(line);
+ tmp.split(command_list);
+ for (unsigned i= 0; i < command_list.size();)
+ command_list[i].c_str()[0] ? i++ : (command_list.erase(i),0);
+ }
char* firstToken = strtok(line, " ");
char* allAfterFirstToken = strtok(NULL, "");
-
+
if (strcasecmp(firstToken, "HELP") == 0 ||
strcasecmp(firstToken, "?") == 0) {
executeHelp(allAfterFirstToken);
@@ -618,6 +652,7 @@
DBUG_RETURN(true);
if (strcasecmp(firstToken, "SHOW") == 0) {
+ Guard g(m_print_mutex);
executeShow(allAfterFirstToken);
DBUG_RETURN(true);
}
@@ -684,22 +719,45 @@
analyseAfterFirstToken(-1, allAfterFirstToken);
} else {
/**
- * First token should be a digit, node ID
+ * First tokens should be digits, node ID's
*/
- int nodeId;
-
- if (! convert(firstToken, nodeId)) {
+ int node_ids[MAX_NODES];
+ unsigned pos;
+ for (pos= 0; pos < command_list.size(); pos++)
+ {
+ int node_id;
+ if (convert(command_list[pos].c_str(), node_id))
+ {
+ if (node_id <= 0) {
+ ndbout << "Invalid node ID: " << command_list[pos].c_str()
+ << "." << endl;
+ DBUG_RETURN(true);
+ }
+ node_ids[pos]= node_id;
+ continue;
+ }
+ break;
+ }
+ int no_of_nodes= pos;
+ if (no_of_nodes == 0)
+ {
+ /* No digit found */
invalid_command(_line);
DBUG_RETURN(true);
}
-
- if (nodeId <= 0) {
- ndbout << "Invalid node ID: " << firstToken << "." << endl;
+ if (pos == command_list.size())
+ {
+ /* No command found */
+ invalid_command(_line);
DBUG_RETURN(true);
}
-
- analyseAfterFirstToken(nodeId, allAfterFirstToken);
-
+ if (no_of_nodes == 1)
+ {
+ analyseAfterFirstToken(node_ids[0], allAfterFirstToken);
+ DBUG_RETURN(true);
+ }
+ executeCommand(command_list, pos, node_ids, no_of_nodes);
+ DBUG_RETURN(true);
}
DBUG_RETURN(true);
}
@@ -769,6 +827,27 @@
ndbout << endl;
}
+void
+CommandInterpreter::executeCommand(Vector<BaseString> &command_list,
+ unsigned command_pos,
+ int *node_ids, int no_of_nodes)
+{
+ const char *cmd= command_list[command_pos].c_str();
+ if (strcasecmp("STOP", cmd) == 0)
+ {
+ executeStop(command_list, command_pos+1, node_ids, no_of_nodes);
+ return;
+ }
+ if (strcasecmp("RESTART", cmd) == 0)
+ {
+ executeRestart(command_list, command_pos+1, node_ids, no_of_nodes);
+ return;
+ }
+ ndbout_c("Invalid command: '%s' after multi node id list. "
+ "Expected STOP or RESTART.", cmd);
+ return;
+}
+
/**
* Get next nodeid larger than the give node_id. node_id will be
* set to the next node_id in the list. node_id should be set
@@ -822,6 +901,7 @@
ndbout_c("Trying to start all nodes of system.");
ndbout_c("Use ALL STATUS to see the system start-up phases.");
} else {
+ Guard g(m_print_mutex);
struct ndb_mgm_cluster_state *cl= ndb_mgm_get_status(m_mgmsrv);
if(cl == 0){
ndbout_c("Unable get status from management server");
@@ -954,7 +1034,8 @@
NdbAutoPtr<char> ap1((char*)state);
int result = 0;
- result = ndb_mgm_stop(m_mgmsrv, 0, 0);
+ int need_disconnect;
+ result = ndb_mgm_stop3(m_mgmsrv, -1, 0, 0, &need_disconnect);
if (result < 0) {
ndbout << "Shutdown of NDB Cluster node(s) failed." << endl;
printError();
@@ -963,28 +1044,11 @@
ndbout << result << " NDB Cluster node(s) have shutdown." << endl;
- int mgm_id= 0;
- mgm_id= ndb_mgm_get_mgmd_nodeid(m_mgmsrv);
- if (mgm_id == 0)
- {
- ndbout << "Unable to locate management server, "
- << "shutdown manually with <id> STOP"
+ if(need_disconnect) {
+ ndbout << "Disconnecting to allow management server to shutdown."
<< endl;
- return 1;
- }
-
- result = ndb_mgm_stop(m_mgmsrv, 1, &mgm_id);
- if (result <= 0) {
- ndbout << "Shutdown of NDB Cluster management server failed." << endl;
- printError();
- if (result == 0)
- return 1;
- return result;
+ disconnect();
}
-
- m_connected= false;
- disconnect();
- ndbout << "NDB Cluster management server shutdown." << endl;
return 0;
}
@@ -1048,7 +1112,7 @@
}
if (node_state->node_group >= 0) {
ndbout << ", Nodegroup: " << node_state->node_group;
- if (node_state->dynamic_id == master_id)
+ if (master_id && node_state->dynamic_id == master_id)
ndbout << ", Master";
}
}
@@ -1137,6 +1201,7 @@
if(it == 0){
ndbout_c("Unable to create config iterator");
+ ndb_mgm_destroy_configuration(conf);
return;
}
NdbAutoPtr<ndb_mgm_configuration_iterator> ptr(it);
@@ -1181,6 +1246,7 @@
print_nodes(state, it, "ndb_mgmd", mgm_nodes, NDB_MGM_NODE_TYPE_MGM, 0);
print_nodes(state, it, "mysqld", api_nodes, NDB_MGM_NODE_TYPE_API, 0);
// ndbout << helpTextShow;
+ ndb_mgm_destroy_configuration(conf);
return;
} else if (strcasecmp(parameters, "PROPERTIES") == 0 ||
strcasecmp(parameters, "PROP") == 0) {
@@ -1206,12 +1272,7 @@
{
disconnect();
if (!emptyString(parameters)) {
- if (ndb_mgm_set_connectstring(m_mgmsrv,
- BaseString(parameters).trim().c_str()))
- {
- printError();
- return;
- }
+ m_constr= BaseString(parameters).trim().c_str();
}
connect();
}
@@ -1354,24 +1415,69 @@
//*****************************************************************************
void
-CommandInterpreter::executeStop(int processId, const char *, bool all)
+CommandInterpreter::executeStop(int processId, const char *parameters,
+ bool all)
{
- int result = 0;
- if(all) {
- result = ndb_mgm_stop(m_mgmsrv, 0, 0);
- } else {
- result = ndb_mgm_stop(m_mgmsrv, 1, &processId);
+ Vector<BaseString> command_list;
+ if (parameters)
+ {
+ BaseString tmp(parameters);
+ tmp.split(command_list);
+ for (unsigned i= 0; i < command_list.size();)
+ command_list[i].c_str()[0] ? i++ : (command_list.erase(i),0);
}
- if (result < 0) {
- ndbout << "Shutdown failed." << endl;
+ if (all)
+ executeStop(command_list, 0, 0, 0);
+ else
+ executeStop(command_list, 0, &processId, 1);
+}
+
+void
+CommandInterpreter::executeStop(Vector<BaseString> &command_list,
+ unsigned command_pos,
+ int *node_ids, int no_of_nodes)
+{
+ int need_disconnect;
+ int abort= 0;
+ for (; command_pos < command_list.size(); command_pos++)
+ {
+ const char *item= command_list[command_pos].c_str();
+ if (strcasecmp(item, "-A") == 0)
+ {
+ abort= 1;
+ continue;
+ }
+ ndbout_c("Invalid option: %s. Expecting -A after STOP",
+ item);
+ return;
+ }
+
+ int result= ndb_mgm_stop3(m_mgmsrv, no_of_nodes, node_ids, abort,
+ &need_disconnect);
+ if (result < 0)
+ {
+ ndbout_c("Shutdown failed.");
printError();
- } else
+ }
+ else
+ {
+ if (node_ids == 0)
+ ndbout_c("NDB Cluster has shutdown.");
+ else
{
- if(all)
- ndbout << "NDB Cluster has shutdown." << endl;
- else
- ndbout << "Node " << processId << " has shutdown." << endl;
+ ndbout << "Node";
+ for (int i= 0; i < no_of_nodes; i++)
+ ndbout << " " << node_ids[i];
+ ndbout_c(" has shutdown.");
}
+ }
+
+ if(need_disconnect)
+ {
+ ndbout << "Disconnecting to allow Management Server to shutdown" << endl;
+ disconnect();
+ }
+
}
void
@@ -1437,47 +1543,77 @@
void
CommandInterpreter::executeRestart(int processId, const char* parameters,
- bool all)
+ bool all)
+{
+ Vector<BaseString> command_list;
+ if (parameters)
+ {
+ BaseString tmp(parameters);
+ tmp.split(command_list);
+ for (unsigned i= 0; i < command_list.size();)
+ command_list[i].c_str()[0] ? i++ : (command_list.erase(i),0);
+ }
+ if (all)
+ executeRestart(command_list, 0, 0, 0);
+ else
+ executeRestart(command_list, 0, &processId, 1);
+}
+
+void
+CommandInterpreter::executeRestart(Vector<BaseString> &command_list,
+ unsigned command_pos,
+ int *node_ids, int no_of_nodes)
{
int result;
- int nostart = 0;
- int initialstart = 0;
- int abort = 0;
-
- if(parameters != 0 && strlen(parameters) != 0){
- char * tmpString = my_strdup(parameters,MYF(MY_WME));
- My_auto_ptr<char> ap1(tmpString);
- char * tmpPtr = 0;
- char * item = strtok_r(tmpString, " ", &tmpPtr);
- while(item != NULL){
- if(strcasecmp(item, "-N") == 0)
- nostart = 1;
- if(strcasecmp(item, "-I") == 0)
- initialstart = 1;
- if(strcasecmp(item, "-A") == 0)
- abort = 1;
- item = strtok_r(NULL, " ", &tmpPtr);
+ int nostart= 0;
+ int initialstart= 0;
+ int abort= 0;
+ int need_disconnect= 0;
+
+ for (; command_pos < command_list.size(); command_pos++)
+ {
+ const char *item= command_list[command_pos].c_str();
+ if (strcasecmp(item, "-N") == 0)
+ {
+ nostart= 1;
+ continue;
+ }
+ if (strcasecmp(item, "-I") == 0)
+ {
+ initialstart= 1;
+ continue;
}
+ if (strcasecmp(item, "-A") == 0)
+ {
+ abort= 1;
+ continue;
+ }
+ ndbout_c("Invalid option: %s. Expecting -A,-N or -I after RESTART",
+ item);
+ return;
}
- if(all) {
- result = ndb_mgm_restart2(m_mgmsrv, 0, NULL, initialstart, nostart, abort);
- } else {
- int v[1];
- v[0] = processId;
- result = ndb_mgm_restart2(m_mgmsrv, 1, v, initialstart, nostart, abort);
- }
-
+ result= ndb_mgm_restart3(m_mgmsrv, no_of_nodes, node_ids,
+ initialstart, nostart, abort, &need_disconnect);
+
if (result <= 0) {
- ndbout.println("Restart failed.", result);
+ ndbout_c("Restart failed.");
printError();
- } else
+ }
+ else
+ {
+ if (node_ids == 0)
+ ndbout_c("NDB Cluster is being restarted.");
+ else
{
- if(all)
- ndbout << "NDB Cluster is being restarted." << endl;
- else
- ndbout_c("Node %d is being restarted.", processId);
+ ndbout << "Node";
+ for (int i= 0; i < no_of_nodes; i++)
+ ndbout << " " << node_ids[i];
+ ndbout_c(" is being restarted");
}
+ if(need_disconnect)
+ disconnect();
+ }
}
void
--- 1.92/storage/ndb/src/mgmsrv/MgmtSrvr.cpp 2006-11-23 22:40:27 +11:00
+++ 1.93/storage/ndb/src/mgmsrv/MgmtSrvr.cpp 2006-11-23 23:04:17 +11:00
@@ -61,9 +61,6 @@
#include <SignalSender.hpp>
-extern bool g_StopServer;
-extern bool g_RestartServer;
-
//#define MGM_SRV_DEBUG
#ifdef MGM_SRV_DEBUG
#define DEBUG(x) do ndbout << x << endl; while(0)
@@ -81,7 +78,6 @@
}\
}
-extern int global_flag_send_heartbeat_now;
extern int g_no_nodeid_checks;
extern my_bool opt_core;
@@ -123,41 +119,50 @@
/**
* Handle started nodes
*/
- EventSubscribeReq req;
- req = m_event_listner[0].m_logLevel;
- req.blockRef = _ownReference;
-
- SetLogLevelOrd ord;
-
m_started_nodes.lock();
- while(m_started_nodes.size() > 0){
- Uint32 node = m_started_nodes[0];
- m_started_nodes.erase(0, false);
- m_started_nodes.unlock();
+ if (m_started_nodes.size() > 0)
+ {
+ // calculate max log level
+ EventSubscribeReq req;
+ {
+ LogLevel tmp;
+ m_event_listner.lock();
+ for(int i = m_event_listner.m_clients.size() - 1; i >= 0; i--)
+ tmp.set_max(m_event_listner[i].m_logLevel);
+ m_event_listner.unlock();
+ req = tmp;
+ }
+ req.blockRef = _ownReference;
+ while (m_started_nodes.size() > 0)
+ {
+ Uint32 node = m_started_nodes[0];
+ m_started_nodes.erase(0, false);
+ m_started_nodes.unlock();
- setEventReportingLevelImpl(node, req);
-
- ord = m_nodeLogLevel[node];
- setNodeLogLevelImpl(node, ord);
-
- m_started_nodes.lock();
- }
+ setEventReportingLevelImpl(node, req);
+
+ SetLogLevelOrd ord;
+ ord = m_nodeLogLevel[node];
+ setNodeLogLevelImpl(node, ord);
+
+ m_started_nodes.lock();
+ }
+ }
m_started_nodes.unlock();
m_log_level_requests.lock();
- while(m_log_level_requests.size() > 0){
- req = m_log_level_requests[0];
+ while (m_log_level_requests.size() > 0)
+ {
+ EventSubscribeReq req = m_log_level_requests[0];
m_log_level_requests.erase(0, false);
m_log_level_requests.unlock();
-
- LogLevel tmp;
- tmp = req;
-
+
if(req.blockRef == 0){
req.blockRef = _ownReference;
setEventReportingLevelImpl(0, req);
} else {
- ord = req;
+ SetLogLevelOrd ord;
+ ord = req;
setNodeLogLevelImpl(req.blockRef, ord);
}
m_log_level_requests.lock();
@@ -295,6 +300,8 @@
{MgmtSrvr::SYSTEM_SHUTDOWN_IN_PROGRESS, "System shutdown in progress" },
{MgmtSrvr::NODE_SHUTDOWN_WOULD_CAUSE_SYSTEM_CRASH,
"Node shutdown would cause system crash" },
+ {MgmtSrvr::UNSUPPORTED_NODE_SHUTDOWN,
+ "Unsupported multi node shutdown. Abort option required." },
{MgmtSrvr::NODE_NOT_API_NODE, "The specified node is not an API node." },
{MgmtSrvr::OPERATION_NOT_ALLOWED_START_STOP,
"Operation not allowed while nodes are starting or stopping."},
@@ -313,6 +320,9 @@
case StopRef::NodeShutdownWouldCauseSystemCrash:
return NODE_SHUTDOWN_WOULD_CAUSE_SYSTEM_CRASH;
break;
+ case StopRef::UnsupportedNodeShutdown:
+ return UNSUPPORTED_NODE_SHUTDOWN;
+ break;
}
return 4999;
}
@@ -387,8 +397,9 @@
_ownReference(0),
theSignalIdleList(NULL),
theWaitState(WAIT_SUBSCRIBE_CONF),
+ m_local_mgm_handle(0),
m_event_listner(this),
- m_local_mgm_handle(0)
+ m_master_node(0)
{
DBUG_ENTER("MgmtSrvr::MgmtSrvr");
@@ -498,9 +509,10 @@
if (_ownNodeId == 0) // we did not get node id from other server
{
NodeId tmp= m_config_retriever->get_configuration_nodeid();
+ int error_code;
if (!alloc_node_id(&tmp, NDB_MGM_NODE_TYPE_MGM,
- 0, 0, error_string)){
+ 0, 0, error_code, error_string)){
ndbout << "Unable to obtain requested nodeid: "
<< error_string.c_str() << endl;
require(false);
@@ -561,8 +573,7 @@
DBUG_RETURN(false);
}
}
- theFacade= TransporterFacade::theFacadeInstance
- = new TransporterFacade();
+ theFacade= new TransporterFacade();
if(theFacade == 0) {
DEBUG("MgmtSrvr.cpp: theFacade is NULL.");
@@ -674,23 +685,16 @@
int MgmtSrvr::okToSendTo(NodeId nodeId, bool unCond)
{
- if(nodeId == 0)
- return 0;
-
- if (getNodeType(nodeId) != NDB_MGM_NODE_TYPE_NDB)
+ if(nodeId == 0 || getNodeType(nodeId) != NDB_MGM_NODE_TYPE_NDB)
return WRONG_PROCESS_TYPE;
-
// Check if we have contact with it
if(unCond){
if(theFacade->theClusterMgr->getNodeInfo(nodeId).connected)
return 0;
- return NO_CONTACT_WITH_PROCESS;
}
- if (theFacade->get_node_alive(nodeId) == 0) {
- return NO_CONTACT_WITH_PROCESS;
- } else {
+ else if (theFacade->get_node_alive(nodeId) == true)
return 0;
- }
+ return NO_CONTACT_WITH_PROCESS;
}
void report_unknown_signal(SimpleSignal *signal)
@@ -930,18 +934,32 @@
* client connection to that mgmd and stop it that way.
* This allows us to stop mgm servers when there isn't any real
* distributed communication up.
+ *
+ * node_ids.size()==0 means to stop all DB nodes.
+ * MGM nodes will *NOT* be stopped.
+ *
+ * If we work out we should be stopping or restarting ourselves,
+ * we return <0 in stopSelf for restart, >0 for stop
+ * and 0 for do nothing.
*/
-int MgmtSrvr::sendSTOP_REQ(NodeId nodeId,
+int MgmtSrvr::sendSTOP_REQ(const Vector<NodeId> &node_ids,
NodeBitmask &stoppedNodes,
Uint32 singleUserNodeId,
bool abort,
bool stop,
bool restart,
bool nostart,
- bool initialStart)
+ bool initialStart,
+ int* stopSelf)
{
int error = 0;
+ DBUG_ENTER("MgmtSrvr::sendSTOP_REQ");
+ DBUG_PRINT("enter", ("no of nodes: %d singleUseNodeId: %d "
+ "abort: %d stop: %d restart: %d "
+ "nostart: %d initialStart: %d",
+ node_ids.size(), singleUserNodeId,
+ abort, stop, restart, nostart, initialStart));
stoppedNodes.clear();
@@ -979,36 +997,54 @@
// send the signals
NodeBitmask nodes;
- if (nodeId)
- {
- if(nodeId==getOwnNodeId())
- {
- if(restart)
- g_RestartServer= true;
- g_StopServer= true;
- return 0;
- }
- if(getNodeType(nodeId) == NDB_MGM_NODE_TYPE_NDB)
- {
- int r;
- if((r= okToSendTo(nodeId, true)) != 0)
- return r;
- if (ss.sendSignal(nodeId, &ssig) != SEND_OK)
- return SEND_OR_RECEIVE_FAILED;
+ NodeId nodeId= 0;
+ int use_master_node= 0;
+ int do_send= 0;
+ *stopSelf= 0;
+ NdbNodeBitmask nodes_to_stop;
+ {
+ for (unsigned i= 0; i < node_ids.size(); i++)
+ {
+ nodeId= node_ids[i];
+ ndbout << "asked to stop " << nodeId << endl;
+ if (getNodeType(nodeId) != NDB_MGM_NODE_TYPE_MGM)
+ nodes_to_stop.set(nodeId);
+ else if (nodeId != getOwnNodeId())
+ {
+ error= sendStopMgmd(nodeId, abort, stop, restart,
+ nostart, initialStart);
+ if (error == 0)
+ stoppedNodes.set(nodeId);
+ }
+ else
+ {
+ ndbout << "which is me" << endl;
+ *stopSelf= (restart)? -1 : 1;
+ stoppedNodes.set(nodeId);
+ }
}
- else if(getNodeType(nodeId) == NDB_MGM_NODE_TYPE_MGM)
+ }
+ int no_of_nodes_to_stop= nodes_to_stop.count();
+ if (node_ids.size())
+ {
+ if (no_of_nodes_to_stop)
{
- error= sendStopMgmd(nodeId, abort, stop, restart, nostart, initialStart);
- if(error==0)
- stoppedNodes.set(nodeId);
- return error;
+ do_send= 1;
+ if (no_of_nodes_to_stop == 1)
+ {
+ nodeId= nodes_to_stop.find(0);
+ }
+ else // multi node stop, send to master
+ {
+ use_master_node= 1;
+ nodes_to_stop.copyto(NdbNodeBitmask::Size, stopReq->nodes);
+ StopReq::setStopNodes(stopReq->requestInfo, 1);
+ }
}
- else
- return WRONG_PROCESS_TYPE;
- nodes.set(nodeId);
}
else
{
+ nodeId= 0;
while(getNextNodeId(&nodeId, NDB_MGM_NODE_TYPE_NDB))
{
if(okToSendTo(nodeId, true) == 0)
@@ -1018,19 +1054,33 @@
nodes.set(nodeId);
}
}
- nodeId= 0;
- while(getNextNodeId(&nodeId, NDB_MGM_NODE_TYPE_MGM))
- {
- if(nodeId==getOwnNodeId())
- continue;
- if(sendStopMgmd(nodeId, abort, stop, restart, nostart, initialStart)==0)
- stoppedNodes.set(nodeId);
- }
}
// now wait for the replies
- while (!nodes.isclear())
+ while (!nodes.isclear() || do_send)
{
+ if (do_send)
+ {
+ int r;
+ assert(nodes.count() == 0);
+ if (use_master_node)
+ nodeId= m_master_node;
+ if ((r= okToSendTo(nodeId, true)) != 0)
+ {
+ bool next;
+ if (!use_master_node)
+ DBUG_RETURN(r);
+ m_master_node= nodeId= 0;
+ while((next= getNextNodeId(&nodeId, NDB_MGM_NODE_TYPE_NDB)) == true
&&
+ (r= okToSendTo(nodeId, true)) != 0);
+ if (!next)
+ DBUG_RETURN(NO_CONTACT_WITH_DB_NODES);
+ }
+ if (ss.sendSignal(nodeId, &ssig) != SEND_OK)
+ DBUG_RETURN(SEND_OR_RECEIVE_FAILED);
+ nodes.set(nodeId);
+ do_send= 0;
+ }
SimpleSignal *signal = ss.waitFor();
int gsn = signal->readSignalNumber();
switch (gsn) {
@@ -1042,6 +1092,13 @@
#endif
assert(nodes.get(nodeId));
nodes.clear(nodeId);
+ if (ref->errorCode == StopRef::MultiNodeShutdownNotMaster)
+ {
+ assert(use_master_node);
+ m_master_node= ref->masterNodeId;
+ do_send= 1;
+ continue;
+ }
error = translateStopRef(ref->errorCode);
break;
}
@@ -1052,40 +1109,32 @@
ndbout_c("Node %d single user mode", nodeId);
#endif
assert(nodes.get(nodeId));
- assert(singleUserNodeId != 0);
+ if (singleUserNodeId != 0)
+ {
+ stoppedNodes.set(nodeId);
+ }
+ else
+ {
+ assert(no_of_nodes_to_stop > 1);
+ stoppedNodes.bitOR(nodes_to_stop);
+ }
nodes.clear(nodeId);
- stoppedNodes.set(nodeId);
break;
}
case GSN_NF_COMPLETEREP:{
const NFCompleteRep * const rep =
CAST_CONSTPTR(NFCompleteRep, signal->getDataPtr());
#ifdef VM_TRACE
- ndbout_c("Node %d fail completed", rep->failedNodeId);
+ ndbout_c("sendSTOP_REQ Node %d fail completed", rep->failedNodeId);
#endif
+ nodes.clear(rep->failedNodeId); // clear the failed node
+ if (singleUserNodeId == 0)
+ stoppedNodes.set(rep->failedNodeId);
break;
}
case GSN_NODE_FAILREP:{
const NodeFailRep * const rep =
CAST_CONSTPTR(NodeFailRep, signal->getDataPtr());
- NodeBitmask failedNodes;
- failedNodes.assign(NodeBitmask::Size, rep->theNodes);
-#ifdef VM_TRACE
- {
- ndbout << "Failed nodes:";
- for (unsigned i = 0; i < 32*NodeBitmask::Size; i++)
- if(failedNodes.get(i))
- ndbout << " " << i;
- ndbout << endl;
- }
-#endif
- failedNodes.bitAND(nodes);
- if (!failedNodes.isclear())
- {
- nodes.bitANDC(failedNodes); // clear the failed nodes
- if (singleUserNodeId == 0)
- stoppedNodes.bitOR(failedNodes);
- }
break;
}
default:
@@ -1093,17 +1142,22 @@
#ifdef VM_TRACE
ndbout_c("Unknown signal %d", gsn);
#endif
- return SEND_OR_RECEIVE_FAILED;
+ DBUG_RETURN(SEND_OR_RECEIVE_FAILED);
}
}
- return error;
+ if (error && *stopSelf)
+ {
+ *stopSelf= 0;
+ }
+ DBUG_RETURN(error);
}
/*
- * Stop one node
+ * Stop one nodes
*/
-int MgmtSrvr::stopNode(int nodeId, bool abort)
+int MgmtSrvr::stopNodes(const Vector<NodeId> &node_ids,
+ int *stopCount, bool abort, int* stopSelf)
{
if (!abort)
{
@@ -1118,31 +1172,62 @@
}
}
NodeBitmask nodes;
- return sendSTOP_REQ(nodeId,
- nodes,
- 0,
- abort,
- false,
- false,
- false,
- false);
+ int ret= sendSTOP_REQ(node_ids,
+ nodes,
+ 0,
+ abort,
+ false,
+ false,
+ false,
+ false,
+ stopSelf);
+ if (stopCount)
+ *stopCount= nodes.count();
+ return ret;
+}
+
+int MgmtSrvr::shutdownMGM(int *stopCount, bool abort, int *stopSelf)
+{
+ NodeId nodeId = 0;
+ int error;
+
+ while(getNextNodeId(&nodeId, NDB_MGM_NODE_TYPE_MGM))
+ {
+ if(nodeId==getOwnNodeId())
+ continue;
+ error= sendStopMgmd(nodeId, abort, true, false,
+ false, false);
+ if (error == 0)
+ *stopCount++;
+ }
+
+ *stopSelf= 1;
+ *stopCount++;
+
+ return 0;
}
/*
- * Perform system shutdown
+ * Perform DB nodes shutdown.
+ * MGM servers are left in their current state
*/
-int MgmtSrvr::stop(int * stopCount, bool abort)
+int MgmtSrvr::shutdownDB(int * stopCount, bool abort)
{
NodeBitmask nodes;
- int ret = sendSTOP_REQ(0,
+ Vector<NodeId> node_ids;
+
+ int tmp;
+
+ int ret = sendSTOP_REQ(node_ids,
nodes,
0,
abort,
true,
false,
false,
- false);
+ false,
+ &tmp);
if (stopCount)
*stopCount = nodes.count();
return ret;
@@ -1166,14 +1251,17 @@
return OPERATION_NOT_ALLOWED_START_STOP;
}
NodeBitmask nodes;
- int ret = sendSTOP_REQ(0,
+ Vector<NodeId> node_ids;
+ int stopSelf;
+ int ret = sendSTOP_REQ(node_ids,
nodes,
singleUserNodeId,
false,
false,
false,
false,
- false);
+ false,
+ &stopSelf);
if (stopCount)
*stopCount = nodes.count();
return ret;
@@ -1183,36 +1271,82 @@
* Perform node restart
*/
-int MgmtSrvr::restartNode(int nodeId, bool nostart, bool initialStart,
- bool abort)
+int MgmtSrvr::restartNodes(const Vector<NodeId> &node_ids,
+ int * stopCount, bool nostart,
+ bool initialStart, bool abort,
+ int *stopSelf)
{
NodeBitmask nodes;
- return sendSTOP_REQ(nodeId,
- nodes,
- 0,
- abort,
- false,
- true,
- nostart,
- initialStart);
+ int ret= sendSTOP_REQ(node_ids,
+ nodes,
+ 0,
+ abort,
+ false,
+ true,
+ true,
+ initialStart,
+ stopSelf);
+
+ if (ret)
+ return ret;
+
+ if (stopCount)
+ *stopCount = nodes.count();
+
+ // start up the nodes again
+ int waitTime = 12000;
+ NDB_TICKS maxTime = NdbTick_CurrentMillisecond() + waitTime;
+ for (unsigned i = 0; i < node_ids.size(); i++)
+ {
+ NodeId nodeId= node_ids[i];
+ enum ndb_mgm_node_status s;
+ s = NDB_MGM_NODE_STATUS_NO_CONTACT;
+#ifdef VM_TRACE
+ ndbout_c("Waiting for %d not started", nodeId);
+#endif
+ while (s != NDB_MGM_NODE_STATUS_NOT_STARTED && waitTime > 0)
+ {
+ Uint32 startPhase = 0, version = 0, dynamicId = 0, nodeGroup = 0;
+ Uint32 connectCount = 0;
+ bool system;
+ const char *address;
+ status(nodeId, &s, &version, &startPhase,
+ &system, &dynamicId, &nodeGroup, &connectCount,
&address);
+ NdbSleep_MilliSleep(100);
+ waitTime = (maxTime - NdbTick_CurrentMillisecond());
+ }
+ }
+
+ if (nostart)
+ return 0;
+
+ for (unsigned i = 0; i < node_ids.size(); i++)
+ {
+ int result = start(node_ids[i]);
+ }
+ return 0;
}
/*
- * Perform system restart
+ * Perform restart of all DB nodes
*/
-int MgmtSrvr::restart(bool nostart, bool initialStart,
- bool abort, int * stopCount )
+int MgmtSrvr::restartDB(bool nostart, bool initialStart,
+ bool abort, int * stopCount)
{
NodeBitmask nodes;
- int ret = sendSTOP_REQ(0,
+ Vector<NodeId> node_ids;
+ int tmp;
+
+ int ret = sendSTOP_REQ(node_ids,
nodes,
0,
abort,
true,
true,
true,
- initialStart);
+ initialStart,
+ &tmp);
if (ret)
return ret;
@@ -1316,6 +1450,12 @@
#include <ClusterMgr.hpp>
+void
+MgmtSrvr::updateStatus()
+{
+ theFacade->theClusterMgr->forceHB();
+}
+
int
MgmtSrvr::status(int nodeId,
ndb_mgm_node_status * _status,
@@ -1408,7 +1548,8 @@
MgmtSrvr::setEventReportingLevelImpl(int nodeId,
const EventSubscribeReq& ll)
{
- INIT_SIGNAL_SENDER(ss,nodeId);
+ SignalSender ss(theFacade);
+ ss.lock();
SimpleSignal ssig;
EventSubscribeReq * dst =
@@ -1417,41 +1558,54 @@
EventSubscribeReq::SignalLength);
*dst = ll;
- send(ss,ssig,nodeId,NODE_TYPE_DB);
+ NodeBitmask nodes;
+ nodes.clear();
+ Uint32 max = (nodeId == 0) ? (nodeId = 1, MAX_NDB_NODES) : nodeId;
+ for(; nodeId <= max; nodeId++)
+ {
+ if (nodeTypes[nodeId] != NODE_TYPE_DB)
+ continue;
+ if (okToSendTo(nodeId, true))
+ continue;
+ if (ss.sendSignal(nodeId, &ssig) == SEND_OK)
+ {
+ nodes.set(nodeId);
+ }
+ }
-#if 0
- while (1)
+ int error = 0;
+ while (!nodes.isclear())
{
SimpleSignal *signal = ss.waitFor();
int gsn = signal->readSignalNumber();
- switch (gsn) {
+ nodeId = refToNode(signal->header.theSendersBlockRef);
+ switch (gsn) {
case GSN_EVENT_SUBSCRIBE_CONF:{
+ nodes.clear(nodeId);
break;
}
case GSN_EVENT_SUBSCRIBE_REF:{
- return SEND_OR_RECEIVE_FAILED;
+ nodes.clear(nodeId);
+ error = 1;
+ break;
}
case GSN_NF_COMPLETEREP:{
const NFCompleteRep * const rep =
CAST_CONSTPTR(NFCompleteRep, signal->getDataPtr());
- if (rep->failedNodeId == nodeId)
- return SEND_OR_RECEIVE_FAILED;
+ nodes.clear(rep->failedNodeId);
break;
}
case GSN_NODE_FAILREP:{
- const NodeFailRep * const rep =
- CAST_CONSTPTR(NodeFailRep, signal->getDataPtr());
- if (NodeBitmask::get(rep->theNodes,nodeId))
- return SEND_OR_RECEIVE_FAILED;
+ // ignore, NF_COMPLETEREP will arrive later
break;
}
default:
report_unknown_signal(signal);
return SEND_OR_RECEIVE_FAILED;
}
-
}
-#endif
+ if (error)
+ return SEND_OR_RECEIVE_FAILED;
return 0;
}
@@ -1471,19 +1625,6 @@
return ss.sendSignal(nodeId, &ssig) == SEND_OK ? 0 : SEND_OR_RECEIVE_FAILED;
}
-int
-MgmtSrvr::send(SignalSender &ss, SimpleSignal &ssig, Uint32 node, Uint32
node_type){
- Uint32 max = (node == 0) ? MAX_NODES : node + 1;
-
- for(; node < max; node++){
- while(nodeTypes[node] != (int)node_type && node < max) node++;
- if(nodeTypes[node] != (int)node_type)
- break;
- ss.sendSignal(node, &ssig);
- }
- return 0;
-}
-
//****************************************************************************
//****************************************************************************
@@ -1836,7 +1977,7 @@
}
int
-MgmtSrvr::alloc_node_id_req(Uint32 free_node_id)
+MgmtSrvr::alloc_node_id_req(NodeId free_node_id, enum ndb_mgm_node_type type)
{
SignalSender ss(theFacade);
ss.lock(); // lock will be released on exit
@@ -1849,6 +1990,7 @@
req->senderRef = ss.getOwnRef();
req->senderData = 19;
req->nodeId = free_node_id;
+ req->nodeType = type;
int do_send = 1;
NodeId nodeId = 0;
@@ -1922,7 +2064,8 @@
enum ndb_mgm_node_type type,
struct sockaddr *client_addr,
SOCKET_SIZE_TYPE *client_addr_len,
- BaseString &error_string)
+ int &error_code, BaseString &error_string,
+ int log_event)
{
DBUG_ENTER("MgmtSrvr::alloc_node_id");
DBUG_PRINT("enter", ("nodeid=%d, type=%d, client_addr=%d",
@@ -1931,6 +2074,7 @@
if (*nodeId == 0) {
error_string.appfmt("no-nodeid-checks set in management server.\n"
"node id must be set explicitly in connectstring");
+ error_code = NDB_MGM_ALLOCID_CONFIG_MISMATCH;
DBUG_RETURN(false);
}
DBUG_RETURN(true);
@@ -1955,8 +2099,10 @@
if(NdbMutex_Lock(m_configMutex))
{
+ // should not happen
error_string.appfmt("unable to lock configuration mutex");
- return false;
+ error_code = NDB_MGM_ALLOCID_ERROR;
+ DBUG_RETURN(false);
}
ndb_mgm_configuration_iterator
iter(* _config->m_configValues, CFG_SECTION_NODE);
@@ -2027,6 +2173,7 @@
"or specifying unique host names in config file.",
id_found, tmp);
NdbMutex_Unlock(m_configMutex);
+ error_code = NDB_MGM_ALLOCID_CONFIG_MISMATCH;
DBUG_RETURN(false);
}
if (config_hostname == 0) {
@@ -2035,6 +2182,7 @@
"or specifying unique host names in config file,\n"
"or specifying just one mgmt server in config file.",
tmp);
+ error_code = NDB_MGM_ALLOCID_CONFIG_MISMATCH;
DBUG_RETURN(false);
}
id_found= tmp; // mgmt server matched, check for more matches
@@ -2043,7 +2191,7 @@
if (id_found && client_addr != 0)
{
- int res = alloc_node_id_req(id_found);
+ int res = alloc_node_id_req(id_found, type);
unsigned save_id_found = id_found;
switch (res)
{
@@ -2109,15 +2257,16 @@
char tmp_str[128];
m_reserved_nodes.getText(tmp_str);
- g_eventLogger.info("Mgmt server state: nodeid %d reserved for ip %s, m_reserved_nodes
%s.",
- id_found, get_connect_address(id_found), tmp_str);
+ g_eventLogger.info("Mgmt server state: nodeid %d reserved for ip %s, "
+ "m_reserved_nodes %s.",
+ id_found, get_connect_address(id_found), tmp_str);
DBUG_RETURN(true);
}
if (found_matching_type && !found_free_node) {
// we have a temporary error which might be due to that
// we have got the latest connect status from db-nodes. Force update.
- global_flag_send_heartbeat_now= 1;
+ updateStatus();
}
BaseString type_string, type_c_string;
@@ -2130,26 +2279,48 @@
type_c_string.assfmt("%s(%s)", alias, str);
}
- if (*nodeId == 0) {
+ if (*nodeId == 0)
+ {
if (found_matching_id)
+ {
if (found_matching_type)
+ {
if (found_free_node)
+ {
error_string.appfmt("Connection done from wrong host ip %s.",
(client_addr)?
- inet_ntoa(((struct sockaddr_in *)
+ inet_ntoa(((struct sockaddr_in *)
(client_addr))->sin_addr):"");
+ error_code = NDB_MGM_ALLOCID_ERROR;
+ }
else
+ {
error_string.appfmt("No free node id found for %s.",
type_string.c_str());
+ error_code = NDB_MGM_ALLOCID_ERROR;
+ }
+ }
else
+ {
error_string.appfmt("No %s node defined in config file.",
type_string.c_str());
+ error_code = NDB_MGM_ALLOCID_CONFIG_MISMATCH;
+ }
+ }
else
+ {
error_string.append("No nodes defined in config file.");
- } else {
+ error_code = NDB_MGM_ALLOCID_CONFIG_MISMATCH;
+ }
+ }
+ else
+ {
if (found_matching_id)
+ {
if (found_matching_type)
- if (found_free_node) {
+ {
+ if (found_free_node)
+ {
// have to split these into two since inet_ntoa overwrites itself
error_string.appfmt("Connection with id %d done from wrong host ip %s,",
*nodeId, inet_ntoa(((struct sockaddr_in *)
@@ -2157,27 +2328,44 @@
error_string.appfmt(" expected %s(%s).", config_hostname,
r_config_addr ?
"lookup failed" : inet_ntoa(config_addr));
- } else
+ error_code = NDB_MGM_ALLOCID_CONFIG_MISMATCH;
+ }
+ else
+ {
error_string.appfmt("Id %d already allocated by another node.",
*nodeId);
+ error_code = NDB_MGM_ALLOCID_ERROR;
+ }
+ }
else
+ {
error_string.appfmt("Id %d configured as %s, connect attempted as %s.",
*nodeId, type_c_string.c_str(),
type_string.c_str());
+ error_code = NDB_MGM_ALLOCID_CONFIG_MISMATCH;
+ }
+ }
else
+ {
error_string.appfmt("No node defined with id=%d in config file.",
*nodeId);
+ error_code = NDB_MGM_ALLOCID_CONFIG_MISMATCH;
+ }
}
- g_eventLogger.warning("Allocate nodeid (%d) failed. Connection from ip %s. "
- "Returned error string \"%s\"",
- *nodeId,
- client_addr != 0 ? inet_ntoa(((struct sockaddr_in *)(client_addr))->sin_addr) :
"<none>",
- error_string.c_str());
-
- NodeBitmask connected_nodes2;
- get_connected_nodes(connected_nodes2);
+ if (log_event || error_code == NDB_MGM_ALLOCID_CONFIG_MISMATCH)
{
+ g_eventLogger.warning("Allocate nodeid (%d) failed. Connection from ip %s."
+ " Returned error string \"%s\"",
+ *nodeId,
+ client_addr != 0
+ ? inet_ntoa(((struct sockaddr_in *)
+ (client_addr))->sin_addr)
+ : "<none>",
+ error_string.c_str());
+
+ NodeBitmask connected_nodes2;
+ get_connected_nodes(connected_nodes2);
BaseString tmp_connected, tmp_not_connected;
for(Uint32 i = 0; i < MAX_NODES; i++)
{
@@ -2243,12 +2431,16 @@
SignalSender ss(theFacade);
ss.lock(); // lock will be released on exit
- bool next;
- NodeId nodeId = 0;
- while((next = getNextNodeId(&nodeId, NDB_MGM_NODE_TYPE_NDB)) == true &&
- theFacade->get_node_alive(nodeId) == false);
-
- if(!next) return NO_CONTACT_WITH_DB_NODES;
+ NodeId nodeId = m_master_node;
+ if (okToSendTo(nodeId, false) != 0)
+ {
+ bool next;
+ nodeId = m_master_node = 0;
+ while((next = getNextNodeId(&nodeId, NDB_MGM_NODE_TYPE_NDB)) == true &&
+ okToSendTo(nodeId, false) != 0);
+ if(!next)
+ return NO_CONTACT_WITH_DB_NODES;
+ }
SimpleSignal ssig;
BackupReq* req = CAST_PTR(BackupReq, ssig.getDataPtrSend());
@@ -2301,14 +2493,20 @@
event.Event = BackupEvent::BackupCompleted;
event.Completed.BackupId = rep->backupId;
- event.Completed.NoOfBytes = rep->noOfBytes;
+ event.Completed.NoOfBytes = rep->noOfBytesLow;
event.Completed.NoOfLogBytes = rep->noOfLogBytes;
- event.Completed.NoOfRecords = rep->noOfRecords;
+ event.Completed.NoOfRecords = rep->noOfRecordsLow;
event.Completed.NoOfLogRecords = rep->noOfLogRecords;
event.Completed.stopGCP = rep->stopGCP;
event.Completed.startGCP = rep->startGCP;
event.Nodes = rep->nodes;
+ if (signal->header.theLength >= BackupCompleteRep::SignalLength)
+ {
+ event.Completed.NoOfBytes += ((Uint64)rep->noOfBytesHigh) << 32;
+ event.Completed.NoOfRecords += ((Uint64)rep->noOfRecordsHigh) << 32;
+ }
+
backupId = rep->backupId;
return 0;
}
@@ -2316,7 +2514,7 @@
const BackupRef * const ref =
CAST_CONSTPTR(BackupRef, signal->getDataPtr());
if(ref->errorCode == BackupRef::IAmNotMaster){
- nodeId = refToNode(ref->masterRef);
+ m_master_node = nodeId = refToNode(ref->masterRef);
#ifdef VM_TRACE
ndbout_c("I'm not master resending to %d", nodeId);
#endif
@@ -2373,6 +2571,7 @@
MgmtSrvr::abortBackup(Uint32 backupId)
{
SignalSender ss(theFacade);
+ ss.lock(); // lock will be released on exit
bool next;
NodeId nodeId = 0;
@@ -2400,6 +2599,8 @@
MgmtSrvr::Allocated_resources::Allocated_resources(MgmtSrvr &m)
: m_mgmsrv(m)
{
+ m_reserved_nodes.clear();
+ m_alloc_timeout= 0;
}
MgmtSrvr::Allocated_resources::~Allocated_resources()
@@ -2408,7 +2609,7 @@
if (!m_reserved_nodes.isclear()) {
m_mgmsrv.m_reserved_nodes.bitANDC(m_reserved_nodes);
// node has been reserved, force update signal to ndb nodes
- global_flag_send_heartbeat_now= 1;
+ m_mgmsrv.updateStatus();
char tmp_str[128];
m_mgmsrv.m_reserved_nodes.getText(tmp_str);
@@ -2418,9 +2619,22 @@
}
void
-MgmtSrvr::Allocated_resources::reserve_node(NodeId id)
+MgmtSrvr::Allocated_resources::reserve_node(NodeId id, NDB_TICKS timeout)
{
m_reserved_nodes.set(id);
+ m_alloc_timeout= NdbTick_CurrentMillisecond() + timeout;
+}
+
+bool
+MgmtSrvr::Allocated_resources::is_timed_out(NDB_TICKS tick)
+{
+ if (m_alloc_timeout && tick > m_alloc_timeout)
+ {
+ g_eventLogger.info("Mgmt server state: nodeid %d timed out.",
+ get_nodeid());
+ return true;
+ }
+ return false;
}
NodeId
--- 1.42/storage/ndb/src/mgmsrv/MgmtSrvr.hpp 2006-02-21 22:51:15 +11:00
+++ 1.43/storage/ndb/src/mgmsrv/MgmtSrvr.hpp 2006-11-23 23:04:17 +11:00
@@ -106,7 +106,8 @@
~Allocated_resources();
// methods to reserve/allocate resources which
// will be freed when running destructor
- void reserve_node(NodeId id);
+ void reserve_node(NodeId id, NDB_TICKS timeout);
+ bool is_timed_out(NDB_TICKS tick);
bool is_reserved(NodeId nodeId) { return m_reserved_nodes.get(nodeId); }
bool is_reserved(NodeBitmask mask) { return !mask.bitAND(m_reserved_nodes).isclear();
}
bool isclear() { return m_reserved_nodes.isclear(); }
@@ -114,6 +115,7 @@
private:
MgmtSrvr &m_mgmsrv;
NodeBitmask m_reserved_nodes;
+ NDB_TICKS m_alloc_timeout;
};
NdbMutex *m_node_id_mutex;
@@ -176,6 +178,7 @@
STATIC_CONST( NODE_SHUTDOWN_WOULD_CAUSE_SYSTEM_CRASH = 5028 );
STATIC_CONST( NO_CONTACT_WITH_DB_NODES = 5030 );
+ STATIC_CONST( UNSUPPORTED_NODE_SHUTDOWN = 5031 );
STATIC_CONST( NODE_NOT_API_NODE = 5062 );
STATIC_CONST( OPERATION_NOT_ALLOWED_START_STOP = 5063 );
@@ -252,12 +255,15 @@
* @param processId: Id of the DB process to stop
* @return 0 if succeeded, otherwise: as stated above, plus:
*/
- int stopNode(int nodeId, bool abort = false);
+ int stopNodes(const Vector<NodeId> &node_ids, int *stopCount, bool abort,
+ int *stopSelf);
+
+ int shutdownMGM(int *stopCount, bool abort, int *stopSelf);
/**
- * Stop the system
+ * shutdown the DB nodes
*/
- int stop(int * cnt = 0, bool abort = false);
+ int shutdownDB(int * cnt = 0, bool abort = false);
/**
* print version info about a node
@@ -286,18 +292,19 @@
int start(int processId);
/**
- * Restart a node
+ * Restart nodes
* @param processId: Id of the DB process to start
*/
- int restartNode(int processId, bool nostart, bool initialStart,
- bool abort = false);
+ int restartNodes(const Vector<NodeId> &node_ids,
+ int *stopCount, bool nostart,
+ bool initialStart, bool abort, int *stopSelf);
/**
- * Restart the system
+ * Restart all DB nodes
*/
- int restart(bool nostart, bool initialStart,
- bool abort = false,
- int * stopCount = 0);
+ int restartDB(bool nostart, bool initialStart,
+ bool abort = false,
+ int * stopCount = 0);
struct BackupEvent {
enum Event {
@@ -316,9 +323,9 @@
Uint32 ErrorCode;
} FailedToStart ;
struct {
+ Uint64 NoOfBytes;
+ Uint64 NoOfRecords;
Uint32 BackupId;
- Uint32 NoOfBytes;
- Uint32 NoOfRecords;
Uint32 NoOfLogBytes;
Uint32 NoOfLogRecords;
Uint32 startGCP;
@@ -425,8 +432,10 @@
*/
bool getNextNodeId(NodeId * _nodeId, enum ndb_mgm_node_type type) const ;
bool alloc_node_id(NodeId * _nodeId, enum ndb_mgm_node_type type,
- struct sockaddr *client_addr, SOCKET_SIZE_TYPE *client_addr_len,
- BaseString &error_string);
+ struct sockaddr *client_addr,
+ SOCKET_SIZE_TYPE *client_addr_len,
+ int &error_code, BaseString &error_string,
+ int log_event = 1);
/**
*
@@ -478,12 +487,12 @@
void get_connected_nodes(NodeBitmask &connected_nodes) const;
SocketServer *get_socket_server() { return m_socket_server; }
+ void updateStatus();
+
//**************************************************************************
private:
//**************************************************************************
- int send(SignalSender &ss, SimpleSignal &ssig, Uint32 node, Uint32 node_type);
-
int sendStopMgmd(NodeId nodeId,
bool abort,
bool stop,
@@ -491,14 +500,15 @@
bool nostart,
bool initialStart);
- int sendSTOP_REQ(NodeId nodeId,
+ int sendSTOP_REQ(const Vector<NodeId> &node_ids,
NodeBitmask &stoppedNodes,
Uint32 singleUserNodeId,
bool abort,
bool stop,
bool restart,
bool nostart,
- bool initialStart);
+ bool initialStart,
+ int *stopSelf);
/**
* Check if it is possible to send a signal to a (DB) process
@@ -516,7 +526,7 @@
*/
int getBlockNumber(const BaseString &blockName);
- int alloc_node_id_req(Uint32 free_node_id);
+ int alloc_node_id_req(NodeId free_node_id, enum ndb_mgm_node_type type);
//**************************************************************************
int _blockNumber;
@@ -651,6 +661,8 @@
friend class Ndb_mgmd_event_service;
Ndb_mgmd_event_service m_event_listner;
+ NodeId m_master_node;
+
/**
* Handles the thread wich upon a 'Node is started' event will
* set the node's previous loglevel settings.
--- 1.60/storage/ndb/src/mgmsrv/Services.cpp 2006-02-21 22:51:15 +11:00
+++ 1.61/storage/ndb/src/mgmsrv/Services.cpp 2006-11-23 23:04:17 +11:00
@@ -35,6 +35,7 @@
#include <base64.h>
extern bool g_StopServer;
+extern bool g_RestartServer;
extern EventLogger g_eventLogger;
static const unsigned int MAX_READ_TIMEOUT = 1000 ;
@@ -121,8 +122,6 @@
const
ParserRow<MgmApiSession> commands[] = {
- MGM_CMD("get statport", &MgmApiSession::getStatPort, ""),
-
MGM_CMD("get config", &MgmApiSession::getConfig, ""),
MGM_ARG("version", Int, Mandatory, "Configuration version number"),
MGM_ARG("node", Int, Optional, "Node ID"),
@@ -137,6 +136,8 @@
MGM_ARG("public key", String, Mandatory, "Public key"),
MGM_ARG("endian", String, Optional, "Endianness"),
MGM_ARG("name", String, Optional, "Name of connection"),
+ MGM_ARG("timeout", Int, Optional, "Timeout in seconds"),
+ MGM_ARG("log_event", Int, Optional, "Log failure in cluster log"),
MGM_CMD("get version", &MgmApiSession::getVersion, ""),
@@ -144,7 +145,13 @@
MGM_CMD("get info clusterlog", &MgmApiSession::getInfoClusterLog, ""),
- MGM_CMD("restart node", &MgmApiSession::restart, ""),
+ MGM_CMD("restart node", &MgmApiSession::restart_v1, ""),
+ MGM_ARG("node", String, Mandatory, "Nodes to restart"),
+ MGM_ARG("initialstart", Int, Optional, "Initial start"),
+ MGM_ARG("nostart", Int, Optional, "No start"),
+ MGM_ARG("abort", Int, Optional, "Abort"),
+
+ MGM_CMD("restart node v2", &MgmApiSession::restart_v2, ""),
MGM_ARG("node", String, Mandatory, "Nodes to restart"),
MGM_ARG("initialstart", Int, Optional, "Initial start"),
MGM_ARG("nostart", Int, Optional, "No start"),
@@ -185,13 +192,18 @@
MGM_CMD("abort backup", &MgmApiSession::abortBackup, ""),
MGM_ARG("id", Int, Mandatory, "Backup id"),
- MGM_CMD("stop", &MgmApiSession::stop, ""),
+ MGM_CMD("stop", &MgmApiSession::stop_v1, ""),
+ MGM_ARG("node", String, Mandatory, "Node"),
+ MGM_ARG("abort", Int, Mandatory, "Node"),
+
+ MGM_CMD("stop v2", &MgmApiSession::stop_v2, ""),
MGM_ARG("node", String, Mandatory, "Node"),
MGM_ARG("abort", Int, Mandatory, "Node"),
MGM_CMD("stop all", &MgmApiSession::stopAll, ""),
MGM_ARG("abort", Int, Mandatory, "Node"),
-
+ MGM_ARG("stop", String, Optional, "MGM/DB or both"),
+
MGM_CMD("enter single user", &MgmApiSession::enterSingleUser, ""),
MGM_ARG("nodeId", Int, Mandatory, "Node"),
@@ -265,6 +277,15 @@
MGM_END()
};
+struct PurgeStruct
+{
+ NodeBitmask free_nodes;/* free nodes as reported
+ * by ndbd in apiRegReqConf
+ */
+ BaseString *str;
+ NDB_TICKS tick;
+};
+
MgmApiSession::MgmApiSession(class MgmtSrvr & mgm, NDB_SOCKET_TYPE sock)
: SocketServer::Session(sock), m_mgmsrv(mgm)
{
@@ -273,6 +294,7 @@
m_output = new SocketOutputStream(sock);
m_parser = new Parser_t(commands, *m_input, true, true, true);
m_allocated_resources= new MgmtSrvr::Allocated_resources(m_mgmsrv);
+ m_stopSelf= 0;
DBUG_VOID_RETURN;
}
@@ -292,6 +314,10 @@
NDB_CLOSE_SOCKET(m_socket);
m_socket= NDB_INVALID_SOCKET;
}
+ if(m_stopSelf < 0)
+ g_RestartServer= true;
+ if(m_stopSelf)
+ g_StopServer= true;
DBUG_VOID_RETURN;
}
@@ -414,12 +440,15 @@
{
const char *cmd= "get nodeid reply";
Uint32 version, nodeid= 0, nodetype= 0xff;
+ Uint32 timeout= 20; // default seconds timeout
const char * transporter;
const char * user;
const char * password;
const char * public_key;
const char * endian= NULL;
const char * name= NULL;
+ Uint32 log_event= 1;
+ bool log_event_version;
union { long l; char c[sizeof(long)]; } endian_check;
args.get("version", &version);
@@ -431,6 +460,9 @@
args.get("public key", &public_key);
args.get("endian", &endian);
args.get("name", &name);
+ args.get("timeout", &timeout);
+ /* for backwards compatability keep track if client uses new protocol */
+ log_event_version= args.get("log_event", &log_event);
endian_check.l = 1;
if(endian
@@ -457,9 +489,9 @@
return;
}
- struct sockaddr addr;
+ struct sockaddr_in addr;
SOCKET_SIZE_TYPE addrlen= sizeof(addr);
- int r = getpeername(m_socket, &addr, &addrlen);
+ int r = getpeername(m_socket, (struct sockaddr*)&addr, &addrlen);
if (r != 0 ) {
m_output->println(cmd);
m_output->println("result: getpeername(%d) failed, err= %d", m_socket, r);
@@ -470,14 +502,40 @@
NodeId tmp= nodeid;
if(tmp == 0 || !m_allocated_resources->is_reserved(tmp)){
BaseString error_string;
- if (!m_mgmsrv.alloc_node_id(&tmp, (enum ndb_mgm_node_type)nodetype,
- &addr, &addrlen, error_string)){
+ int error_code;
+ NDB_TICKS tick= 0;
+ /* only report error on second attempt as not to clog the cluster log */
+ while (!m_mgmsrv.alloc_node_id(&tmp, (enum ndb_mgm_node_type)nodetype,
+ (struct sockaddr*)&addr, &addrlen,
+ error_code, error_string,
+ tick == 0 ? 0 : log_event))
+ {
+ /* NDB_MGM_ALLOCID_CONFIG_MISMATCH is a non retriable error */
+ if (tick == 0 && error_code != NDB_MGM_ALLOCID_CONFIG_MISMATCH)
+ {
+ // attempt to free any timed out reservations
+ tick= NdbTick_CurrentMillisecond();
+ struct PurgeStruct ps;
+ m_mgmsrv.get_connected_nodes(ps.free_nodes);
+ // invert connected_nodes to get free nodes
+ ps.free_nodes.bitXORC(NodeBitmask());
+ ps.str= 0;
+ ps.tick= tick;
+ m_mgmsrv.get_socket_server()->
+ foreachSession(stop_session_if_timed_out,&ps);
+ m_mgmsrv.get_socket_server()->checkSessions();
+ error_string = "";
+ continue;
+ }
const char *alias;
const char *str;
alias= ndb_mgm_get_node_type_alias_string((enum ndb_mgm_node_type)
nodetype, &str);
m_output->println(cmd);
m_output->println("result: %s", error_string.c_str());
+ /* only use error_code protocol if client knows about it */
+ if (log_event_version)
+ m_output->println("error_code: %d", error_code);
m_output->println("");
return;
}
@@ -497,7 +555,7 @@
m_output->println("nodeid: %u", tmp);
m_output->println("result: Ok");
m_output->println("");
- m_allocated_resources->reserve_node(tmp);
+ m_allocated_resources->reserve_node(tmp, timeout*1000);
if (name)
g_eventLogger.info("Node %d: %s", tmp, name);
@@ -613,15 +671,6 @@
}
void
-MgmApiSession::getStatPort(Parser_t::Context &,
- const class Properties &) {
-
- m_output->println("get statport reply");
- m_output->println("tcpport: %d", 0);
- m_output->println("");
-}
-
-void
MgmApiSession::insertError(Parser<MgmApiSession>::Context &,
Properties const &args) {
Uint32 node = 0, error = 0;
@@ -787,9 +836,8 @@
m_mgmsrv.m_event_listner.unlock();
{
- LogLevel ll;
- ll.setLogLevel(category,level);
- m_mgmsrv.m_event_listner.update_max_log_level(ll);
+ LogLevel tmp;
+ m_mgmsrv.m_event_listner.update_max_log_level(tmp);
}
m_output->println(reply);
@@ -850,8 +898,19 @@
}
void
-MgmApiSession::restart(Parser<MgmApiSession>::Context &,
+MgmApiSession::restart_v1(Parser<MgmApiSession>::Context &,
Properties const &args) {
+ restart(args,1);
+}
+
+void
+MgmApiSession::restart_v2(Parser<MgmApiSession>::Context &,
+ Properties const &args) {
+ restart(args,2);
+}
+
+void
+MgmApiSession::restart(Properties const &args, int version) {
Uint32
nostart = 0,
initialstart = 0,
@@ -872,14 +931,12 @@
}
int restarted = 0;
- int result = 0;
-
- for(size_t i = 0; i < nodes.size(); i++)
- if((result = m_mgmsrv.restartNode(nodes[i],
- nostart != 0,
- initialstart != 0,
- abort != 0)) == 0)
- restarted++;
+ int result= m_mgmsrv.restartNodes(nodes,
+ &restarted,
+ nostart != 0,
+ initialstart != 0,
+ abort != 0,
+ &m_stopSelf);
m_output->println("restart reply");
if(result != 0){
@@ -887,6 +944,8 @@
} else
m_output->println("result: Ok");
m_output->println("restarted: %d", restarted);
+ if(version>1)
+ m_output->println("disconnect: %d", (m_stopSelf)?1:0);
m_output->println("");
}
@@ -903,7 +962,7 @@
args.get("nostart", &nostart);
int count = 0;
- int result = m_mgmsrv.restart(nostart, initialstart, abort, &count);
+ int result = m_mgmsrv.restartDB(nostart, initialstart, abort, &count);
m_output->println("restart reply");
if(result != 0)
@@ -919,6 +978,7 @@
MgmtSrvr &mgmsrv,
enum ndb_mgm_node_type type) {
NodeId nodeId = 0;
+ mgmsrv.updateStatus();
while(mgmsrv.getNextNodeId(&nodeId, type)) {
enum ndb_mgm_node_status status;
Uint32 startPhase = 0,
@@ -996,15 +1056,31 @@
}
void
-MgmApiSession::stop(Parser<MgmApiSession>::Context &,
- Properties const &args) {
+MgmApiSession::stop_v1(Parser<MgmApiSession>::Context &,
+ Properties const &args) {
+ stop(args,1);
+}
+
+void
+MgmApiSession::stop_v2(Parser<MgmApiSession>::Context &,
+ Properties const &args) {
+ stop(args,2);
+}
+
+void
+MgmApiSession::stop(Properties const &args, int version) {
Uint32 abort;
char *nodes_str;
Vector<NodeId> nodes;
args.get("node", (const char **)&nodes_str);
if(nodes_str == NULL)
+ {
+ m_output->println("stop reply");
+ m_output->println("result: empty node list");
+ m_output->println("");
return;
+ }
args.get("abort", &abort);
char *p, *last;
@@ -1014,29 +1090,10 @@
nodes.push_back(atoi(p));
}
- int stop_self= 0;
- size_t i;
-
- for(i=0; i < nodes.size(); i++) {
- if (nodes[i] == m_mgmsrv.getOwnNodeId()) {
- stop_self= 1;
- if (i != nodes.size()-1) {
- m_output->println("stop reply");
- m_output->println("result: server must be stopped last");
- m_output->println("");
- return;
- }
- }
- }
-
- int stopped = 0, result = 0;
-
- for(i=0; i < nodes.size(); i++)
- if (nodes[i] != m_mgmsrv.getOwnNodeId()) {
- if((result = m_mgmsrv.stopNode(nodes[i], abort != 0)) == 0)
- stopped++;
- } else
- stopped++;
+ int stopped= 0;
+ int result= 0;
+ if (nodes.size())
+ result= m_mgmsrv.stopNodes(nodes, &stopped, abort != 0, &m_stopSelf);
m_output->println("stop reply");
if(result != 0)
@@ -1044,28 +1101,41 @@
else
m_output->println("result: Ok");
m_output->println("stopped: %d", stopped);
+ if(version>1)
+ m_output->println("disconnect: %d", (m_stopSelf)?1:0);
m_output->println("");
-
- if (stop_self)
- g_StopServer= true;
}
-
void
MgmApiSession::stopAll(Parser<MgmApiSession>::Context &,
- Properties const &args) {
- int stopped = 0;
+ Properties const &args) {
+ int stopped[2] = {0,0};
Uint32 abort;
args.get("abort", &abort);
- int result = m_mgmsrv.stop(&stopped, abort != 0);
+ BaseString stop;
+ const char* tostop= "db";
+ int ver=1;
+ if (args.get("stop", stop))
+ {
+ tostop= stop.c_str();
+ ver= 2;
+ }
+
+ int result= 0;
+ if(strstr(tostop,"db"))
+ result= m_mgmsrv.shutdownDB(&stopped[0], abort != 0);
+ if(!result && strstr(tostop,"mgm"))
+ result= m_mgmsrv.shutdownMGM(&stopped[1], abort!=0, &m_stopSelf);
m_output->println("stop reply");
if(result != 0)
m_output->println("result: %s", get_error_text(result));
else
m_output->println("result: Ok");
- m_output->println("stopped: %d", stopped);
+ m_output->println("stopped: %d", stopped[0]+stopped[1]);
+ if(ver >1)
+ m_output->println("disconnect: %d", (m_stopSelf)?1:0);
m_output->println("");
}
@@ -1296,21 +1366,23 @@
void
Ndb_mgmd_event_service::update_max_log_level(const LogLevel &log_level)
{
- LogLevel tmp= m_logLevel;
- tmp.set_max(log_level);
+ LogLevel tmp = log_level;
+ m_clients.lock();
+ for(int i = m_clients.size() - 1; i >= 0; i--)
+ tmp.set_max(m_clients[i].m_logLevel);
+ m_clients.unlock();
update_log_level(tmp);
}
void
Ndb_mgmd_event_service::update_log_level(const LogLevel &tmp)
{
- if(!(tmp == m_logLevel)){
- m_logLevel = tmp;
- EventSubscribeReq req;
- req = tmp;
- req.blockRef = 0;
- m_mgmsrv->m_log_level_requests.push_back(req);
- }
+ m_logLevel = tmp;
+ EventSubscribeReq req;
+ req = tmp;
+ // send update to all nodes
+ req.blockRef = 0;
+ m_mgmsrv->m_log_level_requests.push_back(req);
}
void
@@ -1506,14 +1578,6 @@
m_output->println("");
}
-struct PurgeStruct
-{
- NodeBitmask free_nodes;/* free nodes as reported
- * by ndbd in apiRegReqConf
- */
- BaseString *str;
-};
-
void
MgmApiSession::stop_session_if_not_connected(SocketServer::Session *_s, void *data)
{
@@ -1521,7 +1585,20 @@
struct PurgeStruct &ps= *(struct PurgeStruct *)data;
if (s->m_allocated_resources->is_reserved(ps.free_nodes))
{
- ps.str->appfmt(" %d", s->m_allocated_resources->get_nodeid());
+ if (ps.str)
+ ps.str->appfmt(" %d", s->m_allocated_resources->get_nodeid());
+ s->stopSession();
+ }
+}
+
+void
+MgmApiSession::stop_session_if_timed_out(SocketServer::Session *_s, void *data)
+{
+ MgmApiSession *s= (MgmApiSession *)_s;
+ struct PurgeStruct &ps= *(struct PurgeStruct *)data;
+ if (s->m_allocated_resources->is_reserved(ps.free_nodes) &&
+ s->m_allocated_resources->is_timed_out(ps.tick))
+ {
s->stopSession();
}
}
@@ -1538,6 +1615,7 @@
ps.free_nodes.bitXORC(NodeBitmask()); // invert connected_nodes to get free nodes
m_mgmsrv.get_socket_server()->foreachSession(stop_session_if_not_connected,&ps);
+ m_mgmsrv.get_socket_server()->checkSessions();
m_output->println("purge stale sessions reply");
if (str.length() > 0)
--- 1.19/storage/ndb/src/mgmsrv/Services.hpp 2006-02-21 22:51:15 +11:00
+++ 1.20/storage/ndb/src/mgmsrv/Services.hpp 2006-11-23 23:04:17 +11:00
@@ -30,6 +30,7 @@
class MgmApiSession : public SocketServer::Session
{
+ static void stop_session_if_timed_out(SocketServer::Session *_s, void *data);
static void stop_session_if_not_connected(SocketServer::Session *_s, void *data);
private:
typedef Parser<MgmApiSession> Parser_t;
@@ -40,6 +41,7 @@
Parser_t *m_parser;
MgmtSrvr::Allocated_resources *m_allocated_resources;
char m_err_str[1024];
+ int m_stopSelf; // -1 is restart, 0 do nothing, 1 stop
void getConfig_common(Parser_t::Context &ctx,
const class Properties &args,
@@ -52,7 +54,6 @@
virtual ~MgmApiSession();
void runSession();
- void getStatPort(Parser_t::Context &ctx, const class Properties &args);
void getConfig(Parser_t::Context &ctx, const class Properties &args);
#ifdef MGM_GET_CONFIG_BACKWARDS_COMPAT
void getConfig_old(Parser_t::Context &ctx);
@@ -62,7 +63,9 @@
void getVersion(Parser_t::Context &ctx, const class Properties &args);
void getStatus(Parser_t::Context &ctx, const class Properties &args);
void getInfoClusterLog(Parser_t::Context &ctx, const class Properties &args);
- void restart(Parser_t::Context &ctx, const class Properties &args);
+ void restart(const class Properties &args, int version);
+ void restart_v1(Parser_t::Context &ctx, const class Properties &args);
+ void restart_v2(Parser_t::Context &ctx, const class Properties &args);
void restartAll(Parser_t::Context &ctx, const class Properties &args);
void insertError(Parser_t::Context &ctx, const class Properties &args);
void setTrace(Parser_t::Context &ctx, const class Properties &args);
@@ -74,7 +77,9 @@
void abortBackup(Parser_t::Context &ctx, const class Properties &args);
void enterSingleUser(Parser_t::Context &ctx, const class Properties &args);
void exitSingleUser(Parser_t::Context &ctx, const class Properties &args);
- void stop(Parser_t::Context &ctx, const class Properties &args);
+ void stop_v1(Parser_t::Context &ctx, const class Properties &args);
+ void stop_v2(Parser_t::Context &ctx, const class Properties &args);
+ void stop(const class Properties &args, int version);
void stopAll(Parser_t::Context &ctx, const class Properties &args);
void start(Parser_t::Context &ctx, const class Properties &args);
void startAll(Parser_t::Context &ctx, const class Properties &args);
--- 1.33/storage/ndb/src/ndbapi/TransporterFacade.hpp 2006-08-09 18:41:20 +10:00
+++ 1.34/storage/ndb/src/ndbapi/TransporterFacade.hpp 2006-11-23 23:04:17 +11:00
@@ -344,7 +344,7 @@
inline
Uint32
TransporterFacade::getNodeGrp(NodeId n) const {
- return theClusterMgr->getNodeInfo(n).m_state.nodeGroup;
+ return theClusterMgr->getNodeInfo(n).m_state.getNodeGroup();
}
@@ -377,8 +377,8 @@
const Uint32 startLevel = node.m_state.startLevel;
if (node.m_info.m_type == NodeInfo::DB) {
- if(node.m_state.singleUserMode &&
- ownId() == node.m_state.singleUserApi) {
+ if(node.m_state.getSingleUserMode() &&
+ ownId() == node.m_state.getSingleUserApi()) {
return (node.compatible &&
(node.m_state.startLevel == NodeState::SL_STOPPING_1 ||
node.m_state.startLevel == NodeState::SL_STARTED ||
--- 1.25/storage/ndb/tools/waiter.cpp 2006-03-15 05:07:32 +11:00
+++ 1.26/storage/ndb/tools/waiter.cpp 2006-11-23 23:49:54 +11:00
@@ -96,7 +96,7 @@
wait_status= NDB_MGM_NODE_STATUS_STARTED;
}
- if (waitClusterStatus(_hostName, wait_status, _timeout, _node) != 0)
+ if (waitClusterStatus(_hostName, wait_status, _timeout*10, _node) != 0)
return NDBT_ProgramExit(NDBT_FAILED);
return NDBT_ProgramExit(NDBT_OK);
}
@@ -128,6 +128,12 @@
ndbout << "status==NULL, retries="<<retries<<endl;
MGMERR(handle);
retries++;
+ ndb_mgm_disconnect(handle);
+ if (ndb_mgm_connect(handle,0,0,1)) {
+ MGMERR(handle);
+ g_err << "Reconnect failed" << endl;
+ break;
+ }
continue;
}
int count = status->no_of_nodes;
@@ -325,7 +331,7 @@
g_info << "Waiting for cluster to enter state "
<< ndb_mgm_get_node_status_string(_status)<< endl;
}
- NdbSleep_SecSleep(1);
+ NdbSleep_MilliSleep(100);
attempts++;
}
return 0;
| Thread |
|---|
| • bk commit into 5.2 tree (stewart:1.2334) | Stewart Smith | 23 Nov |