List:Commits« Previous MessageNext Message »
From:Pekka Nousiainen Date:November 16 2008 12:28pm
Subject:bzr commit into mysql-5.1 branch (pekka:3087) WL#4391
View as plain text  
#At file:///export/space/pekka/ndb/version/my51-wl4391/

 3087 Pekka Nousiainen	2008-11-16
      wl#4391 27a_ddcb.diff
      DD - callback via signal: implementation
added:
  storage/ndb/include/kernel/signaldata/CallbackSignal.hpp
modified:
  storage/ndb/include/kernel/GlobalSignalNumbers.h
  storage/ndb/src/common/debugger/signaldata/SignalNames.cpp
  storage/ndb/src/kernel/vm/SafeMutex.hpp
  storage/ndb/src/kernel/vm/SimulatedBlock.cpp
  storage/ndb/src/kernel/vm/SimulatedBlock.hpp

=== modified file 'storage/ndb/include/kernel/GlobalSignalNumbers.h'
--- a/storage/ndb/include/kernel/GlobalSignalNumbers.h	2008-11-15 16:01:37 +0000
+++ b/storage/ndb/include/kernel/GlobalSignalNumbers.h	2008-11-16 12:28:20 +0000
@@ -943,11 +943,6 @@ extern const GlobalSignalNumber NO_OF_SI
 /* used 695 */
 /* used 696 */
 
-#define GSN_707
-#define GSN_708
-#define GSN_709
-
-
 /*
  * EVENT Signals
  */
@@ -1069,4 +1064,8 @@ extern const GlobalSignalNumber NO_OF_SI
 
 #define GSN_DATA_FILE_ORD               706
 
+#define GSN_CALLBACK_REQ                707 /*reserved*/
+#define GSN_CALLBACK_CONF               708
+#define GSN_CALLBACK_ACK                709
+
 #endif

=== added file 'storage/ndb/include/kernel/signaldata/CallbackSignal.hpp'
--- a/storage/ndb/include/kernel/signaldata/CallbackSignal.hpp	1970-01-01 00:00:00 +0000
+++ b/storage/ndb/include/kernel/signaldata/CallbackSignal.hpp	2008-11-16 12:28:20 +0000
@@ -0,0 +1,35 @@
+/* Copyright (C) 2003 MySQL AB
+
+   This program is free software; you can redistribute it and/or modify
+   it under the terms of the GNU General Public License as published by
+   the Free Software Foundation; version 2 of the License.
+
+   This program is distributed in the hope that it will be useful,
+   but WITHOUT ANY WARRANTY; without even the implied warranty of
+   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+   GNU General Public License for more details.
+
+   You should have received a copy of the GNU General Public License
+   along with this program; if not, write to the Free Software
+   Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA */
+
+#ifndef CALLBACK_SIGNAL_HPP
+#define CALLBACK_SIGNAL_HPP
+
+#include "SignalData.hpp"
+
+struct CallbackConf {
+  STATIC_CONST( SignalLength = 5 );
+  Uint32 senderData;
+  Uint32 senderRef;
+  Uint32 callbackIndex;
+  Uint32 callbackData;
+  Uint32 returnCode;
+};
+
+struct CallbackAck {
+  STATIC_CONST( SignalLength = 1 );
+  Uint32 senderData;
+};
+
+#endif

=== modified file 'storage/ndb/src/common/debugger/signaldata/SignalNames.cpp'
--- a/storage/ndb/src/common/debugger/signaldata/SignalNames.cpp	2008-11-15 16:01:37 +0000
+++ b/storage/ndb/src/common/debugger/signaldata/SignalNames.cpp	2008-11-16 12:28:20 +0000
@@ -731,6 +731,10 @@ const GsnName SignalNames [] = {
 
   ,{ GSN_DATA_FILE_ORD, "DATA_FILE_ORD" }
 
+  ,{ GSN_CALLBACK_REQ, "CALLBACK_REQ" }
+  ,{ GSN_CALLBACK_CONF, "CALLBACK_CONF" }
+  ,{ GSN_CALLBACK_ACK, "CALLBACK_ACK" }
+
 
 };
 const unsigned short NO_OF_SIGNAL_NAMES = sizeof(SignalNames)/sizeof(GsnName);

=== modified file 'storage/ndb/src/kernel/vm/SafeMutex.hpp'
--- a/storage/ndb/src/kernel/vm/SafeMutex.hpp	2008-11-16 09:15:35 +0000
+++ b/storage/ndb/src/kernel/vm/SafeMutex.hpp	2008-11-16 12:28:20 +0000
@@ -16,7 +16,12 @@
 #ifndef NDB_SAFE_MUTEX_HPP
 #define NDB_SAFE_MUTEX_HPP
 
+#ifdef __WIN__
+#include <ndb_global.h>
+#include <my_pthread.h>
+#else
 #include <pthread.h>
+#endif
 #include <assert.h>
 #include <ndb_types.h>
 #include <NdbOut.hpp>

=== modified file 'storage/ndb/src/kernel/vm/SimulatedBlock.cpp'
--- a/storage/ndb/src/kernel/vm/SimulatedBlock.cpp	2008-11-15 15:43:59 +0000
+++ b/storage/ndb/src/kernel/vm/SimulatedBlock.cpp	2008-11-16 12:28:20 +0000
@@ -48,6 +48,7 @@
 #include <NdbSqlUtil.hpp>
 
 #include "../blocks/dbdih/Dbdih.hpp"
+#include <signaldata/CallbackSignal.hpp>
 #include "LongSignalImpl.hpp"
 
 #include <EventLogger.hpp>
@@ -121,6 +122,8 @@ SimulatedBlock::SimulatedBlock(BlockNumb
 
   installSimulatedBlockFunctions();
 
+  m_callbackTableAddr = 0;
+
   CLEAR_ERROR_INSERT_VALUE;
 
 #ifdef VM_TRACE
@@ -200,6 +203,7 @@ SimulatedBlock::installSimulatedBlockFun
   a[GSN_NODE_START_REP] = &SimulatedBlock::execNODE_START_REP;
   a[GSN_API_START_REP] = &SimulatedBlock::execAPI_START_REP;
   a[GSN_SEND_PACKED] = &SimulatedBlock::execSEND_PACKED;
+  a[GSN_CALLBACK_CONF] = &SimulatedBlock::execCALLBACK_CONF;
 }
 
 void
@@ -1824,6 +1828,93 @@ SimulatedBlock::execSEND_PACKED(Signal* 
 {
 }
 
+// MT LQH callback CONF via signal
+
+const SimulatedBlock::CallbackEntry&
+SimulatedBlock::getCallbackEntry(Uint32 ci)
+{
+  ndbrequire(m_callbackTableAddr != 0);
+  const CallbackTable& ct = *m_callbackTableAddr;
+  ndbrequire(ci < ct.m_count);
+  return ct.m_entry[ci];
+}
+
+void
+SimulatedBlock::sendCallbackConf(Signal* signal, Uint32 fullBlockNo,
+                                 CallbackPtr& cptr, Uint32 returnCode)
+{
+  Uint32 blockNo = blockToMain(fullBlockNo);
+  Uint32 instanceNo = blockToInstance(fullBlockNo);
+  SimulatedBlock* b = globalData.getBlock(blockNo, instanceNo);
+  ndbrequire(b != 0);
+
+  const CallbackEntry& ce = b->getCallbackEntry(cptr.m_callbackIndex);
+
+  // wl4391_todo add as arg if this is not enough
+  Uint32 senderData = returnCode;
+
+  if (!isNdbMtLqh()) {
+    Callback c;
+    c.m_callbackFunction = ce.m_function;
+    c.m_callbackData = cptr.m_callbackData;
+    b->execute(signal, c, returnCode);
+
+    if (ce.m_flags & CALLBACK_ACK) {
+      jam();
+      CallbackAck* ack = (CallbackAck*)signal->getDataPtrSend();
+      ack->senderData = senderData;
+      EXECUTE_DIRECT(number(), GSN_CALLBACK_ACK,
+                     signal, CallbackAck::SignalLength);
+    }
+  } else {
+    CallbackConf* conf = (CallbackConf*)signal->getDataPtrSend();
+    conf->senderData = senderData;
+    conf->senderRef = reference();
+    conf->callbackIndex = cptr.m_callbackIndex;
+    conf->callbackData = cptr.m_callbackData;
+    conf->returnCode = returnCode;
+
+    if (ce.m_flags & CALLBACK_DIRECT) {
+      jam();
+      EXECUTE_DIRECT(blockNo, GSN_CALLBACK_CONF,
+                     signal, CallbackConf::SignalLength, instanceNo);
+    } else {
+      jam();
+      BlockReference ref = numberToRef(fullBlockNo, getOwnNodeId());
+      sendSignal(ref, GSN_CALLBACK_CONF,
+                 signal, CallbackConf::SignalLength, JBB);
+    }
+  }
+  cptr.m_callbackIndex = ZNIL;
+}
+
+void
+SimulatedBlock::execCALLBACK_CONF(Signal* signal)
+{
+  const CallbackConf* conf = (const CallbackConf*)signal->getDataPtr();
+
+  Uint32 senderData = conf->senderData;
+  Uint32 senderRef = conf->senderRef;
+
+  ndbrequire(m_callbackTableAddr != 0);
+  const CallbackTable& ct = *m_callbackTableAddr;
+  const CallbackEntry& ce = getCallbackEntry(conf->callbackIndex);
+  CallbackFunction function = ce.m_function;
+
+  Callback callback;
+  callback.m_callbackFunction = function;
+  callback.m_callbackData = conf->callbackData;
+  execute(signal, callback, conf->returnCode);
+
+  if (ce.m_flags & CALLBACK_ACK) {
+    jam();
+    CallbackAck* ack = (CallbackAck*)signal->getDataPtrSend();
+    ack->senderData = senderData;
+    sendSignal(senderRef, GSN_CALLBACK_ACK,
+               signal, CallbackAck::SignalLength, JBB);
+  }
+}
+
 #ifdef VM_TRACE_TIME
 void
 SimulatedBlock::clearTimes() {

=== modified file 'storage/ndb/src/kernel/vm/SimulatedBlock.hpp'
--- a/storage/ndb/src/kernel/vm/SimulatedBlock.hpp	2008-11-16 09:16:35 +0000
+++ b/storage/ndb/src/kernel/vm/SimulatedBlock.hpp	2008-11-16 12:28:20 +0000
@@ -720,6 +720,40 @@ protected:
   void execFSSYNCREF(Signal* signal);
   void execFSAPPENDREF(Signal* signal);
 
+  // MT LQH callback CONF via signal
+
+  struct CallbackPtr {
+    Uint32 m_callbackIndex;
+    Uint32 m_callbackData;
+  };
+
+  enum CallbackFlags {
+    CALLBACK_DIRECT = 0x0001, // use EXECUTE_DIRECT (assumed thread safe)
+    CALLBACK_ACK    = 0x0002  // send ack at the end of callback timeslice
+  };
+
+  struct CallbackEntry {
+    CallbackFunction m_function;
+    Uint32 m_flags;
+  };
+
+  struct CallbackTable {
+    Uint32 m_count;
+    CallbackEntry* m_entry; // array
+  };
+
+  CallbackTable* m_callbackTableAddr; // set by block if used
+
+  enum {
+    THE_NULL_CALLBACK = 0 // must assign TheNULLCallbackFunction
+  };
+
+  void execute(Signal* signal, CallbackPtr & cptr, Uint32 returnCode);
+  const CallbackEntry& getCallbackEntry(Uint32 ci);
+  void sendCallbackConf(Signal* signal, Uint32 fullBlockNo,
+                        CallbackPtr& cptr, Uint32 returnCode);
+  void execCALLBACK_CONF(Signal* signal);
+
   // Variable for storing inserted errors, see pc.H
   ERROR_INSERT_VARIABLE;
 
@@ -817,6 +851,17 @@ SimulatedBlock::execute(Signal* signal, 
   (this->*fun)(signal, c.m_callbackData, returnCode);
 }
 
+inline
+void
+SimulatedBlock::execute(Signal* signal, CallbackPtr & cptr, Uint32 returnCode){
+  const CallbackEntry& ce = getCallbackEntry(cptr.m_callbackIndex);
+  cptr.m_callbackIndex = ZNIL;
+  Callback c;
+  c.m_callbackFunction = ce.m_function;
+  c.m_callbackData = cptr.m_callbackData;
+  execute(signal, c, returnCode);
+}
+                        
 inline 
 BlockNumber
 SimulatedBlock::number() const {

Thread
bzr commit into mysql-5.1 branch (pekka:3087) WL#4391Pekka Nousiainen16 Nov