List:Commits« Previous MessageNext Message »
From:Ole John Aske Date:November 4 2009 11:58am
Subject:bzr push into mysql-5.1-telco-7.0-spj branch (ole.john.aske:2970 to 2971)
View as plain text  
 2971 Ole John Aske	2009-11-04
      Fixes a race condition where a scan may be closed on SPJ nodes from 
      TC while SPJ still has outstanding requests to its datanodes. 
      The reply from the datanodes will then not find the 'Request' and 'TreeNode' 
      structures required to handle the reply.
      
      Fixed by setting a 'pending close' state when a close request arrives
      in this situation. The close is then executed once the datanodes 
      have completed.

    modified:
      storage/ndb/src/kernel/blocks/dbspj/Dbspj.hpp
      storage/ndb/src/kernel/blocks/dbspj/DbspjMain.cpp
 2970 Ole John Aske	2009-11-03
      Fixed a problem where the 'needClose' flag in a SCANREF was not used. 
      Instead we assumed that a REF always closed the scan on TC also.

    modified:
      storage/ndb/src/ndbapi/NdbQueryOperation.cpp
      storage/ndb/src/ndbapi/NdbQueryOperationImpl.hpp
      storage/ndb/src/ndbapi/NdbTransactionScan.cpp
=== modified file 'storage/ndb/src/kernel/blocks/dbspj/Dbspj.hpp'
--- a/storage/ndb/src/kernel/blocks/dbspj/Dbspj.hpp	2009-11-02 18:03:43 +0000
+++ b/storage/ndb/src/kernel/blocks/dbspj/Dbspj.hpp	2009-11-04 10:58:11 +0000
@@ -247,11 +247,17 @@ public:
       /**
        * SCAN_FRAGCONF is received
        */
-      SF_STARTED = 2
+      SF_STARTED = 2,
+
+      /**
+       * SCAN_NEXTREQ(close) has been sent to datanodes
+       */
+      SF_CLOSING = 3
     };
 
-    Uint32 m_scan_state;     // Only valid is TreeNodeState >= TN_ACTIVE
+    Uint32 m_scan_state;     // Only valid if TreeNodeState >= TN_ACTIVE
     Uint32 m_scan_status;    // fragmentCompleted
+    bool   m_pending_close;  // SCAN_NEXTREQ(close) pending while SF_RUNNING
     /** True if signal has been received since sending 
      * last SCAN_FRAGREQ/SCAN_NEXTREQ*/
     bool   m_scan_fragconf_received; 
@@ -357,7 +363,7 @@ public:
 
       /**
        * Is attrinfo "constructed"
-       *   (implies key info will be disowned (by send-signal)
+       *   (implies attr info will be disowned (by send-signal)
        */
       T_ATTRINFO_CONSTRUCTED = 0x8,
 

=== modified file 'storage/ndb/src/kernel/blocks/dbspj/DbspjMain.cpp'
--- a/storage/ndb/src/kernel/blocks/dbspj/DbspjMain.cpp	2009-11-02 18:03:43 +0000
+++ b/storage/ndb/src/kernel/blocks/dbspj/DbspjMain.cpp	2009-11-04 10:58:11 +0000
@@ -533,6 +533,7 @@ Dbspj::do_init(Request* requestP, const 
   requestP->m_transId[1] = req->transId2;
   requestP->m_node_mask.clear();
   requestP->m_rootResultData = req->resultData;
+  requestP->m_currentNodePtrI = RNIL;
 }
 
 void
@@ -949,32 +950,51 @@ Dbspj::execSCAN_NEXTREQ(Signal* signal)
   Ptr<TreeNode> treeNodePtr;
   m_treenode_pool.getPtr(treeNodePtr, requestPtr.p->m_currentNodePtrI);
 
-  if (req->closeFlag == ZTRUE &&                         // Requested close
scan
-      treeNodePtr.p->m_scanfrag_data.m_scan_status == 2) // Is closed on LQH
+  if (treeNodePtr.p->m_scanfrag_data.m_scan_state == ScanFragData::SF_CLOSING)
   {
     jam();
-
     /**
-     * TODO this needs more elaborate *abort* handling
+     * Duplicate of a close request already sent to datanodes.
+     * Ignore this and wait for reply on pending request.
      */
-    ScanFragConf* conf =
reinterpret_cast<ScanFragConf*>(signal->getDataPtrSend());
-
-    conf->senderData = requestPtr.p->m_senderData;
-    conf->transId1 = requestPtr.p->m_transId[0];
-    conf->transId2 = requestPtr.p->m_transId[1];
-    conf->completedOps = 0;
-    conf->fragmentCompleted = 2; // =ZSCAN_FRAG_CLOSED -> Finished...
-    conf->total_len = 0; // Not supported...
+    DEBUG("execSCAN_NEXTREQ, is SF_CLOSING -> ignore request");
+    return;
+  }
 
-    DEBUG("execSCAN_NEXTREQ(close), fragmentCompleted:" <<
conf->fragmentCompleted);
+  if (req->closeFlag == ZTRUE)                             // Requested close scan
+  {
+    if (treeNodePtr.p->m_scanfrag_data.m_scan_status == 2) // Is closed on LQH
+    {
+      jam();
+      ndbassert (treeNodePtr.p->m_scanfrag_data.m_scan_state !=
ScanFragData::SF_RUNNING)
 
-    sendSignal(requestPtr.p->m_senderRef, GSN_SCAN_FRAGCONF, signal,
-               ScanFragConf::SignalLength, JBB);
+      ScanFragConf* conf =
reinterpret_cast<ScanFragConf*>(signal->getDataPtrSend());
+      conf->senderData = requestPtr.p->m_senderData;
+      conf->transId1 = requestPtr.p->m_transId[0];
+      conf->transId2 = requestPtr.p->m_transId[1];
+      conf->completedOps = 0;
+      conf->fragmentCompleted = 2; // =ZSCAN_FRAG_CLOSED -> Finished...
+      conf->total_len = 0; // Not supported...
+
+      DEBUG("execSCAN_NEXTREQ(close), LQH has conf'ed 'w/ ZSCAN_FRAG_CLOSED");
+      sendSignal(requestPtr.p->m_senderRef, GSN_SCAN_FRAGCONF, signal,
+                 ScanFragConf::SignalLength, JBB);
 
-    cleanup(requestPtr);
-    return;
+      cleanup(requestPtr);
+      return;
+    }
+    else if (treeNodePtr.p->m_scanfrag_data.m_scan_state == ScanFragData::SF_RUNNING)
+    {
+      jam();
+      DEBUG("execSCAN_NEXTREQ, make PENDING CLOSE");
+      treeNodePtr.p->m_scanfrag_data.m_pending_close = true;
+      return;
+    }
+    // else; fallthrough & send to datanodes:
   }
 
+  ndbassert (!treeNodePtr.p->m_scanfrag_data.m_pending_close);
+  ndbassert (treeNodePtr.p->m_scanfrag_data.m_scan_status != 2)
   ndbrequire(treeNodePtr.p->m_info != 0 &&
              treeNodePtr.p->m_info->m_execSCAN_NEXTREQ != 0);
   (this->*(treeNodePtr.p->m_info->m_execSCAN_NEXTREQ))(signal,
@@ -1757,6 +1777,10 @@ Dbspj::scanFrag_build(Build_context& ctx
     treeNodePtr.p->m_info = &g_ScanFragOpInfo;
     treeNodePtr.p->m_bits |= TreeNode::T_ATTR_INTERPRETED;
 
+    treeNodePtr.p->m_scanfrag_data.m_scan_state = ScanFragData::SF_IDLE;
+    treeNodePtr.p->m_scanfrag_data.m_scan_status = 0;
+    treeNodePtr.p->m_scanfrag_data.m_pending_close = false;
+
     ScanFragReq*dst=(ScanFragReq*)treeNodePtr.p->m_scanfrag_data.m_scanFragReq;
     dst->senderData = treeNodePtr.i;
     dst->resultRef = reference();
@@ -1799,6 +1823,7 @@ Dbspj::scanFrag_build(Build_context& ctx
     DEBUG("param len: " << param->len);
     if (unlikely(param->len < QN_ScanFragParameters::NodeSize))
     {
+      jam();
       DEBUG_CRASH();
       break;
     }
@@ -1818,6 +1843,7 @@ Dbspj::scanFrag_build(Build_context& ctx
                   nodeDA, treeBits, paramDA, paramBits);
     if (unlikely(err != 0))
     {
+      jam();
       DEBUG_CRASH();
       break;
     }
@@ -1997,6 +2023,7 @@ Dbspj::scanFrag_send(Signal* signal,
 	     NDB_ARRAY_SIZE(treeNodePtr.p->m_scanfrag_data.m_scanFragReq),
              JBB, &handle);
 
+  ndbassert (!treeNodePtr.p->m_scanfrag_data.m_pending_close);
   treeNodePtr.p->m_scanfrag_data.m_scan_state = ScanFragData::SF_RUNNING;
   treeNodePtr.p->m_scanfrag_data.m_scan_status = 0;
   treeNodePtr.p->m_scanfrag_data.m_scan_fragconf_received = false;
@@ -2007,6 +2034,12 @@ Dbspj::scanFrag_send(Signal* signal,
   treeNodePtr.p->m_scanfrag_data.m_descendant_keyrefs_received = 0;
   treeNodePtr.p->m_scanfrag_data.m_descendant_keyreqs_sent = 0;
   treeNodePtr.p->m_scanfrag_data.m_missing_descendant_rows = 0;
+
+  /**
+   * Save position where next-scan-req should continue or close
+   */
+  treeNodePtr.p->m_scanfrag_data.m_scan_state = ScanFragData::SF_RUNNING;
+  requestPtr.p->m_currentNodePtrI = treeNodePtr.i;
 }
 
 /** Return true if scan batch is complete. This happens when all scan 
@@ -2060,9 +2093,6 @@ Dbspj::scanFrag_execSCAN_FRAGREF(Signal*
                                  Ptr<Request> requestPtr,
                                  Ptr<TreeNode> treeNodePtr)
 {
-  /**
-   * TODO
-   */
   const ScanFragRef* const rep = reinterpret_cast<const
ScanFragRef*>(signal->getDataPtr());
   Uint32 errCode = rep->errorCode;
 
@@ -2086,9 +2116,29 @@ Dbspj::scanFrag_execSCAN_FRAGREF(Signal*
   sendSignal(requestPtr.p->m_senderRef, GSN_SCAN_FRAGREF, signal,
 	     ScanFragRef::SignalLength, JBB);
 
-  // TODO: Cleanup operation on SPJ block
+  treeNodePtr.p->m_scanfrag_data.m_scan_fragconf_received = true;
+//treeNodePtr.p->m_scanfrag_data.m_scan_status = 2;  // (2=ZSCAN_FRAG_CLOSED)
+  ndbassert (isScanComplete(treeNodePtr.p->m_scanfrag_data));
 
-//ndbrequire(false);
+  /**
+   * SCAN_FRAGREF implies that datanodes closed the cursor.
+   *  -> Pending close is effectively a NOOP, reset it
+   */
+  if (treeNodePtr.p->m_scanfrag_data.m_pending_close)
+  {
+    jam();
+    treeNodePtr.p->m_scanfrag_data.m_pending_close = false;
+    DEBUG(" SCAN_FRAGREF, had pending close which can be ignored (is closed)");
+  }
+
+  /**
+   * Cleanup operation on SPJ block, remove all allocated resources.
+   */
+  {
+    jam();
+    treeNodePtr.p->m_scanfrag_data.m_scan_state = ScanFragData::SF_IDLE;
+    nodeFinished(signal, requestPtr, treeNodePtr);
+  }
 }
 
 
@@ -2114,7 +2164,8 @@ Dbspj::scanFrag_execSCAN_FRAGCONF(Signal
     treeNodePtr.p->m_scanfrag_data.m_rows_received = rows;
   }
   treeNodePtr.p->m_scanfrag_data.m_scan_fragconf_received = true;
-  if(isScanComplete(treeNodePtr.p->m_scanfrag_data)){
+  if (isScanComplete(treeNodePtr.p->m_scanfrag_data))
+  {
     jam();
     scanFrag_batch_complete(signal, requestPtr, treeNodePtr);
   }
@@ -2126,8 +2177,34 @@ Dbspj::scanFrag_batch_complete(Signal* s
                                Ptr<TreeNode> treeNodePtr)
 {
   DEBUG("scanFrag_batch_complete()");
-  ndbrequire(treeNodePtr.p->m_scanfrag_data.m_scan_state ==
-             ScanFragData::SF_RUNNING);
+
+  if (treeNodePtr.p->m_scanfrag_data.m_pending_close)
+  {
+    jam();
+    ndbrequire(treeNodePtr.p->m_scanfrag_data.m_scan_state ==
ScanFragData::SF_RUNNING);
+    treeNodePtr.p->m_scanfrag_data.m_scan_state = ScanFragData::SF_STARTED;
+
+    DEBUG("scanFrag_batch_complete() - has pending close, ignore this reply, request
close");
+
+    ScanFragNextReq* req =
reinterpret_cast<ScanFragNextReq*>(signal->getDataPtrSend());
+
+    /**
+     * SCAN_NEXTREQ(close) was requested while we where waiting for 
+     * datanodes to complete this request. 
+     *   - Send close request to LQH now.
+     *   - Suppress reply to TC/API, will reply later when close is conf'ed
+     */
+    req->closeFlag = ZTRUE;
+    req->senderData = treeNodePtr.i;
+    req->transId1 = requestPtr.p->m_transId[0];
+    req->transId2 = requestPtr.p->m_transId[1];
+    req->batch_size_rows = 0;
+    req->batch_size_bytes = 0;
+
+    treeNodePtr.p->m_scanfrag_data.m_pending_close = false;
+    scanFrag_execSCAN_NEXTREQ(signal, requestPtr, treeNodePtr);
+    return;
+  }
 
   /**
    * one batch complete...
@@ -2149,6 +2226,8 @@ Dbspj::scanFrag_batch_complete(Signal* s
   if (treeNodePtr.p->m_scanfrag_data.m_scan_status == 2)
   {
     jam();
+    ndbrequire(treeNodePtr.p->m_scanfrag_data.m_scan_state == ScanFragData::SF_RUNNING
||
+               treeNodePtr.p->m_scanfrag_data.m_scan_state ==
ScanFragData::SF_CLOSING);
     /**
      * EOF for scan
      */
@@ -2158,11 +2237,12 @@ Dbspj::scanFrag_batch_complete(Signal* s
   else
   {
     jam();
+    ndbrequire(treeNodePtr.p->m_scanfrag_data.m_scan_state ==
ScanFragData::SF_RUNNING);
     /**
-     * Save position where next-scan-req should continue
+     * Check position where next-scan-req should continue
      */
     treeNodePtr.p->m_scanfrag_data.m_scan_state = ScanFragData::SF_STARTED;
-    requestPtr.p->m_currentNodePtrI = treeNodePtr.i;
+    assert(requestPtr.p->m_currentNodePtrI == treeNodePtr.i);
   }
 }
 
@@ -2182,6 +2262,7 @@ Dbspj::scanFrag_execSCAN_NEXTREQ(Signal*
                                  Ptr<TreeNode> treeNodePtr)
 {
   jamEntry();
+  ndbassert (treeNodePtr.p->m_scanfrag_data.m_scan_state == ScanFragData::SF_STARTED);
 
   ScanFragNextReq* nextReq =
reinterpret_cast<ScanFragNextReq*>(signal->getDataPtrSend());
   nextReq->senderData = treeNodePtr.i;
@@ -2197,7 +2278,10 @@ Dbspj::scanFrag_execSCAN_NEXTREQ(Signal*
              ScanFragNextReq::SignalLength, 
              JBB);
 
-  treeNodePtr.p->m_scanfrag_data.m_scan_state = ScanFragData::SF_RUNNING;
+  treeNodePtr.p->m_scanfrag_data.m_scan_state = (nextReq->closeFlag == ZTRUE)
+    ? ScanFragData::SF_CLOSING 
+    : ScanFragData::SF_RUNNING;
+
   treeNodePtr.p->m_scanfrag_data.m_scan_status = 0;
   treeNodePtr.p->m_scanfrag_data.m_scan_fragconf_received = false;
   treeNodePtr.p->m_scanfrag_data.m_rows_received = 0;


Attachment: [text/bzr-bundle] bzr/ole.john.aske@sun.com-20091104105811-tf1pyxav7sfs1o9g.bundle
Thread
bzr push into mysql-5.1-telco-7.0-spj branch (ole.john.aske:2970 to 2971) Ole John Aske4 Nov 2009