List:Commits« Previous MessageNext Message »
From:Vladislav Vaintroub Date:February 14 2009 9:35pm
Subject:bzr commit into mysql-6.0-falcon-team branch (vvaintroub:3020)
View as plain text  
#At file:///G:/bzr/mysql-6.0-falcon-team/ based on revid:vvaintroub@stripped

 3020 Vladislav Vaintroub	2009-02-14 [merge]
      merge from parent
      added:
        mysql-test/suite/falcon/r/falcon_bug_34182.result
        mysql-test/suite/falcon/t/falcon_bug_34182.test
      modified:
        storage/falcon/Cache.cpp
        storage/falcon/Dbb.cpp
        storage/falcon/DeferredIndex.cpp
        storage/falcon/Log.h
        storage/falcon/RecordScavenge.cpp
        storage/falcon/RecordScavenge.h
        storage/falcon/RecordVersion.cpp
        storage/falcon/SRLDeleteIndex.cpp
        storage/falcon/StorageTable.cpp
        storage/falcon/Table.cpp
        storage/falcon/Table.h
        storage/falcon/Transaction.cpp

=== added file 'mysql-test/suite/falcon/r/falcon_bug_34182.result'
--- a/mysql-test/suite/falcon/r/falcon_bug_34182.result	1970-01-01 00:00:00 +0000
+++ b/mysql-test/suite/falcon/r/falcon_bug_34182.result	2009-02-12 20:22:40 +0000
@@ -0,0 +1,141 @@
+*** Bug #34182 ***
+# Initialisation
+SET @@storage_engine = 'Falcon';
+DROP TABLE IF EXISTS t1;
+DROP TABLE IF EXISTS t2;
+# Test #1 - Double locking in a subselect
+CREATE TABLE t1 (id INTEGER primary key, x INTEGER) ENGINE falcon;
+CREATE TABLE t2 (b integer, a integer) ENGINE falcon;
+INSERT INTO t1 VALUES (1, 1), (222, 222);
+INSERT INTO t2 VALUES (1, 10), (2, 20), (3, 30);
+SELECT * FROM t1;
+id	x
+1	1
+222	222
+SELECT * FROM t2;
+b	a
+1	10
+2	20
+3	30
+# Following UPDATE should leave record id=1 locked in T1.
+BEGIN;
+UPDATE t2 SET a = 100 WHERE b = (SELECT x FROM t1 WHERE id = b FOR UPDATE);
+# Establish connection conn1 (user = root)
+# conn1 should be able to make updates to t1 id=222
+UPDATE t1 SET x = 2222 WHERE id = 222;
+# conn1 should not be able to make updates to t1 id=1
+UPDATE t1 SET x = 1111 WHERE id = 1;
+# Connect to default and COMMIT
+COMMIT;
+# Connect to conn1 and reap the UPDATE
+# Connect to default and prepare for Test #2
+DROP TABLE t1;
+DROP TABLE t2;
+# Test #2 - Multiple locking within savepoints
+CREATE TABLE t1 (id INTEGER primary key, x INTEGER) ENGINE falcon;
+INSERT INTO t1 VALUES (1, 1), (2, 2), (3, 3);
+BEGIN;
+SELECT * FROM t1 WHERE id = 1 FOR UPDATE;
+id	x
+1	1
+SAVEPOINT SP1;
+SELECT * FROM t1 WHERE id <= 2 FOR UPDATE;
+id	x
+1	1
+2	2
+SAVEPOINT SP2;
+SELECT * FROM t1 WHERE id <= 3 FOR UPDATE;
+id	x
+1	1
+2	2
+3	3
+# This update will fetch all 3 records for update, 
+# replace the lock on id=3, and then attempt to unlock
+# both id=1 and id=2.  But they should remain locked.
+UPDATE t1 SET x = 333 WHERE x = 3;
+SELECT * FROM t1;
+id	x
+1	1
+2	2
+3	333
+# conn1 should not be able to make updates to t1 id=1
+UPDATE t1 SET x = 1111 WHERE id = 1;
+# Establish connection conn2 (user = root)
+# conn2 should not be able to make updates to t1 id=2
+UPDATE t1 SET x = 2222 WHERE id = 2;
+# Connect to default and ROLLBACK TO SP1
+ROLLBACK;
+SELECT * FROM t1;
+id	x
+1	1111
+2	2222
+3	3
+# Check Results For Test 2
+# Check Results
+# Connect to conn1 and reap
+SELECT * FROM t1;
+id	x
+1	1111
+2	2222
+3	3
+# Connect to conn2 and reap
+SELECT * FROM t1;
+id	x
+1	1111
+2	2222
+3	3
+# Test #3 - Multiple locking within savepoints
+# Connect to default and lock records again
+BEGIN;
+SELECT * FROM t1 WHERE id = 1 FOR UPDATE;
+id	x
+1	1111
+SAVEPOINT SP1;
+SELECT * FROM t1 WHERE id <= 2 FOR UPDATE;
+id	x
+1	1111
+2	2222
+SAVEPOINT SP2;
+SELECT * FROM t1 WHERE id <= 3 FOR UPDATE;
+id	x
+1	1111
+2	2222
+3	3
+UPDATE t1 SET x = 333 WHERE x = 3;
+SELECT * FROM t1;
+id	x
+1	1111
+2	2222
+3	333
+ROLLBACK TO SP1;
+SELECT * FROM t1;
+id	x
+1	1111
+2	2222
+3	3
+# conn1 should not be able to make updates to t1 id=1
+UPDATE t1 SET x = 111111 WHERE id = 1;
+# conn2 should be able to make updates to t1 id=2
+UPDATE t1 SET x = 222222 WHERE id = 2;
+# Connect to default and commit;
+COMMIT;
+SELECT * FROM t1;
+id	x
+1	111111
+2	222222
+3	3
+# Check Results
+# Connect to conn1 and reap
+SELECT * FROM t1;
+id	x
+1	111111
+2	222222
+3	3
+# Connect to conn2
+SELECT * FROM t1;
+id	x
+1	111111
+2	222222
+3	3
+# Final cleanup
+DROP TABLE t1;

=== added file 'mysql-test/suite/falcon/t/falcon_bug_34182.test'
--- a/mysql-test/suite/falcon/t/falcon_bug_34182.test	1970-01-01 00:00:00 +0000
+++ b/mysql-test/suite/falcon/t/falcon_bug_34182.test	2009-02-12 20:22:40 +0000
@@ -0,0 +1,169 @@
+--source include/have_falcon.inc
+#--disable_abort_on_error
+#
+# Bug #34182: SELECT ... FOR UPDATE does not lock when in subquery
+#
+--echo *** Bug #34182 ***
+
+# ----------------------------------------------------- #
+# --- Initialisation                                --- #
+# ----------------------------------------------------- #
+--echo # Initialisation
+let $engine = 'Falcon';
+eval SET @@storage_engine = $engine;
+
+--disable_warnings
+DROP TABLE IF EXISTS t1;
+DROP TABLE IF EXISTS t2;
+--enable_warnings
+
+# ----------------------------------------------------- #
+# --- Test #1 - Double locking in a subselect       --- #
+# ----------------------------------------------------- #
+--echo # Test #1 - Double locking in a subselect
+CREATE TABLE t1 (id INTEGER primary key, x INTEGER) ENGINE falcon;
+CREATE TABLE t2 (b integer, a integer) ENGINE falcon;
+INSERT INTO t1 VALUES (1, 1), (222, 222);
+INSERT INTO t2 VALUES (1, 10), (2, 20), (3, 30);
+SELECT * FROM t1;
+SELECT * FROM t2;
+
+--echo # Following UPDATE should leave record id=1 locked in T1.
+BEGIN;
+UPDATE t2 SET a = 100 WHERE b = (SELECT x FROM t1 WHERE id = b FOR UPDATE);
+
+
+--echo # Establish connection conn1 (user = root)
+connect (conn1,localhost,root,,);
+connection conn1;
+
+--echo # conn1 should be able to make updates to t1 id=222
+UPDATE t1 SET x = 2222 WHERE id = 222;  
+
+--echo # conn1 should not be able to make updates to t1 id=1
+--send  UPDATE t1 SET x = 1111 WHERE id = 1
+
+--echo # Connect to default and COMMIT
+connection default;
+--real_sleep 1
+COMMIT;
+
+--echo # Connect to conn1 and reap the UPDATE
+connection conn1;
+--reap
+--echo # Connect to default and prepare for Test #2
+connection default;
+DROP TABLE t1;
+DROP TABLE t2;
+
+# ----------------------------------------------------- #
+# --- Test #2 - Multiple locking within savepoints  --- #
+# ----------------------------------------------------- #
+--echo # Test #2 - Multiple locking within savepoints
+CREATE TABLE t1 (id INTEGER primary key, x INTEGER) ENGINE falcon;
+INSERT INTO t1 VALUES (1, 1), (2, 2), (3, 3);
+BEGIN;
+SELECT * FROM t1 WHERE id = 1 FOR UPDATE;
+SAVEPOINT SP1;
+SELECT * FROM t1 WHERE id <= 2 FOR UPDATE;
+SAVEPOINT SP2;
+SELECT * FROM t1 WHERE id <= 3 FOR UPDATE;
+
+--echo # This update will fetch all 3 records for update, 
+--echo # replace the lock on id=3, and then attempt to unlock
+--echo # both id=1 and id=2.  But they should remain locked.
+UPDATE t1 SET x = 333 WHERE x = 3;
+SELECT * FROM t1;
+
+
+--echo # conn1 should not be able to make updates to t1 id=1
+connection conn1;
+--send  UPDATE t1 SET x = 1111 WHERE id = 1
+
+--echo # Establish connection conn2 (user = root)
+connect (conn2,localhost,root,,);
+connection conn2;
+--echo # conn2 should not be able to make updates to t1 id=2
+--send  UPDATE t1 SET x = 2222 WHERE id = 2
+
+--echo # Connect to default and ROLLBACK TO SP1
+connection default;
+--real_sleep 1
+ROLLBACK;
+--real_sleep 1
+SELECT * FROM t1;
+
+# ----------------------------------------------------- #
+# --- Check Results For Test 2                      --- #
+# ----------------------------------------------------- #
+--echo # Check Results For Test 2
+--echo # Check Results
+--echo # Connect to conn1 and reap
+connection conn1;
+--reap
+SELECT * FROM t1;
+
+--echo # Connect to conn2 and reap
+connection conn2;
+--reap
+SELECT * FROM t1;
+
+
+# ----------------------------------------------------- #
+# --- Test #3 - More locking within savepoints      --- #
+# ----------------------------------------------------- #
+--echo # Test #3 - Multiple locking within savepoints
+--echo # Connect to default and lock records again
+connection default;
+BEGIN;
+SELECT * FROM t1 WHERE id = 1 FOR UPDATE;
+SAVEPOINT SP1;
+SELECT * FROM t1 WHERE id <= 2 FOR UPDATE;
+SAVEPOINT SP2;
+SELECT * FROM t1 WHERE id <= 3 FOR UPDATE;
+UPDATE t1 SET x = 333 WHERE x = 3;
+SELECT * FROM t1;
+ROLLBACK TO SP1;
+SELECT * FROM t1;
+
+
+--echo # conn1 should not be able to make updates to t1 id=1
+connection conn1;
+--send  UPDATE t1 SET x = 111111 WHERE id = 1
+
+--echo # conn2 should be able to make updates to t1 id=2
+connection conn2;
+UPDATE t1 SET x = 222222 WHERE id = 2;
+
+
+--echo # Connect to default and commit;
+connection default;
+--real_sleep 1
+COMMIT;
+--real_sleep 1
+SELECT * FROM t1;
+
+
+# ----------------------------------------------------- #
+# --- Check Results For Test 3                      --- #
+# ----------------------------------------------------- #
+--echo # Check Results
+--echo # Connect to conn1 and reap
+connection conn1;
+--reap
+SELECT * FROM t1;
+
+--echo # Connect to conn2
+connection conn2;
+SELECT * FROM t1;
+
+# ----------------------------------------------------- #
+# --- Final cleanup                                 --- #
+# ----------------------------------------------------- #
+--echo # Final cleanup
+connection default;
+disconnect conn1;
+disconnect conn2;
+
+DROP TABLE t1;
+

=== modified file 'storage/falcon/Cache.cpp'
--- a/storage/falcon/Cache.cpp	2009-02-10 10:32:44 +0000
+++ b/storage/falcon/Cache.cpp	2009-02-12 19:31:23 +0000
@@ -966,7 +966,7 @@ void Cache::ioThread(void)
 			if (!flushing) 
 				{
 				// Check whether we have been requested to exit the IO thread.
-				// Note that is is neccessary to hold the flush lock when
+				// Note that this is neccessary to hold the flush lock when
 				// checking the shutdown status to avoid that a new 
 				// flush request arriving just before the shutdown command is
 				// overlooked.

=== modified file 'storage/falcon/Dbb.cpp'
--- a/storage/falcon/Dbb.cpp	2009-02-13 15:53:28 +0000
+++ b/storage/falcon/Dbb.cpp	2009-02-14 15:57:14 +0000
@@ -407,6 +407,7 @@ int32 Dbb::findNextRecord(Section *secti
 int32 Dbb::createIndex(TransId transId, int indexVersion)
 {
 	int indexId;
+	
 	switch (indexVersion)
 		{
 		case INDEX_VERSION_0:

=== modified file 'storage/falcon/DeferredIndex.cpp'
--- a/storage/falcon/DeferredIndex.cpp	2009-02-11 23:19:43 +0000
+++ b/storage/falcon/DeferredIndex.cpp	2009-02-12 19:31:23 +0000
@@ -464,13 +464,12 @@ int DeferredIndex::compare(DINode* node1
 	if ((ret = memcmp(node1->key, node2->key, minLen)))
 		return ret;
 
-	// Still equal, check wich key that is the longest
+	// Still equal, check which key that is the longest
 
 	if ((ret = len1 > len2 ? 1 : len1 < len2 ? -1 : 0))
 		return ret;
 
-	// Still equal, check which has the greatest
-	// recordNumber
+	// Still equal, check which has the greatest recordNumber
 	
 	int32 rno1 = node1->recordNumber;
 	int32 rno2 = node2->recordNumber;
@@ -504,7 +503,7 @@ int DeferredIndex::compare(IndexKey *nod
 	if (partial)
 		return 0;
 
-	// Still equal, check wich key that is the longest
+	// Still equal, check which key that is the longest
 
 	return len1 > len2 ? 1 : len1 < len2 ? -1 : 0;
 }
@@ -529,13 +528,12 @@ int DeferredIndex::compare(IndexKey* nod
 	if ((ret = memcmp(node1->key, node2->key, minLen)))
 		return ret;
 
-	// Still equal, check wich key that is the longest
+	// Still equal, check which key that is the longest
 
 	if ((ret = len1 > len2 ? 1 : len1 < len2 ? -1 : 0))
 		return ret;
 
-	// Still equal, check which has the greatest
-	// recordNumber
+	// Still equal, check which has the greatest recordNumber
 
 	int32 rno2 = node2->recordNumber;
 	return recordNumber > rno2 ? 1 : recordNumber < rno2 ? -1 : 0;

=== modified file 'storage/falcon/Log.h'
--- a/storage/falcon/Log.h	2008-11-20 17:05:50 +0000
+++ b/storage/falcon/Log.h	2009-02-12 19:31:23 +0000
@@ -27,20 +27,23 @@
 #include <stdarg.h>
 #include "SyncObject.h"
 
-static const int	LogLog			= 1;
-static const int	LogDebug		= 2;
-static const int	LogInfo			= 4;
-static const int	LogJavaLog		= 8;
-static const int	LogJavaDebug	= 16;
-static const int	LogGG			= 32;
-static const int	LogPanic		= 64;
-static const int	LogScrub		= 128;
-static const int	LogException	= 256;
-static const int	LogScavenge		= 512;
-static const int	LogXARecovery	= 1024;
-static const int	LogMysqlInfo		= 0x20000000;
-static const int	LogMysqlWarning		= 0x40000000;
-static const int	LogMysqlError		= 0x80000000;
+// To activate certain messages, set global falcon_debug_mask=x 
+// where x is the total of the message types you want to get.
+
+static const int	LogLog           = 0x00000001;	// 1;
+static const int	LogDebug         = 0x00000002;	// 2;
+static const int	LogInfo          = 0x00000004;	// 4;
+static const int	LogJavaLog       = 0x00000008;	// 8;
+static const int	LogJavaDebug     = 0x00000010;	// 16;
+static const int	LogGG            = 0x00000020;	// 32;
+static const int	LogPanic         = 0x00000040;	// 64;
+static const int	LogScrub         = 0x00000080;	// 128;
+static const int	LogException     = 0x00000100;	// 256;
+static const int	LogScavenge      = 0x00000200;	// 512;
+static const int	LogXARecovery    = 0x00000400;	// 1024;
+static const int	LogMysqlInfo     = 0x20000000;
+static const int	LogMysqlWarning  = 0x40000000;
+static const int	LogMysqlError    = 0x80000000;
 
 typedef void (Listener) (int, const char*, void *arg);
 

=== modified file 'storage/falcon/RecordScavenge.cpp'
--- a/storage/falcon/RecordScavenge.cpp	2009-02-03 02:53:42 +0000
+++ b/storage/falcon/RecordScavenge.cpp	2009-02-12 19:31:23 +0000
@@ -60,6 +60,7 @@ RecordScavenge::RecordScavenge(Database 
 	retireableSpace = 0;
 	unScavengeableRecords = 0;
 	unScavengeableSpace = 0;
+	maxChainLength = 0;
 
 	Sync syncActive(&database->transactionManager->activeTransactions.syncObject, "RecordScavenge::RecordScavenge");
 	syncActive.lock(Shared);
@@ -82,12 +83,10 @@ bool RecordScavenge::canBeRetired(Record
 		if (!record->isVersion())
 			return true;
 
-		RecordVersion * recVer = (RecordVersion *) record;
-		ASSERT(!recVer->superceded);  // Must be the base record
-
-		// This record version may be retired if 
-		// it is currently not pointed to by a transaction
+		// This record version may be retired if it is
+		// currently not pointed to by a transaction.
 
+		RecordVersion * recVer = (RecordVersion *) record;
 		if (!recVer->transaction)
 			return true;
 		}
@@ -107,6 +106,7 @@ bool RecordScavenge::canBeRetired(Record
 
 Record* RecordScavenge::inventoryRecord(Record* record)
 {
+	int chainLength = 0;
 	Record *oldestVisibleRec = NULL;
 
 	Sync syncPrior(record->getSyncPrior(), "RecordScavenge::inventoryRecord");
@@ -114,6 +114,8 @@ Record* RecordScavenge::inventoryRecord(
 
 	for (Record *rec = record; rec; rec = rec->getPriorVersion())
 		{
+		if (++chainLength > maxChainLength)
+			maxChainLength = chainLength;
 		int scavengeType = CANNOT_SCAVENGE;  // Initial value
 
 		++totalRecords;
@@ -310,6 +312,9 @@ void RecordScavenge::print(void)
 	Log::log (LogScavenge,"Cycle=" I64FORMAT 
 		"  Inventory; unScavengeable records=" I64FORMAT " containing " I64FORMAT " bytes\n", 
 		cycle, unScavengeableRecords, unScavengeableSpace);
+	Log::log (LogScavenge,"Cycle=" I64FORMAT 
+		"  Inventory; max prior reord chain length=" I64FORMAT "\n", 
+		cycle, maxChainLength);
 
 	// Results of the Scavenge Cycle;
 

=== modified file 'storage/falcon/RecordScavenge.h'
--- a/storage/falcon/RecordScavenge.h	2009-02-01 08:18:51 +0000
+++ b/storage/falcon/RecordScavenge.h	2009-02-12 19:31:23 +0000
@@ -68,6 +68,7 @@ public:
 	uint64		retireableSpace;
 	uint64		unScavengeableRecords;
 	uint64		unScavengeableSpace;
+	uint64		maxChainLength;
 	uint64		ageGroups[AGE_GROUPS];
 	uint64		veryOldRecords;
 	uint64		veryOldRecordSpace;

=== modified file 'storage/falcon/RecordVersion.cpp'
--- a/storage/falcon/RecordVersion.cpp	2009-02-03 02:53:42 +0000
+++ b/storage/falcon/RecordVersion.cpp	2009-02-12 19:31:23 +0000
@@ -252,22 +252,23 @@ void RecordVersion::scavengeSavepoint(Tr
 		if (trans)
 			trans->removeRecord((RecordVersion*) rec);
 		}
-	
+
 	// If we didn't find anyone, there's nothing to do
-	
+
 	if (!ptr)
 		return;
-	
-	// There are intermediate versions to collapse.  Splice the unnecessary ones out of the loop
-	
+
+	// There are intermediate versions to collapse.  Make this 
+	// priorRecord point past the intermediate version(s) to the 
+	// next staying version.  
+
 	Record *prior = priorVersion;
 	prior->addRef();
 
 	syncPrior.unlock();
 	syncPrior.lock(Exclusive);
-	
+
 	setPriorVersion(rec);
-	//ptr->setPriorVersion(NULL);
 	ptr->state = recEndChain;
 	format->table->garbageCollect(prior, this, transaction, false);
 	prior->release();

=== modified file 'storage/falcon/SRLDeleteIndex.cpp'
--- a/storage/falcon/SRLDeleteIndex.cpp	2009-02-10 22:50:04 +0000
+++ b/storage/falcon/SRLDeleteIndex.cpp	2009-02-12 19:31:23 +0000
@@ -105,7 +105,6 @@ void SRLDeleteIndex::redo()
 	log->bumpIndexIncarnation(indexId, tableSpaceId, objDeleted);
 }
 
-
 void SRLDeleteIndex::print()
 {
 	logPrint("Delete Index %d\n", indexId);

=== modified file 'storage/falcon/StorageTable.cpp'
--- a/storage/falcon/StorageTable.cpp	2009-01-15 20:29:54 +0000
+++ b/storage/falcon/StorageTable.cpp	2009-02-12 20:22:40 +0000
@@ -628,7 +628,7 @@ void StorageTable::unlockRow(void)
 {
 	if (recordLocked)
 		{
-		share->table->unlockRecord(record->recordNumber);
+		share->table->unlockRecord(record->recordNumber, storageConnection->verbMark);
 		recordLocked = false;
 		}
 }

=== modified file 'storage/falcon/Table.cpp'
--- a/storage/falcon/Table.cpp	2009-02-10 22:00:13 +0000
+++ b/storage/falcon/Table.cpp	2009-02-14 17:11:43 +0000
@@ -531,6 +531,7 @@ Record* Table::fetchNext(int32 start)
 
 				if (record->state == recInserting)
 					{
+					record->release();
 					recordNumber = bitNumber + 1;
 					continue;
 					}
@@ -1902,7 +1903,7 @@ void Table::pruneRecords(RecordScavenge 
 	if (!records)
 		return;
 
-	Sync syncObj(&syncObject, "Table::retireRecords");
+	Sync syncObj(&syncObject, "Table::pruneRecords");
 	syncObj.lock(Shared);
 
 	if (records)
@@ -2692,7 +2693,8 @@ bool Table::checkUniqueRecordVersion(int
 			if (!activeTransaction)
 				{
 				activeTransaction = dup->getTransaction();
-				activeTransaction->addRef();
+				if (activeTransaction)
+					activeTransaction->addRef();
 				}
 
 			continue;  // check next record version
@@ -3392,34 +3394,39 @@ void Table::waitForWriteComplete()
 	database->waitForWriteComplete(this);
 }
 
-void Table::unlockRecord(int recordNumber)
+void Table::unlockRecord(int recordNumber, int verbMark)
 {
 	Record *record = fetch(recordNumber);
 
 	if (record)
 		{
 		if (record->state == recLock)
-			unlockRecord((RecordVersion*) record);
-		
+			unlockRecord((RecordVersion*) record, verbMark);
+
 		record->release();
 		}
 }
 
-void Table::unlockRecord(RecordVersion* record)
+void Table::unlockRecord(RecordVersion* record, int verbMark)
 {
-	//int uc = record->useCount;
-	ASSERT(record->getPriorVersion());
+	if (record->state != recLock)
+		return;
 
 	// A lock record that has superceded=true is already unlocked
 
-	if ((record->state == recLock) && !record->isSuperceded())
-		{
-		Record *prior = record->getPriorVersion();
-		if (insertIntoTree(prior, record, record->recordNumber))
-			record->setSuperceded(true);
-		else
-			Log::debug("Table::unlockRecord: record lock not in record tree\n");
-		}
+	if (record->isSuperceded())
+		return;
+
+	// Only unlock records at the current savepoint
+
+	if (record->savePointId < verbMark)
+		return;
+
+	Record *prior = record->getPriorVersion();
+	if (insertIntoTree(prior, record, record->recordNumber))
+		record->setSuperceded(true);
+	else
+		Log::debug("Table::unlockRecord: record lock not in record tree\n");
 }
 
 void Table::checkAncestor(Record* current, Record* oldRecord)
@@ -3448,47 +3455,54 @@ void Table::checkAncestor(Record* curren
 
 Record* Table::fetchForUpdate(Transaction* transaction, Record* source, bool usingIndex)
 {
-	Record *record = source;
-	int recordNumber = record->recordNumber;
+	//  Find the record that will be locked
+
+	int recordNumber = source->recordNumber;
 
 	// If we already have this locked or updated, get the active version
 
-	if (record->getTransaction() == transaction)
+	if (source->getTransaction() == transaction)
 		{
-		if (record->state == recDeleted)
+		if (source->state == recDeleted)
 			{
-			record->release();
+			source->release();
 			
 			return NULL;
 			}
 
-		if (record->state != recLock)
-			return record;
+		if (source->state != recLock)
+			return source;
+
+		// The source record (base record) is a lockRecord.
+		// There is only one lock record.  It is at the savepoint 
+		// where it was locked. The prior record is the real record.
 
-		Sync syncPrior(getSyncPrior(record), "Table::fetchForUpdate");
+		Sync syncPrior(getSyncPrior(source), "Table::fetchForUpdate");
 		syncPrior.lock(Shared);
 	
-		Record *prior = record->getPriorVersion();
+		Record *prior = source->getPriorVersion();
 		prior->addRef();
-		record->release();
+		source->release();
 
 		return prior;
 		}
 
+	// The record number is not currently locked.
+
 	for (;;)
 		{
 		// Try to avoid getting a lock if there is no way we will be updating this record.
 
-		if (!transaction->needToLock(record))
+		if (!transaction->needToLock(source))
 			{
-			record->release();
+			source->release();
 
 			return NULL;
 			}
 
 		// We may need to lock the record
 
-		State state = transaction->getRelativeState(record, WAIT_IF_ACTIVE);
+		State state = transaction->getRelativeState(source, WAIT_IF_ACTIVE);
 
 		switch (state)
 			{
@@ -3496,23 +3510,23 @@ Record* Table::fetchForUpdate(Transactio
 				// CommittedInvisible only happens for consistent read.
 
 				ASSERT(IS_CONSISTENT_READ(transaction->isolationLevel));
-				record->release();
+				source->release();
 				Log::debug("Table::fetchForUpdate: Update Conflict: TransId=%d, RecordNumber=%d, Table %s.%s\n", 
-					transaction->transactionId, record->recordNumber, schemaName, name);
+					transaction->transactionId, source->recordNumber, schemaName, name);
 				throw SQLError(UPDATE_CONFLICT, "update conflict in table %s.%s", schemaName, name);
 
 			case CommittedVisible:
 				{
-				if (record->state == recDeleted)
+				if (source->state == recDeleted)
 					{
-					record->release();
+					source->release();
 
 					return NULL;
 					}
 
-				if (record->state == recChilled	&& !record->thaw())
+				if (source->state == recChilled	&& !source->thaw())
 					{
-					record->release();
+					source->release();
 
 					return NULL;
 					}
@@ -3523,44 +3537,45 @@ Record* Table::fetchForUpdate(Transactio
 					Log::debug("Table::fetchForUpdate: TransactionId=%d, isolationLevel=%d, recordNumber=%d\n", 
 					           transaction->transactionId, transaction->isolationLevel, recordNumber);
 
-				RecordVersion *recordVersion = allocRecordVersion(NULL, transaction, record);
-				recordVersion->state = recLock;
-				
-				if (insertIntoTree(recordVersion, record, recordNumber))
+				RecordVersion *lockRecord = allocRecordVersion(NULL, transaction, source);
+				lockRecord->state = recLock;
+
+				if (insertIntoTree(lockRecord, source, recordNumber))
 					{
-					transaction->addRecord(recordVersion);
-					recordVersion->release();
+					transaction->addRecord(lockRecord);
+					lockRecord->release();
 
-					ASSERT(record->useCount >= 2);
-						
-					return record;
+					ASSERT(source->useCount >= 2);
+
+					return source;
 					}
-		
+
 #ifdef CHECK_RECORD_ACTIVITY
-				recordVersion->active = false;
+				lockRecord->active = false;
 #endif
-				recordVersion->release();
+				lockRecord->release();
 				}
 				break;
-			
+
 			case Deadlock:
-				record->release();
+				source->release();
 				throw SQLError(DEADLOCK, "Deadlock on table %s.%s, tid %d", schemaName, name, transaction->transactionId);
-				
+
 			case WasActive:
 			case RolledBack:
-				break;
-				
+				break;  // need to re-fetch the base record
+
 			default:
-				record->release();
+				source->release();
 				Log::debug("Table::fetchForUpdate: unexpected state %d\n", state);
 				throw SQLError(RUNTIME_ERROR, "unexpected transaction state %d", state);
 			}
-			
-		record->release();
-		record = fetch(recordNumber);
-		
-		if (record == NULL)
+
+		source->release();
+
+		source = fetch(recordNumber);
+
+		if (source == NULL)
 			return NULL;	
 		}
 }

=== modified file 'storage/falcon/Table.h'
--- a/storage/falcon/Table.h	2009-02-09 05:01:44 +0000
+++ b/storage/falcon/Table.h	2009-02-12 20:22:40 +0000
@@ -211,8 +211,8 @@ public:
 	Record*			allocRecord(int recordNumber, Stream* stream);
 	Format*			getCurrentFormat(void);
 	Record*			fetchForUpdate(Transaction* transaction, Record* record, bool usingIndex);
-	void			unlockRecord(int recordNumber);
-	void			unlockRecord(RecordVersion* record);
+	void			unlockRecord(int recordNumber, int verbMark);
+	void			unlockRecord(RecordVersion* record, int verbMark);
 
 	void			insert (Transaction *transaction, int count, Field **fields, Value **values);
 	uint			insert (Transaction *transaction, Stream *stream);

=== modified file 'storage/falcon/Transaction.cpp'
--- a/storage/falcon/Transaction.cpp	2009-01-09 21:49:16 +0000
+++ b/storage/falcon/Transaction.cpp	2009-02-12 20:45:15 +0000
@@ -396,7 +396,7 @@ void Transaction::rollback()
 		record->nextInTrans = NULL;
 
 		if (record->state == recLock)
-			record->format->table->unlockRecord(record);
+			record->format->table->unlockRecord(record, 0);
 		else
 			record->rollback(this);
 		
@@ -406,10 +406,14 @@ void Transaction::rollback()
 	firstRecord = NULL;
 	syncRec.unlock();
 
+	Sync syncSP(&syncSavepoints, "Transaction::rollback");
+	syncSP.lock(Shared);
 	for (SavePoint *savePoint = savePoints; savePoint; savePoint = savePoint->next)
 		if (savePoint->backloggedRecords)
 			database->backLog->rollbackRecords(savePoint->backloggedRecords, this);
 
+	syncSP.unlock();
+
 	if (backloggedRecords)
 		database->backLog->rollbackRecords(backloggedRecords, this);
 
@@ -519,11 +523,16 @@ void Transaction::chillRecords()
 
 	// The idea is that if savepoint N is rolled back, then chilled records attached to
 	// savepoints >= N	are ignored and not committed to the database.
-	
+
+	Sync syncSP(&syncSavepoints, "Transaction::rollback");
+	syncSP.lock(Shared);
+
 	for (SavePoint *savePoint = savePoints; savePoint; savePoint = savePoint->next)
 		if (savePoint->id != curSavePointId)
 			savePoint->setIncludedSavepoint(curSavePointId);
-	
+
+	syncSP.unlock();
+
 	if (database->lowMemory)
 		backlogRecords();
 
@@ -651,10 +660,15 @@ void Transaction::removeRecordNoLock(Rec
 	record->nextInTrans = NULL;
 	record->transaction = NULL;
 
+	Sync syncSP(&syncSavepoints, "Transaction::rollback");
+	syncSP.lock(Shared);
+
 	for (SavePoint *savePoint = savePoints; savePoint; savePoint = savePoint->next)
 		if (savePoint->records == &record->nextInTrans)
 			savePoint->records = ptr;
 
+	syncSP.unlock();
+
 	if (chillPoint == &record->nextInTrans)
 		chillPoint = ptr;
 
@@ -1041,16 +1055,12 @@ void Transaction::release()
 
 int Transaction::createSavepoint()
 {
-	Sync sync(&syncSavepoints, "Transaction::createSavepoint");
 	SavePoint *savePoint;
+	Sync sync(&syncSavepoints, "Transaction::createSavepoint");
+	sync.lock(Exclusive);
 	
 	ASSERT((savePoints || freeSavePoints) ? (savePoints != freeSavePoints) : true);
 	
-	// System transactions require an exclusive lock for concurrent access
-	
-	if (systemTransaction)
-		sync.lock(Exclusive);
-	
 	if ( (savePoint = freeSavePoints) )
 		freeSavePoints = savePoint->next;
 	else
@@ -1073,34 +1083,31 @@ void Transaction::releaseSavepoint(int s
 {
 	//validateRecords();
 	Sync sync(&syncSavepoints, "Transaction::releaseSavepoint");
+	sync.lock(Exclusive);
 
-	// System transactions require an exclusive lock for concurrent access
+	// The savePoints list goes from newest to oldest within this transaction.
 
-	if (systemTransaction)
-		sync.lock(Exclusive);
-	
 	for (SavePoint **ptr = &savePoints, *savePoint; (savePoint = *ptr); ptr = &savePoint->next)
 		if (savePoint->id == savePointId)
 			{
-			
 			// Savepoints are linked in descending order, so the next lower id is next on the list
-			
+
 			int nextLowerSavePointId = (savePoint->next) ? savePoint->next->id : 0;
 			*ptr = savePoint->next;
-			
-			// If we have backed logged records, merge them in to the previous savepoint or the transaction itself
-			
+
+			// If we have backlogged records, merge them in to the 
+			// previous savepoint or the transaction itself
+
 			if (savePoint->backloggedRecords)
 				{
 				SavePoint *nextSavePoint = savePoint->next;
-				
+
 				if (nextSavePoint)
 					{
 					if (nextSavePoint->backloggedRecords)
 						nextSavePoint->backloggedRecords->orBitmap(savePoint->backloggedRecords);
 					else
 						nextSavePoint->backloggedRecords = savePoint->backloggedRecords;
-					
 					}
 				else
 					{
@@ -1109,17 +1116,22 @@ void Transaction::releaseSavepoint(int s
 					else
 						backloggedRecords = savePoint->backloggedRecords;
 					}
-					
+
 				savePoint->backloggedRecords = NULL;
 				}
-					
+
 			if (savePoint->savepoints)
 				savePoint->clear();
 
-			// This savepoint is no longer needed, so commit pending record versions to the next pending savepoint
-			// Scavenge prior record versions having 1) the same transaction and 2) savepoint >= the savepoint being released
-			
-			for (RecordVersion *record = *savePoint->records; record && record->savePointId == savePointId; record = record->nextInTrans)
+			// This savepoint is no longer needed, so commit pending 
+			// record versions to the next pending savepoint
+			// Scavenge prior record versions having 
+			// 1) the same transaction and 
+			// 2) savepoint >= the savepoint being released
+
+			for (RecordVersion *record = *savePoint->records; 
+				 record && record->savePointId == savePointId; 
+				 record = record->nextInTrans)
 				{
 				record->savePointId = nextLowerSavePointId;
 				record->scavengeSavepoint(transactionId, nextLowerSavePointId);
@@ -1138,22 +1150,19 @@ void Transaction::releaseSavepoint(int s
 
 void Transaction::releaseSavepoints(void)
 {
-	Sync sync(&syncSavepoints, "Transaction::releaseSavepoints");
 	SavePoint *savePoint;
+	Sync sync(&syncSavepoints, "Transaction::releaseSavepoints");
+	sync.lock(Exclusive);
 	
 	// System transactions require an exclusive lock for concurrent access
 
-	if (systemTransaction)
-		sync.lock(Exclusive);
-	
-	
 	while ( (savePoint = savePoints) )
 		{
 		savePoints = savePoint->next;
-		
+
 		if (savePoint->savepoints)
 			savePoint->clear();
-			
+
 		if (savePoint < localSavePoints || savePoint >= localSavePoints + LOCAL_SAVE_POINTS)
 			delete savePoint;
 		}
@@ -1169,17 +1178,13 @@ void Transaction::releaseSavepoints(void
 
 void Transaction::rollbackSavepoint(int savePointId)
 {
+	SavePoint *savePoint;
 	//validateRecords();
 	Sync sync(&syncSavepoints, "Transaction::rollbackSavepoint");
-	SavePoint *savePoint;
-
-	// System transactions require an exclusive lock for concurrent access
-
-	if (systemTransaction)
-		sync.lock(Exclusive);
+	sync.lock(Exclusive);
 
 	// Be sure the target savepoint is valid before rolling them back.
-	
+
 	for (savePoint = savePoints; savePoint; savePoint = savePoint->next)
 		if (savePoint->id <= savePointId)
 			break;
@@ -1304,7 +1309,7 @@ void Transaction::releaseRecordLocks(voi
 	for (RecordVersion *record = firstRecord; record; record = record->nextInTrans)
 		if (record->state == recLock)
 			{
-			record->format->table->unlockRecord(record);
+			record->format->table->unlockRecord(record, 0);
 
 			// Don't do  removeRecord(record); now.  Other threads might be 
 			// pointing to it and need the transaction pointer to determine 
@@ -1483,28 +1488,31 @@ void Transaction::releaseDeferredIndexes
 void Transaction::backlogRecords(void)
 {
 	SavePoint *savePoint = savePoints;
-	
+
 	for (RecordVersion *record = lastRecord, *prior; record; record = prior)
 		{
 		prior = record->prevInTrans;
-		
+
 		if (!record->hasRecord(false))
 			{
+			Sync syncSP(&syncSavepoints, "Transaction::rollback");
+			syncSP.lock(Shared);
+
 			if (savePoints)
 				for (; savePoint && record->savePointId < savePoint->id; savePoint = savePoint->next)
 					;	
-									
+
 			if (savePoint)
 				savePoint->backlogRecord(record);
 			else
 				{
 				if (!backloggedRecords)
 					backloggedRecords = new Bitmap;
-				
+
 				int32 backlogId = record->format->table->backlogRecord(record);
 				backloggedRecords->set(backlogId);
 				}
-			
+
 			removeRecord(record);
 			}
 		}

Thread
bzr commit into mysql-6.0-falcon-team branch (vvaintroub:3020)Vladislav Vaintroub14 Feb