List:Commits« Previous MessageNext Message »
From:jonas Date:August 25 2006 12:50pm
Subject:bk commit into 5.1 tree (jonas:1.2261)
View as plain text  
Below is the list of changes that have just been committed into a local
5.1 repository of jonas. When jonas does a push these changes will
be propagated to the main repository and, within 24 hours after the
push, to the public repository.
For information on how to access the public repository
see http://dev.mysql.com/doc/mysql/en/installing-source-tree.html

ChangeSet@stripped, 2006-08-25 14:50:37+02:00, jonas@stripped +14 -0
  ndb -
    use real pid in ACC instead of logical pid

  storage/ndb/include/kernel/kernel_types.h@stripped, 2006-08-25 14:50:35+02:00, jonas@stripped +4 -0
    Add ref()

  storage/ndb/include/kernel/signaldata/NextScan.hpp@stripped, 2006-08-25 14:50:35+02:00, jonas@stripped +4 -2
    Add return of logic page id

  storage/ndb/include/kernel/signaldata/TupKey.hpp@stripped, 2006-08-25 14:50:35+02:00, jonas@stripped +3 -2
    Add return of rowid

  storage/ndb/src/kernel/blocks/Makefile.am@stripped, 2006-08-25 14:50:35+02:00, jonas@stripped +1 -0
    Move TUP nr code into own file

  storage/ndb/src/kernel/blocks/dblqh/Dblqh.hpp@stripped, 2006-08-25 14:50:35+02:00, jonas@stripped +9 -9
    clean up scan 
    move assignement of row_id to tupkeyconf

  storage/ndb/src/kernel/blocks/dblqh/DblqhMain.cpp@stripped, 2006-08-25 14:50:35+02:00, jonas@stripped +91 -71
    handle change wrt to real/logical pid in acc

  storage/ndb/src/kernel/blocks/dbtup/Dbtup.hpp@stripped, 2006-08-25 14:50:35+02:00, jonas@stripped +12 -12
    use bits instead of bools for misc flags

  storage/ndb/src/kernel/blocks/dbtup/DbtupCommit.cpp@stripped, 2006-08-25 14:50:35+02:00, jonas@stripped +6 -6
    use bits instead of bools
    get realpid via tupkey req (instead of logical pid)

  storage/ndb/src/kernel/blocks/dbtup/DbtupExecQuery.cpp@stripped, 2006-08-25 14:50:35+02:00, jonas@stripped +47 -396
    use bits instead of bools
    get realpid via tupkey req (instead of logical pid)

  storage/ndb/src/kernel/blocks/dbtup/DbtupIndex.cpp@stripped, 2006-08-25 14:50:35+02:00, jonas@stripped +5 -19
    use bits instead of bools
    get realpid via tupkey req (instead of logical pid)

  storage/ndb/src/kernel/blocks/dbtup/DbtupNr.cpp@stripped, 2006-08-25 14:50:35+02:00, jonas@stripped +398 -0
    New BitKeeper file ``storage/ndb/src/kernel/blocks/dbtup/DbtupNr.cpp''
    Move all tup nr related functions into own file

  storage/ndb/src/kernel/blocks/dbtup/DbtupNr.cpp@stripped, 2006-08-25 14:50:35+02:00, jonas@stripped +0 -0

  storage/ndb/src/kernel/blocks/dbtup/DbtupRoutines.cpp@stripped, 2006-08-25 14:50:35+02:00, jonas@stripped +10 -8
    use bits instead of bools
    get realpid via tupkey req (instead of logical pid)

  storage/ndb/src/kernel/blocks/dbtup/DbtupScan.cpp@stripped, 2006-08-25 14:50:35+02:00, jonas@stripped +18 -8
    use bits instead of bools
    get realpid via tupkey req (instead of logical pid)

  storage/ndb/src/kernel/blocks/dbtux/Dbtux.hpp@stripped, 2006-08-25 14:50:35+02:00, jonas@stripped +2 -3
    use bits instead of bools
    get realpid via tupkey req (instead of logical pid)

# This is a BitKeeper patch.  What follows are the unified diffs for the
# set of deltas contained in the patch.  The rest of the patch, the part
# that BitKeeper cares about, is below these diffs.
# User:	jonas
# Host:	perch.ndb.mysql.com
# Root:	/home/jonas/src/51-dynarr
--- New file ---
+++ storage/ndb/src/kernel/blocks/dbtup/DbtupNr.cpp	06/08/25 14:50:35
/* Copyright (C) 2003 MySQL AB

   This program is free software; you can redistribute it and/or modify
   it under the terms of the GNU General Public License as published by
   the Free Software Foundation; either version 2 of the License, or
   (at your option) any later version.

   This program is distributed in the hope that it will be useful,
   but WITHOUT ANY WARRANTY; without even the implied warranty of
   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
   GNU General Public License for more details.

   You should have received a copy of the GNU General Public License
   along with this program; if not, write to the Free Software
   Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA */


#define DBTUP_C
#include <Dblqh.hpp>
#include "Dbtup.hpp"
#include <RefConvert.hpp>
#include <ndb_limits.h>
#include <pc.hpp>

int
Dbtup::nr_get_real_pid(Uint32 fragPtrI, Uint32 pid, Uint32* res)
{
  FragrecordPtr fragPtr;
  fragPtr.i= fragPtrI;
  ptrCheckGuard(fragPtr, cnoOfFragrec, fragrecord);
  * res = getRealpid(fragPtr.p, pid);
  return 0;
}

int
Dbtup::nr_update_gci(Uint32 fragPtrI, const Local_key* key, Uint32 gci)
{
  FragrecordPtr fragPtr;
  fragPtr.i= fragPtrI;
  ptrCheckGuard(fragPtr, cnoOfFragrec, fragrecord);
  TablerecPtr tablePtr;
  tablePtr.i= fragPtr.p->fragTableId;
  ptrCheckGuard(tablePtr, cnoOfTablerec, tablerec);

  if (tablePtr.p->m_bits & Tablerec::TR_RowGCI)
  {
    Local_key tmp = *key;
    PagePtr page_ptr;

    int ret;
    if (tablePtr.p->m_attributes[MM].m_no_of_varsize)
    {
      tablePtr.p->m_offsets[MM].m_fix_header_size += 
	Tuple_header::HeaderSize+1;
      ret = alloc_page(tablePtr.p, fragPtr.p, &page_ptr, tmp.m_page_no);
      tablePtr.p->m_offsets[MM].m_fix_header_size -= 
	Tuple_header::HeaderSize+1;
    } 
    else
    {
      ret = alloc_page(tablePtr.p, fragPtr.p, &page_ptr, tmp.m_page_no);  
    }

    if (ret)
      return -1;
    
    Tuple_header* ptr = (Tuple_header*)
      ((Fix_page*)page_ptr.p)->get_ptr(tmp.m_page_idx, 0);
    
    ndbrequire(ptr->m_header_bits & Tuple_header::FREE);
    *ptr->get_mm_gci(tablePtr.p) = gci;
  }
  return 0;
}

int
Dbtup::nr_read_pk(Uint32 fragPtrI, 
		  const Local_key* key, Uint32* dst, bool& copy)
{
  
  FragrecordPtr fragPtr;
  fragPtr.i= fragPtrI;
  ptrCheckGuard(fragPtr, cnoOfFragrec, fragrecord);
  TablerecPtr tablePtr;
  tablePtr.i= fragPtr.p->fragTableId;
  ptrCheckGuard(tablePtr, cnoOfTablerec, tablerec);

  Local_key tmp = *key;
  Uint32 pages = fragPtr.p->noOfPages;
  
  int ret;
  PagePtr page_ptr;
  if (tablePtr.p->m_attributes[MM].m_no_of_varsize)
  {
    tablePtr.p->m_offsets[MM].m_fix_header_size += Tuple_header::HeaderSize+1;
    ret = alloc_page(tablePtr.p, fragPtr.p, &page_ptr, tmp.m_page_no);
    tablePtr.p->m_offsets[MM].m_fix_header_size -= Tuple_header::HeaderSize+1;
  } 
  else
  {
    ret = alloc_page(tablePtr.p, fragPtr.p, &page_ptr, tmp.m_page_no);  
  }
  if (ret)
    return -1;
  
  KeyReqStruct req_struct;
  Uint32* ptr= ((Fix_page*)page_ptr.p)->get_ptr(key->m_page_idx, 0);
  
  req_struct.m_page_ptr = page_ptr;
  req_struct.m_tuple_ptr = (Tuple_header*)ptr;
  Uint32 bits = req_struct.m_tuple_ptr->m_header_bits;

  ret = 0;
  copy = false;
  if (! (bits & Tuple_header::FREE))
  {
    if (bits & Tuple_header::ALLOC)
    {
      Uint32 opPtrI= req_struct.m_tuple_ptr->m_operation_ptr_i;
      Operationrec* opPtrP= c_operation_pool.getPtr(opPtrI);
      ndbassert(!opPtrP->m_copy_tuple_location.isNull());
      req_struct.m_tuple_ptr= (Tuple_header*)
	c_undo_buffer.get_ptr(&opPtrP->m_copy_tuple_location);
      copy = true;
    }
    req_struct.check_offset[MM]= tablePtr.p->get_check_offset(MM);
    req_struct.check_offset[DD]= tablePtr.p->get_check_offset(DD);
    
    Uint32 num_attr= tablePtr.p->m_no_of_attributes;
    Uint32 descr_start= tablePtr.p->tabDescriptor;
    TableDescriptor *tab_descr= &tableDescriptor[descr_start];
    ndbrequire(descr_start + (num_attr << ZAD_LOG_SIZE) <= cnoOfTabDescrRec);
    req_struct.attr_descr= tab_descr; 

    if (tablePtr.p->need_expand())
      prepare_read(&req_struct, tablePtr.p, false);
    
    const Uint32* attrIds= &tableDescriptor[tablePtr.p->readKeyArray].tabDescr;
    const Uint32 numAttrs= tablePtr.p->noOfKeyAttr;
    // read pk attributes from original tuple
    
    // new globals
    tabptr= tablePtr;
    fragptr= fragPtr;
    operPtr.i= RNIL;
    operPtr.p= NULL;
    
    // do it
    ret = readAttributes(&req_struct,
			 attrIds,
			 numAttrs,
			 dst,
			 ZNIL, false);
    
    // done
    if (likely(ret != -1)) {
      // remove headers
      Uint32 n= 0;
      Uint32 i= 0;
      while (n < numAttrs) {
	const AttributeHeader ah(dst[i]);
	Uint32 size= ah.getDataSize();
	ndbrequire(size != 0);
	for (Uint32 j= 0; j < size; j++) {
	  dst[i + j - n]= dst[i + j + 1];
	}
	n+= 1;
	i+= 1 + size;
      }
      ndbrequire((int)i == ret);
      ret -= numAttrs;
    } else {
      return terrorCode ? (-(int)terrorCode) : -1;
    }
  }
    
  if (tablePtr.p->m_bits & Tablerec::TR_RowGCI)
  {
    dst[ret] = *req_struct.m_tuple_ptr->get_mm_gci(tablePtr.p);
  }
  else
  {
    dst[ret] = 0;
  }
  return ret;
}

#include <signaldata/TuxMaint.hpp>

int
Dbtup::nr_delete(Signal* signal, Uint32 senderData,
		 Uint32 fragPtrI, const Local_key* key, Uint32 gci)
{
  FragrecordPtr fragPtr;
  fragPtr.i= fragPtrI;
  ptrCheckGuard(fragPtr, cnoOfFragrec, fragrecord);
  TablerecPtr tablePtr;
  tablePtr.i= fragPtr.p->fragTableId;
  ptrCheckGuard(tablePtr, cnoOfTablerec, tablerec);

  Local_key tmp = * key;
  
  PagePtr pagePtr;
  Tuple_header* ptr= (Tuple_header*)get_ptr(&pagePtr, &tmp, tablePtr.p);

  if (!tablePtr.p->tuxCustomTriggers.isEmpty()) 
  {
    jam();
    TuxMaintReq* req = (TuxMaintReq*)signal->getDataPtrSend();
    req->tableId = fragPtr.p->fragTableId;
    req->fragId = fragPtr.p->fragmentId;
    req->pageId = tmp.m_page_no;
    req->pageIndex = tmp.m_page_idx;
    req->tupVersion = ptr->get_tuple_version();
    req->opInfo = TuxMaintReq::OpRemove;
    removeTuxEntries(signal, tablePtr.p);
  }
  
  Local_key disk;
  memcpy(&disk, ptr->get_disk_ref_ptr(tablePtr.p), sizeof(disk));
  
  if (tablePtr.p->m_attributes[MM].m_no_of_varsize)
  {
    jam();
    free_var_rec(fragPtr.p, tablePtr.p, &tmp, pagePtr);
  } else {
    jam();
    free_fix_rec(fragPtr.p, tablePtr.p, &tmp, (Fix_page*)pagePtr.p);
  }

  if (tablePtr.p->m_no_of_disk_attributes)
  {
    jam();

    Uint32 sz = (sizeof(Dbtup::Disk_undo::Free) >> 2) + 
      tablePtr.p->m_offsets[DD].m_fix_header_size - 1;
    
    int res = c_lgman->alloc_log_space(fragPtr.p->m_logfile_group_id, sz);
    ndbrequire(res == 0);
    
    /**
     * 1) alloc log buffer
     * 2) get page
     * 3) get log buffer
     * 4) delete tuple
     */
    Page_cache_client::Request preq;
    preq.m_page = disk;
    preq.m_callback.m_callbackData = senderData;
    preq.m_callback.m_callbackFunction =
      safe_cast(&Dbtup::nr_delete_page_callback);
    int flags = Page_cache_client::COMMIT_REQ;
    
#ifdef ERROR_INSERT
    if (ERROR_INSERTED(4023) || ERROR_INSERTED(4024))
    {
      int rnd = rand() % 100;
      int slp = 0;
      if (ERROR_INSERTED(4024))
      {
	slp = 3000;
      }
      else if (rnd > 90)
      {
	slp = 3000;
      }
      else if (rnd > 70)
      {
	slp = 100;
      }
      
      ndbout_c("rnd: %d slp: %d", rnd, slp);
      
      if (slp)
      {
	flags |= Page_cache_client::DELAY_REQ;
	preq.m_delay_until_time = NdbTick_CurrentMillisecond()+(Uint64)slp;
      }
    }
#endif
    
    res = m_pgman.get_page(signal, preq, flags);
    if (res == 0)
    {
      goto timeslice;
    }
    else if (unlikely(res == -1))
    {
      return -1;
    }

    PagePtr disk_page = *(PagePtr*)&m_pgman.m_ptr;
    disk_page_set_dirty(disk_page);

    preq.m_callback.m_callbackFunction =
      safe_cast(&Dbtup::nr_delete_logbuffer_callback);      
    Logfile_client lgman(this, c_lgman, fragPtr.p->m_logfile_group_id);
    res= lgman.get_log_buffer(signal, sz, &preq.m_callback);
    switch(res){
    case 0:
      signal->theData[2] = disk_page.i;
      goto timeslice;
    case -1:
      ndbrequire("NOT YET IMPLEMENTED" == 0);
      break;
    }

    ndbout << "DIRECT DISK DELETE: " << disk << endl;
    disk_page_free(signal, tablePtr.p, fragPtr.p,
		   &disk, *(PagePtr*)&disk_page, gci);
    return 0;
  }
  
  return 0;

timeslice:
  memcpy(signal->theData, &disk, sizeof(disk));
  return 1;
}

void
Dbtup::nr_delete_page_callback(Signal* signal, 
			       Uint32 userpointer, Uint32 page_id)
{
  Ptr<GlobalPage> gpage;
  m_global_page_pool.getPtr(gpage, page_id);
  PagePtr pagePtr= *(PagePtr*)&gpage;
  disk_page_set_dirty(pagePtr);
  Dblqh::Nr_op_info op;
  op.m_ptr_i = userpointer;
  op.m_disk_ref.m_page_no = pagePtr.p->m_page_no;
  op.m_disk_ref.m_file_no = pagePtr.p->m_file_no;
  c_lqh->get_nr_op_info(&op, page_id);

  Ptr<Fragrecord> fragPtr;
  fragPtr.i= op.m_tup_frag_ptr_i;
  ptrCheckGuard(fragPtr, cnoOfFragrec, fragrecord);

  Ptr<Tablerec> tablePtr;
  tablePtr.i = fragPtr.p->fragTableId;
  ptrCheckGuard(tablePtr, cnoOfTablerec, tablerec);
  
  Uint32 sz = (sizeof(Dbtup::Disk_undo::Free) >> 2) + 
    tablePtr.p->m_offsets[DD].m_fix_header_size - 1;
  
  Callback cb;
  cb.m_callbackData = userpointer;
  cb.m_callbackFunction =
    safe_cast(&Dbtup::nr_delete_logbuffer_callback);      
  Logfile_client lgman(this, c_lgman, fragPtr.p->m_logfile_group_id);
  int res= lgman.get_log_buffer(signal, sz, &cb);
  switch(res){
  case 0:
    return;
  case -1:
    ndbrequire("NOT YET IMPLEMENTED" == 0);
    break;
  }
    
  ndbout << "PAGE CALLBACK DISK DELETE: " << op.m_disk_ref << endl;
  disk_page_free(signal, tablePtr.p, fragPtr.p,
		 &op.m_disk_ref, pagePtr, op.m_gci);
  
  c_lqh->nr_delete_complete(signal, &op);
  return;
}

void
Dbtup::nr_delete_logbuffer_callback(Signal* signal, 
				    Uint32 userpointer, 
				    Uint32 unused)
{
  Dblqh::Nr_op_info op;
  op.m_ptr_i = userpointer;
  c_lqh->get_nr_op_info(&op, RNIL);
  
  Ptr<Fragrecord> fragPtr;
  fragPtr.i= op.m_tup_frag_ptr_i;
  ptrCheckGuard(fragPtr, cnoOfFragrec, fragrecord);

  Ptr<Tablerec> tablePtr;
  tablePtr.i = fragPtr.p->fragTableId;
  ptrCheckGuard(tablePtr, cnoOfTablerec, tablerec);

  Ptr<GlobalPage> gpage;
  m_global_page_pool.getPtr(gpage, op.m_page_id);
  PagePtr pagePtr= *(PagePtr*)&gpage;

  /**
   * reset page no
   */
  ndbout << "LOGBUFFER CALLBACK DISK DELETE: " << op.m_disk_ref << endl;
  
  disk_page_free(signal, tablePtr.p, fragPtr.p,
		 &op.m_disk_ref, pagePtr, op.m_gci);
  
  c_lqh->nr_delete_complete(signal, &op);
}


--- 1.6/storage/ndb/include/kernel/kernel_types.h	2006-08-25 14:50:42 +02:00
+++ 1.7/storage/ndb/include/kernel/kernel_types.h	2006-08-25 14:50:42 +02:00
@@ -64,6 +64,10 @@
     m_page_idx = ref & MAX_TUPLES_PER_PAGE;
     return *this;
   }
+  
+  static Uint32 ref(Uint32 page, Uint32 idx) { 
+    return (page << MAX_TUPLES_BITS) | idx; 
+  }
 };
 
 class NdbOut&

--- 1.5/storage/ndb/include/kernel/signaldata/NextScan.hpp	2006-08-25 14:50:42 +02:00
+++ 1.6/storage/ndb/include/kernel/signaldata/NextScan.hpp	2006-08-25 14:50:42 +02:00
@@ -47,14 +47,16 @@
   friend class Dblqh;
 public:
   // length is less if no keyinfo or no next result
-  STATIC_CONST( SignalLength = 11 );
+  STATIC_CONST( SignalLength = 7 );
+  STATIC_CONST( DeletedRowSignalLength = 8 );
 private:
   Uint32 scanPtr;               // scan record in LQH
   Uint32 accOperationPtr;
   Uint32 fragId;
   Uint32 localKey[2];
   Uint32 localKeyLength;
-  Uint32 gci;
+  Uint32 logical_page_id;
+  Uint32 gci; // This is only used 
 };
 
 #endif

--- 1.5/storage/ndb/include/kernel/signaldata/TupKey.hpp	2006-08-25 14:50:42 +02:00
+++ 1.6/storage/ndb/include/kernel/signaldata/TupKey.hpp	2006-08-25 14:50:42 +02:00
@@ -80,7 +80,7 @@
   friend bool printTUPKEYCONF(FILE * output, const Uint32 * theData, Uint32 len, Uint16 receiverBlockNo);
 
 public:
-  STATIC_CONST( SignalLength = 6 );
+  STATIC_CONST( SignalLength = 8 );
 
 private:
 
@@ -92,7 +92,8 @@
   Uint32 writeLength;
   Uint32 noFiredTriggers;
   Uint32 lastRow;
-  Uint32 rowid;
+  Uint32 rowid_page_no;  // Note: only valid if TUPKEYCONF read-row-id
+  Uint32 rowid_page_idx;
 };
 
 class TupKeyRef {

--- 1.53/storage/ndb/src/kernel/blocks/dblqh/Dblqh.hpp	2006-08-25 14:50:42 +02:00
+++ 1.54/storage/ndb/src/kernel/blocks/dblqh/Dblqh.hpp	2006-08-25 14:50:42 +02:00
@@ -521,8 +521,9 @@
     Uint32 scan_acc_index;
     Uint32 scan_acc_attr_recs;
     UintR scanApiOpPtr;
-    Local_key m_row_id;
-    
+    Uint32 m_real_page_id;
+    Uint32 m_logical_page_id;
+
     Uint32 m_max_batch_size_rows;
     Uint32 m_max_batch_size_bytes;
 
@@ -2466,10 +2467,11 @@
   void completeTransLastLab(Signal* signal);
   void tupScanCloseConfLab(Signal* signal);
   void tupCopyCloseConfLab(Signal* signal);
-  void accScanCloseConfLab(Signal* signal);
-  void accCopyCloseConfLab(Signal* signal);
-  void nextScanConfScanLab(Signal* signal);
-  void nextScanConfCopyLab(Signal* signal);
+  void accScanCloseConfLab(Signal* signal, Ptr<TcConnectionrec>);
+  void accCopyCloseConfLab(Signal* signal, Ptr<TcConnectionrec>);
+  void nextScanConfScanLab(Signal* signal, Ptr<TcConnectionrec>);
+  void nextScanConfCopyLab(Signal* signal, Ptr<TcConnectionrec>);
+  void scanLockReleasedLab(Signal* signal, Ptr<TcConnectionrec>);
   void continueScanNextReqLab(Signal* signal);
   void keyinfoLab(const Uint32 * src, const Uint32 * end);
   void copySendTupkeyReqLab(Signal* signal);
@@ -2525,7 +2527,6 @@
   void scanTupkeyRefLab(Signal* signal);
   void accScanConfScanLab(Signal* signal);
   void accScanConfCopyLab(Signal* signal);
-  void scanLockReleasedLab(Signal* signal);
   void openSrFourthNextLab(Signal* signal);
   void closingInitLab(Signal* signal);
   void closeExecSrCompletedLab(Signal* signal);
@@ -3005,12 +3006,11 @@
   regTcPtr.i= opId;
   ptrCheckGuard(regTcPtr, ctcConnectrecFileSize, tcConnectionrec);
   signal->theData[0] = regTcPtr.p->accConnectrec;
-  signal->theData[1] = key->m_page_no << MAX_TUPLES_BITS | key->m_page_idx;
+  signal->theData[1] = key->ref();
   c_acc->execACCMINUPDATE(signal);
   
   if (ERROR_INSERTED(5712) || ERROR_INSERTED(5713))
     ndbout << " LK: " << *key;
-  regTcPtr.p->m_row_id = *key;
 }
 
 inline

--- 1.118/storage/ndb/src/kernel/blocks/dblqh/DblqhMain.cpp	2006-08-25 14:50:42 +02:00
+++ 1.119/storage/ndb/src/kernel/blocks/dblqh/DblqhMain.cpp	2006-08-25 14:50:42 +02:00
@@ -2558,15 +2558,18 @@
   Uint32 activeCreat = regTcPtr->activeCreat;
 
   FragrecordPtr regFragptr;
-  regFragptr.i = tcConnectptr.p->fragmentptr;
+  regFragptr.i = regTcPtr->fragmentptr;
   c_fragment_pool.getPtr(regFragptr);
   fragptr = regFragptr;
   
+  regTcPtr->m_row_id.m_page_no = tupKeyConf->rowid_page_no;
+  regTcPtr->m_row_id.m_page_idx = tupKeyConf->rowid_page_idx;
+
   switch (tcConnectptr.p->transactionState) {
   case TcConnectionrec::WAIT_TUP:
     jam();
-    if (tcConnectptr.p->seqNoReplica == 0) // Primary replica
-      tcConnectptr.p->noFiredTriggers = tupKeyConf->noFiredTriggers;
+    if (regTcPtr->seqNoReplica == 0) // Primary replica
+      regTcPtr->noFiredTriggers = tupKeyConf->noFiredTriggers;
     tupkeyConfLab(signal);
     break;
   case TcConnectionrec::COPY_TUPKEY:
@@ -3397,13 +3400,13 @@
   } 
   
   regTcPtr->reqinfo = Treqinfo;
-  regTcPtr->lastReplicaNo = LqhKeyReq::getLastReplicaNo(Treqinfo);
+  Uint32 TlastReplicaNo = regTcPtr->lastReplicaNo = LqhKeyReq::getLastReplicaNo(Treqinfo);
   regTcPtr->dirtyOp       = LqhKeyReq::getDirtyFlag(Treqinfo);
   regTcPtr->opExec        = LqhKeyReq::getInterpretedFlag(Treqinfo);
   regTcPtr->opSimple      = LqhKeyReq::getSimpleFlag(Treqinfo);
   regTcPtr->simpleRead    = op == ZREAD && regTcPtr->opSimple;
-  regTcPtr->seqNoReplica  = LqhKeyReq::getSeqNoReplica(Treqinfo);
-  UintR TreclenAiLqhkey   = LqhKeyReq::getAIInLqhKeyReq(Treqinfo);
+  Uint32 TseqNoReplica = regTcPtr->seqNoReplica = LqhKeyReq::getSeqNoReplica(Treqinfo);
+  Uint32 TreclenAiLqhkey   = LqhKeyReq::getAIInLqhKeyReq(Treqinfo);
   regTcPtr->apiVersionNo  = 0; 
   regTcPtr->m_use_rowid   = LqhKeyReq::getRowidFlag(Treqinfo);
   regTcPtr->m_dealloc     = 0;
@@ -3435,8 +3438,7 @@
     regTcPtr->tcOprec = lqhKeyReq->variableData[nextPos];
     nextPos++;
   }//if
-  UintR TnextReplicasIndicator = regTcPtr->lastReplicaNo - 
-                                 regTcPtr->seqNoReplica;
+  UintR TnextReplicasIndicator = TlastReplicaNo - TseqNoReplica;
   if (TnextReplicasIndicator > 1) {
     regTcPtr->nodeAfterNext[0] = lqhKeyReq->variableData[nextPos] & 0xFFFF;
     regTcPtr->nodeAfterNext[1] = lqhKeyReq->variableData[nextPos] >> 16;
@@ -3500,8 +3502,6 @@
     LQHKEY_error(signal, 2);
     return;
   }//if
-  UintR TseqNoReplica = regTcPtr->seqNoReplica;
-  UintR TlastReplicaNo = regTcPtr->lastReplicaNo;
   if (TseqNoReplica == TlastReplicaNo) {
     jam();
     regTcPtr->nextReplica = ZNIL;
@@ -3573,6 +3573,11 @@
     }
     ndbassert(fragptr.p->fragStatus == Fragrecord::ACTIVE_CREATION);
     fragptr.p->m_copy_started_state = Fragrecord::AC_NR_COPY;
+    if (TRACENR_FLAG)
+    {
+      TRACENR("Starting NR for tab: " << tabptr.i << " frag: "
+	      << regTcPtr->fragmentid << endl);
+    }
   }
   
   Uint8 TcopyType = fragptr.p->fragCopy;
@@ -3585,13 +3590,16 @@
     CRASH_INSERTION2(5042, tabptr.i == c_error_insert_table_id);
   } else {
     regTcPtr->activeCreat = Fragrecord::AC_NORMAL;
+    if (fragptr.p->m_copy_started_state == Fragrecord::AC_NR_COPY &&
+	!(op == ZINSERT && TseqNoReplica == 0))
+      regTcPtr->m_use_rowid = 1; 
   }//if
   regTcPtr->replicaType = TcopyType;
   regTcPtr->fragmentptr = fragptr.i;
   regTcPtr->m_log_part_ptr_i = logPart;
   Uint8 TdistKey = LqhKeyReq::getDistributionKey(TtotReclenAi);
   if ((tfragDistKey != TdistKey) &&
-      (regTcPtr->seqNoReplica == 0) &&
+      (TseqNoReplica == 0) &&
       (regTcPtr->dirtyOp == ZFALSE) &&
       (regTcPtr->simpleRead == ZFALSE)) {
     /* ----------------------------------------------------------------------
@@ -4122,9 +4130,11 @@
   Uint32 keylen;
   Uint32 tableId = regTcPtr.p->tableref;
   Uint32 accPtr = regTcPtr.p->accConnectrec;
-  
+  Uint32 tupFragPtr = fragptr.p->tupFragptr;
+  Uint32 accFragPtr = fragptr.p->accFragptr;
+
   signal->theData[0] = accPtr;
-  signal->theData[1] = fragptr.p->accFragptr;
+  signal->theData[1] = accFragPtr;
   signal->theData[2] = ZDELETE + (ZDELETE << 4);
   signal->theData[5] = regTcPtr.p->transid[0];
   signal->theData[6] = regTcPtr.p->transid[1];
@@ -4133,6 +4143,9 @@
   {
     jam();
     keylen = 1;
+    Uint32 realpid;
+    ndbrequire(!c_tup->nr_get_real_pid(tupFragPtr, rowid->m_page_no,&realpid));
+    
     if (g_key_descriptor_pool.getPtr(tableId)->hasCharAttr)
     {
       signal->theData[3] = calculateHash(tableId, signal->theData+24);
@@ -4142,7 +4155,7 @@
       signal->theData[3] = md5_hash((Uint64*)(signal->theData+24), len);
     }
     signal->theData[4] = 0; // seach by local key
-    signal->theData[7] = rowid->ref();
+    signal->theData[7] = Local_key::ref(realpid, rowid->m_page_idx);
   }
   else
   {
@@ -5148,8 +5161,6 @@
   }
   else
   {
-    regTcPtr->m_use_rowid |= 
-      fragptr.p->m_copy_started_state == Fragrecord::AC_NR_COPY;
     LqhKeyReq::setRowidFlag(Treqinfo, regTcPtr->m_use_rowid);
   }
 
@@ -7636,51 +7647,58 @@
   jamEntry();
   scanptr.i = nextScanConf->scanPtr;
   c_scanRecordPool.getPtr(scanptr);
+  Local_key tmp;
+
   if (likely(nextScanConf->localKeyLength == 1)) 
   {
     jam();
-    scanptr.p->m_row_id.assref(nextScanConf->localKey[0]);
+    tmp.assref(nextScanConf->localKey[0]);
   }
   else
   {
     jam();
-    scanptr.p->m_row_id.m_page_no = nextScanConf->localKey[0];
-    scanptr.p->m_row_id.m_page_idx = nextScanConf->localKey[1]; 
+    tmp.m_page_no = nextScanConf->localKey[0];
+    tmp.m_page_idx = nextScanConf->localKey[1]; 
   }
   
+  Ptr<TcConnectionrec> regTcPtr;
+  regTcPtr.i = scanptr.p->scanTcrec;
+  ptrCheckGuard(regTcPtr, ctcConnectrecFileSize, tcConnectionrec);
+  fragptr.i = scanptr.p->fragPtrI;
+  c_fragment_pool.getPtr(fragptr);
+
 #ifdef VM_TRACE
   if (signal->getLength() > 2 && nextScanConf->accOperationPtr != RNIL)
   {
-    Ptr<TcConnectionrec> regTcPtr;
-    regTcPtr.i = scanptr.p->scanTcrec;
-    ptrCheckGuard(regTcPtr, ctcConnectrecFileSize, tcConnectionrec);
     ndbassert(regTcPtr.p->fragmentid == nextScanConf->fragId);
   }
 #endif
   
-  fragptr.i = scanptr.p->fragPtrI;
-  c_fragment_pool.getPtr(fragptr);
+  scanptr.p->m_real_page_id = tmp.m_page_no;
+  scanptr.p->m_logical_page_id = nextScanConf->logical_page_id;
+  regTcPtr.p->m_row_id = tmp;
+
   switch (scanptr.p->scanState) {
   case ScanRecord::WAIT_CLOSE_SCAN:
     jam();
-    accScanCloseConfLab(signal);
+    accScanCloseConfLab(signal, regTcPtr);
     break;
   case ScanRecord::WAIT_CLOSE_COPY:
     jam();
-    accCopyCloseConfLab(signal);
+    accCopyCloseConfLab(signal, regTcPtr);
     break;
   case ScanRecord::WAIT_NEXT_SCAN:	      	       
     jam();     
-    nextScanConfScanLab(signal);       
+    nextScanConfScanLab(signal, regTcPtr);       
     break;
   case ScanRecord::WAIT_NEXT_SCAN_COPY:
     jam();
-    nextScanConfCopyLab(signal);
+    nextScanConfCopyLab(signal, regTcPtr);
     break;
   case ScanRecord::WAIT_RELEASE_LOCK:
     jam();
     ndbrequire(signal->length() == 1);
-    scanLockReleasedLab(signal);
+    scanLockReleasedLab(signal, regTcPtr);
     break;
   default:
     ndbout_c("%d", scanptr.p->scanState);
@@ -8097,10 +8115,9 @@
  * -------------------------------------------------------------------------
  *       PRECONDITION: SCAN_STATE = WAIT_RELEASE_LOCK
  * ------------------------------------------------------------------------- */
-void Dblqh::scanLockReleasedLab(Signal* signal)
+void Dblqh::scanLockReleasedLab(Signal* signal, Ptr<TcConnectionrec> regTcPtr)
 {
-  tcConnectptr.i = scanptr.p->scanTcrec;
-  ptrCheckGuard(tcConnectptr, ctcConnectrecFileSize, tcConnectionrec);  
+  tcConnectptr = regTcPtr;
 
   if (scanptr.p->scanReleaseCounter == scanptr.p->m_curr_batch_size_rows) {
     if ((scanptr.p->scanErrorCounter > 0) ||
@@ -8848,11 +8865,10 @@
  * -------------------------------------------------------------------------
  *       PRECONDITION: SCAN_STATE = WAIT_NEXT_SCAN
  * ------------------------------------------------------------------------- */
-void Dblqh::nextScanConfScanLab(Signal* signal) 
+void Dblqh::nextScanConfScanLab(Signal* signal, Ptr<TcConnectionrec> regTcPtr) 
 {
+  tcConnectptr = regTcPtr;
   NextScanConf * const nextScanConf = (NextScanConf *)&signal->theData[0];
-  tcConnectptr.i = scanptr.p->scanTcrec;
-  ptrCheckGuard(tcConnectptr, ctcConnectrecFileSize, tcConnectionrec);
   if (nextScanConf->fragId == RNIL) {
     jam();
     /* ---------------------------------------------------------------------
@@ -8888,7 +8904,7 @@
     if (scanptr.p->m_curr_batch_size_rows > 0) {
       jam();
 
-      if((tcConnectptr.p->primKeyLen - 4) == 0)
+      if((regTcPtr.p->primKeyLen - 4) == 0)
 	scanptr.p->scanCompletedStatus = ZTRUE;
       
       scanptr.p->scanState = ScanRecord::WAIT_SCAN_NEXTREQ;
@@ -8987,7 +9003,7 @@
   jam();
   
   int res;
-  Uint32 local_key = scanPtr.p->m_row_id.ref();
+  Uint32 local_key = regTcPtr.p->m_row_id.ref();
   
   if((res= c_tup->load_diskpage_scan(signal, 
 				     regTcPtr.p->tupConnectrec,
@@ -9059,8 +9075,8 @@
   
   tupKeyReq->connectPtr = regTcPtr->tupConnectrec;
   tupKeyReq->request = reqinfo;
-  tupKeyReq->keyRef1 = scanPtr.p->m_row_id.m_page_no;
-  tupKeyReq->keyRef2 = scanPtr.p->m_row_id.m_page_idx;
+  tupKeyReq->keyRef1 = regTcPtr->m_row_id.m_page_no;
+  tupKeyReq->keyRef2 = regTcPtr->m_row_id.m_page_idx;
   tupKeyReq->attrBufLen = 0;
   tupKeyReq->opRef = scanPtr.p->scanApiOpPtr; 
   tupKeyReq->applRef = scanPtr.p->scanApiBlockref;
@@ -9101,8 +9117,8 @@
 {
   Uint32 tableId = tcConP->tableref;
   Uint32 fragId = tcConP->fragmentid;
-  Uint32 fragPageId = scanP->m_row_id.m_page_no;
-  Uint32 pageIndex = scanP->m_row_id.m_page_idx;
+  Uint32 realPageId = scanP->m_real_page_id;
+  Uint32 pageIndex = tcConP->m_row_id.m_page_idx;
 
   if(scanP->rangeScan)
   {
@@ -9114,11 +9130,12 @@
     tableId = tFragPtr.p->tabRef;
   }
 
-  int ret = c_tup->accReadPk(tableId, fragId, fragPageId, pageIndex, dst, false);
+  int ret = c_tup->accReadPk(tableId, fragId, realPageId, pageIndex, 
+			     dst, false);
   jamEntry();
   if(0)
     ndbout_c("readPrimaryKeys(table: %d fragment: %d [ %d %d ] -> %d",
-	     tableId, fragId, fragPageId, pageIndex, ret);
+	     tableId, fragId, realPageId, pageIndex, ret);
   ndbassert(ret > 0);
 
   return ret;
@@ -9392,26 +9409,24 @@
  * -------------------------------------------------------------------------
  *       PRECONDITION: SCAN_STATE = WAIT_CLOSE_SCAN
  * ------------------------------------------------------------------------- */
-void Dblqh::accScanCloseConfLab(Signal* signal) 
+void Dblqh::accScanCloseConfLab(Signal* signal, Ptr<TcConnectionrec> regTcPtr) 
 {
-  tcConnectptr.i = scanptr.p->scanTcrec;
-  ptrCheckGuard(tcConnectptr, ctcConnectrecFileSize, tcConnectionrec);
-
-  if((tcConnectptr.p->primKeyLen - 4) > 0 && 
+  if((regTcPtr.p->primKeyLen - 4) > 0 && 
     scanptr.p->scanCompletedStatus != ZTRUE)
   {
     jam();
+    tcConnectptr = regTcPtr;
     continueAfterReceivingAllAiLab(signal);
     return;
   }
   
   scanptr.p->scanState = ScanRecord::WAIT_DELETE_STORED_PROC_ID_SCAN;
-  signal->theData[0] = tcConnectptr.p->tupConnectrec;
-  signal->theData[1] = tcConnectptr.p->tableref;
+  signal->theData[0] = regTcPtr.p->tupConnectrec;
+  signal->theData[1] = regTcPtr.p->tableref;
   signal->theData[2] = scanptr.p->scanSchemaVersion;
   signal->theData[3] = ZDELETE_STORED_PROC_ID;
   signal->theData[4] = scanptr.p->scanStoredProcId;
-  sendSignal(tcConnectptr.p->tcTupBlockref,
+  sendSignal(regTcPtr.p->tcTupBlockref,
              GSN_STORED_PROCREQ, signal, 5, JBB);
 }//Dblqh::accScanCloseConfLab()
 
@@ -9508,7 +9523,6 @@
   scanptr.p->lcpScan = ScanFragReq::getLcpScanFlag(reqinfo);
   scanptr.p->scanState = ScanRecord::SCAN_FREE;
   scanptr.p->scanFlag = ZFALSE;
-  scanptr.p->m_row_id.setNull();
   scanptr.p->scanTcWaiting = ZTRUE;
   scanptr.p->scanNumber = ~0;
   scanptr.p->scanApiOpPtr = scanFragReq->clientOpPtr;
@@ -10098,9 +10112,10 @@
   scanptr.i = tcConnectptr.p->tcScanRec;
   c_scanRecordPool.getPtr(scanptr);
   
-  if (false && fragptr.p->tabRef > 4)
+  if (ERROR_INSERTED(5714) && fragptr.p->tabRef == c_error_insert_table_id)
   {
-    ndbout_c("STOPPING COPY X = [ %d %d %d %d ]",
+    ndbout_c("STOPPING COPY X = DUMP %d 7020 332 [ %d %d %d %d ]",
+	     getOwnNodeId(),
 	     refToBlock(scanptr.p->scanBlockref),
 	     scanptr.p->scanAccPtr, RNIL, NextScanReq::ZSCAN_NEXT);
     
@@ -10132,11 +10147,10 @@
 /*---------------------------------------------------------------------------*/
 /*       PRECONDITION: SCAN_STATE = WAIT_NEXT_SCAN_COPY                      */
 /*---------------------------------------------------------------------------*/
-void Dblqh::nextScanConfCopyLab(Signal* signal) 
+void Dblqh::nextScanConfCopyLab(Signal* signal, Ptr<TcConnectionrec> regTcPtr)
 {
+  tcConnectptr = regTcPtr;
   NextScanConf * const nextScanConf = (NextScanConf *)&signal->theData[0];
-  tcConnectptr.i = scanptr.p->scanTcrec;
-  ptrCheckGuard(tcConnectptr, ctcConnectrecFileSize, tcConnectionrec);
   if (nextScanConf->fragId == RNIL) {
     jam();
 /*---------------------------------------------------------------------------*/
@@ -10167,9 +10181,8 @@
   TcConnectionrec * tcConP = tcConnectptr.p;
   
   tcConP->m_use_rowid = true;
-  tcConP->m_row_id = scanptr.p->m_row_id;
   
-  if (signal->getLength() == 7)
+  if (signal->getLength() == NextScanConf::DeletedRowSignalLength)
   {
     jam();
     ndbrequire(nextScanConf->accOperationPtr == RNIL);
@@ -10180,6 +10193,7 @@
     tcConP->primKeyLen = 0;
     tcConP->totSendlenAi = 0;
     tcConP->connectState = TcConnectionrec::COPY_CONNECTED;
+    tcConP->m_row_id.m_page_no = scanptr.p->m_logical_page_id;
 
 /*---------------------------------------------------------------------------*/
 // To avoid using up to many operation records in ACC we will increase the
@@ -10304,6 +10318,8 @@
   Uint32 accOpPtr= get_acc_ptr_from_scan_record(scanP, rows, false);
   ndbassert(accOpPtr != (Uint32)-1);
   c_acc->execACCKEY_ORD(signal, accOpPtr);
+
+  Uint32 logicalPageId = scanP->m_logical_page_id;
   
   if (tcConnectptr.p->errorCode != 0) {
     jam();
@@ -10319,12 +10335,12 @@
     return;
   }//if
   TcConnectionrec * tcConP = tcConnectptr.p;
-  tcConnectptr.p->totSendlenAi = readLength;
-  tcConnectptr.p->connectState = TcConnectionrec::COPY_CONNECTED;
+  tcConP->totSendlenAi = readLength;
+  tcConP->connectState = TcConnectionrec::COPY_CONNECTED;
 
   // Read primary keys (used to get here via scan keyinfo)
   Uint32* tmp = signal->getDataPtrSend()+24;
-  Uint32 len= tcConnectptr.p->primKeyLen = readPrimaryKeys(scanP, tcConP, tmp);
+  Uint32 len= tcConP->primKeyLen = readPrimaryKeys(scanP, tcConP, tmp);
   
   tcConP->gci = tmp[len];
   // Calculate hash (no need to linearies key)
@@ -10372,11 +10388,12 @@
    *
    * Also see TR 587.
    *----------------------------------------------------------------*/
-  tcConnectptr.p->transid[0] = TnoOfWords; // Data overload, see note!
+  tcConP->m_row_id.m_page_no = logicalPageId;
+  tcConP->transid[0] = TnoOfWords; // Data overload, see note!
   packLqhkeyreqLab(signal);
-  tcConnectptr.p->copyCountWords += TnoOfWords;
+  tcConP->copyCountWords += TnoOfWords;
   scanptr.p->scanState = ScanRecord::WAIT_LQHKEY_COPY;
-  if (tcConnectptr.p->copyCountWords < cmaxWordsAtNodeRec) {
+  if (tcConP->copyCountWords < cmaxWordsAtNodeRec) {
     nextRecordCopy(signal);
     return;
   }//if
@@ -10605,17 +10622,15 @@
 /*---------------------------------------------------------------------------*/
 /*   PRECONDITION: SCAN_STATE = WAIT_CLOSE_COPY                              */
 /*---------------------------------------------------------------------------*/
-void Dblqh::accCopyCloseConfLab(Signal* signal) 
+void Dblqh::accCopyCloseConfLab(Signal* signal, Ptr<TcConnectionrec> regTcPtr) 
 {
-  tcConnectptr.i = scanptr.p->scanTcrec;
   scanptr.p->scanState = ScanRecord::WAIT_DELETE_STORED_PROC_ID_COPY;
-  ptrCheckGuard(tcConnectptr, ctcConnectrecFileSize, tcConnectionrec);
-  signal->theData[0] = tcConnectptr.p->tupConnectrec;
-  signal->theData[1] = tcConnectptr.p->tableref;
+  signal->theData[0] = regTcPtr.p->tupConnectrec;
+  signal->theData[1] = regTcPtr.p->tableref;
   signal->theData[2] = scanptr.p->scanSchemaVersion;
   signal->theData[3] = ZDELETE_STORED_PROC_ID;
   signal->theData[4] = scanptr.p->scanStoredProcId;
-  sendSignal(tcConnectptr.p->tcTupBlockref, GSN_STORED_PROCREQ, signal, 5, JBB);
+  sendSignal(regTcPtr.p->tcTupBlockref, GSN_STORED_PROCREQ, signal, 5, JBB);
   return;
 }//Dblqh::accCopyCloseConfLab()
 
@@ -18655,6 +18670,11 @@
       traceopout = tracenrout;
     }
     SET_ERROR_INSERT_VALUE(arg);
+  }
+  if (arg == 5714 && signal->getLength() == 2)
+  {
+    SET_ERROR_INSERT_VALUE(arg);
+    c_error_insert_table_id = signal->theData[1];
   }
 #endif
   

--- 1.50/storage/ndb/src/kernel/blocks/dbtup/Dbtup.hpp	2006-08-25 14:50:42 +02:00
+++ 1.51/storage/ndb/src/kernel/blocks/dbtup/Dbtup.hpp	2006-08-25 14:50:42 +02:00
@@ -1285,7 +1285,6 @@
   Uint32          in_buf_index;
   Uint32          in_buf_len;
   Uint32          attr_descriptor;
-  bool            xfrm_flag;
 
   struct Var_data {
     char *m_data_ptr;
@@ -1300,14 +1299,19 @@
   PagePtr m_disk_page_ptr;       //
   Local_key m_row_id;
   
-  bool            dirty_op;
-  bool            interpreted_exec;
-  bool            last_row;
-  bool            m_use_rowid;
+  enum Bits {
+    KR_DIRTY        = 0x01,
+    KR_INTERPRETED  = 0x02,
+    KR_LAST_ROW     = 0x04,
+    KR_ROWID        = 0x08, // For insert, this mean insert with specific rowid
+                            // For update/delete, this means return rowid
+    KR_XFRM         = 0x10,
+    KR_CHANGE_MASK  = 0x20
+  };
+  Uint32 m_bits;
 
   Signal*         signal;
   Uint32 no_fired_triggers;
-  Uint32 frag_page_id;
   Uint32 hash_value;
   Uint32 gci;
   Uint32 log_size;
@@ -1322,7 +1326,7 @@
   Uint32 no_changed_attrs;
   BlockReference TC_ref;
   BlockReference rec_blockref;
-  bool change_mask_calculated;
+
   /*
    * A bit mask where a bit set means that the update or insert
    * was updating this record.
@@ -1363,11 +1367,6 @@
   virtual ~Dbtup();
 
   /*
-   * TUX uses logical tuple address when talking to ACC and LQH.
-   */
-  void tuxGetTupAddr(Uint32 fragPtrI, Uint32 pageId, Uint32 pageOffset, Uint32& tupAddr);
-
-  /*
    * TUX index in TUP has single Uint32 array attribute which stores an
    * index node.  TUX reads and writes the node directly via pointer.
    */
@@ -1417,6 +1416,7 @@
   int nr_read_pk(Uint32 fragPtr, const Local_key*, Uint32* dataOut, bool&copy);
   int nr_update_gci(Uint32 fragPtr, const Local_key*, Uint32 gci);
   int nr_delete(Signal*, Uint32, Uint32 fragPtr, const Local_key*, Uint32 gci);
+  int nr_get_real_pid(Uint32 fragPtr, Uint32 pid, Uint32* res);
 
   void nr_delete_page_callback(Signal*, Uint32 op, Uint32 page);
   void nr_delete_logbuffer_callback(Signal*, Uint32 op, Uint32 page);

--- 1.16/storage/ndb/src/kernel/blocks/dbtup/DbtupCommit.cpp	2006-08-25 14:50:42 +02:00
+++ 1.17/storage/ndb/src/kernel/blocks/dbtup/DbtupCommit.cpp	2006-08-25 14:50:42 +02:00
@@ -29,13 +29,12 @@
 {
   TablerecPtr regTabPtr;
   FragrecordPtr regFragPtr;
-  Uint32 frag_page_id, frag_id;
 
   ljamEntry();
 
-  frag_id= signal->theData[0];
+  Uint32 frag_id= signal->theData[0];
   regTabPtr.i= signal->theData[1];
-  frag_page_id= signal->theData[2];
+  Uint32 real_page_id= signal->theData[2];
   Uint32 page_index= signal->theData[3];
 
   ptrCheckGuard(regTabPtr, cnoOfTablerec, tablerec);
@@ -43,10 +42,10 @@
   getFragmentrec(regFragPtr, frag_id, regTabPtr.p);
   ndbassert(regFragPtr.p != NULL);
   
-  if (! (((frag_page_id << MAX_TUPLES_BITS) + page_index) == ~0))
+  if (! (((real_page_id << MAX_TUPLES_BITS) + page_index) == ~0))
   {
     Local_key tmp;
-    tmp.m_page_no= getRealpid(regFragPtr.p, frag_page_id); 
+    tmp.m_page_no= real_page_id;
     tmp.m_page_idx= page_index;
     
     PagePtr pagePtr;
@@ -186,12 +185,13 @@
     c_scanOpPool.getPtr(scanOp, lcpScan_ptr_i);
     Local_key rowid = regOperPtr->m_tuple_location;
     Local_key scanpos = scanOp.p->m_scanPos.m_key;
+    Uint32 ref = rowid.ref();
     rowid.m_page_no = page->frag_page_id;
     if (rowid >= scanpos)
     {
       extra_bits = Tuple_header::LCP_KEEP; // Note REMOVE FREE
       ptr->m_operation_ptr_i = lcp_keep_list;
-      regFragPtr->m_lcp_keep_list = rowid.ref();
+      regFragPtr->m_lcp_keep_list = ref;
     }
   }
   

--- 1.41/storage/ndb/src/kernel/blocks/dbtup/DbtupExecQuery.cpp	2006-08-25 14:50:42 +02:00
+++ 1.42/storage/ndb/src/kernel/blocks/dbtup/DbtupExecQuery.cpp	2006-08-25 14:50:42 +02:00
@@ -341,7 +341,7 @@
 
   do {
     Uint32 savepointId= regOperPtr->savepointId;
-    bool dirty= req_struct->dirty_op;
+    bool dirty= req_struct->m_bits & KeyReqStruct::KR_DIRTY;
     
     c_operation_pool.getPtr(currOpPtr);
     bool sameTrans= c_lqh->is_same_trans(currOpPtr.p->userpointer,
@@ -442,9 +442,8 @@
   
   jam();
   Uint32 page_idx= local_key & MAX_TUPLES_PER_PAGE;
-  Uint32 frag_page_id= local_key >> MAX_TUPLES_BITS;
-  regOperPtr->m_tuple_location.m_page_no= getRealpid(regFragPtr,
-						     frag_page_id);
+  Uint32 real_page_id= local_key >> MAX_TUPLES_BITS;
+  regOperPtr->m_tuple_location.m_page_no= real_page_id;
   regOperPtr->m_tuple_location.m_page_idx= page_idx;
   
   PagePtr page_ptr;
@@ -526,9 +525,8 @@
   
   jam();
   Uint32 page_idx= local_key & MAX_TUPLES_PER_PAGE;
-  Uint32 frag_page_id= local_key >> MAX_TUPLES_BITS;
-  regOperPtr->m_tuple_location.m_page_no= getRealpid(regFragPtr,
-						     frag_page_id);
+  Uint32 real_page_id= local_key >> MAX_TUPLES_BITS;
+  regOperPtr->m_tuple_location.m_page_no= real_page_id;
   regOperPtr->m_tuple_location.m_page_idx= page_idx;
   regOperPtr->op_struct.m_load_diskpage_on_commit= 0;
   
@@ -603,15 +601,20 @@
    Tablerec* regTabPtr = tabptr.p;
 
    req_struct.signal= signal;
-   req_struct.dirty_op= TrequestInfo & 1;
-   req_struct.interpreted_exec= (TrequestInfo >> 10) & 1;
    req_struct.no_fired_triggers= 0;
    req_struct.read_length= 0;
    req_struct.max_attr_id_updated= 0;
    req_struct.no_changed_attrs= 0;
-   req_struct.last_row= false;
    req_struct.changeMask.clear();
 
+   req_struct.m_bits = 0;
+   req_struct.m_bits |= (TrequestInfo & 1) ? KeyReqStruct::KR_DIRTY : 0;
+   req_struct.m_bits |= (TrequestInfo & (1 << 10)) ? 
+     KeyReqStruct::KR_INTERPRETED : 0;
+
+   req_struct.m_bits |= (TrequestInfo & (1 << 11)) ? 
+     KeyReqStruct::KR_ROWID : 0;
+   
    if (unlikely(get_trans_state(regOperPtr) != TRANS_IDLE))
    {
      TUPKEY_abort(signal, 39);
@@ -650,8 +653,7 @@
    req_struct.tc_operation_ptr= sig1;
    req_struct.TC_index= sig2;
    req_struct.TC_ref= sig3;
-   Uint32 pageid = req_struct.frag_page_id= sig4;
-   req_struct.m_use_rowid = (TrequestInfo >> 11) & 1;
+   Uint32 pageid = regOperPtr->m_tuple_location.m_page_no = sig4;
 
    sig1= tupKeyReq->attrBufLen;
    sig2= tupKeyReq->applRef;
@@ -693,9 +695,6 @@
    /**
     * Get pointer to tuple
     */
-   regOperPtr->m_tuple_location.m_page_no= getRealpid(regFragPtr, 
-						      req_struct.frag_page_id);
-   
    setup_fixed_part(&req_struct, regOperPtr, regTabPtr);
    
    /**
@@ -874,12 +873,13 @@
 {
   TupKeyConf * tupKeyConf= (TupKeyConf *)signal->getDataPtrSend();  
   
-  Uint32 Rcreate_rowid = req_struct->m_use_rowid;
   Uint32 RuserPointer= regOperPtr->userpointer;
   Uint32 RnoFiredTriggers= req_struct->no_fired_triggers;
   Uint32 log_size= req_struct->log_size;
   Uint32 read_length= req_struct->read_length;
-  Uint32 last_row= req_struct->last_row;
+  Uint32 last_row= req_struct->m_bits & KeyReqStruct::KR_LAST_ROW;
+  Uint32 page_no= req_struct->m_row_id.m_page_no;
+  Uint32 page_idx= regOperPtr->m_tuple_location.m_page_idx;
   
   set_trans_state(regOperPtr, TRANS_STARTED);
   set_tuple_state(regOperPtr, TUPLE_PREPARED);
@@ -888,8 +888,9 @@
   tupKeyConf->writeLength= log_size;
   tupKeyConf->noFiredTriggers= RnoFiredTriggers;
   tupKeyConf->lastRow= last_row;
-  tupKeyConf->rowid = Rcreate_rowid;
-  
+  tupKeyConf->rowid_page_no = page_no;
+  tupKeyConf->rowid_page_idx= page_idx;
+
   EXECUTE_DIRECT(DBLQH, GSN_TUPKEYCONF, signal,
 		 TupKeyConf::SignalLength);
   
@@ -930,7 +931,8 @@
   }
   dst= &signal->theData[start_index];
   dstLen= (MAX_READ / 4) - start_index;
-  if (!req_struct->interpreted_exec) {
+  if (! (req_struct->m_bits & KeyReqStruct::KR_INTERPRETED))
+  {
     jam();
     int ret = readAttributes(req_struct,
 			     &cinBuffer[0],
@@ -1039,7 +1041,8 @@
   operPtrP->tupVersion= tup_version;
   
   int retValue;
-  if (!req_struct->interpreted_exec) {
+  if (! (req_struct->m_bits & KeyReqStruct::KR_INTERPRETED))
+  {
     jam();
     retValue= updateAttributes(req_struct,
                                &cinBuffer[0],
@@ -1067,10 +1070,19 @@
   }
   
   req_struct->m_tuple_ptr->set_tuple_version(tup_version);
-  if (regTabPtr->m_bits & Tablerec::TR_Checksum) {
+  if (regTabPtr->m_bits & Tablerec::TR_Checksum) 
+  {
     jam();
     setChecksum(req_struct->m_tuple_ptr, regTabPtr);
   }
+
+  if (req_struct->m_bits & KeyReqStruct::KR_ROWID)
+  {
+    jam();
+    // likely cachemiss
+    req_struct->m_row_id.m_page_no = req_struct->m_page_ptr.p->frag_page_id;
+  }
+
   return retValue;
   
 error:
@@ -1172,9 +1184,9 @@
   bool mem_insert = regOperPtr.p->is_first_operation();
   bool disk_insert = mem_insert && disk;
   bool varsize = regTabPtr->m_attributes[MM].m_no_of_varsize;
-  bool rowid = req_struct->m_use_rowid;
+  bool rowid = req_struct->m_bits & KeyReqStruct::KR_ROWID;
   Uint32 real_page_id = regOperPtr.p->m_tuple_location.m_page_no;
-  Uint32 frag_page_id = req_struct->frag_page_id;
+  Uint32 frag_page_id;
 
   union {
     Uint32 sizes[4];
@@ -1295,7 +1307,6 @@
       {
 	goto mem_error;
       }
-      req_struct->m_use_rowid = true;
     }
     else
     {
@@ -1329,8 +1340,8 @@
 	goto alloc_rowid_error;
       }
     }
-    real_page_id = regOperPtr.p->m_tuple_location.m_page_no;
-    regOperPtr.p->m_tuple_location.m_page_no= frag_page_id;
+
+    req_struct->m_row_id.m_page_no = frag_page_id;
     c_lqh->accminupdate(signal,
 			regOperPtr.p->userpointer,
 			&regOperPtr.p->m_tuple_location);
@@ -1339,7 +1350,6 @@
     base->m_operation_ptr_i= regOperPtr.i;
     base->m_header_bits= Tuple_header::ALLOC | 
       (varsize ? Tuple_header::CHAINED_ROW : 0);
-    regOperPtr.p->m_tuple_location.m_page_no = real_page_id;
   }
   else 
   {
@@ -1359,7 +1369,6 @@
     {
       goto size_change_error;
     }
-    req_struct->m_use_rowid = false;
     base->m_header_bits &= ~(Uint32)Tuple_header::FREE;
   }
 
@@ -1393,7 +1402,7 @@
      * Set ref from disk to mm
      */
     Local_key ref = regOperPtr.p->m_tuple_location;
-    ref.m_page_no = frag_page_id;
+    ref.m_page_no = frag_page_id; //  Must use logical page id
     
     Tuple_header* disk_ptr= req_struct->m_disk_ptr;
     disk_ptr->m_header_bits = 0;
@@ -1505,6 +1514,13 @@
     return 0;
   }
   
+  if (req_struct->m_bits & KeyReqStruct::KR_ROWID)
+  {
+    jam();
+    // likely cachemiss
+    req_struct->m_row_id.m_page_no = req_struct->m_page_ptr.p->frag_page_id;
+  }
+  
   return handleReadReq(signal, regOperPtr, regTabPtr, req_struct);
 
 error:
@@ -2358,7 +2374,7 @@
 #ifdef TRACE_INTERPRETER
 	ndbout_c(" - exit_ok_last");
 #endif
-	req_struct->last_row= true;
+	req_struct->m_bits |= KeyReqStruct::KR_LAST_ROW;
 	return TdataWritten;
 	
       case Interpreter::EXIT_REFUSE:
@@ -2844,368 +2860,3 @@
   return 0;
 }
 
-int
-Dbtup::nr_update_gci(Uint32 fragPtrI, const Local_key* key, Uint32 gci)
-{
-  FragrecordPtr fragPtr;
-  fragPtr.i= fragPtrI;
-  ptrCheckGuard(fragPtr, cnoOfFragrec, fragrecord);
-  TablerecPtr tablePtr;
-  tablePtr.i= fragPtr.p->fragTableId;
-  ptrCheckGuard(tablePtr, cnoOfTablerec, tablerec);
-
-  if (tablePtr.p->m_bits & Tablerec::TR_RowGCI)
-  {
-    Local_key tmp = *key;
-    PagePtr page_ptr;
-
-    int ret;
-    if (tablePtr.p->m_attributes[MM].m_no_of_varsize)
-    {
-      tablePtr.p->m_offsets[MM].m_fix_header_size += 
-	Tuple_header::HeaderSize+1;
-      ret = alloc_page(tablePtr.p, fragPtr.p, &page_ptr, tmp.m_page_no);
-      tablePtr.p->m_offsets[MM].m_fix_header_size -= 
-	Tuple_header::HeaderSize+1;
-    } 
-    else
-    {
-      ret = alloc_page(tablePtr.p, fragPtr.p, &page_ptr, tmp.m_page_no);  
-    }
-
-    if (ret)
-      return -1;
-    
-    Tuple_header* ptr = (Tuple_header*)
-      ((Fix_page*)page_ptr.p)->get_ptr(tmp.m_page_idx, 0);
-    
-    ndbrequire(ptr->m_header_bits & Tuple_header::FREE);
-    *ptr->get_mm_gci(tablePtr.p) = gci;
-  }
-  return 0;
-}
-
-int
-Dbtup::nr_read_pk(Uint32 fragPtrI, 
-		  const Local_key* key, Uint32* dst, bool& copy)
-{
-  
-  FragrecordPtr fragPtr;
-  fragPtr.i= fragPtrI;
-  ptrCheckGuard(fragPtr, cnoOfFragrec, fragrecord);
-  TablerecPtr tablePtr;
-  tablePtr.i= fragPtr.p->fragTableId;
-  ptrCheckGuard(tablePtr, cnoOfTablerec, tablerec);
-
-  Local_key tmp = *key;
-  Uint32 pages = fragPtr.p->noOfPages;
-  
-  int ret;
-  PagePtr page_ptr;
-  if (tablePtr.p->m_attributes[MM].m_no_of_varsize)
-  {
-    tablePtr.p->m_offsets[MM].m_fix_header_size += Tuple_header::HeaderSize+1;
-    ret = alloc_page(tablePtr.p, fragPtr.p, &page_ptr, tmp.m_page_no);
-    tablePtr.p->m_offsets[MM].m_fix_header_size -= Tuple_header::HeaderSize+1;
-  } 
-  else
-  {
-    ret = alloc_page(tablePtr.p, fragPtr.p, &page_ptr, tmp.m_page_no);  
-  }
-  if (ret)
-    return -1;
-  
-  KeyReqStruct req_struct;
-  Uint32* ptr= ((Fix_page*)page_ptr.p)->get_ptr(key->m_page_idx, 0);
-  
-  req_struct.m_page_ptr = page_ptr;
-  req_struct.m_tuple_ptr = (Tuple_header*)ptr;
-  Uint32 bits = req_struct.m_tuple_ptr->m_header_bits;
-
-  ret = 0;
-  copy = false;
-  if (! (bits & Tuple_header::FREE))
-  {
-    if (bits & Tuple_header::ALLOC)
-    {
-      Uint32 opPtrI= req_struct.m_tuple_ptr->m_operation_ptr_i;
-      Operationrec* opPtrP= c_operation_pool.getPtr(opPtrI);
-      ndbassert(!opPtrP->m_copy_tuple_location.isNull());
-      req_struct.m_tuple_ptr= (Tuple_header*)
-	c_undo_buffer.get_ptr(&opPtrP->m_copy_tuple_location);
-      copy = true;
-    }
-    req_struct.check_offset[MM]= tablePtr.p->get_check_offset(MM);
-    req_struct.check_offset[DD]= tablePtr.p->get_check_offset(DD);
-    
-    Uint32 num_attr= tablePtr.p->m_no_of_attributes;
-    Uint32 descr_start= tablePtr.p->tabDescriptor;
-    TableDescriptor *tab_descr= &tableDescriptor[descr_start];
-    ndbrequire(descr_start + (num_attr << ZAD_LOG_SIZE) <= cnoOfTabDescrRec);
-    req_struct.attr_descr= tab_descr; 
-
-    if (tablePtr.p->need_expand())
-      prepare_read(&req_struct, tablePtr.p, false);
-    
-    const Uint32* attrIds= &tableDescriptor[tablePtr.p->readKeyArray].tabDescr;
-    const Uint32 numAttrs= tablePtr.p->noOfKeyAttr;
-    // read pk attributes from original tuple
-    
-    // new globals
-    tabptr= tablePtr;
-    fragptr= fragPtr;
-    operPtr.i= RNIL;
-    operPtr.p= NULL;
-    
-    // do it
-    ret = readAttributes(&req_struct,
-			 attrIds,
-			 numAttrs,
-			 dst,
-			 ZNIL, false);
-    
-    // done
-    if (likely(ret != -1)) {
-      // remove headers
-      Uint32 n= 0;
-      Uint32 i= 0;
-      while (n < numAttrs) {
-	const AttributeHeader ah(dst[i]);
-	Uint32 size= ah.getDataSize();
-	ndbrequire(size != 0);
-	for (Uint32 j= 0; j < size; j++) {
-	  dst[i + j - n]= dst[i + j + 1];
-	}
-	n+= 1;
-	i+= 1 + size;
-      }
-      ndbrequire((int)i == ret);
-      ret -= numAttrs;
-    } else {
-      return terrorCode ? (-(int)terrorCode) : -1;
-    }
-  }
-    
-  if (tablePtr.p->m_bits & Tablerec::TR_RowGCI)
-  {
-    dst[ret] = *req_struct.m_tuple_ptr->get_mm_gci(tablePtr.p);
-  }
-  else
-  {
-    dst[ret] = 0;
-  }
-  return ret;
-}
-
-#include <signaldata/TuxMaint.hpp>
-
-int
-Dbtup::nr_delete(Signal* signal, Uint32 senderData,
-		 Uint32 fragPtrI, const Local_key* key, Uint32 gci)
-{
-  FragrecordPtr fragPtr;
-  fragPtr.i= fragPtrI;
-  ptrCheckGuard(fragPtr, cnoOfFragrec, fragrecord);
-  TablerecPtr tablePtr;
-  tablePtr.i= fragPtr.p->fragTableId;
-  ptrCheckGuard(tablePtr, cnoOfTablerec, tablerec);
-
-  Local_key tmp = * key;
-  tmp.m_page_no= getRealpid(fragPtr.p, tmp.m_page_no); 
-  
-  PagePtr pagePtr;
-  Tuple_header* ptr= (Tuple_header*)get_ptr(&pagePtr, &tmp, tablePtr.p);
-
-  if (!tablePtr.p->tuxCustomTriggers.isEmpty()) 
-  {
-    jam();
-    TuxMaintReq* req = (TuxMaintReq*)signal->getDataPtrSend();
-    req->tableId = fragPtr.p->fragTableId;
-    req->fragId = fragPtr.p->fragmentId;
-    req->pageId = tmp.m_page_no;
-    req->pageIndex = tmp.m_page_idx;
-    req->tupVersion = ptr->get_tuple_version();
-    req->opInfo = TuxMaintReq::OpRemove;
-    removeTuxEntries(signal, tablePtr.p);
-  }
-  
-  Local_key disk;
-  memcpy(&disk, ptr->get_disk_ref_ptr(tablePtr.p), sizeof(disk));
-  
-  if (tablePtr.p->m_attributes[MM].m_no_of_varsize)
-  {
-    jam();
-    free_var_rec(fragPtr.p, tablePtr.p, &tmp, pagePtr);
-  } else {
-    jam();
-    free_fix_rec(fragPtr.p, tablePtr.p, &tmp, (Fix_page*)pagePtr.p);
-  }
-
-  if (tablePtr.p->m_no_of_disk_attributes)
-  {
-    jam();
-
-    Uint32 sz = (sizeof(Dbtup::Disk_undo::Free) >> 2) + 
-      tablePtr.p->m_offsets[DD].m_fix_header_size - 1;
-    
-    int res = c_lgman->alloc_log_space(fragPtr.p->m_logfile_group_id, sz);
-    ndbrequire(res == 0);
-    
-    /**
-     * 1) alloc log buffer
-     * 2) get page
-     * 3) get log buffer
-     * 4) delete tuple
-     */
-    Page_cache_client::Request preq;
-    preq.m_page = disk;
-    preq.m_callback.m_callbackData = senderData;
-    preq.m_callback.m_callbackFunction =
-      safe_cast(&Dbtup::nr_delete_page_callback);
-    int flags = Page_cache_client::COMMIT_REQ;
-    
-#ifdef ERROR_INSERT
-    if (ERROR_INSERTED(4023) || ERROR_INSERTED(4024))
-    {
-      int rnd = rand() % 100;
-      int slp = 0;
-      if (ERROR_INSERTED(4024))
-      {
-	slp = 3000;
-      }
-      else if (rnd > 90)
-      {
-	slp = 3000;
-      }
-      else if (rnd > 70)
-      {
-	slp = 100;
-      }
-      
-      ndbout_c("rnd: %d slp: %d", rnd, slp);
-      
-      if (slp)
-      {
-	flags |= Page_cache_client::DELAY_REQ;
-	preq.m_delay_until_time = NdbTick_CurrentMillisecond()+(Uint64)slp;
-      }
-    }
-#endif
-    
-    res = m_pgman.get_page(signal, preq, flags);
-    if (res == 0)
-    {
-      goto timeslice;
-    }
-    else if (unlikely(res == -1))
-    {
-      return -1;
-    }
-
-    PagePtr disk_page = *(PagePtr*)&m_pgman.m_ptr;
-    disk_page_set_dirty(disk_page);
-
-    preq.m_callback.m_callbackFunction =
-      safe_cast(&Dbtup::nr_delete_logbuffer_callback);      
-    Logfile_client lgman(this, c_lgman, fragPtr.p->m_logfile_group_id);
-    res= lgman.get_log_buffer(signal, sz, &preq.m_callback);
-    switch(res){
-    case 0:
-      signal->theData[2] = disk_page.i;
-      goto timeslice;
-    case -1:
-      ndbrequire("NOT YET IMPLEMENTED" == 0);
-      break;
-    }
-
-    ndbout << "DIRECT DISK DELETE: " << disk << endl;
-    disk_page_free(signal, tablePtr.p, fragPtr.p,
-		   &disk, *(PagePtr*)&disk_page, gci);
-    return 0;
-  }
-  
-  return 0;
-
-timeslice:
-  memcpy(signal->theData, &disk, sizeof(disk));
-  return 1;
-}
-
-void
-Dbtup::nr_delete_page_callback(Signal* signal, 
-			       Uint32 userpointer, Uint32 page_id)
-{
-  Ptr<GlobalPage> gpage;
-  m_global_page_pool.getPtr(gpage, page_id);
-  PagePtr pagePtr= *(PagePtr*)&gpage;
-  disk_page_set_dirty(pagePtr);
-  Dblqh::Nr_op_info op;
-  op.m_ptr_i = userpointer;
-  op.m_disk_ref.m_page_no = pagePtr.p->m_page_no;
-  op.m_disk_ref.m_file_no = pagePtr.p->m_file_no;
-  c_lqh->get_nr_op_info(&op, page_id);
-
-  Ptr<Fragrecord> fragPtr;
-  fragPtr.i= op.m_tup_frag_ptr_i;
-  ptrCheckGuard(fragPtr, cnoOfFragrec, fragrecord);
-
-  Ptr<Tablerec> tablePtr;
-  tablePtr.i = fragPtr.p->fragTableId;
-  ptrCheckGuard(tablePtr, cnoOfTablerec, tablerec);
-  
-  Uint32 sz = (sizeof(Dbtup::Disk_undo::Free) >> 2) + 
-    tablePtr.p->m_offsets[DD].m_fix_header_size - 1;
-  
-  Callback cb;
-  cb.m_callbackData = userpointer;
-  cb.m_callbackFunction =
-    safe_cast(&Dbtup::nr_delete_logbuffer_callback);      
-  Logfile_client lgman(this, c_lgman, fragPtr.p->m_logfile_group_id);
-  int res= lgman.get_log_buffer(signal, sz, &cb);
-  switch(res){
-  case 0:
-    return;
-  case -1:
-    ndbrequire("NOT YET IMPLEMENTED" == 0);
-    break;
-  }
-    
-  ndbout << "PAGE CALLBACK DISK DELETE: " << op.m_disk_ref << endl;
-  disk_page_free(signal, tablePtr.p, fragPtr.p,
-		 &op.m_disk_ref, pagePtr, op.m_gci);
-  
-  c_lqh->nr_delete_complete(signal, &op);
-  return;
-}
-
-void
-Dbtup::nr_delete_logbuffer_callback(Signal* signal, 
-				    Uint32 userpointer, 
-				    Uint32 unused)
-{
-  Dblqh::Nr_op_info op;
-  op.m_ptr_i = userpointer;
-  c_lqh->get_nr_op_info(&op, RNIL);
-  
-  Ptr<Fragrecord> fragPtr;
-  fragPtr.i= op.m_tup_frag_ptr_i;
-  ptrCheckGuard(fragPtr, cnoOfFragrec, fragrecord);
-
-  Ptr<Tablerec> tablePtr;
-  tablePtr.i = fragPtr.p->fragTableId;
-  ptrCheckGuard(tablePtr, cnoOfTablerec, tablerec);
-
-  Ptr<GlobalPage> gpage;
-  m_global_page_pool.getPtr(gpage, op.m_page_id);
-  PagePtr pagePtr= *(PagePtr*)&gpage;
-
-  /**
-   * reset page no
-   */
-  ndbout << "LOGBUFFER CALLBACK DISK DELETE: " << op.m_disk_ref << endl;
-  
-  disk_page_free(signal, tablePtr.p, fragPtr.p,
-		 &op.m_disk_ref, pagePtr, op.m_gci);
-  
-  c_lqh->nr_delete_complete(signal, &op);
-}

--- 1.20/storage/ndb/src/kernel/blocks/dbtup/DbtupIndex.cpp	2006-08-25 14:50:42 +02:00
+++ 1.21/storage/ndb/src/kernel/blocks/dbtup/DbtupIndex.cpp	2006-08-25 14:50:42 +02:00
@@ -29,19 +29,6 @@
 
 // methods used by ordered index
 
-void
-Dbtup::tuxGetTupAddr(Uint32 fragPtrI,
-                     Uint32 pageId,
-                     Uint32 pageIndex,
-                     Uint32& tupAddr)
-{
-  ljamEntry();
-  PagePtr pagePtr;
-  c_page_pool.getPtr(pagePtr, pageId);
-  Uint32 fragPageId= pagePtr.p->frag_page_id;
-  tupAddr= (fragPageId << MAX_TUPLES_BITS) | pageIndex;
-}
-
 int
 Dbtup::tuxAllocNode(Signal* signal,
                     Uint32 fragPtrI,
@@ -304,7 +291,7 @@
 }
 
 int
-Dbtup::accReadPk(Uint32 tableId, Uint32 fragId, Uint32 fragPageId, Uint32 pageIndex, Uint32* dataOut, bool xfrmFlag)
+Dbtup::accReadPk(Uint32 tableId, Uint32 fragId, Uint32 pageId, Uint32 pageIndex, Uint32* dataOut, bool xfrmFlag)
 {
   ljamEntry();
   // get table
@@ -316,7 +303,6 @@
   getFragmentrec(fragPtr, fragId, tablePtr.p);
   // get real page id and tuple offset
 
-  Uint32 pageId = getRealpid(fragPtr.p, fragPageId);
   // use TUX routine - optimize later
   int ret = tuxReadPk(fragPtr.i, pageId, pageIndex, dataOut, xfrmFlag);
   return ret;
@@ -338,19 +324,19 @@
   tablePtr.i= fragPtr.p->fragTableId;
   ptrCheckGuard(tablePtr, cnoOfTablerec, tablerec);
   // get page
-  Uint32 fragPageId= tupAddr >> MAX_TUPLES_BITS;
+  Uint32 realPageId= tupAddr >> MAX_TUPLES_BITS;
   Uint32 pageIndex= tupAddr & ((1 << MAX_TUPLES_BITS ) - 1);
   // use temp op rec
   Operationrec tempOp;
   KeyReqStruct req_struct;
-  tempOp.m_tuple_location.m_page_no= getRealpid(fragPtr.p, fragPageId);
+  tempOp.m_tuple_location.m_page_no= realPageId;
   tempOp.m_tuple_location.m_page_idx= pageIndex;
   tempOp.savepointId= savePointId;
   tempOp.op_struct.op_type= ZREAD;
-  req_struct.frag_page_id= fragPageId;
+  //req_struct.frag_page_id= fragPageId;
   req_struct.trans_id1= transId1;
   req_struct.trans_id2= transId2;
-  req_struct.dirty_op= 1;
+  req_struct.m_bits = KeyReqStruct::KR_DIRTY;
   
   setup_fixed_part(&req_struct, &tempOp, tablePtr.p);
   if (setup_read(&req_struct, &tempOp, fragPtr.p, tablePtr.p, false)) {

--- 1.28/storage/ndb/src/kernel/blocks/dbtup/DbtupRoutines.cpp	2006-08-25 14:50:42 +02:00
+++ 1.29/storage/ndb/src/kernel/blocks/dbtup/DbtupRoutines.cpp	2006-08-25 14:50:42 +02:00
@@ -198,7 +198,7 @@
   inBufIndex= 0;
   req_struct->out_buf_index= 0;
   req_struct->max_read= maxRead;
-  req_struct->xfrm_flag= xfrm_flag;
+  req_struct->m_bits |= xfrm_flag ? KeyReqStruct::KR_XFRM : 0;
   while (inBufIndex < inBufLen) {
     tmpAttrBufIndex= req_struct->out_buf_index;
     AttributeHeader ahIn(inBuffer[inBufIndex]);
@@ -309,7 +309,8 @@
   Uint32 charsetFlag = AttributeOffset::getCharsetFlag(attrDes2);
 
   ndbrequire((readOffset + attrNoOfWords - 1) < req_struct->check_offset[MM]);
-  if (! charsetFlag || ! req_struct->xfrm_flag) {
+  if (! charsetFlag || ! (req_struct->m_bits & KeyReqStruct::KR_XFRM))
+  {
     Uint32 newIndexBuf = indexBuf + attrNoOfWords;
     if (newIndexBuf <= maxRead) {
       ljam();
@@ -476,7 +477,7 @@
   new_index= index_buf + vsize_in_words;
 
   ndbrequire(vsize_in_words <= max_var_size);
-  if (! charsetFlag || ! req_struct->xfrm_flag)
+  if (! charsetFlag || ! (req_struct->m_bits & KeyReqStruct::KR_XFRM))
   {
     if (new_index <= max_read) {
       ljam();
@@ -586,7 +587,8 @@
   Uint32 charsetFlag = AttributeOffset::getCharsetFlag(attrDes2);
 
   ndbrequire((readOffset + attrNoOfWords - 1) < req_struct->check_offset[DD]);
-  if (! charsetFlag || ! req_struct->xfrm_flag) {
+  if (! charsetFlag || ! (req_struct->m_bits & KeyReqStruct::KR_XFRM))
+  {
     Uint32 newIndexBuf = indexBuf + attrNoOfWords;
     if (newIndexBuf <= maxRead) {
       ljam();
@@ -832,13 +834,13 @@
   req_struct->max_read = MAX_KEY_SIZE_IN_WORDS;
   req_struct->attr_descriptor = attrDescriptor;
 
-  bool tmp = req_struct->xfrm_flag;
-  req_struct->xfrm_flag = true;
+  Uint32 tmp = req_struct->m_bits;
+  req_struct->m_bits |= KeyReqStruct::KR_XFRM;
   ndbrequire((this->*f)(&keyReadBuffer[0],
                         req_struct,
                         ahOut,
                         attributeOffset));
-  req_struct->xfrm_flag = tmp;
+  req_struct->m_bits = tmp;
   
   ndbrequire(req_struct->out_buf_index == ahOut->getDataSize());
   if (ahIn.getDataSize() != ahOut->getDataSize()) {
@@ -1186,7 +1188,7 @@
     outBuffer[3] = signal->theData[3];
     return 4;
   case AttributeHeader::ROWID:
-    outBuffer[0] = req_struct->frag_page_id;
+    outBuffer[0] = req_struct->m_page_ptr.p->frag_page_id;
     outBuffer[1] = operPtr.p->m_tuple_location.m_page_idx;
     return 2;
   case AttributeHeader::ROW_GCI:

--- 1.45/storage/ndb/src/kernel/blocks/dbtux/Dbtux.hpp	2006-08-25 14:50:42 +02:00
+++ 1.46/storage/ndb/src/kernel/blocks/dbtux/Dbtux.hpp	2006-08-25 14:50:42 +02:00
@@ -1276,9 +1276,8 @@
   const Uint32 tableFragPtrI = frag.m_tupTableFragPtrI;
   const TupLoc tupLoc = ent.m_tupLoc;
   Uint32 tupAddr = NullTupAddr;
-  c_tup->tuxGetTupAddr(tableFragPtrI, tupLoc.getPageId(), tupLoc.getPageOffset(), tupAddr);
-  jamEntry();
-  return tupAddr;
+  return Local_key::ref(tupLoc.getPageId(),
+			tupLoc.getPageOffset());
 }
 
 inline unsigned

--- 1.15/storage/ndb/src/kernel/blocks/dbtup/DbtupScan.cpp	2006-08-25 14:50:42 +02:00
+++ 1.16/storage/ndb/src/kernel/blocks/dbtup/DbtupScan.cpp	2006-08-25 14:50:42 +02:00
@@ -301,7 +301,7 @@
       lockReq->fragId = frag.fragmentId;
       lockReq->fragPtrI = RNIL; // no cached frag ptr yet
       lockReq->hashValue = md5_hash((Uint64*)pkData, pkSize);
-      lockReq->tupAddr = key_mm.ref();
+      lockReq->tupAddr = Local_key::ref(pos.m_realpid_mm, key_mm.m_page_idx);
       lockReq->transId1 = scan.m_transId1;
       lockReq->transId2 = scan.m_transId2;
       EXECUTE_DIRECT(DBACC, GSN_ACC_LOCKREQ,
@@ -379,10 +379,12 @@
     const ScanPos& pos = scan.m_scanPos;
     conf->accOperationPtr = accLockOp;
     conf->fragId = frag.fragmentId;
-    conf->localKey[0] = pos.m_key_mm.ref();
+    conf->localKey[0] = Local_key::ref(pos.m_realpid_mm, 
+				       pos.m_key_mm.m_page_idx);
     conf->localKey[1] = 0;
     conf->localKeyLength = 1;
-    unsigned signalLength = 6;
+    conf->logical_page_id = pos.m_key_mm.m_page_no;
+    unsigned signalLength = NextScanConf::SignalLength;
     if (scan.m_bits & ScanOp::SCAN_LOCK) {
       sendSignal(scan.m_userRef, GSN_NEXT_SCANCONF,
           signal, signalLength, JBB);
@@ -855,12 +857,15 @@
 	conf->scanPtr = scan.m_userPtr;
 	conf->accOperationPtr = RNIL;
 	conf->fragId = frag.fragmentId;
-	conf->localKey[0] = pos.m_key_mm.ref();
+	conf->localKey[0] = Local_key::ref(pos.m_realpid_mm, 
+					   key_mm.m_page_idx);
 	conf->localKey[1] = 0;
 	conf->localKeyLength = 1;
 	conf->gci = foundGCI;
+	conf->logical_page_id =  key_mm.m_page_no;
 	Uint32 blockNo = refToBlock(scan.m_userRef);
-	EXECUTE_DIRECT(blockNo, GSN_NEXT_SCANCONF, signal, 7);
+	EXECUTE_DIRECT(blockNo, GSN_NEXT_SCANCONF, signal, 
+		       NextScanConf::DeletedRowSignalLength);
 	jamEntry();
 
 	// TUPKEYREQ handles savepoint stuff
@@ -886,7 +891,6 @@
 found_lcp_keep:
   Local_key tmp;
   tmp.assref(lcp_list);
-  tmp.m_page_no = getRealpid(fragPtr.p, tmp.m_page_no);
   
   Ptr<Page> pagePtr;
   c_page_pool.getPtr(pagePtr, tmp.m_page_no);
@@ -911,9 +915,15 @@
   conf->localKey[0] = lcp_list;
   conf->localKey[1] = 0;
   conf->localKeyLength = 1;
-  conf->gci = 0;
+  /**
+   * This are not needed in LCP scan,
+   *  as BACKUP reads them using psuedo-columns
+   */
+  conf->logical_page_id = ~0;
+  
   Uint32 blockNo = refToBlock(scan.m_userRef);
-  EXECUTE_DIRECT(blockNo, GSN_NEXT_SCANCONF, signal, 7);
+  EXECUTE_DIRECT(blockNo, GSN_NEXT_SCANCONF, signal, 
+		 NextScanConf::SignalLength);
   
   fragPtr.p->m_lcp_keep_list = next;
   ptr->m_header_bits |= Tuple_header::FREED; // RESTORE free flag

--- 1.7/storage/ndb/src/kernel/blocks/Makefile.am	2006-08-25 14:50:42 +02:00
+++ 1.8/storage/ndb/src/kernel/blocks/Makefile.am	2006-08-25 14:50:42 +02:00
@@ -24,6 +24,7 @@
   dbtup/DbtupGen.cpp dbtup/DbtupIndex.cpp \
   dbtup/DbtupDebug.cpp dbtup/DbtupScan.cpp \
   dbtup/DbtupDiskAlloc.cpp dbtup/DbtupVarAlloc.cpp \
+  dbtup/DbtupNr.cpp \
   dbtup/tuppage.cpp dbtup/Undo_buffer.cpp \
   ndbfs/AsyncFile.cpp ndbfs/Ndbfs.cpp ndbfs/VoidFs.cpp \
   ndbfs/Filename.cpp ndbfs/CircularIndex.cpp \
Thread
bk commit into 5.1 tree (jonas:1.2261)jonas25 Aug