2786 Frazer Clement 2008-12-09
Bug# 40535 : NDBAPI : ScanFilter does not support BIT columns
This bug is fixed to provide ScanFilter support for EQ, NE LT, LE, GT, GE
on bit columns.
Additionally, 4 new comparison operations are defined for bit types :
- COL_AND_MASK_EQ_MASK
- COL_AND_MASK_NE_MASK
- COL_AND_MASK_EQ_ZERO
- COL_AND_MASK_NE_ZERO
These can be used to test bits within a bitfield as part of a ScanFiler.
A test program for all of the conditions and bit columns is added
modified:
storage/ndb/include/kernel/Interpreter.hpp
storage/ndb/include/ndbapi/NdbInterpretedCode.hpp
storage/ndb/include/ndbapi/NdbOperation.hpp
storage/ndb/include/ndbapi/NdbScanFilter.hpp
storage/ndb/include/util/NdbSqlUtil.hpp
storage/ndb/src/common/util/NdbSqlUtil.cpp
storage/ndb/src/kernel/blocks/dbtup/DbtupExecQuery.cpp
storage/ndb/src/ndbapi/NdbInterpretedCode.cpp
storage/ndb/src/ndbapi/NdbOperationInt.cpp
storage/ndb/src/ndbapi/NdbScanFilter.cpp
storage/ndb/test/ndbapi/testScanFilter.cpp
storage/ndb/test/run-test/daily-basic-tests.txt
2785 Jonas Oreland 2008-12-08 [merge]
merge 62 to 63
modified:
storage/ndb/src/kernel/blocks/ERROR_codes.txt
storage/ndb/src/kernel/blocks/dbdih/DbdihMain.cpp
storage/ndb/src/kernel/blocks/dblqh/Dblqh.hpp
storage/ndb/src/kernel/blocks/dblqh/DblqhInit.cpp
storage/ndb/src/kernel/blocks/dblqh/DblqhMain.cpp
storage/ndb/src/kernel/blocks/dbtc/DbtcMain.cpp
storage/ndb/test/include/NdbRestarter.hpp
storage/ndb/test/ndbapi/testNodeRestart.cpp
storage/ndb/test/run-test/daily-basic-tests.txt
storage/ndb/test/src/NdbRestarter.cpp
=== modified file 'storage/ndb/include/kernel/Interpreter.hpp'
--- a/storage/ndb/include/kernel/Interpreter.hpp 2008-02-19 15:00:29 +0000
+++ b/storage/ndb/include/kernel/Interpreter.hpp 2008-12-09 15:47:14 +0000
@@ -89,18 +89,17 @@ public:
/**
* Branch string
*
- * i = Instruction - 5 Bits ( 0 - 5 ) max 63
- * a = Attribute id
- * l = Length of string
- * b = Branch offset
- * t = branch type
+ * i = Instruction - 5 Bits ( 0 - 5 ) max 63
+ * a = Attribute id - 16 bits
+ * l = Length of string (bytes) - 16 bits
+ * b = Branch offset (words) - 16 bits
+ * t = branch type - 4 bits
* d = Array length diff
* v = Varchar flag
- * p = No-blank-padding flag for char compare
*
* 1111111111222222222233
* 01234567890123456789012345678901
- * iiiiii ddvtttpbbbbbbbbbbbbbbbb
+ * iiiiii ddvttttbbbbbbbbbbbbbbbb
* aaaaaaaaaaaaaaaallllllllllllllll
* -string.... -
*/
@@ -117,17 +116,21 @@ public:
GT = 4,
GE = 5,
LIKE = 6,
- NOT_LIKE = 7
+ NOT_LIKE = 7,
+ AND_EQ_MASK = 8,
+ AND_NE_MASK = 9,
+ AND_EQ_ZERO = 10,
+ AND_NE_ZERO = 11
};
+ // TODO : Remove other 2 unused parameters.
static Uint32 BranchCol(BinaryCondition cond,
- Uint32 arrayLengthDiff, Uint32 varchar, bool nopad);
+ Uint32 arrayLengthDiff, Uint32 varchar);
static Uint32 BranchCol_2(Uint32 AttrId);
static Uint32 BranchCol_2(Uint32 AttrId, Uint32 Len);
static Uint32 getBinaryCondition(Uint32 op1);
static Uint32 getArrayLengthDiff(Uint32 op1);
static Uint32 isVarchar(Uint32 op1);
- static Uint32 isNopad(Uint32 op1);
static Uint32 getBranchCol_AttrId(Uint32 op2);
static Uint32 getBranchCol_Len(Uint32 op2);
@@ -216,15 +219,14 @@ inline
Uint32
Interpreter::BranchCol(BinaryCondition cond,
Uint32 arrayLengthDiff,
- Uint32 varchar, bool nopad){
- //ndbout_c("BranchCol: cond=%d diff=%u varchar=%u nopad=%d",
- //cond, arrayLengthDiff, varchar, nopad);
+ Uint32 varchar){
+ //ndbout_c("BranchCol: cond=%d diff=%u varchar=%u",
+ //cond, arrayLengthDiff, varchar);
return
BRANCH_ATTR_OP_ARG +
(arrayLengthDiff << 9) +
(varchar << 11) +
- (cond << 12) +
- (nopad << 15);
+ (cond << 12);
}
inline
@@ -242,7 +244,7 @@ Interpreter::BranchCol_2(Uint32 AttrId){
inline
Uint32
Interpreter::getBinaryCondition(Uint32 op){
- return (op >> 12) & 0x7;
+ return (op >> 12) & 0xf;
}
inline
@@ -259,12 +261,6 @@ Interpreter::isVarchar(Uint32 op){
inline
Uint32
-Interpreter::isNopad(Uint32 op){
- return (op >> 15) & 1;
-}
-
-inline
-Uint32
Interpreter::getBranchCol_AttrId(Uint32 op){
return (op >> 16) & 0xFFFF;
}
=== modified file 'storage/ndb/include/ndbapi/NdbInterpretedCode.hpp'
--- a/storage/ndb/include/ndbapi/NdbInterpretedCode.hpp 2008-04-07 09:52:25 +0000
+++ b/storage/ndb/include/ndbapi/NdbInterpretedCode.hpp 2008-12-09 15:47:14 +0000
@@ -225,27 +225,30 @@ public:
* Space required Buffer Request message
* branch_col_*_null 2 words 2 words
* branch_col_* 2 words + 2 words +
- * len bytes len bytes
+ * type length type length
* rounded to rounded to
* nearest word nearest word
*
+ * Only significant words stored/
+ * sent for VAR* types
+ *
* @param val ptr to const value to compare against
- * @param len length in bytes of const value
+ * @param unused unnecessary
* @param attrId column to compare
* @param Label Program label to jump to if condition is true
* @return 0 if successful, -1 otherwise.
*/
- int branch_col_eq(const void * val, Uint32 len, Uint32 attrId,
+ int branch_col_eq(const void * val, Uint32 unused, Uint32 attrId,
Uint32 Label);
- int branch_col_ne(const void * val, Uint32 len, Uint32 attrId,
+ int branch_col_ne(const void * val, Uint32 unused, Uint32 attrId,
Uint32 Label);
- int branch_col_lt(const void * val, Uint32 len, Uint32 attrId,
+ int branch_col_lt(const void * val, Uint32 unused, Uint32 attrId,
Uint32 Label);
- int branch_col_le(const void * val, Uint32 len, Uint32 attrId,
+ int branch_col_le(const void * val, Uint32 unused, Uint32 attrId,
Uint32 Label);
- int branch_col_gt(const void * val, Uint32 len, Uint32 attrId,
+ int branch_col_gt(const void * val, Uint32 unused, Uint32 attrId,
Uint32 Label);
- int branch_col_ge(const void * val, Uint32 len, Uint32 attrId,
+ int branch_col_ge(const void * val, Uint32 unused, Uint32 attrId,
Uint32 Label);
int branch_col_eq_null(Uint32 attrId, Uint32 Label);
int branch_col_ne_null(Uint32 attrId, Uint32 Label);
@@ -280,13 +283,60 @@ public:
* @param attrId column to compare
* @param Label Program label to jump to if condition is true
* @return 0 if successful, -1 otherwise.
-
+ *
*/
int branch_col_like(const void * val, Uint32 len, Uint32 attrId,
Uint32 Label);
int branch_col_notlike(const void * val, Uint32 len, Uint32 attrId,
Uint32 Label);
+ /* Table based bitwise logical conditional operations
+ * --------------------------------------------------
+ * These instructions are used to branch based on the
+ * result of logical AND between Bit type column data
+ * and a bitmask pattern.
+ *
+ * These instructions require that the table being operated
+ * upon was supplied when the NdbInterpretedCode object was
+ * constructed.
+ *
+ * The mask value should be the same size as the bit column
+ * being compared.
+ * Bitfields are passed in/out of NdbApi as 32-bit words
+ * with bits set from lsb to msb.
+ * The platform's endianness controls which byte contains
+ * the ls bits.
+ * x86= first(0th) byte. Sparc/PPC= last(3rd byte)
+ *
+ * To set bit n of a bitmask to 1 from a Uint32* mask :
+ * mask[n >> 5] |= (1 << (n & 31))
+ *
+ * if (BitWiseAnd(ValueOf(attrId), *mask) <EQ/NE> <*mask/0>)
+ * goto Label;
+ *
+ * Space required Buffer Request message
+ * branch_col_and_mask_eq_mask/
+ * branch_col_and_mask_ne_mask/
+ * branch_col_and_mask_eq_zero/
+ * branch_col_and_mask_ne_zero
+ * 2 words + 2 words +
+ * column width column width
+ * rounded to rounded to
+ * nearest word nearest word
+ *
+ * @param mask ptr to const mask to use
+ * @param unused unnecessary
+ * @param attrId column to compare
+ * @param Label Program label to jump to if condition is true
+ * @return 0 if successful, -1 otherwise.
+ *
+ */
+ int branch_col_and_mask_eq_mask(const void * mask, Uint32 unused, Uint32 attrId, Uint32 Label);
+ int branch_col_and_mask_ne_mask(const void * mask, Uint32 unused, Uint32 attrId, Uint32 Label);
+ int branch_col_and_mask_eq_zero(const void * mask, Uint32 unused, Uint32 attrId, Uint32 Label);
+ int branch_col_and_mask_ne_zero(const void * mask, Uint32 unused, Uint32 attrId, Uint32 Label);
+
+
/* Program results
* ---------------
* These instructions indicate to the interpreter that processing
=== modified file 'storage/ndb/include/ndbapi/NdbOperation.hpp'
--- a/storage/ndb/include/ndbapi/NdbOperation.hpp 2008-11-08 21:22:57 +0000
+++ b/storage/ndb/include/ndbapi/NdbOperation.hpp 2008-12-09 15:47:14 +0000
@@ -734,7 +734,12 @@ public:
int branch_col_ge(Uint32 ColId, const void * val, Uint32 len,
bool nopad, Uint32 Label);
/**
- * The argument is always plain char, even if the field is varchar
+ * LIKE/NOTLIKE wildcard comparisons
+ * These instructions support SQL-style % and _ wildcards for
+ * (VAR)CHAR/BINARY columns only
+ *
+ * The argument is always plain char format, even if the field
+ * is varchar
* (changed in 5.0.22).
*
* @note For Scans and NdbRecord operations, use the
@@ -744,7 +749,40 @@ public:
bool nopad, Uint32 Label);
int branch_col_notlike(Uint32 ColId, const void *, Uint32 len,
bool nopad, Uint32 Label);
-
+
+ /**
+ * Bitwise logical comparisons
+ *
+ * These comparison types are only supported for the Bitfield
+ * type
+ * They can be used to test for bit patterns in bitfield columns
+ * The value passed is a bitmask which is bitwise-ANDed with the
+ * column data.
+ * Bitfields are passed in/out of NdbApi as 32-bit words with
+ * bits set from lsb to msb.
+ * The platform's endianness controls which byte contains the ls
+ * bits.
+ * x86= first(0th) byte. Sparc/PPC= last (3rd byte)
+ *
+ * To set bit n of a bitmask to 1 from a Uint32* mask :
+ * mask[n >> 5] |= (1 << (n & 31))
+ *
+ * The branch can be taken in 4 cases :
+ * - Column data AND Mask == Mask (all masked bits are set in data)
+ * - Column data AND Mask != Mask (not all masked bits are set in data)
+ * - Column data AND Mask == 0 (No masked bits are set in data)
+ * - Column data AND Mask != 0 (Some masked bits are set in data)
+ *
+ */
+ int branch_col_and_mask_eq_mask(Uint32 ColId, const void *, Uint32 len,
+ bool nopad, Uint32 Label);
+ int branch_col_and_mask_ne_mask(Uint32 ColId, const void *, Uint32 len,
+ bool nopad, Uint32 Label);
+ int branch_col_and_mask_eq_zero(Uint32 ColId, const void *, Uint32 len,
+ bool nopad, Uint32 Label);
+ int branch_col_and_mask_ne_zero(Uint32 ColId, const void *, Uint32 len,
+ bool nopad, Uint32 Label);
+
/**
* Interpreted program instruction: Exit with Ok
*
@@ -1249,7 +1287,7 @@ protected:
int read_attr(const NdbColumnImpl* anAttrObject, Uint32 RegDest);
int write_attr(const NdbColumnImpl* anAttrObject, Uint32 RegSource);
int branch_reg_reg(Uint32 type, Uint32, Uint32, Uint32);
- int branch_col(Uint32 type, Uint32, const void *, Uint32, bool, Uint32 Label);
+ int branch_col(Uint32 type, Uint32, const void *, Uint32, Uint32 Label);
int branch_col_null(Uint32 type, Uint32 col, Uint32 Label);
NdbBlob *linkInBlobHandle(NdbTransaction *aCon,
const NdbColumnImpl *column,
=== modified file 'storage/ndb/include/ndbapi/NdbScanFilter.hpp'
--- a/storage/ndb/include/ndbapi/NdbScanFilter.hpp 2008-04-07 09:56:30 +0000
+++ b/storage/ndb/include/ndbapi/NdbScanFilter.hpp 2008-12-09 15:47:14 +0000
@@ -80,14 +80,18 @@ public:
enum BinaryCondition
{
- COND_LE = 0, ///< lower bound
- COND_LT = 1, ///< lower bound, strict
- COND_GE = 2, ///< upper bound
- COND_GT = 3, ///< upper bound, strict
- COND_EQ = 4, ///< equality
- COND_NE = 5, ///< not equal
- COND_LIKE = 6, ///< like
- COND_NOT_LIKE = 7 ///< not like
+ COND_LE = 0, ///< lower bound
+ COND_LT = 1, ///< lower bound, strict
+ COND_GE = 2, ///< upper bound
+ COND_GT = 3, ///< upper bound, strict
+ COND_EQ = 4, ///< equality
+ COND_NE = 5, ///< not equal
+ COND_LIKE = 6, ///< like
+ COND_NOT_LIKE = 7, ///< not like
+ COND_AND_EQ_MASK = 8, ///< (bit & mask) == mask
+ COND_AND_NE_MASK = 9, ///< (bit & mask) != mask (incl. NULL)
+ COND_AND_EQ_ZERO = 10, ///< (bit & mask) == 0
+ COND_AND_NE_ZERO = 11, ///< (bit & mask) != 0 (incl. NULL)
};
/**
@@ -130,6 +134,9 @@ public:
* documentation for NdbOperation::equal().
* For BinaryConditions LIKE and NOT_LIKE, the value pointed to by val
* should NOT include initial length bytes.
+ * For LIKE and NOT_LIKE, the % and ? wildcards are supported.
+ * For bitmask operations, see the bitmask format information against
+ * the branch_col_and_mask_eq_mask instruction in NdbInterpretedCode.hpp
*
* �return 0 if successful, -1 otherwise
*/
=== modified file 'storage/ndb/include/util/NdbSqlUtil.hpp'
--- a/storage/ndb/include/util/NdbSqlUtil.hpp 2006-12-23 19:20:40 +0000
+++ b/storage/ndb/include/util/NdbSqlUtil.hpp 2008-12-09 15:47:14 +0000
@@ -52,6 +52,15 @@ public:
*/
typedef int Like(const void* info, const void* p1, unsigned n1, const void* p2, unsigned n2);
+ /**
+ * Prototype for mask comparisons. Defined for bit type.
+ *
+ * If common portion of data AND Mask is equal to mask
+ * return 0, else return 1.
+ * If cmpZero, compare data AND Mask to zero.
+ */
+ typedef int AndMask(const void* data, unsigned dataLen, const void* mask, unsigned maskLen, bool cmpZero);
+
enum CmpResult {
CmpLess = -1,
CmpEqual = 0,
@@ -96,6 +105,7 @@ public:
Enum m_typeId; // redundant
Cmp* m_cmp; // comparison method
Like* m_like; // "like" comparison method
+ AndMask* m_mask; // Mask comparison method
};
/**
@@ -179,6 +189,8 @@ private:
static Like likeVarbinary;
static Like likeLongvarchar;
static Like likeLongvarbinary;
+ //
+ static AndMask maskBit;
};
#endif
=== modified file 'storage/ndb/src/common/util/NdbSqlUtil.cpp'
--- a/storage/ndb/src/common/util/NdbSqlUtil.cpp 2007-02-23 11:28:34 +0000
+++ b/storage/ndb/src/common/util/NdbSqlUtil.cpp 2008-12-09 15:47:14 +0000
@@ -26,156 +26,187 @@ NdbSqlUtil::m_typeList[] = {
{ // 0
Type::Undefined,
NULL,
+ NULL,
NULL
},
{ // 1
Type::Tinyint,
cmpTinyint,
+ NULL,
NULL
},
{ // 2
Type::Tinyunsigned,
cmpTinyunsigned,
+ NULL,
NULL
},
{ // 3
Type::Smallint,
cmpSmallint,
+ NULL,
NULL
},
{ // 4
Type::Smallunsigned,
cmpSmallunsigned,
+ NULL,
NULL
},
{ // 5
Type::Mediumint,
cmpMediumint,
+ NULL,
NULL
},
{ // 6
Type::Mediumunsigned,
cmpMediumunsigned,
+ NULL,
NULL
},
{ // 7
Type::Int,
cmpInt,
+ NULL,
NULL
},
{ // 8
Type::Unsigned,
cmpUnsigned,
+ NULL,
NULL
},
{ // 9
Type::Bigint,
cmpBigint,
+ NULL,
NULL
},
{ // 10
Type::Bigunsigned,
cmpBigunsigned,
+ NULL,
NULL
},
{ // 11
Type::Float,
cmpFloat,
+ NULL,
NULL
},
{ // 12
Type::Double,
cmpDouble,
+ NULL,
NULL
},
{ // 13
Type::Olddecimal,
cmpOlddecimal,
+ NULL,
NULL
},
{ // 14
Type::Char,
cmpChar,
- likeChar
+ likeChar,
+ NULL
},
{ // 15
Type::Varchar,
cmpVarchar,
- likeVarchar
+ likeVarchar,
+ NULL
},
{ // 16
Type::Binary,
cmpBinary,
- likeBinary
+ likeBinary,
+ NULL
},
{ // 17
Type::Varbinary,
cmpVarbinary,
- likeVarbinary
+ likeVarbinary,
+ NULL
},
{ // 18
Type::Datetime,
cmpDatetime,
+ NULL,
NULL
},
{ // 19
Type::Date,
cmpDate,
+ NULL,
NULL
},
{ // 20
Type::Blob,
NULL,
+ NULL,
NULL
},
{ // 21
Type::Text,
NULL,
+ NULL,
NULL
},
{ // 22
Type::Bit,
cmpBit,
- NULL
+ NULL,
+ maskBit
},
{ // 23
Type::Longvarchar,
cmpLongvarchar,
- likeLongvarchar
+ likeLongvarchar,
+ NULL
},
{ // 24
Type::Longvarbinary,
cmpLongvarbinary,
- likeLongvarbinary
+ likeLongvarbinary,
+ NULL
},
{ // 25
Type::Time,
cmpTime,
+ NULL,
NULL
},
{ // 26
Type::Year,
cmpYear,
+ NULL,
NULL
},
{ // 27
Type::Timestamp,
cmpTimestamp,
+ NULL,
NULL
},
{ // 28
Type::Olddecimalunsigned,
cmpOlddecimalunsigned,
+ NULL,
NULL
},
{ // 29
Type::Decimal,
cmpDecimal,
+ NULL,
NULL
},
{ // 30
Type::Decimalunsigned,
cmpDecimalunsigned,
+ NULL,
NULL
}
};
@@ -680,9 +711,54 @@ NdbSqlUtil::cmpText(const void* info, co
int
NdbSqlUtil::cmpBit(const void* info, const void* p1, unsigned n1, const void* p2, unsigned n2, bool full)
{
- Uint32 n = (n1 < n2) ? n1 : n2;
- int ret = memcmp(p1, p2, n);
- return ret;
+ /* Bitfields are stored as 32-bit words
+ * This means that a byte-by-byte comparison will not work on all platforms
+ * We do a word-wise comparison of the significant bytes.
+ * It is assumed that insignificant bits (but not bytes) are zeroed in the
+ * passed values.
+ */
+ const Uint32 bytes= MIN(n1, n2);
+ Uint32 words= (bytes + 3) >> 2;
+
+ /* Don't expect either value to be length zero */
+ assert(words);
+
+ /* Check ptr alignment */
+ if (unlikely(((((UintPtr)p1) & 3) != 0) ||
+ ((((UintPtr)p2) & 3) != 0)))
+ {
+ Uint32 copyP1[ MAX_TUPLE_SIZE_IN_WORDS ];
+ Uint32 copyP2[ MAX_TUPLE_SIZE_IN_WORDS ];
+ memcpy(copyP1, p1, words << 2);
+ memcpy(copyP2, p2, words << 2);
+
+ return cmpBit(info, copyP1, bytes, copyP2, bytes, full);
+ }
+
+ const Uint32* wp1= (const Uint32*) p1;
+ const Uint32* wp2= (const Uint32*) p2;
+ while (--words)
+ {
+ if (*wp1 < *wp2)
+ return -1;
+ if (*(wp1++) > *(wp2++))
+ return 1;
+ }
+
+ /* For the last word, we mask out any insignificant bytes */
+ const Uint32 sigBytes= bytes & 3; // 0..3; 0 == all bytes significant
+ const Uint32 mask= sigBytes?
+ (1 << (sigBytes *8)) -1 :
+ ~0;
+ const Uint32 lastWord1= *wp1 & mask;
+ const Uint32 lastWord2= *wp2 & mask;
+
+ if (lastWord1 < lastWord2)
+ return -1;
+ if (lastWord1 > lastWord2)
+ return 1;
+
+ return 0;
}
@@ -874,6 +950,85 @@ NdbSqlUtil::likeLongvarbinary(const void
return likeLongvarchar(&my_charset_bin, p1, n1, p2, n2);
}
+
+// mask Functions
+
+int
+NdbSqlUtil::maskBit(const void* data, unsigned dataLen, const void* mask, unsigned maskLen, bool cmpZero)
+{
+ /* Bitfields are stored in word oriented form, so we must compare them in that
+ * style as well
+ * It is assumed that insignificant bits (but not bytes) in the passed values
+ * are zeroed
+ */
+ const Uint32 bytes = MIN(dataLen, maskLen);
+ Uint32 words = (bytes + 3) >> 2;
+
+ /* Don't expect either value to be length zero */
+ assert(words);
+
+ /* Check ptr alignment */
+ if (unlikely(((((UintPtr)data) & 3) != 0) ||
+ ((((UintPtr)mask) & 3) != 0)))
+ {
+ Uint32 copydata[ MAX_TUPLE_SIZE_IN_WORDS ];
+ Uint32 copymask[ MAX_TUPLE_SIZE_IN_WORDS ];
+ memcpy(copydata, data, words << 2);
+ memcpy(copymask, mask, words << 2);
+
+ return maskBit(data, bytes, mask, bytes, cmpZero);
+ }
+
+ const Uint32* wdata= (const Uint32*) data;
+ const Uint32* wmask= (const Uint32*) mask;
+
+ if (cmpZero)
+ {
+ while (--words)
+ {
+ if ((*(wdata++) & *(wmask++)) != 0)
+ return 1;
+ }
+
+ /* For the last word, we mask out any insignificant bytes */
+ const Uint32 sigBytes= bytes & 3; // 0..3; 0 == all bytes significant
+ const Uint32 comparisonMask= sigBytes?
+ (1 << (sigBytes *8)) -1 :
+ ~0;
+ const Uint32 lastDataWord= *wdata & comparisonMask;
+ const Uint32 lastMaskWord= *wmask & comparisonMask;
+
+ if ((lastDataWord & lastMaskWord) != 0)
+ return 1;
+
+ return 0;
+ }
+ else
+ {
+ while (--words)
+ {
+ if ((*(wdata++) & *wmask) != *wmask)
+ return 1;
+
+ wmask++;
+ }
+
+ /* For the last word, we mask out any insignificant bytes */
+ const Uint32 sigBytes= bytes & 3; // 0..3; 0 == all bytes significant
+ const Uint32 comparisonMask= sigBytes?
+ (1 << (sigBytes *8)) -1 :
+ ~0;
+ const Uint32 lastDataWord= *wdata & comparisonMask;
+ const Uint32 lastMaskWord= *wmask & comparisonMask;
+
+ if ((lastDataWord & lastMaskWord) != lastMaskWord)
+ return 1;
+
+ return 0;
+ }
+};
+
+
// check charset
uint
=== modified file 'storage/ndb/src/kernel/blocks/dbtup/DbtupExecQuery.cpp'
--- a/storage/ndb/src/kernel/blocks/dbtup/DbtupExecQuery.cpp 2008-08-26 14:12:34 +0000
+++ b/storage/ndb/src/kernel/blocks/dbtup/DbtupExecQuery.cpp 2008-12-09 15:47:14 +0000
@@ -2491,12 +2491,23 @@ int Dbtup::interpreterNextLab(Signal* si
const char* s2 = (char*)&TcurrentProgram[TprogramCounter+1];
// fixed length in 5.0
Uint32 attrLen = AttributeDescriptor::getSizeInBytes(TattrDesc1);
+
+ if (typeId == NDB_TYPE_BIT)
+ {
+ /* Size in bytes for bit fields can be incorrect due to
+ * rounding down
+ */
+ Uint32 bitFieldAttrLen= (AttributeDescriptor::getArraySize(TattrDesc1)
+ + 7) / 8;
+ attrLen= bitFieldAttrLen;
+ }
bool r1_null = ah.isNULL();
bool r2_null = argLen == 0;
int res1;
- if (cond != Interpreter::LIKE &&
- cond != Interpreter::NOT_LIKE) {
+ if (cond <= Interpreter::GE)
+ {
+ /* Inequality - EQ, NE, LT, LE, GT, GE */
if (r1_null || r2_null) {
// NULL==NULL and NULL<not-NULL
res1 = r1_null && r2_null ? 0 : r1_null ? -1 : 1;
@@ -2509,16 +2520,42 @@ int Dbtup::interpreterNextLab(Signal* si
res1 = (*sqlType.m_cmp)(cs, s1, attrLen, s2, argLen, true);
}
} else {
- if (r1_null || r2_null) {
- // NULL like NULL is true (has no practical use)
- res1 = r1_null && r2_null ? 0 : -1;
- } else {
- jam();
- if (unlikely(sqlType.m_like == 0))
- {
- return TUPKEY_abort(signal, 40);
- }
- res1 = (*sqlType.m_like)(cs, s1, attrLen, s2, argLen);
+ if ((cond == Interpreter::LIKE) ||
+ (cond == Interpreter::NOT_LIKE))
+ {
+ if (r1_null || r2_null) {
+ // NULL like NULL is true (has no practical use)
+ res1 = r1_null && r2_null ? 0 : -1;
+ } else {
+ jam();
+ if (unlikely(sqlType.m_like == 0))
+ {
+ return TUPKEY_abort(signal, 40);
+ }
+ res1 = (*sqlType.m_like)(cs, s1, attrLen, s2, argLen);
+ }
+ }
+ else
+ {
+ /* AND_XX_MASK condition */
+ ndbassert(cond <= Interpreter::AND_NE_ZERO);
+ if (unlikely(sqlType.m_mask == 0))
+ {
+ return TUPKEY_abort(signal,40);
+ }
+ /* If either arg is NULL, we say COL AND MASK
+ * NE_ZERO and NE_MASK.
+ */
+ if (r1_null || r2_null) {
+ res1= 1;
+ } else {
+
+ bool cmpZero=
+ (cond == Interpreter::AND_EQ_ZERO) ||
+ (cond == Interpreter::AND_NE_ZERO);
+
+ res1 = (*sqlType.m_mask)(s1, attrLen, s2, argLen, cmpZero);
+ }
}
}
@@ -2549,6 +2586,18 @@ int Dbtup::interpreterNextLab(Signal* si
case Interpreter::NOT_LIKE:
res = (res1 == 1);
break;
+ case Interpreter::AND_EQ_MASK:
+ res = (res1 == 0);
+ break;
+ case Interpreter::AND_NE_MASK:
+ res = (res1 != 0);
+ break;
+ case Interpreter::AND_EQ_ZERO:
+ res = (res1 == 0);
+ break;
+ case Interpreter::AND_NE_ZERO:
+ res = (res1 != 0);
+ break;
// XXX handle invalid value
}
#ifdef TRACE_INTERPRETER
=== modified file 'storage/ndb/src/ndbapi/NdbInterpretedCode.cpp'
--- a/storage/ndb/src/ndbapi/NdbInterpretedCode.cpp 2008-11-11 11:40:42 +0000
+++ b/storage/ndb/src/ndbapi/NdbInterpretedCode.cpp 2008-12-09 15:47:14 +0000
@@ -510,6 +510,7 @@ NdbInterpretedCode::branch_col(Uint32 br
Uint32 sigLen= len;
Uint32 sendLen= len;
+ Uint32 lastWordMask= ~0;
if (val == NULL)
sigLen = sendLen = 0;
@@ -517,6 +518,16 @@ NdbInterpretedCode::branch_col(Uint32 br
if (! col->getStringType())
{
/* Fixed size type */
+ if (col->getType() == NDB_TYPE_BIT)
+ {
+ /* We want to zero out insignificant bits in the
+ * last word of a bit type
+ */
+ Uint32 bitLen= col->getLength();
+ Uint32 lastWordBits= bitLen & 0x1F;
+ if (lastWordBits)
+ lastWordMask= (1 << lastWordBits) -1;
+ }
sigLen= sendLen= col->m_attrSize * col->m_arraySize;
}
else
@@ -548,7 +559,7 @@ NdbInterpretedCode::branch_col(Uint32 br
val = tempData;
}
- if (add_branch(Interpreter::BranchCol(c, 0, 0, false), Label) != 0)
+ if (add_branch(Interpreter::BranchCol(c, 0, 0), Label) != 0)
DBUG_RETURN(-1);
if (add1(Interpreter::BranchCol_2(attrId, sendLen)) != 0)
@@ -556,7 +567,8 @@ NdbInterpretedCode::branch_col(Uint32 br
/* Get value byte length rounded up to nearest 32-bit word */
Uint32 len2 = Interpreter::mod4(sendLen);
- if(len2 == sendLen){
+ if((len2 == sendLen) &&
+ (lastWordMask == (Uint32)~0)){
/* Whole number of 32-bit words */
DBUG_RETURN(addN((Uint32*)val, len2 >> 2));
} else {
@@ -571,7 +583,7 @@ NdbInterpretedCode::branch_col(Uint32 br
char* p = (char*)&tmp;
p[i] = ((char*)val)[len2+i];
}
- DBUG_RETURN(add1(tmp));
+ DBUG_RETURN(add1((tmp & lastWordMask)));
}
}
@@ -581,7 +593,7 @@ NdbInterpretedCode::branch_col_eq(const
Uint32 attrId,
Uint32 Label)
{
- return branch_col(Interpreter::EQ, attrId, val, len, Label);
+ return branch_col(Interpreter::EQ, attrId, val, 0, Label);
}
int
@@ -590,7 +602,7 @@ NdbInterpretedCode::branch_col_ne(const
Uint32 attrId,
Uint32 Label)
{
- return branch_col(Interpreter::NE, attrId, val, len, Label);
+ return branch_col(Interpreter::NE, attrId, val, 0, Label);
}
int
@@ -599,7 +611,7 @@ NdbInterpretedCode::branch_col_lt(const
Uint32 attrId,
Uint32 Label)
{
- return branch_col(Interpreter::LT, attrId, val, len, Label);
+ return branch_col(Interpreter::LT, attrId, val, 0, Label);
}
int
@@ -608,7 +620,7 @@ NdbInterpretedCode::branch_col_le(const
Uint32 attrId,
Uint32 Label)
{
- return branch_col(Interpreter::LE, attrId, val, len, Label);
+ return branch_col(Interpreter::LE, attrId, val, 0, Label);
}
int
@@ -617,7 +629,7 @@ NdbInterpretedCode::branch_col_gt(const
Uint32 attrId,
Uint32 Label)
{
- return branch_col(Interpreter::GT, attrId, val, len, Label);
+ return branch_col(Interpreter::GT, attrId, val, 0, Label);
}
int
@@ -626,7 +638,7 @@ NdbInterpretedCode::branch_col_ge(const
Uint32 attrId,
Uint32 Label)
{
- return branch_col(Interpreter::GE, attrId, val, len, Label);
+ return branch_col(Interpreter::GE, attrId, val, 0, Label);
}
int
@@ -648,6 +660,42 @@ NdbInterpretedCode::branch_col_notlike(c
}
int
+NdbInterpretedCode::branch_col_and_mask_eq_mask(const void * mask,
+ Uint32 len,
+ Uint32 attrId,
+ Uint32 label)
+{
+ return branch_col(Interpreter::AND_EQ_MASK, attrId, mask, 0, Label);
+}
+
+int
+NdbInterpretedCode::branch_col_and_mask_ne_mask(const void * mask,
+ Uint32 len,
+ Uint32 attrId,
+ Uint32 label)
+{
+ return branch_col(Interpreter::AND_NE_MASK, attrId, mask, 0, Label);
+}
+
+int
+NdbInterpretedCode::branch_col_and_mask_eq_zero(const void * mask,
+ Uint32 len,
+ Uint32 attrId,
+ Uint32 label)
+{
+ return branch_col(Interpreter::AND_EQ_ZERO, attrId, mask, 0, Label);
+}
+
+int
+NdbInterpretedCode::branch_col_and_mask_ne_zero(const void * mask,
+ Uint32 len,
+ Uint32 attrId,
+ Uint32 label)
+{
+ return branch_col(Interpreter::AND_NE_ZERO, attrId, mask, 0, Label);
+}
+
+int
NdbInterpretedCode::interpret_exit_ok()
{
return add1(Interpreter::EXIT_OK);
=== modified file 'storage/ndb/src/ndbapi/NdbOperationInt.cpp'
--- a/storage/ndb/src/ndbapi/NdbOperationInt.cpp 2008-11-11 11:47:00 +0000
+++ b/storage/ndb/src/ndbapi/NdbOperationInt.cpp 2008-12-09 15:47:14 +0000
@@ -1067,7 +1067,7 @@ NdbOperation::insertCall(Uint32 aCall)
int
NdbOperation::branch_col(Uint32 type,
Uint32 ColId, const void * val, Uint32 len,
- bool nopad, Uint32 Label){
+ Uint32 Label){
DBUG_ENTER("NdbOperation::branch_col");
DBUG_PRINT("enter", ("type: %u col:%u val: 0x%lx len: %u label: %u",
@@ -1089,6 +1089,7 @@ NdbOperation::branch_col(Uint32 type,
Uint32 sigLen= len;
Uint32 sendLen= len;
+ Uint32 lastWordMask= ~0;
if (val == NULL)
sigLen= sendLen = 0;
@@ -1096,6 +1097,16 @@ NdbOperation::branch_col(Uint32 type,
if (! col->getStringType())
{
/* Fixed size type */
+ if (col->getType() == NDB_TYPE_BIT)
+ {
+ /* We want to zero out insignificant bits in the
+ * last word of a bit type
+ */
+ Uint32 bitLen= col->getLength();
+ Uint32 lastWordBits= bitLen & 0x1F;
+ if (lastWordBits)
+ lastWordMask= (1 << lastWordBits) -1;
+ }
sigLen= sendLen= col->m_attrSize * col->m_arraySize;
}
else
@@ -1130,7 +1141,7 @@ NdbOperation::branch_col(Uint32 type,
val = tempData;
}
- if (insertATTRINFO(Interpreter::BranchCol(c, 0, 0, false)) == -1)
+ if (insertATTRINFO(Interpreter::BranchCol(c, 0, 0)) == -1)
DBUG_RETURN(-1);
if (insertBranch(Label) == -1)
@@ -1140,7 +1151,8 @@ NdbOperation::branch_col(Uint32 type,
DBUG_RETURN(-1);
Uint32 len2 = Interpreter::mod4(sendLen);
- if(len2 == sendLen){
+ if((len2 == sendLen) &&
+ (lastWordMask == (Uint32)~0)){
insertATTRINFOloop((Uint32*)val, len2 >> 2);
} else {
len2 -= 4;
@@ -1150,7 +1162,7 @@ NdbOperation::branch_col(Uint32 type,
char* p = (char*)&tmp;
p[i] = ((char*)val)[len2+i];
}
- insertATTRINFO(tmp);
+ insertATTRINFO(tmp & lastWordMask);
}
theErrorLine++;
@@ -1162,7 +1174,7 @@ NdbOperation::branch_col_eq(Uint32 ColId
bool nopad, Uint32 Label){
INT_DEBUG(("branch_col_eq %u %.*s(%u,%d) -> %u", ColId, len, (char*) val, len,
nopad, Label));
- return branch_col(Interpreter::EQ, ColId, val, len, nopad, Label);
+ return branch_col(Interpreter::EQ, ColId, val, len, Label);
}
int
@@ -1170,28 +1182,28 @@ NdbOperation::branch_col_ne(Uint32 ColId
bool nopad, Uint32 Label){
INT_DEBUG(("branch_col_ne %u %.*s(%u,%d) -> %u", ColId, len, (char*) val, len,
nopad, Label));
- return branch_col(Interpreter::NE, ColId, val, len, nopad, Label);
+ return branch_col(Interpreter::NE, ColId, val, len, Label);
}
int
NdbOperation::branch_col_lt(Uint32 ColId, const void * val, Uint32 len,
bool nopad, Uint32 Label){
INT_DEBUG(("branch_col_lt %u %.*s(%u,%d) -> %u", ColId, len, (char*) val, len,
nopad, Label));
- return branch_col(Interpreter::LT, ColId, val, len, nopad, Label);
+ return branch_col(Interpreter::LT, ColId, val, len, Label);
}
int
NdbOperation::branch_col_le(Uint32 ColId, const void * val, Uint32 len,
bool nopad, Uint32 Label){
INT_DEBUG(("branch_col_le %u %.*s(%u,%d) -> %u", ColId, len, (char*) val, len,
nopad, Label));
- return branch_col(Interpreter::LE, ColId, val, len, nopad, Label);
+ return branch_col(Interpreter::LE, ColId, val, len, Label);
}
int
NdbOperation::branch_col_gt(Uint32 ColId, const void * val, Uint32 len,
bool nopad, Uint32 Label){
INT_DEBUG(("branch_col_gt %u %.*s(%u,%d) -> %u", ColId, len, (char*) val, len,
nopad, Label));
- return branch_col(Interpreter::GT, ColId, val, len, nopad, Label);
+ return branch_col(Interpreter::GT, ColId, val, len, Label);
}
int
@@ -1199,7 +1211,7 @@ NdbOperation::branch_col_ge(Uint32 ColId
bool nopad, Uint32 Label){
INT_DEBUG(("branch_col_ge %u %.*s(%u,%d) -> %u", ColId, len, (char*) val, len,
nopad, Label));
- return branch_col(Interpreter::GE, ColId, val, len, nopad, Label);
+ return branch_col(Interpreter::GE, ColId, val, len, Label);
}
int
@@ -1207,7 +1219,7 @@ NdbOperation::branch_col_like(Uint32 Col
bool nopad, Uint32 Label){
INT_DEBUG(("branch_col_like %u %.*s(%u,%d) -> %u", ColId, len, (char*) val, len,
nopad, Label));
- return branch_col(Interpreter::LIKE, ColId, val, len, nopad, Label);
+ return branch_col(Interpreter::LIKE, ColId, val, len, Label);
}
int
@@ -1215,7 +1227,39 @@ NdbOperation::branch_col_notlike(Uint32
bool nopad, Uint32 Label){
INT_DEBUG(("branch_col_notlike %u %.*s(%u,%d) -> %u", ColId, len, (char*) val, len,
nopad, Label));
- return branch_col(Interpreter::NOT_LIKE, ColId, val, len, nopad, Label);
+ return branch_col(Interpreter::NOT_LIKE, ColId, val, len, Label);
+}
+
+int
+NdbOperation::branch_col_and_mask_eq_mask(Uint32 ColId, const void * mask,
+ Uint32 len, bool nopad, Uint32 Label){
+ INT_DEBUG(("branch_col_and_mask_eq_mask %u %.*s(%u,%d) -> %u", ColId, len, (char*) mask, len,
+ nopad, Label));
+ return branch_col(Interpreter::AND_EQ_MASK, ColId, mask, len, Label);
+}
+
+int
+NdbOperation::branch_col_and_mask_ne_mask(Uint32 ColId, const void * mask,
+ Uint32 len, bool nopad, Uint32 Label){
+ INT_DEBUG(("branch_col_and_mask_ne_mask %u %.*s(%u,%d) -> %u", ColId, len, (char*) mask, len,
+ nopad, Label));
+ return branch_col(Interpreter::AND_NE_MASK, ColId, mask, len, Label);
+}
+
+int
+NdbOperation::branch_col_and_mask_eq_zero(Uint32 ColId, const void * mask,
+ Uint32 len, bool nopad, Uint32 Label){
+ INT_DEBUG(("branch_col_and_mask_eq_zero %u %.*s(%u,%d) -> %u", ColId, len, (char*) mask, len,
+ nopad, Label));
+ return branch_col(Interpreter::AND_EQ_ZERO, ColId, mask, len, Label);
+}
+
+int
+NdbOperation::branch_col_and_mask_ne_zero(Uint32 ColId, const void * mask,
+ Uint32 len, bool nopad, Uint32 Label){
+ INT_DEBUG(("branch_col_and_mask_ne_zero %u %.*s(%u,%d) -> %u", ColId, len, (char*) mask, len,
+ nopad, Label));
+ return branch_col(Interpreter::AND_NE_ZERO, ColId, mask, len, Label);
}
int
=== modified file 'storage/ndb/src/ndbapi/NdbScanFilter.cpp'
--- a/storage/ndb/src/ndbapi/NdbScanFilter.cpp 2008-08-06 15:33:19 +0000
+++ b/storage/ndb/src/ndbapi/NdbScanFilter.cpp 2008-12-09 15:47:14 +0000
@@ -586,6 +586,42 @@ static const tab3 table3[] = {
&NdbInterpretedCode::branch_col_notlike,
&NdbInterpretedCode::branch_col_like,
&NdbInterpretedCode::branch_col_notlike } }
+
+ /**
+ * AND EQ MASK
+ */
+ ,{ { 0,
+ &NdbInterpretedCode::branch_col_and_mask_ne_mask,
+ &NdbInterpretedCode::branch_col_and_mask_eq_mask,
+ &NdbInterpretedCode::branch_col_and_mask_ne_mask,
+ &NdbInterpretedCode::branch_col_and_mask_eq_mask } }
+
+ /**
+ * AND NE MASK
+ */
+ ,{ { 0,
+ &NdbInterpretedCode::branch_col_and_mask_eq_mask,
+ &NdbInterpretedCode::branch_col_and_mask_ne_mask,
+ &NdbInterpretedCode::branch_col_and_mask_eq_mask,
+ &NdbInterpretedCode::branch_col_and_mask_ne_mask } }
+
+ /**
+ * AND EQ ZERO
+ */
+ ,{ { 0,
+ &NdbInterpretedCode::branch_col_and_mask_ne_zero,
+ &NdbInterpretedCode::branch_col_and_mask_eq_zero,
+ &NdbInterpretedCode::branch_col_and_mask_ne_zero,
+ &NdbInterpretedCode::branch_col_and_mask_eq_zero } }
+
+ /**
+ * AND NE ZERO
+ */
+ ,{ { 0,
+ &NdbInterpretedCode::branch_col_and_mask_eq_zero,
+ &NdbInterpretedCode::branch_col_and_mask_ne_zero,
+ &NdbInterpretedCode::branch_col_and_mask_eq_zero,
+ &NdbInterpretedCode::branch_col_and_mask_ne_zero } }
};
const int tab3_sz = sizeof(table3)/sizeof(table3[0]);
@@ -664,6 +700,14 @@ NdbScanFilter::cmp(BinaryCondition cond,
return m_impl.cond_col_const(Interpreter::LIKE, ColId, val, len);
case COND_NOT_LIKE:
return m_impl.cond_col_const(Interpreter::NOT_LIKE, ColId, val, len);
+ case COND_AND_EQ_MASK:
+ return m_impl.cond_col_const(Interpreter::AND_EQ_MASK, ColId, val, len);
+ case COND_AND_NE_MASK:
+ return m_impl.cond_col_const(Interpreter::AND_NE_MASK, ColId, val, len);
+ case COND_AND_EQ_ZERO:
+ return m_impl.cond_col_const(Interpreter::AND_EQ_ZERO, ColId, val, len);
+ case COND_AND_NE_ZERO:
+ return m_impl.cond_col_const(Interpreter::AND_NE_ZERO, ColId, val, len);
}
return -1;
}
=== modified file 'storage/ndb/test/ndbapi/testScanFilter.cpp'
--- a/storage/ndb/test/ndbapi/testScanFilter.cpp 2008-11-10 11:41:44 +0000
+++ b/storage/ndb/test/ndbapi/testScanFilter.cpp 2008-12-09 15:47:14 +0000
@@ -88,6 +88,49 @@ static
const
NDBT_Table MYTAB1(TABLE_NAME, sizeof(MYTAB1Attribs)/sizeof(NDBT_Attribute), MYTAB1Attribs);
+static
+const
+NDBT_Attribute MYTAB2Attribs[] = {
+ NDBT_Attribute("id", NdbDictionary::Column::Unsigned, 1, true),
+ // _pk _nullable
+ NDBT_Attribute("1bitnn", NdbDictionary::Column::Bit, 1, false, false),
+ NDBT_Attribute("1bitnu", NdbDictionary::Column::Bit, 1, false, true),
+ NDBT_Attribute("2bitnn", NdbDictionary::Column::Bit, 2, false, false),
+ NDBT_Attribute("2bitnu", NdbDictionary::Column::Bit, 2, false, true),
+ NDBT_Attribute("7bitnn", NdbDictionary::Column::Bit, 7, false, false),
+ NDBT_Attribute("7bitnu", NdbDictionary::Column::Bit, 7, false, true),
+ NDBT_Attribute("8bitnn", NdbDictionary::Column::Bit, 8, false, false),
+ NDBT_Attribute("8bitnu", NdbDictionary::Column::Bit, 8, false, true),
+ NDBT_Attribute("15bitnn", NdbDictionary::Column::Bit, 15, false, false),
+ NDBT_Attribute("15bitnu", NdbDictionary::Column::Bit, 15, false, true),
+ NDBT_Attribute("31bitnn", NdbDictionary::Column::Bit, 31, false, false),
+ NDBT_Attribute("31bitnu", NdbDictionary::Column::Bit, 31, false, true),
+ NDBT_Attribute("32bitnn", NdbDictionary::Column::Bit, 32, false, false),
+ NDBT_Attribute("32bitnu", NdbDictionary::Column::Bit, 32, false, true),
+ NDBT_Attribute("33bitnn", NdbDictionary::Column::Bit, 33, false, false),
+ NDBT_Attribute("33bitnu", NdbDictionary::Column::Bit, 33, false, true),
+ NDBT_Attribute("63bitnn", NdbDictionary::Column::Bit, 63, false, false),
+ NDBT_Attribute("63bitnu", NdbDictionary::Column::Bit, 63, false, true),
+ NDBT_Attribute("64bitnn", NdbDictionary::Column::Bit, 64, false, false),
+ NDBT_Attribute("64bitnu", NdbDictionary::Column::Bit, 64, false, true),
+ NDBT_Attribute("65bitnn", NdbDictionary::Column::Bit, 65, false, false),
+ NDBT_Attribute("65bitnu", NdbDictionary::Column::Bit, 65, false, true),
+ NDBT_Attribute("127bitnn", NdbDictionary::Column::Bit, 127, false, false),
+ NDBT_Attribute("127bitnu", NdbDictionary::Column::Bit, 127, false, true),
+ NDBT_Attribute("513bitnn", NdbDictionary::Column::Bit, 513, false, false),
+ NDBT_Attribute("513bitnu", NdbDictionary::Column::Bit, 513, false, true)
+};
+
+static const char* TABLE2_NAME= "MyTab2";
+
+static
+const
+NDBT_Table MYTAB2(TABLE2_NAME, sizeof(MYTAB2Attribs)/sizeof(NDBT_Attribute), MYTAB2Attribs);
+
+static const int NUM_COLS= sizeof(MYTAB2Attribs)/sizeof(NDBT_Attribute);
+static const int MAX_BIT_WIDTH= 513;
+/* One extra row for all bits == 0 */
+static const int TOTAL_ROWS= MAX_BIT_WIDTH + 1;
int createTable(Ndb* pNdb, const NdbDictionary::Table* tab, bool _temp,
bool existsOk, NDBT_CreateTableHook f)
@@ -1012,6 +1055,588 @@ int runScanFilterConstructorFail(NDBT_Co
return NDBT_OK;
}
+bool getBit(const Uint32* bitMap,
+ int bitNum)
+{
+ return ((bitMap[ bitNum >> 5 ] & (1 << (bitNum & 31))) != 0);
+}
+
+void setBit(Uint32* bitMap,
+ int bitNum)
+{
+ bitMap[ bitNum >> 5 ] |= (1 << (bitNum & 31));
+}
+
+enum TestConditions {
+ COND_LE = 0,
+ COND_LT = 1,
+ COND_GE = 2,
+ COND_GT = 3,
+ COND_EQ = 4,
+ COND_NE = 5,
+ COND_NULL = 6,
+ COND_NOTNULL = 7,
+ COND_AND_EQ_MASK = 8,
+ COND_AND_NE_MASK = 9,
+ COND_AND_EQ_ZERO = 10,
+ COND_AND_NE_ZERO = 11
+};
+
+int getExpectedBitsSet(int rowId,
+ int colBitWidth)
+{
+ /* returns value from -1 to colBitWidth -1
+ * -1 == no bits set
+ * 0 == bit 0 set
+ * 1 == bit 0 + bit 1 set
+ * ...
+ */
+ return (rowId % (colBitWidth + 1)) -1;
+}
+
+bool isNullValue(int rowId,
+ int colBitWidth)
+{
+ /* Occasionally we'll have a Null column */
+ return (((rowId + colBitWidth) % 13) == 0);
+}
+
+void getBitfieldVariants(int bitNum, int& offset, bool& invert)
+{
+ offset= 0;
+ invert= false;
+ if ((bitNum % 5) == 3)
+ {
+ /* Invert the mask */
+ invert= true;
+ }
+ if ((bitNum % 7) == 6)
+ {
+ /* Shift the mask */
+ offset= (bitNum / 2);
+ }
+};
+
+
+bool isRowExpected(int rowId,
+ TestConditions cond,
+ int colBitWidth,
+ int bitsSetInScanFilter,
+ bool isNullable,
+ const Uint32* maskBuff)
+{
+ if (isNullable && isNullValue(rowId, colBitWidth))
+ {
+ switch (cond) {
+ case COND_LE:
+ return true; // null < any value
+ case COND_LT:
+ return true; // null < any value
+ case COND_GE:
+ return false; // null < any value
+ case COND_GT:
+ return false; // null < any value
+ case COND_EQ:
+ return false; // null != any value
+ case COND_NE:
+ return true; // null != any value
+ case COND_NULL:
+ return true; // null is null
+ case COND_NOTNULL:
+ return false;
+ case COND_AND_EQ_MASK:
+ return false; // NULL AND MASK != MASK
+ case COND_AND_NE_MASK:
+ return true; // NULL AND MASK != MASK
+ case COND_AND_EQ_ZERO:
+ return false; // NULL AND MASK != 0
+ case COND_AND_NE_ZERO:
+ return true; // NULL AND MASK != 0
+ default:
+ printf("isRowExpected given bad condition : %u\n",
+ cond);
+ return false;
+ }
+ }
+ else
+ {
+ /* Not a null value */
+ int expectedBitsSet= getExpectedBitsSet(rowId, colBitWidth);
+
+ int offset= 0;
+ bool invert= false;
+
+ getBitfieldVariants(bitsSetInScanFilter + 1, offset, invert);
+
+ switch (cond) {
+ case COND_LE:
+ return expectedBitsSet <= bitsSetInScanFilter;
+ case COND_LT:
+ return expectedBitsSet < bitsSetInScanFilter;
+ case COND_GE:
+ return expectedBitsSet >= bitsSetInScanFilter;
+ case COND_GT:
+ return expectedBitsSet > bitsSetInScanFilter;
+ case COND_EQ:
+ return expectedBitsSet == bitsSetInScanFilter;
+ case COND_NE:
+ return expectedBitsSet != bitsSetInScanFilter;
+ case COND_NULL:
+ return false;
+ case COND_NOTNULL:
+ return true;
+ case COND_AND_EQ_MASK:
+ case COND_AND_NE_MASK:
+ {
+ bool result= true;
+ /* Compare data AND mask to the mask buff */
+ for (int idx=0; idx < colBitWidth; idx++)
+ {
+ bool bitVal= (expectedBitsSet >= 0)?
+ (expectedBitsSet >= idx): false;
+ bool maskVal= getBit(maskBuff, idx);
+ if ((bitVal & maskVal) != maskVal)
+ {
+ /* Difference to mask, know result */
+ result= false;
+ break;
+ }
+ }
+
+ /* Invert result for NE condition */
+ return (result ^ (cond == COND_AND_NE_MASK));
+ }
+ case COND_AND_EQ_ZERO:
+ case COND_AND_NE_ZERO:
+ {
+ bool result= true;
+ /* Compare data AND mask to zero */
+ for (int idx=0; idx < colBitWidth; idx++)
+ {
+ bool bitVal= (expectedBitsSet >= 0)?
+ (expectedBitsSet >= idx): false;
+ bool maskVal= getBit(maskBuff, idx);
+ if ((bitVal & maskVal) != 0)
+ {
+ /* Difference to 0, know result */
+ result= false;
+ break;
+ }
+ }
+ /* Invert result for NE condition */
+ return (result ^ (cond == COND_AND_NE_ZERO));
+ }
+ default:
+ printf("isRowExpected given bad condition : %u\n",
+ cond);
+ return false;
+ }
+ }
+}
+
+int insertBitRows(Ndb* pNdb)
+{
+ const NdbDictionary::Dictionary* myDict= pNdb->getDictionary();
+ const NdbDictionary::Table *myTable= myDict->getTable(TABLE2_NAME);
+ if(myTable == NULL)
+ APIERROR(myDict->getNdbError());
+
+ for (int i=0; i< TOTAL_ROWS; i++)
+ {
+ NdbTransaction* myTrans = pNdb->startTransaction();
+ if (myTrans == NULL)
+ APIERROR(pNdb->getNdbError());
+
+ NdbOperation* insertOp= myTrans->getNdbOperation(myTable);
+ if (insertOp == NULL)
+ APIERROR(pNdb->getNdbError());
+
+ const int buffSize= (MAX_BIT_WIDTH + 31) / 32;
+ Uint32 buff[ buffSize ];
+
+ if (insertOp->insertTuple() != 0)
+ APIERROR(insertOp->getNdbError());
+
+ if (insertOp->equal((Uint32)0, i) != 0) // Set id column
+ APIERROR(insertOp->getNdbError());
+
+ for (int col=1; col < myTable->getNoOfColumns(); col++)
+ {
+ const NdbDictionary::Column* c= myTable->getColumn(col);
+ int colBitWidth= c->getLength();
+ bool isNullable= c->getNullable();
+
+ if (isNullable &&
+ isNullValue(i, colBitWidth))
+ {
+ /* Set column value to NULL */
+ if (insertOp->setValue(col, (char*) NULL) != 0)
+ APIERROR(insertOp->getNdbError());
+ }
+ else
+ {
+ /* Set lowest bits in this column */
+ memset(buff, 0, (4 * buffSize));
+
+ int bitsToSet= getExpectedBitsSet(i, colBitWidth);
+
+ if (bitsToSet >= 0)
+ {
+ for (int idx=0; idx <= bitsToSet; idx++)
+ setBit(buff, idx);
+ }
+
+ if (insertOp->setValue(col, (char *)buff) != 0)
+ APIERROR(insertOp->getNdbError());
+ }
+ }
+
+ if (myTrans->execute(NdbTransaction::Commit) != 0)
+ {
+ APIERROR(myTrans->getNdbError());
+ }
+ myTrans->close();
+ }
+
+ printf("Inserted %u rows\n", TOTAL_ROWS);
+
+ return NDBT_OK;
+}
+
+int verifyBitData(Ndb* pNdb)
+{
+ const NdbDictionary::Dictionary* myDict= pNdb->getDictionary();
+ const NdbDictionary::Table *myTable= myDict->getTable(TABLE2_NAME);
+ if(myTable == NULL)
+ APIERROR(myDict->getNdbError());
+
+ NdbTransaction* myTrans= pNdb->startTransaction();
+ if (myTrans == NULL)
+ APIERROR(pNdb->getNdbError());
+
+ NdbScanOperation* scanOp= myTrans->getNdbScanOperation(myTable);
+ if (scanOp == NULL)
+ APIERROR(pNdb->getNdbError());
+
+ if (scanOp->readTuples() != 0)
+ APIERROR(scanOp->getNdbError());
+
+ NdbRecAttr* results[ NUM_COLS ];
+
+ for (int col=0; col< NUM_COLS; col++)
+ {
+ if ((results[col]= scanOp->getValue(col)) == NULL)
+ APIERROR(scanOp->getNdbError());
+ }
+
+ if (myTrans->execute(NdbTransaction::NoCommit) != 0)
+ APIERROR(myTrans->getNdbError());
+
+ for (int row=0; row < TOTAL_ROWS; row++)
+ {
+ if (scanOp->nextResult() != 0)
+ APIERROR(scanOp->getNdbError());
+
+ int rowId= results[0]->int32_value();
+
+ for (int col=1; col < NUM_COLS; col++)
+ {
+ const NdbDictionary::Column* c= myTable->getColumn(col);
+ bool isNullable= c->getNullable();
+ int colBitWidth= c->getLength();
+
+ if (isNullable &&
+ isNullValue(rowId, colBitWidth))
+ {
+ if (!results[ col ] ->isNULL())
+ {
+ printf("Mismatch at result %d row %d, column %d, expected NULL\n",
+ row, rowId, col);
+ myTrans->close();
+ return NDBT_FAILED;
+ }
+ }
+ else
+ {
+ /* Non null value, check it */
+ int expectedSetBits= getExpectedBitsSet(rowId, colBitWidth);
+
+ const Uint32* val= (const Uint32 *) results[ col ]->aRef();
+
+ for (int bitNum=0; bitNum < colBitWidth; bitNum++)
+ {
+ bool expectClear= (bitNum > expectedSetBits);
+ bool isClear= ! getBit(val, bitNum);
+ if (expectClear != isClear)
+ {
+ printf("Mismatch at result %d row %d, column %d, bit %d"
+ " expected %d \n",
+ row, rowId, col, bitNum, (expectClear)?0:1);
+ myTrans->close();
+ return NDBT_FAILED;
+ }
+ }
+ }
+ }
+ }
+
+ if (scanOp->nextResult() != 1)
+ {
+ printf("Too many rows returned\n");
+ return NDBT_FAILED;
+ }
+
+ if (myTrans->execute(NdbTransaction::Commit) != 0)
+ APIERROR(myTrans->getNdbError());
+
+ myTrans->close();
+
+ printf("Verified data for %u rows\n",
+ TOTAL_ROWS);
+
+ return NDBT_OK;
+}
+
+int verifyBitScanFilter(Ndb* pNdb)
+{
+ const NdbDictionary::Dictionary* myDict= pNdb->getDictionary();
+ const NdbDictionary::Table *myTable= myDict->getTable(TABLE2_NAME);
+ if(myTable == NULL)
+ APIERROR(myDict->getNdbError());
+
+ /* Perform scan with scanfilter for :
+ * - each column in the table
+ * - each supported comparison type
+ * - each potentially set bit in the column
+ */
+ int scanCount= 0;
+ for (int col=1; col < NUM_COLS; col++)
+ {
+ const NdbDictionary::Column* c= myTable->getColumn(col);
+ const int bitWidth= c->getLength();
+ printf("Testing %s column %u (width=%u bits) with %u scan filter variants\n",
+ (c->getNullable())?"Nullable":"Non-null",
+ col, bitWidth, ((bitWidth+1) * (COND_AND_NE_ZERO + 1)));
+ for (int comp=0; comp <= COND_AND_NE_ZERO; comp++)
+ {
+ for (int bitNum=0; bitNum <= bitWidth; bitNum++)
+ {
+ /* Define scan */
+ NdbTransaction* myTrans= pNdb->startTransaction();
+ if (myTrans == NULL)
+ APIERROR(pNdb->getNdbError());
+
+ NdbScanOperation* scanOp= myTrans->getNdbScanOperation(myTable);
+ if (scanOp == NULL)
+ APIERROR(pNdb->getNdbError());
+
+ if (scanOp->readTuples() != 0)
+ APIERROR(scanOp->getNdbError());
+
+ NdbRecAttr* ra;
+ if ((ra= scanOp->getValue((Uint32)0)) == NULL)
+ APIERROR(scanOp->getNdbError());
+
+ /* Define ScanFilter */
+ const int buffSize= (MAX_BIT_WIDTH + 31)/32;
+ Uint32 buff[ buffSize ];
+ memset(buff, 0, (4 * buffSize));
+
+ /* Define constant value, with some variants for bitwise operators */
+ bool invert= false;
+ int offset= 0;
+
+ switch (comp) {
+ case COND_AND_EQ_MASK:
+ case COND_AND_NE_MASK:
+ case COND_AND_EQ_ZERO:
+ case COND_AND_NE_ZERO:
+ getBitfieldVariants(bitNum, offset, invert);
+ default:
+ ;
+ }
+
+ /* Set lower bitNum -1 bits
+ * If bitNum == 0, set none
+ */
+ int bitsSetInFilter= bitNum-1;
+
+ if ((bitsSetInFilter >= 0) ||
+ invert)
+ {
+ for (int idx=0; idx <= (32 * buffSize); idx++)
+ {
+ if ((idx >= offset) &&
+ (idx <= (offset + bitsSetInFilter)))
+ {
+ if (!invert)
+ setBit(buff, idx);
+ }
+ else
+ {
+ if (invert)
+ setBit(buff, idx);
+ }
+ }
+ }
+
+
+
+ NdbScanFilter sf(scanOp);
+
+ if (sf.begin(NdbScanFilter::AND) != 0)
+ APIERROR(sf.getNdbError());
+
+ if ((comp != COND_NULL) &&
+ (comp != COND_NOTNULL))
+ {
+ /* Operator with a constant */
+ if (sf.cmp((NdbScanFilter::BinaryCondition)comp,
+ col,
+ (char*)buff) != 0)
+ APIERROR(sf.getNdbError());
+ }
+ else
+ {
+ switch (comp) {
+ case COND_NULL:
+ if (sf.isnull(col) != 0)
+ APIERROR(sf.getNdbError());
+ break;
+ case COND_NOTNULL:
+ if (sf.isnotnull(col) != 0)
+ APIERROR(sf.getNdbError());
+ break;
+ default:
+ printf("Condition %u not supported\n", comp);
+ return NDBT_FAILED;
+ }
+ }
+
+ if (sf.end() != 0)
+ APIERROR(sf.getNdbError());
+
+ /* Calculate expected number of rows in result */
+ const NdbDictionary::Column* c= myTable->getColumn(col);
+ int colBitWidth= c->getLength();
+ bool isNullable= c->getNullable();
+
+ // printf("Determining expected rows\n");
+ int expectedResultCount= 0;
+ for (int i=0; i< TOTAL_ROWS; i++)
+ {
+ if (isRowExpected(i,
+ (TestConditions)comp,
+ colBitWidth,
+ bitsSetInFilter,
+ isNullable,
+ buff))
+ expectedResultCount++;
+ }
+
+
+ /* Execute */
+ if (myTrans->execute(NdbTransaction::NoCommit) != 0)
+ APIERROR(myTrans->getNdbError());
+
+
+ /* Process results to ensure we got the expected rows back */
+ int rc= 0;
+ int count= 0;
+ int matchCount= 0;
+ // printf("Checking rows returned\n");
+ while ((rc= scanOp->nextResult()) == 0)
+ {
+ int rowId= ra->int32_value();
+ count++;
+ /*
+ * Check that this row was expected
+ */
+ if (isRowExpected(rowId,
+ (TestConditions)comp,
+ colBitWidth,
+ bitsSetInFilter,
+ isNullable,
+ buff))
+ {
+ matchCount++;
+ }
+ else
+ {
+ printf("Col=%u Comp=%u BitNum=%u row=%u : "
+ "Got row %u back which I did not expect\n",
+ col, comp, bitNum, count, rowId);
+ myTrans->close();
+ return NDBT_FAILED;
+ }
+ }
+
+ if (rc != 1)
+ {
+ printf("Col=%u Comp=%u BitNum=%u :"
+ "nextResult failure %d\n", col, comp, bitNum, rc);
+ APIERROR(myTrans->getNdbError());
+ }
+
+ //printf("Column %u, Comp=%u bitNum=%u num expectedResults=%u matchCount=%u\n",
+ // col, comp, bitNum, expectedResultCount, matchCount);
+
+ /* Check that we didn't miss any expected rows */
+ if (matchCount != expectedResultCount)
+ {
+ printf("Col=%u Comp=%u BitNum=%u :"
+ "Mismatch between expected(%u) and received(%u) result counts\n",
+ col, comp, bitNum, expectedResultCount, matchCount);
+ myTrans->close();
+ return NDBT_FAILED;
+ }
+
+ if (myTrans->execute(NdbTransaction::Commit) != 0)
+ APIERROR(myTrans->getNdbError());
+
+ myTrans->close();
+
+ scanCount++;
+
+ } // for bitNum
+ } // for comparison
+ } // for column
+
+ printf("Verified %u scans with bitfield ScanFilter conditions\n",
+ scanCount);
+
+ return NDBT_OK;
+}
+
+
+int runTestScanFilterBit(NDBT_Context* ctx, NDBT_Step* step)
+{
+ /* Create table */
+ Ndb *pNdb = GETNDB(step);
+ pNdb->getDictionary()->dropTable(MYTAB2.getName());
+ int ret = createTable(pNdb, &MYTAB2, false, true, 0);
+ if(ret)
+ return ret;
+
+ /* Populate with data */
+ if (insertBitRows(pNdb) != NDBT_OK)
+ return NDBT_FAILED;
+
+ /* Initial data check via scan */
+ if (verifyBitData(pNdb) != NDBT_OK)
+ return NDBT_FAILED;
+
+ /* Verify Bit ScanFilter correctness */
+ if (verifyBitScanFilter(pNdb) != NDBT_OK)
+ return NDBT_FAILED;
+
+ /* Drop table */
+ pNdb->getDictionary()->dropTable(MYTAB2.getName());
+
+ return NDBT_OK;
+}
+
+
NDBT_TESTSUITE(testScanFilter);
TESTCASE(TEST_NAME,
"Scan table TABLE_NAME for the records which accord with \
@@ -1025,6 +1650,12 @@ TESTCASE(TEST_NAME,
FINALIZER(runDropTables);
}
+TESTCASE("TestScanFilterBit",
+ "Test ScanFilter with bitfield columns")
+{
+ INITIALIZER(runTestScanFilterBit);
+}
+
NDBT_TESTSUITE_END(testScanFilter);
@@ -1038,5 +1669,5 @@ int main(int argc, const char** argv)
return NDBT_ProgramExit(NDBT_FAILED);
}
- return testScanFilter.executeOneCtx(con, &MYTAB1, TEST_NAME);
+ return testScanFilter.execute(argc, argv);
}
=== modified file 'storage/ndb/test/run-test/daily-basic-tests.txt'
--- a/storage/ndb/test/run-test/daily-basic-tests.txt 2008-12-08 13:39:11 +0000
+++ b/storage/ndb/test/run-test/daily-basic-tests.txt 2008-12-09 15:47:14 +0000
@@ -1179,7 +1179,7 @@ args -n bug37672 T1
#EOF 2008-07-04
max-time: 500
-cmd: testScanFilter
+cmd: testScanFilter T1
args:
#EOF 2008-07-09
| Thread |
|---|
| • bzr push into mysql-5.1 branch (frazer:2785 to 2786) Bug#40535 | Frazer Clement | 9 Dec |