Below is the list of changes that have just been committed into a local
5.1 repository of tomas. When tomas does a push these changes will
be propagated to the main repository and, within 24 hours after the
push, to the public repository.
For information on how to access the public repository
see http://dev.mysql.com/doc/mysql/en/installing-source-tree.html
ChangeSet@stripped, 2007-04-23 20:27:43+02:00, tomas@stripped +3 -0
new ndb tool to measure replication latency
BitKeeper/etc/ignore@stripped, 2007-04-23 20:27:41+02:00, tomas@stripped +13
-0
Added storage/ndb/test/tools/rep_latency storage/ndb/test/ndbapi/testIndexStat
storage/ndb/test/ndbapi/testInterpreter
storage/ndb/ndbapi-examples/mgmapi_logevent/mgmapi_logevent
storage/ndb/ndbapi-examples/mgmapi_logevent2/mgmapi_logevent2
storage/ndb/ndbapi-examples/ndbapi_async/ndbapi_async
storage/ndb/ndbapi-examples/ndbapi_async1/ndbapi_async1
storage/ndb/ndbapi-examples/ndbapi_event/ndbapi_event
storage/ndb/ndbapi-examples/ndbapi_retries/ndbapi_retries
storage/ndb/ndbapi-examples/ndbapi_scan/ndbapi_scan
storage/ndb/ndbapi-examples/ndbapi_simple/ndbapi_simple
storage/ndb/ndbapi-examples/ndbapi_simple_dual/ndbapi_simple_dual
storage/ndb/ndbapi-examples/ndbapi_simple_index/ndbapi_simple_index to the ignore list
storage/ndb/test/tools/Makefile.am@stripped, 2007-04-23 20:27:41+02:00,
tomas@stripped +2 -1
new ndb tool to measure replication latency
storage/ndb/test/tools/rep_latency.cpp@stripped, 2007-04-23 20:27:41+02:00,
tomas@stripped +304 -0
New BitKeeper file ``storage/ndb/test/tools/rep_latency.cpp''
storage/ndb/test/tools/rep_latency.cpp@stripped, 2007-04-23 20:27:41+02:00,
tomas@stripped +0 -0
# This is a BitKeeper patch. What follows are the unified diffs for the
# set of deltas contained in the patch. The rest of the patch, the part
# that BitKeeper cares about, is below these diffs.
# User: tomas
# Host: whalegate.ndb.mysql.com
# Root: /home/tomas/mysql-5.1-telco-gca
--- 1.270/BitKeeper/etc/ignore 2007-01-03 01:49:03 +01:00
+++ 1.271/BitKeeper/etc/ignore 2007-04-23 20:27:41 +02:00
@@ -2934,3 +2934,16 @@
win/vs8cache.txt
zlib/*.ds?
zlib/*.vcproj
+storage/ndb/test/tools/rep_latency
+storage/ndb/test/ndbapi/testIndexStat
+storage/ndb/test/ndbapi/testInterpreter
+storage/ndb/ndbapi-examples/mgmapi_logevent/mgmapi_logevent
+storage/ndb/ndbapi-examples/mgmapi_logevent2/mgmapi_logevent2
+storage/ndb/ndbapi-examples/ndbapi_async/ndbapi_async
+storage/ndb/ndbapi-examples/ndbapi_async1/ndbapi_async1
+storage/ndb/ndbapi-examples/ndbapi_event/ndbapi_event
+storage/ndb/ndbapi-examples/ndbapi_retries/ndbapi_retries
+storage/ndb/ndbapi-examples/ndbapi_scan/ndbapi_scan
+storage/ndb/ndbapi-examples/ndbapi_simple/ndbapi_simple
+storage/ndb/ndbapi-examples/ndbapi_simple_dual/ndbapi_simple_dual
+storage/ndb/ndbapi-examples/ndbapi_simple_index/ndbapi_simple_index
--- 1.15/storage/ndb/test/tools/Makefile.am 2006-12-31 01:06:43 +01:00
+++ 1.16/storage/ndb/test/tools/Makefile.am 2007-04-23 20:27:41 +02:00
@@ -13,7 +13,7 @@
# along with this program; if not, write to the Free Software
# Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA
-ndbtest_PROGRAMS = hugoLoad hugoFill hugoLockRecords hugoPkDelete hugoPkRead
hugoPkReadRecord hugoPkUpdate hugoScanRead hugoScanUpdate restart verify_index copy_tab
create_index ndb_cpcc listen_event
+ndbtest_PROGRAMS = hugoLoad hugoFill hugoLockRecords hugoPkDelete hugoPkRead
hugoPkReadRecord hugoPkUpdate hugoScanRead hugoScanUpdate restart verify_index copy_tab
create_index ndb_cpcc listen_event rep_latency
# transproxy
@@ -33,6 +33,7 @@
create_index_SOURCES = create_index.cpp
ndb_cpcc_SOURCES = cpcc.cpp
listen_event_SOURCES = listen.cpp
+rep_latency_SOURCES = rep_latency.cpp
include $(top_srcdir)/storage/ndb/config/common.mk.am
include $(top_srcdir)/storage/ndb/config/type_ndbapitest.mk.am
--- New file ---
+++ storage/ndb/test/tools/rep_latency.cpp 07/04/23 20:27:41
/* Copyright (C) 2003 MySQL AB
This program is free software; you can redistribute it and/or modify
it under the terms of the GNU General Public License as published by
the Free Software Foundation; version 2 of the License.
This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU General Public License for more details.
You should have received a copy of the GNU General Public License
along with this program; if not, write to the Free Software
Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA */
/*
* Update on master wait for update on slave
*
*/
#include <NdbApi.hpp>
#include <NdbSleep.h>
#include <sys/time.h>
#include <NdbOut.hpp>
#include <NDBT.hpp>
struct Xxx
{
Ndb *ndb;
const NdbDictionary::Table *table;
Uint32 pk_col;
Uint32 col;
};
struct XxxR
{
Uint32 pk_val;
Uint32 val;
struct timeval start_time;
Uint32 latency;
};
static int
prepare_master_or_slave(Ndb &myNdb,
const char* table,
const char* pk,
Uint32 pk_val,
const char* col,
struct Xxx &xxx,
struct XxxR &xxxr);
static void
run_master_update(struct Xxx &xxx, struct XxxR &xxxr);
static void
run_slave_wait(struct Xxx &xxx, struct XxxR &xxxr);
#define PRINT_ERROR(code,msg) \
g_err << "Error in " << __FILE__ << ", line: " << __LINE__ \
<< ", code: " << code \
<< ", msg: " << msg << ".\n"
#define APIERROR(error) { \
PRINT_ERROR((error).code, (error).message); \
exit(-1); }
int main(int argc, char** argv)
{
if (argc != 8)
{
ndbout << "Arguments are <connect_string cluster 1> <connect_string
cluster 2> <database> <table name> <primary key> <value of
primary key> <attribute to update>.\n";
exit(-1);
}
// ndb_init must be called first
ndb_init();
{
const char *opt_connectstring1 = argv[1];
const char *opt_connectstring2 = argv[2];
const char *opt_db = argv[3];
const char *opt_table = argv[4];
const char *opt_pk = argv[5];
const Uint32 opt_pk_val = atoi(argv[6]);
const char *opt_col = argv[7];
// Object representing the cluster 1
Ndb_cluster_connection cluster1_connection(opt_connectstring1);
// Object representing the cluster 2
Ndb_cluster_connection cluster2_connection(opt_connectstring2);
// connect cluster 1 and run application
// Connect to cluster 1 management server (ndb_mgmd)
if (cluster1_connection.connect(4 /* retries */,
5 /* delay between retries */,
1 /* verbose */))
{
g_err << "Cluster 1 management server was not ready within 30 secs.\n";
exit(-1);
}
// Optionally connect and wait for the storage nodes (ndbd's)
if (cluster1_connection.wait_until_ready(30,0) < 0)
{
g_err << "Cluster 1 was not ready within 30 secs.\n";
exit(-1);
}
// connect cluster 2 and run application
// Connect to cluster management server (ndb_mgmd)
if (cluster2_connection.connect(4 /* retries */,
5 /* delay between retries */,
1 /* verbose */))
{
g_err << "Cluster 2 management server was not ready within 30 secs.\n";
exit(-1);
}
// Optionally connect and wait for the storage nodes (ndbd's)
if (cluster2_connection.wait_until_ready(30,0) < 0)
{
g_err << "Cluster 2 was not ready within 30 secs.\n";
exit(-1);
}
// Object representing the database
Ndb myNdb1(&cluster1_connection, opt_db);
Ndb myNdb2(&cluster2_connection, opt_db);
//
struct Xxx xxx1;
struct Xxx xxx2;
struct XxxR xxxr;
prepare_master_or_slave(myNdb1, opt_table, opt_pk, opt_pk_val, opt_col,
xxx1, xxxr);
prepare_master_or_slave(myNdb2, opt_table, opt_pk, opt_pk_val, opt_col,
xxx2, xxxr);
while (1)
{
// run the application code
run_master_update(xxx1, xxxr);
run_slave_wait(xxx2, xxxr);
ndbout << "latency: " << xxxr.latency << endl;
}
}
// Note: all connections must have been destroyed before calling ndb_end()
ndb_end(0);
return 0;
}
static int
prepare_master_or_slave(Ndb &myNdb,
const char* table,
const char* pk,
Uint32 pk_val,
const char* col,
struct Xxx &xxx,
struct XxxR &xxxr)
{
if (myNdb.init())
APIERROR(myNdb.getNdbError());
const NdbDictionary::Dictionary* myDict = myNdb.getDictionary();
const NdbDictionary::Table *myTable = myDict->getTable(table);
if (myTable == NULL)
APIERROR(myDict->getNdbError());
const NdbDictionary::Column *myPkCol = myTable->getColumn(pk);
if (myPkCol == NULL)
APIERROR(myDict->getNdbError());
if (myPkCol->getType() != NdbDictionary::Column::Unsigned)
{
PRINT_ERROR(0, "Primary key column not of type unsigned");
exit(-1);
}
const NdbDictionary::Column *myCol = myTable->getColumn(col);
if (myCol == NULL)
APIERROR(myDict->getNdbError());
if (myCol->getType() != NdbDictionary::Column::Unsigned)
{
PRINT_ERROR(0, "Update column not of type unsigned");
exit(-1);
}
xxx.ndb = &myNdb;
xxx.table = myTable;
xxx.pk_col = myPkCol->getColumnNo();
xxx.col = myCol->getColumnNo();
xxxr.pk_val = pk_val;
return 0;
}
static void run_master_update(struct Xxx &xxx, struct XxxR &xxxr)
{
Ndb *ndb = xxx.ndb;
const NdbDictionary::Table *myTable = xxx.table;
int retry_sleep= 10; /* 10 milliseconds */
int retries= 100;
while (1)
{
Uint32 val;
NdbTransaction *trans = ndb->startTransaction();
if (trans == NULL)
goto err;
{
NdbOperation *op = trans->getNdbOperation(myTable);
if (op == NULL)
APIERROR(trans->getNdbError());
op->readTupleExclusive();
op->equal(xxx.pk_col, xxxr.pk_val);
op->getValue(xxx.col, (char *)&val);
}
if (trans->execute(NdbTransaction::NoCommit))
goto err;
//fprintf(stderr, "read %u\n", val);
xxxr.val = val + 1;
{
NdbOperation *op = trans->getNdbOperation(myTable);
if (op == NULL)
APIERROR(trans->getNdbError());
op->updateTuple();
op->equal(xxx.pk_col, xxxr.pk_val);
op->setValue(xxx.col, xxxr.val);
}
if (trans->execute(NdbTransaction::Commit))
goto err;
ndb->closeTransaction(trans);
//fprintf(stderr, "updated to %u\n", xxxr.val);
break;
err:
const NdbError this_error= trans ?
trans->getNdbError() : ndb->getNdbError();
if (this_error.status == NdbError::TemporaryError)
{
if (retries--)
{
if (trans)
ndb->closeTransaction(trans);
NdbSleep_MilliSleep(retry_sleep);
continue; // retry
}
}
if (trans)
ndb->closeTransaction(trans);
APIERROR(this_error);
}
/* update done start timer */
gettimeofday(&xxxr.start_time, 0);
}
static void run_slave_wait(struct Xxx &xxx, struct XxxR &xxxr)
{
struct timeval old_end_time = xxxr.start_time, end_time;
Ndb *ndb = xxx.ndb;
const NdbDictionary::Table *myTable = xxx.table;
int retry_sleep= 10; /* 10 milliseconds */
int retries= 100;
while (1)
{
Uint32 val;
NdbTransaction *trans = ndb->startTransaction();
if (trans == NULL)
goto err;
{
NdbOperation *op = trans->getNdbOperation(myTable);
if (op == NULL)
APIERROR(trans->getNdbError());
op->readTuple();
op->equal(xxx.pk_col, xxxr.pk_val);
op->getValue(xxx.col, (char *)&val);
if (trans->execute(NdbTransaction::Commit))
goto err;
}
/* read done, check time of read */
gettimeofday(&end_time, 0);
ndb->closeTransaction(trans);
//fprintf(stderr, "read %u waiting for %u\n", val, xxxr.val);
if (xxxr.val != val)
{
/* expected value not received yet */
retries = 100;
NdbSleep_MilliSleep(retry_sleep);
old_end_time = end_time;
continue;
}
break;
err:
const NdbError this_error= trans ?
trans->getNdbError() : ndb->getNdbError();
if (this_error.status == NdbError::TemporaryError)
{
if (retries--)
{
if (trans)
ndb->closeTransaction(trans);
NdbSleep_MilliSleep(retry_sleep);
continue; // retry
}
}
if (trans)
ndb->closeTransaction(trans);
APIERROR(this_error);
}
Int64 elapsed_usec1 =
((Int64)end_time.tv_sec - (Int64)xxxr.start_time.tv_sec)*1000*1000 +
((Int64)end_time.tv_usec - (Int64)xxxr.start_time.tv_usec);
Int64 elapsed_usec2 =
((Int64)end_time.tv_sec - (Int64)old_end_time.tv_sec)*1000*1000 +
((Int64)end_time.tv_usec - (Int64)old_end_time.tv_usec);
xxxr.latency =
((elapsed_usec1 - elapsed_usec2/2)+999)/1000;
}
| Thread |
|---|
| • bk commit into 5.1 tree (tomas:1.2457) | tomas | 23 Apr |