Below is the list of changes that have just been committed into a local
5.1 repository of jonas. When jonas does a push these changes will
be propagated to the main repository and, within 24 hours after the
push, to the public repository.
For information on how to access the public repository
see http://dev.mysql.com/doc/mysql/en/installing-source-tree.html
ChangeSet
1.1942 05/11/21 12:23:14 jonas@stripped +30 -0
ndb - varsize
make all varsize tuples chained so that
handling of rowid wrt SR and NR is "possible"
storage/ndb/src/kernel/vm/SLList.hpp
1.5 05/11/21 12:19:05 jonas@stripped +35 -0
Add SLList::add(range) and SLList::remove_front
storage/ndb/src/kernel/blocks/restore.cpp
1.3 05/11/21 12:19:05 jonas@stripped +2 -1
redo handling of varsize records
storage/ndb/src/kernel/blocks/dbtup/DbtupVarAlloc.cpp
1.3 05/11/21 12:19:05 jonas@stripped +205 -111
redo handling of varsize records
storage/ndb/src/kernel/blocks/dbtup/DbtupScan.cpp
1.8 05/11/21 12:19:05 jonas@stripped +20 -41
redo handling of varsize records
storage/ndb/src/kernel/blocks/dbtup/DbtupExecQuery.cpp
1.25 05/11/21 12:19:05 jonas@stripped +72 -316
redo handling of varsize records
storage/ndb/src/kernel/blocks/dbtup/DbtupCommit.cpp
1.7 05/11/21 12:19:05 jonas@stripped +11 -56
remove if statements
wrt to chained/not chained
storage/ndb/src/kernel/blocks/dbtup/Dbtup.hpp
1.32 05/11/21 12:19:05 jonas@stripped +128 -141
Add list of empty pages to use for varparts
storage/ndb/src/kernel/vm/DLList.hpp
1.5 05/11/04 09:14:46 jonas@stripped[jonas] +41 -5
Import rowid
storage/ndb/src/kernel/vm/ArrayPool.hpp
1.10 05/11/04 09:17:42 jonas@stripped[jonas] +269 -180
Import rowid
storage/ndb/src/kernel/blocks/suma/Suma.cpp
1.29 05/11/04 09:22:13 jonas@stripped[jonas] +11 -9
Import rowid
storage/ndb/src/kernel/blocks/restore.cpp
1.2 05/11/05 00:34:51 jonas@stripped[jonas] +37 -14
Import rowid
storage/ndb/src/kernel/blocks/dbtup/tuppage.hpp
1.2 05/10/31 16:23:57 jonas@stripped[jonas] +44 -18
Import rowid
storage/ndb/src/kernel/blocks/dbtup/tuppage.cpp
1.2 05/11/04 14:35:51 jonas@stripped[jonas] +177 -26
Import rowid
storage/ndb/src/kernel/blocks/dbtup/test_varpage.cpp
1.2 05/11/01 11:50:11 jonas@stripped[jonas] +172 -20
Import rowid
storage/ndb/src/kernel/blocks/dbtup/Undo_buffer.cpp
1.2 05/11/04 08:03:47 jonas@stripped[jonas] +17 -22
Import rowid
storage/ndb/src/kernel/blocks/dbtup/DbtupVarAlloc.cpp
1.2 05/11/04 14:34:58 jonas@stripped[jonas] +105 -624
Import rowid
storage/ndb/src/kernel/blocks/dbtup/DbtupScan.cpp
1.7 05/11/04 07:59:25 jonas@stripped[jonas] +5 -12
Import rowid
storage/ndb/src/kernel/blocks/dbtup/DbtupRoutines.cpp
1.24 05/11/05 00:05:04 jonas@stripped[jonas] +4 -0
Import rowid
storage/ndb/src/kernel/blocks/dbtup/DbtupPageMap.cpp
1.4 05/11/04 10:32:07 jonas@stripped[jonas] +29 -12
Import rowid
storage/ndb/src/kernel/blocks/dbtup/DbtupPagMan.cpp
1.8 05/11/04 08:20:21 jonas@stripped[jonas] +21 -27
Import rowid
storage/ndb/src/kernel/blocks/dbtup/DbtupMeta.cpp
1.17 05/11/04 10:21:40 jonas@stripped[jonas] +0 -6
Import rowid
storage/ndb/src/kernel/blocks/dbtup/DbtupIndex.cpp
1.16 05/11/04 08:12:53 jonas@stripped[jonas] +6 -13
Import rowid
storage/ndb/src/kernel/blocks/dbtup/DbtupGen.cpp
1.23 05/11/04 10:22:51 jonas@stripped[jonas] +10 -16
Import rowid
storage/ndb/src/kernel/blocks/dbtup/DbtupFixAlloc.cpp
1.4 05/11/04 14:35:25 jonas@stripped[jonas] +73 -9
Import rowid
storage/ndb/src/kernel/blocks/dbtup/DbtupExecQuery.cpp
1.24 05/11/04 12:16:48 jonas@stripped[jonas] +94 -45
Import rowid
storage/ndb/src/kernel/blocks/dbtup/DbtupDebug.cpp
1.11 05/11/04 07:58:31 jonas@stripped[jonas] +5 -6
Import rowid
storage/ndb/src/kernel/blocks/dbtup/DbtupCommit.cpp
1.6 05/11/04 12:22:34 jonas@stripped[jonas] +24 -22
Import rowid
storage/ndb/src/kernel/blocks/dbtup/DbtupAbort.cpp
1.5 05/11/04 12:08:38 jonas@stripped[jonas] +7 -7
Import rowid
storage/ndb/src/kernel/blocks/dbtup/Dbtup.hpp
1.31 05/11/04 12:20:22 jonas@stripped[jonas] +24 -22
Import rowid
storage/ndb/src/kernel/blocks/dblqh/DblqhMain.cpp
1.81 05/11/03 16:54:19 jonas@stripped[jonas] +29 -1
Import rowid
storage/ndb/src/kernel/blocks/dblqh/Dblqh.hpp
1.41 05/11/02 14:43:02 jonas@stripped[jonas] +4 -0
Import rowid
storage/ndb/src/kernel/blocks/backup/Backup.cpp
1.27 05/11/05 00:10:39 jonas@stripped[jonas] +35 -17
Import rowid
storage/ndb/include/ndb_version.h.in
1.5 05/11/03 14:55:34 jonas@stripped[jonas] +5 -0
Import rowid
storage/ndb/include/kernel/signaldata/TupKey.hpp
1.5 05/11/02 14:43:22 jonas@stripped[jonas] +5 -2
Import rowid
storage/ndb/include/kernel/signaldata/LqhKey.hpp
1.4 05/11/03 13:11:46 jonas@stripped[jonas] +19 -2
Import rowid
storage/ndb/include/kernel/AttributeHeader.hpp
1.13 05/11/05 00:00:24 jonas@stripped[jonas] +1 -0
Import rowid
# This is a BitKeeper patch. What follows are the unified diffs for the
# set of deltas contained in the patch. The rest of the patch, the part
# that BitKeeper cares about, is below these diffs.
# User: jonas
# Host: perch.ndb.mysql.com
# Root: /home/jonas/src/51-vs
--- 1.1/storage/ndb/src/kernel/blocks/dbtup/DbtupVarAlloc.cpp 2005-11-07 12:19:12 +01:00
+++ 1.3/storage/ndb/src/kernel/blocks/dbtup/DbtupVarAlloc.cpp 2005-11-21 12:19:05 +01:00
@@ -39,394 +39,6 @@
c_max_list_size[4]= 199;
}
-#if 0
-void
-Dbtup::free_separate_var_part(Fragrecord* const regFragPtr,
- Tablerec* const regTabPtr,
- Tuple_header* tuple_header)
-{
- Uint32 page_ref, page_index;
- PagePtr page_ptr;
- page_ref= tuple_header->m_data[regTabPtr->var_offset];
- page_index= page_ref & MAX_TUPLES_PER_PAGE;
- page_ptr.i= page_ref >> MAX_TUPLES_BITS;
- ptrCheckGuard(page_ptr, cnoOfPage, cpage);
- free_var_rec(regFragPtr,
- regTabPtr,
- (Var_page*)page_ptr.p,
- page_index);
-}
-
-
-void
-Dbtup::abort_separate_var_part(Uint32 var_page_ref,
- const Uint32* copy_var_part,
- Uint32 copy_var_size)
-{
- Uint32 page_index;
- PagePtr var_page_ptr;
- page_index= var_page_ref & MAX_TUPLES_PER_PAGE;
- var_page_ptr.i= var_page_ref >> MAX_TUPLES_BITS;
- ptrCheckGuard(var_page_ptr, cnoOfPage, cpage);
- Uint32 *ptr= ((Var_page*)var_page_ptr.p)->get_ptr(page_index);
- MEMCOPY_NO_WORDS(ptr, copy_var_part, copy_var_size);
-}
-
-void
-Dbtup::shrink_entry(Fragrecord* const regFragPtr,
- Var_page* const page_ptr,
- Uint32 page_index,
- Uint32 new_size)
-{
-
- page_ptr->shrink_entry(page_index, new_size);
- update_free_page_list(regFragPtr, page_ptr);
-}
-
-void
-Dbtup::check_entry_size(KeyReqStruct* req_struct,
- Operationrec* regOperPtr,
- Fragrecord* const regFragPtr,
- Tablerec* const regTabPtr)
-{
-#if 0
- Uint32 vp_index, no_var_attr, total_var_size, add_size, new_size, entry_len;
- Uint32 vp_offset, tuple_size, var_part_local;
- Uint32 *var_data_part, *var_link;
- PagePtr var_page_ptr;
- Uint32* tuple_ptr= req_struct->m_tuple_ptr;
- Uint32 page_index= regOperPtr->m_tuple_location.m_page_idx;
- tuple_size= regTabPtr->tupheadsize;
- no_var_attr= regTabPtr->no_var_attr;
- var_part_local= get_var_part_local(* (tuple_ptr+1));
- add_size= regTabPtr->var_array_wsize;
- var_link= tuple_ptr+tuple_size;
- if (var_part_local == 1) {
- ljam();
- var_data_part= var_link;
- var_page_ptr.p= req_struct->fix_page_ptr.p;
- add_size+= tuple_size;
- vp_index= regOperPtr->m_tuple_location.m_page_idx;
- } else {
- ljam();
- entry_len= get_entry_len(req_struct->var_page_ptr, page_index);
- if (entry_len > (tuple_size + 1)) {
- ljam();
- shrink_entry(regFragPtr,
- req_struct->fix_page_ptr,
- page_index,
- tuple_size + 1);
- } else {
- ndbassert(entry_len == (tuple_size + 1));
- }
- set_up_var_page(*var_link,
- regFragPtr,
- var_page_ptr,
- vp_index,
- vp_offset);
- var_data_part= &var_page_ptr.p->pageWord[vp_offset];
- }
- total_var_size= calculate_total_var_size((uint16*)var_data_part,
- no_var_attr);
- new_size= total_var_size + add_size;
- entry_len= get_entry_len(var_page_ptr.p, vp_index);
- if (new_size < entry_len) {
- ljam();
- shrink_entry(regFragPtr,
- var_page_ptr.p,
- vp_index,
- new_size);
- } else {
- ndbassert(entry_len == new_size);
- }
-#endif
-}
-
-inline
-void
-Dbtup::grow_entry(Fragrecord* const regFragPtr,
- Var_page* page_header,
- Uint32 page_index,
- Uint32 growth_len)
-{
- page_header->grow_entry(page_index, growth_len);
- update_free_page_list(regFragPtr, page_header);
-}
-
-
-void
-Dbtup::setup_varsize_part(KeyReqStruct* req_struct,
- Operationrec* const regOperPtr,
- Tablerec* const regTabPtr)
-{
- Uint32 num_var_attr;
- Uint32 var_data_wsize;
- Uint32* var_data_ptr;
- Uint32* var_data_start;
-
- Uint32 page_index= regOperPtr->m_tuple_location.m_page_idx;
- if (regTabPtr->var_sized_record) {
- ljam();
- num_var_attr= regTabPtr->no_var_attr;
- if (!(req_struct->m_tuple_ptr->m_header_bits & Tuple_header::CHAINED_ROW))
- {
- ljam();
- var_data_ptr= req_struct->m_tuple_ptr->m_data+regTabPtr->var_offset;
- req_struct->var_page_ptr.i = req_struct->fix_page_ptr.i;
- req_struct->var_page_ptr.p = (Var_page*)req_struct->fix_page_ptr.p;
- req_struct->vp_index= page_index;
- } else {
- Uint32 var_link= req_struct->m_tuple_ptr->m_data[regTabPtr->var_offset];
- ljam();
-
- Uint32 vp_index= var_link & MAX_TUPLES_PER_PAGE;
- PagePtr var_page_ptr;
- var_page_ptr.i= var_link >> MAX_TUPLES_BITS;
- ptrCheckGuard(var_page_ptr, cnoOfPage, cpage);
-
- req_struct->vp_index= vp_index;
- req_struct->var_page_ptr.i= var_page_ptr.i;
- req_struct->var_page_ptr.p= (Var_page*)var_page_ptr.p;
-
- var_data_ptr= ((Var_page*)var_page_ptr.p)->get_ptr(vp_index);
- req_struct->fix_var_together= false;
- }
- var_data_start= &var_data_ptr[regTabPtr->var_array_wsize];
- req_struct->var_len_array= (Uint16*)var_data_ptr;
- req_struct->var_data_start= var_data_start;
- var_data_wsize= init_var_pos_array(req_struct->var_len_array,
- &req_struct->var_pos_array[0],
- num_var_attr);
- req_struct->var_data_end= &var_data_start[var_data_wsize];
- }
-}
-
-
-bool
-Dbtup::compress_var_sized_part_after_update(KeyReqStruct *req_struct,
- Operationrec* const regOperPtr,
- Fragrecord* const regFragPtr,
- Tablerec* const regTabPtr)
-{
- Uint32 entry_len, old_var_len, new_size, total_size;
- Uint32* used_var_data_start= req_struct->var_data_start;
- total_size= calculate_total_var_size(req_struct->var_len_array,
- regTabPtr->no_var_attr);
- entry_len= req_struct->var_page_ptr.p->get_entry_len(req_struct->vp_index);
- if (req_struct->fix_var_together) {
- ljam();
- old_var_len= entry_len -
- (regTabPtr->tupheadsize + regTabPtr->var_array_wsize);
- } else {
- ljam();
- old_var_len= entry_len - regTabPtr->var_array_wsize;
- }
- if (total_size > old_var_len) {
- ljam();
- /**
- * The new total size of the variable part is greater than it was before
- * the update. We will need to increase the size of the record or split
- * it into a fixed part and a variable part.
- */
- if (! handle_growth_after_update(req_struct,
- regFragPtr,
- regTabPtr,
- (total_size - old_var_len))) {
- ljam();
- return false;
- }
- } else if (total_size < old_var_len) {
- ljam();
- /**
- * The new total size is smaller than what it was before we started.
- * In one case we can shrink immediately and this is after an initial
- * insert since we allocate in this case a full sized tuple and there
- * is no problem in shrinking this already before committing.
- *
- * For all other cases we need to keep the space to ensure that we
- * can safely abort (which means in this case to grow back to
- * original size). Thus shrink cannot be done before commit occurs
- * in those cases.
- */
- if (regOperPtr->op_struct.op_type == ZINSERT &&
- regOperPtr->prevActiveOp == RNIL &&
- regOperPtr->nextActiveOp == RNIL) {
- ljam();
- new_size= entry_len - (old_var_len - total_size);
- shrink_entry(regFragPtr,
- req_struct->var_page_ptr.p,
- req_struct->vp_index,
- new_size);
- }
- }
- reset_req_struct_data(regTabPtr,
- req_struct,
- regOperPtr->m_tuple_location.m_page_idx);
- copy_back_var_attr(req_struct, regTabPtr, used_var_data_start);
- return true;
-}
-
-void
-Dbtup::reset_req_struct_data(Tablerec* const regTabPtr,
- KeyReqStruct* req_struct,
- Uint32 fix_index)
-{
- Var_page *var_page_ptr, *fix_page_ptr;
- Uint32 vp_index;
-
- fix_page_ptr= (Var_page*)req_struct->fix_page_ptr.p;
- var_page_ptr= req_struct->var_page_ptr.p;
- vp_index= req_struct->vp_index;
-
- req_struct->m_tuple_ptr= (Tuple_header*)fix_page_ptr->get_ptr(fix_index);
-
- Uint32 vp_len= var_page_ptr->get_entry_len(vp_index);
-
- Uint32 *var_ptr;
- if (req_struct->fix_var_together)
- {
- ljam();
- var_ptr= req_struct->m_tuple_ptr->m_data+regTabPtr->var_offset;
- }
- else
- {
- var_ptr= var_page_ptr->get_ptr(vp_index);
- }
-
- req_struct->var_len_array= (Uint16*)(var_ptr);
- req_struct->var_data_start= var_ptr+regTabPtr->var_array_wsize;
- req_struct->var_data_end= var_ptr+regTabPtr->var_array_wsize+vp_len;
-}
-
-void
-Dbtup::copy_back_var_attr(KeyReqStruct *req_struct,
- Tablerec* const regTabPtr,
- Uint32 *source_rec)
-{
- Uint32 i, dest_index, vpos_index, byte_size, word_size, num_var_attr;
- Uint32 *dest_rec, max_var_size, entry_len;
- Uint32 total_word_size= 0;
-
-#ifdef VM_TRACE
- entry_len= req_struct->var_page_ptr.p->get_entry_len(req_struct->vp_index);
- if (req_struct->fix_var_together) {
- ljam();
- max_var_size= entry_len - (regTabPtr->tupheadsize +
- regTabPtr->var_array_wsize);
- } else {
- ljam();
- max_var_size= entry_len - regTabPtr->var_array_wsize;
- }
-#endif
- dest_rec= req_struct->var_data_start;
- num_var_attr= regTabPtr->no_var_attr;
- ljam();
- for (i= 0; i < num_var_attr; i++) {
- dest_index= total_word_size;
- byte_size= req_struct->var_len_array[i];
- vpos_index= req_struct->var_pos_array[i];
- word_size= convert_byte_to_word_size(byte_size);
- total_word_size+= word_size;
- req_struct->var_pos_array[i]= total_word_size;
- MEMCOPY_NO_WORDS(&dest_rec[vpos_index],
- &source_rec[dest_index],
- word_size);
- ndbassert((vpos_index + word_size) <= max_var_size);
- }
- ndbassert(total_word_size <= max_var_size);
- req_struct->var_pos_array[num_var_attr]= total_word_size;
- req_struct->var_data_end= &req_struct->var_data_start[total_word_size];
-}
-
-
-void
-Dbtup::copy_out_var_attr(KeyReqStruct *req_struct,
- Tablerec* const regTabPtr)
-{
- Uint32 i, source_index, byte_size, vpos_index, word_size, last_pos_array;
- Uint32 num_var_attr= regTabPtr->no_var_attr;
- Uint16 copy_pos_array[MAX_ATTRIBUTES_IN_TABLE + 1];
- init_var_len_array(©_pos_array[0], regTabPtr);
- init_var_pos_array(©_pos_array[0],
- ©_pos_array[0],
- regTabPtr->no_var_attr);
-
- Uint32 *source_rec= req_struct->var_data_start;
- Uint32 *dest_rec= &ctemp_var_record[0];
- Uint32 total_word_size= 0;
- ljam();
- for (i= 0; i < num_var_attr; i++) {
- source_index= total_word_size;
- byte_size= req_struct->var_len_array[i];
- vpos_index= copy_pos_array[i];
- word_size= convert_byte_to_word_size(byte_size);
- total_word_size+= word_size;
- req_struct->var_pos_array[i]= copy_pos_array[i];
- MEMCOPY_NO_WORDS(&dest_rec[source_index],
- &source_rec[vpos_index],
- word_size);
- }
- last_pos_array= copy_pos_array[num_var_attr];
- req_struct->var_data_start= dest_rec;
- req_struct->var_data_end= &dest_rec[last_pos_array];
- req_struct->var_part_updated= true;
- req_struct->var_pos_array[num_var_attr]= last_pos_array;
-}
-
-
-Uint32
-Dbtup::calculate_total_var_size(Uint16* var_len_array,
- Uint32 num_var_attr)
-{
- Uint32 i, byte_size, word_size, total_size;
- total_size= 0;
- for (i= 0; i < num_var_attr; i++) {
- byte_size= var_len_array[i];
- word_size= convert_byte_to_word_size(byte_size);
- total_size+= word_size;
- }
- return total_size;
-}
-
-Uint32
-Dbtup::init_var_pos_array(Uint16* var_len_array,
- Uint16* var_pos_array,
- Uint32 num_var_attr)
-{
- Uint32 i, real_len, word_len;
- Uint32 curr_pos= 0;
- for (i= 0, curr_pos= 0; i < num_var_attr; i++) {
- real_len= var_len_array[i];
- var_pos_array[i]= curr_pos;
- word_len= convert_byte_to_word_size(real_len);
- curr_pos+= word_len;
- }
- var_pos_array[num_var_attr]= curr_pos;
- return curr_pos;
-}
-
-void
-Dbtup::init_var_len_array(Uint16 *var_len_array, Tablerec *tab_ptr)
-{
- Uint32 array_ind= 0;
- Uint32 attr_descr, i;
- Uint32 no_of_attr= tab_ptr->noOfAttr;
- Uint32 descr_start= tab_ptr->tabDescriptor;
- TableDescriptor *tab_descr= &tableDescriptor[descr_start];
- ndbrequire(descr_start + (no_of_attr << ZAD_LOG_SIZE) <= cnoOfTabDescrRec);
- for (i= 0; i < no_of_attr; i++) {
- attr_descr= tab_descr[i * ZAD_SIZE].tabDescr;
- if (AttributeDescriptor::getArrayType(attr_descr) == 0) {
- Uint32 bits_used= AttributeDescriptor::getArraySize(attr_descr) *
- (1 << AttributeDescriptor::getSize(attr_descr));
- Uint32 no_attr_bytes= ((bits_used + 7) >> 3);
- var_len_array[array_ind++]= no_attr_bytes;
- }
- }
-}
-
-#endif
-
/*
Allocator for variable sized segments
Part of the external interface for variable sized segments
@@ -438,8 +50,8 @@
and dropping attributes without the need to copy the entire table.
SYNOPSIS
- frag_ptr A pointer to the fragment description
- tab_ptr A pointer to the table description
+ fragPtr A pointer to the fragment description
+ tabPtr A pointer to the table description
alloc_size Size of the allocated record
signal The signal object to be used if a signal needs to
be sent
@@ -451,44 +63,81 @@
page_ptr The i and p value of the page where the record was
allocated
*/
-Uint32* Dbtup::alloc_var_rec(Fragrecord* const frag_ptr,
- Tablerec* const tab_ptr,
+Uint32* Dbtup::alloc_var_rec(Fragrecord* fragPtr,
+ Tablerec* tabPtr,
Uint32 alloc_size,
Local_key* key,
- Uint32 * out_frag_page_id,
- Uint32 base)
+ Uint32 * out_frag_page_id)
{
- Var_page* page_header;
- PagePtr page_ptr;
- page_ptr.i= get_alloc_page(frag_ptr, (alloc_size + 1));
- if (page_ptr.i == RNIL) {
+ /**
+ * TODO alloc fix+var part
+ */
+ tabPtr->m_offsets[MM].m_fix_header_size += Tuple_header::HeaderSize + 1;
+ Uint32 *ptr = alloc_fix_rec(fragPtr, tabPtr, key, out_frag_page_id);
+ tabPtr->m_offsets[MM].m_fix_header_size -= Tuple_header::HeaderSize + 1;
+ if (unlikely(ptr == 0))
+ {
+ return 0;
+ }
+
+ ndbassert(alloc_size >= tabPtr->m_offsets[MM].m_fix_header_size +
+ Tuple_header::HeaderSize);
+
+ alloc_size -= tabPtr->m_offsets[MM].m_fix_header_size +
+ Tuple_header::HeaderSize;
+
+
+ Local_key varref;
+ if (likely(alloc_var_part(fragPtr, tabPtr, alloc_size, &varref) != 0))
+ {
+ Tuple_header* tuple = (Tuple_header*)ptr;
+ * tuple->get_var_part_ptr(tabPtr) = varref.ref();
+ return ptr;
+ }
+
+ PagePtr pagePtr;
+ c_page_pool.getPtr(pagePtr, key->m_page_no);
+ free_fix_rec(fragPtr, tabPtr, key, (Fix_page*)pagePtr.p);
+ return 0;
+}
+
+Uint32*
+Dbtup::alloc_var_part(Fragrecord* fragPtr,
+ Tablerec* tabPtr,
+ Uint32 alloc_size,
+ Local_key* key)
+{
+ PagePtr pagePtr;
+ pagePtr.i= get_alloc_page(fragPtr, (alloc_size + 1));
+ if (pagePtr.i == RNIL) {
ljam();
- if ((page_ptr.i= getEmptyPage(frag_ptr)) == RNIL) {
+ if ((pagePtr.i= get_empty_var_page(fragPtr)) == RNIL) {
ljam();
return 0;
}
- ptrCheckGuard(page_ptr, cnoOfPage, cpage);
- page_header= (Var_page*)page_ptr.p;
- page_header->init();
- insert_free_page(frag_ptr, page_header, MAX_FREE_LIST - 1);
+ c_page_pool.getPtr(pagePtr);
+ ((Var_page*)pagePtr.p)->init();
+ pagePtr.p->list_index = MAX_FREE_LIST - 1;
+ LocalDLList<Page> list(c_page_pool,
+ fragPtr->free_var_page_array[MAX_FREE_LIST-1]);
+ list.add(pagePtr);
/*
* Tup scan and index build check ZEMPTY_MM to skip un-init()ed
* page. Change state here. For varsize it means "page in use".
*/
- page_ptr.p->page_state = ZTH_MM_FREE;
+ pagePtr.p->page_state = ZTH_MM_FREE;
} else {
- ptrCheckGuard(page_ptr, cnoOfPage, cpage);
+ c_page_pool.getPtr(pagePtr);
ljam();
- page_header= (Var_page*)page_ptr.p;
}
- Uint32 idx= page_header->alloc_record(alloc_size,
- (Var_page*)ctemp_page, base);
+ Uint32 idx= ((Var_page*)pagePtr.p)
+ ->alloc_record(alloc_size, (Var_page*)ctemp_page, Var_page::CHAIN);
+
+ key->m_page_no = pagePtr.i;
+ key->m_page_idx = idx;
- key->m_page_no= page_ptr.i;
- key->m_page_idx= idx;
- *out_frag_page_id= page_header->frag_page_id;
- update_free_page_list(frag_ptr, page_header);
- return page_header->get_ptr(idx);
+ update_free_page_list(fragPtr, pagePtr);
+ return ((Var_page*)pagePtr.p)->get_ptr(idx);
}
/*
@@ -496,8 +145,8 @@
Part of the external interface for variable sized segments
SYNOPSIS
- frag_ptr A pointer to the fragment description
- tab_ptr A pointer to the table description
+ fragPtr A pointer to the fragment description
+ tabPtr A pointer to the table description
signal The signal object to be used if a signal needs to
be sent
page_ptr A reference to the page of the variable sized
@@ -507,102 +156,59 @@
RETURN VALUES
Returns true if deallocation was successful otherwise false
*/
-void
-Dbtup::free_var_part(Fragrecord* frag_ptr, Tablerec* tab_ptr,
- Var_part_ref ref, Uint32 chain)
-{
- Local_key tmp;
- PagePtr pagePtr;
- tmp.m_page_idx= ref.m_ref & MAX_TUPLES_PER_PAGE;
- pagePtr.i= tmp.m_page_no= ref.m_ref >> MAX_TUPLES_BITS;
-
- ptrCheckGuard(pagePtr, cnoOfPage, cpage);
- free_var_part(frag_ptr, tab_ptr, &tmp, (Var_page*)pagePtr.p, chain);
-}
-
-void Dbtup::free_var_part(Fragrecord* const frag_ptr,
- Tablerec* const tab_ptr,
- Local_key* key,
- Var_page* const page_header,
- Uint32 chain)
-{
-
+void Dbtup::free_var_rec(Fragrecord* fragPtr,
+ Tablerec* tabPtr,
+ Local_key* key,
+ Ptr<Page> pagePtr)
+{
+ /**
+ * TODO free fix + var part
+ */
Uint32 page_idx= key->m_page_idx;
- page_header->free_record(page_idx, chain);
+ Uint32 *ptr = ((Fix_page*)pagePtr.p)->get_ptr(key->m_page_idx, 0);
+ Tuple_header* tuple = (Tuple_header*)ptr;
+
+ Local_key ref;
+ ref = * tuple->get_var_part_ptr(tabPtr);
+
+ free_fix_rec(fragPtr, tabPtr, key, (Fix_page*)pagePtr.p);
+
+ c_page_pool.getPtr(pagePtr, ref.m_page_no);
+ ((Var_page*)pagePtr.p)->free_record(ref.m_page_idx, Var_page::CHAIN);
- ndbassert(page_header->free_space <= Var_page::DATA_WORDS);
- if (page_header->free_space == Var_page::DATA_WORDS - 1)
+ ndbassert(pagePtr.p->free_space <= Var_page::DATA_WORDS);
+ if (pagePtr.p->free_space == Var_page::DATA_WORDS - 1)
{
ljam();
/*
- This code could be used when we release pages.
- remove_free_page(signal,frag_ptr,page_header,page_header->list_index);
- return_empty_page(frag_ptr, page_header);
+ This code could be used when we release pages.
+ remove_free_page(signal,fragPtr,page_header,page_header->list_index);
+ return_empty_page(fragPtr, page_header);
*/
- update_free_page_list(frag_ptr, page_header);
+ update_free_page_list(fragPtr, pagePtr);
} else {
ljam();
- update_free_page_list(frag_ptr, page_header);
+ update_free_page_list(fragPtr, pagePtr);
}
return;
}
-
-#if 0
-/*
- This method is called whenever the variable part has been updated and
- has grown beyond its original size. This means that more space needs to
- be allocated to the record. If possible this space should be in the
- same page but we might have to allocate more space in a new page.
- In the case of a new page we must still keep the old page and the
- page index since this is the entrance to the record. In this case the
- record might have to be split into a fixed part and a variable part.
-
- This routine uses cinBuffer as temporary copy buffer. This is no longer
- used since it contains the interpreted program to use in the update
- and this has completed when this function is called.
-
- SYNOPSIS
- req_struct The structure for temporary content
- signal The signal object
- regOperPtr The operation record
- regFragPtr The fragment record
- regTabPtr The table record
-
- RETURN VALUES
- bool false if failed due to lack of memory
- */
-bool
-Dbtup::handle_growth_after_update(KeyReqStruct* req_struct,
- Fragrecord* const regFragPtr,
- Tablerec* const regTabPtr,
- Uint32 growth_len)
-{
- Uint32 vp_index, alloc_size, entry_len, curr_var_len;
- Uint32 new_vp_index, new_vp_offset, new_page_ref;
- Uint32 *copy_record= &cinBuffer[0];
- Ptr<Var_page> var_page= req_struct->var_page_ptr;
- Var_page* page_header= var_page.p;
- vp_index= req_struct->vp_index;
- entry_len= var_page.p->get_entry_len(vp_index);
- if (page_header->free_space >= growth_len) {
- /**
- * We will be able to handle the growth without changing the page
- * and page index.
- */
- if (page_header->largest_frag_size() >= entry_len + growth_len) {
- ljam();
- /**
- * In this case we need to copy the entry to the free space area of
- * the page, it is not necessary to reorganise the page.
- */
- MEMCOPY_NO_WORDS(page_header->get_free_space_ptr(),
- page_header->get_ptr(vp_index),
- entry_len);
- page_header->set_entry_offset(vp_index, page_header->insert_pos);
- page_header->insert_pos+= entry_len;
- } else {
- ljam();
+int
+Dbtup::realloc_var_part(Fragrecord* fragPtr, Tablerec* tabPtr, PagePtr pagePtr,
+ Var_part_ref* ref, Uint32 oldsz, Uint32 newsz)
+{
+ Uint32 add = newsz - oldsz;
+ Var_page* pageP = (Var_page*)pagePtr.p;
+ Local_key oldref;
+ oldref = *(Uint32*)ref;
+
+ if (pageP->free_space >= add)
+ {
+ jam();
+ if(!pageP->is_space_behind_entry(oldref.m_page_idx, add))
+ {
+ if(0) printf("extra reorg");
+ jam();
/**
* In this case we need to reorganise the page to fit. To ensure we
* don't complicate matters we make a little trick here where we
@@ -610,98 +216,49 @@
* that separately at the end. This means we need to copy it out of
* the page before reorg_page to save the entry contents.
*/
- MEMCOPY_NO_WORDS(copy_record,
- page_header->get_ptr(vp_index),
- entry_len);
- page_header->set_entry_len(vp_index, 0);
- page_header->free_space+= entry_len;
- reorg_page(page_header);
- MEMCOPY_NO_WORDS(page_header->get_free_space_ptr(),
- copy_record,
- entry_len);
- page_header->set_entry_offset(vp_index, page_header->insert_pos);
- growth_len+= entry_len;
- }
- grow_entry(regFragPtr,
- page_header,
- vp_index,
- growth_len);
- return true;
- } else {
- /**
- * It is necessary to allocate a segment from a new page.
- */
- if (req_struct->fix_var_together) {
- ljam();
- alloc_size= (entry_len + growth_len) - regTabPtr->tupheadsize;
- curr_var_len= alloc_size - regTabPtr->var_array_wsize;
- } else {
- ljam();
- curr_var_len= entry_len - regTabPtr->var_array_wsize;
- alloc_size= entry_len + growth_len;
- }
- Uint32* ptr, frag_page_id;
- Local_key key;
- if ((ptr= alloc_var_rec(regFragPtr,
- regTabPtr,
- alloc_size,
- &key, &frag_page_id)) == 0)
- {
- /**
- * No space existed for this growth. We need to abort the update.
- */
- ljam();
- terrorCode= ZMEM_NOMEM_ERROR;
- return false;
- }
-
- /*
- * I need to be careful to copy the var_len_array before freeing it.
- * The data part will be copied by copy_back_var_attr immediately
- * after returning from this method.
- * The updated var part is always in ctemp_var_record since I can
- * never arrive here after a first insert. Thus no danger of the
- * var part written being released.
- */
- MEMCOPY_NO_WORDS(ptr,
- req_struct->var_len_array,
- regTabPtr->var_array_wsize);
- req_struct->var_len_array= (Uint16*)ptr;
- if (! req_struct->fix_var_together) {
- ljam();
- /*
- * We need to deallocate the old variable part. This new one will
- * remain the variable part even if we abort the transaction.
- * We don't keep multiple references to the variable parts.
- * The copy data for abort is still kept in the copy record.
- */
- free_separate_var_part(regFragPtr, regTabPtr, req_struct->m_tuple_ptr);
- } else {
- ljam();
- req_struct->fix_var_together= false;
+ Uint32* copyBuffer= cinBuffer;
+ memcpy(copyBuffer, pageP->get_ptr(oldref.m_page_idx), 4*oldsz);
+ pageP->set_entry_len(oldref.m_page_idx, 0);
+ pageP->free_space += oldsz;
+ pageP->reorg((Var_page*)ctemp_page);
+ memcpy(pageP->get_free_space_ptr(), copyBuffer, 4*oldsz);
+ pageP->set_entry_offset(oldref.m_page_idx, pageP->insert_pos);
+ add += oldsz;
}
- page_header= (Var_page*)var_page.p;
- new_page_ref= (key.m_page_no << MAX_TUPLES_BITS) + key.m_page_idx;
- req_struct->m_tuple_ptr->m_data[regTabPtr->var_offset] = new_page_ref;
- Uint32 bits= req_struct->m_tuple_ptr->m_header_bits;
- req_struct->m_tuple_ptr->m_header_bits |= Tuple_header::CHAINED_ROW;
- req_struct->var_page_ptr= var_page;
- req_struct->vp_index= key.m_page_idx;
+ pageP->grow_entry(oldref.m_page_idx, add);
+ update_free_page_list(fragPtr, pagePtr);
+ }
+ else
+ {
+ Local_key newref;
+ Uint32 *src = pageP->get_ptr(oldref.m_page_idx);
+ Uint32 *dst = alloc_var_part(fragPtr, tabPtr, newsz, &newref);
+ if (unlikely(dst == 0))
+ return -1;
+
+ ndbassert(oldref.m_page_no != newref.m_page_no);
+ ndbassert(pageP->get_entry_len(oldref.m_page_idx) == oldsz);
+ memcpy(dst, src, 4*oldsz);
+ * ((Uint32*)ref) = newref.ref();
+
+ pageP->free_record(oldref.m_page_idx, Var_page::CHAIN);
+ update_free_page_list(fragPtr, pagePtr);
}
- return true;
+
+ return 0;
}
-#endif
/* ------------------------------------------------------------------------ */
// Get a page from one of free lists. If the desired free list is empty we
// try with the next until we have tried all possible lists.
/* ------------------------------------------------------------------------ */
-Uint32 Dbtup::get_alloc_page(Fragrecord* const frag_ptr, Uint32 alloc_size)
+Uint32
+Dbtup::get_alloc_page(Fragrecord* fragPtr, Uint32 alloc_size)
{
- Uint32 i, start_index, loop_count= 0;
- PagePtr page_ptr;
-
+ Uint32 i, start_index, loop= 0;
+ PagePtr pagePtr;
+
start_index= calculate_free_list_impl(alloc_size);
if (start_index == (MAX_FREE_LIST - 1)) {
ljam();
@@ -712,37 +269,73 @@
}
for (i= start_index; i < MAX_FREE_LIST; i++) {
ljam();
- if (frag_ptr->free_var_page_array[i] != RNIL) {
+ if (!fragPtr->free_var_page_array[i].isEmpty()) {
ljam();
- return frag_ptr->free_var_page_array[i];
+ return fragPtr->free_var_page_array[i].firstItem;
}
}
ndbrequire(start_index > 0);
i= start_index - 1;
- page_ptr.i= frag_ptr->free_var_page_array[i];
- while ((page_ptr.i != RNIL) && (loop_count++ < 16)) {
+ LocalDLList<Page> list(c_page_pool, fragPtr->free_var_page_array[i]);
+ for(list.first(pagePtr); !pagePtr.isNull() && loop < 16; )
+ {
ljam();
- ptrCheckGuard(page_ptr, cnoOfPage, cpage);
- Var_page* page_header= (Var_page*)page_ptr.p;
- if (page_header->free_space >= alloc_size) {
+ if (pagePtr.p->free_space >= alloc_size) {
ljam();
- return page_ptr.i;
+ return pagePtr.i;
}
- page_ptr.i= page_header->next_page;
+ loop++;
+ list.next(pagePtr);
}
return RNIL;
}
+Uint32
+Dbtup::get_empty_var_page(Fragrecord* fragPtr)
+{
+ PagePtr ptr;
+ LocalSLList<Page> list(c_page_pool, fragPtr->m_empty_pages);
+ if (list.remove_front(ptr))
+ {
+ return ptr.i;
+ }
+
+ Uint32 cnt;
+ allocConsPages(10, cnt, ptr.i);
+ if (unlikely(cnt == 0))
+ {
+ return RNIL;
+ }
+
+ PagePtr ret = ptr;
+ for (Uint32 i = 0; i<cnt; i++, ptr.i++)
+ {
+ c_page_pool.getPtr(ptr);
+ ptr.p->physical_page_id = ptr.i;
+ ptr.p->page_state = ZEMPTY_MM;
+ ptr.p->nextList = ptr.i + 1;
+ ptr.p->prevList = RNIL;
+ ptr.p->frag_page_id = RNIL;
+ }
+
+ if (cnt > 1)
+ {
+ ptr.p->nextList = RNIL;
+ list.add(ret.i + 1, ptr);
+ }
+
+ return ret.i;
+}
/* ------------------------------------------------------------------------ */
// Check if the page needs to go to a new free page list.
/* ------------------------------------------------------------------------ */
-void Dbtup::update_free_page_list(Fragrecord* const frag_ptr,
- Var_page* page_header)
+void Dbtup::update_free_page_list(Fragrecord* fragPtr,
+ Ptr<Page> pagePtr)
{
Uint32 free_space, list_index;
- free_space= page_header->free_space;
- list_index= page_header->list_index;
+ free_space= pagePtr.p->free_space;
+ list_index= pagePtr.p->list_index;
if ((free_space < c_min_list_size[list_index]) ||
(free_space > c_max_list_size[list_index])) {
Uint32 new_list_index= calculate_free_list_impl(free_space);
@@ -751,25 +344,30 @@
/*
* Only remove it from its list if it is in a list
*/
- remove_free_page(frag_ptr, page_header, list_index);
+ LocalDLList<Page>
+ list(c_page_pool, fragPtr->free_var_page_array[list_index]);
+ list.remove(pagePtr);
}
if (free_space < c_min_list_size[new_list_index]) {
/*
- We have not sufficient amount of free space to put it into any
- free list. Thus the page will not be available for new inserts.
- This can only happen for the free list with least guaranteed free space.
+ We have not sufficient amount of free space to put it into any
+ free list. Thus the page will not be available for new inserts.
+ This can only happen for the free list with least guaranteed
+ free space.
*/
ljam();
ndbrequire(new_list_index == 0);
- page_header->list_index= MAX_FREE_LIST;
+ pagePtr.p->list_index= MAX_FREE_LIST;
} else {
ljam();
- insert_free_page(frag_ptr, page_header, new_list_index);
+ LocalDLList<Page> list(c_page_pool,
+ fragPtr->free_var_page_array[new_list_index]);
+ list.add(pagePtr);
+ pagePtr.p->list_index = new_list_index;
}
}
}
-
/* ------------------------------------------------------------------------ */
// Given size of free space, calculate the free list to put it into
/* ------------------------------------------------------------------------ */
@@ -787,60 +385,37 @@
return 0;
}
-
-/* ------------------------------------------------------------------------ */
-// Remove a page from its current free list
-/* ------------------------------------------------------------------------ */
-void Dbtup::remove_free_page(Fragrecord* frag_ptr,
- Var_page* page_header,
- Uint32 index)
-{
- Var_page* tmp_page_header;
- if (page_header->prev_page == RNIL) {
- ljam();
- ndbassert(index < MAX_FREE_LIST);
- frag_ptr->free_var_page_array[index]= page_header->next_page;
- } else {
- ljam();
- PagePtr prev_page_ptr;
- prev_page_ptr.i= page_header->prev_page;
- ptrCheckGuard(prev_page_ptr, cnoOfPage, cpage);
- tmp_page_header= (Var_page*)prev_page_ptr.p;
- tmp_page_header->next_page= page_header->next_page;
- }
- if (page_header->next_page != RNIL) {
- ljam();
- PagePtr next_page_ptr;
- next_page_ptr.i= page_header->next_page;
- ptrCheckGuard(next_page_ptr, cnoOfPage, cpage);
- tmp_page_header= (Var_page*) next_page_ptr.p;
- tmp_page_header->prev_page= page_header->prev_page;
+Uint32*
+Dbtup::alloc_var_rowid(Fragrecord* fragPtr,
+ Tablerec* tabPtr,
+ Uint32 alloc_size,
+ Local_key* key,
+ Uint32 * out_frag_page_id)
+{
+ tabPtr->m_offsets[MM].m_fix_header_size += Tuple_header::HeaderSize + 1;
+ Uint32 *ptr = alloc_fix_rowid(fragPtr, tabPtr, key, out_frag_page_id);
+ tabPtr->m_offsets[MM].m_fix_header_size -= Tuple_header::HeaderSize + 1;
+ if (unlikely(ptr == 0))
+ {
+ return 0;
}
-}
+ ndbassert(alloc_size >= tabPtr->m_offsets[MM].m_fix_header_size +
+ Tuple_header::HeaderSize);
+
+ alloc_size -= tabPtr->m_offsets[MM].m_fix_header_size +
+ Tuple_header::HeaderSize;
-/* ------------------------------------------------------------------------ */
-// Insert a page into a free list on the fragment
-/* ------------------------------------------------------------------------ */
-void Dbtup::insert_free_page(Fragrecord* frag_ptr,
- Var_page* page_header,
- Uint32 index)
-{
- Var_page* tmp_page_header;
- Uint32 current_head= frag_ptr->free_var_page_array[index];
- Uint32 pagePtrI = page_header->physical_page_id;
- page_header->next_page= current_head;
- ndbassert(index < MAX_FREE_LIST);
- frag_ptr->free_var_page_array[index]= pagePtrI;
- page_header->prev_page= RNIL;
- page_header->list_index= index;
- if (current_head != RNIL) {
- ljam();
- PagePtr head_page_ptr;
- head_page_ptr.i= current_head;
- ptrCheckGuard(head_page_ptr, cnoOfPage, cpage);
- tmp_page_header= (Var_page*)head_page_ptr.p;
- tmp_page_header->prev_page= pagePtrI;
+ Local_key varref;
+ if (likely(alloc_var_part(fragPtr, tabPtr, alloc_size, &varref) != 0))
+ {
+ Tuple_header* tuple = (Tuple_header*)ptr;
+ * tuple->get_var_part_ptr(tabPtr) = varref.ref();
+ return ptr;
}
+
+ PagePtr pagePtr;
+ c_page_pool.getPtr(pagePtr, key->m_page_no);
+ free_fix_rec(fragPtr, tabPtr, key, (Fix_page*)pagePtr.p);
+ return 0;
}
-
--- 1.1/storage/ndb/src/kernel/blocks/dbtup/Undo_buffer.cpp 2005-11-07 12:19:13 +01:00
+++ 1.2/storage/ndb/src/kernel/blocks/dbtup/Undo_buffer.cpp 2005-11-04 08:03:47 +01:00
@@ -50,39 +50,34 @@
m_tup->allocConsPages(1, count, m_first_free);
if(count == 0)
return 0;
- page= (UndoPage*)(m_tup->cpage+m_first_free);
+ page= (UndoPage*)m_tup->c_page_pool.getPtr(m_first_free);
page->m_state= ~ZFREE_COMMON;
page->m_words_used= 0;
page->m_ref_count= 0;
}
- if(m_first_free < m_tup->cnoOfPage)
+ page= (UndoPage*)m_tup->c_page_pool.getPtr(m_first_free);
+
+ Uint32 pos= page->m_words_used;
+ if(words + pos > UndoPage::DATA_WORDS)
{
- page= (UndoPage*)(m_tup->cpage+m_first_free);
-
- Uint32 pos= page->m_words_used;
- if(words + pos > UndoPage::DATA_WORDS)
- {
- m_first_free= RNIL;
- return alloc_copy_tuple(dst, words);
- }
-
- dst->m_page_no = m_first_free;
- dst->m_page_idx = pos;
-
- page->m_ref_count++;
- page->m_words_used = pos + words;
- return page->m_data + pos;
+ m_first_free= RNIL;
+ return alloc_copy_tuple(dst, words);
}
- assert(false);
- return 0;
+
+ dst->m_page_no = m_first_free;
+ dst->m_page_idx = pos;
+
+ page->m_ref_count++;
+ page->m_words_used = pos + words;
+ return page->m_data + pos;
}
void
Undo_buffer::shrink_copy_tuple(Local_key* key, Uint32 words)
{
assert(key->m_page_no == m_first_free);
- UndoPage* page= (UndoPage*)(m_tup->cpage+key->m_page_no);
+ UndoPage* page= (UndoPage*)m_tup->c_page_pool.getPtr(key->m_page_no);
assert(page->m_words_used >= words);
page->m_words_used -= words;
}
@@ -90,7 +85,7 @@
void
Undo_buffer::free_copy_tuple(Local_key* key)
{
- UndoPage* page= (UndoPage*)(m_tup->cpage+key->m_page_no);
+ UndoPage* page= (UndoPage*)m_tup->c_page_pool.getPtr(key->m_page_no);
Uint32 cnt= page->m_ref_count;
assert(cnt);
@@ -115,6 +110,6 @@
Uint32 *
Undo_buffer::get_ptr(Local_key* key)
{
- return ((UndoPage*)(m_tup->cpage+key->m_page_no))->m_data+key->m_page_idx;
+ return
((UndoPage*)(m_tup->c_page_pool.getPtr(key->m_page_no)))->m_data+key->m_page_idx;
}
--- 1.1/storage/ndb/src/kernel/blocks/dbtup/test_varpage.cpp 2005-11-07 12:19:13 +01:00
+++ 1.2/storage/ndb/src/kernel/blocks/dbtup/test_varpage.cpp 2005-11-01 11:50:11 +01:00
@@ -9,14 +9,21 @@
Uint32* data;
};
+NdbOut&
+operator <<(NdbOut& out, const Record& rec)
+{
+ out << "[ idx: " << rec.idx << " sz: " << rec.size << "
]";
+ return out;
+}
+
#define TRACE(x) x
static
-void
+bool
cmp(const Uint32 *p1, const Uint32 *p2, Uint32 words)
{
if(memcmp(p1, p2, 4*words) == 0)
- return;
+ return true;
for(Uint32 i = 0; i<words; i++)
printf(" %.8x", p1[i]);
@@ -26,13 +33,20 @@
printf(" %.8x", p2[i]);
printf("\n");
- abort();
+ return false;
}
static
void
-do_test(int loops, int dist[3])
+do_test(int loops, int dist[5])
{
+ fprintf(stderr, "do_test(%d, [ %d %d %d %d %d ])\n",
+ loops,
+ dist[0],
+ dist[1],
+ dist[2],
+ dist[3],
+ dist[4]);
int allocated= 0;
Record records[8192];
@@ -41,24 +55,39 @@
for(int i = 0; i<loops; i++)
{
+ assert(page.high_index + page.insert_pos <= page.DATA_WORDS);
+
for(int j = 0; j<allocated; j++)
{
Record rec= records[j];
Uint32* ptr= page.get_ptr(rec.idx);
- cmp(ptr, rec.data, rec.size);
+ Uint32 pos = page.get_ptr(rec.idx) - page.m_data;
+ if (page.get_entry_len(rec.idx) != rec.size)
+ {
+ ndbout << "INVALID LEN " << j << " " << rec << " pos: "
<< pos << endl;
+ ndbout << page << endl;
+ abort();
+ }
+
+ if(!cmp(ptr, rec.data, rec.size))
+ {
+ ndbout << "FAILED " << j << " " << rec << " pos: "
<< pos << endl;
+ ndbout << page << endl;
+ abort();
+ }
}
loop:
int op;
int rnd= rand() % 100;
- for(op= 0; op<3; op++)
+ for(op= 0; op<5; op++)
if(rnd < dist[op])
break;
if(allocated == 0)
op= 0;
if(page.free_space <= 2 && op == 0) goto loop;
-
+
switch(op){
case 0: // Alloc
{
@@ -69,9 +98,73 @@
{
rec.data[i] = rand();
}
- ndbout << "Alloc " << rec.size << flush;
- rec.idx= page.alloc_record(rec.size, &tmp, 0);
- ndbout << " -> " << rec.idx << endl;
+ ndbout << "Alloc hi: " << page.high_index << " (" <<
+ ((rnd < 30) ? "any" :
+ (rnd < 60) ? "dir" :
+ (rnd < 80) ? "exp" : "fail") << ") ";
+ ndbout << rec.size << flush;
+ if (rnd < 30)
+ {
+ rec.idx= page.alloc_record(rec.size, &tmp, 0);
+ }
+ else if (rnd < 60)
+ {
+ // Alloc with id, from directory
+ Vector<Uint32> free;
+ for(Uint32 i = page.high_index - 1; i > 0; i--)
+ {
+ if (page.get_index_word(i) & page.FREE)
+ {
+ free.push_back(i);
+ if (free.size() > 100)
+ break;
+ }
+ }
+ if (free.size())
+ {
+ rec.idx = free[rand() % free.size()];
+ if (page.alloc_record(rec.idx, rec.size, &tmp) != rec.idx)
+ {
+ abort();
+ }
+ }
+ else
+ {
+ rec.idx = page.high_index;
+ if (page.alloc_record(rec.idx, rec.size, &tmp) != rec.idx)
+ {
+ if (rec.size + 1 != page.free_space)
+ abort();
+ delete [] rec.data;
+ ndbout_c(" FAIL");
+ break;
+ }
+ }
+ }
+ else if(rnd < 80)
+ {
+ // Alloc with id, outside of directory
+ rec.idx = page.high_index + (rand() % (page.free_space - rec.size));
+ if (page.alloc_record(rec.idx, rec.size, &tmp) != rec.idx)
+ {
+ abort();
+ }
+ }
+ else
+ {
+ rec.idx = page.high_index + (page.free_space - rec.size) + 1;
+ if (page.alloc_record(rec.idx, rec.size, &tmp) == rec.idx)
+ {
+ abort();
+ }
+ delete [] rec.data;
+ ndbout_c(" FAIL");
+ break;
+ }
+
+ Uint32 pos = page.get_ptr(rec.idx) - page.m_data;
+ ndbout << " -> " << rec.idx
+ << " pos: " << pos << endl;
Uint32* ptr= page.get_ptr(rec.idx);
memcpy(ptr, rec.data, 4*rec.size);
records[allocated++] = rec;
@@ -81,12 +174,14 @@
{
int no= rand() % allocated;
Record rec= records[no];
- ndbout << "Free no: " << no << " idx: " << rec.idx <<
endl;
+ Uint32 pos = page.get_ptr(rec.idx) - page.m_data;
+ ndbout << "Free hi: " << page.high_index << " no: " << no
<< " idx: " << rec.idx << " pos: " << pos << endl;
Uint32* ptr= page.get_ptr(rec.idx);
+ assert(page.get_entry_len(rec.idx) == rec.size);
cmp(ptr, rec.data, rec.size);
delete[] rec.data;
page.free_record(rec.idx, 0);
-
+
for (unsigned k = no; k + 1 < allocated; k++)
records[k] = records[k+1];
allocated--;
@@ -98,8 +193,57 @@
page.reorg(&tmp);
break;
case 3:
- ndbout << "Expand" << endl;
-
+ {
+ Uint32 free = page.free_space;
+ if (free <= 2)
+ {
+ goto shrink;
+ }
+ free /= 2;
+ int no = rand() % allocated;
+ Record rec= records[no];
+ ndbout << "Expand no: " << no << " idx: " << rec.idx
+ << " add: " << free << " reorg: "
+ << !page.is_space_behind_entry(rec.idx, free)
+ << endl;
+ if (!page.is_space_behind_entry(rec.idx, free))
+ {
+ Uint32 buffer[8192];
+ Uint32 len = page.get_entry_len(rec.idx);
+ memcpy(buffer, page.get_ptr(rec.idx), 4*len);
+ page.set_entry_len(rec.idx, 0);
+ page.free_space += len;
+ page.reorg(&tmp);
+ memcpy(page.get_free_space_ptr(), buffer, 4*len);
+ page.set_entry_offset(rec.idx, page.insert_pos);
+ free += len;
+ records[no].size = 0;
+ }
+
+ page.grow_entry(rec.idx, free);
+ records[no].size += free;
+ Uint32 *ptr = page.get_ptr(rec.idx);
+ Uint32 *new_data = new Uint32[records[no].size];
+ for(Uint32 i= 0; i<records[no].size; i++)
+ {
+ ptr[i] = new_data[i] = rand();
+ }
+ delete []rec.data;
+ records[no].data = new_data;
+ break;
+ }
+ case 4:
+ {
+ shrink:
+ int no = rand() % allocated;
+ Record rec = records[no];
+ Uint32 sz = rec.size / 2 + 1;
+ ndbout << "Shrink no: " << no << " idx: " << rec.idx
<< " remove: "
+ << (rec.size - sz) << endl;
+ page.shrink_entry(rec.idx, sz);
+ records[no].size = sz;
+ break;
+ }
}
}
@@ -107,19 +251,27 @@
}
int
-main(void)
+main(int argc, char **argv)
{
ndb_init();
+
+ if (argc > 1)
+ {
+ time_t seed = time(0);
+ srand(seed);
+ fprintf(stderr, "srand(%d)\n", seed);
+ }
+ // alloc, free, reorg, grow, shrink
- int t1[] = { 30, 90, 100 };
- int t2[] = { 45, 90, 100 };
- int t3[] = { 60, 90, 100 };
- int t4[] = { 75, 90, 100 };
+ int t1[] = { 10, 60, 70, 85, 100 };
+ int t2[] = { 30, 60, 70, 85, 100 };
+ int t3[] = { 50, 60, 70, 85, 100 };
do_test(10000, t1);
do_test(10000, t2);
do_test(10000, t3);
- do_test(10000, t4);
+
+ return 0;
}
template class Vector<Record>;
--- 1.1/storage/ndb/src/kernel/blocks/dbtup/tuppage.cpp 2005-11-07 12:19:13 +01:00
+++ 1.2/storage/ndb/src/kernel/blocks/dbtup/tuppage.cpp 2005-11-04 14:35:51 +01:00
@@ -18,6 +18,20 @@
#include "tuppage.hpp"
#include "Dbtup.hpp"
+/**
+ * Fix pages maintain a double linked list of free entries
+ *
+ * Var pages has a directory where each entry is
+ * [ C(1), F(1), L(15), P(15) ]
+ * C is chain bit, (is it a full tuple or just chain)
+ * F is free bit
+ * If true, L is prev free entry (in directory)
+ * P is next free entry (in directory)
+ * else
+ * L is len of entry
+ * P is pos of entry
+ */
+
Uint32
Tup_fixsize_page::alloc_record()
{
@@ -116,11 +130,123 @@
free_space= DATA_WORDS - 1;
high_index= 1;
insert_pos= 0;
- next_free_index= 0xFFFF;
+ next_free_index= END_OF_FREE_LIST;
m_page_header.m_page_type = File_formats::PT_Tup_varsize_page;
}
Uint32
+Tup_varsize_page::alloc_record(Uint32 page_idx, Uint32 alloc_size,
+ Tup_varsize_page* temp)
+{
+ assert(page_idx); // 0 is not allowed
+ Uint32 free = free_space;
+ Uint32 largest_size= DATA_WORDS - (insert_pos + high_index);
+ Uint32 free_list = next_free_index;
+
+ if (page_idx < high_index)
+ {
+ Uint32 *ptr = get_index_ptr(page_idx);
+ Uint32 word = *ptr;
+
+ if (unlikely((free < alloc_size) || ! (word & FREE)))
+ {
+ return ~0;
+ }
+
+ if (alloc_size >= largest_size)
+ {
+ /*
+ We can't fit this segment between the insert position and the end of
+ the index entries. We will pack the page so that all free space
+ exists between the insert position and the end of the index entries.
+ */
+ reorg(temp);
+ }
+
+ Uint32 next = (word & NEXT_MASK) >> NEXT_SHIFT;
+ Uint32 prev = (word & PREV_MASK) >> PREV_SHIFT;
+
+ if (next != END_OF_FREE_LIST)
+ {
+ Uint32 * next_ptr = get_index_ptr(next);
+ Uint32 next_word = * next_ptr;
+ * next_ptr = (next_word & ~PREV_MASK) | (prev << PREV_SHIFT);
+ }
+
+ if (prev != END_OF_FREE_LIST)
+ {
+ Uint32 * prev_ptr = get_index_ptr(prev);
+ Uint32 prev_word = * prev_ptr;
+ * prev_ptr = (prev_word & ~NEXT_MASK) | (next << NEXT_SHIFT);
+ }
+ else
+ {
+ assert(next_free_index == page_idx);
+ next_free_index = next;
+ }
+
+ * ptr = insert_pos + (alloc_size << LEN_SHIFT);
+ free -= alloc_size;
+ }
+ else
+ {
+ /**
+ * We need to expand directory
+ */
+ Uint32 hi = high_index;
+ Uint32 expand = (page_idx + 1 - hi);
+ Uint32 size = alloc_size + expand;
+ if (unlikely(size > free))
+ {
+ return ~0;
+ }
+
+ if (size >= largest_size)
+ {
+ /*
+ We can't fit this segment between the insert position and the end of
+ the index entries. We will pack the page so that all free space
+ exists between the insert position and the end of the index entries.
+ */
+ reorg(temp);
+ }
+
+ Uint32 *ptr = m_data + DATA_WORDS - hi;
+ if (page_idx == hi)
+ {
+ * ptr = insert_pos + (alloc_size << LEN_SHIFT);
+ }
+ else
+ {
+ if (free_list != END_OF_FREE_LIST)
+ {
+ Uint32 * prev_ptr = get_index_ptr(free_list);
+ Uint32 prev_word = * prev_ptr;
+ * prev_ptr = (prev_word & ~PREV_MASK) | (hi << PREV_SHIFT);
+ }
+
+ for (; hi < page_idx;)
+ {
+ * ptr-- = FREE | (free_list << NEXT_SHIFT) | ((hi+1) << PREV_SHIFT);
+ free_list = hi++;
+ }
+
+ * ptr++ = insert_pos + (alloc_size << LEN_SHIFT);
+ * ptr = ((* ptr) & ~PREV_MASK) | (END_OF_FREE_LIST << PREV_SHIFT);
+
+ next_free_index = hi - 1;
+ }
+ high_index = hi + 1;
+ free -= size;
+ }
+
+ free_space = free;
+ insert_pos += alloc_size;
+
+ return page_idx;
+}
+
+Uint32
Tup_varsize_page::alloc_record(Uint32 alloc_size,
Tup_varsize_page* temp, Uint32 chain)
{
@@ -138,7 +264,7 @@
assert(largest_size > alloc_size);
Uint32 page_idx;
- if (next_free_index == 0xFFFF) {
+ if (next_free_index == END_OF_FREE_LIST) {
/*
We are out of free index slots. We will extend the array of free
slots
@@ -148,12 +274,21 @@
} else {
// Pick an empty slot among the index entries
page_idx= next_free_index;
- assert((get_index_word(page_idx) & 0xFFFF0000) == 0);
- next_free_index= get_index_word(page_idx);
+ assert((get_index_word(page_idx) & FREE) == FREE);
+ assert(((get_index_word(page_idx) & PREV_MASK) >> PREV_SHIFT) ==
+ END_OF_FREE_LIST);
+ next_free_index= (get_index_word(page_idx) & NEXT_MASK) >> NEXT_SHIFT;
+ assert(next_free_index);
+ if (next_free_index != END_OF_FREE_LIST)
+ {
+ Uint32 *ptr = get_index_ptr(next_free_index);
+ Uint32 word = *ptr;
+ * ptr = (word & ~PREV_MASK) | (END_OF_FREE_LIST << PREV_SHIFT);
+ }
}
assert(chain == 0 || chain == CHAIN);
- * get_index_ptr(page_idx) = insert_pos + ((chain + alloc_size) << 16);
+ * get_index_ptr(page_idx) = insert_pos + chain + (alloc_size << LEN_SHIFT);
insert_pos += alloc_size;
free_space -= alloc_size;
@@ -167,10 +302,10 @@
//ndbout_c("%p->free_record(%d%s)", this, page_idx, (chain ? " CHAIN": ""));
Uint32 *index_ptr= get_index_ptr(page_idx);
Uint32 index_word= * index_ptr;
- Uint32 entry_pos= index_word & 0xFFFF;
- Uint32 entry_len= (index_word >> 16) & ~CHAIN;
+ Uint32 entry_pos= (index_word & POS_MASK) >> POS_SHIFT;
+ Uint32 entry_len= (index_word & LEN_MASK) >> LEN_SHIFT;
assert(chain == 0 || chain == CHAIN);
- assert(!(((index_word >> 16) ^ chain) & 0x8000));
+ assert((index_word & CHAIN) == chain);
#ifdef VM_TRACE
memset(m_data + entry_pos, 0xF2, 4*entry_len);
#endif
@@ -183,8 +318,16 @@
*/
rebuild_index(index_ptr);
} else {
- * index_ptr= next_free_index;
+ if (next_free_index != END_OF_FREE_LIST)
+ {
+ Uint32 *ptr = get_index_ptr(next_free_index);
+ Uint32 word = *ptr;
+ assert(((word & PREV_MASK) >> PREV_SHIFT) == END_OF_FREE_LIST);
+ * ptr = (word & ~PREV_MASK) | (page_idx << PREV_SHIFT);
+ }
+ * index_ptr= FREE | next_free_index | (END_OF_FREE_LIST << PREV_SHIFT);
next_free_index= page_idx;
+ assert(next_free_index);
}
free_space+= entry_len;
@@ -204,7 +347,7 @@
* Scan until you find first non empty index pos
*/
for(index_ptr++; index_ptr < end; index_ptr++)
- if((* index_ptr >> 16) == 0)
+ if((* index_ptr) & FREE)
empty++;
else
break;
@@ -214,23 +357,30 @@
// Totally free page
high_index = 1;
free_space += empty;
- next_free_index= 0xFFFF;
+ next_free_index = END_OF_FREE_LIST;
return;
}
-
- Uint32 next= 0xFFFF;
- high_index -= empty;
+
+ Uint32 next= END_OF_FREE_LIST;
+ Uint32 dummy;
+ Uint32 *prev_ptr = &dummy;
for(index_ptr++; index_ptr < end; index_ptr++)
{
- if((* index_ptr >> 16) == 0)
+ if ((* index_ptr) & FREE)
{
- * index_ptr= next;
+ * index_ptr= FREE | next;
next= (end - index_ptr);
+ * prev_ptr |= (next << PREV_SHIFT);
+ prev_ptr = index_ptr;
}
}
+ * prev_ptr |= (END_OF_FREE_LIST << PREV_SHIFT);
+
+ high_index -= empty;
free_space += empty;
next_free_index= next;
+ assert(next_free_index);
}
void
@@ -247,16 +397,17 @@
for (; index_ptr < end_of_page; index_ptr++)
{
Uint32 index_word= * index_ptr;
- Uint32 entry_len= (index_word >> 16) & ~CHAIN;
- if (entry_len != 0) {
+ Uint32 entry_len= (index_word & LEN_MASK) >> LEN_SHIFT;
+ if (!(index_word & FREE) && entry_len)
+ {
/*
We found an index item that needs to be packed.
We will update the index entry and copy the data to the page.
*/
- Uint32 entry_pos= index_word & 0xffff;
+ Uint32 entry_pos= (index_word & POS_MASK) >> POS_SHIFT;
assert(entry_pos + entry_len <= old_insert_pos);
assert(new_insert_pos + entry_len <= old_insert_pos);
- * index_ptr= new_insert_pos + (index_word & 0xFFFF0000);
+ * index_ptr= (new_insert_pos << POS_SHIFT) + (index_word & ~POS_MASK);
memcpy(m_data+new_insert_pos, copy_page->m_data+entry_pos, 4*entry_len);
new_insert_pos += entry_len;
@@ -278,10 +429,10 @@
for(Uint32 i = 1; i<page.high_index; i++, index_ptr--)
{
out << " [ " << i;
- if(*index_ptr >> 16)
- out << " pos: " << ((*index_ptr) & 0xFFFF)
- << " len: " << ((*index_ptr >> 16) & ~page.CHAIN)
- << (((* index_ptr >> 16) & page.CHAIN) ? " CHAIN " : " ")
+ if(! (*index_ptr & page.FREE))
+ out << " pos: " << ((* index_ptr & page.POS_MASK) >>
page.POS_SHIFT)
+ << " len: " << ((* index_ptr & page.LEN_MASK) >> page.LEN_SHIFT)
+ << ((* index_ptr & page.CHAIN) ? " CHAIN " : " ")
<< "]" << flush;
else
out << " FREE ]" << flush;
@@ -289,10 +440,10 @@
out << " free list: " << flush;
Uint32 next= page.next_free_index;
- while(next != 0xFFFF)
+ while(next != page.END_OF_FREE_LIST)
{
out << next << " " << flush;
- next= * (page.m_data+page.DATA_WORDS-next);
+ next= ((* (page.m_data+page.DATA_WORDS-next)) & page.NEXT_MASK) >>
page.NEXT_SHIFT;
}
out << "]";
return out;
--- 1.1/storage/ndb/src/kernel/blocks/dbtup/tuppage.hpp 2005-11-07 12:19:13 +01:00
+++ 1.2/storage/ndb/src/kernel/blocks/dbtup/tuppage.hpp 2005-10-31 16:23:57 +01:00
@@ -127,7 +127,18 @@
Uint32 unused_ph[7];
STATIC_CONST( DATA_WORDS = File_formats::NDB_PAGE_SIZE_WORDS - 32 );
- STATIC_CONST( CHAIN = 0x8000 );
+ STATIC_CONST( CHAIN = 0x80000000 );
+ STATIC_CONST( FREE = 0x40000000 );
+ STATIC_CONST( LEN_MASK = 0x3FFF8000 );
+ STATIC_CONST( POS_MASK = 0x00007FFF );
+ STATIC_CONST( LEN_SHIFT = 15 );
+ STATIC_CONST( POS_SHIFT = 0 );
+ STATIC_CONST( END_OF_FREE_LIST = POS_MASK );
+
+ STATIC_CONST( NEXT_MASK = POS_MASK );
+ STATIC_CONST( NEXT_SHIFT = POS_SHIFT );
+ STATIC_CONST( PREV_MASK = LEN_MASK );
+ STATIC_CONST( PREV_SHIFT = LEN_SHIFT );
Uint32 m_data[DATA_WORDS];
@@ -156,6 +167,12 @@
* temp is used when having to reorg page before allocating
*/
Uint32 alloc_record(Uint32 size, Tup_varsize_page* temp, Uint32 chain);
+
+ /**
+ * Alloc page_idx from page, return page_idx
+ * temp is used when having to reorg page before allocating
+ */
+ Uint32 alloc_record(Uint32 page_idx, Uint32 size, Tup_varsize_page* temp);
/**
* Free record from page
@@ -170,8 +187,8 @@
*/
bool is_space_behind_entry(Uint32 page_index, Uint32 growth_len) const {
Uint32 idx= get_index_word(page_index);
- Uint32 pos= idx & 0xFFFF;
- Uint32 len= (idx >> 16) & ~CHAIN;
+ Uint32 pos= (idx & POS_MASK) >> POS_SHIFT;
+ Uint32 len= (idx & LEN_MASK) >> LEN_SHIFT;
if ((pos + len == insert_pos) &&
(insert_pos + growth_len < DATA_WORDS - high_index))
return true;
@@ -180,12 +197,14 @@
void grow_entry(Uint32 page_index, Uint32 growth_len) {
assert(free_space >= growth_len);
-
+
Uint32 *pos= get_index_ptr(page_index);
Uint32 idx= *pos;
- Uint32 size= (idx >> 16) + growth_len;
- *pos= (idx & 0xFFFF) + (size << 16);
- assert((idx & 0xFFFF) + ((idx >> 16) & ~CHAIN) == insert_pos);
+ assert(! (idx & FREE));
+ assert((((idx & POS_MASK) >> POS_SHIFT) + ((idx & LEN_MASK) >>
LEN_SHIFT))
+ == insert_pos);
+
+ * pos= idx + (growth_len << LEN_SHIFT);
insert_pos+= growth_len;
free_space-= growth_len;
}
@@ -193,35 +212,42 @@
void shrink_entry(Uint32 page_index, Uint32 new_size){
Uint32 *pos= get_index_ptr(page_index);
Uint32 idx= *pos;
- *pos= (idx & (CHAIN << 16 | 0xFFFF)) + (new_size << 16);
- Uint32 old_size= (idx >> 16) & ~CHAIN;
-
+ Uint32 old_pos = (idx & POS_MASK) >> POS_SHIFT;
+ Uint32 old_size = (idx & LEN_MASK) >> LEN_SHIFT;
+
+ assert( ! (idx & FREE));
assert(old_size >= new_size);
+
+ * pos= (idx & ~LEN_MASK) + (new_size << LEN_SHIFT);
Uint32 shrink = old_size - new_size;
#ifdef VM_TRACE
- memset(m_data + (idx & 0xFFFF) + new_size, 0xF1, 4 * shrink);
+ memset(m_data + old_pos + new_size, 0xF1, 4 * shrink);
#endif
free_space+= shrink;
- if(insert_pos == ((idx & 0xFFFF) + old_size))
+ if(insert_pos == (old_pos + old_size))
insert_pos -= shrink;
}
Uint32* get_ptr(Uint32 page_idx) {
- return m_data + (get_index_word(page_idx) & 0xFFFF);
+ return m_data + ((get_index_word(page_idx) & POS_MASK) >> POS_SHIFT);
}
void set_entry_offset(Uint32 page_idx, Uint32 offset){
Uint32 *pos= get_index_ptr(page_idx);
- *pos = (* pos & 0xFFFF0000) + offset;
+ * pos = (* pos & ~POS_MASK) + (offset << POS_SHIFT);
}
+ void set_entry_len(Uint32 page_idx, Uint32 len) {
+ Uint32 *pos= get_index_ptr(page_idx);
+ * pos = (*pos & ~LEN_MASK) + (len << LEN_SHIFT);
+ }
+
Uint32 get_entry_len(Uint32 page_idx) const {
- return get_index_word(page_idx) >> 16;
+ return (get_index_word(page_idx) & LEN_MASK) >> LEN_SHIFT;
}
- void set_entry_len(Uint32 page_idx, Uint32 len) {
- Uint32 *pos= get_index_ptr(page_idx);
- *pos = (len << 16) + (*pos & (CHAIN << 16 | 0xFFFF));
+ Uint32 get_entry_chain(Uint32 page_idx) const {
+ return get_index_word(page_idx) & CHAIN;
}
};
--- 1.1/storage/ndb/src/kernel/blocks/restore.cpp 2005-11-07 12:19:15 +01:00
+++ 1.3/storage/ndb/src/kernel/blocks/restore.cpp 2005-11-21 12:19:05 +01:00
@@ -859,16 +859,31 @@
}
}
- if(lcp && disk)
+ if(lcp)
{
- c.m_id = AttributeHeader::DISK_REF;
- c.m_size = 2;
- c.m_nulloffset = 0;
- c.m_flags = 0;
- if(!columns.append(_align, sizeof(Column)/sizeof(Uint32)))
+ if (disk)
{
- parse_error(signal, file_ptr, __LINE__, 0);
- return;
+ c.m_id = AttributeHeader::DISK_REF;
+ c.m_size = 2;
+ c.m_nulloffset = 0;
+ c.m_flags = 0;
+ if(!columns.append(_align, sizeof(Column)/sizeof(Uint32)))
+ {
+ parse_error(signal, file_ptr, __LINE__, 0);
+ return;
+ }
+ }
+
+ {
+ c.m_id = AttributeHeader::ROWID;
+ c.m_size = 2;
+ c.m_nulloffset = 0;
+ c.m_flags = 0;
+ if(!columns.append(_align, sizeof(Column)/sizeof(Uint32)))
+ {
+ parse_error(signal, file_ptr, __LINE__, 0);
+ return;
+ }
}
}
@@ -943,13 +958,21 @@
//if (file_ptr.p->m_table_id >= 2) { ndbout << "*** "; columns.first(it);
while (!it.isNull()) { _align[0] = *it.data; columns.next(it); _align[1] = *it.data;
columns.next(it); ndbout << c << " "; } ndbout << endl; }
- Uint32 column_no = 0;
+ Local_key rowid;
columns.first(it);
while(!it.isNull())
{
_align[0] = *it.data; ndbrequire(columns.next(it));
_align[1] = *it.data; columns.next(it);
+ if (c.m_id == AttributeHeader::ROWID)
+ {
+ rowid.m_page_no = data[0];
+ rowid.m_page_idx = data[1];
+ data += 2;
+ continue;
+ }
+
if (! (c.m_flags & Column::COL_VAR) &&
! c.m_nulloffset)
{
@@ -968,8 +991,6 @@
//if (file_ptr.p->m_table_id >= 2) ndbout << "1: " << c.m_id
<< " " << c.m_size << " col=" << column_no << endl;
}
- column_no++;
-
if(c.m_flags & Column::COL_DISK)
disk= true;
}
@@ -1040,7 +1061,7 @@
LqhKeyReq::setSameClientAndTcFlag(tmp, 0);
LqhKeyReq::setAIInLqhKeyReq(tmp, 0);
LqhKeyReq::setNoDiskFlag(tmp, disk ? 0 : 1);
- //LqhKeyReq::setExecuteDirectFlag(tmp, 1);
+ LqhKeyReq::setRowidFlag(tmp, 1);
req->clientConnectPtr = file_ptr.i;
req->hashValue = hashValue;
req->requestInfo = tmp;
@@ -1053,9 +1074,12 @@
req->transId2 = 0;
req->scanInfo = 0;
memcpy(req->variableData, key_start, 16);
-
+ Uint32 pos = keyLen > 4 ? 4 : keyLen;
+ req->variableData[pos++] = rowid.m_page_no;
+ req->variableData[pos++] = rowid.m_page_idx;
file_ptr.p->m_outstanding_operations++;
- EXECUTE_DIRECT(DBLQH, GSN_LQHKEYREQ, signal, 11+(keyLen > 4 ? 4 : keyLen));
+ EXECUTE_DIRECT(DBLQH, GSN_LQHKEYREQ, signal,
+ LqhKeyReq::FixedSignalLength+pos);
if(keyLen > 4)
{
--- 1.4/storage/ndb/include/ndb_version.h.in 2005-07-19 20:40:16 +02:00
+++ 1.5/storage/ndb/include/ndb_version.h.in 2005-11-03 14:55:34 +01:00
@@ -57,5 +57,10 @@
*/
/*#define NDB_VERSION_ID 0*/
+/**
+ * From which version do we support rowid
+ */
+#define NDBD_ROWID_VERSION (MAKE_VERSION(5,1,99))
+
#endif
--- 1.12/storage/ndb/include/kernel/AttributeHeader.hpp 2005-11-07 12:19:04 +01:00
+++ 1.13/storage/ndb/include/kernel/AttributeHeader.hpp 2005-11-05 00:00:24 +01:00
@@ -43,6 +43,7 @@
STATIC_CONST( RECORDS_IN_RANGE = 0xFFF8 );
STATIC_CONST( DISK_REF = 0xFFF7 );
+ STATIC_CONST( ROWID = 0xFFF6 );
// NOTE: in 5.1 ctors and init take size in bytes
--- 1.3/storage/ndb/include/kernel/signaldata/LqhKey.hpp 2005-11-07 12:19:05 +01:00
+++ 1.4/storage/ndb/include/kernel/signaldata/LqhKey.hpp 2005-11-03 13:11:46 +01:00
@@ -127,6 +127,9 @@
static void setApplicationAddressFlag(UintR & requestInfo, UintR val);
static void setMarkerFlag(UintR & requestInfo, UintR val);
static void setNoDiskFlag(UintR & requestInfo, UintR val);
+
+ static UintR getRowidFlag(const UintR & requestInfo);
+ static void setRowidFlag(UintR & requestInfo, UintR val);
};
/**
@@ -146,11 +149,11 @@
* u = Read Len Return Ind - 1 Bit (28)
* m = Commit ack marker - 1 Bit (29)
* x = No disk usage - 1 Bit (30)
- * - = Unused - 2 Bit (31)
+ * z = Use rowid for insert - 1 Bit (31)
*
* 1111111111222222222233
* 01234567890123456789012345678901
- * kkkkkkkkkklltttpdisooorraaacumx-
+ * kkkkkkkkkklltttpdisooorraaacumxz
*/
#define RI_KEYLEN_SHIFT (0)
@@ -173,6 +176,7 @@
#define RI_RETURN_AI_SHIFT (28)
#define RI_MARKER_SHIFT (29)
#define RI_NODISK_SHIFT (30)
+#define RI_ROWID_SHIFT (31)
/**
* Scan Info
@@ -480,6 +484,19 @@
UintR
LqhKeyReq::getNoDiskFlag(const UintR & requestInfo){
return (requestInfo >> RI_NODISK_SHIFT) & 1;
+}
+
+inline
+void
+LqhKeyReq::setRowidFlag(UintR & requestInfo, UintR val){
+ ASSERT_BOOL(val, "LqhKeyReq::setRowidFlag");
+ requestInfo |= (val << RI_ROWID_SHIFT);
+}
+
+inline
+UintR
+LqhKeyReq::getRowidFlag(const UintR & requestInfo){
+ return (requestInfo >> RI_ROWID_SHIFT) & 1;
}
class LqhKeyConf {
--- 1.4/storage/ndb/include/kernel/signaldata/TupKey.hpp 2005-11-07 12:19:05 +01:00
+++ 1.5/storage/ndb/include/kernel/signaldata/TupKey.hpp 2005-11-02 14:43:22 +01:00
@@ -36,7 +36,7 @@
friend bool printTUPKEYREQ(FILE * output, const Uint32 * theData, Uint32 len, Uint16
receiverBlockNo);
public:
- STATIC_CONST( SignalLength = 16 );
+ STATIC_CONST( SignalLength = 18 );
private:
@@ -59,6 +59,8 @@
Uint32 tcOpIndex;
Uint32 savePointId;
Uint32 disk_page;
+ Uint32 m_row_id_page_no;
+ Uint32 m_row_id_page_idx;
};
class TupKeyConf {
@@ -78,7 +80,7 @@
friend bool printTUPKEYCONF(FILE * output, const Uint32 * theData, Uint32 len, Uint16
receiverBlockNo);
public:
- STATIC_CONST( SignalLength = 5 );
+ STATIC_CONST( SignalLength = 6 );
private:
@@ -90,6 +92,7 @@
Uint32 writeLength;
Uint32 noFiredTriggers;
Uint32 lastRow;
+ Uint32 rowid;
};
class TupKeyRef {
--- 1.26/storage/ndb/src/kernel/blocks/backup/Backup.cpp 2005-11-07 12:19:06 +01:00
+++ 1.27/storage/ndb/src/kernel/blocks/backup/Backup.cpp 2005-11-05 00:10:39 +01:00
@@ -14,6 +14,7 @@
along with this program; if not, write to the Free Software
Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA */
+#include <my_config.h>
#include "Backup.hpp"
#include <ndb_version.h>
@@ -3040,24 +3041,41 @@
}
}//for
- if(lcp && disk)
+
+ if(lcp)
{
- /**
- * Remove all disk attributes, but add DISK_REF (8 bytes)
- */
- tabPtr.p->noOfAttributes -= (disk - 1);
-
- AttributePtr attrPtr;
- ndbrequire(tabPtr.p->attributes.seize(attrPtr));
-
- Uint32 sz32 = 2;
- attrPtr.p->data.m_flags = 0;
- attrPtr.p->data.attrId = AttributeHeader::DISK_REF;
- attrPtr.p->data.m_flags = Attribute::COL_FIXED;
- attrPtr.p->data.sz32 = sz32;
-
- attrPtr.p->data.offset = tabPtr.p->sz_FixedAttributes;
- tabPtr.p->sz_FixedAttributes += sz32;
+ if (disk)
+ {
+ /**
+ * Remove all disk attributes, but add DISK_REF (8 bytes)
+ */
+ tabPtr.p->noOfAttributes -= (disk - 1);
+
+ AttributePtr attrPtr;
+ ndbrequire(tabPtr.p->attributes.seize(attrPtr));
+
+ Uint32 sz32 = 2;
+ attrPtr.p->data.attrId = AttributeHeader::DISK_REF;
+ attrPtr.p->data.m_flags = Attribute::COL_FIXED;
+ attrPtr.p->data.sz32 = 2;
+
+ attrPtr.p->data.offset = tabPtr.p->sz_FixedAttributes;
+ tabPtr.p->sz_FixedAttributes += sz32;
+ }
+
+ {
+ AttributePtr attrPtr;
+ ndbrequire(tabPtr.p->attributes.seize(attrPtr));
+
+ Uint32 sz32 = 2;
+ attrPtr.p->data.attrId = AttributeHeader::ROWID;
+ attrPtr.p->data.m_flags = Attribute::COL_FIXED;
+ attrPtr.p->data.sz32 = 2;
+
+ attrPtr.p->data.offset = tabPtr.p->sz_FixedAttributes;
+ tabPtr.p->sz_FixedAttributes += sz32;
+ tabPtr.p->noOfAttributes ++;
+ }
}
return tabPtr;
}
--- 1.40/storage/ndb/src/kernel/blocks/dblqh/Dblqh.hpp 2005-11-07 12:19:07 +01:00
+++ 1.41/storage/ndb/src/kernel/blocks/dblqh/Dblqh.hpp 2005-11-02 14:43:02 +01:00
@@ -2030,7 +2030,9 @@
Uint8 seqNoReplica;
Uint8 tcNodeFailrec;
Uint8 m_disk_table;
+ Uint8 m_use_rowid;
Uint32 m_local_key;
+ Local_key m_row_id;
}; /* p2c: size = 280 bytes */
typedef Ptr<TcConnectionrec> TcConnectionrecPtr;
@@ -2984,6 +2986,8 @@
signal->theData[0] = regTcPtr.p->accConnectrec;
signal->theData[1] = key->m_page_no << MAX_TUPLES_BITS |
key->m_page_idx;
c_acc->execACCMINUPDATE(signal);
+
+ regTcPtr.p->m_row_id = *key;
}
--- 1.80/storage/ndb/src/kernel/blocks/dblqh/DblqhMain.cpp 2005-11-07 12:19:07 +01:00
+++ 1.81/storage/ndb/src/kernel/blocks/dblqh/DblqhMain.cpp 2005-11-03 16:54:19 +01:00
@@ -3277,6 +3277,7 @@
regTcPtr->seqNoReplica = LqhKeyReq::getSeqNoReplica(Treqinfo);
UintR TreclenAiLqhkey = LqhKeyReq::getAIInLqhKeyReq(Treqinfo);
regTcPtr->apiVersionNo = 0;
+ regTcPtr->m_use_rowid = LqhKeyReq::getRowidFlag(Treqinfo);
CRASH_INSERTION2(5041, regTcPtr->simpleRead &&
refToNode(signal->senderBlockRef()) != cownNodeid);
@@ -3332,6 +3333,12 @@
return;
}//if
+ sig0 = lqhKeyReq->variableData[nextPos + 0];
+ sig1 = lqhKeyReq->variableData[nextPos + 1];
+ regTcPtr->m_row_id.m_page_no = sig0;
+ regTcPtr->m_row_id.m_page_idx = sig1;
+ nextPos += 2 * LqhKeyReq::getRowidFlag(Treqinfo);
+
if ((LqhKeyReq::FixedSignalLength + nextPos + TreclenAiLqhkey) !=
signal->length()) {
LQHKEY_error(signal, 2);
@@ -3866,6 +3873,7 @@
Ttupreq = Ttupreq + (regTcPtr->operation << 6);
Ttupreq = Ttupreq + (regTcPtr->opExec << 10);
Ttupreq = Ttupreq + (regTcPtr->apiVersionNo << 11);
+ Ttupreq = Ttupreq + (regTcPtr->m_use_rowid << 11);
/* ---------------------------------------------------------------------
* Clear interpreted mode bit since we do not want the next replica to
@@ -3902,12 +3910,19 @@
tupKeyReq->transId1 = sig1;
tupKeyReq->transId2 = sig2;
tupKeyReq->fragPtr = sig3;
+
+ sig0 = tcConnectptr.p->m_row_id.m_page_no;
+ sig1 = tcConnectptr.p->m_row_id.m_page_idx;
+
tupKeyReq->primaryReplica = (tcConnectptr.p->seqNoReplica == 0)?true:false;
tupKeyReq->coordinatorTC = tcConnectptr.p->tcBlockref;
tupKeyReq->tcOpIndex = tcConnectptr.p->tcOprec;
tupKeyReq->savePointId = tcConnectptr.p->savePointId;
tupKeyReq->disk_page= disk_page;
+ tupKeyReq->m_row_id_page_no = sig0;
+ tupKeyReq->m_row_id_page_idx = sig1;
+
EXECUTE_DIRECT(tup, GSN_TUPKEYREQ, signal, TupKeyReq::SignalLength);
}//Dblqh::execACCKEYCONF()
@@ -4012,6 +4027,7 @@
}//if
regTcPtr->totSendlenAi = tupKeyConf->writeLength;
ndbrequire(regTcPtr->totSendlenAi == regTcPtr->currTupAiLen);
+ regTcPtr->m_use_rowid = tupKeyConf->rowid;
rwConcludedLab(signal);
return;
}//Dblqh::tupkeyConfLab()
@@ -4414,11 +4430,16 @@
UintR sig0, sig1, sig2, sig3, sig4, sig5, sig6;
Treqinfo = preComputedRequestInfoMask & regTcPtr->reqinfo;
+ Uint32 nextNodeId = regTcPtr->nextReplica;
+ Uint32 nextVersion = getNodeInfo(nextNodeId).m_version;
+
UintR TapplAddressIndicator = (regTcPtr->nextSeqNoReplica == 0 ? 0 : 1);
LqhKeyReq::setApplicationAddressFlag(Treqinfo, TapplAddressIndicator);
LqhKeyReq::setInterpretedFlag(Treqinfo, regTcPtr->opExec);
LqhKeyReq::setSeqNoReplica(Treqinfo, regTcPtr->nextSeqNoReplica);
LqhKeyReq::setAIInLqhKeyReq(Treqinfo, regTcPtr->reclenAiLqhkey);
+ LqhKeyReq::setRowidFlag(Treqinfo, ~(Uint32)0 >= (Uint32)NDBD_ROWID_VERSION ?
+ regTcPtr->m_use_rowid : 0);
UintR TreadLenAiInd = (regTcPtr->readlenAi == 0 ? 0 : 1);
UintR TsameLqhAndClient = (tcConnectptr.i ==
regTcPtr->tcOprec ? 0 : 1);
@@ -4490,6 +4511,12 @@
nextPos += 4;
}//if
+ Local_key tmp = regTcPtr->m_row_id;
+
+ lqhKeyReq->variableData[nextPos] = tmp.m_page_no;
+ lqhKeyReq->variableData[nextPos + 1] = tmp.m_page_idx;
+ nextPos += 2*LqhKeyReq::getRowidFlag(Treqinfo);
+
sig0 = regTcPtr->firstAttrinfo[0];
sig1 = regTcPtr->firstAttrinfo[1];
sig2 = regTcPtr->firstAttrinfo[2];
@@ -8204,7 +8231,7 @@
Uint32 reqinfo = (scanPtr.p->scanLockHold == ZFALSE);
reqinfo = reqinfo + (regTcPtr->operation << 6);
reqinfo = reqinfo + (regTcPtr->opExec << 10);
-
+
TupKeyReq * const tupKeyReq = (TupKeyReq *)signal->getDataPtrSend();
tupKeyReq->connectPtr = regTcPtr->tupConnectrec;
@@ -15961,6 +15988,7 @@
regTcPtr->nodeAfterNext[1] = ZNIL;
regTcPtr->dirtyOp = ZFALSE;
regTcPtr->tcBlockref = cownref;
+ regTcPtr->m_use_rowid = 0;
}//Dblqh::initReqinfoExecSr()
/* --------------------------------------------------------------------------
--- 1.30/storage/ndb/src/kernel/blocks/dbtup/Dbtup.hpp 2005-11-07 13:28:15 +01:00
+++ 1.32/storage/ndb/src/kernel/blocks/dbtup/Dbtup.hpp 2005-11-21 12:19:05 +01:00
@@ -177,7 +177,7 @@
#define ZINSERT_ERROR 630
#define ZINVALID_CHAR_FORMAT 744
-
+#define ZROWID_ALLOCATED 899
/* SOME WORD POSITIONS OF FIELDS IN SOME HEADERS */
@@ -509,17 +509,18 @@
Uint32 currentPageRange;
Uint32 rootPageRange;
Uint32 noOfPages;
- Uint32 emptyPrimPage;
-
- Uint32 thFreeFirst;
+ DLList<Page>::Head emptyPrimPage; // allocated pages (not init)
+ DLList<Page>::Head thFreeFirst; // pages with atleast 1 free record
+ SLList<Page>::Head m_empty_pages; // Empty pages not in logical/physical map
+
Uint32 m_lcp_scan_op;
State fragStatus;
Uint32 fragTableId;
Uint32 fragmentId;
Uint32 nextfreefrag;
- Uint32 free_var_page_array[MAX_FREE_LIST];
-
+ DLList<Page>::Head free_var_page_array[MAX_FREE_LIST];
+
DLList<ScanOp>::Head m_scanList;
bool m_undo_complete;
@@ -1178,13 +1179,15 @@
} m_var_data[2];
Tuple_header *m_disk_ptr;
- Page* m_page_ptr_p;
- Var_page* m_varpart_page_ptr_p;// could be same as m_page_ptr_p
+ PagePtr m_page_ptr;
+ PagePtr m_varpart_page_ptr; // could be same as m_page_ptr_p
PagePtr m_disk_page_ptr; //
-
+ Local_key m_row_id;
+
bool dirty_op;
bool interpreted_exec;
bool last_row;
+ bool m_use_rowid;
Signal* signal;
Uint32 no_fired_triggers;
@@ -1541,7 +1544,7 @@
void handleATTRINFOforTUPKEYREQ(Signal* signal,
const Uint32* data,
Uint32 length,
- Operationrec * const regOperPtr);
+ Operationrec * regOperPtr);
// *****************************************************************
// Setting up the environment for reads, inserts, updates and deletes.
@@ -1549,16 +1552,16 @@
//------------------------------------------------------------------
//------------------------------------------------------------------
int handleReadReq(Signal* signal,
- Operationrec* const regOperPtr,
- Tablerec* const regTabPtr,
+ Operationrec* regOperPtr,
+ Tablerec* regTabPtr,
KeyReqStruct* req_struct);
//------------------------------------------------------------------
//------------------------------------------------------------------
int handleUpdateReq(Signal* signal,
- Operationrec* const regOperPtr,
- Fragrecord* const regFragPtr,
- Tablerec* const regTabPtr,
+ Operationrec* regOperPtr,
+ Fragrecord* regFragPtr,
+ Tablerec* regTabPtr,
KeyReqStruct* req_struct,
bool disk);
@@ -1567,23 +1570,23 @@
int handleInsertReq(Signal* signal,
Ptr<Operationrec> regOperPtr,
Ptr<Fragrecord>,
- Tablerec* const regTabPtr,
+ Tablerec* regTabPtr,
KeyReqStruct* req_struct);
//------------------------------------------------------------------
//------------------------------------------------------------------
int handleDeleteReq(Signal* signal,
- Operationrec* const regOperPtr,
- Fragrecord* const regFragPtr,
- Tablerec* const regTabPtr,
+ Operationrec* regOperPtr,
+ Fragrecord* regFragPtr,
+ Tablerec* regTabPtr,
KeyReqStruct* req_struct);
//------------------------------------------------------------------
//------------------------------------------------------------------
int updateStartLab(Signal* signal,
- Operationrec* const regOperPtr,
- Fragrecord* const regFragPtr,
- Tablerec* const regTabPtr,
+ Operationrec* regOperPtr,
+ Fragrecord* regFragPtr,
+ Tablerec* regTabPtr,
KeyReqStruct* req_struct);
// *****************************************************************
@@ -1615,19 +1618,19 @@
void sendReadAttrinfo(Signal* signal,
KeyReqStruct *req_struct,
Uint32 TnoOfData,
- const Operationrec * const regOperPtr);
+ const Operationrec * regOperPtr);
//------------------------------------------------------------------
//------------------------------------------------------------------
void sendLogAttrinfo(Signal* signal,
Uint32 TlogSize,
- Operationrec * const regOperPtr);
+ Operationrec * regOperPtr);
//------------------------------------------------------------------
//------------------------------------------------------------------
void sendTUPKEYCONF(Signal* signal,
KeyReqStruct *req_struct,
- Operationrec * const regOperPtr);
+ Operationrec * regOperPtr);
//------------------------------------------------------------------
//------------------------------------------------------------------
@@ -1842,7 +1845,7 @@
//------------------------------------------------------------------
//------------------------------------------------------------------
- void setUpQueryRoutines(Tablerec* const regTabPtr);
+ void setUpQueryRoutines(Tablerec* regTabPtr);
// *****************************************************************
// Service methods.
@@ -1862,7 +1865,7 @@
//------------------------------------------------------------------
//------------------------------------------------------------------
- void copyAttrinfo(Operationrec * const regOperPtr, Uint32* inBuffer);
+ void copyAttrinfo(Operationrec * regOperPtr, Uint32* inBuffer);
//------------------------------------------------------------------
//------------------------------------------------------------------
@@ -1874,7 +1877,7 @@
//------------------------------------------------------------------
//------------------------------------------------------------------
- int initStoredOperationrec(Operationrec* const regOperPtr,
+ int initStoredOperationrec(Operationrec* regOperPtr,
KeyReqStruct* req_struct,
Uint32 storedId);
@@ -1904,57 +1907,57 @@
void
checkImmediateTriggersAfterInsert(KeyReqStruct *req_struct,
- Operationrec* const regOperPtr,
- Tablerec* const tablePtr);
+ Operationrec* regOperPtr,
+ Tablerec* tablePtr);
void
checkImmediateTriggersAfterUpdate(KeyReqStruct *req_struct,
- Operationrec* const regOperPtr,
- Tablerec* const tablePtr);
+ Operationrec* regOperPtr,
+ Tablerec* tablePtr);
void
checkImmediateTriggersAfterDelete(KeyReqStruct *req_struct,
- Operationrec* const regOperPtr,
- Tablerec* const tablePtr);
+ Operationrec* regOperPtr,
+ Tablerec* tablePtr);
#if 0
void checkDeferredTriggers(Signal* signal,
- Operationrec* const regOperPtr,
- Tablerec* const regTablePtr);
+ Operationrec* regOperPtr,
+ Tablerec* regTablePtr);
#endif
void checkDetachedTriggers(KeyReqStruct *req_struct,
- Operationrec* const regOperPtr,
- Tablerec* const regTablePtr);
+ Operationrec* regOperPtr,
+ Tablerec* regTablePtr);
void fireImmediateTriggers(KeyReqStruct *req_struct,
ArrayList<TupTriggerData>& triggerList,
- Operationrec* const regOperPtr);
+ Operationrec* regOperPtr);
void fireDeferredTriggers(KeyReqStruct *req_struct,
ArrayList<TupTriggerData>& triggerList,
- Operationrec* const regOperPtr);
+ Operationrec* regOperPtr);
void fireDetachedTriggers(KeyReqStruct *req_struct,
ArrayList<TupTriggerData>& triggerList,
- Operationrec* const regOperPtr);
+ Operationrec* regOperPtr);
void executeTriggers(KeyReqStruct *req_struct,
ArrayList<TupTriggerData>& triggerList,
- Operationrec* const regOperPtr);
+ Operationrec* regOperPtr);
void executeTrigger(KeyReqStruct *req_struct,
- TupTriggerData* const trigPtr,
- Operationrec* const regOperPtr);
+ TupTriggerData* trigPtr,
+ Operationrec* regOperPtr);
- bool readTriggerInfo(TupTriggerData* const trigPtr,
- Operationrec* const regOperPtr,
- KeyReqStruct * const req_struct,
- Fragrecord* const regFragPtr,
- Uint32* const keyBuffer,
+ bool readTriggerInfo(TupTriggerData* trigPtr,
+ Operationrec* regOperPtr,
+ KeyReqStruct * req_struct,
+ Fragrecord* regFragPtr,
+ Uint32* keyBuffer,
Uint32& noPrimKey,
- Uint32* const afterBuffer,
+ Uint32* afterBuffer,
Uint32& noAfterWords,
- Uint32* const beforeBuffer,
+ Uint32* beforeBuffer,
Uint32& noBeforeWords);
void sendTrigAttrInfo(Signal* signal,
@@ -1969,8 +1972,8 @@
void sendFireTrigOrd(Signal* signal,
KeyReqStruct *req_struct,
- Operationrec * const regOperPtr,
- TupTriggerData* const trigPtr,
+ Operationrec * regOperPtr,
+ TupTriggerData* trigPtr,
Uint32 fragmentId,
Uint32 noPrimKeySignals,
Uint32 noBeforeSignals,
@@ -1981,19 +1984,19 @@
// these set terrorCode and return non-zero on error
int executeTuxInsertTriggers(Signal* signal,
- Operationrec* const regOperPtr,
- Fragrecord* const regFragPtr,
- Tablerec* const regTabPtr);
+ Operationrec* regOperPtr,
+ Fragrecord* regFragPtr,
+ Tablerec* regTabPtr);
int executeTuxUpdateTriggers(Signal* signal,
- Operationrec* const regOperPtr,
- Fragrecord* const regFragPtr,
- Tablerec* const regTabPtr);
+ Operationrec* regOperPtr,
+ Fragrecord* regFragPtr,
+ Tablerec* regTabPtr);
int executeTuxDeleteTriggers(Signal* signal,
- Operationrec* const regOperPtr,
- Fragrecord* const regFragPtr,
- Tablerec* const regTabPtr);
+ Operationrec* regOperPtr,
+ Fragrecord* regFragPtr,
+ Tablerec* regTabPtr);
int addTuxEntries(Signal* signal,
Operationrec* regOperPtr,
@@ -2003,13 +2006,13 @@
void executeTuxCommitTriggers(Signal* signal,
Operationrec* regOperPtr,
- Fragrecord* const regFragPtr,
- Tablerec* const regTabPtr);
+ Fragrecord* regFragPtr,
+ Tablerec* regTabPtr);
void executeTuxAbortTriggers(Signal* signal,
Operationrec* regOperPtr,
- Fragrecord* const regFragPtr,
- Tablerec* const regTabPtr);
+ Fragrecord* regFragPtr,
+ Tablerec* regTabPtr);
void removeTuxEntries(Signal* signal,
Operationrec* regOperPtr,
@@ -2091,7 +2094,7 @@
//------------------------------------------------------------------
#if 0
- void checkPages(Fragrecord* const regFragPtr);
+ void checkPages(Fragrecord* regFragPtr);
#endif
Uint32 convert_byte_to_word_size(Uint32 byte_size)
{
@@ -2105,59 +2108,53 @@
void prepare_initial_insert(KeyReqStruct*, Operationrec*, Tablerec*);
void fix_disk_insert_no_mem_insert(KeyReqStruct*, Operationrec*, Tablerec*);
void setup_fixed_part(KeyReqStruct* req_struct,
- Operationrec* const regOperPtr,
- Tablerec* const regTabPtr);
+ Operationrec* regOperPtr,
+ Tablerec* regTabPtr);
void send_TUPKEYREF(Signal* signal,
- Operationrec* const regOperPtr);
+ Operationrec* regOperPtr);
void early_tupkey_error(Signal* signal);
void printoutTuplePage(Uint32 fragid, Uint32 pageid, Uint32 printLimit);
bool checkUpdateOfPrimaryKey(KeyReqStruct *req_struct,
Uint32* updateBuffer,
- Tablerec* const regTabPtr);
+ Tablerec* regTabPtr);
- void setNullBits(Uint32*, Tablerec* const regTabPtr);
+ void setNullBits(Uint32*, Tablerec* regTabPtr);
bool checkNullAttributes(KeyReqStruct * const, Tablerec* const);
bool setup_read(KeyReqStruct* req_struct,
- Operationrec* const regOperPtr,
- Fragrecord* const regFragPtr,
- Tablerec* const regTabPtr,
+ Operationrec* regOperPtr,
+ Fragrecord* regFragPtr,
+ Tablerec* regTabPtr,
bool disk);
- bool getPageLastCommitted(Operationrec* const regOperPtr,
- Operationrec* const leaderOpPtr);
-
- bool getPageThroughSavePoint(Operationrec* const regOperPtr,
- Operationrec* const leaderOpPtr);
-
- Uint32 calculateChecksum(Tuple_header*, Tablerec* const regTabPtr);
- void setChecksum(Tuple_header*, Tablerec* const regTabPtr);
+ Uint32 calculateChecksum(Tuple_header*, Tablerec* regTabPtr);
+ void setChecksum(Tuple_header*, Tablerec* regTabPtr);
void complexTrigger(Signal* signal,
KeyReqStruct *req_struct,
- Operationrec* const regOperPtr,
- Fragrecord* const regFragPtr,
- Tablerec* const regTabPtr);
+ Operationrec* regOperPtr,
+ Fragrecord* regFragPtr,
+ Tablerec* regTabPtr);
- void setTupleStatesSetOpType(Operationrec* const regOperPtr,
+ void setTupleStatesSetOpType(Operationrec* regOperPtr,
KeyReqStruct *req_struct,
- Page* const pagePtr,
+ Page* pagePtr,
Uint32& opType,
OperationrecPtr& firstOpPtr);
void findBeforeValueOperation(OperationrecPtr& befOpPtr,
OperationrecPtr firstOpPtr);
- void calculateChangeMask(Page* const PagePtr,
- Tablerec* const regTabPtr,
- KeyReqStruct * const req_struct);
+ void calculateChangeMask(Page* PagePtr,
+ Tablerec* regTabPtr,
+ KeyReqStruct * req_struct);
void updateGcpId(KeyReqStruct *req_struct,
- Operationrec* const regOperPtr,
- Fragrecord* const regFragPtr,
- Tablerec* const regTabPtr);
+ Operationrec* regOperPtr,
+ Fragrecord* regFragPtr,
+ Tablerec* regTabPtr);
void setTupleStateOnPreviousOps(Uint32 prevOpIndex);
void copyMem(Signal* signal, Uint32 sourceIndex, Uint32 destIndex);
@@ -2169,14 +2166,14 @@
void updatePackedList(Signal* signal, Uint16 ahostIndex);
void setUpDescriptorReferences(Uint32 descriptorReference,
- Tablerec* const regTabPtr,
+ Tablerec* regTabPtr,
const Uint32* offset);
- void setUpKeyArray(Tablerec* const regTabPtr);
- bool addfragtotab(Tablerec* const regTabPtr, Uint32 fragId, Uint32 fragIndex);
- void deleteFragTab(Tablerec* const regTabPtr, Uint32 fragId);
+ void setUpKeyArray(Tablerec* regTabPtr);
+ bool addfragtotab(Tablerec* regTabPtr, Uint32 fragId, Uint32 fragIndex);
+ void deleteFragTab(Tablerec* regTabPtr, Uint32 fragId);
void abortAddFragOp(Signal* signal);
- void releaseTabDescr(Tablerec* const regTabPtr);
- void getFragmentrec(FragrecordPtr& regFragPtr, Uint32 fragId, Tablerec* const
regTabPtr);
+ void releaseTabDescr(Tablerec* regTabPtr);
+ void getFragmentrec(FragrecordPtr& regFragPtr, Uint32 fragId, Tablerec* regTabPtr);
void initialiseRecordsLab(Signal* signal, Uint32 switchData, Uint32, Uint32);
void initializeAttrbufrec();
@@ -2193,7 +2190,7 @@
void initializeTabDescr();
void initializeUndoPage();
- void initTab(Tablerec* const regTabPtr);
+ void initTab(Tablerec* regTabPtr);
void startphase3Lab(Signal* signal, Uint32 config1, Uint32 config2);
@@ -2203,17 +2200,17 @@
void fragrefuse3Lab(Signal* signal,
FragoperrecPtr fragOperPtr,
FragrecordPtr regFragPtr,
- Tablerec* const regTabPtr,
+ Tablerec* regTabPtr,
Uint32 fragId);
void fragrefuse4Lab(Signal* signal,
FragoperrecPtr fragOperPtr,
FragrecordPtr regFragPtr,
- Tablerec* const regTabPtr,
+ Tablerec* regTabPtr,
Uint32 fragId);
void addattrrefuseLab(Signal* signal,
FragrecordPtr regFragPtr,
FragoperrecPtr fragOperPtr,
- Tablerec* const regTabPtr,
+ Tablerec* regTabPtr,
Uint32 fragId);
@@ -2290,24 +2287,25 @@
//------------------------------------------------------------------------------------------------------
//
// Public methods
- Uint32 getRealpid(Fragrecord* const regFragPtr, Uint32 logicalPageId);
- Uint32 getNoOfPages(Fragrecord* const regFragPtr);
+ Uint32 getRealpid(Fragrecord* regFragPtr, Uint32 logicalPageId);
+ Uint32 getNoOfPages(Fragrecord* regFragPtr);
void initPageRangeSize(Uint32 size);
- bool insertPageRangeTab(Fragrecord* const regFragPtr,
+ bool insertPageRangeTab(Fragrecord* regFragPtr,
Uint32 startPageId,
Uint32 noPages);
- void releaseFragPages(Fragrecord* const regFragPtr);
- void initFragRange(Fragrecord* const regFragPtr);
+ void releaseFragPages(Fragrecord* regFragPtr);
+ void initFragRange(Fragrecord* regFragPtr);
void initializePageRange();
- Uint32 getEmptyPage(Fragrecord* const regFragPtr);
- Uint32 allocFragPages(Fragrecord* const regFragPtr, Uint32 noOfPagesAllocated);
-
+ Uint32 getEmptyPage(Fragrecord* regFragPtr);
+ Uint32 allocFragPages(Fragrecord* regFragPtr, Uint32 noOfPagesAllocated);
+ Uint32 get_empty_var_page(Fragrecord* frag_ptr);
+
// Private methods
- Uint32 leafPageRangeFull(Fragrecord* const regFragPtr, PageRangePtr currPageRangePtr);
+ Uint32 leafPageRangeFull(Fragrecord* regFragPtr, PageRangePtr currPageRangePtr);
void releasePagerange(PageRangePtr regPRPtr);
void seizePagerange(PageRangePtr& regPageRangePtr);
void errorHandler(Uint32 errorCode);
- void allocMoreFragPages(Fragrecord* const regFragPtr);
+ void allocMoreFragPages(Fragrecord* regFragPtr);
// Private data
Uint32 cfirstfreerange;
@@ -2327,7 +2325,7 @@
// Private methods
Uint32 get_alloc_page(Fragrecord* const, Uint32);
- void update_free_page_list(Fragrecord* const, Var_page*);
+ void update_free_page_list(Fragrecord* const, Ptr<Page>);
#if 0
Uint32 calc_free_list(const Tablerec* regTabPtr, Uint32 sz) const {
@@ -2345,25 +2343,28 @@
//---------------------------------------------------------------
//
// Public methods
- Uint32* alloc_var_rec(Fragrecord*, Tablerec*, Uint32, Local_key*, Uint32*,
- Uint32 base);
- void free_var_part(Fragrecord*, Tablerec*, Var_part_ref, Uint32 chain);
- void free_var_part(Fragrecord*, Tablerec*, Local_key*, Var_page*, Uint32 chain);
+ Uint32* alloc_var_rec(Fragrecord*, Tablerec*, Uint32, Local_key*, Uint32*);
+ void free_var_rec(Fragrecord*, Tablerec*, Local_key*, Ptr<Page>);
+ Uint32* alloc_var_part(Fragrecord*, Tablerec*, Uint32, Local_key*);
+ int realloc_var_part(Fragrecord*, Tablerec*,
+ PagePtr, Var_part_ref*, Uint32, Uint32);
void validate_page(Tablerec*, Var_page* page);
Uint32* alloc_fix_rec(Fragrecord*, Tablerec*, Local_key*, Uint32 *);
void free_fix_rec(Fragrecord*, Tablerec*, Local_key*, Fix_page*);
+ Uint32* alloc_fix_rowid(Fragrecord*, Tablerec*, Local_key*, Uint32 *);
+ Uint32* alloc_var_rowid(Fragrecord*, Tablerec*, Uint32, Local_key*, Uint32*);
// Private methods
void convertThPage(Uint32 Tupheadsize,
- Fix_page* const regPagePtr);
+ Fix_page* regPagePtr);
/**
* Return offset
*/
- Uint32 alloc_tuple_from_page(Fragrecord* const regFragPtr,
- Fix_page* const regPagePtr);
+ Uint32 alloc_tuple_from_page(Fragrecord* regFragPtr,
+ Fix_page* regPagePtr);
//---------------------------------------------------------------
// Temporary variables used for storing commonly used variables
@@ -2396,8 +2397,7 @@
ArrayPool<Operationrec> c_operation_pool;
- Page *cpage;
- Uint32 cnoOfPage;
+ ArrayPool<Page> c_page_pool;
Uint32 cnoOfAllocatedPages;
Tablerec *tablerec;
@@ -2435,9 +2435,6 @@
// Trigger variables
Uint32 c_maxTriggersPerTable;
- STATIC_CONST(MAX_PARALLELL_TUP_SRREQ = 2);
- Uint32 c_sr_free_page_0;
-
Uint32 c_errorInsert4000TableId;
Uint32 c_min_list_size[MAX_FREE_LIST + 1];
Uint32 c_max_list_size[MAX_FREE_LIST + 1];
@@ -2460,9 +2457,9 @@
bool disk);
Uint32* get_ptr(Var_part_ref);
- Uint32* get_ptr(Ptr<Var_page>*, Var_part_ref);
+ Uint32* get_ptr(PagePtr*, Var_part_ref);
Uint32* get_ptr(PagePtr*, const Local_key*, const Tablerec*);
- Uint32* get_ptr(PagePtr*, const Local_key*, const Tablerec*, Uint32 mm);
+ Uint32* get_dd_ptr(PagePtr*, const Local_key*, const Tablerec*);
/**
* prealloc space from disk
@@ -2559,7 +2556,7 @@
#endif
void fix_commit_order(OperationrecPtr);
- void commit_operation(Signal*, Uint32, Tuple_header*, Page*,
+ void commit_operation(Signal*, Uint32, Tuple_header*, PagePtr,
Operationrec*, Fragrecord*, Tablerec*);
void dealloc_tuple(Signal* signal, Uint32, Page*, Tuple_header*,
@@ -2568,8 +2565,8 @@
int handle_size_change_after_update(KeyReqStruct* req_struct,
Tuple_header* org,
Operationrec*,
- Fragrecord* const regFragPtr,
- Tablerec* const regTabPtr,
+ Fragrecord* regFragPtr,
+ Tablerec* regTabPtr,
Uint32 sizes[4]);
/**
@@ -2579,6 +2576,7 @@
void prepare_read(KeyReqStruct*, Tablerec* const, bool disk);
};
+#if 0
inline
Uint32
Dbtup::get_frag_page_id(Uint32 real_page_id)
@@ -2588,17 +2586,18 @@
ptrCheckGuard(real_page_ptr, cnoOfPage, cpage);
return real_page_ptr.p->frag_page_id;
}
+#endif
inline
Dbtup::TransState
-Dbtup::get_trans_state(Operationrec * const regOperPtr)
+Dbtup::get_trans_state(Operationrec * regOperPtr)
{
return (Dbtup::TransState)regOperPtr->op_struct.trans_state;
}
inline
void
-Dbtup::set_trans_state(Operationrec* const regOperPtr,
+Dbtup::set_trans_state(Operationrec* regOperPtr,
Dbtup::TransState trans_state)
{
regOperPtr->op_struct.trans_state= (Uint32)trans_state;
@@ -2606,14 +2605,14 @@
inline
Dbtup::TupleState
-Dbtup::get_tuple_state(Operationrec * const regOperPtr)
+Dbtup::get_tuple_state(Operationrec * regOperPtr)
{
return (Dbtup::TupleState)regOperPtr->op_struct.tuple_state;
}
inline
void
-Dbtup::set_tuple_state(Operationrec* const regOperPtr,
+Dbtup::set_tuple_state(Operationrec* regOperPtr,
Dbtup::TupleState tuple_state)
{
regOperPtr->op_struct.tuple_state= (Uint32)tuple_state;
@@ -2629,14 +2628,14 @@
inline
Dbtup::ChangeMaskState
-Dbtup::get_change_mask_state(Operationrec * const regOperPtr)
+Dbtup::get_change_mask_state(Operationrec * regOperPtr)
{
return (Dbtup::ChangeMaskState)regOperPtr->op_struct.change_mask_state;
}
inline
void
-Dbtup::set_change_mask_state(Operationrec * const regOperPtr,
+Dbtup::set_change_mask_state(Operationrec * regOperPtr,
ChangeMaskState new_state)
{
regOperPtr->op_struct.change_mask_state= (Uint32)new_state;
@@ -2644,8 +2643,8 @@
inline
void
-Dbtup::update_change_mask_info(KeyReqStruct * const req_struct,
- Operationrec * const regOperPtr)
+Dbtup::update_change_mask_info(KeyReqStruct * req_struct,
+ Operationrec * regOperPtr)
{
//Save change mask
if (req_struct->max_attr_id_updated == 0) {
@@ -2665,19 +2664,19 @@
Uint32*
Dbtup::get_ptr(Var_part_ref ref)
{
- Ptr<Var_page> tmp;
+ Ptr<Page> tmp;
return get_ptr(&tmp, ref);
}
inline
Uint32*
-Dbtup::get_ptr(Ptr<Var_page>* pagePtr, Var_part_ref ref)
+Dbtup::get_ptr(Ptr<Page>* pagePtr, Var_part_ref ref)
{
PagePtr tmp;
Uint32 page_idx= ref.m_ref & MAX_TUPLES_PER_PAGE;
tmp.i= ref.m_ref >> MAX_TUPLES_BITS;
- ptrCheckGuard(tmp, cnoOfPage, cpage);
+ c_page_pool.getPtr(tmp);
memcpy(pagePtr, &tmp, sizeof(tmp));
return ((Var_page*)tmp.p)->get_ptr(page_idx);
}
@@ -2689,38 +2688,28 @@
{
PagePtr tmp;
tmp.i= key->m_page_no;
- ptrCheckGuard(tmp, cnoOfPage, cpage);
+ c_page_pool.getPtr(tmp);
memcpy(pagePtr, &tmp, sizeof(tmp));
- if(regTabPtr->m_attributes[MM].m_no_of_varsize)
- return ((Var_page*)tmp.p)->get_ptr(key->m_page_idx);
- else
- return ((Fix_page*)tmp.p)->
- get_ptr(key->m_page_idx, regTabPtr->m_offsets[MM].m_fix_header_size);
+ return ((Fix_page*)tmp.p)->
+ get_ptr(key->m_page_idx, regTabPtr->m_offsets[MM].m_fix_header_size);
}
inline
Uint32*
-Dbtup::get_ptr(PagePtr* pagePtr,
- const Local_key* key, const Tablerec* regTabPtr, Uint32 mm)
+Dbtup::get_dd_ptr(PagePtr* pagePtr,
+ const Local_key* key, const Tablerec* regTabPtr)
{
PagePtr tmp;
tmp.i= key->m_page_no;
- if(mm == MM)
- {
- ptrCheckGuard(tmp, cnoOfPage, cpage);
- }
- else
- {
- tmp.p= (Page*)m_global_page_pool.getPtr(tmp.i);
- }
+ tmp.p= (Page*)m_global_page_pool.getPtr(tmp.i);
memcpy(pagePtr, &tmp, sizeof(tmp));
- if(regTabPtr->m_attributes[mm].m_no_of_varsize)
+ if(regTabPtr->m_attributes[DD].m_no_of_varsize)
return ((Var_page*)tmp.p)->get_ptr(key->m_page_idx);
else
return ((Fix_page*)tmp.p)->
- get_ptr(key->m_page_idx, regTabPtr->m_offsets[mm].m_fix_header_size);
+ get_ptr(key->m_page_idx, regTabPtr->m_offsets[DD].m_fix_header_size);
}
NdbOut&
--- 1.4/storage/ndb/src/kernel/blocks/dbtup/DbtupAbort.cpp 2005-11-07 12:19:07 +01:00
+++ 1.5/storage/ndb/src/kernel/blocks/dbtup/DbtupAbort.cpp 2005-11-04 12:08:38 +01:00
@@ -139,32 +139,32 @@
{
ndbout_c("abort grow");
Var_page *pageP= (Var_page*)page.p;
+ Ptr<Page> vpage;
Uint32 idx= regOperPtr.p->m_tuple_location.m_page_idx, sz;
Uint32 mm_vars= regTabPtr.p->m_attributes[MM].m_no_of_varsize;
Uint32 *var_part;
if(! (tuple_ptr->m_header_bits & Tuple_header::CHAINED_ROW))
{
var_part= tuple_ptr->get_var_part_ptr(regTabPtr.p);
- sz= Tuple_header::HeaderSize +
- regTabPtr.p->m_offsets[MM].m_fix_header_size;
+ sz= Tuple_header::HeaderSize +
+ regTabPtr.p->m_offsets[MM].m_fix_header_size;
+ vpage = page;
}
else
{
- Ptr<Var_page> vpage;
Uint32 ref= * tuple_ptr->get_var_part_ptr(regTabPtr.p);
Local_key tmp;
tmp=ref;
sz= 0;
- idx= tmp.m_page_idx;
- var_part= get_ptr(&vpage, *(Var_part_ref*)&ref);
- pageP= vpage.p;
+ idx= tmp.m_page_idx;
+ var_part= get_ptr(&vpage, *(Var_part_ref*)&ref);
}
Uint32 len= pageP->get_entry_len(idx) & ~Var_page::CHAIN;
sz += ((((mm_vars + 1) << 1) + (((Uint16*)var_part)[mm_vars]) + 3)>> 2);
ndbassert(sz <= len);
pageP->shrink_entry(idx, sz);
- update_free_page_list(regFragPtr.p, pageP);
+ update_free_page_list(regFragPtr.p, vpage);
}
else if(bits & Tuple_header::MM_SHRINK)
{
--- 1.5/storage/ndb/src/kernel/blocks/dbtup/DbtupCommit.cpp 2005-11-07 12:19:07 +01:00
+++ 1.7/storage/ndb/src/kernel/blocks/dbtup/DbtupCommit.cpp 2005-11-21 12:19:05 +01:00
@@ -55,14 +55,7 @@
if (regTabPtr.p->m_attributes[MM].m_no_of_varsize)
{
ljam();
-
- if(ptr->m_header_bits & Tuple_header::CHAINED_ROW)
- {
- free_var_part(regFragPtr.p, regTabPtr.p,
- *(Var_part_ref*)ptr->get_var_part_ptr(regTabPtr.p),
- Var_page::CHAIN);
- }
- free_var_part(regFragPtr.p, regTabPtr.p, &tmp, (Var_page*)pagePtr.p, 0);
+ free_var_rec(regFragPtr.p, regTabPtr.p, &tmp, pagePtr);
} else {
free_fix_rec(regFragPtr.p, regTabPtr.p, &tmp, (Fix_page*)pagePtr.p);
}
@@ -181,7 +174,7 @@
Dbtup::commit_operation(Signal* signal,
Uint32 gci,
Tuple_header* tuple_ptr,
- Page* page,
+ PagePtr pagePtr,
Operationrec* regOperPtr,
Fragrecord* regFragPtr,
Tablerec* regTabPtr)
@@ -197,76 +190,39 @@
Uint32 copy_bits= copy->m_header_bits;
- Uint32 fix_size= regTabPtr->m_offsets[MM].m_fix_header_size;
+ Uint32 fixsize= regTabPtr->m_offsets[MM].m_fix_header_size;
Uint32 mm_vars= regTabPtr->m_attributes[MM].m_no_of_varsize;
if(mm_vars == 0)
{
- memcpy(tuple_ptr, copy, 4*fix_size);
- //ndbout_c("commit: memcpy %p %p %d", tuple_ptr, copy, 4*fix_size);
- disk_ptr= (Tuple_header*)(((Uint32*)copy)+fix_size);
+ memcpy(tuple_ptr, copy, 4*fixsize);
+ disk_ptr= (Tuple_header*)(((Uint32*)copy)+fixsize);
}
- else if(bits & Tuple_header::CHAINED_ROW)
+ else
{
Uint32 *ref= tuple_ptr->get_var_part_ptr(regTabPtr);
- memcpy(tuple_ptr, copy, 4*(Tuple_header::HeaderSize+fix_size));
-
+ memcpy(tuple_ptr, copy, 4*(Tuple_header::HeaderSize+fixsize));
+
Local_key tmp; tmp= *ref;
- if(0) printf("%p %d %d (%d bytes) - ref: %x ", tuple_ptr,
- regOperPtr->m_tuple_location.m_page_no,
- regOperPtr->m_tuple_location.m_page_idx,
- 4*(Tuple_header::HeaderSize+fix_size),
- *ref);
- Ptr<Var_page> vpagePtr;
+
+ PagePtr vpagePtr;
Uint32 *dst= get_ptr(&vpagePtr, *(Var_part_ref*)ref);
+ Var_page* vpagePtrP = (Var_page*)vpagePtr.p;
Uint32 *src= copy->get_var_part_ptr(regTabPtr);
Uint32 sz= ((mm_vars + 1) << 1) + (((Uint16*)src)[mm_vars]);
- ndbassert(4*vpagePtr.p->get_entry_len(tmp.m_page_idx) >= sz);
+ ndbassert(4*vpagePtrP->get_entry_len(tmp.m_page_idx) >= sz);
memcpy(dst, src, sz);
- if(0) printf("ptr: %p %d ref: %x - chain commit", dst, sz, *ref);
+
copy_bits |= Tuple_header::CHAINED_ROW;
- if(0)
- {
- for(Uint32 i = 0; i<((sz+3)>>2); i++)
- printf(" %.8x", src[i]);
- printf("\n");
- }
-
if(copy_bits & Tuple_header::MM_SHRINK)
{
- if(0) printf(" - shrink %d -> %d - ",
- vpagePtr.p->get_entry_len(tmp.m_page_idx), (sz + 3) >> 2);
- vpagePtr.p->shrink_entry(tmp.m_page_idx, (sz + 3) >> 2);
- if(0)ndbout_c("%p->shrink_entry(%d, %d)", vpagePtr.p, tmp.m_page_idx,
- (sz + 3) >> 2);
- update_free_page_list(regFragPtr, vpagePtr.p);
+ vpagePtrP->shrink_entry(tmp.m_page_idx, (sz + 3) >> 2);
+ update_free_page_list(regFragPtr, vpagePtr);
}
- if(0) ndbout_c("");
+
disk_ptr = (Tuple_header*)
- (((Uint32*)copy)+Tuple_header::HeaderSize+fix_size+((sz + 3) >> 2));
+ (((Uint32*)copy)+Tuple_header::HeaderSize+fixsize+((sz + 3) >> 2));
}
- else
- {
- Uint32 *var_part= copy->get_var_part_ptr(regTabPtr);
- Uint32 sz= Tuple_header::HeaderSize + fix_size +
- ((((mm_vars + 1) << 1) + (((Uint16*)var_part)[mm_vars]) + 3)>> 2);
- ndbassert(((Var_page*)page)->
- get_entry_len(regOperPtr->m_tuple_location.m_page_idx) >= sz);
- memcpy(tuple_ptr, copy, 4*sz);
- if(0) ndbout_c("%p %d %d (%d bytes)", tuple_ptr,
- regOperPtr->m_tuple_location.m_page_no,
- regOperPtr->m_tuple_location.m_page_idx,
- 4*sz);
- if(copy_bits & Tuple_header::MM_SHRINK)
- {
- ((Var_page*)page)->shrink_entry(regOperPtr->m_tuple_location.m_page_idx,
- sz);
- if(0)ndbout_c("%p->shrink_entry(%d, %d)",
- page, regOperPtr->m_tuple_location.m_page_idx, sz);
- update_free_page_list(regFragPtr, (Var_page*)page);
- }
- disk_ptr = (Tuple_header*)(((Uint32*)copy)+sz);
- }
if (regTabPtr->m_no_of_disk_attributes &&
(copy_bits & Tuple_header::DISK_INLINE))
@@ -276,13 +232,13 @@
Uint32 logfile_group_id= regFragPtr->m_logfile_group_id;
Uint32 lcpScan_ptr_i= regFragPtr->m_lcp_scan_op;
- PagePtr pagePtr = *(PagePtr*)&m_pgman.m_ptr;
- ndbassert(pagePtr.p->m_page_no == key.m_page_no);
- ndbassert(pagePtr.p->m_file_no == key.m_file_no);
+ PagePtr diskPagePtr = *(PagePtr*)&m_pgman.m_ptr;
+ ndbassert(diskPagePtr.p->m_page_no == key.m_page_no);
+ ndbassert(diskPagePtr.p->m_file_no == key.m_file_no);
Uint32 sz, *dst;
if(copy_bits & Tuple_header::DISK_ALLOC)
{
- disk_page_alloc(signal, regTabPtr, regFragPtr, &key, pagePtr, gci);
+ disk_page_alloc(signal, regTabPtr, regFragPtr, &key, diskPagePtr, gci);
if(lcpScan_ptr_i != RNIL)
{
@@ -301,17 +257,18 @@
if(regTabPtr->m_attributes[DD].m_no_of_varsize == 0)
{
sz= regTabPtr->m_offsets[DD].m_fix_header_size;
- dst= ((Fix_page*)pagePtr.p)->get_ptr(key.m_page_idx, sz);
+ dst= ((Fix_page*)diskPagePtr.p)->get_ptr(key.m_page_idx, sz);
}
else
{
- dst= ((Var_page*)pagePtr.p)->get_ptr(key.m_page_idx);
- sz= ((Var_page*)pagePtr.p)->get_entry_len(key.m_page_idx);
+ dst= ((Var_page*)diskPagePtr.p)->get_ptr(key.m_page_idx);
+ sz= ((Var_page*)diskPagePtr.p)->get_entry_len(key.m_page_idx);
}
if(! (copy_bits & Tuple_header::DISK_ALLOC))
{
- disk_page_undo_update(pagePtr.p, &key, dst, sz, gci, logfile_group_id);
+ disk_page_undo_update(diskPagePtr.p,
+ &key, dst, sz, gci, logfile_group_id);
}
memcpy(dst, disk_ptr, 4*sz);
@@ -320,7 +277,7 @@
ndbassert(! (disk_ptr->m_header_bits & Tuple_header::FREE));
copy_bits |= Tuple_header::DISK_PART;
}
-
+
Uint32 clear=
Tuple_header::ALLOC |
@@ -587,7 +544,7 @@
if(regOperPtr.p->op_struct.op_type != ZDELETE)
{
- commit_operation(signal, gci, tuple_ptr, page.p,
+ commit_operation(signal, gci, tuple_ptr, page,
regOperPtr.p, regFragPtr.p, regTabPtr.p);
removeActiveOpList(regOperPtr.p, tuple_ptr);
}
--- 1.10/storage/ndb/src/kernel/blocks/dbtup/DbtupDebug.cpp 2005-11-07 12:19:07 +01:00
+++ 1.11/storage/ndb/src/kernel/blocks/dbtup/DbtupDebug.cpp 2005-11-04 07:58:31 +01:00
@@ -38,7 +38,7 @@
PagePtr regPagePtr;
ljamEntry();
regPagePtr.i = signal->theData[0];
- ptrCheckGuard(regPagePtr, cnoOfPage, cpage);
+ c_page_pool.getPtr(regPagePtr);
}//Dbtup::execDEBUG_SIG()
#ifdef TEST_MR
@@ -72,7 +72,7 @@
signal->theData[1] = incDec;
signal->theData[2] = sizeof(Page);
signal->theData[3] = cnoOfAllocatedPages;
- signal->theData[4] = cnoOfPage;
+ signal->theData[4] = c_page_pool.getSize();
signal->theData[5] = DBTUP;
sendSignal(CMVMI_REF, GSN_EVENT_REP, signal, 6, JBB);
}
@@ -168,7 +168,7 @@
// Case
Uint32 c = (rand() % 3);
- const Uint32 free = cnoOfPage - cnoOfAllocatedPages;
+ const Uint32 free = c_page_pool.getSize() - cnoOfAllocatedPages;
Uint32 alloc = 0;
if(free <= 1){
@@ -213,7 +213,7 @@
for(Uint32 i = 0; i<chunk.pageCount; i++){
PagePtr pagePtr;
pagePtr.i = chunk.pageId + i;
- ptrCheckGuard(pagePtr, cnoOfPage, cpage);
+ c_page_pool.getPtr(pagePtr);
pagePtr.p->page_state = ~ZFREE_COMMON;
}
@@ -281,8 +281,7 @@
FragrecordPtr tmpFragP;
TablerecPtr tmpTableP;
- tmpPageP.i = pageid;
- ptrCheckGuard(tmpPageP, cnoOfPage, cpage);
+ c_page_pool.getPtr(tmpPageP, pageid);
tmpFragP.i = fragid;
ptrCheckGuard(tmpFragP, cnoOfFragrec, fragrecord);
--- 1.23/storage/ndb/src/kernel/blocks/dbtup/DbtupExecQuery.cpp 2005-11-07 13:28:15 +01:00
+++ 1.25/storage/ndb/src/kernel/blocks/dbtup/DbtupExecQuery.cpp 2005-11-21 12:19:05 +01:00
@@ -33,7 +33,7 @@
/* ----------------------------------------------------------------- */
/* ----------- INIT_STORED_OPERATIONREC -------------- */
/* ----------------------------------------------------------------- */
-int Dbtup::initStoredOperationrec(Operationrec* const regOperPtr,
+int Dbtup::initStoredOperationrec(Operationrec* regOperPtr,
KeyReqStruct* req_struct,
Uint32 storedId)
{
@@ -54,7 +54,7 @@
return terrorCode;
}
-void Dbtup::copyAttrinfo(Operationrec * const regOperPtr,
+void Dbtup::copyAttrinfo(Operationrec * regOperPtr,
Uint32* inBuffer)
{
AttrbufrecPtr copyAttrBufPtr;
@@ -108,7 +108,7 @@
void Dbtup::handleATTRINFOforTUPKEYREQ(Signal* signal,
const Uint32 *data,
Uint32 len,
- Operationrec * const regOperPtr)
+ Operationrec * regOperPtr)
{
while(len)
{
@@ -227,7 +227,7 @@
void
Dbtup::setChecksum(Tuple_header* tuple_ptr,
- Tablerec* const regTabPtr)
+ Tablerec* regTabPtr)
{
tuple_ptr->m_checksum= 0;
tuple_ptr->m_checksum= calculateChecksum(tuple_ptr, regTabPtr);
@@ -235,7 +235,7 @@
Uint32
Dbtup::calculateChecksum(Tuple_header* tuple_ptr,
- Tablerec* const regTabPtr)
+ Tablerec* regTabPtr)
{
Uint32 checksum;
Uint32 i, rec_size, *tuple_header;
@@ -342,9 +342,9 @@
bool
Dbtup::setup_read(KeyReqStruct *req_struct,
- Operationrec* const regOperPtr,
- Fragrecord* const regFragPtr,
- Tablerec* const regTabPtr,
+ Operationrec* regOperPtr,
+ Fragrecord* regFragPtr,
+ Tablerec* regTabPtr,
bool disk)
{
OperationrecPtr currOpPtr;
@@ -433,167 +433,6 @@
return false;
}
- bool
- Dbtup::getPageThroughSavePoint(Operationrec* regOperPtr,
- Operationrec* leaderOpPtr)
- {
- bool found= false;
- OperationrecPtr loopOpPtr;
- loopOpPtr.p= leaderOpPtr;
- int res= 0;
- while(true) {
- ndbout_c("%d regOperPtr->savepointId: %d loopOpPtr.p->savepointId: %d",
- res++, regOperPtr->savepointId, loopOpPtr.p->savepointId);
- if (regOperPtr->savepointId > loopOpPtr.p->savepointId) {
- jam();
- found= true;
- break;
- }
- if (loopOpPtr.p->nextActiveOp == RNIL) {
- break;
- }
- loopOpPtr.i= loopOpPtr.p->nextActiveOp;
- c_operation_pool.getPtr(loopOpPtr);
- jam();
- }
- if (!found) {
- return getPageLastCommitted(regOperPtr, loopOpPtr.p);
- } else {
- if (loopOpPtr.p->op_struct.op_type == ZDELETE) {
- jam();
- terrorCode= ZTUPLE_DELETED_ERROR;
- return false;
- }
- if (get_tuple_state(loopOpPtr.p) == TUPLE_ALREADY_ABORTED) {
- /*
- Requested tuple version has already been aborted
- */
- jam();
- terrorCode= ZMUST_BE_ABORTED_ERROR;
- return false;
- }
- bool use_copy;
- if (loopOpPtr.p->prevActiveOp == RNIL) {
- jam();
- /*
- Use original tuple since we are reading from the last written tuple.
- We are the
- */
- use_copy= false;
- } else {
- /*
- Go forward in time to find a copy of the tuple which this operation
- produced
- */
- loopOpPtr.i= loopOpPtr.p->prevActiveOp;
- c_operation_pool.getPtr(loopOpPtr);
- if (loopOpPtr.p->op_struct.op_type == ZDELETE) {
- /*
- This operation was a Delete and thus have no copy tuple attached to
- it. We will move forward to the next that either doesn't exist in
- which case we will return the original tuple of any operation and
- otherwise it must be an insert which contains a copy record.
- */
- if (loopOpPtr.p->prevActiveOp == RNIL) {
- jam();
- use_copy= false;
- } else {
- jam();
- loopOpPtr.i= loopOpPtr.p->prevActiveOp;
- c_operation_pool.getPtr(loopOpPtr);
- ndbrequire(loopOpPtr.p->op_struct.op_type == ZINSERT);
- use_copy= true;
- }
- } else if (loopOpPtr.p->op_struct.op_type == ZUPDATE) {
- jam();
- /*
- This operation which was the next in time have a copy which was the
- result of the previous operation which we want to use. Thus use
- the copy tuple of this operation.
- */
- use_copy= true;
- } else {
- /*
- This operation was an insert that happened after an insert or update.
- This is not a possible case.
- */
- ndbrequire(false);
- return false;
- }
- }
- if (use_copy) {
- ndbrequire(false);
- regOperPtr->m_tuple_location= loopOpPtr.p->m_copy_tuple_location;
- } else {
- regOperPtr->m_tuple_location= loopOpPtr.p->m_tuple_location;
- }
- return true;
- }
- }
-
- bool
- Dbtup::getPageLastCommitted(Operationrec* const regOperPtr,
- Operationrec* const leaderOpPtr)
- {
- //----------------------------------------------------------------------
- // Dirty reads wants to read the latest committed tuple. The latest
- // tuple value could be not existing or else we have to find the copy
- // tuple. Start by finding the end of the list to find the first operation
- // on the record in the ongoing transaction.
- //----------------------------------------------------------------------
- jam();
- OperationrecPtr loopOpPtr;
- loopOpPtr.p= leaderOpPtr;
- while (loopOpPtr.p->nextActiveOp != RNIL) {
- jam();
- loopOpPtr.i= loopOpPtr.p->nextActiveOp;
- c_operation_pool.getPtr(loopOpPtr);
- }
- if (loopOpPtr.p->op_struct.op_type == ZINSERT) {
- jam();
- //----------------------------------------------------------------------
- // With an insert in the start of the list we know that the tuple did not
- // exist before this transaction was started. We don't care if the current
- // transaction is in the commit phase since the commit is not really
- // completed until the operation is gone from TUP.
- //----------------------------------------------------------------------
- terrorCode= ZTUPLE_DELETED_ERROR;
- return false;
- } else {
- //----------------------------------------------------------------------
- // A successful update and delete as first in the queue means that a tuple
- // exist in the committed world. We need to find it.
- //----------------------------------------------------------------------
- if (loopOpPtr.p->op_struct.op_type == ZUPDATE) {
- jam();
- //----------------------------------------------------------------------
- // The first operation was a delete we set our tuple reference to the
- // copy tuple of this operation.
- //----------------------------------------------------------------------
- ndbrequire(false);
- regOperPtr->m_tuple_location= loopOpPtr.p->m_copy_tuple_location;
- } else if ((loopOpPtr.p->op_struct.op_type == ZDELETE) &&
- (loopOpPtr.p->prevActiveOp == RNIL)) {
- jam();
- //----------------------------------------------------------------------
- // There was only a delete. The original tuple still is ok.
- //----------------------------------------------------------------------
- } else {
- jam();
- //----------------------------------------------------------------------
- // There was another operation after the delete, this must be an insert
- // and we have found our copy tuple there.
- //----------------------------------------------------------------------
- loopOpPtr.i= loopOpPtr.p->prevActiveOp;
- c_operation_pool.getPtr(loopOpPtr);
- ndbrequire(loopOpPtr.p->op_struct.op_type == ZINSERT);
- ndbrequire(false);
- regOperPtr->m_tuple_location = loopOpPtr.p->m_copy_tuple_location;
- }
- }
- return true;
- }
-
int
Dbtup::load_diskpage(Signal* signal,
Uint32 opRec, Uint32 fragPtrI,
@@ -749,7 +588,7 @@
void Dbtup::execTUPKEYREQ(Signal* signal)
{
- TupKeyReq * const tupKeyReq= (TupKeyReq *)signal->getDataPtr();
+ TupKeyReq * tupKeyReq= (TupKeyReq *)signal->getDataPtr();
KeyReqStruct req_struct;
Uint32 sig1, sig2, sig3, sig4;
@@ -825,7 +664,8 @@
req_struct.TC_index= sig2;
req_struct.TC_ref= sig3;
req_struct.frag_page_id= sig4;
-
+ req_struct.m_use_rowid = (TrequestInfo >> 11) & 1;
+
sig1= tupKeyReq->attrBufLen;
sig2= tupKeyReq->applRef;
sig3= tupKeyReq->transId1;
@@ -840,6 +680,12 @@
req_struct.trans_id2= sig4;
req_struct.m_disk_page_ptr.i= disk_page;
+ sig1 = tupKeyReq->m_row_id_page_no;
+ sig2 = tupKeyReq->m_row_id_page_idx;
+
+ req_struct.m_row_id.m_page_no = sig1;
+ req_struct.m_row_id.m_page_idx = sig2;
+
Uint32 Roptype = regOperPtr->op_struct.op_type;
if (Rstoredid != ZNIL) {
@@ -992,13 +838,13 @@
void
Dbtup::setup_fixed_part(KeyReqStruct* req_struct,
- Operationrec* const regOperPtr,
- Tablerec* const regTabPtr)
+ Operationrec* regOperPtr,
+ Tablerec* regTabPtr)
{
PagePtr page_ptr;
Uint32* ptr= get_ptr(&page_ptr, ®OperPtr->m_tuple_location, regTabPtr);
- req_struct->m_page_ptr_p= page_ptr.p;
- req_struct->m_tuple_ptr= (Tuple_header*)ptr;
+ req_struct->m_page_ptr = page_ptr;
+ req_struct->m_tuple_ptr = (Tuple_header*)ptr;
ndbassert(! (req_struct->m_tuple_ptr->m_header_bits & Tuple_header::FREE));
@@ -1017,10 +863,11 @@
/* ---------------------------------------------------------------- */
void Dbtup::sendTUPKEYCONF(Signal* signal,
KeyReqStruct *req_struct,
- Operationrec * const regOperPtr)
+ Operationrec * regOperPtr)
{
- TupKeyConf * const tupKeyConf= (TupKeyConf *)signal->getDataPtrSend();
+ TupKeyConf * tupKeyConf= (TupKeyConf *)signal->getDataPtrSend();
+ Uint32 Rcreate_rowid = req_struct->m_use_rowid;
Uint32 RuserPointer= regOperPtr->userpointer;
Uint32 RnoFiredTriggers= req_struct->no_fired_triggers;
Uint32 log_size= req_struct->log_size;
@@ -1034,6 +881,7 @@
tupKeyConf->writeLength= log_size;
tupKeyConf->noFiredTriggers= RnoFiredTriggers;
tupKeyConf->lastRow= last_row;
+ tupKeyConf->rowid = Rcreate_rowid;
EXECUTE_DIRECT(DBLQH, GSN_TUPKEYCONF, signal,
TupKeyConf::SignalLength);
@@ -1047,8 +895,8 @@
/* ----------------------------- READ ---------------------------- */
/* ---------------------------------------------------------------- */
int Dbtup::handleReadReq(Signal* signal,
- Operationrec* const regOperPtr,
- Tablerec* const regTabPtr,
+ Operationrec* regOperPtr,
+ Tablerec* regTabPtr,
KeyReqStruct* req_struct)
{
Uint32 *dst;
@@ -1109,9 +957,9 @@
/* ---------------------------- UPDATE ---------------------------- */
/* ---------------------------------------------------------------- */
int Dbtup::handleUpdateReq(Signal* signal,
- Operationrec* const operPtrP,
- Fragrecord* const regFragPtr,
- Tablerec* const regTabPtr,
+ Operationrec* operPtrP,
+ Fragrecord* regFragPtr,
+ Tablerec* regTabPtr,
KeyReqStruct* req_struct,
bool disk)
{
@@ -1344,7 +1192,7 @@
int Dbtup::handleInsertReq(Signal* signal,
Ptr<Operationrec> regOperPtr,
Ptr<Fragrecord> fragPtr,
- Tablerec* const regTabPtr,
+ Tablerec* regTabPtr,
KeyReqStruct *req_struct)
{
Uint32 tup_version = 1;
@@ -1367,6 +1215,7 @@
bool disk = regTabPtr->m_no_of_disk_attributes > 0;
bool mem_insert = get_tuple_state(regOperPtr.p) == TUPLE_INITIAL_INSERT;
bool disk_insert = regOperPtr.p->is_first_operation() && disk;
+ bool varsize = regTabPtr->m_attributes[MM].m_no_of_varsize;
union {
Uint32 sizes[4];
@@ -1433,32 +1282,61 @@
/**
* Alloc memory
*/
+ Uint32 rowid = req_struct->m_use_rowid;
Uint32 frag_page_id = req_struct->frag_page_id;
Uint32 real_page_id = regOperPtr.p->m_tuple_location.m_page_no;
if(mem_insert)
{
- if (!regTabPtr->m_attributes[MM].m_no_of_varsize)
+ if (!rowid)
{
- jam();
- if ((ptr= alloc_fix_rec(regFragPtr,
- regTabPtr,
- ®OperPtr.p->m_tuple_location,
- &frag_page_id)) == 0)
+ if (!varsize)
+ {
+ jam();
+ ptr= alloc_fix_rec(regFragPtr,
+ regTabPtr,
+ ®OperPtr.p->m_tuple_location,
+ &frag_page_id);
+ }
+ else
+ {
+ jam();
+ regOperPtr.p->m_tuple_location.m_file_no= sizes[2+MM];
+ ptr= alloc_var_rec(regFragPtr, regTabPtr,
+ sizes[2+MM],
+ ®OperPtr.p->m_tuple_location,
+ &frag_page_id);
+ }
+ if (unlikely(ptr == 0))
{
goto mem_error;
}
- }
- else
+ req_struct->m_use_rowid = true;
+ }
+ else
{
- jam();
- regOperPtr.p->m_tuple_location.m_file_no= sizes[2+MM];
- if ((ptr= alloc_var_rec(regFragPtr, regTabPtr,
- sizes[2+MM],
- ®OperPtr.p->m_tuple_location,
- &frag_page_id, 0)) == 0)
- goto mem_error;
+ regOperPtr.p->m_tuple_location = req_struct->m_row_id;
+ if (!varsize)
+ {
+ jam();
+ ptr= alloc_fix_rowid(regFragPtr,
+ regTabPtr,
+ ®OperPtr.p->m_tuple_location,
+ &frag_page_id);
+ }
+ else
+ {
+ jam();
+ regOperPtr.p->m_tuple_location.m_file_no= sizes[2+MM];
+ ptr= alloc_var_rowid(regFragPtr, regTabPtr,
+ sizes[2+MM],
+ ®OperPtr.p->m_tuple_location,
+ &frag_page_id);
+ }
+ if (unlikely(ptr == 0))
+ {
+ goto error;
+ }
}
-
real_page_id = regOperPtr.p->m_tuple_location.m_page_no;
regOperPtr.p->m_tuple_location.m_page_no= frag_page_id;
c_lqh->accminupdate(signal,
@@ -1466,10 +1344,11 @@
®OperPtr.p->m_tuple_location);
((Tuple_header*)ptr)->m_operation_ptr_i= regOperPtr.i;
- ((Tuple_header*)ptr)->m_header_bits= Tuple_header::ALLOC;
+ ((Tuple_header*)ptr)->m_header_bits= Tuple_header::ALLOC |
+ (varsize ? Tuple_header::CHAINED_ROW : 0);
regOperPtr.p->m_tuple_location.m_page_no = real_page_id;
}
- else
+ else if(!rowid || !regOperPtr.p->is_first_operation())
{
int ret;
if (regTabPtr->need_shrink() && cmp[0] != cmp[1] &&
@@ -1482,6 +1361,12 @@
{
return ret;
}
+ req_struct->m_use_rowid = false;
+ }
+ else
+ {
+ // no mem insert, but rowid
+ ndbrequire(false);
}
if (disk_insert)
@@ -1537,9 +1422,9 @@
/* ---------------------------- DELETE ---------------------------- */
/* ---------------------------------------------------------------- */
int Dbtup::handleDeleteReq(Signal* signal,
- Operationrec* const regOperPtr,
- Fragrecord* const regFragPtr,
- Tablerec* const regTabPtr,
+ Operationrec* regOperPtr,
+ Fragrecord* regFragPtr,
+ Tablerec* regTabPtr,
KeyReqStruct *req_struct)
{
// delete must set but not increment tupVersion
@@ -1593,8 +1478,8 @@
}
bool
-Dbtup::checkNullAttributes(KeyReqStruct * const req_struct,
- Tablerec* const regTabPtr)
+Dbtup::checkNullAttributes(KeyReqStruct * req_struct,
+ Tablerec* regTabPtr)
{
// Implement checking of updating all not null attributes in an insert here.
Bitmask<MAXNROFATTRIBUTESINWORDS> attributeMask;
@@ -2501,7 +2386,7 @@
Uint32*
expand_var_part(Dbtup::KeyReqStruct::Var_data *dst,
const Uint32* src,
- const Uint32 * const tabDesc,
+ const Uint32 * tabDesc,
const Uint16* order)
{
char* dst_ptr= dst->m_data_ptr;
@@ -2550,7 +2435,7 @@
Uint32 *dst_ptr= ptr->get_var_part_ptr(tabPtrP);
const Uint32 *disk_ref= src->get_disk_ref_ptr(tabPtrP);
const Uint32 *src_ptr= src->get_var_part_ptr(tabPtrP);
- const Uint32 * const desc= (Uint32*)req_struct->attr_descr;
+ const Uint32 * desc= (Uint32*)req_struct->attr_descr;
const Uint16 *order = (Uint16*)(&tableDescriptor[order_desc]);
order += tabPtrP->m_attributes[MM].m_no_of_fixsize;
@@ -2562,17 +2447,17 @@
KeyReqStruct::Var_data* dst= &req_struct->m_var_data[MM];
if(bits & Tuple_header::CHAINED_ROW)
{
- Ptr<Var_page> var_page;
+ Ptr<Page> var_page;
src_data= get_ptr(&var_page, * (Var_part_ref*)src_ptr);
step= 4;
sizes[MM]= (2 + (mm_vars << 1) + ((Uint16*)src_data)[mm_vars] + 3) >>
2;
- req_struct->m_varpart_page_ptr_p= var_page.p;
+ req_struct->m_varpart_page_ptr = var_page;
}
else
{
step= (2 + (mm_vars << 1) + ((Uint16*)src_ptr)[mm_vars]);
sizes[MM]= (step + 3) >> 2;
- req_struct->m_varpart_page_ptr_p= (Var_page*)req_struct->m_page_ptr_p;
+ req_struct->m_varpart_page_ptr = req_struct->m_page_ptr;
}
dst->m_data_ptr= (char*)(((Uint16*)dst_ptr)+mm_vars+1);
dst->m_offset_array_ptr= req_struct->var_pos_array;
@@ -2614,7 +2499,7 @@
Local_key key;
memcpy(&key, disk_ref, sizeof(key));
key.m_page_no= req_struct->m_disk_page_ptr.i;
- src_ptr= get_ptr(&req_struct->m_disk_page_ptr, &key, tabPtrP, DD);
+ src_ptr= get_dd_ptr(&req_struct->m_disk_page_ptr, &key, tabPtrP);
}
bits |= Tuple_header::DISK_INLINE;
@@ -2646,7 +2531,7 @@
void
Dbtup::prepare_read(KeyReqStruct* req_struct,
- Tablerec* const tabPtrP, bool disk)
+ Tablerec* tabPtrP, bool disk)
{
Tuple_header* ptr= req_struct->m_tuple_ptr;
@@ -2697,7 +2582,7 @@
Local_key key;
memcpy(&key, disk_ref, sizeof(key));
key.m_page_no= req_struct->m_disk_page_ptr.i;
- src_ptr= get_ptr(&req_struct->m_disk_page_ptr, &key, tabPtrP, DD);
+ src_ptr= get_dd_ptr(&req_struct->m_disk_page_ptr, &key, tabPtrP);
}
// Fix diskpart
req_struct->m_disk_ptr= (Tuple_header*)src_ptr;
@@ -2796,22 +2681,23 @@
for(Uint32 P= 0; P<fragPtr.p->noOfPages; P++)
{
Uint32 real= getRealpid(fragPtr.p, P);
- Var_page* page= (Var_page*)(cpage + real);
+ Var_page* page= (Var_page*)c_page_pool.getPtr(real);
for(Uint32 i=1; i<page->high_index; i++)
{
- Uint32 len= page->get_entry_len(i);
- if(len && !(len & Var_page::CHAIN))
+ Uint32 idx= page->get_index_word(i);
+ Uint32 len = (idx & Var_page::LEN_MASK) >> Var_page::LEN_SHIFT;
+ if(!(idx & Var_page::FREE) && !(idx & Var_page::CHAIN))
{
Tuple_header *ptr= (Tuple_header*)page->get_ptr(i);
Uint32 *part= ptr->get_var_part_ptr(regTabPtr);
if(ptr->m_header_bits & Tuple_header::CHAINED_ROW)
{
- assert(len == fix_sz + 1);
- Local_key tmp; tmp= *part;
- Ptr<Var_page> tmpPage;
+ ndbassert(len == fix_sz + 1);
+ Local_key tmp; tmp = *part;
+ Ptr<Page> tmpPage;
part= get_ptr(&tmpPage, *(Var_part_ref*)part);
- len= tmpPage.p->get_entry_len(tmp.m_page_idx);
+ len= ((Var_page*)tmpPage.p)->get_entry_len(tmp.m_page_idx);
Uint32 sz= ((mm_vars + 1) << 1) + (((Uint16*)part)[mm_vars]);
ndbassert(len >= ((sz + 3) >> 2));
}
@@ -2825,14 +2711,14 @@
c_operation_pool.getPtr(ptr->m_operation_ptr_i);
}
}
- else if(len)
+ else if(!(idx & Var_page::FREE))
{
/**
* Chain
*/
Uint32 *part= page->get_ptr(i);
Uint32 sz= ((mm_vars + 1) << 1) + (((Uint16*)part)[mm_vars]);
- ndbassert((len & ~Var_page::CHAIN) >= ((sz + 3) >> 2));
+ ndbassert(len >= ((sz + 3) >> 2));
}
else
{
@@ -2881,26 +2767,22 @@
else
{
if(0) printf("grow - ");
- Var_page* pageP= req_struct->m_varpart_page_ptr_p;
+ Ptr<Page> pagePtr = req_struct->m_varpart_page_ptr;
+ Var_page* pageP= (Var_page*)pagePtr.p;
Uint32 idx, alloc, needed;
- if(! (bits & Tuple_header::CHAINED_ROW))
- {
- idx= regOperPtr->m_tuple_location.m_page_idx;
- alloc= pageP->get_entry_len(idx) & ~Var_page::CHAIN;
- ndbassert(!(pageP->get_entry_len(idx) & Var_page::CHAIN));
- needed= sizes[2+MM];
- }
- else
- {
- Local_key tmp;
- tmp= *org->get_var_part_ptr(regTabPtr);
- idx= tmp.m_page_idx;
- alloc= pageP->get_entry_len(idx) & ~Var_page::CHAIN;
- if(!(pageP->get_entry_len(idx) & Var_page::CHAIN))
- ndbout << *pageP << endl;
- ndbassert(pageP->get_entry_len(idx) & Var_page::CHAIN);
- needed= sizes[2+MM] - fix_sz;
- }
+ Uint32 *refptr = org->get_var_part_ptr(regTabPtr);
+ ndbassert(bits & Tuple_header::CHAINED_ROW);
+
+ Local_key ref;
+ ref = *refptr;
+ idx= ref.m_page_idx;
+ alloc= pageP->get_entry_len(idx);
+#ifdef VM_TRACE
+ if(!pageP->get_entry_chain(idx))
+ ndbout << *pageP << endl;
+#endif
+ ndbassert(pageP->get_entry_chain(idx));
+ needed= sizes[2+MM] - fix_sz;
if(needed <= alloc)
{
@@ -2908,87 +2790,10 @@
ndbout_c(" no grow");
return 0;
}
- Uint32 add= needed - alloc;
copy_bits |= Tuple_header::MM_GROWN;
-
- if(pageP->free_space >= add)
- {
- jam();
- if(!pageP->is_space_behind_entry(idx, add))
- {
- if(0) printf("extra reorg");
- jam();
- /**
- * In this case we need to reorganise the page to fit. To ensure we
- * don't complicate matters we make a little trick here where we
- * fool the reorg_page to avoid copying the entry at hand and copy
- * that separately at the end. This means we need to copy it out of
- * the page before reorg_page to save the entry contents.
- */
- Uint32* copyBuffer= cinBuffer;
- memcpy(copyBuffer, pageP->get_ptr(idx), 4*alloc);
- pageP->set_entry_len(idx, 0);
- pageP->free_space += alloc;
- pageP->reorg((Var_page*)ctemp_page);
- memcpy(pageP->get_free_space_ptr(), copyBuffer, 4*alloc);
- pageP->set_entry_offset(idx, pageP->insert_pos);
- add += alloc;
- }
- pageP->grow_entry(idx, add);
- ndbassert((pageP->get_entry_len(idx) & Var_page::CHAIN) ==
- (bits & Tuple_header::CHAINED_ROW ? Var_page::CHAIN : 0));
- update_free_page_list(regFragPtr, pageP);
- }
- else
- {
- Local_key key;
-
- if(! (bits & Tuple_header::CHAINED_ROW))
- {
- assert(fix_sz < alloc);
- org->m_header_bits |= Tuple_header::CHAINED_ROW;
- Uint32 id, *dst= alloc_var_rec(regFragPtr, regTabPtr,
- needed - fix_sz, &key, &id,
- Var_page::CHAIN);
- assert(dst);
- ndbassert(key.m_page_no != pageP->physical_page_id);
- ndbassert(pageP->get_ptr(idx) == (Uint32*)org);
- Uint32 *ptr= org->get_var_part_ptr(regTabPtr);
-
- Uint32 old= pageP->get_entry_len(idx);
- memcpy(dst, ptr, 4*(old - fix_sz));
- * ptr = key.ref(); // store ref
-
- ndbassert((ptr - (Uint32*)org) + 1 == fix_sz + 1);
- pageP->shrink_entry(idx, fix_sz + 1); // var part ref
- //ndbout_c("%p->shrink_entry(%d, %d)", pageP, idx, fix_sz + 1);
- update_free_page_list(regFragPtr, pageP);
- }
- else
- {
- assert(sizes[2+MM] >= alloc);
- Uint32 id, *dst= alloc_var_rec(regFragPtr, regTabPtr,
- needed, &key, &id,
- Var_page::CHAIN);
- assert(dst);
- ndbassert(key.m_page_no != pageP->physical_page_id);
-
- // Alloc var_rec can reorg base page, so we need to refetch ptr
- Uint32 base_idx= regOperPtr->m_tuple_location.m_page_idx;
- org= (Tuple_header*)
- ((Var_page*)req_struct->m_page_ptr_p)->get_ptr(base_idx);
- Uint32 *ref= org->get_var_part_ptr(regTabPtr);
- Uint32 old_ref= *ref;
- Uint32 *src= pageP->get_ptr(idx);
-
- assert(alloc < needed);
- memcpy(dst, src, 4*alloc);
- *ref = key.ref();
-
- free_var_part(regFragPtr, regTabPtr,
- *(Var_part_ref*)&old_ref, Var_page::CHAIN);
- }
- }
+ if (unlikely(realloc_var_part(regFragPtr, regTabPtr, pagePtr,
+ (Var_part_ref*)refptr, alloc, needed)))
+ return -1;
}
req_struct->m_tuple_ptr->m_header_bits = copy_bits;
return 0;
--- 1.3/storage/ndb/src/kernel/blocks/dbtup/DbtupFixAlloc.cpp 2005-11-07 12:19:07 +01:00
+++ 1.4/storage/ndb/src/kernel/blocks/dbtup/DbtupFixAlloc.cpp 2005-11-04 14:35:25 +01:00
@@ -73,7 +73,7 @@
/* FAILED. TRY ALLOCATING FROM NORMAL PAGE. */
/* ---------------------------------------------------------------- */
PagePtr pagePtr;
- pagePtr.i= regFragPtr->thFreeFirst;
+ pagePtr.i = regFragPtr->thFreeFirst.firstItem;
if (pagePtr.i == RNIL) {
/* ---------------------------------------------------------------- */
// No prepared tuple header page with free entries exists.
@@ -85,14 +85,15 @@
// We found empty pages on the fragment. Allocate an empty page and
// convert it into a tuple header page and put it in thFreeFirst-list.
/* ---------------------------------------------------------------- */
- ptrCheckGuard(pagePtr, cnoOfPage, cpage);
+ c_page_pool.getPtr(pagePtr);
convertThPage(regTabPtr->m_offsets[MM].m_fix_header_size,
(Fix_page*)pagePtr.p);
- pagePtr.p->next_page = regFragPtr->thFreeFirst;
pagePtr.p->page_state = ZTH_MM_FREE;
- regFragPtr->thFreeFirst = pagePtr.i;
+ pagePtr.p->nextList = RNIL;
+ pagePtr.p->prevList = RNIL;
+ regFragPtr->thFreeFirst.firstItem = pagePtr.i;
} else {
ljam();
/* ---------------------------------------------------------------- */
@@ -106,7 +107,7 @@
/* THIS SHOULD BE THE COMMON PATH THROUGH THE CODE, FREE */
/* COPY PAGE EXISTED. */
/* ---------------------------------------------------------------- */
- ptrCheckGuard(pagePtr, cnoOfPage, cpage);
+ c_page_pool.getPtr(pagePtr);
}
Uint32 page_offset= alloc_tuple_from_page(regFragPtr, (Fix_page*)pagePtr.p);
@@ -164,8 +165,8 @@
/* ARE MAINTAINED EVEN AFTER A SYSTEM CRASH. */
/* ---------------------------------------------------------------- */
ndbrequire(regPagePtr->page_state == ZTH_MM_FREE);
- regFragPtr->thFreeFirst = regPagePtr->next_page;
- regPagePtr->next_page = RNIL;
+ LocalDLList<Page> free_pages(c_page_pool, regFragPtr->thFreeFirst);
+ free_pages.remove((Page*)regPagePtr);
regPagePtr->page_state = ZTH_MM_FULL;
}
@@ -183,10 +184,73 @@
if(free == 1)
{
ljam();
+ PagePtr pagePtr = { (Page*)regPagePtr, key->m_page_no };
+ LocalDLList<Page> free_pages(c_page_pool, regFragPtr->thFreeFirst);
ndbrequire(regPagePtr->page_state == ZTH_MM_FULL);
regPagePtr->page_state = ZTH_MM_FREE;
- regPagePtr->next_page= regFragPtr->thFreeFirst;
- regFragPtr->thFreeFirst = key->m_page_no;
+ free_pages.add(pagePtr);
}
}//Dbtup::freeTh()
+Uint32*
+Dbtup::alloc_fix_rowid(Fragrecord* const regFragPtr,
+ Tablerec* const regTabPtr,
+ Local_key* key,
+ Uint32 * out_frag_page_id)
+{
+ Uint32 page_no = key->m_page_no;
+ Uint32 pages = regFragPtr->noOfPages;
+
+ if (page_no >= pages)
+ {
+ Uint32 start = pages;
+ while(page_no >= pages)
+ pages += (pages >> 3) + (pages >> 4) + 2;
+ allocFragPages(regFragPtr, pages - start);
+ if (page_no >= (pages = regFragPtr->noOfPages))
+ {
+ terrorCode = ZMEM_NOMEM_ERROR;
+ return 0;
+ }
+ }
+
+ PagePtr pagePtr;
+ pagePtr.i = getRealpid(regFragPtr, page_no);
+ c_page_pool.getPtr(pagePtr);
+
+ Uint32 state = pagePtr.p->page_state;
+ LocalDLList<Page> free_pages(c_page_pool, regFragPtr->thFreeFirst);
+ LocalDLList<Page> alloc_pages(c_page_pool, regFragPtr->emptyPrimPage);
+ switch(state){
+ case ZEMPTY_MM:
+ convertThPage(regTabPtr->m_offsets[MM].m_fix_header_size,
+ (Fix_page*)pagePtr.p);
+
+ pagePtr.p->page_state = ZTH_MM_FREE;
+ alloc_pages.remove(pagePtr);
+ free_pages.add(pagePtr);
+ // continue
+ case ZTH_MM_FREE:
+ Uint32 idx= key->m_page_idx;
+ if (((Fix_page*)pagePtr.p)->alloc_record(idx) != idx)
+ {
+ terrorCode = ZROWID_ALLOCATED;
+ return 0;
+ }
+
+ if(pagePtr.p->free_space == 0)
+ {
+ jam();
+ pagePtr.p->page_state = ZTH_MM_FULL;
+ free_pages.remove(pagePtr);
+ }
+
+ *out_frag_page_id= page_no;
+ key->m_page_no = pagePtr.i;
+ key->m_page_idx = idx;
+ return pagePtr.p->m_data + idx;
+ case ZTH_MM_FULL:
+ terrorCode = ZROWID_ALLOCATED;
+ return 0;
+ }
+}
--- 1.22/storage/ndb/src/kernel/blocks/dbtup/DbtupGen.cpp 2005-11-07 12:19:07 +01:00
+++ 1.23/storage/ndb/src/kernel/blocks/dbtup/DbtupGen.cpp 2005-11-04 10:22:51 +01:00
@@ -42,7 +42,6 @@
{
cnoOfAttrbufrec = ZNO_OF_ATTRBUFREC;
cnoOfFragrec = MAX_FRAG_PER_NODE;
- cnoOfPage = ZNO_OF_PAGE;
cnoOfFragoprec = MAX_FRAG_PER_NODE;
cnoOfPageRangeRec = ZNO_OF_PAGE_RANGE_REC;
c_maxTriggersPerTable = ZDEFAULT_MAX_NO_TRIGGERS_PER_TABLE;
@@ -106,7 +105,6 @@
fragoperrec = 0;
fragrecord = 0;
hostBuffer = 0;
- cpage = 0;
pageRange = 0;
tablerec = 0;
tableDescriptor = 0;
@@ -135,10 +133,6 @@
sizeof(HostBuffer),
MAX_NODES);
- deallocRecord((void **)&cpage,"Page",
- sizeof(Page),
- cnoOfPage);
-
deallocRecord((void **)&pageRange,"PageRange",
sizeof(PageRange),
cnoOfPageRangeRec);
@@ -173,7 +167,7 @@
case ZREPORT_MEMORY_USAGE:{
ljam();
static int c_currentMemUsed = 0;
- int now = (cnoOfAllocatedPages * 100)/cnoOfPage;
+ int now = (cnoOfAllocatedPages * 100)/c_page_pool.getSize();
const int thresholds[] = { 100, 90, 80, 0 };
Uint32 i = 0;
@@ -279,7 +273,6 @@
ndbrequire(p != 0);
ndbrequire(!ndb_mgm_get_int_parameter(p, CFG_TUP_FRAG, &cnoOfFragrec));
- ndbrequire(!ndb_mgm_get_int_parameter(p, CFG_TUP_PAGE, &cnoOfPage));
Uint32 noOfTriggers= 0;
@@ -326,12 +319,16 @@
void Dbtup::initRecords()
{
unsigned i;
+ Uint32 tmp;
+ const ndb_mgm_configuration_iterator * p =
+ theConfiguration.getOwnConfigIterator();
+ ndbrequire(p != 0);
+
+ ndbrequire(!ndb_mgm_get_int_parameter(p, CFG_TUP_PAGE, &tmp));
// Records with dynamic sizes
- cpage = (Page*)allocRecord("Page",
- sizeof(Page),
- cnoOfPage,
- false);
+ Page* ptr =(Page*)allocRecord("Page", sizeof(Page), tmp, false);
+ c_page_pool.set(ptr, tmp);
attrbufrec = (Attrbufrec*)allocRecord("Attrbufrec",
sizeof(Attrbufrec),
@@ -353,10 +350,6 @@
sizeof(TableDescriptor),
cnoOfTabDescrRec);
- Uint32 tmp;
- const ndb_mgm_configuration_iterator * p =
- theConfiguration.getOwnConfigIterator();
- ndbrequire(p != 0);
ndbrequire(!ndb_mgm_get_int_parameter(p, CFG_TUP_OP_RECS, &tmp));
c_operation_pool.setSize(tmp);
@@ -531,6 +524,7 @@
for (regFragPtr.i = 0; regFragPtr.i < cnoOfFragrec; regFragPtr.i++) {
refresh_watch_dog();
ptrAss(regFragPtr, fragrecord);
+ new (regFragPtr.p) Fragrecord();
regFragPtr.p->nextfreefrag = regFragPtr.i + 1;
regFragPtr.p->fragStatus = IDLE;
}//for
--- 1.15/storage/ndb/src/kernel/blocks/dbtup/DbtupIndex.cpp 2005-11-07 12:19:07 +01:00
+++ 1.16/storage/ndb/src/kernel/blocks/dbtup/DbtupIndex.cpp 2005-11-04 08:12:53 +01:00
@@ -37,8 +37,7 @@
{
ljamEntry();
PagePtr pagePtr;
- pagePtr.i= pageId;
- ptrCheckGuard(pagePtr, cnoOfPage, cpage);
+ c_page_pool.getPtr(pagePtr, pageId);
Uint32 fragPageId= pagePtr.p->frag_page_id;
tupAddr= (fragPageId << MAX_TUPLES_BITS) | pageIndex;
}
@@ -115,8 +114,7 @@
tablePtr.i= fragPtr.p->fragTableId;
ptrCheckGuard(tablePtr, cnoOfTablerec, tablerec);
PagePtr pagePtr;
- pagePtr.i= pageId;
- ptrCheckGuard(pagePtr, cnoOfPage, cpage);
+ c_page_pool.getPtr(pagePtr, pageId);
Uint32 attrDescIndex= tablePtr.p->tabDescriptor + (0 << ZAD_LOG_SIZE);
Uint32 attrDataOffset= AttributeOffset::getOffset(
tableDescriptor[attrDescIndex + 1].tabDescr);
@@ -491,8 +489,7 @@
break;
}
Uint32 realPageId= getRealpid(fragPtr.p, buildPtr.p->m_pageId);
- pagePtr.i= realPageId;
- ptrCheckGuard(pagePtr, cnoOfPage, cpage);
+ c_page_pool.getPtr(pagePtr, realPageId);
Uint32 pageState= pagePtr.p->page_state;
// skip empty page
if (pageState == ZEMPTY_MM) {
@@ -529,16 +526,12 @@
buildPtr.p->m_tupleNo= firstTupleNo;
break;
}
- Uint32 len= page_ptr->get_entry_len(pageIndex);
- if (len == 0) {
+ Uint32 word= page_ptr->get_index_word(pageIndex);
+ if (word & (Var_page::CHAIN | Var_page::FREE))
+ {
ljam();
buildPtr.p->m_tupleNo++;
break;
- }
- if (len & Var_page::CHAIN) {
- ljam();
- buildPtr.p->m_tupleNo++;
- break;
}
tuple_ptr = (Tuple_header*)page_ptr->get_ptr(pageIndex);
}
--- 1.16/storage/ndb/src/kernel/blocks/dbtup/DbtupMeta.cpp 2005-11-07 12:19:07 +01:00
+++ 1.17/storage/ndb/src/kernel/blocks/dbtup/DbtupMeta.cpp 2005-11-04 10:21:40 +01:00
@@ -133,12 +133,6 @@
return;
}
- regFragPtr.p->emptyPrimPage= RNIL;
- regFragPtr.p->thFreeFirst= RNIL;
- regFragPtr.p->free_var_page_array[0]= RNIL;
- regFragPtr.p->free_var_page_array[1]= RNIL;
- regFragPtr.p->free_var_page_array[2]= RNIL;
- regFragPtr.p->free_var_page_array[3]= RNIL;
regFragPtr.p->fragTableId= regTabPtr.i;
regFragPtr.p->fragmentId= fragId;
regFragPtr.p->m_tablespace_id= tablespace;
--- 1.7/storage/ndb/src/kernel/blocks/dbtup/DbtupPagMan.cpp 2005-11-07 12:19:07 +01:00
+++ 1.8/storage/ndb/src/kernel/blocks/dbtup/DbtupPagMan.cpp 2005-11-04 08:20:21 +01:00
@@ -121,10 +121,10 @@
cfreepageList[i] = RNIL;
}//for
PagePtr pagePtr;
- for (pagePtr.i = 0; pagePtr.i < cnoOfPage; pagePtr.i++) {
+ for (pagePtr.i = 0; pagePtr.i < c_page_pool.getSize(); pagePtr.i++) {
ljam();
refresh_watch_dog();
- ptrAss(pagePtr, cpage);
+ c_page_pool.getPtr(pagePtr);
pagePtr.p->physical_page_id= RNIL;
pagePtr.p->next_page = pagePtr.i + 1;
pagePtr.p->first_cluster_page = RNIL;
@@ -133,24 +133,20 @@
pagePtr.p->prev_page = RNIL;
pagePtr.p->page_state = ZFREE_COMMON;
}//for
- pagePtr.i = cnoOfPage - 1;
- ptrAss(pagePtr, cpage);
pagePtr.p->next_page = RNIL;
+ /**
+ * Page 0 cant be part of buddy as
+ * it will scan left right when searching for bigger blocks,
+ * if 0 is part of arrat, it can search to -1...which is not good
+ */
pagePtr.i = 0;
- ptrAss(pagePtr, cpage);
+ c_page_pool.getPtr(pagePtr);
pagePtr.p->page_state = ~ZFREE_COMMON;
- for(size_t j = 0; j<MAX_PARALLELL_TUP_SRREQ; j++){
- pagePtr.i = 1+j;
- ptrAss(pagePtr, cpage);
- pagePtr.p->page_state = ~ZFREE_COMMON;
- }
-
- Uint32 tmp = 1 + MAX_PARALLELL_TUP_SRREQ;
- returnCommonArea(tmp, cnoOfPage - tmp);
+ Uint32 tmp = 1;
+ returnCommonArea(tmp, c_page_pool.getSize() - tmp);
cnoOfAllocatedPages = tmp; // Is updated by returnCommonArea
- c_sr_free_page_0 = ~0;
}//Dbtup::initializePage()
void Dbtup::allocConsPages(Uint32 noOfPagesToAllocate,
@@ -234,7 +230,7 @@
while (allocPageRef > 0) {
ljam();
pageLastPtr.i = allocPageRef - 1;
- ptrCheckGuard(pageLastPtr, cnoOfPage, cpage);
+ c_page_pool.getPtr(pageLastPtr);
if (pageLastPtr.p->page_state != ZFREE_COMMON) {
ljam();
return;
@@ -272,10 +268,10 @@
ljam();
return;
}//if
- while ((allocPageRef + noPagesAllocated) < cnoOfPage) {
+ while ((allocPageRef + noPagesAllocated) < c_page_pool.getSize()) {
ljam();
pageFirstPtr.i = allocPageRef + noPagesAllocated;
- ptrCheckGuard(pageFirstPtr, cnoOfPage, cpage);
+ c_page_pool.getPtr(pageFirstPtr);
if (pageFirstPtr.p->page_state != ZFREE_COMMON) {
ljam();
return;
@@ -307,8 +303,7 @@
cnoOfAllocatedPages -= (1 << insList);
PagePtr pageLastPtr, pageInsPtr;
- pageInsPtr.i = insPageRef;
- ptrCheckGuard(pageInsPtr, cnoOfPage, cpage);
+ c_page_pool.getPtr(pageInsPtr, insPageRef);
ndbrequire(insList < 16);
pageLastPtr.i = (pageInsPtr.i + (1 << insList)) - 1;
@@ -316,8 +311,8 @@
pageInsPtr.p->prev_cluster_page = RNIL;
pageInsPtr.p->last_cluster_page = pageLastPtr.i;
cfreepageList[insList] = pageInsPtr.i;
-
- ptrCheckGuard(pageLastPtr, cnoOfPage, cpage);
+
+ c_page_pool.getPtr(pageLastPtr);
pageLastPtr.p->first_cluster_page = pageInsPtr.i;
pageLastPtr.p->next_page = RNIL;
}//Dbtup::insertCommonArea()
@@ -327,8 +322,7 @@
cnoOfAllocatedPages += (1 << list);
PagePtr pagePrevPtr, pageNextPtr, pageLastPtr, pageSearchPtr, remPagePtr;
- remPagePtr.i = remPageRef;
- ptrCheckGuard(remPagePtr, cnoOfPage, cpage);
+ c_page_pool.getPtr(remPagePtr, remPageRef);
ndbrequire(list < 16);
if (cfreepageList[list] == remPagePtr.i) {
ljam();
@@ -336,14 +330,14 @@
pageNextPtr.i = cfreepageList[list];
if (pageNextPtr.i != RNIL) {
ljam();
- ptrCheckGuard(pageNextPtr, cnoOfPage, cpage);
+ c_page_pool.getPtr(pageNextPtr);
pageNextPtr.p->prev_cluster_page = RNIL;
}//if
} else {
pageSearchPtr.i = cfreepageList[list];
while (true) {
ljam();
- ptrCheckGuard(pageSearchPtr, cnoOfPage, cpage);
+ c_page_pool.getPtr(pageSearchPtr);
pagePrevPtr = pageSearchPtr;
pageSearchPtr.i = pageSearchPtr.p->next_cluster_page;
if (pageSearchPtr.i == remPagePtr.i) {
@@ -355,7 +349,7 @@
pagePrevPtr.p->next_cluster_page = pageNextPtr.i;
if (pageNextPtr.i != RNIL) {
ljam();
- ptrCheckGuard(pageNextPtr, cnoOfPage, cpage);
+ c_page_pool.getPtr(pageNextPtr);
pageNextPtr.p->prev_cluster_page = pagePrevPtr.i;
}//if
}//if
@@ -364,6 +358,6 @@
remPagePtr.p->prev_cluster_page= RNIL;
pageLastPtr.i = (remPagePtr.i + (1 << list)) - 1;
- ptrCheckGuard(pageLastPtr, cnoOfPage, cpage);
+ c_page_pool.getPtr(pageLastPtr);
pageLastPtr.p->first_cluster_page= RNIL;
}//Dbtup::removeCommonArea()
--- 1.3/storage/ndb/src/kernel/blocks/dbtup/DbtupPageMap.cpp 2005-11-07 12:19:07 +01:00
+++ 1.4/storage/ndb/src/kernel/blocks/dbtup/DbtupPageMap.cpp 2005-11-04 10:32:07 +01:00
@@ -91,20 +91,20 @@
Uint32 Dbtup::getEmptyPage(Fragrecord* const regFragPtr)
{
- Uint32 pageId = regFragPtr->emptyPrimPage;
+ Uint32 pageId = regFragPtr->emptyPrimPage.firstItem;
if (pageId == RNIL) {
ljam();
allocMoreFragPages(regFragPtr);
- pageId = regFragPtr->emptyPrimPage;
+ pageId = regFragPtr->emptyPrimPage.firstItem;
if (pageId == RNIL) {
ljam();
return RNIL;
}//if
}//if
PagePtr pagePtr;
- pagePtr.i = pageId;
- ptrCheckGuard(pagePtr, cnoOfPage, cpage);
- regFragPtr->emptyPrimPage = pagePtr.p->next_page;
+ LocalDLList<Page> alloc_pages(c_page_pool, regFragPtr->emptyPrimPage);
+ alloc_pages.getPtr(pagePtr, pageId);
+ alloc_pages.remove(pagePtr);
return pageId;
}//Dbtup::getEmptyPage()
@@ -284,6 +284,22 @@
ljam();
ndbrequire(regPRPtr.i == regFragPtr->rootPageRange);
initFragRange(regFragPtr);
+ for (Uint32 i = 0; i<MAX_FREE_LIST; i++)
+ {
+ LocalDLList<Page> tmp(c_page_pool, regFragPtr->free_var_page_array[i]);
+ tmp.remove();
+ }
+
+ {
+ LocalDLList<Page> tmp(c_page_pool, regFragPtr->emptyPrimPage);
+ tmp.remove();
+ }
+
+ {
+ LocalDLList<Page> tmp(c_page_pool, regFragPtr->thFreeFirst);
+ tmp.remove();
+ }
+
return;
} else {
if (regPRPtr.p->type[indexPos] == ZNON_LEAF) {
@@ -327,7 +343,6 @@
void Dbtup::initFragRange(Fragrecord* const regFragPtr)
{
- regFragPtr->emptyPrimPage = RNIL;
regFragPtr->rootPageRange = RNIL;
regFragPtr->currentPageRange = RNIL;
regFragPtr->noOfPages = 0;
@@ -365,19 +380,21 @@
/* THOSE PAGES TO EMPTY_MM AND LINK THEM INTO THE EMPTY */
/* PAGE LIST OF THE FRAGMENT. */
/* ---------------------------------------------------------------- */
+ Uint32 prev = RNIL;
for (loopPagePtr.i = retPageRef; loopPagePtr.i < loopLimit; loopPagePtr.i++) {
ljam();
- ptrCheckGuard(loopPagePtr, cnoOfPage, cpage);
+ c_page_pool.getPtr(loopPagePtr);
loopPagePtr.p->page_state = ZEMPTY_MM;
loopPagePtr.p->frag_page_id = startRange +
(loopPagePtr.i - retPageRef);
loopPagePtr.p->physical_page_id = loopPagePtr.i;
- loopPagePtr.p->next_page = loopPagePtr.i + 1;
+ loopPagePtr.p->nextList = loopPagePtr.i + 1;
+ loopPagePtr.p->prevList = prev;
+ prev = loopPagePtr.i;
}//for
- loopPagePtr.i = (retPageRef + noOfPagesAllocated) - 1;
- ptrCheckGuard(loopPagePtr, cnoOfPage, cpage);
- loopPagePtr.p->next_page = regFragPtr->emptyPrimPage;
- regFragPtr->emptyPrimPage = retPageRef;
+ loopPagePtr.p->nextList = RNIL;
+ LocalDLList<Page> alloc(c_page_pool, regFragPtr->emptyPrimPage);
+ alloc.add(retPageRef, loopPagePtr);
/* ---------------------------------------------------------------- */
/* WAS ENOUGH PAGES ALLOCATED OR ARE MORE NEEDED. */
/* ---------------------------------------------------------------- */
--- 1.23/storage/ndb/src/kernel/blocks/dbtup/DbtupRoutines.cpp 2005-11-07 12:19:07 +01:00
+++ 1.24/storage/ndb/src/kernel/blocks/dbtup/DbtupRoutines.cpp 2005-11-05 00:05:04 +01:00
@@ -1160,6 +1160,10 @@
outBuffer[2] = signal->theData[2];
outBuffer[3] = signal->theData[3];
return 4;
+ case AttributeHeader::ROWID:
+ outBuffer[0] = req_struct->frag_page_id;
+ outBuffer[1] = operPtr.p->m_tuple_location.m_page_idx;
+ return 2;
default:
return 0;
}
--- 1.28/storage/ndb/src/kernel/blocks/suma/Suma.cpp 2005-11-07 12:19:08 +01:00
+++ 1.29/storage/ndb/src/kernel/blocks/suma/Suma.cpp 2005-11-04 09:22:13 +01:00
@@ -14,6 +14,7 @@
along with this program; if not, write to the Free Software
Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA */
+#include <my_config.h>
#include "Suma.hpp"
#include <ndb_version.h>
@@ -3149,7 +3150,8 @@
Page_pos pos= bucket->m_buffer_head;
ndbrequire(pos.m_max_gci < gci);
- Buffer_page* page= (Buffer_page*)(m_tup->cpage+pos.m_page_id);
+ Buffer_page* page= (Buffer_page*)
+ m_tup->c_page_pool.getPtr(pos.m_page_id);
ndbout_c("takeover %d", pos.m_page_id);
page->m_max_gci = pos.m_max_gci;
page->m_words_used = pos.m_page_pos;
@@ -4091,7 +4093,7 @@
Bucket* bucket= c_buckets+buck;
Page_pos pos= bucket->m_buffer_head;
- Buffer_page* page= (Buffer_page*)(m_tup->cpage+pos.m_page_id);
+ Buffer_page* page= (Buffer_page*)m_tup->c_page_pool.getPtr(pos.m_page_id);
Uint32* ptr= page->m_data + pos.m_page_pos;
const bool same_gci = (gci == pos.m_last_gci) && (!ERROR_INSERTED(13022));
@@ -4150,7 +4152,7 @@
pos.m_page_pos = sz;
pos.m_last_gci = gci;
- page= (Buffer_page*)(m_tup->cpage+pos.m_page_id);
+ page= (Buffer_page*)m_tup->c_page_pool.getPtr(pos.m_page_id);
page->m_next_page= RNIL;
ptr= page->m_data;
goto loop; //
@@ -4181,7 +4183,7 @@
if(tail != RNIL)
{
- Buffer_page* page= (Buffer_page*)(m_tup->cpage+tail);
+ Buffer_page* page= (Buffer_page*)m_tup->c_page_pool.getPtr(tail);
bucket->m_buffer_tail = page->m_next_page;
free_page(tail, page);
signal->theData[0] = SumaContinueB::OUT_OF_BUFFER_RELEASE;
@@ -4225,8 +4227,8 @@
Uint32 ref= m_first_free_page;
if(likely(ref != RNIL))
{
- m_first_free_page = ((Buffer_page*)m_tup->cpage+ref)->m_next_page;
- Uint32 chunk = ((Buffer_page*)m_tup->cpage+ref)->m_page_chunk_ptr_i;
+ m_first_free_page =
((Buffer_page*)m_tup->c_page_pool.getPtr(ref))->m_next_page;
+ Uint32 chunk =
((Buffer_page*)m_tup->c_page_pool.getPtr(ref))->m_page_chunk_ptr_i;
c_page_chunk_pool.getPtr(ptr, chunk);
ndbassert(ptr.p->m_free);
ptr.p->m_free--;
@@ -4249,7 +4251,7 @@
Buffer_page* page;
for(Uint32 i = 0; i<count; i++)
{
- page = (Buffer_page*)(m_tup->cpage+ref);
+ page = (Buffer_page*)m_tup->c_page_pool.getPtr(ref);
page->m_page_state= SUMA_SEQUENCE;
page->m_page_chunk_ptr_i = ptr.i;
page->m_next_page = ++ref;
@@ -4313,7 +4315,7 @@
else
{
jam();
- Buffer_page* page= (Buffer_page*)(m_tup->cpage+tail);
+ Buffer_page* page= (Buffer_page*)m_tup->c_page_pool.getPtr(tail);
Uint32 max_gci = page->m_max_gci;
Uint32 next_page = page->m_next_page;
@@ -4406,7 +4408,7 @@
Bucket* bucket= c_buckets+buck;
Uint32 tail= bucket->m_buffer_tail;
- Buffer_page* page= (Buffer_page*)(m_tup->cpage+tail);
+ Buffer_page* page= (Buffer_page*)m_tup->c_page_pool.getPtr(tail);
Uint32 max_gci = page->m_max_gci;
Uint32 next_page = page->m_next_page;
Uint32 *ptr = page->m_data + pos;
--- 1.9/storage/ndb/src/kernel/vm/ArrayPool.hpp 2005-11-07 12:19:08 +01:00
+++ 1.10/storage/ndb/src/kernel/vm/ArrayPool.hpp 2005-11-04 09:17:42 +01:00
@@ -42,7 +42,8 @@
*
* Note, can currently only be called once
*/
- bool setSize(Uint32 noOfElements, bool align = false, bool exit_on_error = true);
+ bool setSize(Uint32 noOfElements, bool align = false, bool exit_on_error = true, bool
guard = true);
+ bool set(T*, Uint32 cnt, bool align = false);
inline Uint32 getNoOfFree() const {
return noOfFree;
@@ -202,7 +203,8 @@
theArray = 0;
alloc_ptr = 0;
#ifdef ARRAY_GUARD
- delete []theAllocatedBitmask;
+ if (theAllocatedBitmask)
+ delete []theAllocatedBitmask;
theAllocatedBitmask = 0;
#endif
}
@@ -216,7 +218,8 @@
template <class T>
inline
bool
-ArrayPool<T>::setSize(Uint32 noOfElements, bool align, bool exit_on_error){
+ArrayPool<T>::setSize(Uint32 noOfElements,
+ bool align, bool exit_on_error, bool guard){
if(size == 0){
if(noOfElements == 0)
return true;
@@ -257,9 +260,12 @@
firstFree = 0;
#ifdef ARRAY_GUARD
- bitmaskSz = (noOfElements + 31) >> 5;
- theAllocatedBitmask = new Uint32[bitmaskSz];
- BitmaskImpl::clear(bitmaskSz, theAllocatedBitmask);
+ if (guard)
+ {
+ bitmaskSz = (noOfElements + 31) >> 5;
+ theAllocatedBitmask = new Uint32[bitmaskSz];
+ BitmaskImpl::clear(bitmaskSz, theAllocatedBitmask);
+ }
#endif
return true;
@@ -270,21 +276,56 @@
ErrorReporter::handleAssert("ArrayPool<T>::setSize called twice", __FILE__,
__LINE__);
return false; // not reached
}
+
+template <class T>
+inline
+bool
+ArrayPool<T>::set(T* ptr, Uint32 cnt, bool align){
+ if (size == 0)
+ {
+ alloc_ptr = ptr;
+ if(align)
+ {
+ UintPtr p = (UintPtr)alloc_ptr;
+ UintPtr mod = p % sizeof(T);
+ if (mod)
+ {
+ p += sizeof(T) - mod;
+ cnt --;
+ }
+ theArray = (T *)p;
+ }
+ else
+ {
+ theArray = (T *)alloc_ptr;
+ }
+
+ size = cnt;
+ noOfFree = 0;
+ return true;
+ }
+ ErrorReporter::handleAssert("ArrayPool<T>::set called twice",
+ __FILE__, __LINE__);
+ return false; // not reached
+}
template <class T>
inline
void
ArrayPool<T>::getPtr(Ptr<T> & ptr){
Uint32 i = ptr.i;
- if(i < size){
+ if(likely (i < size)){
ptr.p = &theArray[i];
#ifdef ARRAY_GUARD
- if(BitmaskImpl::get(bitmaskSz, theAllocatedBitmask, i))
- return;
- /**
- * Getting a non-seized element
- */
- ErrorReporter::handleAssert("ArrayPool<T>::getPtr", __FILE__, __LINE__);
+ if (theAllocatedBitmask)
+ {
+ if(BitmaskImpl::get(bitmaskSz, theAllocatedBitmask, i))
+ return;
+ /**
+ * Getting a non-seized element
+ */
+ ErrorReporter::handleAssert("ArrayPool<T>::getPtr", __FILE__, __LINE__);
+ }
#endif
} else {
ErrorReporter::handleAssert("ArrayPool<T>::getPtr", __FILE__, __LINE__);
@@ -296,15 +337,18 @@
void
ArrayPool<T>::getPtr(ConstPtr<T> & ptr) const {
Uint32 i = ptr.i;
- if(i < size){
+ if(likely(i < size)){
ptr.p = &theArray[i];
#ifdef ARRAY_GUARD
- if(BitmaskImpl::get(bitmaskSz, theAllocatedBitmask, i))
- return;
- /**
- * Getting a non-seized element
- */
- ErrorReporter::handleAssert("ArrayPool<T>::getPtr", __FILE__, __LINE__);
+ if (theAllocatedBitmask)
+ {
+ if(BitmaskImpl::get(bitmaskSz, theAllocatedBitmask, i))
+ return;
+ /**
+ * Getting a non-seized element
+ */
+ ErrorReporter::handleAssert("ArrayPool<T>::getPtr", __FILE__, __LINE__);
+ }
#endif
} else {
ErrorReporter::handleAssert("ArrayPool<T>::getPtr", __FILE__, __LINE__);
@@ -316,15 +360,18 @@
void
ArrayPool<T>::getPtr(Ptr<T> & ptr, Uint32 i){
ptr.i = i;
- if(i < size){
+ if(likely(i < size)){
ptr.p = &theArray[i];
#ifdef ARRAY_GUARD
- if(BitmaskImpl::get(bitmaskSz, theAllocatedBitmask, i))
- return;
- /**
- * Getting a non-seized element
- */
- ErrorReporter::handleAssert("ArrayPool<T>::getPtr", __FILE__, __LINE__);
+ if (theAllocatedBitmask)
+ {
+ if(BitmaskImpl::get(bitmaskSz, theAllocatedBitmask, i))
+ return;
+ /**
+ * Getting a non-seized element
+ */
+ ErrorReporter::handleAssert("ArrayPool<T>::getPtr", __FILE__, __LINE__);
+ }
#endif
} else {
ErrorReporter::handleAssert("ArrayPool<T>::getPtr", __FILE__, __LINE__);
@@ -336,15 +383,18 @@
void
ArrayPool<T>::getPtr(ConstPtr<T> & ptr, Uint32 i) const {
ptr.i = i;
- if(i < size){
+ if(likely(i < size)){
ptr.p = &theArray[i];
#ifdef ARRAY_GUARD
- if(BitmaskImpl::get(bitmaskSz, theAllocatedBitmask, i))
- return;
- /**
- * Getting a non-seized element
- */
- ErrorReporter::handleAssert("ArrayPool<T>::getPtr", __FILE__, __LINE__);
+ if (theAllocatedBitmask)
+ {
+ if(BitmaskImpl::get(bitmaskSz, theAllocatedBitmask, i))
+ return;
+ /**
+ * Getting a non-seized element
+ */
+ ErrorReporter::handleAssert("ArrayPool<T>::getPtr", __FILE__, __LINE__);
+ }
#endif
} else {
ErrorReporter::handleAssert("ArrayPool<T>::getPtr", __FILE__, __LINE__);
@@ -355,18 +405,20 @@
inline
T *
ArrayPool<T>::getPtr(Uint32 i){
- if(i < size){
+ if(likely(i < size)){
#ifdef ARRAY_GUARD
- if(BitmaskImpl::get(bitmaskSz, theAllocatedBitmask, i))
- return &theArray[i];
- /**
- * Getting a non-seized element
- */
- ErrorReporter::handleAssert("ArrayPool<T>::getPtr", __FILE__, __LINE__);
- return 0;
-#else
- return &theArray[i];
+ if (theAllocatedBitmask)
+ {
+ if(BitmaskImpl::get(bitmaskSz, theAllocatedBitmask, i))
+ return &theArray[i];
+ /**
+ * Getting a non-seized element
+ */
+ ErrorReporter::handleAssert("ArrayPool<T>::getPtr", __FILE__, __LINE__);
+ return 0;
+ }
#endif
+ return &theArray[i];
} else {
ErrorReporter::handleAssert("ArrayPool<T>::getPtr", __FILE__, __LINE__);
return 0;
@@ -377,18 +429,20 @@
inline
const T *
ArrayPool<T>::getConstPtr(Uint32 i) const {
- if(i < size){
+ if(likely(i < size)){
#ifdef ARRAY_GUARD
- if(BitmaskImpl::get(bitmaskSz, theAllocatedBitmask, i))
- return &theArray[i];
- /**
- * Getting a non-seized element
- */
- ErrorReporter::handleAssert("ArrayPool<T>::getPtr", __FILE__, __LINE__);
- return 0;
-#else
- return &theArray[i];
+ if (theAllocatedBitmask)
+ {
+ if(BitmaskImpl::get(bitmaskSz, theAllocatedBitmask, i))
+ return &theArray[i];
+ /**
+ * Getting a non-seized element
+ */
+ ErrorReporter::handleAssert("ArrayPool<T>::getPtr", __FILE__, __LINE__);
+ return 0;
+ }
#endif
+ return &theArray[i];
} else {
ErrorReporter::handleAssert("ArrayPool<T>::getPtr", __FILE__, __LINE__);
return 0;
@@ -400,15 +454,18 @@
void
ArrayPool<T>::getPtr(Ptr<T> & ptr, bool CrashOnBoundaryError){
Uint32 i = ptr.i;
- if(i < size){
+ if(likely(i < size)){
ptr.p = &theArray[i];
#ifdef ARRAY_GUARD
- if(BitmaskImpl::get(bitmaskSz, theAllocatedBitmask, i))
- return;
- /**
- * Getting a non-seized element
- */
- ErrorReporter::handleAssert("ArrayPool<T>::getPtr", __FILE__, __LINE__);
+ if (theAllocatedBitmask)
+ {
+ if(BitmaskImpl::get(bitmaskSz, theAllocatedBitmask, i))
+ return;
+ /**
+ * Getting a non-seized element
+ */
+ ErrorReporter::handleAssert("ArrayPool<T>::getPtr", __FILE__, __LINE__);
+ }
#endif
} else {
ptr.i = RNIL;
@@ -420,15 +477,18 @@
void
ArrayPool<T>::getPtr(ConstPtr<T> & ptr, bool CrashOnBoundaryError) const
{
Uint32 i = ptr.i;
- if(i < size){
+ if(likely(i < size)){
ptr.p = &theArray[i];
#ifdef ARRAY_GUARD
- if(BitmaskImpl::get(bitmaskSz, theAllocatedBitmask, i))
- return;
- /**
- * Getting a non-seized element
- */
- ErrorReporter::handleAssert("ArrayPool<T>::getPtr", __FILE__, __LINE__);
+ if (theAllocatedBitmask)
+ {
+ if(BitmaskImpl::get(bitmaskSz, theAllocatedBitmask, i))
+ return;
+ /**
+ * Getting a non-seized element
+ */
+ ErrorReporter::handleAssert("ArrayPool<T>::getPtr", __FILE__, __LINE__);
+ }
#endif
} else {
ptr.i = RNIL;
@@ -440,15 +500,18 @@
void
ArrayPool<T>::getPtr(Ptr<T> & ptr, Uint32 i, bool CrashOnBoundaryError){
ptr.i = i;
- if(i < size){
+ if(likely(i < size)){
ptr.p = &theArray[i];
#ifdef ARRAY_GUARD
- if(BitmaskImpl::get(bitmaskSz, theAllocatedBitmask, i))
- return;
- /**
- * Getting a non-seized element
- */
- ErrorReporter::handleAssert("ArrayPool<T>::getPtr", __FILE__, __LINE__);
+ if (theAllocatedBitmask)
+ {
+ if(BitmaskImpl::get(bitmaskSz, theAllocatedBitmask, i))
+ return;
+ /**
+ * Getting a non-seized element
+ */
+ ErrorReporter::handleAssert("ArrayPool<T>::getPtr", __FILE__, __LINE__);
+ }
#endif
} else {
ptr.i = RNIL;
@@ -461,15 +524,18 @@
ArrayPool<T>::getPtr(ConstPtr<T> & ptr, Uint32 i,
bool CrashOnBoundaryError) const {
ptr.i = i;
- if(i < size){
+ if(likely(i < size)){
ptr.p = &theArray[i];
#ifdef ARRAY_GUARD
- if(BitmaskImpl::get(bitmaskSz, theAllocatedBitmask, i))
- return;
- /**
- * Getting a non-seized element
- */
- ErrorReporter::handleAssert("ArrayPool<T>::getPtr", __FILE__, __LINE__);
+ if (theAllocatedBitmask)
+ {
+ if(BitmaskImpl::get(bitmaskSz, theAllocatedBitmask, i))
+ return;
+ /**
+ * Getting a non-seized element
+ */
+ ErrorReporter::handleAssert("ArrayPool<T>::getPtr", __FILE__, __LINE__);
+ }
#endif
} else {
ptr.i = RNIL;
@@ -480,18 +546,20 @@
inline
T *
ArrayPool<T>::getPtr(Uint32 i, bool CrashOnBoundaryError){
- if(i < size){
+ if(likely(i < size)){
#ifdef ARRAY_GUARD
- if(BitmaskImpl::get(bitmaskSz, theAllocatedBitmask, i))
- return &theArray[i];
- /**
- * Getting a non-seized element
- */
- ErrorReporter::handleAssert("ArrayPool<T>::getPtr", __FILE__, __LINE__);
- return 0;
-#else
- return &theArray[i];
+ if (theAllocatedBitmask)
+ {
+ if(BitmaskImpl::get(bitmaskSz, theAllocatedBitmask, i))
+ return &theArray[i];
+ /**
+ * Getting a non-seized element
+ */
+ ErrorReporter::handleAssert("ArrayPool<T>::getPtr", __FILE__, __LINE__);
+ return 0;
+ }
#endif
+ return &theArray[i];
} else {
return 0;
}
@@ -501,18 +569,20 @@
inline
const T *
ArrayPool<T>::getConstPtr(Uint32 i, bool CrashOnBoundaryError) const {
- if(i < size){
+ if(likely(i < size)){
#ifdef ARRAY_GUARD
- if(BitmaskImpl::get(bitmaskSz, theAllocatedBitmask, i))
- return &theArray[i];
- /**
- * Getting a non-seized element
- */
- ErrorReporter::handleAssert("ArrayPool<T>::getConstPtr", __FILE__,__LINE__);
- return 0;
-#else
- return &theArray[i];
+ if (theAllocatedBitmask)
+ {
+ if(BitmaskImpl::get(bitmaskSz, theAllocatedBitmask, i))
+ return &theArray[i];
+ /**
+ * Getting a non-seized element
+ */
+ ErrorReporter::handleAssert("ArrayPool<T>::getConstPtr", __FILE__,__LINE__);
+ return 0;
+ }
#endif
+ return &theArray[i];
} else {
return 0;
}
@@ -534,21 +604,23 @@
ptr.i = ff;
ptr.p = &theArray[ff];
#ifdef ARRAY_GUARD
- if(!BitmaskImpl::get(bitmaskSz, theAllocatedBitmask, ff)){
- BitmaskImpl::set(bitmaskSz, theAllocatedBitmask, ff);
- noOfFree--;
- return true;
- } else {
- /**
- * Seizing an already seized element
- */
- ErrorReporter::handleAssert("ArrayPool<T>::seize", __FILE__, __LINE__);
- return false;
+ if (theAllocatedBitmask)
+ {
+ if(!BitmaskImpl::get(bitmaskSz, theAllocatedBitmask, ff)){
+ BitmaskImpl::set(bitmaskSz, theAllocatedBitmask, ff);
+ noOfFree--;
+ return true;
+ } else {
+ /**
+ * Seizing an already seized element
+ */
+ ErrorReporter::handleAssert("ArrayPool<T>::seize", __FILE__, __LINE__);
+ return false;
+ }
}
-#else
+#endif
noOfFree--;
return true;
-#endif
}
ptr.i = RNIL;
ptr.p = NULL;
@@ -575,21 +647,23 @@
ptr.i = ff;
ptr.p = &theArray[ff];
#ifdef ARRAY_GUARD
- if(!BitmaskImpl::get(bitmaskSz, theAllocatedBitmask, ff)){
- BitmaskImpl::set(bitmaskSz, theAllocatedBitmask, ff);
- noOfFree--;
- return true;
- } else {
- /**
- * Seizing an already seized element
- */
- ErrorReporter::handleAssert("ArrayPool<T>::seizeId", __FILE__, __LINE__);
- return false;
+ if (theAllocatedBitmask)
+ {
+ if(!BitmaskImpl::get(bitmaskSz, theAllocatedBitmask, ff)){
+ BitmaskImpl::set(bitmaskSz, theAllocatedBitmask, ff);
+ noOfFree--;
+ return true;
+ } else {
+ /**
+ * Seizing an already seized element
+ */
+ ErrorReporter::handleAssert("ArrayPool<T>::seizeId", __FILE__, __LINE__);
+ return false;
+ }
}
-#else
+#endif
noOfFree--;
return true;
-#endif
}
ptr.i = RNIL;
ptr.p = NULL;
@@ -636,15 +710,18 @@
noOfFree -= n;
#ifdef ARRAY_GUARD
- for(Uint32 j = base; j<curr; j++){
- if(!BitmaskImpl::get(bitmaskSz, theAllocatedBitmask, j)){
- BitmaskImpl::set(bitmaskSz, theAllocatedBitmask, j);
- } else {
- /**
- * Seizing an already seized element
- */
- ErrorReporter::handleAssert("ArrayPool<T>::seize", __FILE__, __LINE__);
- return RNIL;
+ if (theAllocatedBitmask)
+ {
+ for(Uint32 j = base; j<curr; j++){
+ if(!BitmaskImpl::get(bitmaskSz, theAllocatedBitmask, j)){
+ BitmaskImpl::set(bitmaskSz, theAllocatedBitmask, j);
+ } else {
+ /**
+ * Seizing an already seized element
+ */
+ ErrorReporter::handleAssert("ArrayPool<T>::seize", __FILE__, __LINE__);
+ return RNIL;
+ }
}
}
#endif
@@ -669,14 +746,17 @@
const Uint32 end = base + n;
for(Uint32 i = base; i<end; i++){
#ifdef ARRAY_GUARD
- if(BitmaskImpl::get(bitmaskSz, theAllocatedBitmask, i)){
- BitmaskImpl::clear(bitmaskSz, theAllocatedBitmask, i);
- } else {
- /**
- * Relesing a already released element
- */
- ErrorReporter::handleAssert("ArrayPool<T>::release", __FILE__, __LINE__);
- return;
+ if (theAllocatedBitmask)
+ {
+ if(BitmaskImpl::get(bitmaskSz, theAllocatedBitmask, i)){
+ BitmaskImpl::clear(bitmaskSz, theAllocatedBitmask, i);
+ } else {
+ /**
+ * Relesing a already released element
+ */
+ ErrorReporter::handleAssert("ArrayPool<T>::release", __FILE__, __LINE__);
+ return;
+ }
}
#endif
theArray[i].nextPool = i + 1;
@@ -697,19 +777,22 @@
noOfFree += n;
#ifdef ARRAY_GUARD
- Uint32 tmp = first;
- for(Uint32 i = 0; i<n; i++){
- if(BitmaskImpl::get(bitmaskSz, theAllocatedBitmask, tmp)){
- BitmaskImpl::clear(bitmaskSz, theAllocatedBitmask, tmp);
- } else {
- /**
- * Relesing a already released element
- */
- ErrorReporter::handleAssert("ArrayPool<T>::releaseList",
- __FILE__, __LINE__);
- return;
+ if (theAllocatedBitmask)
+ {
+ Uint32 tmp = first;
+ for(Uint32 i = 0; i<n; i++){
+ if(BitmaskImpl::get(bitmaskSz, theAllocatedBitmask, tmp)){
+ BitmaskImpl::clear(bitmaskSz, theAllocatedBitmask, tmp);
+ } else {
+ /**
+ * Relesing a already released element
+ */
+ ErrorReporter::handleAssert("ArrayPool<T>::releaseList",
+ __FILE__, __LINE__);
+ return;
+ }
+ tmp = theArray[tmp].nextPool;
}
- tmp = theArray[tmp].nextPool;
}
#endif
return;
@@ -725,21 +808,24 @@
void
ArrayPool<T>::release(Uint32 _i){
const Uint32 i = _i;
- if(i < size){
+ if(likely(i < size)){
Uint32 ff = firstFree;
theArray[i].nextPool = ff;
firstFree = i;
#ifdef ARRAY_GUARD
- if(BitmaskImpl::get(bitmaskSz, theAllocatedBitmask, i)){
- BitmaskImpl::clear(bitmaskSz, theAllocatedBitmask, i);
- noOfFree++;
- return;
+ if (theAllocatedBitmask)
+ {
+ if(BitmaskImpl::get(bitmaskSz, theAllocatedBitmask, i)){
+ BitmaskImpl::clear(bitmaskSz, theAllocatedBitmask, i);
+ noOfFree++;
+ return;
+ }
+ /**
+ * Relesing a already released element
+ */
+ ErrorReporter::handleAssert("ArrayPool<T>::release", __FILE__, __LINE__);
}
- /**
- * Relesing a already released element
- */
- ErrorReporter::handleAssert("ArrayPool<T>::release", __FILE__, __LINE__);
#endif
noOfFree++;
return;
@@ -755,22 +841,25 @@
void
ArrayPool<T>::release(Ptr<T> & ptr){
Uint32 i = ptr.i;
- if(i < size){
+ if(likely(i < size)){
Uint32 ff = firstFree;
theArray[i].nextPool = ff;
firstFree = i;
#ifdef ARRAY_GUARD
- if(BitmaskImpl::get(bitmaskSz, theAllocatedBitmask, i)){
- BitmaskImpl::clear(bitmaskSz, theAllocatedBitmask, i);
- //assert(noOfFree() == noOfFree2());
- noOfFree++;
- return;
+ if (theAllocatedBitmask)
+ {
+ if(BitmaskImpl::get(bitmaskSz, theAllocatedBitmask, i)){
+ BitmaskImpl::clear(bitmaskSz, theAllocatedBitmask, i);
+ //assert(noOfFree() == noOfFree2());
+ noOfFree++;
+ return;
+ }
+ /**
+ * Relesing a already released element
+ */
+ ErrorReporter::handleAssert("ArrayPool<T>::release", __FILE__, __LINE__);
}
- /**
- * Relesing a already released element
- */
- ErrorReporter::handleAssert("ArrayPool<T>::release", __FILE__, __LINE__);
#endif
noOfFree++;
return;
@@ -798,7 +887,7 @@
void
UnsafeArrayPool<T>::getPtrForce(Ptr<T> & ptr){
Uint32 i = ptr.i;
- if(i < this->size){
+ if(likely(i < this->size)){
ptr.p = &this->theArray[i];
} else {
ErrorReporter::handleAssert("UnsafeArrayPool<T>::getPtr",
@@ -811,7 +900,7 @@
void
UnsafeArrayPool<T>::getPtrForce(ConstPtr<T> & ptr) const{
Uint32 i = ptr.i;
- if(i < this->size){
+ if(likely(i < this->size)){
ptr.p = &this->theArray[i];
} else {
ErrorReporter::handleAssert("UnsafeArrayPool<T>::getPtr",
@@ -823,7 +912,7 @@
inline
T *
UnsafeArrayPool<T>::getPtrForce(Uint32 i){
- if(i < this->size){
+ if(likely(i < this->size)){
return &this->theArray[i];
} else {
ErrorReporter::handleAssert("UnsafeArrayPool<T>::getPtr",
@@ -836,7 +925,7 @@
inline
const T *
UnsafeArrayPool<T>::getConstPtrForce(Uint32 i) const {
- if(i < this->size){
+ if(likely(i < this->size)){
return &this->theArray[i];
} else {
ErrorReporter::handleAssert("UnsafeArrayPool<T>::getPtr",
@@ -850,7 +939,7 @@
void
UnsafeArrayPool<T>::getPtrForce(Ptr<T> & ptr, Uint32 i){
ptr.i = i;
- if(i < this->size){
+ if(likely(i < this->size)){
ptr.p = &this->theArray[i];
return ;
} else {
@@ -864,7 +953,7 @@
void
UnsafeArrayPool<T>::getPtrForce(ConstPtr<T> & ptr, Uint32 i) const{
ptr.i = i;
- if(i < this->size){
+ if(likely(i < this->size)){
ptr.p = &this->theArray[i];
return ;
} else {
--- 1.4/storage/ndb/src/kernel/vm/DLList.hpp 2005-11-07 12:19:08 +01:00
+++ 1.5/storage/ndb/src/kernel/vm/DLList.hpp 2005-11-04 09:14:46 +01:00
@@ -91,6 +91,12 @@
* @NOTE MUST be seized from correct pool
*/
void add(Ptr<T> &);
+
+ /**
+ * Add a list to list
+ * @NOTE all elements _must_ be correctly initilized correctly wrt next/prev
+ */
+ void add(Uint32 first, Ptr<T> & last);
/**
* Remove object from list
@@ -98,6 +104,13 @@
* @NOTE Does not return it to pool
*/
void remove(Ptr<T> &);
+
+ /**
+ * Remove object from list
+ *
+ * @NOTE Does not return it to pool
+ */
+ void remove(T*);
/**
* Update i & p value according to <b>i</b>
@@ -256,19 +269,42 @@
template <class T, class U>
inline
void
+DLList<T,U>::add(Uint32 first, Ptr<T> & lastPtr)
+{
+ Uint32 ff = head.firstItem;
+
+ head.firstItem = first;
+ lastPtr.p->U::nextList = ff;
+
+ if(ff != RNIL){
+ T * t2 = thePool.getPtr(ff);
+ t2->U::prevList = lastPtr.i;
+ }
+}
+
+template <class T, class U>
+inline
+void
DLList<T,U>::remove(Ptr<T> & p){
- T * t = p.p;
+ remove(p.p);
+}
+
+template <class T, class U>
+inline
+void
+DLList<T,U>::remove(T * p){
+ T * t = p;
Uint32 ni = t->U::nextList;
Uint32 pi = t->U::prevList;
if(ni != RNIL){
- T * t = thePool.getPtr(ni);
- t->U::prevList = pi;
+ T * tn = thePool.getPtr(ni);
+ tn->U::prevList = pi;
}
if(pi != RNIL){
- T * t = thePool.getPtr(pi);
- t->U::nextList = ni;
+ T * tp = thePool.getPtr(pi);
+ tp->U::nextList = ni;
} else {
head.firstItem = ni;
}
--- 1.4/storage/ndb/src/kernel/vm/SLList.hpp 2005-11-07 12:19:08 +01:00
+++ 1.5/storage/ndb/src/kernel/vm/SLList.hpp 2005-11-21 12:19:05 +01:00
@@ -120,6 +120,19 @@
head.firstItem = p.i;
}
+ /**
+ * Add a list to list
+ * @NOTE all elements _must_ be correctly initilized correctly wrt next/prev
+ */
+ void add(Uint32 first, Ptr<T> & last);
+
+ /**
+ * Remove object from list
+ *
+ * @NOTE Does not return it to pool
+ */
+ bool remove_front(Ptr<T> &);
+
Uint32 noOfElements() const {
Uint32 c = 0;
Uint32 i = head.firstItem;
@@ -244,6 +257,28 @@
void
SLList<T,U>::remove(){
head.firstItem = RNIL;
+}
+
+template <class T, class U>
+inline
+bool
+SLList<T,U>::remove_front(Ptr<T> & p){
+ p.i = head.firstItem;
+ if (p.i != RNIL)
+ {
+ p.p = thePool.getPtr(p.i);
+ head.firstItem = p.p->U::nextList;
+ return true;
+ }
+ return false;
+}
+
+template <class T, class U>
+inline
+void
+SLList<T,U>::add(Uint32 first, Ptr<T> & last){
+ last.p->U::nextList = head.firstItem;
+ head.firstItem = first;
}
template <class T, class U>
--- 1.6/storage/ndb/src/kernel/blocks/dbtup/DbtupScan.cpp 2005-11-07 12:19:07 +01:00
+++ 1.8/storage/ndb/src/kernel/blocks/dbtup/DbtupScan.cpp 2005-11-21 12:19:05 +01:00
@@ -227,7 +227,7 @@
{
ScanOp& scan = *scanPtr.p;
// set to first fragment, first page, first tuple
- const Uint32 first_page_idx = scan.m_bits & ScanOp::SCAN_VS ? 1 : 0;
+ const Uint32 first_page_idx = 0;
PagePos& pos = scan.m_scanPos;
pos.m_key.m_page_no = 0;
pos.m_key.m_page_idx = first_page_idx;
@@ -254,7 +254,9 @@
tablePtr.i = scan.m_tableId;
ptrCheckGuard(tablePtr, cnoOfTablerec, tablerec);
Fragrecord& frag = *fragPtrP;
- const Uint32 first_page_idx = bits & ScanOp::SCAN_VS ? 1 : 0;
+ const Uint32 first_page_idx = 0;
+ const Uint32 tupheadsize = tablePtr.p->m_offsets[MM].m_fix_header_size +
+ (bits & ScanOp::SCAN_VS ? Tuple_header::HeaderSize + 1 : 0);
while (true) {
// TODO time-slice here after X loops
jam();
@@ -266,8 +268,7 @@
break;
}
Uint32 realPageId = getRealpid(fragPtrP, key.m_page_no);
- pagePtr.i = realPageId;
- ptrCheckGuard(pagePtr, cnoOfPage, cpage);
+ c_page_pool.getPtr(pagePtr, realPageId);
Uint32 pageState = pagePtr.p->page_state;
// skip empty page
if (pageState == ZEMPTY_MM) {
@@ -279,53 +280,24 @@
}
// get next tuple
const Tuple_header* th = 0;
- if (! (bits & ScanOp::SCAN_VS)) {
- Uint32 tupheadsize = tablePtr.p->m_offsets[MM].m_fix_header_size;
- if (pos.m_match)
- key.m_page_idx += tupheadsize;
- pos.m_match = true;
- if (key.m_page_idx + tupheadsize > Fix_page::DATA_WORDS) {
- jam();
- key.m_page_no++;
- key.m_page_idx = first_page_idx;
- pos.m_match = false;
- continue;
- }
- th = (Tuple_header*)&pagePtr.p->m_data[key.m_page_idx];
- // skip over free tuple
- if (th->m_header_bits & Tuple_header::FREE) {
- jam();
- continue;
- }
- } else {
- Var_page* page_ptr = (Var_page*)pagePtr.p;
- if (pos.m_match)
- key.m_page_idx += 1;
- pos.m_match = true;
- if (key.m_page_idx >= page_ptr->high_index) {
- jam();
- key.m_page_no++;
- key.m_page_idx = first_page_idx;
- pos.m_match = false;
- continue;
- }
-
- Uint32 len= page_ptr->get_entry_len(key.m_page_idx);
- if (len == 0)
- {
- // skip empty slot or
- jam();
- continue;
- }
- if(len & Var_page::CHAIN)
- {
- // skip varpart chain
- jam();
- continue;
- }
- th = (Tuple_header*)page_ptr->get_ptr(key.m_page_idx);
+ if (pos.m_match)
+ key.m_page_idx += tupheadsize;
+ pos.m_match = true;
+ if (key.m_page_idx + tupheadsize > Fix_page::DATA_WORDS) {
+ jam();
+ key.m_page_no++;
+ key.m_page_idx = first_page_idx;
+ pos.m_match = false;
+ continue;
}
-
+ th = (Tuple_header*)
+ ((Fix_page*)pagePtr.p)->get_ptr(key.m_page_idx, tupheadsize);
+ // skip over free tuple
+ if (th->m_header_bits & Tuple_header::FREE) {
+ jam();
+ continue;
+ }
+
if(bits & ScanOp::SCAN_LCP &&
th->m_header_bits & Tuple_header::LCP_SKIP)
{
| Thread |
|---|
| • bk commit into 5.1 tree (jonas:1.1942) | jonas | 21 Nov |