List:Commits« Previous MessageNext Message »
From:Ole John Aske Date:August 16 2011 1:33pm
Subject:bzr push into mysql-5.1-telco-7.0-spj-scan-vs-scan branch
(ole.john.aske:3533 to 3534)
View as plain text  
 3534 Ole John Aske	2011-08-16 [merge]
      Merge

    modified:
      mysql-test/r/group_by.result
      mysql-test/t/group_by.test
      sql/sql_select.cc
      storage/ndb/src/kernel/blocks/dbtux/DbtuxNode.cpp
      storage/ndb/src/kernel/vm/ndbd_malloc_impl.cpp
      storage/ndb/src/kernel/vm/ndbd_malloc_impl.hpp
      storage/ndb/src/ndbapi/NdbQueryOperation.cpp
 3533 Ole John Aske	2011-08-16 [merge]
      Merge

    modified:
      storage/ndb/src/ndbapi/NdbQueryOperation.cpp
=== modified file 'mysql-test/r/group_by.result'
--- a/mysql-test/r/group_by.result	2010-10-29 08:23:06 +0000
+++ b/mysql-test/r/group_by.result	2011-08-16 10:20:19 +0000
@@ -1856,3 +1856,18 @@ COUNT(*)
 2
 DROP TABLE t1;
 # End of 5.1 tests
+#
+# Bug#12798270: ASSERTION `!TAB->SORTED' FAILED IN JOIN_READ_KEY2
+#
+CREATE TABLE t1 (i int);
+INSERT INTO t1 VALUES (1);
+CREATE TABLE t2 (pk int PRIMARY KEY);
+INSERT INTO t2 VALUES (10);
+CREATE VIEW v1 AS SELECT t2.pk FROM t2;
+SELECT v1.pk
+FROM t1 LEFT JOIN v1 ON t1.i = v1.pk 
+GROUP BY v1.pk;
+pk
+DROP VIEW v1;
+DROP TABLE t1,t2;
+# End of Bug#12798270

=== modified file 'mysql-test/t/group_by.test'
--- a/mysql-test/t/group_by.test	2010-10-29 08:23:06 +0000
+++ b/mysql-test/t/group_by.test	2011-08-16 10:20:19 +0000
@@ -1248,3 +1248,24 @@ ON 1 WHERE t2.f1 > 1 GROUP BY t2.f1;
 DROP TABLE t1;
 
 --echo # End of 5.1 tests
+
+--echo #
+--echo # Bug#12798270: ASSERTION `!TAB->SORTED' FAILED IN JOIN_READ_KEY2
+--echo #
+
+CREATE TABLE t1 (i int);
+INSERT INTO t1 VALUES (1);
+
+CREATE TABLE t2 (pk int PRIMARY KEY);
+INSERT INTO t2 VALUES (10);
+
+CREATE VIEW v1 AS SELECT t2.pk FROM t2;
+
+SELECT v1.pk
+FROM t1 LEFT JOIN v1 ON t1.i = v1.pk 
+GROUP BY v1.pk;
+
+DROP VIEW v1;
+DROP TABLE t1,t2;
+
+--echo # End of Bug#12798270

=== modified file 'sql/sql_select.cc'
--- a/sql/sql_select.cc	2011-07-09 11:16:31 +0000
+++ b/sql/sql_select.cc	2011-08-16 13:33:13 +0000
@@ -6948,7 +6948,15 @@ make_join_readinfo(JOIN *join, ulonglong
          (join->sort_by_table == (TABLE *) 1 && i != join->const_tables)))
       ordered_set= 1;
 
+#ifdef MCP_BUG12798270
     tab->sorted= sorted;
+#else
+    /*
+      For eq_ref there is at most one join match for each row from
+      previous tables so ordering is not useful.
+    */
+    tab->sorted= (tab->type != JT_EQ_REF) ? sorted : false;
+#endif
     sorted= 0;                                  // only first must be sorted
     table->status=STATUS_NO_RECORD;
     pick_table_access_method (tab);

=== modified file 'storage/ndb/src/kernel/blocks/dbtux/DbtuxNode.cpp'
--- a/storage/ndb/src/kernel/blocks/dbtux/DbtuxNode.cpp	2011-06-30 15:59:25 +0000
+++ b/storage/ndb/src/kernel/blocks/dbtux/DbtuxNode.cpp	2011-08-16 08:47:35 +0000
@@ -155,19 +155,26 @@ Dbtux::setNodePref(TuxCtx & ctx, NodeHan
 {
   const Frag& frag = node.m_frag;
   const Index& index = *c_indexPool.getPtr(frag.m_indexId);
-  KeyData prefKey(index.m_keySpec, false, 0);
-  prefKey.set_buf(node.getPref(), index.m_prefBytes);
+  /*
+   * bug#12873640
+   * Node prefix exists if it has non-zero number of attributes.  It is
+   * then a partial instance of KeyData.  If the prefix does not exist
+   * then set_buf() could overwrite m_pageId1 in first entry, causing
+   * random crash in TUP via readKeyAttrs().
+   */
   if (index.m_prefAttrs > 0) {
+    KeyData prefKey(index.m_keySpec, false, 0);
+    prefKey.set_buf(node.getPref(), index.m_prefBytes);
     jam();
     readKeyAttrs(ctx, frag, node.getEnt(0), prefKey, index.m_prefAttrs);
-  }
 #ifdef VM_TRACE
-  if (debugFlags & DebugMaint) {
-    debugOut << "setNodePref: " << node;
-    debugOut << " " << prefKey.print(ctx.c_debugBuffer, DebugBufferBytes);
-    debugOut << endl;
-  }
+    if (debugFlags & DebugMaint) {
+      debugOut << "setNodePref: " << node;
+      debugOut << " " << prefKey.print(ctx.c_debugBuffer, DebugBufferBytes);
+      debugOut << endl;
+    }
 #endif
+  }
 }
 
 // node operations

=== modified file 'storage/ndb/src/kernel/vm/ndbd_malloc_impl.cpp'
--- a/storage/ndb/src/kernel/vm/ndbd_malloc_impl.cpp	2011-02-01 23:27:25 +0000
+++ b/storage/ndb/src/kernel/vm/ndbd_malloc_impl.cpp	2011-08-16 08:27:14 +0000
@@ -39,6 +39,16 @@ static const char * f_method = "MSms";
 #endif
 #define MAX_CHUNKS 10
 
+#ifdef VM_TRACE
+#ifndef NDBD_RANDOM_START_PAGE
+#define NDBD_RANDOM_START_PAGE
+#endif
+#endif
+
+#ifdef NDBD_RANDOM_START_PAGE
+static Uint32 g_random_start_page_id = 0;
+#endif
+
 /*
  * For muti-threaded ndbd, these calls are used for locking around
  * memory allocation operations.
@@ -224,6 +234,16 @@ Ndbd_mem_manager::Ndbd_mem_manager()
   mt_mem_manager_init();
 }
 
+void*
+Ndbd_mem_manager::get_memroot() const
+{
+#ifdef NDBD_RANDOM_START_PAGE
+  return (void*)(m_base_page - g_random_start_page_id);
+#else
+  return (void*)m_base_page;
+#endif
+}
+
 /**
  *
  * resource 0 has following semantics:
@@ -359,6 +379,29 @@ Ndbd_mem_manager::init(Uint32 *watchCoun
   }
 #endif
 
+#ifdef NDBD_RANDOM_START_PAGE
+  /**
+   * In order to find bad-users of page-id's
+   *   we add a random offset to the page-id's returned
+   *   however, due to ZONE_LO that offset can't be that big
+   *   (since we at get_page don't know if it's a HI/LO page)
+   */
+  Uint32 max_rand_start = ZONE_LO_BOUND - 1;
+  if (max_rand_start > pages)
+  {
+    max_rand_start -= pages;
+    if (max_rand_start > 0x10000)
+      g_random_start_page_id = 0x10000 + (rand() % (max_rand_start - 0x10000));
+    else if (max_rand_start)
+      g_random_start_page_id = rand() % max_rand_start;
+
+    assert(Uint64(pages) + Uint64(g_random_start_page_id) <= 0xFFFFFFFF);
+
+    ndbout_c("using g_random_start_page_id: %u (%.8x)",
+             g_random_start_page_id, g_random_start_page_id);
+  }
+#endif
+
   /**
    * Do malloc
    */
@@ -670,7 +713,7 @@ Ndbd_mem_manager::alloc(AllocZone zone, 
       return;
     * pages = save;
   }
-  
+
   alloc_impl(ZONE_LO, ret, pages, min);
 }
 
@@ -870,7 +913,12 @@ Ndbd_mem_manager::alloc_page(Uint32 type
 
       check_resource_limits(m_resource_limit);
       mt_mem_manager_unlock();
+#ifdef NDBD_RANDOM_START_PAGE
+      *i += g_random_start_page_id;
+      return m_base_page + *i - g_random_start_page_id;
+#else
       return m_base_page + *i;
+#endif
     }
   }
   mt_mem_manager_unlock();
@@ -885,7 +933,11 @@ Ndbd_mem_manager::release_page(Uint32 ty
   mt_mem_manager_lock();
   Resource_limit tot = m_resource_limit[0];
   Resource_limit rl = m_resource_limit[idx];
-  
+
+#ifdef NDBD_RANDOM_START_PAGE
+  i -= g_random_start_page_id;
+#endif
+
   Uint32 sub = (rl.m_curr <= rl.m_min) ? 1 : 0; // Over min ?
   release(i, 1);
   m_resource_limit[0].m_curr = tot.m_curr - 1;
@@ -954,10 +1006,16 @@ Ndbd_mem_manager::alloc_pages(Uint32 typ
     m_resource_limit[idx].m_curr = rl.m_curr + req;
     check_resource_limits(m_resource_limit);
     mt_mem_manager_unlock();
+#ifdef NDBD_RANDOM_START_PAGE
+    *i += g_random_start_page_id;
+#endif
     return ;
   }
   mt_mem_manager_unlock();
   * cnt = req;
+#ifdef NDBD_RANDOM_START_PAGE
+  *i += g_random_start_page_id;
+#endif
   return;
 }
 
@@ -969,7 +1027,11 @@ Ndbd_mem_manager::release_pages(Uint32 t
   mt_mem_manager_lock();
   Resource_limit tot = m_resource_limit[0];
   Resource_limit rl = m_resource_limit[idx];
-  
+
+#ifdef NDBD_RANDOM_START_PAGE
+  i -= g_random_start_page_id;
+#endif
+
   release(i, cnt);
 
   Uint32 currnew = rl.m_curr - cnt;

=== modified file 'storage/ndb/src/kernel/vm/ndbd_malloc_impl.hpp'
--- a/storage/ndb/src/kernel/vm/ndbd_malloc_impl.hpp	2011-07-09 11:16:31 +0000
+++ b/storage/ndb/src/kernel/vm/ndbd_malloc_impl.hpp	2011-08-16 13:33:13 +0000
@@ -68,7 +68,7 @@ public:
 
   bool init(Uint32 *watchCounter, bool allow_alloc_less_than_requested = true);
   void map(Uint32 * watchCounter, bool memlock = false, Uint32 resources[] = 0);
-  void* get_memroot() const { return (void*)m_base_page;}
+  void* get_memroot() const;
   
   void dump() const ;
   

=== modified file 'storage/ndb/src/ndbapi/NdbQueryOperation.cpp'
--- a/storage/ndb/src/ndbapi/NdbQueryOperation.cpp	2011-08-16 08:15:13 +0000
+++ b/storage/ndb/src/ndbapi/NdbQueryOperation.cpp	2011-08-16 13:33:13 +0000
@@ -87,6 +87,32 @@ const bool traceSignals = false;
  * children. That way, results for child operations can be updated correctly
  * when the application iterates over the results of the root scan operation.
  */
+class TupleCorrelation
+{
+public:
+  static const Uint32 wordCount = 1;
+
+  explicit TupleCorrelation()
+  : m_correlation((tupleNotFound<<16) | tupleNotFound)
+  {}
+
+  /** Conversion to/from Uint32 to store/fetch from buffers */
+  explicit TupleCorrelation(Uint32 val)
+  : m_correlation(val)
+  {}
+  Uint32 toUint32() const 
+  { return m_correlation; }
+
+  Uint16 getTupleId() const
+  { return m_correlation & 0xffff;}
+
+  Uint16 getParentTupleId() const
+  { return m_correlation >> 16;}
+
+private:
+  Uint32 m_correlation;
+}; // class TupleCorrelation
+
 class CorrelationData
 {
 public:
@@ -99,18 +125,15 @@ public:
     assert(AttributeHeader(m_corrPart[0]).getAttributeId() 
            == AttributeHeader::CORR_FACTOR64);
     assert(AttributeHeader(m_corrPart[0]).getByteSize() == 2*sizeof(Uint32));
-    assert(getTupleId()<tupleNotFound);
-    assert(getParentTupleId()<tupleNotFound);
+    assert(getTupleCorrelation().getTupleId()<tupleNotFound);
+    assert(getTupleCorrelation().getParentTupleId()<tupleNotFound);
   }
 
   Uint32 getRootReceiverId() const
   { return m_corrPart[2];}
 
-  Uint16 getTupleId() const
-  { return m_corrPart[1] & 0xffff;}
-
-  Uint16 getParentTupleId() const
-  { return m_corrPart[1] >> 16;}
+  const TupleCorrelation getTupleCorrelation() const
+  { return TupleCorrelation(m_corrPart[1]); }
 
 private:
   const Uint32* const m_corrPart;
@@ -166,6 +189,11 @@ public:
    */
   void reset();
 
+  /**
+   * Prepare for reading another batch of results.
+   */
+  void prepareResultSet();
+
   void incrOutstandingResults(Int32 delta)
   {
     m_outstandingResults += delta;
@@ -307,6 +335,9 @@ public:
   Uint32 getRowCount() const
   { return m_rowCount; }
 
+  char* getRow(Uint32 tupleNo) const
+  { return (m_buffer + (tupleNo*m_rowSize)); }
+
   /**
    * Process an incomming tuple for this stream. Extract parent and own tuple 
    * ids and pass it on to m_receiver.
@@ -314,12 +345,15 @@ public:
    * @param ptr buffer holding tuple.
    * @param len buffer length.
    */
-  void execTRANSID_AI(const Uint32 *ptr, Uint32 len);
+  void execTRANSID_AI(const Uint32 *ptr, Uint32 len,
+                      TupleCorrelation correlation);
 
-  /** A complete batch has been received for a fragment on this NdbResultStream,
-   *  Update whatever required before the appl. are allowed to navigate the result.
+  /**
+   * A complete batch has been received for a fragment on this NdbResultStream,
+   * Update whatever required before the appl. are allowed to navigate the result.
+   * @return true if node and all its siblings have returned all rows.
    */ 
-  void handleBatchComplete();
+  bool prepareResultSet();
 
   /**
    * Navigate within the current result batch to resp. first and next row.
@@ -364,12 +398,6 @@ public:
     return m_subScanComplete; 
   }
 
-  /** Variant of isSubScanComplete() above which checks that this resultstream
-   * and all its descendants have consumed all batches of rows instantiated 
-   * from their parent operation(s).
-   */
-  bool isAllSubScansComplete() const;
-
   bool isScanQuery() const
   { return (m_properties & Is_Scan_Query); }
 
@@ -420,7 +448,7 @@ public:
      * that had no matching children.*/
     Bitmask<(NDB_SPJ_MAX_TREE_NODES+31)/32> m_hasMatchingChild;
 
-    explicit TupleSet()
+    explicit TupleSet() : m_hash_head(tupleNotFound)
     {}
 
   private:
@@ -452,9 +480,14 @@ private:
   /** The receiver object that unpacks transid_AI messages.*/
   NdbReceiver m_receiver;
 
+  /** The buffers which we receive the results into */
+  char* m_buffer;
+
   /** Used for checking if buffer overrun occurred. */
   Uint32* m_batchOverflowCheck;
 
+  Uint32 m_rowSize;
+
   /** The number of transid_AI messages received.*/
   Uint32 m_rowCount;
 
@@ -489,11 +522,7 @@ private:
   /** TupleSet contains the correlation between parent/childs */
   TupleSet* m_tupleSet;
 
-  void clearTupleSet();
-
-  void setParentChildMap(Uint16 parentId,
-                         Uint16 tupleId, 
-                         Uint16 tupleNo);
+  void buildResultCorrelations();
 
   Uint16 getTupleId(Uint16 tupleNo) const
   { return (m_tupleSet) ? m_tupleSet[tupleNo].m_tupleId : 0; }
@@ -576,7 +605,9 @@ NdbResultStream::NdbResultStream(NdbQuer
      | (operation.getQueryOperationDef().getMatchType() != NdbQueryOptions::MatchAll
        ? Is_Inner_Join : 0))),
   m_receiver(operation.getQuery().getNdbTransaction().getNdb()),
+  m_buffer(NULL),
   m_batchOverflowCheck(NULL),
+  m_rowSize(0),
   m_rowCount(0),
   m_iterState(Iter_notStarted),
   m_currentRow(tupleNotFound),
@@ -599,6 +630,8 @@ NdbResultStream::prepare()
   const Uint32 rowSize = m_operation.getRowSize();
   NdbQueryImpl &query = m_operation.getQuery();
 
+  m_rowSize = rowSize;
+
   /* Parent / child correlation is only relevant for scan type queries
    * Don't create m_parentTupleId[] and m_childTupleIdx[] for lookups!
    * Neither is these structures required for operations not having respective
@@ -610,16 +643,13 @@ NdbResultStream::prepare()
     m_tupleSet = 
       new (query.getTupleSetAlloc().allocObjMem(m_maxRows)) 
       TupleSet[m_maxRows];
-
-    clearTupleSet();
   }
   else
     m_maxRows = 1;
 
   const int bufferSize = rowSize * m_maxRows;
   NdbBulkAllocator& bufferAlloc = query.getRowBufferAlloc();
-  char* const rowBuf = reinterpret_cast<char*>(bufferAlloc
-                                               .allocObjMem(bufferSize));
+  m_buffer = reinterpret_cast<char*>(bufferAlloc.allocObjMem(bufferSize));
 
   // So that we can test for buffer overrun.
   m_batchOverflowCheck = 
@@ -633,7 +663,7 @@ NdbResultStream::prepare()
                           0 /*key_size*/, 
                           0 /*read_range_no*/, 
                           rowSize,
-                          rowBuf);
+                          m_buffer);
 } //NdbResultStream::prepare
 
 void
@@ -646,7 +676,6 @@ NdbResultStream::reset()
   m_iterState = Iter_notStarted;
   m_currentRow = tupleNotFound;
 
-  clearTupleSet();
   m_receiver.prepareSend();
   /**
    * If this stream will get new rows in the next batch, then so will
@@ -660,84 +689,10 @@ NdbResultStream::reset()
   }
 } //NdbResultStream::reset
 
-void
-NdbResultStream::clearTupleSet()
-{
-  assert (isScanQuery());
-  for (Uint32 i=0; i<m_maxRows; i++)
-  {
-    m_tupleSet[i].m_parentId = tupleNotFound;
-    m_tupleSet[i].m_tupleId  = tupleNotFound;
-    m_tupleSet[i].m_hash_head = tupleNotFound;
-    m_tupleSet[i].m_skip = false;
-    m_tupleSet[i].m_hasMatchingChild.clear();
-  }
-}
-
-bool
-NdbResultStream::isAllSubScansComplete() const
-{ 
-  // Lookups should always be 'complete'
-  assert(m_subScanComplete || isScanResult());
-
-  if (!m_subScanComplete)
-    return false;
-
-  for (Uint32 childNo = 0; childNo < m_operation.getNoOfChildOperations(); 
-       childNo++)
-  {
-    const NdbQueryOperationImpl& child = m_operation.getChildOperation(childNo);
-    const NdbResultStream& childStream = m_rootFrag.getResultStream(child);
-    if (!childStream.isAllSubScansComplete())
-      return false;
-  }
-  return true;
-} //NdbResultStream::isAllSubScansComplete
-
-
-void
-NdbResultStream::setParentChildMap(Uint16 parentId,
-                                   Uint16 tupleId, 
-                                   Uint16 tupleNo)
-{
-  assert (isScanQuery());
-  assert (tupleNo < m_maxRows);
-  assert (tupleId != tupleNotFound);
-
-  for (Uint32 i = 0; i < tupleNo; i++)
-  {
-    // Check that tuple id is unique.
-    assert (m_tupleSet[i].m_tupleId != tupleId); 
-  }
-  m_tupleSet[tupleNo].m_parentId = parentId;
-  m_tupleSet[tupleNo].m_tupleId  = tupleId;
-
-  const Uint16 hash = (parentId % m_maxRows);
-  if (parentId == tupleNotFound)
-  {
-    /* Root stream: Insert sequentially in hash_next to make it
-     * possible to use ::findTupleWithParentId() and ::findNextTuple()
-     * to navigate even the root operation.
-     */
-    assert (m_parent==NULL);
-    /* Link into m_hash_next in order to let ::findNextTuple() navigate correctly */
-    if (tupleNo==0)
-      m_tupleSet[hash].m_hash_head  = 0;
-    else
-      m_tupleSet[tupleNo-1].m_hash_next  = tupleNo;
-    m_tupleSet[tupleNo].m_hash_next  = tupleNotFound;
-  }
-  else
-  {
-    /* Insert parentId in HashMap */
-    m_tupleSet[tupleNo].m_hash_next = m_tupleSet[hash].m_hash_head;
-    m_tupleSet[hash].m_hash_head  = tupleNo;
-  }
-}
-
 /** Locate, and return 'tupleNo', of first tuple with specified parentId.
  *  parentId == tupleNotFound is use as a special value for iterating results
- *  from the root operation in the order which they was inserted by ::setParentChildMap()
+ *  from the root operation in the order which they was inserted by 
+ *  ::buildResultCorrelations()
  *
  *  Position of 'currentRow' is *not* updated and should be modified by callee
  *  if it want to keep the new position.
@@ -843,99 +798,188 @@ NdbResultStream::nextResult()
 } //NdbResultStream::nextResult()
 
 
+/**
+ * Callback when a TRANSID_AI signal (receive row) is processed.
+ */
 void
-NdbResultStream::execTRANSID_AI(const Uint32 *ptr, Uint32 len)
+NdbResultStream::execTRANSID_AI(const Uint32 *ptr, Uint32 len,
+                                TupleCorrelation correlation)
 {
-  assert(m_iterState == Iter_notStarted);
+  m_receiver.execTRANSID_AI(ptr, len);
+  m_rowCount++;
+
   if (isScanQuery())
   {
-    const CorrelationData correlData(ptr, len);
-
-    assert(m_rootFrag.getResultStream(0).m_receiver.getId() ==
-           correlData.getRootReceiverId());
-
-    m_receiver.execTRANSID_AI(ptr, len - CorrelationData::wordCount);
-
     /**
-     * Keep correlation data between parent and child tuples.
-     * Since tuples may arrive in any order, we cannot match
-     * parent and child until all tuples (for this batch and 
-     * root fragment) have arrived.
+     * Store TupleCorrelation as hidden value imm. after received row
+     * (NdbQueryOperationImpl::getRowSize() has reserved space for it)
      */
-    setParentChildMap(m_parent==NULL
-                      ? tupleNotFound
-                      : correlData.getParentTupleId(),
-                      correlData.getTupleId(),
-                      m_rowCount);
+    Uint32* row_recv = reinterpret_cast<Uint32*>(m_receiver.m_record.m_row);
+    row_recv[-1] = correlation.toUint32();
   }
-  else
-  {
-    // Lookup query.
-    m_receiver.execTRANSID_AI(ptr, len);
-  }
-  m_rowCount++;
-  /* Set correct #rows received in the NdbReceiver.
-   */
-  getReceiver().m_result_rows = getRowCount();
 } // NdbResultStream::execTRANSID_AI()
 
-
 /**
- * A fresh batch of results has arrived for this ResultStream (and all its parent / childs)
- * Filter away any result rows which should not be visible (yet) - Either due to incomplete
- * child batches, or the join being an 'inner join'.
- * Set result itterator state to 'before first' resultrow.
+ * Make preparations for another batch of result to be read:
+ *  - Fill in parent/child result correlations in m_tupleSet[]
+ *  - ... or reset m_tupleSet[] if we reuse the previous.
+ *  - Apply inner/outer join filtering to remove non qualifying 
+ *    rows.
  */
-void 
-NdbResultStream::handleBatchComplete()
+bool 
+NdbResultStream::prepareResultSet()
 {
-  // Buffer overrun check.
-  assert(m_batchOverflowCheck==NULL || *m_batchOverflowCheck==0xacbd1234);
+  bool isComplete = isSubScanComplete();  //Childs with more rows
+  assert(isComplete || isScanResult());   //Lookups always 'complete'
+
+  // Set correct #rows received in the NdbReceiver.
+  getReceiver().m_result_rows = getRowCount();
 
-  for (Uint32 tupleNo=0; tupleNo<getRowCount(); tupleNo++)
+  /**
+   * Prepare NdbResultStream for reading - either the next received
+   * from datanodes or reuse current.
+   */
+  if (m_tupleSet!=NULL)
   {
-    m_tupleSet[tupleNo].m_skip = false;
+    const bool newResults = (m_iterState!=Iter_finished);
+    if (newResults)
+    {
+      buildResultCorrelations();
+    }
+    else
+    {
+      // Makes all rows in 'TupleSet' available (clear 'm_skip' flag)
+      for (Uint32 tupleNo=0; tupleNo<getRowCount(); tupleNo++)
+      {
+        m_tupleSet[tupleNo].m_skip = false;
+      }
+    }
   }
 
+  /**
+   * Recursively iterate all child results depth first. 
+   * Filter away any result rows which should not be visible (yet) - 
+   * Either due to incomplete child batches, or the join being an 'inner join'.
+   * Set result itterator state to 'before first' resultrow.
+   */
   for (Uint32 childNo=0; childNo < m_operation.getNoOfChildOperations(); childNo++)
   {
     const NdbQueryOperationImpl& child = m_operation.getChildOperation(childNo);
     NdbResultStream& childStream = m_rootFrag.getResultStream(child);
-    childStream.handleBatchComplete();
+    const bool allSubScansComplete = childStream.prepareResultSet();
+
+    Uint32 childId = child.getQueryOperationDef().getQueryOperationIx();
 
-    const bool isInnerJoin = childStream.isInnerJoin();
-    const bool allSubScansComplete = childStream.isAllSubScansComplete();
+    /* Condition 1) & 2) calc'ed outside loop, see comments further below: */
+    const bool skipNonMatches = !allSubScansComplete ||      // 1)
+                                childStream.isInnerJoin();   // 2)
 
-    for (Uint32 tupleNo=0; tupleNo<getRowCount(); tupleNo++)
+    if (m_tupleSet!=NULL)
     {
-      if (!m_tupleSet[tupleNo].m_skip)
+      for (Uint32 tupleNo=0; tupleNo<getRowCount(); tupleNo++)
       {
-        Uint16 tupleId = getTupleId(tupleNo);
-        if (childStream.findTupleWithParentId(tupleId)!=tupleNotFound)
-          m_tupleSet[tupleNo].m_hasMatchingChild.set(childNo);
-
-        /////////////////////////////////
-        //  No child matched for this row. Making parent row visible
-        //  will cause a NULL (outer join) row to be produced.
-        //  Skip NULL row production when:
-        //    1) Some child batches are not complete; they may contain later matches.
-        //    2) A match was found in a previous batch.
-        //    3) Join type is 'inner join', skip as no child are matching.
-        //
-        else if (!allSubScansComplete                                 // 1)
-             ||  m_tupleSet[tupleNo].m_hasMatchingChild.get(childNo)  // 2)
-             ||  isInnerJoin)                                         // 3)
-          m_tupleSet[tupleNo].m_skip = true;
+        if (!m_tupleSet[tupleNo].m_skip)
+        {
+          Uint16 tupleId = getTupleId(tupleNo);
+          if (childStream.findTupleWithParentId(tupleId)!=tupleNotFound)
+            m_tupleSet[tupleNo].m_hasMatchingChild.set(childId);
+
+          /////////////////////////////////
+          //  No child matched for this row. Making parent row visible
+          //  will cause a NULL (outer join) row to be produced.
+          //  Skip NULL row production when:
+          //    1) Some child batches are not complete; they may contain later matches.
+          //    2) Join type is 'inner join', skip as no child are matching.
+          //    3) A match was found in a previous batch.
+          //  Condition 1) & 2) above is precalculated in 'bool skipNonMatches'
+          //
+          else if (skipNonMatches                                       // 1 & 2)
+               ||  m_tupleSet[tupleNo].m_hasMatchingChild.get(childId)) // 3)
+            m_tupleSet[tupleNo].m_skip = true;
+        }
       }
     }
+    isComplete &= allSubScansComplete;
   }
-  m_currentRow = tupleNotFound;
+
+  // Set current position 'before first'
   m_iterState = Iter_notStarted;
-} // NdbResultStream::handleBatchComplete()
+  m_currentRow = tupleNotFound;
+
+  return isComplete; 
+} // NdbResultStream::prepareResultSet()
+
+
+/**
+ * Fill m_tupleSet[] with correlation data between parent 
+ * and child tuples. The 'TupleCorrelation' is stored as
+ * and extra Uint32 after each row received
+ * by execTRANSID_AI().
+ *
+ * NOTE: In order to reduce work done when holding the 
+ * transporter mutex, the 'TupleCorrelation' is only stored
+ * in the buffer when it arrives. Later (here) we build the
+ * correlation hashMap immediately before we prepare to 
+ * read the NdbResultStream.
+ */
+void 
+NdbResultStream::buildResultCorrelations()
+{
+  // Buffer overrun check.
+  assert(m_batchOverflowCheck==NULL || *m_batchOverflowCheck==0xacbd1234);
+
+//if (m_tupleSet!=NULL)
+  {
+    /* Clear the hashmap structures */
+    for (Uint32 i=0; i<m_maxRows; i++)
+    {
+      m_tupleSet[i].m_hash_head = tupleNotFound;
+    }
+
+    /* Rebuild correlation & hashmap from received buffers */
+    for (Uint32 tupleNo=0; tupleNo<m_rowCount; tupleNo++)
+    {
+      const Uint32* row = (Uint32*)getRow(tupleNo+1);
+      const TupleCorrelation correlation(row[-1]);
+
+      const Uint16 tupleId  = correlation.getTupleId();
+      const Uint16 parentId = (m_parent!=NULL) 
+                                ? correlation.getParentTupleId()
+                                : tupleNotFound;
+
+      m_tupleSet[tupleNo].m_skip     = false;
+      m_tupleSet[tupleNo].m_parentId = parentId;
+      m_tupleSet[tupleNo].m_tupleId  = tupleId;
+      m_tupleSet[tupleNo].m_hasMatchingChild.clear();
+
+      /* Insert into parentId-hashmap */
+      const Uint16 hash = (parentId % m_maxRows);
+      if (m_parent==NULL)
+      {
+        /* Root stream: Insert sequentially in hash_next to make it
+         * possible to use ::findTupleWithParentId() and ::findNextTuple()
+         * to navigate even the root operation.
+         */
+        /* Link into m_hash_next in order to let ::findNextTuple() navigate correctly */
+        if (tupleNo==0)
+          m_tupleSet[hash].m_hash_head  = tupleNo;
+        else
+          m_tupleSet[tupleNo-1].m_hash_next  = tupleNo;
+        m_tupleSet[tupleNo].m_hash_next  = tupleNotFound;
+      }
+      else
+      {
+        /* Insert parentId in HashMap */
+        m_tupleSet[tupleNo].m_hash_next = m_tupleSet[hash].m_hash_head;
+        m_tupleSet[hash].m_hash_head  = tupleNo;
+      }
+    }
+  }
+} // NdbResultStream::buildResultCorrelations
 
 
 ///////////////////////////////////////////
-/////////  NdbRootFragment methods ///////////
+////////  NdbRootFragment methods /////////
 ///////////////////////////////////////////
 void NdbRootFragment::buildReciverIdMap(NdbRootFragment* frags, 
                                         Uint32 noOfFrags)
@@ -1056,6 +1100,16 @@ void NdbRootFragment::reset()
   m_confReceived = false;
 }
 
+void NdbRootFragment::prepareResultSet()
+{
+  NdbResultStream& rootStream = getResultStream(0);
+  rootStream.prepareResultSet();  
+
+  /* Position at the first (sorted?) row available from this fragments.
+   */
+  rootStream.firstResult();
+}
+
 void NdbRootFragment::setConfReceived()
 { 
   /* For a query with a lookup root, there may be more than one TCKEYCONF
@@ -2120,6 +2174,7 @@ NdbQueryImpl::awaitMoreResults(bool forc
         NdbRootFragment* frag;
         while ((frag=m_fullFrags.pop()) != NULL)
         {
+          frag->prepareResultSet();
           m_applFrags.add(*frag);
         }
         if (m_applFrags.getCurrent() != NULL)
@@ -2174,6 +2229,7 @@ NdbQueryImpl::awaitMoreResults(bool forc
     NdbRootFragment* frag;
     if ((frag=m_fullFrags.pop()) != NULL)
     {
+      frag->prepareResultSet();
       m_applFrags.add(*frag);
     }
     assert(m_fullFrags.pop()==NULL); // Only one stream for lookups.
@@ -2213,7 +2269,6 @@ NdbQueryImpl::handleBatchComplete(NdbRoo
            << ", finalBatchFrags=" << m_finalBatchFrags
            <<  endl;
   }
-  bool resume = false;
 
   /* May received fragment data after a SCANREF() (timeout?) 
    * terminated the scan.  We are about to close this query, 
@@ -2221,7 +2276,6 @@ NdbQueryImpl::handleBatchComplete(NdbRoo
    */
   if (likely(m_fullFrags.m_errorCode == 0))
   {
-    NdbQueryOperationImpl& root = getRoot();
     assert(rootFrag.isFragBatchComplete());
 
     assert(m_pendingFrags > 0);                // Check against underflow.
@@ -2234,34 +2288,14 @@ NdbQueryImpl::handleBatchComplete(NdbRoo
       assert(m_finalBatchFrags <= m_rootFragCount);
     }
 
-    if (getQueryDef().isScanQuery())
-    {
-      // Only required for scans
-      rootFrag.getResultStream(root).handleBatchComplete();  
-
-      // Only ordered scans has to wait until all pending completed
-      resume = (m_pendingFrags==0) ||
-               (root.m_ordering==NdbQueryOptions::ScanOrdering_unordered);
-    }
-    else
-    {
-      assert(rootFrag.getResultStream(root).getReceiver().m_tcPtrI==RNIL);
-      assert(m_finalBatchFrags==1);
-      assert(m_pendingFrags==0);  // Lookup query should be complete now.
-      resume = true;   
-    }
-
-    /* Position at the first (sorted?) row available from this fragments.
-     */
-    rootFrag.getResultStream(root).firstResult();
-
     /* When application thread ::awaitMoreResults() it will later be moved
      * from m_fullFrags to m_applFrags under mutex protection.
      */
     m_fullFrags.push(rootFrag);
+    return true;
   }
 
-  return resume;
+  return false;
 } // NdbQueryImpl::handleBatchComplete
 
 int
@@ -4619,10 +4653,12 @@ NdbQueryOperationImpl::prepareLookupKeyI
 bool 
 NdbQueryOperationImpl::execTRANSID_AI(const Uint32* ptr, Uint32 len)
 {
+  TupleCorrelation tupleCorrelation;
   NdbRootFragment* rootFrag = m_queryImpl.m_rootFrags;
-  Uint32 rootFragNo = 0;
+
   if (getQueryDef().isScanQuery())
   {
+    const CorrelationData correlData(ptr, len);
     const Uint32 receiverId = CorrelationData(ptr, len).getRootReceiverId();
     
     /** receiverId holds the Id of the receiver of the corresponding stream
@@ -4638,18 +4674,21 @@ NdbQueryOperationImpl::execTRANSID_AI(co
       assert(false);
       return false;
     }
-    rootFragNo = rootFrag->getFragNo();
+
+    // Extract tuple correlation.
+    tupleCorrelation = correlData.getTupleCorrelation();
+    len -= CorrelationData::wordCount;
   }
+
   if (traceSignals) {
     ndbout << "NdbQueryOperationImpl::execTRANSID_AI()" 
            << ", operation no: " << getQueryOperationDef().getQueryOperationIx()
-           << ", fragment no: " << rootFragNo
+           << ", fragment no: " << rootFrag->getFragNo()
            << endl;
   }
 
   // Process result values.
-  rootFrag->getResultStream(*this).execTRANSID_AI(ptr, len);
-
+  rootFrag->getResultStream(*this).execTRANSID_AI(ptr, len, tupleCorrelation);
   rootFrag->incrOutstandingResults(-1);
 
   bool ret = false;
@@ -4985,6 +5024,12 @@ Uint32 NdbQueryOperationImpl::getRowSize
   {
     m_rowSize = 
       NdbReceiver::ndbrecord_rowsize(m_ndbRecord, m_firstRecAttr, 0, false);
+
+    const bool withCorrelation = getRoot().getQueryDef().isScanQuery();
+    if (withCorrelation)
+    {
+      m_rowSize += TupleCorrelation::wordCount*sizeof(Uint32);
+    }
   }
   return m_rowSize;
 }

No bundle (reason: useless for push emails).
Thread
bzr push into mysql-5.1-telco-7.0-spj-scan-vs-scan branch(ole.john.aske:3533 to 3534) Ole John Aske17 Aug