Below is the list of changes that have just been committed into a local
5.1 repository of tomas. When tomas does a push these changes will
be propagated to the main repository and, within 24 hours after the
push, to the public repository.
For information on how to access the public repository
see http://dev.mysql.com/doc/mysql/en/installing-source-tree.html
ChangeSet
1.1897 05/05/31 02:55:19 tomas@stripped +10 -0
changed event api somewhat
- removed NdbEventOperation::next()
- replaced with Ndb::nextEvent()
adapted to change in event interface
+ one bugfix w.r.t. dropping a table on which there is no event
cleaned up internal implementation as a result
storage/ndb/test/ndbapi/test_event.cpp
1.16 05/05/31 02:55:03 tomas@stripped +6 -13
adapted to changes in event api
storage/ndb/src/ndbapi/NdbEventOperationImpl.hpp
1.30 05/05/31 02:55:03 tomas@stripped +17 -22
changed event api somewhat
- removed NdbEventOperation::next()
- replaced with Ndb::nextEvent()
cleaned up internal implementation as a result
storage/ndb/src/ndbapi/NdbEventOperationImpl.cpp
1.60 05/05/31 02:55:02 tomas@stripped +166 -193
changed event api somewhat
- removed NdbEventOperation::next()
- replaced with Ndb::nextEvent()
cleaned up internal implementation as a result
storage/ndb/src/ndbapi/NdbEventOperation.cpp
1.17 05/05/31 02:55:02 tomas@stripped +1 -7
changed event api somewhat
- removed NdbEventOperation::next()
- replaced with Ndb::nextEvent()
storage/ndb/src/ndbapi/Ndb.cpp
1.61 05/05/31 02:55:02 tomas@stripped +3 -3
changed event api somewhat
- removed NdbEventOperation::next()
- replaced with Ndb::nextEvent()
storage/ndb/ndbapi-examples/ndbapi_event_example/ndbapi_event.cpp
1.15 05/05/31 02:55:01 tomas@stripped +2 -7
adapted to changes in event api
storage/ndb/include/ndbapi/NdbEventOperation.hpp
1.26 05/05/31 02:55:01 tomas@stripped +9 -28
changed event api somewhat
- removed NdbEventOperation::next()
- replaced with Ndb::nextEvent()
storage/ndb/include/ndbapi/Ndb.hpp
1.50 05/05/31 02:55:01 tomas@stripped +5 -4
changed event api somewhat
- removed NdbEventOperation::next()
- replaced with Ndb::nextEvent()
storage/ndb/include/kernel/signaldata/SumaImpl.hpp
1.13 05/05/31 02:55:00 tomas@stripped +1 -6
removed the "is consistent" flag (GCINOTCONSISTENT) and its accessors
sql/ha_ndbcluster.cc
1.261 05/05/31 02:54:58 tomas@stripped +4 -6
adapted to change in event interface
+ one bugfix w.r.t. dropping a table on which there is no event
# This is a BitKeeper patch. What follows are the unified diffs for the
# set of deltas contained in the patch. The rest of the patch, the part
# that BitKeeper cares about, is below these diffs.
# User: tomas
# Host: poseidon.ndb.mysql.com
# Root: /home/tomas/mysql-5.1-wl2325
--- 1.14/storage/ndb/ndbapi-examples/ndbapi_event_example/ndbapi_event.cpp 2005-04-08 02:43:54 +02:00
+++ 1.15/storage/ndb/ndbapi-examples/ndbapi_event_example/ndbapi_event.cpp 2005-05-31 02:55:01 +02:00
@@ -29,6 +29,7 @@
* createEventOperation()
* dropEventOperation()
* pollEvents()
+ * nextEvent()
*
* NdbDictionary
* createEvent()
@@ -43,8 +44,6 @@
* getValue()
* getPreValue()
* execute()
- * next()
- * isConsistent()
* getEventType()
*
*/
@@ -172,11 +171,8 @@
int r= myNdb->pollEvents(1000); // wait for event or 1000 ms
if (r > 0) {
// printf("got data! %d\n", r);
- int overrun;
- while (op->next(&overrun) > 0) {
+ while ((op= myNdb->nextEvent())) {
i++;
- if (!op->isConsistent())
- printf("A node failure has occured and events might be missing\n");
switch (op->getEventType()) {
case NdbDictionary::Event::TE_INSERT:
printf("%u INSERT: ", i);
@@ -190,7 +186,6 @@
default:
abort(); // should not happen
}
- printf("overrun %u pk %u: ", overrun, recAttr[0]->u_32_value());
for (int i = 1; i < noEventColumnName; i++) {
if (recAttr[i]->isNULL() >= 0) { // we have a value
printf(" post[%u]=", i);
--- 1.12/storage/ndb/include/kernel/signaldata/SumaImpl.hpp 2005-05-26 10:38:27 +02:00
+++ 1.13/storage/ndb/include/kernel/signaldata/SumaImpl.hpp 2005-05-31 02:55:00 +02:00
@@ -296,14 +296,9 @@
enum LogType {
SCAN = 1,
LOG = 2,
- REMOVE_FLAGS = 0xff,
- GCINOTCONSISTENT = 0x1 << 16
+ REMOVE_FLAGS = 0xff
};
- void setGCINotConsistent() { logType |= (Uint32)GCINOTCONSISTENT; };
- bool isGCIConsistent()
- { return (logType & (Uint32)GCINOTCONSISTENT) == 0 ? true : false; };
-
Uint32 senderData;
Uint32 gci;
Uint32 tableId;
--- 1.49/storage/ndb/include/ndbapi/Ndb.hpp 2005-05-26 16:22:16 +02:00
+++ 1.50/storage/ndb/include/ndbapi/Ndb.hpp 2005-05-31 02:55:01 +02:00
@@ -1234,17 +1234,18 @@
* @param aMillisecondNumber
* maximum time to wait
*
- * @return the number of events that has occured, -1 on failure
+ * @return > 0 if events available, 0 if no events available, < 0 on failure
*/
int pollEvents(int aMillisecondNumber);
-#ifndef DOXYGEN_SHOULD_SKIP_INTERNAL
/**
- * Returns an event that has data after a pollEvents
+ * Returns an event operation that has data after a pollEvents
*
* @return an event operations that has data, NULL if no events left with data.
*/
- NdbEventOperation *nextEvent(int *error, Uint64 maxGCI= ~(Uint64)0);
+ NdbEventOperation *nextEvent(Uint64 maxGCI= ~(Uint64)0);
+
+#ifndef DOXYGEN_SHOULD_SKIP_INTERNAL
NdbEventOperation *getEventOperation(NdbEventOperation* eventOp= 0);
void setWaitGCI(Uint64 gci);
Uint64 getLatestGCI();
--- 1.25/storage/ndb/include/ndbapi/NdbEventOperation.hpp 2005-05-18 01:43:32 +02:00
+++ 1.26/storage/ndb/include/ndbapi/NdbEventOperation.hpp 2005-05-31 02:55:01 +02:00
@@ -33,7 +33,7 @@
* - To listen to events, an NdbEventOperation object is instantiated by
* Ndb::createEventOperation()
* - execute() starts the event flow. Use Ndb::pollEvents() to wait
- * for an event to occur. Use next() to iterate
+ * for an event to occur. Use Ndb::nextEvent() to iterate
* through the events that have occured.
* - The instance is removed by Ndb::dropEventOperation()
*
@@ -56,9 +56,9 @@
* - Today all events INSERT/DELETE/UPDATE and all changed attributes are
* sent to the API, even if only specific attributes have been specified.
* These are however hidden from the user and only relevant data is shown
- * after next().
+ * after Ndb::nextEvent().
* - "False" exits from Ndb::pollEvents() may occur and thus
- * the subsequent next() will return zero,
+ * the subsequent Ndb::nextEvent() will return NULL,
* since there was no available data. Just do Ndb::pollEvents() again.
* - Event code does not check table schema version. Make sure to drop events
* after table is dropped. Will be fixed in later
@@ -96,18 +96,14 @@
/**
* Activates the NdbEventOperation to start receiving events. The
- * changed attribute values may be retrieved after next() has returned
- * a value greater than zero. The getValue() methods must be called
+ * changed attribute values may be retrieved after Ndb::nextEvent()
+ * has returned not NULL. The getValue() methods must be called
* prior to execute().
*
* @return 0 if successful otherwise -1.
*/
int execute();
- // about the event operation
- // getting data
- // NdbResultSet* getResultSet();
-
/**
* Defines a retrieval operation of an attribute value.
* The NDB API allocate memory for the NdbRecAttr object that
@@ -130,8 +126,8 @@
* the database! The NdbRecAttr object returned by this method
* is <em>not</em> readable/printable before the
* execute() has been made and
- * next() has returned a value greater than
- * zero. If a specific attribute has not changed the corresponding
+ * Ndb::nextEvent() has returned not NULL.
+ * If a specific attribute has not changed the corresponding
* NdbRecAttr will be in state UNDEFINED. This is checked by
* NdbRecAttr::isNULL() which then returns -1.
*
@@ -150,22 +146,6 @@
*/
NdbRecAttr *getPreValue(const char *anAttrName, char *aValue = 0);
- /**
- * Retrieves event resultset if available, inserted into the NdbRecAttrs
- * specified in getValue() and getPreValue(). To avoid polling for
- * a resultset, one can use Ndb::pollEvents()
- * which will wait on a mutex until an event occurs or the specified
- * timeout occurs.
- *
- * @return >=0 if successful otherwise -1. Return value indicates number
- * of available events. By sending pOverRun one may query for buffer
- * overflow and *pOverRun will indicate the number of events that have
- * overwritten.
- *
- * @return number of available events, -1 on failure
- */
- int next(int *pOverRun=0,Uint64 maxGCI=~(Uint64)0);
-
int isOverrun() const;
/**
@@ -177,7 +157,8 @@
/**
* Query for occured event type.
*
- * @note Only valid after next() has been called and returned value >= 0
+ * @note Only valid after Ndb::nextEvent() has been called and
+ * returned a not NULL value
*
* @return type of event
*/
--- 1.60/storage/ndb/src/ndbapi/Ndb.cpp 2005-05-18 01:43:32 +02:00
+++ 1.61/storage/ndb/src/ndbapi/Ndb.cpp 2005-05-31 02:55:02 +02:00
@@ -1272,12 +1272,12 @@
int
Ndb::pollEvents(int aMillisecondNumber)
{
- return theEventBuffer->wait(aMillisecondNumber);
+ return theEventBuffer->pollEvents(aMillisecondNumber);
}
-NdbEventOperation *Ndb::nextEvent(int *error, Uint64 maxGCI)
+NdbEventOperation *Ndb::nextEvent(Uint64 maxGCI)
{
- return theEventBuffer->next(error, maxGCI);
+ return theEventBuffer->nextEvent(maxGCI);
}
void Ndb::setWaitGCI(Uint64 gci)
--- 1.16/storage/ndb/src/ndbapi/NdbEventOperation.cpp 2005-04-27 23:32:59 +02:00
+++ 1.17/storage/ndb/src/ndbapi/NdbEventOperation.cpp 2005-05-31 02:55:02 +02:00
@@ -57,12 +57,6 @@
}
int
-NdbEventOperation::next(int *pOverrun, Uint64 maxGCI)
-{
- return m_impl.next(maxGCI);
-}
-
-int
NdbEventOperation::isOverrun() const
{
return 0; // ToDo
@@ -71,7 +65,7 @@
bool
NdbEventOperation::isConsistent() const
{
- return m_impl.isConsistent();
+ return true;
}
void
--- 1.59/storage/ndb/src/ndbapi/NdbEventOperationImpl.cpp 2005-05-30 17:11:35 +02:00
+++ 1.60/storage/ndb/src/ndbapi/NdbEventOperationImpl.cpp 2005-05-31 02:55:02 +02:00
@@ -316,12 +316,6 @@
DBUG_RETURN(r);
}
-bool
-NdbEventOperationImpl::isConsistent()
-{
- return m_data_item->sdata->isGCIConsistent();
-}
-
Uint64
NdbEventOperationImpl::getGCI()
{
@@ -335,150 +329,135 @@
}
int
-NdbEventOperationImpl::next(Uint64 maxGCI)
+NdbEventOperationImpl::receive_event()
{
- DBUG_ENTER("NdbEventOperationImpl::next");
- int nr = 10000; // a high value
-
- do
- {
- int r= m_event_buffer->getDataL(this, maxGCI);
-
- if (unlikely(r <= 0))
- {
- DBUG_RETURN(r); // no data
- }
+ DBUG_ENTER("NdbEventOperationImpl::receive_event");
- Uint32 operation= (Uint32)m_data_item->sdata->operation;
- DBUG_PRINT("info",("sdata->operation %u",operation));
+ Uint32 operation= (Uint32)m_data_item->sdata->operation;
+ DBUG_PRINT("info",("sdata->operation %u",operation));
- if (unlikely(operation >= NdbDictionary::Event::_TE_FIRST_NON_DATA_EVENT))
- {
- DBUG_RETURN(r);
- }
+ if (unlikely(operation >= NdbDictionary::Event::_TE_FIRST_NON_DATA_EVENT))
+ {
+ DBUG_RETURN(1);
+ }
- // now move the data into the RecAttrs
+ // now move the data into the RecAttrs
- int is_update= operation == NdbDictionary::Event::_TE_UPDATE;
+ int is_update= operation == NdbDictionary::Event::_TE_UPDATE;
- Uint32 *aAttrPtr = m_data_item->ptr[0].p;
- Uint32 *aAttrEndPtr = aAttrPtr + m_data_item->ptr[0].sz;
- Uint32 *aDataPtr = m_data_item->ptr[1].p;
+ Uint32 *aAttrPtr = m_data_item->ptr[0].p;
+ Uint32 *aAttrEndPtr = aAttrPtr + m_data_item->ptr[0].sz;
+ Uint32 *aDataPtr = m_data_item->ptr[1].p;
- DBUG_DUMP("after",(char*)m_data_item->ptr[1].p, m_data_item->ptr[1].sz*4);
- DBUG_DUMP("before",(char*)m_data_item->ptr[2].p, m_data_item->ptr[2].sz*4);
+ DBUG_DUMP("after",(char*)m_data_item->ptr[1].p, m_data_item->ptr[1].sz*4);
+ DBUG_DUMP("before",(char*)m_data_item->ptr[2].p, m_data_item->ptr[2].sz*4);
- // copy data into the RecAttr's
- // we assume that the respective attribute lists are sorted
+ // copy data into the RecAttr's
+ // we assume that the respective attribute lists are sorted
- // first the pk's
- {
- NdbRecAttr *tAttr= theFirstPkAttrs[0];
- NdbRecAttr *tAttr1= theFirstPkAttrs[1];
- while(tAttr)
+ // first the pk's
+ {
+ NdbRecAttr *tAttr= theFirstPkAttrs[0];
+ NdbRecAttr *tAttr1= theFirstPkAttrs[1];
+ while(tAttr)
+ {
+ assert(aAttrPtr < aAttrEndPtr);
+ unsigned tDataSz= AttributeHeader(*aAttrPtr).getDataSize();
+ assert(tAttr->attrId() ==
+ AttributeHeader(*aAttrPtr).getAttributeId());
+ receive_data(tAttr, aDataPtr, tDataSz);
+ if (is_update)
{
- assert(aAttrPtr < aAttrEndPtr);
- unsigned tDataSz= AttributeHeader(*aAttrPtr).getDataSize();
- assert(tAttr->attrId() ==
- AttributeHeader(*aAttrPtr).getAttributeId());
- receive_data(tAttr, aDataPtr, tDataSz);
- if (is_update)
- {
- receive_data(tAttr1, aDataPtr, tDataSz);
- tAttr1= tAttr1->next();
- }
- // next
- aAttrPtr++;
- aDataPtr+= tDataSz;
- tAttr= tAttr->next();
+ receive_data(tAttr1, aDataPtr, tDataSz);
+ tAttr1= tAttr1->next();
}
+ // next
+ aAttrPtr++;
+ aDataPtr+= tDataSz;
+ tAttr= tAttr->next();
}
-
- NdbRecAttr *tWorkingRecAttr = theFirstDataAttrs[0];
-
- Uint32 tRecAttrId;
- Uint32 tAttrId;
- Uint32 tDataSz;
- int hasSomeData=0;
- while ((aAttrPtr < aAttrEndPtr) && (tWorkingRecAttr != NULL)) {
- tRecAttrId = tWorkingRecAttr->attrId();
- tAttrId = AttributeHeader(*aAttrPtr).getAttributeId();
- tDataSz = AttributeHeader(*aAttrPtr).getDataSize();
-
- while (tAttrId > tRecAttrId) {
- DBUG_PRINT("info",("undef [%u] %u 0x%x [%u] 0x%x",
- tAttrId, tDataSz, *aDataPtr, tRecAttrId, aDataPtr));
- tWorkingRecAttr->setUNDEFINED();
- tWorkingRecAttr = tWorkingRecAttr->next();
- if (tWorkingRecAttr == NULL)
- break;
- tRecAttrId = tWorkingRecAttr->attrId();
- }
+ }
+
+ NdbRecAttr *tWorkingRecAttr = theFirstDataAttrs[0];
+
+ Uint32 tRecAttrId;
+ Uint32 tAttrId;
+ Uint32 tDataSz;
+ int hasSomeData=0;
+ while ((aAttrPtr < aAttrEndPtr) && (tWorkingRecAttr != NULL)) {
+ tRecAttrId = tWorkingRecAttr->attrId();
+ tAttrId = AttributeHeader(*aAttrPtr).getAttributeId();
+ tDataSz = AttributeHeader(*aAttrPtr).getDataSize();
+
+ while (tAttrId > tRecAttrId) {
+ DBUG_PRINT("info",("undef [%u] %u 0x%x [%u] 0x%x",
+ tAttrId, tDataSz, *aDataPtr, tRecAttrId, aDataPtr));
+ tWorkingRecAttr->setUNDEFINED();
+ tWorkingRecAttr = tWorkingRecAttr->next();
if (tWorkingRecAttr == NULL)
break;
+ tRecAttrId = tWorkingRecAttr->attrId();
+ }
+ if (tWorkingRecAttr == NULL)
+ break;
+
+ if (tAttrId == tRecAttrId) {
+ hasSomeData++;
- if (tAttrId == tRecAttrId) {
- hasSomeData++;
-
- DBUG_PRINT("info",("set [%u] %u 0x%x [%u] 0x%x",
- tAttrId, tDataSz, *aDataPtr, tRecAttrId, aDataPtr));
+ DBUG_PRINT("info",("set [%u] %u 0x%x [%u] 0x%x",
+ tAttrId, tDataSz, *aDataPtr, tRecAttrId, aDataPtr));
- receive_data(tWorkingRecAttr, aDataPtr, tDataSz);
- tWorkingRecAttr = tWorkingRecAttr->next();
- }
- aAttrPtr++;
- aDataPtr += tDataSz;
+ receive_data(tWorkingRecAttr, aDataPtr, tDataSz);
+ tWorkingRecAttr = tWorkingRecAttr->next();
}
+ aAttrPtr++;
+ aDataPtr += tDataSz;
+ }
- while (tWorkingRecAttr != NULL) {
- tRecAttrId = tWorkingRecAttr->attrId();
- //printf("set undefined [%u] %u %u [%u]\n",
- // tAttrId, tDataSz, *aDataPtr, tRecAttrId);
+ while (tWorkingRecAttr != NULL) {
+ tRecAttrId = tWorkingRecAttr->attrId();
+ //printf("set undefined [%u] %u %u [%u]\n",
+ // tAttrId, tDataSz, *aDataPtr, tRecAttrId);
+ tWorkingRecAttr->setUNDEFINED();
+ tWorkingRecAttr = tWorkingRecAttr->next();
+ }
+
+ tWorkingRecAttr = theFirstDataAttrs[1];
+ aDataPtr = m_data_item->ptr[2].p;
+ Uint32 *aDataEndPtr = aDataPtr + m_data_item->ptr[2].sz;
+ while ((aDataPtr < aDataEndPtr) && (tWorkingRecAttr != NULL)) {
+ tRecAttrId = tWorkingRecAttr->attrId();
+ tAttrId = AttributeHeader(*aDataPtr).getAttributeId();
+ tDataSz = AttributeHeader(*aDataPtr).getDataSize();
+ aDataPtr++;
+ while (tAttrId > tRecAttrId) {
tWorkingRecAttr->setUNDEFINED();
tWorkingRecAttr = tWorkingRecAttr->next();
- }
-
- tWorkingRecAttr = theFirstDataAttrs[1];
- aDataPtr = m_data_item->ptr[2].p;
- Uint32 *aDataEndPtr = aDataPtr + m_data_item->ptr[2].sz;
- while ((aDataPtr < aDataEndPtr) && (tWorkingRecAttr != NULL)) {
- tRecAttrId = tWorkingRecAttr->attrId();
- tAttrId = AttributeHeader(*aDataPtr).getAttributeId();
- tDataSz = AttributeHeader(*aDataPtr).getDataSize();
- aDataPtr++;
- while (tAttrId > tRecAttrId) {
- tWorkingRecAttr->setUNDEFINED();
- tWorkingRecAttr = tWorkingRecAttr->next();
- if (tWorkingRecAttr == NULL)
- break;
- tRecAttrId = tWorkingRecAttr->attrId();
- }
if (tWorkingRecAttr == NULL)
break;
- if (tAttrId == tRecAttrId) {
- assert(!m_eventImpl->m_tableImpl->getColumn(tRecAttrId)->getPrimaryKey());
- hasSomeData++;
-
- receive_data(tWorkingRecAttr, aDataPtr, tDataSz);
- tWorkingRecAttr = tWorkingRecAttr->next();
- }
- aDataPtr += tDataSz;
+ tRecAttrId = tWorkingRecAttr->attrId();
}
- while (tWorkingRecAttr != NULL) {
- tWorkingRecAttr->setUNDEFINED();
+ if (tWorkingRecAttr == NULL)
+ break;
+ if (tAttrId == tRecAttrId) {
+ assert(!m_eventImpl->m_tableImpl->getColumn(tRecAttrId)->getPrimaryKey());
+ hasSomeData++;
+
+ receive_data(tWorkingRecAttr, aDataPtr, tDataSz);
tWorkingRecAttr = tWorkingRecAttr->next();
}
-
- if (hasSomeData || !is_update)
- {
- DBUG_RETURN(r);
- }
+ aDataPtr += tDataSz;
+ }
+ while (tWorkingRecAttr != NULL) {
+ tWorkingRecAttr->setUNDEFINED();
+ tWorkingRecAttr = tWorkingRecAttr->next();
+ }
+
+ if (hasSomeData || !is_update)
+ {
+ DBUG_RETURN(1);
+ }
- if (r < nr)
- nr = r;
- else
- nr--; // we don't want to be stuck here forever
- } while (nr > 0);
DBUG_RETURN(0);
}
@@ -554,7 +533,9 @@
m_total_used(0),
m_dropped_ev_op(0)
{
+#ifdef VM_TRACE
m_latest_command= "NdbEventBuffer::NdbEventBuffer";
+#endif
if ((p_cond = NdbCondition_Create()) == NULL) {
ndbout_c("NdbEventHandle: NdbCondition_Create() failed");
@@ -574,7 +555,9 @@
// ToDo set event buffer size
// pre allocate event data array
m_sz= 0;
+#ifdef VM_TRACE
m_free_data_count= 0;
+#endif
m_free_data= 0;
m_free_data_sz= 0;
@@ -631,18 +614,20 @@
m_free_data= last_data;
m_sz+= sz;
+#ifdef VM_TRACE
m_free_data_count+= sz;
-
+#endif
return 0;
}
int
-NdbEventBuffer::wait(int aMillisecondNumber,
- NdbEventOperation **p_ev_op)
+NdbEventBuffer::pollEvents(int aMillisecondNumber)
{
int ret= 0;
+#ifdef VM_TRACE
const char *m_latest_command_save= m_latest_command;
- m_latest_command= "NdbEventBuffer::wait";
+ m_latest_command= "NdbEventBuffer::pollEvents";
+#endif
NdbMutex_Lock(m_mutex);
NdbEventOperationImpl *ev_op= move_data();
@@ -653,9 +638,6 @@
}
if (ev_op)
{
- if (p_ev_op)
- *p_ev_op= ev_op->m_facade;
-
// m_mutex is locked
// update event ops data counters
#ifdef VM_TRACE
@@ -663,58 +645,80 @@
ev_op->m_data_done_count= 0;
#endif
- ret= m_available_data.m_count;
+ ret= 1;
}
NdbMutex_Unlock(m_mutex); // we have moved the data
+#ifdef VM_TRACE
m_latest_command= m_latest_command_save;
+#endif
return ret;
}
NdbEventOperation *
-NdbEventBuffer::next(int *error, Uint64 maxGCI)
+NdbEventBuffer::nextEvent(Uint64 maxGCI)
{
-
- DBUG_ENTER("NdbEventBuffer::next");
+ DBUG_ENTER("NdbEventBuffer::nextEvent");
+#ifdef VM_TRACE
const char *m_latest_command_save= m_latest_command;
+#endif
if (m_used_data.m_count > 1024)
{
- m_latest_command= "NdbEventBuffer::next (lock)";
+#ifdef VM_TRACE
+ m_latest_command= "NdbEventBuffer::nextEvent (lock)";
+#endif
NdbMutex_Lock(m_mutex);
// return m_used_data to m_free_data
free_list(m_used_data);
NdbMutex_Unlock(m_mutex);
}
- m_latest_command= "NdbEventBuffer::next";
+#ifdef VM_TRACE
+ m_latest_command= "NdbEventBuffer::nextEvent";
+#endif
- while (m_available_data.m_head)
+ EventBufData *data;
+ while ((data= m_available_data.m_head) &&
+ data->sdata->gci <= maxGCI)
{
- NdbEventOperationImpl *ev_op= m_available_data.m_head->m_event_op;
- int r= ev_op->next(maxGCI);
+ NdbEventOperationImpl *op= data->m_event_op;
+
+ // set NdbEventOperation data
+ op->m_data_item= data;
+
+ // remove item from m_available_data
+ m_available_data.remove_first();
+
+ // add it to used list
+ m_used_data.append(data);
+
+#ifdef VM_TRACE
+ op->m_data_done_count++;
+#endif
+
+ int r= op->receive_event();
if (r > 0)
{
- if (ev_op->m_state == NdbEventOperation::EO_EXECUTING)
+ if (op->m_state == NdbEventOperation::EO_EXECUTING)
{
+#ifdef VM_TRACE
m_latest_command= m_latest_command_save;
- DBUG_RETURN(ev_op->m_facade);
+#endif
+ DBUG_RETURN(op->m_facade);
}
// the next event belonged to an event op that is no
// longer valid, skip to next
continue;
}
- else if (r == 0)
- {
- m_latest_command= m_latest_command_save;
- DBUG_RETURN(0);
- }
- if (error)
- *error= -1;
+#ifdef VM_TRACE
m_latest_command= m_latest_command_save;
- DBUG_RETURN(0);
+#endif
}
+ m_error.code= 0;
+#ifdef VM_TRACE
m_latest_command= m_latest_command_save;
+#endif
DBUG_RETURN(0);
}
@@ -747,7 +751,9 @@
<< " state: " << hex << gci.m_state
<< " head: " << hex << gci.m_data.m_head
<< " tail: " << hex << gci.m_data.m_tail
+#ifdef VM_TRACE
<< " cnt: " << dec << gci.m_data.m_count
+#endif
<< "]";
return out;
}
@@ -1070,6 +1076,7 @@
data= m_free_data;
if (unlikely(data == 0))
{
+#ifdef VM_TRACE
printf("m_latest_command: %s\n", m_latest_command);
printf("no free data, m_latestGCI %lld m_waitGCI %lld\n",
m_latestGCI, m_waitGCI);
@@ -1079,6 +1086,7 @@
m_available_data.m_head ? m_available_data.m_head->sdata->gci : 0,
m_available_data.m_tail ? m_available_data.m_tail->sdata->gci : 0);
printf("m_used_data_count %d\n", m_used_data.m_count);
+#endif
op->m_has_error= 2;
DBUG_RETURN(-1); // TODO handle this, overrun, or, skip?
}
@@ -1086,7 +1094,10 @@
// remove data from free list
m_free_data= data->m_next;
+#ifdef VM_TRACE
m_free_data_count--;
+ assert(m_free_data_sz >= data->sz);
+#endif
m_free_data_sz-= data->sz;
if (unlikely(copy_data_alloc(sdata, ptr, data)))
@@ -1120,47 +1131,6 @@
#endif
}
-int NdbEventBuffer::getDataL(NdbEventOperationImpl *op, Uint64 maxGCI)
-{
- DBUG_ENTER("NdbEventBuffer::getDataL");
- int r= 0;
-
- // ToDo should start at bufferid dependant pos for optimization
- EventBufData *prev= 0;
- for(EventBufData *data= m_available_data.m_head;
- data && (data->sdata->gci <= maxGCI);
- data= data->m_next)
- {
- DBUG_PRINT("info",("eventId: %d", data->m_event_op->m_eventId));
-
- if (op == data->m_event_op)
- {
- // set NdbEventOperation data
- data->m_event_op->m_data_item= data;
-
- // remove item from m_available_data
- m_available_data.remove(data,prev);
-
- // add it to used list
- m_used_data.append(data);
-
- if (!data->sdata->isGCIConsistent())
- op->m_has_error= 5;
-
-#ifdef VM_TRACE
- op->m_data_done_count++;
-#endif
-
- r= m_available_data.m_count+1;
- break;
- }
- // keep old data item to be able to "link out"
- prev= data;
- }
- DBUG_PRINT("exit",("return: %u", r));
- DBUG_RETURN(r);
-}
-
int
NdbEventBuffer::copy_data_alloc(const SubTableData * const f_sdata,
LinearSectionPtr f_ptr[3],
@@ -1185,7 +1155,6 @@
{
NdbMem_Free((char*)sdata);
#ifdef VM_TRACE
- assert(m_free_data_sz >= ev_buf->sz);
assert(m_total_alloc >= ev_buf->sz);
#endif
m_total_alloc-= ev_buf->sz;
@@ -1261,7 +1230,9 @@
if (!m_available_data.is_empty())
{
DBUG_ENTER("NdbEventBuffer::move_data");
+#ifdef VM_TRACE
DBUG_PRINT("exit",("m_available_data_count %u", m_available_data.m_count));
+#endif
DBUG_RETURN(m_available_data.m_head->m_event_op);
}
return 0;
@@ -1272,8 +1243,10 @@
{
// return list to m_free_data
list.m_tail->m_next= m_free_data;
- m_free_data= m_used_data.m_head;
+ m_free_data= list.m_head;
+#ifdef VM_TRACE
m_free_data_count+= list.m_count;
+#endif
m_free_data_sz+= list.m_sz;
// list returned to m_free_data
--- 1.29/storage/ndb/src/ndbapi/NdbEventOperationImpl.hpp 2005-05-30 17:11:35 +02:00
+++ 1.30/storage/ndb/src/ndbapi/NdbEventOperationImpl.hpp 2005-05-31 02:55:03 +02:00
@@ -43,7 +43,7 @@
EventBufData_list();
~EventBufData_list();
- void remove(EventBufData *data, EventBufData *prev);
+ void remove_first();
void append(EventBufData *data);
void append(const EventBufData_list &list);
@@ -56,7 +56,9 @@
inline
EventBufData_list::EventBufData_list()
- : m_head(0), m_tail(0), m_count(0), m_sz(0)
+ : m_head(0), m_tail(0),
+ m_count(0),
+ m_sz(0)
{
}
@@ -73,22 +75,13 @@
}
inline
-void EventBufData_list::remove(EventBufData *data, EventBufData *prev)
+void EventBufData_list::remove_first()
{
- if (likely(prev == 0))
- {
- m_head= data->m_next;
- if (m_head == 0)
- m_tail= 0;
- }
- else
- {
- prev->m_next= data->m_next;
- if (prev->m_next == 0) // the last was removed
- m_tail= prev;
- }
m_count--;
- m_sz-= data->sz;
+ m_sz-= m_head->sz;
+ m_head= m_head->m_next;
+ if (m_head == 0)
+ m_tail= 0;
}
inline
@@ -151,8 +144,7 @@
int stop();
NdbRecAttr *getValue(const char *colName, char *aValue, int n);
NdbRecAttr *getValue(const NdbColumnImpl *, char *aValue, int n);
- int next(Uint64 latestGCI);
- bool isConsistent();
+ int receive_event();
Uint64 getGCI();
Uint64 getLatestGCI();
@@ -229,14 +221,12 @@
void completeClusterFailed();
// used by user thread
- int getDataL(NdbEventOperationImpl *op, Uint64 maxGCI);
Uint64 getLatestGCI();
void setWaitGCI(Uint64 gci);
Uint32 getEventId(int bufferId);
- int wait(int aMillisecondNumber,
- NdbEventOperation **p_ev_op= 0);
- NdbEventOperation *next(int *error, Uint64 maxGCI);
+ int pollEvents(int aMillisecondNumber);
+ NdbEventOperation *nextEvent(Uint64 maxGCI);
NdbEventOperationImpl *move_data();
@@ -252,7 +242,9 @@
// Global Mutex used for some things
static NdbMutex *p_add_drop_mutex;
+#ifdef VM_TRACE
const char *m_latest_command;
+#endif
Ndb *m_ndb;
Uint64 m_latestGCI; // latest "handover" GCI
@@ -265,7 +257,9 @@
// receive thread
Gci_container m_complete_data;
EventBufData *m_free_data;
+#ifdef VM_TRACE
unsigned m_free_data_count;
+#endif
unsigned m_free_data_sz;
// user thread
@@ -275,6 +269,7 @@
unsigned m_total_alloc; // total allocated memory
unsigned m_total_used; // used allocated memory
+ NdbError m_error;
private:
int expand(unsigned sz);
--- 1.15/storage/ndb/test/ndbapi/test_event.cpp 2005-05-24 15:44:27 +02:00
+++ 1.16/storage/ndb/test/ndbapi/test_event.cpp 2005-05-31 02:55:03 +02:00
@@ -201,10 +201,11 @@
if (res > 0) {
//printf("got data! %d\n", r);
- int overrun;
- while (pOp->next(&overrun) > 0) {
+ NdbEventOperation *tmp;
+ while ((tmp= pNdb->nextEvent()))
+ {
+ assert(tmp == pOp);
r++;
- r += overrun;
count++;
Uint32 gci = pOp->getGCI();
@@ -245,7 +246,6 @@
recEvent[pk].count++;
}
- g_info << "overrun " << overrun << " pk " << pk;
for (i = 1; i < noEventColumnName; i++) {
if (recAttr[i]->isNULL() >= 0) { // we have a value
g_info << " post[" << i << "]=";
@@ -570,8 +570,7 @@
ndb->pollEvents(0);
- int error;
- while ((pOp= ndb->nextEvent(&error, curr_gci)) != 0)
+ while ((pOp= ndb->nextEvent(curr_gci)) != 0)
{
assert(pOp == pCreate);
int noRetries= 0;
@@ -977,9 +976,8 @@
{
break;
}
- int error= 0;
NdbEventOperation *pOp;
- while ((pOp= ndb->nextEvent(&error)))
+ while ((pOp= ndb->nextEvent()))
{
char buf[1024];
sprintf(buf, "%s_SHADOW", pOp->getTable()->getName());
@@ -1151,11 +1149,6 @@
NdbSleep_MilliSleep(100); // sleep before retying
} while(1);
} // for
- if (error)
- {
- g_err << "nextEvent()\n";
- DBUG_RETURN(-1);
- }
} // while(1)
g_info << "n_updates: " << n_updates << " "
<< "n_inserts: " << n_inserts << " "
--- 1.260/sql/ha_ndbcluster.cc 2005-05-30 17:11:35 +02:00
+++ 1.261/sql/ha_ndbcluster.cc 2005-05-31 02:54:58 +02:00
@@ -8704,9 +8704,9 @@
ER_GET_ERRMSG, ER(ER_GET_ERRMSG),
dict->getNdbError().code, dict->getNdbError().message,
"NDB");
+ ndb_error= dict->getNdbError();
+ DBUG_RETURN(-1);
}
- ndb_error= dict->getNdbError();
- DBUG_RETURN(-1);
}
DBUG_RETURN(0);
}
@@ -9171,7 +9171,6 @@
arg __attribute__((unused)))
{
THD *thd; /* needs to be first for thread_stack */
- int error= 0;
Ndb *ndb= 0;
injector *inj= injector::instance();
@@ -9339,10 +9338,9 @@
int event_count= 0;
#endif
thd->proc_info= "Processing events";
- error= 0;
NdbEventOperation *pOp;
Binlog_index_row row;
- while ((pOp= ndb->nextEvent(&error, ndb_latest_received_binlog_epoch)) != NULL)
+ while ((pOp= ndb->nextEvent(ndb_latest_received_binlog_epoch)) != NULL)
{
assert(pOp->getGCI() <= ndb_latest_received_binlog_epoch);
if (!apply_status_share)
@@ -9405,7 +9403,7 @@
// a non-data event
ndb_injector_thread_handle_non_data_event(ndb,pOp,row);
}
- } while ((pOp= ndb->nextEvent(&error, gci)) != NULL);
+ } while ((pOp= ndb->nextEvent(gci)) != NULL);
#ifdef RUN_NDB_BINLOG_TIMER
write_timer.stop();
| Thread |
|---|
| • bk commit into 5.1 tree (tomas:1.1897) | tomas | 31 May |