Below is the list of changes that have just been committed into a local
5.1 repository of tomas. When tomas does a push these changes will
be propagated to the main repository and, within 24 hours after the
push, to the public repository.
For information on how to access the public repository
see http://dev.mysql.com/doc/mysql/en/installing-source-tree.html
ChangeSet
1.1804 05/03/16 01:18:11 tomas@stripped +10 -0
moved EventBuffer handling to the ndb object
ndb/test/ndbapi/test_event_multi_table.cpp
1.4 05/03/16 01:18:03 tomas@stripped +8 -0
moved EventBuffer handling to the ndb object
ndb/src/ndbapi/Ndbinit.cpp
1.31 05/03/16 01:18:03 tomas@stripped +17 -10
moved EventBuffer handling to the ndb object
ndb/src/ndbapi/Ndbif.cpp
1.30 05/03/16 01:18:03 tomas@stripped +51 -2
moved EventBuffer handling to the ndb object
ndb/src/ndbapi/NdbImpl.hpp
1.9 05/03/16 01:18:03 tomas@stripped +2 -0
moved EventBuffer handling to the ndb object
ndb/src/ndbapi/NdbEventOperationImpl.hpp
1.9 05/03/16 01:18:03 tomas@stripped +29 -73
moved EventBuffer handling to the ndb object
ndb/src/ndbapi/NdbEventOperationImpl.cpp
1.20 05/03/16 01:18:03 tomas@stripped +120 -510
moved EventBuffer handling to the ndb object
ndb/src/ndbapi/NdbDictionaryImpl.hpp
1.31 05/03/16 01:18:03 tomas@stripped +4 -10
moved EventBuffer handling to the ndb object
ndb/src/ndbapi/NdbDictionaryImpl.cpp
1.73 05/03/16 01:18:03 tomas@stripped +12 -88
moved EventBuffer handling to the ndb object
ndb/src/ndbapi/Ndb.cpp
1.49 05/03/16 01:18:03 tomas@stripped +22 -18
moved EventBuffer handling to the ndb object
ndb/include/ndbapi/Ndb.hpp
1.43 05/03/16 01:18:02 tomas@stripped +2 -3
moved EventBuffer handling to the ndb object
# This is a BitKeeper patch. What follows are the unified diffs for the
# set of deltas contained in the patch. The rest of the patch, the part
# that BitKeeper cares about, is below these diffs.
# User: tomas
# Host: poseidon.ndb.mysql.com
# Root: /home/tomas/mysql-5.1-wl2325
--- 1.42/ndb/include/ndbapi/Ndb.hpp 2005-03-15 11:00:07 +01:00
+++ 1.43/ndb/include/ndbapi/Ndb.hpp 2005-03-16 01:18:02 +01:00
@@ -1053,7 +1053,7 @@
friend class NdbReceiver;
friend class NdbOperation;
friend class NdbEventOperationImpl;
- friend class NdbGlobalEventBufferHandle;
+ friend class NdbEventBuffer;
friend class NdbTransaction;
friend class Table;
friend class NdbApiSignal;
@@ -1503,7 +1503,6 @@
NdbIndexScanOperation* getScanOperation(); // Get a scan operation from idle
NdbIndexOperation* getIndexOperation();// Get an index operation from idle
- class NdbGlobalEventBufferHandle* getGlobalEventBufferHandle();
NdbBlob* getNdbBlob();// Get a blob handle etc
void releaseSignal(NdbApiSignal* anApiSignal);
@@ -1645,7 +1644,7 @@
class NdbImpl * theImpl;
class NdbDictionaryImpl* theDictionary;
- class NdbGlobalEventBufferHandle* theGlobalEventBufferHandle;
+ class NdbEventBuffer* theEventBuffer;
NdbTransaction* theConIdleList; // First connection in idle list.
--- 1.48/ndb/src/ndbapi/Ndb.cpp 2005-03-15 11:00:07 +01:00
+++ 1.49/ndb/src/ndbapi/Ndb.cpp 2005-03-16 01:18:03 +01:00
@@ -1210,10 +1210,7 @@
NdbEventOperation* Ndb::createEventOperation(const char* eventName,
const int bufferLength)
{
- NdbEventOperation* tOp;
-
- tOp = new NdbEventOperation(this, eventName);
-
+ NdbEventOperation* tOp= new NdbEventOperation(this, eventName);
if (tOp == 0)
{
theError.code= 4000;
@@ -1226,34 +1223,41 @@
tOp = NULL;
}
- //now we have to look up this event in dict
+ // keep track of all event operations
+ NdbEventOperationImpl &op= tOp->m_impl;
+ op.m_next= theImpl->m_ev_op;
+ op.m_prev= 0;
+ theImpl->m_ev_op= &op;
+ if (op.m_next)
+ op.m_next->m_prev= &op;
return tOp;
}
-int Ndb::dropEventOperation(NdbEventOperation* op) {
- delete op;
- return 0;
-}
-
-NdbGlobalEventBufferHandle* Ndb::getGlobalEventBufferHandle()
+int Ndb::dropEventOperation(NdbEventOperation* tOp)
{
- return theGlobalEventBufferHandle;
+ DBUG_ENTER("Ndb::dropEventOperation");
+ // remove it from list
+ NdbEventOperationImpl &op= tOp->m_impl;
+ if (op.m_next)
+ op.m_next->m_prev= op.m_prev;
+ if (op.m_prev)
+ op.m_prev->m_next= op.m_next;
+ else
+ theImpl->m_ev_op= op.m_next;
+ delete tOp;
+ DBUG_RETURN(0);
}
-//void Ndb::monitorEvent(NdbEventOperation *op, NdbEventCallback cb, void* rs)
-//{
-//}
-
int
Ndb::pollEvents(int aMillisecondNumber)
{
- return theGlobalEventBufferHandle->wait(aMillisecondNumber);
+ return theEventBuffer->wait(aMillisecondNumber);
}
NdbEventOperation *Ndb::nextEvent(int *error)
{
- return theGlobalEventBufferHandle->next(error);
+ return theEventBuffer->next(error);
}
#ifdef VM_TRACE
--- 1.72/ndb/src/ndbapi/NdbDictionaryImpl.cpp 2005-03-03 17:29:20 +01:00
+++ 1.73/ndb/src/ndbapi/NdbDictionaryImpl.cpp 2005-03-16 01:18:03 +01:00
@@ -589,7 +589,6 @@
{
mi_type = 0;
m_dur = NdbDictionary::Event::ED_UNDEFINED;
- eventOp = NULL;
m_tableImpl = NULL;
}
@@ -599,7 +598,6 @@
{
mi_type = 0;
m_dur = NdbDictionary::Event::ED_UNDEFINED;
- eventOp = NULL;
m_tableImpl = NULL;
}
@@ -904,12 +902,6 @@
case GSN_SUB_START_REF:
tmp->execSUB_START_REF(signal, ptr);
break;
- case GSN_SUB_TABLE_DATA:
- tmp->execSUB_TABLE_DATA(signal, ptr);
- break;
- case GSN_SUB_GCP_COMPLETE_REP:
- tmp->execSUB_GCP_COMPLETE_REP(signal, ptr);
- break;
case GSN_SUB_STOP_CONF:
tmp->execSUB_STOP_CONF(signal, ptr);
break;
@@ -2537,15 +2529,15 @@
}
int
-NdbDictionaryImpl::executeSubscribeEvent(NdbEventImpl & ev)
+NdbDictionaryImpl::executeSubscribeEvent(NdbEventOperationImpl & ev_op)
{
// NdbDictInterface m_receiver;
- return m_receiver.executeSubscribeEvent(m_ndb, ev);
+ return m_receiver.executeSubscribeEvent(m_ndb, ev_op);
}
int
NdbDictInterface::executeSubscribeEvent(class Ndb & ndb,
- NdbEventImpl & evnt)
+ NdbEventOperationImpl & ev_op)
{
DBUG_ENTER("NdbDictInterface::executeSubscribeEvent");
NdbApiSignal tSignal(m_reference);
@@ -2556,10 +2548,10 @@
SubStartReq * sumaStart = CAST_PTR(SubStartReq, tSignal.getDataPtrSend());
- sumaStart->subscriptionId = evnt.m_eventId;
- sumaStart->subscriptionKey = evnt.m_eventKey;
+ sumaStart->subscriptionId = ev_op.m_eventImpl->m_eventId;
+ sumaStart->subscriptionKey = ev_op.m_eventImpl->m_eventKey;
sumaStart->part = SubscriptionData::TableData;
- sumaStart->subscriberData = evnt.m_bufferId & 0xFF;
+ sumaStart->subscriberData = ev_op.m_oid;
sumaStart->subscriberRef = m_reference;
DBUG_RETURN(executeSubscribeEvent(&tSignal, NULL));
@@ -2578,15 +2570,15 @@
}
int
-NdbDictionaryImpl::stopSubscribeEvent(NdbEventImpl & ev)
+NdbDictionaryImpl::stopSubscribeEvent(NdbEventOperationImpl & ev_op)
{
// NdbDictInterface m_receiver;
- return m_receiver.stopSubscribeEvent(m_ndb, ev);
+ return m_receiver.stopSubscribeEvent(m_ndb, ev_op);
}
int
NdbDictInterface::stopSubscribeEvent(class Ndb & ndb,
- NdbEventImpl & evnt)
+ NdbEventOperationImpl & ev_op)
{
DBUG_ENTER("NdbDictInterface::stopSubscribeEvent");
@@ -2598,9 +2590,9 @@
SubStopReq * sumaStop = CAST_PTR(SubStopReq, tSignal.getDataPtrSend());
- sumaStop->subscriptionId = evnt.m_eventId;
- sumaStop->subscriptionKey = evnt.m_eventKey;
- sumaStop->subscriberData = evnt.m_bufferId & 0xFF;
+ sumaStop->subscriptionId = ev_op.m_eventImpl->m_eventId;
+ sumaStop->subscriptionKey = ev_op.m_eventImpl->m_eventKey;
+ sumaStop->subscriberData = ev_op.m_oid;
sumaStop->part = (Uint32) SubscriptionData::TableData;
sumaStop->subscriberRef = m_reference;
@@ -2803,74 +2795,6 @@
m_error.code= subStartRef->errorCode;
m_waiter.signal(NO_WAIT);
DBUG_VOID_RETURN;
-}
-void
-NdbDictInterface::execSUB_GCP_COMPLETE_REP(NdbApiSignal * signal,
- LinearSectionPtr ptr[3])
-{
- const SubGcpCompleteRep * const rep=
- CAST_CONSTPTR(SubGcpCompleteRep, signal->getDataPtr());
-
- const Uint32 gci = rep->gci;
- // const Uint32 senderRef = rep->senderRef;
- const Uint32 subscriberData = rep->subscriberData;
-
- const Uint32 bufferId = subscriberData;
-
- const Uint32 ref = signal->theSendersBlockRef;
-
- NdbApiSignal tSignal(m_reference);
- SubGcpCompleteAcc * acc=
- CAST_PTR(SubGcpCompleteAcc, tSignal.getDataPtrSend());
-
- acc->rep = *rep;
-
- tSignal.theReceiversBlockNumber = refToBlock(ref);
- tSignal.theVerId_signalNumber = GSN_SUB_GCP_COMPLETE_ACC;
- tSignal.theLength = SubGcpCompleteAcc::SignalLength;
-
- Uint32 aNodeId = refToNode(ref);
-
- // m_transporter->lock_mutex();
- int r;
- r = m_transporter->sendSignal(&tSignal, aNodeId);
- // m_transporter->unlock_mutex();
-
- NdbGlobalEventBufferHandle::latestGCI(bufferId, gci);
-}
-
-void
-NdbDictInterface::execSUB_TABLE_DATA(NdbApiSignal * signal,
- LinearSectionPtr ptr[3])
-{
-#ifdef EVENT_DEBUG
- const char * FNAME = "NdbDictInterface::execSUB_TABLE_DATA";
-#endif
- //TODO
- const SubTableData * const sdata = CAST_CONSTPTR(SubTableData, signal->getDataPtr());
-
- // const Uint32 gci = sdata->gci;
- // const Uint32 operation = sdata->operation;
- // const Uint32 tableId = sdata->tableId;
- // const Uint32 noOfAttrs = sdata->noOfAttributes;
- // const Uint32 dataLen = sdata->dataSize;
- const Uint32 subscriberData = sdata->subscriberData;
- // const Uint32 logType = sdata->logType;
-
- for (int i=signal->m_noOfSections;i < 3; i++) {
- ptr[i].p = NULL;
- ptr[i].sz = 0;
- }
-#ifdef EVENT_DEBUG
- ndbout_c("%s: senderData %d, gci %d, operation %d, tableId %d, noOfAttrs %d, dataLen %d",
- FNAME, subscriberData, gci, operation, tableId, noOfAttrs, dataLen);
- ndbout_c("ptr[0] %u %u ptr[1] %u %u ptr[2] %u %u\n",
- ptr[0].p,ptr[0].sz,ptr[1].p,ptr[1].sz,ptr[2].p,ptr[2].sz);
-#endif
- const Uint32 bufferId = subscriberData;
-
- NdbGlobalEventBufferHandle::insertDataL(bufferId,
- sdata, ptr);
}
/*****************************************************************
--- 1.30/ndb/src/ndbapi/NdbDictionaryImpl.hpp 2005-02-24 12:55:59 +01:00
+++ 1.31/ndb/src/ndbapi/NdbDictionaryImpl.hpp 2005-03-16 01:18:03 +01:00
@@ -241,10 +241,6 @@
Vector<NdbColumnImpl *> m_columns;
Vector<unsigned> m_attrIds;
- int m_bufferId;
-
- NdbEventOperation *eventOp;
-
static NdbEventImpl & getImpl(NdbDictionary::Event & t);
static NdbEventImpl & getImpl(const NdbDictionary::Event & t);
NdbDictionary::Event * m_facade;
@@ -300,10 +296,10 @@
int dropEvent(const NdbEventImpl &);
int dropEvent(NdbApiSignal* signal, LinearSectionPtr ptr[3], int noLSP);
- int executeSubscribeEvent(class Ndb & ndb, NdbEventImpl &);
+ int executeSubscribeEvent(class Ndb & ndb, NdbEventOperationImpl &);
int executeSubscribeEvent(NdbApiSignal* signal, LinearSectionPtr ptr[3]);
- int stopSubscribeEvent(class Ndb & ndb, NdbEventImpl &);
+ int stopSubscribeEvent(class Ndb & ndb, NdbEventOperationImpl &);
int stopSubscribeEvent(NdbApiSignal* signal, LinearSectionPtr ptr[3]);
int listObjects(NdbDictionary::Dictionary::List& list, Uint32 requestData, bool fullyQualifiedNames);
@@ -355,8 +351,6 @@
void execCREATE_EVNT_CONF(NdbApiSignal *, LinearSectionPtr ptr[3]);
void execSUB_START_CONF(NdbApiSignal *, LinearSectionPtr ptr[3]);
void execSUB_START_REF(NdbApiSignal *, LinearSectionPtr ptr[3]);
- void execSUB_TABLE_DATA(NdbApiSignal *, LinearSectionPtr ptr[3]);
- void execSUB_GCP_COMPLETE_REP(NdbApiSignal *, LinearSectionPtr ptr[3]);
void execSUB_STOP_CONF(NdbApiSignal *, LinearSectionPtr ptr[3]);
void execSUB_STOP_REF(NdbApiSignal *, LinearSectionPtr ptr[3]);
void execDROP_EVNT_REF(NdbApiSignal *, LinearSectionPtr ptr[3]);
@@ -399,8 +393,8 @@
int createEvent(NdbEventImpl &);
int dropEvent(const char * eventName);
- int executeSubscribeEvent(NdbEventImpl &);
- int stopSubscribeEvent(NdbEventImpl &);
+ int executeSubscribeEvent(NdbEventOperationImpl &);
+ int stopSubscribeEvent(NdbEventOperationImpl &);
int listObjects(List& list, NdbDictionary::Object::Type type);
int listIndexes(List& list, Uint32 indexId);
--- 1.19/ndb/src/ndbapi/NdbEventOperationImpl.cpp 2005-03-15 11:00:09 +01:00
+++ 1.20/ndb/src/ndbapi/NdbEventOperationImpl.cpp 2005-03-16 01:18:03 +01:00
@@ -50,14 +50,15 @@
//#define EVENT_DEBUG
// todo handle several ndb objects
-// todo free allocated data when closing NdbGlobalEventBuffer
+// todo free allocated data when closing NdbEventBuffer
NdbEventOperationImpl::NdbEventOperationImpl(NdbEventOperation &N,
Ndb *theNdb,
const char* eventName)
- : NdbEventOperation(*this), m_facade(this), m_ndb(theNdb),
- m_state(EO_ERROR)
+ : NdbEventOperation(*this), m_facade(this), m_magic_number(0),
+ m_ndb(theNdb), m_state(EO_ERROR), m_next(0), m_prev(0)
{
+ DBUG_ENTER("NdbEventOperationImpl::NdbEventOperationImpl");
m_eventId = 0;
theFirstPkAttrs[0] = NULL;
theCurrentPkAttrs[0] = NULL;
@@ -75,25 +76,30 @@
if (!m_ndb) abort();
NdbDictionary::Dictionary *myDict = m_ndb->getDictionary();
- if (!myDict) { m_error.code= m_ndb->getNdbError().code; return; }
+ if (!myDict) { m_error.code= m_ndb->getNdbError().code; DBUG_VOID_RETURN; }
const NdbDictionary::Event *myEvnt = myDict->getEvent(eventName);
- if (!myEvnt) { m_error.code= myDict->getNdbError().code; return; }
+ if (!myEvnt) { m_error.code= myDict->getNdbError().code; DBUG_VOID_RETURN; }
m_eventImpl = &myEvnt->m_impl;
m_eventId = m_eventImpl->m_eventId;
- m_bufferHandle = m_ndb->getGlobalEventBufferHandle();
+ m_event_buffer= m_ndb->theEventBuffer;
m_oid= m_ndb->theImpl->theNdbObjectIdMap.map(this);
m_state = EO_CREATED;
+
+ DBUG_PRINT("exit",("this: 0x%x oid: %u", this, m_oid));
+ DBUG_VOID_RETURN;
}
NdbEventOperationImpl::~NdbEventOperationImpl()
{
+ DBUG_ENTER("NdbEventOperationImpl::~NdbEventOperationImpl");
int i;
+ m_magic_number= 0;
for (i=0 ; i<2; i++) {
NdbRecAttr *p = theFirstPkAttrs[i];
while (p) {
@@ -113,10 +119,12 @@
if (m_state == EO_EXECUTING) {
stop();
// m_bufferHandle->dropSubscribeEvent(m_bufferId);
- ; // We should send stop signal here
+ ; // ToDo? We should send stop signal here
}
m_ndb->theImpl->theNdbObjectIdMap.unmap(m_oid, this);
+ DBUG_PRINT("exit",("this: 0x%x oid: %u", this, m_oid));
+ DBUG_VOID_RETURN;
}
NdbEventOperation::State
@@ -241,30 +249,19 @@
}
- NdbDictionaryImpl & myDictImpl = NdbDictionaryImpl::getImpl(*myDict);
-
-
- int r= m_bufferHandle->prepareAddSubscribeEvent(this);
- m_error.code= 4709;
-
- if (r < 0)
- {
- DBUG_RETURN(-1);
- }
-
- m_eventImpl->m_bufferId = m_bufferId = (Uint32)r;
-
- r = myDictImpl.executeSubscribeEvent(*m_eventImpl);
- if (r) {
- //Error
- m_bufferHandle->unprepareAddSubscribeEvent(m_bufferId);
- m_state = EO_ERROR;
- DBUG_RETURN(r);
- } else {
- m_bufferHandle->addSubscribeEvent(m_bufferId);
- m_state = EO_EXECUTING;
+ m_event_buffer->add_drop_lock();
+ m_magic_number= NDB_EVENT_OP_MAGIC_NUMBER;
+ m_state= EO_EXECUTING;
+ int r= NdbDictionaryImpl::getImpl(*myDict).executeSubscribeEvent(*this);
+ if (r == 0) {
+ m_event_buffer->add_drop_unlock();
DBUG_RETURN(0);
}
+ //Error
+ m_state= EO_ERROR;
+ m_magic_number= 0;
+ m_event_buffer->add_drop_unlock();
+ DBUG_RETURN(r);
}
int
@@ -282,27 +279,19 @@
DBUG_RETURN(-1);
}
- NdbDictionaryImpl & myDictImpl = NdbDictionaryImpl::getImpl(*myDict);
-
- int ret = m_bufferHandle->prepareDropSubscribeEvent(m_bufferId);
-
- if (ret < 0) {
- m_error.code= 4712;
- DBUG_RETURN(-1);
- }
-
- int r= myDictImpl.stopSubscribeEvent(*m_eventImpl);
- if (r) {
- //Error
- m_bufferHandle->unprepareDropSubscribeEvent(m_bufferId);
- m_error.code= myDictImpl.m_error.code;
- m_state = EO_ERROR;
- DBUG_RETURN(r);
- } else {
- m_bufferHandle->dropSubscribeEvent(m_bufferId);
- m_state = EO_CREATED;
+ m_event_buffer->add_drop_lock();
+ int r= NdbDictionaryImpl::getImpl(*myDict).stopSubscribeEvent(*this);
+ m_state= EO_CREATED;
+ m_magic_number= 0;
+ if (r == 0) {
+ m_event_buffer->add_drop_unlock();
DBUG_RETURN(0);
}
+ //Error
+ m_error.code= NdbDictionaryImpl::getImpl(*myDict).m_error.code;
+ m_state= EO_ERROR;
+ m_event_buffer->add_drop_unlock();
+ DBUG_RETURN(r);
}
bool
@@ -320,7 +309,7 @@
Uint32
NdbEventOperationImpl::getLatestGCI()
{
- return m_bufferHandle->getLatestGCI();
+ return m_event_buffer->getLatestGCI();
}
int
@@ -330,7 +319,7 @@
int nr = 10000; // a high value
while (nr > 0) {
- int r= m_bufferHandle->getDataL(m_bufferId);
+ int r= m_event_buffer->getDataL(this);
if (r <= 0)
{
@@ -537,246 +526,100 @@
}
/*
- * Global variable ndbGlobalEventBuffer
- * Class NdbGlobalEventBufferHandle
- * Class NdbGlobalEventBuffer
- *
- */
-
-#define ADD_DROP_LOCK_GUARDR(TYPE, FN) \
-{ \
- ndbGlobalEventBuffer->add_drop_lock(); \
- lock(); \
- TYPE r = ndbGlobalEventBuffer->FN; \
- unlock(); \
- if (r < 0) { \
- ndbGlobalEventBuffer->add_drop_unlock(); \
- } \
- return r;\
-}
-#define GUARDR(TYPE, FN) \
-{ \
- lock(); \
- TYPE r = ndbGlobalEventBuffer->FN; \
- unlock(); \
- return r;\
-}
-#define GUARD(FN) \
-{ \
- lock(); \
- ndbGlobalEventBuffer->FN; \
- unlock(); \
-}
-#define ADD_DROP_UNLOCK_GUARD(FN) \
-{ \
- GUARD(FN); \
- ndbGlobalEventBuffer->add_drop_unlock(); \
-}
-#define GUARDBLOCK(BLOCK) \
-{ \
- lock(); \
- BLOCK \
- unlock(); \
-}
-
-/*
- * Global variable ndbGlobalEventBuffer
- *
+ * Class NdbEventBuffer
+ * Each Ndb object has a Object.
*/
-static NdbGlobalEventBuffer *ndbGlobalEventBuffer=NULL;
-
-/*
- * Class NdbGlobalEventBufferHandle
- * Each Ndb object has a Handle. This Handle is used to access the
- * global NdbGlobalEventBuffer instance ndbGlobalEventBuffer
- */
+// ToDo ref count this so it get's destroyed
+NdbMutex *NdbEventBuffer::p_add_drop_mutex= 0;
-NdbGlobalEventBufferHandle *
-NdbGlobalEventBuffer_init(Ndb *ndb, int n)
-{
- return new NdbGlobalEventBufferHandle(ndb, n);
- // return NdbGlobalEventBufferHandle::init(n);
-}
-
-void
-NdbGlobalEventBuffer_drop(NdbGlobalEventBufferHandle *h)
-{
- delete h;
-}
-
-NdbGlobalEventBufferHandle::NdbGlobalEventBufferHandle
-(Ndb *ndb, int MAX_NUMBER_ACTIVE_EVENTS) : m_nids(0)
+NdbEventBuffer::NdbEventBuffer(Ndb *ndb)
+ : m_latestGCI(0)
{
if ((p_cond = NdbCondition_Create()) == NULL) {
- ndbout_c("NdbGlobalEventBufferHandle: NdbCondition_Create() failed");
+ ndbout_c("NdbEventHandle: NdbCondition_Create() failed");
exit(-1);
}
-
m_mutex= ndb->theImpl->theWaiter.m_mutex;
-
- NdbMutex_Lock(m_mutex);
- if (ndbGlobalEventBuffer == NULL) {
- if (ndbGlobalEventBuffer == NULL) {
- ndbGlobalEventBuffer = new NdbGlobalEventBuffer();
- if (!ndbGlobalEventBuffer) {
- unlock();
- ndbout_c("NdbGlobalEventBufferHandle:: failed to allocate ndbGlobalEventBuffer");
- exit(-1);
- }
+ lock();
+ if (p_add_drop_mutex == 0)
+ {
+ if ((p_add_drop_mutex = NdbMutex_Create()) == NULL) {
+ ndbout_c("NdbEventBuffer: NdbMutex_Create() failed");
+ exit(-1);
}
}
- NdbMutex_Unlock(m_mutex);
-
- GUARD(real_init(this,MAX_NUMBER_ACTIVE_EVENTS));
-}
-
-NdbGlobalEventBufferHandle::~NdbGlobalEventBufferHandle()
-{
- NdbCondition_Destroy(p_cond);
-
- lock();
- ndbGlobalEventBuffer->real_remove(this);
unlock();
- lock();
- if (ndbGlobalEventBuffer->m_handlers.size() == 0) {
- delete ndbGlobalEventBuffer;
- ndbGlobalEventBuffer = NULL;
+ // ToDo set event buffer size
+ // pre allocate event data array
+ m_sz= 10000;
+ m_allocated_data=
+ (EventBufItem::Data *)NdbMem_Allocate(m_sz*sizeof(EventBufItem::Data));
+ bzero((void*)m_allocated_data, m_sz*sizeof(EventBufItem::Data));
+ for (unsigned i= 0; i < m_sz-1; i++)
+ {
+ m_allocated_data[i].m_next= &m_allocated_data[i+1];
}
- unlock();
-}
+ m_allocated_data[m_sz-1].m_next= 0;
-void
-NdbGlobalEventBufferHandle::addBufferId(int bufferId)
-{
- DBUG_ENTER("NdbGlobalEventBufferHandle::addBufferId");
- DBUG_PRINT("enter",("bufferId=%d",bufferId));
- if (m_nids >= NDB_MAX_ACTIVE_EVENTS) {
- ndbout_c("NdbGlobalEventBufferHandle::addBufferId error in paramerer setting");
- exit(-1);
- }
- m_bufferIds[m_nids] = bufferId;
- m_nids++;
- DBUG_VOID_RETURN;
-}
+ // initialize lists
+ m_received_data= 0;
+ m_last_received_data= 0;
+ m_received_data_count= 0;
-void
-NdbGlobalEventBufferHandle::dropBufferId(int bufferId)
-{
- DBUG_ENTER("NdbGlobalEventBufferHandle::dropBufferId");
- DBUG_PRINT("enter",("bufferId=%d",bufferId));
- for (int i = 0; i < m_nids; i++)
- if (m_bufferIds[i] == bufferId) {
- m_nids--;
- for (; i < m_nids; i++)
- m_bufferIds[i] = m_bufferIds[i+1];
- DBUG_VOID_RETURN;
- }
- ndbout_c("NdbGlobalEventBufferHandle::dropBufferId %d does not exist",
- bufferId);
- exit(-1);
-}
-/*
-NdbGlobalEventBufferHandle *
-NdbGlobalEventBufferHandle::init (int MAX_NUMBER_ACTIVE_EVENTS)
-{
- return new NdbGlobalEventBufferHandle();
-}
-void
-NdbGlobalEventBufferHandle::drop(NdbGlobalEventBufferHandle *handle)
-{
- delete handle;
-}
-*/
-int
-NdbGlobalEventBufferHandle::prepareAddSubscribeEvent
-(NdbEventOperationImpl *eventOp)
-{
- ADD_DROP_LOCK_GUARDR(int,real_prepareAddSubscribeEvent(this, eventOp));
-}
-void
-NdbGlobalEventBufferHandle::addSubscribeEvent(int bufferId)
-{
- ADD_DROP_UNLOCK_GUARD(real_addSubscribeEvent(bufferId));
-}
-void
-NdbGlobalEventBufferHandle::unprepareAddSubscribeEvent(int bufferId)
-{
- ADD_DROP_UNLOCK_GUARD(real_unprepareAddSubscribeEvent(bufferId));
-}
+ m_available_data= 0;
+ m_last_available_data= 0;
+ m_available_data_count= 0;
-int
-NdbGlobalEventBufferHandle::prepareDropSubscribeEvent(int bufferId)
-{
- ADD_DROP_LOCK_GUARDR(int,real_prepareDropSubscribeEvent(bufferId));
-}
+ m_used_data= 0;
+ m_last_used_data= 0;
-void
-NdbGlobalEventBufferHandle::unprepareDropSubscribeEvent(int bufferId)
-{
- ADD_DROP_UNLOCK_GUARD(real_unprepareDropSubscribeEvent(bufferId));
+ m_free_data= m_allocated_data;
}
-void
-NdbGlobalEventBufferHandle::dropSubscribeEvent(int bufferId)
+NdbEventBuffer::~NdbEventBuffer()
{
- ADD_DROP_UNLOCK_GUARD(real_dropSubscribeEvent(bufferId));
-}
+ // todo lock? what id receive thread writes here?
+ for (unsigned i= 0; i < m_sz; i++)
+ {
+ if (m_allocated_data[i].sdata)
+ NdbMem_Free(m_allocated_data[i].sdata);
+ }
-int
-NdbGlobalEventBufferHandle::insertDataL(int bufferId,
- const SubTableData * const sdata,
- LinearSectionPtr ptr[3])
-{
- // called by ndb receive thread, thread already has lock
- // on m_mutex
- return ndbGlobalEventBuffer->real_insertDataL(bufferId,sdata,ptr);
-}
-
-void
-NdbGlobalEventBufferHandle::latestGCI(int bufferId, Uint32 gci)
-{
- // called by ndb receive thread, thread already has lock
- // on m_mutex
- ndbGlobalEventBuffer->real_latestGCI(bufferId,gci);
+ NdbMem_Free(m_allocated_data);
+ NdbCondition_Destroy(p_cond);
+// static global NdbMutex_Destroy(p_add_drop_mutex);
}
-Uint32
-NdbGlobalEventBufferHandle::getLatestGCI()
-{
- // just reads one single variable, no mutex needed
- return ndbGlobalEventBuffer->real_getLatestGCI();
-}
-
int
-NdbGlobalEventBufferHandle::wait(int aMillisecondNumber,
- NdbEventOperation **p_ev_op)
+NdbEventBuffer::wait(int aMillisecondNumber,
+ NdbEventOperation **p_ev_op)
{
- DBUG_ENTER("NdbGlobalEventBufferHandle::wait");
+ DBUG_ENTER("NdbEventBuffer::wait");
NdbMutex_Lock(m_mutex);
- NdbCondition_WaitTimeout(p_cond, m_mutex, aMillisecondNumber);
- NdbEventOperationImpl *ev_op= ndbGlobalEventBuffer->move_data();
+ if (m_received_data == 0)
+ NdbCondition_WaitTimeout(p_cond, m_mutex, aMillisecondNumber);
+ NdbEventOperationImpl *ev_op= move_data();
NdbMutex_Unlock(m_mutex); // we have moved the data
if (ev_op)
{
if (p_ev_op)
*p_ev_op= ev_op->m_facade;
- DBUG_PRINT("exit",("m_available_data_count %u",
- ndbGlobalEventBuffer->m_available_data_count));
- DBUG_RETURN(ndbGlobalEventBuffer->m_available_data_count);
+ DBUG_PRINT("exit",("m_available_data_count %u", m_available_data_count));
+ DBUG_RETURN(m_available_data_count);
}
DBUG_RETURN(0);
}
NdbEventOperation *
-NdbGlobalEventBufferHandle::next(int *error)
+NdbEventBuffer::next(int *error)
{
- DBUG_ENTER("NdbGlobalEventBufferHandle::next");
+ DBUG_ENTER("NdbEventBuffer::next");
EventBufItem::Data *data;
- while((data = ndbGlobalEventBuffer->m_available_data))
+ while((data = m_available_data))
{
NdbEventOperationImpl *ev_op= data->m_event_op;
int r= ev_op->next();
@@ -794,258 +637,29 @@
DBUG_RETURN(0);
}
-int NdbGlobalEventBufferHandle::getDataL(const int bufferId)
-{
- return ndbGlobalEventBuffer->real_getDataL(bufferId);
-}
-
-/*
- * Class NdbGlobalEventBuffer
- *
- *
- */
-
-
void
-NdbGlobalEventBufferHandle::lock()
+NdbEventBuffer::lock()
{
NdbMutex_Lock(m_mutex);
}
void
-NdbGlobalEventBufferHandle::unlock()
+NdbEventBuffer::unlock()
{
NdbMutex_Unlock(m_mutex);
}
void
-NdbGlobalEventBuffer::add_drop_lock()
+NdbEventBuffer::add_drop_lock()
{
NdbMutex_Lock(p_add_drop_mutex);
}
void
-NdbGlobalEventBuffer::add_drop_unlock()
+NdbEventBuffer::add_drop_unlock()
{
NdbMutex_Unlock(p_add_drop_mutex);
}
-// Private methods
-
-NdbGlobalEventBuffer::NdbGlobalEventBuffer() :
- m_handlers(),
- m_latestGCI(0),
- m_no(0) // must start at ZERO!
-{
- if ((p_add_drop_mutex = NdbMutex_Create()) == NULL) {
- ndbout_c("NdbGlobalEventBuffer: NdbMutex_Create() failed");
- exit(-1);
- }
-
- // ToDo set event buffer size
- // pre allocate event data array
- int sz= 10000;
- m_allocated_data=
- (EventBufItem::Data *)NdbMem_Allocate(sz*sizeof(EventBufItem::Data));
- bzero((void*)m_allocated_data, sz*sizeof(EventBufItem::Data));
- for (int i= 0; i < sz-1; i++)
- {
- m_allocated_data[i].m_next= &m_allocated_data[i+1];
- }
- m_allocated_data[sz-1].m_next= 0;
-
- // initialize lists
- m_received_data= 0;
- m_last_received_data= 0;
- m_received_data_count= 0;
-
- m_available_data= 0;
- m_last_available_data= 0;
- m_available_data_count= 0;
-
- m_used_data= 0;
- m_last_used_data= 0;
-
- m_free_data= m_allocated_data;
-}
-
-NdbGlobalEventBuffer::~NdbGlobalEventBuffer()
-{
- NdbMem_Free((void*)m_allocated_data);
- NdbMutex_Destroy(p_add_drop_mutex);
- // NdbMem_Deallocate(m_eventBufferIdToEventId);
-}
-void
-NdbGlobalEventBuffer::real_init (NdbGlobalEventBufferHandle *h,
- int MAX_NUMBER_ACTIVE_EVENTS)
-{
- DBUG_ENTER("NdbGlobalEventBuffer::real_init");
- DBUG_PRINT("enter",("m_handles.size()=%u %u", m_handlers.size(), h));
- if (m_handlers.size() == 0)
- { // First init
- DBUG_PRINT("info",("first to come"));
- m_max = MAX_NUMBER_ACTIVE_EVENTS;
- m_buf = new EventBufItem[m_max];
- for (int i=0; i<m_max; i++) {
- m_buf[i].gId= 0;
- m_buf[i].active= 0;
- }
- }
- assert(m_max == MAX_NUMBER_ACTIVE_EVENTS);
- // TODO make sure we don't hit roof
- m_handlers.push_back(h);
- DBUG_VOID_RETURN;
-}
-void
-NdbGlobalEventBuffer::real_remove(NdbGlobalEventBufferHandle *h)
-{
- DBUG_ENTER("NdbGlobalEventBuffer::real_remove");
- DBUG_PRINT("enter",("m_handles.size()=%u %u", m_handlers.size(), h));
- for (Uint32 i=0 ; i < m_handlers.size(); i++)
- {
- DBUG_PRINT("info",("m_handlers[%u] %u", i, m_handlers[i]));
- if (m_handlers[i] == h)
- {
- m_handlers.erase(i);
- if (m_handlers.size() == 0)
- {
- DBUG_PRINT("info",("last to go"));
- delete[] m_buf;
- m_buf = NULL;
- }
- DBUG_VOID_RETURN;
- }
- }
- ndbout_c("NdbGlobalEventBuffer::real_remove() non-existing handle");
- DBUG_PRINT("error",("non-existing handle"));
- abort();
- DBUG_VOID_RETURN;
-}
-
-int
-NdbGlobalEventBuffer::real_prepareAddSubscribeEvent
-(NdbGlobalEventBufferHandle *aHandle, NdbEventOperationImpl *eventOp)
-{
- DBUG_ENTER("NdbGlobalEventBuffer::real_prepareAddSubscribeEvent");
- int i;
- int bufferId= -1;
- Uint32 eventId= eventOp->m_eventId;
-
- DBUG_PRINT("enter",("eventId: %u", eventId));
- // add_drop_lock(); // only one thread can do add or drop at a time
-
- // Find place where eventId already set
- for (i=0; i<m_no; i++) {
- if (m_buf[i].gId == eventId) {
- DBUG_PRINT("error",("Event with eventId=%d already has a subscriber",
- eventId));
- DBUG_RETURN(-1);
- }
- else if (bufferId < 0 && m_buf[i].gId == 0)
- {
- bufferId= i; // we found an empty spot
- }
- }
- if (bufferId < 0)
- {
- if (m_no < m_max)
- {
- // room for more so get that
- bufferId= m_no;
- m_buf[m_no].gId= 0;
- m_no++;
- }
- else
- {
- // add_drop_unlock();
- DBUG_PRINT("error",("Can't accept more subscribers:"
- " bufferId=%d, m_no=%d, m_max=%d",
- bufferId, m_no, m_max));
- DBUG_RETURN(-1);
- }
- }
-
- EventBufItem &b= m_buf[bufferId];
-
- b.gId= eventId;
- b.active= 0;
- b.m_event_op= eventOp;
- b.theHandle= aHandle;
- b.eventType= (Uint32)eventOp->m_eventImpl->mi_type;
-
- DBUG_PRINT("info",("handed out bufferId=%d for eventId=%d",
- bufferId, eventId));
-
- /* we now have a lock on the prepare so that no one can mess with this
- * unlock comes in unprepareAddSubscribeEvent or addSubscribeEvent
- */
- DBUG_RETURN(bufferId);
-}
-
-void
-NdbGlobalEventBuffer::real_unprepareAddSubscribeEvent(int bufferId)
-{
- DBUG_ENTER("NdbGlobalEventBuffer::real_unprepareAddSubscribeEvent");
- EventBufItem &b = m_buf[bufferId];
-
- DBUG_PRINT("enter", ("eventId: %d, bufferId: %d", b.gId, bufferId));
-
- b.theHandle = NULL;
- b.gId= 0;
- // add_drop_unlock();
- DBUG_VOID_RETURN;
-}
-
void
-NdbGlobalEventBuffer::real_addSubscribeEvent(int bufferId)
-{
- DBUG_ENTER("NdbGlobalEventBuffer::real_addSubscribeEvent");
- EventBufItem &b = m_buf[bufferId];
-
- b.active= 1;
- b.theHandle->addBufferId(bufferId);
-
- // add_drop_unlock();
- DBUG_PRINT("info",("added bufferId %d", bufferId));
- DBUG_VOID_RETURN;
-}
-
-void
-NdbGlobalEventBuffer::real_unprepareDropSubscribeEvent(int bufferId)
-{
- // add_drop_unlock(); // only one thread can do add or drop at a time
-}
-
-int
-NdbGlobalEventBuffer::real_prepareDropSubscribeEvent(int bufferId)
-{
- DBUG_ENTER("NdbGlobalEventBuffer::real_prepareDropSubscribeEvent");
- // add_drop_lock(); // only one thread can do add or drop at a time
-
- if (m_buf[bufferId].theHandle != NULL)
- {
- DBUG_RETURN(0);
- }
- DBUG_RETURN(-1);
-}
-
-void
-NdbGlobalEventBuffer::real_dropSubscribeEvent(int bufferId)
-{
- DBUG_ENTER("NdbGlobalEventBuffer::real_dropSubscribeEvent");
- // add_drop_lock(); // only one thread can do add-drop at a time
-
- EventBufItem &b = m_buf[bufferId];
-
- b.theHandle->dropBufferId(bufferId);
-
- real_unprepareAddSubscribeEvent(bufferId); // does add_drop_unlock();
-
-#ifdef EVENT_DEBUG
- ndbout_c("dropSubscribeEvent:: dropped bufferId %d", bufferId);
-#endif
- DBUG_VOID_RETURN;
-}
-
-void
-NdbGlobalEventBuffer::real_latestGCI(int bufferId, Uint32 gci)
+NdbEventBuffer::latestGCI(NdbEventOperationImpl *op, Uint32 gci)
{
if (gci > m_latestGCI)
m_latestGCI = gci;
@@ -1054,24 +668,22 @@
}
Uint32
-NdbGlobalEventBuffer::real_getLatestGCI()
+NdbEventBuffer::getLatestGCI()
{
return m_latestGCI;
}
int
-NdbGlobalEventBuffer::real_insertDataL(int bufferId,
- const SubTableData * const sdata,
- LinearSectionPtr ptr[3])
+NdbEventBuffer::insertDataL(NdbEventOperationImpl *op,
+ const SubTableData * const sdata,
+ LinearSectionPtr ptr[3])
{
- DBUG_ENTER("NdbGlobalEventBuffer::real_insertDataL");
- EventBufItem &b = m_buf[bufferId];
+ DBUG_ENTER("NdbEventBuffer::insertDataL");
- if ( b.eventType & (1 << (Uint32)sdata->operation) )
+ if ( (Uint32)op->m_eventImpl->mi_type & (1 << (Uint32)sdata->operation) )
{
- if (b.active && b.theHandle) {
- DBUG_PRINT("info", ("data insertion in buffer %d with eventId %d",
- bufferId, b.gId));
+ if (op->m_state == NdbEventOperation::EO_EXECUTING) {
+ DBUG_PRINT("info", ("data insertion in eventId %d", op->m_eventId));
EventBufItem::Data *data;
if ((data= m_free_data))
{
@@ -1100,12 +712,12 @@
{
DBUG_RETURN(-1);
}
- data->m_event_op= b.m_event_op;
+ data->m_event_op= op;
// signal subscriber that there's more to get
- NdbCondition_Signal(b.theHandle->p_cond);
+ NdbCondition_Signal(p_cond);
DBUG_RETURN(0);
} else {
- DBUG_PRINT("info",("Data arrived before ready eventId", b.gId));
+ DBUG_PRINT("info",("Data arrived before ready eventId", op->m_eventId));
DBUG_RETURN(0);
}
}
@@ -1116,9 +728,9 @@
}
}
-int NdbGlobalEventBuffer::real_getDataL(const int bufferId)
+int NdbEventBuffer::getDataL(NdbEventOperationImpl *op)
{
- DBUG_ENTER("NdbGlobalEventBuffer::real_getDataL");
+ DBUG_ENTER("NdbEventBuffer::getDataL");
int r= 0;
// ToDo should start at bufferid dependant pos for optimization
@@ -1126,9 +738,9 @@
for(EventBufItem::Data *data= m_available_data;
data; data= data->m_next)
{
- DBUG_PRINT("info",("bufferId: %d", bufferId));
+ DBUG_PRINT("info",("eventId: %d", data->m_event_op->m_eventId));
- if (bufferId == data->m_event_op->m_bufferId)
+ if (op == data->m_event_op)
{
// set NdbEventOperation data
data->m_event_op->m_data_item= data;
@@ -1165,12 +777,12 @@
}
int
-NdbGlobalEventBuffer::copy_data_alloc(const SubTableData * const f_sdata,
- LinearSectionPtr f_ptr[3],
- SubTableData * &t_sdata,
- LinearSectionPtr t_ptr[3])
+NdbEventBuffer::copy_data_alloc(const SubTableData * const f_sdata,
+ LinearSectionPtr f_ptr[3],
+ SubTableData * &t_sdata,
+ LinearSectionPtr t_ptr[3])
{
- DBUG_ENTER("NdbGlobalEventBuffer::copy_data_alloc");
+ DBUG_ENTER("NdbEventBuffer::copy_data_alloc");
unsigned sz4= (sizeof(SubTableData)+3)>>2;
Uint32 *ptr= (Uint32*)NdbMem_Allocate((sz4 +
f_ptr[0].sz +
@@ -1199,9 +811,9 @@
}
NdbEventOperationImpl *
-NdbGlobalEventBuffer::move_data()
+NdbEventBuffer::move_data()
{
- DBUG_ENTER("NdbGlobalEventBuffer::move_data");
+ DBUG_ENTER("NdbEventBuffer::move_data");
// handle received data
if (m_received_data)
{
@@ -1240,5 +852,3 @@
}
DBUG_RETURN(0);
}
-
-template class Vector<NdbGlobalEventBufferHandle*>;
--- 1.8/ndb/src/ndbapi/NdbEventOperationImpl.hpp 2005-03-15 11:00:09 +01:00
+++ 1.9/ndb/src/ndbapi/NdbEventOperationImpl.hpp 2005-03-16 01:18:03 +01:00
@@ -21,12 +21,10 @@
#include <signaldata/SumaImpl.hpp>
#include <transporter/TransporterDefinitions.hpp>
-class NdbGlobalEventBufferHandle;
+#define NDB_EVENT_OP_MAGIC_NUMBER 0xA9F301B4
+
class NdbEventOperationImpl;
struct EventBufItem {
- Uint32 gId;
- int active;
- Uint32 eventType;
struct Data {
SubTableData *sdata;
LinearSectionPtr ptr[3];
@@ -34,15 +32,15 @@
NdbEventOperationImpl *m_event_op;
Data *m_next;
};
- NdbGlobalEventBufferHandle *theHandle;
- NdbEventOperationImpl *m_event_op;
};
class NdbEventOperationImpl : public NdbEventOperation {
public:
NdbEventOperationImpl(NdbEventOperation &N,
Ndb *theNdb,
- const char* eventName);
+ const char* eventName);
+ NdbEventOperationImpl(NdbEventOperationImpl&); //unimplemented
+ NdbEventOperationImpl& operator=(const NdbEventOperationImpl&); //unimplemented
~NdbEventOperationImpl();
NdbEventOperation::State getState();
@@ -62,13 +60,14 @@
void printAll();
NdbEventOperation *m_facade;
+ Uint32 m_magic_number;
const NdbError & getNdbError() const;
NdbError m_error;
Ndb *m_ndb;
NdbEventImpl *m_eventImpl;
- NdbGlobalEventBufferHandle *m_bufferHandle;
+ NdbEventBuffer *m_event_buffer;
NdbRecAttr *theFirstPkAttrs[2];
NdbRecAttr *theCurrentPkAttrs[2];
@@ -77,99 +76,55 @@
NdbEventOperation::State m_state;
Uint32 m_eventId;
- int m_bufferId;
Uint32 m_oid;
EventBufItem::Data *m_data_item;
+
+ // managed by the ndb object
+ NdbEventOperationImpl *m_next;
+ NdbEventOperationImpl *m_prev;
};
-class NdbGlobalEventBuffer;
-class NdbGlobalEventBufferHandle {
+
+class NdbEventBuffer {
public:
- NdbGlobalEventBufferHandle (Ndb *ndb,
- int MAX_NUMBER_ACTIVE_EVENTS);
- ~NdbGlobalEventBufferHandle ();
- //static NdbGlobalEventBufferHandle *init(int MAX_NUMBER_ACTIVE_EVENTS);
+ NdbEventBuffer(Ndb*);
+ ~NdbEventBuffer();
+ void add_drop_lock();
+ void add_drop_unlock();
void lock();
void unlock();
- // returns bufferId 0-N if ok otherwise -1
- int prepareAddSubscribeEvent(NdbEventOperationImpl *);
- void unprepareAddSubscribeEvent(int bufferId);
- void addSubscribeEvent(int bufferId);
-
- void unprepareDropSubscribeEvent(int bufferId);
- int prepareDropSubscribeEvent(int bufferId);
- void dropSubscribeEvent(int bufferId);
-
- int getDataL(const int bufferId);
- static int insertDataL(int bufferId,
- const SubTableData * const sdata,
- LinearSectionPtr ptr[3]);
- static void latestGCI(int bufferId, Uint32 gci);
+ int getDataL(NdbEventOperationImpl *op);
+ int insertDataL(NdbEventOperationImpl *op,
+ const SubTableData * const sdata,
+ LinearSectionPtr ptr[3]);
+ void latestGCI(NdbEventOperationImpl *op, Uint32 gci);
+
Uint32 getLatestGCI();
- static Uint32 getEventId(int bufferId);
+ Uint32 getEventId(int bufferId);
int wait(int aMillisecondNumber, NdbEventOperation **p_ev_op= 0);
NdbEventOperation *next(int *error);
-private:
- friend class NdbGlobalEventBuffer;
- void addBufferId(int bufferId);
- void dropBufferId(int bufferId);
-
- NdbMutex *m_mutex;
- struct NdbCondition *p_cond;
- int m_nids;
- int m_bufferIds[NDB_MAX_ACTIVE_EVENTS];
-};
-
-class NdbGlobalEventBuffer {
-private:
- friend class NdbGlobalEventBufferHandle;
- void add_drop_lock();
- void add_drop_unlock();
-
NdbEventOperationImpl *move_data();
- NdbGlobalEventBuffer();
- ~NdbGlobalEventBuffer();
-
- void real_remove(NdbGlobalEventBufferHandle *h);
- void real_init(NdbGlobalEventBufferHandle *h,
- int MAX_NUMBER_ACTIVE_EVENTS);
-
- int real_prepareAddSubscribeEvent(NdbGlobalEventBufferHandle *h,
- NdbEventOperationImpl *);
- void real_unprepareAddSubscribeEvent(int bufferId);
- void real_addSubscribeEvent(int bufferId);
-
- void real_unprepareDropSubscribeEvent(int bufferId);
- int real_prepareDropSubscribeEvent(int bufferId);
- void real_dropSubscribeEvent(int bufferId);
-
- int real_getDataL(const int bufferId);
- int real_insertDataL(int bufferId,
- const SubTableData * const sdata,
- LinearSectionPtr ptr[3]);
- void real_latestGCI(int bufferId, Uint32 gci);
- Uint32 real_getLatestGCI();
int copy_data_alloc(const SubTableData * const f_sdata,
LinearSectionPtr f_ptr[3],
SubTableData * &t_sdata,
LinearSectionPtr t_ptr[3]);
- Vector<NdbGlobalEventBufferHandle*> m_handlers;
-
// Global Mutex used for some things
- NdbMutex *p_add_drop_mutex;
+ static NdbMutex *p_add_drop_mutex;
Uint32 m_latestGCI;
int m_no;
int m_max;
- EventBufItem *m_buf;
+
+ NdbMutex *m_mutex;
+ struct NdbCondition *p_cond;
// receive thread
EventBufItem::Data *m_received_data;
@@ -188,5 +143,6 @@
// all allocated data
EventBufItem::Data *m_allocated_data;
+ unsigned m_sz;
};
#endif
--- 1.8/ndb/src/ndbapi/NdbImpl.hpp 2004-12-31 10:21:30 +01:00
+++ 1.9/ndb/src/ndbapi/NdbImpl.hpp 2005-03-16 01:18:03 +01:00
@@ -58,6 +58,8 @@
NdbWaiter theWaiter;
+ NdbEventOperationImpl *m_ev_op;
+
int m_optimized_node_selection;
};
--- 1.29/ndb/src/ndbapi/Ndbif.cpp 2005-03-15 11:00:09 +01:00
+++ 1.30/ndb/src/ndbapi/Ndbif.cpp 2005-03-16 01:18:03 +01:00
@@ -26,6 +26,7 @@
#include <NdbRecAttr.hpp>
#include <NdbReceiver.hpp>
#include "API.hpp"
+#include "NdbEventOperationImpl.hpp"
#include <signaldata/TcCommit.hpp>
#include <signaldata/TcKeyFailConf.hpp>
@@ -37,11 +38,14 @@
#include <signaldata/TransIdAI.hpp>
#include <signaldata/ScanFrag.hpp>
#include <signaldata/ScanTab.hpp>
+#include <signaldata/SumaImpl.hpp>
#include <ndb_limits.h>
#include <NdbOut.hpp>
#include <NdbTick.h>
+#include <EventLogger.hpp>
+extern EventLogger g_eventLogger;
/******************************************************************************
* int init( int aNrOfCon, int aNrOfOp );
@@ -693,10 +697,55 @@
aSignal, ptr);
break;
case GSN_SUB_GCP_COMPLETE_REP:
+ {
+ const SubGcpCompleteRep * const rep=
+ CAST_CONSTPTR(SubGcpCompleteRep, aSignal->getDataPtr());
+ const Uint32 gci= rep->gci;
+ // const Uint32 senderRef= rep->senderRef;
+ const Uint32 oid= rep->subscriberData;
+ const Uint32 ref= aSignal->theSendersBlockRef;
+
+ // send acnowledge
+ NdbApiSignal tSignal(theMyRef);
+ SubGcpCompleteAcc * acc=
+ CAST_PTR(SubGcpCompleteAcc, tSignal.getDataPtrSend());
+ acc->rep= *rep;
+ tSignal.theReceiversBlockNumber= refToBlock(ref);
+ tSignal.theVerId_signalNumber= GSN_SUB_GCP_COMPLETE_ACC;
+ tSignal.theLength= SubGcpCompleteAcc::SignalLength;
+ Uint32 aNodeId= refToNode(ref);
+ TransporterFacade::instance()->sendSignal(&tSignal, aNodeId);
+
+ NdbEventOperationImpl *op= (NdbEventOperationImpl*)int2void(oid);
+ if (op->m_magic_number == NDB_EVENT_OP_MAGIC_NUMBER)
+ theEventBuffer->latestGCI(op,gci);
+ else
+ g_eventLogger.error("dropped GSN_SUB_GCP_COMPLETE_REP due to wrong magic number");
+ break;
+ }
case GSN_SUB_TABLE_DATA:
- NdbDictInterface::execSignal(&theDictionary->m_receiver,
- aSignal, ptr);
+ {
+ const SubTableData * const sdata = CAST_CONSTPTR(SubTableData, aSignal->getDataPtr());
+ const Uint32 oid = sdata->subscriberData;
+
+ for (int i= aSignal->m_noOfSections;i < 3; i++) {
+ ptr[i].p = NULL;
+ ptr[i].sz = 0;
+ }
+#ifdef EVENT_DEBUG
+ ndbout_c("%s: senderData %d, gci %d, operation %d, tableId %d, noOfAttrs %d, dataLen %d",
+ FNAME, subscriberData, gci, operation, tableId, noOfAttrs, dataLen);
+ ndbout_c("ptr[0] %u %u ptr[1] %u %u ptr[2] %u %u\n",
+ ptr[0].p,ptr[0].sz,ptr[1].p,ptr[1].sz,ptr[2].p,ptr[2].sz);
+#endif
+
+ NdbEventOperationImpl *op= (NdbEventOperationImpl*)int2void(oid);
+ if (op->m_magic_number == NDB_EVENT_OP_MAGIC_NUMBER)
+ theEventBuffer->insertDataL(op,sdata, ptr);
+ else
+ g_eventLogger.error("dropped GSN_SUB_TABLE_DATA due to wrong magic number");
break;
+ }
case GSN_DIHNDBTAMPER:
{
tFirstDataPtr = int2void(tFirstData);
--- 1.30/ndb/src/ndbapi/Ndbinit.cpp 2005-03-14 13:56:43 +01:00
+++ 1.31/ndb/src/ndbapi/Ndbinit.cpp 2005-03-16 01:18:03 +01:00
@@ -29,10 +29,10 @@
#include <NdbOut.hpp>
#include <NdbSleep.h>
#include "ObjectMap.hpp"
+#include "NdbEventOperationImpl.hpp"
-class NdbGlobalEventBufferHandle;
-NdbGlobalEventBufferHandle *NdbGlobalEventBuffer_init(Ndb*,int);
-void NdbGlobalEventBuffer_drop(NdbGlobalEventBufferHandle *);
+#include <EventLogger.hpp>
+extern EventLogger g_eventLogger;
Ndb::Ndb( Ndb_cluster_connection *ndb_cluster_connection,
const char* aDataBase , const char* aSchema)
@@ -129,13 +129,11 @@
{
// theImpl->theWaiter.m_mutex must be set before this
- NdbGlobalEventBufferHandle *h=
- NdbGlobalEventBuffer_init(this,NDB_MAX_ACTIVE_EVENTS);
- if (h == NULL) {
- ndbout_c("Failed NdbGlobalEventBuffer_init(%d)",NDB_MAX_ACTIVE_EVENTS);
+ theEventBuffer= new NdbEventBuffer(this);
+ if (theEventBuffer == NULL) {
+ ndbout_c("Failed NdbEventBuffer()");
exit(-1);
}
- theGlobalEventBufferHandle = h;
}
DBUG_VOID_RETURN;
@@ -151,9 +149,17 @@
{
DBUG_ENTER("Ndb::~Ndb()");
DBUG_PRINT("enter",("Ndb::~Ndb this=0x%x",this));
+
+ assert(theImpl->m_ev_op == 0); // user should return NdbEventOperation's
+ for (NdbEventOperationImpl *op= theImpl->m_ev_op; op; op=op->m_next)
+ {
+ if (op->m_state == NdbEventOperation::EO_EXECUTING && op->stop())
+ g_eventLogger.error("stopping NdbEventOperation failed in Ndb destructor");
+ op->m_magic_number= 0;
+ }
doDisconnect();
- NdbGlobalEventBuffer_drop(theGlobalEventBufferHandle);
+ delete theEventBuffer;
if (TransporterFacade::instance() != NULL && theNdbBlockNumber > 0){
TransporterFacade::instance()->close(theNdbBlockNumber, theFirstTransId);
@@ -239,7 +245,8 @@
m_dictionary(ndb),
theCurrentConnectIndex(0),
theNdbObjectIdMap(1024,1024),
- theNoOfDBnodes(0)
+ theNoOfDBnodes(0),
+ m_ev_op(0)
{
int i;
for (i = 0; i < MAX_NDB_NODES; i++) {
--- 1.3/ndb/test/ndbapi/test_event_multi_table.cpp 2005-03-15 11:00:09 +01:00
+++ 1.4/ndb/test/ndbapi/test_event_multi_table.cpp 2005-03-16 01:18:03 +01:00
@@ -463,6 +463,14 @@
}
} while (0);
+ for (i= 0; i < (int)pOps.size(); i++)
+ {
+ if (ndb.dropEventOperation(pOps[i]))
+ {
+ no_error= 0;
+ }
+ }
+
if (no_error)
DBUG_RETURN(NDBT_ProgramExit(NDBT_OK));
DBUG_RETURN(NDBT_ProgramExit(NDBT_FAILED));
| Thread |
|---|
| • bk commit into 5.1 tree (tomas:1.1804) | tomas | 17 Mar |