#At file:///export/space/pekka/ndb/version/my51-wl4391/
2738 Pekka Nousiainen 2008-08-27
wl#4391 19.diff
SUMA (subscription triggers).
modified:
storage/ndb/src/kernel/blocks/LocalProxy.cpp
storage/ndb/src/kernel/blocks/LocalProxy.hpp
storage/ndb/src/kernel/blocks/dbtup/Dbtup.hpp
storage/ndb/src/kernel/blocks/dbtup/DbtupTrigger.cpp
storage/ndb/src/kernel/blocks/suma/Suma.cpp
=== modified file 'storage/ndb/src/kernel/blocks/LocalProxy.cpp'
--- a/storage/ndb/src/kernel/blocks/LocalProxy.cpp 2008-08-27 07:54:42 +0000
+++ b/storage/ndb/src/kernel/blocks/LocalProxy.cpp 2008-08-27 08:00:52 +0000
@@ -745,6 +745,8 @@ LocalProxy::sendTIME_SIGNAL(Signal* sign
void
LocalProxy::execCREATE_TRIG_IMPL_REQ(Signal* signal)
{
+ if (ssQueue<Ss_CREATE_TRIG_IMPL_REQ>(signal))
+ return;
const CreateTrigImplReq* req = (const CreateTrigImplReq*)signal->getDataPtr();
Ss_CREATE_TRIG_IMPL_REQ& ss = ssSeize<Ss_CREATE_TRIG_IMPL_REQ>();
ss.m_req = *req;
@@ -822,6 +824,8 @@ LocalProxy::sendCREATE_TRIG_IMPL_CONF(Si
void
LocalProxy::execDROP_TRIG_IMPL_REQ(Signal* signal)
{
+ if (ssQueue<Ss_DROP_TRIG_IMPL_REQ>(signal))
+ return;
const DropTrigImplReq* req = (const DropTrigImplReq*)signal->getDataPtr();
Ss_DROP_TRIG_IMPL_REQ& ss = ssSeize<Ss_DROP_TRIG_IMPL_REQ>();
ss.m_req = *req;
=== modified file 'storage/ndb/src/kernel/blocks/LocalProxy.hpp'
--- a/storage/ndb/src/kernel/blocks/LocalProxy.hpp 2008-08-27 07:54:42 +0000
+++ b/storage/ndb/src/kernel/blocks/LocalProxy.hpp 2008-08-27 08:00:52 +0000
@@ -128,6 +128,10 @@ protected:
template <class Ss>
struct SsPool {
Ss m_pool[Ss::poolSize];
+ Uint32 m_usage;
+ SsPool() {
+ m_usage = 0;
+ }
};
Uint32 c_ssIdSeq;
@@ -148,6 +152,7 @@ protected:
Ss& ssSeize(Uint32 ssId) {
SsPool<Ss>& sp = Ss::pool(this);
ndbrequire(ssId != 0);
+ ndbrequire(sp.m_usage < Ss::poolSize);
Ss* ssptr = 0;
for (Uint32 i = 0; i < Ss::poolSize; i++) {
Ss& ss = sp.m_pool[i];
@@ -160,6 +165,7 @@ protected:
}
}
ndbrequire(ssptr != 0);
+ sp.m_usage++;
return *ssptr;
}
@@ -182,6 +188,7 @@ protected:
template <class Ss>
void ssRelease(Uint32 ssId) {
SsPool<Ss>& sp = Ss::pool(this);
+ ndbrequire(sp.m_usage != 0);
ndbrequire(ssId != 0);
Ss* ssptr = 0;
for (Uint32 i = 0; i < Ss::poolSize; i++) {
@@ -193,6 +200,7 @@ protected:
}
}
ndbrequire(ssptr != 0);
+ sp.m_usage--;
}
template <class Ss>
@@ -200,6 +208,22 @@ protected:
ssRelease<Ss>(ss.m_ssId);
}
+ /*
+ * In some cases handle pool full via delayed signal.
+ * wl4391_todo maybe use CONTINUEB and guard against infinite loop.
+ */
+ template <class Ss>
+ bool ssQueue(Signal* signal) {
+ SsPool<Ss>& sp = Ss::pool(this);
+ if (sp.m_usage < Ss::poolSize)
+ return false;
+ ndbrequire(signal->getNoOfSections() == 0);
+ GlobalSignalNumber gsn = signal->header.theVerId_signalNumber & 0xFFFF;
+ sendSignalWithDelay(reference(), gsn,
+ signal, 10, signal->length());
+ return true;
+ }
+
// system info
Uint32 c_typeOfStart;
@@ -405,7 +429,7 @@ protected:
m_sendREQ = &LocalProxy::sendCREATE_TRIG_IMPL_REQ;
m_sendCONF = &LocalProxy::sendCREATE_TRIG_IMPL_CONF;
}
- enum { poolSize = 1 };
+ enum { poolSize = 3 };
static SsPool<Ss_CREATE_TRIG_IMPL_REQ>& pool(LocalProxy* proxy) {
return proxy->c_ss_CREATE_TRIG_IMPL_REQ;
}
@@ -424,7 +448,7 @@ protected:
m_sendREQ = &LocalProxy::sendDROP_TRIG_IMPL_REQ;
m_sendCONF = &LocalProxy::sendDROP_TRIG_IMPL_CONF;
}
- enum { poolSize = 1 };
+ enum { poolSize = 3 };
static SsPool<Ss_DROP_TRIG_IMPL_REQ>& pool(LocalProxy* proxy) {
return proxy->c_ss_DROP_TRIG_IMPL_REQ;
}
=== modified file 'storage/ndb/src/kernel/blocks/dbtup/Dbtup.hpp'
--- a/storage/ndb/src/kernel/blocks/dbtup/Dbtup.hpp 2008-08-07 11:52:50 +0000
+++ b/storage/ndb/src/kernel/blocks/dbtup/Dbtup.hpp 2008-08-27 08:00:52 +0000
@@ -897,9 +897,9 @@ struct TupTriggerData {
TriggerActionTime::Value triggerActionTime;
TriggerEvent::Value triggerEvent;
/**
- * Receiver block
+ * Receiver block reference
*/
- Uint32 m_receiverBlock;
+ Uint32 m_receiverRef;
/**
* Monitor all replicas, i.e. trigger will fire on all nodes where tuples
=== modified file 'storage/ndb/src/kernel/blocks/dbtup/DbtupTrigger.cpp'
--- a/storage/ndb/src/kernel/blocks/dbtup/DbtupTrigger.cpp 2008-06-05 20:34:20 +0000
+++ b/storage/ndb/src/kernel/blocks/dbtup/DbtupTrigger.cpp 2008-08-27 08:00:52 +0000
@@ -318,7 +318,7 @@ Dbtup::createTrigger(Tablerec* table, co
tptr.p->monitorAllAttributes = TriggerInfo::getMonitorAllAttributes(tinfo);
tptr.p->monitorReplicas = TriggerInfo::getMonitorReplicas(tinfo);
- tptr.p->m_receiverBlock = refToBlock(req->receiverRef);
+ tptr.p->m_receiverRef = req->receiverRef;
if (tptr.p->monitorAllAttributes)
{
@@ -420,7 +420,7 @@ Dbtup::dropTrigger(Tablerec* table, cons
if (ptr.p->triggerId == triggerId)
{
if(ttype==TriggerType::SUBSCRIPTION &&
- sender != ptr.p->m_receiverBlock)
+ sender != refToBlock(ptr.p->m_receiverRef))
{
/**
* You can only drop your own triggers for subscription triggers.
@@ -874,7 +874,7 @@ void Dbtup::executeTrigger(KeyReqStruct
}
*/
Signal* signal= req_struct->signal;
- BlockReference ref = trigPtr->m_receiverBlock;
+ BlockReference ref = trigPtr->m_receiverRef;
Uint32* const keyBuffer = &cinBuffer[0];
Uint32* const afterBuffer = &coutBuffer[0];
Uint32* const beforeBuffer = &clogMemBuffer[0];
@@ -886,7 +886,7 @@ void Dbtup::executeTrigger(KeyReqStruct
ptrCheckGuard(regFragPtr, cnoOfFragrec, fragrecord);
Fragrecord::FragState fragstatus = regFragPtr.p->fragStatus;
- if (ref == BACKUP) {
+ if (refToMain(ref) == BACKUP) {
jam();
/*
In order for the implementation of BACKUP to work even when changing
@@ -953,8 +953,9 @@ void Dbtup::executeTrigger(KeyReqStruct
case (TriggerType::SUBSCRIPTION_BEFORE):
jam();
// Since only backup uses subscription triggers we send to backup directly for now
- ref = trigPtr->m_receiverBlock;
- executeDirect = true;
+ ref = trigPtr->m_receiverRef;
+ // executeDirect = !isNdbMtLqh() || (refToMain(ref) != SUMA);
+ executeDirect = refToInstance(ref) == instance();
break;
case (TriggerType::READ_ONLY_CONSTRAINT):
terrorCode = ZREAD_ONLY_CONSTRAINT_VIOLATION;
@@ -1174,7 +1175,7 @@ bool Dbtup::readTriggerInfo(TupTriggerDa
req_struct->m_tuple_ptr= save;
ndbrequire(ret != -1);
noBeforeWords = ret;
- if (trigPtr->m_receiverBlock != SUMA &&
+ if (refToMain(trigPtr->m_receiverRef) != SUMA &&
(noAfterWords == noBeforeWords) &&
(memcmp(afterBuffer, beforeBuffer, noAfterWords << 2) == 0)) {
//--------------------------------------------------------------------
@@ -1209,7 +1210,7 @@ void Dbtup::sendTrigAttrInfo(Signal* sig
sigLen);
if (executeDirect) {
jam();
- EXECUTE_DIRECT(receiverReference,
+ EXECUTE_DIRECT(refToMain(receiverReference),
GSN_TRIG_ATTRINFO,
signal,
TrigAttrInfo::StaticLength + sigLen);
@@ -1275,23 +1276,25 @@ void Dbtup::sendFireTrigOrd(Signal* sign
break;
case (TriggerType::SUBSCRIPTION_BEFORE): // Only Suma
jam();
- // Since only backup uses subscription triggers we
- // send to backup directly for now
fireTrigOrd->setGCI(req_struct->gci_hi);
fireTrigOrd->setHashValue(req_struct->hash_value);
fireTrigOrd->m_any_value = regOperPtr->m_any_value;
fireTrigOrd->m_gci_lo = req_struct->gci_lo;
- EXECUTE_DIRECT(trigPtr->m_receiverBlock,
- GSN_FIRE_TRIG_ORD,
- signal,
- FireTrigOrd::SignalLengthSuma);
+ if (refToInstance(trigPtr->m_receiverRef) == instance())
+ EXECUTE_DIRECT(refToMain(trigPtr->m_receiverRef),
+ GSN_FIRE_TRIG_ORD,
+ signal,
+ FireTrigOrd::SignalLengthSuma);
+ else
+ sendSignal(trigPtr->m_receiverRef, GSN_FIRE_TRIG_ORD,
+ signal, FireTrigOrd::SignalLengthSuma, JBB);
break;
case (TriggerType::SUBSCRIPTION):
jam();
// Since only backup uses subscription triggers we
// send to backup directly for now
fireTrigOrd->setGCI(req_struct->gci_hi);
- EXECUTE_DIRECT(trigPtr->m_receiverBlock,
+ EXECUTE_DIRECT(refToMain(trigPtr->m_receiverRef),
GSN_FIRE_TRIG_ORD,
signal,
FireTrigOrd::SignalWithGCILength);
=== modified file 'storage/ndb/src/kernel/blocks/suma/Suma.cpp'
--- a/storage/ndb/src/kernel/blocks/suma/Suma.cpp 2008-08-27 07:54:42 +0000
+++ b/storage/ndb/src/kernel/blocks/suma/Suma.cpp 2008-08-27 08:00:52 +0000
@@ -233,7 +233,9 @@ Suma::execSTTOR(Signal* signal) {
if(m_startphase == 3)
{
jam();
- ndbrequire((m_tup = (Dbtup*)globalData.getBlock(DBTUP)) != 0);
+ // wl4391_todo something
+ Uint32 instanceNo = !isNdbMtLqh() ? 0 : 1;
+ ndbrequire((m_tup = (Dbtup*)globalData.getBlock(DBTUP, instanceNo)) != 0);
}
if(m_startphase == 5)
| Thread |
|---|
| • bzr commit into mysql-5.1 branch (pekka:2738) WL#4391 | Pekka Nousiainen | 27 Aug |