List: Internals    « Previous Message | Next Message »
From: pekka    Date: November 10 2005 11:05am
Subject: bk commit into 5.1 tree (pekka:1.1952)
View as plain text  
Below is the list of changes that have just been committed into a local
5.1 repository of pekka. When pekka does a push these changes will
be propagated to the main repository and, within 24 hours after the
push, to the public repository.
For information on how to access the public repository
see http://dev.mysql.com/doc/mysql/en/installing-source-tree.html

ChangeSet
  1.1952 05/11/10 12:05:29 pekka@stripped +12 -0
  ndb - wl#2455 tup scan: disk & locking

  storage/ndb/tools/select_all.cpp
    1.25 05/11/10 12:04:37 pekka@stripped +5 -4
    wl#2455 tup scan: disk & locking

  storage/ndb/test/ndbapi/testScan.cpp
    1.20 05/11/10 12:04:37 pekka@stripped +0 -11
    wl#2455 tup scan: disk & locking

  storage/ndb/src/ndbapi/NdbScanOperation.cpp
    1.74 05/11/10 12:04:37 pekka@stripped +11 -4
    wl#2455 tup scan: disk & locking

  storage/ndb/src/kernel/blocks/tsman.hpp
    1.2 05/11/10 12:04:37 pekka@stripped +13 -0
    wl#2455 tup scan: disk & locking

  storage/ndb/src/kernel/blocks/tsman.cpp
    1.2 05/11/10 12:04:37 pekka@stripped +51 -0
    wl#2455 tup scan: disk & locking

  storage/ndb/src/kernel/blocks/pgman.cpp
    1.2 05/11/10 12:04:37 pekka@stripped +2 -2
    wl#2455 tup scan: disk & locking

  storage/ndb/src/kernel/blocks/ndbfs/AsyncFile.cpp
    1.25 05/11/10 12:04:37 pekka@stripped +2 -0
    wl#2455 tup scan: disk & locking

  storage/ndb/src/kernel/blocks/dbtup/DbtupScan.cpp
    1.7 05/11/10 12:04:37 pekka@stripped +690 -141
    wl#2455 tup scan: disk & locking

  storage/ndb/src/kernel/blocks/dbtup/DbtupGen.cpp
    1.23 05/11/10 12:04:37 pekka@stripped +17 -3
    wl#2455 tup scan: disk & locking

  storage/ndb/src/kernel/blocks/dbtup/DbtupDiskAlloc.cpp
    1.3 05/11/10 12:04:37 pekka@stripped +3 -1
    wl#2455 tup scan: disk & locking

  storage/ndb/src/kernel/blocks/dbtup/Dbtup.hpp
    1.31 05/11/10 12:04:37 pekka@stripped +98 -23
    wl#2455 tup scan: disk & locking

  storage/ndb/include/kernel/signaldata/AccLock.hpp
    1.3 05/11/10 12:04:37 pekka@stripped +2 -1
    wl#2455 tup scan: disk & locking

# This is a BitKeeper patch.  What follows are the unified diffs for the
# set of deltas contained in the patch.  The rest of the patch, the part
# that BitKeeper cares about, is below these diffs.
# User:	pekka
# Host:	clam.ndb.mysql.com
# Root:	/export/space/pekka/ndb/version/my51-ts

--- 1.2/storage/ndb/src/kernel/blocks/dbtup/DbtupDiskAlloc.cpp	2005-11-08 17:30:36 +01:00
+++ 1.3/storage/ndb/src/kernel/blocks/dbtup/DbtupDiskAlloc.cpp	2005-11-10 12:04:37 +01:00
@@ -20,7 +20,8 @@
 Dbtup::Disk_alloc_info::Disk_alloc_info(const Tablerec* tabPtrP, 
 					Uint32 extent_size)
 {
-  m_curr_extent_info_ptr_i= RNIL; 
+  m_extent_size = extent_size;
+  m_curr_extent_info_ptr_i = RNIL; 
   if (tabPtrP->m_no_of_disk_attributes == 0)
     return;
   
@@ -278,6 +279,7 @@
       
       int pages= err;
       ndbout << "allocated " << pages << " pages: " << ext.p->m_key << endl;
+      ext.p->m_first_page_no = ext.p->m_key.m_page_no;
       bzero(ext.p->m_free_page_count, sizeof(ext.p->m_free_page_count));
       ext.p->m_free_space= alloc.m_page_free_bits_map[0] * pages; 
       ext.p->m_free_page_count[0]= pages; // All pages are "free"-est

--- 1.1/storage/ndb/src/kernel/blocks/pgman.cpp	2005-11-07 12:19:14 +01:00
+++ 1.2/storage/ndb/src/kernel/blocks/pgman.cpp	2005-11-10 12:04:37 +01:00
@@ -1359,8 +1359,8 @@
 Pgman::fsreadreq(Signal* signal, Ptr<Page_entry> ptr)
 {
   File_map::ConstDataBufferIterator it;
-  m_file_map.first(it);
-  m_file_map.next(it, ptr.p->m_file_no);
+  bool ret = m_file_map.first(it) && m_file_map.next(it, ptr.p->m_file_no);
+  ndbrequire(ret);
   Uint32 fd = * it.data;
 
   ndbrequire(ptr.p->m_page_no > 0);

--- 1.1/storage/ndb/src/kernel/blocks/tsman.cpp	2005-11-07 12:19:15 +01:00
+++ 1.2/storage/ndb/src/kernel/blocks/tsman.cpp	2005-11-10 12:04:37 +01:00
@@ -1615,6 +1615,57 @@
 }
 
 int
+Tsman::get_page_free_bits(Signal* signal, Local_key *key, unsigned* bits)
+{
+  jamEntry();
+
+  /**
+   * XXX make into subroutine
+   */   
+  Ptr<Datafile> file_ptr;
+  Datafile file_key;
+  file_key.m_file_no = key->m_file_no;
+  ndbrequire(m_file_hash.find(file_ptr, file_key));
+
+  Uint32 size = file_ptr.p->m_extent_size;
+  Uint32 data_off = file_ptr.p->m_online.m_offset_data_pages;
+  Uint32 eh_words = File_formats::Datafile::extent_header_words(size);
+  Uint32 per_page = File_formats::Datafile::EXTENT_PAGE_WORDS/eh_words;
+  Uint32 SZ= File_formats::Datafile::EXTENT_HEADER_BITMASK_BITS_PER_PAGE;
+  
+  Uint32 extent = (key->m_page_no - data_off) / size + per_page;
+  Uint32 page_no = extent / per_page;
+  Uint32 extent_no = extent % per_page;
+  
+  Page_cache_client::Request preq;
+  preq.m_page.m_page_no = page_no;
+  preq.m_page.m_file_no = key->m_file_no;
+  
+  /**
+   * Handling of unmapped extent header pages is not implemented
+   */
+  int flags = Page_cache_client::COMMIT_REQ;
+  int real_page_id;
+  if ((real_page_id = m_page_cache_client.get_page(signal, preq, flags)) > 0)
+  {
+    GlobalPage* ptr_p = m_page_cache_client.m_ptr.p;
+    
+    File_formats::Datafile::Extent_page* page = 
+      (File_formats::Datafile::Extent_page*)ptr_p;
+    File_formats::Datafile::Extent_header* header = 
+      page->get_header(extent_no, size);
+    
+    ndbrequire(header->m_table != RNIL);
+
+    Uint32 page_no_in_extent = (key->m_page_no - data_off) % size;
+    *bits = header->get_free_bits(page_no_in_extent);
+    return 0;
+  }
+  
+  return AllocExtentReq::UnmappedExtentPageIsNotImplemented;
+}
+
+int
 Tsman::unmap_page(Signal* signal, Local_key *key)
 {
   jamEntry();

--- 1.1/storage/ndb/src/kernel/blocks/tsman.hpp	2005-11-07 12:19:15 +01:00
+++ 1.2/storage/ndb/src/kernel/blocks/tsman.hpp	2005-11-10 12:04:37 +01:00
@@ -196,6 +196,7 @@
   void create_file_ref(Signal*, Ptr<Tablespace>, Ptr<Datafile>, 
 		       Uint32,Uint32,Uint32);
   int update_page_free_bits(Signal*, Local_key*, unsigned bits, Uint64 lsn);
+  int get_page_free_bits(Signal*, Local_key*, unsigned* bits);
   int unmap_page(Signal*, Local_key*);
   int restart_undo_page_free_bits(Signal*, Local_key*, unsigned, Uint64);
 
@@ -267,6 +268,11 @@
   int update_page_free_bits(Local_key*, unsigned bits, Uint64 lsn);
 
   /**
+   * Get page free bits
+   */
+  int get_page_free_bits(Local_key*, unsigned* bits);
+
+  /**
    * Update unlogged page free bit
    */
   int unmap_page(Local_key*);
@@ -350,6 +356,13 @@
 					 unsigned bits, Uint64 lsn)
 {
   return m_tsman->update_page_free_bits(m_signal, key, bits, lsn);
+}
+
+inline
+int
+Tablespace_client::get_page_free_bits(Local_key *key, unsigned* bits)
+{
+  return m_tsman->get_page_free_bits(m_signal, key, bits);
 }
 
 inline

--- 1.2/storage/ndb/include/kernel/signaldata/AccLock.hpp	2005-04-08 02:43:49 +02:00
+++ 1.3/storage/ndb/include/kernel/signaldata/AccLock.hpp	2005-11-10 12:04:37 +01:00
@@ -24,8 +24,9 @@
  * via ACCKEYCONF.
  */
 class AccLockReq {
-  friend class Dbtux;
   friend class Dbacc;
+  friend class Dbtup;
+  friend class Dbtux;
   friend bool printACC_LOCKREQ(FILE *, const Uint32*, Uint32, Uint16);
 public:
   enum RequestType {    // first byte

--- 1.30/storage/ndb/src/kernel/blocks/dbtup/Dbtup.hpp	2005-11-07 13:28:15 +01:00
+++ 1.31/storage/ndb/src/kernel/blocks/dbtup/Dbtup.hpp	2005-11-10 12:04:37 +01:00
@@ -235,8 +235,9 @@
 #define ZREL_FRAG 7
 #define ZREPORT_MEMORY_USAGE 8
 #define ZBUILD_INDEX 9
-#define ZFREE_EXTENT 10
-#define ZUNMAP_PAGES 11
+#define ZTUP_SCAN 10
+#define ZFREE_EXTENT 11
+#define ZUNMAP_PAGES 12
 
 #define ZSCAN_PROCEDURE 0
 #define ZCOPY_PROCEDURE 2
@@ -335,40 +336,104 @@
 };
 typedef Ptr<Fragoperrec> FragoperrecPtr;
 
-  // Position for use by scan
-  struct PagePos {
+
+  typedef Tup_page Page;
+  typedef Ptr<Page> PagePtr;
+
+  // Scan position
+  struct ScanPos {
+    enum Get {
+      Get_undef = 0,
+      Get_next_page,
+      Get_page,
+      Get_next_page_mm,
+      Get_page_mm,
+      Get_next_page_dd,
+      Get_page_dd,
+      Get_next_tuple,
+      Get_tuple,
+      Get_next_tuple_fs,
+      Get_tuple_fs,
+      Get_next_tuple_vs,
+      Get_tuple_vs
+    };
+    Get m_get;                  // entry point in scanNext
+    Local_key m_key;            // scan position pointer MM or DD
+    Page* m_page;               // scanned MM or DD (cache) page
+    Local_key m_key_mm;         // MM local key returned
+    Uint32 m_realpid_mm;        // MM real page id
     Uint32 m_extent_info_ptr_i;
-    Local_key m_key;
-    bool m_match;
   };
 
-  // Tup scan op (compare Dbtux::ScanOp)
+  // Scan Lock
+  struct ScanLock {
+    Uint32 m_accLockOp;
+    union {
+      Uint32 nextPool;
+      Uint32 nextList;
+    };
+    Uint32 prevList;
+  };
+  typedef Ptr<ScanLock> ScanLockPtr;
+  ArrayPool<ScanLock> c_scanLockPool;
+
+  // Tup scan, similar to Tux scan.  Later some of this could
+  // be moved to common superclass.
   struct ScanOp {
-    ScanOp() {}
-    enum {      // state
+    ScanOp() :
+      m_state(Undef),
+      m_bits(0),
+      m_userPtr(RNIL),
+      m_userRef(RNIL),
+      m_tableId(RNIL),
+      m_fragId(~(Uint32)0),
+      m_fragPtrI(RNIL),
+      m_transId1(0),
+      m_transId2(0),
+      m_savePointId(0),
+      m_accLockOp(RNIL)
+    {}
+
+    enum State {
       Undef = 0,
       First = 1,                // before first entry
-      Locked = 4,               // at current entry (no lock needed)
+      Current = 2,              // at current before locking
+      Blocked = 3,              // at current waiting for ACC lock
+      Locked = 4,               // at current and locked or no lock needed
       Next = 5,                 // looking for next extry
       Last = 6,                 // after last entry
+      Aborting = 7,             // lock wait at scan close
       Invalid = 9               // cannot return REF to LQH currently
     };
     Uint16 m_state;
 
-    STATIC_CONST( SCAN_DD    = 0x1 );
-    STATIC_CONST( SCAN_VS    = 0x2 );
-    STATIC_CONST( SCAN_LCP   = 0x4 );
-    STATIC_CONST( SCAN_DD_VS = 0x8 );
+    enum Bits {
+      SCAN_DD        = 0x01,        // scan disk pages
+      SCAN_VS        = 0x02,        // page format is var size
+      SCAN_LCP       = 0x04,        // LCP mem page scan
+      SCAN_LOCK_SH   = 0x10,        // lock mode shared
+      SCAN_LOCK_EX   = 0x20,        // lock mode exclusive
+      SCAN_LOCK_WAIT = 0x40,        // lock wait
+      // any lock mode
+      SCAN_LOCK      = SCAN_LOCK_SH | SCAN_LOCK_EX
+    };
     Uint16 m_bits;
     
     Uint32 m_userPtr;           // scanptr.i in LQH
     Uint32 m_userRef;
     Uint32 m_tableId;
-    Uint32 m_fragId;            // "base" fragment id
+    Uint32 m_fragId;
     Uint32 m_fragPtrI;
     Uint32 m_transId1;
     Uint32 m_transId2;
-    PagePos m_scanPos;
+    Uint32 m_savePointId;
+    // lock waited for or obtained and not yet passed to LQH
+    Uint32 m_accLockOp;
+
+    ScanPos m_scanPos;
+
+    DLFifoList<ScanLock>::Head m_accLockOps;
+
     union {
     Uint32 nextPool;
     Uint32 nextList;
@@ -378,8 +443,18 @@
   typedef Ptr<ScanOp> ScanOpPtr;
   ArrayPool<ScanOp> c_scanOpPool;
 
-  typedef Tup_page Page;
-  typedef Ptr<Page> PagePtr;
+  void scanReply(Signal*, ScanOpPtr scanPtr);
+  void scanFirst(Signal*, ScanOpPtr scanPtr);
+  bool scanNext(Signal*, ScanOpPtr scanPtr);
+  void scanCont(Signal*, ScanOpPtr scanPtr);
+  void disk_page_tup_scan_callback(Signal*, Uint32 scanPtrI, Uint32 page_i);
+  void scanClose(Signal*, ScanOpPtr scanPtr);
+  void addAccLockOp(ScanOp& scan, Uint32 accLockOp);
+  void removeAccLockOp(ScanOp& scan, Uint32 accLockOp);
+  void releaseScanOp(ScanOpPtr& scanPtr);
+
+  // for md5 of key (could maybe reuse existing temp buffer)
+  Uint64 c_dataBuffer[ZWORDS_ON_PAGE/2 + 1];
 
   struct Page_request 
   {
@@ -409,6 +484,7 @@
 
   struct Extent_info : public Extent_list_t
   {
+    Uint32 m_first_page_no;
     Local_key m_key;
     Uint32 m_free_space;
     Uint32 m_free_matrix_pos;
@@ -439,6 +515,7 @@
     Disk_alloc_info() {}
     Disk_alloc_info(const Tablerec* tabPtrP, 
 		    Uint32 extent_size_in_pages);
+    Uint32 m_extent_size;
     
     /**
      * Disk allocation
@@ -529,11 +606,6 @@
 };
 typedef Ptr<Fragrecord> FragrecordPtr;
 
-  void scanFirst(Signal* signal, Fragrecord*, ScanOpPtr scanPtr);
-  void scanNext(Signal* signal, Fragrecord*, ScanOpPtr scanPtr);
-  void scanClose(Signal* signal, ScanOpPtr scanPtr);
-  void releaseScanOp(ScanOpPtr& scanPtr);
-
 
 struct Operationrec {
   /*
@@ -1332,6 +1404,9 @@
   void execACC_SCANREQ(Signal* signal);
   void execNEXT_SCANREQ(Signal* signal);
   void execACC_CHECK_SCAN(Signal* signal);
+  void execACCKEYCONF(Signal* signal);
+  void execACCKEYREF(Signal* signal);
+  void execACC_ABORTCONF(Signal* signal);
 
 //------------------------------------------------------------------
 //------------------------------------------------------------------

--- 1.22/storage/ndb/src/kernel/blocks/dbtup/DbtupGen.cpp	2005-11-07 12:19:07 +01:00
+++ 1.23/storage/ndb/src/kernel/blocks/dbtup/DbtupGen.cpp	2005-11-10 12:04:37 +01:00
@@ -55,11 +55,11 @@
 Dbtup::Dbtup(const class Configuration & conf, Pgman* pgman)
   : SimulatedBlock(DBTUP, conf),
     c_lqh(0),
+    m_pgman(this, pgman),
+    c_extent_hash(c_extent_pool),
     c_storedProcPool(),
     c_buildIndexList(c_buildIndexPool),
-    c_undo_buffer(this),
-    m_pgman(this, pgman),
-    c_extent_hash(c_extent_pool)
+    c_undo_buffer(this)
 {
   BLOCK_CONSTRUCTOR(Dbtup);
 
@@ -101,6 +101,9 @@
   addRecSignal(GSN_ACC_SCANREQ, &Dbtup::execACC_SCANREQ);
   addRecSignal(GSN_NEXT_SCANREQ, &Dbtup::execNEXT_SCANREQ);
   addRecSignal(GSN_ACC_CHECK_SCAN, &Dbtup::execACC_CHECK_SCAN);
+  addRecSignal(GSN_ACCKEYCONF, &Dbtup::execACCKEYCONF);
+  addRecSignal(GSN_ACCKEYREF, &Dbtup::execACCKEYREF);
+  addRecSignal(GSN_ACC_ABORTCONF, &Dbtup::execACC_ABORTCONF);
 
   attrbufrec = 0;
   fragoperrec = 0;
@@ -197,6 +200,14 @@
     ljam();
     buildIndex(signal, dataPtr);
     break;
+  case ZTUP_SCAN:
+    ljam();
+    {
+      ScanOpPtr scanPtr;
+      c_scanOpPool.getPtr(scanPtr, dataPtr);
+      scanCont(signal, scanPtr);
+    }
+    return;
   case ZFREE_EXTENT:
   {
     ljam();
@@ -310,6 +321,9 @@
   Uint32 nScanOp;       // use TUX config for now
   ndbrequire(!ndb_mgm_get_int_parameter(p, CFG_TUX_SCAN_OP, &nScanOp));
   c_scanOpPool.setSize(nScanOp + 1);
+  Uint32 nScanBatch;
+  ndbrequire(!ndb_mgm_get_int_parameter(p, CFG_DB_BATCH_SIZE, &nScanBatch));
+  c_scanLockPool.setSize(nScanOp * nScanBatch);
 
   ScanOpPtr lcp;
   ndbrequire(c_scanOpPool.seize(lcp));

--- 1.24/storage/ndb/src/kernel/blocks/ndbfs/AsyncFile.cpp	2005-11-08 06:50:43 +01:00
+++ 1.25/storage/ndb/src/kernel/blocks/ndbfs/AsyncFile.cpp	2005-11-10 12:04:37 +01:00
@@ -339,11 +339,13 @@
 #endif
   }
   
+#ifndef NDB_NO_O_DIRECT  /* to allow tmpfs */
 #ifdef O_DIRECT
   if (flags & FsOpenReq::OM_DIRECT) 
   {
     new_flags |= O_DIRECT;
   }
+#endif
 #endif
   
   switch(flags & 0x3){

--- 1.73/storage/ndb/src/ndbapi/NdbScanOperation.cpp	2005-11-07 12:19:09 +01:00
+++ 1.74/storage/ndb/src/ndbapi/NdbScanOperation.cpp	2005-11-10 12:04:37 +01:00
@@ -161,6 +161,16 @@
   }
 
   m_keyInfo = lockExcl ? 1 : 0;
+  bool tupScan = (scan_flags & SF_TupScan);
+
+#if 1 // XXX temp for testing
+  { char* p = getenv("NDB_USE_TUPSCAN");
+    if (p != 0) {
+      unsigned n = atoi(p); // 0-10
+      if (::time(0) % 10 < n) tupScan = true;
+    }
+  }
+#endif
 
   bool rangeScan = false;
   if (m_accessTable->m_indexType == NdbDictionary::Index::OrderedIndex)
@@ -176,11 +186,8 @@
     theStatus = GetValue;
     theOperationType  = OpenRangeScanRequest;
     rangeScan = true;
-  }
-
-  bool tupScan = (scan_flags & SF_TupScan);
-  if (tupScan && rangeScan)
     tupScan = false;
+  }
   
   theParallelism = parallel;
 

--- 1.19/storage/ndb/test/ndbapi/testScan.cpp	2005-05-17 10:10:43 +02:00
+++ 1.20/storage/ndb/test/ndbapi/testScan.cpp	2005-11-10 12:04:37 +01:00
@@ -1162,17 +1162,6 @@
   STEP(runScanReadCommitted);
   FINALIZER(runClearTable);
 }
-TESTCASE("ScanTupReadCommitted240", 
-	 "Verify scan requirement: It should be possible to scan read committed with "\
-	 "parallelism, test with parallelism 240(240 would automatically be "\
-	 "downgraded to the maximum parallelism value for the current config). "\
-         "Scans TUP pages directly without using ACC."){
-  INITIALIZER(runLoadTable);
-  TC_PROPERTY("Parallelism", 240);
-  TC_PROPERTY("TupScan", 1);
-  STEP(runScanReadCommitted);
-  FINALIZER(runClearTable);
-}
 TESTCASE("ScanUpdate", 
 	 "Verify scan requirement: It should be possible "\
 	 "to update all records in a table without knowing their"\

--- 1.24/storage/ndb/tools/select_all.cpp	2005-11-07 12:19:10 +01:00
+++ 1.25/storage/ndb/tools/select_all.cpp	2005-11-10 12:04:37 +01:00
@@ -222,12 +222,14 @@
     }
 
     int rs;
+    unsigned scan_flags = 0;
+    if (_tup) scan_flags |= NdbScanOperation::SF_TupScan;
     switch(_lock + (3 * order)){
     case 1:
-      rs = pOp->readTuples(NdbScanOperation::LM_Read, 0, parallel);
+      rs = pOp->readTuples(NdbScanOperation::LM_Read, scan_flags, parallel);
       break;
     case 2:
-      rs = pOp->readTuples(NdbScanOperation::LM_Exclusive, 0, parallel);
+      rs = pOp->readTuples(NdbScanOperation::LM_Exclusive, scan_flags, parallel);
       break;
     case 3:
       rs = pIOp->readTuples(NdbScanOperation::LM_CommittedRead, 0, parallel, 
@@ -241,8 +243,7 @@
       break;
     case 0:
     default:
-      rs = pOp->readTuples(NdbScanOperation::LM_CommittedRead, 
-			   _tup ? NdbScanOperation::SF_TupScan : 0, parallel);
+      rs = pOp->readTuples(NdbScanOperation::LM_CommittedRead, scan_flags, parallel);
       break;
     }
     if( rs != 0 ){

--- 1.6/storage/ndb/src/kernel/blocks/dbtup/DbtupScan.cpp	2005-11-07 12:19:07 +01:00
+++ 1.7/storage/ndb/src/kernel/blocks/dbtup/DbtupScan.cpp	2005-11-10 12:04:37 +01:00
@@ -18,12 +18,20 @@
 #include "Dbtup.hpp"
 #include <signaldata/AccScan.hpp>
 #include <signaldata/NextScan.hpp>
+#include <signaldata/AccLock.hpp>
+#include <md5_hash.hpp>
 
 #undef jam
 #undef jamEntry
 #define jam() { jamLine(32000 + __LINE__); }
 #define jamEntry() { jamEntryLine(32000 + __LINE__); }
 
+#ifdef VM_TRACE
+#define dbg(x) globalSignalLoggers.log x
+#else
+#define dbg(x)
+#endif
+
 void
 Dbtup::execACC_SCANREQ(Signal* signal)
 {
@@ -33,7 +41,7 @@
   ScanOpPtr scanPtr;
   scanPtr.i = RNIL;
   do {
-    // find table and fragments
+    // find table and fragment
     TablerecPtr tablePtr;
     tablePtr.i = req->tableId;
     ptrCheckGuard(tablePtr, cnoOfTablerec, tablerec);
@@ -43,49 +51,44 @@
     getFragmentrec(fragPtr, fragId, tablePtr.p);
     ndbrequire(fragPtr.i != RNIL);
     Fragrecord& frag = *fragPtr.p;
-    // seize from pool and link to per-fragment list
-
-    Uint32 bits= 0;
-    if(frag.m_lcp_scan_op != RNIL)
-    {
-      bits |= ScanOp::SCAN_LCP;
-      ndbrequire(frag.m_lcp_scan_op == c_lcp_scan_op);
-      c_scanOpPool.getPtr(scanPtr, frag.m_lcp_scan_op);
-    }
-    else 
-    {
+    // flags
+    Uint32 bits = 0;
+    if (frag.m_lcp_scan_op == RNIL) {
+      // seize from pool and link to per-fragment list
       LocalDLList<ScanOp> list(c_scanOpPool, frag.m_scanList);
-      if (! list.seize(scanPtr)) 
-      {
+      if (! list.seize(scanPtr)) {
 	jam();
 	break;
       }
+      if (tablePtr.p->m_attributes[DD].m_no_of_fixsize +
+          tablePtr.p->m_attributes[DD].m_no_of_varsize > 0) {
+        bits |= ScanOp::SCAN_DD;
+      }
+      bool mm = (bits & ScanOp::SCAN_DD);
+      if (tablePtr.p->m_attributes[mm].m_no_of_varsize > 0) {
+        // disk pages have fixed page format
+        if (! (bits & ScanOp::SCAN_DD))
+          bits |= ScanOp::SCAN_VS;
+      }
+      if (! AccScanReq::getReadCommittedFlag(req->requestInfo)) {
+        if (AccScanReq::getLockMode(req->requestInfo) == 0)
+          bits |= ScanOp::SCAN_LOCK_SH;
+        else
+          bits |= ScanOp::SCAN_LOCK_EX;
+      }
+    } else {
+      jam();
+      ndbrequire(frag.m_lcp_scan_op == c_lcp_scan_op);
+      c_scanOpPool.getPtr(scanPtr, frag.m_lcp_scan_op);
+      bits |= ScanOp::SCAN_LCP;
+      if (tablePtr.p->m_attributes[MM].m_no_of_varsize > 0) {
+        bits |= ScanOp::SCAN_VS;
+      }
     }
+    // set up scan op
     new (scanPtr.p) ScanOp();
     ScanOp& scan = *scanPtr.p;
     scan.m_state = ScanOp::First;
-    // TODO scan disk only if any scanned attribute on disk
-
-    if(! (bits & ScanOp::SCAN_LCP))
-    {
-      /**
-       * Remove this until disk scan has been implemented
-       */
-      if(tablePtr.p->m_attributes[DD].m_no_of_fixsize > 0 ||
-	 tablePtr.p->m_attributes[DD].m_no_of_varsize > 0)
-      {
-	bits |= ScanOp::SCAN_DD;
-	
-	if (tablePtr.p->m_attributes[DD].m_no_of_varsize > 0)
-	  bits |= ScanOp::SCAN_DD_VS;
-      }
-    }
-    
-    if(tablePtr.p->m_attributes[MM].m_no_of_varsize)
-    {
-      bits |= ScanOp::SCAN_VS;
-    }
-
     scan.m_bits = bits;
     scan.m_userPtr = req->senderData;
     scan.m_userRef = req->senderRef;
@@ -94,18 +97,17 @@
     scan.m_fragPtrI = fragPtr.i;
     scan.m_transId1 = req->transId1;
     scan.m_transId2 = req->transId2;
+    scan.m_savePointId = req->savePointId;
     // conf
     AccScanConf* const conf = (AccScanConf*)signal->getDataPtrSend();
     conf->scanPtr = req->senderData;
     conf->accPtr = scanPtr.i;
     conf->flag = AccScanConf::ZNOT_EMPTY_FRAGMENT;
-    sendSignal(req->senderRef, GSN_ACC_SCANCONF, signal,
-        AccScanConf::SignalLength, JBB);
-
+    sendSignal(req->senderRef, GSN_ACC_SCANCONF,
+        signal, AccScanConf::SignalLength, JBB);
     return;
   } while (0);
-  if (scanPtr.i != RNIL)
-  {
+  if (scanPtr.i != RNIL) {
     jam();
     releaseScanOp(scanPtr);
   }
@@ -129,10 +131,21 @@
     break;
   case NextScanReq::ZSCAN_NEXT_COMMIT:
     jam();
-    break;
   case NextScanReq::ZSCAN_COMMIT:
     jam();
-    {
+    if ((scan.m_bits & ScanOp::SCAN_LOCK) != 0) {
+      jam();
+      AccLockReq* const lockReq = (AccLockReq*)signal->getDataPtrSend();
+      lockReq->returnCode = RNIL;
+      lockReq->requestInfo = AccLockReq::Unlock;
+      lockReq->accOpPtr = req->accOperationPtr;
+      EXECUTE_DIRECT(DBACC, GSN_ACC_LOCKREQ,
+          signal, AccLockReq::UndoSignalLength);
+      jamEntry();
+      ndbrequire(lockReq->returnCode == AccLockReq::Success);
+      removeAccLockOp(scan, req->accOperationPtr);
+    }
+    if (req->scanFlag == NextScanReq::ZSCAN_COMMIT) {
       NextScanConf* const conf = (NextScanConf*)signal->getDataPtrSend();
       conf->scanPtr = scan.m_userPtr;
       unsigned signalLength = 1;
@@ -143,6 +156,35 @@
     break;
   case NextScanReq::ZSCAN_CLOSE:
     jam();
+    if (scan.m_bits & ScanOp::SCAN_LOCK_WAIT) {
+      jam();
+      ndbrequire(scan.m_accLockOp != RNIL);
+      // use ACC_ABORTCONF to flush out any reply in job buffer
+      AccLockReq* const lockReq = (AccLockReq*)signal->getDataPtrSend();
+      lockReq->returnCode = RNIL;
+      lockReq->requestInfo = AccLockReq::AbortWithConf;
+      lockReq->accOpPtr = scan.m_accLockOp;
+      EXECUTE_DIRECT(DBACC, GSN_ACC_LOCKREQ,
+          signal, AccLockReq::UndoSignalLength);
+      jamEntry();
+      ndbrequire(lockReq->returnCode == AccLockReq::Success);
+      scan.m_state = ScanOp::Aborting;
+      return;
+    }
+    if (scan.m_state == ScanOp::Locked) {
+      jam();
+      ndbrequire(scan.m_accLockOp != RNIL);
+      AccLockReq* const lockReq = (AccLockReq*)signal->getDataPtrSend();
+      lockReq->returnCode = RNIL;
+      lockReq->requestInfo = AccLockReq::Unlock;
+      lockReq->accOpPtr = scan.m_accLockOp;
+      EXECUTE_DIRECT(DBACC, GSN_ACC_LOCKREQ,
+          signal, AccLockReq::UndoSignalLength);
+      jamEntry();
+      ndbrequire(lockReq->returnCode == AccLockReq::Success);
+      scan.m_accLockOp = RNIL;
+    }
+    scan.m_state = ScanOp::Aborting;
     scanClose(signal, scanPtr);
     return;
   case NextScanReq::ZSCAN_NEXT_ABORT:
@@ -169,6 +211,7 @@
   ScanOpPtr scanPtr;
   c_scanOpPool.getPtr(scanPtr, req->accPtr);
   ScanOp& scan = *scanPtr.p;
+  // fragment
   FragrecordPtr fragPtr;
   fragPtr.i = scan.m_fragPtrI;
   ptrCheckGuard(fragPtr, cnoOfFragrec, fragrecord);
@@ -181,28 +224,161 @@
     jamEntry();
     return;
   }
+  if (scan.m_bits & ScanOp::SCAN_LOCK_WAIT) {
+    jam();
+    // LQH asks if we are waiting for lock and we tell it to ask again
+    NextScanConf* const conf = (NextScanConf*)signal->getDataPtrSend();
+    conf->scanPtr = scan.m_userPtr;
+    conf->accOperationPtr = RNIL;       // no tuple returned
+    conf->fragId = frag.fragmentId;
+    unsigned signalLength = 3;
+    // if TC has ordered scan close, it will be detected here
+    sendSignal(scan.m_userRef, GSN_NEXT_SCANCONF,
+        signal, signalLength, JBB);
+    return;     // stop
+  }
   if (scan.m_state == ScanOp::First) {
     jam();
-    scanFirst(signal, fragPtr.p, scanPtr);
+    scanFirst(signal, scanPtr);
   }
   if (scan.m_state == ScanOp::Next) {
     jam();
-    scanNext(signal, fragPtr.p, scanPtr);
+    bool immediate = scanNext(signal, scanPtr);
+    if (! immediate) {
+      jam();
+      // time-slicing via TUP or PGMAN
+      return;
+    }
+  }
+  scanReply(signal, scanPtr);
+}
+
+void
+Dbtup::scanReply(Signal* signal, ScanOpPtr scanPtr)
+{
+  ScanOp& scan = *scanPtr.p;
+  FragrecordPtr fragPtr;
+  fragPtr.i = scan.m_fragPtrI;
+  ptrCheckGuard(fragPtr, cnoOfFragrec, fragrecord);
+  Fragrecord& frag = *fragPtr.p;
+  // for reading tuple key in Current state
+  Uint32* pkData = (Uint32*)c_dataBuffer;
+  unsigned pkSize = 0;
+  if (scan.m_state == ScanOp::Current) {
+    // found an entry to return
+    jam();
+    ndbrequire(scan.m_accLockOp == RNIL);
+    if (scan.m_bits & ScanOp::SCAN_LOCK) {
+      jam();
+      // read tuple key - use TUX routine
+      const ScanPos& pos = scan.m_scanPos;
+      const Local_key& key_mm = pos.m_key_mm;
+      int ret = tuxReadPk(fragPtr.i, pos.m_realpid_mm, key_mm.m_page_idx,
+          pkData, false);
+      ndbrequire(ret > 0);
+      pkSize = ret;
+      dbg((DBTUP, "PK size=%d data=%08x", pkSize, pkData[0]));
+      // get read lock or exclusive lock
+      AccLockReq* const lockReq = (AccLockReq*)signal->getDataPtrSend();
+      lockReq->returnCode = RNIL;
+      lockReq->requestInfo = (scan.m_bits & ScanOp::SCAN_LOCK_SH) ?
+        AccLockReq::LockShared : AccLockReq::LockExclusive;
+      lockReq->accOpPtr = RNIL;
+      lockReq->userPtr = scanPtr.i;
+      lockReq->userRef = reference();
+      lockReq->tableId = scan.m_tableId;
+      lockReq->fragId = frag.fragmentId;
+      lockReq->fragPtrI = RNIL; // no cached frag ptr yet
+      lockReq->hashValue = md5_hash((Uint64*)pkData, pkSize);
+      lockReq->tupAddr = key_mm.ref();
+      lockReq->transId1 = scan.m_transId1;
+      lockReq->transId2 = scan.m_transId2;
+      EXECUTE_DIRECT(DBACC, GSN_ACC_LOCKREQ,
+          signal, AccLockReq::LockSignalLength);
+      jamEntry();
+      switch (lockReq->returnCode) {
+      case AccLockReq::Success:
+        jam();
+        scan.m_state = ScanOp::Locked;
+        scan.m_accLockOp = lockReq->accOpPtr;
+        break;
+      case AccLockReq::IsBlocked:
+        jam();
+        // normal lock wait
+        scan.m_state = ScanOp::Blocked;
+        scan.m_bits |= ScanOp::SCAN_LOCK_WAIT;
+        scan.m_accLockOp = lockReq->accOpPtr;
+        // LQH will wake us up
+        signal->theData[0] = scan.m_userPtr;
+        signal->theData[1] = true;
+        EXECUTE_DIRECT(DBLQH, GSN_CHECK_LCP_STOP, signal, 2);
+        jamEntry();
+        return;
+        break;
+      case AccLockReq::Refused:
+        jam();
+        // we cannot see deleted tuple (assert only)
+        ndbassert(false);
+        // skip it
+        scan.m_state = ScanOp::Next;
+        signal->theData[0] = scan.m_userPtr;
+        signal->theData[1] = true;
+        EXECUTE_DIRECT(DBLQH, GSN_CHECK_LCP_STOP, signal, 2);
+        jamEntry();
+        return;
+        break;
+      case AccLockReq::NoFreeOp:
+        jam();
+        // max ops should depend on max scans (assert only)
+        ndbassert(false);
+        // stay in Current state
+        scan.m_state = ScanOp::Current;
+        signal->theData[0] = scan.m_userPtr;
+        signal->theData[1] = true;
+        EXECUTE_DIRECT(DBLQH, GSN_CHECK_LCP_STOP, signal, 2);
+        jamEntry();
+        return;
+        break;
+      default:
+        ndbrequire(false);
+        break;
+      }
+    } else {
+      scan.m_state = ScanOp::Locked;
+    }
   }
   if (scan.m_state == ScanOp::Locked) {
+    // we have lock or do not need one
     jam();
-    const PagePos& pos = scan.m_scanPos;
+    // conf signal
     NextScanConf* const conf = (NextScanConf*)signal->getDataPtrSend();
     conf->scanPtr = scan.m_userPtr;
-    conf->accOperationPtr = (Uint32)-1; // no lock returned
+    // the lock is passed to LQH
+    Uint32 accLockOp = scan.m_accLockOp;
+    if (accLockOp != RNIL) {
+      scan.m_accLockOp = RNIL;
+      // remember it until LQH unlocks it
+      addAccLockOp(scan, accLockOp);
+    } else {
+      ndbrequire(! (scan.m_bits & ScanOp::SCAN_LOCK));
+      // operation RNIL in LQH would signal no tuple returned
+      accLockOp = (Uint32)-1;
+    }
+    const ScanPos& pos = scan.m_scanPos;
+    conf->accOperationPtr = accLockOp;
     conf->fragId = frag.fragmentId;
-    conf->localKey[0] = pos.m_key.ref();
+    conf->localKey[0] = pos.m_key_mm.ref();
     conf->localKey[1] = 0;
     conf->localKeyLength = 1;
     unsigned signalLength = 6;
-    Uint32 blockNo = refToBlock(scan.m_userRef);
-    EXECUTE_DIRECT(blockNo, GSN_NEXT_SCANCONF, signal, signalLength);
-    jamEntry();
+    if (scan.m_bits & ScanOp::SCAN_LOCK) {
+      sendSignal(scan.m_userRef, GSN_NEXT_SCANCONF,
+          signal, signalLength, JBB);
+    } else {
+      Uint32 blockNo = refToBlock(scan.m_userRef);
+      EXECUTE_DIRECT(blockNo, GSN_NEXT_SCANCONF, signal, signalLength);
+      jamEntry();
+    }
     // next time look for next entry
     scan.m_state = ScanOp::Next;
     return;
@@ -222,122 +398,461 @@
   ndbrequire(false);
 }
 
+/*
+ * ACCKEYCONF: a delayed lock request succeeded in ACC; theData[0] is the
+ * scan op index.  State Blocked: the lock is for the current entry, so go
+ * to Locked.  Other non-Aborting state: we were moved away from the entry,
+ * so unlock it at once.  Aborting: forget the lock op, an abort request
+ * has already been sent and the close continues at ACC_ABORTCONF. */
 void
-Dbtup::scanFirst(Signal*, Fragrecord* fragPtrP, ScanOpPtr scanPtr)
+Dbtup::execACCKEYCONF(Signal* signal)
 {
+  jamEntry();
+  ScanOpPtr scanPtr;
+  scanPtr.i = signal->theData[0];
+  c_scanOpPool.getPtr(scanPtr);
   ScanOp& scan = *scanPtr.p;
-  // set to first fragment, first page, first tuple
-  const Uint32 first_page_idx = scan.m_bits & ScanOp::SCAN_VS ? 1 : 0;
-  PagePos& pos = scan.m_scanPos;
-  pos.m_key.m_page_no = 0;
-  pos.m_key.m_page_idx = first_page_idx;
-  // just before
-  pos.m_match = false;
-  // let scanNext() do the work
-  scan.m_state = ScanOp::Next;
+  ndbrequire(scan.m_bits & ScanOp::SCAN_LOCK_WAIT && scan.m_accLockOp != RNIL);
+  scan.m_bits &= ~ ScanOp::SCAN_LOCK_WAIT;
+  if (scan.m_state == ScanOp::Blocked) {
+    // the lock wait was for current entry; m_accLockOp is kept for the reply
+    jam();
+    scan.m_state = ScanOp::Locked;
+    // nothing is sent from here, LQH has the ball
+    return;
+  }
+  if (scan.m_state != ScanOp::Aborting) {
+    // we were moved away from the locked entry, release lock in ACC directly
+    jam();
+    AccLockReq* const lockReq = (AccLockReq*)signal->getDataPtrSend();
+    lockReq->returnCode = RNIL;
+    lockReq->requestInfo = AccLockReq::Unlock;
+    lockReq->accOpPtr = scan.m_accLockOp;
+    EXECUTE_DIRECT(DBACC, GSN_ACC_LOCKREQ, signal, AccLockReq::UndoSignalLength);
+    jamEntry();
+    ndbrequire(lockReq->returnCode == AccLockReq::Success);
+    scan.m_accLockOp = RNIL;
+    // nothing is sent from here, LQH has the ball
+    return;
+  }
+  // Aborting: lose the lock op without unlocking it here
+  scan.m_accLockOp = RNIL;
+  // the close continues at ACC_ABORTCONF
+}
 
-  if (scan.m_bits & ScanOp::SCAN_DD)
-  {
-    pos.m_extent_info_ptr_i = 
-      fragPtrP->m_disk_alloc_info.m_extent_list.firstItem;
+/*
+ * ACCKEYREF: a queued ACC lock request for the scan op in theData[0]
+ * failed; probably somebody ahead of us in lock queue deleted the tuple.
+ */
+void
+Dbtup::execACCKEYREF(Signal* signal)
+{
+  jamEntry();
+  ScanOpPtr opPtr;
+  opPtr.i = signal->theData[0];
+  c_scanOpPool.getPtr(opPtr);
+  ScanOp& op = *opPtr.p;
+  const bool lockWait = (op.m_bits & ScanOp::SCAN_LOCK_WAIT) != 0;
+  ndbrequire(lockWait && op.m_accLockOp != RNIL);
+  op.m_bits &= ~ ScanOp::SCAN_LOCK_WAIT;
+  if (op.m_state == ScanOp::Aborting) {
+    // scan is closing: lose the lock op, the close continues at ACC_ABORTCONF
+    op.m_accLockOp = RNIL;
+    return;
+  }
+  jam();
+  // ask ACC to abort (release) the failed lock operation
+  AccLockReq* const req = (AccLockReq*)signal->getDataPtrSend();
+  req->returnCode = RNIL;
+  req->requestInfo = AccLockReq::Abort;
+  req->accOpPtr = op.m_accLockOp;
+  EXECUTE_DIRECT(DBACC, GSN_ACC_LOCKREQ, signal, AccLockReq::UndoSignalLength);
+  jamEntry();
+  ndbrequire(req->returnCode == AccLockReq::Success);
+  op.m_accLockOp = RNIL;
+  if (op.m_state == ScanOp::Blocked) {
+    // not expected (assert only): scan position should already have moved
+    jam();
+    ndbassert(false);
+    op.m_state = ScanOp::Next;
+  }
+  // nothing is sent from here, LQH has the ball
+}
+
+/*
+ * ACC_ABORTCONF: received while the scan is closing (state Aborting).  It
+ * arrives after any ACCKEYCONF or ACCKEYREF which was in the job buffer.
+ */
+void
+Dbtup::execACC_ABORTCONF(Signal* signal)
+{
+  jamEntry();
+  ScanOpPtr opPtr;
+  opPtr.i = signal->theData[0];
+  c_scanOpPool.getPtr(opPtr);
+  ScanOp& op = *opPtr.p;
+  ndbrequire(op.m_state == ScanOp::Aborting);
+  // most likely the lock wait is still pending: clear it, forget the lock op
+  if ((op.m_bits & ScanOp::SCAN_LOCK_WAIT) != 0) {
+    jam();
+    op.m_accLockOp = RNIL;
+    op.m_bits &= ~ ScanOp::SCAN_LOCK_WAIT;
   }
+  scanClose(signal, opPtr);
 }
 
 void
-Dbtup::scanNext(Signal* signal, Fragrecord* fragPtrP, ScanOpPtr scanPtr)
+Dbtup::scanFirst(Signal*, ScanOpPtr scanPtr)
 {
   ScanOp& scan = *scanPtr.p;
-  PagePos& pos = scan.m_scanPos;
-  Uint32 bits = scan.m_bits;
+  ScanPos& pos = scan.m_scanPos;
   Local_key& key = pos.m_key;
+  const Uint32 bits = scan.m_bits;
+  // look up the fragment record the scan runs on (index kept in the scan op)
+  FragrecordPtr fragPtr;
+  fragPtr.i = scan.m_fragPtrI;
+  ptrCheckGuard(fragPtr, cnoOfFragrec, fragrecord);
+  Fragrecord& frag = *fragPtr.p;
+  // no pages: scan ends at once (in the future should not pre-allocate pages)
+  if (frag.noOfPages == 0) {
+    jam();
+    scan.m_state = ScanOp::Last;
+    return;
+  }
+  if (! (bits & ScanOp::SCAN_DD)) {
+    key.m_file_no = ZNIL;
+    key.m_page_no = 0;
+    pos.m_get = ScanPos::Get_page_mm;
+    // for MM scan real page id is cached for efficiency; RNIL = not cached yet
+    pos.m_realpid_mm = RNIL;
+  } else {
+    Disk_alloc_info& alloc = frag.m_disk_alloc_info;
+    // no extent means scan ends at once (for now must check disk part explicitly)
+    if (alloc.m_extent_list.firstItem == RNIL) {
+      jam();
+      scan.m_state = ScanOp::Last;
+      return;
+    }
+    pos.m_extent_info_ptr_i = alloc.m_extent_list.firstItem;
+    Extent_info* ext = c_extent_pool.getPtr(pos.m_extent_info_ptr_i);
+    key.m_file_no = ext->m_key.m_file_no;
+    key.m_page_no = ext->m_first_page_no;
+    pos.m_get = ScanPos::Get_page_dd;
+  }
+  key.m_page_idx = (bits & ScanOp::SCAN_VS ? 1 : 0);
+  // position and first m_get step are set up; let scanNext() do the work
+  scan.m_state = ScanOp::Next;
+}
+
+bool
+Dbtup::scanNext(Signal* signal, ScanOpPtr scanPtr)
+{
+  ScanOp& scan = *scanPtr.p;
+  ScanPos& pos = scan.m_scanPos;
+  Local_key& key = pos.m_key;
+  const Uint32 bits = scan.m_bits;
+  // returns true once scan.m_state is set (Current or Last), false if resumed later
   TablerecPtr tablePtr;
   tablePtr.i = scan.m_tableId;
   ptrCheckGuard(tablePtr, cnoOfTablerec, tablerec);
-  Fragrecord& frag = *fragPtrP;
-  const Uint32 first_page_idx = bits & ScanOp::SCAN_VS ? 1 : 0;
+  Tablerec& table = *tablePtr.p;
+  // fragment being scanned
+  FragrecordPtr fragPtr;
+  fragPtr.i = scan.m_fragPtrI;
+  ptrCheckGuard(fragPtr, cnoOfFragrec, fragrecord);
+  Fragrecord& frag = *fragPtr.p;
+  // header of the tuple found; always set before the jump to found_tuple
+  Tuple_header* th = 0;
+  Uint32 loop_count = 0;
   while (true) {
-    // TODO time-slice here after X loops
-    jam();
-    // get page
-    PagePtr pagePtr;
-    if (key.m_page_no >= frag.noOfPages) {
+    switch (pos.m_get) {
+    case ScanPos::Get_next_page:
+      // move to next page
       jam();
-      scan.m_state = ScanOp::Last;
-      break;
-    }
-    Uint32 realPageId = getRealpid(fragPtrP, key.m_page_no);
-    pagePtr.i = realPageId;
-    ptrCheckGuard(pagePtr, cnoOfPage, cpage);
-    Uint32 pageState = pagePtr.p->page_state;
-    // skip empty page
-    if (pageState == ZEMPTY_MM) {
-      jam();
-      key.m_page_no++;
-      key.m_page_idx = first_page_idx;
-      pos.m_match = false;
+      {
+        if (! (bits & ScanOp::SCAN_DD))
+          pos.m_get = ScanPos::Get_next_page_mm;
+        else
+          pos.m_get = ScanPos::Get_next_page_dd;
+      }
       continue;
-    }
-    // get next tuple
-    const Tuple_header* th = 0;
-    if (! (bits & ScanOp::SCAN_VS)) {
-      Uint32 tupheadsize = tablePtr.p->m_offsets[MM].m_fix_header_size;
-      if (pos.m_match)
-        key.m_page_idx += tupheadsize;
-      pos.m_match = true;
-      if (key.m_page_idx + tupheadsize > Fix_page::DATA_WORDS) {
-        jam();
+    case ScanPos::Get_page:
+      // get real page
+      jam();
+      {
+        if (! (bits & ScanOp::SCAN_DD))
+          pos.m_get = ScanPos::Get_page_mm;
+        else
+          pos.m_get = ScanPos::Get_page_dd;
+      }
+      continue;
+    case ScanPos::Get_next_page_mm:
+      // move to next logical TUP page
+      jam();
+      {
         key.m_page_no++;
-        key.m_page_idx = first_page_idx;
-        pos.m_match = false;
-        continue;
-      }
-      th = (Tuple_header*)&pagePtr.p->m_data[key.m_page_idx];
-      // skip over free tuple
-      if (th->m_header_bits & Tuple_header::FREE) {
+        if (key.m_page_no >= frag.noOfPages) {
           jam();
-          continue;
+          // no more pages, scan ends
+          pos.m_get = ScanPos::Get_undef;
+          scan.m_state = ScanOp::Last;
+          return true;
+        }
+        key.m_page_idx = (bits & ScanOp::SCAN_VS ? 1 : 0);
+        pos.m_get = ScanPos::Get_page_mm;
+        // clear cached real page id, it is looked up again in Get_page_mm
+        pos.m_realpid_mm = RNIL;
       }
-    } else {
-      Var_page* page_ptr = (Var_page*)pagePtr.p;
-      if (pos.m_match)
-        key.m_page_idx += 1;
-      pos.m_match = true;
-      if (key.m_page_idx >= page_ptr->high_index) {
-        jam();
+      /*FALLTHRU*/
+    case ScanPos::Get_page_mm:
+      // get TUP real page
+      jam();
+      {
+        if (pos.m_realpid_mm == RNIL) {
+          jam();
+          pos.m_realpid_mm = getRealpid(fragPtr.p, key.m_page_no);
+        }
+        PagePtr pagePtr;
+        pagePtr.i = pos.m_realpid_mm;
+        ptrCheckGuard(pagePtr, cnoOfPage, cpage);
+        if (pagePtr.p->page_state == ZEMPTY_MM) {
+          // skip empty page
+          jam();
+          pos.m_get = ScanPos::Get_next_page_mm;
+          break; // incr loop count
+        }
+        pos.m_page = pagePtr.p;
+        pos.m_get = ScanPos::Get_tuple;
+      }
+      continue;
+    case ScanPos::Get_next_page_dd:
+      // move to next disk page
+      jam();
+      {
+        Disk_alloc_info& alloc = frag.m_disk_alloc_info;
+        LocalSLList<Extent_info, Extent_list_t>
+          list(c_extent_pool, alloc.m_extent_list);
+        Ptr<Extent_info> ext_ptr;
+        c_extent_pool.getPtr(ext_ptr, pos.m_extent_info_ptr_i);
+        Extent_info* ext = ext_ptr.p;
         key.m_page_no++;
-        key.m_page_idx = first_page_idx;
-        pos.m_match = false;
-        continue;
+        if (key.m_page_no >= ext->m_first_page_no + alloc.m_extent_size) {
+          // no more pages in this extent
+          jam();
+          if (! list.next(ext_ptr)) {
+            // no more extents, scan ends
+            jam();
+            pos.m_get = ScanPos::Get_undef;
+            scan.m_state = ScanOp::Last;
+            return true;
+          } else {
+            // move to next extent
+            jam();
+            pos.m_extent_info_ptr_i = ext_ptr.i;
+            Extent_info* ext = c_extent_pool.getPtr(pos.m_extent_info_ptr_i);
+            key.m_file_no = ext->m_key.m_file_no;
+            key.m_page_no = ext->m_first_page_no;
+          }
+        }
+        key.m_page_idx = (bits & ScanOp::SCAN_VS ? 1 : 0);
+        pos.m_get = ScanPos::Get_page_dd;
       }
-
-      Uint32 len= page_ptr->get_entry_len(key.m_page_idx);
-      if (len == 0)
+      /*FALLTHRU*/
+    case ScanPos::Get_page_dd:
+      // get global page in PGMAN cache
+      jam();
       {
-        // skip empty slot or 
-        jam();
-        continue;
+        // check if page is un-allocated or empty (own name, not shadowing bits)
+        Tablespace_client tsman(signal, c_tsman,
+            frag.fragTableId, frag.fragmentId, frag.m_tablespace_id);
+        unsigned free_bits = ~(unsigned)0;
+        int ret = tsman.get_page_free_bits(&key, &free_bits);
+        ndbrequire(ret == 0);
+        if (free_bits == 0) {
+          // skip empty page
+          jam();
+          pos.m_get = ScanPos::Get_next_page_dd;
+          break; // incr loop count
+        }
+        // page request to PGMAN
+        Page_cache_client::Request preq;
+        preq.m_page = pos.m_key;
+        preq.m_callback.m_callbackData = scanPtr.i;
+        preq.m_callback.m_callbackFunction =
+          safe_cast(&Dbtup::disk_page_tup_scan_callback);
+        int flags = 0;
+        int res = m_pgman.get_page(signal, preq, flags);
+        if (res == 0) {
+          jam();
+          // request queued; disk_page_tup_scan_callback() resumes the scan
+          pos.m_get = ScanPos::Get_tuple;
+          return false;
+        }
+        ndbrequire(res > 0);
+        pos.m_page = (Page*)m_pgman.m_ptr.p;
       }
-      if(len & Var_page::CHAIN)
+      pos.m_get = ScanPos::Get_tuple;
+      continue;
+    case ScanPos::Get_next_tuple:
+      // move to next tuple
+      jam();
       {
-	// skip varpart chain
-	jam();
-	continue;
+        if (! (bits & ScanOp::SCAN_VS))
+          pos.m_get = ScanPos::Get_next_tuple_fs;
+        else
+          pos.m_get = ScanPos::Get_next_tuple_vs;
       }
-      th = (Tuple_header*)page_ptr->get_ptr(key.m_page_idx);
-    }
-
-    if(bits & ScanOp::SCAN_LCP && 
-       th->m_header_bits & Tuple_header::LCP_SKIP)
-    {
-      /**
-       * Clear it so that it will show up in next LCP
-       */
-      ((Tuple_header*)th)->m_header_bits &= ~(Uint32)Tuple_header::LCP_SKIP;
       continue;
+    case ScanPos::Get_tuple:
+      // get tuple
+      jam();
+      {
+        if (! (bits & ScanOp::SCAN_VS))
+          pos.m_get = ScanPos::Get_tuple_fs;
+        else
+          pos.m_get = ScanPos::Get_tuple_vs;
+      }
+      continue;
+    case ScanPos::Get_next_tuple_fs:
+      // move to next fixed size tuple
+      jam();
+      {
+        // mm is true for SCAN_DD, i.e. m_offsets[1] (presumably the DD entry)
+        bool mm = (bits & ScanOp::SCAN_DD);
+        Uint32 size = table.m_offsets[mm].m_fix_header_size;
+        key.m_page_idx += size;
+        pos.m_get = ScanPos::Get_tuple_fs;
+      }
+      /*FALLTHRU*/
+    case ScanPos::Get_tuple_fs:
+      // get fixed size tuple
+      jam();
+      {
+        Fix_page* page = (Fix_page*)pos.m_page;
+        bool mm = (bits & ScanOp::SCAN_DD);
+        Uint32 size = table.m_offsets[mm].m_fix_header_size;
+        if (key.m_page_idx + size <= Fix_page::DATA_WORDS) {
+          th = (Tuple_header*)&page->m_data[key.m_page_idx];
+          if (! (th->m_header_bits & Tuple_header::FREE)) {
+            pos.m_get = ScanPos::Get_next_tuple_fs;
+            goto found_tuple;
+          } else {
+            jam();
+            // skip free tuple
+            pos.m_get = ScanPos::Get_next_tuple_fs;
+          }
+        } else {
+          jam();
+          // no more tuples on this page
+          pos.m_get = ScanPos::Get_next_page;
+        }
+      }
+      break; // incr loop count
+    case ScanPos::Get_next_tuple_vs:
+      // move to next var size tuple
+      jam();
+      {
+        key.m_page_idx++;
+        pos.m_get = ScanPos::Get_tuple_vs;
+      }
+      /*FALLTHRU*/
+    case ScanPos::Get_tuple_vs:
+      // get var size tuple
+      jam();
+      {
+        Var_page* page = (Var_page*)pos.m_page;
+        if (key.m_page_idx < page->high_index) {
+          Uint32 len = page->get_entry_len(key.m_page_idx);
+          if (len != 0) {
+            if (! (len & Var_page::CHAIN)) {
+              th = (Tuple_header*)page->get_ptr(key.m_page_idx);
+              pos.m_get = ScanPos::Get_next_tuple_vs;
+              goto found_tuple;
+            } else {
+              // skip varpart chain
+              jam();
+              pos.m_get = ScanPos::Get_next_tuple_vs;
+            }
+          } else {
+            // skip empty slot
+            pos.m_get = ScanPos::Get_next_tuple_vs;
+            jam();
+          }
+        } else {
+          jam();
+          // no more tuples on this page
+          pos.m_get = ScanPos::Get_next_page;
+        }
+      }
+      break; // incr loop count
+    found_tuple:
+      // found possible tuple to return
+      jam();
+      {
+        // caller has already set pos.m_get to next tuple
+        if (! (bits & ScanOp::SCAN_LCP && 
+           th->m_header_bits & Tuple_header::LCP_SKIP)) {
+          Local_key& key_mm = pos.m_key_mm;
+          if (! (bits & ScanOp::SCAN_DD)) {
+            key_mm = pos.m_key;
+            // real page id is already set
+          } else {
+            Uint32 ref = th->m_base_record_ref;
+            key_mm.m_page_no = (ref >> MAX_TUPLES_BITS);
+            key_mm.m_page_idx = (ref & ((1 << MAX_TUPLES_BITS) - 1));
+            // recompute for each disk tuple
+            pos.m_realpid_mm = getRealpid(fragPtr.p, key_mm.m_page_no);
+          }
+          // TUPKEYREQ handles savepoint stuff
+          scan.m_state = ScanOp::Current;
+          return true;
+        } else {
+          jam();
+          // clear only LCP_SKIP (other bits kept) so it shows up in next LCP
+          th->m_header_bits &= ~(Uint32)Tuple_header::LCP_SKIP;
+        }
+      }
+      break; // incr loop count
+    default:
+      ndbrequire(false);
+      break;
     }
-    scan.m_state = ScanOp::Locked;
-    break;
+    if (++loop_count >= 32)
+      break;
+  }
+  // time-slice via CONTINUEB; TODO: at drop table we have to flush and terminate these
+  jam();
+  signal->theData[0] = ZTUP_SCAN;
+  signal->theData[1] = scanPtr.i;
+  sendSignal(reference(), GSN_CONTINUEB, signal, 2, JBB);
+  return false;
+}
+
+void
+Dbtup::scanCont(Signal* signal, ScanOpPtr scanPtr)
+{
+  // resume the scan; reply only when scanNext() reports a result (true)
+  if (scanNext(signal, scanPtr) == false) {
+    jam();
+    // time-slicing again, nothing to report yet
+    return;
   }
+  scanReply(signal, scanPtr);
+}
+
+void
+Dbtup::disk_page_tup_scan_callback(Signal* signal, Uint32 scanPtrI, Uint32 page_i)
+{
+  // callback registered with the PGMAN page request in scanNext(): scanPtrI
+  // is the scan op that queued the request, page_i the global page index
+  ScanOpPtr opPtr;
+  c_scanOpPool.getPtr(opPtr, scanPtrI);
+  Ptr<GlobalPage> cachePage;
+  m_global_page_pool.getPtr(cachePage, page_i);
+  // remember the cache page in the scan position
+  opPtr.p->m_scanPos.m_page = (Page*)cachePage.p;
+  // continue the scan (m_get was set to Get_tuple when the request was queued)
+  scanCont(signal, opPtr);
 }
 
 void
@@ -350,11 +865,44 @@
   unsigned signalLength = 3;
   sendSignal(scanPtr.p->m_userRef, GSN_NEXT_SCANCONF,
       signal, signalLength, JBB);
-
   releaseScanOp(scanPtr);
 }
 
 void
+Dbtup::addAccLockOp(ScanOp& scan, Uint32 accLockOp)
+{
+  // record accLockOp in the scan's list of lock ops; no free entry is fatal
+  LocalDLFifoList<ScanLock> lockList(c_scanLockPool, scan.m_accLockOps);
+  ScanLockPtr entry;
+#ifdef VM_TRACE
+  // VM_TRACE builds only: the lock op must not be in the list already
+  for (lockList.first(entry); entry.i != RNIL; lockList.next(entry)) {
+    ndbrequire(entry.p->m_accLockOp != accLockOp);
+  }
+#endif
+  const bool seized = lockList.seize(entry);
+  ndbrequire(seized);
+  entry.p->m_accLockOp = accLockOp;
+}
+
+void
+Dbtup::removeAccLockOp(ScanOp& scan, Uint32 accLockOp)
+{
+  // release the list entry recorded for accLockOp; the lock op must be
+  // present in the scan's list, anything else is fatal
+  LocalDLFifoList<ScanLock> lockList(c_scanLockPool, scan.m_accLockOps);
+  ScanLockPtr entry;
+  lockList.first(entry);
+  while (entry.i != RNIL && entry.p->m_accLockOp != accLockOp) {
+    lockList.next(entry);
+  }
+  ndbrequire(entry.i != RNIL);
+  // found the entry
+  jam();
+  lockList.release(entry);
+}
+
+void
 Dbtup::releaseScanOp(ScanOpPtr& scanPtr)
 {
   FragrecordPtr fragPtr;
@@ -396,8 +944,9 @@
     frag.m_lcp_scan_op = c_lcp_scan_op;
     ScanOpPtr scanPtr;
     c_scanOpPool.getPtr(scanPtr, frag.m_lcp_scan_op);
-    
-    scanFirst(signal, fragPtr.p, scanPtr);
+    //ndbrequire(scanPtr.p->m_fragPtrI == fragPtr.i); ?
+
+    scanFirst(signal, scanPtr);
     scanPtr.p->m_state = ScanOp::First;
   }
 }
Thread
bk commit into 5.1 tree (pekka:1.1952)pekka10 Nov