List: Internals  « Previous Message | Next Message »
From: jonas.oreland  Date: April 5 2005 6:58am
Subject:bk commit into 5.1-ndb tree (joreland:1.1816)
View as plain text  
Below is the list of changes that have just been committed into a local
5.1-ndb repository of jonas. When jonas does a push these changes will
be propagated to the main repository and, within 24 hours after the
push, to the public repository.
For information on how to access the public repository
see http://dev.mysql.com/doc/mysql/en/installing-source-tree.html

ChangeSet
  1.1816 05/04/05 08:57:55 joreland@stripped +9 -0
  wl1870 - ndb disk/varsize
    fix remaining resize cases...

  ndb/src/kernel/blocks/dbtup/tuppage.hpp
    1.8 05/04/05 08:57:51 joreland@stripped +7 -6
    Fix remaining varsize/resize cases

  ndb/src/kernel/blocks/dbtup/tuppage.cpp
    1.4 05/04/05 08:57:51 joreland@stripped +14 -8
    Fix remaining varsize/resize cases

  ndb/src/kernel/blocks/dbtup/test_varpage.cpp
    1.2 05/04/05 08:57:51 joreland@stripped +6 -2
    Fix remaining varsize/resize cases

  ndb/src/kernel/blocks/dbtup/DbtupVarAlloc.cpp
    1.32 05/04/05 08:57:51 joreland@stripped +10 -6
    Fix remaining varsize/resize cases

  ndb/src/kernel/blocks/dbtup/DbtupExecQuery.cpp
    1.58 05/04/05 08:57:51 joreland@stripped +157 -32
    Fix remaining varsize/resize cases

  ndb/src/kernel/blocks/dbtup/DbtupDiskAlloc.cpp
    1.16 05/04/05 08:57:51 joreland@stripped +1 -1
    Fix remaining varsize/resize cases

  ndb/src/kernel/blocks/dbtup/DbtupCommit.cpp
    1.44 05/04/05 08:57:51 joreland@stripped +80 -12
    Fix remaining varsize/resize cases

  ndb/src/kernel/blocks/dbtup/DbtupAbort.cpp
    1.30 05/04/05 08:57:51 joreland@stripped +2 -1
    Fix remaining varsize/resize cases

  ndb/src/kernel/blocks/dbtup/Dbtup.hpp
    1.53 05/04/05 08:57:51 joreland@stripped +5 -3
    Fix remaining varsize/resize cases

# This is a BitKeeper patch.  What follows are the unified diffs for the
# set of deltas contained in the patch.  The rest of the patch, the part
# that BitKeeper cares about, is below these diffs.
# User:	joreland
# Host:	eel.hemma.oreland.se
# Root:	/home/jonas/src/mysql-5.1-ndb-dd

--- 1.15/ndb/src/kernel/blocks/dbtup/DbtupDiskAlloc.cpp	Wed Mar 30 15:09:39 2005
+++ 1.16/ndb/src/kernel/blocks/dbtup/DbtupDiskAlloc.cpp	Tue Apr  5 08:57:51 2005
@@ -634,7 +634,7 @@
 			src, sz,
 			gci, logfile_group_id);
     
-    ((Var_page*)pagePtr.p)->free_record(page_idx);
+    ((Var_page*)pagePtr.p)->free_record(page_idx, 0);
   }    
   
   disk_page_update_free_space(tabPtrP, pagePtr, pagePtr.p->list_index);

--- 1.1/ndb/src/kernel/blocks/dbtup/test_varpage.cpp	Fri Feb 18 15:04:58 2005
+++ 1.2/ndb/src/kernel/blocks/dbtup/test_varpage.cpp	Tue Apr  5 08:57:51 2005
@@ -70,7 +70,7 @@
 	rec.data[i] = rand();
       }
       ndbout << "Alloc " << rec.size << flush;
-      rec.idx= page.alloc_record(rec.size, &tmp);
+      rec.idx= page.alloc_record(rec.size, &tmp, 0);
       ndbout << " -> " << rec.idx << endl;
       Uint32* ptr= page.get_ptr(rec.idx);
       memcpy(ptr, rec.data, 4*rec.size);
@@ -85,7 +85,7 @@
       Uint32* ptr= page.get_ptr(rec.idx);
       cmp(ptr, rec.data, rec.size);
       delete[] rec.data;
-      page.free_record(rec.idx);
+      page.free_record(rec.idx, 0);
 
       for (unsigned k = no; k + 1 < allocated; k++)
 	records[k] = records[k+1];
@@ -97,7 +97,11 @@
       ndbout << "Reorg" << endl;      
       page.reorg(&tmp);
       break;
+    case 3: 
+      ndbout << "Expand" << endl;
+      
     }
+
   }
   ndbout << page << endl;
 }

--- 1.3/ndb/src/kernel/blocks/dbtup/tuppage.cpp	Fri Mar  4 22:14:21 2005
+++ 1.4/ndb/src/kernel/blocks/dbtup/tuppage.cpp	Tue Apr  5 08:57:51 2005
@@ -76,7 +76,8 @@
 }
 
 Uint32
-Tup_varsize_page::alloc_record(Uint32 alloc_size, Tup_varsize_page* temp)
+Tup_varsize_page::alloc_record(Uint32 alloc_size, 
+			       Tup_varsize_page* temp, Uint32 chain)
 {
   assert(free_space >= alloc_size);
   Uint32 largest_size= DATA_WORDS - (insert_pos + high_index);
@@ -106,21 +107,23 @@
     next_free_index= get_index_word(page_idx);
   }
 
-  * get_index_ptr(page_idx) = insert_pos + (alloc_size << 16);
+  assert(chain == 0 || chain == CHAIN);
+  * get_index_ptr(page_idx) = insert_pos + ((chain + alloc_size) << 16);
   
   insert_pos += alloc_size;
   free_space -= alloc_size;
-  
   return page_idx;
 }
   
 Uint32
-Tup_varsize_page::free_record(Uint32 page_idx)
+Tup_varsize_page::free_record(Uint32 page_idx, Uint32 chain)
 {
   Uint32 *index_ptr= get_index_ptr(page_idx);
   Uint32 index_word= * index_ptr;
   Uint32 entry_pos= index_word & 0xFFFF;
-  Uint32 entry_len= index_word >> 16;
+  Uint32 entry_len= (index_word >> 16) & ~CHAIN;
+  assert(chain == 0 || chain == CHAIN);
+  assert(!(((index_word >> 16) ^ chain) & 0x8000));
   if (page_idx + 1 == high_index) {
     /*
       We are removing the last in the entry list. We could potentially
@@ -194,7 +197,7 @@
   for (; index_ptr < end_of_page; index_ptr++)
   {
     Uint32 index_word= * index_ptr;
-    Uint32 entry_len= index_word >> 16;
+    Uint32 entry_len= (index_word >> 16) & ~CHAIN;
     if (entry_len != 0) {
       /*
 	We found an index item that needs to be packed. 
@@ -203,11 +206,13 @@
       Uint32 entry_pos= index_word & 0xffff;
       assert(entry_pos + entry_len <= old_insert_pos);
       assert(new_insert_pos + entry_len <= old_insert_pos);
-      * index_ptr= new_insert_pos + (entry_len << 16);
+      * index_ptr= new_insert_pos + (index_word & 0xFFFF0000);
       memcpy(m_data+new_insert_pos, copy_page->m_data+entry_pos, 4*entry_len);
       
       new_insert_pos += entry_len;
     }
+    else
+      assert((index_word >> 16) == 0);
   }
   insert_pos= new_insert_pos;
 }
@@ -227,7 +232,8 @@
     out << " [ " << i;
     if(*index_ptr >> 16)
       out << " pos: " << ((*index_ptr) & 0xFFFF)
-	  << " len: " << (*index_ptr >> 16)
+	  << " len: " << ((*index_ptr >> 16) & ~page.CHAIN)
+	  << (((* index_ptr >> 16) & page.CHAIN) ? " CHAIN " : " ") 
 	  << "]" << flush;
     else
       out << " FREE ]" << flush;

--- 1.7/ndb/src/kernel/blocks/dbtup/tuppage.hpp	Fri Apr  1 17:01:26 2005
+++ 1.8/ndb/src/kernel/blocks/dbtup/tuppage.hpp	Tue Apr  5 08:57:51 2005
@@ -118,6 +118,7 @@
   Uint32 unused_ph[10];
   
   STATIC_CONST( DATA_WORDS = File_formats::PAGE_SIZE_WORDS - 32 );
+  STATIC_CONST( CHAIN = 0x8000 );
   
   Uint32 m_data[DATA_WORDS];
   
@@ -145,12 +146,12 @@
    * Alloc record from page, return page_idx
    *   temp is used when having to reorg page before allocating
    */
-  Uint32 alloc_record(Uint32 size, Tup_varsize_page* temp);
+  Uint32 alloc_record(Uint32 size, Tup_varsize_page* temp, Uint32 chain);
   
   /**
    * Free record from page
    */
-  Uint32 free_record(Uint32 page_idx);
+  Uint32 free_record(Uint32 page_idx, Uint32 chain);
 
   void reorg(Tup_varsize_page* temp);
   void rebuild_index(Uint32* ptr);
@@ -161,8 +162,8 @@
   bool is_space_behind_entry(Uint32 page_index, Uint32 growth_len) const {
     Uint32 idx= get_index_word(page_index); 
     Uint32 pos= idx & 0xFFFF;
-    Uint32 len= idx >> 16;
-    if ((insert_pos == pos + len) && 
+    Uint32 len= (idx >> 16) & ~CHAIN;
+    if ((pos + len == insert_pos) && 
 	(insert_pos + growth_len < DATA_WORDS - high_index))
       return true;
     return false;
@@ -182,8 +183,8 @@
   void shrink_entry(Uint32 page_index, Uint32 new_size){
     Uint32 *pos= get_index_ptr(page_index);
     Uint32 idx= *pos;
-    *pos= (idx & 0xFFFF) + (new_size << 16);
-    Uint32 old_size= idx >> 16;
+    *pos= (idx & (CHAIN << 16 | 0xFFFF)) + (new_size << 16);
+    Uint32 old_size= (idx >> 16) & ~CHAIN;
     
     assert(old_size >= new_size);
     Uint32 shrink = old_size - new_size;

--- 1.52/ndb/src/kernel/blocks/dbtup/Dbtup.hpp	Fri Apr  1 17:01:26 2005
+++ 1.53/ndb/src/kernel/blocks/dbtup/Dbtup.hpp	Tue Apr  5 08:57:51 2005
@@ -2234,9 +2234,10 @@
 //---------------------------------------------------------------
 //
 // Public methods
-  Uint32* alloc_var_rec(Fragrecord*, Tablerec*, Uint32, Local_key*, Uint32*);
-  void free_var_part(Fragrecord*, Tablerec*, Var_part_ref);
-  void free_var_part(Fragrecord*, Tablerec*, Local_key*, Var_page*);
+  Uint32* alloc_var_rec(Fragrecord*, Tablerec*, Uint32, Local_key*, Uint32*,
+			Uint32 base);
+  void free_var_part(Fragrecord*, Tablerec*, Var_part_ref, Uint32 chain);
+  void free_var_part(Fragrecord*, Tablerec*, Local_key*, Var_page*, Uint32 chain);
 
   Uint32* alloc_fix_rec(Fragrecord*, Tablerec*, Local_key*, Uint32 *);
   void free_fix_rec(Fragrecord*, Tablerec*, Local_key*, Fix_page*);
@@ -2400,6 +2401,7 @@
 		     Operationrec*, Fragrecord*, Tablerec*);
   
   int handle_size_change_after_update(KeyReqStruct* req_struct,
+				      Tuple_header* org,
 				      Operationrec*,
 				      Fragrecord* const regFragPtr,
 				      Tablerec* const regTabPtr,

--- 1.29/ndb/src/kernel/blocks/dbtup/DbtupAbort.cpp	Fri Apr  1 17:01:26 2005
+++ 1.30/ndb/src/kernel/blocks/dbtup/DbtupAbort.cpp	Tue Apr  5 08:57:51 2005
@@ -142,7 +142,7 @@
     ndbout_c("abort grow");
     Var_page *pageP= (Var_page*)page.p;
     Uint32 idx= regOperPtr.p->m_tuple_location.m_page_idx;
-    Uint32 len= pageP->get_entry_len(idx);
+    Uint32 len= pageP->get_entry_len(idx) & ~Var_page::CHAIN;
     Uint32 fix_size= regTabPtr.p->m_offsets[MM].m_fix_header_size;
     Uint32 mm_vars= regTabPtr.p->m_attributes[MM].m_no_of_varsize;
     
@@ -152,6 +152,7 @@
     
     ndbassert(sz <= len);
     pageP->shrink_entry(idx, sz);
+    update_free_page_list(regFragPtr.p, pageP);
   } 
   else if(bits & Tuple_header::MM_SHRINK)
   {

--- 1.43/ndb/src/kernel/blocks/dbtup/DbtupCommit.cpp	Fri Apr  1 13:11:10 2005
+++ 1.44/ndb/src/kernel/blocks/dbtup/DbtupCommit.cpp	Tue Apr  5 08:57:51 2005
@@ -30,12 +30,10 @@
   TablerecPtr regTabPtr;
   FragrecordPtr regFragPtr;
   Uint32 frag_page_id, frag_id;
+#if 0
 
   ljamEntry();
 
-#if 0
-  //ndbout_c("execTUP_DEALLOCREQ");
-
   frag_id= signal->theData[0];
   regTabPtr.i= signal->theData[1];
   frag_page_id= signal->theData[2];
@@ -48,6 +46,8 @@
   Local_key tmp;
   tmp.m_page_no= getRealpid(regFragPtr.p, frag_page_id); 
   tmp.m_page_idx= page_index;
+  ndbout_c("execTUP_DEALLOCREQ %d %d", tmp.m_page_no, page_index);
+  return;
 
   PagePtr pagePtr;
   Tuple_header* ptr= (Tuple_header*)get_ptr(&pagePtr, &tmp, regTabPtr.p);
@@ -67,9 +67,10 @@
     if(ptr->m_header_bits & Tuple_header::CHAINED_ROW)
     {
       free_var_part(regFragPtr.p, regTabPtr.p,
-		    *(Var_part_ref*)ptr->get_var_part_ptr(regTabPtr.p));
+		    *(Var_part_ref*)ptr->get_var_part_ptr(regTabPtr.p),
+		    Var_page::CHAIN);
     }
-    free_var_part(regFragPtr.p, regTabPtr.p, &tmp, (Var_page*)pagePtr.p);
+    free_var_part(regFragPtr.p, regTabPtr.p, &tmp, (Var_page*)pagePtr.p, 0);
   } else {
     free_fix_rec(regFragPtr.p, regTabPtr.p, &tmp, (Fix_page*)pagePtr.p);
   }
@@ -180,10 +181,11 @@
     if(ptr->m_header_bits & Tuple_header::CHAINED_ROW)
     {
       free_var_part(regFragPtr, regTabPtr,
-		    *(Var_part_ref*)ptr->get_var_part_ptr(regTabPtr));
+		    *(Var_part_ref*)ptr->get_var_part_ptr(regTabPtr),
+		    Var_page::CHAIN);
     }
     free_var_part(regFragPtr, regTabPtr, 
-		  &regOperPtr->m_tuple_location, (Var_page*)page);
+		  &regOperPtr->m_tuple_location, (Var_page*)page, 0);
   } else {
     free_fix_rec(regFragPtr, regTabPtr, 
 		 &regOperPtr->m_tuple_location, (Fix_page*)page);
@@ -222,10 +224,36 @@
   {
     Uint32 *ref= tuple_ptr->get_var_part_ptr(regTabPtr);
     memcpy(tuple_ptr, copy, 4*(Tuple_header::HeaderSize+fix_size));
-    Uint32 *var_part= get_ptr(*(Var_part_ref*)ref);
-    Uint32 sz= ((mm_vars + 1) << 1) + (((Uint16*)var_part)[mm_vars]);
-    memcpy(var_part, copy->get_var_part_ptr(regTabPtr), sz);
+
+    Local_key tmp; tmp= *ref;
+    if(0) printf("%p %d %d (%d bytes) - ref: %x ", tuple_ptr,
+	   regOperPtr->m_tuple_location.m_page_no,
+	   regOperPtr->m_tuple_location.m_page_idx,
+	   4*(Tuple_header::HeaderSize+fix_size),
+	   *ref);
+    Ptr<Var_page> vpagePtr;
+    Uint32 *dst= get_ptr(&vpagePtr, *(Var_part_ref*)ref);
+    Uint32 *src= copy->get_var_part_ptr(regTabPtr);
+    Uint32 sz= ((mm_vars + 1) << 1) + (((Uint16*)src)[mm_vars]);
+    memcpy(dst, src, sz);
+    if(0) printf("ptr: %p %d ref: %x - chain commit", dst, sz, *ref);
     copy_bits |= Tuple_header::CHAINED_ROW;
+    
+    if(0)
+    {
+      for(Uint32 i = 0; i<((sz+3)>>2); i++)
+	printf(" %.8x", src[i]);
+      printf("\n");
+    }
+    
+    if(copy_bits & Tuple_header::MM_SHRINK)
+    {
+      if(0) printf(" - shrink %d -> %d - ", 
+	     vpagePtr.p->get_entry_len(tmp.m_page_idx), (sz + 3) >> 2);
+      vpagePtr.p->shrink_entry(tmp.m_page_idx, (sz + 3) >> 2);
+      update_free_page_list(regFragPtr, vpagePtr.p);
+    } 
+    if(0) ndbout_c("");
   } 
   else 
   {
@@ -233,10 +261,15 @@
     Uint32 sz= Tuple_header::HeaderSize + fix_size +
       ((((mm_vars + 1) << 1) + (((Uint16*)var_part)[mm_vars]) + 3)>> 2);
     memcpy(tuple_ptr, copy, 4*sz);      
-    if(copy->m_header_bits & Tuple_header::MM_SHRINK)
+    if(0) ndbout_c("%p %d %d (%d bytes)", tuple_ptr,
+	     regOperPtr->m_tuple_location.m_page_no,
+	     regOperPtr->m_tuple_location.m_page_idx,
+	     4*sz);
+    if(copy_bits & Tuple_header::MM_SHRINK)
     {
       ((Var_page*)page)->shrink_entry(regOperPtr->m_tuple_location.m_page_idx, 
 				      sz);
+      update_free_page_list(regFragPtr, (Var_page*)page);
     } 
   }
   
@@ -263,7 +296,7 @@
 	ndbassert(page->uncommitted_used_space >= sz);
 	page->uncommitted_used_space -= sz;
 	key.m_page_idx= ((Var_page*)page)->
-	  alloc_record(sz, (Var_page*)ctemp_page);
+	  alloc_record(sz, (Var_page*)ctemp_page, 0);
 	
 	disk_page_undo_alloc(page, &key, sz, gci, logfile_group_id);
       }
@@ -303,6 +336,41 @@
   if (regTabPtr->checksumIndicator) {
     jam();
     setChecksum(tuple_ptr, regTabPtr);
+  }
+
+  if(regTabPtr->m_attributes[MM].m_no_of_varsize)
+  {
+    Var_page* pageP= (Var_page*)page;
+loop:
+    for(Uint32 i = 1; i < pageP->high_index; i++)
+    {
+      Uint32 len= pageP->get_entry_len(i);
+      if(len > 0 && !(len & Var_page::CHAIN))
+      {
+	Tuple_header* tmp= (Tuple_header*)pageP->get_ptr(i);
+	if(tmp->m_operation_ptr_i != RNIL)
+	{
+	  if(tmp->m_operation_ptr_i > c_operation_pool.getSize())
+	  {
+	    ndbout << "FAILED " << i << endl << *pageP << endl;
+	  }
+	  c_operation_pool.getPtr(tmp->m_operation_ptr_i);
+	}
+	if(tmp->m_header_bits & Tuple_header::CHAINED_ROW)
+	  get_ptr(* (Var_part_ref*)tmp->get_var_part_ptr(regTabPtr));
+      }
+    }
+    
+#if 0
+    if(false && regOperPtr->op_struct.op_type != ZREAD && 
+       req_struct->m_varpart_page_ptr_p != pageP &&
+       req_struct->m_varpart_page_ptr_p != 0)
+    {
+      ndbout_c("verifying extra page");
+      pageP= req_struct->m_varpart_page_ptr_p;
+      goto loop;
+    }
+#endif
   }
 }
 

--- 1.57/ndb/src/kernel/blocks/dbtup/DbtupExecQuery.cpp	Fri Apr  1 17:01:26 2005
+++ 1.58/ndb/src/kernel/blocks/dbtup/DbtupExecQuery.cpp	Tue Apr  5 08:57:51 2005
@@ -1010,27 +1010,72 @@
  void Dbtup::sendTUPKEYCONF(Signal* signal,
 			    KeyReqStruct *req_struct,
 			    Operationrec * const regOperPtr)
- {
-   TupKeyConf * const tupKeyConf= (TupKeyConf *)signal->getDataPtrSend();  
-
-   Uint32 RuserPointer= regOperPtr->userpointer;
-   Uint32 RnoFiredTriggers= req_struct->no_fired_triggers;
-   Uint32 log_size= req_struct->log_size;
-   Uint32 read_length= req_struct->read_length;
-   Uint32 last_row= req_struct->last_row;
-
-   set_trans_state(regOperPtr, TRANS_STARTED);
-   set_tuple_state(regOperPtr, TUPLE_PREPARED);
-   tupKeyConf->userPtr= RuserPointer;
-   tupKeyConf->readLength= read_length;
-   tupKeyConf->writeLength= log_size;
-   tupKeyConf->noFiredTriggers= RnoFiredTriggers;
-   tupKeyConf->lastRow= last_row;
-
+{
+  TupKeyConf * const tupKeyConf= (TupKeyConf *)signal->getDataPtrSend();  
+  
+  Uint32 RuserPointer= regOperPtr->userpointer;
+  Uint32 RnoFiredTriggers= req_struct->no_fired_triggers;
+  Uint32 log_size= req_struct->log_size;
+  Uint32 read_length= req_struct->read_length;
+  Uint32 last_row= req_struct->last_row;
+  
+  set_trans_state(regOperPtr, TRANS_STARTED);
+  set_tuple_state(regOperPtr, TUPLE_PREPARED);
+  tupKeyConf->userPtr= RuserPointer;
+  tupKeyConf->readLength= read_length;
+  tupKeyConf->writeLength= log_size;
+  tupKeyConf->noFiredTriggers= RnoFiredTriggers;
+  tupKeyConf->lastRow= last_row;
+  
   EXECUTE_DIRECT(DBLQH, GSN_TUPKEYCONF, signal,
 		 TupKeyConf::SignalLength);
+  
+  Uint32 RnoOfFragrec= cnoOfFragrec;
+  Uint32 RnoOfTablerec= cnoOfTablerec;
+  
+  jamEntry();
+  fragptr.i= regOperPtr->fragmentPtr;
+  ptrAss(fragptr, fragrecord);
+  Fragrecord * regFragPtr= fragptr.p;
+  
+  tabptr.i = regFragPtr->fragTableId;
+  ptrAss(tabptr, tablerec);
+  Tablerec* regTabPtr = tabptr.p;
+  
+  if(regTabPtr->m_attributes[MM].m_no_of_varsize)
+  {
+    PagePtr tmp;
+    tmp.i= regOperPtr->m_tuple_location.m_page_no;
+    ptrCheckGuard(tmp, cnoOfPage, page);
+    Var_page* pageP= (Var_page*)tmp.p;
+loop:
+    for(Uint32 i = 1; i < pageP->high_index; i++)
+    {
+      Uint32 len= pageP->get_entry_len(i);
+      if(len > 0 && !(len & Var_page::CHAIN))
+      {
+	Tuple_header* tmp= (Tuple_header*)pageP->get_ptr(i);
+	if(tmp->m_operation_ptr_i != RNIL)
+	{
+	  c_operation_pool.getPtr(tmp->m_operation_ptr_i);
+	}
+	if(tmp->m_header_bits & Tuple_header::CHAINED_ROW)
+	  get_ptr(* (Var_part_ref*)tmp->get_var_part_ptr(regTabPtr));
+      }
+    }
+    
+    if(false && regOperPtr->op_struct.op_type != ZREAD && 
+       req_struct->m_varpart_page_ptr_p != pageP &&
+       req_struct->m_varpart_page_ptr_p != 0)
+    {
+      ndbout_c("verifying extra page");
+      pageP= req_struct->m_varpart_page_ptr_p;
+      goto loop;
+    }
+  }
 }
 
+
 #define MAX_READ (sizeof(signal->theData) > MAX_MESSAGE_SIZE ? MAX_MESSAGE_SIZE : sizeof(signal->theData))
 
 /* ---------------------------------------------------------------- */
@@ -1105,6 +1150,7 @@
                            KeyReqStruct* req_struct) 
 {
   Uint32 *dst;
+  Tuple_header *base= req_struct->m_tuple_ptr, *org;
   if ((dst= c_undo_buffer.alloc_copy_tuple(&operPtrP->m_copy_tuple_location,
 					   regTabPtr->total_rec_size)) == 0)
   {
@@ -1112,7 +1158,6 @@
     goto error;
   }
 
-  Tuple_header* org;
   Uint32 tup_version;
   if(operPtrP->is_first_operation())
   {
@@ -1187,6 +1232,7 @@
   {  
     shrink_tuple(req_struct, sizes+2, regTabPtr);
     if (cmp[0] != cmp[1] && handle_size_change_after_update(req_struct,
+							    base,
 							    operPtrP,
 							    regFragPtr,
 							    regTabPtr,
@@ -1286,7 +1332,7 @@
                            KeyReqStruct *req_struct)
 {
   Uint32 *dst, *ptr= 0;
-  Tuple_header *org= req_struct->m_tuple_ptr;
+  Tuple_header *base= req_struct->m_tuple_ptr, *org= base;
   Tuple_header *tuple_ptr;
   if ((dst= 
        c_undo_buffer.alloc_copy_tuple(&regOperPtr.p->m_copy_tuple_location,
@@ -1373,7 +1419,7 @@
       if ((ptr= alloc_var_rec(regFragPtr, regTabPtr,
 			      sizes[2+MM],
 			      &regOperPtr.p->m_tuple_location,
-			      &frag_page_id)) == 0)
+			      &frag_page_id, 0)) == 0)
 	goto mem_error;
     }
 
@@ -1425,6 +1471,7 @@
       return 0;
 
     return handle_size_change_after_update(req_struct,
+					   base,
 					   regOperPtr.p,
 					   regFragPtr,
 					   regTabPtr,
@@ -2496,7 +2543,8 @@
     memcpy(ptr, src, 4*fix_size);
   }
   
-  ptr->m_header_bits= Tuple_header::DISK_INLINE | bits;
+  ptr->m_header_bits= Tuple_header::DISK_INLINE | 
+    bits & ~(Uint32)(Tuple_header::CHAINED_ROW);
   src->m_header_bits= bits & 
     ~(Uint32)(Tuple_header::MM_SHRINK | Tuple_header::MM_GROWN);
   
@@ -2651,40 +2699,65 @@
 
 int
 Dbtup::handle_size_change_after_update(KeyReqStruct* req_struct,
+				       Tuple_header* org,
 				       Operationrec* regOperPtr,
 				       Fragrecord* regFragPtr,
 				       Tablerec* regTabPtr,
 				       Uint32 sizes[4])
 {
   ndbrequire(sizes[1] == sizes[3]);
-  ndbout_c("%d %d %d %d", sizes[0], sizes[1], sizes[2], sizes[3]);
+  //ndbout_c("%d %d %d %d", sizes[0], sizes[1], sizes[2], sizes[3]);
+  if(0)
+    printf("%p %d %d - handle_size_change_after_update ",
+	   req_struct->m_tuple_ptr,
+	   regOperPtr->m_tuple_location.m_page_no,
+	   regOperPtr->m_tuple_location.m_page_idx);
+  
+  Uint32 bits= org->m_header_bits;
+  Uint32 fix_sz = Tuple_header::HeaderSize + 
+    regTabPtr->m_offsets[MM].m_fix_header_size;
+  
   if(sizes[MM] == sizes[2+MM])
     ;
   else if(sizes[MM] > sizes[2+MM])
   {
-    ndbout_c("shrink");
+    if(0) ndbout_c("shrink");
     req_struct->m_tuple_ptr->m_header_bits |= Tuple_header::MM_SHRINK;
   }
   else
   {
-    printf("grow - ");
+    if(0) printf("grow - ");
     Var_page* pageP= req_struct->m_varpart_page_ptr_p;
-    Uint32 idx= regOperPtr->m_tuple_location.m_page_idx;
-    Uint32 alloc= pageP->get_entry_len(idx);
-    if(sizes[2+MM] <= alloc)
+    Uint32 idx, alloc, needed;
+    if(! (bits & Tuple_header::CHAINED_ROW))
+    {
+      idx= regOperPtr->m_tuple_location.m_page_idx;
+      alloc= pageP->get_entry_len(idx) & ~Var_page::CHAIN;
+      needed= sizes[2+MM];
+    }
+    else
+    {
+      Local_key tmp;
+      tmp= *org->get_var_part_ptr(regTabPtr);
+      idx= tmp.m_page_idx;
+      alloc= pageP->get_entry_len(idx) & ~Var_page::CHAIN;
+      needed= sizes[2+MM] - fix_sz;
+    }
+    
+    if(needed <= alloc)
     {
       ndbassert(!regOperPtr->is_first_operation());
       ndbout_c(" no grow");
       return 0;
     }
-    Uint32 add= sizes[2+MM] - alloc;
+    Uint32 add= needed - alloc;
     
-    if(pageP->free_space >= add)
+    if(false && pageP->free_space >= add)
     {
       jam();
       if(!pageP->is_space_behind_entry(idx, add))
       {
-	printf("extra reorg");
+	if(0) printf("extra reorg");
 	jam();
 	/**
 	 * In this case we need to reorganise the page to fit. To ensure we
@@ -2703,13 +2776,65 @@
 	add += alloc;
       }
       pageP->grow_entry(idx, add);
+      update_free_page_list(regFragPtr, pageP);
       req_struct->m_tuple_ptr->m_header_bits |= Tuple_header::MM_GROWN;
     }
     else
     {
-      abort();
+      assert(fix_sz < alloc);
+
+      Local_key key;
+
+      if(! (bits & Tuple_header::CHAINED_ROW))
+      {
+	org->m_header_bits |= Tuple_header::CHAINED_ROW;
+	Uint32 id, *dst= alloc_var_rec(regFragPtr, regTabPtr, 
+				       needed - fix_sz, &key, &id, 
+				       Var_page::CHAIN);
+	assert(dst);
+	org= (Tuple_header*)pageP->get_ptr(idx);
+	Uint32 *ptr= org->get_var_part_ptr(regTabPtr);
+
+	//assert(key.m_page_no != pageP->physical_page_id);
+	memcpy(dst, ptr, 4*(needed-fix_sz));
+	* ptr = key.ref(); // store ref
+	
+	if(0)printf("%d %d", pageP->get_entry_len(idx), fix_sz + 1);
+
+	pageP->shrink_entry(idx, fix_sz + 1); // var part ref
+	pageP->reorg((Var_page*)ctemp_page);
+	update_free_page_list(regFragPtr, pageP);
+	if(0) printf(" %d bytes, split(ref: %x %x ptr: %p sz: %d, %d) ",
+		     4*fix_sz, *ptr, key.ref(), dst, sizes[2+MM] - fix_sz, alloc-fix_sz);
+      }
+      else
+      {
+	assert(sizes[2+MM] >= alloc);
+	pageP->reorg((Var_page*)ctemp_page);
+	Uint32 id, *dst= alloc_var_rec(regFragPtr, regTabPtr, 
+				       needed, &key, &id, 
+				       Var_page::CHAIN);
+	assert(dst);
+	
+	Uint32 base_idx= regOperPtr->m_tuple_location.m_page_idx;
+	org= (Tuple_header*)
+	  ((Var_page*)req_struct->m_page_ptr_p)->get_ptr(base_idx);
+	Uint32 *ref= org->get_var_part_ptr(regTabPtr);
+	Uint32 old_ref= *ref;
+	Uint32 *src= pageP->get_ptr(idx);
+	
+	assert(alloc < needed);
+	memcpy(dst, src, 4*alloc);
+	*ref = key.ref();
+	if(0) printf(" %d bytes, resplit(ref: %x,%x ptr: %p sz: %d, %d) ",
+		     4*(fix_sz+1), old_ref,*ref, dst, sizes[2+MM] - fix_sz, alloc-fix_sz);
+	free_var_part(regFragPtr, regTabPtr, 
+		      *(Var_part_ref*)&old_ref, Var_page::CHAIN);
+      }
+      
+      req_struct->m_tuple_ptr->m_header_bits = bits | Tuple_header::MM_GROWN;
     }
-    ndbout_c("free: %d", pageP->free_space);
+    if(0) ndbout_c("free: %d", pageP->free_space);
   }
   return 0;
 }

--- 1.31/ndb/src/kernel/blocks/dbtup/DbtupVarAlloc.cpp	Thu Mar 31 20:59:24 2005
+++ 1.32/ndb/src/kernel/blocks/dbtup/DbtupVarAlloc.cpp	Tue Apr  5 08:57:51 2005
@@ -455,7 +455,8 @@
 			     Tablerec* const tab_ptr,
 			     Uint32 alloc_size,
 			     Local_key* key,
-			     Uint32 * out_frag_page_id)
+			     Uint32 * out_frag_page_id,
+			     Uint32 base)
 {
   Var_page* page_header;
   PagePtr page_ptr;
@@ -475,7 +476,8 @@
     ljam();
     page_header= (Var_page*)page_ptr.p;
   }
-  Uint32 idx= page_header->alloc_record(alloc_size, (Var_page*)ctemp_page);
+  Uint32 idx= page_header->alloc_record(alloc_size, 
+					(Var_page*)ctemp_page, base);
   
   key->m_page_no= page_ptr.i;
   key->m_page_idx= idx;
@@ -501,7 +503,8 @@
     Returns true if deallocation was successful otherwise false
 */
 void
-Dbtup::free_var_part(Fragrecord* frag_ptr, Tablerec* tab_ptr, Var_part_ref ref)
+Dbtup::free_var_part(Fragrecord* frag_ptr, Tablerec* tab_ptr, 
+		     Var_part_ref ref, Uint32 chain)
 {
   Local_key tmp;
   PagePtr pagePtr;
@@ -509,17 +512,18 @@
   pagePtr.i= tmp.m_page_no= ref.m_ref >> MAX_TUPLES_BITS;
   
   ptrCheckGuard(pagePtr, cnoOfPage, page);
-  free_var_part(frag_ptr, tab_ptr, &tmp, (Var_page*)pagePtr.p);
+  free_var_part(frag_ptr, tab_ptr, &tmp, (Var_page*)pagePtr.p, chain);
 }
 
 void Dbtup::free_var_part(Fragrecord* const frag_ptr,
 			  Tablerec* const tab_ptr,
 			  Local_key* key,
-			  Var_page* const page_header)
+			  Var_page* const page_header,
+			  Uint32 chain)
 {
   
   Uint32 page_idx= key->m_page_idx;
-  page_header->free_record(page_idx);
+  page_header->free_record(page_idx, chain);
   
   ndbassert(page_header->free_space <= Var_page::DATA_WORDS);
   if (page_header->free_space == Var_page::DATA_WORDS - 1)
Thread
bk commit into 5.1-ndb tree (joreland:1.1816) - jonas.oreland - 5 Apr