List:Commits« Previous MessageNext Message »
From:jonas Date:April 3 2007 12:34pm
Subject:bk commit into 5.1 tree (jonas:1.2512)
View as plain text  
Below is the list of changes that have just been committed into a local
5.1 repository of jonas. When jonas does a push these changes will
be propagated to the main repository and, within 24 hours after the
push, to the public repository.
For information on how to access the public repository
see http://dev.mysql.com/doc/mysql/en/installing-source-tree.html

ChangeSet@stripped, 2007-04-03 14:34:21+02:00, jonas@stripped +30 -0
  ndb - wl1190
    dynamic attribute + online add column

  storage/ndb/include/kernel/AttributeDescriptor.hpp@stripped, 2007-04-03 14:34:18+02:00, jonas@stripped +6 -0
    ndb - wl1190
      dynamic attribute + online add column

  storage/ndb/include/kernel/signaldata/AlterTab.hpp@stripped, 2007-04-03 14:34:18+02:00, jonas@stripped +27 -3
    ndb - wl1190
      dynamic attribute + online add column

  storage/ndb/include/kernel/signaldata/AlterTable.hpp@stripped, 2007-04-03 14:34:18+02:00, jonas@stripped +18 -1
    ndb - wl1190
      dynamic attribute + online add column

  storage/ndb/include/kernel/signaldata/DictTabInfo.hpp@stripped, 2007-04-03 14:34:18+02:00, jonas@stripped +2 -0
    ndb - wl1190
      dynamic attribute + online add column

  storage/ndb/include/ndbapi/NdbDictionary.hpp@stripped, 2007-04-03 14:34:18+02:00, jonas@stripped +13 -2
    ndb - wl1190
      dynamic attribute + online add column

  storage/ndb/include/util/UtilBuffer.hpp@stripped, 2007-04-03 14:34:18+02:00, jonas@stripped +9 -0
    ndb - wl1190
      dynamic attribute + online add column

  storage/ndb/src/common/debugger/signaldata/DictTabInfo.cpp@stripped, 2007-04-03 14:34:18+02:00, jonas@stripped +2 -0
    ndb - wl1190
      dynamic attribute + online add column

  storage/ndb/src/kernel/blocks/dbdict/Dbdict.cpp@stripped, 2007-04-03 14:34:18+02:00, jonas@stripped +225 -45
    ndb - wl1190
      dynamic attribute + online add column

  storage/ndb/src/kernel/blocks/dbdict/Dbdict.hpp@stripped, 2007-04-03 14:34:18+02:00, jonas@stripped +2 -0
    ndb - wl1190
      dynamic attribute + online add column

  storage/ndb/src/kernel/blocks/dbtup/AttributeOffset.hpp@stripped, 2007-04-03 14:34:18+02:00, jonas@stripped +13 -2
    ndb - wl1190
      dynamic attribute + online add column

  storage/ndb/src/kernel/blocks/dbtup/Dbtup.hpp@stripped, 2007-04-03 14:34:18+02:00, jonas@stripped +340 -25
    ndb - wl1190
      dynamic attribute + online add column

  storage/ndb/src/kernel/blocks/dbtup/DbtupAbort.cpp@stripped, 2007-04-03 14:34:18+02:00, jonas@stripped +27 -19
    ndb - wl1190
      dynamic attribute + online add column

  storage/ndb/src/kernel/blocks/dbtup/DbtupCommit.cpp@stripped, 2007-04-03 14:34:18+02:00, jonas@stripped +40 -11
    ndb - wl1190
      dynamic attribute + online add column

  storage/ndb/src/kernel/blocks/dbtup/DbtupExecQuery.cpp@stripped, 2007-04-03 14:34:18+02:00, jonas@stripped +567 -59
    ndb - wl1190
      dynamic attribute + online add column

  storage/ndb/src/kernel/blocks/dbtup/DbtupGen.cpp@stripped, 2007-04-03 14:34:18+02:00, jonas@stripped +30 -0
    ndb - wl1190
      dynamic attribute + online add column

  storage/ndb/src/kernel/blocks/dbtup/DbtupIndex.cpp@stripped, 2007-04-03 14:34:18+02:00, jonas@stripped +2 -1
    ndb - wl1190
      dynamic attribute + online add column

  storage/ndb/src/kernel/blocks/dbtup/DbtupMeta.cpp@stripped, 2007-04-03 14:34:18+02:00, jonas@stripped +715 -138
    ndb - wl1190
      dynamic attribute + online add column

  storage/ndb/src/kernel/blocks/dbtup/DbtupRoutines.cpp@stripped, 2007-04-03 14:34:18+02:00, jonas@stripped +953 -68
    ndb - wl1190
      dynamic attribute + online add column

  storage/ndb/src/kernel/blocks/dbtup/DbtupScan.cpp@stripped, 2007-04-03 14:34:18+02:00, jonas@stripped +6 -3
    ndb - wl1190
      dynamic attribute + online add column

  storage/ndb/src/kernel/blocks/dbtup/DbtupTabDesMan.cpp@stripped, 2007-04-03 14:34:18+02:00, jonas@stripped +26 -10
    ndb - wl1190
      dynamic attribute + online add column

  storage/ndb/src/kernel/blocks/dbtup/DbtupVarAlloc.cpp@stripped, 2007-04-03 14:34:18+02:00, jonas@stripped +11 -21
    ndb - wl1190
      dynamic attribute + online add column

  storage/ndb/src/kernel/blocks/dbtup/test_dynbm.c@stripped, 2007-04-03 14:34:19+02:00, jonas@stripped +228 -0
    New BitKeeper file ``storage/ndb/src/kernel/blocks/dbtup/test_dynbm.c''

  storage/ndb/src/kernel/blocks/dbtup/test_dynbm.c@stripped, 2007-04-03 14:34:19+02:00, jonas@stripped +0 -0

  storage/ndb/src/kernel/vm/DLFifoList.hpp@stripped, 2007-04-03 14:34:18+02:00, jonas@stripped +2 -2
    ndb - wl1190
      dynamic attribute + online add column

  storage/ndb/src/ndbapi/NdbDictionary.cpp@stripped, 2007-04-03 14:34:18+02:00, jonas@stripped +16 -2
    ndb - wl1190
      dynamic attribute + online add column

  storage/ndb/src/ndbapi/NdbDictionaryImpl.cpp@stripped, 2007-04-03 14:34:18+02:00, jonas@stripped +238 -219
    ndb - wl1190
      dynamic attribute + online add column

  storage/ndb/src/ndbapi/NdbDictionaryImpl.hpp@stripped, 2007-04-03 14:34:19+02:00, jonas@stripped +16 -11
    ndb - wl1190
      dynamic attribute + online add column

  storage/ndb/src/ndbapi/TransporterFacade.cpp@stripped, 2007-04-03 14:34:19+02:00, jonas@stripped +1 -1
    ndb - wl1190
      dynamic attribute + online add column

  storage/ndb/test/include/NDBT_Table.hpp@stripped, 2007-04-03 14:34:19+02:00, jonas@stripped +3 -1
    ndb - wl1190
      dynamic attribute + online add column

  storage/ndb/test/ndbapi/testDict.cpp@stripped, 2007-04-03 14:34:19+02:00, jonas@stripped +226 -4
    ndb - wl1190
      dynamic attribute + online add column

  storage/ndb/test/src/NDBT_Tables.cpp@stripped, 2007-04-03 14:34:19+02:00, jonas@stripped +89 -0
    ndb - wl1190
      dynamic attribute + online add column

# This is a BitKeeper patch.  What follows are the unified diffs for the
# set of deltas contained in the patch.  The rest of the patch, the part
# that BitKeeper cares about, is below these diffs.
# User:	jonas
# Host:	perch.ndb.mysql.com
# Root:	/home/jonas/src/check/51-telco

--- 1.8/storage/ndb/src/kernel/blocks/dbtup/DbtupVarAlloc.cpp	2007-04-03 14:34:27 +02:00
+++ 1.9/storage/ndb/src/kernel/blocks/dbtup/DbtupVarAlloc.cpp	2007-04-03 14:34:27 +02:00
@@ -76,13 +76,6 @@
   {
     return 0;
   }
-
-  ndbassert(alloc_size >= tabPtr->m_offsets[MM].m_fix_header_size + 
-	    Tuple_header::HeaderSize);
-  
-  alloc_size -= tabPtr->m_offsets[MM].m_fix_header_size + 
-    Tuple_header::HeaderSize;
-
   
   Local_key varref;
   if (likely(alloc_var_part(fragPtr, tabPtr, alloc_size, &varref) != 0))
@@ -191,11 +184,12 @@
   return;
 }
 
-int
+Uint32 *
 Dbtup::realloc_var_part(Fragrecord* fragPtr, Tablerec* tabPtr, PagePtr pagePtr,
 			Var_part_ref* refptr, Uint32 oldsz, Uint32 newsz)
 {
   Uint32 add = newsz - oldsz;
+  Uint32 *new_var_ptr;
   Var_page* pageP = (Var_page*)pagePtr.p;
   Local_key oldref;
   refptr->copyout(&oldref);
@@ -203,6 +197,7 @@
   if (pageP->free_space >= add)
   {
     jam();
+    new_var_ptr= pageP->get_ptr(oldref.m_page_idx);
     if(!pageP->is_space_behind_entry(oldref.m_page_idx, add))
     {
       if(0) printf("extra reorg");
@@ -215,11 +210,12 @@
        * the page before reorg_page to save the entry contents.
        */
       Uint32* copyBuffer= cinBuffer;
-      memcpy(copyBuffer, pageP->get_ptr(oldref.m_page_idx), 4*oldsz);
+      memcpy(copyBuffer, new_var_ptr, 4*oldsz);
       pageP->set_entry_len(oldref.m_page_idx, 0);
       pageP->free_space += oldsz;
       pageP->reorg((Var_page*)ctemp_page);
-      memcpy(pageP->get_free_space_ptr(), copyBuffer, 4*oldsz);
+      new_var_ptr= pageP->get_free_space_ptr();
+      memcpy(new_var_ptr, copyBuffer, 4*oldsz);
       pageP->set_entry_offset(oldref.m_page_idx, pageP->insert_pos);
       add += oldsz;
     }
@@ -230,20 +226,20 @@
   {
     Local_key newref;
     Uint32 *src = pageP->get_ptr(oldref.m_page_idx);
-    Uint32 *dst = alloc_var_part(fragPtr, tabPtr, newsz, &newref);
-    if (unlikely(dst == 0))
-      return -1;
+    new_var_ptr = alloc_var_part(fragPtr, tabPtr, newsz, &newref);
+    if (unlikely(new_var_ptr == 0))
+      return NULL;
 
     ndbassert(oldref.m_page_no != newref.m_page_no);
     ndbassert(pageP->get_entry_len(oldref.m_page_idx) == oldsz);
-    memcpy(dst, src, 4*oldsz);
+    memcpy(new_var_ptr, src, 4*oldsz);
     refptr->assign(&newref);
     
     pageP->free_record(oldref.m_page_idx, Var_page::CHAIN);
     update_free_page_list(fragPtr, pagePtr);    
   }
   
-  return 0;
+  return new_var_ptr;
 }
 
 
@@ -406,12 +402,6 @@
   {
     return 0;
   }
-
-  ndbassert(alloc_size >= tabPtr->m_offsets[MM].m_fix_header_size + 
-	    Tuple_header::HeaderSize);
-  
-  alloc_size -= tabPtr->m_offsets[MM].m_fix_header_size + 
-    Tuple_header::HeaderSize;
 
   Local_key varref;
   if (likely(alloc_var_part(fragPtr, tabPtr, alloc_size, &varref) != 0))
--- New file ---
+++ storage/ndb/src/kernel/blocks/dbtup/test_dynbm.c	07/04/03 14:34:19
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <stdint.h>
#include <assert.h>

#define N (1024*1024)
#define S 65537
/* The number S must be relative prime to N. */

uint32_t bm[N*4];

uint32_t bms[N][4];
uint32_t len[N];
uint32_t pos[N];

typedef uint32_t Uint32;
#define MEMCOPY_NO_WORDS(to, from, no_of_words) \
  memcpy((to), (void*)(from), (size_t)(no_of_words << 2));

/****************************************************************************/
static void
getbits(const Uint32 *src, Uint32 bit_pos, Uint32 *dst, Uint32 count)
{
  Uint32 val;

  /* Move to start word in src. */
  src+= bit_pos>>5;
  bit_pos&= 31;

  /*
    If word-aligned, copy word-for-word is faster and avoids edge
    cases with undefined bitshift operations.
  */
  if (bit_pos==0)
  {
    MEMCOPY_NO_WORDS(dst, src, count>>5);
    src+= count>>5;
    dst+= count>>5;
    count&= 31;
  }
  else
  {
    while(count >= 32)
    {
      /*
        Get bits 0-X from first source word.
        Get bits (X+1)-31 from second source word.
        Handle endian so that we store bit 0 in the first byte, and bit 31 in
        the last byte, so that we don't waste space on 32-bit aligning the
        bitmap.
      */
#ifdef WORDS_BIGENDIAN
      Uint32 firstpart_len= 32-bit_pos;
      val= *src++ & (((Uint32)1<<firstpart_len)-1);
      val|= *src & ((Uint32)0xffffffff << firstpart_len);
#else
      val= *src++ >> bit_pos;
      val|= *src << (32-bit_pos);
#endif
      *dst++= val;
      count-= 32;
    }
  }

  /* Handle any partial word at the end. */
  if (count>0)
  {
    if (bit_pos+count <= 32)
    {
      /* Last part is wholly contained in one source word. */
#ifdef WORDS_BIGENDIAN
      val= *src >> (32-(bit_pos+count));
#else
      val= *src >> bit_pos;
#endif
    }
    else
    {
      /* Need to assemble last part from two source words. */
#ifdef WORDS_BIGENDIAN
      Uint32 firstpart_len= 32-bit_pos;
      val= *src++ & (((Uint32)1<<firstpart_len)-1);
      val|= (*src >> (32-count)) & ((Uint32)0xffffffff << firstpart_len);
#else
      val= *src++ >> bit_pos;
      val|= *src << (32-bit_pos);
#endif
    }
    /* Mask off any unused bits. */
    *dst= val & (((Uint32)1<<count)-1);
  }
}

static void
setbits(const Uint32 *src, Uint32 *dst, Uint32 bit_pos, Uint32 count)
{
  Uint32 val;

  /* Move to start word in dst. */

  dst+= bit_pos>>5;
  bit_pos&= 31;

#ifdef WORDS_BIGENDIAN
  Uint32 low_mask= ((Uint32)0xffffffff)<<(32-bit_pos);
  Uint32 high_mask= ~low_mask;
#else
  Uint32 low_mask= (((Uint32)1)<<bit_pos) - 1;
  Uint32 high_mask= ~low_mask;
#endif

  if (bit_pos==0)
  {
    MEMCOPY_NO_WORDS(dst, src, count>>5);
    src+= count>>5;
    dst+= count>>5;
    count&= 31;
  }
  else
  {
    while (count >= 32)
    {
      val= *src++;
#ifdef WORDS_BIGENDIAN
      *dst= (*dst&low_mask) | (val&high_mask);
      dst++;
      *dst= (*dst&high_mask) | (val&low_mask);
#else
      *dst= (*dst&low_mask) | (val<<bit_pos);
      dst++;
      *dst= (*dst&high_mask) | (val>>(32-bit_pos));
#endif
      count-= 32;
    }
  }

  /* Handle any partial word at the end. */
  if (count > 0)
  {
    val= *src;
    if (bit_pos+count <= 32)
    {
      /* Remaining part fits in one word of destination. */
      Uint32 end_mask= (((Uint32)1)<<count) - 1;
#ifdef WORDS_BIGENDIAN
      Uint32 shift= (32-(bit_pos+count));
      *dst= (*dst&~(end_mask<<shift)) | ((val&end_mask)<<shift);
#else
      *dst= (*dst&~(end_mask<<bit_pos)) | ((val&end_mask)<<bit_pos);
#endif
    }
    else
    {
      /* Need to split the remaining part across two destination words. */
#ifdef WORDS_BIGENDIAN
      *dst= (*dst&low_mask) | (val&high_mask);
      dst++;
      Uint32 shift= 32-count;
      Uint32 end_mask= ((((Uint32)1)<<(bit_pos+count-32)) - 1) << (32-bit_pos);
      *dst= (*dst&~(end_mask<<shift)) | ((val&end_mask)<<shift);
#else
      *dst= (*dst&low_mask) | (val<<bit_pos);
      dst++;
      Uint32 end_mask= (((Uint32)1)<<(count+bit_pos-32)) - 1;
      *dst= (*dst&~end_mask) | ((val>>(32-bit_pos))&end_mask);
#endif
    }
  }
}
/****************************************************************************/

/* Set up a bunch of test bit fields. */
void fill(void)
{
  uint32_t i,j;
  uint32_t p= 0;

  for(i= 0; i<N; i++)
  {
    memset(bms[i], 0, sizeof(bms[i]));
    pos[i]= p;
    do
      len[i]= rand()%128;
    while (!len[i]);
    p+= len[i];
    for(j= 0; j<len[i]; j++)
      if(rand()%2)
        bms[i][j>>5]|= (((uint32_t)1)<<(j&31));
  }
}

void write(void)
{
  uint32_t i, idx;

  for(i=0, idx=0; i<N; i++, idx+= S)
  {
    if(idx>=N)
      idx-= N;
    setbits(&(bms[idx][0]), &(bm[0]), pos[idx], len[idx]);
  }
}

void read(void)
{
  uint32_t buf[4];
  uint32_t i;

  for(i=0; i<N; i++)
  {
    getbits(&(bm[0]), pos[i], &(buf[0]), len[i]);
    assert(0==memcmp(buf, bms[i], ((len[i]+31)>>5)<<2));
  }
}

int main(int argc, char *argv[])
{
  uint32_t i;

  srand(1);
  fill();
  write();
  read();

  exit(0);
  return 0;
}


--- 1.10/storage/ndb/include/kernel/AttributeDescriptor.hpp	2007-04-03 14:34:27 +02:00
+++ 1.11/storage/ndb/include/kernel/AttributeDescriptor.hpp	2007-04-03 14:34:27 +02:00
@@ -56,12 +56,18 @@
  * a = Array type            - 2  Bits -> Max 3  (Bit 0-1)
  * t = Attribute type        - 5  Bits -> Max 31  (Bit 2-6)
  * s = Attribute size        - 3  Bits -> Max 7  (Bit 8-10)
+ *                                0 is for bit types, stored in bitmap
+ *                                1-2 unused
+ *                                3 for byte-sized (char...)
+ *                                4 for 16-bit sized
+ *                                etc.
  * d = Disk based            - 1  Bit 11
  * n = Nullable              - 1  Bit 12
  * k = Distribution Key Ind  - 1  Bit 13
  * p = Primary key attribute - 1  Bit 14
  * y = Dynamic attribute     - 1  Bit 15
  * z = Array size            - 16 Bits -> Max 65535 (Bit 16-31)
+ *                                Element size is determined by attribute size
  *
  *           1111111111222222222233
  * 01234567890123456789012345678901

--- 1.5/storage/ndb/include/kernel/signaldata/AlterTab.hpp	2007-04-03 14:34:27 +02:00
+++ 1.6/storage/ndb/include/kernel/signaldata/AlterTab.hpp	2007-04-03 14:34:27 +02:00
@@ -22,7 +22,7 @@
 /**
  * AlterTab
  *
- * Implemenatation of AlterTable
+ * Implemetation of AlterTable
  */
 class AlterTabReq {
   /**
@@ -33,6 +33,7 @@
   friend class Dbtc;
   friend class Dblqh;
   friend class Suma;
+  friend class Dbtup;
 
   /**
    * For printing
@@ -40,7 +41,7 @@
   friend bool printALTER_TAB_REQ(FILE*, const Uint32*, Uint32, Uint16);
   
 public:
-  STATIC_CONST( SignalLength = 9 );
+  STATIC_CONST( SignalLength = 12 );
   
   enum RequestType {
     AlterTablePrepare = 0, // Prepare alter table
@@ -59,7 +60,16 @@
   Uint32 gci;
   Uint32 requestType;
 
+  /* Only used when sending to TUP. */
+  Uint32 noOfNewAttr;
+  Uint32 newNoOfCharsets;
+  Uint32 newNoOfKeyAttrs;
+
   SECTION( DICT_TAB_INFO = 0 );
+  /*
+    When sent to DICT, the first section contains the new table definition.
+    When sent to TUP, the first section contains the new attributes.
+  */
 };
 
 struct AlterTabRef {
@@ -110,7 +120,7 @@
   friend bool printALTER_TAB_CONF(FILE *, const Uint32 *, Uint32, Uint16);
   
 public:
-  STATIC_CONST( SignalLength = 7 );
+  STATIC_CONST( SignalLength = 8 );
 
 private:
   Uint32 senderRef;
@@ -120,6 +130,20 @@
   Uint32 tableVersion;
   Uint32 gci;
   Uint32 requestType;
+
+  /* Only used when sent from TUP. */
+  Uint32 clientData;
+};
+
+/*
+  This union can be used to safely refer to a signal data part
+  simultaneously as AlterTab{Req,Ref,Conf} without violating the
+  strict aliasing rule.
+*/
+union AlterTabAll {
+  AlterTabReq req;
+  AlterTabRef ref;
+  AlterTabConf conf;
 };
 
 #endif

--- 1.12/storage/ndb/include/kernel/signaldata/AlterTable.hpp	2007-04-03 14:34:27 +02:00
+++ 1.13/storage/ndb/include/kernel/signaldata/AlterTable.hpp	2007-04-03 14:34:27 +02:00
@@ -38,6 +38,7 @@
   friend class NdbEventOperationImpl;
   friend class NdbDictInterface;
   friend class Dbdict;
+  friend class Dbtup;
   friend class Suma;
 
   /**
@@ -68,10 +69,11 @@
   r = Changed range or list array
   t = Changed tablespace name array
   s = Changed tablespace id array
+  a = Add attribute
 
            1111111111222222222233
  01234567890123456789012345678901
- nf------------------------------
+ nfdrtsa-------------------------
 */
 #define NAME_SHIFT        (0)
 #define FRM_SHIFT         (1)
@@ -79,6 +81,7 @@
 #define RANGE_LIST_SHIFT  (3)
 #define TS_NAME_SHIFT     (4)
 #define TS_SHIFT          (5)
+#define ADD_ATTR_SHIFT    (6)
 
  /**
    * Getters and setters
@@ -95,6 +98,8 @@
   static void setTsNameFlag(UintR &  changeMask, Uint32 tsNameFlg);
   static Uint8 getTsFlag(const UintR & changeMask);
   static void setTsFlag(UintR &  changeMask, Uint32 tsFlg);
+  static Uint8 getAddAttrFlag(const UintR & changeMask);
+  static void setAddAttrFlag(UintR &  changeMask, Uint32 tsFlg);
 };
 
 inline
@@ -167,6 +172,18 @@
 void
 AlterTableReq::setTsNameFlag(UintR & changeMask, Uint32 tsNameFlg){
   changeMask |= (tsNameFlg << TS_NAME_SHIFT);
+}
+
+inline
+Uint8
+AlterTableReq::getAddAttrFlag(const UintR & changeMask){
+  return (Uint8)((changeMask >> ADD_ATTR_SHIFT) & 1);
+}
+
+inline
+void
+AlterTableReq::setAddAttrFlag(UintR & changeMask, Uint32 addAttrFlg){
+  changeMask |= (addAttrFlg << ADD_ATTR_SHIFT);
 }
 
 

--- 1.38/storage/ndb/include/kernel/signaldata/DictTabInfo.hpp	2007-04-03 14:34:27 +02:00
+++ 1.39/storage/ndb/include/kernel/signaldata/DictTabInfo.hpp	2007-04-03 14:34:27 +02:00
@@ -152,6 +152,7 @@
     AttributeKeyFlag       = 1006, //Default noKey
     AttributeStorageType   = 1007, //Default NDB_STORAGETYPE_MEMORY
     AttributeNullableFlag  = 1008, //Default NotNullable
+    AttributeDynamic       = 1009, //Default not dynamic
     AttributeDKey          = 1010, //Default NotDKey
     AttributeExtType       = 1013, //Default ExtUnsigned
     AttributeExtPrecision  = 1014, //Default 0
@@ -410,6 +411,7 @@
     Uint32 AttributeExtLength;
     Uint32 AttributeAutoIncrement;
     Uint32 AttributeStorageType;
+    Uint32 AttributeDynamic;
     char   AttributeDefaultValue[MAX_ATTR_DEFAULT_VALUE_SIZE];
     
     Attribute() {}

--- 1.90/storage/ndb/include/ndbapi/NdbDictionary.hpp	2007-04-03 14:34:27 +02:00
+++ 1.91/storage/ndb/include/ndbapi/NdbDictionary.hpp	2007-04-03 14:34:27 +02:00
@@ -402,6 +402,11 @@
     ArrayType getArrayType() const;
     StorageType getStorageType() const;
 
+    /**
+     * Get if the column is dynamic (NULL values not stored)
+     */
+    bool getDynamic() const;
+
     /** @} *******************************************************************/
 
 
@@ -513,6 +518,11 @@
     void setArrayType(ArrayType type);
     void setStorageType(StorageType type);
 
+    /**
+     * Set whether column is dynamic.
+     */
+    void setDynamic(bool);
+
     /** @} *******************************************************************/
 
 #ifndef DOXYGEN_SHOULD_SKIP_INTERNAL
@@ -1812,12 +1822,13 @@
 #ifndef DOXYGEN_SHOULD_SKIP_INTERNAL
     /**
      * Alter defined table given defined Table instance
-     * @param table Table to alter
+     * @param f Table to alter
+     * @param t New definition of table
      * @return  -2 (incompatible version) <br>
      *          -1 general error          <br>
      *           0 success                 
      */
-    int alterTable(const Table &table);
+    int alterTable(const Table & f, const Table & t);
 
     /**
      * Invalidate cached table object

--- 1.8/storage/ndb/include/util/UtilBuffer.hpp	2007-04-03 14:34:27 +02:00
+++ 1.9/storage/ndb/include/util/UtilBuffer.hpp	2007-04-03 14:34:27 +02:00
@@ -92,6 +92,15 @@
   void *get_data() const { return data; }
 
   bool empty () const { return len == 0; }
+
+  bool equal(const UtilBuffer &cmp) const {
+    if(len==0 && cmp.len==0)
+      return true;
+    else if(len!=cmp.len)
+      return false;
+    else
+      return (memcmp(get_data(), cmp.get_data(), len) == 0);
+  }
 private:
   void *data;          /* Pointer to data storage */
   size_t len;          /* Size of the stored data */

--- 1.21/storage/ndb/src/common/debugger/signaldata/DictTabInfo.cpp	2007-04-03 14:34:27 +02:00
+++ 1.22/storage/ndb/src/common/debugger/signaldata/DictTabInfo.cpp	2007-04-03 14:34:27 +02:00
@@ -88,6 +88,7 @@
   DTIMAP2(Attribute, AttributeNullableFlag, AttributeNullableFlag, 0, 1),
   DTIMAP2(Attribute, AttributeDKey, AttributeDKey, 0, 1),
   DTIMAP2(Attribute, AttributeStorageType, AttributeStorageType, 0, 1),
+  DTIMAP2(Attribute, AttributeDynamic, AttributeDynamic, 0, 1),
   DTIMAP(Attribute, AttributeExtType, AttributeExtType),
   DTIMAP(Attribute, AttributeExtPrecision, AttributeExtPrecision),
   DTIMAP(Attribute, AttributeExtScale, AttributeExtScale),
@@ -186,6 +187,7 @@
   AttributeExtLength = 0,
   AttributeAutoIncrement = false;
   AttributeStorageType = 0;
+  AttributeDynamic = 0;                         // Default is not dynamic
   memset(AttributeDefaultValue, 0, sizeof(AttributeDefaultValue));//AttributeDefaultValue[0] = 0;
 }
 

--- 1.125/storage/ndb/src/kernel/blocks/dbdict/Dbdict.cpp	2007-04-03 14:34:27 +02:00
+++ 1.126/storage/ndb/src/kernel/blocks/dbdict/Dbdict.cpp	2007-04-03 14:34:27 +02:00
@@ -556,6 +556,7 @@
     const Uint32 nullable = AttributeDescriptor::getNullable(desc);
     const Uint32 DKey = AttributeDescriptor::getDKey(desc);
     const Uint32 disk= AttributeDescriptor::getDiskBased(desc);
+    const Uint32 dynamic= AttributeDescriptor::getDynamic(desc);
     
 
     // AttributeType deprecated
@@ -563,6 +564,7 @@
     w.add(DictTabInfo::AttributeArraySize, arraySize);
     w.add(DictTabInfo::AttributeArrayType, arrayType);
     w.add(DictTabInfo::AttributeNullableFlag, nullable);
+    w.add(DictTabInfo::AttributeDynamic, dynamic);
     w.add(DictTabInfo::AttributeDKey, DKey);
     w.add(DictTabInfo::AttributeExtType, attrType);
     w.add(DictTabInfo::AttributeExtPrecision, attrPtr.p->extPrecision);
@@ -4038,6 +4040,7 @@
   alterTabPtr.p->m_tablePtrI = parseRecord.tablePtr.i;
   alterTabPtr.p->m_alterTableFailed = false;
   alterTabPtr.p->m_coordinatorRef = reference();
+  alterTabPtr.p->m_tupAlterTabPtr= RNIL;
   alterTabPtr.p->m_fragmentsPtrI = RNIL;
   alterTabPtr.p->m_dihAddFragPtr = RNIL;
   alterTabPtr.p->m_alterTableId = tablePtr.p->tableId;
@@ -4277,16 +4280,20 @@
       alterTabPtr.p->m_senderRef = senderRef;
       alterTabPtr.p->m_senderData = senderData;
       alterTabPtr.p->m_tablePtrI = parseRecord.tablePtr.i;
+      alterTabPtr.p->m_tupAlterTabPtr= RNIL;
       alterTabPtr.p->m_fragmentsPtrI = RNIL;
       alterTabPtr.p->m_dihAddFragPtr = RNIL;
       newTablePtr = parseRecord.tablePtr;
       newTablePtr.p->tableVersion = tableVersion;
+      ndbrequire(newTablePtr.p->noOfAttributes >= tablePtr.p->noOfAttributes);
     }
     else { // (req->senderRef  == reference())
       jam();
       c_tableRecordPool.getPtr(newTablePtr, alterTabPtr.p->m_tablePtrI);
       newTablePtr.p->tableVersion = tableVersion;
     }
+    Uint32 oldNoOfAttributes= tablePtr.p->noOfAttributes;
+    alterTabPtr.p->m_new_cols= newTablePtr.p->noOfAttributes-oldNoOfAttributes;
     if (handleAlterTab(req, alterTabPtr.p, tablePtr, newTablePtr) == -1) {
       jam();
       c_opCreateTable.release(alterTabPtr);
@@ -4294,69 +4301,96 @@
       return;
     }
     releaseSections(signal);
+
     // Propagate alter table to other local blocks
-    AlterTabReq * req = (AlterTabReq*)signal->getDataPtrSend();
-    req->senderRef = reference();
-    req->senderData = senderData;
-    req->changeMask = changeMask;
+    // First to TUP, which is the only one that needs info about added attrs
+
+    /* Copy out descriptor and charset for any new attributes. */
+    Uint32 *attrData= &(signal->theData[0])+25;
+    Uint32 *p= attrData;
+    LocalDLFifoList<AttributeRecord> list(c_attributeRecordPool, 
+                                          tablePtr.p->m_attributes);
+    AttributeRecordPtr attrPtr;
+    list.first(attrPtr);
+    for(Uint32 i= 0; i<tablePtr.p->noOfAttributes; i++)
+    {
+      if (i >= oldNoOfAttributes)
+      {
+        *p++= attrPtr.p->attributeDescriptor;
+        *p++= attrPtr.p->extPrecision & ~0xFFFF;
+      }
+      list.next(attrPtr);
+    }
+
+    AlterTabReq *req= (AlterTabReq *)signal->getDataPtrSend();
+    req->senderRef= reference();
+    req->changeMask= changeMask;
+    req->senderData= senderData;
     req->tableId = tableId;
     req->tableVersion = tableVersion;
     req->gci = gci;
     req->requestType = requestType;
-    sendSignal(DBLQH_REF, GSN_ALTER_TAB_REQ, signal, 
-	       AlterTabReq::SignalLength, JBB);	
+    req->noOfNewAttr= alterTabPtr.p->m_new_cols;
+    req->newNoOfCharsets= newTablePtr.p->noOfCharsets;
+    req->newNoOfKeyAttrs= newTablePtr.p->noOfPrimkey;
+
+    if (req->noOfNewAttr>0)
+    {
+      ndbrequire(AlterTableReq::getAddAttrFlag(changeMask));
+      /* Send long signal with info for all attributes to TUP. */
+      LinearSectionPtr ptr[3];
+      ptr[0].p= attrData;
+      ptr[0].sz= 2*req->noOfNewAttr;
+      sendSignal(DBTUP_REF, GSN_ALTER_TAB_REQ, signal,
+                 AlterTabReq::SignalLength, JBB, ptr, 1);
+    }
+    else
+    {
+      ndbrequire(!AlterTableReq::getAddAttrFlag(changeMask));
+      /* No linear section since no attributes to send. */
+      sendSignal(DBTUP_REF, GSN_ALTER_TAB_REQ, signal,
+                 AlterTabReq::SignalLength, JBB);
+    }
     return;
   }
   case(AlterTabReq::AlterTableCommit): {
     jam();
-    // Write schema for altered table to disk
+
     SegmentedSectionPtr tabInfoPtr;
     signal->getSection(tabInfoPtr, AlterTabReq::DICT_TAB_INFO);
     alterTabPtr.p->m_tabInfoPtrI = tabInfoPtr.i;
-    bool savetodisk = !(tablePtr.p->m_bits & TableRecord::TR_Temporary);
-    
     signal->header.m_noOfSections = 0;
 
-    // Update table record
-    tablePtr.p->packedSize = tabInfoPtr.sz;
-    tablePtr.p->tableVersion = tableVersion;
-    tablePtr.p->gciTableCreated = gci;
+    /* Send alter table commit to TUP. */
+    AlterTabReq *req= (AlterTabReq *)signal->getDataPtrSend();
+    req->senderRef= reference();
+    req->changeMask= changeMask;
+    req->senderData= senderData;
+    req->clientData= alterTabPtr.p->m_tupAlterTabPtr;
+    req->tableId = tableId;
+    req->tableVersion = tableVersion;
+    req->gci = gci;
+    req->requestType = requestType;
+    sendSignal(DBTUP_REF, GSN_ALTER_TAB_REQ, signal,
+               AlterTabReq::SignalLength, JBB);
+    return;
 
-    SchemaFile::TableEntry tabEntry;
-    tabEntry.m_tableVersion = tableVersion;
-    tabEntry.m_tableType    = tablePtr.p->tableType;
-    if (savetodisk)
-      tabEntry.m_tableState   = SchemaFile::ALTER_TABLE_COMMITTED;
-    else
-      tabEntry.m_tableState   = SchemaFile::TEMPORARY_TABLE_COMMITTED;
-    tabEntry.m_gcp          = gci;
-    tabEntry.m_info_words   = tabInfoPtr.sz;
-    memset(tabEntry.m_unused, 0, sizeof(tabEntry.m_unused));
-    
-    Callback callback;
-    callback.m_callbackData = senderData;
-    callback.m_callbackFunction = 
-      safe_cast(&Dbdict::alterTab_writeSchemaConf);
-    
-    updateSchemaState(signal, tableId, &tabEntry, &callback, savetodisk);
-    break;
   }
   case(AlterTabReq::AlterTableRevert): {
     jam();
-    // Revert failed alter table
-    revertAlterTable(signal, changeMask, tableId, alterTabPtr.p);
-    // Acknowledge the reverted alter table
-    AlterTabConf * conf = (AlterTabConf*)signal->getDataPtrSend();
-    conf->senderRef = reference();
-    conf->senderData = senderData;
-    conf->changeMask = changeMask;
-    conf->tableId = tableId;
-    conf->tableVersion = tableVersion;
-    conf->gci = gci;
-    conf->requestType = requestType;
-    sendSignal(senderRef, GSN_ALTER_TAB_CONF, signal, 
-	       AlterTabConf::SignalLength, JBB);
-    break;
+    /* Send alter table abort to TUP. */
+    AlterTabReq *req= (AlterTabReq *)signal->getDataPtrSend();
+    req->senderRef= reference();
+    req->changeMask= changeMask;
+    req->senderData= senderData;
+    req->clientData= alterTabPtr.p->m_tupAlterTabPtr;
+    req->tableId = tableId;
+    req->tableVersion = tableVersion;
+    req->gci = gci;
+    req->requestType = requestType;
+    sendSignal(DBTUP_REF, GSN_ALTER_TAB_REQ, signal,
+               AlterTabReq::SignalLength, JBB);
+    return;
   }
   default: ndbrequire(false);
   }
@@ -4408,6 +4442,28 @@
     (AlterTabReq::RequestType) ref->requestType;
   CreateTableRecordPtr alterTabPtr;  
   ndbrequire(c_opCreateTable.find(alterTabPtr, senderData));
+
+  if(refToBlock(signal->getSendersBlockRef())==DBTUP)
+  {
+    /* Error from TUP causes abort of alter table. */
+    jam();
+
+    ref= (AlterTabRef*)signal->getDataPtrSend();
+    ref->senderData = senderData;
+    ref->senderRef = reference();
+    ref->errorCode = errorCode;
+    ref->errorLine = 0;
+    ref->errorKey = 0;
+    ref->errorStatus = 0;
+    sendSignal(alterTabPtr.p->m_senderRef, GSN_ALTER_TAB_REF, signal, 
+               AlterTabRef::SignalLength, JBB);
+  
+    c_blockState = BS_IDLE;
+
+    return;
+  }
+
+  /* Else it is coordinator receiving abort from local DICT alter table. */
   Uint32 changeMask = alterTabPtr.p->m_changeMask;
   SafeCounter safeCounter(c_counterMgr, alterTabPtr.p->m_coordinatorData.m_counter);
   safeCounter.clearWaitingFor(refToNode(senderRef));
@@ -4498,6 +4554,24 @@
   switch (requestType) {
   case(AlterTabReq::AlterTablePrepare): {
     switch(refToBlock(signal->getSendersBlockRef())) {
+    case DBTUP: {
+      jam();
+
+      /* Need to store TUP's operation record reference for commit/revert. */
+      alterTabPtr.p->m_tupAlterTabPtr= conf->clientData;
+
+      AlterTabReq * req = (AlterTabReq*)signal->getDataPtrSend();
+      req->senderRef = reference();
+      req->senderData = senderData;
+      req->changeMask = changeMask;
+      req->tableId = tableId;
+      req->tableVersion = tableVersion;
+      req->gci = gci;
+      req->requestType = requestType;
+      sendSignal(DBLQH_REF, GSN_ALTER_TAB_REQ, signal, 
+                 AlterTabReq::SignalLength, JBB);	
+      return;
+    }
     case DBLQH: {
       jam();
       AlterTabReq * req = (AlterTabReq*)signal->getDataPtrSend();
@@ -4619,7 +4693,66 @@
   }
   case(AlterTabReq::AlterTableRevert):
     jam();
+    if(refToBlock(signal->getSendersBlockRef())==DBTUP)
+    {
+      jam();
+      // Revert failed alter table
+      revertAlterTable(signal, changeMask, tableId, alterTabPtr.p);
+      // Acknowledge the reverted alter table
+      AlterTabConf * conf = (AlterTabConf*)signal->getDataPtrSend();
+      conf->senderRef = reference();
+      conf->senderData = senderData;
+      conf->changeMask = changeMask;
+      conf->tableId = tableId;
+      conf->tableVersion = tableVersion;
+      conf->gci = gci;
+      conf->requestType = requestType;
+      sendSignal(alterTabPtr.p->m_senderRef, GSN_ALTER_TAB_CONF, signal, 
+                 AlterTabConf::SignalLength, JBB);
+      return;
+    }
+    /* Else fall through to commit case... */
+
   case(AlterTabReq::AlterTableCommit): {
+    jam();
+    if(refToBlock(signal->getSendersBlockRef())==DBTUP)
+    {
+      jam();
+      // TUP alter tab commit done. Write schema for altered table to disk
+      SegmentedSectionPtr tabInfoPtr;
+      getSection(tabInfoPtr, alterTabPtr.p->m_tabInfoPtrI);
+      TableRecordPtr tablePtr;
+      c_tableRecordPool.getPtr(tablePtr, tableId, false);
+      ndbrequire(!tablePtr.isNull());
+
+      bool savetodisk = !(tablePtr.p->m_bits & TableRecord::TR_Temporary);
+
+      // Update table record
+      tablePtr.p->packedSize = tabInfoPtr.sz;
+      tablePtr.p->tableVersion = tableVersion;
+      tablePtr.p->gciTableCreated = gci;
+
+      SchemaFile::TableEntry tabEntry;
+      tabEntry.m_tableVersion = tableVersion;
+      tabEntry.m_tableType    = tablePtr.p->tableType;
+      if (savetodisk)
+        tabEntry.m_tableState   = SchemaFile::ALTER_TABLE_COMMITTED;
+      else
+        tabEntry.m_tableState   = SchemaFile::TEMPORARY_TABLE_COMMITTED;
+      tabEntry.m_gcp          = gci;
+      tabEntry.m_info_words   = tabInfoPtr.sz;
+      memset(tabEntry.m_unused, 0, sizeof(tabEntry.m_unused));
+    
+      Callback callback;
+      callback.m_callbackData = senderData;
+      callback.m_callbackFunction = 
+        safe_cast(&Dbdict::alterTab_writeSchemaConf);
+    
+      updateSchemaState(signal, tableId, &tabEntry, &callback, savetodisk);
+      return;
+    }
+
+    /* A confirm signal from local DICT nodes to coordinator DICT. */
     SafeCounter safeCounter(c_counterMgr, alterTabPtr.p->m_coordinatorData.m_counter);
     safeCounter.clearWaitingFor(refToNode(senderRef));
     if (safeCounter.done()) {
@@ -4730,6 +4863,35 @@
     ndbrequire(org.assign(tmp, src.size()));
   }
 
+  if (AlterTableReq::getAddAttrFlag(changeMask))
+  {
+    /* Attribute(s) added. */
+    supportedAlteration= true;
+
+    /* Move the column definitions to the real table definitions. */
+    LocalDLFifoList<AttributeRecord> origlist(c_attributeRecordPool, 
+                                              origTablePtr.p->m_attributes);
+    LocalDLFifoList<AttributeRecord> newlist(c_attributeRecordPool, 
+                                             newTablePtr.p->m_attributes);
+
+    /* Move back to find the first column to move. */
+    AttributeRecordPtr p;
+    ndbrequire(alterTabPtrP->m_new_cols>0);
+    ndbrequire(newlist.last(p));
+    for (Uint32 i= 1; i<alterTabPtrP->m_new_cols; i++)
+      ndbrequire(newlist.prev(p));
+
+    /* Move columns. */
+    for (Uint32 i= 0; i<alterTabPtrP->m_new_cols; i++)
+    {
+      AttributeRecordPtr q= p;
+      newlist.next(p);
+      newlist.remove(q);
+      origlist.addLast(q);
+    }
+    origTablePtr.p->noOfAttributes+= alterTabPtrP->m_new_cols;
+  }
+
 /*
 */
@@ -4788,6 +4950,23 @@
     
   }
   
+  if (AlterTableReq::getAddAttrFlag(changeMask))
+  {
+    /* Attribute(s) added. */
+    supportedAlteration= true;
+
+    /* Release the extra columns, not to be used anyway. */
+    LocalDLFifoList<AttributeRecord> list(c_attributeRecordPool, 
+                                          tablePtr.p->m_attributes);
+
+    ndbrequire(alterTabPtrP->m_new_cols>0);
+    for (Uint32 i= 0; i<alterTabPtrP->m_new_cols; i++)
+    {
+      AttributeRecordPtr p;
+      ndbrequire(list.last(p));
+      list.release(p);
+    }
+  }
 
   if (supportedAlteration)
   {
@@ -6351,6 +6530,7 @@
     AttributeDescriptor::setDKey(desc, attrDesc.AttributeDKey);
     AttributeDescriptor::setPrimaryKey(desc, attrDesc.AttributeKeyFlag);
     AttributeDescriptor::setDiskBased(desc, attrDesc.AttributeStorageType == NDB_STORAGETYPE_DISK);
+    AttributeDescriptor::setDynamic(desc, attrDesc.AttributeDynamic);
     attrPtr.p->attributeDescriptor = desc;
     attrPtr.p->autoIncrement = attrDesc.AttributeAutoIncrement;
     {

--- 1.54/storage/ndb/src/kernel/blocks/dbdict/Dbdict.hpp	2007-04-03 14:34:27 +02:00
+++ 1.55/storage/ndb/src/kernel/blocks/dbdict/Dbdict.hpp	2007-04-03 14:34:27 +02:00
@@ -1162,9 +1162,11 @@
 
     // For alter table
     Uint32 m_changeMask;
+    Uint32 m_new_cols;
     bool m_alterTableFailed;
     AlterTableRef m_alterTableRef;
     Uint32 m_alterTableId;
+    Uint32 m_tupAlterTabPtr;                    // Connect ptr towards TUP
 
     /* Previous table name (used for reverting failed table rename) */
     char previousTableName[MAX_TAB_NAME_SIZE];

--- 1.5/storage/ndb/src/kernel/blocks/dbtup/AttributeOffset.hpp	2007-04-03 14:34:27 +02:00
+++ 1.6/storage/ndb/src/kernel/blocks/dbtup/AttributeOffset.hpp	2007-04-03 14:34:27 +02:00
@@ -19,7 +19,7 @@
 class AttributeOffset {
   friend class Dbtup;
   
-private:
+public:
   static void   setOffset(Uint32 & desc, Uint32 offset);
   static void   setCharsetPos(Uint32 & desc, Uint32 offset);
   static void   setNullFlagPos(Uint32 & desc, Uint32 offset);
@@ -29,6 +29,7 @@
   static Uint32 getCharsetPos(const Uint32 &);
   static Uint32 getNullFlagPos(const Uint32 &);
   static Uint32 getNullFlagOffset(const Uint32 &);
+  static Uint32 getNullFlagByteOffset(const Uint32 & desc);
   static Uint32 getNullFlagBitOffset(const Uint32 &);
   
   Uint32 m_data;
@@ -44,7 +45,7 @@
  * c = Has charset flag           1  bits 11-11
  * s = Charset pointer position - 7  bits 12-18 ( in table descriptor )
  * f = Null flag offset in word - 5  bits 20-24 ( address 32 bits )
- * w = Null word offset         - 7  bits 25-32 ( f+w addr 4096 attrs )
+ * w = Null word offset         - 7  bits 25-31 ( f+w addr 4096 attrs )
  *
  *           1111111111222222222233
  * 01234567890123456789012345678901
@@ -63,6 +64,7 @@
 
 #define AO_NULL_FLAG_WORD_MASK          31      // f
 #define AO_NULL_FLAG_OFFSET_SHIFT       5
+#define AO_NULL_FLAG_BYTE_OFFSET_SHIFT  3
 
 inline
 void
@@ -115,11 +117,20 @@
   return ((desc >> AO_NULL_FLAG_POS_SHIFT) & AO_NULL_FLAG_POS_MASK);
 }
 
+/* Offset of NULL bit in 32-bit words. */
 inline
 Uint32
 AttributeOffset::getNullFlagOffset(const Uint32 & desc)
 {
   return (getNullFlagPos(desc) >> AO_NULL_FLAG_OFFSET_SHIFT);
+}
+
+/* Offset of NULL bit in bytes. */
+inline
+Uint32
+AttributeOffset::getNullFlagByteOffset(const Uint32 & desc)
+{
+  return (getNullFlagPos(desc) >> AO_NULL_FLAG_BYTE_OFFSET_SHIFT);
 }
 
 inline

--- 1.65/storage/ndb/src/kernel/blocks/dbtup/Dbtup.hpp	2007-04-03 14:34:27 +02:00
+++ 1.66/storage/ndb/src/kernel/blocks/dbtup/Dbtup.hpp	2007-04-03 14:34:27 +02:00
@@ -27,6 +27,9 @@
 #include <signaldata/DropTrig.hpp>
 #include <signaldata/TrigAttrInfo.hpp>
 #include <signaldata/BuildIndx.hpp>
+#include <signaldata/AlterTab.hpp>
+#include <AttributeDescriptor.hpp>
+#include "AttributeOffset.hpp"
 #include "Undo_buffer.hpp"
 #include "tuppage.hpp"
 #include <../pgman.hpp>
@@ -263,6 +266,7 @@
 
 #define ZINVALID_CHAR_FORMAT 744
 #define ZROWID_ALLOCATED 899
+#define ZINVALID_ALTER_TAB 741
 
           /* SOME WORD POSITIONS OF FIELDS IN SOME HEADERS */
 
@@ -417,14 +421,28 @@
   Uint32 attributeCount;
   Uint32 charsetIndex;
   Uint32 m_null_bits[2];
-  Uint32 m_fix_attributes_size[2]; // In words
-  Uint32 m_var_attributes_size[2]; // In bytes
   BlockReference lqhBlockrefFrag;
   bool inUse;
   bool definingFragment;
 };
 typedef Ptr<Fragoperrec> FragoperrecPtr;
 
+  /* Operation record used during alter table. */
+  struct AlterTabOperation {
+    Uint32 nextAlterTabOp;
+    Uint32 newNoOfAttrs;
+    Uint32 noOfDynNullBits;
+    Uint32 noOfDynVar;
+    Uint32 noOfDynFix;
+    Uint32 noOfDynamic;
+    Uint32 tabDesOffset[7];
+    Uint32 desAllocSize;
+    Uint32 tableDescriptor;
+    Uint32 dynTabDesOffset[3];
+    Uint32 dynDesAllocSize;
+    Uint32 dynTableDescriptor;
+  };
+  typedef Ptr<AlterTabOperation> AlterTabOperationPtr;
 
   typedef Tup_page Page;
   typedef Ptr<Page> PagePtr;
@@ -952,6 +970,8 @@
   /* ************************************** */
   STATIC_CONST( MM = 0 );
   STATIC_CONST( DD = 1 );
+  STATIC_CONST( DYN_BM_LEN_BITS = 8 );
+  STATIC_CONST( DYN_BM_LEN_MASK = ((1 << DYN_BM_LEN_BITS) - 1));
   
   struct Tablerec {
     Tablerec(ArrayPool<TupTriggerData> & triggerPool) : 
@@ -968,12 +988,48 @@
     Bitmask<MAXNROFATTRIBUTESINWORDS> notNullAttributeMask;
     Bitmask<MAXNROFATTRIBUTESINWORDS> blobAttributeMask;
     
+    /*
+      Extra table descriptor for dynamic attributes, or RNIL if none.
+      The size of this depends on actual column definitions, so it is allocated
+      _after_ seeing all columns, hence must be separate from the readKeyArray
+      et al descriptor, which is allocated before seeing columns.
+    */
+    Uint32 dynTabDescriptor;
+
+    /* Mask of variable-sized dynamic attributes. */
+    Uint32* dynVarSizeMask;
+    /*
+      Mask of fixed-sized dynamic attributes. There is one bit set for each
+      32-bit word occupied by fixed-size attributes, so fixed-size dynamic
+      attributes >32bit have multiple bits here.
+    */
+    Uint32* dynFixSizeMask;
+
     ReadFunction* readFunctionArray;
     UpdateFunction* updateFunctionArray;
     CHARSET_INFO** charsetArray;
     
     Uint32 readKeyArray;
+    /*
+      Offset into Dbtup::tableDescriptor of the start of the descriptor
+      words for each attribute.
+      For attribute i, the AttributeDescriptor word is stored at index
+      Tablerec::tabDescriptor+i*ZAD_SIZE, and the AttributeOffset word at
+      index Tablerec::tabDescriptor+i*ZAD_SIZE+1.
+    */
     Uint32 tabDescriptor;
+    /*
+      Offset into Dbtup::tableDescriptor of memory used as an array of Uint16.
+
+      The values stored are offsets from Tablerec::tabDescriptor first for all
+      fixed-sized static attributes, then static varsized attributes, then
+      dynamic fixed-size, then dynamic varsized, and finally disk-stored fixed
+      size:
+              [mm_fix mm_var mm_dynfix mm_dynvar dd_fix]
+      This is used to find the AttributeDescriptor and AttributeOffset words
+      for an attribute. For example, the offset for the second dynamic
+      fixed-size attribute is at index <num fixed> + <num varsize> + 1.
+    */
     Uint32 m_real_order_descriptor;
     
     enum Bits
@@ -991,6 +1047,7 @@
     Uint16 m_no_of_disk_attributes;
     Uint16 noOfKeyAttr;
     Uint16 noOfCharsets;
+    Uint16 m_dyn_null_bits;
 
     bool need_expand() const { 
       return m_no_of_attributes > m_attributes[MM].m_no_of_fixsize;
@@ -998,19 +1055,22 @@
 
     bool need_expand(bool disk) const { 
       return m_attributes[MM].m_no_of_varsize > 0 ||
+        m_attributes[MM].m_no_of_dynamic > 0 ||
 	(disk && m_no_of_disk_attributes > 0);
     }
     
     bool need_shrink() const {
       return 
 	m_attributes[MM].m_no_of_varsize > 0 ||
+        m_attributes[MM].m_no_of_dynamic > 0 ||
 	m_attributes[DD].m_no_of_varsize > 0;
     }
     
     bool need_shrink(bool disk) const {
       return 
 	m_attributes[MM].m_no_of_varsize > 0 ||
-	(disk && m_attributes[DD].m_no_of_varsize > 0);
+	m_attributes[MM].m_no_of_dynamic > 0 ||
+        (disk && m_attributes[DD].m_no_of_varsize > 0);
     }
 
     /**
@@ -1025,11 +1085,14 @@
 	Uint16 m_fix_header_size; // For fix size tuples= total rec size(part)
       };
       Uint16 m_max_var_offset;  // In bytes relative m_var_data.m_data_ptr
+      Uint16 m_max_dyn_offset;  // In bytes relative m_var_data.m_dyn_data_ptr
+      Uint16 m_dyn_null_words;  // 32-bit words in dynattr bitmap
     } m_offsets[2];
     
 
     Uint32 get_check_offset(Uint32 mm) const {
-      Uint32 cnt= m_attributes[mm].m_no_of_varsize;
+      Uint32 cnt=
+        m_attributes[mm].m_no_of_varsize + m_attributes[mm].m_no_of_dynamic;
       Uint32 off= m_offsets[mm].m_varpart_offset;
       return off - (cnt ? 0 : Tuple_header::HeaderSize);
     }
@@ -1037,6 +1100,13 @@
     struct {
       Uint16 m_no_of_fixsize;
       Uint16 m_no_of_varsize;
+      Uint16 m_no_of_dynamic;                   // Total no. of dynamic attrs
+      Uint16 m_no_of_dyn_fix;                   // No. of fixsize dynamic
+      Uint16 m_no_of_dyn_var;                   // No. of varsize dynamic
+      /*
+        Note that due to bit types, we may have
+            m_no_of_dynamic > m_no_of_dyn_fix + m_no_of_dyn_var
+      */
     } m_attributes[2];
     
     // Lists of trigger data for active triggers
@@ -1061,6 +1131,12 @@
     State tableStatus;
   };  
 
+  /*
+    It is more space efficient to store dynamic fixed-size attributes
+    of more than about 16 words as variable-sized internally.
+   */
+  STATIC_CONST(InternalMaxDynFix= 16);
+
   struct Disk_undo 
   {
     enum 
@@ -1335,9 +1411,17 @@
 
     STATIC_CONST( HeaderSize = 2 );
     
-    /**
-     * header bits
-     */
+    /*
+     Header bits.
+
+     MM_GROWN: When a tuple is updated to a bigger size, the original varpart
+     of the tuple is immediately re-allocated to a location with sufficient
+     size for the new data (but containing only the original smaller-sized
+     data). This is so that commit can be sure to find room for the extra
+     data. In the case of abort, the varpart must then be shrunk. For a
+     MM_GROWN tuple, the original size is stored in the last word of the
+     varpart until commit.
+    */
     STATIC_CONST( TUP_VERSION_MASK = 0xFFFF );
     STATIC_CONST( CHAINED_ROW = 0x00010000 ); // Is var part on different page
     STATIC_CONST( DISK_PART   = 0x00020000 ); // Is there a disk part
@@ -1395,7 +1479,16 @@
       return m_data;
     }
   };
-  
+
+  /**
+   * Format of varpart after insert/update
+   */
+  struct Varpart_copy
+  {
+    Uint32 m_len;
+    Uint32 m_data[1]; // Only used for easy offset handling
+  };
+
 struct KeyReqStruct {
 /**
  * These variables are used as temporary storage during execution of the
@@ -1427,11 +1520,41 @@
   Uint32          attr_descriptor;
   bool            xfrm_flag;
 
+  /* Flag: is tuple in expanded or in shrunken/stored format? */
+  bool is_expanded;
+
   struct Var_data {
+    /*
+      These are the pointers and offsets to the variable-sized part of the row
+      (static part, alwways stored even if NULL). They are used both for
+      expanded and shrunken form, with different values to allow using the
+      same read/update code for both forms.
+    */
     char *m_data_ptr;
     Uint16 *m_offset_array_ptr;
     Uint16 m_var_len_offset;
     Uint16 m_max_var_offset;
+    Uint16 m_max_dyn_offset;
+
+    /* These are the pointers and offsets to the dynamic part of the row. */
+
+    /* Pointer to the start of the bitmap for the dynamic part of the row. */
+    char *m_dyn_data_ptr;
+    /* Number of 32-bit words in dynamic part (stored/shrunken format). */
+    Uint32 m_dyn_part_len;
+    /*
+      Pointer to array with one element for each dynamic attribute (both
+      variable and fixed size). Each value is the offset from the end of the
+      bitmap to the start of the data for that attribute.
+    */
+    Uint16 *m_dyn_offset_arr_ptr;
+    /*
+      Offset from m_dyn_offset_array_ptr of array with one element for each
+      dynamic attribute. Each value is the offset to the end of data for that
+      attribute, so the difference to m_dyn_offset_array_ptr elements provides
+      the data lengths.
+    */
+    Uint16 m_dyn_len_offset;
   } m_var_data[2];
 
   Tuple_header *m_disk_ptr;
@@ -1956,6 +2079,12 @@
 
 //------------------------------------------------------------------
 //------------------------------------------------------------------
+  bool fixsize_updater(Uint32* inBuffer,
+                       KeyReqStruct *req_struct,
+                       Uint32  attrDes2,
+                       Uint32 *dst_ptr,
+                       Uint32 updateOffset,
+                       Uint32 checkOffset);
   bool updateFixedSizeTHManyWordNotNULL(Uint32* inBuffer,
                                         KeyReqStruct *req_struct,
                                         Uint32  attrDes2);
@@ -2007,6 +2136,20 @@
 
 //------------------------------------------------------------------
 //------------------------------------------------------------------
+  bool varsize_reader(Uint32* out_buffer,
+                      KeyReqStruct *req_struct,
+                      AttributeHeader* ah_out,
+                      Uint32  attr_des2,
+                      char * src_ptr,
+                      Uint32 vsize_in_bytes);
+  bool varsize_updater(Uint32* in_buffer,
+                       KeyReqStruct *req_struct,
+                       char *var_data_start,
+                       Uint32 var_attr_pos,
+                       Uint16 *len_offset_ptr,
+                       Uint32 check_offset);
+//------------------------------------------------------------------
+//------------------------------------------------------------------
   bool readVarSizeNotNULL(Uint32* outBuffer,
                           KeyReqStruct *req_struct,
                           AttributeHeader* ahOut,
@@ -2033,29 +2176,147 @@
 
 //------------------------------------------------------------------
 //------------------------------------------------------------------
-  bool readDynFixedSize(Uint32* outBuffer,
-                        KeyReqStruct *req_struct,
-                        AttributeHeader* ahOut,
-                        Uint32  attrDes2);
+  bool readDynFixedSizeNotNULL(Uint32* outBuffer,
+                               KeyReqStruct *req_struct,
+                               AttributeHeader* ahOut,
+                               Uint32  attrDes2);
+  bool readDynFixedSizeNULLable(Uint32* outBuffer,
+                                KeyReqStruct *req_struct,
+                                AttributeHeader* ahOut,
+                                Uint32  attrDes2);
+  bool readDynFixedSizeExpandedNotNULL(Uint32* outBuffer,
+                                       KeyReqStruct *req_struct,
+                                       AttributeHeader* ahOut,
+                                       Uint32  attrDes2);
+  bool readDynFixedSizeShrunkenNotNULL(Uint32* outBuffer,
+                                       KeyReqStruct *req_struct,
+                                       AttributeHeader* ahOut,
+                                       Uint32  attrDes2);
+  bool readDynFixedSizeExpandedNULLable(Uint32* outBuffer,
+                                        KeyReqStruct *req_struct,
+                                        AttributeHeader* ahOut,
+                                        Uint32  attrDes2);
+  bool readDynFixedSizeShrunkenNULLable(Uint32* outBuffer,
+                                        KeyReqStruct *req_struct,
+                                        AttributeHeader* ahOut,
+                                        Uint32  attrDes2);
+
+//------------------------------------------------------------------
+//------------------------------------------------------------------
+  bool updateDynFixedSizeNotNULL(Uint32* inBuffer,
+                                 KeyReqStruct *req_struct,
+                                 Uint32  attrDes2);
+  bool updateDynFixedSizeNULLable(Uint32* inBuffer,
+                                  KeyReqStruct *req_struct,
+                                  Uint32  attrDes2);
 
 //------------------------------------------------------------------
 //------------------------------------------------------------------
-  bool updateDynFixedSize(Uint32* inBuffer,
+  bool readDynBigFixedSizeNotNULL(Uint32* outBuffer,
+                                  KeyReqStruct *req_struct,
+                                  AttributeHeader* ahOut,
+                                  Uint32  attrDes2);
+  bool readDynBigFixedSizeNULLable(Uint32* outBuffer,
+                                   KeyReqStruct *req_struct,
+                                   AttributeHeader* ahOut,
+                                   Uint32  attrDes2);
+  bool readDynBigFixedSizeExpandedNotNULL(Uint32* outBuffer,
+                                          KeyReqStruct *req_struct,
+                                          AttributeHeader* ahOut,
+                                          Uint32  attrDes2);
+  bool readDynBigFixedSizeShrunkenNotNULL(Uint32* outBuffer,
+                                          KeyReqStruct *req_struct,
+                                          AttributeHeader* ahOut,
+                                          Uint32  attrDes2);
+  bool readDynBigFixedSizeExpandedNULLable(Uint32* outBuffer,
+                                           KeyReqStruct *req_struct,
+                                           AttributeHeader* ahOut,
+                                           Uint32  attrDes2);
+  bool readDynBigFixedSizeShrunkenNULLable(Uint32* outBuffer,
+                                           KeyReqStruct *req_struct,
+                                           AttributeHeader* ahOut,
+                                           Uint32  attrDes2);
+
+//------------------------------------------------------------------
+//------------------------------------------------------------------
+  bool updateDynBigFixedSizeNotNULL(Uint32* inBuffer,
+                                    KeyReqStruct *req_struct,
+                                    Uint32  attrDes2);
+  bool updateDynBigFixedSizeNULLable(Uint32* inBuffer,
+                                     KeyReqStruct *req_struct,
+                                     Uint32  attrDes2);
+
+//------------------------------------------------------------------
+//------------------------------------------------------------------
+  bool readDynBitsNotNULL(Uint32* outBuffer,
                           KeyReqStruct *req_struct,
+                          AttributeHeader* ahOut,
                           Uint32  attrDes2);
+  bool readDynBitsNULLable(Uint32* outBuffer,
+                           KeyReqStruct *req_struct,
+                           AttributeHeader* ahOut,
+                           Uint32  attrDes2);
+  bool readDynBitsExpandedNotNULL(Uint32* outBuffer,
+                                  KeyReqStruct *req_struct,
+                                  AttributeHeader* ahOut,
+                                  Uint32  attrDes2);
+  bool readDynBitsShrunkenNotNULL(Uint32* outBuffer,
+                                  KeyReqStruct *req_struct,
+                                  AttributeHeader* ahOut,
+                                  Uint32  attrDes2);
+  bool readDynBitsExpandedNULLable(Uint32* outBuffer,
+                                   KeyReqStruct *req_struct,
+                                   AttributeHeader* ahOut,
+                                   Uint32  attrDes2);
+  bool readDynBitsShrunkenNULLable(Uint32* outBuffer,
+                                   KeyReqStruct *req_struct,
+                                   AttributeHeader* ahOut,
+                                   Uint32  attrDes2);
 
 //------------------------------------------------------------------
 //------------------------------------------------------------------
-  bool readDynVarSize(Uint32* outBuffer,
-                      KeyReqStruct *req_struct,
-                      AttributeHeader* ahOut,
-                      Uint32  attrDes2);
+  bool updateDynBitsNotNULL(Uint32* inBuffer,
+                            KeyReqStruct *req_struct,
+                            Uint32  attrDes2);
+  bool updateDynBitsNULLable(Uint32* inBuffer,
+                             KeyReqStruct *req_struct,
+                             Uint32  attrDes2);
 
 //------------------------------------------------------------------
 //------------------------------------------------------------------
-  bool updateDynVarSize(Uint32* inBuffer,
-                        KeyReqStruct *req_struct,
-                        Uint32  attrDes2);
+  bool readDynVarSizeNotNULL(Uint32* outBuffer,
+                             KeyReqStruct *req_struct,
+                             AttributeHeader* ahOut,
+                             Uint32  attrDes2);
+  bool readDynVarSizeNULLable(Uint32* outBuffer,
+                              KeyReqStruct *req_struct,
+                              AttributeHeader* ahOut,
+                              Uint32  attrDes2);
+  bool readDynVarSizeExpandedNotNULL(Uint32* outBuffer,
+                                     KeyReqStruct *req_struct,
+                                     AttributeHeader* ahOut,
+                                     Uint32  attrDes2);
+  bool readDynVarSizeShrunkenNotNULL(Uint32* outBuffer,
+                                     KeyReqStruct *req_struct,
+                                     AttributeHeader* ahOut,
+                                     Uint32  attrDes2);
+  bool readDynVarSizeExpandedNULLable(Uint32* outBuffer,
+                                      KeyReqStruct *req_struct,
+                                      AttributeHeader* ahOut,
+                                      Uint32  attrDes2);
+  bool readDynVarSizeShrunkenNULLable(Uint32* outBuffer,
+                                      KeyReqStruct *req_struct,
+                                      AttributeHeader* ahOut,
+                                      Uint32  attrDes2);
+
+//------------------------------------------------------------------
+//------------------------------------------------------------------
+  bool updateDynVarSizeNotNULL(Uint32* inBuffer,
+                               KeyReqStruct *req_struct,
+                               Uint32  attrDes2);
+  bool updateDynVarSizeNULLable(Uint32* inBuffer,
+                                KeyReqStruct *req_struct,
+                                Uint32  attrDes2);
 
   bool readCharNotNULL(Uint32* outBuffer,
                        KeyReqStruct *req_struct,
@@ -2103,6 +2364,21 @@
   bool updateDiskBitsNotNULL(Uint32*, KeyReqStruct*, Uint32);
 
 
+  /* Alter table methods. */
+  void handleAlterTabPrepare(Signal *signal, const Tablerec *regTabPtr);
+  void sendAlterTabRef(Signal *signal, AlterTabReq *req, Uint32 errorCode);
+  void sendAlterTabConf(Signal *, AlterTabReq *, Uint32 clientData=RNIL);
+  void handleAlterTableCommit(Signal *signal,
+                              AlterTabOperationPtr regAlterTabOpPtr,
+                              Tablerec *regTabPtr);
+  void handleAlterTableAbort(Signal *signal,
+                             AlterTabOperationPtr regAlterTabOpPtr,
+                             Tablerec *regTabPtr);
+  void handleCharsetPos(Uint32 csNumber, CHARSET_INFO** charsetArray,
+                        Uint32 noOfCharsets,
+                        Uint32 & charsetIndex, Uint32 & attrDes2);
+  void computeTableMetaData(Tablerec *regTabPtr);
+
 //------------------------------------------------------------------
 //------------------------------------------------------------------
   bool nullFlagCheck(KeyReqStruct *req_struct, Uint32  attrDes2);
@@ -2111,6 +2387,16 @@
                      KeyReqStruct *req_struct,
                      Uint32* outBuffer);
 
+  /* Fast bit counting (16 instructions on x86_64, gcc -O3). */
+  static inline uint32_t count_bits(uint32_t x)
+  {
+    x= x - ((x>>1) & 0x55555555);
+    x= (x & 0x33333333) + ((x>>2) & 0x33333333);
+    x= (x + (x>>4)) & 0x0f0f0f0f;
+    x= (x*0x01010101) >> 24;
+    return x;
+  }
+
 //------------------------------------------------------------------
 //------------------------------------------------------------------
   void setUpQueryRoutines(Tablerec* regTabPtr);
@@ -2438,6 +2724,9 @@
   void setUpDescriptorReferences(Uint32 descriptorReference,
                                  Tablerec* regTabPtr,
                                  const Uint32* offset);
+  void setupDynDescriptorReferences(Uint32 dynDescr,
+                                    Tablerec* const regTabPtr,
+                                    const Uint32* offset);
   void setUpKeyArray(Tablerec* regTabPtr);
   bool addfragtotab(Tablerec* regTabPtr, Uint32 fragId, Uint32 fragIndex);
   void deleteFragTab(Tablerec* regTabPtr, Uint32 fragId);
@@ -2451,6 +2740,7 @@
   void initializeDiskBufferSegmentRecord();
   void initializeFragoperrec();
   void initializeFragrecord();
+  void initializeAlterTabOperation();
   void initializeHostBuffer();
   void initializeLocalLogInfo();
   void initializeOperationrec();
@@ -2515,8 +2805,9 @@
 //-----------------------------------------------------------------------------
 
 // Public methods
-  Uint32 getTabDescrOffsets(const Tablerec* regTabPtr, Uint32* offset);
-  Uint32 allocTabDescr(const Tablerec* regTabPtr, Uint32* offset);
+  Uint32 getTabDescrOffsets(Uint32, Uint32, Uint32, Uint32*);
+  Uint32 getDynTabDescrOffsets(Uint32 MaskSize, Uint32* offset);
+  Uint32 allocTabDescr(Uint32 allocSize);
   void freeTabDescr(Uint32 retRef, Uint32 retNo, bool normal = true);
   Uint32 getTabDescrWord(Uint32 index);
   void setTabDescrWord(Uint32 index, Uint32 word);
@@ -2533,8 +2824,11 @@
   void seizeOpRec(OperationrecPtr& regOperPtr);
   void seizeFragrecord(FragrecordPtr& regFragPtr);
   void seizeFragoperrec(FragoperrecPtr& fragOperPtr);
+  void seizeAlterTabOperation(AlterTabOperationPtr& alterTabOpPtr);
   void releaseFragoperrec(FragoperrecPtr fragOperPtr);
   void releaseFragrec(FragrecordPtr);
+  void releaseAlterTabOpRec(AlterTabOperationPtr regAlterTabOpPtr);
+
 //----------------------------------------------------------------------------
 // Page Memory Manager
 //----------------------------------------------------------------------------
@@ -2614,8 +2908,8 @@
   Uint32* alloc_var_rec(Fragrecord*, Tablerec*, Uint32, Local_key*, Uint32*);
   void free_var_rec(Fragrecord*, Tablerec*, Local_key*, Ptr<Page>);
   Uint32* alloc_var_part(Fragrecord*, Tablerec*, Uint32, Local_key*);
-  int realloc_var_part(Fragrecord*, Tablerec*, 
-		       PagePtr, Var_part_ref*, Uint32, Uint32);
+  Uint32 *realloc_var_part(Fragrecord*, Tablerec*, 
+                           PagePtr, Var_part_ref*, Uint32, Uint32);
   
   void validate_page(Tablerec*, Var_page* page);
   
@@ -2663,6 +2957,10 @@
   Uint32 cfirstfreefrag;
   Uint32 cnoOfFragrec;
 
+  AlterTabOperation *alterTabOperRec;
+  Uint32 cfirstfreeAlterTabOp;
+  Uint32 cnoOfAlterTabOps;
+
   HostBuffer *hostBuffer;
 
   ArrayPool<Operationrec> c_operation_pool;
@@ -2732,6 +3030,7 @@
   Uint32* get_ptr(PagePtr*, Var_part_ref);
   Uint32* get_ptr(PagePtr*, const Local_key*, const Tablerec*);
   Uint32* get_dd_ptr(PagePtr*, const Local_key*, const Tablerec*);
+  Uint32 get_len(Ptr<Page>* pagePtr, Var_part_ref ref);
 
   /**
    * prealloc space from disk
@@ -2853,6 +3152,9 @@
    *   req_struct->m_tuple_ptr is set to tuple to read
    */
   void prepare_read(KeyReqStruct*, Tablerec* const, bool disk);
+
+  /* For debugging, dump the contents of a tuple. */
+  void dump_tuple(const KeyReqStruct* req_struct, const Tablerec* tabPtrP);
 };
 
 #if 0
@@ -2986,11 +3288,24 @@
   tmp.p= (Page*)m_global_page_pool.getPtr(tmp.i);
   memcpy(pagePtr, &tmp, sizeof(tmp));
   
-  if(regTabPtr->m_attributes[DD].m_no_of_varsize)
+  if(regTabPtr->m_attributes[DD].m_no_of_varsize ||
+     regTabPtr->m_attributes[DD].m_no_of_dynamic)
     return ((Var_page*)tmp.p)->get_ptr(key->m_page_idx);
   else
     return ((Fix_page*)tmp.p)->
       get_ptr(key->m_page_idx, regTabPtr->m_offsets[DD].m_fix_header_size);
+}
+
+/*
+  This function assumes that get_ptr() has been called first to
+  initialise the pagePtr argument.
+*/
+inline
+Uint32
+Dbtup::get_len(Ptr<Page>* pagePtr, Var_part_ref ref)
+{
+  Uint32 page_idx= ref.m_page_idx;
+  return ((Var_page*)pagePtr->p)->get_entry_len(page_idx);
 }
 
 NdbOut&

--- 1.19/storage/ndb/src/kernel/blocks/dbtup/DbtupAbort.cpp	2007-04-03 14:34:27 +02:00
+++ 1.20/storage/ndb/src/kernel/blocks/dbtup/DbtupAbort.cpp	2007-04-03 14:34:27 +02:00
@@ -141,30 +141,38 @@
     Uint32 copy_bits= copy->m_header_bits;
     if(! (bits & Tuple_header::ALLOC))
     {
-      if(copy_bits & Tuple_header::MM_GROWN)
+      if(bits & Tuple_header::MM_GROWN)
       {
-	if (0) ndbout_c("abort grow");
-	Ptr<Page> vpage;
-	Uint32 idx= regOperPtr.p->m_tuple_location.m_page_idx;
-	Uint32 mm_vars= regTabPtr.p->m_attributes[MM].m_no_of_varsize;
-	Uint32 *var_part;
+        if (0) ndbout_c("abort grow");
+        Ptr<Page> vpage;
+        Uint32 idx= regOperPtr.p->m_tuple_location.m_page_idx;
+        Uint32 mm_vars= regTabPtr.p->m_attributes[MM].m_no_of_varsize;
+        Uint32 *var_part;
 
-	ndbassert(tuple_ptr->m_header_bits & Tuple_header::CHAINED_ROW);
+        ndbassert(tuple_ptr->m_header_bits & Tuple_header::CHAINED_ROW);
 	
-	Var_part_ref *ref = 
-	  (Var_part_ref*)tuple_ptr->get_var_part_ptr(regTabPtr.p);
+        Var_part_ref *ref = 
+          (Var_part_ref*)tuple_ptr->get_var_part_ptr(regTabPtr.p);
 
-	Local_key tmp; 
-	ref->copyout(&tmp);
+        Local_key tmp; 
+        ref->copyout(&tmp);
 	
-	idx= tmp.m_page_idx;
-	var_part= get_ptr(&vpage, *ref);
-	Var_page* pageP = (Var_page*)vpage.p;
-	Uint32 len= pageP->get_entry_len(idx) & ~Var_page::CHAIN;
-	Uint32 sz = ((((mm_vars + 1) << 1) + (((Uint16*)var_part)[mm_vars]) + 3)>> 2);
-	ndbassert(sz <= len);
-	pageP->shrink_entry(idx, sz);
-	update_free_page_list(regFragPtr.p, vpage);
+        idx= tmp.m_page_idx;
+        var_part= get_ptr(&vpage, *ref);
+        Var_page* pageP = (Var_page*)vpage.p;
+        Uint32 len= pageP->get_entry_len(idx) & ~Var_page::CHAIN;
+
+        /*
+          A MM_GROWN tuple was relocated with a bigger size in preparation for
+          commit, so we need to shrink it back. The original size is stored in
+          the last word of the relocated (oversized) tuple.
+        */
+        ndbassert(len > 0);
+        Uint32 sz= var_part[len-1];
+        ndbassert(sz < len);
+        pageP->shrink_entry(idx, sz);
+        update_free_page_list(regFragPtr.p, vpage);
+        tuple_ptr->m_header_bits= bits & ~Tuple_header::MM_GROWN;
       } 
       else if(bits & Tuple_header::MM_SHRINK)
       {

--- 1.26/storage/ndb/src/kernel/blocks/dbtup/DbtupCommit.cpp	2007-04-03 14:34:27 +02:00
+++ 1.27/storage/ndb/src/kernel/blocks/dbtup/DbtupCommit.cpp	2007-04-03 14:34:27 +02:00
@@ -58,7 +58,8 @@
       return;
     }
     
-    if (regTabPtr.p->m_attributes[MM].m_no_of_varsize)
+    if (regTabPtr.p->m_attributes[MM].m_no_of_varsize +
+        regTabPtr.p->m_attributes[MM].m_no_of_dynamic)
     {
       jam();
       free_var_rec(regFragPtr.p, regTabPtr.p, &tmp, pagePtr);
@@ -204,6 +205,23 @@
   }
 }
 
+static void dump_buf_hex(unsigned char *p, Uint32 bytes)
+{
+  char buf[3001];
+  char *q= buf;
+  buf[0]= '\0';
+
+  for(Uint32 i=0; i<bytes; i++)
+  {
+    if(i==((sizeof(buf)/3)-1))
+    {
+      sprintf(q, "...");
+      break;
+    }
+    sprintf(q+3*i, " %02X", p[i]);
+  }
+  ndbout_c("%8p: %s", p, buf);
+}
 void
 Dbtup::commit_operation(Signal* signal,
 			Uint32 gci,
@@ -227,7 +245,8 @@
 
   Uint32 fixsize= regTabPtr->m_offsets[MM].m_fix_header_size;
   Uint32 mm_vars= regTabPtr->m_attributes[MM].m_no_of_varsize;
-  if(mm_vars == 0)
+  Uint32 mm_dyns= regTabPtr->m_attributes[MM].m_no_of_dynamic;
+  if((mm_vars+mm_dyns) == 0)
   {
     memcpy(tuple_ptr, copy, 4*fixsize);
     disk_ptr= (Tuple_header*)(((Uint32*)copy)+fixsize);
@@ -243,21 +262,28 @@
     PagePtr vpagePtr;
     Uint32 *dst= get_ptr(&vpagePtr, *ref);
     Var_page* vpagePtrP = (Var_page*)vpagePtr.p;
-    Uint32 *src= copy->get_var_part_ptr(regTabPtr);
-    Uint32 sz= ((mm_vars + 1) << 1) + (((Uint16*)src)[mm_vars]);
-    ndbassert(4*vpagePtrP->get_entry_len(tmp.m_page_idx) >= sz);
-    memcpy(dst, src, sz);
-
+    Varpart_copy* vp = (Varpart_copy*)copy->get_var_part_ptr(regTabPtr);
+    ndbassert(!(copy_bits & Tuple_header::CHAINED_ROW));
+    /* The first word of shrunken tuple holds the lenght in words. */
+    Uint32 len = vp->m_len;
+    memcpy(dst, vp->m_data, 4*len);
     copy_bits |= Tuple_header::CHAINED_ROW;
     
     if(copy_bits & Tuple_header::MM_SHRINK)
     {
-      vpagePtrP->shrink_entry(tmp.m_page_idx, (sz + 3) >> 2);
+      ndbassert(vpagePtrP->get_entry_len(tmp.m_page_idx) >= len);
+      vpagePtrP->shrink_entry(tmp.m_page_idx, len);
       update_free_page_list(regFragPtr, vpagePtr);
-    } 
+    }
+    else
+    {
+      ndbassert(vpagePtrP->get_entry_len(tmp.m_page_idx) == len);
+    }
     
-    disk_ptr = (Tuple_header*)
-      (((Uint32*)copy)+Tuple_header::HeaderSize+fixsize+((sz + 3) >> 2));
+    /*
+      Find disk part after header + fixed MM part + length word + varsize part.
+    */
+    disk_ptr = (Tuple_header*)(vp->m_data + len);
   } 
   
   if (regTabPtr->m_no_of_disk_attributes &&
@@ -619,6 +645,7 @@
   
   if(regOperPtr.p->is_last_operation())
   {
+    jam();
     /**
      * Perform "real" commit
      */
@@ -627,12 +654,14 @@
     
     if(regOperPtr.p->op_struct.op_type != ZDELETE)
     {
+      jam();
       commit_operation(signal, gci, tuple_ptr, page,
 		       regOperPtr.p, regFragPtr.p, regTabPtr.p); 
       removeActiveOpList(regOperPtr.p, tuple_ptr);
     }
     else
     {
+      jam();
       removeActiveOpList(regOperPtr.p, tuple_ptr);
       if (get_page)
 	ndbassert(tuple_ptr->m_header_bits & Tuple_header::DISK_PART);

--- 1.64/storage/ndb/src/kernel/blocks/dbtup/DbtupExecQuery.cpp	2007-04-03 14:34:27 +02:00
+++ 1.65/storage/ndb/src/kernel/blocks/dbtup/DbtupExecQuery.cpp	2007-04-03 14:34:27 +02:00
@@ -29,6 +29,31 @@
 #include <signaldata/AttrInfo.hpp>
 #include <NdbSqlUtil.hpp>
 
+/* For debugging */
+static void
+dump_hex(const Uint32 *p, Uint32 len)
+{
+  if(len > 2560)
+    len= 160;
+  if(len==0)
+    return;
+  for(;;)
+  {
+    if(len>=4)
+      ndbout_c("%8p %08X %08X %08X %08X", p, p[0], p[1], p[2], p[3]);
+    else if(len>=3)
+      ndbout_c("%8p %08X %08X %08X", p, p[0], p[1], p[2]);
+    else if(len>=2)
+      ndbout_c("%8p %08X %08X", p, p[0], p[1]);
+    else
+      ndbout_c("%8p %08X", p, p[0]);
+    if(len <= 4)
+      break;
+    len-= 4;
+    p+= 4;
+  }
+}
+
 /* ----------------------------------------------------------------- */
 /* -----------       INIT_STORED_OPERATIONREC         -------------- */
 /* ----------------------------------------------------------------- */
@@ -221,7 +246,8 @@
   // includes tupVersion
   //printf("%p - ", tuple_ptr);
   
-  if (regTabPtr->m_attributes[MM].m_no_of_varsize)
+  if (regTabPtr->m_attributes[MM].m_no_of_varsize +
+      regTabPtr->m_attributes[MM].m_no_of_dynamic)
     rec_size += Tuple_header::HeaderSize;
   
   for (i= 0; i < rec_size-2; i++) {
@@ -1102,29 +1128,73 @@
   req_struct->attr_descr= tab_descr; 
   Uint16* order= (Uint16*)&tableDescriptor[order_desc];
 
-  const Uint32 cnt1= regTabPtr->m_attributes[MM].m_no_of_varsize;
-  const Uint32 cnt2= regTabPtr->m_attributes[DD].m_no_of_varsize;
+  const Uint32 mm_vars= regTabPtr->m_attributes[MM].m_no_of_varsize;
+  const Uint32 mm_dyns= regTabPtr->m_attributes[MM].m_no_of_dynamic;
+  const Uint32 mm_dynvar= regTabPtr->m_attributes[MM].m_no_of_dyn_var;
+  const Uint32 mm_dynfix= regTabPtr->m_attributes[MM].m_no_of_dyn_fix;
+  const Uint32 dd_vars= regTabPtr->m_attributes[DD].m_no_of_varsize;
   Uint32 *ptr= req_struct->m_tuple_ptr->get_var_part_ptr(regTabPtr);
 
-  if(cnt1)
+  if(mm_vars || mm_dyns)
   {
+    /* Prepare empty varsize part. */
     KeyReqStruct::Var_data* dst= &req_struct->m_var_data[MM];
-    dst->m_data_ptr= (char*)(((Uint16*)ptr)+cnt1+1);
+    /* Reserve room for length word. */
+    ptr++;
+    dst->m_data_ptr= (char*)(((Uint16*)ptr)+mm_vars+1);
     dst->m_offset_array_ptr= req_struct->var_pos_array;
-    dst->m_var_len_offset= cnt1;
+    dst->m_var_len_offset= mm_vars;
     dst->m_max_var_offset= regTabPtr->m_offsets[MM].m_max_var_offset;
-    // Disk part is 32-bit aligned
+    // Disk/dynamic part is 32-bit aligned
     ptr= ALIGN_WORD(dst->m_data_ptr+regTabPtr->m_offsets[MM].m_max_var_offset);
     order += regTabPtr->m_attributes[MM].m_no_of_fixsize;
     Uint32 pos= 0;
     Uint16 *pos_ptr = req_struct->var_pos_array;
-    Uint16 *len_ptr = pos_ptr + cnt1;
-    for(Uint32 i= 0; i<cnt1; i++)
+    Uint16 *len_ptr = pos_ptr + mm_vars;
+    for(Uint32 i= 0; i<mm_vars; i++)
     {
       * pos_ptr++ = pos;
       * len_ptr++ = pos;
       pos += AttributeDescriptor::getSizeInBytes(tab_descr[*order++].tabDescr);
     }
+    ndbassert(ptr==ALIGN_WORD(dst->m_data_ptr+pos));
+
+    /* Prepare empty dynamic part. */
+    dst->m_dyn_data_ptr= (char *)ptr;
+    ndbassert((UintPtr(dst->m_dyn_data_ptr)&3) == 0);
+
+    /* Zero out the bitmap. */
+    Uint32 bm_size_in_bytes= 4*(regTabPtr->m_offsets[MM].m_dyn_null_words);
+    bzero(ptr, bm_size_in_bytes);
+    * ptr = bm_size_in_bytes >> 2; // Store len, for easy access
+
+    /* Set up the offsets for the attribute data. */
+    dst->m_dyn_offset_arr_ptr= len_ptr;
+    dst->m_dyn_len_offset= mm_dynvar+mm_dynfix;
+    dst->m_max_dyn_offset= regTabPtr->m_offsets[MM].m_max_dyn_offset;
+    pos_ptr= len_ptr;
+    len_ptr= pos_ptr+mm_dynvar+mm_dynfix;
+    /* Reserve room for bitmap + shrunken offset array + padding. */
+    pos= bm_size_in_bytes + 4*((mm_dynvar+2)>>1);
+    for(Uint32 i= 0; i<mm_dynvar; i++)
+    {
+      * pos_ptr++ = pos;
+      * len_ptr++ = pos;
+      Uint32 attrDes= tab_descr[order[i+mm_dynfix]].tabDescr;
+      pos += (AttributeDescriptor::getSizeInWords(attrDes) << 2);
+    }
+    /* Fixed part is stored 32-bit aligned, from the end of the row back. */
+    ndbassert((pos&3)==0);
+    for(Uint32 i= mm_dynfix; i>0; )
+    {
+      i--;
+      pos_ptr[i]= pos;
+      Uint32 attrDes= tab_descr[order[i]].tabDescr;
+      pos+= (AttributeDescriptor::getSizeInBytes(attrDes)+3)&~3;
+      /* len offset array is not used for fixed-size attributes. */
+    }
+    ndbassert((pos&3)==0);
+    ptr+= (pos>>2);
   } 
   else
   {
@@ -1133,13 +1203,13 @@
 
   req_struct->m_disk_ptr= (Tuple_header*)ptr;
   
-  if(cnt2)
+  if(dd_vars)
   {
     KeyReqStruct::Var_data *dst= &req_struct->m_var_data[DD];
     ptr=((Tuple_header*)ptr)->m_data+regTabPtr->m_offsets[DD].m_varpart_offset;
-    dst->m_data_ptr= (char*)(((Uint16*)ptr)+cnt2+1);
-    dst->m_offset_array_ptr= req_struct->var_pos_array + (cnt1 << 1);
-    dst->m_var_len_offset= cnt2;
+    dst->m_data_ptr= (char*)(((Uint16*)ptr)+dd_vars+1);
+    dst->m_offset_array_ptr= req_struct->var_pos_array + (mm_vars << 1);
+    dst->m_var_len_offset= dd_vars;
     dst->m_max_var_offset= regTabPtr->m_offsets[DD].m_max_var_offset;
   }
   
@@ -1169,7 +1239,9 @@
   bool disk = regTabPtr->m_no_of_disk_attributes > 0;
   bool mem_insert = regOperPtr.p->is_first_operation();
   bool disk_insert = mem_insert && disk;
-  bool varsize = regTabPtr->m_attributes[MM].m_no_of_varsize;
+  bool vardynsize =
+    regTabPtr->m_attributes[MM].m_no_of_varsize + 
+    regTabPtr->m_attributes[MM].m_no_of_dynamic;
   bool rowid = req_struct->m_use_rowid;
   Uint32 real_page_id = regOperPtr.p->m_tuple_location.m_page_no;
   Uint32 frag_page_id = req_struct->frag_page_id;
@@ -1294,7 +1366,7 @@
 	goto mem_error;
       }
 
-      if (!varsize)
+      if (!vardynsize)
       {
 	jam();
 	ptr= alloc_fix_rec(regFragPtr,
@@ -1326,7 +1398,7 @@
 	goto alloc_rowid_error;
       }
       
-      if (!varsize)
+      if (!vardynsize)
       {
 	jam();
 	ptr= alloc_fix_rowid(regFragPtr,
@@ -1358,7 +1430,7 @@
     base = (Tuple_header*)ptr;
     base->m_operation_ptr_i= regOperPtr.i;
     base->m_header_bits= Tuple_header::ALLOC | 
-      (varsize ? Tuple_header::CHAINED_ROW : 0);
+      (vardynsize ? Tuple_header::CHAINED_ROW : 0);
     regOperPtr.p->m_tuple_location.m_page_no = real_page_id;
   }
   else 
@@ -2468,6 +2540,7 @@
  * tabDesc     array of attribute descriptors (used for getting max size)
  * no_of_attr  no of atributes to expand
  */
+static
 Uint32*
 expand_var_part(Dbtup::KeyReqStruct::Var_data *dst, 
 		const Uint32* src, 
@@ -2502,6 +2575,120 @@
   return ALIGN_WORD(dst_ptr);
 }
 
+/*
+  expand_dyn_part - copy dynamic attributes to fully expanded size.
+
+  Both variable-sized and fixed-size attributes are stored in the same way
+  in the expanded form as variable-sized attributes (in expand_var_part()).
+
+    dst         Destination for expanded data
+    tabPtrP     Table descriptor
+    src         Pointer to the start of dynamic bitmap in source row
+    row_len     Total number of 32-bit words in dynamic part of row
+    tabDesc     Array of table descriptors
+    order       Array of indexes into tabDesc, dynfix followed by dynvar
+*/
+static
+Uint32*
+expand_dyn_part(Dbtup::KeyReqStruct::Var_data *dst,
+		const Uint32* src,
+		Uint32 row_len,
+		const Uint32 * tabDesc,
+		const Uint16* order,
+		Uint32 mm_dynvar,
+		Uint32 mm_dynfix,
+		Uint32 max_bmlen)
+{
+  /* Copy the bitmap, zeroing out any words not stored in the row. */
+  Uint32 *dst_bm_ptr= (Uint32*)dst->m_dyn_data_ptr;
+  Uint32 bm_len = row_len ? (* src & Dbtup::DYN_BM_LEN_MASK) : 0;
+  
+  assert(bm_len <= max_bmlen);
+
+  if(bm_len > 0)
+    memcpy(dst_bm_ptr, src, 4*bm_len);
+  if(bm_len < max_bmlen)
+    bzero(dst_bm_ptr + bm_len, 4 * (max_bmlen - bm_len));
+
+  /**
+   * Store max_bmlen for homogen code in DbtupRoutines
+   */
+  Uint32 tmp = (* dst_bm_ptr);
+  * dst_bm_ptr = (tmp & ~(Uint32)Dbtup::DYN_BM_LEN_MASK) | max_bmlen;
+
+  char *src_off_start= (char*)(src + bm_len);
+  assert((UintPtr(src_off_start)&3) == 0);
+  Uint16 *src_off_ptr= (Uint16*)src_off_start;
+
+  /*
+    Prepare the variable-sized dynamic attributes, copying out data from the
+    source row for any that are not NULL.
+  */
+  Uint32 no_attr= dst->m_dyn_len_offset;
+  Uint16* dst_off_ptr= dst->m_dyn_offset_arr_ptr;
+  Uint16* dst_len_ptr= dst_off_ptr + no_attr;
+  Uint16 this_src_off= *src_off_ptr++;
+  /* We need to reserve room for the offsets written by shrink_tuple+padding. */
+  Uint16 dst_off= 4 * (max_bmlen + ((mm_dynvar+2)>>1));
+  char *dst_ptr= (char*)dst_bm_ptr + dst_off;
+  for(Uint32 i= 0; i<mm_dynvar; i++)
+  {
+    Uint16 j= order[mm_dynfix+i];
+    Uint32 max_len= 4 *AttributeDescriptor::getSizeInWords(tabDesc[j]);
+    Uint32 len;
+    Uint32 pos = AttributeOffset::getNullFlagPos(tabDesc[j+1]);
+    if(bm_len > (pos >> 5) && BitmaskImpl::get(bm_len, src, pos))
+    {
+      Uint16 next_src_off= *src_off_ptr++;
+      len= next_src_off - this_src_off;
+      memcpy(dst_ptr, src_off_start+this_src_off, len);
+      this_src_off= next_src_off;
+    }
+    else
+    {
+      len= 0;
+    }
+    dst_off_ptr[i]= dst_off;
+    dst_len_ptr[i]= dst_off+len;
+    dst_off+= max_len;
+    dst_ptr+= max_len;
+  }
+  /*
+    The fixed-size data is stored 32-bit aligned after the variable-sized
+    data.
+  */
+  char *src_ptr= src_off_start+this_src_off;
+  src_ptr= (char *)(ALIGN_WORD(src_ptr));
+
+  /*
+    Prepare the fixed-size dynamic attributes, copying out data from the
+    source row for any that are not NULL.
+    Note that the fixed-size data is stored in reverse from the end of the
+    dynamic part of the row. This is true both for the stored/shrunken and
+    for the expanded form.
+  */
+  for(Uint32 i= mm_dynfix; i>0; )
+  {
+    i--;
+    Uint16 j= order[i];
+    Uint32 fix_size= 4*AttributeDescriptor::getSizeInWords(tabDesc[j]);
+    dst_off_ptr[mm_dynvar+i]= dst_off;
+    /* len offset array is not used for fixed size. */
+    Uint32 pos = AttributeOffset::getNullFlagPos(tabDesc[j+1]);
+    if(bm_len > (pos >> 5) && BitmaskImpl::get(bm_len, src, pos))
+    {
+      assert((UintPtr(dst_ptr)&3) == 0);
+      memcpy(dst_ptr, src_ptr, fix_size);
+      src_ptr+= fix_size;
+    }
+    dst_off+= fix_size;
+    dst_ptr+= fix_size;
+  }
+
+  return (Uint32 *)dst_ptr;
+}
+
+
 void
 Dbtup::expand_tuple(KeyReqStruct* req_struct, 
 		    Uint32 sizes[2],
@@ -2514,6 +2701,9 @@
   
   Uint16 dd_tot= tabPtrP->m_no_of_disk_attributes;
   Uint16 mm_vars= tabPtrP->m_attributes[MM].m_no_of_varsize;
+  Uint16 mm_dynvar= tabPtrP->m_attributes[MM].m_no_of_dyn_var;
+  Uint16 mm_dynfix= tabPtrP->m_attributes[MM].m_no_of_dyn_fix;
+  Uint16 mm_dyns= tabPtrP->m_attributes[MM].m_no_of_dynamic;
   Uint32 fix_size= tabPtrP->m_offsets[MM].m_varpart_offset;
   Uint32 order_desc= tabPtrP->m_real_order_descriptor;
 
@@ -2524,39 +2714,77 @@
   const Uint16 *order = (Uint16*)(&tableDescriptor[order_desc]);
   order += tabPtrP->m_attributes[MM].m_no_of_fixsize;
   
-  if(mm_vars)
+  if(mm_vars || mm_dyns)
   {
-
+    // Copy fix part
+    memcpy(ptr, src, 4*(fix_size + Tuple_header::HeaderSize));
+    
+    Uint32 src_len;
     Uint32 step; // in bytes
-    const Uint32 *src_data= src_ptr;
+    const Uint32 *src_data;
     KeyReqStruct::Var_data* dst= &req_struct->m_var_data[MM];
     if(bits & Tuple_header::CHAINED_ROW)
     {
+      /* This is for the initial expansion of a stored row. */
       Ptr<Page> var_page;
-      src_data= get_ptr(&var_page, * (Var_part_ref*)src_ptr);
-      step= 4;
-      sizes[MM]= (2 + (mm_vars << 1) + ((Uint16*)src_data)[mm_vars] + 3) >> 2;
+      Var_part_ref *var_ref= (Var_part_ref*)src_ptr;
+      src_data= get_ptr(&var_page, *var_ref);
+      src_len= get_len(&var_page, *var_ref);
+      sizes[MM]= src_len;
+      /* If the original tuple was grown, the old size is stored at the end. */
+      if(bits & Tuple_header::MM_GROWN)
+      {
+	ndbrequire(false);
+        ndbassert(src_len>0);
+        src_len= src_data[src_len-1];
+      }
+      step= 1;
       req_struct->m_varpart_page_ptr = var_page;
     }
     else
     {
-      step= (2 + (mm_vars << 1) + ((Uint16*)src_ptr)[mm_vars]);
-      sizes[MM]= (step + 3) >> 2;
+      /* This is for the re-expansion of a shrunken row (update2 ...) */
+
+      Varpart_copy* vp = (Varpart_copy*)src_ptr;
+      src_len = vp->m_len;
+      src_data= vp->m_data;
+      step= (1+src_len); // 1+ is for extra word
       req_struct->m_varpart_page_ptr = req_struct->m_page_ptr;
+      sizes[MM]= src_len;
     }
+    /*
+     * Reserve place for initial length word and offset array (with one extra
+     * offset). This will be filled-in in later, in shrink_tuple().
+     */
+    dst_ptr++;
     dst->m_data_ptr= (char*)(((Uint16*)dst_ptr)+mm_vars+1);
     dst->m_offset_array_ptr= req_struct->var_pos_array;
     dst->m_var_len_offset= mm_vars;
     dst->m_max_var_offset= tabPtrP->m_offsets[MM].m_max_var_offset;
-    
+
     dst_ptr= expand_var_part(dst, src_data, desc, order);
     ndbassert(dst_ptr == ALIGN_WORD(dst->m_data_ptr + dst->m_max_var_offset));
-    ndbassert((UintPtr(src_ptr) & 3) == 0);
-    src_ptr = ALIGN_WORD(((char*)src_ptr)+step);
+
+    /**
+     * Now do dynpart
+     */
+    char* varstart = (char*)(((Uint16*)src_data)+mm_vars+1);
+    Uint32 varlen = ((Uint16*)src_data)[mm_vars];
+    Uint32* dynstart = ALIGN_WORD(varstart + varlen);
+
+    dst->m_dyn_offset_arr_ptr= req_struct->var_pos_array+2*mm_vars;
+    dst->m_dyn_len_offset= mm_dynvar+mm_dynfix;
+    dst->m_max_dyn_offset= tabPtrP->m_offsets[MM].m_max_dyn_offset;
+    dst->m_dyn_data_ptr= (char*)dst_ptr;
+    dst_ptr= expand_dyn_part(dst, dynstart,
+			     src_len - (dynstart - src_data),
+			     desc, order + mm_vars,
+			     mm_dynvar, mm_dynfix,
+			     tabPtrP->m_offsets[MM].m_dyn_null_words);
     
-    sizes[MM] += fix_size + Tuple_header::HeaderSize;
-    memcpy(ptr, src, 4*(fix_size + Tuple_header::HeaderSize));
-  } 
+    ndbassert((UintPtr(src_ptr) & 3) == 0);
+    src_ptr = src_ptr + step;
+  }
   else 
   {
     sizes[MM]= 1;
@@ -2572,7 +2800,7 @@
   if(disk && dd_tot)
   {
     const Uint16 dd_vars= tabPtrP->m_attributes[DD].m_no_of_varsize;
-    order += mm_vars;
+    order+= mm_vars+mm_dynvar+mm_dynfix;
     
     if(bits & Tuple_header::DISK_INLINE)
     {
@@ -2612,6 +2840,96 @@
   }
   
   ptr->m_header_bits= (bits & ~(Uint32)(Tuple_header::CHAINED_ROW));
+  req_struct->is_expanded= true;
+}
+
+void
+Dbtup::dump_tuple(const KeyReqStruct* req_struct, const Tablerec* tabPtrP)
+{
+  Uint16 mm_vars= tabPtrP->m_attributes[MM].m_no_of_varsize;
+  Uint16 mm_dyns= tabPtrP->m_attributes[MM].m_no_of_dynamic;
+  Uint16 dd_tot= tabPtrP->m_no_of_disk_attributes;
+  const Tuple_header* ptr= req_struct->m_tuple_ptr;
+  Uint32 bits= ptr->m_header_bits;
+  const Uint32 *tuple_words= (Uint32 *)ptr;
+  const Uint32 *fix_p;
+  Uint32 fix_len;
+  const Uint32 *var_p;
+  Uint32 var_len;
+  const Uint32 *disk_p;
+  Uint32 disk_len;
+  const char *typ;
+
+  fix_p= tuple_words;
+  fix_len= Tuple_header::HeaderSize+tabPtrP->m_offsets[MM].m_fix_header_size;
+  if(req_struct->is_expanded)
+  {
+    typ= "expanded";
+    var_p= ptr->get_var_part_ptr(tabPtrP);
+    var_len= 0;                                 // No dump of varpart in expanded
+#if 0
+    disk_p= (Uint32 *)req_struct->m_disk_ptr;
+    disk_len= (dd_tot ? tabPtrP->m_offsets[DD].m_fix_header_size : 0);
+#endif
+  }
+  else if(bits & Tuple_header::CHAINED_ROW)
+  {
+    typ= "stored";
+    if(mm_vars+mm_dyns)
+    {
+      const KeyReqStruct::Var_data* dst= &req_struct->m_var_data[MM];
+      const Uint32 *src_ptr= ptr->get_var_part_ptr(tabPtrP);
+      Ptr<Page> tmp;
+      var_p= get_ptr(&tmp, * (Var_part_ref*)src_ptr);
+      var_len= get_len(&tmp, * (Var_part_ref*)src_ptr);
+    }
+    else
+    {
+      var_p= 0;
+      var_len= 0;
+    }
+#if 0
+    if(dd_tot)
+    {
+      Local_key key;
+      memcpy(&key, ptr->get_disk_ref_ptr(tabPtrP), sizeof(key));
+      key.m_page_no= req_struct->m_disk_page_ptr.i;
+      disk_p= get_dd_ptr(&req_struct->m_disk_page_ptr, &key, tabPtrP);
+      disk_len= tabPtrP->m_offsets[DD].m_fix_header_size;
+    }
+    else
+    {
+      disk_p= var_p;
+      disk_len= 0;
+    }
+#endif
+  }
+  else
+  {
+    typ= "shrunken";
+    if(mm_vars+mm_dyns)
+    {
+      var_p= ptr->get_var_part_ptr(tabPtrP);
+      var_len= *((Uint16 *)var_p) + 1;
+    }
+    else
+    {
+      var_p= 0;
+      var_len= 0;
+    }
+#if 0
+    disk_p= (Uint32 *)(req_struct->m_disk_ptr);
+    disk_len= (dd_tot ? tabPtrP->m_offsets[DD].m_fix_header_size : 0);
+#endif
+  }
+  ndbout_c("Fixed part[%s](%p len=%u words)",typ, fix_p, fix_len);
+  dump_hex(fix_p, fix_len);
+  ndbout_c("Varpart part[%s](%p len=%u words)", typ , var_p, var_len);
+  dump_hex(var_p, var_len);
+#if 0
+  ndbout_c("Disk part[%s](%p len=%u words)", typ, disk_p, disk_len);
+  dump_hex(disk_p, disk_len);
+#endif
 }
 
 void
@@ -2623,28 +2941,64 @@
   Uint32 bits= ptr->m_header_bits;
   Uint16 dd_tot= tabPtrP->m_no_of_disk_attributes;
   Uint16 mm_vars= tabPtrP->m_attributes[MM].m_no_of_varsize;
+  Uint16 mm_dyns= tabPtrP->m_attributes[MM].m_no_of_dynamic;
   
   const Uint32 *src_ptr= ptr->get_var_part_ptr(tabPtrP);
   const Uint32 *disk_ref= ptr->get_disk_ref_ptr(tabPtrP);
   
-  if(mm_vars)
+  if(mm_vars || mm_dyns)
   {
     const Uint32 *src_data= src_ptr;
+    Uint32 src_len;
     KeyReqStruct::Var_data* dst= &req_struct->m_var_data[MM];
     if(bits & Tuple_header::CHAINED_ROW)
     {
-#if VM_TRACE
-      
-#endif
-      src_data= get_ptr(* (Var_part_ref*)src_ptr);
+      Ptr<Page> tmp;
+      src_data= get_ptr(&tmp, * (Var_part_ref*)src_ptr);
+      src_len= get_len(&tmp, * (Var_part_ref*)src_ptr);
+      /* If the original tuple was grown, the old size is stored at the end. */
+      if(bits & Tuple_header::MM_GROWN)
+      {
+	/**
+	 * This is when triggers read before value of update
+	 *   when original has been reallocated due to grow
+	 */
+	ndbassert(src_len>0);
+	src_len= src_data[src_len-1];
+      }
     }
-    dst->m_data_ptr= (char*)(((Uint16*)src_data)+mm_vars+1);
+    else
+    {
+      Varpart_copy* vp = (Varpart_copy*)src_ptr;
+      src_len = vp->m_len;
+      src_data = vp->m_data;
+      src_ptr++;
+    }
+    
+    char* varstart = (char*)(((Uint16*)src_data)+mm_vars+1);
+    Uint32 varlen = ((Uint16*)src_data)[mm_vars];
+    Uint32* dynstart = ALIGN_WORD(varstart + varlen);
+
+    dst->m_data_ptr= varstart;
     dst->m_offset_array_ptr= (Uint16*)src_data;
     dst->m_var_len_offset= 1;
-    dst->m_max_var_offset= ((Uint16*)src_data)[mm_vars];
+    dst->m_max_var_offset= varlen;
+
+    Uint32 dynlen = src_len - (dynstart - src_data);
+    dst->m_dyn_data_ptr= (char*)dynstart;
+    dst->m_dyn_part_len= dynlen;
+    // Do or not to to do
+    // dst->m_dyn_offset_arr_ptr = dynlen ? (Uint16*)(dynstart + *(Uint8*)dynstart) : 0;
+
+
     
-    // disk part start after varsize (aligned)
-    src_ptr = ALIGN_WORD(dst->m_data_ptr + dst->m_max_var_offset);
+    /*
+      dst->m_dyn_offset_arr_ptr and dst->m_dyn_len_offset are not used for
+      reading the stored/shrunken format.
+    */
+    
+    // disk part start after dynamic part.
+    src_ptr+= src_len;
   } 
   else
   {
@@ -2683,6 +3037,8 @@
       dst->m_max_var_offset= ((Uint16*)src_ptr)[dd_vars];
     }
   }
+
+  req_struct->is_expanded= false;
 }
 
 void
@@ -2691,17 +3047,29 @@
 {
   ndbassert(tabPtrP->need_shrink());
   Tuple_header* ptr= req_struct->m_tuple_ptr;
+  ndbassert(!(ptr->m_header_bits & Tuple_header::CHAINED_ROW));
   
+  KeyReqStruct::Var_data* dst= &req_struct->m_var_data[MM];
+  Uint32 order_desc= tabPtrP->m_real_order_descriptor;
+  const Uint32 * tabDesc= (Uint32*)req_struct->attr_descr;
+  const Uint16 *order = (Uint16*)(&tableDescriptor[order_desc]);
   Uint16 dd_tot= tabPtrP->m_no_of_disk_attributes;
+  Uint16 mm_fix= tabPtrP->m_attributes[MM].m_no_of_fixsize;
   Uint16 mm_vars= tabPtrP->m_attributes[MM].m_no_of_varsize;
+  Uint16 mm_dyns= tabPtrP->m_attributes[MM].m_no_of_dynamic;
+  Uint16 mm_dynvar= tabPtrP->m_attributes[MM].m_no_of_dyn_var;
+  Uint16 mm_dynfix= tabPtrP->m_attributes[MM].m_no_of_dyn_fix;
   Uint16 dd_vars= tabPtrP->m_attributes[DD].m_no_of_varsize;
   
   Uint32 *dst_ptr= ptr->get_var_part_ptr(tabPtrP);
   Uint16* src_off_ptr= req_struct->var_pos_array;
 
   sizes[MM]= sizes[DD]= 0;
-  if(mm_vars)
+  if(mm_vars || mm_dyns)
   {
+    Varpart_copy* vp = (Varpart_copy*)dst_ptr;
+    dst_ptr = vp->m_data;
+
     Uint16* dst_off_ptr= (Uint16*)dst_ptr;
     char*  dst_data_ptr= (char*)(dst_off_ptr + mm_vars + 1);
     char*  src_data_ptr= dst_data_ptr;
@@ -2717,11 +3085,134 @@
       dst_data_ptr += len;
     }
     *dst_off_ptr= off;
-    ndbassert(dst_data_ptr <= ((char*)ptr) + 8192);
-    ndbassert((UintPtr(ptr) & 3) == 0);
-    sizes[MM]= (dst_data_ptr + 3 - ((char*)ptr)) >> 2;
 
-    dst_ptr = ALIGN_WORD(dst_data_ptr);
+    /*
+      Now build the dynamic part, if any.
+      First look for any trailing all-NULL words of the bitmap; we do
+      not need to store those.
+    */
+    
+    ndbassert((UintPtr(dst->m_dyn_data_ptr)&3) == 0);
+    char *dyn_src_ptr= dst->m_dyn_data_ptr;
+    Uint32 bm_len= tabPtrP->m_offsets[MM].m_dyn_null_words; // In 32-bit words
+    /* If no dynamic variables, store nothing. */
+    if(bm_len != 0)
+    {
+      Uint32 *bm_ptr= (Uint32 *)dyn_src_ptr + bm_len - 1;
+      while(*bm_ptr == 0)
+      {
+        bm_ptr--;
+        bm_len--;
+        if(bm_len == 0)
+          break;
+      }
+    }
+
+    /*
+      Copy the bitmap, counting the number of variable sized
+      attributes that are not NULL on the way.
+    */
+    Uint32 *dyn_dst_ptr= ALIGN_WORD(dst_data_ptr);
+    Uint32 dyn_var_count= 0;
+    const Uint32 *src_bm_ptr= (Uint32 *)(dyn_src_ptr);
+    Uint32 *dst_bm_ptr= (Uint32 *)dyn_dst_ptr;
+    /* ToDo: Put all of the dynattr code inside if(bm_len>0) { ... }, split to separate function. */
+    Uint16 dyn_dst_data_offset= 0;
+    if (bm_len > 0)
+    {
+      const Uint32 *dyn_bm_var_mask_ptr= tabPtrP->dynVarSizeMask;
+      for(Uint16 i= 0; i< bm_len; i++)
+      {
+        Uint32 v= src_bm_ptr[i];
+        dyn_var_count+= count_bits(v & *dyn_bm_var_mask_ptr++);
+        dst_bm_ptr[i]= v;
+      }
+
+      Uint32 tmp = *dyn_dst_ptr;
+      ndbassert(bm_len <= DYN_BM_LEN_MASK);
+      * dyn_dst_ptr = (tmp & ~(Uint32)DYN_BM_LEN_MASK) | bm_len;
+      dyn_dst_ptr+= bm_len;
+      dyn_dst_data_offset= 2*dyn_var_count + 2;
+    }
+    Uint16 *dyn_src_off_array= dst->m_dyn_offset_arr_ptr;
+    Uint16 *dyn_src_lenoff_array=
+      dyn_src_off_array + dst->m_dyn_len_offset;
+    Uint16* dyn_dst_off_array = (Uint16*)dyn_dst_ptr;
+    /*
+      Copy over the variable sized not-NULL attributes.
+      Data offsets are counted from the start of the offset array, and
+      we store one additional offset to be able to easily compute the
+      data length as the difference between offsets.
+    */
+    Uint16 off_idx= 0;
+    order+= mm_fix+mm_vars;                     // Point to first dynfix entry
+    for(Uint32 i= 0; i<mm_dynvar; i++)
+    {
+      /*
+        Note that we must use the destination (shrunken) bitmap here, as the
+        source (expanded) bitmap may have been already clobbered (by offset
+        data).
+      */
+      Uint32 attrDesc2 = tabDesc[order[mm_dynfix+i]+1];
+      Uint32 pos = AttributeOffset::getNullFlagPos(attrDesc2);
+      if (bm_len > (pos >> 5) && BitmaskImpl::get(bm_len, dst_bm_ptr, pos))
+      {
+        dyn_dst_off_array[off_idx++]= dyn_dst_data_offset;
+        Uint32 dyn_src_off= dyn_src_off_array[i];
+        Uint32 dyn_len= dyn_src_lenoff_array[i] - dyn_src_off;
+        memmove(((char *)dyn_dst_ptr) + dyn_dst_data_offset,
+                dyn_src_ptr + dyn_src_off,
+                dyn_len);
+        dyn_dst_data_offset+= dyn_len;
+      }
+    }
+    /* If all dynamic attributes are NULL, we store nothing. */
+    if(bm_len != 0)
+    {
+      dyn_dst_off_array[off_idx]= dyn_dst_data_offset;
+      ndbassert(dyn_dst_off_array + off_idx == (Uint16*)dyn_dst_ptr+dyn_var_count);
+    }
+
+    char *dynvar_end_ptr= ((char *)dyn_dst_ptr) + dyn_dst_data_offset;
+    char *dyn_dst_data_ptr= (char *)(ALIGN_WORD(dynvar_end_ptr));
+    /*
+      Zero out any padding bytes. Might not be strictly necessary, but seems
+      cleaner than leaving random stuff in there.
+    */
+    bzero(dynvar_end_ptr, dyn_dst_data_ptr-dynvar_end_ptr);
+
+    /* 
+       Copy over the fixed-sized not-NULL attributes.
+       Note that attributes are copied in reverse order; this is to avoid
+       overwriting not-yet-copied data, as the data is also stored in reverse
+       order.
+    */
+    for(Uint32 i= mm_dynfix; i > 0; )
+    {
+      i--;
+      Uint16 j= order[i];
+      Uint32 attrDesc2 = tabDesc[j+1];
+      Uint32 pos = AttributeOffset::getNullFlagPos(attrDesc2);
+      if(bm_len > (pos >> 5) && BitmaskImpl::get(bm_len, dst_bm_ptr, pos))
+      {
+        Uint32 fixsize=
+          4*AttributeDescriptor::getSizeInWords(tabDesc[j]);
+        memmove(dyn_dst_data_ptr,
+                dyn_src_ptr + dyn_src_off_array[mm_dynvar+i],
+                fixsize);
+        dyn_dst_data_ptr += fixsize;
+      }
+    }
+
+    Uint32 varpart_len= ((Uint32 *)dyn_dst_data_ptr)-dst_ptr;
+    vp->m_len = varpart_len;
+    sizes[MM] = varpart_len;
+    dst_ptr= (Uint32 *)dyn_dst_data_ptr;
+    
+    ndbassert(dyn_dst_data_ptr <= ((char*)ptr) + 8192);
+    ndbassert((UintPtr(ptr) & 3) == 0);
+    ndbassert((UintPtr(dyn_dst_data_ptr) & 3) == 0);
+    ndbassert(varpart_len < 0x10000);
   }
   else
   {
@@ -2743,11 +3234,15 @@
       memmove(dst_ptr, src_ptr, 4*tabPtrP->m_offsets[DD].m_fix_header_size);
     }
   }
+
+  req_struct->is_expanded= false;
+
 }
 
 void
 Dbtup::validate_page(Tablerec* regTabPtr, Var_page* p)
 {
+  /* ToDo: We could also do some checks here for any dynamic part. */
   Uint32 mm_vars= regTabPtr->m_attributes[MM].m_no_of_varsize;
   Uint32 fix_sz= regTabPtr->m_offsets[MM].m_fix_header_size + 
     Tuple_header::HeaderSize;
@@ -2839,15 +3334,13 @@
   
   Uint32 bits= org->m_header_bits;
   Uint32 copy_bits= req_struct->m_tuple_ptr->m_header_bits;
-  Uint32 fix_sz = Tuple_header::HeaderSize + 
-    regTabPtr->m_offsets[MM].m_fix_header_size;
   
   if(sizes[MM] == sizes[2+MM])
     ;
   else if(sizes[MM] > sizes[2+MM])
   {
     if(0) ndbout_c("shrink");
-    copy_bits |= Tuple_header::MM_SHRINK;
+    req_struct->m_tuple_ptr->m_header_bits= copy_bits|Tuple_header::MM_SHRINK;
   }
   else
   {
@@ -2855,7 +3348,7 @@
     Ptr<Page> pagePtr = req_struct->m_varpart_page_ptr;
     Var_page* pageP= (Var_page*)pagePtr.p;
     Uint32 idx, alloc, needed;
-    Var_part_ref *refptr = (Var_part_ref*)org->get_var_part_ptr(regTabPtr);
+    Var_part_ref *refptr= (Var_part_ref *)(org->get_var_part_ptr(regTabPtr));
     ndbassert(bits & Tuple_header::CHAINED_ROW);
 
     Local_key ref;
@@ -2867,25 +3360,37 @@
       pageP = (Var_page*)pagePtr.p;
     }
     alloc= pageP->get_entry_len(idx);
+    Uint32 orig_size= alloc;
+    if(bits&Tuple_header::MM_GROWN)
+    {
+      /* Was grown before, so must fetch real original size from last word. */
+      Uint32 *old_var_part= pageP->get_ptr(idx);
+      ndbassert(alloc>0);
+      orig_size= old_var_part[alloc-1];
+    }
+      
 #ifdef VM_TRACE
     if(!pageP->get_entry_chain(idx))
       ndbout << *pageP << endl;
 #endif
     ndbassert(pageP->get_entry_chain(idx));
-    needed= sizes[2+MM] - fix_sz;
-    
+    needed= sizes[2+MM];
+
     if(needed <= alloc)
     {
       //ndbassert(!regOperPtr->is_first_operation());
       if (0) ndbout_c(" no grow");
       return 0;
     }
-    copy_bits |= Tuple_header::MM_GROWN;
-    if (unlikely(realloc_var_part(regFragPtr, regTabPtr, pagePtr, 
-				  refptr, alloc, needed)))
+    Uint32 *new_var_part=realloc_var_part(regFragPtr, regTabPtr, pagePtr, 
+                                          refptr, alloc, needed);
+    if (unlikely(new_var_part==NULL))
       return -1;
+    /* Mark the tuple grown, store the original length at the end. */
+    org->m_header_bits= bits|Tuple_header::MM_GROWN;
+    ndbassert(needed>1);
+    new_var_part[needed-1]= orig_size;
   }
-  req_struct->m_tuple_ptr->m_header_bits = copy_bits;
   return 0;
 }
 
@@ -2905,7 +3410,8 @@
     PagePtr page_ptr;
 
     int ret;
-    if (tablePtr.p->m_attributes[MM].m_no_of_varsize)
+    if (tablePtr.p->m_attributes[MM].m_no_of_varsize +
+        tablePtr.p->m_attributes[MM].m_no_of_dynamic)
     {
       tablePtr.p->m_offsets[MM].m_fix_header_size += 
 	Tuple_header::HeaderSize+1;
@@ -2946,7 +3452,8 @@
   
   int ret;
   PagePtr page_ptr;
-  if (tablePtr.p->m_attributes[MM].m_no_of_varsize)
+  if (tablePtr.p->m_attributes[MM].m_no_of_varsize +
+      tablePtr.p->m_attributes[MM].m_no_of_dynamic)
   {
     const Uint32 XXX = Tuple_header::HeaderSize+Var_part_ref::SZ32;
     tablePtr.p->m_offsets[MM].m_fix_header_size += XXX;
@@ -3077,7 +3584,8 @@
   Local_key disk;
   memcpy(&disk, ptr->get_disk_ref_ptr(tablePtr.p), sizeof(disk));
   
-  if (tablePtr.p->m_attributes[MM].m_no_of_varsize)
+  if (tablePtr.p->m_attributes[MM].m_no_of_varsize +
+      tablePtr.p->m_attributes[MM].m_no_of_dynamic)
   {
     jam();
     free_var_rec(fragPtr.p, tablePtr.p, &tmp, pagePtr);

--- 1.45/storage/ndb/src/kernel/blocks/dbtup/DbtupGen.cpp	2007-04-03 14:34:27 +02:00
+++ 1.46/storage/ndb/src/kernel/blocks/dbtup/DbtupGen.cpp	2007-04-03 14:34:27 +02:00
@@ -40,6 +40,7 @@
   cnoOfAttrbufrec = ZNO_OF_ATTRBUFREC;
   cnoOfFragrec = MAX_FRAG_PER_NODE;
   cnoOfFragoprec = MAX_FRAG_PER_NODE;
+  cnoOfAlterTabOps = MAX_FRAG_PER_NODE;
   cnoOfPageRangeRec = ZNO_OF_PAGE_RANGE_REC;
   c_maxTriggersPerTable = ZDEFAULT_MAX_NO_TRIGGERS_PER_TABLE;
   c_noOfBuildIndexRec = 32;
@@ -74,6 +75,7 @@
   addRecSignal(GSN_STORED_PROCREQ, &Dbtup::execSTORED_PROCREQ);
   addRecSignal(GSN_TUPFRAGREQ, &Dbtup::execTUPFRAGREQ);
   addRecSignal(GSN_TUP_ADD_ATTRREQ, &Dbtup::execTUP_ADD_ATTRREQ);
+  addRecSignal(GSN_ALTER_TAB_REQ, &Dbtup::execALTER_TAB_REQ);
   addRecSignal(GSN_TUP_COMMITREQ, &Dbtup::execTUP_COMMITREQ);
   addRecSignal(GSN_TUP_ABORTREQ, &Dbtup::execTUP_ABORTREQ);
   addRecSignal(GSN_NDB_STTOR, &Dbtup::execNDB_STTOR);
@@ -102,6 +104,7 @@
   attrbufrec = 0;
   fragoperrec = 0;
   fragrecord = 0;
+  alterTabOperRec = 0;
   hostBuffer = 0;
   pageRange = 0;
   tablerec = 0;
@@ -129,6 +132,10 @@
   deallocRecord((void **)&fragrecord,"Fragrecord",
 		sizeof(Fragrecord), 
 		cnoOfFragrec);
+
+  deallocRecord((void **)&alterTabOperRec,"AlterTabOperRec",
+                sizeof(alterTabOperRec),
+                cnoOfAlterTabOps);
   
   deallocRecord((void **)&hostBuffer,"HostBuffer",
 		sizeof(HostBuffer), 
@@ -381,6 +388,10 @@
 					sizeof(Fragrecord), 
 					cnoOfFragrec);
   
+  alterTabOperRec = (AlterTabOperation*)allocRecord("AlterTabOperation",
+                                                    sizeof(AlterTabOperation),
+                                                    cnoOfAlterTabOps);
+
   hostBuffer = (HostBuffer*)allocRecord("HostBuffer",
 					sizeof(HostBuffer), 
 					MAX_NODES);
@@ -452,6 +463,7 @@
     break;
   case 10:
     jam();
+    initializeAlterTabOperation();
     break;
   case 11:
     jam();
@@ -575,6 +587,24 @@
   regFragPtr.p->nextfreefrag = RNIL;
   cfirstfreefrag = 0;
 }//Dbtup::initializeFragrecord()
+
+void Dbtup::initializeAlterTabOperation()
+{
+  AlterTabOperationPtr regAlterTabOpPtr;
+  for (regAlterTabOpPtr.i= 0;
+       regAlterTabOpPtr.i<cnoOfAlterTabOps;
+       regAlterTabOpPtr.i++)
+  {
+    refresh_watch_dog();
+    ptrAss(regAlterTabOpPtr, alterTabOperRec);
+    new (regAlterTabOpPtr.p) AlterTabOperation();
+    regAlterTabOpPtr.p->nextAlterTabOp= regAlterTabOpPtr.i+1;
+  }
+  regAlterTabOpPtr.i= cnoOfAlterTabOps-1;
+  ptrAss(regAlterTabOpPtr, alterTabOperRec);
+  regAlterTabOpPtr.p->nextAlterTabOp= RNIL;
+  cfirstfreeAlterTabOp= 0;
+}
 
 void Dbtup::initializeHostBuffer() 
 {

--- 1.24/storage/ndb/src/kernel/blocks/dbtup/DbtupIndex.cpp	2007-04-03 14:34:27 +02:00
+++ 1.25/storage/ndb/src/kernel/blocks/dbtup/DbtupIndex.cpp	2007-04-03 14:34:27 +02:00
@@ -414,7 +414,8 @@
     }
     // memory page format
     buildPtr.p->m_build_vs =
-      tablePtr.p->m_attributes[MM].m_no_of_varsize > 0;
+      (tablePtr.p->m_attributes[MM].m_no_of_varsize +
+       tablePtr.p->m_attributes[MM].m_no_of_dynamic) > 0;
     if (DictTabInfo::isOrderedIndex(buildReq->getIndexType())) {
       jam();
       const DLList<TupTriggerData>& triggerList = 

--- 1.36/storage/ndb/src/kernel/blocks/dbtup/DbtupMeta.cpp	2007-04-03 14:34:27 +02:00
+++ 1.37/storage/ndb/src/kernel/blocks/dbtup/DbtupMeta.cpp	2007-04-03 14:34:27 +02:00
@@ -25,6 +25,7 @@
 #include <signaldata/FsRemoveReq.hpp>
 #include <signaldata/DropTab.hpp>
 #include <signaldata/AlterTab.hpp>
+#include <signaldata/AlterTable.hpp>
 #include <signaldata/CreateFilegroupImpl.hpp>
 #include <AttributeDescriptor.hpp>
 #include "AttributeOffset.hpp"
@@ -93,10 +94,6 @@
   fragOperPtr.p->attributeCount = noOfAttributes;
   
   memset(fragOperPtr.p->m_null_bits, 0, sizeof(fragOperPtr.p->m_null_bits));
-  memset(fragOperPtr.p->m_fix_attributes_size, 0, 
-	 sizeof(fragOperPtr.p->m_fix_attributes_size));
-  memset(fragOperPtr.p->m_var_attributes_size, 0, 
-	 sizeof(fragOperPtr.p->m_var_attributes_size));
 
   fragOperPtr.p->charsetIndex = 0;
   fragOperPtr.p->minRows = minRows;
@@ -166,6 +163,8 @@
 //                   bits. Each time a new word is needed we allocate the
 //                   complete word. Zero nullable attributes means that there
 //                   is no word at all
+//                   Note that the null-bits are also used for storing the
+//                   data for (non-dynamic) bit types.
 //-----------------------------------------------------------------------------
     fragOperPtr.p->definingFragment= true;
     regTabPtr.p->tableStatus= DEFINING;
@@ -177,26 +176,39 @@
     regTabPtr.p->m_offsets[MM].m_null_words= 0;
     regTabPtr.p->m_offsets[MM].m_varpart_offset= 0;
     regTabPtr.p->m_offsets[MM].m_max_var_offset= 0;
+    regTabPtr.p->m_offsets[MM].m_max_dyn_offset= 0;
+    regTabPtr.p->m_offsets[MM].m_dyn_null_words= 0;
 
     regTabPtr.p->m_offsets[DD].m_disk_ref_offset= 0;
     regTabPtr.p->m_offsets[DD].m_null_words= 0;
     regTabPtr.p->m_offsets[DD].m_varpart_offset= 0;
     regTabPtr.p->m_offsets[DD].m_max_var_offset= 0;
+    regTabPtr.p->m_offsets[DD].m_max_dyn_offset= 0;
+    regTabPtr.p->m_offsets[DD].m_dyn_null_words= 0;
 
     regTabPtr.p->m_attributes[MM].m_no_of_fixsize= 0;
     regTabPtr.p->m_attributes[MM].m_no_of_varsize= 0;
+    regTabPtr.p->m_attributes[MM].m_no_of_dynamic= 0;
+    regTabPtr.p->m_attributes[MM].m_no_of_dyn_fix= 0;
+    regTabPtr.p->m_attributes[MM].m_no_of_dyn_var= 0;
     regTabPtr.p->m_attributes[DD].m_no_of_fixsize= 0;
     regTabPtr.p->m_attributes[DD].m_no_of_varsize= 0;
+    regTabPtr.p->m_attributes[DD].m_no_of_dynamic= 0;
+    regTabPtr.p->m_attributes[DD].m_no_of_dyn_fix= 0;
+    regTabPtr.p->m_attributes[DD].m_no_of_dyn_var= 0;
 
+    // Reserve space for bitmap length
+    regTabPtr.p->m_dyn_null_bits= DYN_BM_LEN_BITS; 
     regTabPtr.p->noOfKeyAttr= noOfKeyAttr;
     regTabPtr.p->noOfCharsets= noOfCharsets;
     regTabPtr.p->m_no_of_attributes= noOfAttributes;
     
-    regTabPtr.p->notNullAttributeMask.clear();
-    regTabPtr.p->blobAttributeMask.clear();
+    regTabPtr.p->dynTabDescriptor= RNIL;
     
     Uint32 offset[10];
-    Uint32 tableDescriptorRef= allocTabDescr(regTabPtr.p, offset);
+    Uint32 allocSize= getTabDescrOffsets(noOfAttributes, noOfCharsets,
+                                         noOfKeyAttr, offset);
+    Uint32 tableDescriptorRef= allocTabDescr(allocSize);
     if (tableDescriptorRef == RNIL) {
       jam();
       fragrefuse4Lab(signal, fragOperPtr, regFragPtr, regTabPtr.p, fragId);
@@ -265,6 +277,14 @@
   fragOperPtr.p->inUse = true;
 }//Dbtup::seizeFragoperrec()
 
+void Dbtup::seizeAlterTabOperation(AlterTabOperationPtr& alterTabOpPtr)
+{
+  alterTabOpPtr.i= cfirstfreeAlterTabOp;
+  ptrCheckGuard(alterTabOpPtr, cnoOfAlterTabOps, alterTabOperRec);
+  cfirstfreeAlterTabOp= alterTabOpPtr.p->nextAlterTabOp;
+  alterTabOpPtr.p->nextAlterTabOp= RNIL;
+}
+
 /* **************************************************************** */
 /* **************          TUP_ADD_ATTRREQ       ****************** */
 /* **************************************************************** */
@@ -279,7 +299,6 @@
   ptrCheckGuard(fragOperPtr, cnoOfFragoprec, fragoperrec);
   Uint32 attrId = signal->theData[2];
   Uint32 attrDescriptor = signal->theData[3];
-  Uint32 extType = AttributeDescriptor::getType(attrDescriptor);
   // DICT sends charset number in upper half
   Uint32 csNumber = (signal->theData[4] >> 16);
 
@@ -334,12 +353,12 @@
   Uint32 attrLen = AttributeDescriptor::getSize(attrDescriptor);
   
   Uint32 attrDes2= 0;
+  Uint32 bytes= AttributeDescriptor::getSizeInBytes(attrDescriptor);
+  Uint32 words= (bytes + 3) / 4;
+  Uint32 ind= AttributeDescriptor::getDiskBased(attrDescriptor);
   if (!AttributeDescriptor::getDynamic(attrDescriptor)) {
     jam();
-    Uint32 pos= 0, null_pos;
-    Uint32 bytes= AttributeDescriptor::getSizeInBytes(attrDescriptor);
-    Uint32 words= (bytes + 3) / 4;
-    Uint32 ind= AttributeDescriptor::getDiskBased(attrDescriptor);
+    Uint32 null_pos;
     ndbrequire(ind <= 1);
     null_pos= fragOperPtr.p->m_null_bits[ind];
 
@@ -348,66 +367,92 @@
       jam();
       fragOperPtr.p->m_null_bits[ind]++;
     } 
-    else 
-    {
-      regTabPtr.p->notNullAttributeMask.set(attrId);
-    }
-
-    if (extType == NDB_TYPE_BLOB || extType == NDB_TYPE_TEXT) {
-      regTabPtr.p->blobAttributeMask.set(attrId);
-    }
 
-    switch (AttributeDescriptor::getArrayType(attrDescriptor)) {
-    case NDB_ARRAYTYPE_FIXED:
+    if (AttributeDescriptor::getArrayType(attrDescriptor)==NDB_ARRAYTYPE_FIXED)
     {
       jam();
       regTabPtr.p->m_attributes[ind].m_no_of_fixsize++;
-      if(attrLen != 0)
-      {
-	jam();
-	pos= fragOperPtr.p->m_fix_attributes_size[ind];
-	fragOperPtr.p->m_fix_attributes_size[ind] += words;
-      }
-      else
+      if(attrLen == 0)
       {
+        /* Static bit type. */
 	jam();
 	Uint32 bitCount = AttributeDescriptor::getArraySize(attrDescriptor);
 	fragOperPtr.p->m_null_bits[ind] += bitCount;
       }
-      break;
     }
-    default:
+    else
     {
       jam();
-      fragOperPtr.p->m_var_attributes_size[ind] += bytes;
-      pos= regTabPtr.p->m_attributes[ind].m_no_of_varsize++;
-      break;
+      regTabPtr.p->m_attributes[ind].m_no_of_varsize++;
     }
-    }//switch
     
-    AttributeOffset::setOffset(attrDes2, pos);
     AttributeOffset::setNullFlagPos(attrDes2, null_pos);
   } else {
-    ndbrequire(false);
-  }
-  if (csNumber != 0) { 
-    CHARSET_INFO* cs = all_charsets[csNumber];
-    ndbrequire(cs != NULL);
-    Uint32 i = 0;
-    while (i < fragOperPtr.p->charsetIndex) {
+    /* A dynamic attribute. */
+    ndbrequire(ind==MM);
+    regTabPtr.p->m_attributes[ind].m_no_of_dynamic++;
+    /*
+       The dynamic attribute format always require a 'null' bit. So
+       storing NOT NULL attributes as dynamic is not all that useful
+       (but not harmful in any way either).
+       Later we might implement NOT NULL DEFAULT xxx by storing the value
+       xxx internally as 'null'.
+    */
+
+    Uint32 null_pos= regTabPtr.p->m_dyn_null_bits;
+
+    if (AttributeDescriptor::getArrayType(attrDescriptor)==NDB_ARRAYTYPE_FIXED)
+    {
+      /* A fixed-size dynamic attribute. */
       jam();
-      if (regTabPtr.p->charsetArray[i] == cs)
-	break;
-      i++;
+      if (AttributeDescriptor::getSize(attrDescriptor)==0)
+      {
+        /*
+          Bit type. These are stored directly in the bitmap.
+          This means that we will still use some space for a dynamic NULL
+          bittype if a following dynamic attribute is non-NULL.
+        */
+        Uint32 bits= AttributeDescriptor::getArraySize(attrDescriptor);
+        /*
+          The NULL bit is stored after the data bits, so that we automatically
+          ensure that the full size bitmap is stored when non-NULL.
+        */
+        null_pos+= bits;
+        regTabPtr.p->m_dyn_null_bits+= bits+1;
+      }
+      else
+      {
+        jam();
+        /*
+          We use one NULL bit per 4 bytes of dynamic fixed-size attribute. So
+          for dynamic fixsize longer than 64 bytes (16 null bits), it is more
+          efficient to store them as dynamic varsize internally.
+        */
+        if(words > InternalMaxDynFix)
+          goto treat_as_varsize;
+
+        regTabPtr.p->m_attributes[ind].m_no_of_dyn_fix++;
+        Uint32 null_bits= (bytes+3) >> 2;
+        regTabPtr.p->m_dyn_null_bits+= null_bits;
+      }
     }
-    if (i == fragOperPtr.p->charsetIndex) {
+    else
+    {
+      /* A variable-sized dynamic attribute. */
+    treat_as_varsize:
       jam();
-      fragOperPtr.p->charsetIndex++;
+      regTabPtr.p->m_attributes[ind].m_no_of_dyn_var++;
+      regTabPtr.p->m_dyn_null_bits++;
     }
-    ndbrequire(i < regTabPtr.p->noOfCharsets);
-    regTabPtr.p->charsetArray[i]= cs;
-    AttributeOffset::setCharsetPos(attrDes2, i);
-  }
+    AttributeOffset::setNullFlagPos(attrDes2, null_pos);
+
+    ndbassert((regTabPtr.p->m_attributes[ind].m_no_of_dyn_var +
+               regTabPtr.p->m_attributes[ind].m_no_of_dyn_fix) <=
+              regTabPtr.p->m_attributes[ind].m_no_of_dynamic);
+  }
+  handleCharsetPos(csNumber, regTabPtr.p->charsetArray,
+                   regTabPtr.p->noOfCharsets,
+                   fragOperPtr.p->charsetIndex, attrDes2);
   setTabDescrWord(firstTabDesIndex + 1, attrDes2);
 
   if (ERROR_INSERTED(4009) && regTabPtr.p->fragid[0] == fragId && attrId == 0||
@@ -440,89 +485,22 @@
 #define BTW(x) ((x+31) >> 5)
   regTabPtr.p->m_offsets[MM].m_null_words= BTW(fragOperPtr.p->m_null_bits[MM]);
   regTabPtr.p->m_offsets[DD].m_null_words= BTW(fragOperPtr.p->m_null_bits[DD]);
+#undef BTW
 
-  /**
-   * Fix offsets
-   */
-  Uint32 pos[2] = { 0, 0 };
-  if (regTabPtr.p->m_bits & Tablerec::TR_Checksum)
-  {
-    pos[0]= 1; 
-  }
-
-  if (regTabPtr.p->m_bits & Tablerec::TR_RowGCI)
-  {
-    pos[MM]++;
-    pos[DD]++;
-  }
-  
-  regTabPtr.p->m_no_of_disk_attributes= 
-    regTabPtr.p->m_attributes[DD].m_no_of_fixsize +
-    regTabPtr.p->m_attributes[DD].m_no_of_varsize;
-  
-  if(regTabPtr.p->m_no_of_disk_attributes > 0)
-  {
-    regTabPtr.p->m_offsets[MM].m_disk_ref_offset= pos[MM];
-    pos[MM] += 2; // 8 bytes
-  }
-  
-  regTabPtr.p->m_offsets[MM].m_null_offset= pos[MM];
-  regTabPtr.p->m_offsets[DD].m_null_offset= pos[DD];
-  
-  pos[MM]+= regTabPtr.p->m_offsets[MM].m_null_words;
-  pos[DD]+= regTabPtr.p->m_offsets[DD].m_null_words;
-
-  Uint32 *tabDesc = (Uint32*)(tableDescriptor+regTabPtr.p->tabDescriptor);
-  for(Uint32 i= 0; i<regTabPtr.p->m_no_of_attributes; i++)
-  {
-    Uint32 ind= AttributeDescriptor::getDiskBased(* tabDesc);
-    Uint32 arr= AttributeDescriptor::getArrayType(* tabDesc++);
-
-    if(arr == NDB_ARRAYTYPE_FIXED)
-    {
-      Uint32 desc= * tabDesc;
-      Uint32 off= AttributeOffset::getOffset(desc) + pos[ind];
-      AttributeOffset::setOffset(desc, off);
-      * tabDesc= desc;
-    }
-    tabDesc++;
+  /* Allocate  dynamic descriptor. */
+  Uint32 offset[3];
+  Uint32 allocSize= getDynTabDescrOffsets((regTabPtr.p->m_dyn_null_bits+31)>>5,
+                                          offset);
+  Uint32 dynTableDescriptorRef= allocTabDescr(allocSize);
+  if (dynTableDescriptorRef == RNIL) {
+    jam();
+    addattrrefuseLab(signal, regFragPtr, fragOperPtr, regTabPtr.p, fragId);
+    return;
   }
+  setupDynDescriptorReferences(dynTableDescriptorRef, regTabPtr.p, offset);
 
-  regTabPtr.p->m_offsets[MM].m_fix_header_size= 
-    fragOperPtr.p->m_fix_attributes_size[MM] + 
-    pos[MM];
-
-  regTabPtr.p->m_offsets[DD].m_fix_header_size= 
-    fragOperPtr.p->m_fix_attributes_size[DD] + 
-    pos[DD];
-
-  if(regTabPtr.p->m_attributes[MM].m_no_of_varsize == 0)
-    regTabPtr.p->m_offsets[MM].m_fix_header_size += Tuple_header::HeaderSize;
-
-  if(regTabPtr.p->m_attributes[DD].m_no_of_varsize == 0 &&
-     regTabPtr.p->m_attributes[DD].m_no_of_fixsize > 0)
-    regTabPtr.p->m_offsets[DD].m_fix_header_size += Tuple_header::HeaderSize;
-  
-  regTabPtr.p->m_offsets[MM].m_max_var_offset= 
-    fragOperPtr.p->m_var_attributes_size[MM];
-  
-  regTabPtr.p->m_offsets[DD].m_max_var_offset= 
-    fragOperPtr.p->m_var_attributes_size[DD];
-
-  regTabPtr.p->total_rec_size= 
-    pos[MM] + fragOperPtr.p->m_fix_attributes_size[MM] +
-    pos[DD] + fragOperPtr.p->m_fix_attributes_size[DD] +
-    ((fragOperPtr.p->m_var_attributes_size[MM] + 3) >> 2) +
-    ((fragOperPtr.p->m_var_attributes_size[DD] + 3) >> 2) +
-    (regTabPtr.p->m_attributes[MM].m_no_of_varsize ? 
-     (regTabPtr.p->m_attributes[MM].m_no_of_varsize + 2) >> 1 : 0) +
-    (regTabPtr.p->m_attributes[DD].m_no_of_varsize ? 
-     (regTabPtr.p->m_attributes[DD].m_no_of_varsize + 2) >> 1 : 0) +
-    Tuple_header::HeaderSize +
-    (regTabPtr.p->m_no_of_disk_attributes ? Tuple_header::HeaderSize : 0);
-  
-  setUpQueryRoutines(regTabPtr.p);
-  setUpKeyArray(regTabPtr.p);
+  /* Compute table aggregate metadata. */
+  computeTableMetaData(regTabPtr.p);
 
 #if 0
   ndbout << *regTabPtr.p << endl;
@@ -538,7 +516,8 @@
   
   {
     Uint32 fix_tupheader = regTabPtr.p->m_offsets[MM].m_fix_header_size;
-    if(regTabPtr.p->m_attributes[MM].m_no_of_varsize != 0)
+    if((regTabPtr.p->m_attributes[MM].m_no_of_varsize +
+        regTabPtr.p->m_attributes[MM].m_no_of_dynamic) != 0)
       fix_tupheader += Tuple_header::HeaderSize + Var_part_ref::SZ32;
     ndbassert(fix_tupheader > 0);
     Uint32 noRowsPerPage = ZWORDS_ON_PAGE / fix_tupheader;
@@ -620,6 +599,546 @@
 }
 
 void
+Dbtup::execALTER_TAB_REQ(Signal *signal)
+{
+  jamEntry();
+  if(!assembleFragments(signal))
+    return;
+  AlterTabReq *const req= (AlterTabReq *)signal->getDataPtr();
+  if (!AlterTableReq::getAddAttrFlag(req->changeMask))
+  {
+    /* Nothing to do in TUP. */
+    releaseSections(signal);
+    sendAlterTabConf(signal, req);
+    return;
+  }
+
+  TablerecPtr regTabPtr;
+  regTabPtr.i= req->tableId;
+  ptrCheckGuard(regTabPtr, cnoOfTablerec, tablerec);
+
+  AlterTabReq::RequestType alterType=
+    (AlterTabReq::RequestType)req->requestType;
+
+  if(alterType==AlterTabReq::AlterTablePrepare)
+  {
+    handleAlterTabPrepare(signal, regTabPtr.p);
+    return;
+  }
+
+  AlterTabOperationPtr regAlterTabOpPtr;
+  if (req->clientData==RNIL)
+  {
+    /* This means that we failed in prepare, or never got there. */
+    sendAlterTabConf(signal, req);
+    return;
+  }
+  regAlterTabOpPtr.i= req->clientData;
+  ptrCheckGuard(regAlterTabOpPtr, cnoOfAlterTabOps, alterTabOperRec);
+
+  if(alterType==AlterTabReq::AlterTableCommit)
+  {
+    handleAlterTableCommit(signal, regAlterTabOpPtr, regTabPtr.p);
+  }
+  else if(alterType==AlterTabReq::AlterTableRevert)
+  {
+    handleAlterTableAbort(signal, regAlterTabOpPtr, regTabPtr.p);
+  }
+  else
+  {
+    ndbrequire(false);
+  }
+}
+
+void
+Dbtup::handleAlterTabPrepare(Signal *signal, const Tablerec *regTabPtr)
+{
+  AlterTabReq *const req= (AlterTabReq *)signal->getDataPtr();
+
+  Uint32 noOfNewAttr= req->noOfNewAttr;
+  Uint32 newNoOfCharsets= req->newNoOfCharsets;
+  Uint32 newNoOfKeyAttrs= req->newNoOfKeyAttrs;
+
+  ndbrequire(signal->getNoOfSections() == 1);
+  ndbrequire((25+noOfNewAttr*2)<<2 < sizeof(signal->theData));
+
+  /* Get the array of attribute descriptor words. */
+  SegmentedSectionPtr ssPtr;
+  Uint32 *attrInfo= signal->theData+25;
+  signal->getSection(ssPtr, 0);
+  copy(attrInfo, ssPtr);
+  releaseSections(signal);
+
+  Uint32 oldNoOfAttr= regTabPtr->m_no_of_attributes;
+  Uint32 newNoOfAttr= oldNoOfAttr+noOfNewAttr;
+
+  /* Can only add attributes if varpart already present. */
+  if((regTabPtr->m_attributes[MM].m_no_of_varsize +
+      regTabPtr->m_attributes[MM].m_no_of_dynamic) == 0)
+  {
+    sendAlterTabRef(signal, req, ZINVALID_ALTER_TAB);
+    return;
+  }
+
+  AlterTabOperationPtr regAlterTabOpPtr;
+  seizeAlterTabOperation(regAlterTabOpPtr);
+
+  regAlterTabOpPtr.p->newNoOfAttrs= newNoOfAttr;
+
+  /* Allocate a new (possibly larger) table descriptor buffer. */
+  Uint32 allocSize= getTabDescrOffsets(newNoOfAttr, newNoOfCharsets,
+                                       newNoOfKeyAttrs,
+                                       regAlterTabOpPtr.p->tabDesOffset);
+  Uint32 tableDescriptorRef= allocTabDescr(allocSize);
+  if (tableDescriptorRef == RNIL) {
+    jam();
+    releaseAlterTabOpRec(regAlterTabOpPtr);
+    sendAlterTabRef(signal, req, terrorCode);
+    return;
+  }
+  regAlterTabOpPtr.p->tableDescriptor= tableDescriptorRef;
+  regAlterTabOpPtr.p->desAllocSize= allocSize;
+
+  /*
+    Get new pointers into tableDescriptor, and copy over old data.
+    (Rest will be recomputed in computeTableMetaData() in case of
+    ALTER_TAB_REQ[commit]).
+  */
+  Uint32* desc= &tableDescriptor[tableDescriptorRef].tabDescr;
+  CHARSET_INFO** CharsetArray=
+    (CHARSET_INFO**)(desc + regAlterTabOpPtr.p->tabDesOffset[2]);
+  memcpy(CharsetArray, regTabPtr->charsetArray,
+         sizeof(*CharsetArray)*regTabPtr->noOfCharsets);
+  Uint32 *attrDesPtr= desc + regAlterTabOpPtr.p->tabDesOffset[4];
+  memcpy(attrDesPtr,
+         &tableDescriptor[regTabPtr->tabDescriptor].tabDescr,
+         (ZAD_SIZE<<2)*oldNoOfAttr);
+  attrDesPtr+= ZAD_SIZE*oldNoOfAttr;
+
+  /*
+    Loop over the new attributes to add.
+     - Save AttributeDescriptor word in new TabDescriptor record.
+     - Compute charset pos, as we will not save original csNumber.
+     - Compute size needed for dynamic bitmap mask allocation.
+     - Compute number of dynamic varsize, needed for fixsize offset calculation
+       in ALTER_TAB_REQ[commit];
+   */
+  Uint32 charsetIndex= regTabPtr->noOfCharsets;
+  Uint32 dyn_nullbits= regTabPtr->m_dyn_null_bits;
+  Uint32 noDynFix= regTabPtr->m_attributes[MM].m_no_of_dyn_fix;
+  Uint32 noDynVar= regTabPtr->m_attributes[MM].m_no_of_dyn_var;
+  Uint32 noDynamic= regTabPtr->m_attributes[MM].m_no_of_dynamic;
+  for (Uint32 i= 0; i<noOfNewAttr; i++)
+  {
+    Uint32 attrDescriptor= *attrInfo++;
+    Uint32 csNumber= (*attrInfo++ >> 16);
+    Uint32 attrDes2= 0;
+
+    /* Only dynamic attributes possible for add attr */
+    ndbrequire(AttributeDescriptor::getDynamic(attrDescriptor));
+    ndbrequire(!AttributeDescriptor::getDiskBased(attrDescriptor));
+
+    handleCharsetPos(csNumber, CharsetArray, newNoOfCharsets,
+                     charsetIndex, attrDes2);
+
+    Uint32 null_pos= dyn_nullbits;
+    Uint32 arrType= AttributeDescriptor::getArrayType(attrDescriptor);
+    noDynamic++;
+    if (arrType==NDB_ARRAYTYPE_FIXED)
+    {
+      Uint32 words= AttributeDescriptor::getSizeInWords(attrDescriptor);
+
+      if(AttributeDescriptor::getSize(attrDescriptor) > 0)
+      {
+        jam();
+        if(words > InternalMaxDynFix)
+          goto treat_as_varsize;
+        noDynFix++;
+        dyn_nullbits+= words;
+      }
+      else
+      {
+        /* Bit type. */
+        jam();
+        Uint32 bits= AttributeDescriptor::getArraySize(attrDescriptor);
+        null_pos+= bits;
+        dyn_nullbits+= bits+1;
+      }
+    }
+    else
+    {
+      jam();
+    treat_as_varsize:
+      noDynVar++;
+      dyn_nullbits++;
+    }
+    AttributeOffset::setNullFlagPos(attrDes2, null_pos);
+
+    *attrDesPtr++= attrDescriptor;
+    *attrDesPtr++= attrDes2;
+  }
+  ndbassert(newNoOfCharsets==charsetIndex);
+
+  regAlterTabOpPtr.p->noOfDynNullBits= dyn_nullbits;
+  ndbassert(noDynamic ==
+            regTabPtr->m_attributes[MM].m_no_of_dynamic + noOfNewAttr);
+  regAlterTabOpPtr.p->noOfDynFix= noDynFix;
+  regAlterTabOpPtr.p->noOfDynVar= noDynVar;
+  regAlterTabOpPtr.p->noOfDynamic= noDynamic;
+
+  /* Allocate the new (possibly larger) dynamic descriptor. */
+  allocSize= getDynTabDescrOffsets((dyn_nullbits+31)>>5,
+                                   regAlterTabOpPtr.p->dynTabDesOffset);
+  Uint32 dynTableDescriptorRef= allocTabDescr(allocSize);
+  if (dynTableDescriptorRef == RNIL) {
+    jam();
+    freeTabDescr(tableDescriptorRef, regAlterTabOpPtr.p->desAllocSize);
+    releaseAlterTabOpRec(regAlterTabOpPtr);
+    sendAlterTabRef(signal, req, terrorCode);
+    return;
+  }
+  regAlterTabOpPtr.p->dynDesAllocSize= allocSize;
+  regAlterTabOpPtr.p->dynTableDescriptor= dynTableDescriptorRef;
+
+  sendAlterTabConf(signal, req, regAlterTabOpPtr.i);
+}
+
+void
+Dbtup::sendAlterTabRef(Signal *signal, AlterTabReq *req, Uint32 errorCode)
+{
+  signal->header.m_noOfSections = 0;
+
+  AlterTabAll *const src= (AlterTabAll *)req;
+  Uint32 senderRef= src->req.senderRef;
+  Uint32 senderData= src->req.senderData;
+  Uint32 requestType= src->req.requestType;
+
+  AlterTabAll *const dst= (AlterTabAll *)signal->getDataPtrSend();
+  dst->ref.senderRef= reference();
+  dst->ref.senderData= senderData;
+  dst->ref.errorCode= errorCode;
+  dst->ref.errorLine= 0;
+  dst->ref.errorKey= 0;
+  dst->ref.errorStatus= 0;
+  dst->ref.requestType= requestType;
+
+  sendSignal(senderRef, GSN_ALTER_TAB_REF, signal,
+             AlterTabRef::SignalLength, JBB);
+}
+
+void
+Dbtup::sendAlterTabConf(Signal *signal, AlterTabReq *req, Uint32 clientData)
+{
+  signal->header.m_noOfSections = 0;
+
+  AlterTabAll *const src= (AlterTabAll *)req;
+  Uint32 senderRef= src->req.senderRef;
+  Uint32 senderData= src->req.senderData;
+  Uint32 changeMask= src->req.changeMask;
+  Uint32 tableId= src->req.tableId;
+  Uint32 tableVersion= src->req.tableVersion;
+  Uint32 gci= src->req.gci;
+  Uint32 requestType= src->req.requestType;
+
+  AlterTabAll *const dst= (AlterTabAll *)signal->getDataPtrSend();
+  dst->conf.senderRef= reference();
+  dst->conf.senderData= senderData;
+  dst->conf.changeMask= changeMask;
+  dst->conf.tableId= tableId;
+  dst->conf.tableVersion= tableVersion;
+  dst->conf.gci= gci;
+  dst->conf.requestType= requestType;
+  dst->conf.clientData= clientData;
+
+  sendSignal(senderRef, GSN_ALTER_TAB_CONF, signal,
+             AlterTabConf::SignalLength, JBB);
+}
+
+void
+Dbtup::handleAlterTableCommit(Signal *signal,
+                              AlterTabOperationPtr regAlterTabOpPtr,
+                              Tablerec *regTabPtr)
+{
+  /* Free old table descriptors. */
+  releaseTabDescr(regTabPtr);
+
+  /* Set new attribute counts. */
+  regTabPtr->m_no_of_attributes= regAlterTabOpPtr.p->newNoOfAttrs;
+  regTabPtr->m_attributes[MM].m_no_of_dyn_fix= regAlterTabOpPtr.p->noOfDynFix;
+  regTabPtr->m_attributes[MM].m_no_of_dyn_var= regAlterTabOpPtr.p->noOfDynVar;
+  regTabPtr->m_attributes[MM].m_no_of_dynamic= regAlterTabOpPtr.p->noOfDynamic;
+  regTabPtr->m_dyn_null_bits= regAlterTabOpPtr.p->noOfDynNullBits;
+
+  /* Install the new (larger) table descriptors. */
+  setUpDescriptorReferences(regAlterTabOpPtr.p->tableDescriptor,
+                            regTabPtr,
+                            regAlterTabOpPtr.p->tabDesOffset);
+  setupDynDescriptorReferences(regAlterTabOpPtr.p->dynTableDescriptor,
+                               regTabPtr,
+                               regAlterTabOpPtr.p->dynTabDesOffset);
+
+  releaseAlterTabOpRec(regAlterTabOpPtr);
+
+  /* Recompute aggregate table meta data. */
+  computeTableMetaData(regTabPtr);
+
+  sendAlterTabConf(signal, (AlterTabReq *)signal->getDataPtr());
+}
+
+void
+Dbtup::handleAlterTableAbort(Signal *signal,
+                             AlterTabOperationPtr regAlterTabOpPtr,
+                              Tablerec *regTabPtr)
+{
+  freeTabDescr(regAlterTabOpPtr.p->tableDescriptor,
+               regAlterTabOpPtr.p->desAllocSize);
+  freeTabDescr(regAlterTabOpPtr.p->dynTableDescriptor,
+               regAlterTabOpPtr.p->dynDesAllocSize);
+  releaseAlterTabOpRec(regAlterTabOpPtr);
+
+  sendAlterTabConf(signal, (AlterTabReq *)signal->getDataPtr());
+}
+
+/*
+  Update information for charset for a new attribute.
+  If needed, attrDes2 will be updated with the correct charsetPos and
+  charsetIndex will be updated to point to next free charsetPos slot.
+*/
+void
+Dbtup::handleCharsetPos(Uint32 csNumber, CHARSET_INFO** charsetArray,
+                        Uint32 noOfCharsets,
+                        Uint32 & charsetIndex, Uint32 & attrDes2)
+{
+  if (csNumber != 0)
+  { 
+    CHARSET_INFO* cs = all_charsets[csNumber];
+    ndbrequire(cs != NULL);
+    Uint32 i= 0;
+    while (i < charsetIndex)
+    {
+      jam();
+      if (charsetArray[i] == cs)
+	break;
+      i++;
+    }
+    if (i == charsetIndex) {
+      jam();
+      ndbrequire(i < noOfCharsets);
+      charsetArray[i]= cs;
+      charsetIndex++;
+    }
+    AttributeOffset::setCharsetPos(attrDes2, i);
+  }
+}
+
+/*
+  This function (re-)computes aggregated metadata. It is called for
+  both ALTER TABLE and CREATE TABLE.
+ */
+void
+Dbtup::computeTableMetaData(Tablerec *regTabPtr)
+{
+
+  Uint32 dyn_null_words= (regTabPtr->m_dyn_null_bits+31)>>5;
+  regTabPtr->m_offsets[MM].m_dyn_null_words= dyn_null_words;
+
+  /* Compute the size of the static headers. */
+  Uint32 pos[2] = { 0, 0 };
+  if (regTabPtr->m_bits & Tablerec::TR_Checksum)
+  {
+    pos[MM]++; 
+  }
+
+  if (regTabPtr->m_bits & Tablerec::TR_RowGCI)
+  {
+    pos[MM]++;
+    pos[DD]++;
+  }
+
+  regTabPtr->m_no_of_disk_attributes= 
+    regTabPtr->m_attributes[DD].m_no_of_fixsize +
+    regTabPtr->m_attributes[DD].m_no_of_varsize;
+  if(regTabPtr->m_no_of_disk_attributes > 0)
+  {
+    /* Room for disk part location. */
+    regTabPtr->m_offsets[MM].m_disk_ref_offset= pos[MM];
+    pos[MM] += 2; // 8 bytes
+  }
+
+  regTabPtr->m_offsets[MM].m_null_offset= pos[MM];
+  regTabPtr->m_offsets[DD].m_null_offset= pos[DD];
+  pos[MM]+= regTabPtr->m_offsets[MM].m_null_words;
+  pos[DD]+= regTabPtr->m_offsets[DD].m_null_words;
+
+  /*
+    Compute the offsets for the attributes.
+    For static fixed-size, this is the offset from the tuple pointer of the
+    actual data.
+    For static var-size and dynamic, this is the index into the offset array.
+
+    We also compute the dynamic bitmasks here.
+  */
+  Uint32 *tabDesc= (Uint32*)(tableDescriptor+regTabPtr->tabDescriptor);
+  Uint32 *dynDesc= (Uint32*)(tableDescriptor+regTabPtr->dynTabDescriptor);
+  Uint32 fix_size[2]= {0, 0};
+  Uint32 var_size[2]= {0, 0};
+  Uint32 dyn_size[2]= {0, 0};
+  Uint32 statvar_count= 0;
+  Uint32 dynfix_count= 0;
+  Uint32 dynvar_count= 0;
+  Uint32 dynamic_count= 0;
+  regTabPtr->blobAttributeMask.clear();
+  regTabPtr->notNullAttributeMask.clear();
+  bzero(regTabPtr->dynVarSizeMask, dyn_null_words<<2);
+  bzero(regTabPtr->dynFixSizeMask, dyn_null_words<<2);
+
+  for(Uint32 i= 0; i<regTabPtr->m_no_of_attributes; i++)
+  {
+    Uint32 attrDescriptor= *tabDesc++;
+    Uint32 attrDes2= *tabDesc;
+    Uint32 ind= AttributeDescriptor::getDiskBased(attrDescriptor);
+    Uint32 attrLen = AttributeDescriptor::getSize(attrDescriptor);
+    Uint32 arr= AttributeDescriptor::getArrayType(attrDescriptor);
+    Uint32 size_in_words= AttributeDescriptor::getSizeInWords(attrDescriptor);
+    Uint32 size_in_bytes= AttributeDescriptor::getSizeInBytes(attrDescriptor);
+    Uint32 extType = AttributeDescriptor::getType(attrDescriptor);
+    Uint32 off;
+
+    if(!AttributeDescriptor::getNullable(attrDescriptor))
+      regTabPtr->notNullAttributeMask.set(i);
+    if (!AttributeDescriptor::getDynamic(attrDescriptor))
+    {
+      if(arr == NDB_ARRAYTYPE_FIXED)
+      {
+        if (extType == NDB_TYPE_BLOB || extType == NDB_TYPE_TEXT)
+          regTabPtr->blobAttributeMask.set(i);
+
+        if (attrLen!=0)
+        {
+          off= fix_size[ind] + pos[ind];
+          fix_size[ind]+= size_in_words;
+        }
+        else
+          off= 0;                               // Bit type
+      }
+      else
+      {
+        /* Static varsize. */
+        ndbassert(ind==MM);
+        off= statvar_count++;
+        var_size[ind]+= size_in_bytes;
+      }
+    }
+    else
+    {
+      /* Dynamic attribute. */
+      dynamic_count++;
+      ndbrequire(ind==MM);
+      Uint32 null_pos= AttributeOffset::getNullFlagPos(attrDes2);
+      dyn_size[ind]+= (size_in_words<<2);
+      if(arr == NDB_ARRAYTYPE_FIXED)
+      {
+        jam();
+        if (extType == NDB_TYPE_BLOB || extType == NDB_TYPE_TEXT)
+          regTabPtr->blobAttributeMask.set(i);
+        // ToDo: I wonder what else is needed to handle BLOB/TEXT, if anything?
+
+        if (attrLen!=0)
+        {
+          jam();
+          if(size_in_words>InternalMaxDynFix)
+            goto treat_as_varsize;
+
+          off= dynfix_count++ + regTabPtr->m_attributes[ind].m_no_of_dyn_var;
+          while(size_in_words-- > 0)
+	  {
+	    BitmaskImpl::set(dyn_null_words, 
+			     regTabPtr->dynFixSizeMask, null_pos++);
+	  }
+        }
+        else
+          off= 0;                               // Bit type
+      }
+      else
+      {
+      treat_as_varsize:
+        jam();
+        off= dynvar_count++;
+	BitmaskImpl::set(dyn_null_words, regTabPtr->dynVarSizeMask, null_pos);
+      }
+    }
+    AttributeOffset::setOffset(attrDes2, off);
+    *tabDesc++= attrDes2;
+  }
+  ndbassert(dynvar_count==regTabPtr->m_attributes[MM].m_no_of_dyn_var);
+  ndbassert(dynfix_count==regTabPtr->m_attributes[MM].m_no_of_dyn_fix);
+  ndbassert(dynamic_count==regTabPtr->m_attributes[MM].m_no_of_dynamic);
+  ndbassert(statvar_count==regTabPtr->m_attributes[MM].m_no_of_varsize);
+
+  regTabPtr->m_offsets[MM].m_fix_header_size= 
+    fix_size[MM] + pos[MM];
+  regTabPtr->m_offsets[DD].m_fix_header_size= 
+    fix_size[DD] + pos[DD];
+
+  if((regTabPtr->m_attributes[MM].m_no_of_varsize +
+      regTabPtr->m_attributes[MM].m_no_of_dynamic) == 0)
+    regTabPtr->m_offsets[MM].m_fix_header_size += Tuple_header::HeaderSize;
+
+  if(regTabPtr->m_attributes[DD].m_no_of_varsize == 0 &&
+     regTabPtr->m_attributes[DD].m_no_of_fixsize > 0)
+    regTabPtr->m_offsets[DD].m_fix_header_size += Tuple_header::HeaderSize;
+
+  Uint32 mm_vars= regTabPtr->m_attributes[MM].m_no_of_varsize;
+  Uint32 mm_dyns= regTabPtr->m_attributes[MM].m_no_of_dyn_fix +
+                  regTabPtr->m_attributes[MM].m_no_of_dyn_var;
+  Uint32 dd_vars= regTabPtr->m_attributes[MM].m_no_of_varsize;
+  Uint32 dd_dyns= regTabPtr->m_attributes[DD].m_no_of_dynamic;
+
+  regTabPtr->m_offsets[MM].m_max_var_offset= var_size[MM];
+  /*
+    Size of the expanded dynamic part. Needs room for bitmap, (N+1) 16-bit
+    offset words with 32-bit padding, and all attribute data.
+  */
+  regTabPtr->m_offsets[MM].m_max_dyn_offset= 
+    (regTabPtr->m_offsets[MM].m_dyn_null_words<<2) + 4*((mm_dyns+2)>>1) +
+    dyn_size[MM];
+  
+  regTabPtr->m_offsets[DD].m_max_var_offset= var_size[DD];
+  regTabPtr->m_offsets[DD].m_max_dyn_offset= 
+    (regTabPtr->m_offsets[DD].m_dyn_null_words<<2) + 4*((dd_dyns+2)>>1) +
+    dyn_size[DD];
+
+  /* Room for data for all the attributes. */
+  Uint32 total_rec_size=
+    pos[MM] + fix_size[MM] + pos[DD] + fix_size[DD] +
+    ((var_size[MM] + 3) >> 2) + ((dyn_size[MM] + 3) >> 2) +
+    ((var_size[DD] + 3) >> 2) + ((dyn_size[DD] + 3) >> 2);
+  /*
+    Room for offset arrays and dynamic bitmaps. There is one extra 16-bit
+    offset in each offset array (for easy computation of final length).
+    Also one word for storing total length of varsize+dynamic part
+  */
+  if(mm_vars + regTabPtr->m_attributes[MM].m_no_of_dynamic)
+  {
+    total_rec_size+= (mm_vars + 2) >> 1;
+    total_rec_size+= regTabPtr->m_offsets[MM].m_dyn_null_words;
+    total_rec_size+= (mm_dyns + 2) >> 1;
+    total_rec_size+= 1;
+  }
+  /* Disk data varsize offset array (not currently used). */
+  if(dd_vars)
+    total_rec_size+= (dd_vars + 2) >> 1;
+  /* Room for the header. */
+  total_rec_size+= Tuple_header::HeaderSize;
+  if(regTabPtr->m_no_of_disk_attributes)
+    total_rec_size+= Tuple_header::HeaderSize;
+  regTabPtr->total_rec_size= total_rec_size;
+
+  setUpQueryRoutines(regTabPtr);
+  setUpKeyArray(regTabPtr);
+}
+
+void
 Dbtup::undo_createtable_callback(Signal* signal, Uint32 opPtrI, Uint32 unused)
 {
   FragrecordPtr regFragPtr;
@@ -700,6 +1219,16 @@
   regTabPtr->m_real_order_descriptor = descriptorReference + offset[5];
 }
 
+void Dbtup::setupDynDescriptorReferences(Uint32 dynDescr,
+                                         Tablerec* const regTabPtr,
+                                         const Uint32* offset)
+{
+  regTabPtr->dynTabDescriptor= dynDescr;
+  Uint32* desc= &tableDescriptor[dynDescr].tabDescr;
+  regTabPtr->dynVarSizeMask= desc+offset[0];
+  regTabPtr->dynFixSizeMask= desc+offset[1];
+}
+
 Uint32
 Dbtup::sizeOfReadFunction()
 {
@@ -729,6 +1258,8 @@
 
   /**
    * Setup real order array (16 bit per column)
+   *
+   * Sequence is [mm_fix mm_var mm_dynfix mm_dynvar dd_fix]
    */
   const Uint32 off= regTabPtr->m_real_order_descriptor;
   const Uint32 sz= (regTabPtr->m_no_of_attributes + 1) >> 1;
@@ -736,7 +1267,7 @@
   
   Uint32 cnt= 0;
   Uint16* order= (Uint16*)&tableDescriptor[off].tabDescr;
-  for (Uint32 type = 0; type < 4; type++)
+  for (Uint32 type = 0; type < 5; type++)
   {
     for (Uint32 i= 0; i < regTabPtr->m_no_of_attributes; i++) 
     {
@@ -745,15 +1276,37 @@
       Uint32 desc = getTabDescrWord(refAttr);
       Uint32 t = 0;
 
-      if (AttributeDescriptor::getArrayType(desc) != NDB_ARRAYTYPE_FIXED) 
+      if (AttributeDescriptor::getDynamic(desc) &&
+          AttributeDescriptor::getArrayType(desc) == NDB_ARRAYTYPE_FIXED &&
+          AttributeDescriptor::getSize(desc) == 0)
+      {
+        /*
+          Dynamic bit types are stored inside the dynamic NULL bitmap, and are
+          never expanded. So we do not need any real_order_descriptor for
+          them.
+        */
+        jam();
+        if(type==0)
+          cnt++;
+        continue;
+      }
+
+      if (AttributeDescriptor::getArrayType(desc) != NDB_ARRAYTYPE_FIXED ||
+          (AttributeDescriptor::getDynamic(desc) &&
+           AttributeDescriptor::getArrayType(desc) == NDB_ARRAYTYPE_FIXED &&
+           AttributeDescriptor::getSizeInWords(desc) > InternalMaxDynFix))
       {
 	t += 1;
       }
-      if (AttributeDescriptor::getDiskBased(desc))
+      if (AttributeDescriptor::getDynamic(desc)) 
       {
 	t += 2;
       }
-      ndbrequire(t < 4);
+      if (AttributeDescriptor::getDiskBased(desc))
+      {
+	t += 4;
+      }
+      ndbrequire(t < 5);              // Disk data currently only static/fixed
       if(t == type)
       {
 	* order++ = i << ZAD_LOG_SIZE;
@@ -832,6 +1385,12 @@
   cfirstfreeFragopr = fragOperPtr.i;
 }//Dbtup::releaseFragoperrec()
 
+void Dbtup::releaseAlterTabOpRec(AlterTabOperationPtr regAlterTabOpPtr)
+{
+  regAlterTabOpPtr.p->nextAlterTabOp= cfirstfreeAlterTabOp;
+  cfirstfreeAlterTabOp= regAlterTabOpPtr.i;
+}
+
 void Dbtup::deleteFragTab(Tablerec* const regTabPtr, Uint32 fragId) 
 {
   for (Uint32 i = 0; i < MAX_FRAG_PER_NODE; i++) {
@@ -891,7 +1450,10 @@
   if (descriptor != RNIL) {
     jam();
     Uint32 offset[10];
-    getTabDescrOffsets(regTabPtr, offset);
+    getTabDescrOffsets(regTabPtr->m_no_of_attributes,
+                       regTabPtr->noOfCharsets,
+                       regTabPtr->noOfKeyAttr,
+                       offset);
 
     regTabPtr->tabDescriptor= RNIL;
     regTabPtr->readKeyArray= RNIL;
@@ -901,6 +1463,21 @@
 
     // move to start of descriptor
     descriptor -= offset[3];
+    Uint32 retNo= getTabDescrWord(descriptor + ZTD_DATASIZE);
+    ndbrequire(getTabDescrWord(descriptor + ZTD_HEADER) == ZTD_TYPE_NORMAL);
+    ndbrequire(retNo == getTabDescrWord((descriptor + retNo) - ZTD_TR_SIZE));
+    ndbrequire(ZTD_TYPE_NORMAL ==
+               getTabDescrWord((descriptor + retNo) - ZTD_TR_TYPE));
+    freeTabDescr(descriptor, retNo);
+  }
+
+  descriptor= regTabPtr->dynTabDescriptor;
+  if(descriptor != RNIL)
+  {
+    jam();
+    regTabPtr->dynTabDescriptor= RNIL;
+    regTabPtr->dynVarSizeMask= NULL;
+    regTabPtr->dynFixSizeMask= NULL;
     Uint32 retNo= getTabDescrWord(descriptor + ZTD_DATASIZE);
     ndbrequire(getTabDescrWord(descriptor + ZTD_HEADER) == ZTD_TYPE_NORMAL);
     ndbrequire(retNo == getTabDescrWord((descriptor + retNo) - ZTD_TR_SIZE));

--- 1.34/storage/ndb/src/kernel/blocks/dbtup/DbtupRoutines.cpp	2007-04-03 14:34:27 +02:00
+++ 1.35/storage/ndb/src/kernel/blocks/dbtup/DbtupRoutines.cpp	2007-04-03 14:34:27 +02:00
@@ -140,19 +140,70 @@
 	regTabPtr->readFunctionArray[i]= r[a+b];
 	regTabPtr->updateFunctionArray[i]= u[a+b];
       }
+    } else {
+      if (AttributeDescriptor::getNullable(attrDescr)) {
+        if (AttributeDescriptor::getArrayType(attrDescr) == NDB_ARRAYTYPE_FIXED){
+          if (AttributeDescriptor::getSize(attrDescr) == 0){
+            jam(); 
+            regTabPtr->readFunctionArray[i]= &Dbtup::readDynBitsNULLable;
+            regTabPtr->updateFunctionArray[i]= &Dbtup::updateDynBitsNULLable;
+          } else if (AttributeDescriptor::getSizeInWords(attrDescr)>InternalMaxDynFix) {
+            jam();
+            regTabPtr->readFunctionArray[i]= &Dbtup::readDynBigFixedSizeNULLable;
+            regTabPtr->updateFunctionArray[i]= &Dbtup::updateDynBigFixedSizeNULLable;
+          } else {
+            jam();
+            regTabPtr->readFunctionArray[i]= &Dbtup::readDynFixedSizeNULLable;
+            regTabPtr->updateFunctionArray[i]= &Dbtup::updateDynFixedSizeNULLable;
+          }
+        } else {
+          regTabPtr->readFunctionArray[i]= &Dbtup::readDynVarSizeNULLable;
+          regTabPtr->updateFunctionArray[i]= &Dbtup::updateDynVarSizeNULLable;
+        }
       } else {
-      if (AttributeDescriptor::getArrayType(attrDescr) == NDB_ARRAYTYPE_FIXED){
-        jam();
-        regTabPtr->readFunctionArray[i]= &Dbtup::readDynFixedSize;
-        regTabPtr->updateFunctionArray[i]= &Dbtup::updateDynFixedSize;
-      } else {
-        regTabPtr->readFunctionArray[i]= &Dbtup::readDynVarSize;
-        regTabPtr->updateFunctionArray[i]= &Dbtup::updateDynVarSize;
+        if (AttributeDescriptor::getArrayType(attrDescr) == NDB_ARRAYTYPE_FIXED){
+          if (AttributeDescriptor::getSize(attrDescr) == 0){
+            jam(); 
+            regTabPtr->readFunctionArray[i]= &Dbtup::readDynBitsNotNULL;
+            regTabPtr->updateFunctionArray[i]= &Dbtup::updateDynBitsNotNULL;
+          } else if (AttributeDescriptor::getSizeInWords(attrDescr)>InternalMaxDynFix) {
+            jam();
+            regTabPtr->readFunctionArray[i]= &Dbtup::readDynBigFixedSizeNotNULL;
+            regTabPtr->updateFunctionArray[i]= &Dbtup::updateDynBigFixedSizeNotNULL;
+          } else {
+            jam();
+            regTabPtr->readFunctionArray[i]= &Dbtup::readDynFixedSizeNotNULL;
+            regTabPtr->updateFunctionArray[i]= &Dbtup::updateDynFixedSizeNotNULL;
+          }
+        } else {
+          jam();
+	  regTabPtr->readFunctionArray[i]= &Dbtup::readDynVarSizeNotNULL;
+          regTabPtr->updateFunctionArray[i]= &Dbtup::updateDynVarSizeNotNULL;
+        }
       }
     }
   }
 }
 
+/* Dump a byte buffer, for debugging. */
+static void dump_buf_hex(unsigned char *p, Uint32 bytes)
+{
+  char buf[3001];
+  char *q= buf;
+  buf[0]= '\0';
+
+  for(Uint32 i=0; i<bytes; i++)
+  {
+    if(i==((sizeof(buf)/3)-1))
+    {
+      sprintf(q, "...");
+      break;
+    }
+    sprintf(q+3*i, " %02X", p[i]);
+  }
+  ndbout_c("%8p: %s", p, buf);
+}
+
 /* ---------------------------------------------------------------- */
 /*       THIS ROUTINE IS USED TO READ A NUMBER OF ATTRIBUTES IN THE */
 /*       DATABASE AND PLACE THE RESULT IN ATTRINFO RECORDS.         */
@@ -449,21 +500,20 @@
   return BitmaskImpl::get(regTabPtr->m_offsets[DD].m_null_words, bits, pos);
 }
 
+/* Shared code for reading static varsize and expanded dynamic attributes. */
 bool
-Dbtup::readVarSizeNotNULL(Uint32* out_buffer,
-                          KeyReqStruct *req_struct,
-                          AttributeHeader* ah_out,
-                          Uint32  attr_des2)
+Dbtup::varsize_reader(Uint32* out_buffer,
+                      KeyReqStruct *req_struct,
+                      AttributeHeader* ah_out,
+                      Uint32  attr_des2,
+                      char * src_ptr,
+                      Uint32 vsize_in_bytes)
 {
-  Uint32 attr_descriptor, index_buf, var_index;
-  Uint32 vsize_in_bytes, vsize_in_words, new_index, max_var_size;
-  Uint32 var_attr_pos, max_read;
+  Uint32 attr_descriptor, index_buf;
+  Uint32 vsize_in_words, new_index, max_var_size;
+  Uint32 max_read;
 
-  Uint32 idx= req_struct->m_var_data[MM].m_var_len_offset;
-  var_index= AttributeOffset::getOffset(attr_des2);
   Uint32 charsetFlag = AttributeOffset::getCharsetFlag(attr_des2);
-  var_attr_pos= req_struct->m_var_data[MM].m_offset_array_ptr[var_index];
-  vsize_in_bytes= req_struct->m_var_data[MM].m_offset_array_ptr[var_index+idx] - var_attr_pos;
   attr_descriptor= req_struct->attr_descriptor;
   index_buf= req_struct->out_buf_index;
   max_var_size= AttributeDescriptor::getSizeInWords(attr_descriptor);
@@ -478,9 +528,7 @@
       jam();
       ah_out->setByteSize(vsize_in_bytes);
       out_buffer[index_buf + (vsize_in_bytes >> 2)] = 0;
-      memcpy(out_buffer+index_buf,
-	     req_struct->m_var_data[MM].m_data_ptr+var_attr_pos,
-	     vsize_in_bytes);
+      memcpy(out_buffer+index_buf, src_ptr, vsize_in_bytes);
       req_struct->out_buf_index= new_index;
       return true;
     }
@@ -492,7 +540,7 @@
     Uint32 maxBytes = AttributeDescriptor::getSizeInBytes(attr_descriptor);
     Uint32 srcBytes = vsize_in_bytes;
     uchar* dstPtr = (uchar*)(out_buffer+index_buf);
-    const uchar* srcPtr = (uchar*)(req_struct->m_var_data[MM].m_data_ptr+var_attr_pos);
+    const uchar* srcPtr = (uchar*)src_ptr;
     Uint32 i = AttributeOffset::getCharsetPos(attr_des2);
     ndbrequire(i < regTabPtr->noOfCharsets);
     CHARSET_INFO* cs = regTabPtr->charsetArray[i];
@@ -526,6 +574,28 @@
 }
 
 bool
+Dbtup::readVarSizeNotNULL(Uint32* out_buffer,
+                          KeyReqStruct *req_struct,
+                          AttributeHeader* ah_out,
+                          Uint32  attr_des2)
+{
+  Uint32 var_index;
+  Uint32 vsize_in_bytes;
+  Uint32 var_attr_pos;
+  char *src_ptr;
+
+  var_index= AttributeOffset::getOffset(attr_des2);
+  Uint32 charsetFlag = AttributeOffset::getCharsetFlag(attr_des2);
+  var_attr_pos= req_struct->m_var_data[MM].m_offset_array_ptr[var_index];
+  Uint32 idx= req_struct->m_var_data[MM].m_var_len_offset;
+  vsize_in_bytes=
+    req_struct->m_var_data[MM].m_offset_array_ptr[var_index+idx] - var_attr_pos;
+  src_ptr= req_struct->m_var_data[MM].m_data_ptr+var_attr_pos;
+  return varsize_reader(out_buffer, req_struct, ah_out, attr_des2,
+                        src_ptr, vsize_in_bytes);
+}
+
+bool
 Dbtup::readVarSizeNULLable(Uint32* outBuffer,
                            KeyReqStruct *req_struct,
                            AttributeHeader* ahOut,
@@ -545,28 +615,572 @@
 }
 
 bool
-Dbtup::readDynFixedSize(Uint32* outBuffer,
-                        KeyReqStruct *req_struct,
-                        AttributeHeader* ahOut,
-                        Uint32  attrDes2)
+Dbtup::readDynFixedSizeNotNULL(Uint32* outBuffer,
+                               KeyReqStruct *req_struct,
+                               AttributeHeader* ahOut,
+                               Uint32  attrDes2)
 {
   jam();
-  terrorCode= ZVAR_SIZED_NOT_SUPPORTED;
-  return false;
+  if(req_struct->is_expanded)
+    return readDynFixedSizeExpandedNotNULL(outBuffer, req_struct,
+                                           ahOut, attrDes2);
+  else
+    return readDynFixedSizeShrunkenNotNULL(outBuffer, req_struct,
+                                           ahOut, attrDes2);
 }
 
 bool
-Dbtup::readDynVarSize(Uint32* outBuffer,
-                      KeyReqStruct *req_struct,
-                      AttributeHeader* ahOut,
-                      Uint32  attrDes2)
+Dbtup::readDynFixedSizeNULLable(Uint32* outBuffer,
+                                KeyReqStruct *req_struct,
+                                AttributeHeader* ahOut,
+                                Uint32  attrDes2)
 {
   jam();
-  terrorCode= ZVAR_SIZED_NOT_SUPPORTED;
-  return false;
+  if(req_struct->is_expanded)
+    return readDynFixedSizeExpandedNULLable(outBuffer, req_struct,
+                                            ahOut, attrDes2);
+  else
+    return readDynFixedSizeShrunkenNULLable(outBuffer, req_struct,
+                                            ahOut, attrDes2);
+}
+
+bool
+Dbtup::readDynFixedSizeExpandedNotNULL(Uint32* outBuffer,
+                                       KeyReqStruct *req_struct,
+                                       AttributeHeader* ahOut,
+                                       Uint32  attrDes2)
+{
+  /*
+    In the expanded format, we share the read code with static varsized, just
+    using different data base pointer and offset/lenght arrays.
+  */
+  jam();
+  char *src_ptr= req_struct->m_var_data[MM].m_dyn_data_ptr;
+  Uint32 var_index= AttributeOffset::getOffset(attrDes2);
+  Uint16* off_arr= req_struct->m_var_data[MM].m_dyn_offset_arr_ptr;
+  Uint32 var_attr_pos= off_arr[var_index];
+  Uint32 vsize_in_bytes=
+    AttributeDescriptor::getSizeInBytes(req_struct->attr_descriptor);
+  return varsize_reader(outBuffer, req_struct, ahOut, attrDes2,
+                        src_ptr + var_attr_pos, vsize_in_bytes);
+}
+
+bool
+Dbtup::readDynFixedSizeExpandedNULLable(Uint32* outBuffer,
+                                        KeyReqStruct *req_struct,
+                                        AttributeHeader* ahOut,
+                                        Uint32  attrDes2)
+{
+  /*
+    Check for NULL. In the expanded format, the bitmap is guaranteed
+    to be stored in full length.
+  */
+  Uint32 *src_ptr= (Uint32 *)(req_struct->m_var_data[MM].m_dyn_data_ptr);
+  Uint32 pos = AttributeOffset::getNullFlagPos(attrDes2);
+  if(!BitmaskImpl::get((* src_ptr) & DYN_BM_LEN_MASK, src_ptr, pos))
+  {
+    jam();
+    ahOut->setNULL();
+    return true;
+  }
+
+  return readDynFixedSizeExpandedNotNULL(outBuffer, req_struct,
+                                         ahOut, attrDes2);
+}
+
+bool
+Dbtup::readDynFixedSizeShrunkenNotNULL(Uint32* outBuffer,
+                                       KeyReqStruct *req_struct,
+                                       AttributeHeader* ahOut,
+                                       Uint32  attrDes2)
+{
+  Uint32 *bm_ptr= (Uint32 *)(req_struct->m_var_data[MM].m_dyn_data_ptr);
+  Uint32 dyn_len= req_struct->m_var_data[MM].m_dyn_part_len;
+  ndbrequire(dyn_len != 0);
+  Uint32 bm_len= (* bm_ptr) & DYN_BM_LEN_MASK; // In 32-bit words
+  Uint32 pos = AttributeOffset::getNullFlagPos(attrDes2);
+  ndbrequire(BitmaskImpl::get(bm_len, bm_ptr, pos));
+
+  /*
+    The attribute is not NULL. Now to get the data offset, we count the number
+    of bits set in the bitmap for fixed-size dynamic attributes prior to this
+    attribute. Since there is one bit for each word of fixed-size attribute,
+    and since fixed-size attributes are stored word-aligned backwards from the
+    end of the row, this gives the distance in words from the row end to the
+    end of the data for this attribute.
+
+    We use a pre-computed bitmask to mask away all bits for fixed-sized
+    dynamic attributes, and we also mask away the initial bitmap length byte and
+    any trailing non-bitmap bytes to save a few conditionals.
+  */
+  jam();
+  Tablerec* regTabPtr = tabptr.p;
+  Uint32 *bm_mask_ptr= regTabPtr->dynFixSizeMask;
+  Uint32 bm_pos= AttributeOffset::getNullFlagOffset(attrDes2);
+  Uint32 prevMask= (1 << (pos & 31)) - 1;
+  Uint32 bit_count= count_bits(prevMask & bm_mask_ptr[bm_pos] & bm_ptr[bm_pos]);
+  for(Uint32 i=0; i<bm_pos; i++)
+    bit_count+= count_bits(bm_mask_ptr[i] & bm_ptr[i]);
+
+  /* Now compute the data pointer from the row length. */
+  Uint32 attr_descriptor= req_struct->attr_descriptor;
+  Uint32 vsize_in_bytes= AttributeDescriptor::getSizeInBytes(attr_descriptor);
+  Uint32 vsize_in_words= (vsize_in_bytes+3)>>2;
+  Uint32 *data_ptr= bm_ptr + dyn_len - bit_count - vsize_in_words;
+
+  return varsize_reader(outBuffer, req_struct, ahOut, attrDes2,
+                        (char *)data_ptr, vsize_in_bytes);
+}
+
+static
+inline
+bool
+dynCheckNull(Uint32 totlen, Uint32 bm_len, const Uint32* bm_ptr, Uint32 pos)
+{
+  return  totlen == 0 || !(bm_len > (pos >> 5)) || 
+    !BitmaskImpl::get(bm_len, bm_ptr, pos);
+}
+
+bool
+Dbtup::readDynFixedSizeShrunkenNULLable(Uint32* outBuffer,
+                                        KeyReqStruct *req_struct,
+                                        AttributeHeader* ahOut,
+                                        Uint32  attrDes2)
+{
+  Uint32 *bm_ptr= (Uint32 *)(req_struct->m_var_data[MM].m_dyn_data_ptr);
+  Uint32 dyn_len= req_struct->m_var_data[MM].m_dyn_part_len;
+  Uint32 bm_len = (* bm_ptr) & DYN_BM_LEN_MASK;
+  Uint32 pos = AttributeOffset::getNullFlagPos(attrDes2);
+  /* Check for NULL (including the case of an empty bitmap). */
+  if(dynCheckNull(dyn_len, bm_len, bm_ptr, pos))
+  {
+    jam();
+    ahOut->setNULL();
+    return true;
+  }
+
+  return readDynFixedSizeShrunkenNotNULL(outBuffer, req_struct,
+                                         ahOut, attrDes2);
+}
+
+bool
+Dbtup::readDynBigFixedSizeNotNULL(Uint32* outBuffer,
+                                  KeyReqStruct *req_struct,
+                                  AttributeHeader* ahOut,
+                                  Uint32  attrDes2)
+{
+  jam();
+  if(req_struct->is_expanded)
+    return readDynBigFixedSizeExpandedNotNULL(outBuffer, req_struct,
+                                         ahOut, attrDes2);
+  else
+    return readDynBigFixedSizeShrunkenNotNULL(outBuffer, req_struct,
+                                         ahOut, attrDes2);
 }//Dbtup::readDynBigVarSize()
 
 bool
+Dbtup::readDynBigFixedSizeNULLable(Uint32* outBuffer,
+                                   KeyReqStruct *req_struct,
+                                   AttributeHeader* ahOut,
+                                   Uint32  attrDes2)
+{
+  jam();
+  if(req_struct->is_expanded)
+    return readDynBigFixedSizeExpandedNULLable(outBuffer, req_struct,
+                                          ahOut, attrDes2);
+  else
+    return readDynBigFixedSizeShrunkenNULLable(outBuffer, req_struct,
+                                          ahOut, attrDes2);
+}//Dbtup::readDynBigVarSize()
+
+bool
+Dbtup::readDynBigFixedSizeExpandedNotNULL(Uint32* outBuffer,
+                                          KeyReqStruct *req_struct,
+                                          AttributeHeader* ahOut,
+                                          Uint32  attrDes2)
+{
+  /*
+    In the expanded format, we share the read code with static varsized, just
+    using different data base pointer and offset/lenght arrays.
+  */
+  jam();
+  char *src_ptr= req_struct->m_var_data[MM].m_dyn_data_ptr;
+  Uint32 var_index= AttributeOffset::getOffset(attrDes2);
+  Uint16* off_arr= req_struct->m_var_data[MM].m_dyn_offset_arr_ptr;
+  Uint32 var_attr_pos= off_arr[var_index];
+  Uint32 vsize_in_bytes=
+    AttributeDescriptor::getSizeInBytes(req_struct->attr_descriptor);
+  Uint32 idx= req_struct->m_var_data[MM].m_dyn_len_offset;
+  ndbrequire(vsize_in_bytes <= off_arr[var_index+idx] - var_attr_pos);
+  return varsize_reader(outBuffer, req_struct, ahOut, attrDes2,
+                        src_ptr + var_attr_pos, vsize_in_bytes);
+}
+
+bool
+Dbtup::readDynBigFixedSizeExpandedNULLable(Uint32* outBuffer,
+                                           KeyReqStruct *req_struct,
+                                           AttributeHeader* ahOut,
+                                           Uint32  attrDes2)
+{
+  /*
+    Check for NULL. In the expanded format, the bitmap is guaranteed
+    to be stored in full length.
+  */
+  Uint32 *src_ptr= (Uint32 *)(req_struct->m_var_data[MM].m_dyn_data_ptr);
+  Uint32 pos = AttributeOffset::getNullFlagPos(attrDes2);
+  if(!BitmaskImpl::get((* src_ptr) & DYN_BM_LEN_MASK, src_ptr, pos))
+  {
+    jam();
+    ahOut->setNULL();
+    return true;
+  }
+
+  return readDynBigFixedSizeExpandedNotNULL(outBuffer, req_struct,
+                                       ahOut, attrDes2);
+}
+
+bool
+Dbtup::readDynBigFixedSizeShrunkenNotNULL(Uint32* outBuffer,
+                                          KeyReqStruct *req_struct,
+                                          AttributeHeader* ahOut,
+                                          Uint32  attrDes2)
+{
+  Uint32 *bm_ptr= (Uint32 *)(req_struct->m_var_data[MM].m_dyn_data_ptr);
+  Uint32 dyn_len= req_struct->m_var_data[MM].m_dyn_part_len;
+  ndbrequire(dyn_len!=0);
+  Uint32 bm_len = (* bm_ptr) & DYN_BM_LEN_MASK;
+  Uint32 pos = AttributeOffset::getNullFlagPos(attrDes2);
+  ndbrequire(BitmaskImpl::get(bm_len, bm_ptr, pos));
+
+  /*
+    The attribute is not NULL. Now to get the data offset, we count the number
+    of varsize dynamic attributes prior to this one that are not NULL.
+
+    We use a pre-computed bitmask to mask away all bits for fixed-sized
+    dynamic attributes, and we also mask away the initial bitmap length byte and
+    any trailing non-bitmap bytes to save a few conditionals.
+  */
+  Tablerec* regTabPtr = tabptr.p;
+  Uint32 *bm_mask_ptr= regTabPtr->dynVarSizeMask;
+  Uint32 bm_pos= AttributeOffset::getNullFlagOffset(attrDes2);
+  Uint32 prevMask= (1 << (pos & 31)) - 1;
+  Uint32 bit_count= count_bits(prevMask & bm_mask_ptr[bm_pos] & bm_ptr[bm_pos]);
+  for(Uint32 i=0; i<bm_pos; i++)
+    bit_count+= count_bits(bm_mask_ptr[i] & bm_ptr[i]);
+
+  /* Now find the data pointer and length from the offset array. */
+  Uint32 attr_descriptor= req_struct->attr_descriptor;
+  Uint32 vsize_in_bytes= AttributeDescriptor::getSizeInBytes(attr_descriptor);
+  //Uint16 *offset_array= req_struct->m_var_data[MM].m_dyn_offset_arr_ptr;
+  Uint16* offset_array = (Uint16*)(bm_ptr + bm_len);
+  Uint16 data_offset= offset_array[bit_count];
+  ndbrequire(vsize_in_bytes <= offset_array[bit_count+1] - data_offset);
+
+  /*
+    In the expanded format, we share the read code with static varsized, just
+    using different data base pointer and offset/lenght arrays.
+  */
+  jam();
+  return varsize_reader(outBuffer, req_struct, ahOut, attrDes2,
+                        ((char *)offset_array) + data_offset, vsize_in_bytes);
+}
+
+bool
+Dbtup::readDynBigFixedSizeShrunkenNULLable(Uint32* outBuffer,
+                                           KeyReqStruct *req_struct,
+                                           AttributeHeader* ahOut,
+                                           Uint32  attrDes2)
+{
+  Uint32 *bm_ptr= (Uint32 *)(req_struct->m_var_data[MM].m_dyn_data_ptr);
+  Uint32 dyn_len= req_struct->m_var_data[MM].m_dyn_part_len;
+  /* Check for NULL (including the case of an empty bitmap). */
+  Uint32 bm_len = (* bm_ptr) & DYN_BM_LEN_MASK;
+  Uint32 pos = AttributeOffset::getNullFlagPos(attrDes2);
+  if(dynCheckNull(dyn_len, bm_len, bm_ptr, pos))
+  {
+    jam();
+    ahOut->setNULL();
+    return true;
+  }
+
+  return readDynBigFixedSizeShrunkenNotNULL(outBuffer, req_struct,
+                                       ahOut, attrDes2);
+}
+
+bool
+Dbtup::readDynBitsNotNULL(Uint32* outBuffer,
+                          KeyReqStruct *req_struct,
+                          AttributeHeader* ahOut,
+                          Uint32  attrDes2)
+{
+  jam();
+  if(req_struct->is_expanded)
+    return readDynBitsExpandedNotNULL(outBuffer, req_struct, ahOut, attrDes2);
+  else
+    return readDynBitsShrunkenNotNULL(outBuffer, req_struct, ahOut, attrDes2);
+}
+
+bool
+Dbtup::readDynBitsNULLable(Uint32* outBuffer,
+                           KeyReqStruct *req_struct,
+                           AttributeHeader* ahOut,
+                           Uint32  attrDes2)
+{
+  jam();
+  if(req_struct->is_expanded)
+    return readDynBitsExpandedNULLable(outBuffer, req_struct, ahOut, attrDes2);
+  else
+    return readDynBitsShrunkenNULLable(outBuffer, req_struct, ahOut, attrDes2);
+}
+
+bool
+Dbtup::readDynBitsShrunkenNotNULL(Uint32* outBuffer,
+                                  KeyReqStruct* req_struct,
+                                  AttributeHeader* ahOut,
+                                  Uint32 attrDes2)
+{
+  Uint32 *bm_ptr= (Uint32 *)(req_struct->m_var_data[MM].m_dyn_data_ptr);
+  Uint32 dyn_len= req_struct->m_var_data[MM].m_dyn_part_len;
+  ndbrequire(dyn_len != 0);
+  Uint32 bm_len = (* bm_ptr) & DYN_BM_LEN_MASK;
+  Uint32 bitCount =
+    AttributeDescriptor::getArraySize(req_struct->attr_descriptor);
+  Uint32 pos = AttributeOffset::getNullFlagPos(attrDes2);
+  /* Make sure we have sufficient data in the row. */
+  ndbrequire((pos>>5)<bm_len);
+  /* The bit data is stored just before the NULL bit. */
+  ndbassert(pos>bitCount);
+  pos-= bitCount;
+
+  Uint32 indexBuf = req_struct->out_buf_index;
+  Uint32 newIndexBuf = indexBuf + ((bitCount + 31) >> 5);
+  Uint32 maxRead = req_struct->max_read;
+  if (newIndexBuf <= maxRead) {
+    jam();
+    ahOut->setDataSize((bitCount + 31) >> 5);
+    req_struct->out_buf_index = newIndexBuf;
+
+    BitmaskImpl::getField(bm_len, bm_ptr, pos, bitCount, outBuffer+indexBuf);
+    return true;
+  } else {
+    jam();
+    terrorCode = ZTRY_TO_READ_TOO_MUCH_ERROR;
+    return false;
+  }//if
+}
+
+bool
+Dbtup::readDynBitsShrunkenNULLable(Uint32* outBuffer,
+                                   KeyReqStruct* req_struct,
+                                   AttributeHeader* ahOut,
+                                   Uint32 attrDes2)
+{
+  Uint32 *bm_ptr= (Uint32 *)(req_struct->m_var_data[MM].m_dyn_data_ptr);
+  Uint32 dyn_len= req_struct->m_var_data[MM].m_dyn_part_len;
+  /* Check for NULL (including the case of an empty bitmap). */
+  Uint32 bm_len = (* bm_ptr) & DYN_BM_LEN_MASK;
+  Uint32 pos = AttributeOffset::getNullFlagPos(attrDes2);
+  if(dynCheckNull(dyn_len, bm_len, bm_ptr, pos))
+  {
+    jam();
+    ahOut->setNULL();
+    return true;
+  }
+
+  return readDynBitsShrunkenNotNULL(outBuffer, req_struct, ahOut, attrDes2);
+}
+
+bool
+Dbtup::readDynBitsExpandedNotNULL(Uint32* outBuffer,
+                                  KeyReqStruct* req_struct,
+                                  AttributeHeader* ahOut,
+                                  Uint32 attrDes2)
+{
+  Uint32 *bm_ptr= (Uint32 *)(req_struct->m_var_data[MM].m_dyn_data_ptr);
+  Uint32 bm_len = (* bm_ptr) & DYN_BM_LEN_MASK;
+  Uint32 bitCount =
+    AttributeDescriptor::getArraySize(req_struct->attr_descriptor);
+  Uint32 pos = AttributeOffset::getNullFlagPos(attrDes2);
+  /* The bit data is stored just before the NULL bit. */
+  ndbassert(pos>bitCount);
+  pos-= bitCount;
+
+  Uint32 indexBuf = req_struct->out_buf_index;
+  Uint32 newIndexBuf = indexBuf + ((bitCount + 31) >> 5);
+  Uint32 maxRead = req_struct->max_read;
+  if (newIndexBuf <= maxRead) {
+    jam();
+    ahOut->setDataSize((bitCount + 31) >> 5);
+    req_struct->out_buf_index = newIndexBuf;
+
+    BitmaskImpl::getField(bm_len, bm_ptr, pos, bitCount, outBuffer+indexBuf);
+    return true;
+  } else {
+    jam();
+    terrorCode = ZTRY_TO_READ_TOO_MUCH_ERROR;
+    return false;
+  }//if
+}
+
+bool
+Dbtup::readDynBitsExpandedNULLable(Uint32* outBuffer,
+                                   KeyReqStruct* req_struct,
+                                   AttributeHeader* ahOut,
+                                   Uint32 attrDes2)
+{
+  Uint32 *bm_ptr= (Uint32 *)(req_struct->m_var_data[MM].m_dyn_data_ptr);
+  Uint32 pos = AttributeOffset::getNullFlagPos(attrDes2);
+  if(!BitmaskImpl::get((* bm_ptr) & DYN_BM_LEN_MASK, bm_ptr, pos))
+  {
+    jam();
+    ahOut->setNULL();
+    return true;
+  }
+
+  return readDynBitsExpandedNotNULL(outBuffer, req_struct, ahOut, attrDes2);
+}
+
+bool
+Dbtup::readDynVarSizeNotNULL(Uint32* outBuffer,
+                             KeyReqStruct *req_struct,
+                             AttributeHeader* ahOut,
+                             Uint32  attrDes2)
+{
+  jam();
+  if(req_struct->is_expanded)
+    return readDynVarSizeExpandedNotNULL(outBuffer, req_struct,
+                                         ahOut, attrDes2);
+  else
+    return readDynVarSizeShrunkenNotNULL(outBuffer, req_struct,
+                                         ahOut, attrDes2);
+}//Dbtup::readDynBigVarSize()
+
+bool
+Dbtup::readDynVarSizeNULLable(Uint32* outBuffer,
+                              KeyReqStruct *req_struct,
+                              AttributeHeader* ahOut,
+                              Uint32  attrDes2)
+{
+  jam();
+  if(req_struct->is_expanded)
+    return readDynVarSizeExpandedNULLable(outBuffer, req_struct,
+                                          ahOut, attrDes2);
+  else
+    return readDynVarSizeShrunkenNULLable(outBuffer, req_struct,
+                                          ahOut, attrDes2);
+}//Dbtup::readDynBigVarSize()
+
+bool
+Dbtup::readDynVarSizeExpandedNotNULL(Uint32* outBuffer,
+                                     KeyReqStruct *req_struct,
+                                     AttributeHeader* ahOut,
+                                     Uint32  attrDes2)
+{
+  /*
+    In the expanded format, we share the read code with static varsized, just
+    using different data base pointer and offset/lenght arrays.
+  */
+  jam();
+  char *src_ptr= req_struct->m_var_data[MM].m_dyn_data_ptr;
+  Uint32 var_index= AttributeOffset::getOffset(attrDes2);
+  Uint16* off_arr= req_struct->m_var_data[MM].m_dyn_offset_arr_ptr;
+  Uint32 var_attr_pos= off_arr[var_index];
+  Uint32 idx= req_struct->m_var_data[MM].m_dyn_len_offset;
+  Uint32 vsize_in_bytes= off_arr[var_index+idx] - var_attr_pos;
+  return varsize_reader(outBuffer, req_struct, ahOut, attrDes2,
+                        src_ptr + var_attr_pos, vsize_in_bytes);
+}
+
+bool
+Dbtup::readDynVarSizeExpandedNULLable(Uint32* outBuffer,
+                                      KeyReqStruct *req_struct,
+                                      AttributeHeader* ahOut,
+                                      Uint32  attrDes2)
+{
+  /*
+    Check for NULL. In the expanded format, the bitmap is guaranteed
+    to be stored in full length.
+  */
+  Uint32 *src_ptr= (Uint32 *)(req_struct->m_var_data[MM].m_dyn_data_ptr);
+  Uint32 pos = AttributeOffset::getNullFlagPos(attrDes2);
+  if(!BitmaskImpl::get((* src_ptr) & DYN_BM_LEN_MASK, src_ptr, pos))
+  {
+    jam();
+    ahOut->setNULL();
+    return true;
+  }
+
+  return readDynVarSizeExpandedNotNULL(outBuffer, req_struct,
+                                       ahOut, attrDes2);
+}
+
+bool
+Dbtup::readDynVarSizeShrunkenNotNULL(Uint32* outBuffer,
+                                     KeyReqStruct *req_struct,
+                                     AttributeHeader* ahOut,
+                                     Uint32  attrDes2)
+{
+  Uint32 *bm_ptr= (Uint32 *)(req_struct->m_var_data[MM].m_dyn_data_ptr);
+  Uint32 dyn_len= req_struct->m_var_data[MM].m_dyn_part_len;
+  ndbrequire(dyn_len!=0);
+  Uint32 bm_len = (* bm_ptr) & DYN_BM_LEN_MASK;
+  Uint32 pos = AttributeOffset::getNullFlagPos(attrDes2);
+  ndbrequire(BitmaskImpl::get(bm_len, bm_ptr, pos));
+
+  /*
+    The attribute is not NULL. Now to get the data offset, we count the number
+    of varsize dynamic attributes prior to this one that are not NULL.
+
+    We use a pre-computed bitmask to mask away all bits for fixed-sized
+    dynamic attributes, and we also mask away the initial bitmap length byte and
+    any trailing non-bitmap bytes to save a few conditionals.
+  */
+  Tablerec* regTabPtr = tabptr.p;
+  Uint32 *bm_mask_ptr= regTabPtr->dynVarSizeMask;
+  Uint32 bm_pos= AttributeOffset::getNullFlagOffset(attrDes2);
+  Uint32 prevMask= (1 << (pos & 31)) - 1;
+  Uint32 bit_count= count_bits(prevMask & bm_mask_ptr[bm_pos] & bm_ptr[bm_pos]);
+  for(Uint32 i=0; i<bm_pos; i++)
+    bit_count+= count_bits(bm_mask_ptr[i] & bm_ptr[i]);
+
+  /* Now find the data pointer and length from the offset array. */
+  //Uint16* offset_array = req_struct->m_var_data[MM].m_dyn_offset_arr_ptr;
+  Uint16* offset_array = (Uint16*)(bm_ptr + bm_len);
+  Uint16 data_offset= offset_array[bit_count];
+  Uint32 vsize_in_bytes= offset_array[bit_count+1] - data_offset;
+
+  /*
+    In the expanded format, we share the read code with static varsized, just
+    using different data base pointer and offset/lenght arrays.
+  */
+  jam();
+  return varsize_reader(outBuffer, req_struct, ahOut, attrDes2,
+                        ((char *)offset_array) + data_offset, vsize_in_bytes);
+}
+
+bool
+Dbtup::readDynVarSizeShrunkenNULLable(Uint32* outBuffer,
+                                      KeyReqStruct *req_struct,
+                                      AttributeHeader* ahOut,
+                                      Uint32  attrDes2)
+{
+  Uint32 *bm_ptr= (Uint32 *)(req_struct->m_var_data[MM].m_dyn_data_ptr);
+  Uint32 dyn_len= req_struct->m_var_data[MM].m_dyn_part_len;
+  /* Check for NULL (including the case of an empty bitmap). */
+  Uint32 bm_len = (* bm_ptr) & DYN_BM_LEN_MASK;
+  Uint32 pos = AttributeOffset::getNullFlagPos(attrDes2);
+  if(dynCheckNull(dyn_len, bm_len, bm_ptr, pos))
+  {
+    jam();
+    ahOut->setNULL();
+    return true;
+  }
+
+  return readDynVarSizeShrunkenNotNULL(outBuffer, req_struct,
+                                       ahOut, attrDes2);
+}
+
+bool
 Dbtup::readDiskFixedSizeNotNULL(Uint32* outBuffer,
 				KeyReqStruct *req_struct,
 				AttributeHeader* ahOut,
@@ -919,22 +1533,23 @@
 }
 
 bool
-Dbtup::updateFixedSizeTHManyWordNotNULL(Uint32* inBuffer,
-                                        KeyReqStruct *req_struct,
-                                        Uint32  attrDes2)
+Dbtup::fixsize_updater(Uint32* inBuffer,
+                       KeyReqStruct *req_struct,
+                       Uint32  attrDes2,
+                       Uint32 *dst_ptr,
+                       Uint32 updateOffset,
+                       Uint32 checkOffset)
 {
   Uint32 attrDescriptor= req_struct->attr_descriptor;
   Uint32 indexBuf= req_struct->in_buf_index;
   Uint32 inBufLen= req_struct->in_buf_len;
-  Uint32 updateOffset= AttributeOffset::getOffset(attrDes2);
   Uint32 charsetFlag = AttributeOffset::getCharsetFlag(attrDes2);
   
   AttributeHeader ahIn(inBuffer[indexBuf]);
   Uint32 noOfWords= AttributeDescriptor::getSizeInWords(attrDescriptor);
   Uint32 nullIndicator= ahIn.isNULL();
   Uint32 newIndex= indexBuf + noOfWords + 1;
-  Uint32 *tuple_header= req_struct->m_tuple_ptr->m_data;
-  ndbrequire((updateOffset + noOfWords - 1) < req_struct->check_offset[MM]);
+  ndbrequire((updateOffset + noOfWords - 1) < checkOffset);
 
   if (newIndex <= inBufLen) {
     if (!nullIndicator) {
@@ -965,7 +1580,7 @@
         }
       }
       req_struct->in_buf_index= newIndex;
-      MEMCOPY_NO_WORDS(&tuple_header[updateOffset],
+      MEMCOPY_NO_WORDS(&(dst_ptr[updateOffset]),
                        &inBuffer[indexBuf + 1],
                        noOfWords);
       
@@ -983,6 +1598,18 @@
 }
 
 bool
+Dbtup::updateFixedSizeTHManyWordNotNULL(Uint32* inBuffer,
+                                        KeyReqStruct *req_struct,
+                                        Uint32  attrDes2)
+{
+  Uint32 *tuple_header= req_struct->m_tuple_ptr->m_data;
+  Uint32 updateOffset= AttributeOffset::getOffset(attrDes2);
+  Uint32 checkOffset= req_struct->check_offset[MM];
+  return fixsize_updater(inBuffer, req_struct, attrDes2, tuple_header,
+                         updateOffset, checkOffset);
+}
+
+bool
 Dbtup::updateFixedSizeTHManyWordNULLable(Uint32* inBuffer,
                                          KeyReqStruct *req_struct,
                                          Uint32  attrDes2)
@@ -1019,48 +1646,57 @@
                             KeyReqStruct *req_struct,
                             Uint32 attr_des2)
 {
-  Uint32 attr_descriptor, index_buf, in_buf_len, var_index, null_ind;
+  Uint32 var_index;
+  char *var_data_start= req_struct->m_var_data[MM].m_data_ptr;
+  var_index= AttributeOffset::getOffset(attr_des2);
+  Uint32 idx= req_struct->m_var_data[MM].m_var_len_offset;
+  Uint16 *vpos_array= req_struct->m_var_data[MM].m_offset_array_ptr;
+  Uint16 offset= vpos_array[var_index];
+  Uint16 *len_offset_ptr= &(vpos_array[var_index+idx]);
+  return varsize_updater(in_buffer, req_struct, var_data_start,
+                         offset, len_offset_ptr,
+                         req_struct->m_var_data[MM].m_max_var_offset);
+}
+bool
+Dbtup::varsize_updater(Uint32* in_buffer,
+                       KeyReqStruct *req_struct,
+                       char *var_data_start,
+                       Uint32 var_attr_pos,
+                       Uint16 *len_offset_ptr,
+                       Uint32 check_offset)
+{
+  Uint32 attr_descriptor, index_buf, in_buf_len, null_ind;
   Uint32 vsize_in_words, new_index, max_var_size;
-  Uint32 var_attr_pos;
-  char *var_data_start;
-  Uint16 *vpos_array;
 
   attr_descriptor= req_struct->attr_descriptor;
   index_buf= req_struct->in_buf_index;
   in_buf_len= req_struct->in_buf_len;
-  var_index= AttributeOffset::getOffset(attr_des2);
   AttributeHeader ahIn(in_buffer[index_buf]);
   null_ind= ahIn.isNULL();
   Uint32 size_in_bytes = ahIn.getByteSize();
   vsize_in_words= (size_in_bytes + 3) >> 2;
   max_var_size= AttributeDescriptor::getSizeInBytes(attr_descriptor);
   new_index= index_buf + vsize_in_words + 1;
-  vpos_array= req_struct->m_var_data[MM].m_offset_array_ptr;
-  Uint32 idx= req_struct->m_var_data[MM].m_var_len_offset;
-  Uint32 check_offset= req_struct->m_var_data[MM].m_max_var_offset;
   
   if (new_index <= in_buf_len && vsize_in_words <= max_var_size) {
     if (!null_ind) {
       jam();
-      var_attr_pos= vpos_array[var_index];
-      var_data_start= req_struct->m_var_data[MM].m_data_ptr;
-      vpos_array[var_index+idx]= var_attr_pos+size_in_bytes;
+      *len_offset_ptr= var_attr_pos+size_in_bytes;
       req_struct->in_buf_index= new_index;
       
       ndbrequire(var_attr_pos+size_in_bytes <= check_offset);
       memcpy(var_data_start+var_attr_pos, &in_buffer[index_buf + 1],
 	     size_in_bytes);
       return true;
-    } else {
-      jam();
-      terrorCode= ZNOT_NULL_ATTR;
-      return false;
     }
-  } else {
+
     jam();
-    terrorCode= ZAI_INCONSISTENCY_ERROR;
+    terrorCode= ZNOT_NULL_ATTR;
     return false;
   }
+
+  jam();
+  terrorCode= ZAI_INCONSISTENCY_ERROR;
   return false;
 }
 
@@ -1101,23 +1737,272 @@
 }
 
 bool
-Dbtup::updateDynFixedSize(Uint32* inBuffer,
-                          KeyReqStruct *req_struct,
-                          Uint32  attrDes2)
+Dbtup::updateDynFixedSizeNotNULL(Uint32* inBuffer,
+                                 KeyReqStruct *req_struct,
+                                 Uint32  attrDes2)
 {
+  Uint32 attrDescriptor= req_struct->attr_descriptor;
+  Uint32 pos= AttributeOffset::getNullFlagPos(attrDes2);
+  Uint32 nullbits= AttributeDescriptor::getSizeInWords(attrDescriptor);
+  Uint32 *bm_ptr= (Uint32 *)(req_struct->m_var_data[MM].m_dyn_data_ptr);
+
+  ndbassert(nullbits && nullbits <= 16);
+
+  /*
+    Compute two 16-bit bitmasks and a 16-bit aligned bitmap offset for setting
+    all the null bits for the fixed-size dynamic attribute.
+    There are at most 16 bits (corresponding to 64 bytes fixsize; longer
+    attributes are stored more efficiently as varsize internally anyway).
+  */
+
+  Uint32 bm_idx= (pos >> 5);
+  /* Store bits in little-endian so fit with length byte and trailing padding*/
+  Uint64 bm_mask = ((Uint64(1) << nullbits) - 1) << (pos & 31);
+  Uint32 bm_mask1 = bm_mask & 0xFFFFFFFF;
+  Uint32 bm_mask2 = bm_mask >> 32;
+
   jam();
-  terrorCode= ZVAR_SIZED_NOT_SUPPORTED;
-  return false;
+  /* Set all the bits in the NULL bitmap. */
+  bm_ptr[bm_idx]|= bm_mask1;
+  /*
+    It is possible that bm_ptr[bm_idx+1] points off the end of the
+    bitmap. But in that case, we are merely ANDing all ones into the offset
+    array (no-op), cheaper than a conditional.
+  */
+  bm_ptr[bm_idx+1]|= bm_mask2;
+
+  /* Compute the data and offset location and write the actual data. */
+  Uint32 off_index= AttributeOffset::getOffset(attrDes2);
+  Uint16* off_arr= req_struct->m_var_data[MM].m_dyn_offset_arr_ptr;
+  Uint32 offset= off_arr[off_index];
+  Uint32 *dst_ptr= (Uint32 *)(req_struct->m_var_data[MM].m_dyn_data_ptr);
+  Uint32 check_offset= req_struct->m_var_data[MM].m_max_dyn_offset;
+
+  ndbassert((offset&3)==0);
+  ndbassert((check_offset&3)==0);
+  bool result= fixsize_updater(inBuffer, req_struct, attrDes2, dst_ptr,
+                               (offset>>2), (check_offset>>2));
+  return result; 
+}
+
+bool
+Dbtup::updateDynFixedSizeNULLable(Uint32* inBuffer,
+                                  KeyReqStruct *req_struct,
+                                  Uint32  attrDes2)
+{
+  AttributeHeader ahIn(inBuffer[req_struct->in_buf_index]);
+  Uint32 nullIndicator= ahIn.isNULL();
+
+  if(!nullIndicator)
+    return updateDynFixedSizeNotNULL(inBuffer, req_struct, attrDes2);
+
+  Uint32 attrDescriptor= req_struct->attr_descriptor;
+  Uint32 pos= AttributeOffset::getNullFlagPos(attrDes2);
+  Uint32 nullbits= AttributeDescriptor::getSizeInWords(attrDescriptor);
+  Uint32 *bm_ptr= (Uint32 *)(req_struct->m_var_data[MM].m_dyn_data_ptr);
+
+  ndbassert(nullbits && nullbits <= 16);
+
+  /*
+    Compute two 16-bit bitmasks and a 16-bit aligned bitmap offset for
+    clearing all the null bits for the fixed-size dynamic attribute.
+    There are at most 16 bits (corresponding to 64 bytes fixsize; longer
+    attributes are stored more efficiently as varsize internally anyway).
+  */
+
+  Uint32 bm_idx= (pos >> 5);
+  /* Store bits in little-endian so fit with length byte and trailing padding*/
+  Uint64 bm_mask = ~(((Uint64(1) << nullbits) - 1) << (pos & 31));
+  Uint32 bm_mask1 = bm_mask & 0xFFFFFFFF;
+  Uint32 bm_mask2 = bm_mask >> 32;
+  
+  Uint32 newIndex= req_struct->in_buf_index + 1;
+  if (newIndex <= req_struct->in_buf_len) {
+    jam();
+    /* Clear the bits in the NULL bitmap. */
+    bm_ptr[bm_idx] &= bm_mask1;
+    bm_ptr[bm_idx+1] &= bm_mask2;
+    req_struct->in_buf_index= newIndex;
+    return true;
+  } else {
+    jam();
+    terrorCode= ZAI_INCONSISTENCY_ERROR;
+    return false;
+  }
 }
 
+/* Update a big dynamic fixed-size column, stored internally as varsize. */
 bool
-Dbtup::updateDynVarSize(Uint32* inBuffer,
-                        KeyReqStruct *req_struct,
-                        Uint32  attrDes2)
+Dbtup::updateDynBigFixedSizeNotNULL(Uint32* inBuffer,
+                                  KeyReqStruct *req_struct,
+                                  Uint32  attrDes2)
 {
+  Uint32 attrDescriptor= req_struct->attr_descriptor;
+  Uint32 pos= AttributeOffset::getNullFlagPos(attrDes2);
+  Uint32 *bm_ptr= (Uint32 *)(req_struct->m_var_data[MM].m_dyn_data_ptr);
+  
   jam();
-  terrorCode= ZVAR_SIZED_NOT_SUPPORTED;
-  return false;
+  BitmaskImpl::set((* bm_ptr) & DYN_BM_LEN_MASK, bm_ptr, pos);
+  /* Compute the data and offset location and write the actual data. */
+  Uint32 off_index= AttributeOffset::getOffset(attrDes2);
+  Uint32 noOfWords= AttributeDescriptor::getSizeInWords(attrDescriptor);
+  Uint16* off_arr= req_struct->m_var_data[MM].m_dyn_offset_arr_ptr;
+  Uint32 offset= off_arr[off_index];
+  Uint32 idx= req_struct->m_var_data[MM].m_dyn_len_offset;
+
+  ndbassert((offset&3)==0);
+  bool res= fixsize_updater(inBuffer,
+                            req_struct,
+                            attrDes2,
+                            bm_ptr,
+                            offset>>2,
+                            req_struct->m_var_data[MM].m_max_dyn_offset);
+  /* Set the correct size for fixsize data. */
+  off_arr[off_index+idx]= offset+(noOfWords<<2);
+  return res;
+}
+
+bool
+Dbtup::updateDynBigFixedSizeNULLable(Uint32* inBuffer,
+                                   KeyReqStruct *req_struct,
+                                   Uint32  attrDes2)
+{
+  AttributeHeader ahIn(inBuffer[req_struct->in_buf_index]);
+  Uint32 nullIndicator= ahIn.isNULL();
+  Uint32 pos= AttributeOffset::getNullFlagPos(attrDes2);
+  Uint32 *bm_ptr= (Uint32*)req_struct->m_var_data[MM].m_dyn_data_ptr;
+  
+  if (!nullIndicator)
+    return updateDynBigFixedSizeNotNULL(inBuffer, req_struct, attrDes2);
+
+  Uint32 newIndex= req_struct->in_buf_index + 1;
+  if (newIndex <= req_struct->in_buf_len) {
+    jam();
+    BitmaskImpl::clear((* bm_ptr) & DYN_BM_LEN_MASK, bm_ptr, pos);
+    req_struct->in_buf_index= newIndex;
+    return true;
+  } else {
+    jam();
+    terrorCode= ZAI_INCONSISTENCY_ERROR;
+    return false;
+  }
+}
+
+bool
+Dbtup::updateDynBitsNotNULL(Uint32* inBuffer,
+                            KeyReqStruct *req_struct,
+                            Uint32  attrDes2)
+{
+  Uint32 attrDescriptor= req_struct->attr_descriptor;
+  Uint32 pos= AttributeOffset::getNullFlagPos(attrDes2);
+  Uint32 bitCount = AttributeDescriptor::getArraySize(attrDescriptor);
+  Uint32 *bm_ptr= (Uint32 *)(req_struct->m_var_data[MM].m_dyn_data_ptr);
+  Uint32 bm_len = (* bm_ptr) & DYN_BM_LEN_MASK;
+  jam();
+  BitmaskImpl::set(bm_len, bm_ptr, pos);
+
+  Uint32 indexBuf= req_struct->in_buf_index;
+  Uint32 inBufLen= req_struct->in_buf_len;
+  AttributeHeader ahIn(inBuffer[indexBuf]);
+  Uint32 nullIndicator = ahIn.isNULL();
+  Uint32 newIndex = indexBuf + 1 + ((bitCount + 31) >> 5);
+
+  if (newIndex <= inBufLen) {
+    if (!nullIndicator) {
+      ndbassert(pos>=bitCount);
+      BitmaskImpl::setField(bm_len, bm_ptr, pos-bitCount, bitCount, 
+			    inBuffer+indexBuf+1);
+      req_struct->in_buf_index= newIndex;
+      return true;
+    } else {
+      jam();
+      terrorCode= ZNOT_NULL_ATTR;
+      return false;
+    }//if
+  } else {
+    jam();
+    terrorCode= ZAI_INCONSISTENCY_ERROR;
+    return false;
+  }//if
+  return true;
+}
+
+bool
+Dbtup::updateDynBitsNULLable(Uint32* inBuffer,
+                             KeyReqStruct *req_struct,
+                             Uint32  attrDes2)
+{
+  AttributeHeader ahIn(inBuffer[req_struct->in_buf_index]);
+  Uint32 nullIndicator= ahIn.isNULL();
+
+  if(!nullIndicator)
+    return updateDynBitsNotNULL(inBuffer, req_struct, attrDes2);
+
+  Uint32 pos= AttributeOffset::getNullFlagPos(attrDes2);
+  Uint32 *bm_ptr= (Uint32*)req_struct->m_var_data[MM].m_dyn_data_ptr;
+
+  Uint32 newIndex= req_struct->in_buf_index + 1;
+  if (newIndex <= req_struct->in_buf_len) {
+    jam();
+    BitmaskImpl::clear((* bm_ptr) & DYN_BM_LEN_MASK, bm_ptr, pos);
+    req_struct->in_buf_index= newIndex;
+    return true;
+  } else {
+    jam();
+    terrorCode= ZAI_INCONSISTENCY_ERROR;
+    return false;
+  }
+}
+
+bool
+Dbtup::updateDynVarSizeNotNULL(Uint32* inBuffer,
+                               KeyReqStruct *req_struct,
+                               Uint32  attrDes2)
+{
+  Uint32 pos= AttributeOffset::getNullFlagPos(attrDes2);
+  Uint32 *bm_ptr= (Uint32*)req_struct->m_var_data[MM].m_dyn_data_ptr;
+  
+  jam();
+  BitmaskImpl::set((* bm_ptr) & DYN_BM_LEN_MASK, bm_ptr, pos);
+  /* Compute the data and offset location and write the actual data. */
+  Uint32 off_index= AttributeOffset::getOffset(attrDes2);
+  Uint16* off_arr= req_struct->m_var_data[MM].m_dyn_offset_arr_ptr;
+  Uint32 offset= off_arr[off_index];
+  Uint32 idx= req_struct->m_var_data[MM].m_dyn_len_offset;
+
+  bool res= varsize_updater(inBuffer,
+                            req_struct,
+                            (char*)bm_ptr,
+                            offset,
+                            &(off_arr[off_index+idx]),
+                            req_struct->m_var_data[MM].m_max_dyn_offset);
+  return res;
+}
+
+bool
+Dbtup::updateDynVarSizeNULLable(Uint32* inBuffer,
+                                KeyReqStruct *req_struct,
+                                Uint32  attrDes2)
+{
+  AttributeHeader ahIn(inBuffer[req_struct->in_buf_index]);
+  Uint32 nullIndicator= ahIn.isNULL();
+  Uint32 pos= AttributeOffset::getNullFlagPos(attrDes2);
+  Uint32 *bm_ptr= (Uint32*)req_struct->m_var_data[MM].m_dyn_data_ptr;
+  
+  if (!nullIndicator)
+    return updateDynVarSizeNotNULL(inBuffer, req_struct, attrDes2);
+
+  Uint32 newIndex= req_struct->in_buf_index + 1;
+  if (newIndex <= req_struct->in_buf_len) {
+    jam();
+    BitmaskImpl::clear((* bm_ptr) & DYN_BM_LEN_MASK, bm_ptr, pos);
+    req_struct->in_buf_index= newIndex;
+    return true;
+  } else {
+    jam();
+    terrorCode= ZAI_INCONSISTENCY_ERROR;
+    return false;
+  }
 }
 
 Uint32 

--- 1.7/storage/ndb/src/kernel/blocks/dbtup/DbtupTabDesMan.cpp	2007-04-03 14:34:27 +02:00
+++ 1.8/storage/ndb/src/kernel/blocks/dbtup/DbtupTabDesMan.cpp	2007-04-03 14:34:27 +02:00
@@ -32,7 +32,8 @@
  */
 
 Uint32
-Dbtup::getTabDescrOffsets(const Tablerec* regTabPtr, Uint32* offset)
+Dbtup::getTabDescrOffsets(Uint32 noOfAttrs, Uint32 noOfCharsets,
+                          Uint32 noOfKeyAttr, Uint32* offset)
 {
   // belongs to configure.in
   unsigned sizeOfPointer = sizeof(CHARSET_INFO*);
@@ -42,21 +43,33 @@
   Uint32 allocSize = 0;
   // magically aligned to 8 bytes
   offset[0] = allocSize += ZTD_SIZE;
-  offset[1] = allocSize += regTabPtr->m_no_of_attributes* sizeOfReadFunction();
-  offset[2] = allocSize += regTabPtr->m_no_of_attributes* sizeOfReadFunction();
-  offset[3] = allocSize += regTabPtr->noOfCharsets * sizeOfPointer;
-  offset[4] = allocSize += regTabPtr->noOfKeyAttr;
-  offset[5] = allocSize += regTabPtr->m_no_of_attributes * ZAD_SIZE;
-  offset[6] = allocSize += (regTabPtr->m_no_of_attributes + 1) >> 1; // real order
+  offset[1] = allocSize += noOfAttrs * sizeOfReadFunction();
+  offset[2] = allocSize += noOfAttrs * sizeOfReadFunction();
+  offset[3] = allocSize += noOfCharsets * sizeOfPointer;
+  offset[4] = allocSize += noOfKeyAttr;
+  offset[5] = allocSize += noOfAttrs * ZAD_SIZE;
+  offset[6] = allocSize += (noOfAttrs+1) >> 1;  // real order
   allocSize += ZTD_TRAILER_SIZE;
   // return number of words
   return allocSize;
 }
 
-Uint32 Dbtup::allocTabDescr(const Tablerec* regTabPtr, Uint32* offset)
+Uint32
+Dbtup::getDynTabDescrOffsets(Uint32 MaskSize, Uint32* offset)
+{
+  // do in layout order and return offsets (see DbtupMeta.cpp)
+  Uint32 allocSize= 0;
+  offset[0]= allocSize += ZTD_SIZE;
+  offset[1]= allocSize += MaskSize;
+  offset[2]= allocSize += MaskSize;
+  allocSize+= ZTD_TRAILER_SIZE;
+  // return number of words
+  return allocSize;
+}
+
+Uint32 Dbtup::allocTabDescr(Uint32 allocSize)
 {
   Uint32 reference = RNIL;
-  Uint32 allocSize = getTabDescrOffsets(regTabPtr, offset);
 /* ---------------------------------------------------------------- */
 /*       ALWAYS ALLOCATE A MULTIPLE OF 16 WORDS                     */
 /* ---------------------------------------------------------------- */
@@ -277,7 +290,10 @@
       ptrAss(ptr, tablerec);
       if (ptr.p->tableStatus == DEFINED) {
         Uint32 offset[10];
-        const Uint32 alloc = getTabDescrOffsets(ptr.p, offset);
+        const Uint32 alloc = getTabDescrOffsets(ptr.p->m_no_of_attributes,
+                                                ptr.p->noOfCharsets,
+                                                ptr.p->noOfKeyAttr,
+                                                offset);
         const Uint32 desc = ptr.p->readKeyArray - offset[3];
         Uint32 size = alloc;
         if (size % ZTD_FREE_SIZE != 0)

--- 1.11/storage/ndb/src/kernel/vm/DLFifoList.hpp	2007-04-03 14:34:27 +02:00
+++ 1.12/storage/ndb/src/kernel/vm/DLFifoList.hpp	2007-04-03 14:34:27 +02:00
@@ -82,14 +82,14 @@
   /**
    * Update ptr to first element in list
    *
-   * Return i
+   * Return true if ok, false if empty
    */
   bool first(Ptr<T> &) const ;
 
   /**
    * Update ptr to first element in list
    *
-   * Return i
+   * Return true if ok, false if empty
    */
   bool last(Ptr<T> &) const ;
 

--- 1.70/storage/ndb/src/ndbapi/NdbDictionary.cpp	2007-04-03 14:34:27 +02:00
+++ 1.71/storage/ndb/src/ndbapi/NdbDictionary.cpp	2007-04-03 14:34:27 +02:00
@@ -291,6 +291,16 @@
   return (StorageType)m_impl.m_storageType;
 }
 
+void 
+NdbDictionary::Column::setDynamic(bool val){
+  m_impl.m_dynamic = val;
+}
+
+bool 
+NdbDictionary::Column::getDynamic() const {
+  return m_impl.m_dynamic;
+}
+
 /*****************************************************************
  * Table facade
  */
@@ -1442,8 +1452,9 @@
 }
 
 int
-NdbDictionary::Dictionary::alterTable(const Table & t){
-  return m_impl.alterTable(NdbTableImpl::getImpl(t));
+NdbDictionary::Dictionary::alterTable(const Table & f, const Table & t)
+{
+  return m_impl.alterTable(NdbTableImpl::getImpl(f), NdbTableImpl::getImpl(t));
 }
 
 int
@@ -1902,6 +1913,9 @@
     out << " ST=" << (int)col.getStorageType() << "?";
     break;
   }
+
+  if(col.getDynamic())
+    out << " DYNAMIC";
 
   return out;
 }

--- 1.166/storage/ndb/src/ndbapi/NdbDictionaryImpl.cpp	2007-04-03 14:34:27 +02:00
+++ 1.167/storage/ndb/src/ndbapi/NdbDictionaryImpl.cpp	2007-04-03 14:34:27 +02:00
@@ -114,6 +114,7 @@
   m_arraySize = col.m_arraySize;
   m_arrayType = col.m_arrayType;
   m_storageType = col.m_storageType;
+  m_dynamic = col.m_dynamic;
   m_keyInfoPos = col.m_keyInfoPos;
   if (col.m_blobTable == NULL)
     m_blobTable = NULL;
@@ -260,6 +261,7 @@
   m_autoIncrementInitialValue = 1;
   m_blobTable = NULL;
   m_storageType = NDB_STORAGETYPE_MEMORY;
+  m_dynamic = false;
 #ifdef VM_TRACE
   if(NdbEnv_GetEnv("NDB_DEFAULT_DISK", (char *)0, 0))
     m_storageType = NDB_STORAGETYPE_DISK;
@@ -314,6 +316,9 @@
   if (m_arrayType != col.m_arrayType || m_storageType != col.m_storageType){
     DBUG_RETURN(false);
   }
+  if(m_dynamic != col.m_dynamic){
+    DBUG_RETURN(false);
+  }
 
   DBUG_RETURN(true);
 }
@@ -424,7 +429,6 @@
 
 void
 NdbTableImpl::init(){
-  m_changeMask= 0;
   m_id= RNIL;
   m_version = ~0;
   m_status = NdbDictionary::Object::Invalid;
@@ -432,18 +436,12 @@
   m_primaryTableId= RNIL;
   m_internalName.clear();
   m_externalName.clear();
-  m_newExternalName.clear();
   m_mysqlName.clear();
   m_frm.clear();
-  m_newFrm.clear();
   m_ts_name.clear();
-  m_new_ts_name.clear();
   m_ts.clear();
-  m_new_ts.clear();
   m_fd.clear();
-  m_new_fd.clear();
   m_range.clear();
-  m_new_range.clear();
   m_fragmentType= NdbDictionary::Object::FragAllSmall;
   m_hashValueMask= 0;
   m_hashpointerValue= 0;
@@ -681,26 +679,15 @@
 {
   DBUG_ENTER("NdbColumnImpl::assign");
   DBUG_PRINT("info", ("this: %p  &org: %p", this, &org));
-  /* m_changeMask intentionally not copied */
   m_primaryTableId = org.m_primaryTableId;
   m_internalName.assign(org.m_internalName);
   updateMysqlName();
-  // If the name has been explicitly set, use that name
-  // otherwise use the fetched name
-  if (!org.m_newExternalName.empty())
-    m_externalName.assign(org.m_newExternalName);
-  else
-    m_externalName.assign(org.m_externalName);
+  m_externalName.assign(org.m_externalName);
   m_frm.assign(org.m_frm.get_data(), org.m_frm.length());
   m_ts_name.assign(org.m_ts_name.get_data(), org.m_ts_name.length());
-  m_new_ts_name.assign(org.m_new_ts_name.get_data(),
-                       org.m_new_ts_name.length());
   m_ts.assign(org.m_ts.get_data(), org.m_ts.length());
-  m_new_ts.assign(org.m_new_ts.get_data(), org.m_new_ts.length());
   m_fd.assign(org.m_fd.get_data(), org.m_fd.length());
-  m_new_fd.assign(org.m_new_fd.get_data(), org.m_new_fd.length());
   m_range.assign(org.m_range.get_data(), org.m_range.length());
-  m_new_range.assign(org.m_new_range.get_data(), org.m_new_range.length());
 
   m_fragmentType = org.m_fragmentType;
   /*
@@ -765,16 +752,13 @@
 
 void NdbTableImpl::setName(const char * name)
 {
-  m_newExternalName.assign(name);
+  m_externalName.assign(name);
 }
 
 const char * 
 NdbTableImpl::getName() const
 {
-  if (m_newExternalName.empty())
-    return m_externalName.c_str();
-  else
-    return m_newExternalName.c_str();
+  return m_externalName.c_str();
 }
 
 void
@@ -849,24 +833,18 @@
 const void*
 NdbTableImpl::getTablespaceNames() const
 {
-  if (m_new_ts_name.empty())
-    return m_ts_name.get_data();
-  else
-    return m_new_ts_name.get_data();
+  return m_ts_name.get_data();
 }
 
 Uint32
 NdbTableImpl::getTablespaceNamesLen() const
 {
-  if (m_new_ts_name.empty())
-    return m_ts_name.length();
-  else
-    return m_new_ts_name.length();
+  return m_ts_name.length();
 }
 
 void NdbTableImpl::setTablespaceNames(const void *data, Uint32 len)
 {
-  m_new_ts_name.assign(data, len);
+  m_ts_name.assign(data, len);
 }
 
 void NdbTableImpl::setFragmentCount(Uint32 count)
@@ -881,94 +859,70 @@
 
 void NdbTableImpl::setFrm(const void* data, Uint32 len)
 {
-  m_newFrm.assign(data, len);
+  m_frm.assign(data, len);
 }
 
 const void * 
 NdbTableImpl::getFrmData() const
 {
-  if (m_newFrm.empty())
-    return m_frm.get_data();
-  else
-    return m_newFrm.get_data();
+  return m_frm.get_data();
 }
 
 Uint32
 NdbTableImpl::getFrmLength() const 
 {
-  if (m_newFrm.empty())
-    return m_frm.length();
-  else
-    return m_newFrm.length();
+  return m_frm.length();
 }
 
 void NdbTableImpl::setFragmentData(const void* data, Uint32 len)
 {
-  m_new_fd.assign(data, len);
+  m_fd.assign(data, len);
 }
 
 const void * 
 NdbTableImpl::getFragmentData() const
 {
-  if (m_new_fd.empty())
-    return m_fd.get_data();
-  else
-    return m_new_fd.get_data();
+  return m_fd.get_data();
 }
 
 Uint32
 NdbTableImpl::getFragmentDataLen() const 
 {
-  if (m_new_fd.empty())
-    return m_fd.length();
-  else
-    return m_new_fd.length();
+  return m_fd.length();
 }
 
 void NdbTableImpl::setTablespaceData(const void* data, Uint32 len)
 {
-  m_new_ts.assign(data, len);
+  m_ts.assign(data, len);
 }
 
 const void * 
 NdbTableImpl::getTablespaceData() const
 {
-  if (m_new_ts.empty())
-    return m_ts.get_data();
-  else
-    return m_new_ts.get_data();
+  return m_ts.get_data();
 }
 
 Uint32
 NdbTableImpl::getTablespaceDataLen() const 
 {
-  if (m_new_ts.empty())
-    return m_ts.length();
-  else
-    return m_new_ts.length();
+  return m_ts.length();
 }
 
 void NdbTableImpl::setRangeListData(const void* data, Uint32 len)
 {
-  m_new_range.assign(data, len);
+  m_range.assign(data, len);
 }
 
 const void * 
 NdbTableImpl::getRangeListData() const
 {
-  if (m_new_range.empty())
-    return m_range.get_data();
-  else
-    return m_new_range.get_data();
+  return m_range.get_data();
 }
 
 Uint32
 NdbTableImpl::getRangeListDataLen() const 
 {
-  if (m_new_range.empty())
-    return m_range.length();
-  else
-    return m_new_range.length();
+  return m_range.length();
 }
 
 void
@@ -2182,6 +2136,7 @@
       col->m_arraySize = (attrDesc.AttributeArraySize + 31) >> 5;
     }
     col->m_storageType = attrDesc.AttributeStorageType;
+    col->m_dynamic = (attrDesc.AttributeDynamic != 0);
     
     col->m_pk = attrDesc.AttributeKeyFlag;
     col->m_distributionKey = (attrDesc.AttributeDKey != 0);
@@ -2255,10 +2210,6 @@
 { 
   DBUG_ENTER("NdbDictionaryImpl::createTable");
 
-  // if the new name has not been set, use the copied name
-  if (t.m_newExternalName.empty())
-    t.m_newExternalName.assign(t.m_externalName);
-
   // create table
   if (m_receiver.createTable(m_ndb, t) != 0)
     DBUG_RETURN(-1);
@@ -2354,33 +2305,25 @@
 NdbDictInterface::createTable(Ndb & ndb,
 			      NdbTableImpl & impl)
 {
+  int ret;
+
   DBUG_ENTER("NdbDictInterface::createTable");
-  DBUG_RETURN(createOrAlterTable(ndb, impl, false));
-}
 
-int NdbDictionaryImpl::alterTable(NdbTableImpl &impl)
-{
-  BaseString internalName(impl.m_internalName);
-  const char * originalInternalName = internalName.c_str();
 
-  DBUG_ENTER("NdbDictionaryImpl::alterTable");
-  Ndb_local_table_info * local = 0;
-  if((local= get_local_table_info(originalInternalName)) == 0)
-  {
-    m_error.code = 709;
-    DBUG_RETURN(-1);
-  }
+  syncInternalName(ndb, impl);
 
-  // Alter the table
-  int ret = alterTableGlobal(*local->m_table_impl, impl);
-  if(ret == 0)
-  {
-    m_globalHash->lock();
-    m_globalHash->release(local->m_table_impl, 1);
-    m_globalHash->unlock();
-    m_localHash.drop(originalInternalName);
-  }
-  DBUG_RETURN(ret);
+  UtilBufferWriter w(m_buffer);
+  ret= serializeTableDesc(ndb, impl, w);
+  if(ret != 0)
+    DBUG_RETURN(ret);
+
+  DBUG_RETURN(sendCreateTable(impl, w));
+}
+
+int NdbDictionaryImpl::alterTable(NdbTableImpl &old_impl,
+                                  NdbTableImpl &impl)
+{
+  return alterTableGlobal(old_impl, impl);
 }
 
 int NdbDictionaryImpl::alterTableGlobal(NdbTableImpl &old_impl,
@@ -2388,7 +2331,7 @@
 {
   DBUG_ENTER("NdbDictionaryImpl::alterTableGlobal");
   // Alter the table
-  int ret = m_receiver.alterTable(m_ndb, impl);
+  int ret = m_receiver.alterTable(m_ndb, old_impl, impl);
   old_impl.m_status = NdbDictionary::Object::Invalid;
   if(ret == 0){
     DBUG_RETURN(ret);
@@ -2396,22 +2339,151 @@
   ERR_RETURN(getNdbError(), ret);
 }
 
-int 
+int
 NdbDictInterface::alterTable(Ndb & ndb,
-			      NdbTableImpl & impl)
+                             const NdbTableImpl &old_impl,
+                             NdbTableImpl &impl)
 {
+  int ret;
+  Uint32 change_mask;
+
   DBUG_ENTER("NdbDictInterface::alterTable");
-  DBUG_RETURN(createOrAlterTable(ndb, impl, true));
+
+  syncInternalName(ndb, impl);
+
+  /* Check that alter request is valid and compute stuff to alter. */
+  ret= compChangeMask(old_impl, impl, change_mask);
+  if(ret != 0)
+    DBUG_RETURN(ret);
+
+  UtilBufferWriter w(m_buffer);
+  ret= serializeTableDesc(ndb, impl, w);
+  if(ret != 0)
+    DBUG_RETURN(ret);
+
+  DBUG_RETURN(sendAlterTable(impl, change_mask, w));
+}
+
+void
+NdbDictInterface::syncInternalName(Ndb & ndb, NdbTableImpl &impl)
+{
+  const BaseString internalName(
+    ndb.internalize_table_name(impl.m_externalName.c_str()));
+  impl.m_internalName.assign(internalName);
+  impl.updateMysqlName();
+}
+
+/*
+  Compare old and new Table descriptors.
+  Set the corresponding flag for any (supported) difference.
+  Error on any difference not supported for alter table.
+*/
+int
+NdbDictInterface::compChangeMask(const NdbTableImpl &old_impl,
+                                 const NdbTableImpl &impl,
+                                 Uint32 &change_mask)
+{
+  bool found_varpart;
+  change_mask= 0;
+
+  /* These are the supported properties that may be altered. */
+  DBUG_PRINT("info", ("old_impl.m_internalName='%s' impl.m_internalName='%s'",
+                      old_impl.m_internalName.c_str(),
+                      impl.m_internalName.c_str()));
+  if(impl.m_internalName != old_impl.m_internalName)
+    AlterTableReq::setNameFlag(change_mask, true);
+  if(!impl.m_frm.equal(old_impl.m_frm))
+    AlterTableReq::setFrmFlag(change_mask, true);
+  if(!impl.m_fd.equal(old_impl.m_fd))
+    AlterTableReq::setFragDataFlag(change_mask, true);
+  if(!impl.m_ts_name.equal(old_impl.m_ts_name))
+    AlterTableReq::setTsNameFlag(change_mask, true);
+  if(!impl.m_range.equal(old_impl.m_range))
+    AlterTableReq::setRangeListFlag(change_mask, true);
+  if(!impl.m_ts.equal(old_impl.m_ts))
+    AlterTableReq::setTsFlag(change_mask, true);
+
+  /* No other property can be changed in alter table. */
+  Uint32 old_sz= old_impl.m_columns.size();
+  Uint32 sz= impl.m_columns.size();
+  if(impl.m_fragmentCount != old_impl.m_fragmentCount ||
+     impl.m_logging != old_impl.m_logging ||
+     impl.m_temporary != old_impl.m_temporary ||
+     impl.m_row_gci != old_impl.m_row_gci ||
+     impl.m_row_checksum != old_impl.m_row_checksum ||
+     impl.m_kvalue != old_impl.m_kvalue ||
+     impl.m_minLoadFactor != old_impl.m_minLoadFactor ||
+     impl.m_maxLoadFactor != old_impl.m_maxLoadFactor ||
+     impl.m_primaryTableId != old_impl.m_primaryTableId ||
+     impl.m_max_rows != old_impl.m_max_rows ||
+     impl.m_min_rows != old_impl.m_min_rows ||
+     impl.m_default_no_part_flag != old_impl.m_default_no_part_flag ||
+     impl.m_linear_flag != old_impl.m_linear_flag ||
+     impl.m_fragmentType != old_impl.m_fragmentType ||
+     impl.m_tablespace_name != old_impl.m_tablespace_name ||
+     impl.m_tablespace_id != old_impl.m_tablespace_id ||
+     impl.m_tablespace_version != old_impl.m_tablespace_version ||
+     impl.m_id != old_impl.m_id ||
+     impl.m_version != old_impl.m_version ||
+     sz < old_sz)
+    goto invalid_alter_table;
+
+  /*
+    Check for new columns.
+    We can add one or more new columns at the end, with some restrictions:
+     - All existing columns must be unchanged.
+     - The new column must be dynamic.
+     - The new column must be nullable.
+     - The new column must be memory based.
+     - The new column can not be a primary key or distribution key.
+     - There must already be at least one existing memory-stored dynamic or
+       variable-sized column (so that the varpart is already allocated).
+  */
+  found_varpart= false;
+  for(Uint32 i= 0; i<old_sz; i++)
+  {
+    const NdbColumnImpl *col= impl.m_columns[i];
+    if(!col->equal(*(old_impl.m_columns[i])))
+      goto invalid_alter_table;
+    if(col->m_storageType == NDB_STORAGETYPE_MEMORY &&
+       (col->m_dynamic || col->m_arrayType != NDB_ARRAYTYPE_FIXED))
+      found_varpart= true;
+  }
+
+  if(sz > old_sz)
+  {
+    if(!found_varpart)
+      goto invalid_alter_table;
+
+    for(Uint32 i=old_sz; i<sz; i++)
+    {
+      const NdbColumnImpl *col= impl.m_columns[i];
+      if(!col->m_dynamic || !col->m_nullable ||
+         col->m_storageType == NDB_STORAGETYPE_DISK ||
+         col->m_pk ||
+         col->m_distributionKey ||
+         col->m_autoIncrement                   // ToDo: allow this?
+         )
+        goto invalid_alter_table;
+    }
+    AlterTableReq::setAddAttrFlag(change_mask, true);
+  }
+
+  return 0;
+
+ invalid_alter_table:
+  m_error.code = 741;                           // "Unsupported alter table"
+  return -1;
 }
 
 int 
-NdbDictInterface::createOrAlterTable(Ndb & ndb,
+NdbDictInterface::serializeTableDesc(Ndb & ndb,
 				     NdbTableImpl & impl,
-				     bool alter)
+				     UtilBufferWriter & w)
 {
   unsigned i, err;
   char *ts_names[MAX_NDB_PARTITIONS];
-  DBUG_ENTER("NdbDictInterface::createOrAlterTable");
+  DBUG_ENTER("NdbDictInterface::serializeTableDesc");
 
   impl.computeAggregates();
 
@@ -2425,72 +2497,6 @@
     DBUG_RETURN(-1);
   }
   
-  // Check if any changes for alter table
-  
-  // Name change
-  if (!impl.m_newExternalName.empty()) {
-    if (alter)
-    {
-      AlterTableReq::setNameFlag(impl.m_changeMask, true);
-    }
-    impl.m_externalName.assign(impl.m_newExternalName);
-    impl.m_newExternalName.clear();
-  }
-  // Definition change (frm)
-  if (!impl.m_newFrm.empty())
-  {
-    if (alter)
-    {
-      AlterTableReq::setFrmFlag(impl.m_changeMask, true);
-    }
-    impl.m_frm.assign(impl.m_newFrm.get_data(), impl.m_newFrm.length());
-    impl.m_newFrm.clear();
-  }
-  // Change FragmentData (fragment identity, state, tablespace id)
-  if (!impl.m_new_fd.empty())
-  {
-    if (alter)
-    {
-      AlterTableReq::setFragDataFlag(impl.m_changeMask, true);
-    }
-    impl.m_fd.assign(impl.m_new_fd.get_data(), impl.m_new_fd.length());
-    impl.m_new_fd.clear();
-  }
-  // Change Tablespace Name Data
-  if (!impl.m_new_ts_name.empty())
-  {
-    if (alter)
-    {
-      AlterTableReq::setTsNameFlag(impl.m_changeMask, true);
-    }
-    impl.m_ts_name.assign(impl.m_new_ts_name.get_data(),
-                          impl.m_new_ts_name.length());
-    impl.m_new_ts_name.clear();
-  }
-  // Change Range/List Data
-  if (!impl.m_new_range.empty())
-  {
-    if (alter)
-    {
-      AlterTableReq::setRangeListFlag(impl.m_changeMask, true);
-    }
-    impl.m_range.assign(impl.m_new_range.get_data(),
-                          impl.m_new_range.length());
-    impl.m_new_range.clear();
-  }
-  // Change Tablespace Data
-  if (!impl.m_new_ts.empty())
-  {
-    if (alter)
-    {
-      AlterTableReq::setTsFlag(impl.m_changeMask, true);
-    }
-    impl.m_ts.assign(impl.m_new_ts.get_data(),
-                     impl.m_new_ts.length());
-    impl.m_new_ts.clear();
-  }
-
-
   /*
      TODO RONM: Here I need to insert checks for fragment array and
      range or list array
@@ -2499,10 +2505,6 @@
   //validate();
   //aggregate();
 
-  const BaseString internalName(
-    ndb.internalize_table_name(impl.m_externalName.c_str()));
-  impl.m_internalName.assign(internalName);
-  impl.updateMysqlName();
   DictTabInfo::Table *tmpTab;
 
   tmpTab = (DictTabInfo::Table*)NdbMem_Allocate(sizeof(DictTabInfo::Table));
@@ -2512,9 +2514,8 @@
     DBUG_RETURN(-1);
   }
   tmpTab->init();
-  BaseString::snprintf(tmpTab->TableName,
-	   sizeof(tmpTab->TableName),
-	   internalName.c_str());
+  BaseString::snprintf(tmpTab->TableName, sizeof(tmpTab->TableName),
+                       "%s", impl.m_internalName.c_str());
 
   Uint32 distKeys= 0;
   for(i = 0; i<sz; i++) {
@@ -2656,8 +2657,8 @@
     }
   }
   
-  UtilBufferWriter w(m_buffer);
   SimpleProperties::UnpackStatus s;
+  w.reset();
   s = SimpleProperties::pack(w, 
 			     tmpTab,
 			     DictTabInfo::TableMapping, 
@@ -2683,7 +2684,7 @@
 		       col->m_name.c_str(), i, col->m_distributionKey));
     DictTabInfo::Attribute tmpAttr; tmpAttr.init();
     BaseString::snprintf(tmpAttr.AttributeName, sizeof(tmpAttr.AttributeName), 
-	     col->m_name.c_str());
+	     "%s", col->m_name.c_str());
     tmpAttr.AttributeId = col->m_attrId;
     tmpAttr.AttributeKeyFlag = col->m_pk;
     tmpAttr.AttributeNullableFlag = col->m_nullable;
@@ -2702,6 +2703,7 @@
       tmpAttr.AttributeStorageType = NDB_STORAGETYPE_MEMORY;      
     else
       tmpAttr.AttributeStorageType = col->m_storageType;
+    tmpAttr.AttributeDynamic = (col->m_dynamic ? 1 : 0);
 
     if(col->getBlobType())
       tmpAttr.AttributeStorageType = NDB_STORAGETYPE_MEMORY;      
@@ -2741,8 +2743,8 @@
 
     tmpAttr.AttributeAutoIncrement = col->m_autoIncrement;
     BaseString::snprintf(tmpAttr.AttributeDefaultValue, 
-	     sizeof(tmpAttr.AttributeDefaultValue),
-	     col->m_defaultValue.c_str());
+                         sizeof(tmpAttr.AttributeDefaultValue),
+                         "%s", col->m_defaultValue.c_str());
     s = SimpleProperties::pack(w, 
 			       &tmpAttr,
 			       DictTabInfo::AttributeMapping, 
@@ -2750,52 +2752,68 @@
     w.add(DictTabInfo::AttributeEnd, 1);
   }
 
-  int ret;
-  
+  DBUG_RETURN(0);
+}
+
+int
+NdbDictInterface::sendAlterTable(const NdbTableImpl &impl,
+                                 Uint32 change_mask,
+                                 UtilBufferWriter &w)
+{
   LinearSectionPtr ptr[1];
   ptr[0].p = (Uint32*)m_buffer.get_data();
   ptr[0].sz = m_buffer.length() / 4;
   NdbApiSignal tSignal(m_reference);
   tSignal.theReceiversBlockNumber = DBDICT;
-  if (alter) {
-    tSignal.theVerId_signalNumber   = GSN_ALTER_TABLE_REQ;
-    tSignal.theLength = AlterTableReq::SignalLength;
+  tSignal.theVerId_signalNumber   = GSN_ALTER_TABLE_REQ;
+  tSignal.theLength = AlterTableReq::SignalLength;
 
-    AlterTableReq * req = CAST_PTR(AlterTableReq, tSignal.getDataPtrSend());
-    
-    req->senderRef = m_reference;
-    req->senderData = 0;
-    req->changeMask = impl.m_changeMask;
-    req->tableId = impl.m_id;
-    req->tableVersion = impl.m_version;;
+  AlterTableReq * req = CAST_PTR(AlterTableReq, tSignal.getDataPtrSend());
 
-    int errCodes[] = { AlterTableRef::NotMaster, AlterTableRef::Busy, 0 };
-    ret = dictSignal(&tSignal, ptr, 1,
-		     0, // master
-		     WAIT_ALTER_TAB_REQ,
-		     DICT_WAITFOR_TIMEOUT, 100,
-		     errCodes);
-    
-    if(m_error.code == AlterTableRef::InvalidTableVersion) {
-      // Clear caches and try again
-      DBUG_RETURN(INCOMPATIBLE_VERSION);
-    }
-  } else {
-    tSignal.theVerId_signalNumber   = GSN_CREATE_TABLE_REQ;
-    tSignal.theLength = CreateTableReq::SignalLength;
+  req->senderRef = m_reference;
+  req->senderData = 0;
+  req->changeMask = change_mask;
+  req->tableId = impl.m_id;
+  req->tableVersion = impl.m_version;
 
-    CreateTableReq * req = CAST_PTR(CreateTableReq, tSignal.getDataPtrSend());
-    req->senderRef = m_reference;
-    req->senderData = 0;
-    int errCodes[] = { CreateTableRef::Busy, CreateTableRef::NotMaster, 0 };
-    ret = dictSignal(&tSignal, ptr, 1,
-		     0, // master node
-		     WAIT_CREATE_INDX_REQ,
-		     DICT_WAITFOR_TIMEOUT, 100,
-		     errCodes);
+  int errCodes[] = { AlterTableRef::NotMaster, AlterTableRef::Busy, 0 };
+  int ret= dictSignal(&tSignal, ptr, 1,
+                      0,                        // master
+                      WAIT_ALTER_TAB_REQ,
+                      DICT_WAITFOR_TIMEOUT, 100,
+                      errCodes);
+
+  if(m_error.code == AlterTableRef::InvalidTableVersion) {
+    // Clear caches and try again
+    return(INCOMPATIBLE_VERSION);
   }
-  
-  DBUG_RETURN(ret);
+
+  return ret;
+}
+
+int
+NdbDictInterface::sendCreateTable(const NdbTableImpl &impl,
+                                  UtilBufferWriter &w)
+{
+  LinearSectionPtr ptr[1];
+  ptr[0].p = (Uint32*)m_buffer.get_data();
+  ptr[0].sz = m_buffer.length() / 4;
+  NdbApiSignal tSignal(m_reference);
+  tSignal.theReceiversBlockNumber = DBDICT;
+  tSignal.theVerId_signalNumber   = GSN_CREATE_TABLE_REQ;
+  tSignal.theLength = CreateTableReq::SignalLength;
+
+  CreateTableReq * req = CAST_PTR(CreateTableReq, tSignal.getDataPtrSend());
+  req->senderRef = m_reference;
+  req->senderData = 0;
+  int errCodes[]= { CreateTableRef::Busy, CreateTableRef::NotMaster, 0 };
+  int ret= dictSignal(&tSignal, ptr, 1,
+                      0,                        // master node
+                      WAIT_CREATE_INDX_REQ,
+                      DICT_WAITFOR_TIMEOUT, 100,
+                      errCodes);
+
+  return ret;
 }
 
 void
@@ -4738,7 +4756,7 @@
   DBUG_ENTER("NdbDictInterface::create_file"); 
   UtilBufferWriter w(m_buffer);
   DictFilegroupInfo::File f; f.init();
-  snprintf(f.FileName, sizeof(f.FileName), file.m_path.c_str());
+  snprintf(f.FileName, sizeof(f.FileName), "%s", file.m_path.c_str());
   f.FileType = file.m_type;
   f.FilegroupId = group.m_id;
   f.FilegroupVersion = group.m_version;
@@ -4866,7 +4884,8 @@
   DBUG_ENTER("NdbDictInterface::create_filegroup");
   UtilBufferWriter w(m_buffer);
   DictFilegroupInfo::Filegroup fg; fg.init();
-  snprintf(fg.FilegroupName, sizeof(fg.FilegroupName), group.m_name.c_str());
+  snprintf(fg.FilegroupName, sizeof(fg.FilegroupName),
+           "%s", group.m_name.c_str());
   switch(group.m_type){
   case NdbDictionary::Object::Tablespace:
   {

--- 1.74/storage/ndb/src/ndbapi/NdbDictionaryImpl.hpp	2007-04-03 14:34:27 +02:00
+++ 1.75/storage/ndb/src/ndbapi/NdbDictionaryImpl.hpp	2007-04-03 14:34:27 +02:00
@@ -22,6 +22,7 @@
 #include <BaseString.hpp>
 #include <Vector.hpp>
 #include <UtilBuffer.hpp>
+#include <SimpleProperties.hpp>
 #include <NdbDictionary.hpp>
 #include <Bitmask.hpp>
 #include <AttributeList.hpp>
@@ -98,6 +99,7 @@
   Uint32 m_arraySize;           // length or maxlength+1/2 for Var* types
   Uint32 m_arrayType;           // NDB_ARRAYTYPE_FIXED or _VAR
   Uint32 m_storageType;         // NDB_STORAGETYPE_MEMORY or _DISK
+  bool m_dynamic;
   /*
    * NdbTableImpl: if m_pk, 0-based index of key in m_attrId order
    * NdbIndexImpl: m_column_no of primary table column
@@ -157,22 +159,15 @@
   int aggregate(NdbError& error);
   int validate(NdbError& error);
 
-  Uint32 m_changeMask;
   Uint32 m_primaryTableId;
   BaseString m_internalName;
   BaseString m_externalName;
   BaseString m_mysqlName;
-  BaseString m_newExternalName; // Used for alter table
   UtilBuffer m_frm; 
-  UtilBuffer m_newFrm;       // Used for alter table
   UtilBuffer m_ts_name;      //Tablespace Names
-  UtilBuffer m_new_ts_name;  //Tablespace Names
   UtilBuffer m_ts;           //TablespaceData
-  UtilBuffer m_new_ts;       //TablespaceData
   UtilBuffer m_fd;           //FragmentData
-  UtilBuffer m_new_fd;       //FragmentData
   UtilBuffer m_range;        //Range Or List Array
-  UtilBuffer m_new_range;    //Range Or List Array
   NdbDictionary::Object::FragmentType m_fragmentType;
 
   /**
@@ -442,10 +437,20 @@
 		 int timeout, Uint32 RETRIES,
 		 const int *errcodes = 0, int temporaryMask = 0);
 
-  int createOrAlterTable(class Ndb & ndb, NdbTableImpl &, bool alter);
-
   int createTable(class Ndb & ndb, NdbTableImpl &);
-  int alterTable(class Ndb & ndb, NdbTableImpl &);
+  int alterTable(class Ndb & ndb, const NdbTableImpl &, NdbTableImpl &);
+  void syncInternalName(Ndb & ndb, NdbTableImpl &impl);
+  int compChangeMask(const NdbTableImpl &old_impl,
+                     const NdbTableImpl &impl,
+                     Uint32 &change_mask);
+  int serializeTableDesc(Ndb & ndb,
+                         NdbTableImpl & impl,
+                         UtilBufferWriter & w);
+  int sendAlterTable(const NdbTableImpl &impl,
+                     Uint32 change_mask,
+                     UtilBufferWriter &w);
+  int sendCreateTable(const NdbTableImpl &impl, UtilBufferWriter &w);
+
   int dropTable(const NdbTableImpl &);
 
   int createIndex(class Ndb & ndb, const NdbIndexImpl &, const NdbTableImpl &);
@@ -583,7 +588,7 @@
 
   int createTable(NdbTableImpl &t);
   int createBlobTables(NdbTableImpl& org, NdbTableImpl& created);
-  int alterTable(NdbTableImpl &t);
+  int alterTable(NdbTableImpl &old_impl, NdbTableImpl &impl);
   int dropTable(const char * name);
   int dropTable(NdbTableImpl &);
   int dropBlobTables(NdbTableImpl &);

--- 1.64/storage/ndb/src/ndbapi/TransporterFacade.cpp	2007-04-03 14:34:27 +02:00
+++ 1.65/storage/ndb/src/ndbapi/TransporterFacade.cpp	2007-04-03 14:34:27 +02:00
@@ -38,7 +38,7 @@
 #include <signaldata/SumaImpl.hpp>
 
 //#define REPORT_TRANSPORTER
-//#define API_TRACE;
+#define API_TRACE;
 
 static int numberToIndex(int number)
 {

--- 1.14/storage/ndb/test/include/NDBT_Table.hpp	2007-04-03 14:34:27 +02:00
+++ 1.15/storage/ndb/test/include/NDBT_Table.hpp	2007-04-03 14:34:27 +02:00
@@ -29,7 +29,8 @@
 		 bool _pk = false, 
 		 bool _nullable = false,
 		 CHARSET_INFO *cs= 0,
-		 NdbDictionary::Column::StorageType storage = NdbDictionary::Column::StorageTypeMemory):
+		 NdbDictionary::Column::StorageType storage = NdbDictionary::Column::StorageTypeMemory,
+                 bool dynamic = false):
     NdbDictionary::Column(_name)
   {
     assert(_name != 0);
@@ -43,6 +44,7 @@
       setCharset(cs);
     }
     setStorageType(storage);
+    setDynamic(dynamic);
   }
 };
 

--- 1.35/storage/ndb/test/ndbapi/testDict.cpp	2007-04-03 14:34:27 +02:00
+++ 1.36/storage/ndb/test/ndbapi/testDict.cpp	2007-04-03 14:34:27 +02:00
@@ -525,6 +525,49 @@
   return NDBT_OK;
 }
 
+int runUseTableUntilStopped2(NDBT_Context* ctx, NDBT_Step* step){
+  int records = ctx->getNumRecords();
+
+  Ndb* pNdb = GETNDB(step);
+  const NdbDictionary::Table* pTab = ctx->getTab();
+  const NdbDictionary::Table* pTab2 = 
+    NDBT_Table::discoverTableFromDb(pNdb, pTab->getName());
+  HugoTransactions hugoTrans(*pTab2);
+
+  while (ctx->isTestStopped() == false) 
+  {
+    //    g_info << i++ << ": ";    
+
+
+    // Delete and recreate Ndb object
+    // Otherwise you always get Invalid Schema Version
+    // It would be a nice feature to remove this two lines
+    //step->tearDown();
+    //step->setUp();
+
+
+    int res;
+    if ((res = hugoTrans.loadTable(pNdb, records)) != 0){
+      NdbError err = pNdb->getNdbError(res);
+      if(err.classification == NdbError::SchemaError){
+	pNdb->getDictionary()->invalidateTable(pTab->getName());
+      }
+      continue;
+    }
+    
+    UtilTransactions utilTrans(*pTab2);
+    if ((res = utilTrans.clearTable(pNdb,  records)) != 0){
+      NdbError err = pNdb->getNdbError(res);
+      if(err.classification == NdbError::SchemaError){
+	pNdb->getDictionary()->invalidateTable(pTab->getName());
+      }
+      continue;
+    }
+  }
+  g_info << endl;
+  return NDBT_OK;
+}
+
 
 int
 runCreateMaxTables(NDBT_Context* ctx, NDBT_Step* step)
@@ -1305,7 +1348,7 @@
     if (oldTable) {
       NdbDictionary::Table newTable = *oldTable;
       newTable.setName(pTabNewName.c_str());
-      CHECK2(dict->alterTable(newTable) == 0,
+      CHECK2(dict->alterTable(*oldTable, newTable) == 0,
 	     "TableRename failed");
     }
     else {
@@ -1374,7 +1417,7 @@
     if (oldTable) {
       NdbDictionary::Table newTable = *oldTable;
       newTable.setName(pTabNewName.c_str());
-      CHECK2(dict->alterTable(newTable) == 0,
+      CHECK2(dict->alterTable(*oldTable, newTable) == 0,
 	     "TableRename failed");
     }
     else {
@@ -1471,7 +1514,7 @@
     if (oldTable) {
       NdbDictionary::Table newTable = *oldTable;
       newTable.setName(pTabNewName.c_str());
-      CHECK2(dict->alterTable(newTable) == 0,
+      CHECK2(dict->alterTable(*oldTable, newTable) == 0,
 	     "TableRename failed");
     }
     else {
@@ -1512,6 +1555,174 @@
   return result;
 }
 
+/*
+  Run online alter table add attributes.
+ */
+int
+runTableAddAttrs(NDBT_Context* ctx, NDBT_Step* step){
+
+  int result = NDBT_OK;
+
+  Ndb* pNdb = GETNDB(step);
+  NdbDictionary::Dictionary* dict = pNdb->getDictionary();
+  int records = ctx->getNumRecords();
+  const int loops = ctx->getNumLoops();
+
+  ndbout << "|- " << ctx->getTab()->getName() << endl;  
+
+  NdbDictionary::Table myTab= *(ctx->getTab());
+
+  for (int l = 0; l < loops && result == NDBT_OK ; l++){
+    // Try to create table in db
+    if (myTab.createTableInDb(pNdb) != 0){
+      return NDBT_FAILED;
+    }
+    
+    // Verify that table is in db     
+    const NdbDictionary::Table* pTab2 = 
+      NDBT_Table::discoverTableFromDb(pNdb, myTab.getName());
+    if (pTab2 == NULL){
+      ndbout << myTab.getName() << " was not found in DB"<< endl;
+      return NDBT_FAILED;
+    }
+    ctx->setTab(pTab2);
+
+    /*
+      Check that table already has a varpart, otherwise add attr is
+      not possible.
+    */
+    const NdbDictionary::Column *col;
+    for (Uint32 i= 0; (col= pTab2->getColumn(i)) != 0; i++)
+    {
+      if (col->getStorageType() == NDB_STORAGETYPE_MEMORY &&
+          (col->getDynamic() || col->getArrayType() != NDB_ARRAYTYPE_FIXED))
+        break;
+    }
+    if (col == 0)
+    {
+      /* Alter table add attribute not applicable, just mark success. */
+      break;
+    }
+
+    // Load table
+    HugoTransactions hugoTrans(*ctx->getTab());
+    if (hugoTrans.loadTable(pNdb, records) != 0){
+      return NDBT_FAILED;
+    }
+
+    // Add attributes to table.
+    BaseString pTabName(pTab2->getName());
+    
+    const NdbDictionary::Table * oldTable = dict->getTable(pTabName.c_str());
+    if (oldTable) {
+      NdbDictionary::Table newTable= *oldTable;
+
+      NDBT_Attribute newcol1("NEWKOL1", NdbDictionary::Column::Unsigned, 1,
+                            false, true, 0,
+                            NdbDictionary::Column::StorageTypeMemory, true);
+      newTable.addColumn(newcol1);
+      NDBT_Attribute newcol2("NEWKOL2", NdbDictionary::Column::Char, 14,
+                            false, true, 0,
+                            NdbDictionary::Column::StorageTypeMemory, true);
+      newTable.addColumn(newcol2);
+      NDBT_Attribute newcol3("NEWKOL3", NdbDictionary::Column::Bit, 20,
+                            false, true, 0,
+                            NdbDictionary::Column::StorageTypeMemory, true);
+      newTable.addColumn(newcol3);
+      NDBT_Attribute newcol4("NEWKOL4", NdbDictionary::Column::Varbinary, 42,
+                            false, true, 0,
+                            NdbDictionary::Column::StorageTypeMemory, true);
+      newTable.addColumn(newcol4);
+
+      CHECK2(dict->alterTable(*oldTable, newTable) == 0,
+	     "TableAddAttrs failed");
+      /* Need to purge old version and reload new version after alter table. */
+      dict->invalidateTable(pTabName.c_str());
+    }
+    else {
+      result = NDBT_FAILED;
+    }
+    
+    // Drop table.
+    dict->dropTable(pTabName.c_str());
+  }
+ end:
+
+  return result;
+}
+
+/*
+  Run online alter table add attributes while running simultaneous
+  transactions on it in separate thread.
+ */
+int
+runTableAddAttrsDuring(NDBT_Context* ctx, NDBT_Step* step){
+
+  int result = NDBT_OK;
+
+  int records = ctx->getNumRecords();
+  const int loops = ctx->getNumLoops();
+
+  ndbout << "|- " << ctx->getTab()->getName() << endl;  
+
+  NdbDictionary::Table myTab= *(ctx->getTab());
+
+  const NdbDictionary::Column *col;
+  for (Uint32 i= 0; (col= myTab.getColumn(i)) != 0; i++)
+  {
+    if (col->getStorageType() == NDB_STORAGETYPE_MEMORY &&
+        (col->getDynamic() || col->getArrayType() != NDB_ARRAYTYPE_FIXED))
+      break;
+  }
+
+  //if 
+
+  for (int l = 0; l < loops && result == NDBT_OK ; l++){
+    ndbout << l << ": " << endl;    
+
+    Ndb* pNdb = GETNDB(step);
+    NdbDictionary::Dictionary* dict = pNdb->getDictionary();
+
+    /*
+      Check that table already has a varpart, otherwise add attr is
+      not possible.
+    */
+
+    // Add attributes to table.
+    ndbout << "Altering table" << endl;
+    
+    const NdbDictionary::Table * oldTable = dict->getTable(myTab.getName());
+    if (oldTable) {
+      NdbDictionary::Table newTable= *oldTable;
+      
+      char name[256];
+      BaseString::snprintf(name, sizeof(name), "NEWCOL%d", l);
+      NDBT_Attribute newcol1(name, NdbDictionary::Column::Unsigned, 1,
+                             false, true, 0,
+                             NdbDictionary::Column::StorageTypeMemory, true);
+      newTable.addColumn(newcol1);
+      //ToDo: check #loops, how many columns l
+      
+      CHECK2(dict->alterTable(*oldTable, newTable) == 0,
+	     "TableAddAttrsDuring failed");
+      
+      dict->invalidateTable(myTab.getName());
+      const NdbDictionary::Table * newTab = dict->getTable(myTab.getName());
+      HugoTransactions hugoTrans(* newTab);
+      hugoTrans.scanUpdateRecords(pNdb, records);
+    }
+    else {
+      result= NDBT_FAILED;
+      break;
+    }
+  }
+ end:
+
+  ctx->stopTest();
+
+  return result;
+}
+
 static void
 f(const NdbDictionary::Column * col){
   if(col == 0){
@@ -2256,7 +2467,18 @@
   STEP(runRestarts);
   STEP(runDictOps);
 }
-
+TESTCASE("TableAddAttrs",
+	 "Add attributes to an existing table using alterTable()"){
+  INITIALIZER(runTableAddAttrs);
+}
+TESTCASE("TableAddAttrsDuring",
+	 "Try to add attributes to the table when other thread is using it\n"
+	 "do this loop number of times\n"){
+  INITIALIZER(runCreateTheTable);
+  STEP(runTableAddAttrsDuring);
+  STEP(runUseTableUntilStopped2);
+  FINALIZER(runDropTheTable);
+}
 NDBT_TESTSUITE_END(testDict);
 
 int main(int argc, const char** argv){

--- 1.23/storage/ndb/test/src/NDBT_Tables.cpp	2007-04-03 14:34:27 +02:00
+++ 1.24/storage/ndb/test/src/NDBT_Tables.cpp	2007-04-03 14:34:27 +02:00
@@ -23,6 +23,11 @@
 //  USE ONLY UPPERLETTERS IN TAB AND COLUMN NAMES
 /* ******************************************************* */
 
+static const NdbDictionary::Column::StorageType MM=
+    NdbDictionary::Column::StorageTypeMemory;
+static const NdbDictionary::Column::StorageType DD=
+    NdbDictionary::Column::StorageTypeDisk;
+
 /*
  * These are our "official" test tables
  *
@@ -292,6 +297,88 @@
 NDBT_Table T14("T14", sizeof(T14Attribs)/sizeof(NDBT_Attribute), T14Attribs);
 
 /*
+  T15 - Dynamic attributes.
+  Test many different combinations of attribute types, sizes, and NULLability.
+  Also exersize >32bit dynattr bitmap.
+*/
+static
+const
+NDBT_Attribute T15Attribs[] = {
+  NDBT_Attribute("KOL1", NdbDictionary::Column::Unsigned, 1, true, false, 0, MM, true),
+  NDBT_Attribute("KOL2", NdbDictionary::Column::Varbinary, 100, false, true, 0, MM, true),
+  NDBT_Attribute("KOL3", NdbDictionary::Column::Unsigned, 1, false, true, 0, MM, true),
+  NDBT_Attribute("KOL4", NdbDictionary::Column::Int, 1, false, false, 0, MM, true),
+  NDBT_Attribute("KOL5", NdbDictionary::Column::Float, 1, false, true, 0, MM, true),
+  NDBT_Attribute("KOL6", NdbDictionary::Column::Char, 4, false, true, 0, MM, true),
+  NDBT_Attribute("KOL7", NdbDictionary::Column::Varbinary, 4, false, true, 0, MM, true),
+  NDBT_Attribute("KOL8", NdbDictionary::Column::Char, 4, false, true, 0, MM, true),
+  NDBT_Attribute("KOL9", NdbDictionary::Column::Varbinary, 4, false, true, 0, MM, true),
+  NDBT_Attribute("KOL10", NdbDictionary::Column::Char, 4, false, true, 0, MM, true),
+  NDBT_Attribute("KOL11", NdbDictionary::Column::Varbinary, 4, false, true, 0, MM, true),
+  NDBT_Attribute("KOL12", NdbDictionary::Column::Char, 4, false, true, 0, MM, true),
+  NDBT_Attribute("KOL13", NdbDictionary::Column::Varbinary, 4, false, true, 0, MM, true),
+  NDBT_Attribute("KOL14", NdbDictionary::Column::Char, 4, false, true, 0, MM, true),
+  NDBT_Attribute("KOL15", NdbDictionary::Column::Varbinary, 4, false, true, 0, MM, true),
+  NDBT_Attribute("KOL16", NdbDictionary::Column::Char, 4, false, true, 0, MM, true),
+  NDBT_Attribute("KOL17", NdbDictionary::Column::Varbinary, 4, false, true, 0, MM, true),
+  NDBT_Attribute("KOL18", NdbDictionary::Column::Char, 4, false, true, 0, MM, true),
+  NDBT_Attribute("KOL19", NdbDictionary::Column::Varbinary, 4, false, true, 0, MM, true),
+  NDBT_Attribute("KOL20", NdbDictionary::Column::Char, 4, false, true, 0, MM, true),
+  NDBT_Attribute("KOL21", NdbDictionary::Column::Varbinary, 4, false, true, 0, MM, true),
+  NDBT_Attribute("KOL22", NdbDictionary::Column::Char, 4, false, false, 0, MM, true),
+  NDBT_Attribute("KOL23", NdbDictionary::Column::Varbinary, 4, false, false, 0, MM, true),
+  NDBT_Attribute("KOL24", NdbDictionary::Column::Char, 4, false, false, 0, MM, true),
+  NDBT_Attribute("KOL25", NdbDictionary::Column::Varbinary, 4, false, false, 0, MM, true),
+  NDBT_Attribute("KOL26", NdbDictionary::Column::Char, 4, false, false, 0, MM, true),
+  NDBT_Attribute("KOL27", NdbDictionary::Column::Varbinary, 4, false, false, 0, MM, true),
+  NDBT_Attribute("KOL28", NdbDictionary::Column::Char, 4, false, false),
+  NDBT_Attribute("KOL29", NdbDictionary::Column::Varbinary, 4, false, false),
+  NDBT_Attribute("KOL30", NdbDictionary::Column::Char, 4, false, true, 0, DD),
+  NDBT_Attribute("KOL31", NdbDictionary::Column::Char, 4, false, false, 0, MM, true),
+  NDBT_Attribute("KOL32", NdbDictionary::Column::Varbinary, 4, false, false, 0, MM, true),
+  NDBT_Attribute("BIT1", NdbDictionary::Column::Bit, 27, false, true, 0, MM, true),
+  NDBT_Attribute("BIT2", NdbDictionary::Column::Bit, 1, false, false, 0, MM, true),
+  NDBT_Attribute("BIT3", NdbDictionary::Column::Bit, 1, false, true, 0, MM, true),
+  NDBT_Attribute("BIT4", NdbDictionary::Column::Bit, 8, false, false, 0, MM, true),
+  NDBT_Attribute("KOL33", NdbDictionary::Column::Char, 4, false, false, 0, MM, true),
+  NDBT_Attribute("KOL34", NdbDictionary::Column::Varbinary, 4, false, false, 0, MM, true),
+  NDBT_Attribute("KOL35", NdbDictionary::Column::Char, 4, false, false, 0, MM, true),
+  NDBT_Attribute("KOL36", NdbDictionary::Column::Varbinary, 4, false, false, 0, MM, true),
+  NDBT_Attribute("KOL37", NdbDictionary::Column::Char, 4, false, false, 0, MM, true),
+  NDBT_Attribute("KOL38", NdbDictionary::Column::Varbinary, 4, false, false, 0, MM, true),
+  NDBT_Attribute("KOL39", NdbDictionary::Column::Char, 4, false, false, 0, MM, true),
+  NDBT_Attribute("KOL40", NdbDictionary::Column::Varbinary, 4, false, false, 0, MM, true),
+  NDBT_Attribute("KOL41", NdbDictionary::Column::Char, 64, false, true, 0, MM, true),
+  NDBT_Attribute("KOL42", NdbDictionary::Column::Char, 4, false, true, 0, MM, true),
+  NDBT_Attribute("KOL43", NdbDictionary::Column::Char, 8, false, true, 0, MM, true),
+  NDBT_Attribute("KOL44", NdbDictionary::Column::Char, 27, false, true, 0, MM, true),
+  NDBT_Attribute("KOL45", NdbDictionary::Column::Char, 64, false, false, 0, MM, true),
+  NDBT_Attribute("KOL46", NdbDictionary::Column::Char, 4, false, false, 0, MM, true),
+  NDBT_Attribute("KOL47", NdbDictionary::Column::Char, 8, false, false, 0, MM, true),
+  NDBT_Attribute("KOL48", NdbDictionary::Column::Char, 27, false, false, 0, MM, true),
+  NDBT_Attribute("KOL49", NdbDictionary::Column::Varbinary, 255, false, false, 0, MM, true),
+  /* This one is for update count, needed by hugoScanUpdate. */
+  NDBT_Attribute("KOL99", NdbDictionary::Column::Unsigned, 1, false, false, 0, MM, true),
+};
+
+static
+const
+NDBT_Table T15("T15", sizeof(T15Attribs)/sizeof(NDBT_Attribute), T15Attribs);
+
+/* Test dynamic bit types when no other varsize/dynamic. */
+static
+const
+NDBT_Attribute T16Attribs[] = {
+  NDBT_Attribute("KOL1", NdbDictionary::Column::Unsigned, 1, true, false),
+  NDBT_Attribute("Kol2", NdbDictionary::Column::Bit, 27, false, true, 0, MM, true),
+  NDBT_Attribute("KOL99", NdbDictionary::Column::Unsigned, 1, false, false),
+};
+
+static
+const
+NDBT_Table T16("T16", sizeof(T16Attribs)/sizeof(NDBT_Attribute), T16Attribs);
+
+/*
   C2 DHCP TABLES, MAYBE THESE SHOULD BE MOVED TO THE UTIL_TABLES?
 */
 static 
@@ -430,6 +517,8 @@
   &T12,
   &T13,
   &T14,
+  &T15,
+  &T16,
   &I1,
   &I2,
   &I3,

--- 1.22/storage/ndb/src/kernel/blocks/dbtup/DbtupScan.cpp	2007-04-03 14:34:27 +02:00
+++ 1.23/storage/ndb/src/kernel/blocks/dbtup/DbtupScan.cpp	2007-04-03 14:34:27 +02:00
@@ -68,7 +68,8 @@
 	bits |= ScanOp::SCAN_DD;
       }
       bool mm = (bits & ScanOp::SCAN_DD);
-      if (tablePtr.p->m_attributes[mm].m_no_of_varsize > 0) {
+      if ((tablePtr.p->m_attributes[mm].m_no_of_varsize +
+           tablePtr.p->m_attributes[mm].m_no_of_dynamic) > 0) {
 	bits |= ScanOp::SCAN_VS;
 	
 	// disk pages have fixed page format
@@ -88,7 +89,8 @@
       c_scanOpPool.getPtr(scanPtr, frag.m_lcp_scan_op);
       ndbrequire(scanPtr.p->m_fragPtrI == fragPtr.i);
       bits |= ScanOp::SCAN_LCP;
-      if (tablePtr.p->m_attributes[MM].m_no_of_varsize > 0) {
+      if ((tablePtr.p->m_attributes[MM].m_no_of_varsize +
+           tablePtr.p->m_attributes[MM].m_no_of_dynamic) > 0) {
         bits |= ScanOp::SCAN_VS;
       }
     }
@@ -918,7 +920,8 @@
   ptr->m_header_bits |= Tuple_header::FREED; // RESTORE free flag
   if (headerbits & Tuple_header::FREED)
   {
-    if (tablePtr.p->m_attributes[MM].m_no_of_varsize)
+    if (tablePtr.p->m_attributes[MM].m_no_of_varsize +
+        tablePtr.p->m_attributes[MM].m_no_of_dynamic)
     {
       jam();
       free_var_rec(fragPtr.p, tablePtr.p, &tmp, pagePtr);
Thread
bk commit into 5.1 tree (jonas:1.2512)jonas3 Apr