#At file:///home/ms136779/mysql-telco/mysql-5.1-telco-7.0-orig/ based on revid:frazer@stripped
3197 Maitrayi Sabaratnam 2009-11-09
Committing varsize disk tuples
modified:
storage/ndb/src/kernel/blocks/dbtup/Dbtup.hpp
storage/ndb/src/kernel/blocks/dbtup/DbtupAbort.cpp
storage/ndb/src/kernel/blocks/dbtup/DbtupCommit.cpp
storage/ndb/src/kernel/blocks/dbtup/DbtupDiskAlloc.cpp
storage/ndb/src/kernel/blocks/dbtup/DbtupExecQuery.cpp
storage/ndb/src/kernel/blocks/dbtup/DbtupGen.cpp
storage/ndb/src/kernel/blocks/dbtup/DbtupMeta.cpp
storage/ndb/src/kernel/blocks/dbtup/DbtupRoutines.cpp
storage/ndb/src/kernel/blocks/dbtup/DbtupScan.cpp
storage/ndb/src/kernel/blocks/dbtup/DbtupTabDesMan.cpp
storage/ndb/src/kernel/blocks/dbtup/tuppage.hpp
=== modified file 'storage/ndb/src/kernel/blocks/dbtup/Dbtup.hpp'
--- a/storage/ndb/src/kernel/blocks/dbtup/Dbtup.hpp 2009-10-27 12:08:44 +0000
+++ b/storage/ndb/src/kernel/blocks/dbtup/Dbtup.hpp 2009-11-09 09:42:56 +0000
@@ -363,6 +363,7 @@ public:
Pgman* c_pgman;
// copy of pgman.m_ptr set after each get_page
Ptr<GlobalPage> m_pgman_ptr;
+ Ptr<GlobalPage> m_pgman_ptr_orig;
enum CallbackIndex {
// lgman
@@ -435,14 +436,16 @@ typedef Ptr<Fragoperrec> FragoperrecPtr;
Uint32 newNoOfAttrs;
Uint32 newNoOfCharsets;
Uint32 newNoOfKeyAttrs;
- Uint32 noOfDynNullBits;
- Uint32 noOfDynVar;
- Uint32 noOfDynFix;
- Uint32 noOfDynamic;
+ Uint32 noOfDynNullBits[2];
+ Uint32 noOfDynVar[2];
+ Uint32 noOfDynFix[2];
+ Uint32 noOfDynamic[2];
Uint32 tabDesOffset[7];
Uint32 tableDescriptor;
Uint32 dynTabDesOffset[3];
- Uint32 dynTableDescriptor;
+ Uint32 dynDesAllocSize[2];
+ Uint32 dynTableDescriptor[2];
+ Uint16 disk_tuple_format;
};
typedef Ptr<AlterTabOperation> AlterTabOperationPtr;
@@ -802,6 +805,7 @@ struct Operationrec {
Uint32 savepointId;
Uint32 m_commit_disk_callback_page;
};
+ Uint32 m_commit_disk_callback_page_orig;
/*
* State variables on connection.
@@ -825,6 +829,7 @@ struct Operationrec {
unsigned int m_disk_preallocated : 1;
unsigned int m_load_diskpage_on_commit : 1;
unsigned int m_wait_log_buffer : 1;
+ unsigned int m_load_diskpage_orig_on_commit : 1;
};
union {
OpBitFields op_struct;
@@ -942,6 +947,8 @@ ArrayPool<TupTriggerData> c_triggerPool;
STATIC_CONST( DD = 1 );
STATIC_CONST( DYN_BM_LEN_BITS = 8 );
STATIC_CONST( DYN_BM_LEN_MASK = ((1 << DYN_BM_LEN_BITS) - 1));
+ STATIC_CONST( DISK_TUPLE_FIXED_SIZE = 0 );
+ STATIC_CONST( DISK_TUPLE_VAR_SIZE = 1 );
struct Tablerec {
Tablerec(ArrayPool<TupTriggerData> & triggerPool) :
@@ -964,16 +971,16 @@ ArrayPool<TupTriggerData> c_triggerPool;
_after_ seeing all columns, hence must be separate from the readKeyArray
et al descriptor, which is allocated before seeing columns.
*/
- Uint32 dynTabDescriptor;
+ Uint32 dynTabDescriptor[2];
/* Mask of variable-sized dynamic attributes. */
- Uint32* dynVarSizeMask;
+ Uint32* dynVarSizeMask[2];
/*
Mask of fixed-sized dynamic attributes. There is one bit set for each
32-bit word occupied by fixed-size attributes, so fixed-size dynamic
attributes >32bit have multiple bits here.
*/
- Uint32* dynFixSizeMask;
+ Uint32* dynFixSizeMask[2];
ReadFunction* readFunctionArray;
UpdateFunction* updateFunctionArray;
@@ -1019,7 +1026,9 @@ ArrayPool<TupTriggerData> c_triggerPool;
Uint16 m_no_of_disk_attributes;
Uint16 noOfKeyAttr;
Uint16 noOfCharsets;
- Uint16 m_dyn_null_bits;
+ Uint16 m_dyn_null_bits[2];
+
+ Uint16 m_disk_tuple_format;
bool need_expand() const {
return m_no_of_attributes > m_attributes[MM].m_no_of_fixsize;
@@ -1035,14 +1044,14 @@ ArrayPool<TupTriggerData> c_triggerPool;
return
m_attributes[MM].m_no_of_varsize > 0 ||
m_attributes[MM].m_no_of_dynamic > 0 ||
- m_attributes[DD].m_no_of_varsize > 0;
+ m_attributes[DD].m_no_of_dynamic > 0;
}
bool need_shrink(bool disk) const {
return
m_attributes[MM].m_no_of_varsize > 0 ||
m_attributes[MM].m_no_of_dynamic > 0 ||
- (disk && m_attributes[DD].m_no_of_varsize > 0);
+ (disk && m_attributes[DD].m_no_of_dynamic > 0);
}
/**
@@ -1420,6 +1429,13 @@ typedef Ptr<HostBuffer> HostBufferPtr;
STATIC_CONST( FREE = 0x02800000 ); // Is free
STATIC_CONST( VAR_PART = 0x04000000 ); // Is there a varpart
STATIC_CONST( REORG_MOVE = 0x08000000 );
+ STATIC_CONST( DISK_SHRINK = 0x10000000 ); // disk tuple has shrunk
+ /* Disk tuple has grown, found enough space on the same page. */
+ STATIC_CONST( DISK_GROW= 0x20000000 );
+ /* Disk tuple is moved to another page,
+ due to lack of space on the same page.
+ */
+ STATIC_CONST( DISK_MOVE = 0x40000000 );
Tuple_header() {}
Uint32 get_tuple_version() const {
@@ -1470,12 +1486,85 @@ typedef Ptr<HostBuffer> HostBufferPtr;
return m_data + (tabPtrP->m_bits & Tablerec::TR_Checksum);
}
- Uint32 *get_dd_gci(const Tablerec* tabPtrP, Uint32 mm){
+ Uint32 *get_dd_gci(const Tablerec* tabPtrP){
assert(tabPtrP->m_bits & Tablerec::TR_RowGCI);
return m_data;
}
+
+ Uint32* get_end_of_fix_part_ptr_dd(const Tablerec* tabPtrP) {
+ assert(tabPtrP->m_bits & Tablerec::TR_RowGCI);
+ Uint32 nullwds = tabPtrP->m_offsets[DD].m_null_words;
+ return m_data + (tabPtrP->m_bits & Tablerec::TR_RowGCI) + nullwds;
+ }
+
+ const Uint32* get_end_of_fix_part_ptr_dd(const Tablerec* tabPtrP) const {
+ assert(tabPtrP->m_bits & Tablerec::TR_RowGCI);
+ Uint32 nullwds = tabPtrP->m_offsets[DD].m_null_words;
+ return m_data + (tabPtrP->m_bits & Tablerec::TR_RowGCI) + nullwds;
+ }
+
+ Uint32* get_end_of_fix_part_ptr_dd_with_dlen(const Tablerec* tabPtrP) {
+ Uint32* eofp= get_end_of_fix_part_ptr_dd(tabPtrP);
+ Varpart_copy* dp= (Varpart_copy*)(eofp);
+ return dp->m_data;
+ }
+
+ const Uint32* get_end_of_fix_part_ptr_dd_with_dlen(const Tablerec* tabPtrP) const {
+ const Uint32* eofp= get_end_of_fix_part_ptr_dd(tabPtrP);
+ Varpart_copy* dp= (Varpart_copy*)(eofp);
+ return dp->m_data;
+ }
+
+ Uint32 get_fix_size_dd_with_dlen(const Tablerec* tabPtrP) {
+ Uint32* eofp= get_end_of_fix_part_ptr_dd(tabPtrP);
+ Varpart_copy* dp= (Varpart_copy*)(eofp);
+ return (dp->m_data - (Uint32 *)this);
+ }
+
+ Uint32 get_fix_size_dd(const Tablerec* tabPtrP) {
+ Uint32* eofp= get_end_of_fix_part_ptr_dd(tabPtrP);
+ return (eofp - (Uint32 *)this);
+ }
};
+ void Dbtup::debug_bits(char *text, Uint32 bits)
+ {
+ ndbout_c("%s", text);
+ if (bits & Tuple_header::COPY_TUPLE)
+ ndbout_c("COPY_TUPLE");
+ if (bits & Tuple_header::DISK_PART)
+ ndbout_c("DISK_PART");
+ if (bits & Tuple_header::DISK_ALLOC)
+ ndbout_c("DISK_ALLOC");
+ if (bits & Tuple_header::DISK_INLINE)
+ ndbout_c("DISK_INLINE");
+ if (bits & Tuple_header::ALLOC)
+ ndbout_c("ALLOC");
+ if (bits & Tuple_header::MM_SHRINK)
+ ndbout_c("MM_SHRINK");
+ if (bits & Tuple_header::MM_GROWN)
+ ndbout_c("MM_GROWN");
+ if (bits & Tuple_header::FREED)
+ ndbout_c("FREED");
+ if (bits & Tuple_header::LCP_SKIP)
+ ndbout_c("LCP_SKIP");
+ if (bits & Tuple_header::LCP_KEEP)
+ ndbout_c("LCP_KEEP");
+ if (bits & Tuple_header::FREE)
+ ndbout_c("FREE");
+ if (bits & Tuple_header::VAR_PART)
+ ndbout_c("VAR_PART");
+ if (bits & Tuple_header::REORG_MOVE)
+ ndbout_c("REORG_MOVE");
+
+ if (bits & Tuple_header::DISK_SHRINK)
+ ndbout_c("DISK_SHRINK");
+ if (bits & Tuple_header::DISK_GROW)
+ ndbout_c("DISK_GROW");
+ if (bits & Tuple_header::DISK_MOVE)
+ ndbout_c("DISK_MOVE");
+ }
+
/**
* Format of varpart after insert/update
*/
@@ -1602,7 +1691,7 @@ struct KeyReqStruct {
* was updating this record.
*/
Bitmask<MAXNROFATTRIBUTESINWORDS> changeMask;
- Uint16 var_pos_array[2*MAX_ATTRIBUTES_IN_TABLE + 1];
+ Uint16 var_pos_array[4*MAX_ATTRIBUTES_IN_TABLE + 1];
OperationrecPtr prevOpPtr;
};
@@ -1963,6 +2052,15 @@ private:
//------------------------------------------------------------------
//------------------------------------------------------------------
+ int handleUpdateReq_dd_var(Signal* signal,
+ Ptr<Operationrec> regOperRecPtr,
+ Ptr<Fragrecord> fragPtr,
+ Tablerec* regTabPtr,
+ KeyReqStruct* req_struct,
+ bool disk);
+
+//------------------------------------------------------------------
+//------------------------------------------------------------------
int handleInsertReq(Signal* signal,
Ptr<Operationrec> regOperPtr,
Ptr<Fragrecord>,
@@ -2411,6 +2509,7 @@ private:
Uint32 noOfCharsets,
Uint32 & charsetIndex, Uint32 & attrDes2);
void computeTableMetaData(Tablerec *regTabPtr);
+ void computeTableMetaData_dd_var(Tablerec*);
//------------------------------------------------------------------
//------------------------------------------------------------------
@@ -2782,8 +2881,10 @@ private:
const Uint32* offset);
void setupDynDescriptorReferences(Uint32 dynDescr,
Tablerec* const regTabPtr,
- const Uint32* offset);
+ const Uint32* offset,
+ Uint32 ind=0);
void setUpKeyArray(Tablerec* regTabPtr);
+ void setUpKeyArray_dd_var(Tablerec*);
bool addfragtotab(Tablerec* regTabPtr, Uint32 fragId, Uint32 fragIndex);
void deleteFragTab(Tablerec* regTabPtr, Uint32 fragId);
void abortAddFragOp(Signal* signal);
@@ -2956,6 +3057,7 @@ private:
// Public methods
Uint32* alloc_var_rec(Fragrecord*, Tablerec*, Uint32, Local_key*, Uint32*);
void free_var_rec(Fragrecord*, Tablerec*, Local_key*, Ptr<Page>);
+ Uint16 grow_record_on_same_page(PagePtr, Uint16,Uint32, Uint32);
Uint32* alloc_var_part(Fragrecord*, Tablerec*, Uint32, Local_key*);
Uint32 *realloc_var_part(Fragrecord*, Tablerec*,
PagePtr, Var_part_ref*, Uint32, Uint32);
@@ -3081,8 +3183,20 @@ private:
void expand_tuple(KeyReqStruct*, Uint32 sizes[4], Tuple_header*org,
const Tablerec*, bool disk);
+ void expand_tuple_dd_fixed(KeyReqStruct*, Uint32 sizes[2],
+ const Tablerec*, Uint32,
+ const Uint32*, Uint32*,
+ const Uint32 *);
+ void expand_tuple_dd_var(KeyReqStruct*, Uint32 sizes[2],
+ const Tablerec*, Uint32,
+ const Uint32*, Uint32*,
+ const Uint32 *, const Uint16*,
+ const Uint32*);
+
void shrink_tuple(KeyReqStruct*, Uint32 sizes[2], const Tablerec*,
bool disk);
+ void shrink_tuple_dd_var(KeyReqStruct*, Uint32 sizes[2],
+ const Tablerec*, bool disk);
Uint32* get_ptr(Var_part_ref);
Uint32* get_ptr(PagePtr*, Var_part_ref);
@@ -3123,6 +3237,13 @@ private:
Ptr<Page>, Uint32, Uint32);
void disk_page_prealloc_transit_page(Disk_alloc_info&,
Ptr<Page_request>, Uint32, Uint32);
+
+ /* Prealloc the space needed for the expantion of a tuple
+ on the current page.
+ */
+ bool disk_page_prealloc_on_same_page(Signal*, Fragrecord*,
+ Local_key, PagePtr,
+ Uint32, Uint32);
void disk_page_abort_prealloc(Signal*, Fragrecord*,Local_key*, Uint32);
void disk_page_abort_prealloc_callback(Signal*, Uint32, Uint32);
@@ -3138,10 +3259,13 @@ private:
void disk_page_alloc(Signal*,
Tablerec*, Fragrecord*, Local_key*, PagePtr, Uint32);
+ void disk_page_realloc_on_same_page(Fragrecord*, PagePtr,
+ Local_key*, Uint32, Uint32, Uint32, bool);
void disk_page_free(Signal*,
Tablerec*, Fragrecord*, Local_key*, PagePtr, Uint32);
void disk_page_commit_callback(Signal*, Uint32 opPtrI, Uint32 page_id);
+ void disk_page_commit_callback_orig(Signal*, Uint32, Uint32);
void disk_page_log_buffer_callback(Signal*, Uint32 opPtrI, Uint32);
@@ -3236,6 +3360,21 @@ private:
const Dbtup::ScanOp& op);
void commit_operation(Signal*, Uint32, Tuple_header*, PagePtr,
Operationrec*, Fragrecord*, Tablerec*);
+ int retrieve_data_page(Signal*, bool,
+ Page_cache_client::Request,
+ OperationrecPtr);
+ int retrieve_log_page(Signal*, FragrecordPtr, OperationrecPtr);
+
+ int start_commit_dd_fixed(Signal*, OperationrecPtr, TablerecPtr,
+ FragrecordPtr, Tuple_header*);
+ int start_commit_dd_var(Signal*, OperationrecPtr, TablerecPtr,
+ FragrecordPtr, Tuple_header*);
+ void commit_operation_dd_fixed(Signal*, Operationrec*, Fragrecord*,
+ Tablerec*, Tuple_header*,
+ Tuple_header*, Tuple_header*, Uint32);
+ void commit_operation_dd_var(Signal*, Operationrec*, Fragrecord*,
+ Tablerec*, Tuple_header*,
+ Tuple_header*, Local_key, Uint32);
void dealloc_tuple(Signal* signal, Uint32, Page*, Tuple_header*,
Operationrec*, Fragrecord*, Tablerec*);
@@ -3246,6 +3385,15 @@ private:
Fragrecord* regFragPtr,
Tablerec* regTabPtr,
Uint32 sizes[4]);
+
+ int handle_size_change_after_update_disk(Signal*,
+ KeyReqStruct* req_struct,
+ Tuple_header* org,
+ Ptr<Operationrec> regOperRecPtr,
+ Ptr<Fragrecord> fragPtr,
+ Tablerec* regTabPtr,
+ Uint32 sizes[4]);
+
int optimize_var_part(KeyReqStruct* req_struct,
Tuple_header* org,
Operationrec* regOperPtr,
@@ -3257,6 +3405,10 @@ private:
* req_struct->m_tuple_ptr is set to tuple to read
*/
void prepare_read(KeyReqStruct*, Tablerec* const, bool disk);
+ void prepare_read_dd_fixed(KeyReqStruct*, Tablerec* const,
+ const Uint32*);
+ void prepare_read_dd_var(KeyReqStruct*, Tablerec* const,
+ const Uint32*);
/* For debugging, dump the contents of a tuple. */
void dump_tuple(const KeyReqStruct* req_struct, const Tablerec* tabPtrP);
=== modified file 'storage/ndb/src/kernel/blocks/dbtup/DbtupAbort.cpp'
--- a/storage/ndb/src/kernel/blocks/dbtup/DbtupAbort.cpp 2009-08-17 07:36:12 +0000
+++ b/storage/ndb/src/kernel/blocks/dbtup/DbtupAbort.cpp 2009-11-09 09:42:56 +0000
@@ -46,13 +46,38 @@ Dbtup::do_tup_abort_operation(Signal* si
{
Tuple_header *copy=
get_copy_tuple(tablePtrP, &opPtrP->m_copy_tuple_location);
+ Uint32 copybits = copy->m_header_bits;
if (opPtrP->op_struct.m_disk_preallocated)
{
jam();
- Local_key key;
- memcpy(&key, copy->get_disk_ref_ptr(tablePtrP), sizeof(key));
- disk_page_abort_prealloc(signal, fragPtrP, &key, key.m_page_idx);
+ if (tablePtrP->m_disk_tuple_format == DISK_TUPLE_VAR_SIZE &&
+ opPtrP->is_last_operation())
+ {
+ jam();
+ ndbassert(!(copybits & Tuple_header::DISK_SHRINK));
+ ndbassert(!opPtrP->m_copy_tuple_location.isNull());
+ ndbassert(copybits & Tuple_header::DISK_ALLOC ||
+ copybits & Tuple_header::DISK_MOVE ||
+ copybits & Tuple_header::DISK_GROW);
+
+ Local_key key;
+ memcpy(&key, copy->get_disk_ref_ptr(tablePtrP), sizeof(key));
+ Uint32 sz = key.m_page_idx;
+ if (copybits & Tuple_header::DISK_ALLOC ||
+ copybits & Tuple_header::DISK_MOVE)
+ {
+ sz += 1;
+ }
+ disk_page_abort_prealloc(signal, fragPtrP, &key, sz);
+ }
+ else
+ {
+ jam();
+ Local_key key;
+ memcpy(&key, copy->get_disk_ref_ptr(tablePtrP), sizeof(key));
+ disk_page_abort_prealloc(signal, fragPtrP, &key, key.m_page_idx);
+ }
}
if(! (bits & Tuple_header::ALLOC))
=== modified file 'storage/ndb/src/kernel/blocks/dbtup/DbtupCommit.cpp'
--- a/storage/ndb/src/kernel/blocks/dbtup/DbtupCommit.cpp 2009-10-20 16:10:06 +0000
+++ b/storage/ndb/src/kernel/blocks/dbtup/DbtupCommit.cpp 2009-11-09 09:42:56 +0000
@@ -113,6 +113,7 @@ void Dbtup::initOpConnection(Operationre
regOperPtr->op_struct.op_type= ZREAD;
regOperPtr->op_struct.m_disk_preallocated= 0;
regOperPtr->op_struct.m_load_diskpage_on_commit= 0;
+ regOperPtr->op_struct.m_load_diskpage_orig_on_commit = 0;
regOperPtr->op_struct.m_wait_log_buffer= 0;
regOperPtr->op_struct.in_active_list = false;
regOperPtr->m_undo_buffer_space= 0;
@@ -239,6 +240,9 @@ Dbtup::commit_operation(Signal* signal,
Uint32 save= tuple_ptr->m_operation_ptr_i;
Uint32 bits= tuple_ptr->m_header_bits;
+ // Save the key of the original disk_tuple
+ Local_key key_orig;
+ memcpy(&key_orig, tuple_ptr->get_disk_ref_ptr(regTabPtr), sizeof(Local_key));
Tuple_header *disk_ptr= 0;
Tuple_header *copy=
get_copy_tuple(regTabPtr, ®OperPtr->m_copy_tuple_location);
@@ -327,43 +331,20 @@ Dbtup::commit_operation(Signal* signal,
(copy_bits & Tuple_header::DISK_INLINE))
{
jam();
- Local_key key;
- memcpy(&key, copy->get_disk_ref_ptr(regTabPtr), sizeof(Local_key));
- Uint32 logfile_group_id= regFragPtr->m_logfile_group_id;
- PagePtr diskPagePtr = *(PagePtr*)&m_pgman_ptr;
- ndbassert(diskPagePtr.p->m_page_no == key.m_page_no);
- ndbassert(diskPagePtr.p->m_file_no == key.m_file_no);
- Uint32 sz, *dst;
- if(copy_bits & Tuple_header::DISK_ALLOC)
- {
- jam();
- disk_page_alloc(signal, regTabPtr, regFragPtr, &key, diskPagePtr, gci);
- }
-
- if(regTabPtr->m_attributes[DD].m_no_of_varsize == 0)
+ if (regTabPtr->m_disk_tuple_format == DISK_TUPLE_VAR_SIZE)
{
- jam();
- sz= regTabPtr->m_offsets[DD].m_fix_header_size;
- dst= ((Fix_page*)diskPagePtr.p)->get_ptr(key.m_page_idx, sz);
+ commit_operation_dd_var(signal,regOperPtr, regFragPtr,
+ regTabPtr, tuple_ptr,
+ disk_ptr, key_orig, gci);
}
else
{
- jam();
- dst= ((Var_page*)diskPagePtr.p)->get_ptr(key.m_page_idx);
- sz= ((Var_page*)diskPagePtr.p)->get_entry_len(key.m_page_idx);
- }
-
- if(! (copy_bits & Tuple_header::DISK_ALLOC))
- {
- jam();
- disk_page_undo_update(diskPagePtr.p,
- &key, dst, sz, gci, logfile_group_id);
+ commit_operation_dd_fixed(signal,regOperPtr, regFragPtr,
+ regTabPtr, tuple_ptr,
+ disk_ptr, copy, gci);
}
-
- memcpy(dst, disk_ptr, 4*sz);
- memcpy(tuple_ptr->get_disk_ref_ptr(regTabPtr), &key, sizeof(Local_key));
-
+
ndbassert(! (disk_ptr->m_header_bits & Tuple_header::FREE));
copy_bits |= Tuple_header::DISK_PART;
}
@@ -382,10 +363,23 @@ Dbtup::commit_operation(Signal* signal,
}
}
- Uint32 clear=
+ Uint32 clear= (Uint32)0;
+ if (regTabPtr->m_disk_tuple_format == DISK_TUPLE_VAR_SIZE)
+ {
+ clear=
+ Tuple_header::ALLOC | Tuple_header::FREE | Tuple_header::COPY_TUPLE |
+ Tuple_header::DISK_ALLOC | Tuple_header::DISK_INLINE |
+ Tuple_header::MM_SHRINK | Tuple_header::MM_GROWN |
+ Tuple_header::DISK_SHRINK | Tuple_header::DISK_GROW |
+ Tuple_header::DISK_MOVE;
+ }
+ else
+ {
+ clear=
Tuple_header::ALLOC | Tuple_header::FREE | Tuple_header::COPY_TUPLE |
Tuple_header::DISK_ALLOC | Tuple_header::DISK_INLINE |
Tuple_header::MM_SHRINK | Tuple_header::MM_GROWN;
+ }
copy_bits &= ~(Uint32)clear;
tuple_ptr->m_header_bits= copy_bits;
@@ -467,8 +461,11 @@ Dbtup::disk_page_log_buffer_callback(Sig
tupCommitReq->diskpage = page;
ndbassert(regOperPtr.p->op_struct.m_load_diskpage_on_commit == 0);
+ ndbassert(regOperPtr.p->op_struct.m_load_diskpage_orig_on_commit == 0);
regOperPtr.p->op_struct.m_wait_log_buffer= 0;
m_global_page_pool.getPtr(m_pgman_ptr, page);
+ m_global_page_pool.getPtr(m_pgman_ptr_orig,
+ regOperPtr.p->m_commit_disk_callback_page_orig);
execTUP_COMMITREQ(signal);
ndbassert(signal->theData[0] == 0);
@@ -586,135 +583,23 @@ void Dbtup::execTUP_COMMITREQ(Signal* si
}
}
}
-
- bool get_page = false;
- if(regOperPtr.p->op_struct.m_load_diskpage_on_commit)
- {
- jam();
- Page_cache_client::Request req;
- /**
- * Only last op on tuple needs "real" commit,
- * hence only this one should have m_load_diskpage_on_commit
- */
- ndbassert(tuple_ptr->m_operation_ptr_i == regOperPtr.i);
-
- /**
- * Check for page
- */
- if(!regOperPtr.p->m_copy_tuple_location.isNull())
- {
- jam();
- Tuple_header* tmp=
- get_copy_tuple(regTabPtr.p, ®OperPtr.p->m_copy_tuple_location);
-
- memcpy(&req.m_page,
- tmp->get_disk_ref_ptr(regTabPtr.p), sizeof(Local_key));
-
- if (unlikely(regOperPtr.p->op_struct.op_type == ZDELETE &&
- tmp->m_header_bits & Tuple_header::DISK_ALLOC))
- {
- jam();
- /**
- * Insert+Delete
- */
- regOperPtr.p->op_struct.m_load_diskpage_on_commit = 0;
- regOperPtr.p->op_struct.m_wait_log_buffer = 0;
- disk_page_abort_prealloc(signal, regFragPtr.p,
- &req.m_page, req.m_page.m_page_idx);
-
- D("Logfile_client - execTUP_COMMITREQ");
- Logfile_client lgman(this, c_lgman, regFragPtr.p->m_logfile_group_id);
- lgman.free_log_space(regOperPtr.p->m_undo_buffer_space);
- goto skip_disk;
- if (0) ndbout_c("insert+delete");
- jamEntry();
- goto skip_disk;
- }
- }
- else
- {
- jam();
- // initial delete
- ndbassert(regOperPtr.p->op_struct.op_type == ZDELETE);
- memcpy(&req.m_page,
- tuple_ptr->get_disk_ref_ptr(regTabPtr.p), sizeof(Local_key));
-
- ndbassert(tuple_ptr->m_header_bits & Tuple_header::DISK_PART);
- }
- req.m_callback.m_callbackData= regOperPtr.i;
- req.m_callback.m_callbackFunction =
- safe_cast(&Dbtup::disk_page_commit_callback);
-
- /*
- * Consider commit to be correlated. Otherwise pk op + commit makes
- * the page hot. XXX move to TUP which knows better.
- */
- int flags= regOperPtr.p->op_struct.op_type |
- Page_cache_client::COMMIT_REQ | Page_cache_client::CORR_REQ;
- Page_cache_client pgman(this, c_pgman);
- int res= pgman.get_page(signal, req, flags);
- m_pgman_ptr = pgman.m_ptr;
- switch(res){
- case 0:
- /**
- * Timeslice
- */
- jam();
- signal->theData[0] = 1;
- return;
- case -1:
- ndbrequire("NOT YET IMPLEMENTED" == 0);
- break;
- default:
- jam();
- }
- get_page = true;
-
- {
- PagePtr tmpptr;
- tmpptr.i = m_pgman_ptr.i;
- tmpptr.p = reinterpret_cast<Page*>(m_pgman_ptr.p);
- disk_page_set_dirty(tmpptr);
- }
-
- regOperPtr.p->m_commit_disk_callback_page= res;
- regOperPtr.p->op_struct.m_load_diskpage_on_commit= 0;
- }
-
- if(regOperPtr.p->op_struct.m_wait_log_buffer)
+ int result = -1;
+ if (regTabPtr.p->m_disk_tuple_format == DISK_TUPLE_VAR_SIZE)
{
- jam();
- /**
- * Only last op on tuple needs "real" commit,
- * hence only this one should have m_wait_log_buffer
- */
- ndbassert(tuple_ptr->m_operation_ptr_i == regOperPtr.i);
-
- CallbackPtr cb;
- cb.m_callbackData= regOperPtr.i;
- cb.m_callbackIndex = DISK_PAGE_LOG_BUFFER_CALLBACK;
- Uint32 sz= regOperPtr.p->m_undo_buffer_space;
-
- D("Logfile_client - execTUP_COMMITREQ");
- Logfile_client lgman(this, c_lgman, regFragPtr.p->m_logfile_group_id);
- int res= lgman.get_log_buffer(signal, sz, &cb);
- jamEntry();
- switch(res){
- case 0:
- jam();
- signal->theData[0] = 1;
- return;
- case -1:
- ndbrequire("NOT YET IMPLEMENTED" == 0);
- break;
- default:
- jam();
- }
+ result= start_commit_dd_var(signal, regOperPtr, regTabPtr,
+ regFragPtr, tuple_ptr);
}
-
+ else
+ {
+ result= start_commit_dd_fixed(signal, regOperPtr, regTabPtr,
+ regFragPtr, tuple_ptr);
+ }
+ // If pages have not been retrieved yet, wait..
+ if (result != 0)
+ return;
+
assert(tuple_ptr);
-skip_disk:
req_struct.m_tuple_ptr = tuple_ptr;
Uint32 nextOp = regOperPtr.p->nextActiveOp;
@@ -751,8 +636,6 @@ skip_disk:
else
{
jam();
- if (get_page)
- ndbassert(tuple_ptr->m_header_bits & Tuple_header::DISK_PART);
dealloc_tuple(signal, gci_hi, page.p, tuple_ptr,
regOperPtr.p, regFragPtr.p, regTabPtr.p);
}
@@ -798,3 +681,606 @@ Dbtup::set_commit_change_mask_info(const
memcpy(dst, maskptr, 4*masklen);
}
}
+
+/*
+ The following methods will handle disk data
+ as fixed size (m_disk_tuple_format=DISK_TUPLE_FIXED_SIZE)
+*/
+
+int Dbtup::retrieve_data_page(Signal *signal, bool orig,
+ Page_cache_client::Request req,
+ OperationrecPtr regOperPtr)
+{
+ req.m_callback.m_callbackData= regOperPtr.i;
+ if (orig)
+ {
+ req.m_callback.m_callbackFunction =
+ safe_cast(&Dbtup::disk_page_commit_callback_orig);
+ }
+ else
+ {
+ req.m_callback.m_callbackFunction =
+ safe_cast(&Dbtup::disk_page_commit_callback);
+ }
+
+ /*
+ * Consider commit to be correlated. Otherwise pk op + commit makes
+ * the page hot. XXX move to TUP which knows better.
+ */
+ int flags= regOperPtr.p->op_struct.op_type |
+ Page_cache_client::COMMIT_REQ | Page_cache_client::CORR_REQ;
+ Page_cache_client pgman(this, c_pgman);
+ int res= pgman.get_page(signal, req, flags);
+
+ if (orig)
+ {
+ m_pgman_ptr_orig = pgman.m_ptr;
+ }
+ else
+ {
+ m_pgman_ptr = pgman.m_ptr;
+ }
+
+ switch(res){
+ case 0:
+ /**
+ * Timeslice
+ */
+ jam();
+ signal->theData[0] = 1;
+ return res;
+ case -1:
+ ndbrequire("NOT YET IMPLEMENTED" == 0);
+ break;
+ default:
+ jam();
+ }
+ {
+ PagePtr tmpptr;
+ if (orig) {
+ tmpptr.i = m_pgman_ptr_orig.i;
+ tmpptr.p = reinterpret_cast<Page*>(m_pgman_ptr_orig.p);
+ }
+ else
+ {
+ tmpptr.i = m_pgman_ptr.i;
+ tmpptr.p = reinterpret_cast<Page*>(m_pgman_ptr.p);
+ }
+ disk_page_set_dirty(tmpptr);
+ }
+
+ if (orig)
+ {
+ regOperPtr.p->m_commit_disk_callback_page_orig= res;
+ regOperPtr.p->op_struct.m_load_diskpage_orig_on_commit= 0;
+ }
+ else
+ {
+ regOperPtr.p->m_commit_disk_callback_page= res;
+ regOperPtr.p->op_struct.m_load_diskpage_on_commit= 0;
+ }
+ return res;
+}
+
+int Dbtup::retrieve_log_page(Signal *signal,
+ FragrecordPtr regFragPtr,
+ OperationrecPtr regOperPtr)
+{
+ jam();
+ /**
+ * Only last op on tuple needs "real" commit,
+ * hence only this one should have m_wait_log_buffer
+ */
+
+ CallbackPtr cb;
+ cb.m_callbackData= regOperPtr.i;
+ cb.m_callbackIndex = DISK_PAGE_LOG_BUFFER_CALLBACK;
+ Uint32 sz= regOperPtr.p->m_undo_buffer_space;
+
+ D("Logfile_client - execTUP_COMMITREQ");
+ Logfile_client lgman(this, c_lgman, regFragPtr.p->m_logfile_group_id);
+ int res= lgman.get_log_buffer(signal, sz, &cb);
+ jamEntry();
+ switch(res){
+ case 0:
+ jam();
+ signal->theData[0] = 1;
+ return res;
+ case -1:
+ ndbrequire("NOT YET IMPLEMENTED" == 0);
+ break;
+ default:
+ jam();
+ }
+ regOperPtr.p->op_struct.m_wait_log_buffer= 0;
+
+ return res;
+}
+
+int Dbtup::start_commit_dd_fixed(Signal* signal,
+ OperationrecPtr regOperPtr,
+ TablerecPtr regTabPtr,
+ FragrecordPtr regFragPtr,
+ Tuple_header* tuple_ptr)
+{
+ bool get_page = false;
+ if(regOperPtr.p->op_struct.m_load_diskpage_on_commit)
+ {
+ jam();
+ Page_cache_client::Request req;
+
+ /**
+ * Only last op on tuple needs "real" commit,
+ * hence only this one should have m_load_diskpage_on_commit
+ */
+ ndbassert(tuple_ptr->m_operation_ptr_i == regOperPtr.i);
+
+ /**
+ * Check for page
+ */
+ if(!regOperPtr.p->m_copy_tuple_location.isNull())
+ {
+ jam();
+ Tuple_header* tmp=
+ get_copy_tuple(regTabPtr.p, ®OperPtr.p->m_copy_tuple_location);
+
+ memcpy(&req.m_page,
+ tmp->get_disk_ref_ptr(regTabPtr.p), sizeof(Local_key));
+
+ if (unlikely(regOperPtr.p->op_struct.op_type == ZDELETE &&
+ tmp->m_header_bits & Tuple_header::DISK_ALLOC))
+ {
+ jam();
+ /**
+ * Insert+Delete
+ */
+ if (0) ndbout_c("insert+delete");
+
+ regOperPtr.p->op_struct.m_load_diskpage_on_commit = 0;
+ regOperPtr.p->op_struct.m_wait_log_buffer = 0;
+ disk_page_abort_prealloc(signal, regFragPtr.p,
+ &req.m_page, req.m_page.m_page_idx);
+
+ D("Logfile_client - execTUP_COMMITREQ");
+ Logfile_client lgman(this, c_lgman, regFragPtr.p->m_logfile_group_id);
+ lgman.free_log_space(regOperPtr.p->m_undo_buffer_space);
+ // ndbout_c("i+d, returning");
+ return 0;
+ }
+ }
+ else
+ {
+ jam();
+ // initial delete
+ ndbassert(regOperPtr.p->op_struct.op_type == ZDELETE);
+ memcpy(&req.m_page,
+ tuple_ptr->get_disk_ref_ptr(regTabPtr.p), sizeof(Local_key));
+
+ ndbassert(tuple_ptr->m_header_bits & Tuple_header::DISK_PART);
+ }
+
+ bool orig_page= false;
+ if (retrieve_data_page(signal, orig_page, req, regOperPtr) == 0)
+ {
+ return 1; // data page is not retrieved yet
+ }
+ }
+
+ if(regOperPtr.p->op_struct.m_wait_log_buffer)
+ {
+ jam();
+ ndbassert(tuple_ptr->m_operation_ptr_i == regOperPtr.i);
+ if (retrieve_log_page(signal, regFragPtr, regOperPtr) == 0)
+ {
+ return 1; // log page is not retrieved yet
+ }
+ }
+
+ return 0;
+}
+
+void Dbtup::commit_operation_dd_fixed(Signal* signal,
+ Operationrec* regOperPtr,
+ Fragrecord* regFragPtr,
+ Tablerec* regTabPtr,
+ Tuple_header *tuple_ptr,
+ Tuple_header *disk_ptr,
+ Tuple_header *copy,
+ Uint32 gci)
+{
+ jam();
+ Local_key key;
+ memcpy(&key, copy->get_disk_ref_ptr(regTabPtr), sizeof(Local_key));
+ Uint32 logfile_group_id= regFragPtr->m_logfile_group_id;
+
+ PagePtr diskPagePtr = *(PagePtr*)&m_pgman_ptr;
+ ndbassert(diskPagePtr.p->m_page_no == key.m_page_no);
+ ndbassert(diskPagePtr.p->m_file_no == key.m_file_no);
+ Uint32 copy_bits= copy->m_header_bits;
+ Uint32 sz, *dst;
+ if(copy_bits & Tuple_header::DISK_ALLOC)
+ {
+ jam();
+ disk_page_alloc(signal, regTabPtr, regFragPtr, &key, diskPagePtr, gci);
+ }
+
+ if(regTabPtr->m_attributes[DD].m_no_of_varsize == 0)
+ {
+ jam();
+ sz= regTabPtr->m_offsets[DD].m_fix_header_size;
+ dst= ((Fix_page*)diskPagePtr.p)->get_ptr(key.m_page_idx, sz);
+ }
+ else
+ {
+ jam();
+ dst= ((Var_page*)diskPagePtr.p)->get_ptr(key.m_page_idx);
+ sz= ((Var_page*)diskPagePtr.p)->get_entry_len(key.m_page_idx);
+ }
+
+ if(! (copy_bits & Tuple_header::DISK_ALLOC))
+ {
+ jam();
+ disk_page_undo_update(diskPagePtr.p,
+ &key, dst, sz, gci, logfile_group_id);
+ }
+
+ memcpy(dst, disk_ptr, 4*sz);
+ memcpy(tuple_ptr->get_disk_ref_ptr(regTabPtr), &key, sizeof(Local_key));
+}
+
+
+/*
+ The following methods will handle disk data
+ as variable size (m_disk_tuple_format=DISK_TUPLE_VAR_SIZE)
+*/
+
+void
+Dbtup::disk_page_commit_callback_orig(Signal* signal,
+ Uint32 opPtrI, Uint32 page_id)
+{
+ Uint32 hash_value;
+ Uint32 gci_hi, gci_lo;
+ OperationrecPtr regOperPtr;
+
+ jamEntry();
+
+ c_operation_pool.getPtr(regOperPtr, opPtrI);
+ c_lqh->get_op_info(regOperPtr.p->userpointer, &hash_value, &gci_hi, &gci_lo);
+
+ TupCommitReq * const tupCommitReq= (TupCommitReq *)signal->getDataPtr();
+
+ tupCommitReq->opPtr= opPtrI;
+ tupCommitReq->hashValue= hash_value;
+ tupCommitReq->gci_hi= gci_hi;
+ tupCommitReq->gci_lo= gci_lo;
+ tupCommitReq->diskpage = regOperPtr.p->m_commit_disk_callback_page;
+
+ regOperPtr.p->op_struct.m_load_diskpage_orig_on_commit= 0;
+ regOperPtr.p->m_commit_disk_callback_page_orig= page_id;
+ m_global_page_pool.getPtr(m_pgman_ptr_orig, page_id);
+ m_global_page_pool.getPtr(m_pgman_ptr,
+ regOperPtr.p->m_commit_disk_callback_page);
+
+ {
+ PagePtr tmp;
+ tmp.i = m_pgman_ptr_orig.i;
+ tmp.p = reinterpret_cast<Page*>(m_pgman_ptr_orig.p);
+ disk_page_set_dirty(tmp);
+ }
+
+ execTUP_COMMITREQ(signal);
+ if(signal->theData[0] == 0)
+ {
+ jam();
+ c_lqh->tupcommit_conf_callback(signal, regOperPtr.p->userpointer);
+ }
+}
+
+int Dbtup::start_commit_dd_var(Signal* signal,
+ OperationrecPtr regOperPtr,
+ TablerecPtr regTabPtr,
+ FragrecordPtr regFragPtr,
+ Tuple_header* tuple_ptr)
+{
+ bool get_page = false;
+ Uint32 bits = tuple_ptr->m_header_bits;
+
+ Local_key origkey;
+ memcpy(&origkey,
+ tuple_ptr->get_disk_ref_ptr(regTabPtr.p), sizeof(Local_key));
+
+ Uint32 cp_bits= (Uint32)0;
+ Tuple_header* copy_tuple= 0;
+ Local_key key_copy;
+
+ if (regOperPtr.p->op_struct.m_load_diskpage_on_commit)
+ {
+ jam();
+ Page_cache_client::Request req;
+ bool has_copy_tuple = !regOperPtr.p->m_copy_tuple_location.isNull();
+
+ /*
+ Only last op on tuple needs "real" commit,
+ hence only this one should have m_load_diskpage_on_commit.
+ */
+ ndbassert(tuple_ptr->m_operation_ptr_i == regOperPtr.i);
+ /*
+ Find the page to retrieve and
+ abort the preallocs in case of DELETE.
+ */
+ if (has_copy_tuple)
+ {
+ // There is a copy tuple.
+ jam();
+ copy_tuple=
+ get_copy_tuple(regTabPtr.p, ®OperPtr.p->m_copy_tuple_location);
+ cp_bits= copy_tuple->m_header_bits;
+
+ memcpy(&key_copy,
+ copy_tuple->get_disk_ref_ptr(regTabPtr.p), sizeof(Local_key));
+ memcpy(&req.m_page,
+ copy_tuple->get_disk_ref_ptr(regTabPtr.p), sizeof(Local_key));
+ }
+
+ if (regOperPtr.p->op_struct.op_type == ZDELETE)
+ {
+ if (has_copy_tuple)
+ {
+ if (unlikely(cp_bits & Tuple_header::DISK_ALLOC))
+ {
+ /* Insert+Delete. */
+ jam();
+ if (0) ndbout_c("insert+delete");
+
+ regOperPtr.p->op_struct.m_load_diskpage_on_commit = 0;
+ regOperPtr.p->op_struct.m_wait_log_buffer = 0;
+ disk_page_abort_prealloc(signal, regFragPtr.p,
+ &req.m_page, req.m_page.m_page_idx+1);
+
+ D("Logfile_client - execTUP_COMMITREQ");
+ Logfile_client lgman(this, c_lgman, regFragPtr.p->m_logfile_group_id);
+ lgman.free_log_space(regOperPtr.p->m_undo_buffer_space);
+ return 0;
+ }
+
+ if (cp_bits & Tuple_header::DISK_MOVE)
+ {
+ regOperPtr.p->op_struct.m_load_diskpage_orig_on_commit = 0;
+
+ /* MOVE is logged as an insert. Since there is no move
+ due to ZDELETE, relese the preallocs for data & log.
+ */
+
+ // +1 for index slot.
+ disk_page_abort_prealloc(signal, regFragPtr.p,
+ &req.m_page, req.m_page.m_page_idx+1);
+
+ Uint32 undo_log_tup_sz= sizeof(Dbtup::Disk_undo::Alloc)>>2;
+ /* Keep the undo_buffer_space to write log the
+ original tuple in dealloc_tuple.
+ */
+ regOperPtr.p->m_undo_buffer_space -= undo_log_tup_sz;
+
+ Logfile_client lgman(this, c_lgman, regFragPtr.p->m_logfile_group_id);
+ lgman.free_log_space(undo_log_tup_sz);
+ }
+ else if (cp_bits & Tuple_header::DISK_GROW)
+ {
+ // Release the delta prealloc'd by GROW.
+ disk_page_abort_prealloc(signal, regFragPtr.p,
+ &req.m_page, req.m_page.m_page_idx);
+ }
+ } // end has_copy_tuple
+ else
+ {
+ // No copy-tuple, i.e. initial delete, retrieve original page.
+ jam();
+ ndbassert(bits & Tuple_header::DISK_PART);
+ memcpy(&req.m_page,
+ tuple_ptr->get_disk_ref_ptr(regTabPtr.p), sizeof(Local_key));
+ }
+ } // end ZDELETE
+
+ /* Retrieve the requested page:
+ In case of initial delete, retrieve the original page,
+ else retrieve the page referred by the copy tuple's disk ref.
+ */
+
+ bool orig_page= false;
+ if (retrieve_data_page(signal, orig_page, req, regOperPtr) == 0)
+ {
+ return 1; // Data page is not retrieved yet.
+ }
+ } // end load_diskpage_on_commit
+
+ /* Retrieve the ori page, in case of MOVE.
+ MOVE will be performed as a delete of the original tuple
+ and an insert of the new (moved) tuple.
+ */
+
+ if (regOperPtr.p->op_struct.m_load_diskpage_orig_on_commit)
+ {
+ Page_cache_client::Request req;
+ memcpy(&req.m_page,
+ tuple_ptr->get_disk_ref_ptr(regTabPtr.p), sizeof(Local_key));
+
+ ndbassert(tuple_ptr->m_header_bits & Tuple_header::DISK_PART);
+
+ bool orig_page= true;
+ if (retrieve_data_page(signal, orig_page, req, regOperPtr) == 0)
+ {
+ return 1; // data page is not retrieved yet
+ }
+ }
+
+ /******* Retrieve the log page **********/
+
+ if (regOperPtr.p->op_struct.m_wait_log_buffer)
+ {
+ jam();
+ ndbassert(tuple_ptr->m_operation_ptr_i == regOperPtr.i);
+ if (retrieve_log_page(signal, regFragPtr, regOperPtr) == 0)
+ {
+ return 1; // log page is not retrieved yet
+ }
+ }
+
+ if (copy_tuple)
+ {
+ m_global_page_pool.getPtr(m_pgman_ptr,
+ regOperPtr.p->m_commit_disk_callback_page);
+
+ if (cp_bits & Tuple_header::DISK_MOVE)
+ {
+ m_global_page_pool.getPtr(m_pgman_ptr_orig,
+ regOperPtr.p->m_commit_disk_callback_page_orig);
+ }
+ }
+ return 0;
+}
+
+void Dbtup::commit_operation_dd_var(Signal* signal,
+ Operationrec* regOperPtr,
+ Fragrecord* regFragPtr,
+ Tablerec* regTabPtr,
+ Tuple_header *tuple_ptr,
+ Tuple_header *disk_ptr,
+ Local_key key_orig,
+ Uint32 gci)
+{
+ //ndbout_c("commit_operation_dd_var");
+ jam();
+ // tuple_ptr contains the copy tuple
+ Local_key key;
+ memcpy(&key, tuple_ptr->get_disk_ref_ptr(regTabPtr), sizeof(Local_key));
+ Uint32 logfile_group_id = regFragPtr->m_logfile_group_id;
+ PagePtr diskPagePtr = *(PagePtr*)&m_pgman_ptr;
+
+ ndbassert(diskPagePtr.p->m_page_no == key.m_page_no);
+ ndbassert(diskPagePtr.p->m_file_no == key.m_file_no);
+ diskPagePtr.p= reinterpret_cast<Page*>(m_pgman_ptr.p);
+
+ Uint32 copy_bits = tuple_ptr->m_header_bits;
+ bool tuple_exists = copy_bits & Tuple_header::DISK_PART;
+ bool initial_insert = copy_bits & Tuple_header::DISK_ALLOC;
+ bool tuple_moved = copy_bits & Tuple_header::DISK_MOVE;
+ bool tuple_grown = copy_bits & Tuple_header::DISK_GROW;
+ bool tuple_shrunken = copy_bits & Tuple_header::DISK_SHRINK;
+ ndbassert(initial_insert == !tuple_exists);
+
+
+
+ /* Find the real size of the copy tuple. */
+
+ Uint32 fixedsz= ((Tuple_header*)disk_ptr)->get_fix_size_dd(regTabPtr);
+ Varpart_copy *dp= (Varpart_copy*)(((Tuple_header*)disk_ptr)->
+ get_end_of_fix_part_ptr_dd(regTabPtr));
+ Uint32 *dynstart = dp->m_data;
+ Uint32 dynlen = dp->m_len;
+ Uint32 realsz = fixedsz+dynlen;
+
+
+ /* Get the sz of the original disk tuple
+ and log the before_image. */
+
+ PagePtr orig_diskPagePtr;
+ Uint32 orig_sz = 0;
+
+ if (tuple_exists)
+ {
+ if (tuple_moved)
+ {
+ jam();
+ orig_diskPagePtr = *(PagePtr*)&m_pgman_ptr_orig;
+ ndbassert(orig_diskPagePtr.p->m_page_no == key_orig.m_page_no);
+ ndbassert(orig_diskPagePtr.p->m_file_no == key_orig.m_file_no);
+ }
+ else
+ {
+ orig_diskPagePtr = *(PagePtr*)&m_pgman_ptr;
+ }
+ Uint32 *orig_dst = ((Var_page*)orig_diskPagePtr.p)->
+ get_ptr(key_orig.m_page_idx);
+ orig_sz =
+ ((Var_page*)orig_diskPagePtr.p)->get_entry_len(key_orig.m_page_idx);
+
+
+ /* Log the before-image. */
+
+ jam();
+ if (! (initial_insert || tuple_moved) )
+ disk_page_undo_update(diskPagePtr.p,
+ &key_orig, orig_dst, orig_sz, gci,
+ logfile_group_id);
+ }
+
+ /* Free any extra reserved/alloc'd space */
+
+ Uint32 reservedsz = key.m_page_idx;
+
+ if ((initial_insert || tuple_moved) && reservedsz > realsz)
+ {
+ /* MOVE after initial insert or grow has alloc'd more than needed.
+ -1 for index slot.
+ */
+ disk_page_abort_prealloc_callback_1(signal, regFragPtr,
+ diskPagePtr,
+ reservedsz - realsz - 1);
+ }
+
+ /* Alloc/realloc record. */
+
+ if (initial_insert || tuple_moved)
+ {
+ jam();
+ key.m_page_idx = realsz;
+ disk_page_alloc(signal, regTabPtr, regFragPtr, &key, diskPagePtr, gci);
+ }
+ else if (tuple_grown)
+ {
+ jam();
+
+ bool grown = true;
+ disk_page_realloc_on_same_page(regFragPtr, diskPagePtr, &key_orig,
+ gci, orig_sz, realsz, grown);
+
+ // In GROW case, copy_key's m_page_idx contains the growth delta.
+ memcpy(&key, &key_orig, sizeof(Local_key));
+ }
+
+ else if (tuple_shrunken)
+ {
+ /* Update the index slot's length entry after SHRINK
+ and release the extra space. */
+
+ bool grown = false;
+ disk_page_realloc_on_same_page(regFragPtr, diskPagePtr, &key,
+ gci, orig_sz, realsz, grown);
+ }
+
+ /* Get the record & check the size. */
+
+ Uint32 *dst;
+ Uint32 newsz = realsz;
+ jam();
+
+ dst = ((Var_page*)diskPagePtr.p)->get_ptr(key.m_page_idx);
+ newsz = ((Var_page*)diskPagePtr.p)->get_entry_len(key.m_page_idx);
+
+ ndbassert(newsz == realsz);
+
+ /* Copy after_image of the tuple into the data page. */
+
+ memcpy(dst, disk_ptr, fixedsz*4); // Copy fixed part.
+ dst += fixedsz;
+ memcpy(dst, dynstart, dynlen*4); // Copy dyn part.
+ memcpy(tuple_ptr->get_disk_ref_ptr(regTabPtr), &key, sizeof(Local_key));
+
+ /* Free the original tuple after being MOVEd. */
+
+ if (tuple_moved)
+ {
+ disk_page_free(signal, regTabPtr, regFragPtr,
+ &key_orig, orig_diskPagePtr, gci);
+ }
+}
=== modified file 'storage/ndb/src/kernel/blocks/dbtup/DbtupDiskAlloc.cpp'
--- a/storage/ndb/src/kernel/blocks/dbtup/DbtupDiskAlloc.cpp 2009-09-04 15:24:11 +0000
+++ b/storage/ndb/src/kernel/blocks/dbtup/DbtupDiskAlloc.cpp 2009-11-09 09:42:56 +0000
@@ -154,26 +154,36 @@ Dbtup::Disk_alloc_info::Disk_alloc_info(
if (tabPtrP->m_no_of_disk_attributes == 0)
return;
- Uint32 min_size= 4*tabPtrP->m_offsets[DD].m_fix_header_size;
-
- if (tabPtrP->m_attributes[DD].m_no_of_varsize == 0)
+ Uint32 max=0;
+ if (tabPtrP->m_disk_tuple_format == DISK_TUPLE_VAR_SIZE)
{
- Uint32 recs_per_page= (4*Tup_fixsize_page::DATA_WORDS)/min_size;
- m_page_free_bits_map[0] = recs_per_page; // 100% free
- m_page_free_bits_map[1] = 1;
- m_page_free_bits_map[2] = 0;
+ Uint32 pagesz = Tup_varsize_page::DATA_WORDS - 1;
+ m_page_free_bits_map[0] = pagesz; // 100% free 8159 bytes
+ m_page_free_bits_map[1] = pagesz/2; // 50% free 4079
+ m_page_free_bits_map[2] = pagesz/10;
m_page_free_bits_map[3] = 0;
-
- Uint32 max= recs_per_page * extent_size;
- for(Uint32 i = 0; i<EXTENT_SEARCH_MATRIX_ROWS; i++)
+
+ max= pagesz * extent_size;
+ }
+ else
+ {
+ Uint32 min_size= 4*tabPtrP->m_offsets[DD].m_fix_header_size;
+
+ if (tabPtrP->m_attributes[DD].m_no_of_varsize == 0)
{
- m_total_extent_free_space_thresholds[i] =
- (EXTENT_SEARCH_MATRIX_ROWS - i - 1)*max/EXTENT_SEARCH_MATRIX_ROWS;
+ Uint32 recs_per_page= (4*Tup_fixsize_page::DATA_WORDS)/min_size;
+ m_page_free_bits_map[0] = recs_per_page; // 100% free
+ m_page_free_bits_map[1] = 1;
+ m_page_free_bits_map[2] = 0;
+ m_page_free_bits_map[3] = 0;
+
+ max= recs_per_page * extent_size;
}
}
- else
+ for(Uint32 i = 0; i<EXTENT_SEARCH_MATRIX_ROWS; i++)
{
- abort();
+ m_total_extent_free_space_thresholds[i] =
+ (EXTENT_SEARCH_MATRIX_ROWS - i - 1)*max/EXTENT_SEARCH_MATRIX_ROWS;
}
}
@@ -186,6 +196,7 @@ Dbtup::Disk_alloc_info::find_extent(Uint
* Return position in matrix
*/
Uint32 col = calc_page_free_bits(sz);
+ if (col == 3) col = 2;
Uint32 mask= EXTENT_SEARCH_MATRIX_COLS - 1;
for(Uint32 i= 0; i<EXTENT_SEARCH_MATRIX_SIZE; i++)
{
@@ -428,6 +439,16 @@ Dbtup::disk_page_prealloc(Signal* signal
PagePtr tmp;
tmp.i = gpage.i;
tmp.p = reinterpret_cast<Page*>(gpage.p);
+
+ Uint32 ext= tmp.p->m_extent_info_ptr;
+ Ptr<Extent_info> extentPtr;
+ c_extent_pool.getPtr(extentPtr, ext);
+
+ if (tmp.p->free_space < tmp.p->uncommitted_used_space + sz)
+ {
+ continue;
+ }
+
disk_page_prealloc_dirty_page(alloc, tmp, i, sz);
key->m_page_no= tmp.p->m_page_no;
key->m_file_no= tmp.p->m_file_no;
@@ -451,6 +472,16 @@ Dbtup::disk_page_prealloc(Signal* signal
Ptr<Page_request> req;
c_page_request_pool.getPtr(req, ptrI);
+ Uint32 ext= req.p->m_extent_info_ptr;
+ Ptr<Extent_info> extentPtr;
+ c_extent_pool.getPtr(extentPtr, ext);
+
+ if (req.p->m_original_estimated_free_space <=
+ req.p->m_uncommitted_used_space + sz)
+ {
+ continue;
+ }
+
disk_page_prealloc_transit_page(alloc, req, i, sz);
* key = req.p->m_key;
if (DBG_DISK)
@@ -478,7 +509,9 @@ Dbtup::disk_page_prealloc(Signal* signal
int pageBits; // received
Ptr<Extent_info> ext;
- const Uint32 bits= alloc.calc_page_free_bits(sz); // required
+ Uint32 bits= alloc.calc_page_free_bits(sz); // required
+ if (bits == EXTENT_SEARCH_MATRIX_COLS - 1)
+ bits= EXTENT_SEARCH_MATRIX_COLS - 2;
bool found= false;
/**
@@ -886,13 +919,14 @@ Dbtup::disk_page_prealloc_initial_callba
Ptr<Extent_info> extentPtr;
c_extent_pool.getPtr(extentPtr, req.p->m_extent_info_ptr);
- if (tabPtr.p->m_attributes[DD].m_no_of_varsize == 0)
+ if (tabPtr.p->m_disk_tuple_format == DISK_TUPLE_VAR_SIZE)
{
- convertThPage((Fix_page*)pagePtr.p, tabPtr.p, DD);
+ ((Var_page*)pagePtr.p)->init();
}
else
+ if (tabPtr.p->m_attributes[DD].m_no_of_varsize == 0)
{
- abort();
+ convertThPage((Fix_page*)pagePtr.p, tabPtr.p, DD);
}
pagePtr.p->m_page_no= req.p->m_key.m_page_no;
@@ -1133,6 +1167,43 @@ Dbtup::disk_page_alloc(Signal* signal,
Disk_alloc_info& alloc= fragPtrP->m_disk_alloc_info;
Uint64 lsn;
+
+ if (tabPtrP->m_disk_tuple_format == DISK_TUPLE_VAR_SIZE)
+ {
+ Uint32 sz= key->m_page_idx;
+ // Needs space for index slot.
+ ddassert(pagePtr.p->uncommitted_used_space >= sz+1);
+ Uint32 old_free = ((Var_page*)pagePtr.p)->free_space;
+ Uint32 old_idx= pagePtr.p->list_index;
+ Uint32 new_used= pagePtr.p->uncommitted_used_space -= sz+1;
+
+ key->m_page_idx= ((Var_page*)pagePtr.p)->
+ alloc_record(sz, (Var_page*)ctemp_page, 0);
+
+ Uint32 new_free = ((Var_page*)pagePtr.p)->free_space;
+ Uint32 new_idx= alloc.calc_page_free_bits(new_free-new_used);
+ Uint32 ext = pagePtr.p->m_extent_info_ptr;
+ Ptr<Extent_info> extentPtr;
+ c_extent_pool.getPtr(extentPtr, ext);
+
+ if (old_idx != new_idx)
+ {
+ jam();
+ disk_page_move_dirty_page(alloc, extentPtr, pagePtr, old_idx, new_idx);
+ }
+
+ /* Update ext's free space with the unused space
+ prealloc'd for the index slot, if any.
+ */
+ Uint32 delta = ((old_free - new_free) == sz) ? 1 : 0;
+ if (delta > 0)
+ {
+ update_extent_pos(alloc, extentPtr, delta);
+ }
+
+ lsn= disk_page_undo_alloc(pagePtr.p, key, sz, gci, logfile_group_id);
+ }
+ else
if (tabPtrP->m_attributes[DD].m_no_of_varsize == 0)
{
ddassert(pagePtr.p->uncommitted_used_space > 0);
@@ -1140,16 +1211,6 @@ Dbtup::disk_page_alloc(Signal* signal,
key->m_page_idx= ((Fix_page*)pagePtr.p)->alloc_record();
lsn= disk_page_undo_alloc(pagePtr.p, key, 1, gci, logfile_group_id);
}
- else
- {
- Uint32 sz= key->m_page_idx;
- ddassert(pagePtr.p->uncommitted_used_space >= sz);
- pagePtr.p->uncommitted_used_space -= sz;
- key->m_page_idx= ((Var_page*)pagePtr.p)->
- alloc_record(sz, (Var_page*)ctemp_page, 0);
-
- lsn= disk_page_undo_alloc(pagePtr.p, key, sz, gci, logfile_group_id);
- }
}
void
@@ -1168,6 +1229,17 @@ Dbtup::disk_page_free(Signal *signal,
Uint32 sz;
Uint64 lsn;
+ if (tabPtrP->m_disk_tuple_format == DISK_TUPLE_VAR_SIZE)
+ {
+ const Uint32 *src= ((Var_page*)pagePtr.p)->get_ptr(page_idx);
+ sz= ((Var_page*)pagePtr.p)->get_entry_len(page_idx);
+ lsn= disk_page_undo_free(pagePtr.p, key,
+ src, sz,
+ gci, logfile_group_id);
+
+ ((Var_page*)pagePtr.p)->free_record(page_idx, 0);
+ }
+ else
if (tabPtrP->m_attributes[DD].m_no_of_varsize == 0)
{
sz = 1;
@@ -1179,17 +1251,7 @@ Dbtup::disk_page_free(Signal *signal,
((Fix_page*)pagePtr.p)->free_record(page_idx);
}
- else
- {
- const Uint32 *src= ((Var_page*)pagePtr.p)->get_ptr(page_idx);
- sz= ((Var_page*)pagePtr.p)->get_entry_len(page_idx);
- lsn= disk_page_undo_free(pagePtr.p, key,
- src, sz,
- gci, logfile_group_id);
-
- ((Var_page*)pagePtr.p)->free_record(page_idx, 0);
- }
-
+
Uint32 new_free = pagePtr.p->free_space;
Uint32 ext = pagePtr.p->m_extent_info_ptr;
@@ -1212,7 +1274,15 @@ Dbtup::disk_page_free(Signal *signal,
disk_page_move_dirty_page(alloc, extentPtr, pagePtr, old_idx, new_idx);
}
- update_extent_pos(alloc, extentPtr, sz);
+ if (tabPtrP->m_disk_tuple_format == DISK_TUPLE_VAR_SIZE)
+ {
+ Uint32 delta = new_free-old_free; // sz + freed index slot, if any
+ update_extent_pos(alloc, extentPtr, delta);
+ }
+ else
+ {
+ update_extent_pos(alloc, extentPtr, sz);
+ }
#if NOT_YET_FREE_EXTENT
if (check_free(extentPtr.p) == 0)
{
@@ -1811,30 +1881,51 @@ Dbtup::disk_restart_undo_alloc(Apply_und
{
ndbassert(undo->m_page_ptr.p->m_file_no == undo->m_key.m_file_no);
ndbassert(undo->m_page_ptr.p->m_page_no == undo->m_key.m_page_no);
+
+ if (undo->m_table_ptr.p->m_disk_tuple_format == DISK_TUPLE_VAR_SIZE)
+ {
+ ((Var_page*)undo->m_page_ptr.p)->free_record(undo->m_key.m_page_idx, 0);
+ }
+ else
if (undo->m_table_ptr.p->m_attributes[DD].m_no_of_varsize == 0)
{
((Fix_page*)undo->m_page_ptr.p)->free_record(undo->m_key.m_page_idx);
}
- else
- ((Var_page*)undo->m_page_ptr.p)->free_record(undo->m_key.m_page_idx, 0);
}
void
Dbtup::disk_restart_undo_update(Apply_undo* undo)
{
Uint32* ptr;
- Uint32 len= undo->m_len - 4;
+ Uint32 len= undo->m_len - 4; //Len of the before-image- from log record.
+ Uint32 record_len= 0; // Len of the record on the disk page.
+
+ if (undo->m_table_ptr.p->m_disk_tuple_format == DISK_TUPLE_VAR_SIZE)
+ {
+ ptr= ((Var_page*)undo->m_page_ptr.p)->get_ptr(undo->m_key.m_page_idx);
+
+ // Length of the record (after-image).
+ record_len= ((Var_page*)undo->m_page_ptr.p)->
+ get_entry_len(undo->m_key.m_page_idx);
+ if (len > record_len)
+ {
+ undo->m_key.m_page_idx = grow_record_on_same_page(undo->m_page_ptr,
+ undo->m_key.m_page_idx, record_len, len);
+ ptr= ((Var_page*)undo->m_page_ptr.p)->get_ptr(undo->m_key.m_page_idx);
+ }
+ else if (len < record_len)
+ {
+ ((Var_page*)undo->m_page_ptr.p)->shrink_entry(undo->m_key.m_page_idx,
+ len);
+ }
+ }
+ else
if (undo->m_table_ptr.p->m_attributes[DD].m_no_of_varsize == 0)
{
ptr= ((Fix_page*)undo->m_page_ptr.p)->get_ptr(undo->m_key.m_page_idx, len);
ndbrequire(len == undo->m_table_ptr.p->m_offsets[DD].m_fix_header_size);
}
- else
- {
- ptr= ((Var_page*)undo->m_page_ptr.p)->get_ptr(undo->m_key.m_page_idx);
- abort();
- }
-
+
const Disk_undo::Update *update = (const Disk_undo::Update*)undo->m_ptr;
const Uint32* src= update->m_data;
memcpy(ptr, src, 4 * len);
@@ -1845,17 +1936,21 @@ Dbtup::disk_restart_undo_free(Apply_undo
{
Uint32* ptr, idx = undo->m_key.m_page_idx;
Uint32 len= undo->m_len - 4;
+
+ if (undo->m_table_ptr.p->m_disk_tuple_format == DISK_TUPLE_VAR_SIZE)
+ {
+ idx= ((Var_page*)undo->m_page_ptr.p)->
+ alloc_record(idx, len, (Var_page*)ctemp_page);
+ ptr= ((Var_page*)undo->m_page_ptr.p)->get_ptr(idx);
+ }
+ else
if (undo->m_table_ptr.p->m_attributes[DD].m_no_of_varsize == 0)
{
ndbrequire(len == undo->m_table_ptr.p->m_offsets[DD].m_fix_header_size);
idx= ((Fix_page*)undo->m_page_ptr.p)->alloc_record(idx);
ptr= ((Fix_page*)undo->m_page_ptr.p)->get_ptr(idx, len);
}
- else
- {
- abort();
- }
-
+
ndbrequire(idx == undo->m_key.m_page_idx);
const Disk_undo::Free *free = (const Disk_undo::Free*)undo->m_ptr;
const Uint32* src= free->m_data;
@@ -1993,6 +2088,146 @@ Dbtup::disk_page_get_allocated(const Tab
}
}
res[0] = cnt * alloc.m_extent_size * File_formats::NDB_PAGE_SIZE;
- res[1] = free * 4 * tabPtrP->m_offsets[DD].m_fix_header_size;
+ if (tabPtrP->m_disk_tuple_format == DISK_TUPLE_VAR_SIZE)
+ {
+ res[1] = free;
+ }
+ else
+ {
+ res[1] = free * 4 * tabPtrP->m_offsets[DD].m_fix_header_size;
+ }
+ }
+}
+
+
+/*
+ The following methods will be used when a tuple grows and the
+ disk data is varsz (m_disk_tuple_format=DISK_TUPLE_VAR_SIZE).
+*/
+
+
+/* Record has grown; prealloc if there is enough space on the page. */
+bool
+Dbtup::disk_page_prealloc_on_same_page(Signal *signal,
+ Fragrecord* fragPtrP,
+ Local_key currentKey,
+ PagePtr currentpage,
+ Uint32 addsz,
+ Uint32 page_id)
+{
+ disk_page_set_dirty(currentpage);
+ Disk_alloc_info& alloc = fragPtrP->m_disk_alloc_info;
+ Uint32 used = currentpage.p->uncommitted_used_space;
+
+ Uint32 ext = currentpage.p->m_extent_info_ptr;
+ Ptr<Extent_info> extentPtr;
+ c_extent_pool.getPtr(extentPtr, ext);
+
+ if (currentpage.p->free_space >= used + addsz)
+ {
+ disk_page_prealloc_dirty_page(alloc, currentpage,
+ currentpage.p->list_index, addsz);
+ return true;
+ }
+ return false;
+}
+
+Uint16
+Dbtup::grow_record_on_same_page(PagePtr pagePtr, Uint16 page_index,
+ Uint32 oldsz, Uint32 newsz)
+{
+ Uint32 add = newsz - oldsz;
+ Uint32 *new_var_ptr = 0;
+ Var_page* pageP = (Var_page*)pagePtr.p;
+ ndbassert(newsz);
+ ndbassert(add);
+
+ if (oldsz && pageP->free_space >= add)
+ {
+ jam();
+ new_var_ptr = pageP->get_ptr(page_index);
+ if(!pageP->is_space_behind_entry(page_index, add))
+ {
+ if(0) printf("realloc_rec_on_same_page extra reorg\n");
+ jam();
+ /*
+ In this case we need to reorganise the page to fit. To ensure we
+ don't complicate matters we make a little trick here where we
+ fool the reorg_page to avoid copying the entry at hand and copy
+ that separately at the end. This means we need to copy it out of
+ the page before reorg_page to save the entry contents.
+ */
+ Uint32* copyBuffer = cinBuffer;
+ memcpy(copyBuffer, new_var_ptr, 4*oldsz);
+ pageP->set_entry_len(page_index, 0);
+ pageP->free_space += oldsz;
+ pageP->reorg((Var_page*)ctemp_page);
+ new_var_ptr = pageP->get_free_space_ptr();
+ memcpy(new_var_ptr, copyBuffer, 4*oldsz);
+ pageP->set_entry_offset(page_index, pageP->insert_pos);
+ add += oldsz;
+ }
+ pageP->grow_entry(page_index, add);
+ }
+ return page_index;
+}
+
+/* This method is called by commit_operation in 2 cases:
+ - when a record expands and there is enough space on the current page,
+ - when the record shrinks.
+*/
+void
+Dbtup::disk_page_realloc_on_same_page(Fragrecord* fragPtrP, PagePtr pagePtr,
+ Local_key* key, Uint32 gci,
+ Uint32 oldsz, Uint32 newsz, bool grow)
+{
+ jam();
+
+ Uint32 logfile_group_id = fragPtrP->m_logfile_group_id;
+ Disk_alloc_info& alloc = fragPtrP->m_disk_alloc_info;
+
+ Uint64 lsn;
+ Uint32 old_used = pagePtr.p->uncommitted_used_space;
+ Uint32 old_free = ((Var_page*)pagePtr.p)->free_space;
+ Uint32 old_idx = alloc.calc_page_free_bits(old_free-old_used);
+
+ Uint32 new_used = old_used;
+ Uint32 new_free = 0;
+ Uint32 new_idx = 0;
+
+ if (grow)
+ {
+ ddassert(newsz > oldsz);
+ Uint32 diff = newsz-oldsz;
+ key->m_page_idx =
+ grow_record_on_same_page(pagePtr,key->m_page_idx, oldsz, newsz);
+ pagePtr.p->uncommitted_used_space = new_used -= diff;
+ }
+ else
+ {
+ // Record has shrunken.
+ ddassert(newsz < oldsz);
+ ((Var_page*)pagePtr.p)->shrink_entry(key->m_page_idx, newsz);
+ }
+
+ new_free = ((Var_page*)pagePtr.p)->free_space;
+ new_idx = alloc.calc_page_free_bits(new_free-new_used);
+
+ Ptr<Extent_info> extentPtr;
+
+ if (old_idx != new_idx || !grow)
+ {
+ Uint32 ext = pagePtr.p->m_extent_info_ptr;
+ c_extent_pool.getPtr(extentPtr, ext);
+ }
+ if (old_idx != new_idx)
+ {
+ disk_page_move_dirty_page(alloc, extentPtr, pagePtr, old_idx, new_idx);
+ }
+
+ /* If the tuple is shrunken, return the delta to the ext's free space.*/
+ if (!grow)
+ {
+ update_extent_pos(alloc, extentPtr, new_free - old_free);
}
}
=== modified file 'storage/ndb/src/kernel/blocks/dbtup/DbtupExecQuery.cpp'
--- a/storage/ndb/src/kernel/blocks/dbtup/DbtupExecQuery.cpp 2009-10-27 12:08:44 +0000
+++ b/storage/ndb/src/kernel/blocks/dbtup/DbtupExecQuery.cpp 2009-11-09 09:42:56 +0000
@@ -193,6 +193,8 @@ Dbtup::insertActiveOpList(OperationrecPt
prevOpPtr.p->op_struct.m_wait_log_buffer;
regOperPtr.p->op_struct.m_load_diskpage_on_commit=
prevOpPtr.p->op_struct.m_load_diskpage_on_commit;
+ regOperPtr.p->op_struct.m_load_diskpage_orig_on_commit =
+ prevOpPtr.p->op_struct.m_load_diskpage_orig_on_commit;
regOperPtr.p->m_undo_buffer_space= prevOpPtr.p->m_undo_buffer_space;
// start with prev mask (matters only for UPD o UPD)
@@ -200,6 +202,7 @@ Dbtup::insertActiveOpList(OperationrecPt
prevOpPtr.p->op_struct.m_wait_log_buffer= 0;
prevOpPtr.p->op_struct.m_load_diskpage_on_commit= 0;
+ prevOpPtr.p->op_struct.m_load_diskpage_orig_on_commit = 0;
if(prevOpPtr.p->op_struct.tuple_state == TUPLE_PREPARED)
{
@@ -726,9 +729,20 @@ void Dbtup::execTUPKEYREQ(Signal* signal
if (Roptype == ZUPDATE) {
jam();
- if (unlikely(handleUpdateReq(signal, regOperPtr,
- regFragPtr, regTabPtr,
- &req_struct, disk_page != RNIL) == -1))
+ int res=0;
+ if (regTabPtr->m_disk_tuple_format == DISK_TUPLE_VAR_SIZE)
+ {
+ res= handleUpdateReq_dd_var(signal, operPtr,
+ fragptr, regTabPtr, &req_struct,
+ disk_page != RNIL);
+ }
+ else
+ {
+ res= handleUpdateReq(signal, regOperPtr,
+ regFragPtr, regTabPtr, &req_struct,
+ disk_page != RNIL);
+ }
+ if (unlikely(res == -1))
{
return;
}
@@ -1113,6 +1127,8 @@ error:
Both variable-sized and fixed-size attributes are stored in the same way
in the expanded form as variable-sized attributes (in expand_var_part()).
+ This method is used for both mem and disk dynamic data.
+
dst Destination for expanded data
tabPtrP Table descriptor
src Pointer to the start of dynamic bitmap in source row
@@ -1127,8 +1143,8 @@ expand_dyn_part(Dbtup::KeyReqStruct::Var
Uint32 row_len,
const Uint32 * tabDesc,
const Uint16* order,
- Uint32 mm_dynvar,
- Uint32 mm_dynfix,
+ Uint32 dynvar,
+ Uint32 dynfix,
Uint32 max_bmlen)
{
/* Copy the bitmap, zeroing out any words not stored in the row. */
@@ -1161,11 +1177,11 @@ expand_dyn_part(Dbtup::KeyReqStruct::Var
Uint16* dst_len_ptr= dst_off_ptr + no_attr;
Uint16 this_src_off= row_len ? * src_off_ptr++ : 0;
/* We need to reserve room for the offsets written by shrink_tuple+padding.*/
- Uint16 dst_off= 4 * (max_bmlen + ((mm_dynvar+2)>>1));
+ Uint16 dst_off= 4 * (max_bmlen + ((dynvar+2)>>1));
char *dst_ptr= (char*)dst_bm_ptr + dst_off;
- for(Uint32 i= 0; i<mm_dynvar; i++)
+ for(Uint32 i= 0; i<dynvar; i++)
{
- Uint16 j= order[mm_dynfix+i];
+ Uint16 j= order[dynfix+i];
Uint32 max_len= 4 *AttributeDescriptor::getSizeInWords(tabDesc[j]);
Uint32 len;
Uint32 pos = AttributeOffset::getNullFlagPos(tabDesc[j+1]);
@@ -1199,12 +1215,12 @@ expand_dyn_part(Dbtup::KeyReqStruct::Var
dynamic part of the row. This is true both for the stored/shrunken and
for the expanded form.
*/
- for(Uint32 i= mm_dynfix; i>0; )
+ for(Uint32 i= dynfix; i>0; )
{
i--;
Uint16 j= order[i];
Uint32 fix_size= 4*AttributeDescriptor::getSizeInWords(tabDesc[j]);
- dst_off_ptr[mm_dynvar+i]= dst_off;
+ dst_off_ptr[dynvar+i]= dst_off;
/* len offset array is not used for fixed size. */
Uint32 pos = AttributeOffset::getNullFlagPos(tabDesc[j+1]);
if(bm_len > (pos >> 5) && BitmaskImpl::get(bm_len, src, pos))
@@ -1255,6 +1271,11 @@ Dbtup::prepare_initial_insert(KeyReqStru
const Uint32 mm_dynvar= regTabPtr->m_attributes[MM].m_no_of_dyn_var;
const Uint32 mm_dynfix= regTabPtr->m_attributes[MM].m_no_of_dyn_fix;
const Uint32 dd_vars= regTabPtr->m_attributes[DD].m_no_of_varsize;
+
+ const Uint32 dd_dyns= regTabPtr->m_attributes[DD].m_no_of_dynamic;
+ const Uint32 dd_dynvar= regTabPtr->m_attributes[DD].m_no_of_dyn_var;
+ const Uint32 dd_dynfix= regTabPtr->m_attributes[DD].m_no_of_dyn_fix;
+
Uint32 *ptr= req_struct->m_tuple_ptr->get_end_of_fix_part_ptr(regTabPtr);
Var_part_ref* ref = req_struct->m_tuple_ptr->get_var_part_ref_ptr(regTabPtr);
@@ -1320,6 +1341,27 @@ Dbtup::prepare_initial_insert(KeyReqStru
ndbrequire(dd_vars == 0);
+ /* Prepare empty dynamic part for disk data. */
+ if (dd_dyns &&
+ regTabPtr->m_disk_tuple_format == DISK_TUPLE_VAR_SIZE)
+ {
+
+ KeyReqStruct::Var_data* dst = &req_struct->m_var_data[DD];
+ dst->m_dyn_data_ptr=
+ (char*)((Tuple_header*)ptr)->get_end_of_fix_part_ptr_dd_with_dlen(regTabPtr);
+
+ dst->m_dyn_offset_arr_ptr = req_struct->var_pos_array +
+ 2 * mm_vars + 2 * (mm_dynvar + mm_dynfix);
+ dst->m_dyn_len_offset = dd_dynvar + dd_dynfix;
+ dst->m_max_dyn_offset = regTabPtr->m_offsets[DD].m_max_dyn_offset;
+ order += mm_dynvar + mm_dynfix;
+
+ ptr = expand_dyn_part(dst, 0, 0,
+ (Uint32*)tab_descr, order,
+ dd_dynvar, dd_dynfix,
+ regTabPtr->m_offsets[DD].m_dyn_null_words);
+ }
+
req_struct->m_tuple_ptr->m_header_bits= bits;
// Set all null bits
@@ -1467,7 +1509,14 @@ int Dbtup::handleInsertReq(Signal* signa
}
else if (regTabPtr->need_shrink())
{
- shrink_tuple(req_struct, sizes+2, regTabPtr, true);
+ if (regTabPtr->m_disk_tuple_format == DISK_TUPLE_VAR_SIZE)
+ {
+ shrink_tuple_dd_var(req_struct, sizes + 2, regTabPtr, true);
+ }
+ else
+ {
+ shrink_tuple(req_struct, sizes + 2, regTabPtr, true);
+ }
}
if (ERROR_INSERTED(4025))
@@ -1574,7 +1623,7 @@ int Dbtup::handleInsertReq(Signal* signa
goto size_change_error;
}
- if (regTabPtr->need_shrink() && cmp[0] != cmp[1] &&
+ if (regTabPtr->need_shrink() && sizes[MM] != sizes[MM+2] &&
unlikely(handle_size_change_after_update(req_struct,
base,
regOperPtr.p,
@@ -1591,8 +1640,8 @@ int Dbtup::handleInsertReq(Signal* signa
if (disk_insert)
{
Local_key tmp;
- Uint32 size= regTabPtr->m_attributes[DD].m_no_of_varsize == 0 ?
- 1 : sizes[2+DD];
+ Uint32 size= regTabPtr->m_disk_tuple_format == DISK_TUPLE_VAR_SIZE ?
+ sizes[2+DD] : 1;
if (ERROR_INSERTED(4021))
{
@@ -1600,7 +1649,17 @@ int Dbtup::handleInsertReq(Signal* signa
goto disk_prealloc_error;
}
- int ret= disk_page_prealloc(signal, fragPtr, &tmp, size);
+ int ret= 0;
+ if (regTabPtr->m_disk_tuple_format == DISK_TUPLE_VAR_SIZE)
+ {
+ ret = disk_page_prealloc(signal, fragPtr, &tmp, size + 1);
+ // +1 for index slot
+ }
+ else
+ {
+ ret = disk_page_prealloc(signal, fragPtr, &tmp, size);
+ }
+
if (unlikely(ret < 0))
{
terrorCode = -ret;
@@ -1621,6 +1680,23 @@ int Dbtup::handleInsertReq(Signal* signa
disk_ptr->m_header_bits = 0;
disk_ptr->m_base_record_ref= ref.ref();
}
+ else if (disk &&
+ regTabPtr->m_disk_tuple_format == DISK_TUPLE_VAR_SIZE &&
+ sizes[DD] != sizes[DD+2])
+ {
+ int ret;
+ if (unlikely(ret = handle_size_change_after_update_disk(signal,
+ req_struct,
+ base,
+ regOperPtr,
+ fragPtr,
+ regTabPtr,
+ sizes))!=0)
+ {
+ terrorCode = -ret;
+ goto exit_error;
+ }
+ }
if (req_struct->m_reorg)
{
@@ -1713,6 +1789,20 @@ int Dbtup::handleDeleteReq(Signal* signa
KeyReqStruct *req_struct,
bool disk)
{
+ // If tuple exists already, find its length.
+ Tuple_header* base = req_struct->m_tuple_ptr;
+ Uint32 src_tuplelen = 0;
+ if (regTabPtr->m_disk_tuple_format == DISK_TUPLE_VAR_SIZE &&
+ base->m_header_bits & Tuple_header::DISK_PART)
+ {
+ Local_key key;
+ PagePtr orgpage;
+ memcpy(&key, base->get_disk_ref_ptr(regTabPtr), sizeof(key));
+ key.m_page_no = req_struct->m_disk_page_ptr.i;
+ Uint32 *src_ptr = get_dd_ptr(&orgpage, &key, regTabPtr);
+ src_tuplelen = ((Var_page*)(orgpage.p))->get_entry_len(key.m_page_idx);
+ }
+
// delete must set but not increment tupVersion
if (!regOperPtr->is_first_operation())
{
@@ -1740,9 +1830,17 @@ int Dbtup::handleDeleteReq(Signal* signa
{
regOperPtr->op_struct.m_wait_log_buffer = 1;
regOperPtr->op_struct.m_load_diskpage_on_commit = 1;
- Uint32 sz= regOperPtr->m_undo_buffer_space=
- (sizeof(Dbtup::Disk_undo::Free) >> 2) +
- regTabPtr->m_offsets[DD].m_fix_header_size - 1;
+ Uint32 sz = 0;
+ if (regTabPtr->m_disk_tuple_format == DISK_TUPLE_VAR_SIZE)
+ {
+ sz = src_tuplelen;
+ }
+ else
+ {
+ sz = regTabPtr->m_offsets[DD].m_fix_header_size-1;
+ }
+ sz += (sizeof(Dbtup::Disk_undo::Free) >> 2 - 1);
+ regOperPtr->m_undo_buffer_space = sz;
D("Logfile_client - handleDeleteReq");
Logfile_client lgman(this, c_lgman, regFragPtr->m_logfile_group_id);
@@ -2957,31 +3055,21 @@ Dbtup::expand_tuple(KeyReqStruct* req_st
sizes[DD]= 0;
if(disk && dd_tot)
{
- const Uint16 dd_vars= tabPtrP->m_attributes[DD].m_no_of_varsize;
order+= mm_vars+mm_dynvar+mm_dynfix;
- if(bits & Tuple_header::DISK_INLINE)
+ if (tabPtrP->m_disk_tuple_format == DISK_TUPLE_VAR_SIZE)
{
- // Only on copy tuple
- ndbassert(bits & Tuple_header::COPY_TUPLE);
+ expand_tuple_dd_var(req_struct, sizes, tabPtrP, bits, disk_ref,
+ dst_ptr, src_ptr, order, desc);
}
else
{
- Local_key key;
- memcpy(&key, disk_ref, sizeof(key));
- key.m_page_no= req_struct->m_disk_page_ptr.i;
- src_ptr= get_dd_ptr(&req_struct->m_disk_page_ptr, &key, tabPtrP);
+ expand_tuple_dd_fixed(req_struct, sizes, tabPtrP, bits, disk_ref,
+ dst_ptr, src_ptr);
}
- extra_bits |= Tuple_header::DISK_INLINE;
- // Fix diskpart
- req_struct->m_disk_ptr= (Tuple_header*)dst_ptr;
- memcpy(dst_ptr, src_ptr, 4*tabPtrP->m_offsets[DD].m_fix_header_size);
- sizes[DD] = tabPtrP->m_offsets[DD].m_fix_header_size;
-
+ extra_bits |= Tuple_header::DISK_INLINE;
ndbassert(! (req_struct->m_disk_ptr->m_header_bits & Tuple_header::FREE));
-
- ndbrequire(dd_vars == 0);
}
ptr->m_header_bits= (extra_bits | Tuple_header::COPY_TUPLE);
@@ -3087,7 +3175,6 @@ Dbtup::prepare_read(KeyReqStruct* req_st
Uint16 mm_dyns= tabPtrP->m_attributes[MM].m_no_of_dynamic;
const Uint32 *src_ptr= ptr->get_end_of_fix_part_ptr(tabPtrP);
- const Uint32 *disk_ref= ptr->get_disk_ref_ptr(tabPtrP);
const Var_part_ref* var_ref = ptr->get_var_part_ref_ptr(tabPtrP);
if(mm_vars || mm_dyns)
{
@@ -3171,25 +3258,14 @@ Dbtup::prepare_read(KeyReqStruct* req_st
if(disk && dd_tot)
{
- const Uint16 dd_vars= tabPtrP->m_attributes[DD].m_no_of_varsize;
-
- if(bits & Tuple_header::DISK_INLINE)
+ if (tabPtrP->m_disk_tuple_format == DISK_TUPLE_VAR_SIZE)
{
- // Only on copy tuple
- ndbassert(bits & Tuple_header::COPY_TUPLE);
+ prepare_read_dd_var(req_struct, tabPtrP, src_ptr);
}
else
{
- // XXX
- Local_key key;
- memcpy(&key, disk_ref, sizeof(key));
- key.m_page_no= req_struct->m_disk_page_ptr.i;
- src_ptr= get_dd_ptr(&req_struct->m_disk_page_ptr, &key, tabPtrP);
+ prepare_read_dd_fixed(req_struct, tabPtrP, src_ptr);
}
- // Fix diskpart
- req_struct->m_disk_ptr= (Tuple_header*)src_ptr;
- ndbassert(! (req_struct->m_disk_ptr->m_header_bits & Tuple_header::FREE));
- ndbrequire(dd_vars == 0);
}
req_struct->is_expanded= false;
@@ -3289,7 +3365,7 @@ Dbtup::shrink_tuple(KeyReqStruct* req_st
/* ToDo: Put all of the dynattr code inside if(bm_len>0) { ... },
* split to separate function. */
Uint16 dyn_dst_data_offset= 0;
- const Uint32 *dyn_bm_var_mask_ptr= tabPtrP->dynVarSizeMask;
+ const Uint32 *dyn_bm_var_mask_ptr= tabPtrP->dynVarSizeMask[MM];
for(Uint16 i= 0; i< bm_len; i++)
{
Uint32 v= src_bm_ptr[i];
@@ -3486,7 +3562,8 @@ Dbtup::handle_size_change_after_update(K
Tablerec* regTabPtr,
Uint32 sizes[4])
{
- ndbrequire(sizes[1] == sizes[3]);
+ if (regTabPtr->m_disk_tuple_format == DISK_TUPLE_FIXED_SIZE)
+ ndbrequire(sizes[1] == sizes[3]);
//ndbout_c("%d %d %d %d", sizes[0], sizes[1], sizes[2], sizes[3]);
if(0)
printf("%p %d %d - handle_size_change_after_update ",
@@ -3967,3 +4044,1065 @@ Dbtup::nr_delete_log_buffer_callback(Sig
c_lqh->nr_delete_complete(signal, &op);
}
+
+
+/*
+ The following methods will handle disk data
+ as fixed or variable size, depending on the m_disk_tuple_format
+*/
+
+void
+Dbtup::prepare_read_dd_fixed(KeyReqStruct* req_struct,
+ Tablerec* tabPtrP, const Uint32 *src_ptr)
+{
+ Tuple_header* ptr = req_struct->m_tuple_ptr;
+ Uint32 bits = ptr->m_header_bits;
+ const Uint32 *disk_ref = ptr->get_disk_ref_ptr(tabPtrP);
+
+ const Uint16 dd_vars = tabPtrP->m_attributes[DD].m_no_of_varsize;
+
+ if(bits & Tuple_header::DISK_INLINE)
+ {
+ // Only on copy tuple
+ ndbassert(bits & Tuple_header::COPY_TUPLE);
+ }
+ else
+ {
+ Local_key key;
+ memcpy(&key, disk_ref, sizeof(key));
+ key.m_page_no = req_struct->m_disk_page_ptr.i;
+ src_ptr = get_dd_ptr(&req_struct->m_disk_page_ptr, &key, tabPtrP);
+ }
+ // Fixed diskpart
+ req_struct->m_disk_ptr = (Tuple_header*)src_ptr;
+ ndbassert(! (req_struct->m_disk_ptr->m_header_bits & Tuple_header::FREE));
+ ndbrequire(dd_vars == 0);
+}
+
+void
+Dbtup::prepare_read_dd_var(KeyReqStruct* req_struct,
+ Tablerec* tabPtrP, const Uint32 *src_ptr)
+{
+ Tuple_header* ptr = req_struct->m_tuple_ptr;
+ Uint32 bits = ptr->m_header_bits;
+ const Uint32 *disk_ref = ptr->get_disk_ref_ptr(tabPtrP);
+
+ const Uint32 *src_data = src_ptr;
+ Uint32 src_dynlen = 0;
+ Uint32 src_fixedlen = 0;
+
+ if (bits & Tuple_header::DISK_INLINE)
+ {
+ // Only on copy tuple
+ ndbassert(bits & Tuple_header::COPY_TUPLE);
+ src_data = ((Tuple_header*)src_ptr)->get_end_of_fix_part_ptr_dd(tabPtrP);
+ src_fixedlen = src_data - src_ptr;
+ Varpart_copy *dp = (Varpart_copy*)(src_data);
+ src_dynlen = dp->m_len;
+ src_data = dp->m_data;
+ }
+ else
+ {
+ Local_key key;
+ memcpy(&key, disk_ref, sizeof(key));
+ key.m_page_no = req_struct->m_disk_page_ptr.i;
+ src_ptr = get_dd_ptr(&req_struct->m_disk_page_ptr, &key, tabPtrP);
+ Uint32 src_tuplelen =
+ ((Var_page*)(req_struct->m_disk_page_ptr.p))->get_entry_len(key.m_page_idx);
+
+ src_data = ((Tuple_header*)src_ptr)->get_end_of_fix_part_ptr_dd(tabPtrP);
+ src_fixedlen = ((Tuple_header*)src_ptr)->get_fix_size_dd(tabPtrP);
+ src_dynlen = src_tuplelen - src_fixedlen;
+ }
+ // Dyn diskpart
+ req_struct->m_disk_ptr = (Tuple_header*)src_ptr;
+ ndbassert(! (req_struct->m_disk_ptr->m_header_bits & Tuple_header::FREE));
+
+ req_struct->m_var_data[DD].m_dyn_part_len = src_dynlen;
+ req_struct->m_var_data[DD].m_dyn_data_ptr = (char*)(src_data);
+}
+
+void
+Dbtup::expand_tuple_dd_fixed(KeyReqStruct *req_struct, Uint32 sizes[2],
+ const Tablerec *tabPtrP, Uint32 bits,
+ const Uint32 *disk_ref, Uint32 *dst_ptr,
+ const Uint32 *src_ptr)
+{
+ const Uint16 dd_vars = tabPtrP->m_attributes[DD].m_no_of_varsize;
+
+ if(bits & Tuple_header::DISK_INLINE)
+ {
+ // Only on copy tuple
+ ndbassert(bits & Tuple_header::COPY_TUPLE);
+ }
+ else
+ {
+ Local_key key;
+ memcpy(&key, disk_ref, sizeof(key));
+ key.m_page_no = req_struct->m_disk_page_ptr.i;
+ src_ptr = get_dd_ptr(&req_struct->m_disk_page_ptr, &key, tabPtrP);
+ }
+
+ // Fix diskpart
+ req_struct->m_disk_ptr = (Tuple_header*)dst_ptr;
+ memcpy(dst_ptr, src_ptr, 4*tabPtrP->m_offsets[DD].m_fix_header_size);
+ sizes[DD] = tabPtrP->m_offsets[DD].m_fix_header_size;
+
+ ndbrequire(dd_vars == 0);
+}
+
+void
+Dbtup::expand_tuple_dd_var(KeyReqStruct *req_struct, Uint32 sizes[2],
+ const Tablerec *tabPtrP, Uint32 bits,
+ const Uint32 *disk_ref, Uint32 *dst_ptr,
+ const Uint32 *src_ptr, const Uint16 *order,
+ const Uint32 *desc)
+{
+ const Uint16 dd_dynvar = tabPtrP->m_attributes[DD].m_no_of_dyn_var;
+ const Uint16 dd_dynfix = tabPtrP->m_attributes[DD].m_no_of_dyn_fix;
+ Uint16 mm_dynvar = tabPtrP->m_attributes[MM].m_no_of_dyn_var;
+ Uint16 mm_vars = tabPtrP->m_attributes[MM].m_no_of_varsize;
+ Uint16 mm_dynfix = tabPtrP->m_attributes[MM].m_no_of_dyn_fix;
+ KeyReqStruct::Var_data* dst = &req_struct->m_var_data[DD];
+
+ req_struct->m_disk_ptr = (Tuple_header*)dst_ptr;
+
+ Uint32 src_dynlen = 0;
+ Varpart_copy* dp;
+ Uint32 src_tuplelen = 0;
+
+ if (bits & Tuple_header::DISK_INLINE)
+ {
+ /* Only on copy tuple. */
+ ndbassert(bits & Tuple_header::COPY_TUPLE);
+ /* Copy tuple has dyn len field after fixed data. */
+ Uint32* end_fix_part_ptr =
+ ((Tuple_header*)src_ptr)->get_end_of_fix_part_ptr_dd(tabPtrP);
+ dp = (Varpart_copy*)(end_fix_part_ptr);
+ src_dynlen = dp->m_len;
+
+ Uint32 disk_fix_header_len =
+ ((Tuple_header*)src_ptr)->get_fix_size_dd_with_dlen(tabPtrP);
+
+ /* Dynamic length takes 1 word. */
+ src_tuplelen = disk_fix_header_len + src_dynlen - 1;
+ memcpy(dst_ptr, src_ptr, 4*disk_fix_header_len);
+ src_ptr += disk_fix_header_len;
+ dst_ptr += disk_fix_header_len;
+ }
+ else
+ {
+ Local_key key;
+ memcpy(&key, disk_ref, sizeof(key));
+ key.m_page_no = req_struct->m_disk_page_ptr.i;
+ src_ptr = get_dd_ptr(&req_struct->m_disk_page_ptr, &key, tabPtrP);
+
+ // stored tuple does not have dyn len field after fixed data
+ src_tuplelen= ((Var_page*)(req_struct->m_disk_page_ptr.p))->
+ get_entry_len(key.m_page_idx);
+
+ Uint32 src_disk_fix_header_len =
+ ((Tuple_header*)src_ptr)->get_fix_size_dd(tabPtrP);
+ src_dynlen = src_tuplelen - src_disk_fix_header_len;
+
+ memcpy(dst_ptr, src_ptr, 4*src_disk_fix_header_len);
+ src_ptr += src_disk_fix_header_len;
+ dst_ptr += src_disk_fix_header_len;
+
+ dp = (Varpart_copy*)(dst_ptr);
+
+ /* copy tuple keeps the length of the dynamic part. */
+ dp->m_len = src_dynlen;
+ dst_ptr = dp->m_data;
+ }
+
+ /* Dyn diskpart - no space for dyn len field in sizes[DD]. */
+ sizes[DD] = src_tuplelen;
+ dst->m_dyn_offset_arr_ptr = req_struct->var_pos_array +
+ 2 * mm_vars + 2 * (mm_dynvar + mm_dynfix);
+ dst->m_dyn_len_offset = dd_dynvar+dd_dynfix;
+ dst->m_max_dyn_offset = tabPtrP->m_offsets[DD].m_max_dyn_offset;
+ dst->m_dyn_data_ptr = (char*)dst_ptr;
+ dst->m_dyn_part_len = dp->m_len;
+
+ dst_ptr = expand_dyn_part(dst, src_ptr, dst->m_dyn_part_len,
+ (Uint32*)desc, order,
+ dd_dynvar, dd_dynfix,
+ tabPtrP->m_offsets[DD].m_dyn_null_words);
+}
+
+static
+Uint32*
+shrink_dyn_part(Dbtup::KeyReqStruct::Var_data *dst,
+ Uint32 *dst_ptr,
+ const Dbtup::Tablerec* tabPtrP,
+ const Uint32 * tabDesc,
+ const Uint16* order,
+ Uint32 dynvar,
+ Uint32 dynfix,
+ Uint32 ind)
+{
+ /*
+ Now build the dynamic part, if any.
+ First look for any trailing all-NULL words of the bitmap; we do
+ not need to store those.
+ */
+ assert((UintPtr(dst->m_dyn_data_ptr)&3) == 0);
+ char *dyn_src_ptr = dst->m_dyn_data_ptr;
+ Uint32 bm_len = tabPtrP->m_offsets[ind].m_dyn_null_words; // In words
+
+ /* If no dynamic variables, store nothing. */
+ assert(bm_len);
+ {
+ Uint32 *bm_ptr = (Uint32 *)dyn_src_ptr + bm_len - 1;
+ while(*bm_ptr == 0)
+ {
+ bm_ptr--;
+ bm_len--;
+ if(bm_len == 0)
+ break;
+ }
+ }
+
+ if (bm_len)
+ {
+ /**
+ * Copy the bitmap, counting the number of variable sized
+ * attributes that are not NULL on the way.
+ */
+ Uint32 *dyn_dst_ptr = dst_ptr;
+ Uint32 dyn_var_count = 0;
+ const Uint32 *src_bm_ptr = (Uint32 *)(dyn_src_ptr);
+ Uint32 *dst_bm_ptr = (Uint32 *)dyn_dst_ptr;
+
+ /* ToDo: Put all of the dynattr code inside if(bm_len>0) { ... },
+ * split to separate function. */
+ Uint16 dyn_dst_data_offset = 0;
+ const Uint32 *dyn_bm_var_mask_ptr = tabPtrP->dynVarSizeMask[ind];
+ for(Uint16 i = 0; i < bm_len; i++)
+ {
+ Uint32 v = src_bm_ptr[i];
+ dyn_var_count += BitmaskImpl::count_bits(v & *dyn_bm_var_mask_ptr++);
+ dst_bm_ptr[i] = v;
+ }
+
+ Uint32 tmp = *dyn_dst_ptr;
+ assert(bm_len <= Dbtup::DYN_BM_LEN_MASK);
+ * dyn_dst_ptr = (tmp & ~(Uint32)Dbtup::DYN_BM_LEN_MASK) | bm_len;
+ dyn_dst_ptr += bm_len;
+ dyn_dst_data_offset = 2*dyn_var_count + 2;
+
+ Uint16 *dyn_src_off_array = dst->m_dyn_offset_arr_ptr;
+ Uint16 *dyn_src_lenoff_array =
+ dyn_src_off_array + dst->m_dyn_len_offset;
+ Uint16* dyn_dst_off_array = (Uint16*)dyn_dst_ptr;
+
+ /*
+ Copy over the variable sized not-NULL attributes.
+ Data offsets are counted from the start of the offset array, and
+ we store one additional offset to be able to easily compute the
+ data length as the difference between offsets.
+ */
+ Uint16 off_idx = 0;
+ for(Uint32 i = 0; i < dynvar; i++)
+ {
+ /*
+ Note that we must use the destination (shrunken) bitmap here,
+ as the source (expanded) bitmap may have been already clobbered
+ (by offset data).
+ */
+ Uint32 attrDesc2 = tabDesc[order[dynfix+i] + 1];
+ Uint32 pos = AttributeOffset::getNullFlagPos(attrDesc2);
+ if (bm_len > (pos >> 5) && BitmaskImpl::get(bm_len, dst_bm_ptr, pos))
+ {
+ dyn_dst_off_array[off_idx++] = dyn_dst_data_offset;
+ Uint32 dyn_src_off = dyn_src_off_array[i];
+ Uint32 dyn_len = dyn_src_lenoff_array[i] - dyn_src_off;
+ memmove(((char *)dyn_dst_ptr) + dyn_dst_data_offset,
+ dyn_src_ptr + dyn_src_off,
+ dyn_len);
+ dyn_dst_data_offset += dyn_len;
+ }
+ }
+ /* If all dynamic attributes are NULL, we store nothing. */
+ dyn_dst_off_array[off_idx] = dyn_dst_data_offset;
+ assert(dyn_dst_off_array + off_idx == (Uint16*)dyn_dst_ptr+dyn_var_count);
+
+ char *dynvar_end_ptr = ((char *)dyn_dst_ptr) + dyn_dst_data_offset;
+ char *dyn_dst_data_ptr = (char *)(ALIGN_WORD(dynvar_end_ptr));
+
+ /*
+ Zero out any padding bytes. Might not be strictly necessary,
+ but seems cleaner than leaving random stuff in there.
+ */
+ bzero(dynvar_end_ptr, dyn_dst_data_ptr-dynvar_end_ptr);
+
+ /*
+ Copy over the fixed-sized not-NULL attributes.
+ Note that attributes are copied in reverse order; this is to avoid
+ overwriting not-yet-copied data, as the data is also stored in
+ reverse order.
+ */
+ for(Uint32 i = dynfix; i > 0; )
+ {
+ i--;
+ Uint16 j = order[i];
+ Uint32 attrDesc2 = tabDesc[j+1];
+ Uint32 pos = AttributeOffset::getNullFlagPos(attrDesc2);
+ if(bm_len > (pos >>5 ) && BitmaskImpl::get(bm_len, dst_bm_ptr, pos))
+ {
+ Uint32 fixsize =
+ 4*AttributeDescriptor::getSizeInWords(tabDesc[j]);
+ memmove(dyn_dst_data_ptr,
+ dyn_src_ptr + dyn_src_off_array[dynvar+i],
+ fixsize);
+ dyn_dst_data_ptr += fixsize;
+ }
+ }
+ dst_ptr = (Uint32*)dyn_dst_data_ptr;
+ assert((UintPtr(dst_ptr) & 3) == 0);
+ }
+ return (Uint32 *)dst_ptr;
+}
+
+void
+Dbtup::shrink_tuple_dd_var(KeyReqStruct* req_struct, Uint32 sizes[2],
+ const Tablerec* tabPtrP, bool disk)
+{
+ ndbassert(tabPtrP->need_shrink());
+ Tuple_header* ptr = req_struct->m_tuple_ptr;
+ ndbassert(ptr->m_header_bits & Tuple_header::COPY_TUPLE);
+ KeyReqStruct::Var_data* dst = &req_struct->m_var_data[MM];
+ Uint32 order_desc = tabPtrP->m_real_order_descriptor;
+ const Uint32 * tabDesc = (Uint32*)req_struct->attr_descr;
+ const Uint16 *order = (Uint16*)(&tableDescriptor[order_desc]);
+ Uint16 dd_tot = tabPtrP->m_no_of_disk_attributes;
+ Uint16 mm_fix = tabPtrP->m_attributes[MM].m_no_of_fixsize;
+ Uint16 mm_vars = tabPtrP->m_attributes[MM].m_no_of_varsize;
+ Uint16 mm_dyns = tabPtrP->m_attributes[MM].m_no_of_dynamic;
+ Uint16 mm_dynvar = tabPtrP->m_attributes[MM].m_no_of_dyn_var;
+ Uint16 mm_dynfix = tabPtrP->m_attributes[MM].m_no_of_dyn_fix;
+
+ Uint32 *dst_ptr = ptr->get_end_of_fix_part_ptr(tabPtrP);
+ Uint16* src_off_ptr = req_struct->var_pos_array;
+ order += mm_fix;
+
+ sizes[MM] = 1;
+ sizes[DD] = 0;
+ if(mm_vars || mm_dyns)
+ {
+ Varpart_copy* vp = (Varpart_copy*)dst_ptr;
+ Uint32* varstart = dst_ptr = vp->m_data;
+
+ if (mm_vars)
+ {
+ Uint16* dst_off_ptr = (Uint16*)dst_ptr;
+ char* dst_data_ptr = (char*)(dst_off_ptr + mm_vars + 1);
+ char* src_data_ptr = dst_data_ptr;
+ Uint32 off = 0;
+ for(Uint32 i = 0; i < mm_vars; i++)
+ {
+ const char* data_ptr = src_data_ptr + *src_off_ptr;
+ Uint32 len = src_off_ptr[mm_vars] - *src_off_ptr;
+ * dst_off_ptr++ = off;
+ memmove(dst_data_ptr, data_ptr, len);
+ off += len;
+ src_off_ptr++;
+ dst_data_ptr += len;
+ }
+ *dst_off_ptr = off;
+ dst_ptr = ALIGN_WORD(dst_data_ptr);
+ order += mm_vars;
+ }
+
+ if (mm_dyns)
+ {
+ dst_ptr = shrink_dyn_part(dst, dst_ptr, tabPtrP, tabDesc,
+ order, mm_dynvar, mm_dynfix, MM);
+ order += mm_dynfix + mm_dynvar;
+ }
+
+ Uint32 varpart_len = dst_ptr - varstart;
+ vp->m_len = varpart_len;
+ sizes[MM] = varpart_len;
+ ptr->m_header_bits |= (varpart_len) ? Tuple_header::VAR_PART : 0;
+
+ ndbassert((UintPtr(ptr) & 3) == 0);
+ ndbassert(varpart_len < 0x10000);
+ }
+
+ if(disk && dd_tot)
+ {
+ Uint32 * src_ptr = (Uint32*)req_struct->m_disk_ptr;
+ req_struct->m_disk_ptr = (Tuple_header*)dst_ptr;
+
+ /* Copy dd fix diskpart: header + nullbits + GCI + dynlen. */
+ Uint32 disk_fix_header_len =
+ ((Tuple_header*)src_ptr)->get_fix_size_dd_with_dlen(tabPtrP);
+ Uint32* end_fix_part_ptr =
+ ((Tuple_header*)dst_ptr)->get_end_of_fix_part_ptr_dd(tabPtrP);
+
+ Varpart_copy* dp = (Varpart_copy*)(end_fix_part_ptr);
+
+ memmove(dst_ptr, src_ptr, 4 * disk_fix_header_len);
+ src_ptr += disk_fix_header_len;
+ Uint32 *diskdynstartptr= dst_ptr = dp->m_data;
+
+ /* Copy dynamic disk part. */
+ dst = &req_struct->m_var_data[DD];
+ Uint16 dd_dynvar = tabPtrP->m_attributes[DD].m_no_of_dyn_var;
+ Uint16 dd_dynfix = tabPtrP->m_attributes[DD].m_no_of_dyn_fix;
+
+ dst->m_dyn_data_ptr = (char*)src_ptr;
+
+ dst_ptr = shrink_dyn_part(dst, dst_ptr, tabPtrP, tabDesc,
+ order, dd_dynvar, dd_dynfix, DD);
+
+ Uint32 dynpart_len = dst_ptr - diskdynstartptr;
+ dst->m_dyn_part_len = dynpart_len;
+ dp->m_len = dynpart_len;
+
+ // -1: to remove the space for dyn len
+ sizes[DD] = dynpart_len + disk_fix_header_len - 1;
+ }
+
+ req_struct->is_expanded = false;
+}
+
+/*
+ Dbtup::handle_size_change_after_update_disk(..) handles the
+ size changes when a disk-tuple is updated.
+
+ Terminology:
+ Initial_insert: inserted first time withing this transaction
+ Original_tuple = stored tuple
+ Originally_alloced = size of the stored tuple
+ Previously_alloced = size of the copy tuple after ini_insert
+ or a stored tuple is updated by a previous operation
+ delta: difference between prev/ori_alloc'd size and the current sz
+ final_delta: aggregated deltas
+
+ Use cases:
+
+ A) Initial Insert cases:
+
+ DISK_SHRINK case:
+ 1)
+ prev_alloc'd tuple : ----------------
+ tuple after update : ----------
+
+ Abort_prealloc delta
+ At commit, the real sz will be alloc'd
+
+ 2) Tuple growth will lead to DISK_MOVE
+ (This limitation will be removed later)
+ prev_alloc'd tuple : ----------------
+ abort_prealloc this, order a new prealloc to fullsz (total_rec_sz)
+ tuple after update : --------------------------
+
+ At commit, final delta will be abort_prealloc'd
+ and the real sz will be alloc'd
+ At abort or DELETE of the tuple as the last operation, abort the prealloc
+
+ B) Not the initial insert cases:
+
+ DISK_SHRINK cases:
+
+ At commit, final-delta will be freed (disk_free)
+ and the real sz will be realloc'd
+
+ 1) simple cases:
+ ori_alloc'd tuple : ----------------
+ tuple after update : ----------
+
+ 2) SHRINK and then SHRINK
+ ori_alloc'd tuple : ----------------
+ more updates on the tuple
+ tuple after prev update : -------------
+ tuple after current upd : --------
+
+ 3) GROW and then SHRINK to a size smaller than the ori_alloc'd sz
+ ori_alloc'd tuple : ----------------
+ more updates on the tuple
+ tuple after prev update : -----------------------
+ tuple after current upd : -----------
+
+ DISK_GROW cases:
+
+ deltas will be adjusted with prealloc or abort_prealloc here
+ At commit, the real size will be realloc'd
+
+ 1) simple case 1:
+ ori_alloc'd tuple : ----------------
+ tup after cur update : -------------------- (found space on the page)
+
+ 2) GROW and then GROW
+ ori_alloc'd tuple : ----------------
+ more updates on the tuple
+ tuple after prev update : ---------------------
+ tuple after current upd : --------------------------
+
+ 3) GROW and then shrink to a sz still larger than ori_alloc'd size
+ ori_alloc'd : ----------------
+ more updates on the tuple
+ tuple after prev upd : ------------------------------
+ after current update : ------------------------
+
+ 4) SHRINK and then GROW
+ ori_alloc'd tuple : ----------------
+ more updates on the tuple
+ tuple after prev update : -----------
+ tuple after current upd : -----------------------
+
+ DISK_MOVE cases:
+
+ ori_alloc'd : ----------------
+ after updat : -------------------- (not enough space on the page)
+
+ No_change cases:
+
+ 1) simple case:
+ ori_alloc'd tuple : ----------------
+ tuple after update : ----------------
+
+ 2) GROWn and then to ori_alloc'd size
+ ori_alloc'd tuple : ----------------
+ more updates on the tuple
+ tuple after prev update : -----------------------
+ tuple after current upd : ----------------
+
+ 3) SHRINK and then to ori_alloc'd size
+ ori_alloc'd tuple : ----------------
+ more updates on the tuple ..
+ tuple after prev update : ---------
+ tuple after current upd : ----------------
+*/
+
+int
+Dbtup::handle_size_change_after_update_disk(Signal* signal,
+ KeyReqStruct* req_struct,
+ Tuple_header* org,
+ Ptr<Operationrec> regOperRecPtr,
+ Ptr<Fragrecord> fragPtr,
+ Tablerec* regTabPtr,
+ Uint32 sizes[4])
+{
+ jam();
+ Operationrec* regOperPtr = regOperRecPtr.p;
+
+ if (0)
+ {
+ ndbout_c("\n scaud start: sizes - %d %d %d %d",
+ sizes[0], sizes[1], sizes[2], sizes[3]);
+ printf("%p %d %d\n",
+ req_struct->m_tuple_ptr,
+ regOperPtr->m_tuple_location.m_page_no,
+ regOperPtr->m_tuple_location.m_page_idx);
+ }
+
+ ndbassert(regTabPtr->m_no_of_disk_attributes);
+ Uint32 needed = sizes[2+DD];
+ ndbassert(needed <= regTabPtr->total_rec_size);
+
+ Tuple_header* copy = req_struct->m_tuple_ptr;
+ Uint32 copy_bits = copy->m_header_bits;
+ ndbassert(copy_bits & Tuple_header::COPY_TUPLE);
+ Local_key copykey;
+ memcpy(©key, copy->get_disk_ref_ptr(regTabPtr), sizeof(Local_key));
+
+ if (copy_bits & Tuple_header::DISK_MOVE)
+ {
+ /* Tuple is already moved to another page with total_rec_size,
+ so must have enough space to all the consecutive 'grow's.
+ */
+ if (0)
+ ndbout_c("Tuple is already moved. pgidx %u, returning..",
+ copykey.m_page_idx);
+ return 0;
+ }
+
+ Uint32 org_bits = org->m_header_bits;
+
+ /* The size of the stored tuple. */
+ Uint32 originally_alloced = 0;
+ /* The copy tuple size allocated by the previous operation. */
+ Uint32 previously_alloced = 0;
+
+ PagePtr orgpage;
+ Local_key orgkey;
+
+ bool initial_insert = false;
+ if (copy_bits & Tuple_header::DISK_ALLOC)
+ {
+ initial_insert = true;
+ previously_alloced = copykey.m_page_idx;
+ // if (0) ndbout_c("initialinsert cp.pgidx %u", copykey.m_page_idx);
+
+ /* Ini_ins: copykey.m_page_idx will reflect the first time allocated size
+ Any shrink should be found from the realsz of the copy tuple at commit
+ */
+ if (needed <= previously_alloced)
+ {
+ if (0)
+ ndbout_c("initial insert and equal or shrink pgidx %u",
+ copykey.m_page_idx);
+ return 0;
+ }
+ if (0) ndbout_c("initial insert and grow");
+ // implement later: wait for disk_page and expand on it if possible
+ Page_cache_client::Request req;
+ memcpy(&req.m_page, ©key, sizeof(Local_key));
+
+ /* +1 for the extra word prealloc'd for the index slot. */
+ disk_page_abort_prealloc(signal, fragPtr.p,
+ &req.m_page, req.m_page.m_page_idx+1);
+ goto prealloc_full_sz;
+ }
+ else
+ {
+ /* Not initial_insert:
+ Originally_alloced tuple and its size will be kept intact.
+ */
+ ndbassert(org_bits & Tuple_header::DISK_PART);
+
+ memcpy(&orgkey, org->get_disk_ref_ptr(regTabPtr), sizeof(orgkey));
+ Uint32 tmp_pg_no = orgkey.m_page_no;
+ orgkey.m_page_no = req_struct->m_disk_page_ptr.i;
+ Uint32 *org_ptr = get_dd_ptr(&orgpage, &orgkey, regTabPtr);
+ originally_alloced =
+ ((Var_page*)orgpage.p)->get_entry_len(orgkey.m_page_idx);
+ orgkey.m_page_no = tmp_pg_no;
+ previously_alloced = originally_alloced;
+ if (copy_bits & Tuple_header::DISK_GROW)
+ previously_alloced += copykey.m_page_idx;
+
+ Uint32 reservesz = 0;
+ Uint32 unreservesz = 0;
+
+ if (!(copy_bits & Tuple_header::DISK_GROW))
+ {
+ // Not grown before.
+ if (needed <= originally_alloced)
+ {
+ /* Don't shrink the original tuple before commit,
+ in case transaction aborts
+ */
+ if (0) ndbout_c("!initial insert, needed <= ori_allocd sz");
+ if (needed < originally_alloced)
+ {
+ copy->m_header_bits |= Tuple_header::DISK_SHRINK;
+ }
+ else
+ {
+ // needed = originally_alloced, remove any flags
+ copy->m_header_bits &= ~(Uint32)Tuple_header::DISK_SHRINK;
+ copy->m_header_bits &= ~(Uint32)Tuple_header::DISK_GROW;
+ }
+ copykey.m_page_idx = orgkey.m_page_idx;
+ return 0;
+ }
+ else
+ {
+ // First time grow, needed > originally_alloced
+
+ // Remove any previous shrink
+ copy->m_header_bits &= ~(Uint32)Tuple_header::DISK_SHRINK;
+
+ reservesz = needed - originally_alloced;
+ copykey.m_page_idx = reservesz;
+
+ if (originally_alloced + copykey.m_page_idx != sizes[DD+2])
+ {
+ ndbout_c("OBS !ini_ins First time grow:");
+ ndbout_c("needed %u > ori_allocd sz %u, reservesz %u sizes[DD] %u",
+ needed, originally_alloced, reservesz, sizes[DD]);
+ }
+ ndbassert(originally_alloced + copykey.m_page_idx == sizes[DD+2]);
+ if (0) ndbout_c("First time grow.");
+ }
+ }
+ else
+ {
+ /* Tuple has grown before. There was a previous prealloc. */
+ if (needed <= originally_alloced)
+ {
+ // Unreserve any previous realloc.
+ unreservesz = previously_alloced - originally_alloced;
+ copykey.m_page_idx = orgkey.m_page_idx;
+
+ // Remove any previous flag.
+ copy->m_header_bits &= ~(Uint32)Tuple_header::DISK_GROW;
+
+ if (needed < originally_alloced)
+ {
+ copy->m_header_bits |= Tuple_header::DISK_SHRINK;
+ }
+ if (0)
+ {
+ ndbout_c("!ini_ins: needed %u <= ori_alloced && prev realloc",
+ needed);
+ ndbout_c("unresrv %u pg_idx %u ori_alloced %u",
+ unreservesz, copykey.m_page_idx, originally_alloced);
+ ndbout_c(" prev_alloced %u", previously_alloced);
+ }
+ }
+ else
+ {
+ // needed > originally_alloced
+ if (needed == previously_alloced)
+ {
+ if (0)
+ {
+ /* Riding on an allocation made by the operation
+ prior to ZDELETE.
+ */
+ ndbout_c("!initial insert, needed == prev_alloced %u pgidx %u",
+ needed, copykey.m_page_idx);
+ Operationrec* prevOp= req_struct->prevOpPtr.p;
+
+ if (prevOp->op_struct.op_type == ZDELETE &&
+ regOperPtr->op_struct.op_type == ZINSERT)
+ ndbout_c("insert after delete");
+ }
+ return 0;
+ }
+
+ if (needed < previously_alloced)
+ {
+ // Release any extra space preallocated.
+ unreservesz = previously_alloced - needed;
+ copykey.m_page_idx = needed - originally_alloced;
+ if (0)
+ {
+ ndbout_c("!ini_ins: shrink from prev_all");
+ ndbout_c("needed %u < previously_alloced %u unres %u",
+ needed, previously_alloced, unreservesz);
+ }
+ }
+ else if (needed > previously_alloced)
+ {
+ reservesz = needed - previously_alloced; // delta
+ copykey.m_page_idx = needed - originally_alloced;
+ if (0)
+ {
+ ndbout_c("!ini_ins: grow from prev_all:");
+ ndbout_c("needed %u > prev_alloced %u reservsz %u pg_idx %u",
+ needed, previously_alloced, reservesz,
+ copykey.m_page_idx);
+ }
+ }
+ }
+ }
+
+ // Unreserve any extra reserved space.
+ if (unreservesz != 0)
+ {
+ if (0)
+ {
+ ndbout_c("scaud unreserving abrt preal1 [%u %u]",
+ copykey.m_file_no, copykey.m_page_no);
+ ndbout_c("unresrvsz %u cp pg_idx %u unsetting preald",
+ unreservesz, copykey.m_page_idx);
+ }
+ disk_page_abort_prealloc_callback_1(signal, fragPtr.p, orgpage,
+ unreservesz);
+ memcpy(copy->get_disk_ref_ptr(regTabPtr), ©key, sizeof(Local_key));
+ if (reservesz == 0)
+ {
+ if (0)
+ ndbout_c("no new reservn pgidx %u, unsetting preald",
+ copykey.m_page_idx);
+
+ // Remove any previous flags.
+ copy->m_header_bits &= ~(Uint32)Tuple_header::DISK_GROW;
+
+ return 0;
+ }
+ else
+ {
+ ndbout_c("OBS! unresrv and resrv at the same time");
+ ndbassert(0);
+ }
+ }
+
+ if (reservesz &&
+ disk_page_prealloc_on_same_page(signal, fragptr.p, orgkey,
+ orgpage, reservesz,
+ req_struct->m_disk_page_ptr.i))
+ {
+ if (0)
+ {
+ ndbout_c("Found space in orgpage [%u %u]",
+ (orgpage.p)->m_file_no, (orgpage.p)->m_page_no);
+ ndbout_c("copykey [%u %u] reservesz %u pg_idx %u orgpgidx %u",
+ copykey.m_file_no, copykey.m_page_no,reservesz,
+ copykey.m_page_idx, orgkey.m_page_idx);
+ }
+ regOperPtr->op_struct.m_disk_preallocated = 1;
+ regOperPtr->op_struct.m_load_diskpage_on_commit = 1;
+ regOperPtr->op_struct.m_wait_log_buffer = 1;
+
+ copy->m_header_bits |= Tuple_header::DISK_GROW;
+ memcpy(copy->get_disk_ref_ptr(regTabPtr), ©key, sizeof(Local_key));
+
+ if (originally_alloced + copykey.m_page_idx != sizes[DD+2])
+ {
+ ndbout_c("!initial insert, grow:");
+ ndbout_c("needed %u > ori_allocd sz %u + pg_idx %u reservesz %u",
+ needed, originally_alloced, copykey.m_page_idx, reservesz);
+ }
+ ndbassert(originally_alloced + copykey.m_page_idx == sizes[DD+2]);
+ return 0;
+ }
+ else
+ {
+ if (0)
+ ndbout_c("No space in cur page [%u %u] reservesz %u pg_idx %u",
+ copykey.m_file_no, copykey.m_file_no,reservesz,
+ copykey.m_page_idx);
+ }
+ } // end: ! initial_insert
+
+ prealloc_full_sz:
+
+ if (!initial_insert)
+ {
+ Uint32 logrecsz = sizeof(Dbtup::Disk_undo::Alloc) >> 2;
+ Logfile_client lgman(this, c_lgman, fragptr.p->m_logfile_group_id);
+ int res = lgman.alloc_log_space(logrecsz);
+ if (unlikely(res < 0))
+ {
+ regOperPtr->m_undo_buffer_space= 0;
+ return res;
+ }
+ /* Moving existing tuple will be logged as delete-old and insert-new. */
+ regOperPtr->m_undo_buffer_space += logrecsz;
+ }
+ /* else, i.e. initial insert case, regOperPtr->m_undo_buffer_space is
+ already set in prepare_initial_insert.
+ */
+
+ Local_key tmp;
+ // +1 for index slot
+ int ret= disk_page_prealloc(signal, fragptr, &tmp,
+ regTabPtr->total_rec_size + 1);
+ if (unlikely(ret < 0))
+ {
+ regOperPtr->m_undo_buffer_space= 0;
+ return ret;
+ }
+
+ if (0)
+ ndbout_c("scaud preallocs/MOVE page[%u %u] totalsz %u",
+ tmp.m_file_no, tmp.m_page_no, regTabPtr->total_rec_size);
+
+ if (!initial_insert &&
+ tmp.m_file_no == orgkey.m_file_no &&
+ tmp.m_page_no == orgkey.m_page_no)
+ {
+ ndbout_c("OBS MV: orig key [%u %u] mvd key [%u %u] totalsz %u",
+ orgkey.m_file_no, orgkey.m_page_no, tmp.m_file_no,
+ tmp.m_page_no, regTabPtr->total_rec_size);
+ ndbassert(0);
+ }
+
+ if (!initial_insert)
+ {
+ copy->m_header_bits |= Tuple_header::DISK_MOVE;
+ regOperPtr->op_struct.m_load_diskpage_orig_on_commit = 1;
+ }
+ regOperPtr->op_struct.m_disk_preallocated = 1;
+
+ tmp.m_page_idx = regTabPtr->total_rec_size;
+ memcpy(copy->get_disk_ref_ptr(regTabPtr), &tmp, sizeof(tmp));
+ /*
+ Set ref from disk to mm
+ */
+ Local_key ref = regOperPtr->m_tuple_location;
+ ref.m_page_no = req_struct->frag_page_id;
+
+ Tuple_header* disk_ptr = req_struct->m_disk_ptr;
+ disk_ptr->m_header_bits = 0;
+ disk_ptr->m_base_record_ref = ref.ref();
+
+ return 0;
+}
+
+/* UPDATE when the disk tuple is of format varsz */
+
+int Dbtup::handleUpdateReq_dd_var(Signal* signal,
+ Ptr<Operationrec> regOperRecPtr,
+ Ptr<Fragrecord> fragPtr,
+ Tablerec* regTabPtr,
+ KeyReqStruct* req_struct,
+ bool disk)
+{
+
+ Operationrec* operPtrP = regOperRecPtr.p;
+ Fragrecord* regFragPtr = fragptr.p;
+ Tuple_header *dst;
+ Tuple_header *base = req_struct->m_tuple_ptr, *org;
+ Uint32 * change_mask_ptr;
+ Uint32 src_tuplelen = 0;
+
+ if (disk && base->m_header_bits & Tuple_header::DISK_PART)
+ {
+ Local_key key;
+ PagePtr orgpage;
+ memcpy(&key, base->get_disk_ref_ptr(regTabPtr), sizeof(key));
+ key.m_page_no = req_struct->m_disk_page_ptr.i;
+ Uint32 *src_ptr = get_dd_ptr(&orgpage, &key, regTabPtr);
+ src_tuplelen = ((Var_page*)(orgpage.p))->get_entry_len(key.m_page_idx);
+ }
+ if ((dst = alloc_copy_tuple(regTabPtr,
+ &operPtrP->m_copy_tuple_location)) == 0)
+ {
+ terrorCode = ZMEM_NOMEM_ERROR;
+ goto error;
+ }
+
+ Uint32 tup_version;
+ change_mask_ptr = get_change_mask_ptr(regTabPtr, dst);
+ if (operPtrP->is_first_operation())
+ {
+ org = req_struct->m_tuple_ptr;
+ tup_version = org->get_tuple_version();
+ clear_change_mask_info(regTabPtr, change_mask_ptr);
+ }
+ else
+ {
+ Operationrec* prevOp = req_struct->prevOpPtr.p;
+ tup_version= prevOp->tupVersion;
+ org = get_copy_tuple(regTabPtr, &prevOp->m_copy_tuple_location);
+ copy_change_mask_info(regTabPtr,
+ change_mask_ptr,
+ get_change_mask_ptr(regTabPtr, org));
+ }
+
+ /* Check consistency before update/delete. */
+ req_struct->m_tuple_ptr = org;
+ if ((regTabPtr->m_bits & Tablerec::TR_Checksum) &&
+ (calculateChecksum(req_struct->m_tuple_ptr, regTabPtr) != 0))
+ {
+ terrorCode = ZTUPLE_CORRUPTED_ERROR;
+ goto error;
+ }
+
+ req_struct->m_tuple_ptr = dst;
+
+ Uint32 sizes[4];
+
+ disk = disk || (org->m_header_bits & Tuple_header::DISK_INLINE);
+
+ sizes[DD] = 0;
+ if (regTabPtr->need_expand(disk))
+ {
+ expand_tuple(req_struct, sizes, org, regTabPtr, disk);
+
+ if (disk && operPtrP->m_undo_buffer_space == 0)
+ {
+ operPtrP->op_struct.m_wait_log_buffer = 1;
+ operPtrP->op_struct.m_load_diskpage_on_commit = 1;
+ Uint32 sz= operPtrP->m_undo_buffer_space =
+ (sizeof(Dbtup::Disk_undo::Update) >> 2) + src_tuplelen - 1; // - 1 ???
+
+ D("Logfile_client - handleUpdateReq");
+ Logfile_client lgman(this, c_lgman, regFragPtr->m_logfile_group_id);
+ terrorCode = lgman.alloc_log_space(sz);
+ if (unlikely(terrorCode))
+ {
+ operPtrP->m_undo_buffer_space = 0;
+ goto error;
+ }
+ }
+ }
+ else
+ {
+ memcpy(dst, org, 4 * regTabPtr->m_offsets[MM].m_fix_header_size);
+ req_struct->m_tuple_ptr->m_header_bits |= Tuple_header::COPY_TUPLE;
+ }
+
+ tup_version = (tup_version + 1) & ZTUP_VERSION_MASK;
+ operPtrP->tupVersion = tup_version;
+
+ req_struct->optimize_options = 0;
+
+ if (!req_struct->interpreted_exec)
+ {
+ jam();
+ int retValue = updateAttributes(req_struct,
+ &cinBuffer[0],
+ req_struct->attrinfo_len);
+ if (unlikely(retValue == -1))
+ goto error;
+ }
+ else
+ {
+ jam();
+ if (unlikely(interpreterStartLab(signal, req_struct) == -1))
+ return -1;
+ }
+
+ update_change_mask_info(regTabPtr,
+ change_mask_ptr,
+ req_struct->changeMask.rep.data);
+
+ switch (req_struct->optimize_options) {
+ case AttributeHeader::OPTIMIZE_MOVE_VARPART:
+ /*
+ optimize varpart of tuple, move varpart of tuple from
+ big-free-size page list into small-free-size page list
+ */
+ if (base->m_header_bits & Tuple_header::VAR_PART)
+ optimize_var_part(req_struct, base, operPtrP,
+ regFragPtr, regTabPtr);
+ break;
+ case AttributeHeader::OPTIMIZE_MOVE_FIXPART:
+ //TODO: move fix part of tuple
+ break;
+ default:
+ break;
+ }
+
+
+ sizes[DD+2] = 0;
+ if (regTabPtr->need_shrink())
+ {
+ shrink_tuple_dd_var(req_struct, sizes + 2, regTabPtr, disk);
+ if (sizes[MM] != sizes[MM+2] &&
+ handle_size_change_after_update(req_struct, base,
+ operPtrP, regFragPtr,
+ regTabPtr, sizes))
+ {
+ goto error;
+ }
+ if (disk && sizes[DD] != sizes[DD+2] &&
+ unlikely(handle_size_change_after_update_disk(signal,
+ req_struct,
+ base,
+ regOperRecPtr,
+ fragPtr,
+ regTabPtr,
+ sizes)))
+ {
+ goto error;
+ }
+ }
+
+ if (req_struct->m_reorg)
+ {
+ handle_reorg(req_struct, regFragPtr->fragStatus);
+ }
+
+ req_struct->m_tuple_ptr->set_tuple_version(tup_version);
+ if (regTabPtr->m_bits & Tablerec::TR_Checksum)
+ {
+ jam();
+ setChecksum(req_struct->m_tuple_ptr, regTabPtr);
+ }
+ return 0;
+
+error:
+ tupkeyErrorLab(signal);
+ return -1;
+}
=== modified file 'storage/ndb/src/kernel/blocks/dbtup/DbtupGen.cpp'
--- a/storage/ndb/src/kernel/blocks/dbtup/DbtupGen.cpp 2009-10-08 11:15:24 +0000
+++ b/storage/ndb/src/kernel/blocks/dbtup/DbtupGen.cpp 2009-11-09 09:42:56 +0000
@@ -726,7 +726,8 @@ Dbtup::initTab(Tablerec* const regTabPtr
regTabPtr->tabDescriptor = RNIL;
regTabPtr->readKeyArray = RNIL;
- regTabPtr->dynTabDescriptor = RNIL;
+ regTabPtr->dynTabDescriptor[MM] = RNIL;
+ regTabPtr->dynTabDescriptor[DD] = RNIL;
regTabPtr->m_bits = 0;
=== modified file 'storage/ndb/src/kernel/blocks/dbtup/DbtupMeta.cpp'
--- a/storage/ndb/src/kernel/blocks/dbtup/DbtupMeta.cpp 2009-09-21 12:17:02 +0000
+++ b/storage/ndb/src/kernel/blocks/dbtup/DbtupMeta.cpp 2009-11-09 09:42:56 +0000
@@ -109,12 +109,27 @@ Dbtup::execCREATE_TAB_REQ(Signal* signal
regTabPtr.p->m_attributes[DD].m_no_of_dyn_fix= 0;
regTabPtr.p->m_attributes[DD].m_no_of_dyn_var= 0;
- // Reserve space for bitmap length
- regTabPtr.p->m_dyn_null_bits= DYN_BM_LEN_BITS;
+ /* Reserve space for bitmap length. */
+ regTabPtr.p->m_dyn_null_bits[MM]= DYN_BM_LEN_BITS;
+ regTabPtr.p->m_dyn_null_bits[DD]= DYN_BM_LEN_BITS;
regTabPtr.p->noOfKeyAttr= req->noOfKeyAttr;
regTabPtr.p->noOfCharsets= req->noOfCharsets;
regTabPtr.p->m_no_of_attributes= req->noOfAttributes;
- regTabPtr.p->dynTabDescriptor= RNIL;
+
+/*
+ if (req->diskTupleFormat == DISK_TUPLE_FIXED_SIZE)
+ ndbout_c("Table id %u created with DISK_TUPLE_FIXED_SIZE", regTabPtr.i);
+ if (req->diskTupleFormat == DISK_TUPLE_VAR_SIZE)
+ ndbout_c("Table id %u created with DISK_TUPLE_VAR_SIZE", regTabPtr.i);
+ else
+ goto error;
+// if (regTabPtr.i <7)
+*/
+// regTabPtr.p->m_disk_tuple_format= DISK_TUPLE_FIXED_SIZE;
+ regTabPtr.p->m_disk_tuple_format= DISK_TUPLE_VAR_SIZE;
+
+ regTabPtr.p->dynTabDescriptor[MM]= RNIL;
+ regTabPtr.p->dynTabDescriptor[DD]= RNIL;
{
Uint32 offset[10];
@@ -176,17 +191,33 @@ void Dbtup::execTUP_ADD_ATTRREQ(Signal*
const bool lastAttr = (fragOperPtr.p->attributeCount == 0);
Uint32 firstTabDesIndex= regTabPtr.p->tabDescriptor + (attrId * ZAD_SIZE);
+ Uint32 ind= AttributeDescriptor::getDiskBased(attrDescriptor);
+ Uint32 dd_varsz= (regTabPtr.p->m_disk_tuple_format == DISK_TUPLE_VAR_SIZE);
+ if (ind && dd_varsz)
+ {
+ AttributeDescriptor::setDynamic(attrDescriptor, true);
+ }
+
setTabDescrWord(firstTabDesIndex, attrDescriptor);
Uint32 attrLen = AttributeDescriptor::getSize(attrDescriptor);
Uint32 attrDes2= 0;
Uint32 bytes= AttributeDescriptor::getSizeInBytes(attrDescriptor);
Uint32 words= (bytes + 3) / 4;
- Uint32 ind= AttributeDescriptor::getDiskBased(attrDescriptor);
if (!AttributeDescriptor::getDynamic(attrDescriptor)) {
jam();
Uint32 null_pos;
- ndbrequire(ind <= 1);
+
+ if (dd_varsz)
+ {
+ /* Disk data will be dynamic only. */
+ ndbrequire(ind < 1);
+ }
+ else
+ {
+ ndbrequire(ind <= 1);
+ }
+
null_pos= fragOperPtr.p->m_null_bits[ind];
if (AttributeDescriptor::getNullable(attrDescriptor))
@@ -196,7 +227,8 @@ void Dbtup::execTUP_ADD_ATTRREQ(Signal*
}
if (AttributeDescriptor::getArrayType(attrDescriptor)==NDB_ARRAYTYPE_FIXED
- || ind==DD)
+ || (ind==DD &&
+ regTabPtr.p->m_disk_tuple_format == DISK_TUPLE_FIXED_SIZE))
{
jam();
regTabPtr.p->m_attributes[ind].m_no_of_fixsize++;
@@ -216,8 +248,9 @@ void Dbtup::execTUP_ADD_ATTRREQ(Signal*
AttributeOffset::setNullFlagPos(attrDes2, null_pos);
} else {
- /* A dynamic attribute. */
- ndbrequire(ind==MM);
+ /* A dynamic attribute can be either mm-dyn or disk data. */
+ if (!dd_varsz)
+ ndbrequire(ind==MM);
regTabPtr.p->m_attributes[ind].m_no_of_dynamic++;
/*
* The dynamic attribute format always require a 'null' bit. So
@@ -227,7 +260,7 @@ void Dbtup::execTUP_ADD_ATTRREQ(Signal*
* xxx internally as 'null'.
*/
- Uint32 null_pos= regTabPtr.p->m_dyn_null_bits;
+ Uint32 null_pos= regTabPtr.p->m_dyn_null_bits[ind];
if (AttributeDescriptor::getArrayType(attrDescriptor)==NDB_ARRAYTYPE_FIXED)
{
@@ -246,7 +279,7 @@ void Dbtup::execTUP_ADD_ATTRREQ(Signal*
* ensure that the full size bitmap is stored when non-NULL.
*/
null_pos+= bits;
- regTabPtr.p->m_dyn_null_bits+= bits+1;
+ regTabPtr.p->m_dyn_null_bits[ind]+= bits+1;
}
else
{
@@ -261,7 +294,7 @@ void Dbtup::execTUP_ADD_ATTRREQ(Signal*
regTabPtr.p->m_attributes[ind].m_no_of_dyn_fix++;
Uint32 null_bits= (bytes+3) >> 2;
- regTabPtr.p->m_dyn_null_bits+= null_bits;
+ regTabPtr.p->m_dyn_null_bits[ind]+= null_bits;
}
}
else
@@ -270,7 +303,7 @@ void Dbtup::execTUP_ADD_ATTRREQ(Signal*
treat_as_varsize:
jam();
regTabPtr.p->m_attributes[ind].m_no_of_dyn_var++;
- regTabPtr.p->m_dyn_null_bits++;
+ regTabPtr.p->m_dyn_null_bits[ind]++;
}
AttributeOffset::setNullFlagPos(attrDes2, null_pos);
@@ -307,22 +340,37 @@ void Dbtup::execTUP_ADD_ATTRREQ(Signal*
regTabPtr.p->m_offsets[DD].m_null_words= BTW(fragOperPtr.p->m_null_bits[DD]);
#undef BTW
+ Uint32 tup_format= regTabPtr.p->m_disk_tuple_format;
+
{
/* Allocate dynamic descriptor. */
- Uint32 offset[3];
- Uint32 allocSize= getDynTabDescrOffsets((regTabPtr.p->m_dyn_null_bits+31)>>5,
- offset);
- Uint32 dynTableDescriptorRef= allocTabDescr(allocSize);
- if (dynTableDescriptorRef == RNIL)
+
+ for (Uint32 i = 0; i < tup_format+1; ++i)
{
- jam();
- goto error;
+ Uint32 offset[3];
+ Uint32 allocSize = getDynTabDescrOffsets(
+ (regTabPtr.p->m_dyn_null_bits[i]+31)>>5,
+ offset);
+ Uint32 dynTableDescriptorRef = allocTabDescr(allocSize);
+ if (dynTableDescriptorRef == RNIL)
+ {
+ jam();
+ goto error;
+ }
+ setupDynDescriptorReferences(dynTableDescriptorRef,
+ regTabPtr.p, offset, i);
}
- setupDynDescriptorReferences(dynTableDescriptorRef, regTabPtr.p, offset);
}
/* Compute table aggregate metadata. */
- computeTableMetaData(regTabPtr.p);
+ if (dd_varsz)
+ {
+ computeTableMetaData_dd_var(regTabPtr.p);
+ }
+ else
+ {
+ computeTableMetaData(regTabPtr.p);
+ }
#if 0
ndbout << *regTabPtr.p << endl;
@@ -796,7 +844,7 @@ Dbtup::handleAlterTablePrepare(Signal *s
in ALTER_TAB_REQ[commit];
*/
Uint32 charsetIndex= regTabPtr->noOfCharsets;
- Uint32 dyn_nullbits= regTabPtr->m_dyn_null_bits;
+ Uint32 dyn_nullbits= regTabPtr->m_dyn_null_bits[MM];
if (dyn_nullbits == 0)
{
jam();
@@ -857,37 +905,59 @@ Dbtup::handleAlterTablePrepare(Signal *s
}
ndbassert(newNoOfCharsets==charsetIndex);
- regAlterTabOpPtr.p->noOfDynNullBits= dyn_nullbits;
- ndbassert(noDynamic ==
- regTabPtr->m_attributes[MM].m_no_of_dynamic + noOfNewAttr);
- regAlterTabOpPtr.p->noOfDynFix= noDynFix;
- regAlterTabOpPtr.p->noOfDynVar= noDynVar;
- regAlterTabOpPtr.p->noOfDynamic= noDynamic;
-
- /* Allocate the new (possibly larger) dynamic descriptor. */
- allocSize= getDynTabDescrOffsets((dyn_nullbits+31)>>5,
- regAlterTabOpPtr.p->dynTabDesOffset);
- Uint32 dynTableDescriptorRef = RNIL;
- if (ERROR_INSERTED(4029))
+ regAlterTabOpPtr.p->noOfDynNullBits[MM]= dyn_nullbits;
+ if (regTabPtr->m_disk_tuple_format == DISK_TUPLE_VAR_SIZE)
{
- jam();
- dynTableDescriptorRef = RNIL;
- terrorCode = ZMEM_NOTABDESCR_ERROR;
- CLEAR_ERROR_INSERT_VALUE;
+ /* Save disk-data related info from the old table. */
+ regAlterTabOpPtr.p->disk_tuple_format = regTabPtr->m_disk_tuple_format;
+ regAlterTabOpPtr.p->noOfDynFix[DD] =
+ regTabPtr->m_attributes[DD].m_no_of_dyn_fix;
+ regAlterTabOpPtr.p->noOfDynVar[DD] =
+ regTabPtr->m_attributes[DD].m_no_of_dyn_var;
+ regAlterTabOpPtr.p->noOfDynamic[DD] =
+ regTabPtr->m_attributes[DD].m_no_of_dyn_fix +
+ regTabPtr->m_attributes[DD].m_no_of_dyn_var;
+ regAlterTabOpPtr.p->noOfDynNullBits[DD] =
+ regTabPtr->m_dyn_null_bits[DD];
}
- else
- {
- jam();
- dynTableDescriptorRef = allocTabDescr(allocSize);
- }
- if (dynTableDescriptorRef == RNIL) {
- jam();
- releaseTabDescr(tableDescriptorRef);
- releaseAlterTabOpRec(regAlterTabOpPtr);
- sendAlterTabRef(signal, terrorCode);
- return;
+ ndbassert(noDynamic ==
+ regTabPtr->m_attributes[MM].m_no_of_dynamic + noOfNewAttr);
+
+ regAlterTabOpPtr.p->noOfDynFix[MM]= noDynFix;
+ regAlterTabOpPtr.p->noOfDynVar[MM]= noDynVar;
+ regAlterTabOpPtr.p->noOfDynamic[MM]= noDynamic;
+
+ /* Allocate the new (possibly larger) dynamic descriptor for MM and DD. */
+
+ for (int i = 0; i < regTabPtr->m_disk_tuple_format + 1; ++i)
+ {
+ Uint32 offset[3];
+ allocSize =
+ getDynTabDescrOffsets((regAlterTabOpPtr.p->noOfDynNullBits[i]+31)>>5,
+ offset);
+ Uint32 dynTableDescriptorRef = RNIL;
+ if (ERROR_INSERTED(4029))
+ {
+ jam();
+ dynTableDescriptorRef = RNIL;
+ terrorCode = ZMEM_NOTABDESCR_ERROR;
+ CLEAR_ERROR_INSERT_VALUE;
+ }
+ else
+ {
+ jam();
+ dynTableDescriptorRef= allocTabDescr(allocSize);
+ }
+ if (dynTableDescriptorRef == RNIL) {
+ jam();
+ releaseTabDescr(tableDescriptorRef);
+ releaseAlterTabOpRec(regAlterTabOpPtr);
+ sendAlterTabRef(signal, terrorCode);
+ return;
+ }
+ regAlterTabOpPtr.p->dynDesAllocSize[i]= allocSize;
+ regAlterTabOpPtr.p->dynTableDescriptor[i]= dynTableDescriptorRef;
}
- regAlterTabOpPtr.p->dynTableDescriptor= dynTableDescriptorRef;
connectPtr = regAlterTabOpPtr.i;
}
@@ -921,28 +991,55 @@ Dbtup::handleAlterTableCommit(Signal *si
/* Free old table descriptors. */
releaseTabDescr(regTabPtr);
-
+ Uint32 tup_format= regAlterTabOpPtr.p->disk_tuple_format;
/* Set new attribute counts. */
regTabPtr->m_no_of_attributes= regAlterTabOpPtr.p->newNoOfAttrs;
regTabPtr->noOfCharsets= regAlterTabOpPtr.p->newNoOfCharsets;
regTabPtr->noOfKeyAttr= regAlterTabOpPtr.p->newNoOfKeyAttrs;
- regTabPtr->m_attributes[MM].m_no_of_dyn_fix= regAlterTabOpPtr.p->noOfDynFix;
- regTabPtr->m_attributes[MM].m_no_of_dyn_var= regAlterTabOpPtr.p->noOfDynVar;
- regTabPtr->m_attributes[MM].m_no_of_dynamic= regAlterTabOpPtr.p->noOfDynamic;
- regTabPtr->m_dyn_null_bits= regAlterTabOpPtr.p->noOfDynNullBits;
+ regTabPtr->m_disk_tuple_format= tup_format;
+
+ /* Transfer the alter table info to the new table record. */
+ for (int i = 0; i < tup_format+1; ++i)
+ {
+ regTabPtr->m_attributes[i].m_no_of_dyn_fix =
+ regAlterTabOpPtr.p->noOfDynFix[i];
+ regTabPtr->m_attributes[i].m_no_of_dyn_var =
+ regAlterTabOpPtr.p->noOfDynVar[i];
+ regTabPtr->m_attributes[i].m_no_of_dynamic =
+ regAlterTabOpPtr.p->noOfDynamic[i];
+ regTabPtr->m_dyn_null_bits[i] =
+ regAlterTabOpPtr.p->noOfDynNullBits[i];
+ }
/* Install the new (larger) table descriptors. */
setUpDescriptorReferences(regAlterTabOpPtr.p->tableDescriptor,
regTabPtr,
regAlterTabOpPtr.p->tabDesOffset);
- setupDynDescriptorReferences(regAlterTabOpPtr.p->dynTableDescriptor,
- regTabPtr,
- regAlterTabOpPtr.p->dynTabDesOffset);
+ /* Allocate dynamic descriptor for mm and disk data. */
+ for (int i = 0; i < tup_format+1; ++i)
+ {
+ Uint32 offset[3];
+ Uint32 allocSize =
+ getDynTabDescrOffsets((regTabPtr->m_dyn_null_bits[i]+31)>>5,
+ offset);
+ Uint32 dynTableDescriptorRef = allocTabDescr(allocSize);
+ ndbrequire(dynTableDescriptorRef != RNIL);
+ setupDynDescriptorReferences(regAlterTabOpPtr.p->dynTableDescriptor[i],
+ regTabPtr,
+ offset, i);
+ }
releaseAlterTabOpRec(regAlterTabOpPtr);
/* Recompute aggregate table meta data. */
- computeTableMetaData(regTabPtr);
+ if (tup_format == DISK_TUPLE_VAR_SIZE)
+ {
+ computeTableMetaData_dd_var(regTabPtr);
+ }
+ else
+ {
+ computeTableMetaData(regTabPtr);
+ }
}
if (AlterTableReq::getReorgFragFlag(req->changeMask))
@@ -1042,7 +1139,11 @@ Dbtup::handleAlterTableAbort(Signal *sig
ptrCheckGuard(regAlterTabOpPtr, cnoOfAlterTabOps, alterTabOperRec);
releaseTabDescr(regAlterTabOpPtr.p->tableDescriptor);
- releaseTabDescr(regAlterTabOpPtr.p->dynTableDescriptor);
+
+ for (Uint32 i = 0; i < regTabPtr->m_disk_tuple_format + 1; ++i)
+ {
+ releaseTabDescr(regAlterTabOpPtr.p->dynTableDescriptor[i]);
+ }
releaseAlterTabOpRec(regAlterTabOpPtr);
}
}
@@ -1089,12 +1190,12 @@ Dbtup::handleCharsetPos(Uint32 csNumber,
void
Dbtup::computeTableMetaData(Tablerec *regTabPtr)
{
- if (regTabPtr->m_dyn_null_bits == DYN_BM_LEN_BITS)
+ if (regTabPtr->m_dyn_null_bits[MM] == DYN_BM_LEN_BITS)
{
- regTabPtr->m_dyn_null_bits = 0;
+ regTabPtr->m_dyn_null_bits[MM] = 0;
}
- Uint32 dyn_null_words= (regTabPtr->m_dyn_null_bits+31)>>5;
+ Uint32 dyn_null_words= (regTabPtr->m_dyn_null_bits[MM]+31)>>5;
regTabPtr->m_offsets[MM].m_dyn_null_words= dyn_null_words;
/* Compute the size of the static headers. */
@@ -1150,7 +1251,6 @@ Dbtup::computeTableMetaData(Tablerec *re
We also compute the dynamic bitmasks here.
*/
Uint32 *tabDesc= (Uint32*)(tableDescriptor+regTabPtr->tabDescriptor);
- Uint32 *dynDesc= (Uint32*)(tableDescriptor+regTabPtr->dynTabDescriptor);
Uint32 fix_size[2]= {0, 0};
Uint32 var_size[2]= {0, 0};
Uint32 dyn_size[2]= {0, 0};
@@ -1160,8 +1260,8 @@ Dbtup::computeTableMetaData(Tablerec *re
Uint32 dynamic_count= 0;
regTabPtr->blobAttributeMask.clear();
regTabPtr->notNullAttributeMask.clear();
- bzero(regTabPtr->dynVarSizeMask, dyn_null_words<<2);
- bzero(regTabPtr->dynFixSizeMask, dyn_null_words<<2);
+ bzero(regTabPtr->dynVarSizeMask[MM], dyn_null_words<<2);
+ bzero(regTabPtr->dynFixSizeMask[MM], dyn_null_words<<2);
for(Uint32 i= 0; i<regTabPtr->m_no_of_attributes; i++)
{
@@ -1223,7 +1323,7 @@ Dbtup::computeTableMetaData(Tablerec *re
while(size_in_words-- > 0)
{
BitmaskImpl::set(dyn_null_words,
- regTabPtr->dynFixSizeMask, null_pos++);
+ regTabPtr->dynFixSizeMask[ind], null_pos++);
}
}
else
@@ -1234,7 +1334,7 @@ Dbtup::computeTableMetaData(Tablerec *re
treat_as_varsize:
jam();
off= dynvar_count++;
- BitmaskImpl::set(dyn_null_words, regTabPtr->dynVarSizeMask, null_pos);
+ BitmaskImpl::set(dyn_null_words, regTabPtr->dynVarSizeMask[ind], null_pos);
}
}
AttributeOffset::setOffset(attrDes2, off);
@@ -1393,12 +1493,13 @@ void Dbtup::setUpDescriptorReferences(Ui
void Dbtup::setupDynDescriptorReferences(Uint32 dynDescr,
Tablerec* const regTabPtr,
- const Uint32* offset)
+ const Uint32* offset,
+ Uint32 ind)
{
- regTabPtr->dynTabDescriptor= dynDescr;
+ regTabPtr->dynTabDescriptor[ind]= dynDescr;
Uint32* desc= &tableDescriptor[dynDescr].tabDescr;
- regTabPtr->dynVarSizeMask= desc+offset[0];
- regTabPtr->dynFixSizeMask= desc+offset[1];
+ regTabPtr->dynVarSizeMask[ind]= desc+offset[0];
+ regTabPtr->dynFixSizeMask[ind]= desc+offset[1];
}
Uint32
@@ -1579,14 +1680,19 @@ void Dbtup::releaseTabDescr(Tablerec* co
releaseTabDescr(descriptor);
}
- descriptor= regTabPtr->dynTabDescriptor;
- if(descriptor != RNIL)
+ /* Release dynamic descriptor, etc for mm and disk data. */
+
+ for (Uint32 i = 0; i < regTabPtr->m_disk_tuple_format + 1; ++i)
{
- jam();
- regTabPtr->dynTabDescriptor= RNIL;
- regTabPtr->dynVarSizeMask= NULL;
- regTabPtr->dynFixSizeMask= NULL;
- releaseTabDescr(descriptor);
+ descriptor= regTabPtr->dynTabDescriptor[i];
+ if(descriptor != RNIL)
+ {
+ jam();
+ regTabPtr->dynTabDescriptor[i] = RNIL;
+ regTabPtr->dynVarSizeMask[i] = NULL;
+ regTabPtr->dynFixSizeMask[i] = NULL;
+ releaseTabDescr(descriptor);
+ }
}
}
@@ -2181,10 +2287,23 @@ Dbtup::start_restore_lcp(Uint32 tableId,
TablerecPtr tabPtr;
tabPtr.i= tableId;
ptrCheckGuard(tabPtr, cnoOfTablerec, tablerec);
-
- tabPtr.p->m_dropTable.tabUserPtr= tabPtr.p->m_attributes[DD].m_no_of_fixsize;
- tabPtr.p->m_dropTable.tabUserRef= tabPtr.p->m_attributes[DD].m_no_of_varsize;
-
+
+ Uint32 tup_format= tabPtr.p->m_disk_tuple_format;
+ if (tup_format == DISK_TUPLE_VAR_SIZE)
+ {
+ tabPtr.p->m_dropTable.tabUserPtr =
+ tabPtr.p->m_attributes[DD].m_no_of_dyn_fix;
+ tabPtr.p->m_dropTable.tabUserRef =
+ tabPtr.p->m_attributes[DD].m_no_of_dyn_var;
+ }
+ else
+ {
+ tabPtr.p->m_dropTable.tabUserPtr =
+ tabPtr.p->m_attributes[DD].m_no_of_fixsize;
+ tabPtr.p->m_dropTable.tabUserRef =
+ tabPtr.p->m_attributes[DD].m_no_of_varsize;
+ }
+
Uint32 *tabDesc = (Uint32*)(tableDescriptor+tabPtr.p->tabDescriptor);
for(Uint32 i= 0; i<tabPtr.p->m_no_of_attributes; i++)
{
@@ -2200,6 +2319,8 @@ Dbtup::start_restore_lcp(Uint32 tableId,
tabPtr.p->m_no_of_disk_attributes = 0;
tabPtr.p->m_attributes[DD].m_no_of_fixsize = 0;
tabPtr.p->m_attributes[DD].m_no_of_varsize = 0;
+ tabPtr.p->m_attributes[DD].m_no_of_dyn_fix = 0;
+ tabPtr.p->m_attributes[DD].m_no_of_dyn_var = 0;
}
void
Dbtup::complete_restore_lcp(Signal* signal,
@@ -2210,13 +2331,27 @@ Dbtup::complete_restore_lcp(Signal* sign
tabPtr.i= tableId;
ptrCheckGuard(tabPtr, cnoOfTablerec, tablerec);
- tabPtr.p->m_attributes[DD].m_no_of_fixsize= tabPtr.p->m_dropTable.tabUserPtr;
- tabPtr.p->m_attributes[DD].m_no_of_varsize= tabPtr.p->m_dropTable.tabUserRef;
-
- tabPtr.p->m_no_of_disk_attributes =
- tabPtr.p->m_attributes[DD].m_no_of_fixsize +
- tabPtr.p->m_attributes[DD].m_no_of_varsize;
-
+ Uint32 tup_format = tabPtr.p->m_disk_tuple_format;
+ if (tup_format == DISK_TUPLE_VAR_SIZE)
+ {
+ tabPtr.p->m_attributes[DD].m_no_of_dyn_fix =
+ tabPtr.p->m_dropTable.tabUserPtr;
+ tabPtr.p->m_attributes[DD].m_no_of_dyn_var =
+ tabPtr.p->m_dropTable.tabUserRef;
+
+ tabPtr.p->m_no_of_disk_attributes =
+ tabPtr.p->m_attributes[DD].m_no_of_dyn_fix +
+ tabPtr.p->m_attributes[DD].m_no_of_dyn_var;
+ }
+ else
+ {
+ tabPtr.p->m_attributes[DD].m_no_of_fixsize= tabPtr.p->m_dropTable.tabUserPtr;
+ tabPtr.p->m_attributes[DD].m_no_of_varsize= tabPtr.p->m_dropTable.tabUserRef;
+ tabPtr.p->m_no_of_disk_attributes =
+ tabPtr.p->m_attributes[DD].m_no_of_fixsize +
+ tabPtr.p->m_attributes[DD].m_no_of_varsize;
+ }
+
Uint32 *tabDesc = (Uint32*)(tableDescriptor+tabPtr.p->tabDescriptor);
for(Uint32 i= 0; i<tabPtr.p->m_no_of_attributes; i++)
{
@@ -2315,3 +2450,335 @@ Dbtup::execDROP_FRAG_REQ(Signal* signal)
signal, DropFragConf::SignalLength, JBB);
}
+
+/*
+ The following methods handle disk data as fixed or variable size,
+ depending on the m_disk_tuple_format.
+*/
+
+void
+Dbtup::computeTableMetaData_dd_var(Tablerec *regTabPtr)
+{
+ Uint32 dyn_null_words[2];
+
+ for (Uint32 ind = 0; ind < regTabPtr->m_disk_tuple_format + 1; ++ind)
+ {
+ if (regTabPtr->m_dyn_null_bits[ind] == DYN_BM_LEN_BITS)
+ {
+ regTabPtr->m_dyn_null_bits[ind] = 0;
+ }
+
+ dyn_null_words[ind] = (regTabPtr->m_dyn_null_bits[ind]+31)>>5;
+ regTabPtr->m_offsets[ind].m_dyn_null_words = dyn_null_words[ind];
+ }
+
+ /* Compute the size of the static headers. */
+ Uint32 pos[2] = { 0, 0 };
+ if (regTabPtr->m_bits & Tablerec::TR_Checksum)
+ {
+ pos[MM]++;
+ }
+
+ if (regTabPtr->m_bits & Tablerec::TR_RowGCI)
+ {
+ pos[MM]++;
+ pos[DD]++;
+ }
+
+ regTabPtr->m_no_of_disk_attributes =
+ regTabPtr->m_attributes[DD].m_no_of_dynamic;
+ if(regTabPtr->m_no_of_disk_attributes > 0)
+ {
+ /* Room for disk part location. */
+ regTabPtr->m_offsets[MM].m_disk_ref_offset = pos[MM];
+ pos[MM] += Disk_part_ref::SZ32; // 8 bytes
+ regTabPtr->m_bits |= Tablerec::TR_DiskPart;
+ }
+ else
+ {
+ regTabPtr->m_offsets[MM].m_disk_ref_offset = pos[MM] - Disk_part_ref::SZ32;
+ }
+
+ if (regTabPtr->m_attributes[MM].m_no_of_varsize ||
+ regTabPtr->m_attributes[MM].m_no_of_dynamic)
+ {
+ pos[MM] += Var_part_ref::SZ32;
+ regTabPtr->m_bits &= ~(Uint32)Tablerec::TR_ForceVarPart;
+ }
+ else if (regTabPtr->m_bits & Tablerec::TR_ForceVarPart)
+ {
+ pos[MM] += Var_part_ref::SZ32;
+ }
+
+ regTabPtr->m_offsets[MM].m_null_offset = pos[MM];
+ regTabPtr->m_offsets[DD].m_null_offset = pos[DD];
+ pos[MM] += regTabPtr->m_offsets[MM].m_null_words;
+ pos[DD] += regTabPtr->m_offsets[DD].m_null_words;
+
+ /*
+ Compute the offsets for the attributes.
+ For static fixed-size, this is the offset from the tuple pointer of the
+ actual data.
+ For static var-size and dynamic, this is the index into the offset array.
+
+ We also compute the dynamic bitmasks here.
+ */
+ Uint32 *tabDesc = (Uint32*)(tableDescriptor+regTabPtr->tabDescriptor);
+ Uint32 fix_size = 0;
+ Uint32 var_size = 0;
+ Uint32 statvar_count = 0;
+ Uint32 dyn_size[2] = {0, 0};
+ Uint32 dynfix_count[2] = {0, 0};
+ Uint32 dynvar_count[2] = {0, 0};
+ Uint32 dynamic_count[2]= {0, 0};
+ regTabPtr->blobAttributeMask.clear();
+ regTabPtr->notNullAttributeMask.clear();
+
+ for (Uint32 i = 0; i < regTabPtr->m_disk_tuple_format + 1; ++i)
+ {
+ bzero(regTabPtr->dynVarSizeMask[i], dyn_null_words[i]<<2);
+ bzero(regTabPtr->dynFixSizeMask[i], dyn_null_words[i]<<2);
+ }
+
+ for(Uint32 i = 0; i < regTabPtr->m_no_of_attributes; i++)
+ {
+ Uint32 attrDescriptor = *tabDesc++;
+ Uint32 attrDes2 = *tabDesc;
+ Uint32 ind = AttributeDescriptor::getDiskBased(attrDescriptor);
+ Uint32 attrLen = AttributeDescriptor::getSize(attrDescriptor);
+ Uint32 arr = AttributeDescriptor::getArrayType(attrDescriptor);
+ Uint32 size_in_words = AttributeDescriptor::getSizeInWords(attrDescriptor);
+ Uint32 size_in_bytes = AttributeDescriptor::getSizeInBytes(attrDescriptor);
+ Uint32 extType = AttributeDescriptor::getType(attrDescriptor);
+ Uint32 off;
+
+ if (extType == NDB_TYPE_BLOB || extType == NDB_TYPE_TEXT)
+ regTabPtr->blobAttributeMask.set(i);
+ if(!AttributeDescriptor::getNullable(attrDescriptor))
+ regTabPtr->notNullAttributeMask.set(i);
+ if (!AttributeDescriptor::getDynamic(attrDescriptor))
+ {
+ ndbassert(ind == MM); // Disk data will be dynamic only.
+ if(arr == NDB_ARRAYTYPE_FIXED)
+ {
+ if (attrLen != 0)
+ {
+ off = fix_size + pos[ind];
+ fix_size += size_in_words;
+ }
+ else
+ off = 0; // Bit type
+ }
+ else
+ {
+ /* Static varsize. */
+ ndbassert(ind == MM);
+ off = statvar_count++;
+ var_size += size_in_bytes;
+ }
+ }
+ else
+ {
+ /* Dynamic attribute - both memory and disk. */
+ dynamic_count[ind]++;
+ Uint32 null_pos = AttributeOffset::getNullFlagPos(attrDes2);
+ dyn_size[ind] += (size_in_words<<2);
+ if(arr == NDB_ARRAYTYPE_FIXED)
+ {
+ jam();
+ //if (extType == NDB_TYPE_BLOB || extType == NDB_TYPE_TEXT)
+ //regTabPtr->blobAttributeMask.set(i);
+ // ToDo: I wonder what else is needed to handle BLOB/TEXT, if anything?
+
+ if (attrLen != 0)
+ {
+ jam();
+ if(size_in_words>InternalMaxDynFix)
+ goto treat_as_varsize;
+
+ off =
+ dynfix_count[ind]++ + regTabPtr->m_attributes[ind].m_no_of_dyn_var;
+ while(size_in_words-- > 0)
+ {
+ BitmaskImpl::set(dyn_null_words[ind],
+ regTabPtr->dynFixSizeMask[ind], null_pos++);
+ }
+ }
+ else
+ off = 0; // Bit type
+ }
+ else
+ {
+ treat_as_varsize:
+ jam();
+ off = dynvar_count[ind]++;
+ BitmaskImpl::set(dyn_null_words[ind], regTabPtr->dynVarSizeMask[ind], null_pos);
+ }
+ }
+ AttributeOffset::setOffset(attrDes2, off);
+ *tabDesc++= attrDes2;
+ }
+ ndbassert(dynvar_count[MM]==regTabPtr->m_attributes[MM].m_no_of_dyn_var);
+ ndbassert(dynfix_count[MM]==regTabPtr->m_attributes[MM].m_no_of_dyn_fix);
+ ndbassert(dynamic_count[MM]==regTabPtr->m_attributes[MM].m_no_of_dynamic);
+ ndbassert(statvar_count==regTabPtr->m_attributes[MM].m_no_of_varsize);
+
+ ndbassert(dynvar_count[DD] == regTabPtr->m_attributes[DD].m_no_of_dyn_var);
+ ndbassert(dynfix_count[DD] == regTabPtr->m_attributes[DD].m_no_of_dyn_fix);
+ ndbassert(dynamic_count[DD] == regTabPtr->m_attributes[DD].m_no_of_dynamic);
+
+ regTabPtr->m_offsets[MM].m_fix_header_size =
+ Tuple_header::HeaderSize + fix_size + pos[MM];
+ regTabPtr->m_offsets[DD].m_fix_header_size =
+ Tuple_header::HeaderSize+pos[DD]+1;
+
+ Uint32 mm_vars = regTabPtr->m_attributes[MM].m_no_of_varsize;
+ Uint32 mm_dyns = regTabPtr->m_attributes[MM].m_no_of_dyn_fix +
+ regTabPtr->m_attributes[MM].m_no_of_dyn_var;
+ Uint32 dd_dyns = regTabPtr->m_attributes[DD].m_no_of_dyn_fix +
+ regTabPtr->m_attributes[DD].m_no_of_dyn_var;
+
+ regTabPtr->m_offsets[MM].m_max_var_offset = var_size;
+ /*
+ Size of the expanded dynamic part. Needs room for bitmap, (N+1) 16-bit
+ offset words with 32-bit padding, and all attribute data.
+ */
+ regTabPtr->m_offsets[MM].m_max_dyn_offset =
+ (regTabPtr->m_offsets[MM].m_dyn_null_words<<2) +
+ 4*((mm_dyns+2)>>1) + dyn_size[MM];
+
+ regTabPtr->m_offsets[DD].m_max_dyn_offset =
+ (regTabPtr->m_offsets[DD].m_dyn_null_words<<2) +
+ 4*((dd_dyns+2)>>1) + dyn_size[DD];
+
+ /* Room for data for all the attributes. */
+ Uint32 total_rec_size =
+ pos[MM] + fix_size +
+ ((var_size + 3) >> 2) + ((dyn_size[MM] + 3) >> 2) +
+ regTabPtr->m_offsets[DD].m_fix_header_size +
+ ((dyn_size[DD] + 3) >> 2);
+
+ if(dd_dyns)
+ {
+ regTabPtr->m_offsets[DD].m_fix_header_size +=
+ (regTabPtr->m_offsets[DD].m_max_dyn_offset+3)>>2;
+
+ regTabPtr->m_offsets[DD].m_fix_header_size += 1; // space for dynlen
+ }
+
+ /*
+ Room for offset arrays and dynamic bitmaps. There is one extra 16-bit
+ offset in each offset array (for easy computation of final length).
+ Also one word for storing total length of varsize+dynamic part
+ */
+ if(mm_vars + regTabPtr->m_attributes[MM].m_no_of_dynamic)
+ {
+ total_rec_size += (mm_vars + 2) >> 1;
+ total_rec_size += regTabPtr->m_offsets[MM].m_dyn_null_words;
+ total_rec_size += (mm_dyns + 2) >> 1;
+ total_rec_size += 1;
+ }
+
+ if (dd_dyns)
+ {
+ total_rec_size += regTabPtr->m_offsets[DD].m_dyn_null_words;
+ total_rec_size += (dd_dyns + 2) >> 1;
+ total_rec_size += 2;
+ }
+
+ /* Room for the header. */
+ total_rec_size += Tuple_header::HeaderSize;
+ if(regTabPtr->m_no_of_disk_attributes)
+ total_rec_size += Tuple_header::HeaderSize;
+
+ /* Room for changemask */
+ total_rec_size += (regTabPtr->m_no_of_attributes + 31) >> 5;
+
+ regTabPtr->total_rec_size = total_rec_size;
+
+ setUpQueryRoutines(regTabPtr);
+ setUpKeyArray_dd_var(regTabPtr);
+}
+
+void Dbtup::setUpKeyArray_dd_var(Tablerec* const regTabPtr)
+{
+ ndbrequire((regTabPtr->readKeyArray + regTabPtr->noOfKeyAttr) <
+ cnoOfTabDescrRec);
+ Uint32* keyArray = &tableDescriptor[regTabPtr->readKeyArray].tabDescr;
+ Uint32 countKeyAttr = 0;
+ for (Uint32 i = 0; i < regTabPtr->m_no_of_attributes; i++) {
+ jam();
+ Uint32 refAttr = regTabPtr->tabDescriptor + (i * ZAD_SIZE);
+ Uint32 attrDescriptor = getTabDescrWord(refAttr);
+ if (AttributeDescriptor::getPrimaryKey(attrDescriptor)) {
+ jam();
+ AttributeHeader::init(&keyArray[countKeyAttr], i, 0);
+ countKeyAttr++;
+ }
+ }
+ ndbrequire(countKeyAttr == regTabPtr->noOfKeyAttr);
+
+ /*
+ Setup real order array (16 bit per column)
+
+ Sequence is [mm_fix mm_var mm_dynfix mm_dynvar dd_dynfix dd_dynvar]
+ */
+ const Uint32 off = regTabPtr->m_real_order_descriptor;
+ const Uint32 sz = (regTabPtr->m_no_of_attributes + 1) >> 1;
+ ndbrequire((off + sz) < cnoOfTabDescrRec);
+
+ Uint32 cnt = 0;
+ Uint16* order = (Uint16*)&tableDescriptor[off].tabDescr;
+ for (Uint32 type = 0; type < 6; type++)
+ {
+ for (Uint32 i = 0; i < regTabPtr->m_no_of_attributes; i++)
+ {
+ jam();
+ Uint32 refAttr = regTabPtr->tabDescriptor + (i * ZAD_SIZE);
+ Uint32 desc = getTabDescrWord(refAttr);
+ bool disk = AttributeDescriptor::getDiskBased(desc);
+ Uint32 t = 0;
+
+ if (AttributeDescriptor::getDynamic(desc) &&
+ AttributeDescriptor::getArrayType(desc) == NDB_ARRAYTYPE_FIXED &&
+ AttributeDescriptor::getSize(desc) == 0)
+ {
+ /*
+ Dynamic bit types are stored inside the dynamic NULL bitmap, and are
+ never expanded. So we do not need any real_order_descriptor for
+ them.
+ */
+ jam();
+ if((!disk && type == 0) || (disk && type == 4))
+ {
+ cnt++;
+ continue;
+ }
+ }
+ else {
+ if ((AttributeDescriptor::getArrayType(desc) != NDB_ARRAYTYPE_FIXED) ||
+ (AttributeDescriptor::getDynamic(desc) &&
+ AttributeDescriptor::getArrayType(desc) == NDB_ARRAYTYPE_FIXED &&
+ AttributeDescriptor::getSizeInWords(desc) > InternalMaxDynFix))
+ {
+ t += 1;
+ }
+ if (AttributeDescriptor::getDynamic(desc) && !disk)
+ {
+ t += 2;
+ }
+ if (disk)
+ {
+ t += 4;
+ }
+ ndbrequire(t < 6);
+ if(t == type)
+ {
+ * order++ = i << ZAD_LOG_SIZE;
+ cnt++;
+ }
+ }
+ }
+ }
+ ndbrequire(cnt == regTabPtr->m_no_of_attributes);
+}
=== modified file 'storage/ndb/src/kernel/blocks/dbtup/DbtupRoutines.cpp'
--- a/storage/ndb/src/kernel/blocks/dbtup/DbtupRoutines.cpp 2009-10-27 12:08:44 +0000
+++ b/storage/ndb/src/kernel/blocks/dbtup/DbtupRoutines.cpp 2009-11-09 09:42:56 +0000
@@ -627,8 +627,11 @@ Dbtup::readFixedSizeTHZeroWordNULLable(U
bool
Dbtup::nullFlagCheck(KeyReqStruct *req_struct, Uint32 attrDes2)
{
+ Uint32 ind =
+ (AttributeDescriptor::getDiskBased(req_struct->attr_descriptor)) ? DD : MM;
Tablerec* const regTabPtr= tabptr.p;
- Uint32 *bits= req_struct->m_tuple_ptr->get_null_bits(regTabPtr);
+ Uint32 *bits= (ind) ? req_struct->m_disk_ptr->get_null_bits(regTabPtr, DD) :
+ req_struct->m_tuple_ptr->get_null_bits(regTabPtr);
Uint32 pos= AttributeOffset::getNullFlagPos(attrDes2);
return BitmaskImpl::get(regTabPtr->m_offsets[MM].m_null_words, bits, pos);
@@ -858,12 +861,16 @@ Dbtup::readDynFixedSizeExpandedNotNULL(U
using different data base pointer and offset/lenght arrays.
*/
jam();
- char *src_ptr= req_struct->m_var_data[MM].m_dyn_data_ptr;
+ Uint32 attr_descriptor= req_struct->attr_descriptor;
+ Uint32 ind =
+ (AttributeDescriptor::getDiskBased(req_struct->attr_descriptor)) ? DD : MM;
+
+ char *src_ptr= req_struct->m_var_data[ind].m_dyn_data_ptr;
Uint32 var_index= AttributeOffset::getOffset(attrDes2);
- Uint16* off_arr= req_struct->m_var_data[MM].m_dyn_offset_arr_ptr;
+ Uint16* off_arr= req_struct->m_var_data[ind].m_dyn_offset_arr_ptr;
Uint32 var_attr_pos= off_arr[var_index];
Uint32 vsize_in_bytes=
- AttributeDescriptor::getSizeInBytes(req_struct->attr_descriptor);
+ AttributeDescriptor::getSizeInBytes(attr_descriptor);
return varsize_reader(outBuffer, req_struct, ahOut, attrDes2,
src_ptr + var_attr_pos, vsize_in_bytes);
}
@@ -878,7 +885,11 @@ Dbtup::readDynFixedSizeExpandedNULLable(
Check for NULL. In the expanded format, the bitmap is guaranteed
to be stored in full length.
*/
- Uint32 *src_ptr= (Uint32 *)(req_struct->m_var_data[MM].m_dyn_data_ptr);
+
+ Uint32 ind =
+ (AttributeDescriptor::getDiskBased(req_struct->attr_descriptor)) ? DD : MM;
+
+ Uint32 *src_ptr= (Uint32 *)(req_struct->m_var_data[ind].m_dyn_data_ptr);
Uint32 pos = AttributeOffset::getNullFlagPos(attrDes2);
if(!BitmaskImpl::get((* src_ptr) & DYN_BM_LEN_MASK, src_ptr, pos))
{
@@ -897,8 +908,12 @@ Dbtup::readDynFixedSizeShrunkenNotNULL(U
AttributeHeader* ahOut,
Uint32 attrDes2)
{
- Uint32 *bm_ptr= (Uint32 *)(req_struct->m_var_data[MM].m_dyn_data_ptr);
- Uint32 dyn_len= req_struct->m_var_data[MM].m_dyn_part_len;
+ Uint32 attr_descriptor= req_struct->attr_descriptor;
+ Uint32 ind =
+ (AttributeDescriptor::getDiskBased(req_struct->attr_descriptor)) ? DD : MM;
+
+ Uint32 *bm_ptr= (Uint32 *)(req_struct->m_var_data[ind].m_dyn_data_ptr);
+ Uint32 dyn_len= req_struct->m_var_data[ind].m_dyn_part_len;
ndbrequire(dyn_len != 0);
Uint32 bm_len= (* bm_ptr) & DYN_BM_LEN_MASK; // In 32-bit words
Uint32 pos = AttributeOffset::getNullFlagPos(attrDes2);
@@ -918,7 +933,7 @@ Dbtup::readDynFixedSizeShrunkenNotNULL(U
*/
jam();
Tablerec* regTabPtr = tabptr.p;
- Uint32 *bm_mask_ptr= regTabPtr->dynFixSizeMask;
+ Uint32 *bm_mask_ptr= regTabPtr->dynFixSizeMask[ind];
Uint32 bm_pos= AttributeOffset::getNullFlagOffset(attrDes2);
Uint32 prevMask= (1 << (pos & 31)) - 1;
Uint32 bit_count= BitmaskImpl::count_bits(prevMask & bm_mask_ptr[bm_pos] & bm_ptr[bm_pos]);
@@ -926,7 +941,6 @@ Dbtup::readDynFixedSizeShrunkenNotNULL(U
bit_count+= BitmaskImpl::count_bits(bm_mask_ptr[i] & bm_ptr[i]);
/* Now compute the data pointer from the row length. */
- Uint32 attr_descriptor= req_struct->attr_descriptor;
Uint32 vsize_in_bytes= AttributeDescriptor::getSizeInBytes(attr_descriptor);
Uint32 vsize_in_words= (vsize_in_bytes+3)>>2;
Uint32 *data_ptr= bm_ptr + dyn_len - bit_count - vsize_in_words;
@@ -950,8 +964,11 @@ Dbtup::readDynFixedSizeShrunkenNULLable(
AttributeHeader* ahOut,
Uint32 attrDes2)
{
- Uint32 *bm_ptr= (Uint32 *)(req_struct->m_var_data[MM].m_dyn_data_ptr);
- Uint32 dyn_len= req_struct->m_var_data[MM].m_dyn_part_len;
+ Uint32 ind =
+ (AttributeDescriptor::getDiskBased(req_struct->attr_descriptor)) ? DD : MM;
+
+ Uint32 *bm_ptr= (Uint32 *)(req_struct->m_var_data[ind].m_dyn_data_ptr);
+ Uint32 dyn_len= req_struct->m_var_data[ind].m_dyn_part_len;
Uint32 pos = AttributeOffset::getNullFlagPos(attrDes2);
/* Check for NULL (including the case of an empty bitmap). */
if(dyn_len == 0 || dynCheckNull(dyn_len, (* bm_ptr) & DYN_BM_LEN_MASK,
@@ -1007,13 +1024,17 @@ Dbtup::readDynBigFixedSizeExpandedNotNUL
using different data base pointer and offset/lenght arrays.
*/
jam();
- char *src_ptr= req_struct->m_var_data[MM].m_dyn_data_ptr;
+ Uint32 attr_descriptor= req_struct->attr_descriptor;
+ Uint32 ind =
+ (AttributeDescriptor::getDiskBased(req_struct->attr_descriptor)) ? DD : MM;
+
+ char *src_ptr= req_struct->m_var_data[ind].m_dyn_data_ptr;
Uint32 var_index= AttributeOffset::getOffset(attrDes2);
- Uint16* off_arr= req_struct->m_var_data[MM].m_dyn_offset_arr_ptr;
+ Uint16* off_arr= req_struct->m_var_data[ind].m_dyn_offset_arr_ptr;
Uint32 var_attr_pos= off_arr[var_index];
Uint32 vsize_in_bytes=
- AttributeDescriptor::getSizeInBytes(req_struct->attr_descriptor);
- Uint32 idx= req_struct->m_var_data[MM].m_dyn_len_offset;
+ AttributeDescriptor::getSizeInBytes(attr_descriptor);
+ Uint32 idx= req_struct->m_var_data[ind].m_dyn_len_offset;
ndbrequire(vsize_in_bytes <= off_arr[var_index+idx] - var_attr_pos);
return varsize_reader(outBuffer, req_struct, ahOut, attrDes2,
src_ptr + var_attr_pos, vsize_in_bytes);
@@ -1029,7 +1050,11 @@ Dbtup::readDynBigFixedSizeExpandedNULLab
Check for NULL. In the expanded format, the bitmap is guaranteed
to be stored in full length.
*/
- Uint32 *src_ptr= (Uint32 *)(req_struct->m_var_data[MM].m_dyn_data_ptr);
+
+ Uint32 ind =
+ (AttributeDescriptor::getDiskBased(req_struct->attr_descriptor)) ? DD : MM;
+
+ Uint32 *src_ptr= (Uint32 *)(req_struct->m_var_data[ind].m_dyn_data_ptr);
Uint32 pos = AttributeOffset::getNullFlagPos(attrDes2);
if(!BitmaskImpl::get((* src_ptr) & DYN_BM_LEN_MASK, src_ptr, pos))
{
@@ -1048,8 +1073,12 @@ Dbtup::readDynBigFixedSizeShrunkenNotNUL
AttributeHeader* ahOut,
Uint32 attrDes2)
{
- Uint32 *bm_ptr= (Uint32 *)(req_struct->m_var_data[MM].m_dyn_data_ptr);
- Uint32 dyn_len= req_struct->m_var_data[MM].m_dyn_part_len;
+ Uint32 attr_descriptor= req_struct->attr_descriptor;
+ Uint32 ind =
+ (AttributeDescriptor::getDiskBased(req_struct->attr_descriptor)) ? DD : MM;
+
+ Uint32 *bm_ptr= (Uint32 *)(req_struct->m_var_data[ind].m_dyn_data_ptr);
+ Uint32 dyn_len= req_struct->m_var_data[ind].m_dyn_part_len;
ndbrequire(dyn_len!=0);
Uint32 bm_len = (* bm_ptr) & DYN_BM_LEN_MASK;
Uint32 pos = AttributeOffset::getNullFlagPos(attrDes2);
@@ -1064,7 +1093,7 @@ Dbtup::readDynBigFixedSizeShrunkenNotNUL
any trailing non-bitmap bytes to save a few conditionals.
*/
Tablerec* regTabPtr = tabptr.p;
- Uint32 *bm_mask_ptr= regTabPtr->dynVarSizeMask;
+ Uint32 *bm_mask_ptr= regTabPtr->dynVarSizeMask[ind];
Uint32 bm_pos= AttributeOffset::getNullFlagOffset(attrDes2);
Uint32 prevMask= (1 << (pos & 31)) - 1;
Uint32 bit_count= BitmaskImpl::count_bits(prevMask & bm_mask_ptr[bm_pos] & bm_ptr[bm_pos]);
@@ -1072,7 +1101,6 @@ Dbtup::readDynBigFixedSizeShrunkenNotNUL
bit_count+= BitmaskImpl::count_bits(bm_mask_ptr[i] & bm_ptr[i]);
/* Now find the data pointer and length from the offset array. */
- Uint32 attr_descriptor= req_struct->attr_descriptor;
Uint32 vsize_in_bytes= AttributeDescriptor::getSizeInBytes(attr_descriptor);
//Uint16 *offset_array= req_struct->m_var_data[MM].m_dyn_offset_arr_ptr;
Uint16* offset_array = (Uint16*)(bm_ptr + bm_len);
@@ -1094,8 +1122,11 @@ Dbtup::readDynBigFixedSizeShrunkenNULLab
AttributeHeader* ahOut,
Uint32 attrDes2)
{
- Uint32 *bm_ptr= (Uint32 *)(req_struct->m_var_data[MM].m_dyn_data_ptr);
- Uint32 dyn_len= req_struct->m_var_data[MM].m_dyn_part_len;
+ Uint32 ind =
+ (AttributeDescriptor::getDiskBased(req_struct->attr_descriptor)) ? DD : MM;
+
+ Uint32 *bm_ptr= (Uint32 *)(req_struct->m_var_data[ind].m_dyn_data_ptr);
+ Uint32 dyn_len= req_struct->m_var_data[ind].m_dyn_part_len;
/* Check for NULL (including the case of an empty bitmap). */
Uint32 pos = AttributeOffset::getNullFlagPos(attrDes2);
if(dyn_len == 0 || dynCheckNull(dyn_len, (* bm_ptr) & DYN_BM_LEN_MASK,
@@ -1142,12 +1173,17 @@ Dbtup::readDynBitsShrunkenNotNULL(Uint8*
AttributeHeader* ahOut,
Uint32 attrDes2)
{
- Uint32 *bm_ptr= (Uint32 *)(req_struct->m_var_data[MM].m_dyn_data_ptr);
- Uint32 dyn_len= req_struct->m_var_data[MM].m_dyn_part_len;
+ Uint32 attr_descriptor= req_struct->attr_descriptor;
+
+ Uint32 ind =
+ (AttributeDescriptor::getDiskBased(req_struct->attr_descriptor)) ? DD : MM;
+
+ Uint32 *bm_ptr= (Uint32 *)(req_struct->m_var_data[ind].m_dyn_data_ptr);
+ Uint32 dyn_len= req_struct->m_var_data[ind].m_dyn_part_len;
ndbrequire(dyn_len != 0);
Uint32 bm_len = (* bm_ptr) & DYN_BM_LEN_MASK;
Uint32 bitCount =
- AttributeDescriptor::getArraySize(req_struct->attr_descriptor);
+ AttributeDescriptor::getArraySize(attr_descriptor);
Uint32 pos = AttributeOffset::getNullFlagPos(attrDes2);
/* Make sure we have sufficient data in the row. */
ndbrequire((pos>>5)<bm_len);
@@ -1166,8 +1202,11 @@ Dbtup::readDynBitsShrunkenNULLable(Uint8
AttributeHeader* ahOut,
Uint32 attrDes2)
{
- Uint32 *bm_ptr= (Uint32 *)(req_struct->m_var_data[MM].m_dyn_data_ptr);
- Uint32 dyn_len= req_struct->m_var_data[MM].m_dyn_part_len;
+ Uint32 ind =
+ (AttributeDescriptor::getDiskBased(req_struct->attr_descriptor)) ? DD : MM;
+
+ Uint32 *bm_ptr= (Uint32 *)(req_struct->m_var_data[ind].m_dyn_data_ptr);
+ Uint32 dyn_len= req_struct->m_var_data[ind].m_dyn_part_len;
/* Check for NULL (including the case of an empty bitmap). */
Uint32 pos = AttributeOffset::getNullFlagPos(attrDes2);
if(dyn_len == 0 || dynCheckNull(dyn_len, (* bm_ptr) & DYN_BM_LEN_MASK,
@@ -1187,10 +1226,15 @@ Dbtup::readDynBitsExpandedNotNULL(Uint8*
AttributeHeader* ahOut,
Uint32 attrDes2)
{
- Uint32 *bm_ptr= (Uint32 *)(req_struct->m_var_data[MM].m_dyn_data_ptr);
+ Uint32 attr_descriptor= req_struct->attr_descriptor;
+
+ Uint32 ind =
+ (AttributeDescriptor::getDiskBased(req_struct->attr_descriptor)) ? DD : MM;
+
+ Uint32 *bm_ptr= (Uint32 *)(req_struct->m_var_data[ind].m_dyn_data_ptr);
Uint32 bm_len = (* bm_ptr) & DYN_BM_LEN_MASK;
Uint32 bitCount =
- AttributeDescriptor::getArraySize(req_struct->attr_descriptor);
+ AttributeDescriptor::getArraySize(attr_descriptor);
Uint32 pos = AttributeOffset::getNullFlagPos(attrDes2);
/* The bit data is stored just before the NULL bit. */
ndbassert(pos>bitCount);
@@ -1207,7 +1251,11 @@ Dbtup::readDynBitsExpandedNULLable(Uint8
AttributeHeader* ahOut,
Uint32 attrDes2)
{
- Uint32 *bm_ptr= (Uint32 *)(req_struct->m_var_data[MM].m_dyn_data_ptr);
+
+ Uint32 ind =
+ (AttributeDescriptor::getDiskBased(req_struct->attr_descriptor)) ? DD : MM;
+
+ Uint32 *bm_ptr= (Uint32 *)(req_struct->m_var_data[ind].m_dyn_data_ptr);
Uint32 pos = AttributeOffset::getNullFlagPos(attrDes2);
if(!BitmaskImpl::get((* bm_ptr) & DYN_BM_LEN_MASK, bm_ptr, pos))
{
@@ -1260,11 +1308,14 @@ Dbtup::readDynVarSizeExpandedNotNULL(Uin
using different data base pointer and offset/lenght arrays.
*/
jam();
- char *src_ptr= req_struct->m_var_data[MM].m_dyn_data_ptr;
+ Uint32 ind =
+ (AttributeDescriptor::getDiskBased(req_struct->attr_descriptor)) ? DD : MM;
+
+ char *src_ptr= req_struct->m_var_data[ind].m_dyn_data_ptr;
Uint32 var_index= AttributeOffset::getOffset(attrDes2);
- Uint16* off_arr= req_struct->m_var_data[MM].m_dyn_offset_arr_ptr;
+ Uint16* off_arr= req_struct->m_var_data[ind].m_dyn_offset_arr_ptr;
Uint32 var_attr_pos= off_arr[var_index];
- Uint32 idx= req_struct->m_var_data[MM].m_dyn_len_offset;
+ Uint32 idx= req_struct->m_var_data[ind].m_dyn_len_offset;
Uint32 vsize_in_bytes= off_arr[var_index+idx] - var_attr_pos;
return varsize_reader(outBuffer, req_struct, ahOut, attrDes2,
src_ptr + var_attr_pos, vsize_in_bytes);
@@ -1280,7 +1331,11 @@ Dbtup::readDynVarSizeExpandedNULLable(Ui
Check for NULL. In the expanded format, the bitmap is guaranteed
to be stored in full length.
*/
- Uint32 *src_ptr= (Uint32 *)(req_struct->m_var_data[MM].m_dyn_data_ptr);
+
+ Uint32 ind =
+ (AttributeDescriptor::getDiskBased(req_struct->attr_descriptor)) ? DD : MM;
+
+ Uint32 *src_ptr= (Uint32 *)(req_struct->m_var_data[ind].m_dyn_data_ptr);
Uint32 pos = AttributeOffset::getNullFlagPos(attrDes2);
if(!BitmaskImpl::get((* src_ptr) & DYN_BM_LEN_MASK, src_ptr, pos))
{
@@ -1299,8 +1354,11 @@ Dbtup::readDynVarSizeShrunkenNotNULL(Uin
AttributeHeader* ahOut,
Uint32 attrDes2)
{
- Uint32 *bm_ptr= (Uint32 *)(req_struct->m_var_data[MM].m_dyn_data_ptr);
- Uint32 dyn_len= req_struct->m_var_data[MM].m_dyn_part_len;
+ Uint32 ind =
+ (AttributeDescriptor::getDiskBased(req_struct->attr_descriptor)) ? DD : MM;
+
+ Uint32 *bm_ptr= (Uint32 *)(req_struct->m_var_data[ind].m_dyn_data_ptr);
+ Uint32 dyn_len= req_struct->m_var_data[ind].m_dyn_part_len;
ndbrequire(dyn_len!=0);
Uint32 bm_len = (* bm_ptr) & DYN_BM_LEN_MASK;
Uint32 pos = AttributeOffset::getNullFlagPos(attrDes2);
@@ -1315,7 +1373,7 @@ Dbtup::readDynVarSizeShrunkenNotNULL(Uin
any trailing non-bitmap bytes to save a few conditionals.
*/
Tablerec* regTabPtr = tabptr.p;
- Uint32 *bm_mask_ptr= regTabPtr->dynVarSizeMask;
+ Uint32 *bm_mask_ptr= regTabPtr->dynVarSizeMask[ind];
Uint32 bm_pos= AttributeOffset::getNullFlagOffset(attrDes2);
Uint32 prevMask= (1 << (pos & 31)) - 1;
Uint32 bit_count= BitmaskImpl::count_bits(prevMask & bm_mask_ptr[bm_pos] & bm_ptr[bm_pos]);
@@ -1343,8 +1401,11 @@ Dbtup::readDynVarSizeShrunkenNULLable(Ui
AttributeHeader* ahOut,
Uint32 attrDes2)
{
- Uint32 *bm_ptr= (Uint32 *)(req_struct->m_var_data[MM].m_dyn_data_ptr);
- Uint32 dyn_len= req_struct->m_var_data[MM].m_dyn_part_len;
+ Uint32 ind =
+ (AttributeDescriptor::getDiskBased(req_struct->attr_descriptor)) ? DD : MM;
+
+ Uint32 *bm_ptr= (Uint32 *)(req_struct->m_var_data[ind].m_dyn_data_ptr);
+ Uint32 dyn_len= req_struct->m_var_data[ind].m_dyn_part_len;
/* Check for NULL (including the case of an empty bitmap). */
Uint32 pos = AttributeOffset::getNullFlagPos(attrDes2);
if(dyn_len == 0 || dynCheckNull(dyn_len, (* bm_ptr) & DYN_BM_LEN_MASK,
@@ -2037,10 +2098,13 @@ Dbtup::updateDynFixedSizeNotNULL(Uint32*
Uint32 attrDes2)
{
Uint32 attrDescriptor= req_struct->attr_descriptor;
+ Uint32 ind =
+ (AttributeDescriptor::getDiskBased(req_struct->attr_descriptor)) ? DD : MM;
+
Uint32 pos= AttributeOffset::getNullFlagPos(attrDes2);
Uint32 nullbits= AttributeDescriptor::getSizeInWords(attrDescriptor);
- Uint32 *bm_ptr= (Uint32 *)(req_struct->m_var_data[MM].m_dyn_data_ptr);
+ Uint32 *bm_ptr= (Uint32 *)(req_struct->m_var_data[ind].m_dyn_data_ptr);
ndbassert(nullbits && nullbits <= 16);
/*
@@ -2068,10 +2132,10 @@ Dbtup::updateDynFixedSizeNotNULL(Uint32*
/* Compute the data and offset location and write the actual data. */
Uint32 off_index= AttributeOffset::getOffset(attrDes2);
- Uint16* off_arr= req_struct->m_var_data[MM].m_dyn_offset_arr_ptr;
+ Uint16* off_arr= req_struct->m_var_data[ind].m_dyn_offset_arr_ptr;
Uint32 offset= off_arr[off_index];
- Uint32 *dst_ptr= (Uint32 *)(req_struct->m_var_data[MM].m_dyn_data_ptr);
- Uint32 check_offset= req_struct->m_var_data[MM].m_max_dyn_offset;
+ Uint32 *dst_ptr= (Uint32 *)(req_struct->m_var_data[ind].m_dyn_data_ptr);
+ Uint32 check_offset= req_struct->m_var_data[ind].m_max_dyn_offset;
ndbassert((offset&3)==0);
ndbassert((check_offset&3)==0);
@@ -2092,9 +2156,12 @@ Dbtup::updateDynFixedSizeNULLable(Uint32
return updateDynFixedSizeNotNULL(inBuffer, req_struct, attrDes2);
Uint32 attrDescriptor= req_struct->attr_descriptor;
+ Uint32 ind =
+ (AttributeDescriptor::getDiskBased(req_struct->attr_descriptor)) ? DD : MM;
+
Uint32 pos= AttributeOffset::getNullFlagPos(attrDes2);
Uint32 nullbits= AttributeDescriptor::getSizeInWords(attrDescriptor);
- Uint32 *bm_ptr= (Uint32 *)(req_struct->m_var_data[MM].m_dyn_data_ptr);
+ Uint32 *bm_ptr= (Uint32 *)(req_struct->m_var_data[ind].m_dyn_data_ptr);
ndbassert(nullbits && nullbits <= 16);
@@ -2133,17 +2200,20 @@ Dbtup::updateDynBigFixedSizeNotNULL(Uint
Uint32 attrDes2)
{
Uint32 attrDescriptor= req_struct->attr_descriptor;
+ Uint32 ind =
+ (AttributeDescriptor::getDiskBased(req_struct->attr_descriptor)) ? DD : MM;
+
Uint32 pos= AttributeOffset::getNullFlagPos(attrDes2);
- Uint32 *bm_ptr= (Uint32 *)(req_struct->m_var_data[MM].m_dyn_data_ptr);
+ Uint32 *bm_ptr= (Uint32 *)(req_struct->m_var_data[ind].m_dyn_data_ptr);
jam();
BitmaskImpl::set((* bm_ptr) & DYN_BM_LEN_MASK, bm_ptr, pos);
/* Compute the data and offset location and write the actual data. */
Uint32 off_index= AttributeOffset::getOffset(attrDes2);
Uint32 noOfWords= AttributeDescriptor::getSizeInWords(attrDescriptor);
- Uint16* off_arr= req_struct->m_var_data[MM].m_dyn_offset_arr_ptr;
+ Uint16* off_arr= req_struct->m_var_data[ind].m_dyn_offset_arr_ptr;
Uint32 offset= off_arr[off_index];
- Uint32 idx= req_struct->m_var_data[MM].m_dyn_len_offset;
+ Uint32 idx= req_struct->m_var_data[ind].m_dyn_len_offset;
ndbassert((offset&3)==0);
bool res= fixsize_updater(inBuffer,
@@ -2151,7 +2221,7 @@ Dbtup::updateDynBigFixedSizeNotNULL(Uint
attrDes2,
bm_ptr,
offset>>2,
- req_struct->m_var_data[MM].m_max_dyn_offset);
+ req_struct->m_var_data[ind].m_max_dyn_offset);
/* Set the correct size for fixsize data. */
off_arr[off_index+idx]= offset+(noOfWords<<2);
return res;
@@ -2162,10 +2232,13 @@ Dbtup::updateDynBigFixedSizeNULLable(Uin
KeyReqStruct *req_struct,
Uint32 attrDes2)
{
+ Uint32 ind =
+ (AttributeDescriptor::getDiskBased(req_struct->attr_descriptor)) ? DD : MM;
+
AttributeHeader ahIn(inBuffer[req_struct->in_buf_index]);
Uint32 nullIndicator= ahIn.isNULL();
Uint32 pos= AttributeOffset::getNullFlagPos(attrDes2);
- Uint32 *bm_ptr= (Uint32*)req_struct->m_var_data[MM].m_dyn_data_ptr;
+ Uint32 *bm_ptr= (Uint32*)req_struct->m_var_data[ind].m_dyn_data_ptr;
if (!nullIndicator)
return updateDynBigFixedSizeNotNULL(inBuffer, req_struct, attrDes2);
@@ -2189,9 +2262,12 @@ Dbtup::updateDynBitsNotNULL(Uint32* inBu
Uint32 attrDes2)
{
Uint32 attrDescriptor= req_struct->attr_descriptor;
+ Uint32 ind =
+ (AttributeDescriptor::getDiskBased(req_struct->attr_descriptor)) ? DD : MM;
+
Uint32 pos= AttributeOffset::getNullFlagPos(attrDes2);
Uint32 bitCount = AttributeDescriptor::getArraySize(attrDescriptor);
- Uint32 *bm_ptr= (Uint32 *)(req_struct->m_var_data[MM].m_dyn_data_ptr);
+ Uint32 *bm_ptr= (Uint32 *)(req_struct->m_var_data[ind].m_dyn_data_ptr);
Uint32 bm_len = (* bm_ptr) & DYN_BM_LEN_MASK;
jam();
BitmaskImpl::set(bm_len, bm_ptr, pos);
@@ -2227,6 +2303,9 @@ Dbtup::updateDynBitsNULLable(Uint32* inB
KeyReqStruct *req_struct,
Uint32 attrDes2)
{
+ Uint32 ind =
+ (AttributeDescriptor::getDiskBased(req_struct->attr_descriptor)) ? DD : MM;
+
AttributeHeader ahIn(inBuffer[req_struct->in_buf_index]);
Uint32 nullIndicator= ahIn.isNULL();
@@ -2234,7 +2313,7 @@ Dbtup::updateDynBitsNULLable(Uint32* inB
return updateDynBitsNotNULL(inBuffer, req_struct, attrDes2);
Uint32 pos= AttributeOffset::getNullFlagPos(attrDes2);
- Uint32 *bm_ptr= (Uint32*)req_struct->m_var_data[MM].m_dyn_data_ptr;
+ Uint32 *bm_ptr= (Uint32*)req_struct->m_var_data[ind].m_dyn_data_ptr;
Uint32 newIndex= req_struct->in_buf_index + 1;
if (newIndex <= req_struct->in_buf_len) {
@@ -2254,23 +2333,26 @@ Dbtup::updateDynVarSizeNotNULL(Uint32* i
KeyReqStruct *req_struct,
Uint32 attrDes2)
{
+ Uint32 ind =
+ (AttributeDescriptor::getDiskBased(req_struct->attr_descriptor)) ? DD : MM;
+
Uint32 pos= AttributeOffset::getNullFlagPos(attrDes2);
- Uint32 *bm_ptr= (Uint32*)req_struct->m_var_data[MM].m_dyn_data_ptr;
+ Uint32 *bm_ptr= (Uint32*)req_struct->m_var_data[ind].m_dyn_data_ptr;
jam();
BitmaskImpl::set((* bm_ptr) & DYN_BM_LEN_MASK, bm_ptr, pos);
/* Compute the data and offset location and write the actual data. */
Uint32 off_index= AttributeOffset::getOffset(attrDes2);
- Uint16* off_arr= req_struct->m_var_data[MM].m_dyn_offset_arr_ptr;
+ Uint16* off_arr= req_struct->m_var_data[ind].m_dyn_offset_arr_ptr;
Uint32 offset= off_arr[off_index];
- Uint32 idx= req_struct->m_var_data[MM].m_dyn_len_offset;
+ Uint32 idx= req_struct->m_var_data[ind].m_dyn_len_offset;
bool res= varsize_updater(inBuffer,
req_struct,
(char*)bm_ptr,
offset,
&(off_arr[off_index+idx]),
- req_struct->m_var_data[MM].m_max_dyn_offset);
+ req_struct->m_var_data[ind].m_max_dyn_offset);
return res;
}
@@ -2279,10 +2361,13 @@ Dbtup::updateDynVarSizeNULLable(Uint32*
KeyReqStruct *req_struct,
Uint32 attrDes2)
{
+ Uint32 ind =
+ (AttributeDescriptor::getDiskBased(req_struct->attr_descriptor)) ? DD : MM;
+
AttributeHeader ahIn(inBuffer[req_struct->in_buf_index]);
Uint32 nullIndicator= ahIn.isNULL();
Uint32 pos= AttributeOffset::getNullFlagPos(attrDes2);
- Uint32 *bm_ptr= (Uint32*)req_struct->m_var_data[MM].m_dyn_data_ptr;
+ Uint32 *bm_ptr= (Uint32*)req_struct->m_var_data[ind].m_dyn_data_ptr;
if (!nullIndicator)
return updateDynVarSizeNotNULL(inBuffer, req_struct, attrDes2);
=== modified file 'storage/ndb/src/kernel/blocks/dbtup/DbtupScan.cpp'
--- a/storage/ndb/src/kernel/blocks/dbtup/DbtupScan.cpp 2009-10-20 16:10:06 +0000
+++ b/storage/ndb/src/kernel/blocks/dbtup/DbtupScan.cpp 2009-11-09 09:42:56 +0000
@@ -86,11 +86,14 @@ Dbtup::execACC_SCANREQ(Signal* signal)
if ((tablePtr.p->m_attributes[mm].m_no_of_varsize +
tablePtr.p->m_attributes[mm].m_no_of_dynamic) > 0)
{
- bits |= ScanOp::SCAN_VS;
-
- // disk pages have fixed page format
- ndbrequire(! (bits & ScanOp::SCAN_DD));
+ if (bits & ScanOp::SCAN_DD)
+ {
+ // only dd scan varsize pages
+ // mm always has a fixed part
+ bits |= ScanOp::SCAN_VS;
+ }
}
+
if (! AccScanReq::getReadCommittedFlag(req->requestInfo))
{
if (AccScanReq::getLockMode(req->requestInfo) == 0)
@@ -619,7 +622,7 @@ Dbtup::scanFirst(Signal*, ScanOpPtr scan
key.m_page_no = ext->m_first_page_no;
pos.m_get = ScanPos::Get_page_dd;
}
- key.m_page_idx = 0;
+ key.m_page_idx = ((bits & ScanOp::SCAN_VS) == 0) ? 0 : 1;
// let scanNext() do the work
scan.m_state = ScanOp::Next;
}
@@ -652,7 +655,9 @@ Dbtup::scanNext(Signal* signal, ScanOpPt
const bool lcp = (bits & ScanOp::SCAN_LCP);
Uint32 lcp_list = fragPtr.p->m_lcp_keep_list;
- Uint32 size = table.m_offsets[mm].m_fix_header_size;
+ const Uint32 size = ((bits & ScanOp::SCAN_VS) == 0) ?
+ table.m_offsets[mm].m_fix_header_size : 1;
+ const Uint32 first = ((bits & ScanOp::SCAN_VS) == 0) ? 0 : 1;
if (lcp && lcp_list != RNIL)
{
@@ -729,7 +734,7 @@ Dbtup::scanNext(Signal* signal, ScanOpPt
return true;
}
cont:
- key.m_page_idx = 0;
+ key.m_page_idx = first;
pos.m_get = ScanPos::Get_page_mm;
// clear cached value
pos.m_realpid_mm = RNIL;
@@ -791,7 +796,7 @@ Dbtup::scanNext(Signal* signal, ScanOpPt
key.m_page_no = ext->m_first_page_no;
}
}
- key.m_page_idx = 0;
+ key.m_page_idx = first;
pos.m_get = ScanPos::Get_page_dd;
/*
read ahead for scan in disk order
@@ -919,62 +924,89 @@ Dbtup::scanNext(Signal* signal, ScanOpPt
// get fixed size tuple
jam();
{
- Fix_page* page = (Fix_page*)pos.m_page;
- if (key.m_page_idx + size <= Fix_page::DATA_WORDS)
- {
- pos.m_get = ScanPos::Get_next_tuple_fs;
-#ifdef VM_TRACE
- if (! (bits & ScanOp::SCAN_DD))
+ if ((bits & ScanOp::SCAN_VS) == 0)
+ {
+ Fix_page* page = (Fix_page*)pos.m_page;
+ if (key.m_page_idx + size <= Fix_page::DATA_WORDS)
{
- Uint32 realpid = getRealpidCheck(fragPtr.p, key.m_page_no);
- ndbassert(pos.m_realpid_mm == realpid);
+ jam();
+ pos.m_get = ScanPos::Get_next_tuple_fs;
+ th = (Tuple_header*)&page->m_data[key.m_page_idx];
}
+ else
+ {
+ jam();
+ // no more tuples on this page
+ pos.m_get = ScanPos::Get_next_page;
+ break;
+ }
+ }
+ else
+ {
+ Var_page * page = (Var_page*)pos.m_page;
+ if (key.m_page_idx < page->high_index)
+ {
+ jam();
+ pos.m_get = ScanPos::Get_next_tuple_fs;
+ if (((Var_page*)page)->is_free(key.m_page_idx))
+ continue;
+ th = (Tuple_header*)page->get_ptr(key.m_page_idx);
+ }
+ else
+ {
+ jam();
+ // no more tuples on this page
+ pos.m_get = ScanPos::Get_next_page;
+ break;
+ }
+ }
+
+#ifdef VM_TRACE
+ if (! (bits & ScanOp::SCAN_DD))
+ {
+ Uint32 realpid = getRealpidCheck(fragPtr.p, key.m_page_no);
+ ndbassert(pos.m_realpid_mm == realpid);
+ }
#endif
- th = (Tuple_header*)&page->m_data[key.m_page_idx];
-
- if (likely(! (bits & ScanOp::SCAN_NR)))
- {
- jam();
- thbits = th->m_header_bits;
- if (! (thbits & Tuple_header::FREE))
- {
- goto found_tuple;
- }
- }
- else
- {
- if (pos.m_realpid_mm == RNIL)
+
+ if (likely(! (bits & ScanOp::SCAN_NR)))
+ {
+ jam();
+ thbits = th->m_header_bits;
+ if (! (thbits & Tuple_header::FREE))
+ {
+ goto found_tuple;
+ }
+ }
+ else
+ {
+ if (pos.m_realpid_mm == RNIL)
+ {
+ jam();
+ foundGCI = 0;
+ goto found_deleted_rowid;
+ }
+ thbits = th->m_header_bits;
+ if ((foundGCI = *th->get_mm_gci(tablePtr.p)) > scanGCI ||
+ foundGCI == 0)
+ {
+ if (! (thbits & Tuple_header::FREE))
{
jam();
- foundGCI = 0;
+ goto found_tuple;
+ }
+ else
+ {
goto found_deleted_rowid;
}
- thbits = th->m_header_bits;
- if ((foundGCI = *th->get_mm_gci(tablePtr.p)) > scanGCI ||
- foundGCI == 0)
- {
- if (! (thbits & Tuple_header::FREE))
- {
- jam();
- goto found_tuple;
- }
- else
- {
- goto found_deleted_rowid;
- }
- }
- else if (thbits != Fix_page::FREE_RECORD &&
- th->m_operation_ptr_i != RNIL)
- {
- jam();
- goto found_tuple; // Locked tuple...
- // skip free tuple
- }
- }
- } else {
- jam();
- // no more tuples on this page
- pos.m_get = ScanPos::Get_next_page;
+ }
+ else if (thbits != Fix_page::FREE_RECORD &&
+ th->m_operation_ptr_i != RNIL)
+ {
+ jam();
+ goto found_tuple; // Locked tuple...
+ // skip free tuple
+ }
}
}
break; // incr loop count
=== modified file 'storage/ndb/src/kernel/blocks/dbtup/DbtupTabDesMan.cpp'
--- a/storage/ndb/src/kernel/blocks/dbtup/DbtupTabDesMan.cpp 2009-09-16 10:52:41 +0000
+++ b/storage/ndb/src/kernel/blocks/dbtup/DbtupTabDesMan.cpp 2009-11-09 09:42:56 +0000
@@ -344,9 +344,9 @@ Dbtup::verifytabdes()
}
{
Uint32 offset[3];
- Uint32 MaskSize = (ptr.p->m_dyn_null_bits + 31) >> 5;
+ Uint32 MaskSize = (ptr.p->m_dyn_null_bits[MM] + 31) >> 5;
const Uint32 alloc = getDynTabDescrOffsets(MaskSize, offset);
- const Uint32 desc = ptr.p->dynTabDescriptor;
+ const Uint32 desc = ptr.p->dynTabDescriptor[MM];
Uint32 size = alloc;
if (size % ZTD_FREE_SIZE != 0)
size += ZTD_FREE_SIZE - size % ZTD_FREE_SIZE;
=== modified file 'storage/ndb/src/kernel/blocks/dbtup/tuppage.hpp'
--- a/storage/ndb/src/kernel/blocks/dbtup/tuppage.hpp 2009-10-08 11:41:21 +0000
+++ b/storage/ndb/src/kernel/blocks/dbtup/tuppage.hpp 2009-11-09 09:42:56 +0000
@@ -255,6 +255,11 @@ struct Tup_varsize_page
Uint32 get_entry_chain(Uint32 page_idx) const {
return get_index_word(page_idx) & CHAIN;
}
+
+ bool is_free(Uint32 page_idx)
+ {
+ return (get_index_word(page_idx) & FREE) ? true : false;
+ }
};
NdbOut& operator<< (NdbOut& out, const Tup_varsize_page& page);
Attachment: [text/bzr-bundle] bzr/msabaratnam@mysql.com-20091109094256-jtp2ly3tsmqc3ri9.bundle
| Thread |
|---|
| • bzr commit into mysql-5.1-telco-7.0 branch (msabaratnam:3197) | Maitrayi Sabaratnam | 9 Nov |