=== modified file 'mysql-test/suite/ndb/t/ndb_alter_table_backup.test'
--- a/mysql-test/suite/ndb/t/ndb_alter_table_backup.test	2007-11-29 10:29:35 +0000
+++ b/mysql-test/suite/ndb/t/ndb_alter_table_backup.test	2008-08-07 11:52:50 +0000
@@ -19,8 +19,8 @@
 --echo *********************************
 --echo * restore tables w/ new column from little endian
 --echo *********************************
---exec $NDB_TOOLS_DIR/ndb_restore --no-defaults -b 1 -n 1 -m -r $MYSQL_TEST_DIR/std_data/ndb_backup51_d2_le >> $NDB_TOOLS_OUTPUT
---exec $NDB_TOOLS_DIR/ndb_restore --no-defaults -b 1 -n 2 -r $MYSQL_TEST_DIR/std_data/ndb_backup51_d2_le >> $NDB_TOOLS_OUTPUT
+--exec $NDB_TOOLS_DIR/ndb_restore --no-defaults -b 1 -n 1 -m -r $MYSQL_TEST_DIR/std_data/ndb_backup51_d2_le >> $NDB_TOOLS_OUTPUT 2>&1
+--exec $NDB_TOOLS_DIR/ndb_restore --no-defaults -b 1 -n 2 -r $MYSQL_TEST_DIR/std_data/ndb_backup51_d2_le >> $NDB_TOOLS_OUTPUT 2>&1
 SHOW TABLES;
 SHOW CREATE TABLE t1;
 SELECT * FROM t1 WHERE a = 1 or a = 10 or a = 20 or a = 30 ORDER BY a;
@@ -32,8 +32,8 @@
 --echo *********************************
 --echo * restore tables w/ new column from big endian
 --echo *********************************
---exec $NDB_TOOLS_DIR/ndb_restore --no-defaults -b 1 -n 1 -m -r $MYSQL_TEST_DIR/std_data/ndb_backup51_d2_be >> $NDB_TOOLS_OUTPUT
---exec $NDB_TOOLS_DIR/ndb_restore --no-defaults -b 1 -n 2 -r $MYSQL_TEST_DIR/std_data/ndb_backup51_d2_be >> $NDB_TOOLS_OUTPUT
+--exec $NDB_TOOLS_DIR/ndb_restore --no-defaults -b 1 -n 1 -m -r $MYSQL_TEST_DIR/std_data/ndb_backup51_d2_be >> $NDB_TOOLS_OUTPUT 2>&1
+--exec $NDB_TOOLS_DIR/ndb_restore --no-defaults -b 1 -n 2 -r $MYSQL_TEST_DIR/std_data/ndb_backup51_d2_be >> $NDB_TOOLS_OUTPUT 2>&1
 SHOW TABLES;
 SHOW CREATE TABLE t1;
 SELECT * FROM t1 WHERE a = 1 or a = 10 or a = 20 or a = 30 ORDER BY a;

=== modified file 'storage/ndb/include/kernel/signaldata/LqhKey.hpp'
--- a/storage/ndb/include/kernel/signaldata/LqhKey.hpp	2008-06-17 20:28:45 +0000
+++ b/storage/ndb/include/kernel/signaldata/LqhKey.hpp	2008-08-07 11:52:50 +0000
@@ -40,6 +40,10 @@
   STATIC_CONST( MaxKeyInfo = 4 );
   STATIC_CONST( MaxAttrInfo = 5);
 
+  /* Long LQHKEYREQ definitions */
+  STATIC_CONST( KeyInfoSectionNum = 0 );
+  STATIC_CONST( AttrInfoSectionNum = 1 );
+
 private:
 
   /**
@@ -147,7 +151,8 @@
 /**
  * Request Info
  *
- * k = Key len                - 10 Bits (0-9) max 1023
+ * k = Key len                - (Short LQHKEYREQ only) 
+ *                              10 Bits (0-9) max 1023
  * l = Last Replica No        - 2  Bits -> Max 3 (10-11)
 
  IF version < NDBD_ROWID_VERSION
@@ -158,7 +163,8 @@
  * s = Simple indicator       - 1  Bit (18)
  * o = Operation              - 3  Bits (19-21)
  * r = Sequence replica       - 2  Bits (22-23)
- * a = Attr Info in LQHKEYREQ - 3  Bits (24-26)
+ * a = Attr Info in LQHKEYREQ - (Short LQHKEYREQ only)
+                                3  Bits (24-26)
  * c = Same client and tc     - 1  Bit (27)
  * u = Read Len Return Ind    - 1  Bit (28)
  * m = Commit ack marker      - 1  Bit (29)
@@ -167,10 +173,17 @@
  * g = gci flag               - 1  Bit (12)
  * n = NR copy                - 1  Bit (13)
 
+ * Short LQHKEYREQ :
  *             1111111111222222222233
  *   01234567890123456789012345678901
  *   kkkkkkkkkklltttpdisooorraaacumxz
  *   kkkkkkkkkkllgn pdisooorraaacumxz
+ *
+ * Long LQHKEYREQ :
+ *             1111111111222222222233
+ *   01234567890123456789012345678901
+ *             llgn pdisooorr   cumxz
+ *
  */
 
 #define RI_KEYLEN_SHIFT      (0)
@@ -200,7 +213,8 @@
 /**
  * Scan Info
  *
- * a = Attr Len                 - 16 Bits -> max 65535 (0-15)
+ * a = Attr Len                 - (Short LQHKEYREQ only)
+ *                                 16 Bits -> max 65535 (0-15)
  * p = Stored Procedure Ind     -  1 Bit (16)
  * d = Distribution key         -  8 Bit  -> max 255 (17-24)
  * t = Scan take over indicator -  1 Bit (25)
@@ -208,7 +222,8 @@
  *
  *           1111111111222222222233
  * 01234567890123456789012345678901
- * aaaaaaaaaaaaaaaapddddddddtmm
+ * aaaaaaaaaaaaaaaapddddddddtmm       (Short LQHKEYREQ)
+ *                 pddddddddtmm       (Long LQHKEYREQ)
  */
 
 #define SI_ATTR_LEN_MASK     (65535)

=== modified file 'storage/ndb/include/kernel/signaldata/NextScan.hpp'
--- a/storage/ndb/include/kernel/signaldata/NextScan.hpp	2006-12-23 19:20:40 +0000
+++ b/storage/ndb/include/kernel/signaldata/NextScan.hpp	2008-08-07 11:52:50 +0000
@@ -47,6 +47,7 @@
 public:
   // length is less if no keyinfo or no next result
   STATIC_CONST( SignalLength = 11 );
+  STATIC_CONST( SignalLengthNoKeyInfo = 7 );
 private:
   Uint32 scanPtr;               // scan record in LQH
   Uint32 accOperationPtr;

=== modified file 'storage/ndb/include/kernel/signaldata/SignalDroppedRep.hpp'
--- a/storage/ndb/include/kernel/signaldata/SignalDroppedRep.hpp	2008-06-17 20:28:45 +0000
+++ b/storage/ndb/include/kernel/signaldata/SignalDroppedRep.hpp	2008-08-07 11:52:50 +0000
@@ -21,10 +21,11 @@
 class SignalDroppedRep {
 
   /**
-   * Reciver(s)
+   * Receiver(s)
    */
   friend class SimulatedBlock;
   friend class Dbtc;
+  friend class Dblqh;
 
   /**
    * Sender (TransporterCallback.cpp)

=== modified file 'storage/ndb/include/kernel/signaldata/TupKey.hpp'
--- a/storage/ndb/include/kernel/signaldata/TupKey.hpp	2006-12-23 19:20:40 +0000
+++ b/storage/ndb/include/kernel/signaldata/TupKey.hpp	2008-08-07 11:52:50 +0000
@@ -35,7 +35,7 @@
   friend bool printTUPKEYREQ(FILE * output, const Uint32 * theData, Uint32 len, Uint16 receiverBlockNo);
 
 public:
-  STATIC_CONST( SignalLength = 18 );
+  STATIC_CONST( SignalLength = 19 );
 
 private:
 
@@ -60,6 +60,7 @@
   Uint32 disk_page;
   Uint32 m_row_id_page_no;
   Uint32 m_row_id_page_idx;
+  Uint32 attrInfoIVal;
 };
 
 class TupKeyConf {

=== modified file 'storage/ndb/include/ndb_version.h.in'
--- a/storage/ndb/include/ndb_version.h.in	2008-06-18 09:01:43 +0000
+++ b/storage/ndb/include/ndb_version.h.in	2008-08-07 11:52:50 +0000
@@ -99,6 +99,8 @@
 #define NDBD_MICRO_GCP_62 NDB_MAKE_VERSION(6,2,5)
 #define NDBD_MICRO_GCP_63 NDB_MAKE_VERSION(6,3,2)
 #define NDBD_RAW_LCP MAKE_VERSION(6,3,11)
+#define NDBD_LONG_LQHKEYREQ MAKE_VERSION(6,4,0)
+
 
 static
 inline

=== modified file 'storage/ndb/src/common/debugger/signaldata/LqhKey.cpp'
--- a/storage/ndb/src/common/debugger/signaldata/LqhKey.cpp	2006-12-23 19:20:40 +0000
+++ b/storage/ndb/src/common/debugger/signaldata/LqhKey.cpp	2008-08-07 11:52:50 +0000
@@ -145,12 +145,16 @@
       fprintf(output, "H\'%.8x ", sig->variableData[nextPos]);
     fprintf(output, "\n");
   } else {
-    fprintf(output, " InitialReadSize: %d InterpretedSize: %d "
-            "FinalUpdateSize: %d FinalReadSize: %d SubroutineSize: %d\n",
-            sig->variableData[nextPos+0], sig->variableData[nextPos+1], 
-            sig->variableData[nextPos+2], sig->variableData[nextPos+3],
-            sig->variableData[nextPos+4]);
-    nextPos += 5;
+    /* Only have section sizes if it's a short LQHKEYREQ */
+    if (LqhKeyReq::getAIInLqhKeyReq(reqInfo) == LqhKeyReq::MaxAttrInfo)
+    {
+      fprintf(output, " InitialReadSize: %d InterpretedSize: %d "
+              "FinalUpdateSize: %d FinalReadSize: %d SubroutineSize: %d\n",
+              sig->variableData[nextPos+0], sig->variableData[nextPos+1], 
+              sig->variableData[nextPos+2], sig->variableData[nextPos+3],
+              sig->variableData[nextPos+4]);
+      nextPos += 5;
+    }
   }
   return true;
 }

=== modified file 'storage/ndb/src/kernel/blocks/dblqh/Dblqh.hpp'
--- a/storage/ndb/src/kernel/blocks/dblqh/Dblqh.hpp	2008-07-26 05:13:40 +0000
+++ b/storage/ndb/src/kernel/blocks/dblqh/Dblqh.hpp	2008-08-07 11:52:50 +0000
@@ -1920,9 +1920,9 @@
       LOG_CONNECTED = 3
     };
     ConnectState connectState;
-    UintR copyCountWords;    
-    UintR firstAttrinfo[5];
-    UintR tupkeyData[4];
+    UintR copyCountWords;
+    Uint32 keyInfoIVal;
+    Uint32 attrInfoIVal;
     UintR transid[2];
     AbortState abortState;
     UintR accConnectrec;
@@ -1931,15 +1931,15 @@
     UintR tcTimer;
     UintR currReclenAi;
     UintR currTupAiLen;
-    UintR firstAttrinbuf;
-    UintR firstTupkeybuf;
+    UintR firstAttrinbuf;  // TODO : Remove
+    UintR firstTupkeybuf;  // TODO : Remove
     UintR fragmentid;
     UintR fragmentptr;
     UintR gci_hi;
     UintR gci_lo;
     UintR hashValue;
-    UintR lastTupkeybuf;
-    UintR lastAttrinbuf;
+    UintR lastTupkeybuf;   // TODO : Remove
+    UintR lastAttrinbuf;   // TODO : Remove
     /**
      * Each operation (TcConnectrec) can be stored in max one out of many 
      * lists.
@@ -2013,6 +2013,10 @@
     Uint8 m_disk_table;
     Uint8 m_use_rowid;
     Uint8 m_dealloc;
+    enum op_flags {
+      OP_ISLONGREQ         = 0x1,
+      OP_SAVEATTRINFO      = 0x2};
+    Uint32 m_flags;
     Uint32 m_log_part_ptr_i;
     Local_key m_row_id;
 
@@ -2079,6 +2083,7 @@
   void execEXEC_SRREQ(Signal* signal);
   void execEXEC_SRCONF(Signal* signal);
   void execREAD_PSEUDO_REQ(Signal* signal);
+  void execSIGNAL_DROPPED_REP(Signal* signal);
 
   void execDUMP_STATE_ORD(Signal* signal);
   void execACC_ABORTCONF(Signal* signal);
@@ -2324,7 +2329,7 @@
   void readExecSrNewMbyte(Signal* signal);
   void readExecSr(Signal* signal);
   void readKey(Signal* signal);
-  void readLogData(Signal* signal, Uint32 noOfWords, Uint32* dataPtr);
+  void readLogData(Signal* signal, Uint32 noOfWords, Uint32& sectionIVal);
   void readLogHeader(Signal* signal);
   Uint32 readLogword(Signal* signal);
   Uint32 readLogwordExec(Signal* signal);
@@ -2343,6 +2348,7 @@
   void removePageRef(Signal* signal);
   Uint32 returnExecLog(Signal* signal);
   int saveTupattrbuf(Signal* signal, Uint32* dataPtr, Uint32 length);
+  int saveAttrInfoInSection(const Uint32* dataPtr, Uint32 len);
   void seizeAddfragrec(Signal* signal);
   void seizeAttrinbuf(Signal* signal);
   Uint32 seize_attrinbuf();
@@ -2370,6 +2376,7 @@
   void writeKey(Signal* signal);
   void writeLogHeader(Signal* signal);
   void writeLogWord(Signal* signal, Uint32 data);
+  void writeLogWords(Signal* signal, const Uint32* data, Uint32 len);
   void writeNextLog(Signal* signal);
   void errorReport(Signal* signal, int place);
   void warningReport(Signal* signal, int place);
@@ -2465,7 +2472,7 @@
   void nextScanConfScanLab(Signal* signal);
   void nextScanConfCopyLab(Signal* signal);
   void continueScanNextReqLab(Signal* signal);
-  void keyinfoLab(const Uint32 * src, const Uint32 * end);
+  bool keyinfoLab(const Uint32 * src, Uint32 len);
   void copySendTupkeyReqLab(Signal* signal);
   void storedProcConfScanLab(Signal* signal);
   void storedProcConfCopyLab(Signal* signal);
@@ -2576,6 +2583,8 @@
   void exec_acckeyreq(Signal*, Ptr<TcConnectionrec>);
   int compare_key(const TcConnectionrec*, const Uint32 * ptr, Uint32 len);
   void nr_copy_delete_row(Signal*, Ptr<TcConnectionrec>, Local_key*, Uint32);
+  Uint32 getKeyInfoWordOrZero(const TcConnectionrec* regTcPtr, 
+                              Uint32 offset);
 public:
   struct Nr_op_info
   {

=== modified file 'storage/ndb/src/kernel/blocks/dblqh/DblqhInit.cpp'
--- a/storage/ndb/src/kernel/blocks/dblqh/DblqhInit.cpp	2008-07-26 05:13:40 +0000
+++ b/storage/ndb/src/kernel/blocks/dblqh/DblqhInit.cpp	2008-08-07 11:52:50 +0000
@@ -200,6 +200,8 @@
 
   addRecSignal(GSN_ALTER_TAB_REQ, &Dblqh::execALTER_TAB_REQ);
 
+  addRecSignal(GSN_SIGNAL_DROPPED_REP, &Dblqh::execSIGNAL_DROPPED_REP, true);
+
   // Trigger signals, transit to from TUP
   addRecSignal(GSN_CREATE_TRIG_IMPL_REQ, &Dblqh::execCREATE_TRIG_IMPL_REQ);
   addRecSignal(GSN_CREATE_TRIG_IMPL_CONF, &Dblqh::execCREATE_TRIG_IMPL_CONF);

=== modified file 'storage/ndb/src/kernel/blocks/dblqh/DblqhMain.cpp'
--- a/storage/ndb/src/kernel/blocks/dblqh/DblqhMain.cpp	2008-07-23 11:36:53 +0000
+++ b/storage/ndb/src/kernel/blocks/dblqh/DblqhMain.cpp	2008-08-07 11:52:50 +0000
@@ -63,9 +63,12 @@
 #include <signaldata/RestoreImpl.hpp>
 #include <signaldata/KeyInfo.hpp>
 #include <signaldata/AttrInfo.hpp>
+#include <signaldata/TransIdAI.hpp>
 #include <KeyDescriptor.hpp>
 #include <signaldata/RouteOrd.hpp>
 #include <signaldata/FsRef.hpp>
+#include <SectionReader.hpp>
+#include <signaldata/SignalDroppedRep.hpp>
 
 #include "../suma/Suma.hpp"
 
@@ -214,11 +217,6 @@
     ndbout << " tcNodeFailrec = " << tcConnectptr.p->tcNodeFailrec;
     ndbout << " activeCreat = " << tcConnectptr.p->activeCreat;
     ndbout << endl;
-    ndbout << "tupkeyData0 = " << tcConnectptr.p->tupkeyData[0];
-    ndbout << "tupkeyData1 = " << tcConnectptr.p->tupkeyData[1];
-    ndbout << "tupkeyData2 = " << tcConnectptr.p->tupkeyData[2];
-    ndbout << "tupkeyData3 = " << tcConnectptr.p->tupkeyData[3];
-    ndbout << endl;
     ndbout << "abortState = " << tcConnectptr.p->abortState;
     ndbout << "listState = " << tcConnectptr.p->listState;
     ndbout << endl;
@@ -3185,25 +3183,45 @@
 			       Uint32 len) 
 {
   TcConnectionrec * const regTcPtr = tcConnectptr.p;
-  Uint32 dataPos = 0;
+  TcConnectionrec::TransactionState state = regTcPtr->transactionState;
   Uint32 total = regTcPtr->save1 + len;
   Uint32 primKeyLen = regTcPtr->primKeyLen;
-  while (dataPos < len) {
-    if (cfirstfreeDatabuf == RNIL) {
+  if (state == TcConnectionrec::WAIT_TUPKEYINFO)
+  {
+    /* Different handling for TUPKEYREQ to SCANAI 
+     * TODO : Converge once long scan implemented
+     */
+    bool ok= appendToSection(regTcPtr->keyInfoIVal,
+                             dataPtr,
+                             len);
+    if (unlikely(!ok))
+    {
       jam();
       return ZGET_DATAREC_ERROR;
-    }//if
-    seizeTupkeybuf(signal);
-    Databuf * const regDataPtr = databufptr.p;
-    Uint32 data0 = dataPtr[dataPos];
-    Uint32 data1 = dataPtr[dataPos + 1];
-    Uint32 data2 = dataPtr[dataPos + 2];
-    Uint32 data3 = dataPtr[dataPos + 3];
-    regDataPtr->data[0] = data0;
-    regDataPtr->data[1] = data1;
-    regDataPtr->data[2] = data2;
-    regDataPtr->data[3] = data3;
-    dataPos += 4;
+    }
+  }
+  else
+  {
+    /* Scan_AI KeyInfo */
+    Uint32 dataPos = 0;
+    
+    while (dataPos < len) {
+      if (cfirstfreeDatabuf == RNIL) {
+        jam();
+        return ZGET_DATAREC_ERROR;
+      }//if
+      seizeTupkeybuf(signal);
+      Databuf * const regDataPtr = databufptr.p;
+      Uint32 data0 = dataPtr[dataPos];
+      Uint32 data1 = dataPtr[dataPos + 1];
+      Uint32 data2 = dataPtr[dataPos + 2];
+      Uint32 data3 = dataPtr[dataPos + 3];
+      regDataPtr->data[0] = data0;
+      regDataPtr->data[1] = data1;
+      regDataPtr->data[2] = data2;
+      regDataPtr->data[3] = data3;
+      dataPos += 4;
+    }
   }
 
   regTcPtr->save1 = total;
@@ -3329,23 +3347,42 @@
 void Dblqh::execTUP_ATTRINFO(Signal* signal) 
 {
   TcConnectionrec *regTcConnectionrec = tcConnectionrec;
-  Uint32 length = signal->length() - 3;
   Uint32 tcIndex = signal->theData[0];
   Uint32 ttcConnectrecFileSize = ctcConnectrecFileSize;
   jamEntry();
   tcConnectptr.i = tcIndex;
   ptrCheckGuard(tcConnectptr, ttcConnectrecFileSize, regTcConnectionrec);
-  ndbrequire(tcConnectptr.p->transactionState == TcConnectionrec::WAIT_TUP);
-  if (saveTupattrbuf(signal, &signal->theData[3], length) == ZOK) {
-    return;
-  } else {
-    jam();
-/* ------------------------------------------------------------------------- */
-/* WE ARE WAITING FOR RESPONSE FROM TUP HERE. THUS WE NEED TO                */
-/* GO THROUGH THE STATE MACHINE FOR THE OPERATION.                           */
-/* ------------------------------------------------------------------------- */
-    localAbortStateHandlerLab(signal);
-  }//if
+  TcConnectionrec * const regTcPtr = tcConnectptr.p;
+
+  ndbrequire(regTcPtr->transactionState == TcConnectionrec::WAIT_TUP);
+  
+  /* TUP_ATTRINFO signal is unrelated to ATTRINFO
+   * It just transports a section IVAL from TUP back to 
+   * LQH
+   */
+  ndbrequire(signal->header.theLength == 3);
+  Uint32 tupAttrInfoWords= signal->theData[1];
+  Uint32 tupAttrInfoIVal= signal->theData[2];
+
+  ndbassert(tupAttrInfoWords > 0);
+  ndbassert(tupAttrInfoIVal != RNIL);
+
+  /* If we have stored ATTRINFO that we sent to TUP, 
+   * free it now
+   */
+  if (regTcPtr->attrInfoIVal != RNIL)
+  {
+    /* We should be expecting to receive attrInfo back */
+    ndbassert( !(regTcPtr->m_flags & 
+                 TcConnectionrec::OP_SAVEATTRINFO) );
+    releaseSection( regTcPtr->attrInfoIVal );
+    regTcPtr->attrInfoIVal= RNIL;
+  }
+
+  /* Store reference to ATTRINFO from TUP */
+  regTcPtr->attrInfoIVal= tupAttrInfoIVal;
+  regTcPtr->currTupAiLen= tupAttrInfoWords;
+
 }//Dblqh::execTUP_ATTRINFO()
 
 /* ------------------------------------------------------------------------- */
@@ -3354,26 +3391,19 @@
 /* ------------------------------------------------------------------------- */
 void Dblqh::lqhAttrinfoLab(Signal* signal, Uint32* dataPtr, Uint32 length) 
 {
-  TcConnectionrec * const regTcPtr = tcConnectptr.p;
-  if (regTcPtr->operation != ZREAD) {
-    if (regTcPtr->operation != ZDELETE)
-    {
-      if (regTcPtr->opExec != 1) {
-	if (saveTupattrbuf(signal, dataPtr, length) == ZOK) {
-	  ;
-	} else {
-	  jam();
+  /* Store received AttrInfo in a long section */
+  jam();
+  if (saveAttrInfoInSection(dataPtr, length) == ZOK) {
+    ;
+  } else {
+    jam();
 /* ------------------------------------------------------------------------- */
 /* WE MIGHT BE WAITING FOR RESPONSE FROM SOME BLOCK HERE. THUS WE NEED TO    */
 /* GO THROUGH THE STATE MACHINE FOR THE OPERATION.                           */
 /* ------------------------------------------------------------------------- */
-	  localAbortStateHandlerLab(signal);
-	  return;
-	}//if
-      }//if
-    }//if
-  }
-  c_tup->receive_attrinfo(signal, regTcPtr->tupConnectrec, dataPtr, length);
+    localAbortStateHandlerLab(signal);
+    return;
+  }//if
 }//Dblqh::lqhAttrinfoLab()
 
 /* ------------------------------------------------------------------------- */
@@ -3438,6 +3468,32 @@
   return ZOK;
 }//Dblqh::saveTupattrbuf()
 
+
+/* ------------------------------------------------------------------------- */
+/* -------           SAVE ATTRINFO INTO ATTR SECTION                 ------- */
+/*                                                                           */
+/* ------------------------------------------------------------------------- */
+int Dblqh::saveAttrInfoInSection(const Uint32* dataPtr, Uint32 len) 
+{
+  TcConnectionrec * const regTcPtr = tcConnectptr.p;
+
+  bool ok= appendToSection(regTcPtr->attrInfoIVal,
+                           dataPtr,
+                           len);
+
+  if (unlikely(!ok))
+  {
+    jam();
+    terrorCode = ZGET_ATTRINBUF_ERROR;
+    return ZGET_ATTRINBUF_ERROR;
+  }//if
+
+  regTcPtr->currTupAiLen+= len;
+  
+  return ZOK;
+} // saveAttrInfoInSection
+
+
 /* ==========================================================================
  * =======                       SEIZE ATTRIBUTE IN BUFFER            ======= 
  *
@@ -3467,6 +3523,7 @@
   attrinbufptr = regAttrinbufptr;
 }//Dblqh::seizeAttrinbuf()
 
+
 /* ==========================================================================
  * =======                        SEIZE TC CONNECT RECORD             ======= 
  * 
@@ -3551,6 +3608,47 @@
   return !mask.isclear();
 }
 
+void Dblqh::execSIGNAL_DROPPED_REP(Signal* signal)
+{
+  /* An incoming signal was dropped, handle it
+   * Dropped signal really means that we ran out of 
+   * long signal buffering to store its sections
+   */
+  jamEntry();
+  
+  const SignalDroppedRep* rep = (SignalDroppedRep*) &signal->theData[0];
+  Uint32 originalGSN= rep->originalGsn;
+
+  DEBUG("SignalDroppedRep received for GSN " << originalGSN);
+
+  switch(originalGSN) {
+  case GSN_LQHKEYREQ:
+  {
+    jam(); 
+    /* Get original signal data - unfortunately it may
+     * have been truncated.  We must not read beyond
+     * word # 22
+     * We will notify the client that their LQHKEYREQ
+     * failed
+     */
+    const LqhKeyReq * const truncatedLqhKeyReq = 
+      (LqhKeyReq *) &rep->originalData[0];
+    
+    noFreeRecordLab(signal, truncatedLqhKeyReq, ZGET_DATAREC_ERROR);
+
+    break;
+  }
+  default:
+    jam();
+    /* Don't expect dropped signals for other GSNs,
+     * default handling
+     */
+    SimulatedBlock::execSIGNAL_DROPPED_REP(signal);
+  };
+  
+  return;
+}
+
 /* ------------------------------------------------------------------------- */
 /* -------                TAKE CARE OF LQHKEYREQ                     ------- */
 /* LQHKEYREQ IS THE SIGNAL THAT STARTS ALL OPERATIONS IN THE LQH BLOCK       */
@@ -3563,6 +3661,7 @@
   Uint8 tfragDistKey;
 
   const LqhKeyReq * const lqhKeyReq = (LqhKeyReq *)signal->getDataPtr();
+  SectionHandle handle(this, signal);
 
   {
     const NodeBitmask& all = globalTransporterRegistry.get_status_overloaded();
@@ -3570,6 +3669,7 @@
         checkTransporterOverloaded(signal, all, lqhKeyReq) ||
         ERROR_INSERTED_CLEAR(5047)) {
       jam();
+      releaseSections(handle);
       noFreeRecordLab(signal, lqhKeyReq, ZTRANSPORTER_OVERLOADED_ERROR);
       return;
     }
@@ -3586,6 +3686,7 @@
     if (ERROR_INSERTED(5031)) {
       CLEAR_ERROR_INSERT_VALUE;
     }
+    releaseSections(handle);
     noFreeRecordLab(signal, lqhKeyReq, ZNO_TC_CONNECT_ERROR);
     return;
   }//if
@@ -3593,6 +3694,7 @@
   if(ERROR_INSERTED(5038) && 
      refToNode(signal->getSendersBlockRef()) != getOwnNodeId()){
     jam();
+    releaseSections(handle);
     SET_ERROR_INSERT_VALUE(5039);
     return;
   }
@@ -3604,8 +3706,15 @@
   regTcPtr->clientConnectrec = sig0;
   regTcPtr->tcOprec = sig0;
   regTcPtr->storedProcId = ZNIL;
+  regTcPtr->m_flags= 0;
+  bool isLongReq= false;
+  if (handle.m_cnt > 0)
+  {
+    isLongReq= true;
+    regTcPtr->m_flags|= TcConnectionrec::OP_ISLONGREQ;
+  }
 
-  UintR TtotReclenAi = lqhKeyReq->attrLen;
+  UintR attrLenFlags = lqhKeyReq->attrLen;
   sig1 = lqhKeyReq->savePointId;
   sig2 = lqhKeyReq->hashValue;
   UintR Treqinfo = lqhKeyReq->requestInfo;
@@ -3620,16 +3729,16 @@
 
   const Uint8 op = LqhKeyReq::getOperation(Treqinfo);
   if ((op == ZREAD || op == ZREAD_EX) && !getAllowRead()){
+    releaseSections(handle);
     noFreeRecordLab(signal, lqhKeyReq, ZNODE_SHUTDOWN_IN_PROGESS);
     return;
   }
   
   Uint32 senderVersion = getNodeInfo(refToNode(senderRef)).m_version;
 
-  regTcPtr->totReclenAi = LqhKeyReq::getAttrLen(TtotReclenAi);
   regTcPtr->tcScanInfo  = lqhKeyReq->scanInfo;
-  regTcPtr->indTakeOver = LqhKeyReq::getScanTakeOverFlag(TtotReclenAi);
-  regTcPtr->m_reorg     = LqhKeyReq::getReorgFlag(TtotReclenAi);
+  regTcPtr->indTakeOver = LqhKeyReq::getScanTakeOverFlag(attrLenFlags);
+  regTcPtr->m_reorg     = LqhKeyReq::getReorgFlag(attrLenFlags);
 
   regTcPtr->readlenAi = 0;
   regTcPtr->currTupAiLen = 0;
@@ -3674,6 +3783,7 @@
       if (markerPtr.i == RNIL)
       {
         noFreeRecordLab(signal, lqhKeyReq, ZNO_FREE_MARKER_RECORDS_ERROR);
+        releaseSections(handle);
         return;
       }
       markerPtr.p->transid1 = sig1;
@@ -3698,7 +3808,6 @@
   regTcPtr->opExec        = LqhKeyReq::getInterpretedFlag(Treqinfo);
   regTcPtr->opSimple      = LqhKeyReq::getSimpleFlag(Treqinfo);
   regTcPtr->seqNoReplica  = LqhKeyReq::getSeqNoReplica(Treqinfo);
-  UintR TreclenAiLqhkey   = LqhKeyReq::getAIInLqhKeyReq(Treqinfo);
   regTcPtr->apiVersionNo  = 0; 
   regTcPtr->m_use_rowid   = LqhKeyReq::getRowidFlag(Treqinfo);
   regTcPtr->m_dealloc     = 0;
@@ -3722,11 +3831,7 @@
   CRASH_INSERTION2(5041, (op == ZREAD && 
                           (regTcPtr->opSimple || regTcPtr->dirtyOp) &&
                           refToNode(signal->senderBlockRef()) != cownNodeid));
-  
-  regTcPtr->reclenAiLqhkey = TreclenAiLqhkey;
-  regTcPtr->currReclenAi = TreclenAiLqhkey;
-  UintR TitcKeyLen = LqhKeyReq::getKeyLen(Treqinfo);
-  regTcPtr->primKeyLen = TitcKeyLen;
+
   regTcPtr->noFiredTriggers = lqhKeyReq->noFiredTriggers;
 
   UintR TapplAddressInd = LqhKeyReq::getApplicationAddressFlag(Treqinfo);
@@ -3743,7 +3848,7 @@
     regTcPtr->nodeAfterNext[1] = lqhKeyReq->variableData[nextPos] >> 16;
     nextPos++;
   }//if
-  UintR TstoredProcIndicator = LqhKeyReq::getStoredProcFlag(TtotReclenAi);
+  UintR TstoredProcIndicator = LqhKeyReq::getStoredProcFlag(attrLenFlags);
   if (TstoredProcIndicator == 1) {
     regTcPtr->storedProcId = lqhKeyReq->variableData[nextPos] & ZNIL;
     nextPos++;
@@ -3753,29 +3858,82 @@
     regTcPtr->readlenAi = lqhKeyReq->variableData[nextPos] & ZNIL;
     nextPos++;
   }//if
-  sig0 = lqhKeyReq->variableData[nextPos + 0];
-  sig1 = lqhKeyReq->variableData[nextPos + 1];
-  sig2 = lqhKeyReq->variableData[nextPos + 2];
-  sig3 = lqhKeyReq->variableData[nextPos + 3];
-
-  regTcPtr->tupkeyData[0] = sig0;
-  regTcPtr->tupkeyData[1] = sig1;
-  regTcPtr->tupkeyData[2] = sig2;
-  regTcPtr->tupkeyData[3] = sig3;
-
-  if (TitcKeyLen > 0) {
-    if (TitcKeyLen < 4) {
-      nextPos += TitcKeyLen;
-    } else {
-      nextPos += 4;
-    }//if
-  } 
-  else if (! (LqhKeyReq::getNrCopyFlag(Treqinfo)))
+
+  UintR TitcKeyLen = 0;
+  Uint32 keyLenWithLQHReq = 0;
+  UintR TreclenAiLqhkey   = 0;
+
+  if (isLongReq)
+  {
+    /* Long LQHKEYREQ indicates Key and AttrInfo presence and
+     * size via section lengths
+     */
+    SegmentedSectionPtr keyInfoSection, attrInfoSection;
+    
+    handle.getSection(keyInfoSection,
+                      LqhKeyReq::KeyInfoSectionNum);
+
+    ndbassert(keyInfoSection.i != RNIL);
+
+    regTcPtr->keyInfoIVal= keyInfoSection.i;
+    TitcKeyLen= keyInfoSection.sz;
+    keyLenWithLQHReq= TitcKeyLen;
+
+    Uint32 totalAttrInfoLen= 0;
+    if (handle.getSection(attrInfoSection,
+                          LqhKeyReq::AttrInfoSectionNum))
+    {
+      regTcPtr->attrInfoIVal= attrInfoSection.i;
+      totalAttrInfoLen= attrInfoSection.sz;
+    }
+
+    regTcPtr->reclenAiLqhkey = 0;
+    regTcPtr->currReclenAi = totalAttrInfoLen;
+    regTcPtr->totReclenAi = totalAttrInfoLen;
+
+    /* Detach sections from the handle, we are now responsible
+     * for freeing them when appropriate
+     */
+    handle.clear();
+  }
+  else
+  {
+    /* Short LQHKEYREQ, Key and Attr sizes are in
+     * signal, along with some data
+     */
+    TreclenAiLqhkey= LqhKeyReq::getAIInLqhKeyReq(Treqinfo);
+    regTcPtr->reclenAiLqhkey = TreclenAiLqhkey;
+    regTcPtr->currReclenAi = TreclenAiLqhkey;
+    TitcKeyLen = LqhKeyReq::getKeyLen(Treqinfo);
+    regTcPtr->totReclenAi = LqhKeyReq::getAttrLen(attrLenFlags);
+
+    /* Note key can be length zero for NR when Rowid used */
+    keyLenWithLQHReq= MIN(TitcKeyLen, LqhKeyReq::MaxKeyInfo);
+
+    bool ok= appendToSection(regTcPtr->keyInfoIVal,
+                             &lqhKeyReq->variableData[ nextPos ],
+                             keyLenWithLQHReq);
+    if (unlikely(!ok))
+    {
+      jam();
+      terrorCode= ZGET_DATAREC_ERROR;
+      abortErrorLab(signal);
+      return;
+    }
+
+    nextPos+= keyLenWithLQHReq;
+  }
+  
+  regTcPtr->primKeyLen = TitcKeyLen;
+
+  /* Only node restart copy allowed to send no KeyInfo */
+  if (( keyLenWithLQHReq == 0 ) &&
+      (! (LqhKeyReq::getNrCopyFlag(Treqinfo))))
   {
     LQHKEY_error(signal, 3);
     return;
   }//if
-  
+
   sig0 = lqhKeyReq->variableData[nextPos + 0];
   sig1 = lqhKeyReq->variableData[nextPos + 1];
   regTcPtr->m_row_id.m_page_no = sig0;
@@ -3892,7 +4050,7 @@
   regTcPtr->replicaType = TcopyType;
   regTcPtr->fragmentptr = fragptr.i;
   regTcPtr->m_log_part_ptr_i = logPart;
-  Uint8 TdistKey = LqhKeyReq::getDistributionKey(TtotReclenAi);
+  Uint8 TdistKey = LqhKeyReq::getDistributionKey(attrLenFlags);
   if ((tfragDistKey != TdistKey) &&
       (regTcPtr->seqNoReplica == 0) &&
       (regTcPtr->dirtyOp == ZFALSE)) 
@@ -3909,81 +4067,122 @@
     tmp = (tmp < 0 ? - tmp : tmp);
     if ((tmp <= 1) || (tfragDistKey == 0)) {
       LQHKEY_abort(signal, 0);
-      return;
     }//if
     LQHKEY_error(signal, 1);
+    return;
   }//if
-  if (TreclenAiLqhkey != 0) {
-    if (regTcPtr->operation != ZREAD) {
-      if (regTcPtr->operation != ZDELETE) {
-        if (regTcPtr->opExec != 1) {
-          jam();
-/*---------------------------------------------------------------------------*/
-/*                                                                           */
-/* UPDATES, WRITES AND INSERTS THAT ARE NOT INTERPRETED WILL USE THE         */
-/* SAME ATTRINFO IN ALL REPLICAS. THUS WE SAVE THE ATTRINFO ALREADY          */
-/* TO SAVE A SIGNAL FROM TUP TO LQH. INTERPRETED EXECUTION IN TUP            */
-/* WILL CREATE NEW ATTRINFO FOR THE OTHER REPLICAS AND IT IS THUS NOT        */
-/* A GOOD IDEA TO SAVE THE INFORMATION HERE. READS WILL ALSO BE              */
-/* UNNECESSARY TO SAVE SINCE THAT ATTRINFO WILL NEVER BE SENT TO ANY         */
-/* MORE REPLICAS.                                                            */
-/*---------------------------------------------------------------------------*/
-/* READS AND DELETES CAN ONLY HAVE INFORMATION ABOUT WHAT IS TO BE READ.     */
-/* NO INFORMATION THAT NEEDS LOGGING.                                        */
-/*---------------------------------------------------------------------------*/
-          sig0 = lqhKeyReq->variableData[nextPos + 0];
-          sig1 = lqhKeyReq->variableData[nextPos + 1];
-          sig2 = lqhKeyReq->variableData[nextPos + 2];
-          sig3 = lqhKeyReq->variableData[nextPos + 3];
-          sig4 = lqhKeyReq->variableData[nextPos + 4];
-
-          regTcPtr->firstAttrinfo[0] = sig0;
-          regTcPtr->firstAttrinfo[1] = sig1;
-          regTcPtr->firstAttrinfo[2] = sig2;
-          regTcPtr->firstAttrinfo[3] = sig3;
-          regTcPtr->firstAttrinfo[4] = sig4;
-          regTcPtr->currTupAiLen = TreclenAiLqhkey;
-        } else {
-          jam();
-          regTcPtr->reclenAiLqhkey = 0;
-        }//if
-      } else {
+
+  /*
+   * Interpreted updates and deletes may require different AttrInfo in 
+   * different replicas, as only the primary executes the interpreted 
+   * program, and the effect of the program rather than the program
+   * should be logged.
+   * Non interpreted inserts, updates, writes and deletes use the same
+   * AttrInfo in all replicas.
+   * All reads only run on one replica, and are not logged.
+   * The AttrInfo section is passed to TUP attached to the TUPKEYREQ
+   * signal below.
+   *
+   * Normal processing : 
+   *   - LQH passes ATTRINFO section to TUP attached to direct TUPKEYREQ 
+   *     signal
+   *   - TUP processes request and sends direct TUPKEYCONF back to LQH
+   *   - LQH continues processing (logging, forwarding LQHKEYREQ to other
+   *     replicas as necessary)
+   *   - LQH frees ATTRINFO section
+   *   Note that TUP is not responsible for freeing the passed ATTRINFO
+   *   section, LQH is.
+   *
+   * Interpreted Update / Delete processing 
+   *   - LQH passes ATTRINFO section to TUP attached to direct TUPKEYREQ 
+   *     signal
+   *   - TUP processes request, generating new ATTRINFO data
+   *   - If new AttrInfo data is > 0 words, TUP sends it back to LQH as
+   *     a long section attached to a single ATTRINFO signal.
+   *     - LQH frees the original AttrInfo section and stores a ref to 
+   *       the new section
+   *   - TUP sends direct TUPKEYCONF back to LQH with new ATTRINFO length
+   *   - If the new ATTRINFO is > 0 words, 
+   *       - LQH continues processing with it (logging, forwarding 
+   *         LQHKEYREQ to other replicas as necessary)
+   *       - LQH frees the new ATTRINFO section
+   *   - If the new ATTRINFO is 0 words, LQH frees the original ATTRINFO
+   *     section and continues processing (logging, forwarding LQHKEYREQ
+   *     to other replicas as necessary)
+   *
+   */
+  bool attrInfoToPropagate= 
+    (regTcPtr->totReclenAi != 0) &&
+    (regTcPtr->operation != ZREAD) &&
+    (regTcPtr->operation != ZDELETE);
+  bool tupCanChangePropagatedAttrInfo= (regTcPtr->opExec == 1);
+  
+  bool saveAttrInfo= 
+    attrInfoToPropagate &&
+    (! tupCanChangePropagatedAttrInfo);
+  
+  if (saveAttrInfo)
+    regTcPtr->m_flags|= TcConnectionrec::OP_SAVEATTRINFO;
+  
+  
+  /* Handle any AttrInfo we received with the LQHKEYREQ */
+  if (regTcPtr->currReclenAi != 0)
+  {
+    jam();
+    if (isLongReq)
+    {
+      /* Long LQHKEYREQ */
+      jam();
+      
+      regTcPtr->currTupAiLen= saveAttrInfo ?
+        regTcPtr->totReclenAi :
+        0;
+    }
+    else
+    {
+      /* Short LQHKEYREQ */
+      jam();
+
+      /* Lets put the AttrInfo into a segmented section */
+      bool ok= appendToSection(regTcPtr->attrInfoIVal,
+                               lqhKeyReq->variableData + nextPos,
+                               TreclenAiLqhkey);
+      if (unlikely(!ok))
+      {
         jam();
-        regTcPtr->reclenAiLqhkey = 0;
-      }//if
-    }//if
-    sig0 = lqhKeyReq->variableData[nextPos + 0];
-    sig1 = lqhKeyReq->variableData[nextPos + 1];
-    sig2 = lqhKeyReq->variableData[nextPos + 2];
-    sig3 = lqhKeyReq->variableData[nextPos + 3];
-    sig4 = lqhKeyReq->variableData[nextPos + 4];
-    
-    c_tup->receive_attrinfo(signal, regTcPtr->tupConnectrec, 
-			    lqhKeyReq->variableData+nextPos, TreclenAiLqhkey);
-    
-    if (signal->theData[0] == (UintR)-1) {
-      LQHKEY_abort(signal, 2);
-      return;
-    }//if
+        terrorCode= ZGET_DATAREC_ERROR;
+        abortErrorLab(signal);
+        return;
+      }
+        
+      regTcPtr->currTupAiLen= TreclenAiLqhkey;
+    }
   }//if
-/* ------- TAKE CARE OF PRIM KEY DATA ------- */
-  if (regTcPtr->primKeyLen <= 4) {
+
+  /* If we've received all KeyInfo, proceed with processing,
+   * otherwise wait for discrete KeyInfo signals
+   */
+  if (regTcPtr->primKeyLen == keyLenWithLQHReq) {
     endgettupkeyLab(signal);
     return;
   } else {
     jam();
-/*--------------------------------------------------------------------*/
-/*       KEY LENGTH WAS MORE THAN 4 WORDS (WORD = 4 BYTE). THUS WE    */
-/*       HAVE TO ALLOCATE A DATA BUFFER TO STORE THE KEY DATA AND     */
-/*       WAIT FOR THE KEYINFO SIGNAL.                                 */
-/*--------------------------------------------------------------------*/
-    regTcPtr->save1 = 4;
+    ndbassert(!isLongReq);
+    /* Wait for remaining KeyInfo */
+    regTcPtr->save1 = keyLenWithLQHReq;
     regTcPtr->transactionState = TcConnectionrec::WAIT_TUPKEYINFO;
     return;
   }//if
   return;
 }//Dblqh::execLQHKEYREQ()
 
+
+
+/**
+ * endgettupkeyLab
+ * Invoked when all KeyInfo and/or all AttrInfo has been 
+ * received
+ */
 void Dblqh::endgettupkeyLab(Signal* signal) 
 {
   TcConnectionrec * const regTcPtr = tcConnectptr.p;
@@ -3991,7 +4190,10 @@
     ;
   } else {
     jam();
+    /* Wait for discrete AttrInfo signals */
     ndbrequire(regTcPtr->currReclenAi < regTcPtr->totReclenAi);
+    ndbassert( !(regTcPtr->m_flags & 
+                 TcConnectionrec::OP_ISLONGREQ) );
     regTcPtr->transactionState = TcConnectionrec::WAIT_ATTR;
     return;
   }//if
@@ -4119,7 +4321,7 @@
       TRACENR(" NrCopy");
     if (LqhKeyReq::getRowidFlag(regTcPtr->reqinfo))
       TRACENR(" rowid: " << regTcPtr->m_row_id);
-    TRACENR(" key: " << regTcPtr->tupkeyData[0]);
+    TRACENR(" key: " << getKeyInfoWordOrZero(regTcPtr, 0));
   }
   
   if (likely(activeCreat == Fragrecord::AC_NORMAL))
@@ -4180,21 +4382,13 @@
   signal->theData[5] = sig4;
 
   sig0 = regTcPtr.p->transid[1];
-  sig1 = regTcPtr.p->tupkeyData[0];
-  sig2 = regTcPtr.p->tupkeyData[1];
-  sig3 = regTcPtr.p->tupkeyData[2];
-  sig4 = regTcPtr.p->tupkeyData[3];
   signal->theData[6] = sig0;
-  signal->theData[7] = sig1;
-  signal->theData[8] = sig2;
-  signal->theData[9] = sig3;
-  signal->theData[10] = sig4;
+
+  /* Copy KeyInfo to end of ACCKEYREQ signal, starting at offset 7 */
+  sendKeyinfoAcc(signal, 7);
 
   TRACE_OP(regTcPtr.p, "ACC");
   
-  if (regTcPtr.p->primKeyLen > 4) {
-    sendKeyinfoAcc(signal, 11);
-  }//if
   EXECUTE_DIRECT(refToBlock(regTcPtr.p->tcAccBlockref), GSN_ACCKEYREQ, 
 		 signal, 7 + regTcPtr.p->primKeyLen);
   if (signal->theData[0] < RNIL) {
@@ -4266,7 +4460,7 @@
        * Case 4
        *   Perform delete using rowid
        *     primKeyLen == 0
-       *     tupkeyData[0] == rowid
+       *     key[0] == rowid
        */
       jam();
       ndbassert(regTcPtr.p->primKeyLen == 0);
@@ -4380,6 +4574,10 @@
   packLqhkeyreqLab(signal);
 }
 
+/**
+ * Compare received key data with the data supplied
+ * returning 0 if they are the same, 1 otherwise
+ */
 int
 Dblqh::compare_key(const TcConnectionrec* regTcPtr, 
 		   const Uint32 * ptr, Uint32 len)
@@ -4387,32 +4585,30 @@
   if (regTcPtr->primKeyLen != len)
     return 1;
   
-  if (len <= 4)
-    return memcmp(ptr, regTcPtr->tupkeyData, 4*len);
-  
-  if (memcmp(ptr, regTcPtr->tupkeyData, sizeof(regTcPtr->tupkeyData)))
-    return 1;
-  
-  len -= (sizeof(regTcPtr->tupkeyData) >> 2);
-  ptr += (sizeof(regTcPtr->tupkeyData) >> 2);
-
-  DatabufPtr regDatabufptr;
-  regDatabufptr.i = tcConnectptr.p->firstTupkeybuf;
-  ptrCheckGuard(regDatabufptr, cdatabufFileSize, databuf);
-  while(len > 4)
+  ndbassert( regTcPtr->keyInfoIVal != RNIL );
+
+  SectionReader keyInfoReader(regTcPtr->keyInfoIVal,
+                              getSectionSegmentPool());
+  
+  ndbassert(regTcPtr->primKeyLen == keyInfoReader.getSize());
+
+  while (len != 0)
   {
-    if (memcmp(ptr, regDatabufptr.p, 4*4))
+    const Uint32* keyChunk= NULL;
+    Uint32 chunkSize= 0;
+
+    /* Get a ptr to a chunk of contiguous words to compare */
+    bool ok= keyInfoReader.getWordsPtr(len, keyChunk, chunkSize);
+
+    ndbrequire(ok);
+
+    if ( memcmp(ptr, keyChunk, chunkSize << 2))
       return 1;
-
-    ptr += 4;
-    len -= 4;
-    regDatabufptr.i = regDatabufptr.p->nextDatabuf;
-    ptrCheckGuard(regDatabufptr, cdatabufFileSize, databuf);    
+    
+    ptr+= chunkSize;
+    len-= chunkSize;
   }
 
-  if (memcmp(ptr, regDatabufptr.p, 4*len))
-    return 1;
-
   return 0;
 }
 
@@ -4454,12 +4650,11 @@
     keylen = regTcPtr.p->primKeyLen;
     signal->theData[3] = regTcPtr.p->hashValue;
     signal->theData[4] = keylen;
-    signal->theData[7] = regTcPtr.p->tupkeyData[0];
-    signal->theData[8] = regTcPtr.p->tupkeyData[1];
-    signal->theData[9] = regTcPtr.p->tupkeyData[2];
-    signal->theData[10] = regTcPtr.p->tupkeyData[3];
-    if (keylen > 4)
-      sendKeyinfoAcc(signal, 11);
+
+    /* Copy KeyInfo inline into the ACCKEYREQ signal, 
+     * starting at word 7 
+     */
+    sendKeyinfoAcc(signal, 7);
   }
   const Uint32 ref = refToBlock(regTcPtr.p->tcAccBlockref);
   EXECUTE_DIRECT(ref, GSN_ACCKEYREQ, signal, 7 + keylen);
@@ -4637,19 +4832,7 @@
   regDatabufptr.i = regTcPtr.p->firstTupkeybuf;
   Uint32 * tmp = xfrm ? (Uint32*)Tmp : dst;
 
-  memcpy(tmp, regTcPtr.p->tupkeyData, sizeof(regTcPtr.p->tupkeyData));
-  if (keyLen > 4)
-  {
-    tmp += 4;
-    Uint32 pos = 4;
-    do {
-      ptrCheckGuard(regDatabufptr, cdatabufFileSize, databuf);
-      memcpy(tmp, regDatabufptr.p->data, sizeof(regDatabufptr.p->data));
-      regDatabufptr.i = regDatabufptr.p->nextDatabuf;
-      tmp += sizeof(regDatabufptr.p->data) >> 2;
-      pos += sizeof(regDatabufptr.p->data) >> 2;
-    } while(pos < keyLen);
-  }    
+  copy(tmp, regTcPtr.p->keyInfoIVal);
   
   if (xfrm)
   {
@@ -4661,29 +4844,42 @@
   return keyLen;
 }
 
+/**
+ * getKeyInfoWordOrZero
+ * Get given word of KeyInfo, or zero if it's not available
+ * Used for tracing
+ */
+Uint32
+Dblqh::getKeyInfoWordOrZero(const TcConnectionrec* regTcPtr,
+                            Uint32 offset)
+{
+  if (regTcPtr->keyInfoIVal != RNIL)
+  {
+    SectionReader keyInfoReader(regTcPtr->keyInfoIVal,
+                                g_sectionSegmentPool);
+    
+    if (keyInfoReader.getSize() > offset)
+    {
+      if (offset)
+        keyInfoReader.step(offset);
+      
+      Uint32 word;
+      keyInfoReader.getWord(&word);
+      return word;
+    }
+  }
+  return 0;
+}
+
 /* =*======================================================================= */
 /* =======                 SEND KEYINFO TO ACC                       ======= */
 /*                                                                           */
 /* ========================================================================= */
 void Dblqh::sendKeyinfoAcc(Signal* signal, Uint32 Ti) 
 {
-  DatabufPtr regDatabufptr;
-  regDatabufptr.i = tcConnectptr.p->firstTupkeybuf;
-  
-  do {
-    jam();
-    ptrCheckGuard(regDatabufptr, cdatabufFileSize, databuf);
-    Uint32 sig0 = regDatabufptr.p->data[0];
-    Uint32 sig1 = regDatabufptr.p->data[1];
-    Uint32 sig2 = regDatabufptr.p->data[2];
-    Uint32 sig3 = regDatabufptr.p->data[3];
-    signal->theData[Ti] = sig0;
-    signal->theData[Ti + 1] = sig1;
-    signal->theData[Ti + 2] = sig2;
-    signal->theData[Ti + 3] = sig3;
-    regDatabufptr.i = regDatabufptr.p->nextDatabuf;
-    Ti += 4;
-  } while (regDatabufptr.i != RNIL);
+  /* Copy all KeyInfo into the signal at offset Ti */
+  copy(&signal->theData[Ti],
+       tcConnectptr.p->keyInfoIVal);
 }//Dblqh::sendKeyinfoAcc()
 
 void Dblqh::execLQH_ALLOCREQ(Signal* signal)
@@ -4895,6 +5091,18 @@
   regTcPtr->m_row_id.m_page_no = page_no;
   regTcPtr->m_row_id.m_page_idx = page_idx;
   
+  tupKeyReq->attrInfoIVal= RNIL;
+
+  /* Pass AttrInfo section if available in the TupKeyReq signal
+   * We are still responsible for releasing it, TUP is just
+   * borrowing it
+   */
+  if (tupKeyReq->attrBufLen > 0)
+  {
+    ndbassert( regTcPtr->attrInfoIVal != RNIL );
+    tupKeyReq->attrInfoIVal= regTcPtr->attrInfoIVal;
+  }
+
   EXECUTE_DIRECT(tup, GSN_TUPKEYREQ, signal, TupKeyReq::SignalLength);
 }//Dblqh::execACCKEYCONF()
 
@@ -5013,7 +5221,12 @@
     commitContinueAfterBlockedLab(signal);
     return;
   }//if
+
   regTcPtr->totSendlenAi = writeLen;
+  /* We will propagate / log writeLen words
+   * Check that that is how many we have available to 
+   * propagate
+   */
   ndbrequire(regTcPtr->totSendlenAi == regTcPtr->currTupAiLen);
   
   if (unlikely(activeCreat == Fragrecord::AC_NR_COPY))
@@ -5446,13 +5659,26 @@
 
   Uint32 nextNodeId = regTcPtr->nextReplica;
   Uint32 nextVersion = getNodeInfo(nextNodeId).m_version;
-  UintR TAiLen = regTcPtr->reclenAiLqhkey;
+
+  /* Send long LqhKeyReq to next replica if it can support it */
+  bool sendLongReq= ! ((nextVersion < NDBD_LONG_LQHKEYREQ) || 
+                       ERROR_INSERTED(5051));
+  
+  UintR TAiLen = sendLongReq ?
+    0 :
+    MIN(regTcPtr->totSendlenAi, LqhKeyReq::MaxAttrInfo);
+
+  /* Long LQHKeyReq uses section size for key length */
+  Uint32 lqhKeyLen= sendLongReq?
+    0 :
+    regTcPtr->primKeyLen;
 
   UintR TapplAddressIndicator = (regTcPtr->nextSeqNoReplica == 0 ? 0 : 1);
   LqhKeyReq::setApplicationAddressFlag(Treqinfo, TapplAddressIndicator);
   LqhKeyReq::setInterpretedFlag(Treqinfo, regTcPtr->opExec);
   LqhKeyReq::setSeqNoReplica(Treqinfo, regTcPtr->nextSeqNoReplica);
   LqhKeyReq::setAIInLqhKeyReq(Treqinfo, TAiLen);
+  LqhKeyReq::setKeyLen(Treqinfo,lqhKeyLen);
   
   if (unlikely(nextVersion < NDBD_ROWID_VERSION))
   {
@@ -5481,7 +5707,11 @@
   LqhKeyReq::setSameClientAndTcFlag(Treqinfo, TsameLqhAndClient);
   LqhKeyReq::setReturnedReadLenAIFlag(Treqinfo, TreadLenAiInd);
 
-  UintR TotReclenAi = regTcPtr->totSendlenAi;
+  /* Long LQHKeyReq uses section size for AttrInfo length */
+  UintR TotReclenAi = sendLongReq ? 
+    0 :
+    regTcPtr->totSendlenAi;
+
   LqhKeyReq::setReorgFlag(TotReclenAi, regTcPtr->m_reorg);
 
 /* ------------------------------------------------------------------------- */
@@ -5530,23 +5760,33 @@
     nextPos++;
   }//if
   sig0 = regTcPtr->readlenAi;
-  sig1 = regTcPtr->tupkeyData[0];
-  sig2 = regTcPtr->tupkeyData[1];
-  sig3 = regTcPtr->tupkeyData[2];
-  sig4 = regTcPtr->tupkeyData[3];
-
   lqhKeyReq->variableData[nextPos] = sig0;
   nextPos += TreadLenAiInd;
-  lqhKeyReq->variableData[nextPos] = sig1;
-  lqhKeyReq->variableData[nextPos + 1] = sig2;
-  lqhKeyReq->variableData[nextPos + 2] = sig3;
-  lqhKeyReq->variableData[nextPos + 3] = sig4;
-  UintR TkeyLen = LqhKeyReq::getKeyLen(Treqinfo);
-  if (TkeyLen < 4) {
-    nextPos += TkeyLen;
-  } else {
-    nextPos += 4;
-  }//if
+
+  if (!sendLongReq)
+  {
+    /* Short LQHKEYREQ to older LQH
+     * First few words of KeyInfo go into LQHKEYREQ
+     * Sometimes have no Keyinfo
+     */
+    if (regTcPtr->primKeyLen != 0)
+    {
+      SegmentedSectionPtr keyInfoSection;
+      
+      ndbassert(regTcPtr->keyInfoIVal != RNIL);
+
+      getSection(keyInfoSection, regTcPtr->keyInfoIVal);
+      SectionReader keyInfoReader(keyInfoSection, g_sectionSegmentPool);
+      
+      UintR keyLenInLqhKeyReq= MIN(LqhKeyReq::MaxKeyInfo, 
+                                   regTcPtr->primKeyLen);
+      
+      keyInfoReader.getWords(&lqhKeyReq->variableData[nextPos], 
+                             keyLenInLqhKeyReq);
+      
+      nextPos+= keyLenInLqhKeyReq;
+    }
+  }
 
   sig0 = regTcPtr->gci_hi;
   Local_key tmp = regTcPtr->m_row_id;
@@ -5560,77 +5800,147 @@
 
   BlockReference lqhRef = calcLqhBlockRef(regTcPtr->nextReplica);
   
-  if (likely(nextPos + TAiLen + LqhKeyReq::FixedSignalLength <= 25))
+  if (likely(sendLongReq))
   {
-    jam();
-    sig0 = regTcPtr->firstAttrinfo[0];
-    sig1 = regTcPtr->firstAttrinfo[1];
-    sig2 = regTcPtr->firstAttrinfo[2];
-    sig3 = regTcPtr->firstAttrinfo[3];
-    sig4 = regTcPtr->firstAttrinfo[4];
-
-    lqhKeyReq->variableData[nextPos] = sig0;
-    lqhKeyReq->variableData[nextPos + 1] = sig1;
-    lqhKeyReq->variableData[nextPos + 2] = sig2;
-    lqhKeyReq->variableData[nextPos + 3] = sig3;
-    lqhKeyReq->variableData[nextPos + 4] = sig4;
+    /* Long LQHKEYREQ, attach KeyInfo and AttrInfo
+     * sections to signal
+     */
+    SectionHandle handle(this);
+    handle.m_cnt= 0;
+
+    if (regTcPtr->primKeyLen > 0)
+    {
+      SegmentedSectionPtr keyInfoSection;
+      
+      ndbassert(regTcPtr->keyInfoIVal != RNIL);
+      getSection(keyInfoSection, regTcPtr->keyInfoIVal);
+      
+      handle.m_ptr[ LqhKeyReq::KeyInfoSectionNum ]= keyInfoSection;
+      handle.m_cnt= 1;
+
+      if (regTcPtr->totSendlenAi > 0)
+      {
+        SegmentedSectionPtr attrInfoSection;
+        
+        ndbassert(regTcPtr->attrInfoIVal != RNIL);
+        getSection(attrInfoSection, regTcPtr->attrInfoIVal);
+        
+        handle.m_ptr[ LqhKeyReq::AttrInfoSectionNum ]= attrInfoSection;
+        handle.m_cnt= 2;
+      }
+      else
+      {
+        /* No AttrInfo to be sent on.  This can occur for delete
+         * or with an interpreted update when no actual update 
+         * is made
+         * In this case, we free any attrInfo section now.
+         */
+        if (regTcPtr->attrInfoIVal != RNIL)
+        {
+          ndbassert(!( regTcPtr->m_flags & 
+                       TcConnectionrec::OP_SAVEATTRINFO))
+          releaseSection(regTcPtr->attrInfoIVal);
+          regTcPtr->attrInfoIVal= RNIL;
+        }
+      }
+    }
+    else
+    {
+      /* Zero-length primary key, better not have any
+       * AttrInfo
+       */
+      ndbrequire(regTcPtr->totSendlenAi == 0);
+      ndbassert(regTcPtr->keyInfoIVal == RNIL);
+      ndbassert(regTcPtr->attrInfoIVal == RNIL);
+    }
+
+    sendSignal(lqhRef, GSN_LQHKEYREQ, signal,
+               LqhKeyReq::FixedSignalLength + nextPos,
+               JBB,
+               &handle);
     
-    nextPos += TAiLen;
-    TAiLen = 0;
+    /* Long sections were freed as part of sendSignal */
+    ndbassert( handle.m_cnt == 0);
+    regTcPtr->keyInfoIVal= RNIL;
+    regTcPtr->attrInfoIVal= RNIL;
   }
   else
   {
-    Treqinfo &= ~(Uint32)(RI_AI_IN_THIS_MASK << RI_AI_IN_THIS_SHIFT);
-    lqhKeyReq->requestInfo = Treqinfo;
-  }
-  
-  sendSignal(lqhRef, GSN_LQHKEYREQ, signal, 
-             nextPos + LqhKeyReq::FixedSignalLength, JBB);
-  if (regTcPtr->primKeyLen > 4) {
-    jam();
-/* ------------------------------------------------------------------------- */
-/* MORE THAN 4 WORDS OF KEY DATA IS IN THE OPERATION. THEREFORE WE NEED TO   */
-/* PREPARE A KEYINFO SIGNAL. MORE THAN ONE KEYINFO SIGNAL CAN BE SENT.       */
-/* ------------------------------------------------------------------------- */
-    sendTupkey(signal);
-  }//if
-/* ------------------------------------------------------------------------- */
-/* NOW I AM PREPARED TO SEND ALL THE ATTRINFO SIGNALS. AT THE MOMENT A LOOP  */
-/* SENDS ALL AT ONCE. LATER WE HAVE TO ADDRESS THE PROBLEM THAT THESE COULD  */
-/* LEAD TO BUFFER EXPLOSION => NODE CRASH.                                   */
-/* ------------------------------------------------------------------------- */
-/*       NEW CODE TO SEND ATTRINFO IN PACK_LQHKEYREQ  */
-/*       THIS CODE USES A REAL-TIME BREAK AFTER       */
-/*       SENDING 16 SIGNALS.                          */
-/* -------------------------------------------------- */
-  sig0 = regTcPtr->tcOprec;
-  sig1 = regTcPtr->transid[0];
-  sig2 = regTcPtr->transid[1];
-  signal->theData[0] = sig0;
-  signal->theData[1] = sig1;
-  signal->theData[2] = sig2;
-  
-  if (unlikely(nextPos + TAiLen + LqhKeyReq::FixedSignalLength > 25))
-  {
-    jam();
-    /**
-     * 4 replicas...
+    /* Short LQHKEYREQ to older LQH
+     * First few words of ATTRINFO go into LQHKEYREQ
+     * (if they fit)
      */
-    memcpy(signal->theData+3, regTcPtr->firstAttrinfo, TAiLen << 2);
-    sendSignal(lqhRef, GSN_ATTRINFO, signal, 3 + TAiLen, JBB);    
+    if (TAiLen > 0)
+    {
+      if (likely(nextPos + TAiLen + LqhKeyReq::FixedSignalLength <= 25))
+      {
+        jam();
+        SegmentedSectionPtr attrInfoSection;
+        
+        ndbassert(regTcPtr->attrInfoIVal != RNIL);
+        
+        getSection(attrInfoSection, regTcPtr->attrInfoIVal);
+        SectionReader attrInfoReader(attrInfoSection, getSectionSegmentPool());
+        
+        attrInfoReader.getWords(&lqhKeyReq->variableData[nextPos], 
+                                TAiLen);
+        
+        nextPos+= TAiLen;
+      }
+      else
+      {
+        /* Not enough space in LQHKEYREQ, we'll send everything in
+         * separate ATTRINFO signals
+         */
+        Treqinfo &= ~(Uint32)(RI_AI_IN_THIS_MASK << RI_AI_IN_THIS_SHIFT);
+        lqhKeyReq->requestInfo = Treqinfo;
+        TAiLen= 0;
+      }
+    }
+  
+    sendSignal(lqhRef, GSN_LQHKEYREQ, signal, 
+               nextPos + LqhKeyReq::FixedSignalLength, JBB);
+
+    /* Send extra KeyInfo signals if necessary... */
+    if (regTcPtr->primKeyLen > LqhKeyReq::MaxKeyInfo) {
+      jam();
+      sendTupkey(signal);
+    }//if
+
+    /* Send extra AttrInfo signals if necessary... */
+    Uint32 remainingAiLen= regTcPtr->totSendlenAi - TAiLen;
+    
+    if (remainingAiLen != 0)
+    {
+      sig0 = regTcPtr->tcOprec;
+      sig1 = regTcPtr->transid[0];
+      sig2 = regTcPtr->transid[1];
+      signal->theData[0] = sig0;
+      signal->theData[1] = sig1;
+      signal->theData[2] = sig2;
+
+      SectionReader attrInfoReader(regTcPtr->attrInfoIVal,
+                                   g_sectionSegmentPool);
+
+      ndbassert(attrInfoReader.getSize() == regTcPtr->totSendlenAi);
+
+      /* Step over words already sent in LQHKEYREQ above */
+      attrInfoReader.step(TAiLen);
+
+      while (remainingAiLen != 0)
+      {
+        Uint32 dataInSignal= MIN(AttrInfo::DataLength, remainingAiLen);
+        attrInfoReader.getWords(&signal->theData[3],
+                                dataInSignal);
+        remainingAiLen-= dataInSignal;
+        sendSignal(lqhRef, GSN_ATTRINFO, signal, 
+                   AttrInfo::HeaderLength + dataInSignal, JBB);
+      }
+    }
   }
 
-  AttrbufPtr regAttrinbufptr;
-  regAttrinbufptr.i = regTcPtr->firstAttrinbuf;
-  while (regAttrinbufptr.i != RNIL) {
-    ptrCheckGuard(regAttrinbufptr, cattrinbufFileSize, attrbuf);
-    jam();
-    Uint32 dataLen = regAttrinbufptr.p->attrbuf[ZINBUF_DATA_LEN];
-    ndbrequire(dataLen != 0);
-    MEMCOPY_NO_WORDS(&signal->theData[3], &regAttrinbufptr.p->attrbuf[0], dataLen);
-    regAttrinbufptr.i = regAttrinbufptr.p->attrbuf[ZINBUF_NEXT];
-    sendSignal(lqhRef, GSN_ATTRINFO, signal, dataLen + 3, JBB);
-  }//while
+  /* LQHKEYREQ sent */
+
   regTcPtr->transactionState = TcConnectionrec::PREPARED;
   if (regTcPtr->dirtyOp == ZTRUE) {
     jam();
@@ -5758,49 +6068,24 @@
 void Dblqh::writeKey(Signal* signal) 
 {
   TcConnectionrec * const regTcPtr = tcConnectptr.p;
-  Uint32 logPos, endPos, dataLen;
-  Int32 remainingLen;
-  logPos = logPagePtr.p->logPageWord[ZCURR_PAGE_INDEX];
-  remainingLen = regTcPtr->primKeyLen;
-  dataLen = remainingLen;
-  if (remainingLen > 4)
-    dataLen = 4;
-  remainingLen -= dataLen;
-  endPos = logPos + dataLen;
-  if (endPos < ZPAGE_SIZE) {
-    MEMCOPY_NO_WORDS(&logPagePtr.p->logPageWord[logPos],
-                     &regTcPtr->tupkeyData[0],
-                     dataLen);
-  } else {
-    jam();
-    for (Uint32 i = 0; i < dataLen; i++)
-      writeLogWord(signal, regTcPtr->tupkeyData[i]);
-    endPos = logPagePtr.p->logPageWord[ZCURR_PAGE_INDEX];
-  }//if
-  DatabufPtr regDatabufptr;
-  regDatabufptr.i = regTcPtr->firstTupkeybuf;
-  while (remainingLen > 0) {
-    logPos = endPos;
-    ptrCheckGuard(regDatabufptr, cdatabufFileSize, databuf);
-    dataLen = remainingLen;
-    if (remainingLen > 4)
-      dataLen = 4;
-    remainingLen -= dataLen;
-    endPos += dataLen;
-    if (endPos < ZPAGE_SIZE) {
-      MEMCOPY_NO_WORDS(&logPagePtr.p->logPageWord[logPos],
-                       &regDatabufptr.p->data[0],
-                       dataLen);
-    } else {
-      logPagePtr.p->logPageWord[ZCURR_PAGE_INDEX] = logPos;
-      for (Uint32 i = 0; i < dataLen; i++)
-        writeLogWord(signal, regDatabufptr.p->data[i]);
-      endPos = logPagePtr.p->logPageWord[ZCURR_PAGE_INDEX];
-    }//if
-    regDatabufptr.i = regDatabufptr.p->nextDatabuf;
-  }//while
-  logPagePtr.p->logPageWord[ZCURR_PAGE_INDEX] = endPos;
-  ndbrequire(regDatabufptr.i == RNIL);
+  jam();
+  SectionReader keyInfoReader(regTcPtr->keyInfoIVal,
+                              g_sectionSegmentPool);
+  const Uint32* srcPtr;
+  Uint32 length;
+  Uint32 wordsWritten= 0;
+
+  /* Write contiguous chunks of words from the KeyInfo
+   * section to the log 
+   */
+  while (keyInfoReader.getWordsPtr(srcPtr,
+                                   length))
+  {
+    writeLogWords(signal, srcPtr, length);
+    wordsWritten+= length;
+  }
+
+  ndbassert( wordsWritten == regTcPtr->primKeyLen );
 }//Dblqh::writeKey()
 
 /* --------------------------------------------------------------------------
@@ -5814,44 +6099,26 @@
   Uint32 totLen = regTcPtr->currTupAiLen;
   if (totLen == 0)
     return;
-  Uint32 logPos = logPagePtr.p->logPageWord[ZCURR_PAGE_INDEX];
-  Uint32 lqhLen = regTcPtr->reclenAiLqhkey;
-  ndbrequire(totLen >= lqhLen);
-  Uint32 endPos = logPos + lqhLen;
-  totLen -= lqhLen;
-  if (endPos < ZPAGE_SIZE) {
-    MEMCOPY_NO_WORDS(&logPagePtr.p->logPageWord[logPos],
-                     &regTcPtr->firstAttrinfo[0],
-                     lqhLen);
-  } else {
-    for (Uint32 i = 0; i < lqhLen; i++)
-      writeLogWord(signal, regTcPtr->firstAttrinfo[i]);
-    endPos = logPagePtr.p->logPageWord[ZCURR_PAGE_INDEX];
-  }//if
-  AttrbufPtr regAttrinbufptr;
-  regAttrinbufptr.i = regTcPtr->firstAttrinbuf;
-  while (totLen > 0) {
-    logPos = endPos;
-    ptrCheckGuard(regAttrinbufptr, cattrinbufFileSize, attrbuf);
-    Uint32 dataLen = regAttrinbufptr.p->attrbuf[ZINBUF_DATA_LEN];
-    ndbrequire(totLen >= dataLen);
-    ndbrequire(dataLen > 0);
-    totLen -= dataLen;
-    endPos += dataLen;
-    if (endPos < ZPAGE_SIZE) {
-      MEMCOPY_NO_WORDS(&logPagePtr.p->logPageWord[logPos],
-                      &regAttrinbufptr.p->attrbuf[0],
-                      dataLen);
-    } else {
-      logPagePtr.p->logPageWord[ZCURR_PAGE_INDEX] = logPos;
-      for (Uint32 i = 0; i < dataLen; i++)
-        writeLogWord(signal, regAttrinbufptr.p->attrbuf[i]);
-      endPos = logPagePtr.p->logPageWord[ZCURR_PAGE_INDEX];
-    }//if
-    regAttrinbufptr.i = regAttrinbufptr.p->attrbuf[ZINBUF_NEXT];
-  }//while
-  logPagePtr.p->logPageWord[ZCURR_PAGE_INDEX] = endPos;
-  ndbrequire(regAttrinbufptr.i == RNIL);
+
+  jam();
+  ndbassert( regTcPtr->attrInfoIVal != RNIL );
+  SectionReader attrInfoReader(regTcPtr->attrInfoIVal,
+                               g_sectionSegmentPool);
+  const Uint32* srcPtr;
+  Uint32 length;
+  Uint32 wordsWritten= 0;
+
+  /* Write contiguous chunks of words from the 
+   * AttrInfo section to the log 
+   */
+  while (attrInfoReader.getWordsPtr(srcPtr,
+                                    length))
+  {
+    writeLogWords(signal, srcPtr, length);
+    wordsWritten+= length;
+  }
+
+  ndbassert( wordsWritten == totLen );
 }//Dblqh::writeAttrinfoLab()
 
 /* ------------------------------------------------------------------------- */
@@ -5861,31 +6128,31 @@
 /* ------------------------------------------------------------------------- */
 void Dblqh::sendTupkey(Signal* signal) 
 {
-  UintR TdataPos = 3;
   BlockReference lqhRef = calcLqhBlockRef(tcConnectptr.p->nextReplica);
   signal->theData[0] = tcConnectptr.p->tcOprec;
   signal->theData[1] = tcConnectptr.p->transid[0];
   signal->theData[2] = tcConnectptr.p->transid[1];
-  databufptr.i = tcConnectptr.p->firstTupkeybuf;
-  do {
-    ptrCheckGuard(databufptr, cdatabufFileSize, databuf);
-    signal->theData[TdataPos] = databufptr.p->data[0];
-    signal->theData[TdataPos + 1] = databufptr.p->data[1];
-    signal->theData[TdataPos + 2] = databufptr.p->data[2];
-    signal->theData[TdataPos + 3] = databufptr.p->data[3];
-
-    databufptr.i = databufptr.p->nextDatabuf;
-    TdataPos += 4;
-    if (databufptr.i == RNIL) {
-      jam();
-      sendSignal(lqhRef, GSN_KEYINFO, signal, TdataPos, JBB);
-      return;
-    } else if (TdataPos == 23) {
-      jam();
-      sendSignal(lqhRef, GSN_KEYINFO, signal, 23, JBB);
-      TdataPos = 3;
-    }
-  } while (1);
+
+  Uint32 remainingLen= tcConnectptr.p->primKeyLen - 
+    LqhKeyReq::MaxKeyInfo;
+
+  SectionReader keyInfoReader(tcConnectptr.p->keyInfoIVal,
+                              g_sectionSegmentPool);
+
+  ndbassert(keyInfoReader.getSize() > LqhKeyReq::MaxKeyInfo);
+
+  /* Step over the words already sent in LQHKEYREQ */
+  keyInfoReader.step(LqhKeyReq::MaxKeyInfo);
+
+  while (remainingLen != 0)
+  {
+    Uint32 dataInSignal= MIN(KeyInfo::DataLength, remainingLen);
+    keyInfoReader.getWords(&signal->theData[3],
+                           dataInSignal);
+    remainingLen-= dataInSignal;
+    sendSignal(lqhRef, GSN_KEYINFO, signal, 
+               KeyInfo::HeaderLength + dataInSignal, JBB);
+  }
 }//Dblqh::sendTupkey()
 
 void Dblqh::cleanUp(Signal* signal) 
@@ -5935,6 +6202,12 @@
   regTcPtr->firstTupkeybuf = RNIL;
   regTcPtr->lastTupkeybuf = RNIL;
 
+  /* Release long sections if present */
+  releaseSection(regTcPtr->keyInfoIVal);
+  regTcPtr->keyInfoIVal = RNIL;
+  releaseSection(regTcPtr->attrInfoIVal);
+  regTcPtr->attrInfoIVal = RNIL;
+
   if (regTcPtr->m_dealloc)
   {
     jam();
@@ -6689,7 +6962,7 @@
 	  TRACENR(" NrCopy");
 	if (LqhKeyReq::getRowidFlag(regTcPtr.p->reqinfo))
 	  TRACENR(" rowid: " << regTcPtr.p->m_row_id);
-	TRACENR(" key: " << regTcPtr.p->tupkeyData[0]);
+	TRACENR(" key: " << getKeyInfoWordOrZero(regTcPtr.p, 0));
 	TRACENR(endl);
       }
 
@@ -7183,7 +7456,7 @@
       TRACENR(" NrCopy");
     if (LqhKeyReq::getRowidFlag(tcPtr->reqinfo))
       TRACENR(" rowid: " << tcPtr->m_row_id);
-    TRACENR(" key: " << tcPtr->tupkeyData[0]);
+    TRACENR(" key: " << getKeyInfoWordOrZero(tcPtr, 0));
     TRACENR(endl);
     
   }
@@ -9513,27 +9786,29 @@
   tupKeyReq->tcOpIndex = regTcPtr->tcOprec;
   tupKeyReq->savePointId = regTcPtr->savePointId;
   tupKeyReq->disk_page= disk_page;
+  tupKeyReq->attrInfoIVal= RNIL;
   Uint32 blockNo = refToBlock(regTcPtr->tcTupBlockref);
   EXECUTE_DIRECT(blockNo, GSN_TUPKEYREQ, signal, 
 		 TupKeyReq::SignalLength);
 }
 
 /* -------------------------------------------------------------------------
- *       RECEPTION OF FURTHER KEY INFORMATION WHEN KEY SIZE > 16 BYTES.
+ *       STORE KEYINFO IN A LONG SECTION PRIOR TO SENDING
  * -------------------------------------------------------------------------
  *       PRECONDITION:   SCAN_STATE = WAIT_SCAN_KEYINFO
  * ------------------------------------------------------------------------- */
-void 
-Dblqh::keyinfoLab(const Uint32 * src, const Uint32 * end) 
+bool 
+Dblqh::keyinfoLab(const Uint32 * src, Uint32 len) 
 {
-  do {
-    jam();
-    seizeTupkeybuf(0);
-    databufptr.p->data[0] = * src ++;
-    databufptr.p->data[1] = * src ++;
-    databufptr.p->data[2] = * src ++;
-    databufptr.p->data[3] = * src ++;
-  } while (src < end);
+  ndbassert( tcConnectptr.p->keyInfoIVal == RNIL );
+  ndbassert( len > 0 );
+
+  if (ERROR_INSERTED(5052))
+    return false;
+
+  return(appendToSection(tcConnectptr.p->keyInfoIVal,
+                         src,
+                         len));
 }//Dblqh::keyinfoLab()
 
 Uint32
@@ -10754,7 +11029,7 @@
 
   scanptr.p->m_curr_batch_size_rows++;
   
-  if (signal->getLength() == 7)
+  if (signal->getLength() == NextScanConf::SignalLengthNoKeyInfo)
   {
     jam();
     ndbrequire(nextScanConf->accOperationPtr == RNIL);
@@ -10843,27 +11118,22 @@
 void Dblqh::execTRANSID_AI(Signal* signal) 
 {
   jamEntry();
+  /* TransID_AI received from local TUP, data is linear inline in 
+   * signal buff 
+   */
   tcConnectptr.i = signal->theData[0];
   ptrCheckGuard(tcConnectptr, ctcConnectrecFileSize, tcConnectionrec);
-  Uint32 length = signal->length() - 3;
+  Uint32 length = signal->length() - TransIdAI::HeaderLength;
   ndbrequire(tcConnectptr.p->transactionState == TcConnectionrec::COPY_TUPKEY);
-  Uint32 * src = &signal->theData[3];
-  while(length > 22){
-    if (saveTupattrbuf(signal, src, 22) == ZOK) {
-      ;
-    } else {
-      jam();
-      tcConnectptr.p->errorCode = ZGET_ATTRINBUF_ERROR;
-      return;
-    }//if
-    src += 22;
-    length -= 22;
-  }
-  if (saveTupattrbuf(signal, src, length) == ZOK) {
-    return;
-  }
-  jam();
-  tcConnectptr.p->errorCode = ZGET_ATTRINBUF_ERROR;
+  Uint32 * src = &signal->theData[ TransIdAI::HeaderLength ];
+  bool ok= appendToSection(tcConnectptr.p->attrInfoIVal,
+                           src,
+                           length);
+  if (unlikely(! ok))
+  {
+    jam();
+    tcConnectptr.p->errorCode = ZGET_ATTRINBUF_ERROR;
+  }
 }//Dblqh::execTRANSID_AI()
 
 /*--------------------------------------------------------------------------*/
@@ -10907,13 +11177,15 @@
   tcConnectptr.p->totSendlenAi = readLength;
   tcConnectptr.p->connectState = TcConnectionrec::COPY_CONNECTED;
 
-  // Read primary keys (used to get here via scan keyinfo)
+  /* Read primary keys from TUP into signal buffer space
+   * (used to get here via scan keyinfo)
+   */
   Uint32* tmp = signal->getDataPtrSend()+24;
   Uint32 len= tcConnectptr.p->primKeyLen = readPrimaryKeys(scanP, tcConP, tmp);
   
   tcConP->gci_hi = tmp[len];
   tcConP->gci_lo = 0;
-  // Calculate hash (no need to linearies key)
+  // Calculate hash (no need to linearise key)
   if (g_key_descriptor_pool.getPtr(tableId)->hasCharAttr)
   {
     tcConnectptr.p->hashValue = calculateHash(tableId, tmp);
@@ -10923,10 +11195,21 @@
     tcConnectptr.p->hashValue = md5_hash((Uint64*)tmp, len);
   }
 
-  // Move into databuffer to make packLqhkeyreqLab happy
-  memcpy(tcConP->tupkeyData, tmp, 4*4);
-  if(len > 4)
-    keyinfoLab(tmp+4, tmp + len);
+  // Copy keyinfo into long section for LQHKEYREQ below
+  if (unlikely(!keyinfoLab(tmp, len)))
+  {
+    /* Failed to store keyInfo, fail copy 
+     * This will result in a COPY_FRAGREF being sent to
+     * the starting node, which will cause it to fail
+     */
+    scanptr.p->scanErrorCounter++;
+    tcConP->errorCode= ZGET_DATAREC_ERROR;
+    scanptr.p->scanCompletedStatus= ZTRUE;
+
+    closeCopyLab(signal);
+    return;
+  }
+
   LqhKeyReq::setKeyLen(tcConP->reqinfo, len);
 
 /*---------------------------------------------------------------------------*/
@@ -16571,7 +16854,7 @@
 /*       ACTIVE CREATION TO FALSE. THIS WILL ENSURE THAT THE ABORT IS      */
 /*       COMPLETED.                                                        */
 /*************************************************************************>*/
-      if (saveTupattrbuf(signal, dataPtr, length) == ZOK) {
+      if (saveAttrInfoInSection(dataPtr, length) == ZOK) {
         jam();
         if (tcConnectptr.p->transactionState == 
             TcConnectionrec::WAIT_AI_AFTER_ABORT) {
@@ -17541,6 +17824,9 @@
       tcConnectptr.p->lastAttrinbuf = RNIL;
       tcConnectptr.p->firstTupkeybuf = RNIL;
       tcConnectptr.p->lastTupkeybuf = RNIL;
+      tcConnectptr.p->keyInfoIVal = RNIL;
+      tcConnectptr.p->attrInfoIVal = RNIL;
+      tcConnectptr.p->m_flags= 0;
       tcConnectptr.p->tcTimer = 0;
       tcConnectptr.p->nextTcConnectrec = tcConnectptr.i + 1;
     }//for
@@ -18007,27 +18293,13 @@
 void Dblqh::readAttrinfo(Signal* signal) 
 {
   Uint32 remainingLen = tcConnectptr.p->totSendlenAi;
+  tcConnectptr.p->reclenAiLqhkey = 0;
   if (remainingLen == 0) {
     jam();
-    tcConnectptr.p->reclenAiLqhkey = 0;
     return;
   }//if
-  Uint32 dataLen = remainingLen;
-  if (remainingLen > 5)
-    dataLen = 5;
-  readLogData(signal, dataLen, &tcConnectptr.p->firstAttrinfo[0]);
-  tcConnectptr.p->reclenAiLqhkey = dataLen;
-  remainingLen -= dataLen;
-  while (remainingLen > 0) {
-    jam();
-    dataLen = remainingLen;
-    if (remainingLen > 22)
-      dataLen = 22;
-    seizeAttrinbuf(signal);
-    readLogData(signal, dataLen, &attrinbufptr.p->attrbuf[0]);
-    attrinbufptr.p->attrbuf[ZINBUF_DATA_LEN] = dataLen;
-    remainingLen -= dataLen;
-  }//while
+
+  readLogData(signal, remainingLen, tcConnectptr.p->attrInfoIVal);
 }//Dblqh::readAttrinfo()
 
 /* ------------------------------------------------------------------------- */
@@ -18201,20 +18473,8 @@
 {
   Uint32 remainingLen = tcConnectptr.p->primKeyLen;
   ndbrequire(remainingLen != 0);
-  Uint32 dataLen = remainingLen;
-  if (remainingLen > 4)
-    dataLen = 4;
-  readLogData(signal, dataLen, &tcConnectptr.p->tupkeyData[0]);
-  remainingLen -= dataLen;
-  while (remainingLen > 0) {
-    jam();
-    seizeTupkeybuf(signal);
-    dataLen = remainingLen;
-    if (dataLen > 4)
-      dataLen = 4;
-    readLogData(signal, dataLen, &databufptr.p->data[0]);
-    remainingLen -= dataLen;
-  }//while
+
+  readLogData(signal, remainingLen, tcConnectptr.p->keyInfoIVal);
 }//Dblqh::readKey()
 
 /* ------------------------------------------------------------------------- */
@@ -18222,15 +18482,25 @@
 /*                                                                           */
 /*       SUBROUTINE SHORT NAME = RLD                                         */
 /* --------------------------------------------------------------------------*/
-void Dblqh::readLogData(Signal* signal, Uint32 noOfWords, Uint32* dataPtr) 
+void Dblqh::readLogData(Signal* signal, Uint32 noOfWords, Uint32& sectionIVal) 
 {
-  ndbrequire(noOfWords < 32);
   Uint32 logPos = logPagePtr.p->logPageWord[ZCURR_PAGE_INDEX];
   if ((logPos + noOfWords) >= ZPAGE_SIZE) {
     for (Uint32 i = 0; i < noOfWords; i++)
-      dataPtr[i] = readLogwordExec(signal);
+    {
+      /* Todo : Consider reading > 1 word at a time */
+      Uint32 word= readLogwordExec(signal);
+      bool ok= appendToSection(sectionIVal,
+                               &word,
+                               1);
+      ndbrequire(ok);
+    }
   } else {
-    MEMCOPY_NO_WORDS(dataPtr, &logPagePtr.p->logPageWord[logPos], noOfWords);
+    /* In one bite */
+    bool ok= appendToSection(sectionIVal,
+                             &logPagePtr.p->logPageWord[logPos],
+                             noOfWords);
+    ndbrequire(ok);
     logPagePtr.p->logPageWord[ZCURR_PAGE_INDEX] = logPos + noOfWords;
   }//if
 }//Dblqh::readLogData()
@@ -18927,6 +19197,54 @@
 }//Dblqh::writeLogWord()
 
 /* --------------------------------------------------------------------------
+ * -------   WRITE MULTIPLE WORDS INTO THE LOG, CHECK FOR NEW PAGES   ------- 
+ * 
+ * ------------------------------------------------------------------------- */
+
+void Dblqh::writeLogWords(Signal* signal, const Uint32* data, Uint32 len)
+{
+  Uint32 logPos = logPagePtr.p->logPageWord[ZCURR_PAGE_INDEX];
+  ndbrequire(logPos < ZPAGE_SIZE);
+  Uint32 wordsThisPage= ZPAGE_SIZE - logPos;
+
+  while (len >= wordsThisPage)
+  {
+    /* Fill rest of the log page */
+    MEMCOPY_NO_WORDS(&logPagePtr.p->logPageWord[logPos],
+                     data,
+                     wordsThisPage);
+    logPagePtr.p->logPageWord[ZCURR_PAGE_INDEX] = ZPAGE_SIZE;
+    data+= wordsThisPage;
+    len-= wordsThisPage;
+    
+    /* Mark page completed and get a new one */
+    jam();
+    completedLogPage(signal, ZNORMAL, __LINE__);
+    seizeLogpage(signal);
+    initLogpage(signal);
+    logFilePtr.p->currentLogpage = logPagePtr.i;
+    logFilePtr.p->currentFilepage++;
+    
+    logPos = logPagePtr.p->logPageWord[ZCURR_PAGE_INDEX];
+    ndbrequire(logPos < ZPAGE_SIZE);
+    wordsThisPage= ZPAGE_SIZE - logPos;
+  }
+  
+  if (len > 0)
+  {
+    /* No need to worry about next page */
+    ndbassert( len < wordsThisPage );
+    /* Write partial log page */
+    MEMCOPY_NO_WORDS(&logPagePtr.p->logPageWord[logPos],
+                     data,
+                     len);
+    logPagePtr.p->logPageWord[ZCURR_PAGE_INDEX] = logPos + len;
+  }
+
+  ndbassert( logPagePtr.p->logPageWord[ZCURR_PAGE_INDEX] < ZPAGE_SIZE );
+}
+
+/* --------------------------------------------------------------------------
  * -------         WRITE A NEXT LOG RECORD AND CHANGE TO NEXT MBYTE   ------- 
  *
  *       SUBROUTINE SHORT NAME:  WNL
@@ -19686,11 +20004,11 @@
 	   << endl;
     ndbout << " transid0 = " << hex << tcRec.p->transid[0]
 	   << " transid1 = " << hex << tcRec.p->transid[1]
-	   << " tupkeyData0 = " << tcRec.p->tupkeyData[0]
-	   << " tupkeyData1 = " << tcRec.p->tupkeyData[1]
+	   << " key[0] = " << getKeyInfoWordOrZero(tcRec.p, 0)
+	   << " key[1] = " << getKeyInfoWordOrZero(tcRec.p, 1)
 	   << endl;
-    ndbout << " tupkeyData2 = " << tcRec.p->tupkeyData[2]
-	   << " tupkeyData3 = " << tcRec.p->tupkeyData[3]
+    ndbout << " key[2] = " << getKeyInfoWordOrZero(tcRec.p, 2)
+	   << " key[3] = " << getKeyInfoWordOrZero(tcRec.p, 3)
 	   << " m_nr_delete.m_cnt = " << tcRec.p->m_nr_delete.m_cnt
 	   << endl;
     switch (tcRec.p->transactionState) {
@@ -19894,7 +20212,6 @@
   if (arg == 2352 && signal->getLength() == 2)
   {
     jam();
-    Uint32 i;
     Uint32 opNo = signal->theData[1];
     TcConnectionrecPtr tcRec;
     if (opNo < ttcConnectrecFileSize)
@@ -19903,24 +20220,16 @@
       tcRec.i = opNo;
       ptrCheckGuard(tcRec, ttcConnectrecFileSize, regTcConnectionrec);
 
-      Uint32 keyLen = tcRec.p->primKeyLen;
       BaseString key;
-      for(i = 0; i<keyLen && i < 4; i++)
-      {
-	jam();
-	key.appfmt("0x%x ", tcRec.p->tupkeyData[i]);
-      }
-      
-      if (keyLen > 4)
-      {
-	jam();
-	tcConnectptr = tcRec;
-	sendKeyinfoAcc(signal, 4);
-	for (i = 4; i<keyLen; i++)
-	{
-	  jam();
-	  key.appfmt("0x%x ", signal->theData[i]);
-	}
+      if (tcRec.p->keyInfoIVal != RNIL)
+      {
+        jam();
+        SectionReader keyInfoReader(tcRec.p->keyInfoIVal,
+                                    g_sectionSegmentPool);
+        
+        Uint32 keyWord;
+        while (keyInfoReader.getWord(&keyWord))
+          key.appfmt("0x%x ", keyWord);
       }
       
       char buf[100];
@@ -20097,18 +20406,14 @@
   
   {
     (* traceopout) << "key=[" << hex;
-    Uint32 i;
-    for(i = 0; i<regTcPtr->primKeyLen && i < 4; i++){
-      (* traceopout) << hex << regTcPtr->tupkeyData[i] << " ";
-    }
-    
-    DatabufPtr regDatabufptr;
-    regDatabufptr.i = regTcPtr->firstTupkeybuf;
-    while(i < regTcPtr->primKeyLen)
+    if (regTcPtr->keyInfoIVal != RNIL)
     {
-      ptrCheckGuard(regDatabufptr, cdatabufFileSize, databuf);
-      for(Uint32 j = 0; j<4 && i<regTcPtr->primKeyLen; j++, i++)
-	(* traceopout) << hex << regDatabufptr.p->data[j] << " ";
+      SectionReader keyInfoReader(regTcPtr->keyInfoIVal,
+                                  g_sectionSegmentPool);
+      
+      Uint32 keyWord;
+      while (keyInfoReader.getWord(&keyWord))
+        (* traceopout) << hex << keyWord << " ";
     }
     (* traceopout) << "] ";
   }

=== modified file 'storage/ndb/src/kernel/blocks/dbtc/DbtcMain.cpp'
--- a/storage/ndb/src/kernel/blocks/dbtc/DbtcMain.cpp	2008-07-01 12:35:34 +0000
+++ b/storage/ndb/src/kernel/blocks/dbtc/DbtcMain.cpp	2008-08-07 11:52:50 +0000
@@ -1975,7 +1975,6 @@
                         &signal->theData[KeyInfo::HeaderLength],
                         wordsInSignal))
   {
-    // TODO : Consider error insert
     jam();
     appendToSectionErrorLab(signal);
     return;
@@ -2991,8 +2990,6 @@
     bool ok= appendToSection(regCachePtr->keyInfoSectionI,
                              &TOptionalDataPtr[TkeyIndex],
                              keyInfoInTCKeyReq);
-    
-    // TODO : Consider error insert
     if (!ok)
     {
       jam();
@@ -3007,8 +3004,6 @@
       ok= appendToSection(regCachePtr->attrInfoSectionI,
                           &TOptionalDataPtr[TAIDataIndex],
                           titcLenAiInTckeyreq);
-
-      // TODO : Consider error insert
       if (!ok)
       {
         jam();
@@ -3532,13 +3527,27 @@
     reorg = 2;
   }
 
-  /* Todo : Determine whether to use a long Lqh key req based on the
-   * version of the target Lqh node
+  Uint32 inlineKeyLen= 0;
+  Uint32 inlineAttrLen= 0;
+
+  /* We normally send long LQHKEYREQ unless the
+   * destination cannot handle it or we are 
+   * testing
    */
-  regCachePtr->useLongLqhKeyReq= 0;
+  if (unlikely((version < NDBD_LONG_LQHKEYREQ) ||
+               ERROR_INSERTED(8069)))
+  {
+    /* Short LQHKEYREQ, with some key/attr data inline */
+    regCachePtr->useLongLqhKeyReq= 0;
+    inlineKeyLen= regCachePtr->keylen;
+    inlineAttrLen= regCachePtr->attrlength;
+  }
+  else
+    /* Long LQHKEYREQ, with key/attr data in long sections */
+    regCachePtr->useLongLqhKeyReq= 1;
 
   tslrAttrLen = 0;
-  LqhKeyReq::setAttrLen(tslrAttrLen, regCachePtr->attrlength);
+  LqhKeyReq::setAttrLen(tslrAttrLen, inlineAttrLen);
   /* ---------------------------------------------------------------------- */
   // Bit16 == 0 since StoredProcedures are not yet supported.
   /* ---------------------------------------------------------------------- */
@@ -3551,7 +3560,7 @@
   sig1 = regTcPtr->operation;
   sig2 = regTcPtr->dirtyOp;
   bool dirtyRead = (sig1 == ZREAD && sig2 == ZTRUE);
-  LqhKeyReq::setKeyLen(Tdata10, regCachePtr->keylen);
+  LqhKeyReq::setKeyLen(Tdata10, inlineKeyLen);
   LqhKeyReq::setLastReplicaNo(Tdata10, regTcPtr->lastReplicaNo);
   if (unlikely(version < NDBD_ROWID_VERSION))
   {
@@ -3647,9 +3656,42 @@
     nextPos++;
   }//if
 
+  // Reset trigger count
+  regTcPtr->accumulatingTriggerData.i = RNIL;  
+  regTcPtr->accumulatingTriggerData.p = NULL;  
+  regTcPtr->noFiredTriggers = 0;
+  regTcPtr->triggerExecutionCount = 0;
+
   if (regCachePtr->useLongLqhKeyReq)
   {
-    ndbassert(false); // TODO
+    /* Build long LQHKeyReq using Key + AttrInfo sections */
+    SectionHandle handle(this);
+    SegmentedSectionPtr keyInfoSection;
+    
+    getSection(keyInfoSection, regCachePtr->keyInfoSectionI);
+
+    handle.m_ptr[ LqhKeyReq::KeyInfoSectionNum ]= keyInfoSection;
+    handle.m_cnt= 1;
+
+    if (regCachePtr->attrlength != 0)
+    {
+      SegmentedSectionPtr attrInfoSection;
+
+      ndbassert(regCachePtr->attrInfoSectionI != RNIL);
+      getSection(attrInfoSection, regCachePtr->attrInfoSectionI);
+      
+      handle.m_ptr[ LqhKeyReq::AttrInfoSectionNum ]= attrInfoSection;
+      handle.m_cnt= 2;
+    }
+    
+    sendSignal(TBRef, GSN_LQHKEYREQ, signal, 
+               nextPos + LqhKeyReq::FixedSignalLength, JBB, 
+               &handle);
+
+    /* Long sections were freed as part of sendSignal */
+    ndbassert( handle.m_cnt == 0 );
+    regCachePtr->keyInfoSectionI= RNIL;
+    regCachePtr->attrInfoSectionI= RNIL;
   }
   else
   {
@@ -3685,17 +3727,10 @@
       
       nextPos+= aiInLqhKeyReq;
     }
+
+    sendSignal(TBRef, GSN_LQHKEYREQ, signal, 
+               nextPos + LqhKeyReq::FixedSignalLength, JBB);
   }
-
-  // Reset trigger count
-  regTcPtr->accumulatingTriggerData.i = RNIL;  
-  regTcPtr->accumulatingTriggerData.p = NULL;  
-  regTcPtr->noFiredTriggers = 0;
-  regTcPtr->triggerExecutionCount = 0;
-
-  // TODO : Long LqhKeyReq variant
-  sendSignal(TBRef, GSN_LQHKEYREQ, signal, 
-             nextPos + LqhKeyReq::FixedSignalLength, JBB);
 }//Dbtc::sendlqhkeyreq()
 
 void Dbtc::packLqhkeyreq040Lab(Signal* signal,
@@ -3996,7 +4031,7 @@
   DEBUG("SignalDroppedRep received for GSN " << originalGSN);
 
   // TODO : Add handling for long signal variants as they
-  //        are added here (TCINDXREQ, SCANTABREQ etc.)
+  //        are added here (SCANTABREQ etc.)
 
   switch(originalGSN) {
   case GSN_TCKEYREQ:

=== modified file 'storage/ndb/src/kernel/blocks/dbtup/Dbtup.hpp'
--- a/storage/ndb/src/kernel/blocks/dbtup/Dbtup.hpp	2008-07-26 05:13:40 +0000
+++ b/storage/ndb/src/kernel/blocks/dbtup/Dbtup.hpp	2008-08-07 11:52:50 +0000
@@ -784,15 +784,16 @@
 
   Uint32 m_undo_buffer_space; // In words
   union {
-    Uint32 firstAttrinbufrec; //Used until copyAttrinfo
+    Uint32 firstAttrinbufrec; //Used until copyAttrinfo TODO Remove
   };
+
   Uint32 m_any_value;
   union {
-    Uint32 lastAttrinbufrec; //Used until copyAttrinfo
+    Uint32 lastAttrinbufrec; //Used until copyAttrinfo TODO Remove
     Uint32 nextPool;
   };
-  Uint32 attrinbufLen; //only used during STORED_PROCDEF phase
-  Uint32 storedProcPtr; //only used during STORED_PROCDEF phase
+  Uint32 attrinbufLen; //only used during STORED_PROCDEF phase TODO Remove
+  Uint32 storedProcPtr; //only used during STORED_PROCDEF phase TODO Remove
   
   /*
    * From fragment i-value we can find fragment and table record
@@ -2055,9 +2056,9 @@
 
 //------------------------------------------------------------------
 //------------------------------------------------------------------
-  void sendLogAttrinfo(Signal* signal,
-                       Uint32 TlogSize,
-                       Operationrec * regOperPtr);
+  int sendLogAttrinfo(Signal* signal,
+                      Uint32 TlogSize,
+                      Operationrec * regOperPtr);
 
 //------------------------------------------------------------------
 //------------------------------------------------------------------
@@ -2485,7 +2486,8 @@
 
 //------------------------------------------------------------------
 //------------------------------------------------------------------
-  void copyAttrinfo(Operationrec * regOperPtr, Uint32*  inBuffer);
+  void copyAttrinfo(Operationrec * regOperPtr, Uint32*  inBuffer, 
+                    Uint32 expectedLen, Uint32 attrInfoIVal);
 
 //------------------------------------------------------------------
 //------------------------------------------------------------------

=== modified file 'storage/ndb/src/kernel/blocks/dbtup/DbtupExecQuery.cpp'
--- a/storage/ndb/src/kernel/blocks/dbtup/DbtupExecQuery.cpp	2008-06-05 20:34:20 +0000
+++ b/storage/ndb/src/kernel/blocks/dbtup/DbtupExecQuery.cpp	2008-08-07 11:52:50 +0000
@@ -80,8 +80,37 @@
 }
 
 void Dbtup::copyAttrinfo(Operationrec * regOperPtr,
-                         Uint32* inBuffer)
+                         Uint32* inBuffer,
+                         Uint32 expectedLen,
+                         Uint32 attrInfoIVal)
 {
+  if (attrInfoIVal != RNIL)
+  {
+    /* AttrInfo is in segmented section */
+    ndbassert(regOperPtr->firstAttrinbufrec == RNIL);
+    ndbassert(regOperPtr->lastAttrinbufrec == RNIL);
+
+    // TODO : Add support for Stored procedure attrinfo
+
+    /* Check length */
+    SegmentedSectionPtr sectionPtr;
+    getSection(sectionPtr, attrInfoIVal);
+
+    ndbrequire(sectionPtr.sz == expectedLen);
+    ndbrequire(sectionPtr.sz < ZATTR_BUFFER_SIZE);
+
+    /* Copy attrInfo data into linear buffer */
+    copy(inBuffer, attrInfoIVal);
+    
+    regOperPtr->storedProcedureId= RNIL;
+    regOperPtr->m_any_value= 0;
+    
+    return;
+  }
+
+  /* Todo : Unify with code above when scan processing
+   * goes long
+   */
   AttrbufrecPtr copyAttrBufPtr;
   Uint32 RnoOfAttrBufrec= cnoOfAttrbufrec;
   int RbufLen;
@@ -696,6 +725,17 @@
 
    req_struct.m_row_id.m_page_no = sig1;
    req_struct.m_row_id.m_page_idx = sig2;
+
+   /* Get AttrInfo section if this is a long TUPKEYREQ */
+   Uint32 attrInfoIVal= tupKeyReq->attrInfoIVal;
+
+   /* If we have AttrInfo, check we expected it, and
+    * that we don't have AttrInfo by another means
+    */
+   ndbassert( (attrInfoIVal == RNIL) ||  
+              (tupKeyReq->attrBufLen > 0));
+   ndbassert( (attrInfoIVal == RNIL) || 
+              (regOperPtr->firstAttrinbufrec == RNIL ));
    
    Uint32 Roptype = regOperPtr->op_struct.op_type;
 
@@ -705,7 +745,10 @@
 				       Rstoredid) == ZOK);
    }
 
-   copyAttrinfo(regOperPtr, &cinBuffer[0]);
+   copyAttrinfo(regOperPtr, 
+                &cinBuffer[0], 
+                req_struct.attrinfo_len,
+                attrInfoIVal);
    
    Uint32 localkey = (pageid << MAX_TUPLES_BITS) + pageidx;
    if (Roptype == ZINSERT && localkey == ~ (Uint32) 0)
@@ -2031,7 +2074,7 @@
     req_struct->read_length= RattroutCounter;
     sendReadAttrinfo(signal, req_struct, RattroutCounter, regOperPtr);
     if (RlogSize > 0) {
-      sendLogAttrinfo(signal, RlogSize, regOperPtr);
+      return sendLogAttrinfo(signal, RlogSize, regOperPtr);
     }
     return 0;
   } else {
@@ -2047,25 +2090,41 @@
 /*               TLOG_START              FIRST INDEX TO LOG         */
 /*               TLOG_END                LAST INDEX + 1 TO LOG      */
 /* ---------------------------------------------------------------- */
-void Dbtup::sendLogAttrinfo(Signal* signal,
-                            Uint32 TlogSize,
-                            Operationrec *  const regOperPtr)
+int Dbtup::sendLogAttrinfo(Signal* signal,
+                           Uint32 TlogSize,
+                           Operationrec *  const regOperPtr)
 
 {
-  Uint32 TbufferIndex= 0;
+  /* Copy from Log buffer to segmented section,
+   * then attach to ATTRINFO and execute direct
+   * to LQH
+   */
+  ndbrequire( TlogSize > 0 );
+  Uint32 longSectionIVal= RNIL;
+  bool ok= appendToSection(longSectionIVal, 
+                           &clogMemBuffer[0],
+                           TlogSize);
+  if (unlikely(!ok))
+  {
+    /* Resource error, abort transaction */
+    terrorCode = ZSEIZE_ATTRINBUFREC_ERROR;
+    tupkeyErrorLab(signal);
+    return -1;
+  }
+  
+  /* Send a TUP_ATTRINFO signal to LQH, which contains
+   * the relevant user pointer and the attrinfo section's
+   * IVAL
+   */
   signal->theData[0]= regOperPtr->userpointer;
-  while (TlogSize > 22) {
-    MEMCOPY_NO_WORDS(&signal->theData[3],
-                     &clogMemBuffer[TbufferIndex],
-                     22);
-    EXECUTE_DIRECT(DBLQH, GSN_TUP_ATTRINFO, signal, 25);
-    TbufferIndex += 22;
-    TlogSize -= 22;
-  }
-  MEMCOPY_NO_WORDS(&signal->theData[3],
-                   &clogMemBuffer[TbufferIndex],
-                   TlogSize);
-  EXECUTE_DIRECT(DBLQH, GSN_TUP_ATTRINFO, signal, 3 + TlogSize);
+  signal->theData[1]= TlogSize;
+  signal->theData[2]= longSectionIVal;
+
+  EXECUTE_DIRECT(DBLQH, 
+                 GSN_TUP_ATTRINFO, 
+                 signal, 
+                 3);
+  return 0;
 }
 
 inline

=== modified file 'storage/ndb/src/kernel/vm/LongSignal.hpp'
--- a/storage/ndb/src/kernel/vm/LongSignal.hpp	2008-06-17 20:28:45 +0000
+++ b/storage/ndb/src/kernel/vm/LongSignal.hpp	2008-08-07 11:52:50 +0000
@@ -52,6 +52,7 @@
 void print(SegmentedSectionPtr ptr, FILE* out);
 void copy(SegmentedSectionPtr dst, Uint32 * src, Uint32 len);
 void copy(Uint32 * dst, SegmentedSectionPtr src);
+void copy(Uint32 * dst, Uint32 srcFirstIVal);
 bool import(Ptr<SectionSegment> & first, const Uint32 * src, Uint32 len);
 /* appendToSection : If firstSegmentIVal == RNIL, import */
 bool appendToSection(Uint32& firstSegmentIVal, const Uint32* src, Uint32 len);

=== modified file 'storage/ndb/src/kernel/vm/SectionReader.cpp'
--- a/storage/ndb/src/kernel/vm/SectionReader.cpp	2008-06-17 20:28:45 +0000
+++ b/storage/ndb/src/kernel/vm/SectionReader.cpp	2008-08-07 11:52:50 +0000
@@ -41,6 +41,17 @@
   }
 }
 
+SectionReader::SectionReader
+(Uint32 firstSectionIVal, class SectionSegmentPool& pool)
+  : m_pool(pool)
+{
+  SectionSegment* firstSeg= m_pool.getPtr(firstSectionIVal);
+  
+  m_pos = 0;
+  m_len = firstSeg->m_sz;
+  m_head = m_currentSegment = firstSeg;
+}
+
 void
 SectionReader::reset(){
   m_pos = 0;
@@ -137,7 +148,63 @@
   memcpy(dst, &p->theData[ind], 4 * len);
 
   m_pos += len;
-  m_currentSegment = p;
-  return true;
-}
-
+
+  /* If we read everything in the current
+   * segment, and there's another segment,
+   * move onto it for next time
+   */
+  if (unlikely((len == left) &&
+               (p->m_nextSegment != RNIL)))
+    p= m_pool.getPtr(p->m_nextSegment);
+  else
+    m_currentSegment = p;
+  
+  return true;
+}
+
+bool
+SectionReader::getWordsPtr(Uint32 maxLen,
+                           const Uint32*& readPtr,
+                           Uint32& actualLen)
+{
+  if(m_pos >= m_len)
+    return false;
+
+  /* We return a pointer to the current position,
+   * with length the minimum of
+   *  - significant words remaining in the whole section
+   *  - space remaining in the current segment
+   *  - maxLen from caller
+   */
+  const Uint32 sectionRemain= m_len - m_pos;
+  const Uint32 startInd = (m_pos % SectionSegment::DataLength);
+  const Uint32 segmentSpace = SectionSegment::DataLength - startInd;
+  SectionSegment * p = m_currentSegment;
+
+  const Uint32 remain= MIN(sectionRemain, segmentSpace);
+  actualLen= MIN(remain, maxLen);
+  readPtr= &p->theData[startInd];
+
+  /* If we've read everything in this segment, and
+   * there's another one, move onto it ready for 
+   * next time
+   */
+  if (((startInd + actualLen) == SectionSegment::DataLength) &&
+      (p->m_nextSegment != RNIL))
+    m_currentSegment= m_pool.getPtr(p->m_nextSegment);
+
+  m_pos += actualLen;
+  return true;
+};
+
+bool
+SectionReader::getWordsPtr(const Uint32*& readPtr,
+                           Uint32& actualLen)
+{
+  /* Cannot have more than SectionSegment::DataLength
+   * contiguous words
+   */
+  return getWordsPtr(SectionSegment::DataLength,
+                     readPtr,
+                     actualLen);
+}

=== modified file 'storage/ndb/src/kernel/vm/SectionReader.hpp'
--- a/storage/ndb/src/kernel/vm/SectionReader.hpp	2006-12-23 19:20:40 +0000
+++ b/storage/ndb/src/kernel/vm/SectionReader.hpp	2008-08-07 11:52:50 +0000
@@ -22,15 +22,37 @@
 public:
   SectionReader(struct SegmentedSectionPtr &,
 		class SectionSegmentPool &);
+  SectionReader(Uint32 firstSectionIVal,
+                class SectionSegmentPool &);
 
+  /* reset : Set SectionReader to start of section */
   void reset();
+  /* step : Step over given number of words */
   bool step(Uint32 len);
+  /* getWord : Copy one word to dst + move forward */
   bool getWord(Uint32 * dst);
+  /* peekWord : Copy one word to dst */
   bool peekWord(Uint32 * dst) const ;
+  /* peekWords : Copy len words to dst */
   bool peekWords(Uint32 * dst, Uint32 len) const;
+  /* getSize : Get total size of section */
   Uint32 getSize() const;
+  /* getWords : Copy len words to dst + move forward */
   bool getWords(Uint32 * dst, Uint32 len);
 
+  /* getWordsPtr : Get const ptr to next contiguous
+   *               block of words
+   * In success case will return at least 1 word
+   */
+  bool getWordsPtr(const Uint32*& readPtr,
+                   Uint32& actualLen);
+  /* getWordsPtr : Get const ptr to at most maxLen words
+   * In success case will return at least 1 word
+   */
+  bool getWordsPtr(Uint32 maxLen,
+                   const Uint32*& readPtr,
+                   Uint32& actualLen);
+
 private:
   Uint32 m_pos;
   Uint32 m_len;

=== modified file 'storage/ndb/src/kernel/vm/TransporterCallback.cpp'
--- a/storage/ndb/src/kernel/vm/TransporterCallback.cpp	2008-07-25 05:48:32 +0000
+++ b/storage/ndb/src/kernel/vm/TransporterCallback.cpp	2008-08-07 11:52:50 +0000
@@ -142,6 +142,16 @@
   copy(dst, g_sectionSegmentPool, src);
 }
 
+/* Copy variant which takes an IVal */
+void
+copy(Uint32* dst, Uint32 srcFirstIVal)
+{
+  SegmentedSectionPtr p;
+  getSection(p, srcFirstIVal);
+
+  copy(dst, p);
+} 
+
 /* Calculate number of segments to release based on section size
  * Always release one segment, even if size is zero
  */



