List:Commits« Previous MessageNext Message »
From:jonas Date:February 6 2008 9:13pm
Subject:bk commit into 5.1 tree (jonas:1.2850)
View as plain text  
Below is the list of changes that have just been committed into a local
5.1 repository of jonas.  When jonas does a push these changes
will be propagated to the main repository and, within 24 hours after the
push, to the public repository.
For information on how to access the public repository
see http://dev.mysql.com/doc/mysql/en/installing-source-tree.html

ChangeSet@stripped, 2008-02-06 21:13:51+01:00, jonas@stripped +3 -0
  Transaction(s)
  
  This patchset aims to make transaction-handling *explicit*
  The transaction can have certain states
  An operation can certain states
  
  Note: This patch is dead-code, next patch will remove 
        iterator, and enable this code
  
  ---
   storage/ndb/include/kernel/signaldata/SchemaTransImpl.hpp |   24 +
   storage/ndb/src/kernel/blocks/dbdict/Dbdict.cpp           |  274 +++++++++++++-
   storage/ndb/src/kernel/blocks/dbdict/Dbdict.hpp           |   53 ++
   3 files changed, 336 insertions(+), 15 deletions(-)

  storage/ndb/include/kernel/signaldata/SchemaTransImpl.hpp@stripped, 2008-02-06
21:13:49+01:00, jonas@stripped +21 -3
    Transaction(s)
    
    This patchset aims to make transaction-handling *explicit*
    The transaction can have certain states
    An operation can certain states
    
    Note: This patch is dead-code, next patch will remove 
          iterator, and enable this code

  storage/ndb/src/kernel/blocks/dbdict/Dbdict.cpp@stripped, 2008-02-06 21:13:49+01:00,
jonas@stripped +264 -10
    Transaction(s)
    
    This patchset aims to make transaction-handling *explicit*
    The transaction can have certain states
    An operation can certain states
    
    Note: This patch is dead-code, next patch will remove 
          iterator, and enable this code

  storage/ndb/src/kernel/blocks/dbdict/Dbdict.hpp@stripped, 2008-02-06 21:13:49+01:00,
jonas@stripped +51 -2
    Transaction(s)
    
    This patchset aims to make transaction-handling *explicit*
    The transaction can have certain states
    An operation can certain states
    
    Note: This patch is dead-code, next patch will remove 
          iterator, and enable this code

diff -Nrup a/storage/ndb/include/kernel/signaldata/SchemaTransImpl.hpp
b/storage/ndb/include/kernel/signaldata/SchemaTransImpl.hpp
--- a/storage/ndb/include/kernel/signaldata/SchemaTransImpl.hpp	2008-01-05 13:38:13 +01:00
+++ b/storage/ndb/include/kernel/signaldata/SchemaTransImpl.hpp	2008-02-06 21:13:49 +01:00
@@ -20,7 +20,21 @@
 #include "SignalData.hpp"
 #include "GlobalSignalNumbers.h"
 
-struct SchemaTransImplReq {
+struct SchemaTransImplReq
+{
+  enum RequestType
+  {
+    RT_START         = 0x0,
+    RT_PARSE         = 0x1,
+    RT_PREPARE       = 0x2,
+    RT_ABORT_PARSE   = 0x3,
+    RT_ABORT_PREPARE = 0x4,
+    RT_COMMIT        = 0x5,
+
+    RT_COMPLETE      = 0x6,// Not yet used
+    RT_FLUSH_SCHEMA  = 0x7 // Not yet used
+  };
+
   STATIC_CONST( SignalLength = 9 );
   Uint32 senderRef;
   Uint32 transKey;
@@ -32,6 +46,8 @@ struct SchemaTransImplReq {
   Uint32 clientRef;
   Uint32 transId;
 
+  Uint32 requestType;
+
   // phaseInfo
   static Uint32 getMode(const Uint32& info) {
     return BitmaskImpl::getField(1, &info, 0, 8);
@@ -96,7 +112,8 @@ struct SchemaTransImplReq {
   }
 };
 
-struct SchemaTransImplConf {
+struct SchemaTransImplConf
+{
   enum {
     IT_REPEAT = (1 << 1)
   };
@@ -107,7 +124,8 @@ struct SchemaTransImplConf {
   Uint32 itFlags;
 };
 
-struct SchemaTransImplRef {
+struct SchemaTransImplRef
+{
   STATIC_CONST( SignalLength = 6 );
   STATIC_CONST( GSN = GSN_SCHEMA_TRANS_IMPL_REF );
   enum ErrorCode {
diff -Nrup a/storage/ndb/src/kernel/blocks/dbdict/Dbdict.cpp
b/storage/ndb/src/kernel/blocks/dbdict/Dbdict.cpp
--- a/storage/ndb/src/kernel/blocks/dbdict/Dbdict.cpp	2008-02-06 21:12:33 +01:00
+++ b/storage/ndb/src/kernel/blocks/dbdict/Dbdict.cpp	2008-02-06 21:13:49 +01:00
@@ -19394,7 +19394,8 @@ Dbdict::handleClientReq(Signal* signal, 
   const OpInfo& info = getOpInfo(op_ptr);
   (this->*(info.m_parse))(signal, true, op_ptr, handle, error);
 
-  if (hasError(error)) {
+  if (hasError(error))
+  {
     jam();
     setError(op_ptr, error);
     setTransMode(trans_ptr, TransMode::Rollback, true);
@@ -19864,7 +19865,7 @@ Dbdict::runTransMaster(Signal* signal, S
      * This is commit moment...
      */
     jam();
-    trans_commit(signal, trans_ptr);
+    trans_commit_start(signal, trans_ptr);
     return;
   }
 
@@ -19875,11 +19876,67 @@ Dbdict::runTransMaster(Signal* signal, S
 }
 
 void
-Dbdict::trans_commit(Signal* signal, SchemaTransPtr trans_ptr)
+Dbdict::trans_prepare_start(Signal* signal, SchemaTransPtr trans_ptr)
+{
+  trans_ptr.p->m_state = SchemaTrans::TS_PREPARING;
+}
+
+void
+Dbdict::trans_prepare_next(Signal* signal, SchemaTransPtr trans_ptr)
+{
+  ndbrequire(trans_ptr.p->m_state == SchemaTrans::TS_PREPARING);
+}
+
+void
+Dbdict::trans_prepare_done(Signal* signal, SchemaTransPtr trans_ptr)
+{
+  ndbrequire(trans_ptr.p->m_state == SchemaTrans::TS_PREPARING);
+}
+
+void
+Dbdict::trans_abort_parse_start(Signal* signal, SchemaTransPtr trans_ptr)
+{
+  trans_ptr.p->m_state = SchemaTrans::TS_ABORTING_PARSE;
+}
+
+void
+Dbdict::trans_abort_parse_next(Signal* signal, SchemaTransPtr trans_ptr)
+{
+  ndbrequire(trans_ptr.p->m_state == SchemaTrans::TS_ABORTING_PARSE);
+}
+
+void
+Dbdict::trans_abort_parse_done(Signal* signal, SchemaTransPtr trans_ptr)
+{
+  ndbrequire(trans_ptr.p->m_state == SchemaTrans::TS_ABORTING_PARSE);
+}
+
+void
+Dbdict::trans_abort_prepare_start(Signal* signal, SchemaTransPtr trans_ptr)
+{
+  trans_ptr.p->m_state = SchemaTrans::TS_ABORTING_PREPARE;
+}
+
+void
+Dbdict::trans_abort_prepare_next(Signal* signal, SchemaTransPtr trans_ptr)
+{
+  ndbrequire(trans_ptr.p->m_state == SchemaTrans::TS_ABORTING_PREPARE);
+}
+
+void
+Dbdict::trans_abort_prepare_done(Signal* signal, SchemaTransPtr trans_ptr)
+{
+  ndbrequire(trans_ptr.p->m_state == SchemaTrans::TS_ABORTING_PREPARE);
+}
+
+void
+Dbdict::trans_commit_start(Signal* signal, SchemaTransPtr trans_ptr)
 {
   jam();
   ndbout_c("trans_commit");
 
+  trans_ptr.p->m_state = SchemaTrans::TS_COMMITTING;
+
   Mutex mutex(signal, c_mutexMgr, trans_ptr.p->m_commit_mutex);
   Callback c = { safe_cast(&Dbdict::trans_commit_mutex_locked), trans_ptr.i };
 
@@ -19899,6 +19956,8 @@ Dbdict::trans_commit_mutex_locked(Signal
   SchemaTransPtr trans_ptr;
   c_schemaTransPool.getPtr(trans_ptr, transPtrI);
 
+  ndbrequire(trans_ptr.p->m_state == SchemaTrans::TS_COMMITTING);
+
   SectionHandle handle(this);
   handle.m_cnt = 0;
   sendTransReq(signal, trans_ptr, handle);
@@ -19910,7 +19969,6 @@ Dbdict::trans_commit_done(Signal* signal
   ndbout_c("trans_commit_done");
   Mutex mutex(signal, c_mutexMgr, trans_ptr.p->m_commit_mutex);
   Callback c = { safe_cast(&Dbdict::trans_commit_mutex_unlocked), trans_ptr.i };
-
   if (mutex.isNull())
   {
     /**
@@ -19982,12 +20040,202 @@ Dbdict::setTransMode(SchemaTransPtr tran
 void
 Dbdict::execSCHEMA_TRANS_IMPL_REQ(Signal* signal)
 {
+  jamEntry();
   if (!assembleFragments(signal)) {
     jam();
     return;
   }
-  jamEntry();
+
+  SchemaTransImplReq reqCopy =
+    *(const SchemaTransImplReq*)signal->getDataPtr();
+  const SchemaTransImplReq *req = &reqCopy;
+
+  Uint32 i = ~0;
+  if (i == SchemaTransImplReq::RT_START)
+  {
+    jam();
+    slave_run_start(signal, req);
+    return;
+  }
+
+  ErrorInfo error;
+  SchemaTransPtr trans_ptr;
+  const Uint32 trans_key = req->transKey;
+  if (!findSchemaTrans(trans_ptr, trans_key))
+  {
+    jam();
+    setError(error, SchemaTransImplRef::InvalidTransKey, __LINE__);
+    goto err;
+  }
+
+  /**
+   * Check *transaction* request
+   */
+  switch(i){
+  case SchemaTransImplReq::RT_PARSE:
+    jam();
+    slave_run_parse(signal, trans_ptr, req);
+    return;
+  case SchemaTransImplReq::RT_COMPLETE:
+    jam();
+    //slave_run_complete(signal, trans_ptr);
+    return;
+  case SchemaTransImplReq::RT_FLUSH_SCHEMA:
+    jam();
+    slave_flush_schema(signal, trans_ptr);
+    return;
+  default:
+    break;
+  }
+
+  SchemaOpPtr op_ptr;
+  if (!findSchemaOp(op_ptr, req->opKey))
+  {
+    jam();
+    // wl3600_todo better error no
+    setError(error, SchemaTransImplRef::InvalidTransKey, __LINE__);
+    goto err;
+  }
+
+  {
+    const OpInfo info = getOpInfo(op_ptr);
+    switch(i){
+    case SchemaTransImplReq::RT_START:
+    case SchemaTransImplReq::RT_PARSE:
+    case SchemaTransImplReq::RT_COMPLETE:
+    case SchemaTransImplReq::RT_FLUSH_SCHEMA:
+      ndbrequire(false); // handled above
+    case SchemaTransImplReq::RT_PREPARE:
+      jam();
+      (this->*(info.m_prepare))(signal, op_ptr);
+      return;
+    case SchemaTransImplReq::RT_ABORT_PARSE:
+      jam();
+      (this->*(info.m_abortParse))(signal, op_ptr);
+      return;
+    case SchemaTransImplReq::RT_ABORT_PREPARE:
+      jam();
+      (this->*(info.m_abortPrepare))(signal, op_ptr);
+      return;
+    case SchemaTransImplReq::RT_COMMIT:
+      jam();
+      (this->*(info.m_commit))(signal, op_ptr);
+      return;
+    }
+  }
+
   recvTransReq(signal);
+  return;
+err:
+  ndbrequire(false);
+}
+
+void
+Dbdict::slave_run_start(Signal *signal, const SchemaTransImplReq* req)
+{
+  ErrorInfo error;
+  SchemaTransPtr trans_ptr;
+  const Uint32 trans_key = req->transKey;
+  if (req->senderRef != reference())
+  {
+    jam();
+    if (!seizeSchemaTrans(trans_ptr, trans_key))
+    {
+      jam();
+      setError(error, SchemaTransImplRef::TooManySchemaTrans, __LINE__);
+      goto err;
+    }
+    trans_ptr.p->m_clientRef = req->clientRef;
+    trans_ptr.p->m_transId = req->transId;
+  }
+  else
+  {
+    jam();
+    ndbrequire(findSchemaTrans(trans_ptr, req->transKey));
+  }
+  sendTransConf(signal, trans_ptr);
+  return;
+
+err:
+  sendTransRef(signal, trans_ptr);
+}
+
+void
+Dbdict::slave_run_parse(Signal *signal,
+                        SchemaTransPtr trans_ptr,
+                        const SchemaTransImplReq* req)
+{
+  SchemaOpPtr op_ptr;
+  D("slave_run_parse");
+
+  const Uint32 op_key = req->opKey;
+  const Uint32 phaseInfo = req->phaseInfo;
+  const Uint32 gsn = SchemaTransImplReq::getGsn(phaseInfo);
+  const Uint32 requestInfo = req->requestInfo;
+  const OpInfo& info = *findOpInfo(gsn);
+
+  // signal data contains impl_req
+  const Uint32* src = signal->getDataPtr();
+  const Uint32 len = info.m_impl_req_length;
+
+  SectionHandle handle(this, signal);
+
+  ndbrequire(op_key != RNIL);
+  ErrorInfo error;
+  if (trans_ptr.p->m_isMaster)
+  {
+    jam();
+    // this branch does nothing but is convenient for signal pong
+
+    //XXX Check if op == last op in trans
+    findSchemaOp(op_ptr, op_key);
+    ndbrequire(!op_ptr.isNull());
+
+    OpRecPtr oprec_ptr = op_ptr.p->m_oprec_ptr;
+    const Uint32* dst = oprec_ptr.p->m_impl_req_data;
+    ndbrequire(memcmp(dst, src, len << 2) == 0);
+  }
+  else
+  {
+    if (seizeSchemaOp(op_ptr, op_key, info))
+    {
+      jam();
+
+      DictSignal::addRequestExtra(op_ptr.p->m_requestInfo, requestInfo);
+      DictSignal::addRequestFlags(op_ptr.p->m_requestInfo, requestInfo);
+
+      OpRecPtr oprec_ptr = op_ptr.p->m_oprec_ptr;
+      Uint32* dst = oprec_ptr.p->m_impl_req_data;
+      memcpy(dst, src, len << 2);
+
+      (this->*(info.m_parse))(signal, false, op_ptr, handle, error);
+      if (!hasError(error))
+      {
+        jam();
+        addSchemaOp(trans_ptr, op_ptr);
+      }
+    } else {
+      jam();
+      setError(error, SchemaTransImplRef::TooManySchemaOps, __LINE__);
+    }
+  }
+
+  // parse must consume but not release signal sections
+  releaseSections(handle);
+
+  if (hasError(error))
+  {
+    jam();
+    sendTransRef(signal, trans_ptr);
+    return;
+  }
+  sendTransConf(signal, op_ptr);
+}
+
+void
+Dbdict::slave_flush_schema(Signal *signal,
+			   SchemaTransPtr trans_ptr)
+{
 }
 
 void
@@ -20189,7 +20437,8 @@ Dbdict::recvTransParseReq(Signal* signal
 
   ndbrequire(op_key != RNIL);
   ErrorInfo error;
-  if (trans_ptr.p->m_isMaster) {
+  if (trans_ptr.p->m_isMaster)
+  {
     jam();
     // this branch does nothing but is convenient for signal pong
     findSchemaOp(op_ptr, op_key);
@@ -20198,8 +20447,11 @@ Dbdict::recvTransParseReq(Signal* signal
     OpRecPtr oprec_ptr = op_ptr.p->m_oprec_ptr;
     const Uint32* dst = oprec_ptr.p->m_impl_req_data;
     ndbrequire(memcmp(dst, src, len << 2) == 0);
-  } else {
-    if (seizeSchemaOp(op_ptr, op_key, info)) {
+  }
+  else
+  {
+    if (seizeSchemaOp(op_ptr, op_key, info))
+    {
       jam();
 
       DictSignal::addRequestExtra(op_ptr.p->m_requestInfo, requestInfo);
@@ -20212,7 +20464,8 @@ Dbdict::recvTransParseReq(Signal* signal
       memcpy(dst, src, len << 2);
 
       (this->*(info.m_parse))(signal, false, op_ptr, handle, error);
-      if (!hasError(error)) {
+      if (!hasError(error))
+      {
         jam();
         updateSchemaOpStep(trans_ptr, op_ptr);
       }
@@ -20225,7 +20478,8 @@ Dbdict::recvTransParseReq(Signal* signal
   // parse must consume but not release signal sections
   releaseSections(handle);
 
-  if (hasError(error)) {
+  if (hasError(error))
+  {
     jam();
     setError(trans_ptr, error);
     sendTransRef(signal, trans_ptr);
diff -Nrup a/storage/ndb/src/kernel/blocks/dbdict/Dbdict.hpp
b/storage/ndb/src/kernel/blocks/dbdict/Dbdict.hpp
--- a/storage/ndb/src/kernel/blocks/dbdict/Dbdict.hpp	2008-02-06 21:12:33 +01:00
+++ b/storage/ndb/src/kernel/blocks/dbdict/Dbdict.hpp	2008-02-06 21:13:49 +01:00
@@ -1356,7 +1356,8 @@ private:
 
   // OpRec
 
-  struct OpRec {
+  struct OpRec
+  {
     Uint32 nextPool;
 
     // reference to the static member in subclass
@@ -1426,6 +1427,21 @@ private:
   struct SchemaOp {
     Uint32 nextPool;
 
+    enum OpState
+    {
+      OS_PARSING          = 1,
+      OS_PARSED           = 2,
+      OS_PREPARING        = 3,
+      OS_PREPARED         = 4,
+      OS_ABORTING_PREPARE = 5,
+      OS_ABORTED_PREPARE  = 6,
+      OS_ABORTING_PARSE   = 7,
+      //OS_ABORTED_PARSE    = 8  // Not used, op released
+      OS_COMMITTING       = 9,
+      //OS_COMMITTED        = 10 // Not used, op released
+    };
+
+    Uint32 m_state;
     Uint32 op_key;
     Uint32 nextHash;
     Uint32 prevHash;
@@ -1775,6 +1791,22 @@ private:
     // ArrayPool
     Uint32 nextPool;
 
+    enum TransState
+    {
+      TS_STARTING         = 1, // Starting at participants
+      TS_STARTED          = 2, // Started (potentially with parsed ops)
+      TS_PARSING          = 3, // Parsing at participants
+      TS_SUBOP            = 4, // Creating subop
+      TS_ROLLBACK_SP      = 5, // Rolling back to SP (supported before prepare)
+      TS_PREPARING        = 6, // Preparing operations
+      TS_ABORTING_PREPARE = 7, // Aborting prepared operations
+      TS_ABORTING_PARSE   = 8, // Aborting parsed operations
+      TS_COMMITTING       = 9, // Committing
+      TS_COMMITTED        = 10,// Committed
+      TS_COMPLETING       = 11,// Completing
+    };
+
+    Uint32 m_state;
     // DLHashTable
     Uint32 trans_key;
     Uint32 nextHash;
@@ -1880,8 +1912,21 @@ private:
   void runTransMaster(Signal*, SchemaTransPtr);
   void setTransMode(SchemaTransPtr, TransMode::Value, bool hold);
 
-  void trans_commit(Signal*, SchemaTransPtr);
+  void trans_prepare_start(Signal*, SchemaTransPtr);
+  void trans_prepare_next(Signal*, SchemaTransPtr);
+  void trans_prepare_done(Signal*, SchemaTransPtr);
+
+  void trans_abort_prepare_start(Signal*, SchemaTransPtr);
+  void trans_abort_prepare_next(Signal*, SchemaTransPtr);
+  void trans_abort_prepare_done(Signal*, SchemaTransPtr);
+
+  void trans_abort_parse_start(Signal*, SchemaTransPtr);
+  void trans_abort_parse_next(Signal*, SchemaTransPtr);
+  void trans_abort_parse_done(Signal*, SchemaTransPtr);
+
+  void trans_commit_start(Signal*, SchemaTransPtr);
   void trans_commit_mutex_locked(Signal*, Uint32, Uint32);
+  void trans_commit_next(Signal*, SchemaTransPtr);
   void trans_commit_done(Signal* signal, SchemaTransPtr);
   void trans_commit_mutex_unlocked(Signal*, Uint32, Uint32);
 
@@ -1895,6 +1940,10 @@ private:
   void sendTransConf(Signal*, SchemaTransPtr, Uint32 itFlags = 0);
   void sendTransRef(Signal*, SchemaOpPtr);
   void sendTransRef(Signal*, SchemaTransPtr);
+
+  void slave_run_start(Signal*, const SchemaTransImplReq*);
+  void slave_run_parse(Signal*, SchemaTransPtr, const SchemaTransImplReq*);
+  void slave_flush_schema(Signal*, SchemaTransPtr);
 
   // reply to trans client for begin/end trans
   void sendTransClientReply(Signal*, SchemaTransPtr);
Thread
bk commit into 5.1 tree (jonas:1.2850)jonas6 Feb