2949 Kevin Lewis 2008-12-19
Bug#41194 - The Transaction knows about the serial log.
Let it make calls to SerialLogRecord functions directly.
Separate the serial log flush of commit and rollback records
from when we allow the gophers to start processing them.
Once those records are flushed, the recovery will be able to
process them, but we do not want the gophers to do that until
the commit or rollback is fully finished.
There is no need for the prepare flush to also start a gopher.
modified:
storage/falcon/Database.cpp
storage/falcon/Database.h
storage/falcon/Dbb.cpp
storage/falcon/Dbb.h
storage/falcon/SRLCommit.cpp
storage/falcon/SRLPrepare.cpp
storage/falcon/SRLRollback.cpp
storage/falcon/SRLRollback.h
storage/falcon/SerialLogRecord.cpp
storage/falcon/SerialLogRecord.h
storage/falcon/Transaction.cpp
2948 Kevin Lewis 2008-12-19
Bug#41521 and Bug#41564.
Sometimes a call to Thread::sleep(timeout, callersMutex)
from SyncObject::wait() can wakeup before the lock has been
granted. If it does this, the code was taking the thread
off the SyncObject queue and then calling the sleep again.
This time, the lock cannot be granted because the lock owner
does not signal the waiting thread. It is not queued.
A lock wait timeout will then occur.
The thread should only be taken off the queue if a lock wait
timeout really does occur.
modified:
storage/falcon/SyncObject.cpp
=== modified file 'storage/falcon/Database.cpp'
--- a/storage/falcon/Database.cpp 2008-11-20 17:05:50 +0000
+++ b/storage/falcon/Database.cpp 2008-12-19 18:45:32 +0000
@@ -2251,16 +2251,6 @@ void Database::deleteRepositoryBlob(cons
repository->deleteBlob (volume, blobId, transaction);
}
-void Database::commit(Transaction *transaction)
-{
- dbb->commit(transaction);
-}
-
-void Database::rollback(Transaction *transaction)
-{
- dbb->rollback(transaction->transactionId, transaction->hasUpdates);
-}
-
void Database::renameTable(Table* table, const char* newSchema, const char* newName)
{
newSchema = getSymbol(newSchema);
=== modified file 'storage/falcon/Database.h'
--- a/storage/falcon/Database.h 2008-10-16 01:04:03 +0000
+++ b/storage/falcon/Database.h 2008-12-19 18:45:32 +0000
@@ -116,8 +116,6 @@ public:
void shutdownNow();
void dropDatabase();
- void rollback (Transaction *transaction);
- void commit (Transaction *transaction);
void start();
void deleteRepositoryBlob(const char *schema, const char *repositoryName, int volume, int64 blobId, Transaction *transaction);
void deleteRepository (Repository *repository);
=== modified file 'storage/falcon/Dbb.cpp'
--- a/storage/falcon/Dbb.cpp 2008-10-30 00:22:54 +0000
+++ b/storage/falcon/Dbb.cpp 2008-12-19 18:45:32 +0000
@@ -1169,34 +1169,6 @@ void Dbb::reportStatistics()
priorFlushWrites = flushWrites;
}
-void Dbb::commit(Transaction *transaction)
-{
- if (transaction->hasUpdates)
- serialLog->logControl->commit.append(transaction);
-}
-
-void Dbb::prepareTransaction(TransId transId, int xidLength, const UCHAR *xid)
-{
- serialLog->logControl->prepare.append(transId, xidLength, xid);
-}
-
-void Dbb::rollback(TransId transId, bool updateTransaction)
-{
- if (updateTransaction)
- {
- if (serialLog)
- serialLog->logControl->rollback.append(transId, updateTransaction);
- //flush();
- }
-}
-
-/***
-void Dbb::setRecovering(bool flag)
-{
- recovering = flag;
-}
-***/
-
void Dbb::enableSerialLog()
{
Bdb *bdb = fetchPage(HEADER_PAGE, PAGE_header, Exclusive);
=== modified file 'storage/falcon/Dbb.h'
--- a/storage/falcon/Dbb.h 2008-10-30 00:22:54 +0000
+++ b/storage/falcon/Dbb.h 2008-12-19 18:45:32 +0000
@@ -113,10 +113,7 @@ public:
void createSection(int32 sectionId, TransId transId);
void dropDatabase();
void enableSerialLog();
- void rollback (TransId transId, bool updateTransaction);
void updateRecord(int32 sectionId, int32 recordId, Stream *stream, TransId transId, bool earlyWrite);
- void prepareTransaction(TransId transId, int xidLength, const UCHAR *xid);
- void commit(Transaction *transaction);
void reportStatistics();
bool hasDirtyPages();
bool deleteShadow (DatabaseCopy *shadow);
=== modified file 'storage/falcon/SRLCommit.cpp'
--- a/storage/falcon/SRLCommit.cpp 2007-10-04 20:45:34 +0000
+++ b/storage/falcon/SRLCommit.cpp 2008-12-19 18:45:32 +0000
@@ -46,18 +46,16 @@ void SRLCommit::append(Transaction *tran
transaction->addRef();
START_RECORD(srlCommit, "SRLCommit::append");
putInt(transaction->transactionId);
- //uint64 commitBlockNumber = log->nextBlockNumber;
uint64 commitBlockNumber = log->getWriteBlockNumber();
SerialLogTransaction *srlTransaction = log->getTransaction(transaction->transactionId);
-
+ srlTransaction->setTransaction(transaction);
+
+ // Flush transactions with changes immediately for durability
+
if (transaction->hasUpdates)
log->flush(false, commitBlockNumber, &sync);
else
sync.unlock();
-
- srlTransaction->setTransaction(transaction);
- srlTransaction->setState(sltCommitted);
- wakeup();
}
void SRLCommit::read()
=== modified file 'storage/falcon/SRLPrepare.cpp'
--- a/storage/falcon/SRLPrepare.cpp 2008-07-24 08:45:03 +0000
+++ b/storage/falcon/SRLPrepare.cpp 2008-12-19 18:45:32 +0000
@@ -47,15 +47,12 @@ void SRLPrepare::append(TransId transId,
putInt(transId);
putInt(xidLength);
putData(xidLength, xid);
- SerialLogTransaction *transaction = log->getTransaction(transId);
+ SerialLogTransaction *srlTransaction = log->getTransaction(transId);
log->flush(false, log->nextBlockNumber, &sync);
- if (transaction)
- transaction->setState(sltPrepared);
-
- if (transaction)
- wakeup();
+ if (srlTransaction)
+ srlTransaction->setState(sltPrepared);
}
void SRLPrepare::read()
=== modified file 'storage/falcon/SRLRollback.cpp'
--- a/storage/falcon/SRLRollback.cpp 2008-07-24 08:45:03 +0000
+++ b/storage/falcon/SRLRollback.cpp 2008-12-19 18:45:32 +0000
@@ -38,23 +38,16 @@ SRLRollback::~SRLRollback()
}
-void SRLRollback::append(TransId transId, bool updateTransaction)
+void SRLRollback::append(Transaction *transaction)
{
START_RECORD(srlRollback, "SRLRollback::append");
- putInt(transId);
+ putInt(transaction->transactionId);
uint64 commitBlockNumber = log->nextBlockNumber;
- SerialLogTransaction *transaction = log->findTransaction(transId);
- if (updateTransaction)
+ if (transaction->hasUpdates)
log->flush(false, commitBlockNumber, &sync);
else
sync.unlock();
-
- if (transaction)
- {
- transaction->setState(sltRolledBack);
- wakeup();
- }
}
void SRLRollback::read()
=== modified file 'storage/falcon/SRLRollback.h'
--- a/storage/falcon/SRLRollback.h 2007-09-20 15:44:25 +0000
+++ b/storage/falcon/SRLRollback.h 2008-12-19 18:45:32 +0000
@@ -33,7 +33,7 @@ public:
virtual void print();
virtual void pass1();
virtual void read();
- void append(TransId transId, bool updateTransaction);
+ void append(Transaction *transaction);
SRLRollback();
virtual ~SRLRollback();
=== modified file 'storage/falcon/SerialLogRecord.cpp'
--- a/storage/falcon/SerialLogRecord.cpp 2008-11-20 17:05:50 +0000
+++ b/storage/falcon/SerialLogRecord.cpp 2008-12-19 18:45:32 +0000
@@ -226,11 +226,6 @@ void SerialLogRecord::startRecord()
log->startRecord();
}
-void SerialLogRecord::wakeup()
-{
- log->wakeup();
-}
-
void SerialLogRecord::putStream(Stream *stream)
{
putInt(stream->totalLength);
=== modified file 'storage/falcon/SerialLogRecord.h'
--- a/storage/falcon/SerialLogRecord.h 2008-11-14 02:30:11 +0000
+++ b/storage/falcon/SerialLogRecord.h 2008-12-19 18:45:32 +0000
@@ -98,7 +98,6 @@ public:
int getInt(const UCHAR** ptr);
const UCHAR* getData(int32 length);
void putStream (Stream *stream);
- void wakeup();
void startRecord();
void putData(uint32 length, const UCHAR *data);
void putInt(int32 number);
=== modified file 'storage/falcon/SyncObject.cpp'
--- a/storage/falcon/SyncObject.cpp 2008-10-16 02:59:09 +0000
+++ b/storage/falcon/SyncObject.cpp 2008-12-19 18:30:49 +0000
@@ -638,16 +638,19 @@ void SyncObject::wait(LockType type, Thr
return;
}
- for (ptr = &queue; *ptr; ptr = &(*ptr)->queue)
- if (*ptr == thread)
- {
- *ptr = thread->queue;
- --waiters;
- break;
- }
-
if (!wokeup)
{
+ // A timeout occured.
+ // Take this thread off the queue and throw an exception
+
+ for (ptr = &queue; *ptr; ptr = &(*ptr)->queue)
+ if (*ptr == thread)
+ {
+ *ptr = thread->queue;
+ --waiters;
+ break;
+ }
+
mutex.release();
timedout(timeout);
}
=== modified file 'storage/falcon/Transaction.cpp'
--- a/storage/falcon/Transaction.cpp 2008-12-16 20:40:38 +0000
+++ b/storage/falcon/Transaction.cpp 2008-12-19 18:45:32 +0000
@@ -37,6 +37,7 @@
#include "TransactionManager.h"
#include "SerialLog.h"
#include "SerialLogControl.h"
+#include "SerialLogTransaction.h"
#include "InfoTable.h"
#include "Thread.h"
#include "Format.h"
@@ -266,6 +267,11 @@ void Transaction::commit()
database->flushInversion(this);
+ // Write the commit message to the serial log for durability.
+ // If a crash happens after this, the recover will commit.
+
+ database->serialLog->logControl->commit.append(this);
+
// Transfer transaction from active list to committed list, set committed state
Sync syncActiveTransactions(&transactionManager->activeTransactions.syncObject, "Transaction::commit(2)");
@@ -293,8 +299,12 @@ void Transaction::commit()
syncActiveTransactions.unlock();
syncIsActive.unlock(); // signal waiting transactions
+
+ // signal a gopher to start processing this transaction
- database->commit(this);
+ SerialLogTransaction *srlTransaction = database->serialLog->getTransaction(transactionId);
+ srlTransaction->setState(sltCommitted);
+ database->serialLog->wakeup();
delete [] xid;
xid = NULL;
@@ -411,12 +421,13 @@ void Transaction::rollback()
ASSERT(writePending);
writePending = false;
-
+
if (hasUpdates)
+ {
database->serialLog->preCommit(this);
-
- database->rollback(this);
-
+ database->serialLog->logControl->rollback.append(this);
+ }
+
if (xid)
{
delete [] xid;
@@ -433,6 +444,16 @@ void Transaction::rollback()
syncActiveTransactions.unlock();
state = RolledBack;
syncIsActive.unlock();
+
+ // Finish the SerialLogTransaction and signal a gopher
+
+ if (hasUpdates)
+ {
+ SerialLogTransaction *srlTransaction = database->serialLog->getTransaction(transactionId);
+ srlTransaction->setState(sltRolledBack);
+ database->serialLog->wakeup();
+ }
+
release();
}
@@ -455,7 +476,10 @@ void Transaction::prepare(int xidLen, co
database->pageWriter->waitForWrites(this);
state = Limbo;
- database->dbb->prepareTransaction(transactionId, xidLength, xid);
+
+ // Flush a prepare record to the serial log
+
+ database->serialLog->logControl->prepare.append(transactionId, xidLength, xid);
Sync sync(&syncDeferredIndexes, "Transaction::prepare");
sync.lock(Shared);
@@ -1413,12 +1437,15 @@ void Transaction::getInfo(InfoTable* inf
}
}
+// Called by the gopher thread to complete this transaction
+
void Transaction::fullyCommitted(void)
{
ASSERT(inList);
+ ASSERT(!isActive());
if (useCount < 2)
- Log::debug("Transaction::fullyCommitted: funny use count\n");
+ Log::debug("Transaction::fullyCommitted: Unusual use count=%d\n", useCount);
writeComplete();
releaseCommittedTransaction();
| Thread |
|---|
| • bzr push into mysql-6.0-falcon-team branch (klewis:2948 to 2949) Bug#41194,Bug#41521, Bug#41564 | Kevin Lewis | 19 Dec |