List:Internals« Previous MessageNext Message »
From:jonas Date:November 7 2005 12:28pm
Subject:bk commit into 5.1 tree (jonas:1.1941)
View as plain text  
Below is the list of changes that have just been committed into a local
5.1 repository of jonas. When jonas does a push these changes will
be propagated to the main repository and, within 24 hours after the
push, to the public repository.
For information on how to access the public repository
see http://dev.mysql.com/doc/mysql/en/installing-source-tree.html

ChangeSet
  1.1941 05/11/07 13:28:19 jonas@stripped +2 -0
  ndb - Fix problematic handling of TUP_ALLOC/DEALLOC REQ wrt varsize
      same row -
        prepare delete, prepare insert (with different size), commit delete, commit insert would lead to assertion
          as prepare insert didnt call handle_size_change_after_update

  storage/ndb/src/kernel/blocks/dbtup/DbtupExecQuery.cpp
    1.23 05/11/07 13:28:15 jonas@stripped +146 -97
    ndb - Fix problematic handling of TUP_ALLOC/DEALLOC REQ wrt varsize
        same row -
          prepare delete, prepare insert (with different size), commit delete, commit insert would lead to assertion
            as prepare insert didnt call handle_size_change_after_update

  storage/ndb/src/kernel/blocks/dbtup/Dbtup.hpp
    1.30 05/11/07 13:28:15 jonas@stripped +1 -0
    ndb - Fix problematic handling of TUP_ALLOC/DEALLOC REQ wrt varsize
        same row -
          prepare delete, prepare insert (with different size), commit delete, commit insert would lead to assertion
            as prepare insert didnt call handle_size_change_after_update

# This is a BitKeeper patch.  What follows are the unified diffs for the
# set of deltas contained in the patch.  The rest of the patch, the part
# that BitKeeper cares about, is below these diffs.
# User:	jonas
# Host:	perch.ndb.mysql.com
# Root:	/home/jonas/src/mysql-5.1-new

--- 1.29/storage/ndb/src/kernel/blocks/dbtup/Dbtup.hpp	2005-11-07 12:19:07 +01:00
+++ 1.30/storage/ndb/src/kernel/blocks/dbtup/Dbtup.hpp	2005-11-07 13:28:15 +01:00
@@ -2103,6 +2103,7 @@
   }
 
   void prepare_initial_insert(KeyReqStruct*, Operationrec*, Tablerec*);
+  void fix_disk_insert_no_mem_insert(KeyReqStruct*, Operationrec*, Tablerec*);
   void setup_fixed_part(KeyReqStruct* req_struct,
 			Operationrec* const regOperPtr,
 			Tablerec* const regTabPtr);

--- 1.22/storage/ndb/src/kernel/blocks/dbtup/DbtupExecQuery.cpp	2005-11-07 12:19:07 +01:00
+++ 1.23/storage/ndb/src/kernel/blocks/dbtup/DbtupExecQuery.cpp	2005-11-07 13:28:15 +01:00
@@ -1298,12 +1298,56 @@
     disk_undo ? (Tuple_header::DISK_ALLOC | Tuple_header::DISK_INLINE) : 0;
 }
 
+void
+Dbtup::fix_disk_insert_no_mem_insert(KeyReqStruct *req_struct, 
+				     Operationrec* regOperPtr,
+				     Tablerec* regTabPtr)
+{
+  regOperPtr->m_undo_buffer_space= sizeof(Dbtup::Disk_undo::Alloc);
+  req_struct->check_offset[DD]= regTabPtr->get_check_offset(DD);
+  
+  const Uint32 cnt1= regTabPtr->m_attributes[MM].m_no_of_varsize;
+  const Uint32 cnt2= regTabPtr->m_attributes[DD].m_no_of_varsize;
+  Uint32 *ptr= req_struct->m_tuple_ptr->get_var_part_ptr(regTabPtr);
+
+  if(cnt1)
+  {
+    // Disk part is 32-bit aligned
+    char *varptr = req_struct->m_var_data[MM].m_data_ptr;
+    ptr= ALIGN_WORD(varptr + regTabPtr->m_offsets[MM].m_max_var_offset);
+  } 
+  else
+  {
+    ptr -= Tuple_header::HeaderSize;
+  }
+
+  req_struct->m_disk_ptr= (Tuple_header*)ptr;
+  
+  if(cnt2)
+  {
+    KeyReqStruct::Var_data *dst= &req_struct->m_var_data[DD];
+    ptr=((Tuple_header*)ptr)->m_data+regTabPtr->m_offsets[DD].m_varpart_offset;
+    dst->m_data_ptr= (char*)(((Uint16*)ptr)+cnt2+1);
+    dst->m_offset_array_ptr= req_struct->var_pos_array + (cnt1 << 1);
+    dst->m_var_len_offset= cnt2;
+    dst->m_max_var_offset= regTabPtr->m_offsets[DD].m_max_var_offset;
+  }
+  
+  // Set all null bits
+  memset(req_struct->m_disk_ptr->m_null_bits+
+	 regTabPtr->m_offsets[DD].m_null_offset, 0xFF, 
+	 4*regTabPtr->m_offsets[DD].m_null_words);
+  req_struct->m_tuple_ptr->m_header_bits = 
+    (Tuple_header::DISK_ALLOC | Tuple_header::DISK_INLINE);
+}
+
 int Dbtup::handleInsertReq(Signal* signal,
                            Ptr<Operationrec> regOperPtr,
                            Ptr<Fragrecord> fragPtr,
                            Tablerec* const regTabPtr,
                            KeyReqStruct *req_struct)
 {
+  Uint32 tup_version = 1;
   Fragrecord* regFragPtr = fragPtr.p;
   Uint32 *dst, *ptr= 0;
   Tuple_header *base= req_struct->m_tuple_ptr, *org= base;
@@ -1320,51 +1364,64 @@
   ndbout << "dst: " << hex << UintPtr(dst) << " - " 
 	 << regOperPtr.p->m_copy_tuple_location << endl;
   
+  bool disk = regTabPtr->m_no_of_disk_attributes > 0;
+  bool mem_insert = get_tuple_state(regOperPtr.p) == TUPLE_INITIAL_INSERT;
+  bool disk_insert = regOperPtr.p->is_first_operation() && disk;
 
-  Uint32 tup_version;
   union {
     Uint32 sizes[4];
     Uint64 cmp[2];
   };
-  if(regOperPtr.p->is_first_operation())
+
+  if(mem_insert)
   {
-    tup_version= 1;
+    jam();
+    ndbassert(regOperPtr.p->is_first_operation()); // disk insert
     prepare_initial_insert(req_struct, regOperPtr.p, regTabPtr);
-    if(regTabPtr->m_no_of_disk_attributes)
-    {
-      int res;
-      if((res= c_lgman->alloc_log_space(regFragPtr->m_logfile_group_id,
-					regOperPtr.p->m_undo_buffer_space)))
-      {
-	terrorCode= res;
-	regOperPtr.p->m_undo_buffer_space= 0;
-	goto log_space_error;
-      }
-    }
   }
   else
   {
-    Operationrec* prevOp= req_struct->prevOpPtr.p;
-    ndbassert(prevOp->op_struct.op_type == ZDELETE);
-    tup_version= prevOp->tupVersion + 1;
+    if (!regOperPtr.p->is_first_operation())
+    {
+      Operationrec* prevOp= req_struct->prevOpPtr.p;
+      ndbassert(prevOp->op_struct.op_type == ZDELETE);
+      tup_version= prevOp->tupVersion + 1;
+      
+      if(!prevOp->is_first_operation())
+	org= (Tuple_header*)c_undo_buffer.get_ptr(&prevOp->m_copy_tuple_location);
+    }
 
-    if(!prevOp->is_first_operation())
-      org= (Tuple_header*)c_undo_buffer.get_ptr(&prevOp->m_copy_tuple_location);
     if (regTabPtr->need_expand())
-      expand_tuple(req_struct, sizes, org, regTabPtr, true);
+      expand_tuple(req_struct, sizes, org, regTabPtr, !disk_insert);
     else
       memcpy(dst, org, 4*regTabPtr->m_offsets[MM].m_fix_header_size);
   }
   
+  if (disk_insert)
+  {
+    int res;
+    if (unlikely(!mem_insert))
+    {
+      sizes[DD] = sizes[DD+2] = regTabPtr->m_offsets[DD].m_fix_header_size;
+      fix_disk_insert_no_mem_insert(req_struct, regOperPtr.p, regTabPtr);
+    }
+    if((res= c_lgman->alloc_log_space(regFragPtr->m_logfile_group_id,
+				      regOperPtr.p->m_undo_buffer_space)))
+    {
+      terrorCode= res;
+      regOperPtr.p->m_undo_buffer_space= 0;
+      goto log_space_error;
+    }
+  }
+  
   regOperPtr.p->tupVersion= tup_version & ZTUP_VERSION_MASK;
   tuple_ptr->set_tuple_version(tup_version);
   if(updateAttributes(req_struct, &cinBuffer[0], 
 		      req_struct->attrinfo_len) == -1)
     return -1;
-  
+
   if (checkNullAttributes(req_struct, regTabPtr) == false)
   {
-
     goto null_check_error;
   }
   
@@ -1376,95 +1433,87 @@
   /**
    * Alloc memory
    */
-  if(regOperPtr.p->is_first_operation())
+  Uint32 frag_page_id = req_struct->frag_page_id;
+  Uint32 real_page_id = regOperPtr.p->m_tuple_location.m_page_no;
+  if(mem_insert)
   {
-    Uint32 frag_page_id = req_struct->frag_page_id;
-    Uint32 real_page_id = regOperPtr.p->m_tuple_location.m_page_no;
-    regOperPtr.p->m_tuple_location.m_page_no = frag_page_id;
-    if (likely(get_tuple_state(regOperPtr.p) == TUPLE_INITIAL_INSERT))
+    if (!regTabPtr->m_attributes[MM].m_no_of_varsize)
     {
-      if (!regTabPtr->m_attributes[MM].m_no_of_varsize)
-      {
-	jam();
-	if ((ptr= alloc_fix_rec(regFragPtr,
-				regTabPtr,
-				&regOperPtr.p->m_tuple_location,
-				&frag_page_id)) == 0)
-	{
-	  goto mem_error;
-	}
-      } 
-      else 
+      jam();
+      if ((ptr= alloc_fix_rec(regFragPtr,
+			      regTabPtr,
+			      &regOperPtr.p->m_tuple_location,
+			      &frag_page_id)) == 0)
       {
-	jam();
-	regOperPtr.p->m_tuple_location.m_file_no= sizes[2+MM];
-	if ((ptr= alloc_var_rec(regFragPtr, regTabPtr,
-				sizes[2+MM],
-				&regOperPtr.p->m_tuple_location,
-				&frag_page_id, 0)) == 0)
-	  goto mem_error;
+	goto mem_error;
       }
-
-      real_page_id = regOperPtr.p->m_tuple_location.m_page_no;
-      regOperPtr.p->m_tuple_location.m_page_no= frag_page_id;
-      c_lqh->accminupdate(signal,
-			  regOperPtr.p->userpointer,
-			  &regOperPtr.p->m_tuple_location);
-      
-      ((Tuple_header*)ptr)->m_operation_ptr_i= regOperPtr.i;
-      ((Tuple_header*)ptr)->m_header_bits= Tuple_header::ALLOC;
-    }
-    
-    if (regTabPtr->m_no_of_disk_attributes)
+    } 
+    else 
     {
-      Local_key tmp;
-      Uint32 size= regTabPtr->m_attributes[DD].m_no_of_varsize == 0 ? 
-	1 : sizes[2+DD];
-      
-      int ret= disk_page_prealloc(signal, fragPtr, &tmp, size);
-      ndbassert(ret >= 0);
-
-      regOperPtr.p->op_struct.m_disk_preallocated= 1;
-      tmp.m_page_idx= size;
-      memcpy(tuple_ptr->get_disk_ref_ptr(regTabPtr), &tmp, sizeof(tmp));
-      
-      /**
-       * Set ref from disk to mm
-       */
-      Tuple_header* disk_ptr= req_struct->m_disk_ptr;
-      disk_ptr->m_header_bits = 0;
-      disk_ptr->m_base_record_ref= regOperPtr.p->m_tuple_location.ref();
+      jam();
+      regOperPtr.p->m_tuple_location.m_file_no= sizes[2+MM];
+      if ((ptr= alloc_var_rec(regFragPtr, regTabPtr,
+			      sizes[2+MM],
+			      &regOperPtr.p->m_tuple_location,
+			      &frag_page_id, 0)) == 0)
+	goto mem_error;
     }
     
-    regOperPtr.p->m_tuple_location.m_page_no = real_page_id;
-    tuple_ptr->m_header_bits |= Tuple_header::ALLOC;
+    real_page_id = regOperPtr.p->m_tuple_location.m_page_no;
+    regOperPtr.p->m_tuple_location.m_page_no= frag_page_id;
+    c_lqh->accminupdate(signal,
+			regOperPtr.p->userpointer,
+			&regOperPtr.p->m_tuple_location);
     
-    if (regTabPtr->checksumIndicator) {
-      jam();
-      setChecksum(req_struct->m_tuple_ptr, regTabPtr);
-    }
-  
-    return 0;
+    ((Tuple_header*)ptr)->m_operation_ptr_i= regOperPtr.i;
+    ((Tuple_header*)ptr)->m_header_bits= Tuple_header::ALLOC;
+    regOperPtr.p->m_tuple_location.m_page_no = real_page_id;
   }
   else
   {
-    if (regTabPtr->checksumIndicator) {
-      jam();
-      setChecksum(req_struct->m_tuple_ptr, regTabPtr);
+    int ret;
+    if (regTabPtr->need_shrink() && cmp[0] != cmp[1] &&
+	(ret = handle_size_change_after_update(req_struct,
+					       base,
+					       regOperPtr.p,
+					       regFragPtr,
+					       regTabPtr,
+					       sizes)))
+    {
+      return ret;
     }
+  }
+  
+  if (disk_insert)
+  {
+    Local_key tmp;
+    Uint32 size= regTabPtr->m_attributes[DD].m_no_of_varsize == 0 ? 
+      1 : sizes[2+DD];
     
-    if (!regTabPtr->need_shrink() || cmp[0] == cmp[1])
-      return 0;
-
-    return handle_size_change_after_update(req_struct,
-					   base,
-					   regOperPtr.p,
-					   regFragPtr,
-					   regTabPtr,
-					   sizes);
+    int ret= disk_page_prealloc(signal, fragPtr, &tmp, size);
+    ndbassert(ret >= 0);
+    
+    regOperPtr.p->op_struct.m_disk_preallocated= 1;
+    tmp.m_page_idx= size;
+    memcpy(tuple_ptr->get_disk_ref_ptr(regTabPtr), &tmp, sizeof(tmp));
+    
+    /**
+     * Set ref from disk to mm
+     */
+    Local_key ref = regOperPtr.p->m_tuple_location;
+    ref.m_page_no = frag_page_id;
+    
+    Tuple_header* disk_ptr= req_struct->m_disk_ptr;
+    disk_ptr->m_header_bits = 0;
+    disk_ptr->m_base_record_ref= ref.ref();
   }
-
-
+  
+  if (regTabPtr->checksumIndicator) 
+  {
+    jam();
+    setChecksum(req_struct->m_tuple_ptr, regTabPtr);
+  }
+  return 0;
   
 mem_error:
   terrorCode= ZMEM_NOMEM_ERROR;
Thread
bk commit into 5.1 tree (jonas:1.1941)jonas7 Nov