List:Commits« Previous MessageNext Message »
From:Andrei Elkin Date:May 19 2006 12:02pm
Subject:bk commit into 5.1 tree (aelkin:1.2165)
View as plain text  
Below is the list of changes that have just been committed into a local
5.1 repository of elkin. When elkin does a push these changes will
be propagated to the main repository and, within 24 hours after the
push, to the public repository.
For information on how to access the public repository
see http://dev.mysql.com/doc/mysql/en/installing-source-tree.html

ChangeSet
  1.2165 06/05/19 15:01:52 aelkin@stripped +11 -0
  Merge mysql.com:/usr_rh9/home/elkin.rh9/MySQL/Merge/5.0-merge
  into  mysql.com:/usr_rh9/home/elkin.rh9/MySQL/Merge/5.1

  storage/ndb/src/kernel/blocks/dbtup/DbtupExecQuery.cpp
    1.37 06/05/19 15:01:47 aelkin@stripped +2 -3
    manual merge using remote version

  storage/ndb/test/ndbapi/testInterpreter.cpp
    1.5 06/05/19 14:42:32 aelkin@stripped +0 -0
    Auto merged

  storage/ndb/test/ndbapi/Makefile.am
    1.28 06/05/19 14:42:32 aelkin@stripped +0 -1
    Auto merged

  sql/sql_acl.cc
    1.193 06/05/19 14:42:32 aelkin@stripped +0 -0
    Auto merged

  sql/opt_range.cc
    1.218 06/05/19 14:42:32 aelkin@stripped +0 -0
    Auto merged

  sql/mysqld.cc
    1.545 06/05/19 14:42:31 aelkin@stripped +0 -0
    Auto merged

  sql/item.h
    1.201 06/05/19 14:42:31 aelkin@stripped +0 -0
    Auto merged

  sql/item.cc
    1.191 06/05/19 14:42:31 aelkin@stripped +0 -0
    Auto merged

  mysql-test/t/rpl_temporary.test
    1.25 06/05/19 14:42:31 aelkin@stripped +0 -0
    Auto merged

  storage/ndb/test/ndbapi/testInterpreter.cpp
    1.3.1.2 06/05/19 14:42:30 aelkin@stripped +0 -0
    Merge rename: ndb/test/ndbapi/testInterpreter.cpp -> storage/ndb/test/ndbapi/testInterpreter.cpp

  storage/ndb/test/ndbapi/Makefile.am
    1.19.7.2 06/05/19 14:42:30 aelkin@stripped +0 -0
    Merge rename: ndb/test/ndbapi/Makefile.am -> storage/ndb/test/ndbapi/Makefile.am

  storage/ndb/src/kernel/blocks/dbtup/DbtupExecQuery.cpp
    1.19.1.2 06/05/19 14:42:30 aelkin@stripped +0 -0
    Merge rename: ndb/src/kernel/blocks/dbtup/DbtupExecQuery.cpp -> storage/ndb/src/kernel/blocks/dbtup/DbtupExecQuery.cpp

  libmysql/libmysql.c
    1.254 06/05/19 14:42:30 aelkin@stripped +0 -0
    Auto merged

  include/my_sys.h
    1.195 06/05/19 14:42:30 aelkin@stripped +0 -0
    Auto merged

# This is a BitKeeper patch.  What follows are the unified diffs for the
# set of deltas contained in the patch.  The rest of the patch, the part
# that BitKeeper cares about, is below these diffs.
# User:	aelkin
# Host:	dsl-hkigw8-feb0de00-199.dhcp.inet.fi
# Root:	/usr_rh9/home/elkin.rh9/MySQL/Merge/5.1/RESYNC

--- 1.190/sql/item.cc	2006-05-18 11:53:09 +03:00
+++ 1.191/sql/item.cc	2006-05-19 14:42:31 +03:00
@@ -5413,7 +5413,7 @@
 }
 
 
-void Item_trigger_field::set_required_privilege(const bool rw)
+void Item_trigger_field::set_required_privilege(bool rw)
 {
   /*
     Require SELECT and UPDATE privilege if this field will be read and

--- 1.200/sql/item.h	2006-05-18 11:53:10 +03:00
+++ 1.201/sql/item.h	2006-05-19 14:42:31 +03:00
@@ -2233,7 +2233,7 @@
   void cleanup();
 
 private:
-  void set_required_privilege(const bool rw);
+  void set_required_privilege(bool rw);
   bool set_value(THD *thd, sp_rcontext *ctx, Item **it);
 
 public:

--- 1.217/sql/opt_range.cc	2006-05-13 21:40:23 +03:00
+++ 1.218/sql/opt_range.cc	2006-05-19 14:42:32 +03:00
@@ -4698,17 +4698,46 @@
 
     if (inv)
     {
-      /*
-        We get here for conditions like "t.keypart NOT IN (....)".
-        
-        If the IN-list contains only constants (and func->array is an ordered
-        array of them), we construct the appropriate SEL_ARG tree manually, 
-        because constructing it using the range analyzer (as 
-        AND_i( t.keypart != c_i)) will cause lots of memory to be consumed
-        (see BUG#15872). 
-      */
       if (func->array && func->cmp_type != ROW_RESULT)
       {
+        /*
+          We get here for conditions in form "t.key NOT IN (c1, c2, ...)" 
+          (where c{i} are constants).
+          Our goal is to produce a SEL_ARG graph that represents intervals:
+          
+          ($MIN<t.key<c1) OR (c1<t.key<c2) OR (c2<t.key<c3) OR ...    (*)
+          
+          where $MIN is either "-inf" or NULL.
+          
+          The most straightforward way to handle NOT IN would be to convert
+          it to "(t.key != c1) AND (t.key != c2) AND ..." and let the range
+          optimizer to build SEL_ARG graph from that. However that will cause
+          the range optimizer to use O(N^2) memory (it's a bug, not filed),
+          and people do use big NOT IN lists (see BUG#15872). Also, for big          
+          NOT IN lists constructing/using graph (*) does not make the query
+          faster.
+          
+          So, we will handle NOT IN manually in the following way:
+          * if the number of entries in the NOT IN list is less then 
+            NOT_IN_IGNORE_THRESHOLD, we will construct SEL_ARG graph (*)
+            manually.
+          * Otherwise, we will construct a smaller graph: for 
+            "t.key NOT IN (c1,...cN)" we construct a graph representing 
+            ($MIN < t.key) OR (cN < t.key)  // here sequence of c_i is
+                                            // ordered.
+
+          A note about partially-covering indexes: for those (e.g. for 
+          "a CHAR(10), KEY(a(5))") the handling is correct (albeit not very
+          efficient):
+          Instead of "t.key < c1" we get "t.key <= prefix-val(c1)".
+          Combining the intervals in (*) together, we get:
+          (-inf<=t.key<=c1) OR (c1<=t.key<=c2) OR (c2<=t.key<=c3) OR ...
+          i.e. actually we get intervals combined into one interval:
+          (-inf<=t.key<=+inf). This doesn't make much sense but it doesn't
+          cause any problems.
+        */
+        MEM_ROOT *tmp_root= param->mem_root;
+        param->thd->mem_root= param->old_root;
         /* 
           Create one Item_type constant object. We'll need it as
           get_mm_parts only accepts constant values wrapped in Item_Type
@@ -4717,25 +4746,35 @@
           per-statement mem_root (while thd->mem_root is currently pointing
           to mem_root local to range optimizer).
         */
-        MEM_ROOT *tmp_root= param->mem_root;
-        param->thd->mem_root= param->old_root;
         Item *value_item= func->array->create_item();
         param->thd->mem_root= tmp_root;
 
         if (!value_item)
           break;
         
-        /* Get a SEL_TREE for "-inf < X < c_0" interval */
-        func->array->value_to_item(0, value_item);
-        tree= get_mm_parts(param, cond_func, field, Item_func::LT_FUNC,
-                           value_item, cmp_type);
-        if (!tree)
+        /* Get a SEL_TREE for "(-inf|NULL) < X < c_0" interval.  */
+        uint i=0;
+        do 
+        {
+          func->array->value_to_item(i, value_item);
+          tree= get_mm_parts(param, cond_func, field, Item_func::LT_FUNC,
+                             value_item, cmp_type);
+          if (!tree)
+            break;
+          i++;
+        } while (i < func->array->count && tree->type == SEL_TREE::IMPOSSIBLE);
+
+        if (!tree || tree->type == SEL_TREE::IMPOSSIBLE)
+        {
+          /* We get here in cases like "t.unsigned NOT IN (-1,-2,-3) */
+          tree= NULL;
           break;
+        }
 #define NOT_IN_IGNORE_THRESHOLD 1000        
         SEL_TREE *tree2;
         if (func->array->count < NOT_IN_IGNORE_THRESHOLD)
         {
-          for (uint i=1; i < func->array->count; i++)
+          for (; i < func->array->count; i++)
           {
             if (func->array->compare_elems(i, i-1))
             {
@@ -4743,32 +4782,44 @@
               func->array->value_to_item(i, value_item);
               tree2= get_mm_parts(param, cond_func, field, Item_func::LT_FUNC,
                                   value_item, cmp_type);
-              
+              if (!tree2)
+              {
+                tree= NULL;
+                break;
+              }
+
               /* Change all intervals to be "c_{i-1} < X < c_i" */
               for (uint idx= 0; idx < param->keys; idx++)
               {
-                SEL_ARG *new_interval;
-                if ((new_interval=  tree2->keys[idx]))
+                SEL_ARG *new_interval, *last_val;
+                if (((new_interval= tree2->keys[idx])) && 
+                    ((last_val= tree->keys[idx]->last())))
                 {
-                  SEL_ARG *last_val= tree->keys[idx]->last();
                   new_interval->min_value= last_val->max_value;
                   new_interval->min_flag= NEAR_MIN;
                 }
               }
+              /* 
+                The following doesn't try to allocate memory so no need to
+                check for NULL.
+              */
               tree= tree_or(param, tree, tree2);
             }
           }
         }
         else
           func->array->value_to_item(func->array->count - 1, value_item);
-
-        /* 
-          Get the SEL_TREE for the last "c_last < X < +inf" interval 
-          (value_item cotains c_last already)
-        */
-        tree2= get_mm_parts(param, cond_func, field, Item_func::GT_FUNC,
-                            value_item, cmp_type);
-        tree= tree_or(param, tree, tree2);
+        
+        if (tree && tree->type != SEL_TREE::IMPOSSIBLE)
+        {
+          /* 
+            Get the SEL_TREE for the last "c_last < X < +inf" interval 
+            (value_item cotains c_last already)
+          */
+          tree2= get_mm_parts(param, cond_func, field, Item_func::GT_FUNC,
+                              value_item, cmp_type);
+          tree= tree_or(param, tree, tree2);
+        }
       }
       else
       {

--- 1.192/sql/sql_acl.cc	2006-05-12 13:32:00 +03:00
+++ 1.193/sql/sql_acl.cc	2006-05-19 14:42:32 +03:00
@@ -6154,20 +6154,21 @@
   }
 
   /* table privileges */
+  rw_rdlock(&LOCK_grant);
   if (grant->version != grant_version)
   {
-    rw_rdlock(&LOCK_grant);
     grant->grant_table=
       table_hash_search(sctx->host, sctx->ip, db,
 			sctx->priv_user,
 			table, 0);              /* purecov: inspected */
     grant->version= grant_version;              /* purecov: inspected */
-    rw_unlock(&LOCK_grant);
   }
   if (grant->grant_table != 0)
   {
     grant->privilege|= grant->grant_table->privs;
   }
+  rw_unlock(&LOCK_grant);
+
   DBUG_PRINT("info", ("privilege 0x%lx", grant->privilege));
   DBUG_VOID_RETURN;
 }

--- 1.19.1.1/ndb/src/kernel/blocks/dbtup/DbtupExecQuery.cpp	2006-05-16 19:33:18 +03:00
+++ 1.37/storage/ndb/src/kernel/blocks/dbtup/DbtupExecQuery.cpp	2006-05-19 15:01:47 +03:00
@@ -16,6 +16,7 @@
 
 
 #define DBTUP_C
+#include <Dblqh.hpp>
 #include "Dbtup.hpp"
 #include <RefConvert.hpp>
 #include <ndb_limits.h>
@@ -26,12 +27,14 @@
 #include <Interpreter.hpp>
 #include <signaldata/TupCommit.hpp>
 #include <signaldata/TupKey.hpp>
+#include <signaldata/AttrInfo.hpp>
 #include <NdbSqlUtil.hpp>
 
 /* ----------------------------------------------------------------- */
 /* -----------       INIT_STORED_OPERATIONREC         -------------- */
 /* ----------------------------------------------------------------- */
-int Dbtup::initStoredOperationrec(Operationrec* const regOperPtr,
+int Dbtup::initStoredOperationrec(Operationrec* regOperPtr,
+                                  KeyReqStruct* req_struct,
                                   Uint32 storedId) 
 {
   jam();
@@ -40,29 +43,28 @@
   if (storedPtr.i != RNIL) {
     if (storedPtr.p->storedCode == ZSCAN_PROCEDURE) {
       storedPtr.p->storedCounter++;
-      regOperPtr->firstAttrinbufrec = storedPtr.p->storedLinkFirst;
-      regOperPtr->lastAttrinbufrec = storedPtr.p->storedLinkLast;
-      regOperPtr->attrinbufLen = storedPtr.p->storedProcLength;
-      regOperPtr->currentAttrinbufLen = storedPtr.p->storedProcLength;
+      regOperPtr->firstAttrinbufrec= storedPtr.p->storedLinkFirst;
+      regOperPtr->lastAttrinbufrec= storedPtr.p->storedLinkLast;
+      regOperPtr->currentAttrinbufLen= storedPtr.p->storedProcLength;
+      req_struct->attrinfo_len= storedPtr.p->storedProcLength;
       return ZOK;
-    }//if
-  }//if
-  terrorCode = ZSTORED_PROC_ID_ERROR;
+    }
+  }
+  terrorCode= ZSTORED_PROC_ID_ERROR;
   return terrorCode;
-}//Dbtup::initStoredOperationrec()
+}
 
-void Dbtup::copyAttrinfo(Signal* signal,
-                         Operationrec * const regOperPtr,
+void Dbtup::copyAttrinfo(Operationrec * regOperPtr,
                          Uint32* inBuffer)
 {
   AttrbufrecPtr copyAttrBufPtr;
-  Uint32 RnoOfAttrBufrec = cnoOfAttrbufrec;
+  Uint32 RnoOfAttrBufrec= cnoOfAttrbufrec;
   int RbufLen;
-  Uint32 RinBufIndex = 0;
+  Uint32 RinBufIndex= 0;
   Uint32 Rnext;
   Uint32 Rfirst;
-  Uint32 TstoredProcedure = (regOperPtr->storedProcedureId != ZNIL);
-  Uint32 RnoFree = cnoFreeAttrbufrec;
+  Uint32 TstoredProcedure= (regOperPtr->storedProcedureId != ZNIL);
+  Uint32 RnoFree= cnoFreeAttrbufrec;
 
 //-------------------------------------------------------------------------
 // As a prelude to the execution of the TUPKEYREQ we will copy the program
@@ -70,793 +72,832 @@
 // between the buffers. In particular this will make the interpreter less
 // complex. Hopefully it does also improve performance.
 //-------------------------------------------------------------------------
-  copyAttrBufPtr.i = regOperPtr->firstAttrinbufrec;
+  copyAttrBufPtr.i= regOperPtr->firstAttrinbufrec;
   while (copyAttrBufPtr.i != RNIL) {
     jam();
     ndbrequire(copyAttrBufPtr.i < RnoOfAttrBufrec);
     ptrAss(copyAttrBufPtr, attrbufrec);
-    RbufLen = copyAttrBufPtr.p->attrbuf[ZBUF_DATA_LEN];
-    Rnext = copyAttrBufPtr.p->attrbuf[ZBUF_NEXT];
-    Rfirst = cfirstfreeAttrbufrec;
+    RbufLen= copyAttrBufPtr.p->attrbuf[ZBUF_DATA_LEN];
+    Rnext= copyAttrBufPtr.p->attrbuf[ZBUF_NEXT];
+    Rfirst= cfirstfreeAttrbufrec;
     MEMCOPY_NO_WORDS(&inBuffer[RinBufIndex],
                      &copyAttrBufPtr.p->attrbuf[0],
                      RbufLen);
     RinBufIndex += RbufLen;
     if (!TstoredProcedure) {
-      copyAttrBufPtr.p->attrbuf[ZBUF_NEXT] = Rfirst;
-      cfirstfreeAttrbufrec = copyAttrBufPtr.i;
+      copyAttrBufPtr.p->attrbuf[ZBUF_NEXT]= Rfirst;
+      cfirstfreeAttrbufrec= copyAttrBufPtr.i;
       RnoFree++;
-    }//if
-    copyAttrBufPtr.i = Rnext;
-  }//while
-  cnoFreeAttrbufrec = RnoFree;
+    }
+    copyAttrBufPtr.i= Rnext;
+  }
+  cnoFreeAttrbufrec= RnoFree;
   if (TstoredProcedure) {
     jam();
     StoredProcPtr storedPtr;
     c_storedProcPool.getPtr(storedPtr, (Uint32)regOperPtr->storedProcedureId);
     ndbrequire(storedPtr.p->storedCode == ZSCAN_PROCEDURE);
     storedPtr.p->storedCounter--;
-    regOperPtr->storedProcedureId = ZNIL;
-  }//if
+  }
   // Release the ATTRINFO buffers
-  regOperPtr->firstAttrinbufrec = RNIL;
-  regOperPtr->lastAttrinbufrec = RNIL;
-}//Dbtup::copyAttrinfo()
+  regOperPtr->storedProcedureId= RNIL;
+  regOperPtr->firstAttrinbufrec= RNIL;
+  regOperPtr->lastAttrinbufrec= RNIL;
+}
 
 void Dbtup::handleATTRINFOforTUPKEYREQ(Signal* signal,
-                                       Uint32 length,
-                                       Operationrec * const regOperPtr) 
-{
-  AttrbufrecPtr TAttrinbufptr;
-  TAttrinbufptr.i = cfirstfreeAttrbufrec;
-  if ((cfirstfreeAttrbufrec < cnoOfAttrbufrec) &&
-      (cnoFreeAttrbufrec > MIN_ATTRBUF)) {
-    ptrAss(TAttrinbufptr, attrbufrec);
-    MEMCOPY_NO_WORDS(&TAttrinbufptr.p->attrbuf[0],
-                     &signal->theData[3],
-                     length);
-    Uint32 RnoFree = cnoFreeAttrbufrec;
-    Uint32 Rnext = TAttrinbufptr.p->attrbuf[ZBUF_NEXT];
-    TAttrinbufptr.p->attrbuf[ZBUF_DATA_LEN] = length;
-    TAttrinbufptr.p->attrbuf[ZBUF_NEXT] = RNIL;
-
-    AttrbufrecPtr locAttrinbufptr;
-    Uint32 RnewLen = regOperPtr->currentAttrinbufLen;
-
-    locAttrinbufptr.i = regOperPtr->lastAttrinbufrec;
-    cfirstfreeAttrbufrec = Rnext;
-    cnoFreeAttrbufrec = RnoFree - 1;
-    RnewLen += length;
-    regOperPtr->lastAttrinbufrec = TAttrinbufptr.i;
-    regOperPtr->currentAttrinbufLen = RnewLen;
-    if (locAttrinbufptr.i == RNIL) {
-      regOperPtr->firstAttrinbufrec = TAttrinbufptr.i;
-      return;
-    } else {
+                                       const Uint32 *data,
+				       Uint32 len,
+                                       Operationrec * regOperPtr) 
+{
+  while(len)
+  {
+    Uint32 length = len > AttrInfo::DataLength ? AttrInfo::DataLength : len;
+
+    AttrbufrecPtr TAttrinbufptr;
+    TAttrinbufptr.i= cfirstfreeAttrbufrec;
+    if ((cfirstfreeAttrbufrec < cnoOfAttrbufrec) &&
+	(cnoFreeAttrbufrec > MIN_ATTRBUF)) {
+      ptrAss(TAttrinbufptr, attrbufrec);
+      MEMCOPY_NO_WORDS(&TAttrinbufptr.p->attrbuf[0],
+		       data,
+		       length);
+      Uint32 RnoFree= cnoFreeAttrbufrec;
+      Uint32 Rnext= TAttrinbufptr.p->attrbuf[ZBUF_NEXT];
+      TAttrinbufptr.p->attrbuf[ZBUF_DATA_LEN]= length;
+      TAttrinbufptr.p->attrbuf[ZBUF_NEXT]= RNIL;
+      
+      AttrbufrecPtr locAttrinbufptr;
+      Uint32 RnewLen= regOperPtr->currentAttrinbufLen;
+      
+      locAttrinbufptr.i= regOperPtr->lastAttrinbufrec;
+      cfirstfreeAttrbufrec= Rnext;
+      cnoFreeAttrbufrec= RnoFree - 1;
+      RnewLen += length;
+      regOperPtr->lastAttrinbufrec= TAttrinbufptr.i;
+      regOperPtr->currentAttrinbufLen= RnewLen;
+      if (locAttrinbufptr.i == RNIL) {
+	regOperPtr->firstAttrinbufrec= TAttrinbufptr.i;
+      } else {
+	jam();
+	ptrCheckGuard(locAttrinbufptr, cnoOfAttrbufrec, attrbufrec);
+	locAttrinbufptr.p->attrbuf[ZBUF_NEXT]= TAttrinbufptr.i;
+      }
+      if (RnewLen < ZATTR_BUFFER_SIZE) {
+      } else {
+	jam();
+	set_trans_state(regOperPtr, TRANS_TOO_MUCH_AI);
+	return;
+      }
+    } else if (cnoFreeAttrbufrec <= MIN_ATTRBUF) {
       jam();
-      ptrCheckGuard(locAttrinbufptr, cnoOfAttrbufrec, attrbufrec);
-      locAttrinbufptr.p->attrbuf[ZBUF_NEXT] = TAttrinbufptr.i;
-    }//if
-    if (RnewLen < ZATTR_BUFFER_SIZE) {
-      return;
+      set_trans_state(regOperPtr, TRANS_ERROR_WAIT_TUPKEYREQ);
     } else {
-      jam();
-      regOperPtr->transstate = TOO_MUCH_AI;
-      return;
-    }//if
-  } else if (cnoFreeAttrbufrec <= MIN_ATTRBUF) {
-    jam();
-    regOperPtr->transstate = ERROR_WAIT_TUPKEYREQ;
-  } else {
-    ndbrequire(false);
-  }//if
-}//Dbtup::handleATTRINFOforTUPKEYREQ()
+      ndbrequire(false);
+    }
+    
+    len -= length;
+    data += length;    
+  }
+}
 
 void Dbtup::execATTRINFO(Signal* signal) 
 {
-  OperationrecPtr regOpPtr;
-  Uint32 Rsig0 = signal->theData[0];
-  Uint32 Rlen = signal->length();
-  regOpPtr.i = Rsig0;
-
+  Uint32 Rsig0= signal->theData[0];
+  Uint32 Rlen= signal->length();
   jamEntry();
 
-  ptrCheckGuard(regOpPtr, cnoOfOprec, operationrec);
-  if (regOpPtr.p->transstate == IDLE) {
-    handleATTRINFOforTUPKEYREQ(signal, Rlen - 3, regOpPtr.p);
+  receive_attrinfo(signal, Rsig0, signal->theData+3, Rlen-3);
+}
+ 
+void
+Dbtup::receive_attrinfo(Signal* signal, Uint32 op, 
+			const Uint32* data, Uint32 Rlen)
+{ 
+  OperationrecPtr regOpPtr;
+  regOpPtr.i= op;
+  c_operation_pool.getPtr(regOpPtr, op);
+  TransState trans_state= get_trans_state(regOpPtr.p);
+  if (trans_state == TRANS_IDLE) {
+    handleATTRINFOforTUPKEYREQ(signal, data, Rlen, regOpPtr.p);
     return;
-  } else if (regOpPtr.p->transstate == WAIT_STORED_PROCEDURE_ATTR_INFO) {
-    storedProcedureAttrInfo(signal, regOpPtr.p, Rlen - 3, 3, false);
+  } else if (trans_state == TRANS_WAIT_STORED_PROCEDURE_ATTR_INFO) {
+    storedProcedureAttrInfo(signal, regOpPtr.p, data, Rlen, false);
     return;
-  }//if
-  switch (regOpPtr.p->transstate) {
-  case ERROR_WAIT_STORED_PROCREQ:
+  }
+  switch (trans_state) {
+  case TRANS_ERROR_WAIT_STORED_PROCREQ:
     jam();
-  case TOO_MUCH_AI:
+  case TRANS_TOO_MUCH_AI:
     jam();
-  case ERROR_WAIT_TUPKEYREQ:
+  case TRANS_ERROR_WAIT_TUPKEYREQ:
     jam();
     return;	/* IGNORE ATTRINFO IN THOSE STATES, WAITING FOR ABORT SIGNAL */
-    break;
-  case DISCONNECTED:
+  case TRANS_DISCONNECTED:
     jam();
-  case STARTED:
+  case TRANS_STARTED:
     jam();
   default:
     ndbrequire(false);
-  }//switch
-}//Dbtup::execATTRINFO()
+  }
+}
 
 void Dbtup::execTUP_ALLOCREQ(Signal* signal)
 {
   OperationrecPtr regOperPtr;
-  TablerecPtr regTabPtr;
-  FragrecordPtr regFragPtr;
 
   jamEntry();
 
-  regOperPtr.i = signal->theData[0];
-  regFragPtr.i = signal->theData[1];
-  regTabPtr.i = signal->theData[2];
-
-  if (!((regOperPtr.i < cnoOfOprec) &&
-        (regFragPtr.i < cnoOfFragrec) &&
-        (regTabPtr.i < cnoOfTablerec))) {
-    ndbrequire(false);
-  }//if
-  ptrAss(regOperPtr, operationrec);
-  ptrAss(regFragPtr, fragrecord);
-  ptrAss(regTabPtr, tablerec);
-
-//---------------------------------------------------
-/* --- Allocate a tuple as requested by ACC    --- */
-//---------------------------------------------------
-  PagePtr pagePtr;
-  Uint32 pageOffset;
-  if (!allocTh(regFragPtr.p,
-               regTabPtr.p,
-               NORMAL_PAGE,
-               signal,
-               pageOffset,
-               pagePtr)) {
-    signal->theData[0] = terrorCode; // Indicate failure
-    return;
-  }//if
-  Uint32 fragPageId = pagePtr.p->pageWord[ZPAGE_FRAG_PAGE_ID_POS];
-  Uint32 pageIndex = ((pageOffset - ZPAGE_HEADER_SIZE) /
-                       regTabPtr.p->tupheadsize) << 1;
-  regOperPtr.p->tableRef = regTabPtr.i;
-  regOperPtr.p->fragId = regFragPtr.p->fragmentId;
-  regOperPtr.p->realPageId = pagePtr.i;
-  regOperPtr.p->fragPageId = fragPageId;
-  regOperPtr.p->pageOffset = pageOffset;
-  regOperPtr.p->pageIndex  = pageIndex;
-  /* -------------------------------------------------------------- */
-  /* AN INSERT IS UNDONE BY FREEING THE DATA OCCUPIED BY THE INSERT */
-  /* THE ONLY DATA WE HAVE TO LOG EXCEPT THE TYPE, PAGE AND INDEX   */
-  /* IS THE AMOUNT OF DATA TO FREE                                  */
-  /* -------------------------------------------------------------- */
-  if (isUndoLoggingNeeded(regFragPtr.p, fragPageId)) {
-    jam();
-    cprAddUndoLogRecord(signal,
-                        ZLCPR_TYPE_DELETE_TH,
-                        fragPageId,
-                        pageIndex,
-                        regTabPtr.i,
-                        regFragPtr.p->fragmentId,
-                        regFragPtr.p->checkpointVersion);
-  }//if
-
-  //---------------------------------------------------------------
-  // Initialise Active operation list by setting the list to empty
-  //---------------------------------------------------------------
-  ndbrequire(pageOffset < ZWORDS_ON_PAGE);
-  pagePtr.p->pageWord[pageOffset] = RNIL;
-
-  signal->theData[0] = 0;
-  signal->theData[1] = fragPageId;
-  signal->theData[2] = pageIndex;
-}//Dbtup::execTUP_ALLOCREQ()
+  regOperPtr.i= signal->theData[0];
+  c_operation_pool.getPtr(regOperPtr);
+  
+  regOperPtr.p->op_struct.tuple_state= TUPLE_INITIAL_INSERT;
+  //ndbout_c("execTUP_ALLOCREQ");
+
+  signal->theData[0]= 0;
+  signal->theData[1]= ~0 >> MAX_TUPLES_BITS;
+  signal->theData[2]= (1 << MAX_TUPLES_BITS) - 1;
+  return;
+
+mem_error:
+  jam();
+  signal->theData[0]= ZMEM_NOMEM_ERROR;
+  return;
+}
 
 void
-Dbtup::setChecksum(Page* const pagePtr, Uint32 tupHeadOffset, Uint32 tupHeadSize)
+Dbtup::setChecksum(Tuple_header* tuple_ptr,
+                   Tablerec* regTabPtr)
 {
-  // 2 == regTabPtr.p->tupChecksumIndex
-  pagePtr->pageWord[tupHeadOffset + 2] = 0;
-  Uint32 checksum = calculateChecksum(pagePtr, tupHeadOffset, tupHeadSize);
-  pagePtr->pageWord[tupHeadOffset + 2] = checksum;
-}//Dbtup::setChecksum()
+  tuple_ptr->m_checksum= 0;
+  tuple_ptr->m_checksum= calculateChecksum(tuple_ptr, regTabPtr);
+}
 
 Uint32
-Dbtup::calculateChecksum(Page* pagePtr,
-                         Uint32 tupHeadOffset,
-                         Uint32 tupHeadSize)
-{
-  Uint32 checksum = 0;
-  Uint32 loopStop = tupHeadOffset + tupHeadSize;
-  ndbrequire(loopStop <= ZWORDS_ON_PAGE);
+Dbtup::calculateChecksum(Tuple_header* tuple_ptr,
+                         Tablerec* regTabPtr)
+{
+  Uint32 checksum;
+  Uint32 i, rec_size, *tuple_header;
+  rec_size= regTabPtr->m_offsets[MM].m_fix_header_size;
+  tuple_header= tuple_ptr->m_data;
+  checksum= 0;
   // includes tupVersion
-  for (Uint32 i = tupHeadOffset + 1; i < loopStop; i++) {
-    checksum ^= pagePtr->pageWord[i];
-  }//if
+  //printf("%p - ", tuple_ptr);
+  
+  if (regTabPtr->m_attributes[MM].m_no_of_varsize)
+    rec_size += Tuple_header::HeaderSize;
+  
+  for (i= 0; i < rec_size-2; i++) {
+    checksum ^= tuple_header[i];
+    //printf("%.8x ", tuple_header[i]);
+  }
+  
+  //printf("-> %.8x\n", checksum);
+
+#if 0
+  if (var_sized) {
+    /*
+    if (! req_struct->fix_var_together) {
+      jam();
+      checksum ^= tuple_header[rec_size];
+    }
+    */
+    jam();
+    var_data_part= req_struct->var_data_start;
+    vsize_words= calculate_total_var_size(req_struct->var_len_array,
+                                          regTabPtr->no_var_attr);
+    ndbassert(req_struct->var_data_end >= &var_data_part[vsize_words]);
+    for (i= 0; i < vsize_words; i++) {
+      checksum ^= var_data_part[i];
+    }
+  }
+#endif
   return checksum;
-}//Dbtup::calculateChecksum()
+}
 
 /* ----------------------------------------------------------------- */
 /* -----------       INSERT_ACTIVE_OP_LIST            -------------- */
 /* ----------------------------------------------------------------- */
-void Dbtup::insertActiveOpList(Signal* signal, 
-                               OperationrecPtr regOperPtr,
-                               Page*  const pagePtr,
-                               Uint32 pageOffset) 
-{
-  OperationrecPtr iaoPrevOpPtr;
-  ndbrequire(regOperPtr.p->inActiveOpList == ZFALSE);
-  regOperPtr.p->inActiveOpList = ZTRUE;
-  ndbrequire(pageOffset < ZWORDS_ON_PAGE);
-  iaoPrevOpPtr.i = pagePtr->pageWord[pageOffset];
-  pagePtr->pageWord[pageOffset] = regOperPtr.i;
-  regOperPtr.p->prevActiveOp = RNIL;
-  regOperPtr.p->nextActiveOp = iaoPrevOpPtr.i;
-  if (iaoPrevOpPtr.i == RNIL) {
-    return;
+bool 
+Dbtup::insertActiveOpList(OperationrecPtr regOperPtr,
+			  KeyReqStruct* req_struct)
+{
+  OperationrecPtr prevOpPtr;
+  ndbrequire(!regOperPtr.p->op_struct.in_active_list);
+  regOperPtr.p->op_struct.in_active_list= true;
+  req_struct->prevOpPtr.i= 
+    prevOpPtr.i= req_struct->m_tuple_ptr->m_operation_ptr_i;
+  regOperPtr.p->prevActiveOp= prevOpPtr.i;
+  regOperPtr.p->nextActiveOp= RNIL;
+  regOperPtr.p->m_undo_buffer_space= 0;
+  req_struct->m_tuple_ptr->m_operation_ptr_i= regOperPtr.i;
+  if (prevOpPtr.i == RNIL) {
+    set_change_mask_state(regOperPtr.p, USE_SAVED_CHANGE_MASK);
+    regOperPtr.p->saved_change_mask[0] = 0;
+    regOperPtr.p->saved_change_mask[1] = 0;
+    return true;
   } else {
-    jam();
-    ptrCheckGuard(iaoPrevOpPtr, cnoOfOprec, operationrec);
-    iaoPrevOpPtr.p->prevActiveOp = regOperPtr.i;
-    if (iaoPrevOpPtr.p->optype == ZDELETE &&
-        regOperPtr.p->optype == ZINSERT) {
-      jam();
-      // mark both
-      iaoPrevOpPtr.p->deleteInsertFlag = 1;
-      regOperPtr.p->deleteInsertFlag = 1;
-    }
-    return;
-  }//if
-}//Dbtup::insertActiveOpList()
+    req_struct->prevOpPtr.p= prevOpPtr.p= c_operation_pool.getPtr(prevOpPtr.i);
+    prevOpPtr.p->nextActiveOp= regOperPtr.i;
 
-void Dbtup::linkOpIntoFragList(OperationrecPtr regOperPtr,
-                               Fragrecord* const regFragPtr) 
-{
-  OperationrecPtr sopTmpOperPtr;
-  Uint32 tail = regFragPtr->lastusedOprec;
-  ndbrequire(regOperPtr.p->inFragList == ZFALSE);
-  regOperPtr.p->inFragList = ZTRUE;
-  regOperPtr.p->prevOprecInList = tail;
-  regOperPtr.p->nextOprecInList = RNIL;
-  sopTmpOperPtr.i = tail;
-  if (tail == RNIL) {
-    regFragPtr->firstusedOprec = regOperPtr.i;
-  } else {
-    jam();
-    ptrCheckGuard(sopTmpOperPtr, cnoOfOprec, operationrec);
-    sopTmpOperPtr.p->nextOprecInList = regOperPtr.i;
-  }//if
-  regFragPtr->lastusedOprec = regOperPtr.i;
-}//Dbtup::linkOpIntoFragList()
-
-/*
-This routine is optimised for use from TUPKEYREQ.
-This means that a lot of input data is stored in the operation record.
-The routine expects the following data in the operation record to be
-set-up properly.
-Transaction data
-1) transid1
-2) transid2
-3) savePointId
-
-Operation data
-4) optype
-5) dirtyOp
-
-Tuple address
-6) fragPageId
-7) pageIndex
-
-regFragPtr and regTabPtr are references to the table and fragment data and
-is read-only.
-
-The routine will set up the following data in the operation record if
-returned with success.
-
-Tuple address data
-1) realPageId
-2) fragPageId
-3) pageOffset
-4) pageIndex
-
-Also the pagePtr is an output variable if the routine returns with success.
-It's input value can be undefined.
-*/
-bool
-Dbtup::getPage(PagePtr& pagePtr,
-               Operationrec* const regOperPtr,
-               Fragrecord* const regFragPtr,
-               Tablerec* const regTabPtr)
-{
-/* ------------------------------------------------------------------------- */
-// GET THE REFERENCE TO THE TUPLE HEADER BY TRANSLATING THE FRAGMENT PAGE ID
-// INTO A REAL PAGE ID AND BY USING THE PAGE INDEX TO DERIVE THE PROPER INDEX
-// IN THE REAL PAGE.
-/* ------------------------------------------------------------------------- */
-  pagePtr.i = getRealpid(regFragPtr, regOperPtr->fragPageId);
-  regOperPtr->realPageId = pagePtr.i;
-  Uint32 RpageIndex = regOperPtr->pageIndex;
-  Uint32 Rtupheadsize = regTabPtr->tupheadsize;
-  ptrCheckGuard(pagePtr, cnoOfPage, page);
-  Uint32 RpageIndexScaled = RpageIndex >> 1;
-  ndbrequire((RpageIndex & 1) == 0);
-  regOperPtr->pageOffset = ZPAGE_HEADER_SIZE + 
-                           (Rtupheadsize * RpageIndexScaled);
-
-  OperationrecPtr leaderOpPtr;
-  ndbrequire(regOperPtr->pageOffset < ZWORDS_ON_PAGE);
-  leaderOpPtr.i = pagePtr.p->pageWord[regOperPtr->pageOffset];
-  if (leaderOpPtr.i == RNIL) {
-    return true;
-  }//if
-  ptrCheckGuard(leaderOpPtr, cnoOfOprec, operationrec);
-  bool dirtyRead = ((regOperPtr->optype == ZREAD) &&
-                    (regOperPtr->dirtyOp == 1));
-  if (dirtyRead) {
-    bool sameTrans = ((regOperPtr->transid1 == leaderOpPtr.p->transid1) &&
-                      (regOperPtr->transid2 == leaderOpPtr.p->transid2));
-    if (!sameTrans) {
-      if (!getPageLastCommitted(regOperPtr, leaderOpPtr.p)) {
-        return false;
-      }//if
-      pagePtr.i = regOperPtr->realPageId;
-      ptrCheckGuard(pagePtr, cnoOfPage, page);
-      return true;
-    }//if
-  }//if
-  if (regOperPtr->optype == ZREAD) {
-    /*
-    Read uses savepoint id's to find the correct tuple version.
-    */
-    if (getPageThroughSavePoint(regOperPtr, leaderOpPtr.p)) {
-      jam();
-      pagePtr.i = regOperPtr->realPageId;
-      ptrCheckGuard(pagePtr, cnoOfPage, page);
+    regOperPtr.p->op_struct.m_wait_log_buffer= 
+      prevOpPtr.p->op_struct.m_wait_log_buffer;
+    regOperPtr.p->op_struct.m_load_diskpage_on_commit= 
+      prevOpPtr.p->op_struct.m_load_diskpage_on_commit;
+    regOperPtr.p->m_undo_buffer_space= prevOpPtr.p->m_undo_buffer_space;
+    // start with prev mask (matters only for UPD o UPD)
+    set_change_mask_state(regOperPtr.p, get_change_mask_state(prevOpPtr.p));
+    regOperPtr.p->saved_change_mask[0] = prevOpPtr.p->saved_change_mask[0];
+    regOperPtr.p->saved_change_mask[1] = prevOpPtr.p->saved_change_mask[1];
+
+    prevOpPtr.p->op_struct.m_wait_log_buffer= 0;
+    prevOpPtr.p->op_struct.m_load_diskpage_on_commit= 0;
+
+    if(prevOpPtr.p->op_struct.tuple_state == TUPLE_PREPARED)
+    {
+      Uint32 op= regOperPtr.p->op_struct.op_type;
+      Uint32 prevOp= prevOpPtr.p->op_struct.op_type;
+      if (prevOp == ZDELETE)
+      {
+	if(op == ZINSERT)
+	{
+	  // mark both
+	  prevOpPtr.p->op_struct.delete_insert_flag= true;
+	  regOperPtr.p->op_struct.delete_insert_flag= true;
+	  return true;
+	} else {
+	  terrorCode= ZTUPLE_DELETED_ERROR;
+	  return false;
+	}
+      } 
+      else if(op == ZINSERT && prevOp != ZDELETE)
+      {
+	terrorCode= ZINSERT_ERROR;
+	return false;
+      }
       return true;
     }
-    return false;
-  }
-//----------------------------------------------------------------------
-// Check that no other operation is already active on the tuple. Also
-// that abort or commit is not ongoing.
-//----------------------------------------------------------------------
-  if (leaderOpPtr.p->tupleState == NO_OTHER_OP) {
-    jam();
-    if ((leaderOpPtr.p->optype == ZDELETE) &&
-        (regOperPtr->optype != ZINSERT)) {
-      jam();
-      terrorCode = ZTUPLE_DELETED_ERROR;
+    else
+    {
+      terrorCode= ZMUST_BE_ABORTED_ERROR;
       return false;
-    }//if
-    return true;
-  } else if (leaderOpPtr.p->tupleState == ALREADY_ABORTED) {
-    jam();
-    terrorCode = ZMUST_BE_ABORTED_ERROR;
-    return false;
-  } else {
-    ndbrequire(false);
-  }//if
-  return true;
-}//Dbtup::getPage()
+    }
+  }
+}
 
 bool
-Dbtup::getPageThroughSavePoint(Operationrec* regOperPtr,
-                               Operationrec* leaderOpPtr)
-{
-  bool found = false;
-  OperationrecPtr loopOpPtr;
-  loopOpPtr.p = leaderOpPtr;
-  while(true) {
-    if (regOperPtr->savePointId > loopOpPtr.p->savePointId) {
-      jam();
-      found = true;
-      break;
-    }
-    if (loopOpPtr.p->nextActiveOp == RNIL) {
-      break;
-    }
-    loopOpPtr.i = loopOpPtr.p->nextActiveOp;
-    ptrCheckGuard(loopOpPtr, cnoOfOprec, operationrec);
-    jam();
+Dbtup::setup_read(KeyReqStruct *req_struct,
+		  Operationrec* regOperPtr,
+		  Fragrecord* regFragPtr,
+		  Tablerec* regTabPtr,
+		  bool disk)
+{
+  OperationrecPtr currOpPtr;
+  currOpPtr.i= req_struct->m_tuple_ptr->m_operation_ptr_i;
+  if (currOpPtr.i == RNIL)
+  {
+    if (regTabPtr->need_expand(disk))
+      prepare_read(req_struct, regTabPtr, disk);
+    return true;
   }
-  if (!found) {
-    return getPageLastCommitted(regOperPtr, loopOpPtr.p);
-  } else {
-    if (loopOpPtr.p->optype == ZDELETE) {
-      jam();
-      terrorCode = ZTUPLE_DELETED_ERROR;
-      return false;
+
+  do {
+    Uint32 savepointId= regOperPtr->savepointId;
+    bool dirty= req_struct->dirty_op;
+    
+    c_operation_pool.getPtr(currOpPtr);
+    bool sameTrans= c_lqh->is_same_trans(currOpPtr.p->userpointer,
+					 req_struct->trans_id1,
+					 req_struct->trans_id2);
+    /**
+     * Read committed in same trans reads latest copy
+     */
+    if(dirty && !sameTrans)
+    {
+      savepointId= 0;
     }
-    if (loopOpPtr.p->tupleState == ALREADY_ABORTED) {
-      /*
-      Requested tuple version has already been aborted
-      */
-      jam();
-      terrorCode = ZMUST_BE_ABORTED_ERROR;
-      return false;
+    else if(sameTrans)
+    {
+      // Use savepoint even in read committed mode
+      dirty= false;
     }
-    bool use_copy;
-    if (loopOpPtr.p->prevActiveOp == RNIL) {
-      jam();
-      /*
-      Use original tuple since we are reading from the last written tuple.
-      We are the 
-      */
-      use_copy = false;
-    } else {
-      /*
-      Go forward in time to find a copy of the tuple which this operation
-      produced
-      */
-      loopOpPtr.i = loopOpPtr.p->prevActiveOp;
-      ptrCheckGuard(loopOpPtr, cnoOfOprec, operationrec);
-      if (loopOpPtr.p->optype == ZDELETE) {
-        /*
-        This operation was a Delete and thus have no copy tuple attached to
-        it. We will move forward to the next that either doesn't exist in
-        which case we will return the original tuple of any operation and
-        otherwise it must be an insert which contains a copy record.
-        */
-        if (loopOpPtr.p->prevActiveOp == RNIL) {
-          jam();
-          use_copy = false;
-        } else {
-          jam();
-          loopOpPtr.i = loopOpPtr.p->prevActiveOp;
-          ptrCheckGuard(loopOpPtr, cnoOfOprec, operationrec);
-          ndbrequire(loopOpPtr.p->optype == ZINSERT);
-          use_copy = true;
-        }
-      } else if (loopOpPtr.p->optype == ZUPDATE) {
-        jam();
-        /*
-        This operation which was the next in time have a copy which was the
-        result of the previous operation which we want to use. Thus use
-        the copy tuple of this operation.
-        */
-        use_copy = true;
-      } else {
-        /*
-        This operation was an insert that happened after an insert or update.
-        This is not a possible case.
-        */
-        ndbrequire(false);
-        return false;
+
+    OperationrecPtr prevOpPtr = currOpPtr;  
+    bool found= false;
+    while(true) 
+    {
+      if (savepointId > currOpPtr.p->savepointId) {
+	found= true;
+	break;
       }
+      if (currOpPtr.p->is_first_operation()){
+	break;
+      }
+      prevOpPtr= currOpPtr;
+      currOpPtr.i = currOpPtr.p->prevActiveOp;
+      c_operation_pool.getPtr(currOpPtr);
     }
-    if (use_copy) {
-      regOperPtr->realPageId = loopOpPtr.p->realPageIdC;
-      regOperPtr->fragPageId = loopOpPtr.p->fragPageIdC;
-      regOperPtr->pageIndex = loopOpPtr.p->pageIndexC;
-      regOperPtr->pageOffset = loopOpPtr.p->pageOffsetC;
-    } else {
-      regOperPtr->realPageId = loopOpPtr.p->realPageId;
-      regOperPtr->fragPageId = loopOpPtr.p->fragPageId;
-      regOperPtr->pageIndex = loopOpPtr.p->pageIndex;
-      regOperPtr->pageOffset = loopOpPtr.p->pageOffset;
+    
+    Uint32 currOp= currOpPtr.p->op_struct.op_type;
+    
+    if((found && currOp == ZDELETE) || 
+       ((dirty || !found) && currOp == ZINSERT))
+    {
+      terrorCode= ZTUPLE_DELETED_ERROR;
+      break;
+    }
+    
+    if(dirty || !found)
+    {
+      
     }
+    else
+    {
+      req_struct->m_tuple_ptr= (Tuple_header*)
+	c_undo_buffer.get_ptr(&currOpPtr.p->m_copy_tuple_location);
+    }      
+
+    if (regTabPtr->need_expand(disk))
+      prepare_read(req_struct, regTabPtr, disk);
+    
+#if 0
+    ndbout_c("reading copy");
+    Uint32 *var_ptr = fixed_ptr+regTabPtr->var_offset;
+    req_struct->m_tuple_ptr= fixed_ptr;
+    req_struct->fix_var_together= true;  
+    req_struct->var_len_array= (Uint16*)var_ptr;
+    req_struct->var_data_start= var_ptr+regTabPtr->var_array_wsize;
+    Uint32 var_sz32= init_var_pos_array((Uint16*)var_ptr,
+					req_struct->var_pos_array,
+					regTabPtr->no_var_attr);
+    req_struct->var_data_end= var_ptr+regTabPtr->var_array_wsize + var_sz32;
+#endif
     return true;
-  }
+  } while(0);
+  
+  return false;
 }
 
-bool
-Dbtup::getPageLastCommitted(Operationrec* const regOperPtr,
-                            Operationrec* const leaderOpPtr)
-{
-//----------------------------------------------------------------------
-// Dirty reads wants to read the latest committed tuple. The latest
-// tuple value could be not existing or else we have to find the copy
-// tuple. Start by finding the end of the list to find the first operation
-// on the record in the ongoing transaction.
-//----------------------------------------------------------------------
+int
+Dbtup::load_diskpage(Signal* signal, 
+		     Uint32 opRec, Uint32 fragPtrI, 
+		     Uint32 local_key, Uint32 flags)
+{
+  c_operation_pool.getPtr(operPtr, opRec);
+  fragptr.i= fragPtrI;
+  ptrCheckGuard(fragptr, cnoOfFragrec, fragrecord);
+  
+  Operationrec *  regOperPtr= operPtr.p;
+  Fragrecord * regFragPtr= fragptr.p;
+  
+  tabptr.i = regFragPtr->fragTableId;
+  ptrCheckGuard(tabptr, cnoOfTablerec, tablerec);
+  Tablerec* regTabPtr = tabptr.p;
+  
+  if(regOperPtr->op_struct.tuple_state == TUPLE_INITIAL_INSERT)
+  {
+    jam();
+    regOperPtr->op_struct.m_wait_log_buffer= 1;
+    regOperPtr->op_struct.m_load_diskpage_on_commit= 1;
+    return 1;
+  } 
+  
   jam();
-  OperationrecPtr loopOpPtr;
-  loopOpPtr.p = leaderOpPtr;
-  while (loopOpPtr.p->nextActiveOp != RNIL) {
-    jam();
-    loopOpPtr.i = loopOpPtr.p->nextActiveOp;
-    ptrCheckGuard(loopOpPtr, cnoOfOprec, operationrec);
-  }//while
-  if (loopOpPtr.p->optype == ZINSERT) {
-    jam();
-//----------------------------------------------------------------------
-// With an insert in the start of the list we know that the tuple did not
-// exist before this transaction was started. We don't care if the current
-// transaction is in the commit phase since the commit is not really
-// completed until the operation is gone from TUP.
-//----------------------------------------------------------------------
-    terrorCode = ZTUPLE_DELETED_ERROR;
-    return false;
-  } else {
-//----------------------------------------------------------------------
-// A successful update and delete as first in the queue means that a tuple
-// exist in the committed world. We need to find it.
-//----------------------------------------------------------------------
-    if (loopOpPtr.p->optype == ZUPDATE) {
-      jam();
-//----------------------------------------------------------------------
-// The first operation was a delete we set our tuple reference to the
-// copy tuple of this operation.
-//----------------------------------------------------------------------
-      regOperPtr->realPageId = loopOpPtr.p->realPageIdC;
-      regOperPtr->fragPageId = loopOpPtr.p->fragPageIdC;
-      regOperPtr->pageIndex  = loopOpPtr.p->pageIndexC;
-      regOperPtr->pageOffset = loopOpPtr.p->pageOffsetC;
-    } else if ((loopOpPtr.p->optype == ZDELETE) &&
-               (loopOpPtr.p->prevActiveOp == RNIL)) {
-      jam();
-//----------------------------------------------------------------------
-// There was only a delete. The original tuple still is ok.
-//----------------------------------------------------------------------
-    } else {
-      jam();
-//----------------------------------------------------------------------
-// There was another operation after the delete, this must be an insert
-// and we have found our copy tuple there.
-//----------------------------------------------------------------------
-      loopOpPtr.i = loopOpPtr.p->prevActiveOp;
-      ptrCheckGuard(loopOpPtr, cnoOfOprec, operationrec);
-      ndbrequire(loopOpPtr.p->optype == ZINSERT);
-      regOperPtr->realPageId = loopOpPtr.p->realPageIdC;
-      regOperPtr->fragPageId = loopOpPtr.p->fragPageIdC;
-      regOperPtr->pageIndex  = loopOpPtr.p->pageIndexC;
-      regOperPtr->pageOffset = loopOpPtr.p->pageOffsetC;
-    }//if
-  }//if
-  return true;
-}//Dbtup::getPageLastCommitted()
+  Uint32 page_idx= local_key & MAX_TUPLES_PER_PAGE;
+  Uint32 frag_page_id= local_key >> MAX_TUPLES_BITS;
+  regOperPtr->m_tuple_location.m_page_no= getRealpid(regFragPtr,
+						     frag_page_id);
+  regOperPtr->m_tuple_location.m_page_idx= page_idx;
+  
+  PagePtr page_ptr;
+  Uint32* tmp= get_ptr(&page_ptr, &regOperPtr->m_tuple_location, regTabPtr);
+  Tuple_header* ptr= (Tuple_header*)tmp;
+  
+  int res= 1;
+  Uint32 opPtr= ptr->m_operation_ptr_i;
+  if(ptr->m_header_bits & Tuple_header::DISK_PART)
+  {
+    Page_cache_client::Request req;
+    memcpy(&req.m_page, ptr->get_disk_ref_ptr(regTabPtr), sizeof(Local_key));
+    req.m_callback.m_callbackData= opRec;
+    req.m_callback.m_callbackFunction= 
+      safe_cast(&Dbtup::disk_page_load_callback);
+
+#ifdef ERROR_INSERT
+    if (ERROR_INSERTED(4022))
+    {
+      flags |= Page_cache_client::DELAY_REQ;
+      req.m_delay_until_time = NdbTick_CurrentMillisecond()+(Uint64)3000;
+    }
+#endif
+    
+    if((res= m_pgman.get_page(signal, req, flags)) > 0)
+    {
+      //ndbout_c("in cache");
+      // In cache
+    } 
+    else if(res == 0)
+    {
+      //ndbout_c("waiting for callback");
+      // set state
+    }
+    else 
+    {
+      // Error
+    }
+  }
 
-void Dbtup::execTUPKEYREQ(Signal* signal) 
+  switch(flags & 7)
+  {
+  case ZREAD:
+  case ZREAD_EX:
+    break;
+  case ZDELETE:
+  case ZUPDATE:
+  case ZINSERT:
+  case ZWRITE:
+    regOperPtr->op_struct.m_wait_log_buffer= 1;
+    regOperPtr->op_struct.m_load_diskpage_on_commit= 1;
+  }
+  return res;
+}
+
+void
+Dbtup::disk_page_load_callback(Signal* signal, Uint32 opRec, Uint32 page_id)
 {
-  TupKeyReq * const tupKeyReq = (TupKeyReq *)signal->getDataPtr();
-  Uint32 RoperPtr = tupKeyReq->connectPtr;
-  Uint32 Rtabptr = tupKeyReq->tableRef;
-  Uint32 RfragId = tupKeyReq->fragId;
-  Uint32 Rstoredid = tupKeyReq->storedProcedure;
-  Uint32 Rfragptr = tupKeyReq->fragPtr;
-
-  Uint32 RnoOfOprec = cnoOfOprec;
-  Uint32 RnoOfTablerec = cnoOfTablerec;
-  Uint32 RnoOfFragrec = cnoOfFragrec;
-
-  operPtr.i = RoperPtr;
-  fragptr.i = Rfragptr;
-  tabptr.i = Rtabptr;
-  jamEntry();
+  c_operation_pool.getPtr(operPtr, opRec);
+  c_lqh->acckeyconf_load_diskpage_callback(signal, 
+					   operPtr.p->userpointer, page_id);
+}
 
-  ndbrequire(((RoperPtr < RnoOfOprec) &&
-        (Rtabptr < RnoOfTablerec) &&
-        (Rfragptr < RnoOfFragrec)));
-  ptrAss(operPtr, operationrec);
-  Operationrec * const regOperPtr = operPtr.p;
-  ptrAss(fragptr, fragrecord);
-  Fragrecord * const regFragPtr = fragptr.p;
-  ptrAss(tabptr, tablerec);
-  Tablerec* const regTabPtr = tabptr.p;
+int
+Dbtup::load_diskpage_scan(Signal* signal, 
+			  Uint32 opRec, Uint32 fragPtrI, 
+			  Uint32 local_key, Uint32 flags)
+{
+  c_operation_pool.getPtr(operPtr, opRec);
+  fragptr.i= fragPtrI;
+  ptrCheckGuard(fragptr, cnoOfFragrec, fragrecord);
+  
+  Operationrec *  regOperPtr= operPtr.p;
+  Fragrecord * regFragPtr= fragptr.p;
+  
+  tabptr.i = regFragPtr->fragTableId;
+  ptrCheckGuard(tabptr, cnoOfTablerec, tablerec);
+  Tablerec* regTabPtr = tabptr.p;
+  
+  jam();
+  Uint32 page_idx= local_key & MAX_TUPLES_PER_PAGE;
+  Uint32 frag_page_id= local_key >> MAX_TUPLES_BITS;
+  regOperPtr->m_tuple_location.m_page_no= getRealpid(regFragPtr,
+						     frag_page_id);
+  regOperPtr->m_tuple_location.m_page_idx= page_idx;
+  regOperPtr->op_struct.m_load_diskpage_on_commit= 0;
+  
+  PagePtr page_ptr;
+  Uint32* tmp= get_ptr(&page_ptr, &regOperPtr->m_tuple_location, regTabPtr);
+  Tuple_header* ptr= (Tuple_header*)tmp;
+  
+  int res= 1;
+  Uint32 opPtr= ptr->m_operation_ptr_i;
+  if(ptr->m_header_bits & Tuple_header::DISK_PART)
+  {
+    Page_cache_client::Request req;
+    memcpy(&req.m_page, ptr->get_disk_ref_ptr(regTabPtr), sizeof(Local_key));
+    req.m_callback.m_callbackData= opRec;
+    req.m_callback.m_callbackFunction= 
+      safe_cast(&Dbtup::disk_page_load_scan_callback);
+    
+    if((res= m_pgman.get_page(signal, req, flags)) > 0)
+    {
+      // ndbout_c("in cache");
+      // In cache
+    } 
+    else if(res == 0)
+    {
+      //ndbout_c("waiting for callback");
+      // set state
+    }
+    else 
+    {
+      // Error
+    }
+  }
+  return res;
+}
 
-  Uint32 TrequestInfo = tupKeyReq->request;
+void
+Dbtup::disk_page_load_scan_callback(Signal* signal, 
+				    Uint32 opRec, Uint32 page_id)
+{
+  c_operation_pool.getPtr(operPtr, opRec);
+  c_lqh->next_scanconf_load_diskpage_callback(signal, 
+					      operPtr.p->userpointer, page_id);
+}
 
-  if (regOperPtr->transstate != IDLE) {
-    TUPKEY_abort(signal, 39);
-    return;
-  }//if
-/* ----------------------------------------------------------------- */
-// Operation is ZREAD when we arrive here so no need to worry about the
-// abort process.
-/* ----------------------------------------------------------------- */
-/* -----------    INITIATE THE OPERATION RECORD       -------------- */
-/* ----------------------------------------------------------------- */
-  regOperPtr->fragmentPtr = Rfragptr;
-  regOperPtr->dirtyOp = TrequestInfo & 1;
-  regOperPtr->opSimple = (TrequestInfo >> 1) & 1;
-  regOperPtr->interpretedExec = (TrequestInfo >> 10) & 1;
-  regOperPtr->optype = (TrequestInfo >> 6) & 0xf;
-
-  // Attributes needed by trigger execution
-  regOperPtr->noFiredTriggers = 0;
-  regOperPtr->tableRef = Rtabptr;
-  regOperPtr->tcOperationPtr = tupKeyReq->opRef;
-  regOperPtr->primaryReplica = tupKeyReq->primaryReplica;
-  regOperPtr->coordinatorTC = tupKeyReq->coordinatorTC;
-  regOperPtr->tcOpIndex = tupKeyReq->tcOpIndex;
-  regOperPtr->savePointId = tupKeyReq->savePointId;
-
-  regOperPtr->fragId = RfragId;
-
-  regOperPtr->fragPageId = tupKeyReq->keyRef1;
-  regOperPtr->pageIndex = tupKeyReq->keyRef2;
-  regOperPtr->attrinbufLen = regOperPtr->logSize = tupKeyReq->attrBufLen;
-  regOperPtr->recBlockref = tupKeyReq->applRef;
-
-// Schema Version in tupKeyReq->schemaVersion not used in this version
-  regOperPtr->storedProcedureId = Rstoredid;
-  regOperPtr->transid1 = tupKeyReq->transId1;
-  regOperPtr->transid2 = tupKeyReq->transId2;
-
-  regOperPtr->attroutbufLen = 0;
-/* ----------------------------------------------------------------------- */
-// INITIALISE TO DEFAULT VALUE
-// INIT THE COPY REFERENCE RECORDS TO RNIL TO ENSURE THAT THEIR VALUES
-// ARE VALID IF THEY EXISTS
-// NO PENDING CHECKPOINT WHEN COPY CREATED (DEFAULT)
-// NO TUPLE HAS BEEN ALLOCATED YET
-// NO COPY HAS BEEN CREATED YET
-/* ----------------------------------------------------------------------- */
-  regOperPtr->undoLogged = false;
-  regOperPtr->realPageId = RNIL;
-  regOperPtr->realPageIdC = RNIL;
-  regOperPtr->fragPageIdC = RNIL;
-
-  regOperPtr->pageOffset = ZNIL;
-  regOperPtr->pageOffsetC = ZNIL;
-
-  regOperPtr->pageIndexC = ZNIL;
-
-  // version not yet known
-  regOperPtr->tupVersion = ZNIL;
-  regOperPtr->deleteInsertFlag = 0;
-
-  regOperPtr->tupleState = TUPLE_BLOCKED;
-  regOperPtr->changeMask.clear();
-  
-  if (Rstoredid != ZNIL) {
-    ndbrequire(initStoredOperationrec(regOperPtr, Rstoredid) == ZOK);
-  }//if
-  copyAttrinfo(signal, regOperPtr, &cinBuffer[0]);
+void Dbtup::execTUPKEYREQ(Signal* signal) 
+{
+   TupKeyReq * tupKeyReq= (TupKeyReq *)signal->getDataPtr();
+   KeyReqStruct req_struct;
+   Uint32 sig1, sig2, sig3, sig4;
+
+   Uint32 RoperPtr= tupKeyReq->connectPtr;
+   Uint32 Rfragptr= tupKeyReq->fragPtr;
+
+   Uint32 RnoOfFragrec= cnoOfFragrec;
+   Uint32 RnoOfTablerec= cnoOfTablerec;
+
+   jamEntry();
+   fragptr.i= Rfragptr;
+
+   ndbrequire(Rfragptr < RnoOfFragrec);
+
+   c_operation_pool.getPtr(operPtr, RoperPtr);
+   ptrAss(fragptr, fragrecord);
+
+   Uint32 TrequestInfo= tupKeyReq->request;
+
+   Operationrec *  regOperPtr= operPtr.p;
+   Fragrecord * regFragPtr= fragptr.p;
+
+   tabptr.i = regFragPtr->fragTableId;
+   ptrCheckGuard(tabptr, RnoOfTablerec, tablerec);
+   Tablerec* regTabPtr = tabptr.p;
+
+   req_struct.signal= signal;
+   req_struct.dirty_op= TrequestInfo & 1;
+   req_struct.interpreted_exec= (TrequestInfo >> 10) & 1;
+   req_struct.no_fired_triggers= 0;
+   req_struct.read_length= 0;
+   req_struct.max_attr_id_updated= 0;
+   req_struct.no_changed_attrs= 0;
+   req_struct.last_row= false;
+   req_struct.changeMask.clear();
+
+   if (unlikely(get_trans_state(regOperPtr) != TRANS_IDLE))
+   {
+     TUPKEY_abort(signal, 39);
+     return;
+   }
+
+ /* ----------------------------------------------------------------- */
+ // Operation is ZREAD when we arrive here so no need to worry about the
+ // abort process.
+ /* ----------------------------------------------------------------- */
+ /* -----------    INITIATE THE OPERATION RECORD       -------------- */
+ /* ----------------------------------------------------------------- */
+   Uint32 Rstoredid= tupKeyReq->storedProcedure;
+
+   regOperPtr->fragmentPtr= Rfragptr;
+   regOperPtr->op_struct.op_type= (TrequestInfo >> 6) & 0xf;
+   regOperPtr->op_struct.delete_insert_flag = false;
+   regOperPtr->storedProcedureId= Rstoredid;
+
+   regOperPtr->m_copy_tuple_location.setNull();
+   regOperPtr->tupVersion= ZNIL;
+
+   sig1= tupKeyReq->savePointId;
+   sig2= tupKeyReq->primaryReplica;
+   sig3= tupKeyReq->keyRef2;
+   
+   regOperPtr->savepointId= sig1;
+   regOperPtr->op_struct.primary_replica= sig2;
+   regOperPtr->m_tuple_location.m_page_idx= sig3;
+
+   sig1= tupKeyReq->opRef;
+   sig2= tupKeyReq->tcOpIndex;
+   sig3= tupKeyReq->coordinatorTC;
+   sig4= tupKeyReq->keyRef1;
+
+   req_struct.tc_operation_ptr= sig1;
+   req_struct.TC_index= sig2;
+   req_struct.TC_ref= sig3;
+   req_struct.frag_page_id= sig4;
+   req_struct.m_use_rowid = (TrequestInfo >> 11) & 1;
+
+   sig1= tupKeyReq->attrBufLen;
+   sig2= tupKeyReq->applRef;
+   sig3= tupKeyReq->transId1;
+   sig4= tupKeyReq->transId2;
+
+   Uint32 disk_page= tupKeyReq->disk_page;
+   
+   req_struct.log_size= sig1;
+   req_struct.attrinfo_len= sig1;
+   req_struct.rec_blockref= sig2;
+   req_struct.trans_id1= sig3;
+   req_struct.trans_id2= sig4;
+   req_struct.m_disk_page_ptr.i= disk_page;
+
+   sig1 = tupKeyReq->m_row_id_page_no;
+   sig2 = tupKeyReq->m_row_id_page_idx;
+
+   req_struct.m_row_id.m_page_no = sig1;
+   req_struct.m_row_id.m_page_idx = sig2;
+   
+   Uint32 Roptype = regOperPtr->op_struct.op_type;
+
+   if (Rstoredid != ZNIL) {
+     ndbrequire(initStoredOperationrec(regOperPtr,
+				       &req_struct,
+				       Rstoredid) == ZOK);
+   }
+
+   copyAttrinfo(regOperPtr, &cinBuffer[0]);
+   
+   if(Roptype == ZINSERT && get_tuple_state(regOperPtr)== TUPLE_INITIAL_INSERT)
+   {
+     // No tuple allocatated yet
+     goto do_insert;
+   }
 
-  PagePtr pagePtr;
-  if (!getPage(pagePtr, regOperPtr, regFragPtr, regTabPtr)) {
-    tupkeyErrorLab(signal);
-    return;
-  }//if
+   /**
+    * Get pointer to tuple
+    */
+   regOperPtr->m_tuple_location.m_page_no= getRealpid(regFragPtr, 
+						      req_struct.frag_page_id);
+   
+   setup_fixed_part(&req_struct, regOperPtr, regTabPtr);
+   
+   /**
+    * Check operation
+    */
+   if (Roptype == ZREAD) {
+     jam();
+     
+     if (setup_read(&req_struct, regOperPtr, regFragPtr, regTabPtr, 
+		    disk_page != RNIL))
+     {
+       if(handleReadReq(signal, regOperPtr, regTabPtr, &req_struct) != -1) 
+       {
+	 req_struct.log_size= 0;
+	 sendTUPKEYCONF(signal, &req_struct, regOperPtr);
+	 /* ---------------------------------------------------------------- */
+	 // Read Operations need not to be taken out of any lists. 
+	 // We also do not need to wait for commit since there is no changes 
+	 // to commit. Thus we
+	 // prepare the operation record already now for the next operation.
+	 // Write operations have set the state to STARTED above indicating 
+	 // that they are waiting for the Commit or Abort decision.
+	 /* ---------------------------------------------------------------- */
+	 set_trans_state(regOperPtr, TRANS_IDLE);
+	 regOperPtr->currentAttrinbufLen= 0;
+       }
+       return;
+     }
+     tupkeyErrorLab(signal);
+     return;
+   }
+   
+   if(insertActiveOpList(operPtr, &req_struct))
+   {
+     if(Roptype == ZINSERT)
+     {
+       jam();
+   do_insert:
+       if (handleInsertReq(signal, operPtr,
+			   fragptr, regTabPtr, &req_struct) == -1) 
+       {
+	 return;
+       }
+       if (!regTabPtr->tuxCustomTriggers.isEmpty()) 
+       {
+	 jam();
+	 if (executeTuxInsertTriggers(signal,
+				      regOperPtr,
+				      regFragPtr,
+				      regTabPtr) != 0) {
+	   jam();
+	   tupkeyErrorLab(signal);
+	   return;
+	 }
+       }
+       checkImmediateTriggersAfterInsert(&req_struct,
+					 regOperPtr,
+					 regTabPtr);
+       set_change_mask_state(regOperPtr, SET_ALL_MASK);
+       sendTUPKEYCONF(signal, &req_struct, regOperPtr);
+       return;
+     }
+
+     if (Roptype == ZUPDATE) {
+       jam();
+       if (handleUpdateReq(signal, regOperPtr,
+			   regFragPtr, regTabPtr, &req_struct, disk_page != RNIL) == -1) {
+	 return;
+       }
+       // If update operation is done on primary, 
+       // check any after op triggers
+       terrorCode= 0;
+       if (!regTabPtr->tuxCustomTriggers.isEmpty()) {
+	 jam();
+	 if (executeTuxUpdateTriggers(signal,
+				      regOperPtr,
+				      regFragPtr,
+				      regTabPtr) != 0) {
+	   jam();
+	   tupkeyErrorLab(signal);
+	   return;
+	 }
+       }
+       checkImmediateTriggersAfterUpdate(&req_struct,
+					 regOperPtr,
+					 regTabPtr);
+       // XXX use terrorCode for now since all methods are void
+       if (terrorCode != 0) 
+       {
+	 tupkeyErrorLab(signal);
+	 return;
+       }
+       update_change_mask_info(&req_struct, regOperPtr);
+       sendTUPKEYCONF(signal, &req_struct, regOperPtr);
+       return;
+     } 
+     else if(Roptype == ZDELETE)
+     {
+       jam();
+       if (handleDeleteReq(signal, regOperPtr,
+			   regFragPtr, regTabPtr, &req_struct) == -1) {
+	 return;
+       }
+       /*
+	* TUX doesn't need to check for triggers at delete since entries in
+	* the index are kept until commit time.
+	*/
+
+       /*
+	* Secondary index triggers fire on the primary after a delete.
+	*/
+       checkImmediateTriggersAfterDelete(&req_struct,
+					 regOperPtr, 
+					 regTabPtr);
+       set_change_mask_state(regOperPtr, DELETE_CHANGES);
+       req_struct.log_size= 0;
+       sendTUPKEYCONF(signal, &req_struct, regOperPtr);
+       return;
+     }
+     else
+     {
+       ndbrequire(false); // Invalid op type
+     }
+   }
 
-  Uint32 Roptype = regOperPtr->optype;
-  if (Roptype == ZREAD) {
-    jam();
-    if (handleReadReq(signal, regOperPtr, regTabPtr, pagePtr.p) != -1) {
-      sendTUPKEYCONF(signal, regOperPtr, 0);
-/* ------------------------------------------------------------------------- */
-// Read Operations need not to be taken out of any lists. We also do not
-// need to wait for commit since there is no changes to commit. Thus we
-// prepare the operation record already now for the next operation.
-// Write operations have set the state to STARTED above indicating that
-// they are waiting for the Commit or Abort decision.
-/* ------------------------------------------------------------------------- */
-      regOperPtr->transstate = IDLE;
-      regOperPtr->currentAttrinbufLen = 0;
-    }//if
-    return;
-  }//if
-  linkOpIntoFragList(operPtr, regFragPtr);
-  insertActiveOpList(signal,
-                     operPtr,
-                     pagePtr.p,
-                     regOperPtr->pageOffset);
-  if (isUndoLoggingBlocked(regFragPtr)) {
-    TUPKEY_abort(signal, 38);
-    return;
-  }//if
-/* ---------------------------------------------------------------------- */
-// WE SET THE CURRENT ACTIVE OPERATION IN THE TUPLE TO POINT TO OUR
-//OPERATION RECORD. IF SEVERAL OPERATIONS WORK ON THIS TUPLE THEY ARE
-// LINKED TO OUR OPERATION RECORD. DIRTY READS CAN ACCESS THE COPY
-// TUPLE THROUGH OUR OPERATION RECORD.
-/* ---------------------------------------------------------------------- */
-  if (Roptype == ZINSERT) {
-    jam();
-    if (handleInsertReq(signal, regOperPtr,
-                        regFragPtr, regTabPtr, pagePtr.p) == -1) {
-      return;
-    }//if
-    if (!regTabPtr->tuxCustomTriggers.isEmpty()) {
-      jam();
-      if (executeTuxInsertTriggers(signal, regOperPtr, regTabPtr) != 0) {
-        jam();
-        tupkeyErrorLab(signal);
-        return;
-      }
-    }
-    checkImmediateTriggersAfterInsert(signal,
-                                      regOperPtr,
-                                      regTabPtr);
-    sendTUPKEYCONF(signal, regOperPtr, regOperPtr->logSize);
-    return;
-  }//if
-  if (regTabPtr->checksumIndicator &&
-      (calculateChecksum(pagePtr.p,
-                         regOperPtr->pageOffset,
-                         regTabPtr->tupheadsize) != 0)) {
-    jam();
-    terrorCode = ZTUPLE_CORRUPTED_ERROR;
-    tupkeyErrorLab(signal);
-    return;
-  }//if
-  if (Roptype == ZUPDATE) {
-    jam();
-    if (handleUpdateReq(signal, regOperPtr,
-                        regFragPtr, regTabPtr, pagePtr.p) == -1) {
-      return;
-    }//if
-    // If update operation is done on primary, 
-    // check any after op triggers
-    terrorCode = 0;
-    if (!regTabPtr->tuxCustomTriggers.isEmpty()) {
-      jam();
-      if (executeTuxUpdateTriggers(signal, regOperPtr, regTabPtr) != 0) {
-        jam();
-        tupkeyErrorLab(signal);
-        return;
-      }
-    }
-    checkImmediateTriggersAfterUpdate(signal,
-                                      regOperPtr,
-                                      regTabPtr);
-    // XXX use terrorCode for now since all methods are void
-    if (terrorCode != 0) {
-      tupkeyErrorLab(signal);
-      return;
-    }
-    sendTUPKEYCONF(signal, regOperPtr, regOperPtr->logSize);
-    return;
-  } else if (Roptype == ZDELETE) {
-    jam();
-    if (handleDeleteReq(signal, regOperPtr,
-                        regFragPtr, regTabPtr, pagePtr.p) == -1) {
-      return;
-    }//if
-    // If delete operation is done on primary, 
-    // check any after op triggers
-    if (!regTabPtr->tuxCustomTriggers.isEmpty()) {
-      jam();
-      if (executeTuxDeleteTriggers(signal, regOperPtr, regTabPtr) != 0) {
-        jam();
-        tupkeyErrorLab(signal);
-        return;
-      }
-    }
-    checkImmediateTriggersAfterDelete(signal,
-                                      regOperPtr, 
-                                      regTabPtr);
-    sendTUPKEYCONF(signal, regOperPtr, 0);
-    return;
-  } else {
-    ndbrequire(false);
-  }//if
-}//Dbtup::execTUPKEYREQ()
+   tupkeyErrorLab(signal);
+ }
 
-/* ---------------------------------------------------------------- */
-/* ------------------------ CONFIRM REQUEST ----------------------- */
-/* ---------------------------------------------------------------- */
-void Dbtup::sendTUPKEYCONF(Signal* signal, 
-                           Operationrec * const regOperPtr, 
-                           Uint32 TlogSize) 
-{
-  TupKeyConf * const tupKeyConf = (TupKeyConf *)signal->getDataPtrSend();  
-
-  Uint32 RuserPointer = regOperPtr->userpointer;
-  Uint32 RattroutbufLen = regOperPtr->attroutbufLen;
-  Uint32 RnoFiredTriggers = regOperPtr->noFiredTriggers;
-  BlockReference Ruserblockref = regOperPtr->userblockref;
-  Uint32 lastRow = regOperPtr->lastRow;
-
-  regOperPtr->transstate = STARTED;
-  regOperPtr->tupleState = NO_OTHER_OP;
-  tupKeyConf->userPtr = RuserPointer;
-  tupKeyConf->readLength = RattroutbufLen;
-  tupKeyConf->writeLength = TlogSize;
-  tupKeyConf->noFiredTriggers = RnoFiredTriggers;
-  tupKeyConf->lastRow = lastRow;
+void
+Dbtup::setup_fixed_part(KeyReqStruct* req_struct,
+			Operationrec* regOperPtr,
+			Tablerec* regTabPtr)
+{
+  PagePtr page_ptr;
+  Uint32* ptr= get_ptr(&page_ptr, &regOperPtr->m_tuple_location, regTabPtr);
+  req_struct->m_page_ptr = page_ptr;
+  req_struct->m_tuple_ptr = (Tuple_header*)ptr;
+  
+  ndbassert(regOperPtr->op_struct.op_type == ZINSERT || (! (req_struct->m_tuple_ptr->m_header_bits & Tuple_header::FREE)));
+  
+  req_struct->check_offset[MM]= regTabPtr->get_check_offset(MM);
+  req_struct->check_offset[DD]= regTabPtr->get_check_offset(DD);
+  
+  Uint32 num_attr= regTabPtr->m_no_of_attributes;
+  Uint32 descr_start= regTabPtr->tabDescriptor;
+  TableDescriptor *tab_descr= &tableDescriptor[descr_start];
+  ndbrequire(descr_start + (num_attr << ZAD_LOG_SIZE) <= cnoOfTabDescrRec);
+  req_struct->attr_descr= tab_descr; 
+}
 
-  EXECUTE_DIRECT(refToBlock(Ruserblockref), GSN_TUPKEYCONF, signal,
+ /* ---------------------------------------------------------------- */
+ /* ------------------------ CONFIRM REQUEST ----------------------- */
+ /* ---------------------------------------------------------------- */
+ void Dbtup::sendTUPKEYCONF(Signal* signal,
+			    KeyReqStruct *req_struct,
+			    Operationrec * regOperPtr)
+{
+  TupKeyConf * tupKeyConf= (TupKeyConf *)signal->getDataPtrSend();  
+  
+  Uint32 Rcreate_rowid = req_struct->m_use_rowid;
+  Uint32 RuserPointer= regOperPtr->userpointer;
+  Uint32 RnoFiredTriggers= req_struct->no_fired_triggers;
+  Uint32 log_size= req_struct->log_size;
+  Uint32 read_length= req_struct->read_length;
+  Uint32 last_row= req_struct->last_row;
+  
+  set_trans_state(regOperPtr, TRANS_STARTED);
+  set_tuple_state(regOperPtr, TUPLE_PREPARED);
+  tupKeyConf->userPtr= RuserPointer;
+  tupKeyConf->readLength= read_length;
+  tupKeyConf->writeLength= log_size;
+  tupKeyConf->noFiredTriggers= RnoFiredTriggers;
+  tupKeyConf->lastRow= last_row;
+  tupKeyConf->rowid = Rcreate_rowid;
+  
+  EXECUTE_DIRECT(DBLQH, GSN_TUPKEYCONF, signal,
 		 TupKeyConf::SignalLength);
-  return;
-}//Dbtup::sendTUPKEYCONF()
+  
+}
+
 
 #define MAX_READ (sizeof(signal->theData) > MAX_MESSAGE_SIZE ? MAX_MESSAGE_SIZE : sizeof(signal->theData))
 
@@ -864,41 +905,39 @@
 /* ----------------------------- READ  ---------------------------- */
 /* ---------------------------------------------------------------- */
 int Dbtup::handleReadReq(Signal* signal,
-                         Operationrec* const regOperPtr,
-                         Tablerec* const regTabPtr,
-                         Page* pagePtr)
-{
-  Uint32 Ttupheadoffset = regOperPtr->pageOffset;
-  const BlockReference sendBref = regOperPtr->recBlockref;
-  if (regTabPtr->checksumIndicator &&
-      (calculateChecksum(pagePtr, Ttupheadoffset,
-                         regTabPtr->tupheadsize) != 0)) {
+                         Operationrec* regOperPtr,
+                         Tablerec* regTabPtr,
+                         KeyReqStruct* req_struct)
+{
+  Uint32 *dst;
+  Uint32 dstLen, start_index;
+  const BlockReference sendBref= req_struct->rec_blockref;
+  if ((regTabPtr->m_bits & Tablerec::TR_Checksum) &&
+      (calculateChecksum(req_struct->m_tuple_ptr, regTabPtr) != 0)) {
     jam();
-    terrorCode = ZTUPLE_CORRUPTED_ERROR;
+    ndbout_c("here2");
+    terrorCode= ZTUPLE_CORRUPTED_ERROR;
     tupkeyErrorLab(signal);
     return -1;
-  }//if
+  }
 
-  Uint32 * dst = &signal->theData[25];
-  Uint32 dstLen = (MAX_READ / 4) - 25;
   const Uint32 node = refToNode(sendBref);
   if(node != 0 && node != getOwnNodeId()) {
-    ;
+    start_index= 25;
   } else {
     jam();
     /**
      * execute direct
      */
-    dst = &signal->theData[3];
-    dstLen = (MAX_READ / 4) - 3;
+    start_index= 3;
   }
-  
-  if (regOperPtr->interpretedExec != 1) {
+  dst= &signal->theData[start_index];
+  dstLen= (MAX_READ / 4) - start_index;
+  if (!req_struct->interpreted_exec) {
     jam();
-    int ret = readAttributes(pagePtr,
-			     Ttupheadoffset,
+    int ret = readAttributes(req_struct,
 			     &cinBuffer[0],
-			     regOperPtr->attrinbufLen,
+			     req_struct->attrinfo_len,
 			     dst,
 			     dstLen,
 			     false);
@@ -908,242 +947,643 @@
 /* ------------------------------------------------------------------------- */
       jam();
       Uint32 TnoOfDataRead= (Uint32) ret;
-      regOperPtr->attroutbufLen = TnoOfDataRead;
-      sendReadAttrinfo(signal, TnoOfDataRead, regOperPtr);
+      req_struct->read_length= TnoOfDataRead;
+      sendReadAttrinfo(signal, req_struct, TnoOfDataRead, regOperPtr);
       return 0;
-    }//if
+    }
     jam();
     tupkeyErrorLab(signal);
     return -1;
   } else {
     jam();
-    regOperPtr->lastRow = 0;
-    if (interpreterStartLab(signal, pagePtr, Ttupheadoffset) != -1) {
+    if (interpreterStartLab(signal, req_struct) != -1) {
       return 0;
-    }//if
+    }
     return -1;
-  }//if
-}//Dbtup::handleReadReq()
+  }
+}
 
 /* ---------------------------------------------------------------- */
 /* ---------------------------- UPDATE ---------------------------- */
 /* ---------------------------------------------------------------- */
 int Dbtup::handleUpdateReq(Signal* signal,
-                           Operationrec* const regOperPtr,
-                           Fragrecord* const regFragPtr,
-                           Tablerec* const regTabPtr,
-                           Page* const pagePtr) 
-{
-  PagePtr copyPagePtr;
-  Uint32 tuple_size = regTabPtr->tupheadsize;
-
-//---------------------------------------------------
-/* --- MAKE A COPY OF THIS TUPLE ON A COPY PAGE --- */
-//---------------------------------------------------
-  Uint32 RpageOffsetC;
-  if (!allocTh(regFragPtr,
-               regTabPtr,
-               COPY_PAGE,
-               signal,
-               RpageOffsetC,
-               copyPagePtr)) {
-    TUPKEY_abort(signal, 1);
-    return -1;
-  }//if
-  Uint32 RpageIdC = copyPagePtr.i;
-  Uint32 RfragPageIdC = copyPagePtr.p->pageWord[ZPAGE_FRAG_PAGE_ID_POS];
-  Uint32 indexC = ((RpageOffsetC - ZPAGE_HEADER_SIZE) / tuple_size) << 1;
-  regOperPtr->pageIndexC = indexC;
-  regOperPtr->fragPageIdC = RfragPageIdC;
-  regOperPtr->realPageIdC = RpageIdC;
-  regOperPtr->pageOffsetC = RpageOffsetC;
-  /* -------------------------------------------------------------- */
-  /* IF WE HAVE AN ONGING CHECKPOINT WE HAVE TO LOG THE ALLOCATION  */
-  /* OF THE TUPLE HEADER TO BE ABLE TO DELETE IT UPON RESTART       */
-  /* THE ONLY DATA EXCEPT THE TYPE, PAGE, INDEX IS THE SIZE TO FREE */
-  /* -------------------------------------------------------------- */
-  if (isUndoLoggingActive(regFragPtr)) {
-    if (isPageUndoLogged(regFragPtr, RfragPageIdC)) {
-      jam();
-      regOperPtr->undoLogged = true;
-      cprAddUndoLogRecord(signal,
-                          ZLCPR_TYPE_DELETE_TH,
-                          RfragPageIdC,
-                          indexC,
-                          regOperPtr->tableRef,
-                          regOperPtr->fragId,
-                          regFragPtr->checkpointVersion);
-    }//if
-    if (isPageUndoLogged(regFragPtr, regOperPtr->fragPageId)) {
-      jam();
-      cprAddUndoLogRecord(signal,
-                          ZLCPR_TYPE_UPDATE_TH,
-                          regOperPtr->fragPageId,
-                          regOperPtr->pageIndex,
-                          regOperPtr->tableRef,
-                          regOperPtr->fragId,
-                          regFragPtr->checkpointVersion);
-      cprAddData(signal,
-                 regFragPtr,
-                 regOperPtr->realPageId,
-                 tuple_size,
-                 regOperPtr->pageOffset);
-    }//if
-  }//if
-  Uint32 RwordCount = tuple_size - 1;
-  Uint32 end_dest = RpageOffsetC + tuple_size;
-  Uint32 offset = regOperPtr->pageOffset;
-  Uint32 end_source = offset + tuple_size;
-  ndbrequire(end_dest <= ZWORDS_ON_PAGE && end_source <= ZWORDS_ON_PAGE);
-  void* Tdestination = (void*)&copyPagePtr.p->pageWord[RpageOffsetC + 1];
-  const void* Tsource = (void*)&pagePtr->pageWord[offset + 1];
-  MEMCOPY_NO_WORDS(Tdestination, Tsource, RwordCount);
-
-  Uint32 prev_tup_version;
-  // nextActiveOp is before this op in event order
-  if (regOperPtr->nextActiveOp == RNIL) {
+                           Operationrec* operPtrP,
+                           Fragrecord* regFragPtr,
+                           Tablerec* regTabPtr,
+                           KeyReqStruct* req_struct,
+			   bool disk) 
+{
+  Uint32 *dst;
+  Tuple_header *base= req_struct->m_tuple_ptr, *org;
+  if ((dst= c_undo_buffer.alloc_copy_tuple(&operPtrP->m_copy_tuple_location,
+					   regTabPtr->total_rec_size)) == 0)
+  {
+    terrorCode= ZMEM_NOMEM_ERROR;
+    goto error;
+  }
+
+  Uint32 tup_version;
+  if(operPtrP->is_first_operation())
+  {
+    org= req_struct->m_tuple_ptr;
+    tup_version= org->get_tuple_version();
+  }
+  else
+  {
+    Operationrec* prevOp= req_struct->prevOpPtr.p;
+    tup_version= prevOp->tupVersion;
+    org= (Tuple_header*)c_undo_buffer.get_ptr(&prevOp->m_copy_tuple_location);
+  }
+
+  /**
+   * Check consistency before update/delete
+   */
+  req_struct->m_tuple_ptr= org;
+  if ((regTabPtr->m_bits & Tablerec::TR_Checksum) &&
+      (calculateChecksum(req_struct->m_tuple_ptr, regTabPtr) != 0)) 
+  {
+    terrorCode= ZTUPLE_CORRUPTED_ERROR;
+    goto error;
+  }
+
+  req_struct->m_tuple_ptr= (Tuple_header*)dst;
+
+  union {
+    Uint32 sizes[4];
+    Uint64 cmp[2];
+  };
+  
+  disk = disk || (org->m_header_bits & Tuple_header::DISK_INLINE);
+  if (regTabPtr->need_expand(disk))
+  {
+    expand_tuple(req_struct, sizes, org, regTabPtr, disk);
+    if(disk && operPtrP->m_undo_buffer_space == 0)
+    {
+      operPtrP->op_struct.m_wait_log_buffer = 1;
+      operPtrP->op_struct.m_load_diskpage_on_commit = 1;
+      Uint32 sz= operPtrP->m_undo_buffer_space= 
+	(sizeof(Dbtup::Disk_undo::Update) >> 2) + sizes[DD] - 1;
+      
+      terrorCode= c_lgman->alloc_log_space(regFragPtr->m_logfile_group_id,
+					   sz);
+      if(unlikely(terrorCode))
+      {
+	operPtrP->m_undo_buffer_space= 0;
+	goto error;
+      }
+    }
+  }
+  else
+  {
+    memcpy(dst, org, 4*regTabPtr->m_offsets[MM].m_fix_header_size);
+  }
+  
+  tup_version= (tup_version + 1) & ZTUP_VERSION_MASK;
+  operPtrP->tupVersion= tup_version;
+  
+  int retValue;
+  if (!req_struct->interpreted_exec) {
     jam();
-    prev_tup_version = ((const Uint32*)Tsource)[0];
+    retValue= updateAttributes(req_struct,
+                               &cinBuffer[0],
+                               req_struct->attrinfo_len);
   } else {
-    OperationrecPtr prevOperPtr;
     jam();
-    prevOperPtr.i = regOperPtr->nextActiveOp;
-    ptrCheckGuard(prevOperPtr, cnoOfOprec, operationrec);
-    prev_tup_version = prevOperPtr.p->tupVersion;
-  }//if
-  regOperPtr->tupVersion = (prev_tup_version + 1) &
-                           ((1 << ZTUP_VERSION_BITS) - 1);
-  // global variable alert
-  ndbassert(operationrec + operPtr.i == regOperPtr);
-  copyPagePtr.p->pageWord[RpageOffsetC] = operPtr.i;
+    retValue= interpreterStartLab(signal, req_struct);
+  }
 
-  return updateStartLab(signal, regOperPtr, regTabPtr, pagePtr);
-}//Dbtup::handleUpdateReq()
+  if (retValue == -1) {
+    goto error;
+  }
+  
+  if (regTabPtr->need_shrink())
+  {  
+    shrink_tuple(req_struct, sizes+2, regTabPtr, disk);
+    if (cmp[0] != cmp[1] && handle_size_change_after_update(req_struct,
+							    base,
+							    operPtrP,
+							    regFragPtr,
+							    regTabPtr,
+							    sizes)) {
+      goto error;
+    }
+  }
+  
+  req_struct->m_tuple_ptr->set_tuple_version(tup_version);
+  if (regTabPtr->m_bits & Tablerec::TR_Checksum) {
+    jam();
+    setChecksum(req_struct->m_tuple_ptr, regTabPtr);
+  }
+  return retValue;
+  
+error:
+  tupkeyErrorLab(signal);  
+  return -1;
+}
 
 /* ---------------------------------------------------------------- */
 /* ----------------------------- INSERT --------------------------- */
 /* ---------------------------------------------------------------- */
+void
+Dbtup::prepare_initial_insert(KeyReqStruct *req_struct, 
+			      Operationrec* regOperPtr,
+			      Tablerec* regTabPtr)
+{
+  Uint32 disk_undo = regTabPtr->m_no_of_disk_attributes ? 
+    sizeof(Dbtup::Disk_undo::Alloc) >> 2 : 0;
+  regOperPtr->nextActiveOp= RNIL;
+  regOperPtr->prevActiveOp= RNIL;
+  regOperPtr->op_struct.in_active_list= true;
+  regOperPtr->m_undo_buffer_space= disk_undo; 
+  
+  req_struct->check_offset[MM]= regTabPtr->get_check_offset(MM);
+  req_struct->check_offset[DD]= regTabPtr->get_check_offset(DD);
+  
+  Uint32 num_attr= regTabPtr->m_no_of_attributes;
+  Uint32 descr_start= regTabPtr->tabDescriptor;
+  Uint32 order_desc= regTabPtr->m_real_order_descriptor;
+  TableDescriptor *tab_descr= &tableDescriptor[descr_start];
+  ndbrequire(descr_start + (num_attr << ZAD_LOG_SIZE) <= cnoOfTabDescrRec);
+  req_struct->attr_descr= tab_descr; 
+  Uint16* order= (Uint16*)&tableDescriptor[order_desc];
+
+  const Uint32 cnt1= regTabPtr->m_attributes[MM].m_no_of_varsize;
+  const Uint32 cnt2= regTabPtr->m_attributes[DD].m_no_of_varsize;
+  Uint32 *ptr= req_struct->m_tuple_ptr->get_var_part_ptr(regTabPtr);
+
+  if(cnt1)
+  {
+    KeyReqStruct::Var_data* dst= &req_struct->m_var_data[MM];
+    dst->m_data_ptr= (char*)(((Uint16*)ptr)+cnt1+1);
+    dst->m_offset_array_ptr= req_struct->var_pos_array;
+    dst->m_var_len_offset= cnt1;
+    dst->m_max_var_offset= regTabPtr->m_offsets[MM].m_max_var_offset;
+    // Disk part is 32-bit aligned
+    ptr= ALIGN_WORD(dst->m_data_ptr+regTabPtr->m_offsets[MM].m_max_var_offset);
+    order += regTabPtr->m_attributes[MM].m_no_of_fixsize;
+    Uint32 pos= 0;
+    Uint16 *pos_ptr = req_struct->var_pos_array;
+    Uint16 *len_ptr = pos_ptr + cnt1;
+    for(Uint32 i= 0; i<cnt1; i++)
+    {
+      * pos_ptr++ = pos;
+      * len_ptr++ = pos;
+      pos += AttributeDescriptor::getSizeInBytes(tab_descr[*order++].tabDescr);
+    }
+  } 
+  else
+  {
+    ptr -= Tuple_header::HeaderSize;
+  }
+
+  req_struct->m_disk_ptr= (Tuple_header*)ptr;
+  
+  if(cnt2)
+  {
+    KeyReqStruct::Var_data *dst= &req_struct->m_var_data[DD];
+    ptr=((Tuple_header*)ptr)->m_data+regTabPtr->m_offsets[DD].m_varpart_offset;
+    dst->m_data_ptr= (char*)(((Uint16*)ptr)+cnt2+1);
+    dst->m_offset_array_ptr= req_struct->var_pos_array + (cnt1 << 1);
+    dst->m_var_len_offset= cnt2;
+    dst->m_max_var_offset= regTabPtr->m_offsets[DD].m_max_var_offset;
+  }
+  
+  // Set all null bits
+  memset(req_struct->m_tuple_ptr->m_null_bits+
+	 regTabPtr->m_offsets[MM].m_null_offset, 0xFF, 
+	 4*regTabPtr->m_offsets[MM].m_null_words);
+  memset(req_struct->m_disk_ptr->m_null_bits+
+	 regTabPtr->m_offsets[DD].m_null_offset, 0xFF, 
+	 4*regTabPtr->m_offsets[DD].m_null_words);
+  req_struct->m_tuple_ptr->m_header_bits= 
+    disk_undo ? (Tuple_header::DISK_ALLOC | Tuple_header::DISK_INLINE) : 0;
+}
+
+void
+Dbtup::fix_disk_insert_no_mem_insert(KeyReqStruct *req_struct, 
+				     Operationrec* regOperPtr,
+				     Tablerec* regTabPtr)
+{
+  regOperPtr->m_undo_buffer_space= sizeof(Dbtup::Disk_undo::Alloc);
+  req_struct->check_offset[DD]= regTabPtr->get_check_offset(DD);
+  
+  const Uint32 cnt1= regTabPtr->m_attributes[MM].m_no_of_varsize;
+  const Uint32 cnt2= regTabPtr->m_attributes[DD].m_no_of_varsize;
+  Uint32 *ptr= req_struct->m_tuple_ptr->get_var_part_ptr(regTabPtr);
+
+  if(cnt1)
+  {
+    // Disk part is 32-bit aligned
+    char *varptr = req_struct->m_var_data[MM].m_data_ptr;
+    ptr= ALIGN_WORD(varptr + regTabPtr->m_offsets[MM].m_max_var_offset);
+  } 
+  else
+  {
+    ptr -= Tuple_header::HeaderSize;
+  }
+
+  req_struct->m_disk_ptr= (Tuple_header*)ptr;
+  
+  if(cnt2)
+  {
+    KeyReqStruct::Var_data *dst= &req_struct->m_var_data[DD];
+    ptr=((Tuple_header*)ptr)->m_data+regTabPtr->m_offsets[DD].m_varpart_offset;
+    dst->m_data_ptr= (char*)(((Uint16*)ptr)+cnt2+1);
+    dst->m_offset_array_ptr= req_struct->var_pos_array + (cnt1 << 1);
+    dst->m_var_len_offset= cnt2;
+    dst->m_max_var_offset= regTabPtr->m_offsets[DD].m_max_var_offset;
+  }
+  
+  // Set all null bits
+  memset(req_struct->m_disk_ptr->m_null_bits+
+	 regTabPtr->m_offsets[DD].m_null_offset, 0xFF, 
+	 4*regTabPtr->m_offsets[DD].m_null_words);
+  req_struct->m_tuple_ptr->m_header_bits = 
+    (Tuple_header::DISK_ALLOC | Tuple_header::DISK_INLINE);
+}
+
 int Dbtup::handleInsertReq(Signal* signal,
-                           Operationrec* const regOperPtr,
-                           Fragrecord* const regFragPtr,
-                           Tablerec* const regTabPtr,
-                           Page* const pagePtr) 
-{
-  Uint32 ret_value;
-
-  if (regOperPtr->nextActiveOp != RNIL) {
-    jam();
-    OperationrecPtr prevExecOpPtr;
-    prevExecOpPtr.i = regOperPtr->nextActiveOp;
-    ptrCheckGuard(prevExecOpPtr, cnoOfOprec, operationrec);
-    if (prevExecOpPtr.p->optype != ZDELETE) {
-      terrorCode = ZINSERT_ERROR;
-      tupkeyErrorLab(signal);
-      return -1;
-    }//if
-    ret_value = handleUpdateReq(signal, regOperPtr,
-                                regFragPtr, regTabPtr, pagePtr);
-  } else {
+                           Ptr<Operationrec> regOperPtr,
+                           Ptr<Fragrecord> fragPtr,
+                           Tablerec* regTabPtr,
+                           KeyReqStruct *req_struct)
+{
+  Uint32 tup_version = 1;
+  Fragrecord* regFragPtr = fragPtr.p;
+  Uint32 *dst, *ptr= 0;
+  Tuple_header *base= req_struct->m_tuple_ptr, *org= base;
+  Tuple_header *tuple_ptr;
+    
+  bool disk = regTabPtr->m_no_of_disk_attributes > 0;
+  bool mem_insert = get_tuple_state(regOperPtr.p) == TUPLE_INITIAL_INSERT;
+  bool disk_insert = regOperPtr.p->is_first_operation() && disk;
+  bool varsize = regTabPtr->m_attributes[MM].m_no_of_varsize;
+  bool rowid = req_struct->m_use_rowid;
+  Uint32 real_page_id = regOperPtr.p->m_tuple_location.m_page_no;
+  Uint32 frag_page_id = req_struct->frag_page_id;
+
+  union {
+    Uint32 sizes[4];
+    Uint64 cmp[2];
+  };
+
+  if (ERROR_INSERTED(4014))
+  {
+    dst = 0;
+    goto undo_buffer_error;
+  }
+
+  dst= c_undo_buffer.alloc_copy_tuple(&regOperPtr.p->m_copy_tuple_location,
+				      regTabPtr->total_rec_size);
+  if (unlikely(dst == 0))
+  {
+    goto undo_buffer_error;
+  }
+  tuple_ptr= req_struct->m_tuple_ptr= (Tuple_header*)dst;
+
+  if(mem_insert)
+  {
     jam();
-    regOperPtr->tupVersion = 0;
-    ret_value = updateStartLab(signal, regOperPtr, regTabPtr, pagePtr);
-  }//if
-  if (ret_value != (Uint32)-1) {
-    if (checkNullAttributes(regOperPtr, regTabPtr)) {
-      jam();
-      return 0;
-    }//if
-    TUPKEY_abort(signal, 17);
-  }//if
+    ndbassert(regOperPtr.p->is_first_operation()); // disk insert
+    prepare_initial_insert(req_struct, regOperPtr.p, regTabPtr);
+  }
+  else
+  {
+    if (!regOperPtr.p->is_first_operation())
+    {
+      Operationrec* prevOp= req_struct->prevOpPtr.p;
+      ndbassert(prevOp->op_struct.op_type == ZDELETE);
+      tup_version= prevOp->tupVersion + 1;
+      
+      if(!prevOp->is_first_operation())
+	org= (Tuple_header*)c_undo_buffer.get_ptr(&prevOp->m_copy_tuple_location);
+    }
+
+    if (regTabPtr->need_expand())
+      expand_tuple(req_struct, sizes, org, regTabPtr, !disk_insert);
+    else
+      memcpy(dst, org, 4*regTabPtr->m_offsets[MM].m_fix_header_size);
+  }
+  
+  if (disk_insert)
+  {
+    int res;
+    if (unlikely(!mem_insert))
+    {
+      sizes[DD] = sizes[DD+2] = regTabPtr->m_offsets[DD].m_fix_header_size;
+      fix_disk_insert_no_mem_insert(req_struct, regOperPtr.p, regTabPtr);
+    }
+    
+    if (ERROR_INSERTED(4015))
+    {
+      terrorCode = 1501;
+      goto log_space_error;
+    }
+
+    res= c_lgman->alloc_log_space(regFragPtr->m_logfile_group_id,
+				  regOperPtr.p->m_undo_buffer_space);
+    if(unlikely(res))
+    {
+      terrorCode= res;
+      goto log_space_error;
+    }
+  }
+  
+  regOperPtr.p->tupVersion= tup_version & ZTUP_VERSION_MASK;
+  tuple_ptr->set_tuple_version(tup_version);
+
+  if (ERROR_INSERTED(4016))
+  {
+    terrorCode = ZAI_INCONSISTENCY_ERROR;
+    goto update_error;
+  }
+
+  if(unlikely(updateAttributes(req_struct, &cinBuffer[0], 
+			       req_struct->attrinfo_len) == -1))
+  {
+    goto update_error;
+  }
+
+  if (ERROR_INSERTED(4017))
+  {
+    goto null_check_error;
+  }
+  if (unlikely(checkNullAttributes(req_struct, regTabPtr) == false))
+  {
+    goto null_check_error;
+  }
+  
+  if (regTabPtr->need_shrink())
+  {  
+    shrink_tuple(req_struct, sizes+2, regTabPtr, true);
+  }
+  
+  /**
+   * Alloc memory
+   */
+  if(mem_insert)
+  {
+    if (!rowid)
+    {
+      if (ERROR_INSERTED(4018))
+      {
+	goto mem_error;
+      }
+
+      if (!varsize)
+      {
+	jam();
+	ptr= alloc_fix_rec(regFragPtr,
+			   regTabPtr,
+			   &regOperPtr.p->m_tuple_location,
+			   &frag_page_id);
+      } 
+      else 
+      {
+	jam();
+	regOperPtr.p->m_tuple_location.m_file_no= sizes[2+MM];
+	ptr= alloc_var_rec(regFragPtr, regTabPtr,
+			   sizes[2+MM],
+			   &regOperPtr.p->m_tuple_location,
+			   &frag_page_id);
+      }
+      if (unlikely(ptr == 0))
+      {
+	goto mem_error;
+      }
+      req_struct->m_use_rowid = true;
+    }
+    else
+    {
+      regOperPtr.p->m_tuple_location = req_struct->m_row_id;
+      if (ERROR_INSERTED(4019))
+      {
+	terrorCode = ZROWID_ALLOCATED;
+	goto alloc_rowid_error;
+      }
+      
+      if (!varsize)
+      {
+	jam();
+	ptr= alloc_fix_rowid(regFragPtr,
+			     regTabPtr,
+			     &regOperPtr.p->m_tuple_location,
+			     &frag_page_id);
+      } 
+      else 
+      {
+	jam();
+	regOperPtr.p->m_tuple_location.m_file_no= sizes[2+MM];
+	ptr= alloc_var_rowid(regFragPtr, regTabPtr,
+			     sizes[2+MM],
+			     &regOperPtr.p->m_tuple_location,
+			     &frag_page_id);
+      }
+      if (unlikely(ptr == 0))
+      {
+	goto alloc_rowid_error;
+      }
+    }
+    real_page_id = regOperPtr.p->m_tuple_location.m_page_no;
+    regOperPtr.p->m_tuple_location.m_page_no= frag_page_id;
+    c_lqh->accminupdate(signal,
+			regOperPtr.p->userpointer,
+			&regOperPtr.p->m_tuple_location);
+    
+    base = (Tuple_header*)ptr;
+    base->m_operation_ptr_i= regOperPtr.i;
+    base->m_header_bits= Tuple_header::ALLOC | 
+      (varsize ? Tuple_header::CHAINED_ROW : 0);
+    regOperPtr.p->m_tuple_location.m_page_no = real_page_id;
+  }
+  else if(!rowid || !regOperPtr.p->is_first_operation())
+  {
+    int ret;
+    if (ERROR_INSERTED(4020))
+    {
+      goto size_change_error;
+    }
+
+    if (regTabPtr->need_shrink() && cmp[0] != cmp[1] &&
+	unlikely(ret = handle_size_change_after_update(req_struct,
+						       base,
+						       regOperPtr.p,
+						       regFragPtr,
+						       regTabPtr,
+						       sizes)))
+    {
+      goto size_change_error;
+    }
+    req_struct->m_use_rowid = false;
+    base->m_header_bits &= ~(Uint32)Tuple_header::FREE;
+  }
+  else
+  {
+    if ((req_struct->m_row_id.m_page_no == frag_page_id &&
+	 req_struct->m_row_id.m_page_idx == regOperPtr.p->m_tuple_location.m_page_idx))
+    {
+      ndbout_c("no mem insert but rowid (same)");
+      base->m_header_bits &= ~(Uint32)Tuple_header::FREE;
+    }
+    else
+    {
+      // no mem insert, but rowid
+      ndbrequire(false);
+    }
+  }
+
+  base->m_header_bits |= Tuple_header::ALLOC & 
+    (regOperPtr.p->is_first_operation() ? ~0 : 1);
+  
+  if (disk_insert)
+  {
+    Local_key tmp;
+    Uint32 size= regTabPtr->m_attributes[DD].m_no_of_varsize == 0 ? 
+      1 : sizes[2+DD];
+    
+    if (ERROR_INSERTED(4021))
+    {
+      terrorCode = 1601;
+      goto disk_prealloc_error;
+    }
+    
+    int ret= disk_page_prealloc(signal, fragPtr, &tmp, size);
+    if (unlikely(ret < 0))
+    {
+      terrorCode = -ret;
+      goto disk_prealloc_error;
+    }
+    
+    regOperPtr.p->op_struct.m_disk_preallocated= 1;
+    tmp.m_page_idx= size;
+    memcpy(tuple_ptr->get_disk_ref_ptr(regTabPtr), &tmp, sizeof(tmp));
+    
+    /**
+     * Set ref from disk to mm
+     */
+    Local_key ref = regOperPtr.p->m_tuple_location;
+    ref.m_page_no = frag_page_id;
+    
+    Tuple_header* disk_ptr= req_struct->m_disk_ptr;
+    disk_ptr->m_header_bits = 0;
+    disk_ptr->m_base_record_ref= ref.ref();
+  }
+  
+  if (regTabPtr->m_bits & Tablerec::TR_Checksum) 
+  {
+    jam();
+    setChecksum(req_struct->m_tuple_ptr, regTabPtr);
+  }
+  return 0;
+  
+size_change_error:
+  jam();
+  terrorCode = ZMEM_NOMEM_ERROR;
+  goto exit_error;
+  
+undo_buffer_error:
+  jam();
+  terrorCode= ZMEM_NOMEM_ERROR;
+  regOperPtr.p->m_undo_buffer_space = 0;
+  if (mem_insert)
+    regOperPtr.p->m_tuple_location.setNull();
+  regOperPtr.p->m_copy_tuple_location.setNull();
+  tupkeyErrorLab(signal);  
   return -1;
-}//Dbtup::handleInsertReq()
+  
+null_check_error:
+  jam();
+  terrorCode= ZNO_ILLEGAL_NULL_ATTR;
+  goto update_error;
+
+mem_error:
+  jam();
+  terrorCode= ZMEM_NOMEM_ERROR;
+  goto update_error;
+
+log_space_error:
+  jam();
+  regOperPtr.p->m_undo_buffer_space = 0;
+alloc_rowid_error:
+  jam();
+update_error:
+  jam();
+  if (mem_insert)
+  {
+    regOperPtr.p->op_struct.in_active_list = false;
+    regOperPtr.p->m_tuple_location.setNull();
+  }
+exit_error:
+  tupkeyErrorLab(signal);
+  return -1;
+
+disk_prealloc_error:
+  base->m_header_bits |= Tuple_header::FREED;
+  goto exit_error;
+}
 
 /* ---------------------------------------------------------------- */
 /* ---------------------------- DELETE ---------------------------- */
 /* ---------------------------------------------------------------- */
 int Dbtup::handleDeleteReq(Signal* signal,
-                           Operationrec* const regOperPtr,
-                           Fragrecord* const regFragPtr,
-                           Tablerec* const regTabPtr,
-                           Page* const pagePtr)
+                           Operationrec* regOperPtr,
+                           Fragrecord* regFragPtr,
+                           Tablerec* regTabPtr,
+                           KeyReqStruct *req_struct)
 {
   // delete must set but not increment tupVersion
-  if (regOperPtr->nextActiveOp != RNIL) {
-    OperationrecPtr prevExecOpPtr;
-    prevExecOpPtr.i = regOperPtr->nextActiveOp;
-    ptrCheckGuard(prevExecOpPtr, cnoOfOprec, operationrec);
-    regOperPtr->tupVersion = prevExecOpPtr.p->tupVersion;
-  } else {
-    jam();
-    regOperPtr->tupVersion = pagePtr->pageWord[regOperPtr->pageOffset + 1];
+  if (!regOperPtr->is_first_operation())
+  {
+    Operationrec* prevOp= req_struct->prevOpPtr.p;
+    regOperPtr->tupVersion= prevOp->tupVersion;
+    regOperPtr->m_copy_tuple_location= prevOp->m_copy_tuple_location;
+  } 
+  else 
+  {
+    regOperPtr->tupVersion= req_struct->m_tuple_ptr->get_tuple_version();
+    if(regTabPtr->m_no_of_disk_attributes)
+    {
+      Uint32 sz;
+      if(regTabPtr->m_attributes[DD].m_no_of_varsize)
+      {
+	/**
+	 * Need to have page in memory to read size 
+	 *   to alloc undo space
+	 */
+	abort();
+      }
+      else
+	sz= (sizeof(Dbtup::Disk_undo::Free) >> 2) + 
+	  regTabPtr->m_offsets[DD].m_fix_header_size - 1;
+      
+      regOperPtr->m_undo_buffer_space= sz;
+      
+      int res;
+      if((res= c_lgman->alloc_log_space(regFragPtr->m_logfile_group_id, 
+					sz)))
+      {
+	terrorCode= res;
+	regOperPtr->m_undo_buffer_space= 0;
+	goto error;
+      }
+      
+    }
   }
-  if (isUndoLoggingNeeded(regFragPtr, regOperPtr->fragPageId)) {
-    jam();
-    cprAddUndoLogRecord(signal,
-                        ZINDICATE_NO_OP_ACTIVE,
-                        regOperPtr->fragPageId,
-                        regOperPtr->pageIndex,
-                        regOperPtr->tableRef,
-                        regOperPtr->fragId,
-                        regFragPtr->checkpointVersion);
-  }//if
-  if (regOperPtr->attrinbufLen == 0) {
+  if (req_struct->attrinfo_len == 0)
+  {
     return 0;
-  }//if
-/* ------------------------------------------------------------------------ */
-/* THE APPLICATION WANTS TO READ THE TUPLE BEFORE IT IS DELETED.            */
-/* ------------------------------------------------------------------------ */
-  return handleReadReq(signal, regOperPtr, regTabPtr, pagePtr);
-}//Dbtup::handleDeleteReq()
-
-int
-Dbtup::updateStartLab(Signal* signal,
-                      Operationrec* const regOperPtr,
-                      Tablerec* const regTabPtr,
-                      Page* const pagePtr)
-{
-  int retValue;
-  if (regOperPtr->optype == ZINSERT) {
-    jam();
-    setNullBits(pagePtr, regTabPtr, regOperPtr->pageOffset);
   }
-  if (regOperPtr->interpretedExec != 1) {
-    jam();
-    retValue = updateAttributes(pagePtr,
-                                regOperPtr->pageOffset,
-                                &cinBuffer[0],
-                                regOperPtr->attrinbufLen);
-    if (retValue == -1) {
-      tupkeyErrorLab(signal);
-      return -1;
-    }//if
-  } else {
-    jam();
-    retValue = interpreterStartLab(signal, pagePtr, regOperPtr->pageOffset);
-  }//if
-  ndbrequire(regOperPtr->tupVersion != ZNIL);
-  pagePtr->pageWord[regOperPtr->pageOffset + 1] = regOperPtr->tupVersion;
-  if (regTabPtr->checksumIndicator) {
-    jam();
-    setChecksum(pagePtr, regOperPtr->pageOffset, regTabPtr->tupheadsize);
-  }//if
-  return retValue;
-}//Dbtup::updateStartLab()
+  
+  return handleReadReq(signal, regOperPtr, regTabPtr, req_struct);
 
-void
-Dbtup::setNullBits(Page* const regPage, Tablerec* const regTabPtr, Uint32 pageOffset)
-{
-  Uint32 noOfExtraNullWords = regTabPtr->tupNullWords;
-  Uint32 nullOffsetStart = regTabPtr->tupNullIndex + pageOffset;
-  ndbrequire((noOfExtraNullWords + nullOffsetStart) < ZWORDS_ON_PAGE);
-  for (Uint32 i = 0; i < noOfExtraNullWords; i++) {
-    regPage->pageWord[nullOffsetStart + i] = 0xFFFFFFFF;
-  }//for
-}//Dbtup::setNullBits()
+error:
+  tupkeyErrorLab(signal);
+  return -1;
+}
 
 bool
-Dbtup::checkNullAttributes(Operationrec* const regOperPtr,
-                           Tablerec* const regTabPtr)
+Dbtup::checkNullAttributes(KeyReqStruct * req_struct,
+                           Tablerec* regTabPtr)
 {
 // Implement checking of updating all not null attributes in an insert here.
   Bitmask<MAXNROFATTRIBUTESINWORDS> attributeMask;  
@@ -1158,14 +1598,14 @@
    * XXX remove or fix
    */
   attributeMask.clear();
-  attributeMask.bitOR(regOperPtr->changeMask);
+  attributeMask.bitOR(req_struct->changeMask);
   attributeMask.bitAND(regTabPtr->notNullAttributeMask);
   attributeMask.bitXOR(regTabPtr->notNullAttributeMask);
   if (!attributeMask.isclear()) {
     return false;
-  }//if
+  }
   return true;
-}//Dbtup::checkNullAttributes()
+}
 
 /* ---------------------------------------------------------------- */
 /* THIS IS THE START OF THE INTERPRETED EXECUTION OF UPDATES. WE    */
@@ -1213,46 +1653,44 @@
 /* ----------------- INTERPRETED EXECUTION  ----------------------- */
 /* ---------------------------------------------------------------- */
 int Dbtup::interpreterStartLab(Signal* signal,
-                               Page* const pagePtr,
-                               Uint32 TupHeadOffset) 
+                               KeyReqStruct *req_struct)
 {
-  Operationrec *  const regOperPtr = operPtr.p;
-  Uint32 RtotalLen;
+  Operationrec *  const regOperPtr= operPtr.p;
   int TnoDataRW;
+  Uint32 RtotalLen, start_index, dstLen;
+  Uint32 *dst;
 
-  Uint32 RinitReadLen = cinBuffer[0];
-  Uint32 RexecRegionLen = cinBuffer[1];
-  Uint32 RfinalUpdateLen = cinBuffer[2];
-  Uint32 RfinalRLen = cinBuffer[3];
-  Uint32 RsubLen = cinBuffer[4];
+  Uint32 RinitReadLen= cinBuffer[0];
+  Uint32 RexecRegionLen= cinBuffer[1];
+  Uint32 RfinalUpdateLen= cinBuffer[2];
+  Uint32 RfinalRLen= cinBuffer[3];
+  Uint32 RsubLen= cinBuffer[4];
 
-  Uint32 RattrinbufLen = regOperPtr->attrinbufLen;
-  const BlockReference sendBref = regOperPtr->recBlockref;
+  Uint32 RattrinbufLen= req_struct->attrinfo_len;
+  const BlockReference sendBref= req_struct->rec_blockref;
 
-  Uint32 * dst = &signal->theData[25];
-  Uint32 dstLen = (MAX_READ / 4) - 25;
   const Uint32 node = refToNode(sendBref);
   if(node != 0 && node != getOwnNodeId()) {
-    ;
+    start_index= 25;
   } else {
     jam();
     /**
      * execute direct
      */
-    dst = &signal->theData[3];
-    dstLen = (MAX_READ / 4) - 3;
+    start_index= 3;
   }
+  dst= &signal->theData[start_index];
+  dstLen= (MAX_READ / 4) - start_index;
   
-  RtotalLen = RinitReadLen;
+  RtotalLen= RinitReadLen;
   RtotalLen += RexecRegionLen;
   RtotalLen += RfinalUpdateLen;
   RtotalLen += RfinalRLen;
   RtotalLen += RsubLen;
 
-  Uint32 RattroutCounter = 0;
-  Uint32 RinstructionCounter = 5;
-  Uint32 RlogSize = 0;
-
+  Uint32 RattroutCounter= 0;
+  Uint32 RinstructionCounter= 5;
+  Uint32 RlogSize= 0;
   if (((RtotalLen + 5) == RattrinbufLen) &&
       (RattrinbufLen >= 5) &&
       (RattrinbufLen < ZATTR_BUFFER_SIZE)) {
@@ -1269,22 +1707,21 @@
       // The first step that can be taken in the interpreter is to read
       // data of the tuple before any updates have been applied.
       /* ---------------------------------------------------------------- */
-      TnoDataRW = readAttributes(pagePtr,
-				 TupHeadOffset,
+      TnoDataRW= readAttributes(req_struct,
 				 &cinBuffer[5],
 				 RinitReadLen,
 				 &dst[0],
 				 dstLen,
                                  false);
       if (TnoDataRW != -1) {
-	RattroutCounter = TnoDataRW;
+	RattroutCounter= TnoDataRW;
 	RinstructionCounter += RinitReadLen;
       } else {
 	jam();
 	tupkeyErrorLab(signal);
 	return -1;
-      }//if
-    }//if
+      }
+    }
     if (RexecRegionLen > 0) {
       jam();
       /* ---------------------------------------------------------------- */
@@ -1292,10 +1729,9 @@
       // a register-based virtual machine which can read and write attributes
       // to and from registers.
       /* ---------------------------------------------------------------- */
-      Uint32 RsubPC = RinstructionCounter + RfinalUpdateLen + RfinalRLen;     
-      TnoDataRW = interpreterNextLab(signal,
-				     pagePtr,
-				     TupHeadOffset,
+      Uint32 RsubPC= RinstructionCounter + RfinalUpdateLen + RfinalRLen;     
+      TnoDataRW= interpreterNextLab(signal,
+                                     req_struct,
 				     &clogMemBuffer[0],
 				     &cinBuffer[RinstructionCounter],
 				     RexecRegionLen,
@@ -1305,21 +1741,20 @@
 				     sizeof(coutBuffer) / 4);
       if (TnoDataRW != -1) {
 	RinstructionCounter += RexecRegionLen;
-	RlogSize = TnoDataRW;
+	RlogSize= TnoDataRW;
       } else {
 	jam();
 	return -1;
-      }//if
-    }//if
+      }
+    }
     if (RfinalUpdateLen > 0) {
       jam();
       /* ---------------------------------------------------------------- */
       // We can also apply a set of updates without any conditions as part
       // of the interpreted execution.
       /* ---------------------------------------------------------------- */
-      if (regOperPtr->optype == ZUPDATE) {
-	TnoDataRW = updateAttributes(pagePtr,
-				     TupHeadOffset,
+      if (regOperPtr->op_struct.op_type == ZUPDATE) {
+	TnoDataRW= updateAttributes(req_struct,
 				     &cinBuffer[RinstructionCounter],
 				     RfinalUpdateLen);
 	if (TnoDataRW != -1) {
@@ -1332,19 +1767,18 @@
 	  jam();
 	  tupkeyErrorLab(signal);
 	  return -1;
-	}//if
+	}
       } else {
 	return TUPKEY_abort(signal, 19);
-      }//if
-    }//if
+      }
+    }
     if (RfinalRLen > 0) {
       jam();
       /* ---------------------------------------------------------------- */
       // The final action is that we can also read the tuple after it has
       // been updated.
       /* ---------------------------------------------------------------- */
-      TnoDataRW = readAttributes(pagePtr,
-				 TupHeadOffset,
+      TnoDataRW= readAttributes(req_struct,
 				 &cinBuffer[RinstructionCounter],
 				 RfinalRLen,
 				 &dst[RattroutCounter],
@@ -1356,19 +1790,19 @@
 	jam();
 	tupkeyErrorLab(signal);
 	return -1;
-      }//if
-    }//if
-    regOperPtr->logSize = RlogSize;
-    regOperPtr->attroutbufLen = RattroutCounter;
-    sendReadAttrinfo(signal, RattroutCounter, regOperPtr);
+      }
+    }
+    req_struct->log_size= RlogSize;
+    req_struct->read_length= RattroutCounter;
+    sendReadAttrinfo(signal, req_struct, RattroutCounter, regOperPtr);
     if (RlogSize > 0) {
       sendLogAttrinfo(signal, RlogSize, regOperPtr);
-    }//if
+    }
     return 0;
   } else {
     return TUPKEY_abort(signal, 22);
-  }//if
-}//Dbtup::interpreterStartLab()
+  }
+}
 
 /* ---------------------------------------------------------------- */
 /*       WHEN EXECUTION IS INTERPRETED WE NEED TO SEND SOME ATTRINFO*/
@@ -1383,30 +1817,28 @@
                             Operationrec *  const regOperPtr)
 
 {
-  Uint32 TbufferIndex = 0;
-  signal->theData[0] = regOperPtr->userpointer;
+  Uint32 TbufferIndex= 0;
+  signal->theData[0]= regOperPtr->userpointer;
   while (TlogSize > 22) {
     MEMCOPY_NO_WORDS(&signal->theData[3],
                      &clogMemBuffer[TbufferIndex],
                      22);
-    EXECUTE_DIRECT(refToBlock(regOperPtr->userblockref), 
-                   GSN_TUP_ATTRINFO, signal, 25);
+    EXECUTE_DIRECT(DBLQH, GSN_TUP_ATTRINFO, signal, 25);
     TbufferIndex += 22;
     TlogSize -= 22;
-  }//while
+  }
   MEMCOPY_NO_WORDS(&signal->theData[3],
                    &clogMemBuffer[TbufferIndex],
                    TlogSize);
-  EXECUTE_DIRECT(refToBlock(regOperPtr->userblockref), 
-                 GSN_TUP_ATTRINFO, signal, 3 + TlogSize);
-}//Dbtup::sendLogAttrinfo()
+  EXECUTE_DIRECT(DBLQH, GSN_TUP_ATTRINFO, signal, 3 + TlogSize);
+}
 
 inline
 Uint32 
 brancher(Uint32 TheInstruction, Uint32 TprogramCounter)
 {         
-  Uint32 TbranchDirection = TheInstruction >> 31;
-  Uint32 TbranchLength = (TheInstruction >> 16) & 0x7fff;
+  Uint32 TbranchDirection= TheInstruction >> 31;
+  Uint32 TbranchLength= (TheInstruction >> 16) & 0x7fff;
   TprogramCounter--;
   if (TbranchDirection == 1) {
     jam();
@@ -1420,12 +1852,11 @@
     /*       WE JUMP FORWARD.                                           */
     /* ---------------------------------------------------------------- */
     return (TprogramCounter + TbranchLength);
-  }//if
-}//brancher()
+  }
+}
 
 int Dbtup::interpreterNextLab(Signal* signal,
-                              Page* const pagePtr,
-                              Uint32 TupHeadOffset,
+                              KeyReqStruct* req_struct,
                               Uint32* logMemory,
                               Uint32* mainProgram,
                               Uint32 TmainProgLen,
@@ -1434,14 +1865,14 @@
 			      Uint32 * tmpArea,
 			      Uint32 tmpAreaSz)
 {
-  register Uint32* TcurrentProgram = mainProgram;
-  register Uint32 TcurrentSize = TmainProgLen;
-  register Uint32 RnoOfInstructions = 0;
-  register Uint32 TprogramCounter = 0;
+  register Uint32* TcurrentProgram= mainProgram;
+  register Uint32 TcurrentSize= TmainProgLen;
+  register Uint32 RnoOfInstructions= 0;
+  register Uint32 TprogramCounter= 0;
   register Uint32 theInstruction;
   register Uint32 theRegister;
-  Uint32 TdataWritten = 0;
-  Uint32 RstackPtr = 0;
+  Uint32 TdataWritten= 0;
+  Uint32 RstackPtr= 0;
   union {
     Uint32 TregMemBuffer[32];
     Uint64 Tdummy[16];
@@ -1454,23 +1885,23 @@
   // They are handled as 64 bit values. Thus the 32 most significant
   // bits are zeroed for 32 bit values.
   /* ---------------------------------------------------------------- */
-  TregMemBuffer[0] = 0;
-  TregMemBuffer[4] = 0;
-  TregMemBuffer[8] = 0;
-  TregMemBuffer[12] = 0;
-  TregMemBuffer[16] = 0;
-  TregMemBuffer[20] = 0;
-  TregMemBuffer[24] = 0;
-  TregMemBuffer[28] = 0;
-  Uint32 tmpHabitant = ~0;
+  TregMemBuffer[0]= 0;
+  TregMemBuffer[4]= 0;
+  TregMemBuffer[8]= 0;
+  TregMemBuffer[12]= 0;
+  TregMemBuffer[16]= 0;
+  TregMemBuffer[20]= 0;
+  TregMemBuffer[24]= 0;
+  TregMemBuffer[28]= 0;
+  Uint32 tmpHabitant= ~0;
 
   while (RnoOfInstructions < 8000) {
     /* ---------------------------------------------------------------- */
     /* EXECUTE THE NEXT INTERPRETER INSTRUCTION.                        */
     /* ---------------------------------------------------------------- */
     RnoOfInstructions++;
-    theInstruction = TcurrentProgram[TprogramCounter];
-    theRegister = Interpreter::getReg1(theInstruction) << 2;
+    theInstruction= TcurrentProgram[TprogramCounter];
+    theRegister= Interpreter::getReg1(theInstruction) << 2;
     if (TprogramCounter < TcurrentSize) {
       TprogramCounter++;
       switch (Interpreter::getOpCode(theInstruction)) {
@@ -1482,14 +1913,13 @@
 	// as long as it fits in the 64 bits of the register.
 	/* ---------------------------------------------------------------- */
 	{
-	  Uint32 theAttrinfo = theInstruction;
-	  int TnoDataRW= readAttributes(pagePtr,
-					TupHeadOffset,
-					&theAttrinfo,
-					(Uint32)1,
-					&TregMemBuffer[theRegister],
-					(Uint32)3,
-					false);
+	  Uint32 theAttrinfo= theInstruction;
+	  int TnoDataRW= readAttributes(req_struct,
+				     &theAttrinfo,
+				     (Uint32)1,
+				     &TregMemBuffer[theRegister],
+				     (Uint32)3,
+                                     false);
 	  if (TnoDataRW == 2) {
 	    /* ------------------------------------------------------------- */
 	    // Two words read means that we get the instruction plus one 32 
@@ -1503,17 +1933,17 @@
 	    // Three words read means that we get the instruction plus two 
 	    // 32 words read. Thus we set the register to be a 64 bit register.
 	    /* ------------------------------------------------------------- */
-	    TregMemBuffer[theRegister] = 0x60;
-            TregMemBuffer[theRegister+3] = TregMemBuffer[theRegister+2];
-            TregMemBuffer[theRegister+2] = TregMemBuffer[theRegister+1];
+	    TregMemBuffer[theRegister]= 0x60;
+            TregMemBuffer[theRegister+3]= TregMemBuffer[theRegister+2];
+            TregMemBuffer[theRegister+2]= TregMemBuffer[theRegister+1];
 	  } else if (TnoDataRW == 1) {
 	    /* ------------------------------------------------------------- */
 	    // One word read means that we must have read a NULL value. We set
 	    // the register to indicate a NULL value.
 	    /* ------------------------------------------------------------- */
-	    TregMemBuffer[theRegister] = 0;
-	    TregMemBuffer[theRegister + 2] = 0;
-	    TregMemBuffer[theRegister + 3] = 0;
+	    TregMemBuffer[theRegister]= 0;
+	    TregMemBuffer[theRegister + 2]= 0;
+	    TregMemBuffer[theRegister + 3]= 0;
 	  } else if (TnoDataRW == -1) {
 	    jam();
 	    tupkeyErrorLab(signal);
@@ -1524,18 +1954,18 @@
 	    // allowed and will lead to a system crash.
 	    /* ------------------------------------------------------------- */
 	    ndbrequire(false);
-	  }//if
+	  }
 	  break;
 	}
 
       case Interpreter::WRITE_ATTR_FROM_REG:
 	jam();
 	{
-	  Uint32 TattrId = theInstruction >> 16;
-	  Uint32 TattrDescrIndex = tabptr.p->tabDescriptor +
+	  Uint32 TattrId= theInstruction >> 16;
+	  Uint32 TattrDescrIndex= tabptr.p->tabDescriptor +
 	    (TattrId << ZAD_LOG_SIZE);
-	  Uint32 TattrDesc1 = tableDescriptor[TattrDescrIndex].tabDescr;
-	  Uint32 TregType = TregMemBuffer[theRegister];
+	  Uint32 TattrDesc1= tableDescriptor[TattrDescrIndex].tabDescr;
+	  Uint32 TregType= TregMemBuffer[theRegister];
 
 	  /* --------------------------------------------------------------- */
 	  // Calculate the number of words of this attribute.
@@ -1543,16 +1973,16 @@
 	  // register size.
 	  /* --------------------------------------------------------------- */
           Uint32 TattrNoOfWords = AttributeDescriptor::getSizeInWords(TattrDesc1);
-	  Uint32 Toptype = operPtr.p->optype;
-
+	  Uint32 Toptype = operPtr.p->op_struct.op_type;
 	  Uint32 TdataForUpdate[3];
 	  Uint32 Tlen;
 
-	  AttributeHeader& ah = AttributeHeader::init(&TdataForUpdate[0], 
-						      TattrId, TattrNoOfWords);
-	  TdataForUpdate[1] = TregMemBuffer[theRegister + 2];
-	  TdataForUpdate[2] = TregMemBuffer[theRegister + 3];
-	  Tlen = TattrNoOfWords + 1;
+	  AttributeHeader& ah= AttributeHeader::init(&TdataForUpdate[0], 
+						      TattrId,
+                                                      TattrNoOfWords << 2);
+	  TdataForUpdate[1]= TregMemBuffer[theRegister + 2];
+	  TdataForUpdate[2]= TregMemBuffer[theRegister + 3];
+	  Tlen= TattrNoOfWords + 1;
 	  if (Toptype == ZUPDATE) {
 	    if (TattrNoOfWords <= 2) {
               if (TattrNoOfWords == 1) {
@@ -1565,77 +1995,78 @@
 		// Write a NULL value into the attribute
 		/* --------------------------------------------------------- */
 		ah.setNULL();
-		Tlen = 1;
-	      }//if
-	      int TnoDataRW= updateAttributes(pagePtr,
-					      TupHeadOffset,
-					      &TdataForUpdate[0],
-					      Tlen);
+		Tlen= 1;
+	      }
+	      int TnoDataRW= updateAttributes(req_struct,
+					   &TdataForUpdate[0],
+					   Tlen);
 	      if (TnoDataRW != -1) {
 		/* --------------------------------------------------------- */
 		// Write the written data also into the log buffer so that it 
 		// will be logged.
 		/* --------------------------------------------------------- */
-		logMemory[TdataWritten + 0] = TdataForUpdate[0];
-		logMemory[TdataWritten + 1] = TdataForUpdate[1];
-		logMemory[TdataWritten + 2] = TdataForUpdate[2];
+		logMemory[TdataWritten + 0]= TdataForUpdate[0];
+		logMemory[TdataWritten + 1]= TdataForUpdate[1];
+		logMemory[TdataWritten + 2]= TdataForUpdate[2];
 		TdataWritten += Tlen;
 	      } else {
 		tupkeyErrorLab(signal);
 		return -1;
-	      }//if
+	      }
 	    } else {
 	      return TUPKEY_abort(signal, 15);
-	    }//if
+	    }
 	  } else {
 	    return TUPKEY_abort(signal, 16);
-	  }//if
+	  }
 	  break;
 	}
 
       case Interpreter::LOAD_CONST_NULL:
 	jam();
-	TregMemBuffer[theRegister] = 0;	/* NULL INDICATOR */
+	TregMemBuffer[theRegister]= 0;	/* NULL INDICATOR */
 	break;
 
       case Interpreter::LOAD_CONST16:
 	jam();
-	TregMemBuffer[theRegister] = 0x50;	/* 32 BIT UNSIGNED CONSTANT */
-	* (Int64*)(TregMemBuffer+theRegister+2) = theInstruction >> 16;
+	TregMemBuffer[theRegister]= 0x50;	/* 32 BIT UNSIGNED CONSTANT */
+	* (Int64*)(TregMemBuffer+theRegister+2)= theInstruction >> 16;
 	break;
 
       case Interpreter::LOAD_CONST32:
 	jam();
-	TregMemBuffer[theRegister] = 0x50;	/* 32 BIT UNSIGNED CONSTANT */
-	* (Int64*)(TregMemBuffer+theRegister+2) = * 
+	TregMemBuffer[theRegister]= 0x50;	/* 32 BIT UNSIGNED CONSTANT */
+	* (Int64*)(TregMemBuffer+theRegister+2)= * 
 	  (TcurrentProgram+TprogramCounter);
 	TprogramCounter++;
 	break;
 
       case Interpreter::LOAD_CONST64:
 	jam();
-	TregMemBuffer[theRegister] = 0x60;	/* 64 BIT UNSIGNED CONSTANT */
-        TregMemBuffer[theRegister + 2 ] = * (TcurrentProgram + TprogramCounter++);
-        TregMemBuffer[theRegister + 3 ] = * (TcurrentProgram + TprogramCounter++);
+	TregMemBuffer[theRegister]= 0x60;	/* 64 BIT UNSIGNED CONSTANT */
+        TregMemBuffer[theRegister + 2 ]= * (TcurrentProgram +
+                                             TprogramCounter++);
+        TregMemBuffer[theRegister + 3 ]= * (TcurrentProgram +
+                                             TprogramCounter++);
 	break;
 
       case Interpreter::ADD_REG_REG:
 	jam();
 	{
-	  Uint32 TrightRegister = Interpreter::getReg2(theInstruction) << 2;
-	  Uint32 TdestRegister = Interpreter::getReg3(theInstruction) << 2;
+	  Uint32 TrightRegister= Interpreter::getReg2(theInstruction) << 2;
+	  Uint32 TdestRegister= Interpreter::getReg3(theInstruction) << 2;
 
-	  Uint32 TrightType = TregMemBuffer[TrightRegister];
-	  Int64 Tright0 = * (Int64*)(TregMemBuffer + TrightRegister + 2);
+	  Uint32 TrightType= TregMemBuffer[TrightRegister];
+	  Int64 Tright0= * (Int64*)(TregMemBuffer + TrightRegister + 2);
 	  
 
-	  Uint32 TleftType = TregMemBuffer[theRegister];
-	  Int64 Tleft0 = * (Int64*)(TregMemBuffer + theRegister + 2);
+	  Uint32 TleftType= TregMemBuffer[theRegister];
+	  Int64 Tleft0= * (Int64*)(TregMemBuffer + theRegister + 2);
          
 	  if ((TleftType | TrightType) != 0) {
-	    Uint64 Tdest0 = Tleft0 + Tright0;
-	    * (Int64*)(TregMemBuffer+TdestRegister+2) = Tdest0;
-	    TregMemBuffer[TdestRegister] = 0x60;
+	    Uint64 Tdest0= Tleft0 + Tright0;
+	    * (Int64*)(TregMemBuffer+TdestRegister+2)= Tdest0;
+	    TregMemBuffer[TdestRegister]= 0x60;
 	  } else {
 	    return TUPKEY_abort(signal, 20);
 	  }
@@ -1645,19 +2076,19 @@
       case Interpreter::SUB_REG_REG:
 	jam();
 	{
-	  Uint32 TrightRegister = Interpreter::getReg2(theInstruction) << 2;
-	  Uint32 TdestRegister = Interpreter::getReg3(theInstruction) << 2;
+	  Uint32 TrightRegister= Interpreter::getReg2(theInstruction) << 2;
+	  Uint32 TdestRegister= Interpreter::getReg3(theInstruction) << 2;
 
-	  Uint32 TrightType = TregMemBuffer[TrightRegister];
-	  Int64 Tright0 = * (Int64*)(TregMemBuffer + TrightRegister + 2);
+	  Uint32 TrightType= TregMemBuffer[TrightRegister];
+	  Int64 Tright0= * (Int64*)(TregMemBuffer + TrightRegister + 2);
 	  
-	  Uint32 TleftType = TregMemBuffer[theRegister];
-	  Int64 Tleft0 = * (Int64*)(TregMemBuffer + theRegister + 2);
+	  Uint32 TleftType= TregMemBuffer[theRegister];
+	  Int64 Tleft0= * (Int64*)(TregMemBuffer + theRegister + 2);
          
 	  if ((TleftType | TrightType) != 0) {
-	    Int64 Tdest0 = Tleft0 - Tright0;
-	    * (Int64*)(TregMemBuffer+TdestRegister+2) = Tdest0;
-	    TregMemBuffer[TdestRegister] = 0x60;
+	    Int64 Tdest0= Tleft0 - Tright0;
+	    * (Int64*)(TregMemBuffer+TdestRegister+2)= Tdest0;
+	    TregMemBuffer[TdestRegister]= 0x60;
 	  } else {
 	    return TUPKEY_abort(signal, 20);
 	  }
@@ -1665,7 +2096,7 @@
 	}
 
       case Interpreter::BRANCH:
-	TprogramCounter = brancher(theInstruction, TprogramCounter);
+	TprogramCounter= brancher(theInstruction, TprogramCounter);
 	break;
 
       case Interpreter::BRANCH_REG_EQ_NULL:
@@ -1674,8 +2105,8 @@
 	  continue;
 	} else {
 	  jam();
-	  TprogramCounter = brancher(theInstruction, TprogramCounter);
-	}//if
+	  TprogramCounter= brancher(theInstruction, TprogramCounter);
+	}
 	break;
 
       case Interpreter::BRANCH_REG_NE_NULL:
@@ -1684,140 +2115,140 @@
 	  continue;
 	} else {
 	  jam();
-	  TprogramCounter = brancher(theInstruction, TprogramCounter);
-	}//if
+	  TprogramCounter= brancher(theInstruction, TprogramCounter);
+	}
 	break;
 
 
       case Interpreter::BRANCH_EQ_REG_REG:
 	{
-	  Uint32 TrightRegister = Interpreter::getReg2(theInstruction) << 2;
+	  Uint32 TrightRegister= Interpreter::getReg2(theInstruction) << 2;
 
-	  Uint32 TleftType = TregMemBuffer[theRegister];
-	  Uint32 Tleft0    = TregMemBuffer[theRegister + 2];
-	  Uint32 Tleft1    = TregMemBuffer[theRegister + 3];
-
-	  Uint32 TrightType = TregMemBuffer[TrightRegister];
-	  Uint32 Tright0 = TregMemBuffer[TrightRegister + 2];
-	  Uint32 Tright1 = TregMemBuffer[TrightRegister + 3];
+	  Uint32 TleftType= TregMemBuffer[theRegister];
+	  Uint32 Tleft0= TregMemBuffer[theRegister + 2];
+	  Uint32 Tleft1= TregMemBuffer[theRegister + 3];
+
+	  Uint32 TrightType= TregMemBuffer[TrightRegister];
+	  Uint32 Tright0= TregMemBuffer[TrightRegister + 2];
+	  Uint32 Tright1= TregMemBuffer[TrightRegister + 3];
 	  if ((TrightType | TleftType) != 0) {
 	    jam();
 	    if ((Tleft0 == Tright0) && (Tleft1 == Tright1)) {
-	      TprogramCounter = brancher(theInstruction, TprogramCounter);
-	    }//if
+	      TprogramCounter= brancher(theInstruction, TprogramCounter);
+	    }
 	  } else {
 	    return TUPKEY_abort(signal, 23);
-	  }//if
+	  }
 	  break;
 	}
 
       case Interpreter::BRANCH_NE_REG_REG:
 	{
-	  Uint32 TrightRegister = Interpreter::getReg2(theInstruction) << 2;
+	  Uint32 TrightRegister= Interpreter::getReg2(theInstruction) << 2;
 
-	  Uint32 TleftType = TregMemBuffer[theRegister];
-	  Uint32 Tleft0    = TregMemBuffer[theRegister + 2];
-	  Uint32 Tleft1    = TregMemBuffer[theRegister + 3];
-
-	  Uint32 TrightType = TregMemBuffer[TrightRegister];
-	  Uint32 Tright0 = TregMemBuffer[TrightRegister + 2];
-	  Uint32 Tright1 = TregMemBuffer[TrightRegister + 3];
+	  Uint32 TleftType= TregMemBuffer[theRegister];
+	  Uint32 Tleft0= TregMemBuffer[theRegister + 2];
+	  Uint32 Tleft1= TregMemBuffer[theRegister + 3];
+
+	  Uint32 TrightType= TregMemBuffer[TrightRegister];
+	  Uint32 Tright0= TregMemBuffer[TrightRegister + 2];
+	  Uint32 Tright1= TregMemBuffer[TrightRegister + 3];
 	  if ((TrightType | TleftType) != 0) {
 	    jam();
 	    if ((Tleft0 != Tright0) || (Tleft1 != Tright1)) {
-	      TprogramCounter = brancher(theInstruction, TprogramCounter);
-	    }//if
+	      TprogramCounter= brancher(theInstruction, TprogramCounter);
+	    }
 	  } else {
 	    return TUPKEY_abort(signal, 24);
-	  }//if
+	  }
 	  break;
 	}
 
       case Interpreter::BRANCH_LT_REG_REG:
 	{
-	  Uint32 TrightRegister = Interpreter::getReg2(theInstruction) << 2;
+	  Uint32 TrightRegister= Interpreter::getReg2(theInstruction) << 2;
 
-	  Uint32 TrightType = TregMemBuffer[TrightRegister];
-	  Int64 Tright0 = * (Int64*)(TregMemBuffer + TrightRegister + 2);
+	  Uint32 TrightType= TregMemBuffer[TrightRegister];
+	  Int64 Tright0= * (Int64*)(TregMemBuffer + TrightRegister + 2);
 	  
-	  Uint32 TleftType = TregMemBuffer[theRegister];
-	  Int64 Tleft0 = * (Int64*)(TregMemBuffer + theRegister + 2);
+	  Uint32 TleftType= TregMemBuffer[theRegister];
+	  Int64 Tleft0= * (Int64*)(TregMemBuffer + theRegister + 2);
          
 
 	  if ((TrightType | TleftType) != 0) {
 	    jam();
 	    if (Tleft0 < Tright0) {
-	      TprogramCounter = brancher(theInstruction, TprogramCounter);
-	    }//if
+	      TprogramCounter= brancher(theInstruction, TprogramCounter);
+	    }
 	  } else {
 	    return TUPKEY_abort(signal, 24);
-	  }//if
+	  }
 	  break;
 	}
 
       case Interpreter::BRANCH_LE_REG_REG:
 	{
-	  Uint32 TrightRegister = Interpreter::getReg2(theInstruction) << 2;
+	  Uint32 TrightRegister= Interpreter::getReg2(theInstruction) << 2;
 
-	  Uint32 TrightType = TregMemBuffer[TrightRegister];
-	  Int64 Tright0 = * (Int64*)(TregMemBuffer + TrightRegister + 2);
+	  Uint32 TrightType= TregMemBuffer[TrightRegister];
+	  Int64 Tright0= * (Int64*)(TregMemBuffer + TrightRegister + 2);
 	  
-	  Uint32 TleftType = TregMemBuffer[theRegister];
-	  Int64 Tleft0 = * (Int64*)(TregMemBuffer + theRegister + 2);
+	  Uint32 TleftType= TregMemBuffer[theRegister];
+	  Int64 Tleft0= * (Int64*)(TregMemBuffer + theRegister + 2);
 	  
 
 	  if ((TrightType | TleftType) != 0) {
 	    jam();
 	    if (Tleft0 <= Tright0) {
-	      TprogramCounter = brancher(theInstruction, TprogramCounter);
-	    }//if
+	      TprogramCounter= brancher(theInstruction, TprogramCounter);
+	    }
 	  } else {
 	    return TUPKEY_abort(signal, 26);
-	  }//if
+	  }
 	  break;
 	}
 
       case Interpreter::BRANCH_GT_REG_REG:
 	{
-	  Uint32 TrightRegister = Interpreter::getReg2(theInstruction) << 2;
+	  Uint32 TrightRegister= Interpreter::getReg2(theInstruction) << 2;
 
-	  Uint32 TrightType = TregMemBuffer[TrightRegister];
-	  Int64 Tright0 = * (Int64*)(TregMemBuffer + TrightRegister + 2);
+	  Uint32 TrightType= TregMemBuffer[TrightRegister];
+	  Int64 Tright0= * (Int64*)(TregMemBuffer + TrightRegister + 2);
 	  
-	  Uint32 TleftType = TregMemBuffer[theRegister];
-	  Int64 Tleft0 = * (Int64*)(TregMemBuffer + theRegister + 2);
+	  Uint32 TleftType= TregMemBuffer[theRegister];
+	  Int64 Tleft0= * (Int64*)(TregMemBuffer + theRegister + 2);
 	  
 
 	  if ((TrightType | TleftType) != 0) {
 	    jam();
 	    if (Tleft0 > Tright0){
-	      TprogramCounter = brancher(theInstruction, TprogramCounter);
-	    }//if
+	      TprogramCounter= brancher(theInstruction, TprogramCounter);
+	    }
 	  } else {
 	    return TUPKEY_abort(signal, 27);
-	  }//if
+	  }
 	  break;
 	}
 
       case Interpreter::BRANCH_GE_REG_REG:
 	{
-	  Uint32 TrightRegister = Interpreter::getReg2(theInstruction) << 2;
+	  Uint32 TrightRegister= Interpreter::getReg2(theInstruction) << 2;
 
-	  Uint32 TrightType = TregMemBuffer[TrightRegister];
-	  Int64 Tright0 = * (Int64*)(TregMemBuffer + TrightRegister + 2);
+	  Uint32 TrightType= TregMemBuffer[TrightRegister];
+	  Int64 Tright0= * (Int64*)(TregMemBuffer + TrightRegister + 2);
 	  
-	  Uint32 TleftType = TregMemBuffer[theRegister];
-	  Int64 Tleft0 = * (Int64*)(TregMemBuffer + theRegister + 2);
+	  Uint32 TleftType= TregMemBuffer[theRegister];
+	  Int64 Tleft0= * (Int64*)(TregMemBuffer + theRegister + 2);
 	  
 
 	  if ((TrightType | TleftType) != 0) {
 	    jam();
 	    if (Tleft0 >= Tright0){
-	      TprogramCounter = brancher(theInstruction, TprogramCounter);
-	    }//if
+	      TprogramCounter= brancher(theInstruction, TprogramCounter);
+	    }
 	  } else {
 	    return TUPKEY_abort(signal, 28);
-	  }//if
+	  }
 	  break;
 	}
 
@@ -1829,8 +2260,7 @@
 	Uint32 argLen = Interpreter::getBranchCol_Len(ins2);
 
 	if(tmpHabitant != attrId){
-	  Int32 TnoDataR = readAttributes(pagePtr,
-					  TupHeadOffset,
+	  Int32 TnoDataR = readAttributes(req_struct,
 					  &attrId, 1,
 					  tmpArea, tmpAreaSz,
                                           false);
@@ -1840,7 +2270,7 @@
 	    tupkeyErrorLab(signal);
 	    return -1;
 	  }
-	  tmpHabitant = attrId;
+	  tmpHabitant= attrId;
 	}
 
         // get type
@@ -1931,12 +2361,11 @@
 	
       case Interpreter::BRANCH_ATTR_EQ_NULL:{
 	jam();
-	Uint32 ins2 = TcurrentProgram[TprogramCounter];
-	Uint32 attrId = Interpreter::getBranchCol_AttrId(ins2) << 16;
+	Uint32 ins2= TcurrentProgram[TprogramCounter];
+	Uint32 attrId= Interpreter::getBranchCol_AttrId(ins2) << 16;
 	
-	if(tmpHabitant != attrId){
-	  Int32 TnoDataR = readAttributes(pagePtr,
-					  TupHeadOffset,
+	if (tmpHabitant != attrId){
+	  Int32 TnoDataR= readAttributes(req_struct,
 					  &attrId, 1,
 					  tmpArea, tmpAreaSz,
                                           false);
@@ -1946,12 +2375,12 @@
 	    tupkeyErrorLab(signal);
 	    return -1;
 	  }
-	  tmpHabitant = attrId;
+	  tmpHabitant= attrId;
 	}
 	
 	AttributeHeader ah(tmpArea[0]);
-	if(ah.isNULL()){
-	  TprogramCounter = brancher(theInstruction, TprogramCounter);
+	if (ah.isNULL()){
+	  TprogramCounter= brancher(theInstruction, TprogramCounter);
 	} else {
 	  TprogramCounter ++;
 	}
@@ -1960,12 +2389,11 @@
 
       case Interpreter::BRANCH_ATTR_NE_NULL:{
 	jam();
-	Uint32 ins2 = TcurrentProgram[TprogramCounter];
-	Uint32 attrId = Interpreter::getBranchCol_AttrId(ins2) << 16;
+	Uint32 ins2= TcurrentProgram[TprogramCounter];
+	Uint32 attrId= Interpreter::getBranchCol_AttrId(ins2) << 16;
 	
-	if(tmpHabitant != attrId){
-	  Int32 TnoDataR = readAttributes(pagePtr,
-					  TupHeadOffset,
+	if (tmpHabitant != attrId){
+	  Int32 TnoDataR= readAttributes(req_struct,
 					  &attrId, 1,
 					  tmpArea, tmpAreaSz,
                                           false);
@@ -1975,14 +2403,14 @@
 	    tupkeyErrorLab(signal);
 	    return -1;
 	  }
-	  tmpHabitant = attrId;
+	  tmpHabitant= attrId;
 	}
 	
 	AttributeHeader ah(tmpArea[0]);
-	if(ah.isNULL()){
+	if (ah.isNULL()){
 	  TprogramCounter ++;
 	} else {
-	  TprogramCounter = brancher(theInstruction, TprogramCounter);
+	  TprogramCounter= brancher(theInstruction, TprogramCounter);
 	}
 	break;
       }
@@ -1999,7 +2427,7 @@
 #ifdef TRACE_INTERPRETER
 	ndbout_c(" - exit_ok_last");
 #endif
-	operPtr.p->lastRow = 1;
+	req_struct->last_row= true;
 	return TdataWritten;
 	
       case Interpreter::EXIT_REFUSE:
@@ -2007,52 +2435,846 @@
 #ifdef TRACE_INTERPRETER
 	ndbout_c(" - exit_nok");
 #endif
-	terrorCode = theInstruction >> 16;
+	terrorCode= theInstruction >> 16;
 	return TUPKEY_abort(signal, 29);
 
       case Interpreter::CALL:
 	jam();
 	RstackPtr++;
 	if (RstackPtr < 32) {
-	  TstackMemBuffer[RstackPtr] = TprogramCounter + 1;
-	  TprogramCounter = theInstruction >> 16;
+	  TstackMemBuffer[RstackPtr]= TprogramCounter + 1;
+	  TprogramCounter= theInstruction >> 16;
 	  if (TprogramCounter < TsubroutineLen) {
-	    TcurrentProgram = subroutineProg;
-	    TcurrentSize = TsubroutineLen;
+	    TcurrentProgram= subroutineProg;
+	    TcurrentSize= TsubroutineLen;
 	  } else {
 	    return TUPKEY_abort(signal, 30);
-	  }//if
+	  }
 	} else {
 	  return TUPKEY_abort(signal, 31);
-	}//if
+	}
 	break;
 
       case Interpreter::RETURN:
 	jam();
 	if (RstackPtr > 0) {
-	  TprogramCounter = TstackMemBuffer[RstackPtr];
+	  TprogramCounter= TstackMemBuffer[RstackPtr];
 	  RstackPtr--;
 	  if (RstackPtr == 0) {
 	    jam();
 	    /* ------------------------------------------------------------- */
 	    // We are back to the main program.
 	    /* ------------------------------------------------------------- */
-	    TcurrentProgram = mainProgram;
-	    TcurrentSize = TmainProgLen;
-	  }//if
+	    TcurrentProgram= mainProgram;
+	    TcurrentSize= TmainProgLen;
+	  }
 	} else {
 	  return TUPKEY_abort(signal, 32);
-	}//if
+	}
 	break;
 
       default:
 	return TUPKEY_abort(signal, 33);
-      }//switch
+      }
     } else {
       return TUPKEY_abort(signal, 34);
-    }//if
-  }//while
+    }
+  }
   return TUPKEY_abort(signal, 35);
-}//Dbtup::interpreterNextLab()
+}
+
+/**
+ * expand_var_part - copy packed variable attributes to fully expanded size
+ * 
+ * dst:        where to start writing attribute data
+ * dst_off_ptr where to write attribute offsets
+ * src         pointer to packed attributes
+ * tabDesc     array of attribute descriptors (used for getting max size)
+ * no_of_attr  no of atributes to expand
+ */
+Uint32*
+expand_var_part(Dbtup::KeyReqStruct::Var_data *dst, 
+		const Uint32* src, 
+		const Uint32 * tabDesc, 
+		const Uint16* order)
+{
+  char* dst_ptr= dst->m_data_ptr;
+  Uint32 no_attr= dst->m_var_len_offset;
+  Uint16* dst_off_ptr= dst->m_offset_array_ptr;
+  Uint16* dst_len_ptr= dst_off_ptr + no_attr;
+  const Uint16* src_off_ptr= (const Uint16*)src;
+  const char* src_ptr= (const char*)(src_off_ptr + no_attr + 1);
+  
+  Uint16 tmp= *src_off_ptr++, next_pos, len, max_len, dst_off= 0;
+  for(Uint32 i = 0; i<no_attr; i++)
+  {
+    next_pos= *src_off_ptr++;
+    len= next_pos - tmp;
+    
+    *dst_off_ptr++ = dst_off; 
+    *dst_len_ptr++ = dst_off + len;
+    memcpy(dst_ptr, src_ptr, len);
+    src_ptr += len;
+    
+    max_len= AttributeDescriptor::getSizeInBytes(tabDesc[* order++]);
+    dst_ptr += max_len; // Max size
+    dst_off += max_len;
+    
+    tmp= next_pos;
+  }
+  
+  return ALIGN_WORD(dst_ptr);
+}
+
+void
+Dbtup::expand_tuple(KeyReqStruct* req_struct, 
+		    Uint32 sizes[2],
+		    Tuple_header* src, 
+		    const Tablerec* tabPtrP,
+		    bool disk)
+{
+  Uint32 bits= src->m_header_bits;
+  Tuple_header* ptr= req_struct->m_tuple_ptr;
+  
+  Uint16 dd_tot= tabPtrP->m_no_of_disk_attributes;
+  Uint16 mm_vars= tabPtrP->m_attributes[MM].m_no_of_varsize;
+  Uint32 fix_size= tabPtrP->m_offsets[MM].m_varpart_offset;
+  Uint32 order_desc= tabPtrP->m_real_order_descriptor;
+
+  Uint32 *dst_ptr= ptr->get_var_part_ptr(tabPtrP);
+  const Uint32 *disk_ref= src->get_disk_ref_ptr(tabPtrP);
+  const Uint32 *src_ptr= src->get_var_part_ptr(tabPtrP);
+  const Uint32 * desc= (Uint32*)req_struct->attr_descr;
+  const Uint16 *order = (Uint16*)(&tableDescriptor[order_desc]);
+  order += tabPtrP->m_attributes[MM].m_no_of_fixsize;
+  
+  if(mm_vars)
+  {
+
+    Uint32 step; // in bytes
+    const Uint32 *src_data= src_ptr;
+    KeyReqStruct::Var_data* dst= &req_struct->m_var_data[MM];
+    if(bits & Tuple_header::CHAINED_ROW)
+    {
+      Ptr<Page> var_page;
+      src_data= get_ptr(&var_page, * (Var_part_ref*)src_ptr);
+      step= 4;
+      sizes[MM]= (2 + (mm_vars << 1) + ((Uint16*)src_data)[mm_vars] + 3) >> 2;
+      req_struct->m_varpart_page_ptr = var_page;
+    }
+    else
+    {
+      step= (2 + (mm_vars << 1) + ((Uint16*)src_ptr)[mm_vars]);
+      sizes[MM]= (step + 3) >> 2;
+      req_struct->m_varpart_page_ptr = req_struct->m_page_ptr;
+    }
+    dst->m_data_ptr= (char*)(((Uint16*)dst_ptr)+mm_vars+1);
+    dst->m_offset_array_ptr= req_struct->var_pos_array;
+    dst->m_var_len_offset= mm_vars;
+    dst->m_max_var_offset= tabPtrP->m_offsets[MM].m_max_var_offset;
+    
+    dst_ptr= expand_var_part(dst, src_data, desc, order);
+    ndbassert(dst_ptr == ALIGN_WORD(dst->m_data_ptr + dst->m_max_var_offset));
+    ndbassert((UintPtr(src_ptr) & 3) == 0);
+    src_ptr = ALIGN_WORD(((char*)src_ptr)+step);
+    
+    sizes[MM] += fix_size + Tuple_header::HeaderSize;
+    memcpy(ptr, src, 4*(fix_size + Tuple_header::HeaderSize));
+  } 
+  else 
+  {
+    sizes[MM]= 1;
+    dst_ptr -= Tuple_header::HeaderSize;
+    src_ptr -= Tuple_header::HeaderSize;
+    memcpy(ptr, src, 4*fix_size);
+  }
+
+  src->m_header_bits= bits & 
+    ~(Uint32)(Tuple_header::MM_SHRINK | Tuple_header::MM_GROWN);
+  
+  sizes[DD]= 0;
+  if(disk && dd_tot)
+  {
+    const Uint16 dd_vars= tabPtrP->m_attributes[DD].m_no_of_varsize;
+    order += mm_vars;
+    
+    if(bits & Tuple_header::DISK_INLINE)
+    {
+      // Only on copy tuple
+      ndbassert((bits & Tuple_header::CHAINED_ROW) == 0);
+    }
+    else
+    {
+      Local_key key;
+      memcpy(&key, disk_ref, sizeof(key));
+      key.m_page_no= req_struct->m_disk_page_ptr.i;
+      src_ptr= get_dd_ptr(&req_struct->m_disk_page_ptr, &key, tabPtrP);
+    }
+    bits |= Tuple_header::DISK_INLINE;
+
+    // Fix diskpart
+    req_struct->m_disk_ptr= (Tuple_header*)dst_ptr;
+    memcpy(dst_ptr, src_ptr, 4*tabPtrP->m_offsets[DD].m_fix_header_size);
+    sizes[DD] = tabPtrP->m_offsets[DD].m_fix_header_size;
+    
+    ndbassert(! (req_struct->m_disk_ptr->m_header_bits & Tuple_header::FREE));
+    
+    if(dd_vars)
+    {
+      KeyReqStruct::Var_data* dst= &req_struct->m_var_data[DD];
+      dst_ptr += tabPtrP->m_offsets[DD].m_varpart_offset;
+      src_ptr += tabPtrP->m_offsets[DD].m_varpart_offset;
+      order += tabPtrP->m_attributes[DD].m_no_of_fixsize;
+      
+      dst->m_data_ptr= (char*)(char*)(((Uint16*)dst_ptr)+dd_vars+1);
+      dst->m_offset_array_ptr= req_struct->var_pos_array + (mm_vars << 1);
+      dst->m_var_len_offset= dd_vars;
+      dst->m_max_var_offset= tabPtrP->m_offsets[DD].m_max_var_offset;
+
+      expand_var_part(dst, src_ptr, desc, order);
+    }
+  }
+  
+  ptr->m_header_bits= (bits & ~(Uint32)(Tuple_header::CHAINED_ROW));
+}
+
+void
+Dbtup::prepare_read(KeyReqStruct* req_struct, 
+		    Tablerec* tabPtrP, bool disk)
+{
+  Tuple_header* ptr= req_struct->m_tuple_ptr;
+  
+  Uint32 bits= ptr->m_header_bits;
+  Uint16 dd_tot= tabPtrP->m_no_of_disk_attributes;
+  Uint16 mm_vars= tabPtrP->m_attributes[MM].m_no_of_varsize;
+  
+  const Uint32 *src_ptr= ptr->get_var_part_ptr(tabPtrP);
+  const Uint32 *disk_ref= ptr->get_disk_ref_ptr(tabPtrP);
+  
+  if(mm_vars)
+  {
+    const Uint32 *src_data= src_ptr;
+    KeyReqStruct::Var_data* dst= &req_struct->m_var_data[MM];
+    if(bits & Tuple_header::CHAINED_ROW)
+    {
+#if VM_TRACE
+      
+#endif
+      src_data= get_ptr(* (Var_part_ref*)src_ptr);
+    }
+    dst->m_data_ptr= (char*)(((Uint16*)src_data)+mm_vars+1);
+    dst->m_offset_array_ptr= (Uint16*)src_data;
+    dst->m_var_len_offset= 1;
+    dst->m_max_var_offset= ((Uint16*)src_data)[mm_vars];
+    
+    // disk part start after varsize (aligned)
+    src_ptr = ALIGN_WORD(dst->m_data_ptr + dst->m_max_var_offset);
+  } 
+  else
+  {
+    // disk part if after fixsize part...
+    src_ptr -= Tuple_header::HeaderSize; 
+  }
+  
+  if(disk && dd_tot)
+  {
+    const Uint16 dd_vars= tabPtrP->m_attributes[DD].m_no_of_varsize;
+    
+    if(bits & Tuple_header::DISK_INLINE)
+    {
+      // Only on copy tuple
+      ndbassert((bits & Tuple_header::CHAINED_ROW) == 0);
+    }
+    else
+    {
+      // XXX
+      Local_key key;
+      memcpy(&key, disk_ref, sizeof(key));
+      key.m_page_no= req_struct->m_disk_page_ptr.i;
+      src_ptr= get_dd_ptr(&req_struct->m_disk_page_ptr, &key, tabPtrP);
+    }
+    // Fix diskpart
+    req_struct->m_disk_ptr= (Tuple_header*)src_ptr;
+    ndbassert(! (req_struct->m_disk_ptr->m_header_bits & Tuple_header::FREE));
+    if(dd_vars)
+    {
+      KeyReqStruct::Var_data* dst= &req_struct->m_var_data[DD];
+      src_ptr += tabPtrP->m_offsets[DD].m_varpart_offset;
+      
+      dst->m_data_ptr= (char*)(char*)(((Uint16*)src_ptr)+dd_vars+1);
+      dst->m_offset_array_ptr= (Uint16*)src_ptr;
+      dst->m_var_len_offset= 1;
+      dst->m_max_var_offset= ((Uint16*)src_ptr)[dd_vars];
+    }
+  }
+}
 
+void
+Dbtup::shrink_tuple(KeyReqStruct* req_struct, Uint32 sizes[2],
+		    const Tablerec* tabPtrP, bool disk)
+{
+  ndbassert(tabPtrP->need_shrink());
+  Tuple_header* ptr= req_struct->m_tuple_ptr;
+  
+  Uint16 dd_tot= tabPtrP->m_no_of_disk_attributes;
+  Uint16 mm_vars= tabPtrP->m_attributes[MM].m_no_of_varsize;
+  Uint16 dd_vars= tabPtrP->m_attributes[DD].m_no_of_varsize;
+  
+  Uint32 *dst_ptr= ptr->get_var_part_ptr(tabPtrP);
+  Uint16* src_off_ptr= req_struct->var_pos_array;
+
+  sizes[MM]= sizes[DD]= 0;
+  if(mm_vars)
+  {
+    Uint16* dst_off_ptr= (Uint16*)dst_ptr;
+    char*  dst_data_ptr= (char*)(dst_off_ptr + mm_vars + 1);
+    char*  src_data_ptr= dst_data_ptr;
+    Uint32 off= 0;
+    for(Uint32 i= 0; i<mm_vars; i++)
+    {
+      const char* data_ptr= src_data_ptr + *src_off_ptr;
+      Uint32 len= src_off_ptr[mm_vars] - *src_off_ptr;
+      * dst_off_ptr++= off;
+      memmove(dst_data_ptr, data_ptr, len);
+      off += len;
+      src_off_ptr++;
+      dst_data_ptr += len;
+    }
+    *dst_off_ptr= off;
+    ndbassert(dst_data_ptr <= ((char*)ptr) + 8192);
+    ndbassert((UintPtr(ptr) & 3) == 0);
+    sizes[MM]= (dst_data_ptr + 3 - ((char*)ptr)) >> 2;
 
+    dst_ptr = ALIGN_WORD(dst_data_ptr);
+  }
+  else
+  {
+    sizes[MM] = 1;
+    dst_ptr -= Tuple_header::HeaderSize;
+  }
+  
+  if(disk && dd_tot)
+  {
+    Uint32 * src_ptr = (Uint32*)req_struct->m_disk_ptr;
+    req_struct->m_disk_ptr = (Tuple_header*)dst_ptr;
+    if (unlikely(dd_vars))
+    {
+      abort();
+    }
+    else
+    {
+      sizes[DD] = tabPtrP->m_offsets[DD].m_fix_header_size;
+      memmove(dst_ptr, src_ptr, 4*tabPtrP->m_offsets[DD].m_fix_header_size);
+    }
+  }
+}
+
+void
+Dbtup::validate_page(Tablerec* regTabPtr, Var_page* p)
+{
+  Uint32 mm_vars= regTabPtr->m_attributes[MM].m_no_of_varsize;
+  Uint32 fix_sz= regTabPtr->m_offsets[MM].m_fix_header_size + 
+    Tuple_header::HeaderSize;
+    
+  if(mm_vars == 0)
+    return;
+  
+  for(Uint32 F= 0; F<MAX_FRAG_PER_NODE; F++)
+  {
+    FragrecordPtr fragPtr;
+
+    if((fragPtr.i = regTabPtr->fragrec[F]) == RNIL)
+      continue;
+
+    ptrCheckGuard(fragPtr, cnoOfFragrec, fragrecord);
+    for(Uint32 P= 0; P<fragPtr.p->noOfPages; P++)
+    {
+      Uint32 real= getRealpid(fragPtr.p, P);
+      Var_page* page= (Var_page*)c_page_pool.getPtr(real);
+
+      for(Uint32 i=1; i<page->high_index; i++)
+      {
+	Uint32 idx= page->get_index_word(i);
+	Uint32 len = (idx & Var_page::LEN_MASK) >> Var_page::LEN_SHIFT;
+	if(!(idx & Var_page::FREE) && !(idx & Var_page::CHAIN))
+	{
+	  Tuple_header *ptr= (Tuple_header*)page->get_ptr(i);
+	  Uint32 *part= ptr->get_var_part_ptr(regTabPtr);
+	  if(ptr->m_header_bits & Tuple_header::CHAINED_ROW)
+	  {
+	    ndbassert(len == fix_sz + 1);
+	    Local_key tmp; tmp.assref(*part);
+	    Ptr<Page> tmpPage;
+	    part= get_ptr(&tmpPage, *(Var_part_ref*)part);
+	    len= ((Var_page*)tmpPage.p)->get_entry_len(tmp.m_page_idx);
+	    Uint32 sz= ((mm_vars + 1) << 1) + (((Uint16*)part)[mm_vars]);
+	    ndbassert(len >= ((sz + 3) >> 2));
+	  } 
+	  else
+	  {
+	    Uint32 sz= ((mm_vars + 1) << 1) + (((Uint16*)part)[mm_vars]);
+	    ndbassert(len >= ((sz+3)>>2)+fix_sz);
+	  }
+	  if(ptr->m_operation_ptr_i != RNIL)
+	  {
+	    c_operation_pool.getPtr(ptr->m_operation_ptr_i);
+	  }
+	} 
+	else if(!(idx & Var_page::FREE))
+	{
+	  /**
+	   * Chain
+	   */
+	  Uint32 *part= page->get_ptr(i);
+	  Uint32 sz= ((mm_vars + 1) << 1) + (((Uint16*)part)[mm_vars]);
+	  ndbassert(len >= ((sz + 3) >> 2));
+	} 
+	else 
+	{
+	  
+	}
+      }
+      if(p == 0 && page->high_index > 1)
+	page->reorg((Var_page*)ctemp_page);
+    }
+  }
+  
+  if(p == 0)
+  {
+    validate_page(regTabPtr, (Var_page*)1);
+  }
+}
+
+int
+Dbtup::handle_size_change_after_update(KeyReqStruct* req_struct,
+				       Tuple_header* org,
+				       Operationrec* regOperPtr,
+				       Fragrecord* regFragPtr,
+				       Tablerec* regTabPtr,
+				       Uint32 sizes[4])
+{
+  ndbrequire(sizes[1] == sizes[3]);
+  //ndbout_c("%d %d %d %d", sizes[0], sizes[1], sizes[2], sizes[3]);
+  if(0)
+    printf("%p %d %d - handle_size_change_after_update ",
+	   req_struct->m_tuple_ptr,
+	   regOperPtr->m_tuple_location.m_page_no,
+	   regOperPtr->m_tuple_location.m_page_idx);
+  
+  Uint32 bits= org->m_header_bits;
+  Uint32 copy_bits= req_struct->m_tuple_ptr->m_header_bits;
+  Uint32 fix_sz = Tuple_header::HeaderSize + 
+    regTabPtr->m_offsets[MM].m_fix_header_size;
+  
+  if(sizes[MM] == sizes[2+MM])
+    ;
+  else if(sizes[MM] > sizes[2+MM])
+  {
+    if(0) ndbout_c("shrink");
+    copy_bits |= Tuple_header::MM_SHRINK;
+  }
+  else
+  {
+    if(0) printf("grow - ");
+    Ptr<Page> pagePtr = req_struct->m_varpart_page_ptr;
+    Var_page* pageP= (Var_page*)pagePtr.p;
+    Uint32 idx, alloc, needed;
+    Uint32 *refptr = org->get_var_part_ptr(regTabPtr);
+    ndbassert(bits & Tuple_header::CHAINED_ROW);
+
+    Local_key ref;
+    ref.assref(*refptr);
+    idx= ref.m_page_idx;
+    if (! (copy_bits & Tuple_header::CHAINED_ROW))
+    {
+      c_page_pool.getPtr(pagePtr, ref.m_page_no);
+      pageP = (Var_page*)pagePtr.p;
+    }
+    alloc= pageP->get_entry_len(idx);
+#ifdef VM_TRACE
+    if(!pageP->get_entry_chain(idx))
+      ndbout << *pageP << endl;
+#endif
+    ndbassert(pageP->get_entry_chain(idx));
+    needed= sizes[2+MM] - fix_sz;
+    
+    if(needed <= alloc)
+    {
+      //ndbassert(!regOperPtr->is_first_operation());
+      ndbout_c(" no grow");
+      return 0;
+    }
+    copy_bits |= Tuple_header::MM_GROWN;
+    if (unlikely(realloc_var_part(regFragPtr, regTabPtr, pagePtr, 
+				  (Var_part_ref*)refptr, alloc, needed)))
+      return -1;
+  }
+  req_struct->m_tuple_ptr->m_header_bits = copy_bits;
+  return 0;
+}
+
+int
+Dbtup::nr_update_gci(Uint32 fragPtrI, const Local_key* key, Uint32 gci)
+{
+  FragrecordPtr fragPtr;
+  fragPtr.i= fragPtrI;
+  ptrCheckGuard(fragPtr, cnoOfFragrec, fragrecord);
+  TablerecPtr tablePtr;
+  tablePtr.i= fragPtr.p->fragTableId;
+  ptrCheckGuard(tablePtr, cnoOfTablerec, tablerec);
+
+  if (tablePtr.p->m_bits & Tablerec::TR_RowGCI)
+  {
+    Local_key tmp = *key;
+    PagePtr page_ptr;
+
+    int ret;
+    if (tablePtr.p->m_attributes[MM].m_no_of_varsize)
+    {
+      tablePtr.p->m_offsets[MM].m_fix_header_size += 
+	Tuple_header::HeaderSize+1;
+      ret = alloc_page(tablePtr.p, fragPtr.p, &page_ptr, tmp.m_page_no);
+      tablePtr.p->m_offsets[MM].m_fix_header_size -= 
+	Tuple_header::HeaderSize+1;
+    } 
+    else
+    {
+      ret = alloc_page(tablePtr.p, fragPtr.p, &page_ptr, tmp.m_page_no);  
+    }
+
+    if (ret)
+      return -1;
+    
+    Tuple_header* ptr = (Tuple_header*)
+      ((Fix_page*)page_ptr.p)->get_ptr(tmp.m_page_idx, 0);
+    
+    ndbrequire(ptr->m_header_bits & Tuple_header::FREE);
+    *ptr->get_mm_gci(tablePtr.p) = gci;
+  }
+  return 0;
+}
+
+int
+Dbtup::nr_read_pk(Uint32 fragPtrI, 
+		  const Local_key* key, Uint32* dst, bool& copy)
+{
+  
+  FragrecordPtr fragPtr;
+  fragPtr.i= fragPtrI;
+  ptrCheckGuard(fragPtr, cnoOfFragrec, fragrecord);
+  TablerecPtr tablePtr;
+  tablePtr.i= fragPtr.p->fragTableId;
+  ptrCheckGuard(tablePtr, cnoOfTablerec, tablerec);
+
+  Local_key tmp = *key;
+  Uint32 pages = fragPtr.p->noOfPages;
+  
+  int ret;
+  PagePtr page_ptr;
+  if (tablePtr.p->m_attributes[MM].m_no_of_varsize)
+  {
+    tablePtr.p->m_offsets[MM].m_fix_header_size += Tuple_header::HeaderSize+1;
+    ret = alloc_page(tablePtr.p, fragPtr.p, &page_ptr, tmp.m_page_no);
+    tablePtr.p->m_offsets[MM].m_fix_header_size -= Tuple_header::HeaderSize+1;
+  } 
+  else
+  {
+    ret = alloc_page(tablePtr.p, fragPtr.p, &page_ptr, tmp.m_page_no);  
+  }
+  if (ret)
+    return -1;
+  
+  KeyReqStruct req_struct;
+  Uint32* ptr= ((Fix_page*)page_ptr.p)->get_ptr(key->m_page_idx, 0);
+  
+  req_struct.m_page_ptr = page_ptr;
+  req_struct.m_tuple_ptr = (Tuple_header*)ptr;
+  Uint32 bits = req_struct.m_tuple_ptr->m_header_bits;
+
+  ret = 0;
+  copy = false;
+  if (! (bits & Tuple_header::FREE))
+  {
+    if (bits & Tuple_header::ALLOC)
+    {
+      Uint32 opPtrI= req_struct.m_tuple_ptr->m_operation_ptr_i;
+      Operationrec* opPtrP= c_operation_pool.getPtr(opPtrI);
+      ndbassert(!opPtrP->m_copy_tuple_location.isNull());
+      req_struct.m_tuple_ptr= (Tuple_header*)
+	c_undo_buffer.get_ptr(&opPtrP->m_copy_tuple_location);
+      copy = true;
+    }
+    req_struct.check_offset[MM]= tablePtr.p->get_check_offset(MM);
+    req_struct.check_offset[DD]= tablePtr.p->get_check_offset(DD);
+    
+    Uint32 num_attr= tablePtr.p->m_no_of_attributes;
+    Uint32 descr_start= tablePtr.p->tabDescriptor;
+    TableDescriptor *tab_descr= &tableDescriptor[descr_start];
+    ndbrequire(descr_start + (num_attr << ZAD_LOG_SIZE) <= cnoOfTabDescrRec);
+    req_struct.attr_descr= tab_descr; 
+
+    if (tablePtr.p->need_expand())
+      prepare_read(&req_struct, tablePtr.p, false);
+    
+    const Uint32* attrIds= &tableDescriptor[tablePtr.p->readKeyArray].tabDescr;
+    const Uint32 numAttrs= tablePtr.p->noOfKeyAttr;
+    // read pk attributes from original tuple
+    
+    // new globals
+    tabptr= tablePtr;
+    fragptr= fragPtr;
+    operPtr.i= RNIL;
+    operPtr.p= NULL;
+    
+    // do it
+    ret = readAttributes(&req_struct,
+			 attrIds,
+			 numAttrs,
+			 dst,
+			 ZNIL, false);
+    
+    // done
+    if (likely(ret != -1)) {
+      // remove headers
+      Uint32 n= 0;
+      Uint32 i= 0;
+      while (n < numAttrs) {
+	const AttributeHeader ah(dst[i]);
+	Uint32 size= ah.getDataSize();
+	ndbrequire(size != 0);
+	for (Uint32 j= 0; j < size; j++) {
+	  dst[i + j - n]= dst[i + j + 1];
+	}
+	n+= 1;
+	i+= 1 + size;
+      }
+      ndbrequire((int)i == ret);
+      ret -= numAttrs;
+    } else {
+      return terrorCode ? (-(int)terrorCode) : -1;
+    }
+  }
+    
+  if (tablePtr.p->m_bits & Tablerec::TR_RowGCI)
+  {
+    dst[ret] = *req_struct.m_tuple_ptr->get_mm_gci(tablePtr.p);
+  }
+  else
+  {
+    dst[ret] = 0;
+  }
+  return ret;
+}
+
+#include <signaldata/TuxMaint.hpp>
+
+int
+Dbtup::nr_delete(Signal* signal, Uint32 senderData,
+		 Uint32 fragPtrI, const Local_key* key, Uint32 gci)
+{
+  FragrecordPtr fragPtr;
+  fragPtr.i= fragPtrI;
+  ptrCheckGuard(fragPtr, cnoOfFragrec, fragrecord);
+  TablerecPtr tablePtr;
+  tablePtr.i= fragPtr.p->fragTableId;
+  ptrCheckGuard(tablePtr, cnoOfTablerec, tablerec);
+
+  Local_key tmp = * key;
+  tmp.m_page_no= getRealpid(fragPtr.p, tmp.m_page_no); 
+  
+  PagePtr pagePtr;
+  Tuple_header* ptr= (Tuple_header*)get_ptr(&pagePtr, &tmp, tablePtr.p);
+
+  if (!tablePtr.p->tuxCustomTriggers.isEmpty()) 
+  {
+    jam();
+    TuxMaintReq* req = (TuxMaintReq*)signal->getDataPtrSend();
+    req->tableId = fragPtr.p->fragTableId;
+    req->fragId = fragPtr.p->fragmentId;
+    req->pageId = tmp.m_page_no;
+    req->pageIndex = tmp.m_page_idx;
+    req->tupVersion = ptr->get_tuple_version();
+    req->opInfo = TuxMaintReq::OpRemove;
+    removeTuxEntries(signal, tablePtr.p);
+  }
+  
+  Local_key disk;
+  memcpy(&disk, ptr->get_disk_ref_ptr(tablePtr.p), sizeof(disk));
+  
+  if (tablePtr.p->m_attributes[MM].m_no_of_varsize)
+  {
+    jam();
+    free_var_rec(fragPtr.p, tablePtr.p, &tmp, pagePtr);
+  } else {
+    jam();
+    free_fix_rec(fragPtr.p, tablePtr.p, &tmp, (Fix_page*)pagePtr.p);
+  }
+
+  if (tablePtr.p->m_no_of_disk_attributes)
+  {
+    jam();
+
+    Uint32 sz = (sizeof(Dbtup::Disk_undo::Free) >> 2) + 
+      tablePtr.p->m_offsets[DD].m_fix_header_size - 1;
+    
+    int res = c_lgman->alloc_log_space(fragPtr.p->m_logfile_group_id, sz);
+    ndbrequire(res == 0);
+    
+    /**
+     * 1) alloc log buffer
+     * 2) get page
+     * 3) get log buffer
+     * 4) delete tuple
+     */
+    Page_cache_client::Request preq;
+    preq.m_page = disk;
+    preq.m_callback.m_callbackData = senderData;
+    preq.m_callback.m_callbackFunction =
+      safe_cast(&Dbtup::nr_delete_page_callback);
+    int flags = Page_cache_client::COMMIT_REQ;
+    
+#ifdef ERROR_INSERT
+    if (ERROR_INSERTED(4023) || ERROR_INSERTED(4024))
+    {
+      int rnd = rand() % 100;
+      int slp = 0;
+      if (ERROR_INSERTED(4024))
+      {
+	slp = 3000;
+      }
+      else if (rnd > 90)
+      {
+	slp = 3000;
+      }
+      else if (rnd > 70)
+      {
+	slp = 100;
+      }
+      
+      ndbout_c("rnd: %d slp: %d", rnd, slp);
+      
+      if (slp)
+      {
+	flags |= Page_cache_client::DELAY_REQ;
+	preq.m_delay_until_time = NdbTick_CurrentMillisecond()+(Uint64)slp;
+      }
+    }
+#endif
+    
+    res = m_pgman.get_page(signal, preq, flags);
+    if (res == 0)
+    {
+      goto timeslice;
+    }
+    else if (unlikely(res == -1))
+    {
+      return -1;
+    }
+
+    PagePtr disk_page = *(PagePtr*)&m_pgman.m_ptr;
+    disk_page_set_dirty(disk_page);
+
+    preq.m_callback.m_callbackFunction =
+      safe_cast(&Dbtup::nr_delete_logbuffer_callback);      
+    Logfile_client lgman(this, c_lgman, fragPtr.p->m_logfile_group_id);
+    res= lgman.get_log_buffer(signal, sz, &preq.m_callback);
+    switch(res){
+    case 0:
+      signal->theData[2] = disk_page.i;
+      goto timeslice;
+    case -1:
+      ndbrequire("NOT YET IMPLEMENTED" == 0);
+      break;
+    }
+
+    ndbout << "DIRECT DISK DELETE: " << disk << endl;
+    disk_page_free(signal, tablePtr.p, fragPtr.p,
+		   &disk, *(PagePtr*)&disk_page, gci);
+    return 0;
+  }
+  
+  return 0;
+
+timeslice:
+  memcpy(signal->theData, &disk, sizeof(disk));
+  return 1;
+}
+
+void
+Dbtup::nr_delete_page_callback(Signal* signal, 
+			       Uint32 userpointer, Uint32 page_id)
+{
+  Ptr<GlobalPage> gpage;
+  m_global_page_pool.getPtr(gpage, page_id);
+  PagePtr pagePtr= *(PagePtr*)&gpage;
+  disk_page_set_dirty(pagePtr);
+  Dblqh::Nr_op_info op;
+  op.m_ptr_i = userpointer;
+  op.m_disk_ref.m_page_no = pagePtr.p->m_page_no;
+  op.m_disk_ref.m_file_no = pagePtr.p->m_file_no;
+  c_lqh->get_nr_op_info(&op, page_id);
+
+  Ptr<Fragrecord> fragPtr;
+  fragPtr.i= op.m_tup_frag_ptr_i;
+  ptrCheckGuard(fragPtr, cnoOfFragrec, fragrecord);
+
+  Ptr<Tablerec> tablePtr;
+  tablePtr.i = fragPtr.p->fragTableId;
+  ptrCheckGuard(tablePtr, cnoOfTablerec, tablerec);
+  
+  Uint32 sz = (sizeof(Dbtup::Disk_undo::Free) >> 2) + 
+    tablePtr.p->m_offsets[DD].m_fix_header_size - 1;
+  
+  Callback cb;
+  cb.m_callbackData = userpointer;
+  cb.m_callbackFunction =
+    safe_cast(&Dbtup::nr_delete_logbuffer_callback);      
+  Logfile_client lgman(this, c_lgman, fragPtr.p->m_logfile_group_id);
+  int res= lgman.get_log_buffer(signal, sz, &cb);
+  switch(res){
+  case 0:
+    return;
+  case -1:
+    ndbrequire("NOT YET IMPLEMENTED" == 0);
+    break;
+  }
+    
+  ndbout << "PAGE CALLBACK DISK DELETE: " << op.m_disk_ref << endl;
+  disk_page_free(signal, tablePtr.p, fragPtr.p,
+		 &op.m_disk_ref, pagePtr, op.m_gci);
+  
+  c_lqh->nr_delete_complete(signal, &op);
+  return;
+}
+
+void
+Dbtup::nr_delete_logbuffer_callback(Signal* signal, 
+				    Uint32 userpointer, 
+				    Uint32 unused)
+{
+  Dblqh::Nr_op_info op;
+  op.m_ptr_i = userpointer;
+  c_lqh->get_nr_op_info(&op, RNIL);
+  
+  Ptr<Fragrecord> fragPtr;
+  fragPtr.i= op.m_tup_frag_ptr_i;
+  ptrCheckGuard(fragPtr, cnoOfFragrec, fragrecord);
+
+  Ptr<Tablerec> tablePtr;
+  tablePtr.i = fragPtr.p->fragTableId;
+  ptrCheckGuard(tablePtr, cnoOfTablerec, tablerec);
+
+  Ptr<GlobalPage> gpage;
+  m_global_page_pool.getPtr(gpage, op.m_page_id);
+  PagePtr pagePtr= *(PagePtr*)&gpage;
+
+  /**
+   * reset page no
+   */
+  ndbout << "LOGBUFFER CALLBACK DISK DELETE: " << op.m_disk_ref << endl;
+  
+  disk_page_free(signal, tablePtr.p, fragPtr.p,
+		 &op.m_disk_ref, pagePtr, op.m_gci);
+  
+  c_lqh->nr_delete_complete(signal, &op);
+}

--- 1.19.7.1/ndb/test/ndbapi/Makefile.am	2006-05-16 19:33:18 +03:00
+++ 1.28/storage/ndb/test/ndbapi/Makefile.am	2006-05-19 14:42:32 +03:00
@@ -35,7 +35,6 @@
 testPartitioning \
 testBitfield \
 DbCreate DbAsyncGenerator \
-test_event_multi_table \
 testSRBank \
 test_event_merge
 
@@ -81,14 +80,13 @@
 testBitfield_SOURCES = testBitfield.cpp
 DbCreate_SOURCES = bench/mainPopulate.cpp bench/dbPopulate.cpp bench/userInterface.cpp bench/dbPopulate.h bench/userInterface.h bench/testData.h bench/testDefinitions.h bench/ndb_schema.hpp bench/ndb_error.hpp
 DbAsyncGenerator_SOURCES = bench/mainAsyncGenerator.cpp bench/asyncGenerator.cpp bench/ndb_async2.cpp bench/dbGenerator.h bench/macros.h bench/userInterface.h bench/testData.h bench/testDefinitions.h bench/ndb_schema.hpp bench/ndb_error.hpp
-test_event_multi_table_SOURCES = test_event_multi_table.cpp
 testSRBank_SOURCES = testSRBank.cpp
 test_event_merge_SOURCES = test_event_merge.cpp
 
-INCLUDES_LOC = -I$(top_srcdir)/ndb/include/kernel
+INCLUDES_LOC = -I$(top_srcdir)/storage/ndb/include/kernel
 
-include $(top_srcdir)/ndb/config/common.mk.am
-include $(top_srcdir)/ndb/config/type_ndbapitest.mk.am
+include $(top_srcdir)/storage/ndb/config/common.mk.am
+include $(top_srcdir)/storage/ndb/config/type_ndbapitest.mk.am
 
 ##testDict_INCLUDES = $(INCLUDES) -I$(top_srcdir)/ndb/include/kernel
 ##testIndex_INCLUDES = $(INCLUDES) -I$(top_srcdir)/ndb/include/kernel
@@ -106,62 +104,62 @@
              testScan.dsp
 
 flexBench.dsp: Makefile \
-               $(top_srcdir)/ndb/config/win-prg.am \
-               $(top_srcdir)/ndb/config/win-name \
-               $(top_srcdir)/ndb/config/win-includes \
-               $(top_srcdir)/ndb/config/win-sources \
-               $(top_srcdir)/ndb/config/win-libraries
-	cat $(top_srcdir)/ndb/config/win-prg.am > $@
-	@$(top_srcdir)/ndb/config/win-name $@ flexBench
-	@$(top_srcdir)/ndb/config/win-includes $@ $(INCLUDES)
-	@$(top_srcdir)/ndb/config/win-sources $@ $(flexBench_SOURCES)
-	@$(top_srcdir)/ndb/config/win-libraries $@ LINK $(LDADD)
+               $(top_srcdir)/storage/ndb/config/win-prg.am \
+               $(top_srcdir)/storage/ndb/config/win-name \
+               $(top_srcdir)/storage/ndb/config/win-includes \
+               $(top_srcdir)/storage/ndb/config/win-sources \
+               $(top_srcdir)/storage/ndb/config/win-libraries
+	cat $(top_srcdir)/storage/ndb/config/win-prg.am > $@
+	@$(top_srcdir)/storage/ndb/config/win-name $@ flexBench
+	@$(top_srcdir)/storage/ndb/config/win-includes $@ $(INCLUDES)
+	@$(top_srcdir)/storage/ndb/config/win-sources $@ $(flexBench_SOURCES)
+	@$(top_srcdir)/storage/ndb/config/win-libraries $@ LINK $(LDADD)
 
 testBasic.dsp: Makefile \
-               $(top_srcdir)/ndb/config/win-prg.am \
-               $(top_srcdir)/ndb/config/win-name \
-               $(top_srcdir)/ndb/config/win-includes \
-               $(top_srcdir)/ndb/config/win-sources \
-               $(top_srcdir)/ndb/config/win-libraries
-	cat $(top_srcdir)/ndb/config/win-prg.am > $@
-	@$(top_srcdir)/ndb/config/win-name $@ testBasic
-	@$(top_srcdir)/ndb/config/win-includes $@ $(INCLUDES)
-	@$(top_srcdir)/ndb/config/win-sources $@ $(testBasic_SOURCES)
-	@$(top_srcdir)/ndb/config/win-libraries $@ LINK $(LDADD)
+               $(top_srcdir)/storage/ndb/config/win-prg.am \
+               $(top_srcdir)/storage/ndb/config/win-name \
+               $(top_srcdir)/storage/ndb/config/win-includes \
+               $(top_srcdir)/storage/ndb/config/win-sources \
+               $(top_srcdir)/storage/ndb/config/win-libraries
+	cat $(top_srcdir)/storage/ndb/config/win-prg.am > $@
+	@$(top_srcdir)/storage/ndb/config/win-name $@ testBasic
+	@$(top_srcdir)/storage/ndb/config/win-includes $@ $(INCLUDES)
+	@$(top_srcdir)/storage/ndb/config/win-sources $@ $(testBasic_SOURCES)
+	@$(top_srcdir)/storage/ndb/config/win-libraries $@ LINK $(LDADD)
 
 testOIBasic.dsp: Makefile \
-               $(top_srcdir)/ndb/config/win-prg.am \
-               $(top_srcdir)/ndb/config/win-name \
-               $(top_srcdir)/ndb/config/win-includes \
-               $(top_srcdir)/ndb/config/win-sources \
-               $(top_srcdir)/ndb/config/win-libraries
-	cat $(top_srcdir)/ndb/config/win-prg.am > $@
-	@$(top_srcdir)/ndb/config/win-name $@ testOIBasic
-	@$(top_srcdir)/ndb/config/win-includes $@ $(INCLUDES)
-	@$(top_srcdir)/ndb/config/win-sources $@ $(testOIBasic_SOURCES)
-	@$(top_srcdir)/ndb/config/win-libraries $@ LINK $(LDADD)
+               $(top_srcdir)/storage/ndb/config/win-prg.am \
+               $(top_srcdir)/storage/ndb/config/win-name \
+               $(top_srcdir)/storage/ndb/config/win-includes \
+               $(top_srcdir)/storage/ndb/config/win-sources \
+               $(top_srcdir)/storage/ndb/config/win-libraries
+	cat $(top_srcdir)/storage/ndb/config/win-prg.am > $@
+	@$(top_srcdir)/storage/ndb/config/win-name $@ testOIBasic
+	@$(top_srcdir)/storage/ndb/config/win-includes $@ $(INCLUDES)
+	@$(top_srcdir)/storage/ndb/config/win-sources $@ $(testOIBasic_SOURCES)
+	@$(top_srcdir)/storage/ndb/config/win-libraries $@ LINK $(LDADD)
 
 testBlobs.dsp: Makefile \
-               $(top_srcdir)/ndb/config/win-prg.am \
-               $(top_srcdir)/ndb/config/win-name \
-               $(top_srcdir)/ndb/config/win-includes \
-               $(top_srcdir)/ndb/config/win-sources \
-               $(top_srcdir)/ndb/config/win-libraries
-	cat $(top_srcdir)/ndb/config/win-prg.am > $@
-	@$(top_srcdir)/ndb/config/win-name $@ testBlobs
-	@$(top_srcdir)/ndb/config/win-includes $@ $(INCLUDES)
-	@$(top_srcdir)/ndb/config/win-sources $@ $(testBlobs_SOURCES)
-	@$(top_srcdir)/ndb/config/win-libraries $@ LINK $(LDADD)
+               $(top_srcdir)/storage/ndb/config/win-prg.am \
+               $(top_srcdir)/storage/ndb/config/win-name \
+               $(top_srcdir)/storage/ndb/config/win-includes \
+               $(top_srcdir)/storage/ndb/config/win-sources \
+               $(top_srcdir)/storage/ndb/config/win-libraries
+	cat $(top_srcdir)/storage/ndb/config/win-prg.am > $@
+	@$(top_srcdir)/storage/ndb/config/win-name $@ testBlobs
+	@$(top_srcdir)/storage/ndb/config/win-includes $@ $(INCLUDES)
+	@$(top_srcdir)/storage/ndb/config/win-sources $@ $(testBlobs_SOURCES)
+	@$(top_srcdir)/storage/ndb/config/win-libraries $@ LINK $(LDADD)
 
 testScan.dsp: Makefile \
-               $(top_srcdir)/ndb/config/win-prg.am \
-               $(top_srcdir)/ndb/config/win-name \
-               $(top_srcdir)/ndb/config/win-includes \
-               $(top_srcdir)/ndb/config/win-sources \
-               $(top_srcdir)/ndb/config/win-libraries
-	cat $(top_srcdir)/ndb/config/win-prg.am > $@
-	@$(top_srcdir)/ndb/config/win-name $@ testScan
-	@$(top_srcdir)/ndb/config/win-includes $@ $(INCLUDES)
-	@$(top_srcdir)/ndb/config/win-sources $@ $(testScan_SOURCES)
-	@$(top_srcdir)/ndb/config/win-libraries $@ LINK $(LDADD)
+               $(top_srcdir)/storage/ndb/config/win-prg.am \
+               $(top_srcdir)/storage/ndb/config/win-name \
+               $(top_srcdir)/storage/ndb/config/win-includes \
+               $(top_srcdir)/storage/ndb/config/win-sources \
+               $(top_srcdir)/storage/ndb/config/win-libraries
+	cat $(top_srcdir)/storage/ndb/config/win-prg.am > $@
+	@$(top_srcdir)/storage/ndb/config/win-name $@ testScan
+	@$(top_srcdir)/storage/ndb/config/win-includes $@ $(INCLUDES)
+	@$(top_srcdir)/storage/ndb/config/win-sources $@ $(testScan_SOURCES)
+	@$(top_srcdir)/storage/ndb/config/win-libraries $@ LINK $(LDADD)
 

--- 1.24/mysql-test/t/rpl_temporary.test	2006-05-18 22:40:34 +03:00
+++ 1.25/mysql-test/t/rpl_temporary.test	2006-05-19 14:42:31 +03:00
@@ -194,7 +194,7 @@
 #
 #14157: utf8 encoding in binlog without set character_set_client
 #
 
 sync_slave_with_master;
 #connection slave;
Thread
bk commit into 5.1 tree (aelkin:1.2165)Andrei Elkin19 May