Below is the list of changes that have just been committed into a local
5.1 repository of jonas. When jonas does a push these changes
will be propagated to the main repository and, within 24 hours after the
push, to the public repository.
For information on how to access the public repository
see http://dev.mysql.com/doc/mysql/en/installing-source-tree.html
ChangeSet@stripped, 2008-02-06 21:13:51+01:00, jonas@stripped +3 -0
Transaction(s)
This patchset aims to make transaction-handling *explicit*
The transaction can have certain states
An operation can certain states
Note: This patch is dead-code, next patch will remove
iterator, and enable this code
---
storage/ndb/include/kernel/signaldata/SchemaTransImpl.hpp | 24 +
storage/ndb/src/kernel/blocks/dbdict/Dbdict.cpp | 274 +++++++++++++-
storage/ndb/src/kernel/blocks/dbdict/Dbdict.hpp | 53 ++
3 files changed, 336 insertions(+), 15 deletions(-)
storage/ndb/include/kernel/signaldata/SchemaTransImpl.hpp@stripped, 2008-02-06
21:13:49+01:00, jonas@stripped +21 -3
Transaction(s)
This patchset aims to make transaction-handling *explicit*
The transaction can have certain states
An operation can certain states
Note: This patch is dead-code, next patch will remove
iterator, and enable this code
storage/ndb/src/kernel/blocks/dbdict/Dbdict.cpp@stripped, 2008-02-06 21:13:49+01:00,
jonas@stripped +264 -10
Transaction(s)
This patchset aims to make transaction-handling *explicit*
The transaction can have certain states
An operation can certain states
Note: This patch is dead-code, next patch will remove
iterator, and enable this code
storage/ndb/src/kernel/blocks/dbdict/Dbdict.hpp@stripped, 2008-02-06 21:13:49+01:00,
jonas@stripped +51 -2
Transaction(s)
This patchset aims to make transaction-handling *explicit*
The transaction can have certain states
An operation can certain states
Note: This patch is dead-code, next patch will remove
iterator, and enable this code
diff -Nrup a/storage/ndb/include/kernel/signaldata/SchemaTransImpl.hpp
b/storage/ndb/include/kernel/signaldata/SchemaTransImpl.hpp
--- a/storage/ndb/include/kernel/signaldata/SchemaTransImpl.hpp 2008-01-05 13:38:13 +01:00
+++ b/storage/ndb/include/kernel/signaldata/SchemaTransImpl.hpp 2008-02-06 21:13:49 +01:00
@@ -20,7 +20,21 @@
#include "SignalData.hpp"
#include "GlobalSignalNumbers.h"
-struct SchemaTransImplReq {
+struct SchemaTransImplReq
+{
+ enum RequestType
+ {
+ RT_START = 0x0,
+ RT_PARSE = 0x1,
+ RT_PREPARE = 0x2,
+ RT_ABORT_PARSE = 0x3,
+ RT_ABORT_PREPARE = 0x4,
+ RT_COMMIT = 0x5,
+
+ RT_COMPLETE = 0x6,// Not yet used
+ RT_FLUSH_SCHEMA = 0x7 // Not yet used
+ };
+
STATIC_CONST( SignalLength = 9 );
Uint32 senderRef;
Uint32 transKey;
@@ -32,6 +46,8 @@ struct SchemaTransImplReq {
Uint32 clientRef;
Uint32 transId;
+ Uint32 requestType;
+
// phaseInfo
static Uint32 getMode(const Uint32& info) {
return BitmaskImpl::getField(1, &info, 0, 8);
@@ -96,7 +112,8 @@ struct SchemaTransImplReq {
}
};
-struct SchemaTransImplConf {
+struct SchemaTransImplConf
+{
enum {
IT_REPEAT = (1 << 1)
};
@@ -107,7 +124,8 @@ struct SchemaTransImplConf {
Uint32 itFlags;
};
-struct SchemaTransImplRef {
+struct SchemaTransImplRef
+{
STATIC_CONST( SignalLength = 6 );
STATIC_CONST( GSN = GSN_SCHEMA_TRANS_IMPL_REF );
enum ErrorCode {
diff -Nrup a/storage/ndb/src/kernel/blocks/dbdict/Dbdict.cpp
b/storage/ndb/src/kernel/blocks/dbdict/Dbdict.cpp
--- a/storage/ndb/src/kernel/blocks/dbdict/Dbdict.cpp 2008-02-06 21:12:33 +01:00
+++ b/storage/ndb/src/kernel/blocks/dbdict/Dbdict.cpp 2008-02-06 21:13:49 +01:00
@@ -19394,7 +19394,8 @@ Dbdict::handleClientReq(Signal* signal,
const OpInfo& info = getOpInfo(op_ptr);
(this->*(info.m_parse))(signal, true, op_ptr, handle, error);
- if (hasError(error)) {
+ if (hasError(error))
+ {
jam();
setError(op_ptr, error);
setTransMode(trans_ptr, TransMode::Rollback, true);
@@ -19864,7 +19865,7 @@ Dbdict::runTransMaster(Signal* signal, S
* This is commit moment...
*/
jam();
- trans_commit(signal, trans_ptr);
+ trans_commit_start(signal, trans_ptr);
return;
}
@@ -19875,11 +19876,67 @@ Dbdict::runTransMaster(Signal* signal, S
}
void
-Dbdict::trans_commit(Signal* signal, SchemaTransPtr trans_ptr)
+Dbdict::trans_prepare_start(Signal* signal, SchemaTransPtr trans_ptr)
+{
+ trans_ptr.p->m_state = SchemaTrans::TS_PREPARING;
+}
+
+void
+Dbdict::trans_prepare_next(Signal* signal, SchemaTransPtr trans_ptr)
+{
+ ndbrequire(trans_ptr.p->m_state == SchemaTrans::TS_PREPARING);
+}
+
+void
+Dbdict::trans_prepare_done(Signal* signal, SchemaTransPtr trans_ptr)
+{
+ ndbrequire(trans_ptr.p->m_state == SchemaTrans::TS_PREPARING);
+}
+
+void
+Dbdict::trans_abort_parse_start(Signal* signal, SchemaTransPtr trans_ptr)
+{
+ trans_ptr.p->m_state = SchemaTrans::TS_ABORTING_PARSE;
+}
+
+void
+Dbdict::trans_abort_parse_next(Signal* signal, SchemaTransPtr trans_ptr)
+{
+ ndbrequire(trans_ptr.p->m_state == SchemaTrans::TS_ABORTING_PARSE);
+}
+
+void
+Dbdict::trans_abort_parse_done(Signal* signal, SchemaTransPtr trans_ptr)
+{
+ ndbrequire(trans_ptr.p->m_state == SchemaTrans::TS_ABORTING_PARSE);
+}
+
+void
+Dbdict::trans_abort_prepare_start(Signal* signal, SchemaTransPtr trans_ptr)
+{
+ trans_ptr.p->m_state = SchemaTrans::TS_ABORTING_PREPARE;
+}
+
+void
+Dbdict::trans_abort_prepare_next(Signal* signal, SchemaTransPtr trans_ptr)
+{
+ ndbrequire(trans_ptr.p->m_state == SchemaTrans::TS_ABORTING_PREPARE);
+}
+
+void
+Dbdict::trans_abort_prepare_done(Signal* signal, SchemaTransPtr trans_ptr)
+{
+ ndbrequire(trans_ptr.p->m_state == SchemaTrans::TS_ABORTING_PREPARE);
+}
+
+void
+Dbdict::trans_commit_start(Signal* signal, SchemaTransPtr trans_ptr)
{
jam();
ndbout_c("trans_commit");
+ trans_ptr.p->m_state = SchemaTrans::TS_COMMITTING;
+
Mutex mutex(signal, c_mutexMgr, trans_ptr.p->m_commit_mutex);
Callback c = { safe_cast(&Dbdict::trans_commit_mutex_locked), trans_ptr.i };
@@ -19899,6 +19956,8 @@ Dbdict::trans_commit_mutex_locked(Signal
SchemaTransPtr trans_ptr;
c_schemaTransPool.getPtr(trans_ptr, transPtrI);
+ ndbrequire(trans_ptr.p->m_state == SchemaTrans::TS_COMMITTING);
+
SectionHandle handle(this);
handle.m_cnt = 0;
sendTransReq(signal, trans_ptr, handle);
@@ -19910,7 +19969,6 @@ Dbdict::trans_commit_done(Signal* signal
ndbout_c("trans_commit_done");
Mutex mutex(signal, c_mutexMgr, trans_ptr.p->m_commit_mutex);
Callback c = { safe_cast(&Dbdict::trans_commit_mutex_unlocked), trans_ptr.i };
-
if (mutex.isNull())
{
/**
@@ -19982,12 +20040,202 @@ Dbdict::setTransMode(SchemaTransPtr tran
void
Dbdict::execSCHEMA_TRANS_IMPL_REQ(Signal* signal)
{
+ jamEntry();
if (!assembleFragments(signal)) {
jam();
return;
}
- jamEntry();
+
+ SchemaTransImplReq reqCopy =
+ *(const SchemaTransImplReq*)signal->getDataPtr();
+ const SchemaTransImplReq *req = &reqCopy;
+
+ Uint32 i = ~0;
+ if (i == SchemaTransImplReq::RT_START)
+ {
+ jam();
+ slave_run_start(signal, req);
+ return;
+ }
+
+ ErrorInfo error;
+ SchemaTransPtr trans_ptr;
+ const Uint32 trans_key = req->transKey;
+ if (!findSchemaTrans(trans_ptr, trans_key))
+ {
+ jam();
+ setError(error, SchemaTransImplRef::InvalidTransKey, __LINE__);
+ goto err;
+ }
+
+ /**
+ * Check *transaction* request
+ */
+ switch(i){
+ case SchemaTransImplReq::RT_PARSE:
+ jam();
+ slave_run_parse(signal, trans_ptr, req);
+ return;
+ case SchemaTransImplReq::RT_COMPLETE:
+ jam();
+ //slave_run_complete(signal, trans_ptr);
+ return;
+ case SchemaTransImplReq::RT_FLUSH_SCHEMA:
+ jam();
+ slave_flush_schema(signal, trans_ptr);
+ return;
+ default:
+ break;
+ }
+
+ SchemaOpPtr op_ptr;
+ if (!findSchemaOp(op_ptr, req->opKey))
+ {
+ jam();
+ // wl3600_todo better error no
+ setError(error, SchemaTransImplRef::InvalidTransKey, __LINE__);
+ goto err;
+ }
+
+ {
+ const OpInfo info = getOpInfo(op_ptr);
+ switch(i){
+ case SchemaTransImplReq::RT_START:
+ case SchemaTransImplReq::RT_PARSE:
+ case SchemaTransImplReq::RT_COMPLETE:
+ case SchemaTransImplReq::RT_FLUSH_SCHEMA:
+ ndbrequire(false); // handled above
+ case SchemaTransImplReq::RT_PREPARE:
+ jam();
+ (this->*(info.m_prepare))(signal, op_ptr);
+ return;
+ case SchemaTransImplReq::RT_ABORT_PARSE:
+ jam();
+ (this->*(info.m_abortParse))(signal, op_ptr);
+ return;
+ case SchemaTransImplReq::RT_ABORT_PREPARE:
+ jam();
+ (this->*(info.m_abortPrepare))(signal, op_ptr);
+ return;
+ case SchemaTransImplReq::RT_COMMIT:
+ jam();
+ (this->*(info.m_commit))(signal, op_ptr);
+ return;
+ }
+ }
+
recvTransReq(signal);
+ return;
+err:
+ ndbrequire(false);
+}
+
+void
+Dbdict::slave_run_start(Signal *signal, const SchemaTransImplReq* req)
+{
+ ErrorInfo error;
+ SchemaTransPtr trans_ptr;
+ const Uint32 trans_key = req->transKey;
+ if (req->senderRef != reference())
+ {
+ jam();
+ if (!seizeSchemaTrans(trans_ptr, trans_key))
+ {
+ jam();
+ setError(error, SchemaTransImplRef::TooManySchemaTrans, __LINE__);
+ goto err;
+ }
+ trans_ptr.p->m_clientRef = req->clientRef;
+ trans_ptr.p->m_transId = req->transId;
+ }
+ else
+ {
+ jam();
+ ndbrequire(findSchemaTrans(trans_ptr, req->transKey));
+ }
+ sendTransConf(signal, trans_ptr);
+ return;
+
+err:
+ sendTransRef(signal, trans_ptr);
+}
+
+void
+Dbdict::slave_run_parse(Signal *signal,
+ SchemaTransPtr trans_ptr,
+ const SchemaTransImplReq* req)
+{
+ SchemaOpPtr op_ptr;
+ D("slave_run_parse");
+
+ const Uint32 op_key = req->opKey;
+ const Uint32 phaseInfo = req->phaseInfo;
+ const Uint32 gsn = SchemaTransImplReq::getGsn(phaseInfo);
+ const Uint32 requestInfo = req->requestInfo;
+ const OpInfo& info = *findOpInfo(gsn);
+
+ // signal data contains impl_req
+ const Uint32* src = signal->getDataPtr();
+ const Uint32 len = info.m_impl_req_length;
+
+ SectionHandle handle(this, signal);
+
+ ndbrequire(op_key != RNIL);
+ ErrorInfo error;
+ if (trans_ptr.p->m_isMaster)
+ {
+ jam();
+ // this branch does nothing but is convenient for signal pong
+
+ //XXX Check if op == last op in trans
+ findSchemaOp(op_ptr, op_key);
+ ndbrequire(!op_ptr.isNull());
+
+ OpRecPtr oprec_ptr = op_ptr.p->m_oprec_ptr;
+ const Uint32* dst = oprec_ptr.p->m_impl_req_data;
+ ndbrequire(memcmp(dst, src, len << 2) == 0);
+ }
+ else
+ {
+ if (seizeSchemaOp(op_ptr, op_key, info))
+ {
+ jam();
+
+ DictSignal::addRequestExtra(op_ptr.p->m_requestInfo, requestInfo);
+ DictSignal::addRequestFlags(op_ptr.p->m_requestInfo, requestInfo);
+
+ OpRecPtr oprec_ptr = op_ptr.p->m_oprec_ptr;
+ Uint32* dst = oprec_ptr.p->m_impl_req_data;
+ memcpy(dst, src, len << 2);
+
+ (this->*(info.m_parse))(signal, false, op_ptr, handle, error);
+ if (!hasError(error))
+ {
+ jam();
+ addSchemaOp(trans_ptr, op_ptr);
+ }
+ } else {
+ jam();
+ setError(error, SchemaTransImplRef::TooManySchemaOps, __LINE__);
+ }
+ }
+
+ // parse must consume but not release signal sections
+ releaseSections(handle);
+
+ if (hasError(error))
+ {
+ jam();
+ sendTransRef(signal, trans_ptr);
+ return;
+ }
+ sendTransConf(signal, op_ptr);
+}
+
+void
+Dbdict::slave_flush_schema(Signal *signal,
+ SchemaTransPtr trans_ptr)
+{
}
void
@@ -20189,7 +20437,8 @@ Dbdict::recvTransParseReq(Signal* signal
ndbrequire(op_key != RNIL);
ErrorInfo error;
- if (trans_ptr.p->m_isMaster) {
+ if (trans_ptr.p->m_isMaster)
+ {
jam();
// this branch does nothing but is convenient for signal pong
findSchemaOp(op_ptr, op_key);
@@ -20198,8 +20447,11 @@ Dbdict::recvTransParseReq(Signal* signal
OpRecPtr oprec_ptr = op_ptr.p->m_oprec_ptr;
const Uint32* dst = oprec_ptr.p->m_impl_req_data;
ndbrequire(memcmp(dst, src, len << 2) == 0);
- } else {
- if (seizeSchemaOp(op_ptr, op_key, info)) {
+ }
+ else
+ {
+ if (seizeSchemaOp(op_ptr, op_key, info))
+ {
jam();
DictSignal::addRequestExtra(op_ptr.p->m_requestInfo, requestInfo);
@@ -20212,7 +20464,8 @@ Dbdict::recvTransParseReq(Signal* signal
memcpy(dst, src, len << 2);
(this->*(info.m_parse))(signal, false, op_ptr, handle, error);
- if (!hasError(error)) {
+ if (!hasError(error))
+ {
jam();
updateSchemaOpStep(trans_ptr, op_ptr);
}
@@ -20225,7 +20478,8 @@ Dbdict::recvTransParseReq(Signal* signal
// parse must consume but not release signal sections
releaseSections(handle);
- if (hasError(error)) {
+ if (hasError(error))
+ {
jam();
setError(trans_ptr, error);
sendTransRef(signal, trans_ptr);
diff -Nrup a/storage/ndb/src/kernel/blocks/dbdict/Dbdict.hpp
b/storage/ndb/src/kernel/blocks/dbdict/Dbdict.hpp
--- a/storage/ndb/src/kernel/blocks/dbdict/Dbdict.hpp 2008-02-06 21:12:33 +01:00
+++ b/storage/ndb/src/kernel/blocks/dbdict/Dbdict.hpp 2008-02-06 21:13:49 +01:00
@@ -1356,7 +1356,8 @@ private:
// OpRec
- struct OpRec {
+ struct OpRec
+ {
Uint32 nextPool;
// reference to the static member in subclass
@@ -1426,6 +1427,21 @@ private:
struct SchemaOp {
Uint32 nextPool;
+ enum OpState
+ {
+ OS_PARSING = 1,
+ OS_PARSED = 2,
+ OS_PREPARING = 3,
+ OS_PREPARED = 4,
+ OS_ABORTING_PREPARE = 5,
+ OS_ABORTED_PREPARE = 6,
+ OS_ABORTING_PARSE = 7,
+ //OS_ABORTED_PARSE = 8 // Not used, op released
+ OS_COMMITTING = 9,
+ //OS_COMMITTED = 10 // Not used, op released
+ };
+
+ Uint32 m_state;
Uint32 op_key;
Uint32 nextHash;
Uint32 prevHash;
@@ -1775,6 +1791,22 @@ private:
// ArrayPool
Uint32 nextPool;
+ enum TransState
+ {
+ TS_STARTING = 1, // Starting at participants
+ TS_STARTED = 2, // Started (potentially with parsed ops)
+ TS_PARSING = 3, // Parsing at participants
+ TS_SUBOP = 4, // Creating subop
+ TS_ROLLBACK_SP = 5, // Rolling back to SP (supported before prepare)
+ TS_PREPARING = 6, // Preparing operations
+ TS_ABORTING_PREPARE = 7, // Aborting prepared operations
+ TS_ABORTING_PARSE = 8, // Aborting parsed operations
+ TS_COMMITTING = 9, // Committing
+ TS_COMMITTED = 10,// Committed
+ TS_COMPLETING = 11,// Completing
+ };
+
+ Uint32 m_state;
// DLHashTable
Uint32 trans_key;
Uint32 nextHash;
@@ -1880,8 +1912,21 @@ private:
void runTransMaster(Signal*, SchemaTransPtr);
void setTransMode(SchemaTransPtr, TransMode::Value, bool hold);
- void trans_commit(Signal*, SchemaTransPtr);
+ void trans_prepare_start(Signal*, SchemaTransPtr);
+ void trans_prepare_next(Signal*, SchemaTransPtr);
+ void trans_prepare_done(Signal*, SchemaTransPtr);
+
+ void trans_abort_prepare_start(Signal*, SchemaTransPtr);
+ void trans_abort_prepare_next(Signal*, SchemaTransPtr);
+ void trans_abort_prepare_done(Signal*, SchemaTransPtr);
+
+ void trans_abort_parse_start(Signal*, SchemaTransPtr);
+ void trans_abort_parse_next(Signal*, SchemaTransPtr);
+ void trans_abort_parse_done(Signal*, SchemaTransPtr);
+
+ void trans_commit_start(Signal*, SchemaTransPtr);
void trans_commit_mutex_locked(Signal*, Uint32, Uint32);
+ void trans_commit_next(Signal*, SchemaTransPtr);
void trans_commit_done(Signal* signal, SchemaTransPtr);
void trans_commit_mutex_unlocked(Signal*, Uint32, Uint32);
@@ -1895,6 +1940,10 @@ private:
void sendTransConf(Signal*, SchemaTransPtr, Uint32 itFlags = 0);
void sendTransRef(Signal*, SchemaOpPtr);
void sendTransRef(Signal*, SchemaTransPtr);
+
+ void slave_run_start(Signal*, const SchemaTransImplReq*);
+ void slave_run_parse(Signal*, SchemaTransPtr, const SchemaTransImplReq*);
+ void slave_flush_schema(Signal*, SchemaTransPtr);
// reply to trans client for begin/end trans
void sendTransClientReply(Signal*, SchemaTransPtr);
| Thread |
|---|
| • bk commit into 5.1 tree (jonas:1.2850) | jonas | 6 Feb |