3238 Jan Wedvik 2010-08-23
This commit adds a regression test for http://lists.mysql.com/commits/116372
(missing rows in left-join scan-scan queries).
modified:
mysql-test/suite/ndb/r/ndb_join_pushdown.result
mysql-test/suite/ndb/t/ndb_join_pushdown.test
3237 Jan Wedvik 2010-08-23
This commit fixes an error in how the node mask ('len' in SCAN TAB CONF) was
interpreted for queries with unique index lookup operations. A unique index
operation will have two different nodes in the operation tree sent to the
SPJ block. This was not taken into consideration when interpreting the node
mask in the API. The API was thus confused about what it would get in the next
batch.
modified:
mysql-test/suite/ndb/r/ndb_join_pushdown.result
mysql-test/suite/ndb/t/ndb_join_pushdown.test
storage/ndb/src/ndbapi/NdbQueryOperation.cpp
3236 Jan Wedvik 2010-08-23
This commit (implemented by Jonas Oreland) fixes an error concerning scan-scan
queries with multiple batches running on clusters with more than one data node.
'senderData' values from different SPJ block instances would sometimes collide,
causing interference between different sub scan instances.
modified:
storage/ndb/src/kernel/blocks/backup/Backup.cpp
storage/ndb/src/kernel/blocks/dblqh/Dblqh.hpp
storage/ndb/src/kernel/blocks/dblqh/DblqhMain.cpp
storage/ndb/src/kernel/blocks/suma/Suma.cpp
3235 Jan Wedvik 2010-08-20
This commit fixes an error in the way left joins were executed when running
scan-scan queries. Sometimes there would be missing rows (with NULL right hand
side) in the result set. This error caused the ndb_index_ordered test to fail
(with missing rows).
To verify this fix, run the SQL code below and observe that you get 381 rows
(as is correct) rather than 380 as before.
-- Repro for missing rows in pushed left joins (scan-scan queries).
-- Expected result of the final query: 381 rows, i.e. 19 * 20 = 380 matched
-- rows plus one row with a NULL right-hand side. Before the fix only 380
-- rows were returned.
drop database if exists spj_ndb;
create database spj_ndb;
use spj_ndb;
set ndb_join_pushdown = true;
create table t1 (pk int primary key, a int, b int) engine=ndb;
create index ix1 on t1(b,a);
-- Rows 0..18 all have a=10 and b=10, so as x1 each of them matches every
-- row of x2 (all 20 rows have b=10).
insert into t1 (pk, a, b) values
  (0,10,10),
  (1,10,10),
  (2,10,10),
  (3,10,10),
  (4,10,10),
  (5,10,10),
  (6,10,10),
  (7,10,10),
  (8,10,10),
  (9,10,10),
  (10,10,10),
  (11,10,10),
  (12,10,10),
  (13,10,10),
  (14,10,10),
  (15,10,10),
  (16,10,10),
  (17,10,10),
  (18,10,10);
-- Row 19 has a=20, which equals no value of b. The left join must still
-- return it exactly once, with NULLs for all x2 columns.
insert into t1 (pk, a, b) values (19,20,10);
select * from t1 as x1 left join t1 as x2 on x1.a=x2.b;
modified:
storage/ndb/src/ndbapi/NdbQueryOperation.cpp
=== modified file 'mysql-test/suite/ndb/r/ndb_join_pushdown.result'
--- a/mysql-test/suite/ndb/r/ndb_join_pushdown.result 2010-08-17 12:04:13 +0000
+++ b/mysql-test/suite/ndb/r/ndb_join_pushdown.result 2010-08-23 10:23:17 +0000
@@ -3349,4 +3349,38 @@ id select_type table type possible_keys
select * from t1 as x join t1 as y on x.u=y.pk order by(x.pk);
pk u pk u
drop table t1;
+create table t1 (pk int primary key, u int not null, a int, b int) engine=ndb;
+create index ix1 on t1(b,a);
+create unique index ix2 on t1(u);
+insert into t1 values (0,0,10,10);
+insert into t1 values (1,1,10,10);
+insert into t1 values (2,2,10,10);
+insert into t1 values (3,3,10,10);
+insert into t1 values (4,4,10,10);
+insert into t1 values (5,5,10,10);
+insert into t1 values (6,6,10,10);
+insert into t1 values (7,7,10,10);
+insert into t1 values (8,8,10,10);
+insert into t1 values (9,9,10,10);
+insert into t1 values (10,10,10,10);
+insert into t1 values (11,11,10,10);
+explain select count(*) from t1 as x1 join t1 as x2 join t1 as x3
+on x1.a=x2.u and x2.a = x3.b;
+id select_type table type possible_keys key key_len ref rows Extra
+1 SIMPLE x1 ALL NULL NULL NULL NULL 12 Parent of 3 pushed join@1
+1 SIMPLE x2 eq_ref ix2 ix2 4 test.x1.a 1 Child of pushed join@1
+1 SIMPLE x3 ref ix1 ix1 5 test.x2.a 1 Child of pushed join@1; Using where
+select count(*) from t1 as x1 join t1 as x2 join t1 as x3
+on x1.a=x2.u and x2.a = x3.b;
+count(*)
+144
+insert into t1 values (12,12,20,10);
+explain select count(*) from t1 as x1 left join t1 as x2 on x1.a=x2.b;
+id select_type table type possible_keys key key_len ref rows Extra
+1 SIMPLE x1 ALL NULL NULL NULL NULL 13 Parent of 2 pushed join@1
+1 SIMPLE x2 ref ix1 ix1 5 test.x1.a 1 Child of pushed join@1
+select count(*) from t1 as x1 left join t1 as x2 on x1.a=x2.b;
+count(*)
+157
+drop table t1;
set ndb_join_pushdown = @save_ndb_join_pushdown;
=== modified file 'mysql-test/suite/ndb/t/ndb_join_pushdown.test'
--- a/mysql-test/suite/ndb/t/ndb_join_pushdown.test 2010-08-17 12:04:13 +0000
+++ b/mysql-test/suite/ndb/t/ndb_join_pushdown.test 2010-08-23 10:23:17 +0000
@@ -2379,5 +2379,40 @@ select * from t1 as x join t1 as y on x.
drop table t1;
+# Test query using "scan -> unique index lookup -> index scan".
+
+create table t1 (pk int primary key, u int not null, a int, b int) engine=ndb;
+create index ix1 on t1(b,a);
+create unique index ix2 on t1(u);
+
+insert into t1 values (0,0,10,10);
+insert into t1 values (1,1,10,10);
+insert into t1 values (2,2,10,10);
+insert into t1 values (3,3,10,10);
+insert into t1 values (4,4,10,10);
+insert into t1 values (5,5,10,10);
+insert into t1 values (6,6,10,10);
+insert into t1 values (7,7,10,10);
+insert into t1 values (8,8,10,10);
+insert into t1 values (9,9,10,10);
+insert into t1 values (10,10,10,10);
+insert into t1 values (11,11,10,10);
+
+explain select count(*) from t1 as x1 join t1 as x2 join t1 as x3
+on x1.a=x2.u and x2.a = x3.b;
+
+select count(*) from t1 as x1 join t1 as x2 join t1 as x3
+on x1.a=x2.u and x2.a = x3.b;
+
+# Regression test for commit http://lists.mysql.com/commits/116372
+# (missing rows in left join query with multiple result batches).
+
+insert into t1 values (12,12,20,10);
+
+explain select count(*) from t1 as x1 left join t1 as x2 on x1.a=x2.b;
+select count(*) from t1 as x1 left join t1 as x2 on x1.a=x2.b;
+
+drop table t1;
+
set ndb_join_pushdown = @save_ndb_join_pushdown;
=== modified file 'storage/ndb/src/kernel/blocks/backup/Backup.cpp'
--- a/storage/ndb/src/kernel/blocks/backup/Backup.cpp 2010-05-03 04:49:08 +0000
+++ b/storage/ndb/src/kernel/blocks/backup/Backup.cpp 2010-08-23 10:15:57 +0000
@@ -4113,25 +4113,19 @@ Backup::execBACKUP_FRAGMENT_REQ(Signal*
ndbrequire(instanceKey != 0);
lqhRef = numberToRef(DBLQH, instanceKey, getOwnNodeId());
}
+
+ Uint32 attrInfo[25];
+ attrInfo[0] = table.attrInfoLen;
+ attrInfo[1] = 0;
+ attrInfo[2] = 0;
+ attrInfo[3] = 0;
+ attrInfo[4] = 0;
+ memcpy(attrInfo + 5, table.attrInfo, 4*table.attrInfoLen);
+ LinearSectionPtr ptr[3];
+ ptr[0].p = attrInfo;
+ ptr[0].sz = 5 + table.attrInfoLen;
sendSignal(lqhRef, GSN_SCAN_FRAGREQ, signal,
- ScanFragReq::SignalLength, JBB);
-
- signal->theData[0] = filePtr.i;
- signal->theData[1] = 0;
- signal->theData[2] = (BACKUP << 20) + (getOwnNodeId() << 8);
-
- // Return all
- signal->theData[3] = table.attrInfoLen;
- signal->theData[4] = 0;
- signal->theData[5] = 0;
- signal->theData[6] = 0;
- signal->theData[7] = 0;
-
- Uint32 dataPos = 8;
- memcpy(signal->theData + dataPos, table.attrInfo, 4*table.attrInfoLen);
- dataPos += table.attrInfoLen;
- ndbassert(dataPos < 25);
- sendSignal(lqhRef, GSN_ATTRINFO, signal, dataPos, JBB);
+ ScanFragReq::SignalLength, JBB, ptr, 1);
}
}
=== modified file 'storage/ndb/src/kernel/blocks/dblqh/Dblqh.hpp'
--- a/storage/ndb/src/kernel/blocks/dblqh/Dblqh.hpp 2010-08-16 11:37:38 +0000
+++ b/storage/ndb/src/kernel/blocks/dblqh/Dblqh.hpp 2010-08-23 10:15:57 +0000
@@ -1990,6 +1990,7 @@ public:
UintR simpleTcConnect;
UintR tableref;
UintR tcOprec;
+ Uint32 tcHashKeyHi;
UintR tcScanInfo;
UintR tcScanRec;
UintR totReclenAi;
@@ -2275,7 +2276,8 @@ private:
Uint32 transid1,
Uint32 transid2,
Uint32 fragId,
- Uint32 nodeId);
+ Uint32 nodeId,
+ Uint32 hashHi);
void finishScanrec(Signal* signal);
void releaseScanrec(Signal* signal);
void seizeScanrec(Signal* signal);
@@ -2335,7 +2337,7 @@ private:
LogPartRecordPtr flfLogPartPtr,
LogFileRecordPtr* parLogFilePtr);
void findPageRef(Signal* signal, CommitLogRecord* commitLogRecord);
- int findTransaction(UintR Transid1, UintR Transid2, UintR TcOprec);
+ int findTransaction(UintR Transid1, UintR Transid2, UintR TcOprec, UintR hi);
void getFirstInLogQueue(Signal* signal);
bool getFragmentrec(Signal* signal, Uint32 fragId);
void initialiseAddfragrec(Signal* signal);
=== modified file 'storage/ndb/src/kernel/blocks/dblqh/DblqhMain.cpp'
--- a/storage/ndb/src/kernel/blocks/dblqh/DblqhMain.cpp 2010-08-16 11:37:38 +0000
+++ b/storage/ndb/src/kernel/blocks/dblqh/DblqhMain.cpp 2010-08-23 10:15:57 +0000
@@ -3045,7 +3045,7 @@ void Dblqh::execLQHKEYREF(Signal* signal
* not find an easy way to modify the code so that findTransaction
* is usable also for them
*/
- if (findTransaction(transid1, transid2, tcOprec) != ZOK)
+ if (findTransaction(transid1, transid2, tcOprec, 0) != ZOK)
{
jam();
warningReport(signal, 14);
@@ -3730,7 +3730,7 @@ void Dblqh::execKEYINFO(Signal* signal)
Uint32 transid1 = signal->theData[1];
Uint32 transid2 = signal->theData[2];
jamEntry();
- if (findTransaction(transid1, transid2, tcOprec) != ZOK) {
+ if (findTransaction(transid1, transid2, tcOprec, 0) != ZOK) {
jam();
return;
}//if
@@ -3837,7 +3837,7 @@ void Dblqh::execATTRINFO(Signal* signal)
jamEntry();
if (findTransaction(transid1,
transid2,
- tcOprec) != ZOK) {
+ tcOprec, 0) != ZOK) {
jam();
return;
}//if
@@ -4002,7 +4002,8 @@ void Dblqh::lqhAttrinfoLab(Signal* signa
/* ------ FIND TRANSACTION BY USING HASH TABLE ------- */
/* */
/* ------------------------------------------------------------------------- */
-int Dblqh::findTransaction(UintR Transid1, UintR Transid2, UintR TcOprec)
+int Dblqh::findTransaction(UintR Transid1, UintR Transid2, UintR TcOprec,
+ Uint32 hi)
{
TcConnectionrec *regTcConnectionrec = tcConnectionrec;
Uint32 ttcConnectrecFileSize = ctcConnectrecFileSize;
@@ -4014,7 +4015,8 @@ int Dblqh::findTransaction(UintR Transid
ptrCheckGuard(locTcConnectptr, ttcConnectrecFileSize, regTcConnectionrec);
if ((locTcConnectptr.p->transid[0] == Transid1) &&
(locTcConnectptr.p->transid[1] == Transid2) &&
- (locTcConnectptr.p->tcOprec == TcOprec)) {
+ (locTcConnectptr.p->tcOprec == TcOprec) &&
+ (locTcConnectptr.p->tcHashKeyHi == hi)) {
/* FIRST PART OF TRANSACTION CORRECT */
/* SECOND PART ALSO CORRECT */
/* THE OPERATION RECORD POINTER IN TC WAS ALSO CORRECT */
@@ -4251,6 +4253,7 @@ void Dblqh::execLQHKEYREQ(Signal* signal
Uint32 senderRef = regTcPtr->clientBlockref = signal->senderBlockRef();
regTcPtr->clientConnectrec = sig0;
regTcPtr->tcOprec = sig0;
+ regTcPtr->tcHashKeyHi = 0;
regTcPtr->storedProcId = ZNIL;
regTcPtr->lqhKeyReqId = cTotalLqhKeyReqCount;
regTcPtr->m_flags= 0;
@@ -5521,7 +5524,8 @@ void Dblqh::handleUserUnlockRequest(Sign
*/
if (unlikely( findTransaction(regTcPtr->transid[0],
regTcPtr->transid[1],
- tcOpRecIndex) != ZOK))
+ tcOpRecIndex,
+ 0) != ZOK))
{
jam();
unlockError(signal, ZBAD_OP_REF);
@@ -7228,7 +7232,7 @@ void Dblqh::execCOMMITREQ(Signal* signal
}//if
if (findTransaction(transid1,
transid2,
- tcOprec) != ZOK) {
+ tcOprec, 0) != ZOK) {
warningReport(signal, 5);
return;
}//if
@@ -7366,7 +7370,7 @@ void Dblqh::execCOMPLETEREQ(Signal* sign
}//if
if (findTransaction(transid1,
transid2,
- tcOprec) != ZOK) {
+ tcOprec, 0) != ZOK) {
jam();
/*---------------------------------------------------------*/
/* FOR SOME REASON THE COMPLETE PHASE STARTED AFTER */
@@ -8035,7 +8039,7 @@ void Dblqh::execABORT(Signal* signal)
}//if
if (findTransaction(transid1,
transid2,
- tcOprec) != ZOK) {
+ tcOprec, 0) != ZOK) {
jam();
if(ERROR_INSERTED(5039) &&
@@ -8127,7 +8131,7 @@ void Dblqh::execABORTREQ(Signal* signal)
}//if
if (findTransaction(transid1,
transid2,
- tcOprec) != ZOK) {
+ tcOprec, 0) != ZOK) {
signal->theData[0] = reqPtr;
signal->theData[2] = cownNodeid;
signal->theData[3] = transid1;
@@ -9279,8 +9283,12 @@ void Dblqh::execSCAN_NEXTREQ(Signal* sig
const Uint32 transid1 = nextReq->transId1;
const Uint32 transid2 = nextReq->transId2;
const Uint32 senderData = nextReq->senderData;
+ Uint32 hashHi = signal->getSendersBlockRef();
+ /**
+ * XXX TODO handle upgrade...
+ */
- if (findTransaction(transid1, transid2, senderData) != ZOK){
+ if (findTransaction(transid1, transid2, senderData, hashHi) != ZOK){
jam();
DEBUG(senderData <<
" Received SCAN_NEXTREQ in LQH with close flag when closed");
@@ -9790,6 +9798,10 @@ void Dblqh::execSCAN_FRAGREQ(Signal* sig
Uint32 senderData;
Uint32 hashIndex;
TcConnectionrecPtr nextHashptr;
+ Uint32 senderHi = signal->getSendersBlockRef();
+ /**
+ * XXX TODO handle upgrade...
+ */
const Uint32 reqinfo = scanFragReq->requestInfo;
@@ -9895,7 +9907,8 @@ void Dblqh::execSCAN_FRAGREQ(Signal* sig
transid1,
transid2,
fragId,
- ZNIL);
+ ZNIL,
+ senderHi);
tcConnectptr.p->save1 = 0;
tcConnectptr.p->primKeyLen = keyLen;
tcConnectptr.p->applRef = scanFragReq->resultRef;
@@ -11211,7 +11224,8 @@ void Dblqh::initScanTc(const ScanFragReq
Uint32 transid1,
Uint32 transid2,
Uint32 fragId,
- Uint32 nodeId)
+ Uint32 nodeId,
+ Uint32 hashHi)
{
tcConnectptr.p->transid[0] = transid1;
tcConnectptr.p->transid[1] = transid2;
@@ -11220,6 +11234,7 @@ void Dblqh::initScanTc(const ScanFragReq
tcConnectptr.p->fragmentid = fragId;
tcConnectptr.p->fragmentptr = fragptr.i;
tcConnectptr.p->tcOprec = tcConnectptr.p->clientConnectrec;
+ tcConnectptr.p->tcHashKeyHi = hashHi;
tcConnectptr.p->tcBlockref = tcConnectptr.p->clientBlockref;
tcConnectptr.p->errorCode = 0;
tcConnectptr.p->reclenAiLqhkey = 0;
@@ -11760,12 +11775,14 @@ void Dblqh::execCOPY_FRAGREQ(Signal* sig
0,
(DBLQH << 20) + (cownNodeid << 8),
fragId,
- copyFragReq->nodeId);
+ copyFragReq->nodeId,
+ 0);
cactiveCopy[cnoActiveCopy] = fragptr.i;
cnoActiveCopy++;
tcConnectptr.p->copyCountWords = 0;
tcConnectptr.p->tcOprec = tcConnectptr.i;
+ tcConnectptr.p->tcHashKeyHi = 0;
tcConnectptr.p->schemaVersion = scanptr.p->scanSchemaVersion;
tcConnectptr.p->savePointId = gci;
tcConnectptr.p->applRef = 0;
@@ -17760,6 +17777,7 @@ void Dblqh::execLogRecord(Signal* signal
tcConnectptr.p->nextReplica = refToNode(ref);
tcConnectptr.p->connectState = TcConnectionrec::LOG_CONNECTED;
tcConnectptr.p->tcOprec = tcConnectptr.i;
+ tcConnectptr.p->tcHashKeyHi = 0;
packLqhkeyreqLab(signal);
return;
}//Dblqh::execLogRecord()
=== modified file 'storage/ndb/src/kernel/blocks/suma/Suma.cpp'
--- a/storage/ndb/src/kernel/blocks/suma/Suma.cpp 2010-06-14 19:33:54 +0000
+++ b/storage/ndb/src/kernel/blocks/suma/Suma.cpp 2010-08-23 10:15:57 +0000
@@ -2521,32 +2521,25 @@ Suma::SyncRecord::nextScan(Signal* signa
req->batch_size_rows= parallelism;
req->batch_size_bytes= 0;
- suma.sendSignal(lqhRef, GSN_SCAN_FRAGREQ, signal,
- ScanFragReq::SignalLength, JBB);
-
- signal->theData[0] = ptrI;
- signal->theData[1] = 0;
- signal->theData[2] = (SUMA << 20) + (suma.getOwnNodeId() << 8);
-
- // Return all
- signal->theData[3] = attrBuf.getSize();
- signal->theData[4] = 0;
- signal->theData[5] = 0;
- signal->theData[6] = 0;
- signal->theData[7] = 0;
+
+ Uint32 * attrInfo = signal->theData + 25;
+ attrInfo[0] = attrBuf.getSize();
+ attrInfo[1] = 0;
+ attrInfo[2] = 0;
+ attrInfo[3] = 0;
+ attrInfo[4] = 0;
- Uint32 dataPos = 8;
+ Uint32 pos = 5;
DataBuffer<15>::DataBufferIterator it;
- for(attrBuf.first(it); !it.curr.isNull(); attrBuf.next(it)){
- AttributeHeader::init(&signal->theData[dataPos++], * it.data, 0);
- if(dataPos == 25){
- suma.sendSignal(lqhRef, GSN_ATTRINFO, signal, 25, JBB);
- dataPos = 3;
- }
- }
- if(dataPos != 3){
- suma.sendSignal(lqhRef, GSN_ATTRINFO, signal, dataPos, JBB);
+ for(attrBuf.first(it); !it.curr.isNull(); attrBuf.next(it))
+ {
+ AttributeHeader::init(&attrInfo[pos++], * it.data, 0);
}
+ LinearSectionPtr ptr[3];
+ ptr[0].p = attrInfo;
+ ptr[0].sz = pos;
+ suma.sendSignal(lqhRef, GSN_SCAN_FRAGREQ, signal,
+ ScanFragReq::SignalLength, JBB, ptr, 1);
m_currentNoOfAttributes = attrBuf.getSize();
=== modified file 'storage/ndb/src/ndbapi/NdbQueryOperation.cpp'
--- a/storage/ndb/src/ndbapi/NdbQueryOperation.cpp 2010-08-20 11:31:07 +0000
+++ b/storage/ndb/src/ndbapi/NdbQueryOperation.cpp 2010-08-23 10:20:42 +0000
@@ -3606,7 +3606,7 @@ NdbQueryOperationImpl::calculateBatchedR
m_children[i]->calculateBatchedRows(scanParent);
#ifdef TEST_SCANREQ
- m_maxBatchRows = 1; // To force usage of SCAN_NEXTREQ even for small scans resultsets
+ m_maxBatchRows = 2; // To force usage of SCAN_NEXTREQ even for small scans resultsets
#endif
if (scanParent!=NULL)
@@ -4160,7 +4160,9 @@ NdbQueryOperationImpl::prepareLookupKeyI
bool
NdbQueryOperationImpl::execTRANSID_AI(const Uint32* ptr, Uint32 len){
if (traceSignals) {
- ndbout << "NdbQueryOperationImpl::execTRANSID_AI()" << endl;
+ ndbout << "NdbQueryOperationImpl::execTRANSID_AI()"
+ << " operation no: "
+ << getQueryOperationDef().getQueryOperationIx() << endl;
}
bool ret = false;
NdbRootFragment* rootFrag = NULL;
@@ -4319,15 +4321,27 @@ NdbQueryOperationImpl::execSCAN_TABCONF(
ndbout << " resultStream(root) {" << resultStream << "}" << endl;
}
- for(Uint32 opNo = 0; opNo < m_queryImpl.getQueryDef().getNoOfOperations();
- opNo++)
+ const NdbQueryDefImpl& queryDef = m_queryImpl.getQueryDef();
+ for(Uint32 opNo = 0; opNo < queryDef.getNoOfOperations(); opNo++)
{
+ /* Find the node number seen by the SPJ block. Since a unique index
+ * operation will have two distincts nodes in the tree used by the
+ * SPJ block, this number may be different from 'opNo'.*/
+ const Uint32 internalOpNo =
+ queryDef.getQueryOperation(opNo).getQueryOperationId();
+ assert(internalOpNo >= opNo);
/* Mark each scan node to indicate if the current batch is the last in the
* current sub-scan.
*/
rootFrag.getResultStream(opNo)
- .setSubScanComplete((nodeMask & (1 << opNo)) == 0);
+ .setSubScanComplete((nodeMask & (1 << internalOpNo)) == 0);
}
+#ifndef NDEBUG
+ const NdbQueryOperationDefImpl& finalOpDef =
+ queryDef.getQueryOperation(queryDef.getNoOfOperations() - 1);
+ // Check that nodeMask does not have more bits than we have operations.
+ assert(nodeMask >> 1+finalOpDef.getQueryOperationId() == 0);
+#endif
bool ret = false;
if (rootFrag.isFragBatchComplete()) {
/* This fragment is now complete*/
Attachment: [text/bzr-bundle] bzr/jan.wedvik@sun.com-20100823102317-ygwcegdwocdsxo53.bundle
| Thread |
|---|
| • bzr push into mysql-5.1-telco-7.0-spj-scan-vs-scan branch (jan.wedvik:3235 to 3238) | Jan Wedvik | 23 Aug |