4818 mikael.ronstrom@stripped 2012-01-23
ndb - add more confusing and obfuscated code to flexAsync
modified:
storage/ndb/test/ndbapi/flexAsynch.cpp
4817 mikael.ronstrom@stripped 2012-01-23
ndb - add last pieces needed for multiple receive threads
modified:
storage/ndb/src/kernel/blocks/trpman.cpp
storage/ndb/src/kernel/vm/TransporterCallback.cpp
storage/ndb/src/kernel/vm/TransporterCallbackKernel.hpp
storage/ndb/src/kernel/vm/mt.cpp
storage/ndb/src/kernel/vm/mt.hpp
4816 jonas oreland 2012-01-23
ndb - assert that extra_sockets is not used by ndbmtd (i.e when using several receiveHandles)
modified:
storage/ndb/src/common/transporter/TransporterRegistry.cpp
4815 jonas oreland 2012-01-23
ndb - allow 12 LDM too
modified:
storage/ndb/src/kernel/vm/mt_thr_config.cpp
storage/ndb/src/mgmsrv/ConfigInfo.cpp
4814 jonas oreland 2012-01-23
ndb - fix CMake don't use "IN ITEMS"
modified:
storage/ndb/src/kernel/vm/CMakeLists.txt
=== modified file 'storage/ndb/src/common/transporter/TransporterRegistry.cpp'
--- a/storage/ndb/src/common/transporter/TransporterRegistry.cpp 2012-01-20 07:41:48 +0000
+++ b/storage/ndb/src/common/transporter/TransporterRegistry.cpp 2012-01-23 19:53:55 +0000
@@ -1242,6 +1242,7 @@ TransporterRegistry::poll_TCP(Uint32 tim
if (extra_socket && recvdata.m_transporters.get(0))
{
const NDB_SOCKET_TYPE socket = m_extra_wakeup_sockets[0];
+ assert(&recvdata == receiveHandle); // not used by ndbmtd...
// Poll the wakup-socket for read
recvdata.m_socket_poller.add(socket, true, false, false);
@@ -1317,6 +1318,7 @@ TransporterRegistry::performReceive(Tran
if (recvdata.m_has_data_transporters.get(0))
{
assert(recvdata.m_transporters.get(0));
+ assert(&recvdata == receiveHandle); // not used by ndbmtd
recvdata.m_has_data_transporters.clear(Uint32(0));
consume_extra_sockets();
}
@@ -1619,7 +1621,7 @@ run_start_clients_C(void * me)
}
/**
- * This method is used to initiate connection, called from the CMVMI blockx.
+ * This method is used to initiate connection, called from the TRPMAN block.
*
* This works asynchronously, no actions are taken directly in the calling
* thread.
@@ -1652,7 +1654,7 @@ TransporterRegistry::do_connect(NodeId n
}
/**
- * This method is used to initiate disconnect from CMVMI. It is also called
+ * This method is used to initiate disconnect from TRPMAN. It is also called
* from the TCP transporter in case of an I/O error on the socket.
*
* This works asynchronously, similar to do_connect().
=== modified file 'storage/ndb/src/kernel/blocks/trpman.cpp'
--- a/storage/ndb/src/kernel/blocks/trpman.cpp 2012-01-16 13:56:30 +0000
+++ b/storage/ndb/src/kernel/blocks/trpman.cpp 2012-01-23 20:23:08 +0000
@@ -23,6 +23,8 @@
#include <signaldata/RouteOrd.hpp>
#include <signaldata/DumpStateOrd.hpp>
+#include <mt.hpp>
+
Trpman::Trpman(Block_context & ctx, Uint32 instanceno) :
SimulatedBlock(TRPMAN, ctx, instanceno)
{
@@ -50,21 +52,17 @@ BLOCK_FUNCTIONS(Trpman)
#ifdef ERROR_INSERT
static NodeBitmask c_error_9000_nodes_mask;
extern Uint32 MAX_RECEIVED_SIGNALS;
-
-class TransporterReceiveHandle *
-mt_get_trp_receive_handle(unsigned instance);
#endif
bool
Trpman::handles_this_node(Uint32 nodeId)
{
-#if MAX_NDBMT_RECEIVE_THREADS == 1
+#ifndef NDBD_MULTITHREADED
return true;
#else
if (globalData.ndbMtReceiveThreads <= (Uint32)1)
return true;
- return (instance()==
- (globalTransporterRegistry.getReceiveThreadId(nodeId) + 1));
+ return (instance()== (mt_get_recv_thread_idx(nodeId) + /* proxy */ 1));
#endif
}
@@ -780,10 +778,10 @@ TrpmanProxy::execROUTE_ORD(Signal* signa
jamEntry();
ndbassert(nodeId != 0);
-#if MAX_NDBMT_RECEIVE_THREADS == 1
+#ifndef NDBD_MULTITHREADED
Uint32 workerId = 0;
#else
- Uint32 workerId = globalTransporterRegistry.getReceiveThreadId(nodeId);
+ Uint32 workerId = mt_get_recv_thread_idx(nodeId);
#endif
SectionHandle handle(this, signal);
sendSignal(workerRef(workerId), GSN_ROUTE_ORD, signal,
=== modified file 'storage/ndb/src/kernel/vm/TransporterCallback.cpp'
--- a/storage/ndb/src/kernel/vm/TransporterCallback.cpp 2012-01-16 13:56:30 +0000
+++ b/storage/ndb/src/kernel/vm/TransporterCallback.cpp 2012-01-23 20:23:08 +0000
@@ -402,6 +402,10 @@ TransporterReceiveHandleKernel::reportCo
Uint32 secPtr[3];
globalScheduler.execute(&signal.header, JBA, signal.theData, secPtr);
#else
+ /**
+ * The first argument to sendprioa is from which thread number this
+ * signal is sent, it is always sent from a receive thread
+ */
sendprioa(m_thr_no /* self */,
&signal.header, signal.theData, NULL);
#endif
@@ -413,7 +417,6 @@ TransporterReceiveHandleKernel::reportCo
void
TransporterReceiveHandleKernel::reportDisconnect(NodeId nodeId, Uint32 errNo)
{
-
DBUG_ENTER("reportDisconnect");
SignalT<sizeof(DisconnectRep)/4> signal;
@@ -481,10 +484,24 @@ TransporterReceiveHandleKernel::checkJob
#ifndef NDBD_MULTITHREADED
return globalScheduler.checkDoJob();
#else
- return mt_checkDoJob();
+ return mt_checkDoJob(m_receiver_thread_idx);
#endif
}
+#ifdef NDBD_MULTITHREADED
+void
+TransporterReceiveHandleKernel::assign_nodes(NodeId *recv_thread_idx_array)
+{
+ m_transporters.clear(); /* Clear all first */
+ for (Uint32 nodeId = 1; nodeId < MAX_NODES; nodeId++)
+ {
+ if (recv_thread_idx_array[nodeId] == m_receiver_thread_idx)
+ m_transporters.set(nodeId); /* Belongs to our receive thread */
+ }
+ return;
+}
+#endif
+
void
TransporterReceiveHandleKernel::transporter_recv_from(NodeId nodeId)
{
=== modified file 'storage/ndb/src/kernel/vm/TransporterCallbackKernel.hpp'
--- a/storage/ndb/src/kernel/vm/TransporterCallbackKernel.hpp 2012-01-16 13:56:30 +0000
+++ b/storage/ndb/src/kernel/vm/TransporterCallbackKernel.hpp 2012-01-23 20:23:08 +0000
@@ -36,6 +36,11 @@ public:
* instance() - 1(proxy)
*/
Uint32 m_receiver_thread_idx;
+
+ /**
+ * Assign nodes to this TransporterReceiveHandle
+ */
+ void assign_nodes(NodeId *recv_thread_idx_array);
#endif
/* TransporterCallback interface. */
=== modified file 'storage/ndb/src/kernel/vm/mt.cpp'
--- a/storage/ndb/src/kernel/vm/mt.cpp 2012-01-16 13:56:30 +0000
+++ b/storage/ndb/src/kernel/vm/mt.cpp 2012-01-23 20:23:08 +0000
@@ -89,8 +89,8 @@ static const Uint32 MAX_SIGNALS_BEFORE_W
/* If this is too small it crashes before first signal. */
#define MAX_INSTANCES_PER_THREAD (16 + 8 * MAX_NDBMT_LQH_THREADS)
-static Uint32 receiver_thread_no = 0;
static Uint32 num_threads = 0;
+static Uint32 first_receiver_thread_no = 0;
#define NO_SEND_THREAD (MAX_BLOCK_THREADS + MAX_NDBMT_SEND_THREADS + 1)
@@ -1526,10 +1526,10 @@ flush_jbb_write_state(thr_data *selfptr)
* else 1
*/
static int
-check_job_buffers(struct thr_repository* rep)
+check_job_buffers(struct thr_repository* rep, Uint32 recv_thread_id)
{
const Uint32 minfree = (1024 + MIN_SIGNALS_PER_PAGE - 1)/MIN_SIGNALS_PER_PAGE;
- unsigned thr_no = receiver_thread_no;
+ unsigned thr_no = first_receiver_thread_no + recv_thread_id;
const thr_data *thrptr = rep->m_thread;
for (unsigned i = 0; i<num_threads; i++, thrptr++)
{
@@ -1549,7 +1549,6 @@ check_job_buffers(struct thr_repository*
return 1;
}
}
-
return 0;
}
@@ -1669,7 +1668,7 @@ trp_callback::reportSendLen(NodeId nodeI
void
trp_callback::lock_transporter(NodeId node)
{
- Uint32 recv_thread_no = 0;
+ Uint32 recv_thread_idx = mt_get_recv_thread_idx(node);
struct thr_repository* rep = &g_thr_repository;
/**
* Note: take the send lock _first_, so that we will not hold the receive
@@ -1681,31 +1680,31 @@ trp_callback::lock_transporter(NodeId no
* non-waiting (so we will not block sending on other transporters).
*/
lock(&rep->m_send_buffers[node].m_send_lock);
- lock(&rep->m_receive_lock[recv_thread_no]);
+ lock(&rep->m_receive_lock[recv_thread_idx]);
}
void
trp_callback::unlock_transporter(NodeId node)
{
- Uint32 recv_thread_no = 0;
+ Uint32 recv_thread_idx = mt_get_recv_thread_idx(node);
struct thr_repository* rep = &g_thr_repository;
- unlock(&rep->m_receive_lock[recv_thread_no]);
+ unlock(&rep->m_receive_lock[recv_thread_idx]);
unlock(&rep->m_send_buffers[node].m_send_lock);
}
int
-mt_checkDoJob()
+mt_checkDoJob(Uint32 recv_thread_idx)
{
struct thr_repository* rep = &g_thr_repository;
- if (unlikely(check_job_buffers(rep)))
+ if (unlikely(check_job_buffers(rep, recv_thread_idx)))
{
do
{
/**
* theoretically (or when we do single threaded by using ndbmtd with
* all in same thread) we should execute signals here...to
- * prevent dead-lock, but...with current ndbmtd only CMVMI runs in
- * this thread, and other thread is waiting for CMVMI
+ * prevent dead-lock, but...with current ndbmtd only TRPMAN runs in
+ * this thread, and other thread is waiting for TRPMAN
* except for QMGR open/close connection, but that is not
* (i think) sufficient to create a deadlock
*/
@@ -1728,7 +1727,7 @@ mt_checkDoJob()
NdbSleep_MilliSleep(0);
#endif
- } while (check_job_buffers(rep));
+ } while (check_job_buffers(rep, recv_thread_idx));
}
return 0;
@@ -2844,6 +2843,11 @@ aligned_signal(unsigned char signal_buf[
TransporterReceiveHandleKernel *
g_trp_receive_handle_ptr[MAX_NDBMT_RECEIVE_THREADS];
+/**
+ * Array for mapping nodes to receiver threads and function to access it.
+ */
+static NodeId g_node_to_recv_thr_map[MAX_NODES];
+
extern "C"
void *
mt_receiver_thread_main(void *thr_arg)
@@ -2855,8 +2859,9 @@ mt_receiver_thread_main(void *thr_arg)
unsigned thr_no = selfptr->m_thr_no;
Uint32& watchDogCounter = selfptr->m_watchdog_counter;
Uint32 thrSignalId = 0;
+ const Uint32 recv_thread_idx = thr_no - first_receiver_thread_no;
bool has_received = false;
- const unsigned recv_thread_idx = 0;
+ int cnt = 0;
init_thread(selfptr);
signal = aligned_signal(signal_buf, thr_no);
@@ -2865,6 +2870,7 @@ mt_receiver_thread_main(void *thr_arg)
* Object that keeps track of our pollReceive-state
*/
TransporterReceiveHandleKernel recvdata(thr_no, recv_thread_idx);
+ recvdata.assign_nodes(g_node_to_recv_thr_map);
globalTransporterRegistry.init(recvdata);
/**
@@ -2874,8 +2880,6 @@ mt_receiver_thread_main(void *thr_arg)
while (globalData.theRestartFlag != perform_stop)
{
- static int cnt = 0;
-
if (cnt == 0)
{
watchDogCounter = 5;
@@ -2903,7 +2907,7 @@ mt_receiver_thread_main(void *thr_arg)
has_received = false;
if (globalTransporterRegistry.pollReceive(1, recvdata))
{
- if (check_job_buffers(rep) == 0)
+ if (check_job_buffers(rep, recv_thread_idx) == 0)
{
watchDogCounter = 8;
lock(&rep->m_receive_lock[recv_thread_idx]);
@@ -2970,7 +2974,7 @@ check_job_buffer_full(thr_data *selfptr)
* In order to prevent "job-buffer-full", i.e
* that one thread(T1) produces so much signals to another thread(T2)
* so that the ring-buffer from T1 to T2 gets full
- * the mainlop have 2 "config" variables
+ * the main loop have 2 "config" variables
* - m_max_exec_signals
* This is the *total* no of signals T1 can execute before calling
* this method again
@@ -3170,7 +3174,7 @@ sendlocal(Uint32 self, const SignalHeade
* to the other thread.
* This parameter found to be reasonable by benchmarking.
*/
- Uint32 MAX_SIGNALS_BEFORE_FLUSH = (self == receiver_thread_no) ?
+ Uint32 MAX_SIGNALS_BEFORE_FLUSH = (self >= first_receiver_thread_no) ?
MAX_SIGNALS_BEFORE_FLUSH_RECEIVER :
MAX_SIGNALS_BEFORE_FLUSH_OTHER;
@@ -3541,9 +3545,9 @@ ThreadConfig::init()
Uint32 num_lqh_threads = globalData.ndbMtLqhThreads;
Uint32 num_tc_threads = globalData.ndbMtTcThreads;
Uint32 num_recv_threads = globalData.ndbMtReceiveThreads;
-
- receiver_thread_no = NUM_MAIN_THREADS + num_tc_threads + num_lqh_threads;
- num_threads = receiver_thread_no + num_recv_threads;
+ first_receiver_thread_no =
+ NUM_MAIN_THREADS + num_tc_threads + num_lqh_threads;
+ num_threads = first_receiver_thread_no + num_recv_threads;
require(num_threads <= MAX_BLOCK_THREADS);
ndbout << "NDBMT: number of block threads=" << num_threads << endl;
@@ -3565,6 +3569,45 @@ setcpuaffinity(struct thr_repository* re
}
}
+/**
+ * return receiver thread handling a particular node
+ * returned number is indexed from 0 and upwards to #receiver threads
+ * (or MAX_NODES is none)
+ */
+Uint32
+mt_get_recv_thread_idx(NodeId nodeId)
+{
+ assert(nodeId < NDB_ARRAY_SIZE(g_node_to_recv_thr_map));
+ return g_node_to_recv_thr_map[nodeId];
+}
+
+static
+void
+assign_receiver_threads(void)
+{
+ Uint32 num_recv_threads = globalData.ndbMtReceiveThreads;
+ Uint32 recv_thread_idx = 0;
+ for (Uint32 nodeId = 1; nodeId < MAX_NODES; nodeId++)
+ {
+ Transporter *node_trp =
+ globalTransporterRegistry.get_transporter(nodeId);
+
+ if (node_trp)
+ {
+ g_node_to_recv_thr_map[nodeId] = recv_thread_idx;
+ recv_thread_idx++;
+ if (recv_thread_idx == num_recv_threads)
+ recv_thread_idx = 0;
+ }
+ else
+ {
+ /* Flag for no transporter */
+ g_node_to_recv_thr_map[nodeId] = MAX_NODES;
+ }
+ }
+ return;
+}
+
void
ThreadConfig::ipControlLoop(NdbThread* pThis, Uint32 thread_index)
{
@@ -3576,38 +3619,59 @@ ThreadConfig::ipControlLoop(NdbThread* p
*/
setcpuaffinity(rep);
+ /**
+ * assign nodes to receiver threads
+ */
+ assign_receiver_threads();
+
/*
* Start threads for all execution threads, except for the receiver
* thread, which runs in the main thread.
*/
+
for (thr_no = 0; thr_no < num_threads; thr_no++)
{
rep->m_thread[thr_no].m_time = NdbTick_CurrentMillisecond();
- if (thr_no == receiver_thread_no)
+ if (thr_no == first_receiver_thread_no)
continue; // Will run in the main thread.
/*
* The NdbThread_Create() takes void **, but that is cast to void * when
* passed to the thread function. Which is kind of strange ...
*/
- rep->m_thread[thr_no].m_thread =
- NdbThread_Create(mt_job_thread_main,
- (void **)(rep->m_thread + thr_no),
- 1024*1024,
- "execute thread", //ToDo add number
- NDB_THREAD_PRIO_MEAN);
- require(rep->m_thread[thr_no].m_thread != NULL);
+ if (thr_no < first_receiver_thread_no)
+ {
+ /* Start block threads */
+ rep->m_thread[thr_no].m_thread =
+ NdbThread_Create(mt_job_thread_main,
+ (void **)(rep->m_thread + thr_no),
+ 1024*1024,
+ "execute thread", //ToDo add number
+ NDB_THREAD_PRIO_MEAN);
+ require(rep->m_thread[thr_no].m_thread != NULL);
+ }
+ else
+ {
+ /* Start a receiver thread, also block thread for TRPMAN */
+ rep->m_thread[thr_no].m_thread =
+ NdbThread_Create(mt_receiver_thread_main,
+ (void **)(rep->m_thread + thr_no),
+ 1024*1024,
+ "receive thread", //ToDo add number
+ NDB_THREAD_PRIO_MEAN);
+ require(rep->m_thread[thr_no].m_thread != NULL);
+ }
}
- /* Now run the main loop for thread 0 directly. */
- rep->m_thread[receiver_thread_no].m_thread = pThis;
- mt_receiver_thread_main(&(rep->m_thread[receiver_thread_no]));
+ /* Now run the main loop for first receiver thread directly. */
+ rep->m_thread[first_receiver_thread_no].m_thread = pThis;
+ mt_receiver_thread_main(&(rep->m_thread[first_receiver_thread_no]));
/* Wait for all threads to shutdown. */
for (thr_no = 0; thr_no < num_threads; thr_no++)
{
- if (thr_no == receiver_thread_no)
+ if (thr_no == first_receiver_thread_no)
continue;
void *dummy_return_status;
NdbThread_WaitFor(rep->m_thread[thr_no].m_thread, &dummy_return_status);
=== modified file 'storage/ndb/src/kernel/vm/mt.hpp'
--- a/storage/ndb/src/kernel/vm/mt.hpp 2012-01-16 13:56:30 +0000
+++ b/storage/ndb/src/kernel/vm/mt.hpp 2012-01-23 20:23:08 +0000
@@ -47,7 +47,7 @@ SendStatus mt_send_remote(Uint32 self, c
void mt_section_lock();
void mt_section_unlock();
-int mt_checkDoJob();
+int mt_checkDoJob(Uint32 receiver_thread_idx);
/**
* Are we (not) multi threaded
@@ -109,4 +109,12 @@ mt_get_thr_stat(class SimulatedBlock *,
class TransporterReceiveHandle *
mt_get_trp_receive_handle(unsigned instance);
+/**
+ * return receiver thread handling a particular node
+ * returned number is indexed from 0 and upwards to #receiver threads
+ * (or MAX_NODES is none)
+ */
+Uint32
+mt_get_recv_thread_idx(NodeId nodeId);
+
#endif
=== modified file 'storage/ndb/src/kernel/vm/mt_thr_config.cpp'
--- a/storage/ndb/src/kernel/vm/mt_thr_config.cpp 2012-01-04 11:13:53 +0000
+++ b/storage/ndb/src/kernel/vm/mt_thr_config.cpp 2012-01-23 19:50:55 +0000
@@ -462,15 +462,16 @@ THRConfig::do_validate()
}
/**
- * LDM can be 1 2 4 8 16
+ * LDM can be 1 2 4 8 12 16
*/
if (m_threads[T_LDM].size() != 1 &&
m_threads[T_LDM].size() != 2 &&
m_threads[T_LDM].size() != 4 &&
m_threads[T_LDM].size() != 8 &&
+ m_threads[T_LDM].size() != 12 &&
m_threads[T_LDM].size() != 16)
{
- m_err_msg.assfmt("No of LDM-instances can be 1,2,4,8,16. Specified: %u",
+ m_err_msg.assfmt("No of LDM-instances can be 1,2,4,8,12,16. Specified: %u",
m_threads[T_LDM].size());
return -1;
}
=== modified file 'storage/ndb/src/mgmsrv/ConfigInfo.cpp'
--- a/storage/ndb/src/mgmsrv/ConfigInfo.cpp 2012-01-12 08:40:08 +0000
+++ b/storage/ndb/src/mgmsrv/ConfigInfo.cpp 2012-01-23 19:50:55 +0000
@@ -4828,6 +4828,7 @@ check_2n_number_less_16(Uint32 num)
case 2:
case 4:
case 8:
+ case 12:
case 16:
return true;
default:
@@ -4861,13 +4862,13 @@ checkThreadConfig(InitConfigFileParser::
if (!check_2n_number_less_16(lqhThreads))
{
- ctx.reportError("NumLqhThreads must be 0, 1,2,4,8 or 16");
+ ctx.reportError("NumLqhThreads must be 0, 1,2,4,8,12 or 16");
return false;
}
if (!check_2n_number_less_16(ndbLogParts) ||
ndbLogParts < 4)
{
- ctx.reportError("NoOfLogParts must be 4,8 or 16");
+ ctx.reportError("NoOfLogParts must be 4,8,12 or 16");
return false;
}
if (ctx.m_currentSection->get("ThreadConfig", &thrconfig))
=== modified file 'storage/ndb/test/ndbapi/flexAsynch.cpp'
--- a/storage/ndb/test/ndbapi/flexAsynch.cpp 2011-11-11 08:35:14 +0000
+++ b/storage/ndb/test/ndbapi/flexAsynch.cpp 2012-01-23 20:25:28 +0000
@@ -36,9 +36,10 @@
#define MAX_PARTS 4
#define MAX_SEEK 16
#define MAXSTRLEN 16
-#define MAXATTR 64
+#define MAXATTR 511
#define MAXTABLES 1
#define NDB_MAXTHREADS 128
+#define NDB_MAX_NODES 48
/*
NDB_MAXTHREADS used to be just MAXTHREADS, which collides with a
#define from <sys/thread.h> on AIX (IBM compiler). We explicitly
@@ -93,8 +94,8 @@ static bool executeTransLoop(ThreadNdb*
static void executeCallback(int result, NdbConnection* NdbObject,
void* aObject);
static bool error_handler(const NdbError & err);
-static Uint32 getKey(Uint32, Uint32) ;
static void input_error();
+static Uint32 get_my_node_id(Uint32 tableNo, Uint32 threadNo);
static int retry_opt = 3 ;
@@ -108,7 +109,10 @@ static int
static longlong ThreadExecutions[NDB_MAXTHREADS];
static StartType ThreadStart[NDB_MAXTHREADS];
static char tableName[MAXTABLES][MAXSTRLEN+1];
+static const NdbDictionary::Table * tables[MAXTABLES];
static char attrName[MAXATTR][MAXSTRLEN+1];
+static bool nodeTableArray[MAXTABLES][NDB_MAX_NODES + 1];
+static Uint32 numberNodeTable[MAXTABLES];
static RunType tRunType = RunAll;
static int tStdTableNum = 0;
static int tWarmupTime = 10; //Seconds
@@ -119,8 +123,7 @@ static int
static NdbRecord * g_record[MAXTABLES];
static bool tNdbRecord = false;
-static bool tLocal = false;
-static int tLocalPart = 0;
+static int tLocal = 0;
static int tSendForce = 0;
static int tNoOfLoops = 1;
static int tAttributeSize = 1;
@@ -606,19 +609,18 @@ NDB_COMMAND(flexAsynch, "flexAsynch", "f
tRunType == RunUpdate ||
tRunType == RunDelete)
{
- longlong total_executions = 0;
- longlong total_transactions;
+ longlong total_transactions = 0;
longlong exec_time;
if (tRunType == RunInsert || tRunType == RunDelete) {
- total_executions = (longlong)tNoOfTransactions;
- total_executions *= (longlong)tNoOfThreads;
+ total_transactions = (longlong)tNoOfTransactions;
+ total_transactions *= (longlong)tNoOfThreads;
+ total_transactions *= (longlong)tNoOfParallelTrans;
} else {
for (Uint32 i = 0; i < tNoOfThreads; i++){
- total_executions += ThreadExecutions[i];
+ total_transactions += ThreadExecutions[i];
}
}
- total_transactions = total_executions * tNoOfParallelTrans;
if (tRunType == RunInsert || tRunType == RunDelete) {
exec_time = (longlong)timer.elapsedTime();
} else {
@@ -654,9 +656,10 @@ threadLoop(void* ThreadData)
ThreadNdb* tabThread = (ThreadNdb*)ThreadData;
int threadNo = tabThread->ThreadNo;
localNdb = new Ndb(g_cluster_connection+(threadNo % tConnections), "TEST_DB");
- localNdb->init(1024);
+ localNdb->init(MAXPAR);
localNdb->waitUntilReady(10000);
- unsigned int threadBase = (threadNo << 16);
+ unsigned int threadBase = threadNo;
+
for (;;){
while (ThreadStart[threadNo] == stIdle) {
@@ -671,11 +674,18 @@ threadLoop(void* ThreadData)
tType = ThreadStart[threadNo];
ThreadStart[threadNo] = stIdle;
if (tRunType == RunAll || tRunType == RunInsert || tRunType == RunDelete){
- if(!executeThread(tabThread, tType, localNdb, threadBase)){
+ if(!executeThread(tabThread,
+ tType,
+ localNdb,
+ threadBase)){
break;
}
} else {
- if(!executeTransLoop(tabThread, tType, localNdb, threadBase, threadNo)){
+ if(!executeTransLoop(tabThread,
+ tType,
+ localNdb,
+ threadBase,
+ threadNo)){
break;
}
}
@@ -691,81 +701,142 @@ threadLoop(void* ThreadData)
static int error_count = 0;
static bool
+update_num_ops(Uint32 *num_ops, NdbConnection **tConArray)
+{
+ /*
+ Move num_ops forward to next unused position, can be old
+ transactions still outstanding
+ */
+ for ( ; *num_ops < tNoOfParallelTrans; (*num_ops)++)
+ {
+ if (tConArray[*num_ops])
+ continue;
+ else
+ break;
+ }
+ if (*num_ops == tNoOfParallelTrans)
+ return true;
+ return false;
+}
+
+static int
executeTrans(ThreadNdb* pThread,
StartType aType,
Ndb* aNdbObject,
unsigned int threadBase,
- unsigned int i)
+ unsigned int record,
+ Uint32 nodeId,
+ NdbConnection **tConArray,
+ bool execute_all)
{
- NdbConnection* tConArray[1024];
unsigned int tBase;
unsigned int tBase2;
+ Uint32 threadBaseLoc, threadBaseLoc2;
+ Uint32 num_ops = 0;
+ Uint32 i, loops;
- if (tLocal == false) {
- tBase = i * tNoOfParallelTrans * tNoOfOpsPerTrans;
- } else {
- tBase = i * tNoOfParallelTrans * MAX_SEEK;
- }//if
START_REAL_TIME;
- for (unsigned int j = 0; j < tNoOfParallelTrans; j++) {
- if (tLocal == false) {
+ for (i = record, loops = 0;
+ i < tNoOfTransactions &&
+ loops < 16 &&
+ num_ops < tNoOfParallelTrans;
+ i++, loops++)
+ {
+ tBase = i * tNoOfParallelTrans * tNoOfOpsPerTrans;
+ threadBaseLoc = (threadBase * tNoOfTransactions * tNoOfParallelTrans) +
+ (i * tNoOfParallelTrans);
+ for (unsigned int j = 0; j < tNoOfParallelTrans; j++) {
+ if (update_num_ops(&num_ops, tConArray))
+ break;
+ threadBaseLoc2 = threadBaseLoc + j;
tBase2 = tBase + (j * tNoOfOpsPerTrans);
- } else {
- tBase2 = tBase + (j * MAX_SEEK);
- tBase2 = getKey(threadBase, tBase2);
- }//if
- if (startTransGuess == true) {
- union {
- Uint64 Tkey64;
- Uint32 Tkey32[2];
- };
- Tkey32[0] = threadBase;
- Tkey32[1] = tBase2;
- tConArray[j] = aNdbObject->startTransaction((Uint32)0, //Priority
- (const char*)&Tkey64, //Main PKey
- (Uint32)4); //Key Length
- } else {
- tConArray[j] = aNdbObject->startTransaction();
- }//if
- if (tConArray[j] == NULL){
- error_handler(aNdbObject->getNdbError());
- ndbout << endl << "Unable to recover! Quiting now" << endl ;
- return false;
- }//if
-
- for (unsigned int k = 0; k < tNoOfOpsPerTrans; k++) {
- //-------------------------------------------------------
- // Define the operation, but do not execute it yet.
- //-------------------------------------------------------
- if (tNdbRecord)
- defineNdbRecordOperation(pThread,
- tConArray[j], aType, threadBase,(tBase2+k));
+ if (startTransGuess == true) {
+ union {
+ Uint64 _align;
+ Uint32 Tkey32[2];
+ };
+ (void)_align;
+
+ Tkey32[0] = threadBaseLoc2;
+ Tkey32[1] = tBase2;
+ Ndb::Key_part_ptr hint[2];
+ hint[0].ptr = Tkey32+0;
+ hint[0].len = 4;
+ hint[1].ptr = 0;
+ hint[1].len = 0;
+
+ tConArray[num_ops] = aNdbObject->startTransaction(tables[0], hint);
+ }
else
- defineOperation(tConArray[j], aType, threadBase, (tBase2 + k));
- }//for
+ {
+ tConArray[num_ops] = aNdbObject->startTransaction();
+ }
+
+ if (tConArray[num_ops] == NULL){
+ error_handler(aNdbObject->getNdbError());
+ ndbout << endl << "Unable to recover! Quitting now" << endl ;
+ return -1;
+ }//if
+
+ if (nodeId != 0 &&
+ tConArray[num_ops]->getConnectedNodeId() != nodeId)
+ {
+ /*
+ We're running only local operations, this won't be local,
+ ignore this record
+ */
+ aNdbObject->closeTransaction(tConArray[num_ops]);
+ continue;
+ }
+ for (unsigned int k = 0; k < tNoOfOpsPerTrans; k++) {
+ //-------------------------------------------------------
+ // Define the operation, but do not execute it yet.
+ //-------------------------------------------------------
+ if (tNdbRecord)
+ defineNdbRecordOperation(pThread,
+ tConArray[num_ops],
+ aType,
+ threadBaseLoc2,
+ (tBase2+k));
+ else
+ defineOperation(tConArray[num_ops],
+ aType,
+ threadBaseLoc2,
+ (tBase2 + k));
+ }//for
- tConArray[j]->executeAsynchPrepare(Commit, &executeCallback, NULL);
+ tConArray[num_ops]->executeAsynchPrepare(Commit,
+ &executeCallback,
+ (void*)&tConArray[num_ops]);
+ num_ops++;
+ }//for
}//for
STOP_REAL_TIME;
+ if (num_ops == 0)
+ return 0;
//-------------------------------------------------------
// Now we have defined a set of operations, it is now time
- // to execute all of them.
+ // to execute all of them. If execute_all isn't set, we
+ // only execute at least half of them. In this manner we
+ // can cater for different execution speeds in different
+ // parts of the system.
//-------------------------------------------------------
- int Tcomp = aNdbObject->sendPollNdb(3000, 0, 0);
- while (unsigned(Tcomp) < tNoOfParallelTrans) {
- int TlocalComp = aNdbObject->pollNdb(3000, 0);
+ int min_execs = execute_all ? (int)num_ops :
+ (num_ops > 1 ? (int)(num_ops / 2) : 1);
+ int Tcomp = aNdbObject->sendPollNdb(3000,
+ min_execs,
+ tSendForce);
+ while (Tcomp < min_execs) {
+ int TlocalComp = aNdbObject->pollNdb(3000, min_execs - Tcomp);
Tcomp += TlocalComp;
}//while
- for (unsigned int j = 0 ; j < tNoOfParallelTrans ; j++) {
- if (aNdbObject->getNdbError().code != 0 && error_count < 10000){
- error_count++;
- ndbout << "i = " << i << ", j = " << j << ", error = ";
- ndbout << aNdbObject->getNdbError().code << ", threadBase = ";
- ndbout << hex << threadBase << endl;
- }
- aNdbObject->closeTransaction(tConArray[j]);
- }//for
- return true;
+ if (aNdbObject->getNdbError().code != 0 && error_count < 10000){
+ error_count++;
+ ndbout << "i = " << i << ", error = ";
+ ndbout << aNdbObject->getNdbError().code << ", threadBase = ";
+ ndbout << hex << threadBase << endl;
+ }
+ return Tcomp;
}
static
@@ -779,28 +850,66 @@ executeTransLoop(ThreadNdb* pThread,
int time_expired;
longlong executions = 0;
unsigned int i = 0;
+ Uint32 nodeId;
+ int ops = 0;
+ int record;
+ Uint32 local_count = 0;
+ bool execute_all = true;
DEFINE_TIMER;
+ NdbConnection* tConArray[MAXPAR];
+ for (Uint32 i = 0; i < MAXPAR; i++) tConArray[i] = NULL;
+ if (tLocal > 0)
+ {
+ nodeId = get_my_node_id((Uint32)0, threadBase);
+ }
+ else
+ nodeId = 0;
ThreadExecutions[threadNo] = 0;
START_TIMER;
do
{
- if (!executeTrans(pThread, aType, aNdbObject, threadBase, i++))
+ if (tLocal == 2)
+ {
+ /* Select node on round robin basis */
+ local_count++;
+ nodeId = get_my_node_id((Uint32)0, local_count);
+ }
+ else if (tLocal == 3)
+ {
+ /* Select node on random basis */
+ local_count = (Uint32)(rand() % numberNodeTable[0]);
+ nodeId = get_my_node_id((Uint32)0, local_count);
+ }
+ record = rand() % tNoOfTransactions;
+ if ((ops = executeTrans(pThread,
+ aType,
+ aNdbObject,
+ threadBase,
+ (Uint32)record,
+ nodeId,
+ tConArray,
+ execute_all)) < 0)
return false;
STOP_TIMER;
+ if (!continue_flag)
+ break;
time_expired = (int)(timer.elapsedTime() / 1000);
if (time_expired < tWarmupTime)
; //Do nothing
else if (time_expired < (tWarmupTime + tExecutionTime)){
- executions++; //Count measurement
+ executions += ops; //Count measurement
}
else if (time_expired < (tWarmupTime + tExecutionTime + tCooldownTime))
; //Do nothing
else
+ {
+ execute_all = true;
continue_flag = false; //Time expired
+ }
if (i == tNoOfTransactions) /* Make sure the record exists */
i = 0;
- } while (continue_flag);
+ } while (1);
ThreadExecutions[threadNo] = executions;
return true;
}//executeTransLoop()
@@ -808,39 +917,32 @@ executeTransLoop(ThreadNdb* pThread,
static
bool
executeThread(ThreadNdb* pThread,
- StartType aType, Ndb* aNdbObject, unsigned int threadBase) {
+ StartType aType,
+ Ndb* aNdbObject,
+ unsigned int threadBase) {
+ NdbConnection* tConArray[MAXPAR];
+
+ for (Uint32 i = 0; i < MAXPAR; i++) tConArray[i] = NULL;
for (unsigned int i = 0; i < tNoOfTransactions; i++) {
- if (!executeTrans(pThread, aType, aNdbObject, threadBase, i))
+ if ((executeTrans(pThread,
+ aType,
+ aNdbObject,
+ threadBase,
+ i,
+ (Uint32)0,
+ tConArray,
+ true)) < 0)
return false;
}//for
return true;
}//executeThread()
-static
-Uint32
-getKey(Uint32 aBase, Uint32 anIndex) {
- Uint32 Tfound = anIndex;
- union {
- Uint64 Tkey64;
- Uint32 Tkey32[2];
- };
- Tkey32[0] = aBase;
- Uint32 hash;
- for (Uint32 i = anIndex; i < (anIndex + MAX_SEEK); i++) {
- Tkey32[1] = (Uint32)i;
- hash = md5_hash((Uint64*)&Tkey64, (Uint32)2);
- hash = (hash >> 6) & (MAX_PARTS - 1);
- if (hash == unsigned(tLocalPart)) {
- Tfound = i;
- break;
- }//if
- }//for
- return Tfound;
-}//getKey()
-
static void
executeCallback(int result, NdbConnection* NdbObject, void* aObject)
{
+ NdbConnection **array_ref = (NdbConnection**)aObject;
+ assert(NdbObject == *array_ref);
+ *array_ref = NULL;
if (result == -1) {
// Add complete error handling here
@@ -860,8 +962,8 @@ executeCallback(int result, NdbConnectio
// ndbout << "Error occured in poll:" << endl;
// ndbout << NdbObject->getNdbError() << endl;
failed++ ;
- return;
}//if
+ NdbObject->close(); /* Close transaction */
return;
}//executeCallback()
@@ -929,14 +1031,16 @@ defineOperation(NdbConnection* localNdbC
error_handler(localNdbOperation->getNdbError());
}//default
}//switch
- localNdbOperation->equal((Uint32)0,(char*)&attrValue[0]);
+
+ localNdbOperation->equal((Uint32)0, (char*)(attrValue + 0));
+ localNdbOperation->equal((Uint32)1, (char*)(attrValue + 1));
switch (aType) {
case stInsert: // Insert case
case stUpdate: // Update Case
{
for (countAttributes = 1;
countAttributes < loopCountAttributes; countAttributes++) {
- localNdbOperation->setValue(countAttributes,
+ localNdbOperation->setValue(countAttributes + 1,
(char*)&attrValue[0]);
}//for
break;
@@ -944,7 +1048,7 @@ defineOperation(NdbConnection* localNdbC
case stRead: { // Read Case
for (countAttributes = 1;
countAttributes < loopCountAttributes; countAttributes++) {
- localNdbOperation->getValue(countAttributes,
+ localNdbOperation->getValue(countAttributes + 1,
(char*)&attrValue[0]);
}//for
break;
@@ -1059,86 +1163,126 @@ dropTables(Ndb* pMyNdb)
}
}
+/*
+ Set up nodeTableArray with a boolean true for all nodes that
+ contains the table.
+*/
+static int
+setUpNodeTableArray(Uint32 tableNo, const NdbDictionary::Table *pTab)
+{
+ Uint32 numFragments = pTab->getFragmentCount();
+ Uint32 nodeId;
+ for (Uint32 i = 1; i <= NDB_MAX_NODES; i++)
+ nodeTableArray[tableNo][i] = false;
+ for (Uint32 i = 0; i < numFragments; i++)
+ {
+ if ((pTab->getFragmentNodes(i, &nodeId, (Uint32)1)) == 0)
+ {
+ return 1;
+ }
+ nodeTableArray[tableNo][nodeId] = true;
+ }
+ numberNodeTable[tableNo] = 0;
+ for (Uint32 i = 1; i <= NDB_MAX_NODES; i++)
+ {
+ if (nodeTableArray[tableNo][i])
+ numberNodeTable[tableNo]++;
+ }
+ return 0;
+}
+
+static Uint32
+get_my_node_id(Uint32 tableNo, Uint32 threadNo)
+{
+ Uint32 count = 0;
+ Uint32 n = threadNo % numberNodeTable[tableNo];
+ for (Uint32 i = 1; i <= NDB_MAX_NODES; i++)
+ {
+ if (nodeTableArray[tableNo][i])
+ {
+ if (count == n)
+ return i;
+ count++;
+ }
+ }
+ return 0;
+}
+
static
int
createTables(Ndb* pMyNdb){
- NdbSchemaCon *MySchemaTransaction;
- NdbSchemaOp *MySchemaOp;
- int check;
-
- if (theTableCreateFlag == 0 || tRunType == RunCreateTable) {
- for(int i=0; i < MAXTABLES ;i++) {
+ NdbDictionary::Dictionary* pDict = pMyNdb->getDictionary();
+ if (theTableCreateFlag == 0 || tRunType == RunCreateTable)
+ {
+ for(int i=0; i < MAXTABLES ;i++)
+ {
ndbout << "Creating " << tableName[i] << "..." << endl;
- MySchemaTransaction = NdbSchemaCon::startSchemaTrans(pMyNdb);
-
- if(MySchemaTransaction == NULL &&
- (!error_handler(MySchemaTransaction->getNdbError())))
- return -1;
-
- MySchemaOp = MySchemaTransaction->getNdbSchemaOp();
- if(MySchemaOp == NULL &&
- (!error_handler(MySchemaTransaction->getNdbError())))
- return -1;
+ NdbDictionary::Table tab;
+ tab.setName(tableName[i]);
+ if (tempTable)
+ {
+ tab.setLogging(false);
+ }
- check = MySchemaOp->createTable( tableName[i]
- ,8 // Table Size
- ,TupleKey // Key Type
- ,40 // Nr of Pages
- ,All
- ,6
- ,(tLoadFactor - 5)
- ,(tLoadFactor)
- ,1
- ,!tempTable
- );
-
- if (check == -1 &&
- (!error_handler(MySchemaTransaction->getNdbError())))
- return -1;
-
- check = MySchemaOp->createAttribute( (char*)attrName[0],
- TupleKey,
- 32,
- PKSIZE,
- UnSigned,
- MMBased,
- NotNullAttribute );
-
- if (check == -1 &&
- (!error_handler(MySchemaTransaction->getNdbError())))
- return -1;
- for (unsigned j = 1; j < tNoOfAttributes ; j++){
- check = MySchemaOp->createAttribute( (char*)attrName[j],
- NoKey,
- 32,
- tAttributeSize,
- UnSigned,
- MMBased,
- NotNullAttribute );
- if (check == -1 &&
- (!error_handler(MySchemaTransaction->getNdbError())))
- return -1;
+ {
+ NdbDictionary::Column distkey;
+ distkey.setName("DISTKEY");
+ distkey.setType(NdbDictionary::Column::Unsigned);
+ distkey.setPrimaryKey(true);
+ distkey.setDistributionKey(true);
+ tab.addColumn(distkey);
}
-
- if (MySchemaTransaction->execute() == -1 &&
- (!error_handler(MySchemaTransaction->getNdbError())))
+
+ {
+ NdbDictionary::Column pk;
+ pk.setName(attrName[0]);
+ pk.setType(NdbDictionary::Column::Unsigned);
+ pk.setPrimaryKey(true);
+ tab.addColumn(pk);
+ }
+
+ for (unsigned j = 1; j < tNoOfAttributes ; j++)
+ {
+ NdbDictionary::Column col;
+ col.setName(attrName[j]);
+ col.setType(NdbDictionary::Column::Unsigned);
+ col.setLength(tAttributeSize);
+ tab.addColumn(col);
+ }
+
+ int res = pDict->createTable(tab);
+ if (res != 0)
+ {
+ ndbout << pDict->getNdbError() << endl;
return -1;
-
- NdbSchemaCon::closeSchemaTrans(MySchemaTransaction);
+ }
+ }
+ }
+
+ for(int i=0; i < MAXTABLES ;i++)
+ {
+ const NdbDictionary::Table * pTab = pDict->getTable(tableName[i]);
+ if (pTab == NULL)
+ {
+ error_handler(pDict->getNdbError());
+ return -1;
+ }
+ tables[i] = pTab;
+ if (setUpNodeTableArray(i, pTab))
+ {
+ error_handler(pDict->getNdbError());
+ return -1;
}
}
+
if (tNdbRecord)
{
- for(int i=0; i < MAXTABLES ;i++) {
- NdbDictionary::Dictionary* pDict = pMyNdb->getDictionary();
- const NdbDictionary::Table * pTab = pDict->getTable(tableName[i]);
+ for(int i=0; i < MAXTABLES ;i++)
+ {
+ const NdbDictionary::Table * pTab = tables[i];
- if (pTab == NULL){
- error_handler(pDict->getNdbError());
- return -1;
- }
int off = 0;
Vector<NdbDictionary::RecordSpecification> spec;
for (Uint32 j = 0; j<unsigned(pTab->getNoOfColumns()); j++)
@@ -1241,13 +1385,12 @@ readArguments(int argc, const char** arg
return -1;
}
} else if (strcmp(argv[i], "-local") == 0){
- tLocalPart = atoi(argv[i+1]);
- tLocal = true;
- startTransGuess = true;
- if ((tLocalPart < 0) || (tLocalPart > MAX_PARTS)){
- ndbout_c("Invalid local part");
+ tLocal = atoi(argv[i+1]);
+ if (tLocal < 1 || (tLocal > 3)){
+ ndbout_c("Invalid local value, only 1,2 or 3 allowed");
return -1;
}
+ startTransGuess = true;
} else if (strcmp(argv[i], "-simple") == 0){
theSimpleFlag = 1;
argc++;
@@ -1340,7 +1483,7 @@ readArguments(int argc, const char** arg
argc -= 2;
i = i + 2;
}//while
- if (tLocal == true) {
+ if (tLocal > 0) {
if (tNoOfOpsPerTrans != 1) {
ndbout_c("Not valid to have more than one op per trans with local");
}//if
@@ -1377,7 +1520,7 @@ input_error(){
ndbout_c(" -adaptive Use adaptive send algorithm (default)");
ndbout_c(" -force Force send when communicating");
ndbout_c(" -non_adaptive Send at a 10 millisecond interval");
- ndbout_c(" -local Number of part, only use keys in one part out of 16");
+ ndbout_c(" -local 1 = each thread its own node, 2 = round robin on node per parallel trans 3 = random node per parallel trans");
ndbout_c(" -ndbrecord Use NDB Record");
ndbout_c(" -r Number of extra loops");
ndbout_c(" -insert Only run inserts on standard table");
No bundle (reason: useless for push emails).
| Thread |
|---|
| • bzr push into mysql-5.1-telco-7.0 branch (jonas.oreland:4814 to 4818) | jonas oreland | 25 Jan |