List:Commits« Previous MessageNext Message »
From:Tomas Ulin Date:October 15 2008 12:16pm
Subject:bzr commit into mysql-6.0-ndb branch (tomas.ulin:2675)
View as plain text  
#At file:///home/tomas/mysql_src/mysql-6.0-ndb/

 2675 Tomas Ulin	2008-10-15 [merge]
      merge
added:
  mysql-test/suite/ndb/r/ndb_discover_db.result
  mysql-test/suite/ndb/r/ndb_discover_db2.result
  mysql-test/suite/ndb/t/ndb_discover_db.test
  mysql-test/suite/ndb/t/ndb_discover_db2-master.opt
  mysql-test/suite/ndb/t/ndb_discover_db2.test
  sql/ha_ndbcluster_lock_ext.h
modified:
  mysql-test/mysql-test-run.pl
  mysql-test/suite/ndb/r/ndb_alter_table_online2.result
  mysql-test/suite/ndb/r/ndb_basic.result
  mysql-test/suite/ndb/t/disabled.def
  mysql-test/suite/ndb/t/ndb_alter_table_online2.test
  mysql-test/suite/ndb/t/ndb_basic.test
  mysql-test/suite/rpl_ndb_big/r/rpl_ndb_apply_status.result
  mysql-test/suite/rpl_ndb_big/t/rpl_ndb_apply_status.test
  sql/Makefile.am
  sql/ha_ndbcluster.cc
  sql/ha_ndbcluster.h
  sql/ha_ndbcluster_binlog.cc
  sql/ha_ndbcluster_binlog.h
  sql/ha_ndbcluster_connection.cc
  sql/ha_ndbcluster_connection.h
  sql/handler.cc
  sql/handler.h
  sql/mysqld.cc
  sql/sql_db.cc
  sql/sql_delete.cc
  sql/sql_parse.cc
  sql/sql_rename.cc
  sql/sql_table.cc
  storage/ndb/include/ndb_constants.h
  storage/ndb/include/util/ndb_opts.h
  storage/ndb/src/kernel/blocks/backup/Backup.cpp
  storage/ndb/src/kernel/blocks/trix/Trix.cpp
  storage/ndb/src/kernel/blocks/trix/Trix.hpp
  storage/ndb/src/ndbapi/NdbInterpretedCode.cpp
  storage/ndb/test/ndbapi/Makefile.am
  storage/ndb/test/ndbapi/bench/mainAsyncGenerator.cpp
  storage/ndb/test/ndbapi/bench/ndb_async2.cpp
  storage/ndb/test/ndbapi/bench/testData.h
  storage/ndb/test/ndbapi/flexAsynch.cpp
  storage/ndb/test/ndbapi/msa.cpp
  storage/ndb/test/run-test/setup.cpp

=== modified file 'mysql-test/mysql-test-run.pl'
--- a/mysql-test/mysql-test-run.pl	2008-09-02 15:44:54 +0000
+++ b/mysql-test/mysql-test-run.pl	2008-10-15 12:14:27 +0000
@@ -133,7 +133,7 @@ our $default_vardir;
 
 our $opt_usage;
 our $opt_suites;
-our $opt_suites_default= "main,backup,binlog,rpl,rpl_ndb,ndb,ndb_binlog"; # Default suites to run
+our $opt_suites_default= "ndb,ndb_binlog,rpl_ndb,main,backup,binlog,rpl"; # Default suites to run
 our $opt_script_debug= 0;  # Script debugging, enable with --script-debug
 our $opt_verbose= 0;  # Verbose output, enable with --verbose
 
@@ -2721,7 +2721,7 @@ sub ndbd_start ($$$) {
   mtr_add_arg($args, "$extra_args");
 
   my $nodeid= $cluster->{'ndbds'}->[$idx]->{'nodeid'};
-  my $path_ndbd_log= "$cluster->{'data_dir'}/ndb_${nodeid}.log";
+  my $path_ndbd_log= "$cluster->{'data_dir'}/ndb_${nodeid}_out.log";
   $pid= mtr_spawn($exe_ndbd, $args, "",
 		  $path_ndbd_log,
 		  $path_ndbd_log,
@@ -3779,7 +3779,6 @@ sub mysqld_arguments ($$$$) {
       mtr_add_arg($args, "%s--slave-allow-batching", $prefix);
       if ( $mysql_version_id >= 50100 )
       {
-	mtr_add_arg($args, "%s--ndb-extra-logging", $prefix);
 	mtr_add_arg($args, "%s--ndb-log-orig", $prefix);
       }
     }
@@ -3847,7 +3846,6 @@ sub mysqld_arguments ($$$$) {
       mtr_add_arg($args, "%s--slave-allow-batching", $prefix);
       if ( $mysql_version_id >= 50100 )
       {
-	mtr_add_arg($args, "%s--ndb-extra-logging", $prefix);
 	mtr_add_arg($args, "%s--ndb-log-orig", $prefix);
       }
     }

=== modified file 'mysql-test/suite/ndb/r/ndb_alter_table_online2.result'
--- a/mysql-test/suite/ndb/r/ndb_alter_table_online2.result	2007-11-19 12:59:12 +0000
+++ b/mysql-test/suite/ndb/r/ndb_alter_table_online2.result	2008-10-15 12:14:27 +0000
@@ -94,3 +94,4 @@ truncate ndb_show_tables_results;
 ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
 
 drop table t1;
+drop database mysqlslap;

=== modified file 'mysql-test/suite/ndb/r/ndb_basic.result'
--- a/mysql-test/suite/ndb/r/ndb_basic.result	2008-08-16 05:15:49 +0000
+++ b/mysql-test/suite/ndb/r/ndb_basic.result	2008-10-15 12:14:27 +0000
@@ -924,4 +924,24 @@ create table if not exists t1 (a int not
 create table t2 like t1;
 rename table t1 to t10, t2 to t20;
 drop table t10,t20;
+#
+# bug #39872 - explain causes segv
+# (ndb_index_stat_enable=1 must be set to trigger bug)
+#
+set ndb_index_stat_enable=1;
+CREATE TABLE `t1` (
+`id` int(11) NOT NULL AUTO_INCREMENT,
+PRIMARY KEY (`id`)
+) ENGINE=ndbcluster DEFAULT CHARSET=utf8;
+CREATE TABLE `t2` (
+`id` int(11) NOT NULL,
+`obj_id` int(11) DEFAULT NULL,
+UNIQUE KEY `id` (`id`),
+KEY `obj_id` (`obj_id`)
+) ENGINE=ndbcluster DEFAULT CHARSET=utf8;
+# here we used to segv
+explain SELECT t1.id FROM t1 INNER JOIN t2 ON t1.id = t2.id WHERE t2.obj_id=1;
+id	select_type	table	type	possible_keys	key	key_len	ref	rows	Extra
+1	SIMPLE	NULL	NULL	NULL	NULL	NULL	NULL	NULL	Impossible WHERE noticed after reading const tables
+drop table t1, t2;
 End of 5.1 tests

=== added file 'mysql-test/suite/ndb/r/ndb_discover_db.result'
--- a/mysql-test/suite/ndb/r/ndb_discover_db.result	1970-01-01 00:00:00 +0000
+++ b/mysql-test/suite/ndb/r/ndb_discover_db.result	2008-09-30 09:14:44 +0000
@@ -0,0 +1,7 @@
+drop database if exists discover_db;
+drop database if exists discover_db_2;
+create database discover_db;
+create table discover_db.t1 (a int key, b int) engine ndb;
+create database discover_db_2;
+alter database discover_db_2 character set binary;
+create table discover_db_2.t1 (a int key, b int) engine ndb;

=== added file 'mysql-test/suite/ndb/r/ndb_discover_db2.result'
--- a/mysql-test/suite/ndb/r/ndb_discover_db2.result	1970-01-01 00:00:00 +0000
+++ b/mysql-test/suite/ndb/r/ndb_discover_db2.result	2008-09-30 09:14:44 +0000
@@ -0,0 +1,28 @@
+show create database discover_db;
+Database	Create Database
+discover_db	CREATE DATABASE `discover_db` /*!40100 DEFAULT CHARACTER SET latin1 */
+show create database discover_db_2;
+Database	Create Database
+discover_db_2	CREATE DATABASE `discover_db_2` /*!40100 DEFAULT CHARACTER SET binary */
+reset master;
+insert into discover_db.t1 values (1,1);
+show binlog events from <binlog_start>;
+Log_name	Pos	Event_type	Server_id	End_log_pos	Info
+master-bin1.000001	#	Query	102	#	BEGIN
+master-bin1.000001	#	Table_map	102	#	table_id: # (discover_db.t1)
+master-bin1.000001	#	Table_map	102	#	table_id: # (mysql.ndb_apply_status)
+master-bin1.000001	#	Write_rows	102	#	table_id: #
+master-bin1.000001	#	Write_rows	102	#	table_id: # flags: STMT_END_F
+master-bin1.000001	#	Query	102	#	COMMIT
+reset master;
+insert into discover_db_2.t1 values (1,1);
+show binlog events from <binlog_start>;
+Log_name	Pos	Event_type	Server_id	End_log_pos	Info
+master-bin1.000001	#	Query	102	#	BEGIN
+master-bin1.000001	#	Table_map	102	#	table_id: # (discover_db_2.t1)
+master-bin1.000001	#	Table_map	102	#	table_id: # (mysql.ndb_apply_status)
+master-bin1.000001	#	Write_rows	102	#	table_id: #
+master-bin1.000001	#	Write_rows	102	#	table_id: # flags: STMT_END_F
+master-bin1.000001	#	Query	102	#	COMMIT
+drop database discover_db;
+drop database discover_db_2;

=== modified file 'mysql-test/suite/ndb/t/disabled.def'
--- a/mysql-test/suite/ndb/t/disabled.def	2008-08-16 05:15:49 +0000
+++ b/mysql-test/suite/ndb/t/disabled.def	2008-10-15 12:14:27 +0000
@@ -11,7 +11,6 @@
 ##############################################################################
 
 ndb_partition_error2	 : HF is not sure if the test can work as internded on all the platforms
-#ndb_index_ordered        : Bug#38370 The test ndb.ndb_index_ordered fails with the community features on
 
 # the below testcase have been reworked to avoid the bug, test contains comment, keep bug open
 #ndb_binlog_ddl_multi     : BUG#18976 2006-04-10 kent    CRBR: multiple binlog, second binlog may miss schema log events

=== modified file 'mysql-test/suite/ndb/t/ndb_alter_table_online2.test'
--- a/mysql-test/suite/ndb/t/ndb_alter_table_online2.test	2008-04-20 21:25:28 +0000
+++ b/mysql-test/suite/ndb/t/ndb_alter_table_online2.test	2008-10-15 12:14:27 +0000
@@ -166,3 +166,4 @@ truncate ndb_show_tables_results;
 
 # drop the table
 drop table t1;
+drop database mysqlslap;

=== modified file 'mysql-test/suite/ndb/t/ndb_basic.test'
--- a/mysql-test/suite/ndb/t/ndb_basic.test	2007-11-12 12:25:34 +0000
+++ b/mysql-test/suite/ndb/t/ndb_basic.test	2008-10-15 12:14:27 +0000
@@ -858,4 +858,23 @@ create table t2 like t1;
 rename table t1 to t10, t2 to t20;
 drop table t10,t20;
 
+--echo #
+--echo # bug #39872 - explain causes segv
+--echo # (ndb_index_stat_enable=1 must be set to trigger bug)
+--echo #
+set ndb_index_stat_enable=1;
+CREATE TABLE `t1` (
+  `id` int(11) NOT NULL AUTO_INCREMENT,
+  PRIMARY KEY (`id`)
+) ENGINE=ndbcluster DEFAULT CHARSET=utf8;
+CREATE TABLE `t2` (
+  `id` int(11) NOT NULL,
+  `obj_id` int(11) DEFAULT NULL,
+  UNIQUE KEY `id` (`id`),
+  KEY `obj_id` (`obj_id`)
+) ENGINE=ndbcluster DEFAULT CHARSET=utf8;
+--echo # here we used to segv
+explain SELECT t1.id FROM t1 INNER JOIN t2 ON t1.id = t2.id WHERE t2.obj_id=1;
+drop table t1, t2;
+
 --echo End of 5.1 tests

=== added file 'mysql-test/suite/ndb/t/ndb_discover_db.test'
--- a/mysql-test/suite/ndb/t/ndb_discover_db.test	1970-01-01 00:00:00 +0000
+++ b/mysql-test/suite/ndb/t/ndb_discover_db.test	2008-09-30 09:14:44 +0000
@@ -0,0 +1,33 @@
+-- source include/have_multi_ndb.inc
+-- source include/have_binlog_format_mixed_or_row.inc
+
+-- disable_warnings
+drop database if exists discover_db;
+drop database if exists discover_db_2;
+-- enable_warnings
+
+#
+# Prepare for testing database discovery by creating
+# databases, and removing them on one mysqld
+# The discovery happens in ndb_discover_db2.test
+#
+
+# check that created database is discovered
+create database discover_db;
+create table discover_db.t1 (a int key, b int) engine ndb;
+
+# check that altered database is discovered
+create database discover_db_2;
+alter database discover_db_2 character set binary;
+create table discover_db_2.t1 (a int key, b int) engine ndb;
+
+-- remove_file $MYSQLTEST_VARDIR/master1-data/discover_db/t1.frm
+-- remove_file $MYSQLTEST_VARDIR/master1-data/discover_db/t1.ndb
+-- remove_file $MYSQLTEST_VARDIR/master1-data/discover_db/db.opt
+-- rmdir $MYSQLTEST_VARDIR/master1-data/discover_db
+
+-- remove_file $MYSQLTEST_VARDIR/master1-data/discover_db_2/t1.frm
+-- remove_file $MYSQLTEST_VARDIR/master1-data/discover_db_2/t1.ndb
+-- remove_file $MYSQLTEST_VARDIR/master1-data/discover_db_2/db.opt
+-- rmdir $MYSQLTEST_VARDIR/master1-data/discover_db_2
+

=== added file 'mysql-test/suite/ndb/t/ndb_discover_db2-master.opt'
--- a/mysql-test/suite/ndb/t/ndb_discover_db2-master.opt	1970-01-01 00:00:00 +0000
+++ b/mysql-test/suite/ndb/t/ndb_discover_db2-master.opt	2008-09-30 09:14:44 +0000
@@ -0,0 +1 @@
+--skip-external-locking

=== added file 'mysql-test/suite/ndb/t/ndb_discover_db2.test'
--- a/mysql-test/suite/ndb/t/ndb_discover_db2.test	1970-01-01 00:00:00 +0000
+++ b/mysql-test/suite/ndb/t/ndb_discover_db2.test	2008-09-30 09:14:44 +0000
@@ -0,0 +1,21 @@
+-- source include/have_multi_ndb.inc
+-- source include/have_binlog_format_mixed_or_row.inc
+
+#
+# When this test started there no database on disk for server2
+# Check that table has been discovered correctly, and that the
+# binlog is updated correctly
+#
+
+-- connection server2
+show create database discover_db;
+show create database discover_db_2;
+reset master;
+insert into discover_db.t1 values (1,1);
+--source include/show_binlog_events2.inc
+reset master;
+insert into discover_db_2.t1 values (1,1);
+--source include/show_binlog_events2.inc
+
+drop database discover_db;
+drop database discover_db_2;

=== modified file 'mysql-test/suite/rpl_ndb_big/r/rpl_ndb_apply_status.result'
--- a/mysql-test/suite/rpl_ndb_big/r/rpl_ndb_apply_status.result	2008-08-16 05:15:49 +0000
+++ b/mysql-test/suite/rpl_ndb_big/r/rpl_ndb_apply_status.result	2008-10-15 12:14:27 +0000
@@ -12,7 +12,7 @@ count(*)
 create table t1 (a int key, b int) engine ndb;
 insert into t1 values (1,1);
 *** on master it should be empty ***
-select * from mysql.ndb_apply_status;
+select * from mysql.ndb_apply_status where server_id <> 0;
 server_id	epoch	log_name	start_pos	end_pos
 *** on slave there should be one row ***
 select count(*) from mysql.ndb_apply_status;

=== modified file 'mysql-test/suite/rpl_ndb_big/t/rpl_ndb_apply_status.test'
--- a/mysql-test/suite/rpl_ndb_big/t/rpl_ndb_apply_status.test	2008-08-16 05:15:49 +0000
+++ b/mysql-test/suite/rpl_ndb_big/t/rpl_ndb_apply_status.test	2008-10-15 12:14:27 +0000
@@ -32,7 +32,8 @@ connection master;
 create table t1 (a int key, b int) engine ndb;
 insert into t1 values (1,1);
 echo *** on master it should be empty ***;
-select * from mysql.ndb_apply_status;
+#filter away stuff put there with server_id = 0 (from ndb_restore)
+select * from mysql.ndb_apply_status where server_id <> 0;
 
 sync_slave_with_master;
 echo *** on slave there should be one row ***;

=== modified file 'sql/Makefile.am'
--- a/sql/Makefile.am	2008-09-02 15:44:54 +0000
+++ b/sql/Makefile.am	2008-10-15 12:14:27 +0000
@@ -70,6 +70,7 @@ noinst_HEADERS =	item.h item_func.h item
 			ha_ndbcluster.h ha_ndbcluster_cond.h \
 			ha_ndbcluster_binlog.h ha_ndbcluster_tables.h \
 			ha_ndbcluster_connection.h ha_ndbcluster_connection.h \
+			ha_ndbcluster_lock_ext.h \
 			ha_partition.h rpl_constants.h \
 			opt_range.h protocol.h rpl_tblmap.h rpl_utility.h \
 			rpl_reporting.h \

=== modified file 'sql/ha_ndbcluster.cc'
--- a/sql/ha_ndbcluster.cc	2008-09-05 12:15:38 +0000
+++ b/sql/ha_ndbcluster.cc	2008-10-15 12:14:27 +0000
@@ -67,12 +67,22 @@ TYPELIB ndb_distribution_typelib= { arra
 const char *opt_ndb_distribution= ndb_distribution_names[ND_KEYHASH];
 enum ndb_distribution opt_ndb_distribution_id= ND_KEYHASH;
 
+/*
+  Provided for testing purposes to be able to run full test suite
+  with --ndbcluster option without getting warnings about cluster
+  not being connected
+*/
+my_bool ndbcluster_silent= 0;
+
 // Default value for parallelism
 static const int parallelism= 0;
 
-// Default value for max number of transactions
-// createable against NDB from this handler
-static const int max_transactions= 3; // should really be 2 but there is a transaction to much allocated when loch table is used
+/*
+  Default value for max number of transactions createable against NDB from
+  the handler. Should really be 2 but there is a transaction to much allocated
+  when lock table is used, and one extra to used for global schema lock.
+*/
+static const int max_transactions= 4;
 
 static uint ndbcluster_partition_flags();
 static int ndbcluster_init(void *);
@@ -108,7 +118,7 @@ static uint ndbcluster_alter_partition_f
   return HA_PARTITION_FUNCTION_SUPPORTED;
 }
 
-#define NDB_AUTO_INCREMENT_RETRIES 10
+#define NDB_AUTO_INCREMENT_RETRIES 100
 #define BATCH_FLUSH_SIZE (32768)
 
 #define ERR_PRINT(err) \
@@ -147,10 +157,12 @@ static uchar *ndbcluster_get_key(NDB_SHA
 static
 NdbRecord *
 ndb_get_table_statistics_ndbrecord(NDBDICT *, const NDBTAB *);
-static int ndb_get_table_statistics(ha_ndbcluster*, bool, Ndb*, const NDBTAB *, 
-                                    struct Ndb_statistics *);
-static int ndb_get_table_statistics(ha_ndbcluster*, bool, Ndb*,
-                                    const NdbRecord *, struct Ndb_statistics *);
+static int ndb_get_table_statistics(THD *thd, ha_ndbcluster*, bool, Ndb*, const NDBTAB *, 
+                                    struct Ndb_statistics *,
+                                    bool have_lock= FALSE);
+static int ndb_get_table_statistics(THD *thd, ha_ndbcluster*, bool, Ndb*,
+                                    const NdbRecord *, struct Ndb_statistics *,
+                                    bool have_lock= FALSE);
 
 THD *injector_thd= 0;
 
@@ -350,6 +362,9 @@ Thd_ndb::Thd_ndb()
   (void) hash_init(&open_tables, &my_charset_bin, 5, 0, 0,
                    (hash_get_key)thd_ndb_share_get_key, 0, 0);
   m_unsent_bytes= 0;
+  global_schema_lock_trans= NULL;
+  global_schema_lock_count= 0;
+  global_schema_lock_error= 0;
   init_alloc_root(&m_batch_mem_root, BATCH_FLUSH_SIZE/4, 0);
 }
 
@@ -357,19 +372,6 @@ Thd_ndb::~Thd_ndb()
 {
   if (ndb)
   {
-#ifndef DBUG_OFF
-    Ndb::Free_list_usage tmp;
-    tmp.m_name= 0;
-    while (ndb->get_free_list_usage(&tmp))
-    {
-      uint leaked= (uint) tmp.m_created - tmp.m_free;
-      if (leaked)
-        fprintf(stderr, "NDB: Found %u %s%s that %s not been released\n",
-                leaked, tmp.m_name,
-                (leaked == 1)?"":"'s",
-                (leaked == 1)?"has":"have");
-    }
-#endif
     delete ndb;
     ndb= NULL;
   }
@@ -489,7 +491,8 @@ static void set_ndb_err(THD *thd, const 
   DBUG_VOID_RETURN;
 }
 
-int ha_ndbcluster::ndb_err(NdbTransaction *trans)
+int ha_ndbcluster::ndb_err(NdbTransaction *trans,
+                           bool have_lock)
 {
   THD *thd= current_thd;
   int res;
@@ -508,7 +511,7 @@ int ha_ndbcluster::ndb_err(NdbTransactio
     bzero((char*) &table_list,sizeof(table_list));
     table_list.db= m_dbname;
     table_list.alias= table_list.table_name= m_tabname;
-    close_cached_tables(thd, &table_list, FALSE, FALSE);
+    close_cached_tables(thd, &table_list, have_lock, FALSE);
     break;
   }
   default:
@@ -2883,12 +2886,12 @@ int ha_ndbcluster::ndb_write_row(uchar *
     for (;;)
     {
       Ndb_tuple_id_range_guard g(m_share);
-      if (ndb->getAutoIncrementValue(m_table, g.range, auto_value, 1) == -1)
+      if (ndb->getAutoIncrementValue(m_table, g.range, auto_value, 1000) == -1)
       {
-	if (--retries &&
+	if (--retries && !thd->killed &&
 	    ndb->getNdbError().status == NdbError::TemporaryError)
 	{
-	  my_sleep(retry_sleep);
+	  do_retry_sleep(retry_sleep);
 	  continue;
 	}
 	ERR_RETURN(ndb->getNdbError());
@@ -3823,6 +3826,14 @@ int ha_ndbcluster::rnd_init(bool scan)
 
 int ha_ndbcluster::close_scan()
 {
+  /*
+    workaround for bug #39872 - explain causes segv
+    - rnd_end/close_scan is called on unlocked table
+    - should be fixed in server code, but this will
+    not be done until 6.0 as it is too intrusive
+  */
+  if (m_thd_ndb == NULL)
+    return 0;
   NdbTransaction *trans= m_thd_ndb->trans;
   int error;
   DBUG_ENTER("close_scan");
@@ -4504,6 +4515,11 @@ int ha_ndbcluster::start_statement(THD *
   {
     //lockThisTable();
     DBUG_PRINT("info", ("Locking the table..." ));
+#ifdef NOT_YET
+    push_warning_printf(current_thd, MYSQL_ERROR::WARN_LEVEL_ERROR,
+                        ER_GET_ERRMSG, ER(ER_GET_ERRMSG), 0,
+                        "Table only locked locally in this mysqld", "NDB");
+#endif
   }
   DBUG_RETURN(0);
 }
@@ -5360,18 +5376,6 @@ int ha_ndbcluster::create(const char *na
   NDBDICT *dict= ndb->getDictionary();
 
   DBUG_PRINT("info", ("Tablespace %s,%s", form->s->tablespace, create_info->tablespace));
-  if (is_truncate)
-  {
-    {
-      Ndb_table_guard ndbtab_g(dict, m_tabname);
-      if (!(m_table= ndbtab_g.get_table()))
-	ERR_RETURN(dict->getNdbError());
-      m_table= NULL;
-    }
-    DBUG_PRINT("info", ("Dropping and re-creating table for TRUNCATE"));
-    if ((my_errno= delete_table(name)))
-      DBUG_RETURN(my_errno);
-  }
   table= form;
   if (create_from_engine)
   {
@@ -5390,6 +5394,12 @@ int ha_ndbcluster::create(const char *na
   }
 
 #ifdef HAVE_NDB_BINLOG
+  Thd_ndb *thd_ndb= get_thd_ndb(thd);
+
+  if (!((thd_ndb->options & TNO_NO_LOCK_SCHEMA_OP) ||
+        ndbcluster_has_global_schema_lock(thd_ndb)))
+    DBUG_RETURN(ndbcluster_no_global_schema_lock_abort
+                (thd, "ha_ndbcluster::create"));
   /*
     Don't allow table creation unless
     schema distribution table is setup
@@ -5406,6 +5416,18 @@ int ha_ndbcluster::create(const char *na
     single_user_mode = NdbDictionary::Table::SingleUserModeReadWrite;
   }
 #endif /* HAVE_NDB_BINLOG */
+  if (is_truncate)
+  {
+    {
+      Ndb_table_guard ndbtab_g(dict, m_tabname);
+      if (!(m_table= ndbtab_g.get_table()))
+	ERR_RETURN(dict->getNdbError());
+      m_table= NULL;
+    }
+    DBUG_PRINT("info", ("Dropping and re-creating table for TRUNCATE"));
+    if ((my_errno= delete_table(name)))
+      DBUG_RETURN(my_errno);
+  }
 
   DBUG_PRINT("table", ("name: %s", m_tabname));  
   if (tab.setName(m_tabname))
@@ -5990,6 +6012,12 @@ int ha_ndbcluster::rename_table(const ch
   if (check_ndb_connection(thd))
     DBUG_RETURN(my_errno= HA_ERR_NO_CONNECTION);
 
+#ifdef HAVE_NDB_BINLOG
+  if (!ndbcluster_has_global_schema_lock(get_thd_ndb(thd)))
+    DBUG_RETURN(ndbcluster_no_global_schema_lock_abort
+                (thd, "ha_ndbcluster::rename_table"));
+#endif
+
   Ndb *ndb= get_ndb(thd);
   ndb->setDatabaseName(old_dbname);
   dict= ndb->getDictionary();
@@ -6384,7 +6412,6 @@ int ha_ndbcluster::delete_table(const ch
   if (!ndb_schema_share)
   {
     DBUG_PRINT("info", ("Schema distribution table not setup"));
-    DBUG_ASSERT(ndb_schema_share);
     error= HA_ERR_NO_CONNECTION;
     goto err;
   }
@@ -6397,6 +6424,13 @@ int ha_ndbcluster::delete_table(const ch
   }
 
   ndb= get_ndb(thd);
+
+#ifdef HAVE_NDB_BINLOG
+  if (!ndbcluster_has_global_schema_lock(get_thd_ndb(thd)))
+    DBUG_RETURN(ndbcluster_no_global_schema_lock_abort
+                (thd, "ha_ndbcluster::delete_table"));
+#endif
+
   /*
     Drop table in ndb.
     If it was already gone it might have been dropped
@@ -6451,10 +6485,10 @@ void ha_ndbcluster::get_auto_increment(u
         ndb->readAutoIncrementValue(m_table, g.range, auto_value) ||
         ndb->getAutoIncrementValue(m_table, g.range, auto_value, cache_size, increment, offset))
     {
-      if (--retries &&
+      if (--retries && !thd->killed &&
           ndb->getNdbError().status == NdbError::TemporaryError)
       {
-        my_sleep(retry_sleep);
+        do_retry_sleep(retry_sleep);
         continue;
       }
       const NdbError err= ndb->getNdbError();
@@ -6677,7 +6711,7 @@ int ha_ndbcluster::open(const char *name
     m_share= 0;
     DBUG_RETURN(res);
   }
-  if ((res= update_stats(thd, 1)) ||
+  if ((res= update_stats(thd, 1, true)) ||
       (res= info(HA_STATUS_CONST)))
   {
     free_share(&m_share);
@@ -6971,7 +7005,10 @@ err:
                              share->key, share->use_count));
     free_share(&share);
   }
-  if (ndb_error.code)
+  /*
+    ndbcluster_silent - avoid "cluster disconnected error"
+  */
+  if (ndb_error.code && (!ndbcluster_silent || ndb_error.code != 4009))
   {
     ERR_RETURN(ndb_error);
   }
@@ -6995,7 +7032,15 @@ int ndbcluster_table_exists_in_engine(ha
   NDBDICT* dict= ndb->getDictionary();
   NdbDictionary::Dictionary::List list;
   if (dict->listObjects(list, NdbDictionary::Object::UserTable) != 0)
+  {
+    /*
+      ndbcluster_silent
+      - avoid "cluster failure" warning if cluster is not connected
+    */
+    if (ndbcluster_silent && dict->getNdbError().code == 4009)
+      DBUG_RETURN(HA_ERR_NO_SUCH_TABLE);
     ERR_RETURN(dict->getNdbError());
+  }
   for (uint i= 0 ; i < list.count ; i++)
   {
     NdbDictionary::Dictionary::List::Element& elmt= list.elements[i];
@@ -7096,7 +7141,6 @@ static void ndbcluster_drop_database(han
   if (!ndb_schema_share)
   {
     DBUG_PRINT("info", ("Schema distribution table not setup"));
-    DBUG_ASSERT(ndb_schema_share);
     DBUG_VOID_RETURN;
   }
 #endif
@@ -7278,6 +7322,12 @@ int ndbcluster_find_files(handlerton *ht
   if (dir)
     DBUG_RETURN(0); // Discover of databases not yet supported
 
+#ifdef HAVE_NDB_BINLOG
+  Ndbcluster_global_schema_lock_guard ndbcluster_global_schema_lock_guard(thd);
+  if (ndbcluster_global_schema_lock_guard.lock())
+    DBUG_RETURN(HA_ERR_NO_CONNECTION);
+#endif
+
   // List tables in NDB
   NDBDICT *dict= ndb->getDictionary();
   if (dict->listObjects(list, 
@@ -7688,22 +7738,6 @@ static int ndbcluster_end(handlerton *ht
 #endif
   hash_free(&ndbcluster_open_tables);
 
-  if (g_ndb)
-  {
-#ifndef DBUG_OFF
-    Ndb::Free_list_usage tmp;
-    tmp.m_name= 0;
-    while (g_ndb->get_free_list_usage(&tmp))
-    {
-      uint leaked= (uint) tmp.m_created - tmp.m_free;
-      if (leaked)
-        fprintf(stderr, "NDB: Found %u %s%s that %s not been released\n",
-                leaked, tmp.m_name,
-                (leaked == 1)?"":"'s",
-                (leaked == 1)?"has":"have");
-    }
-#endif
-  }
   ndbcluster_disconnect();
 
   // cleanup ndb interface
@@ -8072,7 +8106,7 @@ uint ndb_get_commitcount(THD *thd, char 
   {
     Ndb_table_guard ndbtab_g(ndb->getDictionary(), tabname);
     if (ndbtab_g.get_table() == 0
-        || ndb_get_table_statistics(NULL, FALSE, ndb, ndbtab_g.get_table(), &stat))
+        || ndb_get_table_statistics(thd, NULL, FALSE, ndb, ndbtab_g.get_table(), &stat))
     {
       /* ndb_share reference temporary free */
       DBUG_PRINT("NDB_SHARE", ("%s temporary free  use_count: %u",
@@ -8746,7 +8780,9 @@ struct ndb_table_statistics_row {
   Uint64 var_mem;
 };
 
-int ha_ndbcluster::update_stats(THD *thd, bool do_read_stat)
+int ha_ndbcluster::update_stats(THD *thd,
+                                bool do_read_stat,
+                                bool have_lock)
 {
   struct Ndb_statistics stat;
   Thd_ndb *thd_ndb= get_thd_ndb(thd);
@@ -8758,7 +8794,7 @@ int ha_ndbcluster::update_stats(THD *thd
     {
       DBUG_RETURN(my_errno= HA_ERR_OUT_OF_MEM);
     }
-    if (int err= ndb_get_table_statistics(this, TRUE, ndb,
+    if (int err= ndb_get_table_statistics(thd, this, TRUE, ndb,
                                           m_ndb_statistics_record, &stat))
     {
       DBUG_RETURN(err);
@@ -8826,15 +8862,16 @@ ndb_get_table_statistics_ndbrecord(NDBDI
 
 static 
 int
-ndb_get_table_statistics(ha_ndbcluster* file, bool report_error, Ndb* ndb,
+ndb_get_table_statistics(THD *thd, ha_ndbcluster* file, bool report_error, Ndb* ndb,
                          const NdbRecord *record,
-                         struct Ndb_statistics * ndbstat)
+                         struct Ndb_statistics * ndbstat,
+                         bool have_lock)
 {
   NdbTransaction* pTrans;
   NdbError error;
-  int retries= 10;
+  int retries= 100;
   int reterr= 0;
-  int retry_sleep= 30 * 1000; /* 30 milliseconds */
+  int retry_sleep= 30; /* 30 milliseconds */
   const char *row;
 #ifndef DBUG_OFF
   char buff[22], buff2[22], buff3[22], buff4[22];
@@ -8941,7 +8978,7 @@ retry:
     {
       if (file && pTrans)
       {
-        reterr= file->ndb_err(pTrans);
+        reterr= file->ndb_err(pTrans, have_lock);
       }
       else
       {
@@ -8958,9 +8995,10 @@ retry:
       ndb->closeTransaction(pTrans);
       pTrans= NULL;
     }
-    if (error.status == NdbError::TemporaryError && retries--)
+    if (error.status == NdbError::TemporaryError &&
+        retries-- && !thd->killed)
     {
-      my_sleep(retry_sleep);
+      do_retry_sleep(retry_sleep);
       continue;
     }
     break;
@@ -8975,9 +9013,10 @@ retry:
 */
 static 
 int
-ndb_get_table_statistics(ha_ndbcluster* file, bool report_error, Ndb* ndb,
+ndb_get_table_statistics(THD *thd, ha_ndbcluster* file, bool report_error, Ndb* ndb,
                          const NDBTAB *ndbtab,
-                         struct Ndb_statistics *ndbstat)
+                         struct Ndb_statistics *ndbstat,
+                         bool have_lock)
 {
   NDBDICT *dict= ndb->getDictionary();
   NdbRecord *rec= ndb_get_table_statistics_ndbrecord(dict, ndbtab);
@@ -8986,7 +9025,7 @@ ndb_get_table_statistics(ha_ndbcluster* 
     DBUG_ENTER("ndb_get_table_statistics");
     ERR_RETURN(dict->getNdbError());
   }
-  int res= ndb_get_table_statistics(file, report_error, ndb, rec, ndbstat);
+  int res= ndb_get_table_statistics(thd, file, report_error, ndb, rec, ndbstat);
   dict->releaseRecord(rec);
   return res;
 }
@@ -10150,7 +10189,7 @@ pthread_handler_t ndb_util_thread_func(v
         }
         Ndb_table_guard ndbtab_g(ndb->getDictionary(), share->table_name);
         if (ndbtab_g.get_table() &&
-            ndb_get_table_statistics(NULL, FALSE, ndb,
+            ndb_get_table_statistics(thd, NULL, FALSE, ndb,
                                      ndbtab_g.get_table(), &stat) == 0)
         {
 #ifndef DBUG_OFF
@@ -10895,6 +10934,12 @@ int ha_ndbcluster::alter_table_phase1(TH
   adding=  adding | HA_ADD_INDEX | HA_ADD_UNIQUE_INDEX;
   dropping= dropping | HA_DROP_INDEX | HA_DROP_UNIQUE_INDEX;
 
+#ifdef HAVE_NDB_BINLOG
+  if (!ndbcluster_has_global_schema_lock(get_thd_ndb(thd)))
+    DBUG_RETURN(ndbcluster_no_global_schema_lock_abort
+                (thd, "ha_ndbcluster::alter_table_phase1"));
+#endif
+
   if (!(alter_data= new NDB_ALTER_DATA(dict, m_table)))
     DBUG_RETURN(HA_ERR_OUT_OF_MEM);
   old_tab= alter_data->old_table;
@@ -11085,6 +11130,12 @@ int ha_ndbcluster::alter_table_phase2(TH
   DBUG_ENTER("alter_table_phase2");
   dropping= dropping  | HA_DROP_INDEX | HA_DROP_UNIQUE_INDEX;
 
+#ifdef HAVE_NDB_BINLOG
+  if (!ndbcluster_has_global_schema_lock(get_thd_ndb(thd)))
+    DBUG_RETURN(ndbcluster_no_global_schema_lock_abort
+                (thd, "ha_ndbcluster::alter_table_phase2"));
+#endif
+
   if ((*alter_flags & dropping).is_set())
   {
     /* Tell the handler to finally drop the indexes. */
@@ -11117,6 +11168,10 @@ int ha_ndbcluster::alter_table_phase3(TH
   DBUG_ENTER("alter_table_phase3");
 
 #ifdef HAVE_NDB_BINLOG
+  if (!ndbcluster_has_global_schema_lock(get_thd_ndb(thd)))
+    DBUG_RETURN(ndbcluster_no_global_schema_lock_abort
+                (thd, "ha_ndbcluster::alter_table_phase3"));
+
   const char *db= table->s->db.str;
   const char *name= table->s->table_name.str;
   /*

=== modified file 'sql/ha_ndbcluster.h'
--- a/sql/ha_ndbcluster.h	2008-08-16 05:15:49 +0000
+++ b/sql/ha_ndbcluster.h	2008-10-15 12:14:27 +0000
@@ -231,7 +231,12 @@ inline my_bool get_binlog_use_update(NDB
 
 enum THD_NDB_OPTIONS
 {
-  TNO_NO_LOG_SCHEMA_OP= 1 << 0
+  TNO_NO_LOG_SCHEMA_OP=  1 << 0,
+  /*
+    In participating mysqld, do not try to acquire global schema
+    lock, as one other mysqld already has the lock.
+  */
+  TNO_NO_LOCK_SCHEMA_OP= 1 << 1
 };
 
 enum THD_NDB_TRANS_OPTIONS
@@ -279,6 +284,9 @@ class Thd_ndb 
     we execute() to flush the rows buffered in m_batch_mem_root.
   */
   uint m_unsent_bytes;
+  NdbTransaction *global_schema_lock_trans;
+  uint global_schema_lock_count;
+  uint global_schema_lock_error;
 };
 
 class ha_ndbcluster: public handler
@@ -452,7 +460,7 @@ static void set_tabname(const char *path
   /*
    * Internal to ha_ndbcluster, used by C functions
    */
-  int ndb_err(NdbTransaction*);
+  int ndb_err(NdbTransaction*, bool have_lock= FALSE);
 
   my_bool register_query_cache_table(THD *thd, char *table_key,
                                      uint key_length,
@@ -715,7 +723,7 @@ private:
   NdbIndexScanOperation *m_multi_cursor;
   Ndb *get_ndb(THD *thd);
 
-  int update_stats(THD *thd, bool do_read_stat);
+  int update_stats(THD *thd, bool do_read_stat, bool have_lock= FALSE);
 };
 
 extern SHOW_VAR ndb_status_variables[];

=== modified file 'sql/ha_ndbcluster_binlog.cc'
--- a/sql/ha_ndbcluster_binlog.cc	2008-09-11 06:11:36 +0000
+++ b/sql/ha_ndbcluster_binlog.cc	2008-10-15 12:14:27 +0000
@@ -28,6 +28,7 @@
 #include <ndbapi/NdbDictionary.hpp>
 #include <ndbapi/ndb_cluster_connection.hpp>
 #include <util/NdbAutoPtr.hpp>
+#include <portlib/NdbTick.h>
 
 #ifdef ndb_dynamite
 #undef assert
@@ -39,6 +40,8 @@ extern my_bool opt_ndb_log_orig;
 extern my_bool opt_ndb_log_update_as_write;
 extern my_bool opt_ndb_log_updated_only;
 
+extern my_bool ndbcluster_silent;
+
 /*
   defines for cluster replication table names
 */
@@ -146,6 +149,8 @@ static char binlog_mdlkey[MAX_MDLKEY_LEN
 /*
   Helper functions
 */
+static bool ndbcluster_check_if_local_table(const char *dbname, const char *tabname);
+static bool ndbcluster_check_if_local_tables_in_db(THD *thd, const char *dbname);
 
 #ifndef DBUG_OFF
 /* purecov: begin deadcode */
@@ -473,8 +478,14 @@ static void ndbcluster_binlog_wait(THD *
   if (ndb_binlog_running)
   {
     DBUG_ENTER("ndbcluster_binlog_wait");
-    const char *save_info= thd ? thd->proc_info : 0;
     ulonglong wait_epoch= ndb_get_latest_trans_gci();
+    /*
+      cluster not connected or no transactions done
+      so nothing to wait for
+    */
+    if (!wait_epoch)
+      DBUG_VOID_RETURN;
+    const char *save_info= thd ? thd->proc_info : 0;
     int count= 30;
     if (thd)
       THD_SET_PROC_INFO(thd,
@@ -663,6 +674,24 @@ static void ndbcluster_reset_slave(THD *
   char buf[1024];
   char *end= strmov(buf, "DELETE FROM " NDB_REP_DB "." NDB_APPLY_TABLE);
   run_query(thd, buf, end, NULL, TRUE);
+  if (thd->main_da.is_error() &&
+      ((thd->main_da.sql_errno() == ER_NO_SUCH_TABLE) ||
+       (thd->main_da.sql_errno() == ER_OPEN_AS_READONLY && ndbcluster_silent)))
+  {
+    /*
+      If table does not exist ignore the error as it
+      is a consistant behavior
+    */
+    thd->main_da.reset_diagnostics_area();
+    /*
+      ndbcluster_silent
+      - avoid "no table mysql.ndb_apply_status" warning - ER_NO_SUCH_TABLE
+      - avoid "mysql.ndb_apply_status read only" warning - ER_OPEN_AS_READONLY
+    */
+    if (ndbcluster_silent)
+      mysql_reset_errors(thd, 1);
+  }
+
   DBUG_VOID_RETURN;
 }
 
@@ -681,15 +710,134 @@ static bool ndbcluster_flush_logs(handle
   return FALSE;
 }
 
+/*
+  Global schema lock across mysql servers
+*/
+int ndbcluster_has_global_schema_lock(Thd_ndb *thd_ndb)
+{
+  if (thd_ndb->global_schema_lock_trans)
+  {
+    thd_ndb->global_schema_lock_trans->refresh();
+    return 1;
+  }
+  return 0;
+}
+
+int ndbcluster_no_global_schema_lock_abort(THD *thd, const char *msg)
+{
+  Thd_ndb *thd_ndb= get_thd_ndb(thd);
+  if (thd_ndb && thd_ndb->global_schema_lock_error != 0)
+    return HA_ERR_NO_CONNECTION;
+  sql_print_error("NDB: programming error, no lock taken while running "
+                  "query %s. Message: %s", thd->query, msg);
+  abort();
+}
+
+#include "ha_ndbcluster_lock_ext.h"
+
+/*
+  lock/unlock calls are reference counted, so calls to lock
+  must be matched to a call to unlock even if the lock call fails
+*/
+static int ndbcluster_global_schema_lock(THD *thd,
+                                         int report_cluster_disconnected)
+{
+  Ndb *ndb= check_ndb_in_thd(thd);
+  Thd_ndb *thd_ndb= get_thd_ndb(thd);
+  NdbError ndb_error;
+  if (thd_ndb->options & TNO_NO_LOCK_SCHEMA_OP)
+    return 0;
+  DBUG_ENTER("ndbcluster_global_schema_lock");
+  DBUG_PRINT("enter", ("query: %s", thd->query));
+  if (thd_ndb->global_schema_lock_count)
+  {
+    if (thd_ndb->global_schema_lock_trans)
+      thd_ndb->global_schema_lock_trans->refresh();
+    else
+      DBUG_ASSERT(thd_ndb->global_schema_lock_error != 0);
+    thd_ndb->global_schema_lock_count++;
+    DBUG_PRINT("exit", ("global_schema_lock_count: %d",
+                        thd_ndb->global_schema_lock_count));
+    DBUG_RETURN(0);
+  }
+  DBUG_ASSERT(thd_ndb->global_schema_lock_count == 0);
+  thd_ndb->global_schema_lock_count= 1;
+  thd_ndb->global_schema_lock_error= 0;
+  DBUG_PRINT("exit", ("global_schema_lock_count: %d",
+                      thd_ndb->global_schema_lock_count));
+
+  if ((thd_ndb->global_schema_lock_trans=
+       ndbcluster_global_schema_lock_ext(thd, ndb, ndb_error, -1)) != NULL)
+  {
+    DBUG_RETURN(0);
+  }
+
+  /*
+    ndbcluster_silent - avoid "cluster disconnected error"
+  */
+  if (ndbcluster_silent)
+    report_cluster_disconnected= 0;
+  if (ndb_error.code != 4009 || report_cluster_disconnected)
+  {
+    sql_print_warning("NDB: Could not acquire global schema lock (%d)%s",
+                      ndb_error.code, ndb_error.message);
+    push_warning_printf(thd, MYSQL_ERROR::WARN_LEVEL_ERROR,
+                        ER_GET_ERRMSG, ER(ER_GET_ERRMSG),
+                        ndb_error.code, ndb_error.message,
+                        "NDB. Could not acquire global schema lock");
+  }
+  thd_ndb->global_schema_lock_error= ndb_error.code ? ndb_error.code : -1;
+  DBUG_RETURN(-1);
+}
+static int ndbcluster_global_schema_unlock(THD *thd)
+{
+  Thd_ndb *thd_ndb= get_thd_ndb(thd);
+  DBUG_ASSERT(thd_ndb != 0);
+  if (thd_ndb == 0 || (thd_ndb->options & TNO_NO_LOCK_SCHEMA_OP))
+    return 0;
+  Ndb *ndb= thd_ndb->ndb;
+  DBUG_ENTER("ndbcluster_global_schema_unlock");
+  NdbTransaction *trans= thd_ndb->global_schema_lock_trans;
+  thd_ndb->global_schema_lock_count--;
+  DBUG_PRINT("exit", ("global_schema_lock_count: %d",
+                      thd_ndb->global_schema_lock_count));
+  DBUG_ASSERT(ndb != NULL);
+  if (ndb == NULL)
+    return 0;
+  DBUG_ASSERT(trans != NULL || thd_ndb->global_schema_lock_error != 0);
+  if (thd_ndb->global_schema_lock_count != 0)
+  {
+    DBUG_RETURN(0);
+  }
+  thd_ndb->global_schema_lock_error= 0;
+  if (trans)
+  {
+    thd_ndb->global_schema_lock_trans= NULL;
+    NdbError ndb_error;
+    if (ndbcluster_global_schema_unlock_ext(ndb, trans, ndb_error))
+    {
+      sql_print_warning("NDB: Releasing global schema lock (%d)%s",
+                        ndb_error.code, ndb_error.message);
+      push_warning_printf(thd, MYSQL_ERROR::WARN_LEVEL_ERROR,
+                          ER_GET_ERRMSG, ER(ER_GET_ERRMSG),
+                          ndb_error.code,
+                          ndb_error.message,
+                          "ndb. Releasing global schema lock");
+      DBUG_RETURN(-1);
+    }
+  }
+  DBUG_RETURN(0);
+}
+
 static int ndbcluster_binlog_func(handlerton *hton, THD *thd, 
                                   enum_binlog_func fn, 
                                   void *arg)
 {
+  DBUG_ENTER("ndbcluster_binlog_func");
   switch(fn)
   {
   case BFN_RESET_LOGS:
-    ndbcluster_reset_logs(thd);
-    break;
+    DBUG_RETURN(ndbcluster_reset_logs(thd));
   case BFN_RESET_SLAVE:
     ndbcluster_reset_slave(thd);
     break;
@@ -702,8 +850,35 @@ static int ndbcluster_binlog_func(handle
   case BFN_BINLOG_PURGE_FILE:
     ndbcluster_binlog_index_purge_file(thd, (const char *)arg);
     break;
+  case BFN_GLOBAL_SCHEMA_LOCK:
+    DBUG_RETURN(ndbcluster_global_schema_lock(thd, 1));
+    break;
+  case BFN_GLOBAL_SCHEMA_UNLOCK:
+    ndbcluster_global_schema_unlock(thd);
+    break;
   }
-  return 0;
+  DBUG_RETURN(0);
+}
+Ndbcluster_global_schema_lock_guard::Ndbcluster_global_schema_lock_guard(THD *thd)
+  : m_thd(thd), m_lock(0)
+{
+}
+Ndbcluster_global_schema_lock_guard::~Ndbcluster_global_schema_lock_guard()
+{
+  if (m_lock)
+    ndbcluster_global_schema_unlock(m_thd);
+}
+int Ndbcluster_global_schema_lock_guard::lock()
+{
+  /* only one lock call allowed */
+  DBUG_ASSERT(m_lock == 0);
+  /*
+    Always se m_lock, even if lock fails. Since the
+    lock/unlock calls are reference counted, the number
+    of calls to lock and unlock need to match up.
+  */
+  m_lock= 1;
+  return ndbcluster_global_schema_lock(m_thd, 0);
 }
 
 void ndbcluster_binlog_init_handlerton()
@@ -903,8 +1078,185 @@ static int ndbcluster_create_schema_tabl
   DBUG_RETURN(0);
 }
 
+class Thd_ndb_options_guard
+{
+public:
+  Thd_ndb_options_guard(Thd_ndb *thd_ndb)
+    : m_val(thd_ndb->options), m_save_val(thd_ndb->options) {}
+  ~Thd_ndb_options_guard() { m_val= m_save_val; }
+  void set(uint32 flag) { m_val|= flag; }
+private:
+  uint32 &m_val;
+  uint32 m_save_val;
+};
+
+/*
+  Ndb has no representation of the database schema objects.
+  The mysql.ndb_schema table contains the latest schema operations
+  done via a mysqld, and thus reflects databases created/dropped/altered
+  while a mysqld was disconnected.  This function tries to recover
+  the correct state w.r.t created databases using the information in
+  that table.
+
+  Function should only be called while ndbcluster_global_schema_lock
+  is held, to ensure that ndb_schema table is not being updated while
+  scanning.
+*/
+static int ndbcluster_find_all_databases(THD *thd)
+{
+  Ndb *ndb= check_ndb_in_thd(thd);
+  Thd_ndb *thd_ndb= get_thd_ndb(thd);
+  Thd_ndb_options_guard thd_ndb_options(thd_ndb);
+  NDBDICT *dict= ndb->getDictionary();
+  NdbTransaction *trans= NULL;
+  NdbError ndb_error;
+  int retries= 100;
+  int retry_sleep= 30; /* 30 milliseconds, transaction */
+  DBUG_ENTER("ndbcluster_find_all_databases");
+  /* Ensure that we have the right lock */
+  if (!ndbcluster_has_global_schema_lock(thd_ndb))
+    DBUG_RETURN(ndbcluster_no_global_schema_lock_abort
+                (thd, "ndbcluster_find_all_databases"));
+  ndb->setDatabaseName(NDB_REP_DB);
+  thd_ndb_options.set(TNO_NO_LOG_SCHEMA_OP);
+  thd_ndb_options.set(TNO_NO_LOCK_SCHEMA_OP);
+  while (1)
+  {
+    char db_buffer[FN_REFLEN];
+    char *db= db_buffer+1;
+    char name[FN_REFLEN];
+    char query[64000];
+    Ndb_table_guard ndbtab_g(dict, NDB_SCHEMA_TABLE);
+    const NDBTAB *ndbtab= ndbtab_g.get_table();
+    NdbScanOperation *op;
+    NdbBlob *query_blob_handle;
+    int r= 0;
+    if (ndbtab == NULL)
+    {
+      ndb_error= dict->getNdbError();
+      goto error;
+    }
+    trans= ndb->startTransaction();
+    if (trans == NULL)
+    {
+      ndb_error= ndb->getNdbError();
+      goto error;
+    }
+    op= trans->getNdbScanOperation(ndbtab);
+    if (op == NULL)
+      abort();
+    op->readTuples(NdbScanOperation::LM_Read,
+                   NdbScanOperation::SF_TupScan, 1);
+    
+    r|= op->getValue("db", db_buffer) == NULL;
+    r|= op->getValue("name", name) == NULL;
+    r|= (query_blob_handle= op->getBlobHandle("query")) == NULL;
+    r|= query_blob_handle->getValue(query, sizeof(query));
+
+    if (r)
+      abort();
+
+    if (trans->execute(NdbTransaction::NoCommit))
+    {
+      ndb_error= trans->getNdbError();
+      goto error;
+    }
+
+    while ((r= op->nextResult()) == 0)
+    {
+      unsigned db_len= db_buffer[0];
+      unsigned name_len= name[0];
+      /*
+        name_len == 0 means no table name, hence the row
+        is for a database
+      */
+      if (db_len > 0 && name_len == 0)
+      {
+        /* database found */
+        Uint64 query_length= 0;
+        if (query_blob_handle->getLength(query_length))
+          abort();
+        db[db_len]= 0;
+        query[query_length]= 0;
+        build_table_filename(name, sizeof(name), db, "", "", 0);
+        int database_exists= !my_access(name, F_OK);
+        if (strncasecmp("CREATE", query, 6) == 0)
+        {
+          /* Database should exist */
+          if (!database_exists)
+          {
+            /* create missing database */
+            sql_print_information("NDB: Discovered missing database '%s'", db);
+            const int no_print_error[1]= {0};
+            run_query(thd, query, query + query_length,
+                      no_print_error,    /* print error */
+                      TRUE);   /* don't binlog the query */
+            /* always reset here */
+            thd->main_da.reset_diagnostics_area();
+          }
+        }
+        else if (strncasecmp("ALTER", query, 5) == 0)
+        {
+          /* Database should exist */
+          if (!database_exists)
+          {
+            /* create missing database */
+            sql_print_information("NDB: Discovered missing database '%s'", db);
+            const int no_print_error[1]= {0};
+            name_len= my_snprintf(name, sizeof(name), "CREATE DATABASE %s", db);
+            run_query(thd, name, name + name_len,
+                      no_print_error,    /* print error */
+                      TRUE);   /* don't binlog the query */
+            thd->main_da.reset_diagnostics_area();
+            run_query(thd, query, query + query_length,
+                      no_print_error,    /* print error */
+                      TRUE);   /* don't binlog the query */
+            /* always reset here */
+            thd->main_da.reset_diagnostics_area();
+          }
+        }
+        else if (strncasecmp("DROP", query, 4) == 0)
+        {
+          /* Database should not exist */
+          if (database_exists)
+          {
+            /* drop missing database */
+            sql_print_information("NDB: Discovered reamining database '%s'", db);
+          }
+        }
+      }
+    }
+    if (r == -1)
+    {
+      ndb_error= op->getNdbError();
+      goto error;
+    }
+    ndb->closeTransaction(trans);
+    trans= NULL;
+    DBUG_RETURN(0); // success
+  error:
+    if (trans)
+    {
+      ndb->closeTransaction(trans);
+      trans= NULL;
+    }
+    if (ndb_error.status == NdbError::TemporaryError && !thd->killed)
+    {
+      if (retries--)
+      {
+        do_retry_sleep(retry_sleep);
+        continue; // retry
+      }
+    }
+    DBUG_RETURN(1); // not temp error or too many retries
+  }
+}
+
 int ndbcluster_setup_binlog_table_shares(THD *thd)
 {
+  Ndbcluster_global_schema_lock_guard global_schema_lock_guard(thd);
+  if (global_schema_lock_guard.lock())
+    return 1;
   if (!ndb_schema_share &&
       ndbcluster_check_ndb_schema_share() == 0)
   {
@@ -933,6 +1285,11 @@ int ndbcluster_setup_binlog_table_shares
     }
   }
 
+  if (ndbcluster_find_all_databases(thd))
+  {
+    return 1;
+  }
+
   if (!ndbcluster_find_all_files(thd))
   {
     pthread_mutex_lock(&LOCK_open);
@@ -1109,7 +1466,7 @@ ndbcluster_update_slock(THD *thd,
   const NDBTAB *ndbtab= ndbtab_g.get_table();
   NdbTransaction *trans= 0;
   int retries= 100;
-  int retry_sleep= 10; /* 10 milliseconds, transaction */
+  int retry_sleep= 30; /* 30 milliseconds, transaction */
   const NDBCOL *col[SCHEMA_SIZE];
   unsigned sz[SCHEMA_SIZE];
 
@@ -1119,7 +1476,8 @@ ndbcluster_update_slock(THD *thd,
 
   if (ndbtab == 0)
   {
-    abort();
+    if (dict->getNdbError().code != 4009)
+      abort();
     DBUG_RETURN(0);
   }
 
@@ -1204,13 +1562,13 @@ ndbcluster_update_slock(THD *thd,
   err:
     const NdbError *this_error= trans ?
       &trans->getNdbError() : &ndb->getNdbError();
-    if (this_error->status == NdbError::TemporaryError)
+    if (this_error->status == NdbError::TemporaryError && !thd->killed)
     {
       if (retries--)
       {
         if (trans)
           ndb->closeTransaction(trans);
-        my_sleep(retry_sleep);
+        do_retry_sleep(retry_sleep);
         continue; // retry
       }
     }
@@ -1446,7 +1804,7 @@ int ndbcluster_log_schema_op(THD *thd,
   const NDBTAB *ndbtab= ndbtab_g.get_table();
   NdbTransaction *trans= 0;
   int retries= 100;
-  int retry_sleep= 10; /* 10 milliseconds, transaction */
+  int retry_sleep= 30; /* 30 milliseconds, transaction */
   const NDBCOL *col[SCHEMA_SIZE];
   unsigned sz[SCHEMA_SIZE];
 
@@ -1553,13 +1911,13 @@ int ndbcluster_log_schema_op(THD *thd,
 err:
     const NdbError *this_error= trans ?
       &trans->getNdbError() : &ndb->getNdbError();
-    if (this_error->status == NdbError::TemporaryError)
+    if (this_error->status == NdbError::TemporaryError && !thd->killed)
     {
       if (retries--)
       {
         if (trans)
           ndb->closeTransaction(trans);
-        my_sleep(retry_sleep);
+        do_retry_sleep(retry_sleep);
         continue; // retry
       }
     }
@@ -1817,6 +2175,8 @@ ndb_binlog_thread_handle_schema_event(TH
     if (ev_type == NDBEVENT::TE_UPDATE ||
         ev_type == NDBEVENT::TE_INSERT)
     {
+      Thd_ndb *thd_ndb= get_thd_ndb(thd);
+      Thd_ndb_options_guard thd_ndb_options(thd_ndb);
       Cluster_schema *schema= (Cluster_schema *)
         sql_alloc(sizeof(Cluster_schema));
       MY_BITMAP slock;
@@ -1868,10 +2228,12 @@ ndb_binlog_thread_handle_schema_event(TH
           // fall through
         case SOT_RENAME_TABLE_NEW:
         {
-          uint end= snprintf(&errmsg[0], MYSQL_ERRMSG_SIZE,
-                             "NDB Binlog: Skipping renaming locally defined table '%s.%s' from binlog schema event '%s' from node %d. ",
-                             schema->db, schema->name, schema->query,
-                             schema->node_id);
+          uint end= my_snprintf(&errmsg[0], MYSQL_ERRMSG_SIZE,
+                                "NDB Binlog: Skipping renaming locally "
+                                "defined table '%s.%s' from binlog schema "
+                                "event '%s' from node %d. ",
+                                schema->db, schema->name, schema->query,
+                                schema->node_id);
           
           errmsg[end]= '\0';
         }
@@ -1879,14 +2241,17 @@ ndb_binlog_thread_handle_schema_event(TH
         case SOT_DROP_TABLE:
           if (schema_type == SOT_DROP_TABLE)
           {
-            uint end= snprintf(&errmsg[0], MYSQL_ERRMSG_SIZE,
-                               "NDB Binlog: Skipping dropping locally defined table '%s.%s' from binlog schema event '%s' from node %d. ",
-                               schema->db, schema->name, schema->query,
-                               schema->node_id);
+            uint end= my_snprintf(&errmsg[0], MYSQL_ERRMSG_SIZE,
+                                  "NDB Binlog: Skipping dropping locally "
+                                  "defined table '%s.%s' from binlog schema "
+                                  "event '%s' from node %d. ",
+                                  schema->db, schema->name, schema->query,
+                                  schema->node_id);
             errmsg[end]= '\0';
           }
           if (! ndbcluster_check_if_local_table(schema->db, schema->name))
           {
+            thd_ndb_options.set(TNO_NO_LOCK_SCHEMA_OP);
             const int no_print_error[1]=
               {ER_BAD_TABLE_ERROR}; /* ignore missing table */
             run_query(thd, schema->query,
@@ -1946,6 +2311,7 @@ ndb_binlog_thread_handle_schema_event(TH
           break;
         // fall through
         case SOT_CREATE_TABLE:
+          thd_ndb_options.set(TNO_NO_LOCK_SCHEMA_OP);
           pthread_mutex_lock(&LOCK_open);
           if (ndbcluster_check_if_local_table(schema->db, schema->name))
           {
@@ -1973,6 +2339,7 @@ ndb_binlog_thread_handle_schema_event(TH
           break;
         case SOT_DROP_DB:
           /* Drop the database locally if it only contains ndb tables */
+          thd_ndb_options.set(TNO_NO_LOCK_SCHEMA_OP);
           if (! ndbcluster_check_if_local_tables_in_db(thd, schema->db))
           {
             const int no_print_error[1]= {0};
@@ -2002,6 +2369,7 @@ ndb_binlog_thread_handle_schema_event(TH
           /* fall through */
         case SOT_ALTER_DB:
         {
+          thd_ndb_options.set(TNO_NO_LOCK_SCHEMA_OP);
           const int no_print_error[1]= {0};
           run_query(thd, schema->query,
                     schema->query + schema->query_length,
@@ -2158,8 +2526,10 @@ ndb_binlog_thread_handle_schema_event_po
     return;
   DBUG_ENTER("ndb_binlog_thread_handle_schema_event_post_epoch");
   Cluster_schema *schema;
+  Thd_ndb *thd_ndb= get_thd_ndb(thd);
   while ((schema= post_epoch_log_list->pop()))
   {
+    Thd_ndb_options_guard thd_ndb_options(thd_ndb);
     DBUG_PRINT("info",
                ("%s.%s: log query_length: %d  query: '%s'  type: %d",
                 schema->db, schema->name,
@@ -2307,6 +2677,7 @@ ndb_binlog_thread_handle_schema_event_po
             free_share(&share);
             share= 0;
           }
+          thd_ndb_options.set(TNO_NO_LOCK_SCHEMA_OP);
           pthread_mutex_lock(&LOCK_open);
           if (ndbcluster_check_if_local_table(schema->db, schema->name))
           {
@@ -2486,6 +2857,7 @@ ndb_binlog_thread_handle_schema_event_po
             free_share(&share);
             share= 0;
           }
+          thd_ndb_options.set(TNO_NO_LOCK_SCHEMA_OP);
           pthread_mutex_lock(&LOCK_open);
           if (ndbcluster_check_if_local_table(schema->db, schema->name))
           {
@@ -2766,7 +3138,7 @@ void set_binlog_flags(NDB_SHARE *share)
     set_binlog_full(share);
 }
 
-bool
+static bool
 ndbcluster_check_if_local_table(const char *dbname, const char *tabname)
 {
   char key[FN_REFLEN];
@@ -2788,7 +3160,7 @@ ndbcluster_check_if_local_table(const ch
   DBUG_RETURN(false);
 }
 
-bool
+static bool
 ndbcluster_check_if_local_tables_in_db(THD *thd, const char *dbname)
 {
   DBUG_ENTER("ndbcluster_check_if_local_tables_in_db");
@@ -3350,9 +3722,9 @@ ndbcluster_create_event_ops(THD *thd, ND
       op->setCustomData(NULL);
       ndb->dropEventOperation(op);
       pthread_mutex_unlock(&injector_mutex);
-      if (retries)
+      if (retries && !thd->killed)
       {
-        my_sleep(retry_sleep);
+        do_retry_sleep(retry_sleep);
         continue;
       }
       DBUG_RETURN(-1);
@@ -5089,17 +5461,17 @@ ndbcluster_show_status_binlog(THD* thd, 
     pthread_mutex_unlock(&injector_mutex);
 
     buflen=
-      snprintf(buf, sizeof(buf),
-               "latest_epoch=%s, "
-               "latest_trans_epoch=%s, "
-               "latest_received_binlog_epoch=%s, "
-               "latest_handled_binlog_epoch=%s, "
-               "latest_applied_binlog_epoch=%s",
-               llstr(ndb_latest_epoch, buff1),
-               llstr(ndb_get_latest_trans_gci(), buff2),
-               llstr(ndb_latest_received_binlog_epoch, buff3),
-               llstr(ndb_latest_handled_binlog_epoch, buff4),
-               llstr(ndb_latest_applied_binlog_epoch, buff5));
+      my_snprintf(buf, sizeof(buf),
+                  "latest_epoch=%s, "
+                  "latest_trans_epoch=%s, "
+                  "latest_received_binlog_epoch=%s, "
+                  "latest_handled_binlog_epoch=%s, "
+                  "latest_applied_binlog_epoch=%s",
+                  llstr(ndb_latest_epoch, buff1),
+                  llstr(ndb_get_latest_trans_gci(), buff2),
+                  llstr(ndb_latest_received_binlog_epoch, buff3),
+                  llstr(ndb_latest_handled_binlog_epoch, buff4),
+                  llstr(ndb_latest_applied_binlog_epoch, buff5));
     if (stat_print(thd, ndbcluster_hton_name, ndbcluster_hton_name_length,
                    "binlog", strlen("binlog"),
                    buf, buflen))

=== modified file 'sql/ha_ndbcluster_binlog.h'
--- a/sql/ha_ndbcluster_binlog.h	2008-03-26 23:22:46 +0000
+++ b/sql/ha_ndbcluster_binlog.h	2008-10-15 12:14:27 +0000
@@ -159,10 +159,6 @@ void ndbcluster_binlog_init_handlerton()
   Initialize the binlog part of the NDB_SHARE
 */
 int ndbcluster_binlog_init_share(THD *thd, NDB_SHARE *share, TABLE *table);
-
-bool ndbcluster_check_if_local_table(const char *dbname, const char *tabname);
-bool ndbcluster_check_if_local_tables_in_db(THD *thd, const char *dbname);
-
 int ndbcluster_create_binlog_setup(THD *thd, Ndb *ndb, const char *key,
                                    uint key_len,
                                    const char *db,
@@ -269,3 +265,17 @@ set_thd_ndb(THD *thd, Thd_ndb *thd_ndb)
 { thd_set_ha_data(thd, ndbcluster_hton, thd_ndb); }
 
 Ndb* check_ndb_in_thd(THD* thd);
+
+int ndbcluster_has_global_schema_lock(Thd_ndb *thd_ndb);
+int ndbcluster_no_global_schema_lock_abort(THD *thd, const char *msg);
+
+class Ndbcluster_global_schema_lock_guard
+{
+public:
+  Ndbcluster_global_schema_lock_guard(THD *thd);
+  ~Ndbcluster_global_schema_lock_guard();
+  int lock();
+private:
+  THD *m_thd;
+  int m_lock;
+};

=== modified file 'sql/ha_ndbcluster_connection.cc'
--- a/sql/ha_ndbcluster_connection.cc	2008-04-09 13:52:09 +0000
+++ b/sql/ha_ndbcluster_connection.cc	2008-10-02 15:52:29 +0000
@@ -91,7 +91,9 @@ int ndbcluster_connect(int (*connect_cal
         (now_time.tv_sec == end_time.tv_sec &&
          now_time.tv_usec >= end_time.tv_usec))
       break;
-    sleep(1);
+    do_retry_sleep(100);
+    if (abort_loop)
+      goto ndbcluster_connect_error;
   }
 
   {

=== modified file 'sql/ha_ndbcluster_connection.h'
--- a/sql/ha_ndbcluster_connection.h	2008-04-09 13:52:09 +0000
+++ b/sql/ha_ndbcluster_connection.h	2008-10-02 15:52:29 +0000
@@ -23,3 +23,9 @@ int ndb_has_node_id(uint id);
 
 /* options from from mysqld.cc */
 extern ulong opt_ndb_cluster_connection_pool;
+
+/* perform random sleep in the range milli_sleep to 2*milli_sleep */
+inline void do_retry_sleep(unsigned milli_sleep)
+{
+  my_sleep(1000*(milli_sleep + 5*(rand()%(milli_sleep/5))));
+}

=== added file 'sql/ha_ndbcluster_lock_ext.h'
--- a/sql/ha_ndbcluster_lock_ext.h	1970-01-01 00:00:00 +0000
+++ b/sql/ha_ndbcluster_lock_ext.h	2008-10-09 09:10:50 +0000
@@ -0,0 +1,113 @@
+/* Copyright (C) 2000-2003 MySQL AB
+
+  This program is free software; you can redistribute it and/or modify
+  it under the terms of the GNU General Public License as published by
+  the Free Software Foundation; version 2 of the License.
+
+  This program is distributed in the hope that it will be useful,
+  but WITHOUT ANY WARRANTY; without even the implied warranty of
+  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+  GNU General Public License for more details.
+
+  You should have received a copy of the GNU General Public License
+  along with this program; if not, write to the Free Software
+  Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
+*/
+
+/*
+  These functions are shared with ndb_restore so that the creating of
+  tables through ndb_restore is syncronized correctly with the mysqld's
+
+  The lock/unlock functions use the BACKUP_SEQUENCE row in SYSTAB_0
+
+  retry_time == 0 means no retry
+  retry_time <  0 means infinite retries
+  retry_time >  0 means retries for max 'retry_time' seconds
+*/
+static NdbTransaction *
+ndbcluster_global_schema_lock_ext(THD *thd, Ndb *ndb, NdbError &ndb_error,
+                                  int retry_time= 10)
+{
+  ndb->setDatabaseName("sys");
+  ndb->setDatabaseSchemaName("def");
+  NdbDictionary::Dictionary *dict= ndb->getDictionary();
+  Ndb_table_guard ndbtab_g(dict, "SYSTAB_0");
+  const NdbDictionary::Table *ndbtab= NULL;
+  NdbOperation *op;
+  NdbTransaction *trans= NULL;
+  int retry_sleep= 50; /* 50 milliseconds, transaction */
+  NDB_TICKS time_end;
+
+  if (retry_time > 0)
+  {
+    time_end= NdbTick_CurrentMillisecond();
+    time_end+= retry_time * 1000;
+  }
+  while (1)
+  {
+    if (!ndbtab)
+    {
+      if (!(ndbtab= ndbtab_g.get_table()))
+      {
+        if (dict->getNdbError().status == NdbError::TemporaryError)
+          goto retry;
+        ndb_error= dict->getNdbError();
+        goto error_handler;
+      }
+    }
+
+    trans= ndb->startTransaction();
+    if (trans == NULL)
+    {
+      ndb_error= ndb->getNdbError();
+      goto error_handler;
+    }
+
+    op= trans->getNdbOperation(ndbtab);
+    op->readTuple(NdbOperation::LM_Exclusive);
+    op->equal("SYSKEY_0", NDB_BACKUP_SEQUENCE);
+
+    if (trans->execute(NdbTransaction::NoCommit) == 0)
+      break;
+
+    if (trans->getNdbError().status != NdbError::TemporaryError)
+      goto error_handler;
+    else if (thd->killed)
+      goto error_handler;
+  retry:
+    if (retry_time == 0)
+      goto error_handler;
+    if (retry_time > 0 &&
+        time_end < NdbTick_CurrentMillisecond())
+      goto error_handler;
+    if (trans)
+    {
+      ndb->closeTransaction(trans);
+      trans= NULL;
+    }
+    do_retry_sleep(retry_sleep);
+  }
+  return trans;
+
+ error_handler:
+  if (trans)
+  {
+    ndb_error= trans->getNdbError();
+    ndb->closeTransaction(trans);
+  }
+  return NULL;
+}
+
+static int
+ndbcluster_global_schema_unlock_ext(Ndb *ndb, NdbTransaction *trans,
+                                    NdbError &ndb_error)
+{
+  if (trans->execute(NdbTransaction::Commit))
+  {
+    ndb_error= trans->getNdbError();
+    ndb->closeTransaction(trans);
+    return -1;
+  }
+  ndb->closeTransaction(trans);
+  return 0;
+}

=== modified file 'sql/handler.cc'
--- a/sql/handler.cc	2008-09-05 08:57:28 +0000
+++ b/sql/handler.cc	2008-10-15 12:14:27 +0000
@@ -3855,6 +3855,42 @@ int ha_binlog_index_purge_file(THD *thd,
   return 0;
 }
 
+static int ha_global_schema_lock(THD *thd)
+{
+  binlog_func_st bfn= {BFN_GLOBAL_SCHEMA_LOCK, 0};
+  binlog_func_foreach(thd, &bfn);
+  if (thd->main_da.is_error())
+    return 1;
+  return 0;
+}
+
+static int ha_global_schema_unlock(THD *thd)
+{
+  binlog_func_st bfn= {BFN_GLOBAL_SCHEMA_UNLOCK, 0};
+  binlog_func_foreach(thd, &bfn);
+  if (thd->main_da.is_error())
+    return 1;
+  return 0;
+}
+
+Ha_global_schema_lock_guard::Ha_global_schema_lock_guard(THD *thd)
+  : m_thd(thd), m_lock(0)
+{
+}
+
+Ha_global_schema_lock_guard::~Ha_global_schema_lock_guard()
+{
+  if (m_lock)
+    ha_global_schema_unlock(m_thd);
+}
+
+int Ha_global_schema_lock_guard::lock()
+{
+  DBUG_ASSERT(m_lock == 0);
+  m_lock= 1;
+  return ha_global_schema_lock(m_thd);
+}
+
 struct binlog_log_query_st
 {
   enum_binlog_command binlog_command;

=== modified file 'sql/handler.h'
--- a/sql/handler.h	2008-09-02 15:44:54 +0000
+++ b/sql/handler.h	2008-10-15 12:14:27 +0000
@@ -334,7 +334,9 @@ enum enum_binlog_func {
   BFN_RESET_SLAVE=       2,
   BFN_BINLOG_WAIT=       3,
   BFN_BINLOG_END=        4,
-  BFN_BINLOG_PURGE_FILE= 5
+  BFN_BINLOG_PURGE_FILE= 5,
+  BFN_GLOBAL_SCHEMA_LOCK=  6,
+  BFN_GLOBAL_SCHEMA_UNLOCK=7
 };
 
 enum enum_binlog_command {
@@ -2503,6 +2505,16 @@ void ha_binlog_log_query(THD *thd, handl
                          const char *db, const char *table_name);
 void ha_binlog_wait(THD *thd);
 int ha_binlog_end(THD *thd);
+class Ha_global_schema_lock_guard
+{
+public:
+  Ha_global_schema_lock_guard(THD *thd);
+  ~Ha_global_schema_lock_guard();
+  int lock();
+private:
+  THD *m_thd;
+  int m_lock;
+};
 #else
 inline int ha_int_dummy() { return 0; }
 #define ha_reset_logs(a) ha_int_dummy()
@@ -2511,4 +2523,11 @@ inline int ha_int_dummy() { return 0; }
 #define ha_binlog_log_query(a,b,c,d,e,f,g) do {} while (0)
 #define ha_binlog_wait(a) do {} while (0)
 #define ha_binlog_end(a)  do {} while (0)
+class Ha_global_schema_lock_guard
+{
+public:
+  Ha_global_schema_lock_guard(THD *thd) {}
+  ~Ha_global_schema_lock_guard() {}
+  int lock() { return 0; }
+};
 #endif

=== modified file 'sql/mysqld.cc'
--- a/sql/mysqld.cc	2008-09-02 15:44:54 +0000
+++ b/sql/mysqld.cc	2008-10-15 12:14:27 +0000
@@ -6160,7 +6160,7 @@ thread is in the master's binlogs.",
    "Turn on more logging in the error log.",
    (uchar**) &ndb_extra_logging,
    (uchar**) &ndb_extra_logging,
-   0, GET_ULONG, OPT_ARG, 0, 0, 0, 0, 0, 0},
+   0, GET_ULONG, OPT_ARG, 1, 0, 0, 0, 0, 0},
 #ifdef HAVE_NDB_BINLOG
   {"ndb-report-thresh-binlog-epoch-slip", OPT_NDB_REPORT_THRESH_BINLOG_EPOCH_SLIP,
    "Threshold on number of epochs to be behind before reporting binlog status. "

=== modified file 'sql/sql_db.cc'
--- a/sql/sql_db.cc	2008-08-27 08:47:03 +0000
+++ b/sql/sql_db.cc	2008-10-15 12:14:27 +0000
@@ -616,6 +616,7 @@ int mysql_create_db(THD *thd, char *db, 
   MY_STAT stat_info;
   uint create_options= create_info ? create_info->options : 0;
   uint path_len;
+  Ha_global_schema_lock_guard global_schema_lock_guard(thd);
   DBUG_ENTER("mysql_create_db");
 
   /* do not create 'information_schema' db */
@@ -643,6 +644,8 @@ int mysql_create_db(THD *thd, char *db, 
     goto exit2;
   }
 
+  global_schema_lock_guard.lock();
+
   pthread_mutex_lock(&LOCK_mysql_create_db);
 
   /* Check directory */
@@ -767,6 +770,7 @@ bool mysql_alter_db(THD *thd, const char
   char path[FN_REFLEN+16];
   long result=1;
   int error= 0;
+  Ha_global_schema_lock_guard global_schema_lock_guard(thd);
   DBUG_ENTER("mysql_alter_db");
 
   /*
@@ -784,6 +788,8 @@ bool mysql_alter_db(THD *thd, const char
   if ((error=wait_if_global_read_lock(thd,0,1)))
     goto exit2;
 
+  global_schema_lock_guard.lock();
+
   pthread_mutex_lock(&LOCK_mysql_create_db);
 
   /* 
@@ -861,6 +867,7 @@ bool mysql_rm_db(THD *thd,char *db,bool 
   MY_DIR *dirp;
   uint length;
   TABLE_LIST* dropped_tables= 0;
+  Ha_global_schema_lock_guard global_schema_lock_guard(thd);
   DBUG_ENTER("mysql_rm_db");
 
   /*
@@ -880,6 +887,8 @@ bool mysql_rm_db(THD *thd,char *db,bool 
     error= -1;
     goto exit2;
   }
+
+  global_schema_lock_guard.lock();
 
   pthread_mutex_lock(&LOCK_mysql_create_db);
 

=== modified file 'sql/sql_delete.cc'
--- a/sql/sql_delete.cc	2008-09-02 15:44:54 +0000
+++ b/sql/sql_delete.cc	2008-10-15 12:14:27 +0000
@@ -977,6 +977,7 @@ bool mysql_truncate(THD *thd, TABLE_LIST
   bool error;
   uint path_length;
   MDL_LOCK_DATA *mdl_lock_data= 0;
+  Ha_global_schema_lock_guard global_schema_lock_guard(thd);
   DBUG_ENTER("mysql_truncate");
 
   bzero((char*) &create_info,sizeof(create_info));
@@ -1030,6 +1031,8 @@ bool mysql_truncate(THD *thd, TABLE_LIST
                                       HTON_CAN_RECREATE))
       goto trunc_by_del;
 
+    if (table_type == DB_TYPE_NDBCLUSTER)
+      global_schema_lock_guard.lock();
     /*
       FIXME: Actually code of TRUNCATE breaks meta-data locking protocol since
              tries to get table enging and therefore accesses table in some way

=== modified file 'sql/sql_parse.cc'
--- a/sql/sql_parse.cc	2008-09-02 15:44:54 +0000
+++ b/sql/sql_parse.cc	2008-10-15 12:14:27 +0000
@@ -2503,6 +2503,7 @@ mysql_execute_command(THD *thd)
     if (select_lex->item_list.elements)		// With select
     {
       select_result *result;
+      Ha_global_schema_lock_guard global_schema_lock_guard(thd);
 
       select_lex->options|= SELECT_NO_UNLOCK;
       unit->set_limit(select_lex);
@@ -2524,6 +2525,8 @@ mysql_execute_command(THD *thd)
       {
         lex->link_first_table_back(create_table, link_to_local);
         create_table->open_type= TABLE_LIST::OPEN_OR_CREATE;
+        if (!thd->locked_tables_mode)
+          global_schema_lock_guard.lock();
       }
 
       if (!(res= open_and_lock_tables(thd, lex->query_tables)))

=== modified file 'sql/sql_rename.cc'
--- a/sql/sql_rename.cc	2008-06-11 04:33:36 +0000
+++ b/sql/sql_rename.cc	2008-10-15 12:14:27 +0000
@@ -37,6 +37,7 @@ bool mysql_rename_tables(THD *thd, TABLE
   TABLE_LIST *ren_table= 0;
   int to_table;
   char *rename_log_table[2]= {NULL, NULL};
+  Ha_global_schema_lock_guard global_schema_lock_guard(thd);
   DBUG_ENTER("mysql_rename_tables");
 
   /*
@@ -132,6 +133,13 @@ bool mysql_rename_tables(THD *thd, TABLE
       DBUG_RETURN(1);
     }
   }
+
+  /*
+    Rename table may clash with parallell or existing ndb table
+    thus a global shema lock is needed to ensure global serialization.
+    Moreover we do not know here what table type the tables have.
+  */
+  global_schema_lock_guard.lock();
 
   if (lock_table_names(thd, table_list))
     goto err;

=== modified file 'sql/sql_table.cc'
--- a/sql/sql_table.cc	2008-09-05 08:57:28 +0000
+++ b/sql/sql_table.cc	2008-10-15 12:14:27 +0000
@@ -1554,6 +1554,14 @@ int mysql_rm_table_part2(THD *thd, TABLE
   }
 
   mysql_ha_rm_tables(thd, tables);
+  Ha_global_schema_lock_guard global_schema_lock_guard(thd);
+  /*
+    Here we have no way of knowing if an ndb table is one of
+    the ones that should be dropped, so we take the global
+    schema lock to be safe.
+  */
+  if (!drop_temporary)
+    global_schema_lock_guard.lock();
 
   /*
     If we have the table in the definition cache, we don't have to check the
@@ -3739,8 +3747,14 @@ bool mysql_create_table(THD *thd, const 
 {
   MDL_LOCK_DATA *target_lock_data= 0;
   bool result;
+  Ha_global_schema_lock_guard global_schema_lock_guard(thd);
   DBUG_ENTER("mysql_create_table");
 
+  if (!(create_info->options & HA_LEX_CREATE_TMP_TABLE) &&
+      !create_info->frm_only &&
+      !internal_tmp_table)
+    global_schema_lock_guard.lock();
+
   /* Wait for any database locks */
   pthread_mutex_lock(&LOCK_lock_db);
   while (!thd->killed &&
@@ -4819,9 +4833,13 @@ bool mysql_create_like_table(THD* thd, T
 #ifdef WITH_PARTITION_STORAGE_ENGINE
   char tmp_path[FN_REFLEN];
 #endif
+  Ha_global_schema_lock_guard global_schema_lock_guard(thd);
   DBUG_ENTER("mysql_create_like_table");
 
 
+  if (!(create_info->options & HA_LEX_CREATE_TMP_TABLE))
+    global_schema_lock_guard.lock();
+
   /*
     By opening source table and thus acquiring shared metadata lock on it
     we guarantee that it exists and no concurrent DDL operation will mess
@@ -6561,6 +6579,11 @@ bool mysql_alter_table(THD *thd,char *ne
 
     if (wait_if_global_read_lock(thd,0,1))
       DBUG_RETURN(TRUE);
+
+    Ha_global_schema_lock_guard global_schema_lock_guard(thd);
+    if (table_type == DB_TYPE_NDBCLUSTER)
+      global_schema_lock_guard.lock();
+
     if (lock_table_names(thd, table_list))
     {
       error= 1;
@@ -6588,6 +6611,21 @@ view_err:
     DBUG_RETURN(error);
   }
 
+  Ha_global_schema_lock_guard global_schema_lock_guard(thd);
+  if (table_type == DB_TYPE_NDBCLUSTER ||
+      (create_info->db_type && create_info->db_type->db_type == DB_TYPE_NDBCLUSTER))
+  {
+    /*
+      To avoid deadlock in this situation
+    */
+    if (thd->locked_tables_mode)
+    {
+      my_message(ER_LOCK_OR_ACTIVE_TRANSACTION,
+                 ER(ER_LOCK_OR_ACTIVE_TRANSACTION), MYF(0));
+      DBUG_RETURN(TRUE);
+    }
+    global_schema_lock_guard.lock();
+  }
   if (!(table= open_n_lock_single_table(thd, table_list, TL_WRITE_ALLOW_READ,
                                         MYSQL_OPEN_TAKE_UPGRADABLE_MDL)))
     DBUG_RETURN(TRUE);

=== modified file 'storage/ndb/include/ndb_constants.h'
--- a/storage/ndb/include/ndb_constants.h	2007-03-23 13:06:00 +0000
+++ b/storage/ndb/include/ndb_constants.h	2008-09-30 06:55:35 +0000
@@ -97,4 +97,9 @@
 #define NDB_SUM_READONLY     1
 #define NDB_SUM_READ_WRITE   2
 
+/*
+ * SYSTAB_0 reserved keys
+ */
+#define NDB_BACKUP_SEQUENCE 0x1F000000
+
 #endif

=== modified file 'storage/ndb/include/util/ndb_opts.h'
--- a/storage/ndb/include/util/ndb_opts.h	2007-07-02 17:08:02 +0000
+++ b/storage/ndb/include/util/ndb_opts.h	2008-10-09 11:53:53 +0000
@@ -100,8 +100,13 @@ const char *opt_debug= 0;
 
 static void ndb_std_print_version()
 {
-  printf("MySQL distrib %s, for %s (%s)\n",
-	 NDB_VERSION_STRING,SYSTEM_TYPE,MACHINE_TYPE);
+#ifndef DBUG_OFF
+  const char *suffix= "-debug";
+#else
+  const char *suffix= "";
+#endif
+  printf("MySQL distrib %s%s, for %s (%s)\n",
+	 NDB_VERSION_STRING,suffix,SYSTEM_TYPE,MACHINE_TYPE);
 }
 
 static void usage();

=== modified file 'storage/ndb/src/kernel/blocks/backup/Backup.cpp'
--- a/storage/ndb/src/kernel/blocks/backup/Backup.cpp	2008-06-09 11:57:17 +0000
+++ b/storage/ndb/src/kernel/blocks/backup/Backup.cpp	2008-09-30 06:55:35 +0000
@@ -61,8 +61,6 @@
 
 static NDB_TICKS startTime;
 
-static const Uint32 BACKUP_SEQUENCE = 0x1F000000;
-
 #ifdef VM_TRACE
 #define DEBUG_OUT(x) ndbout << x << endl
 #else
@@ -170,7 +168,7 @@ Backup::createSequence(Signal* signal)
   UtilSequenceReq * req = (UtilSequenceReq*)signal->getDataPtrSend();
   
   req->senderData  = RNIL;
-  req->sequenceId  = BACKUP_SEQUENCE;
+  req->sequenceId  = NDB_BACKUP_SEQUENCE;
   req->requestType = UtilSequenceReq::Create;
   
   sendSignal(DBUTIL_REF, GSN_UTIL_SEQUENCE_REQ, 
@@ -1131,7 +1129,7 @@ Backup::execBACKUP_REQ(Signal* signal)
     
   ptr.p->masterData.gsn = GSN_UTIL_SEQUENCE_REQ;
   utilReq->senderData  = ptr.i;
-  utilReq->sequenceId  = BACKUP_SEQUENCE;
+  utilReq->sequenceId  = NDB_BACKUP_SEQUENCE;
   utilReq->requestType = UtilSequenceReq::NextVal;
   sendSignal(DBUTIL_REF, GSN_UTIL_SEQUENCE_REQ, 
 	     signal, UtilSequenceReq::SignalLength, JBB);

=== modified file 'storage/ndb/src/kernel/blocks/trix/Trix.cpp'
--- a/storage/ndb/src/kernel/blocks/trix/Trix.cpp	2008-09-18 14:29:25 +0000
+++ b/storage/ndb/src/kernel/blocks/trix/Trix.cpp	2008-09-26 12:55:06 +0000
@@ -435,6 +435,27 @@ Trix::execDUMP_STATE_ORD(Signal* signal)
     // Ignore
   }
   }
+
+  if (signal->theData[0] == DumpStateOrd::SchemaResourceSnapshot)
+  {
+    RSS_AP_SNAPSHOT_SAVE(c_theSubscriptionRecPool);
+    return;
+  }
+
+  if (signal->theData[0] == DumpStateOrd::SchemaResourceCheckLeak)
+  {
+    RSS_AP_SNAPSHOT_CHECK(c_theSubscriptionRecPool);
+    return;
+  }
+  
+  if (signal->theData[0] == 8004)
+  {
+    infoEvent("TRIX: c_theSubscriptionRecPool size: %u free: %u",
+              c_theSubscriptionRecPool.getSize(),
+              c_theSubscriptionRecPool.getNoOfFree());
+    return;
+  }
+
 }
 
 // Build index

=== modified file 'storage/ndb/src/kernel/blocks/trix/Trix.hpp'
--- a/storage/ndb/src/kernel/blocks/trix/Trix.hpp	2008-02-20 09:04:29 +0000
+++ b/storage/ndb/src/kernel/blocks/trix/Trix.hpp	2008-09-26 12:55:06 +0000
@@ -131,6 +131,7 @@ private:
    * The pool of node records
    */
   ArrayPool<SubscriptionRecord> c_theSubscriptionRecPool;
+  RSS_AP_SNAPSHOT(c_theSubscriptionRecPool);
 
   /**
    * The list of other subscriptions

=== modified file 'storage/ndb/src/ndbapi/NdbInterpretedCode.cpp'
--- a/storage/ndb/src/ndbapi/NdbInterpretedCode.cpp	2008-05-22 16:22:34 +0000
+++ b/storage/ndb/src/ndbapi/NdbInterpretedCode.cpp	2008-09-29 18:26:36 +0000
@@ -666,17 +666,21 @@ NdbInterpretedCode::add_val(Uint32 attrI
   int res= 0;
   if ((res= read_attr(6, attrId) != 0))
     return res;
-  
+
   /* Load constant into register 7 */
   /* We attempt to use the smallest constant load
    * instruction
    */
   if (aValue < (1 << 16))
+  {
     if ((res= load_const_u16(7, aValue)) != 0)
       return res;   
+  }
   else
+  {
     if ((res= load_const_u32(7, aValue)) != 0)
       return res;
+  }
 
   /* Add registers 6 and 7 -> 7*/
   if ((res= add_reg(7, 6, 7)) != 0)
@@ -700,6 +704,7 @@ NdbInterpretedCode::add_val(Uint32 attrI
    * instruction
    */
   if ((aValue >> 32) == 0)
+  {
     if (aValue < (1 << 16))
     {
       if ((res= load_const_u16(7, aValue)) != 0)
@@ -710,6 +715,7 @@ NdbInterpretedCode::add_val(Uint32 attrI
       if ((res= load_const_u32(7, aValue)) != 0)
         return res;
     }
+  }
   else
     if ((res= load_const_u64(7, aValue)) != 0)
       return res;
@@ -717,7 +723,7 @@ NdbInterpretedCode::add_val(Uint32 attrI
   /* Add registers 6 and 7 -> 7*/
   if ((res= add_reg(7, 6, 7)) != 0)
     return res;
-
+  
   /* Write back */
   return write_attr(attrId, 7);
 }
@@ -736,11 +742,15 @@ NdbInterpretedCode::sub_val(Uint32 attrI
    * instruction
    */
   if (aValue < (1 << 16))
+  {
     if ((res= load_const_u16(7, aValue)) != 0)
-      return res;   
+      return res;
+  }   
   else
+  {
     if ((res= load_const_u32(7, aValue)) != 0)
       return res;
+  }
 
   /* Subtract register (R7=R6-R7)*/
   if ((res= sub_reg(7, 6, 7)) != 0)
@@ -764,6 +774,7 @@ NdbInterpretedCode::sub_val(Uint32 attrI
    * instruction
    */
   if ((aValue >> 32) == 0)
+  {
     if (aValue < (1 << 16))
     {
       if ((res= load_const_u16(7, aValue)) != 0)
@@ -774,9 +785,12 @@ NdbInterpretedCode::sub_val(Uint32 attrI
       if ((res= load_const_u32(7, aValue)) != 0)
         return res;
     }
+  }
   else
+  {
     if ((res= load_const_u64(7, aValue)) != 0)
       return res;
+  }
 
   /* Subtract register (R7=R6-R7)*/
   if ((res= sub_reg(7, 6, 7)) != 0)

=== modified file 'storage/ndb/test/ndbapi/Makefile.am'
--- a/storage/ndb/test/ndbapi/Makefile.am	2008-02-21 14:24:09 +0000
+++ b/storage/ndb/test/ndbapi/Makefile.am	2008-09-30 08:18:41 +0000
@@ -57,7 +57,8 @@ testIndexStat \
 ndbapi_50compat0 \
 ndbapi_50compat1 \
 testNDBT \
-NdbRepStress
+NdbRepStress \
+msa
 
 EXTRA_PROGRAMS = \
  test_event \
@@ -115,6 +116,7 @@ testSRBank_SOURCES = testSRBank.cpp
 test_event_merge_SOURCES = test_event_merge.cpp
 test_event_multi_table_SOURCES = test_event_multi_table.cpp
 testIndexStat_SOURCES = testIndexStat.cpp
+msa_SOURCES = msa.cpp
 
 ndbapi_50compat0_CPPFLAGS = -DNDBAPI_50_COMPAT
 ndbapi_50compat0_SOURCES = ndbapi_50compat0.cpp

=== modified file 'storage/ndb/test/ndbapi/bench/mainAsyncGenerator.cpp'
--- a/storage/ndb/test/ndbapi/bench/mainAsyncGenerator.cpp	2006-12-23 19:20:40 +0000
+++ b/storage/ndb/test/ndbapi/bench/mainAsyncGenerator.cpp	2008-09-30 08:18:41 +0000
@@ -25,6 +25,8 @@
 
 #include "userInterface.h"
 #include "dbGenerator.h"
+#include "ndb_schema.hpp"
+
 
 static int   numProcesses;
 static int   numSeconds;
@@ -33,6 +35,8 @@ static int   parallellism;
 static int   millisSendPoll;
 static int   minEventSendPoll;
 static int   forceSendPoll;
+static bool  useNdbRecord;
+static bool  useCombUpd;
 
 static ThreadData *data;
 static Ndb_cluster_connection *g_cluster_connection= 0;
@@ -53,8 +57,8 @@ static void usage(const char *prog)
      ++progname;
 
    ndbout_c(
-           "Usage: %s [-proc <num>] [-warm <num>] [-time <num>] [ -p <num>] " 
-	   "[-t <num> ] [ -e <num> ] [ -f <num>] \n"
+           "Usage: %s [-proc <num>] [-warm <num>] [-time <num>] [ -p <num>]" 
+	   "[-t <num> ] [ -e <num> ] [ -f <num>] [ -ndbrecord ]\n"
            "  -proc <num>    Specifies that <num> is the number of\n"
            "                 threads. The default is 1.\n"
            "  -time <num>    Specifies that the test will run for <num> sec.\n"
@@ -68,7 +72,11 @@ static void usage(const char *prog)
 	   "sendPoll\n"
 	   "                 Default is 1\n"
 	   "  -f <num>       force parameter to sendPoll\n"
-	   "                 Default is 0\n",
+	   "                 Default is 0\n"
+           "  -ndbrecord     Use NdbRecord Api.\n"
+           "                 Default is to use old Api\n"
+           "  -combupdread   Use update pre-read operation where possible\n"
+           "                 Default is to use separate read+update ops\n",
            progname);
 }
 
@@ -85,7 +93,8 @@ parse_args(int argc, const char **argv)
    millisSendPoll   = 10000;
    minEventSendPoll = 1;
    forceSendPoll    = 0;
-   
+   useNdbRecord     = false;
+   useCombUpd       = false;
 
    i = 1;
    while (i < argc){
@@ -156,6 +165,15 @@ parse_args(int argc, const char **argv)
        }
        i += 2;
      }
+     else if (strcmp("-ndbrecord",argv[i]) == 0) {
+       useNdbRecord= true;
+       i++;
+     }
+     else if (strcmp("-combupdread",argv[i]) == 0) {
+       /* Comb up some dread */
+       useCombUpd= true;
+       i++;
+     }
      else {
        return 1;
      }
@@ -169,6 +187,10 @@ parse_args(int argc, const char **argv)
      ndbout_c("exiting...");
      return 1;
    }
+   if (useNdbRecord && useCombUpd){
+     ndbout_c("NdbRecord does not currently support combined update "
+              "and read.  Using separate read and update ops");
+   }
    return 0;
 }
 
@@ -232,6 +254,7 @@ print_stats(const char       *title,
   ndbout_c("Processor     : %s", name);
   ndbout_c("Number of Proc: %d",numProc);
   ndbout_c("Parallellism  : %d", parallellism);
+  ndbout_c("UseNdbRecord  : %u", useNdbRecord);
   ndbout_c("\n");
 
   if( gen->totalTransactions == 0 ) {
@@ -316,19 +339,192 @@ NDB_COMMAND(DbAsyncGenerator, "DbAsyncGe
     ndbout << "Cluster nodes not ready in 30 seconds." << endl;
     return 0;
   }
-  
+
+  NdbRecordSharedData* ndbRecordSharedDataPtr= NULL;
+
   g_cluster_connection= &con;
   data = (ThreadData*)malloc((numProcesses*parallellism)*sizeof(ThreadData));
- 
+
+  NdbInterpretedCode* prog1= 0;
+  NdbInterpretedCode* prog2= 0;
+  NdbInterpretedCode* prog3= 0;
+
+  if (useNdbRecord)
+  {
+    /* We'll create NdbRecord structures to match the TransactionData
+     * struct
+     */
+
+    ndbRecordSharedDataPtr= (NdbRecordSharedData*) 
+      malloc(sizeof(NdbRecordSharedData));
+    Ndb* tempNdb= asyncDbConnect(1);
+    NdbDictionary::Dictionary* dict= tempNdb->getDictionary();
+
+    NdbDictionary::RecordSpecification cols[7];
+    
+    const NdbDictionary::Table* tab= dict->getTable(SUBSCRIBER_TABLE);
+    cols[0].column= tab->getColumn((int) IND_SUBSCRIBER_NUMBER);
+    cols[0].offset= offsetof(TransactionData, number);
+    cols[0].nullbit_byte_offset= 0;
+    cols[0].nullbit_bit_in_byte=  0;
+    cols[1].column= tab->getColumn((int) IND_SUBSCRIBER_NAME);
+    cols[1].offset= offsetof(TransactionData, name);
+    cols[1].nullbit_byte_offset= 0;
+    cols[1].nullbit_bit_in_byte=  0;
+    cols[2].column= tab->getColumn((int) IND_SUBSCRIBER_GROUP);
+    cols[2].offset= offsetof(TransactionData, group_id);
+    cols[2].nullbit_byte_offset= 0;
+    cols[2].nullbit_bit_in_byte=  0;
+    cols[3].column= tab->getColumn((int) IND_SUBSCRIBER_LOCATION);
+    cols[3].offset= offsetof(TransactionData, location);
+    cols[3].nullbit_byte_offset= 0;
+    cols[3].nullbit_bit_in_byte=  0;
+    cols[4].column= tab->getColumn((int) IND_SUBSCRIBER_SESSIONS);
+    cols[4].offset= offsetof(TransactionData, sessions);
+    cols[4].nullbit_byte_offset= 0;
+    cols[4].nullbit_bit_in_byte=  0;
+    cols[5].column= tab->getColumn((int) IND_SUBSCRIBER_CHANGED_BY);
+    cols[5].offset= offsetof(TransactionData, changed_by);
+    cols[5].nullbit_byte_offset= 0;
+    cols[5].nullbit_bit_in_byte=  0;
+    cols[6].column= tab->getColumn((int) IND_SUBSCRIBER_CHANGED_TIME);
+    cols[6].offset= offsetof(TransactionData, changed_time);
+    cols[6].nullbit_byte_offset= 0;
+    cols[6].nullbit_bit_in_byte=  0;
+    
+    ndbRecordSharedDataPtr->subscriberTableNdbRecord= 
+      dict->createRecord(tab, cols, 7, sizeof(cols[0]), 0);
+
+    if (ndbRecordSharedDataPtr->subscriberTableNdbRecord == NULL)
+    {
+      ndbout << "Error creating record 1 : " << dict->getNdbError() << endl;
+      return -1;
+    }
+
+    tab= dict->getTable(GROUP_TABLE);
+    cols[0].column= tab->getColumn((int) IND_GROUP_ID);
+    cols[0].offset= offsetof(TransactionData, group_id);
+    cols[0].nullbit_byte_offset= 0;
+    cols[0].nullbit_bit_in_byte=  0;
+    /* GROUP_NAME not used via NdbRecord */
+    cols[1].column= tab->getColumn((int) IND_GROUP_ALLOW_READ);
+    cols[1].offset= offsetof(TransactionData, permission);
+    cols[1].nullbit_byte_offset= 0;
+    cols[1].nullbit_bit_in_byte=  0;
+    cols[2].column= tab->getColumn((int) IND_GROUP_ALLOW_INSERT);
+    cols[2].offset= offsetof(TransactionData, permission);
+    cols[2].nullbit_byte_offset= 0;
+    cols[2].nullbit_bit_in_byte=  0;
+    cols[3].column= tab->getColumn((int) IND_GROUP_ALLOW_DELETE);
+    cols[3].offset= offsetof(TransactionData, permission);
+    cols[3].nullbit_byte_offset= 0;
+    cols[3].nullbit_bit_in_byte=  0;
+
+    ndbRecordSharedDataPtr->groupTableNdbRecord=
+      dict->createRecord(tab, cols, 4, sizeof(cols[0]), 0);
+
+    if (ndbRecordSharedDataPtr->groupTableNdbRecord == NULL)
+    {
+      ndbout << "Error creating record 2: " << dict->getNdbError() << endl;
+      return -1;
+    }
+
+    tab= dict->getTable(SESSION_TABLE);
+    cols[0].column= tab->getColumn((int) IND_SESSION_SUBSCRIBER);
+    cols[0].offset= offsetof(TransactionData, number);
+    cols[0].nullbit_byte_offset= 0;
+    cols[0].nullbit_bit_in_byte=  0;
+    cols[1].column= tab->getColumn((int) IND_SESSION_SERVER);
+    cols[1].offset= offsetof(TransactionData, server_id);
+    cols[1].nullbit_byte_offset= 0;
+    cols[1].nullbit_bit_in_byte=  0;
+    cols[2].column= tab->getColumn((int) IND_SESSION_DATA);
+    cols[2].offset= offsetof(TransactionData, session_details);
+    cols[2].nullbit_byte_offset= 0;
+    cols[2].nullbit_bit_in_byte=  0;
+
+    ndbRecordSharedDataPtr->sessionTableNdbRecord=
+      dict->createRecord(tab, cols, 3, sizeof(cols[0]), 0);
+
+    if (ndbRecordSharedDataPtr->sessionTableNdbRecord == NULL)
+    {
+      ndbout << "Error creating record 3 : " << dict->getNdbError() << endl;
+      return -1;
+    }
+
+    tab= dict->getTable(SERVER_TABLE);
+    cols[0].column= tab->getColumn((int) IND_SERVER_SUBSCRIBER_SUFFIX);
+    cols[0].offset= offsetof(TransactionData, suffix);
+    cols[0].nullbit_byte_offset= 0;
+    cols[0].nullbit_bit_in_byte=  0;
+    cols[1].column= tab->getColumn((int) IND_SERVER_ID);
+    cols[1].offset= offsetof(TransactionData, server_id);
+    cols[1].nullbit_byte_offset= 0;
+    cols[1].nullbit_bit_in_byte=  0;
+    /* SERVER_NAME not used via NdbRecord*/
+    /* SERVER_READS not used via NdbRecord */
+    /* SERVER_INSERTS not used via NdbRecord */
+    /* SERVER_DELETES not used via NdbRecord */
+
+    ndbRecordSharedDataPtr->serverTableNdbRecord=
+      dict->createRecord(tab, cols, 2, sizeof(cols[0]), 0);
+
+    if (ndbRecordSharedDataPtr->serverTableNdbRecord == NULL)
+    {
+      ndbout << "Error creating record 4 : " << dict->getNdbError() << endl;
+      return -1;
+    }
+
+    /* Create program to increment server reads column */
+    prog1= new NdbInterpretedCode(tab);
+
+    if (prog1->add_val(IND_SERVER_READS, (Uint32)1) ||
+        prog1->interpret_exit_ok() ||
+        prog1->finalise())
+    {
+      ndbout << "Program 1 definition failed, exiting." << endl;
+      return -1;
+    }
+
+    prog2= new NdbInterpretedCode(tab);
+
+    if (prog2->add_val(IND_SERVER_INSERTS, (Uint32)1) ||
+        prog2->interpret_exit_ok() ||
+        prog2->finalise())
+    {
+      ndbout << "Program 2 definition failed, exiting." << endl;
+      return -1;
+    }
+
+    prog3= new NdbInterpretedCode(tab);
+
+    if (prog3->add_val(IND_SERVER_DELETES, (Uint32)1) ||
+        prog3->interpret_exit_ok() ||
+        prog3->finalise())
+    {
+      ndbout << "Program 3 definition failed, exiting." << endl;
+      return -1;
+    }
+
+    ndbRecordSharedDataPtr->incrServerReadsProg= prog1;
+    ndbRecordSharedDataPtr->incrServerInsertsProg= prog2;
+    ndbRecordSharedDataPtr->incrServerDeletesProg= prog3;
+
+    asyncDbDisconnect(tempNdb);      
+  }
+
   for(i = 0; i < numProcesses; i++) {
     for(j = 0; j<parallellism; j++){
-      data[i*parallellism+j].warmUpSeconds   = numWarmSeconds;
-      data[i*parallellism+j].testSeconds     = numSeconds;
-      data[i*parallellism+j].coolDownSeconds = numWarmSeconds;
-      data[i*parallellism+j].randomSeed      = 
+      int tid= i*parallellism + j;
+      data[tid].warmUpSeconds         = numWarmSeconds;
+      data[tid].testSeconds           = numSeconds;
+      data[tid].coolDownSeconds       = numWarmSeconds;
+      data[tid].randomSeed            = 
 	NdbTick_CurrentMillisecond()+i+j;
-      data[i*parallellism+j].changedTime     = 0;
-      data[i*parallellism+j].runState        = Runnable;
+      data[tid].changedTime           = 0;
+      data[tid].runState              = Runnable;
+      data[tid].ndbRecordSharedData   = ndbRecordSharedDataPtr;
+      data[tid].useCombinedUpdate     = useCombUpd;
     }
     sprintf(threadName, "AsyncThread[%d]", i);
     pThread = NdbThread_Create(threadRoutine, 
@@ -355,6 +551,14 @@ NDB_COMMAND(DbAsyncGenerator, "DbAsyncGe
   }
    
   ndbout_c("All threads have finished");
+
+  if (useNdbRecord)
+  {
+    free(ndbRecordSharedDataPtr);
+    delete(prog1);
+    delete(prog2);
+    delete(prog3);
+  }
   
   /*-------------------------------------------*/
   /* Clear all structures for total statistics */
@@ -412,7 +616,6 @@ NDB_COMMAND(DbAsyncGenerator, "DbAsyncGe
 #include <sys/types.h>
 #include <time.h>
 
-#include "ndb_schema.hpp"
 #include "ndb_error.hpp"
 #include "userInterface.h"
 #include <NdbMutex.h>

=== modified file 'storage/ndb/test/ndbapi/bench/ndb_async2.cpp'
--- a/storage/ndb/test/ndbapi/bench/ndb_async2.cpp	2006-12-23 19:20:40 +0000
+++ b/storage/ndb/test/ndbapi/bench/ndb_async2.cpp	2008-09-30 08:18:41 +0000
@@ -66,6 +66,10 @@ startTransaction(Ndb * pNDB, ThreadData 
 #endif
 }
 
+// NdbRecord helper macros
+#define SET_MASK(mask, attrId)                         \
+  mask[attrId >> 3] |= (1 << (attrId & 7))
+
 void
 start_T1(Ndb * pNDB, ThreadData * td, int async){
 
@@ -78,17 +82,46 @@ start_T1(Ndb * pNDB, ThreadData * td, in
     NdbSleep_MilliSleep(10);
   }
 
-  NdbOperation *MyOp = pCON->getNdbOperation(SUBSCRIBER_TABLE);
-  if (MyOp != NULL) {  
-    MyOp->updateTuple();  
-    MyOp->equal(IND_SUBSCRIBER_NUMBER, 
-		td->transactionData.number);
-    MyOp->setValue(IND_SUBSCRIBER_LOCATION, 
-		   (char *)&td->transactionData.location);
-    MyOp->setValue(IND_SUBSCRIBER_CHANGED_BY, 
-		   td->transactionData.changed_by);
-    MyOp->setValue(IND_SUBSCRIBER_CHANGED_TIME, 
-		   td->transactionData.changed_time);
+  const NdbOperation* op= NULL;
+
+  if (td->ndbRecordSharedData)
+  {
+    char* rowPtr= (char*) &td->transactionData;
+    const NdbRecord* record= td->ndbRecordSharedData->
+      subscriberTableNdbRecord;
+    Uint32 m=0;
+    unsigned char* mask= (unsigned char*) &m;
+
+    //SET_MASK(mask, IND_SUBSCRIBER_NUMBER);
+    SET_MASK(mask, IND_SUBSCRIBER_LOCATION);
+    SET_MASK(mask, IND_SUBSCRIBER_CHANGED_BY);
+    SET_MASK(mask, IND_SUBSCRIBER_CHANGED_TIME);
+    
+    op= pCON->updateTuple(record,
+                          rowPtr,
+                          record,
+                          rowPtr,
+                          mask);
+  }
+  else
+  {
+    NdbOperation *MyOp = pCON->getNdbOperation(SUBSCRIBER_TABLE);
+    op= MyOp;
+    if (MyOp != NULL) {  
+      MyOp->updateTuple();  
+      MyOp->equal(IND_SUBSCRIBER_NUMBER, 
+                  td->transactionData.number);
+      MyOp->setValue(IND_SUBSCRIBER_LOCATION, 
+                     (char *)&td->transactionData.location);
+      MyOp->setValue(IND_SUBSCRIBER_CHANGED_BY, 
+                     td->transactionData.changed_by);
+      MyOp->setValue(IND_SUBSCRIBER_CHANGED_TIME, 
+                     td->transactionData.changed_time);
+    }
+  }
+
+  if (op != NULL)
+  {
     if (async == 1) {
       pCON->executeAsynchPrepare( Commit , T1_Callback, td);
     } else {
@@ -97,7 +130,7 @@ start_T1(Ndb * pNDB, ThreadData * td, in
       return;
     }//if
   } else {
-    CHECK_NULL(MyOp, "T1: getNdbOperation", td, pCON->getNdbError());
+    CHECK_NULL(NULL, "T1: getNdbOperation", td, pCON->getNdbError());
   }//if
 }
 
@@ -146,21 +179,43 @@ start_T2(Ndb * pNDB, ThreadData * td, in
     NdbSleep_MilliSleep(10);
   }
 
-  NdbOperation *MyOp= pCON->getNdbOperation(SUBSCRIBER_TABLE);
-  CHECK_NULL(MyOp, "T2: getNdbOperation", td,
-	     pCON->getNdbError());
-  
-  MyOp->readTuple();
-  MyOp->equal(IND_SUBSCRIBER_NUMBER,
-	      td->transactionData.number);
-  MyOp->getValue(IND_SUBSCRIBER_LOCATION, 
-		 (char *)&td->transactionData.location);
-  MyOp->getValue(IND_SUBSCRIBER_CHANGED_BY, 
-		 td->transactionData.changed_by);
-  MyOp->getValue(IND_SUBSCRIBER_CHANGED_TIME, 
-		 td->transactionData.changed_time);
-  MyOp->getValue(IND_SUBSCRIBER_NAME, 
-		 td->transactionData.name);
+  if (td->ndbRecordSharedData)
+  {
+    char* rowPtr= (char*) &td->transactionData;
+    const NdbRecord* record= td->ndbRecordSharedData->
+      subscriberTableNdbRecord;
+    Uint32 m=0;
+    unsigned char* mask= (unsigned char*) &m;
+
+    SET_MASK(mask, IND_SUBSCRIBER_LOCATION);
+    SET_MASK(mask, IND_SUBSCRIBER_CHANGED_BY);
+    SET_MASK(mask, IND_SUBSCRIBER_CHANGED_TIME);
+    SET_MASK(mask, IND_SUBSCRIBER_NAME);
+
+    const NdbOperation* MyOp= pCON->readTuple(record, rowPtr, record, rowPtr, 
+                                              NdbOperation::LM_Read, mask);
+    CHECK_NULL((void*) MyOp, "T2: readTuple", td,
+               pCON->getNdbError());
+  }
+  else
+  {
+    NdbOperation *MyOp= pCON->getNdbOperation(SUBSCRIBER_TABLE);
+    CHECK_NULL(MyOp, "T2: getNdbOperation", td,
+               pCON->getNdbError());
+    
+    MyOp->readTuple();
+    MyOp->equal(IND_SUBSCRIBER_NUMBER,
+                td->transactionData.number);
+    MyOp->getValue(IND_SUBSCRIBER_LOCATION, 
+                   (char *)&td->transactionData.location);
+    MyOp->getValue(IND_SUBSCRIBER_CHANGED_BY, 
+                   td->transactionData.changed_by);
+    MyOp->getValue(IND_SUBSCRIBER_CHANGED_TIME, 
+                   td->transactionData.changed_time);
+    MyOp->getValue(IND_SUBSCRIBER_NAME, 
+                   td->transactionData.name);
+  }
+
   if (async == 1) {
     pCON->executeAsynchPrepare( Commit , T2_Callback, td);
   } else {
@@ -183,6 +238,7 @@ T2_Callback(int result, NdbConnection * 
     start_T2(td->pNDB, td, stat_async);
     return;
   }//if
+
   td->pNDB->closeTransaction(pCON);
   complete_T2(td);
 }
@@ -217,24 +273,48 @@ start_T3(Ndb * pNDB, ThreadData * td, in
     CHECK_ALLOWED_ERROR("T3-1: startTransaction", td, pNDB->getNdbError());
     NdbSleep_MilliSleep(10);
   }
-  
-  NdbOperation *MyOp= pCON->getNdbOperation(SUBSCRIBER_TABLE);
-  CHECK_NULL(MyOp, "T3-1: getNdbOperation", td,
-	     pCON->getNdbError());
-  
-  MyOp->readTuple();
-  MyOp->equal(IND_SUBSCRIBER_NUMBER, 
-	      td->transactionData.number);
-  MyOp->getValue(IND_SUBSCRIBER_LOCATION, 
-		 (char *)&td->transactionData.location);
-  MyOp->getValue(IND_SUBSCRIBER_CHANGED_BY, 
-		 td->transactionData.changed_by);
-  MyOp->getValue(IND_SUBSCRIBER_CHANGED_TIME, 
-		 td->transactionData.changed_time);
-  MyOp->getValue(IND_SUBSCRIBER_GROUP, 
-		 (char *)&td->transactionData.group_id);
-  MyOp->getValue(IND_SUBSCRIBER_SESSIONS, 
-		 (char *)&td->transactionData.sessions);
+
+  const NdbOperation* op;
+
+  if (td->ndbRecordSharedData)
+  {
+    char* rowPtr= (char*) &td->transactionData;
+    const NdbRecord* record= td->ndbRecordSharedData->
+      subscriberTableNdbRecord;
+    Uint32 m=0;
+    unsigned char* mask= (unsigned char*) &m;
+
+    SET_MASK(mask, IND_SUBSCRIBER_LOCATION);
+    SET_MASK(mask, IND_SUBSCRIBER_CHANGED_BY);
+    SET_MASK(mask, IND_SUBSCRIBER_CHANGED_TIME);
+    SET_MASK(mask, IND_SUBSCRIBER_GROUP);
+    SET_MASK(mask, IND_SUBSCRIBER_SESSIONS);
+
+    op= pCON->readTuple(record, rowPtr, record, rowPtr,
+                        NdbOperation::LM_Read, mask);
+  }
+  else
+  {
+    NdbOperation *MyOp= pCON->getNdbOperation(SUBSCRIBER_TABLE);
+    op= MyOp;
+    CHECK_NULL(MyOp, "T3-1: getNdbOperation", td,
+               pCON->getNdbError());
+    
+    MyOp->readTuple();
+    MyOp->equal(IND_SUBSCRIBER_NUMBER, 
+                td->transactionData.number);
+    MyOp->getValue(IND_SUBSCRIBER_LOCATION, 
+                   (char *)&td->transactionData.location);
+    MyOp->getValue(IND_SUBSCRIBER_CHANGED_BY, 
+                   td->transactionData.changed_by);
+    MyOp->getValue(IND_SUBSCRIBER_CHANGED_TIME, 
+                   td->transactionData.changed_time);
+    MyOp->getValue(IND_SUBSCRIBER_GROUP, 
+                   (char *)&td->transactionData.group_id);
+    MyOp->getValue(IND_SUBSCRIBER_SESSIONS, 
+                   (char *)&td->transactionData.sessions);
+  }
+
   stat_async = async;
   if (async == 1) {
     pCON->executeAsynchPrepare( NoCommit , T3_Callback_1, td);
@@ -259,15 +339,35 @@ T3_Callback_1(int result, NdbConnection 
     return;
   }//if
 
-  NdbOperation * MyOp = pCON->getNdbOperation(GROUP_TABLE);
-  CHECK_NULL(MyOp, "T3-2: getNdbOperation", td,
-	     pCON->getNdbError());
-    
-  MyOp->readTuple();
-  MyOp->equal(IND_GROUP_ID,
-	      (char*)&td->transactionData.group_id);
-  MyOp->getValue(IND_GROUP_ALLOW_READ, 
-		 (char *)&td->transactionData.permission);
+  const NdbOperation* op= NULL;
+
+  if (td->ndbRecordSharedData)
+  {
+    char* rowPtr= (char*) &td->transactionData;
+    const NdbRecord* record= td->ndbRecordSharedData->
+      groupTableNdbRecord;
+    Uint32 m=0;
+    unsigned char* mask= (unsigned char*) &m;
+    
+    SET_MASK(mask, IND_GROUP_ALLOW_READ);
+
+    op= pCON->readTuple(record, rowPtr, record, rowPtr,
+                        NdbOperation::LM_Read, mask);
+  }
+  else
+  {
+    NdbOperation * MyOp = pCON->getNdbOperation(GROUP_TABLE);
+    op= MyOp;
+    CHECK_NULL(MyOp, "T3-2: getNdbOperation", td,
+               pCON->getNdbError());
+    
+    MyOp->readTuple();
+    MyOp->equal(IND_GROUP_ID,
+                (char*)&td->transactionData.group_id);
+    MyOp->getValue(IND_GROUP_ALLOW_READ, 
+                   (char *)&td->transactionData.permission);
+  }
+
   if (stat_async == 1) {
     pCON->executeAsynchPrepare( NoCommit , T3_Callback_2, td);
   } else {
@@ -305,30 +405,66 @@ T3_Callback_2(int result, NdbConnection 
 	   SUBSCRIBER_NUMBER_SUFFIX_LENGTH, 
 	   td->transactionData.suffix);
     
-    /* Operation 3 */
-    NdbOperation * MyOp = pCON->getNdbOperation(SESSION_TABLE);
-    CHECK_NULL(MyOp, "T3-3: getNdbOperation", td,
-	       pCON->getNdbError());
-    
-    MyOp->simpleRead();
-    MyOp->equal(IND_SESSION_SUBSCRIBER,
-		(char*)td->transactionData.number);
-    MyOp->equal(IND_SESSION_SERVER,
-		(char*)&td->transactionData.server_id);
-    MyOp->getValue(IND_SESSION_DATA, 
-		   (char *)td->transactionData.session_details);
-    
-    /* Operation 4 */
-    MyOp = pCON->getNdbOperation(SERVER_TABLE);
-    CHECK_NULL(MyOp, "T3-4: getNdbOperation", td,
-	       pCON->getNdbError());
-    
-    MyOp->interpretedUpdateTuple();
-    MyOp->equal(IND_SERVER_ID,
-		(char*)&td->transactionData.server_id);
-    MyOp->equal(IND_SERVER_SUBSCRIBER_SUFFIX,
-		(char*)td->transactionData.suffix);
-    MyOp->incValue(IND_SERVER_READS, (uint32)1);
+    /* Operations 3 + 4 */
+    if (td->ndbRecordSharedData)
+    {
+      /* Op 3 */
+      char* rowPtr= (char*) &td->transactionData;
+      const NdbRecord* record= td->ndbRecordSharedData->
+        sessionTableNdbRecord;
+      Uint32 m=0;
+      unsigned char* mask= (unsigned char*) &m;
+
+      SET_MASK(mask, IND_SESSION_DATA);
+
+      const NdbOperation* MyOp = pCON->readTuple(record, rowPtr, record, rowPtr,
+                                                 NdbOperation::LM_SimpleRead,
+                                                 mask);
+      CHECK_NULL((void*) MyOp, "T3-3: readTuple", td,
+                 pCON->getNdbError());
+
+      /* Op 4 */
+      record= td->ndbRecordSharedData->
+        serverTableNdbRecord;
+      m= 0;
+
+      /* Attach interpreted program */
+      NdbOperation::OperationOptions opts;
+      opts.optionsPresent= NdbOperation::OperationOptions::OO_INTERPRETED;
+      opts.interpretedCode= td->ndbRecordSharedData->incrServerReadsProg;
+
+      MyOp= pCON->updateTuple(record, rowPtr, record, rowPtr, mask,
+                              &opts,
+                              sizeof(opts));
+      CHECK_NULL((void*) MyOp, "T3-3: updateTuple", td,
+                 pCON->getNdbError());
+    }
+    else
+    {
+      NdbOperation * MyOp = pCON->getNdbOperation(SESSION_TABLE);
+      CHECK_NULL(MyOp, "T3-3: getNdbOperation", td,
+                 pCON->getNdbError());
+      
+      MyOp->simpleRead();
+      MyOp->equal(IND_SESSION_SUBSCRIBER,
+                  (char*)td->transactionData.number);
+      MyOp->equal(IND_SESSION_SERVER,
+                  (char*)&td->transactionData.server_id);
+      MyOp->getValue(IND_SESSION_DATA, 
+                     (char *)td->transactionData.session_details);
+      
+      MyOp = pCON->getNdbOperation(SERVER_TABLE);
+      CHECK_NULL(MyOp, "T3-4: getNdbOperation", td,
+                 pCON->getNdbError());
+      
+      MyOp->interpretedUpdateTuple();
+      MyOp->equal(IND_SERVER_ID,
+                  (char*)&td->transactionData.server_id);
+      MyOp->equal(IND_SERVER_SUBSCRIBER_SUFFIX,
+                  (char*)td->transactionData.suffix);
+      MyOp->incValue(IND_SERVER_READS, (uint32)1);
+    }
+
     td->transactionData.branchExecuted = 1;
   } else {
     DEBUG3("T3(%.*s, %.2d): - Callback 2 - no read",
@@ -359,6 +495,7 @@ T3_Callback_3(int result, NdbConnection 
     start_T3(td->pNDB, td, stat_async);
     return;
   }//if
+
   td->pNDB->closeTransaction(pCON);
   complete_T3(td);
 }
@@ -392,26 +529,120 @@ start_T4(Ndb * pNDB, ThreadData * td, in
     CHECK_ALLOWED_ERROR("T4-1: startTransaction", td, pNDB->getNdbError());
     NdbSleep_MilliSleep(10);
   }
-  
-  NdbOperation *MyOp= pCON->getNdbOperation(SUBSCRIBER_TABLE);
-  CHECK_NULL(MyOp, "T4-1: getNdbOperation", td,
-	     pCON->getNdbError());
-  
-  MyOp->interpretedUpdateTuple();
-  MyOp->equal(IND_SUBSCRIBER_NUMBER, 
-	      td->transactionData.number);
-  MyOp->getValue(IND_SUBSCRIBER_LOCATION, 
-		 (char *)&td->transactionData.location);
-  MyOp->getValue(IND_SUBSCRIBER_CHANGED_BY, 
-		 td->transactionData.changed_by);
-  MyOp->getValue(IND_SUBSCRIBER_CHANGED_TIME, 
-		 td->transactionData.changed_time);
-  MyOp->getValue(IND_SUBSCRIBER_GROUP,
-		 (char *)&td->transactionData.group_id);
-  MyOp->getValue(IND_SUBSCRIBER_SESSIONS,
-		 (char *)&td->transactionData.sessions); 
-  MyOp->incValue(IND_SUBSCRIBER_SESSIONS, 
-		 (uint32)td->transactionData.server_bit);
+
+  if (td->ndbRecordSharedData)
+  {
+    char* rowPtr= (char*) &td->transactionData;
+    const NdbRecord* record= td->ndbRecordSharedData->
+      subscriberTableNdbRecord;
+    Uint32 m=0;
+    unsigned char* mask= (unsigned char*) &m;
+
+    SET_MASK(mask, IND_SUBSCRIBER_LOCATION);
+    SET_MASK(mask, IND_SUBSCRIBER_CHANGED_BY);
+    SET_MASK(mask, IND_SUBSCRIBER_CHANGED_TIME);
+    SET_MASK(mask, IND_SUBSCRIBER_GROUP);
+    SET_MASK(mask, IND_SUBSCRIBER_SESSIONS);
+    
+    const NdbOperation* MyOp= pCON->readTuple(record, rowPtr, record, rowPtr,
+                                              NdbOperation::LM_Read,
+                                              mask);
+    CHECK_NULL((void*)MyOp, "T4-1: readTuple", td,
+               pCON->getNdbError());
+
+    m= 0;
+
+    /* Create program to add something to the subscriber 
+     * sessions column 
+     */
+    Uint32 codeBuf[20];
+
+    for (Uint32 p=0; p<20; p++)
+      codeBuf[p]= 0;
+
+    NdbInterpretedCode program(pNDB->getDictionary()->
+                               getTable(SUBSCRIBER_TABLE),
+                               codeBuf,
+                               20);
+
+    if (program.add_val(IND_SUBSCRIBER_SESSIONS, 
+                        (uint32)td->transactionData.server_bit) || 
+        program.interpret_exit_ok() ||
+        program.finalise())
+    {
+      CHECK_NULL(NULL , "T4-1: Program create failed", td,
+                 program.getNdbError());
+    }
+
+    NdbOperation::OperationOptions opts;
+    opts.optionsPresent= NdbOperation::OperationOptions::OO_INTERPRETED;
+    opts.interpretedCode= &program;
+
+    MyOp= pCON->updateTuple(record, rowPtr, record, rowPtr,
+                            mask,
+                            &opts,
+                            sizeof(opts));
+    CHECK_NULL((void*)MyOp, "T4-1: updateTuple", td,
+               pCON->getNdbError());
+
+  }
+  else
+  {
+    /* Use old Api */
+    if (td->useCombinedUpdate)
+    {
+      NdbOperation *MyOp= pCON->getNdbOperation(SUBSCRIBER_TABLE);
+      CHECK_NULL(MyOp, "T4-1: getNdbOperation", td,
+                 pCON->getNdbError());
+      
+      MyOp->interpretedUpdateTuple();
+      MyOp->equal(IND_SUBSCRIBER_NUMBER, 
+                  td->transactionData.number);
+      MyOp->getValue(IND_SUBSCRIBER_LOCATION, 
+                     (char *)&td->transactionData.location);
+      MyOp->getValue(IND_SUBSCRIBER_CHANGED_BY, 
+                     td->transactionData.changed_by);
+      MyOp->getValue(IND_SUBSCRIBER_CHANGED_TIME, 
+                     td->transactionData.changed_time);
+      MyOp->getValue(IND_SUBSCRIBER_GROUP,
+                     (char *)&td->transactionData.group_id);
+      MyOp->getValue(IND_SUBSCRIBER_SESSIONS,
+                     (char *)&td->transactionData.sessions); 
+      MyOp->incValue(IND_SUBSCRIBER_SESSIONS, 
+                     (uint32)td->transactionData.server_bit);
+    }
+    else
+    {
+      /* Separate read op + update op 
+       * Relies on relative ordering of operation execution on a single
+       * row
+       */
+      NdbOperation *MyOp= pCON->getNdbOperation(SUBSCRIBER_TABLE);
+      CHECK_NULL(MyOp, "T4-1: getNdbOperation (read)", td,
+                 pCON->getNdbError());
+      MyOp->readTuple();
+      MyOp->equal(IND_SUBSCRIBER_NUMBER, 
+                  td->transactionData.number);
+      MyOp->getValue(IND_SUBSCRIBER_LOCATION, 
+                     (char *)&td->transactionData.location);
+      MyOp->getValue(IND_SUBSCRIBER_CHANGED_BY, 
+                     td->transactionData.changed_by);
+      MyOp->getValue(IND_SUBSCRIBER_CHANGED_TIME, 
+                     td->transactionData.changed_time);
+      MyOp->getValue(IND_SUBSCRIBER_GROUP,
+                     (char *)&td->transactionData.group_id);
+      MyOp->getValue(IND_SUBSCRIBER_SESSIONS,
+                     (char *)&td->transactionData.sessions);
+      MyOp= pCON->getNdbOperation(SUBSCRIBER_TABLE);
+      CHECK_NULL(MyOp, "T4-1: getNdbOperation (update)", td,
+                 pCON->getNdbError());
+      MyOp->interpretedUpdateTuple();
+      MyOp->equal(IND_SUBSCRIBER_NUMBER, 
+                  td->transactionData.number);
+      MyOp->incValue(IND_SUBSCRIBER_SESSIONS, 
+                     (uint32)td->transactionData.server_bit);
+    }
+  }
   stat_async = async;
   if (async == 1) {
     pCON->executeAsynchPrepare( NoCommit , T4_Callback_1, td);
@@ -438,15 +669,35 @@ T4_Callback_1(int result, NdbConnection 
 	 td->transactionData.server_id);
 
 
-  NdbOperation * MyOp = pCON->getNdbOperation(GROUP_TABLE);
-  CHECK_NULL(MyOp, "T4-2: getNdbOperation", td,
-	     pCON->getNdbError());
-  
-  MyOp->readTuple();
-  MyOp->equal(IND_GROUP_ID,
-	      (char*)&td->transactionData.group_id);
-  MyOp->getValue(IND_GROUP_ALLOW_INSERT, 
-		 (char *)&td->transactionData.permission);
+  if (td->ndbRecordSharedData)
+  {
+    char* rowPtr= (char*) &td->transactionData;
+    const NdbRecord* record= td->ndbRecordSharedData->
+      groupTableNdbRecord;
+    Uint32 m=0;
+    unsigned char* mask= (unsigned char*) &m;
+
+    SET_MASK(mask, IND_GROUP_ALLOW_INSERT);
+
+    const NdbOperation* MyOp= pCON->readTuple(record, rowPtr, record, rowPtr,
+                                              NdbOperation::LM_Read,
+                                              mask);
+    
+    CHECK_NULL((void*)MyOp, "T4-2: readTuple", td,
+               pCON->getNdbError());
+  }
+  else
+  {
+    NdbOperation * MyOp = pCON->getNdbOperation(GROUP_TABLE);
+    CHECK_NULL(MyOp, "T4-2: getNdbOperation", td,
+               pCON->getNdbError());
+    
+    MyOp->readTuple();
+    MyOp->equal(IND_GROUP_ID,
+                (char*)&td->transactionData.group_id);
+    MyOp->getValue(IND_GROUP_ALLOW_INSERT, 
+                   (char *)&td->transactionData.permission);
+  }
   if (stat_async == 1) {
     pCON->executeAsynchPrepare( NoCommit , T4_Callback_2, td);
   } else {
@@ -484,32 +735,66 @@ T4_Callback_2(int result, NdbConnection 
 	   SUBSCRIBER_NUMBER_SUFFIX_LENGTH, 
 	   td->transactionData.suffix);
     
-    /* Operation 3 */
-    
-    NdbOperation * MyOp = pCON->getNdbOperation(SESSION_TABLE);
-    CHECK_NULL(MyOp, "T4-3: getNdbOperation", td,
-	       pCON->getNdbError());
-    
-    MyOp->insertTuple();
-    MyOp->equal(IND_SESSION_SUBSCRIBER,
-		(char*)td->transactionData.number);
-    MyOp->equal(IND_SESSION_SERVER,
-		(char*)&td->transactionData.server_id);
-    MyOp->setValue(SESSION_DATA, 
-		   (char *)td->transactionData.session_details);
-    /* Operation 4 */
-    
-    /* Operation 5 */
-    MyOp = pCON->getNdbOperation(SERVER_TABLE);
-    CHECK_NULL(MyOp, "T4-5: getNdbOperation", td,
-	       pCON->getNdbError());
-    
-    MyOp->interpretedUpdateTuple();
-    MyOp->equal(IND_SERVER_ID,
-		(char*)&td->transactionData.server_id);
-    MyOp->equal(IND_SERVER_SUBSCRIBER_SUFFIX,
-		(char*)td->transactionData.suffix);
-    MyOp->incValue(IND_SERVER_INSERTS, (uint32)1);
+    /* Operations 3 + 4 */
+   
+    if (td->ndbRecordSharedData)
+    {
+      char* rowPtr= (char*) &td->transactionData;
+      const NdbRecord* record= td->ndbRecordSharedData->
+        sessionTableNdbRecord;
+      Uint32 m=0;
+      unsigned char* mask= (unsigned char*) &m;
+
+      SET_MASK(mask, IND_SESSION_SUBSCRIBER);
+      SET_MASK(mask, IND_SESSION_SERVER);
+      SET_MASK(mask, IND_SESSION_DATA);
+
+      const NdbOperation* MyOp= pCON->insertTuple(record, rowPtr, mask);
+
+      CHECK_NULL((void*)MyOp, "T4-3: insertTuple", td,
+                 pCON->getNdbError());
+
+      record= td->ndbRecordSharedData->
+        serverTableNdbRecord;
+      m= 0;
+      
+      NdbOperation::OperationOptions opts;
+      opts.optionsPresent= NdbOperation::OperationOptions::OO_INTERPRETED;
+      opts.interpretedCode= td->ndbRecordSharedData->incrServerInsertsProg;
+      
+      MyOp= pCON->updateTuple(record, rowPtr, record, rowPtr, mask,
+                              &opts, sizeof(opts));
+
+      CHECK_NULL((void*)MyOp, "T4-3: updateTuple", td,
+                 pCON->getNdbError());
+    }
+    else
+    {
+      NdbOperation * MyOp = pCON->getNdbOperation(SESSION_TABLE);
+      CHECK_NULL(MyOp, "T4-3: getNdbOperation", td,
+                 pCON->getNdbError());
+      
+      MyOp->insertTuple();
+      MyOp->equal(IND_SESSION_SUBSCRIBER,
+                  (char*)td->transactionData.number);
+      MyOp->equal(IND_SESSION_SERVER,
+                  (char*)&td->transactionData.server_id);
+      MyOp->setValue(IND_SESSION_DATA, 
+                     (char *)td->transactionData.session_details);
+      /* Operation 4 */
+      
+      /* Operation 5 */
+      MyOp = pCON->getNdbOperation(SERVER_TABLE);
+      CHECK_NULL(MyOp, "T4-5: getNdbOperation", td,
+                 pCON->getNdbError());
+      
+      MyOp->interpretedUpdateTuple();
+      MyOp->equal(IND_SERVER_ID,
+                  (char*)&td->transactionData.server_id);
+      MyOp->equal(IND_SERVER_SUBSCRIBER_SUFFIX,
+                  (char*)td->transactionData.suffix);
+      MyOp->incValue(IND_SERVER_INSERTS, (uint32)1);
+    }
     td->transactionData.branchExecuted = 1;
   } else {
     td->transactionData.branchExecuted = 0;
@@ -551,7 +836,7 @@ T4_Callback_3(int result, NdbConnection 
     start_T4(td->pNDB, td, stat_async);
     return;
   }//if
-  
+
   DEBUG3("T4(%.*s, %.2d): - Completing", 
 	 SUBSCRIBER_NUMBER_LENGTH, 
 	 td->transactionData.number, 
@@ -590,25 +875,113 @@ start_T5(Ndb * pNDB, ThreadData * td, in
     NdbSleep_MilliSleep(10);
   }
   
-  NdbOperation * MyOp= pCON->getNdbOperation(SUBSCRIBER_TABLE);
-  CHECK_NULL(MyOp, "T5-1: getNdbOperation", td,
-	     pCON->getNdbError());
-  
-  MyOp->interpretedUpdateTuple();
-  MyOp->equal(IND_SUBSCRIBER_NUMBER, 
-	      td->transactionData.number);
-  MyOp->getValue(IND_SUBSCRIBER_LOCATION, 
-		 (char *)&td->transactionData.location);
-  MyOp->getValue(IND_SUBSCRIBER_CHANGED_BY, 
-		 td->transactionData.changed_by);
-  MyOp->getValue(IND_SUBSCRIBER_CHANGED_TIME, 
-		 td->transactionData.changed_time);
-  MyOp->getValue(IND_SUBSCRIBER_GROUP,
-		 (char *)&td->transactionData.group_id);
-  MyOp->getValue(IND_SUBSCRIBER_SESSIONS,
-		 (char *)&td->transactionData.sessions);
-  MyOp->subValue(IND_SUBSCRIBER_SESSIONS, 
-		 (uint32)td->transactionData.server_bit);
+  if (td->ndbRecordSharedData)
+  {    
+    char* rowPtr= (char*) &td->transactionData;
+    const NdbRecord* record= td->ndbRecordSharedData->
+      subscriberTableNdbRecord;
+    Uint32 m=0;
+    unsigned char* mask= (unsigned char*) &m;
+
+    SET_MASK(mask, IND_SUBSCRIBER_LOCATION);
+    SET_MASK(mask, IND_SUBSCRIBER_CHANGED_BY);
+    SET_MASK(mask, IND_SUBSCRIBER_CHANGED_TIME);
+    SET_MASK(mask, IND_SUBSCRIBER_GROUP);
+    SET_MASK(mask, IND_SUBSCRIBER_SESSIONS);
+
+    const NdbOperation* MyOp= pCON->readTuple(record, rowPtr, record, rowPtr,
+                                              NdbOperation::LM_Read,
+                                              mask);
+    CHECK_NULL((void*)MyOp, "T5-1: readTuple", td,
+               pCON->getNdbError());
+
+    m= 0;
+
+    /* Create program to subtract something from the 
+     * subscriber sessions column
+     */
+    Uint32 codeBuf[20];
+    NdbInterpretedCode program(pNDB->getDictionary()->
+                               getTable(SUBSCRIBER_TABLE),
+                               codeBuf,
+                               20);
+    if (program.sub_val(IND_SUBSCRIBER_SESSIONS, 
+                        (uint32)td->transactionData.server_bit) ||
+        program.interpret_exit_ok() ||
+        program.finalise())
+    {
+      CHECK_NULL(NULL , "T5: Program create failed", td,
+                 program.getNdbError());
+    }
+    NdbOperation::OperationOptions opts;
+    opts.optionsPresent= NdbOperation::OperationOptions::OO_INTERPRETED;
+    opts.interpretedCode= &program;  
+
+    MyOp= pCON->updateTuple(record, rowPtr, record, rowPtr,
+                            mask,
+                            &opts,
+                            sizeof(opts));
+    CHECK_NULL((void*)MyOp, "T5-1: updateTuple", td,
+               pCON->getNdbError());
+  }
+  else
+  {
+    /* Use old Api */
+    if (td->useCombinedUpdate)
+    {
+      NdbOperation * MyOp= pCON->getNdbOperation(SUBSCRIBER_TABLE);
+      CHECK_NULL(MyOp, "T5-1: getNdbOperation", td,
+                 pCON->getNdbError());
+      
+      MyOp->interpretedUpdateTuple();
+      MyOp->equal(IND_SUBSCRIBER_NUMBER, 
+                  td->transactionData.number);
+      MyOp->getValue(IND_SUBSCRIBER_LOCATION, 
+                     (char *)&td->transactionData.location);
+      MyOp->getValue(IND_SUBSCRIBER_CHANGED_BY, 
+                     td->transactionData.changed_by);
+      MyOp->getValue(IND_SUBSCRIBER_CHANGED_TIME, 
+                     td->transactionData.changed_time);
+      MyOp->getValue(IND_SUBSCRIBER_GROUP,
+                     (char *)&td->transactionData.group_id);
+      MyOp->getValue(IND_SUBSCRIBER_SESSIONS,
+                     (char *)&td->transactionData.sessions);
+      MyOp->subValue(IND_SUBSCRIBER_SESSIONS, 
+                     (uint32)td->transactionData.server_bit);
+    }
+    else
+    {
+      /* Use separate read and update operations
+       * This relies on execution ordering between operations on
+       * the same row
+       */
+      NdbOperation * MyOp= pCON->getNdbOperation(SUBSCRIBER_TABLE);
+      CHECK_NULL(MyOp, "T5-1: getNdbOperation (readTuple)", td,
+                 pCON->getNdbError());
+      MyOp->readTuple();
+      MyOp->equal(IND_SUBSCRIBER_NUMBER, 
+                  td->transactionData.number);
+      MyOp->getValue(IND_SUBSCRIBER_LOCATION, 
+                     (char *)&td->transactionData.location);
+      MyOp->getValue(IND_SUBSCRIBER_CHANGED_BY, 
+                     td->transactionData.changed_by);
+      MyOp->getValue(IND_SUBSCRIBER_CHANGED_TIME, 
+                     td->transactionData.changed_time);
+      MyOp->getValue(IND_SUBSCRIBER_GROUP,
+                     (char *)&td->transactionData.group_id);
+      MyOp->getValue(IND_SUBSCRIBER_SESSIONS,
+                     (char *)&td->transactionData.sessions);
+
+      MyOp= pCON->getNdbOperation(SUBSCRIBER_TABLE);
+      CHECK_NULL(MyOp, "T5-1: getNdbOperation (updateTuple)", td,
+                 pCON->getNdbError());
+      MyOp->interpretedUpdateTuple();
+      MyOp->equal(IND_SUBSCRIBER_NUMBER, 
+                  td->transactionData.number);
+      MyOp->subValue(IND_SUBSCRIBER_SESSIONS, 
+                     (uint32)td->transactionData.server_bit);
+    }
+  }
   stat_async = async;
   if (async == 1) {
     pCON->executeAsynchPrepare( NoCommit , T5_Callback_1, td);
@@ -634,15 +1007,36 @@ T5_Callback_1(int result, NdbConnection 
 	 td->transactionData.number, 
 	 td->transactionData.server_id);
   
-  NdbOperation * MyOp = pCON->getNdbOperation(GROUP_TABLE);
-  CHECK_NULL(MyOp, "T5-2: getNdbOperation", td,
-	     pCON->getNdbError());
-  
-  MyOp->readTuple();
-  MyOp->equal(IND_GROUP_ID,
-	      (char*)&td->transactionData.group_id);
-  MyOp->getValue(IND_GROUP_ALLOW_DELETE, 
-		 (char *)&td->transactionData.permission);
+  if (td->ndbRecordSharedData)
+  {
+    char* rowPtr= (char*) &td->transactionData;
+    const NdbRecord* record= td->ndbRecordSharedData->
+      groupTableNdbRecord;
+    Uint32 m=0;
+    unsigned char* mask= (unsigned char*) &m;
+
+    SET_MASK(mask, IND_GROUP_ALLOW_DELETE);
+
+    const NdbOperation* MyOp= pCON->readTuple(record, rowPtr, record, rowPtr,
+                                              NdbOperation::LM_Read,
+                                              mask);
+
+    CHECK_NULL((void*)MyOp, "T5-2: readTuple", td,
+               pCON->getNdbError());
+  }
+  else
+  {
+    NdbOperation * MyOp = pCON->getNdbOperation(GROUP_TABLE);
+    CHECK_NULL(MyOp, "T5-2: getNdbOperation", td,
+               pCON->getNdbError());
+    
+    MyOp->readTuple();
+    MyOp->equal(IND_GROUP_ID,
+                (char*)&td->transactionData.group_id);
+    MyOp->getValue(IND_GROUP_ALLOW_DELETE, 
+                   (char *)&td->transactionData.permission);
+  }
+
   if (stat_async == 1) {
     pCON->executeAsynchPrepare( NoCommit , T5_Callback_2, td);
   } else {
@@ -680,29 +1074,58 @@ T5_Callback_2(int result, NdbConnection 
 	   SUBSCRIBER_NUMBER_SUFFIX_LENGTH, 
 	   td->transactionData.suffix);
     
-    /* Operation 3 */
-    NdbOperation * MyOp = pCON->getNdbOperation(SESSION_TABLE);
-    CHECK_NULL(MyOp, "T5-3: getNdbOperation", td,
-	       pCON->getNdbError());
-    
-    MyOp->deleteTuple();
-    MyOp->equal(IND_SESSION_SUBSCRIBER,
-		(char*)td->transactionData.number);
-    MyOp->equal(IND_SESSION_SERVER,
-		(char*)&td->transactionData.server_id);
-    /* Operation 4 */
-    
-    /* Operation 5 */
-    MyOp = pCON->getNdbOperation(SERVER_TABLE);
-    CHECK_NULL(MyOp, "T5-5: getNdbOperation", td,
-	       pCON->getNdbError());
-    
-    MyOp->interpretedUpdateTuple();
-    MyOp->equal(IND_SERVER_ID,
-		(char*)&td->transactionData.server_id);
-    MyOp->equal(IND_SERVER_SUBSCRIBER_SUFFIX,
-		(char*)td->transactionData.suffix);
-    MyOp->incValue(IND_SERVER_DELETES, (uint32)1);
+    if (td->ndbRecordSharedData)
+    {
+      char* rowPtr= (char*) &td->transactionData;
+      const NdbRecord* record= td->ndbRecordSharedData->
+        sessionTableNdbRecord;
+      Uint32 m=0;
+      unsigned char* mask= (unsigned char*) &m;
+      
+      const NdbOperation* MyOp= pCON->deleteTuple(record, rowPtr, record);
+      CHECK_NULL((void*) MyOp, "T5-3: deleteTuple", td,
+                 pCON->getNdbError());
+
+      record= td->ndbRecordSharedData->
+        serverTableNdbRecord;
+      m= 0;
+
+      NdbOperation::OperationOptions opts;
+      opts.optionsPresent= NdbOperation::OperationOptions::OO_INTERPRETED;
+      opts.interpretedCode= td->ndbRecordSharedData->incrServerDeletesProg;
+      
+      MyOp= pCON->updateTuple(record, rowPtr, record, rowPtr, mask,
+                              &opts, sizeof(opts));
+
+      CHECK_NULL((void*)MyOp, "T5-2: updateTuple", td,
+                 pCON->getNdbError());
+    }
+    else
+    {
+      /* Operation 3 */
+      NdbOperation * MyOp = pCON->getNdbOperation(SESSION_TABLE);
+      CHECK_NULL(MyOp, "T5-3: getNdbOperation", td,
+                 pCON->getNdbError());
+      
+      MyOp->deleteTuple();
+      MyOp->equal(IND_SESSION_SUBSCRIBER,
+                  (char*)td->transactionData.number);
+      MyOp->equal(IND_SESSION_SERVER,
+                  (char*)&td->transactionData.server_id);
+      /* Operation 4 */
+      
+      /* Operation 5 */
+      MyOp = pCON->getNdbOperation(SERVER_TABLE);
+      CHECK_NULL(MyOp, "T5-5: getNdbOperation", td,
+                 pCON->getNdbError());
+      
+      MyOp->interpretedUpdateTuple();
+      MyOp->equal(IND_SERVER_ID,
+                  (char*)&td->transactionData.server_id);
+      MyOp->equal(IND_SERVER_SUBSCRIBER_SUFFIX,
+                  (char*)td->transactionData.suffix);
+      MyOp->incValue(IND_SERVER_DELETES, (uint32)1);
+    }
     td->transactionData.branchExecuted = 1;
   } else {
     td->transactionData.branchExecuted = 0;

=== modified file 'storage/ndb/test/ndbapi/bench/testData.h'
--- a/storage/ndb/test/ndbapi/bench/testData.h	2006-12-23 19:20:40 +0000
+++ b/storage/ndb/test/ndbapi/bench/testData.h	2008-09-30 08:18:41 +0000
@@ -121,6 +121,16 @@ typedef struct {
 } TransactionData ;
 
 typedef struct {
+  const struct NdbRecord* subscriberTableNdbRecord;
+  const struct NdbRecord* groupTableNdbRecord;
+  const struct NdbRecord* sessionTableNdbRecord;
+  const struct NdbInterpretedCode* incrServerReadsProg;
+  const struct NdbInterpretedCode* incrServerInsertsProg;
+  const struct NdbInterpretedCode* incrServerDeletesProg;
+  const struct NdbRecord* serverTableNdbRecord;
+} NdbRecordSharedData ;
+
+typedef struct {
   struct NdbThread* pThread;
 
   unsigned long randomSeed;
@@ -135,10 +145,12 @@ typedef struct {
   /**
    * For async execution
    */
-  RunState          runState;
-  double            startTime;
-  TransactionData   transactionData;
-  struct Ndb      * pNDB;
+  RunState              runState;
+  double                startTime;
+  TransactionData       transactionData;
+  struct Ndb            * pNDB;
+  NdbRecordSharedData*  ndbRecordSharedData;
+  bool                  useCombinedUpdate;
 } ThreadData;
 
 /***************************************************************

=== modified file 'storage/ndb/test/ndbapi/flexAsynch.cpp'
--- a/storage/ndb/test/ndbapi/flexAsynch.cpp	2007-08-01 03:07:58 +0000
+++ b/storage/ndb/test/ndbapi/flexAsynch.cpp	2008-10-01 09:24:07 +0000
@@ -56,6 +56,13 @@ enum StartType { 
   stStop 
 } ;
 
+struct ThreadNdb
+{
+  int NoOfOps;
+  int ThreadNo;
+  char * record;
+};
+
 extern "C" { static void* threadLoop(void*); }
 static void setAttrNames(void);
 static void setTableNames(void);
@@ -63,8 +70,10 @@ static int readArguments(int argc, const
 static int createTables(Ndb*);
 static void defineOperation(NdbConnection* aTransObject, StartType aType, 
                             Uint32 base, Uint32 aIndex);
+static void defineNdbRecordOperation(ThreadNdb*, NdbConnection* aTransObject, StartType aType, 
+                            Uint32 base, Uint32 aIndex);
 static void execute(StartType aType);
-static bool executeThread(StartType aType, Ndb* aNdbObject, unsigned int);
+static bool executeThread(ThreadNdb*, StartType aType, Ndb* aNdbObject, unsigned int);
 static void executeCallback(int result, NdbConnection* NdbObject,
                             void* aObject);
 static bool error_handler(const NdbError & err);
@@ -77,12 +86,6 @@ static int                              
                 
 ErrorData * flexAsynchErrorData;                        
 
-struct ThreadNdb
-{
-  int NoOfOps;
-  int ThreadNo;
-};
-
 static NdbThread*               threadLife[NDB_MAXTHREADS];
 static int                              tNodeId;
 static int                              ThreadReady[NDB_MAXTHREADS];
@@ -91,6 +94,9 @@ static char                             
 static char                             attrName[MAXATTR][MAXSTRLEN+1];
 
 // Program Parameters
+static NdbRecord * g_record[MAXTABLES];
+static bool tNdbRecord = false;
+
 static bool                             tLocal = false;
 static int                              tLocalPart = 0;
 static int                              tSendForce = 0;
@@ -236,6 +242,17 @@ NDB_COMMAND(flexAsynch, "flexAsynch", "f
     }
   }
 
+  if (tNdbRecord)
+  {
+    Uint32 sz = NdbDictionary::getRecordRowLength(g_record[0]);
+    sz += 3;
+    for (Uint32 i = 0; i<tNoOfThreads; i++)
+    {
+      pThreadData[i].record = (char*)malloc(sz);
+      bzero(pThreadData[i].record, sz);
+    }
+  }
+
   if(returnValue == NDBT_OK){
     /****************************************************************
      *  Create NDB objects.                                   *
@@ -501,7 +518,7 @@ threadLoop(void* ThreadData)
 
     tType = ThreadStart[threadNo];
     ThreadStart[threadNo] = stIdle;
-    if(!executeThread(tType, localNdb, threadBase)){
+    if(!executeThread(tabThread, tType, localNdb, threadBase)){
       break;
     }
     ThreadReady[threadNo] = 1;
@@ -515,66 +532,76 @@ threadLoop(void* ThreadData)
 
 static 
 bool
-executeThread(StartType aType, Ndb* aNdbObject, unsigned int threadBase) {
+executeThread(ThreadNdb* pThread, 
+	      StartType aType, Ndb* aNdbObject, unsigned int threadBase) {
   int i, j, k;
   NdbConnection* tConArray[1024];
   unsigned int tBase;
   unsigned int tBase2;
 
-  for (i = 0; i < tNoOfTransactions; i++) {
-    if (tLocal == false) {
-      tBase = i * tNoOfParallelTrans * tNoOfOpsPerTrans;
-    } else {
-      tBase = i * tNoOfParallelTrans * MAX_SEEK;
-    }//if
-    START_REAL_TIME;
-    for (j = 0; j < tNoOfParallelTrans; j++) {
+  unsigned int extraLoops= 0; // (aType == stRead) ? 100000 : 0;
+
+  for (unsigned int ex= 0; ex < (1 + extraLoops); ex++)
+  {
+    for (i = 0; i < tNoOfTransactions; i++) {
       if (tLocal == false) {
-        tBase2 = tBase + (j * tNoOfOpsPerTrans);
-      } else {
-        tBase2 = tBase + (j * MAX_SEEK);
-        tBase2 = getKey(threadBase, tBase2);
-      }//if
-      if (startTransGuess == true) {
-        Uint64 Tkey64;
-        Uint32* Tkey32 = (Uint32*)&Tkey64;
-        Tkey32[0] = threadBase;
-        Tkey32[1] = tBase2;
-        tConArray[j] = aNdbObject->startTransaction((Uint32)0, //Priority
-                                         (const char*)&Tkey64, //Main PKey
-                                         (Uint32)4);           //Key Length
+        tBase = i * tNoOfParallelTrans * tNoOfOpsPerTrans;
       } else {
-        tConArray[j] = aNdbObject->startTransaction();
+        tBase = i * tNoOfParallelTrans * MAX_SEEK;
       }//if
-      if (tConArray[j] == NULL && 
-          !error_handler(aNdbObject->getNdbError()) ){
-        ndbout << endl << "Unable to recover! Quiting now" << endl ;
-        return false;
-      }//if
-      
-      for (k = 0; k < tNoOfOpsPerTrans; k++) {
-        //-------------------------------------------------------
-        // Define the operation, but do not execute it yet.
-        //-------------------------------------------------------
-        defineOperation(tConArray[j], aType, threadBase, (tBase2 + k));
+      START_REAL_TIME;
+      for (j = 0; j < tNoOfParallelTrans; j++) {
+        if (tLocal == false) {
+          tBase2 = tBase + (j * tNoOfOpsPerTrans);
+        } else {
+          tBase2 = tBase + (j * MAX_SEEK);
+          tBase2 = getKey(threadBase, tBase2);
+        }//if
+        if (startTransGuess == true) {
+          Uint64 Tkey64;
+          Uint32* Tkey32 = (Uint32*)&Tkey64;
+          Tkey32[0] = threadBase;
+          Tkey32[1] = tBase2;
+          tConArray[j] = aNdbObject->startTransaction((Uint32)0, //Priority
+                                                      (const char*)&Tkey64, //Main PKey
+                                                      (Uint32)4);           //Key Length
+        } else {
+          tConArray[j] = aNdbObject->startTransaction();
+        }//if
+        if (tConArray[j] == NULL && 
+            !error_handler(aNdbObject->getNdbError()) ){
+          ndbout << endl << "Unable to recover! Quiting now" << endl ;
+          return false;
+        }//if
+        
+        for (k = 0; k < tNoOfOpsPerTrans; k++) {
+          //-------------------------------------------------------
+          // Define the operation, but do not execute it yet.
+          //-------------------------------------------------------
+          if (tNdbRecord)
+            defineNdbRecordOperation(pThread, 
+                                     tConArray[j], aType, threadBase,(tBase2+k));
+          else
+            defineOperation(tConArray[j], aType, threadBase, (tBase2 + k));
+        }//for
+        
+        tConArray[j]->executeAsynchPrepare(Commit, &executeCallback, NULL);
+      }//for
+      STOP_REAL_TIME;
+      //-------------------------------------------------------
+      // Now we have defined a set of operations, it is now time
+      // to execute all of them.
+      //-------------------------------------------------------
+      int Tcomp = aNdbObject->sendPollNdb(3000, 0, 0);
+      while (Tcomp < tNoOfParallelTrans) {
+        int TlocalComp = aNdbObject->pollNdb(3000, 0);
+        Tcomp += TlocalComp;
+      }//while
+      for (j = 0 ; j < tNoOfParallelTrans ; j++) {
+        aNdbObject->closeTransaction(tConArray[j]);
       }//for
-      
-      tConArray[j]->executeAsynchPrepare(Commit, &executeCallback, NULL);
-    }//for
-    STOP_REAL_TIME;
-    //-------------------------------------------------------
-    // Now we have defined a set of operations, it is now time
-    // to execute all of them.
-    //-------------------------------------------------------
-    int Tcomp = aNdbObject->sendPollNdb(3000, 0, 0);
-    while (Tcomp < tNoOfParallelTrans) {
-      int TlocalComp = aNdbObject->pollNdb(3000, 0);
-      Tcomp += TlocalComp;
-    }//while
-    for (j = 0 ; j < tNoOfParallelTrans ; j++) {
-      aNdbObject->closeTransaction(tConArray[j]);
     }//for
-  }//for
+  } // for
   return true;
 }//executeThread()
 
@@ -720,6 +747,68 @@ defineOperation(NdbConnection* localNdbC
   return;
 }//defineOperation()
 
+
+static void
+defineNdbRecordOperation(ThreadNdb* pThread, 
+			 NdbConnection* pTrans, StartType aType,
+			 Uint32 threadBase, Uint32 aIndex)
+{
+  char * record = pThread->record;
+  Uint32 offset;
+  NdbDictionary::getOffset(g_record[0], 0, offset);
+  * (Uint32*)(record + offset) = threadBase;
+  * (Uint32*)(record + offset + 4) = aIndex;
+  
+  //-------------------------------------------------------
+  // Set-up the attribute values for this operation.
+  //-------------------------------------------------------
+  if (aType != stRead && aType != stDelete)
+  {
+    for (int k = 1; k < tNoOfAttributes; k++) {
+      NdbDictionary::getOffset(g_record[0], k, offset);
+      * (Uint32*)(record + offset) = aIndex;    
+    }//for
+  }
+  
+  const NdbOperation* op;
+  switch (aType) {
+  case stInsert: {   // Insert case
+    if (theWriteFlag == 1)
+    {
+      op = pTrans->writeTuple(g_record[0],record,g_record[0],record);
+    }
+    else
+    {
+      op = pTrans->insertTuple(g_record[0],record,g_record[0],record);
+    }
+    break;
+  }//case
+  case stRead: {     // Read Case
+    op = pTrans->readTuple(g_record[0],record,g_record[0],record);
+    break;
+  }//case
+  case stUpdate:{    // Update Case
+    op = pTrans->updateTuple(g_record[0],record,g_record[0],record);    
+    break;
+  }//case
+  case stDelete: {   // Delete Case
+    op = pTrans->deleteTuple(g_record[0],record, g_record[0]);
+    break;
+  }//case
+  default: {
+    abort();
+  }//default
+  }//switch
+
+  if (op == NULL)
+  {
+    ndbout << "Operation is null " << pTrans->getNdbError() << endl;
+    abort();
+  }
+    
+  assert(op != 0);
+}
+
 static void setAttrNames()
 {
   int i;
@@ -813,6 +902,28 @@ createTables(Ndb* pMyNdb){
         return -1;
       
       NdbSchemaCon::closeSchemaTrans(MySchemaTransaction);
+
+      if (tNdbRecord)
+      {
+	NdbDictionary::Dictionary* pDict = pMyNdb->getDictionary();
+	const NdbDictionary::Table * pTab = pDict->getTable(tableName[i]);
+	
+	int off = 0;
+	Vector<NdbDictionary::RecordSpecification> spec;
+	for (Uint32 j = 0; j<pTab->getNoOfColumns(); j++)
+	{
+	  NdbDictionary::RecordSpecification r0;
+	  r0.column = pTab->getColumn(j);
+	  r0.offset = off;
+	  off += (r0.column->getSizeInBytes() + 3) & ~(Uint32)3;
+	  spec.push_back(r0);
+	}
+	g_record[i] = 
+	  pDict->createRecord(pTab, spec.getBase(), 
+			      spec.size(),
+			      sizeof(NdbDictionary::RecordSpecification));
+	assert(g_record[i]);
+      }
     }
   }
   
@@ -949,6 +1060,10 @@ readArguments(int argc, const char** arg
       startTransGuess = false;
       argc++;
       i--;
+    } else if (strcmp(argv[i], "-ndbrecord") == 0){
+      tNdbRecord = true;
+      argc++;
+      i--;
     } else {
       return -1;
     }
@@ -995,7 +1110,7 @@ input_error(){
   ndbout_c("   -force Force send when communicating");
   ndbout_c("   -non_adaptive Send at a 10 millisecond interval");
   ndbout_c("   -local Number of part, only use keys in one part out of 16");
+  ndbout_c("   -ndbrecord");
 }
   
-
-
+template class Vector<NdbDictionary::RecordSpecification>;

=== modified file 'storage/ndb/test/ndbapi/msa.cpp'
--- a/storage/ndb/test/ndbapi/msa.cpp	2006-12-23 19:20:40 +0000
+++ b/storage/ndb/test/ndbapi/msa.cpp	2008-09-30 08:18:41 +0000
@@ -22,6 +22,7 @@
 #include <NdbSleep.h>
 #include <NdbThread.h>
 #include <NdbTick.h>
+#include <NdbOut.hpp>
 
 const char* const c_szDatabaseName = "TEST_DB";
 
@@ -52,6 +53,7 @@ bool g_bWriteTuple = false;
 bool g_bInsertInitial = false;
 bool g_bVerifyInitial = false;
 
+Ndb_cluster_connection* theConnection = 0;
 NdbMutex* g_pNdbMutexPrintf = 0;
 NdbMutex* g_pNdbMutexIncrement = 0;
 long g_nNumCallsProcessed = 0;
@@ -242,7 +244,8 @@ int QueryTransaction(Ndb* pNdb, 
                      NdbError& err)
 {
     int iRes = -1;
-    NdbConnection* pNdbConnection = pNdb->startTransaction(0, (const char*)&iContextId, 4);
+    NdbConnection* pNdbConnection = pNdb->startTransaction();
+    //0, (const char*)&iContextId, 4);
     if(pNdbConnection)
     {
         NdbOperation* pNdbOperation = pNdbConnection->getNdbOperation(g_szTableName);
@@ -326,7 +329,8 @@ int RetryQueryTransaction(Ndb* pNdb, 
 int DeleteTransaction(Ndb* pNdb, long iContextId, NdbError& err)
 {
     int iRes = -1;
-    NdbConnection* pNdbConnection = pNdb->startTransaction(0, (const char*)&iContextId, 4);
+    NdbConnection* pNdbConnection = pNdb->startTransaction();
+    //0, (const char*)&iContextId, 4);
     if(pNdbConnection)
     {
         NdbOperation* pNdbOperation = pNdbConnection->getNdbOperation(g_szTableName);
@@ -411,7 +415,8 @@ int InsertTransaction(Ndb* pNdb, 
                       NdbError& err)
 {
     int iRes = -1;
-    NdbConnection* pNdbConnection = pNdb->startTransaction(0, (const char*)&iContextID, 4);
+    NdbConnection* pNdbConnection = pNdb->startTransaction();
+    //0, (const char*)&iContextID, 4);
     if(pNdbConnection)
     {
         NdbOperation* pNdbOperation = pNdbConnection->getNdbOperation(g_szTableName);
@@ -501,7 +506,8 @@ int RetryInsertTransaction(Ndb* pNdb, 
 int UpdateTransaction(Ndb* pNdb, long iContextId, NdbError& err)
 {
     int iRes = -1;
-    NdbConnection* pNdbConnection = pNdb->startTransaction(0, (const char*)&iContextId, 4);
+    NdbConnection* pNdbConnection = pNdb->startTransaction();
+    //0, (const char*)&iContextId, 4);
     if(pNdbConnection)
     {
         NdbOperation* pNdbOperation = pNdbConnection->getNdbOperation(g_szTableName);
@@ -678,7 +684,7 @@ void* RuntimeCallContext(void* lpParam)
     long iLockTime;
     long iLockTimeUSec;
     
-    pNdb = new Ndb("TEST_DB");
+    pNdb = new Ndb(theConnection, "TEST_DB");
     if(!pNdb)
     {
         NdbMutex_Lock(g_pNdbMutexPrintf);
@@ -971,7 +977,6 @@ void ShowHelp(const char* szCmd)
 int main(int argc, char* argv[])
 {
     ndb_init();
-    int iRes = -1;
     g_nNumThreads = 0;
     g_nMaxCallsPerSecond = 0;
     long nSeed = 0;
@@ -998,7 +1003,7 @@ int main(int argc, char* argv[])
                 break;
             case 'm': 
                 g_nStatusDataSize = atol(argv[i]+2); 
-                if(g_nStatusDataSize>sizeof(STATUS_DATA))
+                if(g_nStatusDataSize> (int) sizeof(STATUS_DATA))
                 {
                     g_nStatusDataSize = sizeof(STATUS_DATA);
                 }
@@ -1093,7 +1098,19 @@ int main(int argc, char* argv[])
     hShutdownEvent = CreateEvent(NULL,TRUE,FALSE,NULL);
 #endif
     
-    Ndb* pNdb = new Ndb(c_szDatabaseName);
+    theConnection= new Ndb_cluster_connection();
+    if (theConnection->connect(12, 5, 1) != 0)
+    {
+      ndbout << "Unable to connect to managment server." << endl;
+      return -1;
+    }
+    if (theConnection->wait_until_ready(30,0) < 0)
+    {
+      ndbout << "Cluster nodes not ready in 30 seconds." << endl;
+      return -1;
+    }
+
+    Ndb* pNdb = new Ndb(theConnection, c_szDatabaseName);
     if(!pNdb)
     {
         printf("could not construct ndb\n");

=== modified file 'storage/ndb/test/run-test/setup.cpp'
--- a/storage/ndb/test/run-test/setup.cpp	2008-09-25 10:21:14 +0000
+++ b/storage/ndb/test/run-test/setup.cpp	2008-10-08 21:05:55 +0000
@@ -34,12 +34,10 @@ struct proc_option f_options[] = {
   ,{ "--host=",        atrt_process::AP_CLIENT, 0 }
   ,{ "--server-id=",   atrt_process::AP_MYSQLD, PO_REP }
   ,{ "--log-bin",      atrt_process::AP_MYSQLD, PO_REP_MASTER }
-#if 0
   ,{ "--master-host=", atrt_process::AP_MYSQLD, PO_REP_SLAVE }
   ,{ "--master-port=", atrt_process::AP_MYSQLD, PO_REP_SLAVE }
   ,{ "--master-user=", atrt_process::AP_MYSQLD, PO_REP_SLAVE }
   ,{ "--master-password=", atrt_process::AP_MYSQLD, PO_REP_SLAVE }
-#endif
   ,{ "--ndb-connectstring=", atrt_process::AP_MYSQLD | atrt_process::AP_CLUSTER
      ,PO_NDB }
   ,{ "--ndbcluster", atrt_process::AP_MYSQLD, PO_NDB }

Thread
bzr commit into mysql-6.0-ndb branch (tomas.ulin:2675) Tomas Ulin15 Oct