From: Mikael Ronstrom Date: August 28 2012 1:37pm Subject: bzr push into mysql-5.5-cluster-7.2 branch (mikael.ronstrom:3966 to 3968) List-Archive: http://lists.mysql.com/commits/144640 Message-Id: <201208281338.q7SDcPHH018583@dator6> MIME-Version: 1.0 Content-Type: text/plain; charset="us-ascii" Content-Transfer-Encoding: 7bit 3968 Mikael Ronstrom 2012-08-28 Fixed m_node_active modified: storage/ndb/src/ndbapi/TransporterFacade.cpp 3967 Mikael Ronstrom 2012-08-27 [merge] merge added: mysql-test/suite/ndb/r/ndb_many_fragments.result mysql-test/suite/ndb/t/ndb_many_fragments.cnf mysql-test/suite/ndb/t/ndb_many_fragments.test modified: mysql-test/mysql-test-run.pl storage/ndb/include/kernel/signaldata/DihScanTab.hpp storage/ndb/include/kernel/signaldata/TcContinueB.hpp storage/ndb/src/kernel/blocks/backup/Backup.cpp storage/ndb/src/kernel/blocks/dbdih/Dbdih.hpp storage/ndb/src/kernel/blocks/dbdih/DbdihMain.cpp storage/ndb/src/kernel/blocks/dbspj/Dbspj.hpp storage/ndb/src/kernel/blocks/dbspj/DbspjMain.cpp storage/ndb/src/kernel/blocks/dbtc/Dbtc.hpp storage/ndb/src/kernel/blocks/dbtc/DbtcMain.cpp storage/ndb/src/kernel/blocks/suma/Suma.cpp storage/ndb/src/ndbapi/ndberror.c storage/ndb/test/ndbapi/testScan.cpp storage/ndb/test/ndbapi/testSpj.cpp 3966 Mikael Ronstrom 2012-08-23 [merge] merge modified: storage/ndb/include/kernel/signaldata/SchemaTrans.hpp storage/ndb/src/kernel/blocks/dbdict/Dbdict.cpp storage/ndb/src/kernel/blocks/dbdict/Dbdict.hpp storage/ndb/src/kernel/blocks/dblqh/DblqhMain.cpp storage/ndb/test/ndbapi/testDict.cpp storage/ndb/test/run-test/daily-basic-tests.txt === modified file 'mysql-test/mysql-test-run.pl' --- a/mysql-test/mysql-test-run.pl revid:mikael.ronstrom@stripped +++ b/mysql-test/mysql-test-run.pl revid:mikael.ronstrom@stripped @@ -3034,10 +3034,17 @@ sub ndbd_start { # > 5.0 { 'character-sets-dir' => \&fix_charset_dir }, my $exe= $exe_ndbd; - if ($exe_ndbmtd and ($exe_ndbmtd_counter++ % 2) == 0) - { - # Use ndbmtd every other time - $exe= 
$exe_ndbmtd; + if ($exe_ndbmtd) + { if ($ENV{MTR_NDBMTD}) + { + # ndbmtd forced by env var MTR_NDBMTD + $exe= $exe_ndbmtd; + } + if (($exe_ndbmtd_counter++ % 2) == 0) + { + # Use ndbmtd every other time + $exe= $exe_ndbmtd; + } } my $path_ndbd_log= "$dir/ndbd.log"; === added file 'mysql-test/suite/ndb/r/ndb_many_fragments.result' --- a/mysql-test/suite/ndb/r/ndb_many_fragments.result 1970-01-01 00:00:00 +0000 +++ b/mysql-test/suite/ndb/r/ndb_many_fragments.result revid:mikael.ronstrom@stripped @@ -0,0 +1,135 @@ +create table parent(a int primary key, b int) engine=ndb; +create table child(a int, b int, primary key(a,b)) engine=ndb; +alter table parent partition by key(a) partitions 128; +alter table child partition by key(a,b) partitions 128; +insert into parent values (1,1), (2,2), (3,3), (4,4); +insert into parent select a+4, b+4 from parent; +insert into parent select a+8, b+8 from parent; +insert into parent select a+16, b+16 from parent; +insert into parent select a+32, b+32 from parent; +insert into parent select a+64, b+64 from parent; +insert into parent select a+128, b+128 from parent; +insert into child select * from parent; +analyze table parent, child; +Table Op Msg_type Msg_text +test.parent analyze status OK +test.child analyze status OK +set ndb_join_pushdown = false; +explain select straight_join count(*) from parent +join child as c1 on c1.a = parent.b +join child as c2 on c2.a = parent.b +join child as c3 on c3.a = parent.b +join child as c4 on c4.a = parent.b +join child as c5 on c5.a = parent.b +join child as c6 on c6.a = parent.b +join child as c7 on c7.a = parent.b +join child as c8 on c8.a = parent.b +join child as c9 on c9.a = parent.b +join child as c10 on c10.a = parent.b +join child as c11 on c11.a = parent.b +join child as c12 on c12.a = parent.b +join child as c13 on c13.a = parent.b +join child as c14 on c14.a = parent.b +join child as c15 on c15.a = parent.b +join child as c16 on c16.a = parent.b +where parent.b < 2 +; +id select_type 
table type possible_keys key key_len ref rows Extra +1 SIMPLE parent ALL NULL NULL NULL NULL 256 Using where with pushed condition +1 SIMPLE c1 ref PRIMARY PRIMARY 4 test.parent.b 1 +1 SIMPLE c2 ref PRIMARY PRIMARY 4 test.parent.b 1 +1 SIMPLE c3 ref PRIMARY PRIMARY 4 test.parent.b 1 +1 SIMPLE c4 ref PRIMARY PRIMARY 4 test.parent.b 1 +1 SIMPLE c5 ref PRIMARY PRIMARY 4 test.parent.b 1 +1 SIMPLE c6 ref PRIMARY PRIMARY 4 test.parent.b 1 +1 SIMPLE c7 ref PRIMARY PRIMARY 4 test.parent.b 1 +1 SIMPLE c8 ref PRIMARY PRIMARY 4 test.parent.b 1 +1 SIMPLE c9 ref PRIMARY PRIMARY 4 test.c1.a 1 Using where +1 SIMPLE c10 ref PRIMARY PRIMARY 4 test.parent.b 1 +1 SIMPLE c11 ref PRIMARY PRIMARY 4 test.c10.a 1 Using where +1 SIMPLE c12 ref PRIMARY PRIMARY 4 test.c10.a 1 Using where +1 SIMPLE c13 ref PRIMARY PRIMARY 4 test.c10.a 1 Using where +1 SIMPLE c14 ref PRIMARY PRIMARY 4 test.c10.a 1 Using where +1 SIMPLE c15 ref PRIMARY PRIMARY 4 test.c10.a 1 Using where +1 SIMPLE c16 ref PRIMARY PRIMARY 4 test.c10.a 1 Using where +select straight_join count(*) from parent +join child as c1 on c1.a = parent.b +join child as c2 on c2.a = parent.b +join child as c3 on c3.a = parent.b +join child as c4 on c4.a = parent.b +join child as c5 on c5.a = parent.b +join child as c6 on c6.a = parent.b +join child as c7 on c7.a = parent.b +join child as c8 on c8.a = parent.b +join child as c9 on c9.a = parent.b +join child as c10 on c10.a = parent.b +join child as c11 on c11.a = parent.b +join child as c12 on c12.a = parent.b +join child as c13 on c13.a = parent.b +join child as c14 on c14.a = parent.b +join child as c15 on c15.a = parent.b +join child as c16 on c16.a = parent.b +where parent.b < 2 +; +count(*) +1 +set ndb_join_pushdown = true; +explain select straight_join count(*) from parent +join child as c1 on c1.a = parent.b +join child as c2 on c2.a = parent.b +join child as c3 on c3.a = parent.b +join child as c4 on c4.a = parent.b +join child as c5 on c5.a = parent.b +join child as c6 on c6.a = 
parent.b +join child as c7 on c7.a = parent.b +join child as c8 on c8.a = parent.b +join child as c9 on c9.a = parent.b +join child as c10 on c10.a = parent.b +join child as c11 on c11.a = parent.b +join child as c12 on c12.a = parent.b +join child as c13 on c13.a = parent.b +join child as c14 on c14.a = parent.b +join child as c15 on c15.a = parent.b +join child as c16 on c16.a = parent.b +where parent.b < 2 +; +id select_type table type possible_keys key key_len ref rows Extra +1 SIMPLE parent ALL NULL NULL NULL NULL 256 Parent of 17 pushed join@1; Using where with pushed condition +1 SIMPLE c1 ref PRIMARY PRIMARY 4 test.parent.b 1 Child of 'parent' in pushed join@1 +1 SIMPLE c2 ref PRIMARY PRIMARY 4 test.parent.b 1 Child of 'c1' in pushed join@1 +1 SIMPLE c3 ref PRIMARY PRIMARY 4 test.parent.b 1 Child of 'c2' in pushed join@1 +1 SIMPLE c4 ref PRIMARY PRIMARY 4 test.parent.b 1 Child of 'c3' in pushed join@1 +1 SIMPLE c5 ref PRIMARY PRIMARY 4 test.parent.b 1 Child of 'c4' in pushed join@1 +1 SIMPLE c6 ref PRIMARY PRIMARY 4 test.parent.b 1 Child of 'c5' in pushed join@1 +1 SIMPLE c7 ref PRIMARY PRIMARY 4 test.parent.b 1 Child of 'c6' in pushed join@1 +1 SIMPLE c8 ref PRIMARY PRIMARY 4 test.parent.b 1 Child of 'c7' in pushed join@1 +1 SIMPLE c9 ref PRIMARY PRIMARY 4 test.c1.a 1 Child of 'c8' in pushed join@1; Using where +1 SIMPLE c10 ref PRIMARY PRIMARY 4 test.parent.b 1 Child of 'c9' in pushed join@1 +1 SIMPLE c11 ref PRIMARY PRIMARY 4 test.c10.a 1 Child of 'c10' in pushed join@1; Using where +1 SIMPLE c12 ref PRIMARY PRIMARY 4 test.c10.a 1 Child of 'c11' in pushed join@1; Using where +1 SIMPLE c13 ref PRIMARY PRIMARY 4 test.c10.a 1 Child of 'c12' in pushed join@1; Using where +1 SIMPLE c14 ref PRIMARY PRIMARY 4 test.c10.a 1 Child of 'c13' in pushed join@1; Using where +1 SIMPLE c15 ref PRIMARY PRIMARY 4 test.c10.a 1 Child of 'c14' in pushed join@1; Using where +1 SIMPLE c16 ref PRIMARY PRIMARY 4 test.c10.a 1 Child of 'c15' in pushed join@1; Using where +select 
straight_join count(*) from parent +join child as c1 on c1.a = parent.b +join child as c2 on c2.a = parent.b +join child as c3 on c3.a = parent.b +join child as c4 on c4.a = parent.b +join child as c5 on c5.a = parent.b +join child as c6 on c6.a = parent.b +join child as c7 on c7.a = parent.b +join child as c8 on c8.a = parent.b +join child as c9 on c9.a = parent.b +join child as c10 on c10.a = parent.b +join child as c11 on c11.a = parent.b +join child as c12 on c12.a = parent.b +join child as c13 on c13.a = parent.b +join child as c14 on c14.a = parent.b +join child as c15 on c15.a = parent.b +join child as c16 on c16.a = parent.b +where parent.b < 2 +; +count(*) +1 +drop table parent, child; === added file 'mysql-test/suite/ndb/t/ndb_many_fragments.cnf' --- a/mysql-test/suite/ndb/t/ndb_many_fragments.cnf 1970-01-01 00:00:00 +0000 +++ b/mysql-test/suite/ndb/t/ndb_many_fragments.cnf revid:mikael.ronstrom@stripped @@ -0,0 +1,14 @@ +!include suite/ndb/my.cnf + +[cluster_config] +# Config for 8 LQH blocks in order to allow total 128 partitions +# with two datanodes. +ThreadConfig=main={count=1},ldm={count=8},recv={count=1},io={count=1},tc={count=1},send={count=1},rep={count=1} +NoOfReplicas=1 +NoOfFragmentLogParts=8 +LongMessageBuffer=8M + +[ENV] +# Need to always use ndbmtd when we want lots of partitions +# (Avoid mixed 'round robin' use of mt / non-mt) +MTR_NDBMTD= 1 === added file 'mysql-test/suite/ndb/t/ndb_many_fragments.test' --- a/mysql-test/suite/ndb/t/ndb_many_fragments.test 1970-01-01 00:00:00 +0000 +++ b/mysql-test/suite/ndb/t/ndb_many_fragments.test revid:mikael.ronstrom@stripped @@ -0,0 +1,54 @@ +# +# Tests of table partitioned across a large number +# of fragments.
Intended to test possible issues +# with large signal fanout, breaking the '1::4 rule' +# + + +create table parent(a int primary key, b int) engine=ndb; +create table child(a int, b int, primary key(a,b)) engine=ndb; + +alter table parent partition by key(a) partitions 128; +alter table child partition by key(a,b) partitions 128; + +insert into parent values (1,1), (2,2), (3,3), (4,4); +insert into parent select a+4, b+4 from parent; +insert into parent select a+8, b+8 from parent; +insert into parent select a+16, b+16 from parent; +insert into parent select a+32, b+32 from parent; +insert into parent select a+64, b+64 from parent; +insert into parent select a+128, b+128 from parent; + +insert into child select * from parent; +analyze table parent, child; + +let $query = + select straight_join count(*) from parent + join child as c1 on c1.a = parent.b + join child as c2 on c2.a = parent.b + join child as c3 on c3.a = parent.b + join child as c4 on c4.a = parent.b + join child as c5 on c5.a = parent.b + join child as c6 on c6.a = parent.b + join child as c7 on c7.a = parent.b + join child as c8 on c8.a = parent.b + join child as c9 on c9.a = parent.b + join child as c10 on c10.a = parent.b + join child as c11 on c11.a = parent.b + join child as c12 on c12.a = parent.b + join child as c13 on c13.a = parent.b + join child as c14 on c14.a = parent.b + join child as c15 on c15.a = parent.b + join child as c16 on c16.a = parent.b + where parent.b < 2 +; + +set ndb_join_pushdown = false; +eval explain $query; +eval $query; + +set ndb_join_pushdown = true; +eval explain $query; +eval $query; + +drop table parent, child; === modified file 'storage/ndb/include/kernel/signaldata/DihScanTab.hpp' --- a/storage/ndb/include/kernel/signaldata/DihScanTab.hpp revid:mikael.ronstrom@stripped +++ b/storage/ndb/include/kernel/signaldata/DihScanTab.hpp revid:mikael.ronstrom@stripped @@ -50,24 +50,82 @@ struct DihScanTabConf struct DihScanGetNodesReq { - STATIC_CONST( SignalLength = 5 ); 
+ STATIC_CONST( FixedSignalLength = 4 ); + STATIC_CONST( MAX_DIH_FRAG_REQS = 64); // Max #FragItem in REQ/CONF + Uint32 tableId; - Uint32 senderData; Uint32 senderRef; - Uint32 fragId; Uint32 scanCookie; + Uint32 fragCnt; + + struct FragItem + { + STATIC_CONST( Length = 2 ); + + Uint32 senderData; + Uint32 fragId; + }; + + /** + * DihScanGetNodesReq requests information about specific fragments. + * - These are either specified in a separate section (long request) + * containing multiple FragItems. + * - Or directly in a single fragItem[] below (short signal) if it + * contains only a single FragItem. + */ + FragItem fragItem[1]; }; struct DihScanGetNodesConf { - STATIC_CONST( SignalLength = 9 ); + STATIC_CONST( FixedSignalLength = 2 ); + Uint32 tableId; + Uint32 fragCnt; - Uint32 senderData; - Uint32 nodes[4]; - Uint32 count; + struct FragItem + { + STATIC_CONST( Length = 8 ); + + Uint32 senderData; + Uint32 fragId; + Uint32 instanceKey; + Uint32 count; + Uint32 nodes[4]; + }; + + /** + * DihScanGetNodesConf supplies information about specific fragments. + * - These are either specified in a separate section (long request) + * containing multiple FragItems. + * - Or directly in a single fragItem[] below (short signal) if it + * contains only a single FragItem. + * Type of long/short Conf-reply will always be the same as the REQuest + */ + FragItem fragItem[1]; +}; + +struct DihScanGetNodesRef +{ + STATIC_CONST( FixedSignalLength = 3 ); Uint32 tableId; - Uint32 fragId; - Uint32 instanceKey; + Uint32 fragCnt; + Uint32 errCode; + + /** + * DihScanGetNodesRef signals failure of a DihScanGetNodesReq. + * As this is likely due to a sectioned memory alloc failure, + * we avoid further alloc problems by returning the same FragItem[] + * list as in the DihScanGetNodesReq. + * + * Depending on 'fragCnt', the fragItem[] is either: + * - These are either specified in a separate section (long request) + * containing multiple FragItems.
+ * - Or directly in a single fragItem[] below (short signal) if it + * contain only a single FragItem. + */ + typedef DihScanGetNodesReq::FragItem FragItem; // Reused, see above + + FragItem fragItem[1]; }; /** === modified file 'storage/ndb/include/kernel/signaldata/TcContinueB.hpp' --- a/storage/ndb/include/kernel/signaldata/TcContinueB.hpp revid:mikael.ronstrom@stripped +++ b/storage/ndb/include/kernel/signaldata/TcContinueB.hpp revid:mikael.ronstrom@stripped @@ -44,10 +44,10 @@ private: ZWAIT_ABORT_ALL = 14, ZCHECK_SCAN_ACTIVE_FAILED_LQH = 15, TRIGGER_PENDING = 17, - - DelayTCKEYCONF = 18, - ZNF_CHECK_TRANSACTIONS = 19, - ZSEND_FIRE_TRIG_REQ = 20 + DelayTCKEYCONF = 18, + ZNF_CHECK_TRANSACTIONS = 19, + ZSEND_FIRE_TRIG_REQ = 20, + ZSTART_FRAG_SCANS = 21 }; }; === modified file 'storage/ndb/src/kernel/blocks/backup/Backup.cpp' --- a/storage/ndb/src/kernel/blocks/backup/Backup.cpp revid:mikael.ronstrom@stripped +++ b/storage/ndb/src/kernel/blocks/backup/Backup.cpp revid:mikael.ronstrom@stripped @@ -3885,15 +3885,18 @@ Backup::getFragmentInfo(Signal* signal, tabPtr.p->fragments.getPtr(fragPtr, fragNo); if(fragPtr.p->scanned == 0 && fragPtr.p->scanning == 0) { - jam(); + jam(); DihScanGetNodesReq* req = (DihScanGetNodesReq*)signal->getDataPtrSend(); req->senderRef = reference(); - req->senderData = ptr.i; req->tableId = tabPtr.p->tableId; - req->fragId = fragNo; req->scanCookie = tabPtr.p->m_scan_cookie; - sendSignal(DBDIH_REF, GSN_DIH_SCAN_GET_NODES_REQ, signal, - DihScanGetNodesReq::SignalLength, JBB); + req->fragCnt = 1; + req->fragItem[0].senderData = ptr.i; + req->fragItem[0].fragId = fragNo; + sendSignal(DBDIH_REF, GSN_DIH_SCAN_GET_NODES_REQ, signal, + DihScanGetNodesReq::FixedSignalLength + + DihScanGetNodesReq::FragItem::Length, + JBB); return; }//if }//for @@ -3916,12 +3919,21 @@ Backup::execDIH_SCAN_GET_NODES_CONF(Sign { jamEntry(); + /** + * Assume only short CONFs with a single FragItem as we only do single + * fragment requests in DIH_SCAN_GET_NODES_REQ 
from Backup::getFragmentInfo. + */ + ndbrequire(signal->getNoOfSections() == 0); + ndbassert(signal->getLength() == + DihScanGetNodesConf::FixedSignalLength + + DihScanGetNodesConf::FragItem::Length); + DihScanGetNodesConf* conf = (DihScanGetNodesConf*)signal->getDataPtrSend(); - const Uint32 senderData = conf->senderData; - const Uint32 nodeCount = conf->count; const Uint32 tableId = conf->tableId; - const Uint32 fragNo = conf->fragId; - const Uint32 instanceKey = conf->instanceKey; + const Uint32 senderData = conf->fragItem[0].senderData; + const Uint32 nodeCount = conf->fragItem[0].count; + const Uint32 fragNo = conf->fragItem[0].fragId; + const Uint32 instanceKey = conf->fragItem[0].instanceKey; ndbrequire(nodeCount > 0 && nodeCount <= MAX_REPLICAS); @@ -3935,7 +3947,7 @@ Backup::execDIH_SCAN_GET_NODES_CONF(Sign tabPtr.p->fragments.getPtr(fragPtr, fragNo); fragPtr.p->lqhInstanceKey = instanceKey; - fragPtr.p->node = conf->nodes[0]; + fragPtr.p->node = conf->fragItem[0].nodes[0]; getFragmentInfo(signal, ptr, tabPtr, fragNo + 1); } === modified file 'storage/ndb/src/kernel/blocks/dbdih/Dbdih.hpp' --- a/storage/ndb/src/kernel/blocks/dbdih/Dbdih.hpp revid:mikael.ronstrom@stripped +++ b/storage/ndb/src/kernel/blocks/dbdih/Dbdih.hpp revid:mikael.ronstrom@stripped @@ -72,6 +72,11 @@ #define ZREPLERROR2 307 // -------------------------------------- +// Other DIH error codes +// -------------------------------------- +#define ZLONG_MESSAGE_ERROR 312 + +// -------------------------------------- // Crash Codes // -------------------------------------- #define ZCOULD_NOT_OCCUR_ERROR 300 === modified file 'storage/ndb/src/kernel/blocks/dbdih/DbdihMain.cpp' --- a/storage/ndb/src/kernel/blocks/dbdih/DbdihMain.cpp revid:mikael.ronstrom@stripped +++ b/storage/ndb/src/kernel/blocks/dbdih/DbdihMain.cpp revid:mikael.ronstrom@stripped @@ -689,7 +689,6 @@ void Dbdih::execCONTINUEB(Signal* signal { jam(); TabRecordPtr tabPtr; - jam(); tabPtr.i = signal->theData[1]; 
ptrCheckGuard(tabPtr, ctabFileSize, tabRecord); getTabInfo_send(signal, tabPtr); @@ -9524,43 +9523,133 @@ error: DihScanTabRef::SignalLength, JBB); return; -}//Dbdih::execDI_FCOUNTREQ() +}//Dbdih::execDIH_SCAN_TAB_REQ() void Dbdih::execDIH_SCAN_GET_NODES_REQ(Signal* signal) { - FragmentstorePtr fragPtr; - TabRecordPtr tabPtr; jamEntry(); + DihScanGetNodesReq* req = (DihScanGetNodesReq*)signal->getDataPtrSend(); - Uint32 senderRef = req->senderRef; - Uint32 senderData = req->senderData; - Uint32 fragId = req->fragId; + const Uint32 tableId = req->tableId; + const Uint32 senderRef = req->senderRef; + const Uint32 fragCnt = req->fragCnt; - tabPtr.i = req->tableId; + SectionHandle reqHandle(this, signal); + const bool useLongSignal = (reqHandle.m_cnt > 0); + + DihScanGetNodesReq::FragItem fragReq[DihScanGetNodesReq::MAX_DIH_FRAG_REQS]; + if (useLongSignal) + { + // Long signal: Fetch into fragReq[] + jam(); + SegmentedSectionPtr fragReqSection; + ndbrequire(reqHandle.getSection(fragReqSection,0)); + ndbassert(fragReqSection.p->m_sz == (fragCnt*DihScanGetNodesReq::FragItem::Length)); + ndbassert(fragCnt <= DihScanGetNodesReq::MAX_DIH_FRAG_REQS); + copy((Uint32*)fragReq, fragReqSection); + } + else // Short signal, with single FragItem + { + jam(); + ndbassert(fragCnt == 1); + ndbassert(signal->getLength() + == DihScanGetNodesReq::FixedSignalLength + DihScanGetNodesReq::FragItem::Length); + memcpy(fragReq, req->fragItem, 4 * DihScanGetNodesReq::FragItem::Length); + } + + TabRecordPtr tabPtr; + tabPtr.i = tableId; ptrCheckGuard(tabPtr, ctabFileSize, tabRecord); if (DictTabInfo::isOrderedIndex(tabPtr.p->tableType)) { jam(); tabPtr.i = tabPtr.p->primaryTableId; ptrCheckGuard(tabPtr, ctabFileSize, tabRecord); } - - Uint32 nodes[MAX_REPLICAS]; - getFragstore(tabPtr.p, fragId, fragPtr); - Uint32 count = extractNodeInfo(jamBuffer(), fragPtr.p, nodes); DihScanGetNodesConf* conf = (DihScanGetNodesConf*)signal->getDataPtrSend(); - conf->senderData = senderData; - conf->nodes[0] = 
nodes[0]; - conf->nodes[1] = nodes[1]; - conf->nodes[2] = nodes[2]; - conf->nodes[3] = nodes[3]; - conf->count = count; - conf->tableId = tabPtr.i; - conf->fragId = fragId; - conf->instanceKey = dihGetInstanceKey(fragPtr); - sendSignal(senderRef, GSN_DIH_SCAN_GET_NODES_CONF, signal, - DihScanGetNodesConf::SignalLength, JBB); -}//Dbdih::execDIGETPRIMREQ() + conf->tableId = tableId; + conf->fragCnt = fragCnt; + + for (Uint32 i=0; i < fragCnt; i++) + { + jam(); + FragmentstorePtr fragPtr; + Uint32 nodes[MAX_REPLICAS]; + + getFragstore(tabPtr.p, fragReq[i].fragId, fragPtr); + Uint32 count = extractNodeInfo(jamBuffer(), fragPtr.p, nodes); + + conf->fragItem[i].senderData = fragReq[i].senderData; + conf->fragItem[i].fragId = fragReq[i].fragId; + conf->fragItem[i].instanceKey = dihGetInstanceKey(fragPtr); + conf->fragItem[i].count = count; + conf->fragItem[i].nodes[0] = nodes[0]; + conf->fragItem[i].nodes[1] = nodes[1]; + conf->fragItem[i].nodes[2] = nodes[2]; + conf->fragItem[i].nodes[3] = nodes[3]; + } + + if (useLongSignal) + { + jam(); + Ptr fragConf; + const Uint32 len = fragCnt*DihScanGetNodesConf::FragItem::Length; + + if (ERROR_INSERTED_CLEAR(7234) || + unlikely(!import(fragConf, (Uint32*)conf->fragItem, len))) + { + jam(); + DihScanGetNodesRef* ref = (DihScanGetNodesRef*)signal->getDataPtrSend(); + + ref->tableId = tableId; + ref->fragCnt = fragCnt; + ref->errCode = ZLONG_MESSAGE_ERROR; + + /** + * NOTE: DihScanGetNodesRef return the same FragItem list + * received as part of the REQuest to avoid possible + * malloc failure handling in the REF. + */ + sendSignal(senderRef, GSN_DIH_SCAN_GET_NODES_REF, signal, + DihScanGetNodesRef::FixedSignalLength, + JBB, &reqHandle); + return; + } + releaseSections(reqHandle); + + SectionHandle confHandle(this, fragConf.i); + sendSignal(senderRef, GSN_DIH_SCAN_GET_NODES_CONF, signal, + DihScanGetNodesConf::FixedSignalLength, + JBB, &confHandle); + } + else + { + // A short signal is sufficient. 
+ jam(); + ndbassert(fragCnt == 1); + + if (ERROR_INSERTED_CLEAR(7234)) + { + jam(); + DihScanGetNodesRef* ref = (DihScanGetNodesRef*)signal->getDataPtrSend(); + + ref->tableId = tableId; + ref->fragCnt = fragCnt; + ref->errCode = ZLONG_MESSAGE_ERROR; + ref->fragItem[0] = fragReq[0]; + + sendSignal(senderRef, GSN_DIH_SCAN_GET_NODES_REF, signal, + DihScanGetNodesRef::FixedSignalLength + + DihScanGetNodesRef::FragItem::Length, + JBB); + return; + } + sendSignal(senderRef, GSN_DIH_SCAN_GET_NODES_CONF, signal, + DihScanGetNodesConf::FixedSignalLength + + DihScanGetNodesConf::FragItem::Length, + JBB); + } +}//Dbdih::execDIH_SCAN_GET_NODES_REQ void Dbdih::execDIH_SCAN_TAB_COMPLETE_REP(Signal* signal) === modified file 'storage/ndb/src/kernel/blocks/dbspj/Dbspj.hpp' --- a/storage/ndb/src/kernel/blocks/dbspj/Dbspj.hpp revid:mikael.ronstrom@stripped +++ b/storage/ndb/src/kernel/blocks/dbspj/Dbspj.hpp revid:mikael.ronstrom@stripped @@ -1253,6 +1253,10 @@ private: void scanIndex_release_rangekeys(Ptr, Ptr); + Uint32 scanindex_sendDihGetNodesReq(Signal* signal, + Ptr requestPtr, + Ptr treeNodePtr); + /** * Page manager */ === modified file 'storage/ndb/src/kernel/blocks/dbspj/DbspjMain.cpp' --- a/storage/ndb/src/kernel/blocks/dbspj/DbspjMain.cpp revid:mikael.ronstrom@stripped +++ b/storage/ndb/src/kernel/blocks/dbspj/DbspjMain.cpp revid:mikael.ronstrom@stripped @@ -5056,25 +5056,14 @@ Dbspj::execDIH_SCAN_TAB_CONF(Signal* sig if (!pruned) { + /** Start requesting node info from DIH */ jam(); - Uint32 tableId = ((ScanFragReq*)data.m_scanFragReq)->tableId; - DihScanGetNodesReq * req = (DihScanGetNodesReq*)signal->getDataPtrSend(); - req->senderRef = reference(); - req->tableId = tableId; - req->scanCookie = cookie; - - Uint32 cnt = 0; - Local_ScanFragHandle_list list(m_scanfraghandle_pool, data.m_fragments); - for (list.first(fragPtr); !fragPtr.isNull(); list.next(fragPtr)) + err = scanindex_sendDihGetNodesReq(signal, requestPtr, treeNodePtr); + if (unlikely(err != 0)) { jam(); - 
req->senderData = fragPtr.i; - req->fragId = fragPtr.p->m_fragId; - sendSignal(DBDIH_REF, GSN_DIH_SCAN_GET_NODES_REQ, signal, - DihScanGetNodesReq::SignalLength, JBB); - cnt++; + break; } - data.m_frags_outstanding = cnt; requestPtr.p->m_outstanding++; } else @@ -5097,46 +5086,206 @@ error: abort(signal, requestPtr, err); } +/** + * Will check the fragment list for fragments which need to + * get node info to construct 'fragPtr.p->m_ref' from DIH. + * + * In order to avoid CPU starvation, or unmanageably huge FragItem[], + * max MAX_DIH_FRAG_REQS are requested in a single signal. + * If there are more fragments, we have to repeatedly call this + * function when CONF for the first fragment set is received. + */ +Uint32 +Dbspj::scanindex_sendDihGetNodesReq(Signal* signal, + Ptr requestPtr, + Ptr treeNodePtr) +{ + jam(); + ScanIndexData& data = treeNodePtr.p->m_scanindex_data; + Ptr fragPtr; + Local_ScanFragHandle_list list(m_scanfraghandle_pool, data.m_fragments); + + DihScanGetNodesReq * req = (DihScanGetNodesReq*)signal->getDataPtrSend(); + Uint32 fragCnt = 0; + for (list.first(fragPtr); + !fragPtr.isNull() && fragCnt < DihScanGetNodesReq::MAX_DIH_FRAG_REQS; + list.next(fragPtr)) + { + jam(); + if (fragPtr.p->m_ref == 0) // Need GSN_DIH_SCAN_GET_NODES_REQ + { + jam(); + req->fragItem[fragCnt].senderData = fragPtr.i; + req->fragItem[fragCnt].fragId = fragPtr.p->m_fragId; + fragCnt++; + } + } + + if (fragCnt > 0) + { + jam(); + Uint32 tableId = ((ScanFragReq*)data.m_scanFragReq)->tableId; + req->senderRef = reference(); + req->tableId = tableId; + req->scanCookie = data.m_scanCookie; + req->fragCnt = fragCnt; + + /** Always send as a long signal, even if a short would + * have been sufficient in the (rare) case of 'fragCnt==1' + */ + Ptr fragReq; + Uint32 len = fragCnt*DihScanGetNodesReq::FragItem::Length; + if (ERROR_INSERTED_CLEAR(17130) || + unlikely(!import(fragReq, (Uint32*)req->fragItem, len))) + { + jam(); + return DbspjErr::OutOfSectionMemory; + } + +
SectionHandle handle(this, fragReq.i); + sendSignal(DBDIH_REF, GSN_DIH_SCAN_GET_NODES_REQ, signal, + DihScanGetNodesReq::FixedSignalLength, + JBB, &handle); + + data.m_frags_outstanding += fragCnt; + } + return 0; +} //Dbspj::scanindex_sendDihGetNodesReq + void Dbspj::execDIH_SCAN_GET_NODES_REF(Signal* signal) { jamEntry(); - ndbrequire(false); -} + const DihScanGetNodesRef* ref = (DihScanGetNodesRef*)signal->getDataPtr(); +//const Uint32 tableId = ref->tableId; + const Uint32 fragCnt = ref->fragCnt; + const Uint32 errCode = ref->errCode; + ndbassert(errCode != 0); + + if (signal->getNoOfSections() > 0) + { + // Long signal: FragItems listed in first section + jam(); + SectionHandle handle(this, signal); + ndbassert(handle.m_cnt==1); + SegmentedSectionPtr fragRefSection; + ndbrequire(handle.getSection(fragRefSection,0)); + ndbassert(fragRefSection.p->m_sz == (fragCnt*DihScanGetNodesRef::FragItem::Length)); + ndbassert(fragCnt <= DihScanGetNodesReq::MAX_DIH_FRAG_REQS); + copy((Uint32*)ref->fragItem, fragRefSection); + releaseSections(handle); + } + else // Short signal, single frag in ref->fragItem[0] + { + ndbassert(fragCnt == 1); + ndbassert(signal->getLength() + == DihScanGetNodesRef::FixedSignalLength + DihScanGetNodesRef::FragItem::Length); + } + + UintR treeNodePtrI = RNIL; + for (Uint32 i=0; i < fragCnt; i++) + { + jam(); + const Uint32 senderData = ref->fragItem[i].senderData; + + Ptr fragPtr; + m_scanfraghandle_pool.getPtr(fragPtr, senderData); + + // All fragItem[] should be for same TreeNode + ndbassert (treeNodePtrI == RNIL || treeNodePtrI == fragPtr.p->m_treeNodePtrI); + treeNodePtrI = fragPtr.p->m_treeNodePtrI; + } //for + + ndbassert(treeNodePtrI != RNIL); // fragCnt > 0 above + Ptr treeNodePtr; + m_treenode_pool.getPtr(treeNodePtr, treeNodePtrI); + + ScanIndexData& data = treeNodePtr.p->m_scanindex_data; + ndbassert(data.m_frags_outstanding == fragCnt); + data.m_frags_outstanding -= fragCnt; + + Ptr requestPtr; + m_request_pool.getPtr(requestPtr, 
treeNodePtr.p->m_requestPtrI); + abort(signal, requestPtr, errCode); + + if (data.m_frags_outstanding == 0) + { + jam(); + treeNodePtr.p->m_state = TreeNode::TN_INACTIVE; + checkPrepareComplete(signal, requestPtr, 1); + } +}//Dbspj::execDIH_SCAN_GET_NODES_REF void Dbspj::execDIH_SCAN_GET_NODES_CONF(Signal* signal) { jamEntry(); + const DihScanGetNodesConf * conf = (DihScanGetNodesConf*)signal->getDataPtr(); + const Uint32 fragCnt = conf->fragCnt; - DihScanGetNodesConf * conf = (DihScanGetNodesConf*)signal->getDataPtr(); + if (signal->getNoOfSections() > 0) + { + // Unpack long signal + jam(); + SectionHandle handle(this, signal); + SegmentedSectionPtr fragConfSection; + ndbrequire(handle.getSection(fragConfSection,0)); + ndbassert(fragConfSection.p->m_sz == (fragCnt*DihScanGetNodesConf::FragItem::Length)); + copy((Uint32*)conf->fragItem, fragConfSection); + releaseSections(handle); + } + else // Short signal, with single FragItem + { + jam(); + ndbassert(fragCnt == 1); + ndbassert(signal->getLength() + == DihScanGetNodesConf::FixedSignalLength + DihScanGetNodesConf::FragItem::Length); + } - Uint32 senderData = conf->senderData; - Uint32 node = conf->nodes[0]; - Uint32 instanceKey = conf->instanceKey; + UintR treeNodePtrI = RNIL; + for (Uint32 i=0; i < fragCnt; i++) + { + jam(); + const Uint32 senderData = conf->fragItem[i].senderData; + const Uint32 node = conf->fragItem[i].nodes[0]; + const Uint32 instanceKey = conf->fragItem[i].instanceKey; - Ptr fragPtr; - m_scanfraghandle_pool.getPtr(fragPtr, senderData); + Ptr fragPtr; + m_scanfraghandle_pool.getPtr(fragPtr, senderData); + + // All fragItem[] should be for same TreeNode + ndbassert (treeNodePtrI == RNIL || treeNodePtrI == fragPtr.p->m_treeNodePtrI); + treeNodePtrI = fragPtr.p->m_treeNodePtrI; + + fragPtr.p->m_ref = numberToRef(DBLQH, instanceKey, node); + } //for + + ndbassert(treeNodePtrI != RNIL); // fragCnt > 0 above Ptr treeNodePtr; - m_treenode_pool.getPtr(treeNodePtr, fragPtr.p->m_treeNodePtrI); - 
ndbrequire(treeNodePtr.p->m_info == &g_ScanIndexOpInfo); + m_treenode_pool.getPtr(treeNodePtr, treeNodePtrI); + ScanIndexData& data = treeNodePtr.p->m_scanindex_data; - ndbrequire(data.m_frags_outstanding > 0); - data.m_frags_outstanding--; + ndbassert(data.m_frags_outstanding == fragCnt); + data.m_frags_outstanding -= fragCnt; - fragPtr.p->m_ref = numberToRef(DBLQH, instanceKey, node); + Ptr requestPtr; + m_request_pool.getPtr(requestPtr, treeNodePtr.p->m_requestPtrI); - if (data.m_frags_outstanding == 0) + /** Check if we need to send more GSN_DIH_SCAN_GET_NODES_REQ */ + Uint32 err = scanindex_sendDihGetNodesReq(signal, requestPtr, treeNodePtr); + if (unlikely(err != 0)) { jam(); + abort(signal, requestPtr, err); + } + if (data.m_frags_outstanding == 0) + { + jam(); treeNodePtr.p->m_state = TreeNode::TN_INACTIVE; - - Ptr requestPtr; - m_request_pool.getPtr(requestPtr, treeNodePtr.p->m_requestPtrI); checkPrepareComplete(signal, requestPtr, 1); } -} +}//Dbspj::execDIH_SCAN_GET_NODES_CONF Uint32 Dbspj::scanIndex_findFrag(Local_ScanFragHandle_list & list, === modified file 'storage/ndb/src/kernel/blocks/dbtc/Dbtc.hpp' --- a/storage/ndb/src/kernel/blocks/dbtc/Dbtc.hpp revid:mikael.ronstrom@stripped +++ b/storage/ndb/src/kernel/blocks/dbtc/Dbtc.hpp revid:mikael.ronstrom@stripped @@ -29,6 +29,7 @@ #include #include #include +#include #include #include #include @@ -1496,6 +1497,12 @@ private: void initScanfragrec(Signal* signal); void releaseScanResources(Signal*, ScanRecordPtr, bool not_started = false); ScanRecordPtr seizeScanrec(Signal* signal); + void startFragScansLab(Signal*, Uint32 tableId, + SectionHandle&, Uint32 secOffs); + void startFragScanLab(Signal*, Uint32 tableId, + const DihScanGetNodesConf::FragItem& fragConf); + + void sendDihGetNodesReq(Signal*, ScanRecordPtr); void sendScanFragReq(Signal*, ScanRecord*, ScanFragRec*, bool); void sendScanTabConf(Signal* signal, ScanRecordPtr); void close_scan_req(Signal*, ScanRecordPtr, bool received_req); === modified 
file 'storage/ndb/src/kernel/blocks/dbtc/DbtcMain.cpp' --- a/storage/ndb/src/kernel/blocks/dbtc/DbtcMain.cpp revid:mikael.ronstrom@stripped +++ b/storage/ndb/src/kernel/blocks/dbtc/DbtcMain.cpp revid:mikael.ronstrom@stripped @@ -354,6 +354,13 @@ void Dbtc::execCONTINUEB(Signal* signal) } sendFireTrigReq(signal, apiConnectptr, signal->theData[4]); return; + case TcContinueB::ZSTART_FRAG_SCANS: + { + jam(); + SectionHandle handle(this, signal); + startFragScansLab(signal, Tdata0, handle, Tdata1); + return; + } default: ndbrequire(false); }//switch @@ -10927,7 +10934,7 @@ void Dbtc::diFcountReqLab(Signal* signal }//Dbtc::diFcountReqLab() /******************************************************************** - * execDI_FCOUNTCONF + * execDIH_SCAN_TAB_CONF * * WE HAVE ASKED DIH ABOUT THE NUMBER OF FRAGMENTS IN THIS TABLE. * WE WILL NOW START A NUMBER OF PARALLEL SCAN PROCESSES. EACH OF @@ -11030,50 +11037,145 @@ void Dbtc::execDIH_SCAN_TAB_CONF(Signal* setApiConTimer(apiConnectptr.i, 0, __LINE__); updateBuddyTimer(apiConnectptr); + /** Need own local scope of list(...,m_running_scan_frags) */ + { + ScanFragRecPtr ptr; + ScanFragList list(c_scan_frag_pool, scanptr.p->m_running_scan_frags); + + /** + * Initially list(...,m_running_scan_frags) contains an 'IDLE' entry + * for all fragments. Now assign fragId to those 'tfragCount' fragments + * to execute in parallel. + */ + for (list.first(ptr); !ptr.isNull() && tfragCount; + list.next(ptr), tfragCount--){ + jam(); + + ndbassert(ptr.p->scanFragState == ScanFragRec::IDLE); + ptr.p->lqhBlockref = 0; + ptr.p->scanFragId = scanptr.p->scanNextFragId++; + }//for + + /** + * Any remaining fragments, not allowed to execute in parallel, are + * put into the 'queued-list' until they can be executed. 
+ */ + ScanFragList queued(c_scan_frag_pool, scanptr.p->m_queued_scan_frags); + for (; !ptr.isNull();) + { + jam(); + ptr.p->m_ops = 0; + ptr.p->m_totalLen = 0; + ptr.p->m_scan_frag_conf_status = 1; + ptr.p->scanFragState = ScanFragRec::QUEUED_FOR_DELIVERY; + ptr.p->stopFragTimer(); + + ScanFragRecPtr tmp; + tmp.i = ptr.i; + tmp.p = ptr.p; + list.next(ptr); + list.remove(tmp); + queued.add(tmp); + scanptr.p->m_queued_count++; + } + } + + /** + * Start by requesting fragment info from DIH. + * Max MAX_DIH_FRAG_REQS fragments can be requested at once. + * If needed we will send more requests after CONF is received. + */ + jam(); + sendDihGetNodesReq(signal, scanptr); + +}//Dbtc::execDIH_SCAN_TAB_CONF() + +/******************************************************************** + * sendDihGetNodesReq + * + * Will check the 'm_running_scan_frags' list for fragments which + * are still 'IDLE'. These should be started by requesting + * node info in a DIH_SCAN_GET_NODES_REQ. + * + * In order to avoid CPU starvation, or unmanagable huge FragItem[], + * max MAX_DIH_FRAG_REQS are requested in a single signal. + * If there are more fragments, we have to repeatable call this + * function when CONF for the first fragment set is received. 
+ ********************************************************************/ +void Dbtc::sendDihGetNodesReq(Signal* signal, ScanRecordPtr scanptr) +{ + jam(); ScanFragRecPtr ptr; - ScanFragList list(c_scan_frag_pool, scanptr.p->m_running_scan_frags); - for (list.first(ptr); !ptr.isNull() && tfragCount; - list.next(ptr), tfragCount--){ - jam(); + DihScanGetNodesReq* req = (DihScanGetNodesReq*)signal->getDataPtrSend(); + Uint32 fragCnt = 0; - ptr.p->lqhBlockref = 0; - ptr.p->startFragTimer(ctcTimer); - ptr.p->scanFragId = scanptr.p->scanNextFragId++; - ptr.p->scanFragState = ScanFragRec::WAIT_GET_PRIMCONF; - ptr.p->startFragTimer(ctcTimer); + { // running-list scope + ScanFragList list(c_scan_frag_pool, scanptr.p->m_running_scan_frags); + for (list.first(ptr); + !ptr.isNull() && fragCnt < DihScanGetNodesReq::MAX_DIH_FRAG_REQS; + list.next(ptr)) + { + jam(); + if (ptr.p->scanFragState == ScanFragRec::IDLE) // Start it NOW!. + { + jam(); + ptr.p->scanFragState = ScanFragRec::WAIT_GET_PRIMCONF; + ptr.p->startFragTimer(ctcTimer); + req->fragItem[fragCnt].senderData = ptr.i; + req->fragItem[fragCnt].fragId = ptr.p->scanFragId; - DihScanGetNodesReq* req = (DihScanGetNodesReq*)signal->getDataPtrSend(); + fragCnt++; + } // if IDLE + } + } // running-list scope + + if (fragCnt > 0) + { + jam(); req->senderRef = reference(); - req->senderData = ptr.i; req->tableId = scanptr.p->scanTableref; - req->fragId = ptr.p->scanFragId; req->scanCookie = scanptr.p->m_scan_cookie; - sendSignal(cdihblockref, GSN_DIH_SCAN_GET_NODES_REQ, signal, - DihScanGetNodesReq::SignalLength, JBB); - }//for + req->fragCnt = fragCnt; - ScanFragList queued(c_scan_frag_pool, scanptr.p->m_queued_scan_frags); - for (; !ptr.isNull();) - { - ptr.p->m_ops = 0; - ptr.p->m_totalLen = 0; - ptr.p->m_scan_frag_conf_status = 1; - ptr.p->scanFragState = ScanFragRec::QUEUED_FOR_DELIVERY; - ptr.p->stopFragTimer(); - - ScanFragRecPtr tmp; - tmp.i = ptr.i; - tmp.p = ptr.p; - list.next(ptr); - list.remove(tmp); - 
queued.add(tmp); - scanptr.p->m_queued_count++; + /** Always send as a long signal, even if a short would + * have been sufficient in the (rare) case of 'fragCnt==1' + */ + Ptr fragReq; + const Uint32 len = fragCnt*DihScanGetNodesReq::FragItem::Length; + + if (ERROR_INSERTED_CLEAR(8095) || // Fail once + unlikely(!import(fragReq, (Uint32*)req->fragItem, len))) + { + jam(); + + /** Handling of failed REQ is similar to :execDIH_SCAN_GET_NODES_REF */ + for (Uint32 i = 0; i < fragCnt; i++) + { + jam(); + ptr.i = req->fragItem[i].senderData; + c_scan_frag_pool.getPtr(ptr); + + ndbrequire(ptr.p->scanFragState == ScanFragRec::WAIT_GET_PRIMCONF); + ptr.p->scanFragState = ScanFragRec::COMPLETED; + ptr.p->stopFragTimer(); + { + ScanFragList run(c_scan_frag_pool, scanptr.p->m_running_scan_frags); + run.release(ptr); + } + } + scanError(signal, scanptr, ZGET_DATAREC_ERROR); + return; + } + SectionHandle handle(this, fragReq.i); + sendSignal(cdihblockref, GSN_DIH_SCAN_GET_NODES_REQ, signal, + DihScanGetNodesReq::FixedSignalLength, + JBB, &handle); } -}//Dbtc::execDI_FCOUNTCONF() +}//Dbtc::sendDihGetNodesReq /****************************************************** - * execDI_FCOUNTREF + * execDIH_SCAN_TAB_REF ******************************************************/ void Dbtc::execDIH_SCAN_TAB_REF(Signal* signal) { @@ -11095,7 +11197,7 @@ void Dbtc::execDIH_SCAN_TAB_REF(Signal* return; }//if abortScanLab(signal, scanptr, errCode, true); -}//Dbtc::execDI_FCOUNTREF() +}//Dbtc::execDIH_SCAN_TAB_REF() void Dbtc::abortScanLab(Signal* signal, ScanRecordPtr scanptr, Uint32 errCode, bool not_started) @@ -11160,23 +11262,113 @@ void Dbtc::releaseScanResources(Signal* /**************************************************************** - * execDIGETPRIMCONF + * execDIH_SCAN_GET_NODES_CONF * - * WE HAVE RECEIVED THE PRIMARY NODE OF THIS FRAGMENT. - * WE ARE NOW READY TO ASK FOR PERMISSION TO LOAD THIS - * SPECIFIC NODE WITH A SCAN OPERATION. 
+ * WE HAVE RECEIVED THE PRIMARY NODES OF ALL FRAGMENTS. + * WE ARE NOW READY TO ASK FOR PERMISSION TO LOAD THESE + * NODES WITH A SCAN OPERATIONS. ****************************************************************/ void Dbtc::execDIH_SCAN_GET_NODES_CONF(Signal* signal) { jamEntry(); DihScanGetNodesConf * conf = (DihScanGetNodesConf*)signal->getDataPtr(); - scanFragptr.i = conf->senderData; + const Uint32 tableId = conf->tableId; + const Uint32 fragCnt = conf->fragCnt; + + if (signal->getNoOfSections() > 0) + { + // Long signal: FragItems listed in first section + jam(); + SectionHandle handle(this, signal); + ndbassert(handle.m_cnt==1); + startFragScansLab(signal, tableId, handle, 0); + } + else // Short signal, with single FragItem + { + jam(); + ndbassert(fragCnt == 1); + ndbassert(signal->getLength() + == DihScanGetNodesConf::FixedSignalLength + DihScanGetNodesConf::FragItem::Length); + + DihScanGetNodesConf::FragItem fragConf[1]; + memcpy(fragConf, conf->fragItem, 4 * DihScanGetNodesConf::FragItem::Length); + startFragScanLab(signal, tableId, fragConf[0]); + } + + /** + * As MAX_DIH_FRAG_REQS fragments can be requested at once, + * we may have to send more DIH_SCAN_GET_NODES_REQ now + */ + ScanRecordPtr scanptr; + scanptr.i = scanFragptr.p->scanRec; + ptrCheckGuard(scanptr, cscanrecFileSize, scanRecord); + + jam(); + sendDihGetNodesReq(signal, scanptr); + +}//Dbtc::execDIH_SCAN_GET_NODES_CONF + +/**************************************************************** + * startFragScansLab + * + * PROCESS THE LIST OF DihScanGetNodesConf::FragItem RECEIVED + * FROM ::execDIH_SCAN_GET_NODES_CONF. SEND A 'SCAN_FRAGREQ' + * FOR EACH OF THESE. + * AVOID PRODUCING TOO MANY LOCAL SIGNALS WHICH MAY RESULT IN + * A 'Out of SendBuffers' ERROR. IN THESE CASES WE TAKE A BREAK + * AND CONTINUEB LATER. 
+ ****************************************************************/ +void Dbtc::startFragScansLab(Signal* signal, Uint32 tableId, + SectionHandle& handle, Uint32 secOffs) +{ + Uint32 cntLocalSignals = 0; + const NodeId ownNodeId = getOwnNodeId(); + SectionReader fragReader(handle.m_ptr[0], getSectionSegmentPool()); + ndbassert((fragReader.getSize() % DihScanGetNodesConf::FragItem::Length) == 0); + ndbrequire(fragReader.step(secOffs)); + + DihScanGetNodesConf::FragItem fragConf; + while (fragReader.getWords((Uint32*)&fragConf,DihScanGetNodesConf::FragItem::Length)) + { + jam(); + if (fragConf.nodes[0] == ownNodeId) + cntLocalSignals++; + + /** + * A max fanout of 1::4 of consumed::produced signals are allowed. + * If we are about to produce more, we have to contine later. + */ + if (cntLocalSignals >= 4) + { + jam(); + signal->theData[0] = TcContinueB::ZSTART_FRAG_SCANS; + signal->theData[1] = tableId; + signal->theData[2] = secOffs; + sendSignal(reference(), GSN_CONTINUEB, signal, 3, JBB, &handle); + return; + } + + startFragScanLab(signal, tableId, fragConf); + secOffs += DihScanGetNodesConf::FragItem::Length; + } //while + + jam(); + releaseSections(handle); +}//Dbtc::startFragScansLab + +void Dbtc::startFragScanLab(Signal* signal, Uint32 tableId, + const DihScanGetNodesConf::FragItem& fragConf) +{ + jam(); + const NodeId ownNodeId = getOwnNodeId(); + + scanFragptr.i = fragConf.senderData; c_scan_frag_pool.getPtr(scanFragptr); - tnodeid = conf->nodes[0]; + tnodeid = fragConf.nodes[0]; arrGuard(tnodeid, MAX_NDB_NODES); - if(ERROR_INSERTED(8050) && tnodeid != getOwnNodeId()) + if (ERROR_INSERTED(8050) && tnodeid != ownNodeId) { /* Asked to scan a fragment which is not on the same node as the * TC - transaction hinting / scan partition pruning has failed @@ -11197,16 +11389,15 @@ void Dbtc::execDIH_SCAN_GET_NODES_CONF(S * can "pass" committing on backup fragments and * get incorrect row count */ - if(false && 
ScanFragReq::getReadCommittedFlag(scanptr.p->scanRequestInfo)) + if (false && ScanFragReq::getReadCommittedFlag(scanptr.p->scanRequestInfo)) { jam(); - Uint32 nodeid = getOwnNodeId(); - for(Uint32 i = 1; icount; i++) + for (Uint32 i = 1; inodes[i] == nodeid) + if (fragConf.nodes[i] == ownNodeId) { jam(); - tnodeid = nodeid; + tnodeid = ownNodeId; break; } } @@ -11271,17 +11462,10 @@ void Dbtc::execDIH_SCAN_GET_NODES_CONF(S * If this is the last SCANFRAGREQ, sendScanFragReq will release * the KeyInfo and AttrInfo sections when sending. */ - Uint32 instanceKey = conf->instanceKey; + Uint32 instanceKey = fragConf.instanceKey; scanFragptr.p->lqhBlockref = numberToRef(scanptr.p->m_scan_block_no, instanceKey, tnodeid); -#if 0 - if (scanptr.p->m_scan_block_no == DBSPJ) - { - scanFragptr.p->lqhBlockref = numberToRef(scanptr.p->m_scan_block_no, - instanceKey, tnodeid); - } -#endif scanFragptr.p->m_connectCount = getNodeInfo(tnodeid).m_connectCount; /* Determine whether this is the last scanFragReq @@ -11306,35 +11490,77 @@ void Dbtc::execDIH_SCAN_GET_NODES_CONF(S * WE HAVE NOW STARTED A FRAGMENT SCAN. NOW * WAIT FOR THE FIRST SCANNED RECORDS *********************************************/ -}//Dbtc::execDIGETPRIMCONF +}//Dbtc::startFragScanLab + /*************************************************** - * execDIGETPRIMREF + * execDIH_SCAN_GET_NODES_REF * * WE ARE NOW FORCED TO STOP THE SCAN. THIS ERROR * IS NOT RECOVERABLE SINCE THERE IS A PROBLEM WITH - * FINDING A PRIMARY REPLICA OF A CERTAIN FRAGMENT. + * FINDING A PRIMARY REPLICA OF SOME FRAGMENT(s). ***************************************************/ void Dbtc::execDIH_SCAN_GET_NODES_REF(Signal* signal) { jamEntry(); - // tcConnectptr.i in theData[0] is not used. 
- scanFragptr.i = signal->theData[1]; - const Uint32 errCode = signal->theData[2]; - c_scan_frag_pool.getPtr(scanFragptr); - ndbrequire(scanFragptr.p->scanFragState == ScanFragRec::WAIT_GET_PRIMCONF); + const DihScanGetNodesRef* ref = (DihScanGetNodesRef*)signal->getDataPtr(); +//const Uint32 tableId = ref->tableId; + const Uint32 fragCnt = ref->fragCnt; + const Uint32 errCode = ref->errCode; + + if (signal->getNoOfSections() > 0) + { + // Long signal: FragItems listed in first section + jam(); + SectionHandle handle(this, signal); + ndbassert(handle.m_cnt==1); + + SegmentedSectionPtr fragRefSection; + ndbrequire(handle.getSection(fragRefSection,0)); + ndbassert(fragRefSection.p->m_sz == (fragCnt*DihScanGetNodesRef::FragItem::Length)); + ndbassert(fragCnt <= DihScanGetNodesReq::MAX_DIH_FRAG_REQS); + copy((Uint32*)ref->fragItem, fragRefSection); + releaseSections(handle); + } + else // Short signal, single frag in ref->fragItem[0] + { + ndbassert(fragCnt == 1); + ndbassert(signal->getLength() + == DihScanGetNodesRef::FixedSignalLength + DihScanGetNodesRef::FragItem::Length); + } ScanRecordPtr scanptr; - scanptr.i = scanFragptr.p->scanRec; - ptrCheckGuard(scanptr, cscanrecFileSize, scanRecord); + scanptr.setNull(); + for (Uint32 i = 0; i < fragCnt; i++) { - ScanFragList run(c_scan_frag_pool, scanptr.p->m_running_scan_frags); - run.release(scanFragptr); + jam(); + scanFragptr.i = ref->fragItem[i].senderData; + jam(); jamLine(ref->fragItem[i].fragId); //OJA + c_scan_frag_pool.getPtr(scanFragptr); + + jam(); //OJA TEMP + ndbrequire(scanFragptr.p->scanFragState == ScanFragRec::WAIT_GET_PRIMCONF); + scanFragptr.p->scanFragState = ScanFragRec::COMPLETED; + scanFragptr.p->stopFragTimer(); + + jam(); //OJA TEMP + // All scanFrags should belong to the same table scan + ndbassert(scanptr.isNull() || scanptr.i==scanFragptr.p->scanRec); + if (scanptr.isNull()) + { + jam(); + scanptr.i = scanFragptr.p->scanRec; + ptrCheckGuard(scanptr, cscanrecFileSize, scanRecord); + } + { + 
ScanFragList run(c_scan_frag_pool, scanptr.p->m_running_scan_frags); + run.release(scanFragptr); + } } scanError(signal, scanptr, errCode); -}//Dbtc::execDIGETPRIMREF() +}//Dbtc::execDIH_SCAN_GET_NODES_REF() /** * Dbtc::execSCAN_FRAGREF @@ -11503,12 +11729,15 @@ void Dbtc::execSCAN_FRAGCONF(Signal* sig scanFragptr.p->scanFragId = scanptr.p->scanNextFragId++; DihScanGetNodesReq* req = (DihScanGetNodesReq*)signal->getDataPtrSend(); req->senderRef = reference(); - req->senderData = scanFragptr.i; req->tableId = scanptr.p->scanTableref; - req->fragId = scanFragptr.p->scanFragId; req->scanCookie = scanptr.p->m_scan_cookie; + req->fragCnt = 1; + req->fragItem[0].senderData = scanFragptr.i; + req->fragItem[0].fragId = scanFragptr.p->scanFragId; sendSignal(cdihblockref, GSN_DIH_SCAN_GET_NODES_REQ, signal, - DihScanGetNodesReq::SignalLength, JBB); + DihScanGetNodesReq::FixedSignalLength + + DihScanGetNodesReq::FragItem::Length, + JBB); return; } /* @@ -11719,12 +11948,15 @@ void Dbtc::execSCAN_NEXTREQ(Signal* sign DihScanGetNodesReq* req = (DihScanGetNodesReq*)signal->getDataPtrSend(); req->senderRef = reference(); - req->senderData = scanFragptr.i; req->tableId = scanptr.p->scanTableref; - req->fragId = scanFragptr.p->scanFragId; req->scanCookie = scanptr.p->m_scan_cookie; + req->fragCnt = 1; + req->fragItem[0].senderData = scanFragptr.i; + req->fragItem[0].fragId = scanFragptr.p->scanFragId; sendSignal(cdihblockref, GSN_DIH_SCAN_GET_NODES_REQ, signal, - DihScanGetNodesReq::SignalLength, JBB); + DihScanGetNodesReq::FixedSignalLength + + DihScanGetNodesReq::FragItem::Length, + JBB); } else { @@ -11788,7 +12020,7 @@ Dbtc::close_scan_req(Signal* signal, Sca switch(curr.p->scanFragState){ case ScanFragRec::IDLE: jam(); // real early abort - ndbrequire(old == ScanRecord::WAIT_AI); + ndbrequire(old == ScanRecord::WAIT_AI || old == ScanRecord::RUNNING); running.release(curr); continue; case ScanFragRec::WAIT_GET_PRIMCONF: === modified file 
'storage/ndb/src/kernel/blocks/suma/Suma.cpp' --- a/storage/ndb/src/kernel/blocks/suma/Suma.cpp revid:mikael.ronstrom@stripped +++ b/storage/ndb/src/kernel/blocks/suma/Suma.cpp revid:mikael.ronstrom@stripped @@ -2456,12 +2456,16 @@ Suma::execDIH_SCAN_TAB_CONF(Signal* sign DihScanGetNodesReq* req = (DihScanGetNodesReq*)signal->getDataPtrSend(); req->senderRef = reference(); - req->senderData = ptr.i; req->tableId = tableId; - req->fragId = 0; req->scanCookie = scanCookie; + req->fragCnt = 1; + req->fragItem[0].senderData = ptr.i; + req->fragItem[0].fragId = 0; + sendSignal(DBDIH_REF, GSN_DIH_SCAN_GET_NODES_REQ, signal, - DihScanGetNodesReq::SignalLength, JBB); + DihScanGetNodesReq::FixedSignalLength + + DihScanGetNodesReq::FragItem::Length, + JBB); DBUG_VOID_RETURN; } @@ -2470,18 +2474,26 @@ void Suma::execDIH_SCAN_GET_NODES_CONF(Signal* signal) { jamEntry(); - DBUG_ENTER("Suma::execDIGETPRIMCONF"); + DBUG_ENTER("Suma::execDIH_SCAN_GET_NODES_CONF"); + + /** + * Assume a short signal, with a single FragItem being returned + * as we do only single fragment requests in + * DIH_SCAN_GET_NODES_REQs sent from SUMA. 
+ */ ndbassert(signal->getNoOfSections() == 0); + ndbassert(signal->getLength() == + DihScanGetNodesConf::FixedSignalLength + + DihScanGetNodesConf::FragItem::Length); DihScanGetNodesConf* conf = (DihScanGetNodesConf*)signal->getDataPtr(); - const Uint32 nodeCount = conf->count; const Uint32 tableId = conf->tableId; - const Uint32 fragNo = conf->fragId; - + const Uint32 fragNo = conf->fragItem[0].fragId; + const Uint32 nodeCount = conf->fragItem[0].count; ndbrequire(nodeCount > 0 && nodeCount <= MAX_REPLICAS); Ptr ptr; - c_syncPool.getPtr(ptr, conf->senderData); + c_syncPool.getPtr(ptr, conf->fragItem[0].senderData); { LocalDataBuffer<15> fragBuf(c_dataBufferPool, ptr.p->m_fragments); @@ -2490,9 +2502,9 @@ Suma::execDIH_SCAN_GET_NODES_CONF(Signal * Add primary node for fragment to list */ FragmentDescriptor fd; - fd.m_fragDesc.m_nodeId = conf->nodes[0]; + fd.m_fragDesc.m_nodeId = conf->fragItem[0].nodes[0]; fd.m_fragDesc.m_fragmentNo = fragNo; - fd.m_fragDesc.m_lqhInstanceKey = conf->instanceKey; + fd.m_fragDesc.m_lqhInstanceKey = conf->fragItem[0].instanceKey; if (ptr.p->m_frag_id == ZNIL) { signal->theData[2] = fd.m_dummy; @@ -2506,7 +2518,7 @@ Suma::execDIH_SCAN_GET_NODES_CONF(Signal const Uint32 ownNodeId = getOwnNodeId(); Uint32 i = 0; for (i = 0; i < nodeCount; i++) - if (conf->nodes[i] == ownNodeId) + if (conf->fragItem[0].nodes[i] == ownNodeId) break; if (i == nodeCount) { @@ -2530,12 +2542,16 @@ Suma::execDIH_SCAN_GET_NODES_CONF(Signal DihScanGetNodesReq* req = (DihScanGetNodesReq*)signal->getDataPtrSend(); req->senderRef = reference(); - req->senderData = ptr.i; req->tableId = tableId; - req->fragId = nextFrag; req->scanCookie = ptr.p->m_scan_cookie; + req->fragCnt = 1; + req->fragItem[0].senderData = ptr.i; + req->fragItem[0].fragId = nextFrag; + sendSignal(DBDIH_REF, GSN_DIH_SCAN_GET_NODES_REQ, signal, - DihScanGetNodesReq::SignalLength, JBB); + DihScanGetNodesReq::FixedSignalLength + + DihScanGetNodesReq::FragItem::Length, + JBB); DBUG_VOID_RETURN; } 
=== modified file 'storage/ndb/src/ndbapi/TransporterFacade.cpp' --- a/storage/ndb/src/ndbapi/TransporterFacade.cpp revid:mikael.ronstrom@stripped +++ b/storage/ndb/src/ndbapi/TransporterFacade.cpp revid:mikael.ronstrom@stripped @@ -924,6 +924,10 @@ TransporterFacade::set_up_node_active_in Uint32 nodeId1, nodeId2, remoteNodeId; struct TFSendBuffer *b; + /* Need to also communicate with myself, not found in config */ + b = m_send_buffers + nodeId; + b->m_node_active = true; + for (iter.first(); iter.valid(); iter.next()) { if (iter.get(CFG_CONNECTION_NODE_1, &nodeId1)) continue; @@ -932,8 +936,8 @@ TransporterFacade::set_up_node_active_in remoteNodeId = (nodeId == nodeId1 ? nodeId2 : nodeId1); b = m_send_buffers + remoteNodeId; b->m_node_active = true; - } + DBUG_VOID_RETURN; } bool @@ -946,6 +950,9 @@ TransporterFacade::configure(NodeId node assert(theTransporterRegistry); assert(theClusterMgr); + /* Set up active communication with all configured nodes */ + set_up_node_active_in_send_buffers(nodeId, *conf); + // Configure transporters if (!IPCConfig::configureTransporters(nodeId, * conf, === modified file 'storage/ndb/src/ndbapi/ndberror.c' --- a/storage/ndb/src/ndbapi/ndberror.c revid:mikael.ronstrom@stripped +++ b/storage/ndb/src/ndbapi/ndberror.c revid:mikael.ronstrom@stripped @@ -201,6 +201,7 @@ ErrorBundle ErrorCodes[] = { "Out of operation records in transaction coordinator (increase MaxNoOfConcurrentOperations)" }, { 275, DMEC, TR, "Out of transaction records for complete phase (increase MaxNoOfConcurrentTransactions)" }, { 279, DMEC, TR, "Out of transaction markers in transaction coordinator" }, + { 312, DMEC, TR, "Out of LongMessageBuffer" }, { 414, DMEC, TR, "414" }, { 418, DMEC, TR, "Out of transaction buffers in LQH" }, { 419, DMEC, TR, "419" }, === modified file 'storage/ndb/test/ndbapi/testScan.cpp' --- a/storage/ndb/test/ndbapi/testScan.cpp revid:mikael.ronstrom@stripped +++ b/storage/ndb/test/ndbapi/testScan.cpp revid:mikael.ronstrom@stripped @@ 
-2188,6 +2188,24 @@ TESTCASE("ScanReadError5030", STEP(runScanReadErrorOneNode); FINALIZER(runClearTable); } +TESTCASE("ScanReadError8095", + "Scan and insert error 8095. "\ + "TC fails to send a DIH_SCAN_GET_NODES_REQ due to "\ + "'out of LongMessageBuffers' -> terminate scan."){ + INITIALIZER(runLoadTable); + TC_PROPERTY("ErrorCode", 8095); + STEP(runScanReadError); + FINALIZER(runClearTable); +} +TESTCASE("ScanReadError7234", + "Scan and insert error 7234. "\ + "DIH fails to send a DIH_SCAN_GET_NODES_CONF due to "\ + "'out of LongMessageBuffers' -> send DIH_SCAN_GET_NODES_REF."){ + INITIALIZER(runLoadTable); + TC_PROPERTY("ErrorCode", 7234); + STEP(runScanReadError); + FINALIZER(runClearTable); +} TESTCASE("ScanReadRestart", "Scan requirement:A scan should be able to start and "\ "complete during node recovery and when one or more nodes "\ === modified file 'storage/ndb/test/ndbapi/testSpj.cpp' --- a/storage/ndb/test/ndbapi/testSpj.cpp revid:mikael.ronstrom@stripped +++ b/storage/ndb/test/ndbapi/testSpj.cpp revid:mikael.ronstrom@stripped @@ -127,6 +127,8 @@ runLookupJoinError(NDBT_Context* ctx, ND 17070, 17071, 17072, // lookup_send.dupsec -> outOfSectionMem 17080, 17081, 17082, // lookup_parent_row -> OutOfQueryMemory 17120, 17121, // execTRANSID_AI -> OutOfRowMemory + 17130, // sendSignal(DIH_SCAN_GET_NODES_REQ) -> import() failed + 7234, // sendSignal(DIH_SCAN_GET_NODES_CONF) -> import() failed (DIH) 17510 // random failure when allocating seection memory }; loops = faultToInject ? 1 : sizeof(lookupFaults)/sizeof(int); No bundle (reason: useless for push emails).