#At file:///home/tomas/mysql_src/cge-6.2-global-schema-lock/
2682 Tomas Ulin 2008-10-02 [merge]
merge
modified:
mysql-test/suite/ndb/t/disabled.def
sql/mysqld.cc
storage/ndb/src/kernel/blocks/trix/Trix.cpp
storage/ndb/src/kernel/blocks/trix/Trix.hpp
storage/ndb/test/ndbapi/Makefile.am
storage/ndb/test/ndbapi/bench/mainAsyncGenerator.cpp
storage/ndb/test/ndbapi/bench/ndb_async2.cpp
storage/ndb/test/ndbapi/bench/testData.h
storage/ndb/test/ndbapi/flexAsynch.cpp
storage/ndb/test/ndbapi/msa.cpp
=== modified file 'mysql-test/suite/ndb/t/disabled.def'
--- a/mysql-test/suite/ndb/t/disabled.def 2008-07-25 10:32:05 +0000
+++ b/mysql-test/suite/ndb/t/disabled.def 2008-10-01 10:04:10 +0000
@@ -12,6 +12,5 @@
partition_03ndb : BUG#16385 2006-03-24 mikael Partitions: crash when updating a range partitioned NDB table
ndb_partition_error2 : HF is not sure if the test can work as internded on all the platforms
-ndb_index_ordered : Bug#38370 The test ndb.ndb_index_ordered fails with the community features on
# the below testcase have been reworked to avoid the bug, test contains comment, keep bug open
=== modified file 'sql/mysqld.cc'
--- a/sql/mysqld.cc 2008-09-01 12:28:57 +0000
+++ b/sql/mysqld.cc 2008-10-01 10:04:10 +0000
@@ -7584,13 +7584,14 @@ static void mysql_init_variables(void)
have_community_features = SHOW_OPTION_YES;
#else
have_community_features = SHOW_OPTION_NO;
+#endif
global_system_variables.ndb_index_stat_enable=FALSE;
max_system_variables.ndb_index_stat_enable=TRUE;
global_system_variables.ndb_index_stat_cache_entries=32;
max_system_variables.ndb_index_stat_cache_entries=~0L;
global_system_variables.ndb_index_stat_update_freq=20;
max_system_variables.ndb_index_stat_update_freq=~0L;
-#endif
+
#ifdef HAVE_OPENSSL
have_ssl=SHOW_OPTION_YES;
#else
=== modified file 'storage/ndb/src/kernel/blocks/trix/Trix.cpp'
--- a/storage/ndb/src/kernel/blocks/trix/Trix.cpp 2008-09-18 14:29:25 +0000
+++ b/storage/ndb/src/kernel/blocks/trix/Trix.cpp 2008-09-26 12:55:06 +0000
@@ -435,6 +435,27 @@ Trix::execDUMP_STATE_ORD(Signal* signal)
// Ignore
}
}
+
+ if (signal->theData[0] == DumpStateOrd::SchemaResourceSnapshot)
+ {
+ RSS_AP_SNAPSHOT_SAVE(c_theSubscriptionRecPool);
+ return;
+ }
+
+ if (signal->theData[0] == DumpStateOrd::SchemaResourceCheckLeak)
+ {
+ RSS_AP_SNAPSHOT_CHECK(c_theSubscriptionRecPool);
+ return;
+ }
+
+ if (signal->theData[0] == 8004)
+ {
+ infoEvent("TRIX: c_theSubscriptionRecPool size: %u free: %u",
+ c_theSubscriptionRecPool.getSize(),
+ c_theSubscriptionRecPool.getNoOfFree());
+ return;
+ }
+
}
// Build index
=== modified file 'storage/ndb/src/kernel/blocks/trix/Trix.hpp'
--- a/storage/ndb/src/kernel/blocks/trix/Trix.hpp 2008-02-20 09:04:29 +0000
+++ b/storage/ndb/src/kernel/blocks/trix/Trix.hpp 2008-09-26 12:55:06 +0000
@@ -131,6 +131,7 @@ private:
* The pool of node records
*/
ArrayPool<SubscriptionRecord> c_theSubscriptionRecPool;
+ RSS_AP_SNAPSHOT(c_theSubscriptionRecPool);
/**
* The list of other subscriptions
=== modified file 'storage/ndb/test/ndbapi/Makefile.am'
--- a/storage/ndb/test/ndbapi/Makefile.am 2008-02-21 14:24:09 +0000
+++ b/storage/ndb/test/ndbapi/Makefile.am 2008-09-30 08:18:41 +0000
@@ -57,7 +57,8 @@ testIndexStat \
ndbapi_50compat0 \
ndbapi_50compat1 \
testNDBT \
-NdbRepStress
+NdbRepStress \
+msa
EXTRA_PROGRAMS = \
test_event \
@@ -115,6 +116,7 @@ testSRBank_SOURCES = testSRBank.cpp
test_event_merge_SOURCES = test_event_merge.cpp
test_event_multi_table_SOURCES = test_event_multi_table.cpp
testIndexStat_SOURCES = testIndexStat.cpp
+msa_SOURCES = msa.cpp
ndbapi_50compat0_CPPFLAGS = -DNDBAPI_50_COMPAT
ndbapi_50compat0_SOURCES = ndbapi_50compat0.cpp
=== modified file 'storage/ndb/test/ndbapi/bench/mainAsyncGenerator.cpp'
--- a/storage/ndb/test/ndbapi/bench/mainAsyncGenerator.cpp 2006-12-23 19:20:40 +0000
+++ b/storage/ndb/test/ndbapi/bench/mainAsyncGenerator.cpp 2008-09-30 08:18:41 +0000
@@ -25,6 +25,8 @@
#include "userInterface.h"
#include "dbGenerator.h"
+#include "ndb_schema.hpp"
+
static int numProcesses;
static int numSeconds;
@@ -33,6 +35,8 @@ static int parallellism;
static int millisSendPoll;
static int minEventSendPoll;
static int forceSendPoll;
+static bool useNdbRecord;
+static bool useCombUpd;
static ThreadData *data;
static Ndb_cluster_connection *g_cluster_connection= 0;
@@ -53,8 +57,8 @@ static void usage(const char *prog)
++progname;
ndbout_c(
- "Usage: %s [-proc <num>] [-warm <num>] [-time <num>] [ -p <num>] "
- "[-t <num> ] [ -e <num> ] [ -f <num>] \n"
+ "Usage: %s [-proc <num>] [-warm <num>] [-time <num>] [ -p <num>]"
+ "[-t <num> ] [ -e <num> ] [ -f <num>] [ -ndbrecord ]\n"
" -proc <num> Specifies that <num> is the number of\n"
" threads. The default is 1.\n"
" -time <num> Specifies that the test will run for <num> sec.\n"
@@ -68,7 +72,11 @@ static void usage(const char *prog)
"sendPoll\n"
" Default is 1\n"
" -f <num> force parameter to sendPoll\n"
- " Default is 0\n",
+ " Default is 0\n"
+ " -ndbrecord Use NdbRecord Api.\n"
+ " Default is to use old Api\n"
+ " -combupdread Use update pre-read operation where possible\n"
+ " Default is to use separate read+update ops\n",
progname);
}
@@ -85,7 +93,8 @@ parse_args(int argc, const char **argv)
millisSendPoll = 10000;
minEventSendPoll = 1;
forceSendPoll = 0;
-
+ useNdbRecord = false;
+ useCombUpd = false;
i = 1;
while (i < argc){
@@ -156,6 +165,15 @@ parse_args(int argc, const char **argv)
}
i += 2;
}
+ else if (strcmp("-ndbrecord",argv[i]) == 0) {
+ useNdbRecord= true;
+ i++;
+ }
+ else if (strcmp("-combupdread",argv[i]) == 0) {
+ /* Comb up some dread */
+ useCombUpd= true;
+ i++;
+ }
else {
return 1;
}
@@ -169,6 +187,10 @@ parse_args(int argc, const char **argv)
ndbout_c("exiting...");
return 1;
}
+ if (useNdbRecord && useCombUpd){
+ ndbout_c("NdbRecord does not currently support combined update "
+ "and read. Using separate read and update ops");
+ }
return 0;
}
@@ -232,6 +254,7 @@ print_stats(const char *title,
ndbout_c("Processor : %s", name);
ndbout_c("Number of Proc: %d",numProc);
ndbout_c("Parallellism : %d", parallellism);
+ ndbout_c("UseNdbRecord : %u", useNdbRecord);
ndbout_c("\n");
if( gen->totalTransactions == 0 ) {
@@ -316,19 +339,192 @@ NDB_COMMAND(DbAsyncGenerator, "DbAsyncGe
ndbout << "Cluster nodes not ready in 30 seconds." << endl;
return 0;
}
-
+
+ NdbRecordSharedData* ndbRecordSharedDataPtr= NULL;
+
g_cluster_connection= &con;
data = (ThreadData*)malloc((numProcesses*parallellism)*sizeof(ThreadData));
-
+
+ NdbInterpretedCode* prog1= 0;
+ NdbInterpretedCode* prog2= 0;
+ NdbInterpretedCode* prog3= 0;
+
+ if (useNdbRecord)
+ {
+ /* We'll create NdbRecord structures to match the TransactionData
+ * struct
+ */
+
+ ndbRecordSharedDataPtr= (NdbRecordSharedData*)
+ malloc(sizeof(NdbRecordSharedData));
+ Ndb* tempNdb= asyncDbConnect(1);
+ NdbDictionary::Dictionary* dict= tempNdb->getDictionary();
+
+ NdbDictionary::RecordSpecification cols[7];
+
+ const NdbDictionary::Table* tab= dict->getTable(SUBSCRIBER_TABLE);
+ cols[0].column= tab->getColumn((int) IND_SUBSCRIBER_NUMBER);
+ cols[0].offset= offsetof(TransactionData, number);
+ cols[0].nullbit_byte_offset= 0;
+ cols[0].nullbit_bit_in_byte= 0;
+ cols[1].column= tab->getColumn((int) IND_SUBSCRIBER_NAME);
+ cols[1].offset= offsetof(TransactionData, name);
+ cols[1].nullbit_byte_offset= 0;
+ cols[1].nullbit_bit_in_byte= 0;
+ cols[2].column= tab->getColumn((int) IND_SUBSCRIBER_GROUP);
+ cols[2].offset= offsetof(TransactionData, group_id);
+ cols[2].nullbit_byte_offset= 0;
+ cols[2].nullbit_bit_in_byte= 0;
+ cols[3].column= tab->getColumn((int) IND_SUBSCRIBER_LOCATION);
+ cols[3].offset= offsetof(TransactionData, location);
+ cols[3].nullbit_byte_offset= 0;
+ cols[3].nullbit_bit_in_byte= 0;
+ cols[4].column= tab->getColumn((int) IND_SUBSCRIBER_SESSIONS);
+ cols[4].offset= offsetof(TransactionData, sessions);
+ cols[4].nullbit_byte_offset= 0;
+ cols[4].nullbit_bit_in_byte= 0;
+ cols[5].column= tab->getColumn((int) IND_SUBSCRIBER_CHANGED_BY);
+ cols[5].offset= offsetof(TransactionData, changed_by);
+ cols[5].nullbit_byte_offset= 0;
+ cols[5].nullbit_bit_in_byte= 0;
+ cols[6].column= tab->getColumn((int) IND_SUBSCRIBER_CHANGED_TIME);
+ cols[6].offset= offsetof(TransactionData, changed_time);
+ cols[6].nullbit_byte_offset= 0;
+ cols[6].nullbit_bit_in_byte= 0;
+
+ ndbRecordSharedDataPtr->subscriberTableNdbRecord=
+ dict->createRecord(tab, cols, 7, sizeof(cols[0]), 0);
+
+ if (ndbRecordSharedDataPtr->subscriberTableNdbRecord == NULL)
+ {
+ ndbout << "Error creating record 1 : " << dict->getNdbError() << endl;
+ return -1;
+ }
+
+ tab= dict->getTable(GROUP_TABLE);
+ cols[0].column= tab->getColumn((int) IND_GROUP_ID);
+ cols[0].offset= offsetof(TransactionData, group_id);
+ cols[0].nullbit_byte_offset= 0;
+ cols[0].nullbit_bit_in_byte= 0;
+ /* GROUP_NAME not used via NdbRecord */
+ cols[1].column= tab->getColumn((int) IND_GROUP_ALLOW_READ);
+ cols[1].offset= offsetof(TransactionData, permission);
+ cols[1].nullbit_byte_offset= 0;
+ cols[1].nullbit_bit_in_byte= 0;
+ cols[2].column= tab->getColumn((int) IND_GROUP_ALLOW_INSERT);
+ cols[2].offset= offsetof(TransactionData, permission);
+ cols[2].nullbit_byte_offset= 0;
+ cols[2].nullbit_bit_in_byte= 0;
+ cols[3].column= tab->getColumn((int) IND_GROUP_ALLOW_DELETE);
+ cols[3].offset= offsetof(TransactionData, permission);
+ cols[3].nullbit_byte_offset= 0;
+ cols[3].nullbit_bit_in_byte= 0;
+
+ ndbRecordSharedDataPtr->groupTableNdbRecord=
+ dict->createRecord(tab, cols, 4, sizeof(cols[0]), 0);
+
+ if (ndbRecordSharedDataPtr->groupTableNdbRecord == NULL)
+ {
+ ndbout << "Error creating record 2: " << dict->getNdbError() << endl;
+ return -1;
+ }
+
+ tab= dict->getTable(SESSION_TABLE);
+ cols[0].column= tab->getColumn((int) IND_SESSION_SUBSCRIBER);
+ cols[0].offset= offsetof(TransactionData, number);
+ cols[0].nullbit_byte_offset= 0;
+ cols[0].nullbit_bit_in_byte= 0;
+ cols[1].column= tab->getColumn((int) IND_SESSION_SERVER);
+ cols[1].offset= offsetof(TransactionData, server_id);
+ cols[1].nullbit_byte_offset= 0;
+ cols[1].nullbit_bit_in_byte= 0;
+ cols[2].column= tab->getColumn((int) IND_SESSION_DATA);
+ cols[2].offset= offsetof(TransactionData, session_details);
+ cols[2].nullbit_byte_offset= 0;
+ cols[2].nullbit_bit_in_byte= 0;
+
+ ndbRecordSharedDataPtr->sessionTableNdbRecord=
+ dict->createRecord(tab, cols, 3, sizeof(cols[0]), 0);
+
+ if (ndbRecordSharedDataPtr->sessionTableNdbRecord == NULL)
+ {
+ ndbout << "Error creating record 3 : " << dict->getNdbError() << endl;
+ return -1;
+ }
+
+ tab= dict->getTable(SERVER_TABLE);
+ cols[0].column= tab->getColumn((int) IND_SERVER_SUBSCRIBER_SUFFIX);
+ cols[0].offset= offsetof(TransactionData, suffix);
+ cols[0].nullbit_byte_offset= 0;
+ cols[0].nullbit_bit_in_byte= 0;
+ cols[1].column= tab->getColumn((int) IND_SERVER_ID);
+ cols[1].offset= offsetof(TransactionData, server_id);
+ cols[1].nullbit_byte_offset= 0;
+ cols[1].nullbit_bit_in_byte= 0;
+ /* SERVER_NAME not used via NdbRecord*/
+ /* SERVER_READS not used via NdbRecord */
+ /* SERVER_INSERTS not used via NdbRecord */
+ /* SERVER_DELETES not used via NdbRecord */
+
+ ndbRecordSharedDataPtr->serverTableNdbRecord=
+ dict->createRecord(tab, cols, 2, sizeof(cols[0]), 0);
+
+ if (ndbRecordSharedDataPtr->serverTableNdbRecord == NULL)
+ {
+ ndbout << "Error creating record 4 : " << dict->getNdbError() << endl;
+ return -1;
+ }
+
+ /* Create program to increment server reads column */
+ prog1= new NdbInterpretedCode(tab);
+
+ if (prog1->add_val(IND_SERVER_READS, (Uint32)1) ||
+ prog1->interpret_exit_ok() ||
+ prog1->finalise())
+ {
+ ndbout << "Program 1 definition failed, exiting." << endl;
+ return -1;
+ }
+
+ prog2= new NdbInterpretedCode(tab);
+
+ if (prog2->add_val(IND_SERVER_INSERTS, (Uint32)1) ||
+ prog2->interpret_exit_ok() ||
+ prog2->finalise())
+ {
+ ndbout << "Program 2 definition failed, exiting." << endl;
+ return -1;
+ }
+
+ prog3= new NdbInterpretedCode(tab);
+
+ if (prog3->add_val(IND_SERVER_DELETES, (Uint32)1) ||
+ prog3->interpret_exit_ok() ||
+ prog3->finalise())
+ {
+ ndbout << "Program 3 definition failed, exiting." << endl;
+ return -1;
+ }
+
+ ndbRecordSharedDataPtr->incrServerReadsProg= prog1;
+ ndbRecordSharedDataPtr->incrServerInsertsProg= prog2;
+ ndbRecordSharedDataPtr->incrServerDeletesProg= prog3;
+
+ asyncDbDisconnect(tempNdb);
+ }
+
for(i = 0; i < numProcesses; i++) {
for(j = 0; j<parallellism; j++){
- data[i*parallellism+j].warmUpSeconds = numWarmSeconds;
- data[i*parallellism+j].testSeconds = numSeconds;
- data[i*parallellism+j].coolDownSeconds = numWarmSeconds;
- data[i*parallellism+j].randomSeed =
+ int tid= i*parallellism + j;
+ data[tid].warmUpSeconds = numWarmSeconds;
+ data[tid].testSeconds = numSeconds;
+ data[tid].coolDownSeconds = numWarmSeconds;
+ data[tid].randomSeed =
NdbTick_CurrentMillisecond()+i+j;
- data[i*parallellism+j].changedTime = 0;
- data[i*parallellism+j].runState = Runnable;
+ data[tid].changedTime = 0;
+ data[tid].runState = Runnable;
+ data[tid].ndbRecordSharedData = ndbRecordSharedDataPtr;
+ data[tid].useCombinedUpdate = useCombUpd;
}
sprintf(threadName, "AsyncThread[%d]", i);
pThread = NdbThread_Create(threadRoutine,
@@ -355,6 +551,14 @@ NDB_COMMAND(DbAsyncGenerator, "DbAsyncGe
}
ndbout_c("All threads have finished");
+
+ if (useNdbRecord)
+ {
+ free(ndbRecordSharedDataPtr);
+ delete(prog1);
+ delete(prog2);
+ delete(prog3);
+ }
/*-------------------------------------------*/
/* Clear all structures for total statistics */
@@ -412,7 +616,6 @@ NDB_COMMAND(DbAsyncGenerator, "DbAsyncGe
#include <sys/types.h>
#include <time.h>
-#include "ndb_schema.hpp"
#include "ndb_error.hpp"
#include "userInterface.h"
#include <NdbMutex.h>
=== modified file 'storage/ndb/test/ndbapi/bench/ndb_async2.cpp'
--- a/storage/ndb/test/ndbapi/bench/ndb_async2.cpp 2006-12-23 19:20:40 +0000
+++ b/storage/ndb/test/ndbapi/bench/ndb_async2.cpp 2008-09-30 08:18:41 +0000
@@ -66,6 +66,10 @@ startTransaction(Ndb * pNDB, ThreadData
#endif
}
+// NdbRecord helper macros
+#define SET_MASK(mask, attrId) \
+ mask[attrId >> 3] |= (1 << (attrId & 7))
+
void
start_T1(Ndb * pNDB, ThreadData * td, int async){
@@ -78,17 +82,46 @@ start_T1(Ndb * pNDB, ThreadData * td, in
NdbSleep_MilliSleep(10);
}
- NdbOperation *MyOp = pCON->getNdbOperation(SUBSCRIBER_TABLE);
- if (MyOp != NULL) {
- MyOp->updateTuple();
- MyOp->equal(IND_SUBSCRIBER_NUMBER,
- td->transactionData.number);
- MyOp->setValue(IND_SUBSCRIBER_LOCATION,
- (char *)&td->transactionData.location);
- MyOp->setValue(IND_SUBSCRIBER_CHANGED_BY,
- td->transactionData.changed_by);
- MyOp->setValue(IND_SUBSCRIBER_CHANGED_TIME,
- td->transactionData.changed_time);
+ const NdbOperation* op= NULL;
+
+ if (td->ndbRecordSharedData)
+ {
+ char* rowPtr= (char*) &td->transactionData;
+ const NdbRecord* record= td->ndbRecordSharedData->
+ subscriberTableNdbRecord;
+ Uint32 m=0;
+ unsigned char* mask= (unsigned char*) &m;
+
+ //SET_MASK(mask, IND_SUBSCRIBER_NUMBER);
+ SET_MASK(mask, IND_SUBSCRIBER_LOCATION);
+ SET_MASK(mask, IND_SUBSCRIBER_CHANGED_BY);
+ SET_MASK(mask, IND_SUBSCRIBER_CHANGED_TIME);
+
+ op= pCON->updateTuple(record,
+ rowPtr,
+ record,
+ rowPtr,
+ mask);
+ }
+ else
+ {
+ NdbOperation *MyOp = pCON->getNdbOperation(SUBSCRIBER_TABLE);
+ op= MyOp;
+ if (MyOp != NULL) {
+ MyOp->updateTuple();
+ MyOp->equal(IND_SUBSCRIBER_NUMBER,
+ td->transactionData.number);
+ MyOp->setValue(IND_SUBSCRIBER_LOCATION,
+ (char *)&td->transactionData.location);
+ MyOp->setValue(IND_SUBSCRIBER_CHANGED_BY,
+ td->transactionData.changed_by);
+ MyOp->setValue(IND_SUBSCRIBER_CHANGED_TIME,
+ td->transactionData.changed_time);
+ }
+ }
+
+ if (op != NULL)
+ {
if (async == 1) {
pCON->executeAsynchPrepare( Commit , T1_Callback, td);
} else {
@@ -97,7 +130,7 @@ start_T1(Ndb * pNDB, ThreadData * td, in
return;
}//if
} else {
- CHECK_NULL(MyOp, "T1: getNdbOperation", td, pCON->getNdbError());
+ CHECK_NULL(NULL, "T1: getNdbOperation", td, pCON->getNdbError());
}//if
}
@@ -146,21 +179,43 @@ start_T2(Ndb * pNDB, ThreadData * td, in
NdbSleep_MilliSleep(10);
}
- NdbOperation *MyOp= pCON->getNdbOperation(SUBSCRIBER_TABLE);
- CHECK_NULL(MyOp, "T2: getNdbOperation", td,
- pCON->getNdbError());
-
- MyOp->readTuple();
- MyOp->equal(IND_SUBSCRIBER_NUMBER,
- td->transactionData.number);
- MyOp->getValue(IND_SUBSCRIBER_LOCATION,
- (char *)&td->transactionData.location);
- MyOp->getValue(IND_SUBSCRIBER_CHANGED_BY,
- td->transactionData.changed_by);
- MyOp->getValue(IND_SUBSCRIBER_CHANGED_TIME,
- td->transactionData.changed_time);
- MyOp->getValue(IND_SUBSCRIBER_NAME,
- td->transactionData.name);
+ if (td->ndbRecordSharedData)
+ {
+ char* rowPtr= (char*) &td->transactionData;
+ const NdbRecord* record= td->ndbRecordSharedData->
+ subscriberTableNdbRecord;
+ Uint32 m=0;
+ unsigned char* mask= (unsigned char*) &m;
+
+ SET_MASK(mask, IND_SUBSCRIBER_LOCATION);
+ SET_MASK(mask, IND_SUBSCRIBER_CHANGED_BY);
+ SET_MASK(mask, IND_SUBSCRIBER_CHANGED_TIME);
+ SET_MASK(mask, IND_SUBSCRIBER_NAME);
+
+ const NdbOperation* MyOp= pCON->readTuple(record, rowPtr, record, rowPtr,
+ NdbOperation::LM_Read, mask);
+ CHECK_NULL((void*) MyOp, "T2: readTuple", td,
+ pCON->getNdbError());
+ }
+ else
+ {
+ NdbOperation *MyOp= pCON->getNdbOperation(SUBSCRIBER_TABLE);
+ CHECK_NULL(MyOp, "T2: getNdbOperation", td,
+ pCON->getNdbError());
+
+ MyOp->readTuple();
+ MyOp->equal(IND_SUBSCRIBER_NUMBER,
+ td->transactionData.number);
+ MyOp->getValue(IND_SUBSCRIBER_LOCATION,
+ (char *)&td->transactionData.location);
+ MyOp->getValue(IND_SUBSCRIBER_CHANGED_BY,
+ td->transactionData.changed_by);
+ MyOp->getValue(IND_SUBSCRIBER_CHANGED_TIME,
+ td->transactionData.changed_time);
+ MyOp->getValue(IND_SUBSCRIBER_NAME,
+ td->transactionData.name);
+ }
+
if (async == 1) {
pCON->executeAsynchPrepare( Commit , T2_Callback, td);
} else {
@@ -183,6 +238,7 @@ T2_Callback(int result, NdbConnection *
start_T2(td->pNDB, td, stat_async);
return;
}//if
+
td->pNDB->closeTransaction(pCON);
complete_T2(td);
}
@@ -217,24 +273,48 @@ start_T3(Ndb * pNDB, ThreadData * td, in
CHECK_ALLOWED_ERROR("T3-1: startTransaction", td, pNDB->getNdbError());
NdbSleep_MilliSleep(10);
}
-
- NdbOperation *MyOp= pCON->getNdbOperation(SUBSCRIBER_TABLE);
- CHECK_NULL(MyOp, "T3-1: getNdbOperation", td,
- pCON->getNdbError());
-
- MyOp->readTuple();
- MyOp->equal(IND_SUBSCRIBER_NUMBER,
- td->transactionData.number);
- MyOp->getValue(IND_SUBSCRIBER_LOCATION,
- (char *)&td->transactionData.location);
- MyOp->getValue(IND_SUBSCRIBER_CHANGED_BY,
- td->transactionData.changed_by);
- MyOp->getValue(IND_SUBSCRIBER_CHANGED_TIME,
- td->transactionData.changed_time);
- MyOp->getValue(IND_SUBSCRIBER_GROUP,
- (char *)&td->transactionData.group_id);
- MyOp->getValue(IND_SUBSCRIBER_SESSIONS,
- (char *)&td->transactionData.sessions);
+
+ const NdbOperation* op;
+
+ if (td->ndbRecordSharedData)
+ {
+ char* rowPtr= (char*) &td->transactionData;
+ const NdbRecord* record= td->ndbRecordSharedData->
+ subscriberTableNdbRecord;
+ Uint32 m=0;
+ unsigned char* mask= (unsigned char*) &m;
+
+ SET_MASK(mask, IND_SUBSCRIBER_LOCATION);
+ SET_MASK(mask, IND_SUBSCRIBER_CHANGED_BY);
+ SET_MASK(mask, IND_SUBSCRIBER_CHANGED_TIME);
+ SET_MASK(mask, IND_SUBSCRIBER_GROUP);
+ SET_MASK(mask, IND_SUBSCRIBER_SESSIONS);
+
+ op= pCON->readTuple(record, rowPtr, record, rowPtr,
+ NdbOperation::LM_Read, mask);
+ }
+ else
+ {
+ NdbOperation *MyOp= pCON->getNdbOperation(SUBSCRIBER_TABLE);
+ op= MyOp;
+ CHECK_NULL(MyOp, "T3-1: getNdbOperation", td,
+ pCON->getNdbError());
+
+ MyOp->readTuple();
+ MyOp->equal(IND_SUBSCRIBER_NUMBER,
+ td->transactionData.number);
+ MyOp->getValue(IND_SUBSCRIBER_LOCATION,
+ (char *)&td->transactionData.location);
+ MyOp->getValue(IND_SUBSCRIBER_CHANGED_BY,
+ td->transactionData.changed_by);
+ MyOp->getValue(IND_SUBSCRIBER_CHANGED_TIME,
+ td->transactionData.changed_time);
+ MyOp->getValue(IND_SUBSCRIBER_GROUP,
+ (char *)&td->transactionData.group_id);
+ MyOp->getValue(IND_SUBSCRIBER_SESSIONS,
+ (char *)&td->transactionData.sessions);
+ }
+
stat_async = async;
if (async == 1) {
pCON->executeAsynchPrepare( NoCommit , T3_Callback_1, td);
@@ -259,15 +339,35 @@ T3_Callback_1(int result, NdbConnection
return;
}//if
- NdbOperation * MyOp = pCON->getNdbOperation(GROUP_TABLE);
- CHECK_NULL(MyOp, "T3-2: getNdbOperation", td,
- pCON->getNdbError());
-
- MyOp->readTuple();
- MyOp->equal(IND_GROUP_ID,
- (char*)&td->transactionData.group_id);
- MyOp->getValue(IND_GROUP_ALLOW_READ,
- (char *)&td->transactionData.permission);
+ const NdbOperation* op= NULL;
+
+ if (td->ndbRecordSharedData)
+ {
+ char* rowPtr= (char*) &td->transactionData;
+ const NdbRecord* record= td->ndbRecordSharedData->
+ groupTableNdbRecord;
+ Uint32 m=0;
+ unsigned char* mask= (unsigned char*) &m;
+
+ SET_MASK(mask, IND_GROUP_ALLOW_READ);
+
+ op= pCON->readTuple(record, rowPtr, record, rowPtr,
+ NdbOperation::LM_Read, mask);
+ }
+ else
+ {
+ NdbOperation * MyOp = pCON->getNdbOperation(GROUP_TABLE);
+ op= MyOp;
+ CHECK_NULL(MyOp, "T3-2: getNdbOperation", td,
+ pCON->getNdbError());
+
+ MyOp->readTuple();
+ MyOp->equal(IND_GROUP_ID,
+ (char*)&td->transactionData.group_id);
+ MyOp->getValue(IND_GROUP_ALLOW_READ,
+ (char *)&td->transactionData.permission);
+ }
+
if (stat_async == 1) {
pCON->executeAsynchPrepare( NoCommit , T3_Callback_2, td);
} else {
@@ -305,30 +405,66 @@ T3_Callback_2(int result, NdbConnection
SUBSCRIBER_NUMBER_SUFFIX_LENGTH,
td->transactionData.suffix);
- /* Operation 3 */
- NdbOperation * MyOp = pCON->getNdbOperation(SESSION_TABLE);
- CHECK_NULL(MyOp, "T3-3: getNdbOperation", td,
- pCON->getNdbError());
-
- MyOp->simpleRead();
- MyOp->equal(IND_SESSION_SUBSCRIBER,
- (char*)td->transactionData.number);
- MyOp->equal(IND_SESSION_SERVER,
- (char*)&td->transactionData.server_id);
- MyOp->getValue(IND_SESSION_DATA,
- (char *)td->transactionData.session_details);
-
- /* Operation 4 */
- MyOp = pCON->getNdbOperation(SERVER_TABLE);
- CHECK_NULL(MyOp, "T3-4: getNdbOperation", td,
- pCON->getNdbError());
-
- MyOp->interpretedUpdateTuple();
- MyOp->equal(IND_SERVER_ID,
- (char*)&td->transactionData.server_id);
- MyOp->equal(IND_SERVER_SUBSCRIBER_SUFFIX,
- (char*)td->transactionData.suffix);
- MyOp->incValue(IND_SERVER_READS, (uint32)1);
+ /* Operations 3 + 4 */
+ if (td->ndbRecordSharedData)
+ {
+ /* Op 3 */
+ char* rowPtr= (char*) &td->transactionData;
+ const NdbRecord* record= td->ndbRecordSharedData->
+ sessionTableNdbRecord;
+ Uint32 m=0;
+ unsigned char* mask= (unsigned char*) &m;
+
+ SET_MASK(mask, IND_SESSION_DATA);
+
+ const NdbOperation* MyOp = pCON->readTuple(record, rowPtr, record, rowPtr,
+ NdbOperation::LM_SimpleRead,
+ mask);
+ CHECK_NULL((void*) MyOp, "T3-3: readTuple", td,
+ pCON->getNdbError());
+
+ /* Op 4 */
+ record= td->ndbRecordSharedData->
+ serverTableNdbRecord;
+ m= 0;
+
+ /* Attach interpreted program */
+ NdbOperation::OperationOptions opts;
+ opts.optionsPresent= NdbOperation::OperationOptions::OO_INTERPRETED;
+ opts.interpretedCode= td->ndbRecordSharedData->incrServerReadsProg;
+
+ MyOp= pCON->updateTuple(record, rowPtr, record, rowPtr, mask,
+ &opts,
+ sizeof(opts));
+ CHECK_NULL((void*) MyOp, "T3-3: updateTuple", td,
+ pCON->getNdbError());
+ }
+ else
+ {
+ NdbOperation * MyOp = pCON->getNdbOperation(SESSION_TABLE);
+ CHECK_NULL(MyOp, "T3-3: getNdbOperation", td,
+ pCON->getNdbError());
+
+ MyOp->simpleRead();
+ MyOp->equal(IND_SESSION_SUBSCRIBER,
+ (char*)td->transactionData.number);
+ MyOp->equal(IND_SESSION_SERVER,
+ (char*)&td->transactionData.server_id);
+ MyOp->getValue(IND_SESSION_DATA,
+ (char *)td->transactionData.session_details);
+
+ MyOp = pCON->getNdbOperation(SERVER_TABLE);
+ CHECK_NULL(MyOp, "T3-4: getNdbOperation", td,
+ pCON->getNdbError());
+
+ MyOp->interpretedUpdateTuple();
+ MyOp->equal(IND_SERVER_ID,
+ (char*)&td->transactionData.server_id);
+ MyOp->equal(IND_SERVER_SUBSCRIBER_SUFFIX,
+ (char*)td->transactionData.suffix);
+ MyOp->incValue(IND_SERVER_READS, (uint32)1);
+ }
+
td->transactionData.branchExecuted = 1;
} else {
DEBUG3("T3(%.*s, %.2d): - Callback 2 - no read",
@@ -359,6 +495,7 @@ T3_Callback_3(int result, NdbConnection
start_T3(td->pNDB, td, stat_async);
return;
}//if
+
td->pNDB->closeTransaction(pCON);
complete_T3(td);
}
@@ -392,26 +529,120 @@ start_T4(Ndb * pNDB, ThreadData * td, in
CHECK_ALLOWED_ERROR("T4-1: startTransaction", td, pNDB->getNdbError());
NdbSleep_MilliSleep(10);
}
-
- NdbOperation *MyOp= pCON->getNdbOperation(SUBSCRIBER_TABLE);
- CHECK_NULL(MyOp, "T4-1: getNdbOperation", td,
- pCON->getNdbError());
-
- MyOp->interpretedUpdateTuple();
- MyOp->equal(IND_SUBSCRIBER_NUMBER,
- td->transactionData.number);
- MyOp->getValue(IND_SUBSCRIBER_LOCATION,
- (char *)&td->transactionData.location);
- MyOp->getValue(IND_SUBSCRIBER_CHANGED_BY,
- td->transactionData.changed_by);
- MyOp->getValue(IND_SUBSCRIBER_CHANGED_TIME,
- td->transactionData.changed_time);
- MyOp->getValue(IND_SUBSCRIBER_GROUP,
- (char *)&td->transactionData.group_id);
- MyOp->getValue(IND_SUBSCRIBER_SESSIONS,
- (char *)&td->transactionData.sessions);
- MyOp->incValue(IND_SUBSCRIBER_SESSIONS,
- (uint32)td->transactionData.server_bit);
+
+ if (td->ndbRecordSharedData)
+ {
+ char* rowPtr= (char*) &td->transactionData;
+ const NdbRecord* record= td->ndbRecordSharedData->
+ subscriberTableNdbRecord;
+ Uint32 m=0;
+ unsigned char* mask= (unsigned char*) &m;
+
+ SET_MASK(mask, IND_SUBSCRIBER_LOCATION);
+ SET_MASK(mask, IND_SUBSCRIBER_CHANGED_BY);
+ SET_MASK(mask, IND_SUBSCRIBER_CHANGED_TIME);
+ SET_MASK(mask, IND_SUBSCRIBER_GROUP);
+ SET_MASK(mask, IND_SUBSCRIBER_SESSIONS);
+
+ const NdbOperation* MyOp= pCON->readTuple(record, rowPtr, record, rowPtr,
+ NdbOperation::LM_Read,
+ mask);
+ CHECK_NULL((void*)MyOp, "T4-1: readTuple", td,
+ pCON->getNdbError());
+
+ m= 0;
+
+ /* Create program to add something to the subscriber
+ * sessions column
+ */
+ Uint32 codeBuf[20];
+
+ for (Uint32 p=0; p<20; p++)
+ codeBuf[p]= 0;
+
+ NdbInterpretedCode program(pNDB->getDictionary()->
+ getTable(SUBSCRIBER_TABLE),
+ codeBuf,
+ 20);
+
+ if (program.add_val(IND_SUBSCRIBER_SESSIONS,
+ (uint32)td->transactionData.server_bit) ||
+ program.interpret_exit_ok() ||
+ program.finalise())
+ {
+ CHECK_NULL(NULL , "T4-1: Program create failed", td,
+ program.getNdbError());
+ }
+
+ NdbOperation::OperationOptions opts;
+ opts.optionsPresent= NdbOperation::OperationOptions::OO_INTERPRETED;
+ opts.interpretedCode= &program;
+
+ MyOp= pCON->updateTuple(record, rowPtr, record, rowPtr,
+ mask,
+ &opts,
+ sizeof(opts));
+ CHECK_NULL((void*)MyOp, "T4-1: updateTuple", td,
+ pCON->getNdbError());
+
+ }
+ else
+ {
+ /* Use old Api */
+ if (td->useCombinedUpdate)
+ {
+ NdbOperation *MyOp= pCON->getNdbOperation(SUBSCRIBER_TABLE);
+ CHECK_NULL(MyOp, "T4-1: getNdbOperation", td,
+ pCON->getNdbError());
+
+ MyOp->interpretedUpdateTuple();
+ MyOp->equal(IND_SUBSCRIBER_NUMBER,
+ td->transactionData.number);
+ MyOp->getValue(IND_SUBSCRIBER_LOCATION,
+ (char *)&td->transactionData.location);
+ MyOp->getValue(IND_SUBSCRIBER_CHANGED_BY,
+ td->transactionData.changed_by);
+ MyOp->getValue(IND_SUBSCRIBER_CHANGED_TIME,
+ td->transactionData.changed_time);
+ MyOp->getValue(IND_SUBSCRIBER_GROUP,
+ (char *)&td->transactionData.group_id);
+ MyOp->getValue(IND_SUBSCRIBER_SESSIONS,
+ (char *)&td->transactionData.sessions);
+ MyOp->incValue(IND_SUBSCRIBER_SESSIONS,
+ (uint32)td->transactionData.server_bit);
+ }
+ else
+ {
+ /* Separate read op + update op
+ * Relies on relative ordering of operation execution on a single
+ * row
+ */
+ NdbOperation *MyOp= pCON->getNdbOperation(SUBSCRIBER_TABLE);
+ CHECK_NULL(MyOp, "T4-1: getNdbOperation (read)", td,
+ pCON->getNdbError());
+ MyOp->readTuple();
+ MyOp->equal(IND_SUBSCRIBER_NUMBER,
+ td->transactionData.number);
+ MyOp->getValue(IND_SUBSCRIBER_LOCATION,
+ (char *)&td->transactionData.location);
+ MyOp->getValue(IND_SUBSCRIBER_CHANGED_BY,
+ td->transactionData.changed_by);
+ MyOp->getValue(IND_SUBSCRIBER_CHANGED_TIME,
+ td->transactionData.changed_time);
+ MyOp->getValue(IND_SUBSCRIBER_GROUP,
+ (char *)&td->transactionData.group_id);
+ MyOp->getValue(IND_SUBSCRIBER_SESSIONS,
+ (char *)&td->transactionData.sessions);
+ MyOp= pCON->getNdbOperation(SUBSCRIBER_TABLE);
+ CHECK_NULL(MyOp, "T4-1: getNdbOperation (update)", td,
+ pCON->getNdbError());
+ MyOp->interpretedUpdateTuple();
+ MyOp->equal(IND_SUBSCRIBER_NUMBER,
+ td->transactionData.number);
+ MyOp->incValue(IND_SUBSCRIBER_SESSIONS,
+ (uint32)td->transactionData.server_bit);
+ }
+ }
stat_async = async;
if (async == 1) {
pCON->executeAsynchPrepare( NoCommit , T4_Callback_1, td);
@@ -438,15 +669,35 @@ T4_Callback_1(int result, NdbConnection
td->transactionData.server_id);
- NdbOperation * MyOp = pCON->getNdbOperation(GROUP_TABLE);
- CHECK_NULL(MyOp, "T4-2: getNdbOperation", td,
- pCON->getNdbError());
-
- MyOp->readTuple();
- MyOp->equal(IND_GROUP_ID,
- (char*)&td->transactionData.group_id);
- MyOp->getValue(IND_GROUP_ALLOW_INSERT,
- (char *)&td->transactionData.permission);
+ if (td->ndbRecordSharedData)
+ {
+ char* rowPtr= (char*) &td->transactionData;
+ const NdbRecord* record= td->ndbRecordSharedData->
+ groupTableNdbRecord;
+ Uint32 m=0;
+ unsigned char* mask= (unsigned char*) &m;
+
+ SET_MASK(mask, IND_GROUP_ALLOW_INSERT);
+
+ const NdbOperation* MyOp= pCON->readTuple(record, rowPtr, record, rowPtr,
+ NdbOperation::LM_Read,
+ mask);
+
+ CHECK_NULL((void*)MyOp, "T4-2: readTuple", td,
+ pCON->getNdbError());
+ }
+ else
+ {
+ NdbOperation * MyOp = pCON->getNdbOperation(GROUP_TABLE);
+ CHECK_NULL(MyOp, "T4-2: getNdbOperation", td,
+ pCON->getNdbError());
+
+ MyOp->readTuple();
+ MyOp->equal(IND_GROUP_ID,
+ (char*)&td->transactionData.group_id);
+ MyOp->getValue(IND_GROUP_ALLOW_INSERT,
+ (char *)&td->transactionData.permission);
+ }
if (stat_async == 1) {
pCON->executeAsynchPrepare( NoCommit , T4_Callback_2, td);
} else {
@@ -484,32 +735,66 @@ T4_Callback_2(int result, NdbConnection
SUBSCRIBER_NUMBER_SUFFIX_LENGTH,
td->transactionData.suffix);
- /* Operation 3 */
-
- NdbOperation * MyOp = pCON->getNdbOperation(SESSION_TABLE);
- CHECK_NULL(MyOp, "T4-3: getNdbOperation", td,
- pCON->getNdbError());
-
- MyOp->insertTuple();
- MyOp->equal(IND_SESSION_SUBSCRIBER,
- (char*)td->transactionData.number);
- MyOp->equal(IND_SESSION_SERVER,
- (char*)&td->transactionData.server_id);
- MyOp->setValue(SESSION_DATA,
- (char *)td->transactionData.session_details);
- /* Operation 4 */
-
- /* Operation 5 */
- MyOp = pCON->getNdbOperation(SERVER_TABLE);
- CHECK_NULL(MyOp, "T4-5: getNdbOperation", td,
- pCON->getNdbError());
-
- MyOp->interpretedUpdateTuple();
- MyOp->equal(IND_SERVER_ID,
- (char*)&td->transactionData.server_id);
- MyOp->equal(IND_SERVER_SUBSCRIBER_SUFFIX,
- (char*)td->transactionData.suffix);
- MyOp->incValue(IND_SERVER_INSERTS, (uint32)1);
+ /* Operations 3 + 4 */
+
+ if (td->ndbRecordSharedData)
+ {
+ char* rowPtr= (char*) &td->transactionData;
+ const NdbRecord* record= td->ndbRecordSharedData->
+ sessionTableNdbRecord;
+ Uint32 m=0;
+ unsigned char* mask= (unsigned char*) &m;
+
+ SET_MASK(mask, IND_SESSION_SUBSCRIBER);
+ SET_MASK(mask, IND_SESSION_SERVER);
+ SET_MASK(mask, IND_SESSION_DATA);
+
+ const NdbOperation* MyOp= pCON->insertTuple(record, rowPtr, mask);
+
+ CHECK_NULL((void*)MyOp, "T4-3: insertTuple", td,
+ pCON->getNdbError());
+
+ record= td->ndbRecordSharedData->
+ serverTableNdbRecord;
+ m= 0;
+
+ NdbOperation::OperationOptions opts;
+ opts.optionsPresent= NdbOperation::OperationOptions::OO_INTERPRETED;
+ opts.interpretedCode= td->ndbRecordSharedData->incrServerInsertsProg;
+
+ MyOp= pCON->updateTuple(record, rowPtr, record, rowPtr, mask,
+ &opts, sizeof(opts));
+
+ CHECK_NULL((void*)MyOp, "T4-3: updateTuple", td,
+ pCON->getNdbError());
+ }
+ else
+ {
+ NdbOperation * MyOp = pCON->getNdbOperation(SESSION_TABLE);
+ CHECK_NULL(MyOp, "T4-3: getNdbOperation", td,
+ pCON->getNdbError());
+
+ MyOp->insertTuple();
+ MyOp->equal(IND_SESSION_SUBSCRIBER,
+ (char*)td->transactionData.number);
+ MyOp->equal(IND_SESSION_SERVER,
+ (char*)&td->transactionData.server_id);
+ MyOp->setValue(IND_SESSION_DATA,
+ (char *)td->transactionData.session_details);
+ /* Operation 4 */
+
+ /* Operation 5 */
+ MyOp = pCON->getNdbOperation(SERVER_TABLE);
+ CHECK_NULL(MyOp, "T4-5: getNdbOperation", td,
+ pCON->getNdbError());
+
+ MyOp->interpretedUpdateTuple();
+ MyOp->equal(IND_SERVER_ID,
+ (char*)&td->transactionData.server_id);
+ MyOp->equal(IND_SERVER_SUBSCRIBER_SUFFIX,
+ (char*)td->transactionData.suffix);
+ MyOp->incValue(IND_SERVER_INSERTS, (uint32)1);
+ }
td->transactionData.branchExecuted = 1;
} else {
td->transactionData.branchExecuted = 0;
@@ -551,7 +836,7 @@ T4_Callback_3(int result, NdbConnection
start_T4(td->pNDB, td, stat_async);
return;
}//if
-
+
DEBUG3("T4(%.*s, %.2d): - Completing",
SUBSCRIBER_NUMBER_LENGTH,
td->transactionData.number,
@@ -590,25 +875,113 @@ start_T5(Ndb * pNDB, ThreadData * td, in
NdbSleep_MilliSleep(10);
}
- NdbOperation * MyOp= pCON->getNdbOperation(SUBSCRIBER_TABLE);
- CHECK_NULL(MyOp, "T5-1: getNdbOperation", td,
- pCON->getNdbError());
-
- MyOp->interpretedUpdateTuple();
- MyOp->equal(IND_SUBSCRIBER_NUMBER,
- td->transactionData.number);
- MyOp->getValue(IND_SUBSCRIBER_LOCATION,
- (char *)&td->transactionData.location);
- MyOp->getValue(IND_SUBSCRIBER_CHANGED_BY,
- td->transactionData.changed_by);
- MyOp->getValue(IND_SUBSCRIBER_CHANGED_TIME,
- td->transactionData.changed_time);
- MyOp->getValue(IND_SUBSCRIBER_GROUP,
- (char *)&td->transactionData.group_id);
- MyOp->getValue(IND_SUBSCRIBER_SESSIONS,
- (char *)&td->transactionData.sessions);
- MyOp->subValue(IND_SUBSCRIBER_SESSIONS,
- (uint32)td->transactionData.server_bit);
+ if (td->ndbRecordSharedData)
+ {
+ char* rowPtr= (char*) &td->transactionData;
+ const NdbRecord* record= td->ndbRecordSharedData->
+ subscriberTableNdbRecord;
+ Uint32 m=0;
+ unsigned char* mask= (unsigned char*) &m;
+
+ SET_MASK(mask, IND_SUBSCRIBER_LOCATION);
+ SET_MASK(mask, IND_SUBSCRIBER_CHANGED_BY);
+ SET_MASK(mask, IND_SUBSCRIBER_CHANGED_TIME);
+ SET_MASK(mask, IND_SUBSCRIBER_GROUP);
+ SET_MASK(mask, IND_SUBSCRIBER_SESSIONS);
+
+ const NdbOperation* MyOp= pCON->readTuple(record, rowPtr, record, rowPtr,
+ NdbOperation::LM_Read,
+ mask);
+ CHECK_NULL((void*)MyOp, "T5-1: readTuple", td,
+ pCON->getNdbError());
+
+ m= 0;
+
+ /* Create program to subtract something from the
+ * subscriber sessions column
+ */
+ Uint32 codeBuf[20];
+ NdbInterpretedCode program(pNDB->getDictionary()->
+ getTable(SUBSCRIBER_TABLE),
+ codeBuf,
+ 20);
+ if (program.sub_val(IND_SUBSCRIBER_SESSIONS,
+ (uint32)td->transactionData.server_bit) ||
+ program.interpret_exit_ok() ||
+ program.finalise())
+ {
+ CHECK_NULL(NULL , "T5: Program create failed", td,
+ program.getNdbError());
+ }
+ NdbOperation::OperationOptions opts;
+ opts.optionsPresent= NdbOperation::OperationOptions::OO_INTERPRETED;
+ opts.interpretedCode= &program;
+
+ MyOp= pCON->updateTuple(record, rowPtr, record, rowPtr,
+ mask,
+ &opts,
+ sizeof(opts));
+ CHECK_NULL((void*)MyOp, "T5-1: updateTuple", td,
+ pCON->getNdbError());
+ }
+ else
+ {
+ /* Use old Api */
+ if (td->useCombinedUpdate)
+ {
+ NdbOperation * MyOp= pCON->getNdbOperation(SUBSCRIBER_TABLE);
+ CHECK_NULL(MyOp, "T5-1: getNdbOperation", td,
+ pCON->getNdbError());
+
+ MyOp->interpretedUpdateTuple();
+ MyOp->equal(IND_SUBSCRIBER_NUMBER,
+ td->transactionData.number);
+ MyOp->getValue(IND_SUBSCRIBER_LOCATION,
+ (char *)&td->transactionData.location);
+ MyOp->getValue(IND_SUBSCRIBER_CHANGED_BY,
+ td->transactionData.changed_by);
+ MyOp->getValue(IND_SUBSCRIBER_CHANGED_TIME,
+ td->transactionData.changed_time);
+ MyOp->getValue(IND_SUBSCRIBER_GROUP,
+ (char *)&td->transactionData.group_id);
+ MyOp->getValue(IND_SUBSCRIBER_SESSIONS,
+ (char *)&td->transactionData.sessions);
+ MyOp->subValue(IND_SUBSCRIBER_SESSIONS,
+ (uint32)td->transactionData.server_bit);
+ }
+ else
+ {
+ /* Use separate read and update operations
+ * This relies on execution ordering between operations on
+ * the same row
+ */
+ NdbOperation * MyOp= pCON->getNdbOperation(SUBSCRIBER_TABLE);
+ CHECK_NULL(MyOp, "T5-1: getNdbOperation (readTuple)", td,
+ pCON->getNdbError());
+ MyOp->readTuple();
+ MyOp->equal(IND_SUBSCRIBER_NUMBER,
+ td->transactionData.number);
+ MyOp->getValue(IND_SUBSCRIBER_LOCATION,
+ (char *)&td->transactionData.location);
+ MyOp->getValue(IND_SUBSCRIBER_CHANGED_BY,
+ td->transactionData.changed_by);
+ MyOp->getValue(IND_SUBSCRIBER_CHANGED_TIME,
+ td->transactionData.changed_time);
+ MyOp->getValue(IND_SUBSCRIBER_GROUP,
+ (char *)&td->transactionData.group_id);
+ MyOp->getValue(IND_SUBSCRIBER_SESSIONS,
+ (char *)&td->transactionData.sessions);
+
+ MyOp= pCON->getNdbOperation(SUBSCRIBER_TABLE);
+ CHECK_NULL(MyOp, "T5-1: getNdbOperation (updateTuple)", td,
+ pCON->getNdbError());
+ MyOp->interpretedUpdateTuple();
+ MyOp->equal(IND_SUBSCRIBER_NUMBER,
+ td->transactionData.number);
+ MyOp->subValue(IND_SUBSCRIBER_SESSIONS,
+ (uint32)td->transactionData.server_bit);
+ }
+ }
stat_async = async;
if (async == 1) {
pCON->executeAsynchPrepare( NoCommit , T5_Callback_1, td);
@@ -634,15 +1007,36 @@ T5_Callback_1(int result, NdbConnection
td->transactionData.number,
td->transactionData.server_id);
- NdbOperation * MyOp = pCON->getNdbOperation(GROUP_TABLE);
- CHECK_NULL(MyOp, "T5-2: getNdbOperation", td,
- pCON->getNdbError());
-
- MyOp->readTuple();
- MyOp->equal(IND_GROUP_ID,
- (char*)&td->transactionData.group_id);
- MyOp->getValue(IND_GROUP_ALLOW_DELETE,
- (char *)&td->transactionData.permission);
+ if (td->ndbRecordSharedData)
+ {
+ char* rowPtr= (char*) &td->transactionData;
+ const NdbRecord* record= td->ndbRecordSharedData->
+ groupTableNdbRecord;
+ Uint32 m=0;
+ unsigned char* mask= (unsigned char*) &m;
+
+ SET_MASK(mask, IND_GROUP_ALLOW_DELETE);
+
+ const NdbOperation* MyOp= pCON->readTuple(record, rowPtr, record, rowPtr,
+ NdbOperation::LM_Read,
+ mask);
+
+ CHECK_NULL((void*)MyOp, "T5-2: readTuple", td,
+ pCON->getNdbError());
+ }
+ else
+ {
+ NdbOperation * MyOp = pCON->getNdbOperation(GROUP_TABLE);
+ CHECK_NULL(MyOp, "T5-2: getNdbOperation", td,
+ pCON->getNdbError());
+
+ MyOp->readTuple();
+ MyOp->equal(IND_GROUP_ID,
+ (char*)&td->transactionData.group_id);
+ MyOp->getValue(IND_GROUP_ALLOW_DELETE,
+ (char *)&td->transactionData.permission);
+ }
+
if (stat_async == 1) {
pCON->executeAsynchPrepare( NoCommit , T5_Callback_2, td);
} else {
@@ -680,29 +1074,58 @@ T5_Callback_2(int result, NdbConnection
SUBSCRIBER_NUMBER_SUFFIX_LENGTH,
td->transactionData.suffix);
- /* Operation 3 */
- NdbOperation * MyOp = pCON->getNdbOperation(SESSION_TABLE);
- CHECK_NULL(MyOp, "T5-3: getNdbOperation", td,
- pCON->getNdbError());
-
- MyOp->deleteTuple();
- MyOp->equal(IND_SESSION_SUBSCRIBER,
- (char*)td->transactionData.number);
- MyOp->equal(IND_SESSION_SERVER,
- (char*)&td->transactionData.server_id);
- /* Operation 4 */
-
- /* Operation 5 */
- MyOp = pCON->getNdbOperation(SERVER_TABLE);
- CHECK_NULL(MyOp, "T5-5: getNdbOperation", td,
- pCON->getNdbError());
-
- MyOp->interpretedUpdateTuple();
- MyOp->equal(IND_SERVER_ID,
- (char*)&td->transactionData.server_id);
- MyOp->equal(IND_SERVER_SUBSCRIBER_SUFFIX,
- (char*)td->transactionData.suffix);
- MyOp->incValue(IND_SERVER_DELETES, (uint32)1);
+ if (td->ndbRecordSharedData)
+ {
+ char* rowPtr= (char*) &td->transactionData;
+ const NdbRecord* record= td->ndbRecordSharedData->
+ sessionTableNdbRecord;
+ Uint32 m=0;
+ unsigned char* mask= (unsigned char*) &m;
+
+ const NdbOperation* MyOp= pCON->deleteTuple(record, rowPtr, record);
+ CHECK_NULL((void*) MyOp, "T5-3: deleteTuple", td,
+ pCON->getNdbError());
+
+ record= td->ndbRecordSharedData->
+ serverTableNdbRecord;
+ m= 0;
+
+ NdbOperation::OperationOptions opts;
+ opts.optionsPresent= NdbOperation::OperationOptions::OO_INTERPRETED;
+ opts.interpretedCode= td->ndbRecordSharedData->incrServerDeletesProg;
+
+ MyOp= pCON->updateTuple(record, rowPtr, record, rowPtr, mask,
+ &opts, sizeof(opts));
+
+ CHECK_NULL((void*)MyOp, "T5-2: updateTuple", td,
+ pCON->getNdbError());
+ }
+ else
+ {
+ /* Operation 3 */
+ NdbOperation * MyOp = pCON->getNdbOperation(SESSION_TABLE);
+ CHECK_NULL(MyOp, "T5-3: getNdbOperation", td,
+ pCON->getNdbError());
+
+ MyOp->deleteTuple();
+ MyOp->equal(IND_SESSION_SUBSCRIBER,
+ (char*)td->transactionData.number);
+ MyOp->equal(IND_SESSION_SERVER,
+ (char*)&td->transactionData.server_id);
+ /* Operation 4 */
+
+ /* Operation 5 */
+ MyOp = pCON->getNdbOperation(SERVER_TABLE);
+ CHECK_NULL(MyOp, "T5-5: getNdbOperation", td,
+ pCON->getNdbError());
+
+ MyOp->interpretedUpdateTuple();
+ MyOp->equal(IND_SERVER_ID,
+ (char*)&td->transactionData.server_id);
+ MyOp->equal(IND_SERVER_SUBSCRIBER_SUFFIX,
+ (char*)td->transactionData.suffix);
+ MyOp->incValue(IND_SERVER_DELETES, (uint32)1);
+ }
td->transactionData.branchExecuted = 1;
} else {
td->transactionData.branchExecuted = 0;
=== modified file 'storage/ndb/test/ndbapi/bench/testData.h'
--- a/storage/ndb/test/ndbapi/bench/testData.h 2006-12-23 19:20:40 +0000
+++ b/storage/ndb/test/ndbapi/bench/testData.h 2008-09-30 08:18:41 +0000
@@ -121,6 +121,16 @@ typedef struct {
} TransactionData ;
typedef struct {
+ const struct NdbRecord* subscriberTableNdbRecord;
+ const struct NdbRecord* groupTableNdbRecord;
+ const struct NdbRecord* sessionTableNdbRecord;
+ const struct NdbInterpretedCode* incrServerReadsProg;
+ const struct NdbInterpretedCode* incrServerInsertsProg;
+ const struct NdbInterpretedCode* incrServerDeletesProg;
+ const struct NdbRecord* serverTableNdbRecord;
+} NdbRecordSharedData ;
+
+typedef struct {
struct NdbThread* pThread;
unsigned long randomSeed;
@@ -135,10 +145,12 @@ typedef struct {
/**
* For async execution
*/
- RunState runState;
- double startTime;
- TransactionData transactionData;
- struct Ndb * pNDB;
+ RunState runState;
+ double startTime;
+ TransactionData transactionData;
+ struct Ndb * pNDB;
+ NdbRecordSharedData* ndbRecordSharedData;
+ bool useCombinedUpdate;
} ThreadData;
/***************************************************************
=== modified file 'storage/ndb/test/ndbapi/flexAsynch.cpp'
--- a/storage/ndb/test/ndbapi/flexAsynch.cpp 2007-08-01 03:07:58 +0000
+++ b/storage/ndb/test/ndbapi/flexAsynch.cpp 2008-10-01 09:24:07 +0000
@@ -56,6 +56,13 @@ enum StartType {
stStop
} ;
+struct ThreadNdb
+{
+ int NoOfOps;
+ int ThreadNo;
+ char * record;
+};
+
extern "C" { static void* threadLoop(void*); }
static void setAttrNames(void);
static void setTableNames(void);
@@ -63,8 +70,10 @@ static int readArguments(int argc, const
static int createTables(Ndb*);
static void defineOperation(NdbConnection* aTransObject, StartType aType,
Uint32 base, Uint32 aIndex);
+static void defineNdbRecordOperation(ThreadNdb*, NdbConnection* aTransObject, StartType aType,
+ Uint32 base, Uint32 aIndex);
static void execute(StartType aType);
-static bool executeThread(StartType aType, Ndb* aNdbObject, unsigned int);
+static bool executeThread(ThreadNdb*, StartType aType, Ndb* aNdbObject, unsigned int);
static void executeCallback(int result, NdbConnection* NdbObject,
void* aObject);
static bool error_handler(const NdbError & err);
@@ -77,12 +86,6 @@ static int
ErrorData * flexAsynchErrorData;
-struct ThreadNdb
-{
- int NoOfOps;
- int ThreadNo;
-};
-
static NdbThread* threadLife[NDB_MAXTHREADS];
static int tNodeId;
static int ThreadReady[NDB_MAXTHREADS];
@@ -91,6 +94,9 @@ static char
static char attrName[MAXATTR][MAXSTRLEN+1];
// Program Parameters
+static NdbRecord * g_record[MAXTABLES];
+static bool tNdbRecord = false;
+
static bool tLocal = false;
static int tLocalPart = 0;
static int tSendForce = 0;
@@ -236,6 +242,17 @@ NDB_COMMAND(flexAsynch, "flexAsynch", "f
}
}
+ if (tNdbRecord)
+ {
+ Uint32 sz = NdbDictionary::getRecordRowLength(g_record[0]);
+ sz += 3;
+ for (Uint32 i = 0; i<tNoOfThreads; i++)
+ {
+ pThreadData[i].record = (char*)malloc(sz);
+ bzero(pThreadData[i].record, sz);
+ }
+ }
+
if(returnValue == NDBT_OK){
/****************************************************************
* Create NDB objects. *
@@ -501,7 +518,7 @@ threadLoop(void* ThreadData)
tType = ThreadStart[threadNo];
ThreadStart[threadNo] = stIdle;
- if(!executeThread(tType, localNdb, threadBase)){
+ if(!executeThread(tabThread, tType, localNdb, threadBase)){
break;
}
ThreadReady[threadNo] = 1;
@@ -515,66 +532,76 @@ threadLoop(void* ThreadData)
static
bool
-executeThread(StartType aType, Ndb* aNdbObject, unsigned int threadBase) {
+executeThread(ThreadNdb* pThread,
+ StartType aType, Ndb* aNdbObject, unsigned int threadBase) {
int i, j, k;
NdbConnection* tConArray[1024];
unsigned int tBase;
unsigned int tBase2;
- for (i = 0; i < tNoOfTransactions; i++) {
- if (tLocal == false) {
- tBase = i * tNoOfParallelTrans * tNoOfOpsPerTrans;
- } else {
- tBase = i * tNoOfParallelTrans * MAX_SEEK;
- }//if
- START_REAL_TIME;
- for (j = 0; j < tNoOfParallelTrans; j++) {
+ unsigned int extraLoops= 0; // (aType == stRead) ? 100000 : 0;
+
+ for (unsigned int ex= 0; ex < (1 + extraLoops); ex++)
+ {
+ for (i = 0; i < tNoOfTransactions; i++) {
if (tLocal == false) {
- tBase2 = tBase + (j * tNoOfOpsPerTrans);
- } else {
- tBase2 = tBase + (j * MAX_SEEK);
- tBase2 = getKey(threadBase, tBase2);
- }//if
- if (startTransGuess == true) {
- Uint64 Tkey64;
- Uint32* Tkey32 = (Uint32*)&Tkey64;
- Tkey32[0] = threadBase;
- Tkey32[1] = tBase2;
- tConArray[j] = aNdbObject->startTransaction((Uint32)0, //Priority
- (const char*)&Tkey64, //Main PKey
- (Uint32)4); //Key Length
+ tBase = i * tNoOfParallelTrans * tNoOfOpsPerTrans;
} else {
- tConArray[j] = aNdbObject->startTransaction();
+ tBase = i * tNoOfParallelTrans * MAX_SEEK;
}//if
- if (tConArray[j] == NULL &&
- !error_handler(aNdbObject->getNdbError()) ){
- ndbout << endl << "Unable to recover! Quiting now" << endl ;
- return false;
- }//if
-
- for (k = 0; k < tNoOfOpsPerTrans; k++) {
- //-------------------------------------------------------
- // Define the operation, but do not execute it yet.
- //-------------------------------------------------------
- defineOperation(tConArray[j], aType, threadBase, (tBase2 + k));
+ START_REAL_TIME;
+ for (j = 0; j < tNoOfParallelTrans; j++) {
+ if (tLocal == false) {
+ tBase2 = tBase + (j * tNoOfOpsPerTrans);
+ } else {
+ tBase2 = tBase + (j * MAX_SEEK);
+ tBase2 = getKey(threadBase, tBase2);
+ }//if
+ if (startTransGuess == true) {
+ Uint64 Tkey64;
+ Uint32* Tkey32 = (Uint32*)&Tkey64;
+ Tkey32[0] = threadBase;
+ Tkey32[1] = tBase2;
+ tConArray[j] = aNdbObject->startTransaction((Uint32)0, //Priority
+ (const char*)&Tkey64, //Main PKey
+ (Uint32)4); //Key Length
+ } else {
+ tConArray[j] = aNdbObject->startTransaction();
+ }//if
+ if (tConArray[j] == NULL &&
+ !error_handler(aNdbObject->getNdbError()) ){
+ ndbout << endl << "Unable to recover! Quiting now" << endl ;
+ return false;
+ }//if
+
+ for (k = 0; k < tNoOfOpsPerTrans; k++) {
+ //-------------------------------------------------------
+ // Define the operation, but do not execute it yet.
+ //-------------------------------------------------------
+ if (tNdbRecord)
+ defineNdbRecordOperation(pThread,
+ tConArray[j], aType, threadBase,(tBase2+k));
+ else
+ defineOperation(tConArray[j], aType, threadBase, (tBase2 + k));
+ }//for
+
+ tConArray[j]->executeAsynchPrepare(Commit, &executeCallback, NULL);
+ }//for
+ STOP_REAL_TIME;
+ //-------------------------------------------------------
+ // Now we have defined a set of operations, it is now time
+ // to execute all of them.
+ //-------------------------------------------------------
+ int Tcomp = aNdbObject->sendPollNdb(3000, 0, 0);
+ while (Tcomp < tNoOfParallelTrans) {
+ int TlocalComp = aNdbObject->pollNdb(3000, 0);
+ Tcomp += TlocalComp;
+ }//while
+ for (j = 0 ; j < tNoOfParallelTrans ; j++) {
+ aNdbObject->closeTransaction(tConArray[j]);
}//for
-
- tConArray[j]->executeAsynchPrepare(Commit, &executeCallback, NULL);
- }//for
- STOP_REAL_TIME;
- //-------------------------------------------------------
- // Now we have defined a set of operations, it is now time
- // to execute all of them.
- //-------------------------------------------------------
- int Tcomp = aNdbObject->sendPollNdb(3000, 0, 0);
- while (Tcomp < tNoOfParallelTrans) {
- int TlocalComp = aNdbObject->pollNdb(3000, 0);
- Tcomp += TlocalComp;
- }//while
- for (j = 0 ; j < tNoOfParallelTrans ; j++) {
- aNdbObject->closeTransaction(tConArray[j]);
}//for
- }//for
+ } // for
return true;
}//executeThread()
@@ -720,6 +747,68 @@ defineOperation(NdbConnection* localNdbC
return;
}//defineOperation()
+
+static void
+defineNdbRecordOperation(ThreadNdb* pThread,
+ NdbConnection* pTrans, StartType aType,
+ Uint32 threadBase, Uint32 aIndex)
+{
+ char * record = pThread->record;
+ Uint32 offset;
+ NdbDictionary::getOffset(g_record[0], 0, offset);
+ * (Uint32*)(record + offset) = threadBase;
+ * (Uint32*)(record + offset + 4) = aIndex;
+
+ //-------------------------------------------------------
+ // Set-up the attribute values for this operation.
+ //-------------------------------------------------------
+ if (aType != stRead && aType != stDelete)
+ {
+ for (int k = 1; k < tNoOfAttributes; k++) {
+ NdbDictionary::getOffset(g_record[0], k, offset);
+ * (Uint32*)(record + offset) = aIndex;
+ }//for
+ }
+
+ const NdbOperation* op;
+ switch (aType) {
+ case stInsert: { // Insert case
+ if (theWriteFlag == 1)
+ {
+ op = pTrans->writeTuple(g_record[0],record,g_record[0],record);
+ }
+ else
+ {
+ op = pTrans->insertTuple(g_record[0],record,g_record[0],record);
+ }
+ break;
+ }//case
+ case stRead: { // Read Case
+ op = pTrans->readTuple(g_record[0],record,g_record[0],record);
+ break;
+ }//case
+ case stUpdate:{ // Update Case
+ op = pTrans->updateTuple(g_record[0],record,g_record[0],record);
+ break;
+ }//case
+ case stDelete: { // Delete Case
+ op = pTrans->deleteTuple(g_record[0],record, g_record[0]);
+ break;
+ }//case
+ default: {
+ abort();
+ }//default
+ }//switch
+
+ if (op == NULL)
+ {
+ ndbout << "Operation is null " << pTrans->getNdbError() << endl;
+ abort();
+ }
+
+ assert(op != 0);
+}
+
static void setAttrNames()
{
int i;
@@ -813,6 +902,28 @@ createTables(Ndb* pMyNdb){
return -1;
NdbSchemaCon::closeSchemaTrans(MySchemaTransaction);
+
+ if (tNdbRecord)
+ {
+ NdbDictionary::Dictionary* pDict = pMyNdb->getDictionary();
+ const NdbDictionary::Table * pTab = pDict->getTable(tableName[i]);
+
+ int off = 0;
+ Vector<NdbDictionary::RecordSpecification> spec;
+ for (Uint32 j = 0; j<pTab->getNoOfColumns(); j++)
+ {
+ NdbDictionary::RecordSpecification r0;
+ r0.column = pTab->getColumn(j);
+ r0.offset = off;
+ off += (r0.column->getSizeInBytes() + 3) & ~(Uint32)3;
+ spec.push_back(r0);
+ }
+ g_record[i] =
+ pDict->createRecord(pTab, spec.getBase(),
+ spec.size(),
+ sizeof(NdbDictionary::RecordSpecification));
+ assert(g_record[i]);
+ }
}
}
@@ -949,6 +1060,10 @@ readArguments(int argc, const char** arg
startTransGuess = false;
argc++;
i--;
+ } else if (strcmp(argv[i], "-ndbrecord") == 0){
+ tNdbRecord = true;
+ argc++;
+ i--;
} else {
return -1;
}
@@ -995,7 +1110,7 @@ input_error(){
ndbout_c(" -force Force send when communicating");
ndbout_c(" -non_adaptive Send at a 10 millisecond interval");
ndbout_c(" -local Number of part, only use keys in one part out of 16");
+ ndbout_c(" -ndbrecord");
}
-
-
+template class Vector<NdbDictionary::RecordSpecification>;
=== modified file 'storage/ndb/test/ndbapi/msa.cpp'
--- a/storage/ndb/test/ndbapi/msa.cpp 2006-12-23 19:20:40 +0000
+++ b/storage/ndb/test/ndbapi/msa.cpp 2008-09-30 08:18:41 +0000
@@ -22,6 +22,7 @@
#include <NdbSleep.h>
#include <NdbThread.h>
#include <NdbTick.h>
+#include <NdbOut.hpp>
const char* const c_szDatabaseName = "TEST_DB";
@@ -52,6 +53,7 @@ bool g_bWriteTuple = false;
bool g_bInsertInitial = false;
bool g_bVerifyInitial = false;
+Ndb_cluster_connection* theConnection = 0;
NdbMutex* g_pNdbMutexPrintf = 0;
NdbMutex* g_pNdbMutexIncrement = 0;
long g_nNumCallsProcessed = 0;
@@ -242,7 +244,8 @@ int QueryTransaction(Ndb* pNdb,
NdbError& err)
{
int iRes = -1;
- NdbConnection* pNdbConnection = pNdb->startTransaction(0, (const char*)&iContextId, 4);
+ NdbConnection* pNdbConnection = pNdb->startTransaction();
+ //0, (const char*)&iContextId, 4);
if(pNdbConnection)
{
NdbOperation* pNdbOperation = pNdbConnection->getNdbOperation(g_szTableName);
@@ -326,7 +329,8 @@ int RetryQueryTransaction(Ndb* pNdb,
int DeleteTransaction(Ndb* pNdb, long iContextId, NdbError& err)
{
int iRes = -1;
- NdbConnection* pNdbConnection = pNdb->startTransaction(0, (const char*)&iContextId, 4);
+ NdbConnection* pNdbConnection = pNdb->startTransaction();
+ //0, (const char*)&iContextId, 4);
if(pNdbConnection)
{
NdbOperation* pNdbOperation = pNdbConnection->getNdbOperation(g_szTableName);
@@ -411,7 +415,8 @@ int InsertTransaction(Ndb* pNdb,
NdbError& err)
{
int iRes = -1;
- NdbConnection* pNdbConnection = pNdb->startTransaction(0, (const char*)&iContextID, 4);
+ NdbConnection* pNdbConnection = pNdb->startTransaction();
+ //0, (const char*)&iContextID, 4);
if(pNdbConnection)
{
NdbOperation* pNdbOperation = pNdbConnection->getNdbOperation(g_szTableName);
@@ -501,7 +506,8 @@ int RetryInsertTransaction(Ndb* pNdb,
int UpdateTransaction(Ndb* pNdb, long iContextId, NdbError& err)
{
int iRes = -1;
- NdbConnection* pNdbConnection = pNdb->startTransaction(0, (const char*)&iContextId, 4);
+ NdbConnection* pNdbConnection = pNdb->startTransaction();
+ //0, (const char*)&iContextId, 4);
if(pNdbConnection)
{
NdbOperation* pNdbOperation = pNdbConnection->getNdbOperation(g_szTableName);
@@ -678,7 +684,7 @@ void* RuntimeCallContext(void* lpParam)
long iLockTime;
long iLockTimeUSec;
- pNdb = new Ndb("TEST_DB");
+ pNdb = new Ndb(theConnection, "TEST_DB");
if(!pNdb)
{
NdbMutex_Lock(g_pNdbMutexPrintf);
@@ -971,7 +977,6 @@ void ShowHelp(const char* szCmd)
int main(int argc, char* argv[])
{
ndb_init();
- int iRes = -1;
g_nNumThreads = 0;
g_nMaxCallsPerSecond = 0;
long nSeed = 0;
@@ -998,7 +1003,7 @@ int main(int argc, char* argv[])
break;
case 'm':
g_nStatusDataSize = atol(argv[i]+2);
- if(g_nStatusDataSize>sizeof(STATUS_DATA))
+ if(g_nStatusDataSize> (int) sizeof(STATUS_DATA))
{
g_nStatusDataSize = sizeof(STATUS_DATA);
}
@@ -1093,7 +1098,19 @@ int main(int argc, char* argv[])
hShutdownEvent = CreateEvent(NULL,TRUE,FALSE,NULL);
#endif
- Ndb* pNdb = new Ndb(c_szDatabaseName);
+ theConnection= new Ndb_cluster_connection();
+ if (theConnection->connect(12, 5, 1) != 0)
+ {
+ ndbout << "Unable to connect to managment server." << endl;
+ return -1;
+ }
+ if (theConnection->wait_until_ready(30,0) < 0)
+ {
+ ndbout << "Cluster nodes not ready in 30 seconds." << endl;
+ return -1;
+ }
+
+ Ndb* pNdb = new Ndb(theConnection, c_szDatabaseName);
if(!pNdb)
{
printf("could not construct ndb\n");
| Thread |
|---|
| • bzr commit into mysql-5.1 branch (tomas.ulin:2682) | Tomas Ulin | 2 Oct |