List:Commits« Previous MessageNext Message »
From:Kevin Lewis Date:May 12 2009 2:36pm
Subject:bzr commit into mysql-6.0-falcon-team branch (kevin.lewis:2702)
Bug#43344 Bug#44721
View as plain text  
#At file:///C:/Work/bzr/Merge/mysql-6.0-falcon-team/ based on revid:kevin.lewis@stripped

 2702 Kevin Lewis	2009-05-12
      Bug#43344 & Bug#44721  
      Second patch (comprehensive) with review changes
      
      This series of changes closes many gaps in which a data buffer can get 
      thawed and then re-chilled almost immediatly which can cause the use 
      of a bad data buffer pointer.  For 44344, it makes the act of chilling
      and thawing records an atomic operation which uses CAS.  Each call to 
      thaw returns a valid pointer to the buffer that was thawed so that it 
      can be used during the current cycle as long as the caller has a 
      CycleLock.
      
      Record.h, Record.cpp, RecordVersion.h & RecordVersion.cpp;
      
      The most significant change is that when a data buffer is chilled, 
      it recieves the value (-1) and this happens atomically.  
      Record::state == recChilled is no longer used.  
      The record.data pointer is now always read once, checked, and 
      returned up the call stack to where it is used.
      
      hasRecord() is changed from an inline function to a virtual function 
      for both Record and RecordVersion. hasRecord() and isChilled() are 
      added so that the record can check data.record itself. 
      
      setEncodedRecord() and thaw() return the pointer to the buffer
      that they just created and put into data.record so that the caller 
      does not have to rely on that buffer being there later.
      
      getEncodedValue() uses the recordData pointer of the caller instead
      of getting it again from data.record.
      
      getAllocatedSize() is added so that the number of bytes in a buffer 
      holding the record can be consistently known.  It is used in several 
      places.  
      
      chillData() is similar to and replaces deleteData() during 
      SRLUpdateRecords::chill
      
      isDeleted() is also added so that the common (state == recDeleted) 
      check can be done by the record itself.  This is not fully 
      implemented yet because we need to get away from the over-use of 
      these state flags.  They are somewhat unreliable on multiple CPUs 
      unless they are set with CAS.
      
      getRecordData() is also changed from an inline to a virtual function.
      
      findRecordData() is like getRecordData but without the thaw.  It is 
      used in three places along the call stack for thawing a record to 
      check if another thread has already thawed that record.  This is 
      not likely now because of the use of syncThaw.  But syncThaw may 
      no longer be necessary.
      
      isAllocated() is an inline function used about 6 times to determine 
      if a pointer is real or not.
      
      SRLUpdateRecords.h & SRLUpdateRecords.cpp;
      
      thaw() returns the data buffer thawed in addition to the number of 
      bytes reallocated.
      
      chill() is not sent the dataLength anymore since it calculates that 
      itself.
      
      SerialLog.h, SerialLog.cpp, transaction.h, transaction.cpp;
      
      Since the number of bytes chilled and thawed stored in the SerialLog 
      and Transaction were being maintained but not used, I deleted those
      variables and code used to maintain them.  The number of allocated 
      bytes associated with a Transaction is stored in totalRecorddata and 
      is still maintained.
      
      transaction.cpp
      Even though backlogging is not currently active, the choice whether 
      to backlog any particular record needed to be enhanced.  Along with 
      replacing hasRecord(false) with isChilled(), it should avoid 
      backlogging record chains that do not start at the base record.  
      That is checked with isSuperceded(). Also, backlogging should occur 
      for deleted records as well as newly chilled records.

    modified:
      storage/falcon/Record.cpp
      storage/falcon/Record.h
      storage/falcon/RecordVersion.cpp
      storage/falcon/RecordVersion.h
      storage/falcon/SRLUpdateRecords.cpp
      storage/falcon/SRLUpdateRecords.h
      storage/falcon/SerialLog.cpp
      storage/falcon/SerialLog.h
      storage/falcon/Table.cpp
      storage/falcon/Transaction.cpp
      storage/falcon/Transaction.h
=== modified file 'storage/falcon/Record.cpp'
--- a/storage/falcon/Record.cpp	2009-05-08 18:37:39 +0000
+++ b/storage/falcon/Record.cpp	2009-05-12 14:35:40 +0000
@@ -188,8 +188,7 @@ void Record::setValue(TransactionState *
 	switch (encoding)
 		{
 		case noEncoding:
-			if (data.record != NULL)
-				DELETE_RECORD (data.record);
+			deleteData(false);
 
 			data.record = (char*) NEW Value[format->count];
 			encoding = valueVector;
@@ -304,13 +303,15 @@ void Record::setValue(TransactionState *
 
 int Record::getFormatVersion()
 {
+	char* recordData = getRecordData();
+
 	switch (encoding)
 		{
 		case traditional:
-			return *(short*) data.record;
+			return *(short*) recordData;
 
 		case shortVector:
-			return -*(short*) (data.record + ((USHORT*) data.record)[0] - sizeof(short));
+			return -*(short*) (recordData + ((USHORT*) recordData)[0] - sizeof(short));
 
 		default:
 			NOT_YET_IMPLEMENTED;
@@ -363,9 +364,8 @@ void Record::getRawValue(int fieldId, Va
 		return;
 
 	// If chilled, restore the record data from the serial log
-	
-	if (state == recChilled)
-		thaw();
+
+	char* recordData = getRecordData();
 	
 	// If this is an encoded record, parse through the fields
 
@@ -374,11 +374,11 @@ void Record::getRawValue(int fieldId, Va
 		case byteVector:
 		case shortVector:
 		case longVector:
-			getEncodedValue(fieldId, value);
+			getEncodedValue(recordData, fieldId, value);
 			return;
 
 		case valueVector:
-			value->setValue(((Value*) data.record) + format->format[fieldId].physicalId, false);
+			value->setValue(((Value*) recordData) + format->format[fieldId].physicalId, false);
 			return;
 
 		case traditional:
@@ -390,10 +390,10 @@ void Record::getRawValue(int fieldId, Va
 
 	FieldFormat *ff = format->format + fieldId;
 
-	if (!(data.record [NULL_BYTE (ff)] & NULL_BIT (ff)))
+	if (!(recordData [NULL_BYTE (ff)] & NULL_BIT (ff)))
 		return;
 
-	char *ptr = data.record + ff->offset;
+	char *ptr = recordData + ff->offset;
 
 	switch (ff->type)
 		{
@@ -558,19 +558,21 @@ int Record::getBlobId(int fieldId)
 	if (fieldId >= format->maxId)
 		return -1;
 
+	char* recordData = getRecordData();
+
 	switch (encoding)
 		{
 		case traditional:
 			{
 			FieldFormat *ff = format->format + fieldId;
 
-			if (!(data.record [NULL_BYTE (ff)] & NULL_BIT (ff)))
+			if (!(recordData [NULL_BYTE (ff)] & NULL_BIT (ff)))
 				return -1;
 
 			if (ff->type != Asciiblob && ff->type != Binaryblob)
 				return -1;
 
-			return *(int32*) (data.record + ff->offset);
+			return *(int32*) (recordData + ff->offset);
 			}
 
 		case byteVector:
@@ -578,7 +580,7 @@ int Record::getBlobId(int fieldId)
 		case longVector:
 			{
 			Value value;
-			getEncodedValue(fieldId, &value);
+			getEncodedValue(recordData, fieldId, &value);
 
 			if (value.getType() == Asciiblob || value.getType() == Binaryblob)
 				return value.getBlobId();
@@ -606,17 +608,18 @@ int Record::getSavePointId()
 
 bool Record::getRecord(Stream *stream)
 {
-	if (data.record != NULL)
+	char* recordData = getRecordData();
+	if (recordData != NULL)
 		{
 		switch (encoding)
 			{
 			case traditional:
-				stream->compress (format->length, data.record);
+				stream->compress (format->length, recordData);
 				break;
 
 			case shortVector:
 				stream->putSegment(getEncodedSize(),
-						data.record + ((USHORT*) data.record)[0] - sizeof(short), false);
+						recordData + ((USHORT*) recordData)[0] - sizeof(short), false);
 				break;
 
 			default:
@@ -624,11 +627,19 @@ bool Record::getRecord(Stream *stream)
 			}
 		}
 
-	return (data.record != NULL);
+	return (recordData != NULL);
+}
+
+int Record::getAllocatedSize()
+{
+	int vectorLength = format->count * sizeof(short);
+	return vectorLength + getEncodedSize();
 }
 
 int Record::getEncodedSize()
 {
+	char* recordData = getRecordData();
+
 	switch (encoding)
 		{
 		case traditional:
@@ -638,7 +649,7 @@ int Record::getEncodedSize()
 			return size - format->count * sizeof(USHORT) - getSize();
 
 		case noEncoding:
-			if (!data.record)
+			if (!recordData)
 				return 0;
 		default:
 			NOT_YET_IMPLEMENTED;
@@ -647,32 +658,32 @@ int Record::getEncodedSize()
 		}
 }
 
-void Record::getEncodedValue(int fieldId, Value *value)
+void Record::getEncodedValue(char* recordData, int fieldId, Value *value)
 {
 	switch (encoding)
 		{
 		case shortVector:
 			{
 			int index = format->format[fieldId].physicalId;
-			USHORT *vector = (USHORT*) data.record;
+			USHORT *vector = (USHORT*) recordData;
 
 			if (highWater < index)
 				{
-				const UCHAR *p = (UCHAR*) data.record + vector[highWater];
+				const UCHAR *p = (UCHAR*) recordData + vector[highWater];
 
 				while (highWater < index)
 					{
 					const UCHAR *q = EncodedDataStream::skip(p);
-					vector[++highWater] = (USHORT) (q - (UCHAR*) data.record);
+					vector[++highWater] = (USHORT) (q - (UCHAR*) recordData);
 					p = q;
 					}
 				}
 
 			ASSERT(vector[index] < size);
-			const UCHAR *q = EncodedDataStream::decode((UCHAR*) data.record + vector[index], value, false);
+			const UCHAR *q = EncodedDataStream::decode((UCHAR*) recordData + vector[index], value, false);
 
 			if (++index < format->count && highWater < index)
-				vector[++highWater] = (USHORT) (q - (UCHAR*) data.record);
+				vector[++highWater] = (USHORT) (q - (UCHAR*) recordData);
 
 			return;
 			}
@@ -707,17 +718,9 @@ void Record::finalize(Transaction *trans
 	delete [] values;
 }
 
-int Record::setEncodedRecord(Stream *stream, bool interlocked)
+char* Record::setEncodedRecord(Stream *stream, bool interlocked)
 {
-	if (data.record)
-		{
-		if (encoding == valueVector)
-			delete [] (Value*) data.record;
-		else
-			DELETE_RECORD (data.record);
-		
-		data.record = NULL;
-		}
+	deleteData(false);
 
 	encoding = shortVector;
 	int vectorLength = format->count * sizeof(short);
@@ -726,18 +729,19 @@ int Record::setEncodedRecord(Stream *str
 	memset(dataBuffer, 0, vectorLength);
 	stream->getSegment(0, stream->totalLength, dataBuffer + vectorLength);
 	((USHORT*)dataBuffer)[0] = (USHORT) (vectorLength + sizeof(short));
-	
 	highWater = 0;
-	size +=  totalLength;
 	generation = format->table->database->currentGeneration;
 
+	// This could be called during a thaw, in which case the size is already set.
+
+	if (size == getSize())
+		size += totalLength;
+
 	if (interlocked)
 		{
-		char **ptr = &data.record;
-		
 		// If data.record has changed since allocating the new buffer, then free the new buffer
-		
-		if (!COMPARE_EXCHANGE_POINTER(ptr, NULL, dataBuffer))
+
+		if (!COMPARE_EXCHANGE_POINTER(&data.record, NULL, dataBuffer))
 			{
 			DELETE_RECORD(dataBuffer);
 			totalLength = 0;
@@ -746,42 +750,41 @@ int Record::setEncodedRecord(Stream *str
 	else
 		data.record = dataBuffer;
 
-	return totalLength;
+	return dataBuffer;
 }
 
 const char* Record::getEncodedRecord()
 {
-	// If chilled, restore the record data from the serial log
-	
-	if (state == recChilled)
-		thaw();
-
 	ASSERT(encoding == shortVector);
 
-	return data.record + ((USHORT*) data.record)[0];
+	char* recordData = getRecordData();
+
+	return recordData + ((USHORT*) recordData)[0];
 }
 
 const UCHAR* Record::getEncoding(int index)
 {
+	char* recordData = getRecordData();
+
 	switch (encoding)
 		{
 		case shortVector:
 			{
-			USHORT *vector = (USHORT*) data.record;
+			USHORT *vector = (USHORT*) recordData;
 
 			if (highWater < index)
 				{
-				const UCHAR *p = (UCHAR*) data.record + vector[highWater];
+				const UCHAR *p = (UCHAR*) recordData + vector[highWater];
 
 				while (highWater < index)
 					{
 					const UCHAR *q = EncodedDataStream::skip(p);
-					vector[++highWater] = (USHORT) (q - (UCHAR*) data.record);
+					vector[++highWater] = (USHORT) (q - (UCHAR*) recordData);
 					p = q;
 					}
 				}
 
-			return (UCHAR*) data.record + vector[index];
+			return (UCHAR*) recordData + vector[index];
 			}
 
 		default:
@@ -795,6 +798,8 @@ bool Record::isNull(int fieldId)
 	if (fieldId >= format->maxId)
 		return true;
 
+	char* recordData = getRecordData();
+
 	// If this is an encoded record, parse through the fields
 
 	switch (encoding)
@@ -802,33 +807,33 @@ bool Record::isNull(int fieldId)
 		case shortVector:
 			{
 			int index = format->format[fieldId].physicalId;
-			USHORT *vector = (USHORT*) data.record;
+			USHORT *vector = (USHORT*) recordData;
 
 			if (highWater < index)
 				{
-				const UCHAR *p = (UCHAR*) data.record + vector[highWater];
+				const UCHAR *p = (UCHAR*) recordData + vector[highWater];
 
 				while (highWater < index)
 					{
 					const UCHAR *q = EncodedDataStream::skip(p);
-					vector[++highWater] = (USHORT) (q - (UCHAR*) data.record);
+					vector[++highWater] = (USHORT) (q - (UCHAR*) recordData);
 					p = q;
 					}
 				}
 
-			const UCHAR *q = (UCHAR*) data.record + vector[index];
+			const UCHAR *q = (UCHAR*) recordData + vector[index];
 
 			return *q == edsNull;
 			}
 
 		case valueVector:
-			return ((Value**) data.record)[format->format[fieldId].physicalId]->isNull();
+			return ((Value**) recordData)[format->format[fieldId].physicalId]->isNull();
 
 		case traditional:
 			{
 			FieldFormat *ff = format->format + fieldId;
 
-			return (data.record [NULL_BYTE (ff)] & NULL_BIT (ff)) ? true : false;
+			return (recordData [NULL_BYTE (ff)] & NULL_BIT (ff)) ? true : false;
 			}
 			
 		case byteVector:
@@ -866,32 +871,45 @@ void Record::setPriorVersion(Record *old
 	FATAL("setPriorVersion should only be called for RecordVersions\n");
 }
 
-int Record::thaw(void)
+char *Record::thaw(void)
 {
-	return 0;
+	return data.record;
 }
 
-int Record::setRecordData(const UCHAR * dataIn, int dataLength)
+char* Record::setRecordData(const UCHAR * dataIn, int dataLength, int *bytesReallocated)
 {
 	encoding = shortVector;
 	int vectorLength = format->count * sizeof(short);
 	int totalLength = vectorLength + dataLength;
 	char *dataBuffer = allocRecordData(totalLength);
+	*bytesReallocated = totalLength;
 	memset(dataBuffer, 0, vectorLength);
 	memcpy(dataBuffer + vectorLength, dataIn, dataLength);
 	((USHORT*) dataBuffer)[0] = (USHORT) (vectorLength + sizeof(short));
 	highWater = 0;
-	char **ptr = &data.record;
 	
-	// If data.record has changed since allocating the new buffer, then free the new buffer
-	
-	if (!COMPARE_EXCHANGE_POINTER(ptr, NULL, dataBuffer))
+	// If data.record has changed since allocating the new buffer, 
+	// then free the buffer allocated here and use the new one.
+
+	char* prevBuffer = data.record;
+	while (!isAllocated(prevBuffer))
+		{
+		if (COMPARE_EXCHANGE_POINTER(&data.record, prevBuffer, dataBuffer))
+			break;
+
+		prevBuffer = data.record;
+		}
+
+	// prevBuffer is either NULL or is a buffer other than dataBuffer.
+
+	if (isAllocated(prevBuffer))
 		{
 		DELETE_RECORD(dataBuffer);
-		totalLength = 0;
+		dataBuffer = prevBuffer;
+		*bytesReallocated = 0;
 		}
-		
-	return (totalLength);
+
+	return dataBuffer;
 }
 
 
@@ -902,30 +920,95 @@ void Record::deleteData(void)
 
 void Record::deleteData(bool now)
 {
-	if (data.record)
+	// Detach the pointer from data.record
+
+	char* recordData = data.record;
+	while (isAllocated(recordData))
+		{
+		if (COMPARE_EXCHANGE_POINTER(&data.record, recordData, NULL))
+			break;
+
+		recordData = data.record;
+		}
+
+	// Now free the detached buffer
+
+	if (isAllocated(recordData))
 		{
 		switch (encoding)
 			{
 			case valueVector:
 				if (now)
-					delete [] (Value*) data.record;
+					delete [] (Value*) recordData;
 				else
-					format->table->queueForDelete((Value**) data.record);
+					format->table->queueForDelete((Value**) recordData);
 				break;
 
 			default:
 				if (now)
 					{
-					DELETE_RECORD (data.record);
+					DELETE_RECORD (recordData);
 					}
 				else
-					format->table->queueForDelete((char *) data.record);
+					format->table->queueForDelete((char *) recordData);
 			}
+		}
+}
+
+// chillData is like deleteData except that it is only used during chill
+// in order to get rid of the data.record buffer.  So it always uses
+// queueForDelete.  And instead of setting the pointer to NULL like deleteData,
+// chillData sets it to (-1).
+// Only one thread can chill a record at a time because that 
+// thread must 'own' the transaction.  So this CAS may not be needed.
+
+
+void Record::chillData(void)
+{
+	// Detach the pointer from data.record
+
+	char* recordData = data.record;
+	while (isAllocated(recordData))
+		{
+		if (COMPARE_EXCHANGE_POINTER(&data.record, recordData, recordIsChilled))
+			break;
 
-		data.record = NULL;
+		recordData = data.record;
+		}
+
+	// Now free the detached buffer
+
+	if (isAllocated(recordData))
+		{
+		switch (encoding)
+			{
+			case valueVector:
+				format->table->queueForDelete((Value**) recordData);
+				break;
+
+			default:
+				format->table->queueForDelete((char *) recordData);
+			}
 		}
 }
 
+bool Record::hasRecord(void)
+{
+	char* recordData = data.record;
+	ASSERT(recordData != recordIsChilled);
+	return (recordData != NULL);
+}
+
+bool Record::isChilled(void)
+{
+	return false;		// Only RecordVersions are ever chilled!
+}
+
+bool Record::isDeleted(void)
+{
+	return false;		// Only RecordVersions are ever chilled!
+}
+
 void Record::print(void)
 {
 	printf("  %p\tId %d, enc %d, state %d, use %d, grp " I64FORMAT "\n",
@@ -998,9 +1081,11 @@ void Record::serialize(Serialize* stream
 	stream->putInt(format->version);
 	stream->putInt(state);
 	stream->putInt(size);
-	
-	if (data.record)
-		stream->putData(getEncodedSize(), (UCHAR*) data.record + ((USHORT*) data.record)[0]);
+
+	char* recordData = getRecordData();
+
+	if (recordData)
+		stream->putData(getEncodedSize(), (UCHAR*) recordData + ((USHORT*) recordData)[0]);
 	else
 		stream->putData(0, NULL);
 }
@@ -1012,7 +1097,9 @@ int Record::getSize(void)
 
 int Record::getDataMemUsage(void)
 {
-	return (data.record == NULL ? 0 : MemMgr::blockSize(data.record));
+	char* recordData = getRecordData();
+
+	return (recordData == NULL ? 0 : MemMgr::blockSize(recordData));
 }
 
 int Record::getMemUsage(void)
@@ -1109,3 +1196,14 @@ void Record::queueForDelete(void)
 	state = recQueuedForDelete;
 	format->table->queueForDelete(this);
 }
+
+char* Record::getRecordData(void)
+{
+	return data.record;
+}
+
+char* Record::findRecordData(void)
+{
+	return data.record;
+}
+

=== modified file 'storage/falcon/Record.h'
--- a/storage/falcon/Record.h	2009-05-08 18:37:39 +0000
+++ b/storage/falcon/Record.h	2009-05-12 14:35:40 +0000
@@ -43,7 +43,7 @@ enum RecordEncoding {
 
 static const int recData      = 0;		// record pointer is valid or record is deleted
 static const int recDeleted   = 1;		// record has been deleted
-static const int recChilled   = 2;		// record data is temporarily stored in serial log
+//static const int recChilled   = 2;		// record data is temporarily stored in serial log
 static const int recOnDisk    = 3;		// record is on disk and must be read
 static const int recLock      = 4;		// this is a "record lock" and not a record
 static const int recNoChill   = 5;		// record is in use and should not be chilled
@@ -55,6 +55,8 @@ static const int recPruning   = 10;		// 
 static const int recEndChain  = 11;		// end of chain for garbage collection
 static const int recQueuedForDelete = 12;		// Record is in purgatory.
 
+// Special value for data.record
+static const char* recordIsChilled = (char*) -1;
 
 //#define CHECK_RECORD_ACTIVITY
 #ifdef CHECK_RECORD_ACTIVITY
@@ -114,25 +116,29 @@ public:
 	virtual Record*		fetchVersion (Transaction * transaction);
 	virtual void		retire(void);
 	virtual void		scavenge(TransId targetTransactionId, int oldestActiveSavePointId);
-	virtual bool		isVersion();
-	virtual bool		isSuperceded();
-	virtual bool		isNull(int fieldId);
 	virtual Record*		releaseNonRecursive(void);
 	virtual Record*		clearPriorVersion(void);
 	virtual void		setPriorVersion(Record *oldPriorVersion, Record *newPriorVersion);
 	virtual Record*		getPriorVersion();
 	virtual Record*		getGCPriorVersion(void);
 	virtual	void		print(void);
-	virtual int			thaw(void);
+	virtual char*		thaw(void);
 	virtual const char*	getEncodedRecord();
-	virtual int			setRecordData(const UCHAR *dataIn, int dataLength);
+	virtual char*		setRecordData(const UCHAR *dataIn, int dataLength, int *bytesReallocated);
+	virtual char*		getRecordData(void);
 	virtual void		serialize(Serialize* stream);
 	virtual int			getSize(void);
 	virtual SyncObject* getSyncThaw(void);
 	virtual void		queueForDelete(void);
+	virtual bool		hasRecord();
+	virtual bool		isChilled();
+	virtual bool		isDeleted();
+	virtual bool		isSuperceded();
+	virtual bool		isVersion();
+	virtual bool		isNull(int fieldId);
 
 	const UCHAR*		getEncoding (int index);
-	int					setEncodedRecord(Stream *stream, bool interlocked);
+	char*				setEncodedRecord(Stream *stream, bool interlocked);
 	void				getValue (int fieldId, Value* value);
 	void				getRawValue (int fieldId, Value* value);
 	int					getFormatVersion();
@@ -142,38 +148,25 @@ public:
 	void				addRef();
 	int					getBlobId(int fieldId);
 	void				finalize(Transaction *transaction);
-	void				getEncodedValue (int fieldId, Value *value);
+	void				getEncodedValue(char* recordData, int fieldId, Value *value);
 	bool				getRecord (Stream *stream);
+	int					getAllocatedSize(void);
 	int					getEncodedSize();
 	void				deleteData(void);
 	void				deleteData(bool now);
+	void				chillData(void);
 	void				printRecord(const char* header);
 	void				validateData(void);
 	char*				allocRecordData(int length);
 	void				expungeRecord(void);
 	int					getDataMemUsage(void);
 	int					getMemUsage(void);
-	
+	char*				findRecordData(void);
+
 	Record (Table *table, Format *recordFormat);
 	Record (Table *table, int32 recordNumber, Stream *stream);
 	Record (Database *database, Serialize* stream);
 
-	inline int hasRecord(bool forceThaw = true)
-		{
-		if (state == recChilled && forceThaw)
-			thaw();
-			
-		return (data.record != NULL);
-		}
-
-	inline char* getRecordData()
-		{
-		if (state == recChilled)
-			thaw();
-		
-		return data.record;
-		}
-		
 protected:
 	virtual ~Record();
 	
@@ -182,6 +175,12 @@ protected:
 		char	*record;
 		}		data;
 
+	inline bool isAllocated(char* recordData)
+		{
+		return ((recordData != NULL) && (recordData != recordIsChilled));
+		}
+
+
 public:
 	uint64		generation;
 	Format		*format;

=== modified file 'storage/falcon/RecordVersion.cpp'
--- a/storage/falcon/RecordVersion.cpp	2009-05-01 18:26:25 +0000
+++ b/storage/falcon/RecordVersion.cpp	2009-05-12 14:35:40 +0000
@@ -61,14 +61,9 @@ RecordVersion::RecordVersion(Table *tbl,
 {
 	virtualOffset = 0;
 	
-	//transaction   = trans;
-	//transactionState    = trans->transactionState;
-	//transactionState->addRef();
-
 	transactionState = NULL;
 	setTransactionState(transaction->transactionState);
 	
-	//transactionId = transaction->transactionId;
 	savePointId   = transaction->curSavePointId;
 	superceded    = false;
 
@@ -80,8 +75,9 @@ RecordVersion::RecordVersion(Table *tbl,
 		priorVersion->addRef(REC_HISTORY);
 		recordNumber = oldVersion->recordNumber;
 
-		if (priorVersion->state == recChilled)
-			priorVersion->thaw();
+		// Be sure the priorVersion is thawed.
+		
+		priorVersion->hasRecord();
 		
 		if (transactionState == priorVersion->getTransactionState())
 			oldVersion->setSuperceded (true);
@@ -398,18 +394,21 @@ uint64 RecordVersion::getVirtualOffset()
 	return (virtualOffset);
 }
 
-int RecordVersion::thaw()
+char *RecordVersion::thaw()
 {
+	// We should try without this now that data.record is manipulated safely.
+
 	Sync syncThaw(format->table->getSyncThaw(this), "RecordVersion::thaw");
 	syncThaw.lock(Exclusive);
 
-	int bytesRestored = 0;
+	int bytesReallocated = 0;
 	
 	// Nothing to do if the record is no longer chilled
 	
-	if (state != recChilled)
-		return getDataMemUsage();
-		
+	char* recordData = data.record;
+	if (recordData != recordIsChilled)
+		return recordData;
+
 	// First, try to thaw from the serial log. If transaction->writePending is 
 	// true, then the record data can be restored from the serial log. If writePending
 	// is false, then the record data has been written to the data pages.
@@ -419,38 +418,31 @@ int RecordVersion::thaw()
 	if (trans)
 		{
 		if (trans->writePending)
-			{
-			bytesRestored = trans->thaw(this);
-			
-			if (bytesRestored == 0)
-				trans->thaw(this);
-			}
+			recordData = trans->thaw(this, &bytesReallocated);
 
 		trans->release();
 		}
-	
+
 	// The record data is no longer available in the serial log, so zap the
 	// virtual offset and restore from the data page.
-		
-	bool recordFetched = false;
 
-	if (bytesRestored == 0)
+	if (bytesReallocated == 0)
 		{
 		Stream stream;
 		Table *table = format->table;
-		
+
 		if (table->dbb->fetchRecord(table->dataSection, recordNumber, &stream))
 			{
-			bytesRestored = setEncodedRecord(&stream, true);
-			recordFetched = true;
+			recordData = setEncodedRecord(&stream, true);
+			bytesReallocated = getAllocatedSize();
 			}
-			
-		if (bytesRestored > 0)
+
+		if (bytesReallocated > 0)
 			{
 			virtualOffset = 0;
 			table->debugThawedRecords++;
-			table->debugThawedBytes += bytesRestored;
-			
+			table->debugThawedBytes += bytesReallocated;
+
 			if (table->debugThawedBytes >= table->database->configuration->recordChillThreshold)
 				{
 				Log::debug("%d: Record thaw (fetch): table %d, %ld records, %ld bytes\n", this->format->table->database->deltaTime,
@@ -460,25 +452,19 @@ int RecordVersion::thaw()
 				}
 			}
 		}
-		
-	if (state == recChilled)
-		{
-		if (data.record != NULL)
-			state = recData;
-		}
-		
-	return bytesRestored;
+
+	return recordData;
 }
 
-/***
-char* RecordVersion::getRecordData()
+bool RecordVersion::isChilled(void)
 {
-	if (state == recChilled)
-		thaw();
-		
-	return data.record;
+	return (data.record == recordIsChilled);
+}
+
+bool RecordVersion::isDeleted(void)
+{
+	return (data.record == NULL && state == recDeleted);
 }
-***/
 
 void RecordVersion::print(void)
 {
@@ -517,6 +503,9 @@ void RecordVersion::setTransactionState(
 	if (transactionState)
 		transactionState->release();
 	
+	// Add a use count on the transaction state to ensure it lives 
+	// as long as the record version object
+
 	transactionState = newTransState;
 	transactionState->addRef();
 }
@@ -525,3 +514,28 @@ Transaction* RecordVersion::findTransact
 {
 	return format->table->database->transactionManager->findTransaction(transactionState->transactionId);
 }
+
+bool RecordVersion::hasRecord(void)
+{
+	char* recordData = data.record;
+	if (recordData == NULL)
+		return false;
+
+	if (recordData == recordIsChilled)
+		{
+		recordData = thaw();
+		return (recordData != NULL);
+		}
+
+	return true;
+}
+
+char* RecordVersion::getRecordData()
+{
+	char* recordData = data.record;
+
+	if (recordData == recordIsChilled)
+		recordData = thaw();
+
+	return recordData;
+}

=== modified file 'storage/falcon/RecordVersion.h'
--- a/storage/falcon/RecordVersion.h	2009-04-16 12:09:15 +0000
+++ b/storage/falcon/RecordVersion.h	2009-05-12 14:35:40 +0000
@@ -36,7 +36,6 @@ public:
 	RecordVersion(Table *tbl, Format *fmt, Transaction *tran, Record *oldVersion);
 	RecordVersion(Database* database, Serialize *stream);
 
-	virtual bool		isSuperceded();
 	//virtual Transaction* getTransaction();
 	virtual TransactionState* getTransactionState() const;
 	virtual TransId		getTransactionId();
@@ -46,7 +45,6 @@ public:
 	virtual Record*		getGCPriorVersion(void);
 	virtual void		retire(void);
 	virtual void		scavengeSavepoint(Transaction* targetTransaction, int oldestActiveSavePoint);
-	virtual bool		isVersion();
 	virtual void		rollback(Transaction *transaction);
 	virtual Record*		fetchVersion (Transaction * trans);
 	virtual Record*		releaseNonRecursive();
@@ -54,11 +52,17 @@ public:
 	virtual void		setPriorVersion(Record *oldPriorVersion, Record *newPriorVersion);
 	virtual void		setVirtualOffset(uint64 offset);
 	virtual uint64		getVirtualOffset();
-	virtual int			thaw(void);
+	virtual char*		thaw(void);
 	virtual void		print(void);
 	virtual int			getSize(void);
 	virtual void		serialize(Serialize* stream);
 	virtual Transaction* findTransaction(void);
+	virtual char*		getRecordData();
+	virtual bool		hasRecord();
+	virtual bool		isChilled();
+	virtual bool		isDeleted();
+	virtual bool		isSuperceded();
+	virtual bool		isVersion();
 
 	void				commit();
 	bool				committedBefore(TransId);

=== modified file 'storage/falcon/SRLUpdateRecords.cpp'
--- a/storage/falcon/SRLUpdateRecords.cpp	2009-05-08 18:37:39 +0000
+++ b/storage/falcon/SRLUpdateRecords.cpp	2009-05-12 14:35:40 +0000
@@ -38,36 +38,36 @@ SRLUpdateRecords::~SRLUpdateRecords(void
 {
 }
 
-bool SRLUpdateRecords::chill(Transaction *transaction, RecordVersion *record, uint dataLength)
+bool SRLUpdateRecords::chill(Transaction *transaction, RecordVersion *record)
 {
-	// Record data has been written to the serial log, so release the data
-	// buffer and set the state accordingly
-	
+	// Record data has been written to the serial log.
+	// Before releasing the data, find the allocated size.
+
+	int chillBytes = record->getAllocatedSize();
+
+	// Release the data buffer and set the state accordingly
+
 	ASSERT(record->format);
-	record->deleteData();
-	record->state = recChilled;
-	
+	record->chillData();
+
 	// Update transaction counter and chillPoint
-	
-	transaction->chillPoint = &record->nextInTrans;
 
-	//ASSERT(transaction->totalRecordData >= dataLength);
-	if (transaction->totalRecordData >= dataLength)
-		transaction->totalRecordData -= dataLength;
+	transaction->chillPoint = &record->nextInTrans;
+	if (transaction->totalRecordData >= chillBytes)
+		transaction->totalRecordData -= chillBytes;
 	else
 		transaction->totalRecordData = 0;
-		
+
 	return true;
 }
 
-int SRLUpdateRecords::thaw(RecordVersion *record, bool *thawed)
+char* SRLUpdateRecords::thaw(RecordVersion *record, int *bytesReallocated)
 {
-	*thawed = false;
-	
 	// Nothing to do if record is no longer chilled
 	
-	if (record->state != recChilled)
-		return record->size;
+	char* dataRecord = record->findRecordData();
+	if (dataRecord != recordIsChilled)
+		return dataRecord ;
 
 	// Find the window where the record is stored using the record offset, then
 	// activate the window, reading from disk if necessary
@@ -99,7 +99,7 @@ int SRLUpdateRecords::thaw(RecordVersion
 	int recordNumber = control->getInt();
 	ASSERT(recordNumber == record->recordNumber);
 	int dataLength   = control->getInt();
-	int bytesReallocated = 0;
+	*bytesReallocated = 0;
 
 	window->deactivateWindow();
 	sync.unlock();
@@ -107,31 +107,25 @@ int SRLUpdateRecords::thaw(RecordVersion
 	// setRecordData() handles race conditions with an interlocked compare and exchange,
 	// but check the state and record number anyway
 
-	if (record->state == recChilled)
-		bytesReallocated = record->setRecordData(control->input, dataLength);
+	dataRecord = record->findRecordData();
+	if (dataRecord == recordIsChilled)
+		dataRecord = record->setRecordData(control->input, dataLength, bytesReallocated);
+
 
-	if (bytesReallocated > 0)
+	if (*bytesReallocated > 0)
 		{
 		ASSERT(recordNumber == record->recordNumber);
-		bytesReallocated = record->getEncodedSize();
-		*thawed = true;
 
 		if (log->chilledRecords > 0)
 			log->chilledRecords--;
-		
-		if (log->chilledBytes > uint64(bytesReallocated))
-			log->chilledBytes -= bytesReallocated;
-		else
-			log->chilledBytes = 0;
 		}
 
-	return bytesReallocated;
+	return dataRecord;
 }
 
 void SRLUpdateRecords::append(Transaction *transaction, RecordVersion *records, bool chillRecords)
 {
 	uint32 chilledRecordsWindow = 0;
-	uint32 chilledBytesWindow   = 0;
 	SerialLogTransaction *srlTrans = NULL;
 	int savepointId;
 	
@@ -158,7 +152,6 @@ void SRLUpdateRecords::append(Transactio
 		UCHAR *end = log->writeWarningTrack;
 		
 		chilledRecordsWindow = 0;
-		chilledBytesWindow = 0;
 
 		for (; record; record = record->nextInTrans)
 			{
@@ -169,7 +162,7 @@ void SRLUpdateRecords::append(Transactio
 
 			// Skip chilled records, but advance the chillpoint
 
-			if (record->state == recChilled)
+			if (record->isChilled())
 				{
 				transaction->chillPoint = &record->nextInTrans;
 				continue;
@@ -200,30 +193,25 @@ void SRLUpdateRecords::append(Transactio
 
 			if (record->virtualOffset != 0)
 				{
+				// Record is already in serial log
+
 				if (chillRecords && record->state != recDeleted)
 					{
-					int chillBytes = record->getEncodedSize();
-
-					if (chill(transaction, record, chillBytes))
+					if (chill(transaction, record))
 						{
 						log->chilledRecords++;
-						log->chilledBytes += chillBytes;
 
 						if (transaction->thawedRecords)
 							transaction->thawedRecords--;
 						}
 					}
-				else
-					{
-					// Record is already in serial log
-					}
 
 				continue;
 				}
 			
 			// Load the record data into a stream
 
-			if (record->hasRecord(false))
+			if (record->hasRecord())
 				{
 				if (!record->getRecord(&stream))
 					continue;
@@ -256,11 +244,8 @@ void SRLUpdateRecords::append(Transactio
 			
 			if (chillRecords && record->state != recDeleted)
 				{
-				if (chill(transaction, record, stream.totalLength))
-					{
+				if (chill(transaction, record))
 					chilledRecordsWindow++;
-					chilledBytesWindow += stream.totalLength;
-					}
 				}
 			} // next record version
 		
@@ -281,9 +266,7 @@ void SRLUpdateRecords::append(Transactio
 		if (chillRecords)
 			{
 			log->chilledRecords += chilledRecordsWindow;
-			log->chilledBytes   += chilledBytesWindow;
 			transaction->chilledRecords += chilledRecordsWindow;
-			transaction->chilledBytes += chilledBytesWindow;
 //			uint32 windowNumber = (uint32)log->writeWindow->virtualOffset / SRL_WINDOW_SIZE;
 			}
 		} // next serial log record and write window

=== modified file 'storage/falcon/SRLUpdateRecords.h'
--- a/storage/falcon/SRLUpdateRecords.h	2009-01-17 08:22:44 +0000
+++ b/storage/falcon/SRLUpdateRecords.h	2009-05-12 14:35:40 +0000
@@ -33,8 +33,8 @@ public:
 	virtual void	read(void);
 	virtual void	pass2(void);
 	void			append(Transaction *transaction, RecordVersion *records, bool chillRecords = false);
-	bool			chill(Transaction *transaction, RecordVersion *record, uint dataLength);
-	int				thaw(RecordVersion *record, bool *thawed);
+	bool			chill(Transaction *transaction, RecordVersion *record);
+	char*			thaw(RecordVersion *record, int *bytesReallocated);
 	
 	const UCHAR		*data;
 	int				savepointId;

=== modified file 'storage/falcon/SerialLog.cpp'
--- a/storage/falcon/SerialLog.cpp	2009-04-20 18:40:49 +0000
+++ b/storage/falcon/SerialLog.cpp	2009-05-12 14:35:40 +0000
@@ -102,7 +102,6 @@ SerialLog::SerialLog(Database *db, JStri
 	tracePage = TRACE_PAGE;
 	traceRecord = 0;
 	chilledRecords = 0;
-	chilledBytes = 0;
 	windowReads = 0;
 	priorWindowWrites = 0;
 	windowWrites = 0;

=== modified file 'storage/falcon/SerialLog.h'
--- a/storage/falcon/SerialLog.h	2009-04-20 18:40:49 +0000
+++ b/storage/falcon/SerialLog.h	2009-05-12 14:35:40 +0000
@@ -230,7 +230,6 @@ public:
 	int32				tracePage;
 	int32				traceRecord;
 	uint32				chilledRecords;
-	uint64				chilledBytes;
 	int32				wantToSerializeGophers;
 	int32				serializeGophers;
 	uint64				startRecordVirtualOffset;

=== modified file 'storage/falcon/Table.cpp'
--- a/storage/falcon/Table.cpp	2009-05-08 18:37:39 +0000
+++ b/storage/falcon/Table.cpp	2009-05-12 14:35:40 +0000
@@ -3534,13 +3534,9 @@ Record* Table::fetchForUpdate(Transactio
 					return NULL;
 					}
 
-				if (source->state == recChilled	&& !source->thaw())
-					{
-					source->release(REC_HISTORY);
+				if (!source->hasRecord())
+					FATAL("Committed non-deleted record has no data buffer.\n");
 
-					return NULL;
-					}
-						
 				// Lock the record
 
 				if (dbb->debug & DEBUG_RECORD_LOCKS)

=== modified file 'storage/falcon/Transaction.cpp'
--- a/storage/falcon/Transaction.cpp	2009-05-08 18:37:39 +0000
+++ b/storage/falcon/Transaction.cpp	2009-05-12 14:35:40 +0000
@@ -127,9 +127,7 @@ void Transaction::initialize(Connection*
 	totalRecordData = 0;
 	totalRecords = 0;
 	chilledRecords = 0;
-	chilledBytes = 0;
 	thawedRecords = 0;
-	thawedBytes = 0;
 	debugThawedRecords = 0;
 	debugThawedBytes = 0;
 	committedRecords = 0;
@@ -551,12 +549,13 @@ void Transaction::chillRecords()
 				(uint32)(totalDataBefore-totalRecordData), committedRecords, connection->currentStatement);
 }
 
-int Transaction::thaw(RecordVersion * record)
+char* Transaction::thaw(RecordVersion* record, int* bytesReallocated)
 {
 	// Nothing to do if record is no longer chilled
-	
-	if (record->state != recChilled)
-		return record->getDataMemUsage();
+
+	char *dataRecord = record->findRecordData();
+	if (dataRecord != recordIsChilled)
+		return dataRecord;
 
 	// Get pointer to record data in serial log
 
@@ -565,16 +564,14 @@ int Transaction::thaw(RecordVersion * re
 	// Thaw the record then update the total record data bytes for this transaction
 
 	ASSERT(record->getTransactionId() == transactionId);
-	bool thawed;
-	int bytesRestored = control.updateRecords.thaw(record, &thawed);
-	
-	if (bytesRestored > 0 && thawed)
+	dataRecord = control.updateRecords.thaw(record, bytesReallocated);
+
+	if (*bytesReallocated > 0)
 		{
-		totalRecordData += bytesRestored;
+		totalRecordData += *bytesReallocated;
 		thawedRecords++;
-		thawedBytes += bytesRestored;
 		debugThawedRecords++;
-		debugThawedBytes += bytesRestored;
+		debugThawedBytes += *bytesReallocated;
 		}
 
 	if (debugThawedBytes >= database->configuration->recordChillThreshold)
@@ -585,7 +582,7 @@ int Transaction::thaw(RecordVersion * re
 		debugThawedBytes = 0;
 		}
 	
-	return bytesRestored;
+	return dataRecord;
 }
 
 void Transaction::thaw(DeferredIndex * deferredIndex)
@@ -605,7 +602,7 @@ void Transaction::addRecord(RecordVersio
 	else if (record->state == recDeleted)
 		++deletedRecords;
 		
-	totalRecordData += record->getEncodedSize();
+	totalRecordData += record->getAllocatedSize();
 	++totalRecords;
 	
 	// If the transaction size has exceeded the chill threshold,
@@ -616,13 +613,13 @@ void Transaction::addRecord(RecordVersio
 	if (totalRecordData > database->configuration->recordChillThreshold
 		&& !systemTransaction)
 		{
-		UCHAR saveState = record->state;
-		
-		// Chill all records except the current record, which may be part of an update or insert
+		// Chill all records except the current record, 
+		// which may be part of an update or insert
 
-		if (record->state != recLock && record->state != recChilled)
+		UCHAR saveState = record->state;
+		if (record->state != recLock)
 			record->state = recNoChill;
-			
+
 		chillRecords();
 
 		if (record->state == recNoChill)
@@ -696,14 +693,11 @@ void Transaction::removeRecordNoLock(Rec
 		chillPoint = ptr;
 
 	// Adjust total record data count
-	
-	if (record->state != recChilled)
-		{
-		uint32 size = record->getEncodedSize();
-		
-		if (totalRecordData >= size)
-			totalRecordData -= size;
-		}
+
+	uint32 allocatedSize = record->getAllocatedSize();
+
+	if (totalRecordData >= allocatedSize)
+		totalRecordData -= allocatedSize;
 
 	if (record->state == recDeleted && deletedRecords > 0)
 		--deletedRecords;
@@ -1533,6 +1527,11 @@ void Transaction::releaseDeferredIndexes
 		}
 }
 
+// Backlogging is only done by the thread for this transaction which is
+// the thread running backlogRecords().
+// So we assume that all chilling is done and no new record versions
+// will be added to any of these record chains while this is happening.
+
 void Transaction::backlogRecords(void)
 {
 	SavePoint *savePoint = savePoints;
@@ -1541,7 +1540,12 @@ void Transaction::backlogRecords(void)
 		{
 		prior = record->prevInTrans;
 
-		if (!record->hasRecord(false))
+		// Be sure this is the base record.  Backlog all chilled records 
+		// and deleted records, but not lock records
+
+		if (   !record->isSuperceded()
+		    && (   record->isChilled()
+		        || record->isDeleted() ))
 			{
 			Sync syncSP(&syncSavepoints, "Transaction::backlogRecords");
 			syncSP.lock(Shared);

=== modified file 'storage/falcon/Transaction.h'
--- a/storage/falcon/Transaction.h	2009-04-06 12:33:12 +0000
+++ b/storage/falcon/Transaction.h	2009-05-12 14:35:40 +0000
@@ -89,7 +89,7 @@ public:
 	bool		isXidEqual(int testLength, const UCHAR* test);
 	void		releaseRecordLocks(void);
 	void		chillRecords();
-	int			thaw(RecordVersion* record);
+	char*		thaw(RecordVersion* record, int* bytesReallocated);
 	void		thaw(DeferredIndex* deferredIndex);
 	void		print(void);
 	void		printBlockage(void);
@@ -149,9 +149,7 @@ public:
 	uint64			totalRecordData;	// total bytes of record data for this transaction (unchilled + thawed)
 	uint32			totalRecords;		// total record count
 	uint32			chilledRecords;		// total chilled record count
-	uint32			chilledBytes;		// current bytes chilled
 	uint32			thawedRecords;		// total thawed record count
-	uint32			thawedBytes;		// current bytes thawed
 	uint32			debugThawedRecords;
 	uint32			debugThawedBytes;
 	uint32			committedRecords;	// committed record count


Attachment: [text/bzr-bundle] bzr/kevin.lewis@sun.com-20090512143540-f7yjfnr0jlbcroa1.bundle
Thread
bzr commit into mysql-6.0-falcon-team branch (kevin.lewis:2702)Bug#43344 Bug#44721Kevin Lewis12 May