2971 Ole John Aske 2009-11-04
Fixes racecondition where a scan may be closed on SPJ nodes from
TC while SPJ has outstanding requests to its datanodes.
The reply from the datanodes will then not find the 'Request' and 'TreeNode'
structures required to handle the reply.
Fixed by setting a 'pending close' state when a close request arrives
in this situation. The close is then executed when datanodes
has completed.
modified:
storage/ndb/src/kernel/blocks/dbspj/Dbspj.hpp
storage/ndb/src/kernel/blocks/dbspj/DbspjMain.cpp
2970 Ole John Aske 2009-11-03
Fixed a problem where the 'needClose' flag in a SCANREF was not used.
Instead we assumed that a REF always closed the scan on TC also.
modified:
storage/ndb/src/ndbapi/NdbQueryOperation.cpp
storage/ndb/src/ndbapi/NdbQueryOperationImpl.hpp
storage/ndb/src/ndbapi/NdbTransactionScan.cpp
=== modified file 'storage/ndb/src/kernel/blocks/dbspj/Dbspj.hpp'
--- a/storage/ndb/src/kernel/blocks/dbspj/Dbspj.hpp 2009-11-02 18:03:43 +0000
+++ b/storage/ndb/src/kernel/blocks/dbspj/Dbspj.hpp 2009-11-04 10:58:11 +0000
@@ -247,11 +247,17 @@ public:
/**
* SCAN_FRAGCONF is received
*/
- SF_STARTED = 2
+ SF_STARTED = 2,
+
+ /**
+ * SCAN_NEXTREQ(close) has been sent to datanodes
+ */
+ SF_CLOSING = 3
};
- Uint32 m_scan_state; // Only valid is TreeNodeState >= TN_ACTIVE
+ Uint32 m_scan_state; // Only valid if TreeNodeState >= TN_ACTIVE
Uint32 m_scan_status; // fragmentCompleted
+ bool m_pending_close; // SCAN_NEXTREQ(close) pending while SF_RUNNING
/** True if signal has been received since sending
* last SCAN_FRAGREQ/SCAN_NEXTREQ*/
bool m_scan_fragconf_received;
@@ -357,7 +363,7 @@ public:
/**
* Is attrinfo "constructed"
- * (implies key info will be disowned (by send-signal)
+ * (implies attr info will be disowned (by send-signal)
*/
T_ATTRINFO_CONSTRUCTED = 0x8,
=== modified file 'storage/ndb/src/kernel/blocks/dbspj/DbspjMain.cpp'
--- a/storage/ndb/src/kernel/blocks/dbspj/DbspjMain.cpp 2009-11-02 18:03:43 +0000
+++ b/storage/ndb/src/kernel/blocks/dbspj/DbspjMain.cpp 2009-11-04 10:58:11 +0000
@@ -533,6 +533,7 @@ Dbspj::do_init(Request* requestP, const
requestP->m_transId[1] = req->transId2;
requestP->m_node_mask.clear();
requestP->m_rootResultData = req->resultData;
+ requestP->m_currentNodePtrI = RNIL;
}
void
@@ -949,32 +950,51 @@ Dbspj::execSCAN_NEXTREQ(Signal* signal)
Ptr<TreeNode> treeNodePtr;
m_treenode_pool.getPtr(treeNodePtr, requestPtr.p->m_currentNodePtrI);
- if (req->closeFlag == ZTRUE && // Requested close
scan
- treeNodePtr.p->m_scanfrag_data.m_scan_status == 2) // Is closed on LQH
+ if (treeNodePtr.p->m_scanfrag_data.m_scan_state == ScanFragData::SF_CLOSING)
{
jam();
-
/**
- * TODO this needs more elaborate *abort* handling
+ * Duplicate of a close request already sent to datanodes.
+ * Ignore this and wait for reply on pending request.
*/
- ScanFragConf* conf =
reinterpret_cast<ScanFragConf*>(signal->getDataPtrSend());
-
- conf->senderData = requestPtr.p->m_senderData;
- conf->transId1 = requestPtr.p->m_transId[0];
- conf->transId2 = requestPtr.p->m_transId[1];
- conf->completedOps = 0;
- conf->fragmentCompleted = 2; // =ZSCAN_FRAG_CLOSED -> Finished...
- conf->total_len = 0; // Not supported...
+ DEBUG("execSCAN_NEXTREQ, is SF_CLOSING -> ignore request");
+ return;
+ }
- DEBUG("execSCAN_NEXTREQ(close), fragmentCompleted:" <<
conf->fragmentCompleted);
+ if (req->closeFlag == ZTRUE) // Requested close scan
+ {
+ if (treeNodePtr.p->m_scanfrag_data.m_scan_status == 2) // Is closed on LQH
+ {
+ jam();
+ ndbassert (treeNodePtr.p->m_scanfrag_data.m_scan_state !=
ScanFragData::SF_RUNNING)
- sendSignal(requestPtr.p->m_senderRef, GSN_SCAN_FRAGCONF, signal,
- ScanFragConf::SignalLength, JBB);
+ ScanFragConf* conf =
reinterpret_cast<ScanFragConf*>(signal->getDataPtrSend());
+ conf->senderData = requestPtr.p->m_senderData;
+ conf->transId1 = requestPtr.p->m_transId[0];
+ conf->transId2 = requestPtr.p->m_transId[1];
+ conf->completedOps = 0;
+ conf->fragmentCompleted = 2; // =ZSCAN_FRAG_CLOSED -> Finished...
+ conf->total_len = 0; // Not supported...
+
+ DEBUG("execSCAN_NEXTREQ(close), LQH has conf'ed 'w/ ZSCAN_FRAG_CLOSED");
+ sendSignal(requestPtr.p->m_senderRef, GSN_SCAN_FRAGCONF, signal,
+ ScanFragConf::SignalLength, JBB);
- cleanup(requestPtr);
- return;
+ cleanup(requestPtr);
+ return;
+ }
+ else if (treeNodePtr.p->m_scanfrag_data.m_scan_state == ScanFragData::SF_RUNNING)
+ {
+ jam();
+ DEBUG("execSCAN_NEXTREQ, make PENDING CLOSE");
+ treeNodePtr.p->m_scanfrag_data.m_pending_close = true;
+ return;
+ }
+ // else; fallthrough & send to datanodes:
}
+ ndbassert (!treeNodePtr.p->m_scanfrag_data.m_pending_close);
+ ndbassert (treeNodePtr.p->m_scanfrag_data.m_scan_status != 2)
ndbrequire(treeNodePtr.p->m_info != 0 &&
treeNodePtr.p->m_info->m_execSCAN_NEXTREQ != 0);
(this->*(treeNodePtr.p->m_info->m_execSCAN_NEXTREQ))(signal,
@@ -1757,6 +1777,10 @@ Dbspj::scanFrag_build(Build_context& ctx
treeNodePtr.p->m_info = &g_ScanFragOpInfo;
treeNodePtr.p->m_bits |= TreeNode::T_ATTR_INTERPRETED;
+ treeNodePtr.p->m_scanfrag_data.m_scan_state = ScanFragData::SF_IDLE;
+ treeNodePtr.p->m_scanfrag_data.m_scan_status = 0;
+ treeNodePtr.p->m_scanfrag_data.m_pending_close = false;
+
ScanFragReq*dst=(ScanFragReq*)treeNodePtr.p->m_scanfrag_data.m_scanFragReq;
dst->senderData = treeNodePtr.i;
dst->resultRef = reference();
@@ -1799,6 +1823,7 @@ Dbspj::scanFrag_build(Build_context& ctx
DEBUG("param len: " << param->len);
if (unlikely(param->len < QN_ScanFragParameters::NodeSize))
{
+ jam();
DEBUG_CRASH();
break;
}
@@ -1818,6 +1843,7 @@ Dbspj::scanFrag_build(Build_context& ctx
nodeDA, treeBits, paramDA, paramBits);
if (unlikely(err != 0))
{
+ jam();
DEBUG_CRASH();
break;
}
@@ -1997,6 +2023,7 @@ Dbspj::scanFrag_send(Signal* signal,
NDB_ARRAY_SIZE(treeNodePtr.p->m_scanfrag_data.m_scanFragReq),
JBB, &handle);
+ ndbassert (!treeNodePtr.p->m_scanfrag_data.m_pending_close);
treeNodePtr.p->m_scanfrag_data.m_scan_state = ScanFragData::SF_RUNNING;
treeNodePtr.p->m_scanfrag_data.m_scan_status = 0;
treeNodePtr.p->m_scanfrag_data.m_scan_fragconf_received = false;
@@ -2007,6 +2034,12 @@ Dbspj::scanFrag_send(Signal* signal,
treeNodePtr.p->m_scanfrag_data.m_descendant_keyrefs_received = 0;
treeNodePtr.p->m_scanfrag_data.m_descendant_keyreqs_sent = 0;
treeNodePtr.p->m_scanfrag_data.m_missing_descendant_rows = 0;
+
+ /**
+ * Save position where next-scan-req should continue or close
+ */
+ treeNodePtr.p->m_scanfrag_data.m_scan_state = ScanFragData::SF_RUNNING;
+ requestPtr.p->m_currentNodePtrI = treeNodePtr.i;
}
/** Return true if scan batch is complete. This happens when all scan
@@ -2060,9 +2093,6 @@ Dbspj::scanFrag_execSCAN_FRAGREF(Signal*
Ptr<Request> requestPtr,
Ptr<TreeNode> treeNodePtr)
{
- /**
- * TODO
- */
const ScanFragRef* const rep = reinterpret_cast<const
ScanFragRef*>(signal->getDataPtr());
Uint32 errCode = rep->errorCode;
@@ -2086,9 +2116,29 @@ Dbspj::scanFrag_execSCAN_FRAGREF(Signal*
sendSignal(requestPtr.p->m_senderRef, GSN_SCAN_FRAGREF, signal,
ScanFragRef::SignalLength, JBB);
- // TODO: Cleanup operation on SPJ block
+ treeNodePtr.p->m_scanfrag_data.m_scan_fragconf_received = true;
+//treeNodePtr.p->m_scanfrag_data.m_scan_status = 2; // (2=ZSCAN_FRAG_CLOSED)
+ ndbassert (isScanComplete(treeNodePtr.p->m_scanfrag_data));
-//ndbrequire(false);
+ /**
+ * SCAN_FRAGREF implies that datanodes closed the cursor.
+ * -> Pending close is effectively a NOOP, reset it
+ */
+ if (treeNodePtr.p->m_scanfrag_data.m_pending_close)
+ {
+ jam();
+ treeNodePtr.p->m_scanfrag_data.m_pending_close = false;
+ DEBUG(" SCAN_FRAGREF, had pending close which can be ignored (is closed)");
+ }
+
+ /**
+ * Cleanup operation on SPJ block, remove all allocated resources.
+ */
+ {
+ jam();
+ treeNodePtr.p->m_scanfrag_data.m_scan_state = ScanFragData::SF_IDLE;
+ nodeFinished(signal, requestPtr, treeNodePtr);
+ }
}
@@ -2114,7 +2164,8 @@ Dbspj::scanFrag_execSCAN_FRAGCONF(Signal
treeNodePtr.p->m_scanfrag_data.m_rows_received = rows;
}
treeNodePtr.p->m_scanfrag_data.m_scan_fragconf_received = true;
- if(isScanComplete(treeNodePtr.p->m_scanfrag_data)){
+ if (isScanComplete(treeNodePtr.p->m_scanfrag_data))
+ {
jam();
scanFrag_batch_complete(signal, requestPtr, treeNodePtr);
}
@@ -2126,8 +2177,34 @@ Dbspj::scanFrag_batch_complete(Signal* s
Ptr<TreeNode> treeNodePtr)
{
DEBUG("scanFrag_batch_complete()");
- ndbrequire(treeNodePtr.p->m_scanfrag_data.m_scan_state ==
- ScanFragData::SF_RUNNING);
+
+ if (treeNodePtr.p->m_scanfrag_data.m_pending_close)
+ {
+ jam();
+ ndbrequire(treeNodePtr.p->m_scanfrag_data.m_scan_state ==
ScanFragData::SF_RUNNING);
+ treeNodePtr.p->m_scanfrag_data.m_scan_state = ScanFragData::SF_STARTED;
+
+ DEBUG("scanFrag_batch_complete() - has pending close, ignore this reply, request
close");
+
+ ScanFragNextReq* req =
reinterpret_cast<ScanFragNextReq*>(signal->getDataPtrSend());
+
+ /**
+ * SCAN_NEXTREQ(close) was requested while we where waiting for
+ * datanodes to complete this request.
+ * - Send close request to LQH now.
+ * - Suppress reply to TC/API, will reply later when close is conf'ed
+ */
+ req->closeFlag = ZTRUE;
+ req->senderData = treeNodePtr.i;
+ req->transId1 = requestPtr.p->m_transId[0];
+ req->transId2 = requestPtr.p->m_transId[1];
+ req->batch_size_rows = 0;
+ req->batch_size_bytes = 0;
+
+ treeNodePtr.p->m_scanfrag_data.m_pending_close = false;
+ scanFrag_execSCAN_NEXTREQ(signal, requestPtr, treeNodePtr);
+ return;
+ }
/**
* one batch complete...
@@ -2149,6 +2226,8 @@ Dbspj::scanFrag_batch_complete(Signal* s
if (treeNodePtr.p->m_scanfrag_data.m_scan_status == 2)
{
jam();
+ ndbrequire(treeNodePtr.p->m_scanfrag_data.m_scan_state == ScanFragData::SF_RUNNING
||
+ treeNodePtr.p->m_scanfrag_data.m_scan_state ==
ScanFragData::SF_CLOSING);
/**
* EOF for scan
*/
@@ -2158,11 +2237,12 @@ Dbspj::scanFrag_batch_complete(Signal* s
else
{
jam();
+ ndbrequire(treeNodePtr.p->m_scanfrag_data.m_scan_state ==
ScanFragData::SF_RUNNING);
/**
- * Save position where next-scan-req should continue
+ * Check position where next-scan-req should continue
*/
treeNodePtr.p->m_scanfrag_data.m_scan_state = ScanFragData::SF_STARTED;
- requestPtr.p->m_currentNodePtrI = treeNodePtr.i;
+ assert(requestPtr.p->m_currentNodePtrI == treeNodePtr.i);
}
}
@@ -2182,6 +2262,7 @@ Dbspj::scanFrag_execSCAN_NEXTREQ(Signal*
Ptr<TreeNode> treeNodePtr)
{
jamEntry();
+ ndbassert (treeNodePtr.p->m_scanfrag_data.m_scan_state == ScanFragData::SF_STARTED);
ScanFragNextReq* nextReq =
reinterpret_cast<ScanFragNextReq*>(signal->getDataPtrSend());
nextReq->senderData = treeNodePtr.i;
@@ -2197,7 +2278,10 @@ Dbspj::scanFrag_execSCAN_NEXTREQ(Signal*
ScanFragNextReq::SignalLength,
JBB);
- treeNodePtr.p->m_scanfrag_data.m_scan_state = ScanFragData::SF_RUNNING;
+ treeNodePtr.p->m_scanfrag_data.m_scan_state = (nextReq->closeFlag == ZTRUE)
+ ? ScanFragData::SF_CLOSING
+ : ScanFragData::SF_RUNNING;
+
treeNodePtr.p->m_scanfrag_data.m_scan_status = 0;
treeNodePtr.p->m_scanfrag_data.m_scan_fragconf_received = false;
treeNodePtr.p->m_scanfrag_data.m_rows_received = 0;
Attachment: [text/bzr-bundle] bzr/ole.john.aske@sun.com-20091104105811-tf1pyxav7sfs1o9g.bundle
| Thread |
|---|
| • bzr push into mysql-5.1-telco-7.0-spj branch (ole.john.aske:2970 to 2971) | Ole John Aske | 4 Nov 2009 |