List:Commits« Previous MessageNext Message »
From:jonas Date:December 23 2005 7:34am
Subject:bk commit into 5.1 tree (jonas:1.1976)
View as plain text  
Below is the list of changes that have just been committed into a local
5.1 repository of jonas. When jonas does a push these changes will
be propagated to the main repository and, within 24 hours after the
push, to the public repository.
For information on how to access the public repository
see http://dev.mysql.com/doc/mysql/en/installing-source-tree.html

ChangeSet
  1.1976 05/12/23 08:34:27 jonas@eel.(none) +4 -0
  ndb - optnr
    structure trace functionality
    fix handle_nr_copy wrt varsize

  storage/ndb/src/kernel/blocks/dbtup/DbtupExecQuery.cpp
    1.34 05/12/23 08:34:19 jonas@eel.(none) +30 -4
    Fix varsize handling in opt nr
      Modify fix_header_size...

  storage/ndb/src/kernel/blocks/dblqh/DblqhMain.cpp
    1.94 05/12/23 08:34:19 jonas@eel.(none) +108 -97
    Make tracenr more "flexible"

  storage/ndb/src/kernel/blocks/dblqh/Dblqh.hpp
    1.47 05/12/23 08:34:19 jonas@eel.(none) +1 -1
    Add line to FSCLOSEREQ

  storage/ndb/src/common/debugger/signaldata/FsCloseReq.cpp
    1.4 05/12/23 08:34:19 jonas@eel.(none) +2 -1
    More printouts

# This is a BitKeeper patch.  What follows are the unified diffs for the
# set of deltas contained in the patch.  The rest of the patch, the part
# that BitKeeper cares about, is below these diffs.
# User:	jonas
# Host:	eel.(none)
# Root:	/home/jonas/src/51-ndb

--- 1.3/storage/ndb/src/common/debugger/signaldata/FsCloseReq.cpp	2005-04-08 02:43:55 +02:00
+++ 1.4/storage/ndb/src/common/debugger/signaldata/FsCloseReq.cpp	2005-12-23 08:34:19 +01:00
@@ -36,5 +36,6 @@
   else
     fprintf(output, "Don't remove file");
   fprintf(output, "\n");
-  return true;
+
+  return len == 4;
 }

--- 1.46/storage/ndb/src/kernel/blocks/dblqh/Dblqh.hpp	2005-12-21 16:47:15 +01:00
+++ 1.47/storage/ndb/src/kernel/blocks/dblqh/Dblqh.hpp	2005-12-23 08:34:19 +01:00
@@ -2278,7 +2278,7 @@
   void checkReadExecSr(Signal* signal);
   void checkScanTcCompleted(Signal* signal);
   void checkSrCompleted(Signal* signal);
-  void closeFile(Signal* signal, LogFileRecordPtr logFilePtr);
+  void closeFile(Signal* signal, LogFileRecordPtr logFilePtr, Uint32 place);
   void completedLogPage(Signal* signal, Uint32 clpType, Uint32 place);
   void deleteFragrec(Uint32 fragId);
   void deleteTransidHash(Signal* signal);

--- 1.93/storage/ndb/src/kernel/blocks/dblqh/DblqhMain.cpp	2005-12-21 16:47:15 +01:00
+++ 1.94/storage/ndb/src/kernel/blocks/dblqh/DblqhMain.cpp	2005-12-23 08:34:19 +01:00
@@ -118,9 +118,19 @@
 
 const Uint32 NR_ScanNo = 0;
 
-static FileOutputStream pkk_fileoutputstream(fopen("pkk.log", "w+"));
-NdbOut pkkout(pkk_fileoutputstream);
-static int PKK = 0;
+#if defined VM_TRACE || defined ERROR_INSERT || defined NDBD_TRACENR
+static FileOutputStream tracenr_fos(fopen("ndbd_tracenr.log", "w+"));
+NdbOut tracenrout(tracenr_fos);
+static int TRACENR_FLAG = 0;
+#define TRACENR(x) tracenrout << x
+#define SET_TRACENR_FLAG TRACENR_FLAG = 1
+#define CLEAR_TRACENR_FLAG TRACENR_FLAG = 0
+#else
+#define TRACENR_FLAG 0
+#define TRACENR(x)
+#define SET_TRACENR_FLAG
+#define CLEAR_TRACENR_FLAG
+#endif
 
 /* ------------------------------------------------------------------------- */
 /* -------               SEND SYSTEM ERROR                           ------- */
@@ -782,13 +792,13 @@
   else if (cstartType == NodeState::ST_NODE_RESTART)
   {
     jam();
-    PKK = 1;
+    SET_TRACENR_FLAG;
     m_sr_nodes.clear();
     m_sr_nodes.set(getOwnNodeId());
     sendNdbSttorryLab(signal);
     return;
   }
-  PKK = 1;
+  SET_TRACENR_FLAG;
   
   checkStartCompletedLab(signal);
   return;
@@ -819,7 +829,7 @@
 {
   cstartPhase = ZNIL;
   cstartType = ZNIL;
-  PKK = 0;
+  CLEAR_TRACENR_FLAG;
   sendNdbSttorryLab(signal);
   return;
 }//Dblqh::startphase6Lab()
@@ -2495,17 +2505,7 @@
   switch (tcConnectptr.p->transactionState) {
   case TcConnectionrec::WAIT_TUP:
     jam();
-    ndbrequire(terrorCode != 626);
-    if (regTcPtr->activeCreat == ZTRUE && 
-	regTcPtr->operation == ZINSERT &&
-	terrorCode == 899)
-    {
-      ndbrequire(false);
-    }
-    else
-    {
-      abortErrorLab(signal);
-    }
+    abortErrorLab(signal);
     break;
   case TcConnectionrec::COPY_TUPKEY:
     ndbrequire(false);
@@ -3694,31 +3694,31 @@
 /*       BEFORE SENDING THE MESSAGE THE REQUEST INFORMATION IS SET   */
 /*       PROPERLY.                                                   */
 /* ----------------------------------------------------------------- */
-  if (PKK)
+  if (TRACENR_FLAG)
   {
     switch (regTcPtr->operation) {
-    case ZREAD: pkkout << "READ"; break;
-    case ZUPDATE: pkkout << "UPDATE"; break;
-    case ZWRITE: pkkout << "WRITE"; break;
-    case ZINSERT: pkkout << "INSERT"; break;
-    case ZDELETE: pkkout << "DELETE"; break;
-    default: pkkout << "<Unknown: " << regTcPtr->operation << ">"; break;
+    case ZREAD: TRACENR("READ"); break;
+    case ZUPDATE: TRACENR("UPDATE"); break;
+    case ZWRITE: TRACENR("WRITE"); break;
+    case ZINSERT: TRACENR("INSERT"); break;
+    case ZDELETE: TRACENR("DELETE"); break;
+    default: TRACENR("<Unknown: " << regTcPtr->operation << ">"); break;
     }
     
-    pkkout << " tab: " << regTcPtr->tableref 
+    TRACENR(" tab: " << regTcPtr->tableref 
 	   << " frag: " << regTcPtr->fragmentid
-	   << " activeCreat: " << (Uint32)activeCreat;
+	   << " activeCreat: " << (Uint32)activeCreat);
     if (LqhKeyReq::getNrCopyFlag(regTcPtr->reqinfo))
-      pkkout << " NrCopy";
+      TRACENR(" NrCopy");
     if (LqhKeyReq::getRowidFlag(regTcPtr->reqinfo))
-      pkkout << " rowid: " << regTcPtr->m_row_id;
-    pkkout << " key: " << regTcPtr->tupkeyData[0];
+      TRACENR(" rowid: " << regTcPtr->m_row_id);
+    TRACENR(" key: " << regTcPtr->tupkeyData[0]);
   }
   
   if (likely(activeCreat == Fragrecord::AC_NORMAL))
   {
-    if (PKK)
-      pkkout << endl;
+    if (TRACENR_FLAG)
+      TRACENR(endl);
     ndbassert(!LqhKeyReq::getNrCopyFlag(regTcPtr->reqinfo));
     exec_acckeyreq(signal, tcConnectptr);
   } 
@@ -3729,8 +3729,8 @@
   else
   {
     ndbassert(activeCreat == Fragrecord::AC_IGNORED);
-    if (PKK)
-      pkkout << " IGNORING (activeCreat == 2)" << endl;
+    if (TRACENR_FLAG)
+      TRACENR(" IGNORING (activeCreat == 2)" << endl);
     
     signal->theData[0] = tc_ptr_i;
     regTcPtr->transactionState = TcConnectionrec::WAIT_ACC_ABORT;
@@ -3815,8 +3815,8 @@
      * Rowid not set, that mean that primary has finished copying...
      */
     jam();
-    if (PKK)
-      pkkout << " Waiting for COPY_ACTIVEREQ" << endl;
+    if (TRACENR_FLAG)
+      TRACENR(" Waiting for COPY_ACTIVEREQ" << endl);
     ndbassert(!LqhKeyReq::getNrCopyFlag(regTcPtr.p->reqinfo));
     regTcPtr.p->activeCreat = Fragrecord::AC_NORMAL;
     exec_acckeyreq(signal, regTcPtr);
@@ -3829,9 +3829,9 @@
 				    uncommitted);
   const bool match = (len>0) ? compare_key(regTcPtr.p, dst, len) == 0 : false;
   
-  if (PKK)
-    pkkout << " len: " << len << " match: " << match 
-	   << " uncommitted: " << uncommitted;
+  if (TRACENR_FLAG)
+    TRACENR(" len: " << len << " match: " << match 
+	   << " uncommitted: " << uncommitted);
 
   if (copy)
   {
@@ -3843,8 +3843,8 @@
        */
       jam();
       ndbassert(op == ZINSERT);
-      if (PKK)
-	pkkout << " Changing from INSERT to ZUPDATE" << endl;
+      if (TRACENR_FLAG)
+	TRACENR(" Changing from INSERT to ZUPDATE" << endl);
       regTcPtr.p->operation = ZUPDATE;
       goto run;
     }
@@ -3858,9 +3858,9 @@
        */
       jam();
       ndbassert(regTcPtr.p->primKeyLen == 0);
-      if (PKK)
-	pkkout << " performing DELETE key: " 
-	       << dst[0] << endl; 
+      if (TRACENR_FLAG)
+	TRACENR(" performing DELETE key: " 
+	       << dst[0] << endl); 
       regTcPtr.p->tupkeyData[0] = regTcPtr.p->m_row_id.ref();
       if (g_key_descriptor_pool.getPtr(tableId)->hasCharAttr)
       {
@@ -3878,8 +3878,8 @@
        * Case 7
        */
       jam();
-      if (PKK)
-	pkkout << " UPDATE_GCI" << endl; 
+      if (TRACENR_FLAG)
+	TRACENR(" UPDATE_GCI" << endl); 
       c_tup->nr_update_gci(fragPtr, &regTcPtr.p->m_row_id, regTcPtr.p->gci);
       goto update_gci_ignore;
     }
@@ -3902,8 +3902,8 @@
      */
     jam();
     nr_copy_delete_row(signal, regTcPtr, 0, 0);
-    if (PKK)
-      pkkout << " RUN INSERT" << endl; 
+    if (TRACENR_FLAG)
+      TRACENR(" RUN INSERT" << endl); 
     goto run;
   }
   else
@@ -3911,8 +3911,8 @@
     if (!match && op != ZINSERT)
     {
       jam();
-      if (PKK)
-	pkkout << " IGNORE " << endl; 
+      if (TRACENR_FLAG)
+	TRACENR(" IGNORE " << endl); 
       goto ignore;
     }
     if (match)
@@ -3920,8 +3920,8 @@
       jam();
       if (op != ZDELETE)
       {
-	if (PKK)
-	  pkkout << " Changing from to ZWRITE" << endl;
+	if (TRACENR_FLAG)
+	  TRACENR(" Changing from to ZWRITE" << endl);
 	regTcPtr.p->operation = ZWRITE;
       }
       goto run;
@@ -3946,8 +3946,8 @@
      */
     jam();
     nr_copy_delete_row(signal, regTcPtr, 0, 0);
-    if (PKK)
-      pkkout << " RUN op: " << op << endl; 
+    if (TRACENR_FLAG)
+      TRACENR(" RUN op: " << op << endl); 
     goto run;
   }
   
@@ -4090,7 +4090,7 @@
   EXECUTE_DIRECT(DBTUP, GSN_TUP_DEALLOCREQ, signal, 5);
   jamEntry();
 
-  pkkout << "DELETED: " << regTcPtr.p->m_row_id << endl;
+  TRACENR("DELETED: " << regTcPtr.p->m_row_id << endl);
 
   regTcPtr.p->m_dealloc = 0;
   regTcPtr.p->m_row_id = save;
@@ -4149,13 +4149,13 @@
   jamEntry();
   regTcPtr.i = signal->theData[4];
   
-  if (PKK)
+  if (TRACENR_FLAG)
   {
     Local_key tmp;
     tmp.m_page_no = signal->theData[2];
     tmp.m_page_idx = signal->theData[3];
-    pkkout << "TUP_DEALLOC: " << tmp << 
-      (signal->theData[5] ? " DIRECT " : " DELAYED") << endl;
+    TRACENR("TUP_DEALLOC: " << tmp << 
+      (signal->theData[5] ? " DIRECT " : " DELAYED") << endl);
   }
   
   if (signal->theData[5])
@@ -5365,6 +5365,9 @@
     jam();
     regTcPtr->m_dealloc = 0;
 
+    if (TRACENR_FLAG)
+      TRACENR("DELETED: " << regTcPtr->m_row_id << endl);
+    
     signal->theData[0] = regTcPtr->fragmentid;
     signal->theData[1] = regTcPtr->tableref;
     signal->theData[2] = regTcPtr->m_row_id.m_page_no;
@@ -6055,26 +6058,26 @@
 	return; // TUP_COMMIT was timesliced
       }
       
-      if (PKK)
+      if (TRACENR_FLAG)
       {
-	pkkout << "COMMIT: ";
+	TRACENR("COMMIT: ");
 	switch (regTcPtr.p->operation) {
-	case ZREAD: pkkout << "READ"; break;
-	case ZUPDATE: pkkout << "UPDATE"; break;
-	case ZWRITE: pkkout << "WRITE"; break;
-	case ZINSERT: pkkout << "INSERT"; break;
-	case ZDELETE: pkkout << "DELETE"; break;
+	case ZREAD: TRACENR("READ"); break;
+	case ZUPDATE: TRACENR("UPDATE"); break;
+	case ZWRITE: TRACENR("WRITE"); break;
+	case ZINSERT: TRACENR("INSERT"); break;
+	case ZDELETE: TRACENR("DELETE"); break;
 	}
 
-	pkkout << " tab: " << regTcPtr.p->tableref 
+	TRACENR(" tab: " << regTcPtr.p->tableref 
 	       << " frag: " << regTcPtr.p->fragmentid
-	       << " activeCreat: " << (Uint32)regTcPtr.p->activeCreat;
+	       << " activeCreat: " << (Uint32)regTcPtr.p->activeCreat);
 	if (LqhKeyReq::getNrCopyFlag(regTcPtr.p->reqinfo))
-	  pkkout << " NrCopy";
+	  TRACENR(" NrCopy");
 	if (LqhKeyReq::getRowidFlag(regTcPtr.p->reqinfo))
-	  pkkout << " rowid: " << regTcPtr.p->m_row_id;
-	pkkout << " key: " << regTcPtr.p->tupkeyData[0];
-	pkkout << endl;
+	  TRACENR(" rowid: " << regTcPtr.p->m_row_id);
+	TRACENR(" key: " << regTcPtr.p->tupkeyData[0]);
+	TRACENR(endl);
       }
 
       Uint32 acc = refToBlock(regTcPtr.p->tcAccBlockref);
@@ -6513,27 +6516,27 @@
   const Uint32 errCode = terrorCode; 
   tcPtr->errorCode = errCode;
 
-  if (PKK)
+  if (TRACENR_FLAG)
   {
-    pkkout << "ACCKEYREF: " << errCode << " ";
+    TRACENR("ACCKEYREF: " << errCode << " ");
     switch (tcPtr->operation) {
-    case ZREAD: pkkout << "READ"; break;
-    case ZUPDATE: pkkout << "UPDATE"; break;
-    case ZWRITE: pkkout << "WRITE"; break;
-    case ZINSERT: pkkout << "INSERT"; break;
-    case ZDELETE: pkkout << "DELETE"; break;
-    default: pkkout << "<Unknown: " << tcPtr->operation << ">"; break;
+    case ZREAD: TRACENR("READ"); break;
+    case ZUPDATE: TRACENR("UPDATE"); break;
+    case ZWRITE: TRACENR("WRITE"); break;
+    case ZINSERT: TRACENR("INSERT"); break;
+    case ZDELETE: TRACENR("DELETE"); break;
+    default: TRACENR("<Unknown: " << tcPtr->operation << ">"); break;
     }
     
-    pkkout << " tab: " << tcPtr->tableref 
+    TRACENR(" tab: " << tcPtr->tableref 
 	   << " frag: " << tcPtr->fragmentid
-	   << " activeCreat: " << (Uint32)tcPtr->activeCreat;
+	   << " activeCreat: " << (Uint32)tcPtr->activeCreat);
     if (LqhKeyReq::getNrCopyFlag(tcPtr->reqinfo))
-      pkkout << " NrCopy";
+      TRACENR(" NrCopy");
     if (LqhKeyReq::getRowidFlag(tcPtr->reqinfo))
-      pkkout << " rowid: " << tcPtr->m_row_id;
-    pkkout << " key: " << tcPtr->tupkeyData[0];
-    pkkout << endl;
+      TRACENR(" rowid: " << tcPtr->m_row_id);
+    TRACENR(" key: " << tcPtr->tupkeyData[0]);
+    TRACENR(endl);
     
   }
 
@@ -10435,6 +10438,11 @@
   }//if
   
   fragptr.p->fragStatus = Fragrecord::FSACTIVE;
+  if (TRACENR_FLAG)
+    TRACENR("tab: " << tabptr.i 
+	    << " frag: " << fragId 
+	    << " COPY ACTIVE" << endl);
+  
   if (fragptr.p->lcpFlag == Fragrecord::LCP_STATE_TRUE) {
     jam();
     fragptr.p->logFlag = Fragrecord::STATE_TRUE;
@@ -11376,7 +11384,8 @@
     return;
   }
 
-  if(getNodeState().getNodeRestartInProgress()){
+  if (getNodeState().getNodeRestartInProgress())
+  {
     GCPSaveRef * const saveRef = (GCPSaveRef*)&signal->theData[0];
     saveRef->dihPtr = dihPtr;
     saveRef->nodeId = getOwnNodeId();
@@ -11386,7 +11395,7 @@
 	       GCPSaveRef::SignalLength, JBB);
     return;
   }
-
+  
   ndbrequire(gci >= cnewestCompletedGci);
   
   if (gci == cnewestCompletedGci) {
@@ -12341,7 +12350,7 @@
 /* WE WILL CLOSE THE FILE.                                                   */
 /*---------------------------------------------------------------------------*/
         logFilePtr.p->logFileStatus = LogFileRecord::CLOSING_WRITE_LOG;
-        closeFile(signal, logFilePtr);
+        closeFile(signal, logFilePtr, __LINE__);
       }//if
     }//if
   }//if
@@ -12511,7 +12520,7 @@
     jam();
     releaseLogpage(signal);
     logFilePtr.p->logFileStatus = LogFileRecord::CLOSING_INIT;
-    closeFile(signal, logFilePtr);
+    closeFile(signal, logFilePtr, __LINE__);
     return;
   }//if
   writeInitMbyte(signal);
@@ -13069,7 +13078,7 @@
    * ------------------------------------------------------------------------ */
   releaseLogpage(signal);
   logFilePtr.p->logFileStatus = LogFileRecord::CLOSING_SR;
-  closeFile(signal, logFilePtr);
+  closeFile(signal, logFilePtr, __LINE__);
   LogFileRecordPtr locLogFilePtr;
   findLogfile(signal, fileNo, logPartPtr, &locLogFilePtr);
   locLogFilePtr.p->logFileStatus = LogFileRecord::OPEN_SR_LAST_FILE;
@@ -13140,7 +13149,7 @@
     }//if
   }//if
   logFilePtr.p->logFileStatus = LogFileRecord::CLOSING_SR;
-  closeFile(signal, logFilePtr);
+  closeFile(signal, logFilePtr, __LINE__);
   if (logPartPtr.p->noLogFiles > ZMAX_LOG_FILES_IN_PAGE_ZERO) {
     Uint32 fileNo;
     if (logFilePtr.p->fileNo >= ZMAX_LOG_FILES_IN_PAGE_ZERO) {
@@ -13198,7 +13207,7 @@
   }//if
   releaseLogpage(signal);
   logFilePtr.p->logFileStatus = LogFileRecord::CLOSING_SR;
-  closeFile(signal, logFilePtr);
+  closeFile(signal, logFilePtr, __LINE__);
   if (logPartPtr.p->srRemainingFiles > ZMAX_LOG_FILES_IN_PAGE_ZERO) {
     Uint32 fileNo;
     if (logFilePtr.p->fileNo >= ZMAX_LOG_FILES_IN_PAGE_ZERO) {
@@ -14353,7 +14362,7 @@
       jam();
       releaseMmPages(signal);
       logFilePtr.p->logFileStatus = LogFileRecord::CLOSING_EXEC_SR_COMPLETED;
-      closeFile(signal, logFilePtr);
+      closeFile(signal, logFilePtr, __LINE__);
       return;
       break;
     case LogPartRecord::LES_EXEC_LOG_NEW_MBYTE:
@@ -14369,7 +14378,7 @@
       ptrCheckGuard(nextLogFilePtr, clogFileFileSize, logFileRecord);
       nextLogFilePtr.p->currentMbyte = 0;
       logFilePtr.p->logFileStatus = LogFileRecord::CLOSING_EXEC_SR;
-      closeFile(signal, logFilePtr);
+      closeFile(signal, logFilePtr, __LINE__);
       return;
       break;
     case LogPartRecord::LES_EXEC_LOG:
@@ -14822,7 +14831,7 @@
     if (logFilePtr.i != nextAfterCurrentLogFilePtr.i) {
       // This file should be closed.
       logFilePtr.p->logFileStatus = LogFileRecord::CLOSE_SR_INVALIDATE_PAGES;
-      closeFile(signal, logFilePtr);  
+      closeFile(signal, logFilePtr, __LINE__);  
       // Return from this function and wait for close confirm. Then come back 
       // and test the previous file for closing.
       return;
@@ -15680,13 +15689,15 @@
 /* ------       CLOSE A FILE DURING EXECUTION OF FRAGMENT LOG        ------- */
 /*                                                                           */
 /* ------------------------------------------------------------------------- */
-void Dblqh::closeFile(Signal* signal, LogFileRecordPtr clfLogFilePtr) 
+void Dblqh::closeFile(Signal* signal, 
+		      LogFileRecordPtr clfLogFilePtr, Uint32 line) 
 {
   signal->theData[0] = clfLogFilePtr.p->fileRef;
   signal->theData[1] = cownref;
   signal->theData[2] = clfLogFilePtr.i;
   signal->theData[3] = ZCLOSE_NO_DELETE;
-  sendSignal(NDBFS_REF, GSN_FSCLOSEREQ, signal, 4, JBA);
+  signal->theData[4] = line;
+  sendSignal(NDBFS_REF, GSN_FSCLOSEREQ, signal, 5, JBA);
 }//Dblqh::closeFile()
 
 
@@ -17359,7 +17370,7 @@
         clfLogFilePtr.i = logPartPtr.p->execSrExecLogFile;
         ptrCheckGuard(clfLogFilePtr, clogFileFileSize, logFileRecord);
         clfLogFilePtr.p->logFileStatus = LogFileRecord::CLOSING_EXEC_LOG;
-        closeFile(signal, clfLogFilePtr);
+        closeFile(signal, clfLogFilePtr, __LINE__);
         result = ZCLOSE_FILE;
       }//if
     }//if

--- 1.33/storage/ndb/src/kernel/blocks/dbtup/DbtupExecQuery.cpp	2005-12-21 16:41:52 +01:00
+++ 1.34/storage/ndb/src/kernel/blocks/dbtup/DbtupExecQuery.cpp	2005-12-23 08:34:19 +01:00
@@ -2898,7 +2898,22 @@
   {
     Local_key tmp = *key;
     PagePtr page_ptr;
-    if (alloc_page(tablePtr.p, fragPtr.p, &page_ptr, tmp.m_page_no))
+
+    int ret;
+    if (tablePtr.p->m_attributes[MM].m_no_of_varsize)
+    {
+      tablePtr.p->m_offsets[MM].m_fix_header_size += 
+	Tuple_header::HeaderSize+1;
+      ret = alloc_page(tablePtr.p, fragPtr.p, &page_ptr, tmp.m_page_no);
+      tablePtr.p->m_offsets[MM].m_fix_header_size -= 
+	Tuple_header::HeaderSize+1;
+    } 
+    else
+    {
+      ret = alloc_page(tablePtr.p, fragPtr.p, &page_ptr, tmp.m_page_no);  
+    }
+
+    if (ret)
       return -1;
     
     Tuple_header* ptr = (Tuple_header*)
@@ -2925,10 +2940,21 @@
   Local_key tmp = *key;
   Uint32 pages = fragPtr.p->noOfPages;
   
+  int ret;
   PagePtr page_ptr;
-  if (alloc_page(tablePtr.p, fragPtr.p, &page_ptr, tmp.m_page_no))
+  if (tablePtr.p->m_attributes[MM].m_no_of_varsize)
+  {
+    tablePtr.p->m_offsets[MM].m_fix_header_size += Tuple_header::HeaderSize+1;
+    ret = alloc_page(tablePtr.p, fragPtr.p, &page_ptr, tmp.m_page_no);
+    tablePtr.p->m_offsets[MM].m_fix_header_size -= Tuple_header::HeaderSize+1;
+  } 
+  else
+  {
+    ret = alloc_page(tablePtr.p, fragPtr.p, &page_ptr, tmp.m_page_no);  
+  }
+  if (ret)
     return -1;
-
+  
   KeyReqStruct req_struct;
   Uint32* ptr= ((Fix_page*)page_ptr.p)->get_ptr(key->m_page_idx, 0);
   
@@ -2936,7 +2962,7 @@
   req_struct.m_tuple_ptr = (Tuple_header*)ptr;
   Uint32 bits = req_struct.m_tuple_ptr->m_header_bits;
 
-  int ret = 0;
+  ret = 0;
   copy = false;
   if (! (bits & Tuple_header::FREE))
   {
Thread
bk commit into 5.1 tree (jonas:1.1976)jonas23 Dec