List:Commits« Previous MessageNext Message »
From:Pekka Nousiainen Date:August 27 2008 10:00am
Subject:bzr commit into mysql-5.1 branch (pekka:2738) WL#4391
View as plain text  
#At file:///export/space/pekka/ndb/version/my51-wl4391/

 2738 Pekka Nousiainen	2008-08-27
      wl#4391 19.diff
      SUMA (subscription triggers).
modified:
  storage/ndb/src/kernel/blocks/LocalProxy.cpp
  storage/ndb/src/kernel/blocks/LocalProxy.hpp
  storage/ndb/src/kernel/blocks/dbtup/Dbtup.hpp
  storage/ndb/src/kernel/blocks/dbtup/DbtupTrigger.cpp
  storage/ndb/src/kernel/blocks/suma/Suma.cpp

=== modified file 'storage/ndb/src/kernel/blocks/LocalProxy.cpp'
--- a/storage/ndb/src/kernel/blocks/LocalProxy.cpp	2008-08-27 07:54:42 +0000
+++ b/storage/ndb/src/kernel/blocks/LocalProxy.cpp	2008-08-27 08:00:52 +0000
@@ -745,6 +745,8 @@ LocalProxy::sendTIME_SIGNAL(Signal* sign
 void
 LocalProxy::execCREATE_TRIG_IMPL_REQ(Signal* signal)
 {
+  if (ssQueue<Ss_CREATE_TRIG_IMPL_REQ>(signal))
+    return;
   const CreateTrigImplReq* req = (const CreateTrigImplReq*)signal->getDataPtr();
   Ss_CREATE_TRIG_IMPL_REQ& ss = ssSeize<Ss_CREATE_TRIG_IMPL_REQ>();
   ss.m_req = *req;
@@ -822,6 +824,8 @@ LocalProxy::sendCREATE_TRIG_IMPL_CONF(Si
 void
 LocalProxy::execDROP_TRIG_IMPL_REQ(Signal* signal)
 {
+  if (ssQueue<Ss_DROP_TRIG_IMPL_REQ>(signal))
+    return;
   const DropTrigImplReq* req = (const DropTrigImplReq*)signal->getDataPtr();
   Ss_DROP_TRIG_IMPL_REQ& ss = ssSeize<Ss_DROP_TRIG_IMPL_REQ>();
   ss.m_req = *req;

=== modified file 'storage/ndb/src/kernel/blocks/LocalProxy.hpp'
--- a/storage/ndb/src/kernel/blocks/LocalProxy.hpp	2008-08-27 07:54:42 +0000
+++ b/storage/ndb/src/kernel/blocks/LocalProxy.hpp	2008-08-27 08:00:52 +0000
@@ -128,6 +128,10 @@ protected:
   template <class Ss>
   struct SsPool {
     Ss m_pool[Ss::poolSize];
+    Uint32 m_usage;
+    SsPool() {
+      m_usage = 0;
+    }
   };
 
   Uint32 c_ssIdSeq;
@@ -148,6 +152,7 @@ protected:
   Ss& ssSeize(Uint32 ssId) {
     SsPool<Ss>& sp = Ss::pool(this);
     ndbrequire(ssId != 0);
+    ndbrequire(sp.m_usage < Ss::poolSize);
     Ss* ssptr = 0;
     for (Uint32 i = 0; i < Ss::poolSize; i++) {
       Ss& ss = sp.m_pool[i];
@@ -160,6 +165,7 @@ protected:
       }
     }
     ndbrequire(ssptr != 0);
+    sp.m_usage++;
     return *ssptr;
   }
 
@@ -182,6 +188,7 @@ protected:
   template <class Ss>
   void ssRelease(Uint32 ssId) {
     SsPool<Ss>& sp = Ss::pool(this);
+    ndbrequire(sp.m_usage != 0);
     ndbrequire(ssId != 0);
     Ss* ssptr = 0;
     for (Uint32 i = 0; i < Ss::poolSize; i++) {
@@ -193,6 +200,7 @@ protected:
       }
     }
     ndbrequire(ssptr != 0);
+    sp.m_usage--;
   }
 
   template <class Ss>
@@ -200,6 +208,22 @@ protected:
     ssRelease<Ss>(ss.m_ssId);
   }
 
+  /*
+   * In some cases handle pool full via delayed signal.
+   * wl4391_todo maybe use CONTINUEB and guard against infinite loop.
+   */
+  template <class Ss>
+  bool ssQueue(Signal* signal) {
+    SsPool<Ss>& sp = Ss::pool(this);
+    if (sp.m_usage < Ss::poolSize)
+      return false;
+    ndbrequire(signal->getNoOfSections() == 0);
+    GlobalSignalNumber gsn = signal->header.theVerId_signalNumber & 0xFFFF;
+    sendSignalWithDelay(reference(), gsn,
+                        signal, 10, signal->length());
+    return true;
+  }
+
   // system info
 
   Uint32 c_typeOfStart;
@@ -405,7 +429,7 @@ protected:
       m_sendREQ = &LocalProxy::sendCREATE_TRIG_IMPL_REQ;
       m_sendCONF = &LocalProxy::sendCREATE_TRIG_IMPL_CONF;
     }
-    enum { poolSize = 1 };
+    enum { poolSize = 3 };
     static SsPool<Ss_CREATE_TRIG_IMPL_REQ>& pool(LocalProxy* proxy) {
       return proxy->c_ss_CREATE_TRIG_IMPL_REQ;
     }
@@ -424,7 +448,7 @@ protected:
       m_sendREQ = &LocalProxy::sendDROP_TRIG_IMPL_REQ;
       m_sendCONF = &LocalProxy::sendDROP_TRIG_IMPL_CONF;
     }
-    enum { poolSize = 1 };
+    enum { poolSize = 3 };
     static SsPool<Ss_DROP_TRIG_IMPL_REQ>& pool(LocalProxy* proxy) {
       return proxy->c_ss_DROP_TRIG_IMPL_REQ;
     }

=== modified file 'storage/ndb/src/kernel/blocks/dbtup/Dbtup.hpp'
--- a/storage/ndb/src/kernel/blocks/dbtup/Dbtup.hpp	2008-08-07 11:52:50 +0000
+++ b/storage/ndb/src/kernel/blocks/dbtup/Dbtup.hpp	2008-08-27 08:00:52 +0000
@@ -897,9 +897,9 @@ struct TupTriggerData {
   TriggerActionTime::Value triggerActionTime;
   TriggerEvent::Value triggerEvent;
   /**
-   * Receiver block
+   * Receiver block reference
    */
-  Uint32 m_receiverBlock;
+  Uint32 m_receiverRef;
   
   /**
    * Monitor all replicas, i.e. trigger will fire on all nodes where tuples

=== modified file 'storage/ndb/src/kernel/blocks/dbtup/DbtupTrigger.cpp'
--- a/storage/ndb/src/kernel/blocks/dbtup/DbtupTrigger.cpp	2008-06-05 20:34:20 +0000
+++ b/storage/ndb/src/kernel/blocks/dbtup/DbtupTrigger.cpp	2008-08-27 08:00:52 +0000
@@ -318,7 +318,7 @@ Dbtup::createTrigger(Tablerec* table, co
 
     tptr.p->monitorAllAttributes = TriggerInfo::getMonitorAllAttributes(tinfo);
     tptr.p->monitorReplicas = TriggerInfo::getMonitorReplicas(tinfo);
-    tptr.p->m_receiverBlock = refToBlock(req->receiverRef);
+    tptr.p->m_receiverRef = req->receiverRef;
 
     if (tptr.p->monitorAllAttributes)
     {
@@ -420,7 +420,7 @@ Dbtup::dropTrigger(Tablerec* table, cons
       if (ptr.p->triggerId == triggerId)
       {
 	if(ttype==TriggerType::SUBSCRIPTION &&
-	   sender != ptr.p->m_receiverBlock)
+	   sender != refToBlock(ptr.p->m_receiverRef))
 	{
 	  /**
 	   * You can only drop your own triggers for subscription triggers.
@@ -874,7 +874,7 @@ void Dbtup::executeTrigger(KeyReqStruct 
       }
   */
   Signal* signal= req_struct->signal;
-  BlockReference ref = trigPtr->m_receiverBlock;
+  BlockReference ref = trigPtr->m_receiverRef;
   Uint32* const keyBuffer = &cinBuffer[0];
   Uint32* const afterBuffer = &coutBuffer[0];
   Uint32* const beforeBuffer = &clogMemBuffer[0];
@@ -886,7 +886,7 @@ void Dbtup::executeTrigger(KeyReqStruct 
   ptrCheckGuard(regFragPtr, cnoOfFragrec, fragrecord);
   Fragrecord::FragState fragstatus = regFragPtr.p->fragStatus;
 
-  if (ref == BACKUP) {
+  if (refToMain(ref) == BACKUP) {
     jam();
     /*
     In order for the implementation of BACKUP to work even when changing
@@ -953,8 +953,9 @@ void Dbtup::executeTrigger(KeyReqStruct 
   case (TriggerType::SUBSCRIPTION_BEFORE):
     jam();
     // Since only backup uses subscription triggers we send to backup directly for now
-    ref = trigPtr->m_receiverBlock;
-    executeDirect = true;
+    ref = trigPtr->m_receiverRef;
+    // executeDirect = !isNdbMtLqh() || (refToMain(ref) != SUMA);
+    executeDirect = refToInstance(ref) == instance();
     break;
   case (TriggerType::READ_ONLY_CONSTRAINT):
     terrorCode = ZREAD_ONLY_CONSTRAINT_VIOLATION;
@@ -1174,7 +1175,7 @@ bool Dbtup::readTriggerInfo(TupTriggerDa
     req_struct->m_tuple_ptr= save;
     ndbrequire(ret != -1);
     noBeforeWords = ret;
-    if (trigPtr->m_receiverBlock != SUMA &&
+    if (refToMain(trigPtr->m_receiverRef) != SUMA &&
         (noAfterWords == noBeforeWords) &&
         (memcmp(afterBuffer, beforeBuffer, noAfterWords << 2) == 0)) {
 //--------------------------------------------------------------------
@@ -1209,7 +1210,7 @@ void Dbtup::sendTrigAttrInfo(Signal* sig
                      sigLen);
     if (executeDirect) {
       jam();
-      EXECUTE_DIRECT(receiverReference, 
+      EXECUTE_DIRECT(refToMain(receiverReference), 
                      GSN_TRIG_ATTRINFO,
                      signal,
 		     TrigAttrInfo::StaticLength + sigLen);
@@ -1275,23 +1276,25 @@ void Dbtup::sendFireTrigOrd(Signal* sign
     break;
   case (TriggerType::SUBSCRIPTION_BEFORE): // Only Suma
     jam();
-    // Since only backup uses subscription triggers we 
-    // send to backup directly for now
     fireTrigOrd->setGCI(req_struct->gci_hi);
     fireTrigOrd->setHashValue(req_struct->hash_value);
     fireTrigOrd->m_any_value = regOperPtr->m_any_value;
     fireTrigOrd->m_gci_lo = req_struct->gci_lo;
-    EXECUTE_DIRECT(trigPtr->m_receiverBlock,
-                   GSN_FIRE_TRIG_ORD,
-                   signal,
-		   FireTrigOrd::SignalLengthSuma);
+    if (refToInstance(trigPtr->m_receiverRef) == instance())
+      EXECUTE_DIRECT(refToMain(trigPtr->m_receiverRef),
+                     GSN_FIRE_TRIG_ORD,
+                     signal,
+                     FireTrigOrd::SignalLengthSuma);
+    else
+      sendSignal(trigPtr->m_receiverRef, GSN_FIRE_TRIG_ORD,
+                 signal, FireTrigOrd::SignalLengthSuma, JBB);
     break;
   case (TriggerType::SUBSCRIPTION):
     jam();
     // Since only backup uses subscription triggers we 
     // send to backup directly for now
     fireTrigOrd->setGCI(req_struct->gci_hi);
-    EXECUTE_DIRECT(trigPtr->m_receiverBlock,
+    EXECUTE_DIRECT(refToMain(trigPtr->m_receiverRef),
                    GSN_FIRE_TRIG_ORD,
                    signal,
 		   FireTrigOrd::SignalWithGCILength);

=== modified file 'storage/ndb/src/kernel/blocks/suma/Suma.cpp'
--- a/storage/ndb/src/kernel/blocks/suma/Suma.cpp	2008-08-27 07:54:42 +0000
+++ b/storage/ndb/src/kernel/blocks/suma/Suma.cpp	2008-08-27 08:00:52 +0000
@@ -233,7 +233,9 @@ Suma::execSTTOR(Signal* signal) {
   if(m_startphase == 3)
   {
     jam();
-    ndbrequire((m_tup = (Dbtup*)globalData.getBlock(DBTUP)) != 0);
+    // wl4391_todo something
+    Uint32 instanceNo = !isNdbMtLqh() ? 0 : 1;
+    ndbrequire((m_tup = (Dbtup*)globalData.getBlock(DBTUP, instanceNo)) != 0);
   }
 
   if(m_startphase == 5)

Thread
bzr commit into mysql-5.1 branch (pekka:2738) WL#4391Pekka Nousiainen27 Aug