3981 Ole John Aske 2012-08-24 [merge]
Merge 7.1 -> 7.2
Also added the ndb_many_fragments.test which does SPJ testing of
the fix for bugs #14143553 and #13799800 in this merge.
added:
mysql-test/suite/ndb/r/ndb_many_fragments.result
mysql-test/suite/ndb/t/ndb_many_fragments.cnf
mysql-test/suite/ndb/t/ndb_many_fragments.test
modified:
mysql-test/mysql-test-run.pl
storage/ndb/include/kernel/signaldata/DihScanTab.hpp
storage/ndb/include/kernel/signaldata/TcContinueB.hpp
storage/ndb/src/kernel/blocks/backup/Backup.cpp
storage/ndb/src/kernel/blocks/dbdih/Dbdih.hpp
storage/ndb/src/kernel/blocks/dbdih/DbdihMain.cpp
storage/ndb/src/kernel/blocks/dbspj/Dbspj.hpp
storage/ndb/src/kernel/blocks/dbspj/DbspjMain.cpp
storage/ndb/src/kernel/blocks/dbtc/Dbtc.hpp
storage/ndb/src/kernel/blocks/dbtc/DbtcMain.cpp
storage/ndb/src/kernel/blocks/suma/Suma.cpp
storage/ndb/src/ndbapi/ndberror.c
storage/ndb/test/ndbapi/testScan.cpp
storage/ndb/test/ndbapi/testSpj.cpp
3980 Ole John Aske 2012-08-24 [merge]
Null merge 7.1 -> 7.2
=== modified file 'mysql-test/mysql-test-run.pl'
--- a/mysql-test/mysql-test-run.pl 2012-08-13 09:33:22 +0000
+++ b/mysql-test/mysql-test-run.pl 2012-08-24 12:07:17 +0000
@@ -3034,10 +3034,17 @@ sub ndbd_start {
# > 5.0 { 'character-sets-dir' => \&fix_charset_dir },
my $exe= $exe_ndbd;
- if ($exe_ndbmtd and ($exe_ndbmtd_counter++ % 2) == 0)
- {
- # Use ndbmtd every other time
- $exe= $exe_ndbmtd;
+ if ($exe_ndbmtd)
+ { if ($ENV{MTR_NDBMTD})
+ {
+ # ndbmtd forced by env var MTR_NDBMTD
+ $exe= $exe_ndbmtd;
+ }
+ if (($exe_ndbmtd_counter++ % 2) == 0)
+ {
+ # Use ndbmtd every other time
+ $exe= $exe_ndbmtd;
+ }
}
my $path_ndbd_log= "$dir/ndbd.log";
=== added file 'mysql-test/suite/ndb/r/ndb_many_fragments.result'
--- a/mysql-test/suite/ndb/r/ndb_many_fragments.result 1970-01-01 00:00:00 +0000
+++ b/mysql-test/suite/ndb/r/ndb_many_fragments.result 2012-08-24 12:07:17 +0000
@@ -0,0 +1,135 @@
+create table parent(a int primary key, b int) engine=ndb;
+create table child(a int, b int, primary key(a,b)) engine=ndb;
+alter table parent partition by key(a) partitions 128;
+alter table child partition by key(a,b) partitions 128;
+insert into parent values (1,1), (2,2), (3,3), (4,4);
+insert into parent select a+4, b+4 from parent;
+insert into parent select a+8, b+8 from parent;
+insert into parent select a+16, b+16 from parent;
+insert into parent select a+32, b+32 from parent;
+insert into parent select a+64, b+64 from parent;
+insert into parent select a+128, b+128 from parent;
+insert into child select * from parent;
+analyze table parent, child;
+Table Op Msg_type Msg_text
+test.parent analyze status OK
+test.child analyze status OK
+set ndb_join_pushdown = false;
+explain select straight_join count(*) from parent
+join child as c1 on c1.a = parent.b
+join child as c2 on c2.a = parent.b
+join child as c3 on c3.a = parent.b
+join child as c4 on c4.a = parent.b
+join child as c5 on c5.a = parent.b
+join child as c6 on c6.a = parent.b
+join child as c7 on c7.a = parent.b
+join child as c8 on c8.a = parent.b
+join child as c9 on c9.a = parent.b
+join child as c10 on c10.a = parent.b
+join child as c11 on c11.a = parent.b
+join child as c12 on c12.a = parent.b
+join child as c13 on c13.a = parent.b
+join child as c14 on c14.a = parent.b
+join child as c15 on c15.a = parent.b
+join child as c16 on c16.a = parent.b
+where parent.b < 2
+;
+id select_type table type possible_keys key key_len ref rows Extra
+1 SIMPLE parent ALL NULL NULL NULL NULL 256 Using where with pushed condition
+1 SIMPLE c1 ref PRIMARY PRIMARY 4 test.parent.b 1
+1 SIMPLE c2 ref PRIMARY PRIMARY 4 test.parent.b 1
+1 SIMPLE c3 ref PRIMARY PRIMARY 4 test.parent.b 1
+1 SIMPLE c4 ref PRIMARY PRIMARY 4 test.parent.b 1
+1 SIMPLE c5 ref PRIMARY PRIMARY 4 test.parent.b 1
+1 SIMPLE c6 ref PRIMARY PRIMARY 4 test.parent.b 1
+1 SIMPLE c7 ref PRIMARY PRIMARY 4 test.parent.b 1
+1 SIMPLE c8 ref PRIMARY PRIMARY 4 test.parent.b 1
+1 SIMPLE c9 ref PRIMARY PRIMARY 4 test.c1.a 1 Using where
+1 SIMPLE c10 ref PRIMARY PRIMARY 4 test.parent.b 1
+1 SIMPLE c11 ref PRIMARY PRIMARY 4 test.c10.a 1 Using where
+1 SIMPLE c12 ref PRIMARY PRIMARY 4 test.c10.a 1 Using where
+1 SIMPLE c13 ref PRIMARY PRIMARY 4 test.c10.a 1 Using where
+1 SIMPLE c14 ref PRIMARY PRIMARY 4 test.c10.a 1 Using where
+1 SIMPLE c15 ref PRIMARY PRIMARY 4 test.c10.a 1 Using where
+1 SIMPLE c16 ref PRIMARY PRIMARY 4 test.c10.a 1 Using where
+select straight_join count(*) from parent
+join child as c1 on c1.a = parent.b
+join child as c2 on c2.a = parent.b
+join child as c3 on c3.a = parent.b
+join child as c4 on c4.a = parent.b
+join child as c5 on c5.a = parent.b
+join child as c6 on c6.a = parent.b
+join child as c7 on c7.a = parent.b
+join child as c8 on c8.a = parent.b
+join child as c9 on c9.a = parent.b
+join child as c10 on c10.a = parent.b
+join child as c11 on c11.a = parent.b
+join child as c12 on c12.a = parent.b
+join child as c13 on c13.a = parent.b
+join child as c14 on c14.a = parent.b
+join child as c15 on c15.a = parent.b
+join child as c16 on c16.a = parent.b
+where parent.b < 2
+;
+count(*)
+1
+set ndb_join_pushdown = true;
+explain select straight_join count(*) from parent
+join child as c1 on c1.a = parent.b
+join child as c2 on c2.a = parent.b
+join child as c3 on c3.a = parent.b
+join child as c4 on c4.a = parent.b
+join child as c5 on c5.a = parent.b
+join child as c6 on c6.a = parent.b
+join child as c7 on c7.a = parent.b
+join child as c8 on c8.a = parent.b
+join child as c9 on c9.a = parent.b
+join child as c10 on c10.a = parent.b
+join child as c11 on c11.a = parent.b
+join child as c12 on c12.a = parent.b
+join child as c13 on c13.a = parent.b
+join child as c14 on c14.a = parent.b
+join child as c15 on c15.a = parent.b
+join child as c16 on c16.a = parent.b
+where parent.b < 2
+;
+id select_type table type possible_keys key key_len ref rows Extra
+1 SIMPLE parent ALL NULL NULL NULL NULL 256 Parent of 17 pushed join@1; Using where with pushed condition
+1 SIMPLE c1 ref PRIMARY PRIMARY 4 test.parent.b 1 Child of 'parent' in pushed join@1
+1 SIMPLE c2 ref PRIMARY PRIMARY 4 test.parent.b 1 Child of 'c1' in pushed join@1
+1 SIMPLE c3 ref PRIMARY PRIMARY 4 test.parent.b 1 Child of 'c2' in pushed join@1
+1 SIMPLE c4 ref PRIMARY PRIMARY 4 test.parent.b 1 Child of 'c3' in pushed join@1
+1 SIMPLE c5 ref PRIMARY PRIMARY 4 test.parent.b 1 Child of 'c4' in pushed join@1
+1 SIMPLE c6 ref PRIMARY PRIMARY 4 test.parent.b 1 Child of 'c5' in pushed join@1
+1 SIMPLE c7 ref PRIMARY PRIMARY 4 test.parent.b 1 Child of 'c6' in pushed join@1
+1 SIMPLE c8 ref PRIMARY PRIMARY 4 test.parent.b 1 Child of 'c7' in pushed join@1
+1 SIMPLE c9 ref PRIMARY PRIMARY 4 test.c1.a 1 Child of 'c8' in pushed join@1; Using where
+1 SIMPLE c10 ref PRIMARY PRIMARY 4 test.parent.b 1 Child of 'c9' in pushed join@1
+1 SIMPLE c11 ref PRIMARY PRIMARY 4 test.c10.a 1 Child of 'c10' in pushed join@1; Using where
+1 SIMPLE c12 ref PRIMARY PRIMARY 4 test.c10.a 1 Child of 'c11' in pushed join@1; Using where
+1 SIMPLE c13 ref PRIMARY PRIMARY 4 test.c10.a 1 Child of 'c12' in pushed join@1; Using where
+1 SIMPLE c14 ref PRIMARY PRIMARY 4 test.c10.a 1 Child of 'c13' in pushed join@1; Using where
+1 SIMPLE c15 ref PRIMARY PRIMARY 4 test.c10.a 1 Child of 'c14' in pushed join@1; Using where
+1 SIMPLE c16 ref PRIMARY PRIMARY 4 test.c10.a 1 Child of 'c15' in pushed join@1; Using where
+select straight_join count(*) from parent
+join child as c1 on c1.a = parent.b
+join child as c2 on c2.a = parent.b
+join child as c3 on c3.a = parent.b
+join child as c4 on c4.a = parent.b
+join child as c5 on c5.a = parent.b
+join child as c6 on c6.a = parent.b
+join child as c7 on c7.a = parent.b
+join child as c8 on c8.a = parent.b
+join child as c9 on c9.a = parent.b
+join child as c10 on c10.a = parent.b
+join child as c11 on c11.a = parent.b
+join child as c12 on c12.a = parent.b
+join child as c13 on c13.a = parent.b
+join child as c14 on c14.a = parent.b
+join child as c15 on c15.a = parent.b
+join child as c16 on c16.a = parent.b
+where parent.b < 2
+;
+count(*)
+1
+drop table parent, child;
=== added file 'mysql-test/suite/ndb/t/ndb_many_fragments.cnf'
--- a/mysql-test/suite/ndb/t/ndb_many_fragments.cnf 1970-01-01 00:00:00 +0000
+++ b/mysql-test/suite/ndb/t/ndb_many_fragments.cnf 2012-08-24 12:07:17 +0000
@@ -0,0 +1,14 @@
+!include suite/ndb/my.cnf
+
+[cluster_config]
+# Config for 8 LQH blocks in order to allow total 128 partitions
+# with two datanodes.
+ThreadConfig=main={count=1},ldm={count=8},recv={count=1},io={count=1},tc={count=1},send={count=1},rep={count=1}
+NoOfReplicas=1
+NoOfFragmentLogParts=8
+LongMessageBuffer=8M
+
+[ENV]
+# Need to always use ndbmtd when we want lots of partitions
+# (Avoid mixed 'round robin' use of mt / non-mt)
+MTR_NDBMTD= 1
=== added file 'mysql-test/suite/ndb/t/ndb_many_fragments.test'
--- a/mysql-test/suite/ndb/t/ndb_many_fragments.test 1970-01-01 00:00:00 +0000
+++ b/mysql-test/suite/ndb/t/ndb_many_fragments.test 2012-08-24 12:07:17 +0000
@@ -0,0 +1,54 @@
+#
+# Tests of table partotioned across a large number
+# of fragments. Intended to test possible issues
+# with large signal fanout, breaking the '1::4 rule'
+#
+
+
+create table parent(a int primary key, b int) engine=ndb;
+create table child(a int, b int, primary key(a,b)) engine=ndb;
+
+alter table parent partition by key(a) partitions 128;
+alter table child partition by key(a,b) partitions 128;
+
+insert into parent values (1,1), (2,2), (3,3), (4,4);
+insert into parent select a+4, b+4 from parent;
+insert into parent select a+8, b+8 from parent;
+insert into parent select a+16, b+16 from parent;
+insert into parent select a+32, b+32 from parent;
+insert into parent select a+64, b+64 from parent;
+insert into parent select a+128, b+128 from parent;
+
+insert into child select * from parent;
+analyze table parent, child;
+
+let $query =
+ select straight_join count(*) from parent
+ join child as c1 on c1.a = parent.b
+ join child as c2 on c2.a = parent.b
+ join child as c3 on c3.a = parent.b
+ join child as c4 on c4.a = parent.b
+ join child as c5 on c5.a = parent.b
+ join child as c6 on c6.a = parent.b
+ join child as c7 on c7.a = parent.b
+ join child as c8 on c8.a = parent.b
+ join child as c9 on c9.a = parent.b
+ join child as c10 on c10.a = parent.b
+ join child as c11 on c11.a = parent.b
+ join child as c12 on c12.a = parent.b
+ join child as c13 on c13.a = parent.b
+ join child as c14 on c14.a = parent.b
+ join child as c15 on c15.a = parent.b
+ join child as c16 on c16.a = parent.b
+ where parent.b < 2
+;
+
+set ndb_join_pushdown = false;
+eval explain $query;
+eval $query;
+
+set ndb_join_pushdown = true;
+eval explain $query;
+eval $query;
+
+drop table parent, child;
=== modified file 'storage/ndb/include/kernel/signaldata/DihScanTab.hpp'
--- a/storage/ndb/include/kernel/signaldata/DihScanTab.hpp 2012-04-24 13:17:43 +0000
+++ b/storage/ndb/include/kernel/signaldata/DihScanTab.hpp 2012-08-24 12:00:05 +0000
@@ -50,24 +50,82 @@ struct DihScanTabConf
struct DihScanGetNodesReq
{
- STATIC_CONST( SignalLength = 5 );
+ STATIC_CONST( FixedSignalLength = 4 );
+ STATIC_CONST( MAX_DIH_FRAG_REQS = 64); // Max #FragItem in REQ/CONF
+
Uint32 tableId;
- Uint32 senderData;
Uint32 senderRef;
- Uint32 fragId;
Uint32 scanCookie;
+ Uint32 fragCnt;
+
+ struct FragItem
+ {
+ STATIC_CONST( Length = 2 );
+
+ Uint32 senderData;
+ Uint32 fragId;
+ };
+
+ /**
+ * DihScanGetNodesReq request information about specific fragments.
+ * - These are either specified in a seperate section (long request)
+ * containing multiple FragItems.
+ * - Or directly in a single fragItem[] below (short signal) if it
+ * contain only a single FragItem.
+ */
+ FragItem fragItem[1];
};
struct DihScanGetNodesConf
{
- STATIC_CONST( SignalLength = 9 );
+ STATIC_CONST( FixedSignalLength = 2 );
+ Uint32 tableId;
+ Uint32 fragCnt;
- Uint32 senderData;
- Uint32 nodes[4];
- Uint32 count;
+ struct FragItem
+ {
+ STATIC_CONST( Length = 8 );
+
+ Uint32 senderData;
+ Uint32 fragId;
+ Uint32 instanceKey;
+ Uint32 count;
+ Uint32 nodes[4];
+ };
+
+ /**
+ * DihScanGetNodesConf supply information about specific fragments.
+ * - These are either specified in a seperate section (long request)
+ * containing multiple FragItems.
+ * - Or directly in a single fragItem[] below (short signal) if it
+ * contain only a single FragItem.
+ * Type of long/short Conf-reply will always be the same as the REQuest
+ */
+ FragItem fragItem[1];
+};
+
+struct DihScanGetNodesRef
+{
+ STATIC_CONST( FixedSignalLength = 3 );
Uint32 tableId;
- Uint32 fragId;
- Uint32 instanceKey;
+ Uint32 fragCnt;
+ Uint32 errCode;
+
+ /**
+ * DihScanGetNodesRef signals failure of a DihScanGetNodesReq.
+ * As this is likely due to a sectioned memory alloc failure,
+ * we avoid further alloc problems by returning the same FragItem[]
+ * list as in the DihScanGetNodesReq.
+ *
+ * Depending on 'fragCnt', the fragItem[] is either:
+ * - These are either specified in a seperate section (long request)
+ * containing multiple FragItems.
+ * - Or directly in a single fragItem[] below (short signal) if it
+ * contain only a single FragItem.
+ */
+ typedef DihScanGetNodesReq::FragItem FragItem; // Reused, see above
+
+ FragItem fragItem[1];
};
/**
=== modified file 'storage/ndb/include/kernel/signaldata/TcContinueB.hpp'
--- a/storage/ndb/include/kernel/signaldata/TcContinueB.hpp 2011-07-05 12:46:07 +0000
+++ b/storage/ndb/include/kernel/signaldata/TcContinueB.hpp 2012-08-24 12:07:17 +0000
@@ -44,10 +44,10 @@ private:
ZWAIT_ABORT_ALL = 14,
ZCHECK_SCAN_ACTIVE_FAILED_LQH = 15,
TRIGGER_PENDING = 17,
-
- DelayTCKEYCONF = 18,
- ZNF_CHECK_TRANSACTIONS = 19,
- ZSEND_FIRE_TRIG_REQ = 20
+ DelayTCKEYCONF = 18,
+ ZNF_CHECK_TRANSACTIONS = 19,
+ ZSEND_FIRE_TRIG_REQ = 20,
+ ZSTART_FRAG_SCANS = 21
};
};
=== modified file 'storage/ndb/src/kernel/blocks/backup/Backup.cpp'
--- a/storage/ndb/src/kernel/blocks/backup/Backup.cpp 2012-05-21 23:05:17 +0000
+++ b/storage/ndb/src/kernel/blocks/backup/Backup.cpp 2012-08-24 12:07:17 +0000
@@ -3885,15 +3885,18 @@ Backup::getFragmentInfo(Signal* signal,
tabPtr.p->fragments.getPtr(fragPtr, fragNo);
if(fragPtr.p->scanned == 0 && fragPtr.p->scanning == 0) {
- jam();
+ jam();
DihScanGetNodesReq* req = (DihScanGetNodesReq*)signal->getDataPtrSend();
req->senderRef = reference();
- req->senderData = ptr.i;
req->tableId = tabPtr.p->tableId;
- req->fragId = fragNo;
req->scanCookie = tabPtr.p->m_scan_cookie;
- sendSignal(DBDIH_REF, GSN_DIH_SCAN_GET_NODES_REQ, signal,
- DihScanGetNodesReq::SignalLength, JBB);
+ req->fragCnt = 1;
+ req->fragItem[0].senderData = ptr.i;
+ req->fragItem[0].fragId = fragNo;
+ sendSignal(DBDIH_REF, GSN_DIH_SCAN_GET_NODES_REQ, signal,
+ DihScanGetNodesReq::FixedSignalLength
+ + DihScanGetNodesReq::FragItem::Length,
+ JBB);
return;
}//if
}//for
@@ -3916,12 +3919,21 @@ Backup::execDIH_SCAN_GET_NODES_CONF(Sign
{
jamEntry();
+ /**
+ * Assume only short CONFs with a single FragItem as we only do single
+ * fragment requests in DIH_SCAN_GET_NODES_REQ from Backup::getFragmentInfo.
+ */
+ ndbrequire(signal->getNoOfSections() == 0);
+ ndbassert(signal->getLength() ==
+ DihScanGetNodesConf::FixedSignalLength
+ + DihScanGetNodesConf::FragItem::Length);
+
DihScanGetNodesConf* conf = (DihScanGetNodesConf*)signal->getDataPtrSend();
- const Uint32 senderData = conf->senderData;
- const Uint32 nodeCount = conf->count;
const Uint32 tableId = conf->tableId;
- const Uint32 fragNo = conf->fragId;
- const Uint32 instanceKey = conf->instanceKey;
+ const Uint32 senderData = conf->fragItem[0].senderData;
+ const Uint32 nodeCount = conf->fragItem[0].count;
+ const Uint32 fragNo = conf->fragItem[0].fragId;
+ const Uint32 instanceKey = conf->fragItem[0].instanceKey;
ndbrequire(nodeCount > 0 && nodeCount <= MAX_REPLICAS);
@@ -3935,7 +3947,7 @@ Backup::execDIH_SCAN_GET_NODES_CONF(Sign
tabPtr.p->fragments.getPtr(fragPtr, fragNo);
fragPtr.p->lqhInstanceKey = instanceKey;
- fragPtr.p->node = conf->nodes[0];
+ fragPtr.p->node = conf->fragItem[0].nodes[0];
getFragmentInfo(signal, ptr, tabPtr, fragNo + 1);
}
=== modified file 'storage/ndb/src/kernel/blocks/dbdih/Dbdih.hpp'
--- a/storage/ndb/src/kernel/blocks/dbdih/Dbdih.hpp 2012-06-21 15:24:52 +0000
+++ b/storage/ndb/src/kernel/blocks/dbdih/Dbdih.hpp 2012-08-24 12:07:17 +0000
@@ -72,6 +72,11 @@
#define ZREPLERROR2 307
// --------------------------------------
+// Other DIH error codes
+// --------------------------------------
+#define ZLONG_MESSAGE_ERROR 312
+
+// --------------------------------------
// Crash Codes
// --------------------------------------
#define ZCOULD_NOT_OCCUR_ERROR 300
=== modified file 'storage/ndb/src/kernel/blocks/dbdih/DbdihMain.cpp'
--- a/storage/ndb/src/kernel/blocks/dbdih/DbdihMain.cpp 2012-06-21 15:24:52 +0000
+++ b/storage/ndb/src/kernel/blocks/dbdih/DbdihMain.cpp 2012-08-24 12:07:17 +0000
@@ -689,7 +689,6 @@ void Dbdih::execCONTINUEB(Signal* signal
{
jam();
TabRecordPtr tabPtr;
- jam();
tabPtr.i = signal->theData[1];
ptrCheckGuard(tabPtr, ctabFileSize, tabRecord);
getTabInfo_send(signal, tabPtr);
@@ -9524,43 +9523,133 @@ error:
DihScanTabRef::SignalLength, JBB);
return;
-}//Dbdih::execDI_FCOUNTREQ()
+}//Dbdih::execDIH_SCAN_TAB_REQ()
void Dbdih::execDIH_SCAN_GET_NODES_REQ(Signal* signal)
{
- FragmentstorePtr fragPtr;
- TabRecordPtr tabPtr;
jamEntry();
+
DihScanGetNodesReq* req = (DihScanGetNodesReq*)signal->getDataPtrSend();
- Uint32 senderRef = req->senderRef;
- Uint32 senderData = req->senderData;
- Uint32 fragId = req->fragId;
+ const Uint32 tableId = req->tableId;
+ const Uint32 senderRef = req->senderRef;
+ const Uint32 fragCnt = req->fragCnt;
- tabPtr.i = req->tableId;
+ SectionHandle reqHandle(this, signal);
+ const bool useLongSignal = (reqHandle.m_cnt > 0);
+
+ DihScanGetNodesReq::FragItem fragReq[DihScanGetNodesReq::MAX_DIH_FRAG_REQS];
+ if (useLongSignal)
+ {
+ // Long signal: Fetch into fragReq[]
+ jam();
+ SegmentedSectionPtr fragReqSection;
+ ndbrequire(reqHandle.getSection(fragReqSection,0));
+ ndbassert(fragReqSection.p->m_sz == (fragCnt*DihScanGetNodesReq::FragItem::Length));
+ ndbassert(fragCnt <= DihScanGetNodesReq::MAX_DIH_FRAG_REQS);
+ copy((Uint32*)fragReq, fragReqSection);
+ }
+ else // Short signal, with single FragItem
+ {
+ jam();
+ ndbassert(fragCnt == 1);
+ ndbassert(signal->getLength()
+ == DihScanGetNodesReq::FixedSignalLength + DihScanGetNodesReq::FragItem::Length);
+ memcpy(fragReq, req->fragItem, 4 * DihScanGetNodesReq::FragItem::Length);
+ }
+
+ TabRecordPtr tabPtr;
+ tabPtr.i = tableId;
ptrCheckGuard(tabPtr, ctabFileSize, tabRecord);
if (DictTabInfo::isOrderedIndex(tabPtr.p->tableType)) {
jam();
tabPtr.i = tabPtr.p->primaryTableId;
ptrCheckGuard(tabPtr, ctabFileSize, tabRecord);
}
-
- Uint32 nodes[MAX_REPLICAS];
- getFragstore(tabPtr.p, fragId, fragPtr);
- Uint32 count = extractNodeInfo(jamBuffer(), fragPtr.p, nodes);
DihScanGetNodesConf* conf = (DihScanGetNodesConf*)signal->getDataPtrSend();
- conf->senderData = senderData;
- conf->nodes[0] = nodes[0];
- conf->nodes[1] = nodes[1];
- conf->nodes[2] = nodes[2];
- conf->nodes[3] = nodes[3];
- conf->count = count;
- conf->tableId = tabPtr.i;
- conf->fragId = fragId;
- conf->instanceKey = dihGetInstanceKey(fragPtr);
- sendSignal(senderRef, GSN_DIH_SCAN_GET_NODES_CONF, signal,
- DihScanGetNodesConf::SignalLength, JBB);
-}//Dbdih::execDIGETPRIMREQ()
+ conf->tableId = tableId;
+ conf->fragCnt = fragCnt;
+
+ for (Uint32 i=0; i < fragCnt; i++)
+ {
+ jam();
+ FragmentstorePtr fragPtr;
+ Uint32 nodes[MAX_REPLICAS];
+
+ getFragstore(tabPtr.p, fragReq[i].fragId, fragPtr);
+ Uint32 count = extractNodeInfo(jamBuffer(), fragPtr.p, nodes);
+
+ conf->fragItem[i].senderData = fragReq[i].senderData;
+ conf->fragItem[i].fragId = fragReq[i].fragId;
+ conf->fragItem[i].instanceKey = dihGetInstanceKey(fragPtr);
+ conf->fragItem[i].count = count;
+ conf->fragItem[i].nodes[0] = nodes[0];
+ conf->fragItem[i].nodes[1] = nodes[1];
+ conf->fragItem[i].nodes[2] = nodes[2];
+ conf->fragItem[i].nodes[3] = nodes[3];
+ }
+
+ if (useLongSignal)
+ {
+ jam();
+ Ptr<SectionSegment> fragConf;
+ const Uint32 len = fragCnt*DihScanGetNodesConf::FragItem::Length;
+
+ if (ERROR_INSERTED_CLEAR(7234) ||
+ unlikely(!import(fragConf, (Uint32*)conf->fragItem, len)))
+ {
+ jam();
+ DihScanGetNodesRef* ref = (DihScanGetNodesRef*)signal->getDataPtrSend();
+
+ ref->tableId = tableId;
+ ref->fragCnt = fragCnt;
+ ref->errCode = ZLONG_MESSAGE_ERROR;
+
+ /**
+ * NOTE: DihScanGetNodesRef return the same FragItem list
+ * received as part of the REQuest to avoid possible
+ * malloc failure handling in the REF.
+ */
+ sendSignal(senderRef, GSN_DIH_SCAN_GET_NODES_REF, signal,
+ DihScanGetNodesRef::FixedSignalLength,
+ JBB, &reqHandle);
+ return;
+ }
+ releaseSections(reqHandle);
+
+ SectionHandle confHandle(this, fragConf.i);
+ sendSignal(senderRef, GSN_DIH_SCAN_GET_NODES_CONF, signal,
+ DihScanGetNodesConf::FixedSignalLength,
+ JBB, &confHandle);
+ }
+ else
+ {
+ // A short signal is sufficient.
+ jam();
+ ndbassert(fragCnt == 1);
+
+ if (ERROR_INSERTED_CLEAR(7234))
+ {
+ jam();
+ DihScanGetNodesRef* ref = (DihScanGetNodesRef*)signal->getDataPtrSend();
+
+ ref->tableId = tableId;
+ ref->fragCnt = fragCnt;
+ ref->errCode = ZLONG_MESSAGE_ERROR;
+ ref->fragItem[0] = fragReq[0];
+
+ sendSignal(senderRef, GSN_DIH_SCAN_GET_NODES_REF, signal,
+ DihScanGetNodesRef::FixedSignalLength
+ + DihScanGetNodesRef::FragItem::Length,
+ JBB);
+ return;
+ }
+ sendSignal(senderRef, GSN_DIH_SCAN_GET_NODES_CONF, signal,
+ DihScanGetNodesConf::FixedSignalLength
+ + DihScanGetNodesConf::FragItem::Length,
+ JBB);
+ }
+}//Dbdih::execDIH_SCAN_GET_NODES_REQ
void
Dbdih::execDIH_SCAN_TAB_COMPLETE_REP(Signal* signal)
=== modified file 'storage/ndb/src/kernel/blocks/dbspj/Dbspj.hpp'
--- a/storage/ndb/src/kernel/blocks/dbspj/Dbspj.hpp 2012-08-24 11:53:18 +0000
+++ b/storage/ndb/src/kernel/blocks/dbspj/Dbspj.hpp 2012-08-24 12:07:17 +0000
@@ -1253,6 +1253,10 @@ private:
void scanIndex_release_rangekeys(Ptr<Request>, Ptr<TreeNode>);
+ Uint32 scanindex_sendDihGetNodesReq(Signal* signal,
+ Ptr<Request> requestPtr,
+ Ptr<TreeNode> treeNodePtr);
+
/**
* Page manager
*/
=== modified file 'storage/ndb/src/kernel/blocks/dbspj/DbspjMain.cpp'
--- a/storage/ndb/src/kernel/blocks/dbspj/DbspjMain.cpp 2012-08-24 11:53:18 +0000
+++ b/storage/ndb/src/kernel/blocks/dbspj/DbspjMain.cpp 2012-08-24 12:07:17 +0000
@@ -5056,25 +5056,14 @@ Dbspj::execDIH_SCAN_TAB_CONF(Signal* sig
if (!pruned)
{
+ /** Start requesting node info from DIH */
jam();
- Uint32 tableId = ((ScanFragReq*)data.m_scanFragReq)->tableId;
- DihScanGetNodesReq * req = (DihScanGetNodesReq*)signal->getDataPtrSend();
- req->senderRef = reference();
- req->tableId = tableId;
- req->scanCookie = cookie;
-
- Uint32 cnt = 0;
- Local_ScanFragHandle_list list(m_scanfraghandle_pool, data.m_fragments);
- for (list.first(fragPtr); !fragPtr.isNull(); list.next(fragPtr))
+ err = scanindex_sendDihGetNodesReq(signal, requestPtr, treeNodePtr);
+ if (unlikely(err != 0))
{
jam();
- req->senderData = fragPtr.i;
- req->fragId = fragPtr.p->m_fragId;
- sendSignal(DBDIH_REF, GSN_DIH_SCAN_GET_NODES_REQ, signal,
- DihScanGetNodesReq::SignalLength, JBB);
- cnt++;
+ break;
}
- data.m_frags_outstanding = cnt;
requestPtr.p->m_outstanding++;
}
else
@@ -5097,46 +5086,206 @@ error:
abort(signal, requestPtr, err);
}
+/**
+ * Will check the fragment list for fragments which need to
+ * get node info to construct 'fragPtr.p->m_ref' from DIH.
+ *
+ * In order to avoid CPU starvation, or unmanagable huge FragItem[],
+ * max MAX_DIH_FRAG_REQS are requested in a single signal.
+ * If there are more fragments, we have to repeatable call this
+ * function when CONF for the first fragment set is received.
+ */
+Uint32
+Dbspj::scanindex_sendDihGetNodesReq(Signal* signal,
+ Ptr<Request> requestPtr,
+ Ptr<TreeNode> treeNodePtr)
+{
+ jam();
+ ScanIndexData& data = treeNodePtr.p->m_scanindex_data;
+ Ptr<ScanFragHandle> fragPtr;
+ Local_ScanFragHandle_list list(m_scanfraghandle_pool, data.m_fragments);
+
+ DihScanGetNodesReq * req = (DihScanGetNodesReq*)signal->getDataPtrSend();
+ Uint32 fragCnt = 0;
+ for (list.first(fragPtr);
+ !fragPtr.isNull() && fragCnt < DihScanGetNodesReq::MAX_DIH_FRAG_REQS;
+ list.next(fragPtr))
+ {
+ jam();
+ if (fragPtr.p->m_ref == 0) // Need GSN_DIH_SCAN_GET_NODES_REQ
+ {
+ jam();
+ req->fragItem[fragCnt].senderData = fragPtr.i;
+ req->fragItem[fragCnt].fragId = fragPtr.p->m_fragId;
+ fragCnt++;
+ }
+ }
+
+ if (fragCnt > 0)
+ {
+ jam();
+ Uint32 tableId = ((ScanFragReq*)data.m_scanFragReq)->tableId;
+ req->senderRef = reference();
+ req->tableId = tableId;
+ req->scanCookie = data.m_scanCookie;
+ req->fragCnt = fragCnt;
+
+ /** Always send as a long signal, even if a short would
+ * have been sufficient in the (rare) case of 'fragCnt==1'
+ */
+ Ptr<SectionSegment> fragReq;
+ Uint32 len = fragCnt*DihScanGetNodesReq::FragItem::Length;
+ if (ERROR_INSERTED_CLEAR(17130) ||
+ unlikely(!import(fragReq, (Uint32*)req->fragItem, len)))
+ {
+ jam();
+ return DbspjErr::OutOfSectionMemory;
+ }
+
+ SectionHandle handle(this, fragReq.i);
+ sendSignal(DBDIH_REF, GSN_DIH_SCAN_GET_NODES_REQ, signal,
+ DihScanGetNodesReq::FixedSignalLength,
+ JBB, &handle);
+
+ data.m_frags_outstanding += fragCnt;
+ }
+ return 0;
+} //Dbspj::scanindex_sendDihGetNodesReq
+
void
Dbspj::execDIH_SCAN_GET_NODES_REF(Signal* signal)
{
jamEntry();
- ndbrequire(false);
-}
+ const DihScanGetNodesRef* ref = (DihScanGetNodesRef*)signal->getDataPtr();
+//const Uint32 tableId = ref->tableId;
+ const Uint32 fragCnt = ref->fragCnt;
+ const Uint32 errCode = ref->errCode;
+ ndbassert(errCode != 0);
+
+ if (signal->getNoOfSections() > 0)
+ {
+ // Long signal: FragItems listed in first section
+ jam();
+ SectionHandle handle(this, signal);
+ ndbassert(handle.m_cnt==1);
+ SegmentedSectionPtr fragRefSection;
+ ndbrequire(handle.getSection(fragRefSection,0));
+ ndbassert(fragRefSection.p->m_sz == (fragCnt*DihScanGetNodesRef::FragItem::Length));
+ ndbassert(fragCnt <= DihScanGetNodesReq::MAX_DIH_FRAG_REQS);
+ copy((Uint32*)ref->fragItem, fragRefSection);
+ releaseSections(handle);
+ }
+ else // Short signal, single frag in ref->fragItem[0]
+ {
+ ndbassert(fragCnt == 1);
+ ndbassert(signal->getLength()
+ == DihScanGetNodesRef::FixedSignalLength + DihScanGetNodesRef::FragItem::Length);
+ }
+
+ UintR treeNodePtrI = RNIL;
+ for (Uint32 i=0; i < fragCnt; i++)
+ {
+ jam();
+ const Uint32 senderData = ref->fragItem[i].senderData;
+
+ Ptr<ScanFragHandle> fragPtr;
+ m_scanfraghandle_pool.getPtr(fragPtr, senderData);
+
+ // All fragItem[] should be for same TreeNode
+ ndbassert (treeNodePtrI == RNIL || treeNodePtrI == fragPtr.p->m_treeNodePtrI);
+ treeNodePtrI = fragPtr.p->m_treeNodePtrI;
+ } //for
+
+ ndbassert(treeNodePtrI != RNIL); // fragCnt > 0 above
+ Ptr<TreeNode> treeNodePtr;
+ m_treenode_pool.getPtr(treeNodePtr, treeNodePtrI);
+
+ ScanIndexData& data = treeNodePtr.p->m_scanindex_data;
+ ndbassert(data.m_frags_outstanding == fragCnt);
+ data.m_frags_outstanding -= fragCnt;
+
+ Ptr<Request> requestPtr;
+ m_request_pool.getPtr(requestPtr, treeNodePtr.p->m_requestPtrI);
+ abort(signal, requestPtr, errCode);
+
+ if (data.m_frags_outstanding == 0)
+ {
+ jam();
+ treeNodePtr.p->m_state = TreeNode::TN_INACTIVE;
+ checkPrepareComplete(signal, requestPtr, 1);
+ }
+}//Dbspj::execDIH_SCAN_GET_NODES_REF
void
Dbspj::execDIH_SCAN_GET_NODES_CONF(Signal* signal)
{
jamEntry();
+ const DihScanGetNodesConf * conf = (DihScanGetNodesConf*)signal->getDataPtr();
+ const Uint32 fragCnt = conf->fragCnt;
- DihScanGetNodesConf * conf = (DihScanGetNodesConf*)signal->getDataPtr();
+ if (signal->getNoOfSections() > 0)
+ {
+ // Unpack long signal
+ jam();
+ SectionHandle handle(this, signal);
+ SegmentedSectionPtr fragConfSection;
+ ndbrequire(handle.getSection(fragConfSection,0));
+ ndbassert(fragConfSection.p->m_sz == (fragCnt*DihScanGetNodesConf::FragItem::Length));
+ copy((Uint32*)conf->fragItem, fragConfSection);
+ releaseSections(handle);
+ }
+ else // Short signal, with single FragItem
+ {
+ jam();
+ ndbassert(fragCnt == 1);
+ ndbassert(signal->getLength()
+ == DihScanGetNodesConf::FixedSignalLength + DihScanGetNodesConf::FragItem::Length);
+ }
- Uint32 senderData = conf->senderData;
- Uint32 node = conf->nodes[0];
- Uint32 instanceKey = conf->instanceKey;
+ UintR treeNodePtrI = RNIL;
+ for (Uint32 i=0; i < fragCnt; i++)
+ {
+ jam();
+ const Uint32 senderData = conf->fragItem[i].senderData;
+ const Uint32 node = conf->fragItem[i].nodes[0];
+ const Uint32 instanceKey = conf->fragItem[i].instanceKey;
- Ptr<ScanFragHandle> fragPtr;
- m_scanfraghandle_pool.getPtr(fragPtr, senderData);
+ Ptr<ScanFragHandle> fragPtr;
+ m_scanfraghandle_pool.getPtr(fragPtr, senderData);
+
+ // All fragItem[] should be for same TreeNode
+ ndbassert (treeNodePtrI == RNIL || treeNodePtrI == fragPtr.p->m_treeNodePtrI);
+ treeNodePtrI = fragPtr.p->m_treeNodePtrI;
+
+ fragPtr.p->m_ref = numberToRef(DBLQH, instanceKey, node);
+ } //for
+
+ ndbassert(treeNodePtrI != RNIL); // fragCnt > 0 above
Ptr<TreeNode> treeNodePtr;
- m_treenode_pool.getPtr(treeNodePtr, fragPtr.p->m_treeNodePtrI);
- ndbrequire(treeNodePtr.p->m_info == &g_ScanIndexOpInfo);
+ m_treenode_pool.getPtr(treeNodePtr, treeNodePtrI);
+
ScanIndexData& data = treeNodePtr.p->m_scanindex_data;
- ndbrequire(data.m_frags_outstanding > 0);
- data.m_frags_outstanding--;
+ ndbassert(data.m_frags_outstanding == fragCnt);
+ data.m_frags_outstanding -= fragCnt;
- fragPtr.p->m_ref = numberToRef(DBLQH, instanceKey, node);
+ Ptr<Request> requestPtr;
+ m_request_pool.getPtr(requestPtr, treeNodePtr.p->m_requestPtrI);
- if (data.m_frags_outstanding == 0)
+ /** Check if we need to send more GSN_DIH_SCAN_GET_NODES_REQ */
+ Uint32 err = scanindex_sendDihGetNodesReq(signal, requestPtr, treeNodePtr);
+ if (unlikely(err != 0))
{
jam();
+ abort(signal, requestPtr, err);
+ }
+ if (data.m_frags_outstanding == 0)
+ {
+ jam();
treeNodePtr.p->m_state = TreeNode::TN_INACTIVE;
-
- Ptr<Request> requestPtr;
- m_request_pool.getPtr(requestPtr, treeNodePtr.p->m_requestPtrI);
checkPrepareComplete(signal, requestPtr, 1);
}
-}
+}//Dbspj::execDIH_SCAN_GET_NODES_CONF
Uint32
Dbspj::scanIndex_findFrag(Local_ScanFragHandle_list & list,
=== modified file 'storage/ndb/src/kernel/blocks/dbtc/Dbtc.hpp'
--- a/storage/ndb/src/kernel/blocks/dbtc/Dbtc.hpp 2012-04-24 14:41:37 +0000
+++ b/storage/ndb/src/kernel/blocks/dbtc/Dbtc.hpp 2012-08-24 12:07:17 +0000
@@ -29,6 +29,7 @@
#include <DataBuffer.hpp>
#include <Bitmask.hpp>
#include <AttributeList.hpp>
+#include <signaldata/DihScanTab.hpp>
#include <signaldata/AttrInfo.hpp>
#include <signaldata/LqhTransConf.hpp>
#include <signaldata/LqhKey.hpp>
@@ -1496,6 +1497,12 @@ private:
void initScanfragrec(Signal* signal);
void releaseScanResources(Signal*, ScanRecordPtr, bool not_started = false);
ScanRecordPtr seizeScanrec(Signal* signal);
+ void startFragScansLab(Signal*, Uint32 tableId,
+ SectionHandle&, Uint32 secOffs);
+ void startFragScanLab(Signal*, Uint32 tableId,
+ const DihScanGetNodesConf::FragItem& fragConf);
+
+ void sendDihGetNodesReq(Signal*, ScanRecordPtr);
void sendScanFragReq(Signal*, ScanRecord*, ScanFragRec*, bool);
void sendScanTabConf(Signal* signal, ScanRecordPtr);
void close_scan_req(Signal*, ScanRecordPtr, bool received_req);
=== modified file 'storage/ndb/src/kernel/blocks/dbtc/DbtcMain.cpp'
--- a/storage/ndb/src/kernel/blocks/dbtc/DbtcMain.cpp 2012-06-25 11:35:54 +0000
+++ b/storage/ndb/src/kernel/blocks/dbtc/DbtcMain.cpp 2012-08-24 12:07:17 +0000
@@ -354,6 +354,13 @@ void Dbtc::execCONTINUEB(Signal* signal)
}
sendFireTrigReq(signal, apiConnectptr, signal->theData[4]);
return;
+ case TcContinueB::ZSTART_FRAG_SCANS:
+ {
+ jam();
+ SectionHandle handle(this, signal);
+ startFragScansLab(signal, Tdata0, handle, Tdata1);
+ return;
+ }
default:
ndbrequire(false);
}//switch
@@ -10927,7 +10934,7 @@ void Dbtc::diFcountReqLab(Signal* signal
}//Dbtc::diFcountReqLab()
/********************************************************************
- * execDI_FCOUNTCONF
+ * execDIH_SCAN_TAB_CONF
*
* WE HAVE ASKED DIH ABOUT THE NUMBER OF FRAGMENTS IN THIS TABLE.
* WE WILL NOW START A NUMBER OF PARALLEL SCAN PROCESSES. EACH OF
@@ -11030,50 +11037,145 @@ void Dbtc::execDIH_SCAN_TAB_CONF(Signal*
setApiConTimer(apiConnectptr.i, 0, __LINE__);
updateBuddyTimer(apiConnectptr);
+ /** Need own local scope of list(...,m_running_scan_frags) */
+ {
+ ScanFragRecPtr ptr;
+ ScanFragList list(c_scan_frag_pool, scanptr.p->m_running_scan_frags);
+
+ /**
+ * Initially list(...,m_running_scan_frags) contains an 'IDLE' entry
+ * for all fragments. Now assign fragId to those 'tfragCount' fragments
+ * to execute in parallel.
+ */
+ for (list.first(ptr); !ptr.isNull() && tfragCount;
+ list.next(ptr), tfragCount--){
+ jam();
+
+ ndbassert(ptr.p->scanFragState == ScanFragRec::IDLE);
+ ptr.p->lqhBlockref = 0;
+ ptr.p->scanFragId = scanptr.p->scanNextFragId++;
+ }//for
+
+ /**
+ * Any remaining fragments, not allowed to execute in parallel, are
+ * put into the 'queued-list' until they can be executed.
+ */
+ ScanFragList queued(c_scan_frag_pool, scanptr.p->m_queued_scan_frags);
+ for (; !ptr.isNull();)
+ {
+ jam();
+ ptr.p->m_ops = 0;
+ ptr.p->m_totalLen = 0;
+ ptr.p->m_scan_frag_conf_status = 1;
+ ptr.p->scanFragState = ScanFragRec::QUEUED_FOR_DELIVERY;
+ ptr.p->stopFragTimer();
+
+ ScanFragRecPtr tmp;
+ tmp.i = ptr.i;
+ tmp.p = ptr.p;
+ list.next(ptr);
+ list.remove(tmp);
+ queued.add(tmp);
+ scanptr.p->m_queued_count++;
+ }
+ }
+
+ /**
+ * Start by requesting fragment info from DIH.
+ * Max MAX_DIH_FRAG_REQS fragments can be requested at once.
+ * If needed we will send more requests after CONF is received.
+ */
+ jam();
+ sendDihGetNodesReq(signal, scanptr);
+
+}//Dbtc::execDIH_SCAN_TAB_CONF()
+
+/********************************************************************
+ * sendDihGetNodesReq
+ *
+ * Will check the 'm_running_scan_frags' list for fragments which
+ * are still 'IDLE'. These should be started by requesting
+ * node info in a DIH_SCAN_GET_NODES_REQ.
+ *
+ * In order to avoid CPU starvation, or unmanagable huge FragItem[],
+ * max MAX_DIH_FRAG_REQS are requested in a single signal.
+ * If there are more fragments, we have to repeatable call this
+ * function when CONF for the first fragment set is received.
+ ********************************************************************/
+void Dbtc::sendDihGetNodesReq(Signal* signal, ScanRecordPtr scanptr)
+{
+ jam();
ScanFragRecPtr ptr;
- ScanFragList list(c_scan_frag_pool, scanptr.p->m_running_scan_frags);
- for (list.first(ptr); !ptr.isNull() && tfragCount;
- list.next(ptr), tfragCount--){
- jam();
+ DihScanGetNodesReq* req = (DihScanGetNodesReq*)signal->getDataPtrSend();
+ Uint32 fragCnt = 0;
- ptr.p->lqhBlockref = 0;
- ptr.p->startFragTimer(ctcTimer);
- ptr.p->scanFragId = scanptr.p->scanNextFragId++;
- ptr.p->scanFragState = ScanFragRec::WAIT_GET_PRIMCONF;
- ptr.p->startFragTimer(ctcTimer);
+ { // running-list scope
+ ScanFragList list(c_scan_frag_pool, scanptr.p->m_running_scan_frags);
+ for (list.first(ptr);
+ !ptr.isNull() && fragCnt < DihScanGetNodesReq::MAX_DIH_FRAG_REQS;
+ list.next(ptr))
+ {
+ jam();
+ if (ptr.p->scanFragState == ScanFragRec::IDLE) // Start it NOW!.
+ {
+ jam();
+ ptr.p->scanFragState = ScanFragRec::WAIT_GET_PRIMCONF;
+ ptr.p->startFragTimer(ctcTimer);
+ req->fragItem[fragCnt].senderData = ptr.i;
+ req->fragItem[fragCnt].fragId = ptr.p->scanFragId;
- DihScanGetNodesReq* req = (DihScanGetNodesReq*)signal->getDataPtrSend();
+ fragCnt++;
+ } // if IDLE
+ }
+ } // running-list scope
+
+ if (fragCnt > 0)
+ {
+ jam();
req->senderRef = reference();
- req->senderData = ptr.i;
req->tableId = scanptr.p->scanTableref;
- req->fragId = ptr.p->scanFragId;
req->scanCookie = scanptr.p->m_scan_cookie;
- sendSignal(cdihblockref, GSN_DIH_SCAN_GET_NODES_REQ, signal,
- DihScanGetNodesReq::SignalLength, JBB);
- }//for
+ req->fragCnt = fragCnt;
- ScanFragList queued(c_scan_frag_pool, scanptr.p->m_queued_scan_frags);
- for (; !ptr.isNull();)
- {
- ptr.p->m_ops = 0;
- ptr.p->m_totalLen = 0;
- ptr.p->m_scan_frag_conf_status = 1;
- ptr.p->scanFragState = ScanFragRec::QUEUED_FOR_DELIVERY;
- ptr.p->stopFragTimer();
-
- ScanFragRecPtr tmp;
- tmp.i = ptr.i;
- tmp.p = ptr.p;
- list.next(ptr);
- list.remove(tmp);
- queued.add(tmp);
- scanptr.p->m_queued_count++;
+ /** Always send as a long signal, even if a short would
+ * have been sufficient in the (rare) case of 'fragCnt==1'
+ */
+ Ptr<SectionSegment> fragReq;
+ const Uint32 len = fragCnt*DihScanGetNodesReq::FragItem::Length;
+
+ if (ERROR_INSERTED_CLEAR(8095) || // Fail once
+ unlikely(!import(fragReq, (Uint32*)req->fragItem, len)))
+ {
+ jam();
+
+ /** Handling of failed REQ is similar to :execDIH_SCAN_GET_NODES_REF */
+ for (Uint32 i = 0; i < fragCnt; i++)
+ {
+ jam();
+ ptr.i = req->fragItem[i].senderData;
+ c_scan_frag_pool.getPtr(ptr);
+
+ ndbrequire(ptr.p->scanFragState == ScanFragRec::WAIT_GET_PRIMCONF);
+ ptr.p->scanFragState = ScanFragRec::COMPLETED;
+ ptr.p->stopFragTimer();
+ {
+ ScanFragList run(c_scan_frag_pool, scanptr.p->m_running_scan_frags);
+ run.release(ptr);
+ }
+ }
+ scanError(signal, scanptr, ZGET_DATAREC_ERROR);
+ return;
+ }
+ SectionHandle handle(this, fragReq.i);
+ sendSignal(cdihblockref, GSN_DIH_SCAN_GET_NODES_REQ, signal,
+ DihScanGetNodesReq::FixedSignalLength,
+ JBB, &handle);
}
-}//Dbtc::execDI_FCOUNTCONF()
+}//Dbtc::sendDihGetNodesReq
/******************************************************
- * execDI_FCOUNTREF
+ * execDIH_SCAN_TAB_REF
******************************************************/
void Dbtc::execDIH_SCAN_TAB_REF(Signal* signal)
{
@@ -11095,7 +11197,7 @@ void Dbtc::execDIH_SCAN_TAB_REF(Signal*
return;
}//if
abortScanLab(signal, scanptr, errCode, true);
-}//Dbtc::execDI_FCOUNTREF()
+}//Dbtc::execDIH_SCAN_TAB_REF()
void Dbtc::abortScanLab(Signal* signal, ScanRecordPtr scanptr, Uint32 errCode,
bool not_started)
@@ -11160,23 +11262,113 @@ void Dbtc::releaseScanResources(Signal*
/****************************************************************
- * execDIGETPRIMCONF
+ * execDIH_SCAN_GET_NODES_CONF
*
- * WE HAVE RECEIVED THE PRIMARY NODE OF THIS FRAGMENT.
- * WE ARE NOW READY TO ASK FOR PERMISSION TO LOAD THIS
- * SPECIFIC NODE WITH A SCAN OPERATION.
+ * WE HAVE RECEIVED THE PRIMARY NODES OF ALL FRAGMENTS.
+ * WE ARE NOW READY TO ASK FOR PERMISSION TO LOAD THESE
+ * NODES WITH A SCAN OPERATIONS.
****************************************************************/
void Dbtc::execDIH_SCAN_GET_NODES_CONF(Signal* signal)
{
jamEntry();
DihScanGetNodesConf * conf = (DihScanGetNodesConf*)signal->getDataPtr();
- scanFragptr.i = conf->senderData;
+ const Uint32 tableId = conf->tableId;
+ const Uint32 fragCnt = conf->fragCnt;
+
+ if (signal->getNoOfSections() > 0)
+ {
+ // Long signal: FragItems listed in first section
+ jam();
+ SectionHandle handle(this, signal);
+ ndbassert(handle.m_cnt==1);
+ startFragScansLab(signal, tableId, handle, 0);
+ }
+ else // Short signal, with single FragItem
+ {
+ jam();
+ ndbassert(fragCnt == 1);
+ ndbassert(signal->getLength()
+ == DihScanGetNodesConf::FixedSignalLength + DihScanGetNodesConf::FragItem::Length);
+
+ DihScanGetNodesConf::FragItem fragConf[1];
+ memcpy(fragConf, conf->fragItem, 4 * DihScanGetNodesConf::FragItem::Length);
+ startFragScanLab(signal, tableId, fragConf[0]);
+ }
+
+ /**
+ * As MAX_DIH_FRAG_REQS fragments can be requested at once,
+ * we may have to send more DIH_SCAN_GET_NODES_REQ now
+ */
+ ScanRecordPtr scanptr;
+ scanptr.i = scanFragptr.p->scanRec;
+ ptrCheckGuard(scanptr, cscanrecFileSize, scanRecord);
+
+ jam();
+ sendDihGetNodesReq(signal, scanptr);
+
+}//Dbtc::execDIH_SCAN_GET_NODES_CONF
+
+/****************************************************************
+ * startFragScansLab
+ *
+ * PROCESS THE LIST OF DihScanGetNodesConf::FragItem RECEIVED
+ * FROM ::execDIH_SCAN_GET_NODES_CONF. SEND A 'SCAN_FRAGREQ'
+ * FOR EACH OF THESE.
+ * AVOID PRODUCING TOO MANY LOCAL SIGNALS WHICH MAY RESULT IN
+ * A 'Out of SendBuffers' ERROR. IN THESE CASES WE TAKE A BREAK
+ * AND CONTINUEB LATER.
+ ****************************************************************/
+void Dbtc::startFragScansLab(Signal* signal, Uint32 tableId,
+ SectionHandle& handle, Uint32 secOffs)
+{
+ Uint32 cntLocalSignals = 0;
+ const NodeId ownNodeId = getOwnNodeId();
+ SectionReader fragReader(handle.m_ptr[0], getSectionSegmentPool());
+ ndbassert((fragReader.getSize() % DihScanGetNodesConf::FragItem::Length) == 0);
+ ndbrequire(fragReader.step(secOffs));
+
+ DihScanGetNodesConf::FragItem fragConf;
+ while (fragReader.getWords((Uint32*)&fragConf,DihScanGetNodesConf::FragItem::Length))
+ {
+ jam();
+ if (fragConf.nodes[0] == ownNodeId)
+ cntLocalSignals++;
+
+ /**
+ * A max fanout of 1::4 of consumed::produced signals are allowed.
+ * If we are about to produce more, we have to contine later.
+ */
+ if (cntLocalSignals >= 4)
+ {
+ jam();
+ signal->theData[0] = TcContinueB::ZSTART_FRAG_SCANS;
+ signal->theData[1] = tableId;
+ signal->theData[2] = secOffs;
+ sendSignal(reference(), GSN_CONTINUEB, signal, 3, JBB, &handle);
+ return;
+ }
+
+ startFragScanLab(signal, tableId, fragConf);
+ secOffs += DihScanGetNodesConf::FragItem::Length;
+ } //while
+
+ jam();
+ releaseSections(handle);
+}//Dbtc::startFragScansLab
+
+void Dbtc::startFragScanLab(Signal* signal, Uint32 tableId,
+ const DihScanGetNodesConf::FragItem& fragConf)
+{
+ jam();
+ const NodeId ownNodeId = getOwnNodeId();
+
+ scanFragptr.i = fragConf.senderData;
c_scan_frag_pool.getPtr(scanFragptr);
- tnodeid = conf->nodes[0];
+ tnodeid = fragConf.nodes[0];
arrGuard(tnodeid, MAX_NDB_NODES);
- if(ERROR_INSERTED(8050) && tnodeid != getOwnNodeId())
+ if (ERROR_INSERTED(8050) && tnodeid != ownNodeId)
{
/* Asked to scan a fragment which is not on the same node as the
* TC - transaction hinting / scan partition pruning has failed
@@ -11197,16 +11389,15 @@ void Dbtc::execDIH_SCAN_GET_NODES_CONF(S
* can "pass" committing on backup fragments and
* get incorrect row count
*/
- if(false && ScanFragReq::getReadCommittedFlag(scanptr.p->scanRequestInfo))
+ if (false && ScanFragReq::getReadCommittedFlag(scanptr.p->scanRequestInfo))
{
jam();
- Uint32 nodeid = getOwnNodeId();
- for(Uint32 i = 1; i<conf->count; i++)
+ for (Uint32 i = 1; i<fragConf.count; i++)
{
- if(conf->nodes[i] == nodeid)
+ if (fragConf.nodes[i] == ownNodeId)
{
jam();
- tnodeid = nodeid;
+ tnodeid = ownNodeId;
break;
}
}
@@ -11271,17 +11462,10 @@ void Dbtc::execDIH_SCAN_GET_NODES_CONF(S
* If this is the last SCANFRAGREQ, sendScanFragReq will release
* the KeyInfo and AttrInfo sections when sending.
*/
- Uint32 instanceKey = conf->instanceKey;
+ Uint32 instanceKey = fragConf.instanceKey;
scanFragptr.p->lqhBlockref = numberToRef(scanptr.p->m_scan_block_no,
instanceKey, tnodeid);
-#if 0
- if (scanptr.p->m_scan_block_no == DBSPJ)
- {
- scanFragptr.p->lqhBlockref = numberToRef(scanptr.p->m_scan_block_no,
- instanceKey, tnodeid);
- }
-#endif
scanFragptr.p->m_connectCount = getNodeInfo(tnodeid).m_connectCount;
/* Determine whether this is the last scanFragReq
@@ -11306,35 +11490,77 @@ void Dbtc::execDIH_SCAN_GET_NODES_CONF(S
* WE HAVE NOW STARTED A FRAGMENT SCAN. NOW
* WAIT FOR THE FIRST SCANNED RECORDS
*********************************************/
-}//Dbtc::execDIGETPRIMCONF
+}//Dbtc::startFragScanLab
+
/***************************************************
- * execDIGETPRIMREF
+ * execDIH_SCAN_GET_NODES_REF
*
* WE ARE NOW FORCED TO STOP THE SCAN. THIS ERROR
* IS NOT RECOVERABLE SINCE THERE IS A PROBLEM WITH
- * FINDING A PRIMARY REPLICA OF A CERTAIN FRAGMENT.
+ * FINDING A PRIMARY REPLICA OF SOME FRAGMENT(s).
***************************************************/
void Dbtc::execDIH_SCAN_GET_NODES_REF(Signal* signal)
{
jamEntry();
- // tcConnectptr.i in theData[0] is not used.
- scanFragptr.i = signal->theData[1];
- const Uint32 errCode = signal->theData[2];
- c_scan_frag_pool.getPtr(scanFragptr);
- ndbrequire(scanFragptr.p->scanFragState == ScanFragRec::WAIT_GET_PRIMCONF);
+ const DihScanGetNodesRef* ref = (DihScanGetNodesRef*)signal->getDataPtr();
+//const Uint32 tableId = ref->tableId;
+ const Uint32 fragCnt = ref->fragCnt;
+ const Uint32 errCode = ref->errCode;
+
+ if (signal->getNoOfSections() > 0)
+ {
+ // Long signal: FragItems listed in first section
+ jam();
+ SectionHandle handle(this, signal);
+ ndbassert(handle.m_cnt==1);
+
+ SegmentedSectionPtr fragRefSection;
+ ndbrequire(handle.getSection(fragRefSection,0));
+ ndbassert(fragRefSection.p->m_sz == (fragCnt*DihScanGetNodesRef::FragItem::Length));
+ ndbassert(fragCnt <= DihScanGetNodesReq::MAX_DIH_FRAG_REQS);
+ copy((Uint32*)ref->fragItem, fragRefSection);
+ releaseSections(handle);
+ }
+ else // Short signal, single frag in ref->fragItem[0]
+ {
+ ndbassert(fragCnt == 1);
+ ndbassert(signal->getLength()
+ == DihScanGetNodesRef::FixedSignalLength + DihScanGetNodesRef::FragItem::Length);
+ }
ScanRecordPtr scanptr;
- scanptr.i = scanFragptr.p->scanRec;
- ptrCheckGuard(scanptr, cscanrecFileSize, scanRecord);
+ scanptr.setNull();
+ for (Uint32 i = 0; i < fragCnt; i++)
{
- ScanFragList run(c_scan_frag_pool, scanptr.p->m_running_scan_frags);
- run.release(scanFragptr);
+ jam();
+ scanFragptr.i = ref->fragItem[i].senderData;
+ jam(); jamLine(ref->fragItem[i].fragId); //OJA
+ c_scan_frag_pool.getPtr(scanFragptr);
+
+ jam(); //OJA TEMP
+ ndbrequire(scanFragptr.p->scanFragState == ScanFragRec::WAIT_GET_PRIMCONF);
+ scanFragptr.p->scanFragState = ScanFragRec::COMPLETED;
+ scanFragptr.p->stopFragTimer();
+
+ jam(); //OJA TEMP
+ // All scanFrags should belong to the same table scan
+ ndbassert(scanptr.isNull() || scanptr.i==scanFragptr.p->scanRec);
+ if (scanptr.isNull())
+ {
+ jam();
+ scanptr.i = scanFragptr.p->scanRec;
+ ptrCheckGuard(scanptr, cscanrecFileSize, scanRecord);
+ }
+ {
+ ScanFragList run(c_scan_frag_pool, scanptr.p->m_running_scan_frags);
+ run.release(scanFragptr);
+ }
}
scanError(signal, scanptr, errCode);
-}//Dbtc::execDIGETPRIMREF()
+}//Dbtc::execDIH_SCAN_GET_NODES_REF()
/**
* Dbtc::execSCAN_FRAGREF
@@ -11503,12 +11729,15 @@ void Dbtc::execSCAN_FRAGCONF(Signal* sig
scanFragptr.p->scanFragId = scanptr.p->scanNextFragId++;
DihScanGetNodesReq* req = (DihScanGetNodesReq*)signal->getDataPtrSend();
req->senderRef = reference();
- req->senderData = scanFragptr.i;
req->tableId = scanptr.p->scanTableref;
- req->fragId = scanFragptr.p->scanFragId;
req->scanCookie = scanptr.p->m_scan_cookie;
+ req->fragCnt = 1;
+ req->fragItem[0].senderData = scanFragptr.i;
+ req->fragItem[0].fragId = scanFragptr.p->scanFragId;
sendSignal(cdihblockref, GSN_DIH_SCAN_GET_NODES_REQ, signal,
- DihScanGetNodesReq::SignalLength, JBB);
+ DihScanGetNodesReq::FixedSignalLength
+ + DihScanGetNodesReq::FragItem::Length,
+ JBB);
return;
}
/*
@@ -11719,12 +11948,15 @@ void Dbtc::execSCAN_NEXTREQ(Signal* sign
DihScanGetNodesReq* req = (DihScanGetNodesReq*)signal->getDataPtrSend();
req->senderRef = reference();
- req->senderData = scanFragptr.i;
req->tableId = scanptr.p->scanTableref;
- req->fragId = scanFragptr.p->scanFragId;
req->scanCookie = scanptr.p->m_scan_cookie;
+ req->fragCnt = 1;
+ req->fragItem[0].senderData = scanFragptr.i;
+ req->fragItem[0].fragId = scanFragptr.p->scanFragId;
sendSignal(cdihblockref, GSN_DIH_SCAN_GET_NODES_REQ, signal,
- DihScanGetNodesReq::SignalLength, JBB);
+ DihScanGetNodesReq::FixedSignalLength
+ + DihScanGetNodesReq::FragItem::Length,
+ JBB);
}
else
{
@@ -11788,7 +12020,7 @@ Dbtc::close_scan_req(Signal* signal, Sca
switch(curr.p->scanFragState){
case ScanFragRec::IDLE:
jam(); // real early abort
- ndbrequire(old == ScanRecord::WAIT_AI);
+ ndbrequire(old == ScanRecord::WAIT_AI || old == ScanRecord::RUNNING);
running.release(curr);
continue;
case ScanFragRec::WAIT_GET_PRIMCONF:
=== modified file 'storage/ndb/src/kernel/blocks/suma/Suma.cpp'
--- a/storage/ndb/src/kernel/blocks/suma/Suma.cpp 2012-02-23 15:41:31 +0000
+++ b/storage/ndb/src/kernel/blocks/suma/Suma.cpp 2012-08-24 12:07:17 +0000
@@ -2456,12 +2456,16 @@ Suma::execDIH_SCAN_TAB_CONF(Signal* sign
DihScanGetNodesReq* req = (DihScanGetNodesReq*)signal->getDataPtrSend();
req->senderRef = reference();
- req->senderData = ptr.i;
req->tableId = tableId;
- req->fragId = 0;
req->scanCookie = scanCookie;
+ req->fragCnt = 1;
+ req->fragItem[0].senderData = ptr.i;
+ req->fragItem[0].fragId = 0;
+
sendSignal(DBDIH_REF, GSN_DIH_SCAN_GET_NODES_REQ, signal,
- DihScanGetNodesReq::SignalLength, JBB);
+ DihScanGetNodesReq::FixedSignalLength
+ + DihScanGetNodesReq::FragItem::Length,
+ JBB);
DBUG_VOID_RETURN;
}
@@ -2470,18 +2474,26 @@ void
Suma::execDIH_SCAN_GET_NODES_CONF(Signal* signal)
{
jamEntry();
- DBUG_ENTER("Suma::execDIGETPRIMCONF");
+ DBUG_ENTER("Suma::execDIH_SCAN_GET_NODES_CONF");
+
+ /**
+ * Assume a short signal, with a single FragItem being returned
+ * as we do only single fragment requests in
+ * DIH_SCAN_GET_NODES_REQs sent from SUMA.
+ */
ndbassert(signal->getNoOfSections() == 0);
+ ndbassert(signal->getLength() ==
+ DihScanGetNodesConf::FixedSignalLength
+ + DihScanGetNodesConf::FragItem::Length);
DihScanGetNodesConf* conf = (DihScanGetNodesConf*)signal->getDataPtr();
- const Uint32 nodeCount = conf->count;
const Uint32 tableId = conf->tableId;
- const Uint32 fragNo = conf->fragId;
-
+ const Uint32 fragNo = conf->fragItem[0].fragId;
+ const Uint32 nodeCount = conf->fragItem[0].count;
ndbrequire(nodeCount > 0 && nodeCount <= MAX_REPLICAS);
Ptr<SyncRecord> ptr;
- c_syncPool.getPtr(ptr, conf->senderData);
+ c_syncPool.getPtr(ptr, conf->fragItem[0].senderData);
{
LocalDataBuffer<15> fragBuf(c_dataBufferPool, ptr.p->m_fragments);
@@ -2490,9 +2502,9 @@ Suma::execDIH_SCAN_GET_NODES_CONF(Signal
* Add primary node for fragment to list
*/
FragmentDescriptor fd;
- fd.m_fragDesc.m_nodeId = conf->nodes[0];
+ fd.m_fragDesc.m_nodeId = conf->fragItem[0].nodes[0];
fd.m_fragDesc.m_fragmentNo = fragNo;
- fd.m_fragDesc.m_lqhInstanceKey = conf->instanceKey;
+ fd.m_fragDesc.m_lqhInstanceKey = conf->fragItem[0].instanceKey;
if (ptr.p->m_frag_id == ZNIL)
{
signal->theData[2] = fd.m_dummy;
@@ -2506,7 +2518,7 @@ Suma::execDIH_SCAN_GET_NODES_CONF(Signal
const Uint32 ownNodeId = getOwnNodeId();
Uint32 i = 0;
for (i = 0; i < nodeCount; i++)
- if (conf->nodes[i] == ownNodeId)
+ if (conf->fragItem[0].nodes[i] == ownNodeId)
break;
if (i == nodeCount)
{
@@ -2530,12 +2542,16 @@ Suma::execDIH_SCAN_GET_NODES_CONF(Signal
DihScanGetNodesReq* req = (DihScanGetNodesReq*)signal->getDataPtrSend();
req->senderRef = reference();
- req->senderData = ptr.i;
req->tableId = tableId;
- req->fragId = nextFrag;
req->scanCookie = ptr.p->m_scan_cookie;
+ req->fragCnt = 1;
+ req->fragItem[0].senderData = ptr.i;
+ req->fragItem[0].fragId = nextFrag;
+
sendSignal(DBDIH_REF, GSN_DIH_SCAN_GET_NODES_REQ, signal,
- DihScanGetNodesReq::SignalLength, JBB);
+ DihScanGetNodesReq::FixedSignalLength
+ + DihScanGetNodesReq::FragItem::Length,
+ JBB);
DBUG_VOID_RETURN;
}
=== modified file 'storage/ndb/src/ndbapi/ndberror.c'
--- a/storage/ndb/src/ndbapi/ndberror.c 2012-07-04 12:44:30 +0000
+++ b/storage/ndb/src/ndbapi/ndberror.c 2012-08-24 12:07:17 +0000
@@ -201,6 +201,7 @@ ErrorBundle ErrorCodes[] = {
"Out of operation records in transaction coordinator (increase MaxNoOfConcurrentOperations)" },
{ 275, DMEC, TR, "Out of transaction records for complete phase (increase MaxNoOfConcurrentTransactions)" },
{ 279, DMEC, TR, "Out of transaction markers in transaction coordinator" },
+ { 312, DMEC, TR, "Out of LongMessageBuffer" },
{ 414, DMEC, TR, "414" },
{ 418, DMEC, TR, "Out of transaction buffers in LQH" },
{ 419, DMEC, TR, "419" },
=== modified file 'storage/ndb/test/ndbapi/testScan.cpp'
--- a/storage/ndb/test/ndbapi/testScan.cpp 2012-06-26 12:33:15 +0000
+++ b/storage/ndb/test/ndbapi/testScan.cpp 2012-08-24 12:07:17 +0000
@@ -2188,6 +2188,24 @@ TESTCASE("ScanReadError5030",
STEP(runScanReadErrorOneNode);
FINALIZER(runClearTable);
}
+TESTCASE("ScanReadError8095",
+ "Scan and insert error 8095. "\
+ "TC fails to send a DIH_SCAN_GET_NODES_REQ due to "\
+ "'out of LongMessageBuffers' -> terminate scan."){
+ INITIALIZER(runLoadTable);
+ TC_PROPERTY("ErrorCode", 8095);
+ STEP(runScanReadError);
+ FINALIZER(runClearTable);
+}
+TESTCASE("ScanReadError7234",
+ "Scan and insert error 7234. "\
+ "DIH fails to send a DIH_SCAN_GET_NODES_CONF due to "\
+ "'out of LongMessageBuffers' -> send DIH_SCAN_GET_NODES_REF."){
+ INITIALIZER(runLoadTable);
+ TC_PROPERTY("ErrorCode", 7234);
+ STEP(runScanReadError);
+ FINALIZER(runClearTable);
+}
TESTCASE("ScanReadRestart",
"Scan requirement:A scan should be able to start and "\
"complete during node recovery and when one or more nodes "\
=== modified file 'storage/ndb/test/ndbapi/testSpj.cpp'
--- a/storage/ndb/test/ndbapi/testSpj.cpp 2012-06-19 11:04:55 +0000
+++ b/storage/ndb/test/ndbapi/testSpj.cpp 2012-08-24 12:07:17 +0000
@@ -127,6 +127,8 @@ runLookupJoinError(NDBT_Context* ctx, ND
17070, 17071, 17072, // lookup_send.dupsec -> outOfSectionMem
17080, 17081, 17082, // lookup_parent_row -> OutOfQueryMemory
17120, 17121, // execTRANSID_AI -> OutOfRowMemory
+ 17130, // sendSignal(DIH_SCAN_GET_NODES_REQ) -> import() failed
+ 7234, // sendSignal(DIH_SCAN_GET_NODES_CONF) -> import() failed (DIH)
17510 // random failure when allocating seection memory
};
loops = faultToInject ? 1 : sizeof(lookupFaults)/sizeof(int);
No bundle (reason: useless for push emails).
| Thread |
|---|
| • bzr push into mysql-5.5-cluster-7.2 branch (ole.john.aske:3980 to 3981) | Ole John Aske | 27 Aug |