MySQL Lists are EOL. Please join:

List:Commits« Previous MessageNext Message »
From:Stewart Smith Date:October 9 2008 6:28am
Subject:bzr push into mysql-5.1 branch (stewart:2986 to 2991)
View as plain text  
 2991 Stewart Smith	2008-10-09
       fix use of non-portable gettimeofday so that Win32 builds again
modified:
  sql/ha_ndbcluster_lock_ext.h

 2990 Stewart Smith	2008-10-09
       disable some spurious warnings on MSVC for NDB
modified:
  storage/ndb/CMakeLists.txt

 2989 Stewart Smith	2008-10-09
       add missing files for longsignal support to Cmake (fix build on win32)
modified:
  storage/ndb/src/kernel/vm/CMakeLists.txt

 2988 Stewart Smith	2008-10-09
       use correct cast (is bitmask for < 32bits so casting to Uint32 is okay)
modified:
  storage/ndb/src/ndbapi/NdbEventOperationImpl.hpp

 2987 Stewart Smith	2008-10-09 [merge]
      merge mainline
added:
  storage/ndb/src/kernel/vm/LongSignal.cpp
  storage/ndb/src/kernel/vm/LongSignalImpl.hpp
  storage/ndb/src/kernel/vm/LongSignal_mt.cpp
  storage/ndb/src/kernel/vm/LongSignal_nonmt.cpp
  storage/ndb/src/kernel/vm/SimplePropertiesSection_mt.cpp
  storage/ndb/src/kernel/vm/SimplePropertiesSection_nonmt.cpp
  storage/ndb/src/kernel/vm/mt-asm.h
modified:
  config/ac-macros/ha_ndbcluster.m4
  configure.in
  storage/ndb/include/util/SimpleProperties.hpp
  storage/ndb/src/kernel/blocks/dbdict/Dbdict.cpp
  storage/ndb/src/kernel/blocks/dbinfo/ndbinfo_helpers.cpp
  storage/ndb/src/kernel/vm/ArrayPool.hpp
  storage/ndb/src/kernel/vm/LongSignal.hpp
  storage/ndb/src/kernel/vm/Makefile.am
  storage/ndb/src/kernel/vm/SimplePropertiesSection.cpp
  storage/ndb/src/kernel/vm/SimulatedBlock.cpp
  storage/ndb/src/kernel/vm/SimulatedBlock.hpp
  storage/ndb/src/kernel/vm/TransporterCallback.cpp
  storage/ndb/src/kernel/vm/mt.cpp
  storage/ndb/src/kernel/vm/mt.hpp

 2986 Stewart Smith	2008-10-09
       use explicit casts for char and uint32
modified:
  storage/ndb/src/ndbapi/NdbDictionaryImpl.cpp

=== modified file 'config/ac-macros/ha_ndbcluster.m4'
--- a/config/ac-macros/ha_ndbcluster.m4	2008-04-29 08:09:52 +0000
+++ b/config/ac-macros/ha_ndbcluster.m4	2008-10-08 20:24:20 +0000
@@ -269,7 +269,7 @@ AC_DEFUN([MYSQL_SETUP_NDBCLUSTER], [
   
   if test X"$ndb_mtd" = Xyes
   then
-    if test X"$have_ndbmtd_x86" = Xyes
+    if test X"$have_ndbmtd_asm" = Xyes
     then
       build_ndbmtd=yes
       AC_MSG_RESULT([Including ndbmtd])

=== modified file 'configure.in'
--- a/configure.in	2008-10-07 06:57:14 +0000
+++ b/configure.in	2008-10-08 20:24:20 +0000
@@ -2413,24 +2413,34 @@ AC_TRY_LINK(
   AC_DEFINE(HAVE_LINUX_FUTEX, [1], [Linux futex support]),
   AC_MSG_RESULT(no))
 
-#checking for asm-needed for ndbmtd
-AC_MSG_CHECKING(for x86-assembler needed for ndbmtd)
-AC_LANG_SAVE
-AC_LANG_CPLUSPLUS
-AC_TRY_LINK(
-  [],
-  [
-    unsigned int a = 0, *ap = &a;
-    asm volatile("xchg %0, %1;" : "+r" (a) , "+m" (*ap));
-    asm volatile("mfence":::"memory");
-    asm volatile("" ::: "memory");
-    asm volatile ("rep;nop");
-    return a;
-  ],
-  have_ndbmtd_x86=yes
-  AC_MSG_RESULT(yes)
-  AC_DEFINE(NDBMTD_X86, [1], [ndbmtd x86 support]),
-  AC_MSG_RESULT(no))
+#checking for asm needed for ndbmtd
+AC_CACHE_CHECK([assembler needed for ndbmtd],
+               [have_ndbmtd_asm], [AC_TRY_RUN([
+    #include "storage/ndb/src/kernel/vm/mt-asm.h"
+    int main()
+    {
+      unsigned int a = 0;
+      volatile unsigned int *ap = (volatile unsigned int*)&a;
+      #ifdef NDB_HAVE_XCNG
+      a = xcng(ap, 1);
+      cpu_pause();
+      #endif
+      mb();
+      * ap = 2;
+      rmb();
+      * ap = 1;
+      wmb();
+      * ap = 0;
+      read_barrier_depends();
+      return a;
+   }
+], [have_ndbmtd_asm=yes],
+   [have_ndbmtd_asm=no],
+   [have_ndbmtd_asm=no])])
+if test "x$have_ndbmtd_asm" = xyes; then
+  AC_DEFINE(HAVE_NDBMTD_ASM, 1,
+            [Define to 1 if assmbler needed to run ndbmtd is present.])
+fi
 
 AC_CACHE_CHECK([support for weak symbols], mysql_cv_weak_symbol,
 [AC_TRY_LINK([],[

=== modified file 'sql/ha_ndbcluster_lock_ext.h'
--- a/sql/ha_ndbcluster_lock_ext.h	2008-10-07 08:20:50 +0000
+++ b/sql/ha_ndbcluster_lock_ext.h	2008-10-09 09:48:11 +0000
@@ -14,6 +14,8 @@
   Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
 */
 
+#include <NdbTick.h>
+
 /*
   These functions are shared with ndb_restore so that the creating of
   tables through ndb_restore is syncronized correctly with the mysqld's
@@ -36,12 +38,12 @@ ndbcluster_global_schema_lock_ext(THD *t
   NdbOperation *op;
   NdbTransaction *trans= NULL;
   int retry_sleep= 50; /* 50 milliseconds, transaction */
-  struct timeval time_end;
+  NDB_TICKS time_end;
 
   if (retry_time > 0)
   {
-    gettimeofday(&time_end, 0);
-    time_end.tv_sec+= retry_time;
+    time_end= NdbTick_CurrentMillisecond();
+    time_end+= retry_time*1000;
   }
   while (1)
   {
@@ -79,11 +81,8 @@ ndbcluster_global_schema_lock_ext(THD *t
       goto error_handler;
     if (retry_time > 0)
     {
-      struct timeval time_now;
-      gettimeofday(&time_now, 0);
-      if ((time_end.tv_sec < time_now.tv_sec) ||
-          (time_end.tv_sec == time_now.tv_sec && time_end.tv_usec < time_now.tv_usec))
-        goto error_handler;
+	  if(time_end < NdbTick_CurrentMillisecond())
+	    goto error_handler;
     }
     if (trans)
     {

=== modified file 'storage/ndb/CMakeLists.txt'
--- a/storage/ndb/CMakeLists.txt	2008-08-20 13:22:09 +0000
+++ b/storage/ndb/CMakeLists.txt	2008-10-09 09:40:16 +0000
@@ -15,6 +15,14 @@
 
 SET(CMAKE_CXX_FLAGS_DEBUG "${CMAKE_CXX_FLAGS_DEBUG} -DVM_TRACE")
 SET(CMAKE_C_FLAGS_DEBUG "${CMAKE_CXX_FLAGS_DEBUG} -DVM_TRACE")
+
+# Disable some warnings for NDB build:
+# 4200: non-standard extension used: zero length array
+# 4355: 'this': used in base member initializer list
+
+SET(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} /wd4355 /wd4200")
+SET(CMAKE_C_FLAGS   "${CMAKE_CXX_FLAGS} /wd4355 /wd4200")
+
 ADD_SUBDIRECTORY(include)
 ADD_SUBDIRECTORY(src)
 ADD_SUBDIRECTORY(tools)

=== modified file 'storage/ndb/include/util/SimpleProperties.hpp'
--- a/storage/ndb/include/util/SimpleProperties.hpp	2008-05-16 13:08:36 +0000
+++ b/storage/ndb/include/util/SimpleProperties.hpp	2008-10-08 19:09:05 +0000
@@ -276,7 +276,7 @@ Uint32 SimplePropertiesSectionReader::ge
  */
 class SimplePropertiesSectionWriter : public SimpleProperties::Writer {
 public:
-  SimplePropertiesSectionWriter(class SectionSegmentPool &);
+  SimplePropertiesSectionWriter(class SimulatedBlock & block);
   virtual ~SimplePropertiesSectionWriter();
 
   virtual bool reset();
@@ -294,7 +294,9 @@ private:
 
   Int32 m_pos;
   Uint32 m_sz;
+
   class SectionSegmentPool & m_pool;
+  class SimulatedBlock & m_block;
   struct SectionSegment * m_head;
   Uint32 m_prevPtrI; // Prev to m_currentSegment
   struct SectionSegment * m_currentSegment;

=== modified file 'storage/ndb/src/kernel/blocks/dbdict/Dbdict.cpp'
--- a/storage/ndb/src/kernel/blocks/dbdict/Dbdict.cpp	2008-10-08 13:58:58 +0000
+++ b/storage/ndb/src/kernel/blocks/dbdict/Dbdict.cpp	2008-10-09 01:13:03 +0000
@@ -4847,14 +4847,14 @@ Dbdict::createTable_release(SchemaOpPtr 
     jam();
     SegmentedSectionPtr ss_ptr;
     getSection(ss_ptr, createTabPtr.p->m_tabInfoPtrI);
-    ::release(ss_ptr);
+    SimulatedBlock::release(ss_ptr);
     createTabPtr.p->m_tabInfoPtrI = RNIL;
   }
   if (createTabPtr.p->m_fragmentsPtrI != RNIL) {
     jam();
     SegmentedSectionPtr ss_ptr;
     getSection(ss_ptr, createTabPtr.p->m_fragmentsPtrI);
-    ::release(ss_ptr);
+    SimulatedBlock::release(ss_ptr);
     createTabPtr.p->m_fragmentsPtrI = RNIL;
   }
   releaseOpRec<CreateTableRec>(op_ptr);
@@ -5111,7 +5111,7 @@ Dbdict::createTable_parse(Signal* signal
       Uint32 count = 2 + (1 + frag_data[0]) * frag_data[1];
 
       Ptr<SectionSegment> frag_ptr;
-      bool ok = ::import(frag_ptr, (Uint32*)frag_data, (count + 1) / 2);
+      bool ok = import(frag_ptr, (Uint32*)frag_data, (count + 1) / 2);
       ndbrequire(ok);
       createTabPtr.p->m_fragmentsPtrI = frag_ptr.i;
 
@@ -5121,7 +5121,7 @@ Dbdict::createTable_parse(Signal* signal
 
     // dump table record back into DictTabInfo
     {
-      SimplePropertiesSectionWriter w(getSectionSegmentPool());
+      SimplePropertiesSectionWriter w(* this);
       packTableIntoPages(w, tabPtr);
 
       SegmentedSectionPtr ss_ptr;
@@ -7336,7 +7336,7 @@ Dbdict::alterTable_parse(Signal* signal,
   {
     jam();
     releaseSections(handle);
-    SimplePropertiesSectionWriter w(getSectionSegmentPool());
+    SimplePropertiesSectionWriter w(* this);
     packTableIntoPages(w, alterTabPtr.p->m_newTablePtr);
 
     SegmentedSectionPtr tabInfoPtr;
@@ -7348,7 +7348,7 @@ Dbdict::alterTable_parse(Signal* signal,
     {
       jam();
       SegmentedSectionPtr ss_ptr;
-      ndbrequire(::import(ss_ptr, c_fragData_align32, (c_fragDataLen+1)/2));
+      ndbrequire(import(ss_ptr, c_fragData_align32, (c_fragDataLen+1)/2));
       handle.m_ptr[AlterTabReq::FRAGMENTATION] = ss_ptr;
       handle.m_cnt = 2;
     }
@@ -9406,8 +9406,8 @@ void Dbdict::sendLIST_TABLES_CONF(Signal
   ListTablesData ltd;
   const Uint32 listTablesDataSizeInWords = (sizeof(ListTablesData) + 3) / 4;
   char tname[MAX_TAB_NAME_SIZE];
-  SimplePropertiesSectionWriter tableDataWriter(getSectionSegmentPool());
-  SimplePropertiesSectionWriter tableNamesWriter(getSectionSegmentPool());
+  SimplePropertiesSectionWriter tableDataWriter(* this);
+  SimplePropertiesSectionWriter tableNamesWriter(* this);
 
   Uint32 count = 0;
   Uint32 fragId = rand();
@@ -20639,7 +20639,7 @@ Dbdict::copyOut(const OpSection& op_sec,
     return false;
   }
   Ptr<SectionSegment> ptr;
-  if (!::import(ptr, buf, op_sec.getSize())) {
+  if (!import(ptr, buf, op_sec.getSize())) {
     jam();
     return false;
   }
@@ -23888,7 +23888,7 @@ Dbdict::createHashMap_parse(Signal* sign
      */
     hm.HashMapBuckets *= sizeof(Uint16);
     SimpleProperties::UnpackStatus s;
-    SimplePropertiesSectionWriter w(getSectionSegmentPool());
+    SimplePropertiesSectionWriter w(* this);
     s = SimpleProperties::pack(w,
                                &hm,
                                DictHashMapInfo::Mapping,

=== modified file 'storage/ndb/src/kernel/blocks/dbinfo/ndbinfo_helpers.cpp'
--- a/storage/ndb/src/kernel/blocks/dbinfo/ndbinfo_helpers.cpp	2008-10-05 07:12:56 +0000
+++ b/storage/ndb/src/kernel/blocks/dbinfo/ndbinfo_helpers.cpp	2008-10-08 20:29:28 +0000
@@ -14,7 +14,7 @@ void dbinfo_write_row_init(struct dbinfo
 int dbinfo_write_row_column(struct dbinfo_row *r, const char* col, int clen)
 {
   AttributeHeader ah;
-  if(!((r->blen - r->endrow) >= (clen+ah.getHeaderSize()*sizeof(Uint32))))
+  if(!(int(r->blen - r->endrow) >= int(clen+ah.getHeaderSize()*sizeof(Uint32))))
   {
     return -1; // Not enough room.
   }

=== modified file 'storage/ndb/src/kernel/vm/ArrayPool.hpp'
--- a/storage/ndb/src/kernel/vm/ArrayPool.hpp	2008-08-22 11:02:38 +0000
+++ b/storage/ndb/src/kernel/vm/ArrayPool.hpp	2008-10-08 19:09:05 +0000
@@ -89,6 +89,8 @@ public:
    */
   bool seize(Ptr<T> &);
 
+  bool seizeList(Uint32 & n, Ptr<T> &);
+
   /**
    * Allocate object <b>i</b> from pool - update Ptr
    */
@@ -124,6 +126,34 @@ public:
   }
 #endif
 
+  /**
+   * Cache+LockFun is used to make thread-local caches for ndbmtd
+   *   I.e each thread has one cache-instance, and can seize/release on this
+   *       wo/ locks
+   */
+  struct Cache
+  {
+    Cache(Uint32 a0 = 128, Uint32 a1 = 128) { m_first_free = RNIL; m_free_cnt = 0; m_alloc_cnt = a0; m_max_free_cnt = a1; }
+    Uint32 m_first_free;
+    Uint32 m_free_cnt;
+    Uint32 m_alloc_cnt;
+    Uint32 m_max_free_cnt;
+  };
+
+  struct LockFun
+  {
+    void (* lock)(void);
+    void (* unlock)(void);
+  };
+
+  bool load(LockFun, Cache&, Uint32 n);
+  bool seize(LockFun, Cache&, Ptr<T> &);
+  void release(LockFun, Cache&, Uint32 i);
+  void release(LockFun, Cache&, Ptr<T> &);
+  void releaseList(LockFun, Cache&, Uint32 n, Uint32 first, Uint32 last);
+protected:
+  void releaseList(LockFun, Cache&, Uint32 n);
+
 protected:
   friend class Array<T>;
 
@@ -650,6 +680,29 @@ ArrayPool<T>::seize(Ptr<T> & ptr){
 template <class T>
 inline
 bool
+ArrayPool<T>::seizeList(Uint32 & cnt, Ptr<T> & ptr){
+  Uint32 save = cnt;
+  Uint32 tmp = save - 1;
+  if (seize(ptr))
+  {
+    Ptr<T> prev = ptr;
+    Ptr<T> curr;
+    while (tmp && seize(curr))
+    {
+      prev.p->nextPool = curr.i;
+      prev = curr;
+      tmp--;
+    }
+    prev.p->nextPool = RNIL;
+    cnt = save - tmp;
+    return true;
+  }
+  return false;
+}
+
+template <class T>
+inline
+bool
 ArrayPool<T>::seizeId(Ptr<T> & ptr, Uint32 i){
   Uint32 ff = firstFree;
   Uint32 prev = RNIL;
@@ -889,6 +942,111 @@ ArrayPool<T>::release(Ptr<T> & ptr){
   ErrorReporter::handleAssert("ArrayPool<T>::release", __FILE__, __LINE__);
 }
 
+#if 0
+#define DUMP(a,b) do { printf("%s c.m_first_free: %u c.m_free_cnt: %u %s", a, c.m_first_free, c.m_free_cnt, b); fflush(stdout); } while(0)
+#else
+#define DUMP(a,b)
+#endif
+
+template <class T>
+inline
+bool
+ArrayPool<T>::seize(LockFun l, Cache& c, Ptr<T> & p)
+{
+  DUMP("seize", "-> ");
+
+  Uint32 ff = c.m_first_free;
+  if (ff != RNIL)
+  {
+    c.m_first_free = theArray[ff].nextPool;
+    c.m_free_cnt--;
+    p.i = ff;
+    p.p = theArray + ff;
+    DUMP("LOCAL ", "\n");
+    return true;
+  }
+
+  Uint32 tmp = c.m_alloc_cnt;
+  l.lock();
+  seizeList(tmp, p);
+  l.unlock();
+
+  if (tmp)
+  {
+    c.m_first_free = theArray[p.i].nextPool;
+    c.m_free_cnt = tmp - 1;
+    DUMP("LOCKED", "\n");
+    return true;
+  }
+  return false;
+}
+
+template <class T>
+inline
+void
+ArrayPool<T>::release(LockFun l, Cache& c, Uint32 i)
+{
+  Ptr<T> tmp;
+  getPtr(tmp, i);
+  release(l, c, tmp);
+}
+
+template <class T>
+inline
+void
+ArrayPool<T>::release(LockFun l, Cache& c, Ptr<T> & p)
+{
+  p.p->nextPool = c.m_first_free;
+  c.m_first_free = p.i;
+  c.m_free_cnt ++;
+
+  if (c.m_free_cnt > 2 * c.m_max_free_cnt)
+  {
+    releaseList(l, c, c.m_max_free_cnt);
+  }
+}
+
+template <class T>
+inline
+void
+ArrayPool<T>::releaseList(LockFun l, Cache& c,
+                          Uint32 n, Uint32 first, Uint32 last)
+{
+  theArray[last].nextPool = c.m_first_free;
+  c.m_first_free = first;
+  c.m_free_cnt += n;
+
+  if (c.m_free_cnt > 2 * c.m_max_free_cnt)
+  {
+    releaseList(l, c, c.m_max_free_cnt);
+  }
+}
+
+template <class T>
+inline
+void
+ArrayPool<T>::releaseList(LockFun l, Cache& c, Uint32 n)
+{
+  DUMP("releaseListImpl", "-> ");
+  Uint32 ff = c.m_first_free;
+  Uint32 prev = ff;
+  Uint32 curr = ff;
+  Uint32 i;
+  for (i = 0; i < n && curr != RNIL; i++)
+  {
+    prev = curr;
+    curr = theArray[curr].nextPool;
+  }
+  c.m_first_free = curr;
+  c.m_free_cnt -= i;
+
+  DUMP("", "\n");
+
+  l.lock();
+  releaseList(i, ff, prev);
+  l.unlock();
+}
+
 template <class T>
 class UnsafeArrayPool : public ArrayPool<T> {
 public:

=== modified file 'storage/ndb/src/kernel/vm/CMakeLists.txt'
--- a/storage/ndb/src/kernel/vm/CMakeLists.txt	2008-08-27 02:45:19 +0000
+++ b/storage/ndb/src/kernel/vm/CMakeLists.txt	2008-10-09 09:34:24 +0000
@@ -43,4 +43,6 @@ ADD_LIBRARY(ndbsched STATIC
             TransporterCallback_nonmt.cpp
             SimulatedBlock_nonmt.cpp
 	    dummy_nonmt.cpp
+	    SimplePropertiesSection_nonmt.cpp
+	    LongSignal_nonmt.cpp
 )

=== added file 'storage/ndb/src/kernel/vm/LongSignal.cpp'
--- a/storage/ndb/src/kernel/vm/LongSignal.cpp	1970-01-01 00:00:00 +0000
+++ b/storage/ndb/src/kernel/vm/LongSignal.cpp	2008-10-08 19:09:05 +0000
@@ -0,0 +1,345 @@
+/* Copyright (C) 2003 MySQL AB
+
+   This program is free software; you can redistribute it and/or modify
+   it under the terms of the GNU General Public License as published by
+   the Free Software Foundation; version 2 of the License.
+
+   This program is distributed in the hope that it will be useful,
+   but WITHOUT ANY WARRANTY; without even the implied warranty of
+   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+   GNU General Public License for more details.
+
+   You should have received a copy of the GNU General Public License
+   along with this program; if not, write to the Free Software
+   Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA */
+
+#include "LongSignal.hpp"
+#include "LongSignalImpl.hpp"
+
+/**
+ * verifySection
+ * Assertion method to check that a segmented section is constructed
+ * 'properly' where 'properly' is loosly defined.
+ */
+bool
+verifySection(Uint32 firstIVal, SectionSegmentPool& thePool)
+{
+  if (firstIVal == RNIL)
+    return true;
+
+  /* Get first section ptr (With assertions in getPtr) */
+  SectionSegment* first= thePool.getPtr(firstIVal);
+
+  assert(first != NULL);
+  Uint32 totalSize= first->m_sz;
+  Uint32 lastSegIVal= first->m_lastSegment;
+
+  /* Hmm, need to be careful of length == 0
+   * Nature abhors a segmented section with length 0
+   */
+  //assert(totalSize != 0);
+  assert(lastSegIVal != RNIL); /* Should never be == RNIL */
+  /* We ignore m_ownerRef */
+
+  if (totalSize <= SectionSegment::DataLength)
+  {
+    /* 1 segment */
+    assert(first->m_lastSegment == firstIVal);
+    // m_nextSegment not always set to RNIL on last segment
+    //assert(first->m_nextSegment == RNIL);
+  }
+  else
+  {
+    /* > 1 segment */
+    assert(first->m_nextSegment != RNIL);
+    assert(first->m_lastSegment != firstIVal);
+    Uint32 currIVal= firstIVal;
+    SectionSegment* curr= first;
+
+    /* Traverse segments to where we think the end should be */
+    while (totalSize > SectionSegment::DataLength)
+    {
+      currIVal= curr->m_nextSegment;
+      curr= thePool.getPtr(currIVal);
+      totalSize-= SectionSegment::DataLength;
+      /* Ignore m_ownerRef, m_sz, m_lastSegment of intermediate
+       * Segments
+       */
+    }
+
+    /* Once we are here, we are on the last Segment of this Section
+     * Check that last segment is as stated in the first segment
+     */
+    assert(currIVal == lastSegIVal);
+    // m_nextSegment not always set properly on last segment
+    //assert(curr->m_nextSegment == RNIL);
+    /* Ignore m_ownerRef, m_sz, m_lastSegment of last segment */
+  }
+
+  return true;
+}
+
+void
+copy(Uint32 * & insertPtr,
+     class SectionSegmentPool & thePool, const SegmentedSectionPtr & _ptr){
+
+  Uint32 len = _ptr.sz;
+  SectionSegment * ptrP = _ptr.p;
+
+  assert(verifySection(_ptr.i, thePool));
+
+  while(len > 60){
+    memcpy(insertPtr, &ptrP->theData[0], 4 * 60);
+    len -= 60;
+    insertPtr += 60;
+    ptrP = thePool.getPtr(ptrP->m_nextSegment);
+  }
+  memcpy(insertPtr, &ptrP->theData[0], 4 * len);
+  insertPtr += len;
+}
+
+void
+copy(Uint32 * dst, SegmentedSectionPtr src){
+  copy(dst, g_sectionSegmentPool, src);
+}
+
+/* Copy variant which takes an IVal */
+void
+copy(Uint32* dst, Uint32 srcFirstIVal)
+{
+  SegmentedSectionPtr p;
+  getSection(p, srcFirstIVal);
+
+  copy(dst, p);
+}
+
+void
+print(SectionSegment * s, Uint32 len, FILE* out){
+  for(Uint32 i = 0; i<len; i++){
+    fprintf(out, "H\'0x%.8x ", s->theData[i]);
+    if(((i + 1) % 6) == 0)
+      fprintf(out, "\n");
+  }
+}
+
+void
+print(SegmentedSectionPtr ptr, FILE* out){
+
+  ptr.p = g_sectionSegmentPool.getPtr(ptr.i);
+  Uint32 len = ptr.p->m_sz;
+
+  fprintf(out, "ptr.i = %d(%p) ptr.sz = %d(%d)\n", ptr.i, ptr.p, len, ptr.sz);
+  while(len > SectionSegment::DataLength){
+    print(ptr.p, SectionSegment::DataLength, out);
+
+    len -= SectionSegment::DataLength;
+    fprintf(out, "ptr.i = %d\n", ptr.p->m_nextSegment);
+    ptr.p = g_sectionSegmentPool.getPtr(ptr.p->m_nextSegment);
+  }
+
+  print(ptr.p, len, out);
+  fprintf(out, "\n");
+}
+
+bool
+dupSection(SPC_ARG Uint32& copyFirstIVal, Uint32 srcFirstIVal)
+{
+  assert(verifySection(srcFirstIVal));
+
+  SectionSegment* p= g_sectionSegmentPool.getPtr(srcFirstIVal);
+  Uint32 sz= p->m_sz;
+  copyFirstIVal= RNIL;
+  bool ok= true;
+
+  /* Duplicate bulk of section */
+  while (sz > SectionSegment::DataLength)
+  {
+    ok= appendToSection(SPC_CACHE_ARG copyFirstIVal, &p->theData[0],
+                        SectionSegment::DataLength);
+    if (!ok)
+      break;
+
+    sz-= SectionSegment::DataLength;
+    srcFirstIVal= p->m_nextSegment;
+    p= g_sectionSegmentPool.getPtr(srcFirstIVal);
+  }
+
+  /* Duplicate last segment */
+  if (ok && (sz > 0))
+    ok= appendToSection(SPC_CACHE_ARG copyFirstIVal, &p->theData[0], sz);
+
+  if (unlikely(!ok))
+  {
+    releaseSection(SPC_CACHE_ARG copyFirstIVal);
+    copyFirstIVal= RNIL;
+    return false;
+  }
+
+  assert(verifySection(copyFirstIVal));
+  return true;
+}
+
+/**
+ * appendToSection
+ * Append supplied words to the chain of section segments
+ * indicated by the first section passed.
+ * If the passed IVal == RNIL then a section will be seized
+ * and the IVal will be updated to indicate the first IVal
+ * section in the chain
+ * Sections are made up of linked SectionSegment objects
+ * where :
+ *   - The first SectionSegment's m_sz is the size of the
+ *     whole section
+ *   - The first SectionSegment's m_lastSegment refers to
+ *     the last segment in the section
+ *   - Each SectionSegment's m_nextSegment refers to the
+ *     next segment in the section, *except* for the last
+ *     SectionSegment's which is RNIL
+ *   - Each SectionSegment except the first does not use
+ *     its m_sz or m_nextSegment members.
+ */
+bool
+appendToSection(SPC_ARG Uint32& firstSegmentIVal, const Uint32* src, Uint32 len)
+{
+  Ptr<SectionSegment> firstPtr, currPtr;
+
+  if (len == 0)
+    return true;
+
+  Uint32 remain= SectionSegment::DataLength;
+  Uint32 segmentLen= 0;
+
+  if (firstSegmentIVal == RNIL)
+  {
+    /* First data to be added to this section */
+    bool result= g_sectionSegmentPool.seize(SPC_SEIZE_ARG firstPtr);
+
+    if (!result)
+      return false;
+
+    firstPtr.p->m_sz= 0;
+    firstPtr.p->m_ownerRef= 0;
+    firstSegmentIVal= firstPtr.i;
+
+    currPtr= firstPtr;
+  }
+  else
+  {
+    /* Section has at least one segment with data already */
+    g_sectionSegmentPool.getPtr(firstPtr, firstSegmentIVal);
+    g_sectionSegmentPool.getPtr(currPtr, firstPtr.p->m_lastSegment);
+
+    Uint32 existingLen= firstPtr.p->m_sz;
+    assert(existingLen > 0);
+    segmentLen= existingLen % SectionSegment::DataLength;
+
+    /* If existingLen %  SectionSegment::DataLength == 0
+     * we assume that the last segment is full
+     */
+    segmentLen= segmentLen == 0 ?
+      SectionSegment::DataLength :
+      segmentLen;
+
+    remain= SectionSegment::DataLength - segmentLen;
+  }
+
+  firstPtr.p->m_sz+= len;
+
+  while(len > remain) {
+    /* Fill this segment, and link in another one
+     * Note that we can memcpy to a bad address with size 0
+     */
+    memcpy(&currPtr.p->theData[segmentLen], src, remain << 2);
+    src += remain;
+    len -= remain;
+    Ptr<SectionSegment> prevPtr= currPtr;
+    bool result = g_sectionSegmentPool.seize(SPC_SEIZE_ARG currPtr);
+    if (!result)
+    {
+      /* Failed, ensure segment list is consistent for
+       * freeing later
+       */
+      firstPtr.p->m_lastSegment= prevPtr.i;
+      firstPtr.p->m_sz-= len;
+      return false;
+    }
+    prevPtr.p->m_nextSegment = currPtr.i;
+    currPtr.p->m_sz= 0;
+    currPtr.p->m_ownerRef= 0;
+
+    segmentLen= 0;
+    remain= SectionSegment::DataLength;
+  }
+
+  /* Data fits in the current last segment */
+  firstPtr.p->m_lastSegment= currPtr.i;
+  currPtr.p->m_nextSegment= RNIL;
+  memcpy(&currPtr.p->theData[segmentLen], src, len << 2);
+
+  return true;
+}
+
+bool
+import(SPC_ARG Ptr<SectionSegment> & first, const Uint32 * src, Uint32 len){
+
+  first.p = 0;
+  if(g_sectionSegmentPool.seize(SPC_SEIZE_ARG first)){
+    ;
+  } else {
+    ndbout_c("No Segmented Sections for import");
+    return false;
+  }
+
+  first.p->m_sz = len;
+  first.p->m_ownerRef = 0;
+
+  Ptr<SectionSegment> currPtr = first;
+
+  while(len > SectionSegment::DataLength){
+    memcpy(&currPtr.p->theData[0], src, 4 * SectionSegment::DataLength);
+    src += SectionSegment::DataLength;
+    len -= SectionSegment::DataLength;
+    Ptr<SectionSegment> prevPtr = currPtr;
+    if(g_sectionSegmentPool.seize(SPC_SEIZE_ARG currPtr)){
+      prevPtr.p->m_nextSegment = currPtr.i;
+      ;
+    } else {
+      /* Leave segment chain in ok condition for release */
+      first.p->m_lastSegment = prevPtr.i;
+      first.p->m_sz-= len;
+      prevPtr.p->m_nextSegment = RNIL;
+      ndbout_c("Not enough Segmented Sections during import");
+      return false;
+    }
+  }
+
+  first.p->m_lastSegment = currPtr.i;
+  currPtr.p->m_nextSegment = RNIL;
+  memcpy(&currPtr.p->theData[0], src, 4 * len);
+
+  assert(verifySection(first.i));
+  return true;
+}
+
+void
+release(SPC_ARG SegmentedSectionPtr & ptr)
+{
+  g_sectionSegmentPool.releaseList(SPC_SEIZE_ARG
+                                   relSz(ptr.sz),
+				   ptr.i,
+				   ptr.p->m_lastSegment);
+}
+
+void
+releaseSection(SPC_ARG Uint32 firstSegmentIVal)
+{
+  if (firstSegmentIVal != RNIL)
+  {
+    SectionSegment* p = g_sectionSegmentPool.getPtr(firstSegmentIVal);
+
+    g_sectionSegmentPool.releaseList(SPC_SEIZE_ARG
+                                     relSz(p->m_sz),
+                                     firstSegmentIVal,
+                                     p->m_lastSegment);
+  }
+}

=== modified file 'storage/ndb/src/kernel/vm/LongSignal.hpp'
--- a/storage/ndb/src/kernel/vm/LongSignal.hpp	2008-09-04 15:45:21 +0000
+++ b/storage/ndb/src/kernel/vm/LongSignal.hpp	2008-10-08 19:09:05 +0000
@@ -50,38 +50,14 @@ extern SectionSegmentPool g_sectionSegme
  * Function prototypes
  */
 void print(SegmentedSectionPtr ptr, FILE* out);
-void copy(SegmentedSectionPtr dst, Uint32 * src, Uint32 len);
 void copy(Uint32 * dst, SegmentedSectionPtr src);
 void copy(Uint32 * dst, Uint32 srcFirstIVal);
-bool import(Ptr<SectionSegment> & first, const Uint32 * src, Uint32 len);
-/* appendToSection : If firstSegmentIVal == RNIL, import */
-bool appendToSection(Uint32& firstSegmentIVal, const Uint32* src, Uint32 len);
-/* dupSection : Create new section as copy of src section */
-bool dupSection(Uint32& copyFirstIVal, Uint32 srcFirstIVal);
-
-
-inline
-bool
-import(SegmentedSectionPtr& ptr, const Uint32* src, Uint32 len)
-{
-  Ptr<SectionSegment> tmp;
-  if (import(tmp, src, len))
-  {
-    ptr.i = tmp.i;
-    ptr.p = tmp.p;
-    ptr.sz = len;
-    return true;
-  }
-  return false;
-}
 
 extern class SectionSegmentPool g_sectionSegmentPool;
 
 /* Defined in SimulatedBlock.cpp */
 void getSection(SegmentedSectionPtr & ptr, Uint32 id);
 void getSections(Uint32 secCount, SegmentedSectionPtr ptr[3]);
-void release(SegmentedSectionPtr & ptr);
-void releaseSection(Uint32 firstSegmentIVal);
 
 /* Internal verification */
 bool verifySection(Uint32 firstIVal, 

=== added file 'storage/ndb/src/kernel/vm/LongSignalImpl.hpp'
--- a/storage/ndb/src/kernel/vm/LongSignalImpl.hpp	1970-01-01 00:00:00 +0000
+++ b/storage/ndb/src/kernel/vm/LongSignalImpl.hpp	2008-10-08 19:09:05 +0000
@@ -0,0 +1,54 @@
+/* Copyright (C) 2003 MySQL AB
+
+   This program is free software; you can redistribute it and/or modify
+   it under the terms of the GNU General Public License as published by
+   the Free Software Foundation; version 2 of the License.
+
+   This program is distributed in the hope that it will be useful,
+   but WITHOUT ANY WARRANTY; without even the implied warranty of
+   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+   GNU General Public License for more details.
+
+   You should have received a copy of the GNU General Public License
+   along with this program; if not, write to the Free Software
+   Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA */
+
+#ifndef NDB_LS_IMPL_HPP
+#define NDB_LS_IMPL_HPP
+
+#include "LongSignal.hpp"
+
+#ifdef NDBD_MULTITHREADED
+#include "mt.hpp"
+#define SPC_ARG SectionSegmentPool::Cache& cache,
+#define SPC_SEIZE_ARG f_section_lock, cache,
+#define SPC_CACHE_ARG cache,
+static
+SectionSegmentPool::LockFun
+f_section_lock =
+{
+  mt_section_lock,
+  mt_section_unlock
+};
+#else
+#define SPC_ARG
+#define SPC_SEIZE_ARG
+#define SPC_CACHE_ARG
+#endif
+
+/* Calculate number of segments to release based on section size
+ * Always release one segment, even if size is zero
+ */
+#define relSz(x) ((x == 0)? 1 : ((x + SectionSegment::DataLength - 1) / SectionSegment::DataLength))
+
+bool import(SPC_ARG Ptr<SectionSegment> & first, const Uint32 * src, Uint32 len);
+
+/* appendToSection : If firstSegmentIVal == RNIL, import */
+bool appendToSection(SPC_ARG Uint32& firstSegmentIVal, const Uint32* src, Uint32 len);
+/* dupSection : Create new section as copy of src section */
+bool dupSection(SPC_ARG Uint32& copyFirstIVal, Uint32 srcFirstIVal);
+
+void release(SPC_ARG SegmentedSectionPtr & ptr);
+void releaseSection(SPC_ARG Uint32 firstSegmentIVal);
+
+#endif

=== added file 'storage/ndb/src/kernel/vm/LongSignal_mt.cpp'
--- a/storage/ndb/src/kernel/vm/LongSignal_mt.cpp	1970-01-01 00:00:00 +0000
+++ b/storage/ndb/src/kernel/vm/LongSignal_mt.cpp	2008-10-08 19:09:05 +0000
@@ -0,0 +1,17 @@
+/* Copyright (C) 2003 MySQL AB
+
+   This program is free software; you can redistribute it and/or modify
+   it under the terms of the GNU General Public License as published by
+   the Free Software Foundation; version 2 of the License.
+
+   This program is distributed in the hope that it will be useful,
+   but WITHOUT ANY WARRANTY; without even the implied warranty of
+   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+   GNU General Public License for more details.
+
+   You should have received a copy of the GNU General Public License
+   along with this program; if not, write to the Free Software
+   Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA */
+
+#define NDBD_MULTITHREADED
+#include "LongSignal.cpp"

=== added file 'storage/ndb/src/kernel/vm/LongSignal_nonmt.cpp'
--- a/storage/ndb/src/kernel/vm/LongSignal_nonmt.cpp	1970-01-01 00:00:00 +0000
+++ b/storage/ndb/src/kernel/vm/LongSignal_nonmt.cpp	2008-10-08 19:09:05 +0000
@@ -0,0 +1,17 @@
+/* Copyright (C) 2003 MySQL AB
+
+   This program is free software; you can redistribute it and/or modify
+   it under the terms of the GNU General Public License as published by
+   the Free Software Foundation; version 2 of the License.
+
+   This program is distributed in the hope that it will be useful,
+   but WITHOUT ANY WARRANTY; without even the implied warranty of
+   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+   GNU General Public License for more details.
+
+   You should have received a copy of the GNU General Public License
+   along with this program; if not, write to the Free Software
+   Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA */
+
+#undef NDBD_MULTITHREADED
+#include "LongSignal.cpp"

=== modified file 'storage/ndb/src/kernel/vm/Makefile.am'
--- a/storage/ndb/src/kernel/vm/Makefile.am	2008-09-17 13:23:21 +0000
+++ b/storage/ndb/src/kernel/vm/Makefile.am	2008-10-08 21:20:38 +0000
@@ -30,7 +30,6 @@ libkernel_a_SOURCES = \
 	Emulator.cpp		\
 	Configuration.cpp		\
 	WatchDog.cpp \
-        SimplePropertiesSection.cpp \
         SectionReader.cpp \
         Mutex.cpp SafeCounter.cpp \
         Rope.cpp \
@@ -44,14 +43,18 @@ libsched_a_SOURCES = TimeQueue.cpp		\
                      FastScheduler.cpp \
                      TransporterCallback_nonmt.cpp \
                      SimulatedBlock_nonmt.cpp \
+                     LongSignal_nonmt.cpp \
+                     SimplePropertiesSection_nonmt.cpp \
 		     dummy_nonmt.cpp
 
 libsched_mt_a_SOURCES = SimulatedBlock_mt.cpp \
                         TransporterCallback_mt.cpp \
+                        LongSignal_mt.cpp \
+                        SimplePropertiesSection_mt.cpp \
                         mt.cpp \
                         dummy_mt.cpp
 
-EXTRA_DIST=SimulatedBlock.cpp TransporterCallback.cpp CMakeLists.txt
+EXTRA_DIST=SimulatedBlock.cpp TransporterCallback.cpp CMakeLists.txt LongSignal.cpp SimplePropertiesSection.cpp
 
 INCLUDES_LOC = -I$(top_srcdir)/storage/ndb/src/mgmapi
 

=== modified file 'storage/ndb/src/kernel/vm/SimplePropertiesSection.cpp'
--- a/storage/ndb/src/kernel/vm/SimplePropertiesSection.cpp	2008-05-16 13:08:36 +0000
+++ b/storage/ndb/src/kernel/vm/SimplePropertiesSection.cpp	2008-10-08 19:09:05 +0000
@@ -16,6 +16,8 @@
 #include <SimpleProperties.hpp>
 #include <TransporterDefinitions.hpp>
 #include "LongSignal.hpp"
+#include "LongSignalImpl.hpp"
+#include "SimulatedBlock.hpp"
 
 SimplePropertiesSectionReader::SimplePropertiesSectionReader
 (struct SegmentedSectionPtr & ptr, class SectionSegmentPool & pool)
@@ -118,8 +120,8 @@ SimplePropertiesSectionReader::getWords(
   return false;
 }
 
-SimplePropertiesSectionWriter::SimplePropertiesSectionWriter(class SectionSegmentPool & pool)
-  : m_pool(pool)
+SimplePropertiesSectionWriter::SimplePropertiesSectionWriter(class SimulatedBlock & block)
+  : m_pool(block.getSectionSegmentPool()), m_block(block)
 {
   m_pos = -1;
   m_head = 0;
@@ -128,13 +130,17 @@ SimplePropertiesSectionWriter::SimplePro
   reset();
 }
 
-extern void release(SegmentedSectionPtr & ptr);
-
 SimplePropertiesSectionWriter::~SimplePropertiesSectionWriter()
 {
   release();
 }
 
+#ifdef NDBD_MULTITHREADED
+#define SP_POOL_ARG f_section_lock, *m_block.m_sectionPoolCache,
+#else
+#define SP_POOL_ARG
+#endif
+
 void
 SimplePropertiesSectionWriter::release()
 {
@@ -150,14 +156,14 @@ SimplePropertiesSectionWriter::release()
       m_head->m_lastSegment = m_currentSegment->m_lastSegment;
 
       if((m_pos % SectionSegment::DataLength) == 0){
-        m_pool.release(m_currentSegment->m_lastSegment);
+        m_pool.release(SP_POOL_ARG m_currentSegment->m_lastSegment);
         m_head->m_lastSegment = m_prevPtrI;
       }
-      ::release(ptr);
+      m_block.release(ptr);
     }
     else
     {
-      m_pool.release(m_head->m_lastSegment);
+      m_pool.release(SP_POOL_ARG m_head->m_lastSegment);
     }
   }
   m_pos = -1;
@@ -171,7 +177,7 @@ SimplePropertiesSectionWriter::reset()
 {
   release();
   Ptr<SectionSegment> first;
-  if(m_pool.seize(first)){
+  if(m_pool.seize(SP_POOL_ARG first)){
     ;
   } else {
     m_pos = -1;
@@ -201,7 +207,7 @@ SimplePropertiesSectionWriter::putWords(
   while(len >= left){
     memcpy(&m_currentSegment->theData[m_pos], src, 4 * left);
     Ptr<SectionSegment> next;    
-    if(m_pool.seize(next)){
+    if(m_pool.seize(SP_POOL_ARG next)){
 
       m_prevPtrI = m_currentSegment->m_lastSegment;
       m_currentSegment->m_nextSegment = next.i;
@@ -261,7 +267,7 @@ SimplePropertiesSectionWriter::getPtr(st
 
   if (m_head)
   {
-    m_pool.release(m_head->m_lastSegment);
+    m_pool.release(SP_POOL_ARG m_head->m_lastSegment);
   }
 
   m_sz = 0;

=== added file 'storage/ndb/src/kernel/vm/SimplePropertiesSection_mt.cpp'
--- a/storage/ndb/src/kernel/vm/SimplePropertiesSection_mt.cpp	1970-01-01 00:00:00 +0000
+++ b/storage/ndb/src/kernel/vm/SimplePropertiesSection_mt.cpp	2008-10-08 19:09:05 +0000
@@ -0,0 +1,17 @@
+/* Copyright (C) 2003 MySQL AB
+
+   This program is free software; you can redistribute it and/or modify
+   it under the terms of the GNU General Public License as published by
+   the Free Software Foundation; version 2 of the License.
+
+   This program is distributed in the hope that it will be useful,
+   but WITHOUT ANY WARRANTY; without even the implied warranty of
+   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+   GNU General Public License for more details.
+
+   You should have received a copy of the GNU General Public License
+   along with this program; if not, write to the Free Software
+   Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA */
+
+#define NDBD_MULTITHREADED
+#include "SimplePropertiesSection.cpp"

=== added file 'storage/ndb/src/kernel/vm/SimplePropertiesSection_nonmt.cpp'
--- a/storage/ndb/src/kernel/vm/SimplePropertiesSection_nonmt.cpp	1970-01-01 00:00:00 +0000
+++ b/storage/ndb/src/kernel/vm/SimplePropertiesSection_nonmt.cpp	2008-10-08 19:09:05 +0000
@@ -0,0 +1,17 @@
+/* Copyright (C) 2003 MySQL AB
+
+   This program is free software; you can redistribute it and/or modify
+   it under the terms of the GNU General Public License as published by
+   the Free Software Foundation; version 2 of the License.
+
+   This program is distributed in the hope that it will be useful,
+   but WITHOUT ANY WARRANTY; without even the implied warranty of
+   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+   GNU General Public License for more details.
+
+   You should have received a copy of the GNU General Public License
+   along with this program; if not, write to the Free Software
+   Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA */
+
+#undef NDBD_MULTITHREADED
+#include "SimplePropertiesSection.cpp"

=== modified file 'storage/ndb/src/kernel/vm/SimulatedBlock.cpp'
--- a/storage/ndb/src/kernel/vm/SimulatedBlock.cpp	2008-10-01 07:23:17 +0000
+++ b/storage/ndb/src/kernel/vm/SimulatedBlock.cpp	2008-10-08 19:09:05 +0000
@@ -48,6 +48,7 @@
 #include <NdbSqlUtil.hpp>
 
 #include "../blocks/dbdih/Dbdih.hpp"
+#include "LongSignalImpl.hpp"
 
 #include <EventLogger.hpp>
 extern EventLogger * g_eventLogger;
@@ -220,12 +221,12 @@ SimulatedBlock::addRecSignalImpl(GlobalS
 }
 
 void
-SimulatedBlock::assignToThread(Uint32 threadId, EmulatedJamBuffer *jamBuffer,
-                               Uint32 *watchDogCounter)
+SimulatedBlock::assignToThread(ThreadContext ctx)
 {
-  m_threadId = threadId;
-  m_jamBuffer = jamBuffer;
-  m_watchDogCounter = watchDogCounter;
+  m_threadId = ctx.threadId;
+  m_jamBuffer = ctx.jamBuffer;
+  m_watchDogCounter = ctx.watchDogCounter;
+  m_sectionPoolCache = ctx.sectionPoolCache;
 }
 
 Uint32
@@ -364,167 +365,34 @@ getSection(SegmentedSectionPtr & ptr, Ui
 }
 
 #ifdef NDBD_MULTITHREADED
-#define MT_SECTION_LOCK mt_section_lock();
-#define MT_SECTION_UNLOCK mt_section_unlock();
+#define SB_SP_ARG *m_sectionPoolCache,
 #else
-#define MT_SECTION_LOCK
-#define MT_SECTION_UNLOCK
+#define SB_SP_ARG
 #endif
 
-/**
- * appendToSection
- * Append supplied words to the chain of section segments 
- * indicated by the first section passed.
- * If the passed IVal == RNIL then a section will be seized
- * and the IVal will be updated to indicate the first IVal
- * section in the chain
- * Sections are made up of linked SectionSegment objects
- * where : 
- *   - The first SectionSegment's m_sz is the size of the
- *     whole section
- *   - The first SectionSegment's m_lastSegment refers to
- *     the last segment in the section
- *   - Each SectionSegment's m_nextSegment refers to the
- *     next segment in the section, *except* for the last
- *     SectionSegment's which is RNIL
- *   - Each SectionSegment except the first does not use
- *     its m_sz or m_nextSegment members.
- */
-bool
-appendToSection(Uint32& firstSegmentIVal, const Uint32* src, Uint32 len) {
-  Ptr<SectionSegment> firstPtr, currPtr;
-
-  if (len == 0) 
-    return true;  
-
-  Uint32 remain= SectionSegment::DataLength;
-  Uint32 segmentLen= 0;
-
-  if (firstSegmentIVal == RNIL)
-  {
-    /* First data to be added to this section */
-    MT_SECTION_LOCK
-      bool result= g_sectionSegmentPool.seize(firstPtr);
-    MT_SECTION_UNLOCK
-
-    if (!result)
-      return false;
-    
-    firstPtr.p->m_sz= 0;
-    firstPtr.p->m_ownerRef= 0;
-    firstSegmentIVal= firstPtr.i;
-
-    currPtr= firstPtr;
-  }
-  else
-  {
-    /* Section has at least one segment with data already */
-    g_sectionSegmentPool.getPtr(firstPtr, firstSegmentIVal);
-    g_sectionSegmentPool.getPtr(currPtr, firstPtr.p->m_lastSegment);
-
-    Uint32 existingLen= firstPtr.p->m_sz;
-    assert(existingLen > 0);
-    segmentLen= existingLen % SectionSegment::DataLength;
-    
-    /* If existingLen %  SectionSegment::DataLength == 0
-     * we assume that the last segment is full
-     */
-    segmentLen= segmentLen == 0 ? 
-      SectionSegment::DataLength :
-      segmentLen;
-
-    remain= SectionSegment::DataLength - segmentLen;
-  }
-
-  firstPtr.p->m_sz+= len;
-
-  while(len > remain) {
-    /* Fill this segment, and link in another one 
-     * Note that we can memcpy to a bad address with size 0
-     */
-    memcpy(&currPtr.p->theData[segmentLen], src, remain << 2);
-    src += remain;
-    len -= remain;
-    Ptr<SectionSegment> prevPtr= currPtr;
-    MT_SECTION_LOCK
-      bool result = g_sectionSegmentPool.seize(currPtr);
-    MT_SECTION_UNLOCK
-    if (!result)
-    {
-      /* Failed, ensure segment list is consistent for
-       * freeing later
-       */
-      firstPtr.p->m_lastSegment= prevPtr.i;
-      firstPtr.p->m_sz-= len;
-      return false;
-    }
-    prevPtr.p->m_nextSegment = currPtr.i;
-    currPtr.p->m_sz= 0;
-    currPtr.p->m_ownerRef= 0;
-
-    segmentLen= 0;
-    remain= SectionSegment::DataLength;
-  }
-
-  /* Data fits in the current last segment */
-  firstPtr.p->m_lastSegment= currPtr.i;
-  currPtr.p->m_nextSegment= RNIL;
-  memcpy(&currPtr.p->theData[segmentLen], src, len << 2);
-
-  return true;
-}
-
-/* Macro to calculate number of segments to free for given section length
- * Since releaseList() always releases one segment, we make sure to always
- * ask for one to be released, even when x == 0.
- */
-#define relSz(x) ((x == 0)?1 : ((x + SectionSegment::DataLength - 1) / SectionSegment::DataLength))
-
-void
-release(SegmentedSectionPtr & ptr){
-  MT_SECTION_LOCK
-  g_sectionSegmentPool.releaseList(relSz(ptr.sz),
-				   ptr.i, 
-				   ptr.p->m_lastSegment);
-  MT_SECTION_UNLOCK
-}
-
+static
 void
-releaseSection(Uint32 firstSegmentIVal)
-{
-  if (firstSegmentIVal != RNIL)
-  {
-    SectionSegment* p = g_sectionSegmentPool.getPtr(firstSegmentIVal);
-
-    MT_SECTION_LOCK
-    g_sectionSegmentPool.releaseList(relSz(p->m_sz),
-                                     firstSegmentIVal,
-                                     p->m_lastSegment);
-    MT_SECTION_UNLOCK
-  }
-}
-
-static void
-releaseSections(Uint32 secCount, SegmentedSectionPtr ptr[3]){
+releaseSections(SPC_ARG Uint32 secCount, SegmentedSectionPtr ptr[3]){
   Uint32 tSec0 = ptr[0].i;
   Uint32 tSz0 = ptr[0].sz;
   Uint32 tSec1 = ptr[1].i;
   Uint32 tSz1 = ptr[1].sz;
   Uint32 tSec2 = ptr[2].i;
   Uint32 tSz2 = ptr[2].sz;
-  MT_SECTION_LOCK
   switch(secCount){
   case 3:
-    g_sectionSegmentPool.releaseList(relSz(tSz2), tSec2, 
+    g_sectionSegmentPool.releaseList(SPC_SEIZE_ARG
+                                     relSz(tSz2), tSec2,
 				     ptr[2].p->m_lastSegment);
   case 2:
-    g_sectionSegmentPool.releaseList(relSz(tSz1), tSec1, 
+    g_sectionSegmentPool.releaseList(SPC_SEIZE_ARG
+                                     relSz(tSz1), tSec1,
 				     ptr[1].p->m_lastSegment);
   case 1:
-    g_sectionSegmentPool.releaseList(relSz(tSz0), tSec0, 
+    g_sectionSegmentPool.releaseList(SPC_SEIZE_ARG
+                                     relSz(tSz0), tSec0,
 				     ptr[0].p->m_lastSegment);
   case 0:
-    MT_SECTION_UNLOCK
     return;
   }
   char msg[40];
@@ -532,7 +400,6 @@ releaseSections(Uint32 secCount, Segment
   ErrorReporter::handleAssert(msg, __FILE__, __LINE__);
 }
 
-
 void 
 SimulatedBlock::sendSignal(BlockReference ref, 
 			   GlobalSignalNumber gsn, 
@@ -777,7 +644,7 @@ SimulatedBlock::sendSignal(BlockReferenc
      */
     Ptr<SectionSegment> segptr[3];
     for(Uint32 i = 0; i<noOfSections; i++){
-      ndbrequire(import(segptr[i], ptr[i].p, ptr[i].sz));
+      ndbrequire(::import(SB_SP_ARG segptr[i], ptr[i].p, ptr[i].sz));
       signal->theData[length+i] = segptr[i].i;
     }
     
@@ -892,7 +759,7 @@ SimulatedBlock::sendSignal(NodeReceiverG
      */
     Ptr<SectionSegment> segptr[3];
     for(Uint32 i = 0; i<noOfSections; i++){
-      ndbrequire(import(segptr[i], ptr[i].p, ptr[i].sz));
+      ndbrequire(::import(SB_SP_ARG segptr[i], ptr[i].p, ptr[i].sz));
       signal->theData[length+i] = segptr[i].i;
     }
 
@@ -1051,7 +918,7 @@ SimulatedBlock::sendSignal(BlockReferenc
 #endif
 
     ndbrequire(ss == SEND_OK || ss == SEND_BLOCKED || ss == SEND_DISCONNECTED);
-    ::releaseSections(noOfSections, sections->m_ptr);
+    ::releaseSections(SB_SP_ARG noOfSections, sections->m_ptr);
   }
 
   signal->header.m_noOfSections = 0;
@@ -1178,7 +1045,7 @@ SimulatedBlock::sendSignal(NodeReceiverG
 
   if (release)
   {
-    ::releaseSections(noOfSections, sections->m_ptr);
+    ::releaseSections(SB_SP_ARG noOfSections, sections->m_ptr);
   }
 
   sections->m_cnt = 0;
@@ -1248,7 +1115,7 @@ SimulatedBlock::sendSignalNoRelease(Bloc
     for (Uint32 sec=0; sec < noOfSections; sec++)
     {
       Uint32 secCopy;
-      bool ok= dupSection(secCopy, sections->m_ptr[sec].i);
+      bool ok= ::dupSection(SB_SP_ARG secCopy, sections->m_ptr[sec].i);
       ndbrequire (ok);
       * dst ++ = secCopy;
     }
@@ -1371,7 +1238,7 @@ SimulatedBlock::sendSignalNoRelease(Node
     for (Uint32 sec=0; sec < noOfSections; sec++)
     {
       Uint32 secCopy;
-      bool ok= dupSection(secCopy, sections->m_ptr[sec].i);
+      bool ok= ::dupSection(SB_SP_ARG secCopy, sections->m_ptr[sec].i);
       ndbrequire (ok);
       * dst ++ = secCopy;
     }
@@ -1530,12 +1397,56 @@ SimulatedBlock::sendSignalWithDelay(Bloc
 }
 
 void
+SimulatedBlock::release(SegmentedSectionPtr & ptr)
+{
+  ::release(SB_SP_ARG ptr);
+}
+
+void
+SimulatedBlock::releaseSection(Uint32 firstSegmentIVal)
+{
+  ::releaseSection(SB_SP_ARG firstSegmentIVal);
+}
+
+void
 SimulatedBlock::releaseSections(SectionHandle& handle)
 {
-  ::releaseSections(handle.m_cnt, handle.m_ptr);
+  ::releaseSections(SB_SP_ARG handle.m_cnt, handle.m_ptr);
   handle.m_cnt = 0;
 }
 
+bool
+SimulatedBlock::appendToSection(Uint32& firstSegmentIVal, const Uint32* src, Uint32 len)
+{
+  return ::appendToSection(SB_SP_ARG firstSegmentIVal, src, len);
+}
+
+bool
+SimulatedBlock::import(Ptr<SectionSegment> & first, const Uint32 * src, Uint32 len)
+{
+  return ::import(SB_SP_ARG first, src, len);
+}
+
+bool
+SimulatedBlock::import(SegmentedSectionPtr& ptr, const Uint32* src, Uint32 len)
+{
+  Ptr<SectionSegment> tmp;
+  if (::import(SB_SP_ARG tmp, src, len))
+  {
+    ptr.i = tmp.i;
+    ptr.p = tmp.p;
+    ptr.sz = len;
+    return true;
+  }
+  return false;
+}
+
+bool
+SimulatedBlock::dupSection(Uint32& copyFirstIVal, Uint32 srcFirstIVal)
+{
+  return ::dupSection(SB_SP_ARG copyFirstIVal, srcFirstIVal);
+}
+
 class SectionSegmentPool& 
 SimulatedBlock::getSectionSegmentPool(){
   return g_sectionSegmentPool;

=== modified file 'storage/ndb/src/kernel/vm/SimulatedBlock.hpp'
--- a/storage/ndb/src/kernel/vm/SimulatedBlock.hpp	2008-10-08 14:19:37 +0000
+++ b/storage/ndb/src/kernel/vm/SimulatedBlock.hpp	2008-10-09 01:13:03 +0000
@@ -112,6 +112,7 @@ class SimulatedBlock {
   friend struct Pool_context;
   friend struct SectionHandle;
   friend class LockQueue;
+  friend class SimplePropertiesSectionWriter;
 public:
   friend class BlockComponent;
   virtual ~SimulatedBlock();
@@ -160,9 +161,15 @@ public:
   }
   virtual void loadWorkers() {}
 
+  struct ThreadContext
+  {
+    Uint32 threadId;
+    EmulatedJamBuffer* jamBuffer;
+    Uint32 * watchDogCounter;
+    SectionSegmentPool::Cache * sectionPoolCache;
+  };
   /* Setup state of a block object for executing in a particular thread. */
-  void assignToThread(Uint32 threadId, EmulatedJamBuffer *jamBuffer,
-                      Uint32 *watchDogCounter);
+  void assignToThread(ThreadContext ctx);
   /* For multithreaded ndbd, get the id of owning thread. */
   uint32 getThreadId() const { return m_threadId; }
   static bool isMultiThreaded();
@@ -289,8 +296,15 @@ protected:
                       Uint32 givenInstanceNo = ZNIL);
   
   class SectionSegmentPool& getSectionSegmentPool();
+  void release(SegmentedSectionPtr & ptr);
+  void releaseSection(Uint32 firstSegmentIVal);
   void releaseSections(struct SectionHandle&);
 
+  bool import(Ptr<SectionSegment> & first, const Uint32 * src, Uint32 len);
+  bool import(SegmentedSectionPtr& ptr, const Uint32* src, Uint32 len);
+  bool appendToSection(Uint32& firstSegmentIVal, const Uint32* src, Uint32 len);
+  bool dupSection(Uint32& copyFirstIVal, Uint32 srcFirstIVal);
+
   void handle_invalid_sections_in_send_signal(Signal*) const;
   void handle_lingering_sections_after_execute(Signal*) const;
   void handle_lingering_sections_after_execute(SectionHandle*) const;
@@ -503,6 +517,8 @@ private:
   EmulatedJamBuffer *m_jamBuffer;
   /* For multithreaded ndb, the thread-specific watchdog counter. */
   Uint32 *m_watchDogCounter;
+
+  SectionSegmentPool::Cache * m_sectionPoolCache;
 protected:
   Block_context m_ctx;
   NewVARIABLE* allocateBat(int batSize);

=== modified file 'storage/ndb/src/kernel/vm/TransporterCallback.cpp'
--- a/storage/ndb/src/kernel/vm/TransporterCallback.cpp	2008-09-17 13:23:21 +0000
+++ b/storage/ndb/src/kernel/vm/TransporterCallback.cpp	2008-10-08 19:09:05 +0000
@@ -22,6 +22,7 @@
 #include <ErrorHandlingMacros.hpp>
 
 #include "LongSignal.hpp"
+#include "LongSignalImpl.hpp"
 
 #include <signaldata/EventReport.hpp>
 #include <signaldata/TestOrd.hpp>
@@ -33,8 +34,6 @@
 #include "DataBuffer.hpp"
 #include "TransporterCallbackKernel.hpp"
 
-
-
 /**
  * The instance
  */
@@ -62,201 +61,6 @@ const char *lookupConnectionError(Uint32
   return connectionError[i].text;
 }
 
-#ifdef NDBD_MULTITHREADED
-#define MT_SECTION_LOCK mt_section_lock();
-#define MT_SECTION_UNLOCK mt_section_unlock();
-#else
-#define MT_SECTION_LOCK
-#define MT_SECTION_UNLOCK
-#endif
-
-/**
- * verifySection
- * Assertion method to check that a segmented section is constructed 
- * 'properly' where 'properly' is loosly defined. 
- */
-bool
-verifySection(Uint32 firstIVal, SectionSegmentPool& thePool)
-{
-  if (firstIVal == RNIL)
-    return true;
-
-  /* Get first section ptr (With assertions in getPtr) */
-  SectionSegment* first= thePool.getPtr(firstIVal);
-
-  assert(first != NULL);
-  Uint32 totalSize= first->m_sz;
-  Uint32 lastSegIVal= first->m_lastSegment;
-
-  /* Hmm, need to be careful of length == 0 
-   * Nature abhors a segmented section with length 0
-   */
-  //assert(totalSize != 0);
-  assert(lastSegIVal != RNIL); /* Should never be == RNIL */
-  /* We ignore m_ownerRef */
-
-  if (totalSize <= SectionSegment::DataLength)
-  {
-    /* 1 segment */
-    assert(first->m_lastSegment == firstIVal);
-    // m_nextSegment not always set to RNIL on last segment
-    //assert(first->m_nextSegment == RNIL);
-  }
-  else
-  {
-    /* > 1 segment */
-    assert(first->m_nextSegment != RNIL);
-    assert(first->m_lastSegment != firstIVal);
-    Uint32 currIVal= firstIVal;
-    SectionSegment* curr= first;
-
-    /* Traverse segments to where we think the end should be */
-    while (totalSize > SectionSegment::DataLength)
-    {
-      currIVal= curr->m_nextSegment;
-      curr= thePool.getPtr(currIVal);
-      totalSize-= SectionSegment::DataLength;
-      /* Ignore m_ownerRef, m_sz, m_lastSegment of intermediate
-       * Segments
-       */
-    }
-    
-    /* Once we are here, we are on the last Segment of this Section
-     * Check that last segment is as stated in the first segment
-     */
-    assert(currIVal == lastSegIVal);
-    // m_nextSegment not always set properly on last segment
-    //assert(curr->m_nextSegment == RNIL);
-    /* Ignore m_ownerRef, m_sz, m_lastSegment of last segment */
-  }
-
-  return true;
-}
-
-bool
-import(Ptr<SectionSegment> & first, const Uint32 * src, Uint32 len){
-
-  first.p = 0;
-  MT_SECTION_LOCK
-  if(g_sectionSegmentPool.seize(first)){
-    ;
-  } else {
-    MT_SECTION_UNLOCK
-    ndbout_c("No Segmented Sections for import");
-    return false;
-  }
-
-  first.p->m_sz = len;
-  first.p->m_ownerRef = 0;
-  
-  Ptr<SectionSegment> currPtr = first;
-  
-  while(len > SectionSegment::DataLength){
-    memcpy(&currPtr.p->theData[0], src, 4 * SectionSegment::DataLength);
-    src += SectionSegment::DataLength;
-    len -= SectionSegment::DataLength;
-    Ptr<SectionSegment> prevPtr = currPtr;
-    if(g_sectionSegmentPool.seize(currPtr)){
-      prevPtr.p->m_nextSegment = currPtr.i;
-      ;
-    } else {
-      /* Leave segment chain in ok condition for release */
-      first.p->m_lastSegment = prevPtr.i;
-      first.p->m_sz-= len;
-      prevPtr.p->m_nextSegment = RNIL;
-      MT_SECTION_UNLOCK
-      ndbout_c("Not enough Segmented Sections during import");
-      return false;
-    }
-  }
-  MT_SECTION_UNLOCK
-
-  first.p->m_lastSegment = currPtr.i;
-  currPtr.p->m_nextSegment = RNIL;
-  memcpy(&currPtr.p->theData[0], src, 4 * len);
-
-  assert(verifySection(first.i));
-  return true;
-}
-
-void 
-copy(Uint32 * & insertPtr, 
-     class SectionSegmentPool & thePool, const SegmentedSectionPtr & _ptr){
-
-  Uint32 len = _ptr.sz;
-  SectionSegment * ptrP = _ptr.p;
-
-  assert(verifySection(_ptr.i, thePool));
-  
-  while(len > 60){
-    memcpy(insertPtr, &ptrP->theData[0], 4 * 60);
-    len -= 60;
-    insertPtr += 60;
-    ptrP = thePool.getPtr(ptrP->m_nextSegment);
-  }
-  memcpy(insertPtr, &ptrP->theData[0], 4 * len);
-  insertPtr += len;
-}
-
-void
-copy(Uint32 * dst, SegmentedSectionPtr src){
-  copy(dst, g_sectionSegmentPool, src);
-}
-
-/* Copy variant which takes an IVal */
-void
-copy(Uint32* dst, Uint32 srcFirstIVal)
-{
-  SegmentedSectionPtr p;
-  getSection(p, srcFirstIVal);
-
-  copy(dst, p);
-} 
-
-bool
-dupSection(Uint32& copyFirstIVal, Uint32 srcFirstIVal)
-{
-  assert(verifySection(srcFirstIVal));
-
-  SectionSegment* p= g_sectionSegmentPool.getPtr(srcFirstIVal);
-  Uint32 sz= p->m_sz;
-  copyFirstIVal= RNIL;
-  bool ok= true;
-  
-  /* Duplicate bulk of section */
-  while (sz > SectionSegment::DataLength)
-  {
-    ok= appendToSection(copyFirstIVal, &p->theData[0], 
-                        SectionSegment::DataLength);
-    if (!ok)
-      break;
-
-    sz-= SectionSegment::DataLength;
-    srcFirstIVal= p->m_nextSegment;
-    p= g_sectionSegmentPool.getPtr(srcFirstIVal);
-  }
-  
-  /* Duplicate last segment */
-  if (ok && (sz > 0))
-    ok= appendToSection(copyFirstIVal, &p->theData[0], sz);
-
-  if (unlikely(!ok))
-  {
-    releaseSection(copyFirstIVal);
-    copyFirstIVal= RNIL;
-    return false;
-  }
-  
-  assert(verifySection(copyFirstIVal));
-  return true;
-}
-
-
-/* Calculate number of segments to release based on section size
- * Always release one segment, even if size is zero
- */
-#define relSz(x) ((x == 0)? 1 : ((x + SectionSegment::DataLength - 1) / SectionSegment::DataLength))
-
 #include <DebuggerNames.hpp>
 
 #ifndef NDBD_MULTITHREADED
@@ -291,6 +95,9 @@ static TransporterCallbackKernelNonMT my
 TransporterRegistry globalTransporterRegistry(&myTransporterCallback);
 #endif
 
+#ifdef NDBD_MULTITHREADED
+static SectionSegmentPool::Cache cache(1024,1024);
+#endif
 
 void
 TransporterCallbackKernel::deliver_signal(SignalHeader * const header,
@@ -319,11 +126,11 @@ TransporterCallbackKernel::deliver_signa
 
   switch(secCount){
   case 3:
-    ok &= import(secPtr[2], ptr[2].p, ptr[2].sz);
+    ok &= import(SPC_CACHE_ARG secPtr[2], ptr[2].p, ptr[2].sz);
   case 2:
-    ok &= import(secPtr[1], ptr[1].p, ptr[1].sz);
+    ok &= import(SPC_CACHE_ARG secPtr[1], ptr[1].p, ptr[1].sz);
   case 1:
-    ok &= import(secPtr[0], ptr[0].p, ptr[0].sz);
+    ok &= import(SPC_CACHE_ARG secPtr[0], ptr[0].p, ptr[0].sz);
   }
 
   /**
@@ -357,16 +164,14 @@ TransporterCallbackKernel::deliver_signa
   /**
    * Out of memory
    */
-  MT_SECTION_LOCK
   for(Uint32 i = 0; i<secCount; i++){
     if(secPtr[i].p != 0){
-      g_sectionSegmentPool.releaseList(relSz(secPtr[i].p->m_sz), 
+      g_sectionSegmentPool.releaseList(SPC_SEIZE_ARG
+                                       relSz(secPtr[i].p->m_sz),
                                        secPtr[i].i, 
 				       secPtr[i].p->m_lastSegment);
     }
   }
-  MT_SECTION_UNLOCK
-
 
   SignalDroppedRep * rep = (SignalDroppedRep*)theData;
   Uint32 gsn = header->theVerId_signalNumber;
@@ -399,34 +204,6 @@ operator<<(NdbOut& out, const SectionSeg
 }
 
 void
-print(SectionSegment * s, Uint32 len, FILE* out){
-  for(Uint32 i = 0; i<len; i++){
-    fprintf(out, "H\'0x%.8x ", s->theData[i]);
-    if(((i + 1) % 6) == 0)
-      fprintf(out, "\n");
-  }
-}
-
-void
-print(SegmentedSectionPtr ptr, FILE* out){
-
-  ptr.p = g_sectionSegmentPool.getPtr(ptr.i);
-  Uint32 len = ptr.p->m_sz;
-  
-  fprintf(out, "ptr.i = %d(%p) ptr.sz = %d(%d)\n", ptr.i, ptr.p, len, ptr.sz);
-  while(len > SectionSegment::DataLength){
-    print(ptr.p, SectionSegment::DataLength, out);
-    
-    len -= SectionSegment::DataLength;
-    fprintf(out, "ptr.i = %d\n", ptr.p->m_nextSegment);
-    ptr.p = g_sectionSegmentPool.getPtr(ptr.p->m_nextSegment);
-  }
-  
-  print(ptr.p, len, out);
-  fprintf(out, "\n");
-}
-
-void
 TransporterCallbackKernel::reportError(NodeId nodeId,
                                        TransporterError errorCode,
                                        const char *info)

=== added file 'storage/ndb/src/kernel/vm/mt-asm.h'
--- a/storage/ndb/src/kernel/vm/mt-asm.h	1970-01-01 00:00:00 +0000
+++ b/storage/ndb/src/kernel/vm/mt-asm.h	2008-10-08 19:43:24 +0000
@@ -0,0 +1,66 @@
+/* Copyright (C) 2003 MySQL AB
+
+   This program is free software; you can redistribute it and/or modify
+   it under the terms of the GNU General Public License as published by
+   the Free Software Foundation; version 2 of the License.
+
+   This program is distributed in the hope that it will be useful,
+   but WITHOUT ANY WARRANTY; without even the implied warranty of
+   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+   GNU General Public License for more details.
+
+   You should have received a copy of the GNU General Public License
+   along with this program; if not, write to the Free Software
+   Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA */
+
+/**
+ * Only memory barriers *must* be ported
+ * if XCNG (x86-sematics) is provided, spinlocks will be enabled
+ */
+#ifndef NDB_MT_ASM_H
+#define NDB_MT_ASM_H
+
+#if defined(__GNUC__)
+#if defined(__x86_64__)
+/* Memory barriers, these definitions are for x64_64. */
+#define mb()    asm volatile("mfence":::"memory")
+/* According to Intel docs, it does not reorder loads. */
+//#define rmb() asm volatile("lfence":::"memory")                               
+#define rmb()   asm volatile("" ::: "memory")
+#define wmb()   asm volatile("" ::: "memory")
+#define read_barrier_depends()  do {} while(0)
+
+#define NDB_HAVE_XCNG
+static inline
+int
+xcng(volatile unsigned * addr, int val)
+{
+  asm volatile ("xchg %0, %1;" : "+r" (val) , "+m" (*addr));
+  return val;
+}
+static
+inline
+void
+cpu_pause()
+{
+  asm volatile ("rep;nop");
+}
+
+#elif defined(__sparc__)
+#define mb()    asm volatile("membar #LoadLoad | #LoadStore | #StoreLoad | #StoreStore":::"memory)
+#define rmb()   asm volatile("membar #LoadLoad" ::: "memory")
+#define wmb()   asm volatile("membar #StoreStore" ::: "memory")
+#define read_barrier_depends()  do {} while(0)
+
+// link error if used incorrectly (i.e wo/ having NDB_HAVE_XCNG)
+extern  int xcng(volatile unsigned * addr, int val);
+extern void cpu_pause();
+
+#elif
+#error "Unsupported architecture"
+#endif
+#else
+#error "Unsupported compiler"
+#endif
+
+#endif

=== modified file 'storage/ndb/src/kernel/vm/mt.cpp'
--- a/storage/ndb/src/kernel/vm/mt.cpp	2008-10-07 10:26:07 +0000
+++ b/storage/ndb/src/kernel/vm/mt.cpp	2008-10-08 19:43:24 +0000
@@ -1,3 +1,18 @@
+/* Copyright (C) 2003 MySQL AB
+
+   This program is free software; you can redistribute it and/or modify
+   it under the terms of the GNU General Public License as published by
+   the Free Software Foundation; version 2 of the License.
+
+   This program is distributed in the hope that it will be useful,
+   but WITHOUT ANY WARRANTY; without even the implied warranty of
+   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+   GNU General Public License for more details.
+
+   You should have received a copy of the GNU General Public License
+   along with this program; if not, write to the Free Software
+   Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA */
+
 #include <stdio.h>
 #include <sys/types.h>
 #include <sys/stat.h>
@@ -25,6 +40,8 @@
 #include <signaldata/StopForCrash.hpp>
 #include "TransporterCallbackKernel.hpp"
 
+#include "mt-asm.h"
+
 #ifdef __GNUC__
 /* Provides a small (but noticeable) speedup in benchmarks. */
 #define memcpy __builtin_memcpy
@@ -54,38 +71,7 @@ static Uint32 ndbmt_threads = 0;
 static Uint32 num_threads = 0;
 static Uint32 receiver_thread_no = 0;
 
-#ifdef NDBMTD_X86
-static inline
-int
-xcng(volatile unsigned * addr, int val)
-{
-  asm volatile ("xchg %0, %1;" : "+r" (val) , "+m" (*addr));
-  return val;
-}
-
-/**
- * from ?md/?ntel manual "spinlock howto"
- */
-static
-inline
-void
-cpu_pause()
-{
-  asm volatile ("rep;nop");
-}
-
-/* Memory barriers, these definitions are for x64_64. */
-#define mb() 	asm volatile("mfence":::"memory")
-/* According to Intel docs, it does not reorder loads. */
-//#define rmb()	asm volatile("lfence":::"memory")
-#define rmb()	asm volatile("" ::: "memory")
-#define wmb()	asm volatile("" ::: "memory")
-#define read_barrier_depends()	do {} while(0)
-#else
-#error "Unsupported architecture"
-#endif
-
-#ifdef HAVE_LINUX_FUTEX
+#if defined(HAVE_LINUX_FUTEX) && defined(NDB_HAVE_XCNG)
 #define USE_FUTEX
 #endif
 
@@ -233,6 +219,7 @@ require(bool x)
     abort();
 }
 
+#ifdef NDB_HAVE_XCNG
 struct thr_spin_lock
 {
   thr_spin_lock(const char * name = 0)
@@ -247,17 +234,6 @@ struct thr_spin_lock
   volatile Uint32 m_lock;
 };
 
-struct thr_mutex
-{
-  thr_mutex(const char * name = 0) {
-    m_mutex = NdbMutex_Create();
-    m_name = name;
-  }
-
-  const char * m_name;
-  NdbMutex * m_mutex;
-};
-
 static
 inline
 void
@@ -305,6 +281,20 @@ trylock(struct thr_spin_lock* sl)
   volatile unsigned* val = &sl->m_lock;
   return xcng(val, 1);
 }
+#else
+#define thr_spin_lock thr_mutex
+#endif
+
+struct thr_mutex
+{
+  thr_mutex(const char * name = 0) {
+    m_mutex = NdbMutex_Create();
+    m_name = name;
+  }
+
+  const char * m_name;
+  NdbMutex * m_mutex;
+};
 
 static
 inline
@@ -527,6 +517,8 @@ struct thr_data
   /* Used for sendpacked (send-at-job-buffer-end). */
   Uint32 m_instance_count;
   BlockNumber m_instance_list[MAX_INSTANCES_PER_THREAD];
+
+  SectionSegmentPool::Cache m_sectionPoolCache;
 };
 
 template<typename T>
@@ -1880,16 +1872,20 @@ execute_signals(thr_data *selfptr, thr_j
      * (Though on Intel Core 2, they do not give much speedup, as apparently
      * the hardware prefetcher is already doing a fairly good job).
      */
+#ifdef __GNUC__
     __builtin_prefetch (read_buffer->m_data + read_pos + 16, 0, 3);
     __builtin_prefetch ((Uint32 *)&sig->header + 16, 1, 3);
+#endif
 
     /* Now execute the signal. */
     SignalHeader* s =
       reinterpret_cast<SignalHeader*>(read_buffer->m_data + read_pos);
     Uint32 seccnt = s->m_noOfSections;
     Uint32 siglen = (sizeof(*s)>>2) + s->theLength;
+#ifdef __GNUC__
     if(siglen>16)
       __builtin_prefetch (read_buffer->m_data + read_pos + 32, 0, 3);
+#endif
     Uint32 bno = blockToMain(s->theReceiversBlockNumber);
     SimulatedBlock* block = globalData.getBlock(bno);
 
@@ -2015,7 +2011,12 @@ add_thr_map(Uint32 main, Uint32 instance
   assert(thr_ptr->m_instance_count < MAX_INSTANCES_PER_THREAD);
   thr_ptr->m_instance_list[thr_ptr->m_instance_count++] = block;
 
-  b->assignToThread(thr_no, &thr_ptr->m_jam, &thr_ptr->m_watchdog_counter);
+  SimulatedBlock::ThreadContext ctx;
+  ctx.threadId = thr_no;
+  ctx.jamBuffer = &thr_ptr->m_jam;
+  ctx.watchDogCounter = &thr_ptr->m_watchdog_counter;
+  ctx.sectionPoolCache = &thr_ptr->m_sectionPoolCache;
+  b->assignToThread(ctx);
 
   /* Create entry mapping block to thread. */
   thr_map_entry& entry = thr_map[index][instance];

=== modified file 'storage/ndb/src/kernel/vm/mt.hpp'
--- a/storage/ndb/src/kernel/vm/mt.hpp	2008-08-15 11:01:41 +0000
+++ b/storage/ndb/src/kernel/vm/mt.hpp	2008-10-08 19:09:05 +0000
@@ -1,3 +1,18 @@
+/* Copyright (C) 2003 MySQL AB
+
+   This program is free software; you can redistribute it and/or modify
+   it under the terms of the GNU General Public License as published by
+   the Free Software Foundation; version 2 of the License.
+
+   This program is distributed in the hope that it will be useful,
+   but WITHOUT ANY WARRANTY; without even the implied warranty of
+   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+   GNU General Public License for more details.
+
+   You should have received a copy of the GNU General Public License
+   along with this program; if not, write to the Free Software
+   Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA */
+
 #include <kernel_types.h>
 #include <TransporterDefinitions.hpp>
 

=== modified file 'storage/ndb/src/ndbapi/NdbEventOperationImpl.hpp'
--- a/storage/ndb/src/ndbapi/NdbEventOperationImpl.hpp	2008-09-14 20:00:09 +0000
+++ b/storage/ndb/src/ndbapi/NdbEventOperationImpl.hpp	2008-10-09 09:03:01 +0000
@@ -649,7 +649,7 @@ private:
   NdbMutex *m_add_drop_mutex;
 
   inline Gci_container* find_bucket(Uint64 gci){
-    Uint32 pos = (gci & ACTIVE_GCI_MASK);
+    Uint32 pos = (Uint32)(gci & ACTIVE_GCI_MASK);
     Gci_container *bucket= ((Gci_container*)(m_active_gci.getBase())) + pos;
     if(likely(gci == bucket->m_gci))
       return bucket;

Thread
bzr push into mysql-5.1 branch (stewart:2986 to 2991) Stewart Smith9 Oct