4649 Jonas Oreland 2011-11-09
ndb - include mikaels extensions for flexAsynch...
modified:
storage/ndb/test/ndbapi/flexAsynch.cpp
4648 Ole John Aske 2011-11-09
Fix for bug#13355055: CLUSTER INTERNALS FAILS TO TERMINATE BATCH AT MAX 'BATCHBYTESIZE'
We have observed SCANREQs with a surprisingly small 'BatchSize' argument as part
of debugging and tuning SPJ. Where we expected 'BatchSize=64' (Default) we
have observed values around ~10. This directly translated into suboptimal performance.
When debugging this, we found the root cause in NdbRecord::calculate_batch_size(), which
returns the batchsize (#rows) and arguments for the SCANREQ signal.
It contained the following questionable logic:
1) Calculate the worst case record length based on that *all columns* are selected
from a table, and all varchar() columns being filled to their *max limit*.
2) If that record length is such that 'batchsize * recLength' > ,
reduce batchsize such that batchbytesize would never be exceeded.
This effectively put ::calculate_batch_size() in control of the batchbytesize
logic. The negative impact if that logic was that 'batchsize' could be severely
restricted in cases where we could have delivered a lot more rows in that batch.
However, there are logic in LQH+TUP which are intended to keep the delivered batches
withing the batchsize limits. This is a much better place to control this as
LQH & TUP knows the exact size of the TRANSID_AI payload being delivered, taking
actual varchar length and only the selected columns into acount.
Debugging that logic, it turned out that it contained bugs in how the produced
batchsize was counted: Actually a mixup between whether the 'length' was in
specified in number of bytes or Uint32. - So the above questionable
::calculate_batch_size() logic seems to have been invented only to
circumvent this bug......
Fixing that bug allowed us to now leave the entire batch control to
the LQH block.
- ::calculate_batch_size could then be significantly simplified.
- The specified BatchSize & BatchByteSize arguments could be used as
specified directly as args in SCANREQ signals.
- Will likely give better performance (larger effective batches) when
scanning a table with 'max record length > BatchByteSize / BatchSize'
(~500 bytes with default config)
Fix number of bytes/Uint32 mixup in how m_curr_batch_size_bytes is counted
******
Fix number of bytes/Uint32 mixup in how the SPJ adaptive parallelism count m_totalBytes
******
Simplify ::calculate_batch_size() as LQH now correctly will stay within the specified batch_size rows/bytes limits
******
Remove NdbRecord::m_max_transid_ai_bytes which is now obsolete
******
Remove unused args from NdbRecord::calculate_batch_size()
******
Fix SPJs adaptive paralellism logic to also handle batchsize termination due to BatchByteSize being exhausted
modified:
storage/ndb/include/kernel/signaldata/ScanFrag.hpp
storage/ndb/include/kernel/signaldata/TupKey.hpp
storage/ndb/include/ndbapi/NdbReceiver.hpp
storage/ndb/src/kernel/blocks/dblqh/DblqhMain.cpp
storage/ndb/src/kernel/blocks/dbspj/DbspjMain.cpp
storage/ndb/src/ndbapi/NdbDictionaryImpl.cpp
storage/ndb/src/ndbapi/NdbQueryOperation.cpp
storage/ndb/src/ndbapi/NdbReceiver.cpp
storage/ndb/src/ndbapi/NdbRecord.hpp
storage/ndb/src/ndbapi/NdbScanOperation.cpp
=== modified file 'storage/ndb/test/ndbapi/flexAsynch.cpp'
--- a/storage/ndb/test/ndbapi/flexAsynch.cpp 2011-06-30 15:59:25 +0000
+++ b/storage/ndb/test/ndbapi/flexAsynch.cpp 2011-11-09 14:19:23 +0000
@@ -37,7 +37,7 @@
#define MAX_SEEK 16
#define MAXSTRLEN 16
#define MAXATTR 64
-#define MAXTABLES 64
+#define MAXTABLES 1
#define NDB_MAXTHREADS 128
/*
NDB_MAXTHREADS used to be just MAXTHREADS, which collides with a
@@ -59,6 +59,16 @@ enum StartType {
stStop
} ;
+enum RunType {
+ RunInsert,
+ RunRead,
+ RunUpdate,
+ RunDelete,
+ RunCreateTable,
+ RunDropTable,
+ RunAll
+};
+
struct ThreadNdb
{
int NoOfOps;
@@ -70,6 +80,7 @@ extern "C" { static void* threadLoop(voi
static void setAttrNames(void);
static void setTableNames(void);
static int readArguments(int argc, const char** argv);
+static void dropTables(Ndb* pMyNdb);
static int createTables(Ndb*);
static void defineOperation(NdbConnection* aTransObject, StartType aType,
Uint32 base, Uint32 aIndex);
@@ -77,6 +88,8 @@ static void defineNdbRecordOperation(Thr
Uint32 base, Uint32 aIndex);
static void execute(StartType aType);
static bool executeThread(ThreadNdb*, StartType aType, Ndb* aNdbObject, unsigned int);
+static bool executeTransLoop(ThreadNdb* pThread, StartType aType, Ndb* aNdbObject,
+ unsigned int threadBase, int threadNo);
static void executeCallback(int result, NdbConnection* NdbObject,
void* aObject);
static bool error_handler(const NdbError & err);
@@ -92,9 +105,15 @@ ErrorData * flexAsynchErrorData;
static NdbThread* threadLife[NDB_MAXTHREADS];
static int tNodeId;
static int ThreadReady[NDB_MAXTHREADS];
+static longlong ThreadExecutions[NDB_MAXTHREADS];
static StartType ThreadStart[NDB_MAXTHREADS];
static char tableName[MAXTABLES][MAXSTRLEN+1];
static char attrName[MAXATTR][MAXSTRLEN+1];
+static RunType tRunType = RunAll;
+static int tStdTableNum = 0;
+static int tWarmupTime = 10; //Seconds
+static int tExecutionTime = 30; //Seconds
+static int tCooldownTime = 10; //Seconds
// Program Parameters
static NdbRecord * g_record[MAXTABLES];
@@ -126,9 +145,10 @@ static int
#define START_REAL_TIME
#define STOP_REAL_TIME
-#define START_TIMER { NdbTimer timer; timer.doStart();
+#define DEFINE_TIMER NdbTimer timer
+#define START_TIMER timer.doStart();
#define STOP_TIMER timer.doStop();
-#define PRINT_TIMER(text, trans, opertrans) timer.printTransactionStatistics(text, trans, opertrans); };
+#define PRINT_TIMER(text, trans, opertrans) timer.printTransactionStatistics(text, trans, opertrans)
NDBT_Stats a_i, a_u, a_d, a_r;
@@ -183,6 +203,7 @@ NDB_COMMAND(flexAsynch, "flexAsynch", "f
ThreadNdb* pThreadData;
int tLoops=0;
int returnValue = NDBT_OK;
+ DEFINE_TIMER;
flexAsynchErrorData = new ErrorData;
flexAsynchErrorData->resetErrorCounters();
@@ -201,7 +222,13 @@ NDB_COMMAND(flexAsynch, "flexAsynch", "f
ndbout << " " << tNoOfParallelTrans;
ndbout << " number of parallel operation per thread " << endl;
ndbout << " " << tNoOfTransactions << " transaction(s) per round " << endl;
- ndbout << " " << tNoOfLoops << " iterations " << endl;
+ if (tRunType == RunAll){
+ ndbout << " " << tNoOfLoops << " iterations " << endl;
+ } else if (tRunType == RunRead || tRunType == RunUpdate){
+ ndbout << " Warmup time is " << tWarmupTime << endl;
+ ndbout << " Execution time is " << tExecutionTime << endl;
+ ndbout << " Cooldown time is " << tCooldownTime << endl;
+ }
ndbout << " " << "Load Factor is " << tLoadFactor << "%" << endl;
ndbout << " " << tNoOfAttributes << " attributes per table " << endl;
ndbout << " " << tAttributeSize;
@@ -262,10 +289,20 @@ NDB_COMMAND(flexAsynch, "flexAsynch", "f
if (pNdb->waitUntilReady(10000) != 0){
ndbout << "NDB is not ready" << endl;
ndbout << "Benchmark failed!" << endl;
- returnValue = NDBT_FAILED;
+ return NDBT_ProgramExit(NDBT_FAILED);
}
- if(returnValue == NDBT_OK){
+ if (tRunType == RunCreateTable)
+ {
+ if (createTables(pNdb) != 0){
+ returnValue = NDBT_FAILED;
+ }
+ }
+ else if (tRunType == RunDropTable)
+ {
+ dropTables(pNdb);
+ }
+ else if(returnValue == NDBT_OK){
if (createTables(pNdb) != 0){
returnValue = NDBT_FAILED;
}
@@ -282,14 +319,15 @@ NDB_COMMAND(flexAsynch, "flexAsynch", "f
}
}
- if(returnValue == NDBT_OK){
+ if(returnValue == NDBT_OK &&
+ tRunType != RunCreateTable &&
+ tRunType != RunDropTable){
/****************************************************************
* Create NDB objects. *
****************************************************************/
resetThreads();
for (Uint32 i = 0; i < tNoOfThreads ; i++) {
- pThreadData[i].ThreadNo = i
-;
+ pThreadData[i].ThreadNo = i;
threadLife[i] = NdbThread_Create(threadLoop,
(void**)&pThreadData[i],
32768,
@@ -312,76 +350,86 @@ NDB_COMMAND(flexAsynch, "flexAsynch", "f
****************************************************************/
failed = 0 ;
-
- START_TIMER;
- execute(stInsert);
- STOP_TIMER;
- a_i.addObservation((1000*noOfTransacts * tNoOfOpsPerTrans) / timer.elapsedTime());
- PRINT_TIMER("insert", noOfTransacts, tNoOfOpsPerTrans);
-
- if (0 < failed) {
- int i = retry_opt ;
- int ci = 1 ;
- while (0 < failed && 0 < i){
- ndbout << failed << " of the transactions returned errors!"
- << endl << endl;
- ndbout << "Attempting to redo the failed transactions now..."
- << endl ;
- ndbout << "Redo attempt " << ci <<" out of " << retry_opt
- << endl << endl;
- failed = 0 ;
- START_TIMER;
- execute(stInsert);
- STOP_TIMER;
- PRINT_TIMER("insert", noOfTransacts, tNoOfOpsPerTrans);
- i-- ;
- ci++;
- }
- if(0 == failed ){
- ndbout << endl <<"Redo attempt succeeded" << endl << endl;
- }else{
- ndbout << endl <<"Redo attempt failed, moving on now..." << endl
- << endl;
+ if (tRunType == RunAll || tRunType == RunInsert){
+ ndbout << "Executing inserts" << endl;
+ START_TIMER;
+ execute(stInsert);
+ STOP_TIMER;
+ }
+ if (tRunType == RunAll){
+ a_i.addObservation((1000*noOfTransacts * tNoOfOpsPerTrans) / timer.elapsedTime());
+ PRINT_TIMER("insert", noOfTransacts, tNoOfOpsPerTrans);
+
+ if (0 < failed) {
+ int i = retry_opt ;
+ int ci = 1 ;
+ while (0 < failed && 0 < i){
+ ndbout << failed << " of the transactions returned errors!"
+ << endl << endl;
+ ndbout << "Attempting to redo the failed transactions now..."
+ << endl ;
+ ndbout << "Redo attempt " << ci <<" out of " << retry_opt
+ << endl << endl;
+ failed = 0 ;
+ START_TIMER;
+ execute(stInsert);
+ STOP_TIMER;
+ PRINT_TIMER("insert", noOfTransacts, tNoOfOpsPerTrans);
+ i-- ;
+ ci++;
+ }
+ if(0 == failed ){
+ ndbout << endl <<"Redo attempt succeeded" << endl << endl;
+ }else{
+ ndbout << endl <<"Redo attempt failed, moving on now..." << endl
+ << endl;
+ }//if
}//if
- }//if
-
+ }//if
/****************************************************************
* Perform read. *
****************************************************************/
failed = 0 ;
- for (int ll = 0; ll < 1 + tExtraReadLoop; ll++)
- {
- START_TIMER;
- execute(stRead);
- STOP_TIMER;
- a_r.addObservation((1000 * noOfTransacts * tNoOfOpsPerTrans) / timer.elapsedTime());
- PRINT_TIMER("read", noOfTransacts, tNoOfOpsPerTrans);
- }
-
- if (0 < failed) {
- int i = retry_opt ;
- int cr = 1;
- while (0 < failed && 0 < i){
- ndbout << failed << " of the transactions returned errors!"<<endl ;
- ndbout << endl;
- ndbout <<"Attempting to redo the failed transactions now..." << endl;
- ndbout << endl;
- ndbout <<"Redo attempt " << cr <<" out of ";
- ndbout << retry_opt << endl << endl;
- failed = 0 ;
+ if (tRunType == RunAll || tRunType == RunRead){
+ for (int ll = 0; ll < 1 + tExtraReadLoop; ll++)
+ {
+ ndbout << "Executing reads" << endl;
START_TIMER;
execute(stRead);
STOP_TIMER;
- PRINT_TIMER("read", noOfTransacts, tNoOfOpsPerTrans);
- i-- ;
- cr++ ;
- }//while
- if(0 == failed ) {
- ndbout << endl <<"Redo attempt succeeded" << endl << endl ;
- }else{
- ndbout << endl <<"Redo attempt failed, moving on now..." << endl << endl ;
+ if (tRunType == RunAll){
+ a_r.addObservation((1000 * noOfTransacts * tNoOfOpsPerTrans) / timer.elapsedTime());
+ PRINT_TIMER("read", noOfTransacts, tNoOfOpsPerTrans);
+ }//if
+ }//for
+ }//if
+
+ if (tRunType == RunAll){
+ if (0 < failed) {
+ int i = retry_opt ;
+ int cr = 1;
+ while (0 < failed && 0 < i){
+ ndbout << failed << " of the transactions returned errors!"<<endl ;
+ ndbout << endl;
+ ndbout <<"Attempting to redo the failed transactions now..." << endl;
+ ndbout << endl;
+ ndbout <<"Redo attempt " << cr <<" out of ";
+ ndbout << retry_opt << endl << endl;
+ failed = 0 ;
+ START_TIMER;
+ execute(stRead);
+ STOP_TIMER;
+ PRINT_TIMER("read", noOfTransacts, tNoOfOpsPerTrans);
+ i-- ;
+ cr++ ;
+ }//while
+ if(0 == failed ) {
+ ndbout << endl <<"Redo attempt succeeded" << endl << endl ;
+ }else{
+ ndbout << endl <<"Redo attempt failed, moving on now..." << endl << endl ;
+ }//if
}//if
}//if
@@ -391,35 +439,40 @@ NDB_COMMAND(flexAsynch, "flexAsynch", "f
****************************************************************/
failed = 0 ;
-
- START_TIMER;
- execute(stUpdate);
- STOP_TIMER;
- a_u.addObservation((1000 * noOfTransacts * tNoOfOpsPerTrans) / timer.elapsedTime());
- PRINT_TIMER("update", noOfTransacts, tNoOfOpsPerTrans) ;
-
- if (0 < failed) {
- int i = retry_opt ;
- int cu = 1 ;
- while (0 < failed && 0 < i){
- ndbout << failed << " of the transactions returned errors!"<<endl ;
- ndbout << endl;
- ndbout <<"Attempting to redo the failed transactions now..." << endl;
- ndbout << endl <<"Redo attempt " << cu <<" out of ";
- ndbout << retry_opt << endl << endl;
- failed = 0 ;
- START_TIMER;
- execute(stUpdate);
- STOP_TIMER;
- PRINT_TIMER("update", noOfTransacts, tNoOfOpsPerTrans);
- i-- ;
- cu++ ;
- }//while
- if(0 == failed ){
- ndbout << endl <<"Redo attempt succeeded" << endl << endl;
- } else {
- ndbout << endl;
- ndbout <<"Redo attempt failed, moving on now..." << endl << endl;
+
+ if (tRunType == RunAll || tRunType == RunUpdate){
+ ndbout << "Executing updates" << endl;
+ START_TIMER;
+ execute(stUpdate);
+ STOP_TIMER;
+ }//if
+ if (tRunType == RunAll){
+ a_u.addObservation((1000 * noOfTransacts * tNoOfOpsPerTrans) / timer.elapsedTime());
+ PRINT_TIMER("update", noOfTransacts, tNoOfOpsPerTrans) ;
+
+ if (0 < failed) {
+ int i = retry_opt ;
+ int cu = 1 ;
+ while (0 < failed && 0 < i){
+ ndbout << failed << " of the transactions returned errors!"<<endl ;
+ ndbout << endl;
+ ndbout <<"Attempting to redo the failed transactions now..." << endl;
+ ndbout << endl <<"Redo attempt " << cu <<" out of ";
+ ndbout << retry_opt << endl << endl;
+ failed = 0 ;
+ START_TIMER;
+ execute(stUpdate);
+ STOP_TIMER;
+ PRINT_TIMER("update", noOfTransacts, tNoOfOpsPerTrans);
+ i-- ;
+ cu++ ;
+ }//while
+ if(0 == failed ){
+ ndbout << endl <<"Redo attempt succeeded" << endl << endl;
+ } else {
+ ndbout << endl;
+ ndbout <<"Redo attempt failed, moving on now..." << endl << endl;
+ }//if
}//if
}//if
@@ -428,38 +481,41 @@ NDB_COMMAND(flexAsynch, "flexAsynch", "f
****************************************************************/
failed = 0 ;
-
- for (int ll = 0; ll < 1 + tExtraReadLoop; ll++)
- {
- START_TIMER;
- execute(stRead);
- STOP_TIMER;
- a_r.addObservation((1000 * noOfTransacts * tNoOfOpsPerTrans) / timer.elapsedTime());
- PRINT_TIMER("read", noOfTransacts, tNoOfOpsPerTrans);
- }
-
- if (0 < failed) {
- int i = retry_opt ;
- int cr2 = 1 ;
- while (0 < failed && 0 < i){
- ndbout << failed << " of the transactions returned errors!"<<endl ;
- ndbout << endl;
- ndbout <<"Attempting to redo the failed transactions now..." << endl;
- ndbout << endl <<"Redo attempt " << cr2 <<" out of ";
- ndbout << retry_opt << endl << endl;
- failed = 0 ;
+
+ if (tRunType == RunAll){
+ for (int ll = 0; ll < 1 + tExtraReadLoop; ll++)
+ {
+ ndbout << "Executing reads" << endl;
START_TIMER;
execute(stRead);
STOP_TIMER;
+ a_r.addObservation((1000 * noOfTransacts * tNoOfOpsPerTrans) / timer.elapsedTime());
PRINT_TIMER("read", noOfTransacts, tNoOfOpsPerTrans);
- i-- ;
- cr2++ ;
- }//while
- if(0 == failed ){
- ndbout << endl <<"Redo attempt succeeded" << endl << endl;
- }else{
- ndbout << endl;
- ndbout << "Redo attempt failed, moving on now..." << endl << endl;
+ }
+
+ if (0 < failed) {
+ int i = retry_opt ;
+ int cr2 = 1 ;
+ while (0 < failed && 0 < i){
+ ndbout << failed << " of the transactions returned errors!"<<endl ;
+ ndbout << endl;
+ ndbout <<"Attempting to redo the failed transactions now..." << endl;
+ ndbout << endl <<"Redo attempt " << cr2 <<" out of ";
+ ndbout << retry_opt << endl << endl;
+ failed = 0 ;
+ START_TIMER;
+ execute(stRead);
+ STOP_TIMER;
+ PRINT_TIMER("read", noOfTransacts, tNoOfOpsPerTrans);
+ i-- ;
+ cr2++ ;
+ }//while
+ if(0 == failed ){
+ ndbout << endl <<"Redo attempt succeeded" << endl << endl;
+ }else{
+ ndbout << endl;
+ ndbout << "Redo attempt failed, moving on now..." << endl << endl;
+ }//if
}//if
}//if
@@ -470,34 +526,39 @@ NDB_COMMAND(flexAsynch, "flexAsynch", "f
failed = 0 ;
- START_TIMER;
- execute(stDelete);
- STOP_TIMER;
- a_d.addObservation((1000 * noOfTransacts * tNoOfOpsPerTrans) / timer.elapsedTime());
- PRINT_TIMER("delete", noOfTransacts, tNoOfOpsPerTrans);
-
- if (0 < failed) {
- int i = retry_opt ;
- int cd = 1 ;
- while (0 < failed && 0 < i){
- ndbout << failed << " of the transactions returned errors!"<< endl ;
- ndbout << endl;
- ndbout <<"Attempting to redo the failed transactions now:" << endl ;
- ndbout << endl <<"Redo attempt " << cd <<" out of ";
- ndbout << retry_opt << endl << endl;
- failed = 0 ;
- START_TIMER;
- execute(stDelete);
- STOP_TIMER;
- PRINT_TIMER("read", noOfTransacts, tNoOfOpsPerTrans);
- i-- ;
- cd++ ;
- }//while
- if(0 == failed ){
- ndbout << endl <<"Redo attempt succeeded" << endl << endl ;
- }else{
- ndbout << endl;
- ndbout << "Redo attempt failed, moving on now..." << endl << endl;
+ if (tRunType == RunAll || tRunType == RunDelete){
+ ndbout << "Executing deletes" << endl;
+ START_TIMER;
+ execute(stDelete);
+ STOP_TIMER;
+ }//if
+ if (tRunType == RunAll){
+ a_d.addObservation((1000 * noOfTransacts * tNoOfOpsPerTrans) / timer.elapsedTime());
+ PRINT_TIMER("delete", noOfTransacts, tNoOfOpsPerTrans);
+
+ if (0 < failed) {
+ int i = retry_opt ;
+ int cd = 1 ;
+ while (0 < failed && 0 < i){
+ ndbout << failed << " of the transactions returned errors!"<< endl ;
+ ndbout << endl;
+ ndbout <<"Attempting to redo the failed transactions now:" << endl ;
+ ndbout << endl <<"Redo attempt " << cd <<" out of ";
+ ndbout << retry_opt << endl << endl;
+ failed = 0 ;
+ START_TIMER;
+ execute(stDelete);
+ STOP_TIMER;
+ PRINT_TIMER("read", noOfTransacts, tNoOfOpsPerTrans);
+ i-- ;
+ cd++ ;
+ }//while
+ if(0 == failed ){
+ ndbout << endl <<"Redo attempt succeeded" << endl << endl ;
+ }else{
+ ndbout << endl;
+ ndbout << "Redo attempt failed, moving on now..." << endl << endl;
+ }//if
}//if
}//if
@@ -516,17 +577,61 @@ NDB_COMMAND(flexAsynch, "flexAsynch", "f
NdbThread_WaitFor(threadLife[i], &tmp);
NdbThread_Destroy(&threadLife[i]);
}
- }
+ }
+
+ if (tRunType == RunAll)
+ {
+ dropTables(pNdb);
+ }
delete [] pThreadData;
delete pNdb;
- //printing errorCounters
- flexAsynchErrorData->printErrorCounters(ndbout);
-
- print("insert", a_i);
- print("update", a_u);
- print("delete", a_d);
- print("read ", a_r);
+ if (tRunType == RunAll ||
+ tRunType == RunInsert ||
+ tRunType == RunDelete ||
+ tRunType == RunUpdate ||
+ tRunType == RunRead)
+ {
+ //printing errorCounters
+ flexAsynchErrorData->printErrorCounters(ndbout);
+ if (tRunType == RunAll) {
+ print("insert", a_i);
+ print("update", a_u);
+ print("delete", a_d);
+ print("read ", a_r);
+ }
+ }
+ if (tRunType == RunInsert ||
+ tRunType == RunRead ||
+ tRunType == RunUpdate ||
+ tRunType == RunDelete)
+ {
+ longlong total_executions = 0;
+ longlong total_transactions;
+ longlong exec_time;
+
+ if (tRunType == RunInsert || tRunType == RunDelete) {
+ total_executions = (longlong)tNoOfTransactions;
+ total_executions *= (longlong)tNoOfThreads;
+ } else {
+ for (Uint32 i = 0; i < tNoOfThreads; i++){
+ total_executions += ThreadExecutions[i];
+ }
+ }
+ total_transactions = total_executions * tNoOfParallelTrans;
+ if (tRunType == RunInsert || tRunType == RunDelete) {
+ exec_time = (longlong)timer.elapsedTime();
+ } else {
+ exec_time = (longlong)tExecutionTime * 1000;
+ }
+ ndbout << "Total number of transactions is " << total_transactions;
+ ndbout << endl;
+ ndbout << "Execution time is " << exec_time << " milliseconds" << endl;
+
+ total_transactions = (total_transactions * 1000) / exec_time;
+ int trans_per_sec = (int)total_transactions;
+ ndbout << "Total transactions per second " << trans_per_sec << endl;
+ }
delete [] g_cluster_connection;
@@ -551,7 +656,7 @@ threadLoop(void* ThreadData)
localNdb = new Ndb(g_cluster_connection+(threadNo % tConnections), "TEST_DB");
localNdb->init(1024);
localNdb->waitUntilReady(10000);
- unsigned int threadBase = (threadNo << 16) + tNodeId ;
+ unsigned int threadBase = (threadNo << 16);
for (;;){
while (ThreadStart[threadNo] == stIdle) {
@@ -565,8 +670,14 @@ threadLoop(void* ThreadData)
tType = ThreadStart[threadNo];
ThreadStart[threadNo] = stIdle;
- if(!executeThread(tabThread, tType, localNdb, threadBase)){
- break;
+ if (tRunType == RunAll || tRunType == RunInsert || tRunType == RunDelete){
+ if(!executeThread(tabThread, tType, localNdb, threadBase)){
+ break;
+ }
+ } else {
+ if(!executeTransLoop(tabThread, tType, localNdb, threadBase, threadNo)){
+ break;
+ }
}
ThreadReady[threadNo] = 1;
}//for
@@ -577,80 +688,131 @@ threadLoop(void* ThreadData)
return NULL;
}//threadLoop()
-static
-bool
-executeThread(ThreadNdb* pThread,
- StartType aType, Ndb* aNdbObject, unsigned int threadBase) {
+static int error_count = 0;
+static bool
+executeTrans(ThreadNdb* pThread,
+ StartType aType,
+ Ndb* aNdbObject,
+ unsigned int threadBase,
+ unsigned int i)
+{
NdbConnection* tConArray[1024];
unsigned int tBase;
unsigned int tBase2;
- unsigned int extraLoops= 0; // (aType == stRead) ? 100000 : 0;
-
- for (unsigned int ex= 0; ex < (1 + extraLoops); ex++)
- {
- for (unsigned int i = 0; i < tNoOfTransactions; i++) {
- if (tLocal == false) {
- tBase = i * tNoOfParallelTrans * tNoOfOpsPerTrans;
- } else {
- tBase = i * tNoOfParallelTrans * MAX_SEEK;
- }//if
- START_REAL_TIME;
- for (unsigned int j = 0; j < tNoOfParallelTrans; j++) {
- if (tLocal == false) {
- tBase2 = tBase + (j * tNoOfOpsPerTrans);
- } else {
- tBase2 = tBase + (j * MAX_SEEK);
- tBase2 = getKey(threadBase, tBase2);
- }//if
- if (startTransGuess == true) {
- union {
- Uint64 Tkey64;
- Uint32 Tkey32[2];
- };
- Tkey32[0] = threadBase;
- Tkey32[1] = tBase2;
- tConArray[j] = aNdbObject->startTransaction((Uint32)0, //Priority
- (const char*)&Tkey64, //Main PKey
- (Uint32)4); //Key Length
- } else {
- tConArray[j] = aNdbObject->startTransaction();
- }//if
- if (tConArray[j] == NULL &&
- !error_handler(aNdbObject->getNdbError()) ){
- ndbout << endl << "Unable to recover! Quiting now" << endl ;
- return false;
- }//if
-
- for (unsigned int k = 0; k < tNoOfOpsPerTrans; k++) {
- //-------------------------------------------------------
- // Define the operation, but do not execute it yet.
- //-------------------------------------------------------
- if (tNdbRecord)
- defineNdbRecordOperation(pThread,
- tConArray[j], aType, threadBase,(tBase2+k));
- else
- defineOperation(tConArray[j], aType, threadBase, (tBase2 + k));
- }//for
-
- tConArray[j]->executeAsynchPrepare(Commit, &executeCallback, NULL);
- }//for
- STOP_REAL_TIME;
+ if (tLocal == false) {
+ tBase = i * tNoOfParallelTrans * tNoOfOpsPerTrans;
+ } else {
+ tBase = i * tNoOfParallelTrans * MAX_SEEK;
+ }//if
+ START_REAL_TIME;
+ for (unsigned int j = 0; j < tNoOfParallelTrans; j++) {
+ if (tLocal == false) {
+ tBase2 = tBase + (j * tNoOfOpsPerTrans);
+ } else {
+ tBase2 = tBase + (j * MAX_SEEK);
+ tBase2 = getKey(threadBase, tBase2);
+ }//if
+ if (startTransGuess == true) {
+ union {
+ Uint64 Tkey64;
+ Uint32 Tkey32[2];
+ };
+ Tkey32[0] = threadBase;
+ Tkey32[1] = tBase2;
+ tConArray[j] = aNdbObject->startTransaction((Uint32)0, //Priority
+ (const char*)&Tkey64, //Main PKey
+ (Uint32)4); //Key Length
+ } else {
+ tConArray[j] = aNdbObject->startTransaction();
+ }//if
+ if (tConArray[j] == NULL){
+ error_handler(aNdbObject->getNdbError());
+ ndbout << endl << "Unable to recover! Quiting now" << endl ;
+ return false;
+ }//if
+
+ for (unsigned int k = 0; k < tNoOfOpsPerTrans; k++) {
//-------------------------------------------------------
- // Now we have defined a set of operations, it is now time
- // to execute all of them.
+ // Define the operation, but do not execute it yet.
//-------------------------------------------------------
- int Tcomp = aNdbObject->sendPollNdb(3000, 0, 0);
- while (unsigned(Tcomp) < tNoOfParallelTrans) {
- int TlocalComp = aNdbObject->pollNdb(3000, 0);
- Tcomp += TlocalComp;
- }//while
- for (unsigned int j = 0 ; j < tNoOfParallelTrans ; j++) {
- aNdbObject->closeTransaction(tConArray[j]);
- }//for
+ if (tNdbRecord)
+ defineNdbRecordOperation(pThread,
+ tConArray[j], aType, threadBase,(tBase2+k));
+ else
+ defineOperation(tConArray[j], aType, threadBase, (tBase2 + k));
}//for
- } // for
+
+ tConArray[j]->executeAsynchPrepare(Commit, &executeCallback, NULL);
+ }//for
+ STOP_REAL_TIME;
+ //-------------------------------------------------------
+ // Now we have defined a set of operations, it is now time
+ // to execute all of them.
+ //-------------------------------------------------------
+ int Tcomp = aNdbObject->sendPollNdb(3000, 0, 0);
+ while (unsigned(Tcomp) < tNoOfParallelTrans) {
+ int TlocalComp = aNdbObject->pollNdb(3000, 0);
+ Tcomp += TlocalComp;
+ }//while
+ for (unsigned int j = 0 ; j < tNoOfParallelTrans ; j++) {
+ if (aNdbObject->getNdbError().code != 0 && error_count < 10000){
+ error_count++;
+ ndbout << "i = " << i << ", j = " << j << ", error = ";
+ ndbout << aNdbObject->getNdbError().code << ", threadBase = ";
+ ndbout << hex << threadBase << endl;
+ }
+ aNdbObject->closeTransaction(tConArray[j]);
+ }//for
+ return true;
+}
+
+static
+bool
+executeTransLoop(ThreadNdb* pThread,
+ StartType aType,
+ Ndb* aNdbObject,
+ unsigned int threadBase,
+ int threadNo) {
+ bool continue_flag = true;
+ int time_expired;
+ longlong executions = 0;
+ unsigned int i = 0;
+ DEFINE_TIMER;
+
+ ThreadExecutions[threadNo] = 0;
+ START_TIMER;
+ do
+ {
+ if (!executeTrans(pThread, aType, aNdbObject, threadBase, i++))
+ return false;
+ STOP_TIMER;
+ time_expired = timer.elapsedTime() / 1000;
+ if (time_expired < tWarmupTime)
+ ; //Do nothing
+ else if (time_expired < (tWarmupTime + tExecutionTime)){
+ executions++; //Count measurement
+ }
+ else if (time_expired < (tWarmupTime + tExecutionTime + tCooldownTime))
+ ; //Do nothing
+ else
+ continue_flag = false; //Time expired
+ if (i == tNoOfTransactions) /* Make sure the record exists */
+ i = 0;
+ } while (continue_flag);
+ ThreadExecutions[threadNo] = executions;
+ return true;
+}//executeTransLoop()
+
+static
+bool
+executeThread(ThreadNdb* pThread,
+ StartType aType, Ndb* aNdbObject, unsigned int threadBase) {
+ for (unsigned int i = 0; i < tNoOfTransactions; i++) {
+ if (!executeTrans(pThread, aType, aNdbObject, threadBase, i))
+ return false;
+ }//for
return true;
}//executeThread()
@@ -880,8 +1042,20 @@ static void setTableNames()
BaseString::snprintf(tableName[i], MAXSTRLEN, "TAB%d_%u", i,
(unsigned)(NdbTick_CurrentMillisecond()+rand()));
} else {
- BaseString::snprintf(tableName[i], MAXSTRLEN, "TAB%d", i);
+ BaseString::snprintf(tableName[i], MAXSTRLEN, "TAB%d", tStdTableNum);
}
+ ndbout << "Using table name " << tableName[0] << endl;
+ }
+}
+
+static void
+dropTables(Ndb* pMyNdb)
+{
+ int i;
+ for (i = 0; i < MAXTABLES; i++)
+ {
+ ndbout << "Dropping table " << tableName[i] << "..." << endl;
+ pMyNdb->getDictionary()->dropTable(tableName[i]);
}
}
@@ -893,8 +1067,8 @@ createTables(Ndb* pMyNdb){
NdbSchemaOp *MySchemaOp;
int check;
- if (theTableCreateFlag == 0) {
- for(int i=0; i < 1 ;i++) {
+ if (theTableCreateFlag == 0 || tRunType == RunCreateTable) {
+ for(int i=0; i < MAXTABLES ;i++) {
ndbout << "Creating " << tableName[i] << "..." << endl;
MySchemaTransaction = NdbSchemaCon::startSchemaTrans(pMyNdb);
@@ -953,31 +1127,35 @@ createTables(Ndb* pMyNdb){
return -1;
NdbSchemaCon::closeSchemaTrans(MySchemaTransaction);
+ }
+ }
+ if (tNdbRecord)
+ {
+ for(int i=0; i < MAXTABLES ;i++) {
+ NdbDictionary::Dictionary* pDict = pMyNdb->getDictionary();
+ const NdbDictionary::Table * pTab = pDict->getTable(tableName[i]);
- if (tNdbRecord)
+ if (pTab == NULL){
+ error_handler(pDict->getNdbError());
+ return -1;
+ }
+ int off = 0;
+ Vector<NdbDictionary::RecordSpecification> spec;
+ for (Uint32 j = 0; j<unsigned(pTab->getNoOfColumns()); j++)
{
- NdbDictionary::Dictionary* pDict = pMyNdb->getDictionary();
- const NdbDictionary::Table * pTab = pDict->getTable(tableName[i]);
-
- int off = 0;
- Vector<NdbDictionary::RecordSpecification> spec;
- for (Uint32 j = 0; j<unsigned(pTab->getNoOfColumns()); j++)
- {
- NdbDictionary::RecordSpecification r0;
- r0.column = pTab->getColumn(j);
- r0.offset = off;
- off += (r0.column->getSizeInBytes() + 3) & ~(Uint32)3;
- spec.push_back(r0);
- }
- g_record[i] =
+ NdbDictionary::RecordSpecification r0;
+ r0.column = pTab->getColumn(j);
+ r0.offset = off;
+ off += (r0.column->getSizeInBytes() + 3) & ~(Uint32)3;
+ spec.push_back(r0);
+ }
+ g_record[i] =
pDict->createRecord(pTab, spec.getBase(),
spec.size(),
sizeof(NdbDictionary::RecordSpecification));
- assert(g_record[i]);
- }
+ assert(g_record[i]);
}
}
-
return 0;
}
@@ -996,6 +1174,14 @@ bool error_handler(const NdbError & err)
return false ; // return false to abort
}
+static void
+setAggregateRun(void)
+{
+ tNoOfLoops = 1;
+ tExtraReadLoop = 0;
+ theTableCreateFlag = 1;
+}
+
static
int
readArguments(int argc, const char** argv){
@@ -1110,6 +1296,43 @@ readArguments(int argc, const char** arg
tExtraReadLoop = atoi(argv[i+1]);
} else if (strcmp(argv[i], "-con") == 0){
tConnections = atoi(argv[i+1]);
+ } else if (strcmp(argv[i], "-insert") == 0){
+ setAggregateRun();
+ tRunType = RunInsert;
+ argc++;
+ i--;
+ } else if (strcmp(argv[i], "-read") == 0){
+ setAggregateRun();
+ tRunType = RunRead;
+ argc++;
+ i--;
+ } else if (strcmp(argv[i], "-update") == 0){
+ setAggregateRun();
+ tRunType = RunUpdate;
+ argc++;
+ i--;
+ } else if (strcmp(argv[i], "-delete") == 0){
+ setAggregateRun();
+ tRunType = RunDelete;
+ argc++;
+ i--;
+ } else if (strcmp(argv[i], "-create_table") == 0){
+ tRunType = RunCreateTable;
+ argc++;
+ i--;
+ } else if (strcmp(argv[i], "-drop_table") == 0){
+ tRunType = RunDropTable;
+ argc++;
+ i--;
+ } else if (strcmp(argv[i], "-warmup_time") == 0){
+ tWarmupTime = atoi(argv[i+1]);
+ } else if (strcmp(argv[i], "-execution_time") == 0){
+ tExecutionTime = atoi(argv[i+1]);
+ } else if (strcmp(argv[i], "-cooldown_time") == 0){
+ tCooldownTime = atoi(argv[i+1]);
+ } else if (strcmp(argv[i], "-table") == 0){
+ tStdTableNum = atoi(argv[i+1]);
+ theStdTableNameFlag = 1;
} else {
return -1;
}
@@ -1131,7 +1354,6 @@ readArguments(int argc, const char** arg
static
void
input_error(){
-
ndbout_c("FLEXASYNCH");
ndbout_c(" Perform benchmark of insert, update and delete transactions");
ndbout_c(" ");
@@ -1156,7 +1378,18 @@ input_error(){
ndbout_c(" -force Force send when communicating");
ndbout_c(" -non_adaptive Send at a 10 millisecond interval");
ndbout_c(" -local Number of part, only use keys in one part out of 16");
- ndbout_c(" -ndbrecord");
+ ndbout_c(" -ndbrecord Use NDB Record");
+ ndbout_c(" -r Number of extra loops");
+ ndbout_c(" -insert Only run inserts on standard table");
+ ndbout_c(" -read Only run reads on standard table");
+ ndbout_c(" -update Only run updates on standard table");
+ ndbout_c(" -delete Only run deletes on standard table");
+ ndbout_c(" -create_table Only run Create Table of standard table");
+ ndbout_c(" -drop_table Only run Drop Table on standard table");
+ ndbout_c(" -warmup_time Warmup Time before measurement starts");
+ ndbout_c(" -execution_time Execution Time where measurement is done");
+ ndbout_c(" -cooldown_time Cooldown time after measurement completed");
+ ndbout_c(" -table Number of standard table, default 0");
}
template class Vector<NdbDictionary::RecordSpecification>;
No bundle (reason: useless for push emails).
| Thread |
|---|
| • bzr push into mysql-5.1-telco-7.0 branch (jonas.oreland:4648 to 4649) | Jonas Oreland | 11 Nov |