Below is the list of changes that have just been committed into a local
5.1 repository of pekka. When pekka does a push these changes will
be propagated to the main repository and, within 24 hours after the
push, to the public repository.
For information on how to access the public repository
see http://dev.mysql.com/doc/mysql/en/installing-source-tree.html
ChangeSet@stripped, 2007-07-14 11:48:51+03:00, pekka@stripped +8 -0
ndb - threads and timings to hugo* pk ops
storage/ndb/test/include/HugoTransactions.hpp@stripped, 2007-07-14 11:48:06+03:00,
pekka@stripped +15 -1
threads and timings to hugo* pk ops
storage/ndb/test/include/NDBT_Thread.hpp@stripped, 2007-07-14 11:48:06+03:00,
pekka@stripped +226 -0
threads and timings to hugo* pk ops
storage/ndb/test/include/NDBT_Thread.hpp@stripped, 2007-07-14 11:24:21+03:00,
pekka@stripped +0 -0
BitKeeper file
/export/space/pekka/ndb/version/my51-dd/storage/ndb/test/include/NDBT_Thread.hpp
storage/ndb/test/include/NDBT_Thread.hpp@stripped, 2007-07-14 11:24:20+03:00,
pekka@stripped +0 -0
storage/ndb/test/src/HugoTransactions.cpp@stripped, 2007-07-14 11:48:06+03:00,
pekka@stripped +74 -4
threads and timings to hugo* pk ops
storage/ndb/test/src/Makefile.am@stripped, 2007-07-14 11:48:06+03:00,
pekka@stripped +1 -1
threads and timings to hugo* pk ops
storage/ndb/test/src/NDBT_Thread.cpp@stripped, 2007-07-14 11:48:06+03:00,
pekka@stripped +283 -0
threads and timings to hugo* pk ops
storage/ndb/test/src/NDBT_Thread.cpp@stripped, 2007-07-14 11:24:21+03:00,
pekka@stripped +0 -0
BitKeeper file
/export/space/pekka/ndb/version/my51-dd/storage/ndb/test/src/NDBT_Thread.cpp
storage/ndb/test/src/NDBT_Thread.cpp@stripped, 2007-07-14 11:24:21+03:00,
pekka@stripped +0 -0
storage/ndb/test/tools/hugoPkDelete.cpp@stripped, 2007-07-14 11:48:06+03:00,
pekka@stripped +89 -5
threads and timings to hugo* pk ops
storage/ndb/test/tools/hugoPkRead.cpp@stripped, 2007-07-14 11:48:06+03:00,
pekka@stripped +85 -4
threads and timings to hugo* pk ops
storage/ndb/test/tools/hugoPkUpdate.cpp@stripped, 2007-07-14 11:48:07+03:00,
pekka@stripped +91 -7
threads and timings to hugo* pk ops
# This is a BitKeeper patch. What follows are the unified diffs for the
# set of deltas contained in the patch. The rest of the patch, the part
# that BitKeeper cares about, is below these diffs.
# User: pekka
# Host: clam.(none)
# Root: /export/space/pekka/ndb/version/my51-dd
--- 1.14/storage/ndb/test/include/HugoTransactions.hpp 2006-12-23 21:20:23 +02:00
+++ 1.15/storage/ndb/test/include/HugoTransactions.hpp 2007-07-14 11:48:06 +03:00
@@ -20,7 +20,7 @@
#include <NDBT.hpp>
#include <HugoCalculator.hpp>
#include <HugoOperations.hpp>
-
+class NDBT_Stats;
class HugoTransactions : public HugoOperations {
public:
@@ -109,10 +109,24 @@
void setRetryMax(int retryMax = 100) { m_retryMax = retryMax; }
Uint32 m_latest_gci;
+
+ void setStatsLatency(NDBT_Stats* stats) { m_stats_latency = stats; }
+
+ // allows multiple threads to update separate batches
+ void setThrInfo(int thr_count, int thr_no) {
+ m_thr_count = thr_count;
+ m_thr_no = thr_no;
+ }
+
protected:
NDBT_ResultRow row;
int m_defaultScanUpdateMethod;
int m_retryMax;
+
+ NDBT_Stats* m_stats_latency;
+
+ int m_thr_count; // 0 if no separation between threads
+ int m_thr_no;
};
--- 1.32/storage/ndb/test/src/HugoTransactions.cpp 2007-04-12 12:43:54 +03:00
+++ 1.33/storage/ndb/test/src/HugoTransactions.cpp 2007-07-14 11:48:06 +03:00
@@ -14,8 +14,9 @@
Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA */
#include "HugoTransactions.hpp"
+#include <NDBT_Stats.hpp>
#include <NdbSleep.h>
-
+#include <NdbTick.h>
HugoTransactions::HugoTransactions(const NdbDictionary::Table& _tab,
const NdbDictionary::Index* idx):
@@ -24,6 +25,10 @@
m_defaultScanUpdateMethod = 3;
setRetryMax();
+ m_stats_latency = 0;
+
+ m_thr_count = 0;
+ m_thr_no = -1;
}
HugoTransactions::~HugoTransactions(){
@@ -820,6 +825,16 @@
return NDBT_FAILED;
}
+ MicroSecondTimer timer_start;
+ MicroSecondTimer timer_stop;
+ bool timer_active =
+ m_stats_latency != 0 &&
+ r >= batch && // first batch is "warmup"
+ r + batch != records; // last batch is usually partial
+
+ if (timer_active)
+ NdbTick_getMicroTimer(&timer_start);
+
if(pkReadRecord(pNdb, r, batch, lm) != NDBT_OK)
{
ERR(pTrans->getNdbError());
@@ -892,6 +907,12 @@
}
closeTransaction(pNdb);
+
+ if (timer_active) {
+ NdbTick_getMicroTimer(&timer_stop);
+ NDB_TICKS ticks = NdbTick_getMicrosPassed(timer_start, timer_stop);
+ m_stats_latency->addObservation((double)ticks);
+ }
}
deallocRows();
g_info << reads << " records read" << endl;
@@ -913,9 +934,17 @@
allocRows(batch);
g_info << "|- Updating records (batch=" << batch << ")..." <<
endl;
+ int batch_no = 0;
while (r < records){
if(r + batch > records)
batch = records - r;
+
+ if (m_thr_count != 0 && m_thr_no != batch_no % m_thr_count)
+ {
+ r += batch;
+ batch_no++;
+ continue;
+ }
if (retryAttempt >= m_retryMax){
g_info << "ERROR: has retried this operation " << retryAttempt
@@ -963,6 +992,16 @@
return NDBT_FAILED;
}
+ MicroSecondTimer timer_start;
+ MicroSecondTimer timer_stop;
+ bool timer_active =
+ m_stats_latency != 0 &&
+ r >= batch && // first batch is "warmup"
+ r + batch != records; // last batch is usually partial
+
+ if (timer_active)
+ NdbTick_getMicroTimer(&timer_start);
+
if(pIndexScanOp)
{
int rows_found = 0;
@@ -1039,8 +1078,15 @@
}
closeTransaction(pNdb);
-
+
+ if (timer_active) {
+ NdbTick_getMicroTimer(&timer_stop);
+ NDB_TICKS ticks = NdbTick_getMicrosPassed(timer_start, timer_stop);
+ m_stats_latency->addObservation((double)ticks);
+ }
+
r += batch; // Read next record
+ batch_no++;
}
deallocRows();
@@ -1228,10 +1274,18 @@
int check;
g_info << "|- Deleting records..." << endl;
+ int batch_no = 0;
while (r < records){
if(r + batch > records)
batch = records - r;
+ if (m_thr_count != 0 && m_thr_no != batch_no % m_thr_count)
+ {
+ r += batch;
+ batch_no++;
+ continue;
+ }
+
if (retryAttempt >= m_retryMax){
g_info << "ERROR: has retried this operation " << retryAttempt
<< " times, failing!" << endl;
@@ -1255,6 +1309,16 @@
return NDBT_FAILED;
}
+ MicroSecondTimer timer_start;
+ MicroSecondTimer timer_stop;
+ bool timer_active =
+ m_stats_latency != 0 &&
+ r >= batch && // first batch is "warmup"
+ r + batch != records; // last batch is usually partial
+
+ if (timer_active)
+ NdbTick_getMicroTimer(&timer_start);
+
if(pkDeleteRecord(pNdb, r, batch) != NDBT_OK)
{
ERR(pTrans->getNdbError());
@@ -1303,9 +1367,15 @@
m_latest_gci = pTrans->getGCI();
}
closeTransaction(pNdb);
-
- r += batch; // Read next record
+ if (timer_active) {
+ NdbTick_getMicroTimer(&timer_stop);
+ NDB_TICKS ticks = NdbTick_getMicrosPassed(timer_start, timer_stop);
+ m_stats_latency->addObservation((double)ticks);
+ }
+
+ r += batch; // Read next record
+ batch_no++;
}
g_info << "|- " << deleted << " records deleted" << endl;
--- 1.10/storage/ndb/test/tools/hugoPkDelete.cpp 2006-12-23 21:20:31 +02:00
+++ 1.11/storage/ndb/test/tools/hugoPkDelete.cpp 2007-07-14 11:48:06 +03:00
@@ -20,22 +20,41 @@
#include <NdbApi.hpp>
#include <NdbMain.h>
#include <NDBT.hpp>
+#include <NDBT_Thread.hpp>
+#include <NDBT_Stats.hpp>
#include <NdbSleep.h>
#include <getarg.h>
#include <HugoTransactions.hpp>
+static NDBT_ThreadFunc hugoPkDelete;
+
+struct ThrInput {
+ const NdbDictionary::Table* pTab;
+ int records;
+ int batch;
+ int stats;
+};
+
+struct ThrOutput {
+ NDBT_Stats latency;
+};
+
int main(int argc, const char** argv){
ndb_init();
int _records = 0;
int _loops = 1;
- int _batch = 0;
+ int _threads = 1;
+ int _stats = 0;
+ int _batch = 1;
const char* _tabname = NULL;
int _help = 0;
struct getargs args[] = {
{ "loops", 'l', arg_integer, &_loops, "number of times to run this
program(0=infinite loop)", "loops" },
+ { "threads", 't', arg_integer, &_threads, "number of threads (default 1)",
"threads" },
+ { "stats", 's', arg_flag, &_stats, "report latency per batch", "stats" },
// { "batch", 'b', arg_integer, &_batch, "batch value", "batch" },
{ "records", 'r', arg_integer, &_records, "Number of records", "records" },
{ "usage", '?', arg_flag, &_help, "Print help", "" }
@@ -81,12 +100,57 @@
return NDBT_ProgramExit(NDBT_WRONGARGS);
}
- HugoTransactions hugoTrans(*pTab);
+ // threads
+ NDBT_ThreadSet ths(_threads);
+
+ // create Ndb object for each thread
+ if (ths.connect(&con, "TEST_DB") == -1) {
+ ndbout << "connect failed: err=" << ths.get_err() << endl;
+ return NDBT_ProgramExit(NDBT_FAILED);
+ }
+
+ // input is options
+ ThrInput input;
+ ths.set_input(&input);
+ input.pTab = pTab;
+ input.records = _records;
+ input.batch = _batch;
+ input.stats = _stats;
+
+ // output is stats
+ ThrOutput output;
+ ths.set_output<ThrOutput>();
+
int i = 0;
- while (i<_loops || _loops==0) {
+ while (i < _loops || _loops == 0) {
ndbout << i << ": ";
- if (hugoTrans.pkDelRecords(&MyNdb, _records) != 0){
- return NDBT_ProgramExit(NDBT_FAILED);
+
+ ths.set_func(hugoPkDelete);
+ ths.start();
+ ths.stop();
+
+ if (ths.get_err())
+ NDBT_ProgramExit(NDBT_FAILED);
+
+ if (_stats) {
+ NDBT_Stats latency;
+
+ // add stats from each thread
+ int n;
+ for (n = 0; n < ths.get_count(); n++) {
+ NDBT_Thread& thr = ths.get_thread(n);
+ ThrOutput* output = (ThrOutput*)thr.get_output();
+ latency += output->latency;
+ }
+
+ ndbout
+ << "latency per batch (us): "
+ << " samples=" << latency.getCount()
+ << " min=" << (int)latency.getMin()
+ << " max=" << (int)latency.getMax()
+ << " mean=" << (int)latency.getMean()
+ << " stddev=" << (int)latency.getStddev()
+ << endl;
}
i++;
}
@@ -94,3 +158,23 @@
return NDBT_ProgramExit(NDBT_OK);
}
+static void hugoPkDelete(NDBT_Thread& thr)
+{
+ const ThrInput* input = (const ThrInput*)thr.get_input();
+ ThrOutput* output = (ThrOutput*)thr.get_output();
+
+ HugoTransactions hugoTrans(*input->pTab);
+ output->latency.reset();
+ if (input->stats)
+ hugoTrans.setStatsLatency(&output->latency);
+
+ NDBT_ThreadSet& ths = thr.get_thread_set();
+ hugoTrans.setThrInfo(ths.get_count(), thr.get_thread_no());
+
+ int ret;
+ ret = hugoTrans.pkDelRecords(thr.get_ndb(),
+ input->records,
+ input->batch);
+ if (ret != 0)
+ thr.set_err(ret);
+}
--- 1.8/storage/ndb/test/tools/hugoPkRead.cpp 2006-12-23 21:20:31 +02:00
+++ 1.9/storage/ndb/test/tools/hugoPkRead.cpp 2007-07-14 11:48:06 +03:00
@@ -20,17 +20,33 @@
#include <NdbApi.hpp>
#include <NdbMain.h>
#include <NDBT.hpp>
+#include <NDBT_Thread.hpp>
+#include <NDBT_Stats.hpp>
#include <NdbSleep.h>
#include <getarg.h>
#include <HugoTransactions.hpp>
+static NDBT_ThreadFunc hugoPkRead;
+
+struct ThrInput {
+ const NdbDictionary::Table* pTab;
+ int records;
+ int batch;
+ int stats;
+};
+
+struct ThrOutput {
+ NDBT_Stats latency;
+};
int main(int argc, const char** argv){
ndb_init();
int _records = 0;
int _loops = 1;
+ int _threads = 1;
+ int _stats = 0;
int _abort = 0;
int _batch = 1;
const char* _tabname = NULL;
@@ -39,6 +55,8 @@
struct getargs args[] = {
{ "aborts", 'a', arg_integer, &_abort, "percent of transactions that are
aborted", "abort%" },
{ "loops", 'l', arg_integer, &_loops, "number of times to run this
program(0=infinite loop)", "loops" },
+ { "threads", 't', arg_integer, &_threads, "number of threads (default 1)",
"threads" },
+ { "stats", 's', arg_flag, &_stats, "report latency per batch", "stats" },
{ "batch", 'b', arg_integer, &_batch, "batch value(not 0)", "batch" },
{ "records", 'r', arg_integer, &_records, "Number of records", "records" },
{ "usage", '?', arg_flag, &_help, "Print help", "" }
@@ -64,6 +82,7 @@
{
return NDBT_ProgramExit(NDBT_FAILED);
}
+
Ndb MyNdb(&con, "TEST_DB" );
if(MyNdb.init() != 0){
@@ -81,12 +100,57 @@
return NDBT_ProgramExit(NDBT_WRONGARGS);
}
- HugoTransactions hugoTrans(*pTab);
+ // threads
+ NDBT_ThreadSet ths(_threads);
+
+ // create Ndb object for each thread
+ if (ths.connect(&con, "TEST_DB") == -1) {
+ ndbout << "connect failed: err=" << ths.get_err() << endl;
+ return NDBT_ProgramExit(NDBT_FAILED);
+ }
+
+ // input is options
+ ThrInput input;
+ ths.set_input(&input);
+ input.pTab = pTab;
+ input.records = _records;
+ input.batch = _batch;
+ input.stats = _stats;
+
+ // output is stats
+ ThrOutput output;
+ ths.set_output<ThrOutput>();
+
int i = 0;
- while (i<_loops || _loops==0) {
+ while (i < _loops || _loops == 0) {
ndbout << i << ": ";
- if (hugoTrans.pkReadRecords(&MyNdb, _records, _batch) != 0){
- return NDBT_ProgramExit(NDBT_FAILED);
+
+ ths.set_func(hugoPkRead);
+ ths.start();
+ ths.stop();
+
+ if (ths.get_err())
+ NDBT_ProgramExit(NDBT_FAILED);
+
+ if (_stats) {
+ NDBT_Stats latency;
+
+ // add stats from each thread
+ int n;
+ for (n = 0; n < ths.get_count(); n++) {
+ NDBT_Thread& thr = ths.get_thread(n);
+ ThrOutput* output = (ThrOutput*)thr.get_output();
+ latency += output->latency;
+ }
+
+ ndbout
+ << "latency per batch (us): "
+ << " samples=" << latency.getCount()
+ << " min=" << (int)latency.getMin()
+ << " max=" << (int)latency.getMax()
+ << " mean=" << (int)latency.getMean()
+ << " stddev=" << (int)latency.getStddev()
+ << endl;
}
i++;
}
@@ -94,3 +158,20 @@
return NDBT_ProgramExit(NDBT_OK);
}
+static void hugoPkRead(NDBT_Thread& thr)
+{
+ const ThrInput* input = (const ThrInput*)thr.get_input();
+ ThrOutput* output = (ThrOutput*)thr.get_output();
+
+ HugoTransactions hugoTrans(*input->pTab);
+ output->latency.reset();
+ if (input->stats)
+ hugoTrans.setStatsLatency(&output->latency);
+
+ int ret;
+ ret = hugoTrans.pkReadRecords(thr.get_ndb(),
+ input->records,
+ input->batch);
+ if (ret != 0)
+ thr.set_err(ret);
+}
--- 1.11/storage/ndb/test/tools/hugoPkUpdate.cpp 2006-12-23 21:20:31 +02:00
+++ 1.12/storage/ndb/test/tools/hugoPkUpdate.cpp 2007-07-14 11:48:07 +03:00
@@ -20,24 +20,43 @@
#include <NdbApi.hpp>
#include <NdbMain.h>
#include <NDBT.hpp>
+#include <NDBT_Thread.hpp>
+#include <NDBT_Stats.hpp>
#include <NdbSleep.h>
#include <getarg.h>
#include <HugoTransactions.hpp>
+static NDBT_ThreadFunc hugoPkUpdate;
+
+struct ThrInput {
+ const NdbDictionary::Table* pTab;
+ int records;
+ int batch;
+ int stats;
+};
+
+struct ThrOutput {
+ NDBT_Stats latency;
+};
+
int main(int argc, const char** argv){
ndb_init();
int _records = 0;
int _loops = 1;
+ int _threads = 1;
+ int _stats = 0;
int _abort = 0;
- int _batch = 0;
+ int _batch = 1;
const char* _tabname = NULL, *db = 0;
int _help = 0;
struct getargs args[] = {
{ "aborts", 'a', arg_integer, &_abort, "percent of transactions that are
aborted", "abort%" },
{ "loops", 'l', arg_integer, &_loops, "number of times to run this
program(0=infinite loop)", "loops" },
+ { "threads", 't', arg_integer, &_threads, "number of threads (default 1)",
"threads" },
+ { "stats", 's', arg_flag, &_stats, "report latency per batch", "stats" },
// { "batch", 'b', arg_integer, &_batch, "batch value", "batch" },
{ "records", 'r', arg_integer, &_records, "Number of records", "records" },
{ "usage", '?', arg_flag, &_help, "Print help", "" },
@@ -83,16 +102,81 @@
return NDBT_ProgramExit(NDBT_WRONGARGS);
}
- HugoTransactions hugoTrans(*pTab);
+ // threads
+ NDBT_ThreadSet ths(_threads);
+
+ // create Ndb object for each thread
+ if (ths.connect(&con, "TEST_DB") == -1) {
+ ndbout << "connect failed: err=" << ths.get_err() << endl;
+ return NDBT_ProgramExit(NDBT_FAILED);
+ }
+
+ // input is options
+ ThrInput input;
+ ths.set_input(&input);
+ input.pTab = pTab;
+ input.records = _records;
+ input.batch = _batch;
+ input.stats = _stats;
+
+ // output is stats
+ ThrOutput output;
+ ths.set_output<ThrOutput>();
+
int i = 0;
- while (i<_loops || _loops==0) {
- ndbout << "loop " << i << ": ";
- if (hugoTrans.pkUpdateRecords(&MyNdb,
- _records) != 0){
- return NDBT_ProgramExit(NDBT_FAILED);
+ while (i < _loops || _loops == 0) {
+ ndbout << i << ": ";
+
+ ths.set_func(hugoPkUpdate);
+ ths.start();
+ ths.stop();
+
+ if (ths.get_err())
+ NDBT_ProgramExit(NDBT_FAILED);
+
+ if (_stats) {
+ NDBT_Stats latency;
+
+ // add stats from each thread
+ int n;
+ for (n = 0; n < ths.get_count(); n++) {
+ NDBT_Thread& thr = ths.get_thread(n);
+ ThrOutput* output = (ThrOutput*)thr.get_output();
+ latency += output->latency;
+ }
+
+ ndbout
+ << "latency per batch (us): "
+ << " samples=" << latency.getCount()
+ << " min=" << (int)latency.getMin()
+ << " max=" << (int)latency.getMax()
+ << " mean=" << (int)latency.getMean()
+ << " stddev=" << (int)latency.getStddev()
+ << endl;
}
i++;
}
return NDBT_ProgramExit(NDBT_OK);
+}
+
+static void hugoPkUpdate(NDBT_Thread& thr)
+{
+ const ThrInput* input = (const ThrInput*)thr.get_input();
+ ThrOutput* output = (ThrOutput*)thr.get_output();
+
+ HugoTransactions hugoTrans(*input->pTab);
+ output->latency.reset();
+ if (input->stats)
+ hugoTrans.setStatsLatency(&output->latency);
+
+ NDBT_ThreadSet& ths = thr.get_thread_set();
+ hugoTrans.setThrInfo(ths.get_count(), thr.get_thread_no());
+
+ int ret;
+ ret = hugoTrans.pkUpdateRecords(thr.get_ndb(),
+ input->records,
+ input->batch);
+ if (ret != 0)
+ thr.set_err(ret);
}
--- New file ---
+++ storage/ndb/test/include/NDBT_Thread.hpp 07/07/14 11:24:20
/* Copyright (C) 2003 MySQL AB
This program is free software; you can redistribute it and/or modify
it under the terms of the GNU General Public License as published by
the Free Software Foundation; version 2 of the License.
This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU General Public License for more details.
You should have received a copy of the GNU General Public License
along with this program; if not, write to the Free Software
Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA */
#ifndef NDB_THREAD_HPP
#define NDB_THREAD_HPP
#include <NdbMutex.h>
#include <NdbCondition.h>
#include <NdbThread.h>
// NDBT_Thread ctor -> NDBT_Thread_run -> thr.run()
extern "C" {
static void* NDBT_Thread_run(void* arg);
}
// Function to run in a thread.
typedef void NDBT_ThreadFunc(class NDBT_Thread&);
/*
* NDBT_Thread
*
* Represents a thread. The thread pauses at startup.
* Main process sets a function to run. When the function
* returns, the thread pauses again to wait for a command.
* This allows main process to sync with the thread and
* exchange data with it.
*
* Input to thread is typically options. The input area
* is read-only in the thread. Output from thread is
* results such as statistics. Error code is handled
* separately.
*
* Pointer to Ndb object and method to create it are
* provided for convenience.
*/
class NDBT_ThreadSet;
class NDBT_Thread {
public:
NDBT_Thread();
NDBT_Thread(NDBT_ThreadSet* thread_set, int thread_no);
void create(NDBT_ThreadSet* thread_set, int thread_no);
~NDBT_Thread();
// if part of a set
inline NDBT_ThreadSet& get_thread_set() const {
assert(m_thread_set != 0);
return *m_thread_set;
}
inline int get_thread_no() const {
return m_thread_no;
}
// { Wait -> Start -> Stop }+ -> Exit
enum State {
Wait = 1, // wait for command
Start, // run current function
Stop, // stopped (paused) when current function done
Exit // exit thread
};
// tell thread to start running current function
void start();
// wait for thread to stop when function is done
void stop();
// tell thread to exit
void exit();
// collect thread after exit
void join();
// set function to run
inline void set_func(NDBT_ThreadFunc* func) {
m_func = func;
}
// input area
inline void set_input(const void* input) {
m_input = input;
}
inline const void* get_input() const {
return m_input;
}
// output area
inline void set_output(void* output) {
m_output = output;
}
inline void* get_output() const {
return m_output;
}
template <class T> inline void set_output() {
set_output(new T);
}
inline void delete_output() {
delete m_output;
m_output = 0;
}
// thread-specific Ndb object
inline class Ndb* get_ndb() const {
return m_ndb;
}
int connect(class Ndb_cluster_connection*, const char* db = "TEST_DB");
void disconnect();
// error code (OS, Ndb, other)
void clear_err() {
m_err = 0;
}
void set_err(int err) {
m_err = err;
}
int get_err() const {
return m_err;
}
private:
friend class NDBT_ThreadSet;
friend void* NDBT_Thread_run(void* arg);
enum { Magic = 0xabacadae };
Uint32 m_magic;
State m_state;
NDBT_ThreadSet* m_thread_set;
int m_thread_no;
NDBT_ThreadFunc* m_func;
const void* m_input;
void* m_output;
class Ndb* m_ndb;
int m_err;
// run the thread
void run();
void lock() {
NdbMutex_Lock(m_mutex);
}
void unlock() {
NdbMutex_Unlock(m_mutex);
}
void wait() {
NdbCondition_Wait(m_cond, m_mutex);
}
void signal() {
NdbCondition_Signal(m_cond);
}
NdbMutex* m_mutex;
NdbCondition* m_cond;
NdbThread* m_thread;
void* m_status;
};
/*
* A set of threads, indexed from 0 to count-1. Methods
* are applied to each thread (serially). Input area is
* common to all threads. Output areas are allocated
* separately according to a template class.
*/
class NDBT_ThreadSet {
public:
NDBT_ThreadSet(int count);
~NDBT_ThreadSet();
inline int get_count() const {
return m_count;
}
inline NDBT_Thread& get_thread(int n) {
assert(n < m_count && m_thread[n] != 0);
return *m_thread[n];
}
// tell each thread to start running
void start();
// wait for each thread to stop
void stop();
// tell each thread to exit
void exit();
// collect each thread after exit
void join();
// set function to run in each thread
void set_func(NDBT_ThreadFunc* func);
// set input area (same instance in each thread)
void set_input(const void* input);
// set output areas
template <class T> inline void set_output() {
for (int n = 0; n < m_count; n++) {
NDBT_Thread& thr = *m_thread[n];
thr.set_output<T>();
}
}
void delete_output();
// thread-specific Ndb objects
int connect(class Ndb_cluster_connection*, const char* db = "TEST_DB");
void disconnect();
int get_err() const;
private:
int m_count;
NDBT_Thread** m_thread;
};
#endif
--- New file ---
+++ storage/ndb/test/src/NDBT_Thread.cpp 07/07/14 11:24:21
/* Copyright (C) 2003 MySQL AB
This program is free software; you can redistribute it and/or modify
it under the terms of the GNU General Public License as published by
the Free Software Foundation; version 2 of the License.
This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU General Public License for more details.
You should have received a copy of the GNU General Public License
along with this program; if not, write to the Free Software
Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA */
#include <ndb_global.h>
#include <NDBT_Thread.hpp>
#include <NdbApi.hpp>
NDBT_Thread::NDBT_Thread()
{
create(0, -1);
}
NDBT_Thread::NDBT_Thread(NDBT_ThreadSet* thread_set, int thread_no)
{
create(thread_set, thread_no);
}
void
NDBT_Thread::create(NDBT_ThreadSet* thread_set, int thread_no)
{
m_magic = NDBT_Thread::Magic;
m_state = Wait;
m_thread_set = thread_set;
m_thread_no = thread_no;
m_func = 0;
m_input = 0;
m_output = 0;
m_ndb = 0;
m_err = 0;
m_mutex = NdbMutex_Create();
assert(m_mutex != 0);
m_cond = NdbCondition_Create();
assert(m_cond != 0);
char buf[20];
sprintf(buf, "NDBT_%04u");
const char* name = strdup(buf);
assert(name != 0);
unsigned stacksize = 512 * 1024;
NDB_THREAD_PRIO prio = NDB_THREAD_PRIO_LOW;
m_thread = NdbThread_Create(NDBT_Thread_run,
(void**)this, stacksize, name, prio);
assert(m_thread != 0);
}
NDBT_Thread::~NDBT_Thread()
{
if (m_thread != 0) {
NdbThread_Destroy(&m_thread);
m_thread = 0;
}
if (m_cond != 0) {
NdbCondition_Destroy(m_cond);
m_cond = 0;
}
if (m_mutex != 0) {
NdbMutex_Destroy(m_mutex);
m_mutex = 0;
}
}
static void*
NDBT_Thread_run(void* arg)
{
assert(arg != 0);
NDBT_Thread& thr = *(NDBT_Thread*)arg;
assert(thr.m_magic == NDBT_Thread::Magic);
thr.run();
return 0;
}
void
NDBT_Thread::run()
{
while (1) {
lock();
while (m_state != Start && m_state != Exit) {
wait();
}
if (m_state == Exit) {
unlock();
break;
}
(*m_func)(*this);
m_state = Stop;
signal();
unlock();
}
}
// methods for main process
void
NDBT_Thread::start()
{
lock();
m_state = Start;
signal();
unlock();
}
void
NDBT_Thread::stop()
{
lock();
while (m_state != Stop)
wait();
m_state = Wait;
unlock();
}
void
NDBT_Thread::exit()
{
lock();
m_state = Exit;
signal();
unlock();
};
void
NDBT_Thread::join()
{
NdbThread_WaitFor(m_thread, &m_status);
m_thread = 0;
}
int
NDBT_Thread::connect(class Ndb_cluster_connection* ncc, const char* db)
{
m_ndb = new Ndb(ncc, db);
if (m_ndb->init() == -1 ||
m_ndb->waitUntilReady() == -1) {
m_err = m_ndb->getNdbError().code;
return -1;
}
return 0;
}
void
NDBT_Thread::disconnect()
{
delete m_ndb;
m_ndb = 0;
}
// set of threads
NDBT_ThreadSet::NDBT_ThreadSet(int count)
{
m_count = count;
m_thread = new NDBT_Thread* [count];
for (int n = 0; n < count; n++) {
m_thread[n] = new NDBT_Thread(this, n);
}
}
NDBT_ThreadSet::~NDBT_ThreadSet()
{
delete_output();
for (int n = 0; n < m_count; n++) {
delete m_thread[n];
m_thread[n] = 0;
}
delete [] m_thread;
}
void
NDBT_ThreadSet::start()
{
for (int n = 0; n < m_count; n++) {
NDBT_Thread& thr = *m_thread[n];
thr.start();
}
}
void
NDBT_ThreadSet::stop()
{
for (int n = 0; n < m_count; n++) {
NDBT_Thread& thr = *m_thread[n];
thr.stop();
}
}
void
NDBT_ThreadSet::exit()
{
for (int n = 0; n < m_count; n++) {
NDBT_Thread& thr = *m_thread[n];
thr.exit();
}
}
void
NDBT_ThreadSet::join()
{
for (int n = 0; n < m_count; n++) {
NDBT_Thread& thr = *m_thread[n];
thr.join();
}
}
void
NDBT_ThreadSet::set_func(NDBT_ThreadFunc* func)
{
for (int n = 0; n < m_count; n++) {
NDBT_Thread& thr = *m_thread[n];
thr.set_func(func);
}
}
void
NDBT_ThreadSet::set_input(const void* input)
{
for (int n = 0; n < m_count; n++) {
NDBT_Thread& thr = *m_thread[n];
thr.set_input(input);
}
}
void
NDBT_ThreadSet::delete_output()
{
for (int n = 0; n < m_count; n++) {
if (m_thread[n] != 0) {
NDBT_Thread& thr = *m_thread[n];
thr.delete_output();
}
}
}
int
NDBT_ThreadSet::connect(class Ndb_cluster_connection* ncc, const char* db)
{
for (int n = 0; n < m_count; n++) {
assert(m_thread[n] != 0);
NDBT_Thread& thr = *m_thread[n];
if (thr.connect(ncc, db) == -1)
return -1;
}
return 0;
}
void
NDBT_ThreadSet::disconnect()
{
for (int n = 0; n < m_count; n++) {
if (m_thread[n] != 0) {
NDBT_Thread& thr = *m_thread[n];
thr.disconnect();
}
}
}
int
NDBT_ThreadSet::get_err() const
{
for (int n = 0; n < m_count; n++) {
if (m_thread[n] != 0) {
NDBT_Thread& thr = *m_thread[n];
int err = thr.get_err();
if (err != 0)
return err;
}
}
return 0;
}
--- 1.11/storage/ndb/test/src/Makefile.am 2006-12-31 02:06:43 +02:00
+++ 1.12/storage/ndb/test/src/Makefile.am 2007-07-14 11:48:06 +03:00
@@ -24,7 +24,7 @@
NdbRestarter.cpp NdbRestarts.cpp NDBT_Output.cpp \
NdbBackup.cpp NdbConfig.cpp NdbGrep.cpp NDBT_Table.cpp \
NdbSchemaCon.cpp NdbSchemaOp.cpp getarg.c \
- CpcClient.cpp
+ CpcClient.cpp NDBT_Thread.cpp
INCLUDES_LOC = -I$(top_srcdir)/storage/ndb/src/common/mgmcommon
-I$(top_srcdir)/storage/ndb/include/mgmcommon -I$(top_srcdir)/storage/ndb/include/kernel
-I$(top_srcdir)/storage/ndb/src/mgmapi
| Thread |
|---|
| • bk commit into 5.1 tree (pekka:1.2508) | pekka | 14 Jul |