List:Commits« Previous MessageNext Message »
From:Kevin Lewis Date:December 19 2008 7:55pm
Subject:bzr push into mysql-6.0-falcon-team branch (klewis:2948 to 2949) Bug#41194,
Bug#41521, Bug#41564
View as plain text  
 2949 Kevin Lewis	2008-12-19
      Bug#41194 - The Transaction knows about the serial log.  
      Let it make calls to SerialLogRecord functions directly.
      Separate the serial log flush of commit and rollback records
      from when we allow the gophers to start processing them.
      Once those records are flushed, the recovery will be able to 
      process them, but we do not want the gophers to do that until 
      the commit or rollback is fully finished.  
      There is no need for the prepare flush to also start a gopher.
modified:
  storage/falcon/Database.cpp
  storage/falcon/Database.h
  storage/falcon/Dbb.cpp
  storage/falcon/Dbb.h
  storage/falcon/SRLCommit.cpp
  storage/falcon/SRLPrepare.cpp
  storage/falcon/SRLRollback.cpp
  storage/falcon/SRLRollback.h
  storage/falcon/SerialLogRecord.cpp
  storage/falcon/SerialLogRecord.h
  storage/falcon/Transaction.cpp

 2948 Kevin Lewis	2008-12-19
      Bug#41521 and Bug#41564.  
      Sometimes a call to  Thread::sleep(timeout, callersMutex)
      from SyncObject::wait() can wakeup before the lock has been 
      granted.  If it does this, the code was taking the thread 
      off the SyncObject queue and then calling the sleep again. 
      This time, the lock cannot be granted because the lock owner
      does not signal the waiting thread.  It is not queued.
      A lock wait timeout will then occur.
      The thread should only be taken off the queue if a lock wait 
      timeout really does occur.
modified:
  storage/falcon/SyncObject.cpp

=== modified file 'storage/falcon/Database.cpp'
--- a/storage/falcon/Database.cpp	2008-11-20 17:05:50 +0000
+++ b/storage/falcon/Database.cpp	2008-12-19 18:45:32 +0000
@@ -2251,16 +2251,6 @@ void Database::deleteRepositoryBlob(cons
 	repository->deleteBlob (volume, blobId, transaction);	
 }
 
-void Database::commit(Transaction *transaction)
-{
-	dbb->commit(transaction);
-}
-
-void Database::rollback(Transaction *transaction)
-{
-	dbb->rollback(transaction->transactionId, transaction->hasUpdates);
-}
-
 void Database::renameTable(Table* table, const char* newSchema, const char* newName)
 {
 	newSchema = getSymbol(newSchema);

=== modified file 'storage/falcon/Database.h'
--- a/storage/falcon/Database.h	2008-10-16 01:04:03 +0000
+++ b/storage/falcon/Database.h	2008-12-19 18:45:32 +0000
@@ -116,8 +116,6 @@ public:
 
 	void			shutdownNow();
 	void			dropDatabase();
-	void			rollback (Transaction *transaction);
-	void			commit (Transaction *transaction);
 	void			start();
 	void			deleteRepositoryBlob(const char *schema, const char *repositoryName, int volume, int64 blobId, Transaction *transaction);
 	void			deleteRepository (Repository *repository);

=== modified file 'storage/falcon/Dbb.cpp'
--- a/storage/falcon/Dbb.cpp	2008-10-30 00:22:54 +0000
+++ b/storage/falcon/Dbb.cpp	2008-12-19 18:45:32 +0000
@@ -1169,34 +1169,6 @@ void Dbb::reportStatistics()
 	priorFlushWrites = flushWrites;
 }
 
-void Dbb::commit(Transaction *transaction)
-{
-	if (transaction->hasUpdates)
-		serialLog->logControl->commit.append(transaction);
-}
-
-void Dbb::prepareTransaction(TransId transId, int xidLength, const UCHAR *xid)
-{
-	serialLog->logControl->prepare.append(transId, xidLength, xid);
-}
-
-void Dbb::rollback(TransId transId, bool updateTransaction)
-{
-	if (updateTransaction)
-		{
-		if (serialLog)
-			serialLog->logControl->rollback.append(transId, updateTransaction);
-		//flush();
-		}
-}
-
-/***
-void Dbb::setRecovering(bool flag)
-{
-	recovering = flag;
-}
-***/
-
 void Dbb::enableSerialLog()
 {
 	Bdb *bdb = fetchPage(HEADER_PAGE, PAGE_header, Exclusive);

=== modified file 'storage/falcon/Dbb.h'
--- a/storage/falcon/Dbb.h	2008-10-30 00:22:54 +0000
+++ b/storage/falcon/Dbb.h	2008-12-19 18:45:32 +0000
@@ -113,10 +113,7 @@ public:
 	void	createSection(int32 sectionId, TransId transId);
 	void	dropDatabase();
 	void	enableSerialLog();
-	void	rollback (TransId transId, bool updateTransaction);
 	void	updateRecord(int32 sectionId, int32 recordId, Stream *stream, TransId transId, bool earlyWrite);
-	void	prepareTransaction(TransId transId, int xidLength, const UCHAR *xid);
-	void	commit(Transaction *transaction);
 	void	reportStatistics();
 	bool	hasDirtyPages();
 	bool	deleteShadow (DatabaseCopy *shadow);

=== modified file 'storage/falcon/SRLCommit.cpp'
--- a/storage/falcon/SRLCommit.cpp	2007-10-04 20:45:34 +0000
+++ b/storage/falcon/SRLCommit.cpp	2008-12-19 18:45:32 +0000
@@ -46,18 +46,16 @@ void SRLCommit::append(Transaction *tran
 	transaction->addRef();
 	START_RECORD(srlCommit, "SRLCommit::append");
 	putInt(transaction->transactionId);
-	//uint64 commitBlockNumber = log->nextBlockNumber;
 	uint64 commitBlockNumber = log->getWriteBlockNumber();
 	SerialLogTransaction *srlTransaction = log->getTransaction(transaction->transactionId);
-	
+	srlTransaction->setTransaction(transaction);
+
+	// Flush transactions with changes immediately for durability
+
 	if (transaction->hasUpdates)
 		log->flush(false, commitBlockNumber, &sync);
 	else
 		sync.unlock();
-
-	srlTransaction->setTransaction(transaction);
-	srlTransaction->setState(sltCommitted);
-	wakeup();
 }
 
 void SRLCommit::read()

=== modified file 'storage/falcon/SRLPrepare.cpp'
--- a/storage/falcon/SRLPrepare.cpp	2008-07-24 08:45:03 +0000
+++ b/storage/falcon/SRLPrepare.cpp	2008-12-19 18:45:32 +0000
@@ -47,15 +47,12 @@ void SRLPrepare::append(TransId transId,
 	putInt(transId);
 	putInt(xidLength);
 	putData(xidLength, xid);
-	SerialLogTransaction *transaction = log->getTransaction(transId);
+	SerialLogTransaction *srlTransaction = log->getTransaction(transId);
 
 	log->flush(false, log->nextBlockNumber, &sync);
 
-	if (transaction)
-		transaction->setState(sltPrepared);
-
-	if (transaction)
-		wakeup();
+	if (srlTransaction)
+		srlTransaction->setState(sltPrepared);
 }
 
 void SRLPrepare::read()

=== modified file 'storage/falcon/SRLRollback.cpp'
--- a/storage/falcon/SRLRollback.cpp	2008-07-24 08:45:03 +0000
+++ b/storage/falcon/SRLRollback.cpp	2008-12-19 18:45:32 +0000
@@ -38,23 +38,16 @@ SRLRollback::~SRLRollback()
 
 }
 
-void SRLRollback::append(TransId transId, bool updateTransaction)
+void SRLRollback::append(Transaction *transaction)
 {
 	START_RECORD(srlRollback, "SRLRollback::append");
-	putInt(transId);
+	putInt(transaction->transactionId);
 	uint64 commitBlockNumber = log->nextBlockNumber;
-	SerialLogTransaction *transaction = log->findTransaction(transId);
 
-	if (updateTransaction)
+	if (transaction->hasUpdates)
 		log->flush(false, commitBlockNumber, &sync);
 	else
 		sync.unlock();
-	
-	if (transaction)
-		{
-		transaction->setState(sltRolledBack);
-		wakeup();
-		}
 }
 
 void SRLRollback::read()

=== modified file 'storage/falcon/SRLRollback.h'
--- a/storage/falcon/SRLRollback.h	2007-09-20 15:44:25 +0000
+++ b/storage/falcon/SRLRollback.h	2008-12-19 18:45:32 +0000
@@ -33,7 +33,7 @@ public:
 	virtual void print();
 	virtual void pass1();
 	virtual void read();
-	void append(TransId transId, bool updateTransaction);
+	void append(Transaction *transaction);
 	SRLRollback();
 	virtual ~SRLRollback();
 

=== modified file 'storage/falcon/SerialLogRecord.cpp'
--- a/storage/falcon/SerialLogRecord.cpp	2008-11-20 17:05:50 +0000
+++ b/storage/falcon/SerialLogRecord.cpp	2008-12-19 18:45:32 +0000
@@ -226,11 +226,6 @@ void SerialLogRecord::startRecord()
 	log->startRecord();
 }
 
-void SerialLogRecord::wakeup()
-{
-	log->wakeup();
-}
-
 void SerialLogRecord::putStream(Stream *stream)
 {
 	putInt(stream->totalLength);

=== modified file 'storage/falcon/SerialLogRecord.h'
--- a/storage/falcon/SerialLogRecord.h	2008-11-14 02:30:11 +0000
+++ b/storage/falcon/SerialLogRecord.h	2008-12-19 18:45:32 +0000
@@ -98,7 +98,6 @@ public:
 	int				getInt(const UCHAR** ptr);
 	const UCHAR*	getData(int32 length);
 	void			putStream (Stream *stream);
-	void			wakeup();
 	void			startRecord();
 	void			putData(uint32 length, const UCHAR *data);
 	void			putInt(int32 number);

=== modified file 'storage/falcon/SyncObject.cpp'
--- a/storage/falcon/SyncObject.cpp	2008-10-16 02:59:09 +0000
+++ b/storage/falcon/SyncObject.cpp	2008-12-19 18:30:49 +0000
@@ -638,16 +638,19 @@ void SyncObject::wait(LockType type, Thr
 				return;
 				}
 			
-			for (ptr = &queue; *ptr; ptr = &(*ptr)->queue)
-				if (*ptr == thread)
-					{
-					*ptr = thread->queue;
-					--waiters;
-					break;
-					}
-			
 			if (!wokeup)
 				{
+				// A timeout occured.
+				// Take this thread off the queue and throw an exception
+
+				for (ptr = &queue; *ptr; ptr = &(*ptr)->queue)
+					if (*ptr == thread)
+						{
+						*ptr = thread->queue;
+						--waiters;
+						break;
+						}
+
 				mutex.release();
 				timedout(timeout);
 				}

=== modified file 'storage/falcon/Transaction.cpp'
--- a/storage/falcon/Transaction.cpp	2008-12-16 20:40:38 +0000
+++ b/storage/falcon/Transaction.cpp	2008-12-19 18:45:32 +0000
@@ -37,6 +37,7 @@
 #include "TransactionManager.h"
 #include "SerialLog.h"
 #include "SerialLogControl.h"
+#include "SerialLogTransaction.h"
 #include "InfoTable.h"
 #include "Thread.h"
 #include "Format.h"
@@ -266,6 +267,11 @@ void Transaction::commit()
 
 	database->flushInversion(this);
 
+	// Write the commit message to the serial log for durability.
+	// If a crash happens after this, the recover will commit.
+
+	database->serialLog->logControl->commit.append(this);
+
 	// Transfer transaction from active list to committed list, set committed state
 
 	Sync syncActiveTransactions(&transactionManager->activeTransactions.syncObject, "Transaction::commit(2)");
@@ -293,8 +299,12 @@ void Transaction::commit()
 	syncActiveTransactions.unlock();
 	
 	syncIsActive.unlock(); // signal waiting transactions
+	
+	// signal a gopher to start processing this transaction
 
-	database->commit(this);
+	SerialLogTransaction *srlTransaction = database->serialLog->getTransaction(transactionId);
+	srlTransaction->setState(sltCommitted);
+	database->serialLog->wakeup();
 
 	delete [] xid;
 	xid = NULL;
@@ -411,12 +421,13 @@ void Transaction::rollback()
 
 	ASSERT(writePending);
 	writePending = false;
-	
+
 	if (hasUpdates)
+		{
 		database->serialLog->preCommit(this);
-		
-	database->rollback(this);
-	
+		database->serialLog->logControl->rollback.append(this);
+		}
+
 	if (xid)
 		{
 		delete [] xid;
@@ -433,6 +444,16 @@ void Transaction::rollback()
 	syncActiveTransactions.unlock();
 	state = RolledBack;
 	syncIsActive.unlock();
+
+	// Finish the SerialLogTransaction and signal a gopher
+
+	if (hasUpdates)
+		{
+		SerialLogTransaction *srlTransaction = database->serialLog->getTransaction(transactionId);
+		srlTransaction->setState(sltRolledBack);
+		database->serialLog->wakeup();
+		}
+
 	release();
 }
 
@@ -455,7 +476,10 @@ void Transaction::prepare(int xidLen, co
 		
 	database->pageWriter->waitForWrites(this);
 	state = Limbo;
-	database->dbb->prepareTransaction(transactionId, xidLength, xid);
+
+	// Flush a prepare record to the serial log
+
+	database->serialLog->logControl->prepare.append(transactionId, xidLength, xid);
 
 	Sync sync(&syncDeferredIndexes, "Transaction::prepare");
 	sync.lock(Shared);
@@ -1413,12 +1437,15 @@ void Transaction::getInfo(InfoTable* inf
 		}
 }
 
+// Called by the gopher thread to complete this transaction
+
 void Transaction::fullyCommitted(void)
 {
 	ASSERT(inList);
+	ASSERT(!isActive());
 
 	if (useCount < 2)
-		Log::debug("Transaction::fullyCommitted: funny use count\n");
+		Log::debug("Transaction::fullyCommitted: Unusual use count=%d\n", useCount);
 
 	writeComplete();
 	releaseCommittedTransaction();

Thread
bzr push into mysql-6.0-falcon-team branch (klewis:2948 to 2949) Bug#41194,Bug#41521, Bug#41564Kevin Lewis19 Dec