Below is the list of changes that have just been committed into a local
5.1 repository of marty. When marty does a push these changes will
be propagated to the main repository and, within 24 hours after the
push, to the public repository.
For information on how to access the public repository
see http://dev.mysql.com/doc/mysql/en/installing-source-tree.html
ChangeSet@stripped, 2008-02-11 14:24:17+01:00, mskold@stripped +15 -0
WL#4083 Resource shortage handling in event mechanism: Adds handling of buffer overflow in Suma during node failure, and disconnect of lagging subscribers (subscribers that do not acknowledge GCPs fast enough) based on the new configuration parameter MAX_BUFFERED_GCP.
sql/ha_ndbcluster_binlog.cc@stripped, 2008-02-11 14:24:16+01:00, mskold@stripped +27 -0
WL#4083 Resource shortage handling in event mechanism: Added checks for missing data (inconsistent GCI)
storage/ndb/include/kernel/signaldata/SumaImpl.hpp@stripped, 2008-02-11 14:24:16+01:00, mskold@stripped +3 -2
WL#4083 Resource shortage handling in event mechanism: Added MISSING_DATA bit to signal an incomplete GCI
storage/ndb/include/mgmapi/mgmapi_config_parameters.h@stripped, 2008-02-11 14:24:16+01:00, mskold@stripped +2 -0
WL#4083 Resource shortage handling in event mechanism: Added new configuration parameter MAX_BUFFERED_GCP to define how many GCPs are buffered in Suma before unresponsive subscribers are disconnected
storage/ndb/include/mgmapi/ndb_logevent.h@stripped, 2008-02-11 14:24:16+01:00, mskold@stripped +11 -2
WL#4083 Resource shortage handling in event mechanism: Added new SubscriptionStatus event.
storage/ndb/include/ndbapi/Ndb.hpp@stripped, 2008-02-11 14:24:16+01:00, mskold@stripped +25 -0
WL#4083 Resource shortage handling in event mechanism: Added methods for checking GCI consistency.
storage/ndb/src/common/debugger/EventLogger.cpp@stripped, 2008-02-11 14:24:16+01:00, mskold@stripped +21 -0
WL#4083 Resource shortage handling in event mechanism: Added printout for change of subscription status log event.
storage/ndb/src/kernel/blocks/ERROR_codes.txt@stripped, 2008-02-11 14:24:16+01:00, mskold@stripped +6 -1
WL#4083 Resource shortage handling in event mechanism: Added new error inserts for simulating buffer overflow in Suma.
storage/ndb/src/kernel/blocks/suma/Suma.cpp@stripped, 2008-02-11 14:24:16+01:00, mskold@stripped +135 -16
WL#4083 Resource shortage handling in event mechanism: Added support for managing buffer overflow during node restarts by informing subscriber (will cause binlog injection thread to insert a GAP event). Added checking for non-responsive subscribers (exceeding MAX_BUFFERED_GCP) by disconnecting them.
storage/ndb/src/kernel/blocks/suma/Suma.hpp@stripped, 2008-02-11 14:24:16+01:00, mskold@stripped +5 -0
WL#4083 Resource shortage handling in event mechanism
storage/ndb/src/kernel/blocks/suma/SumaInit.cpp@stripped, 2008-02-11 14:24:16+01:00, mskold@stripped +1 -0
WL#4083 Resource shortage handling in event mechanism
storage/ndb/src/mgmsrv/ConfigInfo.cpp@stripped, 2008-02-11 14:24:16+01:00, mskold@stripped +12 -0
WL#4083 Resource shortage handling in event mechanism: Added MAX_BUFFERED_GCP that specifies how many GCIs are buffered before subscribers are disconnected.
storage/ndb/src/ndbapi/Ndb.cpp@stripped, 2008-02-11 14:24:16+01:00, mskold@stripped +12 -0
WL#4083 Resource shortage handling in event mechanism: Added methods for checking GCI consistency.
storage/ndb/src/ndbapi/NdbEventOperationImpl.cpp@stripped, 2008-02-11 14:24:16+01:00, mskold@stripped +87 -8
WL#4083 Resource shortage handling in event mechanism: Added methods for checking GCI consistency.
storage/ndb/src/ndbapi/NdbEventOperationImpl.hpp@stripped, 2008-02-11 14:24:16+01:00, mskold@stripped +19 -4
WL#4083 Resource shortage handling in event mechanism: Added methods for checking GCI consistency.
storage/ndb/test/ndbapi/test_event.cpp@stripped, 2008-02-11 14:24:16+01:00, mskold@stripped +185 -0
WL#4083 Resource shortage handling in event mechanism: Added test cases for testing Suma buffer overflow and disconnect of lagging subscribers.
diff -Nrup a/sql/ha_ndbcluster_binlog.cc b/sql/ha_ndbcluster_binlog.cc
--- a/sql/ha_ndbcluster_binlog.cc 2007-12-11 20:53:05 +01:00
+++ b/sql/ha_ndbcluster_binlog.cc 2008-02-11 14:24:16 +01:00
@@ -4593,6 +4593,20 @@ restart:
Uint32 iter= 0;
const NdbEventOperation *gci_op;
Uint32 event_types;
+
+ if (!i_ndb->isConsistentGCI(gci))
+ {
+ char errmsg[64];
+ uint end= sprintf(&errmsg[0],
+ "Detected missing data in GCI %llu, "
+ "inserting GAP event", gci);
+ errmsg[end]= '\0';
+ DBUG_PRINT("info",
+ ("Detected missing data in GCI %llu, "
+ "inserting GAP event", gci));
+ LEX_STRING const msg= { C_STRING_WITH_LEN(errmsg) };
+ inj->record_incident(thd, INCIDENT_LOST_EVENTS, msg);
+ }
while ((gci_op= i_ndb->getGCIEventOperations(&iter, &event_types))
!= NULL)
{
@@ -4816,6 +4830,19 @@ restart:
gci_timer.elapsed_ms(),
(1000*event_count) / gci_timer.elapsed_ms());
#endif
+ }
+ if(!i_ndb->isConsistent(gci))
+ {
+ char errmsg[64];
+ uint end= sprintf(&errmsg[0],
+ "Detected missing data in GCI %llu, "
+ "inserting GAP event", gci);
+ errmsg[end]= '\0';
+ DBUG_PRINT("info",
+ ("Detected missing data in GCI %llu, "
+ "inserting GAP event", gci));
+ LEX_STRING const msg= { C_STRING_WITH_LEN(errmsg) };
+ inj->record_incident(thd, INCIDENT_LOST_EVENTS, msg);
}
}
diff -Nrup a/storage/ndb/include/kernel/signaldata/SumaImpl.hpp b/storage/ndb/include/kernel/signaldata/SumaImpl.hpp
--- a/storage/ndb/include/kernel/signaldata/SumaImpl.hpp 2007-09-05 15:19:56 +02:00
+++ b/storage/ndb/include/kernel/signaldata/SumaImpl.hpp 2008-02-11 14:24:16 +01:00
@@ -293,7 +293,7 @@ struct SubTableData {
SECTION( AFTER_VALUES = 1 );
SECTION( BEFORE_VALUES = 2 );
- enum LogType {
+ enum Flags {
SCAN = 1,
LOG = 2,
REMOVE_FLAGS = 0xff
@@ -303,7 +303,7 @@ struct SubTableData {
Uint32 gci_hi;
Uint32 tableId;
Uint32 requestInfo;
- Uint32 logType;
+ Uint32 flags;
union {
Uint32 changeMask;
Uint32 anyValue;
@@ -393,6 +393,7 @@ struct SubGcpCompleteRep {
STATIC_CONST( SignalLength = 5 );
STATIC_CONST( ON_DISK = 1 );
STATIC_CONST( IN_MEMORY = 2 );
+ STATIC_CONST( MISSING_DATA = 4 );
Uint32 gci_hi;
Uint32 senderRef;
diff -Nrup a/storage/ndb/include/mgmapi/mgmapi_config_parameters.h b/storage/ndb/include/mgmapi/mgmapi_config_parameters.h
--- a/storage/ndb/include/mgmapi/mgmapi_config_parameters.h 2007-09-28 13:55:10 +02:00
+++ b/storage/ndb/include/mgmapi/mgmapi_config_parameters.h 2008-02-11 14:24:16 +01:00
@@ -123,6 +123,8 @@
#define CFG_DB_MICRO_GCP_INTERVAL 170 /* micro gcp */
#define CFG_DB_MICRO_GCP_TIMEOUT 171
+#define CFG_DB_MAX_BUFFERED_GCP 180 /* subscriptions */
+
#define CFG_DB_SGA 198 /* super pool mem */
#define CFG_DB_DATA_MEM_2 199 /* used in special build in 5.1 */
diff -Nrup a/storage/ndb/include/mgmapi/ndb_logevent.h b/storage/ndb/include/mgmapi/ndb_logevent.h
--- a/storage/ndb/include/mgmapi/ndb_logevent.h 2007-06-12 07:03:31 +02:00
+++ b/storage/ndb/include/mgmapi/ndb_logevent.h 2008-02-11 14:24:16 +01:00
@@ -185,12 +185,16 @@ extern "C" {
/* 59 used */
/** NDB_MGM_EVENT_CATEGORY_STARTUP */
- NDB_LE_StartReport = 60
+ NDB_LE_StartReport = 60,
/* 61 (used in upcoming patch) */
/* 62-67 used */
- /* 68 unused */
+ /* 68 (used in upcoming patch) */
+ /** NDB_MGM_EVENT_SEVERITY_WARNING */
+ NDB_LE_SubscriptionStatus = 69
+
+ /* 70 unused */
};
/**
@@ -714,6 +718,11 @@ extern "C" {
unsigned bitmask_size;
unsigned bitmask_data[1];
} StartReport;
+ /** Log event data @ref NDB_LE_SubscriptionStatus */
+ struct {
+ unsigned report_type;
+ unsigned node_id;
+ } SubscriptionStatus;
#ifndef DOXYGEN_FIX
};
#else
diff -Nrup a/storage/ndb/include/ndbapi/Ndb.hpp b/storage/ndb/include/ndbapi/Ndb.hpp
--- a/storage/ndb/include/ndbapi/Ndb.hpp 2007-11-02 23:46:16 +01:00
+++ b/storage/ndb/include/ndbapi/Ndb.hpp 2008-02-11 14:24:16 +01:00
@@ -1256,6 +1256,31 @@ public:
NdbEventOperation *nextEvent();
/**
+ * Check if all events are consistent
+ * If node failure occurs during resource exhaustion, events
+ * may be lost and the delivered event data might thus be incomplete.
+ *
+ * @param OUT aGCI
+ * any inconsistent GCI found
+ *
+ * @return true if all received events are consistent, false if possible
+ * inconsistency
+ */
+ bool isConsistent(Uint64& gci);
+
+ /**
+ * Check if all events in a GCI are consistent
+ * If node failure occurs during resource exhaustion, events
+ * may be lost and the delivered event data might thus be incomplete.
+ *
+ * @param aGCI
+ * the GCI to check
+ *
+ * @return true if GCI is consistent, false if possible inconsistency
+ */
+ bool isConsistentGCI(Uint64 gci);
+
+ /**
* Iterate over distinct event operations which are part of current
* GCI. Valid after nextEvent. Used to get summary information for
* the epoch (e.g. list of all tables) before processing event data.
diff -Nrup a/storage/ndb/src/common/debugger/EventLogger.cpp b/storage/ndb/src/common/debugger/EventLogger.cpp
--- a/storage/ndb/src/common/debugger/EventLogger.cpp 2007-10-08 15:28:10 +02:00
+++ b/storage/ndb/src/common/debugger/EventLogger.cpp 2008-02-11 14:24:16 +01:00
@@ -990,6 +990,26 @@ void getTextStartReport(QQQQ) {
}
}
+void getTextSubscriptionStatus(QQQQ)
+{
+ switch(theData[1]) {
+ case(1): // SubscriptionStatus::DISCONNECTED
+ BaseString::snprintf(m_text, m_text_len,
+ "Disconnecting node %u because it has "
+ "exceeded MaxBufferedEpochs, gci %lld",
+ theData[2],
+ make_uint64(theData[3], theData[4]));
+ break;
+ case(2): // SubscriptionStatus::INCONSISTENT
+ BaseString::snprintf(m_text, m_text_len,
+ "Nodefailure while out of event buffer: "
+ "informing subscribers of possibly missing event data"
+ ", gci %lld",
+ make_uint64(theData[3], theData[4]));
+ break;
+ }
+}
+
#if 0
BaseString::snprintf(m_text,
m_text_len,
@@ -1071,6 +1091,7 @@ const EventLoggerBase::EventRepLogLevelM
ROW(MissedHeartbeat, LogLevel::llError, 8, Logger::LL_WARNING ),
ROW(DeadDueToHeartbeat, LogLevel::llError, 8, Logger::LL_ALERT ),
ROW(WarningEvent, LogLevel::llError, 2, Logger::LL_WARNING ),
+ ROW(SubscriptionStatus, LogLevel::llError, 4, Logger::LL_WARNING ),
// INFO
ROW(SentHeartbeat, LogLevel::llInfo, 12, Logger::LL_INFO ),
ROW(CreateLogBytes, LogLevel::llInfo, 11, Logger::LL_INFO ),
diff -Nrup a/storage/ndb/src/kernel/blocks/ERROR_codes.txt b/storage/ndb/src/kernel/blocks/ERROR_codes.txt
--- a/storage/ndb/src/kernel/blocks/ERROR_codes.txt 2007-12-03 09:44:13 +01:00
+++ b/storage/ndb/src/kernel/blocks/ERROR_codes.txt 2008-02-11 14:24:16 +01:00
@@ -11,7 +11,7 @@ Next CMVMI 9000
Next BACKUP 10038
Next DBUTIL 11002
Next DBTUX 12008
-Next SUMA 13034
+Next SUMA 13037
TESTING NODE FAILURE, ARBITRATION
---------------------------------
@@ -567,3 +567,8 @@ NDBCNTR:
1000: Crash insertion on SystemError::CopyFragRef
1001: Delay sending NODE_FAILREP (to own node), until error is cleared
+
+SUMA:
+13034: Simulate report MISSING_DATA at node failure
+13035: Simulate disconnect lagging subscribers
+13036: Simulate out of event buffer
diff -Nrup a/storage/ndb/src/kernel/blocks/suma/Suma.cpp b/storage/ndb/src/kernel/blocks/suma/Suma.cpp
--- a/storage/ndb/src/kernel/blocks/suma/Suma.cpp 2007-12-03 14:32:38 +01:00
+++ b/storage/ndb/src/kernel/blocks/suma/Suma.cpp 2008-02-11 14:24:16 +01:00
@@ -165,11 +165,13 @@ Suma::execREAD_CONFIG_REQ(Signal* signal
ndbrequire(p != 0);
// SumaParticipant
- Uint32 noTables, noAttrs;
+ Uint32 noTables, noAttrs, maxBufferedGcp;
ndb_mgm_get_int_parameter(p, CFG_DB_NO_TABLES,
&noTables);
ndb_mgm_get_int_parameter(p, CFG_DB_NO_ATTRIBUTES,
&noAttrs);
+ ndb_mgm_get_int_parameter(p, CFG_DB_MAX_BUFFERED_GCP,
+ &maxBufferedGcp);
c_tablePool.setSize(noTables);
c_tables.setSize(noTables);
@@ -181,6 +183,8 @@ Suma::execREAD_CONFIG_REQ(Signal* signal
c_syncPool.setSize(2);
c_dataBufferPool.setSize(noAttrs);
+ c_maxBufferedGcp = maxBufferedGcp;
+
// Calculate needed gcp pool as 10 records + the ones needed
// during a possible api timeout
Uint32 dbApiHbInterval, gcpInterval;
@@ -225,7 +229,8 @@ Suma::execREAD_CONFIG_REQ(Signal* signal
m_last_complete_gci = 0; // SUB_GCP_COMPLETE_REP
m_gcp_complete_rep_count = 0;
m_out_of_buffer_gci = 0;
-
+ m_missing_data = false;
+
c_startup.m_wait_handover= false;
c_failedApiNodes.clear();
@@ -3518,7 +3523,7 @@ Suma::execFIRE_TRIG_ORD(Signal* signal)
data->tableId = tableId;
data->requestInfo = 0;
SubTableData::setOperation(data->requestInfo, event);
- data->logType = 0;
+ data->flags = 0;
data->anyValue = any_value;
data->totalLen = ptrLen;
@@ -3556,6 +3561,77 @@ Suma::execFIRE_TRIG_ORD(Signal* signal)
}
void
+Suma::checkMaxBufferedGCP(Signal *signal)
+{
+ /*
+ * Check if any subscribers are exceeding the MaxBufferedEpochs
+ */
+ jamEntry();
+ if (c_gcp_list.isEmpty())
+ {
+ jam();
+ return;
+ }
+ Ptr<Gcp_record> gcp;
+ c_gcp_list.first(gcp);
+ if (ERROR_INSERTED(13035))
+ {
+ jam();
+ CLEAR_ERROR_INSERT_VALUE;
+ ndbout_c("Simulating exceeding the MaxBufferedEpochs %u(%llu,%llu,%llu)",
+ c_maxBufferedGcp, m_max_seen_gci,
+ m_last_complete_gci, gcp.p->m_gci);
+ c_maxBufferedGcp = 1;
+ }
+ if (m_max_seen_gci - gcp.p->m_gci >= (Uint64) c_maxBufferedGcp)
+ {
+ NodeBitmask subs = c_subscriber_nodes;
+ jam();
+ // Disconnect lagging subscribers
+ for(; !gcp.isNull(); c_gcp_list.next(gcp))
+ {
+ jam();
+ if (m_max_seen_gci - gcp.p->m_gci >= (Uint64) c_maxBufferedGcp)
+ {
+ jam();
+ for(Uint32 nodeId = 0; nodeId < MAX_NODES; nodeId++)
+ {
+ if (subs.get(nodeId))
+ {
+ jam();
+ subs.clear(nodeId);
+ // Disconnecting node
+ signal->theData[0] = NDB_LE_SubscriptionStatus;
+ signal->theData[1] = 1; // DISCONNECTED;
+ signal->theData[2] = nodeId;
+ signal->theData[3] = (Uint32) gcp.p->m_gci;
+ signal->theData[4] = (Uint32) (gcp.p->m_gci >> 32);
+ sendSignal(CMVMI_REF, GSN_EVENT_REP, signal, 5, JBB);
+ infoEvent("MaxBufferedEpochs %llu exceeded %u",
+ m_max_seen_gci - gcp.p->m_gci,
+ c_maxBufferedGcp);
+
+ /**
+ * Force API_FAILREQ
+ */
+ signal->theData[0] = nodeId;
+ EXECUTE_DIRECT(QMGR, GSN_API_FAILREQ, signal, 1);
+ }
+ }
+ }
+ else
+ {
+ /*
+ * We have found a newer gci that still is
+ * allowed to be buffered
+ */
+ break;
+ }
+ }
+ }
+}
+
+void
Suma::execSUB_GCP_COMPLETE_REP(Signal* signal)
{
jamEntry();
@@ -3565,7 +3641,17 @@ Suma::execSUB_GCP_COMPLETE_REP(Signal* s
Uint32 gci_hi = rep->gci_hi;
Uint32 gci_lo = rep->gci_lo;
Uint64 gci = gci_lo | (Uint64(gci_hi) << 32);
- Uint32 flags = rep->flags;
+ Uint32 flags = (m_missing_data)
+ ? rep->flags | SubGcpCompleteRep::MISSING_DATA
+ : rep->flags;
+
+ if (ERROR_INSERTED(13034))
+ {
+ jam();
+ CLEAR_ERROR_INSERT_VALUE;
+ ndbout_c("Simulating out of event buffer at node failure");
+ flags |= SubGcpCompleteRep::MISSING_DATA;
+ }
#ifdef VM_TRACE
if (m_gcp_monitor == 0)
@@ -3584,6 +3670,7 @@ Suma::execSUB_GCP_COMPLETE_REP(Signal* s
#endif
m_last_complete_gci = gci;
+ checkMaxBufferedGCP(signal);
m_max_seen_gci = (gci > m_max_seen_gci ? gci : m_max_seen_gci);
/**
@@ -3718,6 +3805,7 @@ Suma::execSUB_GCP_COMPLETE_REP(Signal* s
{
infoEvent("Reenable event buffer");
m_out_of_buffer_gci = 0;
+ m_missing_data = false;
}
}
@@ -3868,7 +3956,7 @@ Suma::execALTER_TAB_REQ(Signal *signal)
SubTableData::setOperation(data->requestInfo,
NdbDictionary::Event::_TE_ALTER);
SubTableData::setReqNodeId(data->requestInfo, refToNode(senderRef));
- data->logType = 0;
+ data->flags = 0;
data->changeMask = changeMask;
data->totalLen = tabInfoPtr.sz;
{
@@ -3920,6 +4008,13 @@ Suma::execSUB_GCP_COMPLETE_ACK(Signal* s
Uint64 gci = gci_lo | (Uint64(gci_hi) << 32);
m_max_seen_gci = (gci > m_max_seen_gci ? gci : m_max_seen_gci);
+ if (ERROR_INSERTED(13035))
+ {
+ jam();
+ ndbout_c("Simulating exceeding the MaxBufferedEpochs, ignoring ack");
+ return;
+ }
+
if (refToBlock(senderRef) == SUMA) {
jam();
// Ack from other SUMA
@@ -4763,7 +4858,7 @@ Suma::out_of_buffer(Signal* signal)
m_out_of_buffer_gci = m_last_complete_gci - 1;
infoEvent("Out of event buffer: nodefailure will cause event failures");
-
+ m_missing_data = false;
out_of_buffer_release(signal, 0);
}
@@ -4805,11 +4900,19 @@ Suma::out_of_buffer_release(Signal* sign
*/
m_out_of_buffer_gci = m_max_seen_gci > m_last_complete_gci
? m_max_seen_gci : m_last_complete_gci;
+ m_missing_data = false;
}
Uint32
Suma::seize_page()
{
+ if (ERROR_INSERTED(13036))
+ {
+ jam();
+ CLEAR_ERROR_INSERT_VALUE;
+ ndbout_c("Simulating out of event buffer");
+ m_out_of_buffer_gci = m_max_seen_gci;
+ }
if(unlikely(m_out_of_buffer_gci))
{
return RNIL;
@@ -4941,20 +5044,26 @@ void
Suma::start_resend(Signal* signal, Uint32 buck)
{
printf("start_resend(%d, ", buck);
-
- if(m_out_of_buffer_gci)
- {
- progError(__LINE__, NDBD_EXIT_SYSTEM_ERROR,
- "Nodefailure while out of event buffer");
- return;
- }
-
+
/**
* Resend from m_max_acked_gci + 1 until max_gci + 1
*/
Bucket* bucket= c_buckets + buck;
Page_pos pos= bucket->m_buffer_head;
+ if(m_out_of_buffer_gci)
+ {
+ Ptr<Gcp_record> gcp;
+ c_gcp_list.last(gcp);
+ signal->theData[0] = NDB_LE_SubscriptionStatus;
+ signal->theData[1] = 2; // INCONSISTENT;
+ signal->theData[3] = (Uint32) pos.m_max_gci;
+ signal->theData[4] = (Uint32) (gcp.p->m_gci >> 32);
+ sendSignal(CMVMI_REF, GSN_EVENT_REP, signal, 2, JBB);
+ m_missing_data = true;
+ return;
+ }
+
if(pos.m_page_id == RNIL)
{
jam();
@@ -5087,9 +5196,19 @@ Suma::resend_bucket(Signal* signal, Uint
SubGcpCompleteRep * rep = (SubGcpCompleteRep*)signal->getDataPtrSend();
rep->gci_hi = last_gci >> 32;
rep->gci_lo = last_gci & 0xFFFFFFFF;
- rep->flags = 0;
+ rep->flags = (m_missing_data)
+ ? SubGcpCompleteRep::MISSING_DATA
+ : 0;
rep->senderRef = reference();
rep->gcp_complete_rep_count = 1;
+
+ if (ERROR_INSERTED(13034))
+ {
+ jam();
+ CLEAR_ERROR_INSERT_VALUE;
+ ndbout_c("Simulating out of event buffer at node failure");
+ rep->flags |= SubGcpCompleteRep::MISSING_DATA;
+ }
char buf[255];
c_subscriber_nodes.getText(buf);
@@ -5135,7 +5254,7 @@ Suma::resend_bucket(Signal* signal, Uint
data->tableId = table;
data->requestInfo = 0;
SubTableData::setOperation(data->requestInfo, event);
- data->logType = 0;
+ data->flags = 0;
data->anyValue = any_value;
data->totalLen = ptrLen;
diff -Nrup a/storage/ndb/src/kernel/blocks/suma/Suma.hpp b/storage/ndb/src/kernel/blocks/suma/Suma.hpp
--- a/storage/ndb/src/kernel/blocks/suma/Suma.hpp 2007-09-05 15:19:57 +02:00
+++ b/storage/ndb/src/kernel/blocks/suma/Suma.hpp 2008-02-11 14:24:16 +01:00
@@ -349,6 +349,8 @@ public:
ArrayPool<SyncRecord> c_syncPool;
DataBuffer<15>::DataBufferPool c_dataBufferPool;
+ Uint32 c_maxBufferedGcp;
+
NodeBitmask c_failedApiNodes;
/**
@@ -600,11 +602,14 @@ private:
Uint64 get_current_gci(Signal*);
+ void checkMaxBufferedGCP(Signal *signal);
+
Uint64 m_max_seen_gci; // FIRE_TRIG_ORD
Uint64 m_max_sent_gci; // FIRE_TRIG_ORD -> send
Uint64 m_last_complete_gci; // SUB_GCP_COMPLETE_REP
Uint64 m_out_of_buffer_gci;
Uint32 m_gcp_complete_rep_count;
+ bool m_missing_data;
struct Gcp_record
{
diff -Nrup a/storage/ndb/src/kernel/blocks/suma/SumaInit.cpp b/storage/ndb/src/kernel/blocks/suma/SumaInit.cpp
--- a/storage/ndb/src/kernel/blocks/suma/SumaInit.cpp 2007-09-05 15:19:57 +02:00
+++ b/storage/ndb/src/kernel/blocks/suma/SumaInit.cpp 2008-02-11 14:24:16 +01:00
@@ -127,6 +127,7 @@ Suma::Suma(Block_context& ctx) :
#ifdef VM_TRACE
m_gcp_monitor = 0;
#endif
+ m_missing_data = false;
}
Suma::~Suma()
diff -Nrup a/storage/ndb/src/mgmsrv/ConfigInfo.cpp b/storage/ndb/src/mgmsrv/ConfigInfo.cpp
--- a/storage/ndb/src/mgmsrv/ConfigInfo.cpp 2007-09-28 13:55:10 +02:00
+++ b/storage/ndb/src/mgmsrv/ConfigInfo.cpp 2008-02-11 14:24:16 +01:00
@@ -917,6 +917,18 @@ const ConfigInfo::ParamInfo ConfigInfo::
"32000" },
{
+ CFG_DB_MAX_BUFFERED_GCP,
+ "MaxBufferedEpochs",
+ DB_TOKEN,
+ "Allowed numbered of epochs that a subscribing node can lag behind (unprocessed epochs). Exceeding will cause lagging subscribers to be disconnected.",
+ ConfigInfo::CI_USED,
+ true,
+ ConfigInfo::CI_INT,
+ "100",
+ "0",
+ "100000" },
+
+ {
CFG_DB_NO_REDOLOG_FILES,
"NoOfFragmentLogFiles",
DB_TOKEN,
diff -Nrup a/storage/ndb/src/ndbapi/Ndb.cpp b/storage/ndb/src/ndbapi/Ndb.cpp
--- a/storage/ndb/src/ndbapi/Ndb.cpp 2007-09-07 12:30:54 +02:00
+++ b/storage/ndb/src/ndbapi/Ndb.cpp 2008-02-11 14:24:16 +01:00
@@ -1817,6 +1817,18 @@ NdbEventOperation *Ndb::nextEvent()
return theEventBuffer->nextEvent();
}
+bool
+Ndb::isConsistent(Uint64& gci)
+{
+ return theEventBuffer->isConsistent(gci);
+}
+
+bool
+Ndb::isConsistentGCI(Uint64 gci)
+{
+ return theEventBuffer->isConsistentGCI(gci);
+}
+
const NdbEventOperation*
Ndb::getGCIEventOperations(Uint32* iter, Uint32* event_types)
{
diff -Nrup a/storage/ndb/src/ndbapi/NdbEventOperationImpl.cpp b/storage/ndb/src/ndbapi/NdbEventOperationImpl.cpp
--- a/storage/ndb/src/ndbapi/NdbEventOperationImpl.cpp 2007-12-11 16:05:16 +01:00
+++ b/storage/ndb/src/ndbapi/NdbEventOperationImpl.cpp 2008-02-11 14:24:16 +01:00
@@ -1270,9 +1270,18 @@ NdbEventBuffer::nextEvent()
#endif
EventBufData *data;
+ Uint64 gci;
while ((data= m_available_data.m_head))
{
NdbEventOperationImpl *op= data->m_event_op;
+
+ /*
+ * The data was not associated with an event operation,
+ * possibly a dummy event list marking missing data
+ */
+ if (!op && !isConsistent(gci))
+ DBUG_RETURN_EVENT(0);
+
DBUG_PRINT_EVENT("info", ("available data=%p op=%p", data, op));
/*
@@ -1315,8 +1324,10 @@ NdbEventBuffer::nextEvent()
// moved to next gci, check if any references have been
// released when completing the last gci
deleteUsedEventOperations();
- gci_ops = m_available_data.next_gci_ops();
+ gci_ops = m_available_data.delete_next_gci_ops();
}
+ if (!gci_ops->m_consistent)
+ DBUG_RETURN_EVENT(0);
assert(gci_ops && (op->getGCI() == gci_ops->m_gci));
// to return TE_NUL it should be made into data event
if (SubTableData::getOperation(data->sdata->requestInfo) ==
@@ -1347,11 +1358,45 @@ NdbEventBuffer::nextEvent()
while (gci_ops)
{
deleteUsedEventOperations();
- gci_ops = m_available_data.next_gci_ops();
+ gci_ops = m_available_data.delete_next_gci_ops();
}
DBUG_RETURN_EVENT(0);
}
+bool
+NdbEventBuffer::isConsistent(Uint64& gci)
+{
+ DBUG_ENTER("NdbEventBuffer::isConsistent");
+ EventBufData_list::Gci_ops *gci_ops = m_available_data.first_gci_ops();
+ while (gci_ops)
+ {
+ if (!gci_ops->m_consistent)
+ {
+ gci = gci_ops->m_gci;
+ DBUG_RETURN(false);
+ }
+ gci_ops = gci_ops->m_next;
+ }
+
+ DBUG_RETURN(true);
+}
+
+bool
+NdbEventBuffer::isConsistentGCI(Uint64 gci)
+{
+ DBUG_ENTER("NdbEventBuffer::isConsistentGCI");
+ EventBufData_list::Gci_ops *gci_ops = m_available_data.first_gci_ops();
+ while (gci_ops)
+ {
+ if (gci_ops->m_gci == gci && !gci_ops->m_consistent)
+ DBUG_RETURN(false);
+ gci_ops = gci_ops->m_next;
+ }
+
+ DBUG_RETURN(true);
+}
+
+
NdbEventOperationImpl*
NdbEventBuffer::getGCIEventOperations(Uint32* iter, Uint32* event_types)
{
@@ -1738,6 +1783,35 @@ NdbEventBuffer::complete_bucket(Gci_cont
assert(bucket->m_data.m_count);
#endif
m_complete_data.m_data.append_list(&bucket->m_data, gci);
+ if (bucket->m_state & Gci_container::GC_INCONSISTENT)
+ {
+ /*
+ * Bucket marked as possibly missing data, probably due to
+ * kernel running out of event_buffer during node failure.
+ * Mark newly appended event list as inconsistent.
+ */
+ assert(m_complete_data.m_data.m_gci_ops_list_tail != NULL);
+ m_complete_data.m_data.m_gci_ops_list_tail->m_consistent = false;
+ }
+ }
+ else // if (bucket->m_data.is_empty())
+ {
+ if (bucket->m_state & Gci_container::GC_INCONSISTENT)
+ {
+ /*
+ * Bucket marked as possibly missing data, probably due to
+ * kernel running out of event_buffer during node failure
+ * Bucket contained no data so we must add a dummy event list
+ * as inconsistency marker.
+ */
+ EventBufData *dummy_data= alloc_data();
+ EventBufData_list *dummy_event_list = new EventBufData_list;
+ dummy_event_list->append_used_data(dummy_data);
+ dummy_event_list->m_is_not_multi_list = true;
+ m_complete_data.m_data.append_list(dummy_event_list, gci);
+ assert(m_complete_data.m_data.m_gci_ops_list_tail != NULL);
+ m_complete_data.m_data.m_gci_ops_list_tail->m_consistent = false;
+ }
}
Uint32 minpos = m_min_gci_index;
@@ -1803,6 +1877,11 @@ NdbEventBuffer::execSUB_GCP_COMPLETE_REP
DBUG_VOID_RETURN_EVENT;
}
+ if (rep->flags & SubGcpCompleteRep::MISSING_DATA)
+ {
+ bucket->m_state |= Gci_container::GC_INCONSISTENT;
+ }
+
Uint32 old_cnt = bucket->m_gcp_complete_rep_count;
if(unlikely(old_cnt == ~(Uint32)0))
{
@@ -1852,7 +1931,7 @@ NdbEventBuffer::execSUB_GCP_COMPLETE_REP
Uint32(gci >> 32), Uint32(gci),
Uint32(minGCI >> 32), Uint32(minGCI),
Uint32(m_latestGCI >> 32), Uint32(m_latestGCI));
- bucket->m_state = Gci_container::GC_COMPLETE;
+ bucket->m_state |= Gci_container::GC_COMPLETE;
bucket->m_gcp_complete_rep_count = 1; // Prevent from being reused
m_latest_complete_GCI = gci;
}
@@ -1886,7 +1965,7 @@ NdbEventBuffer::complete_outof_order_gci
Gci_container* bucket = find_bucket(start_gci);
assert(bucket);
assert(maxpos == m_max_gci_index);
- if (bucket->m_state != Gci_container::GC_COMPLETE)
+ if (!(bucket->m_state & Gci_container::GC_COMPLETE)) // Not complete
{
#ifdef VM_TRACE
verify_known_gci(false);
@@ -1984,7 +2063,7 @@ NdbEventBuffer::report_node_connected(Ui
NdbDictionary::Event::_TE_ACTIVE);
SubTableData::setReqNodeId(data.requestInfo, node_id);
SubTableData::setNdbdNodeId(data.requestInfo, node_id);
- data.logType = SubTableData::LOG;
+ data.flags = SubTableData::LOG;
Uint64 gci = Uint64((m_latestGCI >> 32) + 1) << 32;
find_max_known_gci(&gci);
@@ -2019,7 +2098,7 @@ NdbEventBuffer::report_node_failure(Uint
NdbDictionary::Event::_TE_NODE_FAILURE);
SubTableData::setReqNodeId(data.requestInfo, node_id);
SubTableData::setNdbdNodeId(data.requestInfo, node_id);
- data.logType = SubTableData::LOG;
+ data.flags = SubTableData::LOG;
Uint64 gci = Uint64((m_latestGCI >> 32) + 1) << 32;
find_max_known_gci(&gci);
@@ -2090,7 +2169,7 @@ NdbEventBuffer::completeClusterFailed()
data.requestInfo = 0;
SubTableData::setOperation(data.requestInfo,
NdbDictionary::Event::_TE_CLUSTER_FAILURE);
- data.logType = SubTableData::LOG;
+ data.flags = SubTableData::LOG;
data.gci_hi = Uint32(gci >> 32);
data.gci_lo = Uint32(gci);
@@ -3163,7 +3242,7 @@ NdbEventBuffer::reportStatus()
Uint64 apply_gci, latest_gci= m_latestGCI;
if (apply_buf == 0)
apply_buf= m_complete_data.m_data.m_head;
- if (apply_buf)
+ if (apply_buf && apply_buf->sdata)
{
Uint32 gci_hi = apply_buf->sdata->gci_hi;
Uint32 gci_lo = apply_buf->sdata->gci_lo;
diff -Nrup a/storage/ndb/src/ndbapi/NdbEventOperationImpl.hpp b/storage/ndb/src/ndbapi/NdbEventOperationImpl.hpp
--- a/storage/ndb/src/ndbapi/NdbEventOperationImpl.hpp 2007-09-12 13:30:24 +02:00
+++ b/storage/ndb/src/ndbapi/NdbEventOperationImpl.hpp 2008-02-11 14:24:16 +01:00
@@ -139,7 +139,17 @@ public:
};
struct Gci_ops // 2
{
+ Gci_ops()
+ : m_gci(0),
+ m_consistent(true),
+ m_gci_op_list(NULL),
+ m_next(NULL),
+ m_gci_op_count(0)
+ {};
+ ~Gci_ops() {};
+
Uint64 m_gci;
+ bool m_consistent;
Gci_op *m_gci_op_list;
Gci_ops *m_next;
Uint32 m_gci_op_count;
@@ -160,7 +170,7 @@ public:
Uint32 m_is_not_multi_list; // 2
};
Gci_ops *first_gci_ops();
- Gci_ops *next_gci_ops();
+ Gci_ops *delete_next_gci_ops();
// case 1 above; add Gci_op to single list
void add_gci_op(Gci_op g);
private:
@@ -198,7 +208,7 @@ EventBufData_list::~EventBufData_list()
{
Gci_ops *op = first_gci_ops();
while (op)
- op = next_gci_ops();
+ op = delete_next_gci_ops();
}
DBUG_VOID_RETURN_EVENT;
}
@@ -271,7 +281,7 @@ EventBufData_list::first_gci_ops()
}
inline EventBufData_list::Gci_ops *
-EventBufData_list::next_gci_ops()
+EventBufData_list::delete_next_gci_ops()
{
assert(!m_is_not_multi_list);
Gci_ops *first = m_gci_ops_list;
@@ -320,8 +330,10 @@ struct Gci_container
{
enum State
{
- GC_COMPLETE = 0x1 // GCI is complete, but waiting for out of order
+ GC_COMPLETE = 0x1, // GCI is complete, but waiting for out of order
+ GC_INCONSISTENT = 0x2 // GCI might be missing event data
};
+
Uint16 m_state;
Uint16 m_gcp_complete_rep_count; // Remaining SUB_GCP_COMPLETE_REP until done
@@ -530,6 +542,9 @@ public:
int pollEvents(int aMillisecondNumber, Uint64 *latestGCI= 0);
int flushIncompleteEvents(Uint64 gci);
NdbEventOperation *nextEvent();
+ bool isConsistent(Uint64& gci);
+ bool isConsistentGCI(Uint64 gci);
+
NdbEventOperationImpl* getGCIEventOperations(Uint32* iter,
Uint32* event_types);
void deleteUsedEventOperations();
diff -Nrup a/storage/ndb/test/ndbapi/test_event.cpp b/storage/ndb/test/ndbapi/test_event.cpp
--- a/storage/ndb/test/ndbapi/test_event.cpp 2007-12-04 07:25:27 +01:00
+++ b/storage/ndb/test/ndbapi/test_event.cpp 2008-02-11 14:24:16 +01:00
@@ -1847,6 +1847,177 @@ runBug31701(NDBT_Context* ctx, NDBT_Step
return NDBT_OK;
}
+int
+errorInjectBufferOverflow(NDBT_Context* ctx, NDBT_Step* step)
+{
+ Ndb * ndb= GETNDB(step);
+ NdbRestarter restarter;
+ const NdbDictionary::Table* pTab = ctx->getTab();
+ int result= NDBT_OK;
+ int res;
+ bool found_gap = false;
+ NdbEventOperation *pOp= createEventOperation(ndb, *pTab);
+ Uint64 gci;
+
+ if (pOp == 0)
+ {
+ g_err << "Failed to createEventOperation" << endl;
+ return NDBT_FAILED;
+ }
+
+ if (restarter.insertErrorInAllNodes(13034) != 0)
+ {
+ result = NDBT_FAILED;
+ goto cleanup;
+ }
+
+ res = ndb->pollEvents(5000);
+
+ if (ndb->getNdbError().code != 0)
+ {
+ g_err << "pollEvents failed: \n";
+ g_err << ndb->getNdbError().code << " "
+ << ndb->getNdbError().message << endl;
+ result = (ndb->getNdbError().code == 4720)?NDBT_OK:NDBT_FAILED;
+ goto cleanup;
+ }
+ if (res >= 0) {
+ NdbEventOperation *tmp;
+ while (!found_gap && (tmp= ndb->nextEvent()))
+ {
+ if (!ndb->isConsistent(gci))
+ found_gap = true;
+ }
+ }
+ if (!ndb->isConsistent(gci))
+ found_gap = true;
+ if (!found_gap)
+ {
+ g_err << "buffer overflow not detected\n";
+ result = NDBT_FAILED;
+ goto cleanup;
+ }
+
+cleanup:
+
+ if (ndb->dropEventOperation(pOp) != 0) {
+ g_err << "dropping event operation failed\n";
+ result = NDBT_FAILED;
+ }
+
+ return result;
+}
+
+int
+errorInjectStalling(NDBT_Context* ctx, NDBT_Step* step)
+{
+ Ndb * ndb= GETNDB(step);
+ NdbRestarter restarter;
+ const NdbDictionary::Table* pTab = ctx->getTab();
+ NdbEventOperation *pOp= createEventOperation(ndb, *pTab);
+ int result = NDBT_OK;
+ int res;
+ bool connected = true;
+
+ if (pOp == 0)
+ {
+ g_err << "Failed to createEventOperation" << endl;
+ return NDBT_FAILED;
+ }
+
+ if (restarter.insertErrorInAllNodes(13035) != 0)
+ {
+ result = NDBT_FAILED;
+ goto cleanup;
+ }
+
+ res = ndb->pollEvents(5000) > 0;
+
+ if (ndb->getNdbError().code != 0)
+ {
+ g_err << "pollEvents failed: \n";
+ g_err << ndb->getNdbError().code << " "
+ << ndb->getNdbError().message << endl;
+ result = NDBT_FAILED;
+ goto cleanup;
+ }
+
+ if (res > 0) {
+ NdbEventOperation *tmp;
+ int count = 0;
+ while (connected && (tmp= ndb->nextEvent()))
+ {
+ if (tmp != pOp)
+ {
+ printf("Found stray NdbEventOperation\n");
+ result = NDBT_FAILED;
+ goto cleanup;
+ }
+ switch (tmp->getEventType()) {
+ case NdbDictionary::Event::TE_CLUSTER_FAILURE:
+ connected = false;
+ break;
+ default:
+ count++;
+ break;
+ }
+ }
+ if (connected)
+ {
+ g_err << "failed to detect cluster disconnect\n";
+ result = NDBT_FAILED;
+ goto cleanup;
+ }
+ }
+
+cleanup:
+
+ if (ndb->dropEventOperation(pOp) != 0) {
+ g_err << "dropping event operation failed\n";
+ result = NDBT_FAILED;
+ }
+
+ // Reconnect by trying to start a transaction
+ uint retries = 100;
+ while (!connected && retries--)
+ {
+ HugoTransactions hugoTrans(* ctx->getTab());
+ if (hugoTrans.loadTable(ndb, 100) == 0)
+ {
+ connected = true;
+ result = NDBT_OK;
+ }
+ else
+ {
+ NdbSleep_MilliSleep(300);
+ result = NDBT_FAILED;
+ }
+ }
+
+ if (!connected)
+ g_err << "Failed to reconnect\n";
+
+ // Restart cluster with abort
+ if (restarter.restartAll(false, false, true) != 0){
+ ctx->stopTest();
+ return NDBT_FAILED;
+ }
+
+ // Stop the other thread
+ ctx->stopTest();
+
+ if (restarter.waitClusterStarted(300) != 0){
+ return NDBT_FAILED;
+ }
+
+ if (ndb->waitUntilReady() != 0){
+ return NDBT_FAILED;
+ }
+
+ return result;
+}
+
+
NDBT_TESTSUITE(test_event);
TESTCASE("BasicEventOperation",
"Verify that we can listen to Events"
@@ -1983,6 +2154,20 @@ TESTCASE("Bug31701", ""){
STEP(runBug31701);
FINALIZER(runDropEvent);
FINALIZER(runDropShadowTable);
+}
+TESTCASE("EventBufferOverflow",
+ "Simulating EventBuffer overflow while node restart"
+ "NOTE! No errors are allowed!" ){
+ INITIALIZER(runCreateEvent);
+ STEP(errorInjectBufferOverflow);
+ FINALIZER(runDropEvent);
+}
+TESTCASE("StallingSubscriber",
+ "Simulating slow subscriber that will become disconnected"
+ "NOTE! No errors are allowed!" ){
+ INITIALIZER(runCreateEvent);
+ STEP(errorInjectStalling);
+ FINALIZER(runDropEvent);
}
NDBT_TESTSUITE_END(test_event);
| Thread |
|---|
| • bk commit into 5.1 tree (mskold:1.2676) | Martin Skold | 11 Feb |