From: Ole John Aske Date: March 20 2012 10:22am Subject: bzr push into mysql-5.5-cluster-7.2 branch (ole.john.aske:3853 to 3854) List-Archive: http://lists.mysql.com/commits/143249 Message-Id: <20120320102248.ADDDC244@fimafeng09.norway.sun.com> MIME-Version: 1.0 Content-Type: text/plain; charset="us-ascii" Content-Transfer-Encoding: 7bit 3854 Ole John Aske 2012-03-20 [merge] Merge mysql-5.1-telco-7.0-spj-scan-scan -> mysql-5.5-cluster-7.2-spj modified: mysql-test/suite/ndb/r/ndb_join_pushdown_default.result mysql-test/suite/ndb/t/ndb_join_pushdown.inc storage/ndb/src/kernel/blocks/dbspj/Dbspj.hpp storage/ndb/src/kernel/blocks/dbspj/DbspjMain.cpp storage/ndb/test/ndbapi/testSpj.cpp 3853 magnus.blaudd@stripped 2012-03-20 [merge] Merge modified: sql/ndb_mi.cc === modified file 'mysql-test/suite/ndb/r/ndb_join_pushdown_default.result' --- a/mysql-test/suite/ndb/r/ndb_join_pushdown_default.result 2012-02-23 15:41:31 +0000 +++ b/mysql-test/suite/ndb/r/ndb_join_pushdown_default.result 2012-03-20 09:42:12 +0000 @@ -2099,6 +2099,77 @@ count(*) 20000 drop table t1; drop table tx; +create table t1 ( +a int not null, +b int not null, +c int not null, +d int not null, +primary key (`a`,`b`) +) engine=ndbcluster partition by key(a); +insert into t1 values +(1,1,1,1), (2,2,2,2), (3,3,3,3), (4,4,4,4), +(1,2,5,1), (1,3,1,2), (1,4,2,3), +(2,1,3,4), (2,3,4,5), (2,4,5,1), +(3,1,1,2), (3,2,2,3), (3,4,3,4), +(4,1,4,5), (4,2,5,1), (4,3,1,2); +explain extended +select * +from t1 +join t1 as t2 on t2.a = t1.b and t2.b = t1.c; +id select_type table type possible_keys key key_len ref rows filtered Extra +1 SIMPLE t1 ALL NULL NULL NULL NULL 16 100.00 Parent of 2 pushed join@1 +1 SIMPLE t2 eq_ref PRIMARY PRIMARY 8 test.t1.b,test.t1.c 1 100.00 Child of 't1' in pushed join@1 +Warnings: +Note 1003 select `test`.`t1`.`a` AS `a`,`test`.`t1`.`b` AS `b`,`test`.`t1`.`c` AS `c`,`test`.`t1`.`d` AS `d`,`test`.`t2`.`a` AS `a`,`test`.`t2`.`b` AS `b`,`test`.`t2`.`c` AS `c`,`test`.`t2`.`d` AS `d` from 
`test`.`t1` join `test`.`t1` `t2` where ((`test`.`t2`.`b` = `test`.`t1`.`c`) and (`test`.`t2`.`a` = `test`.`t1`.`b`)) +set new=on; +alter table t1 partition by hash(a); +explain extended +select * +from t1 +join t1 as t2 on t2.a = t1.b and t2.b = t1.c; +id select_type table type possible_keys key key_len ref rows filtered Extra +1 SIMPLE t1 ALL NULL NULL NULL NULL 16 100.00 +1 SIMPLE t2 eq_ref PRIMARY PRIMARY 8 test.t1.b,test.t1.c 1 100.00 +Warnings: +Note 9999 Table 't1' is not pushable: has user defined partioning +Note 9999 Table 't2' is not pushable: has user defined partioning +Note 1003 select `test`.`t1`.`a` AS `a`,`test`.`t1`.`b` AS `b`,`test`.`t1`.`c` AS `c`,`test`.`t1`.`d` AS `d`,`test`.`t2`.`a` AS `a`,`test`.`t2`.`b` AS `b`,`test`.`t2`.`c` AS `c`,`test`.`t2`.`d` AS `d` from `test`.`t1` join `test`.`t1` `t2` where ((`test`.`t2`.`b` = `test`.`t1`.`c`) and (`test`.`t2`.`a` = `test`.`t1`.`b`)) +alter table t1 partition by list(a) ( +partition p1 values in (1), +partition p2 values in (2), +partition p3 values in (3), +partition p4 values in (4) +); +explain extended +select * +from t1 +join t1 as t2 on t2.a = t1.b and t2.b = t1.c; +id select_type table type possible_keys key key_len ref rows filtered Extra +1 SIMPLE t1 ALL NULL NULL NULL NULL 16 100.00 +1 SIMPLE t2 eq_ref PRIMARY PRIMARY 8 test.t1.b,test.t1.c 1 100.00 +Warnings: +Note 9999 Table 't1' is not pushable: has user defined partioning +Note 9999 Table 't2' is not pushable: has user defined partioning +Note 1003 select `test`.`t1`.`a` AS `a`,`test`.`t1`.`b` AS `b`,`test`.`t1`.`c` AS `c`,`test`.`t1`.`d` AS `d`,`test`.`t2`.`a` AS `a`,`test`.`t2`.`b` AS `b`,`test`.`t2`.`c` AS `c`,`test`.`t2`.`d` AS `d` from `test`.`t1` join `test`.`t1` `t2` where ((`test`.`t2`.`b` = `test`.`t1`.`c`) and (`test`.`t2`.`a` = `test`.`t1`.`b`)) +alter table t1 partition by range(a) partitions 4 ( +partition p1 values less than (0), +partition p2 values less than (2), +partition p3 values less than (4), +partition p4 values 
less than (99999) +); +explain extended +select * +from t1 +join t1 as t2 on t2.a = t1.b and t2.b = t1.c; +id select_type table type possible_keys key key_len ref rows filtered Extra +1 SIMPLE t1 ALL NULL NULL NULL NULL 16 100.00 +1 SIMPLE t2 eq_ref PRIMARY PRIMARY 8 test.t1.b,test.t1.c 1 100.00 +Warnings: +Note 9999 Table 't1' is not pushable: has user defined partioning +Note 9999 Table 't2' is not pushable: has user defined partioning +Note 1003 select `test`.`t1`.`a` AS `a`,`test`.`t1`.`b` AS `b`,`test`.`t1`.`c` AS `c`,`test`.`t1`.`d` AS `d`,`test`.`t2`.`a` AS `a`,`test`.`t2`.`b` AS `b`,`test`.`t2`.`c` AS `c`,`test`.`t2`.`d` AS `d` from `test`.`t1` join `test`.`t1` `t2` where ((`test`.`t2`.`b` = `test`.`t1`.`c`) and (`test`.`t2`.`a` = `test`.`t1`.`b`)) +drop table t1; +set new=default; create table t1 (a int, b int, primary key(a) using hash) engine = ndb; insert into t1 values (1, 2); insert into t1 values (2, 3); @@ -5560,7 +5631,7 @@ where new.variable_name = old.variable_n order by new.variable_name; variable_name new.variable_value - old.variable_value NDB_PRUNED_SCAN_COUNT 8 -NDB_PUSHED_QUERIES_DEFINED 405 +NDB_PUSHED_QUERIES_DEFINED 406 NDB_PUSHED_QUERIES_DROPPED 8 NDB_PUSHED_QUERIES_EXECUTED 550 NDB_SORTED_SCAN_COUNT 10 === modified file 'mysql-test/suite/ndb/t/ndb_join_pushdown.inc' --- a/mysql-test/suite/ndb/t/ndb_join_pushdown.inc 2012-02-23 15:41:31 +0000 +++ b/mysql-test/suite/ndb/t/ndb_join_pushdown.inc 2012-03-20 09:42:12 +0000 @@ -1055,6 +1055,76 @@ connection ddl; drop table t1; drop table tx; +# Test user defined partition not being pushed +# +# Note: User defined partitions are handled +# by the SQL layer, and as such are unknown +# to the NDB datanodes. 
+# + +connection spj; +create table t1 ( + a int not null, + b int not null, + c int not null, + d int not null, + primary key (`a`,`b`) +) engine=ndbcluster partition by key(a); + +connection spj; +insert into t1 values +(1,1,1,1), (2,2,2,2), (3,3,3,3), (4,4,4,4), +(1,2,5,1), (1,3,1,2), (1,4,2,3), +(2,1,3,4), (2,3,4,5), (2,4,5,1), +(3,1,1,2), (3,2,2,3), (3,4,3,4), +(4,1,4,5), (4,2,5,1), (4,3,1,2); + +# Only this query('partition by key') should be pushed +explain extended +select * +from t1 +join t1 as t2 on t2.a = t1.b and t2.b = t1.c; + +connection ddl; +set new=on; +alter table t1 partition by hash(a); + +explain extended +select * +from t1 +join t1 as t2 on t2.a = t1.b and t2.b = t1.c; + +connection ddl; +alter table t1 partition by list(a) ( + partition p1 values in (1), + partition p2 values in (2), + partition p3 values in (3), + partition p4 values in (4) +); + +explain extended +select * +from t1 +join t1 as t2 on t2.a = t1.b and t2.b = t1.c; + +connection ddl; +alter table t1 partition by range(a) partitions 4 ( + partition p1 values less than (0), + partition p2 values less than (2), + partition p3 values less than (4), + partition p4 values less than (99999) +); + +explain extended +select * +from t1 +join t1 as t2 on t2.a = t1.b and t2.b = t1.c; + +connection ddl; +drop table t1; +set new=default; + + # pushed mrr does not yet handle multiple PK operations in same transaction # Need 6.0 result handling stuff to simplify result handling # *** join push is currently dissabled for these **** === modified file 'storage/ndb/src/kernel/blocks/dbspj/Dbspj.hpp' --- a/storage/ndb/src/kernel/blocks/dbspj/Dbspj.hpp 2011-11-16 08:17:17 +0000 +++ b/storage/ndb/src/kernel/blocks/dbspj/Dbspj.hpp 2012-03-20 09:42:12 +0000 @@ -1232,13 +1232,13 @@ private: void scanIndex_execSCAN_FRAGCONF(Signal*, Ptr, Ptr, Ptr); void scanIndex_parent_row(Signal*,Ptr,Ptr, const RowPtr&); void scanIndex_fixupBound(Ptr fragPtr, Uint32 ptrI, Uint32); - void scanIndex_send(Signal* signal, 
- Ptr requestPtr, - Ptr treeNodePtr, - Uint32 noOfFrags, - Uint32 bs_bytes, - Uint32 bs_rows, - Uint32& batchRange); + Uint32 scanIndex_send(Signal* signal, + Ptr requestPtr, + Ptr treeNodePtr, + Uint32 noOfFrags, + Uint32 bs_bytes, + Uint32 bs_rows, + Uint32& batchRange); void scanIndex_batchComplete(Signal* signal); Uint32 scanIndex_findFrag(Local_ScanFragHandle_list &, Ptr&, Uint32 fragId); === modified file 'storage/ndb/src/kernel/blocks/dbspj/DbspjMain.cpp' --- a/storage/ndb/src/kernel/blocks/dbspj/DbspjMain.cpp 2012-03-09 15:37:45 +0000 +++ b/storage/ndb/src/kernel/blocks/dbspj/DbspjMain.cpp 2012-03-20 09:42:12 +0000 @@ -3059,145 +3059,192 @@ Dbspj::lookup_send(Signal* signal, Uint32 keyInfoPtrI = treeNodePtr.p->m_send.m_keyInfoPtrI; Uint32 attrInfoPtrI = treeNodePtr.p->m_send.m_attrInfoPtrI; - if (treeNodePtr.p->m_bits & TreeNode::T_ONE_SHOT) + Uint32 err = 0; + + do { - jam(); + if (treeNodePtr.p->m_bits & TreeNode::T_ONE_SHOT) + { + jam(); + /** + * Pass sections to send + */ + treeNodePtr.p->m_send.m_attrInfoPtrI = RNIL; + treeNodePtr.p->m_send.m_keyInfoPtrI = RNIL; + } + else + { + if ((treeNodePtr.p->m_bits & TreeNode::T_KEYINFO_CONSTRUCTED) == 0) + { + jam(); + Uint32 tmp = RNIL; + if (!dupSection(tmp, keyInfoPtrI)) + { + jam(); + err = DbspjErr::OutOfSectionMemory; + break; + } + + keyInfoPtrI = tmp; + } + else + { + jam(); + treeNodePtr.p->m_send.m_keyInfoPtrI = RNIL; + } + + if ((treeNodePtr.p->m_bits & TreeNode::T_ATTRINFO_CONSTRUCTED) == 0) + { + jam(); + Uint32 tmp = RNIL; + + /** + * Test execution terminated due to 'OutOfSectionMemory' which + * may happen for different treeNodes in the request: + * - 17070: Fail on any lookup_send() + * - 17071: Fail on lookup_send() if 'isLeaf' + * - 17072: Fail on lookup_send() if treeNode not root + */ + + if (ERROR_INSERTED_CLEAR(17070) || + (treeNodePtr.p->isLeaf() && ERROR_INSERTED_CLEAR(17071)) || + (treeNodePtr.p->m_parentPtrI != RNIL && ERROR_INSERTED_CLEAR(17072))) + { + jam(); + ndbout_c("Injecting 
OutOfSectionMemory error at line %d file %s", + __LINE__, __FILE__); + if (keyInfoPtrI != RNIL) + releaseSection(keyInfoPtrI); + err = DbspjErr::OutOfSectionMemory; + break; + } + + if (!dupSection(tmp, attrInfoPtrI)) + { + jam(); + if (keyInfoPtrI != RNIL) + releaseSection(keyInfoPtrI); + err = DbspjErr::OutOfSectionMemory; + break; + } + + attrInfoPtrI = tmp; + } + else + { + jam(); + treeNodePtr.p->m_send.m_attrInfoPtrI = RNIL; + } + } + + getSection(handle.m_ptr[0], keyInfoPtrI); + getSection(handle.m_ptr[1], attrInfoPtrI); + handle.m_cnt = 2; + /** - * Pass sections to send + * Inject error to test LQHKEYREF handling: + * Tampering with tableSchemaVersion such that LQH will + * return LQHKEYREF('1227: Invalid schema version') + * May happen for different treeNodes in the request: + * - 17030: Fail on any lookup_send() + * - 17031: Fail on lookup_send() if 'isLeaf' + * - 17032: Fail on lookup_send() if treeNode not root */ - treeNodePtr.p->m_send.m_attrInfoPtrI = RNIL; - treeNodePtr.p->m_send.m_keyInfoPtrI = RNIL; - } - else - { - if ((treeNodePtr.p->m_bits & TreeNode::T_KEYINFO_CONSTRUCTED) == 0) + if (ERROR_INSERTED_CLEAR(17030) || + (treeNodePtr.p->isLeaf() && ERROR_INSERTED_CLEAR(17031)) || + (treeNodePtr.p->m_parentPtrI != RNIL && ERROR_INSERTED_CLEAR(17032))) { jam(); - Uint32 tmp = RNIL; - ndbrequire(dupSection(tmp, keyInfoPtrI)); // TODO handle error - keyInfoPtrI = tmp; + req->tableSchemaVersion += (1 << 16); // Provoke 'Invalid schema version' + } + +#if defined DEBUG_LQHKEYREQ + ndbout_c("LQHKEYREQ to %x", ref); + printLQHKEYREQ(stdout, signal->getDataPtrSend(), + NDB_ARRAY_SIZE(treeNodePtr.p->m_lookup_data.m_lqhKeyReq), + DBLQH); + printf("KEYINFO: "); + print(handle.m_ptr[0], stdout); + printf("ATTRINFO: "); + print(handle.m_ptr[1], stdout); +#endif + + Uint32 Tnode = refToNode(ref); + if (Tnode == getOwnNodeId()) + { + c_Counters.incr_counter(CI_LOCAL_READS_SENT, 1); } else { - jam(); - treeNodePtr.p->m_send.m_keyInfoPtrI = RNIL; + 
c_Counters.incr_counter(CI_REMOTE_READS_SENT, 1); } - if ((treeNodePtr.p->m_bits & TreeNode::T_ATTRINFO_CONSTRUCTED) == 0) + /** + * Test execution terminated due to 'NodeFailure' which + * may happen for different treeNodes in the request: + * - 17020: Fail on any lookup_send() + * - 17021: Fail on lookup_send() if 'isLeaf' + * - 17022: Fail on lookup_send() if treeNode not root + */ + if (ERROR_INSERTED_CLEAR(17020) || + (treeNodePtr.p->isLeaf() && ERROR_INSERTED_CLEAR(17021)) || + (treeNodePtr.p->m_parentPtrI != RNIL && ERROR_INSERTED_CLEAR(17022))) { jam(); - Uint32 tmp = RNIL; - ndbrequire(dupSection(tmp, attrInfoPtrI)); // TODO handle error - attrInfoPtrI = tmp; + releaseSections(handle); + err = DbspjErr::NodeFailure; + break; } - else + if (unlikely(!c_alive_nodes.get(Tnode))) { jam(); - treeNodePtr.p->m_send.m_attrInfoPtrI = RNIL; + releaseSections(handle); + err = DbspjErr::NodeFailure; + break; + } + else if (! (treeNodePtr.p->isLeaf() && requestPtr.p->isLookup())) + { + jam(); + ndbassert(Tnode < NDB_ARRAY_SIZE(requestPtr.p->m_lookup_node_data)); + requestPtr.p->m_outstanding += cnt; + requestPtr.p->m_lookup_node_data[Tnode] += cnt; + // number wrapped + ndbrequire(! 
(requestPtr.p->m_lookup_node_data[Tnode] == 0)); } - } - - getSection(handle.m_ptr[0], keyInfoPtrI); - getSection(handle.m_ptr[1], attrInfoPtrI); - handle.m_cnt = 2; - /** - * Inject error to test LQHKEYREF handling: - * Tampering with tableSchemaVersion such that LQH will - * return LQHKEYREF('1227: Invalid schema version') - * May happen for different treeNodes in the request: - * - 17030: Fail on any lookup_send() - * - 17031: Fail on lookup_send() if 'isLeaf' - * - 17032: Fail on lookup_send() if treeNode not root - */ - if (ERROR_INSERTED_CLEAR(17030) || - (treeNodePtr.p->isLeaf() && ERROR_INSERTED_CLEAR(17031)) || - (treeNodePtr.p->m_parentPtrI != RNIL && ERROR_INSERTED_CLEAR(17032))) - { - jam(); - req->tableSchemaVersion += (1 << 16); // Provoke 'Invalid schema version' - } + sendSignal(ref, GSN_LQHKEYREQ, signal, + NDB_ARRAY_SIZE(treeNodePtr.p->m_lookup_data.m_lqhKeyReq), + JBB, &handle); -#if defined DEBUG_LQHKEYREQ - ndbout_c("LQHKEYREQ to %x", ref); - printLQHKEYREQ(stdout, signal->getDataPtrSend(), - NDB_ARRAY_SIZE(treeNodePtr.p->m_lookup_data.m_lqhKeyReq), - DBLQH); - printf("KEYINFO: "); - print(handle.m_ptr[0], stdout); - printf("ATTRINFO: "); - print(handle.m_ptr[1], stdout); -#endif - - Uint32 Tnode = refToNode(ref); - if (Tnode == getOwnNodeId()) - { - c_Counters.incr_counter(CI_LOCAL_READS_SENT, 1); - } - else - { - c_Counters.incr_counter(CI_REMOTE_READS_SENT, 1); - } + treeNodePtr.p->m_lookup_data.m_outstanding += cnt; + if (requestPtr.p->isLookup() && treeNodePtr.p->isLeaf()) + { + jam(); + /** + * Send TCKEYCONF with DirtyReadBit + Tnode, + * so that API can discover if Tnode while waiting for result + */ + Uint32 resultRef = req->variableData[0]; + Uint32 resultData = req->variableData[1]; - /** - * Test execution terminated due to 'NodeFailure' which - * may happen for different treeNodes in the request: - * - 17020: Fail on any lookup_send() - * - 17021: Fail on lookup_send() if 'isLeaf' - * - 17022: Fail on lookup_send() if treeNode not 
root - */ - if (ERROR_INSERTED_CLEAR(17020) || - (treeNodePtr.p->isLeaf() && ERROR_INSERTED_CLEAR(17021)) || - (treeNodePtr.p->m_parentPtrI != RNIL && ERROR_INSERTED_CLEAR(17022))) - { - jam(); - releaseSections(handle); - abort(signal, requestPtr, DbspjErr::NodeFailure); - return; - } - if (unlikely(!c_alive_nodes.get(Tnode))) - { - jam(); - releaseSections(handle); - abort(signal, requestPtr, DbspjErr::NodeFailure); + TcKeyConf* conf = (TcKeyConf*)signal->getDataPtrSend(); + conf->apiConnectPtr = RNIL; // lookup transaction from operations... + conf->confInfo = 0; + TcKeyConf::setNoOfOperations(conf->confInfo, 1); + conf->transId1 = requestPtr.p->m_transId[0]; + conf->transId2 = requestPtr.p->m_transId[1]; + conf->operations[0].apiOperationPtr = resultData; + conf->operations[0].attrInfoLen = TcKeyConf::DirtyReadBit | Tnode; + Uint32 sigLen = TcKeyConf::StaticLength + TcKeyConf::OperationLength; + sendTCKEYCONF(signal, sigLen, resultRef, requestPtr.p->m_senderRef); + } return; } - else if (! (treeNodePtr.p->isLeaf() && requestPtr.p->isLookup())) - { - jam(); - ndbassert(Tnode < NDB_ARRAY_SIZE(requestPtr.p->m_lookup_node_data)); - requestPtr.p->m_outstanding += cnt; - requestPtr.p->m_lookup_node_data[Tnode] += cnt; - // number wrapped - ndbrequire(! (requestPtr.p->m_lookup_node_data[Tnode] == 0)); - } - - sendSignal(ref, GSN_LQHKEYREQ, signal, - NDB_ARRAY_SIZE(treeNodePtr.p->m_lookup_data.m_lqhKeyReq), - JBB, &handle); - - treeNodePtr.p->m_lookup_data.m_outstanding += cnt; - if (requestPtr.p->isLookup() && treeNodePtr.p->isLeaf()) - { - jam(); - /** - * Send TCKEYCONF with DirtyReadBit + Tnode, - * so that API can discover if Tnode while waiting for result - */ - Uint32 resultRef = req->variableData[0]; - Uint32 resultData = req->variableData[1]; + while (0); - TcKeyConf* conf = (TcKeyConf*)signal->getDataPtrSend(); - conf->apiConnectPtr = RNIL; // lookup transaction from operations... 
- conf->confInfo = 0; - TcKeyConf::setNoOfOperations(conf->confInfo, 1); - conf->transId1 = requestPtr.p->m_transId[0]; - conf->transId2 = requestPtr.p->m_transId[1]; - conf->operations[0].apiOperationPtr = resultData; - conf->operations[0].attrInfoLen = TcKeyConf::DirtyReadBit | Tnode; - Uint32 sigLen = TcKeyConf::StaticLength + TcKeyConf::OperationLength; - sendTCKEYCONF(signal, sigLen, resultRef, requestPtr.p->m_senderRef); - } + ndbrequire(err); + jam(); + abort(signal, requestPtr, err); } void @@ -3540,7 +3587,32 @@ Dbspj::lookup_parent_row(Signal* signal, { jam(); Uint32 tmp = RNIL; - ndbrequire(dupSection(tmp, attrInfoPtrI)); // TODO handle error + + /** + * Test execution terminated due to 'OutOfSectionMemory' which + * may happen for different treeNodes in the request: + * - 17080: Fail on lookup_parent_row + * - 17081: Fail on lookup_parent_row: if 'isLeaf' + * - 17082: Fail on lookup_parent_row: if treeNode not root + */ + + if (ERROR_INSERTED_CLEAR(17080) || + (treeNodePtr.p->isLeaf() && ERROR_INSERTED_CLEAR(17081)) || + (treeNodePtr.p->m_parentPtrI != RNIL && ERROR_INSERTED_CLEAR(17082))) + { + jam(); + ndbout_c("Injecting OutOfSectionMemory error at line %d file %s", + __LINE__, __FILE__); + err = DbspjErr::OutOfSectionMemory; + break; + } + + if (!dupSection(tmp, attrInfoPtrI)) + { + jam(); + err = DbspjErr::OutOfSectionMemory; + break; + } Uint32 org_size; { @@ -4146,9 +4218,6 @@ Dbspj::scanFrag_send(Signal* signal, { jam(); - requestPtr.p->m_outstanding++; - requestPtr.p->m_cnt_active++; - treeNodePtr.p->m_state = TreeNode::TN_ACTIVE; Ptr scanFragHandlePtr; m_scanfraghandle_pool.getPtr(scanFragHandlePtr, treeNodePtr.p-> m_scanfrag_data.m_scanFragHandlePtrI); @@ -4215,6 +4284,10 @@ Dbspj::scanFrag_send(Signal* signal, NDB_ARRAY_SIZE(treeNodePtr.p->m_scanfrag_data.m_scanFragReq), JBB, &handle); + requestPtr.p->m_outstanding++; + requestPtr.p->m_cnt_active++; + treeNodePtr.p->m_state = TreeNode::TN_ACTIVE; + scanFragHandlePtr.p->m_state = 
ScanFragHandle::SFH_SCANNING; treeNodePtr.p->m_scanfrag_data.m_rows_received = 0; treeNodePtr.p->m_scanfrag_data.m_rows_expecting = ~Uint32(0); @@ -5122,19 +5195,17 @@ Dbspj::scanIndex_parent_row(Signal* sign jam(); Local_pattern_store pattern(pool, treeNodePtr.p->m_keyPattern); - /** - * Test execution terminated due to 'OutOfSectionMemory': - * - 17060: Fail on scanIndex_parent_row at first call - * - 17061: Fail on scanIndex_parent_row if 'isLeaf' - * - 17062: Fail on scanIndex_parent_row if treeNode not root - * - 17063: Fail on scanIndex_parent_row at a random node of the query tree - * - - */ - + /** + * Test execution terminated due to 'OutOfSectionMemory': + * - 17060: Fail on scanIndex_parent_row at first call + * - 17061: Fail on scanIndex_parent_row if 'isLeaf' + * - 17062: Fail on scanIndex_parent_row if treeNode not root + * - 17063: Fail on scanIndex_parent_row at a random node of the query tree + */ if (ERROR_INSERTED_CLEAR(17060) || ((rand() % 7) == 0 && ERROR_INSERTED_CLEAR(17061)) || ((treeNodePtr.p->isLeaf() && ERROR_INSERTED_CLEAR(17062))) || - ((treeNodePtr.p->m_parentPtrI != RNIL &&ERROR_INSERTED_CLEAR(17063)))) + ((treeNodePtr.p->m_parentPtrI != RNIL && ERROR_INSERTED_CLEAR(17063)))) { ndbout_c("Injecting OutOfSectionMemory error at line %d file %s", __LINE__, __FILE__); @@ -5338,7 +5409,7 @@ Dbspj::scanIndex_parent_batch_complete(S data.m_parallelism = static_cast(parallelism); #ifdef DEBUG_SCAN_FRAGREQ - DEBUG("::scanIndex_send() starting index scan with parallelism=" + DEBUG("::scanIndex_parent_batch_complete() starting index scan with parallelism=" << data.m_parallelism); #endif } @@ -5369,24 +5440,34 @@ Dbspj::scanIndex_parent_batch_complete(S } Uint32 batchRange = 0; - scanIndex_send(signal, - requestPtr, - treeNodePtr, - data.m_parallelism, - bs_bytes, - bs_rows, - batchRange); - - data.m_firstExecution = false; - - ndbrequire(static_cast(data.m_frags_outstanding + - data.m_frags_complete) <= - data.m_fragCount); + Uint32 frags_started 
= + scanIndex_send(signal, + requestPtr, + treeNodePtr, + data.m_parallelism, + bs_bytes, + bs_rows, + batchRange); - data.m_batch_chunks = 1; - requestPtr.p->m_cnt_active++; - requestPtr.p->m_outstanding++; - treeNodePtr.p->m_state = TreeNode::TN_ACTIVE; + /** + * scanIndex_send might fail to send (errors?): + * Check that we really did send something before + * updating outstanding & active. + */ + if (likely(frags_started > 0)) + { + jam(); + data.m_firstExecution = false; + + ndbrequire(static_cast(data.m_frags_outstanding + + data.m_frags_complete) <= + data.m_fragCount); + + data.m_batch_chunks = 1; + requestPtr.p->m_cnt_active++; + requestPtr.p->m_outstanding++; + treeNodePtr.p->m_state = TreeNode::TN_ACTIVE; + } } void @@ -5418,8 +5499,11 @@ Dbspj::scanIndex_parent_batch_repeat(Sig /** * Ask for the first batch for a number of fragments. + * + * Returns how many fragments we did request the + * 'first batch' from. (<= noOfFrags) */ -void +Uint32 Dbspj::scanIndex_send(Signal* signal, Ptr requestPtr, Ptr treeNodePtr, @@ -5460,147 +5544,184 @@ Dbspj::scanIndex_send(Signal* signal, req->batch_size_bytes = bs_bytes; req->batch_size_rows = bs_rows; + Uint32 err = 0; Uint32 requestsSent = 0; - Local_ScanFragHandle_list list(m_scanfraghandle_pool, data.m_fragments); - Ptr fragPtr; - list.first(fragPtr); - Uint32 keyInfoPtrI = fragPtr.p->m_rangePtrI; - ndbrequire(prune || keyInfoPtrI != RNIL); - /** - * Iterate over the list of fragments until we have sent as many - * SCAN_FRAGREQs as we should. - */ - while (requestsSent < noOfFrags) { - jam(); - ndbassert(!fragPtr.isNull()); - - if (fragPtr.p->m_state != ScanFragHandle::SFH_NOT_STARTED) - { - // Skip forward to the frags that we should send. 
- jam(); - list.next(fragPtr); - continue; - } - - const Uint32 ref = fragPtr.p->m_ref; - - if (noOfFrags==1 && !prune && - data.m_frags_not_started == data.m_fragCount && - refToNode(ref) != getOwnNodeId() && - list.hasNext(fragPtr)) + Local_ScanFragHandle_list list(m_scanfraghandle_pool, data.m_fragments); + Ptr fragPtr; + list.first(fragPtr); + Uint32 keyInfoPtrI = fragPtr.p->m_rangePtrI; + ndbrequire(prune || keyInfoPtrI != RNIL); + /** + * Iterate over the list of fragments until we have sent as many + * SCAN_FRAGREQs as we should. + */ + while (requestsSent < noOfFrags) { - /** - * If we are doing a scan with adaptive parallelism and start with - * parallelism=1 then it makes sense to fetch a batch from a fragment on - * the local data node. The reason for this is that if that fragment - * contains few rows, we may be able to read from several fragments in - * parallel. Then we minimize the total number of round trips (to remote - * data nodes) if we fetch the first fragment batch locally. - */ jam(); - list.next(fragPtr); - continue; - } - - SectionHandle handle(this); + ndbassert(!fragPtr.isNull()); - Uint32 attrInfoPtrI = treeNodePtr.p->m_send.m_attrInfoPtrI; + if (fragPtr.p->m_state != ScanFragHandle::SFH_NOT_STARTED) + { + // Skip forward to the frags that we should send. + jam(); + list.next(fragPtr); + continue; + } - /** - * Set data specific for this fragment - */ - req->senderData = fragPtr.i; - req->fragmentNoKeyLen = fragPtr.p->m_fragId; + const Uint32 ref = fragPtr.p->m_ref; - if (prune) - { - jam(); - keyInfoPtrI = fragPtr.p->m_rangePtrI; - if (keyInfoPtrI == RNIL) + if (noOfFrags==1 && !prune && + data.m_frags_not_started == data.m_fragCount && + refToNode(ref) != getOwnNodeId() && + list.hasNext(fragPtr)) { /** - * Since we use pruning, we can see that no parent rows would hash - * to this fragment. 
+ * If we are doing a scan with adaptive parallelism and start with + * parallelism=1 then it makes sense to fetch a batch from a fragment on + * the local data node. The reason for this is that if that fragment + * contains few rows, we may be able to read from several fragments in + * parallel. Then we minimize the total number of round trips (to remote + * data nodes) if we fetch the first fragment batch locally. */ jam(); - fragPtr.p->m_state = ScanFragHandle::SFH_COMPLETE; list.next(fragPtr); continue; } - if (!repeatable) + SectionHandle handle(this); + + Uint32 attrInfoPtrI = treeNodePtr.p->m_send.m_attrInfoPtrI; + + /** + * Set data specific for this fragment + */ + req->senderData = fragPtr.i; + req->fragmentNoKeyLen = fragPtr.p->m_fragId; + + if (prune) { - /** - * If we'll use sendSignal() and we need to send the attrInfo several - * times, we need to copy them. (For repeatable or unpruned scans - * we use sendSignalNoRelease(), so then we do not need to copy.) - */ jam(); - Uint32 tmp = RNIL; - ndbrequire(dupSection(tmp, attrInfoPtrI)); // TODO handle error - attrInfoPtrI = tmp; + keyInfoPtrI = fragPtr.p->m_rangePtrI; + if (keyInfoPtrI == RNIL) + { + /** + * Since we use pruning, we can see that no parent rows would hash + * to this fragment. + */ + jam(); + fragPtr.p->m_state = ScanFragHandle::SFH_COMPLETE; + list.next(fragPtr); + continue; + } + + if (!repeatable) + { + /** + * If we'll use sendSignal() and we need to send the attrInfo several + * times, we need to copy them. (For repeatable or unpruned scans + * we use sendSignalNoRelease(), so then we do not need to copy.) 
+ */ + jam(); + Uint32 tmp = RNIL; + + /** + * Test execution terminated due to 'OutOfSectionMemory' which + * may happen for different treeNodes in the request: + * - 17090: Fail on any scanIndex_send() + * - 17091: Fail after sending SCAN_FRAGREQ to some fragments + * - 17092: Fail on scanIndex_send() if 'isLeaf' + * - 17093: Fail on scanIndex_send() if treeNode not root + */ + + if (ERROR_INSERTED_CLEAR(17090) || + (requestsSent > 1 && ERROR_INSERTED_CLEAR(17091)) || + (treeNodePtr.p->isLeaf() && ERROR_INSERTED_CLEAR(17092)) || + (treeNodePtr.p->m_parentPtrI != RNIL && ERROR_INSERTED_CLEAR(17093))) + { + jam(); + ndbout_c("Injecting OutOfSectionMemory error at line %d file %s", + __LINE__, __FILE__); + err = DbspjErr::OutOfSectionMemory; + break; + } + + if (!dupSection(tmp, attrInfoPtrI)) + { + jam(); + err = DbspjErr::OutOfSectionMemory; + break; + } + + attrInfoPtrI = tmp; + } } - } - req->variableData[0] = batchRange; - getSection(handle.m_ptr[0], attrInfoPtrI); - getSection(handle.m_ptr[1], keyInfoPtrI); - handle.m_cnt = 2; + req->variableData[0] = batchRange; + getSection(handle.m_ptr[0], attrInfoPtrI); + getSection(handle.m_ptr[1], keyInfoPtrI); + handle.m_cnt = 2; #if defined DEBUG_SCAN_FRAGREQ - ndbout_c("SCAN_FRAGREQ to %x", ref); - printSCAN_FRAGREQ(stdout, signal->getDataPtrSend(), - NDB_ARRAY_SIZE(treeNodePtr.p->m_scanfrag_data.m_scanFragReq), - DBLQH); - printf("ATTRINFO: "); - print(handle.m_ptr[0], stdout); - printf("KEYINFO: "); - print(handle.m_ptr[1], stdout); + ndbout_c("SCAN_FRAGREQ to %x", ref); + printSCAN_FRAGREQ(stdout, signal->getDataPtrSend(), + NDB_ARRAY_SIZE(treeNodePtr.p->m_scanfrag_data.m_scanFragReq), + DBLQH); + printf("ATTRINFO: "); + print(handle.m_ptr[0], stdout); + printf("KEYINFO: "); + print(handle.m_ptr[1], stdout); #endif - if (refToNode(ref) == getOwnNodeId()) - { - c_Counters.incr_counter(CI_LOCAL_RANGE_SCANS_SENT, 1); - } - else - { - c_Counters.incr_counter(CI_REMOTE_RANGE_SCANS_SENT, 1); - } + if (refToNode(ref) == 
getOwnNodeId()) + { + c_Counters.incr_counter(CI_LOCAL_RANGE_SCANS_SENT, 1); + } + else + { + c_Counters.incr_counter(CI_REMOTE_RANGE_SCANS_SENT, 1); + } - if (prune && !repeatable) - { - /** - * For a non-repeatable pruned scan, key info is unique for each - * fragment and therefore cannot be reused, so we release key info - * right away. - */ - jam(); - sendSignal(ref, GSN_SCAN_FRAGREQ, signal, - NDB_ARRAY_SIZE(data.m_scanFragReq), JBB, &handle); - fragPtr.p->m_rangePtrI = RNIL; - fragPtr.p->reset_ranges(); - } - else - { - /** - * Reuse key info for multiple fragments and/or multiple repetitions - * of the scan. - */ - jam(); - sendSignalNoRelease(ref, GSN_SCAN_FRAGREQ, signal, - NDB_ARRAY_SIZE(data.m_scanFragReq), JBB, &handle); - } - handle.clear(); + if (prune && !repeatable) + { + /** + * For a non-repeatable pruned scan, key info is unique for each + * fragment and therefore cannot be reused, so we release key info + * right away. + */ + jam(); + sendSignal(ref, GSN_SCAN_FRAGREQ, signal, + NDB_ARRAY_SIZE(data.m_scanFragReq), JBB, &handle); + fragPtr.p->m_rangePtrI = RNIL; + fragPtr.p->reset_ranges(); + } + else + { + /** + * Reuse key info for multiple fragments and/or multiple repetitions + * of the scan. 
+ */ + jam(); + sendSignalNoRelease(ref, GSN_SCAN_FRAGREQ, signal, + NDB_ARRAY_SIZE(data.m_scanFragReq), JBB, &handle); + } + handle.clear(); - fragPtr.p->m_state = ScanFragHandle::SFH_SCANNING; // running - data.m_frags_outstanding++; - batchRange += bs_rows; - requestsSent++; - list.next(fragPtr); - } // while (requestsSent < noOfFrags) + fragPtr.p->m_state = ScanFragHandle::SFH_SCANNING; // running + data.m_frags_outstanding++; + data.m_frags_not_started--; + batchRange += bs_rows; + requestsSent++; + list.next(fragPtr); + } // while (requestsSent < noOfFrags) + } + if (err) + { + jam(); + abort(signal, requestPtr, err); + } - data.m_frags_not_started -= requestsSent; + return requestsSent; } void @@ -5807,16 +5928,23 @@ Dbspj::scanIndex_execSCAN_FRAGCONF(Signa if (unlikely(bs_rows > bs_bytes)) bs_rows = bs_bytes; - scanIndex_send(signal, - requestPtr, - treeNodePtr, - data.m_frags_not_started, - bs_bytes, - bs_rows, - batchRange); - return; + Uint32 frags_started = + scanIndex_send(signal, + requestPtr, + treeNodePtr, + data.m_frags_not_started, + bs_bytes, + bs_rows, + batchRange); + + if (likely(frags_started > 0)) + return; + + // Else: scanIndex_send() didn't send anything for some reason. + // Need to continue into 'completion detection' below. + jam(); } - } + } // (data.m_frags_outstanding == 0) if (data.m_rows_received != data.m_rows_expecting) { @@ -5974,43 +6102,44 @@ Dbspj::scanIndex_execSCAN_NEXTREQ(Signal /** * First, ask for more data from fragments that are already started. 
*/ - Local_ScanFragHandle_list list(m_scanfraghandle_pool, data.m_fragments); - list.first(fragPtr); + Local_ScanFragHandle_list list(m_scanfraghandle_pool, data.m_fragments); + list.first(fragPtr); while (sentFragCount < data.m_parallelism && !fragPtr.isNull()) - { - jam(); + { + jam(); ndbassert(fragPtr.p->m_state == ScanFragHandle::SFH_WAIT_NEXTREQ || fragPtr.p->m_state == ScanFragHandle::SFH_COMPLETE || fragPtr.p->m_state == ScanFragHandle::SFH_NOT_STARTED); - if (fragPtr.p->m_state == ScanFragHandle::SFH_WAIT_NEXTREQ) - { - jam(); + if (fragPtr.p->m_state == ScanFragHandle::SFH_WAIT_NEXTREQ) + { + jam(); - data.m_frags_outstanding++; - req->variableData[0] = batchRange; - fragPtr.p->m_state = ScanFragHandle::SFH_SCANNING; - batchRange += bs_rows; + data.m_frags_outstanding++; + req->variableData[0] = batchRange; + fragPtr.p->m_state = ScanFragHandle::SFH_SCANNING; + batchRange += bs_rows; - DEBUG("scanIndex_execSCAN_NEXTREQ to: " << hex - << treeNodePtr.p->m_send.m_ref - << ", m_node_no=" << treeNodePtr.p->m_node_no - << ", senderData: " << req->senderData); + DEBUG("scanIndex_execSCAN_NEXTREQ to: " << hex + << treeNodePtr.p->m_send.m_ref + << ", m_node_no=" << treeNodePtr.p->m_node_no + << ", senderData: " << req->senderData); #ifdef DEBUG_SCAN_FRAGREQ - printSCANFRAGNEXTREQ(stdout, &signal->theData[0], - ScanFragNextReq:: SignalLength + 1, DBLQH); + printSCANFRAGNEXTREQ(stdout, &signal->theData[0], + ScanFragNextReq:: SignalLength + 1, DBLQH); #endif - req->senderData = fragPtr.i; - sendSignal(fragPtr.p->m_ref, GSN_SCAN_NEXTREQ, signal, - ScanFragNextReq::SignalLength + 1, - JBB); + req->senderData = fragPtr.i; + sendSignal(fragPtr.p->m_ref, GSN_SCAN_NEXTREQ, signal, + ScanFragNextReq::SignalLength + 1, + JBB); sentFragCount++; } list.next(fragPtr); } } + Uint32 frags_started = 0; if (sentFragCount < data.m_parallelism) { /** @@ -6018,25 +6147,29 @@ Dbspj::scanIndex_execSCAN_NEXTREQ(Signal */ jam(); ndbassert(data.m_frags_not_started != 0); - 
scanIndex_send(signal, - requestPtr, - treeNodePtr, - data.m_parallelism - sentFragCount, - org->batch_size_bytes/data.m_parallelism, - bs_rows, - batchRange); + frags_started = + scanIndex_send(signal, + requestPtr, + treeNodePtr, + data.m_parallelism - sentFragCount, + org->batch_size_bytes/data.m_parallelism, + bs_rows, + batchRange); } /** - * cursor should not have been positioned here... - * unless we actually had something more to send. - * so require that we did actually send something + * sendSignal() or scanIndex_send() might have failed to send: + * Check that we really did send something before + * updating outstanding & active. */ - ndbrequire(data.m_frags_outstanding > 0); - ndbrequire(data.m_batch_chunks > 0); - data.m_batch_chunks++; + if (likely(sentFragCount+frags_started > 0)) + { + jam(); + ndbrequire(data.m_batch_chunks > 0); + data.m_batch_chunks++; - requestPtr.p->m_outstanding++; - ndbassert(treeNodePtr.p->m_state == TreeNode::TN_ACTIVE); + requestPtr.p->m_outstanding++; + ndbassert(treeNodePtr.p->m_state == TreeNode::TN_ACTIVE); + } } void @@ -7216,7 +7349,6 @@ Dbspj::parseDA(Build_context& ctx, * - 17051: Fail on parseDA if 'isLeaf' * - 17052: Fail on parseDA if treeNode not root * - 17053: Fail on parseDA at a random node of the query tree - * - */ if (ERROR_INSERTED_CLEAR(17050) || ((treeNodePtr.p->isLeaf() && ERROR_INSERTED_CLEAR(17051))) || === modified file 'storage/ndb/test/ndbapi/testSpj.cpp' --- a/storage/ndb/test/ndbapi/testSpj.cpp 2012-03-01 15:13:54 +0000 +++ b/storage/ndb/test/ndbapi/testSpj.cpp 2012-03-15 14:33:38 +0000 @@ -31,7 +31,7 @@ static int faultToInject = 0; enum faultsToInject { FI_START = 17001, - FI_END = 17063 + FI_END = 17093 }; int @@ -120,7 +120,9 @@ runLookupJoinError(NDBT_Context* ctx, ND 17030, 17031, 17032, // LQHKEYREQ reply is LQHKEYREF('Invalid..') 17040, 17041, 17042, // lookup_parent_row -> OutOfQueryMemory 17050, 17051, 17052, 17053, // parseDA -> outOfSectionMem - 17060, 17061, 17062, 17063 // 
scanIndex_parent_row -> outOfSectionMem + 17060, 17061, 17062, 17063, // scanIndex_parent_row -> outOfSectionMem + 17070, 17071, 17072, // lookup_send.dupsec -> outOfSectionMem + 17080, 17081, 17082 // lookup_parent_row -> OutOfSectionMemory }; loops = faultToInject ? 1 : sizeof(lookupFaults)/sizeof(int); @@ -206,7 +208,10 @@ runScanJoinError(NDBT_Context* ctx, NDBT 17030, 17031, 17032, // LQHKEYREQ reply is LQHKEYREF('Invalid..') 17040, 17041, 17042, // lookup_parent_row -> OutOfQueryMemory 17050, 17051, 17052, 17053, // parseDA -> outOfSectionMem - 17060, 17061, 17062, 17063 // scanIndex_parent_row -> outOfSectionMem + 17060, 17061, 17062, 17063, // scanIndex_parent_row -> outOfSectionMem + 17070, 17071, 17072, // lookup_send.dupsec -> outOfSectionMem + 17080, 17081, 17082, // lookup_parent_row -> OutOfSectionMemory + 17090, 17091, 17092, 17093 // scanIndex_send -> OutOfSectionMemory }; loops = faultToInject ? 1 : sizeof(scanFaults)/sizeof(int); No bundle (reason: useless for push emails).