Below is the list of changes that have just been committed into a local
5.0 repository of stewart. When stewart does a push these changes will
be propagated to the main repository and, within 24 hours after the
push, to the public repository.
For information on how to access the public repository
see http://dev.mysql.com/doc/mysql/en/installing-source-tree.html
ChangeSet@stripped, 2006-10-04 02:38:31+10:00, stewart@willster.(none) +12 -0
BUG#21154 The management server consumes too much CPU
BUG#13987 Cluster: Loss of data nodes can cause high CPU usage from ndb_mgmd
fix the actual problem (getting incomplete line of data), introduced with previous
improvement.
also add list sessions, get session and get session id to mgmapi to allow
the implementation of a test case for this exact problem.
ndb/include/mgmapi/mgmapi.h@stripped, 2006-10-04 02:38:27+10:00, stewart@willster.(none) +13
-0
Add internal ndb_mgm_get_fd for use in test case
ndb/include/mgmapi/mgmapi_debug.h@stripped, 2006-10-04 02:38:27+10:00,
stewart@willster.(none) +14 -0
Add internal get_session_id and get_session
ndb/include/util/InputStream.hpp@stripped, 2006-10-04 02:38:28+10:00, stewart@willster.(none)
+10 -0
- fix warning when building with gcc 4
- add mutex to be UNLOCKED when blocking (e.g. select(2))
- this means we can list sessions in a threadsafe way
- add this weird startover member to SocketInputStream
- this helps work out if we've read a newline yet and should start inserting
into buffer from the start
ndb/include/util/Parser.hpp@stripped, 2006-10-04 02:38:28+10:00, stewart@willster.(none) +4
-1
add mutex to context to pass down to SocketInputStream
ndb/include/util/socket_io.h@stripped, 2006-10-04 02:38:28+10:00, stewart@willster.(none) +6
-1
readln_socket accepts mutex to UNLOCK around select(2)
ndb/src/common/util/InputStream.cpp@stripped, 2006-10-04 02:38:28+10:00,
stewart@willster.(none) +24 -15
remove evil, add more.
As
ndb/src/common/util/Parser.cpp@stripped, 2006-10-04 02:38:28+10:00, stewart@willster.(none)
+23 -15
set mutex for passing down to InputStream to unlock on select(2).
change way detecting of NoLine
ndb/src/common/util/socket_io.cpp@stripped, 2006-10-04 02:38:28+10:00,
stewart@willster.(none) +20 -15
unlock a mutex around select so that we can nicely do thread safe
listing of sessions.
Always retrieve data from the OS so that we instantly get EOF on disconnect
and don't end up spinning looking for a newline.
ndb/src/mgmapi/mgmapi.cpp@stripped, 2006-10-04 02:38:28+10:00, stewart@willster.(none) +115
-11
add internal ndb_mgm_get_fd() for internal testing
internal/debug:
ndb_mgm_get_session_id
ndb_mgm_get_session
ndb/src/mgmsrv/Services.cpp@stripped, 2006-10-04 02:38:28+10:00, stewart@willster.(none)
+153 -9
Add list sessions, get session id and get session.
introduce a session mutex
ndb/src/mgmsrv/Services.hpp@stripped, 2006-10-04 02:38:28+10:00, stewart@willster.(none) +16
-2
Add list and get session.
Add session_id to MgmApiSession.
ndb/test/ndbapi/testMgm.cpp@stripped, 2006-10-04 02:38:28+10:00, stewart@willster.(none) +45
-0
Add test for MgmApiSession disconnection (mgmd at 100%)
# This is a BitKeeper patch. What follows are the unified diffs for the
# set of deltas contained in the patch. The rest of the patch, the part
# that BitKeeper cares about, is below these diffs.
# User: stewart
# Host: willster.(none)
# Root: /home/stewart/Documents/MySQL/5.0/bug21154
--- 1.49/ndb/include/mgmapi/mgmapi.h 2006-10-04 02:38:36 +10:00
+++ 1.50/ndb/include/mgmapi/mgmapi.h 2006-10-04 02:38:36 +10:00
@@ -1072,6 +1072,19 @@
int ndb_mgm_end_session(NdbMgmHandle handle);
/**
+ * ndb_mgm_get_fd
+ *
+ * get the file descriptor of the handle.
+ * INTERNAL ONLY.
+ * USE FOR TESTING. OTHER USES ARE NOT A GOOD IDEA.
+ *
+ * @param handle NDB management handle
+ * @return handle->socket
+ *
+ */
+ int ndb_mgm_get_fd(NdbMgmHandle handle);
+
+ /**
* Get the node id of the mgm server we're connected to
*/
Uint32 ndb_mgm_get_mgmd_nodeid(NdbMgmHandle handle);
--- 1.7/ndb/include/mgmapi/mgmapi_debug.h 2006-10-04 02:38:36 +10:00
+++ 1.8/ndb/include/mgmapi/mgmapi_debug.h 2006-10-04 02:38:36 +10:00
@@ -132,6 +132,20 @@
const char * value,
struct ndb_mgm_reply* reply);
+ Uint64 ndb_mgm_get_session_id(NdbMgmHandle handle);
+
+ struct NdbMgmSession {
+ Uint64 id;
+ Uint32 m_stopSelf;
+ Uint32 m_stop;
+ Uint32 nodeid;
+ Uint32 parser_buffer_len;
+ Uint32 parser_status;
+ };
+
+ int ndb_mgm_get_session(NdbMgmHandle handle, Uint64 id,
+ struct NdbMgmSession *s, int *len);
+
#ifdef __cplusplus
}
#endif
--- 1.3/ndb/include/util/InputStream.hpp 2006-10-04 02:38:36 +10:00
+++ 1.4/ndb/include/util/InputStream.hpp 2006-10-04 02:38:36 +10:00
@@ -19,13 +19,22 @@
#include <ndb_global.h>
#include <NdbTCP.h>
+#include <NdbMutex.h>
/**
* Input stream
*/
class InputStream {
public:
+ InputStream() { m_mutex= NULL; };
+ virtual ~InputStream() {};
virtual char* gets(char * buf, int bufLen) = 0;
+ /**
+ * Set the mutex to be UNLOCKED when blocking (e.g. select(2))
+ */
+ void set_mutex(NdbMutex *m) { m_mutex= m; };
+protected:
+ NdbMutex *m_mutex;
};
class FileInputStream : public InputStream {
@@ -40,6 +49,7 @@
class SocketInputStream : public InputStream {
NDB_SOCKET_TYPE m_socket;
unsigned m_timeout;
+ bool m_startover;
public:
SocketInputStream(NDB_SOCKET_TYPE socket, unsigned readTimeout = 1000);
char* gets(char * buf, int bufLen);
--- 1.4/ndb/include/util/Parser.hpp 2006-10-04 02:38:36 +10:00
+++ 1.5/ndb/include/util/Parser.hpp 2006-10-04 02:38:36 +10:00
@@ -61,12 +61,15 @@
/**
* Context for parse
*/
- struct Context {
+ class Context {
+ public:
+ Context() { m_mutex= NULL; };
ParserStatus m_status;
const ParserRow<T> * m_currentCmd;
const ParserRow<T> * m_currentArg;
char * m_currentToken;
char m_tokenBuffer[512];
+ NdbMutex *m_mutex;
Vector<const ParserRow<T> *> m_aliasUsed;
};
--- 1.3/ndb/include/util/socket_io.h 2006-10-04 02:38:36 +10:00
+++ 1.4/ndb/include/util/socket_io.h 2006-10-04 02:38:36 +10:00
@@ -21,12 +21,17 @@
#include <NdbTCP.h>
+#include <NdbMutex.h>
+
#ifdef __cplusplus
extern "C" {
#endif
int read_socket(NDB_SOCKET_TYPE, int timeout_ms, char *, int len);
- int readln_socket(NDB_SOCKET_TYPE, int timeout_ms, char *, int len);
+
+ int readln_socket(NDB_SOCKET_TYPE socket, int timeout_millis,
+ char * buf, int buflen, NdbMutex *mutex);
+
int write_socket(NDB_SOCKET_TYPE, int timeout_ms, const char[], int len);
int print_socket(NDB_SOCKET_TYPE, int timeout_ms, const char *, ...);
--- 1.3/ndb/src/common/util/InputStream.cpp 2006-10-04 02:38:36 +10:00
+++ 1.4/ndb/src/common/util/InputStream.cpp 2006-10-04 02:38:36 +10:00
@@ -36,26 +36,35 @@
SocketInputStream::SocketInputStream(NDB_SOCKET_TYPE socket,
unsigned readTimeout)
- : m_socket(socket) {
- m_timeout = readTimeout;
+ : m_socket(socket) {
+ m_startover= true;
+ m_timeout = readTimeout;
}
-char*
+char*
SocketInputStream::gets(char * buf, int bufLen) {
- buf[0] = 77;
assert(bufLen >= 2);
- int res = readln_socket(m_socket, m_timeout, buf, bufLen - 1);
+ int offset= 0;
+ if(m_startover)
+ {
+ buf[0]= '\0';
+ m_startover= false;
+ }
+ else
+ offset= strlen(buf);
+
+ int res = readln_socket(m_socket, m_timeout, buf+offset, bufLen-offset, m_mutex);
+
+ if(res == 0)
+ {
+ buf[0]=0;
+ return buf;
+ }
+
+ m_startover= true;
+
if(res == -1)
return 0;
- if(res == 0 && buf[0] == 77){ // select return 0
- buf[0] = 0;
- } else if(res == 0 && buf[0] == 0){ // only newline
- buf[0] = '\n';
- buf[1] = 0;
- } else {
- int len = strlen(buf);
- buf[len + 1] = '\0';
- buf[len] = '\n';
- }
+
return buf;
}
--- 1.8/ndb/src/common/util/Parser.cpp 2006-10-04 02:38:36 +10:00
+++ 1.9/ndb/src/common/util/Parser.cpp 2006-10-04 02:38:36 +10:00
@@ -32,6 +32,7 @@
char* gets(char * buf, int bufLen);
void push_back(const char *);
+ void set_mutex(NdbMutex *m) { in.set_mutex(m); };
private:
InputStream & in;
char * buffer;
@@ -144,25 +145,32 @@
{
DBUG_ENTER("ParserImpl::run");
+ input.set_mutex(ctx->m_mutex);
+
* pDst = 0;
bool ownStop = false;
if(stop == 0)
stop = &ownStop;
-
+
ctx->m_aliasUsed.clear();
-
+
const unsigned sz = sizeof(ctx->m_tokenBuffer);
ctx->m_currentToken = input.gets(ctx->m_tokenBuffer, sz);
if(Eof(ctx->m_currentToken)){
ctx->m_status = Parser<Dummy>::Eof;
DBUG_RETURN(false);
}
-
- if(ctx->m_currentToken[0] == 0){
+
+ int last= strlen(ctx->m_currentToken);
+ if(last>0)
+ last--;
+
+ if(ctx->m_currentToken[last] !='\n'){
ctx->m_status = Parser<Dummy>::NoLine;
+ ctx->m_tokenBuffer[0]= '\0';
DBUG_RETURN(false);
}
-
+
if(Empty(ctx->m_currentToken)){
ctx->m_status = Parser<Dummy>::EmptyLine;
DBUG_RETURN(false);
@@ -174,14 +182,14 @@
ctx->m_status = Parser<Dummy>::UnknownCommand;
DBUG_RETURN(false);
}
-
+
Properties * p = new Properties();
-
+
bool invalidArgument = false;
ctx->m_currentToken = input.gets(ctx->m_tokenBuffer, sz);
-
- while((! * stop) &&
- !Eof(ctx->m_currentToken) &&
+
+ while((! * stop) &&
+ !Eof(ctx->m_currentToken) &&
!Empty(ctx->m_currentToken)){
if(ctx->m_currentToken[0] != 0){
trim(ctx->m_currentToken);
@@ -193,7 +201,7 @@
}
ctx->m_currentToken = input.gets(ctx->m_tokenBuffer, sz);
}
-
+
if(invalidArgument){
char buf[sz];
char * tmp;
@@ -204,13 +212,13 @@
}
DBUG_RETURN(false);
}
-
+
if(* stop){
delete p;
ctx->m_status = Parser<Dummy>::ExternalStop;
DBUG_RETURN(false);
}
-
+
if(!checkMandatory(ctx, p)){
ctx->m_status = Parser<Dummy>::MissingMandatoryArgument;
delete p;
@@ -226,9 +234,9 @@
tmp.put("name", alias->name);
tmp.put("realName", alias->realName);
p->put("$ALIAS", i, &tmp);
- }
+ }
p->put("$ALIAS", ctx->m_aliasUsed.size());
-
+
ctx->m_status = Parser<Dummy>::Ok;
* pDst = p;
DBUG_RETURN(true);
--- 1.8/ndb/src/common/util/socket_io.cpp 2006-10-04 02:38:36 +10:00
+++ 1.9/ndb/src/common/util/socket_io.cpp 2006-10-04 02:38:36 +10:00
@@ -49,7 +49,7 @@
extern "C"
int
readln_socket(NDB_SOCKET_TYPE socket, int timeout_millis,
- char * buf, int buflen){
+ char * buf, int buflen, NdbMutex *mutex){
if(buflen <= 1)
return 0;
@@ -65,7 +65,12 @@
timeout.tv_sec = (timeout_millis / 1000);
timeout.tv_usec = (timeout_millis % 1000) * 1000;
+ if(mutex)
+ NdbMutex_Unlock(mutex);
const int selectRes = select(socket + 1, &readset, 0, 0, &timeout);
+ if(mutex)
+ NdbMutex_Lock(mutex);
+
if(selectRes == 0){
return 0;
}
@@ -75,7 +80,6 @@
return -1;
}
- buf[0] = 0;
const int t = recv(socket, buf, buflen, MSG_PEEK);
if(t < 1)
@@ -87,27 +91,28 @@
for(int i=0; i< t;i++)
{
if(buf[i] == '\n'){
- recv(socket, buf, i+1, 0);
- buf[i] = 0;
+ int r= recv(socket, buf, i+1, 0);
+ buf[i+1]= 0;
+ if(r < 1) {
+ fcntl(socket, F_SETFL, sock_flags);
+ return -1;
+ }
if(i > 0 && buf[i-1] == '\r'){
- i--;
- buf[i] = 0;
+ buf[i-1] = '\n';
+ buf[i]= '\0';
}
fcntl(socket, F_SETFL, sock_flags);
- return t;
+ return r;
}
}
- if(t == (buflen - 1)){
- recv(socket, buf, t, 0);
- buf[t] = 0;
- fcntl(socket, F_SETFL, sock_flags);
- return buflen;
- }
-
- return 0;
+ int r= recv(socket, buf, t, 0);
+ if(r>=0)
+ buf[r] = 0;
+ fcntl(socket, F_SETFL, sock_flags);
+ return r;
}
extern "C"
--- 1.61/ndb/src/mgmapi/mgmapi.cpp 2006-10-04 02:38:36 +10:00
+++ 1.62/ndb/src/mgmapi/mgmapi.cpp 2006-10-04 02:38:36 +10:00
@@ -503,6 +503,18 @@
}
/**
+ * Only used for low level testing
+ * Never to be used by end user.
+ * Or anybody who doesn't know exactly what they're doing.
+ */
+extern "C"
+int
+ndb_mgm_get_fd(NdbMgmHandle handle)
+{
+ return handle->socket;
+}
+
+/**
* Disconnect from a mgm server
*/
extern "C"
@@ -692,22 +704,16 @@
SET_ERROR(handle, NDB_MGM_ILLEGAL_SERVER_REPLY, "Probably disconnected");
return NULL;
}
- if(buf[strlen(buf)-1] == '\n')
- buf[strlen(buf)-1] = '\0';
-
- if(strcmp("node status", buf) != 0) {
+ if(strcmp("node status\n", buf) != 0) {
SET_ERROR(handle, NDB_MGM_ILLEGAL_NODE_STATUS, buf);
return NULL;
}
-
if(!in.gets(buf, sizeof(buf)))
{
SET_ERROR(handle, NDB_MGM_ILLEGAL_SERVER_REPLY, "Probably disconnected");
return NULL;
}
- if(buf[strlen(buf)-1] == '\n')
- buf[strlen(buf)-1] = '\0';
-
+
BaseString tmp(buf);
Vector<BaseString> split;
tmp.split(split, ":");
@@ -715,7 +721,7 @@
SET_ERROR(handle, NDB_MGM_ILLEGAL_NODE_STATUS, buf);
return NULL;
}
-
+
if(!(split[0].trim() == "nodes")){
SET_ERROR(handle, NDB_MGM_ILLEGAL_NODE_STATUS, buf);
return NULL;
@@ -2280,7 +2286,6 @@
SocketOutputStream out(handle->socket);
SocketInputStream in(handle->socket, handle->read_timeout);
char buf[32];
-
if (out.println("check connection"))
goto ndb_mgm_check_connection_error;
@@ -2490,7 +2495,6 @@
SocketInputStream in(handle->socket, handle->read_timeout);
char buf[32];
-
in.gets(buf, sizeof(buf));
DBUG_RETURN(0);
@@ -2546,6 +2550,106 @@
delete prop;
DBUG_RETURN(1);
+}
+
+extern "C"
+Uint64
+ndb_mgm_get_session_id(NdbMgmHandle handle)
+{
+ Uint64 session_id=0;
+
+ DBUG_ENTER("ndb_mgm_get_session_id");
+ CHECK_HANDLE(handle, 0);
+ CHECK_CONNECTED(handle, 0);
+
+ Properties args;
+
+ const ParserRow<ParserDummy> reply[]= {
+ MGM_CMD("get session id reply", NULL, ""),
+ MGM_ARG("id", Int, Mandatory, "Node ID"),
+ MGM_END()
+ };
+
+ const Properties *prop;
+ prop = ndb_mgm_call(handle, reply, "get session id", &args);
+ CHECK_REPLY(prop, 0);
+
+ if(!prop->get("id",&session_id)){
+ fprintf(handle->errstream, "Unable to get session id\n");
+ return 0;
+ }
+
+ delete prop;
+ DBUG_RETURN(session_id);
+}
+
+extern "C"
+int
+ndb_mgm_get_session(NdbMgmHandle handle, Uint64 id,
+ struct NdbMgmSession *s, int *len)
+{
+ int retval= 0;
+ DBUG_ENTER("ndb_mgm_get_session");
+ CHECK_HANDLE(handle, 0);
+ CHECK_CONNECTED(handle, 0);
+
+ Properties args;
+ args.put("id", id);
+
+ const ParserRow<ParserDummy> reply[]= {
+ MGM_CMD("get session reply", NULL, ""),
+ MGM_ARG("id", Int, Mandatory, "Node ID"),
+ MGM_ARG("m_stopSelf", Int, Optional, "m_stopSelf"),
+ MGM_ARG("m_stop", Int, Optional, "stop session"),
+ MGM_ARG("nodeid", Int, Optional, "allocated node id"),
+ MGM_ARG("parser_buffer_len", Int, Optional, "waiting in buffer"),
+ MGM_ARG("parser_status", Int, Optional, "parser status"),
+ MGM_END()
+ };
+
+ const Properties *prop;
+ prop = ndb_mgm_call(handle, reply, "get session", &args);
+ CHECK_REPLY(prop, 0);
+
+ Uint64 r_id;
+ int rlen= 0;
+
+ if(!prop->get("id",&r_id)){
+ fprintf(handle->errstream, "Unable to get session id\n");
+ goto err;
+ }
+
+ s->id= r_id;
+ rlen+=sizeof(s->id);
+
+ if(prop->get("m_stopSelf",&(s->m_stopSelf)))
+ rlen+=sizeof(s->m_stopSelf);
+ else
+ goto err;
+
+ if(prop->get("m_stop",&(s->m_stop)))
+ rlen+=sizeof(s->m_stop);
+ else
+ goto err;
+
+ if(prop->get("nodeid",&(s->nodeid)))
+ rlen+=sizeof(s->nodeid);
+ else
+ goto err;
+
+ if(prop->get("parser_buffer_len",&(s->parser_buffer_len)))
+ {
+ rlen+=sizeof(s->parser_buffer_len);
+ if(prop->get("parser_status",&(s->parser_status)))
+ rlen+=sizeof(s->parser_status);
+ }
+
+ *len= rlen;
+ retval= 1;
+
+err:
+ delete prop;
+ DBUG_RETURN(retval);
}
template class Vector<const ParserRow<ParserDummy>*>;
--- 1.68/ndb/src/mgmsrv/Services.cpp 2006-10-04 02:38:36 +10:00
+++ 1.69/ndb/src/mgmsrv/Services.cpp 2006-10-04 02:38:36 +10:00
@@ -270,6 +270,13 @@
MGM_ARG("length", Int, Mandatory, "Length"),
MGM_ARG("data", String, Mandatory, "Data"),
+ MGM_CMD("list sessions", &MgmApiSession::listSessions, ""),
+
+ MGM_CMD("get session id", &MgmApiSession::getSessionId, ""),
+
+ MGM_CMD("get session", &MgmApiSession::getSession, ""),
+ MGM_ARG("id", Int, Mandatory, "SessionID"),
+
MGM_END()
};
@@ -282,7 +289,7 @@
NDB_TICKS tick;
};
-MgmApiSession::MgmApiSession(class MgmtSrvr & mgm, NDB_SOCKET_TYPE sock)
+MgmApiSession::MgmApiSession(class MgmtSrvr & mgm, NDB_SOCKET_TYPE sock, Uint64
session_id)
: SocketServer::Session(sock), m_mgmsrv(mgm)
{
DBUG_ENTER("MgmApiSession::MgmApiSession");
@@ -291,6 +298,9 @@
m_parser = new Parser_t(commands, *m_input, true, true, true);
m_allocated_resources= new MgmtSrvr::Allocated_resources(m_mgmsrv);
m_stopSelf= 0;
+ m_ctx= NULL;
+ m_session_id= session_id;
+ m_mutex= NdbMutex_Create();
DBUG_VOID_RETURN;
}
@@ -314,6 +324,7 @@
g_RestartServer= true;
if(m_stopSelf)
g_StopServer= true;
+ NdbMutex_Destroy(m_mutex);
DBUG_VOID_RETURN;
}
@@ -323,11 +334,19 @@
DBUG_ENTER("MgmApiSession::runSession");
Parser_t::Context ctx;
- while(!m_stop) {
+ ctx.m_mutex= m_mutex;
+ m_ctx= &ctx;
+ bool stop= false;
+ while(!stop) {
+ NdbMutex_Lock(m_mutex);
+
m_parser->run(ctx, *this);
if(ctx.m_currentToken == 0)
+ {
+ NdbMutex_Unlock(m_mutex);
break;
+ }
switch(ctx.m_status) {
case Parser_t::UnknownCommand:
@@ -348,13 +367,19 @@
default:
break;
}
- }
+
+ stop= m_stop;
+ NdbMutex_Unlock(m_mutex);
+ };
+
+ NdbMutex_Lock(m_mutex);
+ m_ctx= NULL;
if(m_socket != NDB_INVALID_SOCKET)
{
NDB_CLOSE_SOCKET(m_socket);
m_socket= NDB_INVALID_SOCKET;
}
-
+ NdbMutex_Unlock(m_mutex);
DBUG_VOID_RETURN;
}
@@ -1568,11 +1593,6 @@
result = -1;
goto done;
}
-
- m_mgmsrv.m_event_listner.add_listener(le);
-
- m_stop = true;
- m_socket = NDB_INVALID_SOCKET;
done:
m_output->println("listen event");
@@ -1580,6 +1600,13 @@
if(result != 0)
m_output->println("msg: %s", msg.c_str());
m_output->println("");
+
+ if(result==0)
+ {
+ m_mgmsrv.m_event_listner.add_listener(le);
+ m_stop = true;
+ m_socket = NDB_INVALID_SOCKET;
+ }
}
void
@@ -1679,6 +1706,123 @@
m_mgmsrv.eventReport(data);
m_output->println("report event reply");
m_output->println("result: ok");
+ m_output->println("");
+}
+
+void
+MgmApiSession::list_session(SocketServer::Session *_s, void *data)
+{
+ MgmApiSession *s= (MgmApiSession *)_s;
+ MgmApiSession *lister= (MgmApiSession*) data;
+
+ if(s!=lister)
+ NdbMutex_Lock(s->m_mutex);
+
+ Uint64 id= s->m_session_id;
+ lister->m_output->println("session: %llu",id);
+ lister->m_output->println("session.%llu.m_stopSelf: %d",id,s->m_stopSelf);
+ lister->m_output->println("session.%llu.m_stop: %d",id,s->m_stop);
+ lister->m_output->println("session.%llu.allocated.nodeid:
%d",id,s->m_allocated_resources->get_nodeid());
+ if(s->m_ctx)
+ {
+ int l= strlen(s->m_ctx->m_tokenBuffer);
+ char *buf= (char*) malloc(2*l+1);
+ char *b= buf;
+ for(int i=0; i<l;i++)
+ if(s->m_ctx->m_tokenBuffer[i]=='\n')
+ {
+ *b++='\\';
+ *b++='n';
+ }
+ else
+ {
+ *b++= s->m_ctx->m_tokenBuffer[i];
+ }
+ *b= '\0';
+
+ lister->m_output->println("session.%llu.parser.buffer.len: %u",id,l);
+ lister->m_output->println("session.%llu.parser.buffer: %s",id,buf);
+ lister->m_output->println("session.%llu.parser.status:
%d",id,s->m_ctx->m_status);
+
+ free(buf);
+ }
+
+ if(s!=lister)
+ NdbMutex_Unlock(s->m_mutex);
+}
+
+void
+MgmApiSession::listSessions(Parser_t::Context &ctx,
+ Properties const &args) {
+ m_mgmsrv.get_socket_server()->foreachSession(list_session,(void*)this);
+
+ m_output->println("");
+}
+
+void
+MgmApiSession::getSessionId(Parser_t::Context &ctx,
+ Properties const &args) {
+ m_output->println("get session id reply");
+ m_output->println("id: %llu",m_session_id);
+ m_output->println("");
+}
+
+struct get_session_param {
+ MgmApiSession *l;
+ Uint64 id;
+ int found;
+};
+
+void
+MgmApiSession::get_session(SocketServer::Session *_s, void *data)
+{
+ struct get_session_param *p= (struct get_session_param*)data;
+ MgmApiSession *s= (MgmApiSession *)_s;
+
+ if(s!=p->l)
+ NdbMutex_Lock(s->m_mutex);
+
+ if(p->id != s->m_session_id)
+ {
+ if(s!=p->l)
+ NdbMutex_Unlock(s->m_mutex);
+ return;
+ }
+
+ p->found= true;
+ p->l->m_output->println("id: %llu",s->m_session_id);
+ p->l->m_output->println("m_stopSelf: %d",s->m_stopSelf);
+ p->l->m_output->println("m_stop: %d",s->m_stop);
+ p->l->m_output->println("nodeid:
%d",s->m_allocated_resources->get_nodeid());
+ if(s->m_ctx)
+ {
+ int l= strlen(s->m_ctx->m_tokenBuffer);
+ p->l->m_output->println("parser_buffer_len: %u",l);
+ p->l->m_output->println("parser_status: %d",s->m_ctx->m_status);
+ }
+
+ if(s!=p->l)
+ NdbMutex_Unlock(s->m_mutex);
+}
+
+void
+MgmApiSession::getSession(Parser_t::Context &ctx,
+ Properties const &args) {
+ Uint64 id;
+ struct get_session_param p;
+
+ args.get("id", &id);
+
+ p.l= this;
+ p.id= id;
+ p.found= false;
+
+ m_output->println("get session reply");
+ m_mgmsrv.get_socket_server()->foreachSession(get_session,(void*)&p);
+
+ if(p.found==false)
+ m_output->println("id: 0");
+
m_output->println("");
}
--- 1.20/ndb/src/mgmsrv/Services.hpp 2006-10-04 02:38:36 +10:00
+++ 1.21/ndb/src/mgmsrv/Services.hpp 2006-10-04 02:38:36 +10:00
@@ -32,6 +32,8 @@
{
static void stop_session_if_timed_out(SocketServer::Session *_s, void *data);
static void stop_session_if_not_connected(SocketServer::Session *_s, void *data);
+ static void list_session(SocketServer::Session *_s, void *data);
+ static void get_session(SocketServer::Session *_s, void *data);
private:
typedef Parser<MgmApiSession> Parser_t;
@@ -42,6 +44,11 @@
MgmtSrvr::Allocated_resources *m_allocated_resources;
char m_err_str[1024];
int m_stopSelf; // -1 is restart, 0 do nothing, 1 stop
+ NdbMutex *m_mutex;
+
+ // for listing sessions and other fun:
+ Parser_t::Context *m_ctx;
+ Uint64 m_session_id;
void getConfig_common(Parser_t::Context &ctx,
const class Properties &args,
@@ -50,7 +57,7 @@
{ return m_mgmsrv.getErrorText(err_no, m_err_str, sizeof(m_err_str)); }
public:
- MgmApiSession(class MgmtSrvr & mgm, NDB_SOCKET_TYPE sock);
+ MgmApiSession(class MgmtSrvr & mgm, NDB_SOCKET_TYPE sock, Uint64 session_id);
virtual ~MgmApiSession();
void runSession();
@@ -107,13 +114,20 @@
void get_mgmd_nodeid(Parser_t::Context &ctx, Properties const &args);
void report_event(Parser_t::Context &ctx, Properties const &args);
+
+ void listSessions(Parser_t::Context &ctx, Properties const &args);
+
+ void getSessionId(Parser_t::Context &ctx, Properties const &args);
+ void getSession(Parser_t::Context &ctx, Properties const &args);
};
class MgmApiService : public SocketServer::Service {
class MgmtSrvr * m_mgmsrv;
+ Uint64 m_next_session_id; // Protected by m_sessions mutex it SocketServer
public:
MgmApiService(){
m_mgmsrv = 0;
+ m_next_session_id= 1;
}
void setMgm(class MgmtSrvr * mgmsrv){
@@ -121,7 +135,7 @@
}
SocketServer::Session * newSession(NDB_SOCKET_TYPE socket){
- return new MgmApiSession(* m_mgmsrv, socket);
+ return new MgmApiSession(* m_mgmsrv, socket, m_next_session_id++);
}
};
--- 1.4/ndb/test/ndbapi/testMgm.cpp 2006-10-04 02:38:36 +10:00
+++ 1.5/ndb/test/ndbapi/testMgm.cpp 2006-10-04 02:38:36 +10:00
@@ -21,6 +21,8 @@
#include <NdbRestarter.hpp>
#include <Vector.hpp>
#include <random.h>
+#include <mgmapi.h>
+#include <mgmapi_debug.h>
int runLoadTable(NDBT_Context* ctx, NDBT_Step* step){
@@ -167,6 +169,44 @@
return result;
}
+int runTestApiSession(NDBT_Context* ctx, NDBT_Step* step)
+{
+ char *mgm= ctx->getRemoteMgm();
+ Uint64 session_id= 0;
+
+ NdbMgmHandle h;
+ h= ndb_mgm_create_handle();
+ ndb_mgm_set_connectstring(h, mgm);
+ ndb_mgm_connect(h,0,0,0);
+ int s= ndb_mgm_get_fd(h);
+ session_id= ndb_mgm_get_session_id(h);
+ ndbout << "MGM Session id: " << session_id << endl;
+ write(s,"get",3);
+ ndb_mgm_disconnect(h);
+ ndb_mgm_destroy_handle(&h);
+
+ struct NdbMgmSession sess;
+ int slen= sizeof(struct NdbMgmSession);
+
+ h= ndb_mgm_create_handle();
+ ndb_mgm_set_connectstring(h, mgm);
+ ndb_mgm_connect(h,0,0,0);
+
+ if(ndb_mgm_get_session(h,session_id,&sess,&slen))
+ {
+ ndbout << "Failed, session still exists" << endl;
+ ndb_mgm_disconnect(h);
+ ndb_mgm_destroy_handle(&h);
+ return NDBT_FAILED;
+ }
+ else
+ {
+ ndbout << "SUCCESS: session is gone" << endl;
+ ndb_mgm_disconnect(h);
+ ndb_mgm_destroy_handle(&h);
+ return NDBT_OK;
+ }
+}
NDBT_TESTSUITE(testMgm);
@@ -174,6 +214,11 @@
"Test single user mode"){
INITIALIZER(runTestSingleUserMode);
FINALIZER(runClearTable);
+}
+TESTCASE("ApiSessionFailure",
+ "Test failures in MGMAPI session"){
+ INITIALIZER(runTestApiSession);
+
}
NDBT_TESTSUITE_END(testMgm);
| Thread |
|---|
| • bk commit into 5.0 tree (stewart:1.2265) BUG#13987 | Stewart Smith | 3 Oct |