List:Commits« Previous MessageNext Message »
From:jonas Date:March 10 2007 12:02pm
Subject:bk commit into 5.1 tree (jonas:1.2326)
View as plain text  
Below is the list of changes that have just been committed into a local
5.1 repository of jonas. When jonas does a push these changes will
be propagated to the main repository and, within 24 hours after the
push, to the public repository.
For information on how to access the public repository
see http://dev.mysql.com/doc/mysql/en/installing-source-tree.html

ChangeSet@stripped, 2007-03-10 12:02:50+01:00, jonas@stripped +34 -0
  Merge perch.ndb.mysql.com:/home/jonas/src/51-telco
  into  perch.ndb.mysql.com:/home/jonas/src/51-dynattr
  MERGE: 1.2321.1.143

  sql/ha_ndbcluster.cc@stripped, 2007-03-10 11:59:06+01:00, jonas@stripped +0 -0
    Auto merged
    MERGE: 1.362.1.56

  storage/ndb/include/kernel/AttributeDescriptor.hpp@stripped, 2007-03-10 11:59:07+01:00,
jonas@stripped +0 -0
    Auto merged
    MERGE: 1.9.1.1

  storage/ndb/include/kernel/signaldata/AlterTab.hpp@stripped, 2007-03-10 11:59:07+01:00,
jonas@stripped +0 -0
    Auto merged
    MERGE: 1.4.1.1

  storage/ndb/include/kernel/signaldata/AlterTable.hpp@stripped, 2007-03-10 11:59:07+01:00,
jonas@stripped +0 -0
    Auto merged
    MERGE: 1.10.1.2

  storage/ndb/include/kernel/signaldata/DictTabInfo.hpp@stripped, 2007-03-10 11:59:07+01:00,
jonas@stripped +0 -0
    Auto merged
    MERGE: 1.32.1.4

  storage/ndb/include/ndbapi/NdbDictionary.hpp@stripped, 2007-03-10 11:59:07+01:00,
jonas@stripped +0 -0
    Auto merged
    MERGE: 1.85.1.4

  storage/ndb/include/util/UtilBuffer.hpp@stripped, 2007-03-10 11:59:07+01:00,
jonas@stripped +0 -0
    Auto merged
    MERGE: 1.6.1.2

  storage/ndb/src/common/debugger/signaldata/DictTabInfo.cpp@stripped, 2007-03-10
11:59:07+01:00, jonas@stripped +0 -0
    Auto merged
    MERGE: 1.19.1.1

  storage/ndb/src/common/util/SimpleProperties.cpp@stripped, 2007-03-10 11:59:07+01:00,
jonas@stripped +0 -0
    Auto merged
    MERGE: 1.9.1.3

  storage/ndb/src/kernel/blocks/dbdict/Dbdict.cpp@stripped, 2007-03-10 11:59:07+01:00,
jonas@stripped +0 -0
    Auto merged
    MERGE: 1.104.1.15

  storage/ndb/src/kernel/blocks/dbdict/Dbdict.hpp@stripped, 2007-03-10 11:59:07+01:00,
jonas@stripped +0 -0
    Auto merged
    MERGE: 1.43.1.6

  storage/ndb/src/kernel/blocks/dbtup/AttributeOffset.hpp@stripped, 2007-03-10 11:59:07+01:00,
jonas@stripped +0 -0
    Auto merged
    MERGE: 1.4.1.1

  storage/ndb/src/kernel/blocks/dbtup/Dbtup.hpp@stripped, 2007-03-10 11:59:07+01:00,
jonas@stripped +0 -0
    Auto merged
    MERGE: 1.50.1.12

  storage/ndb/src/kernel/blocks/dbtup/DbtupAbort.cpp@stripped, 2007-03-10 12:02:47+01:00,
jonas@stripped +0 -2
    merge
    MERGE: 1.12.1.4

  storage/ndb/src/kernel/blocks/dbtup/DbtupCommit.cpp@stripped, 2007-03-10 11:59:07+01:00,
jonas@stripped +0 -0
    Auto merged
    MERGE: 1.16.1.7

  storage/ndb/src/kernel/blocks/dbtup/DbtupExecQuery.cpp@stripped, 2007-03-10 11:59:07+01:00,
jonas@stripped +0 -0
    Auto merged
    MERGE: 1.46.1.10

  storage/ndb/src/kernel/blocks/dbtup/DbtupGen.cpp@stripped, 2007-03-10 11:59:08+01:00,
jonas@stripped +0 -0
    Auto merged
    MERGE: 1.31.1.6

  storage/ndb/src/kernel/blocks/dbtup/DbtupIndex.cpp@stripped, 2007-03-10 11:59:08+01:00,
jonas@stripped +0 -0
    Auto merged
    MERGE: 1.20.1.2

  storage/ndb/src/kernel/blocks/dbtup/DbtupMeta.cpp@stripped, 2007-03-10 11:59:08+01:00,
jonas@stripped +0 -0
    Auto merged
    MERGE: 1.31.1.3

  storage/ndb/src/kernel/blocks/dbtup/DbtupRoutines.cpp@stripped, 2007-03-10 12:02:47+01:00,
jonas@stripped +0 -2
    merge
    MERGE: 1.28.1.5

  storage/ndb/src/kernel/blocks/dbtup/DbtupScan.cpp@stripped, 2007-03-10 11:59:08+01:00,
jonas@stripped +0 -0
    Auto merged
    MERGE: 1.15.1.5

  storage/ndb/src/kernel/blocks/dbtup/DbtupTabDesMan.cpp@stripped, 2007-03-10 11:59:08+01:00,
jonas@stripped +0 -0
    Auto merged
    MERGE: 1.5.1.1

  storage/ndb/src/kernel/blocks/dbtup/DbtupTrigger.cpp@stripped, 2007-03-10 11:59:09+01:00,
jonas@stripped +0 -12
    Auto merged
    MERGE: 1.25.1.2

  storage/ndb/src/kernel/blocks/dbtup/DbtupVarAlloc.cpp@stripped, 2007-03-10 11:59:09+01:00,
jonas@stripped +0 -0
    Auto merged
    MERGE: 1.4.1.2

  storage/ndb/src/kernel/vm/DLFifoList.hpp@stripped, 2007-03-10 11:59:09+01:00,
jonas@stripped +0 -0
    Auto merged
    MERGE: 1.10.1.1

  storage/ndb/src/ndbapi/NdbDictionary.cpp@stripped, 2007-03-10 11:59:09+01:00,
jonas@stripped +0 -0
    Auto merged
    MERGE: 1.64.1.5

  storage/ndb/src/ndbapi/NdbDictionaryImpl.cpp@stripped, 2007-03-10 11:59:09+01:00,
jonas@stripped +0 -0
    Auto merged
    MERGE: 1.152.1.9

  storage/ndb/src/ndbapi/NdbDictionaryImpl.hpp@stripped, 2007-03-10 11:59:09+01:00,
jonas@stripped +0 -0
    Auto merged
    MERGE: 1.69.1.4

  storage/ndb/src/ndbapi/NdbTransaction.cpp@stripped, 2007-03-10 11:59:09+01:00,
jonas@stripped +0 -0
    Auto merged
    MERGE: 1.62.1.14

  storage/ndb/src/ndbapi/TransporterFacade.cpp@stripped, 2007-03-10 11:59:09+01:00,
jonas@stripped +0 -0
    Auto merged
    MERGE: 1.52.1.12

  storage/ndb/test/include/NDBT_Table.hpp@stripped, 2007-03-10 11:59:09+01:00,
jonas@stripped +0 -0
    Auto merged
    MERGE: 1.13.1.1

  storage/ndb/test/ndbapi/testDict.cpp@stripped, 2007-03-10 11:59:10+01:00,
jonas@stripped +0 -0
    Auto merged
    MERGE: 1.31.1.4

  storage/ndb/test/src/NDBT_Tables.cpp@stripped, 2007-03-10 11:59:10+01:00,
jonas@stripped +0 -0
    Auto merged
    MERGE: 1.22.1.1

  storage/ndb/tools/restore/Restore.cpp@stripped, 2007-03-10 11:59:10+01:00,
jonas@stripped +0 -0
    Auto merged
    MERGE: 1.40.1.9

# This is a BitKeeper patch.  What follows are the unified diffs for the
# set of deltas contained in the patch.  The rest of the patch, the part
# that BitKeeper cares about, is below these diffs.
# User:	jonas
# Host:	perch.ndb.mysql.com
# Root:	/home/jonas/src/51-dynattr/RESYNC

--- 1.5/storage/ndb/src/kernel/blocks/dbtup/DbtupVarAlloc.cpp	2007-03-10 12:02:57 +01:00
+++ 1.6/storage/ndb/src/kernel/blocks/dbtup/DbtupVarAlloc.cpp	2007-03-10 12:02:57 +01:00
@@ -2,8 +2,7 @@
 
    This program is free software; you can redistribute it and/or modify
    it under the terms of the GNU General Public License as published by
-   the Free Software Foundation; either version 2 of the License, or
-   (at your option) any later version.
+   the Free Software Foundation; version 2 of the License.
 
    This program is distributed in the hope that it will be useful,
    but WITHOUT ANY WARRANTY; without even the implied warranty of
@@ -164,7 +163,6 @@
   /**
    * TODO free fix + var part
    */
-  Uint32 page_idx= key->m_page_idx;
   Uint32 *ptr = ((Fix_page*)pagePtr.p)->get_ptr(key->m_page_idx, 0);
   Tuple_header* tuple = (Tuple_header*)ptr;
 

--- 1.10/storage/ndb/include/kernel/AttributeDescriptor.hpp	2007-03-10 12:02:57 +01:00
+++ 1.11/storage/ndb/include/kernel/AttributeDescriptor.hpp	2007-03-10 12:02:57 +01:00
@@ -2,8 +2,7 @@
 
    This program is free software; you can redistribute it and/or modify
    it under the terms of the GNU General Public License as published by
-   the Free Software Foundation; either version 2 of the License, or
-   (at your option) any later version.
+   the Free Software Foundation; version 2 of the License.
 
    This program is distributed in the hope that it will be useful,
    but WITHOUT ANY WARRANTY; without even the implied warranty of

--- 1.5/storage/ndb/include/kernel/signaldata/AlterTab.hpp	2007-03-10 12:02:57 +01:00
+++ 1.6/storage/ndb/include/kernel/signaldata/AlterTab.hpp	2007-03-10 12:02:57 +01:00
@@ -2,8 +2,7 @@
 
    This program is free software; you can redistribute it and/or modify
    it under the terms of the GNU General Public License as published by
-   the Free Software Foundation; either version 2 of the License, or
-   (at your option) any later version.
+   the Free Software Foundation; version 2 of the License.
 
    This program is distributed in the hope that it will be useful,
    but WITHOUT ANY WARRANTY; without even the implied warranty of

--- 1.11/storage/ndb/include/kernel/signaldata/AlterTable.hpp	2007-03-10 12:02:57 +01:00
+++ 1.12/storage/ndb/include/kernel/signaldata/AlterTable.hpp	2007-03-10 12:02:57 +01:00
@@ -2,8 +2,7 @@
 
    This program is free software; you can redistribute it and/or modify
    it under the terms of the GNU General Public License as published by
-   the Free Software Foundation; either version 2 of the License, or
-   (at your option) any later version.
+   the Free Software Foundation; version 2 of the License.
 
    This program is distributed in the hope that it will be useful,
    but WITHOUT ANY WARRANTY; without even the implied warranty of
@@ -230,7 +229,8 @@
     NullablePrimaryKey = 740,
     UnsupportedChange = 741,
     BackupInProgress = 762,
-    IncompatibleVersions = 763
+    IncompatibleVersions = 763,
+    SingleUser = 299
   };
 
 private:

--- 1.33/storage/ndb/include/kernel/signaldata/DictTabInfo.hpp	2007-03-10 12:02:57 +01:00
+++ 1.34/storage/ndb/include/kernel/signaldata/DictTabInfo.hpp	2007-03-10 12:02:57 +01:00
@@ -2,8 +2,7 @@
 
    This program is free software; you can redistribute it and/or modify
    it under the terms of the GNU General Public License as published by
-   the Free Software Foundation; either version 2 of the License, or
-   (at your option) any later version.
+   the Free Software Foundation; version 2 of the License.
 
    This program is distributed in the hope that it will be useful,
    but WITHOUT ANY WARRANTY; without even the implied warranty of
@@ -47,17 +46,17 @@
 #endif
 
 #define DTIMAP(x, y, z) \
-  { DictTabInfo::y, offsetof(x, z), SimpleProperties::Uint32Value, 0, (~0), 0 }
+  { DictTabInfo::y, my_offsetof(x, z), SimpleProperties::Uint32Value, 0, (~0), 0 }
 
 #define DTIMAP2(x, y, z, u, v) \
-  { DictTabInfo::y, offsetof(x, z), SimpleProperties::Uint32Value, u, v, 0 }
+  { DictTabInfo::y, my_offsetof(x, z), SimpleProperties::Uint32Value, u, v, 0 }
 
 #define DTIMAPS(x, y, z, u, v) \
-  { DictTabInfo::y, offsetof(x, z), SimpleProperties::StringValue, u, v, 0 }
+  { DictTabInfo::y, my_offsetof(x, z), SimpleProperties::StringValue, u, v, 0 }
 
 #define DTIMAPB(x, y, z, u, v, l) \
-  { DictTabInfo::y, offsetof(x, z), SimpleProperties::BinaryValue, u, v, \
-                     offsetof(x, l) }
+  { DictTabInfo::y, my_offsetof(x, z), SimpleProperties::BinaryValue, u, v, \
+                     my_offsetof(x, l) }
 
 #define DTIBREAK(x) \
   { DictTabInfo::x, 0, SimpleProperties::InvalidValue, 0, 0, 0 }
@@ -604,17 +603,17 @@
 };
 
 #define DFGIMAP(x, y, z) \
-  { DictFilegroupInfo::y, offsetof(x, z), SimpleProperties::Uint32Value, 0, (~0), 0 }
+  { DictFilegroupInfo::y, my_offsetof(x, z), SimpleProperties::Uint32Value, 0, (~0), 0 }
 
 #define DFGIMAP2(x, y, z, u, v) \
-  { DictFilegroupInfo::y, offsetof(x, z), SimpleProperties::Uint32Value, u, v, 0 }
+  { DictFilegroupInfo::y, my_offsetof(x, z), SimpleProperties::Uint32Value, u, v, 0 }
 
 #define DFGIMAPS(x, y, z, u, v) \
-  { DictFilegroupInfo::y, offsetof(x, z), SimpleProperties::StringValue, u, v, 0 }
+  { DictFilegroupInfo::y, my_offsetof(x, z), SimpleProperties::StringValue, u, v, 0 }
 
 #define DFGIMAPB(x, y, z, u, v, l) \
-  { DictFilegroupInfo::y, offsetof(x, z), SimpleProperties::BinaryValue, u, v, \
-                     offsetof(x, l) }
+  { DictFilegroupInfo::y, my_offsetof(x, z), SimpleProperties::BinaryValue, u, v, \
+                     my_offsetof(x, l) }
 
 #define DFGIBREAK(x) \
   { DictFilegroupInfo::x, 0, SimpleProperties::InvalidValue, 0, 0, 0 }

--- 1.87/storage/ndb/include/ndbapi/NdbDictionary.hpp	2007-03-10 12:02:57 +01:00
+++ 1.88/storage/ndb/include/ndbapi/NdbDictionary.hpp	2007-03-10 12:02:57 +01:00
@@ -2,8 +2,7 @@
 
    This program is free software; you can redistribute it and/or modify
    it under the terms of the GNU General Public License as published by
-   the Free Software Foundation; either version 2 of the License, or
-   (at your option) any later version.
+   the Free Software Foundation; version 2 of the License.
 
    This program is distributed in the hope that it will be useful,
    but WITHOUT ANY WARRANTY; without even the implied warranty of
@@ -112,6 +111,7 @@
       IndexTrigger = 8,       ///< Index maintenance, internal
       SubscriptionTrigger = 9,///< Backup or replication, internal
       ReadOnlyConstraint = 10,///< Trigger, internal
+      TableEvent = 11,        ///< Table event
       Tablespace = 20,        ///< Tablespace
       LogfileGroup = 21,      ///< Logfile group
       Datafile = 22,          ///< Datafile
@@ -820,9 +820,9 @@
      */
     void setMaxLoadFactor(int);
 
-    void setTablespace(const char * name);
+    void setTablespaceName(const char * name);
+    const char * getTablespaceName() const;
     void setTablespace(const class Tablespace &);
-    const char * getTablespace() const;
     bool getTablespace(Uint32 *id= 0, Uint32 *version= 0) const;
 
     /**
@@ -1763,6 +1763,14 @@
      * @return an Event if successful, otherwise NULL.
      */
     const Event * getEvent(const char * eventName);
+
+    /**
+     * List defined events
+     * @param list   List of events returned in the dictionary
+     * @return 0 if successful otherwise -1.
+     */
+    int listEvents(List & list);
+    int listEvents(List & list) const;
 
     /** @} *******************************************************************/
 

--- 1.8/storage/ndb/include/util/UtilBuffer.hpp	2007-03-10 12:02:57 +01:00
+++ 1.9/storage/ndb/include/util/UtilBuffer.hpp	2007-03-10 12:02:57 +01:00
@@ -2,8 +2,7 @@
 
    This program is free software; you can redistribute it and/or modify
    it under the terms of the GNU General Public License as published by
-   the Free Software Foundation; either version 2 of the License, or
-   (at your option) any later version.
+   the Free Software Foundation; version 2 of the License.
 
    This program is distributed in the hope that it will be useful,
    but WITHOUT ANY WARRANTY; without even the implied warranty of

--- 1.20/storage/ndb/src/common/debugger/signaldata/DictTabInfo.cpp	2007-03-10 12:02:57
+01:00
+++ 1.21/storage/ndb/src/common/debugger/signaldata/DictTabInfo.cpp	2007-03-10 12:02:57
+01:00
@@ -2,8 +2,7 @@
 
    This program is free software; you can redistribute it and/or modify
    it under the terms of the GNU General Public License as published by
-   the Free Software Foundation; either version 2 of the License, or
-   (at your option) any later version.
+   the Free Software Foundation; version 2 of the License.
 
    This program is distributed in the hope that it will be useful,
    but WITHOUT ANY WARRANTY; without even the implied warranty of

--- 1.12/storage/ndb/src/common/util/SimpleProperties.cpp	2007-03-10 12:02:57 +01:00
+++ 1.13/storage/ndb/src/common/util/SimpleProperties.cpp	2007-03-10 12:02:57 +01:00
@@ -2,8 +2,7 @@
 
    This program is free software; you can redistribute it and/or modify
    it under the terms of the GNU General Public License as published by
-   the Free Software Foundation; either version 2 of the License, or
-   (at your option) any later version.
+   the Free Software Foundation; version 2 of the License.
 
    This program is distributed in the hope that it will be useful,
    but WITHOUT ANY WARRANTY; without even the implied warranty of

--- 1.41/storage/ndb/tools/restore/Restore.cpp	2007-03-10 12:02:57 +01:00
+++ 1.42/storage/ndb/tools/restore/Restore.cpp	2007-03-10 12:02:57 +01:00
@@ -2,8 +2,7 @@
 
    This program is free software; you can redistribute it and/or modify
    it under the terms of the GNU General Public License as published by
-   the Free Software Foundation; either version 2 of the License, or
-   (at your option) any later version.
+   the Free Software Foundation; version 2 of the License.
 
    This program is distributed in the hope that it will be useful,
    but WITHOUT ANY WARRANTY; without even the implied warranty of
@@ -28,6 +27,7 @@
 #include <NdbAutoPtr.hpp>
 
 #include "../../../../sql/ha_ndbcluster_tables.h"
+extern NdbRecordPrintFormat g_ndbrecord_print_format;
 
 Uint16 Twiddle16(Uint16 in); // Byte shift 16-bit data
 Uint32 Twiddle32(Uint32 in); // Byte shift 32-bit data
@@ -60,7 +60,12 @@
     return true;
   case 64:
     for(i = 0; i<arraySize; i++){
-      attr_data->u_int64_value[i] = Twiddle64(attr_data->u_int64_value[i]);
+      // allow unaligned
+      char* p = (char*)&attr_data->u_int64_value[i];
+      Uint64 x;
+      memcpy(&x, p, sizeof(Uint64));
+      x = Twiddle64(x);
+      memcpy(p, &x, sizeof(Uint64));
     }
     return true;
   default:
@@ -294,15 +299,23 @@
   Uint32 i;
   for (i = 0; i < getNoOfTables(); i++) {
     TableS* table = allTables[i];
+    table->m_local_id = i;
     const char* tableName = table->getTableName();
     if ( // XXX should use type
         strcmp(tableName, "SYSTAB_0") == 0 ||
         strcmp(tableName, "NDB$EVENTS_0") == 0 ||
         strcmp(tableName, "sys/def/SYSTAB_0") == 0 ||
         strcmp(tableName, "sys/def/NDB$EVENTS_0") == 0 ||
-        strcmp(tableName, "cluster_replication/def/" NDB_APPLY_TABLE) == 0 ||
+        /*
+          The following is for old MySQL versions,
+           before we changed the database name of the tables from
+           "cluster_replication" -> "cluster" -> "mysql"
+        */
+        strcmp(tableName, "cluster_replication/def/" OLD_NDB_APPLY_TABLE) == 0 ||
+        strcmp(tableName, "cluster/def/" OLD_NDB_APPLY_TABLE) == 0 ||
         strcmp(tableName, NDB_REP_DB "/def/" NDB_APPLY_TABLE) == 0 ||
         strcmp(tableName, NDB_REP_DB "/def/" NDB_SCHEMA_TABLE)== 0 )
+
       table->isSysTable = true;
   }
   for (i = 0; i < getNoOfTables(); i++) {
@@ -317,9 +330,10 @@
       Uint32 j;
       for (j = 0; j < getNoOfTables(); j++) {
         TableS* table = allTables[j];
-        if (table->getTableId() == id1) {
+        if (table->getTableId() == (Uint32) id1) {
           if (table->isSysTable)
             blobTable->isSysTable = true;
+          blobTable->m_main_table = table;
           break;
         }
       }
@@ -417,6 +431,7 @@
   m_noOfRecords= 0;
   backupVersion = version;
   isSysTable = false;
+  m_main_table = NULL;
   
   for (int i = 0; i < tableImpl->getNoOfColumns(); i++)
     createAttr(tableImpl->getColumn(i));
@@ -887,6 +902,7 @@
     return false;
   }
 
+  info.setLevel(254);
   info << "_____________________________________________________" << endl
        << "Processing data in table: " << m_currentTable->getTableName() 
        << "(" << Header.TableId << ") fragment " 
@@ -1144,14 +1160,14 @@
 
   if (data.null)
   {
-    ndbout << "<NULL>";
+    ndbout << g_ndbrecord_print_format.null_string;
     return ndbout;
   }
   
   NdbRecAttr tmprec(0);
-  tmprec.setup(desc.m_column, (char *)data.void_value);
+  tmprec.setup(desc.m_column, 0);
   tmprec.receive_data((Uint32*)data.void_value, data.size);
-  ndbout << tmprec;
+  ndbrecattr_print_formatted(ndbout, tmprec, g_ndbrecord_print_format);
 
   return ndbout;
 }
@@ -1160,17 +1176,15 @@
 NdbOut& 
 operator<<(NdbOut& ndbout, const TupleS& tuple)
 {
-  ndbout << tuple.getTable()->getTableName() << "; ";
   for (int i = 0; i < tuple.getNoOfAttributes(); i++) 
   {
+    if (i > 0)
+      ndbout << g_ndbrecord_print_format.fields_terminated_by;
     AttributeData * attr_data = tuple.getData(i);
     const AttributeDesc * attr_desc = tuple.getDesc(i);
     const AttributeS attr = {attr_desc, *attr_data};
     debug << i << " " << attr_desc->m_column->getName();
     ndbout << attr;
-    
-    if (i != (tuple.getNoOfAttributes() - 1))
-      ndbout << delimiter << " ";
   } // for
   return ndbout;
 }

--- 1.108/storage/ndb/src/kernel/blocks/dbdict/Dbdict.cpp	2007-03-10 12:02:57 +01:00
+++ 1.109/storage/ndb/src/kernel/blocks/dbdict/Dbdict.cpp	2007-03-10 12:02:57 +01:00
@@ -2,8 +2,7 @@
 
    This program is free software; you can redistribute it and/or modify
    it under the terms of the GNU General Public License as published by
-   the Free Software Foundation; either version 2 of the License, or
-   (at your option) any later version.
+   the Free Software Foundation; version 2 of the License.
 
    This program is distributed in the hope that it will be useful,
    but WITHOUT ANY WARRANTY; without even the implied warranty of
@@ -189,7 +188,7 @@
     &Dbdict::drop_undofile_prepare_start, 0,
     0,
     0, 0,
-    0, 0
+    0, 0, 0
   }
 };
 
@@ -292,7 +291,6 @@
     for(; ok; ok = c_obj_hash.next(iter))
     {
       Rope name(c_rope_pool, iter.curr.p->m_name);
-      const Uint32 size = name.size();
       char buf[1024];
       name.copy(buf);
       ndbout_c("%s m_ref_count: %d", buf, iter.curr.p->m_ref_count); 
@@ -479,7 +477,7 @@
 		   CreateFragmentationReq::SignalLength);
     ndbrequire(signal->theData[0] == 0);
     Uint16 *data = (Uint16*)&signal->theData[25];
-    Uint32 count = 2 + data[0] * data[1];
+    Uint32 count = 2 + (1 + data[0]) * data[1];
     w.add(DictTabInfo::ReplicaDataLen, 2*count);
     for (Uint32 i = 0; i < count; i++)
       data[i] = htons(data[i]);
@@ -1811,7 +1809,7 @@
 void Dbdict::initNodeRecords() 
 {
   jam();
-  for (unsigned i = 1; i < MAX_NODES; i++) {
+  for (unsigned i = 1; i < MAX_NDB_NODES; i++) {
     NodeRecordPtr nodePtr;
     c_nodes.getPtr(nodePtr, i);
     nodePtr.p->hotSpare = false;
@@ -2058,7 +2056,7 @@
   c_attributeRecordPool.setSize(attributesize);
   c_attributeRecordHash.setSize(64);
   c_fsConnectRecordPool.setSize(ZFS_CONNECT_SIZE);
-  c_nodes.setSize(MAX_NODES);
+  c_nodes.setSize(MAX_NDB_NODES);
   c_pageRecordArray.setSize(ZNUMBER_OF_PAGES);
   c_schemaPageRecordArray.setSize(2 * NDB_SF_MAX_PAGES);
   c_tableRecordPool.setSize(tablerecSize);
@@ -2243,10 +2241,10 @@
     NodeRecordPtr nodePtr;
     c_nodes.getPtr(nodePtr, i);
 
-    if (NodeBitmask::get(readNodes->allNodes, i)) {
+    if (NdbNodeBitmask::get(readNodes->allNodes, i)) {
       jam();
       nodePtr.p->nodeState = NodeRecord::NDB_NODE_ALIVE;
-      if (NodeBitmask::get(readNodes->inactiveNodes, i)) {
+      if (NdbNodeBitmask::get(readNodes->inactiveNodes, i)) {
 	jam();
 	/**-------------------------------------------------------------------
 	 *
@@ -2275,7 +2273,7 @@
   jamEntry();
   HotSpareRep * const hotSpare = (HotSpareRep*)&signal->theData[0];
   for (unsigned i = 1; i < MAX_NDB_NODES; i++) {
-    if (NodeBitmask::get(hotSpare->theHotSpareNodes, i)) {
+    if (NdbNodeBitmask::get(hotSpare->theHotSpareNodes, i)) {
       NodeRecordPtr nodePtr;
       c_nodes.getPtr(nodePtr, i);
       nodePtr.p->hotSpare = true;
@@ -3616,7 +3614,7 @@
   c_masterNodeId = nodeFail->masterNodeId;
 
   c_noNodesFailed += numberOfFailedNodes;
-  Uint32 theFailedNodes[NodeBitmask::Size];
+  Uint32 theFailedNodes[NdbNodeBitmask::Size];
   memcpy(theFailedNodes, nodeFail->theNodes, sizeof(theFailedNodes));
 
   c_counterMgr.execNODE_FAILREP(signal);
@@ -3626,7 +3624,11 @@
   case BS_IDLE:
     jam();
     ok = true;
-    if(c_opRecordPool.getSize() != c_opRecordPool.getNoOfFree()){
+    if(c_opRecordPool.getSize() != 
+       (c_opRecordPool.getNoOfFree() + 
+	c_opSubEvent.get_count() + c_opCreateEvent.get_count() +
+	c_opDropEvent.get_count() + c_opSignalUtil.get_count()))
+    {
       jam();
       c_blockState = BS_NODE_FAILURE;
     }
@@ -3652,7 +3654,7 @@
   
   for(unsigned i = 1; i < MAX_NDB_NODES; i++) {
     jam();
-    if(NodeBitmask::get(theFailedNodes, i)) {
+    if(NdbNodeBitmask::get(theFailedNodes, i)) {
       jam();
       NodeRecordPtr nodePtr;
       c_nodes.getPtr(nodePtr, i);
@@ -3758,6 +3760,15 @@
       break;
     }
 
+    if(getNodeState().getSingleUserMode() &&
+       (refToNode(signal->getSendersBlockRef()) !=
+        getNodeState().getSingleUserApi()))
+    {
+      jam();
+      parseRecord.errorCode = CreateTableRef::SingleUser;
+      break;
+    }
+
     CreateTableRecordPtr createTabPtr;
     c_opCreateTable.seize(createTabPtr);
     
@@ -3792,7 +3803,7 @@
     createTabPtr.p->m_dihAddFragPtr = RNIL;
 
     Uint32 key = c_opRecordSequence + 1;
-    Uint32 *theData = signal->getDataPtrSend(), i;
+    Uint32 *theData = signal->getDataPtrSend();
     Uint16 *frag_data= (Uint16*)&signal->theData[25];
     CreateFragmentationReq * const req = (CreateFragmentationReq*)theData;
     req->senderRef = reference();
@@ -3950,6 +3961,15 @@
     return;
   }
   
+  if(getNodeState().getSingleUserMode() &&
+     (refToNode(signal->getSendersBlockRef()) !=
+      getNodeState().getSingleUserApi()))
+  {
+    jam();
+    alterTableRef(signal, req, AlterTableRef::SingleUser);
+    return;
+  }
+
   const TableRecord::TabState tabState = tablePtr.p->tabState;
   bool ok = false;
   switch(tabState){
@@ -5116,7 +5136,6 @@
   packTableIntoPages(w, tabPtr);
   
   SegmentedSectionPtr spDataPtr;
-  Ptr<SectionSegment> tmpTsPtr;
   w.getPtr(spDataPtr);
   
   signal->setSection(spDataPtr, CreateTabReq::DICT_TAB_INFO);
@@ -5601,7 +5620,6 @@
   Uint32 fragCount = req->totalFragments;
   Uint32 requestInfo = req->requestInfo;
   Uint32 startGci = req->startGci;
-  Uint32 tablespace_id= req->tablespaceId;
   Uint32 logPart = req->logPartId;
 
   ndbrequire(node == getOwnNodeId());
@@ -6295,11 +6313,6 @@
   tablePtr.p->defaultNoPartFlag = c_tableDesc.DefaultNoPartFlag; 
   tablePtr.p->linearHashFlag = c_tableDesc.LinearHashFlag; 
   
-  Uint64 maxRows =
-    (((Uint64)tablePtr.p->maxRowsHigh) << 32) + tablePtr.p->maxRowsLow;
-  Uint64 minRows =
-    (((Uint64)tablePtr.p->minRowsHigh) << 32) + tablePtr.p->minRowsLow;
-
   {
     Rope frm(c_rope_pool, tablePtr.p->frmData);
     tabRequire(frm.assign(c_tableDesc.FrmData, c_tableDesc.FrmLen),
@@ -6727,6 +6740,15 @@
     return;
   }
   
+  if(getNodeState().getSingleUserMode() &&
+     (refToNode(signal->getSendersBlockRef()) !=
+      getNodeState().getSingleUserApi()))
+  {
+    jam();
+    dropTableRef(signal, req, DropTableRef::SingleUser);
+    return;
+  }
+
   const TableRecord::TabState tabState = tablePtr.p->tabState;
   bool ok = false;
   switch(tabState){
@@ -7720,7 +7742,6 @@
 Dbdict::execLIST_TABLES_REQ(Signal* signal)
 {
   jamEntry();
-  Uint32 i;
   ListTablesReq * req = (ListTablesReq*)signal->getDataPtr();
   Uint32 senderRef  = req->senderRef;
   Uint32 senderData = req->senderData;
@@ -7935,6 +7956,13 @@
         jam();
         tmperr = CreateIndxRef::Busy;
       }
+      else if(getNodeState().getSingleUserMode() &&
+              (refToNode(senderRef) !=
+               getNodeState().getSingleUserApi()))
+      {
+        jam();
+        tmperr = CreateIndxRef::SingleUser;
+      }
       if (tmperr != CreateIndxRef::NoError) {
 	releaseSections(signal);
 	OpCreateIndex opBusy;
@@ -8578,6 +8606,13 @@
         jam();
         tmperr = DropIndxRef::Busy;
       }
+      else if(getNodeState().getSingleUserMode() &&
+              (refToNode(senderRef) !=
+               getNodeState().getSingleUserApi()))
+      {
+        jam();
+        tmperr = DropIndxRef::SingleUser;
+      }
       if (tmperr != DropIndxRef::NoError) {
 	err = tmperr;
 	goto error;
@@ -9597,7 +9632,6 @@
     evntRecPtr.i = ref->getSenderData();
     ndbrequire((evntRecPtr.p = c_opCreateEvent.getPtr(evntRecPtr.i)) != NULL);
 
-    Uint32 err;
     interpretUtilPrepareErrorCode(errorCode, evntRecPtr.p->m_errorCode,
 				  evntRecPtr.p->m_errorLine);
     evntRecPtr.p->m_errorNode = reference();
@@ -10026,6 +10060,8 @@
   // Seize a Create Event record, the Coordinator will now have two seized
   // but that's ok, it's like a recursion
 
+  CRASH_INSERTION2(6009, getOwnNodeId() != c_masterNodeId);
+
   SubCreateReq * sumaReq = (SubCreateReq *)signal->getDataPtrSend();
   
   sumaReq->senderRef        = reference(); // reference to DICT
@@ -10224,9 +10260,20 @@
   }
   OpSubEventPtr subbPtr;
   Uint32 errCode = 0;
+
+  DictLockPtr loopPtr;
+  if (c_dictLockQueue.first(loopPtr) &&
+      loopPtr.p->lt->lockType == DictLockReq::NodeRestartLock)
+  {
+    jam();
+    errCode = 1405;
+    goto busy;
+  }
+
   if (!c_opSubEvent.seize(subbPtr)) {
     errCode = SubStartRef::Busy;
 busy:
+    jam();
     SubStartRef * ref = (SubStartRef *)signal->getDataPtrSend();
 
     { // fix
@@ -10284,6 +10331,8 @@
    * Participant
    */
   ndbrequire(refToBlock(origSenderRef) == DBDICT);
+
+  CRASH_INSERTION(6007);
   
   {
     SubStartReq* req = (SubStartReq*) signal->getDataPtrSend();
@@ -10323,6 +10372,7 @@
     SubStartRef* ref = (SubStartRef*) signal->getDataPtrSend();
     ref->senderRef = reference();
     ref->senderData = subbPtr.p->m_senderData;
+    ref->errorCode = err;
     sendSignal(subbPtr.p->m_senderRef, GSN_SUB_START_REF,
 	       signal, SubStartRef::SignalLength2, JBB);
     c_opSubEvent.release(subbPtr);
@@ -10385,6 +10435,7 @@
 #ifdef EVENT_PH3_DEBUG
   ndbout_c("DBDICT(Coordinator) got GSN_SUB_START_CONF = (%d)", subbPtr.i);
 #endif
+  subbPtr.p->m_sub_start_conf = *conf;
   subbPtr.p->m_reqTracker.reportConf(c_counterMgr, refToNode(senderRef));
   completeSubStartReq(signal,subbPtr.i,0);
 }
@@ -10424,6 +10475,9 @@
 #ifdef EVENT_DEBUG
   ndbout_c("SUB_START_CONF");
 #endif
+  
+  SubStartConf* conf = (SubStartConf*)signal->getDataPtrSend();
+  * conf = subbPtr.p->m_sub_start_conf;
   sendSignal(subbPtr.p->m_senderRef, GSN_SUB_START_CONF,
 	     signal, SubStartConf::SignalLength, JBB);
   c_opSubEvent.release(subbPtr);
@@ -10514,6 +10568,9 @@
   ndbout_c("SUB_STOP_REQ 2");
 #endif
   ndbrequire(refToBlock(origSenderRef) == DBDICT);
+
+  CRASH_INSERTION(6008);
+
   {
     SubStopReq* req = (SubStopReq*) signal->getDataPtrSend();
     
@@ -10542,6 +10599,7 @@
     SubStopRef* ref = (SubStopRef*) signal->getDataPtrSend();
     ref->senderRef = reference();
     ref->senderData = subbPtr.p->m_senderData;
+    ref->errorCode = err;
     sendSignal(subbPtr.p->m_senderRef, GSN_SUB_STOP_REF,
 	       signal, SubStopRef::SignalLength, JBB);
     c_opSubEvent.release(subbPtr);
@@ -10594,6 +10652,7 @@
    * Coordinator
    */
   ndbrequire(refToBlock(senderRef) == DBDICT);
+  subbPtr.p->m_sub_stop_conf = *conf;
   subbPtr.p->m_reqTracker.reportConf(c_counterMgr, refToNode(senderRef));
   completeSubStopReq(signal,subbPtr.i,0);
 }
@@ -10634,6 +10693,8 @@
 #ifdef EVENT_DEBUG
   ndbout_c("SUB_STOP_CONF");
 #endif
+  SubStopConf* conf = (SubStopConf*)signal->getDataPtrSend();
+  * conf = subbPtr.p->m_sub_stop_conf;
   sendSignal(subbPtr.p->m_senderRef, GSN_SUB_STOP_CONF,
 	     signal, SubStopConf::SignalLength, JBB);
   c_opSubEvent.release(subbPtr);
@@ -10839,6 +10900,8 @@
     subbPtr.p->m_errorCode = 0;
   }
 
+  CRASH_INSERTION2(6010, getOwnNodeId() != c_masterNodeId);
+  
   SubRemoveReq* req = (SubRemoveReq*) signal->getDataPtrSend();
   req->senderRef = reference();
   req->senderData = subbPtr.i;
@@ -10880,6 +10943,7 @@
       SubRemoveRef* ref = (SubRemoveRef*) signal->getDataPtrSend();
       ref->senderRef = reference();
       ref->senderData = subbPtr.p->m_senderData;
+      ref->errorCode = err;
       sendSignal(subbPtr.p->m_senderRef, GSN_SUB_REMOVE_REF,
 		 signal, SubRemoveRef::SignalLength, JBB);
     }
@@ -13788,8 +13852,8 @@
   static const DictLockType lt[] = {
     { DictLockReq::NodeRestartLock, BS_NODE_RESTART, "NodeRestart" }
   };
-  for (int i = 0; i < sizeof(lt)/sizeof(lt[0]); i++) {
-    if (lt[i].lockType == lockType)
+  for (unsigned int i = 0; i < sizeof(lt)/sizeof(lt[0]); i++) {
+    if ((Uint32) lt[i].lockType == lockType)
       return &lt[i];
   }
   return NULL;
@@ -13918,6 +13982,15 @@
       break;
     }
 
+    if (c_blockState != BS_IDLE)
+    {
+      /**
+       * If state is BS_NODE_FAILURE, it might be that no op is running
+       */
+      jam();
+      break;
+    }
+
     ndbrequire(c_blockState == BS_IDLE);
     lockPtr.p->locked = true;
     c_blockState = lockPtr.p->lt->blockState;
@@ -13941,7 +14014,7 @@
 
   DictLockPtr lockPtr;
   c_dictLockQueue.getPtr(lockPtr, ord->lockPtr);
-  ndbrequire(lockPtr.p->lt->lockType == ord->lockType);
+  ndbrequire((Uint32) lockPtr.p->lt->lockType == ord->lockType);
 
   if (lockPtr.p->locked) {
     jam();
@@ -15327,7 +15400,6 @@
   const Uint32 objId = req->objId;
   const Uint32 objVersion = req->objVersion;
   const Uint32 objType = req->objType;
-  const Uint32 requestInfo = req->requestInfo;
   
   DropObjRecordPtr dropObjPtr;  
   ndbrequire(c_opDropObj.seize(dropObjPtr));
@@ -15826,8 +15898,7 @@
 
 void
 Dbdict::create_fg_abort_start(Signal* signal, SchemaOp* op){
-  CreateFilegroupImplReq* req = 
-    (CreateFilegroupImplReq*)signal->getDataPtrSend();
+  (void) signal->getDataPtrSend();
 
   if (op->m_obj_ptr_i != RNIL)
   {

--- 1.44/storage/ndb/src/kernel/blocks/dbdict/Dbdict.hpp	2007-03-10 12:02:57 +01:00
+++ 1.45/storage/ndb/src/kernel/blocks/dbdict/Dbdict.hpp	2007-03-10 12:02:57 +01:00
@@ -2,8 +2,7 @@
 
    This program is free software; you can redistribute it and/or modify
    it under the terms of the GNU General Public License as published by
-   the Free Software Foundation; either version 2 of the License, or
-   (at your option) any later version.
+   the Free Software Foundation; version 2 of the License.
 
    This program is distributed in the hope that it will be useful,
    but WITHOUT ANY WARRANTY; without even the implied warranty of
@@ -52,6 +51,7 @@
 #include <signaldata/DropTrig.hpp>
 #include <signaldata/AlterTrig.hpp>
 #include <signaldata/DictLock.hpp>
+#include <signaldata/SumaImpl.hpp>
 #include "SchemaFile.hpp"
 #include <blocks/mutexes.hpp>
 #include <SafeCounter.hpp>
@@ -1131,6 +1131,7 @@
    * seize/release invokes ctor/dtor automatically.
    */
   struct OpRecordCommon {
+    OpRecordCommon() {}
     Uint32 key;         // key shared between master and slaves
     Uint32 nextHash;
     Uint32 prevHash;
@@ -1146,6 +1147,7 @@
    * Create table record
    */
   struct CreateTableRecord : OpRecordCommon {
+    CreateTableRecord() {}
     Uint32 m_senderRef;
     Uint32 m_senderData;
     Uint32 m_coordinatorRef;
@@ -1191,6 +1193,7 @@
    * Drop table record
    */
   struct DropTableRecord : OpRecordCommon {
+    DropTableRecord() {}
     DropTableReq m_request;
     
     Uint32 m_requestType;
@@ -1634,6 +1637,10 @@
     Uint32 m_senderRef;
     Uint32 m_senderData;
     Uint32 m_errorCode;
+    union {
+      SubStartConf m_sub_start_conf;
+      SubStopConf m_sub_stop_conf;
+    };
     RequestTracker m_reqTracker;
   };
   typedef Ptr<OpSubEvent> OpSubEventPtr;
@@ -1970,6 +1977,7 @@
     NodeBitmask m_nodes;
     
     Uint32 m_errorCode;
+    SchemaTransaction() {}
     void setErrorCode(Uint32 c){ if(m_errorCode == 0) m_errorCode = c;}
 
     /**
@@ -2043,10 +2051,10 @@
   KeyTable2<OpDropIndex, OpRecordUnion> c_opDropIndex;
   KeyTable2<OpAlterIndex, OpRecordUnion> c_opAlterIndex;
   KeyTable2<OpBuildIndex, OpRecordUnion> c_opBuildIndex;
-  KeyTable2<OpCreateEvent, OpRecordUnion> c_opCreateEvent;
-  KeyTable2<OpSubEvent, OpRecordUnion> c_opSubEvent;
-  KeyTable2<OpDropEvent, OpRecordUnion> c_opDropEvent;
-  KeyTable2<OpSignalUtil, OpRecordUnion> c_opSignalUtil;
+  KeyTable2C<OpCreateEvent, OpRecordUnion> c_opCreateEvent;
+  KeyTable2C<OpSubEvent, OpRecordUnion> c_opSubEvent;
+  KeyTable2C<OpDropEvent, OpRecordUnion> c_opDropEvent;
+  KeyTable2C<OpSignalUtil, OpRecordUnion> c_opSignalUtil;
   KeyTable2<OpCreateTrigger, OpRecordUnion> c_opCreateTrigger;
   KeyTable2<OpDropTrigger, OpRecordUnion> c_opDropTrigger;
   KeyTable2<OpAlterTrigger, OpRecordUnion> c_opAlterTrigger;

--- 1.5/storage/ndb/src/kernel/blocks/dbtup/AttributeOffset.hpp	2007-03-10 12:02:57 +01:00
+++ 1.6/storage/ndb/src/kernel/blocks/dbtup/AttributeOffset.hpp	2007-03-10 12:02:57 +01:00
@@ -2,8 +2,7 @@
 
    This program is free software; you can redistribute it and/or modify
    it under the terms of the GNU General Public License as published by
-   the Free Software Foundation; either version 2 of the License, or
-   (at your option) any later version.
+   the Free Software Foundation; version 2 of the License.
 
    This program is distributed in the hope that it will be useful,
    but WITHOUT ANY WARRANTY; without even the implied warranty of

--- 1.56/storage/ndb/src/kernel/blocks/dbtup/Dbtup.hpp	2007-03-10 12:02:57 +01:00
+++ 1.57/storage/ndb/src/kernel/blocks/dbtup/Dbtup.hpp	2007-03-10 12:02:57 +01:00
@@ -2,8 +2,7 @@
 
    This program is free software; you can redistribute it and/or modify
    it under the terms of the GNU General Public License as published by
-   the Free Software Foundation; either version 2 of the License, or
-   (at your option) any later version.
+   the Free Software Foundation; version 2 of the License.
 
    This program is distributed in the hope that it will be useful,
    but WITHOUT ANY WARRANTY; without even the implied warranty of
@@ -175,6 +174,7 @@
 #define ZTRY_TO_UPDATE_ERROR 888
 #define ZCALL_ERROR 890
 #define ZTEMPORARY_RESOURCE_FAILURE 891
+#define ZUNSUPPORTED_BRANCH 892
 
 #define ZSTORED_SEIZE_ATTRINBUFREC_ERROR 873 // Part of Scan
 
@@ -534,6 +534,7 @@
       return (m_key.m_file_no << 16) ^ m_key.m_page_idx;
     }
 
+    Extent_info() {}
     bool equal(const Extent_info & rec) const {
       return m_key.m_file_no == rec.m_key.m_file_no &&
 	m_key.m_page_idx == rec.m_key.m_page_idx;
@@ -642,7 +643,8 @@
   
   DLList<ScanOp>::Head m_scanList;
 
-  bool m_undo_complete;
+  enum { UC_LCP = 1, UC_CREATE = 2 };
+  Uint32 m_undo_complete;
   Uint32 m_tablespace_id;
   Uint32 m_logfile_group_id;
   Disk_alloc_info m_disk_alloc_info;
@@ -684,6 +686,7 @@
     Uint32 currentAttrinbufLen; //Used until copyAttrinfo
   };
 
+  Operationrec() {}
   bool is_first_operation() const { return prevActiveOp == RNIL;}
   bool is_last_operation() const { return nextActiveOp == RNIL;}
 
@@ -805,6 +808,7 @@
   /* WHEN THE TRIGGER IS DEACTIVATED.         */
   /* **************************************** */
 struct TupTriggerData {
+  TupTriggerData() {}
   
   /**
    * Trigger id, used by DICT/TRIX to identify the trigger
@@ -1062,6 +1066,9 @@
       ,UNDO_UPDATE = File_formats::Undofile::UNDO_TUP_UPDATE
       ,UNDO_FREE = File_formats::Undofile::UNDO_TUP_FREE
       ,UNDO_CREATE = File_formats::Undofile::UNDO_TUP_CREATE
+      ,UNDO_DROP = File_formats::Undofile::UNDO_TUP_DROP
+      ,UNDO_ALLOC_EXTENT = File_formats::Undofile::UNDO_TUP_ALLOC_EXTENT
+      ,UNDO_FREE_EXTENT = File_formats::Undofile::UNDO_TUP_FREE_EXTENT
     };
     
     struct Alloc 
@@ -1094,6 +1101,30 @@
       Uint32 m_table;
       Uint32 m_type_length; // 16 bit type, 16 bit length
     };
+
+    struct Drop
+    {
+      Uint32 m_table;
+      Uint32 m_type_length; // 16 bit type, 16 bit length
+    };
+
+    struct AllocExtent
+    {
+      Uint32 m_table;
+      Uint32 m_fragment;
+      Uint32 m_page_no;
+      Uint32 m_file_no;
+      Uint32 m_type_length;
+    };
+
+    struct FreeExtent
+    {
+      Uint32 m_table;
+      Uint32 m_fragment;
+      Uint32 m_page_no;
+      Uint32 m_file_no;
+      Uint32 m_type_length;
+    };
   };
   
   Extent_info_pool c_extent_pool;
@@ -1294,6 +1325,7 @@
     STATIC_CONST( LCP_KEEP    = 0x02000000 ); // Should be returned in LCP
     STATIC_CONST( FREE        = 0x02800000 ); // Is free
     
+    Tuple_header() {}
     Uint32 get_tuple_version() const { 
       return m_header_bits & TUP_VERSION_MASK;
     }
@@ -1531,7 +1563,7 @@
   int nr_delete(Signal*, Uint32, Uint32 fragPtr, const Local_key*, Uint32 gci);
 
   void nr_delete_page_callback(Signal*, Uint32 op, Uint32 page);
-  void nr_delete_logbuffer_callback(Signal*, Uint32 op, Uint32 page);
+  void nr_delete_log_buffer_callback(Signal*, Uint32 op, Uint32 page);
 private:
   BLOCK_DEFINES(Dbtup);
 
@@ -1557,7 +1589,6 @@
   void execTUP_ABORTREQ(Signal* signal);
   void execNDB_STTOR(Signal* signal);
   void execREAD_CONFIG_REQ(Signal* signal);
-  void execSET_VAR_REQ(Signal* signal);
   void execDROP_TAB_REQ(Signal* signal);
   void execALTER_TAB_REQ(Signal* signal);
   void execTUP_DEALLOCREQ(Signal* signal);
@@ -1819,7 +1850,8 @@
                       Operationrec* regOperPtr,
                       Fragrecord* regFragPtr,
                       Tablerec* regTabPtr,
-                      KeyReqStruct* req_struct);
+                      KeyReqStruct* req_struct,
+		      bool disk);
 
 //------------------------------------------------------------------
 //------------------------------------------------------------------
@@ -2701,9 +2733,10 @@
                         Uint32 fragId);
 
 
-  void releaseFragment(Signal* signal, Uint32 tableId);
+  void releaseFragment(Signal* signal, Uint32 tableId, Uint32);
   void drop_fragment_free_var_pages(Signal*);
-  void drop_fragment_free_exent(Signal*, TablerecPtr, FragrecordPtr, Uint32);
+  void drop_fragment_free_extent(Signal*, TablerecPtr, FragrecordPtr, Uint32);
+  void drop_fragment_free_extent_log_buffer_callback(Signal*, Uint32, Uint32);
   void drop_fragment_unmap_pages(Signal*, TablerecPtr, FragrecordPtr, Uint32);
   void drop_fragment_unmap_page_callback(Signal* signal, Uint32, Uint32);
   
@@ -2766,15 +2799,8 @@
   void returnCommonArea(Uint32 retPageRef, Uint32 retNo);
   void initializePage();
 
-// Private methods
-  void removeCommonArea(Uint32 remPageRef, Uint32 list);
-  void insertCommonArea(Uint32 insPageRef, Uint32 list);
-  void findFreeLeftNeighbours(Uint32& allocPageRef, Uint32& noPagesAllocated,
Uint32 noPagesToAllocate);
-  void findFreeRightNeighbours(Uint32& allocPageRef, Uint32& noPagesAllocated,
Uint32 noPagesToAllocate);
   Uint32 nextHigherTwoLog(Uint32 input);
 
-// Private data
-  Uint32 cfreepageList[16];
 
 //------------------------------------------------------------------------------------------------------
 // Page Mapper, convert logical page id's to physical page id's
@@ -2900,7 +2926,8 @@
 
   ArrayPool<Page> c_page_pool;
   Uint32 cnoOfAllocatedPages;
-  
+  Uint32 c_no_of_pages;
+
   Tablerec *tablerec;
   Uint32 cnoOfTablerec;
 
@@ -2935,6 +2962,7 @@
 
   // Trigger variables
   Uint32 c_maxTriggersPerTable;
+  Uint32 c_memusage_report_frequency;
 
   Uint32 c_errorInsert4000TableId;
   Uint32 c_min_list_size[MAX_FREE_LIST + 1];
@@ -2998,6 +3026,9 @@
   void disk_page_commit_callback(Signal*, Uint32 opPtrI, Uint32 page_id);  
   
   void disk_page_log_buffer_callback(Signal*, Uint32 opPtrI, Uint32); 
+
+  void disk_page_alloc_extent_log_buffer_callback(Signal*, Uint32, Uint32);
+  void disk_page_free_extent_log_buffer_callback(Signal*, Uint32, Uint32);
   
   Uint64 disk_page_undo_alloc(Page*, const Local_key*,
 			      Uint32 sz, Uint32 gci, Uint32 logfile_group_id);
@@ -3013,6 +3044,9 @@
   void undo_createtable_callback(Signal* signal, Uint32 opPtrI, Uint32 unused);
   void undo_createtable_logsync_callback(Signal* signal, Uint32, Uint32);
 
+  void drop_table_log_buffer_callback(Signal*, Uint32, Uint32);
+  void drop_table_logsync_callback(Signal*, Uint32, Uint32);
+
   void disk_page_set_dirty(Ptr<Page>);
   void restart_setup_page(Disk_alloc_info&, Ptr<Page>);
   void update_extent_pos(Disk_alloc_info&, Ptr<Extent_info>);
@@ -3043,10 +3077,12 @@
     Ptr<Extent_info> m_extent_ptr;
     Local_key m_key;
   };
+
+  void disk_restart_mark_no_lcp(Uint32 table, Uint32 frag);
   
 private:
   void disk_restart_undo_next(Signal*);
-  void disk_restart_undo_lcp(Uint32, Uint32);
+  void disk_restart_undo_lcp(Uint32, Uint32, Uint32 flag);
   void disk_restart_undo_callback(Signal* signal, Uint32, Uint32);
   void disk_restart_undo_alloc(Apply_undo*);
   void disk_restart_undo_update(Apply_undo*);

--- 1.13/storage/ndb/src/kernel/blocks/dbtup/DbtupAbort.cpp	2007-03-10 12:02:57 +01:00
+++ 1.14/storage/ndb/src/kernel/blocks/dbtup/DbtupAbort.cpp	2007-03-10 12:02:57 +01:00
@@ -2,8 +2,7 @@
 
    This program is free software; you can redistribute it and/or modify
    it under the terms of the GNU General Public License as published by
-   the Free Software Foundation; either version 2 of the License, or
-   (at your option) any later version.
+   the Free Software Foundation; version 2 of the License.
 
    This program is distributed in the hope that it will be useful,
    but WITHOUT ANY WARRANTY; without even the implied warranty of
@@ -146,7 +145,7 @@
     {
       if(bits & Tuple_header::MM_GROWN)
       {
-	// ndbout_c("abort grow");
+	if (0) ndbout_c("abort grow");
 	Ptr<Page> vpage;
 	Uint32 idx= regOperPtr.p->m_tuple_location.m_page_idx;
 	Uint32 mm_vars= regTabPtr.p->m_attributes[MM].m_no_of_varsize;
@@ -177,7 +176,7 @@
       } 
       else if(bits & Tuple_header::MM_SHRINK)
       {
-	// ndbout_c("abort shrink");
+	if (0) ndbout_c("abort shrink");
       }
     }
     else if (regOperPtr.p->is_first_operation() && 
@@ -327,6 +326,10 @@
     } else {
       ndbrequire(false);
     }//if
+    break;
+  case 40:
+    ljam();
+    terrorCode = ZUNSUPPORTED_BRANCH;
     break;
   default:
     ndbrequire(false);

--- 1.18/storage/ndb/src/kernel/blocks/dbtup/DbtupCommit.cpp	2007-03-10 12:02:57 +01:00
+++ 1.19/storage/ndb/src/kernel/blocks/dbtup/DbtupCommit.cpp	2007-03-10 12:02:57 +01:00
@@ -2,8 +2,7 @@
 
    This program is free software; you can redistribute it and/or modify
    it under the terms of the GNU General Public License as published by
-   the Free Software Foundation; either version 2 of the License, or
-   (at your option) any later version.
+   the Free Software Foundation; version 2 of the License.
 
    This program is distributed in the hope that it will be useful,
    but WITHOUT ANY WARRANTY; without even the implied warranty of
@@ -43,7 +42,7 @@
   getFragmentrec(regFragPtr, frag_id, regTabPtr.p);
   ndbassert(regFragPtr.p != NULL);
   
-  if (! (((frag_page_id << MAX_TUPLES_BITS) + page_index) == ~0))
+  if (! (((frag_page_id << MAX_TUPLES_BITS) + page_index) == ~ (Uint32) 0))
   {
     Local_key tmp;
     tmp.m_page_no= getRealpid(regFragPtr.p, frag_page_id); 
@@ -153,10 +152,10 @@
 static
 inline
 bool
-operator>=(const Local_key& key1, const Local_key& key2)
+operator>(const Local_key& key1, const Local_key& key2)
 {
   return key1.m_page_no > key2.m_page_no ||
-    (key1.m_page_no == key2.m_page_no && key1.m_page_idx >= key2.m_page_idx);
+    (key1.m_page_no == key2.m_page_no && key1.m_page_idx > key2.m_page_idx);
 }
 
 void
@@ -177,8 +176,11 @@
   {
     Local_key disk;
     memcpy(&disk, ptr->get_disk_ref_ptr(regTabPtr), sizeof(disk));
+    PagePtr tmpptr;
+    tmpptr.i = m_pgman.m_ptr.i;
+    tmpptr.p = reinterpret_cast<Page*>(m_pgman.m_ptr.p);
     disk_page_free(signal, regTabPtr, regFragPtr, 
-		   &disk, *(PagePtr*)&m_pgman.m_ptr, gci);
+		   &disk, tmpptr, gci);
   }
   
   if (! (bits & Tuple_header::LCP_SKIP) && lcpScan_ptr_i != RNIL)
@@ -188,7 +190,7 @@
     Local_key rowid = regOperPtr->m_tuple_location;
     Local_key scanpos = scanOp.p->m_scanPos.m_key;
     rowid.m_page_no = page->frag_page_id;
-    if (rowid >= scanpos)
+    if (rowid > scanpos)
     {
       extra_bits = Tuple_header::LCP_KEEP; // Note REMOVE FREE
       ptr->m_operation_ptr_i = lcp_keep_list;
@@ -233,6 +235,7 @@
 {
   ndbassert(regOperPtr->op_struct.op_type != ZDELETE);
   
+  Uint32 lcpScan_ptr_i= regFragPtr->m_lcp_scan_op;
   Uint32 save= tuple_ptr->m_operation_ptr_i;
   Uint32 bits= tuple_ptr->m_header_bits;
 
@@ -288,7 +291,6 @@
     Local_key key;
     memcpy(&key, copy->get_disk_ref_ptr(regTabPtr), sizeof(Local_key));
     Uint32 logfile_group_id= regFragPtr->m_logfile_group_id;
-    Uint32 lcpScan_ptr_i= regFragPtr->m_lcp_scan_op;
 
     PagePtr diskPagePtr = *(PagePtr*)&m_pgman.m_ptr;
     ndbassert(diskPagePtr.p->m_page_no == key.m_page_no);
@@ -297,19 +299,6 @@
     if(copy_bits & Tuple_header::DISK_ALLOC)
     {
       disk_page_alloc(signal, regTabPtr, regFragPtr, &key, diskPagePtr, gci);
-
-      if(lcpScan_ptr_i != RNIL)
-      {
-	ScanOpPtr scanOp;
-	c_scanOpPool.getPtr(scanOp, lcpScan_ptr_i);
-	Local_key rowid = regOperPtr->m_tuple_location;
-	Local_key scanpos = scanOp.p->m_scanPos.m_key;
-	rowid.m_page_no = pagePtr.p->frag_page_id;
-	if(rowid >= scanpos)
-	{
-	  copy_bits |= Tuple_header::LCP_SKIP;
-	}
-      }
     }
     
     if(regTabPtr->m_attributes[DD].m_no_of_varsize == 0)
@@ -336,6 +325,18 @@
     copy_bits |= Tuple_header::DISK_PART;
   }
   
+  if(lcpScan_ptr_i != RNIL && (bits & Tuple_header::ALLOC))
+  {
+    ScanOpPtr scanOp;
+    c_scanOpPool.getPtr(scanOp, lcpScan_ptr_i);
+    Local_key rowid = regOperPtr->m_tuple_location;
+    Local_key scanpos = scanOp.p->m_scanPos.m_key;
+    rowid.m_page_no = pagePtr.p->frag_page_id;
+    if(rowid > scanpos)
+    {
+       copy_bits |= Tuple_header::LCP_SKIP;
+    }
+  }
   
   Uint32 clear= 
     Tuple_header::ALLOC | Tuple_header::FREE |
@@ -381,7 +382,12 @@
   regOperPtr.p->m_commit_disk_callback_page= page_id;
   m_global_page_pool.getPtr(m_pgman.m_ptr, page_id);
   
-  disk_page_set_dirty(*(Ptr<Page>*)&m_pgman.m_ptr);
+  {
+    PagePtr tmp;
+    tmp.i = m_pgman.m_ptr.i;
+    tmp.p = reinterpret_cast<Page*>(m_pgman.m_ptr.p);
+    disk_page_set_dirty(tmp);
+  }
   
   execTUP_COMMITREQ(signal);
   if(signal->theData[0] == 0)
@@ -532,7 +538,7 @@
 	
 	c_lgman->free_log_space(regFragPtr.p->m_logfile_group_id, 
 				regOperPtr.p->m_undo_buffer_space);
-	ndbout_c("insert+delete");
+	if (0) ndbout_c("insert+delete");
 	goto skip_disk;
       }
     } 
@@ -568,7 +574,14 @@
       break;
     }
     get_page = true;
-    disk_page_set_dirty(*(Ptr<Page>*)&m_pgman.m_ptr);
+
+    {
+      PagePtr tmpptr;
+      tmpptr.i = m_pgman.m_ptr.i;
+      tmpptr.p = reinterpret_cast<Page*>(m_pgman.m_ptr.p);
+      disk_page_set_dirty(tmpptr);
+    }
+    
     regOperPtr.p->m_commit_disk_callback_page= res;
     regOperPtr.p->op_struct.m_load_diskpage_on_commit= 0;
   } 

--- 1.49/storage/ndb/src/kernel/blocks/dbtup/DbtupExecQuery.cpp	2007-03-10 12:02:57 +01:00
+++ 1.50/storage/ndb/src/kernel/blocks/dbtup/DbtupExecQuery.cpp	2007-03-10 12:02:57 +01:00
@@ -2,8 +2,7 @@
 
    This program is free software; you can redistribute it and/or modify
    it under the terms of the GNU General Public License as published by
-   the Free Software Foundation; either version 2 of the License, or
-   (at your option) any later version.
+   the Free Software Foundation; version 2 of the License.
 
    This program is distributed in the hope that it will be useful,
    but WITHOUT ANY WARRANTY; without even the implied warranty of
@@ -478,7 +477,6 @@
   Tuple_header* ptr= (Tuple_header*)tmp;
   
   int res= 1;
-  Uint32 opPtr= ptr->m_operation_ptr_i;
   if(ptr->m_header_bits & Tuple_header::DISK_PART)
   {
     Page_cache_client::Request req;
@@ -563,7 +561,6 @@
   Tuple_header* ptr= (Tuple_header*)tmp;
   
   int res= 1;
-  Uint32 opPtr= ptr->m_operation_ptr_i;
   if(ptr->m_header_bits & Tuple_header::DISK_PART)
   {
     Page_cache_client::Request req;
@@ -710,7 +707,7 @@
    copyAttrinfo(regOperPtr, &cinBuffer[0]);
    
    Uint32 localkey = (pageid << MAX_TUPLES_BITS) + pageidx;
-   if(Roptype == ZINSERT && localkey == ~0)
+   if (Roptype == ZINSERT && localkey == ~ (Uint32) 0)
    {
      // No tuple allocatated yet
      goto do_insert;
@@ -841,7 +838,9 @@
      {
        jam();
        if (handleDeleteReq(signal, regOperPtr,
-			   regFragPtr, regTabPtr, &req_struct) == -1) {
+			   regFragPtr, regTabPtr, 
+			   &req_struct,
+			   disk_page != RNIL) == -1) {
 	 return;
        }
        /*
@@ -1530,7 +1529,8 @@
                            Operationrec* regOperPtr,
                            Fragrecord* regFragPtr,
                            Tablerec* regTabPtr,
-                           KeyReqStruct *req_struct)
+                           KeyReqStruct *req_struct,
+			   bool disk)
 {
   // delete must set but not increment tupVersion
   if (!regOperPtr->is_first_operation())
@@ -1582,8 +1582,11 @@
   {
     return 0;
   }
-  
-  return handleReadReq(signal, regOperPtr, regTabPtr, req_struct);
+
+  if (setup_read(req_struct, regOperPtr, regFragPtr, regTabPtr, disk))
+  {
+    return handleReadReq(signal, regOperPtr, regTabPtr, req_struct);
+  }
 
 error:
   tupkeyErrorLab(signal);
@@ -2316,6 +2319,11 @@
             // NULL==NULL and NULL<not-NULL
             res1 = r1_null && r2_null ? 0 : r1_null ? -1 : 1;
           } else {
+	    jam();
+	    if (unlikely(sqlType.m_cmp == 0))
+	    {
+	      return TUPKEY_abort(signal, 40);
+	    }
             res1 = (*sqlType.m_cmp)(cs, s1, attrLen, s2, argLen, true);
           }
 	} else {
@@ -2323,6 +2331,11 @@
             // NULL like NULL is true (has no practical use)
             res1 =  r1_null && r2_null ? 0 : -1;
           } else {
+	    jam();
+	    if (unlikely(sqlType.m_like == 0))
+	    {
+	      return TUPKEY_abort(signal, 40);
+	    }
             res1 = (*sqlType.m_like)(cs, s1, attrLen, s2, argLen);
           }
         }
@@ -3316,7 +3329,7 @@
     if(needed <= alloc)
     {
       //ndbassert(!regOperPtr->is_first_operation());
-      ndbout_c(" no grow");
+      if (0) ndbout_c(" no grow");
       return 0;
     }
     Uint32 *new_var_part=realloc_var_part(regFragPtr, regTabPtr, pagePtr, 
@@ -3386,7 +3399,6 @@
   ptrCheckGuard(tablePtr, cnoOfTablerec, tablerec);
 
   Local_key tmp = *key;
-  Uint32 pages = fragPtr.p->noOfPages;
   
   int ret;
   PagePtr page_ptr;
@@ -3596,7 +3608,7 @@
     disk_page_set_dirty(disk_page);
 
     preq.m_callback.m_callbackFunction =
-      safe_cast(&Dbtup::nr_delete_logbuffer_callback);      
+      safe_cast(&Dbtup::nr_delete_log_buffer_callback);      
     Logfile_client lgman(this, c_lgman, fragPtr.p->m_logfile_group_id);
     res= lgman.get_log_buffer(signal, sz, &preq.m_callback);
     switch(res){
@@ -3608,7 +3620,7 @@
       break;
     }
 
-    ndbout << "DIRECT DISK DELETE: " << disk << endl;
+    if (0) ndbout << "DIRECT DISK DELETE: " << disk << endl;
     disk_page_free(signal, tablePtr.p, fragPtr.p,
 		   &disk, *(PagePtr*)&disk_page, gci);
     return 0;
@@ -3649,7 +3661,7 @@
   Callback cb;
   cb.m_callbackData = userpointer;
   cb.m_callbackFunction =
-    safe_cast(&Dbtup::nr_delete_logbuffer_callback);      
+    safe_cast(&Dbtup::nr_delete_log_buffer_callback);      
   Logfile_client lgman(this, c_lgman, fragPtr.p->m_logfile_group_id);
   int res= lgman.get_log_buffer(signal, sz, &cb);
   switch(res){
@@ -3660,7 +3672,7 @@
     break;
   }
     
-  ndbout << "PAGE CALLBACK DISK DELETE: " << op.m_disk_ref << endl;
+  if (0) ndbout << "PAGE CALLBACK DISK DELETE: " << op.m_disk_ref <<
endl;
   disk_page_free(signal, tablePtr.p, fragPtr.p,
 		 &op.m_disk_ref, pagePtr, op.m_gci);
   
@@ -3669,7 +3681,7 @@
 }
 
 void
-Dbtup::nr_delete_logbuffer_callback(Signal* signal, 
+Dbtup::nr_delete_log_buffer_callback(Signal* signal, 
 				    Uint32 userpointer, 
 				    Uint32 unused)
 {
@@ -3692,7 +3704,7 @@
   /**
    * reset page no
    */
-  ndbout << "LOGBUFFER CALLBACK DISK DELETE: " << op.m_disk_ref <<
endl;
+  if (0) ndbout << "LOGBUFFER CALLBACK DISK DELETE: " << op.m_disk_ref
<< endl;
   
   disk_page_free(signal, tablePtr.p, fragPtr.p,
 		 &op.m_disk_ref, pagePtr, op.m_gci);

--- 1.32/storage/ndb/src/kernel/blocks/dbtup/DbtupGen.cpp	2007-03-10 12:02:57 +01:00
+++ 1.33/storage/ndb/src/kernel/blocks/dbtup/DbtupGen.cpp	2007-03-10 12:02:57 +01:00
@@ -2,8 +2,7 @@
 
    This program is free software; you can redistribute it and/or modify
    it under the terms of the GNU General Public License as published by
-   the Free Software Foundation; either version 2 of the License, or
-   (at your option) any later version.
+   the Free Software Foundation; version 2 of the License.
 
    This program is distributed in the hope that it will be useful,
    but WITHOUT ANY WARRANTY; without even the implied warranty of
@@ -59,7 +58,7 @@
     c_extent_hash(c_extent_pool),
     c_storedProcPool(),
     c_buildIndexList(c_buildIndexPool),
-    c_undo_buffer(this)
+    c_undo_buffer(&ctx.m_mm)
 {
   BLOCK_CONSTRUCTOR(Dbtup);
 
@@ -83,7 +82,6 @@
   addRecSignal(GSN_TUP_ABORTREQ, &Dbtup::execTUP_ABORTREQ);
   addRecSignal(GSN_NDB_STTOR, &Dbtup::execNDB_STTOR);
   addRecSignal(GSN_READ_CONFIG_REQ, &Dbtup::execREAD_CONFIG_REQ, true);
-  addRecSignal(GSN_SET_VAR_REQ,  &Dbtup::execSET_VAR_REQ);
 
   // Trigger Signals
   addRecSignal(GSN_CREATE_TRIG_REQ, &Dbtup::execCREATE_TRIG_REQ);
@@ -115,6 +113,7 @@
   tableDescriptor = 0;
   totNoOfPagesAllocated = 0;
   cnoOfAllocatedPages = 0;
+  c_no_of_pages = 0;
   
   initData();
 }//Dbtup::Dbtup()
@@ -122,6 +121,7 @@
 Dbtup::~Dbtup() 
 {
   // Records with dynamic sizes
+  c_page_pool.clear();
   deallocRecord((void **)&attrbufrec,"Attrbufrec", 
 		sizeof(Attrbufrec), 
 		cnoOfAttrbufrec);
@@ -171,12 +171,13 @@
     break;
   case ZREL_FRAG:
     ljam();
-    releaseFragment(signal, dataPtr);
+    releaseFragment(signal, dataPtr, signal->theData[2]);
     break;
   case ZREPORT_MEMORY_USAGE:{
     ljam();
     static int c_currentMemUsed = 0;
-    Uint32 tmp = c_page_pool.getSize();
+    Uint32 cnt = signal->theData[1];
+    Uint32 tmp = c_no_of_pages;
     int now = tmp ? (cnoOfAllocatedPages * 100)/tmp : 0;
     const int thresholds[] = { 100, 90, 80, 0 };
     
@@ -189,12 +190,22 @@
       }
     }
 
-    if(now != c_currentMemUsed){
-      reportMemoryUsage(signal, now > c_currentMemUsed ? 1 : -1);
+    if(now != c_currentMemUsed || 
+       (c_memusage_report_frequency && cnt + 1 == c_memusage_report_frequency))
+    {
+      reportMemoryUsage(signal, 
+			now > c_currentMemUsed ? 1 : 
+			now < c_currentMemUsed ? -1 : 0);
+      cnt = 0;
       c_currentMemUsed = now;
+    } 
+    else
+    {
+      cnt++;
     }
     signal->theData[0] = ZREPORT_MEMORY_USAGE;
-    sendSignalWithDelay(reference(), GSN_CONTINUEB, signal, 2000, 1);    
+    signal->theData[1] = cnt;
+    sendSignalWithDelay(reference(), GSN_CONTINUEB, signal, 1000, 2);    
     return;
   }
   case ZBUILD_INDEX:
@@ -219,7 +230,7 @@
     fragPtr.i= signal->theData[2];
     ptrCheckGuard(tabPtr, cnoOfTablerec, tablerec);
     ptrCheckGuard(fragPtr, cnoOfFragrec, fragrecord);
-    drop_fragment_free_exent(signal, tabPtr, fragPtr, signal->theData[3]);
+    drop_fragment_free_extent(signal, tabPtr, fragPtr, signal->theData[3]);
     return;
   }
   case ZUNMAP_PAGES:
@@ -318,7 +329,7 @@
 
   c_storedProcPool.setSize(noOfStoredProc);
   c_buildIndexPool.setSize(c_noOfBuildIndexRec);
-  c_triggerPool.setSize(noOfTriggers);
+  c_triggerPool.setSize(noOfTriggers, false, true, true, CFG_DB_NO_TRIGGERS);
 
   c_extent_hash.setSize(1024); // 4k
   
@@ -336,6 +347,7 @@
 
   ScanOpPtr lcp;
   ndbrequire(c_scanOpPool.seize(lcp));
+  new (lcp.p) ScanOp();
   c_lcp_scan_op= lcp.i;
 
   czero = 0;
@@ -343,6 +355,10 @@
   clastBitMask = 1;
   clastBitMask = clastBitMask << 31;
 
+  c_memusage_report_frequency = 0;
+  ndb_mgm_get_int_parameter(p, CFG_DB_MEMREPORT_FREQUENCY, 
+			    &c_memusage_report_frequency);
+  
   initialiseRecordsLab(signal, 0, ref, senderData);
 }//Dbtup::execSIZEALT_REP()
 
@@ -350,6 +366,7 @@
 {
   unsigned i;
   Uint32 tmp;
+  Uint32 tmp1 = 0;
   const ndb_mgm_configuration_iterator * p = 
     m_ctx.m_config.getOwnConfigIterator();
   ndbrequire(p != 0);
@@ -357,9 +374,10 @@
   ndbrequire(!ndb_mgm_get_int_parameter(p, CFG_TUP_PAGE, &tmp));
 
   // Records with dynamic sizes
-  Page* ptr =(Page*)allocRecord("Page", sizeof(Page), tmp, false);
-  c_page_pool.set(ptr, tmp);
-  
+  void* ptr = m_ctx.m_mm.get_memroot();
+  c_page_pool.set((Page*)ptr, (Uint32)~0);
+  c_no_of_pages = tmp;
+
   attrbufrec = (Attrbufrec*)allocRecord("Attrbufrec", 
 					sizeof(Attrbufrec), 
 					cnoOfAttrbufrec);
@@ -385,7 +403,9 @@
 						  cnoOfTabDescrRec);
 
   ndbrequire(!ndb_mgm_get_int_parameter(p, CFG_TUP_OP_RECS, &tmp));
-  c_operation_pool.setSize(tmp);
+  ndb_mgm_get_int_parameter(p, CFG_DB_NO_LOCAL_OPS, &tmp1);
+  c_operation_pool.setSize(tmp, false, true, true, 
+      tmp1 == 0 ? CFG_DB_NO_OPS : CFG_DB_NO_LOCAL_OPS);
   
   pageRange = (PageRange*)allocRecord("PageRange",
 				      sizeof(PageRange), 
@@ -510,7 +530,8 @@
 /*       RESTART.                        */
 /*****************************************/
     signal->theData[0] = ZREPORT_MEMORY_USAGE;
-    sendSignalWithDelay(reference(), GSN_CONTINUEB, signal, 2000, 1);    
+    signal->theData[1] = 0;
+    sendSignalWithDelay(reference(), GSN_CONTINUEB, signal, 1000, 1);    
     break;
   default:
     ljam();
@@ -734,33 +755,6 @@
   regFragPtr.p->nextfreefrag = cfirstfreefrag;
   cfirstfreefrag = regFragPtr.i;
 }//Dbtup::releaseFragrec()
-
-void Dbtup::execSET_VAR_REQ(Signal* signal) 
-{
-#if 0
-  SetVarReq* const setVarReq = (SetVarReq*)signal->getDataPtrSend();
-  ConfigParamId var = setVarReq->variable();
-  int val = setVarReq->value();
-
-  switch (var) {
-
-  case NoOfDiskPagesToDiskAfterRestartTUP:
-    clblPagesPerTick = val;
-    sendSignal(CMVMI_REF, GSN_SET_VAR_CONF, signal, 1, JBB);
-    break;
-
-  case NoOfDiskPagesToDiskDuringRestartTUP:
-    // Valid only during start so value not set.
-    sendSignal(CMVMI_REF, GSN_SET_VAR_CONF, signal, 1, JBB);
-    break;
-
-  default:
-    sendSignal(CMVMI_REF, GSN_SET_VAR_REF, signal, 1, JBB);
-  } // switch
-#endif
-
-}//execSET_VAR_REQ()
-
 
 
 

--- 1.21/storage/ndb/src/kernel/blocks/dbtup/DbtupIndex.cpp	2007-03-10 12:02:57 +01:00
+++ 1.22/storage/ndb/src/kernel/blocks/dbtup/DbtupIndex.cpp	2007-03-10 12:02:57 +01:00
@@ -2,8 +2,7 @@
 
    This program is free software; you can redistribute it and/or modify
    it under the terms of the GNU General Public License as published by
-   the Free Software Foundation; either version 2 of the License, or
-   (at your option) any later version.
+   the Free Software Foundation; version 2 of the License.
 
    This program is distributed in the hope that it will be useful,
    but WITHOUT ANY WARRANTY; without even the implied warranty of

--- 1.35/storage/ndb/src/kernel/blocks/dbtup/DbtupMeta.cpp	2007-03-10 12:02:57 +01:00
+++ 1.36/storage/ndb/src/kernel/blocks/dbtup/DbtupMeta.cpp	2007-03-10 12:02:57 +01:00
@@ -2,8 +2,7 @@
 
    This program is free software; you can redistribute it and/or modify
    it under the terms of the GNU General Public License as published by
-   the Free Software Foundation; either version 2 of the License, or
-   (at your option) any later version.
+   the Free Software Foundation; version 2 of the License.
 
    This program is distributed in the hope that it will be useful,
    but WITHOUT ANY WARRANTY; without even the implied warranty of
@@ -55,7 +54,6 @@
   regTabPtr.i           = tupFragReq->tableId;
   Uint32 noOfAttributes = tupFragReq->noOfAttr;
   Uint32 fragId         = tupFragReq->fragId;
-  Uint32 noOfNullAttr = tupFragReq->noOfNullAttr;
   /*  Uint32 schemaVersion = tupFragReq->schemaVersion;*/
   Uint32 noOfKeyAttr = tupFragReq->noOfKeyAttr;
   Uint32 noOfCharsets = tupFragReq->noOfCharsets;
@@ -327,6 +325,7 @@
     
     if(lastAttr)
     {
+      jam();
       /**
        * Init Disk_alloc_info
        */
@@ -338,6 +337,11 @@
 	ndbrequire(tsman.get_tablespace_info(&rep) == 0);
 	regFragPtr.p->m_logfile_group_id= rep.tablespace.logfile_group_id;
       }
+      else
+      {
+	jam();
+	regFragPtr.p->m_logfile_group_id = RNIL;
+      }
       new (&regFragPtr.p->m_disk_alloc_info)
 	Disk_alloc_info(regTabPtr.p, rep.tablespace.extent_size);
       releaseFragoperrec(fragOperPtr);      
@@ -541,7 +545,12 @@
 			    regFragPtr.p->m_tablespace_id);
     ndbrequire(tsman.get_tablespace_info(&rep) == 0);
     regFragPtr.p->m_logfile_group_id= rep.tablespace.logfile_group_id;
-  }    
+  }
+  else
+  {
+    jam();
+    regFragPtr.p->m_logfile_group_id = RNIL;
+  }
 
   new (&regFragPtr.p->m_disk_alloc_info)
     Disk_alloc_info(regTabPtr.p, rep.tablespace.extent_size); 
@@ -561,8 +570,8 @@
       Uint32 sz= sizeof(Disk_undo::Create) >> 2;
       
       Logfile_client lgman(this, c_lgman, regFragPtr.p->m_logfile_group_id);
-      int r0 = c_lgman->alloc_log_space(regFragPtr.p->m_logfile_group_id,
-					sz);
+      (void)  c_lgman->alloc_log_space(regFragPtr.p->m_logfile_group_id,
+                                       sz);
       
       int res= lgman.get_log_buffer(signal, sz, &cb);
       switch(res){
@@ -574,7 +583,7 @@
 	ndbrequire("NOT YET IMPLEMENTED" == 0);
 	break;
       }
-      execute(signal, cb, 0);
+      execute(signal, cb, regFragPtr.p->m_logfile_group_id);
       return;
     }
   }
@@ -1428,7 +1437,8 @@
 
   signal->theData[0]= ZREL_FRAG;
   signal->theData[1]= tabPtr.i;
-  sendSignal(cownref, GSN_CONTINUEB, signal, 2, JBB);
+  signal->theData[2]= RNIL;
+  sendSignal(cownref, GSN_CONTINUEB, signal, 3, JBB);
 }
 
 void Dbtup::releaseTabDescr(Tablerec* const regTabPtr) 
@@ -1474,7 +1484,8 @@
   }
 }
 
-void Dbtup::releaseFragment(Signal* signal, Uint32 tableId)
+void Dbtup::releaseFragment(Signal* signal, Uint32 tableId, 
+			    Uint32 logfile_group_id)
 {
   TablerecPtr tabPtr;
   tabPtr.i= tableId;
@@ -1501,16 +1512,35 @@
     sendSignal(cownref, GSN_CONTINUEB, signal, 4, JBB);  
     return;
   }
+
+#if NOT_YET_UNDO_DROP_TABLE
+#error "This code is complete, but I prefer not to enable it until I need it"
+  if (logfile_group_id != RNIL)
+  {
+    Callback cb;
+    cb.m_callbackData= tabPtr.i;
+    cb.m_callbackFunction = 
+      safe_cast(&Dbtup::drop_table_log_buffer_callback);
+    Uint32 sz= sizeof(Disk_undo::Drop) >> 2;
+    (void) c_lgman->alloc_log_space(logfile_group_id, sz);
+    
+    Logfile_client lgman(this, c_lgman, logfile_group_id);
+    int res= lgman.get_log_buffer(signal, sz, &cb);
+    switch(res){
+    case 0:
+      ljam();
+      return;
+    case -1:
+      ndbrequire("NOT YET IMPLEMENTED" == 0);
+      break;
+    default:
+      execute(signal, cb, logfile_group_id);
+      return;
+    }
+  }
+#endif
   
-  DropTabConf * const dropConf= (DropTabConf *)signal->getDataPtrSend();
-  dropConf->senderRef= reference();
-  dropConf->senderData= tabPtr.p->m_dropTable.tabUserPtr;
-  dropConf->tableId= tabPtr.i;
-  sendSignal(tabPtr.p->m_dropTable.tabUserRef, GSN_DROP_TAB_CONF,
-             signal, DropTabConf::SignalLength, JBB);
-  
-  releaseTabDescr(tabPtr.p);
-  initTab(tabPtr.p);
+  drop_table_logsync_callback(signal, tabPtr.i, RNIL);
 }
 
 void
@@ -1537,7 +1567,7 @@
 	alloc_info.m_curr_extent_info_ptr_i= RNIL;
       }
       
-      drop_fragment_free_exent(signal, tabPtr, fragPtr, 0);
+      drop_fragment_free_extent(signal, tabPtr, fragPtr, 0);
       return;
     }
     
@@ -1570,7 +1600,7 @@
     }
     return;
   }
-  drop_fragment_free_exent(signal, tabPtr, fragPtr, 0);  
+  drop_fragment_free_extent(signal, tabPtr, fragPtr, 0);  
 }
 
 void
@@ -1603,10 +1633,10 @@
 }
 
 void
-Dbtup::drop_fragment_free_exent(Signal *signal, 
-				TablerecPtr tabPtr, 
-				FragrecordPtr fragPtr,
-				Uint32 pos)
+Dbtup::drop_fragment_free_extent(Signal *signal, 
+				 TablerecPtr tabPtr, 
+				 FragrecordPtr fragPtr,
+				 Uint32 pos)
 {
   if (tabPtr.p->m_no_of_disk_attributes)
   {
@@ -1616,25 +1646,32 @@
       if(!alloc_info.m_free_extents[pos].isEmpty())
       {
 	jam();
-	Local_extent_info_list
-	  list(c_extent_pool, alloc_info.m_free_extents[pos]);
-	Ptr<Extent_info> ext_ptr;
-	list.first(ext_ptr);
+	Callback cb;
+	cb.m_callbackData= fragPtr.i;
+	cb.m_callbackFunction = 
+	  safe_cast(&Dbtup::drop_fragment_free_extent_log_buffer_callback);
+#if NOT_YET_UNDO_FREE_EXTENT
+	Uint32 sz= sizeof(Disk_undo::FreeExtent) >> 2;
+	(void) c_lgman->alloc_log_space(fragPtr.p->m_logfile_group_id, sz);
 	
-	Tablespace_client tsman(signal, c_tsman, tabPtr.i, 
-				fragPtr.p->fragmentId,
-				fragPtr.p->m_tablespace_id);
+	Logfile_client lgman(this, c_lgman, fragPtr.p->m_logfile_group_id);
 	
-	tsman.free_extent(&ext_ptr.p->m_key);
-	c_extent_hash.remove(ext_ptr);
-	list.release(ext_ptr);
-	
-	signal->theData[0] = ZFREE_EXTENT;
-	signal->theData[1] = tabPtr.i;
-	signal->theData[2] = fragPtr.i;
-	signal->theData[3] = pos;
-	sendSignal(cownref, GSN_CONTINUEB, signal, 4, JBB);  
+	int res= lgman.get_log_buffer(signal, sz, &cb);
+	switch(res){
+	case 0:
+	  ljam();
+	  return;
+	case -1:
+	  ndbrequire("NOT YET IMPLEMENTED" == 0);
+	  break;
+	default:
+	  execute(signal, cb, fragPtr.p->m_logfile_group_id);
+	  return;
+	}
+#else
+	execute(signal, cb, fragPtr.p->m_logfile_group_id);	
 	return;
+#endif
       }
     }
     
@@ -1654,6 +1691,123 @@
 }
 
 void
+Dbtup::drop_table_log_buffer_callback(Signal* signal, Uint32 tablePtrI,
+				      Uint32 logfile_group_id)
+{
+  TablerecPtr tabPtr;
+  tabPtr.i = tablePtrI;
+  ptrCheckGuard(tabPtr, cnoOfTablerec, tablerec);
+  
+  ndbrequire(tabPtr.p->m_no_of_disk_attributes);
+
+  Disk_undo::Drop drop;
+  drop.m_table = tabPtr.i;
+  drop.m_type_length = 
+    (Disk_undo::UNDO_DROP << 16) | (sizeof(drop) >> 2);
+  Logfile_client lgman(this, c_lgman, logfile_group_id);
+  
+  Logfile_client::Change c[1] = {{ &drop, sizeof(drop) >> 2 } };
+  Uint64 lsn = lgman.add_entry(c, 1);
+
+  Logfile_client::Request req;
+  req.m_callback.m_callbackData= tablePtrI;
+  req.m_callback.m_callbackFunction = 
+    safe_cast(&Dbtup::drop_table_logsync_callback);
+  
+  int ret = lgman.sync_lsn(signal, lsn, &req, 0);
+  switch(ret){
+  case 0:
+    return;
+  default:
+    ndbout_c("ret: %d", ret);
+    ndbrequire(false);
+  }
+}
+
+void
+Dbtup::drop_table_logsync_callback(Signal* signal, 
+				   Uint32 tabPtrI, 
+				   Uint32 logfile_group_id)
+{
+  TablerecPtr tabPtr;
+  tabPtr.i = tabPtrI;
+  ptrCheckGuard(tabPtr, cnoOfTablerec, tablerec);
+  
+  DropTabConf * const dropConf= (DropTabConf *)signal->getDataPtrSend();
+  dropConf->senderRef= reference();
+  dropConf->senderData= tabPtr.p->m_dropTable.tabUserPtr;
+  dropConf->tableId= tabPtr.i;
+  sendSignal(tabPtr.p->m_dropTable.tabUserRef, GSN_DROP_TAB_CONF,
+             signal, DropTabConf::SignalLength, JBB);
+  
+  releaseTabDescr(tabPtr.p);
+  initTab(tabPtr.p);
+}
+
+void
+Dbtup::drop_fragment_free_extent_log_buffer_callback(Signal* signal,
+						     Uint32 fragPtrI,
+						     Uint32 unused)
+{
+  FragrecordPtr fragPtr;
+  fragPtr.i = fragPtrI;
+  ptrCheckGuard(fragPtr, cnoOfFragrec, fragrecord);
+
+  TablerecPtr tabPtr;
+  tabPtr.i = fragPtr.p->fragTableId;
+  ptrCheckGuard(tabPtr, cnoOfTablerec, tablerec);
+
+  ndbrequire(tabPtr.p->m_no_of_disk_attributes);
+  Disk_alloc_info& alloc_info= fragPtr.p->m_disk_alloc_info;  
+
+  for(Uint32 pos = 0; pos<EXTENT_SEARCH_MATRIX_SIZE; pos++)
+  {
+    if(!alloc_info.m_free_extents[pos].isEmpty())
+    {
+      jam();
+      Local_extent_info_list
+	list(c_extent_pool, alloc_info.m_free_extents[pos]);
+      Ptr<Extent_info> ext_ptr;
+      list.first(ext_ptr);
+
+#if NOT_YET_UNDO_FREE_EXTENT
+#error "This code is complete"
+#error "but not needed until we do dealloc of empty extents"
+      Disk_undo::FreeExtent free;
+      free.m_table = tabPtr.i;
+      free.m_fragment = fragPtr.p->fragmentId;
+      free.m_file_no = ext_ptr.p->m_key.m_file_no;
+      free.m_page_no = ext_ptr.p->m_key.m_page_no;
+      free.m_type_length = 
+	(Disk_undo::UNDO_FREE_EXTENT << 16) | (sizeof(free) >> 2);
+      Logfile_client lgman(this, c_lgman, fragPtr.p->m_logfile_group_id);
+      
+      Logfile_client::Change c[1] = {{ &free, sizeof(free) >> 2 } };
+      Uint64 lsn = lgman.add_entry(c, 1);
+#else
+      Uint64 lsn = 0;
+#endif
+      
+      Tablespace_client tsman(signal, c_tsman, tabPtr.i, 
+			      fragPtr.p->fragmentId,
+			      fragPtr.p->m_tablespace_id);
+      
+      tsman.free_extent(&ext_ptr.p->m_key, lsn);
+      c_extent_hash.remove(ext_ptr);
+      list.release(ext_ptr);
+      
+      signal->theData[0] = ZFREE_EXTENT;
+      signal->theData[1] = tabPtr.i;
+      signal->theData[2] = fragPtr.i;
+      signal->theData[3] = pos;
+      sendSignal(cownref, GSN_CONTINUEB, signal, 4, JBB);  
+      return;
+    }
+  }
+  ndbrequire(false);
+}
+
+void
 Dbtup::drop_fragment_free_var_pages(Signal* signal)
 {
   ljam();
@@ -1684,7 +1838,7 @@
     sendSignal(cownref, GSN_CONTINUEB, signal, 3, JBB);  
     return;
   }
-  
+  Uint32 logfile_group_id = fragPtr.p->m_logfile_group_id ;
   releaseFragPages(fragPtr.p);
   Uint32 i;
   for(i= 0; i<MAX_FRAG_PER_NODE; i++)
@@ -1698,7 +1852,8 @@
   
   signal->theData[0]= ZREL_FRAG;
   signal->theData[1]= tabPtr.i;
-  sendSignal(cownref, GSN_CONTINUEB, signal, 2, JBB);  
+  signal->theData[2]= logfile_group_id;
+  sendSignal(cownref, GSN_CONTINUEB, signal, 3, JBB);  
   return;
 }
 

--- 1.31/storage/ndb/src/kernel/blocks/dbtup/DbtupRoutines.cpp	2007-03-10 12:02:57 +01:00
+++ 1.32/storage/ndb/src/kernel/blocks/dbtup/DbtupRoutines.cpp	2007-03-10 12:02:57 +01:00
@@ -2,8 +2,7 @@
 
    This program is free software; you can redistribute it and/or modify
    it under the terms of the GNU General Public License as published by
-   the Free Software Foundation; either version 2 of the License, or
-   (at your option) any later version.
+   the Free Software Foundation; version 2 of the License.
 
    This program is distributed in the hope that it will be useful,
    but WITHOUT ANY WARRANTY; without even the implied warranty of
@@ -392,7 +391,6 @@
     Uint32 maxIndexBuf = indexBuf + (dstLen >> 2);
     if (maxIndexBuf <= maxRead && ok) {
       ljam();
-      const char* ssrcPtr = (const char*)srcPtr;
       int n = NdbSqlUtil::strnxfrm_bug7284(cs, dstPtr, dstLen, srcPtr + lb, len);
       ndbrequire(n != -1);
       int m = n;
@@ -558,7 +556,6 @@
     Uint32 maxIndexBuf = index_buf + (dstLen >> 2);
     if (maxIndexBuf <= max_read && ok) {
       ljam();
-      const char* ssrcPtr = (const char*)srcPtr;
       int n = NdbSqlUtil::strnxfrm_bug7284(cs, dstPtr, dstLen, srcPtr + lb, len);
       ndbrequire(n != -1);
       int m = n;
@@ -1375,7 +1372,6 @@
     Uint32 maxIndexBuf = indexBuf + (dstLen >> 2);
     if (maxIndexBuf <= maxRead && ok) {
       ljam();
-      const char* ssrcPtr = (const char*)srcPtr;
       int n = NdbSqlUtil::strnxfrm_bug7284(cs, dstPtr, dstLen, srcPtr + lb, len);
       ndbrequire(n != -1);
       int m = n;
@@ -1541,6 +1537,7 @@
       memcpy(req_struct->m_tuple_ptr->get_disk_ref_ptr(regTabPtr),
 	     inBuffer+inBufIndex+1, sz << 2);
       inBufIndex += 1 + sz;
+      req_struct->in_buf_index = inBufIndex;
     }
     else
     {
@@ -2478,7 +2475,7 @@
                             Uint32 attr_des2)
 {
   Uint32 attr_descriptor, index_buf, in_buf_len, var_index, null_ind;
-  Uint32 vsize_in_bytes, vsize_in_words, new_index, max_var_size;
+  Uint32 vsize_in_words, new_index, max_var_size;
   Uint32 var_attr_pos;
   char *var_data_start;
   Uint16 *vpos_array;
@@ -2519,6 +2516,7 @@
     terrorCode= ZAI_INCONSISTENCY_ERROR;
     return false;
   }
+  return false;
 }
 
 bool

--- 1.6/storage/ndb/src/kernel/blocks/dbtup/DbtupTabDesMan.cpp	2007-03-10 12:02:57 +01:00
+++ 1.7/storage/ndb/src/kernel/blocks/dbtup/DbtupTabDesMan.cpp	2007-03-10 12:02:57 +01:00
@@ -2,8 +2,7 @@
 
    This program is free software; you can redistribute it and/or modify
    it under the terms of the GNU General Public License as published by
-   the Free Software Foundation; either version 2 of the License, or
-   (at your option) any later version.
+   the Free Software Foundation; version 2 of the License.
 
    This program is distributed in the hope that it will be useful,
    but WITHOUT ANY WARRANTY; without even the implied warranty of

--- 1.26/storage/ndb/src/kernel/blocks/dbtup/DbtupTrigger.cpp	2007-03-10 12:02:57 +01:00
+++ 1.27/storage/ndb/src/kernel/blocks/dbtup/DbtupTrigger.cpp	2007-03-10 12:02:57 +01:00
@@ -2,8 +2,7 @@
 
    This program is free software; you can redistribute it and/or modify
    it under the terms of the GNU General Public License as published by
-   the Free Software Foundation; either version 2 of the License, or
-   (at your option) any later version.
+   the Free Software Foundation; version 2 of the License.
 
    This program is distributed in the hope that it will be useful,
    but WITHOUT ANY WARRANTY; without even the implied warranty of

--- 1.11/storage/ndb/src/kernel/vm/DLFifoList.hpp	2007-03-10 12:02:57 +01:00
+++ 1.12/storage/ndb/src/kernel/vm/DLFifoList.hpp	2007-03-10 12:02:57 +01:00
@@ -2,8 +2,7 @@
 
    This program is free software; you can redistribute it and/or modify
    it under the terms of the GNU General Public License as published by
-   the Free Software Foundation; either version 2 of the License, or
-   (at your option) any later version.
+   the Free Software Foundation; version 2 of the License.
 
    This program is distributed in the hope that it will be useful,
    but WITHOUT ANY WARRANTY; without even the implied warranty of

--- 1.65/storage/ndb/src/ndbapi/NdbTransaction.cpp	2007-03-10 12:02:58 +01:00
+++ 1.66/storage/ndb/src/ndbapi/NdbTransaction.cpp	2007-03-10 12:02:58 +01:00
@@ -2,8 +2,7 @@
 
    This program is free software; you can redistribute it and/or modify
    it under the terms of the GNU General Public License as published by
-   the Free Software Foundation; either version 2 of the License, or
-   (at your option) any later version.
+   the Free Software Foundation; version 2 of the License.
 
    This program is distributed in the hope that it will be useful,
    but WITHOUT ANY WARRANTY; without even the implied warranty of
@@ -57,7 +56,6 @@
   theCompletedLastOp(NULL),
   theNoOfOpSent(0),
   theNoOfOpCompleted(0),
-  theNoOfOpFetched(0),
   theMyRef(0),
   theTCConPtr(0),
   theTransactionId(0),
@@ -132,7 +130,6 @@
     theNdb->theImpl->m_ndb_cluster_connection.get_latest_trans_gci();
   theCommitStatus         = Started;
   theCompletionStatus     = NotCompleted;
-  m_abortOption           = AbortOnError;
 
   theError.code		  = 0;
   theErrorLine		  = 0;
@@ -177,12 +174,9 @@
 NdbTransaction::setOperationErrorCodeAbort(int error, int abortOption)
 {
   DBUG_ENTER("NdbTransaction::setOperationErrorCodeAbort");
-  if (abortOption == -1)
-    abortOption = m_abortOption;
   if (theTransactionIsStarted == false) {
     theCommitStatus = Aborted;
-  } else if ((abortOption == AbortOnError) && 
-	     (theCommitStatus != Committed) &&
+  } else if ((theCommitStatus != Committed) &&
              (theCommitStatus != Aborted)) {
     theCommitStatus = NeedAbort;
   }//if
@@ -264,8 +258,8 @@
 *****************************************************************************/
 int 
 NdbTransaction::execute(ExecType aTypeOfExec, 
-		       AbortOption abortOption,
-		       int forceSend)
+			NdbOperation::AbortOption abortOption,
+			int forceSend)
 {
   NdbError savedError= theError;
   DBUG_ENTER("NdbTransaction::execute");
@@ -355,40 +349,14 @@
       theCompletedLastOp = NULL;
     }
 
-    if (executeNoBlobs(tExecType, abortOption, forceSend) == -1)
+    if (executeNoBlobs(tExecType, 
+		       NdbOperation::DefaultAbortOption,
+		       forceSend) == -1)
     {
-      ret = -1;
       if(savedError.code==0)
 	savedError= theError;
       
-      /**
-       * If AO_IgnoreError, error codes arent always set on individual
-       *   operations, making postExecute impossible
-       */
-      if (abortOption == AO_IgnoreError)
-      {
-         if (theCompletedFirstOp != NULL)
-	 {
-	   if (tCompletedFirstOp != NULL)
-	   {
-	     tCompletedLastOp->next(theCompletedFirstOp);
-	     theCompletedFirstOp = tCompletedFirstOp;
-	   } 
-	 }
-	 else
-	 {
-	   theCompletedFirstOp = tCompletedFirstOp;
-	   theCompletedLastOp = tCompletedLastOp;
-	 }
-         if (tPrepOp != NULL && tRestOp != NULL) {
-           if (theFirstOpInList == NULL)
-             theFirstOpInList = tRestOp;
-           else
-             theLastOpInList->next(tRestOp);
-           theLastOpInList = tLastOp;
-        }
-	DBUG_RETURN(-1);
-      }
+      DBUG_RETURN(-1);
     }
     
 #ifdef ndb_api_crash_on_complex_blob_abort
@@ -448,9 +416,9 @@
 }
 
 int 
-NdbTransaction::executeNoBlobs(ExecType aTypeOfExec, 
-                              AbortOption abortOption,
-                              int forceSend)
+NdbTransaction::executeNoBlobs(NdbTransaction::ExecType aTypeOfExec, 
+			       NdbOperation::AbortOption abortOption,
+			       int forceSend)
 {
   DBUG_ENTER("NdbTransaction::executeNoBlobs");
   DBUG_PRINT("enter", ("aTypeOfExec: %d, abortOption: %d", 
@@ -477,6 +445,7 @@
          * This timeout situation can occur if NDB crashes.
          */
         ndbout << "This timeout should never occur, execute(..)" << endl;
+	theError.code = 4012;
         setOperationErrorCodeAbort(4012);  // Error code for "Cluster Failure"
         DBUG_RETURN(-1);
       }//if
@@ -527,14 +496,14 @@
 Remark:        Prepare a part of a transaction in an asynchronous manner. 
 *****************************************************************************/
 void 
-NdbTransaction::executeAsynchPrepare( ExecType           aTypeOfExec,
+NdbTransaction::executeAsynchPrepare(NdbTransaction::ExecType aTypeOfExec,
                                      NdbAsynchCallback  aCallback,
                                      void*              anyObject,
-                                     AbortOption abortOption)
+                                     NdbOperation::AbortOption abortOption)
 {
   DBUG_ENTER("NdbTransaction::executeAsynchPrepare");
-  DBUG_PRINT("enter", ("aTypeOfExec: %d, aCallback: %x, anyObject: %x", 
-		       aTypeOfExec, aCallback, anyObject));
+  DBUG_PRINT("enter", ("aTypeOfExec: %d, aCallback: 0x%lx, anyObject: Ox%lx",
+		       aTypeOfExec, (long) aCallback, (long) anyObject));
 
   /**
    * Reset error.code on execute
@@ -570,7 +539,6 @@
   theReturnStatus     = ReturnSuccess;
   theCallbackFunction = aCallback;
   theCallbackObject   = anyObject;
-  m_abortOption   = abortOption;
   m_waitForReply = true;
   tNdb->thePreparedTransactionsArray[tnoOfPreparedTransactions] = this;
   theTransArrayIndex = tnoOfPreparedTransactions;
@@ -665,8 +633,7 @@
   while (tOp) {
     int tReturnCode;
     NdbOperation* tNextOp = tOp->next();
-
-    tReturnCode = tOp->prepareSend(theTCConPtr, theTransactionId);
+    tReturnCode = tOp->prepareSend(theTCConPtr, theTransactionId, abortOption);
     if (tReturnCode == -1) {
       theSendStatus = sendABORTfail;
       DBUG_VOID_RETURN;
@@ -926,7 +893,10 @@
      *	The user did not perform any rollback but simply closed the
      *      transaction. We must rollback Ndb since Ndb have been contacted.
      ************************************************************************/
-    execute(Rollback);
+    if (!theSimpleState)
+    {
+      execute(Rollback);
+    }
   }//if
   theMagicNumber = 0xFE11DC;
   theInUseState = false;
@@ -1010,7 +980,7 @@
 NdbTransaction::releaseExecutedScanOperation(NdbIndexScanOperation* cursorOp)
 {
   DBUG_ENTER("NdbTransaction::releaseExecutedScanOperation");
-  DBUG_PRINT("enter", ("this=0x%x op=0x%x", (UintPtr)this, (UintPtr)cursorOp));
+  DBUG_PRINT("enter", ("this: 0x%lx  op: 0x%lx", (long) this, (long) cursorOp));
   
   releaseScanOperation(&m_firstExecutedScanOp, 0, cursorOp);
   
@@ -1796,14 +1766,8 @@
       }
     } else if ((tNoComp >= tNoSent) &&
                (theLastExecOpInList->theCommitIndicator == 1)){
-
-
-      if (m_abortOption == AO_IgnoreError && theError.code != 0){
-	/**
-	 * There's always a TCKEYCONF when using IgnoreError
-	 */
-	return -1;
-      }
+      
+      
 /**********************************************************************/
 // We sent the transaction with Commit flag set and received a CONF with
 // no Commit flag set. This is clearly an anomaly.
@@ -1970,10 +1934,13 @@
     if (tCommitFlag == 1) {
       theCommitStatus = Committed;
       theGlobalCheckpointId = tGCI;
-      assert(tGCI);
-      *p_latest_trans_gci = tGCI;
+      if (tGCI) // Read(dirty) only transaction doesnt get GCI
+      {
+	*p_latest_trans_gci = tGCI;
+      }
     } else if ((tNoComp >= tNoSent) &&
                (theLastExecOpInList->theCommitIndicator == 1)){
+
       /**********************************************************************/
       // We sent the transaction with Commit flag set and received a CONF with
       // no Commit flag set. This is clearly an anomaly.
@@ -1997,41 +1964,6 @@
   return -1;
 }//NdbTransaction::receiveTCINDXCONF()
 
-/*****************************************************************************
-int receiveTCINDXREF( NdbApiSignal* aSignal)
-
-Return Value:   Return 0 : send was succesful.
-                Return -1: In all other case.   
-Parameters:     aSignal: the signal object that contains the 
-                TCINDXREF signal from TC.
-Remark:         Handles the reception of the TCINDXREF signal.
-*****************************************************************************/
-int
-NdbTransaction::receiveTCINDXREF( NdbApiSignal* aSignal)
-{
-  if(checkState_TransId(aSignal->getDataPtr()+1)){
-    theError.code = aSignal->readData(4);	// Override any previous errors
-
-    /**********************************************************************/
-    /*	A serious error has occured. This could be due to deadlock or */
-    /*	lack of resources or simply a programming error in NDB. This  */
-    /*	transaction will be aborted. Actually it has already been     */
-    /*	and we only need to report completion and return with the     */
-    /*	error code to the application.				      */
-    /**********************************************************************/
-    theCompletionStatus = NdbTransaction::CompletedFailure;
-    theCommitStatus = NdbTransaction::Aborted;
-    theReturnStatus = NdbTransaction::ReturnFailure;
-    return 0;
-  } else {
-#ifdef NDB_NO_DROPPED_SIGNAL
-    abort();
-#endif
-  }
-
-  return -1;
-}//NdbTransaction::receiveTCINDXREF()
-
 /*******************************************************************************
 int OpCompletedFailure();
 
@@ -2041,36 +1973,15 @@
 Remark:        An operation was completed with failure.
 *******************************************************************************/
 int 
-NdbTransaction::OpCompleteFailure(Uint8 abortOption, bool setFailure)
+NdbTransaction::OpCompleteFailure(NdbOperation* op)
 {
   Uint32 tNoComp = theNoOfOpCompleted;
   Uint32 tNoSent = theNoOfOpSent;
-  if (setFailure)
-    theCompletionStatus = NdbTransaction::CompletedFailure;
+
   tNoComp++;
   theNoOfOpCompleted = tNoComp;
-  if (tNoComp == tNoSent) {
-    //------------------------------------------------------------------------
-    //If the transaction consists of only simple reads we can set
-    //Commit state Aborted.  Otherwise this simple operation cannot
-    //decide the success of the whole transaction since a simple
-    //operation is not really part of that transaction.
-    //------------------------------------------------------------------------
-    if (abortOption == AO_IgnoreError){
-      /**
-       * There's always a TCKEYCONF when using IgnoreError
-       */
-      return -1;
-    }
-    
-    return 0;	// Last operation received
-  } else if (tNoComp > tNoSent) {
-    setOperationErrorCodeAbort(4113);	// Too many operations, 
-                                        // stop waiting for more
-    return 0;
-  } else {
-    return -1;	// Continue waiting for more signals
-  }//if
+  
+  return (tNoComp == tNoSent) ? 0 : -1;
 }//NdbTransaction::OpCompleteFailure()
 
 /******************************************************************************

--- 1.67/storage/ndb/src/ndbapi/NdbDictionary.cpp	2007-03-10 12:02:58 +01:00
+++ 1.68/storage/ndb/src/ndbapi/NdbDictionary.cpp	2007-03-10 12:02:58 +01:00
@@ -2,8 +2,7 @@
 
    This program is free software; you can redistribute it and/or modify
    it under the terms of the GNU General Public License as published by
-   the Free Software Foundation; either version 2 of the License, or
-   (at your option) any later version.
+   the Free Software Foundation; version 2 of the License.
 
    This program is distributed in the hope that it will be useful,
    but WITHOUT ANY WARRANTY; without even the implied warranty of
@@ -204,7 +203,7 @@
 
 void 
 NdbDictionary::Column::setPartitionKey(bool val){
-  m_impl.m_distributionKey = true;
+  m_impl.m_distributionKey = val;
 }
 
 bool 
@@ -674,8 +673,14 @@
   return true;
 }
 
+const char *
+NdbDictionary::Table::getTablespaceName() const 
+{
+  return m_impl.m_tablespace_name.c_str();
+}
+
 void 
-NdbDictionary::Table::setTablespace(const char * name){
+NdbDictionary::Table::setTablespaceName(const char * name){
   m_impl.m_tablespace_id = ~0;
   m_impl.m_tablespace_version = ~0;
   m_impl.m_tablespace_name.assign(name);
@@ -1671,6 +1676,18 @@
   if(t)
     return t->m_facade;
   return 0;
+}
+
+int
+NdbDictionary::Dictionary::listEvents(List& list)
+{
+  return m_impl.listEvents(list);
+}
+
+int
+NdbDictionary::Dictionary::listEvents(List& list) const
+{
+  return m_impl.listEvents(list);
 }
 
 int

--- 1.156/storage/ndb/src/ndbapi/NdbDictionaryImpl.cpp	2007-03-10 12:02:58 +01:00
+++ 1.157/storage/ndb/src/ndbapi/NdbDictionaryImpl.cpp	2007-03-10 12:02:58 +01:00
@@ -2,8 +2,7 @@
 
    This program is free software; you can redistribute it and/or modify
    it under the terms of the GNU General Public License as published by
-   the Free Software Foundation; either version 2 of the License, or
-   (at your option) any later version.
+   the Free Software Foundation; version 2 of the License.
 
    This program is distributed in the hope that it will be useful,
    but WITHOUT ANY WARRANTY; without even the implied warranty of
@@ -2146,9 +2145,14 @@
     impl->m_replicaCount = replicaCount;
     impl->m_fragmentCount = fragCount;
     DBUG_PRINT("info", ("replicaCount=%x , fragCount=%x",replicaCount,fragCount));
-    for(i = 0; i < (Uint32) (fragCount*replicaCount); i++)
+    Uint32 pos = 2;
+    for(i = 0; i < (Uint32) fragCount;i++)
     {
-      impl->m_fragments.push_back(ntohs(tableDesc->ReplicaData[i+2]));
+      pos++; // skip logpart
+      for (Uint32 j = 0; j<(Uint32)replicaCount; j++)
+      {
+	impl->m_fragments.push_back(ntohs(tableDesc->ReplicaData[pos++]));
+      }
     }
 
     Uint32 topBit = (1 << 31);
@@ -2248,9 +2252,9 @@
   }
 
   // blob tables - use "t2" to get values set by kernel
-  if (t2->m_noOfBlobs != 0 && createBlobTables(*t2) != 0) {
+  if (t2->m_noOfBlobs != 0 && createBlobTables(t, *t2) != 0) {
     int save_code = m_error.code;
-    (void)dropTable(*t2);
+    (void)dropTableGlobal(*t2);
     m_error.code = save_code;
     delete t2;
     DBUG_RETURN(-1);
@@ -2262,7 +2266,7 @@
 }
 
 int
-NdbDictionaryImpl::createBlobTables(NdbTableImpl &t)
+NdbDictionaryImpl::createBlobTables(NdbTableImpl& orig, NdbTableImpl &t)
 {
   DBUG_ENTER("NdbDictionaryImpl::createBlobTables");
   for (unsigned i = 0; i < t.m_columns.size(); i++) {
@@ -2271,6 +2275,10 @@
       continue;
     NdbTableImpl bt;
     NdbBlob::getBlobTable(bt, &t, &c);
+    NdbDictionary::Column::StorageType 
+      d = NdbDictionary::Column::StorageTypeDisk;
+    if (orig.m_columns[i]->getStorageType() == d)
+      bt.getColumn("DATA")->setStorageType(d);
     if (createTable(bt) != 0) {
       DBUG_RETURN(-1);
     }
@@ -2914,7 +2922,6 @@
 NdbDictionaryImpl::dropTableGlobal(NdbTableImpl & impl)
 {
   int res;
-  const char * name = impl.getName();
   DBUG_ENTER("NdbDictionaryImpl::dropTableGlobal");
   DBUG_ASSERT(impl.m_status != NdbDictionary::Object::New);
   DBUG_ASSERT(impl.m_indexType == NdbDictionary::Object::TypeUndefined);
@@ -3601,7 +3608,7 @@
     evnt.mi_type           = evntConf->getEventType();
     evnt.setTable(dataPtr);
   } else {
-    if (evnt.m_tableImpl->m_id         != evntConf->getTableId() ||
+    if ((Uint32) evnt.m_tableImpl->m_id         != evntConf->getTableId() ||
 	evnt.m_tableImpl->m_version    != evntConf->getTableVersion() ||
 	//evnt.m_attrListBitmask != evntConf->getAttrListBitmask() ||
 	evnt.mi_type           != evntConf->getEventType()) {
@@ -3719,7 +3726,7 @@
       DBUG_RETURN(NULL);
     }
     if ((tab->m_status != NdbDictionary::Object::Retrieved) ||
-        (tab->m_id != ev->m_table_id) ||
+        ((Uint32) tab->m_id != ev->m_table_id) ||
         (table_version_major(tab->m_version) !=
          table_version_major(ev->m_table_version)))
     {
@@ -3749,7 +3756,7 @@
   DBUG_PRINT("info",("Table: id: %d version: %d", 
                      table.m_id, table.m_version));
 
-  if (table.m_id != ev->m_table_id ||
+  if ((Uint32) table.m_id != ev->m_table_id ||
       table_version_major(table.m_version) !=
       table_version_major(ev->m_table_version))
   {
@@ -3765,7 +3772,7 @@
 #endif
 
   
-  if ( attributeList_sz > table.getNoOfColumns() )
+  if ( attributeList_sz > (uint) table.getNoOfColumns() )
   {
     m_error.code = 241;
     DBUG_PRINT("error",("Invalid version, too many columns"));
@@ -3775,7 +3782,7 @@
 
   assert( (int)attributeList_sz <= table.getNoOfColumns() );
   for(unsigned id= 0; ev->m_columns.size() < attributeList_sz; id++) {
-    if ( id >= table.getNoOfColumns())
+    if ( id >= (uint) table.getNoOfColumns())
     {
       m_error.code = 241;
       DBUG_PRINT("error",("Invalid version, column %d out of range", id));
@@ -4074,17 +4081,217 @@
   DBUG_VOID_RETURN;
 }
 
+#include <NdbScanOperation.hpp>
+#include <NdbSleep.h>
+static int scanEventTable(Ndb* pNdb, 
+                          const NdbDictionary::Table* pTab,
+                          NdbDictionary::Dictionary::List &list)
+{
+  int                  retryAttempt = 0;
+  const int            retryMax = 100;
+  int                  check;
+  NdbTransaction       *pTrans = NULL;
+  NdbScanOperation     *pOp = NULL;
+  NdbRecAttr *event_name, *event_id;
+  NdbError err;
+
+  while (true)
+  {
+    NdbDictionary::Dictionary::List tmp_list;
+
+    if (retryAttempt)
+    {
+      if (retryAttempt >= retryMax)
+      {
+        ndbout << "ERROR: has retried this operation " << retryAttempt 
+               << " times, failing!" << endl;
+        goto error;
+      }
+      if (pTrans)
+        pNdb->closeTransaction(pTrans);
+      NdbSleep_MilliSleep(50);
+    }
+    retryAttempt++;
+    pTrans = pNdb->startTransaction();
+    if (pTrans == NULL)
+    {
+      if (pNdb->getNdbError().status == NdbError::TemporaryError)
+        continue;
+      goto error;
+    }
+
+    Uint64 row_count = 0;
+    {
+      if ((pOp = pTrans->getNdbScanOperation(pTab->getName())) == NULL)
+        goto error;
+      if (pOp->readTuples(NdbScanOperation::LM_CommittedRead, 0, 1) != 0)
+        goto error;
+      if (pOp->interpret_exit_last_row() == -1)
+        goto error;
+
+      Uint64 tmp;
+      pOp->getValue(NdbDictionary::Column::ROW_COUNT, (char*)&tmp);
+      if (pTrans->execute(NdbTransaction::NoCommit) == -1)
+        goto error;
+
+      int eof;
+      while ((eof = pOp->nextResult(true)) == 0)
+        row_count += tmp;
+    
+      if (eof == -1)
+      {
+        if (pTrans->getNdbError().status == NdbError::TemporaryError)
+          continue;
+        goto error;
+      }
+    }
+
+    if ((pOp = pTrans->getNdbScanOperation(pTab->getName())) == NULL)
+      goto error;
+
+    if (pOp->readTuples(NdbScanOperation::LM_CommittedRead, 0, 1) != 0)
+      goto error;
+    
+    if (pOp->interpret_exit_ok() == -1)
+      goto error;
+    
+    if ((event_id   = pOp->getValue(6)) == 0 ||
+        (event_name = pOp->getValue(0u)) == 0)
+      goto error;
+
+    if (pTrans->execute(NdbTransaction::NoCommit) == -1)
+    {
+      const NdbError err = pTrans->getNdbError();
+      if (err.status == NdbError::TemporaryError)
+        continue;
+      goto error;
+    }
+
+    tmp_list.count = row_count;
+    tmp_list.elements =
+      new NdbDictionary::Dictionary::List::Element[row_count];
+
+    int eof;
+    unsigned rows = 0;
+    while((eof = pOp->nextResult()) == 0)
+    {
+      if (rows < tmp_list.count)
+      {
+        NdbDictionary::Dictionary::List::Element &el = tmp_list.elements[rows];
+        el.id = event_id->u_32_value();
+        el.type = NdbDictionary::Object::TableEvent;
+        el.state = NdbDictionary::Object::StateOnline;
+        el.store = NdbDictionary::Object::StorePermanent;
+        el.name = strdup(event_name->aRef());
+      }
+      rows++;
+    }
+    if (eof == -1)
+    {
+      if (pTrans->getNdbError().status == NdbError::TemporaryError)
+        continue;
+      goto error;
+    }
+
+    pNdb->closeTransaction(pTrans);
+
+    if (rows < tmp_list.count)
+      tmp_list.count = rows;
+
+    list = tmp_list;
+    tmp_list.count = 0;
+    tmp_list.elements = NULL;
+
+    return 0;
+  }
+error:
+  int error_code;
+  if (pTrans)
+  {
+    error_code = pTrans->getNdbError().code;
+    pNdb->closeTransaction(pTrans);
+  }
+  else
+    error_code = pNdb->getNdbError().code;
+
+  return error_code;
+}
+
+int
+NdbDictionaryImpl::listEvents(List& list)
+{
+  int error_code;
+
+  BaseString currentDb(m_ndb.getDatabaseName());
+  BaseString currentSchema(m_ndb.getDatabaseSchemaName());
+
+  m_ndb.setDatabaseName("sys");
+  m_ndb.setDatabaseSchemaName("def");
+
+  const NdbDictionary::Table* pTab =
+    m_facade->getTable("NDB$EVENTS_0");
+
+  if(pTab == NULL)
+    error_code = m_facade->getNdbError().code;
+  else
+    error_code = scanEventTable(&m_ndb, pTab, list);
+
+  m_ndb.setDatabaseName(currentDb.c_str());
+  m_ndb.setDatabaseSchemaName(currentSchema.c_str());
+  if (error_code)
+  {
+    m_error.code = error_code;
+    return -1;
+  }
+  return 0;
+}
+
 /*****************************************************************
  * List objects or indexes
  */
 int
 NdbDictionaryImpl::listObjects(List& list, NdbDictionary::Object::Type type)
 {
+  int ret;
+  List list1, list2;
+  if (type == NdbDictionary::Object::TableEvent)
+    return listEvents(list);
+
+  if (type == NdbDictionary::Object::TypeUndefined)
+  {
+    ret = listEvents(list2);
+    if (ret)
+      return ret;
+  }
+
   ListTablesReq req;
   req.requestData = 0;
   req.setTableType(getKernelConstant(type, objectTypeMapping, 0));
   req.setListNames(true);
-  return m_receiver.listObjects(list, req.requestData, m_ndb.usingFullyQualifiedNames());
+  if (!list2.count)
+    return m_receiver.listObjects(list, req.requestData,
+                                  m_ndb.usingFullyQualifiedNames());
+  ret = m_receiver.listObjects(list1, req.requestData,
+                               m_ndb.usingFullyQualifiedNames());
+  if (ret)
+    return ret;
+  list.count = list1.count + list2.count;
+  list.elements = new NdbDictionary::Dictionary::List::Element[list.count];
+  unsigned i;
+  const NdbDictionary::Dictionary::List::Element null_el;
+  for (i = 0; i < list1.count; i++)
+  {
+    NdbDictionary::Dictionary::List::Element &el = list1.elements[i];
+    list.elements[i] = el;
+    el = null_el;
+  }
+  for (i = 0; i < list2.count; i++)
+  {
+    NdbDictionary::Dictionary::List::Element &el = list2.elements[i];
+    list.elements[i + list1.count] = el;
+    el = null_el;
+  }
+  return 0;
 }
 
 int
@@ -4287,8 +4494,6 @@
 NdbDictInterface::execWAIT_GCP_CONF(NdbApiSignal* signal,
 				    LinearSectionPtr ptr[3])
 {
-  const WaitGCPConf * const conf=
-    CAST_CONSTPTR(WaitGCPConf, signal->getDataPtr());
   m_waiter.signal(NO_WAIT);
 }
 

--- 1.71/storage/ndb/src/ndbapi/NdbDictionaryImpl.hpp	2007-03-10 12:02:58 +01:00
+++ 1.72/storage/ndb/src/ndbapi/NdbDictionaryImpl.hpp	2007-03-10 12:02:58 +01:00
@@ -2,8 +2,7 @@
 
    This program is free software; you can redistribute it and/or modify
    it under the terms of the GNU General Public License as published by
-   the Free Software Foundation; either version 2 of the License, or
-   (at your option) any later version.
+   the Free Software Foundation; version 2 of the License.
 
    This program is distributed in the hope that it will be useful,
    but WITHOUT ANY WARRANTY; without even the implied warranty of
@@ -587,7 +586,7 @@
   bool setTransporter(class TransporterFacade * tf);
 
   int createTable(NdbTableImpl &t);
-  int createBlobTables(NdbTableImpl& t);
+  int createBlobTables(NdbTableImpl& org, NdbTableImpl& created);
   int alterTable(NdbTableImpl &old_impl, NdbTableImpl &impl);
   int dropTable(const char * name);
   int dropTable(NdbTableImpl &);
@@ -608,6 +607,7 @@
   int dropEvent(const char * eventName);
   int dropEvent(const NdbEventImpl &);
   int dropBlobEvents(const NdbEventImpl &);
+  int listEvents(List& list);
 
   int executeSubscribeEvent(NdbEventOperationImpl &);
   int stopSubscribeEvent(NdbEventOperationImpl &);

--- 1.54/storage/ndb/src/ndbapi/TransporterFacade.cpp	2007-03-10 12:02:58 +01:00
+++ 1.55/storage/ndb/src/ndbapi/TransporterFacade.cpp	2007-03-10 12:02:58 +01:00
@@ -2,8 +2,7 @@
 
    This program is free software; you can redistribute it and/or modify
    it under the terms of the GNU General Public License as published by
-   the Free Software Foundation; either version 2 of the License, or
-   (at your option) any later version.
+   the Free Software Foundation; version 2 of the License.
 
    This program is distributed in the hope that it will be useful,
    but WITHOUT ANY WARRANTY; without even the implied warranty of
@@ -762,7 +761,7 @@
 void
 TransporterFacade::for_each(NdbApiSignal* aSignal, LinearSectionPtr ptr[3])
 {
-  DBUG_ENTER("TransporterFacade::connected");
+  DBUG_ENTER("TransporterFacade::for_each");
   Uint32 sz = m_threads.m_statusNext.size();
   TransporterFacade::ThreadData::Object_Execute oe; 
   for (Uint32 i = 0; i < sz ; i ++) 
@@ -1380,20 +1379,18 @@
 
 int PollGuard::wait_for_input_in_loop(int wait_time, bool forceSend)
 {
-  int ret_val, response_time;
+  int ret_val;
   if (forceSend)
     m_tp->forceSend(m_block_no);
   else
     m_tp->checkForceSend(m_block_no);
-  if (wait_time == -1) //Means wait forever
-    response_time= WAITFOR_RESPONSE_TIMEOUT;
-  else
-    response_time= wait_time;
+
   NDB_TICKS curr_time = NdbTick_CurrentMillisecond();
   NDB_TICKS max_time = curr_time + (NDB_TICKS)wait_time;
+  const int maxsleep = (wait_time == -1 || wait_time > 10) ? 10 : wait_time;
   do
   {
-    wait_for_input(response_time);
+    wait_for_input(maxsleep);
     Uint32 state= m_waiter->get_state();
     if (state == NO_WAIT)
     {
@@ -1406,7 +1403,7 @@
     }
     if (wait_time == -1)
     {
-#ifdef VM_TRACE
+#ifdef NOT_USED
       ndbout << "Waited WAITFOR_RESPONSE_TIMEOUT, continuing wait" << endl;
 #endif
       continue;
@@ -1444,7 +1441,7 @@
       queue if it hasn't happened already. It is usually already out of the
       queue but at time-out it could be that the object is still there.
     */
-    Uint32 cond_wait_index= m_tp->put_in_cond_wait_queue(m_waiter);
+    (void) m_tp->put_in_cond_wait_queue(m_waiter);
     m_waiter->wait(wait_time);
     if (m_waiter->get_cond_wait_index() != TransporterFacade::MAX_NO_THREADS)
     {
@@ -1512,3 +1509,29 @@
 
 template class Vector<NodeStatusFunction>;
 template class Vector<TransporterFacade::ThreadData::Object_Execute>;
+
+#include "SignalSender.hpp"
+
+SendStatus
+SignalSender::sendSignal(Uint16 nodeId, const SimpleSignal * s){
+#ifdef API_TRACE
+  if(setSignalLog() && TRACE_GSN(s->header.theVerId_signalNumber)){
+    SignalHeader tmp = s->header;
+    tmp.theSendersBlockRef = getOwnRef();
+
+    LinearSectionPtr ptr[3];
+    signalLogger.sendSignal(tmp,
+			    1,
+			    s->theData,
+			    nodeId, ptr, 0);
+    signalLogger.flushSignalLog();
+  }
+#endif
+  assert(getNodeInfo(nodeId).m_api_reg_conf == true ||
+         s->readSignalNumber() == GSN_API_REGREQ);
+  return theFacade->theTransporterRegistry->prepareSend(&s->header,
+							1, // JBB
+							&s->theData[0],
+							nodeId, 
+							&s->ptr[0]);
+}

--- 1.14/storage/ndb/test/include/NDBT_Table.hpp	2007-03-10 12:02:58 +01:00
+++ 1.15/storage/ndb/test/include/NDBT_Table.hpp	2007-03-10 12:02:58 +01:00
@@ -2,8 +2,7 @@
 
    This program is free software; you can redistribute it and/or modify
    it under the terms of the GNU General Public License as published by
-   the Free Software Foundation; either version 2 of the License, or
-   (at your option) any later version.
+   the Free Software Foundation; version 2 of the License.
 
    This program is distributed in the hope that it will be useful,
    but WITHOUT ANY WARRANTY; without even the implied warranty of

--- 1.33/storage/ndb/test/ndbapi/testDict.cpp	2007-03-10 12:02:58 +01:00
+++ 1.34/storage/ndb/test/ndbapi/testDict.cpp	2007-03-10 12:02:58 +01:00
@@ -2,8 +2,7 @@
 
    This program is free software; you can redistribute it and/or modify
    it under the terms of the GNU General Public License as published by
-   the Free Software Foundation; either version 2 of the License, or
-   (at your option) any later version.
+   the Free Software Foundation; version 2 of the License.
 
    This program is distributed in the hope that it will be useful,
    but WITHOUT ANY WARRANTY; without even the implied warranty of
@@ -322,7 +321,11 @@
     }
     i++;
   }
-
+  
+  for (Uint32 i = 0; i<numTables; i++)
+    if (tabList[i])
+      pDic->dropTable(NDBT_Tables::getTable(i)->getName());
+  
   delete [] tabList;
   return result;
 }
@@ -1855,7 +1858,7 @@
   Ndb* pNdb = GETNDB(step);  
 
   NdbDictionary::Table tab = *ctx->getTab();
-  tab.setTablespace("DEFAULT-TS");
+  tab.setTablespaceName("DEFAULT-TS");
   
   for(Uint32 i = 0; i<tab.getNoOfColumns(); i++)
     if(!tab.getColumn(i)->getPrimaryKey())
@@ -2199,7 +2202,7 @@
     // create indexes
     const char** indlist = NDBT_Tables::getIndexes(tabName);
     uint indnum = 0;
-    while (*indlist != 0) {
+    while (indlist != 0 && *indlist != 0) {
       uint count = 0;
     try_create_index:
       count++;

--- 1.24/storage/ndb/test/src/NDBT_Tables.cpp	2007-03-10 12:02:58 +01:00
+++ 1.25/storage/ndb/test/src/NDBT_Tables.cpp	2007-03-10 12:02:58 +01:00
@@ -2,8 +2,7 @@
 
    This program is free software; you can redistribute it and/or modify
    it under the terms of the GNU General Public License as published by
-   the Free Software Foundation; either version 2 of the License, or
-   (at your option) any later version.
+   the Free Software Foundation; version 2 of the License.
 
    This program is distributed in the hope that it will be useful,
    but WITHOUT ANY WARRANTY; without even the implied warranty of

--- 1.364/sql/ha_ndbcluster.cc	2007-03-10 12:02:58 +01:00
+++ 1.365/sql/ha_ndbcluster.cc	2007-03-10 12:02:58 +01:00
@@ -2,8 +2,7 @@
 
   This program is free software; you can redistribute it and/or modify
   it under the terms of the GNU General Public License as published by
-  the Free Software Foundation; either version 2 of the License, or
-  (at your option) any later version.
+  the Free Software Foundation; version 2 of the License.
 
   This program is distributed in the hope that it will be useful,
   but WITHOUT ANY WARRANTY; without even the implied warranty of
@@ -48,6 +47,7 @@
 extern my_bool opt_ndb_optimized_node_selection;
 extern const char *opt_ndbcluster_connectstring;
 extern ulong opt_ndb_cache_check_time;
+extern ulong opt_ndb_wait_connected;
 
 // ndb interface initialization/cleanup
 #ifdef  __cplusplus
@@ -134,7 +134,7 @@
 }
 
 static int ndbcluster_inited= 0;
-int ndbcluster_util_inited= 0;
+static int ndbcluster_terminating= 0;
 
 static Ndb* g_ndb= NULL;
 Ndb_cluster_connection* g_ndb_cluster_connection= NULL;
@@ -151,15 +151,16 @@
 #ifdef HAVE_NDB_BINLOG
 static int rename_share(NDB_SHARE *share, const char *new_key);
 #endif
-static void ndb_set_fragmentation(NDBTAB &tab, TABLE *table, uint pk_len);
 static int ndb_get_table_statistics(ha_ndbcluster*, bool, Ndb*, const NDBTAB *, 
                                     struct Ndb_statistics *);
 
 
 // Util thread variables
 pthread_t ndb_util_thread;
+int ndb_util_thread_running= 0;
 pthread_mutex_t LOCK_ndb_util_thread;
 pthread_cond_t COND_ndb_util_thread;
+pthread_cond_t COND_ndb_util_ready;
 pthread_handler_t ndb_util_thread_func(void *arg);
 ulong ndb_cache_check_time;
 
@@ -192,9 +193,15 @@
 
 static int update_status_variables(Ndb_cluster_connection *c)
 {
-  ndb_cluster_node_id=         c->node_id();
-  ndb_connected_port=          c->get_connected_port();
-  ndb_connected_host=          c->get_connected_host();
+  ndb_connected_port= c->get_connected_port();
+  ndb_connected_host= c->get_connected_host();
+  if (ndb_cluster_node_id != (int) c->node_id())
+  {
+    ndb_cluster_node_id= c->node_id();
+    sql_print_information("NDB: NodeID is %lu, management server '%s:%lu'",
+                          ndb_cluster_node_id, ndb_connected_host,
+                          ndb_connected_port);
+  }
   ndb_number_of_replicas=      0;
   ndb_number_of_ready_data_nodes= c->get_no_ready();
   ndb_number_of_data_nodes=     c->no_db_nodes();
@@ -259,16 +266,16 @@
 
 int execute_no_commit_ignore_no_key(ha_ndbcluster *h, NdbTransaction *trans)
 {
-  int res= trans->execute(NdbTransaction::NoCommit,
-                          NdbTransaction::AO_IgnoreError,
-                          h->m_force_send);
-  if (res == 0)
-    return 0;
+  if (trans->execute(NdbTransaction::NoCommit,
+                     NdbOperation::AO_IgnoreError,
+                     h->m_force_send) == -1)
+    return -1;
 
   const NdbError &err= trans->getNdbError();
-  if (err.classification != NdbError::ConstraintViolation &&
+  if (err.classification != NdbError::NoError &&
+      err.classification != NdbError::ConstraintViolation &&
       err.classification != NdbError::NoDataFound)
-    return res;
+    return -1;
 
   return 0;
 }
@@ -286,7 +293,7 @@
   return h->m_ignore_no_key ?
     execute_no_commit_ignore_no_key(h,trans) :
     trans->execute(NdbTransaction::NoCommit,
-		   NdbTransaction::AbortOnError,
+		   NdbOperation::AbortOnError,
 		   h->m_force_send);
 }
 
@@ -299,7 +306,7 @@
     return 0;
 #endif
   return trans->execute(NdbTransaction::Commit,
-                        NdbTransaction::AbortOnError,
+                        NdbOperation::AbortOnError,
                         h->m_force_send);
 }
 
@@ -312,7 +319,7 @@
     return 0;
 #endif
   return trans->execute(NdbTransaction::Commit,
-                        NdbTransaction::AbortOnError,
+                        NdbOperation::AbortOnError,
                         thd->variables.ndb_force_send);
 }
 
@@ -327,7 +334,7 @@
 #endif
   h->release_completed_operations(trans, force_release);
   return trans->execute(NdbTransaction::NoCommit,
-                        NdbTransaction::AO_IgnoreError,
+                        NdbOperation::AO_IgnoreError,
                         h->m_force_send);
 }
 
@@ -413,7 +420,8 @@
     thd_ndb_share->stat.no_uncommitted_rows_count= 0;
     thd_ndb_share->stat.records= ~(ha_rows)0;
   }
-  DBUG_PRINT("exit", ("thd_ndb_share: 0x%x  key: 0x%x", thd_ndb_share, key));
+  DBUG_PRINT("exit", ("thd_ndb_share: 0x%lx  key: 0x%lx",
+                      (long) thd_ndb_share, (long) key));
   DBUG_RETURN(thd_ndb_share);
 }
 
@@ -441,15 +449,15 @@
 {
   ha_rows retval;
   DBUG_ENTER("ha_ndbcluster::records");
-  struct Ndb_local_table_statistics *info= m_table_info;
+  struct Ndb_local_table_statistics *local_info= m_table_info;
   DBUG_PRINT("info", ("id=%d, no_uncommitted_rows_count=%d",
                       ((const NDBTAB *)m_table)->getTableId(),
-                      info->no_uncommitted_rows_count));
+                      local_info->no_uncommitted_rows_count));
 
   Ndb *ndb= get_ndb();
   ndb->setDatabaseName(m_dbname);
   struct Ndb_statistics stat;
-  if (ndb_get_table_statistics(this, true, ndb, m_table, &stat) == 0)
+  if (ndb_get_table_statistics(this, TRUE, ndb, m_table, &stat) == 0)
   {
     retval= stat.row_count;
   }
@@ -460,9 +468,9 @@
 
   THD *thd= current_thd;
   if (get_thd_ndb(thd)->error)
-    info->no_uncommitted_rows_count= 0;
+    local_info->no_uncommitted_rows_count= 0;
 
-  DBUG_RETURN(retval + info->no_uncommitted_rows_count);
+  DBUG_RETURN(retval + local_info->no_uncommitted_rows_count);
 }
 
 int ha_ndbcluster::records_update()
@@ -472,30 +480,29 @@
   DBUG_ENTER("ha_ndbcluster::records_update");
   int result= 0;
 
-  struct Ndb_local_table_statistics *info= m_table_info;
+  struct Ndb_local_table_statistics *local_info= m_table_info;
   DBUG_PRINT("info", ("id=%d, no_uncommitted_rows_count=%d",
                       ((const NDBTAB *)m_table)->getTableId(),
-                      info->no_uncommitted_rows_count));
-  //  if (info->records == ~(ha_rows)0)
+                      local_info->no_uncommitted_rows_count));
   {
     Ndb *ndb= get_ndb();
     struct Ndb_statistics stat;
     ndb->setDatabaseName(m_dbname);
-    result= ndb_get_table_statistics(this, true, ndb, m_table, &stat);
+    result= ndb_get_table_statistics(this, TRUE, ndb, m_table, &stat);
     if (result == 0)
     {
       stats.mean_rec_length= stat.row_size;
       stats.data_file_length= stat.fragment_memory;
-      info->records= stat.row_count;
+      local_info->records= stat.row_count;
     }
   }
   {
     THD *thd= current_thd;
     if (get_thd_ndb(thd)->error)
-      info->no_uncommitted_rows_count= 0;
+      local_info->no_uncommitted_rows_count= 0;
   }
-  if(result==0)
-    stats.records= info->records+ info->no_uncommitted_rows_count;
+  if (result == 0)
+    stats.records= local_info->records+ local_info->no_uncommitted_rows_count;
   DBUG_RETURN(result);
 }
 
@@ -513,11 +520,11 @@
   if (m_ha_not_exact_count)
     return;
   DBUG_ENTER("ha_ndbcluster::no_uncommitted_rows_update");
-  struct Ndb_local_table_statistics *info= m_table_info;
-  info->no_uncommitted_rows_count+= c;
+  struct Ndb_local_table_statistics *local_info= m_table_info;
+  local_info->no_uncommitted_rows_count+= c;
   DBUG_PRINT("info", ("id=%d, no_uncommitted_rows_count=%d",
                       ((const NDBTAB *)m_table)->getTableId(),
-                      info->no_uncommitted_rows_count));
+                      local_info->no_uncommitted_rows_count));
   DBUG_VOID_RETURN;
 }
 
@@ -761,8 +768,8 @@
         blob_ptr= (char*)"";
       }
 
-      DBUG_PRINT("value", ("set blob ptr=%p len=%u",
-                           blob_ptr, blob_len));
+      DBUG_PRINT("value", ("set blob ptr: 0x%lx  len: %u",
+                           (long) blob_ptr, blob_len));
       DBUG_DUMP("value", (char*)blob_ptr, min(blob_len, 26));
 
       if (set_blob_value)
@@ -847,8 +854,8 @@
           uint32 len= 0xffffffff;  // Max uint32
           if (ndb_blob->readData(buf, len) != 0)
             ERR_RETURN(ndb_blob->getNdbError());
-          DBUG_PRINT("info", ("[%u] offset=%u buf=%p len=%u [ptrdiff=%d]",
-                              i, offset, buf, len, (int)ptrdiff));
+          DBUG_PRINT("info", ("[%u] offset: %u  buf: 0x%lx  len=%u  [ptrdiff=%d]",
+                              i, offset, (long) buf, len, (int)ptrdiff));
           DBUG_ASSERT(len == len64);
           // Ugly hack assumes only ptr needs to be changed
           field_blob->ptr+= ptrdiff;
@@ -954,7 +961,6 @@
 
 bool ha_ndbcluster::uses_blob_value()
 {
-  uint blob_fields;
   MY_BITMAP *bitmap;
   uint *blob_index, *blob_index_end;
   if (table_share->blob_fields == 0)
@@ -1104,7 +1110,6 @@
   const char *index_name;
   KEY* key_info= tab->key_info;
   const char **key_name= tab->s->keynames.type_names;
-  NDBDICT *dict= ndb->getDictionary();
   DBUG_ENTER("ha_ndbcluster::create_indexes");
   
   for (i= 0; i < tab->s->keys; i++, key_info++, key_name++)
@@ -1170,8 +1175,8 @@
       index= dict->getIndexGlobal(index_name, *m_table);
       if (!index)
         ERR_RETURN(dict->getNdbError());
-      DBUG_PRINT("info", ("index: 0x%x  id: %d  version: %d.%d  status: %d",
-                          index,
+      DBUG_PRINT("info", ("index: 0x%lx  id: %d  version: %d.%d  status: %d",
+                          (long) index,
                           index->getObjectId(),
                           index->getObjectVersion() & 0xFFFFFF,
                           index->getObjectVersion() >> 24,
@@ -1214,8 +1219,8 @@
       index= dict->getIndexGlobal(unique_index_name, *m_table);
       if (!index)
         ERR_RETURN(dict->getNdbError());
-      DBUG_PRINT("info", ("index: 0x%x  id: %d  version: %d.%d  status: %d",
-                          index,
+      DBUG_PRINT("info", ("index: 0x%lx  id: %d  version: %d.%d  status: %d",
+                          (long) index,
                           index->getObjectId(),
                           index->getObjectVersion() & 0xFFFFFF,
                           index->getObjectVersion() >> 24,
@@ -1242,7 +1247,6 @@
   int error= 0;
   THD *thd=current_thd;
   NDBDICT *dict= ndb->getDictionary();
-  const char *index_name;
   KEY* key_info= tab->key_info;
   const char **key_name= tab->s->keynames.type_names;
   DBUG_ENTER("ha_ndbcluster::open_indexes");
@@ -1254,9 +1258,9 @@
         m_index[i].index= m_index[i].unique_index= NULL;
       else
         break;
-    m_index[i].null_in_unique_index= false;
+    m_index[i].null_in_unique_index= FALSE;
     if (check_index_fields_not_null(key_info))
-      m_index[i].null_in_unique_index= true;
+      m_index[i].null_in_unique_index= TRUE;
   }
 
   if (error && !ignore_error)
@@ -1292,7 +1296,6 @@
   const char *index_name;
   KEY* key_info= tab->key_info;
   const char **key_name= tab->s->keynames.type_names;
-  NDBDICT *dict= ndb->getDictionary();
   DBUG_ENTER("ha_ndbcluster::renumber_indexes");
   
   for (i= 0; i < tab->s->keys; i++, key_info++, key_name++)
@@ -1409,10 +1412,10 @@
     {
       Field* field= key_part->field;
       if (field->maybe_null())
-	DBUG_RETURN(true);
+	DBUG_RETURN(TRUE);
     }
   
-  DBUG_RETURN(false);
+  DBUG_RETURN(FALSE);
 }
 
 void ha_ndbcluster::release_metadata(THD *thd, Ndb *ndb)
@@ -1730,7 +1733,8 @@
       ERR_RETURN(trans->getNdbError());
   }
 
-  if (execute_no_commit_ie(this,trans,false) != 0) 
+  if ((res = execute_no_commit_ie(this,trans,FALSE)) != 0 ||
+      op->getNdbError().code) 
   {
     table->status= STATUS_NOT_FOUND;
     DBUG_RETURN(ndb_err(trans));
@@ -1795,7 +1799,7 @@
     }
   }
   
-  if (execute_no_commit(this,trans,false) != 0) 
+  if (execute_no_commit(this,trans,FALSE) != 0) 
   {
     table->status= STATUS_NOT_FOUND;
     DBUG_RETURN(ndb_err(trans));
@@ -1841,7 +1845,7 @@
     if (err.status != NdbError::Success)
     {
       if (ndb_to_mysql_error(&err) != (int) errcode)
-        DBUG_RETURN(false);
+        DBUG_RETURN(FALSE);
       if (op == last) break;
       op= trans->getNextCompletedOperation(op);
     }
@@ -1872,10 +1876,10 @@
         if (errcode == HA_ERR_KEY_NOT_FOUND)
           m_dupkey= table->s->primary_key;
       }
-      DBUG_RETURN(false);      
+      DBUG_RETURN(FALSE);      
     }
   }
-  DBUG_RETURN(true);
+  DBUG_RETURN(TRUE);
 }
 
 
@@ -1884,7 +1888,8 @@
  * primary key or unique index values
 */
 
-int ha_ndbcluster::peek_indexed_rows(const byte *record)
+int ha_ndbcluster::peek_indexed_rows(const byte *record,
+				     bool check_pk)
 {
   NdbTransaction *trans= m_active_trans;
   NdbOperation *op;
@@ -1896,7 +1901,7 @@
   NdbOperation::LockMode lm=
       (NdbOperation::LockMode)get_ndb_lock_type(m_lock.type);
   first= NULL;
-  if (table->s->primary_key != MAX_KEY)
+  if (check_pk && table->s->primary_key != MAX_KEY)
   {
     /*
      * Fetch any row with colliding primary key
@@ -1952,7 +1957,7 @@
   }
   last= trans->getLastDefinedOperation();
   if (first)
-    res= execute_no_commit_ie(this,trans,false);
+    res= execute_no_commit_ie(this,trans,FALSE);
   else
   {
     // Table has no keys
@@ -2001,7 +2006,8 @@
   if ((res= define_read_attrs(buf, op)))
     DBUG_RETURN(res);
 
-  if (execute_no_commit_ie(this,trans,false) != 0) 
+  if (execute_no_commit_ie(this,trans,FALSE) != 0 ||
+      op->getNdbError().code) 
   {
     table->status= STATUS_NOT_FOUND;
     DBUG_RETURN(ndb_err(trans));
@@ -2015,7 +2021,7 @@
 inline int ha_ndbcluster::fetch_next(NdbScanOperation* cursor)
 {
   DBUG_ENTER("fetch_next");
-  int check;
+  int local_check;
   NdbTransaction *trans= m_active_trans;
   
   if (m_lock_tuple)
@@ -2026,19 +2032,21 @@
       LOCK WITH SHARE MODE) and row was not explictly unlocked 
       with unlock_row() call
     */
-      NdbConnection *trans= m_active_trans;
+      NdbConnection *con_trans= m_active_trans;
       NdbOperation *op;
       // Lock row
       DBUG_PRINT("info", ("Keeping lock on scanned row"));
       
       if (!(op= m_active_cursor->lockCurrentTuple()))
       {
-	m_lock_tuple= false;
-	ERR_RETURN(trans->getNdbError());
+        /* purecov: begin inspected */
+	m_lock_tuple= FALSE;
+	ERR_RETURN(con_trans->getNdbError());
+        /* purecov: end */    
       }
       m_ops_pending++;
   }
-  m_lock_tuple= false;
+  m_lock_tuple= FALSE;
   
   bool contact_ndb= m_lock.type < TL_WRITE_ALLOW_WRITE &&
                     m_lock.type != TL_READ_WITH_SHARED_LOCKS;;
@@ -2049,13 +2057,13 @@
     */
     if (m_ops_pending && m_blobs_pending)
     {
-      if (execute_no_commit(this,trans,false) != 0)
+      if (execute_no_commit(this,trans,FALSE) != 0)
         DBUG_RETURN(ndb_err(trans));
       m_ops_pending= 0;
       m_blobs_pending= FALSE;
     }
     
-    if ((check= cursor->nextResult(contact_ndb, m_force_send)) == 0)
+    if ((local_check= cursor->nextResult(contact_ndb, m_force_send)) == 0)
     {
       /*
 	Explicitly lock tuple if "select for update" or
@@ -2066,7 +2074,7 @@
 		     m_lock.type == TL_READ_WITH_SHARED_LOCKS);
       DBUG_RETURN(0);
     } 
-    else if (check == 1 || check == 2)
+    else if (local_check == 1 || local_check == 2)
     {
       // 1: No more records
       // 2: No more cached records
@@ -2076,12 +2084,12 @@
         all pending update or delete operations should 
         be sent to NDB
       */
-      DBUG_PRINT("info", ("ops_pending: %d", m_ops_pending));    
+      DBUG_PRINT("info", ("ops_pending: %ld", (long) m_ops_pending));    
       if (m_ops_pending)
       {
         if (m_transaction_on)
         {
-          if (execute_no_commit(this,trans,false) != 0)
+          if (execute_no_commit(this,trans,FALSE) != 0)
             DBUG_RETURN(-1);
         }
         else
@@ -2096,13 +2104,13 @@
         }
         m_ops_pending= 0;
       }
-      contact_ndb= (check == 2);
+      contact_ndb= (local_check == 2);
     }
     else
     {
       DBUG_RETURN(-1);
     }
-  } while (check == 2);
+  } while (local_check == 2);
 
   DBUG_RETURN(1);
 }
@@ -2279,8 +2287,7 @@
           DBUG_PRINT("error", ("key %d unknown flag %d", j, p.key->flag));
           DBUG_ASSERT(FALSE);
           // Stop setting bounds but continue with what we have
-          op->end_of_bound(range_no);
-          DBUG_RETURN(0);
+          DBUG_RETURN(op->end_of_bound(range_no));
         }
       }
     }
@@ -2309,7 +2316,7 @@
       // Set bound if not done with this key
       if (p.key != NULL)
       {
-        DBUG_PRINT("info", ("key %d:%d offset=%d length=%d last=%d bound=%d",
+        DBUG_PRINT("info", ("key %d:%d  offset: %d  length: %d  last: %d  bound: %d",
                             j, i, tot_len, part_len, p.part_last, p.bound_type));
         DBUG_DUMP("info", (const char*)p.part_ptr, part_store_len);
 
@@ -2327,8 +2334,7 @@
 
     tot_len+= part_store_len;
   }
-  op->end_of_bound(range_no);
-  DBUG_RETURN(0);
+  DBUG_RETURN(op->end_of_bound(range_no));
 }
 
 /*
@@ -2362,7 +2368,7 @@
    bool need_pk = (lm == NdbOperation::LM_Read);
     if (!(op= trans->getNdbIndexScanOperation(m_index[active_index].index, 
                                               m_table)) ||
-        op->readTuples(lm, 0, parallelism, sorted, descending, false, need_pk))
+        op->readTuples(lm, 0, parallelism, sorted, descending, FALSE, need_pk))
       ERR_RETURN(trans->getNdbError());
     if (m_use_partition_function && part_spec != NULL &&
         part_spec->start_part == part_spec->end_part)
@@ -2384,7 +2390,7 @@
   
   {
     const key_range *keys[2]= { start_key, end_key };
-    res= set_bounds(op, active_index, false, keys);
+    res= set_bounds(op, active_index, FALSE, keys);
     if (res)
       DBUG_RETURN(res);
   }
@@ -2408,7 +2414,7 @@
       ERR_RETURN(trans->getNdbError());
   }
 
-  if (execute_no_commit(this,trans,false) != 0)
+  if (execute_no_commit(this,trans,FALSE) != 0)
     DBUG_RETURN(ndb_err(trans));
   
   DBUG_RETURN(next_result(buf));
@@ -2503,7 +2509,7 @@
   if ((res= define_read_attrs(buf, op)))
     DBUG_RETURN(res);
 
-  if (execute_no_commit(this,trans,false) != 0)
+  if (execute_no_commit(this,trans,FALSE) != 0)
     DBUG_RETURN(ndb_err(trans));
   DBUG_PRINT("exit", ("Scan started successfully"));
   DBUG_RETURN(next_result(buf));
@@ -2538,7 +2544,7 @@
     part_spec.start_part= 0;
     part_spec.end_part= m_part_info->get_tot_partitions() - 1;
     prune_partition_set(table, &part_spec);
-    DBUG_PRINT("info", ("part_spec.start_part = %u, part_spec.end_part = %u",
+    DBUG_PRINT("info", ("part_spec.start_part: %u  part_spec.end_part: %u",
                         part_spec.start_part, part_spec.end_part));
     /*
       If partition pruning has found no partition in set
@@ -2572,7 +2578,7 @@
   if ((res= define_read_attrs(buf, op)))
     DBUG_RETURN(res);
 
-  if (execute_no_commit(this,trans,false) != 0)
+  if (execute_no_commit(this,trans,FALSE) != 0)
     DBUG_RETURN(ndb_err(trans));
   DBUG_PRINT("exit", ("Scan started successfully"));
   DBUG_RETURN(next_result(buf));
@@ -2588,7 +2594,7 @@
   NdbTransaction *trans= m_active_trans;
   NdbOperation *op;
   int res;
-  THD *thd= current_thd;
+  THD *thd= table->in_use;
   longlong func_value= 0;
   DBUG_ENTER("ha_ndbcluster::write_row");
 
@@ -2601,7 +2607,6 @@
      */
     if (has_auto_increment) 
     {
-      THD *thd= table->in_use;
       int error;
 
       m_skip_auto_increment= FALSE;
@@ -2621,7 +2626,7 @@
       start_bulk_insert will set parameters to ensure that each
       write_row is committed individually
     */
-    int peek_res= peek_indexed_rows(record);
+    int peek_res= peek_indexed_rows(record, TRUE);
     
     if (!peek_res) 
     {
@@ -2734,13 +2739,13 @@
   {
     // Send rows to NDB
     DBUG_PRINT("info", ("Sending inserts to NDB, "\
-                        "rows_inserted:%d, bulk_insert_rows: %d", 
+                        "rows_inserted: %d  bulk_insert_rows: %d", 
                         (int)m_rows_inserted, (int)m_bulk_insert_rows));
 
     m_bulk_insert_not_flushed= FALSE;
     if (m_transaction_on)
     {
-      if (execute_no_commit(this,trans,false) != 0)
+      if (execute_no_commit(this,trans,FALSE) != 0)
       {
         m_skip_auto_increment= TRUE;
         no_uncommitted_rows_execute_failure();
@@ -2766,10 +2771,12 @@
   {
     Ndb *ndb= get_ndb();
     Uint64 next_val= (Uint64) table->next_number_field->val_int() + 1;
+#ifndef DBUG_OFF
     char buff[22];
     DBUG_PRINT("info", 
                ("Trying to set next auto increment value to %s",
                 llstr(next_val, buff)));
+#endif
     Ndb_tuple_id_range_guard g(m_share);
     if (ndb->setAutoIncrementValue(m_table, g.range, next_val, TRUE)
         == -1)
@@ -2822,7 +2829,7 @@
 
 int ha_ndbcluster::update_row(const byte *old_data, byte *new_data)
 {
-  THD *thd= current_thd;
+  THD *thd= table->in_use;
   NdbTransaction *trans= m_active_trans;
   NdbScanOperation* cursor= m_active_cursor;
   NdbOperation *op;
@@ -2830,9 +2837,27 @@
   uint32 old_part_id= 0, new_part_id= 0;
   int error;
   longlong func_value;
+  bool pk_update= (table_share->primary_key != MAX_KEY &&
+		   key_cmp(table_share->primary_key, old_data, new_data));
   DBUG_ENTER("update_row");
   m_write_op= TRUE;
   
+  /*
+   * If IGNORE the ignore constraint violations on primary and unique keys,
+   * but check that it is not part of INSERT ... ON DUPLICATE KEY UPDATE
+   */
+  if (m_ignore_dup_key && thd->lex->sql_command == SQLCOM_UPDATE)
+  {
+    int peek_res= peek_indexed_rows(new_data, pk_update);
+    
+    if (!peek_res) 
+    {
+      DBUG_RETURN(HA_ERR_FOUND_DUPP_KEY);
+    }
+    if (peek_res != HA_ERR_KEY_NOT_FOUND)
+      DBUG_RETURN(peek_res);
+  }
+
   statistic_increment(thd->status_var.ha_update_count, &LOCK_status);
   if (table->timestamp_field_type & TIMESTAMP_AUTO_SET_ON_UPDATE)
   {
@@ -2853,9 +2878,7 @@
    * Check for update of primary key or partition change
    * for special handling
    */  
-  if (((table_share->primary_key != MAX_KEY) &&
-       key_cmp(table_share->primary_key, old_data, new_data)) ||
-      (old_part_id != new_part_id))
+  if (pk_update || old_part_id != new_part_id)
   {
     int read_res, insert_res, delete_res, undo_res;
 
@@ -2915,7 +2938,7 @@
     DBUG_PRINT("info", ("Calling updateTuple on cursor"));
     if (!(op= cursor->updateCurrentTuple()))
       ERR_RETURN(trans->getNdbError());
-    m_lock_tuple= false;
+    m_lock_tuple= FALSE;
     m_ops_pending++;
     if (uses_blob_value())
       m_blobs_pending= TRUE;
@@ -2978,7 +3001,7 @@
     op->setValue(no_fields, part_func_value);
   }
   // Execute update operation
-  if (!cursor && execute_no_commit(this,trans,false) != 0) {
+  if (!cursor && execute_no_commit(this,trans,FALSE) != 0) {
     no_uncommitted_rows_execute_failure();
     DBUG_RETURN(ndb_err(trans));
   }
@@ -2993,7 +3016,7 @@
 
 int ha_ndbcluster::delete_row(const byte *record)
 {
-  THD *thd= current_thd;
+  THD *thd= table->in_use;
   NdbTransaction *trans= m_active_trans;
   NdbScanOperation* cursor= m_active_cursor;
   NdbOperation *op;
@@ -3024,7 +3047,7 @@
     DBUG_PRINT("info", ("Calling deleteTuple on cursor"));
     if (cursor->deleteCurrentTuple() != 0)
       ERR_RETURN(trans->getNdbError());     
-    m_lock_tuple= false;
+    m_lock_tuple= FALSE;
     m_ops_pending++;
 
     if (m_use_partition_function)
@@ -3064,7 +3087,7 @@
   }
 
   // Execute delete operation
-  if (execute_no_commit(this,trans,false) != 0) {
+  if (execute_no_commit(this,trans,FALSE) != 0) {
     no_uncommitted_rows_execute_failure();
     DBUG_RETURN(ndb_err(trans));
   }
@@ -3184,7 +3207,8 @@
           char* ptr;
           field_blob->get_ptr(&ptr, row_offset);
           uint32 len= field_blob->get_length(row_offset);
-          DBUG_PRINT("info",("[%u] SET ptr=%p len=%u", col_no, ptr, len));
+          DBUG_PRINT("info",("[%u] SET ptr: 0x%lx  len: %u",
+                             col_no, (long) ptr, len));
 #endif
         }
       }
@@ -3291,8 +3315,7 @@
     unless m_lock.type == TL_READ_HIGH_PRIORITY
     and no sub-sequent call to unlock_row()
   */
-  m_lock_tuple= false;
-    m_lock_tuple= false;
+  m_lock_tuple= FALSE;
   DBUG_RETURN(0);
 }
 
@@ -3314,7 +3337,6 @@
   const byte* end_ptr= key + key_len;
   curr_part= key_info->key_part;
   end_part= curr_part + key_info->key_parts;
-  
 
   for (; curr_part != end_part && key < end_ptr; curr_part++)
   {
@@ -3426,7 +3448,7 @@
   if (m_use_partition_function)
   {
     get_partition_set(table, buf, active_index, start_key, &part_spec);
-    DBUG_PRINT("info", ("part_spec.start_part = %u, part_spec.end_part = %u",
+    DBUG_PRINT("info", ("part_spec.start_part: %u  part_spec.end_part: %u",
                         part_spec.start_part, part_spec.end_part));
     /*
       If partition pruning has found no partition in set
@@ -3459,8 +3481,9 @@
     {
       if (m_active_cursor && (error= close_scan()))
         DBUG_RETURN(error);
-      DBUG_RETURN(pk_read(start_key->key, start_key->length, buf,
-                          part_spec.start_part));
+      error= pk_read(start_key->key, start_key->length, buf,
+		     part_spec.start_part);
+      DBUG_RETURN(error == HA_ERR_KEY_NOT_FOUND ? HA_ERR_END_OF_FILE : error);
     }
     break;
   case UNIQUE_ORDERED_INDEX:
@@ -3471,7 +3494,9 @@
     {
       if (m_active_cursor && (error= close_scan()))
         DBUG_RETURN(error);
-      DBUG_RETURN(unique_index_read(start_key->key, start_key->length, buf));
+
+      error= unique_index_read(start_key->key, start_key->length, buf);
+      DBUG_RETURN(error == HA_ERR_KEY_NOT_FOUND ? HA_ERR_END_OF_FILE : error);
     }
     else if (type == UNIQUE_INDEX)
       DBUG_RETURN(unique_index_scan(key_info, 
@@ -3549,20 +3574,20 @@
       
       if (!(op= cursor->lockCurrentTuple()))
       {
-	m_lock_tuple= false;
+	m_lock_tuple= FALSE;
 	ERR_RETURN(trans->getNdbError());
       }
       m_ops_pending++;      
   }
-  m_lock_tuple= false;
+  m_lock_tuple= FALSE;
   if (m_ops_pending)
   {
     /*
       Take over any pending transactions to the 
       deleteing/updating transaction before closing the scan    
     */
-    DBUG_PRINT("info", ("ops_pending: %d", m_ops_pending));    
-    if (execute_no_commit(this,trans,false) != 0) {
+    DBUG_PRINT("info", ("ops_pending: %ld", (long) m_ops_pending));    
+    if (execute_no_commit(this,trans,FALSE) != 0) {
       no_uncommitted_rows_execute_failure();
       DBUG_RETURN(ndb_err(trans));
     }
@@ -3680,20 +3705,26 @@
       size_t len = key_part->length;
       const byte * ptr = record + key_part->offset;
       Field *field = key_part->field;
-      if ((field->type() ==  MYSQL_TYPE_VARCHAR) &&
-	  ((Field_varstring*)field)->length_bytes == 1)
+      if (field->type() ==  MYSQL_TYPE_VARCHAR)
       {
-	/** 
-	 * Keys always use 2 bytes length
-	 */
-	buff[0] = ptr[0];
-	buff[1] = 0;
-	memcpy(buff+2, ptr + 1, len);	
-	len += 2;
+        if (((Field_varstring*)field)->length_bytes == 1)
+        {
+          /**
+           * Keys always use 2 bytes length
+           */
+          buff[0] = ptr[0];
+          buff[1] = 0;
+          memcpy(buff+2, ptr + 1, len);
+        }
+        else
+        {
+          memcpy(buff, ptr, len + 2);
+        }
+        len += 2;
       }
       else
       {
-	memcpy(buff, ptr, len);
+        memcpy(buff, ptr, len);
       }
       buff += len;
     }
@@ -3761,7 +3792,7 @@
       struct Ndb_statistics stat;
       ndb->setDatabaseName(m_dbname);
       if (current_thd->variables.ndb_use_exact_count &&
-          (result= ndb_get_table_statistics(this, true, ndb, m_table, &stat))
+          (result= ndb_get_table_statistics(this, TRUE, ndb, m_table, &stat))
           == 0)
       {
         stats.mean_rec_length= stat.row_size;
@@ -3788,7 +3819,7 @@
   if (flag & HA_STATUS_AUTO)
   {
     DBUG_PRINT("info", ("HA_STATUS_AUTO"));
-    if (m_table)
+    if (m_table && table->found_next_number_field)
     {
       Ndb *ndb= get_ndb();
       Ndb_tuple_id_range_guard g(m_share);
@@ -3957,12 +3988,12 @@
     NdbTransaction *trans= m_active_trans;
     // Send rows to NDB
     DBUG_PRINT("info", ("Sending inserts to NDB, "\
-                        "rows_inserted:%d, bulk_insert_rows: %d", 
+                        "rows_inserted: %d  bulk_insert_rows: %d", 
                         (int) m_rows_inserted, (int) m_bulk_insert_rows)); 
     m_bulk_insert_not_flushed= FALSE;
     if (m_transaction_on)
     {
-      if (execute_no_commit(this, trans,false) != 0)
+      if (execute_no_commit(this, trans,FALSE) != 0)
       {
         no_uncommitted_rows_execute_failure();
         my_errno= error= ndb_err(trans);
@@ -3977,7 +4008,7 @@
       }
       else
       {
-        int res= trans->restart();
+        IF_DBUG(int res=) trans->restart();
         DBUG_ASSERT(res == 0);
       }
     }
@@ -4287,7 +4318,7 @@
   DBUG_ENTER("unlock_row");
 
   DBUG_PRINT("info", ("Unlocking row"));
-  m_lock_tuple= false;
+  m_lock_tuple= FALSE;
   DBUG_VOID_RETURN;
 }
 
@@ -4315,11 +4346,10 @@
       ERR_RETURN(ndb->getNdbError());
     no_uncommitted_rows_reset(thd);
     thd_ndb->stmt= trans;
+    thd_ndb->query_state&= NDB_QUERY_NORMAL;
     trans_register_ha(thd, FALSE, ndbcluster_hton);
   }
-  thd_ndb->query_state&= NDB_QUERY_NORMAL;
   m_active_trans= trans;
-
   // Start of statement
   m_ops_pending= 0;    
   thd->set_current_stmt_binlog_row_based_if_mixed();
@@ -4367,8 +4397,8 @@
   while ((share= it++))
   {
     pthread_mutex_lock(&share->mutex);
-    DBUG_PRINT("info", ("Invalidate commit_count for %s, share->commit_count: %d ",
-			share->key, share->commit_count));
+    DBUG_PRINT("info", ("Invalidate commit_count for %s, share->commit_count: %lu",
+                        share->table_name, (ulong) share->commit_count));
     share->commit_count= 0;
     share->commit_count_lock++;
     pthread_mutex_unlock(&share->mutex);
@@ -4617,19 +4647,29 @@
       col.setType(NDBCOL::Text);
       col.setCharset(cs);
     }
-    // Use "<=" even if "<" is the exact condition
-    if (field->max_length() <= (1 << 8))
-      goto mysql_type_tiny_blob;
-    else if (field->max_length() <= (1 << 16))
-    {
-      col.setInlineSize(256);
-      col.setPartSize(2000);
-      col.setStripeSize(16);
+    {
+      Field_blob *field_blob= (Field_blob *)field;
+      /*
+       * max_data_length is 2^8-1, 2^16-1, 2^24-1 for tiny, blob, medium.
+       * Tinyblob gets no blob parts.  The other cases are just a crude
+       * way to control part size and striping.
+       *
+       * In mysql blob(256) is promoted to blob(65535) so it does not
+       * in fact fit "inline" in NDB.
+       */
+      if (field_blob->max_data_length() < (1 << 8))
+        goto mysql_type_tiny_blob;
+      else if (field_blob->max_data_length() < (1 << 16))
+      {
+        col.setInlineSize(256);
+        col.setPartSize(2000);
+        col.setStripeSize(16);
+      }
+      else if (field_blob->max_data_length() < (1 << 24))
+        goto mysql_type_medium_blob;
+      else
+        goto mysql_type_long_blob;
     }
-    else if (field->max_length() <= (1 << 24))
-      goto mysql_type_medium_blob;
-    else
-      goto mysql_type_long_blob;
     break;
   mysql_type_medium_blob:
   case MYSQL_TYPE_MEDIUM_BLOB:   
@@ -4686,7 +4726,9 @@
   // Set autoincrement
   if (field->flags & AUTO_INCREMENT_FLAG) 
   {
+#ifndef DBUG_OFF
     char buff[22];
+#endif
     col.setAutoIncrement(TRUE);
     ulonglong value= info->auto_increment_value ?
       info->auto_increment_value : (ulonglong) 1;
@@ -4704,15 +4746,16 @@
 
 int ha_ndbcluster::create(const char *name, 
                           TABLE *form, 
-                          HA_CREATE_INFO *info)
+                          HA_CREATE_INFO *create_info)
 {
   THD *thd= current_thd;
   NDBTAB tab;
   NDBCOL col;
   uint pack_length, length, i, pk_length= 0;
   const void *data, *pack_data;
-  bool create_from_engine= (info->table_options & HA_OPTION_CREATE_FROM_ENGINE);
+  bool create_from_engine= (create_info->table_options &
HA_OPTION_CREATE_FROM_ENGINE);
   bool is_truncate= (thd->lex->sql_command == SQLCOM_TRUNCATE);
+  char tablespace[FN_LEN];
 
   DBUG_ENTER("ha_ndbcluster::create");
   DBUG_PRINT("enter", ("name: %s", name));
@@ -4721,8 +4764,22 @@
   set_dbname(name);
   set_tabname(name);
 
+  if ((my_errno= check_ndb_connection()))
+    DBUG_RETURN(my_errno);
+  
+  Ndb *ndb= get_ndb();
+  NDBDICT *dict= ndb->getDictionary();
+
   if (is_truncate)
   {
+    {
+      Ndb_table_guard ndbtab_g(dict, m_tabname);
+      if (!(m_table= ndbtab_g.get_table()))
+	ERR_RETURN(dict->getNdbError());
+      if ((get_tablespace_name(thd, tablespace, FN_LEN)))
+	create_info->tablespace= tablespace;    
+      m_table= NULL;
+    }
     DBUG_PRINT("info", ("Dropping and re-creating table for TRUNCATE"));
     if ((my_errno= delete_table(name)))
       DBUG_RETURN(my_errno);
@@ -4750,7 +4807,7 @@
     schema distribution table is setup
     ( unless it is a creation of the schema dist table itself )
   */
-  if (!schema_share &&
+  if (!ndb_schema_share &&
       !(strcmp(m_dbname, NDB_REP_DB) == 0 &&
         strcmp(m_tabname, NDB_SCHEMA_TABLE) == 0))
   {
@@ -4761,7 +4818,7 @@
 
   DBUG_PRINT("table", ("name: %s", m_tabname));  
   tab.setName(m_tabname);
-  tab.setLogging(!(info->options & HA_LEX_CREATE_TMP_TABLE));    
+  tab.setLogging(!(create_info->options & HA_LEX_CREATE_TMP_TABLE));    
    
   // Save frm data for this table
   if (readfrm(name, &data, &length))
@@ -4771,8 +4828,7 @@
     my_free((char*)data, MYF(0));
     DBUG_RETURN(2);
   }
-
-  DBUG_PRINT("info", ("setFrm data=%lx  len=%d", pack_data, pack_length));
+  DBUG_PRINT("info", ("setFrm data: 0x%lx  len: %d", (long) pack_data, pack_length));
   tab.setFrm(pack_data, pack_length);      
   my_free((char*)data, MYF(0));
   my_free((char*)pack_data, MYF(0));
@@ -4783,10 +4839,10 @@
     DBUG_PRINT("info", ("name: %s, type: %u, pack_length: %d", 
                         field->field_name, field->real_type(),
                         field->pack_length()));
-    if ((my_errno= create_ndb_column(col, field, info)))
+    if ((my_errno= create_ndb_column(col, field, create_info)))
       DBUG_RETURN(my_errno);
  
-    if (info->store_on_disk || getenv("NDB_DEFAULT_DISK"))
+    if (create_info->storage_media == HA_SM_DISK)
       col.setStorageType(NdbDictionary::Column::StorageTypeDisk);
     else
     {
@@ -4810,11 +4866,29 @@
                              NdbDictionary::Column::StorageTypeMemory);
   }
 
-  if (info->store_on_disk)
-    if (info->tablespace)
-      tab.setTablespace(info->tablespace);
+  if (create_info->storage_media == HA_SM_DISK)
+  { 
+    if (create_info->tablespace)
+      tab.setTablespaceName(create_info->tablespace);
     else
-      tab.setTablespace("DEFAULT-TS");
+      tab.setTablespaceName("DEFAULT-TS");
+  }
+  else if (create_info->tablespace)
+  {
+    if (create_info->storage_media == HA_SM_MEMORY)
+    {
+      push_warning_printf(thd, MYSQL_ERROR::WARN_LEVEL_ERROR,
+			  ER_ILLEGAL_HA_CREATE_OPTION,
+			  ER(ER_ILLEGAL_HA_CREATE_OPTION),
+			  ndbcluster_hton_name,
+			  "TABLESPACE currently only supported for "
+			  "STORAGE DISK"); 
+      DBUG_RETURN(HA_ERR_UNSUPPORTED);
+    }
+    tab.setTablespaceName(create_info->tablespace);
+    create_info->storage_media = HA_SM_DISK;  //if use tablespace, that also means
store on disk
+  }
+
   // No primary key, create shadow key as 64 bit, auto increment  
   if (form->s->primary_key == MAX_KEY) 
   {
@@ -4843,13 +4917,13 @@
     case MYSQL_TYPE_MEDIUM_BLOB:   
     case MYSQL_TYPE_LONG_BLOB: 
     {
-      NdbDictionary::Column * col= tab.getColumn(i);
-      int size= pk_length + (col->getPartSize()+3)/4 + 7;
+      NdbDictionary::Column * column= tab.getColumn(i);
+      int size= pk_length + (column->getPartSize()+3)/4 + 7;
       if (size > NDB_MAX_TUPLE_SIZE_IN_WORDS && 
          (pk_length+7) < NDB_MAX_TUPLE_SIZE_IN_WORDS)
       {
         size= NDB_MAX_TUPLE_SIZE_IN_WORDS - pk_length - 7;
-        col->setPartSize(4*size);
+        column->setPartSize(4*size);
       }
       /**
        * If size > NDB_MAX and pk_length+7 >= NDB_MAX
@@ -4869,12 +4943,7 @@
     DBUG_RETURN(my_errno);
   }
 
-  if ((my_errno= check_ndb_connection()))
-    DBUG_RETURN(my_errno);
-  
   // Create the table in NDB     
-  Ndb *ndb= get_ndb();
-  NDBDICT *dict= ndb->getDictionary();
   if (dict->createTable(tab) != 0) 
   {
     const NdbError err= dict->getNdbError();
@@ -4947,11 +5016,17 @@
       get a new share
     */
 
-    if (!(share= get_share(name, form, true, true)))
+    /* ndb_share reference create */
+    if (!(share= get_share(name, form, TRUE, TRUE)))
     {
       sql_print_error("NDB: allocating table share for %s failed", name);
       /* my_errno is set */
     }
+    else
+    {
+      DBUG_PRINT("NDB_SHARE", ("%s binlog create  use_count: %u",
+                               share->key, share->use_count));
+    }
     pthread_mutex_unlock(&ndbcluster_mutex);
 
     while (!IS_TMP_PREFIX(m_tabname))
@@ -4960,7 +5035,7 @@
       ndb_rep_event_name(&event_name,m_dbname,m_tabname);
       int do_event_op= ndb_binlog_running;
 
-      if (!schema_share &&
+      if (!ndb_schema_share &&
           strcmp(share->db, NDB_REP_DB) == 0 &&
           strcmp(share->table_name, NDB_SCHEMA_TABLE) == 0)
         do_event_op= 1;
@@ -4975,7 +5050,7 @@
         if (ndb_extra_logging)
           sql_print_information("NDB Binlog: CREATE TABLE Event: %s",
                                 event_name.c_ptr());
-        if (share && do_event_op &&
+        if (share && 
             ndbcluster_create_event_ops(share, m_table, event_name.c_ptr()))
         {
           sql_print_error("NDB Binlog: FAILED CREATE TABLE event operations."
@@ -5009,10 +5084,8 @@
 int ha_ndbcluster::create_handler_files(const char *file,
                                         const char *old_name,
                                         int action_flag,
-                                        HA_CREATE_INFO *info) 
+                                        HA_CREATE_INFO *create_info)
 { 
-  char path[FN_REFLEN];
-  const char *name;
   Ndb* ndb;
   const NDBTAB *tab;
   const void *data, *pack_data;
@@ -5030,7 +5103,7 @@
     DBUG_RETURN(HA_ERR_NO_CONNECTION);
 
   NDBDICT *dict= ndb->getDictionary();
-  if (!info->frm_only)
+  if (!create_info->frm_only)
     DBUG_RETURN(0); // Must be a create, ignore since frm is saved in create
 
   // TODO handle this
@@ -5067,6 +5140,9 @@
   }
   
   set_ndb_share_state(m_share, NSS_INITIAL);
+  /* ndb_share reference schema(?) free */
+  DBUG_PRINT("NDB_SHARE", ("%s binlog schema(?) free  use_count: %u",
+                           m_share->key, m_share->use_count));
   free_share(&m_share); // Decrease ref_count
 
   DBUG_RETURN(error);
@@ -5109,6 +5185,17 @@
     error= create_unique_index(unique_name, key_info);
     break;
   case ORDERED_INDEX:
+    if (key_info->algorithm == HA_KEY_ALG_HASH)
+    {
+      push_warning_printf(current_thd, MYSQL_ERROR::WARN_LEVEL_ERROR,
+			  ER_ILLEGAL_HA_CREATE_OPTION,
+			  ER(ER_ILLEGAL_HA_CREATE_OPTION),
+			  ndbcluster_hton_name,
+			  "Ndb does not support non-unique "
+			  "hash based indexes");
+      error= HA_ERR_UNSUPPORTED;
+      break;
+    }
     error= create_ordered_index(name, key_info);
     break;
   default:
@@ -5182,7 +5269,10 @@
 */ 
 void ha_ndbcluster::prepare_for_alter()
 {
+  /* ndb_share reference schema */
   ndbcluster_get_share(m_share); // Increase ref_count
+  DBUG_PRINT("NDB_SHARE", ("%s binlog schema  use_count: %u",
+                           m_share->key, m_share->use_count));
   set_ndb_share_state(m_share, NSS_ALTERED);
 }
 
@@ -5192,19 +5282,18 @@
 int ha_ndbcluster::add_index(TABLE *table_arg, 
                              KEY *key_info, uint num_of_keys)
 {
-  DBUG_ENTER("ha_ndbcluster::add_index");
-  DBUG_PRINT("info", ("ha_ndbcluster::add_index to table %s", 
-                      table_arg->s->table_name));
   int error= 0;
   uint idx;
-
+  DBUG_ENTER("ha_ndbcluster::add_index");
+  DBUG_PRINT("enter", ("table %s", table_arg->s->table_name.str));
   DBUG_ASSERT(m_share->state == NSS_ALTERED);
+
   for (idx= 0; idx < num_of_keys; idx++)
   {
     KEY *key= key_info + idx;
     KEY_PART_INFO *key_part= key->key_part;
     KEY_PART_INFO *end= key_part + key->key_parts;
-    NDB_INDEX_TYPE idx_type= get_index_type_from_key(idx, key, false);
+    NDB_INDEX_TYPE idx_type= get_index_type_from_key(idx, key_info, false);
     DBUG_PRINT("info", ("Adding index: '%s'", key_info[idx].name));
     // Add fields to key_part struct
     for (; key_part != end; key_part++)
@@ -5217,6 +5306,9 @@
   if (error)
   {
     set_ndb_share_state(m_share, NSS_INITIAL);
+    /* ndb_share reference schema free */
+    DBUG_PRINT("NDB_SHARE", ("%s binlog schema free  use_count: %u",
+                             m_share->key, m_share->use_count));
     free_share(&m_share); // Decrease ref_count
   }
   DBUG_RETURN(error);  
@@ -5261,6 +5353,9 @@
   if((error= drop_indexes(ndb, table_arg)))
   {
     m_share->state= NSS_INITIAL;
+    /* ndb_share reference schema free */
+    DBUG_PRINT("NDB_SHARE", ("%s binlog schema free  use_count: %u",
+                             m_share->key, m_share->use_count));
     free_share(&m_share); // Decrease ref_count
   }
   DBUG_RETURN(error);
@@ -5302,10 +5397,13 @@
   int ndb_table_id= orig_tab->getObjectId();
   int ndb_table_version= orig_tab->getObjectVersion();
 
-  NDB_SHARE *share= get_share(from, 0, false);
+  /* ndb_share reference temporary */
+  NDB_SHARE *share= get_share(from, 0, FALSE);
   if (share)
   {
-    int r= rename_share(share, to);
+    DBUG_PRINT("NDB_SHARE", ("%s temporary  use_count: %u",
+                             share->key, share->use_count));
+    IF_DBUG(int r=) rename_share(share, to);
     DBUG_ASSERT(r == 0);
   }
 #endif
@@ -5326,8 +5424,11 @@
 #ifdef HAVE_NDB_BINLOG
     if (share)
     {
-      int r= rename_share(share, from);
-      DBUG_ASSERT(r == 0);
+      IF_DBUG(int ret=) rename_share(share, from);
+      DBUG_ASSERT(ret == 0);
+      /* ndb_share reference temporary free */
+      DBUG_PRINT("NDB_SHARE", ("%s temporary free  use_count: %u",
+                               share->key, share->use_count));
       free_share(&share);
     }
 #endif
@@ -5340,7 +5441,12 @@
     // ToDo in 4.1 should rollback alter table...
 #ifdef HAVE_NDB_BINLOG
     if (share)
+    {
+      /* ndb_share reference temporary free */
+      DBUG_PRINT("NDB_SHARE", ("%s temporary  use_count: %u",
+                               share->key, share->use_count));
       free_share(&share);
+    }
 #endif
     DBUG_RETURN(result);
   }
@@ -5374,7 +5480,7 @@
       if (ndb_extra_logging)
         sql_print_information("NDB Binlog: RENAME Event: %s",
                               event_name.c_ptr());
-      if (share && ndb_binlog_running &&
+      if (share &&
           ndbcluster_create_event_ops(share, ndbtab, event_name.c_ptr()))
       {
         sql_print_error("NDB Binlog: FAILED create event operations "
@@ -5421,7 +5527,12 @@
     }
   }
   if (share)
+  {
+    /* ndb_share reference temporary free */
+    DBUG_PRINT("NDB_SHARE", ("%s temporary free  use_count: %u",
+                             share->key, share->use_count));
     free_share(&share);
+  }
 #endif
 
   DBUG_RETURN(result);
@@ -5451,12 +5562,18 @@
     Don't allow drop table unless
     schema distribution table is setup
   */
-  if (!schema_share)
+  if (!ndb_schema_share)
   {
     DBUG_PRINT("info", ("Schema distribution table not setup"));
     DBUG_RETURN(HA_ERR_NO_CONNECTION);
   }
-  NDB_SHARE *share= get_share(path, 0, false);
+  /* ndb_share reference temporary */
+  NDB_SHARE *share= get_share(path, 0, FALSE);
+  if (share)
+  {
+    DBUG_PRINT("NDB_SHARE", ("%s temporary  use_count: %u",
+                             share->key, share->use_count));
+  }
 #endif
 
   /* Drop the table from NDB */
@@ -5536,9 +5653,14 @@
           The share kept by the server has not been freed, free it
         */
         share->state= NSS_DROPPED;
+        /* ndb_share reference create free */
+        DBUG_PRINT("NDB_SHARE", ("%s create free  use_count: %u",
+                                 share->key, share->use_count));
         free_share(&share, TRUE);
       }
-      /* free the share taken above */
+      /* ndb_share reference temporary free */
+      DBUG_PRINT("NDB_SHARE", ("%s temporary free  use_count: %u",
+                               share->key, share->use_count));
       free_share(&share, TRUE);
       pthread_mutex_unlock(&ndbcluster_mutex);
     }
@@ -5588,9 +5710,14 @@
         The share kept by the server has not been freed, free it
       */
       share->state= NSS_DROPPED;
+      /* ndb_share reference create free */
+      DBUG_PRINT("NDB_SHARE", ("%s create free  use_count: %u",
+                               share->key, share->use_count));
       free_share(&share, TRUE);
     }
-    /* free the share taken above */
+    /* ndb_share reference temporary free */
+    DBUG_PRINT("NDB_SHARE", ("%s temporary free  use_count: %u",
+                             share->key, share->use_count));
     free_share(&share, TRUE);
     pthread_mutex_unlock(&ndbcluster_mutex);
   }
@@ -5610,7 +5737,7 @@
     Don't allow drop table unless
     schema distribution table is setup
   */
-  if (!schema_share)
+  if (!ndb_schema_share)
   {
     DBUG_PRINT("info", ("Schema distribution table not setup"));
     DBUG_RETURN(HA_ERR_NO_CONNECTION);
@@ -5766,6 +5893,9 @@
 
   if (m_share)
   {
+    /* ndb_share reference handler free */
+    DBUG_PRINT("NDB_SHARE", ("%s handler free  use_count: %u",
+                             m_share->key, m_share->use_count));
     free_share(&m_share);
   }
   release_metadata(thd, ndb);
@@ -5828,14 +5958,21 @@
   DBUG_PRINT("info", ("ref_length: %d", ref_length));
 
   // Init table lock structure 
+  /* ndb_share reference handler */
   if (!(m_share=get_share(name, table)))
     DBUG_RETURN(1);
+  DBUG_PRINT("NDB_SHARE", ("%s handler  use_count: %u",
+                           m_share->key, m_share->use_count));
   thr_lock_data_init(&m_share->lock,&m_lock,(void*) 0);
   
   set_dbname(name);
   set_tabname(name);
   
-  if (check_ndb_connection()) {
+  if (check_ndb_connection())
+  {
+    /* ndb_share reference handler free */
+    DBUG_PRINT("NDB_SHARE", ("%s handler free  use_count: %u",
+                             m_share->key, m_share->use_count));
     free_share(&m_share);
     m_share= 0;
     DBUG_RETURN(HA_ERR_NO_CONNECTION);
@@ -5847,7 +5984,7 @@
     Ndb *ndb= get_ndb();
     ndb->setDatabaseName(m_dbname);
     struct Ndb_statistics stat;
-    res= ndb_get_table_statistics(NULL, false, ndb, m_table, &stat);
+    res= ndb_get_table_statistics(NULL, FALSE, ndb, m_table, &stat);
     stats.mean_rec_length= stat.row_size;
     stats.data_file_length= stat.fragment_memory;
     stats.records= stat.row_count;
@@ -5894,8 +6031,11 @@
 int ha_ndbcluster::close(void)
 {
   DBUG_ENTER("close");
-  THD *thd= current_thd;
+  THD *thd= table->in_use;
   Ndb *ndb= thd ? check_ndb_in_thd(thd) : g_ndb;
+  /* ndb_share reference handler free */
+  DBUG_PRINT("NDB_SHARE", ("%s handler free  use_count: %u",
+                           m_share->key, m_share->use_count));
   free_share(&m_share);
   m_share= 0;
   release_metadata(thd, ndb);
@@ -6002,7 +6142,13 @@
   ndb->setDatabaseName(db);
   NDBDICT* dict= ndb->getDictionary();
   build_table_filename(key, sizeof(key), db, name, "", 0);
-  NDB_SHARE *share= get_share(key, 0, false);
+  /* ndb_share reference temporary */
+  NDB_SHARE *share= get_share(key, 0, FALSE);
+  if (share)
+  {
+    DBUG_PRINT("NDB_SHARE", ("%s temporary  use_count: %u",
+                             share->key, share->use_count));
+  }
   if (share && get_ndb_share_state(share) == NSS_ALTERED)
   {
     // Frm has been altered on disk, but not yet written to ndb
@@ -6048,12 +6194,22 @@
   *frmblob= data;
   
   if (share)
+  {
+    /* ndb_share reference temporary free */
+    DBUG_PRINT("NDB_SHARE", ("%s temporary free  use_count: %u",
+                             share->key, share->use_count));
     free_share(&share);
+  }
 
   DBUG_RETURN(0);
 err:
   if (share)
+  {
+    /* ndb_share reference temporary free */
+    DBUG_PRINT("NDB_SHARE", ("%s temporary free  use_count: %u",
+                             share->key, share->use_count));
     free_share(&share);
+  }
   if (ndb_error.code)
   {
     ERR_RETURN(ndb_error);
@@ -6169,14 +6325,13 @@
 
 static void ndbcluster_drop_database(handlerton *hton, char *path)
 {
-  THD *thd= current_thd;
   DBUG_ENTER("ndbcluster_drop_database");
 #ifdef HAVE_NDB_BINLOG
   /*
     Don't allow drop database unless
     schema distribution table is setup
   */
-  if (!schema_share)
+  if (!ndb_schema_share)
   {
     DBUG_PRINT("info", ("Schema distribution table not setup"));
     DBUG_VOID_RETURN;
@@ -6186,6 +6341,7 @@
   ndbcluster_drop_database_impl(path);
 #ifdef HAVE_NDB_BINLOG
   char db[FN_REFLEN];
+  THD *thd= current_thd;
   ha_ndbcluster::set_dbname(path, db);
   ndbcluster_log_schema_op(thd, 0,
                            thd->query, thd->query_length,
@@ -6193,9 +6349,7 @@
 #endif
   DBUG_VOID_RETURN;
 }
-/*
-  find all tables in ndb and discover those needed
-*/
+
 int ndb_create_table_from_engine(THD *thd, const char *db,
                                  const char *table_name)
 {
@@ -6208,18 +6362,22 @@
   return res;
 }
 
+/*
+  find all tables in ndb and discover those needed
+*/
 int ndbcluster_find_all_files(THD *thd)
 {
-  DBUG_ENTER("ndbcluster_find_all_files");
   Ndb* ndb;
   char key[FN_REFLEN];
+  NDBDICT *dict;
+  int unhandled, retries= 5, skipped;
+  DBUG_ENTER("ndbcluster_find_all_files");
 
   if (!(ndb= check_ndb_in_thd(thd)))
     DBUG_RETURN(HA_ERR_NO_CONNECTION);
 
-  NDBDICT *dict= ndb->getDictionary();
+  dict= ndb->getDictionary();
 
-  int unhandled, retries= 5, skipped;
   LINT_INIT(unhandled);
   LINT_INIT(skipped);
   do
@@ -6289,7 +6447,13 @@
       }
       else if (cmp_frm(ndbtab, pack_data, pack_length))
       {
-        NDB_SHARE *share= get_share(key, 0, false);
+        /* ndb_share reference temporary */
+        NDB_SHARE *share= get_share(key, 0, FALSE);
+        if (share)
+        {
+          DBUG_PRINT("NDB_SHARE", ("%s temporary  use_count: %u",
+                                   share->key, share->use_count));
+        }
         if (!share || get_ndb_share_state(share) != NSS_ALTERED)
         {
           discover= 1;
@@ -6297,7 +6461,12 @@
                                 elmt.database, elmt.name);
         }
         if (share)
+        {
+          /* ndb_share reference temporary free */
+          DBUG_PRINT("NDB_SHARE", ("%s temporary free  use_count: %u",
+                                   share->key, share->use_count));
           free_share(&share);
+        }
       }
       my_free((char*) data, MYF(MY_ALLOW_ZERO_PTR));
       my_free((char*) pack_data, MYF(MY_ALLOW_ZERO_PTR));
@@ -6403,12 +6572,12 @@
   List<char> delete_list;
   while ((file_name=it++))
   {
-    bool file_on_disk= false;
+    bool file_on_disk= FALSE;
     DBUG_PRINT("info", ("%s", file_name));     
     if (hash_search(&ndb_tables, file_name, strlen(file_name)))
     {
       DBUG_PRINT("info", ("%s existed in NDB _and_ on disk ", file_name));
-      file_on_disk= true;
+      file_on_disk= TRUE;
     }
     
     // Check for .ndb file with this name
@@ -6524,6 +6693,23 @@
   
   hash_free(&ok_tables);
   hash_free(&ndb_tables);
+
+  // Delete schema file from files
+  if (!strcmp(db, NDB_REP_DB))
+  {
+    uint count = 0;
+    while (count++ < files->elements)
+    {
+      file_name = (char *)files->pop();
+      if (!strcmp(file_name, NDB_SCHEMA_TABLE))
+      {
+        DBUG_PRINT("info", ("skip %s.%s table, it should be hidden to user",
+                   NDB_REP_DB, NDB_SCHEMA_TABLE));
+        continue;
+      }
+      files->push_back(file_name); 
+    }
+  }
   } // extra bracket to avoid gcc 2.95.3 warning
   DBUG_RETURN(0);    
 }
@@ -6537,6 +6723,7 @@
 /* Call back after cluster connect */
 static int connect_callback()
 {
+  pthread_mutex_lock(&LOCK_ndb_util_thread);
   update_status_variables(g_ndb_cluster_connection);
 
   uint node_id, i= 0;
@@ -6546,6 +6733,7 @@
     g_node_id_map[node_id]= i++;
 
   pthread_cond_signal(&COND_ndb_util_thread);
+  pthread_mutex_unlock(&LOCK_ndb_util_thread);
   return 0;
 }
 
@@ -6556,6 +6744,15 @@
   int res;
   DBUG_ENTER("ndbcluster_init");
 
+  if (ndbcluster_inited)
+    DBUG_RETURN(FALSE);
+
+  pthread_mutex_init(&ndbcluster_mutex,MY_MUTEX_INIT_FAST);
+  pthread_mutex_init(&LOCK_ndb_util_thread, MY_MUTEX_INIT_FAST);
+  pthread_cond_init(&COND_ndb_util_thread, NULL);
+  pthread_cond_init(&COND_ndb_util_ready, NULL);
+  ndb_util_thread_running= -1;
+  ndbcluster_terminating= 0;
   ndb_dictionary_is_mysqld= 1;
   ndbcluster_hton= (handlerton *)p;
 
@@ -6619,13 +6816,48 @@
     goto ndbcluster_init_error;
   }
 
-  if ((res= g_ndb_cluster_connection->connect(0,0,0)) == 0)
+  /* Connect to management server */
+
+  struct timeval end_time;
+  gettimeofday(&end_time, 0);
+  end_time.tv_sec+= opt_ndb_wait_connected;
+
+  while ((res= g_ndb_cluster_connection->connect(0,0,0)) == 1)
+  {
+    struct timeval now_time;
+    gettimeofday(&now_time, 0);
+    if (now_time.tv_sec > end_time.tv_sec ||
+        (now_time.tv_sec == end_time.tv_sec &&
+         now_time.tv_usec >= end_time.tv_usec))
+      break;
+    sleep(1);
+  }
+
+  if (res == 0)
   {
     connect_callback();
     DBUG_PRINT("info",("NDBCLUSTER storage engine at %s on port %d",
                        g_ndb_cluster_connection->get_connected_host(),
                        g_ndb_cluster_connection->get_connected_port()));
-    g_ndb_cluster_connection->wait_until_ready(10,3);
+    {
+      struct timeval now_time;
+      gettimeofday(&now_time, 0);
+      ulong wait_until_ready_time = (end_time.tv_sec > now_time.tv_sec) ?
+        end_time.tv_sec - now_time.tv_sec : 1;
+      res= g_ndb_cluster_connection->wait_until_ready(wait_until_ready_time,3);
+    }
+    if (res == 0)
+    {
+      sql_print_information("NDB: all storage nodes connected");
+    }
+    else if (res > 0)
+    {
+      sql_print_information("NDB: some storage nodes connected");
+    }
+    else if (res < 0)
+    {
+      sql_print_information("NDB: no storage nodes connected (timed out)");
+    }
   } 
   else if (res == 1)
   {
@@ -6654,17 +6886,12 @@
   
   (void) hash_init(&ndbcluster_open_tables,system_charset_info,32,0,0,
                    (hash_get_key) ndbcluster_get_key,0,0);
-  pthread_mutex_init(&ndbcluster_mutex,MY_MUTEX_INIT_FAST);
 #ifdef HAVE_NDB_BINLOG
   /* start the ndb injector thread */
   if (ndbcluster_binlog_start())
     goto ndbcluster_init_error;
 #endif /* HAVE_NDB_BINLOG */
 
-  pthread_mutex_init(&LOCK_ndb_util_thread, MY_MUTEX_INIT_FAST);
-  pthread_cond_init(&COND_ndb_util_thread, NULL);
-
-
   ndb_cache_check_time = opt_ndb_cache_check_time;
   // Create utility thread
   pthread_t tmp;
@@ -6675,6 +6902,24 @@
     pthread_mutex_destroy(&ndbcluster_mutex);
     pthread_mutex_destroy(&LOCK_ndb_util_thread);
     pthread_cond_destroy(&COND_ndb_util_thread);
+    pthread_cond_destroy(&COND_ndb_util_ready);
+    goto ndbcluster_init_error;
+  }
+
+  /* Wait for the util thread to start */
+  pthread_mutex_lock(&LOCK_ndb_util_thread);
+  while (ndb_util_thread_running < 0)
+    pthread_cond_wait(&COND_ndb_util_ready, &LOCK_ndb_util_thread);
+  pthread_mutex_unlock(&LOCK_ndb_util_thread);
+  
+  if (!ndb_util_thread_running)
+  {
+    DBUG_PRINT("error", ("ndb utility thread exited prematurely"));
+    hash_free(&ndbcluster_open_tables);
+    pthread_mutex_destroy(&ndbcluster_mutex);
+    pthread_mutex_destroy(&LOCK_ndb_util_thread);
+    pthread_cond_destroy(&COND_ndb_util_thread);
+    pthread_cond_destroy(&COND_ndb_util_ready);
     goto ndbcluster_init_error;
   }
 
@@ -6700,6 +6945,17 @@
 
   if (!ndbcluster_inited)
     DBUG_RETURN(0);
+  ndbcluster_inited= 0;
+
+  /* wait for util thread to finish */
+  sql_print_information("Stopping Cluster Utility thread");
+  pthread_mutex_lock(&LOCK_ndb_util_thread);
+  ndbcluster_terminating= 1;
+  pthread_cond_signal(&COND_ndb_util_thread);
+  while (ndb_util_thread_running > 0)
+    pthread_cond_wait(&COND_ndb_util_ready, &LOCK_ndb_util_thread);
+  pthread_mutex_unlock(&LOCK_ndb_util_thread);
+
 
 #ifdef HAVE_NDB_BINLOG
   {
@@ -6712,7 +6968,7 @@
       fprintf(stderr, "NDB: table share %s with use_count %d not freed\n",
               share->key, share->use_count);
 #endif
-      real_free_share(&share);
+      ndbcluster_real_free_share(&share);
     }
     pthread_mutex_unlock(&ndbcluster_mutex);
   }
@@ -6746,14 +7002,14 @@
   pthread_mutex_destroy(&ndbcluster_mutex);
   pthread_mutex_destroy(&LOCK_ndb_util_thread);
   pthread_cond_destroy(&COND_ndb_util_thread);
-  ndbcluster_inited= 0;
+  pthread_cond_destroy(&COND_ndb_util_ready);
   DBUG_RETURN(0);
 }
 
 void ha_ndbcluster::print_error(int error, myf errflag)
 {
   DBUG_ENTER("ha_ndbcluster::print_error");
-  DBUG_PRINT("enter", ("error = %d", error));
+  DBUG_PRINT("enter", ("error: %d", error));
 
   if (error == HA_ERR_NO_PARTITION_FOUND)
     m_part_info->print_no_partition_found(table);
@@ -6909,19 +7165,19 @@
     {
       // We must provide approx table rows
       Uint64 table_rows=0;
-      Ndb_local_table_statistics *info= m_table_info;
-      if (info->records != ~(ha_rows)0 && info->records != 0)
+      Ndb_local_table_statistics *ndb_info= m_table_info;
+      if (ndb_info->records != ~(ha_rows)0 && ndb_info->records != 0)
       {
-        table_rows = info->records;
-        DBUG_PRINT("info", ("use info->records: %llu", table_rows));
+        table_rows = ndb_info->records;
+        DBUG_PRINT("info", ("use info->records: %lu", (ulong) table_rows));
       }
       else
       {
         Ndb_statistics stat;
-        if ((res=ndb_get_table_statistics(this, true, ndb, m_table, &stat)) != 0)
+        if ((res=ndb_get_table_statistics(this, TRUE, ndb, m_table, &stat)))
           break;
         table_rows=stat.row_count;
-        DBUG_PRINT("info", ("use db row_count: %llu", table_rows));
+        DBUG_PRINT("info", ("use db row_count: %lu", (ulong) table_rows));
         if (table_rows == 0) {
           // Problem if autocommit=0
 #ifdef ndb_get_table_statistics_uses_active_trans
@@ -6944,7 +7200,7 @@
       if ((op->readTuples(NdbOperation::LM_CommittedRead)) == -1)
         ERR_BREAK(op->getNdbError(), res);
       const key_range *keys[2]={ min_key, max_key };
-      if ((res=set_bounds(op, inx, true, keys)) != 0)
+      if ((res=set_bounds(op, inx, TRUE, keys)) != 0)
         break;
 
       // Decide if db should be contacted
@@ -7050,7 +7306,10 @@
     DBUG_PRINT("info", ("Table %s not found in ndbcluster_open_tables", name));
     DBUG_RETURN(1);
   }
+  /* ndb_share reference temporary, free below */
   share->use_count++;
+  DBUG_PRINT("NDB_SHARE", ("%s temporary  use_count: %u",
+                           share->key, share->use_count));
   pthread_mutex_unlock(&ndbcluster_mutex);
 
   pthread_mutex_lock(&share->mutex);
@@ -7059,10 +7318,15 @@
     if (share->commit_count != 0)
     {
       *commit_count= share->commit_count;
+#ifndef DBUG_OFF
       char buff[22];
+#endif
       DBUG_PRINT("info", ("Getting commit_count: %s from share",
                           llstr(share->commit_count, buff)));
       pthread_mutex_unlock(&share->mutex);
+      /* ndb_share reference temporary free */
+      DBUG_PRINT("NDB_SHARE", ("%s temporary free  use_count: %u",
+                               share->key, share->use_count));
       free_share(&share);
       DBUG_RETURN(0);
     }
@@ -7079,8 +7343,11 @@
   {
     Ndb_table_guard ndbtab_g(ndb->getDictionary(), tabname);
     if (ndbtab_g.get_table() == 0
-        || ndb_get_table_statistics(NULL, false, ndb, ndbtab_g.get_table(), &stat))
+        || ndb_get_table_statistics(NULL, FALSE, ndb, ndbtab_g.get_table(), &stat))
     {
+      /* ndb_share reference temporary free */
+      DBUG_PRINT("NDB_SHARE", ("%s temporary free  use_count: %u",
+                               share->key, share->use_count));
       free_share(&share);
       DBUG_RETURN(1);
     }
@@ -7089,7 +7356,9 @@
   pthread_mutex_lock(&share->mutex);
   if (share->commit_count_lock == lock)
   {
+#ifndef DBUG_OFF
     char buff[22];
+#endif
     DBUG_PRINT("info", ("Setting commit_count to %s",
                         llstr(stat.commit_count, buff)));
     share->commit_count= stat.commit_count;
@@ -7101,6 +7370,9 @@
     *commit_count= 0;
   }
   pthread_mutex_unlock(&share->mutex);
+  /* ndb_share reference temporary free */
+  DBUG_PRINT("NDB_SHARE", ("%s temporary free  use_count: %u",
+                           share->key, share->use_count));
   free_share(&share);
   DBUG_RETURN(0);
 }
@@ -7145,7 +7417,9 @@
   bool is_autocommit= !(thd->options & (OPTION_NOT_AUTOCOMMIT | OPTION_BEGIN));
   char *dbname= full_name;
   char *tabname= dbname+strlen(dbname)+1;
+#ifndef DBUG_OFF
   char buff[22], buff2[22];
+#endif
   DBUG_ENTER("ndbcluster_cache_retrieval_allowed");
   DBUG_PRINT("enter", ("dbname: %s, tabname: %s, is_autocommit: %d",
                        dbname, tabname, is_autocommit));
@@ -7212,7 +7486,9 @@
                                           ulonglong *engine_data)
 {
   Uint64 commit_count;
+#ifndef DBUG_OFF
   char buff[22];
+#endif
   bool is_autocommit= !(thd->options & (OPTION_NOT_AUTOCOMMIT | OPTION_BEGIN));
   DBUG_ENTER("ha_ndbcluster::register_query_cache_table");
   DBUG_PRINT("enter",("dbname: %s, tabname: %s, is_autocommit: %d",
@@ -7252,33 +7528,54 @@
   return (byte*) share->key;
 }
 
+
 #ifndef DBUG_OFF
-static void dbug_print_open_tables()
+
+static void print_share(const char* where, NDB_SHARE* share)
 {
-  DBUG_ENTER("dbug_print_open_tables");
-  for (uint i= 0; i < ndbcluster_open_tables.records; i++)
-  {
-    NDB_SHARE *share= (NDB_SHARE*) hash_element(&ndbcluster_open_tables, i);
-    DBUG_PRINT("share",
-               ("[%d] 0x%lx key: %s  key_length: %d",
-                i, share, share->key, share->key_length));
-    DBUG_PRINT("share",
-               ("db.tablename: %s.%s  use_count: %d  commit_count: %d",
-                share->db, share->table_name,
-                share->use_count, share->commit_count));
+  fprintf(DBUG_FILE,
+          "%s %s.%s: use_count: %u, commit_count: %lu\n",
+          where, share->db, share->table_name, share->use_count,
+          (ulong) share->commit_count);
+  fprintf(DBUG_FILE,
+          "  - key: %s, key_length: %d\n",
+          share->key, share->key_length);
+
 #ifdef HAVE_NDB_BINLOG
-    if (share->table)
-      DBUG_PRINT("share",
-                 ("table->s->db.table_name: %s.%s",
-                  share->table->s->db.str,
share->table->s->table_name.str));
+  if (share->table)
+    fprintf(DBUG_FILE,
+            "  - share->table: %p %s.%s\n",
+            share->table, share->table->s->db.str,
+            share->table->s->table_name.str);
 #endif
-  }
-  DBUG_VOID_RETURN;
 }
-#else
-#define dbug_print_open_tables()
+
+
+static void print_ndbcluster_open_tables()
+{
+  DBUG_LOCK_FILE;
+  fprintf(DBUG_FILE, ">ndbcluster_open_tables\n");
+  for (uint i= 0; i < ndbcluster_open_tables.records; i++)
+    print_share("",
+                (NDB_SHARE*)hash_element(&ndbcluster_open_tables, i));
+  fprintf(DBUG_FILE, "<ndbcluster_open_tables\n");
+  DBUG_UNLOCK_FILE;
+}
+
 #endif
 
+
+#define dbug_print_open_tables()                \
+  DBUG_EXECUTE("info",                          \
+               print_ndbcluster_open_tables(););
+
+#define dbug_print_share(t, s)                  \
+  DBUG_LOCK_FILE;                               \
+  DBUG_EXECUTE("info",                          \
+               print_share((t), (s)););         \
+  DBUG_UNLOCK_FILE;
+
+
 #ifdef HAVE_NDB_BINLOG
 /*
   For some reason a share is still around, try to salvage the situation
@@ -7296,21 +7593,34 @@
   static ulong trailing_share_id= 0;
   DBUG_ENTER("handle_trailing_share");
 
+  /* ndb_share reference temporary, free below */
   ++share->use_count;
+  DBUG_PRINT("NDB_SHARE", ("%s temporary  use_count: %u",
+                           share->key, share->use_count));
   pthread_mutex_unlock(&ndbcluster_mutex);
 
   TABLE_LIST table_list;
   bzero((char*) &table_list,sizeof(table_list));
   table_list.db= share->db;
   table_list.alias= table_list.table_name= share->table_name;
+  safe_mutex_assert_owner(&LOCK_open);
   close_cached_tables(thd, 0, &table_list, TRUE);
 
   pthread_mutex_lock(&ndbcluster_mutex);
+  /* ndb_share reference temporary free */
+  DBUG_PRINT("NDB_SHARE", ("%s temporary free  use_count: %u",
+                           share->key, share->use_count));
   if (!--share->use_count)
   {
-    DBUG_PRINT("info", ("NDB_SHARE: close_cashed_tables %s freed share.",
-               share->key)); 
-    real_free_share(&share);
+    if (ndb_extra_logging)
+      sql_print_information("NDB_SHARE: trailing share "
+                            "%s(connect_count: %u) "
+                            "released by close_cached_tables at "
+                            "connect_count: %u",
+                            share->key,
+                            share->connect_count,
+                            g_ndb_cluster_connection->get_connect_count());
+    ndbcluster_real_free_share(&share);
     DBUG_RETURN(0);
   }
 
@@ -7318,16 +7628,28 @@
     share still exists, if share has not been dropped by server
     release that share
   */
-  if (share->state != NSS_DROPPED && !--share->use_count)
+  if (share->state != NSS_DROPPED)
   {
-    DBUG_PRINT("info", ("NDB_SHARE: %s already exists, "
-                        "use_count=%d  state != NSS_DROPPED.",
-                        share->key, share->use_count)); 
-    real_free_share(&share);
-    DBUG_RETURN(0);
+    share->state= NSS_DROPPED;
+    /* ndb_share reference create free */
+    DBUG_PRINT("NDB_SHARE", ("%s create free  use_count: %u",
+                             share->key, share->use_count));
+    --share->use_count;
+
+    if (share->use_count == 0)
+    {
+      if (ndb_extra_logging)
+        sql_print_information("NDB_SHARE: trailing share "
+                              "%s(connect_count: %u) "
+                              "released after NSS_DROPPED check "
+                              "at connect_count: %u",
+                              share->key,
+                              share->connect_count,
+                              g_ndb_cluster_connection->get_connect_count());
+      ndbcluster_real_free_share(&share);
+      DBUG_RETURN(0);
+    }
   }
-  DBUG_PRINT("error", ("NDB_SHARE: %s already exists  use_count=%d.",
-                       share->key, share->use_count));
 
   sql_print_error("NDB_SHARE: %s already exists  use_count=%d."
                   " Moving away for safety, but possible memleak.",
@@ -7421,19 +7743,9 @@
   share->table_name= share->db + strlen(share->db) + 1;
   ha_ndbcluster::set_tabname(new_key, share->table_name);
 
-  DBUG_PRINT("rename_share",
-             ("0x%lx key: %s  key_length: %d",
-              share, share->key, share->key_length));
-  DBUG_PRINT("rename_share",
-             ("db.tablename: %s.%s  use_count: %d  commit_count: %d",
-              share->db, share->table_name,
-              share->use_count, share->commit_count));
+  dbug_print_share("rename_share:", share);
   if (share->table)
   {
-    DBUG_PRINT("rename_share",
-               ("table->s->db.table_name: %s.%s",
-                share->table->s->db.str,
share->table->s->table_name.str));
-
     if (share->op == 0)
     {
       share->table->s->db.str= share->db;
@@ -7461,14 +7773,7 @@
   share->use_count++;
 
   dbug_print_open_tables();
-
-  DBUG_PRINT("get_share",
-             ("0x%lx key: %s  key_length: %d",
-              share, share->key, share->key_length));
-  DBUG_PRINT("get_share",
-             ("db.tablename: %s.%s  use_count: %d  commit_count: %d",
-              share->db, share->table_name,
-              share->use_count, share->commit_count));
+  dbug_print_share("ndbcluster_get_share:", share);
   pthread_mutex_unlock(&ndbcluster_mutex);
   return share;
 }
@@ -7493,7 +7798,6 @@
                                 bool create_if_not_exists,
                                 bool have_lock)
 {
-  THD *thd= current_thd;
   NDB_SHARE *share;
   uint length= (uint) strlen(key);
   DBUG_ENTER("ndbcluster_get_share");
@@ -7559,14 +7863,7 @@
   share->use_count++;
 
   dbug_print_open_tables();
-
-  DBUG_PRINT("info",
-             ("0x%lx key: %s  key_length: %d  key: %s",
-              share, share->key, share->key_length, key));
-  DBUG_PRINT("info",
-             ("db.tablename: %s.%s  use_count: %d  commit_count: %d",
-              share->db, share->table_name,
-              share->use_count, share->commit_count));
+  dbug_print_share("ndbcluster_get_share:", share);
   if (!have_lock)
     pthread_mutex_unlock(&ndbcluster_mutex);
   DBUG_RETURN(share);
@@ -7576,13 +7873,7 @@
 void ndbcluster_real_free_share(NDB_SHARE **share)
 {
   DBUG_ENTER("ndbcluster_real_free_share");
-  DBUG_PRINT("real_free_share",
-             ("0x%lx key: %s  key_length: %d",
-              (*share), (*share)->key, (*share)->key_length));
-  DBUG_PRINT("real_free_share",
-             ("db.tablename: %s.%s  use_count: %d  commit_count: %d",
-              (*share)->db, (*share)->table_name,
-              (*share)->use_count, (*share)->commit_count));
+  dbug_print_share("ndbcluster_real_free_share:", *share);
 
   hash_delete(&ndbcluster_open_tables, (byte*) *share);
   thr_lock_delete(&(*share)->lock);
@@ -7611,12 +7902,7 @@
   DBUG_VOID_RETURN;
 }
 
-/*
-  decrease refcount of share
-  calls real_free_share when refcount reaches 0
 
-  have_lock == TRUE, pthread_mutex_lock(&ndbcluster_mutex) already taken
-*/
 void ndbcluster_free_share(NDB_SHARE **share, bool have_lock)
 {
   if (!have_lock)
@@ -7625,18 +7911,12 @@
     (*share)->util_lock= 0;
   if (!--(*share)->use_count)
   {
-    real_free_share(share);
+    ndbcluster_real_free_share(share);
   }
   else
   {
     dbug_print_open_tables();
-    DBUG_PRINT("free_share",
-               ("0x%lx key: %s  key_length: %d",
-                *share, (*share)->key, (*share)->key_length));
-    DBUG_PRINT("free_share",
-               ("db.tablename: %s.%s  use_count: %d  commit_count: %d",
-                (*share)->db, (*share)->table_name,
-                (*share)->use_count, (*share)->commit_count));
+    dbug_print_share("ndbcluster_free_share:", *share);
   }
   if (!have_lock)
     pthread_mutex_unlock(&ndbcluster_mutex);
@@ -7653,7 +7933,9 @@
   int retries= 10;
   int reterr= 0;
   int retry_sleep= 30 * 1000; /* 30 milliseconds */
+#ifndef DBUG_OFF
   char buff[22], buff2[22], buff3[22], buff4[22];
+#endif
   DBUG_ENTER("ndb_get_table_statistics");
   DBUG_PRINT("enter", ("table: %s", ndbtab->getName()));
 
@@ -7669,7 +7951,6 @@
     Uint64 sum_row_size= 0;
     Uint64 sum_mem= 0;
     NdbScanOperation*pOp;
-    NdbResultSet *rs;
     int check;
 
     if ((pTrans= ndb->startTransaction()) == NULL)
@@ -7705,7 +7986,7 @@
 		  (char*)&var_mem);
     
     if (pTrans->execute(NdbTransaction::NoCommit,
-                        NdbTransaction::AbortOnError,
+                        NdbOperation::AbortOnError,
                         TRUE) == -1)
     {
       error= pTrans->getNdbError();
@@ -7749,7 +8030,7 @@
 retry:
     if(report_error)
     {
-      if (file)
+      if (file && pTrans)
       {
         reterr= file->ndb_err(pTrans);
       }
@@ -7848,10 +8129,10 @@
     const byte *key= range->start_key.key;
     uint key_len= range->start_key.length;
     if (check_null_in_key(key_info, key, key_len))
-      DBUG_RETURN(true);
+      DBUG_RETURN(TRUE);
     curr += reclength;
   }
-  DBUG_RETURN(false);
+  DBUG_RETURN(FALSE);
 }
 
 int
@@ -7861,21 +8142,20 @@
                                       bool sorted, 
                                       HANDLER_BUFFER *buffer)
 {
-  DBUG_ENTER("ha_ndbcluster::read_multi_range_first");
   m_write_op= FALSE;
-  
   int res;
   KEY* key_info= table->key_info + active_index;
-  NDB_INDEX_TYPE index_type= get_index_type(active_index);
+  NDB_INDEX_TYPE cur_index_type= get_index_type(active_index);
   ulong reclength= table_share->reclength;
   NdbOperation* op;
   Thd_ndb *thd_ndb= get_thd_ndb(current_thd);
+  DBUG_ENTER("ha_ndbcluster::read_multi_range_first");
 
   /**
    * blobs and unique hash index with NULL can't be batched currently
    */
   if (uses_blob_value() ||
-      (index_type ==  UNIQUE_INDEX &&
+      (cur_index_type ==  UNIQUE_INDEX &&
        has_null_in_unique_index(active_index) &&
        null_value_index_search(ranges, ranges+range_count, buffer)))
   {
@@ -7933,7 +8213,7 @@
       get_partition_set(table, curr, active_index,
                         &multi_range_curr->start_key,
                         &part_spec);
-      DBUG_PRINT("info", ("part_spec.start_part = %u, part_spec.end_part = %u",
+      DBUG_PRINT("info", ("part_spec.start_part: %u  part_spec.end_part: %u",
                           part_spec.start_part, part_spec.end_part));
       /*
         If partition pruning has found no partition in set
@@ -7950,7 +8230,7 @@
         continue;
       }
     }
-    switch(index_type){
+    switch (cur_index_type) {
     case PRIMARY_KEY_ORDERED_INDEX:
       if (!(multi_range_curr->start_key.length == key_info->key_length &&
           multi_range_curr->start_key.flag == HA_READ_KEY_EXACT))
@@ -7963,9 +8243,8 @@
           !op->readTuple(lm) && 
           !set_primary_key(op, multi_range_curr->start_key.key) &&
           !define_read_attrs(curr, op) &&
-          (op->setAbortOption(AO_IgnoreError), TRUE) &&
           (!m_use_partition_function ||
-           (op->setPartitionId(part_spec.start_part), true)))
+           (op->setPartitionId(part_spec.start_part), TRUE)))
         curr += reclength;
       else
         ERR_RETURN(op ? op->getNdbError() : m_active_trans->getNdbError());
@@ -7985,8 +8264,7 @@
       if ((op= m_active_trans->getNdbIndexOperation(unique_idx, tab)) && 
           !op->readTuple(lm) && 
           !set_index_key(op, key_info, multi_range_curr->start_key.key) &&
-          !define_read_attrs(curr, op) &&
-          (op->setAbortOption(AO_IgnoreError), TRUE))
+          !define_read_attrs(curr, op))
         curr += reclength;
       else
         ERR_RETURN(op ? op->getNdbError() : m_active_trans->getNdbError());
@@ -8010,7 +8288,7 @@
         }
         else if ((scanOp= m_active_trans->getNdbIndexScanOperation(idx, tab)) 
                  &&!scanOp->readTuples(lm, 0, parallelism, sorted, 
-				       FALSE, TRUE, need_pk)
+				       FALSE, TRUE, need_pk, TRUE)
                  &&!generate_scan_filter(m_cond_stack, scanOp)
                  &&!define_read_attrs(end_of_buffer-reclength, scanOp))
         {
@@ -8026,7 +8304,7 @@
 
       const key_range *keys[2]= { &multi_range_curr->start_key, 
                                   &multi_range_curr->end_key };
-      if ((res= set_bounds(scanOp, active_index, false, keys,
+      if ((res= set_bounds(scanOp, active_index, FALSE, keys,
                            multi_range_curr-ranges)))
         DBUG_RETURN(res);
       break;
@@ -8148,7 +8426,7 @@
         DBUG_MULTI_RANGE(6);
         // First fetch from cursor
         DBUG_ASSERT(range_no == -1);
-        if ((res= m_multi_cursor->nextResult(true)))
+        if ((res= m_multi_cursor->nextResult(TRUE)))
         {
           DBUG_MULTI_RANGE(15);
           goto close_scan;
@@ -8186,6 +8464,8 @@
   if (multi_range_curr == multi_range_end)
   {
     DBUG_MULTI_RANGE(16);
+    Thd_ndb *thd_ndb= get_thd_ndb(current_thd);
+    thd_ndb->query_state&= NDB_QUERY_NORMAL;
     DBUG_RETURN(HA_ERR_END_OF_FILE);
   }
   
@@ -8270,7 +8550,6 @@
   }
 
   ndb->setDatabaseName(m_dbname);
-  NDBDICT* dict= ndb->getDictionary();
   const NDBTAB* tab= m_table;
   DBUG_ASSERT(tab != NULL);
 
@@ -8293,32 +8572,27 @@
 pthread_handler_t ndb_util_thread_func(void *arg __attribute__((unused)))
 {
   THD *thd; /* needs to be first for thread_stack */
-  Ndb* ndb;
   struct timespec abstime;
   List<NDB_SHARE> util_open_tables;
+  Thd_ndb *thd_ndb;
 
   my_thread_init();
   DBUG_ENTER("ndb_util_thread");
-  DBUG_PRINT("enter", ("ndb_cache_check_time: %d", ndb_cache_check_time));
+  DBUG_PRINT("enter", ("ndb_cache_check_time: %lu", ndb_cache_check_time));
+ 
+   pthread_mutex_lock(&LOCK_ndb_util_thread);
 
   thd= new THD; /* note that contructor of THD uses DBUG_ */
   THD_CHECK_SENTRY(thd);
-  ndb= new Ndb(g_ndb_cluster_connection, "");
 
   pthread_detach_this_thread();
   ndb_util_thread= pthread_self();
 
   thd->thread_stack= (char*)&thd; /* remember where our stack is */
-  if (thd->store_globals() || (ndb->init() != 0))
-  {
-    thd->cleanup();
-    delete thd;
-    delete ndb;
-    DBUG_RETURN(NULL);
-  }
+  if (thd->store_globals())
+    goto ndb_util_thread_fail;
   thd->init_for_queries();
   thd->version=refresh_version;
-  thd->set_time();
   thd->main_security_ctx.host_or_ip= "";
   thd->client_capabilities = 0;
   my_net_init(&thd->net, 0);
@@ -8326,16 +8600,29 @@
   thd->main_security_ctx.priv_user = 0;
   thd->current_stmt_binlog_row_based= TRUE;     // If in mixed mode
 
+  /* Signal successful initialization */
+  ndb_util_thread_running= 1;
+  pthread_cond_signal(&COND_ndb_util_ready);
+  pthread_mutex_unlock(&LOCK_ndb_util_thread);
+
   /*
     wait for mysql server to start
   */
   pthread_mutex_lock(&LOCK_server_started);
   while (!mysqld_server_started)
-    pthread_cond_wait(&COND_server_started, &LOCK_server_started);
+  {
+    set_timespec(abstime, 1);
+    pthread_cond_timedwait(&COND_server_started, &LOCK_server_started,
+	                       &abstime);
+    if (ndbcluster_terminating)
+    {
+      pthread_mutex_unlock(&LOCK_server_started);
+      pthread_mutex_lock(&LOCK_ndb_util_thread);
+      goto ndb_util_thread_end;
+    }
+  }
   pthread_mutex_unlock(&LOCK_server_started);
 
-  ndbcluster_util_inited= 1;
-
   /*
     Wait for cluster to start
   */
@@ -8343,28 +8630,21 @@
   while (!ndb_cluster_node_id && (ndbcluster_hton->slot != ~(uint)0))
   {
     /* ndb not connected yet */
-    set_timespec(abstime, 1);
-    pthread_cond_timedwait(&COND_ndb_util_thread,
-                           &LOCK_ndb_util_thread,
-                           &abstime);
-    if (abort_loop)
-    {
-      pthread_mutex_unlock(&LOCK_ndb_util_thread);
+    pthread_cond_wait(&COND_ndb_util_thread, &LOCK_ndb_util_thread);
+    if (ndbcluster_terminating)
       goto ndb_util_thread_end;
-    }
   }
   pthread_mutex_unlock(&LOCK_ndb_util_thread);
 
+  /* Get thd_ndb for this thread */
+  if (!(thd_ndb= ha_ndbcluster::seize_thd_ndb()))
   {
-    Thd_ndb *thd_ndb;
-    if (!(thd_ndb= ha_ndbcluster::seize_thd_ndb()))
-    {
-      sql_print_error("Could not allocate Thd_ndb object");
-      goto ndb_util_thread_end;
-    }
-    set_thd_ndb(thd, thd_ndb);
-    thd_ndb->options|= TNO_NO_LOG_SCHEMA_OP;
+    sql_print_error("Could not allocate Thd_ndb object");
+    pthread_mutex_lock(&LOCK_ndb_util_thread);
+    goto ndb_util_thread_end;
   }
+  set_thd_ndb(thd, thd_ndb);
+  thd_ndb->options|= TNO_NO_LOG_SCHEMA_OP;
 
 #ifdef HAVE_NDB_BINLOG
   if (ndb_extra_logging && ndb_binlog_running)
@@ -8379,23 +8659,25 @@
 #endif
 
   set_timespec(abstime, 0);
-  for (;!abort_loop;)
+  for (;;)
   {
     pthread_mutex_lock(&LOCK_ndb_util_thread);
-    pthread_cond_timedwait(&COND_ndb_util_thread,
-                           &LOCK_ndb_util_thread,
-                           &abstime);
+    if (!ndbcluster_terminating)
+      pthread_cond_timedwait(&COND_ndb_util_thread,
+                             &LOCK_ndb_util_thread,
+                             &abstime);
+    if (ndbcluster_terminating) /* Shutting down server */
+      goto ndb_util_thread_end;
     pthread_mutex_unlock(&LOCK_ndb_util_thread);
 #ifdef NDB_EXTRA_DEBUG_UTIL_THREAD
-    DBUG_PRINT("ndb_util_thread", ("Started, ndb_cache_check_time: %d",
+    DBUG_PRINT("ndb_util_thread", ("Started, ndb_cache_check_time: %lu",
                                    ndb_cache_check_time));
 #endif
-    if (abort_loop)
-      break; /* Shutting down server */
 
 #ifdef HAVE_NDB_BINLOG
     /*
-      Check that the apply_status_share and schema_share has been created.
+      Check that the ndb_apply_status_share and ndb_schema_share 
+      have been created.
       If not try to create it
     */
     if (!ndb_binlog_tables_inited)
@@ -8421,7 +8703,10 @@
         continue; // injector thread is the only user, skip statistics
       share->util_lock= current_thd; // Mark that util thread has lock
 #endif /* HAVE_NDB_BINLOG */
+      /* ndb_share reference temporary, free below */
       share->use_count++; /* Make sure the table can't be closed */
+      DBUG_PRINT("NDB_SHARE", ("%s temporary  use_count: %u",
+                               share->key, share->use_count));
       DBUG_PRINT("ndb_util_thread",
                  ("Found open table[%d]: %s, use_count: %d",
                   i, share->table_name, share->use_count));
@@ -8442,31 +8727,36 @@
         /*
           Util thread and injector thread is the only user, skip statistics
 	*/
+        /* ndb_share reference temporary free */
+        DBUG_PRINT("NDB_SHARE", ("%s temporary free  use_count: %u",
+                                 share->key, share->use_count));
         free_share(&share);
         continue;
       }
 #endif /* HAVE_NDB_BINLOG */
       DBUG_PRINT("ndb_util_thread",
-                 ("Fetching commit count for: %s",
-                  share->key));
+                 ("Fetching commit count for: %s", share->key));
 
-      /* Contact NDB to get commit count for table */
-      ndb->setDatabaseName(share->db);
       struct Ndb_statistics stat;
-
       uint lock;
       pthread_mutex_lock(&share->mutex);
       lock= share->commit_count_lock;
       pthread_mutex_unlock(&share->mutex);
 
       {
+        /* Contact NDB to get commit count for table */
+        Ndb* ndb= thd_ndb->ndb;
+        ndb->setDatabaseName(share->db);
         Ndb_table_guard ndbtab_g(ndb->getDictionary(), share->table_name);
         if (ndbtab_g.get_table() &&
-            ndb_get_table_statistics(NULL, false, ndb, ndbtab_g.get_table(), &stat)
== 0)
+            ndb_get_table_statistics(NULL, FALSE, ndb,
+                                     ndbtab_g.get_table(), &stat) == 0)
         {
+#ifndef DBUG_OFF
           char buff[22], buff2[22];
-          DBUG_PRINT("ndb_util_thread",
-                     ("Table: %s, commit_count: %llu, rows: %llu",
+#endif
+          DBUG_PRINT("info",
+                     ("Table: %s  commit_count: %s  rows: %s",
                       share->key,
                       llstr(stat.commit_count, buff),
                       llstr(stat.row_count, buff2)));
@@ -8485,7 +8775,9 @@
         share->commit_count= stat.commit_count;
       pthread_mutex_unlock(&share->mutex);
 
-      /* Decrease the use count and possibly free share */
+      /* ndb_share reference temporary free */
+      DBUG_PRINT("NDB_SHARE", ("%s temporary free  use_count: %u",
+                               share->key, share->use_count));
       free_share(&share);
     }
 
@@ -8513,12 +8805,19 @@
       abstime.tv_nsec-= 1000000000;
     }
   }
+
+  pthread_mutex_lock(&LOCK_ndb_util_thread);
+
 ndb_util_thread_end:
-  sql_print_information("Stopping Cluster Utility thread");
   net_end(&thd->net);
+ndb_util_thread_fail:
   thd->cleanup();
   delete thd;
-  delete ndb;
+  
+  /* signal termination */
+  ndb_util_thread_running= 0;
+  pthread_cond_signal(&COND_ndb_util_ready);
+  pthread_mutex_unlock(&LOCK_ndb_util_thread);
   DBUG_PRINT("exit", ("ndb_util_thread"));
   my_thread_end();
   pthread_exit(0);
@@ -8635,14 +8934,14 @@
   
   if (context->supported)
   {
-    Ndb_rewrite_context *rewrite_context= context->rewrite_stack;
-    const Item_func *func_item;
+    Ndb_rewrite_context *rewrite_context2= context->rewrite_stack;
+    const Item_func *rewrite_func_item;
     // Check if we are rewriting some unsupported function call
-    if (rewrite_context &&
-        (func_item= rewrite_context->func_item) &&
-        rewrite_context->count++ == 0)
+    if (rewrite_context2 &&
+        (rewrite_func_item= rewrite_context2->func_item) &&
+        rewrite_context2->count++ == 0)
     {
-      switch (func_item->functype()) {
+      switch (rewrite_func_item->functype()) {
       case Item_func::BETWEEN:
         /*
           Rewrite 
@@ -8669,7 +8968,7 @@
         if (context->expecting(item->type()))
         {
           // This is the <field>|<const> item, save it in the rewrite context
-          rewrite_context->left_hand_item= item;
+          rewrite_context2->left_hand_item= item;
           if (item->type() == Item::FUNC_ITEM)
           {
             Item_func *func_item= (Item_func *) item;
@@ -8813,7 +9112,7 @@
             Check that the field is part of the table of the handler
             instance and that we expect a field with of this result type.
           */
-          if (context->table == field->table)
+          if (context->table->s == field->table->s)
           {       
             const NDBTAB *tab= (const NDBTAB *) context->ndb_table;
             DBUG_PRINT("info", ("FIELD_ITEM"));
@@ -8834,7 +9133,7 @@
                    type == MYSQL_TYPE_DATETIME)
                   ? (context->expecting_field_result(STRING_RESULT) ||
                      context->expecting_field_result(INT_RESULT))
-                  : true)) &&
+                  : TRUE)) &&
                 // Bit fields no yet supported in scan filter
                 type != MYSQL_TYPE_BIT &&
                 // No BLOB support in scan filter
@@ -9307,8 +9606,8 @@
           DBUG_PRINT("info", ("INT_ITEM"));
           if (context->expecting(Item::INT_ITEM)) 
           {
-            Item_int *int_item= (Item_int *) item;      
-            DBUG_PRINT("info", ("value %d", int_item->value));
+            DBUG_PRINT("info", ("value %ld",
+                                (long) ((Item_int*) item)->value));
             NDB_ITEM_QUALIFICATION q;
             q.value_type= Item::INT_ITEM;
             curr_cond->ndb_item= new Ndb_item(NDB_VALUE, q, item);
@@ -9331,11 +9630,10 @@
             context->supported= FALSE;
           break;
         case Item::REAL_ITEM:
-          DBUG_PRINT("info", ("REAL_ITEM %s"));
+          DBUG_PRINT("info", ("REAL_ITEM"));
           if (context->expecting(Item::REAL_ITEM)) 
           {
-            Item_float *float_item= (Item_float *) item;      
-            DBUG_PRINT("info", ("value %f", float_item->value));
+            DBUG_PRINT("info", ("value %f", ((Item_float*) item)->value));
             NDB_ITEM_QUALIFICATION q;
             q.value_type= Item::REAL_ITEM;
             curr_cond->ndb_item= new Ndb_item(NDB_VALUE, q, item);
@@ -9379,11 +9677,11 @@
             context->supported= FALSE;
           break;
         case Item::DECIMAL_ITEM:
-          DBUG_PRINT("info", ("DECIMAL_ITEM %s"));
+          DBUG_PRINT("info", ("DECIMAL_ITEM"));
           if (context->expecting(Item::DECIMAL_ITEM)) 
           {
-            Item_decimal *decimal_item= (Item_decimal *) item;      
-            DBUG_PRINT("info", ("value %f", decimal_item->val_real()));
+            DBUG_PRINT("info", ("value %f",
+                                ((Item_decimal*) item)->val_real()));
             NDB_ITEM_QUALIFICATION q;
             q.value_type= Item::DECIMAL_ITEM;
             curr_cond->ndb_item= new Ndb_item(NDB_VALUE, q, item);
@@ -9494,25 +9792,24 @@
       break;
     Ndb_item *a= cond->next->ndb_item;
     Ndb_item *b, *field, *value= NULL;
-    LINT_INIT(field);
 
     switch (cond->ndb_item->argument_count()) {
     case 1:
-      field= 
-        (a->type == NDB_FIELD)? a : NULL;
+      field= (a->type == NDB_FIELD)? a : NULL;
       break;
     case 2:
       if (!cond->next->next)
+      {
+        field= NULL;
         break;
+      }
       b= cond->next->next->ndb_item;
-      value= 
-        (a->type == NDB_VALUE)? a
-        : (b->type == NDB_VALUE)? b
-        : NULL;
-      field= 
-        (a->type == NDB_FIELD)? a
-        : (b->type == NDB_FIELD)? b
-        : NULL;
+      value= ((a->type == NDB_VALUE) ? a :
+              (b->type == NDB_VALUE) ? b :
+              NULL);
+      field= ((a->type == NDB_FIELD) ? a :
+              (b->type == NDB_FIELD) ? b :
+              NULL);
       break;
     default:
       field= NULL; //Keep compiler happy
@@ -9722,6 +10019,7 @@
   DBUG_RETURN(1);
 }
 
+
 int
 ha_ndbcluster::build_scan_filter_group(Ndb_cond* &cond, NdbScanFilter *filter)
 {
@@ -9795,6 +10093,7 @@
   DBUG_RETURN(0);
 }
 
+
 int
 ha_ndbcluster::build_scan_filter(Ndb_cond * &cond, NdbScanFilter *filter)
 {
@@ -9845,14 +10144,14 @@
   DBUG_RETURN(0);
 }
 
+
 int
 ha_ndbcluster::generate_scan_filter_from_cond(Ndb_cond_stack *ndb_cond_stack,
 					      NdbScanFilter& filter)
 {
-  DBUG_ENTER("generate_scan_filter_from_cond");
   bool multiple_cond= FALSE;
-  
-  DBUG_PRINT("info", ("Generating scan filter"));
+  DBUG_ENTER("generate_scan_filter_from_cond");
+
   // Wrap an AND group around multiple conditions
   if (ndb_cond_stack->next) 
   {
@@ -9878,6 +10177,7 @@
   DBUG_RETURN(0);
 }
 
+
 int ha_ndbcluster::generate_scan_filter_from_key(NdbScanOperation *op,
 						 const KEY* key_info, 
 						 const byte *key, 
@@ -9888,15 +10188,14 @@
   KEY_PART_INFO* end= key_part+key_info->key_parts;
   NdbScanFilter filter(op);
   int res;
-
   DBUG_ENTER("generate_scan_filter_from_key");
+
   filter.begin(NdbScanFilter::AND);
   for (; key_part != end; key_part++) 
   {
     Field* field= key_part->field;
     uint32 pack_len= field->pack_length();
     const byte* ptr= key;
-    char buf[256];
     DBUG_PRINT("info", ("Filtering value for %s", field->field_name));
     DBUG_DUMP("key", (char*)ptr, pack_len);
     if (key_part->null_bit)
@@ -9931,7 +10230,7 @@
 /*
   get table space info for SHOW CREATE TABLE
 */
-char* ha_ndbcluster::get_tablespace_name(THD *thd)
+char* ha_ndbcluster::get_tablespace_name(THD *thd, char* name, uint name_len)
 {
   Ndb *ndb= check_ndb_in_thd(thd);
   NDBDICT *ndbdict= ndb->getDictionary();
@@ -9949,7 +10248,14 @@
     ndberr= ndbdict->getNdbError();
     if(ndberr.classification != NdbError::NoError)
       goto err;
-    return (my_strdup(ts.getName(), MYF(0)));
+    DBUG_PRINT("info", ("Found tablespace '%s'", ts.getName()));
+    if (name)
+    {
+      strxnmov(name, name_len, ts.getName(), NullS);
+      return name;
+    }
+    else
+      return (my_strdup(ts.getName(), MYF(0)));
   }
 err:
   if (ndberr.status == NdbError::TemporaryError)
@@ -10067,13 +10373,13 @@
   return (reported_frags < no_fragments);
 }
 
-int ha_ndbcluster::get_default_no_partitions(HA_CREATE_INFO *info)
+int ha_ndbcluster::get_default_no_partitions(HA_CREATE_INFO *create_info)
 {
   ha_rows max_rows, min_rows;
-  if (info)
+  if (create_info)
   {
-    max_rows= info->max_rows;
-    min_rows= info->min_rows;
+    max_rows= create_info->max_rows;
+    min_rows= create_info->min_rows;
   }
   else
   {
@@ -10224,15 +10530,14 @@
 {
   uint16 frag_data[MAX_PARTITIONS];
   char *ts_names[MAX_PARTITIONS];
-  ulong ts_index= 0, fd_index= 0, i, j;
+  ulong fd_index= 0, i, j;
   NDBTAB *tab= (NDBTAB*)tab_par;
   NDBTAB::FragmentType ftype= NDBTAB::UserDefined;
   partition_element *part_elem;
   bool first= TRUE;
-  uint ts_id, ts_version, part_count= 0, tot_ts_name_len;
+  uint tot_ts_name_len;
   List_iterator<partition_element> part_it(part_info->partitions);
   int error;
-  char *name_ptr;
   DBUG_ENTER("ha_ndbcluster::set_up_partition_info");
 
   if (part_info->part_type == HASH_PARTITION &&
@@ -10346,7 +10651,7 @@
 }
 
 
-bool ha_ndbcluster::check_if_incompatible_data(HA_CREATE_INFO *info,
+bool ha_ndbcluster::check_if_incompatible_data(HA_CREATE_INFO *create_info,
 					       uint table_changes)
 {
   DBUG_ENTER("ha_ndbcluster::check_if_incompatible_data");
@@ -10404,76 +10709,78 @@
   }
   
   /* Check that auto_increment value was not changed */
-  if ((info->used_fields & HA_CREATE_USED_AUTO) &&
-      info->auto_increment_value != 0)
+  if ((create_info->used_fields & HA_CREATE_USED_AUTO) &&
+      create_info->auto_increment_value != 0)
     DBUG_RETURN(COMPATIBLE_DATA_NO);
   
   /* Check that row format didn't change */
-  if ((info->used_fields & HA_CREATE_USED_AUTO) &&
-      get_row_type() != info->row_type)
+  if ((create_info->used_fields & HA_CREATE_USED_AUTO) &&
+      get_row_type() != create_info->row_type)
     DBUG_RETURN(COMPATIBLE_DATA_NO);
 
   DBUG_RETURN(COMPATIBLE_DATA_YES);
 }
 
-bool set_up_tablespace(st_alter_tablespace *info,
+bool set_up_tablespace(st_alter_tablespace *alter_info,
                        NdbDictionary::Tablespace *ndb_ts)
 {
-  ndb_ts->setName(info->tablespace_name);
-  ndb_ts->setExtentSize(info->extent_size);
-  ndb_ts->setDefaultLogfileGroup(info->logfile_group_name);
-  return false;
+  ndb_ts->setName(alter_info->tablespace_name);
+  ndb_ts->setExtentSize(alter_info->extent_size);
+  ndb_ts->setDefaultLogfileGroup(alter_info->logfile_group_name);
+  return FALSE;
 }
 
-bool set_up_datafile(st_alter_tablespace *info,
+bool set_up_datafile(st_alter_tablespace *alter_info,
                      NdbDictionary::Datafile *ndb_df)
 {
-  if (info->max_size > 0)
+  if (alter_info->max_size > 0)
   {
     my_error(ER_TABLESPACE_AUTO_EXTEND_ERROR, MYF(0));
-    return true;
+    return TRUE;
   }
-  ndb_df->setPath(info->data_file_name);
-  ndb_df->setSize(info->initial_size);
-  ndb_df->setTablespace(info->tablespace_name);
-  return false;
+  ndb_df->setPath(alter_info->data_file_name);
+  ndb_df->setSize(alter_info->initial_size);
+  ndb_df->setTablespace(alter_info->tablespace_name);
+  return FALSE;
 }
 
-bool set_up_logfile_group(st_alter_tablespace *info,
+bool set_up_logfile_group(st_alter_tablespace *alter_info,
                           NdbDictionary::LogfileGroup *ndb_lg)
 {
-  ndb_lg->setName(info->logfile_group_name);
-  ndb_lg->setUndoBufferSize(info->undo_buffer_size);
-  return false;
+  ndb_lg->setName(alter_info->logfile_group_name);
+  ndb_lg->setUndoBufferSize(alter_info->undo_buffer_size);
+  return FALSE;
 }
 
-bool set_up_undofile(st_alter_tablespace *info,
+bool set_up_undofile(st_alter_tablespace *alter_info,
                      NdbDictionary::Undofile *ndb_uf)
 {
-  ndb_uf->setPath(info->undo_file_name);
-  ndb_uf->setSize(info->initial_size);
-  ndb_uf->setLogfileGroup(info->logfile_group_name);
-  return false;
+  ndb_uf->setPath(alter_info->undo_file_name);
+  ndb_uf->setSize(alter_info->initial_size);
+  ndb_uf->setLogfileGroup(alter_info->logfile_group_name);
+  return FALSE;
 }
 
-int ndbcluster_alter_tablespace(handlerton *hton, THD* thd, st_alter_tablespace *info)
+int ndbcluster_alter_tablespace(handlerton *hton,
+                                THD* thd, st_alter_tablespace *alter_info)
 {
+  int is_tablespace= 0;
+  NdbError err;
+  NDBDICT *dict;
+  int error;
+  const char *errmsg;
+  Ndb *ndb;
   DBUG_ENTER("ha_ndbcluster::alter_tablespace");
+  LINT_INIT(errmsg);
 
-  int is_tablespace= 0;
-  Ndb *ndb= check_ndb_in_thd(thd);
+  ndb= check_ndb_in_thd(thd);
   if (ndb == NULL)
   {
     DBUG_RETURN(HA_ERR_NO_CONNECTION);
   }
+  dict= ndb->getDictionary();
 
-  NdbError err;
-  NDBDICT *dict= ndb->getDictionary();
-  int error;
-  const char * errmsg;
-  LINT_INIT(errmsg);
-
-  switch (info->ts_cmd_type){
+  switch (alter_info->ts_cmd_type){
   case (CREATE_TABLESPACE):
   {
     error= ER_CREATE_FILEGROUP_FAILED;
@@ -10481,11 +10788,11 @@
     NdbDictionary::Tablespace ndb_ts;
     NdbDictionary::Datafile ndb_df;
     NdbDictionary::ObjectId objid;
-    if (set_up_tablespace(info, &ndb_ts))
+    if (set_up_tablespace(alter_info, &ndb_ts))
     {
       DBUG_RETURN(1);
     }
-    if (set_up_datafile(info, &ndb_df))
+    if (set_up_datafile(alter_info, &ndb_df))
     {
       DBUG_RETURN(1);
     }
@@ -10495,7 +10802,7 @@
       DBUG_PRINT("error", ("createTablespace returned %d", error));
       goto ndberror;
     }
-    DBUG_PRINT("info", ("Successfully created Tablespace"));
+    DBUG_PRINT("alter_info", ("Successfully created Tablespace"));
     errmsg= "DATAFILE";
     if (dict->createDatafile(ndb_df))
     {
@@ -10517,10 +10824,10 @@
   case (ALTER_TABLESPACE):
   {
     error= ER_ALTER_FILEGROUP_FAILED;
-    if (info->ts_alter_tablespace_type == ALTER_TABLESPACE_ADD_FILE)
+    if (alter_info->ts_alter_tablespace_type == ALTER_TABLESPACE_ADD_FILE)
     {
       NdbDictionary::Datafile ndb_df;
-      if (set_up_datafile(info, &ndb_df))
+      if (set_up_datafile(alter_info, &ndb_df))
       {
 	DBUG_RETURN(1);
       }
@@ -10530,14 +10837,14 @@
 	goto ndberror;
       }
     }
-    else if(info->ts_alter_tablespace_type == ALTER_TABLESPACE_DROP_FILE)
+    else if(alter_info->ts_alter_tablespace_type == ALTER_TABLESPACE_DROP_FILE)
     {
-      NdbDictionary::Tablespace ts= dict->getTablespace(info->tablespace_name);
-      NdbDictionary::Datafile df= dict->getDatafile(0, info->data_file_name);
+      NdbDictionary::Tablespace ts=
dict->getTablespace(alter_info->tablespace_name);
+      NdbDictionary::Datafile df= dict->getDatafile(0, alter_info->data_file_name);
       NdbDictionary::ObjectId objid;
       df.getTablespaceId(&objid);
       if (ts.getObjectId() == objid.getObjectId() && 
-	  strcmp(df.getPath(), info->data_file_name) == 0)
+	  strcmp(df.getPath(), alter_info->data_file_name) == 0)
       {
 	errmsg= " DROP DATAFILE";
 	if (dict->dropDatafile(df))
@@ -10555,7 +10862,7 @@
     else
     {
       DBUG_PRINT("error", ("Unsupported alter tablespace: %d", 
-			   info->ts_alter_tablespace_type));
+			   alter_info->ts_alter_tablespace_type));
       DBUG_RETURN(HA_ADMIN_NOT_IMPLEMENTED);
     }
     is_tablespace= 1;
@@ -10567,14 +10874,14 @@
     NdbDictionary::LogfileGroup ndb_lg;
     NdbDictionary::Undofile ndb_uf;
     NdbDictionary::ObjectId objid;
-    if (info->undo_file_name == NULL)
+    if (alter_info->undo_file_name == NULL)
     {
       /*
 	REDO files in LOGFILE GROUP not supported yet
       */
       DBUG_RETURN(HA_ADMIN_NOT_IMPLEMENTED);
     }
-    if (set_up_logfile_group(info, &ndb_lg))
+    if (set_up_logfile_group(alter_info, &ndb_lg))
     {
       DBUG_RETURN(1);
     }
@@ -10583,8 +10890,8 @@
     {
       goto ndberror;
     }
-    DBUG_PRINT("info", ("Successfully created Logfile Group"));
-    if (set_up_undofile(info, &ndb_uf))
+    DBUG_PRINT("alter_info", ("Successfully created Logfile Group"));
+    if (set_up_undofile(alter_info, &ndb_uf))
     {
       DBUG_RETURN(1);
     }
@@ -10606,7 +10913,7 @@
   case (ALTER_LOGFILE_GROUP):
   {
     error= ER_ALTER_FILEGROUP_FAILED;
-    if (info->undo_file_name == NULL)
+    if (alter_info->undo_file_name == NULL)
     {
       /*
 	REDO files in LOGFILE GROUP not supported yet
@@ -10614,7 +10921,7 @@
       DBUG_RETURN(HA_ADMIN_NOT_IMPLEMENTED);
     }
     NdbDictionary::Undofile ndb_uf;
-    if (set_up_undofile(info, &ndb_uf))
+    if (set_up_undofile(alter_info, &ndb_uf))
     {
       DBUG_RETURN(1);
     }
@@ -10629,7 +10936,7 @@
   {
     error= ER_DROP_FILEGROUP_FAILED;
     errmsg= "TABLESPACE";
-    if (dict->dropTablespace(dict->getTablespace(info->tablespace_name)))
+    if (dict->dropTablespace(dict->getTablespace(alter_info->tablespace_name)))
     {
       goto ndberror;
     }
@@ -10640,7 +10947,7 @@
   {
     error= ER_DROP_FILEGROUP_FAILED;
     errmsg= "LOGFILE GROUP";
-    if (dict->dropLogfileGroup(dict->getLogfileGroup(info->logfile_group_name)))
+    if
(dict->dropLogfileGroup(dict->getLogfileGroup(alter_info->logfile_group_name)))
     {
       goto ndberror;
     }
@@ -10663,13 +10970,13 @@
   if (is_tablespace)
     ndbcluster_log_schema_op(thd, 0,
                              thd->query, thd->query_length,
-                             "", info->tablespace_name,
+                             "", alter_info->tablespace_name,
                              0, 0,
                              SOT_TABLESPACE, 0, 0, 0);
   else
     ndbcluster_log_schema_op(thd, 0,
                              thd->query, thd->query_length,
-                             "", info->logfile_group_name,
+                             "", alter_info->logfile_group_name,
                              0, 0,
                              SOT_LOGFILE_GROUP, 0, 0, 0);
 #endif
@@ -10690,7 +10997,6 @@
 {
   Ndb *ndb;
   NDBDICT *dict;
-  const NDBTAB *tab;
   int err;
   DBUG_ENTER("ha_ndbcluster::get_no_parts");
   LINT_INIT(err);

--- 1.16/storage/ndb/src/kernel/blocks/dbtup/DbtupScan.cpp	2007-03-10 12:02:58 +01:00
+++ 1.17/storage/ndb/src/kernel/blocks/dbtup/DbtupScan.cpp	2007-03-10 12:02:58 +01:00
@@ -2,8 +2,7 @@
 
    This program is free software; you can redistribute it and/or modify
    it under the terms of the GNU General Public License as published by
-   the Free Software Foundation; either version 2 of the License, or
-   (at your option) any later version.
+   the Free Software Foundation; version 2 of the License.
 
    This program is distributed in the hope that it will be useful,
    but WITHOUT ANY WARRANTY; without even the implied warranty of
@@ -54,8 +53,7 @@
     // flags
     Uint32 bits = 0;
     
-    if (!AccScanReq::getLcpScanFlag(req->requestInfo) ||
-	tablePtr.p->m_no_of_disk_attributes == 0)
+    if (!AccScanReq::getLcpScanFlag(req->requestInfo))
     {
       // seize from pool and link to per-fragment list
       LocalDLList<ScanOp> list(c_scanOpPool, frag.m_scanList);
@@ -88,6 +86,7 @@
       
       ndbrequire(frag.m_lcp_scan_op == c_lcp_scan_op);
       c_scanOpPool.getPtr(scanPtr, frag.m_lcp_scan_op);
+      ndbrequire(scanPtr.p->m_fragPtrI == fragPtr.i);
       bits |= ScanOp::SCAN_LCP;
       if ((tablePtr.p->m_attributes[MM].m_no_of_varsize +
            tablePtr.p->m_attributes[MM].m_no_of_dynamic) > 0) {
@@ -834,7 +833,6 @@
       {
 	ndbassert(bits & ScanOp::SCAN_NR);
 	Local_key& key_mm = pos.m_key_mm;
-        Fix_page* page = (Fix_page*)pos.m_page;
 	if (! (bits & ScanOp::SCAN_DD)) {
 	  key_mm = pos.m_key;
 	  // caller has already set pos.m_get to next tuple
@@ -1041,6 +1039,7 @@
   {
     ndbrequire(fragPtr.p->m_lcp_scan_op == scanPtr.i);
     fragPtr.p->m_lcp_scan_op = RNIL;
+    scanPtr.p->m_fragPtrI = RNIL;
   }
 }
 
@@ -1053,23 +1052,21 @@
   tablePtr.i = req->tableId;
   ptrCheckGuard(tablePtr, cnoOfTablerec, tablerec);
 
-  if(tablePtr.p->m_no_of_disk_attributes)
-  {
-    jam();
-    FragrecordPtr fragPtr;
-    Uint32 fragId = req->fragmentId;
-    fragPtr.i = RNIL;
-    getFragmentrec(fragPtr, fragId, tablePtr.p);
-    ndbrequire(fragPtr.i != RNIL);
-    Fragrecord& frag = *fragPtr.p;
-    
-    ndbrequire(frag.m_lcp_scan_op == RNIL && c_lcp_scan_op != RNIL);
-    frag.m_lcp_scan_op = c_lcp_scan_op;
-    ScanOpPtr scanPtr;
-    c_scanOpPool.getPtr(scanPtr, frag.m_lcp_scan_op);
-    //ndbrequire(scanPtr.p->m_fragPtrI == fragPtr.i); ?
-
-    scanFirst(signal, scanPtr);
-    scanPtr.p->m_state = ScanOp::First;
-  }
+  jam();
+  FragrecordPtr fragPtr;
+  Uint32 fragId = req->fragmentId;
+  fragPtr.i = RNIL;
+  getFragmentrec(fragPtr, fragId, tablePtr.p);
+  ndbrequire(fragPtr.i != RNIL);
+  Fragrecord& frag = *fragPtr.p;
+  
+  ndbrequire(frag.m_lcp_scan_op == RNIL && c_lcp_scan_op != RNIL);
+  frag.m_lcp_scan_op = c_lcp_scan_op;
+  ScanOpPtr scanPtr;
+  c_scanOpPool.getPtr(scanPtr, frag.m_lcp_scan_op);
+  ndbrequire(scanPtr.p->m_fragPtrI == RNIL);
+  scanPtr.p->m_fragPtrI = fragPtr.i;
+  
+  scanFirst(signal, scanPtr);
+  scanPtr.p->m_state = ScanOp::First;
 }
Thread
bk commit into 5.1 tree (jonas:1.2326)jonas10 Mar