List:Commits« Previous MessageNext Message »
From:tomas Date:February 9 2007 10:37am
Subject:bk commit into 5.1 tree (tomas:1.2431)
View as plain text  
Below is the list of changes that have just been committed into a local
5.1 repository of tomas. When tomas does a push these changes will
be propagated to the main repository and, within 24 hours after the
push, to the public repository.
For information on how to access the public repository
see http://dev.mysql.com/doc/mysql/en/installing-source-tree.html

ChangeSet
  1.2431 07/02/09 16:36:41 tomas@stripped +8 -0
  Merge poseidon.mysql.com:/home/tomas/mysql-5.0-ndb
  into  poseidon.mysql.com:/home/tomas/mysql-5.1-new-ndb

  storage/ndb/src/common/debugger/EventLogger.cpp
    1.35 07/02/09 16:36:31 tomas@stripped +0 -93
    masnual merge

  storage/ndb/src/common/debugger/EventLogger.cpp
    1.21.9.2 07/02/09 16:26:19 tomas@stripped +0 -0
    Merge rename: ndb/src/common/debugger/EventLogger.cpp ->
storage/ndb/src/common/debugger/EventLogger.cpp

  storage/ndb/src/ndbapi/Ndb.cpp
    1.86 07/02/09 16:36:31 tomas@stripped +0 -7
    manual merge

  storage/ndb/src/ndbapi/NdbScanOperation.cpp
    1.110 07/02/09 16:26:20 tomas@stripped +0 -0
    Auto merged

  storage/ndb/src/mgmclient/CommandInterpreter.cpp
    1.83 07/02/09 16:26:20 tomas@stripped +0 -0
    Auto merged

  storage/ndb/include/ndbapi/NdbScanOperation.hpp
    1.48 07/02/09 16:26:20 tomas@stripped +0 -0
    Auto merged

  storage/ndb/include/ndbapi/NdbIndexScanOperation.hpp
    1.29 07/02/09 16:26:20 tomas@stripped +0 -0
    Auto merged

  sql/ha_ndbcluster.h
    1.169 07/02/09 16:26:20 tomas@stripped +0 -0
    Auto merged

  storage/ndb/src/ndbapi/NdbScanOperation.cpp
    1.66.24.2 07/02/09 16:26:19 tomas@stripped +0 -0
    Merge rename: ndb/src/ndbapi/NdbScanOperation.cpp ->
storage/ndb/src/ndbapi/NdbScanOperation.cpp

  storage/ndb/src/ndbapi/Ndb.cpp
    1.49.20.2 07/02/09 16:26:19 tomas@stripped +0 -0
    Merge rename: ndb/src/ndbapi/Ndb.cpp -> storage/ndb/src/ndbapi/Ndb.cpp

  storage/ndb/src/mgmclient/CommandInterpreter.cpp
    1.49.28.3 07/02/09 16:26:19 tomas@stripped +0 -0
    Merge rename: ndb/src/mgmclient/CommandInterpreter.cpp ->
storage/ndb/src/mgmclient/CommandInterpreter.cpp

  storage/ndb/include/ndbapi/NdbScanOperation.hpp
    1.32.10.2 07/02/09 16:26:19 tomas@stripped +0 -0
    Merge rename: ndb/include/ndbapi/NdbScanOperation.hpp ->
storage/ndb/include/ndbapi/NdbScanOperation.hpp

  storage/ndb/include/ndbapi/NdbIndexScanOperation.hpp
    1.17.7.2 07/02/09 16:26:19 tomas@stripped +0 -0
    Merge rename: ndb/include/ndbapi/NdbIndexScanOperation.hpp ->
storage/ndb/include/ndbapi/NdbIndexScanOperation.hpp

  sql/ha_ndbcluster.cc
    1.407 07/02/09 16:26:19 tomas@stripped +0 -0
    Auto merged

# This is a BitKeeper patch.  What follows are the unified diffs for the
# set of deltas contained in the patch.  The rest of the patch, the part
# that BitKeeper cares about, is below these diffs.
# User:	tomas
# Host:	poseidon.mysql.com
# Root:	/home/tomas/mysql-5.1-new-ndb/RESYNC

--- 1.17.7.1/ndb/include/ndbapi/NdbIndexScanOperation.hpp	2007-02-07 18:29:29 +07:00
+++ 1.29/storage/ndb/include/ndbapi/NdbIndexScanOperation.hpp	2007-02-09 16:26:20 +07:00
@@ -29,6 +29,7 @@
   friend class NdbResultSet;
   friend class NdbOperation;
   friend class NdbScanOperation;
+  friend class NdbIndexStat;
 #endif
 
 public:
@@ -117,17 +118,23 @@
    * @param attr        Attribute name, alternatively:
    * @param type        Type of bound
    * @param value       Pointer to bound value, 0 for NULL
-   * @param len         Value length in bytes.
-   *                    Fixed per datatype and can be omitted
    * @return            0 if successful otherwise -1
+   *
+   * @note See comment under equal() about data format and length.
    */
-  int setBound(const char* attr, int type, const void* value, Uint32 len = 0);
+#ifndef DOXYGEN_SHOULD_SKIP_DEPRECATED
+  int setBound(const char* attr, int type, const void* value, Uint32 len);
+#endif
+  int setBound(const char* attr, int type, const void* value);
 
   /**
    * Define bound on index key in range scan using index column id.
    * See the other setBound() method for details.
    */
-  int setBound(Uint32 anAttrId, int type, const void* aValue, Uint32 len = 0);
+#ifndef DOXYGEN_SHOULD_SKIP_DEPRECATED
+  int setBound(Uint32 anAttrId, int type, const void* aValue, Uint32 len);
+#endif
+  int setBound(Uint32 anAttrId, int type, const void* aValue);
 
   /**
    * Reset bounds and put operation in list that will be
@@ -155,19 +162,21 @@
    * Is current scan sorted descending
    */
   bool getDescending() const { return m_descending; }
+
 private:
   NdbIndexScanOperation(Ndb* aNdb);
   virtual ~NdbIndexScanOperation();
 
-  int setBound(const NdbColumnImpl*, int type, const void* aValue, Uint32 len);
+  int setBound(const NdbColumnImpl*, int type, const void* aValue);
   int insertBOUNDS(Uint32 * data, Uint32 sz);
+  Uint32 getKeyFromSCANTABREQ(Uint32* data, Uint32 size);
 
-  virtual int equal_impl(const NdbColumnImpl*, const char*, Uint32);
+  virtual int equal_impl(const NdbColumnImpl*, const char*);
   virtual NdbRecAttr* getValue_impl(const NdbColumnImpl*, char*);
 
   void fix_get_values();
   int next_result_ordered(bool fetchAllowed, bool forceSend = false);
-  int send_next_scan_ordered(Uint32 idx, bool forceSend = false);
+  int send_next_scan_ordered(Uint32 idx);
   int compare(Uint32 key, Uint32 cols, const NdbReceiver*, const NdbReceiver*);
 
   Uint32 m_sort_columns;
@@ -176,5 +185,21 @@
 
   friend struct Ndb_free_list_t<NdbIndexScanOperation>;
 };
+
+inline
+int
+NdbIndexScanOperation::setBound(const char* attr, int type, const void* value,
+                                Uint32 len)
+{
+  return setBound(attr, type, value);
+}
+
+inline
+int
+NdbIndexScanOperation::setBound(Uint32 anAttrId, int type, const void* value,
+                                Uint32 len)
+{
+  return setBound(anAttrId, type, value);
+}
 
 #endif

--- 1.32.10.1/ndb/include/ndbapi/NdbScanOperation.hpp	2007-02-07 18:29:29 +07:00
+++ 1.48/storage/ndb/include/ndbapi/NdbScanOperation.hpp	2007-02-09 16:26:20 +07:00
@@ -20,6 +20,7 @@
 
 class NdbBlob;
 class NdbResultSet;
+class PollGuard;
 
 /**
  * @class NdbScanOperation
@@ -41,7 +42,8 @@
    * ranges (bounds) are to be passed.
    */
   enum ScanFlag {
-    SF_TupScan = (1 << 16),     // scan TUP
+    SF_TupScan = (1 << 16),     // scan TUP order
+    SF_DiskScan = (2 << 16),    // scan in DISK order
     SF_OrderBy = (1 << 24),     // index scan in order
     SF_Descending = (2 << 24),  // index scan in descending order
     SF_ReadRangeNo = (4 << 24), // enable @ref get_range_no
@@ -202,7 +204,8 @@
   int nextResultImpl(bool fetchAllowed = true, bool forceSend = false);
   virtual void release();
   
-  int close_impl(class TransporterFacade*, bool forceSend = false);
+  int close_impl(class TransporterFacade*, bool forceSend,
+                 PollGuard *poll_guard);
 
   // Overloaded methods from NdbCursorOperation
   int executeCursor(int ProcessorId);
@@ -211,7 +214,6 @@
   int init(const NdbTableImpl* tab, NdbTransaction*);
   int prepareSend(Uint32  TC_ConnectPtr, Uint64  TransactionId);
   int doSend(int ProcessorId);
-  void checkForceSend(bool forceSend);
 
   virtual void setErrorCode(int aErrorCode);
   virtual void setErrorCodeAbort(int aErrorCode);
@@ -253,12 +255,12 @@
   Uint32 m_sent_receivers_count;  // NOTE needs mutex to access
   NdbReceiver** m_sent_receivers; // receive thread puts them here
   
-  int send_next_scan(Uint32 cnt, bool close, bool forceSend = false);
+  int send_next_scan(Uint32 cnt, bool close);
   void receiver_delivered(NdbReceiver*);
   void receiver_completed(NdbReceiver*);
   void execCLOSE_SCAN_REP();
 
-  int getKeyFromKEYINFO20(Uint32* data, unsigned size);
+  int getKeyFromKEYINFO20(Uint32* data, Uint32 & size);
   NdbOperation*	takeOverScanOp(OperationType opType, NdbTransaction*);
   
   bool m_ordered;
@@ -266,6 +268,7 @@
   Uint32 m_read_range_no;
   NdbRecAttr *m_curr_row; // Pointer to last returned row
   bool m_multi_range; // Mark if operation is part of multi-range scan
+  bool m_executed; // Marker if operation should be released at close
 };
 
 inline

--- 1.21.9.1/ndb/src/common/debugger/EventLogger.cpp	2007-02-09 20:31:46 +07:00
+++ 1.35/storage/ndb/src/common/debugger/EventLogger.cpp	2007-02-09 16:36:31 +07:00
@@ -15,13 +15,12 @@
 
 #include <ndb_global.h>
 
-#include "EventLogger.hpp"
+#include <EventLogger.hpp>
 #include <TransporterCallback.hpp>
 
 #include <NdbConfig.h>
 #include <kernel/BlockNumbers.h>
 #include <signaldata/ArbitSignalData.hpp>
-#include <GrepEvent.hpp>
 #include <NodeState.hpp>
 #include <version.h>
 
@@ -528,6 +527,7 @@
 		       theData[1],
 		       theData[2]);
 }
+
 void getTextTransporterError(QQQQ) {
   struct myTransporterError{
     int errorNum;
@@ -714,6 +714,43 @@
 void getTextInfoEvent(QQQQ) {
   BaseString::snprintf(m_text, m_text_len, (char *)&theData[1]);
 }
+const char bytes_unit[]= "B";
+const char kbytes_unit[]= "KB";
+const char mbytes_unit[]= "MB";
+static void convert_unit(unsigned &data, const char *&unit)
+{
+  if (data < 16*1024)
+  {
+    unit= bytes_unit;
+    return;
+  }
+  if (data < 16*1024*1024)
+  {
+    data= (data+1023)/1024;
+    unit= kbytes_unit;
+    return;
+  }
+  data= (data+1024*1024-1)/(1024*1024);
+  unit= mbytes_unit;  
+}
+
+void getTextEventBufferStatus(QQQQ) {
+  unsigned used= theData[1], alloc= theData[2], max_= theData[3];
+  const char *used_unit, *alloc_unit, *max_unit;
+  convert_unit(used, used_unit);
+  convert_unit(alloc, alloc_unit);
+  convert_unit(max_, max_unit);
+  BaseString::snprintf(m_text, m_text_len,
+		       "Event buffer status: used=%d%s(%d%) alloc=%d%s(%d%) "
+		       "max=%d%s apply_gci=%lld latest_gci=%lld",
+		       used, used_unit,
+		       theData[2] ? (theData[1]*100)/theData[2] : 0,
+		       alloc, alloc_unit,
+		       theData[3] ? (theData[2]*100)/theData[3] : 0,
+		       max_, max_unit,
+		       theData[4]+(((Uint64)theData[5])<<32),
+		       theData[6]+(((Uint64)theData[7])<<32));
+}
 void getTextWarningEvent(QQQQ) {
   BaseString::snprintf(m_text, m_text_len, (char *)&theData[1]);
 }
@@ -966,6 +1003,7 @@
   ROW(SentHeartbeat,           LogLevel::llInfo,  12, Logger::LL_INFO ),
   ROW(CreateLogBytes,          LogLevel::llInfo,  11, Logger::LL_INFO ),
   ROW(InfoEvent,               LogLevel::llInfo,   2, Logger::LL_INFO ),
+  ROW(EventBufferStatus,       LogLevel::llInfo,   7, Logger::LL_INFO ),
 
   //Single User
   ROW(SingleUser,              LogLevel::llInfo,   7, Logger::LL_INFO ),
@@ -1004,6 +1042,7 @@
   removeAllHandlers();
 }
 
+#ifdef NOT_USED
 static NdbOut&
 operator<<(NdbOut& out, const LogLevel & ll)
 {
@@ -1013,6 +1052,7 @@
   out << "]";
   return out;
 }
+#endif
 
 int
 EventLoggerBase::event_lookup(int eventType,
@@ -1057,6 +1097,7 @@
   Logger::LoggerLevel severity = Logger::LL_WARNING;
   LogLevel::EventCategory cat= LogLevel::llInvalid;
   EventTextFunction textF;
+  char log_text[MAX_TEXT_LENGTH];
 
   DBUG_ENTER("EventLogger::log");
   DBUG_PRINT("enter",("eventType=%d, nodeid=%d", eventType, nodeId));
@@ -1070,29 +1111,29 @@
     DBUG_PRINT("info",("m_logLevel.getLogLevel=%d", m_logLevel.getLogLevel(cat)));
 
   if (threshold <= set){
-    getText(m_text,sizeof(m_text),textF,theData,nodeId);
+    getText(log_text,sizeof(log_text),textF,theData,nodeId);
 
     switch (severity){
     case Logger::LL_ALERT:
-      alert(m_text);
+      alert(log_text);
       break;
     case Logger::LL_CRITICAL:
-      critical(m_text); 
+      critical(log_text); 
       break;
     case Logger::LL_WARNING:
-      warning(m_text); 
+      warning(log_text); 
       break;
     case Logger::LL_ERROR:
-      error(m_text); 
+      error(log_text); 
       break;
     case Logger::LL_INFO:
-      info(m_text); 
+      info(log_text); 
       break;
     case Logger::LL_DEBUG:
-      debug(m_text); 
+      debug(log_text); 
       break;
     default:
-      info(m_text); 
+      info(log_text); 
       break;
     }
   } // if (..
@@ -1110,7 +1151,3 @@
 {
   m_filterLevel = filterLevel;
 }
-
-//
-// PRIVATE
-//

--- 1.49.28.2/ndb/src/mgmclient/CommandInterpreter.cpp	2007-02-01 23:42:04 +07:00
+++ 1.83/storage/ndb/src/mgmclient/CommandInterpreter.cpp	2007-02-09 16:26:20 +07:00
@@ -15,14 +15,7 @@
 
 #include <ndb_global.h>
 #include <my_sys.h>
-
-//#define HAVE_GLOBAL_REPLICATION
-
 #include <Vector.hpp>
-#ifdef  HAVE_GLOBAL_REPLICATION
-#include "../rep/repapi/repapi.h"
-#endif
-
 #include <mgmapi.h>
 #include <util/BaseString.hpp>
 
@@ -125,7 +118,7 @@
   int  executeStatus(int processId, const char* parameters, bool all);
   int  executeEventReporting(int processId, const char* parameters, bool all);
   int  executeDumpState(int processId, const char* parameters, bool all);
-  int  executeStartBackup(char * parameters);
+  int  executeStartBackup(char * parameters, bool interactive);
   int  executeAbortBackup(char * parameters);
   int  executeStop(Vector<BaseString> &command_list, unsigned command_pos,
                    int *node_ids, int no_of_nodes);
@@ -167,11 +160,6 @@
   int m_verbose;
   int try_reconnect;
   int m_error;
-#ifdef HAVE_GLOBAL_REPLICATION  
-  NdbRepHandle m_repserver;
-  const char *rep_host;
-  bool rep_connected;
-#endif
   struct NdbThread* m_event_thread;
   NdbMutex *m_print_mutex;
 };
@@ -236,10 +224,6 @@
 #include <NdbMem.h>
 #include <EventLogger.hpp>
 #include <signaldata/SetLogLevelOrd.hpp>
-#include <signaldata/GrepImpl.hpp>
-#ifdef HAVE_GLOBAL_REPLICATION
-
-#endif // HAVE_GLOBAL_REPLICATION
 #include "MgmtErrorReporter.hpp"
 #include <Parser.hpp>
 #include <SocketServer.hpp>
@@ -267,9 +251,6 @@
 "---------------------------------------------------------------------------\n"
 "HELP                                   Print help text\n"
 "HELP COMMAND                           Print detailed help for COMMAND(e.g. SHOW)\n"
-#ifdef HAVE_GLOBAL_REPLICATION
-"HELP REPLICATION                       Help for global replication\n"
-#endif // HAVE_GLOBAL_REPLICATION
 #ifdef VM_TRACE // DEBUG ONLY
 "HELP DEBUG                             Help for debug compiled version\n"
 #endif
@@ -293,9 +274,6 @@
 "EXIT SINGLE USER MODE                  Exit single user mode\n"
 "<id> STATUS                            Print status\n"
 "<id> CLUSTERLOG {<category>=<level>}+  Set log level for cluster
log\n"
-#ifdef HAVE_GLOBAL_REPLICATION
-"REP CONNECT <host:port>                Connect to REP server on host:port\n"
-#endif
 "PURGE STALE SESSIONS                   Reset reserved nodeid's in the mgmt server\n"
 "CONNECT [<connectstring>]              Connect to management server (reconnect if
already connected)\n"
 "QUIT                                   Quit management client\n"
@@ -595,39 +573,6 @@
 ;
 
 
-#ifdef HAVE_GLOBAL_REPLICATION
-static const char* helpTextRep =
-"---------------------------------------------------------------------------\n"
-" NDB Cluster -- Management Client -- Help for Global Replication\n"
-"---------------------------------------------------------------------------\n"
-"Commands should be executed on the standby NDB Cluster\n"
-"These features are in an experimental release state.\n"
-"\n"
-"Simple Commands:\n"
-"REP START              Start Global Replication\n" 
-"REP START REQUESTOR    Start Global Replication Requestor\n" 
-"REP STATUS             Show Global Replication status\n" 
-"REP STOP               Stop Global Replication\n"
-"REP STOP REQUESTOR     Stop Global Replication Requestor\n"
-"\n" 
-"Advanced Commands:\n"
-"REP START <protocol>   Starts protocol\n"
-"REP STOP <protocol>    Stops protocol\n"
-"<protocol> = TRANSFER | APPLY | DELETE\n"
-"\n"
-#ifdef VM_TRACE // DEBUG ONLY
-"Debugging commands:\n"
-"REP DELETE             Removes epochs stored in primary and standy systems\n"
-"REP DROP <tableid>     Drop a table in SS identified by table id\n"
-"REP SLOWSTOP           Stop Replication (Tries to synchonize with primary)\n" 
-"REP FASTSTOP           Stop Replication (Stops in consistent state)\n" 
-"<component> = SUBSCRIPTION\n"
-"              METALOG | METASCAN | DATALOG | DATASCAN\n"
-"              REQUESTOR | TRANSFER | APPLY | DELETE\n"
-#endif
-;
-#endif // HAVE_GLOBAL_REPLICATION
-
 #ifdef VM_TRACE // DEBUG ONLY
 static const char* helpTextDebug =
 "---------------------------------------------------------------------------\n"
@@ -680,10 +625,6 @@
   {"PURGE STALE SESSIONS", helpTextPurgeStaleSessions},
   {"CONNECT", helpTextConnect},
   {"QUIT", helpTextQuit},
-#ifdef HAVE_GLOBAL_REPLICATION
-  {"REPLICATION", helpTextRep},
-  {"REP", helpTextRep},
-#endif // HAVE_GLOBAL_REPLICATION
 #ifdef VM_TRACE // DEBUG ONLY
   {"DEBUG", helpTextDebug},
 #endif //VM_TRACE
@@ -723,11 +664,6 @@
   m_event_thread= NULL;
   try_reconnect = 0;
   m_print_mutex= NdbMutex_Create();
-#ifdef HAVE_GLOBAL_REPLICATION
-  rep_host = NULL;
-  m_repserver = NULL;
-  rep_connected = false;
-#endif
 }
 
 /*
@@ -1054,7 +990,7 @@
   else if(strcasecmp(firstToken, "START") == 0 &&
 	  allAfterFirstToken != NULL &&
 	  strncasecmp(allAfterFirstToken, "BACKUP", sizeof("BACKUP") - 1) == 0){
-    m_error= executeStartBackup(allAfterFirstToken);
+    m_error= executeStartBackup(allAfterFirstToken, interactive);
     DBUG_RETURN(true);
   }
   else if(strcasecmp(firstToken, "ABORT") == 0 &&
@@ -1066,14 +1002,7 @@
   else if (strcasecmp(firstToken, "PURGE") == 0) {
     m_error = executePurge(allAfterFirstToken);
     DBUG_RETURN(true);
-  } 
-#ifdef HAVE_GLOBAL_REPLICATION
-  else if(strcasecmp(firstToken, "REPLICATION") == 0 ||
-	  strcasecmp(firstToken, "REP") == 0) {
-    m_error = executeRep(allAfterFirstToken);
-    DBUG_RETURN(true);
-  }
-#endif // HAVE_GLOBAL_REPLICATION
+  }                
   else if(strcasecmp(firstToken, "ENTER") == 0 &&
 	  allAfterFirstToken != NULL &&
 	  strncasecmp(allAfterFirstToken, "SINGLE USER MODE ", 
@@ -1549,7 +1478,6 @@
     return -1;
   }
 
-  int i;
   char *str;
   
   if (ndb_mgm_purge_stale_sessions(m_mgmsrv, &str)) {
@@ -1628,8 +1556,8 @@
       case NDB_MGM_NODE_TYPE_UNKNOWN:
         ndbout << "Error: Unknown Node Type" << endl;
         return -1;
-      case NDB_MGM_NODE_TYPE_REP:
-	abort();
+      case NDB_MGM_NODE_TYPE_MAX:
+        break;                                  /* purify: deadcode */
       }
     }
 
@@ -1667,7 +1595,6 @@
 {
   BaseString *basestring = NULL;
 
-  int retval;
   disconnect();
   if (!emptyString(parameters)) {
     basestring= new BaseString(parameters);
@@ -1704,7 +1631,15 @@
   char * item = strtok_r(tmpString, " ", &tmpPtr);
   int enable;
 
-  const unsigned int *enabled= ndb_mgm_get_logfilter(m_mgmsrv);
+  ndb_mgm_severity enabled[NDB_MGM_EVENT_SEVERITY_ALL] = 
+    {{NDB_MGM_EVENT_SEVERITY_ON,0},
+     {NDB_MGM_EVENT_SEVERITY_DEBUG,0},
+     {NDB_MGM_EVENT_SEVERITY_INFO,0},
+     {NDB_MGM_EVENT_SEVERITY_WARNING,0},
+     {NDB_MGM_EVENT_SEVERITY_ERROR,0},
+     {NDB_MGM_EVENT_SEVERITY_CRITICAL,0},
+     {NDB_MGM_EVENT_SEVERITY_ALERT,0}};
+  ndb_mgm_get_clusterlog_severity_filter(m_mgmsrv, &enabled[0],
NDB_MGM_EVENT_SEVERITY_ALL);
   if(enabled == NULL) {
     ndbout << "Couldn't get status" << endl;
     printError();
@@ -1717,25 +1652,25 @@
    ********************/
   if (strcasecmp(item, "INFO") == 0) {
     DBUG_PRINT("info",("INFO"));
-    if(enabled[0] == 0)
+    if(enabled[0].value == 0)
     {
       ndbout << "Cluster logging is disabled." << endl;
       m_error = 0;
       DBUG_VOID_RETURN;
     }
 #if 0 
-    for(i = 0; i<7;i++)
-      printf("enabled[%d] = %d\n", i, enabled[i]);
+    for(i = 0; i<DB_MGM_EVENT_SEVERITY_ALL;i++)
+      printf("enabled[%d] = %d\n", i, enabled[i].value);
 #endif
     ndbout << "Severities enabled: ";
     for(i = 1; i < (int)NDB_MGM_EVENT_SEVERITY_ALL; i++) {
-      const char *str= ndb_mgm_get_event_severity_string((ndb_mgm_event_severity)i);
+      const char *str= ndb_mgm_get_event_severity_string(enabled[i].category);
       if (str == 0)
       {
 	DBUG_ASSERT(false);
 	continue;
       }
-      if(enabled[i])
+      if(enabled[i].value)
 	ndbout << BaseString(str).ndb_toupper() << " ";
     }
     ndbout << endl;
@@ -2102,7 +2037,6 @@
 
   ndb_mgm_node_status status;
   Uint32 startPhase, version;
-  bool system;
   
   struct ndb_mgm_cluster_state *cl;
   cl = ndb_mgm_get_status(m_mgmsrv);
@@ -2526,24 +2460,17 @@
   return retval;
 }
 
+
 /*****************************************************************************
  * Backup
  *****************************************************************************/
 int
-CommandInterpreter::executeStartBackup(char* parameters)
+CommandInterpreter::executeStartBackup(char* parameters, bool interactive)
 {
   struct ndb_mgm_reply reply;
   unsigned int backupId;
-#if 0
-  int filter[] = { 15, NDB_MGM_EVENT_CATEGORY_BACKUP, 0 };
-  int fd = ndb_mgm_listen_event(m_mgmsrv, filter);
-  if (fd < 0)
-  {
-    ndbout << "Initializing start of backup failed" << endl;
-    printError();
-    return fd;
-  }
-#endif
+  int fd = -1;
+  
   Vector<BaseString> args;
   {
     BaseString(parameters).split(args);
@@ -2556,25 +2483,21 @@
   int sz= args.size();
 
   int result;
-  if (sz == 2 &&
-      args[1] == "NOWAIT")
+  int flags = 2;
+  if (sz == 2 && args[1] == "NOWAIT")
   {
+    flags = 0;
     result = ndb_mgm_start_backup(m_mgmsrv, 0, &backupId, &reply);
   }
-  else if (sz == 1 ||
-	   (sz == 3 &&
-	    args[1] == "WAIT" &&
-	    args[2] == "COMPLETED"))
+  else if (sz == 1 || (sz == 3 && args[1] == "WAIT" && args[2] ==
"COMPLETED"))
   {
+    flags = 2;
     ndbout_c("Waiting for completed, this may take several minutes");
-    result = ndb_mgm_start_backup(m_mgmsrv, 2, &backupId, &reply);
   }
-  else if (sz == 3 &&
-	   args[1] == "WAIT" &&
-	   args[2] == "STARTED")
+  else if (sz == 3 && args[1] == "WAIT" && args[2] == "STARTED")
   {
     ndbout_c("Waiting for started, this may take several minutes");
-    result = ndb_mgm_start_backup(m_mgmsrv, 1, &backupId, &reply);
+    flags = 1;
   }
   else
   {
@@ -2582,45 +2505,63 @@
     return -1;
   }
 
+  /**
+   * If interactive...event listner is already running
+   */
+  if (flags == 2 && !interactive)
+  {
+    int filter[] = { 15, NDB_MGM_EVENT_CATEGORY_BACKUP, 0, 0 };
+    fd = ndb_mgm_listen_event(m_mgmsrv, filter);
+    if (fd < 0)
+    {
+      ndbout << "Initializing start of backup failed" << endl;
+      printError();
+      return fd;
+    }
+  }
+  result = ndb_mgm_start_backup(m_mgmsrv, flags, &backupId, &reply);
+
   if (result != 0) {
     ndbout << "Backup failed" << endl;
     printError();
-#if 0
-    close(fd);
-#endif
+
+    if (fd >= 0) 
+      close(fd);
     return result;
   }
-#if 0
-  ndbout_c("Waiting for completed, this may take several minutes");
-  char *tmp;
-  char buf[1024];
+
+  if (fd >= 0)
   {
-    SocketInputStream in(fd);
-    int count = 0;
+    char *tmp;
+    char buf[1024];
+    {
+      SocketInputStream in(fd);
+      int count = 0;
+      do {
+	tmp = in.gets(buf, 1024);
+	if(tmp)
+	{
+	  ndbout << tmp;
+	  unsigned int id;
+	  if(sscanf(tmp, "%*[^:]: Backup %d ", &id) == 1 && id == backupId){
+	    count++;
+	  }
+	}
+      } while(count < 2);
+    }
+    
+    SocketInputStream in(fd, 10);
     do {
       tmp = in.gets(buf, 1024);
-      if(tmp)
+      if(tmp && tmp[0] != 0)
       {
 	ndbout << tmp;
-	unsigned int id;
-	if(sscanf(tmp, "%*[^:]: Backup %d ", &id) == 1 && id == backupId){
-	  count++;
-	}
       }
-    } while(count < 2);
+    } while(tmp && tmp[0] != 0);
+    
+    close(fd);
   }
 
-  SocketInputStream in(fd, 10);
-  do {
-    tmp = in.gets(buf, 1024);
-    if(tmp && tmp[0] != 0)
-    {
-      ndbout << tmp;
-    }
-  } while(tmp && tmp[0] != 0);
-
-  close(fd);
-#endif  
   return 0;
 }
 
@@ -2653,234 +2594,5 @@
   ndbout << "Invalid arguments: expected <BackupId>" << endl;
   return -1;
 }
-
-#ifdef HAVE_GLOBAL_REPLICATION
-/*****************************************************************************
- * Global Replication
- *
- * For information about the different commands, see
- * GrepReq::Request in file signaldata/grepImpl.cpp.
- *
- * Below are commands as of 2003-07-05 (may change!):
- * START = 0,            ///< Start Global Replication (all phases)
- * START_METALOG = 1,    ///< Start Global Replication (all phases)
- * START_METASCAN = 2,   ///< Start Global Replication (all phases)
- * START_DATALOG = 3,    ///< Start Global Replication (all phases)
- * START_DATASCAN = 4,   ///< Start Global Replication (all phases)
- * START_REQUESTOR = 5,  ///< Start Global Replication (all phases)
- * ABORT = 6,            ///< Immediate stop (removes subscription)
- * SLOW_STOP = 7,        ///< Stop after finishing applying current GCI epoch
- * FAST_STOP = 8,        ///< Stop after finishing applying all PS GCI epochs
- * START_TRANSFER = 9,   ///< Start SS-PS transfer
- * STOP_TRANSFER = 10,   ///< Stop SS-PS transfer
- * START_APPLY = 11,     ///< Start applying GCI epochs in SS
- * STOP_APPLY = 12,      ///< Stop applying GCI epochs in SS
- * STATUS = 13,           ///< Status
- * START_SUBSCR = 14,
- * REMOVE_BUFFERS = 15,
- * DROP_TABLE = 16
-
- *****************************************************************************/
-
-int
-CommandInterpreter::executeRep(char* parameters) 
-{
-  if (emptyString(parameters)) {
-    ndbout << helpTextRep;
-    return 0;
-  }
-
-  char * line = my_strdup(parameters,MYF(MY_WME));
-  My_auto_ptr<char> ap1((char*)line);
-  char * firstToken = strtok(line, " ");
-  
-  struct ndb_rep_reply  reply;
-  unsigned int          repId;
-
-
-  if (!strcasecmp(firstToken, "CONNECT")) {
-    char * host = strtok(NULL, "\0");
-    for (unsigned int i = 0; i < strlen(host); ++i) {
-      host[i] = tolower(host[i]);
-    }
-    
-    if(host == NULL)
-    {
-      ndbout_c("host:port must be specified.");
-      return -1;
-    }
-    
-    if(rep_connected) {
-      if(m_repserver != NULL) {
-	ndb_rep_disconnect(m_repserver);
-	rep_connected = false;
-      }       
-    }
-          
-    if(m_repserver == NULL)
-      m_repserver = ndb_rep_create_handle();
-    if(ndb_rep_connect(m_repserver, host) < 0){
-      ndbout_c("Failed to connect to %s", host);
-      return -1;
-    } 
-    else
-      rep_connected=true;
-    return 0;
-    
-    if(!rep_connected) {
-      ndbout_c("Not connected to REP server");
-      return -1;
-    }
-  }
-    
-  /********
-   * START 
-   ********/
-  if (!strcasecmp(firstToken, "START")) {
-    
-    unsigned int          req;
-    char *startType = strtok(NULL, "\0");
-    
-    if (startType == NULL) {                
-      req = GrepReq::START;
-    } else if (!strcasecmp(startType, "SUBSCRIPTION")) {  
-      req = GrepReq::START_SUBSCR;
-    } else if (!strcasecmp(startType, "METALOG")) { 
-      req = GrepReq::START_METALOG;
-    } else if (!strcasecmp(startType, "METASCAN")) {
-      req = GrepReq::START_METASCAN;
-    } else if (!strcasecmp(startType, "DATALOG")) {
-      req = GrepReq::START_DATALOG;
-    } else if (!strcasecmp(startType, "DATASCAN")) {
-      req = GrepReq::START_DATASCAN;
-    } else if (!strcasecmp(startType, "REQUESTOR")) {
-      req = GrepReq::START_REQUESTOR;
-    } else if (!strcasecmp(startType, "TRANSFER")) {
-      req = GrepReq::START_TRANSFER;
-    } else if (!strcasecmp(startType, "APPLY")) {
-      req = GrepReq::START_APPLY;
-    } else if (!strcasecmp(startType, "DELETE")) {
-      req = GrepReq::START_DELETE;
-    } else {
-      ndbout_c("Illegal argument to command 'REPLICATION START'");
-      return -1;
-    }
-
-    int result = ndb_rep_command(m_repserver, req, &repId, &reply);
-    
-    if (result != 0) {
-      ndbout << "Start of Global Replication failed" << endl;
-      return -1;
-    } else {
-      ndbout << "Start of Global Replication ordered" << endl;
-    }
-    return 0;
-  }
-
-  /********
-   * STOP
-   ********/
-  if (!strcasecmp(firstToken, "STOP")) {    
-    unsigned int          req;
-    char *startType = strtok(NULL, " ");
-    unsigned int epoch = 0;
-    
-    if (startType == NULL) {                 
-      /**
-       * Stop immediately
-       */
-      req = GrepReq::STOP;
-    } else if (!strcasecmp(startType, "EPOCH")) {  
-      char *strEpoch = strtok(NULL, "\0");
-      if(strEpoch == NULL) {
-	ndbout_c("Epoch expected!");
-	return -1;
-      }
-      req = GrepReq::STOP;
-      epoch=atoi(strEpoch);      
-    } else if (!strcasecmp(startType, "SUBSCRIPTION")) {  
-      req = GrepReq::STOP_SUBSCR;
-    } else if (!strcasecmp(startType, "METALOG")) { 
-      req = GrepReq::STOP_METALOG;
-    } else if (!strcasecmp(startType, "METASCAN")) {
-      req = GrepReq::STOP_METASCAN;
-    } else if (!strcasecmp(startType, "DATALOG")) {
-      req = GrepReq::STOP_DATALOG;
-    } else if (!strcasecmp(startType, "DATASCAN")) {
-      req = GrepReq::STOP_DATASCAN;
-    } else if (!strcasecmp(startType, "REQUESTOR")) {
-      req = GrepReq::STOP_REQUESTOR;
-    } else if (!strcasecmp(startType, "TRANSFER")) {
-      req = GrepReq::STOP_TRANSFER;
-    } else if (!strcasecmp(startType, "APPLY")) {
-      req = GrepReq::STOP_APPLY;
-    } else if (!strcasecmp(startType, "DELETE")) {
-      req = GrepReq::STOP_DELETE;
-    } else {
-      ndbout_c("Illegal argument to command 'REPLICATION STOP'");
-      return -1;
-    }
-    int result = ndb_rep_command(m_repserver, req, &repId, &reply, epoch);
-    
-    if (result != 0) {
-      ndbout << "Stop command failed" << endl;
-      return -1;
-    } else {
-      ndbout << "Stop ordered" << endl;
-    }
-    return 0;
-  }
-
-  /*********
-   * STATUS
-   *********/
-  if (!strcasecmp(firstToken, "STATUS")) {
-    struct rep_state repstate;
-    int result = 
-      ndb_rep_get_status(m_repserver, &repId, &reply, &repstate);
-    
-    if (result != 0) {
-      ndbout << "Status request of Global Replication failed" << endl;
-      return -1;
-    } else {
-      ndbout << "Status request of Global Replication ordered" << endl;
-      ndbout << "See printout at one of the DB nodes" << endl;
-      ndbout << "(Better status report is under development.)" << endl;
-      ndbout << " SubscriptionId " << repstate.subid 
-	     << " SubscriptionKey " << repstate.subkey << endl;
-    }
-    return 0;
-  }
-
-  /*********
-   * QUERY (see repapi.h for querable counters)
-   *********/
-  if (!strcasecmp(firstToken, "QUERY")) {
-    char *query = strtok(NULL, "\0");
-    int queryCounter=-1;
-    if(query != NULL) {
-      queryCounter = atoi(query);
-    }
-    struct rep_state repstate;
-    unsigned repId = 0;
-    int result = ndb_rep_query(m_repserver, (QueryCounter)queryCounter,
-			       &repId, &reply, &repstate);
-    
-    if (result != 0) {
-      ndbout << "Query repserver failed" << endl;
-      return -1;
-    } else {
-      ndbout << "Query repserver sucessful" << endl;
-      ndbout_c("repstate : QueryCounter %d, f=%d l=%d"
-	       " nodegroups %d" , 
-	       repstate.queryCounter,
-	       repstate.first[0], repstate.last[0],
-	       repstate.no_of_nodegroups );
-    }
-    return 0;
-  }
-  return 0;
-}
-#endif // HAVE_GLOBAL_REPLICATION
 
 template class Vector<char const*>;

--- 1.49.20.1/ndb/src/ndbapi/Ndb.cpp	2007-02-01 23:42:04 +07:00
+++ 1.86/storage/ndb/src/ndbapi/Ndb.cpp	2007-02-09 16:36:31 +07:00
@@ -27,6 +27,8 @@
 #include "NdbImpl.hpp"
 #include <NdbOperation.hpp>
 #include <NdbTransaction.hpp>
+#include <NdbEventOperation.hpp>
+#include <NdbEventOperationImpl.hpp>
 #include <NdbRecAttr.hpp>
 #include <md5_hash.hpp>
 #include <NdbSleep.h>
@@ -136,7 +138,7 @@
 //***************************************************************************
   
   int	         tReturnCode;
-  TransporterFacade *tp = TransporterFacade::instance();
+  TransporterFacade *tp = theImpl->m_transporter_facade;
 
   DBUG_ENTER("Ndb::NDB_connect");
 
@@ -171,23 +173,9 @@
   tSignal->setData(theMyRef, 2);	// Set my block reference
   tNdbCon->Status(NdbTransaction::Connecting); // Set status to connecting
   Uint32 nodeSequence;
-  { // send and receive signal
-    Guard guard(tp->theMutexPtr);
-    nodeSequence = tp->getNodeSequence(tNode);
-    bool node_is_alive = tp->get_node_alive(tNode);
-    if (node_is_alive) { 
-      tReturnCode = tp->sendSignal(tSignal, tNode);  
-      releaseSignal(tSignal); 
-      if (tReturnCode != -1) {
-        theImpl->theWaiter.m_node = tNode;  
-        theImpl->theWaiter.m_state = WAIT_TC_SEIZE;  
-        tReturnCode = receiveResponse(); 
-      }//if
-    } else {
-      releaseSignal(tSignal);
-      tReturnCode = -1;
-    }//if
-  }
+  tReturnCode= sendRecSignal(tNode, WAIT_TC_SEIZE, tSignal,
+                             0, &nodeSequence);
+  releaseSignal(tSignal); 
   if ((tReturnCode == 0) && (tNdbCon->Status() == NdbTransaction::Connected))
{
     //************************************************
     // Send and receive was successful
@@ -228,10 +216,9 @@
 void 
 Ndb::doDisconnect()
 {
+  DBUG_ENTER("Ndb::doDisconnect");
   NdbTransaction* tNdbCon;
   CHECK_STATUS_MACRO_VOID;
-  /* DBUG_ENTER must be after CHECK_STATUS_MACRO_VOID because of 'return' */
-  DBUG_ENTER("Ndb::doDisconnect");
 
   Uint32 tNoOfDbNodes = theImpl->theNoOfDBnodes;
   Uint8 *theDBnodes= theImpl->theDBnodes;
@@ -268,8 +255,6 @@
   DBUG_ENTER("Ndb::waitUntilReady");
   int secondsCounter = 0;
   int milliCounter = 0;
-  int noChecksSinceFirstAliveFound = 0;
-  int id;
 
   if (theInitState != Initialised) {
     // Ndb::init is not called
@@ -589,6 +574,7 @@
 #ifdef CUSTOMER_RELEASE
   return -1;
 #else
+  DBUG_ENTER("Ndb::NdbTamper");
   CHECK_STATUS_MACRO;
   checkFailedNode();
 
@@ -610,20 +596,20 @@
 	break;
      default:
         theError.code = 4102;
-        return -1;
+        DBUG_RETURN(-1);
   }
 
   tNdbConn = getNdbCon();	// Get free connection object
   if (tNdbConn == NULL) {
     theError.code = 4000;
-    return -1;
+    DBUG_RETURN(-1);
   }
   tSignal.setSignal(GSN_DIHNDBTAMPER);
   tSignal.setData (tAction, 1);
   tSignal.setData(tNdbConn->ptr2int(),2);
   tSignal.setData(theMyRef,3);		// Set return block reference
   tNdbConn->Status(NdbTransaction::Connecting); // Set status to connecting
-  TransporterFacade *tp = TransporterFacade::instance();
+  TransporterFacade *tp = theImpl->m_transporter_facade;
   if (tAction == 3) {
     tp->lock_mutex();
     tp->sendSignal(&tSignal, aNode);
@@ -635,12 +621,12 @@
     if (tNode == 0) {
       theError.code = 4002;
       releaseNdbCon(tNdbConn);
-      return -1;
+      DBUG_RETURN(-1);
     }//if
     ret_code = tp->sendSignal(&tSignal,aNode);
     tp->unlock_mutex();
     releaseNdbCon(tNdbConn);
-    return ret_code;
+    DBUG_RETURN(ret_code);
   } else {
     do {
       tp->lock_mutex();
@@ -651,7 +637,7 @@
       if (tNode == 0) {
         theError.code = 4009;
         releaseNdbCon(tNdbConn);
-        return -1;
+        DBUG_RETURN(-1);
       }//if
       ret_code = sendRecSignal(tNode, WAIT_NDB_TAMPER, &tSignal, 0);
       if (ret_code == 0) {  
@@ -659,15 +645,15 @@
           theRestartGCI = 0;
         }//if
         releaseNdbCon(tNdbConn);
-        return theRestartGCI;
+        DBUG_RETURN(theRestartGCI);
       } else if ((ret_code == -5) || (ret_code == -2)) {
         TRACE_DEBUG("Continue DIHNDBTAMPER when node failed/stopping");
       } else {
-        return -1;
+        DBUG_RETURN(-1);
       }//if
     } while (1);
   }
-  return 0;
+  DBUG_RETURN(0);
 #endif
 }
 #if 0
@@ -766,15 +752,18 @@
                            Uint64 & tupleId, Uint32 cacheSize)
 {
   DBUG_ENTER("Ndb::getAutoIncrementValue");
+  ASSERT_NOT_MYSQLD;
   BaseString internal_tabname(internalize_table_name(aTableName));
 
   Ndb_local_table_info *info=
-    theDictionary->get_local_table_info(internal_tabname, false);
+    theDictionary->get_local_table_info(internal_tabname);
   if (info == 0) {
     theError.code = theDictionary->getNdbError().code;
     DBUG_RETURN(-1);
   }
-  if (getTupleIdFromNdb(info, tupleId, cacheSize) == -1)
+  const NdbTableImpl* table = info->m_table_impl;
+  TupleIdRange & range = info->m_tuple_id_range;
+  if (getTupleIdFromNdb(table, range, tupleId, cacheSize) == -1)
     DBUG_RETURN(-1);
   DBUG_PRINT("info", ("value %lu", (ulong) tupleId));
   DBUG_RETURN(0);
@@ -785,31 +774,48 @@
                            Uint64 & tupleId, Uint32 cacheSize)
 {
   DBUG_ENTER("Ndb::getAutoIncrementValue");
+  ASSERT_NOT_MYSQLD;
   assert(aTable != 0);
   const NdbTableImpl* table = & NdbTableImpl::getImpl(*aTable);
   const BaseString& internal_tabname = table->m_internalName;
 
   Ndb_local_table_info *info=
-    theDictionary->get_local_table_info(internal_tabname, false);
+    theDictionary->get_local_table_info(internal_tabname);
   if (info == 0) {
     theError.code = theDictionary->getNdbError().code;
     DBUG_RETURN(-1);
   }
-  if (getTupleIdFromNdb(info, tupleId, cacheSize) == -1)
+  TupleIdRange & range = info->m_tuple_id_range;
+  if (getTupleIdFromNdb(table, range, tupleId, cacheSize) == -1)
+    DBUG_RETURN(-1);
+  DBUG_PRINT("info", ("value %lu", (ulong)tupleId));
+  DBUG_RETURN(0);
+}
+
+int
+Ndb::getAutoIncrementValue(const NdbDictionary::Table * aTable,
+                           TupleIdRange & range, Uint64 & tupleId,
+                           Uint32 cacheSize)
+{
+  DBUG_ENTER("Ndb::getAutoIncrementValue");
+  assert(aTable != 0);
+  const NdbTableImpl* table = & NdbTableImpl::getImpl(*aTable);
+
+  if (getTupleIdFromNdb(table, range, tupleId, cacheSize) == -1)
     DBUG_RETURN(-1);
   DBUG_PRINT("info", ("value %lu", (ulong)tupleId));
   DBUG_RETURN(0);
 }
 
 int
-Ndb::getTupleIdFromNdb(Ndb_local_table_info* info,
-                       Uint64 & tupleId, Uint32 cacheSize)
+Ndb::getTupleIdFromNdb(const NdbTableImpl* table,
+                       TupleIdRange & range, Uint64 & tupleId, Uint32 cacheSize)
 {
   DBUG_ENTER("Ndb::getTupleIdFromNdb");
-  if (info->m_first_tuple_id != info->m_last_tuple_id)
+  if (range.m_first_tuple_id != range.m_last_tuple_id)
   {
-    assert(info->m_first_tuple_id < info->m_last_tuple_id);
-    tupleId = ++info->m_first_tuple_id;
+    assert(range.m_first_tuple_id < range.m_last_tuple_id);
+    tupleId = ++range.m_first_tuple_id;
     DBUG_PRINT("info", ("next cached value %lu", (ulong)tupleId));
   }
   else
@@ -822,7 +828,7 @@
      * and returns first tupleId in the new range.
      */
     Uint64 opValue = cacheSize;
-    if (opTupleIdOnNdb(info, opValue, 0) == -1)
+    if (opTupleIdOnNdb(table, range, opValue, 0) == -1)
       DBUG_RETURN(-1);
     tupleId = opValue;
   }
@@ -834,15 +840,18 @@
                             Uint64 & tupleId)
 {
   DBUG_ENTER("Ndb::readAutoIncrementValue");
+  ASSERT_NOT_MYSQLD;
   BaseString internal_tabname(internalize_table_name(aTableName));
 
   Ndb_local_table_info *info=
-    theDictionary->get_local_table_info(internal_tabname, false);
+    theDictionary->get_local_table_info(internal_tabname);
   if (info == 0) {
     theError.code = theDictionary->getNdbError().code;
     DBUG_RETURN(-1);
   }
-  if (readTupleIdFromNdb(info, tupleId) == -1)
+  const NdbTableImpl* table = info->m_table_impl;
+  TupleIdRange & range = info->m_tuple_id_range;
+  if (readTupleIdFromNdb(table, range, tupleId) == -1)
     DBUG_RETURN(-1);
   DBUG_PRINT("info", ("value %lu", (ulong)tupleId));
   DBUG_RETURN(0);
@@ -853,31 +862,47 @@
                             Uint64 & tupleId)
 {
   DBUG_ENTER("Ndb::readAutoIncrementValue");
+  ASSERT_NOT_MYSQLD;
   assert(aTable != 0);
   const NdbTableImpl* table = & NdbTableImpl::getImpl(*aTable);
   const BaseString& internal_tabname = table->m_internalName;
 
   Ndb_local_table_info *info=
-    theDictionary->get_local_table_info(internal_tabname, false);
+    theDictionary->get_local_table_info(internal_tabname);
   if (info == 0) {
     theError.code = theDictionary->getNdbError().code;
     DBUG_RETURN(-1);
   }
-  if (readTupleIdFromNdb(info, tupleId) == -1)
+  TupleIdRange & range = info->m_tuple_id_range;
+  if (readTupleIdFromNdb(table, range, tupleId) == -1)
     DBUG_RETURN(-1);
   DBUG_PRINT("info", ("value %lu", (ulong)tupleId));
   DBUG_RETURN(0);
 }
 
 int
-Ndb::readTupleIdFromNdb(Ndb_local_table_info* info,
-                        Uint64 & tupleId)
+Ndb::readAutoIncrementValue(const NdbDictionary::Table * aTable,
+                            TupleIdRange & range, Uint64 & tupleId)
+{
+  DBUG_ENTER("Ndb::readAutoIncrementValue");
+  assert(aTable != 0);
+  const NdbTableImpl* table = & NdbTableImpl::getImpl(*aTable);
+
+  if (readTupleIdFromNdb(table, range, tupleId) == -1)
+    DBUG_RETURN(-1);
+  DBUG_PRINT("info", ("value %lu", (ulong)tupleId));
+  DBUG_RETURN(0);
+}
+
+int
+Ndb::readTupleIdFromNdb(const NdbTableImpl* table,
+                        TupleIdRange & range, Uint64 & tupleId)
 {
   DBUG_ENTER("Ndb::readTupleIdFromNdb");
-  if (info->m_first_tuple_id != info->m_last_tuple_id)
+  if (range.m_first_tuple_id != range.m_last_tuple_id)
   {
-    assert(info->m_first_tuple_id < info->m_last_tuple_id);
-    tupleId = info->m_first_tuple_id + 1;
+    assert(range.m_first_tuple_id < range.m_last_tuple_id);
+    tupleId = range.m_first_tuple_id + 1;
   }
   else
   {
@@ -886,7 +911,7 @@
      * only if no other transactions are allowed.
      */
     Uint64 opValue = 0;
-    if (opTupleIdOnNdb(info, opValue, 3) == -1)
+    if (opTupleIdOnNdb(table, range, opValue, 3) == -1)
       DBUG_RETURN(-1);
     tupleId = opValue;
   }
@@ -898,15 +923,18 @@
                            Uint64 tupleId, bool increase)
 {
   DBUG_ENTER("Ndb::setAutoIncrementValue");
+  ASSERT_NOT_MYSQLD;
   BaseString internal_tabname(internalize_table_name(aTableName));
 
   Ndb_local_table_info *info=
-    theDictionary->get_local_table_info(internal_tabname, false);
+    theDictionary->get_local_table_info(internal_tabname);
   if (info == 0) {
     theError.code = theDictionary->getNdbError().code;
     DBUG_RETURN(-1);
   }
-  if (setTupleIdInNdb(info, tupleId, increase) == -1)
+  const NdbTableImpl* table = info->m_table_impl;
+  TupleIdRange & range = info->m_tuple_id_range;
+  if (setTupleIdInNdb(table, range, tupleId, increase) == -1)
     DBUG_RETURN(-1);
   DBUG_RETURN(0);
 }
@@ -916,36 +944,52 @@
                            Uint64 tupleId, bool increase)
 {
   DBUG_ENTER("Ndb::setAutoIncrementValue");
+  ASSERT_NOT_MYSQLD;
   assert(aTable != 0);
   const NdbTableImpl* table = & NdbTableImpl::getImpl(*aTable);
   const BaseString& internal_tabname = table->m_internalName;
 
   Ndb_local_table_info *info=
-    theDictionary->get_local_table_info(internal_tabname, false);
+    theDictionary->get_local_table_info(internal_tabname);
   if (info == 0) {
     theError.code = theDictionary->getNdbError().code;
     DBUG_RETURN(-1);
   }
-  if (setTupleIdInNdb(info, tupleId, increase) == -1)
+  TupleIdRange & range = info->m_tuple_id_range;
+  if (setTupleIdInNdb(table, range, tupleId, increase) == -1)
+    DBUG_RETURN(-1);
+  DBUG_RETURN(0);
+}
+
+int
+Ndb::setAutoIncrementValue(const NdbDictionary::Table * aTable,
+                           TupleIdRange & range, Uint64 tupleId,
+                           bool increase)
+{
+  DBUG_ENTER("Ndb::setAutoIncrementValue");
+  assert(aTable != 0);
+  const NdbTableImpl* table = & NdbTableImpl::getImpl(*aTable);
+
+  if (setTupleIdInNdb(table, range, tupleId, increase) == -1)
     DBUG_RETURN(-1);
   DBUG_RETURN(0);
 }
 
 int
-Ndb::setTupleIdInNdb(Ndb_local_table_info* info,
-                     Uint64 tupleId, bool increase)
+Ndb::setTupleIdInNdb(const NdbTableImpl* table,
+                     TupleIdRange & range, Uint64 tupleId, bool increase)
 {
   DBUG_ENTER("Ndb::setTupleIdInNdb");
   if (increase)
   {
-    if (info->m_first_tuple_id != info->m_last_tuple_id)
+    if (range.m_first_tuple_id != range.m_last_tuple_id)
     {
-      assert(info->m_first_tuple_id < info->m_last_tuple_id);
-      if (tupleId <= info->m_first_tuple_id + 1)
+      assert(range.m_first_tuple_id < range.m_last_tuple_id);
+      if (tupleId <= range.m_first_tuple_id + 1)
 	DBUG_RETURN(0);
-      if (tupleId <= info->m_last_tuple_id)
+      if (tupleId <= range.m_last_tuple_id)
       {
-	info->m_first_tuple_id = tupleId - 1;
+	range.m_first_tuple_id = tupleId - 1;
         DBUG_PRINT("info", 
                    ("Setting next auto increment cached value to %lu",
                     (ulong)tupleId));  
@@ -956,7 +1000,7 @@
      * if tupleId <= NEXTID, do nothing.  otherwise update NEXTID to
      * tupleId and set cached range to first = last = tupleId - 1.
      */
-    if (opTupleIdOnNdb(info, tupleId, 2) == -1)
+    if (opTupleIdOnNdb(table, range, tupleId, 2) == -1)
       DBUG_RETURN(-1);
   }
   else
@@ -964,42 +1008,62 @@
     /*
      * update NEXTID to given value.  reset cached range.
      */
-    if (opTupleIdOnNdb(info, tupleId, 1) == -1)
+    if (opTupleIdOnNdb(table, range, tupleId, 1) == -1)
       DBUG_RETURN(-1);
   }
   DBUG_RETURN(0);
 }
 
+int Ndb::initAutoIncrement()
+{
+  if (m_sys_tab_0)
+    return 0;
+
+  BaseString currentDb(getDatabaseName());
+  BaseString currentSchema(getDatabaseSchemaName());
+
+  setDatabaseName("sys");
+  setDatabaseSchemaName("def");
+
+  m_sys_tab_0 = theDictionary->getTableGlobal("SYSTAB_0");
+
+  // Restore current name space
+  setDatabaseName(currentDb.c_str());
+  setDatabaseSchemaName(currentSchema.c_str());
+
+  if (m_sys_tab_0 == NULL) {
+    assert(theDictionary->m_error.code != 0);
+    theError.code = theDictionary->m_error.code;
+    return -1;
+  }
+
+  return 0;
+}
+
 int
-Ndb::opTupleIdOnNdb(Ndb_local_table_info* info, Uint64 & opValue, Uint32 op)
+Ndb::opTupleIdOnNdb(const NdbTableImpl* table,
+                    TupleIdRange & range, Uint64 & opValue, Uint32 op)
 {
   DBUG_ENTER("Ndb::opTupleIdOnNdb");
-  Uint32 aTableId = info->m_table_impl->m_tableId;
+  Uint32 aTableId = table->m_id;
   DBUG_PRINT("enter", ("table: %u  value: %lu  op: %u",
                        aTableId, (ulong) opValue, op));
 
-  NdbTransaction*     tConnection;
-  NdbOperation*      tOperation= 0; // Compiler warning if not initialized
+  NdbTransaction*    tConnection = NULL;
+  NdbOperation*      tOperation = NULL;
   Uint64             tValue;
   NdbRecAttr*        tRecAttrResult;
 
-  NdbError savedError;
-
-  CHECK_STATUS_MACRO_ZERO;
+  CHECK_STATUS_MACRO;
 
-  BaseString currentDb(getDatabaseName());
-  BaseString currentSchema(getDatabaseSchemaName());
+  if (initAutoIncrement() == -1)
+    goto error_handler;
 
-  setDatabaseName("sys");
-  setDatabaseSchemaName("def");
   tConnection = this->startTransaction();
   if (tConnection == NULL)
-    goto error_return;
+    goto error_handler;
 
-  if (usingFullyQualifiedNames())
-    tOperation = tConnection->getNdbOperation("SYSTAB_0");
-  else
-    tOperation = tConnection->getNdbOperation("sys/def/SYSTAB_0");
+  tOperation = tConnection->getNdbOperation(m_sys_tab_0);
   if (tOperation == NULL)
     goto error_handler;
 
@@ -1007,18 +1071,18 @@
     {
     case 0:
       tOperation->interpretedUpdateTuple();
-      tOperation->equal("SYSKEY_0", aTableId );
+      tOperation->equal("SYSKEY_0", aTableId);
       tOperation->incValue("NEXTID", opValue);
       tRecAttrResult = tOperation->getValue("NEXTID");
 
-      if (tConnection->execute( Commit ) == -1 )
+      if (tConnection->execute( NdbTransaction::Commit ) == -1 )
         goto error_handler;
 
       tValue = tRecAttrResult->u_64_value();
 
-      info->m_first_tuple_id = tValue - opValue;
-      info->m_last_tuple_id  = tValue - 1;
-      opValue = info->m_first_tuple_id; // out
+      range.m_first_tuple_id = tValue - opValue;
+      range.m_last_tuple_id  = tValue - 1;
+      opValue = range.m_first_tuple_id; // out
       break;
     case 1:
       // create on first use
@@ -1026,11 +1090,10 @@
       tOperation->equal("SYSKEY_0", aTableId );
       tOperation->setValue("NEXTID", opValue);
 
-      if (tConnection->execute( Commit ) == -1 )
+      if (tConnection->execute( NdbTransaction::Commit ) == -1 )
         goto error_handler;
 
-      info->m_first_tuple_id = ~(Uint64)0;
-      info->m_last_tuple_id  = ~(Uint64)0;
+      range.reset();
       break;
     case 2:
       tOperation->interpretedUpdateTuple();
@@ -1044,7 +1107,7 @@
       tOperation->def_label(0);
       tOperation->interpret_exit_nok(9999);
       
-      if (tConnection->execute( Commit ) == -1)
+      if (tConnection->execute( NdbTransaction::Commit ) == -1)
       {
         if (tConnection->theError.code != 9999)
           goto error_handler;
@@ -1053,15 +1116,15 @@
       {
         DBUG_PRINT("info", 
                    ("Setting next auto increment value (db) to %lu",
-                    (ulong)opValue));  
-        info->m_first_tuple_id = info->m_last_tuple_id = opValue - 1;
+                    (ulong) opValue));  
+        range.m_first_tuple_id = range.m_last_tuple_id = opValue - 1;
       }
       break;
     case 3:
       tOperation->readTuple();
       tOperation->equal("SYSKEY_0", aTableId );
       tRecAttrResult = tOperation->getValue("NEXTID");
-      if (tConnection->execute( Commit ) == -1 )
+      if (tConnection->execute( NdbTransaction::Commit ) == -1 )
         goto error_handler;
       opValue = tRecAttrResult->u_64_value(); // out
       break;
@@ -1071,29 +1134,28 @@
 
   this->closeTransaction(tConnection);
 
-  // Restore current name space
-  setDatabaseName(currentDb.c_str());
-  setDatabaseSchemaName(currentSchema.c_str());
-
   DBUG_RETURN(0);
 
-  error_handler:
+error_handler:
+  DBUG_PRINT("error", ("ndb=%d con=%d op=%d",
+             theError.code,
+             tConnection != NULL ? tConnection->theError.code : -1,
+             tOperation != NULL ? tOperation->theError.code : -1));
+
+  if (theError.code == 0 && tConnection != NULL)
     theError.code = tConnection->theError.code;
+  if (theError.code == 0 && tOperation != NULL)
+    theError.code = tOperation->theError.code;
+  DBUG_ASSERT(theError.code != 0);
 
-    savedError = theError;
+  NdbError savedError;
+  savedError = theError;
 
+  if (tConnection != NULL)
     this->closeTransaction(tConnection);
-    theError = savedError;
 
-  error_return:
-    // Restore current name space
-    setDatabaseName(currentDb.c_str());
-    setDatabaseSchemaName(currentSchema.c_str());
+  theError = savedError;
 
-  DBUG_PRINT("error", ("ndb=%d con=%d op=%d",
-             theError.code,
-             tConnection ? tConnection->theError.code : -1,
-             tOperation ? tOperation->theError.code : -1));
   DBUG_RETURN(-1);
 }
 
@@ -1114,39 +1176,43 @@
   return Data;
 #endif
 }
+
+// <internal>
+Ndb_cluster_connection &
+Ndb::get_ndb_cluster_connection()
+{
+  return theImpl->m_ndb_cluster_connection;
+}
+
 const char * Ndb::getCatalogName() const
 {
   return theImpl->m_dbname.c_str();
 }
 
-
 void Ndb::setCatalogName(const char * a_catalog_name)
 {
-  if (a_catalog_name)
-  {
+  // TODO can table_name_separator be escaped?
+  if (a_catalog_name && ! strchr(a_catalog_name, table_name_separator)) {
     theImpl->m_dbname.assign(a_catalog_name);
     theImpl->update_prefix();
   }
 }
 
-
 const char * Ndb::getSchemaName() const
 {
   return theImpl->m_schemaname.c_str();
 }
 
-
 void Ndb::setSchemaName(const char * a_schema_name)
 {
-  if (a_schema_name) {
+  // TODO can table_name_separator be escaped?
+  if (a_schema_name && ! strchr(a_schema_name, table_name_separator)) {
     theImpl->m_schemaname.assign(a_schema_name);
     theImpl->update_prefix();
   }
 }
+// </internal>
  
-/*
-Deprecated functions
-*/
 const char * Ndb::getDatabaseName() const
 {
   return getCatalogName();
@@ -1166,6 +1232,26 @@
 {
   setSchemaName(a_schema_name);
 }
+
+int Ndb::setDatabaseAndSchemaName(const NdbDictionary::Table* t)
+{
+  const char* s0 = t->m_impl.m_internalName.c_str();
+  const char* s1 = strchr(s0, table_name_separator);
+  if (s1 && s1 != s0) {
+    const char* s2 = strchr(s1 + 1, table_name_separator);
+    if (s2 && s2 != s1 + 1) {
+      char buf[NAME_LEN + 1];
+      if (s1 - s0 <= NAME_LEN && s2 - (s1 + 1) <= NAME_LEN) {
+        sprintf(buf, "%.*s", (int) (s1 - s0), s0);
+        setDatabaseName(buf);
+        sprintf(buf, "%.*s", (int) (s2 - (s1 + 1)), s1 + 1);
+        setDatabaseSchemaName(buf);
+        return 0;
+      }
+    }
+  }
+  return -1;
+}
  
 bool Ndb::usingFullyQualifiedNames()
 {
@@ -1228,9 +1314,16 @@
   if (fullyQualifiedNames)
   {
     /* Internal table name format <db>/<schema>/<table>
-       <db>/<schema> is already available in m_prefix
+       <db>/<schema>/ is already available in m_prefix
        so just concat the two strings
      */
+#ifdef VM_TRACE
+    // verify that m_prefix looks like abc/def/
+    const char* s0 = theImpl->m_prefix.c_str();
+    const char* s1 = s0 ? strchr(s0, table_name_separator) : 0;
+    const char* s2 = s1 ? strchr(s1 + 1, table_name_separator) : 0;
+    assert(s1 && s1 != s0 && s2 && s2 != s1 + 1 && *(s2 +
1) == 0);
+#endif
     ret.assfmt("%s%s",
                theImpl->m_prefix.c_str(),
                external_name);
@@ -1242,6 +1335,35 @@
   DBUG_RETURN(ret);
 }
 
+const BaseString
+Ndb::old_internalize_index_name(const NdbTableImpl * table,
+				const char * external_name) const
+{
+  BaseString ret;
+  DBUG_ENTER("old_internalize_index_name");
+  DBUG_PRINT("enter", ("external_name: %s, table_id: %d",
+                       external_name, table ? table->m_id : ~0));
+  if (!table)
+  {
+    DBUG_PRINT("error", ("!table"));
+    DBUG_RETURN(ret);
+  }
+
+  if (fullyQualifiedNames)
+  {
+    /* Internal index name format <db>/<schema>/<tabid>/<table>
*/
+    ret.assfmt("%s%d%c%s",
+               theImpl->m_prefix.c_str(),
+               table->m_id,
+               table_name_separator,
+               external_name);
+  }
+  else
+    ret.assign(external_name);
+
+  DBUG_PRINT("exit", ("internal_name: %s", ret.c_str()));
+  DBUG_RETURN(ret);
+}
 
 const BaseString
 Ndb::internalize_index_name(const NdbTableImpl * table,
@@ -1250,7 +1372,7 @@
   BaseString ret;
   DBUG_ENTER("internalize_index_name");
   DBUG_PRINT("enter", ("external_name: %s, table_id: %d",
-                       external_name, table ? table->m_tableId : ~0));
+                       external_name, table ? table->m_id : ~0));
   if (!table)
   {
     DBUG_PRINT("error", ("!table"));
@@ -1259,10 +1381,10 @@
 
   if (fullyQualifiedNames)
   {
-    /* Internal index name format <db>/<schema>/<tabid>/<table>
*/
+    /* Internal index name format sys/def/<tabid>/<table> */
     ret.assfmt("%s%d%c%s",
-               theImpl->m_prefix.c_str(),
-               table->m_tableId,
+               theImpl->m_systemPrefix.c_str(),
+               table->m_id,
                table_name_separator,
                external_name);
   }
@@ -1307,6 +1429,113 @@
   BaseString ret = BaseString(schemaName);
   delete [] schemaName;
   return ret;
+}
+
+// ToDo set event buffer size
+NdbEventOperation* Ndb::createEventOperation(const char* eventName)
+{
+  DBUG_ENTER("Ndb::createEventOperation");
+  NdbEventOperation* tOp= theEventBuffer->createEventOperation(eventName,
+							       theError);
+  if (tOp)
+  {
+    // keep track of all event operations
+    NdbEventOperationImpl *op=
+      NdbEventBuffer::getEventOperationImpl(tOp);
+    op->m_next= theImpl->m_ev_op;
+    op->m_prev= 0;
+    theImpl->m_ev_op= op;
+    if (op->m_next)
+      op->m_next->m_prev= op;
+  }
+
+  DBUG_RETURN(tOp);
+}
+
+int Ndb::dropEventOperation(NdbEventOperation* tOp)
+{
+  DBUG_ENTER("Ndb::dropEventOperation");
+  DBUG_PRINT("info", ("name: %s", tOp->getEvent()->getTable()->getName()));
+  // remove it from list
+  NdbEventOperationImpl *op=
+    NdbEventBuffer::getEventOperationImpl(tOp);
+  if (op->m_next)
+    op->m_next->m_prev= op->m_prev;
+  if (op->m_prev)
+    op->m_prev->m_next= op->m_next;
+  else
+    theImpl->m_ev_op= op->m_next;
+
+  DBUG_PRINT("info", ("first: %s",
+                      theImpl->m_ev_op ?
theImpl->m_ev_op->getEvent()->getTable()->getName() : "<empty>"));
+  assert(theImpl->m_ev_op == 0 || theImpl->m_ev_op->m_prev == 0);
+
+  theEventBuffer->dropEventOperation(tOp);
+  DBUG_RETURN(0);
+}
+
+NdbEventOperation *Ndb::getEventOperation(NdbEventOperation* tOp)
+{
+  NdbEventOperationImpl *op;
+  if (tOp)
+    op= NdbEventBuffer::getEventOperationImpl(tOp)->m_next;
+  else
+    op= theImpl->m_ev_op;
+  if (op)
+    return op->m_facade;
+  return 0;
+}
+
+int
+Ndb::pollEvents(int aMillisecondNumber, Uint64 *latestGCI)
+{
+  return theEventBuffer->pollEvents(aMillisecondNumber, latestGCI);
+}
+
+int
+Ndb::flushIncompleteEvents(Uint64 gci)
+{
+  return theEventBuffer->flushIncompleteEvents(gci);
+}
+
+NdbEventOperation *Ndb::nextEvent()
+{
+  return theEventBuffer->nextEvent();
+}
+
+const NdbEventOperation*
+Ndb::getGCIEventOperations(Uint32* iter, Uint32* event_types)
+{
+  NdbEventOperationImpl* op =
+    theEventBuffer->getGCIEventOperations(iter, event_types);
+  if (op != NULL)
+    return op->m_facade;
+  return NULL;
+}
+
+Uint64 Ndb::getLatestGCI()
+{
+  return theEventBuffer->getLatestGCI();
+}
+
+void Ndb::setReportThreshEventGCISlip(unsigned thresh)
+{
+ if (theEventBuffer->m_free_thresh != thresh)
+ {
+   theEventBuffer->m_free_thresh= thresh;
+   theEventBuffer->m_min_free_thresh= thresh;
+   theEventBuffer->m_max_free_thresh= 100;
+ }
+}
+
+void Ndb::setReportThreshEventFreeMem(unsigned thresh)
+{
+  if (theEventBuffer->m_free_thresh != thresh)
+  {
+    theEventBuffer->m_free_thresh= thresh;
+    theEventBuffer->m_min_free_thresh= thresh;
+    theEventBuffer->m_max_free_thresh= 100;
+  }
 }
 
 #ifdef VM_TRACE

--- 1.66.24.1/ndb/src/ndbapi/NdbScanOperation.cpp	2007-02-07 18:29:29 +07:00
+++ 1.110/storage/ndb/src/ndbapi/NdbScanOperation.cpp	2007-02-09 16:26:20 +07:00
@@ -49,6 +49,7 @@
   m_receivers = 0;
   m_array = new Uint32[1]; // skip if on delete in fix_receivers
   theSCAN_TABREQ = 0;
+  m_executed = false;
 }
 
 NdbScanOperation::~NdbScanOperation()
@@ -110,6 +111,7 @@
   theNdbCon->theMagicNumber = 0xFE11DF;
   theNoOfTupKeyLeft = tab->m_noOfDistributionKeys;
   m_read_range_no = 0;
+  m_executed = false;
   return 0;
 }
 
@@ -161,9 +163,25 @@
   }
 
   m_keyInfo = ((scan_flags & SF_KeyInfo) || lockExcl) ? 1 : 0;
+  bool tupScan = (scan_flags & SF_TupScan);
 
+#if 1 // XXX temp for testing
+  { char* p = getenv("NDB_USE_TUPSCAN");
+    if (p != 0) {
+      unsigned n = atoi(p); // 0-10
+      if ((unsigned int) (::time(0) % 10) < n) tupScan = true;
+    }
+  }
+#endif
+  if (scan_flags & SF_DiskScan)
+  {
+    tupScan = true;
+    m_no_disk_flag = false;
+  }
+  
   bool rangeScan = false;
-  if (m_accessTable->m_indexType == NdbDictionary::Index::OrderedIndex)
+  if ( (int) m_accessTable->m_indexType ==
+       (int) NdbDictionary::Index::OrderedIndex)
   {
     if (m_currentTable == m_accessTable){
       // Old way of scanning indexes, should not be allowed
@@ -176,12 +194,9 @@
     theStatus = GetValue;
     theOperationType  = OpenRangeScanRequest;
     rangeScan = true;
-  }
-
-  bool tupScan = (scan_flags & SF_TupScan);
-  if (tupScan && rangeScan)
     tupScan = false;
-
+  }
+  
   if (rangeScan && (scan_flags & SF_OrderBy))
     parallel = fragCount;
   
@@ -201,7 +216,7 @@
   theSCAN_TABREQ->setSignal(GSN_SCAN_TABREQ);
   ScanTabReq * req = CAST_PTR(ScanTabReq, theSCAN_TABREQ->getDataPtrSend());
   req->apiConnectPtr = theNdbCon->theTCConPtr;
-  req->tableId = m_accessTable->m_tableId;
+  req->tableId = m_accessTable->m_id;
   req->tableSchemaVersion = m_accessTable->m_version;
   req->storedProcId = 0xFFFF;
   req->buddyConPtr = theNdbCon->theBuddyConPtr;
@@ -362,7 +377,7 @@
 int
 NdbScanOperation::executeCursor(int nodeId){
   NdbTransaction * tCon = theNdbCon;
-  TransporterFacade* tp = TransporterFacade::instance();
+  TransporterFacade* tp = theNdb->theImpl->m_transporter_facade;
   Guard guard(tp->theMutexPtr);
 
   Uint32 magic = tCon->theMagicNumber;
@@ -384,6 +399,7 @@
     if (doSendScan(nodeId) == -1)
       return -1;
 
+    m_executed= true; // Mark operation as executed
     return 0;
   } else {
     if (!(tp->get_node_stopping(nodeId) &&
@@ -463,18 +479,28 @@
   }
   
   Uint32 nodeId = theNdbCon->theDBnode;
-  TransporterFacade* tp = TransporterFacade::instance();
-  Guard guard(tp->theMutexPtr);
-  if(theError.code)
-    return -1;
+  TransporterFacade* tp = theNdb->theImpl->m_transporter_facade;
+  /*
+    The PollGuard has an implicit call of unlock_and_signal through the
+    ~PollGuard method. This method is called implicitly by the compiler
+    in all places where the object is out of context due to a return,
+    break, continue or simply end of statement block
+  */
+  PollGuard poll_guard(tp, &theNdb->theImpl->theWaiter,
+                       theNdb->theNdbBlockNumber);
 
-  Uint32 seq = theNdbCon->theNodeSequence;
-  if(seq == tp->getNodeSequence(nodeId) && send_next_scan(idx, false,
-							  forceSend) == 0){
+  const Uint32 seq = theNdbCon->theNodeSequence;
+
+  if(theError.code)
+  {
+    goto err4;
+  }
+  
+  if(seq == tp->getNodeSequence(nodeId) && send_next_scan(idx, false) == 0)
+  {
       
     idx = m_current_api_receiver;
     last = m_api_receivers_count;
-
     Uint32 timeout = tp->m_waitfor_timeout;
       
     do {
@@ -501,12 +527,10 @@
 	/**
 	 * No completed...
 	 */
-	theNdb->theImpl->theWaiter.m_node = nodeId;
-	theNdb->theImpl->theWaiter.m_state = WAIT_SCAN;
-	int return_code = theNdb->receiveResponse(3*timeout);
-	if (return_code == 0 && seq == tp->getNodeSequence(nodeId)) {
+        int ret_code= poll_guard.wait_scan(3*timeout, nodeId, forceSend);
+	if (ret_code == 0 && seq == tp->getNodeSequence(nodeId)) {
 	  continue;
-	} else if(return_code == -1){
+	} else if(ret_code == -1){
 	  retVal = -1;
 	} else {
 	  idx = last;
@@ -556,17 +580,21 @@
     if(theError.code == 0)
       setErrorCode(4028); // seq changed = Node fail
     break;
+  case -4:
+err4:
+    setErrorCode(theError.code);
+    break;
   }
     
   theNdbCon->theTransactionIsStarted = false;
   theNdbCon->theReleaseOnClose = true;
-  if(DEBUG_NEXT_RESULT) ndbout_c("return -1", retVal);
+  if(DEBUG_NEXT_RESULT) ndbout_c("return %d", retVal);
   return -1;
 }
 
 int
-NdbScanOperation::send_next_scan(Uint32 cnt, bool stopScanFlag,
-				 bool forceSend){  
+NdbScanOperation::send_next_scan(Uint32 cnt, bool stopScanFlag)
+{
   if(cnt > 0){
     NdbApiSignal tSignal(theNdb->theMyRef);
     tSignal.setSignal(GSN_SCAN_NEXTREQ);
@@ -601,7 +629,7 @@
     if(sent)
     {
       Uint32 nodeId = theNdbCon->theDBnode;
-      TransporterFacade * tp = TransporterFacade::instance();
+      TransporterFacade * tp = theNdb->theImpl->m_transporter_facade;
       if(cnt > 21){
 	tSignal.setLength(4);
 	LinearSectionPtr ptr[3];
@@ -613,9 +641,6 @@
 	ret = tp->sendSignal(&tSignal, nodeId);
       }
     }
-    
-    if (!ret) checkForceSend(forceSend);
-
     m_sent_receivers_count = last + sent;
     m_api_receivers_count -= cnt;
     m_current_api_receiver = 0;
@@ -625,15 +650,6 @@
   return 0;
 }
 
-void NdbScanOperation::checkForceSend(bool forceSend)
-{
-  if (forceSend) {
-    TransporterFacade::instance()->forceSend(theNdb->theNdbBlockNumber);
-  } else {
-    TransporterFacade::instance()->checkForceSend(theNdb->theNdbBlockNumber);
-  }//if
-}
-
 int 
 NdbScanOperation::prepareSend(Uint32  TC_ConnectPtr, Uint64  TransactionId)
 {
@@ -653,8 +669,8 @@
 {
   DBUG_ENTER("NdbScanOperation::close");
   DBUG_PRINT("enter", ("this: 0x%lx  tcon: 0x%lx  con: 0x%lx  force: %d  release: %d",
-                       (long)this,
-                       (long)m_transConnection, (long)theNdbCon,
+                       (long) this,
+                       (long) m_transConnection, (long) theNdbCon,
                        forceSend, releaseOp));
 
   if(m_transConnection){
@@ -668,10 +684,16 @@
 	       m_conf_receivers_count,
 	       m_sent_receivers_count);
     
-    TransporterFacade* tp = TransporterFacade::instance();
-    Guard guard(tp->theMutexPtr);
-    close_impl(tp, forceSend);
-    
+    TransporterFacade* tp = theNdb->theImpl->m_transporter_facade;
+    /*
+      The PollGuard has an implicit call of unlock_and_signal through the
+      ~PollGuard method. This method is called implicitly by the compiler
+      in all places where the object is out of context due to a return,
+      break, continue or simply end of statement block
+    */
+    PollGuard poll_guard(tp, &theNdb->theImpl->theWaiter,
+                         theNdb->theNdbBlockNumber);
+    close_impl(tp, forceSend, &poll_guard);
   }
 
   NdbConnection* tCon = theNdbCon;
@@ -794,6 +816,7 @@
    */
   Uint32 reqInfo = req->requestInfo;
   ScanTabReq::setKeyinfoFlag(reqInfo, keyInfo);
+  ScanTabReq::setNoDiskFlag(reqInfo, m_no_disk_flag);
   req->requestInfo = reqInfo;
   
   for(Uint32 i = 0; i<theParallelism; i++){
@@ -829,7 +852,6 @@
   tSignal = theSCAN_TABREQ;
   
   Uint32 tupKeyLen = theTupKeyLen;
-  Uint32 len = theTotalNrOfKeyWordInSignal;
   Uint32 aTC_ConnectPtr = theNdbCon->theTCConPtr;
   Uint64 transId = theNdbCon->theTransactionId;
   
@@ -844,7 +866,7 @@
   req->requestInfo = tmp;
   tSignal->setLength(ScanTabReq::StaticLength + theDistrKeyIndicator_);
 
-  TransporterFacade *tp = TransporterFacade::instance();
+  TransporterFacade *tp = theNdb->theImpl->m_transporter_facade;
   LinearSectionPtr ptr[3];
   ptr[0].p = m_prepared_receivers;
   ptr[0].sz = theParallelism;
@@ -928,13 +950,20 @@
  *     the scan process. 
  ****************************************************************************/
 int
-NdbScanOperation::getKeyFromKEYINFO20(Uint32* data, unsigned size)
+NdbScanOperation::getKeyFromKEYINFO20(Uint32* data, Uint32 & size)
 {
   NdbRecAttr * tRecAttr = m_curr_row;
   if(tRecAttr)
   {
     const Uint32 * src = (Uint32*)tRecAttr->aRef();
-    memcpy(data, src, 4*size);
+
+    assert(tRecAttr->get_size_in_bytes() > 0);
+    assert(tRecAttr->get_size_in_bytes() < 65536);
+    const Uint32 len = (tRecAttr->get_size_in_bytes() + 3)/4-1;
+
+    assert(size >= len);
+    memcpy(data, src, 4*len);
+    size = len;
     return 0;
   }
   return -1;
@@ -959,10 +988,13 @@
     }
     pTrans->theSimpleState = 0;
     
-    const Uint32 len = (tRecAttr->attrSize() * tRecAttr->arraySize() + 3)/4-1;
-
+    assert(tRecAttr->get_size_in_bytes() > 0);
+    assert(tRecAttr->get_size_in_bytes() < 65536);
+    const Uint32 len = (tRecAttr->get_size_in_bytes() + 3)/4-1;
+    
     newOp->theTupKeyLen = len;
     newOp->theOperationType = opType;
+    newOp->m_abortOption = AbortOnError;
     switch (opType) {
     case (ReadRequest):
       newOp->theLockMode = theLockMode;
@@ -1058,23 +1090,23 @@
 
 int
 NdbIndexScanOperation::setBound(const char* anAttrName, int type, 
-				const void* aValue, Uint32 len)
+				const void* aValue)
 {
-  return setBound(m_accessTable->getColumn(anAttrName), type, aValue, len);
+  return setBound(m_accessTable->getColumn(anAttrName), type, aValue);
 }
 
 int
 NdbIndexScanOperation::setBound(Uint32 anAttrId, int type, 
-				const void* aValue, Uint32 len)
+				const void* aValue)
 {
-  return setBound(m_accessTable->getColumn(anAttrId), type, aValue, len);
+  return setBound(m_accessTable->getColumn(anAttrId), type, aValue);
 }
 
 int
 NdbIndexScanOperation::equal_impl(const NdbColumnImpl* anAttrObject, 
-				  const char* aValue, 
-				  Uint32 len){
-  return setBound(anAttrObject, BoundEQ, aValue, len);
+				  const char* aValue)
+{
+  return setBound(anAttrObject, BoundEQ, aValue);
 }
 
 NdbRecAttr*
@@ -1084,7 +1116,7 @@
     return NdbScanOperation::getValue_impl(attrInfo, aValue);
   }
   
-  int id = attrInfo->m_attrId;                // In "real" table
+  int id = attrInfo->getColumnNo();                // In "real" table
   assert(m_accessTable->m_index);
   int sz = (int)m_accessTable->m_index->m_key_ids.size();
   if(id >= sz || (id = m_accessTable->m_index->m_key_ids[id]) == -1){
@@ -1121,7 +1153,7 @@
  */
 int
 NdbIndexScanOperation::setBound(const NdbColumnImpl* tAttrInfo, 
-				int type, const void* aValue, Uint32 len)
+				int type, const void* aValue)
 {
   if (!tAttrInfo)
   {
@@ -1129,24 +1161,23 @@
     return -1;
   }
   if (theOperationType == OpenRangeScanRequest &&
-      (0 <= type && type <= 4) &&
-      len <= 8000) {
+      (0 <= type && type <= 4)) {
     // insert bound type
     Uint32 currLen = theTotalNrOfKeyWordInSignal;
     Uint32 remaining = KeyInfo::DataLength - currLen;
-    Uint32 sizeInBytes = tAttrInfo->m_attrSize * tAttrInfo->m_arraySize;
     bool tDistrKey = tAttrInfo->m_distributionKey;
 
-    len = aValue != NULL ? sizeInBytes : 0;
-    if (len != sizeInBytes && (len != 0)) {
-      setErrorCodeAbort(4209);
-      return -1;
-    }
+    Uint32 len = 0;
+    if (aValue != NULL)
+      if (! tAttrInfo->get_var_length(aValue, len)) {
+        setErrorCodeAbort(4209);
+        return -1;
+      }
 
     // insert attribute header
     Uint32 tIndexAttrId = tAttrInfo->m_attrId;
     Uint32 sizeInWords = (len + 3) / 4;
-    AttributeHeader ah(tIndexAttrId, sizeInWords);
+    AttributeHeader ah(tIndexAttrId, sizeInWords << 2);
     const Uint32 ahValue = ah.m_value;
 
     const Uint32 align = (UintPtr(aValue) & 7);
@@ -1156,25 +1187,31 @@
     const bool nobytes = (len & 0x3) == 0;
     const Uint32 totalLen = 2 + sizeInWords;
     Uint32 tupKeyLen = theTupKeyLen;
+    union {
+      Uint32 tempData[2000];
+      Uint64 __align;
+    };
+    Uint64 *valPtr;
     if(remaining > totalLen && aligned && nobytes){
       Uint32 * dst = theKEYINFOptr + currLen;
       * dst ++ = type;
       * dst ++ = ahValue;
       memcpy(dst, aValue, 4 * sizeInWords);
       theTotalNrOfKeyWordInSignal = currLen + totalLen;
+      valPtr = (Uint64*)aValue;
     } else {
       if(!aligned || !nobytes){
-        Uint32 tempData[2000];
 	tempData[0] = type;
 	tempData[1] = ahValue;
 	tempData[2 + (len >> 2)] = 0;
         memcpy(tempData+2, aValue, len);
-	
 	insertBOUNDS(tempData, 2+sizeInWords);
+	valPtr = (Uint64*)(tempData+2);
       } else {
 	Uint32 buf[2] = { type, ahValue };
 	insertBOUNDS(buf, 2);
 	insertBOUNDS((Uint32*)aValue, sizeInWords);
+	valPtr = (Uint64*)aValue;
       }
     }
     theTupKeyLen = tupKeyLen + totalLen;
@@ -1191,7 +1228,7 @@
     if(type == BoundEQ && tDistrKey && !m_multi_range)
     {
       theNoOfTupKeyLeft--;
-      return handle_distribution_key((Uint64*)aValue, sizeInWords);
+      return handle_distribution_key(valPtr, sizeInWords);
     }
     return 0;
   } else {
@@ -1240,6 +1277,31 @@
   return -1;
 }
 
+Uint32
+NdbIndexScanOperation::getKeyFromSCANTABREQ(Uint32* data, Uint32 size)
+{
+  DBUG_ENTER("NdbIndexScanOperation::getKeyFromSCANTABREQ");
+  assert(size >= theTotalNrOfKeyWordInSignal);
+  size = theTotalNrOfKeyWordInSignal;
+  NdbApiSignal* tSignal = theSCAN_TABREQ->next();
+  Uint32 pos = 0;
+  while (pos < size) {
+    assert(tSignal != NULL);
+    Uint32* tData = tSignal->getDataPtrSend();
+    Uint32 rem = size - pos;
+    if (rem > KeyInfo::DataLength)
+      rem = KeyInfo::DataLength;
+    Uint32 i = 0;
+    while (i < rem) {
+      data[pos + i] = tData[KeyInfo::HeaderLength + i];
+      i++;
+    }
+    pos += rem;
+  }
+  DBUG_DUMP("key", (char*)data, size << 2);
+  DBUG_RETURN(size);
+}
+
 int
 NdbIndexScanOperation::readTuples(LockMode lm,
 				  Uint32 scan_flags,
@@ -1300,8 +1362,6 @@
   Uint32 cnt = m_accessTable->getNoOfColumns() - 1;
   assert(cnt <  NDB_MAX_NO_OF_ATTRIBUTES_IN_KEY);
   
-  const NdbIndexImpl * idx = m_accessTable->m_index;
-  const NdbTableImpl * tab = m_currentTable;
   for(Uint32 i = 0; i<cnt; i++){
     Uint32 val = theTupleKeyDefined[i][0];
     switch(val){
@@ -1340,10 +1400,11 @@
       return (r1_null ? -1 : 1) * jdir;
     }
     const NdbColumnImpl & col = NdbColumnImpl::getImpl(* r1->m_column);
-    Uint32 len = r1->theAttrSize * r1->theArraySize;
+    Uint32 len1 = r1->get_size_in_bytes();
+    Uint32 len2 = r2->get_size_in_bytes();
     if(!r1_null){
       const NdbSqlUtil::Type& sqlType = NdbSqlUtil::getType(col.m_type);
-      int r = (*sqlType.m_cmp)(col.m_cs, d1, len, d2, len, true);
+      int r = (*sqlType.m_cmp)(col.m_cs, d1, len1, d2, len2, true);
       if(r){
 	assert(r != NdbSqlUtil::CmpUnknown);
 	return r * jdir;
@@ -1381,26 +1442,31 @@
   if(fetchNeeded){
     if(fetchAllowed){
       if(DEBUG_NEXT_RESULT) ndbout_c("performing fetch...");
-      TransporterFacade* tp = TransporterFacade::instance();
-      Guard guard(tp->theMutexPtr);
+      TransporterFacade* tp = theNdb->theImpl->m_transporter_facade;
+      /*
+        The PollGuard has an implicit call of unlock_and_signal through the
+        ~PollGuard method. This method is called implicitly by the compiler
+        in all places where the object is out of context due to a return,
+        break, continue or simply end of statement block
+      */
+      PollGuard poll_guard(tp, &theNdb->theImpl->theWaiter,
+                           theNdb->theNdbBlockNumber);
       if(theError.code)
 	return -1;
       Uint32 seq = theNdbCon->theNodeSequence;
       Uint32 nodeId = theNdbCon->theDBnode;
       Uint32 timeout = tp->m_waitfor_timeout;
       if(seq == tp->getNodeSequence(nodeId) &&
-	 !send_next_scan_ordered(s_idx, forceSend)){
+	 !send_next_scan_ordered(s_idx)){
 	Uint32 tmp = m_sent_receivers_count;
 	s_idx = m_current_api_receiver; 
 	while(m_sent_receivers_count > 0 && !theError.code){
-	  theNdb->theImpl->theWaiter.m_node = nodeId;
-	  theNdb->theImpl->theWaiter.m_state = WAIT_SCAN;
-	  int return_code = theNdb->receiveResponse(3*timeout);
-	  if (return_code == 0 && seq == tp->getNodeSequence(nodeId)) {
+          int ret_code= poll_guard.wait_scan(3*timeout, nodeId, forceSend);
+	  if (ret_code == 0 && seq == tp->getNodeSequence(nodeId)) {
 	    continue;
 	  }
 	  if(DEBUG_NEXT_RESULT) ndbout_c("return -1");
-	  if(return_code == -1){
+	  if(ret_code == -1){
 	    setErrorCode(4008);
 	  } else {
 	    setErrorCode(4028);
@@ -1487,7 +1553,8 @@
 }
 
 int
-NdbIndexScanOperation::send_next_scan_ordered(Uint32 idx, bool forceSend){  
+NdbIndexScanOperation::send_next_scan_ordered(Uint32 idx)
+{
   if(idx == theParallelism)
     return 0;
   
@@ -1522,15 +1589,16 @@
   m_sent_receivers_count = last + 1;
   
   Uint32 nodeId = theNdbCon->theDBnode;
-  TransporterFacade * tp = TransporterFacade::instance();
+  TransporterFacade * tp = theNdb->theImpl->m_transporter_facade;
   tSignal.setLength(4+1);
   int ret= tp->sendSignal(&tSignal, nodeId);
-  if (!ret) checkForceSend(forceSend);
   return ret;
 }
 
 int
-NdbScanOperation::close_impl(TransporterFacade* tp, bool forceSend){
+NdbScanOperation::close_impl(TransporterFacade* tp, bool forceSend,
+                             PollGuard *poll_guard)
+{
   Uint32 seq = theNdbCon->theNodeSequence;
   Uint32 nodeId = theNdbCon->theDBnode;
   
@@ -1541,15 +1609,12 @@
   }
   
   Uint32 timeout = tp->m_waitfor_timeout;
-
   /**
    * Wait for outstanding
    */
   while(theError.code == 0 && m_sent_receivers_count)
   {
-    theNdb->theImpl->theWaiter.m_node = nodeId;
-    theNdb->theImpl->theWaiter.m_state = WAIT_SCAN;
-    int return_code = theNdb->receiveResponse(3*timeout);
+    int return_code= poll_guard->wait_scan(3*timeout, nodeId, forceSend);
     switch(return_code){
     case 0:
       break;
@@ -1606,7 +1671,7 @@
   }
   
   // Send close scan
-  if(send_next_scan(api+conf, true, forceSend) == -1)
+  if(send_next_scan(api+conf, true) == -1)
   {
     theNdbCon->theReleaseOnClose = true;
     return -1;
@@ -1617,9 +1682,7 @@
    */
   while(m_sent_receivers_count+m_api_receivers_count+m_conf_receivers_count)
   {
-    theNdb->theImpl->theWaiter.m_node = nodeId;
-    theNdb->theImpl->theWaiter.m_state = WAIT_SCAN;
-    int return_code = theNdb->receiveResponse(3*timeout);
+    int return_code= poll_guard->wait_scan(3*timeout, nodeId, forceSend);
     switch(return_code){
     case 0:
       break;
@@ -1658,13 +1721,20 @@
 NdbScanOperation::restart(bool forceSend)
 {
   
-  TransporterFacade* tp = TransporterFacade::instance();
-  Guard guard(tp->theMutexPtr);
+  TransporterFacade* tp = theNdb->theImpl->m_transporter_facade;
+  /*
+    The PollGuard has an implicit call of unlock_and_signal through the
+    ~PollGuard method. This method is called implicitly by the compiler
+    in all places where the object is out of context due to a return,
+    break, continue or simply end of statement block
+  */
+  PollGuard poll_guard(tp, &theNdb->theImpl->theWaiter,
+                       theNdb->theNdbBlockNumber);
   Uint32 nodeId = theNdbCon->theDBnode;
   
   {
     int res;
-    if((res= close_impl(tp, forceSend)))
+    if((res= close_impl(tp, forceSend, &poll_guard)))
     {
       return res;
     }
@@ -1678,7 +1748,6 @@
   theError.code = 0;
   if (doSendScan(nodeId) == -1)
     return -1;
-  
   return 0;
 }
 
@@ -1687,9 +1756,16 @@
   int res;
   
   {
-    TransporterFacade* tp = TransporterFacade::instance();
-    Guard guard(tp->theMutexPtr);
-    res= close_impl(tp, forceSend);
+    TransporterFacade* tp = theNdb->theImpl->m_transporter_facade;
+    /*
+      The PollGuard has an implicit call of unlock_and_signal through the
+      ~PollGuard method. This method is called implicitly by the compiler
+      in all places where the object is out of context due to a return,
+      break, continue or simply end of statement block
+    */
+    PollGuard poll_guard(tp, &theNdb->theImpl->theWaiter,
+                         theNdb->theNdbBlockNumber);
+    res= close_impl(tp, forceSend, &poll_guard);
   }
 
   if(!res)
Thread
bk commit into 5.1 tree (tomas:1.2431)tomas9 Feb