4939 Mauritz Sundell 2012-06-11
ndb - flexAsynch
The revised flexAsynch program used in Mikael Ronstroms benchmarks,
spring 2012.
Part of Mikael Ronstroms "Patches used in benchmark tree with Intel"
modified:
storage/ndb/test/ndbapi/flexAsynch.cpp
4938 Mauritz Sundell 2012-06-11
ndb - NdbSleep_MicroSleep
Added sleep function with microseconds resolution.
Uses waitable timer in windows and nanosleep in posix, as
fallback it rounds up to milliseconds and uses
NdbSleep_MilliSleep
modified:
storage/ndb/include/ndb_config.h.in
storage/ndb/include/portlib/NdbSleep.h
storage/ndb/ndb_configure.cmake
4937 Mauritz Sundell 2012-06-07
ndb - fifo idle connection list
This ensures that the list of connections to the TC block is kept in a FIFO queue rather than a
LIFO queue. Using a LIFO queue gave very unbalanced accesses to the TC blocks, there could
be as much as a 10x difference between the different TC blocks in a node.
Part of Mikael Ronstroms "Patches used in benchmark tree with Intel"
modified:
storage/ndb/include/ndbapi/Ndb.hpp
storage/ndb/src/ndbapi/Ndb.cpp
storage/ndb/src/ndbapi/Ndbinit.cpp
storage/ndb/src/ndbapi/Ndblist.cpp
=== modified file 'storage/ndb/include/ndb_config.h.in'
--- a/storage/ndb/include/ndb_config.h.in 2012-02-07 15:41:33 +0000
+++ b/storage/ndb/include/ndb_config.h.in 2012-06-11 13:20:48 +0000
@@ -17,6 +17,7 @@
#cmakedefine HAVE_POSIX_MEMALIGN 1
#cmakedefine HAVE_CLOCK_GETTIME 1
+#cmakedefine HAVE_NANOSLEEP 1
#cmakedefine HAVE_PTHREAD_CONDATTR_SETCLOCK 1
#cmakedefine HAVE_PTHREAD_SELF 1
#cmakedefine HAVE_SCHED_GET_PRIORITY_MIN 1
=== modified file 'storage/ndb/include/portlib/NdbSleep.h'
--- a/storage/ndb/include/portlib/NdbSleep.h 2011-06-30 15:59:25 +0000
+++ b/storage/ndb/include/portlib/NdbSleep.h 2012-06-11 13:20:48 +0000
@@ -20,6 +20,54 @@
#include <ndb_global.h>
+static inline void NdbSleep_MilliSleep(int milliseconds);
+
+static inline
+void NdbSleep_MicroSleep(int microseconds)
+{
+ assert(0 < microseconds);
+#ifdef _WIN32
+ // Waitable timer use 100ns time unit, negative for relative time periods
+ LARGE_INTEGER liDueTime;
+ liDueTime.QuadPart = -10LL * microseconds;
+
+ HANDLE hTimer = CreateWaitableTimer(NULL, TRUE, NULL);
+ if (NULL == hTimer ||
+ !SetWaitableTimer(hTimer, &liDueTime, 0, NULL, NULL, 0) ||
+ WaitForSingleObject(hTimer, INFINITE) != WAIT_OBJECT_0)
+ {
+#ifndef NDEBUG
+ // Error code for crash analysis
+ DWORD winerr = GetLastError();
+#endif
+ assert(false);
+ // Fallback to millisleep in release
+ NdbSleep_MilliSleep(1 + (microseconds - 1) / 1000);
+ }
+ if (NULL != hTimer)
+ {
+ CloseHandle(hTimer);
+ }
+#elif defined(HAVE_NANOSLEEP)
+ struct timespec t;
+ t.tv_sec = microseconds / 1000000;
+ t.tv_nsec = 1000 * (microseconds % 1000000);
+ while (nanosleep(&t, &t) == -1)
+ {
+ if (errno != EINTR)
+ {
+ assert(false);
+ // Fallback to millisleep in release
+ NdbSleep_MilliSleep(1 + (microseconds - 1) / 1000);
+ return ;
+ }
+ }
+#else
+ // Fallback to millisleep
+ NdbSleep_MilliSleep(1 + (microseconds - 1) / 1000);
+#endif
+}
+
static inline
void NdbSleep_MilliSleep(int milliseconds)
{
=== modified file 'storage/ndb/ndb_configure.cmake'
--- a/storage/ndb/ndb_configure.cmake 2012-02-07 15:41:33 +0000
+++ b/storage/ndb/ndb_configure.cmake 2012-06-11 13:20:48 +0000
@@ -47,6 +47,7 @@ INCLUDE(ndb_require_variable)
CHECK_FUNCTION_EXISTS(posix_memalign HAVE_POSIX_MEMALIGN)
CHECK_FUNCTION_EXISTS(clock_gettime HAVE_CLOCK_GETTIME)
+CHECK_FUNCTION_EXISTS(nanosleep HAVE_NANOSLEEP)
CHECK_FUNCTION_EXISTS(pthread_condattr_setclock HAVE_PTHREAD_CONDATTR_SETCLOCK)
CHECK_FUNCTION_EXISTS(pthread_self HAVE_PTHREAD_SELF)
CHECK_FUNCTION_EXISTS(sched_get_priority_min HAVE_SCHED_GET_PRIORITY_MIN)
=== modified file 'storage/ndb/test/ndbapi/flexAsynch.cpp'
--- a/storage/ndb/test/ndbapi/flexAsynch.cpp 2012-02-14 08:16:48 +0000
+++ b/storage/ndb/test/ndbapi/flexAsynch.cpp 2012-06-11 13:30:32 +0000
@@ -1,5 +1,5 @@
/*
- Copyright (c) 2003, 2011, Oracle and/or its affiliates. All rights reserved.
+ Copyright (c) 2003, 2011, 2012, Oracle and/or its affiliates. All rights reserved.
This program is free software; you can redistribute it and/or modify
it under the terms of the GNU General Public License as published by
@@ -24,6 +24,8 @@
#include <md5_hash.hpp>
#include <NdbThread.h>
+#include <NdbMutex.h>
+#include <NdbCondition.h>
#include <NdbSleep.h>
#include <NdbTick.h>
#include <NdbOut.hpp>
@@ -39,6 +41,9 @@
#define MAXATTR 511
#define MAXTABLES 1
#define NDB_MAXTHREADS 128
+#define MAX_EXECUTOR_THREADS 128
+#define MAX_DEFINER_THREADS 32
+#define MAX_REAL_THREADS 160
#define NDB_MAX_NODES 48
/*
NDB_MAXTHREADS used to be just MAXTHREADS, which collides with a
@@ -52,22 +57,22 @@
#define PKSIZE 2
enum StartType {
- stIdle,
- stInsert,
- stRead,
- stUpdate,
- stDelete,
- stStop
+ stIdle = 0,
+ stInsert = 1,
+ stRead = 2,
+ stUpdate = 3,
+ stDelete = 4,
+ stStop = 5
} ;
enum RunType {
- RunInsert,
- RunRead,
- RunUpdate,
- RunDelete,
- RunCreateTable,
- RunDropTable,
- RunAll
+ RunInsert = 1,
+ RunRead = 2,
+ RunUpdate = 3,
+ RunDelete = 4,
+ RunCreateTable = 5,
+ RunDropTable = 6,
+ RunAll = 7
};
struct ThreadNdb
@@ -85,7 +90,7 @@ static void dropTables(Ndb* pMyNdb);
static int createTables(Ndb*);
static void defineOperation(NdbConnection* aTransObject, StartType aType,
Uint32 base, Uint32 aIndex);
-static void defineNdbRecordOperation(ThreadNdb*, NdbConnection* aTransObject, StartType aType,
+static void defineNdbRecordOperation(char*, NdbConnection* aTransObject, StartType aType,
Uint32 base, Uint32 aIndex);
static void execute(StartType aType);
static bool executeThread(ThreadNdb*, StartType aType, Ndb* aNdbObject, unsigned int);
@@ -97,17 +102,20 @@ static bool error_handler(const NdbError
static void input_error();
static Uint32 get_my_node_id(Uint32 tableNo, Uint32 threadNo);
+static void main_thread(RunType run_type, NdbTimer & timer);
+static Uint64 get_total_transactions();
+static void run_old_flexAsynch(ThreadNdb *pThreadData, NdbTimer & timer);
static int retry_opt = 3 ;
static int failed = 0 ;
ErrorData * flexAsynchErrorData;
-static NdbThread* threadLife[NDB_MAXTHREADS];
+static NdbThread* threadLife[MAX_REAL_THREADS];
static int tNodeId;
-static int ThreadReady[NDB_MAXTHREADS];
-static longlong ThreadExecutions[NDB_MAXTHREADS];
-static StartType ThreadStart[NDB_MAXTHREADS];
+static int ThreadReady[MAX_REAL_THREADS];
+static longlong ThreadExecutions[MAX_REAL_THREADS];
+static StartType ThreadStart[NDB_MAXTHREADS];
static char tableName[MAXTABLES][MAXSTRLEN+1];
static const NdbDictionary::Table * tables[MAXTABLES];
static char attrName[MAXATTR][MAXSTRLEN+1];
@@ -136,6 +144,8 @@ static unsigned int tLoadFac
static bool tempTable = false;
static bool startTransGuess = true;
static int tExtraReadLoop = 0;
+static bool tNew = false;
+static bool tImmediate = false;
//Program Flags
static int theTestFlag = 0;
@@ -177,13 +187,13 @@ resetThreads(){
}
static void
-waitForThreads(void)
+waitForThreads(Uint32 num_threads_to_wait_for)
{
int cont = 0;
do {
cont = 0;
NdbSleep_MilliSleep(20);
- for (unsigned i = 0; i < tNoOfThreads ; i++) {
+ for (unsigned i = 0; i < num_threads_to_wait_for ; i++) {
if (ThreadReady[i] == 0) {
cont = 1;
}//if
@@ -204,9 +214,8 @@ NDB_COMMAND(flexAsynch, "flexAsynch", "f
{
ndb_init();
ThreadNdb* pThreadData;
- int tLoops=0;
- int returnValue = NDBT_OK;
DEFINE_TIMER;
+ int returnValue = NDBT_OK;
flexAsynchErrorData = new ErrorData;
flexAsynchErrorData->resetErrorCounters();
@@ -256,7 +265,7 @@ NDB_COMMAND(flexAsynch, "flexAsynch", "f
ndbout << endl;
- NdbThread_SetConcurrencyLevel(2 + tNoOfThreads);
+ NdbThread_SetConcurrencyLevel(2 + (tNoOfThreads * 5 / 4));
/* print Setting */
flexAsynchErrorData->printSettings(ndbout);
@@ -311,7 +320,7 @@ NDB_COMMAND(flexAsynch, "flexAsynch", "f
}
}
- if (tNdbRecord)
+ if (tNdbRecord && !tNew)
{
Uint32 sz = NdbDictionary::getRecordRowLength(g_record[0]);
sz += 3;
@@ -325,260 +334,13 @@ NDB_COMMAND(flexAsynch, "flexAsynch", "f
if(returnValue == NDBT_OK &&
tRunType != RunCreateTable &&
tRunType != RunDropTable){
- /****************************************************************
- * Create NDB objects. *
- ****************************************************************/
- resetThreads();
- for (Uint32 i = 0; i < tNoOfThreads ; i++) {
- pThreadData[i].ThreadNo = i;
- threadLife[i] = NdbThread_Create(threadLoop,
- (void**)&pThreadData[i],
- 32768,
- "flexAsynchThread",
- NDB_THREAD_PRIO_LOW);
- }//for
- ndbout << endl << "All NDB objects and table created" << endl << endl;
- int noOfTransacts = tNoOfParallelTrans*tNoOfTransactions*tNoOfThreads;
- /****************************************************************
- * Execute program. *
- ****************************************************************/
-
- for(;;) {
-
- int loopCount = tLoops + 1 ;
- ndbout << endl << "Loop # " << loopCount << endl << endl ;
-
- /****************************************************************
- * Perform inserts. *
- ****************************************************************/
-
- failed = 0 ;
- if (tRunType == RunAll || tRunType == RunInsert){
- ndbout << "Executing inserts" << endl;
- START_TIMER;
- execute(stInsert);
- STOP_TIMER;
- }
- if (tRunType == RunAll){
- a_i.addObservation((1000*noOfTransacts * tNoOfOpsPerTrans) / timer.elapsedTime());
- PRINT_TIMER("insert", noOfTransacts, tNoOfOpsPerTrans);
-
- if (0 < failed) {
- int i = retry_opt ;
- int ci = 1 ;
- while (0 < failed && 0 < i){
- ndbout << failed << " of the transactions returned errors!"
- << endl << endl;
- ndbout << "Attempting to redo the failed transactions now..."
- << endl ;
- ndbout << "Redo attempt " << ci <<" out of " << retry_opt
- << endl << endl;
- failed = 0 ;
- START_TIMER;
- execute(stInsert);
- STOP_TIMER;
- PRINT_TIMER("insert", noOfTransacts, tNoOfOpsPerTrans);
- i-- ;
- ci++;
- }
- if(0 == failed ){
- ndbout << endl <<"Redo attempt succeeded" << endl << endl;
- }else{
- ndbout << endl <<"Redo attempt failed, moving on now..." << endl
- << endl;
- }//if
- }//if
- }//if
- /****************************************************************
- * Perform read. *
- ****************************************************************/
-
- failed = 0 ;
-
- if (tRunType == RunAll || tRunType == RunRead){
- for (int ll = 0; ll < 1 + tExtraReadLoop; ll++)
- {
- ndbout << "Executing reads" << endl;
- START_TIMER;
- execute(stRead);
- STOP_TIMER;
- if (tRunType == RunAll){
- a_r.addObservation((1000 * noOfTransacts * tNoOfOpsPerTrans) / timer.elapsedTime());
- PRINT_TIMER("read", noOfTransacts, tNoOfOpsPerTrans);
- }//if
- }//for
- }//if
-
- if (tRunType == RunAll){
- if (0 < failed) {
- int i = retry_opt ;
- int cr = 1;
- while (0 < failed && 0 < i){
- ndbout << failed << " of the transactions returned errors!"<<endl ;
- ndbout << endl;
- ndbout <<"Attempting to redo the failed transactions now..." << endl;
- ndbout << endl;
- ndbout <<"Redo attempt " << cr <<" out of ";
- ndbout << retry_opt << endl << endl;
- failed = 0 ;
- START_TIMER;
- execute(stRead);
- STOP_TIMER;
- PRINT_TIMER("read", noOfTransacts, tNoOfOpsPerTrans);
- i-- ;
- cr++ ;
- }//while
- if(0 == failed ) {
- ndbout << endl <<"Redo attempt succeeded" << endl << endl ;
- }else{
- ndbout << endl <<"Redo attempt failed, moving on now..." << endl << endl ;
- }//if
- }//if
- }//if
-
-
- /****************************************************************
- * Perform update. *
- ****************************************************************/
-
- failed = 0 ;
-
- if (tRunType == RunAll || tRunType == RunUpdate){
- ndbout << "Executing updates" << endl;
- START_TIMER;
- execute(stUpdate);
- STOP_TIMER;
- }//if
- if (tRunType == RunAll){
- a_u.addObservation((1000 * noOfTransacts * tNoOfOpsPerTrans) / timer.elapsedTime());
- PRINT_TIMER("update", noOfTransacts, tNoOfOpsPerTrans) ;
-
- if (0 < failed) {
- int i = retry_opt ;
- int cu = 1 ;
- while (0 < failed && 0 < i){
- ndbout << failed << " of the transactions returned errors!"<<endl ;
- ndbout << endl;
- ndbout <<"Attempting to redo the failed transactions now..." << endl;
- ndbout << endl <<"Redo attempt " << cu <<" out of ";
- ndbout << retry_opt << endl << endl;
- failed = 0 ;
- START_TIMER;
- execute(stUpdate);
- STOP_TIMER;
- PRINT_TIMER("update", noOfTransacts, tNoOfOpsPerTrans);
- i-- ;
- cu++ ;
- }//while
- if(0 == failed ){
- ndbout << endl <<"Redo attempt succeeded" << endl << endl;
- } else {
- ndbout << endl;
- ndbout <<"Redo attempt failed, moving on now..." << endl << endl;
- }//if
- }//if
- }//if
-
- /****************************************************************
- * Perform read. *
- ****************************************************************/
-
- failed = 0 ;
-
- if (tRunType == RunAll){
- for (int ll = 0; ll < 1 + tExtraReadLoop; ll++)
- {
- ndbout << "Executing reads" << endl;
- START_TIMER;
- execute(stRead);
- STOP_TIMER;
- a_r.addObservation((1000 * noOfTransacts * tNoOfOpsPerTrans) / timer.elapsedTime());
- PRINT_TIMER("read", noOfTransacts, tNoOfOpsPerTrans);
- }
-
- if (0 < failed) {
- int i = retry_opt ;
- int cr2 = 1 ;
- while (0 < failed && 0 < i){
- ndbout << failed << " of the transactions returned errors!"<<endl ;
- ndbout << endl;
- ndbout <<"Attempting to redo the failed transactions now..." << endl;
- ndbout << endl <<"Redo attempt " << cr2 <<" out of ";
- ndbout << retry_opt << endl << endl;
- failed = 0 ;
- START_TIMER;
- execute(stRead);
- STOP_TIMER;
- PRINT_TIMER("read", noOfTransacts, tNoOfOpsPerTrans);
- i-- ;
- cr2++ ;
- }//while
- if(0 == failed ){
- ndbout << endl <<"Redo attempt succeeded" << endl << endl;
- }else{
- ndbout << endl;
- ndbout << "Redo attempt failed, moving on now..." << endl << endl;
- }//if
- }//if
- }//if
-
-
- /****************************************************************
- * Perform delete. *
- ****************************************************************/
-
- failed = 0 ;
-
- if (tRunType == RunAll || tRunType == RunDelete){
- ndbout << "Executing deletes" << endl;
- START_TIMER;
- execute(stDelete);
- STOP_TIMER;
- }//if
- if (tRunType == RunAll){
- a_d.addObservation((1000 * noOfTransacts * tNoOfOpsPerTrans) / timer.elapsedTime());
- PRINT_TIMER("delete", noOfTransacts, tNoOfOpsPerTrans);
-
- if (0 < failed) {
- int i = retry_opt ;
- int cd = 1 ;
- while (0 < failed && 0 < i){
- ndbout << failed << " of the transactions returned errors!"<< endl ;
- ndbout << endl;
- ndbout <<"Attempting to redo the failed transactions now:" << endl ;
- ndbout << endl <<"Redo attempt " << cd <<" out of ";
- ndbout << retry_opt << endl << endl;
- failed = 0 ;
- START_TIMER;
- execute(stDelete);
- STOP_TIMER;
- PRINT_TIMER("read", noOfTransacts, tNoOfOpsPerTrans);
- i-- ;
- cd++ ;
- }//while
- if(0 == failed ){
- ndbout << endl <<"Redo attempt succeeded" << endl << endl ;
- }else{
- ndbout << endl;
- ndbout << "Redo attempt failed, moving on now..." << endl << endl;
- }//if
- }//if
- }//if
-
- tLoops++;
- ndbout << "--------------------------------------------------" << endl;
-
- if(tNoOfLoops != 0){
- if(tNoOfLoops <= tLoops)
- break ;
- }
- }//for
-
- execute(stStop);
- void * tmp;
- for(Uint32 i = 0; i<tNoOfThreads; i++){
- NdbThread_WaitFor(threadLife[i], &tmp);
- NdbThread_Destroy(&threadLife[i]);
+ if (tNew)
+ {
+ main_thread(tRunType, timer);
+ }
+ else
+ {
+ run_old_flexAsynch(pThreadData, timer);
}
}
@@ -609,27 +371,43 @@ NDB_COMMAND(flexAsynch, "flexAsynch", "f
tRunType == RunUpdate ||
tRunType == RunDelete)
{
- longlong total_transactions = 0;
- longlong exec_time;
+ Uint64 total_transactions = 0;
+ Uint64 exec_time;
- if (tRunType == RunInsert || tRunType == RunDelete) {
- total_transactions = (longlong)tNoOfTransactions;
- total_transactions *= (longlong)tNoOfThreads;
- total_transactions *= (longlong)tNoOfParallelTrans;
- } else {
- for (Uint32 i = 0; i < tNoOfThreads; i++){
- total_transactions += ThreadExecutions[i];
+ if (tNew)
+ {
+ total_transactions = get_total_transactions();
+ }
+ else
+ {
+ if (tRunType == RunInsert || tRunType == RunDelete)
+ {
+ total_transactions = (longlong)tNoOfTransactions;
+ total_transactions *= (longlong)tNoOfThreads;
+ total_transactions *= (longlong)tNoOfParallelTrans;
+ }
+ else
+ {
+ for (Uint32 i = 0; i < tNoOfThreads; i++)
+ {
+ total_transactions += ThreadExecutions[i];
+ }
}
}
if (tRunType == RunInsert || tRunType == RunDelete) {
- exec_time = (longlong)timer.elapsedTime();
+ exec_time = (Uint64)timer.elapsedTime();
} else {
- exec_time = (longlong)tExecutionTime * 1000;
+ exec_time = (Uint64)tExecutionTime * 1000;
}
ndbout << "Total number of transactions is " << total_transactions;
ndbout << endl;
ndbout << "Execution time is " << exec_time << " milliseconds" << endl;
+ if (!exec_time)
+ {
+ exec_time = 1; /* Avoid floating point exception */
+ ndbout_c("Zero execution time!!!");
+ }
total_transactions = (total_transactions * 1000) / exec_time;
int trans_per_sec = (int)total_transactions;
ndbout << "Total transactions per second " << trans_per_sec << endl;
@@ -645,7 +423,7 @@ static void execute(StartType aType)
{
resetThreads();
tellThreads(aType);
- waitForThreads();
+ waitForThreads(tNoOfThreads);
}//execute()
static void*
@@ -794,7 +572,7 @@ executeTrans(ThreadNdb* pThread,
// Define the operation, but do not execute it yet.
//-------------------------------------------------------
if (tNdbRecord)
- defineNdbRecordOperation(pThread,
+ defineNdbRecordOperation(pThread->record,
tConArray[num_ops],
aType,
threadBaseLoc2,
@@ -944,24 +722,31 @@ executeCallback(int result, NdbConnectio
NdbConnection **array_ref = (NdbConnection**)aObject;
assert(NdbObject == *array_ref);
*array_ref = NULL;
- if (result == -1) {
-
- // Add complete error handling here
+ if (result == -1 && failed < 100)
+ {
- int retCode = flexAsynchErrorData->handleErrorCommon(NdbObject->getNdbError());
- if (retCode == 1) {
- if (NdbObject->getNdbError().code != 626 && NdbObject->getNdbError().code != 630){
- ndbout_c("execute: %s", NdbObject->getNdbError().message);
- ndbout_c("Error code = %d", NdbObject->getNdbError().code);}
- } else if (retCode == 2) {
- ndbout << "4115 should not happen in flexAsynch" << endl;
- } else if (retCode == 3) {
- /* What can we do here? */
- ndbout_c("execute: %s", NdbObject->getNdbError().message);
- }//if(retCode == 3)
+ // Add complete error handling here
- // ndbout << "Error occured in poll:" << endl;
- // ndbout << NdbObject->getNdbError() << endl;
+ int retCode = flexAsynchErrorData->handleErrorCommon(NdbObject->getNdbError());
+ if (retCode == 1)
+ {
+ if (NdbObject->getNdbError().code != 626 && NdbObject->getNdbError().code != 630)
+ {
+ ndbout_c("execute: %s", NdbObject->getNdbError().message);
+ ndbout_c("Error code = %d", NdbObject->getNdbError().code);
+ }
+ }
+ else if (retCode == 2)
+ {
+ ndbout << "4115 should not happen in flexAsynch" << endl;
+ }
+ else if (retCode == 3)
+ {
+ /* What can we do here? */
+ ndbout_c("execute: %s", NdbObject->getNdbError().message);
+ }//if(retCode == 3)
+ // ndbout << "Error occured in poll:" << endl;
+ // ndbout << NdbObject->getNdbError() << endl;
failed++ ;
}//if
NdbObject->close(); /* Close transaction */
@@ -1067,11 +852,12 @@ defineOperation(NdbConnection* localNdbC
static void
-defineNdbRecordOperation(ThreadNdb* pThread,
- NdbConnection* pTrans, StartType aType,
- Uint32 threadBase, Uint32 aIndex)
+defineNdbRecordOperation(char *record,
+ NdbConnection* pTrans,
+ StartType aType,
+ Uint32 threadBase,
+ Uint32 aIndex)
{
- char * record = pThread->record;
Uint32 offset;
NdbDictionary::getOffset(g_record[0], 0, offset);
* (Uint32*)(record + offset) = threadBase;
@@ -1102,15 +888,24 @@ defineNdbRecordOperation(ThreadNdb* pThr
break;
}//case
case stRead: { // Read Case
- op = pTrans->readTuple(g_record[0],record,g_record[0],record, NdbOperation::LM_CommittedRead);
+ op = pTrans->readTuple(g_record[0],
+ record,
+ g_record[0],
+ record,
+ NdbOperation::LM_CommittedRead);
break;
}//case
case stUpdate:{ // Update Case
- op = pTrans->updateTuple(g_record[0],record,g_record[0],record);
+ op = pTrans->updateTuple(g_record[0],
+ record,
+ g_record[0],
+ record);
break;
}//case
case stDelete: { // Delete Case
- op = pTrans->deleteTuple(g_record[0],record, g_record[0]);
+ op = pTrans->deleteTuple(g_record[0],
+ record,
+ g_record[0]);
break;
}//case
default: {
@@ -1193,6 +988,25 @@ setUpNodeTableArray(Uint32 tableNo, cons
}
static Uint32
+get_node_relative_id(Uint32 tableNo, Uint32 node_id)
+{
+ Uint32 rel_id = 0;
+
+ for (Uint32 i = 1; i < node_id; i++)
+ {
+ if (nodeTableArray[tableNo][i])
+ rel_id++;
+ }
+ return rel_id;
+}
+
+static Uint32
+get_node_count(Uint32 tableNo)
+{
+ return get_node_relative_id(tableNo, NDB_MAX_NODES + 1);
+}
+
+static Uint32
get_my_node_id(Uint32 tableNo, Uint32 threadNo)
{
Uint32 count = 0;
@@ -1327,6 +1141,859 @@ setAggregateRun(void)
theTableCreateFlag = 1;
}
+/* Start NEW Module */
+
+/**
+ * This part contains the code used for the case --local 4 which is using
+ * the design pattern that could be used for asynchronous applications of
+ * the NDB API.
+ *
+ * This variant will always use transaction hints, it will always the
+ * NDB Record format in the NDB API.
+ */
+
+static void* definer_thread(void *data);
+static void* executor_thread(void *data);
+
+static Uint32 tNoOfExecutorThreads = 0;
+static Uint32 tNoOfDefinerThreads = 0;
+
+enum RunState
+{
+ WARMUP = 0,
+ EXECUTING = 1,
+ COOLDOWN = 2
+};
+
+RunState tRunState = WARMUP;
+
+typedef struct KeyOperation KEY_OPERATION;
+struct KeyOperation
+{
+ Uint32 first_key;
+ Uint32 second_key;
+ Uint32 definer_thread_id;
+ Uint32 executor_thread_id;
+ RunType operation_type;
+ KEY_OPERATION *next_key_op;
+};
+
+typedef struct key_list_header KEY_LIST_HEADER;
+struct key_list_header
+{
+ KEY_OPERATION *first_in_list;
+ KEY_OPERATION *last_in_list;
+ Uint32 num_in_list;
+};
+
+
+typedef struct thread_data_struct THREAD_DATA;
+struct thread_data_struct
+{
+ KEY_LIST_HEADER list_header;
+ Uint32 thread_id;
+ bool ready;
+ bool stop;
+ bool start;
+
+ char *record;
+ NdbMutex *transport_mutex;
+ struct NdbCondition *transport_cond;
+ struct NdbCondition *main_cond;
+ struct NdbCondition *start_cond;
+ char not_used[52];
+};
+THREAD_DATA thread_data_array[MAX_DEFINER_THREADS + MAX_EXECUTOR_THREADS];
+
+static Uint64
+get_total_transactions()
+{
+ Uint64 total_transactions = 0;
+
+ for (Uint32 i = tNoOfDefinerThreads; i < tNoOfThreads; i++)
+ {
+ total_transactions += ThreadExecutions[i];
+ }
+ return total_transactions;
+}
+
+static void
+init_list_headers(KEY_LIST_HEADER *list_header,
+ Uint32 num_list_headers)
+{
+ Uint32 i;
+ KEY_LIST_HEADER *list_header_ref;
+ char *list_header_ptr = (char*)list_header;
+ for (i = 0;
+ i < num_list_headers;
+ i++, list_header_ptr += sizeof(KEY_LIST_HEADER))
+ {
+ list_header_ref = (KEY_LIST_HEADER*)list_header_ptr;
+ list_header_ref->first_in_list = NULL;
+ list_header_ref->last_in_list = NULL;
+ list_header_ref->num_in_list = 0;
+ }
+}
+
+static void
+wait_thread_ready(THREAD_DATA *my_thread_data)
+{
+ NdbMutex_Lock(my_thread_data->transport_mutex);
+ while (1)
+ {
+ if (my_thread_data->ready)
+ break;
+ NdbCondition_Wait(my_thread_data->main_cond,
+ my_thread_data->transport_mutex);
+ }
+ NdbMutex_Unlock(my_thread_data->transport_mutex);
+}
+
+static void
+wait_for_threads_ready(Uint32 num_threads)
+{
+ for (Uint32 i = 0; i < num_threads; i++)
+ wait_thread_ready(&thread_data_array[i]);
+}
+
+static void
+signal_thread_to_start(THREAD_DATA *my_thread_data)
+{
+ NdbMutex_Lock(my_thread_data->transport_mutex);
+ my_thread_data->start = true;
+ my_thread_data->ready = false;
+ NdbCondition_Signal(my_thread_data->start_cond);
+ NdbMutex_Unlock(my_thread_data->transport_mutex);
+}
+
+static void
+signal_definer_threads_to_start()
+{
+ for (Uint32 i = 0; i < tNoOfDefinerThreads; i++)
+ signal_thread_to_start(&thread_data_array[i]);
+}
+
+static void
+signal_executor_threads_to_start()
+{
+ for (Uint32 i = 0; i < tNoOfExecutorThreads; i++)
+ signal_thread_to_start(&thread_data_array[tNoOfDefinerThreads + i]);
+}
+
+static void
+signal_thread_ready_wait_for_start(THREAD_DATA *my_thread_data)
+{
+ NdbMutex_Lock(my_thread_data->transport_mutex);
+ my_thread_data->ready = true;
+ NdbCondition_Signal(my_thread_data->main_cond);
+ while (1)
+ {
+ if (my_thread_data->start)
+ break;
+ NdbCondition_Wait(my_thread_data->start_cond,
+ my_thread_data->transport_mutex);
+ }
+ my_thread_data->start = false;
+ NdbMutex_Unlock(my_thread_data->transport_mutex);
+}
+
+static void
+signal_thread_to_stop(THREAD_DATA *my_thread_data)
+{
+ NdbMutex_Lock(my_thread_data->transport_mutex);
+ my_thread_data->stop = true;
+ NdbCondition_Signal(my_thread_data->transport_cond);
+ NdbMutex_Unlock(my_thread_data->transport_mutex);
+}
+
+static void
+signal_definer_threads_to_stop()
+{
+ for (Uint32 i = 0; i < tNoOfDefinerThreads; i++)
+ signal_thread_to_stop(&thread_data_array[i]);
+}
+
+static void
+signal_executor_threads_to_stop()
+{
+ for (Uint32 i = tNoOfDefinerThreads; i < tNoOfThreads; i++)
+ signal_thread_to_stop(&thread_data_array[i]);
+}
+
+static void
+destroy_thread_data(THREAD_DATA *my_thread_data)
+{
+ free(my_thread_data->record);
+ NdbMutex_Destroy(my_thread_data->transport_mutex);
+ NdbCondition_Destroy(my_thread_data->transport_cond);
+ NdbCondition_Destroy(my_thread_data->start_cond);
+ NdbCondition_Destroy(my_thread_data->main_cond);
+}
+
+static void
+init_thread_data(THREAD_DATA *my_thread_data, Uint32 thread_id)
+{
+ Uint32 sz = NdbDictionary::getRecordRowLength(g_record[0]);
+ my_thread_data->record = (char*)malloc(sz);
+ memset(my_thread_data->record, 0, sz);
+ init_list_headers(&my_thread_data->list_header, 1);
+ my_thread_data->stop = false;
+ my_thread_data->ready = false;
+ my_thread_data->start = false;
+ my_thread_data->transport_mutex = NdbMutex_Create();
+ my_thread_data->transport_cond = NdbCondition_Create();
+ my_thread_data->main_cond = NdbCondition_Create();
+ my_thread_data->start_cond = NdbCondition_Create();
+ my_thread_data->thread_id = thread_id;
+}
+
+static void
+create_definer_thread(THREAD_DATA *my_thread_data, Uint32 thread_id)
+{
+ init_thread_data(my_thread_data, thread_id);
+ threadLife[thread_id] = NdbThread_Create(definer_thread,
+ (void**)my_thread_data,
+ 1024 * 1024,
+ "flexAsynchThread",
+ NDB_THREAD_PRIO_LOW);
+}
+
+static void
+create_definer_threads()
+{
+ for (Uint32 i = 0; i < tNoOfDefinerThreads; i++)
+ {
+ Uint32 thread_id = i;
+ create_definer_thread(&thread_data_array[thread_id], thread_id);
+ }
+}
+
+static void
+create_executor_thread(THREAD_DATA *my_thread_data, Uint32 thread_id)
+{
+ init_thread_data(my_thread_data, thread_id);
+ threadLife[thread_id] = NdbThread_Create(executor_thread,
+ (void**)my_thread_data,
+ 1024 * 1024,
+ "flexAsynchThread",
+ NDB_THREAD_PRIO_LOW);
+}
+
+static void
+create_executor_threads()
+{
+ for (Uint32 i = 0; i < tNoOfExecutorThreads; i++)
+ {
+ Uint32 thread_id = tNoOfDefinerThreads + i;
+ create_executor_thread(&thread_data_array[thread_id], thread_id);
+ }
+}
+
+static void
+main_thread(RunType start_type, NdbTimer & timer)
+{
+ bool insert_delete;
+
+ tNoOfExecutorThreads = tNoOfThreads;
+ if (tNoOfDefinerThreads == 0)
+ {
+ tNoOfDefinerThreads = (tNoOfThreads + 3)/4;
+ }
+ tNoOfThreads = tNoOfExecutorThreads + tNoOfDefinerThreads;
+
+ if (start_type == RunInsert ||
+ start_type == RunDelete)
+ insert_delete = true;
+ else
+ insert_delete = false;
+
+ create_definer_threads();
+ create_executor_threads();
+
+ wait_for_threads_ready(tNoOfThreads);
+
+ /**
+ * Start threads, start with execution threads to ensure they are
+ * up and running before definer threads starts sending data to
+ * them
+ */
+ START_TIMER;
+ signal_definer_threads_to_start();
+ signal_executor_threads_to_start();
+
+ if (!insert_delete)
+ {
+ sleep(tWarmupTime);
+ tRunState = EXECUTING;
+ sleep(tExecutionTime);
+ tRunState = COOLDOWN;
+ sleep(tCooldownTime);
+ signal_definer_threads_to_stop();
+ }
+ wait_for_threads_ready(tNoOfDefinerThreads);
+ STOP_TIMER;
+
+ signal_executor_threads_to_stop();
+ wait_for_threads_ready(tNoOfThreads);
+
+ /**
+ * Now all threads are stopped and prepared to be destroyed,
+ * now start them just to destroy themselves
+ */
+ signal_definer_threads_to_start();
+ signal_executor_threads_to_start();
+
+ void * tmp;
+ for (Uint32 i = 0; i < tNoOfThreads; i++)
+ {
+ NdbThread_WaitFor(threadLife[i], &tmp);
+ NdbThread_Destroy(&threadLife[i]);
+ }
+}
+
+static NdbConnection*
+get_trans_object(Uint32 first_key,
+ Uint32 second_key,
+ Ndb *my_ndb)
+{
+ union {
+ Uint64 _align;
+ Uint32 Tkey32[2];
+ };
+ (void)_align;
+
+ Tkey32[0] = first_key;
+ Tkey32[1] = second_key;
+ Ndb::Key_part_ptr hint[2];
+ hint[0].ptr = Tkey32+0;
+ hint[0].len = 4;
+ hint[1].ptr = 0;
+ hint[1].len = 0;
+
+ return my_ndb->startTransaction(tables[0], hint);
+}
+
+static Ndb*
+get_ndb_object(Uint32 my_thread_id)
+{
+ Ndb *my_ndb = new Ndb(g_cluster_connection+(my_thread_id % tConnections),
+ "TEST_DB");
+ my_ndb->init(MAXPAR);
+ my_ndb->waitUntilReady(10000);
+ return my_ndb;
+}
+
+static void
+insert_list(KEY_LIST_HEADER *list_header,
+ KEY_OPERATION *insert_op)
+{
+ KEY_OPERATION *current_last = list_header->last_in_list;
+ insert_op->next_key_op = NULL;
+ list_header->last_in_list = insert_op;
+ if (current_last)
+ current_last->next_key_op = insert_op;
+ else
+ list_header->first_in_list = insert_op;
+ list_header->num_in_list++;
+}
+
+static KEY_OPERATION*
+get_first_free(KEY_LIST_HEADER *list_header)
+{
+ assert(list_header->first_in_list);
+ KEY_OPERATION *key_op = list_header->first_in_list;
+ list_header->first_in_list = key_op->next_key_op;
+ list_header->num_in_list--;
+ if (!list_header->first_in_list)
+ {
+ list_header->last_in_list = NULL;
+ }
+ key_op->next_key_op = NULL;
+ return key_op;
+}
+
+static void
+move_list(KEY_LIST_HEADER *src_list_header,
+ KEY_LIST_HEADER *dst_list_header)
+{
+ KEY_OPERATION *last_completed_op = dst_list_header->last_in_list;
+ KEY_OPERATION *first_in_list = src_list_header->first_in_list;
+ if (!first_in_list)
+ return;
+ if (last_completed_op)
+ {
+ last_completed_op->next_key_op = first_in_list;
+ }
+ else
+ {
+ dst_list_header->first_in_list = first_in_list;
+ }
+ dst_list_header->last_in_list = src_list_header->last_in_list;
+ dst_list_header->num_in_list += src_list_header->num_in_list;
+ src_list_header->num_in_list = 0;
+ src_list_header->first_in_list = NULL;
+ src_list_header->last_in_list = NULL;
+}
+
+/**
+ * Retrieve a linked list of prepared operations. If no operations
+ * prepared we wait on a condition until operations are defined for
+ * us to execute.
+ */
+static void
+receive_operations(THREAD_DATA *my_thread_data,
+ KEY_LIST_HEADER *list_header,
+ bool wait)
+{
+ bool first = true;
+ KEY_LIST_HEADER *thread_list_header = &my_thread_data->list_header;
+ list_header->first_in_list = NULL;
+ list_header->last_in_list = NULL;
+ list_header->num_in_list = 0;
+
+recheck:
+ NdbMutex_Lock(my_thread_data->transport_mutex);
+ while (!my_thread_data->stop &&
+ (first || thread_list_header->first_in_list))
+ {
+ move_list(thread_list_header, list_header);
+ if (list_header->first_in_list)
+ break;
+ NdbCondition_Wait(my_thread_data->transport_cond,
+ my_thread_data->transport_mutex);
+ first = false;
+ }
+ NdbMutex_Unlock(my_thread_data->transport_mutex);
+ if (first && wait &&
+ thread_list_header->num_in_list < ((tNoOfParallelTrans + 1) / 2))
+ {
+ /**
+ * We will wait for at least 200 microseconds if we haven't yet received
+ * at least half of the number of records we desire to execute.
+ */
+ NdbSleep_MicroSleep(200);
+ first = false;
+ goto recheck;
+ }
+}
+
+static void
+send_operations(Uint32 thread_id,
+ KEY_LIST_HEADER *list_header)
+{
+ THREAD_DATA *recv_thread = &thread_data_array[thread_id];
+
+ NdbMutex_Lock(recv_thread->transport_mutex);
+ /**
+ * We are moving operations into the list, thus we need
+ * to wake any threads waiting for operations to execute.
+ */
+ move_list(list_header,
+ &recv_thread->list_header);
+ NdbCondition_Signal(recv_thread->transport_cond);
+ NdbMutex_Unlock(recv_thread->transport_mutex);
+}
+
+static void
+init_key_op_list(char *key_op_ptr,
+ KEY_LIST_HEADER *list_header,
+ Uint32 max_outstanding,
+ Uint32 my_thread_id,
+ RunType my_run_type)
+{
+ KEY_OPERATION *key_op;
+
+ list_header->first_in_list = (KEY_OPERATION*)key_op_ptr;
+ for (Uint32 i = 0;
+ i < max_outstanding;
+ i++, key_op_ptr += sizeof(KEY_OPERATION))
+ {
+ key_op = (KEY_OPERATION*)key_op_ptr;
+ key_op->next_key_op = (KEY_OPERATION*)(key_op_ptr + sizeof(KEY_OPERATION));
+ key_op->definer_thread_id = my_thread_id;
+ key_op->executor_thread_id = MAX_EXECUTOR_THREADS;
+ key_op->operation_type = my_run_type;
+ }
+ key_op->next_key_op = NULL; /* Last key operation */
+ list_header->last_in_list = key_op;
+ list_header->num_in_list = max_outstanding;
+}
+
+static Uint32
+get_thread_id_for_record(Uint32 record_id,
+ Uint32 node_count,
+ Uint32 thread_count,
+ Uint32 thread_group,
+ Uint32 num_thread_groups,
+ Ndb *my_ndb)
+{
+ Uint32 thread_id;
+ NdbConnection *trans = get_trans_object(record_id, record_id, my_ndb);
+ Uint32 node_id = trans->getConnectedNodeId();
+ trans->close();
+ Uint32 node_rel_id = get_node_relative_id((Uint32)0, node_id);
+ if (node_count >= thread_count)
+ {
+ thread_id = node_rel_id % thread_count;
+ return thread_id;
+ }
+
+recalculate:
+ thread_id = thread_group * node_count + node_rel_id;
+ if (thread_id >= thread_count)
+ {
+ /**
+ * Only the last thread group may have less than
+ * node_count threads, so choosing a random group
+ * except the last will always give a valid
+ * thread_id.
+ */
+ thread_group = rand() % (num_thread_groups - 1);
+ goto recalculate;
+ }
+ return thread_id;
+}
+
+void init_thread_id_mem(char *thread_id_mem,
+ Uint32 first_record,
+ Uint32 total_records,
+ Ndb *my_ndb)
+{
+ Uint32 node_count = get_node_count((Uint32)0);
+ Uint32 thread_count = tNoOfExecutorThreads;
+ Uint32 num_thread_groups = (thread_count + node_count - 1) / node_count;
+ Uint32 thread_group = 0;
+ for (Uint32 record_id = first_record, i = 0;
+ i < total_records;
+ i++, record_id++)
+ {
+ thread_id_mem[i] = (char)get_thread_id_for_record(record_id,
+ node_count,
+ thread_count,
+ thread_group,
+ num_thread_groups,
+ my_ndb);
+ thread_group++;
+ if (thread_group == num_thread_groups)
+ thread_group = 0;
+ }
+}
+
+static bool
+check_for_outstanding(Uint32 *thread_state)
+{
+ for (Uint32 i = 0; i < tNoOfExecutorThreads; i++)
+ {
+ if (thread_state[i])
+ return true;
+ }
+ return false;
+}
+
+static void
+update_thread_state(KEY_LIST_HEADER *list_header,
+ Uint32 *thread_state)
+{
+ KEY_OPERATION *key_op = list_header->first_in_list;
+
+ while (key_op)
+ {
+ thread_state[key_op->executor_thread_id]--;
+ key_op->executor_thread_id = MAX_EXECUTOR_THREADS;
+ key_op = key_op->next_key_op;
+ }
+}
+
+static void
+wait_until_all_completed(THREAD_DATA *my_thread_data,
+ Uint32 *thread_state,
+ KEY_LIST_HEADER *free_list_header)
+{
+ KEY_LIST_HEADER list_header;
+ bool outstanding = true;
+ while (outstanding && !my_thread_data->stop)
+ {
+ receive_operations(my_thread_data, &list_header, false);
+ update_thread_state(&list_header, thread_state);
+ move_list(&list_header, free_list_header);
+ outstanding = check_for_outstanding(thread_state);
+ }
+}
+
+static Uint32
+prepare_operations(char *thread_id_mem,
+ KEY_LIST_HEADER *free_list_header,
+ Uint32 *thread_state,
+ Uint32 first_record_to_define,
+ Uint32 num_records_to_define,
+ Uint32 first_record,
+ Uint32 last_record,
+ Uint32 max_per_thread)
+{
+ KEY_LIST_HEADER thread_list_headers[MAX_EXECUTOR_THREADS];
+ Uint32 record_id, i, num_records;
+
+ init_list_headers(&thread_list_headers[0], tNoOfExecutorThreads);
+ for (record_id = first_record_to_define, i = 0;
+ record_id <= last_record && i < num_records_to_define;
+ record_id++, i++)
+ {
+ KEY_OPERATION *define_op = get_first_free(free_list_header);
+ Uint32 thread_id = (Uint32)thread_id_mem[record_id - first_record];
+ define_op->first_key = record_id;
+ define_op->second_key = record_id;
+ define_op->executor_thread_id = thread_id;
+ thread_state[thread_id]++;
+ KEY_LIST_HEADER *thread_list_header = &thread_list_headers[thread_id];
+ insert_list(thread_list_header, define_op);
+ if (thread_list_header->num_in_list >= max_per_thread)
+ {
+ /**
+ * One thread has max number of records, we won't define any
+ * more to keep the code simple.
+ */
+ break;
+ }
+ }
+ num_records = i;
+ for (i = 0; i < tNoOfExecutorThreads; i++)
+ {
+ KEY_LIST_HEADER *thread_list_header = &thread_list_headers[i];
+ if (thread_list_header->num_in_list)
+ {
+ send_operations(tNoOfDefinerThreads + i, thread_list_header);
+ }
+ }
+ return num_records;
+}
+
+static void*
+definer_thread(void *data)
+{
+ THREAD_DATA *my_thread_data = (THREAD_DATA*)data;
+ Uint32 my_thread_id = my_thread_data->thread_id;
+ RunType run_type = tRunType;
+ Uint32 thread_state[MAX_EXECUTOR_THREADS];
+ Uint32 max_outstanding = (tNoOfExecutorThreads * tNoOfParallelTrans) /
+ tNoOfDefinerThreads;
+ Uint32 max_per_thread = 1000 / tNoOfDefinerThreads;
+ Uint32 total_records = max_outstanding * tNoOfTransactions;
+ Uint32 first_record = total_records * my_thread_id;
+ Uint32 my_last_record = first_record + total_records - 1;
+ Uint32 current_record = first_record;
+ KEY_LIST_HEADER free_list_header;
+ void *key_op_mem = malloc(sizeof(KEY_OPERATION) * max_outstanding);
+ char *thread_id_mem = (char*)malloc(total_records);
+ memset((char*)&thread_state[0], 0, sizeof(thread_state));
+
+ init_key_op_list((char*)key_op_mem,
+ &free_list_header,
+ max_outstanding,
+ my_thread_id,
+ run_type);
+ Ndb *my_ndb = get_ndb_object(my_thread_id);
+ init_thread_id_mem(thread_id_mem,
+ first_record,
+ total_records,
+ my_ndb);
+ delete my_ndb;
+ ThreadExecutions[my_thread_id] = 0;
+ signal_thread_ready_wait_for_start(my_thread_data);
+
+ while (!my_thread_data->stop)
+ {
+ Uint32 defined_ops = prepare_operations(thread_id_mem,
+ &free_list_header,
+ &thread_state[0],
+ current_record,
+ max_outstanding,
+ first_record,
+ my_last_record,
+ max_per_thread);
+ current_record += defined_ops;
+ if (defined_ops)
+ {
+ wait_until_all_completed(my_thread_data,
+ &thread_state[0],
+ &free_list_header);
+ }
+ if (current_record > my_last_record)
+ {
+ if (run_type != RunRead &&
+ run_type != RunUpdate)
+ {
+ /**
+ * Inserts and deletes are done when first round is
+ * completed. Reads and updates proceed until time is
+ * completed.
+ */
+ break;
+ }
+ current_record = first_record;
+ }
+ }
+ signal_thread_ready_wait_for_start(my_thread_data);
+ free(key_op_mem);
+ free(thread_id_mem);
+ destroy_thread_data(my_thread_data);
+ return NULL;
+}
+
+/**
+ * This method receives a linked list of key operations and executes
+ * all of them.
+ *
+ * Return Value: >= 0 means successful completion of this many operations
+ * -1 Failure, stop test
+ */
+static int
+execute_operations(char *record,
+ Ndb* my_ndb,
+ KEY_OPERATION *key_op)
+{
+ NdbConnection* ndb_conn_array[MAXPAR];
+ Uint32 num_ops = 0;
+
+ while (key_op)
+ {
+ ndb_conn_array[num_ops] = get_trans_object(key_op->first_key,
+ key_op->second_key,
+ my_ndb);
+ if (ndb_conn_array[num_ops] == NULL){
+ error_handler(my_ndb->getNdbError());
+ ndbout << endl << "Unable to recover! Quitting now" << endl ;
+ return -1;
+ }
+ //-------------------------------------------------------
+ // Define the operation, but do not execute it yet.
+ //-------------------------------------------------------
+ defineNdbRecordOperation(record,
+ ndb_conn_array[num_ops],
+ (StartType)key_op->operation_type,
+ key_op->first_key,
+ key_op->second_key);
+
+ ndb_conn_array[num_ops]->executeAsynchPrepare(Commit,
+ &executeCallback,
+ (void*)&ndb_conn_array[num_ops]);
+ num_ops++;
+ key_op = key_op->next_key_op;
+ }
+ if (num_ops == 0)
+ return 0;
+
+ /**
+ * Now execute each defined operation and wait for all of them to
+ * complete.
+ */
+ int Tcomp = my_ndb->sendPollNdb(3000,
+ num_ops,
+ tSendForce);
+ if (Tcomp != (int)num_ops &&
+ my_ndb->getNdbError().code != 0)
+ {
+ /* Error handling */
+ if (error_count > 100)
+ return -1;
+
+ error_count++;
+ ndbout << "error = " << my_ndb->getNdbError().code << endl;
+ }
+ return Tcomp;
+}
+
+static void
+report_back_operations(KEY_OPERATION *first_defined_op)
+{
+ KEY_LIST_HEADER thread_list_header[MAX_DEFINER_THREADS];
+ KEY_OPERATION *next_op, *executed_op;
+
+ init_list_headers(&thread_list_header[0], tNoOfDefinerThreads);
+ executed_op = first_defined_op;
+ while (executed_op)
+ {
+ next_op = executed_op->next_key_op;
+ insert_list(&thread_list_header[executed_op->definer_thread_id],
+ executed_op);
+ executed_op = next_op;
+ }
+ for (Uint32 i = 0; i < tNoOfDefinerThreads; i++)
+ {
+ if (thread_list_header[i].first_in_list)
+ {
+ send_operations(i, &thread_list_header[i]);
+ }
+ }
+}
+
+/**
+ * This is the main function of the executor threads, these threads
+ * receive linked lists of operations to execute from the definer
+ * threads. The definer threads stops these threads by simply
+ * sending a stop operation.
+ */
+static void*
+executor_thread(void *data)
+{
+ THREAD_DATA *my_thread_data = (THREAD_DATA*)data;
+ Uint32 my_thread_id = my_thread_data->thread_id;
+ Uint64 exec_count = 0;
+ Uint32 error_count = 0;
+ Uint32 executions = 0;
+ Uint32 error_flag = false;
+ int ret_code;
+ KEY_LIST_HEADER list_header;
+
+ Ndb *my_ndb = get_ndb_object(my_thread_id);
+ ThreadExecutions[my_thread_id] = 0;
+
+ signal_thread_ready_wait_for_start(my_thread_data);
+
+ while (!my_thread_data->stop)
+ {
+ receive_operations(my_thread_data, &list_header, !tImmediate);
+ if (list_header.num_in_list == 0)
+ {
+ break;
+ }
+ ret_code = 0;
+ if (!error_flag)
+ {
+ /* Ignore to execute after errors to simplify error handling */
+ ret_code = execute_operations(my_thread_data->record,
+ my_ndb,
+ list_header.first_in_list);
+ }
+ report_back_operations(list_header.first_in_list);
+ if (ret_code < 0)
+ {
+ ndbout_c("executor thread id = %u received error", my_thread_id);
+ error_flag = true;
+ }
+ else if (!error_flag &&
+ (tRunType == RunInsert ||
+ tRunType == RunDelete ||
+ tRunState == EXECUTING))
+ {
+ executions++;
+ exec_count += (Uint64)ret_code;
+ }
+ }
+
+ ThreadExecutions[my_thread_id] = exec_count;
+ if (error_count)
+ {
+ ndbout_c("Received %u errors in executor thread, id = %u",
+ error_count, my_thread_id);
+ }
+ signal_thread_ready_wait_for_start(my_thread_data);
+ delete my_ndb;
+ destroy_thread_data(my_thread_data);
+ return NULL;
+}
+
+/* End NEW Module */
+
static
int
readArguments(int argc, const char** argv){
@@ -1339,6 +2006,15 @@ readArguments(int argc, const char** arg
ndbout_c("Invalid no of threads");
return -1;
}
+ }
+ else if (strcmp(argv[i], "-d") == 0)
+ {
+ tNoOfDefinerThreads = atoi(argv[i+1]);
+ if (tNoOfDefinerThreads > NDB_MAXTHREADS)
+ {
+ ndbout_c("Invalid no of definer threads");
+ return -1;
+ }
} else if (strcmp(argv[i], "-p") == 0){
tNoOfParallelTrans = atoi(argv[i+1]);
if ((tNoOfParallelTrans < 1) || (tNoOfParallelTrans > MAXPAR)){
@@ -1463,6 +2139,18 @@ readArguments(int argc, const char** arg
} else if (strcmp(argv[i], "-create_table") == 0){
tRunType = RunCreateTable;
argc++;
+ }
+ else if (strcmp(argv[i], "-new") == 0)
+ {
+ tNew = true;
+ tNdbRecord = true;
+ argc++;
+ i--;
+ }
+ else if (strcmp(argv[i], "-immediate") == 0)
+ {
+ tImmediate = true;
+ argc++;
i--;
} else if (strcmp(argv[i], "-drop_table") == 0){
tRunType = RunDropTable;
@@ -1536,4 +2224,303 @@ input_error(){
ndbout_c(" -table Number of standard table, default 0");
}
+static void
+run_old_flexAsynch(ThreadNdb *pThreadData,
+ NdbTimer & timer)
+{
+ int tLoops=0;
+ /****************************************************************
+ * Create NDB objects. *
+ ****************************************************************/
+ resetThreads();
+ for (Uint32 i = 0; i < tNoOfThreads ; i++)
+ {
+ pThreadData[i].ThreadNo = i;
+ threadLife[i] = NdbThread_Create(threadLoop,
+ (void**)&pThreadData[i],
+ 32768,
+ "flexAsynchThread",
+ NDB_THREAD_PRIO_LOW);
+ }//for
+ ndbout << endl << "All NDB objects and table created" << endl << endl;
+ int noOfTransacts = tNoOfParallelTrans*tNoOfTransactions*tNoOfThreads;
+ /****************************************************************
+ * Execute program. *
+ ****************************************************************/
+
+ for (;;)
+ {
+
+ int loopCount = tLoops + 1 ;
+ ndbout << endl << "Loop # " << loopCount << endl << endl ;
+
+ /****************************************************************
+ * Perform inserts. *
+ ****************************************************************/
+
+ failed = 0 ;
+ if (tRunType == RunAll || tRunType == RunInsert)
+ {
+ ndbout << "Executing inserts" << endl;
+ START_TIMER;
+ execute(stInsert);
+ STOP_TIMER;
+ }
+ if (tRunType == RunAll)
+ {
+ a_i.addObservation((1000*noOfTransacts * tNoOfOpsPerTrans) / timer.elapsedTime());
+ PRINT_TIMER("insert", noOfTransacts, tNoOfOpsPerTrans);
+
+ if (0 < failed)
+ {
+ int i = retry_opt ;
+ int ci = 1 ;
+ while (0 < failed && 0 < i)
+ {
+ ndbout << failed << " of the transactions returned errors!"
+ << endl << endl;
+ ndbout << "Attempting to redo the failed transactions now..."
+ << endl ;
+ ndbout << "Redo attempt " << ci <<" out of " << retry_opt
+ << endl << endl;
+ failed = 0 ;
+ START_TIMER;
+ execute(stInsert);
+ STOP_TIMER;
+ PRINT_TIMER("insert", noOfTransacts, tNoOfOpsPerTrans);
+ i-- ;
+ ci++;
+ }
+ if (0 == failed )
+ {
+ ndbout << endl <<"Redo attempt succeeded" << endl << endl;
+ }
+ else
+ {
+ ndbout << endl <<"Redo attempt failed, moving on now..." << endl
+ << endl;
+ }//if
+ }//if
+ }//if
+ /****************************************************************
+ * Perform read. *
+ ****************************************************************/
+
+ failed = 0 ;
+
+ if (tRunType == RunAll || tRunType == RunRead)
+ {
+ for (int ll = 0; ll < 1 + tExtraReadLoop; ll++)
+ {
+ ndbout << "Executing reads" << endl;
+ START_TIMER;
+ execute(stRead);
+ STOP_TIMER;
+ if (tRunType == RunAll)
+ {
+ a_r.addObservation((1000 * noOfTransacts * tNoOfOpsPerTrans) / timer.elapsedTime());
+ PRINT_TIMER("read", noOfTransacts, tNoOfOpsPerTrans);
+ }//if
+ }//for
+ }//if
+
+ if (tRunType == RunAll)
+ {
+ if (0 < failed)
+ {
+ int i = retry_opt ;
+ int cr = 1;
+ while (0 < failed && 0 < i)
+ {
+ ndbout << failed << " of the transactions returned errors!"<<endl ;
+ ndbout << endl;
+ ndbout <<"Attempting to redo the failed transactions now..." << endl;
+ ndbout << endl;
+ ndbout <<"Redo attempt " << cr <<" out of ";
+ ndbout << retry_opt << endl << endl;
+ failed = 0 ;
+ START_TIMER;
+ execute(stRead);
+ STOP_TIMER;
+ PRINT_TIMER("read", noOfTransacts, tNoOfOpsPerTrans);
+ i-- ;
+ cr++ ;
+ }//while
+ if (0 == failed )
+ {
+ ndbout << endl <<"Redo attempt succeeded" << endl << endl ;
+ }
+ else
+ {
+ ndbout << endl <<"Redo attempt failed, moving on now..." << endl << endl ;
+ }//if
+ }//if
+ }//if
+
+
+ /****************************************************************
+ * Perform update. *
+ ****************************************************************/
+
+ failed = 0 ;
+
+ if (tRunType == RunAll || tRunType == RunUpdate)
+ {
+ ndbout << "Executing updates" << endl;
+ START_TIMER;
+ execute(stUpdate);
+ STOP_TIMER;
+ }//if
+ if (tRunType == RunAll)
+ {
+ a_u.addObservation((1000 * noOfTransacts * tNoOfOpsPerTrans) / timer.elapsedTime());
+ PRINT_TIMER("update", noOfTransacts, tNoOfOpsPerTrans) ;
+
+ if (0 < failed)
+ {
+ int i = retry_opt ;
+ int cu = 1 ;
+ while (0 < failed && 0 < i)
+ {
+ ndbout << failed << " of the transactions returned errors!"<<endl ;
+ ndbout << endl;
+ ndbout <<"Attempting to redo the failed transactions now..." << endl;
+ ndbout << endl <<"Redo attempt " << cu <<" out of ";
+ ndbout << retry_opt << endl << endl;
+ failed = 0 ;
+ START_TIMER;
+ execute(stUpdate);
+ STOP_TIMER;
+ PRINT_TIMER("update", noOfTransacts, tNoOfOpsPerTrans);
+ i-- ;
+ cu++ ;
+ }//while
+ if (0 == failed )
+ {
+ ndbout << endl <<"Redo attempt succeeded" << endl << endl;
+ }
+ else
+ {
+ ndbout << endl;
+ ndbout <<"Redo attempt failed, moving on now..." << endl << endl;
+ }//if
+ }//if
+ }//if
+
+ /****************************************************************
+ * Perform read. *
+ ****************************************************************/
+
+ failed = 0 ;
+
+ if (tRunType == RunAll)
+ {
+ for (int ll = 0; ll < 1 + tExtraReadLoop; ll++)
+ {
+ ndbout << "Executing reads" << endl;
+ START_TIMER;
+ execute(stRead);
+ STOP_TIMER;
+ a_r.addObservation((1000 * noOfTransacts * tNoOfOpsPerTrans) / timer.elapsedTime());
+ PRINT_TIMER("read", noOfTransacts, tNoOfOpsPerTrans);
+ }
+
+ if (0 < failed)
+ {
+ int i = retry_opt ;
+ int cr2 = 1 ;
+ while (0 < failed && 0 < i)
+ {
+ ndbout << failed << " of the transactions returned errors!"<<endl ;
+ ndbout << endl;
+ ndbout <<"Attempting to redo the failed transactions now..." << endl;
+ ndbout << endl <<"Redo attempt " << cr2 <<" out of ";
+ ndbout << retry_opt << endl << endl;
+ failed = 0 ;
+ START_TIMER;
+ execute(stRead);
+ STOP_TIMER;
+ PRINT_TIMER("read", noOfTransacts, tNoOfOpsPerTrans);
+ i-- ;
+ cr2++ ;
+ }//while
+ if (0 == failed )
+ {
+ ndbout << endl <<"Redo attempt succeeded" << endl << endl;
+ }
+ else
+ {
+ ndbout << endl;
+ ndbout << "Redo attempt failed, moving on now..." << endl << endl;
+ }//if
+ }//if
+ }//if
+
+
+ /****************************************************************
+ * Perform delete. *
+ ****************************************************************/
+
+ failed = 0 ;
+
+ if (tRunType == RunAll || tRunType == RunDelete)
+ {
+ ndbout << "Executing deletes" << endl;
+ START_TIMER;
+ execute(stDelete);
+ STOP_TIMER;
+ }//if
+ if (tRunType == RunAll)
+ {
+ a_d.addObservation((1000 * noOfTransacts * tNoOfOpsPerTrans) / timer.elapsedTime());
+ PRINT_TIMER("delete", noOfTransacts, tNoOfOpsPerTrans);
+
+ if (0 < failed) {
+ int i = retry_opt ;
+ int cd = 1 ;
+ while (0 < failed && 0 < i)
+ {
+ ndbout << failed << " of the transactions returned errors!"<< endl ;
+ ndbout << endl;
+ ndbout <<"Attempting to redo the failed transactions now:" << endl ;
+ ndbout << endl <<"Redo attempt " << cd <<" out of ";
+ ndbout << retry_opt << endl << endl;
+ failed = 0 ;
+ START_TIMER;
+ execute(stDelete);
+ STOP_TIMER;
+ PRINT_TIMER("read", noOfTransacts, tNoOfOpsPerTrans);
+ i-- ;
+ cd++ ;
+ }//while
+ if (0 == failed )
+ {
+ ndbout << endl <<"Redo attempt succeeded" << endl << endl ;
+ }
+ else
+ {
+ ndbout << endl;
+ ndbout << "Redo attempt failed, moving on now..." << endl << endl;
+ }//if
+ }//if
+ }//if
+
+ tLoops++;
+ ndbout << "--------------------------------------------------" << endl;
+
+ if (tNoOfLoops != 0)
+ {
+ if (tNoOfLoops <= tLoops)
+ break ;
+ }
+ }//for
+
+ execute(stStop);
+ void * tmp;
+ for (Uint32 i = 0; i < tNoOfThreads; i++)
+ {
+ NdbThread_WaitFor(threadLife[i], &tmp);
+ NdbThread_Destroy(&threadLife[i]);
+ }
+}
template class Vector<NdbDictionary::RecordSpecification>;
No bundle (reason: useless for push emails).
| Thread |
|---|
| • bzr push into mysql-5.1-telco-7.0 branch (mauritz.sundell:4937 to 4939) | Mauritz Sundell | 11 Jun |