4414 jonas oreland 2012-01-17 [merge]
ndb - merge 70 to 71
modified:
storage/ndb/include/debugger/SignalLoggerManager.hpp
storage/ndb/include/kernel/signaldata/SignalDroppedRep.hpp
storage/ndb/include/ndb_types.h.in
storage/ndb/include/transporter/TransporterCallback.hpp
storage/ndb/include/transporter/TransporterDefinitions.hpp
storage/ndb/include/transporter/TransporterRegistry.hpp
storage/ndb/src/common/debugger/EventLogger.cpp
storage/ndb/src/common/debugger/SignalLoggerManager.cpp
storage/ndb/src/common/transporter/Packer.cpp
storage/ndb/src/common/transporter/TCP_Transporter.cpp
storage/ndb/src/common/transporter/TCP_Transporter.hpp
storage/ndb/src/common/transporter/TransporterRegistry.cpp
storage/ndb/src/kernel/blocks/cmvmi/Cmvmi.cpp
storage/ndb/src/kernel/blocks/dblqh/DblqhMain.cpp
storage/ndb/src/kernel/blocks/dbtc/DbtcMain.cpp
storage/ndb/src/kernel/blocks/qmgr/QmgrMain.cpp
storage/ndb/src/kernel/blocks/trpman.cpp
storage/ndb/src/kernel/ndbd.cpp
storage/ndb/src/kernel/vm/Configuration.cpp
storage/ndb/src/kernel/vm/Emulator.hpp
storage/ndb/src/kernel/vm/SectionReader.cpp
storage/ndb/src/kernel/vm/SimplePropertiesSection.cpp
storage/ndb/src/kernel/vm/TransporterCallback.cpp
storage/ndb/src/kernel/vm/TransporterCallbackKernel.hpp
storage/ndb/src/kernel/vm/mt.cpp
storage/ndb/src/kernel/vm/mt.hpp
storage/ndb/src/ndbapi/NdbEventOperationImpl.hpp
storage/ndb/src/ndbapi/TransporterFacade.cpp
storage/ndb/src/ndbapi/TransporterFacade.hpp
4413 Jonas Oreland 2012-01-16
ndb - bump version to 7.1.20
modified:
configure.in
storage/ndb/ndb_configure.m4
=== modified file 'storage/ndb/include/debugger/SignalLoggerManager.hpp'
--- a/storage/ndb/include/debugger/SignalLoggerManager.hpp 2011-06-30 15:59:25 +0000
+++ b/storage/ndb/include/debugger/SignalLoggerManager.hpp 2012-01-16 07:14:30 +0000
@@ -28,8 +28,10 @@
#include <kernel_types.h>
#include <BlockNumbers.h>
-#include <TransporterDefinitions.hpp>
#include <RefConvert.hpp>
+#include <NdbMutex.h>
+
+struct SignalHeader;
class SignalLoggerManager
{
=== modified file 'storage/ndb/include/kernel/signaldata/SignalDroppedRep.hpp'
--- a/storage/ndb/include/kernel/signaldata/SignalDroppedRep.hpp 2011-06-30 15:59:25 +0000
+++ b/storage/ndb/include/kernel/signaldata/SignalDroppedRep.hpp 2012-01-16 13:56:30 +0000
@@ -21,8 +21,8 @@
#include "SignalData.hpp"
-class SignalDroppedRep {
-
+struct SignalDroppedRep
+{
/**
* Receiver(s)
*/
@@ -37,8 +37,7 @@ class SignalDroppedRep {
friend class TransporterCallbackKernel;
friend bool printSIGNAL_DROPPED_REP(FILE *, const Uint32 *, Uint32, Uint16);
-public:
-private:
+
Uint32 originalGsn;
Uint32 originalLength;
Uint32 originalSectionCount;
=== modified file 'storage/ndb/include/ndb_types.h.in'
--- a/storage/ndb/include/ndb_types.h.in 2012-01-10 12:58:40 +0000
+++ b/storage/ndb/include/ndb_types.h.in 2012-01-17 07:57:37 +0000
@@ -94,4 +94,76 @@ typedef unsigned int UintR;
#include "ndb_constants.h"
+struct LinearSectionPtr
+{
+ Uint32 sz;
+ Uint32 * p;
+};
+
+struct SegmentedSectionPtrPOD
+{
+ Uint32 sz;
+ Uint32 i;
+ struct SectionSegment * p;
+
+#ifdef __cplusplus
+ void setNull() { p = 0;}
+ bool isNull() const { return p == 0;}
+ inline SegmentedSectionPtrPOD& assign(struct SegmentedSectionPtr&);
+#endif
+};
+
+struct SegmentedSectionPtr
+{
+ Uint32 sz;
+ Uint32 i;
+ struct SectionSegment * p;
+
+#ifdef __cplusplus
+ SegmentedSectionPtr() {}
+ SegmentedSectionPtr(Uint32 sz_arg, Uint32 i_arg,
+ struct SectionSegment *p_arg)
+ :sz(sz_arg), i(i_arg), p(p_arg)
+ {}
+ SegmentedSectionPtr(const SegmentedSectionPtrPOD & src)
+ :sz(src.sz), i(src.i), p(src.p)
+ {}
+
+ void setNull() { p = 0;}
+ bool isNull() const { return p == 0;}
+#endif
+};
+
+#ifdef __cplusplus
+inline
+SegmentedSectionPtrPOD&
+SegmentedSectionPtrPOD::assign(struct SegmentedSectionPtr& src)
+{
+ this->i = src.i;
+ this->p = src.p;
+ this->sz = src.sz;
+ return *this;
+}
+#endif
+
+/* Abstract interface for iterating over
+ * words in a section
+ */
+#ifdef __cplusplus
+struct GenericSectionIterator
+{
+ virtual ~GenericSectionIterator() {};
+ virtual void reset()=0;
+ virtual const Uint32* getNextWords(Uint32& sz)=0;
+};
+#else
+struct GenericSectionIterator;
+#endif
+
+struct GenericSectionPtr
+{
+ Uint32 sz;
+ struct GenericSectionIterator* sectionIter;
+};
+
#endif
=== modified file 'storage/ndb/include/transporter/TransporterCallback.hpp'
--- a/storage/ndb/include/transporter/TransporterCallback.hpp 2011-09-09 10:48:14 +0000
+++ b/storage/ndb/include/transporter/TransporterCallback.hpp 2012-01-16 08:42:18 +0000
@@ -31,277 +31,15 @@
#include <kernel_types.h>
#include "TransporterDefinitions.hpp"
+#include "TransporterRegistry.hpp"
-
-#define TE_DO_DISCONNECT 0x8000
-
-enum TransporterError {
- TE_NO_ERROR = 0,
- /**
- * TE_ERROR_CLOSING_SOCKET
- *
- * Error found during closing of socket
- *
- * Recommended behavior: Ignore
- */
- TE_ERROR_CLOSING_SOCKET = 0x1,
-
- /**
- * TE_ERROR_IN_SELECT_BEFORE_ACCEPT
- *
- * Error found during accept (just before)
- * The transporter will retry.
- *
- * Recommended behavior: Ignore
- * (or possible do setPerformState(PerformDisconnect)
- */
- TE_ERROR_IN_SELECT_BEFORE_ACCEPT = 0x2,
-
- /**
- * TE_INVALID_MESSAGE_LENGTH
- *
- * Error found in message (message length)
- *
- * Recommended behavior: setPerformState(PerformDisconnect)
- */
- TE_INVALID_MESSAGE_LENGTH = 0x3 | TE_DO_DISCONNECT,
-
- /**
- * TE_INVALID_CHECKSUM
- *
- * Error found in message (checksum)
- *
- * Recommended behavior: setPerformState(PerformDisonnect)
- */
- TE_INVALID_CHECKSUM = 0x4 | TE_DO_DISCONNECT,
-
- /**
- * TE_COULD_NOT_CREATE_SOCKET
- *
- * Error found while creating socket
- *
- * Recommended behavior: setPerformState(PerformDisonnect)
- */
- TE_COULD_NOT_CREATE_SOCKET = 0x5,
-
- /**
- * TE_COULD_NOT_BIND_SOCKET
- *
- * Error found while binding server socket
- *
- * Recommended behavior: setPerformState(PerformDisonnect)
- */
- TE_COULD_NOT_BIND_SOCKET = 0x6,
-
- /**
- * TE_LISTEN_FAILED
- *
- * Error found while listening to server socket
- *
- * Recommended behavior: setPerformState(PerformDisonnect)
- */
- TE_LISTEN_FAILED = 0x7,
-
- /**
- * TE_ACCEPT_RETURN_ERROR
- *
- * Error found during accept
- * The transporter will retry.
- *
- * Recommended behavior: Ignore
- * (or possible do setPerformState(PerformDisconnect)
- */
- TE_ACCEPT_RETURN_ERROR = 0x8
-
- /**
- * TE_SHM_DISCONNECT
- *
- * The remote node has disconnected
- *
- * Recommended behavior: setPerformState(PerformDisonnect)
- */
- ,TE_SHM_DISCONNECT = 0xb | TE_DO_DISCONNECT
-
- /**
- * TE_SHM_IPC_STAT
- *
- * Unable to check shm segment
- * probably because remote node
- * has disconnected and removed it
- *
- * Recommended behavior: setPerformState(PerformDisonnect)
- */
- ,TE_SHM_IPC_STAT = 0xc | TE_DO_DISCONNECT
-
- /**
- * Permanent error
- */
- ,TE_SHM_IPC_PERMANENT = 0x21
-
- /**
- * TE_SHM_UNABLE_TO_CREATE_SEGMENT
- *
- * Unable to create shm segment
- * probably os something error
- *
- * Recommended behavior: setPerformState(PerformDisonnect)
- */
- ,TE_SHM_UNABLE_TO_CREATE_SEGMENT = 0xd
-
- /**
- * TE_SHM_UNABLE_TO_ATTACH_SEGMENT
- *
- * Unable to attach shm segment
- * probably invalid group / user
- *
- * Recommended behavior: setPerformState(PerformDisonnect)
- */
- ,TE_SHM_UNABLE_TO_ATTACH_SEGMENT = 0xe
-
- /**
- * TE_SHM_UNABLE_TO_REMOVE_SEGMENT
- *
- * Unable to remove shm segment
- *
- * Recommended behavior: Ignore (not much to do)
- * Print warning to logfile
- */
- ,TE_SHM_UNABLE_TO_REMOVE_SEGMENT = 0xf
-
- ,TE_TOO_SMALL_SIGID = 0x10
- ,TE_TOO_LARGE_SIGID = 0x11
- ,TE_WAIT_STACK_FULL = 0x12 | TE_DO_DISCONNECT
- ,TE_RECEIVE_BUFFER_FULL = 0x13 | TE_DO_DISCONNECT
-
- /**
- * TE_SIGNAL_LOST_SEND_BUFFER_FULL
- *
- * Send buffer is full, and trying to force send fails
- * a signal is dropped!! very bad very bad
- *
- */
- ,TE_SIGNAL_LOST_SEND_BUFFER_FULL = 0x14 | TE_DO_DISCONNECT
-
- /**
- * TE_SIGNAL_LOST
- *
- * Send failed for unknown reason
- * a signal is dropped!! very bad very bad
- *
- */
- ,TE_SIGNAL_LOST = 0x15
-
- /**
- * TE_SEND_BUFFER_FULL
- *
- * The send buffer was full, but sleeping for a while solved it
- */
- ,TE_SEND_BUFFER_FULL = 0x16
-
- /**
- * TE_SCI_UNABLE_TO_CLOSE_CHANNEL
- *
- * Unable to close the sci channel and the resources allocated by
- * the sisci api.
- */
- ,TE_SCI_UNABLE_TO_CLOSE_CHANNEL = 0x22
-
- /**
- * TE_SCI_LINK_ERROR
- *
- * There is no link from this node to the switch.
- * No point in continuing. Must check the connections.
- * Recommended behavior: setPerformState(PerformDisonnect)
- */
- ,TE_SCI_LINK_ERROR = 0x0017
-
- /**
- * TE_SCI_UNABLE_TO_START_SEQUENCE
- *
- * Could not start a sequence, because system resources
- * are exumed or no sequence has been created.
- * Recommended behavior: setPerformState(PerformDisonnect)
- */
- ,TE_SCI_UNABLE_TO_START_SEQUENCE = 0x18 | TE_DO_DISCONNECT
-
- /**
- * TE_SCI_UNABLE_TO_REMOVE_SEQUENCE
- *
- * Could not remove a sequence
- */
- ,TE_SCI_UNABLE_TO_REMOVE_SEQUENCE = 0x19 | TE_DO_DISCONNECT
-
- /**
- * TE_SCI_UNABLE_TO_CREATE_SEQUENCE
- *
- * Could not create a sequence, because system resources are
- * exempted. Must reboot.
- * Recommended behavior: setPerformState(PerformDisonnect)
- */
- ,TE_SCI_UNABLE_TO_CREATE_SEQUENCE = 0x1a | TE_DO_DISCONNECT
-
- /**
- * TE_SCI_UNRECOVERABLE_DATA_TFX_ERROR
- *
- * Tried to send data on redundant link but failed.
- * Recommended behavior: setPerformState(PerformDisonnect)
- */
- ,TE_SCI_UNRECOVERABLE_DATA_TFX_ERROR = 0x1b | TE_DO_DISCONNECT
-
- /**
- * TE_SCI_CANNOT_INIT_LOCALSEGMENT
- *
- * Cannot initialize local segment. A whole lot of things has
- * gone wrong (no system resources). Must reboot.
- * Recommended behavior: setPerformState(PerformDisonnect)
- */
- ,TE_SCI_CANNOT_INIT_LOCALSEGMENT = 0x1c | TE_DO_DISCONNECT
-
- /**
- * TE_SCI_CANNOT_MAP_REMOTESEGMENT
- *
- * Cannot map remote segment. No system resources are left.
- * Must reboot system.
- * Recommended behavior: setPerformState(PerformDisonnect)
- */
- ,TE_SCI_CANNOT_MAP_REMOTESEGMENT = 0x1d | TE_DO_DISCONNECT
-
- /**
- * TE_SCI_UNABLE_TO_UNMAP_SEGMENT
- *
- * Cannot free the resources used by this segment (step 1).
- * Recommended behavior: setPerformState(PerformDisonnect)
- */
- ,TE_SCI_UNABLE_TO_UNMAP_SEGMENT = 0x1e | TE_DO_DISCONNECT
-
- /**
- * TE_SCI_UNABLE_TO_REMOVE_SEGMENT
- *
- * Cannot free the resources used by this segment (step 2).
- * Cannot guarantee that enough resources exist for NDB
- * to map more segment
- * Recommended behavior: setPerformState(PerformDisonnect)
- */
- ,TE_SCI_UNABLE_TO_REMOVE_SEGMENT = 0x1f | TE_DO_DISCONNECT
-
- /**
- * TE_SCI_UNABLE_TO_DISCONNECT_SEGMENT
- *
- * Cannot disconnect from a remote segment.
- * Recommended behavior: setPerformState(PerformDisonnect)
- */
- ,TE_SCI_UNABLE_TO_DISCONNECT_SEGMENT = 0x20 | TE_DO_DISCONNECT
-
- /* Used 0x21 */
- /* Used 0x22 */
-};
-
/**
- * The TransporterCallback class encapsulates those aspects of the transporter
- * code that is specific to particular upper layer (NDB API, single-threaded
- * kernel, or multi-threaded kernel).
+ * The TransporterReceiveCallback class encapsulates
+ * the receive aspects of the transporter code that is
+ * specific to particular
+ * upper layer (NDB API, single-threaded kernel, or multi-threaded kernel).
*/
-class TransporterCallback {
+class TransporterReceiveHandle : public TransporterReceiveData {
public:
/**
* This method is called to deliver a signal to the upper layer.
@@ -329,15 +67,6 @@ public:
virtual int checkJobBuffer() = 0;
/**
- * The transporter periodically calls this method, indicating the number
- * of sends done to one NodeId, as well as total bytes sent.
- *
- * For multithreaded cases, this is only called while the send lock for the
- * given node is held.
- */
- virtual void reportSendLen(NodeId nodeId, Uint32 count, Uint64 bytes) = 0;
-
- /**
* Same as reportSendLen(), but for received data.
*
* For multithreaded cases, this is only called while holding the global
@@ -379,6 +108,27 @@ public:
*/
virtual void transporter_recv_from(NodeId node) = 0;
+ /**
+ *
+ */
+ virtual ~TransporterReceiveHandle() { };
+};
+
+/**
+ * The TransporterCallback class encapsulates those aspects of the transporter
+ * code that is specific to particular upper layer (NDB API, single-threaded
+ * kernel, or multi-threaded kernel).
+ */
+class TransporterCallback {
+public:
+ /**
+ * The transporter periodically calls this method, indicating the number
+ * of sends done to one NodeId, as well as total bytes sent.
+ *
+ * For multithreaded cases, this is only called while the send lock for the
+ * given node is held.
+ */
+ virtual void reportSendLen(NodeId nodeId, Uint32 count, Uint64 bytes) = 0;
/**
* Locking (no-op in single-threaded VM).
=== modified file 'storage/ndb/include/transporter/TransporterDefinitions.hpp'
--- a/storage/ndb/include/transporter/TransporterDefinitions.hpp 2011-06-30 15:59:25 +0000
+++ b/storage/ndb/include/transporter/TransporterDefinitions.hpp 2012-01-16 07:14:30 +0000
@@ -126,64 +126,269 @@ struct SignalHeader {
Uint8 m_fragmentInfo;
}; /** 7x4 = 28 Bytes */
-struct LinearSectionPtr {
- Uint32 sz;
- Uint32 * p;
-};
-
-struct SegmentedSectionPtrPOD
-{
- Uint32 sz;
- Uint32 i;
- struct SectionSegment * p;
-
- void setNull() { p = 0;}
- bool isNull() const { return p == 0;}
- inline SegmentedSectionPtrPOD& assign(struct SegmentedSectionPtr&);
-};
+class NdbOut & operator <<(class NdbOut & out, SignalHeader & sh);
-struct SegmentedSectionPtr {
- Uint32 sz;
- Uint32 i;
- struct SectionSegment * p;
-
- SegmentedSectionPtr() {}
- SegmentedSectionPtr(Uint32 sz_arg, Uint32 i_arg,
- struct SectionSegment *p_arg)
- :sz(sz_arg), i(i_arg), p(p_arg)
- {}
- SegmentedSectionPtr(const SegmentedSectionPtrPOD & src)
- :sz(src.sz), i(src.i), p(src.p)
- {}
+#define TE_DO_DISCONNECT 0x8000
- void setNull() { p = 0;}
- bool isNull() const { return p == 0;}
-};
+enum TransporterError {
+ TE_NO_ERROR = 0,
+ /**
+ * TE_ERROR_CLOSING_SOCKET
+ *
+ * Error found during closing of socket
+ *
+ * Recommended behavior: Ignore
+ */
+ TE_ERROR_CLOSING_SOCKET = 0x1,
+
+ /**
+ * TE_ERROR_IN_SELECT_BEFORE_ACCEPT
+ *
+ * Error found during accept (just before)
+ * The transporter will retry.
+ *
+ * Recommended behavior: Ignore
+ * (or possible do setPerformState(PerformDisconnect)
+ */
+ TE_ERROR_IN_SELECT_BEFORE_ACCEPT = 0x2,
+
+ /**
+ * TE_INVALID_MESSAGE_LENGTH
+ *
+ * Error found in message (message length)
+ *
+ * Recommended behavior: setPerformState(PerformDisconnect)
+ */
+ TE_INVALID_MESSAGE_LENGTH = 0x3 | TE_DO_DISCONNECT,
+
+ /**
+ * TE_INVALID_CHECKSUM
+ *
+ * Error found in message (checksum)
+ *
+ * Recommended behavior: setPerformState(PerformDisonnect)
+ */
+ TE_INVALID_CHECKSUM = 0x4 | TE_DO_DISCONNECT,
+
+ /**
+ * TE_COULD_NOT_CREATE_SOCKET
+ *
+ * Error found while creating socket
+ *
+ * Recommended behavior: setPerformState(PerformDisonnect)
+ */
+ TE_COULD_NOT_CREATE_SOCKET = 0x5,
+
+ /**
+ * TE_COULD_NOT_BIND_SOCKET
+ *
+ * Error found while binding server socket
+ *
+ * Recommended behavior: setPerformState(PerformDisonnect)
+ */
+ TE_COULD_NOT_BIND_SOCKET = 0x6,
+
+ /**
+ * TE_LISTEN_FAILED
+ *
+ * Error found while listening to server socket
+ *
+ * Recommended behavior: setPerformState(PerformDisonnect)
+ */
+ TE_LISTEN_FAILED = 0x7,
+
+ /**
+ * TE_ACCEPT_RETURN_ERROR
+ *
+ * Error found during accept
+ * The transporter will retry.
+ *
+ * Recommended behavior: Ignore
+ * (or possible do setPerformState(PerformDisconnect)
+ */
+ TE_ACCEPT_RETURN_ERROR = 0x8
+
+ /**
+ * TE_SHM_DISCONNECT
+ *
+ * The remote node has disconnected
+ *
+ * Recommended behavior: setPerformState(PerformDisonnect)
+ */
+ ,TE_SHM_DISCONNECT = 0xb | TE_DO_DISCONNECT
+
+ /**
+ * TE_SHM_IPC_STAT
+ *
+ * Unable to check shm segment
+ * probably because remote node
+ * has disconnected and removed it
+ *
+ * Recommended behavior: setPerformState(PerformDisonnect)
+ */
+ ,TE_SHM_IPC_STAT = 0xc | TE_DO_DISCONNECT
+
+ /**
+ * Permanent error
+ */
+ ,TE_SHM_IPC_PERMANENT = 0x21
+
+ /**
+ * TE_SHM_UNABLE_TO_CREATE_SEGMENT
+ *
+ * Unable to create shm segment
+ * probably os something error
+ *
+ * Recommended behavior: setPerformState(PerformDisonnect)
+ */
+ ,TE_SHM_UNABLE_TO_CREATE_SEGMENT = 0xd
+
+ /**
+ * TE_SHM_UNABLE_TO_ATTACH_SEGMENT
+ *
+ * Unable to attach shm segment
+ * probably invalid group / user
+ *
+ * Recommended behavior: setPerformState(PerformDisonnect)
+ */
+ ,TE_SHM_UNABLE_TO_ATTACH_SEGMENT = 0xe
+
+ /**
+ * TE_SHM_UNABLE_TO_REMOVE_SEGMENT
+ *
+ * Unable to remove shm segment
+ *
+ * Recommended behavior: Ignore (not much to do)
+ * Print warning to logfile
+ */
+ ,TE_SHM_UNABLE_TO_REMOVE_SEGMENT = 0xf
+
+ ,TE_TOO_SMALL_SIGID = 0x10
+ ,TE_TOO_LARGE_SIGID = 0x11
+ ,TE_WAIT_STACK_FULL = 0x12 | TE_DO_DISCONNECT
+ ,TE_RECEIVE_BUFFER_FULL = 0x13 | TE_DO_DISCONNECT
+
+ /**
+ * TE_SIGNAL_LOST_SEND_BUFFER_FULL
+ *
+ * Send buffer is full, and trying to force send fails
+ * a signal is dropped!! very bad very bad
+ *
+ */
+ ,TE_SIGNAL_LOST_SEND_BUFFER_FULL = 0x14 | TE_DO_DISCONNECT
+
+ /**
+ * TE_SIGNAL_LOST
+ *
+ * Send failed for unknown reason
+ * a signal is dropped!! very bad very bad
+ *
+ */
+ ,TE_SIGNAL_LOST = 0x15
+
+ /**
+ * TE_SEND_BUFFER_FULL
+ *
+ * The send buffer was full, but sleeping for a while solved it
+ */
+ ,TE_SEND_BUFFER_FULL = 0x16
+
+ /**
+ * TE_SCI_UNABLE_TO_CLOSE_CHANNEL
+ *
+ * Unable to close the sci channel and the resources allocated by
+ * the sisci api.
+ */
+ ,TE_SCI_UNABLE_TO_CLOSE_CHANNEL = 0x22
+
+ /**
+ * TE_SCI_LINK_ERROR
+ *
+ * There is no link from this node to the switch.
+ * No point in continuing. Must check the connections.
+ * Recommended behavior: setPerformState(PerformDisonnect)
+ */
+ ,TE_SCI_LINK_ERROR = 0x0017
+
+ /**
+ * TE_SCI_UNABLE_TO_START_SEQUENCE
+ *
+ * Could not start a sequence, because system resources
+ * are exumed or no sequence has been created.
+ * Recommended behavior: setPerformState(PerformDisonnect)
+ */
+ ,TE_SCI_UNABLE_TO_START_SEQUENCE = 0x18 | TE_DO_DISCONNECT
+
+ /**
+ * TE_SCI_UNABLE_TO_REMOVE_SEQUENCE
+ *
+ * Could not remove a sequence
+ */
+ ,TE_SCI_UNABLE_TO_REMOVE_SEQUENCE = 0x19 | TE_DO_DISCONNECT
+
+ /**
+ * TE_SCI_UNABLE_TO_CREATE_SEQUENCE
+ *
+ * Could not create a sequence, because system resources are
+ * exempted. Must reboot.
+ * Recommended behavior: setPerformState(PerformDisonnect)
+ */
+ ,TE_SCI_UNABLE_TO_CREATE_SEQUENCE = 0x1a | TE_DO_DISCONNECT
+
+ /**
+ * TE_SCI_UNRECOVERABLE_DATA_TFX_ERROR
+ *
+ * Tried to send data on redundant link but failed.
+ * Recommended behavior: setPerformState(PerformDisonnect)
+ */
+ ,TE_SCI_UNRECOVERABLE_DATA_TFX_ERROR = 0x1b | TE_DO_DISCONNECT
+
+ /**
+ * TE_SCI_CANNOT_INIT_LOCALSEGMENT
+ *
+ * Cannot initialize local segment. A whole lot of things has
+ * gone wrong (no system resources). Must reboot.
+ * Recommended behavior: setPerformState(PerformDisonnect)
+ */
+ ,TE_SCI_CANNOT_INIT_LOCALSEGMENT = 0x1c | TE_DO_DISCONNECT
+
+ /**
+ * TE_SCI_CANNOT_MAP_REMOTESEGMENT
+ *
+ * Cannot map remote segment. No system resources are left.
+ * Must reboot system.
+ * Recommended behavior: setPerformState(PerformDisonnect)
+ */
+ ,TE_SCI_CANNOT_MAP_REMOTESEGMENT = 0x1d | TE_DO_DISCONNECT
+
+ /**
+ * TE_SCI_UNABLE_TO_UNMAP_SEGMENT
+ *
+ * Cannot free the resources used by this segment (step 1).
+ * Recommended behavior: setPerformState(PerformDisonnect)
+ */
+ ,TE_SCI_UNABLE_TO_UNMAP_SEGMENT = 0x1e | TE_DO_DISCONNECT
+
+ /**
+ * TE_SCI_UNABLE_TO_REMOVE_SEGMENT
+ *
+ * Cannot free the resources used by this segment (step 2).
+ * Cannot guarantee that enough resources exist for NDB
+ * to map more segment
+ * Recommended behavior: setPerformState(PerformDisonnect)
+ */
+ ,TE_SCI_UNABLE_TO_REMOVE_SEGMENT = 0x1f | TE_DO_DISCONNECT
+
+ /**
+ * TE_SCI_UNABLE_TO_DISCONNECT_SEGMENT
+ *
+ * Cannot disconnect from a remote segment.
+ * Recommended behavior: setPerformState(PerformDisonnect)
+ */
+ ,TE_SCI_UNABLE_TO_DISCONNECT_SEGMENT = 0x20 | TE_DO_DISCONNECT
-inline
-SegmentedSectionPtrPOD&
-SegmentedSectionPtrPOD::assign(struct SegmentedSectionPtr& src)
-{
- this->i = src.i;
- this->p = src.p;
- this->sz = src.sz;
- return *this;
-}
-
-/* Abstract interface for iterating over
- * words in a section
- */
-struct GenericSectionIterator {
- virtual ~GenericSectionIterator() {};
- virtual void reset()=0;
- virtual const Uint32* getNextWords(Uint32& sz)=0;
+ /* Used 0x21 */
+ /* Used 0x22 */
};
-struct GenericSectionPtr {
- Uint32 sz;
- GenericSectionIterator* sectionIter;
-};
-
-class NdbOut & operator <<(class NdbOut & out, SignalHeader & sh);
-
#endif // Define of TransporterDefinitions_H
=== modified file 'storage/ndb/include/transporter/TransporterRegistry.hpp'
--- a/storage/ndb/include/transporter/TransporterRegistry.hpp 2012-01-04 08:53:15 +0000
+++ b/storage/ndb/include/transporter/TransporterRegistry.hpp 2012-01-16 13:56:30 +0000
@@ -33,7 +33,6 @@
#include <sys/epoll.h>
#endif
#include "TransporterDefinitions.hpp"
-#include "TransporterCallback.hpp"
#include <SocketServer.hpp>
#include <SocketClient.hpp>
@@ -85,6 +84,44 @@ public:
};
/**
+ * TransporterReceiveData
+ *
+ * State for pollReceive/performReceive
+ * Moved into own class to enable multi receive threads
+ */
+struct TransporterReceiveData
+{
+ TransporterReceiveData();
+ ~TransporterReceiveData();
+
+ bool init (unsigned maxTransporters);
+
+ /**
+ * Add a transporter to epoll_set
+ * does nothing if epoll not active
+ */
+ bool epoll_add(TCP_Transporter*);
+
+ /**
+ * Bitmask of transporters that has data "carried over" since
+ * last performReceive
+ */
+ NodeBitmask m_has_data_transporters;
+#if defined(HAVE_EPOLL_CREATE)
+ int m_epoll_fd;
+ struct epoll_event *m_epoll_events;
+ bool change_epoll(TCP_Transporter *t, bool add);
+#endif
+
+ /**
+ * Used in polling if exists TCP_Transporter
+ */
+ ndb_socket_poller m_socket_poller;
+};
+
+#include "TransporterCallback.hpp"
+
+/**
* @class TransporterRegistry
* @brief ...
*/
@@ -98,9 +135,9 @@ public:
* Constructor
*/
TransporterRegistry(TransporterCallback *callback,
+ TransporterReceiveHandle * receiveHandle,
bool use_default_send_buffer = true,
- unsigned maxTransporters = MAX_NTRANSPORTERS,
- unsigned sizeOfLongSignalMemory = 100);
+ unsigned maxTransporters = MAX_NTRANSPORTERS);
/**
* this handle will be used in the client connect thread
@@ -113,6 +150,12 @@ public:
bool init(NodeId localNodeId);
/**
+ * Iff using non-default TransporterReceiveHandle's
+ * they need to get initalized
+ */
+ bool init(TransporterReceiveHandle&);
+
+ /**
Handle the handshaking with a new client connection
on the server port.
NOTE! Connection should be closed if function
@@ -154,7 +197,6 @@ public:
struct NdbThread* start_clients();
bool stop_clients();
void start_clients_thread();
- void update_connections();
/**
* Start/Stop receiving
@@ -188,8 +230,8 @@ public:
void do_connect(NodeId node_id);
void do_disconnect(NodeId node_id, int errnum = 0);
bool is_connected(NodeId node_id) { return performStates[node_id] == CONNECTED; };
- void report_connect(NodeId node_id);
- void report_disconnect(NodeId node_id, int errnum);
+ void report_connect(TransporterReceiveHandle&, NodeId node_id);
+ void report_disconnect(TransporterReceiveHandle&, NodeId node_id, int errnum);
void report_error(NodeId nodeId, TransporterError errorCode,
const char *errorInfo = 0);
@@ -305,15 +347,10 @@ public:
*
*/
void external_IO(Uint32 timeOutMillis);
-
- inline Uint32 pollReceive(Uint32 timeOutMillis) {
- return pollReceive(timeOutMillis, m_has_data_transporters);
- }
- Uint32 pollReceive(Uint32 timeOutMillis, NodeBitmask& mask);
- void performReceive();
+
int performSend(NodeId nodeId);
void performSend();
-
+
/**
* Force sending if more than or equal to sendLimit
* number have asked for send. Returns 0 if not sending
@@ -325,13 +362,6 @@ public:
void printState();
#endif
-#ifdef ERROR_INSERT
- /* Utils for testing latency issues */
- bool isBlocked(NodeId nodeId);
- void blockReceive(NodeId nodeId);
- void unblockReceive(NodeId nodeId);
-#endif
-
class Transporter_interface {
public:
NodeId m_remote_nodeId;
@@ -347,6 +377,7 @@ protected:
private:
TransporterCallback *callbackObj;
+ TransporterReceiveHandle * receiveHandle;
NdbMgmHandle m_mgm_handle;
@@ -369,16 +400,6 @@ private:
#endif
/**
- * Bitmask of transporters that has data "carried over" since
- * last performReceive
- */
- NodeBitmask m_has_data_transporters;
-#if defined(HAVE_EPOLL_CREATE)
- int m_epoll_fd;
- struct epoll_event *m_epoll_events;
- bool change_epoll(TCP_Transporter *t, bool add);
-#endif
- /**
* Arrays holding all transporters in the order they are created
*/
TCP_Transporter** theTCPTransporters;
@@ -407,21 +428,23 @@ private:
* Overloaded bits, for fast check.
*/
NodeBitmask m_status_overloaded;
-
+
/**
* Unpack signal data.
*
* Defined in Packer.cpp.
*/
- Uint32 unpack(Uint32 * readPtr,
- Uint32 bufferSize,
- NodeId remoteNodeId,
- IOState state);
-
- Uint32 * unpack(Uint32 * readPtr,
- Uint32 * eodPtr,
- NodeId remoteNodeId,
- IOState state);
+ Uint32 unpack(TransporterReceiveHandle&,
+ Uint32 * readPtr,
+ Uint32 bufferSize,
+ NodeId remoteNodeId,
+ IOState state);
+
+ Uint32 * unpack(TransporterReceiveHandle&,
+ Uint32 * readPtr,
+ Uint32 * eodPtr,
+ NodeId remoteNodeId,
+ IOState state);
static Uint32 unpack_length_words(const Uint32 *readPtr, Uint32 maxWords);
/**
@@ -431,23 +454,22 @@ private:
* and remove it from theIndexedTransporters array
*/
void removeTransporter(NodeId nodeId);
-
- /**
- * Used in polling if exists TCP_Transporter
- */
- int tcpReadSelectReply;
- ndb_socket_poller m_socket_poller;
- Uint32 poll_TCP(Uint32 timeOutMillis, NodeBitmask&);
- Uint32 poll_SCI(Uint32 timeOutMillis, NodeBitmask&);
- Uint32 poll_SHM(Uint32 timeOutMillis, NodeBitmask&);
+ Uint32 poll_TCP(Uint32 timeOutMillis, TransporterReceiveHandle&);
+ Uint32 poll_SCI(Uint32 timeOutMillis, TransporterReceiveHandle&);
+ Uint32 poll_SHM(Uint32 timeOutMillis, TransporterReceiveHandle&);
int m_shm_own_pid;
int m_transp_count;
public:
- bool setup_wakeup_socket();
+ bool setup_wakeup_socket(TransporterReceiveHandle&);
void wakeup();
+
+ inline bool setup_wakeup_socket() {
+ assert(receiveHandle != 0);
+ return setup_wakeup_socket(* receiveHandle);
+ }
private:
bool m_has_extra_wakeup_socket;
NDB_SOCKET_TYPE m_extra_wakeup_sockets[2];
@@ -528,6 +550,34 @@ public:
void print_transporters(const char* where, NdbOut& out = ndbout);
+ /**
+ * Receiving
+ */
+ Uint32 pollReceive(Uint32 timeOutMillis, TransporterReceiveHandle& mask);
+ void performReceive(TransporterReceiveHandle&);
+ void update_connections(TransporterReceiveHandle&);
+
+ inline Uint32 pollReceive(Uint32 timeOutMillis) {
+ assert(receiveHandle != 0);
+ return pollReceive(timeOutMillis, * receiveHandle);
+ }
+
+ inline void performReceive() {
+ assert(receiveHandle != 0);
+ performReceive(* receiveHandle);
+ }
+
+ inline void update_connections() {
+ assert(receiveHandle != 0);
+ update_connections(* receiveHandle);
+ }
+
+#ifdef ERROR_INSERT
+ /* Utils for testing latency issues */
+ bool isBlocked(NodeId nodeId);
+ void blockReceive(TransporterReceiveHandle&, NodeId nodeId);
+ void unblockReceive(TransporterReceiveHandle&, NodeId nodeId);
+#endif
};
inline void
=== modified file 'storage/ndb/src/common/debugger/EventLogger.cpp'
--- a/storage/ndb/src/common/debugger/EventLogger.cpp 2011-12-13 12:53:03 +0000
+++ b/storage/ndb/src/common/debugger/EventLogger.cpp 2012-01-16 07:12:39 +0000
@@ -18,7 +18,7 @@
#include <ndb_global.h>
#include <EventLogger.hpp>
-#include <TransporterCallback.hpp>
+#include <TransporterDefinitions.hpp>
#include <NdbConfig.h>
#include <kernel/BlockNumbers.h>
=== modified file 'storage/ndb/src/common/debugger/SignalLoggerManager.cpp'
--- a/storage/ndb/src/common/debugger/SignalLoggerManager.cpp 2011-10-21 08:59:23 +0000
+++ b/storage/ndb/src/common/debugger/SignalLoggerManager.cpp 2012-01-16 07:14:30 +0000
@@ -18,7 +18,7 @@
#include <ndb_global.h>
#include "SignalLoggerManager.hpp"
-#include <LongSignal.hpp>
+#include "TransporterDefinitions.hpp"
#include <GlobalSignalNumbers.h>
#include <DebuggerNames.hpp>
#include <NdbTick.h>
=== modified file 'storage/ndb/src/common/transporter/Packer.cpp'
--- a/storage/ndb/src/common/transporter/Packer.cpp 2011-06-30 15:59:25 +0000
+++ b/storage/ndb/src/common/transporter/Packer.cpp 2012-01-16 08:42:18 +0000
@@ -29,10 +29,11 @@ Uint32 MAX_RECEIVED_SIGNALS = 1024;
#endif
Uint32
-TransporterRegistry::unpack(Uint32 * readPtr,
- Uint32 sizeOfData,
- NodeId remoteNodeId,
- IOState state) {
+TransporterRegistry::unpack(TransporterReceiveHandle & recvHandle,
+ Uint32 * readPtr,
+ Uint32 sizeOfData,
+ NodeId remoteNodeId,
+ IOState state) {
SignalHeader signalHeader;
LinearSectionPtr ptr[3];
@@ -112,7 +113,7 @@ TransporterRegistry::unpack(Uint32 * rea
sectionData += sz;
}
- callbackObj->deliver_signal(&signalHeader, prio, signalData, ptr);
+ recvHandle.deliver_signal(&signalHeader, prio, signalData, ptr);
readPtr += messageLen32;
sizeOfData -= messageLenBytes;
@@ -198,7 +199,7 @@ TransporterRegistry::unpack(Uint32 * rea
sectionData += sz;
}
- callbackObj->deliver_signal(&signalHeader, prio, signalData, ptr);
+ recvHandle.deliver_signal(&signalHeader, prio, signalData, ptr);
} else {
DEBUG("prepareReceive(...) - Discarding message to block: "
<< rBlockNum << " from Node: " << remoteNodeId);
@@ -215,10 +216,11 @@ TransporterRegistry::unpack(Uint32 * rea
}
Uint32 *
-TransporterRegistry::unpack(Uint32 * readPtr,
- Uint32 * eodPtr,
- NodeId remoteNodeId,
- IOState state) {
+TransporterRegistry::unpack(TransporterReceiveHandle & recvHandle,
+ Uint32 * readPtr,
+ Uint32 * eodPtr,
+ NodeId remoteNodeId,
+ IOState state) {
SignalHeader signalHeader;
LinearSectionPtr ptr[3];
Uint32 loop_count = 0;
@@ -289,7 +291,7 @@ TransporterRegistry::unpack(Uint32 * rea
sectionData += sz;
}
- callbackObj->deliver_signal(&signalHeader, prio, signalData, ptr);
+ recvHandle.deliver_signal(&signalHeader, prio, signalData, ptr);
readPtr += messageLen32;
}//while
@@ -366,7 +368,7 @@ TransporterRegistry::unpack(Uint32 * rea
sectionData += sz;
}
- callbackObj->deliver_signal(&signalHeader, prio, signalData, ptr);
+ recvHandle.deliver_signal(&signalHeader, prio, signalData, ptr);
} else {
DEBUG("prepareReceive(...) - Discarding message to block: "
<< rBlockNum << " from Node: " << remoteNodeId);
=== modified file 'storage/ndb/src/common/transporter/TCP_Transporter.cpp'
--- a/storage/ndb/src/common/transporter/TCP_Transporter.cpp 2011-06-30 15:59:25 +0000
+++ b/storage/ndb/src/common/transporter/TCP_Transporter.cpp 2012-01-16 08:42:18 +0000
@@ -386,7 +386,8 @@ ok:
}
int
-TCP_Transporter::doReceive() {
+TCP_Transporter::doReceive(TransporterReceiveHandle& recvdata)
+{
// Select-function must return the socket for read
// before this method is called
// It reads the external TCP/IP interface once
@@ -417,8 +418,8 @@ TCP_Transporter::doReceive() {
receiveSize += nBytesRead;
if(receiveCount == reportFreq){
- get_callback_obj()->reportReceiveLen(remoteNodeId,
- receiveCount, receiveSize);
+ recvdata.reportReceiveLen(remoteNodeId,
+ receiveCount, receiveSize);
receiveCount = 0;
receiveSize = 0;
}
=== modified file 'storage/ndb/src/common/transporter/TCP_Transporter.hpp'
--- a/storage/ndb/src/common/transporter/TCP_Transporter.hpp 2011-02-01 23:27:25 +0000
+++ b/storage/ndb/src/common/transporter/TCP_Transporter.hpp 2012-01-16 08:42:18 +0000
@@ -43,6 +43,7 @@ struct ReceiveBuffer {
};
class TCP_Transporter : public Transporter {
+ friend struct TransporterReceiveData;
friend class TransporterRegistry;
friend class Loopback_Transporter;
private:
@@ -69,7 +70,7 @@ private:
* It reads the external TCP/IP interface once
* and puts the data in the receiveBuffer
*/
- int doReceive();
+ int doReceive(TransporterReceiveHandle&);
/**
* Returns socket (used for select)
=== modified file 'storage/ndb/src/common/transporter/TransporterRegistry.cpp'
--- a/storage/ndb/src/common/transporter/TransporterRegistry.cpp 2011-12-10 18:54:55 +0000
+++ b/storage/ndb/src/common/transporter/TransporterRegistry.cpp 2012-01-17 07:57:37 +0000
@@ -74,10 +74,120 @@ SocketServer::Session * TransporterServi
DBUG_RETURN(0);
}
+TransporterReceiveData::TransporterReceiveData()
+{
+#if defined(HAVE_EPOLL_CREATE)
+ m_epoll_fd = -1;
+ m_epoll_events = 0;
+#endif
+}
+
+bool
+TransporterReceiveData::init(unsigned maxTransporters)
+{
+ maxTransporters += 1; /* wakeup socket */
+#if defined(HAVE_EPOLL_CREATE)
+ m_epoll_fd = epoll_create(maxTransporters);
+ if (m_epoll_fd == -1)
+ {
+ perror("epoll_create failed... falling back to select!");
+ goto fallback;
+ }
+ m_epoll_events = new struct epoll_event[maxTransporters];
+ if (m_epoll_events == 0)
+ {
+ perror("Failed to alloc epoll-array... falling back to select!");
+ close(m_epoll_fd);
+ m_epoll_fd = -1;
+ goto fallback;
+ }
+ bzero(m_epoll_events, maxTransporters * sizeof(struct epoll_event));
+ return true;
+fallback:
+#endif
+ return m_socket_poller.set_max_count(maxTransporters);
+}
+
+bool
+TransporterReceiveData::epoll_add(TCP_Transporter *t)
+{
+#if defined(HAVE_EPOLL_CREATE)
+ if (m_epoll_fd != -1)
+ {
+ bool add = true;
+ struct epoll_event event_poll;
+ bzero(&event_poll, sizeof(event_poll));
+ NDB_SOCKET_TYPE sock_fd = t->getSocket();
+ int node_id = t->getRemoteNodeId();
+ int op = EPOLL_CTL_ADD;
+ int ret_val, error;
+
+ if (!my_socket_valid(sock_fd))
+ return FALSE;
+
+ event_poll.data.u32 = t->getRemoteNodeId();
+ event_poll.events = EPOLLIN;
+ ret_val = epoll_ctl(m_epoll_fd, op, sock_fd.fd, &event_poll);
+ if (!ret_val)
+ goto ok;
+ error= errno;
+ if (error == ENOENT && !add)
+ {
+ /*
+ * Could be that socket was closed premature to this call.
+ * Not a problem that this occurs.
+ */
+ goto ok;
+ }
+ if (!add || (add && (error != ENOMEM)))
+ {
+ /*
+ * Serious problems, we are either using wrong parameters,
+ * have permission problems or the socket doesn't support
+ * epoll!!
+ */
+ ndbout_c("Failed to %s epollfd: %u fd " MY_SOCKET_FORMAT
+ " node %u to epoll-set,"
+ " errno: %u %s",
+ add ? "ADD" : "DEL",
+ m_epoll_fd,
+ MY_SOCKET_FORMAT_VALUE(sock_fd),
+ node_id,
+ error,
+ strerror(error));
+ abort();
+ }
+ ndbout << "We lacked memory to add the socket for node id ";
+ ndbout << node_id << endl;
+ return false;
+ }
+
+ok:
+#endif
+ return true;
+}
+
+TransporterReceiveData::~TransporterReceiveData()
+{
+#if defined(HAVE_EPOLL_CREATE)
+ if (m_epoll_fd != -1)
+ {
+ close(m_epoll_fd);
+ m_epoll_fd = -1;
+ }
+
+ if (m_epoll_events)
+ {
+ delete [] m_epoll_events;
+ m_epoll_events = 0;
+ }
+#endif
+}
+
TransporterRegistry::TransporterRegistry(TransporterCallback *callback,
+ TransporterReceiveHandle * recvHandle,
bool use_default_send_buffer,
- unsigned _maxTransporters,
- unsigned sizeOfLongSignalMemory) :
+ unsigned _maxTransporters) :
m_mgm_handle(0),
localNodeId(0),
m_transp_count(0),
@@ -87,6 +197,7 @@ TransporterRegistry::TransporterRegistry
{
DBUG_ENTER("TransporterRegistry::TransporterRegistry");
+ receiveHandle = recvHandle;
maxTransporters = _maxTransporters;
sendCounter = 1;
@@ -101,34 +212,9 @@ TransporterRegistry::TransporterRegistry
ioStates = new IOState [maxTransporters];
m_disconnect_errnum = new int [maxTransporters];
m_error_states = new ErrorState [maxTransporters];
-
+
m_has_extra_wakeup_socket = false;
-#if defined(HAVE_EPOLL_CREATE)
- m_epoll_fd = -1;
- m_epoll_events = new struct epoll_event[maxTransporters];
- m_epoll_fd = epoll_create(maxTransporters);
- if (m_epoll_fd == -1 || !m_epoll_events)
- {
- /* Failure to allocate data or get epoll socket, abort */
- perror("Failed to alloc epoll-array or calling epoll_create... falling back to select!");
- if (m_epoll_fd != -1)
- {
- close(m_epoll_fd);
- m_epoll_fd = -1;
- }
- if (m_epoll_events)
- {
- delete [] m_epoll_events;
- m_epoll_events = 0;
- }
- }
- else
- {
- memset((char*)m_epoll_events, 0,
- maxTransporters * sizeof(struct epoll_event));
- }
-#endif
#ifdef ERROR_INSERT
m_blocked.clear();
m_blocked_with_data.clear();
@@ -256,10 +342,6 @@ TransporterRegistry::~TransporterRegistr
if (m_send_buffer_memory)
delete[] m_send_buffer_memory;
-#if defined(HAVE_EPOLL_CREATE)
- if (m_epoll_events) delete [] m_epoll_events;
- if (m_epoll_fd != -1) close(m_epoll_fd);
-#endif
if (m_mgm_handle)
ndb_mgm_destroy_handle(&m_mgm_handle);
@@ -298,14 +380,22 @@ TransporterRegistry::init(NodeId nodeId)
DEBUG("TransporterRegistry started node: " << localNodeId);
- if (!m_socket_poller.set_max_count(maxTransporters +
- 1 /* wakeup socket */))
- DBUG_RETURN(false);
+ if (receiveHandle)
+ {
+ if (!receiveHandle->init(maxTransporters))
+ DBUG_RETURN(false);
+ }
DBUG_RETURN(true);
}
bool
+TransporterRegistry::init(TransporterReceiveHandle& recvhandle)
+{
+ return recvhandle.init(maxTransporters);
+}
+
+bool
TransporterRegistry::connect_server(NDB_SOCKET_TYPE sockfd,
BaseString & msg) const
{
@@ -885,15 +975,17 @@ TransporterRegistry::external_IO(Uint32
// followed by the receive part where we expect to sleep for
// a while.
//-----------------------------------------------------------
- if(pollReceive(timeOutMillis)){
- performReceive();
+ if(pollReceive(timeOutMillis, * receiveHandle)){
+ performReceive(* receiveHandle);
}
performSend();
}
bool
-TransporterRegistry::setup_wakeup_socket()
+TransporterRegistry::setup_wakeup_socket(TransporterReceiveHandle& recvdata)
{
+ assert((receiveHandle == &recvdata) || (receiveHandle == 0));
+
if (m_has_extra_wakeup_socket)
{
return true;
@@ -912,14 +1004,15 @@ TransporterRegistry::setup_wakeup_socket
}
#if defined(HAVE_EPOLL_CREATE)
- if (m_epoll_fd != -1)
+ if (recvdata.m_epoll_fd != -1)
{
int sock = m_extra_wakeup_sockets[0].fd;
struct epoll_event event_poll;
bzero(&event_poll, sizeof(event_poll));
event_poll.data.u32 = 0;
event_poll.events = EPOLLIN;
- int ret_val = epoll_ctl(m_epoll_fd, EPOLL_CTL_ADD, sock, &event_poll);
+ int ret_val = epoll_ctl(recvdata.m_epoll_fd, EPOLL_CTL_ADD, sock,
+ &event_poll);
if (ret_val != 0)
{
int error= errno;
@@ -953,15 +1046,17 @@ TransporterRegistry::wakeup()
Uint32
TransporterRegistry::pollReceive(Uint32 timeOutMillis,
- NodeBitmask& mask)
+ TransporterReceiveHandle& recvdata)
{
+ assert((receiveHandle == &recvdata) || (receiveHandle == 0));
+
Uint32 retVal = 0;
/**
* If any transporters have left-over data that was not fully executed in
* last loop, don't wait and return 'data available' even if nothing new
*/
- if (!mask.isclear())
+ if (!recvdata.m_has_data_transporters.isclear())
{
timeOutMillis = 0;
retVal = 1;
@@ -975,7 +1070,7 @@ TransporterRegistry::pollReceive(Uint32
#ifdef NDB_SHM_TRANSPORTER
if (nSHMTransporters > 0)
{
- Uint32 res = poll_SHM(0, mask);
+ Uint32 res = poll_SHM(0, recvdata);
if(res)
{
retVal |= res;
@@ -986,13 +1081,15 @@ TransporterRegistry::pollReceive(Uint32
#ifdef NDB_TCP_TRANSPORTER
#if defined(HAVE_EPOLL_CREATE)
- if (likely(m_epoll_fd != -1))
+ if (likely(recvdata.m_epoll_fd != -1))
{
+ int tcpReadSelectReply = 0;
Uint32 num_trps = nTCPTransporters + (m_has_extra_wakeup_socket ? 1 : 0);
-
+
if (num_trps)
{
- tcpReadSelectReply = epoll_wait(m_epoll_fd, m_epoll_events,
+ tcpReadSelectReply = epoll_wait(recvdata.m_epoll_fd,
+ recvdata.m_epoll_events,
num_trps, timeOutMillis);
retVal |= tcpReadSelectReply;
}
@@ -1002,7 +1099,7 @@ TransporterRegistry::pollReceive(Uint32
{
for (int i = 0; i < num_socket_events; i++)
{
- const Uint32 trpid = m_epoll_events[i].data.u32;
+ const Uint32 trpid = recvdata.m_epoll_events[i].data.u32;
#ifdef ERROR_INSERT
if (m_blocked.get(trpid))
{
@@ -1011,7 +1108,7 @@ TransporterRegistry::pollReceive(Uint32
continue;
}
#endif
- mask.set(trpid);
+ recvdata.m_has_data_transporters.set(trpid);
}
}
else if (num_socket_events < 0)
@@ -1024,20 +1121,18 @@ TransporterRegistry::pollReceive(Uint32
{
if (nTCPTransporters > 0 || m_has_extra_wakeup_socket)
{
- retVal |= poll_TCP(timeOutMillis, mask);
+ retVal |= poll_TCP(timeOutMillis, recvdata);
}
- else
- tcpReadSelectReply = 0;
}
#endif
#ifdef NDB_SCI_TRANSPORTER
if (nSCITransporters > 0)
- retVal |= poll_SCI(timeOutMillis, mask);
+ retVal |= poll_SCI(timeOutMillis, recvdata);
#endif
#ifdef NDB_SHM_TRANSPORTER
if (nSHMTransporters > 0)
{
- int res = poll_SHM(0, mask);
+ int res = poll_SHM(0, recvdata);
retVal |= res;
}
#endif
@@ -1047,8 +1142,11 @@ TransporterRegistry::pollReceive(Uint32
#ifdef NDB_SCI_TRANSPORTER
Uint32
-TransporterRegistry::poll_SCI(Uint32 timeOutMillis, NodeBitmask& mask)
+TransporterRegistry::poll_SCI(Uint32 timeOutMillis,
+ TransporterReceiveHandle& recvdata)
{
+ assert((receiveHandle == &recvdata) || (receiveHandle == 0));
+
Uint32 retVal = 0;
for (int i = 0; i < nSCITransporters; i++)
{
@@ -1058,7 +1156,7 @@ TransporterRegistry::poll_SCI(Uint32 tim
{
if (t->hasDataToRead())
{
- mask.set(node_id);
+ recvdata.m_has_data_transporters.set(node_id);
retVal = 1;
}
}
@@ -1071,8 +1169,11 @@ TransporterRegistry::poll_SCI(Uint32 tim
#ifdef NDB_SHM_TRANSPORTER
static int g_shm_counter = 0;
Uint32
-TransporterRegistry::poll_SHM(Uint32 timeOutMillis, NodeBitmask& mask)
-{
+TransporterRegistry::poll_SHM(Uint32 timeOutMillis,
+ TransporterReceiveHandle& recvdata)
+{
+ assert((receiveHandle == &recvdata) || (receiveHandle == 0));
+
Uint32 retVal = 0;
for (int j = 0; j < 100; j++)
{
@@ -1085,7 +1186,7 @@ TransporterRegistry::poll_SHM(Uint32 tim
if (t->hasDataToRead())
{
j = 100;
- mask.set(node_id);
+ recvdata.m_has_data_transporters.set(node_id);
retVal = 1;
}
}
@@ -1104,17 +1205,20 @@ TransporterRegistry::poll_SHM(Uint32 tim
* socket, which will be handled correctly in performReceive() (which _is_
* protected by transporter locks on upper layer).
*/
-Uint32
-TransporterRegistry::poll_TCP(Uint32 timeOutMillis, NodeBitmask& mask)
+Uint32
+TransporterRegistry::poll_TCP(Uint32 timeOutMillis,
+ TransporterReceiveHandle& recvdata)
{
- m_socket_poller.clear();
+ assert((receiveHandle == &recvdata) || (receiveHandle == 0));
+
+ recvdata.m_socket_poller.clear();
if (m_has_extra_wakeup_socket)
{
const NDB_SOCKET_TYPE socket = m_extra_wakeup_sockets[0];
// Poll the wakup-socket for read
- m_socket_poller.add(socket, true, false, false);
+ recvdata.m_socket_poller.add(socket, true, false, false);
}
Uint16 idx[MAX_NODES];
@@ -1126,7 +1230,7 @@ TransporterRegistry::poll_TCP(Uint32 tim
if (is_connected(node_id) && t->isConnected() && my_socket_valid(socket))
{
- idx[i] = m_socket_poller.add(socket, true, false, false);
+ idx[i] = recvdata.m_socket_poller.add(socket, true, false, false);
}
else
{
@@ -1134,14 +1238,14 @@ TransporterRegistry::poll_TCP(Uint32 tim
}
}
- tcpReadSelectReply = m_socket_poller.poll_unsafe(timeOutMillis);
+ int tcpReadSelectReply = recvdata.m_socket_poller.poll_unsafe(timeOutMillis);
if (tcpReadSelectReply > 0)
{
if (m_extra_wakeup_sockets)
{
- if (m_socket_poller.has_read(0))
- mask.set((Uint32)0);
+ if (recvdata.m_socket_poller.has_read(0))
+ recvdata.m_has_data_transporters.set((Uint32)0);
}
for (int i = 0; i < nTCPTransporters; i++)
@@ -1158,8 +1262,8 @@ TransporterRegistry::poll_TCP(Uint32 tim
continue;
}
#endif
- if (m_socket_poller.has_read(idx[i]))
- mask.set(node_id);
+ if (recvdata.m_socket_poller.has_read(idx[i]))
+ recvdata.m_has_data_transporters.set(node_id);
}
}
}
@@ -1168,80 +1272,26 @@ TransporterRegistry::poll_TCP(Uint32 tim
}
#endif
-#if defined(HAVE_EPOLL_CREATE)
-bool
-TransporterRegistry::change_epoll(TCP_Transporter *t, bool add)
-{
- struct epoll_event event_poll;
- bzero(&event_poll, sizeof(event_poll));
- NDB_SOCKET_TYPE sock_fd = t->getSocket();
- int node_id = t->getRemoteNodeId();
- int op = add ? EPOLL_CTL_ADD : EPOLL_CTL_DEL;
- int ret_val, error;
-
- if (!my_socket_valid(sock_fd))
- return FALSE;
-
- event_poll.data.u32 = t->getRemoteNodeId();
- event_poll.events = EPOLLIN;
- ret_val = epoll_ctl(m_epoll_fd, op, sock_fd.fd, &event_poll);
- if (!ret_val)
- goto ok;
- error= errno;
- if (error == ENOENT && !add)
- {
- /*
- * Could be that socket was closed premature to this call.
- * Not a problem that this occurs.
- */
- goto ok;
- }
- if (!add || (add && (error != ENOMEM)))
- {
- /*
- * Serious problems, we are either using wrong parameters,
- * have permission problems or the socket doesn't support
- * epoll!!
- */
- ndbout_c("Failed to %s epollfd: %u fd " MY_SOCKET_FORMAT
- " node %u to epoll-set,"
- " errno: %u %s",
- add ? "ADD" : "DEL",
- m_epoll_fd,
- MY_SOCKET_FORMAT_VALUE(sock_fd),
- node_id,
- error,
- strerror(error));
- abort();
- }
- ndbout << "We lacked memory to add the socket for node id ";
- ndbout << node_id << endl;
- return TRUE;
-
-ok:
- return FALSE;
-}
-
-#endif
-
/**
* In multi-threaded cases, this must be protected by a global receive lock.
*/
void
-TransporterRegistry::performReceive()
+TransporterRegistry::performReceive(TransporterReceiveHandle& recvdata)
{
+ assert((receiveHandle == &recvdata) || (receiveHandle == 0));
+
bool hasReceived = false;
- if (m_has_data_transporters.get(0))
+ if (recvdata.m_has_data_transporters.get(0))
{
- m_has_data_transporters.clear(Uint32(0));
+ recvdata.m_has_data_transporters.clear(Uint32(0));
consume_extra_sockets();
}
#ifdef ERROR_INSERT
if (!m_blocked.isclear())
{
- if (m_has_data_transporters.isclear())
+ if (recvdata.m_has_data_transporters.isclear())
{
/* poll sees data, but we want to ignore for now
* sleep a little to avoid busy loop
@@ -1253,7 +1303,7 @@ TransporterRegistry::performReceive()
#ifdef NDB_TCP_TRANSPORTER
Uint32 id = 0;
- while ((id = m_has_data_transporters.find(id + 1)) != BitmaskImpl::NotFound)
+ while ((id = recvdata.m_has_data_transporters.find(id + 1)) != BitmaskImpl::NotFound)
{
bool hasdata = false;
TCP_Transporter * t = (TCP_Transporter*)theTransporters[id];
@@ -1261,20 +1311,20 @@ TransporterRegistry::performReceive()
{
if (t->isConnected())
{
- t->doReceive();
+ t->doReceive(recvdata);
if (hasReceived)
- callbackObj->checkJobBuffer();
+ recvdata.checkJobBuffer();
hasReceived = true;
Uint32 * ptr;
Uint32 sz = t->getReceiveData(&ptr);
- callbackObj->transporter_recv_from(id);
- Uint32 szUsed = unpack(ptr, sz, id, ioStates[id]);
+ recvdata.transporter_recv_from(id);
+ Uint32 szUsed = unpack(recvdata, ptr, sz, id, ioStates[id]);
t->updateReceiveDataPtr(szUsed);
hasdata = t->hasReceiveData();
}
}
// If transporter still have data, make sure that it's remember to next time
- m_has_data_transporters.set(id, hasdata);
+ recvdata.m_has_data_transporters.set(id, hasdata);
}
#endif
@@ -1310,12 +1360,13 @@ TransporterRegistry::performReceive()
if(t->isConnected() && t->checkConnected())
{
if (hasReceived)
- callbackObj->checkJobBuffer();
+ recvdata.checkJobBuffer();
hasReceived = true;
Uint32 * readPtr, * eodPtr;
t->getReceivePtr(&readPtr, &eodPtr);
- callbackObj->transporter_recv_from(nodeId);
- Uint32 *newPtr = unpack(readPtr, eodPtr, nodeId, ioStates[nodeId]);
+ recvdata.transporter_recv_from(nodeId);
+ Uint32 *newPtr = unpack(recvdata,
+ readPtr, eodPtr, nodeId, ioStates[nodeId]);
t->updateReceivePtr(newPtr);
}
}
@@ -1452,8 +1503,11 @@ TransporterRegistry::isBlocked(NodeId no
}
void
-TransporterRegistry::blockReceive(NodeId nodeId)
+TransporterRegistry::blockReceive(TransporterReceiveHandle& recvdata,
+ NodeId nodeId)
{
+ assert((receiveHandle == &recvdata) || (receiveHandle == 0));
+
/* Check that node is not already blocked?
* Stop pulling from its socket (but track received data etc)
*/
@@ -1462,29 +1516,32 @@ TransporterRegistry::blockReceive(NodeId
m_blocked.set(nodeId);
- if (m_has_data_transporters.get(nodeId))
+ if (recvdata.m_has_data_transporters.get(nodeId))
{
assert(!m_blocked_with_data.get(nodeId));
m_blocked_with_data.set(nodeId);
- m_has_data_transporters.clear(nodeId);
+ recvdata.m_has_data_transporters.clear(nodeId);
}
}
void
-TransporterRegistry::unblockReceive(NodeId nodeId)
+TransporterRegistry::unblockReceive(TransporterReceiveHandle& recvdata,
+ NodeId nodeId)
{
+ assert((receiveHandle == &recvdata) || (receiveHandle == 0));
+
/* Check that node is blocked?
* Resume pulling from its socket
* Ensure in-flight data is processed if there was some
*/
assert(m_blocked.get(nodeId));
- assert(!m_has_data_transporters.get(nodeId));
+ assert(!recvdata.m_has_data_transporters.get(nodeId));
m_blocked.clear(nodeId);
if (m_blocked_with_data.get(nodeId))
{
- m_has_data_transporters.set(nodeId);
+ recvdata.m_has_data_transporters.set(nodeId);
}
if (m_blocked_disconnected.get(nodeId))
@@ -1492,7 +1549,7 @@ TransporterRegistry::unblockReceive(Node
/* Process disconnect notification/handling now */
m_blocked_disconnected.clear(nodeId);
- report_disconnect(nodeId, m_disconnect_errors[nodeId]);
+ report_disconnect(recvdata, nodeId, m_disconnect_errors[nodeId]);
}
}
#endif
@@ -1581,8 +1638,11 @@ TransporterRegistry::do_disconnect(NodeI
}
void
-TransporterRegistry::report_connect(NodeId node_id)
+TransporterRegistry::report_connect(TransporterReceiveHandle& recvdata,
+ NodeId node_id)
{
+ assert((receiveHandle == &recvdata) || (receiveHandle == 0));
+
DBUG_ENTER("TransporterRegistry::report_connect");
DBUG_PRINT("info",("performStates[%d]=CONNECTED",node_id));
@@ -1595,25 +1655,27 @@ TransporterRegistry::report_connect(Node
*/
callbackObj->reset_send_buffer(node_id, true);
- performStates[node_id] = CONNECTED;
-#if defined(HAVE_EPOLL_CREATE)
- if (likely(m_epoll_fd != -1))
+ if (recvdata.epoll_add((TCP_Transporter*)theTransporters[node_id]))
{
- if (change_epoll((TCP_Transporter*)theTransporters[node_id],
- TRUE))
- {
- performStates[node_id] = DISCONNECTING;
- DBUG_VOID_RETURN;
- }
+ performStates[node_id] = CONNECTED;
+ recvdata.reportConnect(node_id);
+ DBUG_VOID_RETURN;
}
-#endif
- callbackObj->reportConnect(node_id);
+
+ /**
+ * Failed to add to epoll_set...
+ * disconnect it (this is really really bad)
+ */
+ performStates[node_id] = DISCONNECTING;
DBUG_VOID_RETURN;
}
void
-TransporterRegistry::report_disconnect(NodeId node_id, int errnum)
+TransporterRegistry::report_disconnect(TransporterReceiveHandle& recvdata,
+ NodeId node_id, int errnum)
{
+ assert((receiveHandle == &recvdata) || (receiveHandle == 0));
+
DBUG_ENTER("TransporterRegistry::report_disconnect");
DBUG_PRINT("info",("performStates[%d]=DISCONNECTED",node_id));
@@ -1630,8 +1692,8 @@ TransporterRegistry::report_disconnect(N
#endif
performStates[node_id] = DISCONNECTED;
- m_has_data_transporters.clear(node_id);
- callbackObj->reportDisconnect(node_id, errnum);
+ recvdata.m_has_data_transporters.clear(node_id);
+ recvdata.reportDisconnect(node_id, errnum);
DBUG_VOID_RETURN;
}
@@ -1660,8 +1722,10 @@ TransporterRegistry::report_error(NodeId
* connect and disconnect.
*/
void
-TransporterRegistry::update_connections()
+TransporterRegistry::update_connections(TransporterReceiveHandle& recvdata)
{
+ assert((receiveHandle == &recvdata) || (receiveHandle == 0));
+
for (int i= 0, n= 0; n < nTransporters; i++){
Transporter * t = theTransporters[i];
if (!t)
@@ -1674,7 +1738,7 @@ TransporterRegistry::update_connections(
const char *info = m_error_states[nodeId].m_info;
if (code != TE_NO_ERROR && info != (const char *)~(UintPtr)0)
{
- callbackObj->reportError(nodeId, code, info);
+ recvdata.reportError(nodeId, code, info);
m_error_states[nodeId].m_code = TE_NO_ERROR;
m_error_states[nodeId].m_info = (const char *)~(UintPtr)0;
}
@@ -1685,11 +1749,11 @@ TransporterRegistry::update_connections(
break;
case CONNECTING:
if(t->isConnected())
- report_connect(nodeId);
+ report_connect(recvdata, nodeId);
break;
case DISCONNECTING:
if(!t->isConnected())
- report_disconnect(nodeId, m_disconnect_errnum[nodeId]);
+ report_disconnect(recvdata, nodeId, m_disconnect_errnum[nodeId]);
break;
}
}
=== modified file 'storage/ndb/src/kernel/blocks/cmvmi/Cmvmi.cpp'
--- a/storage/ndb/src/kernel/blocks/cmvmi/Cmvmi.cpp 2012-01-04 13:41:41 +0000
+++ b/storage/ndb/src/kernel/blocks/cmvmi/Cmvmi.cpp 2012-01-17 07:57:37 +0000
@@ -23,6 +23,7 @@
#include <NdbMem.h>
#include <NdbTick.h>
+#include <TransporterRegistry.hpp>
#include <SignalLoggerManager.hpp>
#include <FastScheduler.hpp>
=== modified file 'storage/ndb/src/kernel/blocks/dblqh/DblqhMain.cpp'
--- a/storage/ndb/src/kernel/blocks/dblqh/DblqhMain.cpp 2012-01-12 13:34:04 +0000
+++ b/storage/ndb/src/kernel/blocks/dblqh/DblqhMain.cpp 2012-01-17 07:57:37 +0000
@@ -80,6 +80,12 @@
#include "../suma/Suma.hpp"
#include "DblqhCommon.hpp"
+/**
+ * overload handling...
+ * TODO: cleanup...from all sorts of perspective
+ */
+#include <TransporterRegistry.hpp>
+
#include <EventLogger.hpp>
extern EventLogger * g_eventLogger;
=== modified file 'storage/ndb/src/kernel/blocks/dbtc/DbtcMain.cpp'
--- a/storage/ndb/src/kernel/blocks/dbtc/DbtcMain.cpp 2011-12-13 18:32:26 +0000
+++ b/storage/ndb/src/kernel/blocks/dbtc/DbtcMain.cpp 2012-01-17 07:57:37 +0000
@@ -88,6 +88,8 @@
#include <signaldata/TransIdAI.hpp>
#include <signaldata/CreateTab.hpp>
+#include <TransporterRegistry.hpp> // error 8035
+
// Use DEBUG to print messages that should be
// seen only when we debug the product
#ifdef VM_TRACE
=== modified file 'storage/ndb/src/kernel/blocks/qmgr/QmgrMain.cpp'
--- a/storage/ndb/src/kernel/blocks/qmgr/QmgrMain.cpp 2012-01-11 13:29:52 +0000
+++ b/storage/ndb/src/kernel/blocks/qmgr/QmgrMain.cpp 2012-01-17 07:57:37 +0000
@@ -42,6 +42,8 @@
#include <signaldata/DihRestart.hpp>
#include <ndb_version.h>
+#include <TransporterRegistry.hpp> // Get connect address
+
#include <EventLogger.hpp>
extern EventLogger * g_eventLogger;
=== modified file 'storage/ndb/src/kernel/blocks/trpman.cpp'
--- a/storage/ndb/src/kernel/blocks/trpman.cpp 2012-01-11 15:43:32 +0000
+++ b/storage/ndb/src/kernel/blocks/trpman.cpp 2012-01-16 13:56:30 +0000
@@ -50,6 +50,9 @@ BLOCK_FUNCTIONS(Trpman)
#ifdef ERROR_INSERT
static NodeBitmask c_error_9000_nodes_mask;
extern Uint32 MAX_RECEIVED_SIGNALS;
+
+class TransporterReceiveHandle *
+mt_get_trp_receive_handle(unsigned instance);
#endif
bool
@@ -524,6 +527,8 @@ Trpman::execDUMP_STATE_ORD(Signal* signa
(arg == 9992)) /* Block recv from nodeid */
{
bool block = (arg == 9992);
+ TransporterReceiveHandle * recvdata = mt_get_trp_receive_handle(instance());
+ assert(recvdata != 0);
for (Uint32 n = 1; n < signal->getLength(); n++)
{
Uint32 nodeId = signal->theData[n];
@@ -536,14 +541,13 @@ Trpman::execDUMP_STATE_ORD(Signal* signa
if (block)
{
ndbout_c("TRPMAN : Blocking receive from node %u", nodeId);
-
- globalTransporterRegistry.blockReceive(nodeId);
+ globalTransporterRegistry.blockReceive(*recvdata, nodeId);
}
else
{
ndbout_c("TRPMAN : Unblocking receive from node %u", nodeId);
- globalTransporterRegistry.unblockReceive(nodeId);
+ globalTransporterRegistry.unblockReceive(*recvdata, nodeId);
}
}
else
@@ -563,6 +567,8 @@ Trpman::execDUMP_STATE_ORD(Signal* signa
((pattern == 1)? "Other side":"Unknown"));
}
+ TransporterReceiveHandle * recvdata = mt_get_trp_receive_handle(instance());
+ assert(recvdata != 0);
for (Uint32 node = 1; node < MAX_NDB_NODES; node++)
{
if (!handles_this_node(node))
@@ -592,7 +598,7 @@ Trpman::execDUMP_STATE_ORD(Signal* signa
break;
}
ndbout_c("TRPMAN : Blocking receive from node %u", node);
- globalTransporterRegistry.blockReceive(node);
+ globalTransporterRegistry.blockReceive(*recvdata, node);
}
}
}
@@ -600,6 +606,8 @@ Trpman::execDUMP_STATE_ORD(Signal* signa
}
if (arg == 9991) /* Unblock recv from all blocked */
{
+ TransporterReceiveHandle * recvdata = mt_get_trp_receive_handle(instance());
+ assert(recvdata != 0);
for (Uint32 node = 1; node < MAX_NODES; node++)
{
if (!handles_this_node(node))
@@ -607,7 +615,7 @@ Trpman::execDUMP_STATE_ORD(Signal* signa
if (globalTransporterRegistry.isBlocked(node))
{
ndbout_c("CMVMI : Unblocking receive from node %u", node);
- globalTransporterRegistry.unblockReceive(node);
+ globalTransporterRegistry.unblockReceive(*recvdata, node);
}
}
}
=== modified file 'storage/ndb/src/kernel/ndbd.cpp'
--- a/storage/ndb/src/kernel/ndbd.cpp 2012-01-12 08:40:08 +0000
+++ b/storage/ndb/src/kernel/ndbd.cpp 2012-01-16 07:14:30 +0000
@@ -29,6 +29,8 @@
#include "ndbd.hpp"
+#include <TransporterRegistry.hpp>
+
#include <ConfigRetriever.hpp>
#include <LogLevel.hpp>
=== modified file 'storage/ndb/src/kernel/vm/Configuration.cpp'
--- a/storage/ndb/src/kernel/vm/Configuration.cpp 2012-01-11 16:31:21 +0000
+++ b/storage/ndb/src/kernel/vm/Configuration.cpp 2012-01-17 07:57:37 +0000
@@ -17,6 +17,7 @@
#include <ndb_global.h>
+#include <TransporterRegistry.hpp>
#include "Configuration.hpp"
#include <ErrorHandlingMacros.hpp>
#include "GlobalData.hpp"
=== modified file 'storage/ndb/src/kernel/vm/Emulator.hpp'
--- a/storage/ndb/src/kernel/vm/Emulator.hpp 2011-06-30 15:59:25 +0000
+++ b/storage/ndb/src/kernel/vm/Emulator.hpp 2012-01-16 07:14:30 +0000
@@ -26,7 +26,6 @@
//
//===========================================================================
#include <kernel_types.h>
-#include <TransporterRegistry.hpp>
extern class JobTable globalJobTable;
extern class TimeQueue globalTimeQueue;
=== modified file 'storage/ndb/src/kernel/vm/SectionReader.cpp'
--- a/storage/ndb/src/kernel/vm/SectionReader.cpp 2011-06-30 15:59:25 +0000
+++ b/storage/ndb/src/kernel/vm/SectionReader.cpp 2012-01-16 07:14:30 +0000
@@ -17,7 +17,6 @@
#include <SectionReader.hpp>
-#include <TransporterDefinitions.hpp>
#include "LongSignal.hpp"
#if 0
=== modified file 'storage/ndb/src/kernel/vm/SimplePropertiesSection.cpp'
--- a/storage/ndb/src/kernel/vm/SimplePropertiesSection.cpp 2011-06-30 15:59:25 +0000
+++ b/storage/ndb/src/kernel/vm/SimplePropertiesSection.cpp 2012-01-16 07:14:30 +0000
@@ -17,7 +17,6 @@
*/
#include <SimpleProperties.hpp>
-#include <TransporterDefinitions.hpp>
#include "LongSignal.hpp"
#include "LongSignalImpl.hpp"
#include "SimulatedBlock.hpp"
=== modified file 'storage/ndb/src/kernel/vm/TransporterCallback.cpp'
--- a/storage/ndb/src/kernel/vm/TransporterCallback.cpp 2012-01-11 13:16:31 +0000
+++ b/storage/ndb/src/kernel/vm/TransporterCallback.cpp 2012-01-16 13:56:30 +0000
@@ -17,7 +17,6 @@
#include <ndb_global.h>
-#include <TransporterCallback.hpp>
#include <TransporterRegistry.hpp>
#include <FastScheduler.hpp>
#include <Emulator.hpp>
@@ -80,13 +79,10 @@ const char *lookupConnectionError(Uint32
#ifndef NDBD_MULTITHREADED
extern TransporterRegistry globalTransporterRegistry; // Forward declaration
-class TransporterCallbackKernelNonMT : public TransporterCallbackKernel
+class TransporterCallbackKernelNonMT :
+ public TransporterCallback,
+ public TransporterReceiveHandleKernel
{
- /**
- * Check to see if jobbbuffers are starting to get full
- * and if so call doJob
- */
- int checkJobBuffer() { return globalScheduler.checkDoJob(); }
void reportSendLen(NodeId nodeId, Uint32 count, Uint64 bytes);
Uint32 get_bytes_to_send_iovec(NodeId node, struct iovec *dst, Uint32 max)
{
@@ -106,7 +102,8 @@ class TransporterCallbackKernelNonMT : p
}
};
static TransporterCallbackKernelNonMT myTransporterCallback;
-TransporterRegistry globalTransporterRegistry(&myTransporterCallback);
+TransporterRegistry globalTransporterRegistry(&myTransporterCallback,
+ &myTransporterCallback);
#endif
#ifdef NDBD_MULTITHREADED
@@ -137,13 +134,14 @@ void mt_set_section_chunk_size(){}
#endif
void
-TransporterCallbackKernel::deliver_signal(SignalHeader * const header,
- Uint8 prio,
- Uint32 * const theData,
- LinearSectionPtr ptr[3])
+TransporterReceiveHandleKernel::deliver_signal(SignalHeader * const header,
+ Uint8 prio,
+ Uint32 * const theData,
+ LinearSectionPtr ptr[3])
{
#ifdef NDBD_MULTITHREADED
- SectionSegmentPool::Cache & cache = g_receiver_thread_cache[0].cache_instance;
+ SectionSegmentPool::Cache & cache =
+ g_receiver_thread_cache[m_receiver_thread_idx].cache_instance;
#endif
const Uint32 secCount = header->m_noOfSections;
@@ -193,10 +191,10 @@ TransporterCallbackKernel::deliver_signa
globalScheduler.execute(header, prio, theData, secPtrI);
#else
if (prio == JBB)
- sendlocal(receiverThreadId,
+ sendlocal(m_thr_no /* self */,
header, theData, secPtrI);
else
- sendprioa(receiverThreadId,
+ sendprioa(m_thr_no /* self */,
header, theData, secPtrI);
#endif
@@ -230,12 +228,11 @@ TransporterCallbackKernel::deliver_signa
globalScheduler.execute(header, prio, theData, secPtrI);
#else
if (prio == JBB)
- sendlocal(receiverThreadId,
+ sendlocal(m_thr_no /* self */,
header, theData, NULL);
else
- sendprioa(receiverThreadId,
+ sendprioa(m_thr_no /* self */,
header, theData, NULL);
-
#endif
}
@@ -246,9 +243,9 @@ operator<<(NdbOut& out, const SectionSeg
}
void
-TransporterCallbackKernel::reportError(NodeId nodeId,
- TransporterError errorCode,
- const char *info)
+TransporterReceiveHandleKernel::reportError(NodeId nodeId,
+ TransporterError errorCode,
+ const char *info)
{
#ifdef DEBUG_TRANSPORTER
ndbout_c("reportError (%d, 0x%x) %s", nodeId, errorCode, info ? info : "");
@@ -314,7 +311,7 @@ TransporterCallbackKernel::reportError(N
Uint32 secPtr[3];
globalScheduler.execute(&signal.header, JBA, signal.theData, secPtr);
#else
- sendprioa(receiverThreadId,
+ sendprioa(m_thr_no /* self */,
&signal.header, signal.theData, NULL);
#endif
@@ -352,7 +349,7 @@ TransporterCallbackKernelNonMT::reportSe
* Report average receive length in bytes (4096 last receives)
*/
void
-TransporterCallbackKernel::reportReceiveLen(NodeId nodeId, Uint32 count,
+TransporterReceiveHandleKernel::reportReceiveLen(NodeId nodeId, Uint32 count,
Uint64 bytes)
{
@@ -372,7 +369,7 @@ TransporterCallbackKernel::reportReceive
Uint32 secPtr[3];
globalScheduler.execute(&signal.header, JBA, signal.theData, secPtr);
#else
- sendprioa(receiverThreadId,
+ sendprioa(m_thr_no /* self */,
&signal.header, signal.theData, NULL);
#endif
}
@@ -382,13 +379,17 @@ TransporterCallbackKernel::reportReceive
*/
void
-TransporterCallbackKernel::reportConnect(NodeId nodeId)
+TransporterReceiveHandleKernel::reportConnect(NodeId nodeId)
{
SignalT<1> signal;
memset(&signal.header, 0, sizeof(signal.header));
+#ifndef NDBD_MULTITHREADED
Uint32 trpman_instance = 1;
+#else
+ Uint32 trpman_instance = 1 /* proxy */ + m_receiver_thread_idx;
+#endif
signal.header.theLength = 1;
signal.header.theSendersSignalId = 0;
signal.header.theSendersBlockRef = numberToRef(0, globalData.ownId);
@@ -396,12 +397,12 @@ TransporterCallbackKernel::reportConnect
signal.header.theVerId_signalNumber = GSN_CONNECT_REP;
signal.theData[0] = nodeId;
-
+
#ifndef NDBD_MULTITHREADED
Uint32 secPtr[3];
globalScheduler.execute(&signal.header, JBA, signal.theData, secPtr);
#else
- sendprioa(receiverThreadId,
+ sendprioa(m_thr_no /* self */,
&signal.header, signal.theData, NULL);
#endif
}
@@ -410,7 +411,7 @@ TransporterCallbackKernel::reportConnect
* Report connection broken
*/
void
-TransporterCallbackKernel::reportDisconnect(NodeId nodeId, Uint32 errNo)
+TransporterReceiveHandleKernel::reportDisconnect(NodeId nodeId, Uint32 errNo)
{
DBUG_ENTER("reportDisconnect");
@@ -418,7 +419,11 @@ TransporterCallbackKernel::reportDisconn
SignalT<sizeof(DisconnectRep)/4> signal;
memset(&signal.header, 0, sizeof(signal.header));
+#ifndef NDBD_MULTITHREADED
Uint32 trpman_instance = 1;
+#else
+ Uint32 trpman_instance = 1 /* proxy */ + m_receiver_thread_idx;
+#endif
signal.header.theLength = DisconnectRep::SignalLength;
signal.header.theSendersSignalId = 0;
signal.header.theSendersBlockRef = numberToRef(0, globalData.ownId);
@@ -434,7 +439,7 @@ TransporterCallbackKernel::reportDisconn
Uint32 secPtr[3];
globalScheduler.execute(&signal.header, JBA, signal.theData, secPtr);
#else
- sendprioa(receiverThreadId,
+ sendprioa(m_thr_no /* self */,
&signal.header, signal.theData, NULL);
#endif
@@ -466,11 +471,34 @@ SignalLoggerManager::printSegmentedSecti
putc('\n', output);
}
+/**
+ * Check to see if jobbbuffers are starting to get full
+ * and if so call doJob
+ */
+int
+TransporterReceiveHandleKernel::checkJobBuffer()
+{
+#ifndef NDBD_MULTITHREADED
+ return globalScheduler.checkDoJob();
+#else
+ return mt_checkDoJob();
+#endif
+}
+
void
-TransporterCallbackKernel::transporter_recv_from(NodeId nodeId)
+TransporterReceiveHandleKernel::transporter_recv_from(NodeId nodeId)
{
if (globalData.get_hb_count(nodeId) != 0)
{
globalData.set_hb_count(nodeId) = 0;
}
}
+
+#ifndef NDBD_MULTITHREADED
+class TransporterReceiveHandle *
+mt_get_trp_receive_handle(unsigned instance)
+{
+ assert(instance == 0);
+ return &myTransporterCallback;
+}
+#endif
=== modified file 'storage/ndb/src/kernel/vm/TransporterCallbackKernel.hpp'
--- a/storage/ndb/src/kernel/vm/TransporterCallbackKernel.hpp 2008-05-29 15:06:11 +0000
+++ b/storage/ndb/src/kernel/vm/TransporterCallbackKernel.hpp 2012-01-16 13:56:30 +0000
@@ -1,4 +1,4 @@
-/* Copyright (C) 2008 MySQL AB
+/* Copyright (c) 2008, 2012, Oracle and/or its affiliates. All rights reserved.
This program is free software; you can redistribute it and/or modify
it under the terms of the GNU General Public License as published by
@@ -13,9 +13,31 @@
along with this program; if not, write to the Free Software
Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA */
-class TransporterCallbackKernel: public TransporterCallback
+#ifndef TRANSPORTER_CALLBACK_KERNEL_HPP
+#define TRANSPORTER_CALLBACK_KERNEL_HPP
+
+#include <TransporterCallback.hpp>
+
+class TransporterReceiveHandleKernel
+ : public TransporterReceiveHandle
{
public:
+#ifdef NDBD_MULTITHREADED
+ TransporterReceiveHandleKernel(Uint32 thr_no, Uint32 recv_thr_no) :
+ m_thr_no(thr_no), m_receiver_thread_idx(recv_thr_no) {}
+
+ /**
+ * m_thr_no == index in m_thr_data[]
+ */
+ Uint32 m_thr_no;
+
+ /**
+ * m_receiver_thread_idx == m_thr_no - firstReceiverThread ==
+ * instance() - 1(proxy)
+ */
+ Uint32 m_receiver_thread_idx;
+#endif
+
/* TransporterCallback interface. */
void deliver_signal(SignalHeader * const header,
Uint8 prio,
@@ -27,5 +49,8 @@ public:
void reportError(NodeId nodeId, TransporterError errorCode,
const char *info = 0);
void transporter_recv_from(NodeId node);
- virtual ~TransporterCallbackKernel() { }
+ int checkJobBuffer();
+ virtual ~TransporterReceiveHandleKernel() { }
};
+
+#endif
=== modified file 'storage/ndb/src/kernel/vm/mt.cpp'
--- a/storage/ndb/src/kernel/vm/mt.cpp 2012-01-12 13:34:04 +0000
+++ b/storage/ndb/src/kernel/vm/mt.cpp 2012-01-17 07:57:37 +0000
@@ -1,4 +1,4 @@
-/* Copyright (c) 2008, 2011, Oracle and/or its affiliates. All rights reserved.
+/* Copyright (c) 2008, 2011, 2012, Oracle and/or its affiliates. All rights reserved.
This program is free software; you can redistribute it and/or modify
it under the terms of the GNU General Public License as published by
@@ -15,6 +15,8 @@
#include <ndb_global.h>
+#define NDBD_MULTITHREADED
+
#include <VMSignal.hpp>
#include <kernel_types.h>
#include <Prio.hpp>
@@ -24,6 +26,7 @@
#include <GlobalData.hpp>
#include <WatchDog.hpp>
#include <TransporterDefinitions.hpp>
+#include <TransporterRegistry.hpp>
#include "FastScheduler.hpp"
#include "mt.hpp"
#include <DebuggerNames.hpp>
@@ -928,12 +931,11 @@ struct mt_send_handle : public Transpor
virtual bool forceSend(NodeId node);
};
-struct trp_callback : public TransporterCallbackKernel
+struct trp_callback : public TransporterCallback
{
trp_callback() {}
/* Callback interface. */
- int checkJobBuffer();
void reportSendLen(NodeId nodeId, Uint32 count, Uint64 bytes);
void lock_transporter(NodeId node);
void unlock_transporter(NodeId node);
@@ -1692,7 +1694,7 @@ trp_callback::unlock_transporter(NodeId
}
int
-trp_callback::checkJobBuffer()
+mt_checkDoJob()
{
struct thr_repository* rep = &g_thr_repository;
if (unlikely(check_job_buffers(rep)))
@@ -2824,8 +2826,6 @@ aligned_signal(unsigned char signal_buf[
return (Signal *)sigtmp;
}
-Uint32 receiverThreadId;
-
/*
* We only do receive in receiver thread(s), no other threads do receive.
*
@@ -2836,6 +2836,14 @@ Uint32 receiverThreadId;
* receive loop; this way we avoid races between update_connections() and
* TRPMAN calls into the transporters.
*/
+
+/**
+ * Array of pointers to TransporterReceiveHandleKernel
+ * these are not used "in traffic"
+ */
+TransporterReceiveHandleKernel *
+ g_trp_receive_handle_ptr[MAX_NDBMT_RECEIVE_THREADS];
+
extern "C"
void *
mt_receiver_thread_main(void *thr_arg)
@@ -2848,20 +2856,30 @@ mt_receiver_thread_main(void *thr_arg)
Uint32& watchDogCounter = selfptr->m_watchdog_counter;
Uint32 thrSignalId = 0;
bool has_received = false;
- const unsigned recv_thread_no = 0;
+ const unsigned recv_thread_idx = 0;
init_thread(selfptr);
- receiverThreadId = thr_no;
signal = aligned_signal(signal_buf, thr_no);
+ /**
+ * Object that keeps track of our pollReceive-state
+ */
+ TransporterReceiveHandleKernel recvdata(thr_no, recv_thread_idx);
+ globalTransporterRegistry.init(recvdata);
+
+ /**
+ * Save pointer to this for management/error-insert
+ */
+ g_trp_receive_handle_ptr[recv_thread_idx] = &recvdata;
+
while (globalData.theRestartFlag != perform_stop)
- {
+ {
static int cnt = 0;
if (cnt == 0)
{
watchDogCounter = 5;
- globalTransporterRegistry.update_connections();
+ globalTransporterRegistry.update_connections(recvdata);
}
cnt = (cnt + 1) & 15;
@@ -2883,14 +2901,14 @@ mt_receiver_thread_main(void *thr_arg)
watchDogCounter = 7;
has_received = false;
- if (globalTransporterRegistry.pollReceive(1))
+ if (globalTransporterRegistry.pollReceive(1, recvdata))
{
if (check_job_buffers(rep) == 0)
{
watchDogCounter = 8;
- lock(&rep->m_receive_lock[recv_thread_no]);
- globalTransporterRegistry.performReceive();
- unlock(&rep->m_receive_lock[recv_thread_no]);
+ lock(&rep->m_receive_lock[recv_thread_idx]);
+ globalTransporterRegistry.performReceive(recvdata);
+ unlock(&rep->m_receive_lock[recv_thread_idx]);
has_received = true;
}
}
@@ -4145,6 +4163,16 @@ mt_get_thr_stat(class SimulatedBlock * b
dst->local_sent_priob = selfptr->m_stat.m_priob_count;
}
+TransporterReceiveHandle *
+mt_get_trp_receive_handle(unsigned instance)
+{
+ assert(instance > 0 && instance <= MAX_NDBMT_RECEIVE_THREADS);
+ if (instance > 0 && instance <= MAX_NDBMT_RECEIVE_THREADS)
+ {
+ return g_trp_receive_handle_ptr[instance - 1 /* proxy */];
+ }
+ return 0;
+}
/**
* Global data
@@ -4153,4 +4181,5 @@ struct thr_repository g_thr_repository;
struct trp_callback g_trp_callback;
-TransporterRegistry globalTransporterRegistry(&g_trp_callback, false);
+TransporterRegistry globalTransporterRegistry(&g_trp_callback, NULL,
+ false);
=== modified file 'storage/ndb/src/kernel/vm/mt.hpp'
--- a/storage/ndb/src/kernel/vm/mt.hpp 2011-10-07 08:07:21 +0000
+++ b/storage/ndb/src/kernel/vm/mt.hpp 2012-01-16 13:56:30 +0000
@@ -13,21 +13,11 @@
along with this program; if not, write to the Free Software
Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA */
-#include <kernel_types.h>
-#include <TransporterDefinitions.hpp>
-
#ifndef ndb_mt_hpp
#define ndb_mt_hpp
-
-/*
- For now, we use locks to only have one thread at the time running in the
- transporter as sender, and only one as receiver.
-
- Thus, we can use a global variable to record the id of the current
- transporter threads. Only valid while holding the transporter receive lock.
-*/
-extern Uint32 receiverThreadId;
+#include <kernel_types.h>
+#include <TransporterDefinitions.hpp>
Uint32 mt_get_instance_count(Uint32 block);
@@ -57,6 +47,8 @@ SendStatus mt_send_remote(Uint32 self, c
void mt_section_lock();
void mt_section_unlock();
+int mt_checkDoJob();
+
/**
* Are we (not) multi threaded
*/
@@ -110,4 +102,11 @@ struct ndb_thr_stat
void
mt_get_thr_stat(class SimulatedBlock *, ndb_thr_stat* dst);
+/**
+ * Get TransporterReceiveHandle for a specific trpman instance
+ * Currently used for error insert that block/unblock traffic
+ */
+class TransporterReceiveHandle *
+mt_get_trp_receive_handle(unsigned instance);
+
#endif
=== modified file 'storage/ndb/src/ndbapi/NdbEventOperationImpl.hpp'
--- a/storage/ndb/src/ndbapi/NdbEventOperationImpl.hpp 2011-06-30 15:59:25 +0000
+++ b/storage/ndb/src/ndbapi/NdbEventOperationImpl.hpp 2012-01-16 07:14:30 +0000
@@ -20,10 +20,11 @@
#include <NdbEventOperation.hpp>
#include <signaldata/SumaImpl.hpp>
-#include <transporter/TransporterDefinitions.hpp>
#include <NdbRecAttr.hpp>
#include <AttributeHeader.hpp>
#include <UtilBuffer.hpp>
+#include <Vector.hpp>
+#include <NdbMutex.h>
#define NDB_EVENT_OP_MAGIC_NUMBER 0xA9F301B4
//#define EVENT_DEBUG
=== modified file 'storage/ndb/src/ndbapi/TransporterFacade.cpp'
--- a/storage/ndb/src/ndbapi/TransporterFacade.cpp 2011-10-14 13:39:05 +0000
+++ b/storage/ndb/src/ndbapi/TransporterFacade.cpp 2012-01-17 07:57:37 +0000
@@ -351,7 +351,7 @@ TransporterFacade::start_instance(NodeId
(void)signal(SIGPIPE, SIG_IGN);
#endif
- theTransporterRegistry = new TransporterRegistry(this);
+ theTransporterRegistry = new TransporterRegistry(this, this);
if (theTransporterRegistry == NULL)
return -1;
=== modified file 'storage/ndb/src/ndbapi/TransporterFacade.hpp'
--- a/storage/ndb/src/ndbapi/TransporterFacade.hpp 2011-09-09 10:48:14 +0000
+++ b/storage/ndb/src/ndbapi/TransporterFacade.hpp 2012-01-16 08:25:04 +0000
@@ -41,7 +41,9 @@ extern "C" {
void* runReceiveResponse_C(void*);
}
-class TransporterFacade : public TransporterCallback
+class TransporterFacade :
+ public TransporterCallback,
+ public TransporterReceiveHandle
{
public:
/**
No bundle (reason: useless for push emails).
| Thread |
|---|
| • bzr push into mysql-5.1-telco-7.1 branch (jonas.oreland:4413 to 4414) | jonas oreland | 17 Jan |