List:Commits« Previous MessageNext Message »
From:pekka Date:July 14 2007 10:49am
Subject:bk commit into 5.1 tree (pekka:1.2508)
View as plain text  
Below is the list of changes that have just been committed into a local
5.1 repository of pekka. When pekka does a push these changes will
be propagated to the main repository and, within 24 hours after the
push, to the public repository.
For information on how to access the public repository
see http://dev.mysql.com/doc/mysql/en/installing-source-tree.html

ChangeSet@stripped, 2007-07-14 11:48:51+03:00, pekka@stripped +8 -0
  ndb - threads and timings to hugo* pk ops

  storage/ndb/test/include/HugoTransactions.hpp@stripped, 2007-07-14 11:48:06+03:00,
pekka@stripped +15 -1
    threads and timings to hugo* pk ops

  storage/ndb/test/include/NDBT_Thread.hpp@stripped, 2007-07-14 11:48:06+03:00,
pekka@stripped +226 -0
    threads and timings to hugo* pk ops

  storage/ndb/test/include/NDBT_Thread.hpp@stripped, 2007-07-14 11:24:21+03:00,
pekka@stripped +0 -0
    BitKeeper file
/export/space/pekka/ndb/version/my51-dd/storage/ndb/test/include/NDBT_Thread.hpp

  storage/ndb/test/include/NDBT_Thread.hpp@stripped, 2007-07-14 11:24:20+03:00,
pekka@stripped +0 -0

  storage/ndb/test/src/HugoTransactions.cpp@stripped, 2007-07-14 11:48:06+03:00,
pekka@stripped +74 -4
    threads and timings to hugo* pk ops

  storage/ndb/test/src/Makefile.am@stripped, 2007-07-14 11:48:06+03:00,
pekka@stripped +1 -1
    threads and timings to hugo* pk ops

  storage/ndb/test/src/NDBT_Thread.cpp@stripped, 2007-07-14 11:48:06+03:00,
pekka@stripped +283 -0
    threads and timings to hugo* pk ops

  storage/ndb/test/src/NDBT_Thread.cpp@stripped, 2007-07-14 11:24:21+03:00,
pekka@stripped +0 -0
    BitKeeper file
/export/space/pekka/ndb/version/my51-dd/storage/ndb/test/src/NDBT_Thread.cpp

  storage/ndb/test/src/NDBT_Thread.cpp@stripped, 2007-07-14 11:24:21+03:00,
pekka@stripped +0 -0

  storage/ndb/test/tools/hugoPkDelete.cpp@stripped, 2007-07-14 11:48:06+03:00,
pekka@stripped +89 -5
    threads and timings to hugo* pk ops

  storage/ndb/test/tools/hugoPkRead.cpp@stripped, 2007-07-14 11:48:06+03:00,
pekka@stripped +85 -4
    threads and timings to hugo* pk ops

  storage/ndb/test/tools/hugoPkUpdate.cpp@stripped, 2007-07-14 11:48:07+03:00,
pekka@stripped +91 -7
    threads and timings to hugo* pk ops

# This is a BitKeeper patch.  What follows are the unified diffs for the
# set of deltas contained in the patch.  The rest of the patch, the part
# that BitKeeper cares about, is below these diffs.
# User:	pekka
# Host:	clam.(none)
# Root:	/export/space/pekka/ndb/version/my51-dd

--- 1.14/storage/ndb/test/include/HugoTransactions.hpp	2006-12-23 21:20:23 +02:00
+++ 1.15/storage/ndb/test/include/HugoTransactions.hpp	2007-07-14 11:48:06 +03:00
@@ -20,7 +20,7 @@
 #include <NDBT.hpp>
 #include <HugoCalculator.hpp>
 #include <HugoOperations.hpp>
-
+class NDBT_Stats;
 
 class HugoTransactions : public HugoOperations {
 public:
@@ -109,10 +109,24 @@
   void setRetryMax(int retryMax = 100) { m_retryMax = retryMax; }
   
   Uint32 m_latest_gci;
+
+  void setStatsLatency(NDBT_Stats* stats) { m_stats_latency = stats; }
+
+  // allows multiple threads to update separate batches
+  void setThrInfo(int thr_count, int thr_no) {
+    m_thr_count = thr_count;
+    m_thr_no = thr_no;
+  }
+
 protected:  
   NDBT_ResultRow row;
   int m_defaultScanUpdateMethod;
   int m_retryMax;
+
+  NDBT_Stats* m_stats_latency;
+
+  int m_thr_count;      // 0 if no separation between threads
+  int m_thr_no;
 };
 
 

--- 1.32/storage/ndb/test/src/HugoTransactions.cpp	2007-04-12 12:43:54 +03:00
+++ 1.33/storage/ndb/test/src/HugoTransactions.cpp	2007-07-14 11:48:06 +03:00
@@ -14,8 +14,9 @@
    Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA */
 
 #include "HugoTransactions.hpp"
+#include <NDBT_Stats.hpp>
 #include <NdbSleep.h>
-
+#include <NdbTick.h>
 
 HugoTransactions::HugoTransactions(const NdbDictionary::Table& _tab,
 				   const NdbDictionary::Index* idx):
@@ -24,6 +25,10 @@
 
   m_defaultScanUpdateMethod = 3;
   setRetryMax();
+  m_stats_latency = 0;
+
+  m_thr_count = 0;
+  m_thr_no = -1;
 }
 
 HugoTransactions::~HugoTransactions(){
@@ -820,6 +825,16 @@
       return NDBT_FAILED;
     }
 
+    MicroSecondTimer timer_start;
+    MicroSecondTimer timer_stop;
+    bool timer_active =
+      m_stats_latency != 0 &&
+      r >= batch &&             // first batch is "warmup"
+      r + batch != records;     // last batch is usually partial
+
+    if (timer_active)
+      NdbTick_getMicroTimer(&timer_start);
+
     if(pkReadRecord(pNdb, r, batch, lm) != NDBT_OK)
     {
       ERR(pTrans->getNdbError());
@@ -892,6 +907,12 @@
     }
     
     closeTransaction(pNdb);
+
+    if (timer_active) {
+      NdbTick_getMicroTimer(&timer_stop);
+      NDB_TICKS ticks = NdbTick_getMicrosPassed(timer_start, timer_stop);
+      m_stats_latency->addObservation((double)ticks);
+    }
   }
   deallocRows();
   g_info << reads << " records read" << endl;
@@ -913,9 +934,17 @@
   allocRows(batch);
 
   g_info << "|- Updating records (batch=" << batch << ")..." <<
endl;
+  int batch_no = 0;
   while (r < records){
     if(r + batch > records)
       batch = records - r;
+
+    if (m_thr_count != 0 && m_thr_no != batch_no % m_thr_count)
+    {
+      r += batch;
+      batch_no++;
+      continue;
+    }
     
     if (retryAttempt >= m_retryMax){
       g_info << "ERROR: has retried this operation " << retryAttempt 
@@ -963,6 +992,16 @@
       return NDBT_FAILED;
     }
 
+    MicroSecondTimer timer_start;
+    MicroSecondTimer timer_stop;
+    bool timer_active =
+      m_stats_latency != 0 &&
+      r >= batch &&             // first batch is "warmup"
+      r + batch != records;     // last batch is usually partial
+
+    if (timer_active)
+      NdbTick_getMicroTimer(&timer_start);
+
     if(pIndexScanOp)
     {
       int rows_found = 0;
@@ -1039,8 +1078,15 @@
     }
     
     closeTransaction(pNdb);
-    
+
+    if (timer_active) {
+      NdbTick_getMicroTimer(&timer_stop);
+      NDB_TICKS ticks = NdbTick_getMicrosPassed(timer_start, timer_stop);
+      m_stats_latency->addObservation((double)ticks);
+    }
+
     r += batch; // Read next record
+    batch_no++;
   }
   
   deallocRows();
@@ -1228,10 +1274,18 @@
   int                  check;
 
   g_info << "|- Deleting records..." << endl;
+  int batch_no = 0;
   while (r < records){
     if(r + batch > records)
       batch = records - r;
 
+    if (m_thr_count != 0 && m_thr_no != batch_no % m_thr_count)
+    {
+      r += batch;
+      batch_no++;
+      continue;
+    }
+
     if (retryAttempt >= m_retryMax){
       g_info << "ERROR: has retried this operation " << retryAttempt 
 	     << " times, failing!" << endl;
@@ -1255,6 +1309,16 @@
       return NDBT_FAILED;
     }
 
+    MicroSecondTimer timer_start;
+    MicroSecondTimer timer_stop;
+    bool timer_active =
+      m_stats_latency != 0 &&
+      r >= batch &&             // first batch is "warmup"
+      r + batch != records;     // last batch is usually partial
+
+    if (timer_active)
+      NdbTick_getMicroTimer(&timer_start);
+
     if(pkDeleteRecord(pNdb, r, batch) != NDBT_OK)
     {
       ERR(pTrans->getNdbError());
@@ -1303,9 +1367,15 @@
       m_latest_gci = pTrans->getGCI();
     }
     closeTransaction(pNdb);
-    
-    r += batch; // Read next record
 
+    if (timer_active) {
+      NdbTick_getMicroTimer(&timer_stop);
+      NDB_TICKS ticks = NdbTick_getMicrosPassed(timer_start, timer_stop);
+      m_stats_latency->addObservation((double)ticks);
+    }
+
+    r += batch; // Read next record
+    batch_no++;
   }
 
   g_info << "|- " << deleted << " records deleted" << endl;

--- 1.10/storage/ndb/test/tools/hugoPkDelete.cpp	2006-12-23 21:20:31 +02:00
+++ 1.11/storage/ndb/test/tools/hugoPkDelete.cpp	2007-07-14 11:48:06 +03:00
@@ -20,22 +20,41 @@
 #include <NdbApi.hpp>
 #include <NdbMain.h>
 #include <NDBT.hpp> 
+#include <NDBT_Thread.hpp>
+#include <NDBT_Stats.hpp>
 #include <NdbSleep.h>
 #include <getarg.h>
 
 #include <HugoTransactions.hpp>
 
+static NDBT_ThreadFunc hugoPkDelete;
+
+struct ThrInput {
+  const NdbDictionary::Table* pTab;
+  int records;
+  int batch;
+  int stats;
+};
+
+struct ThrOutput {
+  NDBT_Stats latency;
+};
+
 int main(int argc, const char** argv){
   ndb_init();
 
   int _records = 0;
   int _loops = 1;
-  int _batch = 0;
+  int _threads = 1;
+  int _stats = 0;
+  int _batch = 1;
   const char* _tabname = NULL;
   int _help = 0;
   
   struct getargs args[] = {
     { "loops", 'l', arg_integer, &_loops, "number of times to run this
program(0=infinite loop)", "loops" },
+    { "threads", 't', arg_integer, &_threads, "number of threads (default 1)",
"threads" },
+    { "stats", 's', arg_flag, &_stats, "report latency per batch", "stats" },
     //    { "batch", 'b', arg_integer, &_batch, "batch value", "batch" },
     { "records", 'r', arg_integer, &_records, "Number of records", "records" },
     { "usage", '?', arg_flag, &_help, "Print help", "" }
@@ -81,12 +100,57 @@
     return NDBT_ProgramExit(NDBT_WRONGARGS);
   }
 
-  HugoTransactions hugoTrans(*pTab);
+  // threads
+  NDBT_ThreadSet ths(_threads);
+
+  // create Ndb object for each thread
+  if (ths.connect(&con, "TEST_DB") == -1) {
+    ndbout << "connect failed: err=" << ths.get_err() << endl;
+    return NDBT_ProgramExit(NDBT_FAILED);
+  }
+
+  // input is options
+  ThrInput input;
+  ths.set_input(&input);
+  input.pTab = pTab;
+  input.records = _records;
+  input.batch = _batch;
+  input.stats = _stats;
+
+  // output is stats
+  ThrOutput output;
+  ths.set_output<ThrOutput>();
+
   int i = 0;
-  while (i<_loops || _loops==0) {
+  while (i < _loops || _loops == 0) {
     ndbout << i << ": ";
-    if (hugoTrans.pkDelRecords(&MyNdb, _records) != 0){
-      return NDBT_ProgramExit(NDBT_FAILED);
+
+    ths.set_func(hugoPkDelete);
+    ths.start();
+    ths.stop();
+
+    if (ths.get_err())
+      NDBT_ProgramExit(NDBT_FAILED);
+
+    if (_stats) {
+      NDBT_Stats latency;
+
+      // add stats from each thread
+      int n;
+      for (n = 0; n < ths.get_count(); n++) {
+        NDBT_Thread& thr = ths.get_thread(n);
+        ThrOutput* output = (ThrOutput*)thr.get_output();
+        latency += output->latency;
+      }
+
+      ndbout
+        << "latency per batch (us): "
+        << " samples=" << latency.getCount()
+        << " min=" << (int)latency.getMin()
+        << " max=" << (int)latency.getMax()
+        << " mean=" << (int)latency.getMean()
+        << " stddev=" << (int)latency.getStddev()
+        << endl;
     }
     i++;
   }
@@ -94,3 +158,23 @@
   return NDBT_ProgramExit(NDBT_OK);
 }
 
+static void hugoPkDelete(NDBT_Thread& thr)
+{
+  const ThrInput* input = (const ThrInput*)thr.get_input();
+  ThrOutput* output = (ThrOutput*)thr.get_output();
+
+  HugoTransactions hugoTrans(*input->pTab);
+  output->latency.reset();
+  if (input->stats)
+    hugoTrans.setStatsLatency(&output->latency);
+
+  NDBT_ThreadSet& ths = thr.get_thread_set();
+  hugoTrans.setThrInfo(ths.get_count(), thr.get_thread_no());
+
+  int ret;
+  ret = hugoTrans.pkDelRecords(thr.get_ndb(),
+                               input->records,
+                               input->batch);
+  if (ret != 0)
+    thr.set_err(ret);
+}

--- 1.8/storage/ndb/test/tools/hugoPkRead.cpp	2006-12-23 21:20:31 +02:00
+++ 1.9/storage/ndb/test/tools/hugoPkRead.cpp	2007-07-14 11:48:06 +03:00
@@ -20,17 +20,33 @@
 #include <NdbApi.hpp>
 #include <NdbMain.h>
 #include <NDBT.hpp> 
+#include <NDBT_Thread.hpp>
+#include <NDBT_Stats.hpp>
 #include <NdbSleep.h>
 #include <getarg.h>
 
 #include <HugoTransactions.hpp>
 
+static NDBT_ThreadFunc hugoPkRead;
+
+struct ThrInput {
+  const NdbDictionary::Table* pTab;
+  int records;
+  int batch;
+  int stats;
+};
+
+struct ThrOutput {
+  NDBT_Stats latency;
+};
 
 int main(int argc, const char** argv){
   ndb_init();
 
   int _records = 0;
   int _loops = 1;
+  int _threads = 1;
+  int _stats = 0;
   int _abort = 0;
   int _batch = 1;
   const char* _tabname = NULL;
@@ -39,6 +55,8 @@
   struct getargs args[] = {
     { "aborts", 'a', arg_integer, &_abort, "percent of transactions that are
aborted", "abort%" },
     { "loops", 'l', arg_integer, &_loops, "number of times to run this
program(0=infinite loop)", "loops" },
+    { "threads", 't', arg_integer, &_threads, "number of threads (default 1)",
"threads" },
+    { "stats", 's', arg_flag, &_stats, "report latency per batch", "stats" },
     { "batch", 'b', arg_integer, &_batch, "batch value(not 0)", "batch" },
     { "records", 'r', arg_integer, &_records, "Number of records", "records" },
     { "usage", '?', arg_flag, &_help, "Print help", "" }
@@ -64,6 +82,7 @@
   {
     return NDBT_ProgramExit(NDBT_FAILED);
   }
+
   Ndb MyNdb(&con, "TEST_DB" );
 
   if(MyNdb.init() != 0){
@@ -81,12 +100,57 @@
     return NDBT_ProgramExit(NDBT_WRONGARGS);
   }
 
-  HugoTransactions hugoTrans(*pTab);
+  // threads
+  NDBT_ThreadSet ths(_threads);
+
+  // create Ndb object for each thread
+  if (ths.connect(&con, "TEST_DB") == -1) {
+    ndbout << "connect failed: err=" << ths.get_err() << endl;
+    return NDBT_ProgramExit(NDBT_FAILED);
+  }
+
+  // input is options
+  ThrInput input;
+  ths.set_input(&input);
+  input.pTab = pTab;
+  input.records = _records;
+  input.batch = _batch;
+  input.stats = _stats;
+
+  // output is stats
+  ThrOutput output;
+  ths.set_output<ThrOutput>();
+
   int i = 0;
-  while (i<_loops || _loops==0) {
+  while (i < _loops || _loops == 0) {
     ndbout << i << ": ";
-    if (hugoTrans.pkReadRecords(&MyNdb, _records, _batch) != 0){
-      return NDBT_ProgramExit(NDBT_FAILED);
+
+    ths.set_func(hugoPkRead);
+    ths.start();
+    ths.stop();
+
+    if (ths.get_err())
+      NDBT_ProgramExit(NDBT_FAILED);
+
+    if (_stats) {
+      NDBT_Stats latency;
+
+      // add stats from each thread
+      int n;
+      for (n = 0; n < ths.get_count(); n++) {
+        NDBT_Thread& thr = ths.get_thread(n);
+        ThrOutput* output = (ThrOutput*)thr.get_output();
+        latency += output->latency;
+      }
+
+      ndbout
+        << "latency per batch (us): "
+        << " samples=" << latency.getCount()
+        << " min=" << (int)latency.getMin()
+        << " max=" << (int)latency.getMax()
+        << " mean=" << (int)latency.getMean()
+        << " stddev=" << (int)latency.getStddev()
+        << endl;
     }
     i++;
   }
@@ -94,3 +158,20 @@
   return NDBT_ProgramExit(NDBT_OK);
 }
 
+static void hugoPkRead(NDBT_Thread& thr)
+{
+  const ThrInput* input = (const ThrInput*)thr.get_input();
+  ThrOutput* output = (ThrOutput*)thr.get_output();
+
+  HugoTransactions hugoTrans(*input->pTab);
+  output->latency.reset();
+  if (input->stats)
+    hugoTrans.setStatsLatency(&output->latency);
+
+  int ret;
+  ret = hugoTrans.pkReadRecords(thr.get_ndb(),
+                                input->records,
+                                input->batch);
+  if (ret != 0)
+    thr.set_err(ret);
+}

--- 1.11/storage/ndb/test/tools/hugoPkUpdate.cpp	2006-12-23 21:20:31 +02:00
+++ 1.12/storage/ndb/test/tools/hugoPkUpdate.cpp	2007-07-14 11:48:07 +03:00
@@ -20,24 +20,43 @@
 #include <NdbApi.hpp>
 #include <NdbMain.h>
 #include <NDBT.hpp> 
+#include <NDBT_Thread.hpp>
+#include <NDBT_Stats.hpp>
 #include <NdbSleep.h>
 #include <getarg.h>
 
 #include <HugoTransactions.hpp>
 
+static NDBT_ThreadFunc hugoPkUpdate;
+
+struct ThrInput {
+  const NdbDictionary::Table* pTab;
+  int records;
+  int batch;
+  int stats;
+};
+
+struct ThrOutput {
+  NDBT_Stats latency;
+};
+
 int main(int argc, const char** argv){
   ndb_init();
 
   int _records = 0;
   int _loops = 1;
+  int _threads = 1;
+  int _stats = 0;
   int _abort = 0;
-  int _batch = 0;
+  int _batch = 1;
   const char* _tabname = NULL, *db = 0;
   int _help = 0;
 
   struct getargs args[] = {
     { "aborts", 'a', arg_integer, &_abort, "percent of transactions that are
aborted", "abort%" },
     { "loops", 'l', arg_integer, &_loops, "number of times to run this
program(0=infinite loop)", "loops" },
+    { "threads", 't', arg_integer, &_threads, "number of threads (default 1)",
"threads" },
+    { "stats", 's', arg_flag, &_stats, "report latency per batch", "stats" },
     //    { "batch", 'b', arg_integer, &_batch, "batch value", "batch" },
     { "records", 'r', arg_integer, &_records, "Number of records", "records" },
     { "usage", '?', arg_flag, &_help, "Print help", "" },
@@ -83,16 +102,81 @@
     return NDBT_ProgramExit(NDBT_WRONGARGS);
   }
 
-  HugoTransactions hugoTrans(*pTab);
+  // threads
+  NDBT_ThreadSet ths(_threads);
+
+  // create Ndb object for each thread
+  if (ths.connect(&con, "TEST_DB") == -1) {
+    ndbout << "connect failed: err=" << ths.get_err() << endl;
+    return NDBT_ProgramExit(NDBT_FAILED);
+  }
+
+  // input is options
+  ThrInput input;
+  ths.set_input(&input);
+  input.pTab = pTab;
+  input.records = _records;
+  input.batch = _batch;
+  input.stats = _stats;
+
+  // output is stats
+  ThrOutput output;
+  ths.set_output<ThrOutput>();
+
   int i = 0;
-  while (i<_loops || _loops==0) {
-    ndbout << "loop " << i << ": ";
-    if (hugoTrans.pkUpdateRecords(&MyNdb, 
-				  _records) != 0){
-      return NDBT_ProgramExit(NDBT_FAILED);
+  while (i < _loops || _loops == 0) {
+    ndbout << i << ": ";
+
+    ths.set_func(hugoPkUpdate);
+    ths.start();
+    ths.stop();
+
+    if (ths.get_err())
+      NDBT_ProgramExit(NDBT_FAILED);
+
+    if (_stats) {
+      NDBT_Stats latency;
+
+      // add stats from each thread
+      int n;
+      for (n = 0; n < ths.get_count(); n++) {
+        NDBT_Thread& thr = ths.get_thread(n);
+        ThrOutput* output = (ThrOutput*)thr.get_output();
+        latency += output->latency;
+      }
+
+      ndbout
+        << "latency per batch (us): "
+        << " samples=" << latency.getCount()
+        << " min=" << (int)latency.getMin()
+        << " max=" << (int)latency.getMax()
+        << " mean=" << (int)latency.getMean()
+        << " stddev=" << (int)latency.getStddev()
+        << endl;
     }
     i++;
   }
 
   return NDBT_ProgramExit(NDBT_OK);
+}
+
+static void hugoPkUpdate(NDBT_Thread& thr)
+{
+  const ThrInput* input = (const ThrInput*)thr.get_input();
+  ThrOutput* output = (ThrOutput*)thr.get_output();
+
+  HugoTransactions hugoTrans(*input->pTab);
+  output->latency.reset();
+  if (input->stats)
+    hugoTrans.setStatsLatency(&output->latency);
+
+  NDBT_ThreadSet& ths = thr.get_thread_set();
+  hugoTrans.setThrInfo(ths.get_count(), thr.get_thread_no());
+
+  int ret;
+  ret = hugoTrans.pkUpdateRecords(thr.get_ndb(),
+                                  input->records,
+                                  input->batch);
+  if (ret != 0)
+    thr.set_err(ret);
 }
--- New file ---
+++ storage/ndb/test/include/NDBT_Thread.hpp	07/07/14 11:24:20
/* Copyright (C) 2003 MySQL AB

   This program is free software; you can redistribute it and/or modify
   it under the terms of the GNU General Public License as published by
   the Free Software Foundation; version 2 of the License.

   This program is distributed in the hope that it will be useful,
   but WITHOUT ANY WARRANTY; without even the implied warranty of
   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
   GNU General Public License for more details.

   You should have received a copy of the GNU General Public License
   along with this program; if not, write to the Free Software
   Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA */

#ifndef NDB_THREAD_HPP
#define NDB_THREAD_HPP

#include <NdbMutex.h>
#include <NdbCondition.h>
#include <NdbThread.h>

// NDBT_Thread ctor -> NDBT_Thread_run -> thr.run()
extern "C" {
static void* NDBT_Thread_run(void* arg);
}

// Function to run in a thread.

typedef void NDBT_ThreadFunc(class NDBT_Thread&);

/*
 * NDBT_Thread
 *
 * Represents a thread.  The thread pauses at startup.
 * Main process sets a function to run.  When the function
 * returns, the thread pauses again to wait for a command.
 * This allows main process to sync with the thread and
 * exchange data with it.
 *
 * Input to thread is typically options.  The input area
 * is read-only in the thread.  Output from thread is
 * results such as statistics.  Error code is handled
 * separately.
 *
 * Pointer to Ndb object and method to create it are
 * provided for convenience.
 */

class NDBT_ThreadSet;

class NDBT_Thread {
public:
  NDBT_Thread();
  NDBT_Thread(NDBT_ThreadSet* thread_set, int thread_no);
  void create(NDBT_ThreadSet* thread_set, int thread_no);
  ~NDBT_Thread();

  // if part of a set
  inline NDBT_ThreadSet& get_thread_set() const {
    assert(m_thread_set != 0);
    return *m_thread_set;
  }
  inline int get_thread_no() const {
    return m_thread_no;
  }

  // { Wait -> Start -> Stop }+ -> Exit
  enum State {
    Wait = 1,   // wait for command
    Start,      // run current function
    Stop,       // stopped (paused) when current function done
    Exit        // exit thread
  };

  // tell thread to start running current function
  void start();
  // wait for thread to stop when function is done
  void stop();
  // tell thread to exit
  void exit();
  // collect thread after exit
  void join();

  // set function to run
  inline void set_func(NDBT_ThreadFunc* func) {
    m_func = func;
  }

  // input area
  inline void set_input(const void* input) {
    m_input = input;
  }
  inline const void* get_input() const {
    return m_input;
  }

  // output area
  inline void set_output(void* output) {
    m_output = output;
  }
  inline void* get_output() const {
    return m_output;
  }
  template <class T> inline void set_output() {
    set_output(new T);
  }
  inline void delete_output() {
    delete m_output;
    m_output = 0;
  }

  // thread-specific Ndb object
  inline class Ndb* get_ndb() const {
    return m_ndb;
  }
  int connect(class Ndb_cluster_connection*, const char* db = "TEST_DB");
  void disconnect();

  // error code (OS, Ndb, other)
  void clear_err() {
    m_err = 0;
  }
  void set_err(int err) {
    m_err = err;
  }
  int get_err() const {
    return m_err;
  }

private:
  friend class NDBT_ThreadSet;
  friend void* NDBT_Thread_run(void* arg);

  enum { Magic = 0xabacadae };
  Uint32 m_magic;

  State m_state;
  NDBT_ThreadSet* m_thread_set;
  int m_thread_no;

  NDBT_ThreadFunc* m_func;
  const void* m_input;
  void* m_output;
  class Ndb* m_ndb;
  int m_err;

  // run the thread
  void run();

  void lock() {
    NdbMutex_Lock(m_mutex);
  }
  void unlock() {
    NdbMutex_Unlock(m_mutex);
  }

  void wait() {
    NdbCondition_Wait(m_cond, m_mutex);
  }
  void signal() {
    NdbCondition_Signal(m_cond);
  }

  NdbMutex* m_mutex;
  NdbCondition* m_cond;
  NdbThread* m_thread;
  void* m_status;
};

/*
 * A set of threads, indexed from 0 to count-1.  Methods
 * are applied to each thread (serially).  Input area is
 * common to all threads.  Output areas are allocated
 * separately according to a template class.
 */

class NDBT_ThreadSet {
public:
  NDBT_ThreadSet(int count);
  ~NDBT_ThreadSet();

  inline int get_count() const {
    return m_count;
  }
  inline NDBT_Thread& get_thread(int n) {
    assert(n < m_count && m_thread[n] != 0);
    return *m_thread[n];
  }

  // tell each thread to start running
  void start();
  // wait for each thread to stop
  void stop();
  // tell each thread to exit
  void exit();
  // collect each thread after exit
  void join();

  // set function to run in each thread
  void set_func(NDBT_ThreadFunc* func);

  // set input area (same instance in each thread)
  void set_input(const void* input);

  // set output areas
  template <class T> inline void set_output() {
    for (int n = 0; n < m_count; n++) {
      NDBT_Thread& thr = *m_thread[n];
      thr.set_output<T>();
    }
  }
  void delete_output();

  // thread-specific Ndb objects
  int connect(class Ndb_cluster_connection*, const char* db = "TEST_DB");
  void disconnect();

  int get_err() const;

private:
  int m_count;
  NDBT_Thread** m_thread;
};

#endif

--- New file ---
+++ storage/ndb/test/src/NDBT_Thread.cpp	07/07/14 11:24:21
/* Copyright (C) 2003 MySQL AB

   This program is free software; you can redistribute it and/or modify
   it under the terms of the GNU General Public License as published by
   the Free Software Foundation; version 2 of the License.

   This program is distributed in the hope that it will be useful,
   but WITHOUT ANY WARRANTY; without even the implied warranty of
   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
   GNU General Public License for more details.

   You should have received a copy of the GNU General Public License
   along with this program; if not, write to the Free Software
   Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA */

#include <ndb_global.h>
#include <NDBT_Thread.hpp>
#include <NdbApi.hpp>

NDBT_Thread::NDBT_Thread()
{
  create(0, -1);
}

NDBT_Thread::NDBT_Thread(NDBT_ThreadSet* thread_set, int thread_no)
{
  create(thread_set, thread_no);
}

void
NDBT_Thread::create(NDBT_ThreadSet* thread_set, int thread_no)
{
  m_magic = NDBT_Thread::Magic;

  m_state = Wait;
  m_thread_set = thread_set;
  m_thread_no = thread_no;
  m_func = 0;
  m_input = 0;
  m_output = 0;
  m_ndb = 0;
  m_err = 0;

  m_mutex = NdbMutex_Create();
  assert(m_mutex != 0);
  m_cond = NdbCondition_Create();
  assert(m_cond != 0);

  char buf[20];
  sprintf(buf, "NDBT_%04u");
  const char* name = strdup(buf);
  assert(name != 0);

  unsigned stacksize = 512 * 1024;
  NDB_THREAD_PRIO prio = NDB_THREAD_PRIO_LOW;
  m_thread = NdbThread_Create(NDBT_Thread_run,
                              (void**)this, stacksize, name, prio);
  assert(m_thread != 0);
}

NDBT_Thread::~NDBT_Thread()
{
  if (m_thread != 0) {
    NdbThread_Destroy(&m_thread);
    m_thread = 0;
  }
  if (m_cond != 0) {
    NdbCondition_Destroy(m_cond);
    m_cond = 0;
  }
  if (m_mutex != 0) {
    NdbMutex_Destroy(m_mutex);
    m_mutex = 0;
  }
}

static void*
NDBT_Thread_run(void* arg)
{
  assert(arg != 0);
  NDBT_Thread& thr = *(NDBT_Thread*)arg;
  assert(thr.m_magic == NDBT_Thread::Magic);
  thr.run();
  return 0;
}

void
NDBT_Thread::run()
{
  while (1) {
    lock();
    while (m_state != Start && m_state != Exit) {
      wait();
    }
    if (m_state == Exit) {
      unlock();
      break;
    }
    (*m_func)(*this);
    m_state = Stop;
    signal();
    unlock();
  }
}

// methods for main process

void
NDBT_Thread::start()
{
  lock();
  m_state = Start;
  signal();
  unlock();
}

void
NDBT_Thread::stop()
{
  lock();
  while (m_state != Stop)
    wait();
  m_state = Wait;
  unlock();
}

void
NDBT_Thread::exit()
{
  lock();
  m_state = Exit;
  signal();
  unlock();
};

void
NDBT_Thread::join()
{
  NdbThread_WaitFor(m_thread, &m_status);
  m_thread = 0;
}

int
NDBT_Thread::connect(class Ndb_cluster_connection* ncc, const char* db)
{
  m_ndb = new Ndb(ncc, db);
  if (m_ndb->init() == -1 ||
      m_ndb->waitUntilReady() == -1) {
    m_err = m_ndb->getNdbError().code;
    return -1;
  }
  return 0;
}

void
NDBT_Thread::disconnect()
{
  delete m_ndb;
  m_ndb = 0;
}

// set of threads

NDBT_ThreadSet::NDBT_ThreadSet(int count)
{
  m_count = count;
  m_thread = new NDBT_Thread* [count];
  for (int n = 0; n < count; n++) {
    m_thread[n] = new NDBT_Thread(this, n);
  }
}

NDBT_ThreadSet::~NDBT_ThreadSet()
{
  delete_output();
  for (int n = 0; n < m_count; n++) {
    delete m_thread[n];
    m_thread[n] = 0;
  }
  delete [] m_thread;
}

void
NDBT_ThreadSet::start()
{
  for (int n = 0; n < m_count; n++) {
    NDBT_Thread& thr = *m_thread[n];
    thr.start();
  }
}

void
NDBT_ThreadSet::stop()
{
  for (int n = 0; n < m_count; n++) {
    NDBT_Thread& thr = *m_thread[n];
    thr.stop();
  }
}

void
NDBT_ThreadSet::exit()
{
  for (int n = 0; n < m_count; n++) {
    NDBT_Thread& thr = *m_thread[n];
    thr.exit();
  }
}

void
NDBT_ThreadSet::join()
{
  for (int n = 0; n < m_count; n++) {
    NDBT_Thread& thr = *m_thread[n];
    thr.join();
  }
}

void
NDBT_ThreadSet::set_func(NDBT_ThreadFunc* func)
{
  for (int n = 0; n < m_count; n++) {
    NDBT_Thread& thr = *m_thread[n];
    thr.set_func(func);
  }
}

void
NDBT_ThreadSet::set_input(const void* input)
{
  for (int n = 0; n < m_count; n++) {
    NDBT_Thread& thr = *m_thread[n];
    thr.set_input(input);
  }
}

void
NDBT_ThreadSet::delete_output()
{
  for (int n = 0; n < m_count; n++) {
    if (m_thread[n] != 0) {
      NDBT_Thread& thr = *m_thread[n];
      thr.delete_output();
    }
  }
}

int
NDBT_ThreadSet::connect(class Ndb_cluster_connection* ncc, const char* db)
{
  for (int n = 0; n < m_count; n++) {
    assert(m_thread[n] != 0);
    NDBT_Thread& thr = *m_thread[n];
    if (thr.connect(ncc, db) == -1)
      return -1;
  }
  return 0;
}

void
NDBT_ThreadSet::disconnect()
{
  for (int n = 0; n < m_count; n++) {
    if (m_thread[n] != 0) {
      NDBT_Thread& thr = *m_thread[n];
      thr.disconnect();
    }
  }
}

int
NDBT_ThreadSet::get_err() const
{
  for (int n = 0; n < m_count; n++) {
    if (m_thread[n] != 0) {
      NDBT_Thread& thr = *m_thread[n];
      int err = thr.get_err();
      if (err != 0)
        return err;
    }
  }
  return 0;
}


--- 1.11/storage/ndb/test/src/Makefile.am	2006-12-31 02:06:43 +02:00
+++ 1.12/storage/ndb/test/src/Makefile.am	2007-07-14 11:48:06 +03:00
@@ -24,7 +24,7 @@
 	NdbRestarter.cpp NdbRestarts.cpp NDBT_Output.cpp \
 	NdbBackup.cpp  NdbConfig.cpp NdbGrep.cpp NDBT_Table.cpp \
 	NdbSchemaCon.cpp NdbSchemaOp.cpp getarg.c \
-	CpcClient.cpp
+	CpcClient.cpp NDBT_Thread.cpp
 
 INCLUDES_LOC = -I$(top_srcdir)/storage/ndb/src/common/mgmcommon
-I$(top_srcdir)/storage/ndb/include/mgmcommon -I$(top_srcdir)/storage/ndb/include/kernel
-I$(top_srcdir)/storage/ndb/src/mgmapi
 
Thread
bk commit into 5.1 tree (pekka:1.2508)pekka14 Jul