Below is the list of changes that have just been committed into a local
5.0 repository of mikron. When mikron does a push these changes will
be propagated to the main repository and, within 24 hours after the
push, to the public repository.
For information on how to access the public repository
see http://www.mysql.com/doc/I/n/Installing_source_tree.html
ChangeSet
1.1840 05/03/22 02:14:13 mronstrom@stripped +9 -0
WL 2405:
Integrated scan and dictSignal into poll owner handling for
improved performance of single-threaded usage of the NDB API.
Still some things to do.
ndb/src/ndbapi/TransporterFacade.hpp
1.25 05/03/22 02:14:05 mronstrom@stripped +25 -7
New PollGuard class implementation
Changed cond wait queue to use NdbWaiter instead of Ndb object
ndb/src/ndbapi/TransporterFacade.cpp
1.36 05/03/22 02:14:05 mronstrom@stripped +149 -14
New PollGuard class implementation
Changed cond wait queue to use NdbWaiter instead of Ndb object
ndb/src/ndbapi/Ndbinit.cpp
1.31 05/03/22 02:14:05 mronstrom@stripped +2 -0
new variables to initialise in NdbWaiter
ndb/src/ndbapi/Ndbif.cpp
1.30 05/03/22 02:14:04 mronstrom@stripped +51 -86
Moved lots of code to PollGuard class, added some comments
and used PollGuard object
ndb/src/ndbapi/NdbWaiter.hpp
1.2 05/03/22 02:14:04 mronstrom@stripped +19 -2
NdbWaiter class expanded; it is now the object in the cond wait queue,
since the Ndb object is not always around.
Don't signal when we are the poll_owner object.
ndb/src/ndbapi/NdbScanOperation.cpp
1.64 05/03/22 02:14:04 mronstrom@stripped +22 -25
unlock_and_signal is hidden; it is called when poll_guard goes out of scope
ndb/src/ndbapi/NdbDictionaryImpl.cpp
1.66 05/03/22 02:14:04 mronstrom@stripped +11 -20
WL 2405 Better performance for single threaded usage of NDB API:
Introduced a PollGuard class
wait_n_unlock takes care of what wait on NdbWaiter did, plus
unlock, plus poll owner handling
ndb/include/ndbapi/NdbScanOperation.hpp
1.30 05/03/22 02:14:04 mronstrom@stripped +3 -1
WL 2405 Better performance for single threaded usage of NDB API:
Introduced a PollGuard class
ndb/include/ndbapi/Ndb.hpp
1.41 05/03/22 02:14:04 mronstrom@stripped +3 -3
WL 2405 Better performance for single threaded usage of NDB API:
Introduced a PollGuard class
# This is a BitKeeper patch. What follows are the unified diffs for the
# set of deltas contained in the patch. The rest of the patch, the part
# that BitKeeper cares about, is below these diffs.
# User: mronstrom
# Host: c-dc08e253.1238-1-64736c10.cust.bredbandsbolaget.se
# Root: /Users/mikron/wl2405
--- 1.40/ndb/include/ndbapi/Ndb.hpp Fri Feb 4 14:40:48 2005
+++ 1.41/ndb/include/ndbapi/Ndb.hpp Tue Mar 22 02:14:04 2005
@@ -985,6 +985,7 @@
class NdbBlob;
class NdbReceiver;
class TransporterFacade;
+class PollGuard;
typedef void (* NdbEventCallback)(NdbEventOperation*, Ndb*, void*);
@@ -1586,10 +1587,9 @@
Uint32 pollCompleted(NdbTransaction** aCopyArray);
void sendPrepTrans(int forceSend);
void reportCallback(NdbTransaction** aCopyArray, Uint32 aNoOfComplTrans);
- int poll_trans(int milliSecs, int noOfEventsToWaitFor,
- TransporterFacade *tp);
+ int poll_trans(int milliSecs, int noOfEventsToWaitFor, PollGuard *pg);
void waitCompletedTransactions(int milliSecs, int noOfEventsToWaitFor,
- TransporterFacade *tp);
+ PollGuard *pg);
void completedTransaction(NdbTransaction* aTransaction);
void completedScanTransaction(NdbTransaction* aTransaction);
--- 1.29/ndb/include/ndbapi/NdbScanOperation.hpp Mon Jan 10 01:25:08 2005
+++ 1.30/ndb/include/ndbapi/NdbScanOperation.hpp Tue Mar 22 02:14:04 2005
@@ -21,6 +21,7 @@
class NdbBlob;
class NdbResultSet;
+class PollGuard;
/**
* @class NdbScanOperation
@@ -159,7 +160,8 @@
int nextResultImpl(bool fetchAllowed = true, bool forceSend = false);
virtual void release();
- int close_impl(class TransporterFacade*, bool forceSend = false);
+ int close_impl(class TransporterFacade*, bool forceSend,
+ PollGuard *poll_guard);
// Overloaded methods from NdbCursorOperation
int executeCursor(int ProcessorId);
--- 1.65/ndb/src/ndbapi/NdbDictionaryImpl.cpp Thu Jan 27 18:38:14 2005
+++ 1.66/ndb/src/ndbapi/NdbDictionaryImpl.cpp Tue Mar 22 02:14:04 2005
@@ -946,7 +946,8 @@
m_buffer.clear();
// Protected area
- m_transporter->lock_mutex();
+ Uint32 block_no; //TODO fix this
+ PollGuard poll_guard(m_transporter, &m_waiter, block_no);
Uint32 aNodeId;
if (useMasterNodeId) {
if ((m_masterNodeId == 0) ||
@@ -959,7 +960,6 @@
}
if(aNodeId == 0){
m_error.code= 4009;
- m_transporter->unlock_mutex();
DBUG_RETURN(-1);
}
{
@@ -980,21 +980,15 @@
r = m_transporter->sendSignal(signal, aNodeId);
}
if(r != 0){
- m_transporter->unlock_mutex();
continue;
}
}
m_error.code= 0;
-
- m_waiter.m_node = aNodeId;
- m_waiter.m_state = wst;
-
- m_waiter.wait(theWait);
- m_transporter->unlock_mutex();
+ int ret_val= poll_guard.wait_n_unlock(theWait, aNodeId, wst);
// End of Protected area
- if(m_waiter.m_state == NO_WAIT && m_error.code == 0){
+ if(ret_val == 0 && m_error.code == 0){
// Normal return
DBUG_RETURN(0);
}
@@ -1002,7 +996,7 @@
/**
* Handle error codes
*/
- if(m_waiter.m_state == WAIT_NODE_FAILURE)
+ if(ret_val == -2) //WAIT_NODE_FAILURE
continue;
if ( (temporaryMask & m_error.code) != 0 ) {
@@ -3064,26 +3058,23 @@
for (Uint32 i = 0; i < RETRIES; i++) {
m_buffer.clear();
// begin protected
- m_transporter->lock_mutex();
+ Uint32 block_no; //TODO fix this
+ PollGuard poll_guard(m_transporter, &m_waiter, block_no);
Uint16 aNodeId = m_transporter->get_an_alive_node();
if (aNodeId == 0) {
m_error.code= 4009;
- m_transporter->unlock_mutex();
return -1;
}
if (m_transporter->sendSignal(signal, aNodeId) != 0) {
- m_transporter->unlock_mutex();
continue;
}
m_error.code= 0;
- m_waiter.m_node = aNodeId;
- m_waiter.m_state = WAIT_LIST_TABLES_CONF;
- m_waiter.wait(WAITFOR_RESPONSE_TIMEOUT);
- m_transporter->unlock_mutex();
+ int ret_val= poll_guard.wait_n_unlock(WAITFOR_RESPONSE_TIMEOUT,
+ aNodeId, WAIT_LIST_TABLES_CONF);
// end protected
- if (m_waiter.m_state == NO_WAIT && m_error.code == 0)
+ if (ret_val == 0 && m_error.code == 0)
return 0;
- if (m_waiter.m_state == WAIT_NODE_FAILURE)
+ if (ret_val == -2) //WAIT_NODE_FAILURE
continue;
return -1;
}
--- 1.63/ndb/src/ndbapi/NdbScanOperation.cpp Wed Jan 19 18:33:33 2005
+++ 1.64/ndb/src/ndbapi/NdbScanOperation.cpp Tue Mar 22 02:14:04 2005
@@ -452,7 +452,8 @@
Uint32 nodeId = theNdbCon->theDBnode;
TransporterFacade* tp = TransporterFacade::instance();
- Guard guard(tp->theMutexPtr);
+ PollGuard poll_guard(tp, &theNdb->theImpl->theWaiter,
+ theNdb->theNdbBlockNumber);
if(theError.code)
return -1;
@@ -487,10 +488,8 @@
/**
* No completed...
*/
- theNdb->theImpl->theWaiter.m_node = nodeId;
- theNdb->theImpl->theWaiter.m_state = WAIT_SCAN;
- int return_code = theNdb->receiveResponse(WAITFOR_SCAN_TIMEOUT);
- if (return_code == 0 && seq == tp->getNodeSequence(nodeId)) {
+ int ret_code= poll_guard.wait_scan(WAITFOR_SCAN_TIMEOUT, nodeId);
+ if (ret_code == 0 && seq == tp->getNodeSequence(nodeId)) {
continue;
} else {
idx = last;
@@ -647,9 +646,9 @@
m_sent_receivers_count);
TransporterFacade* tp = TransporterFacade::instance();
- Guard guard(tp->theMutexPtr);
- close_impl(tp, forceSend);
-
+ PollGuard poll_guard(tp, &theNdb->theImpl->theWaiter,
+ theNdb->theNdbBlockNumber);
+ close_impl(tp, forceSend, &poll_guard);
} while(0);
theNdbCon->theScanningOp = 0;
@@ -1313,7 +1312,8 @@
if(fetchAllowed){
if(DEBUG_NEXT_RESULT) ndbout_c("performing fetch...");
TransporterFacade* tp = TransporterFacade::instance();
- Guard guard(tp->theMutexPtr);
+ PollGuard poll_guard(tp, &theNdb->theImpl->theWaiter,
+ theNdb->theNdbBlockNumber);
if(theError.code)
return -1;
Uint32 seq = theNdbCon->theNodeSequence;
@@ -1323,10 +1323,8 @@
Uint32 tmp = m_sent_receivers_count;
s_idx = m_current_api_receiver;
while(m_sent_receivers_count > 0 && !theError.code){
- theNdb->theImpl->theWaiter.m_node = nodeId;
- theNdb->theImpl->theWaiter.m_state = WAIT_SCAN;
- int return_code = theNdb->receiveResponse(WAITFOR_SCAN_TIMEOUT);
- if (return_code == 0 && seq == tp->getNodeSequence(nodeId)) {
+ int ret_code= poll_guard.wait_scan(WAITFOR_SCAN_TIMEOUT, nodeId);
+ if (ret_code == 0 && seq == tp->getNodeSequence(nodeId)) {
continue;
}
if(DEBUG_NEXT_RESULT) ndbout_c("return -1");
@@ -1456,7 +1454,9 @@
}
int
-NdbScanOperation::close_impl(TransporterFacade* tp, bool forceSend){
+NdbScanOperation::close_impl(TransporterFacade* tp, bool forceSend,
+ PollGuard *poll_guard)
+{
Uint32 seq = theNdbCon->theNodeSequence;
Uint32 nodeId = theNdbCon->theDBnode;
@@ -1471,9 +1471,7 @@
*/
while(theError.code == 0 && m_sent_receivers_count)
{
- theNdb->theImpl->theWaiter.m_node = nodeId;
- theNdb->theImpl->theWaiter.m_state = WAIT_SCAN;
- int return_code = theNdb->receiveResponse(WAITFOR_SCAN_TIMEOUT);
+ int return_code= poll_guard->wait_scan(WAITFOR_SCAN_TIMEOUT, nodeId);
switch(return_code){
case 0:
break;
@@ -1541,9 +1539,7 @@
*/
while(m_sent_receivers_count+m_api_receivers_count+m_conf_receivers_count)
{
- theNdb->theImpl->theWaiter.m_node = nodeId;
- theNdb->theImpl->theWaiter.m_state = WAIT_SCAN;
- int return_code = theNdb->receiveResponse(WAITFOR_SCAN_TIMEOUT);
+ int return_code= poll_guard->wait_scan(WAITFOR_SCAN_TIMEOUT, nodeId);
switch(return_code){
case 0:
break;
@@ -1583,12 +1579,13 @@
{
TransporterFacade* tp = TransporterFacade::instance();
- Guard guard(tp->theMutexPtr);
+ PollGuard poll_guard(tp, &theNdb->theImpl->theWaiter,
+ theNdb->theNdbBlockNumber);
Uint32 nodeId = theNdbCon->theDBnode;
{
int res;
- if((res= close_impl(tp, forceSend)))
+ if((res= close_impl(tp, forceSend, &poll_guard)))
{
return res;
}
@@ -1602,7 +1599,6 @@
theError.code = 0;
if (doSendScan(nodeId) == -1)
return -1;
-
return 0;
}
@@ -1612,8 +1608,9 @@
{
TransporterFacade* tp = TransporterFacade::instance();
- Guard guard(tp->theMutexPtr);
- res= close_impl(tp, forceSend);
+ PollGuard poll_guard(tp, &theNdb->theImpl->theWaiter,
+ theNdb->theNdbBlockNumber);
+ res= close_impl(tp, forceSend, &poll_guard);
}
if(!res)
--- 1.29/ndb/src/ndbapi/Ndbif.cpp Fri Feb 4 19:59:52 2005
+++ 1.30/ndb/src/ndbapi/Ndbif.cpp Tue Mar 22 02:14:04 2005
@@ -334,6 +334,7 @@
const Uint32 tFirstData = *tDataPtr;
const Uint32 tLen = aSignal->getLength();
void * tFirstDataPtr;
+ NdbWaiter *t_waiter;
/*
In order to support 64 bit processes in the application we need to use
@@ -835,11 +836,32 @@
}
default:
goto InvalidSignal;
- }//switch
-
- if (theImpl->theWaiter.m_state == NO_WAIT) {
- // Wake up the thread waiting for response
- NdbCondition_Signal(theImpl->theWaiter.m_condition);
+ }//swich
+
+ t_waiter= &theImpl->theWaiter;
+ if (t_waiter->get_state() == NO_WAIT)
+ {
+ /*
+ If our waiter object is the owner of the "poll rights", then we
+ can simply return, we will return from this routine to the
+ place where external_poll was called. From there it will move
+ the "poll ownership" to a new thread if available.
+
+ If our waiter object doesn't own the "poll rights", then we must
+ signal the thread from where this waiter object called
+ its conditional wait. This will wake up this thread so that it
+ can continue its work.
+ */
+ TransporterFacade *tp= TransporterFacade::instance();
+ if (tp->get_poll_owner() != t_waiter)
+ {
+ /*
+ Wake up the thread waiting for response and remove it from queue
+ of objects waiting for receive completion
+ */
+ tp->remove_from_cond_wait_queue(t_waiter);
+ t_waiter->cond_signal();
+ }
}//if
return;
@@ -895,9 +917,17 @@
(theNoOfCompletedTransactions >= theMinNoOfEventsToWakeUp)) {
theMinNoOfEventsToWakeUp = 0;
TransporterFacade *tp = TransporterFacade::instance();
- if (this != tp->get_poll_owner()) {
- TransporterFacade::instance()->remove_cond_wait_queue(this);
- cond_signal();
+ NdbWaiter *t_waiter= &theImpl->theWaiter;
+ if (tp->get_poll_owner() != t_waiter) {
+ /*
+ When we come here, this is executed by the thread owning the "poll
+ rights". This thread is not where our waiter object belongs.
+ Thus we wake up the thread owning this waiter object but first
+ we must remove it from the conditional wait queue so that we
+ don't assign it as poll owner later on.
+ */
+ tp->remove_from_cond_wait_queue(t_waiter);
+ t_waiter->cond_signal();
}
return;
}//if
@@ -1158,7 +1188,7 @@
void
Ndb::waitCompletedTransactions(int aMilliSecondsToWait,
int noOfEventsToWaitFor,
- TransporterFacade *tp)
+ PollGuard *poll_guard)
{
theImpl->theWaiter.m_state = NO_WAIT;
/**
@@ -1172,55 +1202,13 @@
theMinNoOfEventsToWakeUp = noOfEventsToWaitFor;
do {
if (waitTime < 1000) waitTime = 1000;
- Ndb *t_poll_owner = tp->get_poll_owner();
- if (t_poll_owner != NULL &&
- t_poll_owner != this) {
- /*
- We didn't get hold of the poll "right". We will sleep on a
- conditional mutex until the thread owning the poll "right"
- will wake us up after all data is received. If no data arrives
- we will wake up eventually due to the timeout.
- */
- cond_wait_index = tp->put_cond_wait_queue(this);
- NdbCondition_WaitTimeout(theImpl->theWaiter.m_condition,
- (NdbMutex*)theImpl->theWaiter.m_mutex,
- waitTime);
- if (cond_wait_index != TransporterFacade::MAX_NO_THREADS)
- tp->remove_cond_wait_queue(this);
- } else {
- /*
- We got the poll "right" and we poll until data is received. After
- receiving data we will check if all data is received, if not we
- poll again.
- */
- tp->set_poll_owner(this);
- tp->external_poll(waitTime);
- }
+ poll_guard->wait_for_input(waitTime);
if (theNoOfCompletedTransactions >= (Uint32)noOfEventsToWaitFor) {
break;
}//if
theMinNoOfEventsToWakeUp = noOfEventsToWaitFor;
waitTime = (int)(maxTime - NdbTick_CurrentMillisecond());
} while (waitTime > 0);
- /*
- When completing the poll for this thread we must return the poll
- ownership if we own it. We will give it to the last thread that
- came here (the most recent) which is likely to be the one also
- last to complete. We will remove that thread from the conditional
- wait queue and set him as the new owner of the poll "right".
- We will wait however with the signal until we have unlocked the
- mutex for performance reasons.
- See Stevens book on Unix NetworkProgramming: The Sockets Networking
- API Volume 1 Third Edition on page 703-704 for a discussion on this
- subject.
- */
- if (tp->get_poll_owner() == this) {
- tp->set_poll_owner(NULL);
- cond_signal_ndb = tp->rem_last_from_cond_wait_queue();
- if (cond_signal_ndb != NULL)
- tp->set_poll_owner(cond_signal_ndb);
- }
- return;
}//Ndb::waitCompletedTransactions()
void Ndb::cond_signal()
@@ -1254,15 +1242,15 @@
int
Ndb::sendPollNdb(int aMillisecondNumber, int minNoOfEventsToWakeup, int forceSend)
{
- TransporterFacade *tp = TransporterFacade::instance();
- tp->lock_mutex();
+ PollGuard pg(TransporterFacade::instance(), &theImpl->theWaiter,
+ theNdbBlockNumber);
sendPrepTrans(forceSend);
- return poll_trans(aMillisecondNumber, minNoOfEventsToWakeup, tp);
+ return poll_trans(aMillisecondNumber, minNoOfEventsToWakeup, &pg);
}
int
Ndb::poll_trans(int aMillisecondNumber, int minNoOfEventsToWakeup,
- TransporterFacade *tp)
+ PollGuard *pg)
{
NdbTransaction* tConArray[1024];
Uint32 tNoCompletedTransactions;
@@ -1272,19 +1260,15 @@
}//if
if ((theNoOfCompletedTransactions < (Uint32)minNoOfEventsToWakeup) &&
(aMillisecondNumber > 0)) {
- waitCompletedTransactions(aMillisecondNumber, minNoOfEventsToWakeup, tp);
+ waitCompletedTransactions(aMillisecondNumber, minNoOfEventsToWakeup, pg);
tNoCompletedTransactions = pollCompleted(tConArray);
} else {
tNoCompletedTransactions = pollCompleted(tConArray);
}//if
- tp->unlock_mutex();
- if (cond_signal_ndb) {
- cond_signal_ndb->cond_signal();
- cond_signal_ndb = NULL;
- }
+ pg->unlock_and_signal();
reportCallback(tConArray, tNoCompletedTransactions);
return tNoCompletedTransactions;
-}//Ndb::sendPollNdb()
+}
/*****************************************************************************
int pollNdb(int aMillisecondNumber, int minNoOfEventsToWakeup);
@@ -1296,29 +1280,10 @@
int
Ndb::pollNdb(int aMillisecondNumber, int minNoOfEventsToWakeup)
{
- TransporterFacade *tp = TransporterFacade::instance();
- tp->lock_mutex();
- return poll_trans(aMillisecondNumber, minNoOfEventsToWakeup, tp);
-}
-/*
- Duplicated code: moved into poll_trans(..)
-
- if ((minNoOfEventsToWakeup == 0) ||
- ((Uint32)minNoOfEventsToWakeup > theNoOfSentTransactions)) {
- minNoOfEventsToWakeup = theNoOfSentTransactions;
- }//if
- if ((theNoOfCompletedTransactions < (Uint32)minNoOfEventsToWakeup) &&
- (aMillisecondNumber > 0)) {
- waitCompletedTransactions(aMillisecondNumber, minNoOfEventsToWakeup);
- tNoCompletedTransactions = pollCompleted(tConArray);
- } else {
- tNoCompletedTransactions = pollCompleted(tConArray);
- }//if
- TransporterFacade::instance()->unlock_mutex();
- reportCallback(tConArray, tNoCompletedTransactions);
- return tNoCompletedTransactions;
+ PollGuard pg(TransporterFacade::instance(), &theImpl->theWaiter,
+ theNdbBlockNumber);
+ return poll_trans(aMillisecondNumber, minNoOfEventsToWakeup, &pg);
}
-*/
/*****************************************************************************
int receiveOptimisedResponse();
@@ -1332,7 +1297,7 @@
Ndb::receiveResponse(int waitTime){
int tResultCode;
TransporterFacade::instance()->checkForceSend(theNdbBlockNumber);
-
+
theImpl->theWaiter.wait(waitTime);
if(theImpl->theWaiter.m_state == NO_WAIT) {
--- 1.30/ndb/src/ndbapi/Ndbinit.cpp Fri Feb 4 13:56:37 2005
+++ 1.31/ndb/src/ndbapi/Ndbinit.cpp Tue Mar 22 02:14:05 2005
@@ -228,6 +228,8 @@
m_node = 0;
m_state = NO_WAIT;
m_mutex = 0;
+ m_poll_owner= false;
+ m_cond_wait_index= TransporterFacade::MAX_NO_THREADS;
m_condition = NdbCondition_Create();
}
--- 1.35/ndb/src/ndbapi/TransporterFacade.cpp Fri Feb 4 14:40:48 2005
+++ 1.36/ndb/src/ndbapi/TransporterFacade.cpp Tue Mar 22 02:14:05 2005
@@ -499,7 +499,7 @@
doubly linked list. Finally it assigns the ndb object reference to the
entry.
*/
-Uint32 TransporterFacade::put_cond_wait_queue(Ndb *aNdb)
+Uint32 TransporterFacade::put_in_cond_wait_queue(NdbWaiter *aWaiter)
{
/*
Get first free entry
@@ -519,8 +519,8 @@
cond_wait_array[last_in_cond_wait].next_cond_wait = index;
last_in_cond_wait = index;
- cond_wait_array[index].ndb_cond_wait_object = aNdb;
- aNdb->set_cond_wait_index(index);
+ cond_wait_array[index].cond_wait_object = aWaiter;
+ aWaiter->set_cond_wait_index(index);
return index;
}
@@ -532,11 +532,11 @@
NULLifies the ndb object reference entry and sets the index in the
Ndb object to NIL (=MAX_NO_THREADS)
*/
-void TransporterFacade::remove_cond_wait_queue(Ndb *aNdb)
+void TransporterFacade::remove_from_cond_wait_queue(NdbWaiter *aWaiter)
{
- Uint32 index = aNdb->get_cond_wait_index();
+ Uint32 index = aWaiter->get_cond_wait_index();
assert(index < MAX_NO_THREADS &&
- cond_wait_array[index].ndb_cond_wait_object == aNdb);
+ cond_wait_array[index].cond_wait_object == aWaiter);
/*
Remove from doubly linked list
*/
@@ -558,23 +558,23 @@
cond_wait_array[index].prev_cond_wait = MAX_NO_THREADS;
first_free_cond_wait = index;
- cond_wait_array[index].ndb_cond_wait_object = NULL;
- aNdb->set_cond_wait_index(MAX_NO_THREADS);
+ cond_wait_array[index].cond_wait_object = NULL;
+ aWaiter->set_cond_wait_index(MAX_NO_THREADS);
}
/*
Get the latest Ndb object from the conditional wait queue
and also remove it from the list.
*/
-Ndb* TransporterFacade::rem_last_from_cond_wait_queue()
+NdbWaiter* TransporterFacade::rem_last_from_cond_wait_queue()
{
- Ndb *tNdb;
+ NdbWaiter *tWaiter;
Uint32 index = last_in_cond_wait;
if (last_in_cond_wait == MAX_NO_THREADS)
return NULL;
- tNdb = cond_wait_array[index].ndb_cond_wait_object;
- remove_cond_wait_queue(tNdb);
- return tNdb;
+ tWaiter = cond_wait_array[index].cond_wait_object;
+ remove_from_cond_wait_queue(tWaiter);
+ return tWaiter;
}
void TransporterFacade::init_cond_wait_queue()
@@ -590,7 +590,7 @@
*/
first_free_cond_wait = 0;
for (i = 0; i < MAX_NO_THREADS; i++) {
- cond_wait_array[i].ndb_cond_wait_object = NULL;
+ cond_wait_array[i].cond_wait_object = NULL;
cond_wait_array[i].next_cond_wait = i+1;
cond_wait_array[i].prev_cond_wait = MAX_NO_THREADS;
}
@@ -1235,6 +1235,141 @@
m_objectExecute[number] = oe;
m_statusFunction[number] = 0;
return 0;
+}
+
+PollGuard::PollGuard(TransporterFacade *tp, NdbWaiter *aWaiter,
+ Uint32 block_no)
+{
+ m_tp= tp;
+ m_waiter= aWaiter;
+ m_locked= true;
+ m_block_no= block_no;
+ tp->lock_mutex();
+}
+
+/*
+ This is a common routine for possibly forcing the send of buffered signals
+ and receiving response the thread is waiting for. It is designed to be
+ useful from:
+ 1) PK, UK lookups using the asynchronous interface
+ This routine uses the wait_for_input routine instead since it has
+ special end conditions due to the asynchronous nature of its usage.
+ 2) Scans
+ 3) dictSignal
+ It uses a NdbWaiter object to wait on the events and this object is
+ linked into the conditional wait queue. Thus this object contains
+ a reference to its place in the queue.
+
+ It replaces the method receiveResponse previously used on the Ndb object
+*/
+int PollGuard::wait_n_unlock(int wait_time, NodeId nodeId, Uint32 state)
+{
+ int ret_val;
+ m_waiter->set_node(nodeId);
+ m_waiter->set_state(state);
+ ret_val= wait_for_input_in_loop(wait_time);
+ unlock_and_signal();
+ return ret_val;
+}
+
+int PollGuard::wait_scan(int wait_time, NodeId nodeId)
+{
+ m_waiter->set_node(nodeId);
+ m_waiter->set_state(WAIT_SCAN);
+ return wait_for_input_in_loop(wait_time);
+}
+
+int PollGuard::wait_for_input_in_loop(int wait_time)
+{
+ int ret_val;
+ m_tp->checkForceSend(m_block_no);
+ NDB_TICKS curr_time = NdbTick_CurrentMillisecond();
+ NDB_TICKS max_time = curr_time + (NDB_TICKS)wait_time;
+ do
+ {
+ wait_for_input(wait_time);
+ Uint32 state= m_waiter->get_state();
+ if (state == NO_WAIT)
+ return 0;
+ else if (state == WAIT_NODE_FAILURE)
+ {
+ ret_val= -2;
+ break;
+ }
+ wait_time= max_time - NdbTick_CurrentMillisecond();
+ if (wait_time <= 0)
+ {
+ m_waiter->set_state(WST_WAIT_TIMEOUT);
+ ret_val= -1;
+ break;
+ }
+ } while (1);
+#ifdef VM_TRACE
+ ndbout << "ERR: receiveResponse - theImpl->theWaiter.m_state = ";
+ ndbout << m_waiter->get_state() << endl;
+#endif
+ m_waiter->set_state(NO_WAIT);
+ return ret_val;
+}
+
+void PollGuard::wait_for_input(int wait_time)
+{
+ NdbWaiter *t_poll_owner= m_tp->get_poll_owner();
+ if (t_poll_owner != NULL && t_poll_owner != m_waiter)
+ {
+ /*
+ We didn't get hold of the poll "right". We will sleep on a
+ conditional mutex until the thread owning the poll "right"
+ will wake us up after all data is received. If no data arrives
+ we will wake up eventually due to the timeout.
+ */
+ Uint32 cond_wait_index= m_tp->put_in_cond_wait_queue(m_waiter);
+ m_waiter->wait(wait_time);
+ if (cond_wait_index != TransporterFacade::MAX_NO_THREADS)
+ m_tp->remove_from_cond_wait_queue(m_waiter);
+ }
+ else
+ {
+ /*
+ We got the poll "right" and we poll until data is received. After
+ receiving data we will check if all data is received, if not we
+ poll again.
+ */
+ m_tp->set_poll_owner(m_waiter);
+ m_waiter->set_poll_owner(true);
+ m_tp->external_poll(wait_time);
+ }
+}
+
+void PollGuard::unlock_and_signal()
+{
+ NdbWaiter *t_signal_cond_waiter= 0;
+ if (!m_locked)
+ return;
+ /*
+ When completing the poll for this thread we must return the poll
+ ownership if we own it. We will give it to the last thread that
+ came here (the most recent) which is likely to be the one also
+ last to complete. We will remove that thread from the conditional
+ wait queue and set him as the new owner of the poll "right".
+ We will wait however with the signal until we have unlocked the
+ mutex for performance reasons.
+ See Stevens book on Unix NetworkProgramming: The Sockets Networking
+ API Volume 1 Third Edition on page 703-704 for a discussion on this
+ subject.
+ */
+ if (m_tp->get_poll_owner() == m_waiter)
+ {
+ m_waiter->set_poll_owner(false);
+ t_signal_cond_waiter= m_tp->rem_last_from_cond_wait_queue();
+ m_tp->set_poll_owner(t_signal_cond_waiter);
+ if (t_signal_cond_waiter)
+ t_signal_cond_waiter->set_poll_owner(true);
+ }
+ m_tp->unlock_mutex();
+ if (t_signal_cond_waiter)
+ t_signal_cond_waiter->cond_signal();
+ m_locked=false;
}
template class Vector<NodeStatusFunction>;
--- 1.24/ndb/src/ndbapi/TransporterFacade.hpp Fri Feb 4 14:40:48 2005
+++ 1.25/ndb/src/ndbapi/TransporterFacade.hpp Tue Mar 22 02:14:05 2005
@@ -34,6 +34,7 @@
class Ndb;
class NdbApiSignal;
+class NdbWaiter;
typedef void (* ExecuteFunction)(void *, NdbApiSignal *, LinearSectionPtr ptr[3]);
typedef void (* NodeStatusFunction)(void *, Uint32, bool nodeAlive, bool nfComplete);
@@ -137,19 +138,19 @@
be the last to complete its reception.
*/
void external_poll(Uint32 wait_time);
- Ndb *get_poll_owner(void) const { return poll_owner; }
- void set_poll_owner(Ndb *new_owner) { poll_owner = new_owner; }
- Uint32 put_cond_wait_queue(Ndb *aNdb);
- void remove_cond_wait_queue(Ndb* aNdb);
- Ndb *rem_last_from_cond_wait_queue();
+ NdbWaiter* get_poll_owner(void) const { return poll_owner; }
+ void set_poll_owner(NdbWaiter* new_owner) { poll_owner= new_owner; }
+ Uint32 put_in_cond_wait_queue(NdbWaiter *aWaiter);
+ void remove_from_cond_wait_queue(NdbWaiter *aWaiter);
+ NdbWaiter* rem_last_from_cond_wait_queue();
private:
void init_cond_wait_queue();
struct CondWaitQueueElement {
- Ndb *ndb_cond_wait_object;
+ NdbWaiter *cond_wait_object;
Uint32 next_cond_wait;
Uint32 prev_cond_wait;
};
- Ndb *poll_owner;
+ NdbWaiter *poll_owner;
CondWaitQueueElement cond_wait_array[MAX_NO_THREADS];
Uint32 first_in_cond_wait;
Uint32 first_free_cond_wait;
@@ -275,6 +276,23 @@
public:
GlobalDictCache m_globalDictCache;
+};
+
+class PollGuard
+{
+ public:
+ PollGuard(TransporterFacade *tp, NdbWaiter *aWaiter, Uint32 block_no);
+ ~PollGuard() { unlock_and_signal(); }
+ int wait_n_unlock(int wait_time, NodeId nodeId, Uint32 state);
+ int wait_for_input_in_loop(int wait_time);
+ void wait_for_input(int wait_time);
+ int wait_scan(int wait_time, NodeId nodeId);
+ void unlock_and_signal();
+ private:
+ TransporterFacade *m_tp;
+ NdbWaiter *m_waiter;
+ Uint32 m_block_no;
+ bool m_locked;
};
inline
--- 1.1/ndb/src/ndbapi/NdbWaiter.hpp Mon Dec 20 12:36:04 2004
+++ 1.2/ndb/src/ndbapi/NdbWaiter.hpp Tue Mar 22 02:14:04 2005
@@ -54,10 +54,19 @@
void wait(int waitTime);
void nodeFail(Uint32 node);
void signal(Uint32 state);
+ void cond_signal();
+ void set_poll_owner(bool poll_owner) { m_poll_owner= poll_owner; }
+ Uint32 get_state() { return m_state; }
+ void set_state(Uint32 state) { m_state= state; }
+ void set_node(Uint32 node) { m_node= node; }
+ Uint32 get_cond_wait_index() { return m_cond_wait_index; }
+ void set_cond_wait_index(Uint32 index) { m_cond_wait_index= index; }
Uint32 m_node;
Uint32 m_state;
void * m_mutex;
+ bool m_poll_owner;
+ Uint32 m_cond_wait_index;
struct NdbCondition * m_condition;
};
@@ -88,7 +97,8 @@
NdbWaiter::nodeFail(Uint32 aNodeId){
if (m_state != NO_WAIT && m_node == aNodeId){
m_state = WAIT_NODE_FAILURE;
- NdbCondition_Signal(m_condition);
+ if (!m_poll_owner)
+ NdbCondition_Signal(m_condition);
}
}
@@ -96,7 +106,14 @@
void
NdbWaiter::signal(Uint32 state){
m_state = state;
- NdbCondition_Signal(m_condition);
+ if (!m_poll_owner)
+ NdbCondition_Signal(m_condition);
}
+inline
+void
+NdbWaiter::cond_signal()
+{
+ NdbCondition_Signal(m_condition);
+}
#endif
| Thread |
|---|
| • bk commit into 5.0 tree (mronstrom:1.1840) | mikael | 22 Mar |