List:Internals« Previous MessageNext Message »
From:jonas.oreland Date:March 4 2005 7:45am
Subject:bk commit into 5.1-ndb tree (joreland:1.1762)
View as plain text  
Below is the list of changes that have just been committed into a local
5.1-ndb repository of jonas. When jonas does a push these changes will
be propagated to the main repository and, within 24 hours after the
push, to the public repository.
For information on how to access the public repository
see http://dev.mysql.com/doc/mysql/en/installing-source-tree.html

ChangeSet
  1.1762 05/03/04 07:45:47 joreland@stripped +10 -0
  wl1866 - ndb diskdata -
    insert update read scan works ok.
    still not working: multi operations, timeslice tup_commit, delete

  ndb/src/kernel/blocks/dbtup/DbtupRoutines.cpp
    1.29 05/03/04 07:45:44 joreland@stripped +507 -0
    Disk data routines

  ndb/src/kernel/blocks/dbtup/DbtupIndex.cpp
    1.35 05/03/04 07:45:44 joreland@stripped +3 -3
    Index don't setup disk part

  ndb/src/kernel/blocks/dbtup/DbtupExecQuery.cpp
    1.44 05/03/04 07:45:44 joreland@stripped +120 -27
    Fix diskdata in TUPKEYREQ

  ndb/src/kernel/blocks/dbtup/DbtupDiskAlloc.cpp
    1.6 05/03/04 07:45:44 joreland@stripped +13 -13
    Unify callback names
    Comment printouts

  ndb/src/kernel/blocks/dbtup/DbtupCommit.cpp
    1.29 05/03/04 07:45:43 joreland@stripped +31 -17
    Fix commit of initial insert/update
      (wo/ changing size)

  ndb/src/kernel/blocks/dbtup/Dbtup.hpp
    1.39 05/03/04 07:45:43 joreland@stripped +85 -17
    Make inlines on need_expand and need_shrink
    Add read/update functions for disk data

  ndb/src/kernel/blocks/dblqh/DblqhMain.cpp
    1.56 05/03/04 07:45:43 joreland@stripped +136 -46
    Add methods for preloading pages on pk and scan access

  ndb/src/kernel/blocks/dblqh/Dblqh.hpp
    1.40 05/03/04 07:45:43 joreland@stripped +19 -1
    Add methods for preloading pages on pk and scan access

  ndb/src/kernel/blocks/cmvmi/Cmvmi.cpp
    1.22 05/03/04 07:45:43 joreland@stripped +1 -1
    Increase size of global page pool

  ndb/include/kernel/signaldata/TupKey.hpp
    1.4 05/03/04 07:45:43 joreland@stripped +2 -1
    Add diskpage to TUPKEYREQ

# This is a BitKeeper patch.  What follows are the unified diffs for the
# set of deltas contained in the patch.  The rest of the patch, the part
# that BitKeeper cares about, is below these diffs.
# User:	joreland
# Host:	eel.hemma.oreland.se
# Root:	/home/jonas/src/mysql-5.1-ndb-dd

--- 1.5/ndb/src/kernel/blocks/dbtup/DbtupDiskAlloc.cpp	Tue Mar  1 19:04:25 2005
+++ 1.6/ndb/src/kernel/blocks/dbtup/DbtupDiskAlloc.cpp	Fri Mar  4 07:45:44 2005
@@ -177,7 +177,7 @@
   ndbassert(req.p->m_free_space >= sz);
   if(i != idx)
   {
-    ndbout_c("moving page request %d -> %d", i, idx);
+    //ndbout_c("moving page request %d -> %d", i, idx);
     Page_request_list old_list(c_page_request_pool, list[i]);
     Page_request_list new_list(c_page_request_pool, list[idx]);
     
@@ -207,7 +207,7 @@
       old_list.remove(extentPtr);
       new_list.add(extentPtr);
       extentPtr.p->m_free_matrix_pos= new_pos;
-      ndbout_c("moving extent from %d to %d", old_pos, new_pos);
+      //ndbout_c("moving extent from %d to %d", old_pos, new_pos);
     }
   }
 }
@@ -251,7 +251,7 @@
       update_free_space(tabPtrP,*(PagePtr*)&page, alloc.m_dirty_pages, i, sz);
       key->m_page_no= ((Page*)page.p)->m_page_no;
       key->m_file_no= ((Page*)page.p)->m_file_no;
-      ndbout_c("found dirty page");
+      //ndbout_c("found dirty page");
       return 0; // Page in memory
     }
   }
@@ -272,7 +272,7 @@
       req.p->m_ref_count++;
       update_free_space(tabPtrP, req, alloc.m_page_requests, i, sz);
       * key = req.p->m_key;
-      ndbout_c("found transit page");
+      //ndbout_c("found transit page");
       return 0;
     }
   }
@@ -407,7 +407,7 @@
   preq.m_page = *key;
   preq.m_callback.m_callbackData= req.i;
   preq.m_callback.m_callbackFunction = 
-    safe_cast(&Dbtup::disk_alloc_page_callback);
+    safe_cast(&Dbtup::disk_page_alloc_callback);
   
   int flags= Page_cache_client::ALLOC_REQ;
   if(pageBits == 0)
@@ -415,7 +415,7 @@
     //XXX empty page -> fast to map
     flags |= Page_cache_client::EMPTY_PAGE;
     preq.m_callback.m_callbackFunction = 
-      safe_cast(&Dbtup::disk_alloc_page_callback_initial);
+      safe_cast(&Dbtup::disk_page_alloc_initial_callback);
   }
   
   int res= m_pgman.get_page(signal, preq, flags);
@@ -432,10 +432,10 @@
 }
 
 void
-Dbtup::disk_alloc_page_callback(Signal* signal, 
+Dbtup::disk_page_alloc_callback(Signal* signal, 
 				Uint32 page_request, Uint32 page_id)
 {
-  ndbout_c("disk_alloc_page_callback id: %d", page_id);
+  //ndbout_c("disk_alloc_page_callback id: %d", page_id);
 
   Ptr<Page_request> req;
   c_page_request_pool.getPtr(req, page_request);
@@ -447,14 +447,14 @@
   tabPtr.i= req.p->m_table_id;
   ptrCheckGuard(tabPtr, cnoOfTablerec, tablerec);
   
-  disk_alloc_page_callback_common(signal, req, tabPtr, gpage);
+  disk_page_alloc_callback_common(signal, req, tabPtr, gpage);
 }
 
 void
-Dbtup::disk_alloc_page_callback_initial(Signal*signal , Uint32 page_request, 
+Dbtup::disk_page_alloc_initial_callback(Signal*signal , Uint32 page_request, 
 					Uint32 page_id)
 {
-  ndbout_c("disk_alloc_page_callback_initial id: %d", page_id);  
+  //ndbout_c("disk_alloc_page_callback_initial id: %d", page_id);  
   /**
    * 1) lookup page request
    * 2) lookup page
@@ -484,11 +484,11 @@
   {
     abort();
   }
-  disk_alloc_page_callback_common(signal, req, tabPtr, gpage);
+  disk_page_alloc_callback_common(signal, req, tabPtr, gpage);
 }
 
 void
-Dbtup::disk_alloc_page_callback_common(Signal* signal, 
+Dbtup::disk_page_alloc_callback_common(Signal* signal, 
 				       Ptr<Page_request> req, 
 				       Ptr<Tablerec> tabPtr, 
 				       Ptr<GlobalPage> pagePtr)

--- 1.3/ndb/include/kernel/signaldata/TupKey.hpp	Wed Sep 15 16:57:12 2004
+++ 1.4/ndb/include/kernel/signaldata/TupKey.hpp	Fri Mar  4 07:45:43 2005
@@ -36,7 +36,7 @@
   friend bool printTUPKEYREQ(FILE * output, const Uint32 * theData, Uint32 len, Uint16
receiverBlockNo);
 
 public:
-  STATIC_CONST( SignalLength = 15 );
+  STATIC_CONST( SignalLength = 16 );
 
 private:
 
@@ -58,6 +58,7 @@
   Uint32 coordinatorTC;
   Uint32 tcOpIndex;
   Uint32 savePointId;
+  Uint32 disk_page;
 };
 
 class TupKeyConf {

--- 1.21/ndb/src/kernel/blocks/cmvmi/Cmvmi.cpp	Sun Jan 23 17:45:31 2005
+++ 1.22/ndb/src/kernel/blocks/cmvmi/Cmvmi.cpp	Fri Mar  4 07:45:43 2005
@@ -92,7 +92,7 @@
   addRecSignal(GSN_TESTSIG, &Cmvmi::execTESTSIG);
   
   subscriberPool.setSize(5);
-  m_global_page_pool.setSize(8);
+  m_global_page_pool.setSize(100);
   
   const ndb_mgm_configuration_iterator * db = theConfig.getOwnConfigIterator();
   for(unsigned j = 0; j<LogLevel::LOGLEVEL_CATEGORIES; j++){

--- 1.39/ndb/src/kernel/blocks/dblqh/Dblqh.hpp	Tue Feb 15 14:11:31 2005
+++ 1.40/ndb/src/kernel/blocks/dblqh/Dblqh.hpp	Fri Mar  4 07:45:43 2005
@@ -2543,9 +2543,27 @@
   Dbacc* c_acc;
   Uint32 readPrimaryKeys(ScanRecord*, TcConnectionrec*, Uint32 * dst);
 
-  void acckeyconf_tupkeyreq(Signal*, TcConnectionrec*, Fragrecord*, Uint32);
+  void acckeyconf_tupkeyreq(Signal*, TcConnectionrec*, Fragrecord*, Uint32, Uint32);
   void acckeyconf_load_diskpage(Signal*,TcConnectionrecPtr,Fragrecord*,Uint32);
+
+public:
   void acckeyconf_load_diskpage_callback(Signal*, Uint32, Uint32);
+
+private:
+  void next_scanconf_load_diskpage(Signal* signal, 
+				   ScanRecordPtr scanPtr,
+				   Ptr<TcConnectionrec> regTcPtr,
+				   Fragrecord* fragPtrP);
+  
+  void next_scanconf_tupkeyreq(Signal* signal, ScanRecordPtr,
+			       TcConnectionrec * regTcPtr,
+			       Fragrecord* fragPtrP,
+			       Uint32 disk_page);
+
+public:  
+  void next_scanconf_load_diskpage_callback(Signal* signal, Uint32, Uint32);
+
+private:
 
 // ----------------------------------------------------------------
 // These are variables handling the records. For most records one

--- 1.55/ndb/src/kernel/blocks/dblqh/DblqhMain.cpp	Tue Feb 15 14:11:31 2005
+++ 1.56/ndb/src/kernel/blocks/dblqh/DblqhMain.cpp	Fri Mar  4 07:45:43 2005
@@ -984,7 +984,8 @@
   if (tabptr.p->tableStatus == Tablerec::NOT_DEFINED){
     tabptr.p->tableStatus = Tablerec::ADD_TABLE_ONGOING;
     tabptr.p->tableType = tableType;
-    tabptr.p->primaryTableId = primaryTableId;
+    tabptr.p->primaryTableId = 
+      (primaryTableId == RNIL ? tabptr.i : primaryTableId);
     tabptr.p->schemaVersion = tschemaVersion;
     tabptr.p->m_disk_table= 0;
   }//if
@@ -3903,14 +3904,16 @@
 
   ndbrequire(localKeyFlag == 1);
   if(!regTcPtr->m_disk_table)
-    acckeyconf_tupkeyreq(signal, regTcPtr, regFragptr.p, localKey1);
+    acckeyconf_tupkeyreq(signal, regTcPtr, regFragptr.p, localKey1, 0);
   else
     acckeyconf_load_diskpage(signal, tcConnectptr, regFragptr.p, localKey1);
 }
 
 void
 Dblqh::acckeyconf_tupkeyreq(Signal* signal, TcConnectionrec* regTcPtr,
-			    Fragrecord* regFragptrP, Uint32 local_key)
+			    Fragrecord* regFragptrP, 
+			    Uint32 local_key,
+			    Uint32 disk_page)
 {
   regTcPtr->transactionState = TcConnectionrec::WAIT_TUP;
   /* ------------------------------------------------------------------------
@@ -3972,6 +3975,7 @@
   tupKeyReq->coordinatorTC = tcConnectptr.p->tcBlockref;
   tupKeyReq->tcOpIndex = tcConnectptr.p->tcOprec;
   tupKeyReq->savePointId = tcConnectptr.p->savePointId;
+  tupKeyReq->disk_page= disk_page;
 
   EXECUTE_DIRECT(tup, GSN_TUPKEYREQ, signal, TupKeyReq::SignalLength);
 }//Dblqh::execACCKEYCONF()
@@ -3980,17 +3984,14 @@
 Dblqh::acckeyconf_load_diskpage(Signal* signal, TcConnectionrecPtr regTcPtr,
 				Fragrecord* regFragptrP, Uint32 local_key)
 {
-  Callback cb = { safe_cast(&Dblqh::acckeyconf_load_diskpage_callback), 
-		  regTcPtr.i };
   int res;
   if((res= c_tup->load_diskpage(signal, 
-				&cb, 
 				regTcPtr.p->tupConnectrec,
 				regFragptrP->tupFragptr, 
 				local_key, 
 				0)) > 0)
   {
-    acckeyconf_tupkeyreq(signal, regTcPtr.p, regFragptrP, local_key);
+    acckeyconf_tupkeyreq(signal, regTcPtr.p, regFragptrP, local_key, res);
   }
   else if(res == 0)
   {
@@ -4009,25 +4010,27 @@
 void
 Dblqh::acckeyconf_load_diskpage_callback(Signal* signal, 
 					 Uint32 callbackData,
-					 Uint32 returnCode)
+					 Uint32 disk_page)
 {
   jamEntry();
   tcConnectptr.i = callbackData;
   ptrCheckGuard(tcConnectptr, ctcConnectrecFileSize, tcConnectionrec);
   TcConnectionrec * const regTcPtr = tcConnectptr.p;
 
-  if(returnCode == 0)
+  if(disk_page > 0)
   {
     FragrecordPtr fragPtr;
     c_fragment_pool.getPtr(fragPtr, regTcPtr->fragmentptr);
     
-    acckeyconf_tupkeyreq(signal, regTcPtr, fragPtr.p, regTcPtr->m_local_key);
+    acckeyconf_tupkeyreq(signal, regTcPtr, fragPtr.p, 
+			 regTcPtr->m_local_key,
+			 disk_page);
   }
   else
   {
     TupKeyRef * ref = (TupKeyRef *)signal->getDataPtr();
     ref->userRef= callbackData;
-    ref->errorCode= returnCode;
+    ref->errorCode= disk_page;
     execTUPKEYREF(signal);
   }
 }
@@ -8136,49 +8139,130 @@
     closeScanLab(signal);
     return;
   }//if
-  jam();
-  Uint32 tupFragPtr;
-  Uint32 reqinfo = (scanptr.p->scanLockHold == ZFALSE);
-  reqinfo = reqinfo + (tcConnectptr.p->operation << 6);
-  reqinfo = reqinfo + (tcConnectptr.p->opExec << 10);
-  tcConnectptr.p->transactionState = TcConnectionrec::SCAN_TUPKEY;
-  fragptr.i = tcConnectptr.p->fragmentptr;
-  c_fragment_pool.getPtr(fragptr);
-  if (! scanptr.p->rangeScan) {
-    tupFragPtr = fragptr.p->tupFragptr;
-  } else {
+
+  Fragrecord* fragPtrP= fragptr.p;
+  if (scanptr.p->rangeScan) {
     jam();
     // for ordered index use primary table
-    FragrecordPtr tFragPtr;
-    tFragPtr.i = fragptr.p->tableFragptr;
-    c_fragment_pool.getPtr(tFragPtr);
-    tupFragPtr = tFragPtr.p->tupFragptr;
+    fragPtrP= c_fragment_pool.getPtr(fragPtrP->tableFragptr);
   }
+
+  if(tcConnectptr.p->m_disk_table)
   {
-    jam();
-    TupKeyReq * const tupKeyReq = (TupKeyReq *)signal->getDataPtrSend(); 
+    next_scanconf_load_diskpage(signal, scanptr, tcConnectptr,fragPtrP);
+  }
+  else
+  {
+    next_scanconf_tupkeyreq(signal, scanptr, tcConnectptr.p, fragPtrP, 0);
+  }
+}
 
-    tupKeyReq->connectPtr = tcConnectptr.p->tupConnectrec;
-    tupKeyReq->request = reqinfo;
-    tupKeyReq->keyRef1 = scanptr.p->scanLocalref[0];
-    tupKeyReq->keyRef2 = scanptr.p->scanLocalref[1];
-    tupKeyReq->attrBufLen = 0;
-    tupKeyReq->opRef = scanptr.p->scanApiOpPtr; 
-    tupKeyReq->applRef = scanptr.p->scanApiBlockref;
-    tupKeyReq->storedProcedure = scanptr.p->scanStoredProcId;
-    tupKeyReq->transId1 = tcConnectptr.p->transid[0];
-    tupKeyReq->transId2 = tcConnectptr.p->transid[1];
-    tupKeyReq->fragPtr = tupFragPtr;
-    tupKeyReq->primaryReplica = (tcConnectptr.p->seqNoReplica == 0)?true:false;
-    tupKeyReq->coordinatorTC = tcConnectptr.p->tcBlockref;
-    tupKeyReq->tcOpIndex = tcConnectptr.p->tcOprec;
-    tupKeyReq->savePointId = tcConnectptr.p->savePointId;
-    Uint32 blockNo = refToBlock(tcConnectptr.p->tcTupBlockref);
-    EXECUTE_DIRECT(blockNo, GSN_TUPKEYREQ, signal, 
-               TupKeyReq::SignalLength);
+void
+Dblqh::next_scanconf_load_diskpage(Signal* signal, 
+				   ScanRecordPtr scanPtr,
+				   Ptr<TcConnectionrec> regTcPtr,
+				   Fragrecord* fragPtrP)
+{
+  jam();
+  
+  int res;
+  Uint32 local_key;
+  local_key = scanptr.p->scanLocalref[0] << MAX_TUPLES_BITS;
+  local_key += scanptr.p->scanLocalref[1];
+  
+  if((res= c_tup->load_diskpage_scan(signal, 
+				     regTcPtr.p->tupConnectrec,
+				     fragPtrP->tupFragptr, 
+				     local_key, 
+				     0)) > 0)
+  {
+    next_scanconf_tupkeyreq(signal, scanptr, regTcPtr.p, fragPtrP, res);
+  }
+  else if(res == 0)
+  {
+    regTcPtr.p->transactionState = TcConnectionrec::WAIT_TUP;
+  }
+  else 
+  {
+    TupKeyRef * ref = (TupKeyRef *)signal->getDataPtr();
+    ref->userRef= regTcPtr.i;
+    ref->errorCode= ~0;
+    execTUPKEYREF(signal);
+  }
+}
+
+void
+Dblqh::next_scanconf_load_diskpage_callback(Signal* signal, 
+					    Uint32 callbackData,
+					    Uint32 disk_page)
+{
+  jamEntry();
+
+  Ptr<TcConnectionrec> regTcPtr;
+  regTcPtr.i= callbackData;
+  ptrCheckGuard(regTcPtr, ctcConnectrecFileSize, tcConnectionrec);
+  
+  ScanRecordPtr scanPtr;
+  c_scanRecordPool.getPtr(scanPtr, regTcPtr.p->tcScanRec);
+
+  if(disk_page > 0)
+  {
+    FragrecordPtr fragPtr;
+    c_fragment_pool.getPtr(fragPtr, regTcPtr.p->fragmentptr);
+
+    if (scanPtr.p->rangeScan) {
+      jam();
+      // for ordered index use primary table
+      fragPtr.p = c_fragment_pool.getPtr(fragPtr.p->tableFragptr);
+    }
+    
+    next_scanconf_tupkeyreq(signal, scanPtr, regTcPtr.p, fragPtr.p, disk_page);
+  }
+  else
+  {
+    TupKeyRef * ref = (TupKeyRef *)signal->getDataPtr();
+    ref->userRef= callbackData;
+    ref->errorCode= disk_page;
+    execTUPKEYREF(signal);
   }
 }
 
+void
+Dblqh::next_scanconf_tupkeyreq(Signal* signal, 
+			       Ptr<ScanRecord> scanPtr,
+			       TcConnectionrec * regTcPtr,
+			       Fragrecord* fragPtrP,
+			       Uint32 disk_page)
+{
+  jam();
+  Uint32 reqinfo = (scanPtr.p->scanLockHold == ZFALSE);
+  reqinfo = reqinfo + (regTcPtr->operation << 6);
+  reqinfo = reqinfo + (regTcPtr->opExec << 10);
+  regTcPtr->transactionState = TcConnectionrec::SCAN_TUPKEY;
+  
+  TupKeyReq * const tupKeyReq = (TupKeyReq *)signal->getDataPtrSend(); 
+  
+  tupKeyReq->connectPtr = regTcPtr->tupConnectrec;
+  tupKeyReq->request = reqinfo;
+  tupKeyReq->keyRef1 = scanPtr.p->scanLocalref[0];
+  tupKeyReq->keyRef2 = scanPtr.p->scanLocalref[1];
+  tupKeyReq->attrBufLen = 0;
+  tupKeyReq->opRef = scanPtr.p->scanApiOpPtr; 
+  tupKeyReq->applRef = scanPtr.p->scanApiBlockref;
+  tupKeyReq->storedProcedure = scanPtr.p->scanStoredProcId;
+  tupKeyReq->transId1 = regTcPtr->transid[0];
+  tupKeyReq->transId2 = regTcPtr->transid[1];
+  tupKeyReq->fragPtr = fragPtrP->tupFragptr;
+  tupKeyReq->primaryReplica = (regTcPtr->seqNoReplica == 0)?true:false;
+  tupKeyReq->coordinatorTC = regTcPtr->tcBlockref;
+  tupKeyReq->tcOpIndex = regTcPtr->tcOprec;
+  tupKeyReq->savePointId = regTcPtr->savePointId;
+  tupKeyReq->disk_page= disk_page;
+  Uint32 blockNo = refToBlock(regTcPtr->tcTupBlockref);
+  EXECUTE_DIRECT(blockNo, GSN_TUPKEYREQ, signal, 
+		 TupKeyReq::SignalLength);
+}
+
 /* -------------------------------------------------------------------------
  *       RECEPTION OF FURTHER KEY INFORMATION WHEN KEY SIZE > 16 BYTES.
  * -------------------------------------------------------------------------
@@ -8679,7 +8763,13 @@
   tcConnectptr.p->commitAckMarker = RNIL;
   tcConnectptr.p->m_offset_current_keybuf = 0;
   tcConnectptr.p->m_scan_curr_range_no = 0;
+  tcConnectptr.p->m_disk_table = tabptr.p->m_disk_table;
 
+  TablerecPtr tTablePtr;
+  tTablePtr.i = tabptr.p->primaryTableId;
+  ptrCheckGuard(tTablePtr, ctabrecFileSize, tablerec);
+  tcConnectptr.p->m_disk_table = tTablePtr.p->m_disk_table;  
+  
   tabptr.p->usageCount++;
 }//Dblqh::initScanTc()
 

--- 1.38/ndb/src/kernel/blocks/dbtup/Dbtup.hpp	Tue Mar  1 19:04:25 2005
+++ 1.39/ndb/src/kernel/blocks/dbtup/Dbtup.hpp	Fri Mar  4 07:45:43 2005
@@ -719,7 +719,22 @@
     Uint16 m_no_of_disk_attributes;
     Uint16 noOfKeyAttr;
     Uint16 noOfCharsets;
+
+    bool need_expand() const { 
+      return m_no_of_attributes > m_attributes[MM].m_no_of_fixsize;
+    }
+
+    bool need_expand(bool disk) const { 
+      return m_attributes[MM].m_no_of_varsize > 0 ||
+	(disk && m_no_of_disk_attributes > 0);
+    }
     
+    bool need_shrink() const {
+      return 
+	m_attributes[MM].m_no_of_varsize > 0 ||
+	m_attributes[DD].m_no_of_varsize > 0;
+    }
+
     /**
      * Descriptors for MM and DD part
      */
@@ -937,9 +952,8 @@
      */
     STATIC_CONST ( TUP_VERSION_MASK = 0xFFFF );
     STATIC_CONST ( CHAINED_ROW = 0x10000 ); // Is var part on different page
-    STATIC_CONST ( DISK_LOADED = 0x20000 ); // Is disk loaded
-    STATIC_CONST ( DISK_INLINE = 0x40000 ); // Is disk directly after mm
-    STATIC_CONST ( DISK_MOVED  = 0x80000 ); // Has diskpart moved
+    STATIC_CONST ( DISK_ALLOC  = 0x20000 ); // Is disk part allocated
+    STATIC_CONST ( DISK_INLINE = 0x40000 ); // Is disk inline
     
     Uint32 get_tuple_version() const { 
       return m_header_bits & TUP_VERSION_MASK;
@@ -953,6 +967,10 @@
     Uint32* get_null_bits(const Tablerec* tabPtrP) {
       return m_null_bits+tabPtrP->m_offsets[MM].m_null_offset;
     }
+
+    Uint32* get_null_bits(const Tablerec* tabPtrP, Uint32 mm) {
+      return m_null_bits+tabPtrP->m_offsets[mm].m_null_offset;
+    }
     
     Uint32* get_var_part_ptr(const Tablerec* tabPtrP) {
       return m_data + tabPtrP->m_offsets[MM].m_varpart_offset;      
@@ -1114,8 +1132,11 @@
    */
   bool tuxQueryTh(Uint32 fragPtrI, Uint32 tupAddr, Uint32 tupVersion, Uint32 transId1,
Uint32 transId2, Uint32 savePointId);
 
-  int load_diskpage(Signal*, Callback*, Uint32 opRec, Uint32 fragPtrI, 
+  int load_diskpage(Signal*, Uint32 opRec, Uint32 fragPtrI, 
 		    Uint32 local_key, Uint32 flags);
+
+  int load_diskpage_scan(Signal*, Uint32 opRec, Uint32 fragPtrI, 
+			 Uint32 local_key, Uint32 flags);
 private:
   BLOCK_DEFINES(Dbtup);
 
@@ -1335,10 +1356,9 @@
 //------------------------------------------------------------------
 //------------------------------------------------------------------
   void execTUPKEYREQ(Signal* signal);
-  void execTUPKEYREQ_page_callback(Signal*, Uint32 data, Uint32 errCode);
-  void execTUPKEYREQ_page_retreived(Signal* signal, KeyReqStruct*,
-				    Operationrec*, Fragrecord*, Tablerec*);
-  
+  void disk_page_load_callback(Signal*, Uint32 op, Uint32 page);
+  void disk_page_load_scan_callback(Signal*, Uint32 op, Uint32 page);
+
 //------------------------------------------------------------------
 //------------------------------------------------------------------
   void execATTRINFO(Signal* signal);
@@ -1622,19 +1642,41 @@
   bool readBitsNotNULL(Uint32* outBuffer, KeyReqStruct *req_struct, AttributeHeader*,
Uint32);
   bool updateBitsNotNULL(Uint32* inBuffer, KeyReqStruct *req_struct, Uint32);
 
-  bool readFixedDiskNULLable(Uint32* outBuffer, KeyReqStruct *req_struct,
AttributeHeader*, Uint32);
   bool updateFixedNULLable(Uint32* inBuffer, KeyReqStruct *req_struct, Uint32);
-  bool readFixedDiskNotNULL(Uint32* outBuffer, KeyReqStruct *req_struct,
AttributeHeader*, Uint32);
   bool updateFixedNotNull(Uint32* inBuffer, KeyReqStruct *req_struct, Uint32);
 
-  bool readVarDiskNULLable(Uint32* outBuffer, KeyReqStruct *req_struct, AttributeHeader*,
Uint32);
   bool updateVarNULLable(Uint32* inBuffer, KeyReqStruct *req_struct, Uint32);
-  bool readVarDiskNotNULL(Uint32* outBuffer, KeyReqStruct *req_struct, AttributeHeader*,
Uint32);
   bool updateVarNotNull(Uint32* inBuffer, KeyReqStruct *req_struct, Uint32);
 
+
+  bool readDiskFixedSizeNotNULL(Uint32* outBuffer,
+				KeyReqStruct *req_struct,
+				AttributeHeader* ahOut,
+				Uint32  attrDes2);
+  
+  bool readDiskFixedSizeNULLable(Uint32* outBuffer,
+				 KeyReqStruct *req_struct,
+				 AttributeHeader* ahOut,
+				 Uint32  attrDes2);
+  bool readDiskVarSizeNULLable(Uint32* outBuffer, KeyReqStruct *req_struct,
AttributeHeader*, Uint32);
+  bool readDiskVarSizeNotNULL(Uint32* outBuffer, KeyReqStruct *req_struct,
AttributeHeader*, Uint32);
+
+  bool updateDiskFixedSizeNULLable(Uint32*, KeyReqStruct*, Uint32);
+  bool updateDiskFixedSizeNotNULL(Uint32*, KeyReqStruct*, Uint32);
+
+  bool updateDiskVarSizeNULLable(Uint32*, KeyReqStruct *, Uint32);
+  bool updateDiskVarSizeNotNULL(Uint32*, KeyReqStruct *, Uint32);
+  
+  bool readDiskBitsNULLable(Uint32*, KeyReqStruct*, AttributeHeader*, Uint32);
+  bool readDiskBitsNotNULL(Uint32*, KeyReqStruct*, AttributeHeader*, Uint32);
+  bool updateDiskBitsNULLable(Uint32*, KeyReqStruct*, Uint32);
+  bool updateDiskBitsNotNULL(Uint32*, KeyReqStruct*, Uint32);
+
+
 //------------------------------------------------------------------
 //------------------------------------------------------------------
   bool nullFlagCheck(KeyReqStruct *req_struct, Uint32  attrDes2);
+  bool disk_nullFlagCheck(KeyReqStruct *req_struct, Uint32  attrDes2);
   Uint32 read_pseudo(Uint32 attrId,
                      KeyReqStruct *req_struct,
                      Uint32* outBuffer);
@@ -1912,7 +1954,8 @@
   bool setup_read(KeyReqStruct* req_struct,
 		  Operationrec* const regOperPtr,
 		  Fragrecord* const regFragPtr,
-		  Tablerec* const regTabPtr);
+		  Tablerec* const regTabPtr,
+		  bool disk);
   
   bool getPageLastCommitted(Operationrec* const regOperPtr,
                             Operationrec* const leaderOpPtr);
@@ -2245,6 +2288,7 @@
   Uint32* get_ptr(Var_part_ref);
   Uint32* get_ptr(Ptr<Var_page>*, Var_part_ref);
   Uint32* get_ptr(PagePtr*, const Local_key*, const Tablerec*);
+  Uint32* get_ptr(PagePtr*, const Local_key*, const Tablerec*, Uint32 mm);
 
   /**
    * prealloc space from disk
@@ -2261,9 +2305,9 @@
   void update_free_space(const Tablerec*, PagePtr, 
 			 DLList<Page>::Head list[], Uint32 i, Uint32 sz);
   
-  void disk_alloc_page_callback(Signal*, Uint32 page_request, Uint32 page_id);
-  void disk_alloc_page_callback_initial(Signal*, Uint32, Uint32);
-  void disk_alloc_page_callback_common(Signal*, 
+  void disk_page_alloc_callback(Signal*, Uint32 page_request, Uint32 page_id);
+  void disk_page_alloc_initial_callback(Signal*, Uint32, Uint32);
+  void disk_page_alloc_callback_common(Signal*, 
 				       Ptr<Page_request>, 
 				       Ptr<Tablerec>, 
 				       Ptr<GlobalPage>);
@@ -2286,7 +2330,7 @@
    * Setup all pointer on keyreqstruct to prepare for read
    *   req_struct->m_tuple_ptr is set to tuple to read
    */
-  void prepare_read(KeyReqStruct* req_struct, Tablerec* const regTabPtr);
+  void prepare_read(KeyReqStruct*, Tablerec* const, bool disk);
 };
 
 inline
@@ -2405,6 +2449,30 @@
   else
     return ((Fix_page*)tmp.p)->
       get_ptr(key->m_page_idx, regTabPtr->m_offsets[MM].m_fix_header_size);
+}
+
+inline
+Uint32*
+Dbtup::get_ptr(PagePtr* pagePtr, 
+	       const Local_key* key, const Tablerec* regTabPtr, Uint32 mm)
+{
+  PagePtr tmp;
+  tmp.i= key->m_page_no;
+  if(mm == MM)
+  {
+    ptrCheckGuard(tmp, cnoOfPage, page);
+  }
+  else
+  {
+    tmp.p= (Page*)m_global_page_pool.getPtr(tmp.i);
+  }
+  memcpy(pagePtr, &tmp, sizeof(tmp));
+  
+  if(regTabPtr->m_attributes[mm].m_no_of_varsize)
+    return ((Var_page*)tmp.p)->get_ptr(key->m_page_idx);
+  else
+    return ((Fix_page*)tmp.p)->
+      get_ptr(key->m_page_idx, regTabPtr->m_offsets[mm].m_fix_header_size);
 }
 
 NdbOut&

--- 1.28/ndb/src/kernel/blocks/dbtup/DbtupCommit.cpp	Tue Mar  1 19:04:25 2005
+++ 1.29/ndb/src/kernel/blocks/dbtup/DbtupCommit.cpp	Fri Mar  4 07:45:43 2005
@@ -205,33 +205,47 @@
       res *= 1;
     }
     
+    Uint32 copy_bits= copy->m_header_bits;
     
     Ptr<GlobalPage> gpage;
     m_global_page_pool.getPtr(gpage, (Uint32)res);
     Uint32 *dst;
-    if(regTabPtr->m_attributes[DD].m_no_of_varsize == 0)
+    Page* page= (Page*)m_pgman.m_ptr_p;
+    if(copy_bits & Tuple_header::DISK_ALLOC)
     {
-      Fix_page* page= (Fix_page*)m_pgman.m_ptr_p;
-      Uint32 idx= page->alloc_record();
-      page->uncommitted_used_space--;
-      dst= page->get_ptr(idx, regTabPtr->m_offsets[DD].m_fix_header_size);
-      
-      disk_ptr->m_operation_ptr_i= RNIL;
-      memcpy(dst, disk_ptr, 4*regTabPtr->m_offsets[DD].m_fix_header_size);
-      req.m_page.m_page_idx= idx;
-      memcpy(tuple_ptr->get_disk_ref_ptr(regTabPtr), 
-	     &req.m_page, sizeof(Local_key));
-
-      if (regTabPtr->checksumIndicator) {
-	jam();
-	setChecksum(tuple_ptr, regTabPtr);
+      if(regTabPtr->m_attributes[DD].m_no_of_varsize == 0)
+      {
+	req.m_page.m_page_idx= ((Fix_page*)page)->alloc_record();
+	page->uncommitted_used_space--;
+      }
+      else
+      {
+	page->uncommitted_used_space -= req.m_page.m_page_idx;
+	req.m_page.m_page_idx= ((Var_page*)page)->
+	  alloc_record(req.m_page.m_page_idx, (Var_page*)ctemp_page);
       }
     }
+    
+    if(regTabPtr->m_attributes[DD].m_no_of_varsize == 0)
+    {
+      dst= ((Fix_page*)page)->
+	get_ptr(req.m_page.m_page_idx, 
+		regTabPtr->m_offsets[DD].m_fix_header_size);
+    }
     else
     {
-      abort();
+      dst= ((Var_page*)page)->get_ptr(req.m_page.m_page_idx);
     }
     
+    disk_ptr->m_operation_ptr_i= RNIL;
+    memcpy(dst, disk_ptr, 4*regTabPtr->m_offsets[DD].m_fix_header_size);
+    memcpy(tuple_ptr->get_disk_ref_ptr(regTabPtr), 
+	   &req.m_page, sizeof(Local_key));
+    
+    if (regTabPtr->checksumIndicator) {
+      jam();
+      setChecksum(tuple_ptr, regTabPtr);
+    }
   }
   
   tuple_ptr->m_operation_ptr_i= save;
@@ -481,7 +495,7 @@
     Local_key tmp= befOpPtr.p->m_tuple_location;
     Uint32 tmpType= befOpPtr.p->op_struct.op_type;
     setup_fixed_part(req_struct, befOpPtr.p, regTabPtr);
-    prepare_read(req_struct, regTabPtr);
+    prepare_read(req_struct, regTabPtr, false);
     
     befOpPtr.p->op_struct.op_type= ZDELETE;
     req_struct->changeMask.clear();

--- 1.43/ndb/src/kernel/blocks/dbtup/DbtupExecQuery.cpp	Tue Mar  1 19:04:25 2005
+++ 1.44/ndb/src/kernel/blocks/dbtup/DbtupExecQuery.cpp	Fri Mar  4 07:45:44 2005
@@ -331,14 +331,15 @@
 Dbtup::setup_read(KeyReqStruct *req_struct,
 		  Operationrec* const regOperPtr,
 		  Fragrecord* const regFragPtr,
-		  Tablerec* const regTabPtr)
+		  Tablerec* const regTabPtr,
+		  bool disk)
 {
   OperationrecPtr currOpPtr;
   currOpPtr.i= req_struct->m_tuple_ptr->m_operation_ptr_i;
   if (currOpPtr.i == RNIL)
   {
-    if (regTabPtr->var_sized_record || regTabPtr->m_no_of_disk_attributes)
-      prepare_read(req_struct, regTabPtr);
+    if (regTabPtr->need_expand(disk))
+      prepare_read(req_struct, regTabPtr, disk);
     return true;
   }
 
@@ -398,8 +399,8 @@
 	c_undo_buffer.get_ptr(&currOpPtr.p->m_copy_tuple_location);
     }      
 
-    if (regTabPtr->var_sized_record || regTabPtr->m_no_of_disk_attributes)
-      prepare_read(req_struct, regTabPtr);
+    if (regTabPtr->need_expand(disk))
+      prepare_read(req_struct, regTabPtr, disk);
     
 #if 0
     ndbout_c("reading copy");
@@ -582,7 +583,7 @@
 
 int
 Dbtup::load_diskpage(Signal* signal, 
-		     Callback* cb, Uint32 opRec, Uint32 fragPtrI, 
+		     Uint32 opRec, Uint32 fragPtrI, 
 		     Uint32 local_key, Uint32 flags)
 {
   c_operation_pool.getPtr(operPtr, opRec);
@@ -614,20 +615,84 @@
   Tuple_header* ptr= (Tuple_header*)tmp;
   
   int res;
-  if(ptr->m_operation_ptr_i == RNIL)
+  Uint32 opPtr= ptr->m_operation_ptr_i;
+  if(opPtr == RNIL)
   {
-    // We're first
-    ptr->m_operation_ptr_i= opRec;
+    // Make sure we maintain order
+    flags |= Page_cache_client::STRICT_ORDER;
+  }  
+  
+  Page_cache_client::Request req;
+  memcpy(&req.m_page, ptr->get_disk_ref_ptr(regTabPtr), sizeof(Local_key));
+  req.m_callback.m_callbackData= opRec;
+  req.m_callback.m_callbackFunction= 
+    safe_cast(&Dbtup::disk_page_load_callback);
+  
+  if((res= m_pgman.get_page(signal, req, flags)) > 0)
+  {
+    ndbout_c("in cache");
+    // In cache
   } 
+  else if(res == 0)
+  {
+    ndbout_c("waiting for callback");
+    // set state
+  }
   else 
   {
+    // Error
+  }
+  return res;
+}
+
+void
+Dbtup::disk_page_load_callback(Signal* signal, Uint32 opRec, Uint32 page_id)
+{
+  c_operation_pool.getPtr(operPtr, opRec);
+  c_lqh->acckeyconf_load_diskpage_callback(signal, 
+					   operPtr.p->userpointer, page_id);
+}
+
+int
+Dbtup::load_diskpage_scan(Signal* signal, 
+			  Uint32 opRec, Uint32 fragPtrI, 
+			  Uint32 local_key, Uint32 flags)
+{
+  c_operation_pool.getPtr(operPtr, opRec);
+  fragptr.i= fragPtrI;
+  ptrCheckGuard(fragptr, cnoOfFragrec, fragrecord);
+  
+  Operationrec *  regOperPtr= operPtr.p;
+  Fragrecord * regFragPtr= fragptr.p;
+  
+  tabptr.i = regFragPtr->fragTableId;
+  ptrCheckGuard(tabptr, cnoOfTablerec, tablerec);
+  Tablerec* regTabPtr = tabptr.p;
+  
+  jam();
+  Uint32 page_idx= local_key & MAX_TUPLES_PER_PAGE;
+  Uint32 frag_page_id= local_key >> MAX_TUPLES_BITS;
+  regOperPtr->m_tuple_location.m_page_no= getRealpid(regFragPtr,
+						     frag_page_id);
+  regOperPtr->m_tuple_location.m_page_idx= page_idx;
+  
+  PagePtr page_ptr;
+  Uint32* tmp= get_ptr(&page_ptr, &regOperPtr->m_tuple_location, regTabPtr);
+  Tuple_header* ptr= (Tuple_header*)tmp;
+  
+  int res;
+  Uint32 opPtr= ptr->m_operation_ptr_i;
+  if(opPtr == RNIL)
+  {
     // Make sure we maintain order
     flags |= Page_cache_client::STRICT_ORDER;
   }  
   
   Page_cache_client::Request req;
   memcpy(&req.m_page, ptr->get_disk_ref_ptr(regTabPtr), sizeof(Local_key));
-  memcpy(&req.m_callback, cb, sizeof(* cb));
+  req.m_callback.m_callbackData= opRec;
+  req.m_callback.m_callbackFunction= 
+    safe_cast(&Dbtup::disk_page_load_scan_callback);
   
   if((res= m_pgman.get_page(signal, req, flags)) > 0)
   {
@@ -646,6 +711,15 @@
   return res;
 }
 
+void
+Dbtup::disk_page_load_scan_callback(Signal* signal, 
+				    Uint32 opRec, Uint32 page_id)
+{
+  c_operation_pool.getPtr(operPtr, opRec);
+  c_lqh->next_scanconf_load_diskpage_callback(signal, 
+					      operPtr.p->userpointer, page_id);
+}
+
 void Dbtup::execTUPKEYREQ(Signal* signal) 
 {
    TupKeyReq * const tupKeyReq= (TupKeyReq *)signal->getDataPtr();
@@ -705,7 +779,7 @@
    sig1= tupKeyReq->savePointId;
    sig2= tupKeyReq->primaryReplica;
    sig3= tupKeyReq->keyRef2;
-
+   
    regOperPtr->savepointId= sig1;
    regOperPtr->op_struct.primary_replica= sig2;
    regOperPtr->m_tuple_location.m_page_idx= sig3;
@@ -719,17 +793,20 @@
    req_struct.TC_index= sig2;
    req_struct.TC_ref= sig3;
    req_struct.frag_page_id= sig4;
-
+   
    sig1= tupKeyReq->attrBufLen;
    sig2= tupKeyReq->applRef;
    sig3= tupKeyReq->transId1;
    sig4= tupKeyReq->transId2;
 
+   Uint32 disk_page= tupKeyReq->disk_page;
+   
    req_struct.log_size= sig1;
    req_struct.attrinfo_len= sig1;
    req_struct.rec_blockref= sig2;
    req_struct.trans_id1= sig3;
    req_struct.trans_id2= sig4;
+   req_struct.m_disk_page_ptr.i= disk_page;
    Uint32 Roptype = regOperPtr->op_struct.op_type;
 
    if (Rstoredid != ZNIL) {
@@ -760,7 +837,7 @@
    if (Roptype == ZREAD) {
      jam();
      
-     if (setup_read(&req_struct, regOperPtr, regFragPtr, regTabPtr))
+     if (setup_read(&req_struct, regOperPtr, regFragPtr, regTabPtr, true))
      {
        if(handleReadReq(signal, regOperPtr, regTabPtr, &req_struct) != -1) 
        {
@@ -1020,7 +1097,7 @@
     tup_version= prevOp->tupVersion;
     org= (Tuple_header*)c_undo_buffer.get_ptr(&prevOp->m_copy_tuple_location);
   }
-  
+
   /**
    * Check consistency before update/delete
    */
@@ -1041,7 +1118,7 @@
     Uint32 sizes[4];
     Uint64 cmp[2];
   };
-  if (regTabPtr->var_sized_record || regTabPtr->m_no_of_disk_attributes)
+  if (regTabPtr->need_expand())
     expand_tuple(req_struct, sizes, org, regTabPtr);
   else
     memcpy(dst, org, 4*regTabPtr->m_offsets[MM].m_fix_header_size);
@@ -1065,7 +1142,7 @@
     return -1;
   }
   
-  if (regTabPtr->var_sized_record) 
+  if (regTabPtr->need_shrink())
   {  
     shrink_tuple(req_struct, sizes+2, regTabPtr);
     if (cmp[0] != cmp[1] && !handle_size_change_after_update(req_struct,
@@ -1126,7 +1203,12 @@
       pos += AttributeDescriptor::getSizeInBytes(* (Uint32*)tab_descr); 
       tab_descr += 2;
     }
+  } 
+  else
+  {
+    ptr -= Tuple_header::HeaderSize;
   }
+
   req_struct->m_disk_ptr= (Tuple_header*)ptr;
   
   if(cnt2)
@@ -1146,7 +1228,7 @@
   memset(req_struct->m_disk_ptr->m_null_bits+
 	 regTabPtr->m_offsets[DD].m_null_offset, 0xFF, 
 	 4*regTabPtr->m_offsets[DD].m_null_words);
-  req_struct->m_tuple_ptr->m_header_bits= 0;
+  req_struct->m_tuple_ptr->m_header_bits= Tuple_header::DISK_ALLOC;
 }
 
 int Dbtup::handleInsertReq(Signal* signal,
@@ -1188,7 +1270,7 @@
 
     if(!prevOp->is_first_operation())
       org= (Tuple_header*)c_undo_buffer.get_ptr(&prevOp->m_copy_tuple_location);
-    if (regTabPtr->var_sized_record || regTabPtr->m_no_of_disk_attributes)
+    if (regTabPtr->need_expand())
       expand_tuple(req_struct, sizes, org, regTabPtr);
     else
       memcpy(dst, org, 4*regTabPtr->m_offsets[MM].m_fix_header_size);
@@ -1205,7 +1287,7 @@
     return -1;
   }
   
-  if (regTabPtr->var_sized_record) 
+  if (regTabPtr->need_shrink())
   {  
     shrink_tuple(req_struct, sizes+2, regTabPtr);
   }
@@ -2263,6 +2345,7 @@
   Uint16 mm_vars= tabPtrP->m_attributes[MM].m_no_of_varsize;
   
   Uint32 *dst_ptr= ptr->get_var_part_ptr(tabPtrP);
+  const Uint32 *disk_ref= src->get_disk_ref_ptr(tabPtrP);
   const Uint32 *src_ptr= src->get_var_part_ptr(tabPtrP);
   const Uint32 *desc= (Uint32*)req_struct->attr_descr;
   desc += (tabPtrP->m_attributes[MM].m_no_of_fixsize << 1);
@@ -2291,8 +2374,12 @@
     ndbassert(dst_ptr == ALIGN_WORD(dst->m_data_ptr + dst->m_max_var_offset));
     src_ptr = ALIGN_WORD(((char*)src_ptr)+step);
     sizes[MM]= 4*(Tuple_header::HeaderSize + tabPtrP->m_offsets[MM].m_varpart_offset)
+ ((Uint16*)src_data)[mm_vars] + (mm_vars * 2) + 2;
+  } 
+  else 
+  {
+    dst_ptr -= Tuple_header::HeaderSize;
   }
-
+  
   sizes[DD]= 0;
   if(dd_tot)
   {
@@ -2306,10 +2393,12 @@
     }
     else
     {
-      // m_disk_tuple_location is already set
-      // XXX
-      //src_ptr= get_disk_ptr(req_struct, &req_struct->m_disk_tuple_location);
+      Local_key key;
+      memcpy(&key, disk_ref, sizeof(key));
+      key.m_page_no= req_struct->m_disk_page_ptr.i;
+      src_ptr= get_ptr(&req_struct->m_disk_page_ptr, &key, tabPtrP, DD);
     }
+    
     // Fix diskpart
     req_struct->m_disk_ptr= (Tuple_header*)dst_ptr;
     memcpy(dst_ptr, src_ptr, 4*tabPtrP->m_offsets[DD].m_fix_header_size);
@@ -2332,7 +2421,8 @@
 }
 
 void
-Dbtup::prepare_read(KeyReqStruct* req_struct, Tablerec* const tabPtrP)
+Dbtup::prepare_read(KeyReqStruct* req_struct, 
+		    Tablerec* const tabPtrP, bool disk)
 {
   Tuple_header* ptr= req_struct->m_tuple_ptr;
   
@@ -2341,6 +2431,7 @@
   Uint16 mm_vars= tabPtrP->m_attributes[MM].m_no_of_varsize;
   
   const Uint32 *src_ptr= ptr->get_var_part_ptr(tabPtrP);
+  const Uint32 *disk_ref= ptr->get_disk_ref_ptr(tabPtrP);
   
   if(mm_vars)
   {
@@ -2358,7 +2449,7 @@
     src_ptr = ALIGN_WORD(dst->m_data_ptr + dst->m_max_var_offset);
   }
   
-  if(dd_tot)
+  if(disk && dd_tot)
   {
     const Uint16 dd_vars= tabPtrP->m_attributes[DD].m_no_of_varsize;
     
@@ -2369,9 +2460,11 @@
     }
     else
     {
-      // m_disk_tuple_location is already set
       // XXX
-      //src_ptr= get_disk_ptr(req_struct, &req_struct->m_disk_tuple_location);
+      Local_key key;
+      memcpy(&key, disk_ref, sizeof(key));
+      key.m_page_no= req_struct->m_disk_page_ptr.i;
+      src_ptr= get_ptr(&req_struct->m_disk_page_ptr, &key, tabPtrP, DD);
     }
     // Fix diskpart
     req_struct->m_disk_ptr= (Tuple_header*)src_ptr;

--- 1.34/ndb/src/kernel/blocks/dbtup/DbtupIndex.cpp	Wed Feb 16 13:31:07 2005
+++ 1.35/ndb/src/kernel/blocks/dbtup/DbtupIndex.cpp	Fri Mar  4 07:45:44 2005
@@ -182,7 +182,7 @@
   fragptr= fragPtr;
   operPtr.i= RNIL;
   operPtr.p= NULL;
-  prepare_read(&req_struct, tablePtr.p); 
+  prepare_read(&req_struct, tablePtr.p, false); 
 
   // do it
   int ret = readAttributes(&req_struct,
@@ -227,7 +227,7 @@
       req_struct.m_tuple_ptr= (Tuple_header*)
 	c_undo_buffer.get_ptr(&opPtrP->m_copy_tuple_location);
   }
-  prepare_read(&req_struct, tablePtr.p);
+  prepare_read(&req_struct, tablePtr.p, false);
 
   const Uint32* attrIds= &tableDescriptor[tablePtr.p->readKeyArray].tabDescr;
   const Uint32 numAttrs= tablePtr.p->noOfKeyAttr;
@@ -328,7 +328,7 @@
   req_struct.dirty_op= 1;
   
   setup_fixed_part(&req_struct, &tempOp, tablePtr.p);
-  if (setup_read(&req_struct, &tempOp, fragPtr.p, tablePtr.p)) {
+  if (setup_read(&req_struct, &tempOp, fragPtr.p, tablePtr.p, false)) {
     /*
      * We use the normal getPage which will return the tuple to be used
      * for this transaction and savepoint id.  If its tuple version

--- 1.28/ndb/src/kernel/blocks/dbtup/DbtupRoutines.cpp	Thu Feb 24 01:40:28 2005
+++ 1.29/ndb/src/kernel/blocks/dbtup/DbtupRoutines.cpp	Fri Mar  4 07:45:44 2005
@@ -119,6 +119,35 @@
       } else {
         ndbrequire(false);
       }
+      if(AttributeDescriptor::getDiskBased(attrDescr))
+      {
+	ReadFunction r[] = {
+	  &Dbtup::readDiskBitsNotNULL,
+	  &Dbtup::readDiskBitsNULLable,
+	  &Dbtup::readDiskFixedSizeNotNULL,
+	  &Dbtup::readDiskFixedSizeNULLable,
+	  &Dbtup::readDiskVarSizeNULLable,
+	  &Dbtup::readDiskVarSizeNotNULL
+	};
+	UpdateFunction u[] = {
+	  &Dbtup::updateDiskBitsNotNULL,
+	  &Dbtup::updateDiskBitsNULLable,
+	  &Dbtup::updateDiskFixedSizeNotNULL,
+	  &Dbtup::updateDiskFixedSizeNULLable,
+	  &Dbtup::updateDiskVarSizeNULLable,
+	  &Dbtup::updateDiskVarSizeNotNULL
+	};
+	Uint32 a= 
+	  AttributeDescriptor::getArrayType(attrDescr)==ZFIXED_ARRAY ? 2 : 4;
+	
+	if(AttributeDescriptor::getArraySize(attrDescr) == 0)
+	  a= 0;
+	
+	Uint32 b= 
+	  AttributeDescriptor::getNullable(attrDescr)? 1 : 0;
+	regTabPtr->readFunctionArray[i]= r[a+b];
+	regTabPtr->updateFunctionArray[i]= u[a+b];
+      }
     } else {
       if (AttributeDescriptor::getArrayType(attrDescr) == ZFIXED_ARRAY) {
         ljam();
@@ -423,6 +452,16 @@
 }
 
 bool
+Dbtup::disk_nullFlagCheck(KeyReqStruct *req_struct, Uint32  attrDes2)
+{
+  Tablerec* const regTabPtr= tabptr.p;
+  Uint32 *bits= req_struct->m_disk_ptr->get_null_bits(regTabPtr);
+  Uint32 pos= AttributeOffset::getNullFlagPos(attrDes2);
+  
+  return BitmaskImpl::get(regTabPtr->m_offsets[DD].m_null_words, bits, pos);
+}
+
+bool
 Dbtup::readVarSizeNotNULL(Uint32* out_buffer,
                           KeyReqStruct *req_struct,
                           AttributeHeader* ah_out,
@@ -502,6 +541,153 @@
   return false;
 }//Dbtup::readDynBigVarSize()
 
+bool
+Dbtup::readDiskFixedSizeNotNULL(Uint32* outBuffer,
+				KeyReqStruct *req_struct,
+				AttributeHeader* ahOut,
+				Uint32  attrDes2)
+{
+  Uint32 attrDescriptor= req_struct->attr_descriptor;
+  Uint32 *tuple_header= req_struct->m_disk_ptr->m_data;
+  Uint32 indexBuf= req_struct->out_buf_index;
+  Uint32 readOffset= AttributeOffset::getOffset(attrDes2);
+  Uint32 attrNoOfWords= AttributeDescriptor::getSizeInWords(attrDescriptor);
+  Uint32 maxRead= req_struct->max_read;
+  Uint32 charsetFlag = AttributeOffset::getCharsetFlag(attrDes2);
+
+  ndbrequire((readOffset + attrNoOfWords - 1) < req_struct->check_offset);
+  if (! charsetFlag || ! req_struct->xfrm_flag) {
+    Uint32 newIndexBuf = indexBuf + attrNoOfWords;
+    if (newIndexBuf <= maxRead) {
+      ljam();
+      ahOut->setDataSize(attrNoOfWords);
+      MEMCOPY_NO_WORDS(&outBuffer[indexBuf],
+                       &tuple_header[readOffset],
+                       attrNoOfWords);
+      req_struct->out_buf_index = newIndexBuf;
+      return true;
+    } else {
+      ljam();
+      terrorCode = ZTRY_TO_READ_TOO_MUCH_ERROR;
+    }//if
+  } else {
+    ljam();
+    Tablerec* regTabPtr = tabptr.p;
+    Uint32 srcBytes = AttributeDescriptor::getSizeInBytes(attrDescriptor);
+    uchar* dstPtr = (uchar*)&outBuffer[indexBuf];
+    const uchar* srcPtr = (uchar*)&tuple_header[readOffset];
+    Uint32 i = AttributeOffset::getCharsetPos(attrDes2);
+    ndbrequire(i < regTabPtr->noOfCharsets);
+    CHARSET_INFO* cs = regTabPtr->charsetArray[i];
+    Uint32 typeId = AttributeDescriptor::getType(attrDescriptor);
+    Uint32 lb, len;
+    bool ok = NdbSqlUtil::get_var_length(typeId, srcPtr, srcBytes, lb, len);
+    Uint32 xmul = cs->strxfrm_multiply;
+    if (xmul == 0)
+      xmul = 1;
+    Uint32 dstLen = xmul * (srcBytes - lb);
+    Uint32 maxIndexBuf = indexBuf + (dstLen >> 2);
+    if (maxIndexBuf <= maxRead && ok) {
+      ljam();
+      uchar* dstPtr = (uchar*)&outBuffer[indexBuf];
+      const char* ssrcPtr = (const char*)srcPtr;
+      int n = NdbSqlUtil::strnxfrm_bug7284(cs, dstPtr, dstLen, srcPtr + lb, len);
+      ndbrequire(n != -1);
+      while ((n & 3) != 0) {
+        dstPtr[n++] = 0;
+      }
+      Uint32 dstWords = (n >> 2);
+      ahOut->setDataSize(dstWords);
+      Uint32 newIndexBuf = indexBuf + dstWords;
+      ndbrequire(newIndexBuf <= maxRead);
+      req_struct->out_buf_index = newIndexBuf;
+      return true;
+    } else {
+      ljam();
+      terrorCode = ZTRY_TO_READ_TOO_MUCH_ERROR;
+    }
+  } 
+  return false;
+}
+
+bool
+Dbtup::readDiskFixedSizeNULLable(Uint32* outBuffer,
+				 KeyReqStruct *req_struct,
+				 AttributeHeader* ahOut,
+				 Uint32  attrDes2)
+{
+  if (!disk_nullFlagCheck(req_struct, attrDes2)) {
+    ljam();
+    return readDiskFixedSizeNotNULL(outBuffer,
+				    req_struct,
+				    ahOut,
+				    attrDes2);
+  } else {
+    ljam();
+    ahOut->setNULL();
+    return true;
+  }
+}
+
+bool
+Dbtup::readDiskVarSizeNotNULL(Uint32* out_buffer,
+			      KeyReqStruct *req_struct,
+			      AttributeHeader* ah_out,
+			      Uint32  attr_des2)
+{
+  Uint32 attr_descriptor, index_buf, var_index;
+  Uint32 vsize_in_bytes, vsize_in_words, new_index, max_var_size;
+  Uint32 var_attr_pos, max_read;
+
+  Uint32 idx= req_struct->m_var_data[DD].m_var_len_offset;
+  var_index= AttributeOffset::getOffset(attr_des2);
+  var_attr_pos= req_struct->m_var_data[DD].m_offset_array_ptr[var_index];
+  vsize_in_bytes= req_struct->m_var_data[DD].m_offset_array_ptr[var_index+idx] -
var_attr_pos;
+  attr_descriptor= req_struct->attr_descriptor;
+  index_buf= req_struct->out_buf_index;
+  max_var_size= AttributeDescriptor::getSizeInWords(attr_descriptor);
+  max_read= req_struct->max_read;
+  vsize_in_words= convert_byte_to_word_size(vsize_in_bytes);
+  new_index= index_buf + vsize_in_words;
+
+  ndbrequire(vsize_in_words <= max_var_size);
+  if (new_index <= max_read) {
+    ljam();
+    ah_out->setDataSize(vsize_in_words);
+    memcpy(out_buffer+index_buf,
+	   req_struct->m_var_data[DD].m_data_ptr+var_attr_pos,
+	   vsize_in_bytes);
+    char* rest= (char*)(out_buffer+index_buf);
+    memset(rest+vsize_in_bytes, 0, 4*vsize_in_words - vsize_in_bytes);
+    req_struct->out_buf_index= new_index;
+    return true;
+  } else {
+    ljam();
+    terrorCode= ZTRY_TO_READ_TOO_MUCH_ERROR;
+    return false;
+  }
+}
+
+bool
+Dbtup::readDiskVarSizeNULLable(Uint32* outBuffer,
+			       KeyReqStruct *req_struct,
+			       AttributeHeader* ahOut,
+			       Uint32  attrDes2)
+{
+  if (!disk_nullFlagCheck(req_struct, attrDes2)) {
+    ljam();
+    return readDiskVarSizeNotNULL(outBuffer,
+				  req_struct,
+				  ahOut,
+				  attrDes2);
+  } else {
+    ljam();
+    ahOut->setNULL();
+    return true;
+  }
+}
+
+
 /* ---------------------------------------------------------------------- */
 /*       THIS ROUTINE IS USED TO UPDATE A NUMBER OF ATTRIBUTES. IT IS     */
 /*       USED BY THE INSERT ROUTINE, THE UPDATE ROUTINE AND IT CAN BE     */
@@ -1055,6 +1241,327 @@
     {
       ljam();
       BitmaskImpl::set(regTabPtr->m_offsets[MM].m_null_words, bits, pos);
+      
+      req_struct->in_buf_index = newIndex;
+      return true;
+    } else {
+      ljam();
+      terrorCode = ZAI_INCONSISTENCY_ERROR;
+      return false;
+    }//if
+  }//if
+}
+
+bool
+Dbtup::updateDiskFixedSizeNotNULL(Uint32* inBuffer,
+				  KeyReqStruct *req_struct,
+				  Uint32  attrDes2)
+{
+  Uint32 attrDescriptor= req_struct->attr_descriptor;
+  Uint32 indexBuf= req_struct->in_buf_index;
+  Uint32 inBufLen= req_struct->in_buf_len;
+  Uint32 updateOffset= AttributeOffset::getOffset(attrDes2);
+  Uint32 charsetFlag = AttributeOffset::getCharsetFlag(attrDes2);
+  
+  AttributeHeader ahIn(inBuffer[indexBuf]);
+  Uint32 noOfWords= AttributeDescriptor::getSizeInWords(attrDescriptor);
+  Uint32 nullIndicator= ahIn.isNULL();
+  Uint32 newIndex= indexBuf + noOfWords + 1;
+  Uint32 *tuple_header= req_struct->m_disk_ptr->m_data;
+  ndbrequire((updateOffset + noOfWords - 1) < req_struct->check_offset);
+
+  if (newIndex <= inBufLen) {
+    if (!nullIndicator) {
+      ljam();
+      if (charsetFlag) {
+        ljam();
+        Tablerec* regTabPtr = tabptr.p;
+	Uint32 typeId = AttributeDescriptor::getType(attrDescriptor);
+        Uint32 bytes = AttributeDescriptor::getSizeInBytes(attrDescriptor);
+        Uint32 i = AttributeOffset::getCharsetPos(attrDes2);
+        ndbrequire(i < regTabPtr->noOfCharsets);
+        // not const in MySQL
+        CHARSET_INFO* cs = regTabPtr->charsetArray[i];
+        const char* ssrc = (const char*)&inBuffer[indexBuf + 1];
+        Uint32 lb, len;
+        if (! NdbSqlUtil::get_var_length(typeId, ssrc, bytes, lb, len)) {
+          ljam();
+          terrorCode = ZINVALID_CHAR_FORMAT;
+          return false;
+        }
+	// fast fix bug#7340
+        if (typeId != NDB_TYPE_TEXT &&
+	    (*cs->cset->well_formed_len)(cs, ssrc + lb, ssrc + lb + len, ZNIL) != len) {
+          ljam();
+          terrorCode = ZINVALID_CHAR_FORMAT;
+          return false;
+        }
+      }
+      req_struct->in_buf_index= newIndex;
+      MEMCOPY_NO_WORDS(&tuple_header[updateOffset],
+                       &inBuffer[indexBuf + 1],
+                       noOfWords);
+      return true;
+    } else {
+      ljam();
+      terrorCode= ZNOT_NULL_ATTR;
+      return false;
+    }
+  } else {
+    ljam();
+    terrorCode= ZAI_INCONSISTENCY_ERROR;
+    return false;
+  }
+}
+
+bool
+Dbtup::updateDiskFixedSizeNULLable(Uint32* inBuffer,
+				   KeyReqStruct *req_struct,
+				   Uint32  attrDes2)
+{
+  Tablerec* const regTabPtr=  tabptr.p;
+  AttributeHeader ahIn(inBuffer[req_struct->in_buf_index]);
+  Uint32 nullIndicator= ahIn.isNULL();
+  Uint32 pos= AttributeOffset::getNullFlagPos(attrDes2);
+  Uint32 *bits= req_struct->m_disk_ptr->get_null_bits(regTabPtr, DD);
+  
+  if (!nullIndicator) {
+    ljam();
+    BitmaskImpl::clear(regTabPtr->m_offsets[DD].m_null_words, bits, pos);
+    return updateDiskFixedSizeNotNULL(inBuffer,
+				      req_struct,
+				      attrDes2);
+  } else {
+    Uint32 newIndex= req_struct->in_buf_index + 1;
+    if (newIndex <= req_struct->in_buf_len) {
+      BitmaskImpl::set(regTabPtr->m_offsets[DD].m_null_words, bits, pos);
+      ljam();
+      req_struct->in_buf_index= newIndex;
+      return true;
+    } else {
+      ljam();
+      terrorCode= ZAI_INCONSISTENCY_ERROR;
+      return false;
+    }
+  }
+}
+
+bool
+Dbtup::updateDiskVarSizeNotNULL(Uint32* in_buffer,
+                            KeyReqStruct *req_struct,
+                            Uint32 attr_des2)
+{
+  Uint32 attr_descriptor, index_buf, in_buf_len, var_index, null_ind;
+  Uint32 vsize_in_bytes, vsize_in_words, new_index, max_var_size;
+  Uint32 var_attr_pos;
+  char *var_data_start;
+  Uint16 *vpos_array;
+
+  attr_descriptor= req_struct->attr_descriptor;
+  index_buf= req_struct->in_buf_index;
+  in_buf_len= req_struct->in_buf_len;
+  var_index= AttributeOffset::getOffset(attr_des2);
+  AttributeHeader ahIn(in_buffer[index_buf]);
+  null_ind= ahIn.isNULL();
+  vsize_in_words= ahIn.getDataSize();
+  max_var_size= AttributeDescriptor::getSizeInBytes(attr_descriptor);
+  new_index= index_buf + vsize_in_words + 1;
+  vpos_array= req_struct->m_var_data[DD].m_offset_array_ptr;
+  Uint32 idx= req_struct->m_var_data[DD].m_var_len_offset;
+  Uint32 check_offset= req_struct->m_var_data[DD].m_max_var_offset;
+  
+  if (new_index <= in_buf_len && vsize_in_words <= max_var_size) {
+    if (!null_ind) {
+      ljam();
+      int size_in_bytes= 4*vsize_in_words > max_var_size ? max_var_size :
4*vsize_in_words;
+
+      var_attr_pos= vpos_array[var_index];
+      var_data_start= req_struct->m_var_data[DD].m_data_ptr;
+      vpos_array[var_index+idx]= var_attr_pos+size_in_bytes;
+      req_struct->in_buf_index= new_index;
+
+      ndbrequire(var_attr_pos+vsize_in_words*4 <= check_offset);
+      memcpy(var_data_start+var_attr_pos, &in_buffer[index_buf + 1],
+	     size_in_bytes);
+      return true;
+    } else {
+      ljam();
+      terrorCode= ZNOT_NULL_ATTR;
+      return false;
+    }
+  } else {
+    ljam();
+    terrorCode= ZAI_INCONSISTENCY_ERROR;
+    return false;
+  }
+}
+
+bool
+Dbtup::updateDiskVarSizeNULLable(Uint32* inBuffer,
+				 KeyReqStruct *req_struct,
+				 Uint32  attrDes2)
+{
+  Tablerec* const regTabPtr=  tabptr.p;
+  AttributeHeader ahIn(inBuffer[req_struct->in_buf_index]);
+  Uint32 nullIndicator= ahIn.isNULL();
+  Uint32 pos= AttributeOffset::getNullFlagPos(attrDes2);
+  Uint32 *bits= req_struct->m_disk_ptr->get_null_bits(regTabPtr, DD);
+  Uint32 idx= req_struct->m_var_data[DD].m_var_len_offset;
+  
+  if (!nullIndicator) {
+    ljam();
+    BitmaskImpl::clear(regTabPtr->m_offsets[DD].m_null_words, bits, pos);
+    return updateDiskVarSizeNotNULL(inBuffer,
+				    req_struct,
+				    attrDes2);
+  } else {
+    Uint32 newIndex= req_struct->in_buf_index + 1;
+    Uint32 var_index= AttributeOffset::getOffset(attrDes2);
+    Uint32 var_pos= req_struct->var_pos_array[var_index];
+    if (newIndex <= req_struct->in_buf_len) {
+      ljam();
+      BitmaskImpl::set(regTabPtr->m_offsets[DD].m_null_words, bits, pos);
+      req_struct->var_pos_array[var_index+idx]= var_pos;
+      req_struct->in_buf_index= newIndex;
+      return true;
+    } else {
+      ljam();
+      terrorCode= ZAI_INCONSISTENCY_ERROR;
+      return false;
+    }
+  }
+}
+
+bool
+Dbtup::readDiskBitsNotNULL(Uint32* outBuffer,
+			   KeyReqStruct* req_struct,
+			   AttributeHeader* ahOut,
+			   Uint32  attrDes2)
+{
+  Tablerec* const regTabPtr = tabptr.p;
+  Uint32 pos = AttributeOffset::getNullFlagPos(attrDes2);
+  Uint32 bitCount = 
+    AttributeDescriptor::getArraySize(req_struct->attr_descriptor);
+  Uint32 indexBuf = req_struct->out_buf_index;
+  Uint32 newIndexBuf = indexBuf + ((bitCount + 31) >> 5);
+  Uint32 maxRead = req_struct->max_read;
+  Uint32 *bits= req_struct->m_disk_ptr->get_null_bits(regTabPtr, DD);
+  if (newIndexBuf <= maxRead) {
+    ljam();
+    ahOut->setDataSize((bitCount + 31) >> 5);
+    req_struct->out_buf_index = newIndexBuf;
+    
+    BitmaskImpl::getField(regTabPtr->m_offsets[DD].m_null_words, bits, pos, 
+			  bitCount, outBuffer+indexBuf);
+    
+    return true;
+  } else {
+    ljam();
+    terrorCode = ZTRY_TO_READ_TOO_MUCH_ERROR;
+    return false;
+  }//if
+}
+
+bool
+Dbtup::readDiskBitsNULLable(Uint32* outBuffer,
+			    KeyReqStruct* req_struct,
+			    AttributeHeader* ahOut,
+			    Uint32  attrDes2)
+{
+  Tablerec* const regTabPtr = tabptr.p;
+  Uint32 pos = AttributeOffset::getNullFlagPos(attrDes2);
+  Uint32 bitCount = 
+    AttributeDescriptor::getArraySize(req_struct->attr_descriptor);
+  
+  Uint32 indexBuf = req_struct->out_buf_index;
+  Uint32 newIndexBuf = indexBuf + ((bitCount + 31) >> 5);
+  Uint32 maxRead = req_struct->max_read;
+  Uint32 *bits= req_struct->m_disk_ptr->get_null_bits(regTabPtr, DD);
+  
+  if(BitmaskImpl::get(regTabPtr->m_offsets[DD].m_null_words, bits, pos))
+  {
+    ljam();
+    ahOut->setNULL();
+    return true;
+  }
+
+  if (newIndexBuf <= maxRead) {
+    ljam();
+    ahOut->setDataSize((bitCount + 31) >> 5);
+    req_struct->out_buf_index = newIndexBuf;
+    BitmaskImpl::getField(regTabPtr->m_offsets[DD].m_null_words, bits, pos+1, 
+			  bitCount, outBuffer+indexBuf);
+    return true;
+  } else {
+    ljam();
+    terrorCode = ZTRY_TO_READ_TOO_MUCH_ERROR;
+    return false;
+  }//if
+}
+
+bool
+Dbtup::updateDiskBitsNotNULL(Uint32* inBuffer,
+			     KeyReqStruct* req_struct,
+			     Uint32  attrDes2)
+{
+  Tablerec* const regTabPtr =  tabptr.p;
+  Uint32 indexBuf = req_struct->in_buf_index;
+  Uint32 inBufLen = req_struct->in_buf_len;
+  AttributeHeader ahIn(inBuffer[indexBuf]);
+  Uint32 nullIndicator = ahIn.isNULL();
+  Uint32 pos = AttributeOffset::getNullFlagPos(attrDes2);
+  Uint32 bitCount = 
+    AttributeDescriptor::getArraySize(req_struct->attr_descriptor);
+  Uint32 newIndex = indexBuf + 1 + ((bitCount + 31) >> 5);
+  Uint32 *bits= req_struct->m_disk_ptr->get_null_bits(regTabPtr, DD);
+  
+  if (newIndex <= inBufLen) {
+    if (!nullIndicator) {
+      BitmaskImpl::setField(regTabPtr->m_offsets[DD].m_null_words, bits, pos, 
+			    bitCount, inBuffer+indexBuf+1);
+      req_struct->in_buf_index = newIndex;
+      return true;
+    } else {
+      ljam();
+      terrorCode = ZNOT_NULL_ATTR;
+      return false;
+    }//if
+  } else {
+    ljam();
+    terrorCode = ZAI_INCONSISTENCY_ERROR;
+    return false;
+  }//if
+  return true;
+}
+
+bool
+Dbtup::updateDiskBitsNULLable(Uint32* inBuffer,
+			      KeyReqStruct* req_struct,
+			      Uint32  attrDes2)
+{
+  Tablerec* const regTabPtr =  tabptr.p;
+  Uint32 indexBuf = req_struct->in_buf_index;
+  AttributeHeader ahIn(inBuffer[indexBuf]);
+  Uint32 nullIndicator = ahIn.isNULL();
+  Uint32 pos = AttributeOffset::getNullFlagPos(attrDes2);
+  Uint32 bitCount = 
+    AttributeDescriptor::getArraySize(req_struct->attr_descriptor);
+  Uint32 *bits= req_struct->m_disk_ptr->get_null_bits(regTabPtr);
+  
+  if (!nullIndicator) {
+    BitmaskImpl::clear(regTabPtr->m_offsets[DD].m_null_words, bits, pos);
+    BitmaskImpl::setField(regTabPtr->m_offsets[DD].m_null_words, bits, pos+1, 
+			  bitCount, inBuffer+indexBuf+1);
+    
+    Uint32 newIndex = indexBuf + 1 + ((bitCount + 31) >> 5);
+    req_struct->in_buf_index = newIndex;
+    return true;
+  } else {
+    Uint32 newIndex = indexBuf + 1;
+    if (newIndex <= req_struct->in_buf_len)
+    {
+      ljam();
+      BitmaskImpl::set(regTabPtr->m_offsets[DD].m_null_words, bits, pos);
       
       req_struct->in_buf_index = newIndex;
       return true;
Thread
bk commit into 5.1-ndb tree (joreland:1.1762)jonas.oreland4 Mar