List:Commits« Previous MessageNext Message »
From:Jonas Oreland Date:August 24 2009 8:18am
Subject:bzr commit into mysql-5.1-telco-7.0 branch (jonas:2964) Bug#46782
View as plain text  
#At file:///home/jonas/src/telco-6.4/ based on revid:jonas@stripped

 2964 Jonas Oreland	2009-08-24
      ndb - bug#46782
        crash in SUMA.
        For each global checkpoint, schedule each thread running an LQH to prevent
          uneven load causing SUMA to overflow circular buffer

    modified:
      storage/ndb/include/kernel/GlobalSignalNumbers.h
      storage/ndb/src/common/debugger/signaldata/SignalNames.cpp
      storage/ndb/src/kernel/blocks/dbdih/Dbdih.hpp
      storage/ndb/src/kernel/blocks/dbdih/DbdihMain.cpp
      storage/ndb/src/kernel/vm/SimulatedBlock.cpp
      storage/ndb/src/kernel/vm/SimulatedBlock.hpp
      storage/ndb/src/kernel/vm/mt.cpp
      storage/ndb/src/kernel/vm/mt.hpp
=== modified file 'storage/ndb/include/kernel/GlobalSignalNumbers.h'
--- a/storage/ndb/include/kernel/GlobalSignalNumbers.h	2009-05-27 15:21:45 +0000
+++ b/storage/ndb/include/kernel/GlobalSignalNumbers.h	2009-08-24 08:18:43 +0000
@@ -816,8 +816,9 @@ extern const GlobalSignalNumber NO_OF_SI
 
 #define GSN_ALTER_TABLE_REP             606
 #define GSN_API_BROADCAST_REP           607
-#define GSN_608
-#define GSN_609
+
+#define GSN_SYNC_THREAD_REQ             608
+#define GSN_SYNC_THREAD_CONF            609
 #define GSN_610
 #define GSN_611
 

=== modified file 'storage/ndb/src/common/debugger/signaldata/SignalNames.cpp'
--- a/storage/ndb/src/common/debugger/signaldata/SignalNames.cpp	2009-08-05 10:48:56 +0000
+++ b/storage/ndb/src/common/debugger/signaldata/SignalNames.cpp	2009-08-24 08:18:43 +0000
@@ -744,5 +744,8 @@ const GsnName SignalNames [] = {
   ,{ GSN_CREATE_HASH_MAP_REQ, "CREATE_HASH_MAP_REQ" }
   ,{ GSN_CREATE_HASH_MAP_REF, "CREATE_HASH_MAP_REF" }
   ,{ GSN_CREATE_HASH_MAP_CONF, "CREATE_HASH_MAP_CONF" }
+
+  ,{ GSN_SYNC_THREAD_REQ, "SYNC_THREAD_REQ" }
+  ,{ GSN_SYNC_THREAD_CONF, "SYNC_THREAD_CONF" }
 };
 const unsigned short NO_OF_SIGNAL_NAMES = sizeof(SignalNames)/sizeof(GsnName);

=== modified file 'storage/ndb/src/kernel/blocks/dbdih/Dbdih.hpp'
--- a/storage/ndb/src/kernel/blocks/dbdih/Dbdih.hpp	2009-08-18 06:59:17 +0000
+++ b/storage/ndb/src/kernel/blocks/dbdih/Dbdih.hpp	2009-08-24 08:18:43 +0000
@@ -755,6 +755,7 @@ private:
   void execDIH_SCAN_TAB_COMPLETE_REP(Signal*);
   void execGCP_SAVEREF(Signal *);
   void execGCP_TCFINISHED(Signal *);
+  void execGCP_TCFINISHED_sync_conf(Signal* signal, Uint32 cb, Uint32 err);
   void execREAD_NODESCONF(Signal *);
   void execNDB_STTOR(Signal *);
   void execDICTSTARTCONF(Signal *);

=== modified file 'storage/ndb/src/kernel/blocks/dbdih/DbdihMain.cpp'
--- a/storage/ndb/src/kernel/blocks/dbdih/DbdihMain.cpp	2009-08-18 13:44:17 +0000
+++ b/storage/ndb/src/kernel/blocks/dbdih/DbdihMain.cpp	2009-08-24 08:18:43 +0000
@@ -9437,8 +9437,25 @@ void Dbdih::execGCP_TCFINISHED(Signal* s
 #endif
 
   ndbrequire(m_micro_gcp.m_state == MicroGcp::M_GCP_COMMIT);
+
+  /**
+   * Make sure that each LQH gets scheduled, so that they don't get out of sync
+   * with respect to SUB_GCP_COMPLETE_REP
+   */
+  Callback cb;
+  cb.m_callbackData = 10;
+  cb.m_callbackFunction = safe_cast(&Dbdih::execGCP_TCFINISHED_sync_conf);
+  Uint32 blocks[] = { DBLQH, 0 };
+  synchronize_threads_for_blocks(signal, blocks, cb);
+}//Dbdih::execGCP_TCFINISHED()
+
+void
+Dbdih::execGCP_TCFINISHED_sync_conf(Signal* signal, Uint32 cb, Uint32 err)
+{
+  ndbrequire(m_micro_gcp.m_state == MicroGcp::M_GCP_COMMIT);
+
   m_micro_gcp.m_state = MicroGcp::M_GCP_COMMITTED;
-  retRef = m_micro_gcp.m_master_ref;
+  Uint32 retRef = m_micro_gcp.m_master_ref;
 
   GCPNodeFinished* conf2 = (GCPNodeFinished*)signal->getDataPtrSend();
   conf2->nodeId = cownNodeId;
@@ -9447,7 +9464,7 @@ void Dbdih::execGCP_TCFINISHED(Signal* s
   conf2->gci_lo = (Uint32)(m_micro_gcp.m_old_gci & 0xFFFFFFFF);
   sendSignal(retRef, GSN_GCP_NODEFINISH, signal, 
              GCPNodeFinished::SignalLength, JBB);
-}//Dbdih::execGCP_TCFINISHED()
+}
 
 void
 Dbdih::execSUB_GCP_COMPLETE_REP(Signal* signal)

=== modified file 'storage/ndb/src/kernel/vm/SimulatedBlock.cpp'
--- a/storage/ndb/src/kernel/vm/SimulatedBlock.cpp	2009-05-27 15:21:45 +0000
+++ b/storage/ndb/src/kernel/vm/SimulatedBlock.cpp	2009-08-24 08:18:43 +0000
@@ -158,6 +158,10 @@ SimulatedBlock::initCommon()
   count = 5;
   this->getParam("ActiveCounters", &count);
   c_counterMgr.setSize(count);
+
+  count = 5;
+  this->getParam("ActiveThreadSync", &count);
+  c_syncThreadPool.setSize(count);
 }
 
 SimulatedBlock::~SimulatedBlock()
@@ -208,6 +212,8 @@ SimulatedBlock::installSimulatedBlockFun
   a[GSN_API_START_REP] = &SimulatedBlock::execAPI_START_REP;
   a[GSN_SEND_PACKED] = &SimulatedBlock::execSEND_PACKED;
   a[GSN_CALLBACK_CONF] = &SimulatedBlock::execCALLBACK_CONF;
+  a[GSN_SYNC_THREAD_REQ] = &SimulatedBlock::execSYNC_THREAD_REQ;
+  a[GSN_SYNC_THREAD_CONF] = &SimulatedBlock::execSYNC_THREAD_CONF;
 }
 
 void
@@ -1918,7 +1924,6 @@ SimulatedBlock::execCALLBACK_CONF(Signal
   Uint32 senderRef = conf->senderRef;
 
   ndbrequire(m_callbackTableAddr != 0);
-  const CallbackTable& ct = *m_callbackTableAddr;
   const CallbackEntry& ce = getCallbackEntry(conf->callbackIndex);
   CallbackFunction function = ce.m_function;
 
@@ -3339,3 +3344,67 @@ SimulatedBlock::debugOutTag(char *buf, i
   return buf;
 }
 #endif
+
+void
+SimulatedBlock::synchronize_threads_for_blocks(Signal * signal,
+                                               const Uint32 blocks[],
+                                               const Callback & cb,
+                                               JobBufferLevel prio)
+{
+#ifndef NDBD_MULTITHREADED
+  Callback copy = cb;
+  execute(signal, copy, 0);
+#else
+  ljam();
+  Uint32 ref[32]; // max threads
+  Uint32 cnt = mt_get_thread_references_for_blocks(blocks, getThreadId(),
+                                                   ref, NDB_ARRAY_SIZE(ref));
+  if (cnt == 0)
+  {
+    ljam();
+    Callback copy = cb;
+    execute(signal, copy, 0);
+    return;
+  }
+
+  Ptr<SyncThreadRecord> ptr;
+  ndbrequire(c_syncThreadPool.seize(ptr));
+  ptr.p->m_cnt = cnt;
+  ptr.p->m_callback = cb;
+
+  signal->theData[0] = reference();
+  signal->theData[1] = ptr.i;
+  signal->theData[2] = Uint32(prio);
+  for (Uint32 i = 0; i<cnt; i++)
+  {
+    sendSignal(ref[i], GSN_SYNC_THREAD_REQ, signal, 3, prio);
+  }
+#endif
+}
+
+void
+SimulatedBlock::execSYNC_THREAD_REQ(Signal* signal)
+{
+  ljamEntry();
+  Uint32 ref = signal->theData[0];
+  Uint32 prio = signal->theData[2];
+  sendSignal(ref, GSN_SYNC_THREAD_CONF, signal, signal->getLength(),
+             JobBufferLevel(prio));
+}
+
+void
+SimulatedBlock::execSYNC_THREAD_CONF(Signal* signal)
+{
+  ljamEntry();
+  Ptr<SyncThreadRecord> ptr;
+  c_syncThreadPool.getPtr(ptr, signal->theData[1]);
+  if (ptr.p->m_cnt == 1)
+  {
+    ljam();
+    Callback copy = ptr.p->m_callback;
+    c_syncThreadPool.release(ptr);
+    execute(signal, copy, 0);
+    return;
+  }
+  ptr.p->m_cnt --;
+}

=== modified file 'storage/ndb/src/kernel/vm/SimulatedBlock.hpp'
--- a/storage/ndb/src/kernel/vm/SimulatedBlock.hpp	2009-05-27 15:21:45 +0000
+++ b/storage/ndb/src/kernel/vm/SimulatedBlock.hpp	2009-08-24 08:18:43 +0000
@@ -141,6 +141,14 @@ protected:
 
   void initCommon();
 public:
+  typedef void (SimulatedBlock::* CallbackFunction)(class Signal*,
+						    Uint32 callbackData,
+						    Uint32 returnCode);
+  struct Callback {
+    CallbackFunction m_callbackFunction;
+    Uint32 m_callbackData;
+  };
+
   /**
    * 
    */
@@ -189,15 +197,26 @@ public:
   static Uint32 getInstanceKey(Uint32 tabId, Uint32 fragId);
   static Uint32 getInstanceFromKey(Uint32 instanceKey); // local use only
 
-public:
-  typedef void (SimulatedBlock::* CallbackFunction)(class Signal*, 
-						    Uint32 callbackData,
-						    Uint32 returnCode);
-  struct Callback {
-    CallbackFunction m_callbackFunction;
-    Uint32 m_callbackData;
-  };
+  /**
+   * This method will make sure that when the callback is called, each
+   *   thread running an instance of any of the blocks in blocks[]
+   *   will have executed a signal
+   */
+  void synchronize_threads_for_blocks(Signal*, const Uint32 blocks[],
+                                      const Callback&, JobBufferLevel = JBB);
   
+private:
+  struct SyncThreadRecord
+  {
+    Callback m_callback;
+    Uint32 m_cnt;
+    Uint32 nextPool;
+  };
+  ArrayPool<SyncThreadRecord> c_syncThreadPool;
+  void execSYNC_THREAD_REQ(Signal*);
+  void execSYNC_THREAD_CONF(Signal*);
+
+public:
   virtual const char* get_filename(Uint32 fd) const { return "";}
 protected:
   static Callback TheEmptyCallback;

=== modified file 'storage/ndb/src/kernel/vm/mt.cpp'
--- a/storage/ndb/src/kernel/vm/mt.cpp	2009-06-06 12:20:07 +0000
+++ b/storage/ndb/src/kernel/vm/mt.cpp	2009-08-24 08:18:43 +0000
@@ -3995,6 +3995,37 @@ lookup_lock(const void * ptr)
   return 0;
 }
 
+Uint32
+mt_get_thread_references_for_blocks(const Uint32 blocks[], Uint32 threadId,
+                                    Uint32 dst[], Uint32 len)
+{
+  Uint32 cnt = 0;
+  Bitmask<(MAX_THREADS+31)/32> mask;
+  mask.set(threadId);
+  for (Uint32 i = 0; blocks[i] != 0; i++)
+  {
+    Uint32 block = blocks[i];
+    /**
+     * Find each thread that has instance of block
+     */
+    assert(block == blockToMain(block));
+    Uint32 index = block - MIN_BLOCK_NO;
+    for (Uint32 instance = 0; instance < MAX_BLOCK_INSTANCES; instance++)
+    {
+      Uint32 thr_no = thr_map[index][instance].thr_no;
+      if (thr_no == thr_map_entry::NULL_THR_NO)
+        break;
+
+      if (mask.get(thr_no))
+        continue;
+
+      mask.set(thr_no);
+      require(cnt < len);
+      dst[cnt++] = numberToRef(block, instance, 0);
+    }
+  }
+  return cnt;
+}
 
 /**
  * Global data

=== modified file 'storage/ndb/src/kernel/vm/mt.hpp'
--- a/storage/ndb/src/kernel/vm/mt.hpp	2008-11-15 15:43:59 +0000
+++ b/storage/ndb/src/kernel/vm/mt.hpp	2009-08-24 08:18:43 +0000
@@ -56,4 +56,16 @@ SendStatus mt_send_remote(Uint32 self, c
 void mt_section_lock();
 void mt_section_unlock();
 
+/**
+ * Get list of BlockReferences so that
+ *   each thread holding an instance of any block in blocks[] gets "covered"
+ *   (excluding ownThreadId)
+ *
+ * e.g. calling it with DBLQH will return a block-reference to *a* block
+ *      in each of the threads that has a DBLQH instance
+ */
+Uint32 mt_get_thread_references_for_blocks(const Uint32 blocks[],
+                                           Uint32 ownThreadId,
+                                           Uint32 dst[], Uint32 len);
+
 #endif


Attachment: [text/bzr-bundle] bzr/jonas@mysql.com-20090824081843-h3uuf8qo3s4c6yyr.bundle
Thread
bzr commit into mysql-5.1-telco-7.0 branch (jonas:2964) Bug#46782Jonas Oreland24 Aug