List:Commits« Previous MessageNext Message »
From:jonas oreland Date:February 7 2012 7:41pm
Subject:bzr push into mysql-5.5-cluster-7.2 branch (jonas.oreland:3806 to 3807)
View as plain text  
 3807 jonas oreland	2012-02-07 [merge]
      ndb - merge 71 to 72

    modified:
      storage/ndb/cmake/os/WindowsCache.cmake
      storage/ndb/include/ndb_config.h.in
      storage/ndb/include/util/Bitmask.hpp
      storage/ndb/ndb_configure.cmake
      storage/ndb/src/common/util/Bitmask.cpp
      storage/ndb/src/kernel/vm/DynArr256.cpp
      storage/ndb/src/kernel/vm/DynArr256.hpp
 3806 jonas oreland	2012-02-03 [merge]
      ndb - merge 71 to 72

    modified:
      storage/ndb/include/ndb_version.h.in
      storage/ndb/src/kernel/blocks/dbdih/Dbdih.hpp
      storage/ndb/src/kernel/blocks/dbdih/DbdihMain.cpp
      storage/ndb/src/kernel/ndbd.cpp
=== modified file 'storage/ndb/cmake/os/WindowsCache.cmake'
--- a/storage/ndb/cmake/os/WindowsCache.cmake	2011-05-24 08:45:38 +0000
+++ b/storage/ndb/cmake/os/WindowsCache.cmake	2012-02-07 15:41:33 +0000
@@ -43,7 +43,10 @@ SET(HAVE_PTHREAD_MUTEXATTR_SETTYPE CACHE
 SET(HAVE_PTHREAD_SETSCHEDPARAM CACHE INTERNAL "")
 SET(HAVE_SUN_PREFETCH_H CACHE INTERNAL "")
 SET(HAVE___BUILTIN_FFS CACHE INTERNAL "")
+SET(HAVE___BUILTIN_CTZ CACHE INTERNAL "")
+SET(HAVE___BUILTIN_CLZ CACHE INTERNAL "")
 SET(HAVE__BITSCANFORWARD 1 CACHE INTERNAL "")
+SET(HAVE__BITSCANREVERSE 1 CACHE INTERNAL "")
 SET(HAVE_LINUX_SCHEDULING CACHE INTERNAL "")
 SET(HAVE_SOLARIS_AFFINITY CACHE INTERNAL "")
 SET(HAVE_LINUX_FUTEX CACHE INTERNAL "")

=== modified file 'storage/ndb/include/ndb_config.h.in'
--- a/storage/ndb/include/ndb_config.h.in	2011-03-15 15:50:34 +0000
+++ b/storage/ndb/include/ndb_config.h.in	2012-02-07 15:41:33 +0000
@@ -38,7 +38,10 @@
 #cmakedefine HAVE_MLOCK 1
 #cmakedefine HAVE_FFS 1
 #cmakedefine HAVE___BUILTIN_FFS 1
+#cmakedefine HAVE___BUILTIN_CTZ 1
+#cmakedefine HAVE___BUILTIN_CLZ 1
 #cmakedefine HAVE__BITSCANFORWARD 1
+#cmakedefine HAVE__BITSCANREVERSE 1
 #cmakedefine HAVE_PTHREAD_MUTEXATTR_INIT 1
 #cmakedefine HAVE_PTHREAD_MUTEXATTR_SETTYPE 1
 #cmakedefine HAVE_PTHREAD_SETSCHEDPARAM 1

=== modified file 'storage/ndb/include/util/Bitmask.hpp'
--- a/storage/ndb/include/util/Bitmask.hpp	2011-02-01 23:27:25 +0000
+++ b/storage/ndb/include/util/Bitmask.hpp	2012-02-07 15:41:33 +0000
@@ -20,7 +20,7 @@
 
 #include <ndb_global.h>
 
-#ifdef HAVE__BITSCANFORWARD
+#if defined(HAVE__BITSCANFORWARD) || defined(HAVE__BITSCANREVERSE)
 #include <intrin.h>
 #endif
 
@@ -93,24 +93,54 @@ public:
   static unsigned count(unsigned size, const Uint32 data[]);
 
   /**
+   * return count trailing zero bits inside a word
+   * undefined behaviour if non set
+   */
+  static unsigned ctz(Uint32 x);
+
+  /**
+   * return count leading zero bits inside a word
+   * undefined behaviour if non set
+   */
+  static unsigned clz(Uint32 x);
+
+  /**
    * return index of first bit set inside a word
    * undefined behaviour if non set
    */
   static unsigned ffs(Uint32 x);
 
   /**
+   * return index of last bit set inside a word
+   * undefined behaviour if non set
+   */
+  static unsigned fls(Uint32 x);
+
+  /**
    * find - Find first set bit, starting from 0
    * Returns NotFound when not found.
    */
   static unsigned find_first(unsigned size, const Uint32 data[]);
 
   /**
+   * find - Find last set bit, starting from 0
+   * Returns NotFound when not found.
+   */
+  static unsigned find_last(unsigned size, const Uint32 data[]);
+
+  /**
    * find - Find first set bit, starting at given position.
    * Returns NotFound when not found.
    */
   static unsigned find_next(unsigned size, const Uint32 data[], unsigned n);
 
   /**
+   * find - Find last set bit, starting at given position.
+   * Returns NotFound when not found.
+   */
+  static unsigned find_prev(unsigned size, const Uint32 data[], unsigned n);
+
+  /**
    * find - Find first set bit, starting at given position.
    * Returns NotFound when not found.
    */
@@ -358,6 +388,68 @@ BitmaskImpl::count(unsigned size, const
 }
 
 /**
+ * return count trailing zero bits inside a word
+ * undefined behaviour if non set
+ */
+inline
+Uint32
+BitmaskImpl::ctz(Uint32 x)
+{
+  return ffs(x);
+}
+
+/**
+ * return count leading bits inside a word
+ * undefined behaviour if non set
+ */
+inline
+Uint32
+BitmaskImpl::clz(Uint32 x)
+{
+#if defined HAVE___BUILTIN_CLZ
+  return __builtin_clz(x);
+#elif defined(__GNUC__) && (defined(__x86_64__) || defined (__i386__))
+  asm("bsr %1,%0"
+      : "=r" (x)
+      : "rm" (x));
+  return 31 - x;
+#elif defined HAVE__BITSCANREVERSE
+  unsigned long r;
+  unsigned char res = _BitScanReverse(&r, (unsigned long)x);
+  assert(res > 0);
+  return 31 - (Uint32)r;
+#else
+  int b = 0;
+  if (!(x & 0xffff0000))
+  {
+    x <<= 16;
+    b += 16;
+  }
+  if (!(x & 0xff000000))
+  {
+    x <<= 8;
+    b += 8;
+  }
+  if (!(x & 0xf0000000))
+  {
+    x <<= 4;
+    b += 4;
+  }
+  if (!(x & 0xc0000000))
+  {
+    x <<= 2;
+    b += 2;
+  }
+  if (!(x & 0x80000000))
+  {
+    x <<= 1;
+    b += 1;
+  }
+  return b;
+#endif
+}
+
+/**
  * return index of first bit set inside a word
  * undefined behaviour if non set
  */
@@ -365,7 +457,9 @@ inline
 Uint32
 BitmaskImpl::ffs(Uint32 x)
 {
-#if defined(__GNUC__) && (defined(__x86_64__) || defined (__i386__))
+#if defined HAVE___BUILTIN_CTZ
+  return __builtin_ctz(x);
+#elif defined(__GNUC__) && (defined(__x86_64__) || defined (__i386__))
   asm("bsf %1,%0"
       : "=r" (x)
       : "rm" (x));
@@ -413,6 +507,57 @@ BitmaskImpl::ffs(Uint32 x)
 #endif
 }
 
+/**
+ * return index of last bit set inside a word
+ * undefined behaviour if non set
+ */
+inline
+Uint32
+BitmaskImpl::fls(Uint32 x)
+{
+#if defined(__GNUC__) && (defined(__x86_64__) || defined (__i386__))
+  asm("bsr %1,%0"
+      : "=r" (x)
+      : "rm" (x));
+  return x;
+#elif defined HAVE___BUILTIN_CLZ
+  return 31 - __builtin_clz(x);
+#elif defined HAVE__BITSCANREVERSE
+  unsigned long r;
+  unsigned char res = _BitScanReverse(&r, (unsigned long)x);
+  assert(res > 0);
+  return (Uint32)r;
+#else
+  int b = 31;
+  if (!(x & 0xffff0000))
+  {
+    x <<= 16;
+    b -= 16;
+  }
+  if (!(x & 0xff000000))
+  {
+    x <<= 8;
+    b -= 8;
+  }
+  if (!(x & 0xf0000000))
+  {
+    x <<= 4;
+    b -= 4;
+  }
+  if (!(x & 0xc0000000))
+  {
+    x <<= 2;
+    b -= 2;
+  }
+  if (!(x & 0x80000000))
+  {
+    x <<= 1;
+    b -= 1;
+  }
+  return b;
+#endif
+}
+
 inline unsigned
 BitmaskImpl::find_first(unsigned size, const Uint32 data[])
 {
@@ -430,8 +575,29 @@ BitmaskImpl::find_first(unsigned size, c
 }
 
 inline unsigned
+BitmaskImpl::find_last(unsigned size, const Uint32 data[])
+{
+  if (size == 0)
+    return NotFound;
+  Uint32 n = (size << 5) - 1;
+  do
+  {
+    Uint32 val = data[n >> 5];
+    if (val)
+    {
+      return n - clz(val);
+    }
+    n -= 32;
+  } while (n != 0xffffffff);
+ return NotFound;
+}
+
+inline unsigned
 BitmaskImpl::find_next(unsigned size, const Uint32 data[], unsigned n)
 {
+  assert(n <= (size << 5));
+  if (n == (size << 5)) // allow one step utside for easier use
+    return NotFound;
   Uint32 val = data[n >> 5];
   Uint32 b = n & 31;
   if (b)
@@ -457,6 +623,35 @@ BitmaskImpl::find_next(unsigned size, co
 }
 
 inline unsigned
+BitmaskImpl::find_prev(unsigned size, const Uint32 data[], unsigned n)
+{
+  if (n >= (Uint32) 0xffffffff /* -1 */) // allow one bit outside array for easier use
+    return NotFound;
+  assert(n < (size << 5));
+  Uint32 val = data[n >> 5];
+  Uint32 b = n & 31;
+  if (b < 31)
+  {
+    val <<= 31 - b;
+    if (val)
+    {
+      return n - clz(val);
+    }
+    n -= b + 1;
+  }
+
+  while (n != NotFound) {
+    val = data[n >> 5];
+    if (val)
+    {
+      return n - clz(val);
+    }
+    n -= 32;
+  }
+  return NotFound;
+}
+
+inline unsigned
 BitmaskImpl::find(unsigned size, const Uint32 data[], unsigned n)
 {
   return find_next(size, data, n);
@@ -742,6 +937,20 @@ public:
   unsigned find_next(unsigned n) const;
 
   /**
+   * find - Find last set bit, starting at 0
+   * Returns NotFound when not found.
+   */
+  static unsigned find_last(const Uint32 data[]);
+  unsigned find_last() const;
+
+  /**
+   * find - Find previous set bit, starting at n
+   * Returns NotFound when not found.
+   */
+  static unsigned find_prev(const Uint32 data[], unsigned n);
+  unsigned find_prev(unsigned n) const;
+
+  /**
    * find - Find first set bit, starting at given position.
    * Returns NotFound when not found.
    */
@@ -1024,6 +1233,34 @@ BitmaskPOD<size>::find_next(unsigned n)
 }
 
 template <unsigned size>
+inline unsigned
+BitmaskPOD<size>::find_last(const Uint32 data[])
+{
+  return BitmaskImpl::find_last(size, data);
+}
+
+template <unsigned size>
+inline unsigned
+BitmaskPOD<size>::find_last() const
+{
+  return BitmaskPOD<size>::find_last(rep.data);
+}
+
+template <unsigned size>
+inline unsigned
+BitmaskPOD<size>::find_prev(const Uint32 data[], unsigned n)
+{
+  return BitmaskImpl::find_prev(size, data, n);
+}
+
+template <unsigned size>
+inline unsigned
+BitmaskPOD<size>::find_prev(unsigned n) const
+{
+  return BitmaskPOD<size>::find_prev(rep.data, n);
+}
+
+template <unsigned size>
 inline unsigned
 BitmaskPOD<size>::find(const Uint32 data[], unsigned n)
 {

=== modified file 'storage/ndb/ndb_configure.cmake'
--- a/storage/ndb/ndb_configure.cmake	2011-10-14 08:26:28 +0000
+++ b/storage/ndb/ndb_configure.cmake	2012-02-07 19:40:05 +0000
@@ -76,6 +76,24 @@ int main()
 }"
 HAVE___BUILTIN_FFS)
 
+CHECK_CXX_SOURCE_COMPILES("
+unsigned A = 7;
+int main()
+{
+  unsigned a = __builtin_ctz(A);
+  return 0;
+}"
+HAVE___BUILTIN_CTZ)
+
+CHECK_CXX_SOURCE_COMPILES("
+unsigned A = 7;
+int main()
+{
+  unsigned a = __builtin_clz(A);
+  return 0;
+}"
+HAVE___BUILTIN_CLZ)
+
 CHECK_C_SOURCE_COMPILES("
 #include <intrin.h>
 unsigned long A = 7;
@@ -87,6 +105,17 @@ int main()
 }"
 HAVE__BITSCANFORWARD)
 
+CHECK_C_SOURCE_COMPILES("
+#include <intrin.h>
+unsigned long A = 7;
+int main()
+{
+  unsigned long a;
+  unsigned char res = _BitScanReverse(&a, A);
+  return (int)a;
+}"
+HAVE__BITSCANREVERSE)
+
 # Linux scheduling and locking support
 CHECK_C_SOURCE_COMPILES("
 #ifndef _GNU_SOURCE

=== modified file 'storage/ndb/src/common/util/Bitmask.cpp'
--- a/storage/ndb/src/common/util/Bitmask.cpp	2011-02-01 23:27:25 +0000
+++ b/storage/ndb/src/common/util/Bitmask.cpp	2012-02-07 15:41:33 +0000
@@ -368,6 +368,28 @@ test_find_fast(Result& res, const Bitmas
 template<unsigned sz>
 inline
 void
+test_find_fast_reversed(Result& res, const Bitmask<sz> & mask, unsigned iter, FUNC func)
+{
+  Uint32 sum = 0;
+  Uint64 start = NdbTick_CurrentMillisecond();
+  for (Uint32 j = 0; j<iter; j++)
+  {
+
+    for (Uint32 n = BitmaskImpl::find_last(sz, mask.rep.data);
+         n != mask.NotFound;
+         n = BitmaskImpl::find_prev(sz, mask.rep.data, n - 1))
+    {
+      sum += (* func)(n);
+    }
+  }
+  Uint64 stop = NdbTick_CurrentMillisecond();
+  res.sum += sum;
+  res.elapsed += (stop - start);
+}
+
+template<unsigned sz>
+inline
+void
 test_toArray(Result& res, const Bitmask<sz> & mask, unsigned iter, FUNC func)
 {
   Uint32 sum = 0;
@@ -412,7 +434,7 @@ do_test(Uint32 len, FUNC func, const cha
   if (func == slow)
     iter = 3000;
 
-  Result res_find, res_fast, res_toArray, res_empty;
+  Result res_find, res_fast, res_fast_reversed, res_toArray, res_empty;
   for (Uint32 i = 0; i < (10000 / len); i++)
   {
     Bitmask<8> tmp;
@@ -437,6 +459,7 @@ do_test(Uint32 len, FUNC func, const cha
     }
     test_find(res_find, tmp, iter, func);
     test_find_fast(res_fast, tmp, iter, func);
+    test_find_fast_reversed(res_fast_reversed, tmp, iter, func);
     test_toArray(res_toArray, tmp, iter, func);
     test_empty(res_empty, len, iter, func);
   }
@@ -444,7 +467,8 @@ do_test(Uint32 len, FUNC func, const cha
   res_find.elapsed = sub0(res_find.elapsed, res_empty.elapsed);
   res_toArray.elapsed = sub0(res_toArray.elapsed, res_empty.elapsed);
   res_fast.elapsed = sub0(res_fast.elapsed, res_empty.elapsed);
-  Uint64 m = x_min(res_find.elapsed, res_toArray.elapsed, res_fast.elapsed);
+  res_fast_reversed.elapsed = sub0(res_fast_reversed.elapsed, res_empty.elapsed);
+  Uint64 m = x_min(res_find.elapsed, res_toArray.elapsed, res_fast_reversed.elapsed);
   if (m == 0)
     m = 1;
 
@@ -466,6 +490,16 @@ do_test(Uint32 len, FUNC func, const cha
          printf("toArray(%s,%s, %u) : %llu ns/iter (%.3u%%), (sum: %u)\n",
          dist, name, len,
          (1000000 * res_toArray.elapsed / div),
+         Uint32((100 * res_toArray.elapsed) / m),
+         res_toArray.sum);
+  printf("reversed(%s,%s, %u)    : %llu ns/iter (%.3u%%), (sum: %u)\n",
+         dist, name, len,
+         (1000000 * res_fast_reversed.elapsed / div),
+         Uint32((100 * res_fast_reversed.elapsed) / m),
+         res_fast_reversed.sum);
+         printf("toArray(%s,%s, %u) : %llu ns/iter (%.3u%%), (sum: %u)\n",
+         dist, name, len,
+         (1000000 * res_toArray.elapsed / div),
          Uint32((100 * res_toArray.elapsed) / m),
          res_toArray.sum);
   printf("\n");

=== modified file 'storage/ndb/src/kernel/vm/DynArr256.cpp'
--- a/storage/ndb/src/kernel/vm/DynArr256.cpp	2011-12-23 17:22:50 +0000
+++ b/storage/ndb/src/kernel/vm/DynArr256.cpp	2012-02-07 19:40:05 +0000
@@ -44,6 +44,8 @@ struct DA256Page
 {
   struct DA256CL m_header[2];
   struct DA256Node m_nodes[30];
+
+  bool get(Uint32 node, Uint32 idx, Uint32 type_id, Uint32*& val_ptr) const;
 };
 
 #undef require
@@ -52,18 +54,31 @@ struct DA256Page
 //#define DA256_USE_PREFETCH
 #define DA256_EXTRA_SAFE
 
+#ifdef TAP_TEST
+#define UNIT_TEST
+#include "NdbTap.hpp"
+#endif
 
 #ifdef UNIT_TEST
+#include "my_sys.h"
 #ifdef USE_CALLGRIND
 #include <valgrind/callgrind.h>
 #else
 #define CALLGRIND_TOGGLE_COLLECT()
 #endif
+Uint32 verbose = 0;
 Uint32 allocatedpages = 0;
 Uint32 allocatednodes = 0;
 Uint32 releasednodes = 0;
 #endif
 
+static
+inline
+Uint32 div15(Uint32 x)
+{
+  return ((x << 8) + (x << 4) + x + 255) >> 12;
+}
+
 inline
 void
 require_impl(bool x, int line)
@@ -97,6 +112,35 @@ DynArr256Pool::init(NdbMutex* m, Uint32
   m_mutex = m;
 }
 
+inline
+bool
+DA256Page::get(Uint32 node, Uint32 idx, Uint32 type_id, Uint32*& val_ptr) const
+{
+  Uint32 *magic_ptr, p;
+  if (idx != 255)
+  {
+    Uint32 line = div15(idx);
+    Uint32* ptr = (Uint32*)(m_nodes + node);
+
+    p = 0;
+    val_ptr = (ptr + 1 + idx + line);
+    magic_ptr =(ptr + (idx & ~15));
+  }
+  else
+  {
+    Uint32 b = (node + 1) >> 4;
+    Uint32 * ptr = (Uint32*)(m_header+b);
+
+    p = node - (b << 4) + b;
+    val_ptr = (ptr + 1 + p);
+    magic_ptr = ptr;
+  }
+
+  Uint32 magic = *magic_ptr;
+
+  return ((magic & (1 << p)) && (magic >> 16) == type_id);
+}
+
 static const Uint32 g_max_sizes[5] = { 0, 256, 65536, 16777216, ~0 };
 
 /**
@@ -142,34 +186,13 @@ DynArr256::get(Uint32 pos) const
     Uint32 page_no = ptrI >> DA256_BITS;
     Uint32 page_idx = ptrI & DA256_MASK;
     DA256Page * page = memroot + page_no;
-    
-    Uint32 *magic_ptr, p;
-    if (p0 != 255)
-    {
-      Uint32 line = ((p0 << 8) + (p0 << 4) + p0 + 255) >> 12;
-      Uint32 * ptr = (Uint32*)(page->m_nodes + page_idx);
-      
-      p = 0;
-      retVal = (ptr + 1 + p0 + line);
-      magic_ptr =(ptr + (p0 & ~15));
-    }
-    else
-    {
-      Uint32 b = (page_idx + 1) >> 4;
-      Uint32 * ptr = (Uint32*)(page->m_header+b);
-      
-      p = page_idx - (b << 4) + b;
-      retVal = (ptr + 1 + p);
-      magic_ptr = ptr;
-    }
-    
-    ptrI = *retVal;
-    Uint32 magic = *magic_ptr;
-    
-    if (unlikely(! ((magic & (1 << p)) && (magic >> 16) == type_id)))
+
+    if (unlikely(! page->get(page_idx, p0, type_id, retVal)))
       goto err;
+
+    ptrI = *retVal;
   }
-  
+
   return retVal;
 err:
   require(false);
@@ -222,31 +245,10 @@ DynArr256::set(Uint32 pos)
     Uint32 page_idx = ptrI & DA256_MASK;
     DA256Page * page = memroot + page_no;
     
-    Uint32 *magic_ptr, p;
-    if (p0 != 255)
-    {
-      Uint32 line = ((p0 << 8) + (p0 << 4) + p0 + 255) >> 12;
-      Uint32 * ptr = (Uint32*)(page->m_nodes + page_idx);
+    if (unlikely(! page->get(page_idx, p0, type_id, retVal)))
+      goto err;
 
-      p = 0;
-      magic_ptr = (ptr + (p0 & ~15));
-      retVal = (ptr + 1 + p0 + line);
-    }
-    else
-    {
-      Uint32 b = (page_idx + 1) >> 4;
-      Uint32 * ptr = (Uint32*)(page->m_header+b);
-      
-      p = page_idx - (b << 4) + b;
-      magic_ptr = ptr;
-      retVal = (ptr + 1 + p);
-    }
-     
     ptrI = * retVal;
-    Uint32 magic = *magic_ptr;
-
-    if (unlikely(! ((magic & (1 << p)) && (magic >> 16) == type_id)))
-      goto err;
   } 
   
   return retVal;
@@ -355,8 +357,7 @@ void
 DynArr256::init(ReleaseIterator &iter)
 {
   iter.m_sz = 1;
-  iter.m_pos = 0;
-  iter.m_ptr_i[0] = RNIL;
+  iter.m_pos = ~(~0U << (8 * m_head.m_sz));
   iter.m_ptr_i[1] = m_head.m_ptr_i;
   iter.m_ptr_i[2] = RNIL;
   iter.m_ptr_i[3] = RNIL;
@@ -371,92 +372,83 @@ DynArr256::init(ReleaseIterator &iter)
  * 2 - no data
  */
 Uint32
-DynArr256::release(ReleaseIterator &iter, Uint32 * retptr)
+DynArr256::truncate(Uint32 keep_pos, ReleaseIterator& iter, Uint32* ptrVal)
 {
-  Uint32 sz = iter.m_sz;
-  Uint32 ptrI = iter.m_ptr_i[sz];
-  Uint32 page_no = ptrI >> DA256_BITS;
-  Uint32 page_idx = ptrI & DA256_MASK;
   Uint32 type_id = (~m_pool.m_type_id) & 0xFFFF;
   DA256Page * memroot = m_pool.m_memroot;
-  DA256Page * page = memroot + page_no;
 
-  if (ptrI != RNIL)
+  for (;;)
   {
-    Uint32 p0 = iter.m_pos & 255;
-    for (; p0<256; p0++)
+    if (iter.m_sz == 0 ||
+        iter.m_pos < keep_pos ||
+        m_head.m_sz == 0)
     {
-      Uint32 *retVal, *magic_ptr, p;
-      if (p0 != 255)
+      return 0;
+    }
+
+    Uint32* refPtr;
+    Uint32 ptrI = iter.m_ptr_i[iter.m_sz];
+    Uint32 page_no = ptrI >> DA256_BITS;
+    Uint32 page_idx = (ptrI & DA256_MASK) ;
+    DA256Page* page = memroot + page_no;
+    Uint32 node_index = (iter.m_pos >> (8 * (m_head.m_sz - iter.m_sz))) & 255;
+    bool is_value = (iter.m_sz == m_head.m_sz);
+
+    if (unlikely(! page->get(page_idx, node_index, type_id, refPtr)))
+    {
+      require(false);
+    }
+    assert(refPtr != NULL);
+    *ptrVal = *refPtr;
+
+    if (iter.m_sz == 1 &&
+        (iter.m_pos >> (8 * (m_head.m_sz - iter.m_sz))) == 0)
+    {
+      assert(iter.m_ptr_i[iter.m_sz] == m_head.m_ptr_i);
+      assert(iter.m_ptr_i[iter.m_sz + 1] == RNIL);
+      iter.m_ptr_i[iter.m_sz] = is_value ? RNIL : *refPtr;
+      m_pool.release(m_head.m_ptr_i);
+      m_head.m_sz --;
+      m_head.m_ptr_i = iter.m_ptr_i[iter.m_sz];
+      return is_value ? 1 : 2;
+    }
+
+    if (is_value || iter.m_ptr_i[iter.m_sz + 1] == *refPtr)
+    { // sz--
+      Uint32 ptrI = *refPtr;
+      if (!is_value)
       {
-	Uint32 line = ((p0 << 8) + (p0 << 4) + p0 + 255) >> 12;
-	Uint32 * ptr = (Uint32*)(page->m_nodes + page_idx);
-	
-	p = 0;
-	retVal = (ptr + 1 + p0 + line);
-	magic_ptr =(ptr + (p0 & ~15));
+        if (ptrI != RNIL)
+        {
+          m_pool.release(ptrI);
+          *refPtr = iter.m_ptr_i[iter.m_sz+1] = RNIL;
+        }
       }
-      else
+      if (node_index == 0)
       {
-	Uint32 b = (page_idx + 1) >> 4;
-	Uint32 * ptr = (Uint32*)(page->m_header+b);
-	
-	p = page_idx - (b << 4) + b;
-	retVal = (ptr + 1 + p);
-	magic_ptr = ptr;
+        iter.m_sz --;
       }
-      
-      Uint32 magic = *magic_ptr;
-      Uint32 val = *retVal;
-      if (unlikely(! ((magic & (1 << p)) && (magic >> 16) == type_id)))
-	goto err;
-      
-      if (sz == m_head.m_sz)
+      else if (!is_value && ptrI == RNIL)
       {
-	* retptr = val;
-	p0++;
-	if (p0 != 256)
-	{
-	  /**
-	   * Move next
-	   */
-	  iter.m_pos &= ~(Uint32)255;
-	  iter.m_pos |= p0;
-	}
-	else
-	{
-	  /**
-	   * Move up
-	   */
-	  m_pool.release(ptrI);
-	  iter.m_sz --;
-	  iter.m_pos >>= 8;
-	}
-	return 1;
+        assert((~iter.m_pos & ~(0xffffffff << (8 * (m_head.m_sz - iter.m_sz)))) == 0);
+        iter.m_pos -= 1U << (8 * (m_head.m_sz - iter.m_sz));
       }
-      else if (val != RNIL)
+      else
       {
-	iter.m_sz++;
-	iter.m_ptr_i[iter.m_sz] = val;
-	iter.m_pos = (p0 << 8);
-	* retVal = RNIL;
-	return 2;
+        assert((iter.m_pos & ~(0xffffffff << (8 * (m_head.m_sz - iter.m_sz)))) == 0);
+        iter.m_pos --;
       }
+      if (is_value)
+        return 1;
+    }
+    else
+    { // sz++
+      assert(iter.m_ptr_i[iter.m_sz + 1] == RNIL);
+      iter.m_sz ++;
+      iter.m_ptr_i[iter.m_sz] = *refPtr;
+      return 2;
     }
-    
-    assert(p0 == 256);
-    m_pool.release(ptrI);
-    iter.m_sz --;
-    iter.m_pos >>= 8;
-    return 2;
   }
-  
-  new (&m_head) Head();
-  return 0;
-  
-err:
-  require(false);
-  return false;
 }
 
 static
@@ -638,10 +630,10 @@ DynArr256Pool::release(Uint32 ptrI)
 #ifdef UNIT_TEST
 
 static
-void
+bool
 simple(DynArr256 & arr, int argc, char* argv[])
 {
-  ndbout_c("argc: %d", argc);
+  if (verbose) ndbout_c("argc: %d", argc);
   for (Uint32 i = 1; i<(Uint32)argc; i++)
   {
     Uint32 * s = arr.set(atoi(argv[i]));
@@ -661,12 +653,13 @@ simple(DynArr256 & arr, int argc, char*
     
     Uint32 * g = arr.get(atoi(argv[i]));
     Uint32 v = g ? *g : ~0;
-    ndbout_c("p: %p %p %d", s, g, v);
+    if (verbose) ndbout_c("p: %p %p %d", s, g, v);
   }
+  return true;
 }
 
 static
-void
+bool
 basic(DynArr256& arr, int argc, char* argv[])
 {
 #define MAXLEN 65536
@@ -723,29 +716,19 @@ basic(DynArr256& arr, int argc, char* ar
     }
     }
   }
-}
-
-unsigned long long 
-micro()
-{
-  struct timeval tv;
-  gettimeofday(&tv, 0);
-  unsigned long long ret = tv.tv_sec;
-  ret *= 1000000;
-  ret += tv.tv_usec;
-  return ret;
+  return true;
 }
 
 static
-void
+bool
 read(DynArr256& arr, int argc, char ** argv)
 {
   Uint32 cnt = 100000;
   Uint64 mbytes = 16*1024;
-  Uint32 seed = time(0);
+  Uint32 seed = (Uint32) time(0);
   Uint32 seq = 0, seqmask = 0;
 
-  for (Uint32 i = 1; i<argc; i++)
+  for (int i = 1; i < argc; i++)
   {
     if (strncmp(argv[i], "--mbytes=", sizeof("--mbytes=")-1) == 0)
     {
@@ -767,10 +750,17 @@ read(DynArr256& arr, int argc, char ** a
   /**
    * Populate with 5Mb
    */
-  Uint32 maxidx = (1024*mbytes+31) / 32;
+
+  if (mbytes >= 134217720)
+  {
+    ndberr.println("--mbytes must be less than 134217720");
+    return false;
+  }
+  Uint32 maxidx = (Uint32)((1024*mbytes+31) / 32);
   Uint32 nodes = (maxidx+255) / 256;
   Uint32 pages = (nodes + 29)/ 30;
-  ndbout_c("%lldmb data -> %d entries (%dkb)",
+  if (verbose)
+    ndbout_c("%lldmb data -> %d entries (%dkb)",
 	   mbytes, maxidx, 32*pages);
   
   for (Uint32 i = 0; i<maxidx; i++)
@@ -788,13 +778,14 @@ read(DynArr256& arr, int argc, char ** a
     seqmask = ~(Uint32)0;
   }
 
-  ndbout_c("Timing %d %s reads (seed: %u)", cnt, 
+  if (verbose)
+    ndbout_c("Timing %d %s reads (seed: %u)", cnt,
 	   seq ? "sequential" : "random", seed);
 
   for (Uint32 i = 0; i<10; i++)
   {
     Uint32 sum0 = 0, sum1 = 0;
-    Uint64 start = micro();
+    Uint64 start = my_micro_time();
     for (Uint32 i = 0; i<cnt; i++)
     {
       Uint32 idx = ((rand() & (~seqmask)) + ((i + seq) & seqmask)) % maxidx;
@@ -802,22 +793,24 @@ read(DynArr256& arr, int argc, char ** a
       sum0 += idx;
       sum1 += *ptr;
     }
-    start = micro() - start;
-    float uspg = start; uspg /= cnt;
-    ndbout_c("Elapsed %lldus diff: %d -> %f us/get", start, sum0 - sum1, uspg);
+    start = my_micro_time() - start;
+    float uspg = (float)start; uspg /= cnt;
+    if (verbose)
+      ndbout_c("Elapsed %lldus diff: %d -> %f us/get", start, sum0 - sum1, uspg);
   }
+  return true;
 }
 
 static
-void
+bool
 write(DynArr256& arr, int argc, char ** argv)
 {
   Uint32 seq = 0, seqmask = 0;
   Uint32 cnt = 100000;
   Uint64 mbytes = 16*1024;
-  Uint32 seed = time(0);
+  Uint32 seed = (Uint32) time(0);
 
-  for (Uint32 i = 1; i<argc; i++)
+  for (int i = 1; i<argc; i++)
   {
     if (strncmp(argv[i], "--mbytes=", sizeof("--mbytes=")-1) == 0)
     {
@@ -839,10 +832,17 @@ write(DynArr256& arr, int argc, char **
   /**
    * Populate with 5Mb
    */
-  Uint32 maxidx = (1024*mbytes+31) / 32;
+
+  if (mbytes >= 134217720)
+  {
+    ndberr.println("--mbytes must be less than 134217720");
+    return false;
+  }
+  Uint32 maxidx = (Uint32)((1024*mbytes+31) / 32);
   Uint32 nodes = (maxidx+255) / 256;
   Uint32 pages = (nodes + 29)/ 30;
-  ndbout_c("%lldmb data -> %d entries (%dkb)",
+  if (verbose)
+    ndbout_c("%lldmb data -> %d entries (%dkb)",
 	   mbytes, maxidx, 32*pages);
 
   srand(seed);
@@ -853,25 +853,28 @@ write(DynArr256& arr, int argc, char **
     seqmask = ~(Uint32)0;
   }
 
-  ndbout_c("Timing %d %s writes (seed: %u)", cnt, 
+  if (verbose)
+    ndbout_c("Timing %d %s writes (seed: %u)", cnt,
 	   seq ? "sequential" : "random", seed);
   for (Uint32 i = 0; i<10; i++)
   {
-    Uint64 start = micro();
+    Uint64 start = my_micro_time();
     for (Uint32 i = 0; i<cnt; i++)
     {
       Uint32 idx = ((rand() & (~seqmask)) + ((i + seq) & seqmask)) % maxidx;
       Uint32 *ptr = arr.set(idx);
       *ptr = i;
     }
-    start = micro() - start;
-    float uspg = start; uspg /= cnt;
-    ndbout_c("Elapsed %lldus -> %f us/set", start, uspg);
+    start = my_micro_time() - start;
+    float uspg = (float)start; uspg /= cnt;
+    if (verbose)
+      ndbout_c("Elapsed %lldus -> %f us/set", start, uspg);
     DynArr256::ReleaseIterator iter;
     arr.init(iter);
     Uint32 val;
     while(arr.release(iter, &val));
   }
+  return true;
 }
 
 static
@@ -889,13 +892,37 @@ usage(FILE *f, int argc, char **argv)
 
 # include "test_context.hpp"
 
+#ifdef TAP_TEST
+static
+char* flatten(int argc, char** argv) /* NOT MT-SAFE */
+{
+  static char buf[10000];
+  size_t off = 0;
+  for (; argc > 0; argc--, argv++)
+  {
+    int i = 0;
+    if (off > 0 && (off + 1 < sizeof(buf)))
+      buf[off++] = ' ';
+    for (i = 0; (off + 1 < sizeof(buf)) && argv[0][i] != 0; i++, off++)
+      buf[off] = argv[0][i];
+    buf[off] = 0;
+  }
+  return buf;
+}
+#endif
+
 int
 main(int argc, char** argv)
 {
+#ifndef TAP_TEST
+  verbose = 1;
   if (argc == 1) {
     usage(stderr, argc, argv);
     exit(2);
   }
+#else
+  verbose = 0;
+#endif
 
   Pool_context pc = test_context(10000 /* pages */);
 
@@ -905,6 +932,42 @@ main(int argc, char** argv)
   DynArr256::Head head;
   DynArr256 arr(pool, head);
 
+#ifdef TAP_TEST
+  if (argc == 1)
+  {
+    char *argv[2] = { (char*)"dummy", NULL };
+    plan(5);
+    ok(simple(arr, 1, argv), "simple");
+    ok(basic(arr, 1, argv), "basic");
+    ok(read(arr, 1, argv), "read");
+    ok(write(arr, 1, argv), "write");
+  }
+  else if (strcmp(argv[1], "--simple") == 0)
+  {
+    plan(2);
+    ok(simple(arr, argc - 1, argv + 1), "simple %s", flatten(argc - 1, argv + 1));
+  }
+  else if (strcmp(argv[1], "--basic") == 0)
+  {
+    plan(2);
+    ok(basic(arr, argc - 1, argv + 1), "basic %s", flatten(argc - 1, argv + 1));
+  }
+  else if (strcmp(argv[1], "--read") == 0)
+  {
+    plan(2);
+    ok(read(arr, argc - 1, argv + 1), "read %s", flatten(argc - 1, argv + 1));
+  }
+  else if (strcmp(argv[1], "--write") == 0)
+  {
+    plan(2);
+    ok(write(arr, argc - 1, argv + 1), "write %s", flatten(argc - 1, argv + 1));
+  }
+  else
+  {
+    usage(stderr, argc, argv);
+    BAIL_OUT("Bad usage: %s %s", argv[0], flatten(argc - 1, argv + 1));
+  }
+#else
   if (strcmp(argv[1], "--simple") == 0)
     simple(arr, argc - 1, argv + 1);
   else if (strcmp(argv[1], "--basic") == 0)
@@ -918,34 +981,26 @@ main(int argc, char** argv)
     usage(stderr, argc, argv);
     exit(2);
   }
+#endif
 
   DynArr256::ReleaseIterator iter;
   arr.init(iter);
   Uint32 cnt = 0, val;
   while (arr.release(iter, &val)) cnt++;
   
-  ndbout_c("allocatedpages: %d allocatednodes: %d releasednodes: %d"
+  if (verbose)
+    ndbout_c("allocatedpages: %d allocatednodes: %d releasednodes: %d"
 	   " releasecnt: %d",
 	   allocatedpages, 
 	   allocatednodes,
 	   releasednodes,
 	   cnt);
-  
+#ifdef TAP_TEST
+  ok(allocatednodes == releasednodes, "release");
+  return exit_status();
+#else
   return 0;
-}
-
 #endif
-
-#ifdef TAP_TEST
-#include <NdbTap.hpp>
-#include "test_context.hpp"
-
-TAPTEST(DynArr256)
-{
-  Pool_context pc = test_context(100);
-
-  OK(true);
-
-  return 1;
 }
+
 #endif

=== modified file 'storage/ndb/src/kernel/vm/DynArr256.hpp'
--- a/storage/ndb/src/kernel/vm/DynArr256.hpp	2011-07-05 12:46:07 +0000
+++ b/storage/ndb/src/kernel/vm/DynArr256.hpp	2012-02-07 19:40:05 +0000
@@ -79,6 +79,7 @@ public:
    *        2 - nodata
    */
   Uint32 release(ReleaseIterator&, Uint32* retptr);
+  Uint32 truncate(Uint32 keep_pos, ReleaseIterator&, Uint32* retptr);
 protected:
   Head & m_head;
   DynArr256Pool & m_pool;
@@ -87,4 +88,10 @@ protected:
   void handle_invalid_ptr(Uint32 pos, Uint32 ptrI, Uint32 p0);
 };
 
+inline
+Uint32 DynArr256::release(ReleaseIterator& iter, Uint32* retptr)
+{
+  return truncate(0, iter, retptr);
+}
+
 #endif

No bundle (reason: useless for push emails).
Thread
bzr push into mysql-5.5-cluster-7.2 branch (jonas.oreland:3806 to 3807) jonas oreland8 Feb