#At file:///net/fimafeng09/export/home/tmp/oleja/mysql/mysql-5.1-telco-7.0-spj-scan-scan/ based on revid:jonas@stripped
3489 Ole John Aske 2011-05-04 [merge]
merge telco-7.0 -> spj-scan-scan
modified:
storage/ndb/include/kernel/signaldata/QueryTree.hpp
storage/ndb/src/kernel/blocks/dbspj/Dbspj.hpp
storage/ndb/src/kernel/blocks/dbspj/DbspjMain.cpp
storage/ndb/src/ndbapi/NdbQueryBuilder.cpp
storage/ndb/src/ndbapi/NdbQueryBuilderImpl.hpp
storage/ndb/src/ndbapi/Ndbinit.cpp
storage/ndb/src/ndbapi/ObjectMap.cpp
storage/ndb/src/ndbapi/ObjectMap.hpp
storage/ndb/src/ndbapi/ndberror.c
=== modified file 'storage/ndb/include/kernel/signaldata/QueryTree.hpp'
--- a/storage/ndb/include/kernel/signaldata/QueryTree.hpp 2011-02-23 19:28:26 +0000
+++ b/storage/ndb/include/kernel/signaldata/QueryTree.hpp 2011-05-04 11:45:33 +0000
@@ -99,6 +99,17 @@ struct DABits
*/
NI_LINKED_DISK = 0x100,
+ /**
+ * If REPEAT_SCAN_RESULT is set, multiple star-joined (or bushy, or X)
+ * scan results are handled by repeating the other scans' results
+ * when we advance to the next batch chunk for the current 'active'
+ * result set.
+ * This removes the requirement for the API client to be able to
+ * buffer a (possibly huge) amount of scan results relating to
+ * the same parent scan.
+ */
+ NI_REPEAT_SCAN_RESULT = 0x200,
+
NI_END = 0
};
=== modified file 'storage/ndb/src/kernel/blocks/dbspj/Dbspj.hpp'
--- a/storage/ndb/src/kernel/blocks/dbspj/Dbspj.hpp 2011-02-23 19:28:26 +0000
+++ b/storage/ndb/src/kernel/blocks/dbspj/Dbspj.hpp 2011-05-04 11:45:33 +0000
@@ -27,6 +27,7 @@
#include <SLList.hpp>
#include <ArenaPool.hpp>
#include <DataBuffer2.hpp>
+#include <Bitmask.hpp>
#include <signaldata/DbspjErr.hpp>
#include "../dbtup/tuppage.hpp"
@@ -104,6 +105,7 @@ public:
typedef LocalDataBuffer2<14, LocalArenaPoolImpl> Local_dependency_map;
typedef DataBuffer2<14, LocalArenaPoolImpl> PatternStore;
typedef LocalDataBuffer2<14, LocalArenaPoolImpl> Local_pattern_store;
+ typedef Bitmask<(NDB_SPJ_MAX_TREE_NODES+31)/32> TreeNodeBitMask;
struct RowRef
{
@@ -296,6 +298,8 @@ public:
Signal* m_start_signal; // Argument to first node in tree
SegmentedSectionPtr m_keyPtr;
+ TreeNodeBitMask m_scans; // TreeNodes doing scans
+
// Used for resolving dependencies
Ptr<TreeNode> m_node_list[NDB_SPJ_MAX_TREE_NODES];
};
@@ -415,6 +419,18 @@ public:
void (Dbspj::*m_parent_batch_complete)(Signal*,Ptr<Request>,Ptr<TreeNode>);
/**
+ * This function is called on the *child* by the *parent* when this
+ * child should prepare to resend results related to parent's current batch
+ */
+ void (Dbspj::*m_parent_batch_repeat)(Signal*,Ptr<Request>,Ptr<TreeNode>);
+
+ /**
+ * This function is called on the *child* by the *parent* when this
+ * child should release buffers related to parent's current batch
+ */
+ void (Dbspj::*m_parent_batch_cleanup)(Ptr<Request>,Ptr<TreeNode>);
+
+ /**
* This function is called when getting a SCAN_NEXTREQ
*/
void (Dbspj::*m_execSCAN_NEXTREQ)(Signal*, Ptr<Request>,Ptr<TreeNode>);
@@ -441,7 +457,7 @@ public:
* should only do local cleanup(s)
*/
void (Dbspj::*m_cleanup)(Ptr<Request>, Ptr<TreeNode>);
- };
+ }; //struct OpInfo
struct LookupData
{
@@ -520,6 +536,7 @@ public:
Uint16 m_frags_outstanding;
Uint32 m_rows_received; // #execTRANSID_AI
Uint32 m_rows_expecting; // Sum(ScanFragConf)
+ Uint32 m_batch_chunks; // #SCAN_FRAGREQ + #SCAN_NEXTREQ to retrieve batch
Uint32 m_scanCookie;
Uint32 m_fragCount;
ScanFragHandle_list::HeadPOD m_fragments; // ScanFrag states
@@ -547,7 +564,8 @@ public:
TreeNode()
: m_magic(MAGIC), m_state(TN_END),
- m_parentPtrI(RNIL), m_requestPtrI(0)
+ m_parentPtrI(RNIL), m_requestPtrI(0),
+ m_ancestors()
{
}
@@ -555,6 +573,7 @@ public:
: m_magic(MAGIC),
m_info(0), m_bits(T_LEAF), m_state(TN_BUILDING),
m_parentPtrI(RNIL), m_requestPtrI(request),
+ m_ancestors(),
nextList(RNIL), prevList(RNIL)
{
// m_send.m_ref = 0;
@@ -658,7 +677,7 @@ public:
T_REPORT_BATCH_COMPLETE = 0x200,
/**
- * Do I need to know when parent batch is cimpleted
+ * Do I need to know when parent batch is completed
*/
T_NEED_REPORT_BATCH_COMPLETED = 0x400,
@@ -677,6 +696,11 @@ public:
*/
T_SCAN_PARALLEL = 0x2000,
+ /**
+ * The result set of this index scan may be requested to be repeated
+ */
+ T_SCAN_REPEATABLE = 0x4000,
+
// End marker...
T_END = 0
};
@@ -689,6 +713,7 @@ public:
Uint32 m_batch_size;
Uint32 m_parentPtrI;
const Uint32 m_requestPtrI;
+ TreeNodeBitMask m_ancestors;
Dependency_map::Head m_dependent_nodes;
PatternStore::Head m_keyPattern;
PatternStore::Head m_attrParamPattern;
@@ -725,7 +750,7 @@ public:
Uint32 nextPool;
};
Uint32 prevList;
- };
+ }; //struct TreeNode
static const Ptr<TreeNode> NullTreeNodePtr;
@@ -745,12 +770,13 @@ public:
{
enum RequestBits
{
- RT_SCAN = 0x1 // unbounded result set, scan interface
- ,RT_ROW_BUFFERS = 0x2 // Do any of the node use row-buffering
- ,RT_MULTI_SCAN = 0x4 // Is there several scans in request
- ,RT_VAR_ALLOC = 0x8 // Is var-allocation used for row-buffer
- ,RT_NEED_PREPARE = 0x10 // Does any node need m_prepare hook
- ,RT_NEED_COMPLETE = 0x20 // Does any node need m_complete hook
+ RT_SCAN = 0x1 // unbounded result set, scan interface
+ ,RT_ROW_BUFFERS = 0x2 // Do any of the node use row-buffering
+ ,RT_MULTI_SCAN = 0x4 // Is there several scans in request
+ ,RT_VAR_ALLOC = 0x8 // Is var-allocation used for row-buffer
+ ,RT_NEED_PREPARE = 0x10 // Does any node need m_prepare hook
+ ,RT_NEED_COMPLETE = 0x20 // Does any node need m_complete hook
+ ,RT_REPEAT_SCAN_RESULT = 0x40 // Repeat bushy scan result when required
};
enum RequestState
@@ -765,7 +791,7 @@ public:
RS_ABORTED = 0x2008, // Aborted and waiting for SCAN_NEXTREQ
RS_END = 0
- };
+ }; //enum RequestState
Request() {}
Request(const ArenaHead & arena) : m_arena(arena) {}
@@ -781,7 +807,8 @@ public:
TreeNode_list::Head m_nodes;
TreeNodeCursor_list::Head m_cursor_nodes;
Uint32 m_cnt_active; // No of "running" nodes
- Bitmask<1> m_active_nodes; // Nodes which will return more data
+ TreeNodeBitMask
+ m_active_nodes; // Nodes which will return more data in NEXTREQ
Uint32 m_rows; // Rows accumulated in current batch
Uint32 m_outstanding; // Outstanding signals, when 0, batch is done
Uint16 m_lookup_node_data[MAX_NDB_NODES];
@@ -976,6 +1003,7 @@ private:
void start(Signal*, Ptr<Request>);
void checkBatchComplete(Signal*, Ptr<Request>, Uint32 cnt);
void batchComplete(Signal*, Ptr<Request>);
+ void prepareNextBatch(Signal*, Ptr<Request>);
void sendConf(Signal*, Ptr<Request>, bool is_complete);
void complete(Signal*, Ptr<Request>);
void cleanup(Ptr<Request>);
@@ -988,12 +1016,11 @@ private:
void releaseRequestBuffers(Ptr<Request> requestPtr, bool reset);
void releaseNodeRows(Ptr<Request> requestPtr, Ptr<TreeNode>);
void releaseRow(Ptr<Request>, RowRef ref);
- Uint32 releaseScanBuffers(Ptr<Request> requestPtr, Ptr<TreeNode>);
- void registerCursor(Ptr<Request>, Ptr<TreeNode>);
+ void registerActiveCursor(Ptr<Request>, Ptr<TreeNode>);
void nodeFail_checkRequests(Signal*);
+ void cleanupChildBranch(Ptr<Request>, Ptr<TreeNode>);
void cleanup_common(Ptr<Request>, Ptr<TreeNode>);
- void mark_active(Ptr<Request>, Ptr<TreeNode>, bool value);
/**
* Row buffering
@@ -1141,13 +1168,17 @@ private:
Uint32 scanIndex_findFrag(Local_ScanFragHandle_list &, Ptr<ScanFragHandle>&,
Uint32 fragId);
void scanIndex_parent_batch_complete(Signal*, Ptr<Request>, Ptr<TreeNode>);
+ void scanIndex_parent_batch_repeat(Signal*, Ptr<Request>, Ptr<TreeNode>);
void scanIndex_execSCAN_NEXTREQ(Signal*, Ptr<Request>,Ptr<TreeNode>);
void scanIndex_complete(Signal*, Ptr<Request>, Ptr<TreeNode>);
void scanIndex_abort(Signal*, Ptr<Request>, Ptr<TreeNode>);
Uint32 scanIndex_execNODE_FAILREP(Signal*signal, Ptr<Request>, Ptr<TreeNode>,
NdbNodeBitmask);
+ void scanIndex_parent_batch_cleanup(Ptr<Request>, Ptr<TreeNode>);
void scanIndex_cleanup(Ptr<Request>, Ptr<TreeNode>);
+ void scanIndex_release_rangekeys(Ptr<Request>, Ptr<TreeNode>);
+
/**
* Page manager
*/
=== modified file 'storage/ndb/src/kernel/blocks/dbspj/DbspjMain.cpp'
--- a/storage/ndb/src/kernel/blocks/dbspj/DbspjMain.cpp 2011-04-29 09:11:12 +0000
+++ b/storage/ndb/src/kernel/blocks/dbspj/DbspjMain.cpp 2011-05-04 11:45:33 +0000
@@ -973,19 +973,6 @@ Dbspj::build(Build_context& ctx,
jam();
requestPtr.p->m_bits |= Request::RT_VAR_ALLOC;
}
-
- {
- /**
- * If multi scan, then cursors are determined when one batch is complete
- * hence clear list here...
- * But if it's single scan...the list will already contain the
- * only scan in the tree
- */
- Local_TreeNodeCursor_list list(m_treenode_pool,
- requestPtr.p->m_cursor_nodes);
- ndbassert(list.noOfElements() > 1);
- list.remove();
- }
}
return 0;
@@ -1119,6 +1106,8 @@ Dbspj::batchComplete(Signal* signal, Ptr
{
ndbassert(is_complete);
}
+
+ prepareNextBatch(signal, requestPtr);
sendConf(signal, requestPtr, is_complete);
}
else if (is_complete && need_complete_phase)
@@ -1158,6 +1147,132 @@ Dbspj::batchComplete(Signal* signal, Ptr
}
}
+/**
+ * Locate next TreeNode(s) to retrieve more rows from.
+ *
+ * Calculate set of 'm_active_nodes' we will receive from in NEXTREQ.
+ * Add these TreeNodes to the cursor list to be iterated.
+ */
+void
+Dbspj::prepareNextBatch(Signal* signal, Ptr<Request> requestPtr)
+{
+ requestPtr.p->m_cursor_nodes.init();
+ requestPtr.p->m_active_nodes.clear();
+
+ if (requestPtr.p->m_cnt_active == 0)
+ {
+ jam();
+ return;
+ }
+
+ if (requestPtr.p->m_bits & Request::RT_REPEAT_SCAN_RESULT)
+ {
+ /**
+ * If REPEAT_SCAN_RESULT we handle bushy scans by returning more *new* rows
+ * from only one of the active child scans. If there are multiple
+ * bushy scans not being able to return their current result set in
+ * a single batch, result sets from the other child scans are repeated
+ * until all rows have been returned to the API client.
+ *
+ * Hence, the cross joined results from the bushy scans are partly
+ * produced within the SPJ block on a 'batchsize granularity',
+ * and partly is the responsibility of the API-client by iterating
+ * the result rows within the current result batches.
+ * (As opposed to non-REPEAT_SCAN_RESULT, the client only has to care about
+ * the current batched rows - no buffering is required)
+ */
+ jam();
+ Ptr<TreeNode> nodePtr;
+ Local_TreeNode_list list(m_treenode_pool, requestPtr.p->m_nodes);
+
+ /**
+ * Locate last 'TN_ACTIVE' TreeNode which is the only one chosen
+ * to return more *new* rows.
+ */
+ for (list.last(nodePtr); !nodePtr.isNull(); list.prev(nodePtr))
+ {
+ if (nodePtr.p->m_state == TreeNode::TN_ACTIVE)
+ {
+ jam();
+ DEBUG("Will fetch more from 'active' m_node_no: " << nodePtr.p->m_node_no);
+ /**
+ * A later NEXTREQ will request a *new* batch of rows from this TreeNode.
+ */
+ registerActiveCursor(requestPtr, nodePtr);
+ break;
+ }
+ }
+
+ /**
+ * Restart/repeat other (index scan) child batches which:
+ * - Are located 'after' the nodePtr found above.
+ * - Do not have any 'active' TreeNode among their ancestors,
+ * i.e. do not depend on one. (Such dependent scans are started
+ * when rows from their parent nodes arrive.)
+ */
+ if (!nodePtr.isNull())
+ {
+ jam();
+ DEBUG("Calculate 'active', w/ cursor on m_node_no: " << nodePtr.p->m_node_no);
+
+ /* Restart any partial index-scans after this 'TN_ACTIVE' TreeNode */
+ for (list.next(nodePtr); !nodePtr.isNull(); list.next(nodePtr))
+ {
+ jam();
+ if (!nodePtr.p->m_ancestors.overlaps (requestPtr.p->m_active_nodes))
+ {
+ jam();
+ ndbrequire(nodePtr.p->m_state != TreeNode::TN_ACTIVE);
+ ndbrequire(nodePtr.p->m_info != 0);
+ if (nodePtr.p->m_info->m_parent_batch_repeat != 0)
+ {
+ jam();
+ (this->*(nodePtr.p->m_info->m_parent_batch_repeat))(signal,
+ requestPtr,
+ nodePtr);
+ }
+ }
+ }
+ } // if (!nodePtr.isNull())
+ }
+ else // not 'RT_REPEAT_SCAN_RESULT'
+ {
+ /**
+ * If not REPEAT_SCAN_RESULT multiple active TreeNodes may return their
+ * remaining result simultaneously. In case of bushy-scans, these
+ * concurrent result streams are cross joins of each other
+ * in SQL terms. In order to produce the cross joined result, it is
+ * the responsibility of the API-client to buffer these streams and
+ * iterate them to produce the cross join.
+ */
+ jam();
+ Ptr<TreeNode> nodePtr;
+ Local_TreeNode_list list(m_treenode_pool, requestPtr.p->m_nodes);
+ TreeNodeBitMask ancestors_of_active;
+
+ for (list.last(nodePtr); !nodePtr.isNull(); list.prev(nodePtr))
+ {
+ /**
+ * If we are active (i.e not consumed all rows originating
+ * from parent rows) and we are not in the set of parents
+ * for any active child:
+ *
+ * Then, this is a position that execSCAN_NEXTREQ should continue
+ */
+ if (nodePtr.p->m_state == TreeNode::TN_ACTIVE &&
+ !ancestors_of_active.get (nodePtr.p->m_node_no))
+ {
+ jam();
+ DEBUG("Add 'active' m_node_no: " << nodePtr.p->m_node_no);
+ registerActiveCursor(requestPtr, nodePtr);
+ ancestors_of_active.bitOR(nodePtr.p->m_ancestors);
+ }
+ }
+ } // if (RT_REPEAT_SCAN_RESULT)
+
+ DEBUG("Calculated 'm_active_nodes': " << requestPtr.p->m_active_nodes.rep.data[0]);
+}
+
void
Dbspj::sendConf(Signal* signal, Ptr<Request> requestPtr, bool is_complete)
{
@@ -1283,51 +1398,52 @@ void
Dbspj::releaseScanBuffers(Ptr<Request> requestPtr)
{
Ptr<TreeNode> treeNodePtr;
- {
- Local_TreeNode_list list(m_treenode_pool, requestPtr.p->m_nodes);
- list.first(treeNodePtr);
- }
-
- /**
- * This is calling recursive function...buh!
- * but i can't figure out how to do it someother way...
- */
+ Local_TreeNode_list list(m_treenode_pool, requestPtr.p->m_nodes);
+ TreeNodeBitMask ancestors_of_active;
- /**
- * This recursive function will register nodes to be notified
- * about SCAN_NEXTREQ.
- *
- * Clear it first, so that nodes won't end up in it several times...
- */
- requestPtr.p->m_cursor_nodes.init();
+ for (list.last(treeNodePtr); !treeNodePtr.isNull(); list.prev(treeNodePtr))
+ {
+ /**
+ * If there are no active children,
+ * then we can cleanup in our sub-branch
+ */
+ if (!ancestors_of_active.get(treeNodePtr.p->m_node_no))
+ {
+ if (treeNodePtr.p->m_bits & TreeNode::T_ROW_BUFFER)
+ {
+ jam();
+ releaseNodeRows(requestPtr, treeNodePtr);
+ }
+
+ if (treeNodePtr.p->m_state == TreeNode::TN_ACTIVE)
+ {
+ jam();
+ cleanupChildBranch(requestPtr, treeNodePtr);
+ }
+ }
+ /**
+ * Build Bitmask of all nodes having TN_ACTIVE children
+ */
+ if (treeNodePtr.p->m_state == TreeNode::TN_ACTIVE)
+ {
+ ancestors_of_active.bitOR(treeNodePtr.p->m_ancestors);
+ }
+ }
/**
* Needs to be atleast 1 active otherwise we should have
* taken the cleanup "path" in batchComplete
*/
- ndbrequire(releaseScanBuffers(requestPtr, treeNodePtr) > 0);
+ ndbrequire(requestPtr.p->m_cnt_active >= 1);
}
void
-Dbspj::mark_active(Ptr<Request> requestPtr,
- Ptr<TreeNode> treeNodePtr,
- bool value)
+Dbspj::registerActiveCursor(Ptr<Request> requestPtr, Ptr<TreeNode> treeNodePtr)
{
Uint32 bit = treeNodePtr.p->m_node_no;
- if (value)
- {
- ndbassert(requestPtr.p->m_active_nodes.get(bit) == false);
- }
- else
- {
- ndbassert(requestPtr.p->m_active_nodes.get(bit) == true);
- }
- requestPtr.p->m_active_nodes.set(bit, value);
-}
+ ndbrequire(!requestPtr.p->m_active_nodes.get(bit));
+ requestPtr.p->m_active_nodes.set(bit);
-void
-Dbspj::registerCursor(Ptr<Request> requestPtr, Ptr<TreeNode> treeNodePtr)
-{
Local_TreeNodeCursor_list list(m_treenode_pool, requestPtr.p->m_cursor_nodes);
#ifdef VM_TRACE
{
@@ -1341,12 +1457,9 @@ Dbspj::registerCursor(Ptr<Request> reque
list.add(treeNodePtr);
}
-Uint32
-Dbspj::releaseScanBuffers(Ptr<Request> requestPtr,
- Ptr<TreeNode> treeNodePtr)
+void
+Dbspj::cleanupChildBranch(Ptr<Request> requestPtr, Ptr<TreeNode> treeNodePtr)
{
- Uint32 active_child = 0;
-
LocalArenaPoolImpl pool(requestPtr.p->m_arena, m_dependency_map_pool);
Local_dependency_map list(pool, treeNodePtr.p->m_dependent_nodes);
Dependency_map::ConstDataBufferIterator it;
@@ -1354,40 +1467,15 @@ Dbspj::releaseScanBuffers(Ptr<Request> r
{
jam();
Ptr<TreeNode> childPtr;
- m_treenode_pool.getPtr(childPtr, * it.data);
- active_child += releaseScanBuffers(requestPtr, childPtr);
- }
-
- const bool active = treeNodePtr.p->m_state == TreeNode::TN_ACTIVE;
- if (active_child == 0)
- {
- jam();
-
- /**
- * If there is no active children,
- * then we can release our own (optionally) buffered rows
- */
- if (treeNodePtr.p->m_bits & TreeNode::T_ROW_BUFFER)
- {
- jam();
- releaseNodeRows(requestPtr, treeNodePtr);
- }
-
- /**
- * If we have no active children,
- * and we ourself is active (i.e not consumed all rows originating
- * from parent rows)
- *
- * Then, this is a position that execSCAN_NEXTREQ should continue
- */
- if (active)
+ m_treenode_pool.getPtr(childPtr, *it.data);
+ if (childPtr.p->m_info->m_parent_batch_cleanup != 0)
{
jam();
- registerCursor(requestPtr, treeNodePtr);
+ (this->*(childPtr.p->m_info->m_parent_batch_cleanup))(requestPtr,
+ childPtr);
}
+ cleanupChildBranch(requestPtr,childPtr);
}
-
- return active_child + (active ? 1 : 0);
}
void
@@ -1672,7 +1760,7 @@ Dbspj::complete(Signal* signal, Ptr<Requ
void
Dbspj::cleanup(Ptr<Request> requestPtr)
{
- ndbrequire(requestPtr.p->m_active_nodes.isclear());
+ ndbrequire(requestPtr.p->m_cnt_active == 0);
{
Ptr<TreeNode> nodePtr;
Local_TreeNode_list list(m_treenode_pool, requestPtr.p->m_nodes);
@@ -1949,16 +2037,46 @@ Dbspj::execSCAN_NEXTREQ(Signal* signal)
Ptr<TreeNode> treeNodePtr;
Local_TreeNodeCursor_list list(m_treenode_pool,
requestPtr.p->m_cursor_nodes);
+ Uint32 cnt_active = 0;
+
for (list.first(treeNodePtr); !treeNodePtr.isNull(); list.next(treeNodePtr))
{
- jam();
- ndbrequire(treeNodePtr.p->m_state == TreeNode::TN_ACTIVE);
- ndbrequire(treeNodePtr.p->m_info != 0 &&
- treeNodePtr.p->m_info->m_execSCAN_NEXTREQ != 0);
- (this->*(treeNodePtr.p->m_info->m_execSCAN_NEXTREQ))(signal,
- requestPtr,
- treeNodePtr);
+ if (treeNodePtr.p->m_state == TreeNode::TN_ACTIVE)
+ {
+ jam();
+ DEBUG("SCAN_NEXTREQ on TreeNode: " << treeNodePtr.i
+ << ", m_node_no: " << treeNodePtr.p->m_node_no
+ << ", w/ m_parentPtrI: " << treeNodePtr.p->m_parentPtrI);
+
+ ndbrequire(treeNodePtr.p->m_info != 0 &&
+ treeNodePtr.p->m_info->m_execSCAN_NEXTREQ != 0);
+ (this->*(treeNodePtr.p->m_info->m_execSCAN_NEXTREQ))(signal,
+ requestPtr,
+ treeNodePtr);
+ cnt_active++;
+ }
+ else
+ {
+ /**
+ * Restart any other scans not being 'TN_ACTIVE'
+ * (Only effective if 'RT_REPEAT_SCAN_RESULT')
+ */
+ jam();
+ ndbrequire(requestPtr.p->m_bits & Request::RT_REPEAT_SCAN_RESULT);
+ DEBUG(" Restart TreeNode: " << treeNodePtr.i
+ << ", m_node_no: " << treeNodePtr.p->m_node_no
+ << ", w/ m_parentPtrI: " << treeNodePtr.p->m_parentPtrI);
+
+ ndbrequire(treeNodePtr.p->m_info != 0 &&
+ treeNodePtr.p->m_info->m_parent_batch_complete !=0 );
+ (this->*(treeNodePtr.p->m_info->m_parent_batch_complete))(signal,
+ requestPtr,
+ treeNodePtr);
+ }
}
+ /* Expected only a single ACTIVE TreeNode among the cursors */
+ ndbrequire(cnt_active == 1 ||
+ !(requestPtr.p->m_bits & Request::RT_REPEAT_SCAN_RESULT));
}
}
@@ -2559,6 +2677,8 @@ Dbspj::g_LookupOpInfo =
0, // execSCAN_FRAGCONF
&Dbspj::lookup_parent_row,
&Dbspj::lookup_parent_batch_complete,
+ 0, // Dbspj::lookup_parent_batch_repeat,
+ 0, // Dbspj::lookup_parent_batch_cleanup,
0, // Dbspj::lookup_execSCAN_NEXTREQ
0, // Dbspj::lookup_complete
&Dbspj::lookup_abort,
@@ -3613,6 +3733,8 @@ Dbspj::g_ScanFragOpInfo =
&Dbspj::scanFrag_execSCAN_FRAGCONF,
0, // parent row
0, // parent batch complete
+ 0, // parent batch repeat
+ 0, // Dbspj::scanFrag_parent_batch_cleanup,
&Dbspj::scanFrag_execSCAN_NEXTREQ,
0, // Dbspj::scanFrag_complete
&Dbspj::scanFrag_abort,
@@ -3716,13 +3838,7 @@ Dbspj::scanFrag_build(Build_context& ctx
}
ctx.m_scan_cnt++;
- /**
- * In the scenario with only 1 scan in tree,
- * register cursor here, so we don't need to search for in after build
- * If m_scan_cnt > 1,
- * then this list will simply be cleared after build
- */
- registerCursor(requestPtr, treeNodePtr);
+ ctx.m_scans.set(treeNodePtr.p->m_node_no);
if (ctx.m_start_signal)
{
@@ -3833,7 +3949,6 @@ Dbspj::scanFrag_send(Signal* signal,
requestPtr.p->m_outstanding++;
requestPtr.p->m_cnt_active++;
- mark_active(requestPtr, treeNodePtr, true);
treeNodePtr.p->m_state = TreeNode::TN_ACTIVE;
Ptr<ScanFragHandle> scanFragHandlePtr;
m_scanfraghandle_pool.getPtr(scanFragHandlePtr, treeNodePtr.p->
@@ -3966,7 +4081,6 @@ Dbspj::scanFrag_execSCAN_FRAGREF(Signal*
ndbrequire(requestPtr.p->m_outstanding);
requestPtr.p->m_outstanding--;
treeNodePtr.p->m_state = TreeNode::TN_INACTIVE;
- mark_active(requestPtr, treeNodePtr, false);
abort(signal, requestPtr, errCode);
}
@@ -4014,7 +4128,6 @@ Dbspj::scanFrag_execSCAN_FRAGCONF(Signal
ndbrequire(requestPtr.p->m_cnt_active);
requestPtr.p->m_cnt_active--;
treeNodePtr.p->m_state = TreeNode::TN_INACTIVE;
- mark_active(requestPtr, treeNodePtr, false);
scanFragHandlePtr.p->m_state = ScanFragHandle::SFH_COMPLETE;
}
else
@@ -4170,6 +4283,8 @@ Dbspj::g_ScanIndexOpInfo =
&Dbspj::scanIndex_execSCAN_FRAGCONF,
&Dbspj::scanIndex_parent_row,
&Dbspj::scanIndex_parent_batch_complete,
+ &Dbspj::scanIndex_parent_batch_repeat,
+ &Dbspj::scanIndex_parent_batch_cleanup,
&Dbspj::scanIndex_execSCAN_NEXTREQ,
&Dbspj::scanIndex_complete,
&Dbspj::scanIndex_abort,
@@ -4282,14 +4397,23 @@ Dbspj::scanIndex_build(Build_context& ct
nodePtr.i = nodePtr.p->m_parentPtrI;
}
- ctx.m_scan_cnt++;
/**
- * In the scenario with only 1 scan in tree,
- * register cursor here, so we don't need to search for in after build
- * If m_scan_cnt > 1,
- * then this list will simply be cleared after build
+ * If there exists other scan TreeNodes not being among
+ * my ancestors, results from this scanIndex may be repeated
+ * as part of an X-scan.
+ *
+ * NOTE: The scan nodes being along the left deep ancestor chain
+ * are not 'repeatable' as they are driving the
+ * repeated X-scan and are thus not repeated themselves.
*/
- registerCursor(requestPtr, treeNodePtr);
+ if (requestPtr.p->m_bits & Request::RT_REPEAT_SCAN_RESULT &&
+ !treeNodePtr.p->m_ancestors.contains(ctx.m_scans))
+ {
+ nodePtr.p->m_bits |= TreeNode::T_SCAN_REPEATABLE;
+ }
+
+ ctx.m_scan_cnt++;
+ ctx.m_scans.set(treeNodePtr.p->m_node_no);
return 0;
} while (0);
@@ -4317,6 +4441,7 @@ Dbspj::parseScanIndex(Build_context& ctx
data.m_fragments.init();
data.m_frags_outstanding = 0;
data.m_frags_not_complete = 0;
+ data.m_batch_chunks = 0;
err = parseDA(ctx, requestPtr, treeNodePtr,
tree, treeBits, param, paramBits);
@@ -4860,6 +4985,32 @@ Dbspj::scanIndex_parent_batch_complete(S
}
void
+Dbspj::scanIndex_parent_batch_repeat(Signal* signal,
+ Ptr<Request> requestPtr,
+ Ptr<TreeNode> treeNodePtr)
+{
+ jam();
+ ScanIndexData& data = treeNodePtr.p->m_scanindex_data;
+
+ DEBUG("scanIndex_parent_batch_repeat(), m_node_no: " << treeNodePtr.p->m_node_no
+ << ", m_batch_chunks: " << data.m_batch_chunks);
+
+ /**
+ * Register index-scans to be restarted if we didn't get all
+ * previously fetched parent related child rows in a single batch.
+ */
+ if (data.m_batch_chunks > 1)
+ {
+ jam();
+ DEBUG("Register TreeNode for restart, m_node_no: " << treeNodePtr.p->m_node_no);
+ ndbrequire(treeNodePtr.p->m_state != TreeNode::TN_ACTIVE);
+ registerActiveCursor(requestPtr, treeNodePtr);
+ data.m_frags_not_complete = 1;
+ data.m_batch_chunks = 0;
+ }
+}
+
+void
Dbspj::scanIndex_send(Signal* signal,
Ptr<Request> requestPtr,
Ptr<TreeNode> treeNodePtr)
@@ -4884,12 +5035,24 @@ Dbspj::scanIndex_send(Signal* signal,
}
/**
- * keys,
- * - sliced out to each ScanFragHandle => release = true
- * - all kept on first ScanFragHandle => release = false
+ * if (m_bits & prunemask):
+ * - Range keys sliced out to each ScanFragHandle
+ * - Else, range keys kept on first (and only) ScanFragHandle
*/
Uint32 prunemask = TreeNode::T_PRUNE_PATTERN | TreeNode::T_CONST_PRUNE;
- bool release = (treeNodePtr.p->m_bits & prunemask) != 0;
+
+ /**
+ * Don't release keyInfo if it may be sent multiple times, either:
+ * - Not pruned -> same keyInfo goes to all datanodes.
+ * - Result handling is REPEAT_SCAN_RESULT and same batch may be
+ * repeated multiple times due to incomplete bushy X-scans.
+ * (by ::scanIndex_parent_batch_repeat())
+ *
+ * When not released, ::scanIndex_parent_batch_cleanup() will
+ * eventually release them when preparing arrival of a new parent batch.
+ */
+ const bool release = ((treeNodePtr.p->m_bits & prunemask) != 0 &&
+ (treeNodePtr.p->m_bits & TreeNode::T_SCAN_REPEATABLE) == 0);
ScanFragReq* req = reinterpret_cast<ScanFragReq*>(signal->getDataPtrSend());
memcpy(req, org, sizeof(data.m_scanFragReq));
@@ -4901,18 +5064,16 @@ Dbspj::scanIndex_send(Signal* signal,
Ptr<ScanFragHandle> fragPtr;
Local_ScanFragHandle_list list(m_scanfraghandle_pool, data.m_fragments);
- Uint32 keyInfoPtrI;
- if (release == false)
+ Uint32 keyInfoPtrI = RNIL;
+ list.first(fragPtr);
+ if ((treeNodePtr.p->m_bits & prunemask) == 0)
{
jam();
- list.first(fragPtr);
keyInfoPtrI = fragPtr.p->m_rangePtrI;
ndbrequire(keyInfoPtrI != RNIL);
- fragPtr.p->m_rangePtrI = RNIL;
}
Uint32 batchRange = 0;
- list.first(fragPtr);
for (Uint32 i = 0; i < cnt && !fragPtr.isNull(); list.next(fragPtr))
{
jam();
@@ -4928,7 +5089,7 @@ Dbspj::scanIndex_send(Signal* signal,
req->senderData = fragPtr.i;
req->fragmentNoKeyLen = fragPtr.p->m_fragId;
- if (release)
+ if ((treeNodePtr.p->m_bits & prunemask))
{
jam();
keyInfoPtrI = fragPtr.p->m_rangePtrI;
@@ -4938,8 +5099,9 @@ Dbspj::scanIndex_send(Signal* signal,
fragPtr.p->m_state = ScanFragHandle::SFH_COMPLETE;
continue;
}
- fragPtr.p->m_rangePtrI = RNIL;
-
+ }
+ if (release)
+ {
/**
* If we'll use sendSignal() and we need to send the attrInfo several
* times, we need to copy them
@@ -4948,7 +5110,6 @@ Dbspj::scanIndex_send(Signal* signal,
ndbrequire(dupSection(tmp, attrInfoPtrI)); // TODO handle error
attrInfoPtrI = tmp;
}
- fragPtr.p->reset_ranges();
req->variableData[0] = batchRange;
getSection(handle.m_ptr[0], attrInfoPtrI);
@@ -4982,6 +5143,8 @@ Dbspj::scanIndex_send(Signal* signal,
jam();
sendSignal(ref, GSN_SCAN_FRAGREQ, signal,
NDB_ARRAY_SIZE(data.m_scanFragReq), JBB, &handle);
+ fragPtr.p->m_rangePtrI = RNIL;
+ fragPtr.p->reset_ranges();
}
else
{
@@ -4997,14 +5160,6 @@ Dbspj::scanIndex_send(Signal* signal,
batchRange += bs_rows;
}
- if (release == false)
- {
- jam();
- // only supported for now...
- ndbrequire(treeNodePtr.p->m_bits & TreeNode::T_SCAN_PARALLEL);
- releaseSection(keyInfoPtrI);
- }
-
if (treeNodePtr.p->m_bits & TreeNode::T_SCAN_PARALLEL)
{
ndbrequire(data.m_frags_outstanding == data.m_frags_not_complete);
@@ -5014,9 +5169,9 @@ Dbspj::scanIndex_send(Signal* signal,
ndbrequire(data.m_frags_outstanding == 1);
}
+ data.m_batch_chunks = 1;
requestPtr.p->m_cnt_active++;
requestPtr.p->m_outstanding++;
- mark_active(requestPtr, treeNodePtr, true);
treeNodePtr.p->m_state = TreeNode::TN_ACTIVE;
}
@@ -5117,7 +5272,6 @@ Dbspj::scanIndex_execSCAN_FRAGCONF(Signa
ndbrequire(requestPtr.p->m_cnt_active);
requestPtr.p->m_cnt_active--;
treeNodePtr.p->m_state = TreeNode::TN_INACTIVE;
- mark_active(requestPtr, treeNodePtr, false);
}
}
@@ -5180,7 +5334,6 @@ Dbspj::scanIndex_execSCAN_FRAGREF(Signal
ndbrequire(requestPtr.p->m_cnt_active);
requestPtr.p->m_cnt_active--;
treeNodePtr.p->m_state = TreeNode::TN_INACTIVE;
- mark_active(requestPtr, treeNodePtr, false);
}
if (data.m_frags_outstanding == 0)
@@ -5265,6 +5418,8 @@ Dbspj::scanIndex_execSCAN_NEXTREQ(Signal
* so require that we did actually send something
*/
ndbrequire(data.m_frags_outstanding > 0);
+ ndbrequire(data.m_batch_chunks > 0);
+ data.m_batch_chunks++;
requestPtr.p->m_outstanding++;
ndbassert(treeNodePtr.p->m_state == TreeNode::TN_ACTIVE);
@@ -5452,26 +5607,25 @@ Dbspj::scanIndex_execNODE_FAILREP(Signal
ndbrequire(requestPtr.p->m_cnt_active);
requestPtr.p->m_cnt_active--;
treeNodePtr.p->m_state = TreeNode::TN_INACTIVE;
- mark_active(requestPtr, treeNodePtr, false);
}
return sum;
}
void
-Dbspj::scanIndex_cleanup(Ptr<Request> requestPtr,
- Ptr<TreeNode> treeNodePtr)
+Dbspj::scanIndex_release_rangekeys(Ptr<Request> requestPtr,
+ Ptr<TreeNode> treeNodePtr)
{
+ jam();
+ DEBUG("scanIndex_release_rangekeys(), m_node_no: " << treeNodePtr.p->m_node_no);
+
ScanIndexData& data = treeNodePtr.p->m_scanindex_data;
Local_ScanFragHandle_list list(m_scanfraghandle_pool, data.m_fragments);
- if (requestPtr.p->m_state & Request::RS_ABORTING)
+ Ptr<ScanFragHandle> fragPtr;
+
+ if (treeNodePtr.p->m_bits & TreeNode::T_PRUNE_PATTERN)
{
- /**
- * If we're aborting...there can be keys attached...that has not
- * (and will not) be sent...release them to avoid memleak
- */
jam();
- Ptr<ScanFragHandle> fragPtr;
for (list.first(fragPtr); !fragPtr.isNull(); list.next(fragPtr))
{
if (fragPtr.p->m_rangePtrI != RNIL)
@@ -5479,20 +5633,52 @@ Dbspj::scanIndex_cleanup(Ptr<Request> re
releaseSection(fragPtr.p->m_rangePtrI);
fragPtr.p->m_rangePtrI = RNIL;
}
+ fragPtr.p->reset_ranges();
}
}
else
{
-#ifdef VM_TRACE
- Ptr<ScanFragHandle> fragPtr;
- for (list.first(fragPtr); !fragPtr.isNull(); list.next(fragPtr))
+ jam();
+ list.first(fragPtr);
+ if (fragPtr.p->m_rangePtrI != RNIL)
{
- ndbrequire(fragPtr.p->m_rangePtrI == RNIL);
+ releaseSection(fragPtr.p->m_rangePtrI);
+ fragPtr.p->m_rangePtrI = RNIL;
}
-#endif
+ fragPtr.p->reset_ranges();
}
- list.remove();
+}
+
+/**
+ * Parent batch has completed, and will not refetch (X-joined) results
+ * from its children. Release & reset range keys which are unsent or which
+ * we have kept for possible resubmits.
+ */
+void
+Dbspj::scanIndex_parent_batch_cleanup(Ptr<Request> requestPtr,
+ Ptr<TreeNode> treeNodePtr)
+{
+ DEBUG("scanIndex_parent_batch_cleanup");
+ scanIndex_release_rangekeys(requestPtr,treeNodePtr);
+}
+void
+Dbspj::scanIndex_cleanup(Ptr<Request> requestPtr,
+ Ptr<TreeNode> treeNodePtr)
+{
+ ScanIndexData& data = treeNodePtr.p->m_scanindex_data;
+ DEBUG("scanIndex_cleanup");
+
+ /**
+ * Range keys have been collected wherever there are uncompleted
+ * parent batches...release them to avoid memleak.
+ */
+ scanIndex_release_rangekeys(requestPtr,treeNodePtr);
+
+ {
+ Local_ScanFragHandle_list list(m_scanfraghandle_pool, data.m_fragments);
+ list.remove();
+ }
if (treeNodePtr.p->m_bits & TreeNode::T_PRUNE_PATTERN)
{
jam();
@@ -6085,7 +6271,7 @@ Dbspj::expandS(Uint32 & _dst, Local_patt
case QueryPattern::P_PARENT:
jam();
// P_PARENT is a prefix to another pattern token
- // that permits code to access rows from earlier than imediate parent.
+ // that permits code to access rows from earlier than immediate parent.
// val is no of levels to move up the tree
err = appendFromParent(dst, pattern, it, val, row, hasNull);
break;
@@ -6150,7 +6336,7 @@ Dbspj::expandL(Uint32 & _dst, Local_patt
case QueryPattern::P_PARENT:
jam();
// P_PARENT is a prefix to another pattern token
- // that permits code to access rows from earlier than imediate parent
+ // that permits code to access rows from earlier than immediate parent
// val is no of levels to move up the tree
err = appendFromParent(dst, pattern, it, val, row, hasNull);
break;
@@ -6360,6 +6546,13 @@ Dbspj::parseDA(Build_context& ctx,
do
{
+ if (treeBits & DABits::NI_REPEAT_SCAN_RESULT)
+ {
+ jam();
+ DEBUG("use REPEAT_SCAN_RESULT when returning results");
+ requestPtr.p->m_bits |= Request::RT_REPEAT_SCAN_RESULT;
+ } // DABits::NI_REPEAT_SCAN_RESULT
+
if (treeBits & DABits::NI_HAS_PARENT)
{
jam();
@@ -6405,6 +6598,10 @@ Dbspj::parseDA(Build_context& ctx,
}
parentPtr.p->m_bits &= ~(Uint32)TreeNode::T_LEAF;
treeNodePtr.p->m_parentPtrI = parentPtr.i;
+
+ // Build Bitmask of all ancestors to treeNode
+ treeNodePtr.p->m_ancestors = parentPtr.p->m_ancestors;
+ treeNodePtr.p->m_ancestors.set(parentPtr.p->m_node_no);
}
if (unlikely(err != 0))
=== modified file 'storage/ndb/src/ndbapi/NdbQueryBuilder.cpp'
--- a/storage/ndb/src/ndbapi/NdbQueryBuilder.cpp 2011-04-14 09:21:18 +0000
+++ b/storage/ndb/src/ndbapi/NdbQueryBuilder.cpp 2011-05-04 11:48:48 +0000
@@ -2060,14 +2060,6 @@ int NdbQueryOperationDefImpl::markScanAn
NdbQueryOperationDefImpl* operation = getParentOperation();
while (operation != NULL)
{
- if (operation->m_hasScanDescendant)
- {
- /* Remove this line if you want to allow bushy scans. Result sets will
- * probably be wrong, but 'explain' output etc. may be useful for
- * debugging.
- */
- return QRY_MULTIPLE_SCAN_BRANCHES;
- }
operation->m_hasScanDescendant = true;
if (operation->isScanOperation())
{
@@ -2829,7 +2821,8 @@ NdbQueryScanOperationDefImpl::serialize(
}
node->tableId = tableOrIndex.getObjectId();
node->tableVersion = tableOrIndex.getObjectVersion();
- node->requestInfo = requestInfo;
+ // Always set NI_REPEAT_SCAN_RESULT; it is needed if there are star-joined scans
+ node->requestInfo = requestInfo | DABits::NI_REPEAT_SCAN_RESULT;
QueryNode::setOpLen(node->len, QueryNode::QN_SCAN_INDEX, length);
}
=== modified file 'storage/ndb/src/ndbapi/NdbQueryBuilderImpl.hpp'
--- a/storage/ndb/src/ndbapi/NdbQueryBuilderImpl.hpp 2011-04-14 08:59:45 +0000
+++ b/storage/ndb/src/ndbapi/NdbQueryBuilderImpl.hpp 2011-05-04 11:48:48 +0000
@@ -42,9 +42,8 @@
#define QRY_SCAN_ORDER_ALREADY_SET 4821
#define QRY_PARAMETER_HAS_WRONG_TYPE 4822
#define QRY_CHAR_PARAMETER_TRUNCATED 4823
-#define QRY_MULTIPLE_SCAN_BRANCHES 4824
-#define QRY_MULTIPLE_SCAN_SORTED 4825
-#define QRY_BATCH_SIZE_TOO_SMALL 4826
+#define QRY_MULTIPLE_SCAN_SORTED 4824
+#define QRY_BATCH_SIZE_TOO_SMALL 4825
#ifdef __cplusplus
#include <Vector.hpp>
=== modified file 'storage/ndb/src/ndbapi/Ndbinit.cpp'
--- a/storage/ndb/src/ndbapi/Ndbinit.cpp 2011-04-15 06:29:59 +0000
+++ b/storage/ndb/src/ndbapi/Ndbinit.cpp 2011-05-04 09:35:08 +0000
@@ -201,8 +201,7 @@ NdbImpl::NdbImpl(Ndb_cluster_connection
m_transporter_facade(ndb_cluster_connection->m_impl.m_transporter_facade),
m_dictionary(ndb),
theCurrentConnectIndex(0),
- theNdbObjectIdMap(m_transporter_facade->theMutexPtr,
- 1024,1024),
+ theNdbObjectIdMap(1024,1024),
theNoOfDBnodes(0),
theWaiter(this),
m_ev_op(0),
=== modified file 'storage/ndb/src/ndbapi/ObjectMap.cpp'
--- a/storage/ndb/src/ndbapi/ObjectMap.cpp 2011-04-07 14:02:50 +0000
+++ b/storage/ndb/src/ndbapi/ObjectMap.cpp 2011-05-04 09:35:08 +0000
@@ -18,13 +18,13 @@
#include "ObjectMap.hpp"
-NdbObjectIdMap::NdbObjectIdMap(NdbMutex* mutex, Uint32 sz, Uint32 eSz)
+NdbObjectIdMap::NdbObjectIdMap(Uint32 sz, Uint32 eSz):
+ m_expandSize(eSz),
+ m_size(0),
+ m_firstFree(InvalidId),
+ m_lastFree(InvalidId),
+ m_map(0)
{
- m_size = 0;
- m_firstFree = InvalidId;
- m_map = 0;
- m_mutex = mutex;
- m_expandSize = eSz;
expand(sz);
#ifdef DEBUG_OBJECTMAP
ndbout_c("NdbObjectIdMap:::NdbObjectIdMap(%u)", sz);
@@ -33,12 +33,14 @@ NdbObjectIdMap::NdbObjectIdMap(NdbMutex*
NdbObjectIdMap::~NdbObjectIdMap()
{
+ assert(checkConsistency());
free(m_map);
+ m_map = NULL;
}
int NdbObjectIdMap::expand(Uint32 incSize)
{
- NdbMutex_Lock(m_mutex);
+ assert(checkConsistency());
Uint32 newSize = m_size + incSize;
MapEntry * tmp = (MapEntry*)realloc(m_map, newSize * sizeof(MapEntry));
@@ -46,20 +48,45 @@ int NdbObjectIdMap::expand(Uint32 incSiz
{
m_map = tmp;
- for(Uint32 i = m_size; i < newSize; i++){
- m_map[i].m_next = 2 * (i + 1) + 1;
+ for(Uint32 i = m_size; i < newSize-1; i++)
+ {
+ m_map[i].setNext(i+1);
}
- m_firstFree = (2 * m_size) + 1;
- m_map[newSize-1].m_next = Uint32(InvalidId);
+ m_firstFree = m_size;
+ m_lastFree = newSize - 1;
+ m_map[newSize-1].setNext(InvalidId);
m_size = newSize;
+ assert(checkConsistency());
}
else
{
- NdbMutex_Unlock(m_mutex);
g_eventLogger->error("NdbObjectIdMap::expand: realloc(%u*%lu) failed",
newSize, sizeof(MapEntry));
return -1;
}
- NdbMutex_Unlock(m_mutex);
return 0;
}
+
+bool NdbObjectIdMap::checkConsistency()
+{
+ if (m_firstFree == InvalidId)
+ {
+ for (Uint32 i = 0; i<m_size; i++)
+ {
+ if (m_map[i].isFree())
+ {
+ assert(false);
+ return false;
+ }
+ }
+ return true;
+ }
+
+ Uint32 i = m_firstFree;
+ while (m_map[i].getNext() != InvalidId)
+ {
+ i = m_map[i].getNext();
+ }
+ assert(i == m_lastFree);
+ return i == m_lastFree;
+}
=== modified file 'storage/ndb/src/ndbapi/ObjectMap.hpp'
--- a/storage/ndb/src/ndbapi/ObjectMap.hpp 2011-04-07 14:02:50 +0000
+++ b/storage/ndb/src/ndbapi/ObjectMap.hpp 2011-05-04 09:35:08 +0000
@@ -20,7 +20,6 @@
#define NDB_OBJECT_ID_MAP_HPP
#include <ndb_global.h>
-//#include <NdbMutex.h>
#include <NdbOut.hpp>
#include <EventLogger.hpp>
@@ -31,11 +30,11 @@ extern EventLogger * g_eventLogger;
/**
* Global ObjectMap
*/
-class NdbObjectIdMap //: NdbLockable
+class NdbObjectIdMap
{
public:
- STATIC_CONST( InvalidId = ~(Uint32)0 );
- NdbObjectIdMap(NdbMutex*, Uint32 initalSize = 128, Uint32 expandSize = 10);
+ STATIC_CONST( InvalidId = 0x7fffffff );
+ NdbObjectIdMap(Uint32 initalSize, Uint32 expandSize);
~NdbObjectIdMap();
Uint32 map(void * object);
@@ -43,34 +42,75 @@ public:
void * getObject(Uint32 id);
private:
+ const Uint32 m_expandSize;
Uint32 m_size;
- Uint32 m_expandSize;
Uint32 m_firstFree;
- union MapEntry {
- UintPtr m_next;
- void * m_obj;
- } * m_map;
+ /**
+ * We put released entries at the end of the free list. That way, we delay
+ * re-use of an object id as long as possible. This minimizes the chance
+ * of sending an incoming message to the wrong object because the recipient
+ * object id was reused.
+ */
+ Uint32 m_lastFree;
+
+ class MapEntry
+ {
+ public:
+ bool isFree() const
+ {
+ return (m_val & 1) == 1;
+ }
+
+ Uint32 getNext() const
+ {
+ assert(isFree());
+ return static_cast<Uint32>(m_val >> 1);
+ }
+
+ void setNext(Uint32 next)
+ {
+ m_val = (next << 1) | 1;
+ }
+
+ void* getObj() const
+ {
+ assert((m_val & 3) == 0);
+ return reinterpret_cast<void*>(m_val);
+ }
+
+ void setObj(void* obj)
+ {
+ m_val = reinterpret_cast<UintPtr>(obj);
+ assert((m_val & 3) == 0);
+ }
+
+ private:
+ /**
+ * This holds either a pointer to a mapped object *or* the index of the
+ * next entry in the free list. If it is a pointer, then the two least
+ * significant bits should be zero (requiring all mapped objects to be
+ * four-byte aligned). If it is an index, it is stored as (index << 1) | 1, so bit 0 is set.
+ */
+ UintPtr m_val;
+ };
+
+ MapEntry* m_map;
- NdbMutex * m_mutex;
int expand(Uint32 newSize);
+ // For debugging purposes.
+ bool checkConsistency();
};
inline
Uint32
-NdbObjectIdMap::map(void * object){
-
- // lock();
- assert((UintPtr(object) & 3) == 0);
-
- if(m_firstFree == Uint32(InvalidId) && expand(m_expandSize))
+NdbObjectIdMap::map(void * object)
+{
+ if(m_firstFree == InvalidId && expand(m_expandSize))
return InvalidId;
- Uint32 ff = m_firstFree >> 1;
- assert(UintPtr(m_map[ff].m_next) == Uint32(m_map[ff].m_next));
- m_firstFree = Uint32(m_map[ff].m_next);
- m_map[ff].m_obj = object;
-
- // unlock();
+ const Uint32 ff = m_firstFree;
+ m_firstFree = m_map[ff].getNext();
+ m_map[ff].setObj(object);
DBUG_PRINT("info",("NdbObjectIdMap::map(0x%lx) %u", (long) object, ff<<2));
@@ -79,26 +119,37 @@ NdbObjectIdMap::map(void * object){
inline
void *
-NdbObjectIdMap::unmap(Uint32 id, void *object){
-
- Uint32 i = id>>2;
+NdbObjectIdMap::unmap(Uint32 id, void *object)
+{
+ const Uint32 i = id>>2;
- // lock();
- if(i < m_size){
- void * obj = m_map[i].m_obj;
- if (object == obj) {
- m_map[i].m_next = m_firstFree;
- m_firstFree = (2 * i) + 1;
- } else {
+ assert(i < m_size);
+ if(i < m_size)
+ {
+ void * const obj = m_map[i].getObj();
+ if (object == obj)
+ {
+ m_map[i].setNext(InvalidId);
+ if (m_firstFree == InvalidId)
+ {
+ m_firstFree = i;
+ }
+ else
+ {
+ m_map[m_lastFree].setNext(i);
+ }
+ m_lastFree = i;
+ }
+ else
+ {
g_eventLogger->error("NdbObjectIdMap::unmap(%u, 0x%lx) obj=0x%lx",
id, (long) object, (long) obj);
DBUG_PRINT("error",("NdbObjectIdMap::unmap(%u, 0x%lx) obj=0x%lx",
id, (long) object, (long) obj));
+ assert(false);
return 0;
}
- // unlock();
-
DBUG_PRINT("info",("NdbObjectIdMap::unmap(%u) obj=0x%lx", id, (long) obj));
return obj;
@@ -107,12 +158,21 @@ NdbObjectIdMap::unmap(Uint32 id, void *o
}
inline void *
-NdbObjectIdMap::getObject(Uint32 id){
+NdbObjectIdMap::getObject(Uint32 id)
+{
// DBUG_PRINT("info",("NdbObjectIdMap::getObject(%u) obj=0x%x", id, m_map[id>>2].m_obj));
id >>= 2;
- if(id < m_size){
- if ((m_map[id].m_next & 3) == 0)
- return m_map[id].m_obj;
+ assert(id < m_size);
+ if(id < m_size)
+ {
+ if(m_map[id].isFree())
+ {
+ return 0;
+ }
+ else
+ {
+ return m_map[id].getObj();
+ }
}
return 0;
}
=== modified file 'storage/ndb/src/ndbapi/ndberror.c'
--- a/storage/ndb/src/ndbapi/ndberror.c 2011-04-29 09:23:56 +0000
+++ b/storage/ndb/src/ndbapi/ndberror.c 2011-05-04 11:48:48 +0000
@@ -797,8 +797,6 @@ ErrorBundle ErrorCodes[] = {
"Parameter value has an incompatible datatype" },
{ QRY_CHAR_PARAMETER_TRUNCATED, DMEC, AE,
"Character Parameter was right truncated" },
- { QRY_MULTIPLE_SCAN_BRANCHES, DMEC, AE,
- "Query has scans that are not descendants/ancestors of each other." },
{ QRY_MULTIPLE_SCAN_SORTED, DMEC, AE,
"Query with multiple scans may not be sorted." },
{ QRY_SEQUENTIAL_SCAN_SORTED, DMEC, AE,
No bundle (reason: revision is a merge).
| Thread |
|---|
| • bzr commit into mysql-5.1-telco-7.0-spj-scan-vs-scan branch(ole.john.aske:3489) | Ole John Aske | 4 May |