3801 Mikael Ronstrom 2012-03-05
Fixes for flexAsynch
modified:
storage/ndb/test/ndbapi/flexAsynch.cpp
3800 Mikael Ronstrom 2012-03-05
Fixes for flexAsynch
modified:
storage/ndb/test/ndbapi/flexAsynch.cpp
3799 Mikael Ronstrom 2012-03-05
Fixes for flexAsynch
modified:
storage/ndb/test/ndbapi/flexAsynch.cpp
3798 Mikael Ronstrom 2012-03-05
Fixes for flexAsynch
modified:
storage/ndb/test/ndbapi/flexAsynch.cpp
3797 Mikael Ronstrom 2012-03-05
Fixes for flexAsynch
modified:
storage/ndb/test/ndbapi/flexAsynch.cpp
3796 Mikael Ronstrom 2012-03-05
Fixes for flexAsynch
modified:
storage/ndb/test/ndbapi/flexAsynch.cpp
3795 Mikael Ronstrom 2012-03-05
Fixes for flexAsynch
modified:
storage/ndb/test/ndbapi/flexAsynch.cpp
3794 Mikael Ronstrom 2012-03-05
Fixes for flexAsynch
modified:
storage/ndb/test/ndbapi/flexAsynch.cpp
3793 Mikael Ronstrom 2012-03-05
Fixes for flexAsynch
modified:
storage/ndb/test/ndbapi/flexAsynch.cpp
3792 Mikael Ronstrom 2012-03-02
Fixes for flexAsynch
modified:
storage/ndb/test/ndbapi/flexAsynch.cpp
=== modified file 'storage/ndb/test/ndbapi/flexAsynch.cpp'
--- a/storage/ndb/test/ndbapi/flexAsynch.cpp revid:mikael.ronstrom@stripped
+++ b/storage/ndb/test/ndbapi/flexAsynch.cpp revid:mikael.ronstrom@stripped
@@ -148,6 +148,7 @@ static bool tempTabl
static bool startTransGuess = true;
static int tExtraReadLoop = 0;
static bool tNew = false;
+static bool tImmediate = false;
//Program Flags
static int theTestFlag = 0;
@@ -720,24 +721,24 @@ executeCallback(int result, NdbConnectio
NdbConnection **array_ref = (NdbConnection**)aObject;
assert(NdbObject == *array_ref);
*array_ref = NULL;
- if (result == -1) {
+ if (result == -1 && failed < 100) {
- // Add complete error handling here
+ // Add complete error handling here
- int retCode = flexAsynchErrorData->handleErrorCommon(NdbObject->getNdbError());
- if (retCode == 1) {
- if (NdbObject->getNdbError().code != 626 && NdbObject->getNdbError().code != 630){
- ndbout_c("execute: %s", NdbObject->getNdbError().message);
- ndbout_c("Error code = %d", NdbObject->getNdbError().code);}
- } else if (retCode == 2) {
- ndbout << "4115 should not happen in flexAsynch" << endl;
- } else if (retCode == 3) {
- /* What can we do here? */
- ndbout_c("execute: %s", NdbObject->getNdbError().message);
- }//if(retCode == 3)
-
- // ndbout << "Error occured in poll:" << endl;
- // ndbout << NdbObject->getNdbError() << endl;
+ int retCode = flexAsynchErrorData->handleErrorCommon(NdbObject->getNdbError());
+ if (retCode == 1) {
+ if (NdbObject->getNdbError().code != 626 && NdbObject->getNdbError().code != 630){
+ ndbout_c("execute: %s", NdbObject->getNdbError().message);
+ ndbout_c("Error code = %d", NdbObject->getNdbError().code);
+ }
+ } else if (retCode == 2) {
+ ndbout << "4115 should not happen in flexAsynch" << endl;
+ } else if (retCode == 3) {
+ /* What can we do here? */
+ ndbout_c("execute: %s", NdbObject->getNdbError().message);
+ }//if(retCode == 3)
+ // ndbout << "Error occured in poll:" << endl;
+ // ndbout << NdbObject->getNdbError() << endl;
failed++ ;
}//if
NdbObject->close(); /* Close transaction */
@@ -1146,8 +1147,8 @@ setAggregateRun(void)
static void* definer_thread(void *data);
static void* executor_thread(void *data);
-static Uint32 tNoOfExecutorThreads;
-static Uint32 tNoOfDefinerThreads;
+static Uint32 tNoOfExecutorThreads = 0;
+static Uint32 tNoOfDefinerThreads = 0;
enum RunState
{
@@ -1182,15 +1183,17 @@ typedef struct thread_data_struct THREAD
struct thread_data_struct
{
KEY_LIST_HEADER list_header;
- char *record;
Uint32 thread_id;
bool ready;
bool stop;
bool start;
+
+ char *record;
NdbMutex *transport_mutex;
struct NdbCondition *transport_cond;
struct NdbCondition *main_cond;
struct NdbCondition *start_cond;
+ char not_used[52];
};
THREAD_DATA thread_data_array[MAX_DEFINER_THREADS + MAX_EXECUTOR_THREADS];
@@ -1350,7 +1353,6 @@ static void
start_definer_thread(THREAD_DATA *my_thread_data, Uint32 thread_id)
{
init_thread_data(my_thread_data, thread_id);
- DEB_PRINT(("Now starting definer thread, id = %u", thread_id));
threadLife[thread_id] = NdbThread_Create(definer_thread,
(void**)my_thread_data,
1024 * 1024,
@@ -1372,7 +1374,6 @@ static void
start_executor_thread(THREAD_DATA *my_thread_data, Uint32 thread_id)
{
init_thread_data(my_thread_data, thread_id);
- DEB_PRINT(("Now starting executor thread, id = %u", thread_id));
threadLife[thread_id] = NdbThread_Create(executor_thread,
(void**)my_thread_data,
1024 * 1024,
@@ -1395,15 +1396,13 @@ main_thread(RunType start_type, NdbTimer
{
bool insert_delete;
- DEB_PRINT(("Executing type = %u in new manner", (uint)start_type));
tNoOfExecutorThreads = tNoOfThreads;
- tNoOfDefinerThreads = (tNoOfThreads + 3)/4;
+ if (tNoOfDefinerThreads == 0)
+ {
+ tNoOfDefinerThreads = (tNoOfThreads + 3)/4;
+ }
tNoOfThreads = tNoOfExecutorThreads + tNoOfDefinerThreads;
- DEB_PRINT((
- "num threads = %u, num definer threads = %u, num executor threads = %u",
- tNoOfThreads, tNoOfDefinerThreads, tNoOfExecutorThreads));
-
if (start_type == RunInsert ||
start_type == RunDelete)
insert_delete = true;
@@ -1451,7 +1450,6 @@ main_thread(RunType start_type, NdbTimer
NdbThread_WaitFor(threadLife[i], &tmp);
NdbThread_Destroy(&threadLife[i]);
}
- DEB_PRINT(("main: Definer and Executor threads are gone"));
}
static NdbConnection*
@@ -1515,36 +1513,6 @@ get_first_free(KEY_LIST_HEADER *list_hea
return key_op;
}
-/**
- * Retrieve a linked list of prepared operations. If no operations
- * prepared we wait on a condition until operations are defined for
- * us to execute.
- */
-static void
-receive_operations(THREAD_DATA *my_thread_data,
- KEY_LIST_HEADER *list_header)
-{
- KEY_OPERATION *first_in_list = NULL;
- KEY_LIST_HEADER *thread_list_header = &my_thread_data->list_header;
-
- NdbMutex_Lock(my_thread_data->transport_mutex);
- while (!first_in_list && !my_thread_data->stop)
- {
- first_in_list = thread_list_header->first_in_list;
- list_header->first_in_list = thread_list_header->first_in_list;
- list_header->last_in_list = thread_list_header->last_in_list;
- list_header->num_in_list = thread_list_header->num_in_list;
- thread_list_header->first_in_list = NULL;
- thread_list_header->last_in_list = NULL;
- thread_list_header->num_in_list = 0;
- if (first_in_list)
- break;
- NdbCondition_Wait(my_thread_data->transport_cond,
- my_thread_data->transport_mutex);
- }
- NdbMutex_Unlock(my_thread_data->transport_mutex);
-}
-
static void
move_list(KEY_LIST_HEADER *src_list_header,
KEY_LIST_HEADER *dst_list_header)
@@ -1564,6 +1532,53 @@ move_list(KEY_LIST_HEADER *src_list_head
dst_list_header->last_in_list = src_list_header->last_in_list;
dst_list_header->num_in_list += src_list_header->num_in_list;
src_list_header->num_in_list = 0;
+ src_list_header->first_in_list = NULL;
+ src_list_header->last_in_list = NULL;
+}
+
+/**
+ * Retrieve a linked list of prepared operations. If no operations
+ * prepared we wait on a condition until operations are defined for
+ * us to execute.
+ */
+static void
+receive_operations(THREAD_DATA *my_thread_data,
+ KEY_LIST_HEADER *list_header,
+ bool wait)
+{
+ bool first = true;
+ KEY_LIST_HEADER *thread_list_header = &my_thread_data->list_header;
+ list_header->first_in_list = NULL;
+ list_header->last_in_list = NULL;
+ list_header->num_in_list = 0;
+
+recheck:
+ NdbMutex_Lock(my_thread_data->transport_mutex);
+ while (!my_thread_data->stop &&
+ (first || thread_list_header->first_in_list))
+ {
+ move_list(thread_list_header, list_header);
+ if (list_header->first_in_list)
+ break;
+ NdbCondition_Wait(my_thread_data->transport_cond,
+ my_thread_data->transport_mutex);
+ first = false;
+ }
+ NdbMutex_Unlock(my_thread_data->transport_mutex);
+ if (first && wait &&
+ thread_list_header->num_in_list < ((tNoOfParallelTrans + 1) / 2))
+ {
+ /**
+ * We will wait for at least 20 microseconds if we haven't yet received
+ * at least half of the number of records we desire to execute.
+ */
+ struct timespec timer;
+ timer.tv_sec = 0;
+ timer.tv_nsec = 20000;
+ first = false;
+ nanosleep(&timer, NULL);
+ goto recheck;
+ }
}
static void
@@ -1572,20 +1587,14 @@ send_operations(Uint32 thread_id,
{
THREAD_DATA *recv_thread = &thread_data_array[thread_id];
- DEB_PRINT(("Sending %u operations to thread id %u",
- list_header->num_in_list, thread_id));
NdbMutex_Lock(recv_thread->transport_mutex);
- if (!recv_thread->list_header.first_in_list &&
- list_header->first_in_list)
- {
- /**
- * We are moving operations into an empty list, thus we need
- * to wake any threads waiting for operations to execute.
- */
- NdbCondition_Signal(recv_thread->transport_cond);
- }
+ /**
+ * We are moving operations into the list, thus we need
+ * to wake any threads waiting for operations to execute.
+ */
move_list(list_header,
&recv_thread->list_header);
+ NdbCondition_Signal(recv_thread->transport_cond);
NdbMutex_Unlock(recv_thread->transport_mutex);
}
@@ -1702,9 +1711,7 @@ wait_until_all_completed(THREAD_DATA *my
bool outstanding = true;
while (outstanding && !my_thread_data->stop)
{
- receive_operations(my_thread_data, &list_header);
- DEB_PRINT(("received %u operations in thread id = %u",
- list_header.num_in_list, my_thread_data->thread_id));
+ receive_operations(my_thread_data, &list_header, false);
update_thread_state(&list_header, thread_state);
move_list(&list_header, free_list_header);
outstanding = check_for_outstanding(thread_state);
@@ -1763,8 +1770,6 @@ definer_thread(void *data)
{
THREAD_DATA *my_thread_data = (THREAD_DATA*)data;
Uint32 my_thread_id = get_thread_id_safe(my_thread_data);
- DEB_PRINT(("definer thread: my_thread_data = 0x%llx, my_thread_id = %u",
- (Uint64)my_thread_data, my_thread_id));
RunType run_type = tRunType;
Uint32 thread_state[MAX_EXECUTOR_THREADS];
Uint32 max_outstanding = (tNoOfExecutorThreads * tNoOfParallelTrans) /
@@ -1791,9 +1796,7 @@ definer_thread(void *data)
my_ndb);
delete my_ndb;
ThreadExecutions[my_thread_id] = 0;
- DEB_PRINT(("definer thread %u have completed startup", my_thread_id));
signal_thread_ready_wait_for_start(my_thread_data);
- DEB_PRINT(("definer thread %u starts executing", my_thread_id));
while (!my_thread_data->stop)
{
@@ -1827,7 +1830,6 @@ definer_thread(void *data)
current_record = first_record;
}
}
- DEB_PRINT(("definer thread %u is done", my_thread_id));
signal_thread_ready_wait_for_start(my_thread_data);
free(key_op_mem);
free(thread_id_mem);
@@ -1935,28 +1937,21 @@ executor_thread(void *data)
Uint32 my_thread_id = get_thread_id_safe(my_thread_data);
Uint64 exec_count = 0;
Uint32 error_count = 0;
+ Uint32 executions = 0;
Uint32 error_flag = false;
int ret_code;
KEY_LIST_HEADER list_header;
- DEB_PRINT(("executor thread: my_thread_data = 0x%llx, thread_id = %u",
- (Uint64)my_thread_data, my_thread_id));
Ndb *my_ndb = get_ndb_object(my_thread_id);
ThreadExecutions[my_thread_id] = 0;
- DEB_PRINT(("executor thread id = %u have completed startup",
- my_thread_id));
signal_thread_ready_wait_for_start(my_thread_data);
- DEB_PRINT(("executor thread id = %u are starting to execute",
- my_thread_id));
-
while (!my_thread_data->stop)
{
- receive_operations(my_thread_data, &list_header);
+ receive_operations(my_thread_data, &list_header, !tImmediate);
if (list_header.num_in_list == 0)
{
- DEB_PRINT(("Stop executor thread has been requested by wakeup"));
break;
}
ret_code = 0;
@@ -1978,11 +1973,12 @@ executor_thread(void *data)
tRunType == RunDelete ||
tRunState == EXECUTING))
{
+ executions++;
exec_count += (Uint64)ret_code;
}
}
- DEB_PRINT(("executor thread id = %u are done executing, executed %llu rows",
- my_thread_id, exec_count));
+ DEB_PRINT(("executor thread id = %u are done executing, executed %llu rows in %u executes",
+ my_thread_id, exec_count, executions));
ThreadExecutions[my_thread_id] = exec_count;
if (error_count)
{
@@ -2009,6 +2005,12 @@ readArguments(int argc, const char** arg
ndbout_c("Invalid no of threads");
return -1;
}
+ } else if (strcmp(argv[i], "-d") == 0){
+ tNoOfDefinerThreads = atoi(argv[i+1]);
+ if (tNoOfDefinerThreads > NDB_MAXTHREADS){
+ ndbout_c("Invalid no of definer threads");
+ return -1;
+ }
} else if (strcmp(argv[i], "-p") == 0){
tNoOfParallelTrans = atoi(argv[i+1]);
if ((tNoOfParallelTrans < 1) || (tNoOfParallelTrans > MAXPAR)){
@@ -2138,6 +2140,10 @@ readArguments(int argc, const char** arg
tNdbRecord = true;
argc++;
i--;
+ } else if (strcmp(argv[i], "-immediate") == 0){
+ tImmediate = true;
+ argc++;
+ i--;
} else if (strcmp(argv[i], "-drop_table") == 0){
tRunType = RunDropTable;
argc++;
No bundle (reason: useless for push emails).
| Thread |
|---|
| • bzr push into mysql-5.5-cluster-7.2 branch (mikael.ronstrom:3792 to 3801) | Mikael Ronstrom | 5 Mar |