Below is the list of changes that have just been committed into a local
5.1 repository of tomas. When tomas does a push, these changes will
be propagated to the main repository and, within 24 hours after the
push, to the public repository.
For information on how to access the public repository,
see http://dev.mysql.com/doc/mysql/en/installing-source-tree.html
ChangeSet
1.1803 05/03/15 11:00:21 tomas@stripped +8 -0
partial commit
optimized event api, removed mutex use and data copying
ndb/test/ndbapi/test_event_multi_table.cpp
1.3 05/03/15 11:00:09 tomas@stripped +3 -3
partial commit
optimized event api, removed mutex use and data copying
ndb/src/ndbapi/Ndbif.cpp
1.29 05/03/15 11:00:09 tomas@stripped +5 -3
partial commit
optimized event api, removed mutex use and data copying
ndb/src/ndbapi/NdbEventOperationImpl.hpp
1.8 05/03/15 11:00:09 tomas@stripped +45 -51
partial commit
optimized event api, removed mutex use and data copying
ndb/src/ndbapi/NdbEventOperationImpl.cpp
1.19 05/03/15 11:00:09 tomas@stripped +226 -217
partial commit
optimized event api, removed mutex use and data copying
ndb/src/ndbapi/NdbEventOperation.cpp
1.7 05/03/15 11:00:08 tomas@stripped +4 -8
partial commit
optimized event api, removed mutex use and data copying
ndb/src/ndbapi/Ndb.cpp
1.48 05/03/15 11:00:07 tomas@stripped +4 -31
partial commit
optimized event api, removed mutex use and data copying
ndb/include/ndbapi/NdbEventOperation.hpp
1.15 05/03/15 11:00:07 tomas@stripped +1 -1
partial commit
optimized event api, removed mutex use and data copying
ndb/include/ndbapi/Ndb.hpp
1.42 05/03/15 11:00:07 tomas@stripped +0 -1
partial commit
optimized event api, removed mutex use and data copying
# This is a BitKeeper patch. What follows are the unified diffs for the
# set of deltas contained in the patch. The rest of the patch, the part
# that BitKeeper cares about, is below these diffs.
# User: tomas
# Host: poseidon.ndb.mysql.com
# Root: /home/tomas/mysql-5.1-wl2325
--- 1.41/ndb/include/ndbapi/Ndb.hpp 2005-03-14 13:56:42 +01:00
+++ 1.42/ndb/include/ndbapi/Ndb.hpp 2005-03-15 11:00:07 +01:00
@@ -1646,7 +1646,6 @@
class NdbImpl * theImpl;
class NdbDictionaryImpl* theDictionary;
class NdbGlobalEventBufferHandle* theGlobalEventBufferHandle;
- class NdbEventOperationImpl* m_event_op;
NdbTransaction* theConIdleList; // First connection in idle list.
--- 1.14/ndb/include/ndbapi/NdbEventOperation.hpp 2005-03-10 20:26:46 +01:00
+++ 1.15/ndb/include/ndbapi/NdbEventOperation.hpp 2005-03-15 11:00:07 +01:00
@@ -226,7 +226,7 @@
friend class NdbEventOperationImpl;
friend class Ndb;
#endif
- NdbEventOperation(Ndb *theNdb, const char* eventName,int bufferLength);
+ NdbEventOperation(Ndb *theNdb, const char* eventName);
~NdbEventOperation();
class NdbEventOperationImpl &m_impl;
NdbEventOperation(NdbEventOperationImpl& impl);
--- 1.47/ndb/src/ndbapi/Ndb.cpp 2005-03-10 20:26:46 +01:00
+++ 1.48/ndb/src/ndbapi/Ndb.cpp 2005-03-15 11:00:07 +01:00
@@ -1206,12 +1206,13 @@
return ret;
}
+// ToDo set event buffer size
NdbEventOperation* Ndb::createEventOperation(const char* eventName,
const int bufferLength)
{
NdbEventOperation* tOp;
- tOp = new NdbEventOperation(this, eventName, bufferLength);
+ tOp = new NdbEventOperation(this, eventName);
if (tOp == 0)
{
@@ -1247,40 +1248,12 @@
int
Ndb::pollEvents(int aMillisecondNumber)
{
- m_event_op= 0;
- return NdbEventOperationImpl::wait(theGlobalEventBufferHandle,
- aMillisecondNumber);
+ return theGlobalEventBufferHandle->wait(aMillisecondNumber);
}
NdbEventOperation *Ndb::nextEvent(int *error)
{
- int _id;
- if (m_event_op)
- _id= m_event_op->m_impl.m_bufferId;
- else
- _id= 0;
- int id= _id;
- int max= theGlobalEventBufferHandle->maxBufferId();
- do {
- if (theGlobalEventBufferHandle->hasData(id))
- {
- m_event_op= theGlobalEventBufferHandle->getNdbEventOperationImpl(id);
- int r= m_event_op->next(0);
- if (r > 0)
- return m_event_op->m_facade;
- else if (r < 0)
- {
- if (error)
- *error= -1;
- return 0;
- }
- }
- id++;
- if (id > max)
- id= 0;
- } while (id != _id);
- m_event_op= 0;
- return 0;
+ return theGlobalEventBufferHandle->next(error);
}
#ifdef VM_TRACE
--- 1.6/ndb/src/ndbapi/NdbEventOperation.cpp 2005-03-10 20:26:46 +01:00
+++ 1.7/ndb/src/ndbapi/NdbEventOperation.cpp 2005-03-15 11:00:08 +01:00
@@ -21,12 +21,8 @@
#include "NdbEventOperationImpl.hpp"
#include "NdbDictionaryImpl.hpp"
-NdbEventOperation::NdbEventOperation(Ndb *theNdb,
- const char* eventName,
- int bufferLength)
- : m_impl(* new NdbEventOperationImpl(*this,theNdb,
- eventName,
- bufferLength))
+NdbEventOperation::NdbEventOperation(Ndb *theNdb,const char* eventName)
+ : m_impl(* new NdbEventOperationImpl(*this,theNdb,eventName))
{
}
@@ -63,13 +59,13 @@
int
NdbEventOperation::next(int *pOverrun)
{
- return m_impl.next(pOverrun);
+ return m_impl.next();
}
int
NdbEventOperation::isOverrun()
{
- return m_impl.m_overrun;
+ return 0; // ToDo
}
bool
--- 1.18/ndb/src/ndbapi/NdbEventOperationImpl.cpp 2005-03-14 13:56:42 +01:00
+++ 1.19/ndb/src/ndbapi/NdbEventOperationImpl.cpp 2005-03-15 11:00:09 +01:00
@@ -49,13 +49,14 @@
//#define EVENT_DEBUG
+// todo handle several ndb objects
+// todo free allocated data when closing NdbGlobalEventBuffer
NdbEventOperationImpl::NdbEventOperationImpl(NdbEventOperation &N,
Ndb *theNdb,
- const char* eventName,
- const int bufferLength)
+ const char* eventName)
: NdbEventOperation(*this), m_facade(this), m_ndb(theNdb),
- m_state(EO_ERROR), m_bufferL(bufferLength)
+ m_state(EO_ERROR)
{
m_eventId = 0;
theFirstPkAttrs[0] = NULL;
@@ -66,10 +67,7 @@
theCurrentDataAttrs[0] = NULL;
theFirstDataAttrs[1] = NULL;
theCurrentDataAttrs[1] = NULL;
- sdata = NULL;
- ptr[0].p = NULL;
- ptr[1].p = NULL;
- ptr[2].p = NULL;
+ m_data_item= NULL;
// we should lookup id in Dictionary, TODO
// also make sure we only have one listener on each event
@@ -87,10 +85,8 @@
m_eventId = m_eventImpl->m_eventId;
m_bufferHandle = m_ndb->getGlobalEventBufferHandle();
- if (m_bufferHandle->m_bufferL > 0)
- m_bufferL =m_bufferHandle->m_bufferL;
- else
- m_bufferHandle->m_bufferL = m_bufferL;
+
+ m_oid= m_ndb->theImpl->theNdbObjectIdMap.map(this);
m_state = EO_CREATED;
}
@@ -98,7 +94,6 @@
NdbEventOperationImpl::~NdbEventOperationImpl()
{
int i;
- if (sdata) NdbMem_Free((char*)sdata);
for (i=0 ; i<2; i++) {
NdbRecAttr *p = theFirstPkAttrs[i];
while (p) {
@@ -120,6 +115,8 @@
// m_bufferHandle->dropSubscribeEvent(m_bufferId);
; // We should send stop signal here
}
+
+ m_ndb->theImpl->theNdbObjectIdMap.unmap(m_oid, this);
}
NdbEventOperation::State
@@ -257,24 +254,17 @@
m_eventImpl->m_bufferId = m_bufferId = (Uint32)r;
- r = -1;
- if (m_bufferId >= 0) {
- // now we check if there's already a subscriber
-
- r = myDictImpl.executeSubscribeEvent(*m_eventImpl);
- if (r) {
- //Error
- m_bufferHandle->unprepareAddSubscribeEvent(m_bufferId);
- m_state = EO_ERROR;
- } else {
- m_bufferHandle->addSubscribeEvent(m_bufferId);
- m_state = EO_EXECUTING;
- }
- } else {
+ r = myDictImpl.executeSubscribeEvent(*m_eventImpl);
+ if (r) {
//Error
+ m_bufferHandle->unprepareAddSubscribeEvent(m_bufferId);
m_state = EO_ERROR;
+ DBUG_RETURN(r);
+ } else {
+ m_bufferHandle->addSubscribeEvent(m_bufferId);
+ m_state = EO_EXECUTING;
+ DBUG_RETURN(0);
}
- DBUG_RETURN(r);
}
int
@@ -286,8 +276,6 @@
DBUG_RETURN(-1);
}
- // ndbout_c("NdbEventOperation::stopping()");
-
NdbDictionary::Dictionary *myDict = m_ndb->getDictionary();
if (!myDict) {
m_error.code= m_ndb->getNdbError().code;
@@ -302,40 +290,31 @@
m_error.code= 4712;
DBUG_RETURN(-1);
}
- // m_eventImpl->m_bufferId = m_bufferId;
-
- int r = -1;
- r = myDictImpl.stopSubscribeEvent(*m_eventImpl);
-#ifdef EVENT_DEBUG
- ndbout_c("NdbEventOperation::stopping() done");
-#endif
+ int r= myDictImpl.stopSubscribeEvent(*m_eventImpl);
if (r) {
//Error
m_bufferHandle->unprepareDropSubscribeEvent(m_bufferId);
m_error.code= myDictImpl.m_error.code;
m_state = EO_ERROR;
+ DBUG_RETURN(r);
} else {
-#ifdef EVENT_DEBUG
- ndbout_c("NdbEventOperation::dropping()");
-#endif
m_bufferHandle->dropSubscribeEvent(m_bufferId);
m_state = EO_CREATED;
+ DBUG_RETURN(0);
}
-
- DBUG_RETURN(r);
}
bool
NdbEventOperationImpl::isConsistent()
{
- return sdata->isGCIConsistent();
+ return m_data_item->sdata->isGCIConsistent();
}
Uint32
NdbEventOperationImpl::getGCI()
{
- return sdata->gci;
+ return m_data_item->sdata->gci;
}
Uint32
@@ -345,26 +324,23 @@
}
int
-NdbEventOperationImpl::next(int *pOverrun)
+NdbEventOperationImpl::next()
{
DBUG_ENTER("NdbEventOperationImpl::next");
int nr = 10000; // a high value
- m_overrun = 0;
while (nr > 0) {
- int o= 0;
- int r= m_bufferHandle->getDataL(m_bufferId, sdata, ptr, &o);
-
- m_overrun+= o;
- if (pOverrun)
- *pOverrun = m_overrun;
+ int r= m_bufferHandle->getDataL(m_bufferId);
if (r <= 0)
{
DBUG_RETURN(r); // no data
}
- if (r < nr) r = nr; else nr--; // we don't want to be stuck here forever
+ if (r < nr)
+ nr = r;
+ else
+ nr--; // we don't want to be stuck here forever
#ifdef EVENT_DEBUG
ndbout_c("!!!!!!!sdata->operation %u", (Uint32)sdata->operation);
@@ -381,9 +357,9 @@
// no copying since no RecAttr's
- Uint32 *aAttrPtr = ptr[0].p;
- Uint32 *aAttrEndPtr = aAttrPtr + ptr[0].sz;
- Uint32 *aDataPtr = ptr[1].p;
+ Uint32 *aAttrPtr = m_data_item->ptr[0].p;
+ Uint32 *aAttrEndPtr = aAttrPtr + m_data_item->ptr[0].sz;
+ Uint32 *aDataPtr = m_data_item->ptr[1].p;
#ifdef EVENT_DEBUG
printf("after values sz=%u\n", ptr[1].sz);
@@ -460,8 +436,8 @@
}
tWorkingRecAttr = theFirstDataAttrs[1];
- aDataPtr = ptr[2].p;
- Uint32 *aDataEndPtr = aDataPtr + ptr[2].sz;
+ aDataPtr = m_data_item->ptr[2].p;
+ Uint32 *aDataEndPtr = aDataPtr + m_data_item->ptr[2].sz;
while ((aDataPtr < aDataEndPtr) && (tWorkingRecAttr != NULL)) {
tRecAttrId = tWorkingRecAttr->attrId();
tAttrId = AttributeHeader(*aDataPtr).getAttributeId();
@@ -501,7 +477,7 @@
NdbDictionary::Event::TableEvent
NdbEventOperationImpl::getEventType()
{
- switch (sdata->operation) {
+ switch (m_data_item->sdata->operation) {
case TriggerEvent::TE_INSERT:
return NdbDictionary::Event::TE_INSERT;
case TriggerEvent::TE_DELETE:
@@ -543,9 +519,9 @@
void
NdbEventOperationImpl::printAll()
{
- Uint32 *aAttrPtr = ptr[0].p;
- Uint32 *aAttrEndPtr = aAttrPtr + ptr[0].sz;
- Uint32 *aDataPtr = ptr[1].p;
+ Uint32 *aAttrPtr = m_data_item->ptr[0].p;
+ Uint32 *aAttrEndPtr = aAttrPtr + m_data_item->ptr[0].sz;
+ Uint32 *aDataPtr = m_data_item->ptr[1].p;
//tRecAttr->setup(tAttrInfo, aValue)) {
@@ -560,12 +536,6 @@
}
}
-
-int NdbEventOperationImpl::wait(void *p, int aMillisecondNumber)
-{
- return ((NdbGlobalEventBufferHandle*)p)->wait(aMillisecondNumber);
-}
-
/*
* Global variable ndbGlobalEventBuffer
* Class NdbGlobalEventBufferHandle
@@ -636,7 +606,7 @@
}
NdbGlobalEventBufferHandle::NdbGlobalEventBufferHandle
-(Ndb *ndb, int MAX_NUMBER_ACTIVE_EVENTS) : m_bufferL(0), m_nids(0)
+(Ndb *ndb, int MAX_NUMBER_ACTIVE_EVENTS) : m_nids(0)
{
if ((p_cond = NdbCondition_Create()) == NULL) {
ndbout_c("NdbGlobalEventBufferHandle: NdbCondition_Create() failed");
@@ -645,7 +615,7 @@
m_mutex= ndb->theImpl->theWaiter.m_mutex;
- lock();
+ NdbMutex_Lock(m_mutex);
if (ndbGlobalEventBuffer == NULL) {
if (ndbGlobalEventBuffer == NULL) {
ndbGlobalEventBuffer = new NdbGlobalEventBuffer();
@@ -656,7 +626,7 @@
}
}
}
- unlock();
+ NdbMutex_Unlock(m_mutex);
GUARD(real_init(this,MAX_NUMBER_ACTIVE_EVENTS));
}
@@ -780,41 +750,53 @@
}
int
-NdbGlobalEventBufferHandle::wait(int aMillisecondNumber)
+NdbGlobalEventBufferHandle::wait(int aMillisecondNumber,
+ NdbEventOperation **p_ev_op)
{
DBUG_ENTER("NdbGlobalEventBufferHandle::wait");
- lock();
- // check if there are anything in any of the buffers
- int i;
- int n = 0;
- for (i = 0; i < m_nids; i++)
- n += hasData(m_bufferIds[i]);
- if (n)
+ NdbMutex_Lock(m_mutex);
+ NdbCondition_WaitTimeout(p_cond, m_mutex, aMillisecondNumber);
+ NdbEventOperationImpl *ev_op= ndbGlobalEventBuffer->move_data();
+ NdbMutex_Unlock(m_mutex); // we have moved the data
+
+ if (ev_op)
{
- unlock();
- DBUG_RETURN(n);
+ if (p_ev_op)
+ *p_ev_op= ev_op->m_facade;
+ DBUG_PRINT("exit",("m_available_data_count %u",
+ ndbGlobalEventBuffer->m_available_data_count));
+ DBUG_RETURN(ndbGlobalEventBuffer->m_available_data_count);
}
+ DBUG_RETURN(0);
+}
- if (NdbCondition_WaitTimeout(p_cond, m_mutex,
- aMillisecondNumber) == 0)
+NdbEventOperation *
+NdbGlobalEventBufferHandle::next(int *error)
+{
+ DBUG_ENTER("NdbGlobalEventBufferHandle::next");
+ EventBufItem::Data *data;
+ while((data = ndbGlobalEventBuffer->m_available_data))
{
- n = 0;
- for (i = 0; i < m_nids; i++)
- n += hasData(m_bufferIds[i]);
- unlock();
- DBUG_RETURN(n);
+ NdbEventOperationImpl *ev_op= data->m_event_op;
+ int r= ev_op->next();
+ if (r > 0)
+ {
+ DBUG_RETURN(ev_op->m_facade);
+ }
+ else if (r < 0)
+ {
+ if (error)
+ *error= -1;
+ DBUG_RETURN(0);
+ }
}
- unlock();
- DBUG_RETURN(-1);
+ DBUG_RETURN(0);
}
-int NdbGlobalEventBufferHandle::getDataL(const int bufferId,
- SubTableData * &sdata,
- LinearSectionPtr ptr[3],
- int *pOverrun)
+int NdbGlobalEventBufferHandle::getDataL(const int bufferId)
{
- GUARDR(int,real_getDataL(bufferId,sdata,ptr,pOverrun));
+ return ndbGlobalEventBuffer->real_getDataL(bufferId);
}
/*
@@ -856,10 +838,37 @@
ndbout_c("NdbGlobalEventBuffer: NdbMutex_Create() failed");
exit(-1);
}
+
+ // ToDo set event buffer size
+ // pre allocate event data array
+ int sz= 10000;
+ m_allocated_data=
+ (EventBufItem::Data *)NdbMem_Allocate(sz*sizeof(EventBufItem::Data));
+ bzero((void*)m_allocated_data, sz*sizeof(EventBufItem::Data));
+ for (int i= 0; i < sz-1; i++)
+ {
+ m_allocated_data[i].m_next= &m_allocated_data[i+1];
+ }
+ m_allocated_data[sz-1].m_next= 0;
+
+ // initialize lists
+ m_received_data= 0;
+ m_last_received_data= 0;
+ m_received_data_count= 0;
+
+ m_available_data= 0;
+ m_last_available_data= 0;
+ m_available_data_count= 0;
+
+ m_used_data= 0;
+ m_last_used_data= 0;
+
+ m_free_data= m_allocated_data;
}
NdbGlobalEventBuffer::~NdbGlobalEventBuffer()
{
+ NdbMem_Free((void*)m_allocated_data);
NdbMutex_Destroy(p_add_drop_mutex);
// NdbMem_Deallocate(m_eventBufferIdToEventId);
}
@@ -873,11 +882,10 @@
{ // First init
DBUG_PRINT("info",("first to come"));
m_max = MAX_NUMBER_ACTIVE_EVENTS;
- m_buf = new BufItem[m_max];
+ m_buf = new EventBufItem[m_max];
for (int i=0; i<m_max; i++) {
m_buf[i].gId= 0;
m_buf[i].active= 0;
- m_buf[i].bufferempty= 1;
}
}
assert(m_max == MAX_NUMBER_ACTIVE_EVENTS);
@@ -954,31 +962,14 @@
}
}
- BufItem &b= m_buf[bufferId];
+ EventBufItem &b= m_buf[bufferId];
b.gId= eventId;
b.active= 0;
b.m_event_op= eventOp;
+ b.theHandle= aHandle;
b.eventType= (Uint32)eventOp->m_eventImpl->mi_type;
- b.front= 0;
- b.sz= 0;
- b.max_sz= aHandle->m_bufferL;
- b.data=
- (BufItem::Data *)NdbMem_Allocate(b.max_sz*sizeof(BufItem::Data));
- for (int i = 0; i < b.max_sz; i++) {
- b.data[i].sdata= NULL;
- b.data[i].ptr[0].p= NULL;
- b.data[i].ptr[1].p= NULL;
- b.data[i].ptr[2].p= NULL;
- }
-
- // initialize BufItem
- b.theHandle = aHandle;
- b.back=0;
- b.bufferempty = 1;
- b.overrun=0; // set to -1 to handle first insert
-
DBUG_PRINT("info",("handed out bufferId=%d for eventId=%d",
bufferId, eventId));
@@ -992,16 +983,12 @@
NdbGlobalEventBuffer::real_unprepareAddSubscribeEvent(int bufferId)
{
DBUG_ENTER("NdbGlobalEventBuffer::real_unprepareAddSubscribeEvent");
- BufItem &b = m_buf[bufferId];
+ EventBufItem &b = m_buf[bufferId];
DBUG_PRINT("enter", ("eventId: %d, bufferId: %d", b.gId, bufferId));
b.theHandle = NULL;
b.gId= 0;
- if (b.data) {
- NdbMem_Free((void *)b.data);
- b.data = NULL;
- }
// add_drop_unlock();
DBUG_VOID_RETURN;
}
@@ -1010,7 +997,7 @@
NdbGlobalEventBuffer::real_addSubscribeEvent(int bufferId)
{
DBUG_ENTER("NdbGlobalEventBuffer::real_addSubscribeEvent");
- BufItem &b = m_buf[bufferId];
+ EventBufItem &b = m_buf[bufferId];
b.active= 1;
b.theHandle->addBufferId(bufferId);
@@ -1045,11 +1032,8 @@
DBUG_ENTER("NdbGlobalEventBuffer::real_dropSubscribeEvent");
// add_drop_lock(); // only one thread can do add-drop at a time
- BufItem &b = m_buf[bufferId];
+ EventBufItem &b = m_buf[bufferId];
- b.overrun=0;
- b.bufferempty=1;
- b.back=0;
b.theHandle->dropBufferId(bufferId);
real_unprepareAddSubscribeEvent(bufferId); // does add_drop_unlock();
@@ -1081,123 +1065,105 @@
LinearSectionPtr ptr[3])
{
DBUG_ENTER("NdbGlobalEventBuffer::real_insertDataL");
- BufItem &b = m_buf[bufferId];
+ EventBufItem &b = m_buf[bufferId];
if ( b.eventType & (1 << (Uint32)sdata->operation) )
{
- if (b.active) {
-#ifdef EVENT_DEBUG
- ndbout_c("data insertion in buffer %d with eventId %d", bufferId, b.gId);
-#endif
- // move front forward
+ if (b.active && b.theHandle) {
+ DBUG_PRINT("info", ("data insertion in buffer %d with eventId %d",
+ bufferId, b.gId));
+ EventBufItem::Data *data;
+ if ((data= m_free_data))
+ {
+ // remove data from free list
+ m_free_data= m_free_data->m_next;
+ // add it to received data
+ data->m_next= 0;
+ if (m_last_received_data)
+ {
+ m_last_received_data->m_next= data;
+ }
+ else
+ {
+ m_received_data= data;
+ }
+ m_last_received_data= data;
+ m_received_data_count++;
+ }
+ else
+ {
+ DBUG_RETURN(-1); // TODO handle this, overrun, or, skip?
+ }
+
if (copy_data_alloc(sdata, ptr,
- b.data[b.front].sdata, b.data[b.front].ptr))
+ data->sdata, data->ptr))
{
DBUG_RETURN(-1);
}
- if (b.theHandle) { // active subscriber
- if (b.front == b.back) { // next-to-read == written
- if (b.bufferempty == 0) {
- b.overrun++; // another item has been overwritten
- b.back++; // move next-to-read next since old item was overwritten
- if (b.back == b.max_sz) b.back= 0; // start from beginning
- }
- }
- b.bufferempty = 0;
- // signal subscriber that there's more to get
- NdbCondition_Signal(b.theHandle->p_cond);
- }
- b.front++; // move next-to-write
- if (b.front == b.max_sz) b.front = 0; // start from beginning
-#ifdef EVENT_DEBUG
- ndbout_c("Front= %d Back = %d overun = %d", b.front,
- b.back, b.overrun);
-#endif
+ data->m_event_op= b.m_event_op;
+ // signal subscriber that there's more to get
+ NdbCondition_Signal(b.theHandle->p_cond);
+ DBUG_RETURN(0);
} else {
-#ifdef EVENT_DEBUG
- ndbout_c("Data arrived before ready eventId", b.gId);
-#endif
+ DBUG_PRINT("info",("Data arrived before ready eventId", b.gId));
+ DBUG_RETURN(0);
}
}
else
{
-#ifdef EVENT_DEBUG
- ndbout_c("skipped");
-#endif
- }
-
- DBUG_RETURN(0);
-}
-
-int NdbGlobalEventBuffer::hasData(int bufferId) {
- DBUG_ENTER("NdbGlobalEventBuffer::hasData");
- BufItem &b = m_buf[bufferId];
-
- if(b.bufferempty)
- {
+ DBUG_PRINT("info",("skipped"));
DBUG_RETURN(0);
}
-
- if (b.front <= b.back)
- {
- DBUG_RETURN(b.max_sz-b.back + b.front);
- }
- else
- {
- DBUG_RETURN(b.front-b.back);
- }
}
-int NdbGlobalEventBufferHandle::hasData(int bufferId)
-{
- return ndbGlobalEventBuffer->hasData(bufferId);
-}
-NdbEventOperationImpl *NdbGlobalEventBufferHandle::getNdbEventOperationImpl(int bufferId)
-{
- return ndbGlobalEventBuffer->getNdbEventOperationImpl(bufferId);
-}
-NdbEventOperationImpl *NdbGlobalEventBuffer::getNdbEventOperationImpl(int bufferId)
-{
- return m_buf[bufferId].m_event_op;
-}
-
-int NdbGlobalEventBuffer::real_getDataL(const int bufferId,
- SubTableData * &sdata,
- LinearSectionPtr ptr[3],
- int *pOverrun)
+int NdbGlobalEventBuffer::real_getDataL(const int bufferId)
{
DBUG_ENTER("NdbGlobalEventBuffer::real_getDataL");
- BufItem &b = m_buf[bufferId];
-
- if (pOverrun) {
- *pOverrun = b.overrun;
- b.overrun = 0; // if pOverrun is returned to user reset e.overrun
- }
-
- if (b.bufferempty)
- {
- DBUG_RETURN(0); // nothing to get
- }
+ int r= 0;
- DBUG_PRINT("info",("bufferId: %d, b.back: %d", bufferId, b.back));
-
- if (copy_data_alloc(b.data[b.back].sdata, b.data[b.back].ptr,
- sdata, ptr))
+ // ToDo should start at bufferid dependant pos for optimization
+ EventBufItem::Data *prev= 0;
+ for(EventBufItem::Data *data= m_available_data;
+ data; data= data->m_next)
{
- DBUG_RETURN(-1);
- }
+ DBUG_PRINT("info",("bufferId: %d", bufferId));
- b.back++; if (b.back == b.max_sz) b.back= 0; // move next-to-read forward
+ if (bufferId == data->m_event_op->m_bufferId)
+ {
+ // set NdbEventOperation data
+ data->m_event_op->m_data_item= data;
- if (b.front == b.back) // back has cought up with front
- b.bufferempty = 1;
+ // remove item from m_available_data
+ if (prev)
+ {
+ prev->m_next= data->m_next;
+ if (prev->m_next == 0) // the last was removed
+ m_last_available_data= prev;
+ }
+ else
+ {
+ m_available_data= data->m_next;
+ if (m_available_data == 0)
+ m_last_available_data= 0;
+ }
+ m_available_data_count--;
-#ifdef EVENT_DEBUG
- ndbout_c("getting data from buffer %d with eventId %d", bufferId, b.gId);
-#endif
+ // add it to used list
+ if (m_last_used_data == 0)
+ m_last_used_data= data;
+ data->m_next= m_used_data;
+ m_used_data= data;
- DBUG_RETURN(hasData(bufferId)+1);
+ r= m_available_data_count+1;
+ break;
+ }
+ // keep old data item to be able to "link out"
+ prev= data;
+ }
+ DBUG_PRINT("exit",("return: %u", r));
+ DBUG_RETURN(r);
}
+
int
NdbGlobalEventBuffer::copy_data_alloc(const SubTableData * const f_sdata,
LinearSectionPtr f_ptr[3],
@@ -1228,6 +1194,49 @@
t_p.p= NULL;
t_p.sz= 0;
}
+ }
+ DBUG_RETURN(0);
+}
+
+NdbEventOperationImpl *
+NdbGlobalEventBuffer::move_data()
+{
+ DBUG_ENTER("NdbGlobalEventBuffer::move_data");
+ // handle received data
+ if (m_received_data)
+ {
+ // move this list to last in m_available_data
+ if (m_last_available_data)
+ {
+ m_last_available_data->m_next= m_received_data;
+ }
+ else
+ {
+ m_available_data= m_received_data;
+ }
+ m_last_available_data= m_last_received_data;
+ m_available_data_count+= m_received_data_count;
+
+ // all received data moved to available data
+ m_received_data= 0;
+ m_last_received_data= 0;
+ m_received_data_count= 0;
+ }
+ // handle used data
+ if (m_used_data)
+ {
+ // return m_used_data to m_free_data
+ m_last_used_data->m_next= m_free_data;
+ m_free_data= m_used_data;
+
+ // all m_used_data returned to m_free_data
+ m_used_data= 0;
+ m_last_used_data= 0;
+ }
+ if (m_available_data)
+ {
+ DBUG_PRINT("exit",("m_available_data_count %u", m_available_data_count));
+ DBUG_RETURN(m_available_data->m_event_op);
}
DBUG_RETURN(0);
}
--- 1.7/ndb/src/ndbapi/NdbEventOperationImpl.hpp 2005-03-14 13:56:42 +01:00
+++ 1.8/ndb/src/ndbapi/NdbEventOperationImpl.hpp 2005-03-15 11:00:09 +01:00
@@ -22,12 +22,27 @@
#include <transporter/TransporterDefinitions.hpp>
class NdbGlobalEventBufferHandle;
+class NdbEventOperationImpl;
+struct EventBufItem {
+ Uint32 gId;
+ int active;
+ Uint32 eventType;
+ struct Data {
+ SubTableData *sdata;
+ LinearSectionPtr ptr[3];
+
+ NdbEventOperationImpl *m_event_op;
+ Data *m_next;
+ };
+ NdbGlobalEventBufferHandle *theHandle;
+ NdbEventOperationImpl *m_event_op;
+};
+
class NdbEventOperationImpl : public NdbEventOperation {
public:
NdbEventOperationImpl(NdbEventOperation &N,
Ndb *theNdb,
- const char* eventName,
- const int bufferLength);
+ const char* eventName);
~NdbEventOperationImpl();
NdbEventOperation::State getState();
@@ -36,25 +51,17 @@
int stop();
NdbRecAttr *getValue(const char *colName, char *aValue, int n);
NdbRecAttr *getValue(const NdbColumnImpl *, char *aValue, int n);
- static int wait(void *p, int aMillisecondNumber);
- int next(int *pOverrun);
+ int next();
bool isConsistent();
Uint32 getGCI();
Uint32 getLatestGCI();
NdbDictionary::Event::TableEvent getEventType();
- /*
- getOperation();
- getGCI();
- getLogType();
- */
-
void print();
void printAll();
NdbEventOperation *m_facade;
- int m_overrun;
const NdbError & getNdbError() const;
NdbError m_error;
@@ -71,9 +78,9 @@
NdbEventOperation::State m_state;
Uint32 m_eventId;
int m_bufferId;
- int m_bufferL;
- SubTableData *sdata;
- LinearSectionPtr ptr[3];
+ Uint32 m_oid;
+
+ EventBufItem::Data *m_data_item;
};
class NdbGlobalEventBuffer;
@@ -96,10 +103,7 @@
int prepareDropSubscribeEvent(int bufferId);
void dropSubscribeEvent(int bufferId);
- int getDataL(const int bufferId,
- SubTableData * &sdata,
- LinearSectionPtr ptr[3],
- int *pOverrun);
+ int getDataL(const int bufferId);
static int insertDataL(int bufferId,
const SubTableData * const sdata,
LinearSectionPtr ptr[3]);
@@ -107,13 +111,9 @@
Uint32 getLatestGCI();
static Uint32 getEventId(int bufferId);
- NdbEventOperationImpl *getNdbEventOperationImpl(int bufferId);
- int hasData(int bufferId);
- int maxBufferId() { return m_nids-1; };
+ int wait(int aMillisecondNumber, NdbEventOperation **p_ev_op= 0);
+ NdbEventOperation *next(int *error);
- int wait(int aMillisecondNumber);
-
- int m_bufferL;
private:
friend class NdbGlobalEventBuffer;
void addBufferId(int bufferId);
@@ -131,6 +131,8 @@
void add_drop_lock();
void add_drop_unlock();
+ NdbEventOperationImpl *move_data();
+
NdbGlobalEventBuffer();
~NdbGlobalEventBuffer();
@@ -143,16 +145,11 @@
void real_unprepareAddSubscribeEvent(int bufferId);
void real_addSubscribeEvent(int bufferId);
- NdbEventOperationImpl *getNdbEventOperationImpl(int bufferId);
-
void real_unprepareDropSubscribeEvent(int bufferId);
int real_prepareDropSubscribeEvent(int bufferId);
void real_dropSubscribeEvent(int bufferId);
- int real_getDataL(const int bufferId,
- SubTableData * &sdata,
- LinearSectionPtr ptr[3],
- int *pOverrun);
+ int real_getDataL(const int bufferId);
int real_insertDataL(int bufferId,
const SubTableData * const sdata,
LinearSectionPtr ptr[3]);
@@ -163,8 +160,6 @@
SubTableData * &t_sdata,
LinearSectionPtr t_ptr[3]);
- int hasData(int bufferId);
-
Vector<NdbGlobalEventBufferHandle*> m_handlers;
// Global Mutex used for some things
@@ -174,25 +169,24 @@
int m_no;
int m_max;
- struct BufItem {
- Uint32 gId;
- int active;
- Uint32 eventType;
- struct Data {
- SubTableData *sdata;
- LinearSectionPtr ptr[3];
- } * data;
-
- int back;
- int overrun;
- int bufferempty;
- int front;
- int sz;
- int max_sz;
+ EventBufItem *m_buf;
- NdbGlobalEventBufferHandle *theHandle;
- NdbEventOperationImpl *m_event_op;
- };
- BufItem *m_buf;
+ // receive thread
+ EventBufItem::Data *m_received_data;
+ EventBufItem::Data *m_last_received_data;
+ unsigned m_received_data_count;
+
+ EventBufItem::Data *m_free_data;
+
+ // user thread
+ EventBufItem::Data *m_available_data;
+ EventBufItem::Data *m_last_available_data;
+ unsigned m_available_data_count;
+
+ EventBufItem::Data *m_used_data;
+ EventBufItem::Data *m_last_used_data;
+
+ // all allocated data
+ EventBufItem::Data *m_allocated_data;
};
#endif
--- 1.28/ndb/src/ndbapi/Ndbif.cpp 2005-03-14 13:56:43 +01:00
+++ 1.29/ndb/src/ndbapi/Ndbif.cpp 2005-03-15 11:00:09 +01:00
@@ -685,16 +685,18 @@
case GSN_SUB_REMOVE_CONF:
case GSN_SUB_REMOVE_REF:
break; // ignore these signals
- case GSN_SUB_GCP_COMPLETE_REP:
case GSN_SUB_START_CONF:
case GSN_SUB_START_REF:
- case GSN_SUB_TABLE_DATA:
case GSN_SUB_STOP_CONF:
case GSN_SUB_STOP_REF:
NdbDictInterface::execSignal(&theDictionary->m_receiver,
aSignal, ptr);
break;
-
+ case GSN_SUB_GCP_COMPLETE_REP:
+ case GSN_SUB_TABLE_DATA:
+ NdbDictInterface::execSignal(&theDictionary->m_receiver,
+ aSignal, ptr);
+ break;
case GSN_DIHNDBTAMPER:
{
tFirstDataPtr = int2void(tFirstData);
--- 1.2/ndb/test/ndbapi/test_event_multi_table.cpp 2005-03-10 20:26:46 +01:00
+++ 1.3/ndb/test/ndbapi/test_event_multi_table.cpp 2005-03-15 11:00:09 +01:00
@@ -65,14 +65,14 @@
while (1)
{
int res= ndb->pollEvents(1000); // wait for event or 1000 ms
- DBUG_PRINT("info", ("pollEvents res=%d", r));
+ DBUG_PRINT("info", ("pollEvents res=%d", res));
if (res <= 0)
{
break;
}
int error= 0;
- for (NdbEventOperation *pOp= ndb->nextEvent(&error);
- pOp; pOp= ndb->nextEvent(&error) )
+ NdbEventOperation *pOp;
+ while ((pOp= ndb->nextEvent(&error)))
{
char buf[1024];
sprintf(buf, "%s_SHADOW", pOp->getTable()->getName());
| Thread | | |
|---|---|---|
| • bk commit into 5.1 tree (tomas:1.1803) | tomas | 15 Mar |