List:Commits« Previous MessageNext Message »
From:Frazer Clement Date:December 9 2008 5:53pm
Subject:bzr push into mysql-5.1 branch (frazer:2785 to 2786) Bug#40535
View as plain text  
 2786 Frazer Clement	2008-12-09
      Bug# 40535 : NDBAPI : ScanFilter does not support BIT columns
      
      This bug is fixed to provide ScanFilter support for EQ, NE LT, LE, GT, GE 
      on bit columns.
      
      Additionally, 4 new comparison operations are defined for bit types :
       - COL_AND_MASK_EQ_MASK
       - COL_AND_MASK_NE_MASK
       - COL_AND_MASK_EQ_ZERO
       - COL_AND_MASK_NE_ZERO
      
      These can be used to test bits within a bitfield as part of a ScanFiler.
      
      A test program for all of the conditions and bit columns is added
modified:
  storage/ndb/include/kernel/Interpreter.hpp
  storage/ndb/include/ndbapi/NdbInterpretedCode.hpp
  storage/ndb/include/ndbapi/NdbOperation.hpp
  storage/ndb/include/ndbapi/NdbScanFilter.hpp
  storage/ndb/include/util/NdbSqlUtil.hpp
  storage/ndb/src/common/util/NdbSqlUtil.cpp
  storage/ndb/src/kernel/blocks/dbtup/DbtupExecQuery.cpp
  storage/ndb/src/ndbapi/NdbInterpretedCode.cpp
  storage/ndb/src/ndbapi/NdbOperationInt.cpp
  storage/ndb/src/ndbapi/NdbScanFilter.cpp
  storage/ndb/test/ndbapi/testScanFilter.cpp
  storage/ndb/test/run-test/daily-basic-tests.txt

 2785 Jonas Oreland	2008-12-08 [merge]
      merge 62 to 63
modified:
  storage/ndb/src/kernel/blocks/ERROR_codes.txt
  storage/ndb/src/kernel/blocks/dbdih/DbdihMain.cpp
  storage/ndb/src/kernel/blocks/dblqh/Dblqh.hpp
  storage/ndb/src/kernel/blocks/dblqh/DblqhInit.cpp
  storage/ndb/src/kernel/blocks/dblqh/DblqhMain.cpp
  storage/ndb/src/kernel/blocks/dbtc/DbtcMain.cpp
  storage/ndb/test/include/NdbRestarter.hpp
  storage/ndb/test/ndbapi/testNodeRestart.cpp
  storage/ndb/test/run-test/daily-basic-tests.txt
  storage/ndb/test/src/NdbRestarter.cpp

=== modified file 'storage/ndb/include/kernel/Interpreter.hpp'
--- a/storage/ndb/include/kernel/Interpreter.hpp	2008-02-19 15:00:29 +0000
+++ b/storage/ndb/include/kernel/Interpreter.hpp	2008-12-09 15:47:14 +0000
@@ -89,18 +89,17 @@ public:
   /**
    * Branch string
    *
-   * i = Instruction            -  5 Bits ( 0 - 5 ) max 63
-   * a = Attribute id
-   * l = Length of string
-   * b = Branch offset
-   * t = branch type
+   * i = Instruction              -  5 Bits ( 0 - 5 ) max 63
+   * a = Attribute id             -  16 bits
+   * l = Length of string (bytes) -  16 bits
+   * b = Branch offset (words)    -  16 bits
+   * t = branch type              -  4 bits
    * d = Array length diff
    * v = Varchar flag
-   * p = No-blank-padding flag for char compare
    *
    *           1111111111222222222233
    * 01234567890123456789012345678901
-   * iiiiii   ddvtttpbbbbbbbbbbbbbbbb
+   * iiiiii   ddvttttbbbbbbbbbbbbbbbb
    * aaaaaaaaaaaaaaaallllllllllllllll
    * -string....                    -
    */
@@ -117,17 +116,21 @@ public:
     GT = 4,
     GE = 5,
     LIKE = 6,
-    NOT_LIKE = 7
+    NOT_LIKE = 7,
+    AND_EQ_MASK = 8,
+    AND_NE_MASK = 9,
+    AND_EQ_ZERO = 10,
+    AND_NE_ZERO = 11
   };
+  // TODO : Remove other 2 unused parameters.
   static Uint32 BranchCol(BinaryCondition cond, 
-			  Uint32 arrayLengthDiff, Uint32 varchar, bool nopad);
+			  Uint32 arrayLengthDiff, Uint32 varchar);
   static Uint32 BranchCol_2(Uint32 AttrId);
   static Uint32 BranchCol_2(Uint32 AttrId, Uint32 Len);
 
   static Uint32 getBinaryCondition(Uint32 op1);
   static Uint32 getArrayLengthDiff(Uint32 op1);
   static Uint32 isVarchar(Uint32 op1);
-  static Uint32 isNopad(Uint32 op1);
   static Uint32 getBranchCol_AttrId(Uint32 op2);
   static Uint32 getBranchCol_Len(Uint32 op2);
   
@@ -216,15 +219,14 @@ inline
 Uint32
 Interpreter::BranchCol(BinaryCondition cond, 
 		       Uint32 arrayLengthDiff,
-		       Uint32 varchar, bool nopad){
-  //ndbout_c("BranchCol: cond=%d diff=%u varchar=%u nopad=%d",
-      //cond, arrayLengthDiff, varchar, nopad);
+		       Uint32 varchar){
+  //ndbout_c("BranchCol: cond=%d diff=%u varchar=%u",
+      //cond, arrayLengthDiff, varchar);
   return 
     BRANCH_ATTR_OP_ARG + 
     (arrayLengthDiff << 9) + 
     (varchar << 11) +
-    (cond << 12) +
-    (nopad << 15);
+    (cond << 12);
 }
 
 inline
@@ -242,7 +244,7 @@ Interpreter::BranchCol_2(Uint32 AttrId){
 inline
 Uint32
 Interpreter::getBinaryCondition(Uint32 op){
-  return (op >> 12) & 0x7;
+  return (op >> 12) & 0xf;
 }
 
 inline
@@ -259,12 +261,6 @@ Interpreter::isVarchar(Uint32 op){
 
 inline
 Uint32
-Interpreter::isNopad(Uint32 op){
-  return (op >> 15) & 1;
-}
-
-inline
-Uint32
 Interpreter::getBranchCol_AttrId(Uint32 op){
   return (op >> 16) & 0xFFFF;
 }

=== modified file 'storage/ndb/include/ndbapi/NdbInterpretedCode.hpp'
--- a/storage/ndb/include/ndbapi/NdbInterpretedCode.hpp	2008-04-07 09:52:25 +0000
+++ b/storage/ndb/include/ndbapi/NdbInterpretedCode.hpp	2008-12-09 15:47:14 +0000
@@ -225,27 +225,30 @@ public:
    * Space required        Buffer          Request message
    *   branch_col_*_null   2 words         2 words
    *   branch_col_*        2 words +       2 words + 
-   *                       len bytes       len bytes
+   *                       type length     type length
    *                       rounded to      rounded to
    *                       nearest word    nearest word
    *
+   *                       Only significant words stored/
+   *                       sent for VAR* types
+   *
    * @param val       ptr to const value to compare against
-   * @param len       length in bytes of const value
+   * @param unused    unnecessary
    * @param attrId    column to compare
    * @param Label     Program label to jump to if condition is true
    * @return 0 if successful, -1 otherwise.
    */
-  int branch_col_eq(const void * val, Uint32 len, Uint32 attrId,
+  int branch_col_eq(const void * val, Uint32 unused, Uint32 attrId,
                     Uint32 Label);
-  int branch_col_ne(const void * val, Uint32 len, Uint32 attrId,
+  int branch_col_ne(const void * val, Uint32 unused, Uint32 attrId,
                     Uint32 Label);
-  int branch_col_lt(const void * val, Uint32 len, Uint32 attrId,
+  int branch_col_lt(const void * val, Uint32 unused, Uint32 attrId,
                     Uint32 Label);
-  int branch_col_le(const void * val, Uint32 len, Uint32 attrId,
+  int branch_col_le(const void * val, Uint32 unused, Uint32 attrId,
                     Uint32 Label);
-  int branch_col_gt(const void * val, Uint32 len, Uint32 attrId,
+  int branch_col_gt(const void * val, Uint32 unused, Uint32 attrId,
                     Uint32 Label);
-  int branch_col_ge(const void * val, Uint32 len, Uint32 attrId,
+  int branch_col_ge(const void * val, Uint32 unused, Uint32 attrId,
                     Uint32 Label);
   int branch_col_eq_null(Uint32 attrId, Uint32 Label);
   int branch_col_ne_null(Uint32 attrId, Uint32 Label);
@@ -280,13 +283,60 @@ public:
    * @param attrId    column to compare
    * @param Label     Program label to jump to if condition is true
    * @return 0 if successful, -1 otherwise.
-
+   *
    */
   int branch_col_like(const void * val, Uint32 len, Uint32 attrId,
                       Uint32 Label);
   int branch_col_notlike(const void * val, Uint32 len, Uint32 attrId,
                          Uint32 Label);
 
+  /* Table based bitwise logical conditional operations
+   * --------------------------------------------------
+   * These instructions are used to branch based on the 
+   * result of logical AND between Bit type column data 
+   * and a bitmask pattern.
+   *
+   * These instructions require that the table being operated
+   * upon was supplied when the NdbInterpretedCode object was
+   * constructed.
+   *
+   * The mask value should be the same size as the bit column
+   * being compared.
+   * Bitfields are passed in/out of NdbApi as 32-bit words
+   * with bits set from lsb to msb.
+   * The platform's endianness controls which byte contains
+   * the ls bits.  
+   *   x86= first(0th) byte.  Sparc/PPC= last(3rd byte)
+   *
+   * To set bit n of a bitmask to 1 from a Uint32* mask :
+   *   mask[n >> 5] |= (1 << (n & 31))
+   *
+   * if (BitWiseAnd(ValueOf(attrId), *mask) <EQ/NE> <*mask/0>)
+   *   goto Label;
+   *
+   * Space required        Buffer          Request message
+   *   branch_col_and_mask_eq_mask/
+   *   branch_col_and_mask_ne_mask/
+   *   branch_col_and_mask_eq_zero/
+   *   branch_col_and_mask_ne_zero
+   *                       2 words +       2 words + 
+   *                       column width    column width
+   *                       rounded to      rounded to
+   *                       nearest word    nearest word
+   *
+   * @param mask      ptr to const mask to use
+   * @param unused    unnecessary
+   * @param attrId    column to compare
+   * @param Label     Program label to jump to if condition is true
+   * @return 0 if successful, -1 otherwise.
+   *
+   */
+  int branch_col_and_mask_eq_mask(const void * mask, Uint32 unused, Uint32 attrId, Uint32 Label);
+  int branch_col_and_mask_ne_mask(const void * mask, Uint32 unused, Uint32 attrId, Uint32 Label);
+  int branch_col_and_mask_eq_zero(const void * mask, Uint32 unused, Uint32 attrId, Uint32 Label);
+  int branch_col_and_mask_ne_zero(const void * mask, Uint32 unused, Uint32 attrId, Uint32 Label);
+
+
   /* Program results 
    * ---------------
    * These instructions indicate to the interpreter that processing

=== modified file 'storage/ndb/include/ndbapi/NdbOperation.hpp'
--- a/storage/ndb/include/ndbapi/NdbOperation.hpp	2008-11-08 21:22:57 +0000
+++ b/storage/ndb/include/ndbapi/NdbOperation.hpp	2008-12-09 15:47:14 +0000
@@ -734,7 +734,12 @@ public:
   int branch_col_ge(Uint32 ColId, const void * val, Uint32 len, 
 		    bool nopad, Uint32 Label);
   /**
-   * The argument is always plain char, even if the field is varchar
+   * LIKE/NOTLIKE wildcard comparisons
+   * These instructions support SQL-style % and _ wildcards for
+   * (VAR)CHAR/BINARY columns only
+   *
+   * The argument is always plain char format, even if the field 
+   * is varchar
    * (changed in 5.0.22).
    * 
    * @note For Scans and NdbRecord operations, use the 
@@ -744,7 +749,40 @@ public:
 		      bool nopad, Uint32 Label);
   int branch_col_notlike(Uint32 ColId, const void *, Uint32 len, 
 			 bool nopad, Uint32 Label);
-  
+
+  /**
+   * Bitwise logical comparisons
+   *
+   * These comparison types are only supported for the Bitfield
+   * type
+   * They can be used to test for bit patterns in bitfield columns
+   *   The value passed is a bitmask which is bitwise-ANDed with the
+   *   column data.
+   *   Bitfields are passed in/out of NdbApi as 32-bit words with
+   *   bits set from lsb to msb.
+   *   The platform's endianness controls which byte contains the ls
+   *   bits.
+   *     x86= first(0th) byte.  Sparc/PPC= last (3rd byte)
+   *
+   *   To set bit n of a bitmask to 1 from a Uint32* mask : 
+   *     mask[n >> 5] |= (1 << (n & 31))
+   *
+   *   The branch can be taken in 4 cases :
+   *     - Column data AND Mask == Mask (all masked bits are set in data)
+   *     - Column data AND Mask != Mask (not all masked bits are set in data)
+   *     - Column data AND Mask == 0    (No masked bits are set in data)
+   *     - Column data AND Mask != 0    (Some masked bits are set in data)
+   *   
+   */
+  int branch_col_and_mask_eq_mask(Uint32 ColId, const void *, Uint32 len, 
+                                  bool nopad, Uint32 Label);
+  int branch_col_and_mask_ne_mask(Uint32 ColId, const void *, Uint32 len, 
+                                  bool nopad, Uint32 Label);
+  int branch_col_and_mask_eq_zero(Uint32 ColId, const void *, Uint32 len, 
+                                  bool nopad, Uint32 Label);
+  int branch_col_and_mask_ne_zero(Uint32 ColId, const void *, Uint32 len, 
+                                  bool nopad, Uint32 Label);
+
   /**
    * Interpreted program instruction: Exit with Ok
    *
@@ -1249,7 +1287,7 @@ protected:
   int read_attr(const NdbColumnImpl* anAttrObject, Uint32 RegDest);
   int write_attr(const NdbColumnImpl* anAttrObject, Uint32 RegSource);
   int branch_reg_reg(Uint32 type, Uint32, Uint32, Uint32);
-  int branch_col(Uint32 type, Uint32, const void *, Uint32, bool, Uint32 Label);
+  int branch_col(Uint32 type, Uint32, const void *, Uint32, Uint32 Label);
   int branch_col_null(Uint32 type, Uint32 col, Uint32 Label);
   NdbBlob *linkInBlobHandle(NdbTransaction *aCon,
                             const NdbColumnImpl *column,

=== modified file 'storage/ndb/include/ndbapi/NdbScanFilter.hpp'
--- a/storage/ndb/include/ndbapi/NdbScanFilter.hpp	2008-04-07 09:56:30 +0000
+++ b/storage/ndb/include/ndbapi/NdbScanFilter.hpp	2008-12-09 15:47:14 +0000
@@ -80,14 +80,18 @@ public:
 
   enum BinaryCondition 
   {
-    COND_LE = 0,        ///< lower bound
-    COND_LT = 1,        ///< lower bound, strict
-    COND_GE = 2,        ///< upper bound
-    COND_GT = 3,        ///< upper bound, strict
-    COND_EQ = 4,        ///< equality
-    COND_NE = 5,        ///< not equal
-    COND_LIKE = 6,      ///< like
-    COND_NOT_LIKE = 7   ///< not like
+    COND_LE = 0,           ///< lower bound
+    COND_LT = 1,           ///< lower bound, strict
+    COND_GE = 2,           ///< upper bound
+    COND_GT = 3,           ///< upper bound, strict
+    COND_EQ = 4,           ///< equality
+    COND_NE = 5,           ///< not equal
+    COND_LIKE = 6,         ///< like
+    COND_NOT_LIKE = 7,     ///< not like
+    COND_AND_EQ_MASK = 8,  ///< (bit & mask) == mask
+    COND_AND_NE_MASK = 9,  ///< (bit & mask) != mask (incl. NULL)
+    COND_AND_EQ_ZERO = 10, ///< (bit & mask) == 0
+    COND_AND_NE_ZERO = 11, ///< (bit & mask) != 0 (incl. NULL)
   };
 
   /** 
@@ -130,6 +134,9 @@ public:
    * documentation for NdbOperation::equal().
    * For BinaryConditions LIKE and NOT_LIKE, the value pointed to by val
    * should NOT include initial length bytes.
+   * For LIKE and NOT_LIKE, the % and ? wildcards are supported.
+   * For bitmask operations, see the bitmask format information against
+   * the branch_col_and_mask_eq_mask instruction in NdbInterpretedCode.hpp
    *
    *  �return  0 if successful, -1 otherwise
    */

=== modified file 'storage/ndb/include/util/NdbSqlUtil.hpp'
--- a/storage/ndb/include/util/NdbSqlUtil.hpp	2006-12-23 19:20:40 +0000
+++ b/storage/ndb/include/util/NdbSqlUtil.hpp	2008-12-09 15:47:14 +0000
@@ -52,6 +52,15 @@ public:
    */
   typedef int Like(const void* info, const void* p1, unsigned n1, const void* p2, unsigned n2);
 
+  /**
+   * Prototype for mask comparisons.  Defined for bit type.
+   *
+   * If common portion of data AND Mask is equal to mask
+   * return 0, else return 1.
+   * If cmpZero, compare data AND Mask to zero.
+   */
+  typedef int AndMask(const void* data, unsigned dataLen, const void* mask, unsigned maskLen, bool cmpZero); 
+
   enum CmpResult {
     CmpLess = -1,
     CmpEqual = 0,
@@ -96,6 +105,7 @@ public:
     Enum m_typeId;      // redundant
     Cmp* m_cmp;         // comparison method
     Like* m_like;       // "like" comparison method
+    AndMask* m_mask;    // Mask comparison method
   };
 
   /**
@@ -179,6 +189,8 @@ private:
   static Like likeVarbinary;
   static Like likeLongvarchar;
   static Like likeLongvarbinary;
+  //
+  static AndMask maskBit;
 };
 
 #endif

=== modified file 'storage/ndb/src/common/util/NdbSqlUtil.cpp'
--- a/storage/ndb/src/common/util/NdbSqlUtil.cpp	2007-02-23 11:28:34 +0000
+++ b/storage/ndb/src/common/util/NdbSqlUtil.cpp	2008-12-09 15:47:14 +0000
@@ -26,156 +26,187 @@ NdbSqlUtil::m_typeList[] = {
   { // 0
     Type::Undefined,
     NULL,
+    NULL,
     NULL
   },
   { // 1
     Type::Tinyint,
     cmpTinyint,
+    NULL,
     NULL
   },
   { // 2
     Type::Tinyunsigned,
     cmpTinyunsigned,
+    NULL,
     NULL
   },
   { // 3
     Type::Smallint,
     cmpSmallint,
+    NULL,
     NULL
   },
   { // 4
     Type::Smallunsigned,
     cmpSmallunsigned,
+    NULL,
     NULL
   },
   { // 5
     Type::Mediumint,
     cmpMediumint,
+    NULL,
     NULL
   },
   { // 6
     Type::Mediumunsigned,
     cmpMediumunsigned,
+    NULL,
     NULL
   },
   { // 7
     Type::Int,
     cmpInt,
+    NULL,
     NULL
   },
   { // 8
     Type::Unsigned,
     cmpUnsigned,
+    NULL,
     NULL
   },
   { // 9
     Type::Bigint,
     cmpBigint,
+    NULL,
     NULL
   },
   { // 10
     Type::Bigunsigned,
     cmpBigunsigned,
+    NULL,
     NULL
   },
   { // 11
     Type::Float,
     cmpFloat,
+    NULL,
     NULL
   },
   { // 12
     Type::Double,
     cmpDouble,
+    NULL,
     NULL
   },
   { // 13
     Type::Olddecimal,
     cmpOlddecimal,
+    NULL,
     NULL
   },
   { // 14
     Type::Char,
     cmpChar,
-    likeChar
+    likeChar,
+    NULL
   },
   { // 15
     Type::Varchar,
     cmpVarchar,
-    likeVarchar
+    likeVarchar,
+    NULL
   },
   { // 16
     Type::Binary,
     cmpBinary,
-    likeBinary
+    likeBinary,
+    NULL
   },
   { // 17
     Type::Varbinary,
     cmpVarbinary,
-    likeVarbinary
+    likeVarbinary,
+    NULL
   },
   { // 18
     Type::Datetime,
     cmpDatetime,
+    NULL,
     NULL
   },
   { // 19
     Type::Date,
     cmpDate,
+    NULL,
     NULL
   },
   { // 20
     Type::Blob,
     NULL,
+    NULL,
     NULL
   },
   { // 21
     Type::Text,
     NULL,
+    NULL,
     NULL
   },
   { // 22
     Type::Bit,
     cmpBit,
-    NULL
+    NULL,
+    maskBit
   },
   { // 23
     Type::Longvarchar,
     cmpLongvarchar,
-    likeLongvarchar
+    likeLongvarchar,
+    NULL
   },
   { // 24
     Type::Longvarbinary,
     cmpLongvarbinary,
-    likeLongvarbinary
+    likeLongvarbinary,
+    NULL
   },
   { // 25
     Type::Time,
     cmpTime,
+    NULL,
     NULL
   },
   { // 26
     Type::Year,
     cmpYear,
+    NULL,
     NULL
   },
   { // 27
     Type::Timestamp,
     cmpTimestamp,
+    NULL,
     NULL
   },
   { // 28
     Type::Olddecimalunsigned,
     cmpOlddecimalunsigned,
+    NULL,
     NULL
   },
   { // 29
     Type::Decimal,
     cmpDecimal,
+    NULL,
     NULL
   },
   { // 30
     Type::Decimalunsigned,
     cmpDecimalunsigned,
+    NULL,
     NULL
   }
 };
@@ -680,9 +711,54 @@ NdbSqlUtil::cmpText(const void* info, co
 int
 NdbSqlUtil::cmpBit(const void* info, const void* p1, unsigned n1, const void* p2, unsigned n2, bool full)
 { 
-  Uint32 n = (n1 < n2) ? n1 : n2;
-  int ret = memcmp(p1, p2, n);
-  return ret;
+  /* Bitfields are stored as 32-bit words
+   * This means that a byte-by-byte comparison will not work on all platforms
+   * We do a word-wise comparison of the significant bytes.
+   * It is assumed that insignificant bits (but not bytes) are zeroed in the
+   * passed values.
+   */
+  const Uint32 bytes= MIN(n1, n2);
+  Uint32 words= (bytes + 3) >> 2;
+
+  /* Don't expect either value to be length zero */
+  assert(words);
+
+  /* Check ptr alignment */
+  if (unlikely(((((UintPtr)p1) & 3) != 0) ||
+               ((((UintPtr)p2) & 3) != 0)))
+  {
+    Uint32 copyP1[ MAX_TUPLE_SIZE_IN_WORDS ];
+    Uint32 copyP2[ MAX_TUPLE_SIZE_IN_WORDS ];
+    memcpy(copyP1, p1, words << 2);
+    memcpy(copyP2, p2, words << 2);
+
+    return cmpBit(info, copyP1, bytes, copyP2, bytes, full);
+  }
+
+  const Uint32* wp1= (const Uint32*) p1;
+  const Uint32* wp2= (const Uint32*) p2;
+  while (--words)
+  {
+    if (*wp1 < *wp2)
+      return -1;
+    if (*(wp1++) > *(wp2++))
+      return 1;
+  }
+
+  /* For the last word, we mask out any insignificant bytes */
+  const Uint32 sigBytes= bytes & 3; // 0..3; 0 == all bytes significant
+  const Uint32 mask= sigBytes?
+    (1 << (sigBytes *8)) -1 :
+    ~0;
+  const Uint32 lastWord1= *wp1 & mask;
+  const Uint32 lastWord2= *wp2 & mask;
+  
+  if (lastWord1 < lastWord2)
+    return -1;
+  if (lastWord1 > lastWord2)
+    return 1;
+
+  return 0;
 }
 
 
@@ -874,6 +950,85 @@ NdbSqlUtil::likeLongvarbinary(const void
   return likeLongvarchar(&my_charset_bin, p1, n1, p2, n2);
 }
 
+
+// mask Functions
+
+int
+NdbSqlUtil::maskBit(const void* data, unsigned dataLen, const void* mask, unsigned maskLen, bool cmpZero)
+{
+  /* Bitfields are stored in word oriented form, so we must compare them in that
+   * style as well
+   * It is assumed that insignificant bits (but not bytes) in the passed values
+   * are zeroed
+   */
+  const Uint32 bytes = MIN(dataLen, maskLen);
+  Uint32 words = (bytes + 3) >> 2;
+
+  /* Don't expect either value to be length zero */
+  assert(words);
+
+  /* Check ptr alignment */
+  if (unlikely(((((UintPtr)data) & 3) != 0) ||
+               ((((UintPtr)mask) & 3) != 0)))
+  {
+    Uint32 copydata[ MAX_TUPLE_SIZE_IN_WORDS ];
+    Uint32 copymask[ MAX_TUPLE_SIZE_IN_WORDS ];
+    memcpy(copydata, data, words << 2);
+    memcpy(copymask, mask, words << 2);
+
+    return maskBit(data, bytes, mask, bytes, cmpZero);
+  }
+
+  const Uint32* wdata= (const Uint32*) data;
+  const Uint32* wmask= (const Uint32*) mask;
+
+  if (cmpZero)
+  {
+    while (--words)
+    {
+      if ((*(wdata++) & *(wmask++)) != 0)
+        return 1;
+    }
+    
+    /* For the last word, we mask out any insignificant bytes */
+    const Uint32 sigBytes= bytes & 3; // 0..3; 0 == all bytes significant
+    const Uint32 comparisonMask= sigBytes?
+      (1 << (sigBytes *8)) -1 :
+      ~0;
+    const Uint32 lastDataWord= *wdata & comparisonMask;
+    const Uint32 lastMaskWord= *wmask & comparisonMask;
+    
+    if ((lastDataWord & lastMaskWord) != 0)
+      return 1;
+
+    return 0;
+  }
+  else
+  {
+    while (--words)
+    {
+      if ((*(wdata++) & *wmask) != *wmask)
+        return 1;
+      
+      wmask++;
+    }
+
+    /* For the last word, we mask out any insignificant bytes */
+    const Uint32 sigBytes= bytes & 3; // 0..3; 0 == all bytes significant
+    const Uint32 comparisonMask= sigBytes?
+      (1 << (sigBytes *8)) -1 :
+      ~0;
+    const Uint32 lastDataWord= *wdata & comparisonMask;
+    const Uint32 lastMaskWord= *wmask & comparisonMask;
+    
+    if ((lastDataWord & lastMaskWord) != lastMaskWord)
+      return 1;
+
+    return 0;
+  }
+};
+
+
 // check charset
 
 uint

=== modified file 'storage/ndb/src/kernel/blocks/dbtup/DbtupExecQuery.cpp'
--- a/storage/ndb/src/kernel/blocks/dbtup/DbtupExecQuery.cpp	2008-08-26 14:12:34 +0000
+++ b/storage/ndb/src/kernel/blocks/dbtup/DbtupExecQuery.cpp	2008-12-09 15:47:14 +0000
@@ -2491,12 +2491,23 @@ int Dbtup::interpreterNextLab(Signal* si
         const char* s2 = (char*)&TcurrentProgram[TprogramCounter+1];
         // fixed length in 5.0
 	Uint32 attrLen = AttributeDescriptor::getSizeInBytes(TattrDesc1);
+        
+        if (typeId == NDB_TYPE_BIT)
+        {
+          /* Size in bytes for bit fields can be incorrect due to
+           * rounding down
+           */
+          Uint32 bitFieldAttrLen= (AttributeDescriptor::getArraySize(TattrDesc1)
+                                   + 7) / 8;
+          attrLen= bitFieldAttrLen;
+        }
 
 	bool r1_null = ah.isNULL();
 	bool r2_null = argLen == 0;
 	int res1;
-        if (cond != Interpreter::LIKE &&
-            cond != Interpreter::NOT_LIKE) {
+        if (cond <= Interpreter::GE)
+        {
+          /* Inequality - EQ, NE, LT, LE, GT, GE */
           if (r1_null || r2_null) {
             // NULL==NULL and NULL<not-NULL
             res1 = r1_null && r2_null ? 0 : r1_null ? -1 : 1;
@@ -2509,16 +2520,42 @@ int Dbtup::interpreterNextLab(Signal* si
             res1 = (*sqlType.m_cmp)(cs, s1, attrLen, s2, argLen, true);
           }
 	} else {
-          if (r1_null || r2_null) {
-            // NULL like NULL is true (has no practical use)
-            res1 =  r1_null && r2_null ? 0 : -1;
-          } else {
-	    jam();
-	    if (unlikely(sqlType.m_like == 0))
-	    {
-	      return TUPKEY_abort(signal, 40);
-	    }
-            res1 = (*sqlType.m_like)(cs, s1, attrLen, s2, argLen);
+          if ((cond == Interpreter::LIKE) ||
+              (cond == Interpreter::NOT_LIKE))
+          {
+            if (r1_null || r2_null) {
+              // NULL like NULL is true (has no practical use)
+              res1 =  r1_null && r2_null ? 0 : -1;
+            } else {
+              jam();
+              if (unlikely(sqlType.m_like == 0))
+              {
+                return TUPKEY_abort(signal, 40);
+              }
+              res1 = (*sqlType.m_like)(cs, s1, attrLen, s2, argLen);
+            }
+          }
+          else
+          {
+            /* AND_XX_MASK condition */
+            ndbassert(cond <= Interpreter::AND_NE_ZERO);
+            if (unlikely(sqlType.m_mask == 0))
+            {
+              return TUPKEY_abort(signal,40);
+            }
+            /* If either arg is NULL, we say COL AND MASK
+             * NE_ZERO and NE_MASK.
+             */
+            if (r1_null || r2_null) {
+              res1= 1;
+            } else {
+              
+              bool cmpZero= 
+                (cond == Interpreter::AND_EQ_ZERO) ||
+                (cond == Interpreter::AND_NE_ZERO);
+              
+              res1 = (*sqlType.m_mask)(s1, attrLen, s2, argLen, cmpZero);
+            }
           }
         }
 
@@ -2549,6 +2586,18 @@ int Dbtup::interpreterNextLab(Signal* si
         case Interpreter::NOT_LIKE:
           res = (res1 == 1);
           break;
+        case Interpreter::AND_EQ_MASK:
+          res = (res1 == 0);
+          break;
+        case Interpreter::AND_NE_MASK:
+          res = (res1 != 0);
+          break;
+        case Interpreter::AND_EQ_ZERO:
+          res = (res1 == 0);
+          break;
+        case Interpreter::AND_NE_ZERO:
+          res = (res1 != 0);
+          break;
 	  // XXX handle invalid value
         }
 #ifdef TRACE_INTERPRETER

=== modified file 'storage/ndb/src/ndbapi/NdbInterpretedCode.cpp'
--- a/storage/ndb/src/ndbapi/NdbInterpretedCode.cpp	2008-11-11 11:40:42 +0000
+++ b/storage/ndb/src/ndbapi/NdbInterpretedCode.cpp	2008-12-09 15:47:14 +0000
@@ -510,6 +510,7 @@ NdbInterpretedCode::branch_col(Uint32 br
 
   Uint32 sigLen= len;
   Uint32 sendLen= len;
+  Uint32 lastWordMask= ~0;
 
   if (val == NULL)
     sigLen = sendLen = 0;
@@ -517,6 +518,16 @@ NdbInterpretedCode::branch_col(Uint32 br
     if (! col->getStringType())
     {
       /* Fixed size type */
+      if (col->getType() == NDB_TYPE_BIT)
+      {
+        /* We want to zero out insignificant bits in the
+         * last word of a bit type
+         */
+        Uint32 bitLen= col->getLength();
+        Uint32 lastWordBits= bitLen & 0x1F;
+        if (lastWordBits)
+          lastWordMask= (1 << lastWordBits) -1;
+      }
       sigLen= sendLen= col->m_attrSize * col->m_arraySize;
     }
     else
@@ -548,7 +559,7 @@ NdbInterpretedCode::branch_col(Uint32 br
     val = tempData;
   }
 
-  if (add_branch(Interpreter::BranchCol(c, 0, 0, false), Label) != 0)
+  if (add_branch(Interpreter::BranchCol(c, 0, 0), Label) != 0)
     DBUG_RETURN(-1);
 
   if (add1(Interpreter::BranchCol_2(attrId, sendLen)) != 0)
@@ -556,7 +567,8 @@ NdbInterpretedCode::branch_col(Uint32 br
 
   /* Get value byte length rounded up to nearest 32-bit word */
   Uint32 len2 = Interpreter::mod4(sendLen);
-  if(len2 == sendLen){
+  if((len2 == sendLen) && 
+     (lastWordMask == (Uint32)~0)){
     /* Whole number of 32-bit words */
     DBUG_RETURN(addN((Uint32*)val, len2 >> 2));
   } else {
@@ -571,7 +583,7 @@ NdbInterpretedCode::branch_col(Uint32 br
       char* p = (char*)&tmp;
       p[i] = ((char*)val)[len2+i];
     }
-    DBUG_RETURN(add1(tmp));
+    DBUG_RETURN(add1((tmp & lastWordMask)));
   }
 }
 
@@ -581,7 +593,7 @@ NdbInterpretedCode::branch_col_eq(const 
                                   Uint32 attrId,
                                   Uint32 Label)
 {
-  return branch_col(Interpreter::EQ, attrId, val, len, Label);
+  return branch_col(Interpreter::EQ, attrId, val, 0, Label);
 }
 
 int 
@@ -590,7 +602,7 @@ NdbInterpretedCode::branch_col_ne(const 
                                   Uint32 attrId,
                                   Uint32 Label)
 {
-  return branch_col(Interpreter::NE, attrId, val, len, Label);
+  return branch_col(Interpreter::NE, attrId, val, 0, Label);
 }
 
 int 
@@ -599,7 +611,7 @@ NdbInterpretedCode::branch_col_lt(const 
                                   Uint32 attrId,
                                   Uint32 Label)
 {
-  return branch_col(Interpreter::LT, attrId, val, len, Label);
+  return branch_col(Interpreter::LT, attrId, val, 0, Label);
 }
 
 int 
@@ -608,7 +620,7 @@ NdbInterpretedCode::branch_col_le(const 
                                   Uint32 attrId,
                                   Uint32 Label)
 {
-  return branch_col(Interpreter::LE, attrId, val, len, Label);
+  return branch_col(Interpreter::LE, attrId, val, 0, Label);
 }
 
 int 
@@ -617,7 +629,7 @@ NdbInterpretedCode::branch_col_gt(const 
                                   Uint32 attrId,
                                   Uint32 Label)
 {
-  return branch_col(Interpreter::GT, attrId, val, len, Label);
+  return branch_col(Interpreter::GT, attrId, val, 0, Label);
 }
 
 int 
@@ -626,7 +638,7 @@ NdbInterpretedCode::branch_col_ge(const 
                                   Uint32 attrId,
                                   Uint32 Label)
 {
-  return branch_col(Interpreter::GE, attrId, val, len, Label);
+  return branch_col(Interpreter::GE, attrId, val, 0, Label);
 }
 
 int 
@@ -648,6 +660,42 @@ NdbInterpretedCode::branch_col_notlike(c
 }
 
 int
+NdbInterpretedCode::branch_col_and_mask_eq_mask(const void * mask,
+                                                Uint32 len,
+                                                Uint32 attrId,
+                                                Uint32 label)
+{
+  return branch_col(Interpreter::AND_EQ_MASK, attrId, mask, 0, Label);
+}
+
+int
+NdbInterpretedCode::branch_col_and_mask_ne_mask(const void * mask,
+                                                Uint32 len,
+                                                Uint32 attrId,
+                                                Uint32 label)
+{
+  return branch_col(Interpreter::AND_NE_MASK, attrId, mask, 0, Label);
+}
+
+int
+NdbInterpretedCode::branch_col_and_mask_eq_zero(const void * mask,
+                                                Uint32 len,
+                                                Uint32 attrId,
+                                                Uint32 label)
+{
+  return branch_col(Interpreter::AND_EQ_ZERO, attrId, mask, 0, Label);
+}
+
+int
+NdbInterpretedCode::branch_col_and_mask_ne_zero(const void * mask,
+                                                Uint32 len,
+                                                Uint32 attrId,
+                                                Uint32 label)
+{
+  return branch_col(Interpreter::AND_NE_ZERO, attrId, mask, 0, Label);
+}
+
+int
 NdbInterpretedCode::interpret_exit_ok()
 {
   return add1(Interpreter::EXIT_OK);

=== modified file 'storage/ndb/src/ndbapi/NdbOperationInt.cpp'
--- a/storage/ndb/src/ndbapi/NdbOperationInt.cpp	2008-11-11 11:47:00 +0000
+++ b/storage/ndb/src/ndbapi/NdbOperationInt.cpp	2008-12-09 15:47:14 +0000
@@ -1067,7 +1067,7 @@ NdbOperation::insertCall(Uint32 aCall)
 int
 NdbOperation::branch_col(Uint32 type, 
 			 Uint32 ColId, const void * val, Uint32 len, 
-			 bool nopad, Uint32 Label){
+			 Uint32 Label){
 
   DBUG_ENTER("NdbOperation::branch_col");
   DBUG_PRINT("enter", ("type: %u  col:%u  val: 0x%lx  len: %u  label: %u",
@@ -1089,6 +1089,7 @@ NdbOperation::branch_col(Uint32 type, 
 
   Uint32 sigLen= len;
   Uint32 sendLen= len;
+  Uint32 lastWordMask= ~0;
 
   if (val == NULL)
     sigLen= sendLen = 0;
@@ -1096,6 +1097,16 @@ NdbOperation::branch_col(Uint32 type, 
     if (! col->getStringType())
     {
       /* Fixed size type */
+      if (col->getType() == NDB_TYPE_BIT)
+      {
+        /* We want to zero out insignificant bits in the
+         * last word of a bit type
+         */
+        Uint32 bitLen= col->getLength();
+        Uint32 lastWordBits= bitLen & 0x1F;
+        if (lastWordBits)
+          lastWordMask= (1 << lastWordBits) -1;
+      }
       sigLen= sendLen= col->m_attrSize * col->m_arraySize;
     }
     else
@@ -1130,7 +1141,7 @@ NdbOperation::branch_col(Uint32 type, 
     val = tempData;
   }
 
-  if (insertATTRINFO(Interpreter::BranchCol(c, 0, 0, false)) == -1)
+  if (insertATTRINFO(Interpreter::BranchCol(c, 0, 0)) == -1)
     DBUG_RETURN(-1);
   
   if (insertBranch(Label) == -1)
@@ -1140,7 +1151,8 @@ NdbOperation::branch_col(Uint32 type, 
     DBUG_RETURN(-1);
   
   Uint32 len2 = Interpreter::mod4(sendLen);
-  if(len2 == sendLen){
+  if((len2 == sendLen) &&
+     (lastWordMask == (Uint32)~0)){
     insertATTRINFOloop((Uint32*)val, len2 >> 2);
   } else {
     len2 -= 4;
@@ -1150,7 +1162,7 @@ NdbOperation::branch_col(Uint32 type, 
       char* p = (char*)&tmp;
       p[i] = ((char*)val)[len2+i];
     }
-    insertATTRINFO(tmp);
+    insertATTRINFO(tmp & lastWordMask);
   }
   
   theErrorLine++;
@@ -1162,7 +1174,7 @@ NdbOperation::branch_col_eq(Uint32 ColId
 			    bool nopad, Uint32 Label){
   INT_DEBUG(("branch_col_eq %u %.*s(%u,%d) -> %u", ColId, len, (char*) val, len,
              nopad, Label));
-  return branch_col(Interpreter::EQ, ColId, val, len, nopad, Label);
+  return branch_col(Interpreter::EQ, ColId, val, len, Label);
 }
 
 int
@@ -1170,28 +1182,28 @@ NdbOperation::branch_col_ne(Uint32 ColId
 			    bool nopad, Uint32 Label){
   INT_DEBUG(("branch_col_ne %u %.*s(%u,%d) -> %u", ColId, len, (char*) val, len,
              nopad, Label));
-  return branch_col(Interpreter::NE, ColId, val, len, nopad, Label);
+  return branch_col(Interpreter::NE, ColId, val, len, Label);
 }
 int
 NdbOperation::branch_col_lt(Uint32 ColId, const void * val, Uint32 len, 
 			    bool nopad, Uint32 Label){
   INT_DEBUG(("branch_col_lt %u %.*s(%u,%d) -> %u", ColId, len, (char*) val, len,
              nopad, Label));
-  return branch_col(Interpreter::LT, ColId, val, len, nopad, Label);
+  return branch_col(Interpreter::LT, ColId, val, len, Label);
 }
 int
 NdbOperation::branch_col_le(Uint32 ColId, const void * val, Uint32 len, 
 			    bool nopad, Uint32 Label){
   INT_DEBUG(("branch_col_le %u %.*s(%u,%d) -> %u", ColId, len, (char*) val, len,
              nopad, Label));
-  return branch_col(Interpreter::LE, ColId, val, len, nopad, Label);
+  return branch_col(Interpreter::LE, ColId, val, len, Label);
 }
 int
 NdbOperation::branch_col_gt(Uint32 ColId, const void * val, Uint32 len, 
 			    bool nopad, Uint32 Label){
   INT_DEBUG(("branch_col_gt %u %.*s(%u,%d) -> %u", ColId, len, (char*) val, len,
              nopad, Label));
-  return branch_col(Interpreter::GT, ColId, val, len, nopad, Label);
+  return branch_col(Interpreter::GT, ColId, val, len, Label);
 }
 
 int
@@ -1199,7 +1211,7 @@ NdbOperation::branch_col_ge(Uint32 ColId
 			    bool nopad, Uint32 Label){
   INT_DEBUG(("branch_col_ge %u %.*s(%u,%d) -> %u", ColId, len, (char*) val, len,
              nopad, Label));
-  return branch_col(Interpreter::GE, ColId, val, len, nopad, Label);
+  return branch_col(Interpreter::GE, ColId, val, len, Label);
 }
 
 int
@@ -1207,7 +1219,7 @@ NdbOperation::branch_col_like(Uint32 Col
 			      bool nopad, Uint32 Label){
   INT_DEBUG(("branch_col_like %u %.*s(%u,%d) -> %u", ColId, len, (char*) val, len,
              nopad, Label));
-  return branch_col(Interpreter::LIKE, ColId, val, len, nopad, Label);
+  return branch_col(Interpreter::LIKE, ColId, val, len, Label);
 }
 
 int
@@ -1215,7 +1227,39 @@ NdbOperation::branch_col_notlike(Uint32 
 				 bool nopad, Uint32 Label){
   INT_DEBUG(("branch_col_notlike %u %.*s(%u,%d) -> %u", ColId, len, (char*) val, len,
              nopad, Label));
-  return branch_col(Interpreter::NOT_LIKE, ColId, val, len, nopad, Label);
+  return branch_col(Interpreter::NOT_LIKE, ColId, val, len, Label);
+}
+
+int
+NdbOperation::branch_col_and_mask_eq_mask(Uint32 ColId, const void * mask, 
+                                          Uint32 len, bool nopad, Uint32 Label){
+  INT_DEBUG(("branch_col_and_mask_eq_mask %u %.*s(%u,%d) -> %u", ColId, len, (char*) mask, len,
+             nopad, Label));
+  return branch_col(Interpreter::AND_EQ_MASK, ColId, mask, len, Label);
+}
+
+int
+NdbOperation::branch_col_and_mask_ne_mask(Uint32 ColId, const void * mask, 
+                                          Uint32 len, bool nopad, Uint32 Label){
+  INT_DEBUG(("branch_col_and_mask_ne_mask %u %.*s(%u,%d) -> %u", ColId, len, (char*) mask, len,
+             nopad, Label));
+  return branch_col(Interpreter::AND_NE_MASK, ColId, mask, len, Label);
+}
+
+int
+NdbOperation::branch_col_and_mask_eq_zero(Uint32 ColId, const void * mask, 
+                                          Uint32 len, bool nopad, Uint32 Label){
+  INT_DEBUG(("branch_col_and_mask_eq_zero %u %.*s(%u,%d) -> %u", ColId, len, (char*) mask, len,
+             nopad, Label));
+  return branch_col(Interpreter::AND_EQ_ZERO, ColId, mask, len, Label);
+}
+
+int
+NdbOperation::branch_col_and_mask_ne_zero(Uint32 ColId, const void * mask, 
+                                          Uint32 len, bool nopad, Uint32 Label){
+  INT_DEBUG(("branch_col_and_mask_ne_zero %u %.*s(%u,%d) -> %u", ColId, len, (char*) mask, len,
+             nopad, Label));
+  return branch_col(Interpreter::AND_NE_ZERO, ColId, mask, len, Label);
 }
 
 int

=== modified file 'storage/ndb/src/ndbapi/NdbScanFilter.cpp'
--- a/storage/ndb/src/ndbapi/NdbScanFilter.cpp	2008-08-06 15:33:19 +0000
+++ b/storage/ndb/src/ndbapi/NdbScanFilter.cpp	2008-12-09 15:47:14 +0000
@@ -586,6 +586,42 @@ static const tab3 table3[] = {
        &NdbInterpretedCode::branch_col_notlike, 
        &NdbInterpretedCode::branch_col_like, 
        &NdbInterpretedCode::branch_col_notlike } }
+  
+  /**
+   * AND EQ MASK
+   */
+  ,{ { 0,
+       &NdbInterpretedCode::branch_col_and_mask_ne_mask,
+       &NdbInterpretedCode::branch_col_and_mask_eq_mask,
+       &NdbInterpretedCode::branch_col_and_mask_ne_mask,
+       &NdbInterpretedCode::branch_col_and_mask_eq_mask } }
+  
+  /**
+   * AND NE MASK
+   */
+  ,{ { 0,
+       &NdbInterpretedCode::branch_col_and_mask_eq_mask,
+       &NdbInterpretedCode::branch_col_and_mask_ne_mask,
+       &NdbInterpretedCode::branch_col_and_mask_eq_mask,
+       &NdbInterpretedCode::branch_col_and_mask_ne_mask } } 
+
+  /**
+   * AND EQ ZERO
+   */
+  ,{ { 0,
+       &NdbInterpretedCode::branch_col_and_mask_ne_zero,
+       &NdbInterpretedCode::branch_col_and_mask_eq_zero,
+       &NdbInterpretedCode::branch_col_and_mask_ne_zero,
+       &NdbInterpretedCode::branch_col_and_mask_eq_zero } }
+  
+  /**
+   * AND NE ZERO
+   */
+  ,{ { 0,
+       &NdbInterpretedCode::branch_col_and_mask_eq_zero,
+       &NdbInterpretedCode::branch_col_and_mask_ne_zero,
+       &NdbInterpretedCode::branch_col_and_mask_eq_zero,
+       &NdbInterpretedCode::branch_col_and_mask_ne_zero } } 
 };
 
 const int tab3_sz = sizeof(table3)/sizeof(table3[0]);
@@ -664,6 +700,14 @@ NdbScanFilter::cmp(BinaryCondition cond,
     return m_impl.cond_col_const(Interpreter::LIKE, ColId, val, len);
   case COND_NOT_LIKE:
     return m_impl.cond_col_const(Interpreter::NOT_LIKE, ColId, val, len);
+  case COND_AND_EQ_MASK:
+    return m_impl.cond_col_const(Interpreter::AND_EQ_MASK, ColId, val, len);
+  case COND_AND_NE_MASK:
+    return m_impl.cond_col_const(Interpreter::AND_NE_MASK, ColId, val, len);
+  case COND_AND_EQ_ZERO:
+    return m_impl.cond_col_const(Interpreter::AND_EQ_ZERO, ColId, val, len);
+  case COND_AND_NE_ZERO:
+    return m_impl.cond_col_const(Interpreter::AND_NE_ZERO, ColId, val, len);
   }
   return -1;
 }

=== modified file 'storage/ndb/test/ndbapi/testScanFilter.cpp'
--- a/storage/ndb/test/ndbapi/testScanFilter.cpp	2008-11-10 11:41:44 +0000
+++ b/storage/ndb/test/ndbapi/testScanFilter.cpp	2008-12-09 15:47:14 +0000
@@ -88,6 +88,49 @@ static
 const
 NDBT_Table MYTAB1(TABLE_NAME, sizeof(MYTAB1Attribs)/sizeof(NDBT_Attribute), MYTAB1Attribs);
 
+static
+const
+NDBT_Attribute MYTAB2Attribs[] = {
+  NDBT_Attribute("id",        NdbDictionary::Column::Unsigned, 1, true),
+  //                                                         _pk    _nullable
+  NDBT_Attribute("1bitnn",    NdbDictionary::Column::Bit, 1, false, false),
+  NDBT_Attribute("1bitnu",    NdbDictionary::Column::Bit, 1, false, true),
+  NDBT_Attribute("2bitnn",    NdbDictionary::Column::Bit, 2, false, false),
+  NDBT_Attribute("2bitnu",    NdbDictionary::Column::Bit, 2, false, true),
+  NDBT_Attribute("7bitnn",    NdbDictionary::Column::Bit, 7, false, false),
+  NDBT_Attribute("7bitnu",    NdbDictionary::Column::Bit, 7, false, true),
+  NDBT_Attribute("8bitnn",    NdbDictionary::Column::Bit, 8, false, false),
+  NDBT_Attribute("8bitnu",    NdbDictionary::Column::Bit, 8, false, true),
+  NDBT_Attribute("15bitnn",   NdbDictionary::Column::Bit, 15, false, false),
+  NDBT_Attribute("15bitnu",   NdbDictionary::Column::Bit, 15, false, true),
+  NDBT_Attribute("31bitnn",   NdbDictionary::Column::Bit, 31, false, false),
+  NDBT_Attribute("31bitnu",   NdbDictionary::Column::Bit, 31, false, true),
+  NDBT_Attribute("32bitnn",   NdbDictionary::Column::Bit, 32, false, false),
+  NDBT_Attribute("32bitnu",   NdbDictionary::Column::Bit, 32, false, true),
+  NDBT_Attribute("33bitnn",   NdbDictionary::Column::Bit, 33, false, false),
+  NDBT_Attribute("33bitnu",   NdbDictionary::Column::Bit, 33, false, true),
+  NDBT_Attribute("63bitnn",   NdbDictionary::Column::Bit, 63, false, false),
+  NDBT_Attribute("63bitnu",   NdbDictionary::Column::Bit, 63, false, true),
+  NDBT_Attribute("64bitnn",   NdbDictionary::Column::Bit, 64, false, false),
+  NDBT_Attribute("64bitnu",   NdbDictionary::Column::Bit, 64, false, true),
+  NDBT_Attribute("65bitnn",   NdbDictionary::Column::Bit, 65, false, false),
+  NDBT_Attribute("65bitnu",   NdbDictionary::Column::Bit, 65, false, true),
+  NDBT_Attribute("127bitnn",  NdbDictionary::Column::Bit, 127, false, false),
+  NDBT_Attribute("127bitnu",  NdbDictionary::Column::Bit, 127, false, true),
+  NDBT_Attribute("513bitnn",   NdbDictionary::Column::Bit, 513, false, false),
+  NDBT_Attribute("513bitnu",   NdbDictionary::Column::Bit, 513, false, true)
+};
+
+static const char* TABLE2_NAME= "MyTab2";
+
+static
+const
+NDBT_Table MYTAB2(TABLE2_NAME, sizeof(MYTAB2Attribs)/sizeof(NDBT_Attribute), MYTAB2Attribs);
+
+static const int NUM_COLS= sizeof(MYTAB2Attribs)/sizeof(NDBT_Attribute);
+static const int MAX_BIT_WIDTH= 513;
+/* One extra row for all bits == 0 */
+static const int TOTAL_ROWS= MAX_BIT_WIDTH + 1;
 
 int createTable(Ndb* pNdb, const NdbDictionary::Table* tab, bool _temp, 
 			 bool existsOk, NDBT_CreateTableHook f)
@@ -1012,6 +1055,588 @@ int runScanFilterConstructorFail(NDBT_Co
   return NDBT_OK;
 }
 
+bool getBit(const Uint32* bitMap,
+            int bitNum)
+{
+  return ((bitMap[ bitNum >> 5 ] & (1 << (bitNum & 31))) != 0);
+}
+
+void setBit(Uint32* bitMap,
+            int bitNum)
+{
+  bitMap[ bitNum >> 5 ] |= (1 << (bitNum & 31));
+}
+
+enum TestConditions {
+  COND_LE = 0,
+  COND_LT = 1,
+  COND_GE = 2,
+  COND_GT = 3,
+  COND_EQ = 4,
+  COND_NE = 5,
+  COND_NULL = 6,
+  COND_NOTNULL = 7,
+  COND_AND_EQ_MASK = 8,
+  COND_AND_NE_MASK = 9,
+  COND_AND_EQ_ZERO = 10,
+  COND_AND_NE_ZERO = 11
+};
+
+int getExpectedBitsSet(int rowId,
+                       int colBitWidth)
+{
+  /* returns value from -1 to colBitWidth -1
+   * -1 == no bits set
+   * 0 == bit 0 set
+   * 1 == bit 0 + bit 1 set
+   * ...
+   */
+  return (rowId % (colBitWidth + 1)) -1;
+}
+
+bool isNullValue(int rowId,
+                 int colBitWidth)
+{
+  /* Occasionally we'll have a Null column */
+  return (((rowId + colBitWidth) % 13) == 0);
+}
+
+void getBitfieldVariants(int bitNum, int& offset, bool& invert)
+{
+  offset= 0;
+  invert= false;
+  if ((bitNum % 5) == 3)
+  {
+    /* Invert the mask */
+    invert= true;
+  }
+  if ((bitNum % 7) == 6)
+  {
+    /* Shift the mask */
+    offset= (bitNum / 2);
+  }
+};
+  
+
+bool isRowExpected(int rowId, 
+                   TestConditions cond,
+                   int colBitWidth,
+                   int bitsSetInScanFilter,
+                   bool isNullable,
+                   const Uint32* maskBuff)
+{
+  if (isNullable && isNullValue(rowId, colBitWidth))
+  {
+    switch (cond) {
+    case COND_LE:
+      return true; // null < any value
+    case COND_LT:
+      return true; // null < any value
+    case COND_GE:
+      return false; // null < any value
+    case COND_GT:
+      return false; // null < any value
+    case COND_EQ:
+      return false; // null != any value 
+    case COND_NE:
+      return true; // null != any value
+    case COND_NULL:
+      return true; // null is null 
+    case COND_NOTNULL:
+      return false;
+    case COND_AND_EQ_MASK:
+      return false; // NULL AND MASK != MASK
+    case COND_AND_NE_MASK:
+      return true; // NULL AND MASK != MASK
+    case COND_AND_EQ_ZERO:
+      return false; // NULL AND MASK != 0
+    case COND_AND_NE_ZERO:
+      return true; // NULL AND MASK != 0
+    default:
+      printf("isRowExpected given bad condition : %u\n",
+             cond);
+      return false;
+    }    
+  }
+  else
+  {
+    /* Not a null value */
+    int expectedBitsSet= getExpectedBitsSet(rowId, colBitWidth);
+    
+    int offset= 0;
+    bool invert= false;
+
+    getBitfieldVariants(bitsSetInScanFilter + 1, offset, invert);
+
+    switch (cond) {
+    case COND_LE:
+      return expectedBitsSet <= bitsSetInScanFilter;
+    case COND_LT:
+      return expectedBitsSet <  bitsSetInScanFilter;
+    case COND_GE:
+      return expectedBitsSet >= bitsSetInScanFilter;
+    case COND_GT:
+      return expectedBitsSet >  bitsSetInScanFilter;
+    case COND_EQ:
+      return expectedBitsSet == bitsSetInScanFilter;
+    case COND_NE:
+      return expectedBitsSet != bitsSetInScanFilter;
+    case COND_NULL:
+      return false;
+    case COND_NOTNULL:
+      return true;
+    case COND_AND_EQ_MASK:
+    case COND_AND_NE_MASK:
+    {
+      bool result= true;
+      /* Compare data AND mask to the mask buff */
+      for (int idx=0; idx < colBitWidth; idx++)
+      {
+        bool bitVal= (expectedBitsSet >= 0)?
+          (expectedBitsSet >= idx): false;
+        bool maskVal= getBit(maskBuff, idx);
+        if ((bitVal & maskVal) != maskVal)
+        {
+          /* Difference to mask, know result */
+          result= false;
+          break;
+        }
+      }
+
+      /* Invert result for NE condition */
+      return (result ^ (cond == COND_AND_NE_MASK));
+    }
+    case COND_AND_EQ_ZERO:
+    case COND_AND_NE_ZERO:
+    {
+      bool result= true;
+      /* Compare data AND mask to zero */
+      for (int idx=0; idx < colBitWidth; idx++)
+      {
+        bool bitVal= (expectedBitsSet >= 0)?
+          (expectedBitsSet >= idx): false;
+        bool maskVal= getBit(maskBuff, idx);
+        if ((bitVal & maskVal) != 0)
+        {
+          /* Difference to 0, know result */
+          result= false;
+          break;
+        }
+      }
+      /* Invert result for NE condition */
+      return (result ^ (cond == COND_AND_NE_ZERO));
+    }
+    default:
+      printf("isRowExpected given bad condition : %u\n",
+             cond);
+      return false;
+    }
+  }
+}
+
+int insertBitRows(Ndb* pNdb)
+{
+  const NdbDictionary::Dictionary* myDict= pNdb->getDictionary();
+  const NdbDictionary::Table *myTable= myDict->getTable(TABLE2_NAME);
+  if(myTable == NULL) 
+    APIERROR(myDict->getNdbError());
+
+  for (int i=0; i< TOTAL_ROWS; i++)
+  {
+    NdbTransaction* myTrans = pNdb->startTransaction();
+    if (myTrans == NULL)
+      APIERROR(pNdb->getNdbError());
+
+    NdbOperation* insertOp= myTrans->getNdbOperation(myTable);
+    if (insertOp == NULL)
+      APIERROR(pNdb->getNdbError());
+    
+    const int buffSize= (MAX_BIT_WIDTH + 31) / 32;
+    Uint32 buff[ buffSize ];
+
+    if (insertOp->insertTuple() != 0)
+      APIERROR(insertOp->getNdbError());
+
+    if (insertOp->equal((Uint32)0, i) != 0) // Set id column
+      APIERROR(insertOp->getNdbError());
+
+    for (int col=1; col < myTable->getNoOfColumns(); col++)
+    {
+      const NdbDictionary::Column* c= myTable->getColumn(col);
+      int colBitWidth= c->getLength();
+      bool isNullable= c->getNullable();
+
+      if (isNullable &&
+          isNullValue(i, colBitWidth))
+      {
+        /* Set column value to NULL */
+        if (insertOp->setValue(col, (char*) NULL) != 0)
+          APIERROR(insertOp->getNdbError());
+      }
+      else
+      {
+        /* Set lowest bits in this column */
+        memset(buff, 0, (4 * buffSize));
+        
+        int bitsToSet= getExpectedBitsSet(i, colBitWidth);
+        
+        if (bitsToSet >= 0)
+        {
+          for (int idx=0; idx <= bitsToSet; idx++)
+            setBit(buff, idx);
+        }
+        
+        if (insertOp->setValue(col, (char *)buff) != 0)
+          APIERROR(insertOp->getNdbError());
+      }
+    }
+
+    if (myTrans->execute(NdbTransaction::Commit) != 0)
+    {
+      APIERROR(myTrans->getNdbError());
+    }
+    myTrans->close();
+  }
+
+  printf("Inserted %u rows\n", TOTAL_ROWS);
+
+  return NDBT_OK;
+}
+
+int verifyBitData(Ndb* pNdb)
+{
+  const NdbDictionary::Dictionary* myDict= pNdb->getDictionary();
+  const NdbDictionary::Table *myTable= myDict->getTable(TABLE2_NAME);
+  if(myTable == NULL) 
+    APIERROR(myDict->getNdbError());
+
+  NdbTransaction* myTrans= pNdb->startTransaction();
+  if (myTrans == NULL)
+    APIERROR(pNdb->getNdbError());
+  
+  NdbScanOperation* scanOp= myTrans->getNdbScanOperation(myTable);
+  if (scanOp == NULL)
+    APIERROR(pNdb->getNdbError());
+  
+  if (scanOp->readTuples() != 0)
+    APIERROR(scanOp->getNdbError());
+  
+  NdbRecAttr* results[ NUM_COLS ];
+  
+  for (int col=0; col< NUM_COLS; col++)
+  {
+    if ((results[col]= scanOp->getValue(col)) == NULL)
+      APIERROR(scanOp->getNdbError());
+  }
+  
+  if (myTrans->execute(NdbTransaction::NoCommit) != 0)
+    APIERROR(myTrans->getNdbError());
+  
+  for (int row=0; row < TOTAL_ROWS; row++)
+  {
+    if (scanOp->nextResult() != 0)
+      APIERROR(scanOp->getNdbError());
+    
+    int rowId= results[0]->int32_value();
+    
+    for (int col=1; col < NUM_COLS; col++)
+    {
+      const NdbDictionary::Column* c= myTable->getColumn(col);
+      bool isNullable= c->getNullable();
+      int colBitWidth= c->getLength();
+
+      if (isNullable &&
+          isNullValue(rowId, colBitWidth))
+      {
+        if (!results[ col ] ->isNULL())
+        {
+          printf("Mismatch at result %d row %d, column %d, expected NULL\n",
+                 row, rowId, col);
+          myTrans->close();
+          return NDBT_FAILED;
+        }
+      }
+      else
+      {
+        /* Non null value, check it */
+        int expectedSetBits= getExpectedBitsSet(rowId, colBitWidth);
+        
+        const Uint32* val= (const Uint32 *) results[ col ]->aRef();
+        
+        for (int bitNum=0; bitNum < colBitWidth; bitNum++)
+        {
+          bool expectClear= (bitNum > expectedSetBits);
+          bool isClear= ! getBit(val, bitNum);
+          if (expectClear != isClear)
+          {
+            printf("Mismatch at result %d row %d, column %d, bit %d"
+                   " expected %d \n",
+                   row, rowId, col, bitNum, (expectClear)?0:1);
+            myTrans->close();
+            return NDBT_FAILED;
+          }
+        }
+      }
+    }
+  }
+  
+  if (scanOp->nextResult() != 1)
+  {
+    printf("Too many rows returned\n");
+    return NDBT_FAILED;
+  }
+  
+  if (myTrans->execute(NdbTransaction::Commit) != 0)
+    APIERROR(myTrans->getNdbError());
+  
+  myTrans->close();
+
+  printf("Verified data for %u rows\n",
+         TOTAL_ROWS);
+  
+  return NDBT_OK;
+}
+
+int verifyBitScanFilter(Ndb* pNdb)
+{
+  const NdbDictionary::Dictionary* myDict= pNdb->getDictionary();
+  const NdbDictionary::Table *myTable= myDict->getTable(TABLE2_NAME);
+  if(myTable == NULL) 
+    APIERROR(myDict->getNdbError());
+  
+  /* Perform scan with scanfilter for :
+   *   - each column in the table
+   *   - each supported comparison type
+   *   - each potentially set bit in the column
+   */
+  int scanCount= 0;
+  for (int col=1; col < NUM_COLS; col++)
+  {
+    const NdbDictionary::Column* c= myTable->getColumn(col);
+    const int bitWidth= c->getLength();
+    printf("Testing %s column %u (width=%u bits) with %u scan filter variants\n",
+           (c->getNullable())?"Nullable":"Non-null",
+           col, bitWidth, ((bitWidth+1) * (COND_AND_NE_ZERO + 1)));
+    for (int comp=0; comp <= COND_AND_NE_ZERO; comp++)
+    {
+      for (int bitNum=0; bitNum <= bitWidth; bitNum++)
+      {
+        /* Define scan */
+        NdbTransaction* myTrans= pNdb->startTransaction();
+        if (myTrans == NULL)
+          APIERROR(pNdb->getNdbError());
+        
+        NdbScanOperation* scanOp= myTrans->getNdbScanOperation(myTable);
+        if (scanOp == NULL)
+          APIERROR(pNdb->getNdbError());
+        
+        if (scanOp->readTuples() != 0)
+          APIERROR(scanOp->getNdbError());
+        
+        NdbRecAttr* ra;
+        if ((ra= scanOp->getValue((Uint32)0)) == NULL)
+          APIERROR(scanOp->getNdbError());
+        
+        /* Define ScanFilter */
+        const int buffSize= (MAX_BIT_WIDTH + 31)/32;
+        Uint32 buff[ buffSize ];
+        memset(buff, 0, (4 * buffSize));
+        
+        /* Define constant value, with some variants for bitwise operators */
+        bool invert= false;
+        int offset= 0;
+
+        switch (comp) {
+        case COND_AND_EQ_MASK:
+        case COND_AND_NE_MASK:
+        case COND_AND_EQ_ZERO:
+        case COND_AND_NE_ZERO:
+          getBitfieldVariants(bitNum, offset, invert);
+        default:
+          ;
+        }        
+
+        /* Set lower bitNum -1 bits
+         * If bitNum == 0, set none
+         */
+        int bitsSetInFilter= bitNum-1;
+
+        if ((bitsSetInFilter >= 0) ||
+            invert)
+        {
+          for (int idx=0; idx <= (32 * buffSize); idx++)
+          {
+            if ((idx >= offset) && 
+                (idx <= (offset + bitsSetInFilter)))
+            {
+              if (!invert)
+                setBit(buff, idx);
+            }
+            else
+            {
+              if (invert)
+                setBit(buff, idx);
+            }
+          }
+        }
+
+
+
+        NdbScanFilter sf(scanOp);
+        
+        if (sf.begin(NdbScanFilter::AND) != 0)
+          APIERROR(sf.getNdbError());
+        
+        if ((comp != COND_NULL) &&
+            (comp != COND_NOTNULL))
+        {
+          /* Operator with a constant */
+          if (sf.cmp((NdbScanFilter::BinaryCondition)comp, 
+                     col, 
+                     (char*)buff) != 0)
+            APIERROR(sf.getNdbError());
+        }
+        else
+        {
+          switch (comp) {
+          case COND_NULL:
+            if (sf.isnull(col) != 0)
+              APIERROR(sf.getNdbError());
+            break;
+          case COND_NOTNULL:
+            if (sf.isnotnull(col) != 0)
+              APIERROR(sf.getNdbError());
+            break;
+          default:
+            printf("Condition %u not supported\n", comp);
+            return NDBT_FAILED;
+          }
+        }
+        
+        if (sf.end() != 0)
+          APIERROR(sf.getNdbError());
+
+        /* Calculate expected number of rows in result */
+        const NdbDictionary::Column* c= myTable->getColumn(col);
+        int colBitWidth= c->getLength();
+        bool isNullable= c->getNullable();
+
+        // printf("Determining expected rows\n");
+        int expectedResultCount= 0;
+        for (int i=0; i< TOTAL_ROWS; i++)
+        {
+          if (isRowExpected(i, 
+                            (TestConditions)comp, 
+                            colBitWidth, 
+                            bitsSetInFilter,
+                            isNullable,
+                            buff))
+            expectedResultCount++;
+        }
+        
+        
+        /* Execute */
+        if (myTrans->execute(NdbTransaction::NoCommit) != 0)
+          APIERROR(myTrans->getNdbError());
+        
+
+        /* Process results to ensure we got the expected rows back */
+        int rc= 0;
+        int count= 0;
+        int matchCount= 0;
+        // printf("Checking rows returned\n");
+        while ((rc= scanOp->nextResult()) == 0)
+        {
+          int rowId= ra->int32_value();
+          count++;
+          /*
+           * Check that this row was expected
+           */
+          if (isRowExpected(rowId, 
+                            (TestConditions)comp, 
+                            colBitWidth, 
+                            bitsSetInFilter,
+                            isNullable,
+                            buff))
+          {
+            matchCount++;
+          }
+          else
+          {
+            printf("Col=%u Comp=%u BitNum=%u row=%u : "
+                   "Got row %u back which I did not expect\n",
+                   col, comp, bitNum, count, rowId);
+            myTrans->close();
+            return NDBT_FAILED;
+          }
+        }
+
+        if (rc != 1)
+        {
+          printf("Col=%u Comp=%u BitNum=%u :"
+                 "nextResult failure %d\n", col, comp, bitNum, rc);
+          APIERROR(myTrans->getNdbError());
+        }
+
+        //printf("Column %u, Comp=%u bitNum=%u num expectedResults=%u matchCount=%u\n",
+        //       col, comp, bitNum, expectedResultCount, matchCount);
+        
+        /* Check that we didn't miss any expected rows */
+        if (matchCount != expectedResultCount)
+        {
+          printf("Col=%u Comp=%u BitNum=%u :"
+                 "Mismatch between expected(%u) and received(%u) result counts\n",
+                 col, comp, bitNum, expectedResultCount, matchCount);
+          myTrans->close();
+          return NDBT_FAILED;
+        }
+        
+        if (myTrans->execute(NdbTransaction::Commit) != 0)
+          APIERROR(myTrans->getNdbError());
+        
+        myTrans->close();
+
+        scanCount++;
+        
+      } // for bitNum
+    } // for comparison
+  } // for column
+  
+  printf("Verified %u scans with bitfield ScanFilter conditions\n",
+         scanCount);
+
+  return NDBT_OK;
+}
+
+
+int runTestScanFilterBit(NDBT_Context* ctx, NDBT_Step* step)
+{
+  /* Create table */
+  Ndb *pNdb = GETNDB(step);
+  pNdb->getDictionary()->dropTable(MYTAB2.getName());
+  int ret = createTable(pNdb, &MYTAB2, false, true, 0); 
+  if(ret)
+    return ret;
+
+  /* Populate with data */
+  if (insertBitRows(pNdb) != NDBT_OK)
+    return NDBT_FAILED;
+  
+  /* Initial data check via scan */
+  if (verifyBitData(pNdb) != NDBT_OK)
+    return NDBT_FAILED;
+
+  /* Verify Bit ScanFilter correctness */
+  if (verifyBitScanFilter(pNdb) != NDBT_OK)
+    return NDBT_FAILED;
+
+  /* Drop table */
+  pNdb->getDictionary()->dropTable(MYTAB2.getName());
+  
+  return NDBT_OK;
+}
+
+
 NDBT_TESTSUITE(testScanFilter);
 TESTCASE(TEST_NAME, 
 	 "Scan table TABLE_NAME for the records which accord with \
@@ -1025,6 +1650,12 @@ TESTCASE(TEST_NAME, 
   FINALIZER(runDropTables);
 }
 
+TESTCASE("TestScanFilterBit",
+         "Test ScanFilter with bitfield columns")
+{
+  INITIALIZER(runTestScanFilterBit);
+}
+
 NDBT_TESTSUITE_END(testScanFilter);
 
 
@@ -1038,5 +1669,5 @@ int main(int argc, const char** argv)
     return NDBT_ProgramExit(NDBT_FAILED);
   }
   
-  return testScanFilter.executeOneCtx(con, &MYTAB1, TEST_NAME);
+  return testScanFilter.execute(argc, argv);
 }

=== modified file 'storage/ndb/test/run-test/daily-basic-tests.txt'
--- a/storage/ndb/test/run-test/daily-basic-tests.txt	2008-12-08 13:39:11 +0000
+++ b/storage/ndb/test/run-test/daily-basic-tests.txt	2008-12-09 15:47:14 +0000
@@ -1179,7 +1179,7 @@ args -n bug37672 T1
 
 #EOF 2008-07-04
 max-time: 500
-cmd: testScanFilter
+cmd: testScanFilter T1
 args: 
 
 #EOF 2008-07-09

Thread
bzr push into mysql-5.1 branch (frazer:2785 to 2786) Bug#40535Frazer Clement9 Dec