2675 Tomas Ulin 2008-10-15 [merge]
merge
added:
mysql-test/suite/ndb/r/ndb_discover_db.result
mysql-test/suite/ndb/r/ndb_discover_db2.result
mysql-test/suite/ndb/t/ndb_discover_db.test
mysql-test/suite/ndb/t/ndb_discover_db2-master.opt
mysql-test/suite/ndb/t/ndb_discover_db2.test
sql/ha_ndbcluster_lock_ext.h
modified:
mysql-test/mysql-test-run.pl
mysql-test/suite/ndb/r/ndb_alter_table_online2.result
mysql-test/suite/ndb/r/ndb_basic.result
mysql-test/suite/ndb/t/disabled.def
mysql-test/suite/ndb/t/ndb_alter_table_online2.test
mysql-test/suite/ndb/t/ndb_basic.test
mysql-test/suite/rpl_ndb_big/r/rpl_ndb_apply_status.result
mysql-test/suite/rpl_ndb_big/t/rpl_ndb_apply_status.test
sql/Makefile.am
sql/ha_ndbcluster.cc
sql/ha_ndbcluster.h
sql/ha_ndbcluster_binlog.cc
sql/ha_ndbcluster_binlog.h
sql/ha_ndbcluster_connection.cc
sql/ha_ndbcluster_connection.h
sql/handler.cc
sql/handler.h
sql/mysqld.cc
sql/sql_db.cc
sql/sql_delete.cc
sql/sql_parse.cc
sql/sql_rename.cc
sql/sql_table.cc
storage/ndb/include/ndb_constants.h
storage/ndb/include/util/ndb_opts.h
storage/ndb/src/kernel/blocks/backup/Backup.cpp
storage/ndb/src/kernel/blocks/trix/Trix.cpp
storage/ndb/src/kernel/blocks/trix/Trix.hpp
storage/ndb/src/ndbapi/NdbInterpretedCode.cpp
storage/ndb/test/ndbapi/Makefile.am
storage/ndb/test/ndbapi/bench/mainAsyncGenerator.cpp
storage/ndb/test/ndbapi/bench/ndb_async2.cpp
storage/ndb/test/ndbapi/bench/testData.h
storage/ndb/test/ndbapi/flexAsynch.cpp
storage/ndb/test/ndbapi/msa.cpp
storage/ndb/test/run-test/setup.cpp
2674 Jonas Oreland 2008-09-25 [merge]
merge 5.1-telco-6.2-merge t0 6.0-ndb
modified:
mysql-test/suite/ndb/r/ndb_insert.result
mysql-test/suite/ndb/t/ndb_insert.test
storage/ndb/src/common/portlib/NdbTick.c
storage/ndb/src/common/util/ndb_init.cpp
storage/ndb/src/kernel/blocks/dbtc/Dbtc.hpp
storage/ndb/src/kernel/blocks/dbtc/DbtcMain.cpp
storage/ndb/src/kernel/blocks/trix/Trix.cpp
storage/ndb/src/ndbapi/NdbEventOperationImpl.cpp
storage/ndb/src/ndbapi/ndberror.c
storage/ndb/test/include/NDBT_Test.hpp
storage/ndb/test/run-test/autotest-run.sh
storage/ndb/test/run-test/db.cpp
storage/ndb/test/run-test/main.cpp
storage/ndb/test/run-test/setup.cpp
storage/ndb/test/src/NDBT_Test.cpp
=== modified file 'mysql-test/mysql-test-run.pl'
--- a/mysql-test/mysql-test-run.pl 2008-09-02 15:44:54 +0000
+++ b/mysql-test/mysql-test-run.pl 2008-10-15 12:14:27 +0000
@@ -133,7 +133,7 @@ our $default_vardir;
our $opt_usage;
our $opt_suites;
-our $opt_suites_default= "main,backup,binlog,rpl,rpl_ndb,ndb,ndb_binlog"; # Default suites to run
+our $opt_suites_default= "ndb,ndb_binlog,rpl_ndb,main,backup,binlog,rpl"; # Default suites to run
our $opt_script_debug= 0; # Script debugging, enable with --script-debug
our $opt_verbose= 0; # Verbose output, enable with --verbose
@@ -2721,7 +2721,7 @@ sub ndbd_start ($$$) {
mtr_add_arg($args, "$extra_args");
my $nodeid= $cluster->{'ndbds'}->[$idx]->{'nodeid'};
- my $path_ndbd_log= "$cluster->{'data_dir'}/ndb_${nodeid}.log";
+ my $path_ndbd_log= "$cluster->{'data_dir'}/ndb_${nodeid}_out.log";
$pid= mtr_spawn($exe_ndbd, $args, "",
$path_ndbd_log,
$path_ndbd_log,
@@ -3779,7 +3779,6 @@ sub mysqld_arguments ($$$$) {
mtr_add_arg($args, "%s--slave-allow-batching", $prefix);
if ( $mysql_version_id >= 50100 )
{
- mtr_add_arg($args, "%s--ndb-extra-logging", $prefix);
mtr_add_arg($args, "%s--ndb-log-orig", $prefix);
}
}
@@ -3847,7 +3846,6 @@ sub mysqld_arguments ($$$$) {
mtr_add_arg($args, "%s--slave-allow-batching", $prefix);
if ( $mysql_version_id >= 50100 )
{
- mtr_add_arg($args, "%s--ndb-extra-logging", $prefix);
mtr_add_arg($args, "%s--ndb-log-orig", $prefix);
}
}
=== modified file 'mysql-test/suite/ndb/r/ndb_alter_table_online2.result'
--- a/mysql-test/suite/ndb/r/ndb_alter_table_online2.result 2007-11-19 12:59:12 +0000
+++ b/mysql-test/suite/ndb/r/ndb_alter_table_online2.result 2008-10-15 12:14:27 +0000
@@ -94,3 +94,4 @@ truncate ndb_show_tables_results;
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
drop table t1;
+drop database mysqlslap;
=== modified file 'mysql-test/suite/ndb/r/ndb_basic.result'
--- a/mysql-test/suite/ndb/r/ndb_basic.result 2008-08-16 05:15:49 +0000
+++ b/mysql-test/suite/ndb/r/ndb_basic.result 2008-10-15 12:14:27 +0000
@@ -924,4 +924,24 @@ create table if not exists t1 (a int not
create table t2 like t1;
rename table t1 to t10, t2 to t20;
drop table t10,t20;
+#
+# bug #39872 - explain causes segv
+# (ndb_index_stat_enable=1 must be set to trigger bug)
+#
+set ndb_index_stat_enable=1;
+CREATE TABLE `t1` (
+`id` int(11) NOT NULL AUTO_INCREMENT,
+PRIMARY KEY (`id`)
+) ENGINE=ndbcluster DEFAULT CHARSET=utf8;
+CREATE TABLE `t2` (
+`id` int(11) NOT NULL,
+`obj_id` int(11) DEFAULT NULL,
+UNIQUE KEY `id` (`id`),
+KEY `obj_id` (`obj_id`)
+) ENGINE=ndbcluster DEFAULT CHARSET=utf8;
+# here we used to segv
+explain SELECT t1.id FROM t1 INNER JOIN t2 ON t1.id = t2.id WHERE t2.obj_id=1;
+id select_type table type possible_keys key key_len ref rows Extra
+1 SIMPLE NULL NULL NULL NULL NULL NULL NULL Impossible WHERE noticed after reading const tables
+drop table t1, t2;
End of 5.1 tests
=== added file 'mysql-test/suite/ndb/r/ndb_discover_db.result'
--- a/mysql-test/suite/ndb/r/ndb_discover_db.result 1970-01-01 00:00:00 +0000
+++ b/mysql-test/suite/ndb/r/ndb_discover_db.result 2008-09-30 09:14:44 +0000
@@ -0,0 +1,7 @@
+drop database if exists discover_db;
+drop database if exists discover_db_2;
+create database discover_db;
+create table discover_db.t1 (a int key, b int) engine ndb;
+create database discover_db_2;
+alter database discover_db_2 character set binary;
+create table discover_db_2.t1 (a int key, b int) engine ndb;
=== added file 'mysql-test/suite/ndb/r/ndb_discover_db2.result'
--- a/mysql-test/suite/ndb/r/ndb_discover_db2.result 1970-01-01 00:00:00 +0000
+++ b/mysql-test/suite/ndb/r/ndb_discover_db2.result 2008-09-30 09:14:44 +0000
@@ -0,0 +1,28 @@
+show create database discover_db;
+Database Create Database
+discover_db CREATE DATABASE `discover_db` /*!40100 DEFAULT CHARACTER SET latin1 */
+show create database discover_db_2;
+Database Create Database
+discover_db_2 CREATE DATABASE `discover_db_2` /*!40100 DEFAULT CHARACTER SET binary */
+reset master;
+insert into discover_db.t1 values (1,1);
+show binlog events from <binlog_start>;
+Log_name Pos Event_type Server_id End_log_pos Info
+master-bin1.000001 # Query 102 # BEGIN
+master-bin1.000001 # Table_map 102 # table_id: # (discover_db.t1)
+master-bin1.000001 # Table_map 102 # table_id: # (mysql.ndb_apply_status)
+master-bin1.000001 # Write_rows 102 # table_id: #
+master-bin1.000001 # Write_rows 102 # table_id: # flags: STMT_END_F
+master-bin1.000001 # Query 102 # COMMIT
+reset master;
+insert into discover_db_2.t1 values (1,1);
+show binlog events from <binlog_start>;
+Log_name Pos Event_type Server_id End_log_pos Info
+master-bin1.000001 # Query 102 # BEGIN
+master-bin1.000001 # Table_map 102 # table_id: # (discover_db_2.t1)
+master-bin1.000001 # Table_map 102 # table_id: # (mysql.ndb_apply_status)
+master-bin1.000001 # Write_rows 102 # table_id: #
+master-bin1.000001 # Write_rows 102 # table_id: # flags: STMT_END_F
+master-bin1.000001 # Query 102 # COMMIT
+drop database discover_db;
+drop database discover_db_2;
=== modified file 'mysql-test/suite/ndb/t/disabled.def'
--- a/mysql-test/suite/ndb/t/disabled.def 2008-08-16 05:15:49 +0000
+++ b/mysql-test/suite/ndb/t/disabled.def 2008-10-15 12:14:27 +0000
@@ -11,7 +11,6 @@
##############################################################################
ndb_partition_error2 : HF is not sure if the test can work as internded on all the platforms
-#ndb_index_ordered : Bug#38370 The test ndb.ndb_index_ordered fails with the community features on
# the below testcase have been reworked to avoid the bug, test contains comment, keep bug open
#ndb_binlog_ddl_multi : BUG#18976 2006-04-10 kent CRBR: multiple binlog, second binlog may miss schema log events
=== modified file 'mysql-test/suite/ndb/t/ndb_alter_table_online2.test'
--- a/mysql-test/suite/ndb/t/ndb_alter_table_online2.test 2008-04-20 21:25:28 +0000
+++ b/mysql-test/suite/ndb/t/ndb_alter_table_online2.test 2008-10-15 12:14:27 +0000
@@ -166,3 +166,4 @@ truncate ndb_show_tables_results;
# drop the table
drop table t1;
+drop database mysqlslap;
=== modified file 'mysql-test/suite/ndb/t/ndb_basic.test'
--- a/mysql-test/suite/ndb/t/ndb_basic.test 2007-11-12 12:25:34 +0000
+++ b/mysql-test/suite/ndb/t/ndb_basic.test 2008-10-15 12:14:27 +0000
@@ -858,4 +858,23 @@ create table t2 like t1;
rename table t1 to t10, t2 to t20;
drop table t10,t20;
+--echo #
+--echo # bug #39872 - explain causes segv
+--echo # (ndb_index_stat_enable=1 must be set to trigger bug)
+--echo #
+set ndb_index_stat_enable=1;
+CREATE TABLE `t1` (
+ `id` int(11) NOT NULL AUTO_INCREMENT,
+ PRIMARY KEY (`id`)
+) ENGINE=ndbcluster DEFAULT CHARSET=utf8;
+CREATE TABLE `t2` (
+ `id` int(11) NOT NULL,
+ `obj_id` int(11) DEFAULT NULL,
+ UNIQUE KEY `id` (`id`),
+ KEY `obj_id` (`obj_id`)
+) ENGINE=ndbcluster DEFAULT CHARSET=utf8;
+--echo # here we used to segv
+explain SELECT t1.id FROM t1 INNER JOIN t2 ON t1.id = t2.id WHERE t2.obj_id=1;
+drop table t1, t2;
+
--echo End of 5.1 tests
=== added file 'mysql-test/suite/ndb/t/ndb_discover_db.test'
--- a/mysql-test/suite/ndb/t/ndb_discover_db.test 1970-01-01 00:00:00 +0000
+++ b/mysql-test/suite/ndb/t/ndb_discover_db.test 2008-09-30 09:14:44 +0000
@@ -0,0 +1,33 @@
+-- source include/have_multi_ndb.inc
+-- source include/have_binlog_format_mixed_or_row.inc
+
+-- disable_warnings
+drop database if exists discover_db;
+drop database if exists discover_db_2;
+-- enable_warnings
+
+#
+# Prepare for testing database discovery by creating
+# databases, and removing them on one mysqld
+# The discovery happens in ndb_discover_db2.test
+#
+
+# check that created database is discovered
+create database discover_db;
+create table discover_db.t1 (a int key, b int) engine ndb;
+
+# check that altered database is discovered
+create database discover_db_2;
+alter database discover_db_2 character set binary;
+create table discover_db_2.t1 (a int key, b int) engine ndb;
+
+-- remove_file $MYSQLTEST_VARDIR/master1-data/discover_db/t1.frm
+-- remove_file $MYSQLTEST_VARDIR/master1-data/discover_db/t1.ndb
+-- remove_file $MYSQLTEST_VARDIR/master1-data/discover_db/db.opt
+-- rmdir $MYSQLTEST_VARDIR/master1-data/discover_db
+
+-- remove_file $MYSQLTEST_VARDIR/master1-data/discover_db_2/t1.frm
+-- remove_file $MYSQLTEST_VARDIR/master1-data/discover_db_2/t1.ndb
+-- remove_file $MYSQLTEST_VARDIR/master1-data/discover_db_2/db.opt
+-- rmdir $MYSQLTEST_VARDIR/master1-data/discover_db_2
+
=== added file 'mysql-test/suite/ndb/t/ndb_discover_db2-master.opt'
--- a/mysql-test/suite/ndb/t/ndb_discover_db2-master.opt 1970-01-01 00:00:00 +0000
+++ b/mysql-test/suite/ndb/t/ndb_discover_db2-master.opt 2008-09-30 09:14:44 +0000
@@ -0,0 +1 @@
+--skip-external-locking
=== added file 'mysql-test/suite/ndb/t/ndb_discover_db2.test'
--- a/mysql-test/suite/ndb/t/ndb_discover_db2.test 1970-01-01 00:00:00 +0000
+++ b/mysql-test/suite/ndb/t/ndb_discover_db2.test 2008-09-30 09:14:44 +0000
@@ -0,0 +1,21 @@
+-- source include/have_multi_ndb.inc
+-- source include/have_binlog_format_mixed_or_row.inc
+
+#
+# When this test started there no database on disk for server2
+# Check that table has been discovered correctly, and that the
+# binlog is updated correctly
+#
+
+-- connection server2
+show create database discover_db;
+show create database discover_db_2;
+reset master;
+insert into discover_db.t1 values (1,1);
+--source include/show_binlog_events2.inc
+reset master;
+insert into discover_db_2.t1 values (1,1);
+--source include/show_binlog_events2.inc
+
+drop database discover_db;
+drop database discover_db_2;
=== modified file 'mysql-test/suite/rpl_ndb_big/r/rpl_ndb_apply_status.result'
--- a/mysql-test/suite/rpl_ndb_big/r/rpl_ndb_apply_status.result 2008-08-16 05:15:49 +0000
+++ b/mysql-test/suite/rpl_ndb_big/r/rpl_ndb_apply_status.result 2008-10-15 12:14:27 +0000
@@ -12,7 +12,7 @@ count(*)
create table t1 (a int key, b int) engine ndb;
insert into t1 values (1,1);
*** on master it should be empty ***
-select * from mysql.ndb_apply_status;
+select * from mysql.ndb_apply_status where server_id <> 0;
server_id epoch log_name start_pos end_pos
*** on slave there should be one row ***
select count(*) from mysql.ndb_apply_status;
=== modified file 'mysql-test/suite/rpl_ndb_big/t/rpl_ndb_apply_status.test'
--- a/mysql-test/suite/rpl_ndb_big/t/rpl_ndb_apply_status.test 2008-08-16 05:15:49 +0000
+++ b/mysql-test/suite/rpl_ndb_big/t/rpl_ndb_apply_status.test 2008-10-15 12:14:27 +0000
@@ -32,7 +32,8 @@ connection master;
create table t1 (a int key, b int) engine ndb;
insert into t1 values (1,1);
echo *** on master it should be empty ***;
-select * from mysql.ndb_apply_status;
+#filter away stuff put there with server_id = 0 (from ndb_restore)
+select * from mysql.ndb_apply_status where server_id <> 0;
sync_slave_with_master;
echo *** on slave there should be one row ***;
=== modified file 'sql/Makefile.am'
--- a/sql/Makefile.am 2008-09-02 15:44:54 +0000
+++ b/sql/Makefile.am 2008-10-15 12:14:27 +0000
@@ -70,6 +70,7 @@ noinst_HEADERS = item.h item_func.h item
ha_ndbcluster.h ha_ndbcluster_cond.h \
ha_ndbcluster_binlog.h ha_ndbcluster_tables.h \
ha_ndbcluster_connection.h ha_ndbcluster_connection.h \
+ ha_ndbcluster_lock_ext.h \
ha_partition.h rpl_constants.h \
opt_range.h protocol.h rpl_tblmap.h rpl_utility.h \
rpl_reporting.h \
=== modified file 'sql/ha_ndbcluster.cc'
--- a/sql/ha_ndbcluster.cc 2008-09-05 12:15:38 +0000
+++ b/sql/ha_ndbcluster.cc 2008-10-15 12:14:27 +0000
@@ -67,12 +67,22 @@ TYPELIB ndb_distribution_typelib= { arra
const char *opt_ndb_distribution= ndb_distribution_names[ND_KEYHASH];
enum ndb_distribution opt_ndb_distribution_id= ND_KEYHASH;
+/*
+ Provided for testing purposes to be able to run full test suite
+ with --ndbcluster option without getting warnings about cluster
+ not being connected
+*/
+my_bool ndbcluster_silent= 0;
+
// Default value for parallelism
static const int parallelism= 0;
-// Default value for max number of transactions
-// createable against NDB from this handler
-static const int max_transactions= 3; // should really be 2 but there is a transaction to much allocated when loch table is used
+/*
+ Default value for max number of transactions createable against NDB from
+ the handler. Should really be 2 but there is a transaction to much allocated
+ when lock table is used, and one extra to used for global schema lock.
+*/
+static const int max_transactions= 4;
static uint ndbcluster_partition_flags();
static int ndbcluster_init(void *);
@@ -108,7 +118,7 @@ static uint ndbcluster_alter_partition_f
return HA_PARTITION_FUNCTION_SUPPORTED;
}
-#define NDB_AUTO_INCREMENT_RETRIES 10
+#define NDB_AUTO_INCREMENT_RETRIES 100
#define BATCH_FLUSH_SIZE (32768)
#define ERR_PRINT(err) \
@@ -147,10 +157,12 @@ static uchar *ndbcluster_get_key(NDB_SHA
static
NdbRecord *
ndb_get_table_statistics_ndbrecord(NDBDICT *, const NDBTAB *);
-static int ndb_get_table_statistics(ha_ndbcluster*, bool, Ndb*, const NDBTAB *,
- struct Ndb_statistics *);
-static int ndb_get_table_statistics(ha_ndbcluster*, bool, Ndb*,
- const NdbRecord *, struct Ndb_statistics *);
+static int ndb_get_table_statistics(THD *thd, ha_ndbcluster*, bool, Ndb*, const NDBTAB *,
+ struct Ndb_statistics *,
+ bool have_lock= FALSE);
+static int ndb_get_table_statistics(THD *thd, ha_ndbcluster*, bool, Ndb*,
+ const NdbRecord *, struct Ndb_statistics *,
+ bool have_lock= FALSE);
THD *injector_thd= 0;
@@ -350,6 +362,9 @@ Thd_ndb::Thd_ndb()
(void) hash_init(&open_tables, &my_charset_bin, 5, 0, 0,
(hash_get_key)thd_ndb_share_get_key, 0, 0);
m_unsent_bytes= 0;
+ global_schema_lock_trans= NULL;
+ global_schema_lock_count= 0;
+ global_schema_lock_error= 0;
init_alloc_root(&m_batch_mem_root, BATCH_FLUSH_SIZE/4, 0);
}
@@ -357,19 +372,6 @@ Thd_ndb::~Thd_ndb()
{
if (ndb)
{
-#ifndef DBUG_OFF
- Ndb::Free_list_usage tmp;
- tmp.m_name= 0;
- while (ndb->get_free_list_usage(&tmp))
- {
- uint leaked= (uint) tmp.m_created - tmp.m_free;
- if (leaked)
- fprintf(stderr, "NDB: Found %u %s%s that %s not been released\n",
- leaked, tmp.m_name,
- (leaked == 1)?"":"'s",
- (leaked == 1)?"has":"have");
- }
-#endif
delete ndb;
ndb= NULL;
}
@@ -489,7 +491,8 @@ static void set_ndb_err(THD *thd, const
DBUG_VOID_RETURN;
}
-int ha_ndbcluster::ndb_err(NdbTransaction *trans)
+int ha_ndbcluster::ndb_err(NdbTransaction *trans,
+ bool have_lock)
{
THD *thd= current_thd;
int res;
@@ -508,7 +511,7 @@ int ha_ndbcluster::ndb_err(NdbTransactio
bzero((char*) &table_list,sizeof(table_list));
table_list.db= m_dbname;
table_list.alias= table_list.table_name= m_tabname;
- close_cached_tables(thd, &table_list, FALSE, FALSE);
+ close_cached_tables(thd, &table_list, have_lock, FALSE);
break;
}
default:
@@ -2883,12 +2886,12 @@ int ha_ndbcluster::ndb_write_row(uchar *
for (;;)
{
Ndb_tuple_id_range_guard g(m_share);
- if (ndb->getAutoIncrementValue(m_table, g.range, auto_value, 1) == -1)
+ if (ndb->getAutoIncrementValue(m_table, g.range, auto_value, 1000) == -1)
{
- if (--retries &&
+ if (--retries && !thd->killed &&
ndb->getNdbError().status == NdbError::TemporaryError)
{
- my_sleep(retry_sleep);
+ do_retry_sleep(retry_sleep);
continue;
}
ERR_RETURN(ndb->getNdbError());
@@ -3823,6 +3826,14 @@ int ha_ndbcluster::rnd_init(bool scan)
int ha_ndbcluster::close_scan()
{
+ /*
+ workaround for bug #39872 - explain causes segv
+ - rnd_end/close_scan is called on unlocked table
+ - should be fixed in server code, but this will
+ not be done until 6.0 as it is too intrusive
+ */
+ if (m_thd_ndb == NULL)
+ return 0;
NdbTransaction *trans= m_thd_ndb->trans;
int error;
DBUG_ENTER("close_scan");
@@ -4504,6 +4515,11 @@ int ha_ndbcluster::start_statement(THD *
{
//lockThisTable();
DBUG_PRINT("info", ("Locking the table..." ));
+#ifdef NOT_YET
+ push_warning_printf(current_thd, MYSQL_ERROR::WARN_LEVEL_ERROR,
+ ER_GET_ERRMSG, ER(ER_GET_ERRMSG), 0,
+ "Table only locked locally in this mysqld", "NDB");
+#endif
}
DBUG_RETURN(0);
}
@@ -5360,18 +5376,6 @@ int ha_ndbcluster::create(const char *na
NDBDICT *dict= ndb->getDictionary();
DBUG_PRINT("info", ("Tablespace %s,%s", form->s->tablespace, create_info->tablespace));
- if (is_truncate)
- {
- {
- Ndb_table_guard ndbtab_g(dict, m_tabname);
- if (!(m_table= ndbtab_g.get_table()))
- ERR_RETURN(dict->getNdbError());
- m_table= NULL;
- }
- DBUG_PRINT("info", ("Dropping and re-creating table for TRUNCATE"));
- if ((my_errno= delete_table(name)))
- DBUG_RETURN(my_errno);
- }
table= form;
if (create_from_engine)
{
@@ -5390,6 +5394,12 @@ int ha_ndbcluster::create(const char *na
}
#ifdef HAVE_NDB_BINLOG
+ Thd_ndb *thd_ndb= get_thd_ndb(thd);
+
+ if (!((thd_ndb->options & TNO_NO_LOCK_SCHEMA_OP) ||
+ ndbcluster_has_global_schema_lock(thd_ndb)))
+ DBUG_RETURN(ndbcluster_no_global_schema_lock_abort
+ (thd, "ha_ndbcluster::create"));
/*
Don't allow table creation unless
schema distribution table is setup
@@ -5406,6 +5416,18 @@ int ha_ndbcluster::create(const char *na
single_user_mode = NdbDictionary::Table::SingleUserModeReadWrite;
}
#endif /* HAVE_NDB_BINLOG */
+ if (is_truncate)
+ {
+ {
+ Ndb_table_guard ndbtab_g(dict, m_tabname);
+ if (!(m_table= ndbtab_g.get_table()))
+ ERR_RETURN(dict->getNdbError());
+ m_table= NULL;
+ }
+ DBUG_PRINT("info", ("Dropping and re-creating table for TRUNCATE"));
+ if ((my_errno= delete_table(name)))
+ DBUG_RETURN(my_errno);
+ }
DBUG_PRINT("table", ("name: %s", m_tabname));
if (tab.setName(m_tabname))
@@ -5990,6 +6012,12 @@ int ha_ndbcluster::rename_table(const ch
if (check_ndb_connection(thd))
DBUG_RETURN(my_errno= HA_ERR_NO_CONNECTION);
+#ifdef HAVE_NDB_BINLOG
+ if (!ndbcluster_has_global_schema_lock(get_thd_ndb(thd)))
+ DBUG_RETURN(ndbcluster_no_global_schema_lock_abort
+ (thd, "ha_ndbcluster::rename_table"));
+#endif
+
Ndb *ndb= get_ndb(thd);
ndb->setDatabaseName(old_dbname);
dict= ndb->getDictionary();
@@ -6384,7 +6412,6 @@ int ha_ndbcluster::delete_table(const ch
if (!ndb_schema_share)
{
DBUG_PRINT("info", ("Schema distribution table not setup"));
- DBUG_ASSERT(ndb_schema_share);
error= HA_ERR_NO_CONNECTION;
goto err;
}
@@ -6397,6 +6424,13 @@ int ha_ndbcluster::delete_table(const ch
}
ndb= get_ndb(thd);
+
+#ifdef HAVE_NDB_BINLOG
+ if (!ndbcluster_has_global_schema_lock(get_thd_ndb(thd)))
+ DBUG_RETURN(ndbcluster_no_global_schema_lock_abort
+ (thd, "ha_ndbcluster::delete_table"));
+#endif
+
/*
Drop table in ndb.
If it was already gone it might have been dropped
@@ -6451,10 +6485,10 @@ void ha_ndbcluster::get_auto_increment(u
ndb->readAutoIncrementValue(m_table, g.range, auto_value) ||
ndb->getAutoIncrementValue(m_table, g.range, auto_value, cache_size, increment, offset))
{
- if (--retries &&
+ if (--retries && !thd->killed &&
ndb->getNdbError().status == NdbError::TemporaryError)
{
- my_sleep(retry_sleep);
+ do_retry_sleep(retry_sleep);
continue;
}
const NdbError err= ndb->getNdbError();
@@ -6677,7 +6711,7 @@ int ha_ndbcluster::open(const char *name
m_share= 0;
DBUG_RETURN(res);
}
- if ((res= update_stats(thd, 1)) ||
+ if ((res= update_stats(thd, 1, true)) ||
(res= info(HA_STATUS_CONST)))
{
free_share(&m_share);
@@ -6971,7 +7005,10 @@ err:
share->key, share->use_count));
free_share(&share);
}
- if (ndb_error.code)
+ /*
+ ndbcluster_silent - avoid "cluster disconnected error"
+ */
+ if (ndb_error.code && (!ndbcluster_silent || ndb_error.code != 4009))
{
ERR_RETURN(ndb_error);
}
@@ -6995,7 +7032,15 @@ int ndbcluster_table_exists_in_engine(ha
NDBDICT* dict= ndb->getDictionary();
NdbDictionary::Dictionary::List list;
if (dict->listObjects(list, NdbDictionary::Object::UserTable) != 0)
+ {
+ /*
+ ndbcluster_silent
+ - avoid "cluster failure" warning if cluster is not connected
+ */
+ if (ndbcluster_silent && dict->getNdbError().code == 4009)
+ DBUG_RETURN(HA_ERR_NO_SUCH_TABLE);
ERR_RETURN(dict->getNdbError());
+ }
for (uint i= 0 ; i < list.count ; i++)
{
NdbDictionary::Dictionary::List::Element& elmt= list.elements[i];
@@ -7096,7 +7141,6 @@ static void ndbcluster_drop_database(han
if (!ndb_schema_share)
{
DBUG_PRINT("info", ("Schema distribution table not setup"));
- DBUG_ASSERT(ndb_schema_share);
DBUG_VOID_RETURN;
}
#endif
@@ -7278,6 +7322,12 @@ int ndbcluster_find_files(handlerton *ht
if (dir)
DBUG_RETURN(0); // Discover of databases not yet supported
+#ifdef HAVE_NDB_BINLOG
+ Ndbcluster_global_schema_lock_guard ndbcluster_global_schema_lock_guard(thd);
+ if (ndbcluster_global_schema_lock_guard.lock())
+ DBUG_RETURN(HA_ERR_NO_CONNECTION);
+#endif
+
// List tables in NDB
NDBDICT *dict= ndb->getDictionary();
if (dict->listObjects(list,
@@ -7688,22 +7738,6 @@ static int ndbcluster_end(handlerton *ht
#endif
hash_free(&ndbcluster_open_tables);
- if (g_ndb)
- {
-#ifndef DBUG_OFF
- Ndb::Free_list_usage tmp;
- tmp.m_name= 0;
- while (g_ndb->get_free_list_usage(&tmp))
- {
- uint leaked= (uint) tmp.m_created - tmp.m_free;
- if (leaked)
- fprintf(stderr, "NDB: Found %u %s%s that %s not been released\n",
- leaked, tmp.m_name,
- (leaked == 1)?"":"'s",
- (leaked == 1)?"has":"have");
- }
-#endif
- }
ndbcluster_disconnect();
// cleanup ndb interface
@@ -8072,7 +8106,7 @@ uint ndb_get_commitcount(THD *thd, char
{
Ndb_table_guard ndbtab_g(ndb->getDictionary(), tabname);
if (ndbtab_g.get_table() == 0
- || ndb_get_table_statistics(NULL, FALSE, ndb, ndbtab_g.get_table(), &stat))
+ || ndb_get_table_statistics(thd, NULL, FALSE, ndb, ndbtab_g.get_table(), &stat))
{
/* ndb_share reference temporary free */
DBUG_PRINT("NDB_SHARE", ("%s temporary free use_count: %u",
@@ -8746,7 +8780,9 @@ struct ndb_table_statistics_row {
Uint64 var_mem;
};
-int ha_ndbcluster::update_stats(THD *thd, bool do_read_stat)
+int ha_ndbcluster::update_stats(THD *thd,
+ bool do_read_stat,
+ bool have_lock)
{
struct Ndb_statistics stat;
Thd_ndb *thd_ndb= get_thd_ndb(thd);
@@ -8758,7 +8794,7 @@ int ha_ndbcluster::update_stats(THD *thd
{
DBUG_RETURN(my_errno= HA_ERR_OUT_OF_MEM);
}
- if (int err= ndb_get_table_statistics(this, TRUE, ndb,
+ if (int err= ndb_get_table_statistics(thd, this, TRUE, ndb,
m_ndb_statistics_record, &stat))
{
DBUG_RETURN(err);
@@ -8826,15 +8862,16 @@ ndb_get_table_statistics_ndbrecord(NDBDI
static
int
-ndb_get_table_statistics(ha_ndbcluster* file, bool report_error, Ndb* ndb,
+ndb_get_table_statistics(THD *thd, ha_ndbcluster* file, bool report_error, Ndb* ndb,
const NdbRecord *record,
- struct Ndb_statistics * ndbstat)
+ struct Ndb_statistics * ndbstat,
+ bool have_lock)
{
NdbTransaction* pTrans;
NdbError error;
- int retries= 10;
+ int retries= 100;
int reterr= 0;
- int retry_sleep= 30 * 1000; /* 30 milliseconds */
+ int retry_sleep= 30; /* 30 milliseconds */
const char *row;
#ifndef DBUG_OFF
char buff[22], buff2[22], buff3[22], buff4[22];
@@ -8941,7 +8978,7 @@ retry:
{
if (file && pTrans)
{
- reterr= file->ndb_err(pTrans);
+ reterr= file->ndb_err(pTrans, have_lock);
}
else
{
@@ -8958,9 +8995,10 @@ retry:
ndb->closeTransaction(pTrans);
pTrans= NULL;
}
- if (error.status == NdbError::TemporaryError && retries--)
+ if (error.status == NdbError::TemporaryError &&
+ retries-- && !thd->killed)
{
- my_sleep(retry_sleep);
+ do_retry_sleep(retry_sleep);
continue;
}
break;
@@ -8975,9 +9013,10 @@ retry:
*/
static
int
-ndb_get_table_statistics(ha_ndbcluster* file, bool report_error, Ndb* ndb,
+ndb_get_table_statistics(THD *thd, ha_ndbcluster* file, bool report_error, Ndb* ndb,
const NDBTAB *ndbtab,
- struct Ndb_statistics *ndbstat)
+ struct Ndb_statistics *ndbstat,
+ bool have_lock)
{
NDBDICT *dict= ndb->getDictionary();
NdbRecord *rec= ndb_get_table_statistics_ndbrecord(dict, ndbtab);
@@ -8986,7 +9025,7 @@ ndb_get_table_statistics(ha_ndbcluster*
DBUG_ENTER("ndb_get_table_statistics");
ERR_RETURN(dict->getNdbError());
}
- int res= ndb_get_table_statistics(file, report_error, ndb, rec, ndbstat);
+ int res= ndb_get_table_statistics(thd, file, report_error, ndb, rec, ndbstat);
dict->releaseRecord(rec);
return res;
}
@@ -10150,7 +10189,7 @@ pthread_handler_t ndb_util_thread_func(v
}
Ndb_table_guard ndbtab_g(ndb->getDictionary(), share->table_name);
if (ndbtab_g.get_table() &&
- ndb_get_table_statistics(NULL, FALSE, ndb,
+ ndb_get_table_statistics(thd, NULL, FALSE, ndb,
ndbtab_g.get_table(), &stat) == 0)
{
#ifndef DBUG_OFF
@@ -10895,6 +10934,12 @@ int ha_ndbcluster::alter_table_phase1(TH
adding= adding | HA_ADD_INDEX | HA_ADD_UNIQUE_INDEX;
dropping= dropping | HA_DROP_INDEX | HA_DROP_UNIQUE_INDEX;
+#ifdef HAVE_NDB_BINLOG
+ if (!ndbcluster_has_global_schema_lock(get_thd_ndb(thd)))
+ DBUG_RETURN(ndbcluster_no_global_schema_lock_abort
+ (thd, "ha_ndbcluster::alter_table_phase1"));
+#endif
+
if (!(alter_data= new NDB_ALTER_DATA(dict, m_table)))
DBUG_RETURN(HA_ERR_OUT_OF_MEM);
old_tab= alter_data->old_table;
@@ -11085,6 +11130,12 @@ int ha_ndbcluster::alter_table_phase2(TH
DBUG_ENTER("alter_table_phase2");
dropping= dropping | HA_DROP_INDEX | HA_DROP_UNIQUE_INDEX;
+#ifdef HAVE_NDB_BINLOG
+ if (!ndbcluster_has_global_schema_lock(get_thd_ndb(thd)))
+ DBUG_RETURN(ndbcluster_no_global_schema_lock_abort
+ (thd, "ha_ndbcluster::alter_table_phase2"));
+#endif
+
if ((*alter_flags & dropping).is_set())
{
/* Tell the handler to finally drop the indexes. */
@@ -11117,6 +11168,10 @@ int ha_ndbcluster::alter_table_phase3(TH
DBUG_ENTER("alter_table_phase3");
#ifdef HAVE_NDB_BINLOG
+ if (!ndbcluster_has_global_schema_lock(get_thd_ndb(thd)))
+ DBUG_RETURN(ndbcluster_no_global_schema_lock_abort
+ (thd, "ha_ndbcluster::alter_table_phase3"));
+
const char *db= table->s->db.str;
const char *name= table->s->table_name.str;
/*
=== modified file 'sql/ha_ndbcluster.h'
--- a/sql/ha_ndbcluster.h 2008-08-16 05:15:49 +0000
+++ b/sql/ha_ndbcluster.h 2008-10-15 12:14:27 +0000
@@ -231,7 +231,12 @@ inline my_bool get_binlog_use_update(NDB
enum THD_NDB_OPTIONS
{
- TNO_NO_LOG_SCHEMA_OP= 1 << 0
+ TNO_NO_LOG_SCHEMA_OP= 1 << 0,
+ /*
+ In participating mysqld, do not try to acquire global schema
+ lock, as one other mysqld already has the lock.
+ */
+ TNO_NO_LOCK_SCHEMA_OP= 1 << 1
};
enum THD_NDB_TRANS_OPTIONS
@@ -279,6 +284,9 @@ class Thd_ndb
we execute() to flush the rows buffered in m_batch_mem_root.
*/
uint m_unsent_bytes;
+ NdbTransaction *global_schema_lock_trans;
+ uint global_schema_lock_count;
+ uint global_schema_lock_error;
};
class ha_ndbcluster: public handler
@@ -452,7 +460,7 @@ static void set_tabname(const char *path
/*
* Internal to ha_ndbcluster, used by C functions
*/
- int ndb_err(NdbTransaction*);
+ int ndb_err(NdbTransaction*, bool have_lock= FALSE);
my_bool register_query_cache_table(THD *thd, char *table_key,
uint key_length,
@@ -715,7 +723,7 @@ private:
NdbIndexScanOperation *m_multi_cursor;
Ndb *get_ndb(THD *thd);
- int update_stats(THD *thd, bool do_read_stat);
+ int update_stats(THD *thd, bool do_read_stat, bool have_lock= FALSE);
};
extern SHOW_VAR ndb_status_variables[];
=== modified file 'sql/ha_ndbcluster_binlog.cc'
--- a/sql/ha_ndbcluster_binlog.cc 2008-09-11 06:11:36 +0000
+++ b/sql/ha_ndbcluster_binlog.cc 2008-10-15 12:14:27 +0000
@@ -28,6 +28,7 @@
#include <ndbapi/NdbDictionary.hpp>
#include <ndbapi/ndb_cluster_connection.hpp>
#include <util/NdbAutoPtr.hpp>
+#include <portlib/NdbTick.h>
#ifdef ndb_dynamite
#undef assert
@@ -39,6 +40,8 @@ extern my_bool opt_ndb_log_orig;
extern my_bool opt_ndb_log_update_as_write;
extern my_bool opt_ndb_log_updated_only;
+extern my_bool ndbcluster_silent;
+
/*
defines for cluster replication table names
*/
@@ -146,6 +149,8 @@ static char binlog_mdlkey[MAX_MDLKEY_LEN
/*
Helper functions
*/
+static bool ndbcluster_check_if_local_table(const char *dbname, const char *tabname);
+static bool ndbcluster_check_if_local_tables_in_db(THD *thd, const char *dbname);
#ifndef DBUG_OFF
/* purecov: begin deadcode */
@@ -473,8 +478,14 @@ static void ndbcluster_binlog_wait(THD *
if (ndb_binlog_running)
{
DBUG_ENTER("ndbcluster_binlog_wait");
- const char *save_info= thd ? thd->proc_info : 0;
ulonglong wait_epoch= ndb_get_latest_trans_gci();
+ /*
+ cluster not connected or no transactions done
+ so nothing to wait for
+ */
+ if (!wait_epoch)
+ DBUG_VOID_RETURN;
+ const char *save_info= thd ? thd->proc_info : 0;
int count= 30;
if (thd)
THD_SET_PROC_INFO(thd,
@@ -663,6 +674,24 @@ static void ndbcluster_reset_slave(THD *
char buf[1024];
char *end= strmov(buf, "DELETE FROM " NDB_REP_DB "." NDB_APPLY_TABLE);
run_query(thd, buf, end, NULL, TRUE);
+ if (thd->main_da.is_error() &&
+ ((thd->main_da.sql_errno() == ER_NO_SUCH_TABLE) ||
+ (thd->main_da.sql_errno() == ER_OPEN_AS_READONLY && ndbcluster_silent)))
+ {
+ /*
+ If table does not exist ignore the error as it
+ is a consistant behavior
+ */
+ thd->main_da.reset_diagnostics_area();
+ /*
+ ndbcluster_silent
+ - avoid "no table mysql.ndb_apply_status" warning - ER_NO_SUCH_TABLE
+ - avoid "mysql.ndb_apply_status read only" warning - ER_OPEN_AS_READONLY
+ */
+ if (ndbcluster_silent)
+ mysql_reset_errors(thd, 1);
+ }
+
DBUG_VOID_RETURN;
}
@@ -681,15 +710,134 @@ static bool ndbcluster_flush_logs(handle
return FALSE;
}
+/*
+ Global schema lock across mysql servers
+*/
+int ndbcluster_has_global_schema_lock(Thd_ndb *thd_ndb)
+{
+ if (thd_ndb->global_schema_lock_trans)
+ {
+ thd_ndb->global_schema_lock_trans->refresh();
+ return 1;
+ }
+ return 0;
+}
+
+int ndbcluster_no_global_schema_lock_abort(THD *thd, const char *msg)
+{
+ Thd_ndb *thd_ndb= get_thd_ndb(thd);
+ if (thd_ndb && thd_ndb->global_schema_lock_error != 0)
+ return HA_ERR_NO_CONNECTION;
+ sql_print_error("NDB: programming error, no lock taken while running "
+ "query %s. Message: %s", thd->query, msg);
+ abort();
+}
+
+#include "ha_ndbcluster_lock_ext.h"
+
+/*
+ lock/unlock calls are reference counted, so calls to lock
+ must be matched to a call to unlock even if the lock call fails
+*/
+static int ndbcluster_global_schema_lock(THD *thd,
+ int report_cluster_disconnected)
+{
+ Ndb *ndb= check_ndb_in_thd(thd);
+ Thd_ndb *thd_ndb= get_thd_ndb(thd);
+ NdbError ndb_error;
+ if (thd_ndb->options & TNO_NO_LOCK_SCHEMA_OP)
+ return 0;
+ DBUG_ENTER("ndbcluster_global_schema_lock");
+ DBUG_PRINT("enter", ("query: %s", thd->query));
+ if (thd_ndb->global_schema_lock_count)
+ {
+ if (thd_ndb->global_schema_lock_trans)
+ thd_ndb->global_schema_lock_trans->refresh();
+ else
+ DBUG_ASSERT(thd_ndb->global_schema_lock_error != 0);
+ thd_ndb->global_schema_lock_count++;
+ DBUG_PRINT("exit", ("global_schema_lock_count: %d",
+ thd_ndb->global_schema_lock_count));
+ DBUG_RETURN(0);
+ }
+ DBUG_ASSERT(thd_ndb->global_schema_lock_count == 0);
+ thd_ndb->global_schema_lock_count= 1;
+ thd_ndb->global_schema_lock_error= 0;
+ DBUG_PRINT("exit", ("global_schema_lock_count: %d",
+ thd_ndb->global_schema_lock_count));
+
+ if ((thd_ndb->global_schema_lock_trans=
+ ndbcluster_global_schema_lock_ext(thd, ndb, ndb_error, -1)) != NULL)
+ {
+ DBUG_RETURN(0);
+ }
+
+ /*
+ ndbcluster_silent - avoid "cluster disconnected error"
+ */
+ if (ndbcluster_silent)
+ report_cluster_disconnected= 0;
+ if (ndb_error.code != 4009 || report_cluster_disconnected)
+ {
+ sql_print_warning("NDB: Could not acquire global schema lock (%d)%s",
+ ndb_error.code, ndb_error.message);
+ push_warning_printf(thd, MYSQL_ERROR::WARN_LEVEL_ERROR,
+ ER_GET_ERRMSG, ER(ER_GET_ERRMSG),
+ ndb_error.code, ndb_error.message,
+ "NDB. Could not acquire global schema lock");
+ }
+ thd_ndb->global_schema_lock_error= ndb_error.code ? ndb_error.code : -1;
+ DBUG_RETURN(-1);
+}
+static int ndbcluster_global_schema_unlock(THD *thd)
+{
+ Thd_ndb *thd_ndb= get_thd_ndb(thd);
+ DBUG_ASSERT(thd_ndb != 0);
+ if (thd_ndb == 0 || (thd_ndb->options & TNO_NO_LOCK_SCHEMA_OP))
+ return 0;
+ Ndb *ndb= thd_ndb->ndb;
+ DBUG_ENTER("ndbcluster_global_schema_unlock");
+ NdbTransaction *trans= thd_ndb->global_schema_lock_trans;
+ thd_ndb->global_schema_lock_count--;
+ DBUG_PRINT("exit", ("global_schema_lock_count: %d",
+ thd_ndb->global_schema_lock_count));
+ DBUG_ASSERT(ndb != NULL);
+ if (ndb == NULL)
+ return 0;
+ DBUG_ASSERT(trans != NULL || thd_ndb->global_schema_lock_error != 0);
+ if (thd_ndb->global_schema_lock_count != 0)
+ {
+ DBUG_RETURN(0);
+ }
+ thd_ndb->global_schema_lock_error= 0;
+ if (trans)
+ {
+ thd_ndb->global_schema_lock_trans= NULL;
+ NdbError ndb_error;
+ if (ndbcluster_global_schema_unlock_ext(ndb, trans, ndb_error))
+ {
+ sql_print_warning("NDB: Releasing global schema lock (%d)%s",
+ ndb_error.code, ndb_error.message);
+ push_warning_printf(thd, MYSQL_ERROR::WARN_LEVEL_ERROR,
+ ER_GET_ERRMSG, ER(ER_GET_ERRMSG),
+ ndb_error.code,
+ ndb_error.message,
+ "ndb. Releasing global schema lock");
+ DBUG_RETURN(-1);
+ }
+ }
+ DBUG_RETURN(0);
+}
+
static int ndbcluster_binlog_func(handlerton *hton, THD *thd,
enum_binlog_func fn,
void *arg)
{
+ DBUG_ENTER("ndbcluster_binlog_func");
switch(fn)
{
case BFN_RESET_LOGS:
- ndbcluster_reset_logs(thd);
- break;
+ DBUG_RETURN(ndbcluster_reset_logs(thd));
case BFN_RESET_SLAVE:
ndbcluster_reset_slave(thd);
break;
@@ -702,8 +850,35 @@ static int ndbcluster_binlog_func(handle
case BFN_BINLOG_PURGE_FILE:
ndbcluster_binlog_index_purge_file(thd, (const char *)arg);
break;
+ case BFN_GLOBAL_SCHEMA_LOCK:
+ DBUG_RETURN(ndbcluster_global_schema_lock(thd, 1));
+ break;
+ case BFN_GLOBAL_SCHEMA_UNLOCK:
+ ndbcluster_global_schema_unlock(thd);
+ break;
}
- return 0;
+ DBUG_RETURN(0);
+}
+Ndbcluster_global_schema_lock_guard::Ndbcluster_global_schema_lock_guard(THD *thd)
+ : m_thd(thd), m_lock(0)
+{
+}
+Ndbcluster_global_schema_lock_guard::~Ndbcluster_global_schema_lock_guard()
+{
+ if (m_lock)
+ ndbcluster_global_schema_unlock(m_thd);
+}
+int Ndbcluster_global_schema_lock_guard::lock()
+{
+ /* only one lock call allowed */
+ DBUG_ASSERT(m_lock == 0);
+ /*
+ Always se m_lock, even if lock fails. Since the
+ lock/unlock calls are reference counted, the number
+ of calls to lock and unlock need to match up.
+ */
+ m_lock= 1;
+ return ndbcluster_global_schema_lock(m_thd, 0);
}
void ndbcluster_binlog_init_handlerton()
@@ -903,8 +1078,185 @@ static int ndbcluster_create_schema_tabl
DBUG_RETURN(0);
}
+class Thd_ndb_options_guard
+{
+public:
+ Thd_ndb_options_guard(Thd_ndb *thd_ndb)
+ : m_val(thd_ndb->options), m_save_val(thd_ndb->options) {}
+ ~Thd_ndb_options_guard() { m_val= m_save_val; }
+ void set(uint32 flag) { m_val|= flag; }
+private:
+ uint32 &m_val;
+ uint32 m_save_val;
+};
+
+/*
+ Ndb has no representation of the database schema objects.
+ The mysql.ndb_schema table contains the latest schema operations
+ done via a mysqld, and thus reflects databases created/dropped/altered
+ while a mysqld was disconnected. This function tries to recover
+ the correct state w.r.t created databases using the information in
+ that table.
+
+ Function should only be called while ndbcluster_global_schema_lock
+ is held, to ensure that ndb_schema table is not being updated while
+ scanning.
+*/
+static int ndbcluster_find_all_databases(THD *thd)
+{
+ Ndb *ndb= check_ndb_in_thd(thd);
+ Thd_ndb *thd_ndb= get_thd_ndb(thd);
+ Thd_ndb_options_guard thd_ndb_options(thd_ndb);
+ NDBDICT *dict= ndb->getDictionary();
+ NdbTransaction *trans= NULL;
+ NdbError ndb_error;
+ int retries= 100;
+ int retry_sleep= 30; /* 30 milliseconds, transaction */
+ DBUG_ENTER("ndbcluster_find_all_databases");
+ /* Ensure that we have the right lock */
+ if (!ndbcluster_has_global_schema_lock(thd_ndb))
+ DBUG_RETURN(ndbcluster_no_global_schema_lock_abort
+ (thd, "ndbcluster_find_all_databases"));
+ ndb->setDatabaseName(NDB_REP_DB);
+ thd_ndb_options.set(TNO_NO_LOG_SCHEMA_OP);
+ thd_ndb_options.set(TNO_NO_LOCK_SCHEMA_OP);
+ while (1)
+ {
+ char db_buffer[FN_REFLEN];
+ char *db= db_buffer+1;
+ char name[FN_REFLEN];
+ char query[64000];
+ Ndb_table_guard ndbtab_g(dict, NDB_SCHEMA_TABLE);
+ const NDBTAB *ndbtab= ndbtab_g.get_table();
+ NdbScanOperation *op;
+ NdbBlob *query_blob_handle;
+ int r= 0;
+ if (ndbtab == NULL)
+ {
+ ndb_error= dict->getNdbError();
+ goto error;
+ }
+ trans= ndb->startTransaction();
+ if (trans == NULL)
+ {
+ ndb_error= ndb->getNdbError();
+ goto error;
+ }
+ op= trans->getNdbScanOperation(ndbtab);
+ if (op == NULL)
+ abort();
+ op->readTuples(NdbScanOperation::LM_Read,
+ NdbScanOperation::SF_TupScan, 1);
+
+ r|= op->getValue("db", db_buffer) == NULL;
+ r|= op->getValue("name", name) == NULL;
+ r|= (query_blob_handle= op->getBlobHandle("query")) == NULL;
+ r|= query_blob_handle->getValue(query, sizeof(query));
+
+ if (r)
+ abort();
+
+ if (trans->execute(NdbTransaction::NoCommit))
+ {
+ ndb_error= trans->getNdbError();
+ goto error;
+ }
+
+ while ((r= op->nextResult()) == 0)
+ {
+ unsigned db_len= db_buffer[0];
+ unsigned name_len= name[0];
+ /*
+ name_len == 0 means no table name, hence the row
+ is for a database
+ */
+ if (db_len > 0 && name_len == 0)
+ {
+ /* database found */
+ Uint64 query_length= 0;
+ if (query_blob_handle->getLength(query_length))
+ abort();
+ db[db_len]= 0;
+ query[query_length]= 0;
+ build_table_filename(name, sizeof(name), db, "", "", 0);
+ int database_exists= !my_access(name, F_OK);
+ if (strncasecmp("CREATE", query, 6) == 0)
+ {
+ /* Database should exist */
+ if (!database_exists)
+ {
+ /* create missing database */
+ sql_print_information("NDB: Discovered missing database '%s'", db);
+ const int no_print_error[1]= {0};
+ run_query(thd, query, query + query_length,
+ no_print_error, /* print error */
+ TRUE); /* don't binlog the query */
+ /* always reset here */
+ thd->main_da.reset_diagnostics_area();
+ }
+ }
+ else if (strncasecmp("ALTER", query, 5) == 0)
+ {
+ /* Database should exist */
+ if (!database_exists)
+ {
+ /* create missing database */
+ sql_print_information("NDB: Discovered missing database '%s'", db);
+ const int no_print_error[1]= {0};
+ name_len= my_snprintf(name, sizeof(name), "CREATE DATABASE %s", db);
+ run_query(thd, name, name + name_len,
+ no_print_error, /* print error */
+ TRUE); /* don't binlog the query */
+ thd->main_da.reset_diagnostics_area();
+ run_query(thd, query, query + query_length,
+ no_print_error, /* print error */
+ TRUE); /* don't binlog the query */
+ /* always reset here */
+ thd->main_da.reset_diagnostics_area();
+ }
+ }
+ else if (strncasecmp("DROP", query, 4) == 0)
+ {
+ /* Database should not exist */
+ if (database_exists)
+ {
+ /* drop missing database */
+ sql_print_information("NDB: Discovered reamining database '%s'", db);
+ }
+ }
+ }
+ }
+ if (r == -1)
+ {
+ ndb_error= op->getNdbError();
+ goto error;
+ }
+ ndb->closeTransaction(trans);
+ trans= NULL;
+ DBUG_RETURN(0); // success
+ error:
+ if (trans)
+ {
+ ndb->closeTransaction(trans);
+ trans= NULL;
+ }
+ if (ndb_error.status == NdbError::TemporaryError && !thd->killed)
+ {
+ if (retries--)
+ {
+ do_retry_sleep(retry_sleep);
+ continue; // retry
+ }
+ }
+ DBUG_RETURN(1); // not temp error or too many retries
+ }
+}
+
int ndbcluster_setup_binlog_table_shares(THD *thd)
{
+ Ndbcluster_global_schema_lock_guard global_schema_lock_guard(thd);
+ if (global_schema_lock_guard.lock())
+ return 1;
if (!ndb_schema_share &&
ndbcluster_check_ndb_schema_share() == 0)
{
@@ -933,6 +1285,11 @@ int ndbcluster_setup_binlog_table_shares
}
}
+ if (ndbcluster_find_all_databases(thd))
+ {
+ return 1;
+ }
+
if (!ndbcluster_find_all_files(thd))
{
pthread_mutex_lock(&LOCK_open);
@@ -1109,7 +1466,7 @@ ndbcluster_update_slock(THD *thd,
const NDBTAB *ndbtab= ndbtab_g.get_table();
NdbTransaction *trans= 0;
int retries= 100;
- int retry_sleep= 10; /* 10 milliseconds, transaction */
+ int retry_sleep= 30; /* 30 milliseconds, transaction */
const NDBCOL *col[SCHEMA_SIZE];
unsigned sz[SCHEMA_SIZE];
@@ -1119,7 +1476,8 @@ ndbcluster_update_slock(THD *thd,
if (ndbtab == 0)
{
- abort();
+ if (dict->getNdbError().code != 4009)
+ abort();
DBUG_RETURN(0);
}
@@ -1204,13 +1562,13 @@ ndbcluster_update_slock(THD *thd,
err:
const NdbError *this_error= trans ?
&trans->getNdbError() : &ndb->getNdbError();
- if (this_error->status == NdbError::TemporaryError)
+ if (this_error->status == NdbError::TemporaryError && !thd->killed)
{
if (retries--)
{
if (trans)
ndb->closeTransaction(trans);
- my_sleep(retry_sleep);
+ do_retry_sleep(retry_sleep);
continue; // retry
}
}
@@ -1446,7 +1804,7 @@ int ndbcluster_log_schema_op(THD *thd,
const NDBTAB *ndbtab= ndbtab_g.get_table();
NdbTransaction *trans= 0;
int retries= 100;
- int retry_sleep= 10; /* 10 milliseconds, transaction */
+ int retry_sleep= 30; /* 30 milliseconds, transaction */
const NDBCOL *col[SCHEMA_SIZE];
unsigned sz[SCHEMA_SIZE];
@@ -1553,13 +1911,13 @@ int ndbcluster_log_schema_op(THD *thd,
err:
const NdbError *this_error= trans ?
&trans->getNdbError() : &ndb->getNdbError();
- if (this_error->status == NdbError::TemporaryError)
+ if (this_error->status == NdbError::TemporaryError && !thd->killed)
{
if (retries--)
{
if (trans)
ndb->closeTransaction(trans);
- my_sleep(retry_sleep);
+ do_retry_sleep(retry_sleep);
continue; // retry
}
}
@@ -1817,6 +2175,8 @@ ndb_binlog_thread_handle_schema_event(TH
if (ev_type == NDBEVENT::TE_UPDATE ||
ev_type == NDBEVENT::TE_INSERT)
{
+ Thd_ndb *thd_ndb= get_thd_ndb(thd);
+ Thd_ndb_options_guard thd_ndb_options(thd_ndb);
Cluster_schema *schema= (Cluster_schema *)
sql_alloc(sizeof(Cluster_schema));
MY_BITMAP slock;
@@ -1868,10 +2228,12 @@ ndb_binlog_thread_handle_schema_event(TH
// fall through
case SOT_RENAME_TABLE_NEW:
{
- uint end= snprintf(&errmsg[0], MYSQL_ERRMSG_SIZE,
- "NDB Binlog: Skipping renaming locally defined table '%s.%s' from binlog schema event '%s' from node %d. ",
- schema->db, schema->name, schema->query,
- schema->node_id);
+ uint end= my_snprintf(&errmsg[0], MYSQL_ERRMSG_SIZE,
+ "NDB Binlog: Skipping renaming locally "
+ "defined table '%s.%s' from binlog schema "
+ "event '%s' from node %d. ",
+ schema->db, schema->name, schema->query,
+ schema->node_id);
errmsg[end]= '\0';
}
@@ -1879,14 +2241,17 @@ ndb_binlog_thread_handle_schema_event(TH
case SOT_DROP_TABLE:
if (schema_type == SOT_DROP_TABLE)
{
- uint end= snprintf(&errmsg[0], MYSQL_ERRMSG_SIZE,
- "NDB Binlog: Skipping dropping locally defined table '%s.%s' from binlog schema event '%s' from node %d. ",
- schema->db, schema->name, schema->query,
- schema->node_id);
+ uint end= my_snprintf(&errmsg[0], MYSQL_ERRMSG_SIZE,
+ "NDB Binlog: Skipping dropping locally "
+ "defined table '%s.%s' from binlog schema "
+ "event '%s' from node %d. ",
+ schema->db, schema->name, schema->query,
+ schema->node_id);
errmsg[end]= '\0';
}
if (! ndbcluster_check_if_local_table(schema->db, schema->name))
{
+ thd_ndb_options.set(TNO_NO_LOCK_SCHEMA_OP);
const int no_print_error[1]=
{ER_BAD_TABLE_ERROR}; /* ignore missing table */
run_query(thd, schema->query,
@@ -1946,6 +2311,7 @@ ndb_binlog_thread_handle_schema_event(TH
break;
// fall through
case SOT_CREATE_TABLE:
+ thd_ndb_options.set(TNO_NO_LOCK_SCHEMA_OP);
pthread_mutex_lock(&LOCK_open);
if (ndbcluster_check_if_local_table(schema->db, schema->name))
{
@@ -1973,6 +2339,7 @@ ndb_binlog_thread_handle_schema_event(TH
break;
case SOT_DROP_DB:
/* Drop the database locally if it only contains ndb tables */
+ thd_ndb_options.set(TNO_NO_LOCK_SCHEMA_OP);
if (! ndbcluster_check_if_local_tables_in_db(thd, schema->db))
{
const int no_print_error[1]= {0};
@@ -2002,6 +2369,7 @@ ndb_binlog_thread_handle_schema_event(TH
/* fall through */
case SOT_ALTER_DB:
{
+ thd_ndb_options.set(TNO_NO_LOCK_SCHEMA_OP);
const int no_print_error[1]= {0};
run_query(thd, schema->query,
schema->query + schema->query_length,
@@ -2158,8 +2526,10 @@ ndb_binlog_thread_handle_schema_event_po
return;
DBUG_ENTER("ndb_binlog_thread_handle_schema_event_post_epoch");
Cluster_schema *schema;
+ Thd_ndb *thd_ndb= get_thd_ndb(thd);
while ((schema= post_epoch_log_list->pop()))
{
+ Thd_ndb_options_guard thd_ndb_options(thd_ndb);
DBUG_PRINT("info",
("%s.%s: log query_length: %d query: '%s' type: %d",
schema->db, schema->name,
@@ -2307,6 +2677,7 @@ ndb_binlog_thread_handle_schema_event_po
free_share(&share);
share= 0;
}
+ thd_ndb_options.set(TNO_NO_LOCK_SCHEMA_OP);
pthread_mutex_lock(&LOCK_open);
if (ndbcluster_check_if_local_table(schema->db, schema->name))
{
@@ -2486,6 +2857,7 @@ ndb_binlog_thread_handle_schema_event_po
free_share(&share);
share= 0;
}
+ thd_ndb_options.set(TNO_NO_LOCK_SCHEMA_OP);
pthread_mutex_lock(&LOCK_open);
if (ndbcluster_check_if_local_table(schema->db, schema->name))
{
@@ -2766,7 +3138,7 @@ void set_binlog_flags(NDB_SHARE *share)
set_binlog_full(share);
}
-bool
+static bool
ndbcluster_check_if_local_table(const char *dbname, const char *tabname)
{
char key[FN_REFLEN];
@@ -2788,7 +3160,7 @@ ndbcluster_check_if_local_table(const ch
DBUG_RETURN(false);
}
-bool
+static bool
ndbcluster_check_if_local_tables_in_db(THD *thd, const char *dbname)
{
DBUG_ENTER("ndbcluster_check_if_local_tables_in_db");
@@ -3350,9 +3722,9 @@ ndbcluster_create_event_ops(THD *thd, ND
op->setCustomData(NULL);
ndb->dropEventOperation(op);
pthread_mutex_unlock(&injector_mutex);
- if (retries)
+ if (retries && !thd->killed)
{
- my_sleep(retry_sleep);
+ do_retry_sleep(retry_sleep);
continue;
}
DBUG_RETURN(-1);
@@ -5089,17 +5461,17 @@ ndbcluster_show_status_binlog(THD* thd,
pthread_mutex_unlock(&injector_mutex);
buflen=
- snprintf(buf, sizeof(buf),
- "latest_epoch=%s, "
- "latest_trans_epoch=%s, "
- "latest_received_binlog_epoch=%s, "
- "latest_handled_binlog_epoch=%s, "
- "latest_applied_binlog_epoch=%s",
- llstr(ndb_latest_epoch, buff1),
- llstr(ndb_get_latest_trans_gci(), buff2),
- llstr(ndb_latest_received_binlog_epoch, buff3),
- llstr(ndb_latest_handled_binlog_epoch, buff4),
- llstr(ndb_latest_applied_binlog_epoch, buff5));
+ my_snprintf(buf, sizeof(buf),
+ "latest_epoch=%s, "
+ "latest_trans_epoch=%s, "
+ "latest_received_binlog_epoch=%s, "
+ "latest_handled_binlog_epoch=%s, "
+ "latest_applied_binlog_epoch=%s",
+ llstr(ndb_latest_epoch, buff1),
+ llstr(ndb_get_latest_trans_gci(), buff2),
+ llstr(ndb_latest_received_binlog_epoch, buff3),
+ llstr(ndb_latest_handled_binlog_epoch, buff4),
+ llstr(ndb_latest_applied_binlog_epoch, buff5));
if (stat_print(thd, ndbcluster_hton_name, ndbcluster_hton_name_length,
"binlog", strlen("binlog"),
buf, buflen))
=== modified file 'sql/ha_ndbcluster_binlog.h'
--- a/sql/ha_ndbcluster_binlog.h 2008-03-26 23:22:46 +0000
+++ b/sql/ha_ndbcluster_binlog.h 2008-10-15 12:14:27 +0000
@@ -159,10 +159,6 @@ void ndbcluster_binlog_init_handlerton()
Initialize the binlog part of the NDB_SHARE
*/
int ndbcluster_binlog_init_share(THD *thd, NDB_SHARE *share, TABLE *table);
-
-bool ndbcluster_check_if_local_table(const char *dbname, const char *tabname);
-bool ndbcluster_check_if_local_tables_in_db(THD *thd, const char *dbname);
-
int ndbcluster_create_binlog_setup(THD *thd, Ndb *ndb, const char *key,
uint key_len,
const char *db,
@@ -269,3 +265,17 @@ set_thd_ndb(THD *thd, Thd_ndb *thd_ndb)
{ thd_set_ha_data(thd, ndbcluster_hton, thd_ndb); }
Ndb* check_ndb_in_thd(THD* thd);
+
+int ndbcluster_has_global_schema_lock(Thd_ndb *thd_ndb);
+int ndbcluster_no_global_schema_lock_abort(THD *thd, const char *msg);
+
+class Ndbcluster_global_schema_lock_guard
+{
+public:
+ Ndbcluster_global_schema_lock_guard(THD *thd);
+ ~Ndbcluster_global_schema_lock_guard();
+ int lock();
+private:
+ THD *m_thd;
+ int m_lock;
+};
=== modified file 'sql/ha_ndbcluster_connection.cc'
--- a/sql/ha_ndbcluster_connection.cc 2008-04-09 13:52:09 +0000
+++ b/sql/ha_ndbcluster_connection.cc 2008-10-02 15:52:29 +0000
@@ -91,7 +91,9 @@ int ndbcluster_connect(int (*connect_cal
(now_time.tv_sec == end_time.tv_sec &&
now_time.tv_usec >= end_time.tv_usec))
break;
- sleep(1);
+ do_retry_sleep(100);
+ if (abort_loop)
+ goto ndbcluster_connect_error;
}
{
=== modified file 'sql/ha_ndbcluster_connection.h'
--- a/sql/ha_ndbcluster_connection.h 2008-04-09 13:52:09 +0000
+++ b/sql/ha_ndbcluster_connection.h 2008-10-02 15:52:29 +0000
@@ -23,3 +23,9 @@ int ndb_has_node_id(uint id);
/* options from from mysqld.cc */
extern ulong opt_ndb_cluster_connection_pool;
+
+/* perform random sleep in the range milli_sleep to 2*milli_sleep */
+inline void do_retry_sleep(unsigned milli_sleep)
+{
+ my_sleep(1000*(milli_sleep + 5*(rand()%(milli_sleep/5))));
+}
=== added file 'sql/ha_ndbcluster_lock_ext.h'
--- a/sql/ha_ndbcluster_lock_ext.h 1970-01-01 00:00:00 +0000
+++ b/sql/ha_ndbcluster_lock_ext.h 2008-10-09 09:10:50 +0000
@@ -0,0 +1,113 @@
+/* Copyright (C) 2000-2003 MySQL AB
+
+ This program is free software; you can redistribute it and/or modify
+ it under the terms of the GNU General Public License as published by
+ the Free Software Foundation; version 2 of the License.
+
+ This program is distributed in the hope that it will be useful,
+ but WITHOUT ANY WARRANTY; without even the implied warranty of
+ MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ GNU General Public License for more details.
+
+ You should have received a copy of the GNU General Public License
+ along with this program; if not, write to the Free Software
+ Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
+*/
+
+/*
+ These functions are shared with ndb_restore so that the creating of
+ tables through ndb_restore is syncronized correctly with the mysqld's
+
+ The lock/unlock functions use the BACKUP_SEQUENCE row in SYSTAB_0
+
+ retry_time == 0 means no retry
+ retry_time < 0 means infinite retries
+ retry_time > 0 means retries for max 'retry_time' seconds
+*/
+static NdbTransaction *
+ndbcluster_global_schema_lock_ext(THD *thd, Ndb *ndb, NdbError &ndb_error,
+ int retry_time= 10)
+{
+ ndb->setDatabaseName("sys");
+ ndb->setDatabaseSchemaName("def");
+ NdbDictionary::Dictionary *dict= ndb->getDictionary();
+ Ndb_table_guard ndbtab_g(dict, "SYSTAB_0");
+ const NdbDictionary::Table *ndbtab= NULL;
+ NdbOperation *op;
+ NdbTransaction *trans= NULL;
+ int retry_sleep= 50; /* 50 milliseconds, transaction */
+ NDB_TICKS time_end;
+
+ if (retry_time > 0)
+ {
+ time_end= NdbTick_CurrentMillisecond();
+ time_end+= retry_time * 1000;
+ }
+ while (1)
+ {
+ if (!ndbtab)
+ {
+ if (!(ndbtab= ndbtab_g.get_table()))
+ {
+ if (dict->getNdbError().status == NdbError::TemporaryError)
+ goto retry;
+ ndb_error= dict->getNdbError();
+ goto error_handler;
+ }
+ }
+
+ trans= ndb->startTransaction();
+ if (trans == NULL)
+ {
+ ndb_error= ndb->getNdbError();
+ goto error_handler;
+ }
+
+ op= trans->getNdbOperation(ndbtab);
+ op->readTuple(NdbOperation::LM_Exclusive);
+ op->equal("SYSKEY_0", NDB_BACKUP_SEQUENCE);
+
+ if (trans->execute(NdbTransaction::NoCommit) == 0)
+ break;
+
+ if (trans->getNdbError().status != NdbError::TemporaryError)
+ goto error_handler;
+ else if (thd->killed)
+ goto error_handler;
+ retry:
+ if (retry_time == 0)
+ goto error_handler;
+ if (retry_time > 0 &&
+ time_end < NdbTick_CurrentMillisecond())
+ goto error_handler;
+ if (trans)
+ {
+ ndb->closeTransaction(trans);
+ trans= NULL;
+ }
+ do_retry_sleep(retry_sleep);
+ }
+ return trans;
+
+ error_handler:
+ if (trans)
+ {
+ ndb_error= trans->getNdbError();
+ ndb->closeTransaction(trans);
+ }
+ return NULL;
+}
+
+static int
+ndbcluster_global_schema_unlock_ext(Ndb *ndb, NdbTransaction *trans,
+ NdbError &ndb_error)
+{
+ if (trans->execute(NdbTransaction::Commit))
+ {
+ ndb_error= trans->getNdbError();
+ ndb->closeTransaction(trans);
+ return -1;
+ }
+ ndb->closeTransaction(trans);
+ return 0;
+}
=== modified file 'sql/handler.cc'
--- a/sql/handler.cc 2008-09-05 08:57:28 +0000
+++ b/sql/handler.cc 2008-10-15 12:14:27 +0000
@@ -3855,6 +3855,42 @@ int ha_binlog_index_purge_file(THD *thd,
return 0;
}
+static int ha_global_schema_lock(THD *thd)
+{
+ binlog_func_st bfn= {BFN_GLOBAL_SCHEMA_LOCK, 0};
+ binlog_func_foreach(thd, &bfn);
+ if (thd->main_da.is_error())
+ return 1;
+ return 0;
+}
+
+static int ha_global_schema_unlock(THD *thd)
+{
+ binlog_func_st bfn= {BFN_GLOBAL_SCHEMA_UNLOCK, 0};
+ binlog_func_foreach(thd, &bfn);
+ if (thd->main_da.is_error())
+ return 1;
+ return 0;
+}
+
+Ha_global_schema_lock_guard::Ha_global_schema_lock_guard(THD *thd)
+ : m_thd(thd), m_lock(0)
+{
+}
+
+Ha_global_schema_lock_guard::~Ha_global_schema_lock_guard()
+{
+ if (m_lock)
+ ha_global_schema_unlock(m_thd);
+}
+
+int Ha_global_schema_lock_guard::lock()
+{
+ DBUG_ASSERT(m_lock == 0);
+ m_lock= 1;
+ return ha_global_schema_lock(m_thd);
+}
+
struct binlog_log_query_st
{
enum_binlog_command binlog_command;
=== modified file 'sql/handler.h'
--- a/sql/handler.h 2008-09-02 15:44:54 +0000
+++ b/sql/handler.h 2008-10-15 12:14:27 +0000
@@ -334,7 +334,9 @@ enum enum_binlog_func {
BFN_RESET_SLAVE= 2,
BFN_BINLOG_WAIT= 3,
BFN_BINLOG_END= 4,
- BFN_BINLOG_PURGE_FILE= 5
+ BFN_BINLOG_PURGE_FILE= 5,
+ BFN_GLOBAL_SCHEMA_LOCK= 6,
+ BFN_GLOBAL_SCHEMA_UNLOCK=7
};
enum enum_binlog_command {
@@ -2503,6 +2505,16 @@ void ha_binlog_log_query(THD *thd, handl
const char *db, const char *table_name);
void ha_binlog_wait(THD *thd);
int ha_binlog_end(THD *thd);
+class Ha_global_schema_lock_guard
+{
+public:
+ Ha_global_schema_lock_guard(THD *thd);
+ ~Ha_global_schema_lock_guard();
+ int lock();
+private:
+ THD *m_thd;
+ int m_lock;
+};
#else
inline int ha_int_dummy() { return 0; }
#define ha_reset_logs(a) ha_int_dummy()
@@ -2511,4 +2523,11 @@ inline int ha_int_dummy() { return 0; }
#define ha_binlog_log_query(a,b,c,d,e,f,g) do {} while (0)
#define ha_binlog_wait(a) do {} while (0)
#define ha_binlog_end(a) do {} while (0)
+class Ha_global_schema_lock_guard
+{
+public:
+ Ha_global_schema_lock_guard(THD *thd) {}
+ ~Ha_global_schema_lock_guard() {}
+ int lock() { return 0; }
+};
#endif
=== modified file 'sql/mysqld.cc'
--- a/sql/mysqld.cc 2008-09-02 15:44:54 +0000
+++ b/sql/mysqld.cc 2008-10-15 12:14:27 +0000
@@ -6160,7 +6160,7 @@ thread is in the master's binlogs.",
"Turn on more logging in the error log.",
(uchar**) &ndb_extra_logging,
(uchar**) &ndb_extra_logging,
- 0, GET_ULONG, OPT_ARG, 0, 0, 0, 0, 0, 0},
+ 0, GET_ULONG, OPT_ARG, 1, 0, 0, 0, 0, 0},
#ifdef HAVE_NDB_BINLOG
{"ndb-report-thresh-binlog-epoch-slip", OPT_NDB_REPORT_THRESH_BINLOG_EPOCH_SLIP,
"Threshold on number of epochs to be behind before reporting binlog status. "
=== modified file 'sql/sql_db.cc'
--- a/sql/sql_db.cc 2008-08-27 08:47:03 +0000
+++ b/sql/sql_db.cc 2008-10-15 12:14:27 +0000
@@ -616,6 +616,7 @@ int mysql_create_db(THD *thd, char *db,
MY_STAT stat_info;
uint create_options= create_info ? create_info->options : 0;
uint path_len;
+ Ha_global_schema_lock_guard global_schema_lock_guard(thd);
DBUG_ENTER("mysql_create_db");
/* do not create 'information_schema' db */
@@ -643,6 +644,8 @@ int mysql_create_db(THD *thd, char *db,
goto exit2;
}
+ global_schema_lock_guard.lock();
+
pthread_mutex_lock(&LOCK_mysql_create_db);
/* Check directory */
@@ -767,6 +770,7 @@ bool mysql_alter_db(THD *thd, const char
char path[FN_REFLEN+16];
long result=1;
int error= 0;
+ Ha_global_schema_lock_guard global_schema_lock_guard(thd);
DBUG_ENTER("mysql_alter_db");
/*
@@ -784,6 +788,8 @@ bool mysql_alter_db(THD *thd, const char
if ((error=wait_if_global_read_lock(thd,0,1)))
goto exit2;
+ global_schema_lock_guard.lock();
+
pthread_mutex_lock(&LOCK_mysql_create_db);
/*
@@ -861,6 +867,7 @@ bool mysql_rm_db(THD *thd,char *db,bool
MY_DIR *dirp;
uint length;
TABLE_LIST* dropped_tables= 0;
+ Ha_global_schema_lock_guard global_schema_lock_guard(thd);
DBUG_ENTER("mysql_rm_db");
/*
@@ -880,6 +887,8 @@ bool mysql_rm_db(THD *thd,char *db,bool
error= -1;
goto exit2;
}
+
+ global_schema_lock_guard.lock();
pthread_mutex_lock(&LOCK_mysql_create_db);
=== modified file 'sql/sql_delete.cc'
--- a/sql/sql_delete.cc 2008-09-02 15:44:54 +0000
+++ b/sql/sql_delete.cc 2008-10-15 12:14:27 +0000
@@ -977,6 +977,7 @@ bool mysql_truncate(THD *thd, TABLE_LIST
bool error;
uint path_length;
MDL_LOCK_DATA *mdl_lock_data= 0;
+ Ha_global_schema_lock_guard global_schema_lock_guard(thd);
DBUG_ENTER("mysql_truncate");
bzero((char*) &create_info,sizeof(create_info));
@@ -1030,6 +1031,8 @@ bool mysql_truncate(THD *thd, TABLE_LIST
HTON_CAN_RECREATE))
goto trunc_by_del;
+ if (table_type == DB_TYPE_NDBCLUSTER)
+ global_schema_lock_guard.lock();
/*
FIXME: Actually code of TRUNCATE breaks meta-data locking protocol since
tries to get table enging and therefore accesses table in some way
=== modified file 'sql/sql_parse.cc'
--- a/sql/sql_parse.cc 2008-09-02 15:44:54 +0000
+++ b/sql/sql_parse.cc 2008-10-15 12:14:27 +0000
@@ -2503,6 +2503,7 @@ mysql_execute_command(THD *thd)
if (select_lex->item_list.elements) // With select
{
select_result *result;
+ Ha_global_schema_lock_guard global_schema_lock_guard(thd);
select_lex->options|= SELECT_NO_UNLOCK;
unit->set_limit(select_lex);
@@ -2524,6 +2525,8 @@ mysql_execute_command(THD *thd)
{
lex->link_first_table_back(create_table, link_to_local);
create_table->open_type= TABLE_LIST::OPEN_OR_CREATE;
+ if (!thd->locked_tables_mode)
+ global_schema_lock_guard.lock();
}
if (!(res= open_and_lock_tables(thd, lex->query_tables)))
=== modified file 'sql/sql_rename.cc'
--- a/sql/sql_rename.cc 2008-06-11 04:33:36 +0000
+++ b/sql/sql_rename.cc 2008-10-15 12:14:27 +0000
@@ -37,6 +37,7 @@ bool mysql_rename_tables(THD *thd, TABLE
TABLE_LIST *ren_table= 0;
int to_table;
char *rename_log_table[2]= {NULL, NULL};
+ Ha_global_schema_lock_guard global_schema_lock_guard(thd);
DBUG_ENTER("mysql_rename_tables");
/*
@@ -132,6 +133,13 @@ bool mysql_rename_tables(THD *thd, TABLE
DBUG_RETURN(1);
}
}
+
+ /*
+ Rename table may clash with parallell or existing ndb table
+ thus a global shema lock is needed to ensure global serialization.
+ Moreover we do not know here what table type the tables have.
+ */
+ global_schema_lock_guard.lock();
if (lock_table_names(thd, table_list))
goto err;
=== modified file 'sql/sql_table.cc'
--- a/sql/sql_table.cc 2008-09-05 08:57:28 +0000
+++ b/sql/sql_table.cc 2008-10-15 12:14:27 +0000
@@ -1554,6 +1554,14 @@ int mysql_rm_table_part2(THD *thd, TABLE
}
mysql_ha_rm_tables(thd, tables);
+ Ha_global_schema_lock_guard global_schema_lock_guard(thd);
+ /*
+ Here we have no way of knowing if an ndb table is one of
+ the ones that should be dropped, so we take the global
+ schema lock to be safe.
+ */
+ if (!drop_temporary)
+ global_schema_lock_guard.lock();
/*
If we have the table in the definition cache, we don't have to check the
@@ -3739,8 +3747,14 @@ bool mysql_create_table(THD *thd, const
{
MDL_LOCK_DATA *target_lock_data= 0;
bool result;
+ Ha_global_schema_lock_guard global_schema_lock_guard(thd);
DBUG_ENTER("mysql_create_table");
+ if (!(create_info->options & HA_LEX_CREATE_TMP_TABLE) &&
+ !create_info->frm_only &&
+ !internal_tmp_table)
+ global_schema_lock_guard.lock();
+
/* Wait for any database locks */
pthread_mutex_lock(&LOCK_lock_db);
while (!thd->killed &&
@@ -4819,9 +4833,13 @@ bool mysql_create_like_table(THD* thd, T
#ifdef WITH_PARTITION_STORAGE_ENGINE
char tmp_path[FN_REFLEN];
#endif
+ Ha_global_schema_lock_guard global_schema_lock_guard(thd);
DBUG_ENTER("mysql_create_like_table");
+ if (!(create_info->options & HA_LEX_CREATE_TMP_TABLE))
+ global_schema_lock_guard.lock();
+
/*
By opening source table and thus acquiring shared metadata lock on it
we guarantee that it exists and no concurrent DDL operation will mess
@@ -6561,6 +6579,11 @@ bool mysql_alter_table(THD *thd,char *ne
if (wait_if_global_read_lock(thd,0,1))
DBUG_RETURN(TRUE);
+
+ Ha_global_schema_lock_guard global_schema_lock_guard(thd);
+ if (table_type == DB_TYPE_NDBCLUSTER)
+ global_schema_lock_guard.lock();
+
if (lock_table_names(thd, table_list))
{
error= 1;
@@ -6588,6 +6611,21 @@ view_err:
DBUG_RETURN(error);
}
+ Ha_global_schema_lock_guard global_schema_lock_guard(thd);
+ if (table_type == DB_TYPE_NDBCLUSTER ||
+ (create_info->db_type && create_info->db_type->db_type == DB_TYPE_NDBCLUSTER))
+ {
+ /*
+ To avoid deadlock in this situation
+ */
+ if (thd->locked_tables_mode)
+ {
+ my_message(ER_LOCK_OR_ACTIVE_TRANSACTION,
+ ER(ER_LOCK_OR_ACTIVE_TRANSACTION), MYF(0));
+ DBUG_RETURN(TRUE);
+ }
+ global_schema_lock_guard.lock();
+ }
if (!(table= open_n_lock_single_table(thd, table_list, TL_WRITE_ALLOW_READ,
MYSQL_OPEN_TAKE_UPGRADABLE_MDL)))
DBUG_RETURN(TRUE);
=== modified file 'storage/ndb/include/ndb_constants.h'
--- a/storage/ndb/include/ndb_constants.h 2007-03-23 13:06:00 +0000
+++ b/storage/ndb/include/ndb_constants.h 2008-09-30 06:55:35 +0000
@@ -97,4 +97,9 @@
#define NDB_SUM_READONLY 1
#define NDB_SUM_READ_WRITE 2
+/*
+ * SYSTAB_0 reserved keys
+ */
+#define NDB_BACKUP_SEQUENCE 0x1F000000
+
#endif
=== modified file 'storage/ndb/include/util/ndb_opts.h'
--- a/storage/ndb/include/util/ndb_opts.h 2007-07-02 17:08:02 +0000
+++ b/storage/ndb/include/util/ndb_opts.h 2008-10-09 11:53:53 +0000
@@ -100,8 +100,13 @@ const char *opt_debug= 0;
static void ndb_std_print_version()
{
- printf("MySQL distrib %s, for %s (%s)\n",
- NDB_VERSION_STRING,SYSTEM_TYPE,MACHINE_TYPE);
+#ifndef DBUG_OFF
+ const char *suffix= "-debug";
+#else
+ const char *suffix= "";
+#endif
+ printf("MySQL distrib %s%s, for %s (%s)\n",
+ NDB_VERSION_STRING,suffix,SYSTEM_TYPE,MACHINE_TYPE);
}
static void usage();
=== modified file 'storage/ndb/src/kernel/blocks/backup/Backup.cpp'
--- a/storage/ndb/src/kernel/blocks/backup/Backup.cpp 2008-06-09 11:57:17 +0000
+++ b/storage/ndb/src/kernel/blocks/backup/Backup.cpp 2008-09-30 06:55:35 +0000
@@ -61,8 +61,6 @@
static NDB_TICKS startTime;
-static const Uint32 BACKUP_SEQUENCE = 0x1F000000;
-
#ifdef VM_TRACE
#define DEBUG_OUT(x) ndbout << x << endl
#else
@@ -170,7 +168,7 @@ Backup::createSequence(Signal* signal)
UtilSequenceReq * req = (UtilSequenceReq*)signal->getDataPtrSend();
req->senderData = RNIL;
- req->sequenceId = BACKUP_SEQUENCE;
+ req->sequenceId = NDB_BACKUP_SEQUENCE;
req->requestType = UtilSequenceReq::Create;
sendSignal(DBUTIL_REF, GSN_UTIL_SEQUENCE_REQ,
@@ -1131,7 +1129,7 @@ Backup::execBACKUP_REQ(Signal* signal)
ptr.p->masterData.gsn = GSN_UTIL_SEQUENCE_REQ;
utilReq->senderData = ptr.i;
- utilReq->sequenceId = BACKUP_SEQUENCE;
+ utilReq->sequenceId = NDB_BACKUP_SEQUENCE;
utilReq->requestType = UtilSequenceReq::NextVal;
sendSignal(DBUTIL_REF, GSN_UTIL_SEQUENCE_REQ,
signal, UtilSequenceReq::SignalLength, JBB);
=== modified file 'storage/ndb/src/kernel/blocks/trix/Trix.cpp'
--- a/storage/ndb/src/kernel/blocks/trix/Trix.cpp 2008-09-18 14:29:25 +0000
+++ b/storage/ndb/src/kernel/blocks/trix/Trix.cpp 2008-09-26 12:55:06 +0000
@@ -435,6 +435,27 @@ Trix::execDUMP_STATE_ORD(Signal* signal)
// Ignore
}
}
+
+ if (signal->theData[0] == DumpStateOrd::SchemaResourceSnapshot)
+ {
+ RSS_AP_SNAPSHOT_SAVE(c_theSubscriptionRecPool);
+ return;
+ }
+
+ if (signal->theData[0] == DumpStateOrd::SchemaResourceCheckLeak)
+ {
+ RSS_AP_SNAPSHOT_CHECK(c_theSubscriptionRecPool);
+ return;
+ }
+
+ if (signal->theData[0] == 8004)
+ {
+ infoEvent("TRIX: c_theSubscriptionRecPool size: %u free: %u",
+ c_theSubscriptionRecPool.getSize(),
+ c_theSubscriptionRecPool.getNoOfFree());
+ return;
+ }
+
}
// Build index
=== modified file 'storage/ndb/src/kernel/blocks/trix/Trix.hpp'
--- a/storage/ndb/src/kernel/blocks/trix/Trix.hpp 2008-02-20 09:04:29 +0000
+++ b/storage/ndb/src/kernel/blocks/trix/Trix.hpp 2008-09-26 12:55:06 +0000
@@ -131,6 +131,7 @@ private:
* The pool of node records
*/
ArrayPool<SubscriptionRecord> c_theSubscriptionRecPool;
+ RSS_AP_SNAPSHOT(c_theSubscriptionRecPool);
/**
* The list of other subscriptions
=== modified file 'storage/ndb/src/ndbapi/NdbInterpretedCode.cpp'
--- a/storage/ndb/src/ndbapi/NdbInterpretedCode.cpp 2008-05-22 16:22:34 +0000
+++ b/storage/ndb/src/ndbapi/NdbInterpretedCode.cpp 2008-09-29 18:26:36 +0000
@@ -666,17 +666,21 @@ NdbInterpretedCode::add_val(Uint32 attrI
int res= 0;
if ((res= read_attr(6, attrId) != 0))
return res;
-
+
/* Load constant into register 7 */
/* We attempt to use the smallest constant load
* instruction
*/
if (aValue < (1 << 16))
+ {
if ((res= load_const_u16(7, aValue)) != 0)
return res;
+ }
else
+ {
if ((res= load_const_u32(7, aValue)) != 0)
return res;
+ }
/* Add registers 6 and 7 -> 7*/
if ((res= add_reg(7, 6, 7)) != 0)
@@ -700,6 +704,7 @@ NdbInterpretedCode::add_val(Uint32 attrI
* instruction
*/
if ((aValue >> 32) == 0)
+ {
if (aValue < (1 << 16))
{
if ((res= load_const_u16(7, aValue)) != 0)
@@ -710,6 +715,7 @@ NdbInterpretedCode::add_val(Uint32 attrI
if ((res= load_const_u32(7, aValue)) != 0)
return res;
}
+ }
else
if ((res= load_const_u64(7, aValue)) != 0)
return res;
@@ -717,7 +723,7 @@ NdbInterpretedCode::add_val(Uint32 attrI
/* Add registers 6 and 7 -> 7*/
if ((res= add_reg(7, 6, 7)) != 0)
return res;
-
+
/* Write back */
return write_attr(attrId, 7);
}
@@ -736,11 +742,15 @@ NdbInterpretedCode::sub_val(Uint32 attrI
* instruction
*/
if (aValue < (1 << 16))
+ {
if ((res= load_const_u16(7, aValue)) != 0)
- return res;
+ return res;
+ }
else
+ {
if ((res= load_const_u32(7, aValue)) != 0)
return res;
+ }
/* Subtract register (R7=R6-R7)*/
if ((res= sub_reg(7, 6, 7)) != 0)
@@ -764,6 +774,7 @@ NdbInterpretedCode::sub_val(Uint32 attrI
* instruction
*/
if ((aValue >> 32) == 0)
+ {
if (aValue < (1 << 16))
{
if ((res= load_const_u16(7, aValue)) != 0)
@@ -774,9 +785,12 @@ NdbInterpretedCode::sub_val(Uint32 attrI
if ((res= load_const_u32(7, aValue)) != 0)
return res;
}
+ }
else
+ {
if ((res= load_const_u64(7, aValue)) != 0)
return res;
+ }
/* Subtract register (R7=R6-R7)*/
if ((res= sub_reg(7, 6, 7)) != 0)
=== modified file 'storage/ndb/test/ndbapi/Makefile.am'
--- a/storage/ndb/test/ndbapi/Makefile.am 2008-02-21 14:24:09 +0000
+++ b/storage/ndb/test/ndbapi/Makefile.am 2008-09-30 08:18:41 +0000
@@ -57,7 +57,8 @@ testIndexStat \
ndbapi_50compat0 \
ndbapi_50compat1 \
testNDBT \
-NdbRepStress
+NdbRepStress \
+msa
EXTRA_PROGRAMS = \
test_event \
@@ -115,6 +116,7 @@ testSRBank_SOURCES = testSRBank.cpp
test_event_merge_SOURCES = test_event_merge.cpp
test_event_multi_table_SOURCES = test_event_multi_table.cpp
testIndexStat_SOURCES = testIndexStat.cpp
+msa_SOURCES = msa.cpp
ndbapi_50compat0_CPPFLAGS = -DNDBAPI_50_COMPAT
ndbapi_50compat0_SOURCES = ndbapi_50compat0.cpp
=== modified file 'storage/ndb/test/ndbapi/bench/mainAsyncGenerator.cpp'
--- a/storage/ndb/test/ndbapi/bench/mainAsyncGenerator.cpp 2006-12-23 19:20:40 +0000
+++ b/storage/ndb/test/ndbapi/bench/mainAsyncGenerator.cpp 2008-09-30 08:18:41 +0000
@@ -25,6 +25,8 @@
#include "userInterface.h"
#include "dbGenerator.h"
+#include "ndb_schema.hpp"
+
static int numProcesses;
static int numSeconds;
@@ -33,6 +35,8 @@ static int parallellism;
static int millisSendPoll;
static int minEventSendPoll;
static int forceSendPoll;
+static bool useNdbRecord;
+static bool useCombUpd;
static ThreadData *data;
static Ndb_cluster_connection *g_cluster_connection= 0;
@@ -53,8 +57,8 @@ static void usage(const char *prog)
++progname;
ndbout_c(
- "Usage: %s [-proc <num>] [-warm <num>] [-time <num>] [ -p <num>] "
- "[-t <num> ] [ -e <num> ] [ -f <num>] \n"
+ "Usage: %s [-proc <num>] [-warm <num>] [-time <num>] [ -p <num>]"
+ "[-t <num> ] [ -e <num> ] [ -f <num>] [ -ndbrecord ]\n"
" -proc <num> Specifies that <num> is the number of\n"
" threads. The default is 1.\n"
" -time <num> Specifies that the test will run for <num> sec.\n"
@@ -68,7 +72,11 @@ static void usage(const char *prog)
"sendPoll\n"
" Default is 1\n"
" -f <num> force parameter to sendPoll\n"
- " Default is 0\n",
+ " Default is 0\n"
+ " -ndbrecord Use NdbRecord Api.\n"
+ " Default is to use old Api\n"
+ " -combupdread Use update pre-read operation where possible\n"
+ " Default is to use separate read+update ops\n",
progname);
}
@@ -85,7 +93,8 @@ parse_args(int argc, const char **argv)
millisSendPoll = 10000;
minEventSendPoll = 1;
forceSendPoll = 0;
-
+ useNdbRecord = false;
+ useCombUpd = false;
i = 1;
while (i < argc){
@@ -156,6 +165,15 @@ parse_args(int argc, const char **argv)
}
i += 2;
}
+ else if (strcmp("-ndbrecord",argv[i]) == 0) {
+ useNdbRecord= true;
+ i++;
+ }
+ else if (strcmp("-combupdread",argv[i]) == 0) {
+ /* Comb up some dread */
+ useCombUpd= true;
+ i++;
+ }
else {
return 1;
}
@@ -169,6 +187,10 @@ parse_args(int argc, const char **argv)
ndbout_c("exiting...");
return 1;
}
+ if (useNdbRecord && useCombUpd){
+ ndbout_c("NdbRecord does not currently support combined update "
+ "and read. Using separate read and update ops");
+ }
return 0;
}
@@ -232,6 +254,7 @@ print_stats(const char *title,
ndbout_c("Processor : %s", name);
ndbout_c("Number of Proc: %d",numProc);
ndbout_c("Parallellism : %d", parallellism);
+ ndbout_c("UseNdbRecord : %u", useNdbRecord);
ndbout_c("\n");
if( gen->totalTransactions == 0 ) {
@@ -316,19 +339,192 @@ NDB_COMMAND(DbAsyncGenerator, "DbAsyncGe
ndbout << "Cluster nodes not ready in 30 seconds." << endl;
return 0;
}
-
+
+ NdbRecordSharedData* ndbRecordSharedDataPtr= NULL;
+
g_cluster_connection= &con;
data = (ThreadData*)malloc((numProcesses*parallellism)*sizeof(ThreadData));
-
+
+ NdbInterpretedCode* prog1= 0;
+ NdbInterpretedCode* prog2= 0;
+ NdbInterpretedCode* prog3= 0;
+
+ if (useNdbRecord)
+ {
+ /* We'll create NdbRecord structures to match the TransactionData
+ * struct
+ */
+
+ ndbRecordSharedDataPtr= (NdbRecordSharedData*)
+ malloc(sizeof(NdbRecordSharedData));
+ Ndb* tempNdb= asyncDbConnect(1);
+ NdbDictionary::Dictionary* dict= tempNdb->getDictionary();
+
+ NdbDictionary::RecordSpecification cols[7];
+
+ const NdbDictionary::Table* tab= dict->getTable(SUBSCRIBER_TABLE);
+ cols[0].column= tab->getColumn((int) IND_SUBSCRIBER_NUMBER);
+ cols[0].offset= offsetof(TransactionData, number);
+ cols[0].nullbit_byte_offset= 0;
+ cols[0].nullbit_bit_in_byte= 0;
+ cols[1].column= tab->getColumn((int) IND_SUBSCRIBER_NAME);
+ cols[1].offset= offsetof(TransactionData, name);
+ cols[1].nullbit_byte_offset= 0;
+ cols[1].nullbit_bit_in_byte= 0;
+ cols[2].column= tab->getColumn((int) IND_SUBSCRIBER_GROUP);
+ cols[2].offset= offsetof(TransactionData, group_id);
+ cols[2].nullbit_byte_offset= 0;
+ cols[2].nullbit_bit_in_byte= 0;
+ cols[3].column= tab->getColumn((int) IND_SUBSCRIBER_LOCATION);
+ cols[3].offset= offsetof(TransactionData, location);
+ cols[3].nullbit_byte_offset= 0;
+ cols[3].nullbit_bit_in_byte= 0;
+ cols[4].column= tab->getColumn((int) IND_SUBSCRIBER_SESSIONS);
+ cols[4].offset= offsetof(TransactionData, sessions);
+ cols[4].nullbit_byte_offset= 0;
+ cols[4].nullbit_bit_in_byte= 0;
+ cols[5].column= tab->getColumn((int) IND_SUBSCRIBER_CHANGED_BY);
+ cols[5].offset= offsetof(TransactionData, changed_by);
+ cols[5].nullbit_byte_offset= 0;
+ cols[5].nullbit_bit_in_byte= 0;
+ cols[6].column= tab->getColumn((int) IND_SUBSCRIBER_CHANGED_TIME);
+ cols[6].offset= offsetof(TransactionData, changed_time);
+ cols[6].nullbit_byte_offset= 0;
+ cols[6].nullbit_bit_in_byte= 0;
+
+ ndbRecordSharedDataPtr->subscriberTableNdbRecord=
+ dict->createRecord(tab, cols, 7, sizeof(cols[0]), 0);
+
+ if (ndbRecordSharedDataPtr->subscriberTableNdbRecord == NULL)
+ {
+ ndbout << "Error creating record 1 : " << dict->getNdbError() << endl;
+ return -1;
+ }
+
+ tab= dict->getTable(GROUP_TABLE);
+ cols[0].column= tab->getColumn((int) IND_GROUP_ID);
+ cols[0].offset= offsetof(TransactionData, group_id);
+ cols[0].nullbit_byte_offset= 0;
+ cols[0].nullbit_bit_in_byte= 0;
+ /* GROUP_NAME not used via NdbRecord */
+ cols[1].column= tab->getColumn((int) IND_GROUP_ALLOW_READ);
+ cols[1].offset= offsetof(TransactionData, permission);
+ cols[1].nullbit_byte_offset= 0;
+ cols[1].nullbit_bit_in_byte= 0;
+ cols[2].column= tab->getColumn((int) IND_GROUP_ALLOW_INSERT);
+ cols[2].offset= offsetof(TransactionData, permission);
+ cols[2].nullbit_byte_offset= 0;
+ cols[2].nullbit_bit_in_byte= 0;
+ cols[3].column= tab->getColumn((int) IND_GROUP_ALLOW_DELETE);
+ cols[3].offset= offsetof(TransactionData, permission);
+ cols[3].nullbit_byte_offset= 0;
+ cols[3].nullbit_bit_in_byte= 0;
+
+ ndbRecordSharedDataPtr->groupTableNdbRecord=
+ dict->createRecord(tab, cols, 4, sizeof(cols[0]), 0);
+
+ if (ndbRecordSharedDataPtr->groupTableNdbRecord == NULL)
+ {
+ ndbout << "Error creating record 2: " << dict->getNdbError() << endl;
+ return -1;
+ }
+
+ tab= dict->getTable(SESSION_TABLE);
+ cols[0].column= tab->getColumn((int) IND_SESSION_SUBSCRIBER);
+ cols[0].offset= offsetof(TransactionData, number);
+ cols[0].nullbit_byte_offset= 0;
+ cols[0].nullbit_bit_in_byte= 0;
+ cols[1].column= tab->getColumn((int) IND_SESSION_SERVER);
+ cols[1].offset= offsetof(TransactionData, server_id);
+ cols[1].nullbit_byte_offset= 0;
+ cols[1].nullbit_bit_in_byte= 0;
+ cols[2].column= tab->getColumn((int) IND_SESSION_DATA);
+ cols[2].offset= offsetof(TransactionData, session_details);
+ cols[2].nullbit_byte_offset= 0;
+ cols[2].nullbit_bit_in_byte= 0;
+
+ ndbRecordSharedDataPtr->sessionTableNdbRecord=
+ dict->createRecord(tab, cols, 3, sizeof(cols[0]), 0);
+
+ if (ndbRecordSharedDataPtr->sessionTableNdbRecord == NULL)
+ {
+ ndbout << "Error creating record 3 : " << dict->getNdbError() << endl;
+ return -1;
+ }
+
+ tab= dict->getTable(SERVER_TABLE);
+ cols[0].column= tab->getColumn((int) IND_SERVER_SUBSCRIBER_SUFFIX);
+ cols[0].offset= offsetof(TransactionData, suffix);
+ cols[0].nullbit_byte_offset= 0;
+ cols[0].nullbit_bit_in_byte= 0;
+ cols[1].column= tab->getColumn((int) IND_SERVER_ID);
+ cols[1].offset= offsetof(TransactionData, server_id);
+ cols[1].nullbit_byte_offset= 0;
+ cols[1].nullbit_bit_in_byte= 0;
+ /* SERVER_NAME not used via NdbRecord*/
+ /* SERVER_READS not used via NdbRecord */
+ /* SERVER_INSERTS not used via NdbRecord */
+ /* SERVER_DELETES not used via NdbRecord */
+
+ ndbRecordSharedDataPtr->serverTableNdbRecord=
+ dict->createRecord(tab, cols, 2, sizeof(cols[0]), 0);
+
+ if (ndbRecordSharedDataPtr->serverTableNdbRecord == NULL)
+ {
+ ndbout << "Error creating record 4 : " << dict->getNdbError() << endl;
+ return -1;
+ }
+
+ /* Create program to increment server reads column */
+ prog1= new NdbInterpretedCode(tab);
+
+ if (prog1->add_val(IND_SERVER_READS, (Uint32)1) ||
+ prog1->interpret_exit_ok() ||
+ prog1->finalise())
+ {
+ ndbout << "Program 1 definition failed, exiting." << endl;
+ return -1;
+ }
+
+ prog2= new NdbInterpretedCode(tab);
+
+ if (prog2->add_val(IND_SERVER_INSERTS, (Uint32)1) ||
+ prog2->interpret_exit_ok() ||
+ prog2->finalise())
+ {
+ ndbout << "Program 2 definition failed, exiting." << endl;
+ return -1;
+ }
+
+ prog3= new NdbInterpretedCode(tab);
+
+ if (prog3->add_val(IND_SERVER_DELETES, (Uint32)1) ||
+ prog3->interpret_exit_ok() ||
+ prog3->finalise())
+ {
+ ndbout << "Program 3 definition failed, exiting." << endl;
+ return -1;
+ }
+
+ ndbRecordSharedDataPtr->incrServerReadsProg= prog1;
+ ndbRecordSharedDataPtr->incrServerInsertsProg= prog2;
+ ndbRecordSharedDataPtr->incrServerDeletesProg= prog3;
+
+ asyncDbDisconnect(tempNdb);
+ }
+
for(i = 0; i < numProcesses; i++) {
for(j = 0; j<parallellism; j++){
- data[i*parallellism+j].warmUpSeconds = numWarmSeconds;
- data[i*parallellism+j].testSeconds = numSeconds;
- data[i*parallellism+j].coolDownSeconds = numWarmSeconds;
- data[i*parallellism+j].randomSeed =
+ int tid= i*parallellism + j;
+ data[tid].warmUpSeconds = numWarmSeconds;
+ data[tid].testSeconds = numSeconds;
+ data[tid].coolDownSeconds = numWarmSeconds;
+ data[tid].randomSeed =
NdbTick_CurrentMillisecond()+i+j;
- data[i*parallellism+j].changedTime = 0;
- data[i*parallellism+j].runState = Runnable;
+ data[tid].changedTime = 0;
+ data[tid].runState = Runnable;
+ data[tid].ndbRecordSharedData = ndbRecordSharedDataPtr;
+ data[tid].useCombinedUpdate = useCombUpd;
}
sprintf(threadName, "AsyncThread[%d]", i);
pThread = NdbThread_Create(threadRoutine,
@@ -355,6 +551,14 @@ NDB_COMMAND(DbAsyncGenerator, "DbAsyncGe
}
ndbout_c("All threads have finished");
+
+ if (useNdbRecord)
+ {
+ free(ndbRecordSharedDataPtr);
+ delete(prog1);
+ delete(prog2);
+ delete(prog3);
+ }
/*-------------------------------------------*/
/* Clear all structures for total statistics */
@@ -412,7 +616,6 @@ NDB_COMMAND(DbAsyncGenerator, "DbAsyncGe
#include <sys/types.h>
#include <time.h>
-#include "ndb_schema.hpp"
#include "ndb_error.hpp"
#include "userInterface.h"
#include <NdbMutex.h>
=== modified file 'storage/ndb/test/ndbapi/bench/ndb_async2.cpp'
--- a/storage/ndb/test/ndbapi/bench/ndb_async2.cpp 2006-12-23 19:20:40 +0000
+++ b/storage/ndb/test/ndbapi/bench/ndb_async2.cpp 2008-09-30 08:18:41 +0000
@@ -66,6 +66,10 @@ startTransaction(Ndb * pNDB, ThreadData
#endif
}
+// NdbRecord helper macros
+#define SET_MASK(mask, attrId) \
+ mask[attrId >> 3] |= (1 << (attrId & 7))
+
void
start_T1(Ndb * pNDB, ThreadData * td, int async){
@@ -78,17 +82,46 @@ start_T1(Ndb * pNDB, ThreadData * td, in
NdbSleep_MilliSleep(10);
}
- NdbOperation *MyOp = pCON->getNdbOperation(SUBSCRIBER_TABLE);
- if (MyOp != NULL) {
- MyOp->updateTuple();
- MyOp->equal(IND_SUBSCRIBER_NUMBER,
- td->transactionData.number);
- MyOp->setValue(IND_SUBSCRIBER_LOCATION,
- (char *)&td->transactionData.location);
- MyOp->setValue(IND_SUBSCRIBER_CHANGED_BY,
- td->transactionData.changed_by);
- MyOp->setValue(IND_SUBSCRIBER_CHANGED_TIME,
- td->transactionData.changed_time);
+ const NdbOperation* op= NULL;
+
+ if (td->ndbRecordSharedData)
+ {
+ char* rowPtr= (char*) &td->transactionData;
+ const NdbRecord* record= td->ndbRecordSharedData->
+ subscriberTableNdbRecord;
+ Uint32 m=0;
+ unsigned char* mask= (unsigned char*) &m;
+
+ //SET_MASK(mask, IND_SUBSCRIBER_NUMBER);
+ SET_MASK(mask, IND_SUBSCRIBER_LOCATION);
+ SET_MASK(mask, IND_SUBSCRIBER_CHANGED_BY);
+ SET_MASK(mask, IND_SUBSCRIBER_CHANGED_TIME);
+
+ op= pCON->updateTuple(record,
+ rowPtr,
+ record,
+ rowPtr,
+ mask);
+ }
+ else
+ {
+ NdbOperation *MyOp = pCON->getNdbOperation(SUBSCRIBER_TABLE);
+ op= MyOp;
+ if (MyOp != NULL) {
+ MyOp->updateTuple();
+ MyOp->equal(IND_SUBSCRIBER_NUMBER,
+ td->transactionData.number);
+ MyOp->setValue(IND_SUBSCRIBER_LOCATION,
+ (char *)&td->transactionData.location);
+ MyOp->setValue(IND_SUBSCRIBER_CHANGED_BY,
+ td->transactionData.changed_by);
+ MyOp->setValue(IND_SUBSCRIBER_CHANGED_TIME,
+ td->transactionData.changed_time);
+ }
+ }
+
+ if (op != NULL)
+ {
if (async == 1) {
pCON->executeAsynchPrepare( Commit , T1_Callback, td);
} else {
@@ -97,7 +130,7 @@ start_T1(Ndb * pNDB, ThreadData * td, in
return;
}//if
} else {
- CHECK_NULL(MyOp, "T1: getNdbOperation", td, pCON->getNdbError());
+ CHECK_NULL(NULL, "T1: getNdbOperation", td, pCON->getNdbError());
}//if
}
@@ -146,21 +179,43 @@ start_T2(Ndb * pNDB, ThreadData * td, in
NdbSleep_MilliSleep(10);
}
- NdbOperation *MyOp= pCON->getNdbOperation(SUBSCRIBER_TABLE);
- CHECK_NULL(MyOp, "T2: getNdbOperation", td,
- pCON->getNdbError());
-
- MyOp->readTuple();
- MyOp->equal(IND_SUBSCRIBER_NUMBER,
- td->transactionData.number);
- MyOp->getValue(IND_SUBSCRIBER_LOCATION,
- (char *)&td->transactionData.location);
- MyOp->getValue(IND_SUBSCRIBER_CHANGED_BY,
- td->transactionData.changed_by);
- MyOp->getValue(IND_SUBSCRIBER_CHANGED_TIME,
- td->transactionData.changed_time);
- MyOp->getValue(IND_SUBSCRIBER_NAME,
- td->transactionData.name);
+ if (td->ndbRecordSharedData)
+ {
+ char* rowPtr= (char*) &td->transactionData;
+ const NdbRecord* record= td->ndbRecordSharedData->
+ subscriberTableNdbRecord;
+ Uint32 m=0;
+ unsigned char* mask= (unsigned char*) &m;
+
+ SET_MASK(mask, IND_SUBSCRIBER_LOCATION);
+ SET_MASK(mask, IND_SUBSCRIBER_CHANGED_BY);
+ SET_MASK(mask, IND_SUBSCRIBER_CHANGED_TIME);
+ SET_MASK(mask, IND_SUBSCRIBER_NAME);
+
+ const NdbOperation* MyOp= pCON->readTuple(record, rowPtr, record, rowPtr,
+ NdbOperation::LM_Read, mask);
+ CHECK_NULL((void*) MyOp, "T2: readTuple", td,
+ pCON->getNdbError());
+ }
+ else
+ {
+ NdbOperation *MyOp= pCON->getNdbOperation(SUBSCRIBER_TABLE);
+ CHECK_NULL(MyOp, "T2: getNdbOperation", td,
+ pCON->getNdbError());
+
+ MyOp->readTuple();
+ MyOp->equal(IND_SUBSCRIBER_NUMBER,
+ td->transactionData.number);
+ MyOp->getValue(IND_SUBSCRIBER_LOCATION,
+ (char *)&td->transactionData.location);
+ MyOp->getValue(IND_SUBSCRIBER_CHANGED_BY,
+ td->transactionData.changed_by);
+ MyOp->getValue(IND_SUBSCRIBER_CHANGED_TIME,
+ td->transactionData.changed_time);
+ MyOp->getValue(IND_SUBSCRIBER_NAME,
+ td->transactionData.name);
+ }
+
if (async == 1) {
pCON->executeAsynchPrepare( Commit , T2_Callback, td);
} else {
@@ -183,6 +238,7 @@ T2_Callback(int result, NdbConnection *
start_T2(td->pNDB, td, stat_async);
return;
}//if
+
td->pNDB->closeTransaction(pCON);
complete_T2(td);
}
@@ -217,24 +273,48 @@ start_T3(Ndb * pNDB, ThreadData * td, in
CHECK_ALLOWED_ERROR("T3-1: startTransaction", td, pNDB->getNdbError());
NdbSleep_MilliSleep(10);
}
-
- NdbOperation *MyOp= pCON->getNdbOperation(SUBSCRIBER_TABLE);
- CHECK_NULL(MyOp, "T3-1: getNdbOperation", td,
- pCON->getNdbError());
-
- MyOp->readTuple();
- MyOp->equal(IND_SUBSCRIBER_NUMBER,
- td->transactionData.number);
- MyOp->getValue(IND_SUBSCRIBER_LOCATION,
- (char *)&td->transactionData.location);
- MyOp->getValue(IND_SUBSCRIBER_CHANGED_BY,
- td->transactionData.changed_by);
- MyOp->getValue(IND_SUBSCRIBER_CHANGED_TIME,
- td->transactionData.changed_time);
- MyOp->getValue(IND_SUBSCRIBER_GROUP,
- (char *)&td->transactionData.group_id);
- MyOp->getValue(IND_SUBSCRIBER_SESSIONS,
- (char *)&td->transactionData.sessions);
+
+ const NdbOperation* op;
+
+ if (td->ndbRecordSharedData)
+ {
+ char* rowPtr= (char*) &td->transactionData;
+ const NdbRecord* record= td->ndbRecordSharedData->
+ subscriberTableNdbRecord;
+ Uint32 m=0;
+ unsigned char* mask= (unsigned char*) &m;
+
+ SET_MASK(mask, IND_SUBSCRIBER_LOCATION);
+ SET_MASK(mask, IND_SUBSCRIBER_CHANGED_BY);
+ SET_MASK(mask, IND_SUBSCRIBER_CHANGED_TIME);
+ SET_MASK(mask, IND_SUBSCRIBER_GROUP);
+ SET_MASK(mask, IND_SUBSCRIBER_SESSIONS);
+
+ op= pCON->readTuple(record, rowPtr, record, rowPtr,
+ NdbOperation::LM_Read, mask);
+ }
+ else
+ {
+ NdbOperation *MyOp= pCON->getNdbOperation(SUBSCRIBER_TABLE);
+ op= MyOp;
+ CHECK_NULL(MyOp, "T3-1: getNdbOperation", td,
+ pCON->getNdbError());
+
+ MyOp->readTuple();
+ MyOp->equal(IND_SUBSCRIBER_NUMBER,
+ td->transactionData.number);
+ MyOp->getValue(IND_SUBSCRIBER_LOCATION,
+ (char *)&td->transactionData.location);
+ MyOp->getValue(IND_SUBSCRIBER_CHANGED_BY,
+ td->transactionData.changed_by);
+ MyOp->getValue(IND_SUBSCRIBER_CHANGED_TIME,
+ td->transactionData.changed_time);
+ MyOp->getValue(IND_SUBSCRIBER_GROUP,
+ (char *)&td->transactionData.group_id);
+ MyOp->getValue(IND_SUBSCRIBER_SESSIONS,
+ (char *)&td->transactionData.sessions);
+ }
+
stat_async = async;
if (async == 1) {
pCON->executeAsynchPrepare( NoCommit , T3_Callback_1, td);
@@ -259,15 +339,35 @@ T3_Callback_1(int result, NdbConnection
return;
}//if
- NdbOperation * MyOp = pCON->getNdbOperation(GROUP_TABLE);
- CHECK_NULL(MyOp, "T3-2: getNdbOperation", td,
- pCON->getNdbError());
-
- MyOp->readTuple();
- MyOp->equal(IND_GROUP_ID,
- (char*)&td->transactionData.group_id);
- MyOp->getValue(IND_GROUP_ALLOW_READ,
- (char *)&td->transactionData.permission);
+ const NdbOperation* op= NULL;
+
+ if (td->ndbRecordSharedData)
+ {
+ char* rowPtr= (char*) &td->transactionData;
+ const NdbRecord* record= td->ndbRecordSharedData->
+ groupTableNdbRecord;
+ Uint32 m=0;
+ unsigned char* mask= (unsigned char*) &m;
+
+ SET_MASK(mask, IND_GROUP_ALLOW_READ);
+
+ op= pCON->readTuple(record, rowPtr, record, rowPtr,
+ NdbOperation::LM_Read, mask);
+ }
+ else
+ {
+ NdbOperation * MyOp = pCON->getNdbOperation(GROUP_TABLE);
+ op= MyOp;
+ CHECK_NULL(MyOp, "T3-2: getNdbOperation", td,
+ pCON->getNdbError());
+
+ MyOp->readTuple();
+ MyOp->equal(IND_GROUP_ID,
+ (char*)&td->transactionData.group_id);
+ MyOp->getValue(IND_GROUP_ALLOW_READ,
+ (char *)&td->transactionData.permission);
+ }
+
if (stat_async == 1) {
pCON->executeAsynchPrepare( NoCommit , T3_Callback_2, td);
} else {
@@ -305,30 +405,66 @@ T3_Callback_2(int result, NdbConnection
SUBSCRIBER_NUMBER_SUFFIX_LENGTH,
td->transactionData.suffix);
- /* Operation 3 */
- NdbOperation * MyOp = pCON->getNdbOperation(SESSION_TABLE);
- CHECK_NULL(MyOp, "T3-3: getNdbOperation", td,
- pCON->getNdbError());
-
- MyOp->simpleRead();
- MyOp->equal(IND_SESSION_SUBSCRIBER,
- (char*)td->transactionData.number);
- MyOp->equal(IND_SESSION_SERVER,
- (char*)&td->transactionData.server_id);
- MyOp->getValue(IND_SESSION_DATA,
- (char *)td->transactionData.session_details);
-
- /* Operation 4 */
- MyOp = pCON->getNdbOperation(SERVER_TABLE);
- CHECK_NULL(MyOp, "T3-4: getNdbOperation", td,
- pCON->getNdbError());
-
- MyOp->interpretedUpdateTuple();
- MyOp->equal(IND_SERVER_ID,
- (char*)&td->transactionData.server_id);
- MyOp->equal(IND_SERVER_SUBSCRIBER_SUFFIX,
- (char*)td->transactionData.suffix);
- MyOp->incValue(IND_SERVER_READS, (uint32)1);
+ /* Operations 3 + 4 */
+ if (td->ndbRecordSharedData)
+ {
+ /* Op 3 */
+ char* rowPtr= (char*) &td->transactionData;
+ const NdbRecord* record= td->ndbRecordSharedData->
+ sessionTableNdbRecord;
+ Uint32 m=0;
+ unsigned char* mask= (unsigned char*) &m;
+
+ SET_MASK(mask, IND_SESSION_DATA);
+
+ const NdbOperation* MyOp = pCON->readTuple(record, rowPtr, record, rowPtr,
+ NdbOperation::LM_SimpleRead,
+ mask);
+ CHECK_NULL((void*) MyOp, "T3-3: readTuple", td,
+ pCON->getNdbError());
+
+ /* Op 4 */
+ record= td->ndbRecordSharedData->
+ serverTableNdbRecord;
+ m= 0;
+
+ /* Attach interpreted program */
+ NdbOperation::OperationOptions opts;
+ opts.optionsPresent= NdbOperation::OperationOptions::OO_INTERPRETED;
+ opts.interpretedCode= td->ndbRecordSharedData->incrServerReadsProg;
+
+ MyOp= pCON->updateTuple(record, rowPtr, record, rowPtr, mask,
+ &opts,
+ sizeof(opts));
+ CHECK_NULL((void*) MyOp, "T3-3: updateTuple", td,
+ pCON->getNdbError());
+ }
+ else
+ {
+ NdbOperation * MyOp = pCON->getNdbOperation(SESSION_TABLE);
+ CHECK_NULL(MyOp, "T3-3: getNdbOperation", td,
+ pCON->getNdbError());
+
+ MyOp->simpleRead();
+ MyOp->equal(IND_SESSION_SUBSCRIBER,
+ (char*)td->transactionData.number);
+ MyOp->equal(IND_SESSION_SERVER,
+ (char*)&td->transactionData.server_id);
+ MyOp->getValue(IND_SESSION_DATA,
+ (char *)td->transactionData.session_details);
+
+ MyOp = pCON->getNdbOperation(SERVER_TABLE);
+ CHECK_NULL(MyOp, "T3-4: getNdbOperation", td,
+ pCON->getNdbError());
+
+ MyOp->interpretedUpdateTuple();
+ MyOp->equal(IND_SERVER_ID,
+ (char*)&td->transactionData.server_id);
+ MyOp->equal(IND_SERVER_SUBSCRIBER_SUFFIX,
+ (char*)td->transactionData.suffix);
+ MyOp->incValue(IND_SERVER_READS, (uint32)1);
+ }
+
td->transactionData.branchExecuted = 1;
} else {
DEBUG3("T3(%.*s, %.2d): - Callback 2 - no read",
@@ -359,6 +495,7 @@ T3_Callback_3(int result, NdbConnection
start_T3(td->pNDB, td, stat_async);
return;
}//if
+
td->pNDB->closeTransaction(pCON);
complete_T3(td);
}
@@ -392,26 +529,120 @@ start_T4(Ndb * pNDB, ThreadData * td, in
CHECK_ALLOWED_ERROR("T4-1: startTransaction", td, pNDB->getNdbError());
NdbSleep_MilliSleep(10);
}
-
- NdbOperation *MyOp= pCON->getNdbOperation(SUBSCRIBER_TABLE);
- CHECK_NULL(MyOp, "T4-1: getNdbOperation", td,
- pCON->getNdbError());
-
- MyOp->interpretedUpdateTuple();
- MyOp->equal(IND_SUBSCRIBER_NUMBER,
- td->transactionData.number);
- MyOp->getValue(IND_SUBSCRIBER_LOCATION,
- (char *)&td->transactionData.location);
- MyOp->getValue(IND_SUBSCRIBER_CHANGED_BY,
- td->transactionData.changed_by);
- MyOp->getValue(IND_SUBSCRIBER_CHANGED_TIME,
- td->transactionData.changed_time);
- MyOp->getValue(IND_SUBSCRIBER_GROUP,
- (char *)&td->transactionData.group_id);
- MyOp->getValue(IND_SUBSCRIBER_SESSIONS,
- (char *)&td->transactionData.sessions);
- MyOp->incValue(IND_SUBSCRIBER_SESSIONS,
- (uint32)td->transactionData.server_bit);
+
+ if (td->ndbRecordSharedData)
+ {
+ char* rowPtr= (char*) &td->transactionData;
+ const NdbRecord* record= td->ndbRecordSharedData->
+ subscriberTableNdbRecord;
+ Uint32 m=0;
+ unsigned char* mask= (unsigned char*) &m;
+
+ SET_MASK(mask, IND_SUBSCRIBER_LOCATION);
+ SET_MASK(mask, IND_SUBSCRIBER_CHANGED_BY);
+ SET_MASK(mask, IND_SUBSCRIBER_CHANGED_TIME);
+ SET_MASK(mask, IND_SUBSCRIBER_GROUP);
+ SET_MASK(mask, IND_SUBSCRIBER_SESSIONS);
+
+ const NdbOperation* MyOp= pCON->readTuple(record, rowPtr, record, rowPtr,
+ NdbOperation::LM_Read,
+ mask);
+ CHECK_NULL((void*)MyOp, "T4-1: readTuple", td,
+ pCON->getNdbError());
+
+ m= 0;
+
+ /* Create program to add something to the subscriber
+ * sessions column
+ */
+ Uint32 codeBuf[20];
+
+ for (Uint32 p=0; p<20; p++)
+ codeBuf[p]= 0;
+
+ NdbInterpretedCode program(pNDB->getDictionary()->
+ getTable(SUBSCRIBER_TABLE),
+ codeBuf,
+ 20);
+
+ if (program.add_val(IND_SUBSCRIBER_SESSIONS,
+ (uint32)td->transactionData.server_bit) ||
+ program.interpret_exit_ok() ||
+ program.finalise())
+ {
+ CHECK_NULL(NULL , "T4-1: Program create failed", td,
+ program.getNdbError());
+ }
+
+ NdbOperation::OperationOptions opts;
+ opts.optionsPresent= NdbOperation::OperationOptions::OO_INTERPRETED;
+ opts.interpretedCode= &program;
+
+ MyOp= pCON->updateTuple(record, rowPtr, record, rowPtr,
+ mask,
+ &opts,
+ sizeof(opts));
+ CHECK_NULL((void*)MyOp, "T4-1: updateTuple", td,
+ pCON->getNdbError());
+
+ }
+ else
+ {
+ /* Use old Api */
+ if (td->useCombinedUpdate)
+ {
+ NdbOperation *MyOp= pCON->getNdbOperation(SUBSCRIBER_TABLE);
+ CHECK_NULL(MyOp, "T4-1: getNdbOperation", td,
+ pCON->getNdbError());
+
+ MyOp->interpretedUpdateTuple();
+ MyOp->equal(IND_SUBSCRIBER_NUMBER,
+ td->transactionData.number);
+ MyOp->getValue(IND_SUBSCRIBER_LOCATION,
+ (char *)&td->transactionData.location);
+ MyOp->getValue(IND_SUBSCRIBER_CHANGED_BY,
+ td->transactionData.changed_by);
+ MyOp->getValue(IND_SUBSCRIBER_CHANGED_TIME,
+ td->transactionData.changed_time);
+ MyOp->getValue(IND_SUBSCRIBER_GROUP,
+ (char *)&td->transactionData.group_id);
+ MyOp->getValue(IND_SUBSCRIBER_SESSIONS,
+ (char *)&td->transactionData.sessions);
+ MyOp->incValue(IND_SUBSCRIBER_SESSIONS,
+ (uint32)td->transactionData.server_bit);
+ }
+ else
+ {
+ /* Separate read op + update op
+ * Relies on relative ordering of operation execution on a single
+ * row
+ */
+ NdbOperation *MyOp= pCON->getNdbOperation(SUBSCRIBER_TABLE);
+ CHECK_NULL(MyOp, "T4-1: getNdbOperation (read)", td,
+ pCON->getNdbError());
+ MyOp->readTuple();
+ MyOp->equal(IND_SUBSCRIBER_NUMBER,
+ td->transactionData.number);
+ MyOp->getValue(IND_SUBSCRIBER_LOCATION,
+ (char *)&td->transactionData.location);
+ MyOp->getValue(IND_SUBSCRIBER_CHANGED_BY,
+ td->transactionData.changed_by);
+ MyOp->getValue(IND_SUBSCRIBER_CHANGED_TIME,
+ td->transactionData.changed_time);
+ MyOp->getValue(IND_SUBSCRIBER_GROUP,
+ (char *)&td->transactionData.group_id);
+ MyOp->getValue(IND_SUBSCRIBER_SESSIONS,
+ (char *)&td->transactionData.sessions);
+ MyOp= pCON->getNdbOperation(SUBSCRIBER_TABLE);
+ CHECK_NULL(MyOp, "T4-1: getNdbOperation (update)", td,
+ pCON->getNdbError());
+ MyOp->interpretedUpdateTuple();
+ MyOp->equal(IND_SUBSCRIBER_NUMBER,
+ td->transactionData.number);
+ MyOp->incValue(IND_SUBSCRIBER_SESSIONS,
+ (uint32)td->transactionData.server_bit);
+ }
+ }
stat_async = async;
if (async == 1) {
pCON->executeAsynchPrepare( NoCommit , T4_Callback_1, td);
@@ -438,15 +669,35 @@ T4_Callback_1(int result, NdbConnection
td->transactionData.server_id);
- NdbOperation * MyOp = pCON->getNdbOperation(GROUP_TABLE);
- CHECK_NULL(MyOp, "T4-2: getNdbOperation", td,
- pCON->getNdbError());
-
- MyOp->readTuple();
- MyOp->equal(IND_GROUP_ID,
- (char*)&td->transactionData.group_id);
- MyOp->getValue(IND_GROUP_ALLOW_INSERT,
- (char *)&td->transactionData.permission);
+ if (td->ndbRecordSharedData)
+ {
+ char* rowPtr= (char*) &td->transactionData;
+ const NdbRecord* record= td->ndbRecordSharedData->
+ groupTableNdbRecord;
+ Uint32 m=0;
+ unsigned char* mask= (unsigned char*) &m;
+
+ SET_MASK(mask, IND_GROUP_ALLOW_INSERT);
+
+ const NdbOperation* MyOp= pCON->readTuple(record, rowPtr, record, rowPtr,
+ NdbOperation::LM_Read,
+ mask);
+
+ CHECK_NULL((void*)MyOp, "T4-2: readTuple", td,
+ pCON->getNdbError());
+ }
+ else
+ {
+ NdbOperation * MyOp = pCON->getNdbOperation(GROUP_TABLE);
+ CHECK_NULL(MyOp, "T4-2: getNdbOperation", td,
+ pCON->getNdbError());
+
+ MyOp->readTuple();
+ MyOp->equal(IND_GROUP_ID,
+ (char*)&td->transactionData.group_id);
+ MyOp->getValue(IND_GROUP_ALLOW_INSERT,
+ (char *)&td->transactionData.permission);
+ }
if (stat_async == 1) {
pCON->executeAsynchPrepare( NoCommit , T4_Callback_2, td);
} else {
@@ -484,32 +735,66 @@ T4_Callback_2(int result, NdbConnection
SUBSCRIBER_NUMBER_SUFFIX_LENGTH,
td->transactionData.suffix);
- /* Operation 3 */
-
- NdbOperation * MyOp = pCON->getNdbOperation(SESSION_TABLE);
- CHECK_NULL(MyOp, "T4-3: getNdbOperation", td,
- pCON->getNdbError());
-
- MyOp->insertTuple();
- MyOp->equal(IND_SESSION_SUBSCRIBER,
- (char*)td->transactionData.number);
- MyOp->equal(IND_SESSION_SERVER,
- (char*)&td->transactionData.server_id);
- MyOp->setValue(SESSION_DATA,
- (char *)td->transactionData.session_details);
- /* Operation 4 */
-
- /* Operation 5 */
- MyOp = pCON->getNdbOperation(SERVER_TABLE);
- CHECK_NULL(MyOp, "T4-5: getNdbOperation", td,
- pCON->getNdbError());
-
- MyOp->interpretedUpdateTuple();
- MyOp->equal(IND_SERVER_ID,
- (char*)&td->transactionData.server_id);
- MyOp->equal(IND_SERVER_SUBSCRIBER_SUFFIX,
- (char*)td->transactionData.suffix);
- MyOp->incValue(IND_SERVER_INSERTS, (uint32)1);
+ /* Operations 3 + 4 */
+
+ if (td->ndbRecordSharedData)
+ {
+ char* rowPtr= (char*) &td->transactionData;
+ const NdbRecord* record= td->ndbRecordSharedData->
+ sessionTableNdbRecord;
+ Uint32 m=0;
+ unsigned char* mask= (unsigned char*) &m;
+
+ SET_MASK(mask, IND_SESSION_SUBSCRIBER);
+ SET_MASK(mask, IND_SESSION_SERVER);
+ SET_MASK(mask, IND_SESSION_DATA);
+
+ const NdbOperation* MyOp= pCON->insertTuple(record, rowPtr, mask);
+
+ CHECK_NULL((void*)MyOp, "T4-3: insertTuple", td,
+ pCON->getNdbError());
+
+ record= td->ndbRecordSharedData->
+ serverTableNdbRecord;
+ m= 0;
+
+ NdbOperation::OperationOptions opts;
+ opts.optionsPresent= NdbOperation::OperationOptions::OO_INTERPRETED;
+ opts.interpretedCode= td->ndbRecordSharedData->incrServerInsertsProg;
+
+ MyOp= pCON->updateTuple(record, rowPtr, record, rowPtr, mask,
+ &opts, sizeof(opts));
+
+ CHECK_NULL((void*)MyOp, "T4-3: updateTuple", td,
+ pCON->getNdbError());
+ }
+ else
+ {
+ NdbOperation * MyOp = pCON->getNdbOperation(SESSION_TABLE);
+ CHECK_NULL(MyOp, "T4-3: getNdbOperation", td,
+ pCON->getNdbError());
+
+ MyOp->insertTuple();
+ MyOp->equal(IND_SESSION_SUBSCRIBER,
+ (char*)td->transactionData.number);
+ MyOp->equal(IND_SESSION_SERVER,
+ (char*)&td->transactionData.server_id);
+ MyOp->setValue(IND_SESSION_DATA,
+ (char *)td->transactionData.session_details);
+ /* Operation 4 */
+
+ /* Operation 5 */
+ MyOp = pCON->getNdbOperation(SERVER_TABLE);
+ CHECK_NULL(MyOp, "T4-5: getNdbOperation", td,
+ pCON->getNdbError());
+
+ MyOp->interpretedUpdateTuple();
+ MyOp->equal(IND_SERVER_ID,
+ (char*)&td->transactionData.server_id);
+ MyOp->equal(IND_SERVER_SUBSCRIBER_SUFFIX,
+ (char*)td->transactionData.suffix);
+ MyOp->incValue(IND_SERVER_INSERTS, (uint32)1);
+ }
td->transactionData.branchExecuted = 1;
} else {
td->transactionData.branchExecuted = 0;
@@ -551,7 +836,7 @@ T4_Callback_3(int result, NdbConnection
start_T4(td->pNDB, td, stat_async);
return;
}//if
-
+
DEBUG3("T4(%.*s, %.2d): - Completing",
SUBSCRIBER_NUMBER_LENGTH,
td->transactionData.number,
@@ -590,25 +875,113 @@ start_T5(Ndb * pNDB, ThreadData * td, in
NdbSleep_MilliSleep(10);
}
- NdbOperation * MyOp= pCON->getNdbOperation(SUBSCRIBER_TABLE);
- CHECK_NULL(MyOp, "T5-1: getNdbOperation", td,
- pCON->getNdbError());
-
- MyOp->interpretedUpdateTuple();
- MyOp->equal(IND_SUBSCRIBER_NUMBER,
- td->transactionData.number);
- MyOp->getValue(IND_SUBSCRIBER_LOCATION,
- (char *)&td->transactionData.location);
- MyOp->getValue(IND_SUBSCRIBER_CHANGED_BY,
- td->transactionData.changed_by);
- MyOp->getValue(IND_SUBSCRIBER_CHANGED_TIME,
- td->transactionData.changed_time);
- MyOp->getValue(IND_SUBSCRIBER_GROUP,
- (char *)&td->transactionData.group_id);
- MyOp->getValue(IND_SUBSCRIBER_SESSIONS,
- (char *)&td->transactionData.sessions);
- MyOp->subValue(IND_SUBSCRIBER_SESSIONS,
- (uint32)td->transactionData.server_bit);
+ if (td->ndbRecordSharedData)
+ {
+ char* rowPtr= (char*) &td->transactionData;
+ const NdbRecord* record= td->ndbRecordSharedData->
+ subscriberTableNdbRecord;
+ Uint32 m=0;
+ unsigned char* mask= (unsigned char*) &m;
+
+ SET_MASK(mask, IND_SUBSCRIBER_LOCATION);
+ SET_MASK(mask, IND_SUBSCRIBER_CHANGED_BY);
+ SET_MASK(mask, IND_SUBSCRIBER_CHANGED_TIME);
+ SET_MASK(mask, IND_SUBSCRIBER_GROUP);
+ SET_MASK(mask, IND_SUBSCRIBER_SESSIONS);
+
+ const NdbOperation* MyOp= pCON->readTuple(record, rowPtr, record, rowPtr,
+ NdbOperation::LM_Read,
+ mask);
+ CHECK_NULL((void*)MyOp, "T5-1: readTuple", td,
+ pCON->getNdbError());
+
+ m= 0;
+
+ /* Create program to subtract something from the
+ * subscriber sessions column
+ */
+ Uint32 codeBuf[20];
+ NdbInterpretedCode program(pNDB->getDictionary()->
+ getTable(SUBSCRIBER_TABLE),
+ codeBuf,
+ 20);
+ if (program.sub_val(IND_SUBSCRIBER_SESSIONS,
+ (uint32)td->transactionData.server_bit) ||
+ program.interpret_exit_ok() ||
+ program.finalise())
+ {
+ CHECK_NULL(NULL , "T5: Program create failed", td,
+ program.getNdbError());
+ }
+ NdbOperation::OperationOptions opts;
+ opts.optionsPresent= NdbOperation::OperationOptions::OO_INTERPRETED;
+ opts.interpretedCode= &program;
+
+ MyOp= pCON->updateTuple(record, rowPtr, record, rowPtr,
+ mask,
+ &opts,
+ sizeof(opts));
+ CHECK_NULL((void*)MyOp, "T5-1: updateTuple", td,
+ pCON->getNdbError());
+ }
+ else
+ {
+ /* Use old Api */
+ if (td->useCombinedUpdate)
+ {
+ NdbOperation * MyOp= pCON->getNdbOperation(SUBSCRIBER_TABLE);
+ CHECK_NULL(MyOp, "T5-1: getNdbOperation", td,
+ pCON->getNdbError());
+
+ MyOp->interpretedUpdateTuple();
+ MyOp->equal(IND_SUBSCRIBER_NUMBER,
+ td->transactionData.number);
+ MyOp->getValue(IND_SUBSCRIBER_LOCATION,
+ (char *)&td->transactionData.location);
+ MyOp->getValue(IND_SUBSCRIBER_CHANGED_BY,
+ td->transactionData.changed_by);
+ MyOp->getValue(IND_SUBSCRIBER_CHANGED_TIME,
+ td->transactionData.changed_time);
+ MyOp->getValue(IND_SUBSCRIBER_GROUP,
+ (char *)&td->transactionData.group_id);
+ MyOp->getValue(IND_SUBSCRIBER_SESSIONS,
+ (char *)&td->transactionData.sessions);
+ MyOp->subValue(IND_SUBSCRIBER_SESSIONS,
+ (uint32)td->transactionData.server_bit);
+ }
+ else
+ {
+ /* Use separate read and update operations
+ * This relies on execution ordering between operations on
+ * the same row
+ */
+ NdbOperation * MyOp= pCON->getNdbOperation(SUBSCRIBER_TABLE);
+ CHECK_NULL(MyOp, "T5-1: getNdbOperation (readTuple)", td,
+ pCON->getNdbError());
+ MyOp->readTuple();
+ MyOp->equal(IND_SUBSCRIBER_NUMBER,
+ td->transactionData.number);
+ MyOp->getValue(IND_SUBSCRIBER_LOCATION,
+ (char *)&td->transactionData.location);
+ MyOp->getValue(IND_SUBSCRIBER_CHANGED_BY,
+ td->transactionData.changed_by);
+ MyOp->getValue(IND_SUBSCRIBER_CHANGED_TIME,
+ td->transactionData.changed_time);
+ MyOp->getValue(IND_SUBSCRIBER_GROUP,
+ (char *)&td->transactionData.group_id);
+ MyOp->getValue(IND_SUBSCRIBER_SESSIONS,
+ (char *)&td->transactionData.sessions);
+
+ MyOp= pCON->getNdbOperation(SUBSCRIBER_TABLE);
+ CHECK_NULL(MyOp, "T5-1: getNdbOperation (updateTuple)", td,
+ pCON->getNdbError());
+ MyOp->interpretedUpdateTuple();
+ MyOp->equal(IND_SUBSCRIBER_NUMBER,
+ td->transactionData.number);
+ MyOp->subValue(IND_SUBSCRIBER_SESSIONS,
+ (uint32)td->transactionData.server_bit);
+ }
+ }
stat_async = async;
if (async == 1) {
pCON->executeAsynchPrepare( NoCommit , T5_Callback_1, td);
@@ -634,15 +1007,36 @@ T5_Callback_1(int result, NdbConnection
td->transactionData.number,
td->transactionData.server_id);
- NdbOperation * MyOp = pCON->getNdbOperation(GROUP_TABLE);
- CHECK_NULL(MyOp, "T5-2: getNdbOperation", td,
- pCON->getNdbError());
-
- MyOp->readTuple();
- MyOp->equal(IND_GROUP_ID,
- (char*)&td->transactionData.group_id);
- MyOp->getValue(IND_GROUP_ALLOW_DELETE,
- (char *)&td->transactionData.permission);
+ if (td->ndbRecordSharedData)
+ {
+ char* rowPtr= (char*) &td->transactionData;
+ const NdbRecord* record= td->ndbRecordSharedData->
+ groupTableNdbRecord;
+ Uint32 m=0;
+ unsigned char* mask= (unsigned char*) &m;
+
+ SET_MASK(mask, IND_GROUP_ALLOW_DELETE);
+
+ const NdbOperation* MyOp= pCON->readTuple(record, rowPtr, record, rowPtr,
+ NdbOperation::LM_Read,
+ mask);
+
+ CHECK_NULL((void*)MyOp, "T5-2: readTuple", td,
+ pCON->getNdbError());
+ }
+ else
+ {
+ NdbOperation * MyOp = pCON->getNdbOperation(GROUP_TABLE);
+ CHECK_NULL(MyOp, "T5-2: getNdbOperation", td,
+ pCON->getNdbError());
+
+ MyOp->readTuple();
+ MyOp->equal(IND_GROUP_ID,
+ (char*)&td->transactionData.group_id);
+ MyOp->getValue(IND_GROUP_ALLOW_DELETE,
+ (char *)&td->transactionData.permission);
+ }
+
if (stat_async == 1) {
pCON->executeAsynchPrepare( NoCommit , T5_Callback_2, td);
} else {
@@ -680,29 +1074,58 @@ T5_Callback_2(int result, NdbConnection
SUBSCRIBER_NUMBER_SUFFIX_LENGTH,
td->transactionData.suffix);
- /* Operation 3 */
- NdbOperation * MyOp = pCON->getNdbOperation(SESSION_TABLE);
- CHECK_NULL(MyOp, "T5-3: getNdbOperation", td,
- pCON->getNdbError());
-
- MyOp->deleteTuple();
- MyOp->equal(IND_SESSION_SUBSCRIBER,
- (char*)td->transactionData.number);
- MyOp->equal(IND_SESSION_SERVER,
- (char*)&td->transactionData.server_id);
- /* Operation 4 */
-
- /* Operation 5 */
- MyOp = pCON->getNdbOperation(SERVER_TABLE);
- CHECK_NULL(MyOp, "T5-5: getNdbOperation", td,
- pCON->getNdbError());
-
- MyOp->interpretedUpdateTuple();
- MyOp->equal(IND_SERVER_ID,
- (char*)&td->transactionData.server_id);
- MyOp->equal(IND_SERVER_SUBSCRIBER_SUFFIX,
- (char*)td->transactionData.suffix);
- MyOp->incValue(IND_SERVER_DELETES, (uint32)1);
+ if (td->ndbRecordSharedData)
+ {
+ char* rowPtr= (char*) &td->transactionData;
+ const NdbRecord* record= td->ndbRecordSharedData->
+ sessionTableNdbRecord;
+ Uint32 m=0;
+ unsigned char* mask= (unsigned char*) &m;
+
+ const NdbOperation* MyOp= pCON->deleteTuple(record, rowPtr, record);
+ CHECK_NULL((void*) MyOp, "T5-3: deleteTuple", td,
+ pCON->getNdbError());
+
+ record= td->ndbRecordSharedData->
+ serverTableNdbRecord;
+ m= 0;
+
+ NdbOperation::OperationOptions opts;
+ opts.optionsPresent= NdbOperation::OperationOptions::OO_INTERPRETED;
+ opts.interpretedCode= td->ndbRecordSharedData->incrServerDeletesProg;
+
+ MyOp= pCON->updateTuple(record, rowPtr, record, rowPtr, mask,
+ &opts, sizeof(opts));
+
+ CHECK_NULL((void*)MyOp, "T5-2: updateTuple", td,
+ pCON->getNdbError());
+ }
+ else
+ {
+ /* Operation 3 */
+ NdbOperation * MyOp = pCON->getNdbOperation(SESSION_TABLE);
+ CHECK_NULL(MyOp, "T5-3: getNdbOperation", td,
+ pCON->getNdbError());
+
+ MyOp->deleteTuple();
+ MyOp->equal(IND_SESSION_SUBSCRIBER,
+ (char*)td->transactionData.number);
+ MyOp->equal(IND_SESSION_SERVER,
+ (char*)&td->transactionData.server_id);
+ /* Operation 4 */
+
+ /* Operation 5 */
+ MyOp = pCON->getNdbOperation(SERVER_TABLE);
+ CHECK_NULL(MyOp, "T5-5: getNdbOperation", td,
+ pCON->getNdbError());
+
+ MyOp->interpretedUpdateTuple();
+ MyOp->equal(IND_SERVER_ID,
+ (char*)&td->transactionData.server_id);
+ MyOp->equal(IND_SERVER_SUBSCRIBER_SUFFIX,
+ (char*)td->transactionData.suffix);
+ MyOp->incValue(IND_SERVER_DELETES, (uint32)1);
+ }
td->transactionData.branchExecuted = 1;
} else {
td->transactionData.branchExecuted = 0;
=== modified file 'storage/ndb/test/ndbapi/bench/testData.h'
--- a/storage/ndb/test/ndbapi/bench/testData.h 2006-12-23 19:20:40 +0000
+++ b/storage/ndb/test/ndbapi/bench/testData.h 2008-09-30 08:18:41 +0000
@@ -121,6 +121,16 @@ typedef struct {
} TransactionData ;
typedef struct {
+ const struct NdbRecord* subscriberTableNdbRecord;
+ const struct NdbRecord* groupTableNdbRecord;
+ const struct NdbRecord* sessionTableNdbRecord;
+ const struct NdbInterpretedCode* incrServerReadsProg;
+ const struct NdbInterpretedCode* incrServerInsertsProg;
+ const struct NdbInterpretedCode* incrServerDeletesProg;
+ const struct NdbRecord* serverTableNdbRecord;
+} NdbRecordSharedData ;
+
+typedef struct {
struct NdbThread* pThread;
unsigned long randomSeed;
@@ -135,10 +145,12 @@ typedef struct {
/**
* For async execution
*/
- RunState runState;
- double startTime;
- TransactionData transactionData;
- struct Ndb * pNDB;
+ RunState runState;
+ double startTime;
+ TransactionData transactionData;
+ struct Ndb * pNDB;
+ NdbRecordSharedData* ndbRecordSharedData;
+ bool useCombinedUpdate;
} ThreadData;
/***************************************************************
=== modified file 'storage/ndb/test/ndbapi/flexAsynch.cpp'
--- a/storage/ndb/test/ndbapi/flexAsynch.cpp 2007-08-01 03:07:58 +0000
+++ b/storage/ndb/test/ndbapi/flexAsynch.cpp 2008-10-01 09:24:07 +0000
@@ -56,6 +56,13 @@ enum StartType {
stStop
} ;
+struct ThreadNdb
+{
+ int NoOfOps;
+ int ThreadNo;
+ char * record;
+};
+
extern "C" { static void* threadLoop(void*); }
static void setAttrNames(void);
static void setTableNames(void);
@@ -63,8 +70,10 @@ static int readArguments(int argc, const
static int createTables(Ndb*);
static void defineOperation(NdbConnection* aTransObject, StartType aType,
Uint32 base, Uint32 aIndex);
+static void defineNdbRecordOperation(ThreadNdb*, NdbConnection* aTransObject, StartType aType,
+ Uint32 base, Uint32 aIndex);
static void execute(StartType aType);
-static bool executeThread(StartType aType, Ndb* aNdbObject, unsigned int);
+static bool executeThread(ThreadNdb*, StartType aType, Ndb* aNdbObject, unsigned int);
static void executeCallback(int result, NdbConnection* NdbObject,
void* aObject);
static bool error_handler(const NdbError & err);
@@ -77,12 +86,6 @@ static int
ErrorData * flexAsynchErrorData;
-struct ThreadNdb
-{
- int NoOfOps;
- int ThreadNo;
-};
-
static NdbThread* threadLife[NDB_MAXTHREADS];
static int tNodeId;
static int ThreadReady[NDB_MAXTHREADS];
@@ -91,6 +94,9 @@ static char
static char attrName[MAXATTR][MAXSTRLEN+1];
// Program Parameters
+static NdbRecord * g_record[MAXTABLES];
+static bool tNdbRecord = false;
+
static bool tLocal = false;
static int tLocalPart = 0;
static int tSendForce = 0;
@@ -236,6 +242,17 @@ NDB_COMMAND(flexAsynch, "flexAsynch", "f
}
}
+ if (tNdbRecord)
+ {
+ Uint32 sz = NdbDictionary::getRecordRowLength(g_record[0]);
+ sz += 3;
+ for (Uint32 i = 0; i<tNoOfThreads; i++)
+ {
+ pThreadData[i].record = (char*)malloc(sz);
+ bzero(pThreadData[i].record, sz);
+ }
+ }
+
if(returnValue == NDBT_OK){
/****************************************************************
* Create NDB objects. *
@@ -501,7 +518,7 @@ threadLoop(void* ThreadData)
tType = ThreadStart[threadNo];
ThreadStart[threadNo] = stIdle;
- if(!executeThread(tType, localNdb, threadBase)){
+ if(!executeThread(tabThread, tType, localNdb, threadBase)){
break;
}
ThreadReady[threadNo] = 1;
@@ -515,66 +532,76 @@ threadLoop(void* ThreadData)
static
bool
-executeThread(StartType aType, Ndb* aNdbObject, unsigned int threadBase) {
+executeThread(ThreadNdb* pThread,
+ StartType aType, Ndb* aNdbObject, unsigned int threadBase) {
int i, j, k;
NdbConnection* tConArray[1024];
unsigned int tBase;
unsigned int tBase2;
- for (i = 0; i < tNoOfTransactions; i++) {
- if (tLocal == false) {
- tBase = i * tNoOfParallelTrans * tNoOfOpsPerTrans;
- } else {
- tBase = i * tNoOfParallelTrans * MAX_SEEK;
- }//if
- START_REAL_TIME;
- for (j = 0; j < tNoOfParallelTrans; j++) {
+ unsigned int extraLoops= 0; // (aType == stRead) ? 100000 : 0;
+
+ for (unsigned int ex= 0; ex < (1 + extraLoops); ex++)
+ {
+ for (i = 0; i < tNoOfTransactions; i++) {
if (tLocal == false) {
- tBase2 = tBase + (j * tNoOfOpsPerTrans);
- } else {
- tBase2 = tBase + (j * MAX_SEEK);
- tBase2 = getKey(threadBase, tBase2);
- }//if
- if (startTransGuess == true) {
- Uint64 Tkey64;
- Uint32* Tkey32 = (Uint32*)&Tkey64;
- Tkey32[0] = threadBase;
- Tkey32[1] = tBase2;
- tConArray[j] = aNdbObject->startTransaction((Uint32)0, //Priority
- (const char*)&Tkey64, //Main PKey
- (Uint32)4); //Key Length
+ tBase = i * tNoOfParallelTrans * tNoOfOpsPerTrans;
} else {
- tConArray[j] = aNdbObject->startTransaction();
+ tBase = i * tNoOfParallelTrans * MAX_SEEK;
}//if
- if (tConArray[j] == NULL &&
- !error_handler(aNdbObject->getNdbError()) ){
- ndbout << endl << "Unable to recover! Quiting now" << endl ;
- return false;
- }//if
-
- for (k = 0; k < tNoOfOpsPerTrans; k++) {
- //-------------------------------------------------------
- // Define the operation, but do not execute it yet.
- //-------------------------------------------------------
- defineOperation(tConArray[j], aType, threadBase, (tBase2 + k));
+ START_REAL_TIME;
+ for (j = 0; j < tNoOfParallelTrans; j++) {
+ if (tLocal == false) {
+ tBase2 = tBase + (j * tNoOfOpsPerTrans);
+ } else {
+ tBase2 = tBase + (j * MAX_SEEK);
+ tBase2 = getKey(threadBase, tBase2);
+ }//if
+ if (startTransGuess == true) {
+ Uint64 Tkey64;
+ Uint32* Tkey32 = (Uint32*)&Tkey64;
+ Tkey32[0] = threadBase;
+ Tkey32[1] = tBase2;
+ tConArray[j] = aNdbObject->startTransaction((Uint32)0, //Priority
+ (const char*)&Tkey64, //Main PKey
+ (Uint32)4); //Key Length
+ } else {
+ tConArray[j] = aNdbObject->startTransaction();
+ }//if
+ if (tConArray[j] == NULL &&
+ !error_handler(aNdbObject->getNdbError()) ){
+ ndbout << endl << "Unable to recover! Quiting now" << endl ;
+ return false;
+ }//if
+
+ for (k = 0; k < tNoOfOpsPerTrans; k++) {
+ //-------------------------------------------------------
+ // Define the operation, but do not execute it yet.
+ //-------------------------------------------------------
+ if (tNdbRecord)
+ defineNdbRecordOperation(pThread,
+ tConArray[j], aType, threadBase,(tBase2+k));
+ else
+ defineOperation(tConArray[j], aType, threadBase, (tBase2 + k));
+ }//for
+
+ tConArray[j]->executeAsynchPrepare(Commit, &executeCallback, NULL);
+ }//for
+ STOP_REAL_TIME;
+ //-------------------------------------------------------
+ // Now we have defined a set of operations, it is now time
+ // to execute all of them.
+ //-------------------------------------------------------
+ int Tcomp = aNdbObject->sendPollNdb(3000, 0, 0);
+ while (Tcomp < tNoOfParallelTrans) {
+ int TlocalComp = aNdbObject->pollNdb(3000, 0);
+ Tcomp += TlocalComp;
+ }//while
+ for (j = 0 ; j < tNoOfParallelTrans ; j++) {
+ aNdbObject->closeTransaction(tConArray[j]);
}//for
-
- tConArray[j]->executeAsynchPrepare(Commit, &executeCallback, NULL);
- }//for
- STOP_REAL_TIME;
- //-------------------------------------------------------
- // Now we have defined a set of operations, it is now time
- // to execute all of them.
- //-------------------------------------------------------
- int Tcomp = aNdbObject->sendPollNdb(3000, 0, 0);
- while (Tcomp < tNoOfParallelTrans) {
- int TlocalComp = aNdbObject->pollNdb(3000, 0);
- Tcomp += TlocalComp;
- }//while
- for (j = 0 ; j < tNoOfParallelTrans ; j++) {
- aNdbObject->closeTransaction(tConArray[j]);
}//for
- }//for
+ } // for
return true;
}//executeThread()
@@ -720,6 +747,68 @@ defineOperation(NdbConnection* localNdbC
return;
}//defineOperation()
+
+static void
+defineNdbRecordOperation(ThreadNdb* pThread,
+ NdbConnection* pTrans, StartType aType,
+ Uint32 threadBase, Uint32 aIndex)
+{
+ char * record = pThread->record;
+ Uint32 offset;
+ NdbDictionary::getOffset(g_record[0], 0, offset);
+ * (Uint32*)(record + offset) = threadBase;
+ * (Uint32*)(record + offset + 4) = aIndex;
+
+ //-------------------------------------------------------
+ // Set-up the attribute values for this operation.
+ //-------------------------------------------------------
+ if (aType != stRead && aType != stDelete)
+ {
+ for (int k = 1; k < tNoOfAttributes; k++) {
+ NdbDictionary::getOffset(g_record[0], k, offset);
+ * (Uint32*)(record + offset) = aIndex;
+ }//for
+ }
+
+ const NdbOperation* op;
+ switch (aType) {
+ case stInsert: { // Insert case
+ if (theWriteFlag == 1)
+ {
+ op = pTrans->writeTuple(g_record[0],record,g_record[0],record);
+ }
+ else
+ {
+ op = pTrans->insertTuple(g_record[0],record,g_record[0],record);
+ }
+ break;
+ }//case
+ case stRead: { // Read Case
+ op = pTrans->readTuple(g_record[0],record,g_record[0],record);
+ break;
+ }//case
+ case stUpdate:{ // Update Case
+ op = pTrans->updateTuple(g_record[0],record,g_record[0],record);
+ break;
+ }//case
+ case stDelete: { // Delete Case
+ op = pTrans->deleteTuple(g_record[0],record, g_record[0]);
+ break;
+ }//case
+ default: {
+ abort();
+ }//default
+ }//switch
+
+ if (op == NULL)
+ {
+ ndbout << "Operation is null " << pTrans->getNdbError() << endl;
+ abort();
+ }
+
+ assert(op != 0);
+}
+
static void setAttrNames()
{
int i;
@@ -813,6 +902,28 @@ createTables(Ndb* pMyNdb){
return -1;
NdbSchemaCon::closeSchemaTrans(MySchemaTransaction);
+
+ if (tNdbRecord)
+ {
+ NdbDictionary::Dictionary* pDict = pMyNdb->getDictionary();
+ const NdbDictionary::Table * pTab = pDict->getTable(tableName[i]);
+
+ int off = 0;
+ Vector<NdbDictionary::RecordSpecification> spec;
+ for (Uint32 j = 0; j<pTab->getNoOfColumns(); j++)
+ {
+ NdbDictionary::RecordSpecification r0;
+ r0.column = pTab->getColumn(j);
+ r0.offset = off;
+ off += (r0.column->getSizeInBytes() + 3) & ~(Uint32)3;
+ spec.push_back(r0);
+ }
+ g_record[i] =
+ pDict->createRecord(pTab, spec.getBase(),
+ spec.size(),
+ sizeof(NdbDictionary::RecordSpecification));
+ assert(g_record[i]);
+ }
}
}
@@ -949,6 +1060,10 @@ readArguments(int argc, const char** arg
startTransGuess = false;
argc++;
i--;
+ } else if (strcmp(argv[i], "-ndbrecord") == 0){
+ tNdbRecord = true;
+ argc++;
+ i--;
} else {
return -1;
}
@@ -995,7 +1110,7 @@ input_error(){
ndbout_c(" -force Force send when communicating");
ndbout_c(" -non_adaptive Send at a 10 millisecond interval");
ndbout_c(" -local Number of part, only use keys in one part out of 16");
+ ndbout_c(" -ndbrecord");
}
-
-
+template class Vector<NdbDictionary::RecordSpecification>;
=== modified file 'storage/ndb/test/ndbapi/msa.cpp'
--- a/storage/ndb/test/ndbapi/msa.cpp 2006-12-23 19:20:40 +0000
+++ b/storage/ndb/test/ndbapi/msa.cpp 2008-09-30 08:18:41 +0000
@@ -22,6 +22,7 @@
#include <NdbSleep.h>
#include <NdbThread.h>
#include <NdbTick.h>
+#include <NdbOut.hpp>
const char* const c_szDatabaseName = "TEST_DB";
@@ -52,6 +53,7 @@ bool g_bWriteTuple = false;
bool g_bInsertInitial = false;
bool g_bVerifyInitial = false;
+Ndb_cluster_connection* theConnection = 0;
NdbMutex* g_pNdbMutexPrintf = 0;
NdbMutex* g_pNdbMutexIncrement = 0;
long g_nNumCallsProcessed = 0;
@@ -242,7 +244,8 @@ int QueryTransaction(Ndb* pNdb,
NdbError& err)
{
int iRes = -1;
- NdbConnection* pNdbConnection = pNdb->startTransaction(0, (const char*)&iContextId, 4);
+ NdbConnection* pNdbConnection = pNdb->startTransaction();
+ //0, (const char*)&iContextId, 4);
if(pNdbConnection)
{
NdbOperation* pNdbOperation = pNdbConnection->getNdbOperation(g_szTableName);
@@ -326,7 +329,8 @@ int RetryQueryTransaction(Ndb* pNdb,
int DeleteTransaction(Ndb* pNdb, long iContextId, NdbError& err)
{
int iRes = -1;
- NdbConnection* pNdbConnection = pNdb->startTransaction(0, (const char*)&iContextId, 4);
+ NdbConnection* pNdbConnection = pNdb->startTransaction();
+ //0, (const char*)&iContextId, 4);
if(pNdbConnection)
{
NdbOperation* pNdbOperation = pNdbConnection->getNdbOperation(g_szTableName);
@@ -411,7 +415,8 @@ int InsertTransaction(Ndb* pNdb,
NdbError& err)
{
int iRes = -1;
- NdbConnection* pNdbConnection = pNdb->startTransaction(0, (const char*)&iContextID, 4);
+ NdbConnection* pNdbConnection = pNdb->startTransaction();
+ //0, (const char*)&iContextID, 4);
if(pNdbConnection)
{
NdbOperation* pNdbOperation = pNdbConnection->getNdbOperation(g_szTableName);
@@ -501,7 +506,8 @@ int RetryInsertTransaction(Ndb* pNdb,
int UpdateTransaction(Ndb* pNdb, long iContextId, NdbError& err)
{
int iRes = -1;
- NdbConnection* pNdbConnection = pNdb->startTransaction(0, (const char*)&iContextId, 4);
+ NdbConnection* pNdbConnection = pNdb->startTransaction();
+ //0, (const char*)&iContextId, 4);
if(pNdbConnection)
{
NdbOperation* pNdbOperation = pNdbConnection->getNdbOperation(g_szTableName);
@@ -678,7 +684,7 @@ void* RuntimeCallContext(void* lpParam)
long iLockTime;
long iLockTimeUSec;
- pNdb = new Ndb("TEST_DB");
+ pNdb = new Ndb(theConnection, "TEST_DB");
if(!pNdb)
{
NdbMutex_Lock(g_pNdbMutexPrintf);
@@ -971,7 +977,6 @@ void ShowHelp(const char* szCmd)
int main(int argc, char* argv[])
{
ndb_init();
- int iRes = -1;
g_nNumThreads = 0;
g_nMaxCallsPerSecond = 0;
long nSeed = 0;
@@ -998,7 +1003,7 @@ int main(int argc, char* argv[])
break;
case 'm':
g_nStatusDataSize = atol(argv[i]+2);
- if(g_nStatusDataSize>sizeof(STATUS_DATA))
+ if(g_nStatusDataSize> (int) sizeof(STATUS_DATA))
{
g_nStatusDataSize = sizeof(STATUS_DATA);
}
@@ -1093,7 +1098,19 @@ int main(int argc, char* argv[])
hShutdownEvent = CreateEvent(NULL,TRUE,FALSE,NULL);
#endif
- Ndb* pNdb = new Ndb(c_szDatabaseName);
+ theConnection= new Ndb_cluster_connection();
+ if (theConnection->connect(12, 5, 1) != 0)
+ {
+ ndbout << "Unable to connect to managment server." << endl;
+ return -1;
+ }
+ if (theConnection->wait_until_ready(30,0) < 0)
+ {
+ ndbout << "Cluster nodes not ready in 30 seconds." << endl;
+ return -1;
+ }
+
+ Ndb* pNdb = new Ndb(theConnection, c_szDatabaseName);
if(!pNdb)
{
printf("could not construct ndb\n");
=== modified file 'storage/ndb/test/run-test/setup.cpp'
--- a/storage/ndb/test/run-test/setup.cpp 2008-09-25 10:21:14 +0000
+++ b/storage/ndb/test/run-test/setup.cpp 2008-10-08 21:05:55 +0000
@@ -34,12 +34,10 @@ struct proc_option f_options[] = {
,{ "--host=", atrt_process::AP_CLIENT, 0 }
,{ "--server-id=", atrt_process::AP_MYSQLD, PO_REP }
,{ "--log-bin", atrt_process::AP_MYSQLD, PO_REP_MASTER }
-#if 0
,{ "--master-host=", atrt_process::AP_MYSQLD, PO_REP_SLAVE }
,{ "--master-port=", atrt_process::AP_MYSQLD, PO_REP_SLAVE }
,{ "--master-user=", atrt_process::AP_MYSQLD, PO_REP_SLAVE }
,{ "--master-password=", atrt_process::AP_MYSQLD, PO_REP_SLAVE }
-#endif
,{ "--ndb-connectstring=", atrt_process::AP_MYSQLD | atrt_process::AP_CLUSTER
,PO_NDB }
,{ "--ndbcluster", atrt_process::AP_MYSQLD, PO_NDB }
| Thread |
|---|
| • bzr push into mysql-6.0-ndb branch (tomas.ulin:2674 to 2675) | Tomas Ulin | 15 Oct |