Below is the list of changes that have just been committed into a local
5.1 repository of jonas. When jonas does a push these changes will
be propagated to the main repository and, within 24 hours after the
push, to the public repository.
For information on how to access the public repository
see http://dev.mysql.com/doc/mysql/en/installing-source-tree.html
ChangeSet@stripped, 2006-11-19 20:28:21+01:00, jonas@stripped +4 -0
ndb -
add more packing of result sets
specially bitfields
storage/ndb/src/kernel/blocks/dbtup/Dbtup.hpp@stripped, 2006-11-19 20:28:04+01:00, jonas@stripped +19 -13
Add extra packing when READ_PACKED
(specially bitfields)
storage/ndb/src/kernel/blocks/dbtup/DbtupRoutines.cpp@stripped, 2006-11-19 20:28:04+01:00, jonas@stripped +437 -127
Add extra packing
Change read function to use char* instead of Uint32
storage/ndb/src/ndbapi/NdbOperationExec.cpp@stripped, 2006-11-19 20:28:04+01:00, jonas@stripped +5 -1
Always use READ_PACKED if possible
storage/ndb/src/ndbapi/NdbReceiver.cpp@stripped, 2006-11-19 20:28:04+01:00, jonas@stripped +52 -12
Add extra packing which is alignment dependant
# This is a BitKeeper patch. What follows are the unified diffs for the
# set of deltas contained in the patch. The rest of the patch, the part
# that BitKeeper cares about, is below these diffs.
# User: jonas
# Host: perch.ndb.mysql.com
# Root: /home/jonas/src/mysql-5.1-wl2325-5.0-drop6
--- 1.34/storage/ndb/src/kernel/blocks/dbtup/Dbtup.hpp 2006-11-19 20:28:25 +01:00
+++ 1.35/storage/ndb/src/kernel/blocks/dbtup/Dbtup.hpp 2006-11-19 20:28:25 +01:00
@@ -348,7 +348,7 @@
friend class Suma;
public:
- typedef bool (Dbtup::* ReadFunction)(Uint32*,
+ typedef bool (Dbtup::* ReadFunction)(char*,
AttributeHeader*,
Uint32,
Uint32);
@@ -1463,7 +1463,7 @@
//------------------------------------------------------------------
//------------------------------------------------------------------
- bool readFixedSizeTHOneWordNotNULL(Uint32* outBuffer,
+ bool readFixedSizeTHOneWordNotNULL(char* outBuffer,
AttributeHeader* ahOut,
Uint32 attrDescriptor,
Uint32 attrDes2);
@@ -1476,7 +1476,7 @@
//------------------------------------------------------------------
//------------------------------------------------------------------
- bool readFixedSizeTHTwoWordNotNULL(Uint32* outBuffer,
+ bool readFixedSizeTHTwoWordNotNULL(char* outBuffer,
AttributeHeader* ahOut,
Uint32 attrDescriptor,
Uint32 attrDes2);
@@ -1489,7 +1489,7 @@
//------------------------------------------------------------------
//------------------------------------------------------------------
- bool readFixedSizeTHManyWordNotNULL(Uint32* outBuffer,
+ bool readFixedSizeTHManyWordNotNULL(char* outBuffer,
AttributeHeader* ahOut,
Uint32 attrDescriptor,
Uint32 attrDes2);
@@ -1502,7 +1502,7 @@
//------------------------------------------------------------------
//------------------------------------------------------------------
- bool readFixedSizeTHOneWordNULLable(Uint32* outBuffer,
+ bool readFixedSizeTHOneWordNULLable(char* outBuffer,
AttributeHeader* ahOut,
Uint32 attrDescriptor,
Uint32 attrDes2);
@@ -1515,7 +1515,7 @@
//------------------------------------------------------------------
//------------------------------------------------------------------
- bool readFixedSizeTHTwoWordNULLable(Uint32* outBuffer,
+ bool readFixedSizeTHTwoWordNULLable(char* outBuffer,
AttributeHeader* ahOut,
Uint32 attrDescriptor,
Uint32 attrDes2);
@@ -1528,7 +1528,7 @@
//------------------------------------------------------------------
//------------------------------------------------------------------
- bool readFixedSizeTHManyWordNULLable(Uint32* outBuffer,
+ bool readFixedSizeTHManyWordNULLable(char* outBuffer,
AttributeHeader* ahOut,
Uint32 attrDescriptor,
Uint32 attrDes2);
@@ -1542,16 +1542,21 @@
//------------------------------------------------------------------
//------------------------------------------------------------------
- bool readShortVarsizeNULLable(Uint32*, AttributeHeader*, Uint32, Uint32);
- bool readShortVarsizeNotNULL(Uint32*, AttributeHeader*, Uint32, Uint32);
- bool readLongVarsizeNULLable(Uint32*, AttributeHeader*, Uint32, Uint32);
- bool readLongVarsizeNotNULL(Uint32*, AttributeHeader*, Uint32, Uint32);
+ bool readFixedSizeCharNULLable(char*, AttributeHeader*, Uint32, Uint32);
+ bool readFixedSizeCharNotNULL(char*, AttributeHeader*, Uint32, Uint32);
+ bool readShortVarsizeNULLable(char*, AttributeHeader*, Uint32, Uint32);
+ bool readShortVarsizeNotNULL(char*, AttributeHeader*, Uint32, Uint32);
+ bool readLongVarsizeNULLable(char*, AttributeHeader*, Uint32, Uint32);
+ bool readLongVarsizeNotNULL(char*, AttributeHeader*, Uint32, Uint32);
- bool readBitsNULLable(Uint32* outBuffer, AttributeHeader*, Uint32, Uint32);
+ bool readBitsNULLable(char* outBuffer, AttributeHeader*, Uint32, Uint32);
bool updateBitsNULLable(Uint32* inBuffer, Uint32, Uint32);
- bool readBitsNotNULL(Uint32* outBuffer, AttributeHeader*, Uint32, Uint32);
+ bool readBitsNotNULL(char* outBuffer, AttributeHeader*, Uint32, Uint32);
bool updateBitsNotNULL(Uint32* inBuffer, Uint32, Uint32);
+ bool readBitNULLable(char* outBuffer, AttributeHeader*, Uint32, Uint32);
+ bool readBitNotNULL(char* outBuffer, AttributeHeader*, Uint32, Uint32);
+
//------------------------------------------------------------------
//------------------------------------------------------------------
bool nullFlagCheck(Uint32 attrDes2);
@@ -2166,6 +2171,7 @@
Uint32 tCheckOffset;
Uint32 tMaxRead;
Uint32 tOutBufIndex;
+ Uint32 tOutBufBits;
Uint32* tTupleHeader;
bool tXfrmFlag;
bool tPackVarsizeFlag;
--- 1.25/storage/ndb/src/kernel/blocks/dbtup/DbtupRoutines.cpp 2006-11-19 20:28:25 +01:00
+++ 1.26/storage/ndb/src/kernel/blocks/dbtup/DbtupRoutines.cpp 2006-11-19 20:28:25 +01:00
@@ -40,55 +40,114 @@
Uint32 type = AttributeDescriptor::getType(attrDescriptor);
Uint32 array = AttributeDescriptor::getArrayType(attrDescriptor);
Uint32 charset = AttributeOffset::getCharsetFlag(attrOffset);
+ Uint32 size = AttributeDescriptor::getSize(attrDescriptor);
+ Uint32 bytes = AttributeDescriptor::getSizeInBytes(attrDescriptor);
ndbrequire(array == ZNON_ARRAY || array == ZFIXED_ARRAY);
- if (!AttributeDescriptor::getNullable(attrDescriptor)) {
- if (AttributeDescriptor::getSize(attrDescriptor) == 0){
+ if (!AttributeDescriptor::getNullable(attrDescriptor))
+ {
+ switch(size){
+ case 0: // Bitfield
ljam();
- regTabPtr->readFunctionArray[i] = &Dbtup::readBitsNotNULL;
+ ndbrequire(charset == 0);
+ if (array == 1)
+ regTabPtr->readFunctionArray[i] = &Dbtup::readBitNotNULL;
+ else
+ regTabPtr->readFunctionArray[i] = &Dbtup::readBitsNotNULL;
regTabPtr->updateFunctionArray[i] = &Dbtup::updateBitsNotNULL;
- } else if (AttributeDescriptor::getSizeInWords(attrDescriptor) == 1){
- ljam();
- regTabPtr->readFunctionArray[i] = &Dbtup::readFixedSizeTHOneWordNotNULL;
- regTabPtr->updateFunctionArray[i] = &Dbtup::updateFixedSizeTHOneWordNotNULL;
- } else if (AttributeDescriptor::getSizeInWords(attrDescriptor) == 2) {
- ljam();
- regTabPtr->readFunctionArray[i] = &Dbtup::readFixedSizeTHTwoWordNotNULL;
- regTabPtr->updateFunctionArray[i] = &Dbtup::updateFixedSizeTHTwoWordNotNULL;
- } else {
+ break;
+ case 3: // char
+ case 4: // short
ljam();
- regTabPtr->readFunctionArray[i] = &Dbtup::readFixedSizeTHManyWordNotNULL;
+ regTabPtr->readFunctionArray[i] = &Dbtup::readFixedSizeCharNotNULL;
regTabPtr->updateFunctionArray[i] = &Dbtup::updateFixedSizeTHManyWordNotNULL;
- }
- // replace functions for char attribute
- if (charset) {
+ break;
+ case 5: // word
+ case 6: // 64-bit
+ case 7: // 128-bit
ljam();
- regTabPtr->readFunctionArray[i] = &Dbtup::readFixedSizeTHManyWordNotNULL;
- regTabPtr->updateFunctionArray[i] = &Dbtup::updateFixedSizeTHManyWordNotNULL;
+ ndbrequire(charset == 0);
+ switch(bytes){
+ case 4:
+ ljam();
+ regTabPtr->readFunctionArray[i] =
+ &Dbtup::readFixedSizeTHOneWordNotNULL;
+ regTabPtr->updateFunctionArray[i] =
+ &Dbtup::updateFixedSizeTHOneWordNotNULL;
+ break;
+ case 8:
+ ljam();
+ regTabPtr->readFunctionArray[i] =
+ &Dbtup::readFixedSizeTHTwoWordNotNULL;
+ regTabPtr->updateFunctionArray[i] =
+ &Dbtup::updateFixedSizeTHManyWordNotNULL;
+ break;
+ default:
+ ljam();
+ regTabPtr->readFunctionArray[i] =
+ &Dbtup::readFixedSizeTHManyWordNotNULL;
+ regTabPtr->updateFunctionArray[i] =
+ &Dbtup::updateFixedSizeTHManyWordNotNULL;
+ break;
+ }
+ break;
+ default:
+ ljam();
+ jamLine(size);
+ ndbrequire(false);
}
- } else {
- if (AttributeDescriptor::getSize(attrDescriptor) == 0){
+ }
+ else
+ {
+ switch(size){
+ case 0: // Bitfield
ljam();
- regTabPtr->readFunctionArray[i] = &Dbtup::readBitsNULLable;
+ ndbrequire(charset == 0);
+ if (array == 1)
+ regTabPtr->readFunctionArray[i] = &Dbtup::readBitNULLable;
+ else
+ regTabPtr->readFunctionArray[i] = &Dbtup::readBitsNULLable;
regTabPtr->updateFunctionArray[i] = &Dbtup::updateBitsNULLable;
- } else if (AttributeDescriptor::getSizeInWords(attrDescriptor) == 1){
+ break;
+ case 3: // char
+ case 4: // short
ljam();
- regTabPtr->readFunctionArray[i] = &Dbtup::readFixedSizeTHOneWordNULLable;
+ regTabPtr->readFunctionArray[i] = &Dbtup::readFixedSizeCharNULLable;
regTabPtr->updateFunctionArray[i] = &Dbtup::updateFixedSizeTHManyWordNULLable;
- } else if (AttributeDescriptor::getSizeInWords(attrDescriptor) == 2) {
+ break;
+ case 5: // word
+ case 6: // 64-bit
+ case 7: // 128-bit
ljam();
- regTabPtr->readFunctionArray[i] = &Dbtup::readFixedSizeTHTwoWordNULLable;
- regTabPtr->updateFunctionArray[i] = &Dbtup::updateFixedSizeTHManyWordNULLable;
- } else {
+ ndbrequire(charset == 0);
+ switch(bytes){
+ case 4:
+ ljam();
+ regTabPtr->readFunctionArray[i] =
+ &Dbtup::readFixedSizeTHOneWordNULLable;
+ regTabPtr->updateFunctionArray[i] =
+ &Dbtup::updateFixedSizeTHManyWordNULLable;
+ break;
+ case 8:
+ ljam();
+ regTabPtr->readFunctionArray[i] =
+ &Dbtup::readFixedSizeTHTwoWordNULLable;
+ regTabPtr->updateFunctionArray[i] =
+ &Dbtup::updateFixedSizeTHManyWordNULLable;
+ break;
+ default:
+ ljam();
+ regTabPtr->readFunctionArray[i] =
+ &Dbtup::readFixedSizeTHManyWordNULLable;
+ regTabPtr->updateFunctionArray[i] =
+ &Dbtup::updateFixedSizeTHManyWordNULLable;
+ break;
+ }
+ break;
+ default:
ljam();
- regTabPtr->readFunctionArray[i] = &Dbtup::readFixedSizeTHManyWordNULLable;
- regTabPtr->updateFunctionArray[i] = &Dbtup::updateFixedSizeTHManyWordNULLable;
- }
- // replace functions for char attribute
- if (charset) {
- ljam();
- regTabPtr->readFunctionArray[i] = &Dbtup::readFixedSizeTHManyWordNULLable;
- regTabPtr->updateFunctionArray[i] = &Dbtup::updateFixedSizeTHManyWordNULLable;
+ jamLine(size);
+ ndbrequire(false);
}
}
@@ -113,6 +172,33 @@
}
}//Dbtup::setUpQueryRoutines()
+static
+inline
+Uint32
+pad32(Uint32 bytepos, Uint32 bitsused)
+{
+ if (bitsused)
+ {
+ assert((bytepos & 3) == 0);
+ }
+ Uint32 ret = 4 * ((bitsused + 31 >> 5)) +
+ ((bytepos + 3) & ~(Uint32)3);
+ return ret;
+}
+
+static
+inline
+Uint32
+pad_byte(Uint32 bytepos, Uint32 bitsused)
+{
+ if (bitsused)
+ {
+ assert((bytepos & 3) == 0);
+ }
+ Uint32 ret = bytepos + 4 * ((bitsused + 31 >> 5));
+ return ret;
+}
+
/* ---------------------------------------------------------------- */
/* THIS ROUTINE IS USED TO READ A NUMBER OF ATTRIBUTES IN THE */
/* DATABASE AND PLACE THE RESULT IN ATTRINFO RECORDS. */
@@ -140,14 +226,17 @@
ndbrequire(attrDescriptorStart + (numAttributes << ZAD_LOG_SIZE) <= cnoOfTabDescrRec);
tOutBufIndex = 0;
+ tOutBufBits = 0;
tCheckOffset = regTabPtr->tupheadsize;
- tMaxRead = maxRead;
+ tMaxRead = 4*maxRead;
tTupleHeader = &pagePtr->pageWord[tupHeadOffset];
tXfrmFlag = xfrmFlag;
tPackVarsizeFlag = false;
+ char* outBuf = (char*)outBuffer;
ndbrequire(tupHeadOffset + tCheckOffset <= ZWORDS_ON_PAGE);
while (inBufIndex < inBufLen) {
+ Uint32 tmpBitsUsed = tOutBufBits;
Uint32 tmpAttrBufIndex = tOutBufIndex;
AttributeHeader ahIn(inBuffer[inBufIndex]);
inBufIndex++;
@@ -155,14 +244,16 @@
Uint32 attrDescriptorIndex = attrDescriptorStart + (attributeId << ZAD_LOG_SIZE);
ljam();
- AttributeHeader::init(&outBuffer[tmpAttrBufIndex], attributeId, 0);
- AttributeHeader* ahOut = (AttributeHeader*)&outBuffer[tmpAttrBufIndex];
- tOutBufIndex = tmpAttrBufIndex + 1;
+ tmpAttrBufIndex = pad32(tmpAttrBufIndex, tmpBitsUsed);
+ AttributeHeader::init(&outBuf[tmpAttrBufIndex], attributeId, 0);
+ AttributeHeader* ahOut = (AttributeHeader*)&outBuf[tmpAttrBufIndex];
+ tOutBufBits = 0;
+ tOutBufIndex = tmpAttrBufIndex + 4;
if (attributeId < numAttributes) {
Uint32 attributeDescriptor = tableDescriptor[attrDescriptorIndex].tabDescr;
Uint32 attributeOffset = tableDescriptor[attrDescriptorIndex + 1].tabDescr;
ReadFunction f = regTabPtr->readFunctionArray[attributeId];
- if ((this->*f)(outBuffer,
+ if ((this->*f)(outBuf,
ahOut,
attributeDescriptor,
attributeOffset)) {
@@ -172,21 +263,23 @@
}//if
} else if(attributeId & AttributeHeader::PSEUDO){
Uint32 sz = read_pseudo(inBuffer, inBufIndex, inBufLen,
- outBuffer+tmpAttrBufIndex);
+ (Uint32*)(outBuf+tmpAttrBufIndex));
inBufIndex += sz;
} else {
terrorCode = ZATTRIBUTE_ID_ERROR;
return -1;
}//if
}//while
- return tOutBufIndex;
+ return pad32(tOutBufIndex, tOutBufBits) >> 2;
}//Dbtup::readAttributes()
Uint32
Dbtup::read_packed(const Uint32* inBuf,
Uint32 idx, Uint32 len,
- Uint32* outBuf)
+ Uint32* outBuffer)
{
+ ndbassert((UintPtr(outBuffer) & 3) == 0);
+
Tablerec* const regTabPtr = tabptr.p;
Uint32 cnt;
Uint32 numAttributes = regTabPtr->noOfAttr;
@@ -218,13 +311,14 @@
Uint32 nullcnt = nullable.count();
Uint32 masksz = (cnt + nullcnt + 31) >> 5;
- Uint32* dst = outBuf + 1 + masksz;
- Uint32* dstmask = outBuf + 1;
- AttributeHeader::init(outBuf, AttributeHeader::READ_PACKED, 4*masksz);
+ Uint32* dstmask = outBuffer + 1;
+ AttributeHeader::init(outBuffer, AttributeHeader::READ_PACKED, 4*masksz);
bzero(dstmask, 4*masksz);
AttributeHeader ahOut;
- tOutBufIndex = tOutBufIndex + masksz;
+ char* outBuf = (char*)(outBuffer + 1 - (outIdx >> 2));
+ tOutBufIndex = pad32(outIdx, 0) + 4*masksz;
+ tOutBufBits = 0;
if (likely(outIdx + masksz <= maxRead))
{
ljam();
@@ -259,6 +353,8 @@
}
}
tPackVarsizeFlag = false;
+ tOutBufIndex = pad32(tOutBufIndex, tOutBufBits);
+ tOutBufBits = 0;
return bmlen32;
}
}
@@ -269,22 +365,27 @@
}
bool
-Dbtup::readFixedSizeTHOneWordNotNULL(Uint32* outBuffer,
+Dbtup::readFixedSizeTHOneWordNotNULL(char* outBuffer,
AttributeHeader* ahOut,
Uint32 attrDescriptor,
Uint32 attrDes2)
{
Uint32 indexBuf = tOutBufIndex;
+ Uint32 bitsUsed = tOutBufBits;
+ Uint32 maxRead = tMaxRead;
Uint32 readOffset = AttributeOffset::getOffset(attrDes2);
Uint32 const wordRead = tTupleHeader[readOffset];
- Uint32 newIndexBuf = indexBuf + 1;
- Uint32 maxRead = tMaxRead;
+
+ indexBuf = pad32(indexBuf, bitsUsed);
+ Uint32 newIndexBuf = indexBuf + 4;
+ Uint32* dst = (Uint32*)(outBuffer + indexBuf);
ndbrequire(readOffset < tCheckOffset);
if (newIndexBuf <= maxRead) {
ljam();
- outBuffer[indexBuf] = wordRead;
+ dst[0] = wordRead;
ahOut->setDataSize(1);
+ tOutBufBits = 0;
tOutBufIndex = newIndexBuf;
return true;
} else {
@@ -295,24 +396,29 @@
}//Dbtup::readFixedSizeTHOneWordNotNULL()
bool
-Dbtup::readFixedSizeTHTwoWordNotNULL(Uint32* outBuffer,
+Dbtup::readFixedSizeTHTwoWordNotNULL(char* outBuffer,
AttributeHeader* ahOut,
Uint32 attrDescriptor,
Uint32 attrDes2)
{
Uint32 indexBuf = tOutBufIndex;
+ Uint32 bitsUsed = tOutBufBits;
+ Uint32 maxRead = tMaxRead;
Uint32 readOffset = AttributeOffset::getOffset(attrDes2);
Uint32 const wordReadFirst = tTupleHeader[readOffset];
Uint32 const wordReadSecond = tTupleHeader[readOffset + 1];
- Uint32 newIndexBuf = indexBuf + 2;
- Uint32 maxRead = tMaxRead;
+
+ indexBuf = pad32(indexBuf, bitsUsed);
+ Uint32 newIndexBuf = indexBuf + 8;
+ Uint32* dst = (Uint32*)(outBuffer + indexBuf);
ndbrequire(readOffset + 1 < tCheckOffset);
if (newIndexBuf <= maxRead) {
ljam();
ahOut->setDataSize(2);
- outBuffer[indexBuf] = wordReadFirst;
- outBuffer[indexBuf + 1] = wordReadSecond;
+ dst[0] = wordReadFirst;
+ dst[1] = wordReadSecond;
+ tOutBufBits = 0;
tOutBufIndex = newIndexBuf;
return true;
} else {
@@ -322,27 +428,67 @@
}//if
}//Dbtup::readFixedSizeTHTwoWordNotNULL()
+
bool
-Dbtup::readFixedSizeTHManyWordNotNULL(Uint32* outBuffer,
+Dbtup::readFixedSizeTHManyWordNotNULL(char* outBuffer,
AttributeHeader* ahOut,
Uint32 attrDescriptor,
Uint32 attrDes2)
{
Uint32 indexBuf = tOutBufIndex;
- Uint32 charsetFlag = AttributeOffset::getCharsetFlag(attrDes2);
+ Uint32 bitsUsed = tOutBufBits;
+ Uint32 maxRead = tMaxRead;
Uint32 readOffset = AttributeOffset::getOffset(attrDes2);
Uint32 attrNoOfWords = AttributeDescriptor::getSizeInWords(attrDescriptor);
+
+ indexBuf = pad32(indexBuf, bitsUsed);
+ Uint32 newIndexBuf = indexBuf + 4*attrNoOfWords;
+ ndbrequire((readOffset + (attrNoOfWords - 1)) < tCheckOffset);
+
+ if (newIndexBuf <= maxRead)
+ {
+ ljam();
+ ahOut->setDataSize(attrNoOfWords);
+ MEMCOPY_NO_WORDS(&outBuffer[indexBuf],
+ &tTupleHeader[readOffset],
+ attrNoOfWords);
+ tOutBufBits = 0;
+ tOutBufIndex = newIndexBuf;
+ return true;
+ } else {
+ ljam();
+ terrorCode = ZTRY_TO_READ_TOO_MUCH_ERROR;
+ return false;
+ }//if
+}//Dbtup::readFixedSizeTHManyWordNotNULL()
+
+
+bool
+Dbtup::readFixedSizeCharNotNULL(char* outBuffer,
+ AttributeHeader* ahOut,
+ Uint32 attrDescriptor,
+ Uint32 attrDes2)
+{
+ Uint32 indexBuf = tOutBufIndex;
+ Uint32 bitsUsed = tOutBufBits;
Uint32 maxRead = tMaxRead;
+ Uint32 charsetFlag = AttributeOffset::getCharsetFlag(attrDes2);
+ Uint32 readOffset = AttributeOffset::getOffset(attrDes2);
+ Uint32 attrBytes = AttributeDescriptor::getSizeInBytes(attrDescriptor);
+ Uint32 attrNoOfWords = (attrBytes + 3) >> 2;
- ndbrequire((readOffset + attrNoOfWords - 1) < tCheckOffset);
+ indexBuf = pad_byte(indexBuf, bitsUsed);
+ ndbrequire((readOffset + (attrNoOfWords - 1)) < tCheckOffset);
if (! charsetFlag || ! tXfrmFlag) {
- Uint32 newIndexBuf = indexBuf + attrNoOfWords;
- if (newIndexBuf <= maxRead) {
+ Uint32 newIndexBuf = indexBuf + attrBytes;
+ if (newIndexBuf <= maxRead)
+ {
ljam();
- ahOut->setDataSize(attrNoOfWords);
+ ahOut->setByteSize(attrBytes);
MEMCOPY_NO_WORDS(&outBuffer[indexBuf],
&tTupleHeader[readOffset],
attrNoOfWords);
+ tOutBufBits = 0;
tOutBufIndex = newIndexBuf;
return true;
} else {
@@ -352,7 +498,7 @@
} else {
ljam();
Tablerec* regTabPtr = tabptr.p;
- Uint32 srcBytes = AttributeDescriptor::getSizeInBytes(attrDescriptor);
+ Uint32 srcBytes = attrBytes;
uchar* dstPtr = (uchar*)&outBuffer[indexBuf];
const uchar* srcPtr = (uchar*)&tTupleHeader[readOffset];
Uint32 i = AttributeOffset::getCharsetPos(attrDes2);
@@ -367,18 +513,20 @@
xmul = 1;
// see comment in DbtcMain.cpp
Uint32 dstLen = xmul * (srcBytes - lb);
- Uint32 maxIndexBuf = indexBuf + (dstLen >> 2);
- if (maxIndexBuf <= maxRead) {
+ Uint32 maxIndexBuf = indexBuf + lb + dstLen;
+ if (maxIndexBuf <= maxRead)
+ {
ljam();
- int n = NdbSqlUtil::strnxfrm_bug7284(cs, dstPtr, dstLen, srcPtr + lb, len);
+ const int n = NdbSqlUtil::strnxfrm_bug7284(cs, dstPtr, dstLen,
+ srcPtr + lb, len);
ndbrequire(n != -1);
- while ((n & 3) != 0) {
- dstPtr[n++] = 0;
+ Uint32 m = n;
+ while ((m & 3) != 0) {
+ dstPtr[m++] = 0;
}
- Uint32 dstWords = (n >> 2);
- ahOut->setDataSize(dstWords);
- Uint32 newIndexBuf = indexBuf + dstWords;
- ndbrequire(newIndexBuf <= maxRead);
+ ahOut->setByteSize(n);
+ Uint32 newIndexBuf = indexBuf + n;
+ tOutBufBits = 0;
tOutBufIndex = newIndexBuf;
return true;
} else {
@@ -391,10 +539,10 @@
}
}
return false;
-}//Dbtup::readFixedSizeTHManyWordNotNULL()
+}
bool
-Dbtup::readFixedSizeTHOneWordNULLable(Uint32* outBuffer,
+Dbtup::readFixedSizeTHOneWordNULLable(char* outBuffer,
AttributeHeader* ahOut,
Uint32 attrDescriptor,
Uint32 attrDes2)
@@ -413,7 +561,7 @@
}//Dbtup::readFixedSizeTHOneWordNULLable()
bool
-Dbtup::readFixedSizeTHTwoWordNULLable(Uint32* outBuffer,
+Dbtup::readFixedSizeTHTwoWordNULLable(char* outBuffer,
AttributeHeader* ahOut,
Uint32 attrDescriptor,
Uint32 attrDes2)
@@ -432,7 +580,7 @@
}//Dbtup::readFixedSizeTHTwoWordNULLable()
bool
-Dbtup::readFixedSizeTHManyWordNULLable(Uint32* outBuffer,
+Dbtup::readFixedSizeTHManyWordNULLable(char* outBuffer,
AttributeHeader* ahOut,
Uint32 attrDescriptor,
Uint32 attrDes2)
@@ -451,6 +599,25 @@
}//Dbtup::readFixedSizeTHManyWordNULLable()
bool
+Dbtup::readFixedSizeCharNULLable(char* outBuffer,
+ AttributeHeader* ahOut,
+ Uint32 attrDescriptor,
+ Uint32 attrDes2)
+{
+ if (!nullFlagCheck(attrDes2)) {
+ ljam();
+ return readFixedSizeCharNotNULL(outBuffer,
+ ahOut,
+ attrDescriptor,
+ attrDes2);
+ } else {
+ ljam();
+ ahOut->setNULL();
+ return true;
+ }//if
+}//Dbtup::readFixedSizeTHManyWordNULLable()
+
+bool
Dbtup::nullFlagCheck(Uint32 attrDes2)
{
Tablerec* const regTabPtr = tabptr.p;
@@ -463,7 +630,7 @@
}//Dbtup::nullFlagCheck()
bool
-Dbtup::readShortVarsizeNULLable(Uint32* outBuffer,
+Dbtup::readShortVarsizeNULLable(char* outBuffer,
AttributeHeader* ahOut,
Uint32 attrDescriptor,
Uint32 attrDes2)
@@ -483,27 +650,27 @@
}
bool
-Dbtup::readShortVarsizeNotNULL(Uint32* outBuffer,
+Dbtup::readShortVarsizeNotNULL(char* outBuffer,
AttributeHeader* ahOut,
Uint32 attrDescriptor,
Uint32 attrDes2)
{
const Uint32 pack = tPackVarsizeFlag;
const Uint32 indexBuf = tOutBufIndex;
+ const Uint32 bitsUsed = tOutBufBits;
Uint8* ptr = (Uint8*)(outBuffer + indexBuf);
- if (readFixedSizeTHManyWordNotNULL(outBuffer, ahOut,
- attrDescriptor, attrDes2))
+ if (readFixedSizeCharNotNULL(outBuffer, ahOut,
+ attrDescriptor, attrDes2))
{
- Uint32 oldlen32 = ahOut->getDataSize();
+ Uint32 oldlen = ahOut->getByteSize();
if (pack)
{
ljam();
Uint32 len = 1 + ptr[0];
- Uint32 len32 = (len + 3) >> 2;
- ahOut->setDataSize(len32);
- ndbassert(len32 <= oldlen32);
- ndbassert(tOutBufIndex == indexBuf + oldlen32);
- tOutBufIndex = indexBuf + len32;
+ ahOut->setByteSize(len);
+ ndbassert(len <= oldlen);
+ ndbassert(tOutBufIndex == indexBuf + oldlen);
+ tOutBufIndex = indexBuf + len;
}
return true;
}
@@ -511,7 +678,7 @@
}
bool
-Dbtup::readLongVarsizeNULLable(Uint32* outBuffer,
+Dbtup::readLongVarsizeNULLable(char* outBuffer,
AttributeHeader* ahOut,
Uint32 attrDescriptor,
Uint32 attrDes2)
@@ -531,25 +698,27 @@
}
bool
-Dbtup::readLongVarsizeNotNULL(Uint32* outBuffer,
+Dbtup::readLongVarsizeNotNULL(char* outBuffer,
AttributeHeader* ahOut,
Uint32 attrDescriptor,
Uint32 attrDes2)
{
const Uint32 pack = tPackVarsizeFlag;
const Uint32 indexBuf = tOutBufIndex;
+ const Uint32 bitsUsed = tOutBufBits;
Uint8* ptr = (Uint8*)(outBuffer + indexBuf);
- if (readFixedSizeTHManyWordNotNULL(outBuffer, ahOut,
- attrDescriptor, attrDes2))
+ if (readFixedSizeCharNotNULL(outBuffer, ahOut,
+ attrDescriptor, attrDes2))
{
- Uint32 oldlen32 = ahOut->getDataSize();
+ Uint32 oldlen = ahOut->getByteSize();
if (pack)
{
ljam();
Uint32 len = 2 + ptr[0] + 256*ptr[1];
- Uint32 len32 = (len + 3) >> 2;
- ahOut->setDataSize(len32);
- tOutBufIndex = indexBuf + len32;
+ ahOut->setByteSize(len);
+ ndbassert(len <= oldlen);
+ ndbassert(tOutBufIndex == indexBuf + oldlen);
+ tOutBufIndex = indexBuf + len;
}
return true;
}
@@ -637,18 +806,20 @@
AttributeHeader::init(&attributeHeader, attributeId, 0);
tOutBufIndex = 0;
+ tOutBufBits = 0;
tMaxRead = MAX_KEY_SIZE_IN_WORDS;
bool tmp = tXfrmFlag;
tXfrmFlag = false;
- ndbrequire((this->*f)(&keyReadBuffer[0], ahOut, attrDescriptor, attributeOffset));
+ ndbrequire((this->*f)((char*)keyReadBuffer, ahOut,
+ attrDescriptor, attributeOffset));
tXfrmFlag = tmp;
- ndbrequire(tOutBufIndex == ahOut->getDataSize());
+ ndbrequire(pad32(tOutBufIndex, tOutBufBits) == 4*ahOut->getDataSize());
if (ahIn.getDataSize() != ahOut->getDataSize()) {
ljam();
return true;
}//if
- if (memcmp(&keyReadBuffer[0], &updateBuffer[1], tOutBufIndex << 2) != 0) {
+ if (memcmp(&keyReadBuffer[0], &updateBuffer[1], tOutBufIndex) != 0) {
ljam();
return true;
}//if
@@ -839,6 +1010,7 @@
Uint32* outBuffer)
{
ndbassert(idx);
+ ndbassert((UintPtr(outBuffer) & 3) == 0);
idx--;
AttributeHeader ah(* (inBuf+idx));
Uint32 attrId = ah.getAttributeId();
@@ -852,7 +1024,7 @@
return read_packed(inBuf, idx, len, outBuffer);
case AttributeHeader::FRAGMENT:
outBuffer[1] = operPtr.p->fragId >> 1; // remove "hash" bit
- sz = 1;
+ sz = 4;
break;
case AttributeHeader::FRAGMENT_MEMORY:{
Uint64 tmp = 0;
@@ -870,11 +1042,11 @@
tmp *= 32768;
memcpy(outBuffer+1,&tmp,8);
}
- sz = 2;
+ sz = 8;
break;
case AttributeHeader::ROW_SIZE:
outBuffer[1] = tabptr.p->tupheadsize << 2;
- sz = 1;
+ sz = 4;
break;
case AttributeHeader::ROW_COUNT:
case AttributeHeader::COMMIT_COUNT:
@@ -884,7 +1056,7 @@
EXECUTE_DIRECT(DBLQH, GSN_READ_PSEUDO_REQ, signal, 2);
outBuffer[1] = signal->theData[0];
outBuffer[2] = signal->theData[1];
- sz = 2;
+ sz = 8;
break;
case AttributeHeader::RANGE_NO:
signal->theData[0] = operPtr.p->userpointer;
@@ -892,7 +1064,7 @@
EXECUTE_DIRECT(DBLQH, GSN_READ_PSEUDO_REQ, signal, 2);
outBuffer[1] = signal->theData[0];
- sz = 1;
+ sz = 4;
break;
case AttributeHeader::RECORDS_IN_RANGE:
signal->theData[0] = operPtr.p->userpointer;
@@ -903,20 +1075,60 @@
outBuffer[2] = signal->theData[1];
outBuffer[3] = signal->theData[2];
outBuffer[4] = signal->theData[3];
- sz = 4;
+ sz = 16;
break;
default:
sz = 0;
break;
}
- AttributeHeader::init(outBuffer, attrId, sz << 2);
+ AttributeHeader::init(outBuffer, attrId, sz);
tOutBufIndex = outBufIndex + sz;
return 0;
}
bool
-Dbtup::readBitsNotNULL(Uint32* outBuffer,
+Dbtup::readBitNotNULL(char* outBuffer,
+ AttributeHeader* ahOut,
+ Uint32 attrDescriptor,
+ Uint32 attrDes2)
+{
+ Tablerec* const regTabPtr = tabptr.p;
+ Uint32 pos = AttributeOffset::getNullFlagPos(attrDes2);
+ ndbassert(AttributeDescriptor::getArraySize(attrDescriptor) == 1);
+ Uint32 indexBuf = tOutBufIndex;
+ Uint32 bitsUsed = tOutBufBits;
+ Uint32 maxRead = tMaxRead;
+
+ indexBuf = pad32(indexBuf, 0);
+ Uint32 *outBuf = (Uint32*)(outBuffer + indexBuf);
+ Uint32 newIndexBuf = indexBuf + 4 * ((bitsUsed + 1) >> 5);
+ Uint32 val = outBuf[0];
+ Uint32 mask = (1 << bitsUsed) - 1;
+ val &= mask;
+ val |= BitmaskImpl::get(regTabPtr->tupNullWords,
+ tTupleHeader+regTabPtr->tupNullIndex,
+ pos) << bitsUsed;
+
+ if (newIndexBuf <= maxRead)
+ {
+ ljam();
+ ahOut->setDataSize(1);
+ outBuf[0] = val;
+ tOutBufBits = (bitsUsed + 1) & 31;
+ tOutBufIndex = newIndexBuf;
+ return true;
+ }
+ else
+ {
+ ljam();
+ terrorCode = ZTRY_TO_READ_TOO_MUCH_ERROR;
+ return false;
+ }//if
+}
+
+bool
+Dbtup::readBitNULLable(char* outBuffer,
AttributeHeader* ahOut,
Uint32 attrDescriptor,
Uint32 attrDes2)
@@ -924,20 +1136,94 @@
Tablerec* const regTabPtr = tabptr.p;
Uint32 pos = AttributeOffset::getNullFlagPos(attrDes2);
Uint32 bitCount = AttributeDescriptor::getArraySize(attrDescriptor);
+
Uint32 indexBuf = tOutBufIndex;
- Uint32 newIndexBuf = indexBuf + ((bitCount + 31) >> 5);
+ Uint32 bitsUsed = tOutBufBits;
Uint32 maxRead = tMaxRead;
+ indexBuf = pad32(indexBuf, 0);
- if (newIndexBuf <= maxRead) {
+ if(BitmaskImpl::get(regTabPtr->tupNullWords,
+ tTupleHeader+regTabPtr->tupNullIndex,
+ pos))
+ {
ljam();
- ahOut->setDataSize((bitCount + 31) >> 5);
- tOutBufIndex = newIndexBuf;
-
- BitmaskImpl::getField(regTabPtr->tupNullWords,
+ ahOut->setNULL();
+ return true;
+ }
+
+ Uint32 *outBuf = (Uint32*)(outBuffer + indexBuf);
+ Uint32 newIndexBuf = indexBuf + 4 * ((bitsUsed + 1) >> 5);
+ Uint32 val = outBuf[0];
+ Uint32 mask = (1 << bitsUsed) - 1;
+ val &= mask;
+ val |= BitmaskImpl::get(regTabPtr->tupNullWords,
tTupleHeader+regTabPtr->tupNullIndex,
- pos,
- bitCount,
- outBuffer+indexBuf);
+ pos + 1) << bitsUsed;
+
+ if (newIndexBuf <= maxRead)
+ {
+ ljam();
+ ahOut->setDataSize(1);
+ outBuf[0] = val;
+ tOutBufBits = (bitsUsed + 1) & 31;
+ tOutBufIndex = newIndexBuf;
+ return true;
+ }
+ else
+ {
+ ljam();
+ terrorCode = ZTRY_TO_READ_TOO_MUCH_ERROR;
+ return false;
+ }//if
+}
+
+bool
+Dbtup::readBitsNotNULL(char* outBuf,
+ AttributeHeader* ahOut,
+ Uint32 attrDescriptor,
+ Uint32 attrDes2)
+{
+ Tablerec* const regTabPtr = tabptr.p;
+ Uint32 pos = AttributeOffset::getNullFlagPos(attrDes2);
+ Uint32 bitCount = AttributeDescriptor::getArraySize(attrDescriptor);
+ Uint32 indexBuf = tOutBufIndex;
+ Uint32 bitsUsed = tOutBufBits;
+ Uint32 maxRead = tMaxRead;
+
+ indexBuf = pad32(indexBuf, 0);
+ Uint32 newIndexBuf = indexBuf + 4 * ((bitsUsed + bitCount) >> 5);
+ Uint32* outBuffer = (Uint32*)(outBuf + indexBuf);
+
+ if (1 + newIndexBuf <= maxRead) {
+ ljam();
+ Uint32 sz = (bitCount + 31) >> 5;
+ ahOut->setDataSize(sz);
+
+ if (bitsUsed == 0)
+ {
+ BitmaskImpl::getField(regTabPtr->tupNullWords,
+ tTupleHeader+regTabPtr->tupNullIndex,
+ pos,
+ bitCount,
+ outBuffer);
+ }
+ else
+ {
+ BitmaskImpl::getField(regTabPtr->tupNullWords,
+ tTupleHeader+regTabPtr->tupNullIndex,
+ pos,
+ bitCount,
+ outBuffer + 2);
+
+ BitmaskImpl::setField(1+sz,
+ outBuffer,
+ bitsUsed,
+ bitCount,
+ outBuffer + 2);
+ }
+
+ tOutBufBits = (bitsUsed + bitCount) & 31;
+ tOutBufIndex = newIndexBuf;
return true;
} else {
@@ -948,7 +1234,7 @@
}
bool
-Dbtup::readBitsNULLable(Uint32* outBuffer,
+Dbtup::readBitsNULLable(char* outBuf,
AttributeHeader* ahOut,
Uint32 attrDescriptor,
Uint32 attrDes2)
@@ -956,11 +1242,14 @@
Tablerec* const regTabPtr = tabptr.p;
Uint32 pos = AttributeOffset::getNullFlagPos(attrDes2);
Uint32 bitCount = AttributeDescriptor::getArraySize(attrDescriptor);
-
Uint32 indexBuf = tOutBufIndex;
- Uint32 newIndexBuf = indexBuf + ((bitCount + 31) >> 5);
+ Uint32 bitsUsed = tOutBufBits;
Uint32 maxRead = tMaxRead;
-
+
+ indexBuf = pad32(indexBuf, 0);
+ Uint32 newIndexBuf = indexBuf + 4 * ((bitsUsed + bitCount) >> 5);
+ Uint32* outBuffer = (Uint32*)(outBuf + indexBuf);
+
if(BitmaskImpl::get(regTabPtr->tupNullWords,
tTupleHeader+regTabPtr->tupNullIndex,
pos))
@@ -969,17 +1258,38 @@
ahOut->setNULL();
return true;
}
-
-
- if (newIndexBuf <= maxRead) {
+
+ if (2+newIndexBuf <= maxRead) {
ljam();
- ahOut->setDataSize((bitCount + 31) >> 5);
+ Uint32 sz = (bitCount + 31) >> 5;
+ ahOut->setDataSize(sz);
+
+ if (bitsUsed == 0)
+ {
+ BitmaskImpl::getField(regTabPtr->tupNullWords,
+ tTupleHeader+regTabPtr->tupNullIndex,
+ pos + 1,
+ bitCount,
+ outBuffer);
+ }
+ else
+ {
+ BitmaskImpl::getField(regTabPtr->tupNullWords,
+ tTupleHeader+regTabPtr->tupNullIndex,
+ pos + 1,
+ bitCount,
+ outBuffer + 2);
+
+ BitmaskImpl::setField(1+sz,
+ outBuffer,
+ bitsUsed,
+ bitCount,
+ outBuffer + 2);
+ }
+
+ tOutBufBits = (bitsUsed + bitCount) & 31;
tOutBufIndex = newIndexBuf;
- BitmaskImpl::getField(regTabPtr->tupNullWords,
- tTupleHeader+regTabPtr->tupNullIndex,
- pos+1,
- bitCount,
- outBuffer+indexBuf);
+
return true;
} else {
ljam();
--- 1.23/storage/ndb/src/ndbapi/NdbOperationExec.cpp 2006-11-19 20:28:25 +01:00
+++ 1.24/storage/ndb/src/ndbapi/NdbOperationExec.cpp 2006-11-19 20:28:25 +01:00
@@ -130,7 +130,11 @@
}
const Uint32 newlen = 1 + (maxId >> 5);
const bool all = cols == save;
- if (check == 0 && (all || (newlen < save)))
+ if (check == 0
+#ifdef VM_TRACE
+ && !getenv("NOPACKED")
+#endif
+ )
{
assert(1+ MAXNROFATTRIBUTESINWORDS <= TcKeyReq::MaxAttrInfo);
--- 1.20/storage/ndb/src/ndbapi/NdbReceiver.cpp 2006-11-19 20:28:25 +01:00
+++ 1.21/storage/ndb/src/ndbapi/NdbReceiver.cpp 2006-11-19 20:28:25 +01:00
@@ -228,6 +228,31 @@
return start;
}
+static
+inline
+const Uint8*
+pad(const Uint8* src, Uint32 align, Uint32 bitPos)
+{
+ UintPtr ptr = UintPtr(src);
+ switch(align & 3){
+ case 0:
+ return (Uint8*)(((ptr + 3) & ~(UintPtr)3) + 4 * ((bitPos + 31) >> 5));
+ default:
+ return src + 4 * ((bitPos + 31) >> 5);
+ }
+}
+
+static
+void
+handle_packed_bit(const Uint8* _src, Uint32 pos, Uint32 len, NdbRecAttr* curr)
+{
+ Uint32 * src = (Uint32*)_src;
+ Uint32 * dst = (Uint32*)curr->aRef();
+ assert((UintPtr(dst) & 3) == 0);
+ assert((UintPtr(src) & 3) == 0);
+ BitmaskImpl::getField(1 + ((len + 31) >> 5), src, pos, len, dst);
+}
+
Uint32
NdbReceiver::receive_packed(NdbRecAttr** recAttr,
Uint32 bmlen,
@@ -235,15 +260,17 @@
Uint32 aLength)
{
NdbRecAttr* currRecAttr = *recAttr;
- const Uint32 *src = aDataPtr + bmlen;
+ const Uint8 *src = (Uint8*)(aDataPtr + bmlen);
+ Uint32 bitPos = 0;
for (Uint32 i = 0, attrId = 0; i<32*bmlen; i++, attrId++)
{
if (BitmaskImpl::get(bmlen, aDataPtr, i))
{
- const NdbDictionary::Column * col = currRecAttr->getColumn();
- const NdbDictionary::Column::Type type = col->getType();
- assert(attrId == col->getColumnNo());
- if (col->getNullable())
+ const NdbColumnImpl & col =
+ NdbColumnImpl::getImpl(* currRecAttr->getColumn());
+ const NdbDictionary::Column::Type type = col.m_type;
+ assert(attrId == col.m_attrId);
+ if (col.m_nullable)
{
if (BitmaskImpl::get(bmlen, aDataPtr, ++i))
{
@@ -252,27 +279,40 @@
continue;
}
}
- Uint32 sz = col->getSizeInBytes();
- Uint8* ptr = (Uint8*)src;
+ Uint32 align = col.m_attrSize;
+ Uint32 array = col.m_arraySize;
+ Uint32 len = col.m_length;
+ Uint32 sz = align * array;
switch(type){
+ case NdbDictionary::Column::Bit:
+ src = pad(src, 0, 0);
+ handle_packed_bit(src, bitPos, len, currRecAttr);
+ src += 4 * ((bitPos + len) >> 5);
+ bitPos = (bitPos + len) & 31;
+ goto next;
case NdbDictionary::Column::Varchar:
case NdbDictionary::Column::Varbinary:
- sz = 1 + ptr[0];
+ src = pad(src, align, bitPos);
+ sz = 1 + src[0];
break;
case NdbDictionary::Column::Longvarchar:
case NdbDictionary::Column::Longvarbinary:
- sz = 2 + ptr[0] + 256 * ptr[1];
+ src = pad(src, align, bitPos);
+ sz = 2 + src[0] + 256 * src[1];
break;
default:
+ src = pad(src, align, bitPos);
break;
}
- currRecAttr->receive_data(src, sz);
- src += (sz + 3) >> 2;
+
+ currRecAttr->receive_data((Uint32*)src, sz);
+ src += sz;
+ next:
currRecAttr = currRecAttr->next();
}
}
* recAttr = currRecAttr;
- return src - aDataPtr;
+ return ((Uint32*)pad(src, 0, bitPos)) - aDataPtr;
}
int
| Thread |
|---|
| • bk commit into 5.1 tree (jonas:1.2081) | jonas | 21 Nov |