Below is the list of changes that have just been committed into a local
5.1 repository of knielsen. When knielsen does a push these changes will
be propagated to the main repository and, within 24 hours after the
push, to the public repository.
For information on how to access the public repository
see http://dev.mysql.com/doc/mysql/en/installing-source-tree.html
ChangeSet@stripped, 2006-11-07 09:47:31+01:00, knielsen@ymer.(none) +2 -0
Merge bk-internal:/home/bk/mysql-5.1-new-ndb
into ymer.(none):/usr/local/mysql/mysql-5.1-ndb-dynattr
MERGE: 1.2322.1.13
storage/ndb/src/kernel/blocks/dbtup/DbtupExecQuery.cpp@stripped, 2006-11-07 09:47:26+01:00, knielsen@ymer.(none) +0 -0
Auto merged
MERGE: 1.45.1.1
storage/ndb/src/ndbapi/NdbTransaction.cpp@stripped, 2006-11-07 09:47:26+01:00, knielsen@ymer.(none) +0 -0
Auto merged
MERGE: 1.62.1.3
# This is a BitKeeper patch. What follows are the unified diffs for the
# set of deltas contained in the patch. The rest of the patch, the part
# that BitKeeper cares about, is below these diffs.
# User: knielsen
# Host: ymer.(none)
# Root: /usr/local/mysql/mysql-5.1-ndb-dynattr/RESYNC
--- 1.46/storage/ndb/src/kernel/blocks/dbtup/DbtupExecQuery.cpp 2006-11-07 09:47:37 +01:00
+++ 1.47/storage/ndb/src/kernel/blocks/dbtup/DbtupExecQuery.cpp 2006-11-07 09:47:37 +01:00
@@ -222,7 +222,8 @@ Dbtup::calculateChecksum(Tuple_header* t
// includes tupVersion
//printf("%p - ", tuple_ptr);
- if (regTabPtr->m_attributes[MM].m_no_of_varsize)
+ if (regTabPtr->m_attributes[MM].m_no_of_varsize ||
+ regTabPtr->m_attributes[MM].m_no_of_dynamic)
rec_size += Tuple_header::HeaderSize;
for (i= 0; i < rec_size-2; i++) {
@@ -1103,29 +1104,72 @@ Dbtup::prepare_initial_insert(KeyReqStru
req_struct->attr_descr= tab_descr;
Uint16* order= (Uint16*)&tableDescriptor[order_desc];
- const Uint32 cnt1= regTabPtr->m_attributes[MM].m_no_of_varsize;
- const Uint32 cnt2= regTabPtr->m_attributes[DD].m_no_of_varsize;
+ const Uint32 mm_vars= regTabPtr->m_attributes[MM].m_no_of_varsize;
+ const Uint32 mm_dyns= regTabPtr->m_attributes[MM].m_no_of_dynamic;
+ const Uint32 mm_dynvar= regTabPtr->m_attributes[MM].m_no_of_dyn_var;
+ const Uint32 mm_dynfix= regTabPtr->m_attributes[MM].m_no_of_dyn_fix;
+ const Uint32 dd_vars= regTabPtr->m_attributes[DD].m_no_of_varsize;
Uint32 *ptr= req_struct->m_tuple_ptr->get_var_part_ptr(regTabPtr);
- if(cnt1)
+ if(mm_vars || mm_dyns)
{
+ /* Prepare empty varsize part. */
KeyReqStruct::Var_data* dst= &req_struct->m_var_data[MM];
- dst->m_data_ptr= (char*)(((Uint16*)ptr)+cnt1+1);
+ /* Reserve room for length word. */
+ ptr++;
+ dst->m_data_ptr= (char*)(((Uint16*)ptr)+mm_vars+1);
dst->m_offset_array_ptr= req_struct->var_pos_array;
- dst->m_var_len_offset= cnt1;
+ dst->m_var_len_offset= mm_vars;
dst->m_max_var_offset= regTabPtr->m_offsets[MM].m_max_var_offset;
- // Disk part is 32-bit aligned
+ // Disk/dynamic part is 32-bit aligned
ptr= ALIGN_WORD(dst->m_data_ptr+regTabPtr->m_offsets[MM].m_max_var_offset);
order += regTabPtr->m_attributes[MM].m_no_of_fixsize;
Uint32 pos= 0;
Uint16 *pos_ptr = req_struct->var_pos_array;
- Uint16 *len_ptr = pos_ptr + cnt1;
- for(Uint32 i= 0; i<cnt1; i++)
+ Uint16 *len_ptr = pos_ptr + mm_vars;
+ for(Uint32 i= 0; i<mm_vars; i++)
{
* pos_ptr++ = pos;
* len_ptr++ = pos;
pos += AttributeDescriptor::getSizeInBytes(tab_descr[*order++].tabDescr);
}
+ ndbassert(ptr==ALIGN_WORD(dst->m_data_ptr+pos));
+
+ /* Prepare empty dynamic part. */
+ dst->m_dyn_data_ptr= (char *)ptr;
+ ndbassert((UintPtr(dst->m_dyn_data_ptr)&3) == 0);
+
+ /* Zero out the bitmap. */
+ Uint32 bm_size_in_bytes= 4*(regTabPtr->m_offsets[MM].m_dyn_null_words);
+ bzero(ptr, bm_size_in_bytes);
+
+ /* Set up the offsets for the attribute data. */
+ dst->m_dyn_offset_arr_ptr= len_ptr;
+ dst->m_dyn_len_offset= mm_dyns;
+ dst->m_max_dyn_offset= regTabPtr->m_offsets[MM].m_max_dyn_offset;
+ pos_ptr= len_ptr;
+ len_ptr= pos_ptr+mm_dyns;
+ /* Reserve room for bitmap + shrunken offset array + padding. */
+ pos= bm_size_in_bytes + 4*((mm_dynvar+2)>>1);
+ for(Uint32 i= 0; i<mm_dynvar; i++)
+ {
+ * pos_ptr++ = pos;
+ * len_ptr++ = pos;
+ Uint32 attrDes= tab_descr[order[i+mm_dynfix]].tabDescr;
+ pos += AttributeDescriptor::getSizeInBytes(attrDes);
+ }
+ /* Fixed part is stored 32-bit aligned, from the end of the row back. */
+ pos= (pos+3)&~3;
+ for(Uint32 i= mm_dynfix; i>0; )
+ {
+ i--;
+ pos_ptr[i]= pos;
+ Uint32 attrDes= tab_descr[order[i]].tabDescr;
+ pos+= (AttributeDescriptor::getSizeInBytes(attrDes)+3)&~3;
+ /* len offset array is not used for fixed-size attributes. */
+ }
+ ndbassert((pos&3)==0);
+ ptr+= (pos>>2);
}
else
{
@@ -1134,13 +1178,13 @@ Dbtup::prepare_initial_insert(KeyReqStru
req_struct->m_disk_ptr= (Tuple_header*)ptr;
- if(cnt2)
+ if(dd_vars)
{
KeyReqStruct::Var_data *dst= &req_struct->m_var_data[DD];
ptr=((Tuple_header*)ptr)->m_data+regTabPtr->m_offsets[DD].m_varpart_offset;
- dst->m_data_ptr= (char*)(((Uint16*)ptr)+cnt2+1);
- dst->m_offset_array_ptr= req_struct->var_pos_array + (cnt1 << 1);
- dst->m_var_len_offset= cnt2;
+ dst->m_data_ptr= (char*)(((Uint16*)ptr)+dd_vars+1);
+ dst->m_offset_array_ptr= req_struct->var_pos_array + (mm_vars << 1);
+ dst->m_var_len_offset= dd_vars;
dst->m_max_var_offset= regTabPtr->m_offsets[DD].m_max_var_offset;
}
@@ -1170,7 +1214,9 @@ int Dbtup::handleInsertReq(Signal* signa
bool disk = regTabPtr->m_no_of_disk_attributes > 0;
bool mem_insert = regOperPtr.p->is_first_operation();
bool disk_insert = mem_insert && disk;
- bool varsize = regTabPtr->m_attributes[MM].m_no_of_varsize;
+ bool vardynsize =
+ regTabPtr->m_attributes[MM].m_no_of_varsize +
+ regTabPtr->m_attributes[MM].m_no_of_dynamic;
bool rowid = req_struct->m_use_rowid;
Uint32 real_page_id = regOperPtr.p->m_tuple_location.m_page_no;
Uint32 frag_page_id = req_struct->frag_page_id;
@@ -1273,7 +1319,7 @@ int Dbtup::handleInsertReq(Signal* signa
goto mem_error;
}
- if (!varsize)
+ if (!vardynsize)
{
jam();
ptr= alloc_fix_rec(regFragPtr,
@@ -1305,7 +1351,7 @@ int Dbtup::handleInsertReq(Signal* signa
goto alloc_rowid_error;
}
- if (!varsize)
+ if (!vardynsize)
{
jam();
ptr= alloc_fix_rowid(regFragPtr,
@@ -1337,7 +1383,7 @@ int Dbtup::handleInsertReq(Signal* signa
base = (Tuple_header*)ptr;
base->m_operation_ptr_i= regOperPtr.i;
base->m_header_bits= Tuple_header::ALLOC |
- (varsize ? Tuple_header::CHAINED_ROW : 0);
+ (vardynsize ? Tuple_header::CHAINED_ROW : 0);
regOperPtr.p->m_tuple_location.m_page_no = real_page_id;
}
else
@@ -2467,6 +2513,117 @@ expand_var_part(Dbtup::KeyReqStruct::Var
return ALIGN_WORD(dst_ptr);
}
+/*
+ expand_dyn_part - copy dynamic attributes to fully expanded size.
+
+ Both variable-sized and fixed-size attributes are stored in the same way
+ in the expanded form as variable-sized attributes (in expand_var_part()).
+
+ dst Destination for expanded data
+ tabPtrP Table descriptor
+ src Pointer to the start of dynamic bitmap in source row
+ row_len Total number of 32-bit words in dynamic part of row
+ tabDesc Array of table descriptors
+ order Array of indexes into tabDesc, dynfix followed by dynvar
+*/
+Uint32*
+Dbtup::expand_dyn_part(KeyReqStruct::Var_data *dst,
+ const Tablerec* tabPtrP,
+ const Uint32* src,
+ Uint32 row_len,
+ const Uint32 * tabDesc,
+ const Uint16* order)
+{
+ Uint16 bm_len, max_bmlen;
+ Uint32 mm_dynvar= tabPtrP->m_attributes[MM].m_no_of_dyn_var;
+ Uint32 mm_dynfix= tabPtrP->m_attributes[MM].m_no_of_dyn_fix;
+
+ /* Copy the bitmap, zeroing out any words not stored in the row. */
+ char *dst_bm_ptr= dst->m_dyn_data_ptr;
+ if(row_len == 0)
+ bm_len= 0;
+ else
+ bm_len= 2 * (*((unsigned char *)src));
+ max_bmlen= tabPtrP->m_offsets[MM].m_dyn_null_words << 2;
+
+ ndbrequire(bm_len <= max_bmlen);
+
+ if(bm_len > 0)
+ memcpy(dst_bm_ptr, src, bm_len);
+ if(bm_len < max_bmlen)
+ bzero(dst_bm_ptr + bm_len, max_bmlen - bm_len);
+
+ char *src_off_start= ((char *)src) + bm_len;
+ ndbassert((UintPtr(src_off_start)&1) == 0);
+ Uint16 *src_off_ptr= (Uint16*)src_off_start;
+
+ /*
+ Prepare the variable-sized dynamic attributes, copying out data from the
+ source row for any that are not NULL.
+ */
+ Uint32 no_attr= dst->m_dyn_len_offset;
+ Uint16* dst_off_ptr= dst->m_dyn_offset_arr_ptr;
+ Uint16* dst_len_ptr= dst_off_ptr + no_attr;
+ Uint16 this_src_off= *src_off_ptr++;
+ /* We need to reserve room for the offsets written by shrink_tuple+padding. */
+ Uint16 dst_off= max_bmlen + 4*((mm_dynvar+2)>>1);
+ char *dst_ptr= dst_bm_ptr+dst_off;
+ for(Uint32 i= 0; i<mm_dynvar; i++)
+ {
+ Uint16 j= order[mm_dynfix+i];
+ Uint32 max_len= 4 *AttributeDescriptor::getSizeInWords(tabDesc[j]);
+ Uint32 len;
+ if(dynNotNullFlagCheckLen(src, bm_len, tabDesc[j+1]))
+ {
+ Uint16 next_src_off= *src_off_ptr++;
+ len= next_src_off - this_src_off;
+ memcpy(dst_ptr, src_off_start+this_src_off, len);
+ this_src_off= next_src_off;
+ }
+ else
+ {
+ len= 0;
+ }
+ dst_off_ptr[i]= dst_off;
+ dst_len_ptr[i]= dst_off+len;
+ dst_off+= max_len;
+ dst_ptr+= max_len;
+ }
+ /*
+ The fixed-size data is stored 32-bit aligned after the variable-sized
+ data.
+ */
+ char *src_ptr= src_off_start+this_src_off;
+ src_ptr= (char *)(ALIGN_WORD(src_ptr));
+
+ /*
+ Prepare the fixed-size dynamic attributes, copying out data from the
+ source row for any that are not NULL.
+ Note that the fixed-size data is stored in reverse from the end of the
+ dynamic part of the row. This is true both for the stored/shrunken and
+ for the expanded form.
+ */
+ for(Uint32 i= mm_dynfix; i>0; )
+ {
+ i--;
+ Uint16 j= order[i];
+ Uint32 fix_size= 4*AttributeDescriptor::getSizeInWords(tabDesc[j]);
+ dst_off_ptr[mm_dynvar+i]= dst_off;
+ /* len offset array is not used for fixed size. */
+ if(dynNotNullFlagCheckLen(src, bm_len, tabDesc[j+1]))
+ {
+ ndbassert((UintPtr(dst_ptr)&3) == 0);
+ memcpy(dst_ptr, src_ptr, fix_size);
+ src_ptr+= fix_size;
+ }
+ dst_off+= fix_size;
+ dst_ptr+= fix_size;
+ }
+
+ return (Uint32 *)dst_ptr;
+}
+
+
void
Dbtup::expand_tuple(KeyReqStruct* req_struct,
Uint32 sizes[2],
@@ -2479,6 +2636,7 @@ Dbtup::expand_tuple(KeyReqStruct* req_st
Uint16 dd_tot= tabPtrP->m_no_of_disk_attributes;
Uint16 mm_vars= tabPtrP->m_attributes[MM].m_no_of_varsize;
+ Uint16 mm_dyns= tabPtrP->m_attributes[MM].m_no_of_dynamic;
Uint32 fix_size= tabPtrP->m_offsets[MM].m_varpart_offset;
Uint32 order_desc= tabPtrP->m_real_order_descriptor;
@@ -2489,36 +2647,67 @@ Dbtup::expand_tuple(KeyReqStruct* req_st
const Uint16 *order = (Uint16*)(&tableDescriptor[order_desc]);
order += tabPtrP->m_attributes[MM].m_no_of_fixsize;
- if(mm_vars)
+ if(mm_vars || mm_dyns)
{
-
+ Uint32 src_len, orig_src_len;
Uint32 step; // in bytes
- const Uint32 *src_data= src_ptr;
+ const Uint32 *src_data;
KeyReqStruct::Var_data* dst= &req_struct->m_var_data[MM];
if(bits & Tuple_header::CHAINED_ROW)
{
+ /* This is for the initial expansion of a stored row. */
Ptr<Page> var_page;
- src_data= get_ptr(&var_page, * (Var_part_ref*)src_ptr);
+ Var_part_ref *var_ref= (Var_part_ref*)src_ptr;
+ src_data= get_ptr(&var_page, *var_ref);
+ src_len= get_len(&var_page, *var_ref);
+ sizes[MM]= src_len;
+ /* If the original tuple was grown, the old size is stored at the end. */
+ if(bits & Tuple_header::MM_GROWN)
+ {
+ ndbassert(src_len>0);
+ src_len= src_data[src_len-1];
+ }
step= 4;
- sizes[MM]= (2 + (mm_vars << 1) + ((Uint16*)src_data)[mm_vars] + 3) >> 2;
req_struct->m_varpart_page_ptr = var_page;
}
else
{
- step= (2 + (mm_vars << 1) + ((Uint16*)src_ptr)[mm_vars]);
- sizes[MM]= (step + 3) >> 2;
+ /* This is for the re-expansion of a shrunken row (update2 ...) */
+
+ /* Read the total length of varpart from initial extra length word. */
+ Uint16 *len_ptr= (Uint16 *)src_ptr;
+ src_len= len_ptr[0];
+ /* Reserve space for shrink_tuple to write the new size word. */
+ src_data= src_ptr+1;
+ step= 4*(1+src_len);
req_struct->m_varpart_page_ptr = req_struct->m_page_ptr;
+ sizes[MM]= src_len;
}
- dst->m_data_ptr= (char*)(((Uint16*)dst_ptr)+mm_vars+1);
+ /*
+ Reserve place for initial length word and offset array (with one extra
+ offset). This will be filled-in in later, in shrink_tuple().
+ */
+ dst->m_data_ptr= (char*)(((Uint16*)dst_ptr)+2+mm_vars+1);
dst->m_offset_array_ptr= req_struct->var_pos_array;
dst->m_var_len_offset= mm_vars;
dst->m_max_var_offset= tabPtrP->m_offsets[MM].m_max_var_offset;
-
+
dst_ptr= expand_var_part(dst, src_data, desc, order);
+ /* Find the start of the dynamic data. */
+ Uint32* dyn_src_data= ALIGN_WORD((char *)src_data + 2*(mm_vars+1) +
+ ((Uint16 *)src_data)[mm_vars]);
ndbassert(dst_ptr == ALIGN_WORD(dst->m_data_ptr + dst->m_max_var_offset));
ndbassert((UintPtr(src_ptr) & 3) == 0);
src_ptr = ALIGN_WORD(((char*)src_ptr)+step);
-
+
+ dst->m_dyn_offset_arr_ptr= req_struct->var_pos_array+2*mm_vars;
+ dst->m_dyn_len_offset= mm_dyns;
+ dst->m_max_dyn_offset= tabPtrP->m_offsets[MM].m_max_dyn_offset;
+ dst->m_dyn_data_ptr= (char*)dst_ptr;
+ dst_ptr= expand_dyn_part(dst, tabPtrP, dyn_src_data,
+ src_len-(dyn_src_data-src_data),
+ desc, order + mm_vars);
+
sizes[MM] += fix_size + Tuple_header::HeaderSize;
memcpy(ptr, src, 4*(fix_size + Tuple_header::HeaderSize));
}
@@ -2537,7 +2726,7 @@ Dbtup::expand_tuple(KeyReqStruct* req_st
if(disk && dd_tot)
{
const Uint16 dd_vars= tabPtrP->m_attributes[DD].m_no_of_varsize;
- order += mm_vars;
+ order+= mm_vars+mm_dyns;
if(bits & Tuple_header::DISK_INLINE)
{
@@ -2577,6 +2766,120 @@ Dbtup::expand_tuple(KeyReqStruct* req_st
}
ptr->m_header_bits= (bits & ~(Uint32)(Tuple_header::CHAINED_ROW));
+ req_struct->is_expanded= true;
+}
+
+static void
+dump_hex(const Uint32 *p, Uint32 len)
+{
+ if(len > 256)
+ len= 16;
+ if(len==0)
+ return;
+ for(;;)
+ {
+ if(len>=4)
+ ndbout_c("%8p %08X %08X %08X %08X", p, p[0], p[1], p[2], p[3]);
+ else if(len>=3)
+ ndbout_c("%8p %08X %08X %08X", p, p[0], p[1], p[2]);
+ else if(len>=2)
+ ndbout_c("%8p %08X %08X", p, p[0], p[1]);
+ else
+ ndbout_c("%8p %08X", p, p[0]);
+ if(len <= 4)
+ break;
+ len-= 4;
+ p+= 4;
+ }
+}
+
+void
+Dbtup::dump_tuple(const KeyReqStruct* req_struct, const Tablerec* tabPtrP)
+{
+ Uint16 mm_vars= tabPtrP->m_attributes[MM].m_no_of_varsize;
+ Uint16 mm_dyns= tabPtrP->m_attributes[MM].m_no_of_dynamic;
+ Uint16 dd_tot= tabPtrP->m_no_of_disk_attributes;
+ const Tuple_header* ptr= req_struct->m_tuple_ptr;
+ Uint32 bits= ptr->m_header_bits;
+ const Uint32 *tuple_words= (Uint32 *)ptr;
+ const Uint32 *fix_p;
+ Uint32 fix_len;
+ const Uint32 *var_p;
+ Uint32 var_len;
+ const Uint32 *disk_p;
+ Uint32 disk_len;
+ const char *typ;
+
+ fix_p= tuple_words;
+ fix_len= Tuple_header::HeaderSize+tabPtrP->m_offsets[MM].m_fix_header_size;
+ if(req_struct->is_expanded)
+ {
+ typ= "expanded";
+ var_p= ptr->get_var_part_ptr(tabPtrP);
+ var_len= 0; // No dump of varpart in expanded
+#if 0
+ disk_p= (Uint32 *)req_struct->m_disk_ptr;
+ disk_len= (dd_tot ? tabPtrP->m_offsets[DD].m_fix_header_size : 0);
+#endif
+ }
+ else if(bits & Tuple_header::CHAINED_ROW)
+ {
+ typ= "stored";
+ if(mm_vars+mm_dyns)
+ {
+ const KeyReqStruct::Var_data* dst= &req_struct->m_var_data[MM];
+ const Uint32 *src_ptr= ptr->get_var_part_ptr(tabPtrP);
+ Ptr<Page> tmp;
+ var_p= get_ptr(&tmp, * (Var_part_ref*)src_ptr);
+ var_len= get_len(&tmp, * (Var_part_ref*)src_ptr);
+ }
+ else
+ {
+ var_p= 0;
+ var_len= 0;
+ }
+#if 0
+ if(dd_tot)
+ {
+ Local_key key;
+ memcpy(&key, ptr->get_disk_ref_ptr(tabPtrP), sizeof(key));
+ key.m_page_no= req_struct->m_disk_page_ptr.i;
+ disk_p= get_dd_ptr(&req_struct->m_disk_page_ptr, &key, tabPtrP);
+ disk_len= tabPtrP->m_offsets[DD].m_fix_header_size;
+ }
+ else
+ {
+ disk_p= var_p;
+ disk_len= 0;
+ }
+#endif
+ }
+ else
+ {
+ typ= "shrunken";
+ if(mm_vars+mm_dyns)
+ {
+ var_p= ptr->get_var_part_ptr(tabPtrP);
+ var_len= *((Uint16 *)var_p) + 1;
+ }
+ else
+ {
+ var_p= 0;
+ var_len= 0;
+ }
+#if 0
+ disk_p= (Uint32 *)(req_struct->m_disk_ptr);
+ disk_len= (dd_tot ? tabPtrP->m_offsets[DD].m_fix_header_size : 0);
+#endif
+ }
+ ndbout_c("Fixed part[%s](%p len=%u words)",typ, fix_p, fix_len);
+ dump_hex(fix_p, fix_len);
+ ndbout_c("Varpart part[%s](%p len=%u words)", typ , var_p, var_len);
+ dump_hex(var_p, var_len);
+#if 0
+ ndbout_c("Disk part[%s](%p len=%u words)", typ, disk_p, disk_len);
+ dump_hex(disk_p, disk_len);
+#endif
}
void
@@ -2588,28 +2891,50 @@ Dbtup::prepare_read(KeyReqStruct* req_st
Uint32 bits= ptr->m_header_bits;
Uint16 dd_tot= tabPtrP->m_no_of_disk_attributes;
Uint16 mm_vars= tabPtrP->m_attributes[MM].m_no_of_varsize;
+ Uint16 mm_dyns= tabPtrP->m_attributes[MM].m_no_of_dynamic;
const Uint32 *src_ptr= ptr->get_var_part_ptr(tabPtrP);
const Uint32 *disk_ref= ptr->get_disk_ref_ptr(tabPtrP);
- if(mm_vars)
+ if(mm_vars || mm_dyns)
{
const Uint32 *src_data= src_ptr;
+ Uint32 src_len;
KeyReqStruct::Var_data* dst= &req_struct->m_var_data[MM];
if(bits & Tuple_header::CHAINED_ROW)
{
-#if VM_TRACE
-
-#endif
- src_data= get_ptr(* (Var_part_ref*)src_ptr);
+ Ptr<Page> tmp;
+ src_data= get_ptr(&tmp, * (Var_part_ref*)src_ptr);
+ src_len= get_len(&tmp, * (Var_part_ref*)src_ptr);
+ /* If the original tuple was grown, the old size is stored at the end. */
+ if(bits & Tuple_header::MM_GROWN)
+ {
+ ndbassert(src_len>0);
+ src_len= src_data[src_len-1];
+ }
+ }
+ else
+ {
+ /* Total length of varpart stored in first word. */
+ src_len= *((Uint16 *)src_ptr);
+ src_ptr++;
+ src_data= src_ptr;
}
+
dst->m_data_ptr= (char*)(((Uint16*)src_data)+mm_vars+1);
dst->m_offset_array_ptr= (Uint16*)src_data;
dst->m_var_len_offset= 1;
dst->m_max_var_offset= ((Uint16*)src_data)[mm_vars];
+ dst->m_dyn_data_ptr= (char *)
+ (ALIGN_WORD(dst->m_data_ptr + dst->m_offset_array_ptr[mm_vars]));
+ dst->m_dyn_part_len= src_len - ((Uint32 *)(dst->m_dyn_data_ptr)-src_data);
+ /*
+ dst->m_dyn_offset_arr_ptr and dst->m_dyn_len_offset are not used for
+ reading the stored/shrunken format.
+ */
- // disk part start after varsize (aligned)
- src_ptr = ALIGN_WORD(dst->m_data_ptr + dst->m_max_var_offset);
+ // disk part start after dynamic part.
+ src_ptr+= src_len;
}
else
{
@@ -2648,6 +2973,8 @@ Dbtup::prepare_read(KeyReqStruct* req_st
dst->m_max_var_offset= ((Uint16*)src_ptr)[dd_vars];
}
}
+
+ req_struct->is_expanded= false;
}
void
@@ -2656,17 +2983,30 @@ Dbtup::shrink_tuple(KeyReqStruct* req_st
{
ndbassert(tabPtrP->need_shrink());
Tuple_header* ptr= req_struct->m_tuple_ptr;
+ ndbassert(!(ptr->m_header_bits & Tuple_header::CHAINED_ROW));
+ KeyReqStruct::Var_data* dst= &req_struct->m_var_data[MM];
+ Uint32 order_desc= tabPtrP->m_real_order_descriptor;
+ const Uint32 * tabDesc= (Uint32*)req_struct->attr_descr;
+ const Uint16 *order = (Uint16*)(&tableDescriptor[order_desc]);
Uint16 dd_tot= tabPtrP->m_no_of_disk_attributes;
+ Uint16 mm_fix= tabPtrP->m_attributes[MM].m_no_of_fixsize;
Uint16 mm_vars= tabPtrP->m_attributes[MM].m_no_of_varsize;
+ Uint16 mm_dyns= tabPtrP->m_attributes[MM].m_no_of_dynamic;
+ Uint16 mm_dynvar= tabPtrP->m_attributes[MM].m_no_of_dyn_var;
+ Uint16 mm_dynfix= tabPtrP->m_attributes[MM].m_no_of_dyn_fix;
Uint16 dd_vars= tabPtrP->m_attributes[DD].m_no_of_varsize;
Uint32 *dst_ptr= ptr->get_var_part_ptr(tabPtrP);
Uint16* src_off_ptr= req_struct->var_pos_array;
sizes[MM]= sizes[DD]= 0;
- if(mm_vars)
+ if(mm_vars || mm_dyns)
{
+ /* Skip the initial length word, we will set it at the end. */
+ Uint16 *length_word= (Uint16 *)dst_ptr;
+ dst_ptr++;
+
Uint16* dst_off_ptr= (Uint16*)dst_ptr;
char* dst_data_ptr= (char*)(dst_off_ptr + mm_vars + 1);
char* src_data_ptr= dst_data_ptr;
@@ -2682,11 +3022,129 @@ Dbtup::shrink_tuple(KeyReqStruct* req_st
dst_data_ptr += len;
}
*dst_off_ptr= off;
- ndbassert(dst_data_ptr <= ((char*)ptr) + 8192);
- ndbassert((UintPtr(ptr) & 3) == 0);
- sizes[MM]= (dst_data_ptr + 3 - ((char*)ptr)) >> 2;
- dst_ptr = ALIGN_WORD(dst_data_ptr);
+ /*
+ Now build the dynamic part, if any.
+ First look for any trailing all-NULL words of the bitmap; we do
+ not need to store those.
+ */
+
+ ndbassert((UintPtr(dst->m_dyn_data_ptr)&3) == 0);
+ char *dyn_src_ptr= dst->m_dyn_data_ptr;
+ Uint16 bm_len= tabPtrP->m_offsets[MM].m_dyn_null_words*2; // In 16-bit words
+ /* If no dynamic variables, store nothing. */
+ if(bm_len != 0)
+ {
+ Uint16 *bm_ptr= (Uint16 *)dyn_src_ptr + bm_len - 1;
+ while(*bm_ptr == 0)
+ {
+ bm_ptr--;
+ bm_len--;
+ if(bm_len == 0)
+ break;
+ }
+ }
+
+ /*
+ Copy the bitmap, counting the number of variable sized
+ attributes that are not NULL on the way.
+ */
+ Uint16 *dyn_dst_ptr= (Uint16 *)(ALIGN_WORD(dst_data_ptr));
+ Uint32 dyn_var_count= 0;
+ const Uint32 *src_bm_ptr= (Uint32 *)(dyn_src_ptr);
+ Uint32 *dst_bm_ptr= (Uint32 *)dyn_dst_ptr;
+ /* ToDo: Put all of the dynattr code inside if(bm_len>0) { ... }, split to separate function. */
+ Uint16 dyn_dst_data_offset= 0;
+ if (bm_len > 0)
+ {
+ const Uint32 *dyn_bm_var_mask_ptr= tabPtrP->dynVarSizeMask;
+ for(Uint16 i= 0; i<((bm_len+1)>>1); i++)
+ {
+ Uint32 v= src_bm_ptr[i];
+ dyn_var_count+= count_bits(v & *dyn_bm_var_mask_ptr++);
+ dst_bm_ptr[i]= v;
+ }
+ *((unsigned char *)dyn_dst_ptr)= bm_len; // Set length
+ dyn_dst_ptr+= bm_len;
+ dyn_dst_data_offset= 2*dyn_var_count + 2;
+ }
+ Uint16 *dyn_src_off_array= dst->m_dyn_offset_arr_ptr;
+ Uint16 *dyn_src_lenoff_array=
+ dyn_src_off_array + dst->m_dyn_len_offset;
+ /*
+ Copy over the variable sized not-NULL attributes.
+ Data offsets are counted from the start of the offset array, and
+ we store one additional offset to be able to easily compute the
+ data length as the difference between offsets.
+ */
+ Uint32 bm_len_in_bytes= bm_len<<1;
+ Uint16 off_idx= 0;
+ order+= mm_fix+mm_vars; // Point to first dynfix entry
+ for(Uint32 i= 0; i<mm_dynvar; i++)
+ {
+ /*
+ Note that we must use the destination (shrunken) bitmap here, as the
+ source (expanded) bitmap may have been already clobbered (by offset
+ data).
+ */
+ if(dynNotNullFlagCheckLen(dst_bm_ptr, bm_len_in_bytes,
+ tabDesc[order[mm_dynfix+i]+1]))
+ {
+ dyn_dst_ptr[off_idx++]= dyn_dst_data_offset;
+ Uint32 dyn_src_off= dyn_src_off_array[i];
+ Uint32 dyn_len= dyn_src_lenoff_array[i] - dyn_src_off;
+ memmove(((char *)dyn_dst_ptr) + dyn_dst_data_offset,
+ dyn_src_ptr + dyn_src_off,
+ dyn_len);
+ dyn_dst_data_offset+= dyn_len;
+ }
+ }
+ /* If all dynamic attributes are NULL, we store nothing. */
+ if(bm_len != 0)
+ {
+ dyn_dst_ptr[off_idx]= dyn_dst_data_offset;
+ ndbassert(&dyn_dst_ptr[off_idx] == dyn_dst_ptr+dyn_var_count);
+ }
+
+ char *dynvar_end_ptr= ((char *)dyn_dst_ptr) + dyn_dst_data_offset;
+ char *dyn_dst_data_ptr= (char *)(ALIGN_WORD(dynvar_end_ptr));
+ /*
+ Zero out any padding bytes. Might not be strictly necessary, but seems
+ cleaner than leaving random stuff in there.
+ */
+ bzero(dynvar_end_ptr, dyn_dst_data_ptr-dynvar_end_ptr);
+
+ /*
+ Copy over the fixed-sized not-NULL attributes.
+ Note that attributes are copied in reverse order; this is to avoid
+ overwriting not-yet-copied data, as the data is also stored in reverse
+ order.
+ */
+ for(Uint32 i= mm_dynfix; i > 0; )
+ {
+ i--;
+ Uint16 j= order[i];
+ if(dynNotNullFlagCheckLen(dst_bm_ptr, bm_len_in_bytes, tabDesc[j+1]))
+ {
+ Uint32 fixsize=
+ 4*AttributeDescriptor::getSizeInWords(tabDesc[j]);
+ memmove(dyn_dst_data_ptr,
+ dyn_src_ptr + dyn_src_off_array[mm_dynvar+i],
+ fixsize);
+ dyn_dst_data_ptr += fixsize;
+ }
+ }
+
+ ndbassert(dyn_dst_data_ptr <= ((char*)ptr) + 8192);
+ ndbassert((UintPtr(ptr) & 3) == 0);
+ ndbassert((UintPtr(dyn_dst_data_ptr) & 3) == 0);
+ sizes[MM]= (dyn_dst_data_ptr - ((char*)ptr) - 4) >> 2;
+ dst->m_dyn_part_len=
+ (dyn_dst_data_ptr - dst->m_dyn_data_ptr) >> 2;
+ Uint32 varpart_len= ((Uint32 *)dyn_dst_data_ptr)-dst_ptr;
+ ndbassert(varpart_len < 0x10000);
+ length_word[0]= varpart_len;
+ dst_ptr= (Uint32 *)dyn_dst_data_ptr;
}
else
{
@@ -2708,11 +3166,15 @@ Dbtup::shrink_tuple(KeyReqStruct* req_st
memmove(dst_ptr, src_ptr, 4*tabPtrP->m_offsets[DD].m_fix_header_size);
}
}
+
+ req_struct->is_expanded= false;
+
}
void
Dbtup::validate_page(Tablerec* regTabPtr, Var_page* p)
{
+ /* ToDo: We could also do some checks here for any dynamic part. */
Uint32 mm_vars= regTabPtr->m_attributes[MM].m_no_of_varsize;
Uint32 fix_sz= regTabPtr->m_offsets[MM].m_fix_header_size +
Tuple_header::HeaderSize;
@@ -2812,7 +3274,7 @@ Dbtup::handle_size_change_after_update(K
else if(sizes[MM] > sizes[2+MM])
{
if(0) ndbout_c("shrink");
- copy_bits |= Tuple_header::MM_SHRINK;
+ req_struct->m_tuple_ptr->m_header_bits= copy_bits|Tuple_header::MM_SHRINK;
}
else
{
@@ -2820,11 +3282,11 @@ Dbtup::handle_size_change_after_update(K
Ptr<Page> pagePtr = req_struct->m_varpart_page_ptr;
Var_page* pageP= (Var_page*)pagePtr.p;
Uint32 idx, alloc, needed;
- Uint32 *refptr = org->get_var_part_ptr(regTabPtr);
+ Var_part_ref *refptr= (Var_part_ref *)(org->get_var_part_ptr(regTabPtr));
ndbassert(bits & Tuple_header::CHAINED_ROW);
Local_key ref;
- ref.assref(*refptr);
+ ref.assref(refptr->m_ref);
idx= ref.m_page_idx;
if (! (copy_bits & Tuple_header::CHAINED_ROW))
{
@@ -2832,25 +3294,37 @@ Dbtup::handle_size_change_after_update(K
pageP = (Var_page*)pagePtr.p;
}
alloc= pageP->get_entry_len(idx);
+ Uint32 orig_size= alloc;
+ if(bits&Tuple_header::MM_GROWN)
+ {
+ /* Was grown before, so must fetch real original size from last word. */
+ Uint32 *old_var_part= pageP->get_ptr(idx);
+ ndbassert(alloc>0);
+ orig_size= old_var_part[alloc-1];
+ }
+
#ifdef VM_TRACE
if(!pageP->get_entry_chain(idx))
ndbout << *pageP << endl;
#endif
ndbassert(pageP->get_entry_chain(idx));
needed= sizes[2+MM] - fix_sz;
-
+
if(needed <= alloc)
{
//ndbassert(!regOperPtr->is_first_operation());
ndbout_c(" no grow");
return 0;
}
- copy_bits |= Tuple_header::MM_GROWN;
- if (unlikely(realloc_var_part(regFragPtr, regTabPtr, pagePtr,
- (Var_part_ref*)refptr, alloc, needed)))
+ Uint32 *new_var_part=realloc_var_part(regFragPtr, regTabPtr, pagePtr,
+ refptr, alloc, needed);
+ if (unlikely(new_var_part==NULL))
return -1;
+ /* Mark the tuple grown, store the original length at the end. */
+ org->m_header_bits= bits|Tuple_header::MM_GROWN;
+ ndbassert(needed>1);
+ new_var_part[needed-1]= orig_size;
}
- req_struct->m_tuple_ptr->m_header_bits = copy_bits;
return 0;
}
@@ -2870,7 +3344,8 @@ Dbtup::nr_update_gci(Uint32 fragPtrI, co
PagePtr page_ptr;
int ret;
- if (tablePtr.p->m_attributes[MM].m_no_of_varsize)
+ if (tablePtr.p->m_attributes[MM].m_no_of_varsize +
+ tablePtr.p->m_attributes[MM].m_no_of_dynamic)
{
tablePtr.p->m_offsets[MM].m_fix_header_size +=
Tuple_header::HeaderSize+1;
@@ -2912,7 +3387,8 @@ Dbtup::nr_read_pk(Uint32 fragPtrI,
int ret;
PagePtr page_ptr;
- if (tablePtr.p->m_attributes[MM].m_no_of_varsize)
+ if (tablePtr.p->m_attributes[MM].m_no_of_varsize +
+ tablePtr.p->m_attributes[MM].m_no_of_dynamic)
{
tablePtr.p->m_offsets[MM].m_fix_header_size += Tuple_header::HeaderSize+1;
ret = alloc_page(tablePtr.p, fragPtr.p, &page_ptr, tmp.m_page_no);
@@ -3042,7 +3518,8 @@ Dbtup::nr_delete(Signal* signal, Uint32
Local_key disk;
memcpy(&disk, ptr->get_disk_ref_ptr(tablePtr.p), sizeof(disk));
- if (tablePtr.p->m_attributes[MM].m_no_of_varsize)
+ if (tablePtr.p->m_attributes[MM].m_no_of_varsize +
+ tablePtr.p->m_attributes[MM].m_no_of_dynamic)
{
jam();
free_var_rec(fragPtr.p, tablePtr.p, &tmp, pagePtr);
--- 1.63/storage/ndb/src/ndbapi/NdbTransaction.cpp 2006-11-07 09:47:37 +01:00
+++ 1.64/storage/ndb/src/ndbapi/NdbTransaction.cpp 2006-11-07 09:47:37 +01:00
@@ -366,7 +366,29 @@ NdbTransaction::execute(ExecType aTypeOf
* operations, making postExecute impossible
*/
if (abortOption == AO_IgnoreError)
+ {
+ if (theCompletedFirstOp != NULL)
+ {
+ if (tCompletedFirstOp != NULL)
+ {
+ tCompletedLastOp->next(theCompletedFirstOp);
+ theCompletedFirstOp = tCompletedFirstOp;
+ }
+ }
+ else
+ {
+ theCompletedFirstOp = tCompletedFirstOp;
+ theCompletedLastOp = tCompletedLastOp;
+ }
+ if (tPrepOp != NULL && tRestOp != NULL) {
+ if (theFirstOpInList == NULL)
+ theFirstOpInList = tRestOp;
+ else
+ theLastOpInList->next(tRestOp);
+ theLastOpInList = tLastOp;
+ }
DBUG_RETURN(-1);
+ }
}
#ifdef ndb_api_crash_on_complex_blob_abort
| Thread |
|---|
| • bk commit into 5.1 tree (knielsen:1.2329) | knielsen | 7 Nov |