List:Commits« Previous MessageNext Message »
From:Tomas Ulin Date:December 5 2008 9:09am
Subject:bzr commit into mysql-5.1 branch (tomas.ulin:3137)
View as plain text  
#At file:///home/tomas/mysql_src/mysql-5.1-telco-6.4/

 3137 Tomas Ulin	2008-12-05 [merge]
      merge
added:
  storage/ndb/src/kernel/blocks/ndbfs/AsyncIoThread.cpp
  storage/ndb/src/kernel/blocks/ndbfs/AsyncIoThread.hpp
modified:
  mysql-test/suite/ndb/t/ndb_dd_dump.test
  sql/ha_ndbcluster.cc
  sql/table.cc
  storage/ndb/include/kernel/NodeInfo.hpp
  storage/ndb/include/kernel/ndb_limits.h
  storage/ndb/include/kernel/signaldata/CmRegSignalData.hpp
  storage/ndb/include/kernel/signaldata/FsOpenReq.hpp
  storage/ndb/include/mgmapi/mgmapi_config_parameters.h
  storage/ndb/include/ndb_version.h.in
  storage/ndb/src/kernel/blocks/ERROR_codes.txt
  storage/ndb/src/kernel/blocks/LocalProxy.cpp
  storage/ndb/src/kernel/blocks/LocalProxy.hpp
  storage/ndb/src/kernel/blocks/Makefile.am
  storage/ndb/src/kernel/blocks/dbdict/Dbdict.cpp
  storage/ndb/src/kernel/blocks/dbdih/Dbdih.hpp
  storage/ndb/src/kernel/blocks/dbdih/DbdihMain.cpp
  storage/ndb/src/kernel/blocks/dblqh/Dblqh.hpp
  storage/ndb/src/kernel/blocks/dblqh/DblqhCommon.cpp
  storage/ndb/src/kernel/blocks/dblqh/DblqhCommon.hpp
  storage/ndb/src/kernel/blocks/dblqh/DblqhMain.cpp
  storage/ndb/src/kernel/blocks/dblqh/DblqhProxy.cpp
  storage/ndb/src/kernel/blocks/dblqh/DblqhProxy.hpp
  storage/ndb/src/kernel/blocks/dbtc/Dbtc.hpp
  storage/ndb/src/kernel/blocks/dbtc/DbtcInit.cpp
  storage/ndb/src/kernel/blocks/dbtc/DbtcMain.cpp
  storage/ndb/src/kernel/blocks/lgman.cpp
  storage/ndb/src/kernel/blocks/ndbcntr/NdbcntrMain.cpp
  storage/ndb/src/kernel/blocks/ndbfs/AsyncFile.cpp
  storage/ndb/src/kernel/blocks/ndbfs/AsyncFile.hpp
  storage/ndb/src/kernel/blocks/ndbfs/CMakeLists.txt
  storage/ndb/src/kernel/blocks/ndbfs/MemoryChannel.hpp
  storage/ndb/src/kernel/blocks/ndbfs/Ndbfs.cpp
  storage/ndb/src/kernel/blocks/ndbfs/Ndbfs.hpp
  storage/ndb/src/kernel/blocks/ndbfs/PosixAsyncFile.cpp
  storage/ndb/src/kernel/blocks/ndbfs/PosixAsyncFile.hpp
  storage/ndb/src/kernel/blocks/ndbfs/Win32AsyncFile.cpp
  storage/ndb/src/kernel/blocks/ndbfs/Win32AsyncFile.hpp
  storage/ndb/src/kernel/blocks/qmgr/QmgrMain.cpp
  storage/ndb/src/kernel/blocks/record_types.hpp
  storage/ndb/src/kernel/blocks/tsman.cpp
  storage/ndb/src/kernel/main.cpp
  storage/ndb/src/kernel/vm/SafeMutex.cpp
  storage/ndb/src/kernel/vm/SafeMutex.hpp
  storage/ndb/src/kernel/vm/SimulatedBlock.hpp
  storage/ndb/src/kernel/vm/ndbd_malloc_impl.hpp
  storage/ndb/src/mgmapi/LocalConfig.cpp
  storage/ndb/src/mgmapi/mgmapi.cpp
  storage/ndb/src/mgmclient/CommandInterpreter.cpp
  storage/ndb/src/mgmsrv/ConfigInfo.cpp
  storage/ndb/test/ndbapi/testDict.cpp
  storage/ndb/test/ndbapi/testNodeRestart.cpp
  storage/ndb/test/run-test/command.cpp
  storage/ndb/test/run-test/daily-basic-tests.txt
  storage/ndb/test/run-test/daily-devel-tests.txt
  storage/ndb/test/run-test/setup.cpp

=== modified file 'mysql-test/suite/ndb/t/ndb_dd_dump.test'
--- a/mysql-test/suite/ndb/t/ndb_dd_dump.test	2007-11-29 10:29:35 +0000
+++ b/mysql-test/suite/ndb/t/ndb_dd_dump.test	2008-12-03 19:48:24 +0000
@@ -260,6 +260,7 @@ CREATE TABLE test.t (
 
  SELECT count(*) FROM test.t;
  LOAD DATA INFILE 't_backup' INTO TABLE test.t;
+ --remove_file $MYSQLTEST_VARDIR/master-data/test/t_backup
 
  SELECT * FROM test.t order by a;
 

=== modified file 'sql/ha_ndbcluster.cc'
--- a/sql/ha_ndbcluster.cc	2008-12-05 08:33:43 +0000
+++ b/sql/ha_ndbcluster.cc	2008-12-05 09:09:17 +0000
@@ -4148,7 +4148,7 @@ int ha_ndbcluster::ndb_delete_row(const 
     /*
       Poor approx. let delete ~ tabsize / 4
     */
-    uint delete_size= 12 + m_bytes_per_write >> 2;
+    uint delete_size= 12 + (m_bytes_per_write >> 2);
     bool need_flush= add_row_check_if_batch_full_size(thd_ndb, delete_size);
     if ( allow_batch &&
 	 table_share->primary_key != MAX_KEY &&

=== modified file 'sql/table.cc'
--- a/sql/table.cc	2008-11-21 21:06:44 +0000
+++ b/sql/table.cc	2008-12-02 15:07:57 +0000
@@ -4372,13 +4372,19 @@ void st_table::clear_column_bitmaps()
 void st_table::prepare_for_position()
 {
   DBUG_ENTER("st_table::prepare_for_position");
-
-  if ((file->ha_table_flags() & HA_PRIMARY_KEY_IN_READ_INDEX) &&
-      s->primary_key < MAX_KEY)
+  
+  if (s->primary_key < MAX_KEY)
   {
-    mark_columns_used_by_index_no_reset(s->primary_key, read_set);
-    /* signal change */
-    file->column_bitmaps_signal(HA_CHANGE_TABLE_READ_BITMAP);
+    if (file->ha_table_flags() & HA_PRIMARY_KEY_IN_READ_INDEX)
+    {
+      mark_columns_used_by_index_no_reset(s->primary_key, read_set);
+    }
+    if ((file->ha_table_flags() & HA_PRIMARY_KEY_IN_READ_INDEX) ||
+        (file->ha_table_flags() & HA_PRIMARY_KEY_REQUIRED_FOR_POSITION))
+    {
+      /* signal change */
+      file->column_bitmaps_signal(HA_COMPLETE_TABLE_READ_BITMAP);
+    }
   }
   DBUG_VOID_RETURN;
 }

=== modified file 'storage/ndb/include/kernel/NodeInfo.hpp'
--- a/storage/ndb/include/kernel/NodeInfo.hpp	2008-11-04 08:43:06 +0000
+++ b/storage/ndb/include/kernel/NodeInfo.hpp	2008-12-01 18:04:19 +0000
@@ -37,6 +37,7 @@ public:
   
   Uint32 m_version;       ///< Ndb version
   Uint32 m_mysql_version; ///< MySQL version
+  Uint32 m_lqh_workers;   ///< LQH workers
   Uint32 m_type;          ///< Node type
   Uint32 m_connectCount;  ///< No of times connected
   bool   m_connected;     ///< Node is connected
@@ -50,6 +51,7 @@ inline
 NodeInfo::NodeInfo(){
   m_version = 0;
   m_mysql_version = 0;
+  m_lqh_workers = 0;
   m_type = INVALID;
   m_connectCount = 0;
   m_heartbeat_cnt= 0;

=== modified file 'storage/ndb/include/kernel/ndb_limits.h'
--- a/storage/ndb/include/kernel/ndb_limits.h	2008-11-13 15:22:59 +0000
+++ b/storage/ndb/include/kernel/ndb_limits.h	2008-12-02 13:10:49 +0000
@@ -187,4 +187,6 @@
 #define MAX_NDBMT_LQH_WORKERS 4
 #define MAX_NDBMT_LQH_THREADS 4
 
+#define NDB_FILE_BUFFER_SIZE (256*1024)
+
 #endif

=== modified file 'storage/ndb/include/kernel/signaldata/CmRegSignalData.hpp'
--- a/storage/ndb/include/kernel/signaldata/CmRegSignalData.hpp	2007-01-06 00:21:39 +0000
+++ b/storage/ndb/include/kernel/signaldata/CmRegSignalData.hpp	2008-12-01 18:04:19 +0000
@@ -161,7 +161,7 @@ class CmNodeInfoReq {
   friend class Qmgr;
   
 public:
-  STATIC_CONST( SignalLength = 4 );
+  STATIC_CONST( SignalLength = 5 );
   
 private:
   /**
@@ -171,6 +171,7 @@ private:
   Uint32 dynamicId;
   Uint32 version;
   Uint32 mysql_version;
+  Uint32 lqh_workers;   // added in telco-6.4
 };
 
 class CmNodeInfoRef {
@@ -198,20 +199,14 @@ class CmNodeInfoConf {
   friend class Qmgr;
   
 public:
-  STATIC_CONST( SignalLength = 4 );
+  STATIC_CONST( SignalLength = 5 );
   
 private:
   Uint32 nodeId;
   Uint32 dynamicId;
   Uint32 version;
   Uint32 mysql_version;
+  Uint32 lqh_workers;   // added in telco-6.4
 };
 
 #endif
-
-
-
-
-
-
-

=== modified file 'storage/ndb/include/kernel/signaldata/FsOpenReq.hpp'
--- a/storage/ndb/include/kernel/signaldata/FsOpenReq.hpp	2008-08-21 06:38:48 +0000
+++ b/storage/ndb/include/kernel/signaldata/FsOpenReq.hpp	2008-12-02 13:10:49 +0000
@@ -33,6 +33,7 @@ class FsOpenReq {
   friend class Win32AsyncFile; // FIXME
   friend class Filename;
   friend class VoidFs;
+  friend class AsyncIoThread;
 
   /**
    * Sender(s)
@@ -90,6 +91,8 @@ private:
   STATIC_CONST( OM_CHECK_SIZE     = 0x2000 );
   STATIC_CONST( OM_DIRECT         = 0x4000 );
   STATIC_CONST( OM_GZ             = 0x8000 );
+  STATIC_CONST( OM_THREAD_POOL    = 0x10000 );
+  STATIC_CONST( OM_WRITE_BUFFER   = 0x20000 );
   
   enum Suffixes {
     S_DATA = 0,

=== modified file 'storage/ndb/include/mgmapi/mgmapi_config_parameters.h'
--- a/storage/ndb/include/mgmapi/mgmapi_config_parameters.h	2008-11-19 11:01:17 +0000
+++ b/storage/ndb/include/mgmapi/mgmapi_config_parameters.h	2008-12-02 13:10:49 +0000
@@ -147,6 +147,7 @@
 #define CFG_NDBMT_LQH_WORKERS         188
 
 #define CFG_DB_INIT_REDO              189
+#define CFG_DB_THREAD_POOL            190
 
 #define CFG_DB_SGA                    198 /* super pool mem */
 #define CFG_DB_DATA_MEM_2             199 /* used in special build in 5.1 */

=== modified file 'storage/ndb/include/ndb_version.h.in'
--- a/storage/ndb/include/ndb_version.h.in	2008-11-14 09:12:01 +0000
+++ b/storage/ndb/include/ndb_version.h.in	2008-12-01 18:04:19 +0000
@@ -102,6 +102,7 @@ Uint32 ndbGetOwnVersion();
 #define NDBD_LONG_LQHKEYREQ MAKE_VERSION(6,4,0)
 #define NDBD_MAX_RECVBYTESIZE_32K MAKE_VERSION(6,3,18)
 #define NDBD_LONG_SCANFRAGREQ MAKE_VERSION(6,4,0)
+#define NDBD_MT_LQH_VERSION MAKE_VERSION(6,4,0)
 
 
 static

=== modified file 'storage/ndb/src/kernel/blocks/ERROR_codes.txt'
--- a/storage/ndb/src/kernel/blocks/ERROR_codes.txt	2008-11-13 14:16:21 +0000
+++ b/storage/ndb/src/kernel/blocks/ERROR_codes.txt	2008-12-03 19:49:40 +0000
@@ -5,7 +5,7 @@ Next DBACC 3002
 Next DBTUP 4029
 Next DBLQH 5051
 Next DBDICT 6013
-Next DBDIH 7215
+Next DBDIH 7216
 Next DBTC 8073
 Next CMVMI 9000
 Next BACKUP 10041
@@ -184,7 +184,9 @@ And crash when all have "not" been sent
 
 7213: in GCP_COMMIT Kill specified node and self, stop processing
 7214: in GCP_TCFINISHED kill specified node
-     
+
+7215: set c_fragments_per_node = 1 (needs to be done at startup)
+
 ERROR CODES FOR TESTING NODE FAILURE, FAILURE IN COPY FRAGMENT PROCESS:
 -----------------------------------------------------------------------
 

=== modified file 'storage/ndb/src/kernel/blocks/LocalProxy.cpp'
--- a/storage/ndb/src/kernel/blocks/LocalProxy.cpp	2008-11-16 12:58:27 +0000
+++ b/storage/ndb/src/kernel/blocks/LocalProxy.cpp	2008-12-03 19:49:40 +0000
@@ -241,6 +241,12 @@ LocalProxy::setMask(SsParallel& ss)
     ss.m_workerMask.set(i);
 }
 
+void
+LocalProxy::setMask(SsParallel& ss, const WorkerMask& mask)
+{
+  ss.m_workerMask.assign(mask);
+}
+
 // load workers (before first signal)
 
 void

=== modified file 'storage/ndb/src/kernel/blocks/LocalProxy.hpp'
--- a/storage/ndb/src/kernel/blocks/LocalProxy.hpp	2008-11-16 12:58:27 +0000
+++ b/storage/ndb/src/kernel/blocks/LocalProxy.hpp	2008-12-03 19:49:40 +0000
@@ -167,8 +167,9 @@ protected:
   bool firstReply(const SsParallel& ss);
   bool lastReply(const SsParallel& ss);
   bool lastExtra(Signal* signal, SsParallel& ss);
-  // set all bits in worker mask
+  // set all or given bits in worker mask
   void setMask(SsParallel& ss);
+  void setMask(SsParallel& ss, const WorkerMask& mask);
 
   /*
    * Ss instances are seized from a pool.  Each pool is simply an array

=== modified file 'storage/ndb/src/kernel/blocks/Makefile.am'
--- a/storage/ndb/src/kernel/blocks/Makefile.am	2008-11-16 15:29:16 +0000
+++ b/storage/ndb/src/kernel/blocks/Makefile.am	2008-12-02 13:10:49 +0000
@@ -41,6 +41,7 @@ libblocks_a_SOURCES = tsman.cpp lgman.cp
   dbtup/DbtupDebug.cpp dbtup/DbtupScan.cpp \
   dbtup/DbtupDiskAlloc.cpp dbtup/DbtupVarAlloc.cpp \
   dbtup/tuppage.cpp dbtup/Undo_buffer.cpp \
+  ndbfs/AsyncIoThread.cpp \
   ndbfs/PosixAsyncFile.cpp ndbfs/AsyncFile.cpp \
   ndbfs/Ndbfs.cpp \
   ndbfs/VoidFs.cpp \

=== modified file 'storage/ndb/src/kernel/blocks/dbdict/Dbdict.cpp'
--- a/storage/ndb/src/kernel/blocks/dbdict/Dbdict.cpp	2008-11-21 11:04:36 +0000
+++ b/storage/ndb/src/kernel/blocks/dbdict/Dbdict.cpp	2008-12-02 14:25:58 +0000
@@ -9771,18 +9771,8 @@ flush:
       ListTablesConf * const conf = (ListTablesConf*)signal->getDataPtrSend();
       conf->senderData = senderData;
       conf->noOfTables = count;
-      if (handle.m_cnt)
-      {
-        jam();
-        sendSignal(rg, GSN_LIST_TABLES_CONF, signal,
-                   sigLen, JBB, &handle);
-      }
-      else
-      {
-        jam();
-        sendSignal(rg, GSN_LIST_TABLES_CONF, signal,
-                   sigLen, JBB);
-      }
+      sendSignal(rg, GSN_LIST_TABLES_CONF, signal,
+                 sigLen, JBB, &handle);
 
       signal->header.m_noOfSections = 0;
       signal->header.m_fragmentInfo = 0;

=== modified file 'storage/ndb/src/kernel/blocks/dbdih/Dbdih.hpp'
--- a/storage/ndb/src/kernel/blocks/dbdih/Dbdih.hpp	2008-10-09 10:11:38 +0000
+++ b/storage/ndb/src/kernel/blocks/dbdih/Dbdih.hpp	2008-11-28 13:16:25 +0000
@@ -1246,7 +1246,8 @@ private:
 
   Uint32 c_nextNodeGroup;
   NodeGroupRecord *nodeGroupRecord;
-  
+  RSS_OP_SNAPSHOT(cnghash);
+
   NodeRecord *nodeRecord;
 
   PageRecord *pageRecord;

=== modified file 'storage/ndb/src/kernel/blocks/dbdih/DbdihMain.cpp'
--- a/storage/ndb/src/kernel/blocks/dbdih/DbdihMain.cpp	2008-11-26 10:37:25 +0000
+++ b/storage/ndb/src/kernel/blocks/dbdih/DbdihMain.cpp	2008-12-03 19:49:40 +0000
@@ -1228,6 +1228,10 @@ void Dbdih::execREAD_CONFIG_REQ(Signal* 
   {
     jam();
     c_fragments_per_node = getLqhWorkers();
+    // try to get some LQH workers which initially handle no fragments
+    if (ERROR_INSERTED(7215)) {
+      c_fragments_per_node = 1;
+    }
   }
   ndbout_c("Using %u fragments per node", c_fragments_per_node);
 
@@ -7089,7 +7093,7 @@ void Dbdih::execCREATE_FRAGMENTATION_REQ
           }
         }
       }
-
+      
       if (flags & CreateFragmentationReq::RI_GET_FRAGMENTATION)
       {
         jam();
@@ -7864,6 +7868,8 @@ void Dbdih::execALTER_TAB_REQ(Signal * s
     if ((err = add_fragments_to_table(tabPtr, buf)))
     {
       jam();
+      ndbrequire(tabPtr.p->totalfragments == save);
+      ndbrequire(connectPtr.p->m_alter.m_org_totalfragments == save);
       send_alter_tab_ref(signal, connectPtr, err);
       return;
     }
@@ -8057,8 +8063,7 @@ Dbdih::release_fragment_from_table(Ptr<T
   getFragstore(tabPtr.p, fragId, fragPtr);
   dec_ng_refcount(getNodeGroup(fragPtr.p->preferredPrimary));
 
-  Uint32 allocated = chunks << LOG_NO_OF_FRAGS_PER_CHUNK;
-  if (fragId < allocated)
+  if (fragId == ((chunks - 1) << LOG_NO_OF_FRAGS_PER_CHUNK))
   {
     jam();
 
@@ -8067,6 +8072,7 @@ Dbdih::release_fragment_from_table(Ptr<T
     fragPtr.p->nextFragmentChunk = cfirstfragstore;
     cfirstfragstore = fragPtr.i;
     cremainingfrags += NO_OF_FRAGS_PER_CHUNK;
+    tabPtr.p->noOfFragChunks = chunks - 1;
   }
 
   tabPtr.p->totalfragments--;
@@ -8111,12 +8117,16 @@ Dbdih::drop_fragments(Signal* signal, Pt
     Ptr<TabRecord> tabPtr;
     tabPtr.i = connectPtr.p->table;
     ptrAss(tabPtr, tabRecord);
-    for (Uint32 i = connectPtr.p->m_alter.m_totalfragments - 1;
-         i >= connectPtr.p->m_alter.m_org_totalfragments; i--)
+
+    Uint32 new_frags = connectPtr.p->m_alter.m_totalfragments;
+    Uint32 org_frags = connectPtr.p->m_alter.m_org_totalfragments;
+    tabPtr.p->totalfragments = new_frags;
+    for (Uint32 i = new_frags - 1; i >= org_frags; i--)
     {
       jam();
       release_fragment_from_table(tabPtr, i);
     }
+    connectPtr.p->m_alter.m_totalfragments = org_frags;
 
     switch(connectPtr.p->connectState){
     case ConnectRecord::ALTER_TABLE_ABORT:
@@ -16005,12 +16015,36 @@ Dbdih::execDUMP_STATE_ORD(Signal* signal
   if (arg == DumpStateOrd::SchemaResourceSnapshot)
   {
     RSS_OP_SNAPSHOT_SAVE(cremainingfrags);
+
+    {
+      Uint32 cnghash = 0;
+      NodeGroupRecordPtr NGPtr;
+      for (Uint32 i = 0; i<cnoOfNodeGroups; i++)
+      {
+        NGPtr.i = c_node_groups[i];
+        ptrCheckGuard(NGPtr, MAX_NDB_NODES, nodeGroupRecord);
+        cnghash = (cnghash * 33) + NGPtr.p->m_ref_count;
+      }
+      RSS_OP_SNAPSHOT_SAVE(cnghash);
+    }
     return;
   }
 
   if (arg == DumpStateOrd::SchemaResourceCheckLeak)
   {
     RSS_OP_SNAPSHOT_CHECK(cremainingfrags);
+
+    {
+      Uint32 cnghash = 0;
+      NodeGroupRecordPtr NGPtr;
+      for (Uint32 i = 0; i<cnoOfNodeGroups; i++)
+      {
+        NGPtr.i = c_node_groups[i];
+        ptrCheckGuard(NGPtr, MAX_NDB_NODES, nodeGroupRecord);
+        cnghash = (cnghash * 33) + NGPtr.p->m_ref_count;
+      }
+      RSS_OP_SNAPSHOT_CHECK(cnghash);
+    }
   }
 
   DECLARE_DUMP0(DBDIH, 7213, "Set error 7213 with extra arg")

=== modified file 'storage/ndb/src/kernel/blocks/dblqh/Dblqh.hpp'
--- a/storage/ndb/src/kernel/blocks/dblqh/Dblqh.hpp	2008-11-19 11:01:17 +0000
+++ b/storage/ndb/src/kernel/blocks/dblqh/Dblqh.hpp	2008-12-01 18:05:11 +0000
@@ -859,6 +859,11 @@ public:
      * Log part
      */
     Uint32 m_log_part_ptr_i;
+
+    /**
+     * Instance key for fast access.
+     */
+    Uint16 lqhInstanceKey;
   };
   typedef Ptr<Fragrecord> FragrecordPtr;
   

=== modified file 'storage/ndb/src/kernel/blocks/dblqh/DblqhCommon.cpp'
--- a/storage/ndb/src/kernel/blocks/dblqh/DblqhCommon.cpp	2008-11-16 12:59:12 +0000
+++ b/storage/ndb/src/kernel/blocks/dblqh/DblqhCommon.cpp	2008-12-01 18:05:11 +0000
@@ -72,3 +72,10 @@ NdbLogPartInfo::partNoIndex(Uint32 lpno)
   assert(partNo[i] == lpno);
   return i;
 }
+
+Uint32
+NdbLogPartInfo::instanceKey(Uint32 lpno) const
+{
+  assert(lpno < LogParts);
+  return 1 + lpno;
+}

=== modified file 'storage/ndb/src/kernel/blocks/dblqh/DblqhCommon.hpp'
--- a/storage/ndb/src/kernel/blocks/dblqh/DblqhCommon.hpp	2008-11-16 12:59:12 +0000
+++ b/storage/ndb/src/kernel/blocks/dblqh/DblqhCommon.hpp	2008-12-01 18:05:11 +0000
@@ -47,6 +47,7 @@ struct NdbLogPartInfo {
   bool partNoOwner(Uint32 lpno) const;
   bool partNoOwner(Uint32 tabId, Uint32 fragId);
   Uint32 partNoIndex(Uint32 lpno) const;
+  Uint32 instanceKey(Uint32 lpno) const;
 };
 
 #endif

=== modified file 'storage/ndb/src/kernel/blocks/dblqh/DblqhMain.cpp'
--- a/storage/ndb/src/kernel/blocks/dblqh/DblqhMain.cpp	2008-11-19 11:01:17 +0000
+++ b/storage/ndb/src/kernel/blocks/dblqh/DblqhMain.cpp	2008-12-03 19:49:40 +0000
@@ -720,7 +720,10 @@ void Dblqh::startphase1Lab(Signal* signa
   for (Ti = 0; Ti < chostFileSize; Ti++) {
     ThostPtr.i = Ti;
     ptrCheckGuard(ThostPtr, chostFileSize, hostRecord);
-    // wl4391_todo using own instance() does not work with mixed versions
+    /*
+     * Valid only if receiver has same number of LQH workers.
+     * In general full instance key of fragment must be used.
+     */
     ThostPtr.p->hostLqhBlockRef = calcInstanceBlockRef(DBLQH, ThostPtr.i);
     ThostPtr.p->hostTcBlockRef  = calcTcBlockRef(ThostPtr.i);
     ThostPtr.p->inPackedList = false;
@@ -1594,6 +1597,7 @@ void Dblqh::execLQHFRAGREQ(Signal* signa
     ndbrequire(ptr.p->logPartNo == logPartNo);
 
     fragptr.p->m_log_part_ptr_i = ptr.i;
+    fragptr.p->lqhInstanceKey = lpinfo.instanceKey(logPartNo);
   }
 
   if (DictTabInfo::isOrderedIndex(tabptr.p->tableType)) {
@@ -3012,99 +3016,163 @@ void Dblqh::sendCommitLqh(Signal* signal
   HostRecordPtr Thostptr;
   Thostptr.i = refToNode(alqhBlockref);
   ptrCheckGuard(Thostptr, chostFileSize, hostRecord);
+
+  Uint32 Tdata[5];
+  Tdata[0] = tcConnectptr.p->clientConnectrec;
+  Tdata[1] = tcConnectptr.p->gci_hi;
+  Tdata[2] = tcConnectptr.p->transid[0];
+  Tdata[3] = tcConnectptr.p->transid[1];
+  Tdata[4] = tcConnectptr.p->gci_lo;
+  Uint32 len = 5;
+
+  if (unlikely(!ndb_check_micro_gcp(getNodeInfo(Thostptr.i).m_version)))
+  {
+    jam();
+    ndbassert(Tdata[4] == 0 || getNodeInfo(Thostptr.i).m_connected == false);
+    len = 4;
+  }
+
+  // currently packed signal cannot address specific instance
+  const bool send_unpacked = getNodeInfo(Thostptr.i).m_lqh_workers != 0;
+  if (send_unpacked) {
+    jam();
+    FragrecordPtr Tfragptr;
+    Tfragptr.i = tcConnectptr.p->fragmentptr;
+    c_fragment_pool.getPtr(Tfragptr);
+    memcpy(&signal->theData[0], &Tdata[0], len << 2);
+    Uint32 Tnode = Thostptr.i;
+    Uint32 instanceKey = Tfragptr.p->lqhInstanceKey;
+    BlockReference lqhRef = numberToRef(DBLQH, instanceKey, Tnode);
+    sendSignal(lqhRef, GSN_COMMIT, signal, len, JBB);
+    return;
+  }
+
   if (Thostptr.p->noOfPackedWordsLqh > 25 - 5) {
     jam();
     sendPackedSignalLqh(signal, Thostptr.p);
   } else {
     jam();
     updatePackedList(signal, Thostptr.p, Thostptr.i);
-  }//if
-  Uint32 pos = Thostptr.p->noOfPackedWordsLqh;
-  Uint32 ptrAndType = tcConnectptr.p->clientConnectrec | (ZCOMMIT << 28);
-  Uint32 gci_hi = tcConnectptr.p->gci_hi;
-  Uint32 gci_lo = tcConnectptr.p->gci_lo;
-  Uint32 transid1 = tcConnectptr.p->transid[0];
-  Uint32 transid2 = tcConnectptr.p->transid[1];
-  Thostptr.p->packedWordsLqh[pos] = ptrAndType;
-  Thostptr.p->packedWordsLqh[pos + 1] = gci_hi;
-  Thostptr.p->packedWordsLqh[pos + 2] = transid1;
-  Thostptr.p->packedWordsLqh[pos + 3] = transid2;
-  Thostptr.p->packedWordsLqh[pos + 4] = gci_lo;
-  Thostptr.p->noOfPackedWordsLqh = pos + 5;
-
-  if (unlikely(!ndb_check_micro_gcp(getNodeInfo(Thostptr.i).m_version)))
-  {
-    jam();
-    ndbassert(gci_lo == 0 || getNodeInfo(Thostptr.i).m_connected == false);
-    Thostptr.p->noOfPackedWordsLqh = pos + 4;
   }
-}//Dblqh::sendCommitLqh()
+
+  Tdata[0] |= (ZCOMMIT << 28);
+  Uint32 pos = Thostptr.p->noOfPackedWordsLqh;
+  memcpy(&Thostptr.p->packedWordsLqh[pos], &Tdata[0], len << 2);
+  Thostptr.p->noOfPackedWordsLqh = pos + len;
+}
 
 void Dblqh::sendCompleteLqh(Signal* signal, BlockReference alqhBlockref)
 {
   HostRecordPtr Thostptr;
   Thostptr.i = refToNode(alqhBlockref);
   ptrCheckGuard(Thostptr, chostFileSize, hostRecord);
+
+  Uint32 Tdata[3];
+  Tdata[0] = tcConnectptr.p->clientConnectrec;
+  Tdata[1] = tcConnectptr.p->transid[0];
+  Tdata[2] = tcConnectptr.p->transid[1];
+  Uint32 len = 3;
+
+  // currently packed signal cannot address specific instance
+  const bool send_unpacked = getNodeInfo(Thostptr.i).m_lqh_workers != 0;
+  if (send_unpacked) {
+    jam();
+    FragrecordPtr Tfragptr;
+    Tfragptr.i = tcConnectptr.p->fragmentptr;
+    c_fragment_pool.getPtr(Tfragptr);
+    memcpy(&signal->theData[0], &Tdata[0], len << 2);
+    Uint32 Tnode = Thostptr.i;
+    Uint32 instanceKey = Tfragptr.p->lqhInstanceKey;
+    BlockReference lqhRef = numberToRef(DBLQH, instanceKey, Tnode);
+    sendSignal(lqhRef, GSN_COMPLETE, signal, len, JBB);
+    return;
+  }
+
   if (Thostptr.p->noOfPackedWordsLqh > 22) {
     jam();
     sendPackedSignalLqh(signal, Thostptr.p);
   } else {
     jam();
     updatePackedList(signal, Thostptr.p, Thostptr.i);
-  }//if
+  }
+
+  Tdata[0] |= (ZCOMPLETE << 28);
   Uint32 pos = Thostptr.p->noOfPackedWordsLqh;
-  Uint32 ptrAndType = tcConnectptr.p->clientConnectrec | (ZCOMPLETE << 28);
-  Uint32 transid1 = tcConnectptr.p->transid[0];
-  Uint32 transid2 = tcConnectptr.p->transid[1];
-  Thostptr.p->packedWordsLqh[pos] = ptrAndType;
-  Thostptr.p->packedWordsLqh[pos + 1] = transid1;
-  Thostptr.p->packedWordsLqh[pos + 2] = transid2;
-  Thostptr.p->noOfPackedWordsLqh = pos + 3;
-}//Dblqh::sendCompleteLqh()
+  memcpy(&Thostptr.p->packedWordsLqh[pos], &Tdata[0], len << 2);
+  Thostptr.p->noOfPackedWordsLqh = pos + len;
+}
 
 void Dblqh::sendCommittedTc(Signal* signal, BlockReference atcBlockref)
 {
   HostRecordPtr Thostptr;
   Thostptr.i = refToNode(atcBlockref);
   ptrCheckGuard(Thostptr, chostFileSize, hostRecord);
+
+  Uint32 Tdata[3];
+  Tdata[0] = tcConnectptr.p->clientConnectrec;
+  Tdata[1] = tcConnectptr.p->transid[0];
+  Tdata[2] = tcConnectptr.p->transid[1];
+  Uint32 len = 3;
+
+  // currently TC is single-threaded
+  const bool send_unpacked = false;
+  if (send_unpacked) {
+    jam();
+    memcpy(&signal->theData[0], &Tdata[0], len << 2);
+    BlockReference tcRef = Thostptr.p->hostTcBlockRef;
+    sendSignal(tcRef, GSN_COMMITTED, signal, len, JBB);
+    return;
+  }
+
   if (Thostptr.p->noOfPackedWordsTc > 22) {
     jam();
     sendPackedSignalTc(signal, Thostptr.p);
   } else {
     jam();
     updatePackedList(signal, Thostptr.p, Thostptr.i);
-  }//if
+  }
+
+  Tdata[0] |= (ZCOMMITTED << 28);
   Uint32 pos = Thostptr.p->noOfPackedWordsTc;
-  Uint32 ptrAndType = tcConnectptr.p->clientConnectrec | (ZCOMMITTED << 28);
-  Uint32 transid1 = tcConnectptr.p->transid[0];
-  Uint32 transid2 = tcConnectptr.p->transid[1];
-  Thostptr.p->packedWordsTc[pos] = ptrAndType;
-  Thostptr.p->packedWordsTc[pos + 1] = transid1;
-  Thostptr.p->packedWordsTc[pos + 2] = transid2;
-  Thostptr.p->noOfPackedWordsTc = pos + 3;
-}//Dblqh::sendCommittedTc()
+  memcpy(&Thostptr.p->packedWordsTc[pos], &Tdata[0], len << 2);
+  Thostptr.p->noOfPackedWordsTc = pos + len;
+}
 
 void Dblqh::sendCompletedTc(Signal* signal, BlockReference atcBlockref)
 {
   HostRecordPtr Thostptr;
   Thostptr.i = refToNode(atcBlockref);
   ptrCheckGuard(Thostptr, chostFileSize, hostRecord);
+
+  Uint32 Tdata[3];
+  Tdata[0] = tcConnectptr.p->clientConnectrec;
+  Tdata[1] = tcConnectptr.p->transid[0];
+  Tdata[2] = tcConnectptr.p->transid[1];
+  Uint32 len = 3;
+
+  // currently TC is single-threaded
+  const bool send_unpacked = false;
+  if (send_unpacked) {
+    jam();
+    memcpy(&signal->theData[0], &Tdata[0], len << 2);
+    BlockReference tcRef = Thostptr.p->hostTcBlockRef;
+    sendSignal(tcRef, GSN_COMMITTED, signal, len, JBB);
+    return;
+  }
+
   if (Thostptr.p->noOfPackedWordsTc > 22) {
     jam();
     sendPackedSignalTc(signal, Thostptr.p);
   } else {
     jam();
     updatePackedList(signal, Thostptr.p, Thostptr.i);
-  }//if
+  }
+
+  Tdata[0] |= (ZCOMPLETED << 28);
   Uint32 pos = Thostptr.p->noOfPackedWordsTc;
-  Uint32 ptrAndType = tcConnectptr.p->clientConnectrec | (ZCOMPLETED << 28);
-  Uint32 transid1 = tcConnectptr.p->transid[0];
-  Uint32 transid2 = tcConnectptr.p->transid[1];
-  Thostptr.p->packedWordsTc[pos] = ptrAndType;
-  Thostptr.p->packedWordsTc[pos + 1] = transid1;
-  Thostptr.p->packedWordsTc[pos + 2] = transid2;
-  Thostptr.p->noOfPackedWordsTc = pos + 3;
-}//Dblqh::sendCompletedTc()
+  memcpy(&Thostptr.p->packedWordsTc[pos], &Tdata[0], len << 2);
+  Thostptr.p->noOfPackedWordsTc = pos + len;
+}
 
 void Dblqh::sendLqhkeyconfTc(Signal* signal, BlockReference atcBlockref)
 {
@@ -3130,7 +3198,9 @@ void Dblqh::sendLqhkeyconfTc(Signal* sig
     lqhKeyConf = (LqhKeyConf *)
       &Thostptr.p->packedWordsTc[Thostptr.p->noOfPackedWordsTc];
     Thostptr.p->noOfPackedWordsTc += LqhKeyConf::SignalLength;
-  } else if(refToMain(atcBlockref) == DBLQH){
+  } else if(refToMain(atcBlockref) == DBLQH &&
+            refToInstance(atcBlockref) == instance()) {
+    //wl4391_todo check
     jam();
 /*******************************************************************
 // This signal was intended for DBLQH as part of log execution or
@@ -5781,8 +5851,10 @@ void Dblqh::packLqhkeyreqLab(Signal* sig
   lqhKeyReq->variableData[nextPos + 0] = sig0;
   nextPos += LqhKeyReq::getGCIFlag(Treqinfo);
 
-  // wl4391_todo for mixed versions must recompute full instance key here
-  BlockReference lqhRef = calcInstanceBlockRef(DBLQH, regTcPtr->nextReplica);
+  // pass full instance key for remote to map to real instance
+  BlockReference lqhRef = numberToRef(DBLQH,
+                                      fragptr.p->lqhInstanceKey,
+                                      regTcPtr->nextReplica);
   
   if (likely(sendLongReq))
   {
@@ -6112,7 +6184,17 @@ void Dblqh::writeAttrinfoLab(Signal* sig
 /* ------------------------------------------------------------------------- */
 void Dblqh::sendTupkey(Signal* signal) 
 {
-  BlockReference lqhRef = calcInstanceBlockRef(DBLQH, tcConnectptr.p->nextReplica);
+  BlockReference lqhRef = 0;
+  {
+    // wl4391_todo fragptr
+    FragrecordPtr Tfragptr;
+    Tfragptr.i = tcConnectptr.p->fragmentptr;
+    c_fragment_pool.getPtr(Tfragptr);
+    Uint32 Tnode = tcConnectptr.p->nextReplica;
+    Uint32 instanceKey = Tfragptr.p->lqhInstanceKey;
+    lqhRef = numberToRef(DBLQH, instanceKey, Tnode);
+  }
+
   signal->theData[0] = tcConnectptr.p->tcOprec;
   signal->theData[1] = tcConnectptr.p->transid[0];
   signal->theData[2] = tcConnectptr.p->transid[1];
@@ -7286,7 +7368,12 @@ void Dblqh::execABORT(Signal* signal) 
 /* ------------------------------------------------------------------------- */
 // We will immediately send the ABORT message also to the next LQH node in line.
 /* ------------------------------------------------------------------------- */
-    BlockReference TLqhRef = calcInstanceBlockRef(DBLQH, regTcPtr->nextReplica);
+    FragrecordPtr Tfragptr;
+    Tfragptr.i = regTcPtr->fragmentptr;
+    c_fragment_pool.getPtr(Tfragptr);
+    Uint32 Tnode = regTcPtr->nextReplica;
+    Uint32 instanceKey = Tfragptr.p->lqhInstanceKey;
+    BlockReference TLqhRef = numberToRef(DBLQH, instanceKey, Tnode);
     signal->theData[0] = regTcPtr->tcOprec;
     signal->theData[1] = regTcPtr->tcBlockref;
     signal->theData[2] = regTcPtr->transid[0];
@@ -11909,10 +11996,6 @@ void Dblqh::execLCP_FRAG_ORD(Signal* sig
       if (cnoOfFragsCheckpointed > 0) {
         jam();
         completeLcpRoundLab(signal, lcpId);
-      } else if (isNdbMtLqh()) {
-        jam();
-        // makes proxy code simpler
-        completeLcpRoundLab(signal, lcpId);
       } else {
         jam();
         sendLCP_COMPLETE_REP(signal, lcpId);
@@ -14141,7 +14224,7 @@ void Dblqh::openFileRw(Signal* signal, L
   signal->theData[3] = olfLogFilePtr.p->fileName[1];
   signal->theData[4] = olfLogFilePtr.p->fileName[2];
   signal->theData[5] = olfLogFilePtr.p->fileName[3];
-  signal->theData[6] = FsOpenReq::OM_READWRITE | FsOpenReq::OM_AUTOSYNC | FsOpenReq::OM_CHECK_SIZE;
+  signal->theData[6] = FsOpenReq::OM_READWRITE | FsOpenReq::OM_AUTOSYNC | FsOpenReq::OM_CHECK_SIZE | FsOpenReq::OM_WRITE_BUFFER;
   if (c_o_direct)
     signal->theData[6] |= FsOpenReq::OM_DIRECT;
   req->auto_sync_size = MAX_REDO_PAGES_WITHOUT_SYNCH * sizeof(LogPageRecord);
@@ -14167,7 +14250,7 @@ void Dblqh::openLogfileInit(Signal* sign
   signal->theData[3] = logFilePtr.p->fileName[1];
   signal->theData[4] = logFilePtr.p->fileName[2];
   signal->theData[5] = logFilePtr.p->fileName[3];
-  signal->theData[6] = FsOpenReq::OM_READWRITE | FsOpenReq::OM_TRUNCATE | FsOpenReq::OM_CREATE | FsOpenReq::OM_AUTOSYNC;
+  signal->theData[6] = FsOpenReq::OM_READWRITE | FsOpenReq::OM_TRUNCATE | FsOpenReq::OM_CREATE | FsOpenReq::OM_AUTOSYNC | FsOpenReq::OM_WRITE_BUFFER;
   if (c_o_direct)
     signal->theData[6] |= FsOpenReq::OM_DIRECT;
 
@@ -14271,7 +14354,7 @@ void Dblqh::openNextLogfile(Signal* sign
     signal->theData[3] = onlLogFilePtr.p->fileName[1];
     signal->theData[4] = onlLogFilePtr.p->fileName[2];
     signal->theData[5] = onlLogFilePtr.p->fileName[3];
-    signal->theData[6] = FsOpenReq::OM_READWRITE | FsOpenReq::OM_AUTOSYNC | FsOpenReq::OM_CHECK_SIZE;
+    signal->theData[6] = FsOpenReq::OM_READWRITE | FsOpenReq::OM_AUTOSYNC | FsOpenReq::OM_CHECK_SIZE | FsOpenReq::OM_WRITE_BUFFER;
     if (c_o_direct)
       signal->theData[6] |= FsOpenReq::OM_DIRECT;
     req->auto_sync_size = MAX_REDO_PAGES_WITHOUT_SYNCH * sizeof(LogPageRecord);
@@ -15215,10 +15298,15 @@ void Dblqh::execSTART_EXEC_SR(Signal* si
      *    WE NEED TO SEND THOSE SIGNALS EVEN IF WE HAVE NOT REQUESTED 
      *    ANY FRAGMENTS PARTICIPATE IN THIS PHASE.
      * --------------------------------------------------------------------- */
-    BlockNumber lqhBlockNo = numberToBlock(DBLQH, instance());
-    NodeReceiverGroup rg(lqhBlockNo, m_sr_nodes);
     signal->theData[0] = cownNodeid;
-    sendSignal(rg, GSN_EXEC_SRREQ, signal, 1, JBB);
+    if (!isNdbMtLqh()) {
+      NodeReceiverGroup rg(DBLQH, m_sr_nodes);
+      sendSignal(rg, GSN_EXEC_SRREQ, signal, 1, JBB);
+    } else {
+      const Uint32 sz = NdbNodeBitmask::Size;
+      m_sr_nodes.copyto(sz, &signal->theData[1]);
+      sendSignal(DBLQH_REF, GSN_EXEC_SRREQ, signal, 1 + sz, JBB);
+    }
     return;
   } else {
     jam();
@@ -15232,7 +15320,9 @@ void Dblqh::execSTART_EXEC_SR(Signal* si
       
       Uint32 index = csrPhasesCompleted;
       arrGuard(index, MAX_LOG_EXEC);
-      BlockReference ref = calcInstanceBlockRef(DBLQH, fragptr.p->srLqhLognode[index]);
+      Uint32 Tnode = fragptr.p->srLqhLognode[index];
+      Uint32 instanceKey = fragptr.p->lqhInstanceKey;
+      BlockReference ref = numberToRef(DBLQH, instanceKey, Tnode);
       fragptr.p->srStatus = Fragrecord::SS_STARTED;
 
       /* --------------------------------------------------------------------
@@ -15249,7 +15339,6 @@ void Dblqh::execSTART_EXEC_SR(Signal* si
       execFragReq->lastGci = fragptr.p->srLastGci[index];
       sendSignal(ref, GSN_EXEC_FRAGREQ, signal, 
 		 ExecFragReq::SignalLength, JBB);
-
     }
     signal->theData[0] = next;
     sendSignal(cownref, GSN_START_EXEC_SR, signal, 1, JBB);
@@ -15339,6 +15428,13 @@ void Dblqh::execEXEC_FRAGREF(Signal* sig
 /* *************** */
 void Dblqh::execEXEC_SRCONF(Signal* signal) 
 {
+  // wl4391_todo workaround until timing fixed
+  if (cnoOutstandingExecFragReq != 0) {
+    ndbout << "delay: reqs=" << cnoOutstandingExecFragReq << endl;
+    sendSignalWithDelay(reference(), GSN_EXEC_SRCONF,
+                        signal, 10, signal->getLength());
+    return;
+  }
   jamEntry();
   Uint32 nodeId = signal->theData[0];
   arrGuard(nodeId, MAX_NDB_NODES);
@@ -16704,9 +16800,14 @@ void Dblqh::srPhase3Comp(Signal* signal)
   jamEntry();
 
   signal->theData[0] = cownNodeid;
-  BlockNumber lqhBlockNo = numberToBlock(DBLQH, instance());
-  NodeReceiverGroup rg(lqhBlockNo, m_sr_nodes);
-  sendSignal(rg, GSN_EXEC_SRCONF, signal, 1, JBB);
+  if (!isNdbMtLqh()) {
+    NodeReceiverGroup rg(DBLQH, m_sr_nodes);
+    sendSignal(rg, GSN_EXEC_SRCONF, signal, 1, JBB);
+  } else {
+    const Uint32 sz = NdbNodeBitmask::Size;
+    m_sr_nodes.copyto(sz, &signal->theData[1]);
+    sendSignal(DBLQH_REF, GSN_EXEC_SRCONF, signal, 1 + sz, JBB);
+  }
   return;
 }//Dblqh::srPhase3Comp()
 
@@ -19543,7 +19644,7 @@ Dblqh::validate_filter(Signal* signal)
     default:
       infoEvent("Invalid filter op: 0x%x pos: %ld",
 		* start,
-		start - (signal->theData + 1));
+		(long int)(start - (signal->theData + 1)));
       return false;
     }
   }

=== modified file 'storage/ndb/src/kernel/blocks/dblqh/DblqhProxy.cpp'
--- a/storage/ndb/src/kernel/blocks/dblqh/DblqhProxy.cpp	2008-11-16 15:28:22 +0000
+++ b/storage/ndb/src/kernel/blocks/dblqh/DblqhProxy.cpp	2008-12-03 19:49:40 +0000
@@ -84,6 +84,10 @@ DblqhProxy::DblqhProxy(Block_context& ct
 
   // GSN_SUB_GCP_COMPLETE_REP
   addRecSignal(GSN_SUB_GCP_COMPLETE_REP, &DblqhProxy::execSUB_GCP_COMPLETE_REP);
+
+  // GSN_EXEC_SRREQ
+  addRecSignal(GSN_EXEC_SRREQ, &DblqhProxy::execEXEC_SRREQ);
+  addRecSignal(GSN_EXEC_SRCONF, &DblqhProxy::execEXEC_SRCONF);
 }
 
 DblqhProxy::~DblqhProxy()
@@ -443,6 +447,12 @@ DblqhProxy::sendLCP_FRAG_ORD(Signal* sig
     return;
   }
 
+  if (!ss.m_active.get(ss.m_worker)) {
+    jam();
+    D("LCP: active" << V(ss.m_worker));
+    ss.m_active.set(ss.m_worker);
+  }
+
   sendSignal(workerRef(ss.m_worker), GSN_LCP_FRAG_ORD,
              signal, LcpFragOrd::SignalLength, JBB);
 }
@@ -488,6 +498,12 @@ DblqhProxy::execLCP_COMPLETE_ORD(Signal*
   Ss_LCP_COMPLETE_ORD& ss = ssSeize<Ss_LCP_COMPLETE_ORD>(ssId);
   ss.m_req = *req;
 
+  Ss_LCP_FRAG_ORD& ssLcp = ssFind<Ss_LCP_FRAG_ORD>(ssId);
+  const Uint32 activeCount = ssLcp.m_active.count();
+  D("LCP: complete" << V(activeCount));
+  // database with no fragments is not handled
+  ndbrequire(activeCount != 0);
+
   // seize END_LCP_REQ records
   Uint32 i;
   for (i = 0; i < ss.BlockCnt; i++) {
@@ -497,9 +513,10 @@ DblqhProxy::execLCP_COMPLETE_ORD(Signal*
     Uint32 ssIdEnd = getSsId(&tmp);
     Ss_END_LCP_REQ& ssEnd = ssSeize<Ss_END_LCP_REQ>(ssIdEnd);
     ss.m_endLcp[i].m_ssId = ssIdEnd;
+    ssEnd.m_ssIdLcp = ssId;
 
-    // set wait-for bitmask in SsParallel
-    setMask(ssEnd);
+    // set wait-for bitmask
+    setMask(ssEnd, ssLcp.m_active);
   }
 
   sendREQ(signal, ss);
@@ -655,6 +672,13 @@ void
 DblqhProxy::sendEND_LCP_CONF(Signal* signal, Uint32 ssId)
 {
   Ss_END_LCP_REQ& ss = ssFind<Ss_END_LCP_REQ>(ssId);
+  Ss_LCP_FRAG_ORD& ssLcp = ssFind<Ss_LCP_FRAG_ORD>(ss.m_ssIdLcp);
+
+  // workers handling no fragments sent no REQ and get no CONF
+  if (!ssLcp.m_active.get(ss.m_worker)) {
+    jam();
+    return;
+  }
   
   EndLcpConf* conf = (EndLcpConf*)signal->getDataPtrSend();
   conf->senderData = ss.m_req[ss.m_worker].senderData;
@@ -1327,6 +1351,114 @@ DblqhProxy::sendEMPTY_LCP_CONF(Signal* s
   }
 
   ssRelease<Ss_EMPTY_LCP_REQ>(ssId);
+}
+
+// GSN_EXEC_SR_1 [fictional gsn ]
+
+void
+DblqhProxy::execEXEC_SRREQ(Signal* signal)
+{
+  const BlockReference senderRef = signal->getSendersBlockRef();
+
+  if (refToInstance(senderRef) != 0) {
+    jam();
+    execEXEC_SR_2(signal, GSN_EXEC_SRREQ);
+    return;
+  }
+
+  execEXEC_SR_1(signal, GSN_EXEC_SRREQ);
+}
+
+void
+DblqhProxy::execEXEC_SRCONF(Signal* signal)
+{
+  const BlockReference senderRef = signal->getSendersBlockRef();
+
+  if (refToInstance(senderRef) != 0) {
+    jam();
+    execEXEC_SR_2(signal, GSN_EXEC_SRCONF);
+    return;
+  }
+
+  execEXEC_SR_1(signal, GSN_EXEC_SRCONF);
+}
+
+void
+DblqhProxy::execEXEC_SR_1(Signal* signal, GlobalSignalNumber gsn)
+{
+  ndbrequire(signal->getLength() == Ss_EXEC_SR_1::Sig::SignalLength);
+
+  const Ss_EXEC_SR_1::Sig* sig =
+    (const Ss_EXEC_SR_1::Sig*)signal->getDataPtr();
+  Uint32 ssId = getSsId(sig);
+  Ss_EXEC_SR_1& ss = ssSeize<Ss_EXEC_SR_1>(ssId);
+  ss.m_gsn = gsn;
+  ss.m_sig = *sig;
+
+  sendREQ(signal, ss);
+  ssRelease<Ss_EXEC_SR_1>(ss);
+}
+
+void
+DblqhProxy::sendEXEC_SR_1(Signal* signal, Uint32 ssId)
+{
+  Ss_EXEC_SR_1& ss = ssFind<Ss_EXEC_SR_1>(ssId);
+  signal->theData[0] = ss.m_sig.nodeId;
+  sendSignal(workerRef(ss.m_worker), ss.m_gsn, signal, 1, JBB);
+}
+
+// GSN_EXEC_SRREQ_2 [ fictional gsn ]
+
+void
+DblqhProxy::execEXEC_SR_2(Signal* signal, GlobalSignalNumber gsn)
+{
+  ndbrequire(signal->getLength() == Ss_EXEC_SR_2::Sig::SignalLength);
+
+  const Ss_EXEC_SR_2::Sig* sig =
+    (const Ss_EXEC_SR_2::Sig*)signal->getDataPtr();
+  Uint32 ssId = getSsId(sig);
+
+  bool found = false;
+  Ss_EXEC_SR_2& ss = ssFindSeize<Ss_EXEC_SR_2>(ssId, &found);
+  if (!found) {
+    jam();
+    setMask(ss);
+  }
+
+  ndbrequire(sig->nodeId == getOwnNodeId());
+  if (ss.m_sigcount == 0) {
+    jam();
+    ss.m_gsn = gsn;
+    ss.m_sig = *sig;
+  } else {
+    jam();
+    ndbrequire(ss.m_gsn == gsn);
+    ndbrequire(memcmp(&ss.m_sig, sig, sizeof(*sig)) == 0);
+  }
+  ss.m_sigcount++;
+
+  // reversed roles
+  recvCONF(signal, ss);
+}
+
+void
+DblqhProxy::sendEXEC_SR_2(Signal* signal, Uint32 ssId)
+{
+  Ss_EXEC_SR_2& ss = ssFind<Ss_EXEC_SR_2>(ssId);
+
+  if (!lastReply(ss)) {
+    jam();
+    return;
+  }
+
+  NodeBitmask nodes;
+  nodes.assign(NdbNodeBitmask::Size, ss.m_sig.sr_nodes);
+  NodeReceiverGroup rg(DBLQH, nodes);
+
+  signal->theData[0] = ss.m_sig.nodeId;
+  sendSignal(rg, ss.m_gsn, signal, 1, JBB);
+
+  ssRelease<Ss_EXEC_SR_2>(ssId);
 }
 
 BLOCK_FUNCTIONS(DblqhProxy)

=== modified file 'storage/ndb/src/kernel/blocks/dblqh/DblqhProxy.hpp'
--- a/storage/ndb/src/kernel/blocks/dblqh/DblqhProxy.hpp	2008-11-16 15:28:22 +0000
+++ b/storage/ndb/src/kernel/blocks/dblqh/DblqhProxy.hpp	2008-12-03 19:49:40 +0000
@@ -126,6 +126,7 @@ protected:
      * set and is treated as a fictional signal GSN_LCP_COMPLETE_ORD.
      */
     static const char* name() { return "LCP_FRAG_ORD"; }
+    WorkerMask m_active; // handled at least 1 fragment
     Ss_LCP_FRAG_ORD() {
       m_sendREQ = (SsFUNC)&DblqhProxy::sendLCP_FRAG_ORD;
       m_sendCONF = (SsFUNC)0;
@@ -187,6 +188,7 @@ protected:
      * Note TSMAN sends no END_LCP_CONF.
      */
     static const char* name() { return "END_LCP_REQ"; }
+    Uint32 m_ssIdLcp;
     Uint32 m_reqcount;
     Uint32 m_backupId;
     Uint32 m_proxyBlockNo;
@@ -195,6 +197,7 @@ protected:
     Ss_END_LCP_REQ() {
       m_sendREQ = (SsFUNC)&DblqhProxy::sendEND_LCP_CONF;
       m_sendCONF = (SsFUNC)&DblqhProxy::sendEND_LCP_REQ;
+      m_ssIdLcp = 0;
       m_reqcount = 0;
       m_backupId = 0;
       m_proxyBlockNo = 0;
@@ -380,9 +383,7 @@ protected:
 
   // GSN_START_RECREQ_2 [ sub-op, fictional gsn ]
   struct Ss_START_RECREQ_2 : SsParallel {
-#ifdef VM_TRACE
     static const char* name() { return "START_RECREQ_2"; }
-#endif
     struct Req {
       enum { SignalLength = 2 };
       Uint32 lcpId;
@@ -458,6 +459,71 @@ protected:
   void execEMPTY_LCP_CONF(Signal*);
   void execEMPTY_LCP_REF(Signal*);
   void sendEMPTY_LCP_CONF(Signal*, Uint32 ssId);
+
+  // GSN_EXEC_SR_1 [ fictional gsn ]
+  struct Ss_EXEC_SR_1 : SsParallel {
+    /*
+     * Handle EXEC_SRREQ and EXEC_SRCONF.  These are broadcast
+     * signals (not REQ/CONF).  EXEC_SR_1 receives one signal and
+     * sends it to its workers.  EXEC_SR_2 waits for signal from
+     * all workers and broadcasts it to all nodes.  These are
+     * required to handle mixed versions (non-mt, mt-lqh-1,2,4).
+     */
+    static const char* name() { return "EXEC_SR_1"; }
+    struct Sig {
+      enum { SignalLength = 1 };
+      Uint32 nodeId;
+    };
+    GlobalSignalNumber m_gsn;
+    Sig m_sig;
+    Ss_EXEC_SR_1() {
+      m_sendREQ = (SsFUNC)&DblqhProxy::sendEXEC_SR_1;
+      m_sendCONF = (SsFUNC)0;
+      m_gsn = 0;
+    };
+    enum { poolSize = 1 };
+    static SsPool<Ss_EXEC_SR_1>& pool(LocalProxy* proxy) {
+      return ((DblqhProxy*)proxy)->c_ss_EXEC_SR_1;
+    }
+  };
+  SsPool<Ss_EXEC_SR_1> c_ss_EXEC_SR_1;
+  Uint32 getSsId(const Ss_EXEC_SR_1::Sig* sig) {
+    return SsIdBase | refToNode(sig->nodeId);
+  };
+  void execEXEC_SRREQ(Signal*);
+  void execEXEC_SRCONF(Signal*);
+  void execEXEC_SR_1(Signal*, GlobalSignalNumber gsn);
+  void sendEXEC_SR_1(Signal*, Uint32 ssId);
+
+  // GSN_EXEC_SR_2 [ fictional gsn ]
+  struct Ss_EXEC_SR_2 : SsParallel {
+    static const char* name() { return "EXEC_SR_2"; }
+    struct Sig {
+      enum { SignalLength = 1 + NdbNodeBitmask::Size };
+      Uint32 nodeId;
+      Uint32 sr_nodes[NdbNodeBitmask::Size]; // local signal so ok to add
+    };
+    GlobalSignalNumber m_gsn;
+    Uint32 m_sigcount;
+    Sig m_sig; // all signals must be identical
+    Ss_EXEC_SR_2() {
+      // reversed roles
+      m_sendREQ = (SsFUNC)0;
+      m_sendCONF = (SsFUNC)&DblqhProxy::sendEXEC_SR_2;
+      m_gsn = 0;
+      m_sigcount = 0;
+    };
+    enum { poolSize = 1 };
+    static SsPool<Ss_EXEC_SR_2>& pool(LocalProxy* proxy) {
+      return ((DblqhProxy*)proxy)->c_ss_EXEC_SR_2;
+    }
+  };
+  SsPool<Ss_EXEC_SR_2> c_ss_EXEC_SR_2;
+  Uint32 getSsId(const Ss_EXEC_SR_2::Sig* sig) {
+    return SsIdBase | refToNode(sig->nodeId);
+  };
+  void execEXEC_SR_2(Signal*, GlobalSignalNumber gsn);
+  void sendEXEC_SR_2(Signal*, Uint32 ssId);
 };
 
 #endif

=== modified file 'storage/ndb/src/kernel/blocks/dbtc/Dbtc.hpp'
--- a/storage/ndb/src/kernel/blocks/dbtc/Dbtc.hpp	2008-10-30 09:43:50 +0000
+++ b/storage/ndb/src/kernel/blocks/dbtc/Dbtc.hpp	2008-12-03 19:51:33 +0000
@@ -1385,7 +1385,7 @@ private:
                      TcConnectRecord * const regTcPtr);
   void sendCompleteLqh(Signal* signal,
                        TcConnectRecord * const regTcPtr);
-  void sendTCKEY_FAILREF(Signal* signal, const ApiConnectRecord *);
+  void sendTCKEY_FAILREF(Signal* signal, ApiConnectRecord *);
   void sendTCKEY_FAILCONF(Signal* signal, ApiConnectRecord *);
   void routeTCKEY_FAILREFCONF(Signal* signal, const ApiConnectRecord *, 
 			      Uint32 gsn, Uint32 len);
@@ -1462,6 +1462,7 @@ private:
   void seizeApiConnect(Signal* signal);
   void seizeApiConnectCopy(Signal* signal);
   void seizeApiConnectFail(Signal* signal);
+  void crash_gcp(Uint32 line);
   void seizeGcp(Ptr<GcpRecord> & dst, Uint64 gci);
   void seizeTcConnect(Signal* signal);
   void seizeTcConnectFail(Signal* signal);

=== modified file 'storage/ndb/src/kernel/blocks/dbtc/DbtcInit.cpp'
--- a/storage/ndb/src/kernel/blocks/dbtc/DbtcInit.cpp	2008-10-05 07:14:21 +0000
+++ b/storage/ndb/src/kernel/blocks/dbtc/DbtcInit.cpp	2008-12-03 19:51:33 +0000
@@ -316,7 +316,8 @@ Dbtc::Dbtc(Block_context& ctx):
 		    &cachePtr,
 		    &hostptr,
 		    &timeOutptr,
-		    &scanFragptr }; 
+		    &scanFragptr, 
+                    &tcNodeFailptr }; 
     init_globals_list(tmp, sizeof(tmp)/sizeof(tmp[0]));
   }
 #endif

=== modified file 'storage/ndb/src/kernel/blocks/dbtc/DbtcMain.cpp'
--- a/storage/ndb/src/kernel/blocks/dbtc/DbtcMain.cpp	2008-11-13 15:05:07 +0000
+++ b/storage/ndb/src/kernel/blocks/dbtc/DbtcMain.cpp	2008-12-03 19:51:33 +0000
@@ -4685,7 +4685,15 @@ void Dbtc::commitGciHandling(Signal* sig
         linkApiToGcp(localGcpPointer, regApiPtr);
         return;
       } else {
-        ndbrequire(regApiPtr.p->globalcheckpointid > localGcpPointer.p->gcpId);
+        if (unlikely(! (regApiPtr.p->globalcheckpointid > localGcpPointer.p->gcpId)))
+        {
+          ndbout_c("%u/%u %u/%u",
+                   Uint32(regApiPtr.p->globalcheckpointid >> 32),
+                   Uint32(regApiPtr.p->globalcheckpointid),
+                   Uint32(localGcpPointer.p->gcpId >> 32),
+                   Uint32(localGcpPointer.p->gcpId));
+          crash_gcp(__LINE__);
+        }
         localGcpPointer.i = localGcpPointer.p->nextGcp;
         jam();
         if (localGcpPointer.i != RNIL) {
@@ -4734,6 +4742,33 @@ void Dbtc::linkApiToGcp(Ptr<GcpRecord> r
   regGcpPtr.p->lastApiConnect = regApiPtr.i;
 }//Dbtc::linkApiToGcp()
 
+void
+Dbtc::crash_gcp(Uint32 line)
+{
+  UintR Tfirstgcp = cfirstgcp;
+  UintR TgcpFilesize = cgcpFilesize;
+  GcpRecord *localGcpRecord = gcpRecord;
+  GcpRecordPtr localGcpPointer;
+
+  localGcpPointer.i = cfirstgcp;
+
+  while (localGcpPointer.i != RNIL)
+  {
+    ptrCheckGuard(localGcpPointer, cgcpFilesize, gcpRecord);
+    ndbout_c("%u : %u/%u nomoretrans: %u api %u %u next: %u",
+             localGcpPointer.i,
+             Uint32(localGcpPointer.p->gcpId >> 32),
+             Uint32(localGcpPointer.p->gcpId),             
+             localGcpPointer.p->gcpNomoretransRec,
+             localGcpPointer.p->firstApiConnect,
+             localGcpPointer.p->lastApiConnect,
+             localGcpPointer.p->nextGcp);
+    localGcpPointer.i = localGcpPointer.p->nextGcp;
+  }
+  progError(line, NDBD_EXIT_NDBREQUIRE);
+  ndbrequire(false);
+}
+
 void Dbtc::seizeGcp(Ptr<GcpRecord> & dst, Uint64 Tgci)
 {
   GcpRecordPtr tmpGcpPointer;
@@ -4744,6 +4779,11 @@ void Dbtc::seizeGcp(Ptr<GcpRecord> & dst
   GcpRecord *localGcpRecord = gcpRecord;
 
   localGcpPointer.i = cfirstfreeGcp;
+  if (unlikely(localGcpPointer.i > TgcpFilesize))
+  {
+    ndbout_c("%u/%u", Uint32(Tgci >> 32), Uint32(Tgci));
+    crash_gcp(__LINE__);
+  }
   ptrCheckGuard(localGcpPointer, TgcpFilesize, localGcpRecord);
   UintR TfirstfreeGcp = localGcpPointer.p->nextGcp;
   localGcpPointer.p->gcpId = Tgci;
@@ -4829,25 +4869,30 @@ void Dbtc::sendCommitLqh(Signal* signal,
   Thostptr.i = regTcPtr->lastLqhNodeId;
   ptrCheckGuard(Thostptr, ThostFilesize, hostRecord);
 
-  UintR Tdata1 = regTcPtr->lastLqhCon;
-  UintR Tdata2 = Uint32(regApiPtr->globalcheckpointid >> 32);
-  UintR Tdata3 = regApiPtr->transid[0];
-  UintR Tdata4 = regApiPtr->transid[1];
-  UintR Tdata5 = Uint32(regApiPtr->globalcheckpointid);
+  Uint32 Tdata[5];
+  Tdata[0] = regTcPtr->lastLqhCon;
+  Tdata[1] = Uint32(regApiPtr->globalcheckpointid >> 32);
+  Tdata[2] = regApiPtr->transid[0];
+  Tdata[3] = regApiPtr->transid[1];
+  Tdata[4] = Uint32(regApiPtr->globalcheckpointid);
+  Uint32 len = 5;
+
+  if (unlikely(!ndb_check_micro_gcp(getNodeInfo(Thostptr.i).m_version)))
+  {
+    jam();
+    ndbassert(Tdata[4] == 0 || getNodeInfo(Thostptr.i).m_connected == false);
+    len = 4;
+  }
 
-  // wl4391_todo testing own config is wrong for mixed versions
-  bool send_unpacked = isNdbMtLqh();
+  // currently packed signal cannot address specific instance
+  const bool send_unpacked = getNodeInfo(Thostptr.i).m_lqh_workers != 0;
   if (send_unpacked) {
     Uint32* data = signal->getDataPtrSend();
-    data[0] = Tdata1;
-    data[1] = Tdata2;
-    data[2] = Tdata3;
-    data[3] = Tdata4;
-    data[4] = Tdata5;
+    memcpy(&signal->theData[0], &Tdata[0], len << 2);
     Uint32 Tnode = Thostptr.i;
     Uint32 instanceKey = regTcPtr->lqhInstanceKey;
     BlockReference lqhRef = numberToRef(DBLQH, instanceKey, Tnode);
-    sendSignal(lqhRef, GSN_COMMIT, signal, 5, JBB);
+    sendSignal(lqhRef, GSN_COMMIT, signal, len, JBB);
     return;
   }
 
@@ -4857,23 +4902,14 @@ void Dbtc::sendCommitLqh(Signal* signal,
   } else {
     jam();
     updatePackedList(signal, Thostptr.p, Thostptr.i);
-  }//if
+  }
+
+  Tdata[0] |= (ZCOMMIT << 28);
   UintR Tindex = Thostptr.p->noOfPackedWordsLqh;
   UintR* TDataPtr = &Thostptr.p->packedWordsLqh[Tindex];
-  TDataPtr[0] = Tdata1 | (ZCOMMIT << 28);
-  TDataPtr[1] = Tdata2;
-  TDataPtr[2] = Tdata3;
-  TDataPtr[3] = Tdata4;
-  TDataPtr[4] = Tdata5;
-  Thostptr.p->noOfPackedWordsLqh = Tindex + 5;
-
-  if (unlikely(!ndb_check_micro_gcp(getNodeInfo(Thostptr.i).m_version)))
-  {
-    jam();
-    ndbassert(Tdata5 == 0 || getNodeInfo(Thostptr.i).m_connected == false);
-    Thostptr.p->noOfPackedWordsLqh = Tindex + 4; // no gci_lo
-  }
-}//Dbtc::sendCommitLqh()
+  memcpy(TDataPtr, &Tdata[0], len << 2);
+  Thostptr.p->noOfPackedWordsLqh = Tindex + len;
+}
 
 void
 Dbtc::DIVER_node_fail_handling(Signal* signal, Uint64 Tgci)
@@ -5104,7 +5140,6 @@ void Dbtc::copyApi(ApiConnectRecordPtr c
   UintR Tlqhkeyconfrec = regApiPtr.p->lqhkeyconfrec;
   UintR TgcpPointer = regApiPtr.p->gcpPointer;
   UintR TgcpFilesize = cgcpFilesize;
-  UintR TcommitAckMarker = regApiPtr.p->commitAckMarker;
   NdbNodeBitmask Tnodes = regApiPtr.p->m_transaction_nodes;
   GcpRecord *localGcpRecord = gcpRecord;
 
@@ -5115,7 +5150,7 @@ void Dbtc::copyApi(ApiConnectRecordPtr c
   copyPtr.p->transid[0] = Ttransid1;
   copyPtr.p->transid[1] = Ttransid2;
   copyPtr.p->lqhkeyconfrec = Tlqhkeyconfrec;
-  copyPtr.p->commitAckMarker = TcommitAckMarker;
+  copyPtr.p->commitAckMarker = RNIL;
   copyPtr.p->m_transaction_nodes = Tnodes;
   copyPtr.p->singleUserMode = 0;
 
@@ -5222,16 +5257,16 @@ void Dbtc::sendCompleteLqh(Signal* signa
   Thostptr.i = regTcPtr->lastLqhNodeId; //last???
   ptrCheckGuard(Thostptr, ThostFilesize, hostRecord);
 
-  UintR Tdata1 = regTcPtr->lastLqhCon;
-  UintR Tdata2 = regApiPtr->transid[0];
-  UintR Tdata3 = regApiPtr->transid[1];
+  Uint32 Tdata[3];
+  Tdata[0] = regTcPtr->lastLqhCon;
+  Tdata[1] = regApiPtr->transid[0];
+  Tdata[2] = regApiPtr->transid[1];
+  Uint32 len = 3;
 
-  bool send_unpacked = isNdbMtLqh();
+  // currently packed signal cannot address specific instance
+  const bool send_unpacked = getNodeInfo(Thostptr.i).m_lqh_workers != 0;
   if (send_unpacked) {
-    Uint32* data = signal->getDataPtrSend();
-    data[0] = Tdata1;
-    data[1] = Tdata2;
-    data[2] = Tdata3;
+    memcpy(&signal->theData[0], &Tdata[0], len << 2);
     Uint32 Tnode = Thostptr.i;
     Uint32 instanceKey = regTcPtr->lqhInstanceKey;
     BlockReference lqhRef = numberToRef(DBLQH, instanceKey, Tnode);
@@ -5245,14 +5280,14 @@ void Dbtc::sendCompleteLqh(Signal* signa
   } else {
     jam();
     updatePackedList(signal, Thostptr.p, Thostptr.i);
-  }//if
+  }
+
+  Tdata[0] |= (ZCOMPLETE << 28);
   UintR Tindex = Thostptr.p->noOfPackedWordsLqh;
   UintR* TDataPtr = &Thostptr.p->packedWordsLqh[Tindex];
-  TDataPtr[0] = Tdata1 | (ZCOMPLETE << 28);
-  TDataPtr[1] = Tdata2;
-  TDataPtr[2] = Tdata3;
-  Thostptr.p->noOfPackedWordsLqh = Tindex + 3;
-}//Dbtc::sendCompleteLqh()
+  memcpy(TDataPtr, &Tdata[0], len << 2);
+  Thostptr.p->noOfPackedWordsLqh = Tindex + len;
+}
 
 void
 Dbtc::execTC_COMMIT_ACK(Signal* signal){
@@ -5301,22 +5336,25 @@ Dbtc::sendRemoveMarker(Signal* signal, 
   hostPtr.i = nodeId;
   ptrCheckGuard(hostPtr, ThostFilesize, hostRecord);
 
-  UintR Tdata1 = 0;
-  UintR Tdata2 = transid1;
-  UintR Tdata3 = transid2;
+  Uint32 Tdata[3];
+  Tdata[0] = 0;
+  Tdata[1] = transid1;
+  Tdata[2] = transid2;
+  Uint32 len = 3;
 
-  bool send_unpacked = isNdbMtLqh();
+  // currently packed signals can not address specific instance
+  bool send_unpacked = getNodeInfo(hostPtr.i).m_lqh_workers != 0;
   if (send_unpacked) {
-    Uint32* data = signal->getDataPtrSend();
-    data[0] = Tdata2;
-    data[1] = Tdata3;
+    jam();
+    // first word omitted
+    memcpy(&signal->theData[0], &Tdata[1], (len - 1) << 2);
     Uint32 Tnode = hostPtr.i;
     Uint32 i;
     for (i = 0; i < MAX_NDBMT_LQH_WORKERS; i++) {
       // wl4391_todo skip workers not part of tx
       Uint32 instanceKey = 1 + i;
       BlockReference ref = numberToRef(DBLQH, instanceKey, Tnode);
-      sendSignal(ref, GSN_REMOVE_MARKER_ORD, signal, 2, JBB);
+      sendSignal(ref, GSN_REMOVE_MARKER_ORD, signal, len - 1, JBB);
     }
     return;
   }
@@ -5327,14 +5365,13 @@ Dbtc::sendRemoveMarker(Signal* signal, 
   } else {
     jam();
     updatePackedList(signal, hostPtr.p, hostPtr.i);
-  }//if
+  }
   
   UintR  numWord = hostPtr.p->noOfPackedWordsLqh;
   UintR* dataPtr = &hostPtr.p->packedWordsLqh[numWord];
 
-  dataPtr[0] = Tdata1 | (ZREMOVE_MARKER << 28);
-  dataPtr[1] = Tdata2;
-  dataPtr[2] = Tdata3;
+  Tdata[0] |= (ZREMOVE_MARKER << 28);
+  memcpy(dataPtr, &Tdata[0], len << 2);
   hostPtr.p->noOfPackedWordsLqh = numWord + 3;
 }
 
@@ -5476,6 +5513,7 @@ void Dbtc::releaseApiConCopy(Signal* sig
   regApiPtr->nextApiConnect = TfirstfreeApiConnectCopyOld;
   setApiConTimer(apiConnectptr.i, 0, __LINE__);
   regApiPtr->apiConnectstate = CS_RESTART;
+  ndbrequire(regApiPtr->commitAckMarker == RNIL);
 }//Dbtc::releaseApiConCopy()
 
 /* ========================================================================= */
@@ -7864,6 +7902,10 @@ void Dbtc::execTAKE_OVERTCCONF(Signal* s
   if (signal->getSendersBlockRef() != reference())
   {
     jam();
+
+    tcNodeFailptr.i = 0;
+    ptrAss(tcNodeFailptr, tcFailRecord);
+
     /**
      * Node should be in queue
      */
@@ -8303,7 +8345,7 @@ void Dbtc::completeTransAtTakeOverDoOne(
 }//Dbtc::completeTransAtTakeOverDoOne()
 
 void 
-Dbtc::sendTCKEY_FAILREF(Signal* signal, const ApiConnectRecord * regApiPtr){
+Dbtc::sendTCKEY_FAILREF(Signal* signal, ApiConnectRecord * regApiPtr){
   jam();
 
   const Uint32 ref = regApiPtr->ndbapiBlockref;
@@ -8326,6 +8368,14 @@ Dbtc::sendTCKEY_FAILREF(Signal* signal, 
       routeTCKEY_FAILREFCONF(signal, regApiPtr, GSN_TCKEY_FAILREF, 3);
     }
   }
+
+  const Uint32 marker = regApiPtr->commitAckMarker;
+  if(marker != RNIL)
+  {
+    jam();
+    m_commitAckMarkerHash.release(marker);
+    regApiPtr->commitAckMarker = RNIL;
+  }
 }
 
 void 
@@ -8554,12 +8604,6 @@ void Dbtc::toAbortHandlingLab(Signal* si
         if (apiConnectptr.p->takeOverRec != (Uint8)Z8NIL) {
           jam();
 	  sendTCKEY_FAILREF(signal, apiConnectptr.p);
-	  const Uint32 marker = apiConnectptr.p->commitAckMarker;
-          if(marker != RNIL){
-	    jam();
-            m_commitAckMarkerHash.release(marker);
-            apiConnectptr.p->commitAckMarker = RNIL;
-          }
           
 	  /*------------------------------------------------------------*/
 	  /*       WE HAVE COMPLETED THIS TRANSACTION NOW AND CAN       */
@@ -11607,6 +11651,7 @@ void Dbtc::releaseApiConnectFail(Signal*
   setApiConTimer(apiConnectptr.i, 0, __LINE__);
   apiConnectptr.p->nextApiConnect = cfirstfreeApiConnectFail;
   cfirstfreeApiConnectFail = apiConnectptr.i;
+  ndbrequire(apiConnectptr.p->commitAckMarker == RNIL);
 }//Dbtc::releaseApiConnectFail()
 
 void Dbtc::releaseKeys() 
@@ -12274,7 +12319,7 @@ Dbtc::validate_filter(Signal* signal)
     default:
       infoEvent("Invalid filter op: 0x%x pos: %ld",
 		* start,
-		start - (signal->theData + 1));
+		(long int)(start - (signal->theData + 1)));
       return false;
     }
   }

=== modified file 'storage/ndb/src/kernel/blocks/lgman.cpp'
--- a/storage/ndb/src/kernel/blocks/lgman.cpp	2008-11-20 13:32:13 +0000
+++ b/storage/ndb/src/kernel/blocks/lgman.cpp	2008-12-02 13:10:49 +0000
@@ -26,7 +26,6 @@
 #include <signaldata/SumaImpl.hpp>
 #include <signaldata/LgmanContinueB.hpp>
 #include <signaldata/GetTabInfo.hpp>
-#include "ndbfs/Ndbfs.hpp"
 #include "dbtup/Dbtup.hpp"
 
 #include <EventLogger.hpp>
@@ -56,11 +55,7 @@ Lgman::Lgman(Block_context & ctx) :
   m_tup(0),
   m_logfile_group_list(m_logfile_group_pool),
   m_logfile_group_hash(m_logfile_group_pool),
-#ifdef __sun // temp
-  m_client_mutex(1, false)
-#else
-  m_client_mutex(2, true)
-#endif
+  m_client_mutex("lgman-client", 2, true)
 {
   BLOCK_CONSTRUCTOR(Lgman);
   

=== modified file 'storage/ndb/src/kernel/blocks/ndbcntr/NdbcntrMain.cpp'
--- a/storage/ndb/src/kernel/blocks/ndbcntr/NdbcntrMain.cpp	2008-10-05 07:12:56 +0000
+++ b/storage/ndb/src/kernel/blocks/ndbcntr/NdbcntrMain.cpp	2008-12-02 13:10:49 +0000
@@ -98,8 +98,8 @@ static BlockInfo ALL_BLOCKS[] = { 
 static const Uint32 ALL_BLOCKS_SZ = sizeof(ALL_BLOCKS)/sizeof(BlockInfo);
 
 static BlockReference readConfigOrder[ALL_BLOCKS_SZ] = {
-  NDBFS_REF, // let it run first to make sure it can start the threads
   CMVMI_REF,
+  NDBFS_REF,
   DBINFO_REF,
   DBTUP_REF,
   DBACC_REF,

=== modified file 'storage/ndb/src/kernel/blocks/ndbfs/AsyncFile.cpp'
--- a/storage/ndb/src/kernel/blocks/ndbfs/AsyncFile.cpp	2008-11-06 10:17:49 +0000
+++ b/storage/ndb/src/kernel/blocks/ndbfs/AsyncFile.cpp	2008-12-02 13:10:49 +0000
@@ -27,203 +27,44 @@
 #include <signaldata/FsReadWriteReq.hpp>
 #include <Configuration.hpp>
 
-const char *actionName[] = {
-  "open",
-  "close",
-  "closeRemove",
-  "read",
-  "readv",
-  "write",
-  "writev",
-  "writeSync",
-  "writevSync",
-  "sync",
-  "end" };
-
-static int numAsyncFiles = 0;
-
-extern "C" void * runAsyncFile(void* arg)
-{
-  ((AsyncFile*)arg)->run();
-  return (NULL);
-}
-
-
 AsyncFile::AsyncFile(SimulatedBlock& fs) :
   theFileName(),
-  theReportTo(0),
-  theMemoryChannelPtr(NULL),
   m_fs(fs)
 {
-  m_page_ptr.setNull();
-  m_current_request= m_last_request= 0;
-  m_auto_sync_freq = 0;
-}
-
-struct NdbThread*
-AsyncFile::doStart()
-{
-  // Stacksize for filesystem threads
-  // An 8k stack should be enough
-  const NDB_THREAD_STACKSIZE stackSize = 8192;
-
-  char buf[16];
-  numAsyncFiles++;
-  BaseString::snprintf(buf, sizeof(buf), "AsyncFile%d", numAsyncFiles);
-
-  theStartMutexPtr = NdbMutex_Create();
-  theStartConditionPtr = NdbCondition_Create();
-  NdbMutex_Lock(theStartMutexPtr);
-  theStartFlag = false;
-
-  theThreadPtr = NdbThread_Create(runAsyncFile,
-                                  (void**)this,
-                                  stackSize,
-                                  (char*)&buf,
-                                  NDB_THREAD_PRIO_MEAN);
-
-  if (theThreadPtr == 0)
-  {
-    ERROR_SET(fatal, NDBD_EXIT_MEMALLOC,
-              "","Could not allocate file system thread");
-  }
-  NdbCondition_Wait(theStartConditionPtr,
-                    theStartMutexPtr);
-  NdbMutex_Unlock(theStartMutexPtr);
-  NdbMutex_Destroy(theStartMutexPtr);
-  NdbCondition_Destroy(theStartConditionPtr);
-
-  return theThreadPtr;
-}
+  m_thread = 0;
 
-void AsyncFile::shutdown()
-{
-  void *status;
-  Request request;
-  request.action = Request::end;
-  this->theMemoryChannelPtr->writeChannel( &request );
-  NdbThread_WaitFor(theThreadPtr, &status);
-  NdbThread_Destroy(&theThreadPtr);
-  delete theMemoryChannelPtr;
+  m_page_cnt = 0;
+  m_page_ptr.setNull();
+  theWriteBuffer = 0;
+  theWriteBufferSize = 0;
 }
 
 void
-AsyncFile::reportTo( MemoryChannel<Request> *reportTo )
-{
-  theReportTo = reportTo;
-}
-
-void AsyncFile::execute(Request* request)
-{
-  theMemoryChannelPtr->writeChannel( request );
-}
-
-int AsyncFile::init()
+AsyncFile::attach(AsyncIoThread* thr)
 {
-  // Create write buffer for bigger writes
-  theWriteBufferSize = WRITEBUFFERSIZE;
-  theWriteBuffer = (char *) ndbd_malloc(theWriteBufferSize);
-
-  return 0;
+#if 0
+  ndbout_c("%p:%s attach to %p (m_thread: %p)", this, theFileName.c_str(), thr,
+             m_thread);
+#endif
+  assert(m_thread == 0);
+  m_thread = thr;
 }
 
 void
-AsyncFile::run()
+AsyncFile::detach(AsyncIoThread* thr)
 {
-  Request *request;
-
-  // Create theMemoryChannel in the thread that will wait for it
-  NdbMutex_Lock(theStartMutexPtr);
-  theMemoryChannelPtr = new MemoryChannel<Request>();
-  theStartFlag = true;
-
-  int r= this->init();
-
-  NdbMutex_Unlock(theStartMutexPtr);
-  NdbCondition_Signal(theStartConditionPtr);
-
-  if(r!=0)
-  {
-    DEBUG(ndbout_c("AsyncFile::init() failed"));
-    return;
-  }
-
-  while (1) {
-    request = theMemoryChannelPtr->readChannel();
-    if (!request) {
-      DEBUG(ndbout_c("Nothing read from Memory Channel in AsyncFile"));
-      endReq();
-      return;
-    }//if
-    m_current_request= request;
-    switch (request->action) {
-    case Request:: open:
-      openReq(request);
-      break;
-    case Request:: close:
-      closeReq(request);
-      break;
-    case Request:: closeRemove:
-      closeReq(request);
-      removeReq(request);
-      break;
-    case Request:: readPartial:
-    case Request:: read:
-      readReq(request);
-      break;
-    case Request:: readv:
-      readvReq(request);
-      break;
-    case Request:: write:
-      writeReq(request);
-      break;
-    case Request:: writev:
-      writevReq(request);
-      break;
-    case Request:: writeSync:
-      writeReq(request);
-      syncReq(request);
-      break;
-    case Request:: writevSync:
-      writevReq(request);
-      syncReq(request);
-      break;
-    case Request:: sync:
-      syncReq(request);
-      break;
-    case Request:: append:
-      appendReq(request);
-      break;
-    case Request:: append_synch:
-      appendReq(request);
-      syncReq(request);
-      break;
-    case Request::rmrf:
-      rmrfReq(request, (char*)theFileName.c_str(), request->par.rmrf.own_directory);
-      break;
-    case Request:: end:
-      if (isOpen())
-        closeReq(request);
-      endReq();
-      return;
-    default:
-      DEBUG(ndbout_c("Invalid Request"));
-      abort();
-      break;
-    }//switch
-    m_last_request= request;
-    m_current_request= 0;
-
-    // No need to signal as ndbfs only uses tryRead
-    theReportTo->writeChannelNoSignal(request);
-  }//while
-}//AsyncFile::run()
-
+#if 0
+  ndbout_c("%p:%s detach from %p", this, theFileName.c_str(), thr);
+#endif
+  assert(m_thread == thr);
+  m_thread = 0;
+}
 
 void
 AsyncFile::readReq( Request * request)
 {
-  for(int i = 0; i < request->par.readWrite.numberOfPages ; i++) {
+  for(int i = 0; i < request->par.readWrite.numberOfPages ; i++)
+  {
     off_t offset = request->par.readWrite.pages[i].offset;
     size_t size  = request->par.readWrite.pages[i].size;
     char * buf   = request->par.readWrite.pages[i].buf;
@@ -244,76 +85,90 @@ AsyncFile::readvReq( Request * request)
 }
 
 void
-AsyncFile::writeReq( Request * request)
+AsyncFile::writeReq(Request * request)
 {
-  int page_num = 0;
-  bool write_not_complete = true;
+  Uint32 cnt = request->par.readWrite.numberOfPages;
+  if (theWriteBuffer == 0 || cnt == 1)
+  {
+    for (Uint32 i = 0; i<cnt; i++)
+    {
+      int err = writeBuffer(request->par.readWrite.pages[i].buf,
+                            request->par.readWrite.pages[i].size,
+                            request->par.readWrite.pages[i].offset);
+      if (err)
+      {
+        request->error = err;
+        return;
+      }
+      goto done;
+    }
+  }
 
-  while(write_not_complete) {
-    int totsize = 0;
-    off_t offset = request->par.readWrite.pages[page_num].offset;
-    char* bufptr = theWriteBuffer;
-
-    write_not_complete = false;
-    if (request->par.readWrite.numberOfPages > 1) {
-      off_t page_offset = offset;
-
-      // Multiple page write, copy to buffer for one write
-      for(int i=page_num; i < request->par.readWrite.numberOfPages; i++) {
-        memcpy(bufptr,
-               request->par.readWrite.pages[i].buf,
-               request->par.readWrite.pages[i].size);
-        bufptr += request->par.readWrite.pages[i].size;
-        totsize += request->par.readWrite.pages[i].size;
-        if (((i + 1) < request->par.readWrite.numberOfPages)) {
-          // There are more pages to write
-          // Check that offsets are consequtive
-	  off_t tmp = page_offset + request->par.readWrite.pages[i].size;
-          if (tmp != request->par.readWrite.pages[i+1].offset) {
-            // Next page is not aligned with previous, not allowed
-            DEBUG(ndbout_c("Page offsets are not aligned"));
-            request->error = EINVAL;
-            return;
-          }
-          if ((unsigned)(totsize + request->par.readWrite.pages[i+1].size) > (unsigned)theWriteBufferSize) {
-            // We are not finished and the buffer is full
-            write_not_complete = true;
-            // Start again with next page
-            page_num = i + 1;
-            break;
+  {
+    int page_num = 0;
+    bool write_not_complete = true;
+
+    while(write_not_complete) {
+      int totsize = 0;
+      off_t offset = request->par.readWrite.pages[page_num].offset;
+      char* bufptr = theWriteBuffer;
+
+      write_not_complete = false;
+      if (request->par.readWrite.numberOfPages > 1) {
+        off_t page_offset = offset;
+
+        // Multiple page write, copy to buffer for one write
+        for(int i=page_num; i < request->par.readWrite.numberOfPages; i++) {
+          memcpy(bufptr,
+                 request->par.readWrite.pages[i].buf,
+                 request->par.readWrite.pages[i].size);
+          bufptr += request->par.readWrite.pages[i].size;
+          totsize += request->par.readWrite.pages[i].size;
+          if (((i + 1) < request->par.readWrite.numberOfPages)) {
+            // There are more pages to write
+            // Check that offsets are consequtive
+            off_t tmp = page_offset + request->par.readWrite.pages[i].size;
+            if (tmp != request->par.readWrite.pages[i+1].offset) {
+              // Next page is not aligned with previous, not allowed
+              DEBUG(ndbout_c("Page offsets are not aligned"));
+              request->error = EINVAL;
+              return;
+            }
+            if ((unsigned)(totsize + request->par.readWrite.pages[i+1].size) > (unsigned)theWriteBufferSize) {
+              // We are not finished and the buffer is full
+              write_not_complete = true;
+              // Start again with next page
+              page_num = i + 1;
+              break;
+            }
           }
+          page_offset += request->par.readWrite.pages[i].size;
         }
-        page_offset += request->par.readWrite.pages[i].size;
+        bufptr = theWriteBuffer;
+      } else {
+        // One page write, write page directly
+        bufptr = request->par.readWrite.pages[0].buf;
+        totsize = request->par.readWrite.pages[0].size;
       }
-      bufptr = theWriteBuffer;
-    } else {
-      // One page write, write page directly
-      bufptr = request->par.readWrite.pages[0].buf;
-      totsize = request->par.readWrite.pages[0].size;
-    }
-    int err = writeBuffer(bufptr, totsize, offset);
-    if(err != 0){
-      request->error = err;
-      return;
-    }
-  } // while(write_not_complete)
-
-  if(m_auto_sync_freq && m_write_wo_sync > m_auto_sync_freq){
+      int err = writeBuffer(bufptr, totsize, offset);
+      if(err != 0){
+        request->error = err;
+        return;
+      }
+    } // while(write_not_complete)
+  }
+done:
+  if(m_auto_sync_freq && m_write_wo_sync > m_auto_sync_freq)
+  {
     syncReq(request);
   }
 }
 
 void
-AsyncFile::writevReq( Request * request)
+AsyncFile::writevReq(Request * request)
 {
   // WriteFileGather on WIN32?
   writeReq(request);
-}
-
-void AsyncFile::endReq()
-{
-  if (theWriteBuffer)
-    ndbd_free(theWriteBuffer, theWriteBufferSize);
 }
 
 #ifdef DEBUG_ASYNCFILE

=== modified file 'storage/ndb/src/kernel/blocks/ndbfs/AsyncFile.hpp'
--- a/storage/ndb/src/kernel/blocks/ndbfs/AsyncFile.hpp	2008-11-18 10:28:03 +0000
+++ b/storage/ndb/src/kernel/blocks/ndbfs/AsyncFile.hpp	2008-12-02 13:10:49 +0000
@@ -16,166 +16,33 @@
 #ifndef AsyncFile_H
 #define AsyncFile_H
 
-/**
-   AsyncFile
-
-   All file operations executed in thread-per-file, away from the DB threads.
- */
-
-
-// void execute(Request *request);
-// Description:
-//   performens the requered action.
-// Parameters:
-//    request: request to be called when open is finished.
-//       action= open|close|read|write|sync
-//          if action is open then:
-//             par.open.flags= UNIX open flags, see man open
-//             par.open.name= name of the file to open
-//          if action is read or write then:
-//             par.readWrite.buf= user provided buffer to read/write 
-//             the data from/to
-//             par.readWrite.size= how many bytes must be read/written
-//             par.readWrite.offset= absolute offset in file in bytes
-// return:
-//    return values are stored in the request error field:
-//       error= return state of the action, UNIX error see man open/errno
-//       userData= is untouched can be used be user.
-
-
-
-// void reportTo( MemoryChannel<Request> *reportTo );
-// Description:
-//   set the channel where the file must report the result of the 
-//    actions back to.
-// Parameters:
-//    reportTo: the memory channel to use use MemoryChannelMultipleWriter 
-//              if more 
-//              than one file uses this channel to report back.
-
 #include <kernel_types.h>
-#include "MemoryChannel.hpp"
+#include "AsyncIoThread.hpp"
 #include "Filename.hpp"
 
-// Use this define if you want printouts from AsyncFile class
-//#define DEBUG_ASYNCFILE
-
-#ifdef DEBUG_ASYNCFILE
-#include <NdbOut.hpp>
-#define DEBUG(x) x
-#define PRINT_ERRORANDFLAGS(f) printErrorAndFlags(f)
-void printErrorAndFlags(Uint32 used_flags);
-#else
-#define DEBUG(x)
-#define PRINT_ERRORANDFLAGS(f)
-#endif
-
-// Define the size of the write buffer (for each thread)
-#define WRITEBUFFERSIZE 262144
-
-const int ERR_ReadUnderflow = 1000;
-
-const int WRITECHUNK = 262144;
-
-class AsyncFile;
-
-class Request
-{
-public:
-  Request() {}
-
-  enum Action {
-    open,
-    close,
-    closeRemove,
-    read,   // Allways leave readv directly after 
-            // read because SimblockAsyncFileSystem depends on it
-    readv,
-    write,// Allways leave writev directly after 
-	        // write because SimblockAsyncFileSystem depends on it
-    writev,
-    writeSync,// Allways leave writevSync directly after 
-    // writeSync because SimblockAsyncFileSystem depends on it
-    writevSync,
-    sync,
-    end,
-    append,
-    append_synch,
-    rmrf,
-    readPartial
-  };
-  Action action;
-  union {
-    struct {
-      Uint32 flags;
-      Uint32 page_size;
-      Uint64 file_size;
-      Uint32 auto_sync_size;
-    } open;
-    struct {
-      int numberOfPages;
-      struct{
-	char *buf;
-	size_t size;
-	off_t offset;
-      } pages[16];
-    } readWrite;
-    struct {
-      const char * buf;
-      size_t size;
-    } append;
-    struct {
-      bool directory;
-      bool own_directory;
-    } rmrf;
-  } par;
-  int error;
-  
-  void set(BlockReference userReference, 
-	   Uint32 userPointer,
-	   Uint16 filePointer);
-  BlockReference theUserReference;
-  Uint32 theUserPointer;
-  Uint16 theFilePointer;
-   // Information for open, needed if the first open action fails.
-  AsyncFile* file;
-  Uint32 theTrace;
-};
-
-NdbOut& operator <<(NdbOut&, const Request&);
-
-inline
-void 
-Request::set(BlockReference userReference, 
-	     Uint32 userPointer, Uint16 filePointer) 
-{
-  theUserReference= userReference;
-  theUserPointer= userPointer;
-  theFilePointer= filePointer;
-}
-
 class AsyncFile
 {
   friend class Ndbfs;
+  friend class AsyncIoThread;
+
 public:
   AsyncFile(SimulatedBlock& fs);
   virtual ~AsyncFile() {};
 
-  void reportTo( MemoryChannel<Request> *reportTo );
-
-  void execute( Request* request );
-
-  virtual struct NdbThread* doStart();
+  virtual int init() = 0;
 
-  virtual void shutdown();
-
-  // its a thread so its always running
-  virtual void run();
+  void reportTo( MemoryChannel<Request> *reportTo );
 
   virtual bool isOpen() = 0;
 
   Filename theFileName;
   Request *m_current_request, *m_last_request;
+
+  void set_buffer(Ptr<GlobalPage> ptr, Uint32 cnt);
+  bool has_buffer() const;
+  void clear_buffer(Ptr<GlobalPage> & ptr, Uint32 & cnt);
+
+  AsyncIoThread* getThread() const { return m_thread;}
 private:
 
   /**
@@ -184,14 +51,6 @@ private:
    */
 
   /**
-   * init()
-   *
-   * Initialise buffers etc. After init(), ready to execute()
-   * Called with theStartMutexPtr held.
-   */
-  virtual int init();
-
-  /**
    * openReq() - open a file.
    */
   virtual void openReq(Request *request) = 0;
@@ -204,15 +63,13 @@ private:
   /**
    * writeBuffer() - write into file
    */
-  virtual int writeBuffer(const char * buf, size_t size, off_t offset,
-		  size_t chunk_size = WRITECHUNK)=0;
-
+  virtual int writeBuffer(const char * buf, size_t size, off_t offset)=0;
 
   virtual void closeReq(Request *request)=0;
   virtual void syncReq(Request *request)=0;
   virtual void removeReq(Request *request)=0;
   virtual void appendReq(Request *request)=0;
-  virtual void rmrfReq(Request *request, char * path, bool removePath)=0;
+  virtual void rmrfReq(Request *request, const char * path, bool removePath)=0;
   virtual void createDirectories()=0;
 
   /**
@@ -229,38 +86,60 @@ protected:
   virtual void writeReq(Request *request);
   virtual void writevReq(Request *request);
 
-  /**
-   * endReq()
-   *
-   * Inverse to ::init(). Cleans up thread before it exits.
-   */
-  virtual void endReq();
-
 private:
-  /**
-   * (end of what implementors need)
-   */
+  void attach(AsyncIoThread* thr);
+  void detach(AsyncIoThread* thr);
 
-  MemoryChannel<Request> *theReportTo;
-  MemoryChannel<Request>* theMemoryChannelPtr;
-
-  struct NdbThread* theThreadPtr;
-  NdbMutex* theStartMutexPtr;
-  NdbCondition* theStartConditionPtr;
-  bool   theStartFlag;
+  AsyncIoThread* m_thread; // For bound files
 
 protected:
-  int theWriteBufferSize;
-  char* theWriteBuffer;
-
   size_t m_write_wo_sync;  // Writes wo/ sync
   size_t m_auto_sync_freq; // Auto sync freq in bytes
+  Uint32 m_open_flags;
 
-public:
-  SimulatedBlock& m_fs;
-
+  /**
+   * file buffers
+   */
   Uint32 m_page_cnt;
   Ptr<GlobalPage> m_page_ptr;
+
+  char* theWriteBuffer;
+  Uint32 theWriteBufferSize;
+
+public:
+  SimulatedBlock& m_fs;
 };
+
+inline
+void
+AsyncFile::set_buffer(Ptr<GlobalPage> ptr, Uint32 cnt)
+{
+  assert(!has_buffer());
+  m_page_ptr = ptr;
+  m_page_cnt = cnt;
+  theWriteBuffer = (char*)ptr.p;
+  theWriteBufferSize = cnt * sizeof(GlobalPage);
+}
+
+inline
+bool
+AsyncFile::has_buffer() const
+{
+  return m_page_cnt > 0;
+}
+
+inline
+void
+AsyncFile::clear_buffer(Ptr<GlobalPage> & ptr, Uint32 & cnt)
+{
+  assert(has_buffer());
+  ptr = m_page_ptr;
+  cnt = m_page_cnt;
+  m_page_cnt = 0;
+  m_page_ptr.setNull();
+  theWriteBuffer = 0;
+  theWriteBufferSize = 0;
+}
+
 
 #endif

=== added file 'storage/ndb/src/kernel/blocks/ndbfs/AsyncIoThread.cpp'
--- a/storage/ndb/src/kernel/blocks/ndbfs/AsyncIoThread.cpp	1970-01-01 00:00:00 +0000
+++ b/storage/ndb/src/kernel/blocks/ndbfs/AsyncIoThread.cpp	2008-12-02 13:10:49 +0000
@@ -0,0 +1,203 @@
+/* Copyright (C) 2003 MySQL AB
+
+   This program is free software; you can redistribute it and/or modify
+   it under the terms of the GNU General Public License as published by
+   the Free Software Foundation; version 2 of the License.
+
+   This program is distributed in the hope that it will be useful,
+   but WITHOUT ANY WARRANTY; without even the implied warranty of
+   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+   GNU General Public License for more details.
+
+   You should have received a copy of the GNU General Public License
+   along with this program; if not, write to the Free Software
+   Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA */
+
+#include <ndb_global.h>
+#include <my_sys.h>
+#include <my_pthread.h>
+
+#include "AsyncIoThread.hpp"
+#include "AsyncFile.hpp"
+#include <ErrorHandlingMacros.hpp>
+#include <kernel_types.h>
+#include <ndbd_malloc.hpp>
+#include <NdbThread.h>
+#include <signaldata/FsRef.hpp>
+#include <signaldata/FsOpenReq.hpp>
+#include <signaldata/FsReadWriteReq.hpp>
+#include <Configuration.hpp>
+#include "Ndbfs.hpp"
+
+AsyncIoThread::AsyncIoThread(class Ndbfs& fs, AsyncFile* file)
+  : m_fs(fs)
+{
+  m_current_file = file;
+  if (file)
+  {
+    theMemoryChannelPtr = &theMemoryChannel;
+  }
+  else
+  {
+    theMemoryChannelPtr = &m_fs.theToThreads;
+  }
+  theReportTo = &m_fs.theFromThreads;
+}
+
+static int numAsyncFiles = 0;
+
+extern "C"
+void *
+runAsyncIoThread(void* arg)
+{
+  ((AsyncIoThread*)arg)->run();
+  return (NULL);
+}
+
+
+struct NdbThread*
+AsyncIoThread::doStart()
+{
+  // Stacksize for filesystem threads
+  // An 8k stack should be enough
+  const NDB_THREAD_STACKSIZE stackSize = 8192;
+
+  char buf[16];
+  numAsyncFiles++;
+  BaseString::snprintf(buf, sizeof(buf), "AsyncIoThread%d", numAsyncFiles);
+
+  theStartMutexPtr = NdbMutex_Create();
+  theStartConditionPtr = NdbCondition_Create();
+  NdbMutex_Lock(theStartMutexPtr);
+  theStartFlag = false;
+
+  theThreadPtr = NdbThread_Create(runAsyncIoThread,
+                                  (void**)this,
+                                  stackSize,
+                                  buf,
+                                  NDB_THREAD_PRIO_MEAN);
+
+  if (theThreadPtr == 0)
+  {
+    ERROR_SET(fatal, NDBD_EXIT_MEMALLOC,
+              "","Could not allocate file system thread");
+  }
+
+  do
+  {
+    NdbCondition_Wait(theStartConditionPtr,
+                      theStartMutexPtr);
+  }
+  while (theStartFlag == false);
+
+  NdbMutex_Unlock(theStartMutexPtr);
+  NdbMutex_Destroy(theStartMutexPtr);
+  NdbCondition_Destroy(theStartConditionPtr);
+
+  return theThreadPtr;
+}
+
+void
+AsyncIoThread::shutdown()
+{
+  void *status;
+  Request request;
+  request.action = Request::end;
+  this->theMemoryChannelPtr->writeChannel( &request );
+  NdbThread_WaitFor(theThreadPtr, &status);
+  NdbThread_Destroy(&theThreadPtr);
+}
+
+void
+AsyncIoThread::dispatch(Request *request)
+{
+  assert(m_current_file);
+  assert(m_current_file->getThread() == this);
+  assert(theMemoryChannelPtr == &theMemoryChannel);
+  theMemoryChannelPtr->writeChannel(request);
+}
+
+void
+AsyncIoThread::run()
+{
+  Request *request;
+
+  // Create theMemoryChannel in the thread that will wait for it
+  NdbMutex_Lock(theStartMutexPtr);
+  theStartFlag = true;
+  NdbMutex_Unlock(theStartMutexPtr);
+  NdbCondition_Signal(theStartConditionPtr);
+
+  while (1)
+  {
+    request = theMemoryChannelPtr->readChannel();
+    if (!request || request->action == Request::end)
+    {
+      DEBUG(ndbout_c("Nothing read from Memory Channel in AsyncFile"));
+      theStartFlag = false;
+      return;
+    }//if
+
+    AsyncFile * file = request->file;
+    m_current_request= request;
+    switch (request->action) {
+    case Request::open:
+      file->openReq(request);
+      break;
+    case Request::close:
+      file->closeReq(request);
+      break;
+    case Request::closeRemove:
+      file->closeReq(request);
+      file->removeReq(request);
+      break;
+    case Request::readPartial:
+    case Request::read:
+      file->readReq(request);
+      break;
+    case Request::readv:
+      file->readvReq(request);
+      break;
+    case Request::write:
+      file->writeReq(request);
+      break;
+    case Request::writev:
+      file->writevReq(request);
+      break;
+    case Request::writeSync:
+      file->writeReq(request);
+      file->syncReq(request);
+      break;
+    case Request::writevSync:
+      file->writevReq(request);
+      file->syncReq(request);
+      break;
+    case Request::sync:
+      file->syncReq(request);
+      break;
+    case Request::append:
+      file->appendReq(request);
+      break;
+    case Request::append_synch:
+      file->appendReq(request);
+      file->syncReq(request);
+      break;
+    case Request::rmrf:
+      file->rmrfReq(request, file->theFileName.c_str(),
+                    request->par.rmrf.own_directory);
+      break;
+    case Request::end:
+      theStartFlag = false;
+      return;
+    default:
+      DEBUG(ndbout_c("Invalid Request"));
+      abort();
+      break;
+    }//switch
+    m_last_request = request;
+    m_current_request = 0;
+
+    // No need to signal as ndbfs only uses tryRead
+    theReportTo->writeChannelNoSignal(request);
+  }
+}

=== added file 'storage/ndb/src/kernel/blocks/ndbfs/AsyncIoThread.hpp'
--- a/storage/ndb/src/kernel/blocks/ndbfs/AsyncIoThread.hpp	1970-01-01 00:00:00 +0000
+++ b/storage/ndb/src/kernel/blocks/ndbfs/AsyncIoThread.hpp	2008-12-02 13:10:49 +0000
@@ -0,0 +1,151 @@
+/* Copyright (C) 2003 MySQL AB
+
+   This program is free software; you can redistribute it and/or modify
+   it under the terms of the GNU General Public License as published by
+   the Free Software Foundation; version 2 of the License.
+
+   This program is distributed in the hope that it will be useful,
+   but WITHOUT ANY WARRANTY; without even the implied warranty of
+   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+   GNU General Public License for more details.
+
+   You should have received a copy of the GNU General Public License
+   along with this program; if not, write to the Free Software
+   Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA */
+
+#ifndef AsyncIoThread_H
+#define AsyncIoThread_H
+
+#include <kernel_types.h>
+#include <Pool.hpp>
+#include "MemoryChannel.hpp"
+
+// Use this define if you want printouts from AsyncFile class
+//#define DEBUG_ASYNCFILE
+
+#ifdef DEBUG_ASYNCFILE
+#include <NdbOut.hpp>
+#define DEBUG(x) x
+#define PRINT_ERRORANDFLAGS(f) printErrorAndFlags(f)
+void printErrorAndFlags(Uint32 used_flags);
+#else
+#define DEBUG(x)
+#define PRINT_ERRORANDFLAGS(f)
+#endif
+
+const int ERR_ReadUnderflow = 1000;
+
+class AsyncFile;
+
+class Request
+{
+public:
+  Request() {}
+
+  enum Action {
+    open,
+    close,
+    closeRemove,
+    read,   // Allways leave readv directly after
+            // read because SimblockAsyncFileSystem depends on it
+    readv,
+    write,// Allways leave writev directly after
+	        // write because SimblockAsyncFileSystem depends on it
+    writev,
+    writeSync,// Allways leave writevSync directly after
+    // writeSync because SimblockAsyncFileSystem depends on it
+    writevSync,
+    sync,
+    end,
+    append,
+    append_synch,
+    rmrf,
+    readPartial
+  };
+  Action action;
+  union {
+    struct {
+      Uint32 flags;
+      Uint32 page_size;
+      Uint64 file_size;
+      Uint32 auto_sync_size;
+    } open;
+    struct {
+      int numberOfPages;
+      struct{
+	char *buf;
+	size_t size;
+	off_t offset;
+      } pages[16];
+    } readWrite;
+    struct {
+      const char * buf;
+      size_t size;
+    } append;
+    struct {
+      bool directory;
+      bool own_directory;
+    } rmrf;
+  } par;
+  int error;
+
+  void set(BlockReference userReference,
+	   Uint32 userPointer,
+	   Uint16 filePointer);
+  BlockReference theUserReference;
+  Uint32 theUserPointer;
+  Uint16 theFilePointer;
+   // Information for open, needed if the first open action fails.
+  AsyncFile* file;
+  Uint32 theTrace;
+};
+
+NdbOut& operator <<(NdbOut&, const Request&);
+
+inline
+void
+Request::set(BlockReference userReference,
+	     Uint32 userPointer, Uint16 filePointer)
+{
+  theUserReference= userReference;
+  theUserPointer= userPointer;
+  theFilePointer= filePointer;
+}
+
+class AsyncIoThread
+{
+  friend class Ndbfs;
+  friend class AsyncFile;
+public:
+  AsyncIoThread(class Ndbfs&, AsyncFile* file);
+  virtual ~AsyncIoThread() {};
+
+  struct NdbThread* doStart();
+  void shutdown();
+
+  // its a thread so its always running
+  void run();
+
+  /**
+   * Add a request to a thread,
+   *   should only be used with bound threads
+   */
+  void dispatch(Request*);
+
+  AsyncFile * m_current_file;
+  Request *m_current_request, *m_last_request;
+
+private:
+  Ndbfs & m_fs;
+
+  MemoryChannel<Request> *theReportTo;
+  MemoryChannel<Request> *theMemoryChannelPtr;
+  MemoryChannel<Request> theMemoryChannel; // If file-bound
+
+  bool   theStartFlag;
+  struct NdbThread* theThreadPtr;
+  NdbMutex* theStartMutexPtr;
+  NdbCondition* theStartConditionPtr;
+};
+
+#endif

=== modified file 'storage/ndb/src/kernel/blocks/ndbfs/CMakeLists.txt'
--- a/storage/ndb/src/kernel/blocks/ndbfs/CMakeLists.txt	2008-08-20 13:22:09 +0000
+++ b/storage/ndb/src/kernel/blocks/ndbfs/CMakeLists.txt	2008-12-02 13:19:42 +0000
@@ -18,5 +18,5 @@ INCLUDE(${CMAKE_SOURCE_DIR}/storage/ndb/
 
 ADD_LIBRARY(ndbndbfs STATIC
              AsyncFile.cpp Ndbfs.cpp VoidFs.cpp Filename.cpp CircularIndex.cpp
-             Win32AsyncFile.cpp
+             Win32AsyncFile.cpp AsyncIoThread.cpp
 )

=== modified file 'storage/ndb/src/kernel/blocks/ndbfs/MemoryChannel.hpp'
--- a/storage/ndb/src/kernel/blocks/ndbfs/MemoryChannel.hpp	2008-08-22 11:02:38 +0000
+++ b/storage/ndb/src/kernel/blocks/ndbfs/MemoryChannel.hpp	2008-12-02 13:10:49 +0000
@@ -131,8 +131,8 @@ template <class T> void MemoryChannel<T>
   if(full(theWriteIndex, theReadIndex) || theChannel == NULL) abort();
   theChannel[theWriteIndex]= t;
   ++theWriteIndex;
-  NdbMutex_Unlock(theMutexPtr);
   NdbCondition_Signal(theConditionPtr);
+  NdbMutex_Unlock(theMutexPtr);
 }
 
 template <class T> void MemoryChannel<T>::writeChannelNoSignal( T *t)

=== modified file 'storage/ndb/src/kernel/blocks/ndbfs/Ndbfs.cpp'
--- a/storage/ndb/src/kernel/blocks/ndbfs/Ndbfs.cpp	2008-11-18 10:28:03 +0000
+++ b/storage/ndb/src/kernel/blocks/ndbfs/Ndbfs.cpp	2008-12-02 13:10:49 +0000
@@ -53,7 +53,6 @@ int pageSize( const NewVARIABLE* baseAdd
    return (1 << log_psize);
 }
 
-
 Ndbfs::Ndbfs(Block_context& ctx) :
   SimulatedBlock(NDBFS, ctx),
   scanningInProgress(false),
@@ -80,16 +79,47 @@ Ndbfs::Ndbfs(Block_context& ctx) :
 
 Ndbfs::~Ndbfs()
 {
-  // Delete all files
-  // AsyncFile destuctor will take care of deleting
-  // the thread it has created
+  /**
+   * Stop all unbound threads
+   */
+
+  /**
+   * Post enought Request::end to saturate all unbound threads
+   */
+  Request request;
+  request.action = Request::end;
+  for (unsigned i = 0; i < theThreads.size(); i++)
+  {
+    theToThreads.writeChannel(&request);
+  }
+
+  for (unsigned i = 0; i < theThreads.size(); i++)
+  {
+    AsyncIoThread * thr = theThreads[i];
+    thr->shutdown();
+  }
+
+  /**
+   * delete all threads
+   */
+  for (unsigned i = 0; i < theThreads.size(); i++)
+  {
+    AsyncIoThread * thr = theThreads[i];
+    delete thr;
+    theThreads[i] = 0;
+  }
+  theThreads.clear();
+
+  /**
+   * Delete all files
+   */
   for (unsigned i = 0; i < theFiles.size(); i++){
     AsyncFile* file = theFiles[i];
-    file->shutdown();
     delete file;
     theFiles[i] = NULL;
   }//for
   theFiles.clear();
+
   if (theRequestPool)
     delete theRequestPool;
 }
@@ -114,12 +144,34 @@ Ndbfs::execREAD_CONFIG_REQ(Signal* signa
   m_maxFiles = 0;
   ndb_mgm_get_int_parameter(p, CFG_DB_MAX_OPEN_FILES, &m_maxFiles);
   Uint32 noIdleFiles = 27;
+
   ndb_mgm_get_int_parameter(p, CFG_DB_INITIAL_OPEN_FILES, &noIdleFiles);
   if (noIdleFiles > m_maxFiles && m_maxFiles != 0)
     m_maxFiles = noIdleFiles;
+
   // Create idle AsyncFiles
-  for (Uint32 i = 0; i < noIdleFiles; i++){
-    theIdleFiles.push_back(createAsyncFile());
+  for (Uint32 i = 0; i < noIdleFiles; i++)
+  {
+    theIdleBoundFiles.push_back(createAsyncFile(true /* bound */));
+  }
+
+  Uint32 threadpool = 8;
+  ndb_mgm_get_int_parameter(p, CFG_DB_THREAD_POOL, &threadpool);
+
+  // Create IoThreads
+  for (Uint32 i = 0; i < threadpool; i++)
+  {
+    AsyncIoThread * thr = createIoThread(0);
+    if (thr)
+    {
+      jam();
+      theThreads.push_back(thr);
+    }
+    else
+    {
+      jam();
+      break;
+    }
   }
 
   ReadConfigConf * conf = (ReadConfigConf*)signal->getDataPtrSend();
@@ -176,7 +228,15 @@ int 
 Ndbfs::forward( AsyncFile * file, Request* request)
 {
   jam();
-  file->execute(request);
+  AsyncIoThread* thr = file->getThread();
+  if (thr) // bound
+  {
+    thr->dispatch(request);
+  }
+  else
+  {
+    theToThreads.writeChannel(request);
+  }
   return 1;
 }
 
@@ -186,13 +246,27 @@ Ndbfs::execFSOPENREQ(Signal* signal)
   jamEntry();
   const FsOpenReq * const fsOpenReq = (FsOpenReq *)&signal->theData[0];
   const BlockReference userRef = fsOpenReq->userReference;
-  AsyncFile* file = getIdleFile();
+
+  bool bound = (fsOpenReq->fileFlags & FsOpenReq::OM_THREAD_POOL) == 0;
+  AsyncFile* file = getIdleFile(bound);
   ndbrequire(file != NULL);
   Filename::NameSpec spec(theFileSystemPath, theBackupFilePath);
 
   Uint32 userPointer = fsOpenReq->userPointer;
+
+  if(signal->getNoOfSections() == 0){
+    jam();
+    file->theFileName.set(spec, userRef, fsOpenReq->fileNumber);
+  } else {
+    jam();
+    SectionHandle handle(this, signal);
+    SegmentedSectionPtr ptr;
+    handle.getSection(ptr, FsOpenReq::FILENAME);
+    file->theFileName.set(spec, ptr, g_sectionSegmentPool);
+    releaseSections(handle);
+  }
   
-  if(fsOpenReq->fileFlags & FsOpenReq::OM_INIT)
+  if (fsOpenReq->fileFlags & FsOpenReq::OM_INIT)
   {
     jam();
     Uint32 cnt = 16; // 512k
@@ -211,9 +285,29 @@ Ndbfs::execFSOPENREQ(Signal* signal)
       return;
     }
     m_shared_page_pool.getPtr(page_ptr);
-    file->m_page_ptr = page_ptr;
-    file->m_page_cnt = cnt;
+    file->set_buffer(page_ptr, cnt);
   } 
+  else if (fsOpenReq->fileFlags & FsOpenReq::OM_WRITE_BUFFER)
+  {
+    jam();
+    Uint32 cnt = NDB_FILE_BUFFER_SIZE / GLOBAL_PAGE_SIZE; // 256k
+    Ptr<GlobalPage> page_ptr;
+    m_ctx.m_mm.alloc_pages(RT_FILE_BUFFER, &page_ptr.i, &cnt, 1);
+    if(cnt == 0)
+    {
+      file->m_page_ptr.setNull();
+      file->m_page_cnt = 0;
+
+      FsRef * const fsRef = (FsRef *)&signal->theData[0];
+      fsRef->userPointer  = userPointer;
+      fsRef->setErrorCode(fsRef->errorCode, FsRef::fsErrOutOfMemory);
+      fsRef->osErrorCode  = ~0; // Indicate local error
+      sendSignal(userRef, GSN_FSOPENREF, signal, 3, JBB);
+      return;
+    }
+    m_shared_page_pool.getPtr(page_ptr);
+    file->set_buffer(page_ptr, cnt);
+  }
   else
   {
     ndbassert(file->m_page_ptr.isNull());
@@ -221,20 +315,8 @@ Ndbfs::execFSOPENREQ(Signal* signal)
     file->m_page_cnt = 0;
   }
   
-  if(signal->getNoOfSections() == 0){
-    jam();
-    file->theFileName.set(spec, userRef, fsOpenReq->fileNumber);
-  } else {
-    jam();
-    SectionHandle handle(this, signal);
-    SegmentedSectionPtr ptr;
-    handle.getSection(ptr, FsOpenReq::FILENAME);
-    file->theFileName.set(spec, ptr, g_sectionSegmentPool);
-    releaseSections(handle);
-  }
-  file->reportTo(&theFromThreads);
   if (getenv("NDB_TRACE_OPEN"))
-    ndbout_c("open(%s)", file->theFileName.c_str());
+    ndbout_c("open(%s) bound: %u", file->theFileName.c_str(), bound);
   
   Request* request = theRequestPool->get();
   request->action = Request::open;
@@ -258,12 +340,11 @@ Ndbfs::execFSREMOVEREQ(Signal* signal)
   jamEntry();
   const FsRemoveReq * const req = (FsRemoveReq *)signal->getDataPtr();
   const BlockReference userRef = req->userReference;
-  AsyncFile* file = getIdleFile();
+  AsyncFile* file = getIdleFile(true);
   ndbrequire(file != NULL);
 
   Filename::NameSpec spec(theFileSystemPath, theBackupFilePath);
   file->theFileName.set(spec, userRef, req->fileNumber, req->directory);
-  file->reportTo(&theFromThreads);
   
   Request* request = theRequestPool->get();
   request->action = Request::rmrf;
@@ -303,6 +384,9 @@ Ndbfs::execFSCLOSEREQ(Signal * signal)
     return;
   }
 
+  if (getenv("NDB_TRACE_OPEN"))
+    ndbout_c("close(%s)", openFile->theFileName.c_str());
+
   Request *request = theRequestPool->get();
   if( fsCloseReq->getRemoveFileFlag(fsCloseReq->fileFlag) == true ) {
      jam();
@@ -669,10 +753,11 @@ Ndbfs::newId()
 }
 
 AsyncFile*
-Ndbfs::createAsyncFile(){
+Ndbfs::createAsyncFile(bool bound){
 
   // Check limit of open files
-  if (m_maxFiles !=0 && theFiles.size() ==  m_maxFiles) {
+  if (m_maxFiles !=0 && theFiles.size() ==  m_maxFiles)
+  {
     // Print info about all open files
     for (unsigned i = 0; i < theFiles.size(); i++){
       AsyncFile* file = theFiles[i];
@@ -687,29 +772,76 @@ Ndbfs::createAsyncFile(){
   AsyncFile* file = new PosixAsyncFile(* this);
 #endif
 
-  struct NdbThread* thr = file->doStart();
-  globalEmulatorData.theConfiguration->addThread(thr, NdbfsThread);
+  if (file->init())
+  {
+    ERROR_SET(fatal, NDBD_EXIT_AFS_MAXOPEN,""," Ndbfs::createAsyncFile");
+  }
 
-  // Put the file in list of all files
-  theFiles.push_back(file);
+  if (bound)
+  {
+    AsyncIoThread * thr = createIoThread(file);
+    theThreads.push_back(thr);
+    file->attach(thr);
 
 #ifdef VM_TRACE
-  infoEvent("NDBFS: Created new file thread %d", theFiles.size());
+    ndbout_c("NDBFS: Created new file thread %d", theFiles.size());
 #endif
+  }
+
+  theFiles.push_back(file);
   
   return file;
 }
 
+void
+Ndbfs::pushIdleFile(AsyncFile* file)
+{
+  if (file->getThread())
+  {
+    theIdleBoundFiles.push_back(file);
+  }
+  else
+  {
+    theIdleUnboundFiles.push_back(file);
+  }
+}
+
+AsyncIoThread*
+Ndbfs::createIoThread(AsyncFile* file)
+{
+  AsyncIoThread* thr = new AsyncIoThread(*this, file);
+
+  struct NdbThread* thrptr = thr->doStart();
+  globalEmulatorData.theConfiguration->addThread(thrptr, NdbfsThread);
+
+  return thr;
+}
+
 AsyncFile*
-Ndbfs::getIdleFile(){
-  AsyncFile* file;
-  if (theIdleFiles.size() > 0){
-    file = theIdleFiles[0];
-    theIdleFiles.erase(0);
-  } else {
-    file = createAsyncFile();
-  } 
-  return file;
+Ndbfs::getIdleFile(bool bound)
+{
+  if (bound)
+  {
+    Uint32 sz = theIdleBoundFiles.size();
+    if (sz)
+    {
+      AsyncFile* file = theIdleBoundFiles[sz - 1];
+      theIdleBoundFiles.erase(sz - 1);
+      return file;
+    }
+  }
+  else
+  {
+    Uint32 sz = theIdleUnboundFiles.size();
+    if (sz)
+    {
+      AsyncFile* file = theIdleUnboundFiles[sz - 1];
+      theIdleUnboundFiles.erase(sz - 1);
+      return file;
+    }
+  }
+
+  return createAsyncFile(bound);
 }
 
 
@@ -721,14 +853,28 @@ Ndbfs::report(Request * request, Signal*
   signal->setTrace(request->theTrace);
   const BlockReference ref = request->theUserReference;
 
-  if(!request->file->m_page_ptr.isNull())
+  if(request->file->has_buffer())
   {
-    assert(request->file->m_page_cnt > 0);
-    m_ctx.m_mm.release_pages(RT_DBTUP_PAGE, 
-                             request->file->m_page_ptr.i,
-                             request->file->m_page_cnt);
-    request->file->m_page_ptr.setNull();
-    request->file->m_page_cnt = 0;
+    Uint32 cnt;
+    Ptr<GlobalPage> ptr;
+    if (request->file->m_open_flags & FsOpenReq::OM_INIT)
+    {
+      jam();
+      request->file->clear_buffer(ptr, cnt);
+      m_ctx.m_mm.release_pages(RT_DBTUP_PAGE, ptr.i, cnt);
+    }
+    else if (request->file->m_open_flags & FsOpenReq::OM_WRITE_BUFFER)
+    {
+      jam();
+      if ((request->action == Request::open && request->error) ||
+          (request->action == Request::close ||
+           request->action == Request::closeRemove))
+      {
+        jam();
+        request->file->clear_buffer(ptr, cnt);
+        m_ctx.m_mm.release_pages(RT_FILE_BUFFER, ptr.i, cnt);
+      }
+    }
   }
   
   if (request->error) {
@@ -750,7 +896,7 @@ Ndbfs::report(Request * request, Signal*
     case Request:: open: {
       jam();
       // Put the file back in idle files list
-      theIdleFiles.push_back(request->file);  
+      pushIdleFile(request->file);
       sendSignal(ref, GSN_FSOPENREF, signal, FsRef::SignalLength, JBB);
       break;
     }
@@ -790,7 +936,7 @@ Ndbfs::report(Request * request, Signal*
     case Request::rmrf: {
       jam();
       // Put the file back in idle files list
-      theIdleFiles.push_back(request->file);  
+      pushIdleFile(request->file);
       sendSignal(ref, GSN_FSREMOVEREF, signal, FsRef::SignalLength, JBB);
       break;
     }
@@ -823,7 +969,7 @@ Ndbfs::report(Request * request, Signal*
       // removes the file from OpenFiles list
       theOpenFiles.erase(request->theFilePointer); 
       // Put the file in idle files list
-      theIdleFiles.push_back(request->file); 
+      pushIdleFile(request->file);
       sendSignal(ref, GSN_FSCLOSECONF, signal, 1, JBB);
       break;
     }
@@ -863,7 +1009,7 @@ Ndbfs::report(Request * request, Signal*
     case Request::rmrf: {
       jam();
       // Put the file in idle files list
-      theIdleFiles.push_back(request->file);            
+      pushIdleFile(request->file);
       sendSignal(ref, GSN_FSREMOVECONF, signal, 1, JBB);
       break;
     }
@@ -1055,9 +1201,10 @@ Ndbfs::execDUMP_STATE_ORD(Signal* signal
     infoEvent("NDBFS: Files: %d Open files: %d",
 	      theFiles.size(),
 	      theOpenFiles.size());
-    infoEvent(" Idle files: %d Max opened files: %d",
-	       theIdleFiles.size(),
-	       m_maxOpenedFiles);
+    infoEvent(" Idle files: (bound: %u unbound: %u) Max opened files: %d",
+              theIdleBoundFiles.size(),
+              theIdleUnboundFiles.size(),
+              m_maxOpenedFiles);
     infoEvent(" Max files: %d",
 	      m_maxFiles);
     infoEvent(" Requests: %d",
@@ -1084,10 +1231,16 @@ Ndbfs::execDUMP_STATE_ORD(Signal* signal
     return;
   }
   if(signal->theData[0] == DumpStateOrd::NdbfsDumpIdleFiles){
-    infoEvent("NDBFS: Dump idle files: %d", theIdleFiles.size());
+    infoEvent("NDBFS: Dump idle files: %d %u",
+              theIdleBoundFiles.size(), theIdleUnboundFiles.size());
     
-    for (unsigned i = 0; i < theIdleFiles.size(); i++){
-      AsyncFile* file = theIdleFiles[i];
+    for (unsigned i = 0; i < theIdleBoundFiles.size(); i++){
+      AsyncFile* file = theIdleBoundFiles[i];
+      infoEvent("%2d (0x%lx): %s", i, (long)file, file->isOpen()?"OPEN":"CLOSED");
+    }
+
+    for (unsigned i = 0; i < theIdleUnboundFiles.size(); i++){
+      AsyncFile* file = theIdleUnboundFiles[i];
       infoEvent("%2d (0x%lx): %s", i, (long)file, file->isOpen()?"OPEN":"CLOSED");
     }
     return;
@@ -1095,6 +1248,7 @@ Ndbfs::execDUMP_STATE_ORD(Signal* signal
 
   if(signal->theData[0] == 404)
   {
+#if 0
     ndbrequire(signal->getLength() == 2);
     Uint32 file= signal->theData[1];
     AsyncFile* openFile = theOpenFiles.find(file);
@@ -1115,6 +1269,7 @@ Ndbfs::execDUMP_STATE_ORD(Signal* signal
       AsyncFile* file = theFiles[i];
       ndbout_c("%2d (0x%lx): %s", i, (long) file, file->isOpen()?"OPEN":"CLOSED");
     }
+#endif
   }
 }//Ndbfs::execDUMP_STATE_ORD()
 
@@ -1132,6 +1287,7 @@ Ndbfs::get_filename(Uint32 fd) const
 BLOCK_FUNCTIONS(Ndbfs)
 
 template class Vector<AsyncFile*>;
+template class Vector<AsyncIoThread*>;
 template class Vector<OpenFiles::OpenFileItem>;
 template class MemoryChannel<Request>;
 template class Pool<Request>;

=== modified file 'storage/ndb/src/kernel/blocks/ndbfs/Ndbfs.hpp'
--- a/storage/ndb/src/kernel/blocks/ndbfs/Ndbfs.hpp	2006-12-23 19:20:40 +0000
+++ b/storage/ndb/src/kernel/blocks/ndbfs/Ndbfs.hpp	2008-12-02 13:10:49 +0000
@@ -22,21 +22,21 @@
 #include "AsyncFile.hpp"
 #include "OpenFiles.hpp"
 
-
+class AsyncIoThread;
 
 // Because one NDB Signal request can result in multiple requests to
 // AsyncFile one class must be made responsible to keep track
 // of all out standing request and when all are finished the result
 // must be reported to the sending block.
 
-
 class Ndbfs : public SimulatedBlock
 {
+  friend class AsyncIoThread;
 public:
   Ndbfs(Block_context&);
   virtual ~Ndbfs();
-
   virtual const char* get_filename(Uint32 fd) const;
+
 protected:
   BLOCK_DEFINES(Ndbfs);
 
@@ -69,17 +69,22 @@ private:
   Uint16 theLastId;
   BlockReference cownref;
 
-  // Communication from files 
+  // Communication from/to files
   MemoryChannel<Request> theFromThreads;
+  MemoryChannel<Request> theToThreads;
 
   Pool<Request>* theRequestPool;
 
-  AsyncFile* createAsyncFile();
-  AsyncFile* getIdleFile();
-
-  Vector<AsyncFile*> theFiles;     // List all created AsyncFiles
-  Vector<AsyncFile*> theIdleFiles; // List of idle AsyncFiles
-  OpenFiles theOpenFiles;          // List of open AsyncFiles
+  AsyncIoThread* createIoThread(AsyncFile* file);
+  AsyncFile* createAsyncFile(bool bound);
+  AsyncFile* getIdleFile(bool bound);
+  void pushIdleFile(AsyncFile*);
+
+  Vector<AsyncIoThread*> theThreads;// List of all created threads
+  Vector<AsyncFile*> theFiles;      // List all created AsyncFiles
+  Vector<AsyncFile*> theIdleBoundFiles;   // List of idle AsyncFiles
+  Vector<AsyncFile*> theIdleUnboundFiles; // List of idle AsyncFiles
+  OpenFiles theOpenFiles;           // List of open AsyncFiles
 
   BaseString theFileSystemPath;
   BaseString theBackupFilePath;

=== modified file 'storage/ndb/src/kernel/blocks/ndbfs/PosixAsyncFile.cpp'
--- a/storage/ndb/src/kernel/blocks/ndbfs/PosixAsyncFile.cpp	2008-11-19 11:01:17 +0000
+++ b/storage/ndb/src/kernel/blocks/ndbfs/PosixAsyncFile.cpp	2008-12-02 20:01:33 +0000
@@ -21,6 +21,7 @@
 #include <xfs/xfs.h>
 #endif
 
+#include "Ndbfs.hpp"
 #include "AsyncFile.hpp"
 #include "PosixAsyncFile.hpp"
 
@@ -34,17 +35,8 @@
 
 #include <NdbTick.h>
 
-// use this to test broken pread code
-//#define HAVE_BROKEN_PREAD
-
-#ifdef HAVE_BROKEN_PREAD
-#undef HAVE_PWRITE
-#undef HAVE_PREAD
-#endif
-
 // For readv and writev
 #include <sys/uio.h>
-
 #include <dirent.h>
 
 PosixAsyncFile::PosixAsyncFile(SimulatedBlock& fs) :
@@ -53,18 +45,12 @@ PosixAsyncFile::PosixAsyncFile(Simulated
   use_gz(0)
 {
   memset(&azf,0,sizeof(azf));
+  init_mutex();
 }
 
 int PosixAsyncFile::init()
 {
   // Create write buffer for bigger writes
-  theWriteBufferSize = WRITEBUFFERSIZE;
-  theWriteBufferUnaligned = (char *) ndbd_malloc(theWriteBufferSize +
-                                                 NDB_O_DIRECT_WRITE_ALIGNMENT-1);
-  theWriteBuffer = (char *)
-    (((UintPtr)theWriteBufferUnaligned + NDB_O_DIRECT_WRITE_ALIGNMENT - 1) &
-     ~(UintPtr)(NDB_O_DIRECT_WRITE_ALIGNMENT - 1));
-
   azfBufferUnaligned= (Byte*)ndbd_malloc((AZ_BUFSIZE_READ+AZ_BUFSIZE_WRITE)
                                          +NDB_O_DIRECT_WRITE_ALIGNMENT-1);
 
@@ -82,13 +68,8 @@ int PosixAsyncFile::init()
 
   azf.stream.opaque= &az_mempool;
 
-  if (!theWriteBuffer) {
-    DEBUG(ndbout_c("AsyncFile::writeReq, Failed allocating write buffer"));
-    return -1;
-  }//if
-
   return 0;
-}//AsyncFile::init()
+}
 
 #ifdef O_DIRECT
 static char g_odirect_readbuf[2*GLOBAL_PAGE_SIZE -1];
@@ -229,8 +210,14 @@ void PosixAsyncFile::openReq(Request *re
     break;
     return;
   }
-  if(flags & FsOpenReq::OM_GZ)
-    use_gz= 1;
+  if (flags & FsOpenReq::OM_GZ)
+  {
+    use_gz = 1;
+  }
+  else
+  {
+    use_gz = 0;
+  }
 
   // allow for user to choose any permissionsa with umask
   const int mode = S_IRUSR | S_IWUSR |
@@ -363,7 +350,8 @@ no_odirect:
   retry:
       off_t save_size = size;
       char* buf = (char*)m_page_ptr.p;
-      while(size > 0){
+      while(size > 0)
+      {
 #ifdef TRACE_INIT
         write_cnt++;
 #endif
@@ -504,8 +492,9 @@ int PosixAsyncFile::readBuffer(Request *
 {
   int return_value;
   req->par.readWrite.pages[0].size = 0;
-#if ! defined(HAVE_PREAD)
   off_t seek_val;
+#if ! defined(HAVE_PREAD)
+  FileGuard guard(this);
   if(!use_gz)
   {
     while((seek_val= lseek(theFd, offset, SEEK_SET)) == (off_t)-1
@@ -516,7 +505,6 @@ int PosixAsyncFile::readBuffer(Request *
     }
   }
 #endif
-  off_t seek_val;
   if(use_gz)
   {
     while((seek_val= azseek(&azf, offset, SEEK_SET)) == (off_t)-1
@@ -611,15 +599,16 @@ void PosixAsyncFile::readvReq(Request *r
 #endif
 }
 
-int PosixAsyncFile::writeBuffer(const char *buf, size_t size, off_t offset,
-                                size_t chunk_size)
+int PosixAsyncFile::writeBuffer(const char *buf, size_t size, off_t offset)
 {
+  size_t chunk_size = 256*1024;
   size_t bytes_to_write = chunk_size;
   int return_value;
 
   m_write_wo_sync += size;
 
 #if ! defined(HAVE_PWRITE)
+  FileGuard guard(this);
   off_t seek_val;
   while((seek_val= lseek(theFd, offset, SEEK_SET)) == (off_t)-1
 	&& errno == EINTR);
@@ -772,56 +761,66 @@ void PosixAsyncFile::removeReq(Request *
   }
 }
 
-void PosixAsyncFile::rmrfReq(Request *request, char *path, bool removePath)
+void
+PosixAsyncFile::rmrfReq(Request *request, const char * src, bool removePath)
 {
-  Uint32 path_len = strlen(path);
-  Uint32 path_max_copy = PATH_MAX - path_len;
-  char* path_add = &path[path_len];
-
-  if(!request->par.rmrf.directory){
+  if(!request->par.rmrf.directory)
+  {
     // Remove file
-    if(unlink((const char *)path) != 0 && errno != ENOENT)
+    if(unlink(src) != 0 && errno != ENOENT)
       request->error = errno;
     return;
   }
-  // Remove directory
-  DIR* dirp = opendir((const char *)path);
-  if(dirp == 0){
+
+  char path[PATH_MAX];
+  strcpy(path, src);
+  strcat(path, "/");
+
+  DIR* dirp;
+  struct dirent * dp;
+loop:
+  dirp = opendir(path);
+  if(dirp == 0)
+  {
     if(errno != ENOENT)
       request->error = errno;
     return;
   }
-  struct dirent * dp;
-  while ((dp = readdir(dirp)) != NULL){
-    if ((strcmp(".", dp->d_name) != 0) && (strcmp("..", dp->d_name) != 0)) {
-      BaseString::snprintf(path_add, (size_t)path_max_copy, "%s%s",
-	       DIR_SEPARATOR, dp->d_name);
-      if(remove((const char*)path) == 0){
-        path[path_len] = 0;
-	continue;
-      }
 
-      rmrfReq(request, path, true);
-      path[path_len] = 0;
-      if(request->error != 0){
-	closedir(dirp);
-	return;
+  while ((dp = readdir(dirp)) != NULL)
+  {
+    if ((strcmp(".", dp->d_name) != 0) && (strcmp("..", dp->d_name) != 0)) 
+    {
+      int len = strlen(path);
+      strcat(path, dp->d_name);
+      if (remove(path) == 0)
+      {
+        path[len] = 0;
+        continue;
       }
+      
+      closedir(dirp);
+      strcat(path, "/");
+      goto loop;
     }
   }
   closedir(dirp);
-  if(removePath && rmdir((const char *)path) != 0){
+  path[strlen(path)-1] = 0; // remove /
+  if (strcmp(src, path) != 0)
+  {
+    char * t = strrchr(path, '/');
+    t[1] = 0;
+    goto loop;
+  }
+
+  if(removePath && rmdir(src) != 0)
+  {
     request->error = errno;
   }
-  return;
 }
 
-void PosixAsyncFile::endReq()
+PosixAsyncFile::~PosixAsyncFile()
 {
-  // Thread is ended with return
-  if (theWriteBufferUnaligned)
-    ndbd_free(theWriteBufferUnaligned, theWriteBufferSize);
-
   if (azfBufferUnaligned)
     ndbd_free(azfBufferUnaligned, (AZ_BUFSIZE_READ*AZ_BUFSIZE_WRITE)
               +NDB_O_DIRECT_WRITE_ALIGNMENT-1);
@@ -830,10 +829,9 @@ void PosixAsyncFile::endReq()
     ndbd_free(az_mempool.mem,az_mempool.size);
 
   az_mempool.mem = NULL;
-  theWriteBufferUnaligned = NULL;
   azfBufferUnaligned = NULL;
+  destroy_mutex();
 }
-
 
 void PosixAsyncFile::createDirectories()
 {

=== modified file 'storage/ndb/src/kernel/blocks/ndbfs/PosixAsyncFile.hpp'
--- a/storage/ndb/src/kernel/blocks/ndbfs/PosixAsyncFile.hpp	2007-11-15 00:30:00 +0000
+++ b/storage/ndb/src/kernel/blocks/ndbfs/PosixAsyncFile.hpp	2008-12-02 13:10:49 +0000
@@ -24,15 +24,26 @@
 
 #include <azlib.h>
 
+/**
+ * PREAD/PWRITE is needed to use file != thread
+ *   therefor it's defined/checked here
+ */
+#ifdef HAVE_BROKEN_PREAD
+#undef HAVE_PWRITE
+#undef HAVE_PREAD
+#elif defined (HAVE_PREAD)
+#define HAVE_PWRITE
+#endif
+
 class PosixAsyncFile : public AsyncFile
 {
   friend class Ndbfs;
 public:
   PosixAsyncFile(SimulatedBlock& fs);
+  virtual ~PosixAsyncFile();
 
-  int init();
-
-  bool isOpen();
+  virtual int init();
+  virtual bool isOpen();
 
   virtual void openReq(Request *request);
   virtual void readvReq(Request *request);
@@ -41,32 +52,59 @@ public:
   virtual void syncReq(Request *request);
   virtual void removeReq(Request *request);
   virtual void appendReq(Request *request);
-  virtual void rmrfReq(Request *request, char * path, bool removePath);
-  void endReq();
+  virtual void rmrfReq(Request *request, const char * path, bool removePath);
 
   virtual int readBuffer(Request*, char * buf, size_t size, off_t offset);
-  virtual int writeBuffer(const char * buf, size_t size, off_t offset,
-		  size_t chunk_size = WRITECHUNK);
+  virtual int writeBuffer(const char * buf, size_t size, off_t offset);
 
   virtual void createDirectories();
 
 private:
   int theFd;
 
-  Uint32 m_open_flags; // OM_ flags from request to open file
-
   int use_gz;
   azio_stream azf;
   struct az_alloc_rec az_mempool;
-
-  void* theWriteBufferUnaligned;
   void* azfBufferUnaligned;
 
-  size_t m_write_wo_sync;  // Writes wo/ sync
-  size_t m_auto_sync_freq; // Auto sync freq in bytes
-
   int check_odirect_read(Uint32 flags, int&new_flags, int mode);
   int check_odirect_write(Uint32 flags, int&new_flags, int mode);
+
+#ifndef HAVE_PREAD
+  struct FileGuard;
+  friend struct FileGuard;
+  NdbMutex * m_mutex;
+  void init_mutex() { m_mutex = NdbMutex_Create();}
+  void destroy_mutex() { NdbMutex_Destroy(m_mutex);}
+
+  /**
+   * If dont HAVE_PREAD and using file != thread
+   */
+  struct FileGuard
+  {
+    PosixAsyncFile* m_file;
+    FileGuard (PosixAsyncFile* file) : m_file(file) {
+      if (m_file->getThread() == 0)
+      {
+        NdbMutex_Lock(m_file->m_mutex);
+      }
+    }
+    ~FileGuard() {
+      if (m_file->getThread() == 0)
+      {
+        NdbMutex_Unlock(m_file->m_mutex);
+      }
+    }
+  };
+#else
+  void init_mutex() {}
+  void destroy_mutex() {}
+  struct FileGuard
+  {
+    FileGuard (PosixAsyncFile* file){}
+    ~FileGuard () {}
+  };
+#endif
 };
 
 #endif

=== modified file 'storage/ndb/src/kernel/blocks/ndbfs/Win32AsyncFile.cpp'
--- a/storage/ndb/src/kernel/blocks/ndbfs/Win32AsyncFile.cpp	2008-11-19 11:01:17 +0000
+++ b/storage/ndb/src/kernel/blocks/ndbfs/Win32AsyncFile.cpp	2008-12-02 20:01:33 +0000
@@ -37,6 +37,12 @@ Win32AsyncFile::~Win32AsyncFile()
 
 }
 
+int
+Win32AsyncFile::init()
+{
+  return 0;
+}
+
 void Win32AsyncFile::openReq(Request* request)
 {
   m_auto_sync_freq = 0;
@@ -171,7 +177,7 @@ void Win32AsyncFile::openReq(Request* re
                           );
       Uint32 size = request->par.open.page_size;
       char* buf = (char*)m_page_ptr.p;
-	  DWORD dwWritten;
+      DWORD dwWritten;
       while(size > 0){
 	BOOL bWrite= WriteFile(hFile, buf, size, &dwWritten, 0);
 	if(!bWrite || dwWritten!=size)
@@ -205,7 +211,8 @@ void Win32AsyncFile::openReq(Request* re
 }
 
 int
-Win32AsyncFile::readBuffer(Request* req, char * buf, size_t size, off_t offset){
+Win32AsyncFile::readBuffer(Request* req, char * buf, size_t size, off_t offset)
+{
   req->par.readWrite.pages[0].size = 0;
 
   while (size > 0) {
@@ -259,9 +266,9 @@ Win32AsyncFile::readBuffer(Request* req,
 }
 
 int
-Win32AsyncFile::writeBuffer(const char * buf, size_t size, off_t offset,
-		       size_t chunk_size)
+Win32AsyncFile::writeBuffer(const char * buf, size_t size, off_t offset)
 {
+  size_t chunk_size = 256 * 1024;
   size_t bytes_to_write = chunk_size;
 
   m_write_wo_sync += size;
@@ -365,55 +372,65 @@ Win32AsyncFile::removeReq(Request * requ
 }
 
 void
-Win32AsyncFile::rmrfReq(Request * request, char * path, bool removePath){
-  Uint32 path_len = strlen(path);
-  Uint32 path_max_copy = PATH_MAX - path_len;
-  char* path_add = &path[path_len];
-
-  if(!request->par.rmrf.directory){
+Win32AsyncFile::rmrfReq(Request * request, const char * src, bool removePath){
+  if (!request->par.rmrf.directory)
+  {
     // Remove file
-    if(!DeleteFile(path)){
+    if (!DeleteFile(src))
+    {
       DWORD dwError = GetLastError();
-      if(dwError!=ERROR_FILE_NOT_FOUND)
+      if (dwError != ERROR_FILE_NOT_FOUND)
 	request->error = dwError;
     }
     return;
   }
 
+  char path[PATH_MAX];
+  strcpy(path, src);
   strcat(path, "\\*");
+
   WIN32_FIND_DATA ffd;
-  HANDLE hFindFile = FindFirstFile(path, &ffd);
-  path[path_len] = 0;
-  if(INVALID_HANDLE_VALUE==hFindFile){
+  HANDLE hFindFile;
+loop:
+  hFindFile = FindFirstFile(path, &ffd);
+  if (INVALID_HANDLE_VALUE == hFindFile)
+  {
     DWORD dwError = GetLastError();
-    if(dwError!=ERROR_PATH_NOT_FOUND)
+    if (dwError != ERROR_PATH_NOT_FOUND)
       request->error = dwError;
     return;
   }
+  path[strlen(path) - 1] = 0; // remove '*'
 
   do {
-    if(0!=strcmp(".", ffd.cFileName) && 0!=strcmp("..", ffd.cFileName)){
-      strcat(path, "\\");
+    if (0 != strcmp(".", ffd.cFileName) && 0 != strcmp("..", ffd.cFileName))
+    {
+      int len = strlen(path);
       strcat(path, ffd.cFileName);
-      if(DeleteFile(path)) {
-        path[path_len] = 0;
+      if(DeleteFile(path)) 
+      {
+        path[len] = 0;
 	continue;
       }//if
 
-      rmrfReq(request, path, true);
-      path[path_len] = 0;
-      if(request->error != 0){
-	FindClose(hFindFile);
-	return;
-      }
+      FindClose(hFindFile);
+      strcat(path, "\\*");
+      goto loop;
     }
   } while(FindNextFile(hFindFile, &ffd));
-
+  
   FindClose(hFindFile);
+  path[strlen(path)-1] = 0; // remove '\'
+  if (strcmp(src, path) != 0)
+  {
+    char * t = strrchr(path, '\\');
+    t[1] = '*';
+    t[2] = 0;
+    goto loop;
+  }
 
-  if(removePath && !RemoveDirectory(path))
+  if(removePath && !RemoveDirectory(src))
     request->error = GetLastError();
-
 }
 
 void Win32AsyncFile::createDirectories()

=== modified file 'storage/ndb/src/kernel/blocks/ndbfs/Win32AsyncFile.hpp'
--- a/storage/ndb/src/kernel/blocks/ndbfs/Win32AsyncFile.hpp	2008-08-21 06:38:48 +0000
+++ b/storage/ndb/src/kernel/blocks/ndbfs/Win32AsyncFile.hpp	2008-12-02 17:02:41 +0000
@@ -30,32 +30,25 @@ class Win32AsyncFile : public AsyncFile
   friend class Ndbfs;
 public:
   Win32AsyncFile(SimulatedBlock& fs);
-  ~Win32AsyncFile();
+  virtual ~Win32AsyncFile();
 
-  void reportTo( MemoryChannel<Request> *reportTo );
+  virtual int init();
+  virtual bool isOpen();
+  virtual void openReq(Request *request);
+  virtual void closeReq(Request *request);
+  virtual void syncReq(Request *request);
+  virtual void removeReq(Request *request);
+  virtual void appendReq(Request *request);
+  virtual void rmrfReq(Request *request, const char * path, bool removePath);
 
-  void execute( Request* request );
-
-  bool isOpen();
+  virtual int readBuffer(Request*, char * buf, size_t size, off_t offset);
+  virtual int writeBuffer(const char * buf, size_t size, off_t offset);
 
 private:
-
-  void openReq(Request *request);
-  void closeReq(Request *request);
-  void syncReq(Request *request);
-  void removeReq(Request *request);
-  void appendReq(Request *request);
-  void rmrfReq(Request *request, char * path, bool removePath);
-
-  int readBuffer(Request*, char * buf, size_t size, off_t offset);
-  int writeBuffer(const char * buf, size_t size, off_t offset,
-		  size_t chunk_size = WRITECHUNK);
-
   int extendfile(Request* request);
   void createDirectories();
 
   HANDLE hFile;
-  Uint32 m_open_flags; // OM_ flags from request to open file
 };
 
 #endif

=== modified file 'storage/ndb/src/kernel/blocks/qmgr/QmgrMain.cpp'
--- a/storage/ndb/src/kernel/blocks/qmgr/QmgrMain.cpp	2008-11-13 13:36:29 +0000
+++ b/storage/ndb/src/kernel/blocks/qmgr/QmgrMain.cpp	2008-12-01 18:04:19 +0000
@@ -991,6 +991,9 @@ void Qmgr::execCM_REGCONF(Signal* signal
   c_clusterNodes.assign(NdbNodeBitmask::Size, cmRegConf->allNdbNodes);
 
   myNodePtr.p->ndynamicId = TdynamicId;
+
+  // set own MT config here or in REF, and others in CM_NODEINFOREQ/CONF
+  setNodeInfo(getOwnNodeId()).m_lqh_workers = globalData.ndbMtLqhWorkers;
   
 /*--------------------------------------------------------------*/
 // Send this as an EVENT REPORT to inform about hearing about
@@ -1109,6 +1112,7 @@ Qmgr::sendCmNodeInfoReq(Signal* signal, 
   req->dynamicId = self->ndynamicId;
   req->version = getNodeInfo(getOwnNodeId()).m_version;
   req->mysql_version = getNodeInfo(getOwnNodeId()).m_mysql_version;
+  req->lqh_workers = getNodeInfo(getOwnNodeId()).m_lqh_workers;
   const Uint32 ref = calcQmgrBlockRef(nodeId);
   sendSignal(ref,GSN_CM_NODEINFOREQ, signal, CmNodeInfoReq::SignalLength, JBB);
   DEBUG_START(GSN_CM_NODEINFOREQ, nodeId, "");
@@ -1214,6 +1218,9 @@ void Qmgr::execCM_REGREF(Signal* signal)
 
   skip_nodes.bitAND(c_definedNodes);
   c_start.m_skip_nodes.bitOR(skip_nodes);
+
+  // set own MT config here or in CONF, and others in CM_NODEINFOREQ/CONF
+  setNodeInfo(getOwnNodeId()).m_lqh_workers = globalData.ndbMtLqhWorkers;
   
   char buf[100];
   switch (TrefuseReason) {
@@ -1661,11 +1668,17 @@ void Qmgr::execCM_NODEINFOCONF(Signal* s
   const Uint32 dynamicId = conf->dynamicId;
   const Uint32 version = conf->version;
   Uint32 mysql_version = conf->mysql_version;
+  Uint32 lqh_workers = conf->lqh_workers;
   if (version < NDBD_SPLIT_VERSION)
   {
     jam();
     mysql_version = 0;
   }
+  if (version < NDBD_MT_LQH_VERSION)
+  {
+    jam();
+    lqh_workers = 0;
+  }
 
   NodeRecPtr nodePtr;  
   nodePtr.i = getOwnNodeId();
@@ -1684,6 +1697,7 @@ void Qmgr::execCM_NODEINFOCONF(Signal* s
   replyNodePtr.p->blockRef = signal->getSendersBlockRef();
   setNodeInfo(replyNodePtr.i).m_version = version;
   setNodeInfo(replyNodePtr.i).m_mysql_version = mysql_version;
+  setNodeInfo(replyNodePtr.i).m_lqh_workers = lqh_workers;
 
   recompute_version_info(NodeInfo::DB, version);
   
@@ -1741,8 +1755,13 @@ void Qmgr::execCM_NODEINFOREQ(Signal* si
   Uint32 mysql_version = req->mysql_version;
   if (req->version < NDBD_SPLIT_VERSION)
     mysql_version = 0;
-  
   setNodeInfo(addNodePtr.i).m_mysql_version = mysql_version;
+
+  Uint32 lqh_workers = req->lqh_workers;
+  if (req->version < NDBD_MT_LQH_VERSION)
+    lqh_workers = 0;
+  setNodeInfo(addNodePtr.i).m_lqh_workers = lqh_workers;
+
   c_maxDynamicId = req->dynamicId;
 
   cmAddPrepare(signal, addNodePtr, nodePtr.p);
@@ -1799,6 +1818,7 @@ Qmgr::cmAddPrepare(Signal* signal, NodeR
   conf->dynamicId = self->ndynamicId;
   conf->version = getNodeInfo(getOwnNodeId()).m_version;
   conf->mysql_version = getNodeInfo(getOwnNodeId()).m_mysql_version;
+  conf->lqh_workers = getNodeInfo(getOwnNodeId()).m_lqh_workers;
   sendSignal(nodePtr.p->blockRef, GSN_CM_NODEINFOCONF, signal,
 	     CmNodeInfoConf::SignalLength, JBB);
   DEBUG_START(GSN_CM_NODEINFOCONF, refToNode(nodePtr.p->blockRef), "");

=== modified file 'storage/ndb/src/kernel/blocks/record_types.hpp'
--- a/storage/ndb/src/kernel/blocks/record_types.hpp	2008-01-01 12:45:11 +0000
+++ b/storage/ndb/src/kernel/blocks/record_types.hpp	2008-12-02 13:10:49 +0000
@@ -45,6 +45,11 @@
 #define RG_JOBBUFFER            4
 
 /**
+ * File-thread buffers
+ */
+#define RG_FILE_BUFFERS         5
+
+/**
  * 
  */
 #define RG_RESERVED             0
@@ -69,5 +74,7 @@
 #define RT_DBTUP_PAGE_MAP          MAKE_TID( 2, RG_DATAMEM)
 
 #define RT_JOB_BUFFER              MAKE_TID( 1, RG_JOBBUFFER)
+
+#define RT_FILE_BUFFER             MAKE_TID( 1, RG_FILE_BUFFERS)
 
 #endif

=== modified file 'storage/ndb/src/kernel/blocks/tsman.cpp'
--- a/storage/ndb/src/kernel/blocks/tsman.cpp	2008-11-20 13:32:13 +0000
+++ b/storage/ndb/src/kernel/blocks/tsman.cpp	2008-12-02 13:10:49 +0000
@@ -45,11 +45,7 @@ Tsman::Tsman(Block_context& ctx) :
   m_pgman(0),
   m_lgman(0),
   m_tup(0),
-#ifdef __sun // temp
-  m_client_mutex(1, false)
-#else
-  m_client_mutex(2, true)
-#endif
+  m_client_mutex("tsman-client", 2, true)
 {
   BLOCK_CONSTRUCTOR(Tsman);
 
@@ -771,6 +767,7 @@ Tsman::open_file(Signal* signal, 
   req->fileFlags = 0;
   req->fileFlags |= FsOpenReq::OM_READWRITE;
   req->fileFlags |= FsOpenReq::OM_DIRECT;
+  req->fileFlags |= FsOpenReq::OM_THREAD_POOL;
   switch(requestInfo){
   case CreateFileImplReq::Create:
     req->fileFlags |= FsOpenReq::OM_CREATE_IF_NONE;

=== modified file 'storage/ndb/src/kernel/main.cpp'
--- a/storage/ndb/src/kernel/main.cpp	2008-11-09 18:37:29 +0000
+++ b/storage/ndb/src/kernel/main.cpp	2008-12-02 13:10:49 +0000
@@ -250,20 +250,34 @@ init_global_memory_manager(EmulatorData 
                         "config, exiting.");
     return -1;
   }
+
   if (tupmem)
   {
     Resource_limit rl;
     rl.m_min = tupmem;
     rl.m_max = tupmem;
-    rl.m_resource_id = 3;
+    rl.m_resource_id = RG_DATAMEM;
+    ed.m_mem_manager->set_resource_limit(rl);
+  }
+
+  Uint32 maxopen = 4 * 4; // 4 redo parts, max 4 files per part
+  Uint32 filebuffer = NDB_FILE_BUFFER_SIZE;
+  Uint32 filepages = (filebuffer / GLOBAL_PAGE_SIZE) * maxopen;
+
+  if (filepages)
+  {
+    Resource_limit rl;
+    rl.m_min = filepages;
+    rl.m_max = filepages;
+    rl.m_resource_id = RG_FILE_BUFFERS;
     ed.m_mem_manager->set_resource_limit(rl);
   }
 
-  if (shared_mem+tupmem)
+  if (shared_mem + tupmem + filepages)
   {
     Resource_limit rl;
     rl.m_min = 0;
-    rl.m_max = shared_mem + tupmem;
+    rl.m_max = shared_mem + tupmem + filepages;
     rl.m_resource_id = 0;
     ed.m_mem_manager->set_resource_limit(rl);
   }
@@ -280,7 +294,7 @@ init_global_memory_manager(EmulatorData 
     ndb_mgm_get_db_parameter_info(CFG_DB_SGA, &sga, &size);
 
     g_eventLogger->alert("Malloc (%lld bytes) for %s and %s failed, exiting",
-                         Uint64(shared_mem + tupmem) * 32768,
+                         Uint64(shared_mem + tupmem) * GLOBAL_PAGE_SIZE,
                          dm.m_name, sga.m_name);
     return -1;
   }
@@ -293,8 +307,10 @@ get_multithreaded_config(EmulatorData& e
 {
   // multithreaded is compiled in ndbd/ndbmtd for now
   globalData.isNdbMt = SimulatedBlock::isMultiThreaded();
-  if (!globalData.isNdbMt)
+  if (!globalData.isNdbMt) {
+    ndbout << "NDBMT: non-mt" << endl;
     return 0;
+  }
 
   const ndb_mgm_configuration_iterator * p =
     ed.theConfiguration->getOwnConfigIterator();

=== modified file 'storage/ndb/src/kernel/vm/SafeMutex.cpp'
--- a/storage/ndb/src/kernel/vm/SafeMutex.cpp	2008-11-20 13:32:13 +0000
+++ b/storage/ndb/src/kernel/vm/SafeMutex.cpp	2008-12-01 18:03:48 +0000
@@ -15,94 +15,151 @@
 
 #include "SafeMutex.hpp"
 
-NdbOut&
-operator<<(NdbOut& out, const SafeMutex& dm)
-{
-  out << "level=" << dm.m_level << "," << "usage=" << dm.m_usage;
-  return out;
-}
-
 int
 SafeMutex::create()
 {
-  if (m_init)
-    return ErrState;
-  int ret = -1;
-#ifdef HAVE_PTHREAD_MUTEX_RECURSIVE
-#ifndef __WIN__
-  if (m_limit > 1 || m_debug) {
-    pthread_mutexattr_t attr;
-    pthread_mutexattr_settype(&attr, PTHREAD_MUTEX_RECURSIVE);
-    ret = pthread_mutex_init(&m_mutex, &attr);
-  } else {
-    // error-check mutex does not work right on my linux, skip it
-    ret = pthread_mutex_init(&m_mutex, 0);
-  }
-#else
+  int ret;
+  if (m_initdone)
+    return err(ErrState, __LINE__);
   ret = pthread_mutex_init(&m_mutex, 0);
-#endif
-#else
-  if (m_limit > 1 || m_debug)
-    return ErrUnsupp;
-  ret = pthread_mutex_init(&m_mutex, 0);
-#endif
   if (ret != 0)
-    return ret;
-  m_init = true;
+    return err(ret, __LINE__);
+  ret = pthread_cond_init(&m_cond, 0);
+  if (ret != 0)
+    return err(ret, __LINE__);
+  m_initdone = true;
   return 0;
 }
 
 int
 SafeMutex::destroy()
 {
-  if (!m_init)
-    return ErrState;
-  int ret = pthread_mutex_destroy(&m_mutex);
+  int ret;
+  if (!m_initdone)
+    return err(ErrState, __LINE__);
+  ret = pthread_cond_destroy(&m_cond);
+  if (ret != 0)
+    return err(ret, __LINE__);
+  ret = pthread_mutex_destroy(&m_mutex);
   if (ret != 0)
-    return ret;
-  m_init = false;
+    return err(ret, __LINE__);
+  m_initdone = false;
   return 0;
 }
 
 int
 SafeMutex::lock()
 {
-  pthread_t self = pthread_self();
-  int ret = pthread_mutex_lock(&m_mutex);
-  /* have mutex */
+  int ret;
+  if (m_simple) {
+    ret = pthread_mutex_lock(&m_mutex);
+    if (ret != 0)
+      return err(ret, __LINE__);
+    return 0;
+  }
+  ret = pthread_mutex_lock(&m_mutex);
   if (ret != 0)
-    return ret;
-  if (!(m_level < m_limit))
-    return ErrLevel;
-  m_level++;
-  if (m_level > m_usage)
-    m_usage = m_level;
-  if (m_level == 1 && m_owner != 0)
-    return ErrOwner1;
-  if (m_level >= 2 && m_owner != self)
-    return ErrOwner2;
-  m_owner = self;
+    return err(ret, __LINE__);
+  return lock_impl();
+}
+
+int
+SafeMutex::lock_impl()
+{
+  int ret;
+  pthread_t self = pthread_self();
+  assert(self != 0);
+  while (1) {
+    if (m_level == 0) {
+      assert(m_owner == 0);
+      m_owner = self;
+    } else if (m_owner != self) {
+      ret = pthread_cond_wait(&m_cond, &m_mutex);
+      if (ret != 0)
+        return err(ret, __LINE__);
+      continue;
+    }
+    if (!(m_level < m_limit))
+      return err(ErrLevel, __LINE__);
+    m_level++;
+    if (m_usage < m_level)
+      m_usage = m_level;
+    ret = pthread_cond_signal(&m_cond);
+    if (ret != 0)
+      return err(ret, __LINE__);
+    ret = pthread_mutex_unlock(&m_mutex);
+    if (ret != 0)
+      return err(ret, __LINE__);
+    break;
+  }
   return 0;
 }
 
 int
 SafeMutex::unlock()
 {
+  int ret;
+  if (m_simple) {
+    ret = pthread_mutex_unlock(&m_mutex);
+    if (ret != 0)
+      return err(ret, __LINE__);
+    return 0;
+  }
+  ret = pthread_mutex_lock(&m_mutex);
+  if (ret != 0)
+    return err(ret, __LINE__);
+  return unlock_impl();
+}
+
+int
+SafeMutex::unlock_impl()
+{
+  int ret;
   pthread_t self = pthread_self();
-  if (!(m_level > 0))
-    return ErrState;
+  assert(self != 0);
   if (m_owner != self)
-    return ErrOwner3;
-  if (m_level == 1)
-    m_owner = 0;
+    return err(ErrOwner, __LINE__);
+  if (m_level == 0)
+    return err(ErrNolock, __LINE__);
   m_level--;
-  int ret = pthread_mutex_unlock(&m_mutex);
-  /* lose mutex */
+  if (m_level == 0) {
+    m_owner = 0;
+    ret = pthread_cond_signal(&m_cond);
+    if (ret != 0)
+      return err(ret, __LINE__);
+  }
+  ret = pthread_mutex_unlock(&m_mutex);
   if (ret != 0)
-    return ret;
+    return err(ret, __LINE__);
   return 0;
 }
 
+int
+SafeMutex::err(int errcode, int errline)
+{
+  assert(errcode != 0);
+  m_errcode = errcode;
+  m_errline = errline;
+  ndbout << *this << endl;
+#ifdef UNIT_TEST
+  abort();
+#endif
+  return errcode;
+}
+
+NdbOut&
+operator<<(NdbOut& out, const SafeMutex& sm)
+{
+  out << sm.m_name << ":";
+  out << " level=" << sm.m_level;
+  out << " usage=" << sm.m_usage;
+  if (sm.m_errcode != 0) {
+    out << " errcode=" << sm.m_errcode;
+    out << " errline=" << sm.m_errline;
+  }
+  return out;
+}
+
 #ifdef UNIT_TEST
 
 struct sm_thr {
@@ -144,10 +201,12 @@ sm_run(void* arg)
     }
     if (op == +1) {
       assert(level < thr.limit);
+      //ndbout << thr.index << ": lock" << endl;
       int ret = sm.lock();
       assert(ret == 0);
       level++;
     } else if (op == -1) {
+      //ndbout << thr.index << ": unlock" << endl;
       int ret = sm.unlock();
       assert(ret == 0);
       assert(level != 0);
@@ -161,6 +220,7 @@ sm_run(void* arg)
     assert(ret == 0);
     level--;
   }
+  return 0;
 }
 
 int
@@ -169,18 +229,19 @@ main(int argc, char** argv)
   const uint max_thr = 128;
   struct sm_thr thr[max_thr];
 
-  // threads - loops - max level
+  // threads - loops - max level - debug
   uint num_thr = argc > 1 ? atoi(argv[1]) : 4;
   assert(num_thr != 0 && num_thr <= max_thr);
   uint loops = argc > 2 ? atoi(argv[2]) : 1000000;
   uint limit = argc > 3 ? atoi(argv[3]) : 10;
   assert(limit != 0);
+  bool debug = argc > 4 ? atoi(argv[4]) : true;
 
   ndbout << "threads=" << num_thr;
   ndbout << " loops=" << loops;
   ndbout << " max level=" << limit << endl;
 
-  SafeMutex sm(limit, true);
+  SafeMutex sm("test-mutex", limit, debug);
   int ret;
   ret = sm.create();
   assert(ret == 0);

=== modified file 'storage/ndb/src/kernel/vm/SafeMutex.hpp'
--- a/storage/ndb/src/kernel/vm/SafeMutex.hpp	2008-11-20 13:32:13 +0000
+++ b/storage/ndb/src/kernel/vm/SafeMutex.hpp	2008-12-01 18:03:48 +0000
@@ -26,61 +26,70 @@
 #include <ndb_types.h>
 #include <NdbOut.hpp>
 
-#undef HAVE_PTHREAD_MUTEX_RECURSIVE
-#ifdef __linux
-#define HAVE_PTHREAD_MUTEX_RECURSIVE
-#endif
-
 /*
- * Recursive mutex with a recursion limit >= 1.  Can be useful for
- * debugging.  If a recursive mutex is not wanted, one must rewrite
- * caller code until limit 1 works.
+ * Recursive mutex with recursion limit >= 1.  Intended for debugging.
+ * One should rewrite caller code until limit 1 works.
  *
- * Implementation for limit > 1 uses a real OS recursive mutex.  Should
- * work on linux and solaris 10.  There is a unit test testSafeMutex.
+ * The implementation uses a default mutex.  If limit is > 1 or debug
+ * is specified then a recursive mutex is simulated.  Operating system
+ * recursive mutex (if any) is not used.  The simulation is several
+ * times slower.  There is a unit test testSafeMutex.
  *
  * The caller currently is multi-threaded disk data.  Here it is easy
  * to verify that the mutex is released within a time-slice.
  */
 
 class SafeMutex {
+  const char* const m_name;
+  const Uint32 m_limit; // error if usage exceeds this
+  const bool m_debug;   // use recursive implementation even for limit 1
+  const bool m_simple;
   pthread_mutex_t m_mutex;
+  pthread_cond_t m_cond;
   pthread_t m_owner;
-  bool m_init;
+  bool m_initdone;
   Uint32 m_level;
   Uint32 m_usage;       // max level used so far
-  const Uint32 m_limit; // error if usage exceeds this
-  const bool m_debug;   // use recursive mutex even for limit 1
+  int m_errcode;
+  int m_errline;
+  int err(int errcode, int errline);
   friend class NdbOut& operator<<(NdbOut&, const SafeMutex&);
 
 public:
-  SafeMutex(Uint32 limit, bool debug) :
+  SafeMutex(const char* name, Uint32 limit, bool debug) :
+    m_name(name),
     m_limit(limit),
-    m_debug(debug)
+    m_debug(debug),
+    m_simple(!(limit > 1 || debug))
   {
     assert(m_limit >= 1),
     m_owner = 0;        // wl4391_todo assuming numeric non-zero
-    m_init = false;
+    m_initdone = false;
     m_level = 0;
     m_usage = 0;
+    m_errcode = 0;
+    m_errline = 0;
   };
   ~SafeMutex() {
-    (void)destroy();
+    if (m_initdone)
+      (void)destroy();
   }
 
   enum {
-    // caller must crash on any error
-    ErrUnsupp = -101,   // limit > 1 or debug, and not supported by OS
-    ErrState = -102,    // user error
-    ErrLevel = -103,    // level exceeded limit
-    ErrOwner1 = -104,   // owner not 0 at first lock (OS error)
-    ErrOwner2 = -105,   // owner not self at recursive lock (OS error)
-    ErrOwner3 = -106    // owner not self at unlock (OS error)
+    // caller must crash on any error - recovery is not possible
+    ErrState = -101,    // user error
+    ErrLevel = -102,    // level exceeded limit
+    ErrOwner = -103,    // unlock when not owner
+    ErrNolock = -104    // unlock when no lock
   };
   int create();
   int destroy();
   int lock();
   int unlock();
+
+private:
+  int lock_impl();
+  int unlock_impl();
 };
 
 #endif

=== modified file 'storage/ndb/src/kernel/vm/SimulatedBlock.hpp'
--- a/storage/ndb/src/kernel/vm/SimulatedBlock.hpp	2008-11-19 11:01:17 +0000
+++ b/storage/ndb/src/kernel/vm/SimulatedBlock.hpp	2008-12-01 18:05:11 +0000
@@ -544,6 +544,7 @@ protected:
   BlockReference calcInstanceBlockRef(BlockNumber aBlock);
 
   // matching instance on another node e.g. LQH-LQH
+  // valid only if receiver has same number of workers
   BlockReference calcInstanceBlockRef(BlockNumber aBlock, NodeId aNode);
 
   /** 

=== modified file 'storage/ndb/src/kernel/vm/ndbd_malloc_impl.hpp'
--- a/storage/ndb/src/kernel/vm/ndbd_malloc_impl.hpp	2008-02-08 14:35:31 +0000
+++ b/storage/ndb/src/kernel/vm/ndbd_malloc_impl.hpp	2008-12-02 13:10:49 +0000
@@ -84,7 +84,7 @@ public:
 private:
   void grow(Uint32 start, Uint32 cnt);
 
-#define XX_RL_COUNT 5
+#define XX_RL_COUNT 6
   /**
    * Return pointer to free page data on page
    */

=== modified file 'storage/ndb/src/mgmapi/LocalConfig.cpp'
--- a/storage/ndb/src/mgmapi/LocalConfig.cpp	2008-11-18 16:33:59 +0000
+++ b/storage/ndb/src/mgmapi/LocalConfig.cpp	2008-12-02 14:25:58 +0000
@@ -275,7 +275,8 @@ LocalConfig::parseString(const char * co
     err.assfmt("Unexpected entry: \"%s\"", tok);
     return false;
   }
-
+  bind_address_port= 0;
+  bind_address.assign("");
   return true;
 }
 
@@ -343,6 +344,15 @@ char *
 LocalConfig::makeConnectString(char *buf, int sz)
 {
   int p= BaseString::snprintf(buf,sz,"nodeid=%d", _ownNodeId);
+  if (p < sz && bind_address.length())
+  {
+    int new_p= p+BaseString::snprintf(buf+p,sz-p,",bind-address=%s:%d",
+                                      bind_address.c_str(), bind_address_port);
+    if (new_p < sz)
+      p= new_p;
+    else 
+      buf[p]= 0;
+  }
   if (p < sz)
     for (unsigned i = 0; i < ids.size(); i++)
     {
@@ -356,6 +366,18 @@ LocalConfig::makeConnectString(char *buf
       {
 	buf[p]= 0;
 	break;
+      }
+      if (!bind_address.length() && ids[i].bind_address.length())
+      {
+        new_p= p+BaseString::snprintf(buf+p,sz-p,",bind-address=%s:%d",
+                                      ids[i].bind_address.c_str(), ids[i].bind_address_port);
+        if (new_p < sz)
+          p= new_p;
+        else 
+        {
+            buf[p]= 0;
+            break;
+        }
       }
     }
   buf[sz-1]=0;

=== modified file 'storage/ndb/src/mgmapi/mgmapi.cpp'
--- a/storage/ndb/src/mgmapi/mgmapi.cpp	2008-11-19 14:47:19 +0000
+++ b/storage/ndb/src/mgmapi/mgmapi.cpp	2008-12-02 14:25:58 +0000
@@ -238,6 +238,8 @@ ndb_mgm_set_connectstring(NdbMgmHandle h
     DBUG_RETURN(-1);
   }
   handle->cfg_i= -1;
+  handle->cfg.bind_address_port= handle->m_bindaddress_port;
+  handle->cfg.bind_address.assign(handle->m_bindaddress ? handle->m_bindaddress : "");
   DBUG_RETURN(0);
 }
 
@@ -265,6 +267,11 @@ ndb_mgm_set_bindaddress(NdbMgmHandle han
   {
     handle->m_bindaddress = 0;
     handle->m_bindaddress_port = 0;
+  }
+  if (handle->cfg.ids.size() != 0)
+  {
+    handle->cfg.bind_address_port= handle->m_bindaddress_port;
+    handle->cfg.bind_address.assign(handle->m_bindaddress ? handle->m_bindaddress : "");
   }
   DBUG_RETURN(0);
 }

=== modified file 'storage/ndb/src/mgmclient/CommandInterpreter.cpp'
--- a/storage/ndb/src/mgmclient/CommandInterpreter.cpp	2008-11-19 10:34:01 +0000
+++ b/storage/ndb/src/mgmclient/CommandInterpreter.cpp	2008-12-03 14:14:38 +0000
@@ -2289,10 +2289,10 @@ print_status(const ndb_mgm_node_state * 
          << ": " << status_string(state->node_status);
   switch(state->node_status){
   case NDB_MGM_NODE_STATUS_STARTING:
-    ndbout << " (Phase " << state->start_phase << ")";
+    ndbout << " (Last completed phase " << state->start_phase << ")";
     break;
   case NDB_MGM_NODE_STATUS_SHUTTING_DOWN:
-    ndbout << " (Phase " << state->start_phase << ")";
+    ndbout << " (Last completed phase " << state->start_phase << ")";
     break;
   default:
     break;

=== modified file 'storage/ndb/src/mgmsrv/ConfigInfo.cpp'
--- a/storage/ndb/src/mgmsrv/ConfigInfo.cpp	2008-11-19 11:01:17 +0000
+++ b/storage/ndb/src/mgmsrv/ConfigInfo.cpp	2008-12-02 13:10:49 +0000
@@ -1074,6 +1074,18 @@ const ConfigInfo::ParamInfo ConfigInfo::
     0, 0 },
 
   {
+    CFG_DB_THREAD_POOL,
+    "ThreadPool",
+    DB_TOKEN,
+    "No of unbound threads for file access (currently only for DD)",
+    ConfigInfo::CI_USED,
+    false,
+    ConfigInfo::CI_INT,
+    "8",
+    "0",  
+    STR_VALUE(MAX_INT_RNIL) },
+
+  {
     CFG_DB_MAX_OPEN_FILES,
     "MaxNoOfOpenFiles",
     DB_TOKEN,

=== modified file 'storage/ndb/test/ndbapi/testDict.cpp'
--- a/storage/ndb/test/ndbapi/testDict.cpp	2008-10-31 14:19:25 +0000
+++ b/storage/ndb/test/ndbapi/testDict.cpp	2008-12-02 12:05:54 +0000
@@ -1243,86 +1243,6 @@ int runGetPrimaryKey(NDBT_Context* ctx, 
   return result;
 }
 
-struct ErrorCodes { int error_id; bool crash;};
-ErrorCodes
-NF_codes[] = {
-  {6003, true}
-  ,{6004, true}
-  //,6005, true,
-  //{7173, false}
-};
-
-int
-runNF1(NDBT_Context* ctx, NDBT_Step* step){
-  NdbRestarter restarter;
-  if(restarter.getNumDbNodes() < 2)
-    return NDBT_OK;
-
-  myRandom48Init((long)NdbTick_CurrentMillisecond());
-  
-  Ndb* pNdb = GETNDB(step);
-  const NdbDictionary::Table* pTab = ctx->getTab();
-
-  NdbDictionary::Dictionary* dict = pNdb->getDictionary();
-  dict->dropTable(pTab->getName());
-
-  int result = NDBT_OK;
-
-  const int loops = ctx->getNumLoops();
-  for (int l = 0; l < loops && result == NDBT_OK ; l++){
-    const int sz = sizeof(NF_codes)/sizeof(NF_codes[0]);
-    for(int i = 0; i<sz; i++){
-      int rand = myRandom48(restarter.getNumDbNodes());
-      int nodeId = restarter.getRandomNotMasterNodeId(rand);
-      struct ErrorCodes err_struct = NF_codes[i];
-      int error = err_struct.error_id;
-      bool crash = err_struct.crash;
-      
-      g_info << "NF1: node = " << nodeId << " error code = " << error << endl;
-      
-      int val2[] = { DumpStateOrd::CmvmiSetRestartOnErrorInsert, 3};
-      
-      CHECK2(restarter.dumpStateOneNode(nodeId, val2, 2) == 0,
-	     "failed to set RestartOnErrorInsert");
-
-      CHECK2(restarter.insertErrorInNode(nodeId, error) == 0,
-	     "failed to set error insert");
-      
-      CHECK2(dict->createTable(* pTab) == 0,
-	     "failed to create table");
-      
-      if (crash) {
-        CHECK2(restarter.waitNodesNoStart(&nodeId, 1) == 0,
-	    "waitNodesNoStart failed");
-
-        if(myRandom48(100) > 50){
-  	  CHECK2(restarter.startNodes(&nodeId, 1) == 0,
-	       "failed to start node");
-          
-	  CHECK2(restarter.waitClusterStarted() == 0,
-	       "waitClusterStarted failed");
-
-  	  CHECK2(dict->dropTable(pTab->getName()) == 0,
-	       "drop table failed");
-        } else {
-	  CHECK2(dict->dropTable(pTab->getName()) == 0,
-	       "drop table failed");
-	
-	  CHECK2(restarter.startNodes(&nodeId, 1) == 0,
-	       "failed to start node");
-          
-	  CHECK2(restarter.waitClusterStarted() == 0,
-	       "waitClusterStarted failed");
-        }
-      }
-    }
-  }
- end:  
-  dict->dropTable(pTab->getName());
-  
-  return result;
-}
-  
 #define APIERROR(error) \
   { g_err << "Error in " << __FILE__ << ", line:" << __LINE__ << ", code:" \
               << error.code << ", msg: " << error.message << "." << endl; \
@@ -1468,102 +1388,6 @@ runTableRename(NDBT_Context* ctx, NDBT_S
 }
 
 int
-runTableRenameNF(NDBT_Context* ctx, NDBT_Step* step){
-  NdbRestarter restarter;
-  if(restarter.getNumDbNodes() < 2)
-    return NDBT_OK;
-
-  int result = NDBT_OK;
-
-  Ndb* pNdb = GETNDB(step);
-  NdbDictionary::Dictionary* dict = pNdb->getDictionary();
-  int records = ctx->getNumRecords();
-  const int loops = ctx->getNumLoops();
-
-  ndbout << "|- " << ctx->getTab()->getName() << endl;  
-
-  for (int l = 0; l < loops && result == NDBT_OK ; l++){
-    const NdbDictionary::Table* pTab = ctx->getTab();
-
-    // Try to create table in db
-    if (pTab->createTableInDb(pNdb) != 0){
-      return NDBT_FAILED;
-    }
-    
-    // Verify that table is in db     
-    const NdbDictionary::Table* pTab2 = 
-      NDBT_Table::discoverTableFromDb(pNdb, pTab->getName());
-    if (pTab2 == NULL){
-      ndbout << pTab->getName() << " was not found in DB"<< endl;
-      return NDBT_FAILED;
-    }
-    ctx->setTab(pTab2);
-
-    // Load table
-    HugoTransactions hugoTrans(*ctx->getTab());
-    if (hugoTrans.loadTable(pNdb, records) != 0){
-      return NDBT_FAILED;
-    }
-
-    BaseString pTabName(pTab->getName());
-    BaseString pTabNewName(pTabName);
-    pTabNewName.append("xx");
-    
-    const NdbDictionary::Table * oldTable = dict->getTable(pTabName.c_str());
-    if (oldTable) {
-      NdbDictionary::Table newTable = *oldTable;
-      newTable.setName(pTabNewName.c_str());
-      CHECK2(dict->alterTable(*oldTable, newTable) == 0,
-	     "TableRename failed");
-    }
-    else {
-      result = NDBT_FAILED;
-    }
-    
-    // Restart one node at a time
-    
-    /**
-     * Need to run LCP at high rate otherwise
-     * packed replicas become "to many"
-     */
-    int val = DumpStateOrd::DihMinTimeBetweenLCP;
-    if(restarter.dumpStateAllNodes(&val, 1) != 0){
-      do { CHECK(0); } while(0);
-      g_err << "Failed to set LCP to min value" << endl;
-      return NDBT_FAILED;
-    }
-    
-    const int numNodes = restarter.getNumDbNodes();
-    for(int i = 0; i<numNodes; i++){
-      int nodeId = restarter.getDbNodeId(i);
-      int error = NF_codes[i].error_id;
-
-      g_info << "NF1: node = " << nodeId << " error code = " << error << endl;
-
-      CHECK2(restarter.restartOneDbNode(nodeId) == 0,
-	     "failed to set restartOneDbNode");
-
-      CHECK2(restarter.waitClusterStarted() == 0,
-	     "waitClusterStarted failed");
-
-    }
-
-    // Verify table contents
-    NdbDictionary::Table pNewTab(pTabNewName.c_str());
-    
-    UtilTransactions utilTrans(pNewTab);
-    if (utilTrans.clearTable(pNdb,  records) != 0){
-      continue;
-    }    
-
-    // Drop table
-    dict->dropTable(pTabNewName.c_str());
-  }
- end:    
-  return result;
-}
-
-int
 runTableRenameSR(NDBT_Context* ctx, NDBT_Step* step){
   NdbRestarter restarter;
   if(restarter.getNumDbNodes() < 2)
@@ -6675,17 +6499,24 @@ runFailAddPartition(NDBT_Context* ctx, N
   NdbDictionary::Table altered = * org;
   altered.setFragmentCount(org->getFragmentCount() + 2);
 
-  NdbDictionary::HashMap hm;
-  pDic->initDefaultHashMap(hm, altered.getFragmentCount());
-  if (pDic->getHashMap(hm, hm.getName()) == -1)
+  if (pDic->beginSchemaTrans())
   {
-    if (pDic->createHashMap(hm) != 0)
-    {
-      ndbout << "Failed to create hashmap: " << pDic->getNdbError() << endl;
-      return NDBT_FAILED;
-    }
+    ndbout << "Failed to beginSchemaTrans()" << pDic->getNdbError() << endl;
+    return NDBT_FAILED;
+  }
+
+  if (pDic->prepareHashMap(*org, altered) == -1)
+  {
+    ndbout << "Failed to create hashmap: " << pDic->getNdbError() << endl;
+    return NDBT_FAILED;
   }
 
+  if (pDic->endSchemaTrans())
+  {
+    ndbout << "Failed to endSchemaTrans()" << pDic->getNdbError() << endl;
+    return NDBT_FAILED;
+  }
+  
   int dump1 = DumpStateOrd::SchemaResourceSnapshot;
   int dump2 = DumpStateOrd::SchemaResourceCheckLeak;
 
@@ -6702,9 +6533,16 @@ runFailAddPartition(NDBT_Context* ctx, N
              "failed to set error insert");
       CHECK(restarter.dumpStateAllNodes(&dump1, 1) == 0);
 
-      CHECK2(pDic->alterTable(*org, altered) != 0,
+      int res = pDic->alterTable(*org, altered);
+      if (res)
+      {
+        ndbout << pDic->getNdbError() << endl;
+      }
+      CHECK2(res != 0,
              "failed to fail after error insert " << errval);
       CHECK(restarter.dumpStateAllNodes(&dump2, 1) == 0);
+      CHECK2(restarter.insertErrorInNode(nodeId, 0) == 0,
+             "failed to clear error insert");
 
       const NdbDictionary::Table* check = pDic->getTable(tab.getName());
 
@@ -6926,17 +6764,9 @@ TESTCASE("StoreFrmError", 
 	 "Test that a frm file with too long length can't be stored."){
   INITIALIZER(runStoreFrmError);
 }
-TESTCASE("NF1", 
-	 "Test that create table can handle NF (not master)"){
-  INITIALIZER(runNF1);
-}
 TESTCASE("TableRename",
 	 "Test basic table rename"){
   INITIALIZER(runTableRename);
-}
-TESTCASE("TableRenameNF",
-	 "Test that table rename can handle node failure"){
-  INITIALIZER(runTableRenameNF);
 }
 TESTCASE("TableRenameSR",
 	 "Test that table rename can handle system restart"){

=== modified file 'storage/ndb/test/ndbapi/testNodeRestart.cpp'
--- a/storage/ndb/test/ndbapi/testNodeRestart.cpp	2008-11-03 08:38:27 +0000
+++ b/storage/ndb/test/ndbapi/testNodeRestart.cpp	2008-12-03 19:51:33 +0000
@@ -292,6 +292,9 @@ int runRestarter(NDBT_Context* ctx, NDBT
   int result = NDBT_OK;
   int loops = ctx->getNumLoops();
   int sync_threads = ctx->getProperty("SyncThreads", (unsigned)0);
+  int sleep0 = ctx->getProperty("Sleep0", (unsigned)0);
+  int sleep1 = ctx->getProperty("Sleep1", (unsigned)0);
+  int randnode = ctx->getProperty("RandNode", (unsigned)0);
   NdbRestarter restarter;
   int i = 0;
   int lastId = 0;
@@ -310,6 +313,10 @@ int runRestarter(NDBT_Context* ctx, NDBT
   while(i<loops && result != NDBT_FAILED && !ctx->isTestStopped()){
 
     int id = lastId % restarter.getNumDbNodes();
+    if (randnode == 1)
+    {
+      id = rand() % restarter.getNumDbNodes();
+    }
     int nodeId = restarter.getDbNodeId(id);
     ndbout << "Restart node " << nodeId << endl; 
     if(restarter.restartOneDbNode(nodeId, false, true, true) != 0){
@@ -325,6 +332,9 @@ int runRestarter(NDBT_Context* ctx, NDBT
       break;
     }
 
+    if (sleep1)
+      NdbSleep_MilliSleep(sleep1);
+
     if (restarter.startNodes(&nodeId, 1))
     {
       g_err << "Failed to start node" << endl;
@@ -338,6 +348,9 @@ int runRestarter(NDBT_Context* ctx, NDBT
       break;
     }
 
+    if (sleep0)
+      NdbSleep_MilliSleep(sleep0);
+
     ctx->sync_up_and_wait("PauseThreads", sync_threads);
 
     lastId++;
@@ -3243,6 +3256,65 @@ loop2:
   return NDBT_OK;
 }
 
+int 
+runHammer(NDBT_Context* ctx, NDBT_Step* step)
+{ 
+  int result = NDBT_OK;
+  int records = ctx->getNumRecords();
+  Ndb* pNdb = GETNDB(step);
+  HugoOperations hugoOps(*ctx->getTab());
+  while (!ctx->isTestStopped())
+  {
+    int r = rand() % records;
+    if (hugoOps.startTransaction(pNdb) != 0)
+      goto err;
+    
+    if ((rand() % 100) < 50)
+    {
+      if (hugoOps.pkUpdateRecord(pNdb, r, 1, rand()) != 0)
+        goto err;
+    }
+    else
+    {
+      if (hugoOps.pkWriteRecord(pNdb, r, 1, rand()) != 0)
+        goto err;
+    }
+    
+    if (hugoOps.execute_NoCommit(pNdb) != 0)
+      goto err;
+    
+    if (hugoOps.pkDeleteRecord(pNdb, r, 1) != 0)
+      goto err;
+    
+    if (hugoOps.execute_NoCommit(pNdb) != 0)
+      goto err;
+    
+    if ((rand() % 100) < 50)
+    {
+      if (hugoOps.pkInsertRecord(pNdb, r, 1, rand()) != 0)
+        goto err;
+    }
+    else
+    {
+      if (hugoOps.pkWriteRecord(pNdb, r, 1, rand()) != 0)
+        goto err;
+    }
+    
+    if ((rand() % 100) < 90)
+    {
+      hugoOps.execute_Commit(pNdb);
+    }
+    else
+    {
+  err:
+      hugoOps.execute_Rollback(pNdb);
+    }
+    
+    hugoOps.closeTransaction(pNdb);
+  }
+  return NDBT_OK;
+}
+
 NDBT_TESTSUITE(testNodeRestart);
 TESTCASE("NoLoad", 
 	 "Test that one node at a time can be stopped and then restarted "\
@@ -3695,6 +3767,15 @@ TESTCASE("Bug36276", ""){
 TESTCASE("Bug36245", ""){
   INITIALIZER(runLoadTable);
   STEP(runBug36245);
+  VERIFIER(runClearTable);
+}
+TESTCASE("NF_Hammer", ""){
+  TC_PROPERTY("Sleep0", 9000);
+  TC_PROPERTY("Sleep1", 3000);
+  TC_PROPERTY("Rand", 1);
+  INITIALIZER(runLoadTable);
+  STEPS(runHammer, 25);
+  STEP(runRestarter);
   VERIFIER(runClearTable);
 }
 NDBT_TESTSUITE_END(testNodeRestart);

=== modified file 'storage/ndb/test/run-test/command.cpp'
--- a/storage/ndb/test/run-test/command.cpp	2008-02-21 13:57:42 +0000
+++ b/storage/ndb/test/run-test/command.cpp	2008-11-27 18:03:09 +0000
@@ -66,6 +66,9 @@ static
 bool
 do_change_version(atrt_config& config, SqlResultSet& command,
                   AtrtClient& atrtdb){
+  /**
+   * TODO make option to restart "not" initial
+   */
   uint process_id= command.columnAsInt("process_id");
   const char* process_args= command.column("process_args");
 

=== modified file 'storage/ndb/test/run-test/daily-basic-tests.txt'
--- a/storage/ndb/test/run-test/daily-basic-tests.txt	2008-11-08 21:43:03 +0000
+++ b/storage/ndb/test/run-test/daily-basic-tests.txt	2008-12-03 20:14:25 +0000
@@ -321,7 +321,7 @@ max-time: 500
 cmd: testScan
 args: -n ScanRead488Timeout -l 10 T6 D1 D2
 
-max-time: 600
+max-time: 1200
 cmd: testScan
 args: -n ScanRead40 -l 100 T6 D1 D2 
 
@@ -337,10 +337,6 @@ max-time: 1800
 cmd: testScan
 args: -n ScanRead40RandomTable -l 100 T1 
 
-max-time: 3600
-cmd: testScan
-args: -n ScanRead40RandomTable -l 1000 T6 
-
 max-time: 500
 cmd: testScan
 args: -n ScanWithLocksAndInserts T6 D1 D2
@@ -954,10 +950,6 @@ cmd: DbAsyncGenerator
 args: -time 60 -p 1 -proc 25
 type: bench
 
-max-time: 300 
-cmd: testMgm
-args: 
-
 max-time: 5000
 cmd: testNodeRestart
 args: -n GCP T1
@@ -1157,7 +1149,6 @@ cmd: testDict
 args: -n FailAddPartition T1 I3
 
 # EOF 2008-06-05
-
 # Test data buffering for TCKEYREQ
 max-time: 500
 cmd: testLimits
@@ -1178,28 +1169,28 @@ cmd: testBasic
 args: -n PkUpdate WIDE_MAXKEY_HUGO WIDE_MAXATTR_HUGO WIDE_MAXKEYMAXCOLS_HUGO WIDE_MINKEYMAXCOLS_HUGO
 
 # EOF 2008-06-30
-
 max-time: 500
 cmd: test_event
 args -n bug37672 T1
 
 #EOF 2008-07-04
-
 max-time: 500
 cmd: testScanFilter
 args: 
 
 #EOF 2008-07-09
-
 max-time: 600
 cmd: test_event
 args -r 5000 -n Bug30780 T1
 
 #EOF 2008-08-11
-
 # Test data buffering for SCANTABREQ
 max-time: 500
 cmd: testLimits
 args: -n ExhaustSegmentedSectionScan WIDE_2COL
 
 #EOF 2008-08-20
+max-time: 300 
+cmd: testMgm
+args: 
+

=== modified file 'storage/ndb/test/run-test/daily-devel-tests.txt'
--- a/storage/ndb/test/run-test/daily-devel-tests.txt	2008-08-30 05:26:09 +0000
+++ b/storage/ndb/test/run-test/daily-devel-tests.txt	2008-12-03 19:51:33 +0000
@@ -133,6 +133,10 @@ max-time: 2500
 cmd: testNodeRestart
 args: -n FiftyPercentStopAndWait T6 T13 
 
+max-time: 2500
+cmd: testNodeRestart
+args: -n NF_Hammer -r 5 T1
+
 #max-time: 500
 #cmd: testNodeRestart
 #args: -n StopOnError T1 
@@ -181,10 +185,6 @@ args: -l 2 -n SR1_O T6 T13 
 max-time: 500
 cmd: testIndex
 args: -n MixedTransaction T1 
-
-max-time: 2500
-cmd: testDict
-args: -n NF1 T1 T6 T13 
 
 #
 max-time: 1500

=== modified file 'storage/ndb/test/run-test/setup.cpp'
--- a/storage/ndb/test/run-test/setup.cpp	2008-11-03 12:33:34 +0000
+++ b/storage/ndb/test/run-test/setup.cpp	2008-11-27 19:42:21 +0000
@@ -317,7 +317,7 @@ load_process(atrt_config& config, atrt_c
 			      proc.m_host->m_basedir.c_str());
     proc.m_proc.m_args.appfmt(" --defaults-group-suffix=%s",
 			      cluster.m_name.c_str());
-    proc.m_proc.m_args.append(" --nodaemon -n");
+    proc.m_proc.m_args.append(" --nodaemon --initial -n");
     if (g_fix_nodeid)
       proc.m_proc.m_args.appfmt(" --ndb-nodeid=%d", proc.m_nodeid);
     proc.m_proc.m_cwd.assfmt("%sndbd.%d", dir.c_str(), proc.m_index);

Thread
bzr commit into mysql-5.1 branch (tomas.ulin:3137) Tomas Ulin5 Dec