Below is the list of changes that have just been committed into a local
5.1 repository of jonas. When jonas does a push these changes will
be propagated to the main repository and, within 24 hours after the
push, to the public repository.
For information on how to access the public repository
see http://dev.mysql.com/doc/mysql/en/installing-source-tree.html
ChangeSet@stripped, 2007-05-18 11:10:04+02:00, jonas@stripped +1 -0
Merge perch.ndb.mysql.com:/home/jonas/src/50-work
into perch.ndb.mysql.com:/home/jonas/src/51-telco-gca
MERGE: 1.1810.2124.64
storage/ndb/src/common/transporter/TransporterRegistry.cpp@stripped, 2007-05-18 11:10:02+02:00, jonas@stripped +0 -12
merge
MERGE: 1.52.15.2
storage/ndb/src/common/transporter/TransporterRegistry.cpp@stripped, 2007-05-18 11:08:26+02:00, jonas@stripped +0 -0
Merge rename: ndb/src/common/transporter/TransporterRegistry.cpp -> storage/ndb/src/common/transporter/TransporterRegistry.cpp
# This is a BitKeeper patch. What follows are the unified diffs for the
# set of deltas contained in the patch. The rest of the patch, the part
# that BitKeeper cares about, is below these diffs.
# User: jonas
# Host: perch.ndb.mysql.com
# Root: /home/jonas/src/51-telco-gca/RESYNC
--- 1.52.15.1/ndb/src/common/transporter/TransporterRegistry.cpp 2007-05-18 11:10:08 +02:00
+++ 1.70/storage/ndb/src/common/transporter/TransporterRegistry.cpp 2007-05-18 11:10:08 +02:00
@@ -2,8 +2,7 @@
This program is free software; you can redistribute it and/or modify
it under the terms of the GNU General Public License as published by
- the Free Software Foundation; either version 2 of the License, or
- (at your option) any later version.
+ the Free Software Foundation; version 2 of the License.
This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
@@ -27,11 +26,6 @@
#include "TCP_Transporter.hpp"
#endif
-#ifdef NDB_OSE_TRANSPORTER
-#include "OSE_Receiver.hpp"
-#include "OSE_Transporter.hpp"
-#endif
-
#ifdef NDB_SCI_TRANSPORTER
#include "SCI_Transporter.hpp"
#endif
@@ -80,21 +74,21 @@
TransporterRegistry::TransporterRegistry(void * callback,
unsigned _maxTransporters,
- unsigned sizeOfLongSignalMemory)
+ unsigned sizeOfLongSignalMemory) :
+ m_mgm_handle(0),
+ m_transp_count(0)
{
DBUG_ENTER("TransporterRegistry::TransporterRegistry");
nodeIdSpecified = false;
maxTransporters = _maxTransporters;
sendCounter = 1;
- m_mgm_handle= 0;
callbackObj=callback;
theTCPTransporters = new TCP_Transporter * [maxTransporters];
theSCITransporters = new SCI_Transporter * [maxTransporters];
theSHMTransporters = new SHM_Transporter * [maxTransporters];
- theOSETransporters = new OSE_Transporter * [maxTransporters];
theTransporterTypes = new TransporterType [maxTransporters];
theTransporters = new Transporter * [maxTransporters];
performStates = new PerformState [maxTransporters];
@@ -105,21 +99,16 @@
nTCPTransporters = 0;
nSCITransporters = 0;
nSHMTransporters = 0;
- nOSETransporters = 0;
// Initialize the transporter arrays
for (unsigned i=0; i<maxTransporters; i++) {
theTCPTransporters[i] = NULL;
theSCITransporters[i] = NULL;
theSHMTransporters[i] = NULL;
- theOSETransporters[i] = NULL;
theTransporters[i] = NULL;
performStates[i] = DISCONNECTED;
ioStates[i] = NoHalt;
}
- theOSEReceiver = 0;
- theOSEJunkSocketSend = 0;
- theOSEJunkSocketRecv = 0;
DBUG_VOID_RETURN;
}
@@ -154,19 +143,11 @@
delete[] theTCPTransporters;
delete[] theSCITransporters;
delete[] theSHMTransporters;
- delete[] theOSETransporters;
delete[] theTransporterTypes;
delete[] theTransporters;
delete[] performStates;
delete[] ioStates;
-#ifdef NDB_OSE_TRANSPORTER
- if(theOSEReceiver != NULL){
- theOSEReceiver->destroyPhantom();
- delete theOSEReceiver;
- theOSEReceiver = 0;
- }
-#endif
if (m_mgm_handle)
ndb_mgm_destroy_handle(&m_mgm_handle);
@@ -326,60 +307,6 @@
nTransporters++;
nTCPTransporters++;
-#if defined NDB_OSE || defined NDB_SOFTOSE
- t->theReceiverPid = theReceiverPid;
-#endif
-
- return true;
-#else
- return false;
-#endif
-}
-
-bool
-TransporterRegistry::createOSETransporter(TransporterConfiguration *conf) {
-#ifdef NDB_OSE_TRANSPORTER
-
- if(!nodeIdSpecified){
- init(conf->localNodeId);
- }
-
- if(conf->localNodeId != localNodeId)
- return false;
-
- if(theTransporters[conf->remoteNodeId] != NULL)
- return false;
-
- if(theOSEReceiver == NULL){
- theOSEReceiver = new OSE_Receiver(this,
- 10,
- localNodeId);
- }
-
- OSE_Transporter * t = new OSE_Transporter(conf->ose.prioASignalSize,
- conf->ose.prioBSignalSize,
- localNodeId,
- conf->localHostName,
- conf->remoteNodeId,
- conf->serverNodeId,
- conf->remoteHostName,
- conf->checksum,
- conf->signalId);
- if (t == NULL)
- return false;
- else if (!t->initTransporter()) {
- delete t;
- return false;
- }
- // Put the transporter in the transporter arrays
- theOSETransporters[nOSETransporters] = t;
- theTransporters[t->getRemoteNodeId()] = t;
- theTransporterTypes[t->getRemoteNodeId()] = tt_OSE_TRANSPORTER;
- performStates[t->getRemoteNodeId()] = DISCONNECTED;
-
- nTransporters++;
- nOSETransporters++;
-
return true;
#else
return false;
@@ -457,10 +384,7 @@
* Make sure to block g_ndb_shm_signum
* TransporterRegistry::init is run from "main" thread
*/
- sigset_t mask;
- sigemptyset(&mask);
- sigaddset(&mask, g_ndb_shm_signum);
- pthread_sigmask(SIG_BLOCK, &mask, 0);
+ NdbThread_set_shm_sigmask(TRUE);
}
if(config->shm.signum != g_ndb_shm_signum)
@@ -552,19 +476,8 @@
nSHMTransporters --;
#endif
break;
- case tt_OSE_TRANSPORTER:
-#ifdef NDB_OSE_TRANSPORTER
- for(; ind < nOSETransporters; ind++)
- if(theOSETransporters[ind]->getRemoteNodeId() == nodeId)
- break;
- ind++;
- for(; ind<nOSETransporters; ind++)
- theOSETransporters[ind-1] = theOSETransporters[ind];
- nOSETransporters --;
-#endif
- break;
}
-
+
nTransporters--;
// Delete the transporter and remove it from theTransporters array
@@ -744,12 +657,7 @@
Uint32
TransporterRegistry::pollReceive(Uint32 timeOutMillis){
Uint32 retVal = 0;
-#ifdef NDB_OSE_TRANSPORTER
- retVal |= poll_OSE(timeOutMillis);
- retVal |= poll_TCP(0);
- return retVal;
-#endif
-
+
if((nSCITransporters) > 0)
{
timeOutMillis=0;
@@ -826,18 +734,6 @@
}
#endif
-#ifdef NDB_OSE_TRANSPORTER
-Uint32
-TransporterRegistry::poll_OSE(Uint32 timeOutMillis)
-{
- if(theOSEReceiver != NULL){
- return theOSEReceiver->doReceive(timeOutMillis);
- }
- NdbSleep_MilliSleep(timeOutMillis);
- return 0;
-}
-#endif
-
#ifdef NDB_TCP_TRANSPORTER
Uint32
TransporterRegistry::poll_TCP(Uint32 timeOutMillis)
@@ -877,27 +773,15 @@
timeOutMillis = hasdata ? 0 : timeOutMillis;
struct timeval timeout;
-#ifdef NDB_OSE
- // Return directly if there are no TCP transporters configured
-
- if(timeOutMillis <= 1){
- timeout.tv_sec = 0;
- timeout.tv_usec = 1025;
- } else {
- timeout.tv_sec = timeOutMillis / 1000;
- timeout.tv_usec = (timeOutMillis % 1000) * 1000;
- }
-#else
timeout.tv_sec = timeOutMillis / 1000;
timeout.tv_usec = (timeOutMillis % 1000) * 1000;
-#endif
// The highest socket value plus one
maxSocketValue++;
tcpReadSelectReply = select(maxSocketValue, &tcpReadset, 0, 0, &timeout);
if(false && tcpReadSelectReply == -1 && errno == EINTR)
- ndbout_c("woke-up by signal");
+ g_eventLogger.info("woke-up by signal");
#ifdef NDB_WIN32
if(tcpReadSelectReply == SOCKET_ERROR)
@@ -914,33 +798,6 @@
void
TransporterRegistry::performReceive()
{
-#ifdef NDB_OSE_TRANSPORTER
- if(theOSEReceiver != 0)
- {
- while(theOSEReceiver->hasData())
- {
- NodeId remoteNodeId;
- Uint32 * readPtr;
- Uint32 sz = theOSEReceiver->getReceiveData(&remoteNodeId, &readPtr);
- transporter_recv_from(callbackObj, remoteNodeId);
- Uint32 szUsed = unpack(readPtr,
- sz,
- remoteNodeId,
- ioStates[remoteNodeId]);
-#ifdef DEBUG_TRANSPORTER
- /**
- * OSE transporter can handle executions of
- * half signals
- */
- assert(sz == szUsed);
-#endif
- theOSEReceiver->updateReceiveDataPtr(szUsed);
- theOSEReceiver->doReceive(0);
- // checkJobBuffer();
- }
- }
-#endif
-
#ifdef NDB_TCP_TRANSPORTER
for (int i=0; i<nTCPTransporters; i++)
{
@@ -1009,75 +866,14 @@
#endif
}
-static int x = 0;
void
TransporterRegistry::performSend()
{
int i;
sendCounter = 1;
-
-#ifdef NDB_OSE_TRANSPORTER
- for (int i = 0; i < nOSETransporters; i++)
- {
- OSE_Transporter *t = theOSETransporters[i];
- if(is_connected(t->getRemoteNodeId()) &&& (t->isConnected()))
- {
- t->doSend();
- }//if
- }//for
-#endif
-
-#ifdef NDB_TCP_TRANSPORTER
-#ifdef NDB_OSE
- {
- int maxSocketValue = 0;
-
- // Needed for TCP/IP connections
- // The writeset are used by select
- fd_set writeset;
- FD_ZERO(&writeset);
-
- // Prepare for sending and receiving
- for (i = 0; i < nTCPTransporters; i++) {
- TCP_Transporter * t = theTCPTransporters[i];
-
- // If the transporter is connected
- if ((t->hasDataToSend()) && (t->isConnected())) {
- const int socket = t->getSocket();
- // Find the highest socket value. It will be used by select
- if (socket > maxSocketValue) {
- maxSocketValue = socket;
- }//if
- FD_SET(socket, &writeset);
- }//if
- }//for
-
- // The highest socket value plus one
- if(maxSocketValue == 0)
- return;
-
- maxSocketValue++;
- struct timeval timeout = { 0, 1025 };
- Uint32 tmp = select(maxSocketValue, 0, &writeset, 0, &timeout);
-
- if (tmp == 0)
- {
- return;
- }//if
- for (i = 0; i < nTCPTransporters; i++) {
- TCP_Transporter *t = theTCPTransporters[i];
- const NodeId nodeId = t->getRemoteNodeId();
- const int socket = t->getSocket();
- if(is_connected(nodeId)){
- if(t->isConnected() && FD_ISSET(socket, &writeset)) {
- t->doSend();
- }//if
- }//if
- }//for
- }
-#endif
+
#ifdef NDB_TCP_TRANSPORTER
- for (i = x; i < nTCPTransporters; i++)
+ for (i = m_transp_count; i < nTCPTransporters; i++)
{
TCP_Transporter *t = theTCPTransporters[i];
if (t && t->hasDataToSend() && t->isConnected() &&
@@ -1086,7 +882,7 @@
t->doSend();
}
}
- for (i = 0; i < x && i < nTCPTransporters; i++)
+ for (i = 0; i < m_transp_count && i < nTCPTransporters; i++)
{
TCP_Transporter *t = theTCPTransporters[i];
if (t && t->hasDataToSend() && t->isConnected() &&
@@ -1095,9 +891,8 @@
t->doSend();
}
}
- x++;
- if (x == nTCPTransporters) x = 0;
-#endif
+ m_transp_count++;
+ if (m_transp_count == nTCPTransporters) m_transp_count = 0;
#endif
#ifdef NDB_SCI_TRANSPORTER
//scroll through the SCI transporters,
@@ -1320,12 +1115,12 @@
}
else if(ndb_mgm_is_connected(m_mgm_handle))
{
- ndbout_c("Failed to get dynamic port to connect to: %d", res);
+ g_eventLogger.info("Failed to get dynamic port to connect to: %d", res);
ndb_mgm_disconnect(m_mgm_handle);
}
else
{
- ndbout_c("Management server closed connection early. "
+ g_eventLogger.info("Management server closed connection early. "
"It is probably being shut down (or has problems). "
"We will retry the connection.");
}
@@ -1423,7 +1218,7 @@
DBUG_ENTER("TransporterRegistry::start_service");
if (m_transporter_interface.size() > 0 && !nodeIdSpecified)
{
- ndbout_c("TransporterRegistry::startReceiving: localNodeId not specified");
+ g_eventLogger.error("TransporterRegistry::startReceiving: localNodeId not specified");
DBUG_RETURN(false);
}
@@ -1449,7 +1244,7 @@
* If it wasn't a dynamically allocated port, or
* our attempts at getting a new dynamic port failed
*/
- ndbout_c("Unable to setup transporter service port: %s:%d!\n"
+ g_eventLogger.error("Unable to setup transporter service port: %s:%d!\n"
"Please check if the port is already used,\n"
"(perhaps the node is already running)",
t.m_interface ? t.m_interface : "*", t.m_s_service_port);
@@ -1477,21 +1272,6 @@
TransporterRegistry::startReceiving()
{
DBUG_ENTER("TransporterRegistry::startReceiving");
-#ifdef NDB_OSE_TRANSPORTER
- if(theOSEReceiver != NULL){
- theOSEReceiver->createPhantom();
- }
-#endif
-
-#ifdef NDB_OSE
- theOSEJunkSocketRecv = socket(AF_INET, SOCK_STREAM, 0);
-#endif
-
-#if defined NDB_OSE || defined NDB_SOFTOSE
- theReceiverPid = current_process();
- for(int i = 0; i<nTCPTransporters; i++)
- theTCPTransporters[i]->theReceiverPid = theReceiverPid;
-#endif
#ifdef NDB_SHM_TRANSPORTER
m_shm_own_pid = getpid();
@@ -1500,11 +1280,9 @@
DBUG_PRINT("info",("Install signal handler for signum %d",
g_ndb_shm_signum));
struct sigaction sa;
+ NdbThread_set_shm_sigmask(FALSE);
sigemptyset(&sa.sa_mask);
- sigaddset(&sa.sa_mask, g_ndb_shm_signum);
- pthread_sigmask(SIG_UNBLOCK, &sa.sa_mask, 0);
sa.sa_handler = shm_sig_handler;
- sigemptyset(&sa.sa_mask);
sa.sa_flags = 0;
int ret;
while((ret = sigaction(g_ndb_shm_signum, &sa, 0)) == -1 && errno == EINTR);
@@ -1522,41 +1300,20 @@
void
TransporterRegistry::stopReceiving(){
-#ifdef NDB_OSE_TRANSPORTER
- if(theOSEReceiver != NULL){
- theOSEReceiver->destroyPhantom();
- }
-#endif
-
/**
* Disconnect all transporters, this includes detach from remote node
* and since that must be done from the same process that called attach
* it's done here in the receive thread
*/
disconnectAll();
-
-#if defined NDB_OSE || defined NDB_SOFTOSE
- if(theOSEJunkSocketRecv > 0)
- close(theOSEJunkSocketRecv);
- theOSEJunkSocketRecv = -1;
-#endif
-
}
void
TransporterRegistry::startSending(){
-#if defined NDB_OSE || defined NDB_SOFTOSE
- theOSEJunkSocketSend = socket(AF_INET, SOCK_STREAM, 0);
-#endif
}
void
TransporterRegistry::stopSending(){
-#if defined NDB_OSE || defined NDB_SOFTOSE
- if(theOSEJunkSocketSend > 0)
- close(theOSEJunkSocketSend);
- theOSEJunkSocketSend = -1;
-#endif
}
NdbOut & operator <<(NdbOut & out, SignalHeader & sh){
@@ -1584,13 +1341,13 @@
if(!mgm_nodeid)
{
- ndbout_c("%s: %d", __FILE__, __LINE__);
+ g_eventLogger.error("%s: %d", __FILE__, __LINE__);
return false;
}
Transporter * t = theTransporters[mgm_nodeid];
if (!t)
{
- ndbout_c("%s: %d", __FILE__, __LINE__);
+ g_eventLogger.error("%s: %d", __FILE__, __LINE__);
return false;
}
DBUG_RETURN(t->connect_client(connect_ndb_mgmd(h)));
@@ -1606,7 +1363,7 @@
if ( h==NULL || *h == NULL )
{
- ndbout_c("%s: %d", __FILE__, __LINE__);
+ g_eventLogger.error("%s: %d", __FILE__, __LINE__);
return NDB_INVALID_SOCKET;
}
@@ -1619,10 +1376,10 @@
m_transporter_interface[i].m_s_service_port,
&mgm_reply) < 0)
{
- ndbout_c("Error: %s: %d",
+ g_eventLogger.error("Error: %s: %d",
ndb_mgm_get_latest_error_desc(*h),
ndb_mgm_get_latest_error(*h));
- ndbout_c("%s: %d", __FILE__, __LINE__);
+ g_eventLogger.error("%s: %d", __FILE__, __LINE__);
ndb_mgm_destroy_handle(h);
return NDB_INVALID_SOCKET;
}
@@ -1634,10 +1391,10 @@
NDB_SOCKET_TYPE sockfd= ndb_mgm_convert_to_transporter(h);
if ( sockfd == NDB_INVALID_SOCKET)
{
- ndbout_c("Error: %s: %d",
+ g_eventLogger.error("Error: %s: %d",
ndb_mgm_get_latest_error_desc(*h),
ndb_mgm_get_latest_error(*h));
- ndbout_c("%s: %d", __FILE__, __LINE__);
+ g_eventLogger.error("%s: %d", __FILE__, __LINE__);
ndb_mgm_destroy_handle(h);
}
return sockfd;
| Thread |
|---|
| • bk commit into 5.1 tree (jonas:1.2480) | jonas | 18 May |