List:Commits« Previous MessageNext Message »
From:Frazer Clement Date:December 9 2008 5:50pm
Subject:bzr push into mysql-5.1 branch (frazer:3164 to 3167)
View as plain text  
 3167 Frazer Clement	2008-12-09
      WL4258 Long Signal Transactions
      
      Previous work in WL4258 modified NdbRecord based PK and UK operations to 
      send long signals to the kernel while non-NdbRecord PK and UK operations
      continued to send 'short' signals.
      
      This patch modifies NDBAPI to send long signals for non-NdbRecord PK and
      UK operations so that after this patch, *all* operations are sent using long signals.
      
      Most of the modifications are limited to the send-phase.
modified:
  storage/ndb/src/ndbapi/NdbOperationExec.cpp
  storage/ndb/src/ndbapi/NdbOperationSearch.cpp
  storage/ndb/src/ndbapi/NdbScanOperation.cpp

 3166 Frazer Clement	2008-12-09
      WL4258 Long Signal Transactions
      
      Correcting handling of Signal Dropped Rep for fragmented signals.
      
      When the transporter cannot deliver an incoming long signal to a worker thread
      due to a shortage of Section Segments, it instead delivers a Signal Dropped
      Report.
      
      When the signal is part of a larger fragmented signal, this can result in 
      node failures, or inconsistent data being received as any parts which are
      correctly received, are incorrectly handled.
      
      This patch adds handling for the case where a dropped signal is part of
      a fragmented signal.
      
      Testing is done using SCANTABREQ which can be fragmented as a result of
      WL4258.
modified:
  storage/ndb/src/kernel/blocks/ERROR_codes.txt
  storage/ndb/src/kernel/blocks/dblqh/DblqhMain.cpp
  storage/ndb/src/kernel/blocks/dbtc/Dbtc.hpp
  storage/ndb/src/kernel/blocks/dbtc/DbtcMain.cpp
  storage/ndb/src/kernel/vm/SimulatedBlock.cpp
  storage/ndb/src/kernel/vm/SimulatedBlock.hpp
  storage/ndb/test/ndbapi/testLimits.cpp
  storage/ndb/test/run-test/daily-basic-tests.txt

 3165 Frazer Clement	2008-12-09 [merge]
      Merge 6.3->6.4
modified:
  storage/ndb/include/kernel/Interpreter.hpp
  storage/ndb/include/ndbapi/NdbInterpretedCode.hpp
  storage/ndb/include/ndbapi/NdbOperation.hpp
  storage/ndb/include/ndbapi/NdbScanFilter.hpp
  storage/ndb/include/util/NdbSqlUtil.hpp
  storage/ndb/src/common/util/NdbSqlUtil.cpp
  storage/ndb/src/kernel/blocks/dbtup/DbtupExecQuery.cpp
  storage/ndb/src/ndbapi/NdbInterpretedCode.cpp
  storage/ndb/src/ndbapi/NdbOperationInt.cpp
  storage/ndb/src/ndbapi/NdbScanFilter.cpp
  storage/ndb/test/ndbapi/testScanFilter.cpp
  storage/ndb/test/run-test/daily-basic-tests.txt

 3164 Jonas Oreland	2008-12-09
      ndb - fix merge, release marker
modified:
  storage/ndb/src/kernel/blocks/dbtc/DbtcMain.cpp

=== modified file 'storage/ndb/include/kernel/Interpreter.hpp'
--- a/storage/ndb/include/kernel/Interpreter.hpp	2008-02-19 15:00:29 +0000
+++ b/storage/ndb/include/kernel/Interpreter.hpp	2008-12-09 15:47:14 +0000
@@ -89,18 +89,17 @@ public:
   /**
    * Branch string
    *
-   * i = Instruction            -  5 Bits ( 0 - 5 ) max 63
-   * a = Attribute id
-   * l = Length of string
-   * b = Branch offset
-   * t = branch type
+   * i = Instruction              -  5 Bits ( 0 - 5 ) max 63
+   * a = Attribute id             -  16 bits
+   * l = Length of string (bytes) -  16 bits
+   * b = Branch offset (words)    -  16 bits
+   * t = branch type              -  4 bits
    * d = Array length diff
    * v = Varchar flag
-   * p = No-blank-padding flag for char compare
    *
    *           1111111111222222222233
    * 01234567890123456789012345678901
-   * iiiiii   ddvtttpbbbbbbbbbbbbbbbb
+   * iiiiii   ddvttttbbbbbbbbbbbbbbbb
    * aaaaaaaaaaaaaaaallllllllllllllll
    * -string....                    -
    */
@@ -117,17 +116,21 @@ public:
     GT = 4,
     GE = 5,
     LIKE = 6,
-    NOT_LIKE = 7
+    NOT_LIKE = 7,
+    AND_EQ_MASK = 8,
+    AND_NE_MASK = 9,
+    AND_EQ_ZERO = 10,
+    AND_NE_ZERO = 11
   };
+  // TODO : Remove other 2 unused parameters.
   static Uint32 BranchCol(BinaryCondition cond, 
-			  Uint32 arrayLengthDiff, Uint32 varchar, bool nopad);
+			  Uint32 arrayLengthDiff, Uint32 varchar);
   static Uint32 BranchCol_2(Uint32 AttrId);
   static Uint32 BranchCol_2(Uint32 AttrId, Uint32 Len);
 
   static Uint32 getBinaryCondition(Uint32 op1);
   static Uint32 getArrayLengthDiff(Uint32 op1);
   static Uint32 isVarchar(Uint32 op1);
-  static Uint32 isNopad(Uint32 op1);
   static Uint32 getBranchCol_AttrId(Uint32 op2);
   static Uint32 getBranchCol_Len(Uint32 op2);
   
@@ -216,15 +219,14 @@ inline
 Uint32
 Interpreter::BranchCol(BinaryCondition cond, 
 		       Uint32 arrayLengthDiff,
-		       Uint32 varchar, bool nopad){
-  //ndbout_c("BranchCol: cond=%d diff=%u varchar=%u nopad=%d",
-      //cond, arrayLengthDiff, varchar, nopad);
+		       Uint32 varchar){
+  //ndbout_c("BranchCol: cond=%d diff=%u varchar=%u",
+      //cond, arrayLengthDiff, varchar);
   return 
     BRANCH_ATTR_OP_ARG + 
     (arrayLengthDiff << 9) + 
     (varchar << 11) +
-    (cond << 12) +
-    (nopad << 15);
+    (cond << 12);
 }
 
 inline
@@ -242,7 +244,7 @@ Interpreter::BranchCol_2(Uint32 AttrId){
 inline
 Uint32
 Interpreter::getBinaryCondition(Uint32 op){
-  return (op >> 12) & 0x7;
+  return (op >> 12) & 0xf;
 }
 
 inline
@@ -259,12 +261,6 @@ Interpreter::isVarchar(Uint32 op){
 
 inline
 Uint32
-Interpreter::isNopad(Uint32 op){
-  return (op >> 15) & 1;
-}
-
-inline
-Uint32
 Interpreter::getBranchCol_AttrId(Uint32 op){
   return (op >> 16) & 0xFFFF;
 }

=== modified file 'storage/ndb/include/ndbapi/NdbInterpretedCode.hpp'
--- a/storage/ndb/include/ndbapi/NdbInterpretedCode.hpp	2008-08-21 14:34:12 +0000
+++ b/storage/ndb/include/ndbapi/NdbInterpretedCode.hpp	2008-12-09 17:02:23 +0000
@@ -225,27 +225,30 @@ public:
    * Space required        Buffer          Request message
    *   branch_col_*_null   2 words         2 words
    *   branch_col_*        2 words +       2 words + 
-   *                       len bytes       len bytes
+   *                       type length     type length
    *                       rounded to      rounded to
    *                       nearest word    nearest word
    *
+   *                       Only significant words stored/
+   *                       sent for VAR* types
+   *
    * @param val       ptr to const value to compare against
-   * @param len       length in bytes of const value
+   * @param unused    unnecessary
    * @param attrId    column to compare
    * @param Label     Program label to jump to if condition is true
    * @return 0 if successful, -1 otherwise.
    */
-  int branch_col_eq(const void * val, Uint32 len, Uint32 attrId,
+  int branch_col_eq(const void * val, Uint32 unused, Uint32 attrId,
                     Uint32 Label);
-  int branch_col_ne(const void * val, Uint32 len, Uint32 attrId,
+  int branch_col_ne(const void * val, Uint32 unused, Uint32 attrId,
                     Uint32 Label);
-  int branch_col_lt(const void * val, Uint32 len, Uint32 attrId,
+  int branch_col_lt(const void * val, Uint32 unused, Uint32 attrId,
                     Uint32 Label);
-  int branch_col_le(const void * val, Uint32 len, Uint32 attrId,
+  int branch_col_le(const void * val, Uint32 unused, Uint32 attrId,
                     Uint32 Label);
-  int branch_col_gt(const void * val, Uint32 len, Uint32 attrId,
+  int branch_col_gt(const void * val, Uint32 unused, Uint32 attrId,
                     Uint32 Label);
-  int branch_col_ge(const void * val, Uint32 len, Uint32 attrId,
+  int branch_col_ge(const void * val, Uint32 unused, Uint32 attrId,
                     Uint32 Label);
   int branch_col_eq_null(Uint32 attrId, Uint32 Label);
   int branch_col_ne_null(Uint32 attrId, Uint32 Label);
@@ -280,13 +283,60 @@ public:
    * @param attrId    column to compare
    * @param Label     Program label to jump to if condition is true
    * @return 0 if successful, -1 otherwise.
-
+   *
    */
   int branch_col_like(const void * val, Uint32 len, Uint32 attrId,
                       Uint32 Label);
   int branch_col_notlike(const void * val, Uint32 len, Uint32 attrId,
                          Uint32 Label);
 
+  /* Table based bitwise logical conditional operations
+   * --------------------------------------------------
+   * These instructions are used to branch based on the 
+   * result of logical AND between Bit type column data 
+   * and a bitmask pattern.
+   *
+   * These instructions require that the table being operated
+   * upon was supplied when the NdbInterpretedCode object was
+   * constructed.
+   *
+   * The mask value should be the same size as the bit column
+   * being compared.
+   * Bitfields are passed in/out of NdbApi as 32-bit words
+   * with bits set from lsb to msb.
+   * The platform's endianness controls which byte contains
+   * the ls bits.  
+   *   x86= first(0th) byte.  Sparc/PPC= last(3rd byte)
+   *
+   * To set bit n of a bitmask to 1 from a Uint32* mask :
+   *   mask[n >> 5] |= (1 << (n & 31))
+   *
+   * if (BitWiseAnd(ValueOf(attrId), *mask) <EQ/NE> <*mask/0>)
+   *   goto Label;
+   *
+   * Space required        Buffer          Request message
+   *   branch_col_and_mask_eq_mask/
+   *   branch_col_and_mask_ne_mask/
+   *   branch_col_and_mask_eq_zero/
+   *   branch_col_and_mask_ne_zero
+   *                       2 words +       2 words + 
+   *                       column width    column width
+   *                       rounded to      rounded to
+   *                       nearest word    nearest word
+   *
+   * @param mask      ptr to const mask to use
+   * @param unused    unnecessary
+   * @param attrId    column to compare
+   * @param Label     Program label to jump to if condition is true
+   * @return 0 if successful, -1 otherwise.
+   *
+   */
+  int branch_col_and_mask_eq_mask(const void * mask, Uint32 unused, Uint32 attrId, Uint32 Label);
+  int branch_col_and_mask_ne_mask(const void * mask, Uint32 unused, Uint32 attrId, Uint32 Label);
+  int branch_col_and_mask_eq_zero(const void * mask, Uint32 unused, Uint32 attrId, Uint32 Label);
+  int branch_col_and_mask_ne_zero(const void * mask, Uint32 unused, Uint32 attrId, Uint32 Label);
+
+
   /* Program results 
    * ---------------
    * These instructions indicate to the interpreter that processing

=== modified file 'storage/ndb/include/ndbapi/NdbOperation.hpp'
--- a/storage/ndb/include/ndbapi/NdbOperation.hpp	2008-11-08 21:43:03 +0000
+++ b/storage/ndb/include/ndbapi/NdbOperation.hpp	2008-12-09 17:02:23 +0000
@@ -734,7 +734,12 @@ public:
   int branch_col_ge(Uint32 ColId, const void * val, Uint32 len, 
 		    bool nopad, Uint32 Label);
   /**
-   * The argument is always plain char, even if the field is varchar
+   * LIKE/NOTLIKE wildcard comparisons
+   * These instructions support SQL-style % and _ wildcards for
+   * (VAR)CHAR/BINARY columns only
+   *
+   * The argument is always plain char format, even if the field 
+   * is varchar
    * (changed in 5.0.22).
    * 
    * @note For Scans and NdbRecord operations, use the 
@@ -744,7 +749,40 @@ public:
 		      bool nopad, Uint32 Label);
   int branch_col_notlike(Uint32 ColId, const void *, Uint32 len, 
 			 bool nopad, Uint32 Label);
-  
+
+  /**
+   * Bitwise logical comparisons
+   *
+   * These comparison types are only supported for the Bitfield
+   * type
+   * They can be used to test for bit patterns in bitfield columns
+   *   The value passed is a bitmask which is bitwise-ANDed with the
+   *   column data.
+   *   Bitfields are passed in/out of NdbApi as 32-bit words with
+   *   bits set from lsb to msb.
+   *   The platform's endianness controls which byte contains the ls
+   *   bits.
+   *     x86= first(0th) byte.  Sparc/PPC= last (3rd byte)
+   *
+   *   To set bit n of a bitmask to 1 from a Uint32* mask : 
+   *     mask[n >> 5] |= (1 << (n & 31))
+   *
+   *   The branch can be taken in 4 cases :
+   *     - Column data AND Mask == Mask (all masked bits are set in data)
+   *     - Column data AND Mask != Mask (not all masked bits are set in data)
+   *     - Column data AND Mask == 0    (No masked bits are set in data)
+   *     - Column data AND Mask != 0    (Some masked bits are set in data)
+   *   
+   */
+  int branch_col_and_mask_eq_mask(Uint32 ColId, const void *, Uint32 len, 
+                                  bool nopad, Uint32 Label);
+  int branch_col_and_mask_ne_mask(Uint32 ColId, const void *, Uint32 len, 
+                                  bool nopad, Uint32 Label);
+  int branch_col_and_mask_eq_zero(Uint32 ColId, const void *, Uint32 len, 
+                                  bool nopad, Uint32 Label);
+  int branch_col_and_mask_ne_zero(Uint32 ColId, const void *, Uint32 len, 
+                                  bool nopad, Uint32 Label);
+
   /**
    * Interpreted program instruction: Exit with Ok
    *
@@ -1238,7 +1276,7 @@ protected:
   int read_attr(const NdbColumnImpl* anAttrObject, Uint32 RegDest);
   int write_attr(const NdbColumnImpl* anAttrObject, Uint32 RegSource);
   int branch_reg_reg(Uint32 type, Uint32, Uint32, Uint32);
-  int branch_col(Uint32 type, Uint32, const void *, Uint32, bool, Uint32 Label);
+  int branch_col(Uint32 type, Uint32, const void *, Uint32, Uint32 Label);
   int branch_col_null(Uint32 type, Uint32 col, Uint32 Label);
   NdbBlob *linkInBlobHandle(NdbTransaction *aCon,
                             const NdbColumnImpl *column,

=== modified file 'storage/ndb/include/ndbapi/NdbScanFilter.hpp'
--- a/storage/ndb/include/ndbapi/NdbScanFilter.hpp	2008-08-21 14:34:12 +0000
+++ b/storage/ndb/include/ndbapi/NdbScanFilter.hpp	2008-12-09 17:02:23 +0000
@@ -80,14 +80,18 @@ public:
 
   enum BinaryCondition 
   {
-    COND_LE = 0,        ///< lower bound
-    COND_LT = 1,        ///< lower bound, strict
-    COND_GE = 2,        ///< upper bound
-    COND_GT = 3,        ///< upper bound, strict
-    COND_EQ = 4,        ///< equality
-    COND_NE = 5,        ///< not equal
-    COND_LIKE = 6,      ///< like
-    COND_NOT_LIKE = 7   ///< not like
+    COND_LE = 0,           ///< lower bound
+    COND_LT = 1,           ///< lower bound, strict
+    COND_GE = 2,           ///< upper bound
+    COND_GT = 3,           ///< upper bound, strict
+    COND_EQ = 4,           ///< equality
+    COND_NE = 5,           ///< not equal
+    COND_LIKE = 6,         ///< like
+    COND_NOT_LIKE = 7,     ///< not like
+    COND_AND_EQ_MASK = 8,  ///< (bit & mask) == mask
+    COND_AND_NE_MASK = 9,  ///< (bit & mask) != mask (incl. NULL)
+    COND_AND_EQ_ZERO = 10, ///< (bit & mask) == 0
+    COND_AND_NE_ZERO = 11, ///< (bit & mask) != 0 (incl. NULL)
   };
 
   /** 
@@ -130,6 +134,9 @@ public:
    * documentation for NdbOperation::equal().
    * For BinaryConditions LIKE and NOT_LIKE, the value pointed to by val
    * should NOT include initial length bytes.
+   * For LIKE and NOT_LIKE, the % and ? wildcards are supported.
+   * For bitmask operations, see the bitmask format information against
+   * the branch_col_and_mask_eq_mask instruction in NdbInterpretedCode.hpp
    *
    *  �return  0 if successful, -1 otherwise
    */

=== modified file 'storage/ndb/include/util/NdbSqlUtil.hpp'
--- a/storage/ndb/include/util/NdbSqlUtil.hpp	2006-12-23 19:20:40 +0000
+++ b/storage/ndb/include/util/NdbSqlUtil.hpp	2008-12-09 15:47:14 +0000
@@ -52,6 +52,15 @@ public:
    */
   typedef int Like(const void* info, const void* p1, unsigned n1, const void* p2, unsigned n2);
 
+  /**
+   * Prototype for mask comparisons.  Defined for bit type.
+   *
+   * If common portion of data AND Mask is equal to mask
+   * return 0, else return 1.
+   * If cmpZero, compare data AND Mask to zero.
+   */
+  typedef int AndMask(const void* data, unsigned dataLen, const void* mask, unsigned maskLen, bool cmpZero); 
+
   enum CmpResult {
     CmpLess = -1,
     CmpEqual = 0,
@@ -96,6 +105,7 @@ public:
     Enum m_typeId;      // redundant
     Cmp* m_cmp;         // comparison method
     Like* m_like;       // "like" comparison method
+    AndMask* m_mask;    // Mask comparison method
   };
 
   /**
@@ -179,6 +189,8 @@ private:
   static Like likeVarbinary;
   static Like likeLongvarchar;
   static Like likeLongvarbinary;
+  //
+  static AndMask maskBit;
 };
 
 #endif

=== modified file 'storage/ndb/src/common/util/NdbSqlUtil.cpp'
--- a/storage/ndb/src/common/util/NdbSqlUtil.cpp	2007-02-23 11:28:34 +0000
+++ b/storage/ndb/src/common/util/NdbSqlUtil.cpp	2008-12-09 15:47:14 +0000
@@ -26,156 +26,187 @@ NdbSqlUtil::m_typeList[] = {
   { // 0
     Type::Undefined,
     NULL,
+    NULL,
     NULL
   },
   { // 1
     Type::Tinyint,
     cmpTinyint,
+    NULL,
     NULL
   },
   { // 2
     Type::Tinyunsigned,
     cmpTinyunsigned,
+    NULL,
     NULL
   },
   { // 3
     Type::Smallint,
     cmpSmallint,
+    NULL,
     NULL
   },
   { // 4
     Type::Smallunsigned,
     cmpSmallunsigned,
+    NULL,
     NULL
   },
   { // 5
     Type::Mediumint,
     cmpMediumint,
+    NULL,
     NULL
   },
   { // 6
     Type::Mediumunsigned,
     cmpMediumunsigned,
+    NULL,
     NULL
   },
   { // 7
     Type::Int,
     cmpInt,
+    NULL,
     NULL
   },
   { // 8
     Type::Unsigned,
     cmpUnsigned,
+    NULL,
     NULL
   },
   { // 9
     Type::Bigint,
     cmpBigint,
+    NULL,
     NULL
   },
   { // 10
     Type::Bigunsigned,
     cmpBigunsigned,
+    NULL,
     NULL
   },
   { // 11
     Type::Float,
     cmpFloat,
+    NULL,
     NULL
   },
   { // 12
     Type::Double,
     cmpDouble,
+    NULL,
     NULL
   },
   { // 13
     Type::Olddecimal,
     cmpOlddecimal,
+    NULL,
     NULL
   },
   { // 14
     Type::Char,
     cmpChar,
-    likeChar
+    likeChar,
+    NULL
   },
   { // 15
     Type::Varchar,
     cmpVarchar,
-    likeVarchar
+    likeVarchar,
+    NULL
   },
   { // 16
     Type::Binary,
     cmpBinary,
-    likeBinary
+    likeBinary,
+    NULL
   },
   { // 17
     Type::Varbinary,
     cmpVarbinary,
-    likeVarbinary
+    likeVarbinary,
+    NULL
   },
   { // 18
     Type::Datetime,
     cmpDatetime,
+    NULL,
     NULL
   },
   { // 19
     Type::Date,
     cmpDate,
+    NULL,
     NULL
   },
   { // 20
     Type::Blob,
     NULL,
+    NULL,
     NULL
   },
   { // 21
     Type::Text,
     NULL,
+    NULL,
     NULL
   },
   { // 22
     Type::Bit,
     cmpBit,
-    NULL
+    NULL,
+    maskBit
   },
   { // 23
     Type::Longvarchar,
     cmpLongvarchar,
-    likeLongvarchar
+    likeLongvarchar,
+    NULL
   },
   { // 24
     Type::Longvarbinary,
     cmpLongvarbinary,
-    likeLongvarbinary
+    likeLongvarbinary,
+    NULL
   },
   { // 25
     Type::Time,
     cmpTime,
+    NULL,
     NULL
   },
   { // 26
     Type::Year,
     cmpYear,
+    NULL,
     NULL
   },
   { // 27
     Type::Timestamp,
     cmpTimestamp,
+    NULL,
     NULL
   },
   { // 28
     Type::Olddecimalunsigned,
     cmpOlddecimalunsigned,
+    NULL,
     NULL
   },
   { // 29
     Type::Decimal,
     cmpDecimal,
+    NULL,
     NULL
   },
   { // 30
     Type::Decimalunsigned,
     cmpDecimalunsigned,
+    NULL,
     NULL
   }
 };
@@ -680,9 +711,54 @@ NdbSqlUtil::cmpText(const void* info, co
 int
 NdbSqlUtil::cmpBit(const void* info, const void* p1, unsigned n1, const void* p2, unsigned n2, bool full)
 { 
-  Uint32 n = (n1 < n2) ? n1 : n2;
-  int ret = memcmp(p1, p2, n);
-  return ret;
+  /* Bitfields are stored as 32-bit words
+   * This means that a byte-by-byte comparison will not work on all platforms
+   * We do a word-wise comparison of the significant bytes.
+   * It is assumed that insignificant bits (but not bytes) are zeroed in the
+   * passed values.
+   */
+  const Uint32 bytes= MIN(n1, n2);
+  Uint32 words= (bytes + 3) >> 2;
+
+  /* Don't expect either value to be length zero */
+  assert(words);
+
+  /* Check ptr alignment */
+  if (unlikely(((((UintPtr)p1) & 3) != 0) ||
+               ((((UintPtr)p2) & 3) != 0)))
+  {
+    Uint32 copyP1[ MAX_TUPLE_SIZE_IN_WORDS ];
+    Uint32 copyP2[ MAX_TUPLE_SIZE_IN_WORDS ];
+    memcpy(copyP1, p1, words << 2);
+    memcpy(copyP2, p2, words << 2);
+
+    return cmpBit(info, copyP1, bytes, copyP2, bytes, full);
+  }
+
+  const Uint32* wp1= (const Uint32*) p1;
+  const Uint32* wp2= (const Uint32*) p2;
+  while (--words)
+  {
+    if (*wp1 < *wp2)
+      return -1;
+    if (*(wp1++) > *(wp2++))
+      return 1;
+  }
+
+  /* For the last word, we mask out any insignificant bytes */
+  const Uint32 sigBytes= bytes & 3; // 0..3; 0 == all bytes significant
+  const Uint32 mask= sigBytes?
+    (1 << (sigBytes *8)) -1 :
+    ~0;
+  const Uint32 lastWord1= *wp1 & mask;
+  const Uint32 lastWord2= *wp2 & mask;
+  
+  if (lastWord1 < lastWord2)
+    return -1;
+  if (lastWord1 > lastWord2)
+    return 1;
+
+  return 0;
 }
 
 
@@ -874,6 +950,85 @@ NdbSqlUtil::likeLongvarbinary(const void
   return likeLongvarchar(&my_charset_bin, p1, n1, p2, n2);
 }
 
+
+// mask Functions
+
+int
+NdbSqlUtil::maskBit(const void* data, unsigned dataLen, const void* mask, unsigned maskLen, bool cmpZero)
+{
+  /* Bitfields are stored in word oriented form, so we must compare them in that
+   * style as well
+   * It is assumed that insignificant bits (but not bytes) in the passed values
+   * are zeroed
+   */
+  const Uint32 bytes = MIN(dataLen, maskLen);
+  Uint32 words = (bytes + 3) >> 2;
+
+  /* Don't expect either value to be length zero */
+  assert(words);
+
+  /* Check ptr alignment */
+  if (unlikely(((((UintPtr)data) & 3) != 0) ||
+               ((((UintPtr)mask) & 3) != 0)))
+  {
+    Uint32 copydata[ MAX_TUPLE_SIZE_IN_WORDS ];
+    Uint32 copymask[ MAX_TUPLE_SIZE_IN_WORDS ];
+    memcpy(copydata, data, words << 2);
+    memcpy(copymask, mask, words << 2);
+
+    return maskBit(data, bytes, mask, bytes, cmpZero);
+  }
+
+  const Uint32* wdata= (const Uint32*) data;
+  const Uint32* wmask= (const Uint32*) mask;
+
+  if (cmpZero)
+  {
+    while (--words)
+    {
+      if ((*(wdata++) & *(wmask++)) != 0)
+        return 1;
+    }
+    
+    /* For the last word, we mask out any insignificant bytes */
+    const Uint32 sigBytes= bytes & 3; // 0..3; 0 == all bytes significant
+    const Uint32 comparisonMask= sigBytes?
+      (1 << (sigBytes *8)) -1 :
+      ~0;
+    const Uint32 lastDataWord= *wdata & comparisonMask;
+    const Uint32 lastMaskWord= *wmask & comparisonMask;
+    
+    if ((lastDataWord & lastMaskWord) != 0)
+      return 1;
+
+    return 0;
+  }
+  else
+  {
+    while (--words)
+    {
+      if ((*(wdata++) & *wmask) != *wmask)
+        return 1;
+      
+      wmask++;
+    }
+
+    /* For the last word, we mask out any insignificant bytes */
+    const Uint32 sigBytes= bytes & 3; // 0..3; 0 == all bytes significant
+    const Uint32 comparisonMask= sigBytes?
+      (1 << (sigBytes *8)) -1 :
+      ~0;
+    const Uint32 lastDataWord= *wdata & comparisonMask;
+    const Uint32 lastMaskWord= *wmask & comparisonMask;
+    
+    if ((lastDataWord & lastMaskWord) != lastMaskWord)
+      return 1;
+
+    return 0;
+  }
+};
+
+
 // check charset
 
 uint

=== modified file 'storage/ndb/src/kernel/blocks/ERROR_codes.txt'
--- a/storage/ndb/src/kernel/blocks/ERROR_codes.txt	2008-12-08 13:58:15 +0000
+++ b/storage/ndb/src/kernel/blocks/ERROR_codes.txt	2008-12-09 17:15:12 +0000
@@ -3,10 +3,10 @@ Next NDBCNTR 1002
 Next NDBFS 2000
 Next DBACC 3002
 Next DBTUP 4029
-Next DBLQH 5051
+Next DBLQH 5053
 Next DBDICT 6013
 Next DBDIH 7216
-Next DBTC 8074
+Next DBTC 8078
 Next CMVMI 9000
 Next BACKUP 10041
 Next DBUTIL 11002
@@ -354,6 +354,25 @@ ABORT OF TCKEYREQ
 
 8051 : Simulate failure of allocation for saveINDXKEYINFO
 
+DBTC LONG SIGNAL TESTING
+------------------------
+8065: Consume all but 10 long section segments on next TCKEYREQ
+8066: Consume all but 1 long section segments on next TCKEYREQ
+8067: Consume all long section segments on next TCKEYREQ
+8068: Free all segments hoarded by 8065, 8066, 8067
+
+8069: Always send 'short' LQHKEYREQ to LQH
+8070: Always send 'short' SCANFRAGREQ to LQH
+
+8074: Drop first fragment of fragmented SCANTABREQ
+8075: Drop middle fragments of fragmented SCANTABREQ
+8076: Drop last fragment of fragmented SCANTABREQ
+8077: Drop all fragments of fragmented SCANTABREQ
+
+DBLQH LONG SIGNAL TESTING
+-------------------------
+5051: Send short LQHKEYREQ to next replica
+5052: Fail to store KeyInfo from TUP in long section
 
 CMVMI
 -----

=== modified file 'storage/ndb/src/kernel/blocks/dblqh/DblqhMain.cpp'
--- a/storage/ndb/src/kernel/blocks/dblqh/DblqhMain.cpp	2008-12-08 13:58:15 +0000
+++ b/storage/ndb/src/kernel/blocks/dblqh/DblqhMain.cpp	2008-12-09 17:15:12 +0000
@@ -3661,6 +3661,12 @@ void Dblqh::execSIGNAL_DROPPED_REP(Signa
    * long signal buffering to store its sections
    */
   jamEntry();
+
+  if (!assembleDroppedFragments(signal))
+  {
+    jam();
+    return;
+  }
   
   const SignalDroppedRep* rep = (SignalDroppedRep*) &signal->theData[0];
   Uint32 originalGSN= rep->originalGsn;

=== modified file 'storage/ndb/src/kernel/blocks/dbtc/Dbtc.hpp'
--- a/storage/ndb/src/kernel/blocks/dbtc/Dbtc.hpp	2008-12-03 19:51:33 +0000
+++ b/storage/ndb/src/kernel/blocks/dbtc/Dbtc.hpp	2008-12-09 17:15:12 +0000
@@ -1870,6 +1870,10 @@ private:
   bool validate_filter(Signal*);
   bool match_and_print(Signal*, ApiConnectRecordPtr);
 
+#ifdef ERROR_INSERT
+  bool testFragmentDrop(Signal* signal);
+#endif
+
   // For Error inserts
   Uint32 errorInsertHoardedSegments;
 

=== modified file 'storage/ndb/src/kernel/blocks/dbtc/DbtcMain.cpp'
--- a/storage/ndb/src/kernel/blocks/dbtc/DbtcMain.cpp	2008-12-09 06:49:28 +0000
+++ b/storage/ndb/src/kernel/blocks/dbtc/DbtcMain.cpp	2008-12-09 17:15:12 +0000
@@ -2606,7 +2606,7 @@ void Dbtc::execTCKEYREQ(Signal* signal) 
       ERROR_INSERTED(8066) ||
       ERROR_INSERTED(8067))
   {
-    /* Consume all but 10(8065) or all but 1 (8066) or all (8077) of 
+    /* Consume all but 10(8065) or all but 1 (8066) or all (8067) of 
      * the SegmentedSection buffers to allow testing of what happens 
      * when they're exhausted, either in this signal or one to follow
      * 8068 frees all 'hoarded' segments
@@ -3834,6 +3834,13 @@ void Dbtc::execSIGNAL_DROPPED_REP(Signal
    * long signal buffering to store its sections
    */
   jamEntry();
+
+  if (!assembleDroppedFragments(signal))
+  {
+    jam();
+    return;
+  }
+
   const SignalDroppedRep* rep = (SignalDroppedRep*) &signal->theData[0];
   Uint32 originalGSN= rep->originalGsn;
 
@@ -3933,6 +3940,8 @@ void Dbtc::execSIGNAL_DROPPED_REP(Signal
     jam();
     /* Don't expect dropped signals for other GSNs,
      * default handling
+     * TODO : Can TC get long TRANSID_AI as part of 
+     * Unique index operations?
      */
     SimulatedBlock::execSIGNAL_DROPPED_REP(signal);
   };
@@ -9464,6 +9473,50 @@ void Dbtc::systemErrorLab(Signal* signal
 }//Dbtc::systemErrorLab()
 
 
+#ifdef ERROR_INSERT
+bool Dbtc::testFragmentDrop(Signal* signal)
+{
+  Uint32 fragIdToDrop= ~0;
+  /* Drop some fragments to test the dropped fragment handling code */
+  if (ERROR_INSERTED(8074))
+    fragIdToDrop= 1;
+  else if (ERROR_INSERTED(8075))
+    fragIdToDrop= 2;
+  else if (ERROR_INSERTED(8076))
+    fragIdToDrop= 3;
+  
+  if ((signal->header.m_fragmentInfo == fragIdToDrop) ||
+      ERROR_INSERTED(8077)) // Drop all fragments
+  {
+    /* This signal fragment should be dropped 
+     * Let's throw away the sections, and call the
+     * signal dropped report handler
+     * This code is replicating the effect of the code in
+     * TransporterCallback::deliver_signal()
+     */
+    SectionHandle handle(this, signal);
+    Uint32 secCount= handle.m_cnt;
+    releaseSections(handle);
+    SignalDroppedRep* rep = (SignalDroppedRep*)signal->theData;
+    Uint32 gsn = signal->header.theVerId_signalNumber;
+    Uint32 len = signal->header.theLength;
+    Uint32 newLen= (len > 22 ? 22 : len);
+    memmove(rep->originalData, signal->theData, (4 * newLen));
+    rep->originalGsn = gsn;
+    rep->originalLength = len;
+    rep->originalSectionCount = secCount;
+    signal->header.theVerId_signalNumber = GSN_SIGNAL_DROPPED_REP;
+    signal->header.theLength = newLen + 3;
+    signal->header.m_noOfSections = 0;
+
+    EXECUTE_DIRECT(DBTC, GSN_SIGNAL_DROPPED_REP, signal,
+                   newLen + 3);
+    return true;
+  }
+  return false;
+}
+#endif
+
 /* ######################################################################### *
  * #######                        SCAN MODULE                        ####### *
  * ######################################################################### *
@@ -9560,6 +9613,21 @@ void Dbtc::execSCAN_TABREQ(Signal* signa
 {
   jamEntry();
 
+#ifdef ERROR_INSERT
+  /* Test fragmented + dropped signal handling */
+  if (ERROR_INSERTED(8074) ||
+      ERROR_INSERTED(8075) ||
+      ERROR_INSERTED(8076) ||
+      ERROR_INSERTED(8077))
+  {
+    jam();
+    if (testFragmentDrop(signal)) {
+      jam();
+      return;
+    }
+  } /* End of test fragmented + dropped signal handling */
+#endif  
+
   /* Reassemble if the request was fragmented */
   if (!assembleFragments(signal)){
     jam();

=== modified file 'storage/ndb/src/kernel/blocks/dbtup/DbtupExecQuery.cpp'
--- a/storage/ndb/src/kernel/blocks/dbtup/DbtupExecQuery.cpp	2008-11-16 12:29:39 +0000
+++ b/storage/ndb/src/kernel/blocks/dbtup/DbtupExecQuery.cpp	2008-12-09 17:02:23 +0000
@@ -2461,12 +2461,23 @@ int Dbtup::interpreterNextLab(Signal* si
         const char* s2 = (char*)&TcurrentProgram[TprogramCounter+1];
         // fixed length in 5.0
 	Uint32 attrLen = AttributeDescriptor::getSizeInBytes(TattrDesc1);
+        
+        if (typeId == NDB_TYPE_BIT)
+        {
+          /* Size in bytes for bit fields can be incorrect due to
+           * rounding down
+           */
+          Uint32 bitFieldAttrLen= (AttributeDescriptor::getArraySize(TattrDesc1)
+                                   + 7) / 8;
+          attrLen= bitFieldAttrLen;
+        }
 
 	bool r1_null = ah.isNULL();
 	bool r2_null = argLen == 0;
 	int res1;
-        if (cond != Interpreter::LIKE &&
-            cond != Interpreter::NOT_LIKE) {
+        if (cond <= Interpreter::GE)
+        {
+          /* Inequality - EQ, NE, LT, LE, GT, GE */
           if (r1_null || r2_null) {
             // NULL==NULL and NULL<not-NULL
             res1 = r1_null && r2_null ? 0 : r1_null ? -1 : 1;
@@ -2479,16 +2490,42 @@ int Dbtup::interpreterNextLab(Signal* si
             res1 = (*sqlType.m_cmp)(cs, s1, attrLen, s2, argLen, true);
           }
 	} else {
-          if (r1_null || r2_null) {
-            // NULL like NULL is true (has no practical use)
-            res1 =  r1_null && r2_null ? 0 : -1;
-          } else {
-	    jam();
-	    if (unlikely(sqlType.m_like == 0))
-	    {
-	      return TUPKEY_abort(signal, 40);
-	    }
-            res1 = (*sqlType.m_like)(cs, s1, attrLen, s2, argLen);
+          if ((cond == Interpreter::LIKE) ||
+              (cond == Interpreter::NOT_LIKE))
+          {
+            if (r1_null || r2_null) {
+              // NULL like NULL is true (has no practical use)
+              res1 =  r1_null && r2_null ? 0 : -1;
+            } else {
+              jam();
+              if (unlikely(sqlType.m_like == 0))
+              {
+                return TUPKEY_abort(signal, 40);
+              }
+              res1 = (*sqlType.m_like)(cs, s1, attrLen, s2, argLen);
+            }
+          }
+          else
+          {
+            /* AND_XX_MASK condition */
+            ndbassert(cond <= Interpreter::AND_NE_ZERO);
+            if (unlikely(sqlType.m_mask == 0))
+            {
+              return TUPKEY_abort(signal,40);
+            }
+            /* If either arg is NULL, we say COL AND MASK
+             * NE_ZERO and NE_MASK.
+             */
+            if (r1_null || r2_null) {
+              res1= 1;
+            } else {
+              
+              bool cmpZero= 
+                (cond == Interpreter::AND_EQ_ZERO) ||
+                (cond == Interpreter::AND_NE_ZERO);
+              
+              res1 = (*sqlType.m_mask)(s1, attrLen, s2, argLen, cmpZero);
+            }
           }
         }
 
@@ -2519,6 +2556,18 @@ int Dbtup::interpreterNextLab(Signal* si
         case Interpreter::NOT_LIKE:
           res = (res1 == 1);
           break;
+        case Interpreter::AND_EQ_MASK:
+          res = (res1 == 0);
+          break;
+        case Interpreter::AND_NE_MASK:
+          res = (res1 != 0);
+          break;
+        case Interpreter::AND_EQ_ZERO:
+          res = (res1 == 0);
+          break;
+        case Interpreter::AND_NE_ZERO:
+          res = (res1 != 0);
+          break;
 	  // XXX handle invalid value
         }
 #ifdef TRACE_INTERPRETER

=== modified file 'storage/ndb/src/kernel/vm/SimulatedBlock.cpp'
--- a/storage/ndb/src/kernel/vm/SimulatedBlock.cpp	2008-11-19 11:01:17 +0000
+++ b/storage/ndb/src/kernel/vm/SimulatedBlock.cpp	2008-12-09 17:15:12 +0000
@@ -1758,6 +1758,9 @@ SimulatedBlock::execNDB_TAMPER(Signal * 
 
 void
 SimulatedBlock::execSIGNAL_DROPPED_REP(Signal * signal){
+  /* Note no need for fragmented signal handling as we are
+   * going to crash this node
+   */
   char msg[64];
   const SignalDroppedRep * const rep = (SignalDroppedRep *)&signal->theData[0];
   BaseString::snprintf(msg, sizeof(msg), "%s GSN: %u (%u,%u)", getBlockName(number()),
@@ -2019,6 +2022,8 @@ SimulatedBlock::assembleFragments(Signal
       fragPtr.p->m_sectionPtrI[sectionNo] = sectionPtr[i];
     }
     
+    ndbassert(! fragPtr.p->isDropped() );
+    
     /**
      * Don't release allocated segments
      */
@@ -2034,41 +2039,208 @@ SimulatedBlock::assembleFragments(Signal
     /**
      * FragInfo == 2 or 3
      */
-    Uint32 i;
-    for(i = 0; i<secs; i++){
-      Uint32 sectionNo = secNos[i];
-      ndbassert(sectionNo < 3);
-      Uint32 sectionPtrI = sectionPtr[i];
-      if(fragPtr.p->m_sectionPtrI[sectionNo] != RNIL){
-	linkSegments(fragPtr.p->m_sectionPtrI[sectionNo], sectionPtrI);
-      } else {
-	fragPtr.p->m_sectionPtrI[sectionNo] = sectionPtrI;
+    if ( likely(! fragPtr.p->isDropped()) )
+    {
+      Uint32 i;
+      for(i = 0; i<secs; i++){
+        Uint32 sectionNo = secNos[i];
+        ndbassert(sectionNo < 3);
+        Uint32 sectionPtrI = sectionPtr[i];
+        if(fragPtr.p->m_sectionPtrI[sectionNo] != RNIL){
+          linkSegments(fragPtr.p->m_sectionPtrI[sectionNo], sectionPtrI);
+        } else {
+          fragPtr.p->m_sectionPtrI[sectionNo] = sectionPtrI;
+        }
+      }
+      
+      /**
+       * fragInfo = 2
+       */
+      if(fragInfo == 2){
+        signal->header.m_fragmentInfo = 0;
+        signal->header.m_noOfSections = 0;
+        return false;
+      }
+      
+      /**
+       * fragInfo = 3
+       */
+      for(i = 0; i<3; i++){
+        Uint32 ptrI = fragPtr.p->m_sectionPtrI[i];
+        if(ptrI != RNIL){
+          signal->m_sectionPtrI[i] = ptrI;
+        } else {
+          break;
+        }
       }
+
+      signal->setLength(sigLen - secs);
+      signal->header.m_noOfSections = i;
+      signal->header.m_fragmentInfo = 0;
+      
+      c_fragmentInfoHash.release(fragPtr);
+      return true;
+    }
+    else
+    {
+      /* This fragmented signal has already had at least 1 fragment
+       * dropped.  We must release the received segments.
+       */
+      for (Uint32 i=0; i < secs; i++)
+        releaseSection( sectionPtr[i] );
+      
+      signal->header.m_fragmentInfo = 0;
+      signal->header.m_noOfSections = 0;
+      
+      /* FragInfo == 2 
+       * More fragments to come, keep waiting
+       */
+      if (fragInfo == 2)
+        return false;
+      
+      /* FragInfo == 3
+       * That was the last fragment.
+       * We're now ready for handling the dropped signal.
+       */      
+      SignalDroppedRep * rep = (SignalDroppedRep*)signal->theData;
+      Uint32 gsn = signal->header.theVerId_signalNumber;
+      Uint32 len = signal->header.theLength;
+      Uint32 newLen= (len > 22 ? 22 : len);
+      memmove(rep->originalData, signal->theData, (4 * newLen));
+      rep->originalGsn = gsn;
+      rep->originalLength = len;
+      rep->originalSectionCount = 0;
+      signal->header.theVerId_signalNumber = GSN_SIGNAL_DROPPED_REP;
+      signal->header.theLength = newLen + 3;
+      signal->header.m_noOfSections = 0;
+      signal->header.m_fragmentInfo = 3;
+
+
+      /* Perform dropped signal handling, in this thread, now */
+      EXECUTE_DIRECT(theNumber, GSN_SIGNAL_DROPPED_REP, 
+                     signal, signal->header.theLength);
+      
+      /* return false to caller - they should not process the signal */
+      return false;
+    } // else (isDropped())
+  }
+  
+  /**
+   * Unable to find fragment
+   */
+  ndbrequire(false);
+  return false;
+}
+
+bool
+SimulatedBlock::assembleDroppedFragments(Signal* signal)
+{
+  /* This method is called at the start of a  SIGNAL_DROPPED_REP 
+   * handler when there is a chance that the dropped signal could
+   * be part of a fragmented signal.
+   * If the dropped signal was a fragmented signal, this
+   * needs to be handled specially to ensure that fragments
+   * of the signal are correctly dropped to avoid segment
+   * leaks etc.
+   * There are a number of cases : 
+   *   1) First fragment dropped  (FragInfo=1)
+   *      All remaining fragments must be dropped when they 
+   *      arrive.  The Signal dropped report handler must be 
+   *      executed when the last fragment has arrived.
+   *   2) Middle fragment dropped  (FragInfo=2)
+   *      Any existing stored segments must be released.  
+   *      All remaining fragments must be dropped when they
+   *      arrive.  
+   *   3) Last fragment dropped  (FragInfo=3)
+   *      Any existing stored segments must be released.  
+   *      Signal Dropped handling can occur, so return true.
+   *
+   * To indicate that a fragment has been dropped for a signal,
+   * all the section I Values in the fragment's hash entry are 
+   * set to RNIL.
+   * Signal Dropped Report handling is performed when the last
+   * fragment arrives.  If the last fragment is not dropped
+   * by the transporter layer then normal fragment assembly 
+   * arranges for dropped signal handling to occur.
+   */
+  Uint32 sigLen = signal->length() - 1;
+  Uint32 fragId = signal->theData[sigLen];
+  Uint32 fragInfo = signal->header.m_fragmentInfo;
+  Uint32 senderRef = signal->getSendersBlockRef();
+
+  if(fragInfo == 0){
+    return true;
+  }
+  
+  /* This method is for handling SIGNAL_DROPPED_REP only */
+  ndbrequire(signal->header.theVerId_signalNumber == GSN_SIGNAL_DROPPED_REP);
+  ndbrequire(signal->header.m_noOfSections == 0);
+
+  if(fragInfo == 1){
+    /**
+     * First in train
+     */
+    Ptr<FragmentInfo> fragPtr;
+    if(!c_fragmentInfoHash.seize(fragPtr)){
+      ndbrequire(false);
+      return false;
     }
     
+    new (fragPtr.p)FragmentInfo(fragId, senderRef);
+    c_fragmentInfoHash.add(fragPtr);
+    
+    /* Mark entry in hash as belonging to dropped signal so subsequent
+     * fragments can also be dropped
+     */
+    fragPtr.p->m_sectionPtrI[0]= RNIL;
+    fragPtr.p->m_sectionPtrI[1]= RNIL;
+    fragPtr.p->m_sectionPtrI[2]= RNIL;
+
+    /* Wait for last fragment before SignalDroppedRep handling */
+    signal->header.m_fragmentInfo = 0;
+    return false;
+  }
+  
+  FragmentInfo key(fragId, senderRef);
+  Ptr<FragmentInfo> fragPtr;
+  if(c_fragmentInfoHash.find(fragPtr, key)){
+    
+    /**
+     * FragInfo == 2 or 3
+     */
+    if (! fragPtr.p->isDropped() )
+    {
+      /* Fragmented Signal not already marked as dropped
+       * Need to free stored segments
+       */
+      releaseSection(fragPtr.p->m_sectionPtrI[0]);
+      releaseSection(fragPtr.p->m_sectionPtrI[1]);
+      releaseSection(fragPtr.p->m_sectionPtrI[2]);
+      
+      /* Mark as dropped now */
+      fragPtr.p->m_sectionPtrI[0]= RNIL;
+      fragPtr.p->m_sectionPtrI[1]= RNIL;
+      fragPtr.p->m_sectionPtrI[2]= RNIL;
+      
+      ndbassert( fragPtr.p->isDropped() );
+    }
+
     /**
      * fragInfo = 2
+     *   Still waiting for final fragments.
+     *   Return false to caller.
      */
     if(fragInfo == 2){
       signal->header.m_fragmentInfo = 0;
-      signal->header.m_noOfSections = 0;
       return false;
     }
     
     /**
      * fragInfo = 3
+     *   All fragments received, remove entry
+     *   from hash and return to caller for
+     *   dropped signal handling.
      */
-    for(i = 0; i<3; i++){
-      Uint32 ptrI = fragPtr.p->m_sectionPtrI[i];
-      if(ptrI != RNIL){
-	signal->m_sectionPtrI[i] = ptrI;
-      } else {
-	break;
-      }
-    }
-
-    signal->setLength(sigLen - secs);
-    signal->header.m_noOfSections = i;
     signal->header.m_fragmentInfo = 0;
 
     c_fragmentInfoHash.release(fragPtr);

=== modified file 'storage/ndb/src/kernel/vm/SimulatedBlock.hpp'
--- a/storage/ndb/src/kernel/vm/SimulatedBlock.hpp	2008-12-01 18:05:11 +0000
+++ b/storage/ndb/src/kernel/vm/SimulatedBlock.hpp	2008-12-09 17:15:12 +0000
@@ -319,6 +319,21 @@ protected:
    */
   bool assembleFragments(Signal * signal);
   
+  /**
+   * Assemble dropped fragments
+   *
+   * Should be called at the start of a Dropped Signal Report 
+   * (GSN_DROPPED_SIGNAL_REP) handler when it is expected that
+   * the block could receive fragmented signals.
+   * No dropped signal handling should be done until this method
+   * returns true.
+   * 
+   * @return true if all fragments has arrived and dropped signal
+   *              handling can proceed.
+   *         false otherwise
+   */
+  bool assembleDroppedFragments(Signal * signal);
+  
   /* If send size is > FRAGMENT_WORD_SIZE, fragments of this size
    * will be sent by the sendFragmentedSignal variants
    */
@@ -388,6 +403,13 @@ protected:
     inline Uint32 hashValue() const {
       return m_senderRef + m_fragmentId ;
     }
+
+    inline bool isDropped() const {
+      /* IsDropped when entry in hash, but no segments stored */
+      return (( m_sectionPtrI[0] == RNIL ) &&
+              ( m_sectionPtrI[1] == RNIL ) &&
+              ( m_sectionPtrI[2] == RNIL ) );
+    }
   }; // sizeof() = 32 bytes
   
   /**

=== modified file 'storage/ndb/src/ndbapi/NdbInterpretedCode.cpp'
--- a/storage/ndb/src/ndbapi/NdbInterpretedCode.cpp	2008-11-11 12:54:17 +0000
+++ b/storage/ndb/src/ndbapi/NdbInterpretedCode.cpp	2008-12-09 17:02:23 +0000
@@ -508,12 +508,23 @@ NdbInterpretedCode::branch_col(Uint32 br
     DBUG_RETURN(error(BadAttributeId));
   }
 
+  Uint32 lastWordMask= ~0;
   if (val == NULL)
     len = 0;
   else {
     if (! col->getStringType())
     {
       /* Fixed size type */
+      if (col->getType() == NDB_TYPE_BIT)
+      {
+        /* We want to zero out insignificant bits in the
+         * last word of a bit type
+         */
+        Uint32 bitLen= col->getLength();
+        Uint32 lastWordBits= bitLen & 0x1F;
+        if (lastWordBits)
+          lastWordMask= (1 << lastWordBits) -1;
+      }
       len= col->m_attrSize * col->m_arraySize;
     }
     else
@@ -536,7 +547,7 @@ NdbInterpretedCode::branch_col(Uint32 br
   if (col->m_storageType == NDB_STORAGETYPE_DISK)
     m_flags|= UsesDisk;
 
-  if (add_branch(Interpreter::BranchCol(c, 0, 0, false), Label) != 0)
+  if (add_branch(Interpreter::BranchCol(c, 0, 0), Label) != 0)
     DBUG_RETURN(-1);
 
   if (add1(Interpreter::BranchCol_2(attrId, len)) != 0)
@@ -544,7 +555,8 @@ NdbInterpretedCode::branch_col(Uint32 br
 
   /* Get value byte length rounded up to nearest 32-bit word */
   Uint32 len2 = Interpreter::mod4(len);
-  if(len2 == len){
+  if((len2 == len)  &&
+     (lastWordMask == (Uint32)~0)){
     /* Whole number of 32-bit words */
     DBUG_RETURN(addN((Uint32*)val, len2 >> 2));
   } else {
@@ -559,7 +571,7 @@ NdbInterpretedCode::branch_col(Uint32 br
       char* p = (char*)&tmp;
       p[i] = ((char*)val)[len2+i];
     }
-    DBUG_RETURN(add1(tmp));
+    DBUG_RETURN(add1((tmp & lastWordMask)));
   }
 }
 
@@ -569,7 +581,7 @@ NdbInterpretedCode::branch_col_eq(const 
                                   Uint32 attrId,
                                   Uint32 Label)
 {
-  return branch_col(Interpreter::EQ, attrId, val, len, Label);
+  return branch_col(Interpreter::EQ, attrId, val, 0, Label);
 }
 
 int 
@@ -578,7 +590,7 @@ NdbInterpretedCode::branch_col_ne(const 
                                   Uint32 attrId,
                                   Uint32 Label)
 {
-  return branch_col(Interpreter::NE, attrId, val, len, Label);
+  return branch_col(Interpreter::NE, attrId, val, 0, Label);
 }
 
 int 
@@ -587,7 +599,7 @@ NdbInterpretedCode::branch_col_lt(const 
                                   Uint32 attrId,
                                   Uint32 Label)
 {
-  return branch_col(Interpreter::LT, attrId, val, len, Label);
+  return branch_col(Interpreter::LT, attrId, val, 0, Label);
 }
 
 int 
@@ -596,7 +608,7 @@ NdbInterpretedCode::branch_col_le(const 
                                   Uint32 attrId,
                                   Uint32 Label)
 {
-  return branch_col(Interpreter::LE, attrId, val, len, Label);
+  return branch_col(Interpreter::LE, attrId, val, 0, Label);
 }
 
 int 
@@ -605,7 +617,7 @@ NdbInterpretedCode::branch_col_gt(const 
                                   Uint32 attrId,
                                   Uint32 Label)
 {
-  return branch_col(Interpreter::GT, attrId, val, len, Label);
+  return branch_col(Interpreter::GT, attrId, val, 0, Label);
 }
 
 int 
@@ -614,7 +626,7 @@ NdbInterpretedCode::branch_col_ge(const 
                                   Uint32 attrId,
                                   Uint32 Label)
 {
-  return branch_col(Interpreter::GE, attrId, val, len, Label);
+  return branch_col(Interpreter::GE, attrId, val, 0, Label);
 }
 
 int 
@@ -636,6 +648,42 @@ NdbInterpretedCode::branch_col_notlike(c
 }
 
 int
+NdbInterpretedCode::branch_col_and_mask_eq_mask(const void * mask,
+                                                Uint32 len,
+                                                Uint32 attrId,
+                                                Uint32 label)
+{
+  return branch_col(Interpreter::AND_EQ_MASK, attrId, mask, 0, Label);
+}
+
+int
+NdbInterpretedCode::branch_col_and_mask_ne_mask(const void * mask,
+                                                Uint32 len,
+                                                Uint32 attrId,
+                                                Uint32 label)
+{
+  return branch_col(Interpreter::AND_NE_MASK, attrId, mask, 0, Label);
+}
+
+int
+NdbInterpretedCode::branch_col_and_mask_eq_zero(const void * mask,
+                                                Uint32 len,
+                                                Uint32 attrId,
+                                                Uint32 label)
+{
+  return branch_col(Interpreter::AND_EQ_ZERO, attrId, mask, 0, Label);
+}
+
+int
+NdbInterpretedCode::branch_col_and_mask_ne_zero(const void * mask,
+                                                Uint32 len,
+                                                Uint32 attrId,
+                                                Uint32 label)
+{
+  return branch_col(Interpreter::AND_NE_ZERO, attrId, mask, 0, Label);
+}
+
+int
 NdbInterpretedCode::interpret_exit_ok()
 {
   return add1(Interpreter::EXIT_OK);

=== modified file 'storage/ndb/src/ndbapi/NdbOperationExec.cpp'
--- a/storage/ndb/src/ndbapi/NdbOperationExec.cpp	2008-11-11 12:54:17 +0000
+++ b/storage/ndb/src/ndbapi/NdbOperationExec.cpp	2008-12-09 17:25:55 +0000
@@ -37,6 +37,88 @@
 #include <NdbOut.hpp>
 
 
+/**
+ * Old NdbApi KeyInfo Section Iterator
+ *
+ * This is an implementation of GenericSectionIterator
+ * that reads signal data from signal object chains
+ * prepared by old NdbApi code
+ * In those chains, some data is in the first 
+ * (TCKEYREQ/TCINDXREQ) signal, and the rest is in linked
+ * KEYINFO / ATTRINFO chains.
+ * Longer term, the 'old' code should be modified to split
+ * operation definition from execution.  The intermediate
+ * use of signal chains for KeyInfo and AttrInfo can be
+ * revisited.
+ */
+class OldNdbApiSectionIterator: public GenericSectionIterator
+{
+private :
+  STATIC_CONST(KeyAndAttrInfoHeaderLength = 3);
+
+  const Uint32 firstSigDataLen; // Num words in first signal
+  Uint32* firstDataPtr;         // Ptr to start of data in first signal
+  NdbApiSignal* secondSignal;   // Second signal
+  // Nasty void* for current iterator position
+  void* currentPos;  // start  == firstDataPtr
+                     // middle == NdbApiSignal* 
+                     // end    == NULL
+
+  void checkStaticAssertions()
+  {
+    STATIC_ASSERT(KeyInfo::HeaderLength == KeyAndAttrInfoHeaderLength);
+    STATIC_ASSERT(AttrInfo::HeaderLength == KeyAndAttrInfoHeaderLength);
+  };
+
+public :
+  OldNdbApiSectionIterator(NdbApiSignal* TCREQ,
+                           Uint32 dataOffset,
+                           Uint32 dataLen,
+                           NdbApiSignal* nextSignal) :
+    firstSigDataLen(dataLen),
+    firstDataPtr(TCREQ->getDataPtrSend()+dataOffset),
+    secondSignal(nextSignal),
+    currentPos(firstDataPtr)
+  {
+    assert((dataOffset + dataLen) <= NdbApiSignal::MaxSignalWords);
+  }
+  
+  ~OldNdbApiSectionIterator()
+  {};
+
+  void reset()
+  {
+    currentPos= firstDataPtr;
+  }
+
+  Uint32* getNextWords(Uint32& sz)
+  {
+    /* In first TCKEY/INDXREQ, data is at offset depending
+     * on whether it's KEYINFO or ATTRINFO
+     * In following signals, data starts at offset 3
+     * regardless
+     */
+    if (likely(currentPos != NULL))
+    {
+      if (currentPos == firstDataPtr)
+      {
+        currentPos= secondSignal;
+        sz= firstSigDataLen;
+        return firstDataPtr;
+      }
+      /* Second signal is KeyInfo or AttrInfo
+       * Ignore header words
+       */
+      NdbApiSignal* sig= (NdbApiSignal*)currentPos;
+      assert(sig->getLength() >= KeyAndAttrInfoHeaderLength);
+      sz= sig->getLength() - KeyAndAttrInfoHeaderLength;
+      currentPos= sig->next();
+      return sig->getDataPtrSend() + KeyAndAttrInfoHeaderLength;
+    }
+    sz = 0;
+    return NULL;
+  }
+};
 
 void
 NdbOperation::setLastFlag(NdbApiSignal* signal, Uint32 lastFlag)
@@ -57,49 +139,56 @@ Remark:         Sends the TCKEYREQ signa
 int
 NdbOperation::doSend(int aNodeId, Uint32 lastFlag)
 {
-  int tReturnCode;
-  int tSignalCount = 0;
   assert(theTCREQ != NULL);
   setLastFlag(theTCREQ, lastFlag);
 
   if (m_attribute_record != NULL)
   {
     /* NdbRecord send - single long signal */
-    tReturnCode= doSendNdbRecord(aNodeId);
-    if (tReturnCode == -1)
+    if (doSendNdbRecord(aNodeId) == -1 )
       return -1;
-
-    tSignalCount= 1;
   }
   else
   {
-    /* Old Api send - signal train */
+    /* Old Api send - transform signal train to long sections */
     TransporterFacade *tp = theNdb->theImpl->m_transporter_facade;
-    tReturnCode = tp->sendSignal(theTCREQ, aNodeId);
-    tSignalCount++;
-    if (tReturnCode == -1) {
-      return -1;
+
+    TcKeyReq* tcKeyReq= (TcKeyReq*) theTCREQ->getDataPtrSend();
+    const Uint32 inlineKIOffset= tcKeyReq->keyInfo - (Uint32*) tcKeyReq;
+    const Uint32 inlineKILength= MIN(TcKeyReq::MaxKeyInfo,
+                                     theTupKeyLen);
+    const Uint32 inlineAIOffset = tcKeyReq->attrInfo - (Uint32*) tcKeyReq;
+    const Uint32 inlineAILength= MIN(TcKeyReq::MaxAttrInfo, 
+                                     theTotalCurrAI_Len);
+    
+    Uint32 numSecs= 1;
+    GenericSectionPtr secs[2];
+    /* Create iterators which use the signal train to extract
+     * long sections from the short signal trains
+     */
+    OldNdbApiSectionIterator keyInfoIter(theTCREQ,
+                                         inlineKIOffset,
+                                         inlineKILength,
+                                         theTCREQ->next());
+    OldNdbApiSectionIterator attrInfoIter(theTCREQ,
+                                          inlineAIOffset,
+                                          inlineAILength,
+                                          theFirstATTRINFO);
+
+    /* KeyInfo - always present for TCKEY/INDXREQ */
+    secs[0].sz= theTupKeyLen;
+    secs[0].sectionIter= &keyInfoIter;
+
+    /* AttrInfo - not always needed (e.g. Delete) */
+    if (theTotalCurrAI_Len != 0)
+    {
+      secs[1].sz= theTotalCurrAI_Len;
+      secs[1].sectionIter= &attrInfoIter;
+      numSecs++;
     }
-    NdbApiSignal *tSignal = theTCREQ->next();
-    while (tSignal != NULL) {
-      NdbApiSignal* tnextSignal = tSignal->next();
-      tReturnCode = tp->sendSignal(tSignal, aNodeId);
-      tSignal = tnextSignal;
-      if (tReturnCode == -1) {
-        return -1;
-      }
-      tSignalCount++;
-    }//while
-    tSignal = theFirstATTRINFO;
-    while (tSignal != NULL) {
-      NdbApiSignal* tnextSignal = tSignal->next();
-      tReturnCode = tp->sendSignal(tSignal, aNodeId);
-      tSignal = tnextSignal;
-      if (tReturnCode == -1) {
-        return -1;
-      }
-      tSignalCount++;
-    }//while
+
+    if (tp->sendSignal(theTCREQ, aNodeId, &secs[0], numSecs) == -1)
+      return -1;
   }
 
   /* Todo : Consider calling NdbOperation::postExecuteRelease()
@@ -108,7 +197,7 @@ NdbOperation::doSend(int aNodeId, Uint32
    */
 
   theNdbCon->OpSent();
-  return tSignalCount;
+  return 1;
 }//NdbOperation::doSend()
 
 
@@ -219,7 +308,7 @@ NdbOperation::prepareSend(Uint32 aTC_Con
     return -1;
   }
   Uint32 TattrLen = 0;
-  tcKeyReq->setAttrinfoLen(TattrLen, tTotalCurrAI_Len);
+  tcKeyReq->setAttrinfoLen(TattrLen, 0); // Not required for long signals.
   tcKeyReq->setAPIVersion(TattrLen, NDB_VERSION);
   tcKeyReq->attrLen            = TattrLen;
 
@@ -244,12 +333,7 @@ NdbOperation::prepareSend(Uint32 aTC_Con
   tcKeyReq->transId2           = tTransId2;
   
   tReqInfo = 0;
-  if (tTotalCurrAI_Len <= TcKeyReq::MaxAttrInfo) {
-    tcKeyReq->setAIInTcKeyReq(tReqInfo, tTotalCurrAI_Len);
-  } else {
-    tcKeyReq->setAIInTcKeyReq(tReqInfo, TcKeyReq::MaxAttrInfo);
-  }//if
-
+  tcKeyReq->setAIInTcKeyReq(tReqInfo, 0); // Not needed
   tcKeyReq->setSimpleFlag(tReqInfo, tSimpleIndicator);
   tcKeyReq->setCommitFlag(tReqInfo, tCommitIndicator);
   tcKeyReq->setStartFlag(tReqInfo, tStartIndicator);
@@ -257,12 +341,11 @@ NdbOperation::prepareSend(Uint32 aTC_Con
   tcKeyReq->setNoDiskFlag(tReqInfo, tNoDisk);
 
   OperationType tOperationType = theOperationType;
-  Uint32 tTupKeyLen = theTupKeyLen;
   Uint8 abortOption = (ao == DefaultAbortOption) ? (Uint8) m_abortOption : (Uint8) ao;
 
   tcKeyReq->setDirtyFlag(tReqInfo, tDirtyIndicator);
   tcKeyReq->setOperationType(tReqInfo, tOperationType);
-  tcKeyReq->setKeyLength(tReqInfo, tTupKeyLen);
+  tcKeyReq->setKeyLength(tReqInfo, 0); // Not needed
   
   // A dirty read is always ignore error
   abortOption = tDirtyState ? (Uint8) AO_IgnoreError : (Uint8) abortOption;
@@ -290,105 +373,37 @@ NdbOperation::prepareSend(Uint32 aTC_Con
   tOptionalDataPtr[0] = tScanInfo;
   tOptionalDataPtr[tDistrKeyIndex] = tDistrKey;
 
-//-------------------------------------------------------------
-// The next is step is to compress the key data part of the
-// TCKEYREQ signal.
-//-------------------------------------------------------------
-  Uint32 tKeyIndex = tDistrKeyIndex + tDistrKeyIndicator;
-  Uint32* tKeyDataPtr = &tOptionalDataPtr[tKeyIndex];
-  Uint32 Tdata1 = tcKeyReq->keyInfo[0];
-  Uint32 Tdata2 = tcKeyReq->keyInfo[1];
-  Uint32 Tdata3 = tcKeyReq->keyInfo[2];
-  Uint32 Tdata4 = tcKeyReq->keyInfo[3];
-  Uint32 Tdata5;
-
-  tKeyDataPtr[0] = Tdata1;
-  tKeyDataPtr[1] = Tdata2;
-  tKeyDataPtr[2] = Tdata3;
-  tKeyDataPtr[3] = Tdata4;
-  if (tTupKeyLen > 4) {
-    Tdata1 = tcKeyReq->keyInfo[4];
-    Tdata2 = tcKeyReq->keyInfo[5];
-    Tdata3 = tcKeyReq->keyInfo[6];
-    Tdata4 = tcKeyReq->keyInfo[7];
-
-    tKeyDataPtr[4] = Tdata1;
-    tKeyDataPtr[5] = Tdata2;
-    tKeyDataPtr[6] = Tdata3;
-    tKeyDataPtr[7] = Tdata4;
-  }//if
-//-------------------------------------------------------------
-// Finally we also compress the ATTRINFO part of the signal.
-// We optimise by using the if-statement for sending KEYINFO
-// signals to calculating the new Attrinfo Index.
-//-------------------------------------------------------------
-  Uint32 tAttrInfoIndex;  
+  theTCREQ->setLength(TcKeyReq::StaticLength + 
+                      tDistrKeyIndex +         // 1 for scan info present
+                      theDistrKeyIndicator_);  // 1 for distr key present
 
-  if (tTupKeyLen > TcKeyReq::MaxKeyInfo) {
+  /* Ensure the signal objects have the correct length
+   * information
+   */
+  if (theTupKeyLen > TcKeyReq::MaxKeyInfo) {
     /**
-     *	Set transid, TC connect ptr and length in the KEYINFO signals
+     *	Set correct length on last KeyInfo signal
      */
-    NdbApiSignal* tSignal = theTCREQ->next();
-    Uint32 remainingKey = tTupKeyLen - TcKeyReq::MaxKeyInfo;
-    do {
-      Uint32* tSigDataPtr = tSignal->getDataPtrSend();
-      NdbApiSignal* tnextSignal = tSignal->next();
-      tSigDataPtr[0] = aTC_ConnectPtr;
-      tSigDataPtr[1] = tTransId1;
-      tSigDataPtr[2] = tTransId2;
-      if (remainingKey > KeyInfo::DataLength) {
-	// The signal is full
-	tSignal->setLength(KeyInfo::MaxSignalLength);
-	remainingKey -= KeyInfo::DataLength;
-      }
-      else {
-	// Last signal
-	tSignal->setLength(KeyInfo::HeaderLength + remainingKey);
-	remainingKey = 0;
-      }
-      tSignal = tnextSignal;
-    } while (tSignal != NULL);
-    tAttrInfoIndex = tKeyIndex + TcKeyReq::MaxKeyInfo;
-  } else {
-    tAttrInfoIndex = tKeyIndex + tTupKeyLen;
-  }//if
+    if (theLastKEYINFO == NULL)
+      theLastKEYINFO= theTCREQ->next();
 
-//-------------------------------------------------------------
-// Perform the Attrinfo packing in the TCKEYREQ signal started
-// above.
-//-------------------------------------------------------------
-  Uint32* tAIDataPtr = &tOptionalDataPtr[tAttrInfoIndex];
-  Tdata1 = tcKeyReq->attrInfo[0];
-  Tdata2 = tcKeyReq->attrInfo[1];
-  Tdata3 = tcKeyReq->attrInfo[2];
-  Tdata4 = tcKeyReq->attrInfo[3];
-  Tdata5 = tcKeyReq->attrInfo[4];
-
-  theTCREQ->setLength(tcKeyReq->getAIInTcKeyReq(tReqInfo) +
-                      tAttrInfoIndex + TcKeyReq::StaticLength);
-
-  tAIDataPtr[0] = Tdata1;
-  tAIDataPtr[1] = Tdata2;
-  tAIDataPtr[2] = Tdata3;
-  tAIDataPtr[3] = Tdata4;
-  tAIDataPtr[4] = Tdata5;
-
-/***************************************************
-*  Send the ATTRINFO signals.
-***************************************************/
-  if (tTotalCurrAI_Len > 5) {
+    assert(theLastKEYINFO != NULL);
+
+    Uint32 lastKeyInfoLen= ((theTupKeyLen - TcKeyReq::MaxKeyInfo)
+                            % KeyInfo::DataLength);
+    
+    theLastKEYINFO->setLength(lastKeyInfoLen ? 
+                              KeyInfo::HeaderLength + lastKeyInfoLen : 
+                              KeyInfo::MaxSignalLength);
+  }
+
+  /* Set the length on the last AttrInfo signal */
+  if (tTotalCurrAI_Len > TcKeyReq::MaxAttrInfo) {
     // Set the last signal's length.
-    NdbApiSignal* tSignal = theFirstATTRINFO;
     theCurrentATTRINFO->setLength(theAI_LenInCurrAI);
-    do {
-      Uint32* tSigDataPtr = tSignal->getDataPtrSend();
-      NdbApiSignal* tnextSignal = tSignal->next();
-      tSigDataPtr[0] = aTC_ConnectPtr;
-      tSigDataPtr[1] = tTransId1;
-      tSigDataPtr[2] = tTransId2;
-      tSignal = tnextSignal;
-    } while (tSignal != NULL);
   }//if
+  theTotalCurrAI_Len= tTotalCurrAI_Len;
+
   theStatus = WaitResponse;
   theReceiver.prepareSend();
   return 0;

=== modified file 'storage/ndb/src/ndbapi/NdbOperationInt.cpp'
--- a/storage/ndb/src/ndbapi/NdbOperationInt.cpp	2008-11-11 12:54:17 +0000
+++ b/storage/ndb/src/ndbapi/NdbOperationInt.cpp	2008-12-09 17:02:23 +0000
@@ -1067,7 +1067,7 @@ NdbOperation::insertCall(Uint32 aCall)
 int
 NdbOperation::branch_col(Uint32 type, 
 			 Uint32 ColId, const void * val, Uint32 len, 
-			 bool nopad, Uint32 Label){
+			 Uint32 Label){
 
   DBUG_ENTER("NdbOperation::branch_col");
   DBUG_PRINT("enter", ("type: %u  col:%u  val: 0x%lx  len: %u  label: %u",
@@ -1087,12 +1087,23 @@ NdbOperation::branch_col(Uint32 type, 
     abort();
   }
 
+  Uint32 lastWordMask= ~0;
   if (val == NULL)
     len = 0;
   else {
     if (! col->getStringType())
     {
       /* Fixed size type */
+      if (col->getType() == NDB_TYPE_BIT)
+      {
+        /* We want to zero out insignificant bits in the
+         * last word of a bit type
+         */
+        Uint32 bitLen= col->getLength();
+        Uint32 lastWordBits= bitLen & 0x1F;
+        if (lastWordBits)
+          lastWordMask= (1 << lastWordBits) -1;
+      }
       len= col->m_attrSize * col->m_arraySize;
     }
     else
@@ -1121,7 +1132,7 @@ NdbOperation::branch_col(Uint32 type, 
     val = tempData;
   }
 
-  if (insertATTRINFO(Interpreter::BranchCol(c, 0, 0, false)) == -1)
+  if (insertATTRINFO(Interpreter::BranchCol(c, 0, 0)) == -1)
     DBUG_RETURN(-1);
   
   if (insertBranch(Label) == -1)
@@ -1131,7 +1142,8 @@ NdbOperation::branch_col(Uint32 type, 
     DBUG_RETURN(-1);
   
   Uint32 len2 = Interpreter::mod4(len);
-  if(len2 == len){
+  if((len2 == len) &&
+     (lastWordMask == (Uint32)~0)){
     insertATTRINFOloop((Uint32*)val, len2 >> 2);
   } else {
     len2 -= 4;
@@ -1141,7 +1153,7 @@ NdbOperation::branch_col(Uint32 type, 
       char* p = (char*)&tmp;
       p[i] = ((char*)val)[len2+i];
     }
-    insertATTRINFO(tmp);
+    insertATTRINFO(tmp & lastWordMask);
   }
   
   theErrorLine++;
@@ -1153,7 +1165,7 @@ NdbOperation::branch_col_eq(Uint32 ColId
 			    bool nopad, Uint32 Label){
   INT_DEBUG(("branch_col_eq %u %.*s(%u,%d) -> %u", ColId, len, (char*) val, len,
              nopad, Label));
-  return branch_col(Interpreter::EQ, ColId, val, len, nopad, Label);
+  return branch_col(Interpreter::EQ, ColId, val, len, Label);
 }
 
 int
@@ -1161,28 +1173,28 @@ NdbOperation::branch_col_ne(Uint32 ColId
 			    bool nopad, Uint32 Label){
   INT_DEBUG(("branch_col_ne %u %.*s(%u,%d) -> %u", ColId, len, (char*) val, len,
              nopad, Label));
-  return branch_col(Interpreter::NE, ColId, val, len, nopad, Label);
+  return branch_col(Interpreter::NE, ColId, val, len, Label);
 }
 int
 NdbOperation::branch_col_lt(Uint32 ColId, const void * val, Uint32 len, 
 			    bool nopad, Uint32 Label){
   INT_DEBUG(("branch_col_lt %u %.*s(%u,%d) -> %u", ColId, len, (char*) val, len,
              nopad, Label));
-  return branch_col(Interpreter::LT, ColId, val, len, nopad, Label);
+  return branch_col(Interpreter::LT, ColId, val, len, Label);
 }
 int
 NdbOperation::branch_col_le(Uint32 ColId, const void * val, Uint32 len, 
 			    bool nopad, Uint32 Label){
   INT_DEBUG(("branch_col_le %u %.*s(%u,%d) -> %u", ColId, len, (char*) val, len,
              nopad, Label));
-  return branch_col(Interpreter::LE, ColId, val, len, nopad, Label);
+  return branch_col(Interpreter::LE, ColId, val, len, Label);
 }
 int
 NdbOperation::branch_col_gt(Uint32 ColId, const void * val, Uint32 len, 
 			    bool nopad, Uint32 Label){
   INT_DEBUG(("branch_col_gt %u %.*s(%u,%d) -> %u", ColId, len, (char*) val, len,
              nopad, Label));
-  return branch_col(Interpreter::GT, ColId, val, len, nopad, Label);
+  return branch_col(Interpreter::GT, ColId, val, len, Label);
 }
 
 int
@@ -1190,7 +1202,7 @@ NdbOperation::branch_col_ge(Uint32 ColId
 			    bool nopad, Uint32 Label){
   INT_DEBUG(("branch_col_ge %u %.*s(%u,%d) -> %u", ColId, len, (char*) val, len,
              nopad, Label));
-  return branch_col(Interpreter::GE, ColId, val, len, nopad, Label);
+  return branch_col(Interpreter::GE, ColId, val, len, Label);
 }
 
 int
@@ -1198,7 +1210,7 @@ NdbOperation::branch_col_like(Uint32 Col
 			      bool nopad, Uint32 Label){
   INT_DEBUG(("branch_col_like %u %.*s(%u,%d) -> %u", ColId, len, (char*) val, len,
              nopad, Label));
-  return branch_col(Interpreter::LIKE, ColId, val, len, nopad, Label);
+  return branch_col(Interpreter::LIKE, ColId, val, len, Label);
 }
 
 int
@@ -1206,7 +1218,39 @@ NdbOperation::branch_col_notlike(Uint32 
 				 bool nopad, Uint32 Label){
   INT_DEBUG(("branch_col_notlike %u %.*s(%u,%d) -> %u", ColId, len, (char*) val, len,
              nopad, Label));
-  return branch_col(Interpreter::NOT_LIKE, ColId, val, len, nopad, Label);
+  return branch_col(Interpreter::NOT_LIKE, ColId, val, len, Label);
+}
+
+int
+NdbOperation::branch_col_and_mask_eq_mask(Uint32 ColId, const void * mask, 
+                                          Uint32 len, bool nopad, Uint32 Label){
+  INT_DEBUG(("branch_col_and_mask_eq_mask %u %.*s(%u,%d) -> %u", ColId, len, (char*) mask, len,
+             nopad, Label));
+  return branch_col(Interpreter::AND_EQ_MASK, ColId, mask, len, Label);
+}
+
+int
+NdbOperation::branch_col_and_mask_ne_mask(Uint32 ColId, const void * mask, 
+                                          Uint32 len, bool nopad, Uint32 Label){
+  INT_DEBUG(("branch_col_and_mask_ne_mask %u %.*s(%u,%d) -> %u", ColId, len, (char*) mask, len,
+             nopad, Label));
+  return branch_col(Interpreter::AND_NE_MASK, ColId, mask, len, Label);
+}
+
+int
+NdbOperation::branch_col_and_mask_eq_zero(Uint32 ColId, const void * mask, 
+                                          Uint32 len, bool nopad, Uint32 Label){
+  INT_DEBUG(("branch_col_and_mask_eq_zero %u %.*s(%u,%d) -> %u", ColId, len, (char*) mask, len,
+             nopad, Label));
+  return branch_col(Interpreter::AND_EQ_ZERO, ColId, mask, len, Label);
+}
+
+int
+NdbOperation::branch_col_and_mask_ne_zero(Uint32 ColId, const void * mask, 
+                                          Uint32 len, bool nopad, Uint32 Label){
+  INT_DEBUG(("branch_col_and_mask_ne_zero %u %.*s(%u,%d) -> %u", ColId, len, (char*) mask, len,
+             nopad, Label));
+  return branch_col(Interpreter::AND_NE_ZERO, ColId, mask, len, Label);
 }
 
 int

=== modified file 'storage/ndb/src/ndbapi/NdbOperationSearch.cpp'
--- a/storage/ndb/src/ndbapi/NdbOperationSearch.cpp	2008-11-08 21:43:03 +0000
+++ b/storage/ndb/src/ndbapi/NdbOperationSearch.cpp	2008-12-09 17:25:55 +0000
@@ -392,6 +392,7 @@ NdbOperation::insertKEYINFO(const char* 
       setErrorCodeAbort(4001);
       return -1;
     }
+    tSignal->setLength(KeyInfo::MaxSignalLength);
     if (theTCREQ->next() != NULL)
        theLastKEYINFO->next(tSignal);
     else

=== modified file 'storage/ndb/src/ndbapi/NdbScanFilter.cpp'
--- a/storage/ndb/src/ndbapi/NdbScanFilter.cpp	2008-08-06 15:46:41 +0000
+++ b/storage/ndb/src/ndbapi/NdbScanFilter.cpp	2008-12-09 17:02:23 +0000
@@ -586,6 +586,42 @@ static const tab3 table3[] = {
        &NdbInterpretedCode::branch_col_notlike, 
        &NdbInterpretedCode::branch_col_like, 
        &NdbInterpretedCode::branch_col_notlike } }
+  
+  /**
+   * AND EQ MASK
+   */
+  ,{ { 0,
+       &NdbInterpretedCode::branch_col_and_mask_ne_mask,
+       &NdbInterpretedCode::branch_col_and_mask_eq_mask,
+       &NdbInterpretedCode::branch_col_and_mask_ne_mask,
+       &NdbInterpretedCode::branch_col_and_mask_eq_mask } }
+  
+  /**
+   * AND NE MASK
+   */
+  ,{ { 0,
+       &NdbInterpretedCode::branch_col_and_mask_eq_mask,
+       &NdbInterpretedCode::branch_col_and_mask_ne_mask,
+       &NdbInterpretedCode::branch_col_and_mask_eq_mask,
+       &NdbInterpretedCode::branch_col_and_mask_ne_mask } } 
+
+  /**
+   * AND EQ ZERO
+   */
+  ,{ { 0,
+       &NdbInterpretedCode::branch_col_and_mask_ne_zero,
+       &NdbInterpretedCode::branch_col_and_mask_eq_zero,
+       &NdbInterpretedCode::branch_col_and_mask_ne_zero,
+       &NdbInterpretedCode::branch_col_and_mask_eq_zero } }
+  
+  /**
+   * AND NE ZERO
+   */
+  ,{ { 0,
+       &NdbInterpretedCode::branch_col_and_mask_eq_zero,
+       &NdbInterpretedCode::branch_col_and_mask_ne_zero,
+       &NdbInterpretedCode::branch_col_and_mask_eq_zero,
+       &NdbInterpretedCode::branch_col_and_mask_ne_zero } } 
 };
 
 const int tab3_sz = sizeof(table3)/sizeof(table3[0]);
@@ -675,6 +711,14 @@ NdbScanFilter::cmp(BinaryCondition cond,
     return m_impl.cond_col_const(Interpreter::LIKE, ColId, val, len);
   case COND_NOT_LIKE:
     return m_impl.cond_col_const(Interpreter::NOT_LIKE, ColId, val, len);
+  case COND_AND_EQ_MASK:
+    return m_impl.cond_col_const(Interpreter::AND_EQ_MASK, ColId, val, len);
+  case COND_AND_NE_MASK:
+    return m_impl.cond_col_const(Interpreter::AND_NE_MASK, ColId, val, len);
+  case COND_AND_EQ_ZERO:
+    return m_impl.cond_col_const(Interpreter::AND_EQ_ZERO, ColId, val, len);
+  case COND_AND_NE_ZERO:
+    return m_impl.cond_col_const(Interpreter::AND_NE_ZERO, ColId, val, len);
   }
   return -1;
 }

=== modified file 'storage/ndb/src/ndbapi/NdbScanOperation.cpp'
--- a/storage/ndb/src/ndbapi/NdbScanOperation.cpp	2008-11-17 19:03:20 +0000
+++ b/storage/ndb/src/ndbapi/NdbScanOperation.cpp	2008-12-09 17:25:55 +0000
@@ -2149,6 +2149,7 @@ NdbScanOperation::takeOverScanOp(Operati
     Uint32 left = len - i;
     while(tSignal && left > KeyInfo::DataLength){
       tSignal->setSignal(GSN_KEYINFO);
+      tSignal->setLength(KeyInfo::MaxSignalLength);
       KeyInfo * keyInfo = CAST_PTR(KeyInfo, tSignal->getDataPtrSend());
       memcpy(keyInfo->keyData, src, 4 * KeyInfo::DataLength);
       src += 4 * KeyInfo::DataLength;
@@ -2156,10 +2157,13 @@ NdbScanOperation::takeOverScanOp(Operati
       
       tSignal->next(theNdb->getSignal());
       tSignal = tSignal->next();
+      newOp->theLastKEYINFO = tSignal;
     }
     
     if(tSignal && left > 0){
       tSignal->setSignal(GSN_KEYINFO);
+      tSignal->setLength(KeyInfo::HeaderLength + left);
+      newOp->theLastKEYINFO = tSignal;
       KeyInfo * keyInfo = CAST_PTR(KeyInfo, tSignal->getDataPtrSend());
       memcpy(keyInfo->keyData, src, 4 * left);
     }      

=== modified file 'storage/ndb/test/ndbapi/testLimits.cpp'
--- a/storage/ndb/test/ndbapi/testLimits.cpp	2008-08-26 14:05:01 +0000
+++ b/storage/ndb/test/ndbapi/testLimits.cpp	2008-12-09 17:15:12 +0000
@@ -919,6 +919,95 @@ int testSegmentedSectionScan(NDBT_Contex
   return NDBT_OK;
 }
 
+int testDropSignalFragments(NDBT_Context* ctx, NDBT_Step* step){
+  /* Segmented section exhaustion results in dropped signals
+   * Fragmented signals split one logical signal over multiple
+   * physical signals (to cope with the MAX_SIGNAL_LENGTH=32kB
+   * limitation).
+   * This testcase checks that when individual signals comprising
+   * a fragmented signal (in this case SCANTABREQ) are dropped, the
+   * system behaves correctly.
+   * Correct behaviour is to behave in the same way as if the signal
+   * was not fragmented, and for SCANTABREQ, to return a temporary
+   * resource error.
+   */
+  NdbRestarter restarter;
+  Ndb* pNdb= GETNDB(step);
+
+  /* SEND > ((2 * MAX_SEND_MESSAGE_BYTESIZE) + SOME EXTRA) 
+   * This way we get at least 3 fragments
+   * However, as this is generally > 64kB, it's too much AttrInfo for
+   * a ScanTabReq, so the 'success' case returns error 874
+   */
+  const Uint32 PROG_WORDS= 16500; 
+
+  struct SubCase
+  {
+    Uint32 errorInsertCode;
+    int expectedRc;
+  };
+  const Uint32 numSubCases= 5;
+  const SubCase cases[numSubCases]= 
+  /* Error insert   Scanrc */
+    {{          0,     874},  // Normal, success which gives too much AI error
+     {       8074,     217},  // Drop first fragment -> error 217
+     {       8075,     217},  // Drop middle fragment(s) -> error 217
+     {       8076,     217},  // Drop last fragment -> error 217
+     {       8077,     217}}; // Drop all fragments -> error 217
+  const Uint32 numIterations= 50;
+  
+  Uint32 buff[ PROG_WORDS + 10 ]; // 10 extra for final 'return' etc.
+
+  for (Uint32 iteration=0; iteration < (numIterations * numSubCases); iteration++)
+  {
+    /* Start a transaction */
+    NdbTransaction* trans= pNdb->startTransaction();
+    CHECKNOTNULL(trans);
+
+    SubCase subcase= cases[iteration % numSubCases];
+
+    Uint32 errorInsertVal= subcase.errorInsertCode;
+    // printf("Inserting error : %u\n", errorInsertVal);
+    CHECKEQUAL(0, restarter.insertErrorInAllNodes(errorInsertVal));
+
+    NdbScanOperation* scan= trans->getNdbScanOperation(ctx->getTab());
+    
+    CHECKNOTNULL(scan);
+    
+    CHECKEQUAL(0, scan->readTuples());
+    
+    /* Create a large program, to give a large SCANTABREQ */
+    NdbInterpretedCode prog(ctx->getTab(), buff, PROG_WORDS + 10);
+    
+    for (Uint32 w=0; w < PROG_WORDS; w++)
+      CHECKEQUAL(0, prog.load_const_null(1));
+    
+    CHECKEQUAL(0, prog.interpret_exit_ok());
+    CHECKEQUAL(0, prog.finalise());
+    
+    CHECKEQUAL(0, scan->setInterpretedCode(&prog));
+    
+    /* Api doesn't seem to wait for result of scan request */
+    CHECKEQUAL(0, trans->execute(NdbTransaction::NoCommit));
+    
+    CHECKEQUAL(0, trans->getNdbError().code);
+
+    CHECKEQUAL(-1, scan->nextResult());
+    
+    int expectedResult= subcase.expectedRc;
+    CHECKEQUAL(expectedResult, scan->getNdbError().code);
+
+    scan->close();
+    
+    trans->close();
+  }
+
+  restarter.insertErrorInAllNodes(0);
+
+  return NDBT_OK;
+}
+
+
 NDBT_TESTSUITE(testLimits);
 
 TESTCASE("ExhaustSegmentedSectionPk",
@@ -935,6 +1024,11 @@ TESTCASE("ExhaustSegmentedSectionScan",
   INITIALIZER(testSegmentedSectionScan);
 }
 
+TESTCASE("DropSignalFragments",
+         "Test behaviour of Segmented Section exhaustion with fragmented signals"){
+  INITIALIZER(testDropSignalFragments);
+}
+
 NDBT_TESTSUITE_END(testLimits);
 
 int main(int argc, const char** argv){

=== modified file 'storage/ndb/test/ndbapi/testScanFilter.cpp'
--- a/storage/ndb/test/ndbapi/testScanFilter.cpp	2008-11-10 11:45:05 +0000
+++ b/storage/ndb/test/ndbapi/testScanFilter.cpp	2008-12-09 17:02:23 +0000
@@ -88,6 +88,49 @@ static
 const
 NDBT_Table MYTAB1(TABLE_NAME, sizeof(MYTAB1Attribs)/sizeof(NDBT_Attribute), MYTAB1Attribs);
 
+static
+const
+NDBT_Attribute MYTAB2Attribs[] = {
+  NDBT_Attribute("id",        NdbDictionary::Column::Unsigned, 1, true),
+  //                                                         _pk    _nullable
+  NDBT_Attribute("1bitnn",    NdbDictionary::Column::Bit, 1, false, false),
+  NDBT_Attribute("1bitnu",    NdbDictionary::Column::Bit, 1, false, true),
+  NDBT_Attribute("2bitnn",    NdbDictionary::Column::Bit, 2, false, false),
+  NDBT_Attribute("2bitnu",    NdbDictionary::Column::Bit, 2, false, true),
+  NDBT_Attribute("7bitnn",    NdbDictionary::Column::Bit, 7, false, false),
+  NDBT_Attribute("7bitnu",    NdbDictionary::Column::Bit, 7, false, true),
+  NDBT_Attribute("8bitnn",    NdbDictionary::Column::Bit, 8, false, false),
+  NDBT_Attribute("8bitnu",    NdbDictionary::Column::Bit, 8, false, true),
+  NDBT_Attribute("15bitnn",   NdbDictionary::Column::Bit, 15, false, false),
+  NDBT_Attribute("15bitnu",   NdbDictionary::Column::Bit, 15, false, true),
+  NDBT_Attribute("31bitnn",   NdbDictionary::Column::Bit, 31, false, false),
+  NDBT_Attribute("31bitnu",   NdbDictionary::Column::Bit, 31, false, true),
+  NDBT_Attribute("32bitnn",   NdbDictionary::Column::Bit, 32, false, false),
+  NDBT_Attribute("32bitnu",   NdbDictionary::Column::Bit, 32, false, true),
+  NDBT_Attribute("33bitnn",   NdbDictionary::Column::Bit, 33, false, false),
+  NDBT_Attribute("33bitnu",   NdbDictionary::Column::Bit, 33, false, true),
+  NDBT_Attribute("63bitnn",   NdbDictionary::Column::Bit, 63, false, false),
+  NDBT_Attribute("63bitnu",   NdbDictionary::Column::Bit, 63, false, true),
+  NDBT_Attribute("64bitnn",   NdbDictionary::Column::Bit, 64, false, false),
+  NDBT_Attribute("64bitnu",   NdbDictionary::Column::Bit, 64, false, true),
+  NDBT_Attribute("65bitnn",   NdbDictionary::Column::Bit, 65, false, false),
+  NDBT_Attribute("65bitnu",   NdbDictionary::Column::Bit, 65, false, true),
+  NDBT_Attribute("127bitnn",  NdbDictionary::Column::Bit, 127, false, false),
+  NDBT_Attribute("127bitnu",  NdbDictionary::Column::Bit, 127, false, true),
+  NDBT_Attribute("513bitnn",   NdbDictionary::Column::Bit, 513, false, false),
+  NDBT_Attribute("513bitnu",   NdbDictionary::Column::Bit, 513, false, true)
+};
+
+static const char* TABLE2_NAME= "MyTab2";
+
+static
+const
+NDBT_Table MYTAB2(TABLE2_NAME, sizeof(MYTAB2Attribs)/sizeof(NDBT_Attribute), MYTAB2Attribs);
+
+static const int NUM_COLS= sizeof(MYTAB2Attribs)/sizeof(NDBT_Attribute);
+static const int MAX_BIT_WIDTH= 513;
+/* One extra row for all bits == 0 */
+static const int TOTAL_ROWS= MAX_BIT_WIDTH + 1;
 
 int createTable(Ndb* pNdb, const NdbDictionary::Table* tab, bool _temp, 
 			 bool existsOk, NDBT_CreateTableHook f)
@@ -1013,6 +1056,588 @@ int runScanFilterConstructorFail(NDBT_Co
   return NDBT_OK;
 }
 
+bool getBit(const Uint32* bitMap,
+            int bitNum)
+{
+  return ((bitMap[ bitNum >> 5 ] & (1 << (bitNum & 31))) != 0);
+}
+
+void setBit(Uint32* bitMap,
+            int bitNum)
+{
+  bitMap[ bitNum >> 5 ] |= (1 << (bitNum & 31));
+}
+
+enum TestConditions {
+  COND_LE = 0,
+  COND_LT = 1,
+  COND_GE = 2,
+  COND_GT = 3,
+  COND_EQ = 4,
+  COND_NE = 5,
+  COND_NULL = 6,
+  COND_NOTNULL = 7,
+  COND_AND_EQ_MASK = 8,
+  COND_AND_NE_MASK = 9,
+  COND_AND_EQ_ZERO = 10,
+  COND_AND_NE_ZERO = 11
+};
+
+int getExpectedBitsSet(int rowId,
+                       int colBitWidth)
+{
+  /* returns value from -1 to colBitWidth -1
+   * -1 == no bits set
+   * 0 == bit 0 set
+   * 1 == bit 0 + bit 1 set
+   * ...
+   */
+  return (rowId % (colBitWidth + 1)) -1;
+}
+
+bool isNullValue(int rowId,
+                 int colBitWidth)
+{
+  /* Occasionally we'll have a Null column */
+  return (((rowId + colBitWidth) % 13) == 0);
+}
+
+void getBitfieldVariants(int bitNum, int& offset, bool& invert)
+{
+  offset= 0;
+  invert= false;
+  if ((bitNum % 5) == 3)
+  {
+    /* Invert the mask */
+    invert= true;
+  }
+  if ((bitNum % 7) == 6)
+  {
+    /* Shift the mask */
+    offset= (bitNum / 2);
+  }
+};
+  
+
+bool isRowExpected(int rowId, 
+                   TestConditions cond,
+                   int colBitWidth,
+                   int bitsSetInScanFilter,
+                   bool isNullable,
+                   const Uint32* maskBuff)
+{
+  if (isNullable && isNullValue(rowId, colBitWidth))
+  {
+    switch (cond) {
+    case COND_LE:
+      return true; // null < any value
+    case COND_LT:
+      return true; // null < any value
+    case COND_GE:
+      return false; // null < any value
+    case COND_GT:
+      return false; // null < any value
+    case COND_EQ:
+      return false; // null != any value 
+    case COND_NE:
+      return true; // null != any value
+    case COND_NULL:
+      return true; // null is null 
+    case COND_NOTNULL:
+      return false;
+    case COND_AND_EQ_MASK:
+      return false; // NULL AND MASK != MASK
+    case COND_AND_NE_MASK:
+      return true; // NULL AND MASK != MASK
+    case COND_AND_EQ_ZERO:
+      return false; // NULL AND MASK != 0
+    case COND_AND_NE_ZERO:
+      return true; // NULL AND MASK != 0
+    default:
+      printf("isRowExpected given bad condition : %u\n",
+             cond);
+      return false;
+    }    
+  }
+  else
+  {
+    /* Not a null value */
+    int expectedBitsSet= getExpectedBitsSet(rowId, colBitWidth);
+    
+    int offset= 0;
+    bool invert= false;
+
+    getBitfieldVariants(bitsSetInScanFilter + 1, offset, invert);
+
+    switch (cond) {
+    case COND_LE:
+      return expectedBitsSet <= bitsSetInScanFilter;
+    case COND_LT:
+      return expectedBitsSet <  bitsSetInScanFilter;
+    case COND_GE:
+      return expectedBitsSet >= bitsSetInScanFilter;
+    case COND_GT:
+      return expectedBitsSet >  bitsSetInScanFilter;
+    case COND_EQ:
+      return expectedBitsSet == bitsSetInScanFilter;
+    case COND_NE:
+      return expectedBitsSet != bitsSetInScanFilter;
+    case COND_NULL:
+      return false;
+    case COND_NOTNULL:
+      return true;
+    case COND_AND_EQ_MASK:
+    case COND_AND_NE_MASK:
+    {
+      bool result= true;
+      /* Compare data AND mask to the mask buff */
+      for (int idx=0; idx < colBitWidth; idx++)
+      {
+        bool bitVal= (expectedBitsSet >= 0)?
+          (expectedBitsSet >= idx): false;
+        bool maskVal= getBit(maskBuff, idx);
+        if ((bitVal & maskVal) != maskVal)
+        {
+          /* Difference to mask, know result */
+          result= false;
+          break;
+        }
+      }
+
+      /* Invert result for NE condition */
+      return (result ^ (cond == COND_AND_NE_MASK));
+    }
+    case COND_AND_EQ_ZERO:
+    case COND_AND_NE_ZERO:
+    {
+      bool result= true;
+      /* Compare data AND mask to zero */
+      for (int idx=0; idx < colBitWidth; idx++)
+      {
+        bool bitVal= (expectedBitsSet >= 0)?
+          (expectedBitsSet >= idx): false;
+        bool maskVal= getBit(maskBuff, idx);
+        if ((bitVal & maskVal) != 0)
+        {
+          /* Difference to 0, know result */
+          result= false;
+          break;
+        }
+      }
+      /* Invert result for NE condition */
+      return (result ^ (cond == COND_AND_NE_ZERO));
+    }
+    default:
+      printf("isRowExpected given bad condition : %u\n",
+             cond);
+      return false;
+    }
+  }
+}
+
+int insertBitRows(Ndb* pNdb)
+{
+  const NdbDictionary::Dictionary* myDict= pNdb->getDictionary();
+  const NdbDictionary::Table *myTable= myDict->getTable(TABLE2_NAME);
+  if(myTable == NULL) 
+    APIERROR(myDict->getNdbError());
+
+  for (int i=0; i< TOTAL_ROWS; i++)
+  {
+    NdbTransaction* myTrans = pNdb->startTransaction();
+    if (myTrans == NULL)
+      APIERROR(pNdb->getNdbError());
+
+    NdbOperation* insertOp= myTrans->getNdbOperation(myTable);
+    if (insertOp == NULL)
+      APIERROR(pNdb->getNdbError());
+    
+    const int buffSize= (MAX_BIT_WIDTH + 31) / 32;
+    Uint32 buff[ buffSize ];
+
+    if (insertOp->insertTuple() != 0)
+      APIERROR(insertOp->getNdbError());
+
+    if (insertOp->equal((Uint32)0, i) != 0) // Set id column
+      APIERROR(insertOp->getNdbError());
+
+    for (int col=1; col < myTable->getNoOfColumns(); col++)
+    {
+      const NdbDictionary::Column* c= myTable->getColumn(col);
+      int colBitWidth= c->getLength();
+      bool isNullable= c->getNullable();
+
+      if (isNullable &&
+          isNullValue(i, colBitWidth))
+      {
+        /* Set column value to NULL */
+        if (insertOp->setValue(col, (char*) NULL) != 0)
+          APIERROR(insertOp->getNdbError());
+      }
+      else
+      {
+        /* Set lowest bits in this column */
+        memset(buff, 0, (4 * buffSize));
+        
+        int bitsToSet= getExpectedBitsSet(i, colBitWidth);
+        
+        if (bitsToSet >= 0)
+        {
+          for (int idx=0; idx <= bitsToSet; idx++)
+            setBit(buff, idx);
+        }
+        
+        if (insertOp->setValue(col, (char *)buff) != 0)
+          APIERROR(insertOp->getNdbError());
+      }
+    }
+
+    if (myTrans->execute(NdbTransaction::Commit) != 0)
+    {
+      APIERROR(myTrans->getNdbError());
+    }
+    myTrans->close();
+  }
+
+  printf("Inserted %u rows\n", TOTAL_ROWS);
+
+  return NDBT_OK;
+}
+
+int verifyBitData(Ndb* pNdb)
+{
+  const NdbDictionary::Dictionary* myDict= pNdb->getDictionary();
+  const NdbDictionary::Table *myTable= myDict->getTable(TABLE2_NAME);
+  if(myTable == NULL) 
+    APIERROR(myDict->getNdbError());
+
+  NdbTransaction* myTrans= pNdb->startTransaction();
+  if (myTrans == NULL)
+    APIERROR(pNdb->getNdbError());
+  
+  NdbScanOperation* scanOp= myTrans->getNdbScanOperation(myTable);
+  if (scanOp == NULL)
+    APIERROR(pNdb->getNdbError());
+  
+  if (scanOp->readTuples() != 0)
+    APIERROR(scanOp->getNdbError());
+  
+  NdbRecAttr* results[ NUM_COLS ];
+  
+  for (int col=0; col< NUM_COLS; col++)
+  {
+    if ((results[col]= scanOp->getValue(col)) == NULL)
+      APIERROR(scanOp->getNdbError());
+  }
+  
+  if (myTrans->execute(NdbTransaction::NoCommit) != 0)
+    APIERROR(myTrans->getNdbError());
+  
+  for (int row=0; row < TOTAL_ROWS; row++)
+  {
+    if (scanOp->nextResult() != 0)
+      APIERROR(scanOp->getNdbError());
+    
+    int rowId= results[0]->int32_value();
+    
+    for (int col=1; col < NUM_COLS; col++)
+    {
+      const NdbDictionary::Column* c= myTable->getColumn(col);
+      bool isNullable= c->getNullable();
+      int colBitWidth= c->getLength();
+
+      if (isNullable &&
+          isNullValue(rowId, colBitWidth))
+      {
+        if (!results[ col ] ->isNULL())
+        {
+          printf("Mismatch at result %d row %d, column %d, expected NULL\n",
+                 row, rowId, col);
+          myTrans->close();
+          return NDBT_FAILED;
+        }
+      }
+      else
+      {
+        /* Non null value, check it */
+        int expectedSetBits= getExpectedBitsSet(rowId, colBitWidth);
+        
+        const Uint32* val= (const Uint32 *) results[ col ]->aRef();
+        
+        for (int bitNum=0; bitNum < colBitWidth; bitNum++)
+        {
+          bool expectClear= (bitNum > expectedSetBits);
+          bool isClear= ! getBit(val, bitNum);
+          if (expectClear != isClear)
+          {
+            printf("Mismatch at result %d row %d, column %d, bit %d"
+                   " expected %d \n",
+                   row, rowId, col, bitNum, (expectClear)?0:1);
+            myTrans->close();
+            return NDBT_FAILED;
+          }
+        }
+      }
+    }
+  }
+  
+  if (scanOp->nextResult() != 1)
+  {
+    printf("Too many rows returned\n");
+    return NDBT_FAILED;
+  }
+  
+  if (myTrans->execute(NdbTransaction::Commit) != 0)
+    APIERROR(myTrans->getNdbError());
+  
+  myTrans->close();
+
+  printf("Verified data for %u rows\n",
+         TOTAL_ROWS);
+  
+  return NDBT_OK;
+}
+
+int verifyBitScanFilter(Ndb* pNdb)
+{
+  const NdbDictionary::Dictionary* myDict= pNdb->getDictionary();
+  const NdbDictionary::Table *myTable= myDict->getTable(TABLE2_NAME);
+  if(myTable == NULL) 
+    APIERROR(myDict->getNdbError());
+  
+  /* Perform scan with scanfilter for :
+   *   - each column in the table
+   *   - each supported comparison type
+   *   - each potentially set bit in the column
+   */
+  int scanCount= 0;
+  for (int col=1; col < NUM_COLS; col++)
+  {
+    const NdbDictionary::Column* c= myTable->getColumn(col);
+    const int bitWidth= c->getLength();
+    printf("Testing %s column %u (width=%u bits) with %u scan filter variants\n",
+           (c->getNullable())?"Nullable":"Non-null",
+           col, bitWidth, ((bitWidth+1) * (COND_AND_NE_ZERO + 1)));
+    for (int comp=0; comp <= COND_AND_NE_ZERO; comp++)
+    {
+      for (int bitNum=0; bitNum <= bitWidth; bitNum++)
+      {
+        /* Define scan */
+        NdbTransaction* myTrans= pNdb->startTransaction();
+        if (myTrans == NULL)
+          APIERROR(pNdb->getNdbError());
+        
+        NdbScanOperation* scanOp= myTrans->getNdbScanOperation(myTable);
+        if (scanOp == NULL)
+          APIERROR(pNdb->getNdbError());
+        
+        if (scanOp->readTuples() != 0)
+          APIERROR(scanOp->getNdbError());
+        
+        NdbRecAttr* ra;
+        if ((ra= scanOp->getValue((Uint32)0)) == NULL)
+          APIERROR(scanOp->getNdbError());
+        
+        /* Define ScanFilter */
+        const int buffSize= (MAX_BIT_WIDTH + 31)/32;
+        Uint32 buff[ buffSize ];
+        memset(buff, 0, (4 * buffSize));
+        
+        /* Define constant value, with some variants for bitwise operators */
+        bool invert= false;
+        int offset= 0;
+
+        switch (comp) {
+        case COND_AND_EQ_MASK:
+        case COND_AND_NE_MASK:
+        case COND_AND_EQ_ZERO:
+        case COND_AND_NE_ZERO:
+          getBitfieldVariants(bitNum, offset, invert);
+        default:
+          ;
+        }        
+
+        /* Set lower bitNum -1 bits
+         * If bitNum == 0, set none
+         */
+        int bitsSetInFilter= bitNum-1;
+
+        if ((bitsSetInFilter >= 0) ||
+            invert)
+        {
+          for (int idx=0; idx <= (32 * buffSize); idx++)
+          {
+            if ((idx >= offset) && 
+                (idx <= (offset + bitsSetInFilter)))
+            {
+              if (!invert)
+                setBit(buff, idx);
+            }
+            else
+            {
+              if (invert)
+                setBit(buff, idx);
+            }
+          }
+        }
+
+
+
+        NdbScanFilter sf(scanOp);
+        
+        if (sf.begin(NdbScanFilter::AND) != 0)
+          APIERROR(sf.getNdbError());
+        
+        if ((comp != COND_NULL) &&
+            (comp != COND_NOTNULL))
+        {
+          /* Operator with a constant */
+          if (sf.cmp((NdbScanFilter::BinaryCondition)comp, 
+                     col, 
+                     (char*)buff) != 0)
+            APIERROR(sf.getNdbError());
+        }
+        else
+        {
+          switch (comp) {
+          case COND_NULL:
+            if (sf.isnull(col) != 0)
+              APIERROR(sf.getNdbError());
+            break;
+          case COND_NOTNULL:
+            if (sf.isnotnull(col) != 0)
+              APIERROR(sf.getNdbError());
+            break;
+          default:
+            printf("Condition %u not supported\n", comp);
+            return NDBT_FAILED;
+          }
+        }
+        
+        if (sf.end() != 0)
+          APIERROR(sf.getNdbError());
+
+        /* Calculate expected number of rows in result */
+        const NdbDictionary::Column* c= myTable->getColumn(col);
+        int colBitWidth= c->getLength();
+        bool isNullable= c->getNullable();
+
+        // printf("Determining expected rows\n");
+        int expectedResultCount= 0;
+        for (int i=0; i< TOTAL_ROWS; i++)
+        {
+          if (isRowExpected(i, 
+                            (TestConditions)comp, 
+                            colBitWidth, 
+                            bitsSetInFilter,
+                            isNullable,
+                            buff))
+            expectedResultCount++;
+        }
+        
+        
+        /* Execute */
+        if (myTrans->execute(NdbTransaction::NoCommit) != 0)
+          APIERROR(myTrans->getNdbError());
+        
+
+        /* Process results to ensure we got the expected rows back */
+        int rc= 0;
+        int count= 0;
+        int matchCount= 0;
+        // printf("Checking rows returned\n");
+        while ((rc= scanOp->nextResult()) == 0)
+        {
+          int rowId= ra->int32_value();
+          count++;
+          /*
+           * Check that this row was expected
+           */
+          if (isRowExpected(rowId, 
+                            (TestConditions)comp, 
+                            colBitWidth, 
+                            bitsSetInFilter,
+                            isNullable,
+                            buff))
+          {
+            matchCount++;
+          }
+          else
+          {
+            printf("Col=%u Comp=%u BitNum=%u row=%u : "
+                   "Got row %u back which I did not expect\n",
+                   col, comp, bitNum, count, rowId);
+            myTrans->close();
+            return NDBT_FAILED;
+          }
+        }
+
+        if (rc != 1)
+        {
+          printf("Col=%u Comp=%u BitNum=%u :"
+                 "nextResult failure %d\n", col, comp, bitNum, rc);
+          APIERROR(myTrans->getNdbError());
+        }
+
+        //printf("Column %u, Comp=%u bitNum=%u num expectedResults=%u matchCount=%u\n",
+        //       col, comp, bitNum, expectedResultCount, matchCount);
+        
+        /* Check that we didn't miss any expected rows */
+        if (matchCount != expectedResultCount)
+        {
+          printf("Col=%u Comp=%u BitNum=%u :"
+                 "Mismatch between expected(%u) and received(%u) result counts\n",
+                 col, comp, bitNum, expectedResultCount, matchCount);
+          myTrans->close();
+          return NDBT_FAILED;
+        }
+        
+        if (myTrans->execute(NdbTransaction::Commit) != 0)
+          APIERROR(myTrans->getNdbError());
+        
+        myTrans->close();
+
+        scanCount++;
+        
+      } // for bitNum
+    } // for comparison
+  } // for column
+  
+  printf("Verified %u scans with bitfield ScanFilter conditions\n",
+         scanCount);
+
+  return NDBT_OK;
+}
+
+
+int runTestScanFilterBit(NDBT_Context* ctx, NDBT_Step* step)
+{
+  /* Create table */
+  Ndb *pNdb = GETNDB(step);
+  pNdb->getDictionary()->dropTable(MYTAB2.getName());
+  int ret = createTable(pNdb, &MYTAB2, false, true, 0); 
+  if(ret)
+    return ret;
+
+  /* Populate with data */
+  if (insertBitRows(pNdb) != NDBT_OK)
+    return NDBT_FAILED;
+  
+  /* Initial data check via scan */
+  if (verifyBitData(pNdb) != NDBT_OK)
+    return NDBT_FAILED;
+
+  /* Verify Bit ScanFilter correctness */
+  if (verifyBitScanFilter(pNdb) != NDBT_OK)
+    return NDBT_FAILED;
+
+  /* Drop table */
+  pNdb->getDictionary()->dropTable(MYTAB2.getName());
+  
+  return NDBT_OK;
+}
+
+
 NDBT_TESTSUITE(testScanFilter);
 TESTCASE(TEST_NAME, 
 	 "Scan table TABLE_NAME for the records which accord with \
@@ -1026,6 +1651,12 @@ TESTCASE(TEST_NAME, 
   FINALIZER(runDropTables);
 }
 
+TESTCASE("TestScanFilterBit",
+         "Test ScanFilter with bitfield columns")
+{
+  INITIALIZER(runTestScanFilterBit);
+}
+
 NDBT_TESTSUITE_END(testScanFilter);
 
 
@@ -1039,6 +1670,6 @@ int main(int argc, const char** argv)
     return NDBT_ProgramExit(NDBT_FAILED);
   }
 
-  NDBT_TESTSUITE_INSTANCE(testScanFilter);  
-  return testScanFilter.executeOneCtx(con, &MYTAB1, TEST_NAME);
+  NDBT_TESTSUITE_INSTANCE(testScanFilter);
+  return testScanFilter.execute(argc, argv);
 }

=== modified file 'storage/ndb/test/run-test/daily-basic-tests.txt'
--- a/storage/ndb/test/run-test/daily-basic-tests.txt	2008-12-08 13:58:15 +0000
+++ b/storage/ndb/test/run-test/daily-basic-tests.txt	2008-12-09 17:15:12 +0000
@@ -1175,7 +1175,7 @@ args -n bug37672 T1
 
 #EOF 2008-07-04
 max-time: 500
-cmd: testScanFilter
+cmd: testScanFilter T1
 args: 
 
 #EOF 2008-07-09
@@ -1198,3 +1198,6 @@ max-time: 300 
 cmd: testMgm
 args: 
 
+max-time: 500
+cmd: testLimits
+args: -n DropSignalFragments T1

Thread
bzr push into mysql-5.1 branch (frazer:3164 to 3167) Frazer Clement9 Dec