List:Commits« Previous MessageNext Message »
From:Maitrayi Sabaratnam Date:November 9 2009 9:49am
Subject:bzr push into mysql-5.1-telco-7.0 branch (msabaratnam:3196 to 3197)
View as plain text  
 3197 Maitrayi Sabaratnam	2009-11-09
      Committing varsize disk tuples

    modified:
      storage/ndb/src/kernel/blocks/dbtup/Dbtup.hpp
      storage/ndb/src/kernel/blocks/dbtup/DbtupAbort.cpp
      storage/ndb/src/kernel/blocks/dbtup/DbtupCommit.cpp
      storage/ndb/src/kernel/blocks/dbtup/DbtupDiskAlloc.cpp
      storage/ndb/src/kernel/blocks/dbtup/DbtupExecQuery.cpp
      storage/ndb/src/kernel/blocks/dbtup/DbtupGen.cpp
      storage/ndb/src/kernel/blocks/dbtup/DbtupMeta.cpp
      storage/ndb/src/kernel/blocks/dbtup/DbtupRoutines.cpp
      storage/ndb/src/kernel/blocks/dbtup/DbtupScan.cpp
      storage/ndb/src/kernel/blocks/dbtup/DbtupTabDesMan.cpp
      storage/ndb/src/kernel/blocks/dbtup/tuppage.hpp
 3196 Frazer Clement	2009-11-06
      Bug47757 MySQL Cluster : port bug#45017 to 7.0 (ipv6)

    modified:
      sql-common/client.c
=== modified file 'storage/ndb/src/kernel/blocks/dbtup/Dbtup.hpp'
--- a/storage/ndb/src/kernel/blocks/dbtup/Dbtup.hpp	2009-10-27 12:08:44 +0000
+++ b/storage/ndb/src/kernel/blocks/dbtup/Dbtup.hpp	2009-11-09 09:42:56 +0000
@@ -363,6 +363,7 @@ public:
   Pgman* c_pgman;
   // copy of pgman.m_ptr set after each get_page
   Ptr<GlobalPage> m_pgman_ptr;
+  Ptr<GlobalPage> m_pgman_ptr_orig;
 
   enum CallbackIndex {
     // lgman
@@ -435,14 +436,16 @@ typedef Ptr<Fragoperrec> FragoperrecPtr;
     Uint32 newNoOfAttrs;
     Uint32 newNoOfCharsets;
     Uint32 newNoOfKeyAttrs;
-    Uint32 noOfDynNullBits;
-    Uint32 noOfDynVar;
-    Uint32 noOfDynFix;
-    Uint32 noOfDynamic;
+    Uint32 noOfDynNullBits[2];
+    Uint32 noOfDynVar[2];
+    Uint32 noOfDynFix[2];
+    Uint32 noOfDynamic[2];
     Uint32 tabDesOffset[7];
     Uint32 tableDescriptor;
     Uint32 dynTabDesOffset[3];
-    Uint32 dynTableDescriptor;
+    Uint32 dynDesAllocSize[2];
+    Uint32 dynTableDescriptor[2];
+    Uint16 disk_tuple_format;
   };
   typedef Ptr<AlterTabOperation> AlterTabOperationPtr;
 
@@ -802,6 +805,7 @@ struct Operationrec {
     Uint32 savepointId;
     Uint32 m_commit_disk_callback_page;
   };
+  Uint32 m_commit_disk_callback_page_orig;
 
   /*
    * State variables on connection.
@@ -825,6 +829,7 @@ struct Operationrec {
     unsigned int m_disk_preallocated : 1;
     unsigned int m_load_diskpage_on_commit : 1;
     unsigned int m_wait_log_buffer : 1;
+    unsigned int m_load_diskpage_orig_on_commit : 1;
   };
   union {
     OpBitFields op_struct;
@@ -942,6 +947,8 @@ ArrayPool<TupTriggerData> c_triggerPool;
   STATIC_CONST( DD = 1 );
   STATIC_CONST( DYN_BM_LEN_BITS = 8 );
   STATIC_CONST( DYN_BM_LEN_MASK = ((1 << DYN_BM_LEN_BITS) - 1));
+  STATIC_CONST( DISK_TUPLE_FIXED_SIZE = 0 );
+  STATIC_CONST( DISK_TUPLE_VAR_SIZE = 1 );
   
   struct Tablerec {
     Tablerec(ArrayPool<TupTriggerData> & triggerPool) : 
@@ -964,16 +971,16 @@ ArrayPool<TupTriggerData> c_triggerPool;
       _after_ seeing all columns, hence must be separate from the readKeyArray
       et al descriptor, which is allocated before seeing columns.
     */
-    Uint32 dynTabDescriptor;
+    Uint32 dynTabDescriptor[2];
 
     /* Mask of variable-sized dynamic attributes. */
-    Uint32* dynVarSizeMask;
+    Uint32* dynVarSizeMask[2];
     /*
       Mask of fixed-sized dynamic attributes. There is one bit set for each
       32-bit word occupied by fixed-size attributes, so fixed-size dynamic
       attributes >32bit have multiple bits here.
     */
-    Uint32* dynFixSizeMask;
+    Uint32* dynFixSizeMask[2];
 
     ReadFunction* readFunctionArray;
     UpdateFunction* updateFunctionArray;
@@ -1019,7 +1026,9 @@ ArrayPool<TupTriggerData> c_triggerPool;
     Uint16 m_no_of_disk_attributes;
     Uint16 noOfKeyAttr;
     Uint16 noOfCharsets;
-    Uint16 m_dyn_null_bits;
+    Uint16 m_dyn_null_bits[2];
+
+    Uint16 m_disk_tuple_format;
 
     bool need_expand() const { 
       return m_no_of_attributes > m_attributes[MM].m_no_of_fixsize;
@@ -1035,14 +1044,14 @@ ArrayPool<TupTriggerData> c_triggerPool;
       return 
 	m_attributes[MM].m_no_of_varsize > 0 ||
         m_attributes[MM].m_no_of_dynamic > 0 ||
-	m_attributes[DD].m_no_of_varsize > 0;
+        m_attributes[DD].m_no_of_dynamic > 0;
     }
     
     bool need_shrink(bool disk) const {
       return 
 	m_attributes[MM].m_no_of_varsize > 0 ||
 	m_attributes[MM].m_no_of_dynamic > 0 ||
-        (disk && m_attributes[DD].m_no_of_varsize > 0);
+        (disk && m_attributes[DD].m_no_of_dynamic > 0);
     }
 
     /**
@@ -1420,6 +1429,13 @@ typedef Ptr<HostBuffer> HostBufferPtr;
     STATIC_CONST( FREE        = 0x02800000 ); // Is free
     STATIC_CONST( VAR_PART    = 0x04000000 ); // Is there a varpart
     STATIC_CONST( REORG_MOVE  = 0x08000000 );
+    STATIC_CONST( DISK_SHRINK = 0x10000000 ); // disk tuple has shrunk
+    /* Disk tuple has grown, found enough space on the same page. */
+    STATIC_CONST( DISK_GROW= 0x20000000 );
+    /* Disk tuple is moved to another page,
+       due to lack of space on the same page.
+    */
+    STATIC_CONST( DISK_MOVE   = 0x40000000 );
 
     Tuple_header() {}
     Uint32 get_tuple_version() const { 
@@ -1470,12 +1486,85 @@ typedef Ptr<HostBuffer> HostBufferPtr;
       return m_data + (tabPtrP->m_bits & Tablerec::TR_Checksum);
     }
 
-    Uint32 *get_dd_gci(const Tablerec* tabPtrP, Uint32 mm){
+    Uint32 *get_dd_gci(const Tablerec* tabPtrP){
       assert(tabPtrP->m_bits & Tablerec::TR_RowGCI);
       return m_data;
     }
+
+    Uint32* get_end_of_fix_part_ptr_dd(const Tablerec* tabPtrP) {
+      assert(tabPtrP->m_bits & Tablerec::TR_RowGCI);
+      Uint32 nullwds = tabPtrP->m_offsets[DD].m_null_words;
+      return m_data + (tabPtrP->m_bits & Tablerec::TR_RowGCI) + nullwds;
+    }
+
+    const Uint32* get_end_of_fix_part_ptr_dd(const Tablerec* tabPtrP) const {
+      assert(tabPtrP->m_bits & Tablerec::TR_RowGCI);
+      Uint32 nullwds = tabPtrP->m_offsets[DD].m_null_words;
+      return m_data + (tabPtrP->m_bits & Tablerec::TR_RowGCI) + nullwds;
+    }
+
+    Uint32* get_end_of_fix_part_ptr_dd_with_dlen(const Tablerec* tabPtrP) {
+      Uint32* eofp= get_end_of_fix_part_ptr_dd(tabPtrP);
+      Varpart_copy* dp= (Varpart_copy*)(eofp);
+      return dp->m_data;
+    }
+
+    const Uint32* get_end_of_fix_part_ptr_dd_with_dlen(const Tablerec* tabPtrP) const {
+      const Uint32* eofp=  get_end_of_fix_part_ptr_dd(tabPtrP);
+      Varpart_copy* dp= (Varpart_copy*)(eofp);
+      return dp->m_data;
+    }
+
+    Uint32 get_fix_size_dd_with_dlen(const Tablerec* tabPtrP) {
+      Uint32* eofp= get_end_of_fix_part_ptr_dd(tabPtrP);
+      Varpart_copy* dp= (Varpart_copy*)(eofp);
+      return (dp->m_data - (Uint32 *)this);
+    }
+
+    Uint32 get_fix_size_dd(const Tablerec* tabPtrP) {
+      Uint32* eofp= get_end_of_fix_part_ptr_dd(tabPtrP);
+      return (eofp - (Uint32 *)this);
+    }
   };
 
+  void Dbtup::debug_bits(char *text, Uint32 bits)
+  {
+    ndbout_c("%s", text);
+    if (bits & Tuple_header::COPY_TUPLE)
+        ndbout_c("COPY_TUPLE");
+    if (bits & Tuple_header::DISK_PART)
+        ndbout_c("DISK_PART");
+    if (bits & Tuple_header::DISK_ALLOC)
+        ndbout_c("DISK_ALLOC");
+    if (bits & Tuple_header::DISK_INLINE)
+        ndbout_c("DISK_INLINE");
+    if (bits & Tuple_header::ALLOC)
+        ndbout_c("ALLOC");
+    if (bits & Tuple_header::MM_SHRINK)
+        ndbout_c("MM_SHRINK");
+    if (bits & Tuple_header::MM_GROWN)
+        ndbout_c("MM_GROWN");
+    if (bits & Tuple_header::FREED)
+        ndbout_c("FREED");
+    if (bits & Tuple_header::LCP_SKIP)
+        ndbout_c("LCP_SKIP");
+    if (bits & Tuple_header::LCP_KEEP)
+        ndbout_c("LCP_KEEP");
+    if (bits & Tuple_header::FREE)
+        ndbout_c("FREE");
+    if (bits & Tuple_header::VAR_PART)
+        ndbout_c("VAR_PART");
+    if (bits & Tuple_header::REORG_MOVE)
+        ndbout_c("REORG_MOVE");
+
+    if (bits & Tuple_header::DISK_SHRINK)
+        ndbout_c("DISK_SHRINK");
+    if (bits & Tuple_header::DISK_GROW)
+        ndbout_c("DISK_GROW");
+    if (bits & Tuple_header::DISK_MOVE)
+        ndbout_c("DISK_MOVE");
+  }
+
   /**
    * Format of varpart after insert/update
    */
@@ -1602,7 +1691,7 @@ struct KeyReqStruct {
    * was updating this record.
    */
   Bitmask<MAXNROFATTRIBUTESINWORDS> changeMask;
-  Uint16 var_pos_array[2*MAX_ATTRIBUTES_IN_TABLE + 1];
+  Uint16 var_pos_array[4*MAX_ATTRIBUTES_IN_TABLE + 1];
   OperationrecPtr prevOpPtr;
 };
 
@@ -1963,6 +2052,15 @@ private:
 
 //------------------------------------------------------------------
 //------------------------------------------------------------------
+  int handleUpdateReq_dd_var(Signal* signal,
+                      Ptr<Operationrec> regOperRecPtr,
+                      Ptr<Fragrecord> fragPtr,
+                      Tablerec* regTabPtr,
+                      KeyReqStruct* req_struct,
+		      bool disk);
+
+//------------------------------------------------------------------
+//------------------------------------------------------------------
   int handleInsertReq(Signal* signal,
                       Ptr<Operationrec> regOperPtr,
                       Ptr<Fragrecord>,
@@ -2411,6 +2509,7 @@ private:
                         Uint32 noOfCharsets,
                         Uint32 & charsetIndex, Uint32 & attrDes2);
   void computeTableMetaData(Tablerec *regTabPtr);
+  void computeTableMetaData_dd_var(Tablerec*);
 
 //------------------------------------------------------------------
 //------------------------------------------------------------------
@@ -2782,8 +2881,10 @@ private:
                                  const Uint32* offset);
   void setupDynDescriptorReferences(Uint32 dynDescr,
                                     Tablerec* const regTabPtr,
-                                    const Uint32* offset);
+                                    const Uint32* offset,
+                                    Uint32 ind=0);
   void setUpKeyArray(Tablerec* regTabPtr);
+  void setUpKeyArray_dd_var(Tablerec*);
   bool addfragtotab(Tablerec* regTabPtr, Uint32 fragId, Uint32 fragIndex);
   void deleteFragTab(Tablerec* regTabPtr, Uint32 fragId);
   void abortAddFragOp(Signal* signal);
@@ -2956,6 +3057,7 @@ private:
 // Public methods
   Uint32* alloc_var_rec(Fragrecord*, Tablerec*, Uint32, Local_key*, Uint32*);
   void free_var_rec(Fragrecord*, Tablerec*, Local_key*, Ptr<Page>);
+  Uint16 grow_record_on_same_page(PagePtr, Uint16,Uint32, Uint32);
   Uint32* alloc_var_part(Fragrecord*, Tablerec*, Uint32, Local_key*);
   Uint32 *realloc_var_part(Fragrecord*, Tablerec*, 
                            PagePtr, Var_part_ref*, Uint32, Uint32);
@@ -3081,8 +3183,20 @@ private:
 
   void expand_tuple(KeyReqStruct*, Uint32 sizes[4], Tuple_header*org, 
 		    const Tablerec*, bool disk);
+  void expand_tuple_dd_fixed(KeyReqStruct*, Uint32 sizes[2],
+                             const Tablerec*, Uint32,
+                             const Uint32*, Uint32*,
+                             const Uint32 *);
+  void expand_tuple_dd_var(KeyReqStruct*, Uint32 sizes[2],
+                           const Tablerec*, Uint32,
+                           const Uint32*, Uint32*,
+                           const Uint32 *, const Uint16*,
+                           const Uint32*);
+
   void shrink_tuple(KeyReqStruct*, Uint32 sizes[2], const Tablerec*,
 		    bool disk);
+  void shrink_tuple_dd_var(KeyReqStruct*, Uint32 sizes[2],
+                           const Tablerec*, bool disk);
   
   Uint32* get_ptr(Var_part_ref);
   Uint32* get_ptr(PagePtr*, Var_part_ref);
@@ -3123,6 +3237,13 @@ private:
 				     Ptr<Page>, Uint32, Uint32);
   void disk_page_prealloc_transit_page(Disk_alloc_info&,
 				       Ptr<Page_request>, Uint32, Uint32);
+
+  /* Prealloc the space needed for the expantion of a tuple
+     on the current page.
+  */
+  bool disk_page_prealloc_on_same_page(Signal*, Fragrecord*,
+                                       Local_key, PagePtr,
+                                       Uint32, Uint32);
   
   void disk_page_abort_prealloc(Signal*, Fragrecord*,Local_key*, Uint32);
   void disk_page_abort_prealloc_callback(Signal*, Uint32, Uint32);
@@ -3138,10 +3259,13 @@ private:
   
   void disk_page_alloc(Signal*, 
 		       Tablerec*, Fragrecord*, Local_key*, PagePtr, Uint32);
+  void disk_page_realloc_on_same_page(Fragrecord*, PagePtr,
+                                      Local_key*, Uint32, Uint32, Uint32, bool);
   void disk_page_free(Signal*, 
 		      Tablerec*, Fragrecord*, Local_key*, PagePtr, Uint32);
   
   void disk_page_commit_callback(Signal*, Uint32 opPtrI, Uint32 page_id);  
+  void disk_page_commit_callback_orig(Signal*, Uint32, Uint32);
   
   void disk_page_log_buffer_callback(Signal*, Uint32 opPtrI, Uint32); 
 
@@ -3236,6 +3360,21 @@ private:
                            const Dbtup::ScanOp& op);
   void commit_operation(Signal*, Uint32, Tuple_header*, PagePtr,
 			Operationrec*, Fragrecord*, Tablerec*);
+  int retrieve_data_page(Signal*, bool,
+                         Page_cache_client::Request,
+                         OperationrecPtr);
+  int retrieve_log_page(Signal*, FragrecordPtr, OperationrecPtr);
+
+  int start_commit_dd_fixed(Signal*, OperationrecPtr, TablerecPtr,
+                            FragrecordPtr, Tuple_header*);
+  int start_commit_dd_var(Signal*, OperationrecPtr, TablerecPtr,
+                          FragrecordPtr, Tuple_header*);
+  void commit_operation_dd_fixed(Signal*, Operationrec*, Fragrecord*,
+                                 Tablerec*, Tuple_header*,
+                                 Tuple_header*, Tuple_header*, Uint32);
+  void commit_operation_dd_var(Signal*, Operationrec*, Fragrecord*,
+                               Tablerec*, Tuple_header*,
+                               Tuple_header*, Local_key, Uint32);
   
   void dealloc_tuple(Signal* signal, Uint32, Page*, Tuple_header*, 
 		     Operationrec*, Fragrecord*, Tablerec*);
@@ -3246,6 +3385,15 @@ private:
 				      Fragrecord* regFragPtr,
 				      Tablerec* regTabPtr,
 				      Uint32 sizes[4]);
+
+  int handle_size_change_after_update_disk(Signal*,
+                                           KeyReqStruct* req_struct,
+                                           Tuple_header* org,
+                                           Ptr<Operationrec> regOperRecPtr,
+                                           Ptr<Fragrecord> fragPtr,
+                                           Tablerec* regTabPtr,
+                                           Uint32 sizes[4]);
+
   int optimize_var_part(KeyReqStruct* req_struct,
                         Tuple_header* org,
                         Operationrec* regOperPtr,
@@ -3257,6 +3405,10 @@ private:
    *   req_struct->m_tuple_ptr is set to tuple to read
    */
   void prepare_read(KeyReqStruct*, Tablerec* const, bool disk);
+  void prepare_read_dd_fixed(KeyReqStruct*, Tablerec* const,
+                             const Uint32*);
+  void prepare_read_dd_var(KeyReqStruct*, Tablerec* const,
+                           const Uint32*);
 
   /* For debugging, dump the contents of a tuple. */
   void dump_tuple(const KeyReqStruct* req_struct, const Tablerec* tabPtrP);

=== modified file 'storage/ndb/src/kernel/blocks/dbtup/DbtupAbort.cpp'
--- a/storage/ndb/src/kernel/blocks/dbtup/DbtupAbort.cpp	2009-08-17 07:36:12 +0000
+++ b/storage/ndb/src/kernel/blocks/dbtup/DbtupAbort.cpp	2009-11-09 09:42:56 +0000
@@ -46,13 +46,38 @@ Dbtup::do_tup_abort_operation(Signal* si
   {
     Tuple_header *copy=
       get_copy_tuple(tablePtrP, &opPtrP->m_copy_tuple_location);
+    Uint32 copybits = copy->m_header_bits;
     
     if (opPtrP->op_struct.m_disk_preallocated)
     {
       jam();
-      Local_key key;
-      memcpy(&key, copy->get_disk_ref_ptr(tablePtrP), sizeof(key));
-      disk_page_abort_prealloc(signal, fragPtrP, &key, key.m_page_idx);
+      if (tablePtrP->m_disk_tuple_format == DISK_TUPLE_VAR_SIZE &&
+          opPtrP->is_last_operation())
+      {
+        jam();
+        ndbassert(!(copybits & Tuple_header::DISK_SHRINK));
+        ndbassert(!opPtrP->m_copy_tuple_location.isNull());
+        ndbassert(copybits & Tuple_header::DISK_ALLOC ||
+                  copybits & Tuple_header::DISK_MOVE ||
+                  copybits & Tuple_header::DISK_GROW);
+
+        Local_key key;
+        memcpy(&key, copy->get_disk_ref_ptr(tablePtrP), sizeof(key));
+        Uint32 sz = key.m_page_idx;
+        if (copybits & Tuple_header::DISK_ALLOC ||
+            copybits & Tuple_header::DISK_MOVE)
+        {
+          sz += 1;
+        }
+        disk_page_abort_prealloc(signal, fragPtrP, &key, sz);
+      }
+      else
+      {
+        jam();
+        Local_key key;
+        memcpy(&key, copy->get_disk_ref_ptr(tablePtrP), sizeof(key));
+        disk_page_abort_prealloc(signal, fragPtrP, &key, key.m_page_idx);
+      }
     }
 
     if(! (bits & Tuple_header::ALLOC))

=== modified file 'storage/ndb/src/kernel/blocks/dbtup/DbtupCommit.cpp'
--- a/storage/ndb/src/kernel/blocks/dbtup/DbtupCommit.cpp	2009-10-20 16:10:06 +0000
+++ b/storage/ndb/src/kernel/blocks/dbtup/DbtupCommit.cpp	2009-11-09 09:42:56 +0000
@@ -113,6 +113,7 @@ void Dbtup::initOpConnection(Operationre
   regOperPtr->op_struct.op_type= ZREAD;
   regOperPtr->op_struct.m_disk_preallocated= 0;
   regOperPtr->op_struct.m_load_diskpage_on_commit= 0;
+  regOperPtr->op_struct.m_load_diskpage_orig_on_commit = 0;
   regOperPtr->op_struct.m_wait_log_buffer= 0;
   regOperPtr->op_struct.in_active_list = false;
   regOperPtr->m_undo_buffer_space= 0;
@@ -239,6 +240,9 @@ Dbtup::commit_operation(Signal* signal,
   Uint32 save= tuple_ptr->m_operation_ptr_i;
   Uint32 bits= tuple_ptr->m_header_bits;
 
+  // Save the key of the original disk_tuple
+  Local_key key_orig;
+  memcpy(&key_orig, tuple_ptr->get_disk_ref_ptr(regTabPtr), sizeof(Local_key));
   Tuple_header *disk_ptr= 0;
   Tuple_header *copy=
     get_copy_tuple(regTabPtr, &regOperPtr->m_copy_tuple_location);
@@ -327,43 +331,20 @@ Dbtup::commit_operation(Signal* signal,
       (copy_bits & Tuple_header::DISK_INLINE))
   {
     jam();
-    Local_key key;
-    memcpy(&key, copy->get_disk_ref_ptr(regTabPtr), sizeof(Local_key));
-    Uint32 logfile_group_id= regFragPtr->m_logfile_group_id;
 
-    PagePtr diskPagePtr = *(PagePtr*)&m_pgman_ptr;
-    ndbassert(diskPagePtr.p->m_page_no == key.m_page_no);
-    ndbassert(diskPagePtr.p->m_file_no == key.m_file_no);
-    Uint32 sz, *dst;
-    if(copy_bits & Tuple_header::DISK_ALLOC)
-    {
-      jam();
-      disk_page_alloc(signal, regTabPtr, regFragPtr, &key, diskPagePtr, gci);
-    }
-    
-    if(regTabPtr->m_attributes[DD].m_no_of_varsize == 0)
+    if (regTabPtr->m_disk_tuple_format == DISK_TUPLE_VAR_SIZE)
     {
-      jam();
-      sz= regTabPtr->m_offsets[DD].m_fix_header_size;
-      dst= ((Fix_page*)diskPagePtr.p)->get_ptr(key.m_page_idx, sz);
+      commit_operation_dd_var(signal,regOperPtr, regFragPtr,
+			        regTabPtr, tuple_ptr,
+				disk_ptr, key_orig, gci);
     }
     else
     {
-      jam();
-      dst= ((Var_page*)diskPagePtr.p)->get_ptr(key.m_page_idx);
-      sz= ((Var_page*)diskPagePtr.p)->get_entry_len(key.m_page_idx);
-    }
-    
-    if(! (copy_bits & Tuple_header::DISK_ALLOC))
-    {
-      jam();
-      disk_page_undo_update(diskPagePtr.p, 
-			    &key, dst, sz, gci, logfile_group_id);
+      commit_operation_dd_fixed(signal,regOperPtr, regFragPtr,
+			        regTabPtr, tuple_ptr,
+				disk_ptr, copy, gci);
     }
-    
-    memcpy(dst, disk_ptr, 4*sz);
-    memcpy(tuple_ptr->get_disk_ref_ptr(regTabPtr), &key, sizeof(Local_key));
-    
+
     ndbassert(! (disk_ptr->m_header_bits & Tuple_header::FREE));
     copy_bits |= Tuple_header::DISK_PART;
   }
@@ -382,10 +363,23 @@ Dbtup::commit_operation(Signal* signal,
     }
   }
   
-  Uint32 clear= 
+  Uint32 clear= (Uint32)0;
+  if (regTabPtr->m_disk_tuple_format == DISK_TUPLE_VAR_SIZE)
+  {
+    clear=
+      Tuple_header::ALLOC | Tuple_header::FREE | Tuple_header::COPY_TUPLE |
+      Tuple_header::DISK_ALLOC | Tuple_header::DISK_INLINE |
+      Tuple_header::MM_SHRINK | Tuple_header::MM_GROWN |
+      Tuple_header::DISK_SHRINK | Tuple_header::DISK_GROW |
+      Tuple_header::DISK_MOVE;
+  }
+  else
+  {
+    clear=
     Tuple_header::ALLOC | Tuple_header::FREE | Tuple_header::COPY_TUPLE |
     Tuple_header::DISK_ALLOC | Tuple_header::DISK_INLINE | 
     Tuple_header::MM_SHRINK | Tuple_header::MM_GROWN;
+  }
   copy_bits &= ~(Uint32)clear;
   
   tuple_ptr->m_header_bits= copy_bits;
@@ -467,8 +461,11 @@ Dbtup::disk_page_log_buffer_callback(Sig
   tupCommitReq->diskpage = page;
 
   ndbassert(regOperPtr.p->op_struct.m_load_diskpage_on_commit == 0);
+  ndbassert(regOperPtr.p->op_struct.m_load_diskpage_orig_on_commit == 0);
   regOperPtr.p->op_struct.m_wait_log_buffer= 0;
   m_global_page_pool.getPtr(m_pgman_ptr, page);
+  m_global_page_pool.getPtr(m_pgman_ptr_orig,
+                       regOperPtr.p->m_commit_disk_callback_page_orig);
   
   execTUP_COMMITREQ(signal);
   ndbassert(signal->theData[0] == 0);
@@ -586,135 +583,23 @@ void Dbtup::execTUP_COMMITREQ(Signal* si
       }
     }
   }
-  
-  bool get_page = false;
-  if(regOperPtr.p->op_struct.m_load_diskpage_on_commit)
-  {
-    jam();
-    Page_cache_client::Request req;
 
-    /**
-     * Only last op on tuple needs "real" commit,
-     *   hence only this one should have m_load_diskpage_on_commit
-     */
-    ndbassert(tuple_ptr->m_operation_ptr_i == regOperPtr.i);
-
-    /**
-     * Check for page
-     */
-    if(!regOperPtr.p->m_copy_tuple_location.isNull())
-    {
-      jam();
-      Tuple_header* tmp=
-        get_copy_tuple(regTabPtr.p, &regOperPtr.p->m_copy_tuple_location);
-      
-      memcpy(&req.m_page, 
-	     tmp->get_disk_ref_ptr(regTabPtr.p), sizeof(Local_key));
-
-      if (unlikely(regOperPtr.p->op_struct.op_type == ZDELETE &&
-		   tmp->m_header_bits & Tuple_header::DISK_ALLOC))
-      {
-        jam();
-	/**
-	 * Insert+Delete
-	 */
-        regOperPtr.p->op_struct.m_load_diskpage_on_commit = 0;
-        regOperPtr.p->op_struct.m_wait_log_buffer = 0;	
-        disk_page_abort_prealloc(signal, regFragPtr.p, 
-				 &req.m_page, req.m_page.m_page_idx);
-        
-        D("Logfile_client - execTUP_COMMITREQ");
-        Logfile_client lgman(this, c_lgman, regFragPtr.p->m_logfile_group_id);
-        lgman.free_log_space(regOperPtr.p->m_undo_buffer_space);
-	goto skip_disk;
-        if (0) ndbout_c("insert+delete");
-        jamEntry();
-        goto skip_disk;
-      }
-    } 
-    else
-    {
-      jam();
-      // initial delete
-      ndbassert(regOperPtr.p->op_struct.op_type == ZDELETE);
-      memcpy(&req.m_page, 
-	     tuple_ptr->get_disk_ref_ptr(regTabPtr.p), sizeof(Local_key));
-      
-      ndbassert(tuple_ptr->m_header_bits & Tuple_header::DISK_PART);
-    }
-    req.m_callback.m_callbackData= regOperPtr.i;
-    req.m_callback.m_callbackFunction = 
-      safe_cast(&Dbtup::disk_page_commit_callback);
-
-    /*
-     * Consider commit to be correlated.  Otherwise pk op + commit makes
-     * the page hot.   XXX move to TUP which knows better.
-     */
-    int flags= regOperPtr.p->op_struct.op_type |
-      Page_cache_client::COMMIT_REQ | Page_cache_client::CORR_REQ;
-    Page_cache_client pgman(this, c_pgman);
-    int res= pgman.get_page(signal, req, flags);
-    m_pgman_ptr = pgman.m_ptr;
-    switch(res){
-    case 0:
-      /**
-       * Timeslice
-       */
-      jam();
-      signal->theData[0] = 1;
-      return;
-    case -1:
-      ndbrequire("NOT YET IMPLEMENTED" == 0);
-      break;
-    default:
-      jam();
-    }
-    get_page = true;
-
-    {
-      PagePtr tmpptr;
-      tmpptr.i = m_pgman_ptr.i;
-      tmpptr.p = reinterpret_cast<Page*>(m_pgman_ptr.p);
-      disk_page_set_dirty(tmpptr);
-    }
-    
-    regOperPtr.p->m_commit_disk_callback_page= res;
-    regOperPtr.p->op_struct.m_load_diskpage_on_commit= 0;
-  } 
-  
-  if(regOperPtr.p->op_struct.m_wait_log_buffer)
+  int result = -1;
+  if (regTabPtr.p->m_disk_tuple_format == DISK_TUPLE_VAR_SIZE)
   {
-    jam();
-    /**
-     * Only last op on tuple needs "real" commit,
-     *   hence only this one should have m_wait_log_buffer
-     */
-    ndbassert(tuple_ptr->m_operation_ptr_i == regOperPtr.i);
-    
-    CallbackPtr cb;
-    cb.m_callbackData= regOperPtr.i;
-    cb.m_callbackIndex = DISK_PAGE_LOG_BUFFER_CALLBACK;
-    Uint32 sz= regOperPtr.p->m_undo_buffer_space;
-    
-    D("Logfile_client - execTUP_COMMITREQ");
-    Logfile_client lgman(this, c_lgman, regFragPtr.p->m_logfile_group_id);
-    int res= lgman.get_log_buffer(signal, sz, &cb);
-    jamEntry();
-    switch(res){
-    case 0:
-      jam();
-      signal->theData[0] = 1;
-      return;
-    case -1:
-      ndbrequire("NOT YET IMPLEMENTED" == 0);
-      break;
-    default:
-      jam();
-    }
+    result= start_commit_dd_var(signal, regOperPtr, regTabPtr,
+                                regFragPtr, tuple_ptr);
   }
-  
+  else
+  {
+    result= start_commit_dd_fixed(signal, regOperPtr, regTabPtr,
+                                regFragPtr, tuple_ptr);
+  }
+  // If pages have not been retrieved yet, wait..
+  if (result != 0)
+    return;
+
   assert(tuple_ptr);
-skip_disk:
   req_struct.m_tuple_ptr = tuple_ptr;
   
   Uint32 nextOp = regOperPtr.p->nextActiveOp;
@@ -751,8 +636,6 @@ skip_disk:
     else
     {
       jam();
-      if (get_page)
-	ndbassert(tuple_ptr->m_header_bits & Tuple_header::DISK_PART);
       dealloc_tuple(signal, gci_hi, page.p, tuple_ptr,
 		    regOperPtr.p, regFragPtr.p, regTabPtr.p); 
     }
@@ -798,3 +681,606 @@ Dbtup::set_commit_change_mask_info(const
     memcpy(dst, maskptr, 4*masklen);
   }
 }
+
+/*
+  The following methods will handle disk data
+  as fixed size (m_disk_tuple_format=DISK_TUPLE_FIXED_SIZE)
+*/
+
+int Dbtup::retrieve_data_page(Signal *signal, bool orig,
+                              Page_cache_client::Request req,
+                              OperationrecPtr regOperPtr)
+{
+  req.m_callback.m_callbackData= regOperPtr.i;
+  if (orig)
+  {
+    req.m_callback.m_callbackFunction =
+      safe_cast(&Dbtup::disk_page_commit_callback_orig);
+  }
+  else
+  {
+    req.m_callback.m_callbackFunction =
+      safe_cast(&Dbtup::disk_page_commit_callback);
+  }
+
+  /*
+   * Consider commit to be correlated.  Otherwise pk op + commit makes
+   * the page hot.   XXX move to TUP which knows better.
+   */
+  int flags= regOperPtr.p->op_struct.op_type |
+    Page_cache_client::COMMIT_REQ | Page_cache_client::CORR_REQ;
+  Page_cache_client pgman(this, c_pgman);
+  int res= pgman.get_page(signal, req, flags);
+
+  if (orig)
+  {
+      m_pgman_ptr_orig = pgman.m_ptr;
+  }
+  else
+  {
+    m_pgman_ptr = pgman.m_ptr;
+  }
+
+  switch(res){
+  case 0:
+    /**
+     * Timeslice
+     */
+    jam();
+    signal->theData[0] = 1;
+    return res;
+  case -1:
+    ndbrequire("NOT YET IMPLEMENTED" == 0);
+    break;
+  default:
+    jam();
+  }
+  {
+    PagePtr tmpptr;
+    if (orig) {
+      tmpptr.i = m_pgman_ptr_orig.i;
+      tmpptr.p = reinterpret_cast<Page*>(m_pgman_ptr_orig.p);
+    }
+    else
+    {
+      tmpptr.i = m_pgman_ptr.i;
+      tmpptr.p = reinterpret_cast<Page*>(m_pgman_ptr.p);
+    }
+    disk_page_set_dirty(tmpptr);
+  }
+
+  if (orig)
+  {
+    regOperPtr.p->m_commit_disk_callback_page_orig= res;
+    regOperPtr.p->op_struct.m_load_diskpage_orig_on_commit= 0;
+  }
+  else
+  {
+    regOperPtr.p->m_commit_disk_callback_page= res;
+    regOperPtr.p->op_struct.m_load_diskpage_on_commit= 0;
+  }
+  return res;
+}
+
+int Dbtup::retrieve_log_page(Signal *signal,
+                             FragrecordPtr regFragPtr,
+                             OperationrecPtr regOperPtr)
+{
+  jam();
+  /**
+   * Only last op on tuple needs "real" commit,
+   *   hence only this one should have m_wait_log_buffer
+   */
+
+  CallbackPtr cb;
+  cb.m_callbackData= regOperPtr.i;
+  cb.m_callbackIndex = DISK_PAGE_LOG_BUFFER_CALLBACK;
+  Uint32 sz= regOperPtr.p->m_undo_buffer_space;
+
+  D("Logfile_client - execTUP_COMMITREQ");
+  Logfile_client lgman(this, c_lgman, regFragPtr.p->m_logfile_group_id);
+  int res= lgman.get_log_buffer(signal, sz, &cb);
+  jamEntry();
+  switch(res){
+  case 0:
+    jam();
+    signal->theData[0] = 1;
+    return res;
+  case -1:
+    ndbrequire("NOT YET IMPLEMENTED" == 0);
+    break;
+  default:
+    jam();
+  }
+  regOperPtr.p->op_struct.m_wait_log_buffer= 0;
+
+  return res;
+}
+
+int Dbtup::start_commit_dd_fixed(Signal* signal,
+                                   OperationrecPtr regOperPtr,
+                                   TablerecPtr regTabPtr,
+                                   FragrecordPtr regFragPtr,
+                                   Tuple_header* tuple_ptr)
+{
+  bool get_page = false;
+  if(regOperPtr.p->op_struct.m_load_diskpage_on_commit)
+  {
+    jam();
+    Page_cache_client::Request req;
+
+    /**
+     * Only last op on tuple needs "real" commit,
+     *   hence only this one should have m_load_diskpage_on_commit
+     */
+    ndbassert(tuple_ptr->m_operation_ptr_i == regOperPtr.i);
+
+    /**
+     * Check for page
+     */
+    if(!regOperPtr.p->m_copy_tuple_location.isNull())
+    {
+      jam();
+      Tuple_header* tmp=
+        get_copy_tuple(regTabPtr.p, &regOperPtr.p->m_copy_tuple_location);
+
+      memcpy(&req.m_page,
+	     tmp->get_disk_ref_ptr(regTabPtr.p), sizeof(Local_key));
+
+      if (unlikely(regOperPtr.p->op_struct.op_type == ZDELETE &&
+		   tmp->m_header_bits & Tuple_header::DISK_ALLOC))
+      {
+        jam();
+	/**
+	 * Insert+Delete
+	 */
+        if (0) ndbout_c("insert+delete");
+
+        regOperPtr.p->op_struct.m_load_diskpage_on_commit = 0;
+        regOperPtr.p->op_struct.m_wait_log_buffer = 0;
+        disk_page_abort_prealloc(signal, regFragPtr.p,
+				 &req.m_page, req.m_page.m_page_idx);
+
+        D("Logfile_client - execTUP_COMMITREQ");
+        Logfile_client lgman(this, c_lgman, regFragPtr.p->m_logfile_group_id);
+        lgman.free_log_space(regOperPtr.p->m_undo_buffer_space);
+	//     ndbout_c("i+d, returning");
+	return 0;
+      }
+    }
+    else
+    {
+      jam();
+      // initial delete
+      ndbassert(regOperPtr.p->op_struct.op_type == ZDELETE);
+      memcpy(&req.m_page,
+	     tuple_ptr->get_disk_ref_ptr(regTabPtr.p), sizeof(Local_key));
+
+      ndbassert(tuple_ptr->m_header_bits & Tuple_header::DISK_PART);
+    }
+
+    bool orig_page= false;
+    if (retrieve_data_page(signal, orig_page, req, regOperPtr) == 0)
+    {
+      return 1; // data page is not retrieved yet
+    }
+  }
+
+  if(regOperPtr.p->op_struct.m_wait_log_buffer)
+  {
+    jam();
+    ndbassert(tuple_ptr->m_operation_ptr_i == regOperPtr.i);
+    if (retrieve_log_page(signal, regFragPtr, regOperPtr) == 0)
+    {
+      return 1; // log page is not retrieved yet
+    }
+  }
+
+  return 0;
+}
+
+void Dbtup::commit_operation_dd_fixed(Signal* signal,
+                                      Operationrec* regOperPtr,
+			              Fragrecord* regFragPtr,
+			              Tablerec* regTabPtr,
+				      Tuple_header *tuple_ptr,
+				      Tuple_header *disk_ptr,
+				      Tuple_header *copy,
+                                      Uint32 gci)
+{
+    jam();
+    Local_key key;
+    memcpy(&key, copy->get_disk_ref_ptr(regTabPtr), sizeof(Local_key));
+    Uint32 logfile_group_id= regFragPtr->m_logfile_group_id;
+
+    PagePtr diskPagePtr = *(PagePtr*)&m_pgman_ptr;
+    ndbassert(diskPagePtr.p->m_page_no == key.m_page_no);
+    ndbassert(diskPagePtr.p->m_file_no == key.m_file_no);
+    Uint32 copy_bits= copy->m_header_bits;
+    Uint32 sz, *dst;
+    if(copy_bits & Tuple_header::DISK_ALLOC)
+    {
+      jam();
+      disk_page_alloc(signal, regTabPtr, regFragPtr, &key, diskPagePtr, gci);
+    }
+
+    if(regTabPtr->m_attributes[DD].m_no_of_varsize == 0)
+    {
+      jam();
+      sz= regTabPtr->m_offsets[DD].m_fix_header_size;
+      dst= ((Fix_page*)diskPagePtr.p)->get_ptr(key.m_page_idx, sz);
+    }
+    else
+    {
+      jam();
+      dst= ((Var_page*)diskPagePtr.p)->get_ptr(key.m_page_idx);
+      sz= ((Var_page*)diskPagePtr.p)->get_entry_len(key.m_page_idx);
+    }
+
+    if(! (copy_bits & Tuple_header::DISK_ALLOC))
+    {
+      jam();
+      disk_page_undo_update(diskPagePtr.p,
+			    &key, dst, sz, gci, logfile_group_id);
+    }
+
+    memcpy(dst, disk_ptr, 4*sz);
+    memcpy(tuple_ptr->get_disk_ref_ptr(regTabPtr), &key, sizeof(Local_key));
+}
+
+
+/*
+  The following methods will handle disk data
+  as variable size (m_disk_tuple_format=DISK_TUPLE_VAR_SIZE)
+*/
+
+void
+Dbtup::disk_page_commit_callback_orig(Signal* signal,
+				       Uint32 opPtrI, Uint32 page_id)
+{
+  Uint32 hash_value;
+  Uint32 gci_hi, gci_lo;
+  OperationrecPtr regOperPtr;
+
+  jamEntry();
+
+  c_operation_pool.getPtr(regOperPtr, opPtrI);
+  c_lqh->get_op_info(regOperPtr.p->userpointer, &hash_value, &gci_hi, &gci_lo);
+
+  TupCommitReq * const tupCommitReq= (TupCommitReq *)signal->getDataPtr();
+
+  tupCommitReq->opPtr= opPtrI;
+  tupCommitReq->hashValue= hash_value;
+  tupCommitReq->gci_hi= gci_hi;
+  tupCommitReq->gci_lo= gci_lo;
+  tupCommitReq->diskpage = regOperPtr.p->m_commit_disk_callback_page;
+
+  regOperPtr.p->op_struct.m_load_diskpage_orig_on_commit= 0;
+  regOperPtr.p->m_commit_disk_callback_page_orig= page_id;
+  m_global_page_pool.getPtr(m_pgman_ptr_orig, page_id);
+  m_global_page_pool.getPtr(m_pgman_ptr,
+                            regOperPtr.p->m_commit_disk_callback_page);
+
+  {
+    PagePtr tmp;
+    tmp.i = m_pgman_ptr_orig.i;
+    tmp.p = reinterpret_cast<Page*>(m_pgman_ptr_orig.p);
+    disk_page_set_dirty(tmp);
+  }
+
+  execTUP_COMMITREQ(signal);
+  if(signal->theData[0] == 0)
+  {
+    jam();
+    c_lqh->tupcommit_conf_callback(signal, regOperPtr.p->userpointer);
+  }
+}
+
+int Dbtup::start_commit_dd_var(Signal* signal,
+                               OperationrecPtr regOperPtr,
+                               TablerecPtr regTabPtr,
+                               FragrecordPtr regFragPtr,
+                               Tuple_header* tuple_ptr)
+{
+  bool get_page = false;
+  Uint32 bits = tuple_ptr->m_header_bits;
+
+  Local_key origkey;
+  memcpy(&origkey,
+	     tuple_ptr->get_disk_ref_ptr(regTabPtr.p), sizeof(Local_key));
+
+  Uint32 cp_bits= (Uint32)0;
+  Tuple_header* copy_tuple= 0;
+  Local_key key_copy;
+
+  if (regOperPtr.p->op_struct.m_load_diskpage_on_commit)
+  {
+    jam();
+    Page_cache_client::Request req;
+    bool has_copy_tuple = !regOperPtr.p->m_copy_tuple_location.isNull();
+
+    /*
+      Only last op on tuple needs "real" commit,
+      hence only this one should have m_load_diskpage_on_commit.
+    */
+    ndbassert(tuple_ptr->m_operation_ptr_i == regOperPtr.i);
+    /*
+      Find the page to retrieve and
+      abort the preallocs in case of DELETE.
+    */
+    if (has_copy_tuple)
+    {
+      // There is a copy tuple.
+        jam();
+        copy_tuple=
+        get_copy_tuple(regTabPtr.p, &regOperPtr.p->m_copy_tuple_location);
+        cp_bits= copy_tuple->m_header_bits;
+
+        memcpy(&key_copy,
+	       copy_tuple->get_disk_ref_ptr(regTabPtr.p), sizeof(Local_key));
+        memcpy(&req.m_page,
+	       copy_tuple->get_disk_ref_ptr(regTabPtr.p), sizeof(Local_key));
+    }
+
+    if (regOperPtr.p->op_struct.op_type == ZDELETE)
+    {
+      if (has_copy_tuple)
+      {
+        if (unlikely(cp_bits & Tuple_header::DISK_ALLOC))
+        {
+	  /* Insert+Delete.  */
+          jam();
+          if (0) ndbout_c("insert+delete");
+
+          regOperPtr.p->op_struct.m_load_diskpage_on_commit = 0;
+          regOperPtr.p->op_struct.m_wait_log_buffer = 0;
+          disk_page_abort_prealloc(signal, regFragPtr.p,
+				   &req.m_page, req.m_page.m_page_idx+1);
+
+          D("Logfile_client - execTUP_COMMITREQ");
+          Logfile_client lgman(this, c_lgman, regFragPtr.p->m_logfile_group_id);
+          lgman.free_log_space(regOperPtr.p->m_undo_buffer_space);
+	  return 0;
+        }
+
+        if (cp_bits & Tuple_header::DISK_MOVE)
+        {
+          regOperPtr.p->op_struct.m_load_diskpage_orig_on_commit = 0;
+
+          /* MOVE is logged as an insert. Since there is no move
+             due to ZDELETE, relese the preallocs for data & log.
+	  */
+
+           // +1 for index slot.
+          disk_page_abort_prealloc(signal, regFragPtr.p,
+				   &req.m_page, req.m_page.m_page_idx+1);
+
+          Uint32 undo_log_tup_sz= sizeof(Dbtup::Disk_undo::Alloc)>>2;
+          /* Keep the undo_buffer_space to write log the
+             original tuple in dealloc_tuple.
+          */
+          regOperPtr.p->m_undo_buffer_space -= undo_log_tup_sz;
+
+          Logfile_client lgman(this, c_lgman, regFragPtr.p->m_logfile_group_id);
+          lgman.free_log_space(undo_log_tup_sz);
+        }
+        else if (cp_bits & Tuple_header::DISK_GROW)
+        {
+          // Release the delta prealloc'd by GROW.
+          disk_page_abort_prealloc(signal, regFragPtr.p,
+				   &req.m_page, req.m_page.m_page_idx);
+        }
+      } // end has_copy_tuple
+      else
+      {
+        // No copy-tuple, i.e. initial delete, retrieve original page.
+        jam();
+        ndbassert(bits & Tuple_header::DISK_PART);
+        memcpy(&req.m_page,
+	       tuple_ptr->get_disk_ref_ptr(regTabPtr.p), sizeof(Local_key));
+      }
+    } // end ZDELETE
+
+    /* Retrieve the requested page:
+      In case of initial delete, retrieve the original page,
+      else retrieve the page referred by the copy tuple's disk ref.
+     */
+
+    bool orig_page= false;
+    if (retrieve_data_page(signal, orig_page, req, regOperPtr) == 0)
+    {
+      return 1; // Data page is not retrieved yet.
+    }
+  } // end load_diskpage_on_commit
+
+ /* Retrieve the ori page, in case of MOVE.
+   MOVE will be performed as a delete of the original tuple
+   and an insert of the new (moved) tuple.
+ */
+
+  if (regOperPtr.p->op_struct.m_load_diskpage_orig_on_commit)
+  {
+    Page_cache_client::Request req;
+    memcpy(&req.m_page,
+	   tuple_ptr->get_disk_ref_ptr(regTabPtr.p), sizeof(Local_key));
+
+    ndbassert(tuple_ptr->m_header_bits & Tuple_header::DISK_PART);
+
+    bool orig_page= true;
+    if (retrieve_data_page(signal, orig_page, req, regOperPtr) == 0)
+    {
+      return 1; // data page is not retrieved yet
+    }
+  }
+
+  /******* Retrieve the log page **********/
+
+  if (regOperPtr.p->op_struct.m_wait_log_buffer)
+  {
+    jam();
+    ndbassert(tuple_ptr->m_operation_ptr_i == regOperPtr.i);
+    if (retrieve_log_page(signal, regFragPtr, regOperPtr) == 0)
+    {
+      return 1; // log page is not retrieved yet
+    }
+  }
+
+  if (copy_tuple)
+  {
+    m_global_page_pool.getPtr(m_pgman_ptr,
+                       regOperPtr.p->m_commit_disk_callback_page);
+
+    if (cp_bits & Tuple_header::DISK_MOVE)
+    {
+      m_global_page_pool.getPtr(m_pgman_ptr_orig,
+                         regOperPtr.p->m_commit_disk_callback_page_orig);
+    }
+  }
+  return 0;
+}
+
+void Dbtup::commit_operation_dd_var(Signal* signal,
+                                    Operationrec* regOperPtr,
+			            Fragrecord* regFragPtr,
+			            Tablerec* regTabPtr,
+				    Tuple_header *tuple_ptr,
+				    Tuple_header *disk_ptr,
+				    Local_key key_orig,
+                                    Uint32 gci)
+{
+  //ndbout_c("commit_operation_dd_var");
+  jam();
+  // tuple_ptr contains the copy tuple
+  Local_key key;
+  memcpy(&key, tuple_ptr->get_disk_ref_ptr(regTabPtr), sizeof(Local_key));
+  Uint32 logfile_group_id = regFragPtr->m_logfile_group_id;
+  PagePtr diskPagePtr = *(PagePtr*)&m_pgman_ptr;
+
+  ndbassert(diskPagePtr.p->m_page_no == key.m_page_no);
+  ndbassert(diskPagePtr.p->m_file_no == key.m_file_no);
+  diskPagePtr.p= reinterpret_cast<Page*>(m_pgman_ptr.p);
+
+  Uint32 copy_bits = tuple_ptr->m_header_bits;
+  bool tuple_exists = copy_bits & Tuple_header::DISK_PART;
+  bool initial_insert = copy_bits & Tuple_header::DISK_ALLOC;
+  bool tuple_moved = copy_bits & Tuple_header::DISK_MOVE;
+  bool tuple_grown = copy_bits & Tuple_header::DISK_GROW;
+  bool tuple_shrunken = copy_bits & Tuple_header::DISK_SHRINK;
+  ndbassert(initial_insert == !tuple_exists);
+
+
+
+  /* Find the real size of the copy tuple. */
+
+  Uint32 fixedsz= ((Tuple_header*)disk_ptr)->get_fix_size_dd(regTabPtr);
+  Varpart_copy *dp=  (Varpart_copy*)(((Tuple_header*)disk_ptr)->
+                        get_end_of_fix_part_ptr_dd(regTabPtr));
+  Uint32 *dynstart = dp->m_data;
+  Uint32 dynlen = dp->m_len;
+  Uint32 realsz = fixedsz+dynlen;
+
+
+  /* Get the sz of the original disk tuple
+     and log the before_image. */
+
+  PagePtr orig_diskPagePtr;
+  Uint32 orig_sz = 0;
+
+  if (tuple_exists)
+  {
+    if (tuple_moved)
+    {
+      jam();
+      orig_diskPagePtr = *(PagePtr*)&m_pgman_ptr_orig;
+      ndbassert(orig_diskPagePtr.p->m_page_no == key_orig.m_page_no);
+      ndbassert(orig_diskPagePtr.p->m_file_no == key_orig.m_file_no);
+    }
+    else
+    {
+      orig_diskPagePtr = *(PagePtr*)&m_pgman_ptr;
+    }
+    Uint32 *orig_dst = ((Var_page*)orig_diskPagePtr.p)->
+                        get_ptr(key_orig.m_page_idx);
+    orig_sz =
+      ((Var_page*)orig_diskPagePtr.p)->get_entry_len(key_orig.m_page_idx);
+
+
+    /* Log the before-image. */
+
+    jam();
+    if (! (initial_insert || tuple_moved) )
+      disk_page_undo_update(diskPagePtr.p,
+			    &key_orig, orig_dst, orig_sz, gci,
+                            logfile_group_id);
+  }
+
+  /* Free any extra reserved/alloc'd space */
+
+  Uint32 reservedsz = key.m_page_idx;
+
+  if ((initial_insert || tuple_moved) && reservedsz > realsz)
+  {
+    /* MOVE after initial insert or grow has alloc'd more than needed.
+         -1 for index slot.
+    */
+    disk_page_abort_prealloc_callback_1(signal, regFragPtr,
+					diskPagePtr,
+                                        reservedsz - realsz - 1);
+ }
+
+  /*  Alloc/realloc record. */
+
+  if (initial_insert || tuple_moved)
+  {
+    jam();
+    key.m_page_idx = realsz;
+    disk_page_alloc(signal, regTabPtr, regFragPtr, &key, diskPagePtr, gci);
+  }
+  else if (tuple_grown)
+  {
+    jam();
+
+    bool grown = true;
+    disk_page_realloc_on_same_page(regFragPtr, diskPagePtr, &key_orig,
+                                   gci, orig_sz, realsz, grown);
+
+    // In GROW case, copy_key's m_page_idx contains the growth delta.
+    memcpy(&key, &key_orig, sizeof(Local_key));
+  }
+
+  else if (tuple_shrunken)
+  {
+    /* Update the index slot's length entry after SHRINK
+       and release the extra space. */
+
+    bool grown = false;
+    disk_page_realloc_on_same_page(regFragPtr, diskPagePtr, &key,
+                                   gci, orig_sz, realsz, grown);
+  }
+
+ /* Get the record & check the size. */
+
+  Uint32 *dst;
+  Uint32 newsz = realsz;
+  jam();
+
+  dst = ((Var_page*)diskPagePtr.p)->get_ptr(key.m_page_idx);
+  newsz = ((Var_page*)diskPagePtr.p)->get_entry_len(key.m_page_idx);
+
+  ndbassert(newsz == realsz);
+
+  /* Copy after_image of the tuple into the data page. */
+
+  memcpy(dst, disk_ptr, fixedsz*4); // Copy fixed part.
+  dst += fixedsz;
+  memcpy(dst, dynstart, dynlen*4); // Copy dyn part.
+  memcpy(tuple_ptr->get_disk_ref_ptr(regTabPtr), &key, sizeof(Local_key));
+
+  /* Free the original tuple after being MOVEd. */
+
+  if (tuple_moved)
+  {
+    disk_page_free(signal, regTabPtr, regFragPtr,
+		   &key_orig, orig_diskPagePtr, gci);
+  }
+}

=== modified file 'storage/ndb/src/kernel/blocks/dbtup/DbtupDiskAlloc.cpp'
--- a/storage/ndb/src/kernel/blocks/dbtup/DbtupDiskAlloc.cpp	2009-09-04 15:24:11 +0000
+++ b/storage/ndb/src/kernel/blocks/dbtup/DbtupDiskAlloc.cpp	2009-11-09 09:42:56 +0000
@@ -154,26 +154,36 @@ Dbtup::Disk_alloc_info::Disk_alloc_info(
   if (tabPtrP->m_no_of_disk_attributes == 0)
     return;
   
-  Uint32 min_size= 4*tabPtrP->m_offsets[DD].m_fix_header_size;
-  
-  if (tabPtrP->m_attributes[DD].m_no_of_varsize == 0)
+  Uint32 max=0;
+  if (tabPtrP->m_disk_tuple_format == DISK_TUPLE_VAR_SIZE)
   {
-    Uint32 recs_per_page= (4*Tup_fixsize_page::DATA_WORDS)/min_size;
-    m_page_free_bits_map[0] = recs_per_page; // 100% free
-    m_page_free_bits_map[1] = 1;
-    m_page_free_bits_map[2] = 0;
+   Uint32 pagesz = Tup_varsize_page::DATA_WORDS - 1;
+    m_page_free_bits_map[0] = pagesz;    // 100% free 8159 bytes
+    m_page_free_bits_map[1] = pagesz/2; // 50% free 4079
+    m_page_free_bits_map[2] = pagesz/10;
     m_page_free_bits_map[3] = 0;
-    
-    Uint32 max= recs_per_page * extent_size;
-    for(Uint32 i = 0; i<EXTENT_SEARCH_MATRIX_ROWS; i++)
+
+    max= pagesz * extent_size;
+  }
+  else
+  {
+    Uint32 min_size= 4*tabPtrP->m_offsets[DD].m_fix_header_size;
+
+    if (tabPtrP->m_attributes[DD].m_no_of_varsize == 0)
     {
-      m_total_extent_free_space_thresholds[i] = 
-	(EXTENT_SEARCH_MATRIX_ROWS - i - 1)*max/EXTENT_SEARCH_MATRIX_ROWS;
+      Uint32 recs_per_page= (4*Tup_fixsize_page::DATA_WORDS)/min_size;
+      m_page_free_bits_map[0] = recs_per_page; // 100% free
+      m_page_free_bits_map[1] = 1;
+      m_page_free_bits_map[2] = 0;
+      m_page_free_bits_map[3] = 0;
+
+      max= recs_per_page * extent_size;
     }
   }
-  else
+  for(Uint32 i = 0; i<EXTENT_SEARCH_MATRIX_ROWS; i++)
   {
-    abort();
+    m_total_extent_free_space_thresholds[i] =
+      (EXTENT_SEARCH_MATRIX_ROWS - i - 1)*max/EXTENT_SEARCH_MATRIX_ROWS;
   }
 }
 
@@ -186,6 +196,7 @@ Dbtup::Disk_alloc_info::find_extent(Uint
    * Return position in matrix
    */
   Uint32 col = calc_page_free_bits(sz);
+  if (col == 3) col = 2;
   Uint32 mask= EXTENT_SEARCH_MATRIX_COLS - 1;
   for(Uint32 i= 0; i<EXTENT_SEARCH_MATRIX_SIZE; i++)
   {
@@ -428,6 +439,16 @@ Dbtup::disk_page_prealloc(Signal* signal
       PagePtr tmp;
       tmp.i = gpage.i;
       tmp.p = reinterpret_cast<Page*>(gpage.p);
+
+      Uint32 ext= tmp.p->m_extent_info_ptr;
+      Ptr<Extent_info> extentPtr;
+      c_extent_pool.getPtr(extentPtr, ext);
+
+      if (tmp.p->free_space < tmp.p->uncommitted_used_space + sz)
+      {
+        continue;
+      }
+
       disk_page_prealloc_dirty_page(alloc, tmp, i, sz);
       key->m_page_no= tmp.p->m_page_no;
       key->m_file_no= tmp.p->m_file_no;
@@ -451,6 +472,16 @@ Dbtup::disk_page_prealloc(Signal* signal
       Ptr<Page_request> req;
       c_page_request_pool.getPtr(req, ptrI);
 
+      Uint32 ext= req.p->m_extent_info_ptr;
+      Ptr<Extent_info> extentPtr;
+      c_extent_pool.getPtr(extentPtr, ext);
+
+      if (req.p->m_original_estimated_free_space <=
+	    req.p->m_uncommitted_used_space + sz)
+      {
+        continue;
+      }
+
       disk_page_prealloc_transit_page(alloc, req, i, sz);
       * key = req.p->m_key;
       if (DBG_DISK)
@@ -478,7 +509,9 @@ Dbtup::disk_page_prealloc(Signal* signal
   
   int pageBits; // received
   Ptr<Extent_info> ext;
-  const Uint32 bits= alloc.calc_page_free_bits(sz); // required
+  Uint32 bits= alloc.calc_page_free_bits(sz); // required
+  if (bits == EXTENT_SEARCH_MATRIX_COLS - 1)
+    bits= EXTENT_SEARCH_MATRIX_COLS - 2;
   bool found= false;
 
   /**
@@ -886,13 +919,14 @@ Dbtup::disk_page_prealloc_initial_callba
   Ptr<Extent_info> extentPtr;
   c_extent_pool.getPtr(extentPtr, req.p->m_extent_info_ptr);
 
-  if (tabPtr.p->m_attributes[DD].m_no_of_varsize == 0)
+  if (tabPtr.p->m_disk_tuple_format == DISK_TUPLE_VAR_SIZE)
   {
-    convertThPage((Fix_page*)pagePtr.p, tabPtr.p, DD);
+    ((Var_page*)pagePtr.p)->init();
   }
   else
+  if (tabPtr.p->m_attributes[DD].m_no_of_varsize == 0)
   {
-    abort();
+    convertThPage((Fix_page*)pagePtr.p, tabPtr.p, DD);
   }
 
   pagePtr.p->m_page_no= req.p->m_key.m_page_no;
@@ -1133,6 +1167,43 @@ Dbtup::disk_page_alloc(Signal* signal, 
   Disk_alloc_info& alloc= fragPtrP->m_disk_alloc_info;
 
   Uint64 lsn;
+
+  if (tabPtrP->m_disk_tuple_format == DISK_TUPLE_VAR_SIZE)
+  {
+    Uint32 sz= key->m_page_idx;
+    // Needs space for index slot.
+    ddassert(pagePtr.p->uncommitted_used_space >= sz+1);
+    Uint32 old_free =  ((Var_page*)pagePtr.p)->free_space;
+    Uint32 old_idx= pagePtr.p->list_index;
+    Uint32 new_used= pagePtr.p->uncommitted_used_space -= sz+1;
+
+    key->m_page_idx= ((Var_page*)pagePtr.p)->
+      alloc_record(sz, (Var_page*)ctemp_page, 0);
+
+    Uint32 new_free =  ((Var_page*)pagePtr.p)->free_space;
+    Uint32 new_idx= alloc.calc_page_free_bits(new_free-new_used);
+    Uint32 ext = pagePtr.p->m_extent_info_ptr;
+    Ptr<Extent_info> extentPtr;
+    c_extent_pool.getPtr(extentPtr, ext);
+
+    if (old_idx != new_idx)
+    {
+      jam();
+      disk_page_move_dirty_page(alloc, extentPtr, pagePtr, old_idx, new_idx);
+    }
+
+    /* Update ext's free space with the unused space
+       prealloc'd for the index slot, if any.
+    */
+    Uint32 delta = ((old_free - new_free) == sz) ? 1 : 0;
+    if (delta > 0)
+    {
+      update_extent_pos(alloc, extentPtr, delta);
+    }
+
+    lsn= disk_page_undo_alloc(pagePtr.p, key, sz, gci, logfile_group_id);
+  }
+  else
   if (tabPtrP->m_attributes[DD].m_no_of_varsize == 0)
   {
     ddassert(pagePtr.p->uncommitted_used_space > 0);
@@ -1140,16 +1211,6 @@ Dbtup::disk_page_alloc(Signal* signal, 
     key->m_page_idx= ((Fix_page*)pagePtr.p)->alloc_record();
     lsn= disk_page_undo_alloc(pagePtr.p, key, 1, gci, logfile_group_id);
   }
-  else
-  {
-    Uint32 sz= key->m_page_idx;
-    ddassert(pagePtr.p->uncommitted_used_space >= sz);
-    pagePtr.p->uncommitted_used_space -= sz;
-    key->m_page_idx= ((Var_page*)pagePtr.p)->
-      alloc_record(sz, (Var_page*)ctemp_page, 0);
-    
-    lsn= disk_page_undo_alloc(pagePtr.p, key, sz, gci, logfile_group_id);
-  }
 }
 
 void
@@ -1168,6 +1229,17 @@ Dbtup::disk_page_free(Signal *signal, 
 
   Uint32 sz;
   Uint64 lsn;
+  if (tabPtrP->m_disk_tuple_format == DISK_TUPLE_VAR_SIZE)
+  {
+    const Uint32 *src= ((Var_page*)pagePtr.p)->get_ptr(page_idx);
+    sz= ((Var_page*)pagePtr.p)->get_entry_len(page_idx);
+    lsn= disk_page_undo_free(pagePtr.p, key,
+			     src, sz,
+			     gci, logfile_group_id);
+
+    ((Var_page*)pagePtr.p)->free_record(page_idx, 0);
+  }
+  else
   if (tabPtrP->m_attributes[DD].m_no_of_varsize == 0)
   {
     sz = 1;
@@ -1179,17 +1251,7 @@ Dbtup::disk_page_free(Signal *signal, 
     
     ((Fix_page*)pagePtr.p)->free_record(page_idx);
   }
-  else
-  {
-    const Uint32 *src= ((Var_page*)pagePtr.p)->get_ptr(page_idx);
-    sz= ((Var_page*)pagePtr.p)->get_entry_len(page_idx);
-    lsn= disk_page_undo_free(pagePtr.p, key,
-			     src, sz,
-			     gci, logfile_group_id);
-    
-    ((Var_page*)pagePtr.p)->free_record(page_idx, 0);
-  }    
-  
+
   Uint32 new_free = pagePtr.p->free_space;
   
   Uint32 ext = pagePtr.p->m_extent_info_ptr;
@@ -1212,7 +1274,15 @@ Dbtup::disk_page_free(Signal *signal, 
     disk_page_move_dirty_page(alloc, extentPtr, pagePtr, old_idx, new_idx);
   }
   
-  update_extent_pos(alloc, extentPtr, sz);
+  if (tabPtrP->m_disk_tuple_format == DISK_TUPLE_VAR_SIZE)
+  {
+    Uint32 delta = new_free-old_free; // sz + freed index slot, if any
+    update_extent_pos(alloc, extentPtr, delta);
+  }
+  else
+  {
+    update_extent_pos(alloc, extentPtr, sz);
+  }
 #if NOT_YET_FREE_EXTENT
   if (check_free(extentPtr.p) == 0)
   {
@@ -1811,30 +1881,51 @@ Dbtup::disk_restart_undo_alloc(Apply_und
 {
   ndbassert(undo->m_page_ptr.p->m_file_no == undo->m_key.m_file_no);
   ndbassert(undo->m_page_ptr.p->m_page_no == undo->m_key.m_page_no);
+
+  if (undo->m_table_ptr.p->m_disk_tuple_format == DISK_TUPLE_VAR_SIZE)
+  {
+    ((Var_page*)undo->m_page_ptr.p)->free_record(undo->m_key.m_page_idx, 0);
+  }
+  else
   if (undo->m_table_ptr.p->m_attributes[DD].m_no_of_varsize == 0)
   {
     ((Fix_page*)undo->m_page_ptr.p)->free_record(undo->m_key.m_page_idx);
   }
-  else
-    ((Var_page*)undo->m_page_ptr.p)->free_record(undo->m_key.m_page_idx, 0);
 }
 
 void
 Dbtup::disk_restart_undo_update(Apply_undo* undo)
 {
   Uint32* ptr;
-  Uint32 len= undo->m_len - 4;
+  Uint32 len= undo->m_len - 4; //Len of the before-image- from log record.
+  Uint32 record_len= 0; // Len of the record on the disk page.
+
+  if (undo->m_table_ptr.p->m_disk_tuple_format == DISK_TUPLE_VAR_SIZE)
+  {
+    ptr= ((Var_page*)undo->m_page_ptr.p)->get_ptr(undo->m_key.m_page_idx);
+
+    // Length of the record (after-image).
+    record_len= ((Var_page*)undo->m_page_ptr.p)->
+                  get_entry_len(undo->m_key.m_page_idx);
+    if (len > record_len)
+    {
+      undo->m_key.m_page_idx = grow_record_on_same_page(undo->m_page_ptr,
+                               undo->m_key.m_page_idx, record_len, len);
+      ptr= ((Var_page*)undo->m_page_ptr.p)->get_ptr(undo->m_key.m_page_idx);
+    }
+    else if (len < record_len)
+    {
+      ((Var_page*)undo->m_page_ptr.p)->shrink_entry(undo->m_key.m_page_idx,
+                                                    len);
+    }
+  }
+  else
   if (undo->m_table_ptr.p->m_attributes[DD].m_no_of_varsize == 0)
   {
     ptr= ((Fix_page*)undo->m_page_ptr.p)->get_ptr(undo->m_key.m_page_idx, len);
     ndbrequire(len == undo->m_table_ptr.p->m_offsets[DD].m_fix_header_size);
   }
-  else
-  {
-    ptr= ((Var_page*)undo->m_page_ptr.p)->get_ptr(undo->m_key.m_page_idx);
-    abort();
-  }  
-  
+
   const Disk_undo::Update *update = (const Disk_undo::Update*)undo->m_ptr;
   const Uint32* src= update->m_data;
   memcpy(ptr, src, 4 * len);
@@ -1845,17 +1936,21 @@ Dbtup::disk_restart_undo_free(Apply_undo
 {
   Uint32* ptr, idx = undo->m_key.m_page_idx;
   Uint32 len= undo->m_len - 4;
+
+  if (undo->m_table_ptr.p->m_disk_tuple_format == DISK_TUPLE_VAR_SIZE)
+  {
+    idx= ((Var_page*)undo->m_page_ptr.p)->
+           alloc_record(idx, len, (Var_page*)ctemp_page);
+    ptr= ((Var_page*)undo->m_page_ptr.p)->get_ptr(idx);
+  }
+  else
   if (undo->m_table_ptr.p->m_attributes[DD].m_no_of_varsize == 0)
   {
     ndbrequire(len == undo->m_table_ptr.p->m_offsets[DD].m_fix_header_size);
     idx= ((Fix_page*)undo->m_page_ptr.p)->alloc_record(idx);
     ptr= ((Fix_page*)undo->m_page_ptr.p)->get_ptr(idx, len);
   }
-  else
-  {
-    abort();
-  }  
-  
+
   ndbrequire(idx == undo->m_key.m_page_idx);
   const Disk_undo::Free *free = (const Disk_undo::Free*)undo->m_ptr;
   const Uint32* src= free->m_data;
@@ -1993,6 +2088,146 @@ Dbtup::disk_page_get_allocated(const Tab
       }
     }
     res[0] = cnt * alloc.m_extent_size * File_formats::NDB_PAGE_SIZE;
-    res[1] = free * 4 * tabPtrP->m_offsets[DD].m_fix_header_size;
+    if (tabPtrP->m_disk_tuple_format == DISK_TUPLE_VAR_SIZE)
+    {
+      res[1] = free;
+    }
+    else
+    {
+      res[1] = free * 4 * tabPtrP->m_offsets[DD].m_fix_header_size;
+    }
+  }
+}
+
+
+/*
+  The following methods will be used when a tuple grows and the
+  disk data is varsz (m_disk_tuple_format=DISK_TUPLE_VAR_SIZE).
+*/
+
+
+/* Record has grown; prealloc if there is enough space on the page. */
+bool
+Dbtup::disk_page_prealloc_on_same_page(Signal *signal,
+                                       Fragrecord* fragPtrP,
+                                       Local_key currentKey,
+                                       PagePtr currentpage,
+                                       Uint32 addsz,
+                                       Uint32 page_id)
+{
+  disk_page_set_dirty(currentpage);
+  Disk_alloc_info& alloc = fragPtrP->m_disk_alloc_info;
+  Uint32 used = currentpage.p->uncommitted_used_space;
+
+  Uint32 ext = currentpage.p->m_extent_info_ptr;
+  Ptr<Extent_info> extentPtr;
+  c_extent_pool.getPtr(extentPtr, ext);
+
+  if (currentpage.p->free_space >= used + addsz)
+  {
+    disk_page_prealloc_dirty_page(alloc, currentpage,
+                                  currentpage.p->list_index, addsz);
+    return true;
+  }
+  return false;
+}
+
+Uint16
+Dbtup::grow_record_on_same_page(PagePtr pagePtr, Uint16 page_index,
+                            Uint32 oldsz, Uint32 newsz)
+{
+  Uint32 add = newsz - oldsz;
+  Uint32 *new_var_ptr = 0;
+  Var_page* pageP = (Var_page*)pagePtr.p;
+  ndbassert(newsz);
+  ndbassert(add);
+
+  if (oldsz && pageP->free_space >= add)
+  {
+    jam();
+    new_var_ptr = pageP->get_ptr(page_index);
+    if(!pageP->is_space_behind_entry(page_index, add))
+    {
+      if(0) printf("realloc_rec_on_same_page extra reorg\n");
+      jam();
+      /*
+        In this case we need to reorganise the page to fit. To ensure we
+        don't complicate matters we make a little trick here where we
+        fool the reorg_page to avoid copying the entry at hand and copy
+        that separately at the end. This means we need to copy it out of
+        the page before reorg_page to save the entry contents.
+      */
+      Uint32* copyBuffer = cinBuffer;
+      memcpy(copyBuffer, new_var_ptr, 4*oldsz);
+      pageP->set_entry_len(page_index, 0);
+      pageP->free_space += oldsz;
+      pageP->reorg((Var_page*)ctemp_page);
+      new_var_ptr = pageP->get_free_space_ptr();
+      memcpy(new_var_ptr, copyBuffer, 4*oldsz);
+      pageP->set_entry_offset(page_index, pageP->insert_pos);
+      add += oldsz;
+    }
+    pageP->grow_entry(page_index, add);
+  }
+  return page_index;
+}
+
+/* This method is called by commit_operation in 2 cases:
+  - when a record expands and there is enough space on the current page,
+  - when the record shrinks.
+*/
+void
+Dbtup::disk_page_realloc_on_same_page(Fragrecord* fragPtrP, PagePtr pagePtr,
+                                      Local_key* key, Uint32  gci,
+                                      Uint32 oldsz, Uint32 newsz, bool grow)
+{
+  jam();
+
+  Uint32 logfile_group_id = fragPtrP->m_logfile_group_id;
+  Disk_alloc_info& alloc = fragPtrP->m_disk_alloc_info;
+
+  Uint64 lsn;
+  Uint32 old_used = pagePtr.p->uncommitted_used_space;
+  Uint32 old_free =  ((Var_page*)pagePtr.p)->free_space;
+  Uint32 old_idx = alloc.calc_page_free_bits(old_free-old_used);
+
+  Uint32 new_used = old_used;
+  Uint32 new_free = 0;
+  Uint32 new_idx = 0;
+
+  if (grow)
+  {
+    ddassert(newsz > oldsz);
+    Uint32 diff = newsz-oldsz;
+    key->m_page_idx =
+      grow_record_on_same_page(pagePtr,key->m_page_idx, oldsz, newsz);
+    pagePtr.p->uncommitted_used_space = new_used -= diff;
+  }
+  else
+  {
+    // Record has shrunken.
+    ddassert(newsz < oldsz);
+    ((Var_page*)pagePtr.p)->shrink_entry(key->m_page_idx, newsz);
+  }
+
+  new_free =  ((Var_page*)pagePtr.p)->free_space;
+  new_idx = alloc.calc_page_free_bits(new_free-new_used);
+
+  Ptr<Extent_info> extentPtr;
+
+  if (old_idx != new_idx  || !grow)
+  {
+    Uint32 ext = pagePtr.p->m_extent_info_ptr;
+    c_extent_pool.getPtr(extentPtr, ext);
+  }
+  if (old_idx != new_idx)
+  {
+    disk_page_move_dirty_page(alloc, extentPtr, pagePtr, old_idx, new_idx);
+  }
+
+  /* If the tuple is shrunken, return the delta to the ext's free space.*/
+  if (!grow)
+  {
+    update_extent_pos(alloc, extentPtr, new_free - old_free);
   }
 }

=== modified file 'storage/ndb/src/kernel/blocks/dbtup/DbtupExecQuery.cpp'
--- a/storage/ndb/src/kernel/blocks/dbtup/DbtupExecQuery.cpp	2009-10-27 12:08:44 +0000
+++ b/storage/ndb/src/kernel/blocks/dbtup/DbtupExecQuery.cpp	2009-11-09 09:42:56 +0000
@@ -193,6 +193,8 @@ Dbtup::insertActiveOpList(OperationrecPt
       prevOpPtr.p->op_struct.m_wait_log_buffer;
     regOperPtr.p->op_struct.m_load_diskpage_on_commit= 
       prevOpPtr.p->op_struct.m_load_diskpage_on_commit;
+    regOperPtr.p->op_struct.m_load_diskpage_orig_on_commit =
+      prevOpPtr.p->op_struct.m_load_diskpage_orig_on_commit;
     regOperPtr.p->m_undo_buffer_space= prevOpPtr.p->m_undo_buffer_space;
     // start with prev mask (matters only for UPD o UPD)
 
@@ -200,6 +202,7 @@ Dbtup::insertActiveOpList(OperationrecPt
 
     prevOpPtr.p->op_struct.m_wait_log_buffer= 0;
     prevOpPtr.p->op_struct.m_load_diskpage_on_commit= 0;
+    prevOpPtr.p->op_struct.m_load_diskpage_orig_on_commit = 0;
 
     if(prevOpPtr.p->op_struct.tuple_state == TUPLE_PREPARED)
     {
@@ -726,9 +729,20 @@ void Dbtup::execTUPKEYREQ(Signal* signal
 
      if (Roptype == ZUPDATE) {
        jam();
-       if (unlikely(handleUpdateReq(signal, regOperPtr,
-                                    regFragPtr, regTabPtr,
-                                    &req_struct, disk_page != RNIL) == -1))
+       int res=0;
+       if (regTabPtr->m_disk_tuple_format == DISK_TUPLE_VAR_SIZE)
+       {
+         res= handleUpdateReq_dd_var(signal, operPtr,
+			             fragptr, regTabPtr, &req_struct,
+                                     disk_page != RNIL);
+       }
+       else
+       {
+         res= handleUpdateReq(signal, regOperPtr,
+			      regFragPtr, regTabPtr, &req_struct,
+                              disk_page != RNIL);
+       }
+       if (unlikely(res == -1))
        {
          return;
        }
@@ -1113,6 +1127,8 @@ error:
   Both variable-sized and fixed-size attributes are stored in the same way
   in the expanded form as variable-sized attributes (in expand_var_part()).
 
+  This method is used for both mem and disk dynamic data.
+
     dst         Destination for expanded data
     tabPtrP     Table descriptor
     src         Pointer to the start of dynamic bitmap in source row
@@ -1127,8 +1143,8 @@ expand_dyn_part(Dbtup::KeyReqStruct::Var
 		Uint32 row_len,
 		const Uint32 * tabDesc,
 		const Uint16* order,
-		Uint32 mm_dynvar,
-		Uint32 mm_dynfix,
+		Uint32 dynvar,
+		Uint32 dynfix,
 		Uint32 max_bmlen)
 {
   /* Copy the bitmap, zeroing out any words not stored in the row. */
@@ -1161,11 +1177,11 @@ expand_dyn_part(Dbtup::KeyReqStruct::Var
   Uint16* dst_len_ptr= dst_off_ptr + no_attr;
   Uint16 this_src_off= row_len ? * src_off_ptr++ : 0;
   /* We need to reserve room for the offsets written by shrink_tuple+padding.*/
-  Uint16 dst_off= 4 * (max_bmlen + ((mm_dynvar+2)>>1));
+  Uint16 dst_off= 4 * (max_bmlen + ((dynvar+2)>>1));
   char *dst_ptr= (char*)dst_bm_ptr + dst_off;
-  for(Uint32 i= 0; i<mm_dynvar; i++)
+  for(Uint32 i= 0; i<dynvar; i++)
   {
-    Uint16 j= order[mm_dynfix+i];
+    Uint16 j= order[dynfix+i];
     Uint32 max_len= 4 *AttributeDescriptor::getSizeInWords(tabDesc[j]);
     Uint32 len;
     Uint32 pos = AttributeOffset::getNullFlagPos(tabDesc[j+1]);
@@ -1199,12 +1215,12 @@ expand_dyn_part(Dbtup::KeyReqStruct::Var
     dynamic part of the row. This is true both for the stored/shrunken and
     for the expanded form.
   */
-  for(Uint32 i= mm_dynfix; i>0; )
+  for(Uint32 i= dynfix; i>0; )
   {
     i--;
     Uint16 j= order[i];
     Uint32 fix_size= 4*AttributeDescriptor::getSizeInWords(tabDesc[j]);
-    dst_off_ptr[mm_dynvar+i]= dst_off;
+    dst_off_ptr[dynvar+i]= dst_off;
     /* len offset array is not used for fixed size. */
     Uint32 pos = AttributeOffset::getNullFlagPos(tabDesc[j+1]);
     if(bm_len > (pos >> 5) && BitmaskImpl::get(bm_len, src, pos))
@@ -1255,6 +1271,11 @@ Dbtup::prepare_initial_insert(KeyReqStru
   const Uint32 mm_dynvar= regTabPtr->m_attributes[MM].m_no_of_dyn_var;
   const Uint32 mm_dynfix= regTabPtr->m_attributes[MM].m_no_of_dyn_fix;
   const Uint32 dd_vars= regTabPtr->m_attributes[DD].m_no_of_varsize;
+
+  const Uint32 dd_dyns= regTabPtr->m_attributes[DD].m_no_of_dynamic;
+  const Uint32 dd_dynvar= regTabPtr->m_attributes[DD].m_no_of_dyn_var;
+  const Uint32 dd_dynfix= regTabPtr->m_attributes[DD].m_no_of_dyn_fix;
+
   Uint32 *ptr= req_struct->m_tuple_ptr->get_end_of_fix_part_ptr(regTabPtr);
   Var_part_ref* ref = req_struct->m_tuple_ptr->get_var_part_ref_ptr(regTabPtr);
 
@@ -1320,6 +1341,27 @@ Dbtup::prepare_initial_insert(KeyReqStru
   
   ndbrequire(dd_vars == 0);
   
+  /* Prepare empty dynamic part for disk data. */
+  if (dd_dyns &&
+      regTabPtr->m_disk_tuple_format == DISK_TUPLE_VAR_SIZE)
+  {
+
+    KeyReqStruct::Var_data* dst = &req_struct->m_var_data[DD];
+    dst->m_dyn_data_ptr=
+     (char*)((Tuple_header*)ptr)->get_end_of_fix_part_ptr_dd_with_dlen(regTabPtr);
+
+    dst->m_dyn_offset_arr_ptr = req_struct->var_pos_array +
+                               2 * mm_vars + 2 * (mm_dynvar + mm_dynfix);
+    dst->m_dyn_len_offset = dd_dynvar + dd_dynfix;
+    dst->m_max_dyn_offset = regTabPtr->m_offsets[DD].m_max_dyn_offset;
+    order += mm_dynvar + mm_dynfix;
+
+    ptr = expand_dyn_part(dst, 0, 0,
+                          (Uint32*)tab_descr, order,
+                          dd_dynvar, dd_dynfix,
+                          regTabPtr->m_offsets[DD].m_dyn_null_words);
+  }
+
   req_struct->m_tuple_ptr->m_header_bits= bits;
 
   // Set all null bits
@@ -1467,7 +1509,14 @@ int Dbtup::handleInsertReq(Signal* signa
   }
   else if (regTabPtr->need_shrink())
   {  
-    shrink_tuple(req_struct, sizes+2, regTabPtr, true);
+    if (regTabPtr->m_disk_tuple_format == DISK_TUPLE_VAR_SIZE)
+    {
+      shrink_tuple_dd_var(req_struct, sizes + 2, regTabPtr, true);
+    }
+    else
+    {
+      shrink_tuple(req_struct, sizes + 2, regTabPtr, true);
+    }
   }
 
   if (ERROR_INSERTED(4025))
@@ -1574,7 +1623,7 @@ int Dbtup::handleInsertReq(Signal* signa
       goto size_change_error;
     }
 
-    if (regTabPtr->need_shrink() && cmp[0] != cmp[1] &&
+    if (regTabPtr->need_shrink() && sizes[MM] != sizes[MM+2] &&
 	unlikely(handle_size_change_after_update(req_struct,
                                                  base,
                                                  regOperPtr.p,
@@ -1591,8 +1640,8 @@ int Dbtup::handleInsertReq(Signal* signa
   if (disk_insert)
   {
     Local_key tmp;
-    Uint32 size= regTabPtr->m_attributes[DD].m_no_of_varsize == 0 ? 
-      1 : sizes[2+DD];
+    Uint32 size= regTabPtr->m_disk_tuple_format == DISK_TUPLE_VAR_SIZE ?
+      sizes[2+DD] : 1;
     
     if (ERROR_INSERTED(4021))
     {
@@ -1600,7 +1649,17 @@ int Dbtup::handleInsertReq(Signal* signa
       goto disk_prealloc_error;
     }
     
-    int ret= disk_page_prealloc(signal, fragPtr, &tmp, size);
+    int ret= 0;
+    if (regTabPtr->m_disk_tuple_format == DISK_TUPLE_VAR_SIZE)
+    {
+      ret = disk_page_prealloc(signal, fragPtr, &tmp, size + 1);
+    // +1 for index slot
+    }
+    else
+    {
+      ret = disk_page_prealloc(signal, fragPtr, &tmp, size);
+    }
+
     if (unlikely(ret < 0))
     {
       terrorCode = -ret;
@@ -1621,6 +1680,23 @@ int Dbtup::handleInsertReq(Signal* signa
     disk_ptr->m_header_bits = 0;
     disk_ptr->m_base_record_ref= ref.ref();
   }
+  else if (disk &&
+           regTabPtr->m_disk_tuple_format == DISK_TUPLE_VAR_SIZE &&
+           sizes[DD] != sizes[DD+2])
+  {
+    int ret;
+    if (unlikely(ret = handle_size_change_after_update_disk(signal,
+                                                            req_struct,
+                                                            base,
+                                                            regOperPtr,
+                                                            fragPtr,
+                                                            regTabPtr,
+                                                            sizes))!=0)
+    {
+      terrorCode = -ret;
+      goto exit_error;
+    }
+  }
 
   if (req_struct->m_reorg)
   {
@@ -1713,6 +1789,20 @@ int Dbtup::handleDeleteReq(Signal* signa
                            KeyReqStruct *req_struct,
 			   bool disk)
 {
+  // If tuple exists already, find its length.
+  Tuple_header* base = req_struct->m_tuple_ptr;
+  Uint32 src_tuplelen = 0;
+  if (regTabPtr->m_disk_tuple_format == DISK_TUPLE_VAR_SIZE &&
+      base->m_header_bits & Tuple_header::DISK_PART)
+  {
+    Local_key key;
+    PagePtr orgpage;
+    memcpy(&key, base->get_disk_ref_ptr(regTabPtr), sizeof(key));
+    key.m_page_no = req_struct->m_disk_page_ptr.i;
+    Uint32 *src_ptr = get_dd_ptr(&orgpage, &key, regTabPtr);
+    src_tuplelen = ((Var_page*)(orgpage.p))->get_entry_len(key.m_page_idx);
+  }
+
   // delete must set but not increment tupVersion
   if (!regOperPtr->is_first_operation())
   {
@@ -1740,9 +1830,17 @@ int Dbtup::handleDeleteReq(Signal* signa
   {
     regOperPtr->op_struct.m_wait_log_buffer = 1;
     regOperPtr->op_struct.m_load_diskpage_on_commit = 1;
-    Uint32 sz= regOperPtr->m_undo_buffer_space= 
-      (sizeof(Dbtup::Disk_undo::Free) >> 2) + 
-      regTabPtr->m_offsets[DD].m_fix_header_size - 1;
+    Uint32 sz = 0;
+    if (regTabPtr->m_disk_tuple_format == DISK_TUPLE_VAR_SIZE)
+    {
+      sz = src_tuplelen;
+    }
+    else
+    {
+      sz = regTabPtr->m_offsets[DD].m_fix_header_size-1;
+    }
+    sz += (sizeof(Dbtup::Disk_undo::Free) >> 2 - 1);
+    regOperPtr->m_undo_buffer_space = sz;
     
     D("Logfile_client - handleDeleteReq");
     Logfile_client lgman(this, c_lgman, regFragPtr->m_logfile_group_id);
@@ -2957,31 +3055,21 @@ Dbtup::expand_tuple(KeyReqStruct* req_st
   sizes[DD]= 0;
   if(disk && dd_tot)
   {
-    const Uint16 dd_vars= tabPtrP->m_attributes[DD].m_no_of_varsize;
     order+= mm_vars+mm_dynvar+mm_dynfix;
     
-    if(bits & Tuple_header::DISK_INLINE)
+    if (tabPtrP->m_disk_tuple_format == DISK_TUPLE_VAR_SIZE)
     {
-      // Only on copy tuple
-      ndbassert(bits & Tuple_header::COPY_TUPLE);
+      expand_tuple_dd_var(req_struct, sizes, tabPtrP, bits, disk_ref,
+                          dst_ptr, src_ptr, order, desc);
     }
     else
     {
-      Local_key key;
-      memcpy(&key, disk_ref, sizeof(key));
-      key.m_page_no= req_struct->m_disk_page_ptr.i;
-      src_ptr= get_dd_ptr(&req_struct->m_disk_page_ptr, &key, tabPtrP);
+      expand_tuple_dd_fixed(req_struct, sizes, tabPtrP, bits, disk_ref,
+                            dst_ptr, src_ptr);
     }
-    extra_bits |= Tuple_header::DISK_INLINE;
 
-    // Fix diskpart
-    req_struct->m_disk_ptr= (Tuple_header*)dst_ptr;
-    memcpy(dst_ptr, src_ptr, 4*tabPtrP->m_offsets[DD].m_fix_header_size);
-    sizes[DD] = tabPtrP->m_offsets[DD].m_fix_header_size;
-    
+    extra_bits |= Tuple_header::DISK_INLINE;
     ndbassert(! (req_struct->m_disk_ptr->m_header_bits & Tuple_header::FREE));
-    
-    ndbrequire(dd_vars == 0);
   }
   
   ptr->m_header_bits= (extra_bits | Tuple_header::COPY_TUPLE);
@@ -3087,7 +3175,6 @@ Dbtup::prepare_read(KeyReqStruct* req_st
   Uint16 mm_dyns= tabPtrP->m_attributes[MM].m_no_of_dynamic;
   
   const Uint32 *src_ptr= ptr->get_end_of_fix_part_ptr(tabPtrP);
-  const Uint32 *disk_ref= ptr->get_disk_ref_ptr(tabPtrP);
   const Var_part_ref* var_ref = ptr->get_var_part_ref_ptr(tabPtrP);
   if(mm_vars || mm_dyns)
   {
@@ -3171,25 +3258,14 @@ Dbtup::prepare_read(KeyReqStruct* req_st
   
   if(disk && dd_tot)
   {
-    const Uint16 dd_vars= tabPtrP->m_attributes[DD].m_no_of_varsize;
-    
-    if(bits & Tuple_header::DISK_INLINE)
+    if (tabPtrP->m_disk_tuple_format == DISK_TUPLE_VAR_SIZE)
     {
-      // Only on copy tuple
-      ndbassert(bits & Tuple_header::COPY_TUPLE);
+      prepare_read_dd_var(req_struct, tabPtrP, src_ptr);
     }
     else
     {
-      // XXX
-      Local_key key;
-      memcpy(&key, disk_ref, sizeof(key));
-      key.m_page_no= req_struct->m_disk_page_ptr.i;
-      src_ptr= get_dd_ptr(&req_struct->m_disk_page_ptr, &key, tabPtrP);
+      prepare_read_dd_fixed(req_struct, tabPtrP, src_ptr);
     }
-    // Fix diskpart
-    req_struct->m_disk_ptr= (Tuple_header*)src_ptr;
-    ndbassert(! (req_struct->m_disk_ptr->m_header_bits & Tuple_header::FREE));
-    ndbrequire(dd_vars == 0);
   }
 
   req_struct->is_expanded= false;
@@ -3289,7 +3365,7 @@ Dbtup::shrink_tuple(KeyReqStruct* req_st
         /* ToDo: Put all of the dynattr code inside if(bm_len>0) { ... }, 
          * split to separate function. */
         Uint16 dyn_dst_data_offset= 0;
-        const Uint32 *dyn_bm_var_mask_ptr= tabPtrP->dynVarSizeMask;
+        const Uint32 *dyn_bm_var_mask_ptr= tabPtrP->dynVarSizeMask[MM];
         for(Uint16 i= 0; i< bm_len; i++)
         {
           Uint32 v= src_bm_ptr[i];
@@ -3486,7 +3562,8 @@ Dbtup::handle_size_change_after_update(K
 				       Tablerec* regTabPtr,
 				       Uint32 sizes[4])
 {
-  ndbrequire(sizes[1] == sizes[3]);
+  if (regTabPtr->m_disk_tuple_format == DISK_TUPLE_FIXED_SIZE)
+    ndbrequire(sizes[1] == sizes[3]);
   //ndbout_c("%d %d %d %d", sizes[0], sizes[1], sizes[2], sizes[3]);
   if(0)
     printf("%p %d %d - handle_size_change_after_update ",
@@ -3967,3 +4044,1065 @@ Dbtup::nr_delete_log_buffer_callback(Sig
   
   c_lqh->nr_delete_complete(signal, &op);
 }
+
+
+/*
+  The following methods will handle disk data
+  as fixed or variable size, depending on the m_disk_tuple_format
+*/
+
+void
+Dbtup::prepare_read_dd_fixed(KeyReqStruct* req_struct,
+		             Tablerec* tabPtrP, const Uint32 *src_ptr)
+{
+  Tuple_header* ptr = req_struct->m_tuple_ptr;
+  Uint32 bits = ptr->m_header_bits;
+  const Uint32 *disk_ref = ptr->get_disk_ref_ptr(tabPtrP);
+
+  const Uint16 dd_vars = tabPtrP->m_attributes[DD].m_no_of_varsize;
+
+  if(bits & Tuple_header::DISK_INLINE)
+  {
+    // Only on copy tuple
+    ndbassert(bits & Tuple_header::COPY_TUPLE);
+  }
+  else
+  {
+    Local_key key;
+    memcpy(&key, disk_ref, sizeof(key));
+    key.m_page_no = req_struct->m_disk_page_ptr.i;
+    src_ptr = get_dd_ptr(&req_struct->m_disk_page_ptr, &key, tabPtrP);
+  }
+  // Fixed diskpart
+  req_struct->m_disk_ptr = (Tuple_header*)src_ptr;
+  ndbassert(! (req_struct->m_disk_ptr->m_header_bits & Tuple_header::FREE));
+  ndbrequire(dd_vars == 0);
+}
+
+void
+Dbtup::prepare_read_dd_var(KeyReqStruct* req_struct,
+			   Tablerec* tabPtrP, const Uint32 *src_ptr)
+{
+  Tuple_header* ptr = req_struct->m_tuple_ptr;
+  Uint32 bits = ptr->m_header_bits;
+  const Uint32 *disk_ref = ptr->get_disk_ref_ptr(tabPtrP);
+
+  const Uint32 *src_data = src_ptr;
+  Uint32 src_dynlen = 0;
+  Uint32 src_fixedlen = 0;
+
+  if (bits & Tuple_header::DISK_INLINE)
+  {
+    // Only on copy tuple
+    ndbassert(bits & Tuple_header::COPY_TUPLE);
+    src_data = ((Tuple_header*)src_ptr)->get_end_of_fix_part_ptr_dd(tabPtrP);
+    src_fixedlen = src_data - src_ptr;
+    Varpart_copy *dp = (Varpart_copy*)(src_data);
+    src_dynlen = dp->m_len;
+    src_data = dp->m_data;
+  }
+  else
+  {
+    Local_key key;
+    memcpy(&key, disk_ref, sizeof(key));
+    key.m_page_no = req_struct->m_disk_page_ptr.i;
+    src_ptr = get_dd_ptr(&req_struct->m_disk_page_ptr, &key, tabPtrP);
+    Uint32 src_tuplelen =
+      ((Var_page*)(req_struct->m_disk_page_ptr.p))->get_entry_len(key.m_page_idx);
+
+    src_data = ((Tuple_header*)src_ptr)->get_end_of_fix_part_ptr_dd(tabPtrP);
+    src_fixedlen =  ((Tuple_header*)src_ptr)->get_fix_size_dd(tabPtrP);
+    src_dynlen = src_tuplelen - src_fixedlen;
+  }
+  // Dyn diskpart
+  req_struct->m_disk_ptr = (Tuple_header*)src_ptr;
+  ndbassert(! (req_struct->m_disk_ptr->m_header_bits & Tuple_header::FREE));
+
+  req_struct->m_var_data[DD].m_dyn_part_len = src_dynlen;
+  req_struct->m_var_data[DD].m_dyn_data_ptr = (char*)(src_data);
+}
+
+void
+Dbtup::expand_tuple_dd_fixed(KeyReqStruct *req_struct, Uint32 sizes[2],
+                             const Tablerec *tabPtrP, Uint32 bits,
+                             const Uint32 *disk_ref, Uint32 *dst_ptr,
+                             const Uint32 *src_ptr)
+{
+    const Uint16 dd_vars = tabPtrP->m_attributes[DD].m_no_of_varsize;
+
+    if(bits & Tuple_header::DISK_INLINE)
+    {
+      // Only on copy tuple
+      ndbassert(bits & Tuple_header::COPY_TUPLE);
+    }
+    else
+    {
+      Local_key key;
+      memcpy(&key, disk_ref, sizeof(key));
+      key.m_page_no = req_struct->m_disk_page_ptr.i;
+      src_ptr = get_dd_ptr(&req_struct->m_disk_page_ptr, &key, tabPtrP);
+    }
+
+    // Fix diskpart
+    req_struct->m_disk_ptr = (Tuple_header*)dst_ptr;
+    memcpy(dst_ptr, src_ptr, 4*tabPtrP->m_offsets[DD].m_fix_header_size);
+    sizes[DD] = tabPtrP->m_offsets[DD].m_fix_header_size;
+
+    ndbrequire(dd_vars == 0);
+}
+
+void
+Dbtup::expand_tuple_dd_var(KeyReqStruct *req_struct, Uint32 sizes[2],
+		           const Tablerec *tabPtrP, Uint32 bits,
+                           const Uint32 *disk_ref, Uint32 *dst_ptr,
+                           const Uint32 *src_ptr, const Uint16 *order,
+                           const Uint32 *desc)
+{
+  const Uint16 dd_dynvar = tabPtrP->m_attributes[DD].m_no_of_dyn_var;
+  const Uint16 dd_dynfix = tabPtrP->m_attributes[DD].m_no_of_dyn_fix;
+  Uint16 mm_dynvar = tabPtrP->m_attributes[MM].m_no_of_dyn_var;
+  Uint16 mm_vars = tabPtrP->m_attributes[MM].m_no_of_varsize;
+  Uint16 mm_dynfix = tabPtrP->m_attributes[MM].m_no_of_dyn_fix;
+  KeyReqStruct::Var_data* dst = &req_struct->m_var_data[DD];
+
+  req_struct->m_disk_ptr = (Tuple_header*)dst_ptr;
+
+  Uint32 src_dynlen = 0;
+  Varpart_copy* dp;
+  Uint32 src_tuplelen = 0;
+
+  if (bits & Tuple_header::DISK_INLINE)
+  {
+    /* Only on copy tuple. */
+    ndbassert(bits & Tuple_header::COPY_TUPLE);
+    /* Copy tuple has dyn len field after fixed data. */
+    Uint32* end_fix_part_ptr =
+      ((Tuple_header*)src_ptr)->get_end_of_fix_part_ptr_dd(tabPtrP);
+    dp = (Varpart_copy*)(end_fix_part_ptr);
+    src_dynlen = dp->m_len;
+
+    Uint32 disk_fix_header_len =
+      ((Tuple_header*)src_ptr)->get_fix_size_dd_with_dlen(tabPtrP);
+
+    /* Dynamic length takes 1 word. */
+    src_tuplelen = disk_fix_header_len + src_dynlen - 1;
+    memcpy(dst_ptr, src_ptr, 4*disk_fix_header_len);
+    src_ptr += disk_fix_header_len;
+    dst_ptr += disk_fix_header_len;
+  }
+  else
+  {
+    Local_key key;
+    memcpy(&key, disk_ref, sizeof(key));
+    key.m_page_no = req_struct->m_disk_page_ptr.i;
+    src_ptr = get_dd_ptr(&req_struct->m_disk_page_ptr, &key, tabPtrP);
+
+    // stored tuple does not have dyn len field after fixed data
+    src_tuplelen= ((Var_page*)(req_struct->m_disk_page_ptr.p))->
+                    get_entry_len(key.m_page_idx);
+
+    Uint32 src_disk_fix_header_len =
+      ((Tuple_header*)src_ptr)->get_fix_size_dd(tabPtrP);
+    src_dynlen = src_tuplelen - src_disk_fix_header_len;
+
+    memcpy(dst_ptr, src_ptr, 4*src_disk_fix_header_len);
+    src_ptr += src_disk_fix_header_len;
+    dst_ptr += src_disk_fix_header_len;
+
+    dp = (Varpart_copy*)(dst_ptr);
+
+    /* copy tuple keeps the length of the dynamic part. */
+    dp->m_len = src_dynlen;
+    dst_ptr = dp->m_data;
+  }
+
+  /* Dyn diskpart - no space for dyn len field in sizes[DD]. */
+  sizes[DD] = src_tuplelen;
+  dst->m_dyn_offset_arr_ptr = req_struct->var_pos_array +
+                              2 * mm_vars + 2 * (mm_dynvar + mm_dynfix);
+  dst->m_dyn_len_offset = dd_dynvar+dd_dynfix;
+  dst->m_max_dyn_offset = tabPtrP->m_offsets[DD].m_max_dyn_offset;
+  dst->m_dyn_data_ptr = (char*)dst_ptr;
+  dst->m_dyn_part_len = dp->m_len;
+
+  dst_ptr = expand_dyn_part(dst, src_ptr, dst->m_dyn_part_len,
+                            (Uint32*)desc, order,
+                            dd_dynvar, dd_dynfix,
+                            tabPtrP->m_offsets[DD].m_dyn_null_words);
+}
+
+static
+Uint32*
+shrink_dyn_part(Dbtup::KeyReqStruct::Var_data *dst,
+                Uint32 *dst_ptr,
+                const Dbtup::Tablerec* tabPtrP,
+                const Uint32 * tabDesc,
+                const Uint16* order,
+                Uint32 dynvar,
+                Uint32 dynfix,
+                Uint32 ind)
+{
+  /*
+    Now build the dynamic part, if any.
+    First look for any trailing all-NULL words of the bitmap; we do
+    not need to store those.
+  */
+  assert((UintPtr(dst->m_dyn_data_ptr)&3) == 0);
+  char *dyn_src_ptr = dst->m_dyn_data_ptr;
+  Uint32 bm_len = tabPtrP->m_offsets[ind].m_dyn_null_words; // In words
+
+  /* If no dynamic variables, store nothing. */
+  assert(bm_len);
+  {
+    Uint32 *bm_ptr = (Uint32 *)dyn_src_ptr + bm_len - 1;
+    while(*bm_ptr == 0)
+    {
+      bm_ptr--;
+      bm_len--;
+      if(bm_len == 0)
+        break;
+    }
+  }
+
+  if (bm_len)
+  {
+    /**
+     * Copy the bitmap, counting the number of variable sized
+     * attributes that are not NULL on the way.
+     */
+    Uint32 *dyn_dst_ptr = dst_ptr;
+    Uint32 dyn_var_count = 0;
+    const Uint32 *src_bm_ptr = (Uint32 *)(dyn_src_ptr);
+    Uint32 *dst_bm_ptr = (Uint32 *)dyn_dst_ptr;
+
+    /* ToDo: Put all of the dynattr code inside if(bm_len>0) { ... },
+     * split to separate function. */
+    Uint16 dyn_dst_data_offset = 0;
+    const Uint32 *dyn_bm_var_mask_ptr = tabPtrP->dynVarSizeMask[ind];
+    for(Uint16 i = 0; i < bm_len; i++)
+    {
+      Uint32 v = src_bm_ptr[i];
+      dyn_var_count += BitmaskImpl::count_bits(v & *dyn_bm_var_mask_ptr++);
+      dst_bm_ptr[i] = v;
+    }
+
+    Uint32 tmp = *dyn_dst_ptr;
+    assert(bm_len <= Dbtup::DYN_BM_LEN_MASK);
+    * dyn_dst_ptr = (tmp & ~(Uint32)Dbtup::DYN_BM_LEN_MASK) | bm_len;
+    dyn_dst_ptr += bm_len;
+    dyn_dst_data_offset = 2*dyn_var_count + 2;
+
+    Uint16 *dyn_src_off_array = dst->m_dyn_offset_arr_ptr;
+    Uint16 *dyn_src_lenoff_array =
+      dyn_src_off_array + dst->m_dyn_len_offset;
+    Uint16* dyn_dst_off_array = (Uint16*)dyn_dst_ptr;
+
+    /*
+      Copy over the variable sized not-NULL attributes.
+      Data offsets are counted from the start of the offset array, and
+      we store one additional offset to be able to easily compute the
+      data length as the difference between offsets.
+    */
+    Uint16 off_idx = 0;
+    for(Uint32 i = 0; i < dynvar; i++)
+    {
+      /*
+        Note that we must use the destination (shrunken) bitmap here,
+        as the source (expanded) bitmap may have been already clobbered
+        (by offset data).
+      */
+      Uint32 attrDesc2 = tabDesc[order[dynfix+i] + 1];
+      Uint32 pos = AttributeOffset::getNullFlagPos(attrDesc2);
+      if (bm_len > (pos >> 5) && BitmaskImpl::get(bm_len, dst_bm_ptr, pos))
+      {
+        dyn_dst_off_array[off_idx++] = dyn_dst_data_offset;
+        Uint32 dyn_src_off = dyn_src_off_array[i];
+        Uint32 dyn_len = dyn_src_lenoff_array[i] - dyn_src_off;
+        memmove(((char *)dyn_dst_ptr) + dyn_dst_data_offset,
+                dyn_src_ptr + dyn_src_off,
+                dyn_len);
+        dyn_dst_data_offset += dyn_len;
+      }
+    }
+    /* If all dynamic attributes are NULL, we store nothing. */
+    dyn_dst_off_array[off_idx] = dyn_dst_data_offset;
+    assert(dyn_dst_off_array + off_idx == (Uint16*)dyn_dst_ptr+dyn_var_count);
+
+    char *dynvar_end_ptr = ((char *)dyn_dst_ptr) + dyn_dst_data_offset;
+    char *dyn_dst_data_ptr = (char *)(ALIGN_WORD(dynvar_end_ptr));
+
+    /*
+      Zero out any padding bytes. Might not be strictly necessary,
+      but seems cleaner than leaving random stuff in there.
+    */
+    bzero(dynvar_end_ptr, dyn_dst_data_ptr-dynvar_end_ptr);
+
+    /*
+     Copy over the fixed-sized not-NULL attributes.
+     Note that attributes are copied in reverse order; this is to avoid
+     overwriting not-yet-copied data, as the data is also stored in
+     reverse order.
+    */
+    for(Uint32 i = dynfix; i > 0; )
+    {
+      i--;
+      Uint16 j = order[i];
+      Uint32 attrDesc2 = tabDesc[j+1];
+      Uint32 pos = AttributeOffset::getNullFlagPos(attrDesc2);
+      if(bm_len > (pos >>5 ) && BitmaskImpl::get(bm_len, dst_bm_ptr, pos))
+      {
+        Uint32 fixsize =
+          4*AttributeDescriptor::getSizeInWords(tabDesc[j]);
+        memmove(dyn_dst_data_ptr,
+                dyn_src_ptr + dyn_src_off_array[dynvar+i],
+                fixsize);
+     dyn_dst_data_ptr += fixsize;
+      }
+    }
+    dst_ptr = (Uint32*)dyn_dst_data_ptr;
+    assert((UintPtr(dst_ptr) & 3) == 0);
+  }
+  return (Uint32 *)dst_ptr;
+}
+
+void
+Dbtup::shrink_tuple_dd_var(KeyReqStruct* req_struct, Uint32 sizes[2],
+                           const Tablerec* tabPtrP, bool disk)
+{
+  ndbassert(tabPtrP->need_shrink());
+  Tuple_header* ptr = req_struct->m_tuple_ptr;
+  ndbassert(ptr->m_header_bits & Tuple_header::COPY_TUPLE);
+  KeyReqStruct::Var_data* dst = &req_struct->m_var_data[MM];
+  Uint32 order_desc = tabPtrP->m_real_order_descriptor;
+  const Uint32 * tabDesc = (Uint32*)req_struct->attr_descr;
+  const Uint16 *order = (Uint16*)(&tableDescriptor[order_desc]);
+  Uint16 dd_tot = tabPtrP->m_no_of_disk_attributes;
+  Uint16 mm_fix = tabPtrP->m_attributes[MM].m_no_of_fixsize;
+  Uint16 mm_vars = tabPtrP->m_attributes[MM].m_no_of_varsize;
+  Uint16 mm_dyns = tabPtrP->m_attributes[MM].m_no_of_dynamic;
+  Uint16 mm_dynvar = tabPtrP->m_attributes[MM].m_no_of_dyn_var;
+  Uint16 mm_dynfix = tabPtrP->m_attributes[MM].m_no_of_dyn_fix;
+
+  Uint32 *dst_ptr = ptr->get_end_of_fix_part_ptr(tabPtrP);
+  Uint16* src_off_ptr = req_struct->var_pos_array;
+  order += mm_fix;
+
+  sizes[MM] = 1;
+  sizes[DD] = 0;
+  if(mm_vars || mm_dyns)
+  {
+    Varpart_copy* vp = (Varpart_copy*)dst_ptr;
+    Uint32* varstart = dst_ptr = vp->m_data;
+
+    if (mm_vars)
+    {
+      Uint16* dst_off_ptr = (Uint16*)dst_ptr;
+      char*  dst_data_ptr = (char*)(dst_off_ptr + mm_vars + 1);
+      char*  src_data_ptr = dst_data_ptr;
+      Uint32 off = 0;
+      for(Uint32 i = 0; i < mm_vars; i++)
+      {
+        const char* data_ptr = src_data_ptr + *src_off_ptr;
+        Uint32 len = src_off_ptr[mm_vars] - *src_off_ptr;
+        * dst_off_ptr++ = off;
+        memmove(dst_data_ptr, data_ptr, len);
+        off += len;
+        src_off_ptr++;
+        dst_data_ptr += len;
+      }
+      *dst_off_ptr = off;
+      dst_ptr = ALIGN_WORD(dst_data_ptr);
+      order += mm_vars;
+    }
+
+    if (mm_dyns)
+    {
+      dst_ptr = shrink_dyn_part(dst, dst_ptr, tabPtrP, tabDesc,
+                                order, mm_dynvar, mm_dynfix, MM);
+      order += mm_dynfix + mm_dynvar;
+    }
+
+    Uint32 varpart_len = dst_ptr - varstart;
+    vp->m_len = varpart_len;
+    sizes[MM] = varpart_len;
+    ptr->m_header_bits |= (varpart_len) ? Tuple_header::VAR_PART : 0;
+
+    ndbassert((UintPtr(ptr) & 3) == 0);
+    ndbassert(varpart_len < 0x10000);
+  }
+
+  if(disk && dd_tot)
+  {
+    Uint32 * src_ptr = (Uint32*)req_struct->m_disk_ptr;
+    req_struct->m_disk_ptr = (Tuple_header*)dst_ptr;
+
+    /* Copy dd fix diskpart: header + nullbits + GCI + dynlen. */
+    Uint32 disk_fix_header_len =
+      ((Tuple_header*)src_ptr)->get_fix_size_dd_with_dlen(tabPtrP);
+    Uint32* end_fix_part_ptr =
+      ((Tuple_header*)dst_ptr)->get_end_of_fix_part_ptr_dd(tabPtrP);
+
+    Varpart_copy* dp = (Varpart_copy*)(end_fix_part_ptr);
+
+    memmove(dst_ptr, src_ptr, 4 * disk_fix_header_len);
+    src_ptr += disk_fix_header_len;
+    Uint32 *diskdynstartptr= dst_ptr = dp->m_data;
+
+    /* Copy dynamic disk part. */
+    dst = &req_struct->m_var_data[DD];
+    Uint16 dd_dynvar = tabPtrP->m_attributes[DD].m_no_of_dyn_var;
+    Uint16 dd_dynfix = tabPtrP->m_attributes[DD].m_no_of_dyn_fix;
+
+    dst->m_dyn_data_ptr = (char*)src_ptr;
+
+    dst_ptr = shrink_dyn_part(dst, dst_ptr, tabPtrP, tabDesc,
+                             order, dd_dynvar, dd_dynfix, DD);
+
+    Uint32 dynpart_len = dst_ptr - diskdynstartptr;
+    dst->m_dyn_part_len = dynpart_len;
+    dp->m_len = dynpart_len;
+
+   // -1: to remove the space for dyn len
+    sizes[DD] = dynpart_len + disk_fix_header_len - 1;
+  }
+
+  req_struct->is_expanded = false;
+}
+
+/*
+  Dbtup::handle_size_change_after_update_disk(..) handles the
+  size changes when a disk-tuple is updated.
+
+  Terminology:
+  Initial_insert: inserted first time withing this transaction
+  Original_tuple = stored tuple
+  Originally_alloced = size of the stored tuple
+  Previously_alloced = size of the copy tuple after ini_insert
+  or a stored tuple is updated by a previous operation
+  delta: difference between prev/ori_alloc'd size and the current sz
+  final_delta: aggregated deltas
+
+  Use cases:
+
+  A) Initial Insert cases:
+
+  DISK_SHRINK case:
+  1)
+  prev_alloc'd tuple  : ----------------
+  tuple after update  : ----------
+
+  Abort_prealloc delta
+  At commit, the real sz will be alloc'd
+
+  2) Tuple growth will lead to DISK_MOVE
+  (This limitation will be removed later)
+  prev_alloc'd tuple  : ----------------
+  abort_prealloc this, order a new prealloc to fullsz (total_rec_sz)
+  tuple after update : --------------------------
+
+  At commit, final delta will be abort_prealloc'd
+  and the real sz will be alloc'd
+  At abort or DELETE of the tuple as the last operation, abort the prealloc
+
+  B) Not the initial insert cases:
+
+  DISK_SHRINK cases:
+
+  At commit, final-delta will be freed (disk_free)
+  and the real sz will be realloc'd
+
+  1) simple cases:
+  ori_alloc'd tuple  : ----------------
+  tuple after update : ----------
+
+  2) SHRINK and then SHRINK
+  ori_alloc'd tuple       : ----------------
+  more updates on the tuple
+  tuple after prev update : -------------
+  tuple after current upd : --------
+
+  3) GROW and then SHRINK to a size smaller than the ori_alloc'd sz
+  ori_alloc'd tuple       : ----------------
+  more updates on the tuple
+  tuple after prev update : -----------------------
+  tuple after current upd : -----------
+
+  DISK_GROW cases:
+
+  deltas will be adjusted with prealloc or abort_prealloc here
+  At commit, the real size will be realloc'd
+
+  1) simple case 1:
+  ori_alloc'd tuple     : ----------------
+  tup after cur update  : -------------------- (found space on the page)
+
+  2) GROW and then GROW
+  ori_alloc'd tuple       : ----------------
+  more updates on the tuple
+  tuple after prev update : ---------------------
+  tuple after current upd : --------------------------
+
+  3) GROW and then shrink to a sz still larger than ori_alloc'd size
+  ori_alloc'd             : ----------------
+  more updates on the tuple
+  tuple after prev upd : ------------------------------
+  after current update : ------------------------
+
+  4) SHRINK and then GROW
+  ori_alloc'd tuple       : ----------------
+  more updates on the tuple
+  tuple after prev update : -----------
+  tuple after current upd : -----------------------
+
+  DISK_MOVE cases:
+
+  ori_alloc'd : ----------------
+  after updat : -------------------- (not enough space on the page)
+
+  No_change cases:
+
+  1) simple case:
+  ori_alloc'd tuple  : ----------------
+  tuple after update : ----------------
+
+  2) GROWn and then to ori_alloc'd size
+  ori_alloc'd tuple       : ----------------
+  more updates on the tuple
+  tuple after prev update : -----------------------
+  tuple after current upd : ----------------
+
+  3) SHRINK and then to ori_alloc'd size
+  ori_alloc'd tuple       : ----------------
+  more updates on the tuple ..
+  tuple after prev update : ---------
+  tuple after current upd : ----------------
+*/
+
+int
+Dbtup::handle_size_change_after_update_disk(Signal* signal,
+                                       KeyReqStruct* req_struct,
+				       Tuple_header* org,
+				       Ptr<Operationrec> regOperRecPtr,
+				       Ptr<Fragrecord> fragPtr,
+				       Tablerec* regTabPtr,
+				       Uint32 sizes[4])
+{
+  jam();
+  Operationrec* regOperPtr = regOperRecPtr.p;
+
+  if (0)
+  {
+    ndbout_c("\n scaud start: sizes - %d %d %d %d",
+             sizes[0], sizes[1], sizes[2], sizes[3]);
+    printf("%p %d %d\n",
+	   req_struct->m_tuple_ptr,
+	   regOperPtr->m_tuple_location.m_page_no,
+	   regOperPtr->m_tuple_location.m_page_idx);
+  }
+
+  ndbassert(regTabPtr->m_no_of_disk_attributes);
+  Uint32 needed = sizes[2+DD];
+  ndbassert(needed <= regTabPtr->total_rec_size);
+
+  Tuple_header* copy = req_struct->m_tuple_ptr;
+  Uint32 copy_bits = copy->m_header_bits;
+  ndbassert(copy_bits & Tuple_header::COPY_TUPLE);
+  Local_key copykey;
+  memcpy(&copykey, copy->get_disk_ref_ptr(regTabPtr), sizeof(Local_key));
+
+  if (copy_bits & Tuple_header::DISK_MOVE)
+  {
+    /* Tuple is already moved to another page with total_rec_size,
+       so must have enough space to all the consecutive 'grow's.
+    */
+    if (0)
+      ndbout_c("Tuple is already moved. pgidx %u, returning..",
+               copykey.m_page_idx);
+    return 0;
+  }
+
+  Uint32 org_bits = org->m_header_bits;
+
+  /* The size of the stored tuple. */
+  Uint32 originally_alloced = 0;
+  /* The copy tuple size allocated by the previous operation. */
+  Uint32 previously_alloced = 0;
+
+  PagePtr orgpage;
+  Local_key orgkey;
+
+  bool initial_insert = false;
+  if (copy_bits & Tuple_header::DISK_ALLOC)
+  {
+    initial_insert = true;
+    previously_alloced = copykey.m_page_idx;
+    //   if (0) ndbout_c("initialinsert cp.pgidx %u", copykey.m_page_idx);
+
+    /* Ini_ins: copykey.m_page_idx will reflect the first time allocated size
+       Any shrink should be found from the realsz of the copy tuple at commit
+    */
+    if (needed <= previously_alloced)
+    {
+      if (0)
+        ndbout_c("initial insert and equal or shrink pgidx %u",
+                  copykey.m_page_idx);
+      return 0;
+    }
+    if (0) ndbout_c("initial insert and grow");
+    // implement later: wait for disk_page and expand on it if possible
+    Page_cache_client::Request req;
+    memcpy(&req.m_page, &copykey, sizeof(Local_key));
+
+    /* +1 for the extra word prealloc'd for the index slot. */
+    disk_page_abort_prealloc(signal, fragPtr.p,
+                             &req.m_page, req.m_page.m_page_idx+1);
+    goto prealloc_full_sz;
+  }
+  else
+  {
+    /* Not initial_insert:
+       Originally_alloced tuple and its size will be kept intact.
+    */
+    ndbassert(org_bits & Tuple_header::DISK_PART);
+
+    memcpy(&orgkey, org->get_disk_ref_ptr(regTabPtr), sizeof(orgkey));
+    Uint32 tmp_pg_no = orgkey.m_page_no;
+    orgkey.m_page_no = req_struct->m_disk_page_ptr.i;
+    Uint32 *org_ptr = get_dd_ptr(&orgpage, &orgkey, regTabPtr);
+    originally_alloced =
+      ((Var_page*)orgpage.p)->get_entry_len(orgkey.m_page_idx);
+    orgkey.m_page_no = tmp_pg_no;
+    previously_alloced = originally_alloced;
+    if (copy_bits & Tuple_header::DISK_GROW)
+      previously_alloced += copykey.m_page_idx;
+
+    Uint32 reservesz = 0;
+    Uint32 unreservesz = 0;
+
+    if (!(copy_bits & Tuple_header::DISK_GROW))
+    {
+      // Not grown before.
+      if (needed <= originally_alloced)
+      {
+        /* Don't shrink the original tuple before commit,
+           in case transaction aborts
+        */
+        if (0) ndbout_c("!initial insert, needed <= ori_allocd sz");
+        if (needed < originally_alloced)
+        {
+          copy->m_header_bits |= Tuple_header::DISK_SHRINK;
+        }
+        else
+	{
+          // needed = originally_alloced, remove any flags
+          copy->m_header_bits &= ~(Uint32)Tuple_header::DISK_SHRINK;
+          copy->m_header_bits &= ~(Uint32)Tuple_header::DISK_GROW;
+        }
+        copykey.m_page_idx = orgkey.m_page_idx;
+        return 0;
+      }
+      else
+      {
+        // First time grow, needed > originally_alloced
+
+        // Remove any previous shrink
+        copy->m_header_bits &= ~(Uint32)Tuple_header::DISK_SHRINK;
+
+        reservesz = needed - originally_alloced;
+        copykey.m_page_idx = reservesz;
+
+        if (originally_alloced + copykey.m_page_idx != sizes[DD+2])
+        {
+          ndbout_c("OBS !ini_ins First time grow:");
+          ndbout_c("needed %u > ori_allocd sz %u, reservesz %u sizes[DD] %u",
+                    needed, originally_alloced, reservesz, sizes[DD]);
+	}
+	ndbassert(originally_alloced + copykey.m_page_idx == sizes[DD+2]);
+	if (0) ndbout_c("First time grow.");
+      }
+    }
+    else
+    {
+      /* Tuple has grown before. There was a previous prealloc. */
+      if (needed <= originally_alloced)
+      {
+        // Unreserve any previous realloc.
+        unreservesz = previously_alloced - originally_alloced;
+        copykey.m_page_idx = orgkey.m_page_idx;
+
+        // Remove any previous flag.
+        copy->m_header_bits &= ~(Uint32)Tuple_header::DISK_GROW;
+
+	if (needed < originally_alloced)
+        {
+          copy->m_header_bits |= Tuple_header::DISK_SHRINK;
+        }
+	if (0)
+        {
+          ndbout_c("!ini_ins: needed %u <= ori_alloced && prev realloc",
+                    needed);
+          ndbout_c("unresrv %u pg_idx %u ori_alloced %u",
+                    unreservesz, copykey.m_page_idx, originally_alloced);
+          ndbout_c(" prev_alloced %u", previously_alloced);
+	}
+      }
+      else
+      {
+        // needed > originally_alloced
+        if (needed == previously_alloced)
+        {
+          if (0)
+          {
+	    /* Riding on an allocation made by the operation
+               prior to ZDELETE.
+	    */
+            ndbout_c("!initial insert, needed == prev_alloced %u pgidx %u",
+                      needed, copykey.m_page_idx);
+            Operationrec* prevOp= req_struct->prevOpPtr.p;
+
+            if (prevOp->op_struct.op_type == ZDELETE &&
+                regOperPtr->op_struct.op_type == ZINSERT)
+              ndbout_c("insert after delete");
+          }
+          return 0;
+        }
+
+        if (needed < previously_alloced)
+	{
+          // Release any extra space preallocated.
+          unreservesz = previously_alloced - needed;
+          copykey.m_page_idx = needed - originally_alloced;
+          if (0)
+          {
+            ndbout_c("!ini_ins: shrink from prev_all");
+            ndbout_c("needed %u < previously_alloced %u unres %u",
+                      needed, previously_alloced, unreservesz);
+	  }
+	}
+        else if (needed > previously_alloced)
+	{
+          reservesz = needed - previously_alloced; // delta
+          copykey.m_page_idx = needed - originally_alloced;
+          if (0)
+          {
+            ndbout_c("!ini_ins: grow from prev_all:");
+            ndbout_c("needed %u > prev_alloced %u reservsz %u pg_idx %u",
+                     needed, previously_alloced, reservesz,
+                     copykey.m_page_idx);
+	  }
+	}
+      }
+    }
+
+    // Unreserve any extra reserved space.
+    if (unreservesz != 0)
+    {
+      if (0)
+      {
+        ndbout_c("scaud unreserving abrt preal1 [%u %u]",
+                  copykey.m_file_no, copykey.m_page_no);
+        ndbout_c("unresrvsz %u cp pg_idx %u unsetting preald",
+                  unreservesz, copykey.m_page_idx);
+      }
+      disk_page_abort_prealloc_callback_1(signal, fragPtr.p, orgpage,
+                                        unreservesz);
+      memcpy(copy->get_disk_ref_ptr(regTabPtr), &copykey, sizeof(Local_key));
+      if (reservesz == 0)
+      {
+	if (0)
+         ndbout_c("no new reservn pgidx %u, unsetting preald",
+                   copykey.m_page_idx);
+
+        // Remove any previous flags.
+        copy->m_header_bits &= ~(Uint32)Tuple_header::DISK_GROW;
+
+        return 0;
+      }
+      else
+      {
+        ndbout_c("OBS! unresrv and resrv at the same time");
+        ndbassert(0);
+      }
+    }
+
+   if (reservesz &&
+       disk_page_prealloc_on_same_page(signal, fragptr.p, orgkey,
+				       orgpage, reservesz,
+                                       req_struct->m_disk_page_ptr.i))
+    {
+      if (0)
+      {
+        ndbout_c("Found space in orgpage [%u %u]",
+                  (orgpage.p)->m_file_no, (orgpage.p)->m_page_no);
+        ndbout_c("copykey [%u %u] reservesz %u pg_idx %u orgpgidx %u",
+                  copykey.m_file_no, copykey.m_page_no,reservesz,
+                  copykey.m_page_idx, orgkey.m_page_idx);
+      }
+      regOperPtr->op_struct.m_disk_preallocated = 1;
+      regOperPtr->op_struct.m_load_diskpage_on_commit = 1;
+      regOperPtr->op_struct.m_wait_log_buffer = 1;
+
+      copy->m_header_bits |= Tuple_header::DISK_GROW;
+      memcpy(copy->get_disk_ref_ptr(regTabPtr), &copykey, sizeof(Local_key));
+
+      if (originally_alloced + copykey.m_page_idx != sizes[DD+2])
+      {
+        ndbout_c("!initial insert, grow:");
+        ndbout_c("needed %u > ori_allocd sz %u + pg_idx %u reservesz %u",
+              needed, originally_alloced, copykey.m_page_idx, reservesz);
+      }
+      ndbassert(originally_alloced + copykey.m_page_idx == sizes[DD+2]);
+      return 0;
+    }
+   else
+   {
+     if (0)
+       ndbout_c("No space in cur page [%u %u] reservesz %u pg_idx %u",
+                   copykey.m_file_no, copykey.m_file_no,reservesz,
+                   copykey.m_page_idx);
+    }
+  } // end: ! initial_insert
+
+  prealloc_full_sz:
+
+  if (!initial_insert)
+  {
+    Uint32 logrecsz = sizeof(Dbtup::Disk_undo::Alloc) >> 2;
+    Logfile_client lgman(this, c_lgman, fragptr.p->m_logfile_group_id);
+    int res = lgman.alloc_log_space(logrecsz);
+    if (unlikely(res < 0))
+    {
+      regOperPtr->m_undo_buffer_space= 0;
+      return res;
+    }
+    /* Moving existing tuple will be logged as delete-old and insert-new. */
+    regOperPtr->m_undo_buffer_space += logrecsz;
+  }
+  /* else, i.e. initial insert case, regOperPtr->m_undo_buffer_space is
+     already set in prepare_initial_insert.
+  */
+
+  Local_key tmp;
+  // +1 for index slot
+  int ret= disk_page_prealloc(signal, fragptr, &tmp,
+                                regTabPtr->total_rec_size + 1);
+  if (unlikely(ret < 0))
+  {
+    regOperPtr->m_undo_buffer_space= 0;
+    return ret;
+  }
+
+ if (0)
+    ndbout_c("scaud preallocs/MOVE page[%u %u] totalsz %u",
+              tmp.m_file_no, tmp.m_page_no, regTabPtr->total_rec_size);
+
+  if (!initial_insert &&
+      tmp.m_file_no == orgkey.m_file_no &&
+      tmp.m_page_no == orgkey.m_page_no)
+  {
+    ndbout_c("OBS MV: orig key  [%u %u] mvd key [%u %u] totalsz %u",
+              orgkey.m_file_no, orgkey.m_page_no, tmp.m_file_no,
+              tmp.m_page_no, regTabPtr->total_rec_size);
+    ndbassert(0);
+  }
+
+  if (!initial_insert)
+  {
+    copy->m_header_bits |= Tuple_header::DISK_MOVE;
+    regOperPtr->op_struct.m_load_diskpage_orig_on_commit = 1;
+  }
+  regOperPtr->op_struct.m_disk_preallocated = 1;
+
+  tmp.m_page_idx = regTabPtr->total_rec_size;
+  memcpy(copy->get_disk_ref_ptr(regTabPtr), &tmp, sizeof(tmp));
+  /*
+    Set ref from disk to mm
+  */
+  Local_key ref = regOperPtr->m_tuple_location;
+  ref.m_page_no = req_struct->frag_page_id;
+
+  Tuple_header* disk_ptr = req_struct->m_disk_ptr;
+  disk_ptr->m_header_bits = 0;
+  disk_ptr->m_base_record_ref = ref.ref();
+
+  return 0;
+}
+
+/* UPDATE when the disk tuple is of format varsz */
+
+int Dbtup::handleUpdateReq_dd_var(Signal* signal,
+                                  Ptr<Operationrec> regOperRecPtr,
+                                  Ptr<Fragrecord> fragPtr,
+                                  Tablerec* regTabPtr,
+                                  KeyReqStruct* req_struct,
+                                  bool disk)
+{
+
+  Operationrec* operPtrP = regOperRecPtr.p;
+  Fragrecord* regFragPtr = fragptr.p;
+  Tuple_header *dst;
+  Tuple_header *base = req_struct->m_tuple_ptr, *org;
+  Uint32 * change_mask_ptr;
+  Uint32 src_tuplelen = 0;
+
+  if (disk && base->m_header_bits & Tuple_header::DISK_PART)
+  {
+    Local_key key;
+    PagePtr orgpage;
+    memcpy(&key, base->get_disk_ref_ptr(regTabPtr), sizeof(key));
+    key.m_page_no = req_struct->m_disk_page_ptr.i;
+    Uint32 *src_ptr = get_dd_ptr(&orgpage, &key, regTabPtr);
+    src_tuplelen = ((Var_page*)(orgpage.p))->get_entry_len(key.m_page_idx);
+  }
+  if ((dst = alloc_copy_tuple(regTabPtr,
+                              &operPtrP->m_copy_tuple_location)) == 0)
+  {
+    terrorCode = ZMEM_NOMEM_ERROR;
+    goto error;
+  }
+
+  Uint32 tup_version;
+  change_mask_ptr = get_change_mask_ptr(regTabPtr, dst);
+  if (operPtrP->is_first_operation())
+  {
+    org = req_struct->m_tuple_ptr;
+    tup_version = org->get_tuple_version();
+    clear_change_mask_info(regTabPtr, change_mask_ptr);
+  }
+  else
+  {
+    Operationrec* prevOp = req_struct->prevOpPtr.p;
+    tup_version= prevOp->tupVersion;
+    org = get_copy_tuple(regTabPtr, &prevOp->m_copy_tuple_location);
+    copy_change_mask_info(regTabPtr,
+                          change_mask_ptr,
+                          get_change_mask_ptr(regTabPtr, org));
+  }
+
+  /* Check consistency before update/delete. */
+  req_struct->m_tuple_ptr = org;
+  if ((regTabPtr->m_bits & Tablerec::TR_Checksum) &&
+      (calculateChecksum(req_struct->m_tuple_ptr, regTabPtr) != 0))
+  {
+    terrorCode = ZTUPLE_CORRUPTED_ERROR;
+    goto error;
+  }
+
+  req_struct->m_tuple_ptr = dst;
+
+  Uint32 sizes[4];
+
+  disk = disk || (org->m_header_bits & Tuple_header::DISK_INLINE);
+
+  sizes[DD] = 0;
+  if (regTabPtr->need_expand(disk))
+  {
+    expand_tuple(req_struct, sizes, org, regTabPtr, disk);
+
+    if (disk && operPtrP->m_undo_buffer_space == 0)
+    {
+      operPtrP->op_struct.m_wait_log_buffer = 1;
+      operPtrP->op_struct.m_load_diskpage_on_commit = 1;
+      Uint32 sz= operPtrP->m_undo_buffer_space =
+	(sizeof(Dbtup::Disk_undo::Update) >> 2) + src_tuplelen - 1; // - 1 ???
+
+      D("Logfile_client - handleUpdateReq");
+      Logfile_client lgman(this, c_lgman, regFragPtr->m_logfile_group_id);
+      terrorCode = lgman.alloc_log_space(sz);
+      if (unlikely(terrorCode))
+      {
+	operPtrP->m_undo_buffer_space = 0;
+	goto error;
+      }
+    }
+  }
+  else
+  {
+    memcpy(dst, org, 4 * regTabPtr->m_offsets[MM].m_fix_header_size);
+    req_struct->m_tuple_ptr->m_header_bits |= Tuple_header::COPY_TUPLE;
+  }
+
+  tup_version = (tup_version + 1) & ZTUP_VERSION_MASK;
+  operPtrP->tupVersion = tup_version;
+
+  req_struct->optimize_options = 0;
+
+  if (!req_struct->interpreted_exec)
+  {
+    jam();
+    int retValue = updateAttributes(req_struct,
+				    &cinBuffer[0],
+				    req_struct->attrinfo_len);
+    if (unlikely(retValue == -1))
+      goto error;
+  }
+  else
+  {
+    jam();
+    if (unlikely(interpreterStartLab(signal, req_struct) == -1))
+      return -1;
+  }
+
+  update_change_mask_info(regTabPtr,
+                          change_mask_ptr,
+                          req_struct->changeMask.rep.data);
+
+  switch (req_struct->optimize_options) {
+    case AttributeHeader::OPTIMIZE_MOVE_VARPART:
+      /*
+        optimize varpart of tuple,  move varpart of tuple from
+        big-free-size page list into small-free-size page list
+      */
+      if (base->m_header_bits & Tuple_header::VAR_PART)
+        optimize_var_part(req_struct, base, operPtrP,
+                          regFragPtr, regTabPtr);
+      break;
+    case AttributeHeader::OPTIMIZE_MOVE_FIXPART:
+      //TODO: move fix part of tuple
+      break;
+    default:
+      break;
+  }
+
+
+  sizes[DD+2] = 0;
+  if (regTabPtr->need_shrink())
+  {
+    shrink_tuple_dd_var(req_struct, sizes + 2, regTabPtr, disk);
+    if (sizes[MM] != sizes[MM+2] &&
+        handle_size_change_after_update(req_struct, base,
+                                        operPtrP, regFragPtr,
+                                        regTabPtr, sizes))
+    {
+      goto error;
+    }
+    if (disk && sizes[DD] != sizes[DD+2] &&
+        unlikely(handle_size_change_after_update_disk(signal,
+                                                      req_struct,
+                                                      base,
+                                                      regOperRecPtr,
+                                                      fragPtr,
+                                                      regTabPtr,
+                                                      sizes)))
+    {
+      goto error;
+    }
+  }
+
+  if (req_struct->m_reorg)
+  {
+    handle_reorg(req_struct, regFragPtr->fragStatus);
+  }
+
+  req_struct->m_tuple_ptr->set_tuple_version(tup_version);
+  if (regTabPtr->m_bits & Tablerec::TR_Checksum)
+  {
+    jam();
+    setChecksum(req_struct->m_tuple_ptr, regTabPtr);
+  }
+  return 0;
+
+error:
+  tupkeyErrorLab(signal);
+  return -1;
+}

=== modified file 'storage/ndb/src/kernel/blocks/dbtup/DbtupGen.cpp'
--- a/storage/ndb/src/kernel/blocks/dbtup/DbtupGen.cpp	2009-10-08 11:15:24 +0000
+++ b/storage/ndb/src/kernel/blocks/dbtup/DbtupGen.cpp	2009-11-09 09:42:56 +0000
@@ -726,7 +726,8 @@ Dbtup::initTab(Tablerec* const regTabPtr
 
   regTabPtr->tabDescriptor = RNIL;
   regTabPtr->readKeyArray = RNIL;
-  regTabPtr->dynTabDescriptor = RNIL;
+  regTabPtr->dynTabDescriptor[MM] = RNIL;
+  regTabPtr->dynTabDescriptor[DD] = RNIL;
 
   regTabPtr->m_bits = 0;
 

=== modified file 'storage/ndb/src/kernel/blocks/dbtup/DbtupMeta.cpp'
--- a/storage/ndb/src/kernel/blocks/dbtup/DbtupMeta.cpp	2009-09-21 12:17:02 +0000
+++ b/storage/ndb/src/kernel/blocks/dbtup/DbtupMeta.cpp	2009-11-09 09:42:56 +0000
@@ -109,12 +109,27 @@ Dbtup::execCREATE_TAB_REQ(Signal* signal
   regTabPtr.p->m_attributes[DD].m_no_of_dyn_fix= 0;
   regTabPtr.p->m_attributes[DD].m_no_of_dyn_var= 0;
 
-  // Reserve space for bitmap length
-  regTabPtr.p->m_dyn_null_bits= DYN_BM_LEN_BITS;
+  /* Reserve space for bitmap length. */
+  regTabPtr.p->m_dyn_null_bits[MM]= DYN_BM_LEN_BITS;
+  regTabPtr.p->m_dyn_null_bits[DD]= DYN_BM_LEN_BITS;
   regTabPtr.p->noOfKeyAttr= req->noOfKeyAttr;
   regTabPtr.p->noOfCharsets= req->noOfCharsets;
   regTabPtr.p->m_no_of_attributes= req->noOfAttributes;
-  regTabPtr.p->dynTabDescriptor= RNIL;
+
+/*
+  if (req->diskTupleFormat == DISK_TUPLE_FIXED_SIZE)
+    ndbout_c("Table id %u created with DISK_TUPLE_FIXED_SIZE", regTabPtr.i);
+  if (req->diskTupleFormat == DISK_TUPLE_VAR_SIZE)
+    ndbout_c("Table id %u created with DISK_TUPLE_VAR_SIZE", regTabPtr.i);
+  else
+    goto error;
+//  if (regTabPtr.i <7)
+*/
+//   regTabPtr.p->m_disk_tuple_format= DISK_TUPLE_FIXED_SIZE;
+  regTabPtr.p->m_disk_tuple_format= DISK_TUPLE_VAR_SIZE;
+
+  regTabPtr.p->dynTabDescriptor[MM]= RNIL;
+  regTabPtr.p->dynTabDescriptor[DD]= RNIL;
 
   {
     Uint32 offset[10];
@@ -176,17 +191,33 @@ void Dbtup::execTUP_ADD_ATTRREQ(Signal* 
   const bool lastAttr = (fragOperPtr.p->attributeCount == 0);
 
   Uint32 firstTabDesIndex= regTabPtr.p->tabDescriptor + (attrId * ZAD_SIZE);
+  Uint32 ind= AttributeDescriptor::getDiskBased(attrDescriptor);
+  Uint32 dd_varsz= (regTabPtr.p->m_disk_tuple_format == DISK_TUPLE_VAR_SIZE);
+  if (ind && dd_varsz)
+  {
+    AttributeDescriptor::setDynamic(attrDescriptor, true);
+  }
+
   setTabDescrWord(firstTabDesIndex, attrDescriptor);
   Uint32 attrLen = AttributeDescriptor::getSize(attrDescriptor);
   
   Uint32 attrDes2= 0;
   Uint32 bytes= AttributeDescriptor::getSizeInBytes(attrDescriptor);
   Uint32 words= (bytes + 3) / 4;
-  Uint32 ind= AttributeDescriptor::getDiskBased(attrDescriptor);
   if (!AttributeDescriptor::getDynamic(attrDescriptor)) {
     jam();
     Uint32 null_pos;
-    ndbrequire(ind <= 1);
+
+    if (dd_varsz)
+    {
+      /* Disk data will be dynamic only. */
+      ndbrequire(ind < 1);
+    }
+    else
+    {
+      ndbrequire(ind <= 1);
+    }
+
     null_pos= fragOperPtr.p->m_null_bits[ind];
 
     if (AttributeDescriptor::getNullable(attrDescriptor)) 
@@ -196,7 +227,8 @@ void Dbtup::execTUP_ADD_ATTRREQ(Signal* 
     } 
 
     if (AttributeDescriptor::getArrayType(attrDescriptor)==NDB_ARRAYTYPE_FIXED
-        || ind==DD)
+        || (ind==DD &&
+            regTabPtr.p->m_disk_tuple_format == DISK_TUPLE_FIXED_SIZE))
     {
       jam();
       regTabPtr.p->m_attributes[ind].m_no_of_fixsize++;
@@ -216,8 +248,9 @@ void Dbtup::execTUP_ADD_ATTRREQ(Signal* 
     
     AttributeOffset::setNullFlagPos(attrDes2, null_pos);
   } else {
-    /* A dynamic attribute. */
-    ndbrequire(ind==MM);
+    /* A dynamic attribute can be either mm-dyn or disk data.  */
+    if (!dd_varsz)
+      ndbrequire(ind==MM);
     regTabPtr.p->m_attributes[ind].m_no_of_dynamic++;
     /*
      * The dynamic attribute format always require a 'null' bit. So
@@ -227,7 +260,7 @@ void Dbtup::execTUP_ADD_ATTRREQ(Signal* 
      * xxx internally as 'null'.
      */
 
-    Uint32 null_pos= regTabPtr.p->m_dyn_null_bits;
+    Uint32 null_pos= regTabPtr.p->m_dyn_null_bits[ind];
 
     if (AttributeDescriptor::getArrayType(attrDescriptor)==NDB_ARRAYTYPE_FIXED)
     {
@@ -246,7 +279,7 @@ void Dbtup::execTUP_ADD_ATTRREQ(Signal* 
          * ensure that the full size bitmap is stored when non-NULL.
          */
         null_pos+= bits;
-        regTabPtr.p->m_dyn_null_bits+= bits+1;
+        regTabPtr.p->m_dyn_null_bits[ind]+= bits+1;
       }
       else
       {
@@ -261,7 +294,7 @@ void Dbtup::execTUP_ADD_ATTRREQ(Signal* 
 
         regTabPtr.p->m_attributes[ind].m_no_of_dyn_fix++;
         Uint32 null_bits= (bytes+3) >> 2;
-        regTabPtr.p->m_dyn_null_bits+= null_bits;
+        regTabPtr.p->m_dyn_null_bits[ind]+= null_bits;
       }
     }
     else
@@ -270,7 +303,7 @@ void Dbtup::execTUP_ADD_ATTRREQ(Signal* 
   treat_as_varsize:
       jam();
       regTabPtr.p->m_attributes[ind].m_no_of_dyn_var++;
-      regTabPtr.p->m_dyn_null_bits++;
+      regTabPtr.p->m_dyn_null_bits[ind]++;
     }
     AttributeOffset::setNullFlagPos(attrDes2, null_pos);
 
@@ -307,22 +340,37 @@ void Dbtup::execTUP_ADD_ATTRREQ(Signal* 
   regTabPtr.p->m_offsets[DD].m_null_words= BTW(fragOperPtr.p->m_null_bits[DD]);
 #undef BTW
 
+  Uint32 tup_format= regTabPtr.p->m_disk_tuple_format;
+
   {
     /* Allocate  dynamic descriptor. */
-    Uint32 offset[3];
-    Uint32 allocSize= getDynTabDescrOffsets((regTabPtr.p->m_dyn_null_bits+31)>>5,
-                                            offset);
-    Uint32 dynTableDescriptorRef= allocTabDescr(allocSize);
-    if (dynTableDescriptorRef == RNIL)
+
+    for (Uint32 i = 0; i < tup_format+1; ++i)
     {
-      jam();
-      goto error;
+      Uint32 offset[3];
+      Uint32 allocSize = getDynTabDescrOffsets(
+                          (regTabPtr.p->m_dyn_null_bits[i]+31)>>5,
+                           offset);
+      Uint32 dynTableDescriptorRef = allocTabDescr(allocSize);
+      if (dynTableDescriptorRef == RNIL)
+      {
+        jam();
+        goto error;
+      }
+      setupDynDescriptorReferences(dynTableDescriptorRef,
+                                   regTabPtr.p, offset, i);
     }
-    setupDynDescriptorReferences(dynTableDescriptorRef, regTabPtr.p, offset);
   }
 
   /* Compute table aggregate metadata. */
-  computeTableMetaData(regTabPtr.p);
+  if (dd_varsz)
+  {
+    computeTableMetaData_dd_var(regTabPtr.p);
+  }
+  else
+  {
+    computeTableMetaData(regTabPtr.p);
+  }
 
 #if 0
   ndbout << *regTabPtr.p << endl;
@@ -796,7 +844,7 @@ Dbtup::handleAlterTablePrepare(Signal *s
       in ALTER_TAB_REQ[commit];
     */
     Uint32 charsetIndex= regTabPtr->noOfCharsets;
-    Uint32 dyn_nullbits= regTabPtr->m_dyn_null_bits;
+    Uint32 dyn_nullbits= regTabPtr->m_dyn_null_bits[MM];
     if (dyn_nullbits == 0)
     {
       jam();
@@ -857,37 +905,59 @@ Dbtup::handleAlterTablePrepare(Signal *s
     }
     ndbassert(newNoOfCharsets==charsetIndex);
 
-    regAlterTabOpPtr.p->noOfDynNullBits= dyn_nullbits;
-    ndbassert(noDynamic ==
-              regTabPtr->m_attributes[MM].m_no_of_dynamic + noOfNewAttr);
-    regAlterTabOpPtr.p->noOfDynFix= noDynFix;
-    regAlterTabOpPtr.p->noOfDynVar= noDynVar;
-    regAlterTabOpPtr.p->noOfDynamic= noDynamic;
-
-    /* Allocate the new (possibly larger) dynamic descriptor. */
-    allocSize= getDynTabDescrOffsets((dyn_nullbits+31)>>5,
-                                     regAlterTabOpPtr.p->dynTabDesOffset);
-    Uint32 dynTableDescriptorRef = RNIL;
-    if (ERROR_INSERTED(4029))
+    regAlterTabOpPtr.p->noOfDynNullBits[MM]= dyn_nullbits;
+    if (regTabPtr->m_disk_tuple_format == DISK_TUPLE_VAR_SIZE)
     {
-      jam();
-      dynTableDescriptorRef = RNIL;
-      terrorCode = ZMEM_NOTABDESCR_ERROR;
-      CLEAR_ERROR_INSERT_VALUE;
+      /* Save disk-data related info from the old table. */
+      regAlterTabOpPtr.p->disk_tuple_format = regTabPtr->m_disk_tuple_format;
+      regAlterTabOpPtr.p->noOfDynFix[DD] =
+        regTabPtr->m_attributes[DD].m_no_of_dyn_fix;
+      regAlterTabOpPtr.p->noOfDynVar[DD] =
+        regTabPtr->m_attributes[DD].m_no_of_dyn_var;
+      regAlterTabOpPtr.p->noOfDynamic[DD] =
+        regTabPtr->m_attributes[DD].m_no_of_dyn_fix +
+        regTabPtr->m_attributes[DD].m_no_of_dyn_var;
+      regAlterTabOpPtr.p->noOfDynNullBits[DD] =
+                            regTabPtr->m_dyn_null_bits[DD];
     }
-    else
-    {
-      jam();
-      dynTableDescriptorRef = allocTabDescr(allocSize);
-    }
-    if (dynTableDescriptorRef == RNIL) {
-      jam();
-      releaseTabDescr(tableDescriptorRef);
-      releaseAlterTabOpRec(regAlterTabOpPtr);
-      sendAlterTabRef(signal, terrorCode);
-      return;
+    ndbassert(noDynamic ==
+              regTabPtr->m_attributes[MM].m_no_of_dynamic + noOfNewAttr);
+
+    regAlterTabOpPtr.p->noOfDynFix[MM]= noDynFix;
+    regAlterTabOpPtr.p->noOfDynVar[MM]= noDynVar;
+    regAlterTabOpPtr.p->noOfDynamic[MM]= noDynamic;
+
+    /* Allocate the new (possibly larger) dynamic descriptor for MM and DD. */
+
+    for (int i = 0; i < regTabPtr->m_disk_tuple_format + 1; ++i)
+    {
+      Uint32 offset[3];
+      allocSize =
+       getDynTabDescrOffsets((regAlterTabOpPtr.p->noOfDynNullBits[i]+31)>>5,
+                             offset);
+      Uint32 dynTableDescriptorRef = RNIL;
+      if (ERROR_INSERTED(4029))
+      {
+        jam();
+        dynTableDescriptorRef = RNIL;
+        terrorCode = ZMEM_NOTABDESCR_ERROR;
+        CLEAR_ERROR_INSERT_VALUE;
+      }
+      else
+      {
+        jam();
+        dynTableDescriptorRef= allocTabDescr(allocSize);
+      }
+      if (dynTableDescriptorRef == RNIL) {
+        jam();
+        releaseTabDescr(tableDescriptorRef);
+        releaseAlterTabOpRec(regAlterTabOpPtr);
+        sendAlterTabRef(signal, terrorCode);
+        return;
+      }
+      regAlterTabOpPtr.p->dynDesAllocSize[i]= allocSize;
+      regAlterTabOpPtr.p->dynTableDescriptor[i]= dynTableDescriptorRef;
     }
-    regAlterTabOpPtr.p->dynTableDescriptor= dynTableDescriptorRef;
     connectPtr = regAlterTabOpPtr.i;
   }
 
@@ -921,28 +991,55 @@ Dbtup::handleAlterTableCommit(Signal *si
 
     /* Free old table descriptors. */
     releaseTabDescr(regTabPtr);
-
+    Uint32 tup_format= regAlterTabOpPtr.p->disk_tuple_format;
     /* Set new attribute counts. */
     regTabPtr->m_no_of_attributes= regAlterTabOpPtr.p->newNoOfAttrs;
     regTabPtr->noOfCharsets= regAlterTabOpPtr.p->newNoOfCharsets;
     regTabPtr->noOfKeyAttr= regAlterTabOpPtr.p->newNoOfKeyAttrs;
-    regTabPtr->m_attributes[MM].m_no_of_dyn_fix= regAlterTabOpPtr.p->noOfDynFix;
-    regTabPtr->m_attributes[MM].m_no_of_dyn_var= regAlterTabOpPtr.p->noOfDynVar;
-    regTabPtr->m_attributes[MM].m_no_of_dynamic= regAlterTabOpPtr.p->noOfDynamic;
-    regTabPtr->m_dyn_null_bits= regAlterTabOpPtr.p->noOfDynNullBits;
+    regTabPtr->m_disk_tuple_format= tup_format;
+
+    /* Transfer the alter table info to the new table record. */
+    for (int i = 0; i < tup_format+1; ++i)
+    {
+      regTabPtr->m_attributes[i].m_no_of_dyn_fix =
+        regAlterTabOpPtr.p->noOfDynFix[i];
+      regTabPtr->m_attributes[i].m_no_of_dyn_var =
+        regAlterTabOpPtr.p->noOfDynVar[i];
+      regTabPtr->m_attributes[i].m_no_of_dynamic =
+        regAlterTabOpPtr.p->noOfDynamic[i];
+      regTabPtr->m_dyn_null_bits[i] =
+        regAlterTabOpPtr.p->noOfDynNullBits[i];
+    }
 
     /* Install the new (larger) table descriptors. */
     setUpDescriptorReferences(regAlterTabOpPtr.p->tableDescriptor,
                               regTabPtr,
                               regAlterTabOpPtr.p->tabDesOffset);
-    setupDynDescriptorReferences(regAlterTabOpPtr.p->dynTableDescriptor,
-                                 regTabPtr,
-                                 regAlterTabOpPtr.p->dynTabDesOffset);
 
+    /* Allocate dynamic descriptor for mm and disk data. */
+    for (int i = 0; i < tup_format+1; ++i)
+    {
+      Uint32 offset[3];
+      Uint32 allocSize =
+        getDynTabDescrOffsets((regTabPtr->m_dyn_null_bits[i]+31)>>5,
+                               offset);
+      Uint32 dynTableDescriptorRef = allocTabDescr(allocSize);
+      ndbrequire(dynTableDescriptorRef != RNIL);
+      setupDynDescriptorReferences(regAlterTabOpPtr.p->dynTableDescriptor[i],
+                                   regTabPtr,
+                                   offset, i);
+    }
     releaseAlterTabOpRec(regAlterTabOpPtr);
 
     /* Recompute aggregate table meta data. */
-    computeTableMetaData(regTabPtr);
+    if (tup_format == DISK_TUPLE_VAR_SIZE)
+    {
+      computeTableMetaData_dd_var(regTabPtr);
+    }
+    else
+    {
+      computeTableMetaData(regTabPtr);
+    }
   }
 
   if (AlterTableReq::getReorgFragFlag(req->changeMask))
@@ -1042,7 +1139,11 @@ Dbtup::handleAlterTableAbort(Signal *sig
       ptrCheckGuard(regAlterTabOpPtr, cnoOfAlterTabOps, alterTabOperRec);
 
       releaseTabDescr(regAlterTabOpPtr.p->tableDescriptor);
-      releaseTabDescr(regAlterTabOpPtr.p->dynTableDescriptor);
+
+      for (Uint32 i = 0; i < regTabPtr->m_disk_tuple_format + 1; ++i)
+      {
+        releaseTabDescr(regAlterTabOpPtr.p->dynTableDescriptor[i]);
+      }
       releaseAlterTabOpRec(regAlterTabOpPtr);
     }
   }
@@ -1089,12 +1190,12 @@ Dbtup::handleCharsetPos(Uint32 csNumber,
 void
 Dbtup::computeTableMetaData(Tablerec *regTabPtr)
 {
-  if (regTabPtr->m_dyn_null_bits == DYN_BM_LEN_BITS)
+  if (regTabPtr->m_dyn_null_bits[MM] == DYN_BM_LEN_BITS)
   {
-    regTabPtr->m_dyn_null_bits = 0;
+    regTabPtr->m_dyn_null_bits[MM] = 0;
   }
   
-  Uint32 dyn_null_words= (regTabPtr->m_dyn_null_bits+31)>>5;
+  Uint32 dyn_null_words= (regTabPtr->m_dyn_null_bits[MM]+31)>>5;
   regTabPtr->m_offsets[MM].m_dyn_null_words= dyn_null_words;
 
   /* Compute the size of the static headers. */
@@ -1150,7 +1251,6 @@ Dbtup::computeTableMetaData(Tablerec *re
     We also compute the dynamic bitmasks here.
   */
   Uint32 *tabDesc= (Uint32*)(tableDescriptor+regTabPtr->tabDescriptor);
-  Uint32 *dynDesc= (Uint32*)(tableDescriptor+regTabPtr->dynTabDescriptor);
   Uint32 fix_size[2]= {0, 0};
   Uint32 var_size[2]= {0, 0};
   Uint32 dyn_size[2]= {0, 0};
@@ -1160,8 +1260,8 @@ Dbtup::computeTableMetaData(Tablerec *re
   Uint32 dynamic_count= 0;
   regTabPtr->blobAttributeMask.clear();
   regTabPtr->notNullAttributeMask.clear();
-  bzero(regTabPtr->dynVarSizeMask, dyn_null_words<<2);
-  bzero(regTabPtr->dynFixSizeMask, dyn_null_words<<2);
+  bzero(regTabPtr->dynVarSizeMask[MM], dyn_null_words<<2);
+  bzero(regTabPtr->dynFixSizeMask[MM], dyn_null_words<<2);
 
   for(Uint32 i= 0; i<regTabPtr->m_no_of_attributes; i++)
   {
@@ -1223,7 +1323,7 @@ Dbtup::computeTableMetaData(Tablerec *re
           while(size_in_words-- > 0)
 	  {
 	    BitmaskImpl::set(dyn_null_words, 
-			     regTabPtr->dynFixSizeMask, null_pos++);
+			     regTabPtr->dynFixSizeMask[ind], null_pos++);
 	  }
         }
         else
@@ -1234,7 +1334,7 @@ Dbtup::computeTableMetaData(Tablerec *re
       treat_as_varsize:
         jam();
         off= dynvar_count++;
-	BitmaskImpl::set(dyn_null_words, regTabPtr->dynVarSizeMask, null_pos);
+	BitmaskImpl::set(dyn_null_words, regTabPtr->dynVarSizeMask[ind], null_pos);
       }
     }
     AttributeOffset::setOffset(attrDes2, off);
@@ -1393,12 +1493,13 @@ void Dbtup::setUpDescriptorReferences(Ui
 
 void Dbtup::setupDynDescriptorReferences(Uint32 dynDescr,
                                          Tablerec* const regTabPtr,
-                                         const Uint32* offset)
+                                         const Uint32* offset,
+                                         Uint32 ind)
 {
-  regTabPtr->dynTabDescriptor= dynDescr;
+  regTabPtr->dynTabDescriptor[ind]= dynDescr;
   Uint32* desc= &tableDescriptor[dynDescr].tabDescr;
-  regTabPtr->dynVarSizeMask= desc+offset[0];
-  regTabPtr->dynFixSizeMask= desc+offset[1];
+  regTabPtr->dynVarSizeMask[ind]= desc+offset[0];
+  regTabPtr->dynFixSizeMask[ind]= desc+offset[1];
 }
 
 Uint32
@@ -1579,14 +1680,19 @@ void Dbtup::releaseTabDescr(Tablerec* co
     releaseTabDescr(descriptor);
   }
 
-  descriptor= regTabPtr->dynTabDescriptor;
-  if(descriptor != RNIL)
+  /* Release  dynamic descriptor, etc for mm and disk data. */
+
+  for (Uint32 i = 0; i < regTabPtr->m_disk_tuple_format + 1; ++i)
   {
-    jam();
-    regTabPtr->dynTabDescriptor= RNIL;
-    regTabPtr->dynVarSizeMask= NULL;
-    regTabPtr->dynFixSizeMask= NULL;
-    releaseTabDescr(descriptor);
+    descriptor= regTabPtr->dynTabDescriptor[i];
+    if(descriptor != RNIL)
+    {
+      jam();
+      regTabPtr->dynTabDescriptor[i] = RNIL;
+      regTabPtr->dynVarSizeMask[i] = NULL;
+      regTabPtr->dynFixSizeMask[i] = NULL;
+      releaseTabDescr(descriptor);
+    }
   }
 }
 
@@ -2181,10 +2287,23 @@ Dbtup::start_restore_lcp(Uint32 tableId,
   TablerecPtr tabPtr;
   tabPtr.i= tableId;
   ptrCheckGuard(tabPtr, cnoOfTablerec, tablerec);
-  
-  tabPtr.p->m_dropTable.tabUserPtr= tabPtr.p->m_attributes[DD].m_no_of_fixsize;
-  tabPtr.p->m_dropTable.tabUserRef= tabPtr.p->m_attributes[DD].m_no_of_varsize;
-  
+
+  Uint32 tup_format= tabPtr.p->m_disk_tuple_format;
+  if (tup_format == DISK_TUPLE_VAR_SIZE)
+  {
+    tabPtr.p->m_dropTable.tabUserPtr =
+      tabPtr.p->m_attributes[DD].m_no_of_dyn_fix;
+    tabPtr.p->m_dropTable.tabUserRef =
+      tabPtr.p->m_attributes[DD].m_no_of_dyn_var;
+  }
+  else
+  {
+    tabPtr.p->m_dropTable.tabUserPtr =
+      tabPtr.p->m_attributes[DD].m_no_of_fixsize;
+    tabPtr.p->m_dropTable.tabUserRef =
+      tabPtr.p->m_attributes[DD].m_no_of_varsize;
+  }
+
   Uint32 *tabDesc = (Uint32*)(tableDescriptor+tabPtr.p->tabDescriptor);
   for(Uint32 i= 0; i<tabPtr.p->m_no_of_attributes; i++)
   {
@@ -2200,6 +2319,8 @@ Dbtup::start_restore_lcp(Uint32 tableId,
   tabPtr.p->m_no_of_disk_attributes = 0;
   tabPtr.p->m_attributes[DD].m_no_of_fixsize = 0;
   tabPtr.p->m_attributes[DD].m_no_of_varsize = 0;
+  tabPtr.p->m_attributes[DD].m_no_of_dyn_fix = 0;
+  tabPtr.p->m_attributes[DD].m_no_of_dyn_var = 0;
 }
 void
 Dbtup::complete_restore_lcp(Signal* signal, 
@@ -2210,13 +2331,27 @@ Dbtup::complete_restore_lcp(Signal* sign
   tabPtr.i= tableId;
   ptrCheckGuard(tabPtr, cnoOfTablerec, tablerec);
   
-  tabPtr.p->m_attributes[DD].m_no_of_fixsize= tabPtr.p->m_dropTable.tabUserPtr;
-  tabPtr.p->m_attributes[DD].m_no_of_varsize= tabPtr.p->m_dropTable.tabUserRef;
-  
-  tabPtr.p->m_no_of_disk_attributes = 
-    tabPtr.p->m_attributes[DD].m_no_of_fixsize + 
-    tabPtr.p->m_attributes[DD].m_no_of_varsize;
-  
+  Uint32 tup_format = tabPtr.p->m_disk_tuple_format;
+  if (tup_format == DISK_TUPLE_VAR_SIZE)
+  {
+    tabPtr.p->m_attributes[DD].m_no_of_dyn_fix =
+      tabPtr.p->m_dropTable.tabUserPtr;
+    tabPtr.p->m_attributes[DD].m_no_of_dyn_var =
+      tabPtr.p->m_dropTable.tabUserRef;
+
+    tabPtr.p->m_no_of_disk_attributes =
+      tabPtr.p->m_attributes[DD].m_no_of_dyn_fix +
+      tabPtr.p->m_attributes[DD].m_no_of_dyn_var;
+  }
+  else
+  {
+    tabPtr.p->m_attributes[DD].m_no_of_fixsize= tabPtr.p->m_dropTable.tabUserPtr;
+    tabPtr.p->m_attributes[DD].m_no_of_varsize= tabPtr.p->m_dropTable.tabUserRef;
+    tabPtr.p->m_no_of_disk_attributes =
+      tabPtr.p->m_attributes[DD].m_no_of_fixsize +
+      tabPtr.p->m_attributes[DD].m_no_of_varsize;
+  }
+
   Uint32 *tabDesc = (Uint32*)(tableDescriptor+tabPtr.p->tabDescriptor);
   for(Uint32 i= 0; i<tabPtr.p->m_no_of_attributes; i++)
   {
@@ -2315,3 +2450,335 @@ Dbtup::execDROP_FRAG_REQ(Signal* signal)
              signal, DropFragConf::SignalLength, JBB);
 }
 
+
+/*
+ The following methods handle disk data as fixed or variable size,
+ depending on the m_disk_tuple_format.
+*/
+
+void
+Dbtup::computeTableMetaData_dd_var(Tablerec *regTabPtr)
+{
+  Uint32 dyn_null_words[2];
+
+  for (Uint32 ind = 0; ind < regTabPtr->m_disk_tuple_format + 1; ++ind)
+  {
+    if (regTabPtr->m_dyn_null_bits[ind] == DYN_BM_LEN_BITS)
+    {
+      regTabPtr->m_dyn_null_bits[ind] = 0;
+    }
+
+    dyn_null_words[ind] = (regTabPtr->m_dyn_null_bits[ind]+31)>>5;
+    regTabPtr->m_offsets[ind].m_dyn_null_words = dyn_null_words[ind];
+  }
+
+  /* Compute the size of the static headers. */
+  Uint32 pos[2] = { 0, 0 };
+  if (regTabPtr->m_bits & Tablerec::TR_Checksum)
+  {
+    pos[MM]++;
+  }
+
+  if (regTabPtr->m_bits & Tablerec::TR_RowGCI)
+  {
+    pos[MM]++;
+    pos[DD]++;
+  }
+
+  regTabPtr->m_no_of_disk_attributes =
+    regTabPtr->m_attributes[DD].m_no_of_dynamic;
+  if(regTabPtr->m_no_of_disk_attributes > 0)
+  {
+    /* Room for disk part location. */
+    regTabPtr->m_offsets[MM].m_disk_ref_offset = pos[MM];
+    pos[MM] += Disk_part_ref::SZ32; // 8 bytes
+    regTabPtr->m_bits |= Tablerec::TR_DiskPart;
+  }
+  else
+  {
+    regTabPtr->m_offsets[MM].m_disk_ref_offset = pos[MM] - Disk_part_ref::SZ32;
+  }
+
+  if (regTabPtr->m_attributes[MM].m_no_of_varsize ||
+      regTabPtr->m_attributes[MM].m_no_of_dynamic)
+  {
+    pos[MM] += Var_part_ref::SZ32;
+    regTabPtr->m_bits &= ~(Uint32)Tablerec::TR_ForceVarPart;
+  }
+  else if (regTabPtr->m_bits & Tablerec::TR_ForceVarPart)
+  {
+    pos[MM] += Var_part_ref::SZ32;
+  }
+
+  regTabPtr->m_offsets[MM].m_null_offset = pos[MM];
+  regTabPtr->m_offsets[DD].m_null_offset = pos[DD];
+  pos[MM] += regTabPtr->m_offsets[MM].m_null_words;
+  pos[DD] += regTabPtr->m_offsets[DD].m_null_words;
+
+  /*
+    Compute the offsets for the attributes.
+    For static fixed-size, this is the offset from the tuple pointer of the
+    actual data.
+    For static var-size and dynamic, this is the index into the offset array.
+
+    We also compute the dynamic bitmasks here.
+  */
+  Uint32 *tabDesc = (Uint32*)(tableDescriptor+regTabPtr->tabDescriptor);
+  Uint32 fix_size = 0;
+  Uint32 var_size = 0;
+  Uint32 statvar_count = 0;
+  Uint32 dyn_size[2]     = {0, 0};
+  Uint32 dynfix_count[2] = {0, 0};
+  Uint32 dynvar_count[2] = {0, 0};
+  Uint32 dynamic_count[2]= {0, 0};
+  regTabPtr->blobAttributeMask.clear();
+  regTabPtr->notNullAttributeMask.clear();
+
+  for (Uint32 i = 0; i < regTabPtr->m_disk_tuple_format + 1; ++i)
+  {
+    bzero(regTabPtr->dynVarSizeMask[i], dyn_null_words[i]<<2);
+    bzero(regTabPtr->dynFixSizeMask[i], dyn_null_words[i]<<2);
+  }
+
+  for(Uint32 i = 0; i < regTabPtr->m_no_of_attributes; i++)
+  {
+    Uint32 attrDescriptor = *tabDesc++;
+    Uint32 attrDes2 = *tabDesc;
+    Uint32 ind = AttributeDescriptor::getDiskBased(attrDescriptor);
+    Uint32 attrLen = AttributeDescriptor::getSize(attrDescriptor);
+    Uint32 arr = AttributeDescriptor::getArrayType(attrDescriptor);
+    Uint32 size_in_words = AttributeDescriptor::getSizeInWords(attrDescriptor);
+    Uint32 size_in_bytes = AttributeDescriptor::getSizeInBytes(attrDescriptor);
+    Uint32 extType = AttributeDescriptor::getType(attrDescriptor);
+    Uint32 off;
+
+    if (extType == NDB_TYPE_BLOB || extType == NDB_TYPE_TEXT)
+      regTabPtr->blobAttributeMask.set(i);
+    if(!AttributeDescriptor::getNullable(attrDescriptor))
+      regTabPtr->notNullAttributeMask.set(i);
+    if (!AttributeDescriptor::getDynamic(attrDescriptor))
+    {
+      ndbassert(ind == MM); // Disk data will be dynamic only.
+      if(arr == NDB_ARRAYTYPE_FIXED)
+      {
+        if (attrLen != 0)
+        {
+          off = fix_size + pos[ind];
+          fix_size += size_in_words;
+        }
+        else
+          off = 0;                               // Bit type
+      }
+      else
+      {
+        /* Static varsize. */
+        ndbassert(ind == MM);
+        off = statvar_count++;
+        var_size += size_in_bytes;
+      }
+    }
+    else
+    {
+      /* Dynamic attribute  - both memory and disk. */
+      dynamic_count[ind]++;
+      Uint32 null_pos = AttributeOffset::getNullFlagPos(attrDes2);
+      dyn_size[ind] += (size_in_words<<2);
+      if(arr == NDB_ARRAYTYPE_FIXED)
+      {
+        jam();
+        //if (extType == NDB_TYPE_BLOB || extType == NDB_TYPE_TEXT)
+          //regTabPtr->blobAttributeMask.set(i);
+        // ToDo: I wonder what else is needed to handle BLOB/TEXT, if anything?
+
+        if (attrLen != 0)
+        {
+          jam();
+          if(size_in_words>InternalMaxDynFix)
+            goto treat_as_varsize;
+
+          off =
+            dynfix_count[ind]++ + regTabPtr->m_attributes[ind].m_no_of_dyn_var;
+          while(size_in_words-- > 0)
+	  {
+	    BitmaskImpl::set(dyn_null_words[ind],
+			     regTabPtr->dynFixSizeMask[ind], null_pos++);
+	  }
+        }
+        else
+          off = 0;                               // Bit type
+      }
+      else
+      {
+      treat_as_varsize:
+        jam();
+        off = dynvar_count[ind]++;
+	BitmaskImpl::set(dyn_null_words[ind], regTabPtr->dynVarSizeMask[ind], null_pos);
+      }
+    }
+    AttributeOffset::setOffset(attrDes2, off);
+    *tabDesc++= attrDes2;
+  }
+  ndbassert(dynvar_count[MM]==regTabPtr->m_attributes[MM].m_no_of_dyn_var);
+  ndbassert(dynfix_count[MM]==regTabPtr->m_attributes[MM].m_no_of_dyn_fix);
+  ndbassert(dynamic_count[MM]==regTabPtr->m_attributes[MM].m_no_of_dynamic);
+  ndbassert(statvar_count==regTabPtr->m_attributes[MM].m_no_of_varsize);
+
+  ndbassert(dynvar_count[DD] == regTabPtr->m_attributes[DD].m_no_of_dyn_var);
+  ndbassert(dynfix_count[DD] == regTabPtr->m_attributes[DD].m_no_of_dyn_fix);
+  ndbassert(dynamic_count[DD] == regTabPtr->m_attributes[DD].m_no_of_dynamic);
+
+  regTabPtr->m_offsets[MM].m_fix_header_size =
+    Tuple_header::HeaderSize + fix_size + pos[MM];
+  regTabPtr->m_offsets[DD].m_fix_header_size =
+    Tuple_header::HeaderSize+pos[DD]+1;
+
+  Uint32 mm_vars = regTabPtr->m_attributes[MM].m_no_of_varsize;
+  Uint32 mm_dyns = regTabPtr->m_attributes[MM].m_no_of_dyn_fix +
+                  regTabPtr->m_attributes[MM].m_no_of_dyn_var;
+  Uint32 dd_dyns = regTabPtr->m_attributes[DD].m_no_of_dyn_fix +
+                  regTabPtr->m_attributes[DD].m_no_of_dyn_var;
+
+  regTabPtr->m_offsets[MM].m_max_var_offset = var_size;
+  /*
+    Size of the expanded dynamic part. Needs room for bitmap, (N+1) 16-bit
+    offset words with 32-bit padding, and all attribute data.
+  */
+  regTabPtr->m_offsets[MM].m_max_dyn_offset =
+    (regTabPtr->m_offsets[MM].m_dyn_null_words<<2) +
+    4*((mm_dyns+2)>>1) + dyn_size[MM];
+
+  regTabPtr->m_offsets[DD].m_max_dyn_offset =
+    (regTabPtr->m_offsets[DD].m_dyn_null_words<<2) +
+    4*((dd_dyns+2)>>1) + dyn_size[DD];
+
+  /* Room for data for all the attributes. */
+  Uint32 total_rec_size =
+    pos[MM] + fix_size +
+    ((var_size + 3) >> 2) + ((dyn_size[MM] + 3) >> 2) +
+    regTabPtr->m_offsets[DD].m_fix_header_size +
+    ((dyn_size[DD] + 3) >> 2);
+
+  if(dd_dyns)
+  {
+    regTabPtr->m_offsets[DD].m_fix_header_size +=
+      (regTabPtr->m_offsets[DD].m_max_dyn_offset+3)>>2;
+
+    regTabPtr->m_offsets[DD].m_fix_header_size += 1; // space for dynlen
+  }
+
+  /*
+    Room for offset arrays and dynamic bitmaps. There is one extra 16-bit
+    offset in each offset array (for easy computation of final length).
+    Also one word for storing total length of varsize+dynamic part
+  */
+  if(mm_vars + regTabPtr->m_attributes[MM].m_no_of_dynamic)
+  {
+    total_rec_size += (mm_vars + 2) >> 1;
+    total_rec_size += regTabPtr->m_offsets[MM].m_dyn_null_words;
+    total_rec_size += (mm_dyns + 2) >> 1;
+    total_rec_size += 1;
+  }
+
+  if (dd_dyns)
+  {
+    total_rec_size += regTabPtr->m_offsets[DD].m_dyn_null_words;
+    total_rec_size += (dd_dyns + 2) >> 1;
+    total_rec_size += 2;
+  }
+
+  /* Room for the header. */
+  total_rec_size += Tuple_header::HeaderSize;
+  if(regTabPtr->m_no_of_disk_attributes)
+    total_rec_size += Tuple_header::HeaderSize;
+
+  /* Room for changemask */
+  total_rec_size += (regTabPtr->m_no_of_attributes + 31) >> 5;
+
+  regTabPtr->total_rec_size = total_rec_size;
+
+  setUpQueryRoutines(regTabPtr);
+  setUpKeyArray_dd_var(regTabPtr);
+}
+
+void Dbtup::setUpKeyArray_dd_var(Tablerec* const regTabPtr)
+{
+  ndbrequire((regTabPtr->readKeyArray + regTabPtr->noOfKeyAttr) <
+              cnoOfTabDescrRec);
+  Uint32* keyArray = &tableDescriptor[regTabPtr->readKeyArray].tabDescr;
+  Uint32 countKeyAttr = 0;
+  for (Uint32 i = 0; i < regTabPtr->m_no_of_attributes; i++) {
+    jam();
+    Uint32 refAttr = regTabPtr->tabDescriptor + (i * ZAD_SIZE);
+    Uint32 attrDescriptor = getTabDescrWord(refAttr);
+    if (AttributeDescriptor::getPrimaryKey(attrDescriptor)) {
+      jam();
+      AttributeHeader::init(&keyArray[countKeyAttr], i, 0);
+      countKeyAttr++;
+    }
+  }
+  ndbrequire(countKeyAttr == regTabPtr->noOfKeyAttr);
+
+  /*
+    Setup real order array (16 bit per column)
+
+    Sequence is [mm_fix mm_var mm_dynfix mm_dynvar dd_dynfix dd_dynvar]
+  */
+  const Uint32 off = regTabPtr->m_real_order_descriptor;
+  const Uint32 sz = (regTabPtr->m_no_of_attributes + 1) >> 1;
+  ndbrequire((off + sz) < cnoOfTabDescrRec);
+
+  Uint32 cnt = 0;
+  Uint16* order = (Uint16*)&tableDescriptor[off].tabDescr;
+  for (Uint32 type = 0; type < 6; type++)
+  {
+    for (Uint32 i = 0; i < regTabPtr->m_no_of_attributes; i++)
+    {
+      jam();
+      Uint32 refAttr = regTabPtr->tabDescriptor + (i * ZAD_SIZE);
+      Uint32 desc = getTabDescrWord(refAttr);
+      bool disk = AttributeDescriptor::getDiskBased(desc);
+      Uint32 t = 0;
+
+      if (AttributeDescriptor::getDynamic(desc) &&
+          AttributeDescriptor::getArrayType(desc) == NDB_ARRAYTYPE_FIXED &&
+          AttributeDescriptor::getSize(desc) == 0)
+      {
+        /*
+          Dynamic bit types are stored inside the dynamic NULL bitmap, and are
+          never expanded. So we do not need any real_order_descriptor for
+          them.
+        */
+        jam();
+        if((!disk && type == 0) || (disk && type == 4))
+          {
+            cnt++;
+            continue;
+          }
+      }
+      else {
+        if ((AttributeDescriptor::getArrayType(desc) != NDB_ARRAYTYPE_FIXED) ||
+            (AttributeDescriptor::getDynamic(desc) &&
+             AttributeDescriptor::getArrayType(desc) == NDB_ARRAYTYPE_FIXED &&
+             AttributeDescriptor::getSizeInWords(desc) > InternalMaxDynFix))
+        {
+	  t += 1;
+        }
+        if (AttributeDescriptor::getDynamic(desc) && !disk)
+        {
+	  t += 2;
+        }
+        if (disk)
+        {
+	  t += 4;
+        }
+        ndbrequire(t < 6);
+        if(t == type)
+        {
+	  * order++ = i << ZAD_LOG_SIZE;
+	  cnt++;
+        }
+      }
+    }
+  }
+  ndbrequire(cnt == regTabPtr->m_no_of_attributes);
+}

=== modified file 'storage/ndb/src/kernel/blocks/dbtup/DbtupRoutines.cpp'
--- a/storage/ndb/src/kernel/blocks/dbtup/DbtupRoutines.cpp	2009-10-27 12:08:44 +0000
+++ b/storage/ndb/src/kernel/blocks/dbtup/DbtupRoutines.cpp	2009-11-09 09:42:56 +0000
@@ -627,8 +627,11 @@ Dbtup::readFixedSizeTHZeroWordNULLable(U
 bool
 Dbtup::nullFlagCheck(KeyReqStruct *req_struct, Uint32  attrDes2)
 {
+  Uint32 ind =
+    (AttributeDescriptor::getDiskBased(req_struct->attr_descriptor)) ? DD : MM;
   Tablerec* const regTabPtr= tabptr.p;
-  Uint32 *bits= req_struct->m_tuple_ptr->get_null_bits(regTabPtr);
+  Uint32 *bits= (ind) ? req_struct->m_disk_ptr->get_null_bits(regTabPtr, DD) :
+                        req_struct->m_tuple_ptr->get_null_bits(regTabPtr);
   Uint32 pos= AttributeOffset::getNullFlagPos(attrDes2);
   
   return BitmaskImpl::get(regTabPtr->m_offsets[MM].m_null_words, bits, pos);
@@ -858,12 +861,16 @@ Dbtup::readDynFixedSizeExpandedNotNULL(U
     using different data base pointer and offset/lenght arrays.
   */
   jam();
-  char *src_ptr= req_struct->m_var_data[MM].m_dyn_data_ptr;
+  Uint32 attr_descriptor= req_struct->attr_descriptor;
+  Uint32 ind =
+    (AttributeDescriptor::getDiskBased(req_struct->attr_descriptor)) ? DD : MM;
+
+  char *src_ptr= req_struct->m_var_data[ind].m_dyn_data_ptr;
   Uint32 var_index= AttributeOffset::getOffset(attrDes2);
-  Uint16* off_arr= req_struct->m_var_data[MM].m_dyn_offset_arr_ptr;
+  Uint16* off_arr= req_struct->m_var_data[ind].m_dyn_offset_arr_ptr;
   Uint32 var_attr_pos= off_arr[var_index];
   Uint32 vsize_in_bytes=
-    AttributeDescriptor::getSizeInBytes(req_struct->attr_descriptor);
+    AttributeDescriptor::getSizeInBytes(attr_descriptor);
   return varsize_reader(outBuffer, req_struct, ahOut, attrDes2,
                         src_ptr + var_attr_pos, vsize_in_bytes);
 }
@@ -878,7 +885,11 @@ Dbtup::readDynFixedSizeExpandedNULLable(
     Check for NULL. In the expanded format, the bitmap is guaranteed
     to be stored in full length.
   */
-  Uint32 *src_ptr= (Uint32 *)(req_struct->m_var_data[MM].m_dyn_data_ptr);
+
+  Uint32 ind =
+    (AttributeDescriptor::getDiskBased(req_struct->attr_descriptor)) ? DD : MM;
+
+  Uint32 *src_ptr= (Uint32 *)(req_struct->m_var_data[ind].m_dyn_data_ptr);
   Uint32 pos = AttributeOffset::getNullFlagPos(attrDes2);
   if(!BitmaskImpl::get((* src_ptr) & DYN_BM_LEN_MASK, src_ptr, pos))
   {
@@ -897,8 +908,12 @@ Dbtup::readDynFixedSizeShrunkenNotNULL(U
                                        AttributeHeader* ahOut,
                                        Uint32  attrDes2)
 {
-  Uint32 *bm_ptr= (Uint32 *)(req_struct->m_var_data[MM].m_dyn_data_ptr);
-  Uint32 dyn_len= req_struct->m_var_data[MM].m_dyn_part_len;
+  Uint32 attr_descriptor= req_struct->attr_descriptor;
+  Uint32 ind =
+    (AttributeDescriptor::getDiskBased(req_struct->attr_descriptor)) ? DD : MM;
+
+  Uint32 *bm_ptr= (Uint32 *)(req_struct->m_var_data[ind].m_dyn_data_ptr);
+  Uint32 dyn_len= req_struct->m_var_data[ind].m_dyn_part_len;
   ndbrequire(dyn_len != 0);
   Uint32 bm_len= (* bm_ptr) & DYN_BM_LEN_MASK; // In 32-bit words
   Uint32 pos = AttributeOffset::getNullFlagPos(attrDes2);
@@ -918,7 +933,7 @@ Dbtup::readDynFixedSizeShrunkenNotNULL(U
   */
   jam();
   Tablerec* regTabPtr = tabptr.p;
-  Uint32 *bm_mask_ptr= regTabPtr->dynFixSizeMask;
+  Uint32 *bm_mask_ptr= regTabPtr->dynFixSizeMask[ind];
   Uint32 bm_pos= AttributeOffset::getNullFlagOffset(attrDes2);
   Uint32 prevMask= (1 << (pos & 31)) - 1;
   Uint32 bit_count= BitmaskImpl::count_bits(prevMask & bm_mask_ptr[bm_pos] & bm_ptr[bm_pos]);
@@ -926,7 +941,6 @@ Dbtup::readDynFixedSizeShrunkenNotNULL(U
     bit_count+= BitmaskImpl::count_bits(bm_mask_ptr[i] & bm_ptr[i]);
 
   /* Now compute the data pointer from the row length. */
-  Uint32 attr_descriptor= req_struct->attr_descriptor;
   Uint32 vsize_in_bytes= AttributeDescriptor::getSizeInBytes(attr_descriptor);
   Uint32 vsize_in_words= (vsize_in_bytes+3)>>2;
   Uint32 *data_ptr= bm_ptr + dyn_len - bit_count - vsize_in_words;
@@ -950,8 +964,11 @@ Dbtup::readDynFixedSizeShrunkenNULLable(
                                         AttributeHeader* ahOut,
                                         Uint32  attrDes2)
 {
-  Uint32 *bm_ptr= (Uint32 *)(req_struct->m_var_data[MM].m_dyn_data_ptr);
-  Uint32 dyn_len= req_struct->m_var_data[MM].m_dyn_part_len;
+  Uint32 ind =
+    (AttributeDescriptor::getDiskBased(req_struct->attr_descriptor)) ? DD : MM;
+
+  Uint32 *bm_ptr= (Uint32 *)(req_struct->m_var_data[ind].m_dyn_data_ptr);
+  Uint32 dyn_len= req_struct->m_var_data[ind].m_dyn_part_len;
   Uint32 pos = AttributeOffset::getNullFlagPos(attrDes2);
   /* Check for NULL (including the case of an empty bitmap). */
   if(dyn_len == 0 || dynCheckNull(dyn_len, (* bm_ptr) & DYN_BM_LEN_MASK,
@@ -1007,13 +1024,17 @@ Dbtup::readDynBigFixedSizeExpandedNotNUL
     using different data base pointer and offset/lenght arrays.
   */
   jam();
-  char *src_ptr= req_struct->m_var_data[MM].m_dyn_data_ptr;
+  Uint32 attr_descriptor= req_struct->attr_descriptor;
+  Uint32 ind =
+    (AttributeDescriptor::getDiskBased(req_struct->attr_descriptor)) ? DD : MM;
+
+  char *src_ptr= req_struct->m_var_data[ind].m_dyn_data_ptr;
   Uint32 var_index= AttributeOffset::getOffset(attrDes2);
-  Uint16* off_arr= req_struct->m_var_data[MM].m_dyn_offset_arr_ptr;
+  Uint16* off_arr= req_struct->m_var_data[ind].m_dyn_offset_arr_ptr;
   Uint32 var_attr_pos= off_arr[var_index];
   Uint32 vsize_in_bytes=
-    AttributeDescriptor::getSizeInBytes(req_struct->attr_descriptor);
-  Uint32 idx= req_struct->m_var_data[MM].m_dyn_len_offset;
+    AttributeDescriptor::getSizeInBytes(attr_descriptor);
+  Uint32 idx= req_struct->m_var_data[ind].m_dyn_len_offset;
   ndbrequire(vsize_in_bytes <= off_arr[var_index+idx] - var_attr_pos);
   return varsize_reader(outBuffer, req_struct, ahOut, attrDes2,
                         src_ptr + var_attr_pos, vsize_in_bytes);
@@ -1029,7 +1050,11 @@ Dbtup::readDynBigFixedSizeExpandedNULLab
     Check for NULL. In the expanded format, the bitmap is guaranteed
     to be stored in full length.
   */
-  Uint32 *src_ptr= (Uint32 *)(req_struct->m_var_data[MM].m_dyn_data_ptr);
+
+  Uint32 ind =
+    (AttributeDescriptor::getDiskBased(req_struct->attr_descriptor)) ? DD : MM;
+
+  Uint32 *src_ptr= (Uint32 *)(req_struct->m_var_data[ind].m_dyn_data_ptr);
   Uint32 pos = AttributeOffset::getNullFlagPos(attrDes2);
   if(!BitmaskImpl::get((* src_ptr) & DYN_BM_LEN_MASK, src_ptr, pos))
   {
@@ -1048,8 +1073,12 @@ Dbtup::readDynBigFixedSizeShrunkenNotNUL
                                           AttributeHeader* ahOut,
                                           Uint32  attrDes2)
 {
-  Uint32 *bm_ptr= (Uint32 *)(req_struct->m_var_data[MM].m_dyn_data_ptr);
-  Uint32 dyn_len= req_struct->m_var_data[MM].m_dyn_part_len;
+  Uint32 attr_descriptor= req_struct->attr_descriptor;
+  Uint32 ind =
+    (AttributeDescriptor::getDiskBased(req_struct->attr_descriptor)) ? DD : MM;
+
+  Uint32 *bm_ptr= (Uint32 *)(req_struct->m_var_data[ind].m_dyn_data_ptr);
+  Uint32 dyn_len= req_struct->m_var_data[ind].m_dyn_part_len;
   ndbrequire(dyn_len!=0);
   Uint32 bm_len = (* bm_ptr) & DYN_BM_LEN_MASK;
   Uint32 pos = AttributeOffset::getNullFlagPos(attrDes2);
@@ -1064,7 +1093,7 @@ Dbtup::readDynBigFixedSizeShrunkenNotNUL
     any trailing non-bitmap bytes to save a few conditionals.
   */
   Tablerec* regTabPtr = tabptr.p;
-  Uint32 *bm_mask_ptr= regTabPtr->dynVarSizeMask;
+  Uint32 *bm_mask_ptr= regTabPtr->dynVarSizeMask[ind];
   Uint32 bm_pos= AttributeOffset::getNullFlagOffset(attrDes2);
   Uint32 prevMask= (1 << (pos & 31)) - 1;
   Uint32 bit_count= BitmaskImpl::count_bits(prevMask & bm_mask_ptr[bm_pos] & bm_ptr[bm_pos]);
@@ -1072,7 +1101,6 @@ Dbtup::readDynBigFixedSizeShrunkenNotNUL
     bit_count+= BitmaskImpl::count_bits(bm_mask_ptr[i] & bm_ptr[i]);
 
   /* Now find the data pointer and length from the offset array. */
-  Uint32 attr_descriptor= req_struct->attr_descriptor;
   Uint32 vsize_in_bytes= AttributeDescriptor::getSizeInBytes(attr_descriptor);
   //Uint16 *offset_array= req_struct->m_var_data[MM].m_dyn_offset_arr_ptr;
   Uint16* offset_array = (Uint16*)(bm_ptr + bm_len);
@@ -1094,8 +1122,11 @@ Dbtup::readDynBigFixedSizeShrunkenNULLab
                                            AttributeHeader* ahOut,
                                            Uint32  attrDes2)
 {
-  Uint32 *bm_ptr= (Uint32 *)(req_struct->m_var_data[MM].m_dyn_data_ptr);
-  Uint32 dyn_len= req_struct->m_var_data[MM].m_dyn_part_len;
+  Uint32 ind =
+    (AttributeDescriptor::getDiskBased(req_struct->attr_descriptor)) ? DD : MM;
+
+  Uint32 *bm_ptr= (Uint32 *)(req_struct->m_var_data[ind].m_dyn_data_ptr);
+  Uint32 dyn_len= req_struct->m_var_data[ind].m_dyn_part_len;
   /* Check for NULL (including the case of an empty bitmap). */
   Uint32 pos = AttributeOffset::getNullFlagPos(attrDes2);
   if(dyn_len == 0 || dynCheckNull(dyn_len, (* bm_ptr) & DYN_BM_LEN_MASK,
@@ -1142,12 +1173,17 @@ Dbtup::readDynBitsShrunkenNotNULL(Uint8*
                                   AttributeHeader* ahOut,
                                   Uint32 attrDes2)
 {
-  Uint32 *bm_ptr= (Uint32 *)(req_struct->m_var_data[MM].m_dyn_data_ptr);
-  Uint32 dyn_len= req_struct->m_var_data[MM].m_dyn_part_len;
+  Uint32 attr_descriptor= req_struct->attr_descriptor;
+
+  Uint32 ind =
+    (AttributeDescriptor::getDiskBased(req_struct->attr_descriptor)) ? DD : MM;
+
+  Uint32 *bm_ptr= (Uint32 *)(req_struct->m_var_data[ind].m_dyn_data_ptr);
+  Uint32 dyn_len= req_struct->m_var_data[ind].m_dyn_part_len;
   ndbrequire(dyn_len != 0);
   Uint32 bm_len = (* bm_ptr) & DYN_BM_LEN_MASK;
   Uint32 bitCount =
-    AttributeDescriptor::getArraySize(req_struct->attr_descriptor);
+    AttributeDescriptor::getArraySize(attr_descriptor);
   Uint32 pos = AttributeOffset::getNullFlagPos(attrDes2);
   /* Make sure we have sufficient data in the row. */
   ndbrequire((pos>>5)<bm_len);
@@ -1166,8 +1202,11 @@ Dbtup::readDynBitsShrunkenNULLable(Uint8
                                    AttributeHeader* ahOut,
                                    Uint32 attrDes2)
 {
-  Uint32 *bm_ptr= (Uint32 *)(req_struct->m_var_data[MM].m_dyn_data_ptr);
-  Uint32 dyn_len= req_struct->m_var_data[MM].m_dyn_part_len;
+  Uint32 ind =
+    (AttributeDescriptor::getDiskBased(req_struct->attr_descriptor)) ? DD : MM;
+
+  Uint32 *bm_ptr= (Uint32 *)(req_struct->m_var_data[ind].m_dyn_data_ptr);
+  Uint32 dyn_len= req_struct->m_var_data[ind].m_dyn_part_len;
   /* Check for NULL (including the case of an empty bitmap). */
   Uint32 pos = AttributeOffset::getNullFlagPos(attrDes2);
   if(dyn_len == 0 || dynCheckNull(dyn_len, (* bm_ptr) & DYN_BM_LEN_MASK,
@@ -1187,10 +1226,15 @@ Dbtup::readDynBitsExpandedNotNULL(Uint8*
                                   AttributeHeader* ahOut,
                                   Uint32 attrDes2)
 {
-  Uint32 *bm_ptr= (Uint32 *)(req_struct->m_var_data[MM].m_dyn_data_ptr);
+  Uint32 attr_descriptor= req_struct->attr_descriptor;
+
+  Uint32 ind =
+    (AttributeDescriptor::getDiskBased(req_struct->attr_descriptor)) ? DD : MM;
+
+  Uint32 *bm_ptr= (Uint32 *)(req_struct->m_var_data[ind].m_dyn_data_ptr);
   Uint32 bm_len = (* bm_ptr) & DYN_BM_LEN_MASK;
   Uint32 bitCount =
-    AttributeDescriptor::getArraySize(req_struct->attr_descriptor);
+    AttributeDescriptor::getArraySize(attr_descriptor);
   Uint32 pos = AttributeOffset::getNullFlagPos(attrDes2);
   /* The bit data is stored just before the NULL bit. */
   ndbassert(pos>bitCount);
@@ -1207,7 +1251,11 @@ Dbtup::readDynBitsExpandedNULLable(Uint8
                                    AttributeHeader* ahOut,
                                    Uint32 attrDes2)
 {
-  Uint32 *bm_ptr= (Uint32 *)(req_struct->m_var_data[MM].m_dyn_data_ptr);
+
+  Uint32 ind =
+    (AttributeDescriptor::getDiskBased(req_struct->attr_descriptor)) ? DD : MM;
+
+  Uint32 *bm_ptr= (Uint32 *)(req_struct->m_var_data[ind].m_dyn_data_ptr);
   Uint32 pos = AttributeOffset::getNullFlagPos(attrDes2);
   if(!BitmaskImpl::get((* bm_ptr) & DYN_BM_LEN_MASK, bm_ptr, pos))
   {
@@ -1260,11 +1308,14 @@ Dbtup::readDynVarSizeExpandedNotNULL(Uin
     using different data base pointer and offset/lenght arrays.
   */
   jam();
-  char *src_ptr= req_struct->m_var_data[MM].m_dyn_data_ptr;
+  Uint32 ind =
+    (AttributeDescriptor::getDiskBased(req_struct->attr_descriptor)) ? DD : MM;
+
+  char *src_ptr= req_struct->m_var_data[ind].m_dyn_data_ptr;
   Uint32 var_index= AttributeOffset::getOffset(attrDes2);
-  Uint16* off_arr= req_struct->m_var_data[MM].m_dyn_offset_arr_ptr;
+  Uint16* off_arr= req_struct->m_var_data[ind].m_dyn_offset_arr_ptr;
   Uint32 var_attr_pos= off_arr[var_index];
-  Uint32 idx= req_struct->m_var_data[MM].m_dyn_len_offset;
+  Uint32 idx= req_struct->m_var_data[ind].m_dyn_len_offset;
   Uint32 vsize_in_bytes= off_arr[var_index+idx] - var_attr_pos;
   return varsize_reader(outBuffer, req_struct, ahOut, attrDes2,
                         src_ptr + var_attr_pos, vsize_in_bytes);
@@ -1280,7 +1331,11 @@ Dbtup::readDynVarSizeExpandedNULLable(Ui
     Check for NULL. In the expanded format, the bitmap is guaranteed
     to be stored in full length.
   */
-  Uint32 *src_ptr= (Uint32 *)(req_struct->m_var_data[MM].m_dyn_data_ptr);
+
+  Uint32 ind =
+    (AttributeDescriptor::getDiskBased(req_struct->attr_descriptor)) ? DD : MM;
+
+  Uint32 *src_ptr= (Uint32 *)(req_struct->m_var_data[ind].m_dyn_data_ptr);
   Uint32 pos = AttributeOffset::getNullFlagPos(attrDes2);
   if(!BitmaskImpl::get((* src_ptr) & DYN_BM_LEN_MASK, src_ptr, pos))
   {
@@ -1299,8 +1354,11 @@ Dbtup::readDynVarSizeShrunkenNotNULL(Uin
                                      AttributeHeader* ahOut,
                                      Uint32  attrDes2)
 {
-  Uint32 *bm_ptr= (Uint32 *)(req_struct->m_var_data[MM].m_dyn_data_ptr);
-  Uint32 dyn_len= req_struct->m_var_data[MM].m_dyn_part_len;
+  Uint32 ind =
+    (AttributeDescriptor::getDiskBased(req_struct->attr_descriptor)) ? DD : MM;
+
+  Uint32 *bm_ptr= (Uint32 *)(req_struct->m_var_data[ind].m_dyn_data_ptr);
+  Uint32 dyn_len= req_struct->m_var_data[ind].m_dyn_part_len;
   ndbrequire(dyn_len!=0);
   Uint32 bm_len = (* bm_ptr) & DYN_BM_LEN_MASK;
   Uint32 pos = AttributeOffset::getNullFlagPos(attrDes2);
@@ -1315,7 +1373,7 @@ Dbtup::readDynVarSizeShrunkenNotNULL(Uin
     any trailing non-bitmap bytes to save a few conditionals.
   */
   Tablerec* regTabPtr = tabptr.p;
-  Uint32 *bm_mask_ptr= regTabPtr->dynVarSizeMask;
+  Uint32 *bm_mask_ptr= regTabPtr->dynVarSizeMask[ind];
   Uint32 bm_pos= AttributeOffset::getNullFlagOffset(attrDes2);
   Uint32 prevMask= (1 << (pos & 31)) - 1;
   Uint32 bit_count= BitmaskImpl::count_bits(prevMask & bm_mask_ptr[bm_pos] & bm_ptr[bm_pos]);
@@ -1343,8 +1401,11 @@ Dbtup::readDynVarSizeShrunkenNULLable(Ui
                                       AttributeHeader* ahOut,
                                       Uint32  attrDes2)
 {
-  Uint32 *bm_ptr= (Uint32 *)(req_struct->m_var_data[MM].m_dyn_data_ptr);
-  Uint32 dyn_len= req_struct->m_var_data[MM].m_dyn_part_len;
+  Uint32 ind =
+    (AttributeDescriptor::getDiskBased(req_struct->attr_descriptor)) ? DD : MM;
+
+  Uint32 *bm_ptr= (Uint32 *)(req_struct->m_var_data[ind].m_dyn_data_ptr);
+  Uint32 dyn_len= req_struct->m_var_data[ind].m_dyn_part_len;
   /* Check for NULL (including the case of an empty bitmap). */
   Uint32 pos = AttributeOffset::getNullFlagPos(attrDes2);
   if(dyn_len == 0 || dynCheckNull(dyn_len, (* bm_ptr) & DYN_BM_LEN_MASK,
@@ -2037,10 +2098,13 @@ Dbtup::updateDynFixedSizeNotNULL(Uint32*
                                  Uint32  attrDes2)
 {
   Uint32 attrDescriptor= req_struct->attr_descriptor;
+  Uint32 ind =
+    (AttributeDescriptor::getDiskBased(req_struct->attr_descriptor)) ? DD : MM;
+
   Uint32 pos= AttributeOffset::getNullFlagPos(attrDes2);
   Uint32 nullbits= AttributeDescriptor::getSizeInWords(attrDescriptor);
-  Uint32 *bm_ptr= (Uint32 *)(req_struct->m_var_data[MM].m_dyn_data_ptr);
 
+  Uint32 *bm_ptr= (Uint32 *)(req_struct->m_var_data[ind].m_dyn_data_ptr);
   ndbassert(nullbits && nullbits <= 16);
 
   /*
@@ -2068,10 +2132,10 @@ Dbtup::updateDynFixedSizeNotNULL(Uint32*
 
   /* Compute the data and offset location and write the actual data. */
   Uint32 off_index= AttributeOffset::getOffset(attrDes2);
-  Uint16* off_arr= req_struct->m_var_data[MM].m_dyn_offset_arr_ptr;
+  Uint16* off_arr= req_struct->m_var_data[ind].m_dyn_offset_arr_ptr;
   Uint32 offset= off_arr[off_index];
-  Uint32 *dst_ptr= (Uint32 *)(req_struct->m_var_data[MM].m_dyn_data_ptr);
-  Uint32 check_offset= req_struct->m_var_data[MM].m_max_dyn_offset;
+  Uint32 *dst_ptr= (Uint32 *)(req_struct->m_var_data[ind].m_dyn_data_ptr);
+  Uint32 check_offset= req_struct->m_var_data[ind].m_max_dyn_offset;
 
   ndbassert((offset&3)==0);
   ndbassert((check_offset&3)==0);
@@ -2092,9 +2156,12 @@ Dbtup::updateDynFixedSizeNULLable(Uint32
     return updateDynFixedSizeNotNULL(inBuffer, req_struct, attrDes2);
 
   Uint32 attrDescriptor= req_struct->attr_descriptor;
+  Uint32 ind =
+    (AttributeDescriptor::getDiskBased(req_struct->attr_descriptor)) ? DD : MM;
+
   Uint32 pos= AttributeOffset::getNullFlagPos(attrDes2);
   Uint32 nullbits= AttributeDescriptor::getSizeInWords(attrDescriptor);
-  Uint32 *bm_ptr= (Uint32 *)(req_struct->m_var_data[MM].m_dyn_data_ptr);
+  Uint32 *bm_ptr= (Uint32 *)(req_struct->m_var_data[ind].m_dyn_data_ptr);
 
   ndbassert(nullbits && nullbits <= 16);
 
@@ -2133,17 +2200,20 @@ Dbtup::updateDynBigFixedSizeNotNULL(Uint
                                   Uint32  attrDes2)
 {
   Uint32 attrDescriptor= req_struct->attr_descriptor;
+  Uint32 ind =
+    (AttributeDescriptor::getDiskBased(req_struct->attr_descriptor)) ? DD : MM;
+
   Uint32 pos= AttributeOffset::getNullFlagPos(attrDes2);
-  Uint32 *bm_ptr= (Uint32 *)(req_struct->m_var_data[MM].m_dyn_data_ptr);
+  Uint32 *bm_ptr= (Uint32 *)(req_struct->m_var_data[ind].m_dyn_data_ptr);
   
   jam();
   BitmaskImpl::set((* bm_ptr) & DYN_BM_LEN_MASK, bm_ptr, pos);
   /* Compute the data and offset location and write the actual data. */
   Uint32 off_index= AttributeOffset::getOffset(attrDes2);
   Uint32 noOfWords= AttributeDescriptor::getSizeInWords(attrDescriptor);
-  Uint16* off_arr= req_struct->m_var_data[MM].m_dyn_offset_arr_ptr;
+  Uint16* off_arr= req_struct->m_var_data[ind].m_dyn_offset_arr_ptr;
   Uint32 offset= off_arr[off_index];
-  Uint32 idx= req_struct->m_var_data[MM].m_dyn_len_offset;
+  Uint32 idx= req_struct->m_var_data[ind].m_dyn_len_offset;
 
   ndbassert((offset&3)==0);
   bool res= fixsize_updater(inBuffer,
@@ -2151,7 +2221,7 @@ Dbtup::updateDynBigFixedSizeNotNULL(Uint
                             attrDes2,
                             bm_ptr,
                             offset>>2,
-                            req_struct->m_var_data[MM].m_max_dyn_offset);
+                            req_struct->m_var_data[ind].m_max_dyn_offset);
   /* Set the correct size for fixsize data. */
   off_arr[off_index+idx]= offset+(noOfWords<<2);
   return res;
@@ -2162,10 +2232,13 @@ Dbtup::updateDynBigFixedSizeNULLable(Uin
                                    KeyReqStruct *req_struct,
                                    Uint32  attrDes2)
 {
+  Uint32 ind =
+    (AttributeDescriptor::getDiskBased(req_struct->attr_descriptor)) ? DD : MM;
+
   AttributeHeader ahIn(inBuffer[req_struct->in_buf_index]);
   Uint32 nullIndicator= ahIn.isNULL();
   Uint32 pos= AttributeOffset::getNullFlagPos(attrDes2);
-  Uint32 *bm_ptr= (Uint32*)req_struct->m_var_data[MM].m_dyn_data_ptr;
+  Uint32 *bm_ptr= (Uint32*)req_struct->m_var_data[ind].m_dyn_data_ptr;
   
   if (!nullIndicator)
     return updateDynBigFixedSizeNotNULL(inBuffer, req_struct, attrDes2);
@@ -2189,9 +2262,12 @@ Dbtup::updateDynBitsNotNULL(Uint32* inBu
                             Uint32  attrDes2)
 {
   Uint32 attrDescriptor= req_struct->attr_descriptor;
+  Uint32 ind =
+    (AttributeDescriptor::getDiskBased(req_struct->attr_descriptor)) ? DD : MM;
+
   Uint32 pos= AttributeOffset::getNullFlagPos(attrDes2);
   Uint32 bitCount = AttributeDescriptor::getArraySize(attrDescriptor);
-  Uint32 *bm_ptr= (Uint32 *)(req_struct->m_var_data[MM].m_dyn_data_ptr);
+  Uint32 *bm_ptr= (Uint32 *)(req_struct->m_var_data[ind].m_dyn_data_ptr);
   Uint32 bm_len = (* bm_ptr) & DYN_BM_LEN_MASK;
   jam();
   BitmaskImpl::set(bm_len, bm_ptr, pos);
@@ -2227,6 +2303,9 @@ Dbtup::updateDynBitsNULLable(Uint32* inB
                              KeyReqStruct *req_struct,
                              Uint32  attrDes2)
 {
+  Uint32 ind =
+    (AttributeDescriptor::getDiskBased(req_struct->attr_descriptor)) ? DD : MM;
+
   AttributeHeader ahIn(inBuffer[req_struct->in_buf_index]);
   Uint32 nullIndicator= ahIn.isNULL();
 
@@ -2234,7 +2313,7 @@ Dbtup::updateDynBitsNULLable(Uint32* inB
     return updateDynBitsNotNULL(inBuffer, req_struct, attrDes2);
 
   Uint32 pos= AttributeOffset::getNullFlagPos(attrDes2);
-  Uint32 *bm_ptr= (Uint32*)req_struct->m_var_data[MM].m_dyn_data_ptr;
+  Uint32 *bm_ptr= (Uint32*)req_struct->m_var_data[ind].m_dyn_data_ptr;
 
   Uint32 newIndex= req_struct->in_buf_index + 1;
   if (newIndex <= req_struct->in_buf_len) {
@@ -2254,23 +2333,26 @@ Dbtup::updateDynVarSizeNotNULL(Uint32* i
                                KeyReqStruct *req_struct,
                                Uint32  attrDes2)
 {
+  Uint32 ind =
+    (AttributeDescriptor::getDiskBased(req_struct->attr_descriptor)) ? DD : MM;
+
   Uint32 pos= AttributeOffset::getNullFlagPos(attrDes2);
-  Uint32 *bm_ptr= (Uint32*)req_struct->m_var_data[MM].m_dyn_data_ptr;
+  Uint32 *bm_ptr= (Uint32*)req_struct->m_var_data[ind].m_dyn_data_ptr;
   
   jam();
   BitmaskImpl::set((* bm_ptr) & DYN_BM_LEN_MASK, bm_ptr, pos);
   /* Compute the data and offset location and write the actual data. */
   Uint32 off_index= AttributeOffset::getOffset(attrDes2);
-  Uint16* off_arr= req_struct->m_var_data[MM].m_dyn_offset_arr_ptr;
+  Uint16* off_arr= req_struct->m_var_data[ind].m_dyn_offset_arr_ptr;
   Uint32 offset= off_arr[off_index];
-  Uint32 idx= req_struct->m_var_data[MM].m_dyn_len_offset;
+  Uint32 idx= req_struct->m_var_data[ind].m_dyn_len_offset;
 
   bool res= varsize_updater(inBuffer,
                             req_struct,
                             (char*)bm_ptr,
                             offset,
                             &(off_arr[off_index+idx]),
-                            req_struct->m_var_data[MM].m_max_dyn_offset);
+                            req_struct->m_var_data[ind].m_max_dyn_offset);
   return res;
 }
 
@@ -2279,10 +2361,13 @@ Dbtup::updateDynVarSizeNULLable(Uint32* 
                                 KeyReqStruct *req_struct,
                                 Uint32  attrDes2)
 {
+  Uint32 ind =
+    (AttributeDescriptor::getDiskBased(req_struct->attr_descriptor)) ? DD : MM;
+
   AttributeHeader ahIn(inBuffer[req_struct->in_buf_index]);
   Uint32 nullIndicator= ahIn.isNULL();
   Uint32 pos= AttributeOffset::getNullFlagPos(attrDes2);
-  Uint32 *bm_ptr= (Uint32*)req_struct->m_var_data[MM].m_dyn_data_ptr;
+  Uint32 *bm_ptr= (Uint32*)req_struct->m_var_data[ind].m_dyn_data_ptr;
   
   if (!nullIndicator)
     return updateDynVarSizeNotNULL(inBuffer, req_struct, attrDes2);

=== modified file 'storage/ndb/src/kernel/blocks/dbtup/DbtupScan.cpp'
--- a/storage/ndb/src/kernel/blocks/dbtup/DbtupScan.cpp	2009-10-20 16:10:06 +0000
+++ b/storage/ndb/src/kernel/blocks/dbtup/DbtupScan.cpp	2009-11-09 09:42:56 +0000
@@ -86,11 +86,14 @@ Dbtup::execACC_SCANREQ(Signal* signal)
     if ((tablePtr.p->m_attributes[mm].m_no_of_varsize +
          tablePtr.p->m_attributes[mm].m_no_of_dynamic) > 0) 
     {
-      bits |= ScanOp::SCAN_VS;
-	
-      // disk pages have fixed page format
-      ndbrequire(! (bits & ScanOp::SCAN_DD));
+      if (bits & ScanOp::SCAN_DD)
+      {
+        // only dd scan varsize pages
+        // mm always has a fixed part
+        bits |= ScanOp::SCAN_VS;
+      }
     }
+
     if (! AccScanReq::getReadCommittedFlag(req->requestInfo)) 
     {
       if (AccScanReq::getLockMode(req->requestInfo) == 0)
@@ -619,7 +622,7 @@ Dbtup::scanFirst(Signal*, ScanOpPtr scan
     key.m_page_no = ext->m_first_page_no;
     pos.m_get = ScanPos::Get_page_dd;
   }
-  key.m_page_idx = 0;
+  key.m_page_idx = ((bits & ScanOp::SCAN_VS) == 0) ? 0 : 1;
   // let scanNext() do the work
   scan.m_state = ScanOp::Next;
 }
@@ -652,7 +655,9 @@ Dbtup::scanNext(Signal* signal, ScanOpPt
   const bool lcp = (bits & ScanOp::SCAN_LCP);
   
   Uint32 lcp_list = fragPtr.p->m_lcp_keep_list;
-  Uint32 size = table.m_offsets[mm].m_fix_header_size;
+  const Uint32 size = ((bits & ScanOp::SCAN_VS) == 0) ?
+    table.m_offsets[mm].m_fix_header_size : 1;
+  const Uint32 first = ((bits & ScanOp::SCAN_VS) == 0) ? 0 : 1;
 
   if (lcp && lcp_list != RNIL)
   {
@@ -729,7 +734,7 @@ Dbtup::scanNext(Signal* signal, ScanOpPt
           return true;
         }
     cont:
-        key.m_page_idx = 0;
+        key.m_page_idx = first;
         pos.m_get = ScanPos::Get_page_mm;
         // clear cached value
         pos.m_realpid_mm = RNIL;
@@ -791,7 +796,7 @@ Dbtup::scanNext(Signal* signal, ScanOpPt
             key.m_page_no = ext->m_first_page_no;
           }
         }
-        key.m_page_idx = 0;
+        key.m_page_idx = first;
         pos.m_get = ScanPos::Get_page_dd;
         /*
           read ahead for scan in disk order
@@ -919,62 +924,89 @@ Dbtup::scanNext(Signal* signal, ScanOpPt
       // get fixed size tuple
       jam();
       {
-        Fix_page* page = (Fix_page*)pos.m_page;
-        if (key.m_page_idx + size <= Fix_page::DATA_WORDS) 
-	{
-	  pos.m_get = ScanPos::Get_next_tuple_fs;
-#ifdef VM_TRACE
-          if (! (bits & ScanOp::SCAN_DD))
+        if ((bits & ScanOp::SCAN_VS) == 0)
+        {
+          Fix_page* page = (Fix_page*)pos.m_page;
+          if (key.m_page_idx + size <= Fix_page::DATA_WORDS)
           {
-            Uint32 realpid = getRealpidCheck(fragPtr.p, key.m_page_no);
-            ndbassert(pos.m_realpid_mm == realpid);
+            jam();
+            pos.m_get = ScanPos::Get_next_tuple_fs;
+            th = (Tuple_header*)&page->m_data[key.m_page_idx];
           }
+          else
+          {
+            jam();
+            // no more tuples on this page
+            pos.m_get = ScanPos::Get_next_page;
+            break;
+          }
+        }
+        else
+        {
+          Var_page * page = (Var_page*)pos.m_page;
+          if (key.m_page_idx < page->high_index)
+          {
+            jam();
+            pos.m_get = ScanPos::Get_next_tuple_fs;
+            if (((Var_page*)page)->is_free(key.m_page_idx))
+             continue;
+            th = (Tuple_header*)page->get_ptr(key.m_page_idx);
+          }
+          else
+          {
+            jam();
+            // no more tuples on this page
+            pos.m_get = ScanPos::Get_next_page;
+            break;
+          }
+        }
+
+#ifdef VM_TRACE
+        if (! (bits & ScanOp::SCAN_DD))
+        {
+          Uint32 realpid = getRealpidCheck(fragPtr.p, key.m_page_no);
+          ndbassert(pos.m_realpid_mm == realpid);
+        }
 #endif
-          th = (Tuple_header*)&page->m_data[key.m_page_idx];
-	  
-	  if (likely(! (bits & ScanOp::SCAN_NR)))
-	  {
-	    jam();
-            thbits = th->m_header_bits;
-	    if (! (thbits & Tuple_header::FREE))
-	    {
-              goto found_tuple;
-	    } 
-	  }
-	  else
-	  {
-            if (pos.m_realpid_mm == RNIL)
+
+        if (likely(! (bits & ScanOp::SCAN_NR)))
+        {
+          jam();
+          thbits = th->m_header_bits;
+          if (! (thbits & Tuple_header::FREE))
+          {
+            goto found_tuple;
+          }
+        }
+        else
+        {
+          if (pos.m_realpid_mm == RNIL)
+          {
+            jam();
+            foundGCI = 0;
+            goto found_deleted_rowid;
+          }
+          thbits = th->m_header_bits;
+          if ((foundGCI = *th->get_mm_gci(tablePtr.p)) > scanGCI ||
+              foundGCI == 0)
+          {
+            if (! (thbits & Tuple_header::FREE))
             {
               jam();
-              foundGCI = 0;
+              goto found_tuple;
+            }
+            else
+            {
               goto found_deleted_rowid;
             }
-            thbits = th->m_header_bits;
-	    if ((foundGCI = *th->get_mm_gci(tablePtr.p)) > scanGCI ||
-                foundGCI == 0)
-	    {
-	      if (! (thbits & Tuple_header::FREE))
-	      {
-		jam();
-		goto found_tuple;
-	      }
-	      else
-	      {
-		goto found_deleted_rowid;
-	      }
-	    }
-	    else if (thbits != Fix_page::FREE_RECORD && 
-		     th->m_operation_ptr_i != RNIL)
-	    {
-	      jam();
-	      goto found_tuple; // Locked tuple...
-	      // skip free tuple
-	    }
-	  }
-        } else {
-          jam();
-          // no more tuples on this page
-          pos.m_get = ScanPos::Get_next_page;
+          }
+          else if (thbits != Fix_page::FREE_RECORD &&
+                   th->m_operation_ptr_i != RNIL)
+          {
+            jam();
+            goto found_tuple; // Locked tuple...
+            // skip free tuple
+          }
         }
       }
       break; // incr loop count

=== modified file 'storage/ndb/src/kernel/blocks/dbtup/DbtupTabDesMan.cpp'
--- a/storage/ndb/src/kernel/blocks/dbtup/DbtupTabDesMan.cpp	2009-09-16 10:52:41 +0000
+++ b/storage/ndb/src/kernel/blocks/dbtup/DbtupTabDesMan.cpp	2009-11-09 09:42:56 +0000
@@ -344,9 +344,9 @@ Dbtup::verifytabdes()
       }
       {
         Uint32 offset[3];
-        Uint32 MaskSize = (ptr.p->m_dyn_null_bits + 31) >> 5;
+        Uint32 MaskSize = (ptr.p->m_dyn_null_bits[MM] + 31) >> 5;
         const Uint32 alloc = getDynTabDescrOffsets(MaskSize, offset);
-        const Uint32 desc = ptr.p->dynTabDescriptor;
+        const Uint32 desc = ptr.p->dynTabDescriptor[MM];
         Uint32 size = alloc;
         if (size % ZTD_FREE_SIZE != 0)
           size += ZTD_FREE_SIZE - size % ZTD_FREE_SIZE;

=== modified file 'storage/ndb/src/kernel/blocks/dbtup/tuppage.hpp'
--- a/storage/ndb/src/kernel/blocks/dbtup/tuppage.hpp	2009-10-08 11:41:21 +0000
+++ b/storage/ndb/src/kernel/blocks/dbtup/tuppage.hpp	2009-11-09 09:42:56 +0000
@@ -255,6 +255,11 @@ struct Tup_varsize_page
   Uint32 get_entry_chain(Uint32 page_idx) const {
     return get_index_word(page_idx) & CHAIN;
   }
+
+  bool is_free(Uint32 page_idx)
+  {
+    return (get_index_word(page_idx) & FREE) ? true : false;
+  }
 };
 
 NdbOut& operator<< (NdbOut& out, const Tup_varsize_page& page);


Attachment: [text/bzr-bundle] bzr/msabaratnam@mysql.com-20091109094256-jtp2ly3tsmqc3ri9.bundle
Thread
bzr push into mysql-5.1-telco-7.0 branch (msabaratnam:3196 to 3197) Maitrayi Sabaratnam9 Nov