Below is the list of changes that have just been committed into a local
5.1-ndb repository of jonas. When jonas does a push these changes will
be propagated to the main repository and, within 24 hours after the
push, to the public repository.
For information on how to access the public repository
see http://dev.mysql.com/doc/mysql/en/installing-source-tree.html
ChangeSet
1.1816 05/04/05 08:57:55 joreland@stripped +9 -0
wl1870 - ndb disk/varsize
fix remaining resize cases...
ndb/src/kernel/blocks/dbtup/tuppage.hpp
1.8 05/04/05 08:57:51 joreland@stripped +7 -6
Fix remaining varsize/resize cases
ndb/src/kernel/blocks/dbtup/tuppage.cpp
1.4 05/04/05 08:57:51 joreland@stripped +14 -8
Fix remaining varsize/resize cases
ndb/src/kernel/blocks/dbtup/test_varpage.cpp
1.2 05/04/05 08:57:51 joreland@stripped +6 -2
Fix remaining varsize/resize cases
ndb/src/kernel/blocks/dbtup/DbtupVarAlloc.cpp
1.32 05/04/05 08:57:51 joreland@stripped +10 -6
Fix remaining varsize/resize cases
ndb/src/kernel/blocks/dbtup/DbtupExecQuery.cpp
1.58 05/04/05 08:57:51 joreland@stripped +157 -32
Fix remaining varsize/resize cases
ndb/src/kernel/blocks/dbtup/DbtupDiskAlloc.cpp
1.16 05/04/05 08:57:51 joreland@stripped +1 -1
Fix remaining varsize/resize cases
ndb/src/kernel/blocks/dbtup/DbtupCommit.cpp
1.44 05/04/05 08:57:51 joreland@stripped +80 -12
Fix remaining varsize/resize cases
ndb/src/kernel/blocks/dbtup/DbtupAbort.cpp
1.30 05/04/05 08:57:51 joreland@stripped +2 -1
Fix remaining varsize/resize cases
ndb/src/kernel/blocks/dbtup/Dbtup.hpp
1.53 05/04/05 08:57:51 joreland@stripped +5 -3
Fix remaining varsize/resize cases
# This is a BitKeeper patch. What follows are the unified diffs for the
# set of deltas contained in the patch. The rest of the patch, the part
# that BitKeeper cares about, is below these diffs.
# User: joreland
# Host: eel.hemma.oreland.se
# Root: /home/jonas/src/mysql-5.1-ndb-dd
--- 1.15/ndb/src/kernel/blocks/dbtup/DbtupDiskAlloc.cpp Wed Mar 30 15:09:39 2005
+++ 1.16/ndb/src/kernel/blocks/dbtup/DbtupDiskAlloc.cpp Tue Apr 5 08:57:51 2005
@@ -634,7 +634,7 @@
src, sz,
gci, logfile_group_id);
- ((Var_page*)pagePtr.p)->free_record(page_idx);
+ ((Var_page*)pagePtr.p)->free_record(page_idx, 0);
}
disk_page_update_free_space(tabPtrP, pagePtr, pagePtr.p->list_index);
--- 1.1/ndb/src/kernel/blocks/dbtup/test_varpage.cpp Fri Feb 18 15:04:58 2005
+++ 1.2/ndb/src/kernel/blocks/dbtup/test_varpage.cpp Tue Apr 5 08:57:51 2005
@@ -70,7 +70,7 @@
rec.data[i] = rand();
}
ndbout << "Alloc " << rec.size << flush;
- rec.idx= page.alloc_record(rec.size, &tmp);
+ rec.idx= page.alloc_record(rec.size, &tmp, 0);
ndbout << " -> " << rec.idx << endl;
Uint32* ptr= page.get_ptr(rec.idx);
memcpy(ptr, rec.data, 4*rec.size);
@@ -85,7 +85,7 @@
Uint32* ptr= page.get_ptr(rec.idx);
cmp(ptr, rec.data, rec.size);
delete[] rec.data;
- page.free_record(rec.idx);
+ page.free_record(rec.idx, 0);
for (unsigned k = no; k + 1 < allocated; k++)
records[k] = records[k+1];
@@ -97,7 +97,11 @@
ndbout << "Reorg" << endl;
page.reorg(&tmp);
break;
+ case 3:
+ ndbout << "Expand" << endl;
+
}
+
}
ndbout << page << endl;
}
--- 1.3/ndb/src/kernel/blocks/dbtup/tuppage.cpp Fri Mar 4 22:14:21 2005
+++ 1.4/ndb/src/kernel/blocks/dbtup/tuppage.cpp Tue Apr 5 08:57:51 2005
@@ -76,7 +76,8 @@
}
Uint32
-Tup_varsize_page::alloc_record(Uint32 alloc_size, Tup_varsize_page* temp)
+Tup_varsize_page::alloc_record(Uint32 alloc_size,
+ Tup_varsize_page* temp, Uint32 chain)
{
assert(free_space >= alloc_size);
Uint32 largest_size= DATA_WORDS - (insert_pos + high_index);
@@ -106,21 +107,23 @@
next_free_index= get_index_word(page_idx);
}
- * get_index_ptr(page_idx) = insert_pos + (alloc_size << 16);
+ assert(chain == 0 || chain == CHAIN);
+ * get_index_ptr(page_idx) = insert_pos + ((chain + alloc_size) << 16);
insert_pos += alloc_size;
free_space -= alloc_size;
-
return page_idx;
}
Uint32
-Tup_varsize_page::free_record(Uint32 page_idx)
+Tup_varsize_page::free_record(Uint32 page_idx, Uint32 chain)
{
Uint32 *index_ptr= get_index_ptr(page_idx);
Uint32 index_word= * index_ptr;
Uint32 entry_pos= index_word & 0xFFFF;
- Uint32 entry_len= index_word >> 16;
+ Uint32 entry_len= (index_word >> 16) & ~CHAIN;
+ assert(chain == 0 || chain == CHAIN);
+ assert(!(((index_word >> 16) ^ chain) & 0x8000));
if (page_idx + 1 == high_index) {
/*
We are removing the last in the entry list. We could potentially
@@ -194,7 +197,7 @@
for (; index_ptr < end_of_page; index_ptr++)
{
Uint32 index_word= * index_ptr;
- Uint32 entry_len= index_word >> 16;
+ Uint32 entry_len= (index_word >> 16) & ~CHAIN;
if (entry_len != 0) {
/*
We found an index item that needs to be packed.
@@ -203,11 +206,13 @@
Uint32 entry_pos= index_word & 0xffff;
assert(entry_pos + entry_len <= old_insert_pos);
assert(new_insert_pos + entry_len <= old_insert_pos);
- * index_ptr= new_insert_pos + (entry_len << 16);
+ * index_ptr= new_insert_pos + (index_word & 0xFFFF0000);
memcpy(m_data+new_insert_pos, copy_page->m_data+entry_pos, 4*entry_len);
new_insert_pos += entry_len;
}
+ else
+ assert((index_word >> 16) == 0);
}
insert_pos= new_insert_pos;
}
@@ -227,7 +232,8 @@
out << " [ " << i;
if(*index_ptr >> 16)
out << " pos: " << ((*index_ptr) & 0xFFFF)
- << " len: " << (*index_ptr >> 16)
+ << " len: " << ((*index_ptr >> 16) & ~page.CHAIN)
+ << (((* index_ptr >> 16) & page.CHAIN) ? " CHAIN " : " ")
<< "]" << flush;
else
out << " FREE ]" << flush;
--- 1.7/ndb/src/kernel/blocks/dbtup/tuppage.hpp Fri Apr 1 17:01:26 2005
+++ 1.8/ndb/src/kernel/blocks/dbtup/tuppage.hpp Tue Apr 5 08:57:51 2005
@@ -118,6 +118,7 @@
Uint32 unused_ph[10];
STATIC_CONST( DATA_WORDS = File_formats::PAGE_SIZE_WORDS - 32 );
+ STATIC_CONST( CHAIN = 0x8000 );
Uint32 m_data[DATA_WORDS];
@@ -145,12 +146,12 @@
* Alloc record from page, return page_idx
* temp is used when having to reorg page before allocating
*/
- Uint32 alloc_record(Uint32 size, Tup_varsize_page* temp);
+ Uint32 alloc_record(Uint32 size, Tup_varsize_page* temp, Uint32 chain);
/**
* Free record from page
*/
- Uint32 free_record(Uint32 page_idx);
+ Uint32 free_record(Uint32 page_idx, Uint32 chain);
void reorg(Tup_varsize_page* temp);
void rebuild_index(Uint32* ptr);
@@ -161,8 +162,8 @@
bool is_space_behind_entry(Uint32 page_index, Uint32 growth_len) const {
Uint32 idx= get_index_word(page_index);
Uint32 pos= idx & 0xFFFF;
- Uint32 len= idx >> 16;
- if ((insert_pos == pos + len) &&
+ Uint32 len= (idx >> 16) & ~CHAIN;
+ if ((pos + len == insert_pos) &&
(insert_pos + growth_len < DATA_WORDS - high_index))
return true;
return false;
@@ -182,8 +183,8 @@
void shrink_entry(Uint32 page_index, Uint32 new_size){
Uint32 *pos= get_index_ptr(page_index);
Uint32 idx= *pos;
- *pos= (idx & 0xFFFF) + (new_size << 16);
- Uint32 old_size= idx >> 16;
+ *pos= (idx & (CHAIN << 16 | 0xFFFF)) + (new_size << 16);
+ Uint32 old_size= (idx >> 16) & ~CHAIN;
assert(old_size >= new_size);
Uint32 shrink = old_size - new_size;
--- 1.52/ndb/src/kernel/blocks/dbtup/Dbtup.hpp Fri Apr 1 17:01:26 2005
+++ 1.53/ndb/src/kernel/blocks/dbtup/Dbtup.hpp Tue Apr 5 08:57:51 2005
@@ -2234,9 +2234,10 @@
//---------------------------------------------------------------
//
// Public methods
- Uint32* alloc_var_rec(Fragrecord*, Tablerec*, Uint32, Local_key*, Uint32*);
- void free_var_part(Fragrecord*, Tablerec*, Var_part_ref);
- void free_var_part(Fragrecord*, Tablerec*, Local_key*, Var_page*);
+ Uint32* alloc_var_rec(Fragrecord*, Tablerec*, Uint32, Local_key*, Uint32*,
+ Uint32 base);
+ void free_var_part(Fragrecord*, Tablerec*, Var_part_ref, Uint32 chain);
+ void free_var_part(Fragrecord*, Tablerec*, Local_key*, Var_page*, Uint32 chain);
Uint32* alloc_fix_rec(Fragrecord*, Tablerec*, Local_key*, Uint32 *);
void free_fix_rec(Fragrecord*, Tablerec*, Local_key*, Fix_page*);
@@ -2400,6 +2401,7 @@
Operationrec*, Fragrecord*, Tablerec*);
int handle_size_change_after_update(KeyReqStruct* req_struct,
+ Tuple_header* org,
Operationrec*,
Fragrecord* const regFragPtr,
Tablerec* const regTabPtr,
--- 1.29/ndb/src/kernel/blocks/dbtup/DbtupAbort.cpp Fri Apr 1 17:01:26 2005
+++ 1.30/ndb/src/kernel/blocks/dbtup/DbtupAbort.cpp Tue Apr 5 08:57:51 2005
@@ -142,7 +142,7 @@
ndbout_c("abort grow");
Var_page *pageP= (Var_page*)page.p;
Uint32 idx= regOperPtr.p->m_tuple_location.m_page_idx;
- Uint32 len= pageP->get_entry_len(idx);
+ Uint32 len= pageP->get_entry_len(idx) & ~Var_page::CHAIN;
Uint32 fix_size= regTabPtr.p->m_offsets[MM].m_fix_header_size;
Uint32 mm_vars= regTabPtr.p->m_attributes[MM].m_no_of_varsize;
@@ -152,6 +152,7 @@
ndbassert(sz <= len);
pageP->shrink_entry(idx, sz);
+ update_free_page_list(regFragPtr.p, pageP);
}
else if(bits & Tuple_header::MM_SHRINK)
{
--- 1.43/ndb/src/kernel/blocks/dbtup/DbtupCommit.cpp Fri Apr 1 13:11:10 2005
+++ 1.44/ndb/src/kernel/blocks/dbtup/DbtupCommit.cpp Tue Apr 5 08:57:51 2005
@@ -30,12 +30,10 @@
TablerecPtr regTabPtr;
FragrecordPtr regFragPtr;
Uint32 frag_page_id, frag_id;
+#if 0
ljamEntry();
-#if 0
- //ndbout_c("execTUP_DEALLOCREQ");
-
frag_id= signal->theData[0];
regTabPtr.i= signal->theData[1];
frag_page_id= signal->theData[2];
@@ -48,6 +46,8 @@
Local_key tmp;
tmp.m_page_no= getRealpid(regFragPtr.p, frag_page_id);
tmp.m_page_idx= page_index;
+ ndbout_c("execTUP_DEALLOCREQ %d %d", tmp.m_page_no, page_index);
+ return;
PagePtr pagePtr;
Tuple_header* ptr= (Tuple_header*)get_ptr(&pagePtr, &tmp, regTabPtr.p);
@@ -67,9 +67,10 @@
if(ptr->m_header_bits & Tuple_header::CHAINED_ROW)
{
free_var_part(regFragPtr.p, regTabPtr.p,
- *(Var_part_ref*)ptr->get_var_part_ptr(regTabPtr.p));
+ *(Var_part_ref*)ptr->get_var_part_ptr(regTabPtr.p),
+ Var_page::CHAIN);
}
- free_var_part(regFragPtr.p, regTabPtr.p, &tmp, (Var_page*)pagePtr.p);
+ free_var_part(regFragPtr.p, regTabPtr.p, &tmp, (Var_page*)pagePtr.p, 0);
} else {
free_fix_rec(regFragPtr.p, regTabPtr.p, &tmp, (Fix_page*)pagePtr.p);
}
@@ -180,10 +181,11 @@
if(ptr->m_header_bits & Tuple_header::CHAINED_ROW)
{
free_var_part(regFragPtr, regTabPtr,
- *(Var_part_ref*)ptr->get_var_part_ptr(regTabPtr));
+ *(Var_part_ref*)ptr->get_var_part_ptr(regTabPtr),
+ Var_page::CHAIN);
}
free_var_part(regFragPtr, regTabPtr,
- &regOperPtr->m_tuple_location, (Var_page*)page);
+ &regOperPtr->m_tuple_location, (Var_page*)page, 0);
} else {
free_fix_rec(regFragPtr, regTabPtr,
&regOperPtr->m_tuple_location, (Fix_page*)page);
@@ -222,10 +224,36 @@
{
Uint32 *ref= tuple_ptr->get_var_part_ptr(regTabPtr);
memcpy(tuple_ptr, copy, 4*(Tuple_header::HeaderSize+fix_size));
- Uint32 *var_part= get_ptr(*(Var_part_ref*)ref);
- Uint32 sz= ((mm_vars + 1) << 1) + (((Uint16*)var_part)[mm_vars]);
- memcpy(var_part, copy->get_var_part_ptr(regTabPtr), sz);
+
+ Local_key tmp; tmp= *ref;
+ if(0) printf("%p %d %d (%d bytes) - ref: %x ", tuple_ptr,
+ regOperPtr->m_tuple_location.m_page_no,
+ regOperPtr->m_tuple_location.m_page_idx,
+ 4*(Tuple_header::HeaderSize+fix_size),
+ *ref);
+ Ptr<Var_page> vpagePtr;
+ Uint32 *dst= get_ptr(&vpagePtr, *(Var_part_ref*)ref);
+ Uint32 *src= copy->get_var_part_ptr(regTabPtr);
+ Uint32 sz= ((mm_vars + 1) << 1) + (((Uint16*)src)[mm_vars]);
+ memcpy(dst, src, sz);
+ if(0) printf("ptr: %p %d ref: %x - chain commit", dst, sz, *ref);
copy_bits |= Tuple_header::CHAINED_ROW;
+
+ if(0)
+ {
+ for(Uint32 i = 0; i<((sz+3)>>2); i++)
+ printf(" %.8x", src[i]);
+ printf("\n");
+ }
+
+ if(copy_bits & Tuple_header::MM_SHRINK)
+ {
+ if(0) printf(" - shrink %d -> %d - ",
+ vpagePtr.p->get_entry_len(tmp.m_page_idx), (sz + 3) >> 2);
+ vpagePtr.p->shrink_entry(tmp.m_page_idx, (sz + 3) >> 2);
+ update_free_page_list(regFragPtr, vpagePtr.p);
+ }
+ if(0) ndbout_c("");
}
else
{
@@ -233,10 +261,15 @@
Uint32 sz= Tuple_header::HeaderSize + fix_size +
((((mm_vars + 1) << 1) + (((Uint16*)var_part)[mm_vars]) + 3)>> 2);
memcpy(tuple_ptr, copy, 4*sz);
- if(copy->m_header_bits & Tuple_header::MM_SHRINK)
+ if(0) ndbout_c("%p %d %d (%d bytes)", tuple_ptr,
+ regOperPtr->m_tuple_location.m_page_no,
+ regOperPtr->m_tuple_location.m_page_idx,
+ 4*sz);
+ if(copy_bits & Tuple_header::MM_SHRINK)
{
((Var_page*)page)->shrink_entry(regOperPtr->m_tuple_location.m_page_idx,
sz);
+ update_free_page_list(regFragPtr, (Var_page*)page);
}
}
@@ -263,7 +296,7 @@
ndbassert(page->uncommitted_used_space >= sz);
page->uncommitted_used_space -= sz;
key.m_page_idx= ((Var_page*)page)->
- alloc_record(sz, (Var_page*)ctemp_page);
+ alloc_record(sz, (Var_page*)ctemp_page, 0);
disk_page_undo_alloc(page, &key, sz, gci, logfile_group_id);
}
@@ -303,6 +336,41 @@
if (regTabPtr->checksumIndicator) {
jam();
setChecksum(tuple_ptr, regTabPtr);
+ }
+
+ if(regTabPtr->m_attributes[MM].m_no_of_varsize)
+ {
+ Var_page* pageP= (Var_page*)page;
+loop:
+ for(Uint32 i = 1; i < pageP->high_index; i++)
+ {
+ Uint32 len= pageP->get_entry_len(i);
+ if(len > 0 && !(len & Var_page::CHAIN))
+ {
+ Tuple_header* tmp= (Tuple_header*)pageP->get_ptr(i);
+ if(tmp->m_operation_ptr_i != RNIL)
+ {
+ if(tmp->m_operation_ptr_i > c_operation_pool.getSize())
+ {
+ ndbout << "FAILED " << i << endl << *pageP << endl;
+ }
+ c_operation_pool.getPtr(tmp->m_operation_ptr_i);
+ }
+ if(tmp->m_header_bits & Tuple_header::CHAINED_ROW)
+ get_ptr(* (Var_part_ref*)tmp->get_var_part_ptr(regTabPtr));
+ }
+ }
+
+#if 0
+ if(false && regOperPtr->op_struct.op_type != ZREAD &&
+ req_struct->m_varpart_page_ptr_p != pageP &&
+ req_struct->m_varpart_page_ptr_p != 0)
+ {
+ ndbout_c("verifying extra page");
+ pageP= req_struct->m_varpart_page_ptr_p;
+ goto loop;
+ }
+#endif
}
}
--- 1.57/ndb/src/kernel/blocks/dbtup/DbtupExecQuery.cpp Fri Apr 1 17:01:26 2005
+++ 1.58/ndb/src/kernel/blocks/dbtup/DbtupExecQuery.cpp Tue Apr 5 08:57:51 2005
@@ -1010,27 +1010,72 @@
void Dbtup::sendTUPKEYCONF(Signal* signal,
KeyReqStruct *req_struct,
Operationrec * const regOperPtr)
- {
- TupKeyConf * const tupKeyConf= (TupKeyConf *)signal->getDataPtrSend();
-
- Uint32 RuserPointer= regOperPtr->userpointer;
- Uint32 RnoFiredTriggers= req_struct->no_fired_triggers;
- Uint32 log_size= req_struct->log_size;
- Uint32 read_length= req_struct->read_length;
- Uint32 last_row= req_struct->last_row;
-
- set_trans_state(regOperPtr, TRANS_STARTED);
- set_tuple_state(regOperPtr, TUPLE_PREPARED);
- tupKeyConf->userPtr= RuserPointer;
- tupKeyConf->readLength= read_length;
- tupKeyConf->writeLength= log_size;
- tupKeyConf->noFiredTriggers= RnoFiredTriggers;
- tupKeyConf->lastRow= last_row;
-
+{
+ TupKeyConf * const tupKeyConf= (TupKeyConf *)signal->getDataPtrSend();
+
+ Uint32 RuserPointer= regOperPtr->userpointer;
+ Uint32 RnoFiredTriggers= req_struct->no_fired_triggers;
+ Uint32 log_size= req_struct->log_size;
+ Uint32 read_length= req_struct->read_length;
+ Uint32 last_row= req_struct->last_row;
+
+ set_trans_state(regOperPtr, TRANS_STARTED);
+ set_tuple_state(regOperPtr, TUPLE_PREPARED);
+ tupKeyConf->userPtr= RuserPointer;
+ tupKeyConf->readLength= read_length;
+ tupKeyConf->writeLength= log_size;
+ tupKeyConf->noFiredTriggers= RnoFiredTriggers;
+ tupKeyConf->lastRow= last_row;
+
EXECUTE_DIRECT(DBLQH, GSN_TUPKEYCONF, signal,
TupKeyConf::SignalLength);
+
+ Uint32 RnoOfFragrec= cnoOfFragrec;
+ Uint32 RnoOfTablerec= cnoOfTablerec;
+
+ jamEntry();
+ fragptr.i= regOperPtr->fragmentPtr;
+ ptrAss(fragptr, fragrecord);
+ Fragrecord * regFragPtr= fragptr.p;
+
+ tabptr.i = regFragPtr->fragTableId;
+ ptrAss(tabptr, tablerec);
+ Tablerec* regTabPtr = tabptr.p;
+
+ if(regTabPtr->m_attributes[MM].m_no_of_varsize)
+ {
+ PagePtr tmp;
+ tmp.i= regOperPtr->m_tuple_location.m_page_no;
+ ptrCheckGuard(tmp, cnoOfPage, page);
+ Var_page* pageP= (Var_page*)tmp.p;
+loop:
+ for(Uint32 i = 1; i < pageP->high_index; i++)
+ {
+ Uint32 len= pageP->get_entry_len(i);
+ if(len > 0 && !(len & Var_page::CHAIN))
+ {
+ Tuple_header* tmp= (Tuple_header*)pageP->get_ptr(i);
+ if(tmp->m_operation_ptr_i != RNIL)
+ {
+ c_operation_pool.getPtr(tmp->m_operation_ptr_i);
+ }
+ if(tmp->m_header_bits & Tuple_header::CHAINED_ROW)
+ get_ptr(* (Var_part_ref*)tmp->get_var_part_ptr(regTabPtr));
+ }
+ }
+
+ if(false && regOperPtr->op_struct.op_type != ZREAD &&
+ req_struct->m_varpart_page_ptr_p != pageP &&
+ req_struct->m_varpart_page_ptr_p != 0)
+ {
+ ndbout_c("verifying extra page");
+ pageP= req_struct->m_varpart_page_ptr_p;
+ goto loop;
+ }
+ }
}
+
#define MAX_READ (sizeof(signal->theData) > MAX_MESSAGE_SIZE ? MAX_MESSAGE_SIZE : sizeof(signal->theData))
/* ---------------------------------------------------------------- */
@@ -1105,6 +1150,7 @@
KeyReqStruct* req_struct)
{
Uint32 *dst;
+ Tuple_header *base= req_struct->m_tuple_ptr, *org;
if ((dst= c_undo_buffer.alloc_copy_tuple(&operPtrP->m_copy_tuple_location,
regTabPtr->total_rec_size)) == 0)
{
@@ -1112,7 +1158,6 @@
goto error;
}
- Tuple_header* org;
Uint32 tup_version;
if(operPtrP->is_first_operation())
{
@@ -1187,6 +1232,7 @@
{
shrink_tuple(req_struct, sizes+2, regTabPtr);
if (cmp[0] != cmp[1] && handle_size_change_after_update(req_struct,
+ base,
operPtrP,
regFragPtr,
regTabPtr,
@@ -1286,7 +1332,7 @@
KeyReqStruct *req_struct)
{
Uint32 *dst, *ptr= 0;
- Tuple_header *org= req_struct->m_tuple_ptr;
+ Tuple_header *base= req_struct->m_tuple_ptr, *org= base;
Tuple_header *tuple_ptr;
if ((dst=
c_undo_buffer.alloc_copy_tuple(&regOperPtr.p->m_copy_tuple_location,
@@ -1373,7 +1419,7 @@
if ((ptr= alloc_var_rec(regFragPtr, regTabPtr,
sizes[2+MM],
&regOperPtr.p->m_tuple_location,
- &frag_page_id)) == 0)
+ &frag_page_id, 0)) == 0)
goto mem_error;
}
@@ -1425,6 +1471,7 @@
return 0;
return handle_size_change_after_update(req_struct,
+ base,
regOperPtr.p,
regFragPtr,
regTabPtr,
@@ -2496,7 +2543,8 @@
memcpy(ptr, src, 4*fix_size);
}
- ptr->m_header_bits= Tuple_header::DISK_INLINE | bits;
+ ptr->m_header_bits= Tuple_header::DISK_INLINE |
+ bits & ~(Uint32)(Tuple_header::CHAINED_ROW);
src->m_header_bits= bits &
~(Uint32)(Tuple_header::MM_SHRINK | Tuple_header::MM_GROWN);
@@ -2651,40 +2699,65 @@
int
Dbtup::handle_size_change_after_update(KeyReqStruct* req_struct,
+ Tuple_header* org,
Operationrec* regOperPtr,
Fragrecord* regFragPtr,
Tablerec* regTabPtr,
Uint32 sizes[4])
{
ndbrequire(sizes[1] == sizes[3]);
- ndbout_c("%d %d %d %d", sizes[0], sizes[1], sizes[2], sizes[3]);
+ //ndbout_c("%d %d %d %d", sizes[0], sizes[1], sizes[2], sizes[3]);
+ if(0)
+ printf("%p %d %d - handle_size_change_after_update ",
+ req_struct->m_tuple_ptr,
+ regOperPtr->m_tuple_location.m_page_no,
+ regOperPtr->m_tuple_location.m_page_idx);
+
+ Uint32 bits= org->m_header_bits;
+ Uint32 fix_sz = Tuple_header::HeaderSize +
+ regTabPtr->m_offsets[MM].m_fix_header_size;
+
if(sizes[MM] == sizes[2+MM])
;
else if(sizes[MM] > sizes[2+MM])
{
- ndbout_c("shrink");
+ if(0) ndbout_c("shrink");
req_struct->m_tuple_ptr->m_header_bits |= Tuple_header::MM_SHRINK;
}
else
{
- printf("grow - ");
+ if(0) printf("grow - ");
Var_page* pageP= req_struct->m_varpart_page_ptr_p;
- Uint32 idx= regOperPtr->m_tuple_location.m_page_idx;
- Uint32 alloc= pageP->get_entry_len(idx);
- if(sizes[2+MM] <= alloc)
+ Uint32 idx, alloc, needed;
+ if(! (bits & Tuple_header::CHAINED_ROW))
+ {
+ idx= regOperPtr->m_tuple_location.m_page_idx;
+ alloc= pageP->get_entry_len(idx) & ~Var_page::CHAIN;
+ needed= sizes[2+MM];
+ }
+ else
+ {
+ Local_key tmp;
+ tmp= *org->get_var_part_ptr(regTabPtr);
+ idx= tmp.m_page_idx;
+ alloc= pageP->get_entry_len(idx) & ~Var_page::CHAIN;
+ needed= sizes[2+MM] - fix_sz;
+ }
+
+ if(needed <= alloc)
{
ndbassert(!regOperPtr->is_first_operation());
ndbout_c(" no grow");
return 0;
}
- Uint32 add= sizes[2+MM] - alloc;
+ Uint32 add= needed - alloc;
- if(pageP->free_space >= add)
+ if(false && pageP->free_space >= add)
{
jam();
if(!pageP->is_space_behind_entry(idx, add))
{
- printf("extra reorg");
+ if(0) printf("extra reorg");
jam();
/**
* In this case we need to reorganise the page to fit. To ensure we
@@ -2703,13 +2776,65 @@
add += alloc;
}
pageP->grow_entry(idx, add);
+ update_free_page_list(regFragPtr, pageP);
req_struct->m_tuple_ptr->m_header_bits |= Tuple_header::MM_GROWN;
}
else
{
- abort();
+ assert(fix_sz < alloc);
+
+ Local_key key;
+
+ if(! (bits & Tuple_header::CHAINED_ROW))
+ {
+ org->m_header_bits |= Tuple_header::CHAINED_ROW;
+ Uint32 id, *dst= alloc_var_rec(regFragPtr, regTabPtr,
+ needed - fix_sz, &key, &id,
+ Var_page::CHAIN);
+ assert(dst);
+ org= (Tuple_header*)pageP->get_ptr(idx);
+ Uint32 *ptr= org->get_var_part_ptr(regTabPtr);
+
+ //assert(key.m_page_no != pageP->physical_page_id);
+ memcpy(dst, ptr, 4*(needed-fix_sz));
+ * ptr = key.ref(); // store ref
+
+ if(0)printf("%d %d", pageP->get_entry_len(idx), fix_sz + 1);
+
+ pageP->shrink_entry(idx, fix_sz + 1); // var part ref
+ pageP->reorg((Var_page*)ctemp_page);
+ update_free_page_list(regFragPtr, pageP);
+ if(0) printf(" %d bytes, split(ref: %x %x ptr: %p sz: %d, %d) ",
+ 4*fix_sz, *ptr, key.ref(), dst, sizes[2+MM] - fix_sz, alloc-fix_sz);
+ }
+ else
+ {
+ assert(sizes[2+MM] >= alloc);
+ pageP->reorg((Var_page*)ctemp_page);
+ Uint32 id, *dst= alloc_var_rec(regFragPtr, regTabPtr,
+ needed, &key, &id,
+ Var_page::CHAIN);
+ assert(dst);
+
+ Uint32 base_idx= regOperPtr->m_tuple_location.m_page_idx;
+ org= (Tuple_header*)
+ ((Var_page*)req_struct->m_page_ptr_p)->get_ptr(base_idx);
+ Uint32 *ref= org->get_var_part_ptr(regTabPtr);
+ Uint32 old_ref= *ref;
+ Uint32 *src= pageP->get_ptr(idx);
+
+ assert(alloc < needed);
+ memcpy(dst, src, 4*alloc);
+ *ref = key.ref();
+ if(0) printf(" %d bytes, resplit(ref: %x,%x ptr: %p sz: %d, %d) ",
+ 4*(fix_sz+1), old_ref,*ref, dst, sizes[2+MM] - fix_sz, alloc-fix_sz);
+ free_var_part(regFragPtr, regTabPtr,
+ *(Var_part_ref*)&old_ref, Var_page::CHAIN);
+ }
+
+ req_struct->m_tuple_ptr->m_header_bits = bits | Tuple_header::MM_GROWN;
}
- ndbout_c("free: %d", pageP->free_space);
+ if(0) ndbout_c("free: %d", pageP->free_space);
}
return 0;
}
--- 1.31/ndb/src/kernel/blocks/dbtup/DbtupVarAlloc.cpp Thu Mar 31 20:59:24 2005
+++ 1.32/ndb/src/kernel/blocks/dbtup/DbtupVarAlloc.cpp Tue Apr 5 08:57:51 2005
@@ -455,7 +455,8 @@
Tablerec* const tab_ptr,
Uint32 alloc_size,
Local_key* key,
- Uint32 * out_frag_page_id)
+ Uint32 * out_frag_page_id,
+ Uint32 base)
{
Var_page* page_header;
PagePtr page_ptr;
@@ -475,7 +476,8 @@
ljam();
page_header= (Var_page*)page_ptr.p;
}
- Uint32 idx= page_header->alloc_record(alloc_size, (Var_page*)ctemp_page);
+ Uint32 idx= page_header->alloc_record(alloc_size,
+ (Var_page*)ctemp_page, base);
key->m_page_no= page_ptr.i;
key->m_page_idx= idx;
@@ -501,7 +503,8 @@
Returns true if deallocation was successful otherwise false
*/
void
-Dbtup::free_var_part(Fragrecord* frag_ptr, Tablerec* tab_ptr, Var_part_ref ref)
+Dbtup::free_var_part(Fragrecord* frag_ptr, Tablerec* tab_ptr,
+ Var_part_ref ref, Uint32 chain)
{
Local_key tmp;
PagePtr pagePtr;
@@ -509,17 +512,18 @@
pagePtr.i= tmp.m_page_no= ref.m_ref >> MAX_TUPLES_BITS;
ptrCheckGuard(pagePtr, cnoOfPage, page);
- free_var_part(frag_ptr, tab_ptr, &tmp, (Var_page*)pagePtr.p);
+ free_var_part(frag_ptr, tab_ptr, &tmp, (Var_page*)pagePtr.p, chain);
}
void Dbtup::free_var_part(Fragrecord* const frag_ptr,
Tablerec* const tab_ptr,
Local_key* key,
- Var_page* const page_header)
+ Var_page* const page_header,
+ Uint32 chain)
{
Uint32 page_idx= key->m_page_idx;
- page_header->free_record(page_idx);
+ page_header->free_record(page_idx, chain);
ndbassert(page_header->free_space <= Var_page::DATA_WORDS);
if (page_header->free_space == Var_page::DATA_WORDS - 1)
| Thread | Author | Date |
|---|---|---|
| • bk commit into 5.1-ndb tree (joreland:1.1816) | jonas.oreland | 5 Apr |