List:Commits« Previous MessageNext Message »
From:Martin Skold Date:February 11 2008 2:24pm
Subject:bk commit into 5.1 tree (mskold:1.2676)
View as plain text  
Below is the list of changes that have just been committed into a local
5.1 repository of marty. When marty does a push these changes will
be propagated to the main repository and, within 24 hours after the
push, to the public repository.
For information on how to access the public repository
see http://dev.mysql.com/doc/mysql/en/installing-source-tree.html

ChangeSet@stripped, 2008-02-11 14:24:17+01:00, mskold@stripped +15 -0
  WL#4083 Resource shortage handling in event mechanism: Adds handling of buffer overflow in Suma during node failure, and disconnects lagging subscribers (subscribers that do not acknowledge GCPs fast enough) based on the new configuration parameter MAX_BUFFERED_GCP.

  sql/ha_ndbcluster_binlog.cc@stripped, 2008-02-11 14:24:16+01:00, mskold@stripped +27 -0
    WL#4083 Resource shortage handling in event mechanism: Added checks for missing data (inconsistent GCI)

  storage/ndb/include/kernel/signaldata/SumaImpl.hpp@stripped, 2008-02-11 14:24:16+01:00, mskold@stripped +3 -2
    WL#4083 Resource shortage handling in event mechanism: Added MISSING_DATA bit to signal an incomplete GCI

  storage/ndb/include/mgmapi/mgmapi_config_parameters.h@stripped, 2008-02-11 14:24:16+01:00, mskold@stripped +2 -0
    WL#4083 Resource shortage handling in event mechanism: Added new configuration parameter MAX_BUFFERED_GCP to define how many GCP's are buffered in Suma before unresponsive subscribers are disconnected

  storage/ndb/include/mgmapi/ndb_logevent.h@stripped, 2008-02-11 14:24:16+01:00, mskold@stripped +11 -2
    WL#4083 Resource shortage handling in event mechanism: Added new SubscriptionStatus event.

  storage/ndb/include/ndbapi/Ndb.hpp@stripped, 2008-02-11 14:24:16+01:00, mskold@stripped +25 -0
    WL#4083 Resource shortage handling in event mechanism: Added methods for checking GCI consistency.

  storage/ndb/src/common/debugger/EventLogger.cpp@stripped, 2008-02-11 14:24:16+01:00, mskold@stripped +21 -0
    WL#4083 Resource shortage handling in event mechanism: Added printout for change of subscription status log event.

  storage/ndb/src/kernel/blocks/ERROR_codes.txt@stripped, 2008-02-11 14:24:16+01:00, mskold@stripped +6 -1
    WL#4083 Resource shortage handling in event mechanism: Added new error inserts for simulating buffer overflow in Suma.

  storage/ndb/src/kernel/blocks/suma/Suma.cpp@stripped, 2008-02-11 14:24:16+01:00, mskold@stripped +135 -16
    WL#4083 Resource shortage handling in event mechanism: Added support for managing buffer overflow during node restarts by informing subscriber (will cause binlog injection thread to insert a GAP event). Added checking for non-responsive subscribers (exceeding MAX_BUFFERED_GCP) by disconnecting them.

  storage/ndb/src/kernel/blocks/suma/Suma.hpp@stripped, 2008-02-11 14:24:16+01:00, mskold@stripped +5 -0
    WL#4083 Resource shortage handling in event mechanism

  storage/ndb/src/kernel/blocks/suma/SumaInit.cpp@stripped, 2008-02-11 14:24:16+01:00, mskold@stripped +1 -0
    WL#4083 Resource shortage handling in event mechanism

  storage/ndb/src/mgmsrv/ConfigInfo.cpp@stripped, 2008-02-11 14:24:16+01:00, mskold@stripped +12 -0
    WL#4083 Resource shortage handling in event mechanism: Added MAX_BUFFERED_GCP that specifies how many GCI's are buffered before subscribers are disconnected.

  storage/ndb/src/ndbapi/Ndb.cpp@stripped, 2008-02-11 14:24:16+01:00, mskold@stripped +12 -0
    WL#4083 Resource shortage handling in event mechanism: Added methods for checking GCI consistency.

  storage/ndb/src/ndbapi/NdbEventOperationImpl.cpp@stripped, 2008-02-11 14:24:16+01:00, mskold@stripped +87 -8
    WL#4083 Resource shortage handling in event mechanism: Added methods for checking GCI consistency.

  storage/ndb/src/ndbapi/NdbEventOperationImpl.hpp@stripped, 2008-02-11 14:24:16+01:00, mskold@stripped +19 -4
    WL#4083 Resource shortage handling in event mechanism: Added methods for checking GCI consistency.

  storage/ndb/test/ndbapi/test_event.cpp@stripped, 2008-02-11 14:24:16+01:00, mskold@stripped +185 -0
    WL#4083 Resource shortage handling in event mechanism: Added test cases for testing Suma buffer overflow and disconnect of lagging subscribers.

diff -Nrup a/sql/ha_ndbcluster_binlog.cc b/sql/ha_ndbcluster_binlog.cc
--- a/sql/ha_ndbcluster_binlog.cc	2007-12-11 20:53:05 +01:00
+++ b/sql/ha_ndbcluster_binlog.cc	2008-02-11 14:24:16 +01:00
@@ -4593,6 +4593,20 @@ restart:
           Uint32 iter= 0;
           const NdbEventOperation *gci_op;
           Uint32 event_types;
+
+          if (!i_ndb->isConsistentGCI(gci))
+          {
+            /* 50 fixed chars + up to 20 digits + NUL need more than 64 bytes */
+            char errmsg[80];
+            uint end= sprintf(&errmsg[0],
+                              "Detected missing data in GCI %llu, "
+                              "inserting GAP event", gci);
+            DBUG_PRINT("info",
+                       ("Detected missing data in GCI %llu, "
+                        "inserting GAP event", gci));
+            LEX_STRING const msg= { errmsg, end }; /* actual text length */
+            inj->record_incident(thd, INCIDENT_LOST_EVENTS, msg);
+          }
           while ((gci_op= i_ndb->getGCIEventOperations(&iter, &event_types))
                  != NULL)
           {
@@ -4816,6 +4830,19 @@ restart:
                               gci_timer.elapsed_ms(),
                               (1000*event_count) / gci_timer.elapsed_ms());
 #endif
+      }
+      if(!i_ndb->isConsistent(gci))
+      {
+        /* 50 fixed chars + up to 20 digits + NUL need more than 64 bytes */
+        char errmsg[80];
+        uint end= sprintf(&errmsg[0],
+                          "Detected missing data in GCI %llu, "
+                          "inserting GAP event", gci);
+        DBUG_PRINT("info",
+                   ("Detected missing data in GCI %llu, "
+                    "inserting GAP event", gci));
+        LEX_STRING const msg= { errmsg, end }; /* actual text length */
+        inj->record_incident(thd, INCIDENT_LOST_EVENTS, msg);
       }
     }
 
diff -Nrup a/storage/ndb/include/kernel/signaldata/SumaImpl.hpp b/storage/ndb/include/kernel/signaldata/SumaImpl.hpp
--- a/storage/ndb/include/kernel/signaldata/SumaImpl.hpp	2007-09-05 15:19:56 +02:00
+++ b/storage/ndb/include/kernel/signaldata/SumaImpl.hpp	2008-02-11 14:24:16 +01:00
@@ -293,7 +293,7 @@ struct SubTableData {
   SECTION( AFTER_VALUES = 1 );
   SECTION( BEFORE_VALUES = 2 );
   
-  enum LogType {
+  enum Flags {
     SCAN = 1, 
     LOG  = 2,
     REMOVE_FLAGS = 0xff
@@ -303,7 +303,7 @@ struct SubTableData {
   Uint32 gci_hi;
   Uint32 tableId;
   Uint32 requestInfo;
-  Uint32 logType;
+  Uint32 flags;
   union {
     Uint32 changeMask;
     Uint32 anyValue;
@@ -393,6 +393,7 @@ struct SubGcpCompleteRep {
   STATIC_CONST( SignalLength = 5 );
   STATIC_CONST( ON_DISK = 1 );
   STATIC_CONST( IN_MEMORY = 2 );
+  STATIC_CONST( MISSING_DATA = 4 );
 
   Uint32 gci_hi;
   Uint32 senderRef;
diff -Nrup a/storage/ndb/include/mgmapi/mgmapi_config_parameters.h b/storage/ndb/include/mgmapi/mgmapi_config_parameters.h
--- a/storage/ndb/include/mgmapi/mgmapi_config_parameters.h	2007-09-28 13:55:10 +02:00
+++ b/storage/ndb/include/mgmapi/mgmapi_config_parameters.h	2008-02-11 14:24:16 +01:00
@@ -123,6 +123,8 @@
 #define CFG_DB_MICRO_GCP_INTERVAL     170 /* micro gcp */
 #define CFG_DB_MICRO_GCP_TIMEOUT      171
 
+#define CFG_DB_MAX_BUFFERED_GCP       180 /* subscriptions */
+
 #define CFG_DB_SGA                    198 /* super pool mem */
 #define CFG_DB_DATA_MEM_2             199 /* used in special build in 5.1 */
 
diff -Nrup a/storage/ndb/include/mgmapi/ndb_logevent.h b/storage/ndb/include/mgmapi/ndb_logevent.h
--- a/storage/ndb/include/mgmapi/ndb_logevent.h	2007-06-12 07:03:31 +02:00
+++ b/storage/ndb/include/mgmapi/ndb_logevent.h	2008-02-11 14:24:16 +01:00
@@ -185,12 +185,16 @@ extern "C" {
     /* 59 used */
 
     /** NDB_MGM_EVENT_CATEGORY_STARTUP */
-    NDB_LE_StartReport = 60
+    NDB_LE_StartReport = 60,
 
     /* 61 (used in upcoming patch) */
     /* 62-67 used */
-    /* 68 unused */
+    /* 68 (used in upcoming patch) */
 
+    /** NDB_MGM_EVENT_SEVERITY_WARNING */
+    NDB_LE_SubscriptionStatus = 69
+
+    /* 70 unused */
   };
 
   /**
@@ -714,6 +718,11 @@ extern "C" {
 	unsigned bitmask_size;
 	unsigned bitmask_data[1];
       } StartReport;
+      /** Log event data @ref NDB_LE_SubscriptionStatus */
+      struct {
+        unsigned report_type;
+	unsigned node_id;
+      } SubscriptionStatus;
 #ifndef DOXYGEN_FIX
     };
 #else
diff -Nrup a/storage/ndb/include/ndbapi/Ndb.hpp b/storage/ndb/include/ndbapi/Ndb.hpp
--- a/storage/ndb/include/ndbapi/Ndb.hpp	2007-11-02 23:46:16 +01:00
+++ b/storage/ndb/include/ndbapi/Ndb.hpp	2008-02-11 14:24:16 +01:00
@@ -1256,6 +1256,31 @@ public:
   NdbEventOperation *nextEvent();
 
   /**
+   * Check if all events are consistent.
+   * If a node failure occurs during resource exhaustion, events
+   * may be lost and the delivered event data might thus be incomplete.
+   *
+   * @param[out] gci
+   *        set to an inconsistent GCI if one is found
+   *
+   * @return true if all received events are consistent, false if a
+   *         possible inconsistency was found
+   */
+  bool isConsistent(Uint64& gci);
+
+  /**
+   * Check if all events in a GCI are consistent.
+   * If a node failure occurs during resource exhaustion, events
+   * may be lost and the delivered event data might thus be incomplete.
+   *
+   * @param gci
+   *        the GCI to check
+   *
+   * @return true if the GCI is consistent, false if possibly inconsistent
+   */
+  bool isConsistentGCI(Uint64 gci);
+
+  /**
    * Iterate over distinct event operations which are part of current
    * GCI.  Valid after nextEvent.  Used to get summary information for
    * the epoch (e.g. list of all tables) before processing event data.
diff -Nrup a/storage/ndb/src/common/debugger/EventLogger.cpp b/storage/ndb/src/common/debugger/EventLogger.cpp
--- a/storage/ndb/src/common/debugger/EventLogger.cpp	2007-10-08 15:28:10 +02:00
+++ b/storage/ndb/src/common/debugger/EventLogger.cpp	2008-02-11 14:24:16 +01:00
@@ -990,6 +990,26 @@ void getTextStartReport(QQQQ) {
   }
 }
 
+// Text for NDB_LE_SubscriptionStatus: theData[1] picks the report type and
+// theData[3], theData[4] are combined by make_uint64() into the logged GCI.
+void getTextSubscriptionStatus(QQQQ)
+{
+  switch(theData[1]) {
+  case(1): // SubscriptionStatus::DISCONNECTED, theData[2] is the node id
+    BaseString::snprintf(m_text, m_text_len,
+                         "Disconnecting node %u because it has "
+                         "exceeded MaxBufferedEpochs, gci %llu",
+                         theData[2], make_uint64(theData[3], theData[4]));
+    break;
+  case(2): // SubscriptionStatus::INCONSISTENT
+    BaseString::snprintf(m_text, m_text_len,
+                         "Nodefailure while out of event buffer: "
+                         "informing subscribers of possibly missing event data"
+                         ", gci %llu", make_uint64(theData[3], theData[4]));
+    break;
+  }
+}
+
 #if 0
 BaseString::snprintf(m_text, 
 		     m_text_len, 
@@ -1071,6 +1091,7 @@ const EventLoggerBase::EventRepLogLevelM
   ROW(MissedHeartbeat,         LogLevel::llError,  8, Logger::LL_WARNING ),
   ROW(DeadDueToHeartbeat,      LogLevel::llError,  8, Logger::LL_ALERT   ),
   ROW(WarningEvent,            LogLevel::llError,  2, Logger::LL_WARNING ),
+  ROW(SubscriptionStatus,      LogLevel::llError,  4, Logger::LL_WARNING ),
   // INFO
   ROW(SentHeartbeat,           LogLevel::llInfo,  12, Logger::LL_INFO ),
   ROW(CreateLogBytes,          LogLevel::llInfo,  11, Logger::LL_INFO ),
diff -Nrup a/storage/ndb/src/kernel/blocks/ERROR_codes.txt b/storage/ndb/src/kernel/blocks/ERROR_codes.txt
--- a/storage/ndb/src/kernel/blocks/ERROR_codes.txt	2007-12-03 09:44:13 +01:00
+++ b/storage/ndb/src/kernel/blocks/ERROR_codes.txt	2008-02-11 14:24:16 +01:00
@@ -11,7 +11,7 @@ Next CMVMI 9000
 Next BACKUP 10038
 Next DBUTIL 11002
 Next DBTUX 12008
-Next SUMA 13034
+Next SUMA 13037
 
 TESTING NODE FAILURE, ARBITRATION
 ---------------------------------
@@ -567,3 +567,8 @@ NDBCNTR:
 
 1000: Crash insertion on SystemError::CopyFragRef
 1001: Delay sending NODE_FAILREP (to own node), until error is cleared
+
+SUMA:
+13034: Simulate report MISSING_DATA at node failure
+13035: Simulate disconnect lagging subscribers
+13036: Simulate out of event buffer
diff -Nrup a/storage/ndb/src/kernel/blocks/suma/Suma.cpp b/storage/ndb/src/kernel/blocks/suma/Suma.cpp
--- a/storage/ndb/src/kernel/blocks/suma/Suma.cpp	2007-12-03 14:32:38 +01:00
+++ b/storage/ndb/src/kernel/blocks/suma/Suma.cpp	2008-02-11 14:24:16 +01:00
@@ -165,11 +165,13 @@ Suma::execREAD_CONFIG_REQ(Signal* signal
   ndbrequire(p != 0);
 
   // SumaParticipant
-  Uint32 noTables, noAttrs;
+  Uint32 noTables, noAttrs, maxBufferedGcp;
   ndb_mgm_get_int_parameter(p, CFG_DB_NO_TABLES,  
 			    &noTables);
   ndb_mgm_get_int_parameter(p, CFG_DB_NO_ATTRIBUTES,  
 			    &noAttrs);
+  ndb_mgm_get_int_parameter(p, CFG_DB_MAX_BUFFERED_GCP,
+                            &maxBufferedGcp);
 
   c_tablePool.setSize(noTables);
   c_tables.setSize(noTables);
@@ -181,6 +183,8 @@ Suma::execREAD_CONFIG_REQ(Signal* signal
   c_syncPool.setSize(2);
   c_dataBufferPool.setSize(noAttrs);
 
+  c_maxBufferedGcp = maxBufferedGcp;
+
   // Calculate needed gcp pool as 10 records + the ones needed
   // during a possible api timeout
   Uint32 dbApiHbInterval, gcpInterval;
@@ -225,7 +229,8 @@ Suma::execREAD_CONFIG_REQ(Signal* signal
   m_last_complete_gci = 0; // SUB_GCP_COMPLETE_REP
   m_gcp_complete_rep_count = 0;
   m_out_of_buffer_gci = 0;
- 
+  m_missing_data = false;
+
   c_startup.m_wait_handover= false; 
   c_failedApiNodes.clear();
 
@@ -3518,7 +3523,7 @@ Suma::execFIRE_TRIG_ORD(Signal* signal)
     data->tableId        = tableId;
     data->requestInfo    = 0;
     SubTableData::setOperation(data->requestInfo, event);
-    data->logType        = 0;
+    data->flags          = 0;
     data->anyValue       = any_value;
     data->totalLen       = ptrLen;
     
@@ -3556,6 +3561,77 @@ Suma::execFIRE_TRIG_ORD(Signal* signal)
 }
 
 void
+Suma::checkMaxBufferedGCP(Signal *signal)
+{
+  /* Disconnect subscriber nodes when m_max_seen_gci is at least
+   * c_maxBufferedGcp (MaxBufferedEpochs) ahead of a record in c_gcp_list.
+   * NOTE(review): unsigned subtraction assumes m_gci <= m_max_seen_gci. */
+  jamEntry();
+  if (c_gcp_list.isEmpty())
+  {
+    jam();
+    return;
+  }
+  Ptr<Gcp_record> gcp;
+  c_gcp_list.first(gcp);
+  if (ERROR_INSERTED(13035))
+  {
+    jam();
+    CLEAR_ERROR_INSERT_VALUE;
+    ndbout_c("Simulating exceeding the MaxBufferedEpochs %u(%llu,%llu,%llu)",
+            c_maxBufferedGcp, m_max_seen_gci,
+            m_last_complete_gci, gcp.p->m_gci);
+    c_maxBufferedGcp = 1; // not restored in this function
+  }
+  if (m_max_seen_gci - gcp.p->m_gci >= (Uint64) c_maxBufferedGcp)
+  {
+    NodeBitmask subs = c_subscriber_nodes;
+    jam();
+    // 'subs' is a local copy: a node's bit is cleared once it is handled
+    for(; !gcp.isNull(); c_gcp_list.next(gcp))
+    {
+      jam();
+      if (m_max_seen_gci - gcp.p->m_gci >= (Uint64) c_maxBufferedGcp)
+      {
+        jam();
+        for(Uint32 nodeId = 0; nodeId < MAX_NODES; nodeId++)
+        {
+          if (subs.get(nodeId))
+          {
+           jam();
+           subs.clear(nodeId);
+           // Log event words: type, node id, GCI low word, GCI high word
+           signal->theData[0] = NDB_LE_SubscriptionStatus;
+           signal->theData[1] = 1; // DISCONNECTED;
+           signal->theData[2] = nodeId;
+           signal->theData[3] = (Uint32) gcp.p->m_gci;
+           signal->theData[4] = (Uint32) (gcp.p->m_gci >> 32);
+           sendSignal(CMVMI_REF, GSN_EVENT_REP, signal, 5, JBB);
+           infoEvent("MaxBufferedEpochs %llu exceeded %u",
+                     m_max_seen_gci - gcp.p->m_gci,
+                     c_maxBufferedGcp);
+           // NOTE(review): no per-node lag test is visible here - confirm
+            /**
+             * Force API_FAILREQ: direct call into QMGR for this node
+            */
+           signal->theData[0] = nodeId;
+            EXECUTE_DIRECT(QMGR, GSN_API_FAILREQ, signal, 1);
+          }
+        }
+      }
+      else
+      {
+        /*
+         * This record is still within the allowed lag, so stop
+         * scanning (presumably later records are newer - verify)
+         */
+        break;
+      }
+    }
+  }
+}
+
+void
 Suma::execSUB_GCP_COMPLETE_REP(Signal* signal)
 {
   jamEntry();
@@ -3565,7 +3641,17 @@ Suma::execSUB_GCP_COMPLETE_REP(Signal* s
   Uint32 gci_hi = rep->gci_hi;
   Uint32 gci_lo = rep->gci_lo;
   Uint64 gci = gci_lo | (Uint64(gci_hi) << 32);
-  Uint32 flags = rep->flags;
+  Uint32 flags = (m_missing_data)
+                 ? rep->flags | SubGcpCompleteRep::MISSING_DATA
+                 : rep->flags;
+
+  if (ERROR_INSERTED(13034))
+  {
+    jam();
+    CLEAR_ERROR_INSERT_VALUE;
+    ndbout_c("Simulating out of event buffer at node failure");
+    flags |= SubGcpCompleteRep::MISSING_DATA;
+  }
 
 #ifdef VM_TRACE
   if (m_gcp_monitor == 0)
@@ -3584,6 +3670,7 @@ Suma::execSUB_GCP_COMPLETE_REP(Signal* s
 #endif
 
   m_last_complete_gci = gci;
+  checkMaxBufferedGCP(signal);
   m_max_seen_gci = (gci > m_max_seen_gci ? gci : m_max_seen_gci);
 
   /**
@@ -3718,6 +3805,7 @@ Suma::execSUB_GCP_COMPLETE_REP(Signal* s
   {
     infoEvent("Reenable event buffer");
     m_out_of_buffer_gci = 0;
+    m_missing_data = false;
   }
 }
 
@@ -3868,7 +3956,7 @@ Suma::execALTER_TAB_REQ(Signal *signal)
   SubTableData::setOperation(data->requestInfo, 
 			     NdbDictionary::Event::_TE_ALTER);
   SubTableData::setReqNodeId(data->requestInfo, refToNode(senderRef));
-  data->logType        = 0;
+  data->flags          = 0;
   data->changeMask     = changeMask;
   data->totalLen       = tabInfoPtr.sz;
   {
@@ -3920,6 +4008,13 @@ Suma::execSUB_GCP_COMPLETE_ACK(Signal* s
   Uint64 gci = gci_lo | (Uint64(gci_hi) << 32);
   m_max_seen_gci = (gci > m_max_seen_gci ? gci : m_max_seen_gci);
 
+  if (ERROR_INSERTED(13035))
+  {
+    jam();
+    ndbout_c("Simulating exceeding the MaxBufferedEpochs, ignoring ack");
+    return;
+  }
+
   if (refToBlock(senderRef) == SUMA) {
     jam();
     // Ack from other SUMA
@@ -4763,7 +4858,7 @@ Suma::out_of_buffer(Signal* signal)
   
   m_out_of_buffer_gci = m_last_complete_gci - 1;
   infoEvent("Out of event buffer: nodefailure will cause event failures");
-
+  m_missing_data = false;
   out_of_buffer_release(signal, 0);
 }
 
@@ -4805,11 +4900,19 @@ Suma::out_of_buffer_release(Signal* sign
    */
   m_out_of_buffer_gci = m_max_seen_gci > m_last_complete_gci 
     ? m_max_seen_gci : m_last_complete_gci;
+  m_missing_data = false;
 }
 
 Uint32
 Suma::seize_page()
 {
+  if (ERROR_INSERTED(13036))
+  {
+    jam();
+    CLEAR_ERROR_INSERT_VALUE;
+    ndbout_c("Simulating out of event buffer");
+    m_out_of_buffer_gci = m_max_seen_gci;
+  }
   if(unlikely(m_out_of_buffer_gci))
   {
     return RNIL;
@@ -4941,20 +5044,26 @@ void
 Suma::start_resend(Signal* signal, Uint32 buck)
 {
   printf("start_resend(%d, ", buck);
-  
-  if(m_out_of_buffer_gci)
-  {
-    progError(__LINE__, NDBD_EXIT_SYSTEM_ERROR, 
-	      "Nodefailure while out of event buffer");
-    return;
-  }
-  
+
   /**
    * Resend from m_max_acked_gci + 1 until max_gci + 1
    */
   Bucket* bucket= c_buckets + buck;
   Page_pos pos= bucket->m_buffer_head;
 
+  if(m_out_of_buffer_gci)
+  {
+    Ptr<Gcp_record> gcp;
+    c_gcp_list.last(gcp); // NOTE(review): gcp.p is dereferenced unchecked below
+    signal->theData[0] = NDB_LE_SubscriptionStatus;
+    signal->theData[1] = 2; // INCONSISTENT; word 2 is not read for this type
+    signal->theData[3] = (Uint32) pos.m_max_gci;
+    signal->theData[4] = (Uint32) (gcp.p->m_gci >> 32);
+    sendSignal(CMVMI_REF, GSN_EVENT_REP, signal, 5, JBB); // covers theData[3..4]
+    m_missing_data = true;
+    return;
+  }
+
   if(pos.m_page_id == RNIL)
   {
     jam();
@@ -5087,9 +5196,19 @@ Suma::resend_bucket(Signal* signal, Uint
       SubGcpCompleteRep * rep = (SubGcpCompleteRep*)signal->getDataPtrSend();
       rep->gci_hi = last_gci >> 32;
       rep->gci_lo = last_gci & 0xFFFFFFFF;
-      rep->flags = 0;
+      rep->flags = (m_missing_data)
+                   ? SubGcpCompleteRep::MISSING_DATA
+                   : 0;
       rep->senderRef  = reference();
       rep->gcp_complete_rep_count = 1;
+
+      if (ERROR_INSERTED(13034))
+      {
+        jam();
+        CLEAR_ERROR_INSERT_VALUE;
+        ndbout_c("Simulating out of event buffer at node failure");
+        rep->flags |= SubGcpCompleteRep::MISSING_DATA;
+      }
   
       char buf[255];
       c_subscriber_nodes.getText(buf);
@@ -5135,7 +5254,7 @@ Suma::resend_bucket(Signal* signal, Uint
 	data->tableId        = table;
 	data->requestInfo    = 0;
 	SubTableData::setOperation(data->requestInfo, event);
-	data->logType        = 0;
+	data->flags          = 0;
 	data->anyValue       = any_value;
 	data->totalLen       = ptrLen;
 	
diff -Nrup a/storage/ndb/src/kernel/blocks/suma/Suma.hpp b/storage/ndb/src/kernel/blocks/suma/Suma.hpp
--- a/storage/ndb/src/kernel/blocks/suma/Suma.hpp	2007-09-05 15:19:57 +02:00
+++ b/storage/ndb/src/kernel/blocks/suma/Suma.hpp	2008-02-11 14:24:16 +01:00
@@ -349,6 +349,8 @@ public:
   ArrayPool<SyncRecord> c_syncPool;
   DataBuffer<15>::DataBufferPool c_dataBufferPool;
 
+  Uint32 c_maxBufferedGcp;
+
   NodeBitmask c_failedApiNodes;
   
   /**
@@ -600,11 +602,14 @@ private:
 
   Uint64 get_current_gci(Signal*);
 
+  void checkMaxBufferedGCP(Signal *signal);
+
   Uint64 m_max_seen_gci;      // FIRE_TRIG_ORD
   Uint64 m_max_sent_gci;      // FIRE_TRIG_ORD -> send
   Uint64 m_last_complete_gci; // SUB_GCP_COMPLETE_REP
   Uint64 m_out_of_buffer_gci;
   Uint32 m_gcp_complete_rep_count;
+  bool m_missing_data;
 
   struct Gcp_record 
   {
diff -Nrup a/storage/ndb/src/kernel/blocks/suma/SumaInit.cpp b/storage/ndb/src/kernel/blocks/suma/SumaInit.cpp
--- a/storage/ndb/src/kernel/blocks/suma/SumaInit.cpp	2007-09-05 15:19:57 +02:00
+++ b/storage/ndb/src/kernel/blocks/suma/SumaInit.cpp	2008-02-11 14:24:16 +01:00
@@ -127,6 +127,7 @@ Suma::Suma(Block_context& ctx) :
 #ifdef VM_TRACE
   m_gcp_monitor = 0;
 #endif
+  m_missing_data = false;
 }
 
 Suma::~Suma()
diff -Nrup a/storage/ndb/src/mgmsrv/ConfigInfo.cpp b/storage/ndb/src/mgmsrv/ConfigInfo.cpp
--- a/storage/ndb/src/mgmsrv/ConfigInfo.cpp	2007-09-28 13:55:10 +02:00
+++ b/storage/ndb/src/mgmsrv/ConfigInfo.cpp	2008-02-11 14:24:16 +01:00
@@ -917,6 +917,18 @@ const ConfigInfo::ParamInfo ConfigInfo::
     "32000" },
 
   {
+    CFG_DB_MAX_BUFFERED_GCP,
+    "MaxBufferedEpochs",
+    DB_TOKEN,
+    "Allowed numbered of epochs that a subscribing node can lag behind (unprocessed epochs).  Exceeding will cause lagging subscribers to be disconnected.",
+    ConfigInfo::CI_USED,
+    true,
+    ConfigInfo::CI_INT,
+    "100",
+    "0",
+    "100000" },
+
+  {
     CFG_DB_NO_REDOLOG_FILES,
     "NoOfFragmentLogFiles",
     DB_TOKEN,
diff -Nrup a/storage/ndb/src/ndbapi/Ndb.cpp b/storage/ndb/src/ndbapi/Ndb.cpp
--- a/storage/ndb/src/ndbapi/Ndb.cpp	2007-09-07 12:30:54 +02:00
+++ b/storage/ndb/src/ndbapi/Ndb.cpp	2008-02-11 14:24:16 +01:00
@@ -1817,6 +1817,18 @@ NdbEventOperation *Ndb::nextEvent()
   return theEventBuffer->nextEvent();
 }
 
+bool
+Ndb::isConsistent(Uint64& gci) // forwards to theEventBuffer->isConsistent()
+{
+  return theEventBuffer->isConsistent(gci);
+}
+
+bool
+Ndb::isConsistentGCI(Uint64 gci) // forwards to theEventBuffer->isConsistentGCI()
+{
+  return theEventBuffer->isConsistentGCI(gci);
+}
+
 const NdbEventOperation*
 Ndb::getGCIEventOperations(Uint32* iter, Uint32* event_types)
 {
diff -Nrup a/storage/ndb/src/ndbapi/NdbEventOperationImpl.cpp b/storage/ndb/src/ndbapi/NdbEventOperationImpl.cpp
--- a/storage/ndb/src/ndbapi/NdbEventOperationImpl.cpp	2007-12-11 16:05:16 +01:00
+++ b/storage/ndb/src/ndbapi/NdbEventOperationImpl.cpp	2008-02-11 14:24:16 +01:00
@@ -1270,9 +1270,18 @@ NdbEventBuffer::nextEvent()
 #endif
 
   EventBufData *data;
+  Uint64 gci;
   while ((data= m_available_data.m_head))
   {
     NdbEventOperationImpl *op= data->m_event_op;
+
+    /*
+     * The data was not associated with an event operation,
+     * possibly a dummy event list marking missing data
+     */
+    if (!op && !isConsistent(gci))
+      DBUG_RETURN_EVENT(0);
+
     DBUG_PRINT_EVENT("info", ("available data=%p op=%p", data, op));
 
     /*
@@ -1315,8 +1324,10 @@ NdbEventBuffer::nextEvent()
            // moved to next gci, check if any references have been
            // released when completing the last gci
            deleteUsedEventOperations();
-           gci_ops = m_available_data.next_gci_ops();
+           gci_ops = m_available_data.delete_next_gci_ops();
          }
+         if (!gci_ops->m_consistent)
+           DBUG_RETURN_EVENT(0);
          assert(gci_ops && (op->getGCI() == gci_ops->m_gci));
          // to return TE_NUL it should be made into data event
          if (SubTableData::getOperation(data->sdata->requestInfo) ==
@@ -1347,11 +1358,45 @@ NdbEventBuffer::nextEvent()
   while (gci_ops)
   {
     deleteUsedEventOperations();
-    gci_ops = m_available_data.next_gci_ops();
+    gci_ops = m_available_data.delete_next_gci_ops();
   }
   DBUG_RETURN_EVENT(0);
 }
 
+bool
+NdbEventBuffer::isConsistent(Uint64& gci)
+{
+  // Scan the per-GCI op lists of m_available_data; the first one flagged
+  // inconsistent is reported through 'gci' and ends the search.
+  DBUG_ENTER("NdbEventBuffer::isConsistent");
+  for (EventBufData_list::Gci_ops *cur = m_available_data.first_gci_ops();
+       cur != NULL;
+       cur = cur->m_next)
+  {
+    if (cur->m_consistent)
+      continue;
+    gci = cur->m_gci;
+    DBUG_RETURN(false);
+  }
+  DBUG_RETURN(true);
+}
+
+bool
+NdbEventBuffer::isConsistentGCI(Uint64 gci)
+{
+  // False only if an op list whose m_gci equals 'gci' is flagged inconsistent
+  DBUG_ENTER("NdbEventBuffer::isConsistentGCI");
+  for (EventBufData_list::Gci_ops *cur = m_available_data.first_gci_ops();
+       cur != NULL;
+       cur = cur->m_next)
+  {
+    if (!cur->m_consistent && cur->m_gci == gci)
+      DBUG_RETURN(false);
+  }
+  DBUG_RETURN(true);
+}
+
+
 NdbEventOperationImpl*
 NdbEventBuffer::getGCIEventOperations(Uint32* iter, Uint32* event_types)
 {
@@ -1738,6 +1783,35 @@ NdbEventBuffer::complete_bucket(Gci_cont
     assert(bucket->m_data.m_count);
 #endif
     m_complete_data.m_data.append_list(&bucket->m_data, gci);
+    if (bucket->m_state & Gci_container::GC_INCONSISTENT)
+    {
+      /*
+       * Bucket marked as possibly missing data, probably due to
+       * kernel running out of event_buffer during node failure.
+       * Mark newly appended event list as inconsistent.
+       */
+      assert(m_complete_data.m_data.m_gci_ops_list_tail != NULL);
+      m_complete_data.m_data.m_gci_ops_list_tail->m_consistent = false;
+    }
+  }
+  else // if (bucket->m_data.is_empty())
+  {
+    if (bucket->m_state & Gci_container::GC_INCONSISTENT)
+    {
+      /*
+       * Bucket marked as possibly missing data, probably due to
+       * kernel running out of event_buffer during node failure
+       * Bucket contained no data so we must add a dummy event list
+       * as inconsistency marker.
+       */
+      EventBufData *dummy_data= alloc_data();
+      EventBufData_list *dummy_event_list = new EventBufData_list;
+      dummy_event_list->append_used_data(dummy_data);
+      dummy_event_list->m_is_not_multi_list = true;
+      m_complete_data.m_data.append_list(dummy_event_list, gci);
+      assert(m_complete_data.m_data.m_gci_ops_list_tail != NULL);
+      m_complete_data.m_data.m_gci_ops_list_tail->m_consistent = false;
+    }
   }
 
   Uint32 minpos = m_min_gci_index;
@@ -1803,6 +1877,11 @@ NdbEventBuffer::execSUB_GCP_COMPLETE_REP
     DBUG_VOID_RETURN_EVENT;
   }
 
+  if (rep->flags & SubGcpCompleteRep::MISSING_DATA)
+  {
+    bucket->m_state |= Gci_container::GC_INCONSISTENT;
+  }
+
   Uint32 old_cnt = bucket->m_gcp_complete_rep_count;
   if(unlikely(old_cnt == ~(Uint32)0))
   {
@@ -1852,7 +1931,7 @@ NdbEventBuffer::execSUB_GCP_COMPLETE_REP
                          Uint32(gci >> 32), Uint32(gci),
                          Uint32(minGCI >> 32), Uint32(minGCI),
                          Uint32(m_latestGCI >> 32), Uint32(m_latestGCI));
-      bucket->m_state = Gci_container::GC_COMPLETE;
+      bucket->m_state |= Gci_container::GC_COMPLETE;
       bucket->m_gcp_complete_rep_count = 1; // Prevent from being reused
       m_latest_complete_GCI = gci;
     }
@@ -1886,7 +1965,7 @@ NdbEventBuffer::complete_outof_order_gci
     Gci_container* bucket = find_bucket(start_gci);
     assert(bucket);
     assert(maxpos == m_max_gci_index);
-    if (bucket->m_state != Gci_container::GC_COMPLETE)
+    if (!(bucket->m_state & Gci_container::GC_COMPLETE)) // Not complete
     {
 #ifdef VM_TRACE
       verify_known_gci(false);
@@ -1984,7 +2063,7 @@ NdbEventBuffer::report_node_connected(Ui
 			     NdbDictionary::Event::_TE_ACTIVE);
   SubTableData::setReqNodeId(data.requestInfo, node_id);
   SubTableData::setNdbdNodeId(data.requestInfo, node_id);
-  data.logType = SubTableData::LOG;
+  data.flags = SubTableData::LOG;
 
   Uint64 gci = Uint64((m_latestGCI >> 32) + 1) << 32;
   find_max_known_gci(&gci);
@@ -2019,7 +2098,7 @@ NdbEventBuffer::report_node_failure(Uint
 			     NdbDictionary::Event::_TE_NODE_FAILURE);
   SubTableData::setReqNodeId(data.requestInfo, node_id);
   SubTableData::setNdbdNodeId(data.requestInfo, node_id);
-  data.logType = SubTableData::LOG;
+  data.flags = SubTableData::LOG;
 
   Uint64 gci = Uint64((m_latestGCI >> 32) + 1) << 32;
   find_max_known_gci(&gci);
@@ -2090,7 +2169,7 @@ NdbEventBuffer::completeClusterFailed()
   data.requestInfo = 0;
   SubTableData::setOperation(data.requestInfo,
 			     NdbDictionary::Event::_TE_CLUSTER_FAILURE);
-  data.logType = SubTableData::LOG;
+  data.flags = SubTableData::LOG;
   data.gci_hi = Uint32(gci >> 32);
   data.gci_lo = Uint32(gci);
 
@@ -3163,7 +3242,7 @@ NdbEventBuffer::reportStatus()
   Uint64 apply_gci, latest_gci= m_latestGCI;
   if (apply_buf == 0)
     apply_buf= m_complete_data.m_data.m_head;
-  if (apply_buf)
+  if (apply_buf && apply_buf->sdata)
   {
     Uint32 gci_hi = apply_buf->sdata->gci_hi;
     Uint32 gci_lo = apply_buf->sdata->gci_lo;
diff -Nrup a/storage/ndb/src/ndbapi/NdbEventOperationImpl.hpp b/storage/ndb/src/ndbapi/NdbEventOperationImpl.hpp
--- a/storage/ndb/src/ndbapi/NdbEventOperationImpl.hpp	2007-09-12 13:30:24 +02:00
+++ b/storage/ndb/src/ndbapi/NdbEventOperationImpl.hpp	2008-02-11 14:24:16 +01:00
@@ -139,7 +139,17 @@ public:
   };
   struct Gci_ops                // 2
   {
+    Gci_ops()
+      : m_gci(0),
+        m_consistent(true),
+        m_gci_op_list(NULL),
+        m_next(NULL),
+        m_gci_op_count(0)
+      {};
+    ~Gci_ops() {};
+
     Uint64 m_gci;
+    bool m_consistent;
     Gci_op *m_gci_op_list;
     Gci_ops *m_next;
     Uint32 m_gci_op_count;
@@ -160,7 +170,7 @@ public:
     Uint32 m_is_not_multi_list;  // 2
   };
   Gci_ops *first_gci_ops();
-  Gci_ops *next_gci_ops();
+  Gci_ops *delete_next_gci_ops();
   // case 1 above; add Gci_op to single list
   void add_gci_op(Gci_op g);
 private:
@@ -198,7 +208,7 @@ EventBufData_list::~EventBufData_list()
   {
     Gci_ops *op = first_gci_ops();
     while (op)
-      op = next_gci_ops();
+      op = delete_next_gci_ops();
   }
   DBUG_VOID_RETURN_EVENT;
 }
@@ -271,7 +281,7 @@ EventBufData_list::first_gci_ops()
 }
 
 inline EventBufData_list::Gci_ops *
-EventBufData_list::next_gci_ops()
+EventBufData_list::delete_next_gci_ops()
 {
   assert(!m_is_not_multi_list);
   Gci_ops *first = m_gci_ops_list;
@@ -320,8 +330,10 @@ struct Gci_container
 {
   enum State 
   {
-    GC_COMPLETE = 0x1 // GCI is complete, but waiting for out of order
+    GC_COMPLETE     = 0x1, // GCI is complete, but waiting for out of order
+    GC_INCONSISTENT = 0x2  // GCI might be missing event data
   };
+
   
   Uint16 m_state;
   Uint16 m_gcp_complete_rep_count; // Remaining SUB_GCP_COMPLETE_REP until done
@@ -530,6 +542,9 @@ public:
   int pollEvents(int aMillisecondNumber, Uint64 *latestGCI= 0);
   int flushIncompleteEvents(Uint64 gci);
   NdbEventOperation *nextEvent();
+  bool isConsistent(Uint64& gci);
+  bool isConsistentGCI(Uint64 gci);
+
   NdbEventOperationImpl* getGCIEventOperations(Uint32* iter,
                                                Uint32* event_types);
   void deleteUsedEventOperations();
diff -Nrup a/storage/ndb/test/ndbapi/test_event.cpp b/storage/ndb/test/ndbapi/test_event.cpp
--- a/storage/ndb/test/ndbapi/test_event.cpp	2007-12-04 07:25:27 +01:00
+++ b/storage/ndb/test/ndbapi/test_event.cpp	2008-02-11 14:24:16 +01:00
@@ -1847,6 +1847,177 @@ runBug31701(NDBT_Context* ctx, NDBT_Step
   return NDBT_OK;
 }
 
+// Test step: insert error 13034 in all nodes, poll for events and require
+// that Ndb::isConsistent() reports a gap. An NDB error after pollEvents is
+// tolerated only when its code is 4720. Returns NDBT_OK or NDBT_FAILED.
+int
+errorInjectBufferOverflow(NDBT_Context* ctx, NDBT_Step* step)
+{
+  Ndb *pNdb = GETNDB(step);
+  NdbRestarter restarter;
+  const NdbDictionary::Table *table = ctx->getTab();
+  NdbEventOperation *evOp = createEventOperation(pNdb, *table);
+  int status = NDBT_OK;
+  Uint64 gci;  // handed by reference to isConsistent(); never read here
+
+  if (evOp == 0)
+  {
+    g_err << "Failed to createEventOperation" << endl;
+    return NDBT_FAILED;
+  }
+
+  if (restarter.insertErrorInAllNodes(13034) != 0)
+  {
+    status = NDBT_FAILED;
+  }
+  else
+  {
+    const int polled = pNdb->pollEvents(5000);
+    if (pNdb->getNdbError().code != 0)
+    {
+      g_err << "pollEvents failed: \n";
+      g_err << pNdb->getNdbError().code << " "
+            << pNdb->getNdbError().message << endl;
+      status = (pNdb->getNdbError().code == 4720) ? NDBT_OK : NDBT_FAILED;
+    }
+    else
+    {
+      bool gap_seen = false;
+      // Consume events until the first inconsistency is reported.
+      while (polled >= 0 && !gap_seen && pNdb->nextEvent() != 0)
+        gap_seen = !pNdb->isConsistent(gci);
+
+      // Checked once more unconditionally, also when nothing was consumed.
+      if (!pNdb->isConsistent(gci))
+        gap_seen = true;
+
+      if (!gap_seen)
+      {
+        g_err << "buffer overflow not detected\n";
+        status = NDBT_FAILED;
+      }
+    }
+  }
+
+  // Common exit: reached on every path once the event operation exists.
+  if (pNdb->dropEventOperation(evOp) != 0) {
+    g_err << "dropping event operation failed\n";
+    status = NDBT_FAILED;
+  }
+
+  return status;
+}
+
+// Test step for a stalling (lagging) subscriber: inserts error 13035 in all
+// nodes, then expects the polled event stream to deliver TE_CLUSTER_FAILURE
+// on our own event operation. Afterwards it tries to reconnect by loading
+// the table, restarts the whole cluster and waits for it to come back.
+// Returns NDBT_FAILED if any stage failed, NDBT_OK otherwise.
+int
+errorInjectStalling(NDBT_Context* ctx, NDBT_Step* step)
+{
+  Ndb * ndb= GETNDB(step);
+  NdbRestarter restarter;
+  const NdbDictionary::Table* pTab = ctx->getTab();
+  NdbEventOperation *pOp= createEventOperation(ndb, *pTab);
+  int result = NDBT_OK;
+  int res;
+  bool connected = true;
+
+  if (pOp == 0)
+  {
+    g_err << "Failed to createEventOperation" << endl;
+    return NDBT_FAILED;
+  }
+
+  if (restarter.insertErrorInAllNodes(13035) != 0)
+  {
+    result = NDBT_FAILED;
+    goto cleanup;
+  }
+
+  res = ndb->pollEvents(5000);
+
+  if (ndb->getNdbError().code != 0)
+  {
+    g_err << "pollEvents failed: \n";
+    g_err << ndb->getNdbError().code << " "
+          << ndb->getNdbError().message << endl;
+    result = NDBT_FAILED;
+    goto cleanup;
+  }
+
+  // NOTE(review): when nothing is polled within the timeout the disconnect
+  // check below is skipped and the step passes - confirm this is intended.
+  if (res > 0)
+  {
+    NdbEventOperation *tmp;
+    while (connected && (tmp= ndb->nextEvent()))
+    {
+      if (tmp != pOp)
+      {
+        g_err << "Found stray NdbEventOperation\n";
+        result = NDBT_FAILED;
+        goto cleanup;
+      }
+      // Only the cluster-failure event matters; all others are skipped.
+      if (tmp->getEventType() == NdbDictionary::Event::TE_CLUSTER_FAILURE)
+        connected = false;
+    }
+    if (connected)
+    {
+      g_err << "failed to detect cluster disconnect\n";
+      result = NDBT_FAILED;
+      goto cleanup;
+    }
+  }
+
+cleanup:
+
+  if (ndb->dropEventOperation(pOp) != 0) {
+    g_err << "dropping event operation failed\n";
+    result = NDBT_FAILED;
+  }
+
+  // Reconnect by trying to start a transaction. Failures recorded above are
+  // kept: a successful reconnect must not reset result to NDBT_OK.
+  uint retries = 100;
+  while (!connected && retries--)
+  {
+    HugoTransactions hugoTrans(* ctx->getTab());
+    if (hugoTrans.loadTable(ndb, 100) == 0)
+      connected = true;
+    else
+      NdbSleep_MilliSleep(300);
+  }
+
+  if (!connected)
+  {
+    g_err << "Failed to reconnect\n";
+    result = NDBT_FAILED;
+  }
+
+  // Restart cluster with abort
+  if (restarter.restartAll(false, false, true) != 0){
+    ctx->stopTest();
+    return NDBT_FAILED;
+  }
+
+  // Stop the other thread
+  ctx->stopTest();
+
+  if (restarter.waitClusterStarted(300) != 0){
+    return NDBT_FAILED;
+  }
+
+  if (ndb->waitUntilReady() != 0){
+    return NDBT_FAILED;
+  }
+
+  return result;
+}
+
+
 NDBT_TESTSUITE(test_event);
 TESTCASE("BasicEventOperation", 
 	 "Verify that we can listen to Events"
@@ -1983,6 +2154,20 @@ TESTCASE("Bug31701", ""){
   STEP(runBug31701);
   FINALIZER(runDropEvent);
   FINALIZER(runDropShadowTable);
+}
+TESTCASE("EventBufferOverflow",
+         "Simulating EventBuffer overflow while node restart"
+         "NOTE! No errors are allowed!" ){ // adjacent literals are concatenated as-is
+  INITIALIZER(runCreateEvent);
+  STEP(errorInjectBufferOverflow); // inserts error 13034, expects a GCI gap
+  FINALIZER(runDropEvent);
+}
+TESTCASE("StallingSubscriber",
+         "Simulating slow subscriber that will become disconnected"
+         "NOTE! No errors are allowed!" ){ // adjacent literals are concatenated as-is
+  INITIALIZER(runCreateEvent);
+  STEP(errorInjectStalling); // inserts error 13035, expects TE_CLUSTER_FAILURE
+  FINALIZER(runDropEvent);
 }
 NDBT_TESTSUITE_END(test_event);
 
Thread
bk commit into 5.1 tree (mskold:1.2676) - Martin Skold, 11 Feb