3792 Mikael Ronstrom 2012-03-02
Fixes for flexAsynch
modified:
storage/ndb/test/ndbapi/flexAsynch.cpp
3791 Mikael Ronstrom 2012-03-02
Fixes for flexAsynch
modified:
storage/ndb/test/ndbapi/flexAsynch.cpp
3790 Mikael Ronstrom 2012-03-02
Fixes for flexAsynch
modified:
storage/ndb/test/ndbapi/flexAsynch.cpp
3789 Mikael Ronstrom 2012-03-02
Fixes for flexAsynch
modified:
storage/ndb/test/ndbapi/flexAsynch.cpp
3788 Mikael Ronstrom 2012-03-02
Fixes for flexAsynch
modified:
storage/ndb/test/ndbapi/flexAsynch.cpp
3787 Mikael Ronstrom 2012-03-02
Fixes for flexAsynch
modified:
storage/ndb/test/ndbapi/flexAsynch.cpp
3786 Mikael Ronstrom 2012-03-02
Fixes for flexAsynch
modified:
storage/ndb/test/ndbapi/flexAsynch.cpp
3785 Mikael Ronstrom 2012-03-02
Fixes for flexAsynch
modified:
storage/ndb/test/ndbapi/flexAsynch.cpp
3784 Mikael Ronstrom 2012-03-02
Fixes for flexAsynch
modified:
storage/ndb/test/ndbapi/flexAsynch.cpp
3783 Mikael Ronstrom 2012-03-02
Fixes for flexAsynch
modified:
storage/ndb/test/ndbapi/flexAsynch.cpp
3782 Mikael Ronstrom 2012-03-02
Fixes for flexAsynch
modified:
storage/ndb/test/ndbapi/flexAsynch.cpp
3781 Mikael Ronstrom 2012-03-02
Fixes for flexAsynch
modified:
storage/ndb/test/ndbapi/flexAsynch.cpp
3780 Mikael Ronstrom 2012-03-02
Fixes for flexAsynch
modified:
storage/ndb/test/ndbapi/flexAsynch.cpp
3779 Mikael Ronstrom 2012-03-02
Fixes for flexAsynch
modified:
storage/ndb/test/ndbapi/flexAsynch.cpp
3778 Mikael Ronstrom 2012-03-02
Fixes for flexAsynch
modified:
storage/ndb/test/ndbapi/flexAsynch.cpp
3777 Mikael Ronstrom 2012-03-01
Fixes for flexAsynch
modified:
storage/ndb/test/ndbapi/flexAsynch.cpp
3776 Mikael Ronstrom 2012-03-01
Fixes for flexAsynch
modified:
storage/ndb/test/ndbapi/flexAsynch.cpp
3775 Mikael Ronstrom 2012-03-01
Fixes for flexAsynch
modified:
storage/ndb/test/ndbapi/flexAsynch.cpp
3774 Mikael Ronstrom 2012-03-01
Fixes for flexAsynch
modified:
storage/ndb/test/ndbapi/flexAsynch.cpp
3773 Mikael Ronstrom 2012-03-01
Fixes for flexAsynch
modified:
storage/ndb/test/ndbapi/flexAsynch.cpp
3772 Mikael Ronstrom 2012-03-01
Fixes for flexAsynch
modified:
storage/ndb/test/ndbapi/flexAsynch.cpp
3771 Mikael Ronstrom 2012-03-01
Fixes for flexAsynch
modified:
storage/ndb/test/ndbapi/flexAsynch.cpp
3770 Mikael Ronstrom 2012-03-01
Fixes for flexAsynch
modified:
storage/ndb/test/ndbapi/flexAsynch.cpp
3769 Mikael Ronstrom 2012-03-01
Fixes for flexAsynch
modified:
storage/ndb/test/ndbapi/flexAsynch.cpp
3768 Mikael Ronstrom 2012-03-01
Fixes for flexAsynch
modified:
storage/ndb/test/ndbapi/flexAsynch.cpp
3767 Mikael Ronstrom 2012-03-01
Fixes for flexAsynch
modified:
storage/ndb/test/ndbapi/flexAsynch.cpp
3766 Mikael Ronstrom 2012-03-01
Fixes for flexAsynch
modified:
storage/ndb/test/ndbapi/flexAsynch.cpp
3765 Mikael Ronstrom 2012-03-01
Fixes for flexAsynch
modified:
storage/ndb/test/ndbapi/flexAsynch.cpp
3764 Mikael Ronstrom 2012-03-01
Fixes for flexAsynch
modified:
storage/ndb/test/ndbapi/flexAsynch.cpp
3763 Mikael Ronstrom 2012-03-01
Fixes for flexAsynch
modified:
storage/ndb/test/ndbapi/flexAsynch.cpp
3762 Mikael Ronstrom 2012-03-01
Fixes for debug printouts
modified:
storage/ndb/test/ndbapi/flexAsynch.cpp
3761 Mikael Ronstrom 2012-03-01
Fixes for debug printouts
modified:
storage/ndb/test/ndbapi/flexAsynch.cpp
3760 Mikael Ronstrom 2012-03-01
Make source distributions compilable with debug
modified:
cmake/maintainer.cmake
3759 Mikael Ronstrom 2012-03-01
More debug printouts for new flexAsynch
modified:
storage/ndb/test/ndbapi/flexAsynch.cpp
3758 Mikael Ronström 2012-02-29
Signal to wake waiting executor threads
modified:
storage/ndb/test/ndbapi/flexAsynch.cpp
=== modified file 'cmake/maintainer.cmake'
--- a/cmake/maintainer.cmake revid:mikael@dator9-20120229143837-wp3s7fcdo5bqnykz
+++ b/cmake/maintainer.cmake revid:mikael.ronstrom@stripped20120302185524-mfpfeas69q69kqyy
@@ -18,7 +18,7 @@ INCLUDE(CheckCCompilerFlag)
# Setup GCC (GNU C compiler) warning options.
MACRO(SET_MYSQL_MAINTAINER_GNU_C_OPTIONS)
SET(MY_MAINTAINER_WARNINGS
- "-Wall -Wextra -Wunused -Wwrite-strings -Wno-strict-aliasing -Werror")
+ "-Wall -Wextra -Wunused -Wwrite-strings -Wno-strict-aliasing")
CHECK_C_COMPILER_FLAG("-Wdeclaration-after-statement"
HAVE_DECLARATION_AFTER_STATEMENT)
IF(HAVE_DECLARATION_AFTER_STATEMENT)
=== modified file 'storage/ndb/test/ndbapi/flexAsynch.cpp'
--- a/storage/ndb/test/ndbapi/flexAsynch.cpp revid:mikael@dator9-20120229143837-wp3s7fcdo5bqnykz
+++ b/storage/ndb/test/ndbapi/flexAsynch.cpp revid:mikael.ronstrom@stripped0302185524-mfpfeas69q69kqyy
@@ -56,6 +56,9 @@
#define MAXATTRSIZE 1000
#define PKSIZE 2
+#define DEB_PRINT(a) ndbout_c a
+//#define DEB_PRINT(a)
+
enum StartType {
stIdle = 0,
stInsert = 1,
@@ -101,9 +104,10 @@ static void executeCallback(int result,
static bool error_handler(const NdbError & err);
static void input_error();
static Uint32 get_my_node_id(Uint32 tableNo, Uint32 threadNo);
-static void main_thread(RunType run_type);
-static void run_old_flexAsynch(ThreadNdb *pThreadData, NdbTimer & timer);
+static void main_thread(RunType run_type, NdbTimer & timer);
+static Uint64 get_total_transactions();
+static void run_old_flexAsynch(ThreadNdb *pThreadData, NdbTimer & timer);
static int retry_opt = 3 ;
static int failed = 0 ;
@@ -153,7 +157,6 @@ static int
static int theStdTableNameFlag = 0;
static int theTableCreateFlag = 0;
static int tConnections = 1;
-static int tStop = 0;
#define START_REAL_TIME
#define STOP_REAL_TIME
@@ -186,13 +189,13 @@ resetThreads(){
}
static void
-waitForThreads(void)
+waitForThreads(Uint32 num_threads_to_wait_for)
{
int cont = 0;
do {
cont = 0;
NdbSleep_MilliSleep(20);
- for (unsigned i = 0; i < tNoOfThreads ; i++) {
+ for (unsigned i = 0; i < num_threads_to_wait_for ; i++) {
if (ThreadReady[i] == 0) {
cont = 1;
}//if
@@ -264,7 +267,7 @@ NDB_COMMAND(flexAsynch, "flexAsynch", "f
ndbout << endl;
- NdbThread_SetConcurrencyLevel(2 + tNoOfThreads);
+ NdbThread_SetConcurrencyLevel(2 + (tNoOfThreads * 5 / 4));
/* print Setting */
flexAsynchErrorData->printSettings(ndbout);
@@ -319,7 +322,7 @@ NDB_COMMAND(flexAsynch, "flexAsynch", "f
}
}
- if (tNdbRecord)
+ if (tNdbRecord && !tNew)
{
Uint32 sz = NdbDictionary::getRecordRowLength(g_record[0]);
sz += 3;
@@ -335,7 +338,7 @@ NDB_COMMAND(flexAsynch, "flexAsynch", "f
tRunType != RunDropTable){
if (tNew)
{
- main_thread(tRunType);
+ main_thread(tRunType, timer);
}
else
{
@@ -370,27 +373,39 @@ NDB_COMMAND(flexAsynch, "flexAsynch", "f
tRunType == RunUpdate ||
tRunType == RunDelete)
{
- longlong total_transactions = 0;
- longlong exec_time;
+ Uint64 total_transactions = 0;
+ Uint64 exec_time;
- if (tRunType == RunInsert || tRunType == RunDelete) {
- total_transactions = (longlong)tNoOfTransactions;
- total_transactions *= (longlong)tNoOfThreads;
- total_transactions *= (longlong)tNoOfParallelTrans;
- } else {
- for (Uint32 i = 0; i < tNoOfThreads; i++){
- total_transactions += ThreadExecutions[i];
+ if (tNew)
+ {
+ total_transactions = get_total_transactions();
+ }
+ else
+ {
+ if (tRunType == RunInsert || tRunType == RunDelete) {
+ total_transactions = (longlong)tNoOfTransactions;
+ total_transactions *= (longlong)tNoOfThreads;
+ total_transactions *= (longlong)tNoOfParallelTrans;
+ } else {
+ for (Uint32 i = 0; i < tNoOfThreads; i++){
+ total_transactions += ThreadExecutions[i];
+ }
}
}
if (tRunType == RunInsert || tRunType == RunDelete) {
- exec_time = (longlong)timer.elapsedTime();
+ exec_time = (Uint64)timer.elapsedTime();
} else {
- exec_time = (longlong)tExecutionTime * 1000;
+ exec_time = (Uint64)tExecutionTime * 1000;
}
ndbout << "Total number of transactions is " << total_transactions;
ndbout << endl;
ndbout << "Execution time is " << exec_time << " milliseconds" << endl;
+ if (!exec_time)
+ {
+ exec_time = 1; /* Avoid floating point exception */
+ ndbout_c("Zero execution time!!!");
+ }
total_transactions = (total_transactions * 1000) / exec_time;
int trans_per_sec = (int)total_transactions;
ndbout << "Total transactions per second " << trans_per_sec << endl;
@@ -406,7 +421,7 @@ static void execute(StartType aType)
{
resetThreads();
tellThreads(aType);
- waitForThreads();
+ waitForThreads(tNoOfThreads);
}//execute()
static void*
@@ -1128,6 +1143,9 @@ setAggregateRun(void)
* NDB Record format in the NDB API.
*/
+static void* definer_thread(void *data);
+static void* executor_thread(void *data);
+
static Uint32 tNoOfExecutorThreads;
static Uint32 tNoOfDefinerThreads;
@@ -1159,70 +1177,232 @@ struct key_list_header
Uint32 num_in_list;
};
-typedef struct thread_data THREAD_DATA;
-struct thread_data
+
+typedef struct thread_data_struct THREAD_DATA;
+struct thread_data_struct
{
KEY_LIST_HEADER list_header;
char *record;
Uint32 thread_id;
+ bool ready;
+ bool stop;
+ bool start;
NdbMutex *transport_mutex;
struct NdbCondition *transport_cond;
+ struct NdbCondition *main_cond;
+ struct NdbCondition *start_cond;
};
+THREAD_DATA thread_data_array[MAX_DEFINER_THREADS + MAX_EXECUTOR_THREADS];
-static void* definer_thread(void *data);
-static void* executor_thread(void *data);
+static Uint64
+get_total_transactions()
+{
+ Uint64 total_transactions = 0;
+
+ for (Uint32 i = tNoOfDefinerThreads; i < tNoOfThreads; i++){
+ total_transactions += ThreadExecutions[i];
+ }
+ return total_transactions;
+}
static void
-wait_for_start_signal(THREAD_DATA *my_thread_data)
+init_list_headers(KEY_LIST_HEADER *list_header,
+ Uint32 num_list_headers)
+{
+ Uint32 i;
+ KEY_LIST_HEADER *list_header_ref;
+ char *list_header_ptr = (char*)list_header;
+ for (i = 0;
+ i < num_list_headers;
+ i++, list_header_ptr += sizeof(KEY_LIST_HEADER))
+ {
+ list_header_ref = (KEY_LIST_HEADER*)list_header_ptr;
+ list_header_ref->first_in_list = NULL;
+ list_header_ref->last_in_list = NULL;
+ list_header_ref->num_in_list = 0;
+ }
+}
+
+static Uint32
+get_thread_id_safe(THREAD_DATA *my_thread_data)
{
NdbMutex_Lock(my_thread_data->transport_mutex);
- NdbCondition_Wait(my_thread_data->transport_cond,
- my_thread_data->transport_mutex);
+ Uint32 thread_id = my_thread_data->thread_id;
NdbMutex_Unlock(my_thread_data->transport_mutex);
+ return thread_id;
}
static void
-signal_thread_to_start(THREAD_DATA *thread_data)
+wait_thread_ready(THREAD_DATA *my_thread_data)
{
- NdbMutex_Lock(thread_data->transport_mutex);
- NdbCondition_Signal(thread_data->transport_cond);
- NdbMutex_Unlock(thread_data->transport_mutex);
+ NdbMutex_Lock(my_thread_data->transport_mutex);
+ while (1)
+ {
+ if (my_thread_data->ready)
+ break;
+ NdbCondition_Wait(my_thread_data->main_cond,
+ my_thread_data->transport_mutex);
+ }
+ NdbMutex_Unlock(my_thread_data->transport_mutex);
}
static void
-start_definer_thread(THREAD_DATA *thread_data, Uint32 thread_id)
+wait_for_threads_ready(Uint32 num_threads)
{
- thread_data->thread_id = thread_id;
+ for (Uint32 i = 0; i < num_threads; i++)
+ wait_thread_ready(&thread_data_array[i]);
+}
+
+static void
+signal_thread_to_start(THREAD_DATA *my_thread_data)
+{
+ NdbMutex_Lock(my_thread_data->transport_mutex);
+ my_thread_data->start = true;
+ my_thread_data->ready = false;
+ NdbCondition_Signal(my_thread_data->start_cond);
+ NdbMutex_Unlock(my_thread_data->transport_mutex);
+}
+
+static void
+signal_definer_threads_to_start()
+{
+ for (Uint32 i = 0; i < tNoOfDefinerThreads; i++)
+ signal_thread_to_start(&thread_data_array[i]);
+}
+
+static void
+signal_executor_threads_to_start()
+{
+ for (Uint32 i = 0; i < tNoOfExecutorThreads; i++)
+ signal_thread_to_start(&thread_data_array[tNoOfDefinerThreads + i]);
+}
+
+static void
+signal_thread_ready_wait_for_start(THREAD_DATA *my_thread_data)
+{
+ NdbMutex_Lock(my_thread_data->transport_mutex);
+ my_thread_data->ready = true;
+ NdbCondition_Signal(my_thread_data->main_cond);
+ while (1)
+ {
+ if (my_thread_data->start)
+ break;
+ NdbCondition_Wait(my_thread_data->start_cond,
+ my_thread_data->transport_mutex);
+ }
+ my_thread_data->start = false;
+ NdbMutex_Unlock(my_thread_data->transport_mutex);
+}
+
+static void
+signal_thread_to_stop(THREAD_DATA *my_thread_data)
+{
+ NdbMutex_Lock(my_thread_data->transport_mutex);
+ my_thread_data->stop = true;
+ NdbCondition_Signal(my_thread_data->transport_cond);
+ NdbMutex_Unlock(my_thread_data->transport_mutex);
+}
+
+static void
+signal_definer_threads_to_stop()
+{
+ for (Uint32 i = 0; i < tNoOfDefinerThreads; i++)
+ signal_thread_to_stop(&thread_data_array[i]);
+}
+
+static void
+signal_executor_threads_to_stop()
+{
+ for (Uint32 i = tNoOfDefinerThreads; i < tNoOfThreads; i++)
+ signal_thread_to_stop(&thread_data_array[i]);
+}
+
+static void
+destroy_thread_data(THREAD_DATA *my_thread_data)
+{
+ free(my_thread_data->record);
+ NdbMutex_Destroy(my_thread_data->transport_mutex);
+ NdbCondition_Destroy(my_thread_data->transport_cond);
+ NdbCondition_Destroy(my_thread_data->start_cond);
+ NdbCondition_Destroy(my_thread_data->main_cond);
+}
+
+static void
+init_thread_data(THREAD_DATA *my_thread_data, Uint32 thread_id)
+{
+ Uint32 sz = NdbDictionary::getRecordRowLength(g_record[0]);
+ my_thread_data->record = (char*)malloc(sz);
+ memset(my_thread_data->record, 0, sz);
+ init_list_headers(&my_thread_data->list_header, 1);
+ my_thread_data->stop = false;
+ my_thread_data->ready = false;
+ my_thread_data->start = false;
+ my_thread_data->transport_mutex = NdbMutex_Create();
+ my_thread_data->transport_cond = NdbCondition_Create();
+ my_thread_data->main_cond = NdbCondition_Create();
+ my_thread_data->start_cond = NdbCondition_Create();
+ NdbMutex_Lock(my_thread_data->transport_mutex);
+ my_thread_data->thread_id = thread_id;
+ NdbMutex_Unlock(my_thread_data->transport_mutex);
+}
+
+static void
+start_definer_thread(THREAD_DATA *my_thread_data, Uint32 thread_id)
+{
+ init_thread_data(my_thread_data, thread_id);
+ DEB_PRINT(("Now starting definer thread, id = %u", thread_id));
threadLife[thread_id] = NdbThread_Create(definer_thread,
- (void**)thread_data,
+ (void**)my_thread_data,
1024 * 1024,
"flexAsynchThread",
NDB_THREAD_PRIO_LOW);
}
static void
-start_executor_thread(THREAD_DATA *thread_data, Uint32 thread_id)
+start_definer_threads()
+{
+ for (Uint32 i = 0; i < tNoOfDefinerThreads; i++)
+ {
+ Uint32 thread_id = i;
+ start_definer_thread(&thread_data_array[thread_id], thread_id);
+ }
+}
+
+static void
+start_executor_thread(THREAD_DATA *my_thread_data, Uint32 thread_id)
{
- thread_data->thread_id = thread_id;
+ init_thread_data(my_thread_data, thread_id);
+ DEB_PRINT(("Now starting executor thread, id = %u", thread_id));
threadLife[thread_id] = NdbThread_Create(executor_thread,
- (void**)thread_data,
+ (void**)my_thread_data,
1024 * 1024,
"flexAsynchThread",
NDB_THREAD_PRIO_LOW);
}
-THREAD_DATA thread_data[MAX_DEFINER_THREADS + MAX_EXECUTOR_THREADS];
+static void
+start_executor_threads()
+{
+ for (Uint32 i = 0; i < tNoOfExecutorThreads; i++)
+ {
+ Uint32 thread_id = tNoOfDefinerThreads + i;
+ start_executor_thread(&thread_data_array[thread_id], thread_id);
+ }
+}
static void
-main_thread(RunType start_type)
+main_thread(RunType start_type, NdbTimer & timer)
{
bool insert_delete;
+ DEB_PRINT(("Executing type = %u in new manner", (uint)start_type));
tNoOfExecutorThreads = tNoOfThreads;
tNoOfDefinerThreads = (tNoOfThreads + 3)/4;
tNoOfThreads = tNoOfExecutorThreads + tNoOfDefinerThreads;
- resetThreads();
+ DEB_PRINT((
+ "num threads = %u, num definer threads = %u, num executor threads = %u",
+ tNoOfThreads, tNoOfDefinerThreads, tNoOfExecutorThreads));
if (start_type == RunInsert ||
start_type == RunDelete)
@@ -1230,49 +1410,48 @@ main_thread(RunType start_type)
else
insert_delete = false;
- for (Uint32 i = 0; i < tNoOfDefinerThreads; i++)
- {
- Uint32 thread_id = i;
- start_definer_thread(&thread_data[thread_id], thread_id);
- }
- for (Uint32 i = 0; i < tNoOfExecutorThreads; i++)
- {
- Uint32 thread_id = tNoOfDefinerThreads + i;
- start_executor_thread(&thread_data[thread_id], thread_id);
- }
- waitForThreads(); /* Wait for threads to complete startup */
+ start_definer_threads();
+ start_executor_threads();
+
+ wait_for_threads_ready(tNoOfThreads);
+
+ DEB_PRINT(("main: Definer and execution threads have completed startup"));
/**
* Start threads, start with execution threads to ensure they are
* up and running before definer threads starts sending data to
* them
*/
- for (Uint32 i = 0; i < tNoOfExecutorThreads; i++)
- signal_thread_to_start(&thread_data[tNoOfDefinerThreads + i]);
- for (Uint32 i = 0; i < tNoOfDefinerThreads; i++)
- signal_thread_to_start(&thread_data[i]);
+ START_TIMER;
+ signal_definer_threads_to_start();
+ signal_executor_threads_to_start();
- if (insert_delete)
- {
- waitForThreads();
- }
- else
+ if (!insert_delete)
{
sleep(tWarmupTime);
tRunState = EXECUTING;
sleep(tExecutionTime);
tRunState = COOLDOWN;
sleep(tCooldownTime);
- tStop = true;
- waitForThreads();
+ signal_definer_threads_to_stop();
}
+ wait_for_threads_ready(tNoOfDefinerThreads);
+ STOP_TIMER;
+ DEB_PRINT(("main: Definer threads are done"));
+ signal_executor_threads_to_stop();
+ wait_for_threads_ready(tNoOfThreads);
+ DEB_PRINT(("main: Definer and Executor threads are done"));
+
+ signal_definer_threads_to_start();
+ signal_executor_threads_to_start();
void * tmp;
- for(Uint32 i = 0; i < (tNoOfDefinerThreads + tNoOfExecutorThreads); i++)
+ for(Uint32 i = 0; i < tNoOfThreads; i++)
{
NdbThread_WaitFor(threadLife[i], &tmp);
NdbThread_Destroy(&threadLife[i]);
}
+ DEB_PRINT(("main: Definer and Executor threads are gone"));
}
static NdbConnection*
@@ -1336,43 +1515,6 @@ get_first_free(KEY_LIST_HEADER *list_hea
return key_op;
}
-static void
-init_list_headers(KEY_LIST_HEADER *list_header,
- Uint32 num_list_headers)
-{
- Uint32 i;
- KEY_LIST_HEADER *list_header_ref;
- char *list_header_ptr = (char*)list_header;
- for (i = 0;
- i < num_list_headers;
- list_header_ptr += sizeof(KEY_LIST_HEADER))
- {
- list_header_ref = (KEY_LIST_HEADER*)list_header_ptr;
- list_header_ref->first_in_list = NULL;
- list_header_ref->last_in_list = NULL;
- list_header_ref->num_in_list = 0;
- }
-}
-
-static void
-init_thread_data(THREAD_DATA *thread_data)
-{
- Uint32 sz = NdbDictionary::getRecordRowLength(g_record[0]);
- thread_data->record = (char*)malloc(sz);
- memset(&thread_data->record, 0, sz);
- init_list_headers(&thread_data->list_header, 1);
- thread_data->transport_mutex = NdbMutex_Create();
- thread_data->transport_cond = NdbCondition_Create();
-}
-
-static void
-destroy_thread_data(THREAD_DATA *thread_data)
-{
- free(thread_data->record);
- NdbMutex_Destroy(thread_data->transport_mutex);
- NdbCondition_Destroy(thread_data->transport_cond);
-}
-
/**
* Retrieve a linked list of prepared operations. If no operations
* prepared we wait on a condition until operations are defined for
@@ -1386,7 +1528,7 @@ receive_operations(THREAD_DATA *my_threa
KEY_LIST_HEADER *thread_list_header = &my_thread_data->list_header;
NdbMutex_Lock(my_thread_data->transport_mutex);
- while (!first_in_list)
+ while (!first_in_list && !my_thread_data->stop)
{
first_in_list = thread_list_header->first_in_list;
list_header->first_in_list = thread_list_header->first_in_list;
@@ -1428,8 +1570,10 @@ static void
send_operations(Uint32 thread_id,
KEY_LIST_HEADER *list_header)
{
- THREAD_DATA *recv_thread = &thread_data[thread_id];
+ THREAD_DATA *recv_thread = &thread_data_array[thread_id];
+ DEB_PRINT(("Sending %u operations to thread id %u",
+ list_header->num_in_list, thread_id));
NdbMutex_Lock(recv_thread->transport_mutex);
if (!recv_thread->list_header.first_in_list &&
list_header->first_in_list)
@@ -1475,6 +1619,7 @@ get_thread_id_for_record(Uint32 record_i
Uint32 node_count,
Uint32 thread_count,
Uint32 thread_group,
+ Uint32 num_thread_groups,
Ndb *my_ndb)
{
Uint32 thread_id;
@@ -1482,11 +1627,17 @@ get_thread_id_for_record(Uint32 record_i
Uint32 node_id = trans->getConnectedNodeId();
trans->close();
Uint32 node_rel_id = get_node_relative_id((Uint32)0, node_id);
+ if (node_count >= thread_count)
+ {
+ thread_id = node_rel_id % thread_count;
+ return thread_id;
+ }
+
recalculate:
thread_id = thread_group * node_count + node_rel_id;
- if (thread_id > thread_count)
+ if (thread_id >= thread_count)
{
- thread_group = rand() % thread_group;
+ thread_group = rand() % (num_thread_groups - 1);
goto recalculate;
}
return thread_id;
@@ -1499,7 +1650,7 @@ void init_thread_id_mem(char *thread_id_
{
Uint32 node_count = get_node_count((Uint32)0);
Uint32 thread_count = tNoOfExecutorThreads;
- Uint32 thread_groups = (thread_count + node_count - 1) / node_count;
+ Uint32 num_thread_groups = (thread_count + node_count - 1) / node_count;
Uint32 thread_group = 0;
for (Uint32 record_id = first_record, i = 0;
i < total_records;
@@ -1509,9 +1660,10 @@ void init_thread_id_mem(char *thread_id_
node_count,
thread_count,
thread_group,
+ num_thread_groups,
my_ndb);
thread_group++;
- if (thread_group == thread_groups)
+ if (thread_group == num_thread_groups)
thread_group = 0;
}
}
@@ -1519,7 +1671,7 @@ void init_thread_id_mem(char *thread_id_
static bool
check_for_outstanding(Uint32 *thread_state)
{
- for (Uint32 i = 0; i < MAX_EXECUTOR_THREADS; i++)
+ for (Uint32 i = 0; i < tNoOfExecutorThreads; i++)
{
if (thread_state[i])
return true;
@@ -1548,9 +1700,11 @@ wait_until_all_completed(THREAD_DATA *my
{
KEY_LIST_HEADER list_header;
bool outstanding = true;
- while (outstanding)
+ while (outstanding && !my_thread_data->stop)
{
receive_operations(my_thread_data, &list_header);
+ DEB_PRINT(("received %u operations in thread id = %u",
+ list_header.num_in_list, my_thread_data->thread_id));
update_thread_state(&list_header, thread_state);
move_list(&list_header, free_list_header);
outstanding = check_for_outstanding(thread_state);
@@ -1559,31 +1713,31 @@ wait_until_all_completed(THREAD_DATA *my
static Uint32
prepare_operations(char *thread_id_mem,
- KEY_LIST_HEADER *list_header,
+ KEY_LIST_HEADER *free_list_header,
Uint32 *thread_state,
Uint32 first_record_to_define,
Uint32 num_records_to_define,
+ Uint32 first_record,
Uint32 last_record,
Uint32 max_per_thread)
{
KEY_LIST_HEADER thread_list_headers[MAX_EXECUTOR_THREADS];
Uint32 record_id, i, num_records;
- init_list_headers(&thread_list_headers[0],
- MAX_EXECUTOR_THREADS);
+ init_list_headers(&thread_list_headers[0], tNoOfExecutorThreads);
for (record_id = first_record_to_define, i = 0;
- record_id < last_record && i < num_records_to_define;
+ record_id <= last_record && i < num_records_to_define;
record_id++, i++)
{
- KEY_OPERATION *define_op = get_first_free(list_header);
- Uint32 thread_id = (Uint32)thread_id_mem[record_id];
+ KEY_OPERATION *define_op = get_first_free(free_list_header);
+ Uint32 thread_id = (Uint32)thread_id_mem[record_id - first_record];
define_op->first_key = record_id;
define_op->second_key = record_id;
define_op->executor_thread_id = thread_id;
thread_state[thread_id]++;
- KEY_LIST_HEADER *list_header = &thread_list_headers[thread_id];
- insert_list(list_header, define_op);
- if (list_header->num_in_list >= max_per_thread)
+ KEY_LIST_HEADER *thread_list_header = &thread_list_headers[thread_id];
+ insert_list(thread_list_header, define_op);
+ if (thread_list_header->num_in_list >= max_per_thread)
{
/**
* One thread has max number of records, we won't define any
@@ -1593,12 +1747,12 @@ prepare_operations(char *thread_id_mem,
}
}
num_records = i;
- for (i = 0; i < MAX_EXECUTOR_THREADS; i++)
+ for (i = 0; i < tNoOfExecutorThreads; i++)
{
- KEY_LIST_HEADER *list_header = &thread_list_headers[i];
- if (list_header->num_in_list)
+ KEY_LIST_HEADER *thread_list_header = &thread_list_headers[i];
+ if (thread_list_header->num_in_list)
{
- send_operations(tNoOfDefinerThreads + i, list_header);
+ send_operations(tNoOfDefinerThreads + i, thread_list_header);
}
}
return num_records;
@@ -1608,12 +1762,14 @@ static void*
definer_thread(void *data)
{
THREAD_DATA *my_thread_data = (THREAD_DATA*)data;
- Uint32 my_thread_id = my_thread_data->thread_id;
+ Uint32 my_thread_id = get_thread_id_safe(my_thread_data);
+ DEB_PRINT(("definer thread: my_thread_data = 0x%llx, my_thread_id = %u",
+ (Uint64)my_thread_data, my_thread_id));
RunType run_type = tRunType;
Uint32 thread_state[MAX_EXECUTOR_THREADS];
- Uint32 max_outstanding = (tNoOfThreads * tNoOfParallelTrans) /
+ Uint32 max_outstanding = (tNoOfExecutorThreads * tNoOfParallelTrans) /
tNoOfDefinerThreads;
- Uint32 max_per_thread = 1024 / tNoOfDefinerThreads;
+ Uint32 max_per_thread = 1000 / tNoOfDefinerThreads;
Uint32 total_records = max_outstanding * tNoOfTransactions;
Uint32 first_record = total_records * my_thread_id;
Uint32 my_last_record = first_record + total_records - 1;
@@ -1621,34 +1777,41 @@ definer_thread(void *data)
KEY_LIST_HEADER free_list_header;
void *key_op_mem = malloc(sizeof(KEY_OPERATION) * max_outstanding);
char *thread_id_mem = (char*)malloc(total_records);
- Ndb *my_ndb = get_ndb_object(my_thread_id);
+ memset((char*)&thread_state[0], 0, sizeof(thread_state));
- init_list_headers(&my_thread_data->list_header, 1);
init_key_op_list((char*)key_op_mem,
&free_list_header,
max_outstanding,
my_thread_id,
run_type);
+ Ndb *my_ndb = get_ndb_object(my_thread_id);
init_thread_id_mem(thread_id_mem,
first_record,
total_records,
my_ndb);
delete my_ndb;
- init_thread_data(my_thread_data);
ThreadExecutions[my_thread_id] = 0;
- ThreadReady[my_thread_id] = 1; /* Flag we've completed startup */
- wait_for_start_signal(my_thread_data);
+ DEB_PRINT(("definer thread %u have completed startup", my_thread_id));
+ signal_thread_ready_wait_for_start(my_thread_data);
+ DEB_PRINT(("definer thread %u starts executing", my_thread_id));
- while (!tStop)
+ while (!my_thread_data->stop)
{
Uint32 defined_ops = prepare_operations(thread_id_mem,
&free_list_header,
&thread_state[0],
current_record,
max_outstanding,
+ first_record,
my_last_record,
max_per_thread);
current_record += defined_ops;
+ if (defined_ops)
+ {
+ wait_until_all_completed(my_thread_data,
+ &thread_state[0],
+ &free_list_header);
+ }
if (current_record > my_last_record)
{
if (run_type != RunRead &&
@@ -1663,14 +1826,12 @@ definer_thread(void *data)
}
current_record = first_record;
}
- wait_until_all_completed(my_thread_data,
- &thread_state[0],
- &free_list_header);
}
+ DEB_PRINT(("definer thread %u is done", my_thread_id));
+ signal_thread_ready_wait_for_start(my_thread_data);
free(key_op_mem);
free(thread_id_mem);
destroy_thread_data(my_thread_data);
- ThreadReady[my_thread_id] = 1;
return NULL;
}
@@ -1743,7 +1904,7 @@ report_back_operations(KEY_OPERATION *fi
KEY_LIST_HEADER thread_list_header[MAX_DEFINER_THREADS];
KEY_OPERATION *next_op, *executed_op;
- init_list_headers(&thread_list_header[0], MAX_DEFINER_THREADS);
+ init_list_headers(&thread_list_header[0], tNoOfDefinerThreads);
executed_op = first_defined_op;
while (executed_op)
{
@@ -1752,7 +1913,7 @@ report_back_operations(KEY_OPERATION *fi
executed_op);
executed_op = next_op;
}
- for (Uint32 i = 0; i < MAX_DEFINER_THREADS; i++)
+ for (Uint32 i = 0; i < tNoOfDefinerThreads; i++)
{
if (thread_list_header[i].first_in_list)
{
@@ -1771,34 +1932,66 @@ static void*
executor_thread(void *data)
{
THREAD_DATA *my_thread_data = (THREAD_DATA*)data;
- Uint32 my_thread_id = my_thread_data->thread_id;
+ Uint32 my_thread_id = get_thread_id_safe(my_thread_data);
Uint64 exec_count = 0;
+ Uint32 error_count = 0;
+ Uint32 error_flag = false;
int ret_code;
KEY_LIST_HEADER list_header;
- Ndb *my_ndb = get_ndb_object(my_thread_id);
- init_thread_data(my_thread_data);
+ DEB_PRINT(("executor thread: my_thread_data = 0x%llx, thread_id = %u",
+ (Uint64)my_thread_data, my_thread_id));
+ Ndb *my_ndb = get_ndb_object(my_thread_id);
ThreadExecutions[my_thread_id] = 0;
- ThreadReady[my_thread_id] = 1; /* Flag we've completed startup */
- wait_for_start_signal(my_thread_data);
+ DEB_PRINT(("executor thread id = %u have completed startup",
+ my_thread_id));
+
+ signal_thread_ready_wait_for_start(my_thread_data);
- while (!tStop)
+ DEB_PRINT(("executor thread id = %u are starting to execute",
+ my_thread_id));
+
+ while (!my_thread_data->stop)
{
receive_operations(my_thread_data, &list_header);
- ret_code = execute_operations(my_thread_data->record,
- my_ndb,
- list_header.first_in_list);
- if (ret_code < 0)
+ if (list_header.num_in_list == 0)
+ {
+ DEB_PRINT(("Stop executor thread has been requested by wakeup"));
break;
+ }
+ ret_code = 0;
+ if (!error_flag)
+ {
+ /* Ignore to execute after errors to simplify error handling */
+ ret_code = execute_operations(my_thread_data->record,
+ my_ndb,
+ list_header.first_in_list);
+ }
report_back_operations(list_header.first_in_list);
- if (tRunState == EXECUTING)
+ if (ret_code < 0)
+ {
+ ndbout_c("executor thread id = %u received error", my_thread_id);
+ error_flag = true;
+ }
+ else if (!error_flag &&
+ (tRunType == RunInsert ||
+ tRunType == RunDelete ||
+ tRunState == EXECUTING))
+ {
exec_count += (Uint64)ret_code;
+ }
}
+ DEB_PRINT(("executor thread id = %u are done executing, executed %llu rows",
+ my_thread_id, exec_count));
ThreadExecutions[my_thread_id] = exec_count;
- free(my_thread_data->record);
+ if (error_count)
+ {
+ ndbout_c("Received %u errors in executor thread, id = %u",
+ error_count, my_thread_id);
+ }
+ signal_thread_ready_wait_for_start(my_thread_data);
delete my_ndb;
destroy_thread_data(my_thread_data);
- ThreadReady[my_thread_id] = 1;
return NULL;
}
@@ -1942,6 +2135,7 @@ readArguments(int argc, const char** arg
argc++;
} else if (strcmp(argv[i], "-new") == 0){
tNew = true;
+ tNdbRecord = true;
argc++;
i--;
} else if (strcmp(argv[i], "-drop_table") == 0){
No bundle (reason: useless for push emails).| Thread |
|---|
| • bzr push into mysql-5.5-cluster-7.2 branch (mikael.ronstrom:3758 to 3792) | Mikael Ronstrom | 5 Mar |