#At file:///home/jonas/src/telco-6.3/
2634 jonas@stripped 2008-06-16 [merge]
merge
modified:
storage/ndb/include/kernel/signaldata/SumaImpl.hpp
storage/ndb/src/kernel/blocks/suma/Suma.cpp
storage/ndb/src/kernel/blocks/suma/Suma.hpp
=== modified file 'storage/ndb/include/kernel/signaldata/SumaImpl.hpp'
--- a/storage/ndb/include/kernel/signaldata/SumaImpl.hpp 2008-02-20 09:04:29 +0000
+++ b/storage/ndb/include/kernel/signaldata/SumaImpl.hpp 2008-06-16 06:25:29 +0000
@@ -39,7 +39,8 @@ struct SubCreateReq {
GetFlags = 0xff << 16,
RestartFlag = 0x2 << 16,
ReportAll = 0x4 << 16,
- ReportSubscribe= 0x8 << 16
+ ReportSubscribe= 0x8 << 16,
+ NR_Sub_Dropped = 0x1 << 24 // sub is dropped but needs to be copied
};
Uint32 senderRef;
=== modified file 'storage/ndb/src/kernel/blocks/suma/Suma.cpp'
--- a/storage/ndb/src/kernel/blocks/suma/Suma.cpp 2008-06-11 20:28:21 +0000
+++ b/storage/ndb/src/kernel/blocks/suma/Suma.cpp 2008-06-16 06:50:10 +0000
@@ -1469,6 +1469,16 @@ Suma::execSUB_CREATE_REQ(Signal* signal)
Subscription::REPORT_SUBSCRIBE : 0;
const Uint32 tableId = req.tableId;
+ bool subDropped = req.subscriptionType & SubCreateReq::NR_Sub_Dropped;
+
+ /**
+ * This 2 options are only allowed during NR
+ */
+ if (subDropped)
+ {
+ ndbrequire(refToNode(senderRef) == c_startup.m_restart_server_node_id);
+ }
+
Subscription key;
key.m_subscriptionId = subId;
key.m_subscriptionKey = subKey;
@@ -1550,6 +1560,12 @@ Suma::execSUB_CREATE_REQ(Signal* signal)
subOpPtr.p->m_senderRef = senderRef;
subOpPtr.p->m_senderData = senderData;
+ if (subDropped)
+ {
+ jam();
+ subPtr.p->m_options |= Subscription::MARKED_DROPPED;
+ }
+
TablePtr tabPtr;
if (found)
{
@@ -2345,11 +2361,6 @@ Suma::execSUB_START_REQ(Signal* signal){
case Subscription::UNDEFINED:
jam();
ndbrequire(false);
- case Subscription::DROPPED:
- jam();
- sendSubStartRef(signal,
- senderRef, senderData, SubStartRef::Dropped);
- return;
case Subscription::DEFINING:
jam();
sendSubStartRef(signal,
@@ -2359,6 +2370,23 @@ Suma::execSUB_START_REQ(Signal* signal){
break;
}
+ if (subPtr.p->m_options & Subscription::MARKED_DROPPED)
+ {
+ jam();
+ if (c_startup.m_restart_server_node_id == 0)
+ {
+ sendSubStartRef(signal,
+ senderRef, senderData, SubStartRef::Dropped);
+ return;
+ }
+ else
+ {
+ /**
+ * Allow SUB_START_REQ from peer node
+ */
+ }
+ }
+
if (subPtr.p->m_trigger_state == Subscription::T_ERROR)
{
jam();
@@ -2437,7 +2465,6 @@ Suma::execSUB_START_REQ(Signal* signal){
break;
case Subscription::T_DEFINED:{
jam();
-
report_sub_start_conf(signal, subPtr);
return;
}
@@ -2714,27 +2741,40 @@ Suma::drop_triggers(Signal* signal, Subs
jam();
subPtr.p->m_outstanding_trigger = 0;
- for(Uint32 j = 0; j<3; j++)
+
+ Ptr<Table> tabPtr;
+ c_tablePool.getPtr(tabPtr, subPtr.p->m_table_ptrI);
+ if (tabPtr.p->m_state == Table::DROPPED)
{
- Uint32 triggerId = subPtr.p->m_triggers[j];
- if (triggerId != ILLEGAL_TRIGGER_ID)
+ jam();
+ subPtr.p->m_triggers[0] = ILLEGAL_TRIGGER_ID;
+ subPtr.p->m_triggers[1] = ILLEGAL_TRIGGER_ID;
+ subPtr.p->m_triggers[2] = ILLEGAL_TRIGGER_ID;
+ }
+ else
+ {
+ for(Uint32 j = 0; j<3; j++)
{
- jam();
- subPtr.p->m_outstanding_trigger++;
- DropTrigReq * const req = (DropTrigReq*)signal->getDataPtrSend();
- req->setConnectionPtr(subPtr.i);
- req->setUserRef(SUMA_REF);
- req->setRequestType(DropTrigReq::RT_USER);
- req->setTriggerType(TriggerType::SUBSCRIPTION_BEFORE);
- req->setTriggerActionTime(TriggerActionTime::TA_DETACHED);
- req->setIndexId(RNIL);
-
- req->setTableId(subPtr.p->m_tableId);
- req->setTriggerId(triggerId);
- req->setTriggerEvent((TriggerEvent::Value)j);
+ Uint32 triggerId = subPtr.p->m_triggers[j];
+ if (triggerId != ILLEGAL_TRIGGER_ID)
+ {
+ jam();
+ subPtr.p->m_outstanding_trigger++;
+ DropTrigReq * const req = (DropTrigReq*)signal->getDataPtrSend();
+ req->setConnectionPtr(subPtr.i);
+ req->setUserRef(SUMA_REF);
+ req->setRequestType(DropTrigReq::RT_USER);
+ req->setTriggerType(TriggerType::SUBSCRIPTION_BEFORE);
+ req->setTriggerActionTime(TriggerActionTime::TA_DETACHED);
+ req->setIndexId(RNIL);
+
+ req->setTableId(subPtr.p->m_tableId);
+ req->setTriggerId(triggerId);
+ req->setTriggerEvent((TriggerEvent::Value)j);
- sendSignal(DBTUP_REF, GSN_DROP_TRIG_REQ,
- signal, DropTrigReq::SignalLength, JBB);
+ sendSignal(DBTUP_REF, GSN_DROP_TRIG_REQ,
+ signal, DropTrigReq::SignalLength, JBB);
+ }
}
}
@@ -2904,9 +2944,6 @@ Suma::execSUB_STOP_REQ(Signal* signal){
sendSubStopRef(signal,
senderRef, senderData, SubStopRef::Defining);
return;
- case Subscription::DROPPED:
- jam();
- break;
case Subscription::DEFINED:
jam();
break;
@@ -3759,7 +3796,7 @@ Suma::execSUB_GCP_COMPLETE_REP(Signal* s
if(m_gcp_complete_rep_count && !c_subscriber_nodes.isclear())
{
CRASH_INSERTION(13033);
-
+
NodeReceiverGroup rg(API_CLUSTERMGR, c_subscriber_nodes);
sendSignal(rg, GSN_SUB_GCP_COMPLETE_REP, signal,
SubGcpCompleteRep::SignalLength, JBB);
@@ -3834,6 +3871,14 @@ Suma::execDROP_TAB_CONF(Signal *signal)
tabPtr.p->m_state = Table::DROPPED;
c_tables.remove(tabPtr);
+ if (tabPtr.p->m_subscriptions.isEmpty())
+ {
+ jam();
+ tabPtr.p->release(* this);
+ c_tablePool.release(tabPtr);
+ return;
+ }
+
if (senderRef == 0)
{
jam();
@@ -3989,8 +4034,10 @@ Suma::execSUB_GCP_COMPLETE_ACK(Signal* s
return;
}
- if (refToBlock(senderRef) == SUMA) {
+ if (refToBlock(senderRef) == SUMA)
+ {
jam();
+
// Ack from other SUMA
Uint32 nodeId= refToNode(senderRef);
for(Uint32 i = 0; i<c_no_of_buckets; i++)
@@ -4104,18 +4151,20 @@ Suma::execSUB_REMOVE_REQ(Signal* signal)
case Subscription::DEFINING:
jam();
ndbrequire(false);
- case Subscription::DROPPED:
- /**
- * already dropped
- */
- jam();
- sendSubRemoveRef(signal, req, SubRemoveRef::AlreadyDropped);
- return;
case Subscription::DEFINED:
+ if (subPtr.p->m_options & Subscription::MARKED_DROPPED)
+ {
+ /**
+ * already dropped
+ */
+ jam();
+ sendSubRemoveRef(signal, req, SubRemoveRef::AlreadyDropped);
+ return;
+ }
break;
}
- subPtr.p->m_state = Subscription::DROPPED;
+ subPtr.p->m_options |= Subscription::MARKED_DROPPED;
check_release_subscription(signal, subPtr);
SubRemoveConf * const conf = (SubRemoveConf*)signal->getDataPtrSend();
@@ -4187,10 +4236,10 @@ do_release:
if (tabPtr.p->m_state == Table::DROPPED)
{
jam();
- subPtr.p->m_state = Subscription::DROPPED;
+ subPtr.p->m_options |= Subscription::MARKED_DROPPED;
}
- if (subPtr.p->m_state != Subscription::DROPPED)
+ if ((subPtr.p->m_options & Subscription::MARKED_DROPPED) == 0)
{
jam();
return;
@@ -4380,34 +4429,35 @@ Suma::sendSubCreateReq(Signal* signal, P
return;
}
- Ptr<Table> tabPtr;
- c_tablePool.getPtr(tabPtr, subPtr.p->m_table_ptrI);
- bool dropped =
- subPtr.p->m_state == Subscription::DROPPED ||
- tabPtr.p->m_state == Table::DROPPED;
+ c_restart.m_waiting_on_self = 0;
+ SubCreateReq * req = (SubCreateReq *)signal->getDataPtrSend();
+ req->senderRef = reference();
+ req->senderData = subPtr.i;
+ req->subscriptionId = subPtr.p->m_subscriptionId;
+ req->subscriptionKey = subPtr.p->m_subscriptionKey;
+ req->subscriptionType = subPtr.p->m_subscriptionType;
+ req->tableId = subPtr.p->m_tableId;
- if (! dropped)
+ if (subPtr.p->m_options & Subscription::REPORT_ALL)
{
- jam();
- c_restart.m_waiting_on_self = 0;
- SubCreateReq * req = (SubCreateReq *)signal->getDataPtrSend();
- req->senderRef = reference();
- req->senderData = subPtr.i;
- req->subscriptionId = subPtr.p->m_subscriptionId;
- req->subscriptionKey = subPtr.p->m_subscriptionKey;
- req->subscriptionType = subPtr.p->m_subscriptionType;
- req->tableId = subPtr.p->m_tableId;
+ req->subscriptionType |= SubCreateReq::ReportAll;
+ }
- if (subPtr.p->m_options & Subscription::REPORT_ALL)
- {
- req->subscriptionType |= SubCreateReq::ReportAll;
- }
+ if (subPtr.p->m_options & Subscription::REPORT_SUBSCRIBE)
+ {
+ req->subscriptionType |= SubCreateReq::ReportSubscribe;
+ }
- if (subPtr.p->m_options & Subscription::REPORT_SUBSCRIBE)
- {
- req->subscriptionType |= SubCreateReq::ReportSubscribe;
- }
+ if (subPtr.p->m_options & Subscription::MARKED_DROPPED)
+ {
+ req->subscriptionType |= SubCreateReq::NR_Sub_Dropped;
+ ndbout_c("copying dropped sub: %u", subPtr.i);
+ }
+ Ptr<Table> tabPtr;
+ c_tablePool.getPtr(tabPtr, subPtr.p->m_table_ptrI);
+ if (tabPtr.p->m_state != Table::DROPPED)
+ {
if (!ndbd_suma_dictlock(getNodeInfo(refToNode(c_restart.m_ref)).m_version))
{
jam();
@@ -4420,16 +4470,17 @@ Suma::sendSubCreateReq(Signal* signal, P
* Thank you Ms. Fortuna
*/
}
-
+
sendSignal(c_restart.m_ref, GSN_SUB_CREATE_REQ, signal,
SubCreateReq::SignalLength, JBB);
}
else
{
- /**
- * No need to copy DROPPED subscription...
- * but this introduces a real time break
- */
+ jam();
+ ndbout_c("not copying sub %u with dropped table: %u/%u",
+ subPtr.i,
+ tabPtr.p->m_tableId, tabPtr.i);
+
c_restart.m_waiting_on_self = 1;
SubCreateConf * conf = (SubCreateConf *)signal->getDataPtrSend();
conf->senderRef = reference();
@@ -4482,25 +4533,25 @@ Suma::execSUB_CREATE_CONF(Signal* signal
abort_start_me(signal, subPtr, true);
return;
}
-
- Ptr<Subscriber> ptr;
-
+
Ptr<Table> tabPtr;
c_tablePool.getPtr(tabPtr, subPtr.p->m_table_ptrI);
- bool dropped =
- subPtr.p->m_state == Subscription::DROPPED ||
- tabPtr.p->m_state == Table::DROPPED;
-
- if (! dropped)
+
+ Ptr<Subscriber> ptr;
+ if (tabPtr.p->m_state != Table::DROPPED)
{
+ jam();
LocalDLList<Subscriber> list(c_subscriberPool, subPtr.p->m_subscribers);
list.first(ptr);
}
else
{
+ jam();
ptr.setNull();
+ ndbout_c("not copying subscribers on sub: %u with dropped table %u/%u",
+ subPtr.i, tabPtr.p->m_tableId, tabPtr.i);
}
-
+
copySubscriber(signal, subPtr, ptr);
}
=== modified file 'storage/ndb/src/kernel/blocks/suma/Suma.hpp'
--- a/storage/ndb/src/kernel/blocks/suma/Suma.hpp 2008-05-21 08:45:04 +0000
+++ b/storage/ndb/src/kernel/blocks/suma/Suma.hpp 2008-06-16 06:50:10 +0000
@@ -228,13 +228,13 @@ public:
enum Options {
REPORT_ALL = 0x1,
- REPORT_SUBSCRIBE = 0x2
+ REPORT_SUBSCRIBE = 0x2,
+ MARKED_DROPPED = 0x4
};
enum State {
UNDEFINED,
DEFINED,
- DROPPED,
DEFINING
};
| Thread |
|---|
| • bzr commit into mysql-5.1-telco-6.3 branch (jonas:2634) | jonas | 16 Jun |