#At file:///home/jonas/src/telco-6.4/ based on revid:jonas@stripped
2964 Jonas Oreland 2009-08-24
ndb - bug#46782
crash in SUMA.
For each global checkpoint, schedule each thread running an LQH to prevent
uneven load from causing SUMA to overflow the circular buffer
modified:
storage/ndb/include/kernel/GlobalSignalNumbers.h
storage/ndb/src/common/debugger/signaldata/SignalNames.cpp
storage/ndb/src/kernel/blocks/dbdih/Dbdih.hpp
storage/ndb/src/kernel/blocks/dbdih/DbdihMain.cpp
storage/ndb/src/kernel/vm/SimulatedBlock.cpp
storage/ndb/src/kernel/vm/SimulatedBlock.hpp
storage/ndb/src/kernel/vm/mt.cpp
storage/ndb/src/kernel/vm/mt.hpp
=== modified file 'storage/ndb/include/kernel/GlobalSignalNumbers.h'
--- a/storage/ndb/include/kernel/GlobalSignalNumbers.h 2009-05-27 15:21:45 +0000
+++ b/storage/ndb/include/kernel/GlobalSignalNumbers.h 2009-08-24 08:18:43 +0000
@@ -816,8 +816,9 @@ extern const GlobalSignalNumber NO_OF_SI
#define GSN_ALTER_TABLE_REP 606
#define GSN_API_BROADCAST_REP 607
-#define GSN_608
-#define GSN_609
+
+#define GSN_SYNC_THREAD_REQ 608
+#define GSN_SYNC_THREAD_CONF 609
#define GSN_610
#define GSN_611
=== modified file 'storage/ndb/src/common/debugger/signaldata/SignalNames.cpp'
--- a/storage/ndb/src/common/debugger/signaldata/SignalNames.cpp 2009-08-05 10:48:56 +0000
+++ b/storage/ndb/src/common/debugger/signaldata/SignalNames.cpp 2009-08-24 08:18:43 +0000
@@ -744,5 +744,8 @@ const GsnName SignalNames [] = {
,{ GSN_CREATE_HASH_MAP_REQ, "CREATE_HASH_MAP_REQ" }
,{ GSN_CREATE_HASH_MAP_REF, "CREATE_HASH_MAP_REF" }
,{ GSN_CREATE_HASH_MAP_CONF, "CREATE_HASH_MAP_CONF" }
+
+ ,{ GSN_SYNC_THREAD_REQ, "SYNC_THREAD_REQ" }
+ ,{ GSN_SYNC_THREAD_CONF, "SYNC_THREAD_CONF" }
};
const unsigned short NO_OF_SIGNAL_NAMES = sizeof(SignalNames)/sizeof(GsnName);
=== modified file 'storage/ndb/src/kernel/blocks/dbdih/Dbdih.hpp'
--- a/storage/ndb/src/kernel/blocks/dbdih/Dbdih.hpp 2009-08-18 06:59:17 +0000
+++ b/storage/ndb/src/kernel/blocks/dbdih/Dbdih.hpp 2009-08-24 08:18:43 +0000
@@ -755,6 +755,7 @@ private:
void execDIH_SCAN_TAB_COMPLETE_REP(Signal*);
void execGCP_SAVEREF(Signal *);
void execGCP_TCFINISHED(Signal *);
+ void execGCP_TCFINISHED_sync_conf(Signal* signal, Uint32 cb, Uint32 err);
void execREAD_NODESCONF(Signal *);
void execNDB_STTOR(Signal *);
void execDICTSTARTCONF(Signal *);
=== modified file 'storage/ndb/src/kernel/blocks/dbdih/DbdihMain.cpp'
--- a/storage/ndb/src/kernel/blocks/dbdih/DbdihMain.cpp 2009-08-18 13:44:17 +0000
+++ b/storage/ndb/src/kernel/blocks/dbdih/DbdihMain.cpp 2009-08-24 08:18:43 +0000
@@ -9437,8 +9437,25 @@ void Dbdih::execGCP_TCFINISHED(Signal* s
#endif
ndbrequire(m_micro_gcp.m_state == MicroGcp::M_GCP_COMMIT);
+
+ /**
+ * Make sure that each LQH gets scheduled, so that they don't get out of sync
+ * with respect to SUB_GCP_COMPLETE_REP
+ */
+ Callback cb;
+ cb.m_callbackData = 10;
+ cb.m_callbackFunction = safe_cast(&Dbdih::execGCP_TCFINISHED_sync_conf);
+ Uint32 blocks[] = { DBLQH, 0 };
+ synchronize_threads_for_blocks(signal, blocks, cb);
+}//Dbdih::execGCP_TCFINISHED()
+
+void
+Dbdih::execGCP_TCFINISHED_sync_conf(Signal* signal, Uint32 cb, Uint32 err)
+{
+ ndbrequire(m_micro_gcp.m_state == MicroGcp::M_GCP_COMMIT);
+
m_micro_gcp.m_state = MicroGcp::M_GCP_COMMITTED;
- retRef = m_micro_gcp.m_master_ref;
+ Uint32 retRef = m_micro_gcp.m_master_ref;
GCPNodeFinished* conf2 = (GCPNodeFinished*)signal->getDataPtrSend();
conf2->nodeId = cownNodeId;
@@ -9447,7 +9464,7 @@ void Dbdih::execGCP_TCFINISHED(Signal* s
conf2->gci_lo = (Uint32)(m_micro_gcp.m_old_gci & 0xFFFFFFFF);
sendSignal(retRef, GSN_GCP_NODEFINISH, signal,
GCPNodeFinished::SignalLength, JBB);
-}//Dbdih::execGCP_TCFINISHED()
+}
void
Dbdih::execSUB_GCP_COMPLETE_REP(Signal* signal)
=== modified file 'storage/ndb/src/kernel/vm/SimulatedBlock.cpp'
--- a/storage/ndb/src/kernel/vm/SimulatedBlock.cpp 2009-05-27 15:21:45 +0000
+++ b/storage/ndb/src/kernel/vm/SimulatedBlock.cpp 2009-08-24 08:18:43 +0000
@@ -158,6 +158,10 @@ SimulatedBlock::initCommon()
count = 5;
this->getParam("ActiveCounters", &count);
c_counterMgr.setSize(count);
+
+ count = 5;
+ this->getParam("ActiveThreadSync", &count);
+ c_syncThreadPool.setSize(count);
}
SimulatedBlock::~SimulatedBlock()
@@ -208,6 +212,8 @@ SimulatedBlock::installSimulatedBlockFun
a[GSN_API_START_REP] = &SimulatedBlock::execAPI_START_REP;
a[GSN_SEND_PACKED] = &SimulatedBlock::execSEND_PACKED;
a[GSN_CALLBACK_CONF] = &SimulatedBlock::execCALLBACK_CONF;
+ a[GSN_SYNC_THREAD_REQ] = &SimulatedBlock::execSYNC_THREAD_REQ;
+ a[GSN_SYNC_THREAD_CONF] = &SimulatedBlock::execSYNC_THREAD_CONF;
}
void
@@ -1918,7 +1924,6 @@ SimulatedBlock::execCALLBACK_CONF(Signal
Uint32 senderRef = conf->senderRef;
ndbrequire(m_callbackTableAddr != 0);
- const CallbackTable& ct = *m_callbackTableAddr;
const CallbackEntry& ce = getCallbackEntry(conf->callbackIndex);
CallbackFunction function = ce.m_function;
@@ -3339,3 +3344,67 @@ SimulatedBlock::debugOutTag(char *buf, i
return buf;
}
#endif
+
+void
+SimulatedBlock::synchronize_threads_for_blocks(Signal * signal,
+ const Uint32 blocks[],
+ const Callback & cb,
+ JobBufferLevel prio)
+{
+#ifndef NDBD_MULTITHREADED
+ Callback copy = cb;
+ execute(signal, copy, 0);
+#else
+ ljam();
+ Uint32 ref[32]; // max threads
+ Uint32 cnt = mt_get_thread_references_for_blocks(blocks, getThreadId(),
+ ref, NDB_ARRAY_SIZE(ref));
+ if (cnt == 0)
+ {
+ ljam();
+ Callback copy = cb;
+ execute(signal, copy, 0);
+ return;
+ }
+
+ Ptr<SyncThreadRecord> ptr;
+ ndbrequire(c_syncThreadPool.seize(ptr));
+ ptr.p->m_cnt = cnt;
+ ptr.p->m_callback = cb;
+
+ signal->theData[0] = reference();
+ signal->theData[1] = ptr.i;
+ signal->theData[2] = Uint32(prio);
+ for (Uint32 i = 0; i<cnt; i++)
+ {
+ sendSignal(ref[i], GSN_SYNC_THREAD_REQ, signal, 3, prio);
+ }
+#endif
+}
+
+void
+SimulatedBlock::execSYNC_THREAD_REQ(Signal* signal)
+{
+ ljamEntry();
+ Uint32 ref = signal->theData[0];
+ Uint32 prio = signal->theData[2];
+ sendSignal(ref, GSN_SYNC_THREAD_CONF, signal, signal->getLength(),
+ JobBufferLevel(prio));
+}
+
+void
+SimulatedBlock::execSYNC_THREAD_CONF(Signal* signal)
+{
+ ljamEntry();
+ Ptr<SyncThreadRecord> ptr;
+ c_syncThreadPool.getPtr(ptr, signal->theData[1]);
+ if (ptr.p->m_cnt == 1)
+ {
+ ljam();
+ Callback copy = ptr.p->m_callback;
+ c_syncThreadPool.release(ptr);
+ execute(signal, copy, 0);
+ return;
+ }
+ ptr.p->m_cnt --;
+}
=== modified file 'storage/ndb/src/kernel/vm/SimulatedBlock.hpp'
--- a/storage/ndb/src/kernel/vm/SimulatedBlock.hpp 2009-05-27 15:21:45 +0000
+++ b/storage/ndb/src/kernel/vm/SimulatedBlock.hpp 2009-08-24 08:18:43 +0000
@@ -141,6 +141,14 @@ protected:
void initCommon();
public:
+ typedef void (SimulatedBlock::* CallbackFunction)(class Signal*,
+ Uint32 callbackData,
+ Uint32 returnCode);
+ struct Callback {
+ CallbackFunction m_callbackFunction;
+ Uint32 m_callbackData;
+ };
+
/**
*
*/
@@ -189,15 +197,26 @@ public:
static Uint32 getInstanceKey(Uint32 tabId, Uint32 fragId);
static Uint32 getInstanceFromKey(Uint32 instanceKey); // local use only
-public:
- typedef void (SimulatedBlock::* CallbackFunction)(class Signal*,
- Uint32 callbackData,
- Uint32 returnCode);
- struct Callback {
- CallbackFunction m_callbackFunction;
- Uint32 m_callbackData;
- };
+ /**
+ * This method will make sure that when the callback is called, each
+ * thread running an instance of any of the blocks in blocks[]
+ * will have executed a signal
+ */
+ void synchronize_threads_for_blocks(Signal*, const Uint32 blocks[],
+ const Callback&, JobBufferLevel = JBB);
+private:
+ struct SyncThreadRecord
+ {
+ Callback m_callback;
+ Uint32 m_cnt;
+ Uint32 nextPool;
+ };
+ ArrayPool<SyncThreadRecord> c_syncThreadPool;
+ void execSYNC_THREAD_REQ(Signal*);
+ void execSYNC_THREAD_CONF(Signal*);
+
+public:
virtual const char* get_filename(Uint32 fd) const { return "";}
protected:
static Callback TheEmptyCallback;
=== modified file 'storage/ndb/src/kernel/vm/mt.cpp'
--- a/storage/ndb/src/kernel/vm/mt.cpp 2009-06-06 12:20:07 +0000
+++ b/storage/ndb/src/kernel/vm/mt.cpp 2009-08-24 08:18:43 +0000
@@ -3995,6 +3995,37 @@ lookup_lock(const void * ptr)
return 0;
}
+Uint32
+mt_get_thread_references_for_blocks(const Uint32 blocks[], Uint32 threadId,
+ Uint32 dst[], Uint32 len)
+{
+ Uint32 cnt = 0;
+ Bitmask<(MAX_THREADS+31)/32> mask;
+ mask.set(threadId);
+ for (Uint32 i = 0; blocks[i] != 0; i++)
+ {
+ Uint32 block = blocks[i];
+ /**
+ * Find each thread that has an instance of block
+ */
+ assert(block == blockToMain(block));
+ Uint32 index = block - MIN_BLOCK_NO;
+ for (Uint32 instance = 0; instance < MAX_BLOCK_INSTANCES; instance++)
+ {
+ Uint32 thr_no = thr_map[index][instance].thr_no;
+ if (thr_no == thr_map_entry::NULL_THR_NO)
+ break;
+
+ if (mask.get(thr_no))
+ continue;
+
+ mask.set(thr_no);
+ require(cnt < len);
+ dst[cnt++] = numberToRef(block, instance, 0);
+ }
+ }
+ return cnt;
+}
/**
* Global data
=== modified file 'storage/ndb/src/kernel/vm/mt.hpp'
--- a/storage/ndb/src/kernel/vm/mt.hpp 2008-11-15 15:43:59 +0000
+++ b/storage/ndb/src/kernel/vm/mt.hpp 2009-08-24 08:18:43 +0000
@@ -56,4 +56,16 @@ SendStatus mt_send_remote(Uint32 self, c
void mt_section_lock();
void mt_section_unlock();
+/**
+ * Get list of BlockReferences so that
+ * each thread holding an instance of any block in blocks[] gets "covered"
+ * (excluding ownThreadId)
+ *
+ * e.g. calling it with DBLQH will return a block-reference to *a* block
+ * in each of the threads that has a DBLQH instance
+ */
+Uint32 mt_get_thread_references_for_blocks(const Uint32 blocks[],
+ Uint32 ownThreadId,
+ Uint32 dst[], Uint32 len);
+
#endif
Attachment: [text/bzr-bundle] bzr/jonas@mysql.com-20090824081843-h3uuf8qo3s4c6yyr.bundle
| Thread |
|---|
| • bzr commit into mysql-5.1-telco-7.0 branch (jonas:2964) Bug#46782 | Jonas Oreland | 24 Aug |