#At file:///C:/Work/bzr/Merge/mysql-6.0-falcon-team/ based on revid:kevin.lewis@stripped
2702 Kevin Lewis 2009-05-12
Bug#43344 & Bug#44721
Second patch (comprehensive) with review changes
This series of changes closes many gaps in which a data buffer can get
thawed and then re-chilled almost immediately, which can cause the use
of a bad data buffer pointer. For 44344, it makes the act of chilling
and thawing records an atomic operation which uses CAS. Each call to
thaw returns a valid pointer to the buffer that was thawed so that it
can be used during the current cycle as long as the caller has a
CycleLock.
Record.h, Record.cpp, RecordVersion.h & RecordVersion.cpp;
The most significant change is that when a data buffer is chilled,
it receives the value (-1) and this happens atomically.
Record::state == recChilled is no longer used.
The record.data pointer is now always read once, checked, and
returned up the call stack to where it is used.
hasRecord() is changed from an inline function to a virtual function
for both Record and RecordVersion. hasRecord() and isChilled() are
added so that the record can check data.record itself.
setEncodedRecord() and thaw() return the pointer to the buffer
that they just created and put into data.record so that the caller
does not have to rely on that buffer being there later.
getEncodedValue() uses the recordData pointer of the caller instead
of getting it again from data.record.
getAllocatedSize() is added so that the number of bytes in a buffer
holding the record can be consistently known. It is used in several
places.
chillData() is similar to and replaces deleteData() during
SRLUpdateRecords::chill
isDeleted() is also added so that the common (state == recDeleted)
check can be done by the record itself. This is not fully
implemented yet because we need to get away from the over-use of
these state flags. They are somewhat unreliable on multiple CPUs
unless they are set with CAS.
getRecordData() is also changed from an inline to a virtual function.
findRecordData() is like getRecordData but without the thaw. It is
used in three places along the call stack for thawing a record to
check if another thread has already thawed that record. This is
not likely now because of the use of syncThaw. But syncThaw may
no longer be necessary.
isAllocated() is an inline function used about 6 times to determine
if a pointer is real or not.
SRLUpdateRecords.h & SRLUpdateRecords.cpp;
thaw() returns the data buffer thawed in addition to the number of
bytes reallocated.
chill() is not sent the dataLength anymore since it calculates that
itself.
SerialLog.h, SerialLog.cpp, Transaction.h, Transaction.cpp;
Since the number of bytes chilled and thawed stored in the SerialLog
and Transaction were being maintained but not used, I deleted those
variables and code used to maintain them. The number of allocated
bytes associated with a Transaction is stored in totalRecordData and
is still maintained.
Transaction.cpp
Even though backlogging is not currently active, the choice whether
to backlog any particular record needed to be enhanced. Along with
replacing hasRecord(false) with isChilled(), it should avoid
backlogging record chains that do not start at the base record.
That is checked with isSuperceded(). Also, backlogging should occur
for deleted records as well as newly chilled records.
modified:
storage/falcon/Record.cpp
storage/falcon/Record.h
storage/falcon/RecordVersion.cpp
storage/falcon/RecordVersion.h
storage/falcon/SRLUpdateRecords.cpp
storage/falcon/SRLUpdateRecords.h
storage/falcon/SerialLog.cpp
storage/falcon/SerialLog.h
storage/falcon/Table.cpp
storage/falcon/Transaction.cpp
storage/falcon/Transaction.h
=== modified file 'storage/falcon/Record.cpp'
--- a/storage/falcon/Record.cpp 2009-05-08 18:37:39 +0000
+++ b/storage/falcon/Record.cpp 2009-05-12 14:35:40 +0000
@@ -188,8 +188,7 @@ void Record::setValue(TransactionState *
switch (encoding)
{
case noEncoding:
- if (data.record != NULL)
- DELETE_RECORD (data.record);
+ deleteData(false);
data.record = (char*) NEW Value[format->count];
encoding = valueVector;
@@ -304,13 +303,15 @@ void Record::setValue(TransactionState *
int Record::getFormatVersion()
{
+ char* recordData = getRecordData();
+
switch (encoding)
{
case traditional:
- return *(short*) data.record;
+ return *(short*) recordData;
case shortVector:
- return -*(short*) (data.record + ((USHORT*) data.record)[0] - sizeof(short));
+ return -*(short*) (recordData + ((USHORT*) recordData)[0] - sizeof(short));
default:
NOT_YET_IMPLEMENTED;
@@ -363,9 +364,8 @@ void Record::getRawValue(int fieldId, Va
return;
// If chilled, restore the record data from the serial log
-
- if (state == recChilled)
- thaw();
+
+ char* recordData = getRecordData();
// If this is an encoded record, parse through the fields
@@ -374,11 +374,11 @@ void Record::getRawValue(int fieldId, Va
case byteVector:
case shortVector:
case longVector:
- getEncodedValue(fieldId, value);
+ getEncodedValue(recordData, fieldId, value);
return;
case valueVector:
- value->setValue(((Value*) data.record) + format->format[fieldId].physicalId, false);
+ value->setValue(((Value*) recordData) + format->format[fieldId].physicalId, false);
return;
case traditional:
@@ -390,10 +390,10 @@ void Record::getRawValue(int fieldId, Va
FieldFormat *ff = format->format + fieldId;
- if (!(data.record [NULL_BYTE (ff)] & NULL_BIT (ff)))
+ if (!(recordData [NULL_BYTE (ff)] & NULL_BIT (ff)))
return;
- char *ptr = data.record + ff->offset;
+ char *ptr = recordData + ff->offset;
switch (ff->type)
{
@@ -558,19 +558,21 @@ int Record::getBlobId(int fieldId)
if (fieldId >= format->maxId)
return -1;
+ char* recordData = getRecordData();
+
switch (encoding)
{
case traditional:
{
FieldFormat *ff = format->format + fieldId;
- if (!(data.record [NULL_BYTE (ff)] & NULL_BIT (ff)))
+ if (!(recordData [NULL_BYTE (ff)] & NULL_BIT (ff)))
return -1;
if (ff->type != Asciiblob && ff->type != Binaryblob)
return -1;
- return *(int32*) (data.record + ff->offset);
+ return *(int32*) (recordData + ff->offset);
}
case byteVector:
@@ -578,7 +580,7 @@ int Record::getBlobId(int fieldId)
case longVector:
{
Value value;
- getEncodedValue(fieldId, &value);
+ getEncodedValue(recordData, fieldId, &value);
if (value.getType() == Asciiblob || value.getType() == Binaryblob)
return value.getBlobId();
@@ -606,17 +608,18 @@ int Record::getSavePointId()
bool Record::getRecord(Stream *stream)
{
- if (data.record != NULL)
+ char* recordData = getRecordData();
+ if (recordData != NULL)
{
switch (encoding)
{
case traditional:
- stream->compress (format->length, data.record);
+ stream->compress (format->length, recordData);
break;
case shortVector:
stream->putSegment(getEncodedSize(),
- data.record + ((USHORT*) data.record)[0] - sizeof(short), false);
+ recordData + ((USHORT*) recordData)[0] - sizeof(short), false);
break;
default:
@@ -624,11 +627,19 @@ bool Record::getRecord(Stream *stream)
}
}
- return (data.record != NULL);
+ return (recordData != NULL);
+}
+
+int Record::getAllocatedSize()
+{
+ int vectorLength = format->count * sizeof(short);
+ return vectorLength + getEncodedSize();
}
int Record::getEncodedSize()
{
+ char* recordData = getRecordData();
+
switch (encoding)
{
case traditional:
@@ -638,7 +649,7 @@ int Record::getEncodedSize()
return size - format->count * sizeof(USHORT) - getSize();
case noEncoding:
- if (!data.record)
+ if (!recordData)
return 0;
default:
NOT_YET_IMPLEMENTED;
@@ -647,32 +658,32 @@ int Record::getEncodedSize()
}
}
-void Record::getEncodedValue(int fieldId, Value *value)
+void Record::getEncodedValue(char* recordData, int fieldId, Value *value)
{
switch (encoding)
{
case shortVector:
{
int index = format->format[fieldId].physicalId;
- USHORT *vector = (USHORT*) data.record;
+ USHORT *vector = (USHORT*) recordData;
if (highWater < index)
{
- const UCHAR *p = (UCHAR*) data.record + vector[highWater];
+ const UCHAR *p = (UCHAR*) recordData + vector[highWater];
while (highWater < index)
{
const UCHAR *q = EncodedDataStream::skip(p);
- vector[++highWater] = (USHORT) (q - (UCHAR*) data.record);
+ vector[++highWater] = (USHORT) (q - (UCHAR*) recordData);
p = q;
}
}
ASSERT(vector[index] < size);
- const UCHAR *q = EncodedDataStream::decode((UCHAR*) data.record + vector[index], value, false);
+ const UCHAR *q = EncodedDataStream::decode((UCHAR*) recordData + vector[index], value, false);
if (++index < format->count && highWater < index)
- vector[++highWater] = (USHORT) (q - (UCHAR*) data.record);
+ vector[++highWater] = (USHORT) (q - (UCHAR*) recordData);
return;
}
@@ -707,17 +718,9 @@ void Record::finalize(Transaction *trans
delete [] values;
}
-int Record::setEncodedRecord(Stream *stream, bool interlocked)
+char* Record::setEncodedRecord(Stream *stream, bool interlocked)
{
- if (data.record)
- {
- if (encoding == valueVector)
- delete [] (Value*) data.record;
- else
- DELETE_RECORD (data.record);
-
- data.record = NULL;
- }
+ deleteData(false);
encoding = shortVector;
int vectorLength = format->count * sizeof(short);
@@ -726,18 +729,19 @@ int Record::setEncodedRecord(Stream *str
memset(dataBuffer, 0, vectorLength);
stream->getSegment(0, stream->totalLength, dataBuffer + vectorLength);
((USHORT*)dataBuffer)[0] = (USHORT) (vectorLength + sizeof(short));
-
highWater = 0;
- size += totalLength;
generation = format->table->database->currentGeneration;
+ // This could be called during a thaw, in which case the size is already set.
+
+ if (size == getSize())
+ size += totalLength;
+
if (interlocked)
{
- char **ptr = &data.record;
-
// If data.record has changed since allocating the new buffer, then free the new buffer
-
- if (!COMPARE_EXCHANGE_POINTER(ptr, NULL, dataBuffer))
+
+ if (!COMPARE_EXCHANGE_POINTER(&data.record, NULL, dataBuffer))
{
DELETE_RECORD(dataBuffer);
totalLength = 0;
@@ -746,42 +750,41 @@ int Record::setEncodedRecord(Stream *str
else
data.record = dataBuffer;
- return totalLength;
+ return dataBuffer;
}
const char* Record::getEncodedRecord()
{
- // If chilled, restore the record data from the serial log
-
- if (state == recChilled)
- thaw();
-
ASSERT(encoding == shortVector);
- return data.record + ((USHORT*) data.record)[0];
+ char* recordData = getRecordData();
+
+ return recordData + ((USHORT*) recordData)[0];
}
const UCHAR* Record::getEncoding(int index)
{
+ char* recordData = getRecordData();
+
switch (encoding)
{
case shortVector:
{
- USHORT *vector = (USHORT*) data.record;
+ USHORT *vector = (USHORT*) recordData;
if (highWater < index)
{
- const UCHAR *p = (UCHAR*) data.record + vector[highWater];
+ const UCHAR *p = (UCHAR*) recordData + vector[highWater];
while (highWater < index)
{
const UCHAR *q = EncodedDataStream::skip(p);
- vector[++highWater] = (USHORT) (q - (UCHAR*) data.record);
+ vector[++highWater] = (USHORT) (q - (UCHAR*) recordData);
p = q;
}
}
- return (UCHAR*) data.record + vector[index];
+ return (UCHAR*) recordData + vector[index];
}
default:
@@ -795,6 +798,8 @@ bool Record::isNull(int fieldId)
if (fieldId >= format->maxId)
return true;
+ char* recordData = getRecordData();
+
// If this is an encoded record, parse through the fields
switch (encoding)
@@ -802,33 +807,33 @@ bool Record::isNull(int fieldId)
case shortVector:
{
int index = format->format[fieldId].physicalId;
- USHORT *vector = (USHORT*) data.record;
+ USHORT *vector = (USHORT*) recordData;
if (highWater < index)
{
- const UCHAR *p = (UCHAR*) data.record + vector[highWater];
+ const UCHAR *p = (UCHAR*) recordData + vector[highWater];
while (highWater < index)
{
const UCHAR *q = EncodedDataStream::skip(p);
- vector[++highWater] = (USHORT) (q - (UCHAR*) data.record);
+ vector[++highWater] = (USHORT) (q - (UCHAR*) recordData);
p = q;
}
}
- const UCHAR *q = (UCHAR*) data.record + vector[index];
+ const UCHAR *q = (UCHAR*) recordData + vector[index];
return *q == edsNull;
}
case valueVector:
- return ((Value**) data.record)[format->format[fieldId].physicalId]->isNull();
+ return ((Value**) recordData)[format->format[fieldId].physicalId]->isNull();
case traditional:
{
FieldFormat *ff = format->format + fieldId;
- return (data.record [NULL_BYTE (ff)] & NULL_BIT (ff)) ? true : false;
+ return (recordData [NULL_BYTE (ff)] & NULL_BIT (ff)) ? true : false;
}
case byteVector:
@@ -866,32 +871,45 @@ void Record::setPriorVersion(Record *old
FATAL("setPriorVersion should only be called for RecordVersions\n");
}
-int Record::thaw(void)
+char *Record::thaw(void)
{
- return 0;
+ return data.record;
}
-int Record::setRecordData(const UCHAR * dataIn, int dataLength)
+char* Record::setRecordData(const UCHAR * dataIn, int dataLength, int *bytesReallocated)
{
encoding = shortVector;
int vectorLength = format->count * sizeof(short);
int totalLength = vectorLength + dataLength;
char *dataBuffer = allocRecordData(totalLength);
+ *bytesReallocated = totalLength;
memset(dataBuffer, 0, vectorLength);
memcpy(dataBuffer + vectorLength, dataIn, dataLength);
((USHORT*) dataBuffer)[0] = (USHORT) (vectorLength + sizeof(short));
highWater = 0;
- char **ptr = &data.record;
- // If data.record has changed since allocating the new buffer, then free the new buffer
-
- if (!COMPARE_EXCHANGE_POINTER(ptr, NULL, dataBuffer))
+ // If data.record has changed since allocating the new buffer,
+ // then free the buffer allocated here and use the new one.
+
+ char* prevBuffer = data.record;
+ while (!isAllocated(prevBuffer))
+ {
+ if (COMPARE_EXCHANGE_POINTER(&data.record, prevBuffer, dataBuffer))
+ break;
+
+ prevBuffer = data.record;
+ }
+
+ // prevBuffer is either NULL or is a buffer other than dataBuffer.
+
+ if (isAllocated(prevBuffer))
{
DELETE_RECORD(dataBuffer);
- totalLength = 0;
+ dataBuffer = prevBuffer;
+ *bytesReallocated = 0;
}
-
- return (totalLength);
+
+ return dataBuffer;
}
@@ -902,30 +920,95 @@ void Record::deleteData(void)
void Record::deleteData(bool now)
{
- if (data.record)
+ // Detach the pointer from data.record
+
+ char* recordData = data.record;
+ while (isAllocated(recordData))
+ {
+ if (COMPARE_EXCHANGE_POINTER(&data.record, recordData, NULL))
+ break;
+
+ recordData = data.record;
+ }
+
+ // Now free the detached buffer
+
+ if (isAllocated(recordData))
{
switch (encoding)
{
case valueVector:
if (now)
- delete [] (Value*) data.record;
+ delete [] (Value*) recordData;
else
- format->table->queueForDelete((Value**) data.record);
+ format->table->queueForDelete((Value**) recordData);
break;
default:
if (now)
{
- DELETE_RECORD (data.record);
+ DELETE_RECORD (recordData);
}
else
- format->table->queueForDelete((char *) data.record);
+ format->table->queueForDelete((char *) recordData);
}
+ }
+}
+
+// chillData is like deleteData except that it is only used during chill
+// in order to get rid of the data.record buffer. So it always uses
+// queueForDelete. And instead of setting the pointer to NULL like deleteData,
+// chillData sets it to (-1).
+// Only one thread can chill a record at a time because that
+// thread must 'own' the transaction. So this CAS may not be needed.
+
+
+void Record::chillData(void)
+{
+ // Detach the pointer from data.record
+
+ char* recordData = data.record;
+ while (isAllocated(recordData))
+ {
+ if (COMPARE_EXCHANGE_POINTER(&data.record, recordData, recordIsChilled))
+ break;
- data.record = NULL;
+ recordData = data.record;
+ }
+
+ // Now free the detached buffer
+
+ if (isAllocated(recordData))
+ {
+ switch (encoding)
+ {
+ case valueVector:
+ format->table->queueForDelete((Value**) recordData);
+ break;
+
+ default:
+ format->table->queueForDelete((char *) recordData);
+ }
}
}
+bool Record::hasRecord(void)
+{
+ char* recordData = data.record;
+ ASSERT(recordData != recordIsChilled);
+ return (recordData != NULL);
+}
+
+bool Record::isChilled(void)
+{
+ return false; // Only RecordVersions are ever chilled!
+}
+
+bool Record::isDeleted(void)
+{
+ return false; // Only RecordVersions are ever chilled!
+}
+
void Record::print(void)
{
printf(" %p\tId %d, enc %d, state %d, use %d, grp " I64FORMAT "\n",
@@ -998,9 +1081,11 @@ void Record::serialize(Serialize* stream
stream->putInt(format->version);
stream->putInt(state);
stream->putInt(size);
-
- if (data.record)
- stream->putData(getEncodedSize(), (UCHAR*) data.record + ((USHORT*) data.record)[0]);
+
+ char* recordData = getRecordData();
+
+ if (recordData)
+ stream->putData(getEncodedSize(), (UCHAR*) recordData + ((USHORT*) recordData)[0]);
else
stream->putData(0, NULL);
}
@@ -1012,7 +1097,9 @@ int Record::getSize(void)
int Record::getDataMemUsage(void)
{
- return (data.record == NULL ? 0 : MemMgr::blockSize(data.record));
+ char* recordData = getRecordData();
+
+ return (recordData == NULL ? 0 : MemMgr::blockSize(recordData));
}
int Record::getMemUsage(void)
@@ -1109,3 +1196,14 @@ void Record::queueForDelete(void)
state = recQueuedForDelete;
format->table->queueForDelete(this);
}
+
+char* Record::getRecordData(void)
+{
+ return data.record;
+}
+
+char* Record::findRecordData(void)
+{
+ return data.record;
+}
+
=== modified file 'storage/falcon/Record.h'
--- a/storage/falcon/Record.h 2009-05-08 18:37:39 +0000
+++ b/storage/falcon/Record.h 2009-05-12 14:35:40 +0000
@@ -43,7 +43,7 @@ enum RecordEncoding {
static const int recData = 0; // record pointer is valid or record is deleted
static const int recDeleted = 1; // record has been deleted
-static const int recChilled = 2; // record data is temporarily stored in serial log
+//static const int recChilled = 2; // record data is temporarily stored in serial log
static const int recOnDisk = 3; // record is on disk and must be read
static const int recLock = 4; // this is a "record lock" and not a record
static const int recNoChill = 5; // record is in use and should not be chilled
@@ -55,6 +55,8 @@ static const int recPruning = 10; //
static const int recEndChain = 11; // end of chain for garbage collection
static const int recQueuedForDelete = 12; // Record is in purgatory.
+// Special value for data.record
+static const char* recordIsChilled = (char*) -1;
//#define CHECK_RECORD_ACTIVITY
#ifdef CHECK_RECORD_ACTIVITY
@@ -114,25 +116,29 @@ public:
virtual Record* fetchVersion (Transaction * transaction);
virtual void retire(void);
virtual void scavenge(TransId targetTransactionId, int oldestActiveSavePointId);
- virtual bool isVersion();
- virtual bool isSuperceded();
- virtual bool isNull(int fieldId);
virtual Record* releaseNonRecursive(void);
virtual Record* clearPriorVersion(void);
virtual void setPriorVersion(Record *oldPriorVersion, Record *newPriorVersion);
virtual Record* getPriorVersion();
virtual Record* getGCPriorVersion(void);
virtual void print(void);
- virtual int thaw(void);
+ virtual char* thaw(void);
virtual const char* getEncodedRecord();
- virtual int setRecordData(const UCHAR *dataIn, int dataLength);
+ virtual char* setRecordData(const UCHAR *dataIn, int dataLength, int *bytesReallocated);
+ virtual char* getRecordData(void);
virtual void serialize(Serialize* stream);
virtual int getSize(void);
virtual SyncObject* getSyncThaw(void);
virtual void queueForDelete(void);
+ virtual bool hasRecord();
+ virtual bool isChilled();
+ virtual bool isDeleted();
+ virtual bool isSuperceded();
+ virtual bool isVersion();
+ virtual bool isNull(int fieldId);
const UCHAR* getEncoding (int index);
- int setEncodedRecord(Stream *stream, bool interlocked);
+ char* setEncodedRecord(Stream *stream, bool interlocked);
void getValue (int fieldId, Value* value);
void getRawValue (int fieldId, Value* value);
int getFormatVersion();
@@ -142,38 +148,25 @@ public:
void addRef();
int getBlobId(int fieldId);
void finalize(Transaction *transaction);
- void getEncodedValue (int fieldId, Value *value);
+ void getEncodedValue(char* recordData, int fieldId, Value *value);
bool getRecord (Stream *stream);
+ int getAllocatedSize(void);
int getEncodedSize();
void deleteData(void);
void deleteData(bool now);
+ void chillData(void);
void printRecord(const char* header);
void validateData(void);
char* allocRecordData(int length);
void expungeRecord(void);
int getDataMemUsage(void);
int getMemUsage(void);
-
+ char* findRecordData(void);
+
Record (Table *table, Format *recordFormat);
Record (Table *table, int32 recordNumber, Stream *stream);
Record (Database *database, Serialize* stream);
- inline int hasRecord(bool forceThaw = true)
- {
- if (state == recChilled && forceThaw)
- thaw();
-
- return (data.record != NULL);
- }
-
- inline char* getRecordData()
- {
- if (state == recChilled)
- thaw();
-
- return data.record;
- }
-
protected:
virtual ~Record();
@@ -182,6 +175,12 @@ protected:
char *record;
} data;
+ inline bool isAllocated(char* recordData)
+ {
+ return ((recordData != NULL) && (recordData != recordIsChilled));
+ }
+
+
public:
uint64 generation;
Format *format;
=== modified file 'storage/falcon/RecordVersion.cpp'
--- a/storage/falcon/RecordVersion.cpp 2009-05-01 18:26:25 +0000
+++ b/storage/falcon/RecordVersion.cpp 2009-05-12 14:35:40 +0000
@@ -61,14 +61,9 @@ RecordVersion::RecordVersion(Table *tbl,
{
virtualOffset = 0;
- //transaction = trans;
- //transactionState = trans->transactionState;
- //transactionState->addRef();
-
transactionState = NULL;
setTransactionState(transaction->transactionState);
- //transactionId = transaction->transactionId;
savePointId = transaction->curSavePointId;
superceded = false;
@@ -80,8 +75,9 @@ RecordVersion::RecordVersion(Table *tbl,
priorVersion->addRef(REC_HISTORY);
recordNumber = oldVersion->recordNumber;
- if (priorVersion->state == recChilled)
- priorVersion->thaw();
+ // Be sure the priorVersion is thawed.
+
+ priorVersion->hasRecord();
if (transactionState == priorVersion->getTransactionState())
oldVersion->setSuperceded (true);
@@ -398,18 +394,21 @@ uint64 RecordVersion::getVirtualOffset()
return (virtualOffset);
}
-int RecordVersion::thaw()
+char *RecordVersion::thaw()
{
+ // We should try without this now that data.record is manipulated safely.
+
Sync syncThaw(format->table->getSyncThaw(this), "RecordVersion::thaw");
syncThaw.lock(Exclusive);
- int bytesRestored = 0;
+ int bytesReallocated = 0;
// Nothing to do if the record is no longer chilled
- if (state != recChilled)
- return getDataMemUsage();
-
+ char* recordData = data.record;
+ if (recordData != recordIsChilled)
+ return recordData;
+
// First, try to thaw from the serial log. If transaction->writePending is
// true, then the record data can be restored from the serial log. If writePending
// is false, then the record data has been written to the data pages.
@@ -419,38 +418,31 @@ int RecordVersion::thaw()
if (trans)
{
if (trans->writePending)
- {
- bytesRestored = trans->thaw(this);
-
- if (bytesRestored == 0)
- trans->thaw(this);
- }
+ recordData = trans->thaw(this, &bytesReallocated);
trans->release();
}
-
+
// The record data is no longer available in the serial log, so zap the
// virtual offset and restore from the data page.
-
- bool recordFetched = false;
- if (bytesRestored == 0)
+ if (bytesReallocated == 0)
{
Stream stream;
Table *table = format->table;
-
+
if (table->dbb->fetchRecord(table->dataSection, recordNumber, &stream))
{
- bytesRestored = setEncodedRecord(&stream, true);
- recordFetched = true;
+ recordData = setEncodedRecord(&stream, true);
+ bytesReallocated = getAllocatedSize();
}
-
- if (bytesRestored > 0)
+
+ if (bytesReallocated > 0)
{
virtualOffset = 0;
table->debugThawedRecords++;
- table->debugThawedBytes += bytesRestored;
-
+ table->debugThawedBytes += bytesReallocated;
+
if (table->debugThawedBytes >= table->database->configuration->recordChillThreshold)
{
Log::debug("%d: Record thaw (fetch): table %d, %ld records, %ld bytes\n", this->format->table->database->deltaTime,
@@ -460,25 +452,19 @@ int RecordVersion::thaw()
}
}
}
-
- if (state == recChilled)
- {
- if (data.record != NULL)
- state = recData;
- }
-
- return bytesRestored;
+
+ return recordData;
}
-/***
-char* RecordVersion::getRecordData()
+bool RecordVersion::isChilled(void)
{
- if (state == recChilled)
- thaw();
-
- return data.record;
+ return (data.record == recordIsChilled);
+}
+
+bool RecordVersion::isDeleted(void)
+{
+ return (data.record == NULL && state == recDeleted);
}
-***/
void RecordVersion::print(void)
{
@@ -517,6 +503,9 @@ void RecordVersion::setTransactionState(
if (transactionState)
transactionState->release();
+ // Add a use count on the transaction state to ensure it lives
+ // as long as the record version object
+
transactionState = newTransState;
transactionState->addRef();
}
@@ -525,3 +514,28 @@ Transaction* RecordVersion::findTransact
{
return format->table->database->transactionManager->findTransaction(transactionState->transactionId);
}
+
+bool RecordVersion::hasRecord(void)
+{
+ char* recordData = data.record;
+ if (recordData == NULL)
+ return false;
+
+ if (recordData == recordIsChilled)
+ {
+ recordData = thaw();
+ return (recordData != NULL);
+ }
+
+ return true;
+}
+
+char* RecordVersion::getRecordData()
+{
+ char* recordData = data.record;
+
+ if (recordData == recordIsChilled)
+ recordData = thaw();
+
+ return recordData;
+}
=== modified file 'storage/falcon/RecordVersion.h'
--- a/storage/falcon/RecordVersion.h 2009-04-16 12:09:15 +0000
+++ b/storage/falcon/RecordVersion.h 2009-05-12 14:35:40 +0000
@@ -36,7 +36,6 @@ public:
RecordVersion(Table *tbl, Format *fmt, Transaction *tran, Record *oldVersion);
RecordVersion(Database* database, Serialize *stream);
- virtual bool isSuperceded();
//virtual Transaction* getTransaction();
virtual TransactionState* getTransactionState() const;
virtual TransId getTransactionId();
@@ -46,7 +45,6 @@ public:
virtual Record* getGCPriorVersion(void);
virtual void retire(void);
virtual void scavengeSavepoint(Transaction* targetTransaction, int oldestActiveSavePoint);
- virtual bool isVersion();
virtual void rollback(Transaction *transaction);
virtual Record* fetchVersion (Transaction * trans);
virtual Record* releaseNonRecursive();
@@ -54,11 +52,17 @@ public:
virtual void setPriorVersion(Record *oldPriorVersion, Record *newPriorVersion);
virtual void setVirtualOffset(uint64 offset);
virtual uint64 getVirtualOffset();
- virtual int thaw(void);
+ virtual char* thaw(void);
virtual void print(void);
virtual int getSize(void);
virtual void serialize(Serialize* stream);
virtual Transaction* findTransaction(void);
+ virtual char* getRecordData();
+ virtual bool hasRecord();
+ virtual bool isChilled();
+ virtual bool isDeleted();
+ virtual bool isSuperceded();
+ virtual bool isVersion();
void commit();
bool committedBefore(TransId);
=== modified file 'storage/falcon/SRLUpdateRecords.cpp'
--- a/storage/falcon/SRLUpdateRecords.cpp 2009-05-08 18:37:39 +0000
+++ b/storage/falcon/SRLUpdateRecords.cpp 2009-05-12 14:35:40 +0000
@@ -38,36 +38,36 @@ SRLUpdateRecords::~SRLUpdateRecords(void
{
}
-bool SRLUpdateRecords::chill(Transaction *transaction, RecordVersion *record, uint dataLength)
+bool SRLUpdateRecords::chill(Transaction *transaction, RecordVersion *record)
{
- // Record data has been written to the serial log, so release the data
- // buffer and set the state accordingly
-
+ // Record data has been written to the serial log.
+ // Before releasing the data, find the allocated size.
+
+ int chillBytes = record->getAllocatedSize();
+
+ // Release the data buffer and set the state accordingly
+
ASSERT(record->format);
- record->deleteData();
- record->state = recChilled;
-
+ record->chillData();
+
// Update transaction counter and chillPoint
-
- transaction->chillPoint = &record->nextInTrans;
- //ASSERT(transaction->totalRecordData >= dataLength);
- if (transaction->totalRecordData >= dataLength)
- transaction->totalRecordData -= dataLength;
+ transaction->chillPoint = &record->nextInTrans;
+ if (transaction->totalRecordData >= chillBytes)
+ transaction->totalRecordData -= chillBytes;
else
transaction->totalRecordData = 0;
-
+
return true;
}
-int SRLUpdateRecords::thaw(RecordVersion *record, bool *thawed)
+char* SRLUpdateRecords::thaw(RecordVersion *record, int *bytesReallocated)
{
- *thawed = false;
-
// Nothing to do if record is no longer chilled
- if (record->state != recChilled)
- return record->size;
+ char* dataRecord = record->findRecordData();
+ if (dataRecord != recordIsChilled)
+ return dataRecord ;
// Find the window where the record is stored using the record offset, then
// activate the window, reading from disk if necessary
@@ -99,7 +99,7 @@ int SRLUpdateRecords::thaw(RecordVersion
int recordNumber = control->getInt();
ASSERT(recordNumber == record->recordNumber);
int dataLength = control->getInt();
- int bytesReallocated = 0;
+ *bytesReallocated = 0;
window->deactivateWindow();
sync.unlock();
@@ -107,31 +107,25 @@ int SRLUpdateRecords::thaw(RecordVersion
// setRecordData() handles race conditions with an interlocked compare and exchange,
// but check the state and record number anyway
- if (record->state == recChilled)
- bytesReallocated = record->setRecordData(control->input, dataLength);
+ dataRecord = record->findRecordData();
+ if (dataRecord == recordIsChilled)
+ dataRecord = record->setRecordData(control->input, dataLength, bytesReallocated);
+
- if (bytesReallocated > 0)
+ if (*bytesReallocated > 0)
{
ASSERT(recordNumber == record->recordNumber);
- bytesReallocated = record->getEncodedSize();
- *thawed = true;
if (log->chilledRecords > 0)
log->chilledRecords--;
-
- if (log->chilledBytes > uint64(bytesReallocated))
- log->chilledBytes -= bytesReallocated;
- else
- log->chilledBytes = 0;
}
- return bytesReallocated;
+ return dataRecord;
}
void SRLUpdateRecords::append(Transaction *transaction, RecordVersion *records, bool chillRecords)
{
uint32 chilledRecordsWindow = 0;
- uint32 chilledBytesWindow = 0;
SerialLogTransaction *srlTrans = NULL;
int savepointId;
@@ -158,7 +152,6 @@ void SRLUpdateRecords::append(Transactio
UCHAR *end = log->writeWarningTrack;
chilledRecordsWindow = 0;
- chilledBytesWindow = 0;
for (; record; record = record->nextInTrans)
{
@@ -169,7 +162,7 @@ void SRLUpdateRecords::append(Transactio
// Skip chilled records, but advance the chillpoint
- if (record->state == recChilled)
+ if (record->isChilled())
{
transaction->chillPoint = &record->nextInTrans;
continue;
@@ -200,30 +193,25 @@ void SRLUpdateRecords::append(Transactio
if (record->virtualOffset != 0)
{
+ // Record is already in serial log
+
if (chillRecords && record->state != recDeleted)
{
- int chillBytes = record->getEncodedSize();
-
- if (chill(transaction, record, chillBytes))
+ if (chill(transaction, record))
{
log->chilledRecords++;
- log->chilledBytes += chillBytes;
if (transaction->thawedRecords)
transaction->thawedRecords--;
}
}
- else
- {
- // Record is already in serial log
- }
continue;
}
// Load the record data into a stream
- if (record->hasRecord(false))
+ if (record->hasRecord())
{
if (!record->getRecord(&stream))
continue;
@@ -256,11 +244,8 @@ void SRLUpdateRecords::append(Transactio
if (chillRecords && record->state != recDeleted)
{
- if (chill(transaction, record, stream.totalLength))
- {
+ if (chill(transaction, record))
chilledRecordsWindow++;
- chilledBytesWindow += stream.totalLength;
- }
}
} // next record version
@@ -281,9 +266,7 @@ void SRLUpdateRecords::append(Transactio
if (chillRecords)
{
log->chilledRecords += chilledRecordsWindow;
- log->chilledBytes += chilledBytesWindow;
transaction->chilledRecords += chilledRecordsWindow;
- transaction->chilledBytes += chilledBytesWindow;
// uint32 windowNumber = (uint32)log->writeWindow->virtualOffset / SRL_WINDOW_SIZE;
}
} // next serial log record and write window
=== modified file 'storage/falcon/SRLUpdateRecords.h'
--- a/storage/falcon/SRLUpdateRecords.h 2009-01-17 08:22:44 +0000
+++ b/storage/falcon/SRLUpdateRecords.h 2009-05-12 14:35:40 +0000
@@ -33,8 +33,8 @@ public:
virtual void read(void);
virtual void pass2(void);
void append(Transaction *transaction, RecordVersion *records, bool chillRecords = false);
- bool chill(Transaction *transaction, RecordVersion *record, uint dataLength);
- int thaw(RecordVersion *record, bool *thawed);
+ bool chill(Transaction *transaction, RecordVersion *record);
+ char* thaw(RecordVersion *record, int *bytesReallocated);
const UCHAR *data;
int savepointId;
=== modified file 'storage/falcon/SerialLog.cpp'
--- a/storage/falcon/SerialLog.cpp 2009-04-20 18:40:49 +0000
+++ b/storage/falcon/SerialLog.cpp 2009-05-12 14:35:40 +0000
@@ -102,7 +102,6 @@ SerialLog::SerialLog(Database *db, JStri
tracePage = TRACE_PAGE;
traceRecord = 0;
chilledRecords = 0;
- chilledBytes = 0;
windowReads = 0;
priorWindowWrites = 0;
windowWrites = 0;
=== modified file 'storage/falcon/SerialLog.h'
--- a/storage/falcon/SerialLog.h 2009-04-20 18:40:49 +0000
+++ b/storage/falcon/SerialLog.h 2009-05-12 14:35:40 +0000
@@ -230,7 +230,6 @@ public:
int32 tracePage;
int32 traceRecord;
uint32 chilledRecords;
- uint64 chilledBytes;
int32 wantToSerializeGophers;
int32 serializeGophers;
uint64 startRecordVirtualOffset;
=== modified file 'storage/falcon/Table.cpp'
--- a/storage/falcon/Table.cpp 2009-05-08 18:37:39 +0000
+++ b/storage/falcon/Table.cpp 2009-05-12 14:35:40 +0000
@@ -3534,13 +3534,9 @@ Record* Table::fetchForUpdate(Transactio
return NULL;
}
- if (source->state == recChilled && !source->thaw())
- {
- source->release(REC_HISTORY);
+ if (!source->hasRecord())
+ FATAL("Committed non-deleted record has no data buffer.\n");
- return NULL;
- }
-
// Lock the record
if (dbb->debug & DEBUG_RECORD_LOCKS)
=== modified file 'storage/falcon/Transaction.cpp'
--- a/storage/falcon/Transaction.cpp 2009-05-08 18:37:39 +0000
+++ b/storage/falcon/Transaction.cpp 2009-05-12 14:35:40 +0000
@@ -127,9 +127,7 @@ void Transaction::initialize(Connection*
totalRecordData = 0;
totalRecords = 0;
chilledRecords = 0;
- chilledBytes = 0;
thawedRecords = 0;
- thawedBytes = 0;
debugThawedRecords = 0;
debugThawedBytes = 0;
committedRecords = 0;
@@ -551,12 +549,13 @@ void Transaction::chillRecords()
(uint32)(totalDataBefore-totalRecordData), committedRecords, connection->currentStatement);
}
-int Transaction::thaw(RecordVersion * record)
+char* Transaction::thaw(RecordVersion* record, int* bytesReallocated)
{
// Nothing to do if record is no longer chilled
-
- if (record->state != recChilled)
- return record->getDataMemUsage();
+
+ char *dataRecord = record->findRecordData();
+ if (dataRecord != recordIsChilled)
+ return dataRecord;
// Get pointer to record data in serial log
@@ -565,16 +564,14 @@ int Transaction::thaw(RecordVersion * re
// Thaw the record then update the total record data bytes for this transaction
ASSERT(record->getTransactionId() == transactionId);
- bool thawed;
- int bytesRestored = control.updateRecords.thaw(record, &thawed);
-
- if (bytesRestored > 0 && thawed)
+ dataRecord = control.updateRecords.thaw(record, bytesReallocated);
+
+ if (*bytesReallocated > 0)
{
- totalRecordData += bytesRestored;
+ totalRecordData += *bytesReallocated;
thawedRecords++;
- thawedBytes += bytesRestored;
debugThawedRecords++;
- debugThawedBytes += bytesRestored;
+ debugThawedBytes += *bytesReallocated;
}
if (debugThawedBytes >= database->configuration->recordChillThreshold)
@@ -585,7 +582,7 @@ int Transaction::thaw(RecordVersion * re
debugThawedBytes = 0;
}
- return bytesRestored;
+ return dataRecord;
}
void Transaction::thaw(DeferredIndex * deferredIndex)
@@ -605,7 +602,7 @@ void Transaction::addRecord(RecordVersio
else if (record->state == recDeleted)
++deletedRecords;
- totalRecordData += record->getEncodedSize();
+ totalRecordData += record->getAllocatedSize();
++totalRecords;
// If the transaction size has exceeded the chill threshold,
@@ -616,13 +613,13 @@ void Transaction::addRecord(RecordVersio
if (totalRecordData > database->configuration->recordChillThreshold
&& !systemTransaction)
{
- UCHAR saveState = record->state;
-
- // Chill all records except the current record, which may be part of an update or insert
+ // Chill all records except the current record,
+ // which may be part of an update or insert
- if (record->state != recLock && record->state != recChilled)
+ UCHAR saveState = record->state;
+ if (record->state != recLock)
record->state = recNoChill;
-
+
chillRecords();
if (record->state == recNoChill)
@@ -696,14 +693,11 @@ void Transaction::removeRecordNoLock(Rec
chillPoint = ptr;
// Adjust total record data count
-
- if (record->state != recChilled)
- {
- uint32 size = record->getEncodedSize();
-
- if (totalRecordData >= size)
- totalRecordData -= size;
- }
+
+ uint32 allocatedSize = record->getAllocatedSize();
+
+ if (totalRecordData >= allocatedSize)
+ totalRecordData -= allocatedSize;
if (record->state == recDeleted && deletedRecords > 0)
--deletedRecords;
@@ -1533,6 +1527,11 @@ void Transaction::releaseDeferredIndexes
}
}
+// Backlogging is only done by the thread for this transaction which is
+// the thread running backlogRecords().
+// So we assume that all chilling is done and no new record versions
+// will be added to any of these record chains while this is happening.
+
void Transaction::backlogRecords(void)
{
SavePoint *savePoint = savePoints;
@@ -1541,7 +1540,12 @@ void Transaction::backlogRecords(void)
{
prior = record->prevInTrans;
- if (!record->hasRecord(false))
+ // Be sure this is the base record. Backlog all chilled records
+ // and deleted records, but not lock records
+
+ if ( !record->isSuperceded()
+ && ( record->isChilled()
+ || record->isDeleted() ))
{
Sync syncSP(&syncSavepoints, "Transaction::backlogRecords");
syncSP.lock(Shared);
=== modified file 'storage/falcon/Transaction.h'
--- a/storage/falcon/Transaction.h 2009-04-06 12:33:12 +0000
+++ b/storage/falcon/Transaction.h 2009-05-12 14:35:40 +0000
@@ -89,7 +89,7 @@ public:
bool isXidEqual(int testLength, const UCHAR* test);
void releaseRecordLocks(void);
void chillRecords();
- int thaw(RecordVersion* record);
+ char* thaw(RecordVersion* record, int* bytesReallocated);
void thaw(DeferredIndex* deferredIndex);
void print(void);
void printBlockage(void);
@@ -149,9 +149,7 @@ public:
uint64 totalRecordData; // total bytes of record data for this transaction (unchilled + thawed)
uint32 totalRecords; // total record count
uint32 chilledRecords; // total chilled record count
- uint32 chilledBytes; // current bytes chilled
uint32 thawedRecords; // total thawed record count
- uint32 thawedBytes; // current bytes thawed
uint32 debugThawedRecords;
uint32 debugThawedBytes;
uint32 committedRecords; // committed record count
Attachment: [text/bzr-bundle] bzr/kevin.lewis@sun.com-20090512143540-f7yjfnr0jlbcroa1.bundle
| Thread |
|---|
| • bzr commit into mysql-6.0-falcon-team branch (kevin.lewis:2702) Bug#43344 Bug#44721 | Kevin Lewis | 12 May |