Below is the list of changes that have just been committed into a local
5.0 repository of pekka. When pekka does a push these changes will
be propagated to the main repository and, within 24 hours after the
push, to the public repository.
For information on how to access the public repository
see http://dev.mysql.com/doc/mysql/en/installing-source-tree.html
ChangeSet
1.1791 05/04/07 11:28:43 pekka@stripped +1 -0
ndb - DbtupScan.cpp added
ndb/src/kernel/blocks/dbtup/DbtupScan.cpp
1.1 05/04/07 11:27:56 pekka@stripped +314 -0
ndb/src/kernel/blocks/dbtup/DbtupScan.cpp
1.0 05/04/07 11:27:56 pekka@stripped +0 -0
BitKeeper file /export/space/pekka/ndb/version/my50-ndb/ndb/src/kernel/blocks/dbtup/DbtupScan.cpp
# This is a BitKeeper patch. What follows are the unified diffs for the
# set of deltas contained in the patch. The rest of the patch, the part
# that BitKeeper cares about, is below these diffs.
# User: pekka
# Host: clam.ndb.mysql.com
# Root: /export/space/pekka/ndb/version/my50-ndb
--- New file ---
+++ ndb/src/kernel/blocks/dbtup/DbtupScan.cpp 05/04/07 11:27:56
/* Copyright (C) 2003 MySQL AB
This program is free software; you can redistribute it and/or modify
it under the terms of the GNU General Public License as published by
the Free Software Foundation; either version 2 of the License, or
(at your option) any later version.
This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU General Public License for more details.
You should have received a copy of the GNU General Public License
along with this program; if not, write to the Free Software
Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA */
#define DBTUP_C
#include "Dbtup.hpp"
#include <signaldata/AccScan.hpp>
#include <signaldata/NextScan.hpp>
#undef jam
#undef jamEntry
#define jam() { jamLine(32000 + __LINE__); }
#define jamEntry() { jamEntryLine(32000 + __LINE__); }
void
Dbtup::execACC_SCANREQ(Signal* signal)
{
jamEntry();
const AccScanReq reqCopy = *(const AccScanReq*)signal->getDataPtr();
const AccScanReq* const req = &reqCopy;
ScanOpPtr scanPtr;
scanPtr.i = RNIL;
do {
// find table and fragments
TablerecPtr tablePtr;
tablePtr.i = req->tableId;
ptrCheckGuard(tablePtr, cnoOfTablerec, tablerec);
FragrecordPtr fragPtr[2];
Uint32 fragId = req->fragmentNo;
fragPtr[0].i = fragPtr[1].i = RNIL;
getFragmentrec(fragPtr[0], fragId | 0, tablePtr.p);
getFragmentrec(fragPtr[1], fragId | 1, tablePtr.p);
ndbrequire(fragPtr[0].i != RNIL && fragPtr[1].i != RNIL);
Fragrecord& frag = *fragPtr[0].p;
// seize from pool and link to per-fragment list
if (! frag.m_scanList.seize(scanPtr)) {
jam();
break;
}
new (scanPtr.p) ScanOp();
ScanOp& scan = *scanPtr.p;
scan.m_state = ScanOp::First;
scan.m_userPtr = req->senderData;
scan.m_userRef = req->senderRef;
scan.m_tableId = tablePtr.i;
scan.m_fragId = frag.fragmentId;
scan.m_fragPtrI[0] = fragPtr[0].i;
scan.m_fragPtrI[1] = fragPtr[1].i;
scan.m_transId1 = req->transId1;
scan.m_transId2 = req->transId2;
// conf
AccScanConf* const conf = (AccScanConf*)signal->getDataPtrSend();
conf->scanPtr = req->senderData;
conf->accPtr = scanPtr.i;
conf->flag = AccScanConf::ZNOT_EMPTY_FRAGMENT;
sendSignal(req->senderRef, GSN_ACC_SCANCONF, signal,
AccScanConf::SignalLength, JBB);
return;
} while (0);
if (scanPtr.i != RNIL) {
jam();
releaseScanOp(scanPtr);
}
// LQH does not handle REF
signal->theData[0] = 0x313;
sendSignal(req->senderRef, GSN_ACC_SCANREF, signal, 1, JBB);
}
void
Dbtup::execNEXT_SCANREQ(Signal* signal)
{
jamEntry();
const NextScanReq reqCopy = *(const NextScanReq*)signal->getDataPtr();
const NextScanReq* const req = &reqCopy;
ScanOpPtr scanPtr;
c_scanOpPool.getPtr(scanPtr, req->accPtr);
ScanOp& scan = *scanPtr.p;
FragrecordPtr fragPtr;
fragPtr.i = scan.m_fragPtrI[0];
ptrCheckGuard(fragPtr, cnoOfFragrec, fragrecord);
Fragrecord& frag = *fragPtr.p;
switch (req->scanFlag) {
case NextScanReq::ZSCAN_NEXT:
jam();
break;
case NextScanReq::ZSCAN_NEXT_COMMIT:
jam();
break;
case NextScanReq::ZSCAN_COMMIT:
jam();
{
NextScanConf* const conf = (NextScanConf*)signal->getDataPtrSend();
conf->scanPtr = scan.m_userPtr;
unsigned signalLength = 1;
sendSignal(scanPtr.p->m_userRef, GSN_NEXT_SCANCONF,
signal, signalLength, JBB);
return;
}
break;
case NextScanReq::ZSCAN_CLOSE:
jam();
scanClose(signal, scanPtr);
return;
case NextScanReq::ZSCAN_NEXT_ABORT:
jam();
default:
jam();
ndbrequire(false);
break;
}
// start looking for next scan result
AccCheckScan* checkReq = (AccCheckScan*)signal->getDataPtrSend();
checkReq->accPtr = scanPtr.i;
checkReq->checkLcpStop = AccCheckScan::ZNOT_CHECK_LCP_STOP;
EXECUTE_DIRECT(DBTUP, GSN_ACC_CHECK_SCAN, signal, AccCheckScan::SignalLength);
jamEntry();
}
void
Dbtup::execACC_CHECK_SCAN(Signal* signal)
{
jamEntry();
const AccCheckScan reqCopy = *(const AccCheckScan*)signal->getDataPtr();
const AccCheckScan* const req = &reqCopy;
ScanOpPtr scanPtr;
c_scanOpPool.getPtr(scanPtr, req->accPtr);
ScanOp& scan = *scanPtr.p;
FragrecordPtr fragPtr;
fragPtr.i = scan.m_fragPtrI[0];
ptrCheckGuard(fragPtr, cnoOfFragrec, fragrecord);
Fragrecord& frag = *fragPtr.p;
if (req->checkLcpStop == AccCheckScan::ZCHECK_LCP_STOP) {
jam();
signal->theData[0] = scan.m_userPtr;
signal->theData[1] = true;
EXECUTE_DIRECT(DBLQH, GSN_CHECK_LCP_STOP, signal, 2);
jamEntry();
return;
}
if (scan.m_state == ScanOp::First) {
jam();
scanFirst(signal, scanPtr);
}
if (scan.m_state == ScanOp::Next) {
jam();
scanNext(signal, scanPtr);
}
if (scan.m_state == ScanOp::Locked) {
jam();
const PagePos& pos = scan.m_scanPos;
NextScanConf* const conf = (NextScanConf*)signal->getDataPtrSend();
conf->accOperationPtr = (Uint32)-1; // no lock returned
conf->fragId = frag.fragmentId | pos.m_fragBit;
conf->localKey[0] = (pos.m_pageId << MAX_TUPLES_BITS) |
(pos.m_tupleNo << 1);
conf->localKey[1] = 0;
conf->localKeyLength = 1;
unsigned signalLength = 6;
Uint32 blockNo = refToBlock(scan.m_userRef);
EXECUTE_DIRECT(blockNo, GSN_NEXT_SCANCONF, signal, signalLength);
jamEntry();
// next time look for next entry
scan.m_state = ScanOp::Next;
return;
}
if (scan.m_state == ScanOp::Last ||
scan.m_state == ScanOp::Invalid) {
jam();
NextScanConf* const conf = (NextScanConf*)signal->getDataPtrSend();
conf->scanPtr = scan.m_userPtr;
conf->accOperationPtr = RNIL;
conf->fragId = RNIL;
unsigned signalLength = 3;
sendSignal(scanPtr.p->m_userRef, GSN_NEXT_SCANCONF,
signal, signalLength, JBB);
return;
}
ndbrequire(false);
}
void
Dbtup::scanFirst(Signal* signal, ScanOpPtr scanPtr)
{
ScanOp& scan = *scanPtr.p;
// set to first fragment, first page, first tuple
PagePos& pos = scan.m_scanPos;
pos.m_fragId = scan.m_fragId;
pos.m_fragBit = 0;
pos.m_pageId = 0;
pos.m_tupleNo = 0;
// just before
pos.m_match = false;
// let scanNext() do the work
scan.m_state = ScanOp::Next;
}
// TODO optimize this + index build
void
Dbtup::scanNext(Signal* signal, ScanOpPtr scanPtr)
{
ScanOp& scan = *scanPtr.p;
PagePos& pos = scan.m_scanPos;
TablerecPtr tablePtr;
tablePtr.i = scan.m_tableId;
ptrCheckGuard(tablePtr, cnoOfTablerec, tablerec);
while (true) {
// TODO time-slice here after X loops
jam();
// get fragment
if (pos.m_fragBit == 2) {
jam();
scan.m_state = ScanOp::Last;
break;
}
ndbrequire(pos.m_fragBit <= 1);
FragrecordPtr fragPtr;
fragPtr.i = scan.m_fragPtrI[pos.m_fragBit];
ptrCheckGuard(fragPtr, cnoOfFragrec, fragrecord);
Fragrecord& frag = *fragPtr.p;
// get page
PagePtr pagePtr;
if (pos.m_pageId >= frag.noOfPages) {
jam();
pos.m_fragBit++;
pos.m_pageId = 0;
pos.m_tupleNo = 0;
pos.m_match = false;
continue;
}
Uint32 realPageId = getRealpid(fragPtr.p, pos.m_pageId);
pagePtr.i = realPageId;
ptrCheckGuard(pagePtr, cnoOfPage, page);
const Uint32 pageState = pagePtr.p->pageWord[ZPAGE_STATE_POS];
if (pageState != ZTH_MM_FREE &&
pageState != ZTH_MM_FULL) {
jam();
pos.m_pageId++;
pos.m_tupleNo = 0;
pos.m_match = false;
continue;
}
// get next tuple
if (pos.m_match)
pos.m_tupleNo++;
pos.m_match = true;
const Uint32 tupheadsize = tablePtr.p->tupheadsize;
Uint32 pageOffset = ZPAGE_HEADER_SIZE + pos.m_tupleNo * tupheadsize;
if (pageOffset + tupheadsize > ZWORDS_ON_PAGE) {
jam();
pos.m_pageId++;
pos.m_tupleNo = 0;
pos.m_match = false;
continue;
}
// skip over free tuple
bool isFree = false;
if (pageState == ZTH_MM_FREE) {
jam();
if ((pagePtr.p->pageWord[pageOffset] >> 16) == tupheadsize) {
Uint32 nextTuple = pagePtr.p->pageWord[ZFREELIST_HEADER_POS] >> 16;
while (nextTuple != 0) {
jam();
if (nextTuple == pageOffset) {
jam();
isFree = true;
break;
}
nextTuple = pagePtr.p->pageWord[nextTuple] & 0xffff;
}
}
}
if (isFree) {
jam();
continue;
}
// TODO check for operation and return latest in own tx
scan.m_state = ScanOp::Locked;
break;
}
}
void
Dbtup::scanClose(Signal* signal, ScanOpPtr scanPtr)
{
NextScanConf* const conf = (NextScanConf*)signal->getDataPtrSend();
conf->scanPtr = scanPtr.p->m_userPtr;
conf->accOperationPtr = RNIL;
conf->fragId = RNIL;
unsigned signalLength = 3;
sendSignal(scanPtr.p->m_userRef, GSN_NEXT_SCANCONF,
signal, signalLength, JBB);
releaseScanOp(scanPtr);
}
void
Dbtup::releaseScanOp(ScanOpPtr& scanPtr)
{
FragrecordPtr fragPtr;
fragPtr.i = scanPtr.p->m_fragPtrI[0];
ptrCheckGuard(fragPtr, cnoOfFragrec, fragrecord);
fragPtr.p->m_scanList.release(scanPtr);
}
| Thread |
|---|
| • bk commit into 5.0 tree (pekka:1.1791) | pekka | 7 Apr |