List:Internals« Previous MessageNext Message »
From:pekka Date:April 7 2005 9:28am
Subject:bk commit into 5.0 tree (pekka:1.1791)
View as plain text  
Below is the list of changes that have just been committed into a local
5.0 repository of pekka. When pekka does a push these changes will
be propagated to the main repository and, within 24 hours after the
push, to the public repository.
For information on how to access the public repository
see http://dev.mysql.com/doc/mysql/en/installing-source-tree.html

ChangeSet
  1.1791 05/04/07 11:28:43 pekka@stripped +1 -0
  ndb - DbtupScan.cpp added

  ndb/src/kernel/blocks/dbtup/DbtupScan.cpp
    1.1 05/04/07 11:27:56 pekka@stripped +314 -0

  ndb/src/kernel/blocks/dbtup/DbtupScan.cpp
    1.0 05/04/07 11:27:56 pekka@stripped +0 -0
    BitKeeper file /export/space/pekka/ndb/version/my50-ndb/ndb/src/kernel/blocks/dbtup/DbtupScan.cpp

# This is a BitKeeper patch.  What follows are the unified diffs for the
# set of deltas contained in the patch.  The rest of the patch, the part
# that BitKeeper cares about, is below these diffs.
# User:	pekka
# Host:	clam.ndb.mysql.com
# Root:	/export/space/pekka/ndb/version/my50-ndb
--- New file ---
+++ ndb/src/kernel/blocks/dbtup/DbtupScan.cpp	05/04/07 11:27:56
/* Copyright (C) 2003 MySQL AB

   This program is free software; you can redistribute it and/or modify
   it under the terms of the GNU General Public License as published by
   the Free Software Foundation; either version 2 of the License, or
   (at your option) any later version.

   This program is distributed in the hope that it will be useful,
   but WITHOUT ANY WARRANTY; without even the implied warranty of
   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
   GNU General Public License for more details.

   You should have received a copy of the GNU General Public License
   along with this program; if not, write to the Free Software
   Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA */

#define DBTUP_C
#include "Dbtup.hpp"
#include <signaldata/AccScan.hpp>
#include <signaldata/NextScan.hpp>

#undef jam
#undef jamEntry
#define jam() { jamLine(32000 + __LINE__); }
#define jamEntry() { jamEntryLine(32000 + __LINE__); }

void
Dbtup::execACC_SCANREQ(Signal* signal)
{
  jamEntry();
  const AccScanReq reqCopy = *(const AccScanReq*)signal->getDataPtr();
  const AccScanReq* const req = &reqCopy;
  ScanOpPtr scanPtr;
  scanPtr.i = RNIL;
  do {
    // find table and fragments
    TablerecPtr tablePtr;
    tablePtr.i = req->tableId;
    ptrCheckGuard(tablePtr, cnoOfTablerec, tablerec);
    FragrecordPtr fragPtr[2];
    Uint32 fragId = req->fragmentNo;
    fragPtr[0].i = fragPtr[1].i = RNIL;
    getFragmentrec(fragPtr[0], fragId | 0, tablePtr.p);
    getFragmentrec(fragPtr[1], fragId | 1, tablePtr.p);
    ndbrequire(fragPtr[0].i != RNIL && fragPtr[1].i != RNIL);
    Fragrecord& frag = *fragPtr[0].p;
    // seize from pool and link to per-fragment list
    if (! frag.m_scanList.seize(scanPtr)) {
      jam();
      break;
    }
    new (scanPtr.p) ScanOp();
    ScanOp& scan = *scanPtr.p;
    scan.m_state = ScanOp::First;
    scan.m_userPtr = req->senderData;
    scan.m_userRef = req->senderRef;
    scan.m_tableId = tablePtr.i;
    scan.m_fragId = frag.fragmentId;
    scan.m_fragPtrI[0] = fragPtr[0].i;
    scan.m_fragPtrI[1] = fragPtr[1].i;
    scan.m_transId1 = req->transId1;
    scan.m_transId2 = req->transId2;
    // conf
    AccScanConf* const conf = (AccScanConf*)signal->getDataPtrSend();
    conf->scanPtr = req->senderData;
    conf->accPtr = scanPtr.i;
    conf->flag = AccScanConf::ZNOT_EMPTY_FRAGMENT;
    sendSignal(req->senderRef, GSN_ACC_SCANCONF, signal,
        AccScanConf::SignalLength, JBB);
    return;
  } while (0);
  if (scanPtr.i != RNIL) {
    jam();
    releaseScanOp(scanPtr);
  }
  // LQH does not handle REF
  signal->theData[0] = 0x313;
  sendSignal(req->senderRef, GSN_ACC_SCANREF, signal, 1, JBB);
}

void
Dbtup::execNEXT_SCANREQ(Signal* signal)
{
  jamEntry();
  const NextScanReq reqCopy = *(const NextScanReq*)signal->getDataPtr();
  const NextScanReq* const req = &reqCopy;
  ScanOpPtr scanPtr;
  c_scanOpPool.getPtr(scanPtr, req->accPtr);
  ScanOp& scan = *scanPtr.p;
  FragrecordPtr fragPtr;
  fragPtr.i = scan.m_fragPtrI[0];
  ptrCheckGuard(fragPtr, cnoOfFragrec, fragrecord);
  Fragrecord& frag = *fragPtr.p;
  switch (req->scanFlag) {
  case NextScanReq::ZSCAN_NEXT:
    jam();
    break;
  case NextScanReq::ZSCAN_NEXT_COMMIT:
    jam();
    break;
  case NextScanReq::ZSCAN_COMMIT:
    jam();
    {
      NextScanConf* const conf = (NextScanConf*)signal->getDataPtrSend();
      conf->scanPtr = scan.m_userPtr;
      unsigned signalLength = 1;
      sendSignal(scanPtr.p->m_userRef, GSN_NEXT_SCANCONF,
          signal, signalLength, JBB);
      return;
    }
    break;
  case NextScanReq::ZSCAN_CLOSE:
    jam();
    scanClose(signal, scanPtr);
    return;
  case NextScanReq::ZSCAN_NEXT_ABORT:
    jam();
  default:
    jam();
    ndbrequire(false);
    break;
  }
  // start looking for next scan result
  AccCheckScan* checkReq = (AccCheckScan*)signal->getDataPtrSend();
  checkReq->accPtr = scanPtr.i;
  checkReq->checkLcpStop = AccCheckScan::ZNOT_CHECK_LCP_STOP;
  EXECUTE_DIRECT(DBTUP, GSN_ACC_CHECK_SCAN, signal, AccCheckScan::SignalLength);
  jamEntry();
}

void
Dbtup::execACC_CHECK_SCAN(Signal* signal)
{
  jamEntry();
  const AccCheckScan reqCopy = *(const AccCheckScan*)signal->getDataPtr();
  const AccCheckScan* const req = &reqCopy;
  ScanOpPtr scanPtr;
  c_scanOpPool.getPtr(scanPtr, req->accPtr);
  ScanOp& scan = *scanPtr.p;
  FragrecordPtr fragPtr;
  fragPtr.i = scan.m_fragPtrI[0];
  ptrCheckGuard(fragPtr, cnoOfFragrec, fragrecord);
  Fragrecord& frag = *fragPtr.p;
  if (req->checkLcpStop == AccCheckScan::ZCHECK_LCP_STOP) {
    jam();
    signal->theData[0] = scan.m_userPtr;
    signal->theData[1] = true;
    EXECUTE_DIRECT(DBLQH, GSN_CHECK_LCP_STOP, signal, 2);
    jamEntry();
    return;
  }
  if (scan.m_state == ScanOp::First) {
    jam();
    scanFirst(signal, scanPtr);
  }
  if (scan.m_state == ScanOp::Next) {
    jam();
    scanNext(signal, scanPtr);
  }
  if (scan.m_state == ScanOp::Locked) {
    jam();
    const PagePos& pos = scan.m_scanPos;
    NextScanConf* const conf = (NextScanConf*)signal->getDataPtrSend();
    conf->accOperationPtr = (Uint32)-1; // no lock returned
    conf->fragId = frag.fragmentId | pos.m_fragBit;
    conf->localKey[0] = (pos.m_pageId << MAX_TUPLES_BITS) |
                        (pos.m_tupleNo << 1);
    conf->localKey[1] = 0;
    conf->localKeyLength = 1;
    unsigned signalLength = 6;
    Uint32 blockNo = refToBlock(scan.m_userRef);
    EXECUTE_DIRECT(blockNo, GSN_NEXT_SCANCONF, signal, signalLength);
    jamEntry();
    // next time look for next entry
    scan.m_state = ScanOp::Next;
    return;
  }
  if (scan.m_state == ScanOp::Last ||
      scan.m_state == ScanOp::Invalid) {
    jam();
    NextScanConf* const conf = (NextScanConf*)signal->getDataPtrSend();
    conf->scanPtr = scan.m_userPtr;
    conf->accOperationPtr = RNIL;
    conf->fragId = RNIL;
    unsigned signalLength = 3;
    sendSignal(scanPtr.p->m_userRef, GSN_NEXT_SCANCONF,
        signal, signalLength, JBB);
    return;
  }
  ndbrequire(false);
}

void
Dbtup::scanFirst(Signal* signal, ScanOpPtr scanPtr)
{
  ScanOp& scan = *scanPtr.p;
  // set to first fragment, first page, first tuple
  PagePos& pos = scan.m_scanPos;
  pos.m_fragId = scan.m_fragId;
  pos.m_fragBit = 0;
  pos.m_pageId = 0;
  pos.m_tupleNo = 0;
  // just before
  pos.m_match = false;
  // let scanNext() do the work
  scan.m_state = ScanOp::Next;
}

// TODO optimize this + index build
void
Dbtup::scanNext(Signal* signal, ScanOpPtr scanPtr)
{
  ScanOp& scan = *scanPtr.p;
  PagePos& pos = scan.m_scanPos;
  TablerecPtr tablePtr;
  tablePtr.i = scan.m_tableId;
  ptrCheckGuard(tablePtr, cnoOfTablerec, tablerec);
  while (true) {
    // TODO time-slice here after X loops
    jam();
    // get fragment
    if (pos.m_fragBit == 2) {
      jam();
      scan.m_state = ScanOp::Last;
      break;
    }
    ndbrequire(pos.m_fragBit <= 1);
    FragrecordPtr fragPtr;
    fragPtr.i = scan.m_fragPtrI[pos.m_fragBit];
    ptrCheckGuard(fragPtr, cnoOfFragrec, fragrecord);
    Fragrecord& frag = *fragPtr.p;
    // get page
    PagePtr pagePtr;
    if (pos.m_pageId >= frag.noOfPages) {
      jam();
      pos.m_fragBit++;
      pos.m_pageId = 0;
      pos.m_tupleNo = 0;
      pos.m_match = false;
      continue;
    }
    Uint32 realPageId = getRealpid(fragPtr.p, pos.m_pageId);
    pagePtr.i = realPageId;
    ptrCheckGuard(pagePtr, cnoOfPage, page);
    const Uint32 pageState = pagePtr.p->pageWord[ZPAGE_STATE_POS];
    if (pageState != ZTH_MM_FREE &&
        pageState != ZTH_MM_FULL) {
      jam();
      pos.m_pageId++;
      pos.m_tupleNo = 0;
      pos.m_match = false;
      continue;
    }
    // get next tuple
    if (pos.m_match)
      pos.m_tupleNo++;
    pos.m_match = true;
    const Uint32 tupheadsize = tablePtr.p->tupheadsize;
    Uint32 pageOffset = ZPAGE_HEADER_SIZE + pos.m_tupleNo * tupheadsize;
    if (pageOffset + tupheadsize > ZWORDS_ON_PAGE) {
      jam();
      pos.m_pageId++;
      pos.m_tupleNo = 0;
      pos.m_match = false;
      continue;
    }
    // skip over free tuple
    bool isFree = false;
    if (pageState == ZTH_MM_FREE) {
      jam();
      if ((pagePtr.p->pageWord[pageOffset] >> 16) == tupheadsize) {
        Uint32 nextTuple = pagePtr.p->pageWord[ZFREELIST_HEADER_POS] >> 16;
        while (nextTuple != 0) {
          jam();
          if (nextTuple == pageOffset) {
            jam();
            isFree = true;
            break;
          }
          nextTuple = pagePtr.p->pageWord[nextTuple] & 0xffff;
        }
      }
    }
    if (isFree) {
      jam();
      continue;
    }
    // TODO check for operation and return latest in own tx
    scan.m_state = ScanOp::Locked;
    break;
  }
}

void
Dbtup::scanClose(Signal* signal, ScanOpPtr scanPtr)
{
  NextScanConf* const conf = (NextScanConf*)signal->getDataPtrSend();
  conf->scanPtr = scanPtr.p->m_userPtr;
  conf->accOperationPtr = RNIL;
  conf->fragId = RNIL;
  unsigned signalLength = 3;
  sendSignal(scanPtr.p->m_userRef, GSN_NEXT_SCANCONF,
      signal, signalLength, JBB);
  releaseScanOp(scanPtr);
}

void
Dbtup::releaseScanOp(ScanOpPtr& scanPtr)
{
  FragrecordPtr fragPtr;
  fragPtr.i = scanPtr.p->m_fragPtrI[0];
  ptrCheckGuard(fragPtr, cnoOfFragrec, fragrecord);
  fragPtr.p->m_scanList.release(scanPtr);
}

Thread
bk commit into 5.0 tree (pekka:1.1791)pekka7 Apr