From: Martin Skold
Date: September 29, 2010 10:56am
Subject: bzr commit into mysql-5.1-telco-6.3 branch (Martin.Skold:3288) Bug#51310
         Bug#55641 Bug#56116 Bug#56763 Bug#56770 Bug#56829 Bug#56840 Bug#56841
         Bug#5...
#At file:///home/marty/MySQL/mysql-5.1-telco-6.3/

 3288 Martin Skold	2010-09-29 [merge]
      Merge
      removed:
        cluster_change_hist.txt
      modified:
        mysql-test/collections/default.experimental
        mysql-test/suite/ndb/r/ndb_database.result
        mysql-test/suite/ndb/t/ndb_database.test
        sql/ha_ndbcluster.cc
        sql/ha_ndbcluster.h
        sql/ha_ndbcluster_binlog.cc
        sql/handler.cc
        sql/handler.h
        sql/sql_show.cc
        sql/sql_table.cc
        storage/ndb/include/kernel/GlobalSignalNumbers.h
        storage/ndb/include/kernel/signaldata/FsReadWriteReq.hpp
        storage/ndb/include/mgmapi/mgmapi.h
        storage/ndb/include/ndbapi/NdbDictionary.hpp
        storage/ndb/src/kernel/blocks/ERROR_codes.txt
        storage/ndb/src/kernel/blocks/dbdict/Dbdict.cpp
        storage/ndb/src/kernel/blocks/dbdih/DbdihMain.cpp
        storage/ndb/src/kernel/blocks/dblqh/Dblqh.hpp
        storage/ndb/src/kernel/blocks/dblqh/DblqhMain.cpp
        storage/ndb/src/kernel/blocks/dbtup/Dbtup.hpp
        storage/ndb/src/kernel/blocks/dbtup/DbtupIndex.cpp
        storage/ndb/src/kernel/blocks/dbtup/DbtupMeta.cpp
        storage/ndb/src/kernel/blocks/dbtux/Dbtux.hpp
        storage/ndb/src/kernel/blocks/dbtux/DbtuxBuild.cpp
        storage/ndb/src/kernel/blocks/dbtux/DbtuxMaint.cpp
        storage/ndb/src/kernel/blocks/dbtux/DbtuxNode.cpp
        storage/ndb/src/kernel/blocks/dbtux/DbtuxTree.cpp
        storage/ndb/src/kernel/blocks/ndbfs/AsyncFile.cpp
        storage/ndb/src/kernel/blocks/ndbfs/AsyncFile.hpp
        storage/ndb/src/kernel/blocks/ndbfs/Ndbfs.cpp
        storage/ndb/src/kernel/blocks/ndbfs/Ndbfs.hpp
        storage/ndb/src/kernel/blocks/ndbfs/VoidFs.cpp
        storage/ndb/src/kernel/blocks/suma/Suma.cpp
        storage/ndb/src/kernel/blocks/suma/Suma.hpp
        storage/ndb/src/kernel/main.cpp
        storage/ndb/src/ndbapi/DictCache.cpp
        storage/ndb/src/ndbapi/DictCache.hpp
        storage/ndb/src/ndbapi/NdbDictionary.cpp
        storage/ndb/src/ndbapi/NdbDictionaryImpl.cpp
        storage/ndb/src/ndbapi/NdbDictionaryImpl.hpp
        storage/ndb/test/include/NdbRestarter.hpp
        storage/ndb/test/ndbapi/testIndex.cpp
        storage/ndb/test/ndbapi/testRestartGci.cpp
        storage/ndb/test/ndbapi/testSystemRestart.cpp
        storage/ndb/test/run-test/daily-basic-tests.txt
        storage/ndb/test/src/NdbRestarter.cpp

=== removed file 'cluster_change_hist.txt'
--- a/cluster_change_hist.txt	2009-12-14 10:18:40 +0000
+++ b/cluster_change_hist.txt	1970-01-01 00:00:00 +0000
@@ -1,276 +0,0 @@
-==================================================
-mysql-5.1.X-ndb-6.2.Y
-==================================================
-
-mysql-5.1.19-ndb-6.2.4 (released 2007-07-04)
-
-  Bug #29525 batching incorrect
-  Bug #28720 Disk data meta information is not visible in mysqld but exists in ndbd
-  Bug #28642 Tablespace returning incorrect usage status
-  Bug #29501 DN crashes in LGMAN during NR while DD schema operations are being handled
-  Bug #29354 Incorrect handling of replica REDO during SR (5.0)
-  Bug #29364 SQL queries hang while data node in start phase 5
-  Bug #27404 util thd mysql_parse sig11 when mysqld default multibyte charset
-
-mysql-5.1.19-ndb-6.2.3 (released 2007-07-02)
-(additional bugs fixed compared to mysql-5.1.19)
-
-  Bug #29331 Incorrect node killed on GCP Stop (COPY_GCI)
-  Bug #29057 Incorrect handling of file 0 during redo execution(Error while reading REDO log)
-  Bug #29067 Incorrect order during redo invalidation(Error while reading REDO log)
-  Bug #29118 Incorrect handling of loglap in redo invalidation (Error while reading REDO log)
-  Bug #29229 memory corruption in dbtup
-  Bug #29185 Large IN list crashes mysqld with cluster and condition pushdown
-  Bug #29176 drop table can crash pgman
-  Bug #28989 hpux11 ps_row warnings
-  Bug #25305 rpl_ndb_UUID.test fails on pb-valgrind
-  Bug #29167 Insufficient nodes for SR
-  Bug #27640 backup id not displayed in the output of "ndb_mgm start backup wait completed"
-  Bug #28949 n_mix valgrind failing in pushbuild
-  Bug #29099 slow backup for disk data
-  Bug #26342 auto_increment_increment AND auto_increment_offset REALLY REALLY anger NDB cluster
-  Bug #29103 ndb_restore segfaults on NULL var[char|binary]
-  Bug #29074 preserve file timestamps in ndb_error_reporter
-  Bug #29073 Store history for ndb_mgm
-  Bug #29063 mgmapi: connect timeout set incorrectly
-  Bug #29044 blob code sets error code on transaction object
-  Bug #28724 for blobs, op flag to not set error on trans
-  Bug #28751 Lots of memory locked in memory causes high kswapd
-  Bug #28899 not possible to set separate watchdog timeout at startup
-  Bug# 28726 LCP files not removed after initial system restart
-  Bug #20612 failed ndbrequire in PGMAN
-  Bug #28443 transporter gets stuck when >1024 signals received at once
-  Bug #28770 file already opened error when corrupt schema file
-  Bug #28749 MaxNoOfOpenFiles offset by 1
-  Bug #26783 replication status unknown after cluster or mysqld failure
-  Bug #28717 Race between NODE_FAILREP and COPY_GCIREQ
-  Bug #28719 multi pk update ignore corrupts data
-  Bug #28653 Fast GCP + high load + high RedoBuffer causes ndbrequire
-  Bug #28525 Node failures in PGMAN at ndbrequire (line 430)
-
-
-==================================================
-mysql-5.1.15-ndb-6.1.X
-==================================================
-
-mysql-5.1.15-ndb-6.1.16 (2007-06-28)
-
-  Bug #29331 Incorrect node killed on GCP Stop (COPY_GCI)
-  Bug #29057 Incorrect handling of file 0 during redo execution(Error while reading REDO log)
-  Bug #29067 Incorrect order during redo invalidation(Error while reading REDO log)
-  Bug #29118 Incorrect handling of loglap in redo invalidation (Error while reading REDO log)
-  Bug #29176 drop table can crash pgman
-
-mysql-5.1.15-ndb-6.1.15 (2007-06-20)
-
-  Bug #29229 memory corruption in dbtup
-
-mysql-5.1.15-ndb-6.1.14 (2007-06-19)
-
-  Bug #29176 missing stack pruning in drop_page
-  Bug #29167 Insufficient nodes for SR
-
-mysql-5.1.15-ndb-6.1.13 (2007-06-15)
-
-  Bug #29099 slow backup for disk data
-
-mysql-5.1.15-ndb-6.1.12 (2007-06-13)
-
-  Bug #29044 memory buddy allocator "unoptimal" memory handling
-  extend backup dump to give more info
-
-mysql-5.1.15-ndb-6.1.11 (2007-06-06)
-
-  Bug #28751 Lots of memory locked in memory causes high kswapd
-  Bug #28899 not possible to set separate watchdog timeout at startup
-  Bug #28726 LCP files not removed after initial system restart
-  Bug #20612 failed ndbrequire in PGMAN
-  make size of redo log files (fragment log files) configurable (error 1220)
-
-mysql-5.1.15-ndb-6.1.10 (2007-05-30)
-
-  Bug #28783 heartbeat failures under high load
-  Bug #28770 file already opened error when corrupt schema file
-  Bug #28717 Race between NODE_FAILREP and COPY_GCIREQ
-  Bug #28525 Node failures in PGMAN at ndbrequire (line 430)
-  Bug #28653 Fast GCP + high load + high RedoBuffer causes ndbrequire
-  Bug #28749 MaxNoOfOpenFiles offset by 1
-  Added addditional 'times' printout in ndbd WatchDog thread
-  Some filename changes to avoid 'tar' issues with 99 char limit
-  Removed some extra printouts in ndbd out file
-
-mysql-5.1.15-ndb-6.1.9 (2007-05-24)
-
-  Bug #28593 cluster backup scans in acc index order, bad for disk data
-  Bug #28443 transporter gets stuck when >1024 signals received at once
- 
-mysql-5.1.15-ndb-6.1.8 (2007-05-18)
-
-  Bug #28491 Expand check gets disabled in some cases (initial node restart)
-  Bug #28348 Dropped tables not removed from LCP
-  Bug #20535 NDB API implicit insert of NULL does not overwrite old value
-  Bug #27437 NDB Data node crashes during mysqldump inserts
-  Bug #27942 node failure during massive insert
-
-mysql-5.1.15-ndb-6.1.7 (2007-05-05)
-
-  Bug #26121 mysqldump includes LOCK TABLES general_log WRITE
-  Bug #28161 Detached triggers + DD and only MM update could cause node failure
-  Bug #25530 --with-readline fails with commercial source packages
-  Bug #25741 AllocNodeIdRef::NodeFailureHandlingNotCompleted
-  Bug #27205 Occational 899 if delete+insert during LCP
-  Bug #24667 After ALTER TABLE operation ndb_dd table becomes regular ndb
-  Bug #28093 ndb: retry sleep in get table stats 30s instead of 30ms
-  Bug #28073 Infinite loop in lock queue.
-  Bug #27495 Missing implementation of NdbTransaction::executeAsynch().
-  Bug #28023 NR can fail to update GCI
-  Bug #27756 Memleak with insert+delete
-  Bug #27757 Tup scan cant see own records
-  Bug #27748 Inconsistent replication(backup) with some multi-update combinations
-  Bug #27651 Very high load can cause event api to get out of sync, "out of order buckets"
-  Bug #27728 Partially connected API's can cause problem for SUMA
-  Bug #27663 Missmatched free/delete in listObjects
-  Bug #27560 Memory usage of mysqld grows while doing nothing
-  Bug #27581 Drop/truncate table can result in node/cluster-crash
-
-  (no bug number) fix bug in my.cnf config handling, put64 for 64-bit variables
-
-  Other changes:
-  - Force varpart, such that tables can handle online add column when feature arrives
-
-  Test tools update for measuring replication latency:
-
-  - new ndb tool 'rep_latency' to measure replication latency
-  - simple extend of 'listen_event' to do apply on remote cluster
-
-mysql-5.1.15-ndb-6.1.6 (2007-03-30)
-
-  Bug #27529 Slave crashes on lots of updates
-  Bug #27512 Inconsistent tuples when using variable size and >16Gb datamemory
-  Bug #27378 update becomes delete on slave
-  Bug #27466 nf during nr can leave cluster in inconsistent state
-  Bug #27444 DataMemory missing from report in cluster logs
-  Bug #27044 replicated with unique field ndb table allows duplicate key inserts
-  Bug #26286 row-based logging scales worse than statement-based logging
-  Bug #19896 Last_Errno: 4294967295, Error in Write_rows event: (error number 4 billion ?)
-  Bug #27320 ndb handler does not reset extra flags on reset()
-  Bug #27283 Race condition in GCP Master take-over
-  Bug #26825 MySQL Server Crashes in high load
-  Bug #27286 Rare racecondition in nodeid allocation on master failure
-  Bug #27291 Setting LockPagesInMainMemory to zero doesn't work
-  Bug #20185 Node failure might cause other node failure
-  Bug #27203 ScanDelete+Scan+Insert could cause node crash
-  Bug #27169 xtra word sent in SUMA::resend_bucket causing crash of mysqld(event listener)
-  Bug #27005 Node failure + API failure can crash SUMA resend
-  Bug #27087 Unoptimal handling of failed API in SUMA
-  Bug #27102 Node can crash during flush of undo during restart with filesystem error
-  Bug #27003 Failure during noderestart could crash/hang alive node (and hence cluster)
-  (not reported) correct event buffer status reporting
-
-mysql-5.1.15-ndb-6.1.5 (2007-03-15)
-
-  Bug #26997 - mysqld segfault when in single user mode
-  Bug #26825 - MySQL Server Crashes in high load
-  Bug #25743 - If undo_buffer_size (for LG) greater than the inital shared memory (default 20M), ndbd nodes are crashed
-  Bug #26899 - ndb_restore cannot restore selected tables and databases
-  Bug #26900 - ndb_restore printout option does not give structured data
-  Bug #26720 - Infinite loop on unknown signal in logging function
-
-  * ndb_apply_status schema change
-  * ndb_restore options for printing to file
-
-  Note: since ndb_apply_status schema change this version is not backwards compatible
-
-mysql-5.1.15-ndb-6.1.4 (limited test release)
-
-  Bug #26741 - ndb_restore with only print_data seg faults
-  Bug #26739 - ndb_restore segfault on some 64-bit architectures
-  Bug #26663 - cluster have issues with api nodeids > 63
-
-mysql-5.1.15-ndb-6.1.3 (25 Feb 2007)
-
-  Bug #26515 - Incorrect pointer after FSCLOSECONF in REDO invalidation
-  Bug #26514 - Invalid memory access could occur during DD usage
-  Bug #26490 - duplicate cluster error code
-  Bug #26487 - Bug in extent/page allocation when *using* >1gb datafiles
-  Bug #25801 - Internal error if starting partially with missing REDO
-  Bug #26481 - Node failure during initial node restart, can lead to subsequent node failures
-  Bug #26457 - Incorrect handling of LCP take-over during multi-master-node-failure
-  Bug #26454 - LockPagesInMemory=1 is called after distributed communcation has started
-  Bug #25239 - Out of memory problem can cause crash in SUMA
-  Bug #21033 - Error 0 in readAutoIncrementValue()
-  Bug #26293 - cluster mgmt node sometimes doesn't receive events from all nodes on restart
-
-  * listEvents ndbapi extension
-  * Modified ndb_show_tables to also handle Table Events
-  * mysqld option --ndb-wait-connected
-  * Possibility to disable arbitration by setting arbitrationrank=0 on all nodes
-  * All dump 2352
-  * Different READ/SCAN (EX-SH)
-  * New dump commands for tracking hanging locks
-  * Change level on StartREDOLog to make it default come up in cluster log
-  * Make ndbd_redo_log_reader be build default
-
----------------------------------------------------
-
-mysql-5.1.16-ndb-6.2.0 (2007-03-09)
-
-  * WAN settings for TCP transporters
-  * moved initialization of ndbd fs block first to ensure that it gets enough space for allocation of file system thread stacks
-  * added listEvents
-  * modified ndb_show_tables to also handle Table Events
-  * added mysqld option --ndb-wait-connected
-  * Add possibility to disable arbitration by setting arbitrationrank=0 on all nodes
-  * Add all dump 2352
-  * Add different READ/SCAN (EX-SH)
-  * ndb - add new dump commands for tracking hanging locks
-  * Change level on StartREDOLog to make it default come up in cluster log
-  * Make ndbd_redo_log_reader be build default
-  * Integrate mem-manager with TUP removing need for DataMemory2
-  * New interface(ndbapi) for iterating over ndb-object connected to a Ndb_cluster_connection
-  * New interface(ndbapi) for starting transaction with hints
-  * Different handling of AbortOption in ndbapi
-
----------------------------------------------------
-
-mysql-5.1.15-ndb-6.1.2 (7 Feb 2007)
-
-  Bug #26267 - using node ids > 48 causes invalid memory access
-
----------------------------------------------------
-
-mysql-5.1.15-ndb-6.1.1 (31 Jan 2007)
-
-  Bug #25997 - Valgrind reports leak in event code in mysqld during hadeling of NF and CF
-  Bug #25794 - Delete+Read (in same op) crashes datanodes if >5 cols or disk/varchar
-  Bug #25286 - NDB data node crashed in DBLQH, Line 2483
-
-  (* added extra version info to API to enable support for non-supported online software upgrade)
-  * 255 node support
-
-  Notes:
-
-  - This release is not online upgradable with ndb-6.1.0.
-  - All executables needs to be replaced
-  - All ndbapi/mgmapi application should be recompiled
-
----------------------------------------------------
-
-mysql-5.1.14-ndb-6.1.0 (20 Dec 2006)
-
-  Bug #25059 - Unique index lookup execute(Commit) on non existing tuple can lead to 4012
-  Bug #25090 - Incorrect handling of execute(Commit, AO_IgnoreError), can lead to DBCT crash
-  Bug #19956 - Problems with VARCHAR primary key and BLOB fields
-  Bug #24949 - Pulling cable from _last_ db-node can cause 6min timeout in api
-  Bug #25001 - MEDIUMTEXT column not stored on disk
-  Bug #24664 - Rare problem in LCP, can lead to unability to restart
-  Bug #24917 - Makeing node restart with "newly" dropped disk table can cause failure during restart
-  Bug #24914 - Start Transaction with hint guessed incorrectly
-  Bug #21948 - Repeated create/drop/truncate(DD) together with restarts can cause SR failure
-  Bug #17605 - DD Extents are not available until after LCP
-  Bug #22773 - NDB_LE_Connected sent when it should be NDB_LE_Disconnected
-  Bug #24166 - SR-crash if DD and hidden key
-
-  * add support for periodic mem-reporting, config.ini parameter "MemReportFrequency" which is specified in seconds
-
-

=== modified file 'mysql-test/collections/default.experimental'
--- a/mysql-test/collections/default.experimental	2010-03-15 08:46:55 +0000
+++ b/mysql-test/collections/default.experimental	2010-09-14 06:51:53 +0000
@@ -14,3 +14,6 @@ rpl.rpl_innodb_bug28430*  @solaris      
 rpl_ndb.rpl_ndb_2ndb   # rpl_ndb_2ndb fails sporadically
 
 binlog.binlog_unsafe @solaris            # Bug #47128 Test "binlog_unsafe" exceds default stack allocation
+
+rpl_ndb.rpl_ndb_typeconv_all @solaris    # bug#52131
+rpl_ndb.rpl_ndb_typeconv_lossy @solaris  # bug#52131

=== modified file 'mysql-test/suite/ndb/r/ndb_database.result'
--- a/mysql-test/suite/ndb/r/ndb_database.result	2010-07-26 14:41:18 +0000
+++ b/mysql-test/suite/ndb/r/ndb_database.result	2010-09-22 12:06:29 +0000
@@ -59,4 +59,5 @@ create table newdb.t1(a int primary key)
 show tables;
 Tables_in_newdb
 t1
+drop table t1;
 drop database newdb;

=== modified file 'mysql-test/suite/ndb/t/ndb_database.test'
--- a/mysql-test/suite/ndb/t/ndb_database.test	2010-07-26 14:41:18 +0000
+++ b/mysql-test/suite/ndb/t/ndb_database.test	2010-09-22 12:06:29 +0000
@@ -103,4 +103,5 @@ show tables;
 # Check that we can reuse the table name etc.
 create table newdb.t1(a int primary key) engine=ndb;
 show tables;
-drop database newdb;
\ No newline at end of file
+drop table t1;
+drop database newdb;

=== modified file 'sql/ha_ndbcluster.cc'
--- a/sql/ha_ndbcluster.cc	2010-09-02 09:50:57 +0000
+++ b/sql/ha_ndbcluster.cc	2010-09-22 12:06:29 +0000
@@ -541,8 +541,7 @@ check_completed_operations(Thd_ndb *thd_
 }
 
 void
-ha_ndbcluster::release_completed_operations(Thd_ndb *thd_ndb,
-                                            NdbTransaction *trans)
+ha_ndbcluster::release_completed_operations(NdbTransaction *trans)
 {
   /**
    * mysqld reads/write blobs fully,
@@ -564,7 +563,7 @@ int execute_no_commit(Thd_ndb *thd_ndb, 
                       uint *ignore_count)
 {
   DBUG_ENTER("execute_no_commit");
-  ha_ndbcluster::release_completed_operations(thd_ndb, trans);
+  ha_ndbcluster::release_completed_operations(trans);
   const NdbOperation *first= trans->getFirstDefinedOperation();
   thd_ndb->m_execute_count++;
   DBUG_PRINT("info", ("execute_count: %u", thd_ndb->m_execute_count));
@@ -626,7 +625,7 @@ inline
 int execute_no_commit_ie(Thd_ndb *thd_ndb, NdbTransaction *trans)
 {
   DBUG_ENTER("execute_no_commit_ie");
-  ha_ndbcluster::release_completed_operations(thd_ndb, trans);
+  ha_ndbcluster::release_completed_operations(trans);
   int res= trans->execute(NdbTransaction::NoCommit,
                           NdbOperation::AO_IgnoreError,
                           thd_ndb->m_force_send);
@@ -2681,7 +2680,7 @@ inline int ha_ndbcluster::fetch_next(Ndb
        nextResult() on Blobs generates Blob part read ops,
        so we will free them here
     */
-    release_completed_operations(m_thd_ndb, trans);
+    release_completed_operations(trans);
     
     if ((local_check= cursor->nextResult(&_m_next_row,
                                          contact_ndb,
@@ -8836,7 +8835,9 @@ int ndbcluster_drop_database_impl(THD *t
     }
     pthread_mutex_unlock(&LOCK_open);
   }
-  DBUG_RETURN(ret);      
+
+  dict->invalidateDbGlobal(dbname);
+  DBUG_RETURN(ret);
 }
 
 static void ndbcluster_drop_database(handlerton *hton, char *path)
@@ -8855,9 +8856,20 @@ static void ndbcluster_drop_database(han
   ndbcluster_drop_database_impl(thd, path);
   char db[FN_REFLEN];
   ha_ndbcluster::set_dbname(path, db);
+  uint32 table_id= 0, table_version= 0;
+  /*
+    Since databases aren't real ndb schema objects,
+    they don't have any id/version.
+
+    But since that id/version is used to make sure that events on SCHEMA_TABLE
+    are correct, we set random numbers.
+  */
+  table_id = (uint32)rand();
+  table_version = (uint32)rand();
   ndbcluster_log_schema_op(thd,
                            thd->query(), thd->query_length(),
-                           db, "", 0, 0, SOT_DROP_DB, 0, 0, 0);
+                           db, "", table_id, table_version,
+                           SOT_DROP_DB, 0, 0, 0);
   DBUG_VOID_RETURN;
 }
 
@@ -12717,42 +12729,84 @@ int ha_ndbcluster::alter_table_phase2(TH
     /* ndb_share reference schema free */
     DBUG_PRINT("NDB_SHARE", ("%s binlog schema free  use_count: %u",
                              m_share->key, m_share->use_count));
+    delete alter_data;
+    alter_info->data= 0;
   }
   set_ndb_share_state(m_share, NSS_INITIAL);
   free_share(&m_share); // Decrease ref_count
-  delete alter_data;
   DBUG_RETURN(error);
 }
 
-int ha_ndbcluster::alter_table_phase3(THD *thd, TABLE *table)
+int ha_ndbcluster::alter_table_phase3(THD *thd, TABLE *table,
+                                      HA_CREATE_INFO *create_info,
+                                      HA_ALTER_INFO *alter_info,
+                                      HA_ALTER_FLAGS *alter_flags)
 {
   DBUG_ENTER("alter_table_phase3");
 
+  NDB_ALTER_DATA *alter_data= (NDB_ALTER_DATA *) alter_info->data;
   if (!ndbcluster_has_global_schema_lock(get_thd_ndb(thd)))
+  {
+    delete alter_data;
+    alter_info->data= 0;
     DBUG_RETURN(ndbcluster_no_global_schema_lock_abort
                 (thd, "ha_ndbcluster::alter_table_phase3"));
+  }
 
   const char *db= table->s->db.str;
   const char *name= table->s->table_name.str;
+
   /*
     all mysqld's will read frms from disk and setup new
     event operation for the table (new_op)
   */
+  uint32 table_id= 0, table_version= 0;
+  DBUG_ASSERT(alter_data != 0);
+  if (alter_data)
+  {
+    table_id= alter_data->table_id;
+    table_version= alter_data->old_table_version;
+  }
   ndbcluster_log_schema_op(thd, thd->query(), thd->query_length(),
                            db, name,
-                           0, 0,
+                           table_id, table_version,
                            SOT_ONLINE_ALTER_TABLE_PREPARE,
                            0, 0, 0);
+
+  /*
+    Get table id/version for new table
+  */
+  table_id= 0;
+  table_version= 0;
+  {
+    Ndb* ndb= get_ndb(thd);
+    DBUG_ASSERT(ndb != 0);
+    if (ndb)
+    {
+      ndb->setDatabaseName(db);
+      Ndb_table_guard ndbtab(ndb->getDictionary(), name);
+      const NDBTAB *new_tab= ndbtab.get_table();
+      DBUG_ASSERT(new_tab != 0);
+      if (new_tab)
+      {
+        table_id= new_tab->getObjectId();
+        table_version= new_tab->getObjectVersion();
+      }
+    }
+  }
+
   /*
     all mysqld's will switch to using the new_op, and delete the old
     event operation
   */
   ndbcluster_log_schema_op(thd, thd->query(), thd->query_length(),
                            db, name,
-                           0, 0,
+                           table_id, table_version,
                            SOT_ONLINE_ALTER_TABLE_COMMIT,
                            0, 0, 0);
 
+  delete alter_data;
+  alter_info->data= 0;
   DBUG_RETURN(0);
 }
 
@@ -12816,6 +12870,7 @@ int ndbcluster_alter_tablespace(handlert
   }
   dict= ndb->getDictionary();
 
+  uint32 table_id= 0, table_version= 0;
   switch (alter_info->ts_cmd_type){
   case (CREATE_TABLESPACE):
   {
@@ -12838,6 +12893,8 @@ int ndbcluster_alter_tablespace(handlert
       DBUG_PRINT("error", ("createTablespace returned %d", error));
       goto ndberror;
     }
+    table_id = objid.getObjectId();
+    table_version = objid.getObjectVersion();
     if (dict->getWarningFlags() &
         NdbDictionary::Dictionary::WarnExtentRoundUp)
     {
@@ -12890,10 +12947,13 @@ int ndbcluster_alter_tablespace(handlert
 	DBUG_RETURN(1);
       }
       errmsg= " CREATE DATAFILE";
-      if (dict->createDatafile(ndb_df))
+      NdbDictionary::ObjectId objid;
+      if (dict->createDatafile(ndb_df, false, &objid))
       {
 	goto ndberror;
       }
+      table_id= objid.getObjectId();
+      table_version= objid.getObjectVersion();
       if (dict->getWarningFlags() &
           NdbDictionary::Dictionary::WarnDatafileRoundUp)
       {
@@ -12916,6 +12976,8 @@ int ndbcluster_alter_tablespace(handlert
       NdbDictionary::Datafile df= dict->getDatafile(0, alter_info->data_file_name);
       NdbDictionary::ObjectId objid;
       df.getTablespaceId(&objid);
+      table_id = df.getObjectId();
+      table_version = df.getObjectVersion();
       if (ts.getObjectId() == objid.getObjectId() && 
 	  strcmp(df.getPath(), alter_info->data_file_name) == 0)
       {
@@ -12963,6 +13025,8 @@ int ndbcluster_alter_tablespace(handlert
     {
       goto ndberror;
     }
+    table_id = objid.getObjectId();
+    table_version = objid.getObjectVersion();
     if (dict->getWarningFlags() &
         NdbDictionary::Dictionary::WarnUndobufferRoundUp)
     {
@@ -13013,10 +13077,13 @@ int ndbcluster_alter_tablespace(handlert
       DBUG_RETURN(1);
     }
     errmsg= "CREATE UNDOFILE";
-    if (dict->createUndofile(ndb_uf))
+    NdbDictionary::ObjectId objid;
+    if (dict->createUndofile(ndb_uf, false, &objid))
     {
       goto ndberror;
     }
+    table_id = objid.getObjectId();
+    table_version = objid.getObjectVersion();
     if (dict->getWarningFlags() &
         NdbDictionary::Dictionary::WarnUndofileRoundDown)
     {
@@ -13030,7 +13097,11 @@ int ndbcluster_alter_tablespace(handlert
   {
     error= ER_DROP_FILEGROUP_FAILED;
     errmsg= "TABLESPACE";
-    if (dict->dropTablespace(dict->getTablespace(alter_info->tablespace_name)))
+    NdbDictionary::Tablespace ts=
+      dict->getTablespace(alter_info->tablespace_name);
+    table_id= ts.getObjectId();
+    table_version= ts.getObjectVersion();
+    if (dict->dropTablespace(ts))
     {
       goto ndberror;
     }
@@ -13041,7 +13112,11 @@ int ndbcluster_alter_tablespace(handlert
   {
     error= ER_DROP_FILEGROUP_FAILED;
     errmsg= "LOGFILE GROUP";
-    if (dict->dropLogfileGroup(dict->getLogfileGroup(alter_info->logfile_group_name)))
+    NdbDictionary::LogfileGroup lg=
+      dict->getLogfileGroup(alter_info->logfile_group_name);
+    table_id= lg.getObjectId();
+    table_version= lg.getObjectVersion();
+    if (dict->dropLogfileGroup(lg))
     {
       goto ndberror;
     }
@@ -13064,13 +13139,13 @@ int ndbcluster_alter_tablespace(handlert
     ndbcluster_log_schema_op(thd,
                              thd->query(), thd->query_length(),
                              "", alter_info->tablespace_name,
-                             0, 0,
+                             table_id, table_version,
                              SOT_TABLESPACE, 0, 0, 0);
   else
     ndbcluster_log_schema_op(thd,
                              thd->query(), thd->query_length(),
                              "", alter_info->logfile_group_name,
-                             0, 0,
+                             table_id, table_version,
                              SOT_LOGFILE_GROUP, 0, 0, 0);
   DBUG_RETURN(FALSE);
 

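Note on the ha_ndbcluster.cc hunks above: every DDL path now captures the
id/version of the NDB schema object it touched (via NdbDictionary::ObjectId,
or rand() where no real object exists, as for DROP DATABASE) and passes the
pair to ndbcluster_log_schema_op() instead of the old (0, 0). A hedged sketch
of the recurring pattern, modelled on the CREATE DATAFILE case above (the
helper name and the simplified error handling are illustrative, not part of
the patch):

  // Illustrative helper only: mirrors the capture-and-distribute pattern
  // added throughout ndbcluster_alter_tablespace().
  static int create_and_distribute_datafile(THD *thd,
                                            NdbDictionary::Dictionary *dict,
                                            NdbDictionary::Datafile &ndb_df,
                                            const char *tablespace_name)
  {
    NdbDictionary::ObjectId objid;
    if (dict->createDatafile(ndb_df, false, &objid))
      return 1;                                     // real code: goto ndberror
    uint32 table_id= objid.getObjectId();           // new object's id ...
    uint32 table_version= objid.getObjectVersion(); // ... and its version
    // Distribute the DDL tagged with id/version so acknowledgements can
    // be matched against this particular operation.
    ndbcluster_log_schema_op(thd, thd->query(), thd->query_length(),
                             "", tablespace_name,
                             table_id, table_version,
                             SOT_TABLESPACE, 0, 0, 0);
    return 0;
  }
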
=== modified file 'sql/ha_ndbcluster.h'
--- a/sql/ha_ndbcluster.h	2010-03-01 19:26:45 +0000
+++ b/sql/ha_ndbcluster.h	2010-09-15 18:26:11 +0000
@@ -108,13 +108,17 @@ public:
 		 const NdbDictionary::Table *table) :
     dictionary(dict),
     old_table(table),
-    new_table(new NdbDictionary::Table(*table))
+    new_table(new NdbDictionary::Table(*table)),
+      table_id(table->getObjectId()),
+      old_table_version(table->getObjectVersion())
   {}
   ~NDB_ALTER_DATA()
   { delete new_table; }
   NdbDictionary::Dictionary *dictionary;
   const  NdbDictionary::Table *old_table;
   NdbDictionary::Table *new_table;
+  Uint32 table_id;
+  Uint32 old_table_version;
 };
 
 typedef union { const NdbRecAttr *rec; NdbBlob *blob; void *ptr; } NdbValue;
@@ -502,7 +506,7 @@ static void set_tabname(const char *path
     static member function as it needs to access private
     NdbTransaction methods
   */
-  static void release_completed_operations(Thd_ndb*, NdbTransaction*);
+  static void release_completed_operations(NdbTransaction*);
 
   /*
     Condition pushdown
@@ -572,7 +576,10 @@ static void set_tabname(const char *path
                          HA_ALTER_INFO *alter_info,
                          HA_ALTER_FLAGS *alter_flags);
 
-  int alter_table_phase3(THD *thd, TABLE *table);
+  int alter_table_phase3(THD *thd, TABLE *table,
+                         HA_CREATE_INFO *create_info,
+                         HA_ALTER_INFO *alter_info,
+                         HA_ALTER_FLAGS *alter_flags);
 
 private:
 #ifdef HAVE_NDB_BINLOG

=== modified file 'sql/ha_ndbcluster_binlog.cc'
--- a/sql/ha_ndbcluster_binlog.cc	2010-09-13 08:06:11 +0000
+++ b/sql/ha_ndbcluster_binlog.cc	2010-09-29 10:55:48 +0000
@@ -142,6 +142,8 @@ typedef struct st_ndb_schema_object {
   uint use_count;
   MY_BITMAP slock_bitmap;
   uint32 slock[256/32]; // 256 bits for lock status of table
+  uint32 table_id;
+  uint32 table_version;
 } NDB_SCHEMA_OBJECT;
 static NDB_SCHEMA_OBJECT *ndb_get_schema_object(const char *key,
                                                 my_bool create_if_not_exists,
@@ -672,6 +674,16 @@ ndbcluster_binlog_log_query(handlerton *
                        db, table_name, query));
   enum SCHEMA_OP_TYPE type;
   int log= 0;
+  uint32 table_id= 0, table_version= 0;
+  /*
+    Since databases aren't real ndb schema objects,
+    they don't have any id/version.
+
+    But since that id/version is used to make sure that events on SCHEMA_TABLE
+    are correct, we set random numbers.
+  */
+  table_id = (uint32)rand();
+  table_version = (uint32)rand();
   switch (binlog_command)
   {
   case LOGCOM_CREATE_TABLE:
@@ -706,7 +718,7 @@ ndbcluster_binlog_log_query(handlerton *
   if (log)
   {
     ndbcluster_log_schema_op(thd, query, query_length,
-                             db, table_name, 0, 0, type,
+                             db, table_name, table_id, table_version, type,
                              0, 0, 0);
   }
   DBUG_VOID_RETURN;
@@ -941,6 +953,10 @@ static int ndbcluster_global_schema_lock
 
   if (thd_ndb->global_schema_lock_trans)
   {
+    if (ndb_extra_logging > 19)
+    {
+      sql_print_information("NDB: Global schema lock acquired");
+    }
     DBUG_RETURN(0);
   }
 
@@ -1005,6 +1021,10 @@ static int ndbcluster_global_schema_unlo
                           "ndb. Releasing global schema lock");
       DBUG_RETURN(-1);
     }
+    if (ndb_extra_logging > 19)
+    {
+      sql_print_information("NDB: Global schema lock release");
+    }
   }
   DBUG_RETURN(0);
 }
@@ -1670,7 +1690,9 @@ char *ndb_pack_varchar(const NDBCOL *col
 static int
 ndbcluster_update_slock(THD *thd,
                         const char *db,
-                        const char *table_name)
+                        const char *table_name,
+                        uint32 table_id,
+                        uint32 table_version)
 {
   DBUG_ENTER("ndbcluster_update_slock");
   if (!ndb_schema_share)
@@ -1748,7 +1770,24 @@ ndbcluster_update_slock(THD *thd,
     }
     if (trans->execute(NdbTransaction::NoCommit))
       goto err;
-    bitmap_clear_bit(&slock, node_id);
+
+    if (ndb_extra_logging > 19)
+    {
+      uint32 copy[SCHEMA_SLOCK_SIZE/4];
+      memcpy(copy, bitbuf, sizeof(copy));
+      bitmap_clear_bit(&slock, node_id);
+      sql_print_information("NDB: reply to %s.%s(%u/%u) from %x%x to %x%x",
+                            db, table_name,
+                            table_id, table_version,
+                            copy[0], copy[1],
+                            slock.bitmap[0],
+                            slock.bitmap[1]);
+    }
+    else
+    {
+      bitmap_clear_bit(&slock, node_id);
+    }
+
     {
       NdbOperation *op= 0;
       int r= 0;
@@ -1778,10 +1817,12 @@ ndbcluster_update_slock(THD *thd,
       r|= op->setValue(SCHEMA_TYPE_I, (uint32)SOT_CLEAR_SLOCK);
       DBUG_ASSERT(r == 0);
     }
-    if (trans->execute(NdbTransaction::Commit) == 0)
+    if (trans->execute(NdbTransaction::Commit, 
+                       NdbOperation::DefaultAbortOption, 1 /*force send*/) == 0)
     {
       DBUG_PRINT("info", ("node %d cleared lock on '%s.%s'",
                           node_id, db, table_name));
+      dict->forceGCPWait(1);
       break;
     }
   err:
@@ -1821,7 +1862,8 @@ ndbcluster_update_slock(THD *thd,
 static void ndb_report_waiting(const char *key,
                                int the_time,
                                const char *op,
-                               const char *obj)
+                               const char *obj,
+                               const MY_BITMAP * map)
 {
   ulonglong ndb_latest_epoch= 0;
   const char *proc_info= "<no info>";
@@ -1831,19 +1873,79 @@ static void ndb_report_waiting(const cha
   if (injector_thd)
     proc_info= injector_thd->proc_info;
   pthread_mutex_unlock(&injector_mutex);
-  sql_print_information("NDB %s:"
-                        " waiting max %u sec for %s %s."
-                        "  epochs: (%u/%u,%u/%u,%u/%u)"
-                        "  injector proc_info: %s"
-                        ,key, the_time, op, obj
-                        ,(uint)(ndb_latest_handled_binlog_epoch >> 32)
-                        ,(uint)(ndb_latest_handled_binlog_epoch)
-                        ,(uint)(ndb_latest_received_binlog_epoch >> 32)
-                        ,(uint)(ndb_latest_received_binlog_epoch)
-                        ,(uint)(ndb_latest_epoch >> 32)
-                        ,(uint)(ndb_latest_epoch)
-                        ,proc_info
-                        );
+  if (map == 0)
+  {
+    sql_print_information("NDB %s:"
+                          " waiting max %u sec for %s %s."
+                          "  epochs: (%u/%u,%u/%u,%u/%u)"
+                          "  injector proc_info: %s"
+                          ,key, the_time, op, obj
+                          ,(uint)(ndb_latest_handled_binlog_epoch >> 32)
+                          ,(uint)(ndb_latest_handled_binlog_epoch)
+                          ,(uint)(ndb_latest_received_binlog_epoch >> 32)
+                          ,(uint)(ndb_latest_received_binlog_epoch)
+                          ,(uint)(ndb_latest_epoch >> 32)
+                          ,(uint)(ndb_latest_epoch)
+                          ,proc_info
+                          );
+  }
+  else
+  {
+    sql_print_information("NDB %s:"
+                          " waiting max %u sec for %s %s."
+                          "  epochs: (%u/%u,%u/%u,%u/%u)"
+                          "  injector proc_info: %s map: %x%x"
+                          ,key, the_time, op, obj
+                          ,(uint)(ndb_latest_handled_binlog_epoch >> 32)
+                          ,(uint)(ndb_latest_handled_binlog_epoch)
+                          ,(uint)(ndb_latest_received_binlog_epoch >> 32)
+                          ,(uint)(ndb_latest_received_binlog_epoch)
+                          ,(uint)(ndb_latest_epoch >> 32)
+                          ,(uint)(ndb_latest_epoch)
+                          ,proc_info
+                          ,map->bitmap[0]
+                          ,map->bitmap[1]
+                          );
+  }
+}
+
+static
+const char*
+get_schema_type_name(uint type)
+{
+  switch(type){
+  case SOT_DROP_TABLE:
+    return "DROP_TABLE";
+  case SOT_CREATE_TABLE:
+    return "CREATE_TABLE";
+  case SOT_RENAME_TABLE_NEW:
+    return "RENAME_TABLE_NEW";
+  case SOT_ALTER_TABLE_COMMIT:
+    return "ALTER_TABLE_COMMIT";
+  case SOT_DROP_DB:
+    return "DROP_DB";
+  case SOT_CREATE_DB:
+    return "CREATE_DB";
+  case SOT_ALTER_DB:
+    return "ALTER_DB";
+  case SOT_CLEAR_SLOCK:
+    return "CLEAR_SLOCK";
+  case SOT_TABLESPACE:
+    return "TABLESPACE";
+  case SOT_LOGFILE_GROUP:
+    return "LOGFILE_GROUP";
+  case SOT_RENAME_TABLE:
+    return "RENAME_TABLE";
+  case SOT_TRUNCATE_TABLE:
+    return "TRUNCATE_TABLE";
+  case SOT_RENAME_TABLE_PREPARE:
+    return "RENAME_TABLE_PREPARE";
+  case SOT_ONLINE_ALTER_TABLE_PREPARE:
+    return "ONLINE_ALTER_TABLE_PREPARE";
+  case SOT_ONLINE_ALTER_TABLE_COMMIT:
+    return "ONLINE_ALTER_TABLE_COMMIT";
+  }
+  return "<unknown>";
 }
 
 int ndbcluster_log_schema_op(THD *thd,
@@ -1878,6 +1980,7 @@ int ndbcluster_log_schema_op(THD *thd,
   char tmp_buf2[FN_REFLEN];
   const char *type_str;
   int also_internal= 0;
+  uint32 log_type= (uint32)type;
   switch (type)
   {
   case SOT_DROP_TABLE:
@@ -1947,20 +2050,16 @@ int ndbcluster_log_schema_op(THD *thd,
     char key[FN_REFLEN + 1];
     build_table_filename(key, sizeof(key) - 1, db, table_name, "", 0);
     ndb_schema_object= ndb_get_schema_object(key, TRUE, FALSE);
+    ndb_schema_object->table_id= ndb_table_id;
+    ndb_schema_object->table_version= ndb_table_version;
   }
 
   const NdbError *ndb_error= 0;
   uint32 node_id= g_ndb_cluster_connection->node_id();
   Uint64 epoch= 0;
-  MY_BITMAP schema_subscribers;
-  uint32 bitbuf[sizeof(ndb_schema_object->slock)/4];
-  char bitbuf_e[sizeof(bitbuf)];
-  bzero(bitbuf_e, sizeof(bitbuf_e));
   {
-    int i, updated= 0;
+    int i;
     int no_storage_nodes= g_ndb_cluster_connection->no_db_nodes();
-    bitmap_init(&schema_subscribers, bitbuf, sizeof(bitbuf)*8, FALSE);
-    bitmap_set_all(&schema_subscribers);
 
     /* begin protect ndb_schema_share */
     pthread_mutex_lock(&ndb_schema_share_mutex);
@@ -1974,48 +2073,20 @@ int ndbcluster_log_schema_op(THD *thd,
     pthread_mutex_lock(&ndb_schema_share->mutex);
     for (i= 0; i < no_storage_nodes; i++)
     {
-      MY_BITMAP *table_subscribers= &ndb_schema_share->subscriber_bitmap[i];
-      if (!bitmap_is_clear_all(table_subscribers))
-      {
-        bitmap_intersect(&schema_subscribers,
-                         table_subscribers);
-        updated= 1;
-      }
+      bitmap_union(&ndb_schema_object->slock_bitmap,
+                   &ndb_schema_share->subscriber_bitmap[i]);
     }
     pthread_mutex_unlock(&ndb_schema_share->mutex);
     pthread_mutex_unlock(&ndb_schema_share_mutex);
     /* end protect ndb_schema_share */
 
-    if (updated)
-    {
-      bitmap_clear_bit(&schema_subscribers, node_id);
-      /*
-        if setting own acknowledge bit it is important that
-        no other mysqld's are registred, as subsequent code
-        will cause the original event to be hidden (by blob
-        merge event code)
-      */
-      if (bitmap_is_clear_all(&schema_subscribers))
-          bitmap_set_bit(&schema_subscribers, node_id);
-    }
-    else
-      bitmap_clear_all(&schema_subscribers);
-
     if (also_internal)
-      bitmap_set_bit(&schema_subscribers, node_id);        
-
-    if (ndb_schema_object)
-    {
-      pthread_mutex_lock(&ndb_schema_object->mutex);
-      memcpy(ndb_schema_object->slock, schema_subscribers.bitmap,
-             sizeof(ndb_schema_object->slock));
-      pthread_mutex_unlock(&ndb_schema_object->mutex);
-    }
+      bitmap_set_bit(&ndb_schema_object->slock_bitmap, node_id);
+    else
+      bitmap_clear_bit(&ndb_schema_object->slock_bitmap, node_id);
 
-    DBUG_DUMP("schema_subscribers", (uchar*)schema_subscribers.bitmap,
-              no_bytes_in_map(&schema_subscribers));
-    DBUG_PRINT("info", ("bitmap_is_clear_all(&schema_subscribers): %d",
-                        bitmap_is_clear_all(&schema_subscribers)));
+    DBUG_DUMP("schema_subscribers", (uchar*)&ndb_schema_object->slock,
+              no_bytes_in_map(&ndb_schema_object->slock_bitmap));
   }
 
   Ndb *ndb= thd_ndb->ndb;
@@ -2060,8 +2131,7 @@ int ndbcluster_log_schema_op(THD *thd,
   {
     const char *log_db= db;
     const char *log_tab= table_name;
-    const char *log_subscribers= (char*)schema_subscribers.bitmap;
-    uint32 log_type= (uint32)type;
+    const char *log_subscribers= (char*)ndb_schema_object->slock;
     if ((trans= ndb->startTransaction()) == 0)
       goto err;
     while (1)
@@ -2083,7 +2153,8 @@ int ndbcluster_log_schema_op(THD *thd,
       r|= op->equal(SCHEMA_NAME_I, tmp_buf);
       DBUG_ASSERT(r == 0);
       /* slock */
-      DBUG_ASSERT(sz[SCHEMA_SLOCK_I] == sizeof(bitbuf));
+      DBUG_ASSERT(sz[SCHEMA_SLOCK_I] ==
+                  no_bytes_in_map(&ndb_schema_object->slock_bitmap));
       r|= op->setValue(SCHEMA_SLOCK_I, log_subscribers);
       DBUG_ASSERT(r == 0);
       /* query */
@@ -2140,21 +2211,13 @@ int ndbcluster_log_schema_op(THD *thd,
 
       r|= op->setAnyValue(anyValue);
       DBUG_ASSERT(r == 0);
-#if 0
-      if (log_db != new_db && new_db && new_table_name)
-      {
-        log_db= new_db;
-        log_tab= new_table_name;
-        log_subscribers= bitbuf_e; // no ack expected on this
-        log_type= (uint32)SOT_RENAME_TABLE_NEW;
-        continue;
-      }
-#endif
       break;
     }
-    if (trans->execute(NdbTransaction::Commit) == 0)
+    if (trans->execute(NdbTransaction::Commit, NdbOperation::DefaultAbortOption,
+                       1 /* force send */) == 0)
     {
       DBUG_PRINT("info", ("logged: %s", query));
+      dict->forceGCPWait(1);
       break;
     }
 err:
@@ -2185,21 +2248,25 @@ end:
     ndb->closeTransaction(trans);
   ndb->setDatabaseName(save_db);
 
+  if (ndb_extra_logging > 19)
+  {
+    sql_print_information("NDB: distributed %s.%s(%u/%u) type: %s(%u) query: \'%s\' to %x%x",
+                          db,
+                          table_name,
+                          ndb_table_id,
+                          ndb_table_version,
+                          get_schema_type_name(log_type),
+                          log_type,
+                          query,
+                          ndb_schema_object->slock_bitmap.bitmap[0],
+                          ndb_schema_object->slock_bitmap.bitmap[1]);
+  }
+
   /*
     Wait for other mysqld's to acknowledge the table operation
   */
-  if (ndb_error == 0 &&
-      !bitmap_is_clear_all(&schema_subscribers))
+  if (ndb_error == 0 && !bitmap_is_clear_all(&ndb_schema_object->slock_bitmap))
   {
-    if (!also_internal)
-    {
-      /*
-        if own nodeid is set we are a single mysqld registred
-        as an optimization we update the slock directly
-      */
-      if (bitmap_is_set(&schema_subscribers, node_id))
-        ndbcluster_update_slock(thd, db, table_name);
-    }
     int max_timeout= opt_ndb_sync_timeout;
     pthread_mutex_lock(&ndb_schema_object->mutex);
     if (have_lock_open)
@@ -2226,24 +2293,24 @@ end:
         pthread_mutex_unlock(&ndb_schema_share_mutex);
         break;
       }
+      MY_BITMAP servers;
+      bitmap_init(&servers, 0, 256, FALSE);
+      bitmap_clear_all(&servers);
+      bitmap_set_bit(&servers, node_id); // "we" are always alive
       pthread_mutex_lock(&ndb_schema_share->mutex);
       for (i= 0; i < no_storage_nodes; i++)
       {
         /* remove any unsubscribed from schema_subscribers */
         MY_BITMAP *tmp= &ndb_schema_share->subscriber_bitmap[i];
-        if (!bitmap_is_clear_all(tmp))
-          bitmap_intersect(&schema_subscribers, tmp);
+        bitmap_union(&servers, tmp);
       }
       pthread_mutex_unlock(&ndb_schema_share->mutex);
       pthread_mutex_unlock(&ndb_schema_share_mutex);
       /* end protect ndb_schema_share */
 
       /* remove any unsubscribed from ndb_schema_object->slock */
-      bitmap_intersect(&ndb_schema_object->slock_bitmap, &schema_subscribers);
-
-      DBUG_DUMP("ndb_schema_object->slock_bitmap.bitmap",
-                (uchar*)ndb_schema_object->slock_bitmap.bitmap,
-                no_bytes_in_map(&ndb_schema_object->slock_bitmap));
+      bitmap_intersect(&ndb_schema_object->slock_bitmap, &servers);
+      bitmap_free(&servers);
 
       if (bitmap_is_clear_all(&ndb_schema_object->slock_bitmap))
         break;
@@ -2255,11 +2322,13 @@ end:
         {
           sql_print_error("NDB %s: distributing %s timed out. Ignoring...",
                           type_str, ndb_schema_object->key);
+          DBUG_ASSERT(false);
           break;
         }
         if (ndb_extra_logging)
           ndb_report_waiting(type_str, max_timeout,
-                             "distributing", ndb_schema_object->key);
+                             "distributing", ndb_schema_object->key,
+                             &ndb_schema_object->slock_bitmap);
       }
     }
     if (have_lock_open)
@@ -2268,10 +2337,34 @@ end:
     }
     pthread_mutex_unlock(&ndb_schema_object->mutex);
   }
+  else if (ndb_error)
+  {
+    sql_print_error("NDB %s: distributing %s err: %u",
+                    type_str, ndb_schema_object->key,
+                    ndb_error->code);
+  }
+  else if (ndb_extra_logging > 19)
+  {
+    sql_print_information("NDB %s: not waiting for distributing %s",
+                          type_str, ndb_schema_object->key);
+  }
 
   if (ndb_schema_object)
     ndb_free_schema_object(&ndb_schema_object, FALSE);
 
+  if (ndb_extra_logging > 19)
+  {
+    sql_print_information("NDB: distribution of %s.%s(%u/%u) type: %s(%u) query: \'%s\'"
+                          " - complete!",
+                          db,
+                          table_name,
+                          ndb_table_id,
+                          ndb_table_version,
+                          get_schema_type_name(log_type),
+                          log_type,
+                          query);
+  }
+
   DBUG_RETURN(0);
 }
 
@@ -2468,6 +2561,19 @@ ndb_binlog_thread_handle_schema_event(TH
                   schema->db, schema->name,
                   schema->query_length, schema->query,
                   schema_type));
+
+      if (ndb_extra_logging > 19)
+      {
+        sql_print_information("NDB: got schema event on %s.%s(%u/%u) query: '%s' type: %s(%d) node: %u slock: %x%x",
+                              schema->db, schema->name,
+                              schema->id, schema->version,
+                              schema->query,
+                              get_schema_type_name(schema_type),
+                              schema_type,
+                              schema->node_id,
+                              slock.bitmap[0], slock.bitmap[1]);
+      }
+
       if ((schema->db[0] == 0) && (schema->name[0] == 0))
         DBUG_RETURN(0);
       switch (schema_type)
@@ -2677,7 +2783,8 @@ ndb_binlog_thread_handle_schema_event(TH
           if (post_epoch_unlock)
             post_epoch_unlock_list->push_back(schema, mem_root);
           else
-            ndbcluster_update_slock(thd, schema->db, schema->name);
+            ndbcluster_update_slock(thd, schema->db, schema->name,
+                                    schema->id, schema->version);
         }
       }
       DBUG_RETURN(0);
@@ -2827,9 +2934,21 @@ ndb_binlog_thread_handle_schema_event_po
         NDB_SCHEMA_OBJECT *ndb_schema_object=
           (NDB_SCHEMA_OBJECT*) my_hash_search(&ndb_schema_objects,
                                               (const uchar*) key, strlen(key));
-        if (ndb_schema_object)
+        if (ndb_schema_object &&
+            (ndb_schema_object->table_id == schema->id &&
+             ndb_schema_object->table_version == schema->version))
         {
           pthread_mutex_lock(&ndb_schema_object->mutex);
+          if (ndb_extra_logging > 19)
+          {
+            sql_print_information("NDB: CLEAR_SLOCK key: %s(%u/%u) from"
+                                  " %x%x to %x%x",
+                                  key, schema->id, schema->version,
+                                  ndb_schema_object->slock[0],
+                                  ndb_schema_object->slock[1],
+                                  schema->slock[0],
+                                  schema->slock[1]);
+          }
           memcpy(ndb_schema_object->slock, schema->slock,
                  sizeof(ndb_schema_object->slock));
           DBUG_DUMP("ndb_schema_object->slock_bitmap.bitmap",
@@ -2838,6 +2957,24 @@ ndb_binlog_thread_handle_schema_event_po
           pthread_mutex_unlock(&ndb_schema_object->mutex);
           pthread_cond_signal(&injector_cond);
         }
+        else if (ndb_extra_logging > 19)
+        {
+          if (ndb_schema_object == 0)
+          {
+            sql_print_information("NDB: Discarding event...no obj: %s (%u/%u)",
+                                  key, schema->id, schema->version);
+          }
+          else
+          {
+            sql_print_information("NDB: Discarding event...key: %s "
+                                  "non matching id/version [%u/%u] != [%u/%u]",
+                                  key,
+                                  ndb_schema_object->table_id,
+                                  ndb_schema_object->table_version,
+                                  schema->id,
+                                  schema->version);
+          }
+        }
         pthread_mutex_unlock(&ndbcluster_mutex);
         continue;
       }
@@ -3162,7 +3299,8 @@ ndb_binlog_thread_handle_schema_event_po
   }
   while ((schema= post_epoch_unlock_list->pop()))
   {
-    ndbcluster_update_slock(thd, schema->db, schema->name);
+    ndbcluster_update_slock(thd, schema->db, schema->name,
+                            schema->id, schema->version);
   }
   DBUG_VOID_RETURN;
 }
@@ -4770,11 +4908,12 @@ ndbcluster_handle_alter_table(THD *thd, 
       {
         sql_print_error("NDB %s: %s timed out. Ignoring...",
                         type_str, share->key);
+        DBUG_ASSERT(false);
         break;
       }
       if (ndb_extra_logging)
         ndb_report_waiting(type_str, max_timeout,
-                           type_str, share->key);
+                           type_str, share->key, 0);
     }
   }
   pthread_mutex_unlock(&share->mutex);
@@ -4843,11 +4982,12 @@ ndbcluster_handle_drop_table(THD *thd, N
       {
         sql_print_error("NDB %s: %s timed out. Ignoring...",
                         type_str, share->key);
+        DBUG_ASSERT(false);
         break;
       }
       if (ndb_extra_logging)
         ndb_report_waiting(type_str, max_timeout,
-                           type_str, share->key);
+                           type_str, share->key, 0);
     }
   }
   pthread_mutex_lock(&LOCK_open);

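The ha_ndbcluster_binlog.cc hunks above change how schema-change
acknowledgements are tracked: the wait-state now lives in
ndb_schema_object->slock_bitmap (seeded with the union, not the intersection,
of the data nodes' subscriber bitmaps), and the object is tagged with the
table id/version so a CLEAR_SLOCK left over from an earlier operation on the
same key is discarded instead of releasing the wrong waiter. A self-contained
toy model of that protocol, using std::bitset in place of MY_BITMAP
(illustrative only, not the actual implementation):

  #include <bitset>
  #include <cstdint>
  #include <cstdio>
  #include <initializer_list>

  // Toy model of the slock acknowledgement protocol: each subscribed
  // mysqld owns one bit; the coordinator waits until every expected bit
  // is cleared, ignoring acks whose (id, version) tag does not match.
  int main()
  {
    struct SchemaOp {
      std::bitset<256> slock;        // pending acks, one bit per node id
      std::uint32_t id;              // identity of the schema object ...
      std::uint32_t version;         // ... and its version
    } op;
    op.id= 37;
    op.version= 2;

    // 1) Seed the pending-ack set with the union of all data nodes'
    //    subscriber bitmaps (here, nodes 5 and 6 have subscribers).
    op.slock.set(5);
    op.slock.set(6);

    // 2) A CLEAR_SLOCK tagged with a stale version is discarded.
    std::uint32_t ack_id= 37, ack_version= 1;
    unsigned ack_node= 5;
    if (ack_id == op.id && ack_version == op.version)
      op.slock.reset(ack_node);
    std::printf("pending after stale ack: %zu\n", op.slock.count()); // 2

    // 3) Correctly tagged acks drain the bitmap; the coordinator's wait
    //    ends when no bits remain set.
    ack_version= 2;
    for (unsigned node : {5u, 6u})
      if (ack_id == op.id && ack_version == op.version)
        op.slock.reset(node);
    std::printf("distribution complete: %s\n",
                op.slock.none() ? "yes" : "no");                     // yes
    return 0;
  }
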
=== modified file 'sql/handler.cc'
--- a/sql/handler.cc	2010-08-20 14:07:55 +0000
+++ b/sql/handler.cc	2010-09-15 18:26:11 +0000
@@ -3565,7 +3565,10 @@ handler::alter_table_phase2(THD *thd,
 }
 
 int
-handler::alter_table_phase3(THD *thd, TABLE *table)
+handler::alter_table_phase3(THD *thd, TABLE *table,
+                            HA_CREATE_INFO *create_info,
+                            HA_ALTER_INFO *alter_info,
+                            HA_ALTER_FLAGS *alter_flags)
 {
   DBUG_ENTER("alter_table_phase3");
   DBUG_RETURN(0);

=== modified file 'sql/handler.h'
--- a/sql/handler.h	2010-08-20 13:04:07 +0000
+++ b/sql/handler.h	2010-09-15 18:26:11 +0000
@@ -1879,7 +1879,7 @@ public:
    @param     altered_table     A temporary table show what table is to
                                 change to
    @param     alter_info        Storage place for data used during phase1
-                                and phase2
+                                and phase2 and phase3
    @param     alter_flags       Bitmask that shows what will be changed
 
    @retval   0      OK
@@ -1897,8 +1897,10 @@ public:
     @param    thd               The thread handle
     @param    altered_table     A temporary table show what table is to
                                 change to
+    @param    create_info       Information from the parsing phase about new
+                                table properties.
     @param    alter_info        Storage place for data used during phase1
-                                and phase2
+                                and phase2 and phase3
     @param    alter_flags       Bitmask that shows what will be changed
 
     @retval  0      OK
@@ -1921,8 +1923,16 @@ public:
 
     @param    thd               The thread handle
     @param    table             The altered table, re-opened
+    @param    create_info       Information from the parsing phase about new
+                                table properties.
+    @param    alter_info        Storage place for data used during phase1
+                                and phase2 and phase3
+    @param    alter_flags       Bitmask that shows what has been changed
  */
- virtual int alter_table_phase3(THD *thd, TABLE *table);
+ virtual int alter_table_phase3(THD *thd, TABLE *table,
+                                HA_CREATE_INFO *create_info,
+                                HA_ALTER_INFO *alter_info,
+                                HA_ALTER_FLAGS *alter_flags);
 
   /**
     use_hidden_primary_key() is called in case of an update/delete when

=== modified file 'sql/sql_show.cc'
--- a/sql/sql_show.cc	2010-06-09 21:11:56 +0000
+++ b/sql/sql_show.cc	2010-09-17 14:01:17 +0000
@@ -3353,6 +3353,9 @@ int get_all_tables(THD *thd, TABLE_LIST 
   uint table_open_method;
   DBUG_ENTER("get_all_tables");
 
+  Ha_global_schema_lock_guard global_schema_lock_guard(thd);
+  global_schema_lock_guard.lock();
+
   lex->view_prepare_mode= TRUE;
   lex->reset_n_backup_query_tables_list(&query_tables_list_backup);
 

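The sql_show.cc change takes the NDB global schema lock for the whole
INFORMATION_SCHEMA scan, so table definitions cannot change under
get_all_tables(). The guard is RAII-style: lock() acquires, and the
destructor releases on every exit path. A hypothetical sketch of its shape
(the real class and the lock/unlock helpers live in the ha_ndbcluster code;
the signatures below are assumptions for illustration):

  // Hypothetical sketch only -- names and signatures of the lock/unlock
  // helpers are assumptions, not taken from this patch.
  class Ha_global_schema_lock_guard
  {
  public:
    Ha_global_schema_lock_guard(THD *thd) : m_thd(thd), m_locked(false) {}
    int lock()
    {
      m_locked= true;
      return ndbcluster_global_schema_lock(m_thd);  // assumed signature
    }
    ~Ha_global_schema_lock_guard()                  // releases on any return
    {
      if (m_locked)
        ndbcluster_global_schema_unlock(m_thd);     // assumed signature
    }
  private:
    THD *m_thd;
    bool m_locked;
  };
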
=== modified file 'sql/sql_table.cc'
--- a/sql/sql_table.cc	2010-08-20 13:04:07 +0000
+++ b/sql/sql_table.cc	2010-09-15 18:26:11 +0000
@@ -6442,7 +6442,10 @@ int mysql_fast_or_online_alter_table(THD
       Tell the handler that the changed frm is on disk and table
       has been re-opened
    */
-    if ((error= t_table->file->alter_table_phase3(thd, t_table)))
+    if ((error= t_table->file->alter_table_phase3(thd, t_table,
+                                                  create_info,
+                                                  alter_info,
+                                                  ha_alter_flags)))
     {
       goto err;
     }

=== modified file 'storage/ndb/include/kernel/GlobalSignalNumbers.h'
--- a/storage/ndb/include/kernel/GlobalSignalNumbers.h	2009-10-12 06:21:54 +0000
+++ b/storage/ndb/include/kernel/GlobalSignalNumbers.h	2010-09-20 12:44:28 +0000
@@ -188,7 +188,7 @@ extern const GlobalSignalNumber NO_OF_SI
 #define GSN_ROUTE_ORD                   121
 #define GSN_NODE_VERSION_REP            122
 /* 123 unused */
-/* 124 unused */
+#define GSN_FSSUSPENDORD                124 /* local */
 #define GSN_CHECK_LCP_STOP              125
 #define GSN_CLOSE_COMCONF               126 /* local */
 #define GSN_CLOSE_COMREQ                127 /* local */

=== modified file 'storage/ndb/include/kernel/signaldata/FsReadWriteReq.hpp'
--- a/storage/ndb/include/kernel/signaldata/FsReadWriteReq.hpp	2009-05-27 12:11:46 +0000
+++ b/storage/ndb/include/kernel/signaldata/FsReadWriteReq.hpp	2010-09-20 12:44:28 +0000
@@ -172,5 +172,12 @@ FsReadWriteReq::getPartialReadFlag(UintR
   return (opFlag >> PARTIAL_READ_SHIFT) & 1;
 }
 
+struct FsSuspendOrd
+{
+  UintR filePointer;          // DATA 0
+  Uint32 milliseconds;
+
+  STATIC_CONST(SignalLength = 2);
+};
 
 #endif

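The new GSN_FSSUSPENDORD signal and its FsSuspendOrd layout above let a block
ask NDBFS to suspend I/O on an open file for a given number of milliseconds
(used by the REDO invalidation rework in the Dblqh/ndbfs hunks below). A
hedged send-side sketch in the usual block signalling idiom seen elsewhere in
this patch (the logFilePtr.p->fileRef field is an assumption):

  // Sketch only: standard getDataPtrSend/sendSignal idiom; the fileRef
  // field name is assumed for illustration.
  FsSuspendOrd *req= (FsSuspendOrd*)signal->getDataPtrSend();
  req->filePointer= logFilePtr.p->fileRef;   // DATA 0: which open file
  req->milliseconds= 100;                    // how long to suspend its I/O
  sendSignal(NDBFS_REF, GSN_FSSUSPENDORD, signal,
             FsSuspendOrd::SignalLength, JBB);
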
=== modified file 'storage/ndb/include/mgmapi/mgmapi.h'
--- a/storage/ndb/include/mgmapi/mgmapi.h	2010-02-22 09:44:41 +0000
+++ b/storage/ndb/include/mgmapi/mgmapi.h	2010-09-22 14:10:27 +0000
@@ -583,6 +583,25 @@ extern "C" {
    */
   const char *ndb_mgm_get_connected_bind_address(NdbMgmHandle handle);
 
+  /**
+   * Get the version of the mgm server we're talking to.
+   *
+   * @param   handle         Management handle
+   * @param   major          Returns the major version number for NDB
+   * @param   minor          Returns the minor version number for NDB
+   * @param   build          Returns the build version number for NDB
+   * @param   len            Specifies the max size of the buffer
+   *                         available to return version string in
+   * @param   str            Pointer to buffer where to return the
+   *                         version string which is in the
+   *                         form "mysql-X.X.X ndb-Y.Y.Y-status"
+   *
+   * @return  0 for error and 1 for success
+   */
+  int ndb_mgm_get_version(NdbMgmHandle handle,
+                          int *major, int *minor, int* build,
+                          int len, char* str);
+
 #ifndef DOXYGEN_SHOULD_SKIP_INTERNAL
   /** @} *********************************************************************/
   /**
@@ -1183,16 +1202,6 @@ extern "C" {
   Uint32 ndb_mgm_get_mgmd_nodeid(NdbMgmHandle handle);
 
   /**
-   * Get the version of the mgm server we're talking to.
-   * Designed to allow switching of protocol depending on version
-   * so that new clients can speak to old servers in a compat mode
-   */
-  int ndb_mgm_get_version(NdbMgmHandle handle,
-                          int *major, int *minor, int* build,
-                          int len, char* str);
-
-
-  /**
    * Config iterator
    */
   typedef struct ndb_mgm_configuration_iterator ndb_mgm_configuration_iterator;

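ndb_mgm_get_version() above moves from the internal section of mgmapi.h to
the documented public API. A minimal usage sketch (assumes a management
server reachable via the default connectstring; note the return convention
stated above, 0 for error and 1 for success, is inverted relative to most
mgmapi calls):

  #include <cstdio>
  #include <mgmapi.h>

  int main()
  {
    NdbMgmHandle h= ndb_mgm_create_handle();
    if (ndb_mgm_connect(h, 0, 0, 0) != 0)    // no retries, default server
      return 1;
    int major, minor, build;
    char version[128];
    if (ndb_mgm_get_version(h, &major, &minor, &build,
                            sizeof(version), version))
      std::printf("mgmd reports %s (%d.%d.%d)\n",
                  version, major, minor, build);
    ndb_mgm_disconnect(h);
    ndb_mgm_destroy_handle(&h);
    return 0;
  }
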
=== modified file 'storage/ndb/include/ndbapi/NdbDictionary.hpp'
--- a/storage/ndb/include/ndbapi/NdbDictionary.hpp	2010-05-27 08:51:31 +0000
+++ b/storage/ndb/include/ndbapi/NdbDictionary.hpp	2010-09-22 12:06:29 +0000
@@ -2256,6 +2256,7 @@ public:
      * Force gcp and wait for gcp complete
      */
     int forceGCPWait();
+    int forceGCPWait(int type);
 #endif
 
     /** @} *******************************************************************/
@@ -2326,6 +2327,7 @@ public:
     int dropIndexGlobal(const Index &index);
     int removeIndexGlobal(const Index &ndbidx, int invalidate) const;
     int removeTableGlobal(const Table &ndbtab, int invalidate) const;
+    void invalidateDbGlobal(const char * dbname);
 #endif
 
     /*

=== modified file 'storage/ndb/src/kernel/blocks/ERROR_codes.txt'
--- a/storage/ndb/src/kernel/blocks/ERROR_codes.txt	2010-09-06 08:14:08 +0000
+++ b/storage/ndb/src/kernel/blocks/ERROR_codes.txt	2010-09-20 12:44:28 +0000
@@ -3,7 +3,7 @@ Next NDBCNTR 1002
 Next NDBFS 2000
 Next DBACC 3002
 Next DBTUP 4032
-Next DBLQH 5057
+Next DBLQH 5060
 Next DBDICT 6025
 Next DBDIH 7226
 Next DBTC 8088

=== modified file 'storage/ndb/src/kernel/blocks/dbdict/Dbdict.cpp'
--- a/storage/ndb/src/kernel/blocks/dbdict/Dbdict.cpp	2010-09-06 08:14:08 +0000
+++ b/storage/ndb/src/kernel/blocks/dbdict/Dbdict.cpp	2010-09-13 14:25:36 +0000
@@ -7499,6 +7499,9 @@ Dbdict::prepDropTab_complete(Signal* sig
   signal->theData[2] = gci_hi;
   signal->theData[3] = gci_lo;
   sendSignalWithDelay(reference(), GSN_CONTINUEB, signal, 20, 4);
+
+  signal->theData[0] = 6099;
+  sendSignal(DBDIH_REF, GSN_DUMP_STATE_ORD, signal, 1, JBB);
 }
 
 void

=== modified file 'storage/ndb/src/kernel/blocks/dbdih/DbdihMain.cpp'
--- a/storage/ndb/src/kernel/blocks/dbdih/DbdihMain.cpp	2010-08-17 10:07:41 +0000
+++ b/storage/ndb/src/kernel/blocks/dbdih/DbdihMain.cpp	2010-09-13 14:25:36 +0000
@@ -16091,6 +16091,20 @@ Dbdih::execDUMP_STATE_ORD(Signal* signal
     SET_ERROR_INSERT_VALUE2(7216, signal->theData[1]);
     return;
   }
+  DECLARE_DUMP0(DBDIH, 6099, "Start microgcp")
+  {
+    if (isMaster())
+    {
+      jam();
+      m_micro_gcp.m_master.m_start_time = 0;
+    }
+    else
+    {
+      jam();
+      sendSignal(cmasterdihref, GSN_DUMP_STATE_ORD, signal, 1, JBB);
+    }
+    return;
+  }
 }//Dbdih::execDUMP_STATE_ORD()
 
 void
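
Note: dump code 6099 (sent by Dbdict above after prepare-drop) zeroes the
master's micro-GCP start time so the next micro GCP becomes due immediately;
a non-master forwards the order to the master. For illustration, the same
dump can be issued from a management client (sketch, not part of this patch;
assumes a connected NdbMgmHandle h and a data node id nodeId):

  int args[] = { 6099 };
  struct ndb_mgm_reply reply;
  ndb_mgm_dump_state(h, nodeId, args, 1, &reply);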

=== modified file 'storage/ndb/src/kernel/blocks/dblqh/Dblqh.hpp'
--- a/storage/ndb/src/kernel/blocks/dblqh/Dblqh.hpp	2010-08-30 09:07:26 +0000
+++ b/storage/ndb/src/kernel/blocks/dblqh/Dblqh.hpp	2010-09-20 12:44:28 +0000
@@ -1432,10 +1432,13 @@ public:
       OPEN_SR_READ_INVALIDATE_PAGES = 21,
       CLOSE_SR_READ_INVALIDATE_PAGES = 22,
       OPEN_SR_WRITE_INVALIDATE_PAGES = 23,
-      CLOSE_SR_WRITE_INVALIDATE_PAGES = 24
+      CLOSE_SR_WRITE_INVALIDATE_PAGES = 24,
+      OPEN_SR_READ_INVALIDATE_SEARCH_FILES = 25,
+      CLOSE_SR_READ_INVALIDATE_SEARCH_FILES = 26,
+      CLOSE_SR_READ_INVALIDATE_SEARCH_LAST_FILE = 27
 #ifndef NO_REDO_OPEN_FILE_CACHE
-      ,OPEN_EXEC_LOG_CACHED = 25
-      ,CLOSING_EXEC_LOG_CACHED = 26
+      ,OPEN_EXEC_LOG_CACHED = 28
+      ,CLOSING_EXEC_LOG_CACHED = 29
 #endif
     };
     
@@ -1609,6 +1612,7 @@ public:
       READ_SR_INVALIDATE_PAGES = 18,
       WRITE_SR_INVALIDATE_PAGES = 19,
       WRITE_SR_INVALIDATE_PAGES_UPDATE_PAGE0 = 20
+      ,READ_SR_INVALIDATE_SEARCH_FILES = 21
     };
     /**
      * We have to remember the log pages read. 
@@ -2410,6 +2414,8 @@ private:
   Uint32 nextLogFilePtr(Uint32 logFilePtrI);
   void readFileInInvalidate(Signal *signal, int stepNext);
   void writeFileInInvalidate(Signal *signal, int stepPrev);
+  bool invalidateCloseFile(Signal*, Ptr<LogPartRecord>, Ptr<LogFileRecord>,
+                           LogFileRecord::LogFileStatus status);
   void exitFromInvalidate(Signal* signal);
   Uint32 calcPageCheckSum(LogPageRecordPtr logP);
   Uint32 handleLongTupKey(Signal* signal, Uint32* dataPtr, Uint32 len);
@@ -3101,6 +3107,9 @@ public:
 
   Uint32 get_node_status(Uint32 nodeId) const;
   bool check_ndb_versions() const;
+
+  void suspendFile(Signal* signal, Uint32 filePtrI, Uint32 millis);
+  void suspendFile(Signal* signal, Ptr<LogFileRecord> logFile, Uint32 millis);
 };
 
 inline

=== modified file 'storage/ndb/src/kernel/blocks/dblqh/DblqhMain.cpp'
--- a/storage/ndb/src/kernel/blocks/dblqh/DblqhMain.cpp	2010-08-31 10:40:36 +0000
+++ b/storage/ndb/src/kernel/blocks/dblqh/DblqhMain.cpp	2010-09-27 08:50:20 +0000
@@ -67,6 +67,7 @@
 #include <signaldata/RouteOrd.hpp>
 #include <signaldata/FsRef.hpp>
 #include <signaldata/FsReadWriteReq.hpp>
+#include <NdbEnv.h>
 
 #include "../suma/Suma.hpp"
 
@@ -145,7 +146,15 @@ operator<<(NdbOut& out, Operation_t op)
 //#define MARKER_TRACE 0
 //#define TRACE_SCAN_TAKEOVER 1
 
-#ifndef DEBUG_REDO
+#ifdef VM_TRACE
+#ifndef NDB_DEBUG_REDO
+#define NDB_DEBUG_REDO
+#endif
+#endif
+
+#ifdef NDB_DEBUG_REDO
+static int DEBUG_REDO = 0;
+#else
 #define DEBUG_REDO 0
 #endif
 
@@ -622,6 +631,15 @@ void Dblqh::execSTTOR(Signal* signal) 
     traceopout = &ndbout;
 #endif
     
+#ifdef NDB_DEBUG_REDO
+    {
+      char buf[100];
+      if (NdbEnv_GetEnv("NDB_DEBUG_REDO", buf, sizeof(buf)))
+      {
+        DEBUG_REDO = 1;
+      }
+    }
+#endif
     return;
     break;
   case 4:
@@ -13201,6 +13219,23 @@ void Dblqh::execFSCLOSECONF(Signal* sign
     readFileInInvalidate(signal, 2);
     return;
 
+  case LogFileRecord::CLOSE_SR_READ_INVALIDATE_SEARCH_FILES:
+    jam();
+    logFilePtr.p->logFileStatus = LogFileRecord::CLOSED;
+
+    logPartPtr.i = logFilePtr.p->logPartRec;
+    ptrCheckGuard(logPartPtr, clogPartFileSize, logPartRecord);
+
+    readFileInInvalidate(signal, 4);
+    return;
+  case LogFileRecord::CLOSE_SR_READ_INVALIDATE_SEARCH_LAST_FILE:
+    jam();
+    logFilePtr.p->logFileStatus = LogFileRecord::CLOSED;
+
+    logPartPtr.i = logFilePtr.p->logPartRec;
+    ptrCheckGuard(logPartPtr, clogPartFileSize, logPartRecord);
+
+    readFileInInvalidate(signal, 7);
+    return;
   case LogFileRecord::CLOSE_SR_WRITE_INVALIDATE_PAGES:
     jam();
     logFilePtr.p->logFileStatus = LogFileRecord::CLOSED;
@@ -13262,6 +13297,11 @@ void Dblqh::execFSOPENCONF(Signal* signa
     logFilePtr.p->logFileStatus = LogFileRecord::OPEN;
     readFileInInvalidate(signal, 0);
     return;
+  case LogFileRecord::OPEN_SR_READ_INVALIDATE_SEARCH_FILES:
+    jam();
+    logFilePtr.p->logFileStatus = LogFileRecord::OPEN;
+    readFileInInvalidate(signal, 5);
+    return;
   case LogFileRecord::OPEN_SR_WRITE_INVALIDATE_PAGES:
     jam();
     logFilePtr.p->logFileStatus = LogFileRecord::OPEN;
@@ -13399,6 +13439,10 @@ void Dblqh::execFSREADCONF(Signal* signa
     jam();
     invalidateLogAfterLastGCI(signal);
     return;
+  case LogFileOperationRecord::READ_SR_INVALIDATE_SEARCH_FILES:
+    jam();
+    invalidateLogAfterLastGCI(signal);
+    return;
   case LogFileOperationRecord::READ_SR_FOURTH_PHASE:
     jam();
     releaseLfo(signal);
@@ -16621,7 +16665,8 @@ Dblqh::nextLogFilePtr(Uint32 logFilePtrI
   return tmp.p->nextLogFile;
 }
 
-void Dblqh::invalidateLogAfterLastGCI(Signal* signal)
+void
+Dblqh::invalidateLogAfterLastGCI(Signal* signal)
 {
   jam();
   if (logPartPtr.p->logExecState != LogPartRecord::LES_EXEC_LOG_INVALIDATE) {
@@ -16635,6 +16680,34 @@ void Dblqh::invalidateLogAfterLastGCI(Si
   }
 
   switch (lfoPtr.p->lfoState) {
+  case LogFileOperationRecord::READ_SR_INVALIDATE_SEARCH_FILES:
+  {
+    jam();
+    // Check if this file contains pages needing to be invalidated
+    ndbrequire(logPartPtr.p->invalidatePageNo == 1);
+    bool ok = logPagePtr.p->logPageWord[ZPOS_LOG_LAP] == logPartPtr.p->logLap;
+    releaseLfo(signal);
+    releaseLogpage(signal);
+    if (ok)
+    {
+      jam();
+      // This page must be invalidated.
+      // We search next file
+      readFileInInvalidate(signal, 3);
+      return;
+    }
+    else
+    {
+      jam();
+      /**
+       * This file does not need to be invalidated... move to the
+       *   previous file and search forward linearly
+       */
+      readFileInInvalidate(signal, 6);
+      return;
+    }
+    break;
+  }
   case LogFileOperationRecord::READ_SR_INVALIDATE_PAGES:
     jam();
     // Check if this page must be invalidated.
@@ -16689,22 +16762,10 @@ void Dblqh::invalidateLogAfterLastGCI(Si
         }
       }
       
-      if (logFilePtr.p->fileNo != 0 &&
-          logFilePtr.i != logPartPtr.p->currentLogfile &&
-          logFilePtr.i != nextLogFilePtr(logPartPtr.p->currentLogfile))
+      if (invalidateCloseFile(signal, logPartPtr, logFilePtr,
+                              LogFileRecord::CLOSE_SR_WRITE_INVALIDATE_PAGES))
       {
         jam();
-        if (DEBUG_REDO)
-        {
-          ndbout_c("invalidate part: %u close %u(%u) (write) (%u)",
-                   logPartPtr.i,
-                   logFilePtr.p->fileNo,
-                   logFilePtr.i,
-                   logPartPtr.p->currentLogfile);
-        }
-        logFilePtr.p->logFileStatus =
-          LogFileRecord::CLOSE_SR_WRITE_INVALIDATE_PAGES;
-        closeFile(signal, logFilePtr, __LINE__);
         return;
       }
       writeFileInInvalidate(signal, 1); // step prev
@@ -16776,31 +16837,63 @@ Dblqh::writeFileInInvalidate(Signal* sig
   return;
 }//Dblqh::invalidateLogAfterLastGCI
 
+bool
+Dblqh::invalidateCloseFile(Signal* signal,
+                           Ptr<LogPartRecord> partPtr,
+                           Ptr<LogFileRecord> filePtr,
+                           LogFileRecord::LogFileStatus status)
+{
+  jam();
+  if (filePtr.p->fileNo != 0 &&
+      filePtr.i != partPtr.p->currentLogfile &&
+      filePtr.i != nextLogFilePtr(partPtr.p->currentLogfile))
+  {
+    jam();
+    if (DEBUG_REDO)
+    {
+      ndbout_c("invalidate part: %u close %u(%u) state: %u (%u)",
+               partPtr.i,
+               filePtr.p->fileNo,
+               filePtr.i,
+               (Uint32)status,
+               partPtr.p->currentLogfile);
+    }
+    filePtr.p->logFileStatus = status;
+    closeFile(signal, filePtr, __LINE__);
+    return true;
+  }
+  return false;
+}
+
 void Dblqh::readFileInInvalidate(Signal* signal, int stepNext)
 {
   jam();
 
+  if (DEBUG_REDO)
+  {
+    ndbout_c("readFileInInvalidate part: %u file: %u stepNext: %u",
+             logPartPtr.i, logFilePtr.p->fileNo, stepNext);
+  }
+
+  if (stepNext == 0)
+  {
+    jam();
+    // Contact NDBFS. Real time break.
+    readSinglePage(signal, logPartPtr.p->invalidatePageNo);
+    lfoPtr.p->lfoState = LogFileOperationRecord::READ_SR_INVALIDATE_PAGES;
+    return;
+  }
+
   if (stepNext == 1)
   {
+    jam();
     logPartPtr.p->invalidatePageNo++;
     if (logPartPtr.p->invalidatePageNo == (clogFileSize * ZPAGES_IN_MBYTE))
     {
-      if (logFilePtr.p->fileNo != 0 &&
-          logFilePtr.i != logPartPtr.p->currentLogfile &&
-          logFilePtr.i != nextLogFilePtr(logPartPtr.p->currentLogfile))
+      if (invalidateCloseFile(signal, logPartPtr, logFilePtr,
+                              LogFileRecord::CLOSE_SR_READ_INVALIDATE_PAGES))
       {
         jam();
-        if (DEBUG_REDO)
-        {
-          ndbout_c("invalidate part: %u close %u(%u) (read) (%u)",
-                   logPartPtr.i,
-                   logFilePtr.p->fileNo,
-                   logFilePtr.i,
-                   logPartPtr.p->currentLogfile);
-        }
-        logFilePtr.p->logFileStatus =
-          LogFileRecord::CLOSE_SR_READ_INVALIDATE_PAGES;
-        closeFile(signal, logFilePtr, __LINE__);
         return;
       }
       else
@@ -16809,6 +16902,14 @@ void Dblqh::readFileInInvalidate(Signal*
         stepNext = 2; // After close
       }
     }
+    else
+    {
+      jam();
+      // Contact NDBFS. Real time break.
+      readSinglePage(signal, logPartPtr.p->invalidatePageNo);
+      lfoPtr.p->lfoState = LogFileOperationRecord::READ_SR_INVALIDATE_PAGES;
+      return;
+    }
   }
   
   if (stepNext == 2)
@@ -16830,28 +16931,144 @@ void Dblqh::readFileInInvalidate(Signal*
       logPartPtr.p->logLap++;
       if (DEBUG_REDO)
       {
-        ndbout_c("readFileInInvalidate part: %u wrap to file 0 -> logLap: %u",
-                 logPartPtr.i, logPartPtr.p->logLap);
+        ndbout_c("readFileInInvalidate part: %u step: %u wrap to file 0 -> logLap: %u",
+                 logPartPtr.i, stepNext, logPartPtr.p->logLap);
       }
     }
 
+stepNext_2:
     if (logFilePtr.p->logFileStatus != LogFileRecord::OPEN)
     {
       jam();
       if (DEBUG_REDO)
       {
-        ndbout_c("invalidate part: %u open for read %u",
-                 logPartPtr.i, logFilePtr.p->fileNo);
+        ndbout_c("invalidate part: %u step: %u open for read %u",
+                 logPartPtr.i, stepNext, logFilePtr.p->fileNo);
       }
       logFilePtr.p->logFileStatus =LogFileRecord::OPEN_SR_READ_INVALIDATE_PAGES;
       openFileRw(signal, logFilePtr);
       return;
     }
+
+    // Contact NDBFS. Real time break.
+    readSinglePage(signal, logPartPtr.p->invalidatePageNo);
+    lfoPtr.p->lfoState = LogFileOperationRecord::READ_SR_INVALIDATE_PAGES;
+    return;
+  }
+
+  if (stepNext == 3)
+  {
+    jam();
+    if (invalidateCloseFile
+        (signal, logPartPtr, logFilePtr,
+         LogFileRecord::CLOSE_SR_READ_INVALIDATE_SEARCH_FILES))
+    {
+      jam();
+      return;
+    }
+    stepNext = 4;
   }
 
-  // Contact NDBFS. Real time break.
-  readSinglePage(signal, logPartPtr.p->invalidatePageNo); 
-  lfoPtr.p->lfoState = LogFileOperationRecord::READ_SR_INVALIDATE_PAGES;
+  if (stepNext == 4)
+  {
+    jam();
+    logFilePtr.i = logFilePtr.p->nextLogFile;
+    ptrCheckGuard(logFilePtr, clogFileFileSize, logFileRecord);
+    logPartPtr.p->invalidateFileNo = logFilePtr.p->fileNo;
+    // Page 0 is used for file descriptors.
+    logPartPtr.p->invalidatePageNo = 1;
+
+    if (logFilePtr.p->fileNo == 0)
+    {
+      /**
+       * We're wrapping in the log...
+       *   update logLap
+       */
+      logPartPtr.p->logLap++;
+      if (DEBUG_REDO)
+      {
+        ndbout_c("readFileInInvalidate part: %u step: %u wrap to file 0 -> logLap: %u",
+                 logPartPtr.i, stepNext, logPartPtr.p->logLap);
+      }
+    }
+
+    if (logFilePtr.p->logFileStatus != LogFileRecord::OPEN)
+    {
+      jam();
+      if (DEBUG_REDO)
+      {
+        ndbout_c("invalidate part: %u step: %u open for read %u",
+                 logPartPtr.i, stepNext, logFilePtr.p->fileNo);
+      }
+      logFilePtr.p->logFileStatus =
+        LogFileRecord::OPEN_SR_READ_INVALIDATE_SEARCH_FILES;
+      openFileRw(signal, logFilePtr);
+      return;
+    }
+    stepNext = 5;
+  }
+
+  if (stepNext == 5)
+  {
+    jam();
+    // Contact NDBFS. Real time break.
+    readSinglePage(signal, logPartPtr.p->invalidatePageNo);
+    lfoPtr.p->lfoState =
+      LogFileOperationRecord::READ_SR_INVALIDATE_SEARCH_FILES;
+    return;
+  }
+
+  if (stepNext == 6)
+  {
+    jam();
+    if (invalidateCloseFile
+        (signal, logPartPtr, logFilePtr,
+         LogFileRecord::CLOSE_SR_READ_INVALIDATE_SEARCH_LAST_FILE))
+    {
+      jam();
+      return;
+    }
+    stepNext = 7;
+  }
+
+  if (stepNext == 7)
+  {
+    jam();
+
+    if (logFilePtr.p->fileNo == 0)
+    {
+      jam();
+      /**
+       * We're wrapping in the log...
+       *   update logLap
+       */
+      logPartPtr.p->logLap--;
+      ndbrequire(logPartPtr.p->logLap); // Should always be > 0
+      if (DEBUG_REDO)
+      {
+        ndbout_c("invalidateLogAfterLastGCI part: %u step: %u wrap from file 0 -> logLap: %u",
+                 logPartPtr.i, stepNext, logPartPtr.p->logLap);
+      }
+    }
+
+    logFilePtr.i = logFilePtr.p->prevLogFile;
+    ptrCheckGuard(logFilePtr, clogFileFileSize, logFileRecord);
+
+    logPartPtr.p->invalidateFileNo = logFilePtr.p->fileNo;
+    // Page 0 is used for file descriptors.
+    logPartPtr.p->invalidatePageNo = 1;
+
+    if (logPartPtr.p->invalidateFileNo == logPartPtr.p->headFileNo)
+    {
+      jam();
+      logPartPtr.p->invalidatePageNo = logPartPtr.p->headPageNo;
+      readFileInInvalidate(signal, 1);
+      return;
+    }
+
+    goto stepNext_2;
+  }
+  ndbrequire(false);
 }
 
 void Dblqh::exitFromInvalidate(Signal* signal)
@@ -17309,7 +17526,7 @@ void Dblqh::readSrFourthZeroLab(Signal* 
   logPartPtr.p->invalidatePageNo = logPartPtr.p->headPageNo;
   logPartPtr.p->logExecState = LogPartRecord::LES_EXEC_LOG_INVALIDATE;
    
-  readFileInInvalidate(signal, 1);
+  readFileInInvalidate(signal, 3);
   return;
 }//Dblqh::readSrFourthZeroLab()
 
@@ -20074,6 +20291,31 @@ void Dblqh::writeNextLog(Signal* signal)
     jam();
     logPartPtr.p->m_tail_problem = true;
   }
+
+  if (ERROR_INSERTED(5058) &&
+      (twnlNextMbyte + 3 >= clogFileSize) &&
+      logFilePtr.p->fileNo != 0 &&
+      logFilePtr.p->nextLogFile != logPartPtr.p->firstLogfile)
+  {
+    jam();
+    srand((int)time(0));
+    Uint32 wait = 3 + (rand() % 5);
+
+    suspendFile(signal, logFilePtr, /* forever */ 0);
+    suspendFile(signal, logPartPtr.p->firstLogfile, /* forever */ 0);
+    signal->theData[0] = 9999;
+    sendSignalWithDelay(CMVMI_REF, GSN_NDB_TAMPER, signal, wait * 1000, 1);
+    CLEAR_ERROR_INSERT_VALUE;
+  }
+
+  if (ERROR_INSERTED(5059) &&
+      twnlNextMbyte == 4 &&
+      logFilePtr.p->fileNo != 0)
+  {
+    signal->theData[0] = 9999;
+    sendSignal(CMVMI_REF, GSN_NDB_TAMPER, signal, 1, JBA);
+  }
+
 }//Dblqh::writeNextLog()
 
 bool
@@ -21546,3 +21788,22 @@ Dblqh::check_ndb_versions() const
   }
   return true;
 }
+
+void
+Dblqh::suspendFile(Signal* signal, Uint32 filePtrI, Uint32 millis)
+{
+  Ptr<LogFileRecord> tmp;
+  tmp.i = filePtrI;
+  ptrCheckGuard(tmp, clogFileFileSize, logFileRecord);
+  suspendFile(signal, tmp, millis);
+}
+
+void
+Dblqh::suspendFile(Signal* signal, Ptr<LogFileRecord> logFilePtr, Uint32 millis)
+{
+  SaveSignal<FsSuspendOrd::SignalLength> tmp(signal);
+  signal->theData[0] = logFilePtr.p->fileRef;
+  signal->theData[1] = millis;
+  sendSignal(NDBFS_REF, GSN_FSSUSPENDORD, signal, 2, JBA);
+}
+
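
Note: error inserts 5058/5059 (ERROR_codes.txt bumps Next DBLQH to 5060)
suspend redo files and crash the node via NDB_TAMPER near the end of a log
file, to exercise the new invalidation search on restart. A sketch of arming
5058 from a test, as runBug56961 does further down (assumes the NDBT test
framework; not part of this patch):

  #include <NdbRestarter.hpp>
  #include <signaldata/DumpStateOrd.hpp>

  int armRedoSuspendCrash(NdbRestarter& res, int node)
  {
    // make the node restart on error insert instead of halting
    int val2[] = { DumpStateOrd::CmvmiSetRestartOnErrorInsert, 1 };
    if (res.dumpStateOneNode(node, val2, 2) != 0)
      return -1;
    return res.insertErrorInNode(node, 5058);
  }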

=== modified file 'storage/ndb/src/kernel/blocks/dbtup/Dbtup.hpp'
--- a/storage/ndb/src/kernel/blocks/dbtup/Dbtup.hpp	2010-09-03 05:30:17 +0000
+++ b/storage/ndb/src/kernel/blocks/dbtup/Dbtup.hpp	2010-09-24 11:06:19 +0000
@@ -1679,7 +1679,7 @@ public:
    * index node.  TUX reads and writes the node directly via pointer.
    */
   int tuxAllocNode(Uint8* jambase, Uint32* jamidx, Uint32 fragPtrI, Uint32& pageId, Uint32& pageOffset, Uint32*& node);
-  void tuxFreeNode(Signal* signal, Uint32 fragPtrI, Uint32 pageId, Uint32 pageOffset, Uint32* node);
+  void tuxFreeNode(Uint32 fragPtrI, Uint32 pageId, Uint32 pageOffset, Uint32* node);
   void tuxGetNode(Uint32 fragPtrI, Uint32 pageId, Uint32 pageOffset, Uint32*& node);
 
   /*

=== modified file 'storage/ndb/src/kernel/blocks/dbtup/DbtupIndex.cpp'
--- a/storage/ndb/src/kernel/blocks/dbtup/DbtupIndex.cpp	2010-09-03 05:30:17 +0000
+++ b/storage/ndb/src/kernel/blocks/dbtup/DbtupIndex.cpp	2010-09-24 11:06:19 +0000
@@ -75,10 +75,8 @@ Dbtup::tuxAllocNode(Uint8* jambase, Uint
   return 0;
 }
 
-#if 0
 void
-Dbtup::tuxFreeNode(Signal* signal,
-                   Uint32 fragPtrI,
+Dbtup::tuxFreeNode(Uint32 fragPtrI,
                    Uint32 pageId,
                    Uint32 pageOffset,
                    Uint32* node)
@@ -90,15 +88,19 @@ Dbtup::tuxFreeNode(Signal* signal,
   TablerecPtr tablePtr;
   tablePtr.i= fragPtr.p->fragTableId;
   ptrCheckGuard(tablePtr, cnoOfTablerec, tablerec);
+
+  Local_key key;
+  key.m_page_no = pageId;
+  key.m_page_idx = pageOffset;
   PagePtr pagePtr;
-  pagePtr.i= pageId;
-  ptrCheckGuard(pagePtr, cnoOfPage, cpage);
+  Tuple_header* ptr = (Tuple_header*)get_ptr(&pagePtr, &key, tablePtr.p);
+
   Uint32 attrDescIndex= tablePtr.p->tabDescriptor + (0 << ZAD_LOG_SIZE);
   Uint32 attrDataOffset= AttributeOffset::getOffset(tableDescriptor[attrDescIndex + 1].tabDescr);
-  ndbrequire(node == &pagePtr.p->pageWord[pageOffset] + attrDataOffset);
-  freeTh(fragPtr.p, tablePtr.p, signal, pagePtr.p, pageOffset);
+  ndbrequire(node == (Uint32*)ptr + attrDataOffset);
+
+  free_fix_rec(fragPtr.p, tablePtr.p, &key, (Fix_page*)pagePtr.p);
 }
-#endif
 
 void
 Dbtup::tuxGetNode(Uint32 fragPtrI,

=== modified file 'storage/ndb/src/kernel/blocks/dbtup/DbtupMeta.cpp'
--- a/storage/ndb/src/kernel/blocks/dbtup/DbtupMeta.cpp	2010-09-03 05:30:17 +0000
+++ b/storage/ndb/src/kernel/blocks/dbtup/DbtupMeta.cpp	2010-09-14 18:19:46 +0000
@@ -2136,8 +2136,15 @@ Dbtup::start_restore_lcp(Uint32 tableId,
   tabPtr.i= tableId;
   ptrCheckGuard(tabPtr, cnoOfTablerec, tablerec);
   
-  tabPtr.p->m_dropTable.tabUserPtr= tabPtr.p->m_attributes[DD].m_no_of_fixsize;
-  tabPtr.p->m_dropTable.tabUserRef= tabPtr.p->m_attributes[DD].m_no_of_varsize;
+  ndbassert(tabPtr.p->m_attributes[DD].m_no_of_fixsize < (1 << 16));
+  ndbassert(tabPtr.p->m_attributes[DD].m_no_of_varsize < (1 << 16));
+  
+  Uint32 saveAttrCounts = 
+    (tabPtr.p->m_attributes[DD].m_no_of_fixsize << 16) |
+    (tabPtr.p->m_attributes[DD].m_no_of_varsize << 0);
+  
+  tabPtr.p->m_dropTable.tabUserPtr= saveAttrCounts;
+  tabPtr.p->m_dropTable.tabUserRef= (tabPtr.p->m_bits & Tablerec::TR_RowGCI)? 1 : 0;
   
   Uint32 *tabDesc = (Uint32*)(tableDescriptor+tabPtr.p->tabDescriptor);
   for(Uint32 i= 0; i<tabPtr.p->m_no_of_attributes; i++)
@@ -2154,6 +2161,8 @@ Dbtup::start_restore_lcp(Uint32 tableId,
   tabPtr.p->m_no_of_disk_attributes = 0;
   tabPtr.p->m_attributes[DD].m_no_of_fixsize = 0;
   tabPtr.p->m_attributes[DD].m_no_of_varsize = 0;
+  /* Clear TR_RowGCI so LQH does not overwrite GCIs restored in raw format */
+  tabPtr.p->m_bits &= ~((Uint16) Tablerec::TR_RowGCI);
 }
 void
 Dbtup::complete_restore_lcp(Signal* signal, 
@@ -2164,9 +2173,12 @@ Dbtup::complete_restore_lcp(Signal* sign
   tabPtr.i= tableId;
   ptrCheckGuard(tabPtr, cnoOfTablerec, tablerec);
   
-  tabPtr.p->m_attributes[DD].m_no_of_fixsize= tabPtr.p->m_dropTable.tabUserPtr;
-  tabPtr.p->m_attributes[DD].m_no_of_varsize= tabPtr.p->m_dropTable.tabUserRef;
-  
+  Uint32 restoreAttrCounts = tabPtr.p->m_dropTable.tabUserPtr;
+
+  tabPtr.p->m_attributes[DD].m_no_of_fixsize= restoreAttrCounts >> 16;
+  tabPtr.p->m_attributes[DD].m_no_of_varsize= restoreAttrCounts & 0xffff;
+  tabPtr.p->m_bits |= ((tabPtr.p->m_dropTable.tabUserRef & 1) ? Tablerec::TR_RowGCI : 0);
+
   tabPtr.p->m_no_of_disk_attributes = 
     tabPtr.p->m_attributes[DD].m_no_of_fixsize + 
     tabPtr.p->m_attributes[DD].m_no_of_varsize;
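
Note: the two disk-data attribute counts are now parked in a single word
across the LCP restore (tabUserPtr), freeing tabUserRef to carry the
TR_RowGCI flag. A standalone sketch of the pack/unpack round trip
(illustration only, not part of this patch):

  #include <cassert>
  typedef unsigned int Uint32;

  int main()
  {
    Uint32 fixsize = 7, varsize = 3;                 // both asserted < 2^16
    Uint32 saved = (fixsize << 16) | (varsize << 0); // pack

    Uint32 restoredFix = saved >> 16;                // unpack
    Uint32 restoredVar = saved & 0xffff;
    assert(restoredFix == fixsize && restoredVar == varsize);
    return 0;
  }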

=== modified file 'storage/ndb/src/kernel/blocks/dbtux/Dbtux.hpp'
--- a/storage/ndb/src/kernel/blocks/dbtux/Dbtux.hpp	2010-08-20 10:18:47 +0000
+++ b/storage/ndb/src/kernel/blocks/dbtux/Dbtux.hpp	2010-09-24 11:06:19 +0000
@@ -493,7 +493,7 @@ private:
     Uint16 m_numAttrs;
     bool m_storeNullKey;
     TreeHead m_tree;
-    TupLoc m_freeLoc;           // list of free index nodes
+    TupLoc m_freeLoc;           // one free node for next op
     DLList<ScanOp> m_scanList;  // current scans on this fragment
     Uint32 m_tupIndexFragPtrI;
     Uint32 m_tupTableFragPtrI;
@@ -603,9 +603,11 @@ private:
    * DbtuxNode.cpp
    */
   int allocNode(TuxCtx&, NodeHandle& node);
+  void freeNode(NodeHandle& node);
   void selectNode(NodeHandle& node, TupLoc loc);
   void insertNode(NodeHandle& node);
   void deleteNode(NodeHandle& node);
+  void freePreallocatedNode(Frag& frag);
   void setNodePref(struct TuxCtx &, NodeHandle& node);
   // node operations
   void nodePushUp(TuxCtx&, NodeHandle& node, unsigned pos, const TreeEnt& ent, Uint32 scanList);

=== modified file 'storage/ndb/src/kernel/blocks/dbtux/DbtuxBuild.cpp'
--- a/storage/ndb/src/kernel/blocks/dbtux/DbtuxBuild.cpp	2010-01-12 07:26:46 +0000
+++ b/storage/ndb/src/kernel/blocks/dbtux/DbtuxBuild.cpp	2010-09-24 11:06:19 +0000
@@ -144,8 +144,6 @@ Dbtux::mt_buildIndexFragment(mt_BuildInd
       {
         break;
       }
-      // link to freelist
-      node.setLink(0, frag.m_freeLoc);
       frag.m_freeLoc = node.m_loc;
       ndbrequire(frag.m_freeLoc != NullTupLoc);
     }

=== modified file 'storage/ndb/src/kernel/blocks/dbtux/DbtuxMaint.cpp'
--- a/storage/ndb/src/kernel/blocks/dbtux/DbtuxMaint.cpp	2009-12-14 10:58:03 +0000
+++ b/storage/ndb/src/kernel/blocks/dbtux/DbtuxMaint.cpp	2010-09-24 11:06:19 +0000
@@ -146,8 +146,6 @@ Dbtux::execTUX_MAINT_REQ(Signal* signal)
         jam();
         break;
       }
-      // link to freelist
-      node.setLink(0, frag.m_freeLoc);
       frag.m_freeLoc = node.m_loc;
       ndbrequire(frag.m_freeLoc != NullTupLoc);
     }

=== modified file 'storage/ndb/src/kernel/blocks/dbtux/DbtuxNode.cpp'
--- a/storage/ndb/src/kernel/blocks/dbtux/DbtuxNode.cpp	2009-12-14 10:58:03 +0000
+++ b/storage/ndb/src/kernel/blocks/dbtux/DbtuxNode.cpp	2010-09-24 11:06:19 +0000
@@ -54,6 +54,24 @@ Dbtux::allocNode(TuxCtx& ctx, NodeHandle
 }
 
 /*
+ * Free index node in TUP
+ */
+void
+Dbtux::freeNode(NodeHandle& node)
+{
+  Frag& frag = node.m_frag;
+  Uint32 pageId = node.m_loc.getPageId();
+  Uint32 pageOffset = node.m_loc.getPageOffset();
+  Uint32* node32 = reinterpret_cast<Uint32*>(node.m_node);
+  c_tup->tuxFreeNode(frag.m_tupIndexFragPtrI,
+                     pageId, pageOffset, node32);
+  jamEntry();
+  // invalidate the handle
+  node.m_loc = NullTupLoc;
+  node.m_node = 0;
+}
+
+/*
  * Set handle to point to existing node.
  */
 void
@@ -77,9 +95,9 @@ void
 Dbtux::insertNode(NodeHandle& node)
 {
   Frag& frag = node.m_frag;
-  // unlink from freelist
+  // use up pre-allocated node
   selectNode(node, frag.m_freeLoc);
-  frag.m_freeLoc = node.getLink(0);
+  frag.m_freeLoc = NullTupLoc;
   new (node.m_node) TreeNode();
 #ifdef VM_TRACE
   TreeHead& tree = frag.m_tree;
@@ -90,19 +108,44 @@ Dbtux::insertNode(NodeHandle& node)
 }
 
 /*
- * Delete existing node.  Simply put it on the freelist.
+ * Delete existing node.  Make it the pre-allocated free node if there
+ * is none.  Otherwise return it to fragment's free list.
  */
 void
 Dbtux::deleteNode(NodeHandle& node)
 {
   Frag& frag = node.m_frag;
   ndbrequire(node.getOccup() == 0);
-  // link to freelist
-  node.setLink(0, frag.m_freeLoc);
-  frag.m_freeLoc = node.m_loc;
-  // invalidate the handle
-  node.m_loc = NullTupLoc;
-  node.m_node = 0;
+  if (frag.m_freeLoc == NullTupLoc)
+  {
+    jam();
+    frag.m_freeLoc = node.m_loc;
+    // invalidate the handle
+    node.m_loc = NullTupLoc;
+    node.m_node = 0;
+  }
+  else
+  {
+    jam();
+    freeNode(node);
+  }
+}
+
+/*
+ * Free the pre-allocated node, called when tree is empty.  This avoids
+ * leaving any used pages in DataMemory.
+ */
+void
+Dbtux::freePreallocatedNode(Frag& frag)
+{
+  if (frag.m_freeLoc != NullTupLoc)
+  {
+    jam();
+    NodeHandle node(frag);
+    selectNode(node, frag.m_freeLoc);
+    freeNode(node);
+    frag.m_freeLoc = NullTupLoc;
+  }
 }
 
 /*
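
Note: the per-fragment freelist is gone; m_freeLoc now caches at most one
spare node, and treeRemoveNode() releases even that spare once the tree is
empty, so empty index fragments no longer pin DataMemory pages (bug#56829).
A simplified, self-contained sketch of the invariant (illustration only;
-1 stands in for NullTupLoc and releaseToTup for c_tup->tuxFreeNode):

  #include <cassert>

  static int g_freed = 0;
  static void releaseToTup(int) { ++g_freed; }

  struct Frag { int freeLoc; Frag() : freeLoc(-1) {} };

  static void deleteNode(Frag& frag, int nodeLoc)
  {
    if (frag.freeLoc == -1)
      frag.freeLoc = nodeLoc;   // keep one spare for the next insert
    else
      releaseToTup(nodeLoc);    // already have a spare: free at once
  }

  static void freePreallocatedNode(Frag& frag)
  {
    if (frag.freeLoc != -1) {   // tree empty: free the spare too
      releaseToTup(frag.freeLoc);
      frag.freeLoc = -1;
    }
  }

  int main()
  {
    Frag frag;
    deleteNode(frag, 10);          // cached as the spare
    deleteNode(frag, 11);          // freed to TUP immediately
    freePreallocatedNode(frag);    // spare freed when tree is empty
    assert(g_freed == 2 && frag.freeLoc == -1);
    return 0;
  }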

=== modified file 'storage/ndb/src/kernel/blocks/dbtux/DbtuxTree.cpp'
--- a/storage/ndb/src/kernel/blocks/dbtux/DbtuxTree.cpp	2009-12-14 10:58:03 +0000
+++ b/storage/ndb/src/kernel/blocks/dbtux/DbtuxTree.cpp	2010-09-24 11:06:19 +0000
@@ -328,6 +328,8 @@ Dbtux::treeRemoveNode(Frag& frag, NodeHa
   }
   // tree is now empty
   tree.m_root = NullTupLoc;
+  // free even the pre-allocated node
+  freePreallocatedNode(frag);
 }
 
 /*

=== modified file 'storage/ndb/src/kernel/blocks/ndbfs/AsyncFile.cpp'
--- a/storage/ndb/src/kernel/blocks/ndbfs/AsyncFile.cpp	2009-12-14 10:58:03 +0000
+++ b/storage/ndb/src/kernel/blocks/ndbfs/AsyncFile.cpp	2010-09-20 12:44:28 +0000
@@ -31,6 +31,10 @@
 #include <signaldata/FsOpenReq.hpp>
 #include <signaldata/FsReadWriteReq.hpp>
 #include <Configuration.hpp>
+#include <NdbSleep.h>
+
+#include <EventLogger.hpp>
+extern EventLogger * g_eventLogger;
 
 const char *actionName[] = {
   "open",
@@ -220,6 +224,23 @@ AsyncFile::run()
     case Request::buildindx:
       buildIndxReq(request);
       break;
+    case Request::suspend:
+    {
+      if (request->par.suspend.milliseconds)
+      {
+        g_eventLogger->debug("%s suspend: %u milliseconds",
+                             theFileName.c_str(),
+                             request->par.suspend.milliseconds);
+        NdbSleep_MilliSleep(request->par.suspend.milliseconds);
+      }
+      else
+      {
+        g_eventLogger->debug("%s suspend", theFileName.c_str());
+        endReq();
+        return;
+      }
+      continue;
+    }
     case Request:: end:
       if (isOpen())
         closeReq(request);

=== modified file 'storage/ndb/src/kernel/blocks/ndbfs/AsyncFile.hpp'
--- a/storage/ndb/src/kernel/blocks/ndbfs/AsyncFile.hpp	2009-12-14 10:58:03 +0000
+++ b/storage/ndb/src/kernel/blocks/ndbfs/AsyncFile.hpp	2010-09-20 12:44:28 +0000
@@ -108,7 +108,8 @@ public:
     rmrf,
     readPartial,
     allocmem,
-    buildindx
+    buildindx,
+    suspend
   };
   Action action;
   union {
@@ -141,6 +142,9 @@ public:
     struct {
       struct mt_BuildIndxReq m_req;
     } build;
+    struct {
+      Uint32 milliseconds;
+    } suspend;
   } par;
   int error;
   

=== modified file 'storage/ndb/src/kernel/blocks/ndbfs/Ndbfs.cpp'
--- a/storage/ndb/src/kernel/blocks/ndbfs/Ndbfs.cpp	2010-06-10 07:00:44 +0000
+++ b/storage/ndb/src/kernel/blocks/ndbfs/Ndbfs.cpp	2010-09-20 12:44:28 +0000
@@ -77,6 +77,7 @@ Ndbfs::Ndbfs(Block_context& ctx) :
   addRecSignal(GSN_SEND_PACKED, &Ndbfs::execSEND_PACKED, true);
   addRecSignal(GSN_BUILDINDXREQ, &Ndbfs::execBUILDINDXREQ);
    // Set send signals
+  addRecSignal(GSN_FSSUSPENDORD, &Ndbfs::execFSSUSPENDORD);
 
   theRequestPool = new Pool<Request>;
 }
@@ -700,6 +701,34 @@ Ndbfs::execFSSYNCREQ(Signal * signal)
   ndbrequire(forward(openFile,request));
 }
 
+/*
+ * DATA 0: File pointer   DATA 1: Suspend time in milliseconds (0 = forever)
+ */
+void
+Ndbfs::execFSSUSPENDORD(Signal * signal)
+{
+  jamEntry();
+  Uint16 filePointer =  (Uint16)signal->theData[0];
+  Uint32 millis = signal->theData[1];
+  AsyncFile* openFile = theOpenFiles.find(filePointer);
+
+  if (openFile == NULL)
+  {
+    jam(); //file not open
+    return;
+  }
+
+  Request *request = theRequestPool->get();
+  request->error = 0;
+  request->action = Request::suspend;
+  request->set(0, 0, filePointer);
+  request->file = openFile;
+  request->theTrace = signal->getTrace();
+  request->par.suspend.milliseconds = millis;
+
+  ndbrequire(forward(openFile,request));
+}
+
 void 
 Ndbfs::execFSAPPENDREQ(Signal * signal)
 {
@@ -985,6 +1014,7 @@ Ndbfs::report(Request * request, Signal*
     }
     
     case Request:: end: {
+    case Request:: suspend:
       // Report nothing
       break;
     }
@@ -1079,6 +1109,7 @@ Ndbfs::report(Request * request, Signal*
       break;
     }
     case Request:: end: {
+    case Request:: suspend:
       // Report nothing
       break;
     }

=== modified file 'storage/ndb/src/kernel/blocks/ndbfs/Ndbfs.hpp'
--- a/storage/ndb/src/kernel/blocks/ndbfs/Ndbfs.hpp	2009-12-14 10:58:03 +0000
+++ b/storage/ndb/src/kernel/blocks/ndbfs/Ndbfs.hpp	2010-09-20 12:44:28 +0000
@@ -58,6 +58,7 @@ protected:
   void execALLOC_MEM_REQ(Signal* signal);
   void execSEND_PACKED(Signal*);
   void execBUILDINDXREQ(Signal* signal);
+  void execFSSUSPENDORD(Signal*);
 
   Uint16 newId();
 
@@ -123,6 +124,7 @@ protected:
   void execSTTOR(Signal* signal);
   void execALLOC_MEM_REQ(Signal*);
   void execSEND_PACKED(Signal*);
+  void execFSSUSPENDORD(Signal*);
 
 private:
   // Declared but not defined

=== modified file 'storage/ndb/src/kernel/blocks/ndbfs/VoidFs.cpp'
--- a/storage/ndb/src/kernel/blocks/ndbfs/VoidFs.cpp	2010-08-20 10:18:47 +0000
+++ b/storage/ndb/src/kernel/blocks/ndbfs/VoidFs.cpp	2010-09-20 12:44:28 +0000
@@ -53,7 +53,7 @@ VoidFs::VoidFs(Block_context & ctx) :
   addRecSignal(GSN_FSSYNCREQ, &VoidFs::execFSSYNCREQ, true);
   addRecSignal(GSN_FSAPPENDREQ, &VoidFs::execFSAPPENDREQ, true);
   addRecSignal(GSN_FSREMOVEREQ, &VoidFs::execFSREMOVEREQ, true);
-
+  addRecSignal(GSN_FSSUSPENDORD, &VoidFs::execFSSUSPENDORD, true);
    // Set send signals
 }
 
@@ -213,6 +213,15 @@ VoidFs::execFSAPPENDREQ(Signal * signal)
   sendSignal(userRef, GSN_FSAPPENDCONF, signal, 2, JBB);
 }
 
+/*
+ * DATA 0: File pointer   DATA 1: Suspend time in milliseconds (0 = forever)
+ */
+void
+VoidFs::execFSSUSPENDORD(Signal * signal)
+{
+  jamEntry();
+}
+
 void
 VoidFs::execDUMP_STATE_ORD(Signal* signal)
 {

=== modified file 'storage/ndb/src/kernel/blocks/suma/Suma.cpp'
--- a/storage/ndb/src/kernel/blocks/suma/Suma.cpp	2010-09-06 08:14:08 +0000
+++ b/storage/ndb/src/kernel/blocks/suma/Suma.cpp	2010-09-29 07:36:31 +0000
@@ -1101,10 +1101,10 @@ Suma::execNODE_FAILREP(Signal* signal){
 	  progError(__LINE__, NDBD_EXIT_SYSTEM_ERROR, 
 		    "Nodefailure during SUMA takeover");
 	}
-        else if (state & Bucket::BUCKET_SHUTDOWN)
+        else if (state & Bucket::BUCKET_SHUTDOWN_TO)
         {
           jam();
-          c_buckets[i].m_state &= ~Uint32(Bucket::BUCKET_SHUTDOWN);
+          c_buckets[i].m_state &= ~Uint32(Bucket::BUCKET_SHUTDOWN_TO);
           m_switchover_buckets.clear(i);
           ndbrequire(get_responsible_node(i, tmp) == getOwnNodeId());
           start_resend(signal, i);
@@ -3569,7 +3569,11 @@ Suma::get_responsible_node(Uint32 bucket
 bool
 Suma::check_switchover(Uint32 bucket, Uint64 gci)
 {
-  const Uint32 send_mask = (Bucket::BUCKET_STARTING | Bucket::BUCKET_TAKEOVER);
+  const Uint32 send_mask = 
+    Bucket::BUCKET_STARTING |
+    Bucket::BUCKET_TAKEOVER |
+    Bucket::BUCKET_SHUTDOWN_TO;
+
   bool send = c_buckets[bucket].m_state & send_mask;
   ndbassert(m_switchover_buckets.get(bucket));
   if(unlikely(gci > c_buckets[bucket].m_switchover_gci))
@@ -3864,24 +3868,26 @@ Suma::execSUB_GCP_COMPLETE_REP(Signal* s
 	}
         else if (state & Bucket::BUCKET_SHUTDOWN)
         {
+          jam();
           Uint32 nodeId = c_buckets[i].m_switchover_node;
-          if (nodeId == getOwnNodeId())
-          {
-            jam();
-            m_active_buckets.clear(i);
-            shutdown_nodes.set(nodeId);
-          }
-          else
-          {
-            jam();
-            NdbNodeBitmask nodegroup = c_nodes_in_nodegroup_mask;
-            nodegroup.clear(nodeId);
-            ndbrequire(get_responsible_node(i) == nodeId &&
-                       get_responsible_node(i, nodegroup) == getOwnNodeId());
-            m_active_buckets.set(i);
-            takeover_nodes.set(nodeId);
-          }
+          ndbrequire(nodeId == getOwnNodeId());
+          m_active_buckets.clear(i);
+          shutdown_nodes.set(nodeId);
           c_buckets[i].m_state &= ~(Uint32)Bucket::BUCKET_SHUTDOWN;
+          ndbout_c(" shutdown");
+        }
+        else if (state & Bucket::BUCKET_SHUTDOWN_TO)
+        {
+          jam();
+          Uint32 nodeId = c_buckets[i].m_switchover_node;
+          NdbNodeBitmask nodegroup = c_nodes_in_nodegroup_mask;
+          nodegroup.clear(nodeId);
+          ndbrequire(get_responsible_node(i) == nodeId &&
+                     get_responsible_node(i, nodegroup) == getOwnNodeId());
+          m_active_buckets.set(i);
+          takeover_nodes.set(nodeId);
+          c_buckets[i].m_state &= ~(Uint32)Bucket::BUCKET_SHUTDOWN_TO;
+          ndbout_c(" shutdown_to");
         }
 	else
 	{
@@ -4880,7 +4886,7 @@ Suma::execSUMA_HANDOVER_REQ(Signal* sign
         tmp.set(i);
         m_switchover_buckets.set(i);
         c_buckets[i].m_switchover_gci = (Uint64(start_gci) << 32) - 1;
-        c_buckets[i].m_state |= Bucket::BUCKET_SHUTDOWN;
+        c_buckets[i].m_state |= Bucket::BUCKET_SHUTDOWN_TO;
         c_buckets[i].m_switchover_node = nodeId;
         ndbout_c("prepare to takeover bucket: %d", i);
       }

=== modified file 'storage/ndb/src/kernel/blocks/suma/Suma.hpp'
--- a/storage/ndb/src/kernel/blocks/suma/Suma.hpp	2010-08-26 09:00:51 +0000
+++ b/storage/ndb/src/kernel/blocks/suma/Suma.hpp	2010-09-29 07:36:31 +0000
@@ -570,7 +570,8 @@ private:
       ,BUCKET_HANDOVER = 0x2 // On running node
       ,BUCKET_TAKEOVER = 0x4 // On takeing over node
       ,BUCKET_RESEND   = 0x8 // On takeing over node
-      ,BUCKET_SHUTDOWN = 0x10 // Graceful shutdown
+      ,BUCKET_SHUTDOWN = 0x10 // Graceful shutdown (shutdown)
+      ,BUCKET_SHUTDOWN_TO = 0x20 // Graceful shutdown (take-over)
     };
     Uint16 m_state;
     Uint16 m_switchover_node;

=== modified file 'storage/ndb/src/kernel/main.cpp'
--- a/storage/ndb/src/kernel/main.cpp	2010-08-17 09:43:19 +0000
+++ b/storage/ndb/src/kernel/main.cpp	2010-09-22 09:16:46 +0000
@@ -649,9 +649,11 @@ extern "C"
 void 
 handler_error(int signum){
   // only let one thread run shutdown
-  static long thread_id= 0;
+  static bool handling_error = false;
+  static pthread_t thread_id; // Valid when handling_error is true
 
-  if (thread_id != 0 && thread_id == my_thread_id())
+  if (handling_error &&
+      pthread_equal(thread_id, pthread_self()))
   {
     // Shutdown thread received signal
 #ifndef NDB_WIN32
@@ -664,7 +666,10 @@ handler_error(int signum){
   if(theShutdownMutex && NdbMutex_Trylock(theShutdownMutex) != 0)
     while(true)
       NdbSleep_MilliSleep(10);
-  thread_id= my_thread_id();
+
+  thread_id = pthread_self();
+  handling_error = true;
+
   g_eventLogger->info("Received signal %d. Running error handler.", signum);
   childReportSignal(signum);
   // restart the system
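
Note: the fix replaces the old my_thread_id() comparison with
pthread_equal(); pthread_t is opaque, so comparing thread identities with ==
or via casts is not portable. A minimal sketch of the portable pattern (not
part of this patch):

  #include <pthread.h>

  static pthread_t owner;
  static int have_owner = 0;

  void remember_owner(void)
  {
    owner = pthread_self();
    have_owner = 1;
  }

  int is_owner(void)
  {
    /* portable identity test; never compare pthread_t with == */
    return have_owner && pthread_equal(owner, pthread_self());
  }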

=== modified file 'storage/ndb/src/ndbapi/DictCache.cpp'
--- a/storage/ndb/src/ndbapi/DictCache.cpp	2009-08-25 19:44:04 +0000
+++ b/storage/ndb/src/ndbapi/DictCache.cpp	2010-09-22 12:06:29 +0000
@@ -361,6 +361,36 @@ GlobalDictCache::invalidate_all()
 }
 
 void
+GlobalDictCache::invalidateDb(const char * name, size_t len)
+{
+  DBUG_ENTER("GlobalDictCache::invalidateDb");
+  NdbElement_t<Vector<TableVersion> > * curr = m_tableHash.getNext(0);
+  while(curr != 0)
+  {
+    Vector<TableVersion> * vers = curr->theData;
+    if (vers->size())
+    {
+      TableVersion * ver = & vers->back();
+      if (ver->m_status != RETREIVING)
+      {
+        if (ver->m_impl->matchDb(name, len))
+        {
+          ver->m_impl->m_status = NdbDictionary::Object::Invalid;
+          ver->m_status = DROPPED;
+          if (ver->m_refCount == 0)
+          {
+            delete ver->m_impl;
+            vers->erase(vers->size() - 1);
+          }
+        }
+      }
+    }
+    curr = m_tableHash.getNext(curr);
+  }
+  DBUG_VOID_RETURN;
+}
+
+void
 GlobalDictCache::release(const NdbTableImpl * tab, int invalidate)
 {
   DBUG_ENTER("GlobalDictCache::release");

=== modified file 'storage/ndb/src/ndbapi/DictCache.hpp'
--- a/storage/ndb/src/ndbapi/DictCache.hpp	2009-05-26 18:53:34 +0000
+++ b/storage/ndb/src/ndbapi/DictCache.hpp	2010-09-22 12:06:29 +0000
@@ -79,6 +79,7 @@ public:
 
   unsigned get_size();
   void invalidate_all();
+  void invalidateDb(const char * name, size_t len);
 public:
   enum Status {
     OK = 0,

=== modified file 'storage/ndb/src/ndbapi/NdbDictionary.cpp'
--- a/storage/ndb/src/ndbapi/NdbDictionary.cpp	2010-07-26 11:10:10 +0000
+++ b/storage/ndb/src/ndbapi/NdbDictionary.cpp	2010-09-22 12:06:29 +0000
@@ -2136,7 +2136,13 @@ NdbDictionary::Dictionary::invalidateInd
 int
 NdbDictionary::Dictionary::forceGCPWait()
 {
-  return m_impl.forceGCPWait();
+  return forceGCPWait(0);
+}
+
+int
+NdbDictionary::Dictionary::forceGCPWait(int type)
+{
+  return m_impl.forceGCPWait(type);
 }
 
 void
@@ -2563,3 +2569,14 @@ NdbDictionary::Dictionary::getUndofile(U
   return tmp;
 }
 
+void
+NdbDictionary::Dictionary::invalidateDbGlobal(const char * name)
+{
+  if (m_impl.m_globalHash && name != 0)
+  {
+    size_t len = strlen(name);
+    m_impl.m_globalHash->lock();
+    m_impl.m_globalHash->invalidateDb(name, len);
+    m_impl.m_globalHash->unlock();
+  }
+}
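
Note: a short usage sketch from an NDB API client (assumes an initialized
Ndb object; both calls are internal, non-Doxygen API; not part of this
patch):

  #include <NdbApi.hpp>

  void purgeDbFromGlobalCache(Ndb* ndb, const char* dbname)
  {
    NdbDictionary::Dictionary* dict = ndb->getDictionary();
    dict->invalidateDbGlobal(dbname); // drop cached tables of this db
    dict->forceGCPWait(1);            // type 1: fire-and-forget DUMP 6099
  }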

=== modified file 'storage/ndb/src/ndbapi/NdbDictionaryImpl.cpp'
--- a/storage/ndb/src/ndbapi/NdbDictionaryImpl.cpp	2010-08-30 09:07:26 +0000
+++ b/storage/ndb/src/ndbapi/NdbDictionaryImpl.cpp	2010-09-14 09:25:48 +0000
@@ -2276,7 +2276,7 @@ NdbDictInterface::dictSignal(NdbApiSigna
     }    
     
     m_error.code= 0;
-    int ret_val= poll_guard.wait_n_unlock(timeout, node, wst);
+    int ret_val= poll_guard.wait_n_unlock(timeout, node, wst, true);
     // End of Protected area  
     
     if(ret_val == 0 && m_error.code == 0){
@@ -5449,7 +5449,8 @@ NdbDictInterface::listObjects(NdbApiSign
     }
     m_error.code= 0;
     int ret_val= poll_guard.wait_n_unlock(DICT_WAITFOR_TIMEOUT,
-                                          aNodeId, WAIT_LIST_TABLES_CONF);
+                                          aNodeId, WAIT_LIST_TABLES_CONF,
+                                          true);
     // end protected
     if (ret_val == 0 && m_error.code == 0)
       return 0;
@@ -5555,42 +5556,74 @@ NdbDictInterface::execOLD_LIST_TABLES_CO
 }
 
 int
-NdbDictionaryImpl::forceGCPWait()
+NdbDictionaryImpl::forceGCPWait(int type)
 {
-  return m_receiver.forceGCPWait();
+  return m_receiver.forceGCPWait(type);
 }
 
 int
-NdbDictInterface::forceGCPWait()
+NdbDictInterface::forceGCPWait(int type)
 {
   NdbApiSignal tSignal(m_reference);
-  WaitGCPReq* const req = CAST_PTR(WaitGCPReq, tSignal.getDataPtrSend());
-  req->senderRef = m_reference;
-  req->senderData = 0;
-  req->requestType = WaitGCPReq::CompleteForceStart;
-  tSignal.theReceiversBlockNumber = DBDIH;
-  tSignal.theVerId_signalNumber = GSN_WAIT_GCP_REQ;
-  tSignal.theLength = WaitGCPReq::SignalLength;
-
-  const Uint32 RETRIES = 100;
-  for (Uint32 i = 0; i < RETRIES; i++)
+  if (type == 0)
   {
-    m_transporter->lock_mutex();
-    Uint16 aNodeId = m_transporter->get_an_alive_node();
-    if (aNodeId == 0) {
-      m_error.code= 4009;
+    WaitGCPReq* const req = CAST_PTR(WaitGCPReq, tSignal.getDataPtrSend());
+    req->senderRef = m_reference;
+    req->senderData = 0;
+    req->requestType = WaitGCPReq::CompleteForceStart;
+    tSignal.theReceiversBlockNumber = DBDIH;
+    tSignal.theVerId_signalNumber = GSN_WAIT_GCP_REQ;
+    tSignal.theLength = WaitGCPReq::SignalLength;
+
+    const Uint32 RETRIES = 100;
+    for (Uint32 i = 0; i < RETRIES; i++)
+    {
+      m_transporter->lock_mutex();
+      Uint16 aNodeId = m_transporter->get_an_alive_node();
+      if (aNodeId == 0) {
+        m_error.code= 4009;
+        m_transporter->unlock_mutex();
+        return -1;
+      }
+      if (m_transporter->sendSignal(&tSignal, aNodeId) != 0) {
+        m_transporter->unlock_mutex();
+        continue;
+      }
+
+      m_error.code= 0;
+      m_waiter.m_node = aNodeId;
+      m_waiter.m_state = WAIT_LIST_TABLES_CONF;
+      m_waiter.wait(DICT_WAITFOR_TIMEOUT);
       m_transporter->unlock_mutex();
-      return -1;
+      return 0;
     }
-    if (m_transporter->sendSignal(&tSignal, aNodeId) != 0) {
+    return -1;
+  }
+  else if (type == 1)
+  {
+    tSignal.getDataPtrSend()[0] = 6099;
+    tSignal.theReceiversBlockNumber = DBDIH;
+    tSignal.theVerId_signalNumber = GSN_DUMP_STATE_ORD;
+    tSignal.theLength = 1;
+
+    const Uint32 RETRIES = 100;
+    for (Uint32 i = 0; i < RETRIES; i++)
+    {
+      m_transporter->lock_mutex();
+      Uint16 aNodeId = m_transporter->get_an_alive_node();
+      if (aNodeId == 0) {
+        m_error.code= 4009;
+        m_transporter->unlock_mutex();
+        return -1;
+      }
+      if (m_transporter->sendSignal(&tSignal, aNodeId) != 0) {
+        m_transporter->unlock_mutex();
+        continue;
+      }
+
+      m_transporter->forceSend(refToBlock(m_reference));
       m_transporter->unlock_mutex();
-      continue;
     }
-    m_error.code= 0;
-    m_waiter.m_node = aNodeId;
-    m_waiter.m_state = WAIT_LIST_TABLES_CONF;
-    m_waiter.wait(DICT_WAITFOR_TIMEOUT);
-    m_transporter->unlock_mutex();    
     return 0;
   }
   return -1;

=== modified file 'storage/ndb/src/ndbapi/NdbDictionaryImpl.hpp'
--- a/storage/ndb/src/ndbapi/NdbDictionaryImpl.hpp	2010-07-26 11:10:10 +0000
+++ b/storage/ndb/src/ndbapi/NdbDictionaryImpl.hpp	2010-09-22 12:06:29 +0000
@@ -189,6 +189,8 @@ public:
   const char * getMysqlName() const;
   int updateMysqlName();
 
+  bool matchDb(const char * name, size_t len) const;
+
   int aggregate(NdbError& error);
   int validate(NdbError& error);
 
@@ -601,7 +603,7 @@ public:
 			  LinearSectionPtr ptr[3],
 			  Uint32 noOfSections, bool fullyQualifiedNames);
 
-  int forceGCPWait();
+  int forceGCPWait(int type);
 
   static int parseTableInfo(NdbTableImpl ** dst, 
 			    const Uint32 * data, Uint32 len,
@@ -750,7 +752,7 @@ public:
   int executeSubscribeEvent(NdbEventOperationImpl &);
   int stopSubscribeEvent(NdbEventOperationImpl &);
 
-  int forceGCPWait();
+  int forceGCPWait(int type);
 
   int listObjects(List& list, NdbDictionary::Object::Type type, 
                   bool fullyQualified);
@@ -1065,6 +1067,15 @@ NdbTableImpl::getMysqlName() const
 }
 
 inline
+bool
+NdbTableImpl::matchDb(const char * name, size_t len) const
+{
+  return 
+    len < m_internalName.length() &&
+    memcmp(name, m_internalName.c_str(), len) == 0;
+}
+
+inline
 Uint32
 Hash( const char* str ){
   Uint32 h = 0;
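
Note: matchDb() is a raw prefix test against the internal table name, which
is assumed here to have the form "dbname/schema/tabname"; the strict '<'
only guarantees the name is a proper prefix, so a database name that is
itself a prefix of another ("test" vs "test2") matches both unless the
caller folds the separator into len. A self-contained sketch of the
predicate (illustration only):

  #include <cassert>
  #include <cstring>
  #include <string>

  static bool matchDb(const std::string& internalName,
                      const char* db, size_t len)
  {
    return len < internalName.length() &&
           std::memcmp(db, internalName.c_str(), len) == 0;
  }

  int main()
  {
    assert(matchDb("test/def/t1", "test", 4));
    assert(matchDb("test2/def/t1", "test", 4)); // raw prefix semantics
    return 0;
  }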

=== modified file 'storage/ndb/test/include/NdbRestarter.hpp'
--- a/storage/ndb/test/include/NdbRestarter.hpp	2010-02-18 23:01:15 +0000
+++ b/storage/ndb/test/include/NdbRestarter.hpp	2010-09-20 12:44:28 +0000
@@ -94,6 +94,8 @@ public:
   int getMasterNodeVersion(int& version);
   int getNodeTypeVersionRange(ndb_mgm_node_type type, int& minVer, int& maxVer);
   
+  int getNodeStatus(int nodeId); // return NDB_MGM_NODE_STATUS_*
+
   NdbMgmHandle handle;  
 
   enum NodeSelector

=== modified file 'storage/ndb/test/ndbapi/testIndex.cpp'
--- a/storage/ndb/test/ndbapi/testIndex.cpp	2010-03-19 12:27:49 +0000
+++ b/storage/ndb/test/ndbapi/testIndex.cpp	2010-09-24 11:01:03 +0000
@@ -24,6 +24,9 @@
 #include <NdbRestarts.hpp>
 #include <Vector.hpp>
 #include <signaldata/DumpStateOrd.hpp>
+#include <NodeBitmask.hpp>
+#include <NdbSqlUtil.hpp>
+#include <BlockNumbers.h>
 
 #define CHECK(b) if (!(b)) { \
   g_err << "ERR: "<< step->getName() \
@@ -2319,6 +2322,250 @@ runBug50118(NDBT_Context* ctx, NDBT_Step
   return NDBT_OK;
 }
 
+// bug#56829
+
+#undef CHECK2 // previous definition not suitable here
+#define CHECK2(b, e) \
+  if (!(b)) { \
+    g_err << "ERR: " << #b << " failed at line " << __LINE__ \
+          << ": " << e << endl; \
+    result = NDBT_FAILED; \
+    break; \
+  }
+
+static int
+get_data_memory_pages(NdbMgmHandle h, NdbNodeBitmask dbmask, int* pages_out)
+{
+  int result = NDBT_OK;
+  int pages = 0;
+
+  while (1)
+  {
+    // sends dump 1000 and retrieves all replies
+    ndb_mgm_events* e = 0;
+    CHECK2((e = ndb_mgm_dump_events(h, NDB_LE_MemoryUsage, 0, 0)) != 0, ndb_mgm_get_latest_error_msg(h));
+
+    // sum up pages (also verify sanity)
+    for (int i = 0; i < e->no_of_events; i++)
+    {
+      ndb_logevent* le = &e->events[i];
+      CHECK2(le->type == NDB_LE_MemoryUsage, "bad event type " << le->type);
+      const ndb_logevent_MemoryUsage* lem = &le->MemoryUsage;
+      if (lem->block != DBTUP)
+        continue;
+      int nodeId = le->source_nodeid;
+      CHECK2(dbmask.get(nodeId), "duplicate event from node " << nodeId);
+      dbmask.clear(nodeId);
+      pages += lem->pages_used;
+      g_info << "i:" << i << " node:" << le->source_nodeid << " pages:" << lem->pages_used << endl;
+    }
+    free(e);
+    CHECK2(result == NDBT_OK, "failed");
+
+    char buf[NdbNodeBitmask::TextLength + 1];
+    CHECK2(dbmask.isclear(), "no response from nodes " << dbmask.getText(buf));
+    break;
+  }
+
+  *pages_out = pages;
+  return result;
+}
+
+int
+runBug56829(NDBT_Context* ctx, NDBT_Step* step)
+{
+  Ndb* pNdb = GETNDB(step);
+  NdbDictionary::Dictionary* pDic = pNdb->getDictionary();
+  const int loops = ctx->getNumLoops();
+  int result = NDBT_OK;
+  const NdbDictionary::Table tab(*ctx->getTab());
+  const int rows = ctx->getNumRecords();
+  const char* mgm = ctx->getRemoteMgm();
+
+  char tabname[100];
+  strcpy(tabname, tab.getName());
+  char indname[100];
+  strcpy(indname, tabname);
+  strcat(indname, "X1");
+
+  (void)pDic->dropTable(tabname);
+
+  NdbMgmHandle h = 0;
+  NdbNodeBitmask dbmask;
+  // entry n marks if row with PK n exists
+  char* rowmask = new char [rows];
+  memset(rowmask, 0, rows);
+  int loop = 0;
+  while (loop < loops)
+  {
+    CHECK2(rows > 0, "rows must be != 0");
+    g_info << "loop " << loop << "<" << loops << endl;
+
+    // at first loop connect to mgm
+    if (loop == 0)
+    {
+      CHECK2((h = ndb_mgm_create_handle()) != 0, "mgm: failed to create handle");
+      CHECK2(ndb_mgm_set_connectstring(h, mgm) == 0, ndb_mgm_get_latest_error_msg(h));
+      CHECK2(ndb_mgm_connect(h, 0, 0, 0) == 0, ndb_mgm_get_latest_error_msg(h));
+      g_info << "mgm: connected to " << (mgm ? mgm : "default") << endl;
+
+      // make bitmask of DB nodes
+      dbmask.clear();
+      ndb_mgm_cluster_state* cs = 0;
+      CHECK2((cs = ndb_mgm_get_status(h)) != 0, ndb_mgm_get_latest_error_msg(h));
+      for (int j = 0; j < cs->no_of_nodes; j++)
+      {
+        ndb_mgm_node_state* ns = &cs->node_states[j];
+        if (ns->node_type == NDB_MGM_NODE_TYPE_NDB)
+        {
+          CHECK2(ns->node_status == NDB_MGM_NODE_STATUS_STARTED, "node " << ns->node_id << " not started status " << ns->node_status);
+          CHECK2(!dbmask.get(ns->node_id), "duplicate node id " << ns->node_id);
+          dbmask.set(ns->node_id);
+          g_info << "added DB node " << ns->node_id << endl;
+        }
+      }
+      free(cs);
+      CHECK2(result == NDBT_OK, "some DB nodes are not started");
+      CHECK2(!dbmask.isclear(), "found no DB nodes");
+    }
+
+    // data memory pages after following events
+    // 0-initial 1,2-create table,index 3-load 4-delete 5,6-drop index,table
+    int pages[7];
+
+    // initial
+    CHECK2(get_data_memory_pages(h, dbmask, &pages[0]) == NDBT_OK, "failed");
+    g_info << "initial pages " << pages[0] << endl;
+
+    // create table
+    g_info << "create table " << tabname << endl;
+    const NdbDictionary::Table* pTab = 0;
+    CHECK2(pDic->createTable(tab) == 0, pDic->getNdbError());
+    CHECK2((pTab = pDic->getTable(tabname)) != 0, pDic->getNdbError());
+    CHECK2(get_data_memory_pages(h, dbmask, &pages[1]) == NDBT_OK, "failed");
+    g_info << "create table pages " << pages[1] << endl;
+
+    // choice of index attributes is not relevant to this bug
+    // choose one non-PK updateable column
+    NdbDictionary::Index ind;
+    ind.setName(indname);
+    ind.setTable(tabname);
+    ind.setType(NdbDictionary::Index::OrderedIndex);
+    ind.setLogging(false);
+    {
+      HugoCalculator calc(*pTab);
+      for (int j = 0; j < pTab->getNoOfColumns(); j++)
+      {
+        const NdbDictionary::Column* col = pTab->getColumn(j);
+        if (col->getPrimaryKey() || calc.isUpdateCol(j))
+          continue;
+        CHARSET_INFO* cs = col->getCharset();
+        if (NdbSqlUtil::check_column_for_ordered_index(col->getType(), cs) == 0)
+        {
+          ind.addColumn(*col);
+          break;
+        }
+      }
+    }
+    CHECK2(ind.getNoOfColumns() == 1, "cannot use table " << tabname);
+
+    // create index
+    g_info << "create index " << indname << " on " << ind.getColumn(0)->getName() << endl;
+    const NdbDictionary::Index* pInd = 0;
+    CHECK2(pDic->createIndex(ind, *pTab) == 0, pDic->getNdbError());
+    CHECK2((pInd = pDic->getIndex(indname, tabname)) != 0, pDic->getNdbError());
+    CHECK2(get_data_memory_pages(h, dbmask, &pages[2]) == NDBT_OK, "failed");
+    g_info << "create index pages " << pages[2] << endl;
+
+    HugoTransactions trans(*pTab);
+
+    // load all records
+    g_info << "load records" << endl;
+    CHECK2(trans.loadTable(pNdb, rows) == 0, trans.getNdbError());
+    memset(rowmask, 1, rows);
+    CHECK2(get_data_memory_pages(h, dbmask, &pages[3]) == NDBT_OK, "failed");
+    g_info << "load records pages " << pages[3] << endl;
+
+    // test index with random ops
+    g_info << "test index ops" << endl;
+    {
+      HugoOperations ops(*pTab);
+      for (int i = 0; i < rows; i++)
+      {
+        CHECK2(ops.startTransaction(pNdb) == 0, ops.getNdbError());
+        for (int j = 0; j < 32; j++)
+        {
+          int n = rand() % rows;
+          if (!rowmask[n])
+          {
+            CHECK2(ops.pkInsertRecord(pNdb, n) == 0, ops.getNdbError());
+            rowmask[n] = 1;
+          }
+          else if (rand() % 2 == 0)
+          {
+            CHECK2(ops.pkDeleteRecord(pNdb, n) == 0, ops.getNdbError());
+            rowmask[n] = 0;
+          }
+          else
+          {
+            CHECK2(ops.pkUpdateRecord(pNdb, n) == 0, ops.getNdbError());
+          }
+        }
+        CHECK2(result == NDBT_OK, "index ops batch failed");
+        CHECK2(ops.execute_Commit(pNdb) == 0, ops.getNdbError());
+        ops.closeTransaction(pNdb);
+      }
+      CHECK2(result == NDBT_OK, "index ops failed");
+    }
+
+    // delete all records
+    g_info << "delete records" << endl;
+    CHECK2(trans.clearTable(pNdb) == 0, trans.getNdbError());
+    memset(rowmask, 0, rows);
+    CHECK2(get_data_memory_pages(h, dbmask, &pages[4]) == NDBT_OK, "failed");
+    g_info << "delete records pages " << pages[4] << endl;
+
+    // drop index
+    g_info << "drop index" <<  endl;
+    CHECK2(pDic->dropIndex(indname, tabname) == 0, pDic->getNdbError());
+    CHECK2(get_data_memory_pages(h, dbmask, &pages[5]) == NDBT_OK, "failed");
+    g_info << "drop index pages " << pages[5] << endl;
+
+    // drop table
+    g_info << "drop table" << endl;
+    CHECK2(pDic->dropTable(tabname) == 0, pDic->getNdbError());
+    CHECK2(get_data_memory_pages(h, dbmask, &pages[6]) == NDBT_OK, "failed");
+    g_info << "drop table pages " << pages[6] << endl;
+
+    // verify
+    CHECK2(pages[1] == pages[0], "pages after create table " << pages[1]
+                                  << " not == initial pages " << pages[0]);
+    CHECK2(pages[2] == pages[0], "pages after create index " << pages[2]
+                                  << " not == initial pages " << pages[0]);
+    CHECK2(pages[3] >  pages[0], "pages after load " << pages[3]
+                                  << " not >  initial pages " << pages[0]);
+    CHECK2(pages[4] == pages[0], "pages after delete " << pages[4]
+                                  << " not == initial pages " << pages[0]);
+    CHECK2(pages[5] == pages[0], "pages after drop index " << pages[5]
+                                  << " not == initial pages " << pages[0]);
+    CHECK2(pages[6] == pages[0], "pages after drop table " << pages[6]
+                                  << " not == initial pages " << pages[0]);
+
+    loop++;
+
+    // at last loop disconnect from mgm
+    if (loop == loops)
+    {
+      CHECK2(ndb_mgm_disconnect(h) == 0, ndb_mgm_get_latest_error_msg(h));
+      ndb_mgm_destroy_handle(&h);
+      g_info << "mgm: disconnected" << endl;
+    }
+  }
+  delete [] rowmask;
+
+  return result;
+}
+
 
 NDBT_TESTSUITE(testIndex);
 TESTCASE("CreateAll", 
@@ -2710,6 +2957,11 @@ TESTCASE("Bug50118", ""){
   FINALIZER(createPkIndex_Drop);
   FINALIZER(runClearTable);
 }
+TESTCASE("Bug56829",
+         "Return empty ordered index nodes to index fragment "
+         "so that empty fragment pages can be freed"){
+  STEP(runBug56829);
+}
 NDBT_TESTSUITE_END(testIndex);
 
 int main(int argc, const char** argv){

=== modified file 'storage/ndb/test/ndbapi/testRestartGci.cpp'
--- a/storage/ndb/test/ndbapi/testRestartGci.cpp	2009-05-26 18:53:34 +0000
+++ b/storage/ndb/test/ndbapi/testRestartGci.cpp	2010-09-15 09:41:50 +0000
@@ -76,25 +76,16 @@ int runInsertRememberGci(NDBT_Context* c
 
     CHECK(hugoOps.closeTransaction(pNdb) == 0);
     i++;
+    /* Sleep so that records will have > 1 GCI between them */
+    NdbSleep_MilliSleep(10);
   };
 
   return result;
 }
 
-int runRestartGciControl(NDBT_Context* ctx, NDBT_Step* step){
-  int records = ctx->getNumRecords();
+int runRestart(NDBT_Context* ctx, NDBT_Step* step){
   Ndb* pNdb = GETNDB(step);
-  UtilTransactions utilTrans(*ctx->getTab());
   NdbRestarter restarter;
-  
-  // Wait until we have enough records in db
-  int count = 0;
-  while (count < records){
-    if (utilTrans.selectCount(pNdb, 64, &count) != 0){
-      ctx->stopTest();
-      return NDBT_FAILED;
-    }
-  }
 
   // Restart cluster with abort
   if (restarter.restartAll(false, false, true) != 0){
@@ -102,9 +93,6 @@ int runRestartGciControl(NDBT_Context* c
     return NDBT_FAILED;
   }
 
-  // Stop the other thread
-  ctx->stopTest();
-
   if (restarter.waitClusterStarted(300) != 0){
     return NDBT_FAILED;
   }
@@ -116,6 +104,27 @@ int runRestartGciControl(NDBT_Context* c
   return NDBT_OK;
 }
 
+int runRestartGciControl(NDBT_Context* ctx, NDBT_Step* step){
+  int records = ctx->getNumRecords();
+  Ndb* pNdb = GETNDB(step);
+  UtilTransactions utilTrans(*ctx->getTab());
+  
+  // Wait until we have enough records in db
+  int count = 0;
+  while (count < records){
+    if (utilTrans.selectCount(pNdb, 64, &count) != 0){
+      ctx->stopTest();
+      return NDBT_FAILED;
+    }
+    NdbSleep_MilliSleep(10);
+  }
+
+  // Stop the other thread
+  ctx->stopTest();
+
+  return runRestart(ctx,step);
+}
+
 int runVerifyInserts(NDBT_Context* ctx, NDBT_Step* step){
   int result = NDBT_OK;
   Ndb* pNdb = GETNDB(step);
@@ -147,9 +156,19 @@ int runVerifyInserts(NDBT_Context* ctx, 
 
   // RULE2: The records found in db should have same or lower 
   // gci as in the vector
+  int recordsWithIncorrectGci = 0;
   for (i = 0; i < savedRecords.size(); i++){
     CHECK(hugoOps.startTransaction(pNdb) == 0);
+    /* First read of row to check contents */
     CHECK(hugoOps.pkReadRecord(pNdb, i) == 0);
+    /* Second read of row to get GCI */
+    NdbTransaction* trans = hugoOps.getTransaction();
+    NdbOperation* readOp = trans->getNdbOperation(ctx->getTab());
+    CHECK(readOp != NULL);
+    CHECK(readOp->readTuple() == 0);
+    CHECK(hugoOps.equalForRow(readOp, i) == 0);
+    NdbRecAttr* rowGci = readOp->getValue(NdbDictionary::Column::ROW_GCI);
+    CHECK(rowGci != NULL);
     if (hugoOps.execute_Commit(pNdb) != 0){
      // Record was not found in db
 
@@ -158,6 +177,14 @@ int runVerifyInserts(NDBT_Context* ctx, 
 	ndbout << "ERR: Record "<<i<<" should have existed" << endl;
 	result = NDBT_FAILED;
       }
+      else
+      {
+        /* It didn't exist, but that was expected.
+         * Mark its saved GCI with a very large value so that it
+         * doesn't cause confusion after further restarts.
+         */
+        savedRecords[i].m_gci = (Uint32(1) << 31) - 1; // Big number
+      }
     } else {
       // Record was found in db
       BaseString str = hugoOps.getRecordStr(0);
@@ -166,11 +193,19 @@ int runVerifyInserts(NDBT_Context* ctx, 
 	ndbout << "ERR: Record "<<i<<" str did not match "<< endl;
 	result = NDBT_FAILED;
       }
-      // Check record gci
+      // Check record gci in range
       if (savedRecords[i].m_gci > restartGCI){
 	ndbout << "ERR: Record "<<i<<" should not have existed" << endl;
 	result = NDBT_FAILED;
       }
+      // Check record gci is exactly correct
+      if (savedRecords[i].m_gci != rowGci->int32_value()){
+        ndbout << "ERR: Record "<<i<<" should have GCI " <<
+          savedRecords[i].m_gci << ", but has " << 
+          rowGci->int32_value() << endl;
+        recordsWithIncorrectGci++;
+        result = NDBT_FAILED;
+      }
     }
 
     CHECK(hugoOps.closeTransaction(pNdb) == 0);    
@@ -184,6 +219,9 @@ int runVerifyInserts(NDBT_Context* ctx, 
   ndbout << "There are " << recordsWithLowerOrSameGci 
 	 << " records with lower or same gci than " << restartGCI <<  endl;
   
+  ndbout << "There are " << recordsWithIncorrectGci
+         << " records with incorrect Gci on recovery." << endl;
+
   return result;
 }
 
@@ -212,6 +250,11 @@ TESTCASE("InsertRestartGci", 
   STEP(runInsertRememberGci);
   STEP(runRestartGciControl);
   VERIFIER(runVerifyInserts);
+  /* Restart again - the LCP taken after the first restart means
+   * that this time we recover from the LCP, not the REDO log
+   */
+  VERIFIER(runRestart);
+  VERIFIER(runVerifyInserts);  // Check GCIs again
   FINALIZER(runClearTable);
 }
 NDBT_TESTSUITE_END(testRestartGci);

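The second read added to runVerifyInserts fetches the GCI stored with the row via the ROW_GCI pseudo column. Stripped of the Hugo helpers, the same read looks roughly like this; the helper name and the integer primary key column "PK" are assumptions for illustration:

  #include <NdbApi.hpp>

  /* Read the GCI recorded for one row; returns -1 on any failure.
   * Assumes a table whose primary key is an integer column "PK".
   */
  int read_row_gci(Ndb* pNdb, const NdbDictionary::Table* tab, Uint32 pk)
  {
    NdbTransaction* trans = pNdb->startTransaction();
    if (trans == NULL)
      return -1;
    NdbOperation* op = trans->getNdbOperation(tab);
    if (op == NULL || op->readTuple() != 0 || op->equal("PK", pk) != 0)
    {
      pNdb->closeTransaction(trans);
      return -1;
    }
    /* ROW_GCI is a pseudo column maintained by the data nodes */
    NdbRecAttr* rowGci = op->getValue(NdbDictionary::Column::ROW_GCI);
    if (rowGci == NULL || trans->execute(NdbTransaction::Commit) != 0)
    {
      pNdb->closeTransaction(trans);
      return -1;
    }
    int gci = rowGci->int32_value();
    pNdb->closeTransaction(trans);
    return gci;
  }

Piggy-backing the extra getValue() on the transaction that already reads the row, as the patch does with hugoOps, avoids a second round trip per record.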
=== modified file 'storage/ndb/test/ndbapi/testSystemRestart.cpp'
--- a/storage/ndb/test/ndbapi/testSystemRestart.cpp	2010-06-18 10:50:09 +0000
+++ b/storage/ndb/test/ndbapi/testSystemRestart.cpp	2010-09-23 06:17:24 +0000
@@ -2196,6 +2196,52 @@ runBug54611(NDBT_Context* ctx, NDBT_Step
   return NDBT_OK;
 }
 
+int
+runBug56961(NDBT_Context* ctx, NDBT_Step* step)
+{
+  NdbRestarter res;
+  Uint32 loops = ctx->getNumLoops();
+  Ndb* pNdb = GETNDB(step);
+  int rows = ctx->getNumRecords();
+
+  int node = res.getNode(NdbRestarter::NS_RANDOM);
+  int val2[] = { DumpStateOrd::CmvmiSetRestartOnErrorInsert, 1 };
+  HugoTransactions hugoTrans(*ctx->getTab());
+
+  for (Uint32 l = 0; l<loops; l++)
+  {
+    ndbout_c("Waiting for %d to restart (5058)", node);
+    res.dumpStateOneNode(node, val2, 2);
+    res.insertErrorInNode(node, 5058);
+
+    hugoTrans.clearTable(pNdb);
+    hugoTrans.loadTable(pNdb, rows);
+    while (hugoTrans.scanUpdateRecords(pNdb, rows) == NDBT_OK &&
+           res.getNodeStatus(node) != NDB_MGM_NODE_STATUS_NOT_STARTED &&
+           res.getNodeStatus(node) != NDB_MGM_NODE_STATUS_NO_CONTACT);
+    res.waitNodesNoStart(&node, 1);
+    res.startNodes(&node, 1);
+    ndbout_c("Waiting for %d to start", node);
+    res.waitClusterStarted();
+
+    ndbout_c("Waiting for %d to restart (5059)", node);
+    res.dumpStateOneNode(node, val2, 2);
+    res.insertErrorInNode(node, 5059);
+
+    hugoTrans.clearTable(pNdb);
+    hugoTrans.loadTable(pNdb, rows);
+    while (hugoTrans.scanUpdateRecords(pNdb, rows) == NDBT_OK &&
+           res.getNodeStatus(node) != NDB_MGM_NODE_STATUS_NOT_STARTED &&
+           res.getNodeStatus(node) != NDB_MGM_NODE_STATUS_NO_CONTACT);
+    res.waitNodesNoStart(&node, 1);
+    res.startNodes(&node, 1);
+    ndbout_c("Waiting for %d to start", node);
+    res.waitClusterStarted();
+  }
+
+  return NDBT_OK;
+}
+
 NDBT_TESTSUITE(testSystemRestart);
 TESTCASE("SR1", 
 	 "Basic system restart test. Focus on testing restart from REDO log.\n"
@@ -2514,6 +2560,11 @@ TESTCASE("Bug54611", "")
   INITIALIZER(runLoadTable);
   INITIALIZER(runBug54611);
 }
+TESTCASE("Bug56961", "")
+{
+  INITIALIZER(runLoadTable);
+  INITIALIZER(runBug56961);
+}
 NDBT_TESTSUITE_END(testSystemRestart);
 
 int main(int argc, const char** argv){

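runBug56961 runs the standard error-insert dance twice per loop: arm the node so that the error insert leaves it stopped in no-start state rather than exiting, provoke the error with load, then bring the node back. Condensed into a helper (the helper name is hypothetical; error codes 5058 and 5059 come from the patch, and the caller supplies the triggering workload):

  #include <NdbRestarter.hpp>
  #include <NDBT.hpp>
  #include <signaldata/DumpStateOrd.hpp>

  /* Arm one data node to stop on an error insert, wait for it to go
   * down (the caller runs the triggering workload), then restart it.
   */
  int crash_and_restart_node(NdbRestarter& res, int node, int errorCode)
  {
    /* Stop in no-start state instead of exiting the ndbd process */
    int val2[] = { DumpStateOrd::CmvmiSetRestartOnErrorInsert, 1 };
    if (res.dumpStateOneNode(node, val2, 2) != 0 ||
        res.insertErrorInNode(node, errorCode) != 0)
      return NDBT_FAILED;

    /* ... caller's workload triggers the error here ... */

    if (res.waitNodesNoStart(&node, 1) != 0 ||  /* node has stopped */
        res.startNodes(&node, 1) != 0 ||        /* ask it to start  */
        res.waitClusterStarted() != 0)          /* full cluster up  */
      return NDBT_FAILED;
    return NDBT_OK;
  }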
=== modified file 'storage/ndb/test/run-test/daily-basic-tests.txt'
--- a/storage/ndb/test/run-test/daily-basic-tests.txt	2010-09-06 08:14:08 +0000
+++ b/storage/ndb/test/run-test/daily-basic-tests.txt	2010-09-24 12:16:36 +0000
@@ -1471,3 +1471,7 @@ max-time: 300
 cmd: testIndex
 args: -n ConstraintDetails
 
+max-time: 300
+cmd: testIndex
+args: -n Bug56829 T1
+

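Each block in daily-basic-tests.txt gives the autotest framework a time limit, the test binary and its arguments; -n selects the testcase and the trailing argument (T1 here) is, as in other NDBT programs, the table to run against. A hypothetical entry for another case would follow the same shape:

  max-time: 300
  cmd: testIndex
  args: -n SomeOtherCase T1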
=== modified file 'storage/ndb/test/src/NdbRestarter.cpp'
--- a/storage/ndb/test/src/NdbRestarter.cpp	2010-02-18 23:01:15 +0000
+++ b/storage/ndb/test/src/NdbRestarter.cpp	2010-09-20 12:44:28 +0000
@@ -953,4 +953,18 @@ NdbRestarter::getNodeTypeVersionRange(nd
   return 0;
 }
 
+int
+NdbRestarter::getNodeStatus(int nodeid)
+{
+  if (getStatus() != 0)
+    return -1;
+
+  for (size_t n = 0; n < ndbNodes.size(); n++)
+  {
+    if (ndbNodes[n].node_id == nodeid)
+      return ndbNodes[n].node_status;
+  }
+  return -1;
+}
+
 template class Vector<ndb_mgm_node_state>;

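The new NdbRestarter::getNodeStatus() returns the mgm status of a single data node, or -1 if the status cannot be fetched, which is what lets runBug56961 poll for the node going down. A minimal sketch of that polling pattern; the helper name and timeout handling are illustrative:

  #include <NdbRestarter.hpp>
  #include <NdbSleep.h>

  /* Poll until the node leaves the started state or the timeout
   * expires; returns true once the node is down or unreachable.
   */
  bool wait_node_stopped(NdbRestarter& res, int node, int timeoutMs)
  {
    for (int waited = 0; waited < timeoutMs; waited += 100)
    {
      int status = res.getNodeStatus(node);
      if (status == NDB_MGM_NODE_STATUS_NOT_STARTED ||
          status == NDB_MGM_NODE_STATUS_NO_CONTACT)
        return true;   /* node went down as expected */
      if (status == -1)
        return false;  /* could not fetch status at all */
      NdbSleep_MilliSleep(100);
    }
    return false;      /* timed out with node still running */
  }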