List:Commits« Previous MessageNext Message »
From:Magnus Svensson Date:August 8 2008 9:11am
Subject:bzr commit into mysql-5.1-telco-6.4 branch (msvensson:2700)
View as plain text  
#At file:///home/msvensson/mysql/6.4/

 2700 Magnus Svensson	2008-08-08 [merge]
      Merge
modified:
  mysql-test/suite/ndb/r/ndb_multi_row.result
  mysql-test/suite/ndb/r/ndb_trigger.result
  mysql-test/suite/ndb/t/ndb_alter_table_backup.test
  mysql-test/suite/ndb/t/ndb_trigger.test
  mysql-test/suite/ndb_binlog/r/ndb_binlog_ddl_multi.result
  mysql-test/suite/ndb_binlog/r/ndb_binlog_log_bin.result
  sql/ha_ndbcluster.cc
  sql/ha_ndbcluster_binlog.cc
  storage/ndb/include/kernel/signaldata/LqhKey.hpp
  storage/ndb/include/kernel/signaldata/NextScan.hpp
  storage/ndb/include/kernel/signaldata/SignalDroppedRep.hpp
  storage/ndb/include/kernel/signaldata/TupKey.hpp
  storage/ndb/include/mgmapi/mgmapi.h
  storage/ndb/include/ndb_version.h.in
  storage/ndb/include/ndbapi/NdbIndexScanOperation.hpp
  storage/ndb/include/ndbapi/NdbOperation.hpp
  storage/ndb/include/ndbapi/NdbTransaction.hpp
  storage/ndb/src/common/debugger/signaldata/LqhKey.cpp
  storage/ndb/src/kernel/blocks/dblqh/Dblqh.hpp
  storage/ndb/src/kernel/blocks/dblqh/DblqhInit.cpp
  storage/ndb/src/kernel/blocks/dblqh/DblqhMain.cpp
  storage/ndb/src/kernel/blocks/dbtc/DbtcMain.cpp
  storage/ndb/src/kernel/blocks/dbtup/Dbtup.hpp
  storage/ndb/src/kernel/blocks/dbtup/DbtupExecQuery.cpp
  storage/ndb/src/kernel/vm/LongSignal.hpp
  storage/ndb/src/kernel/vm/SectionReader.cpp
  storage/ndb/src/kernel/vm/SectionReader.hpp
  storage/ndb/src/kernel/vm/TransporterCallback.cpp
  storage/ndb/src/ndbapi/NdbOperationExec.cpp
  storage/ndb/src/ndbapi/NdbScanFilter.cpp
  storage/ndb/src/ndbapi/NdbScanOperation.cpp
  storage/ndb/src/ndbapi/NdbTransaction.cpp
  storage/ndb/test/ndbapi/testDict.cpp
  storage/ndb/test/ndbapi/testScanFilter.cpp
  storage/ndb/test/ndbapi/test_event.cpp
  storage/ndb/test/run-test/daily-basic-tests.txt
  storage/ndb/tools/restore/Restore.cpp

=== modified file 'mysql-test/suite/ndb/r/ndb_multi_row.result'
--- a/mysql-test/suite/ndb/r/ndb_multi_row.result	2007-11-01 14:08:00 +0000
+++ b/mysql-test/suite/ndb/r/ndb_multi_row.result	2008-08-07 05:29:44 +0000
@@ -63,6 +63,6 @@ t4
 drop table t1, t2, t3, t4;
 drop table if exists t1, t3, t4;
 Warnings:
-Error	155	Table 'test.t1' doesn't exist
-Error	155	Table 'test.t3' doesn't exist
-Error	155	Table 'test.t4' doesn't exist
+Note	1051	Unknown table 't1'
+Note	1051	Unknown table 't3'
+Note	1051	Unknown table 't4'

=== modified file 'mysql-test/suite/ndb/r/ndb_trigger.result'
--- a/mysql-test/suite/ndb/r/ndb_trigger.result	2007-07-04 20:38:53 +0000
+++ b/mysql-test/suite/ndb/r/ndb_trigger.result	2008-08-07 05:29:44 +0000
@@ -1,4 +1,7 @@
 drop table if exists t1, t2, t3, t4, t5;
+flush status;
+drop table if exists t1, t2, t3, t4, t5;
+flush status;
 create table t1 (id int primary key, a int not null, b decimal (63,30) default 0) engine=ndb;
 create table t2 (op char(1), a int not null, b decimal (63,30)) engine=ndb;
 create table t3  engine=ndb select 1 as i;
@@ -312,4 +315,29 @@ id	xy
 DROP TRIGGER t1_delete;
 DROP TRIGGER t4_delete;
 DROP TABLE t1, t2, t3, t4, t5;
+create table t1(a int, b varchar(10), c date) engine=ndb;
+CREATE TRIGGER trg1 BEFORE UPDATE ON t1 FOR EACH ROW BEGIN
+SET new.c = date(now());
+End //
+select trigger_name,event_object_table from information_schema.triggers where
+trigger_name='trg1';
+trigger_name	event_object_table
+trg1	t1
+CREATE TRIGGER trg1 BEFORE UPDATE ON t1 FOR EACH ROW BEGIN
+SET new.c = date(now());
+End //
+select trigger_name,event_object_table from information_schema.triggers where
+trigger_name='trg1';
+trigger_name	event_object_table
+trg1	t1
+rename table t1 to t2;
+select trigger_name,event_object_table from information_schema.triggers where
+trigger_name='trg1';
+trigger_name	event_object_table
+trg1	t2
+select trigger_name,event_object_table from information_schema.triggers where
+trigger_name='trg1';
+trigger_name	event_object_table
+trg1	t2
+drop table t2;
 End of 5.1 tests

=== modified file 'mysql-test/suite/ndb/t/ndb_alter_table_backup.test'
--- a/mysql-test/suite/ndb/t/ndb_alter_table_backup.test	2007-11-29 10:29:35 +0000
+++ b/mysql-test/suite/ndb/t/ndb_alter_table_backup.test	2008-08-07 11:52:50 +0000
@@ -19,8 +19,8 @@ DROP TABLE IF EXISTS t1;
 --echo *********************************
 --echo * restore tables w/ new column from little endian
 --echo *********************************
---exec $NDB_TOOLS_DIR/ndb_restore --no-defaults -b 1 -n 1 -m -r $MYSQL_TEST_DIR/std_data/ndb_backup51_d2_le >> $NDB_TOOLS_OUTPUT
---exec $NDB_TOOLS_DIR/ndb_restore --no-defaults -b 1 -n 2 -r $MYSQL_TEST_DIR/std_data/ndb_backup51_d2_le >> $NDB_TOOLS_OUTPUT
+--exec $NDB_TOOLS_DIR/ndb_restore --no-defaults -b 1 -n 1 -m -r $MYSQL_TEST_DIR/std_data/ndb_backup51_d2_le >> $NDB_TOOLS_OUTPUT 2>&1
+--exec $NDB_TOOLS_DIR/ndb_restore --no-defaults -b 1 -n 2 -r $MYSQL_TEST_DIR/std_data/ndb_backup51_d2_le >> $NDB_TOOLS_OUTPUT 2>&1
 SHOW TABLES;
 SHOW CREATE TABLE t1;
 SELECT * FROM t1 WHERE a = 1 or a = 10 or a = 20 or a = 30 ORDER BY a;
@@ -32,8 +32,8 @@ DROP TABLE t1;
 --echo *********************************
 --echo * restore tables w/ new column from big endian
 --echo *********************************
---exec $NDB_TOOLS_DIR/ndb_restore --no-defaults -b 1 -n 1 -m -r $MYSQL_TEST_DIR/std_data/ndb_backup51_d2_be >> $NDB_TOOLS_OUTPUT
---exec $NDB_TOOLS_DIR/ndb_restore --no-defaults -b 1 -n 2 -r $MYSQL_TEST_DIR/std_data/ndb_backup51_d2_be >> $NDB_TOOLS_OUTPUT
+--exec $NDB_TOOLS_DIR/ndb_restore --no-defaults -b 1 -n 1 -m -r $MYSQL_TEST_DIR/std_data/ndb_backup51_d2_be >> $NDB_TOOLS_OUTPUT 2>&1
+--exec $NDB_TOOLS_DIR/ndb_restore --no-defaults -b 1 -n 2 -r $MYSQL_TEST_DIR/std_data/ndb_backup51_d2_be >> $NDB_TOOLS_OUTPUT 2>&1
 SHOW TABLES;
 SHOW CREATE TABLE t1;
 SELECT * FROM t1 WHERE a = 1 or a = 10 or a = 20 or a = 30 ORDER BY a;

=== modified file 'mysql-test/suite/ndb/t/ndb_trigger.test'
--- a/mysql-test/suite/ndb/t/ndb_trigger.test	2007-11-29 10:29:35 +0000
+++ b/mysql-test/suite/ndb/t/ndb_trigger.test	2008-08-07 11:51:34 +0000
@@ -1,5 +1,6 @@
 # Tests which involve triggers and NDB storage engine
---source include/have_ndb.inc
+--source include/have_multi_ndb.inc
+--source include/not_embedded.inc
 
 # 
 # Test for bug#18437 "Wrong values inserted with a before update
@@ -13,7 +14,12 @@
 #
 
 --disable_warnings
+connection server2;
 drop table if exists t1, t2, t3, t4, t5;
+flush status;
+connection server1;
+drop table if exists t1, t2, t3, t4, t5;
+flush status;
 --enable_warnings
 
 create table t1 (id int primary key, a int not null, b decimal (63,30) default 0) engine=ndb;
@@ -217,4 +223,39 @@ DROP TRIGGER t1_delete;
 DROP TRIGGER t4_delete;
 DROP TABLE t1, t2, t3, t4, t5;
 
+# Test for bug#36658
+# Verify that rename table
+# doesn't remove triggers
+
+connection server1;
+create table t1(a int, b varchar(10), c date) engine=ndb;
+delimiter //;
+CREATE TRIGGER trg1 BEFORE UPDATE ON t1 FOR EACH ROW BEGIN
+   SET new.c = date(now());
+End //
+delimiter ;//
+select trigger_name,event_object_table from information_schema.triggers where
+trigger_name='trg1';
+
+connection server2;
+delimiter //;
+CREATE TRIGGER trg1 BEFORE UPDATE ON t1 FOR EACH ROW BEGIN
+   SET new.c = date(now());
+End //
+delimiter ;//
+select trigger_name,event_object_table from information_schema.triggers where
+trigger_name='trg1';
+
+connection server1;
+rename table t1 to t2;
+select trigger_name,event_object_table from information_schema.triggers where
+trigger_name='trg1';
+
+connection server2;
+select trigger_name,event_object_table from information_schema.triggers where
+trigger_name='trg1';
+
+connection server1;
+drop table t2;
+
 --echo End of 5.1 tests

=== modified file 'mysql-test/suite/ndb_binlog/r/ndb_binlog_ddl_multi.result'
--- a/mysql-test/suite/ndb_binlog/r/ndb_binlog_ddl_multi.result	2008-02-25 13:50:20 +0000
+++ b/mysql-test/suite/ndb_binlog/r/ndb_binlog_ddl_multi.result	2008-08-07 05:29:44 +0000
@@ -33,11 +33,11 @@ drop table mysqltest.t1;
 show binlog events from <binlog_start>;
 Log_name	Pos	Event_type	Server_id	End_log_pos	Info
 master-bin.000001	#	Query	102	#	ALTER DATABASE mysqltest CHARACTER SET latin1
-master-bin.000001	#	Query	102	#	use `mysqltest`; drop table `t1`
+master-bin.000001	#	Query	102	#	use `mysqltest`; drop table `mysqltest`.`t1`
 show binlog events from <binlog_start>;
 Log_name	Pos	Event_type	Server_id	End_log_pos	Info
 master-bin.000001	#	Query	102	#	ALTER DATABASE mysqltest CHARACTER SET latin1
-master-bin.000001	#	Query	102	#	use `mysqltest`; drop table `t1`
+master-bin.000001	#	Query	102	#	use `mysqltest`; drop table `mysqltest`.`t1`
 reset master;
 reset master;
 use test;
@@ -161,10 +161,10 @@ Log_name	Pos	Event_type	Server_id	End_lo
 master-bin1.000001	#	Query	1	#	use `test`; create table t1 (a int key) engine=ndb
 master-bin1.000001	#	Query	1	#	use `test`; create table t2 (a int key) engine=ndb
 master-bin1.000001	#	Query	1	#	use `test`; create table t3 (a int key) engine=ndb
-master-bin1.000001	#	Query	1	#	use `test`; rename table `test.t3` to `test.t4`
-master-bin1.000001	#	Query	1	#	use `test`; rename table `test.t2` to `test.t3`
-master-bin1.000001	#	Query	1	#	use `test`; rename table `test.t1` to `test.t2`
-master-bin1.000001	#	Query	1	#	use `test`; rename table `test.t4` to `test.t1`
+master-bin1.000001	#	Query	1	#	use `test`; rename table `test`.`t3` to `test`.`t4`
+master-bin1.000001	#	Query	1	#	use `test`; rename table `test`.`t2` to `test`.`t3`
+master-bin1.000001	#	Query	1	#	use `test`; rename table `test`.`t1` to `test`.`t2`
+master-bin1.000001	#	Query	1	#	use `test`; rename table `test`.`t4` to `test`.`t1`
 drop table t1;
 drop table t2;
 drop table t3;
@@ -188,7 +188,7 @@ master-bin1.000001	#	Table_map	102	#	tab
 master-bin1.000001	#	Write_rows	102	#	table_id: #
 master-bin1.000001	#	Write_rows	102	#	table_id: # flags: STMT_END_F
 master-bin1.000001	#	Query	102	#	COMMIT
-master-bin1.000001	#	Query	1	#	use `test`; rename table `test.t1` to `test.t2`
+master-bin1.000001	#	Query	1	#	use `test`; rename table `test`.`t1` to `test`.`t2`
 master-bin1.000001	#	Query	102	#	BEGIN
 master-bin1.000001	#	Table_map	102	#	table_id: # (test.t2)
 master-bin1.000001	#	Table_map	102	#	table_id: # (mysql.ndb_apply_status)

=== modified file 'mysql-test/suite/ndb_binlog/r/ndb_binlog_log_bin.result'
--- a/mysql-test/suite/ndb_binlog/r/ndb_binlog_log_bin.result	2008-06-19 07:18:42 +0000
+++ b/mysql-test/suite/ndb_binlog/r/ndb_binlog_log_bin.result	2008-08-07 05:29:44 +0000
@@ -53,8 +53,8 @@ use mysqltest;
 insert into t2 values (1,1);
 show binlog events from <binlog_start>;
 Log_name	Pos	Event_type	Server_id	End_log_pos	Info
-master-bin1.000001	#	Query	1	#	use `mysqltest`; drop table `t1`
-master-bin1.000001	#	Query	1	#	use `mysqltest`; drop table `t2`
+master-bin1.000001	#	Query	1	#	use `mysqltest`; drop table `mysqltest`.`t1`
+master-bin1.000001	#	Query	1	#	use `mysqltest`; drop table `mysqltest`.`t2`
 master-bin1.000001	#	Query	1	#	use `mysqltest`; create table t1 (d int key, e int) engine=ndb
 master-bin1.000001	#	Query	1	#	use `mysqltest`; create table t2 (d int key, e int) engine=ndb
 master-bin1.000001	#	Query	102	#	BEGIN

=== modified file 'sql/ha_ndbcluster.cc'
--- a/sql/ha_ndbcluster.cc	2008-07-01 07:53:42 +0000
+++ b/sql/ha_ndbcluster.cc	2008-08-07 13:17:36 +0000
@@ -158,6 +158,7 @@ static uchar *ndbcluster_get_key(NDB_SHA
 static int ndb_get_table_statistics(ha_ndbcluster*, bool, Ndb*,
                                     const NdbRecord *, struct Ndb_statistics *);
 
+THD *injector_thd= 0;
 
 // Util thread variables
 pthread_t ndb_util_thread;
@@ -6955,6 +6956,17 @@ int ha_ndbcluster::rename_table(const ch
 
   DBUG_ENTER("ha_ndbcluster::rename_table");
   DBUG_PRINT("info", ("Renaming %s to %s", from, to));
+
+  if (thd == injector_thd)
+  {
+    /*
+      Table was renamed remotely is already
+      renamed inside ndb.
+      Just rename .ndb file.
+     */
+    DBUG_RETURN(handler::rename_table(from, to));
+  }
+
   set_dbname(from, old_dbname);
   set_dbname(to, new_dbname);
   set_tabname(from);
@@ -7025,7 +7037,7 @@ int ha_ndbcluster::rename_table(const ch
     }
     ERR_RETURN(ndb_error);
   }
-  
+
   // Rename .ndb file
   if ((result= handler::rename_table(from, to)))
   {
@@ -7323,6 +7335,17 @@ int ha_ndbcluster::delete_table(const ch
   int error= 0;
   DBUG_ENTER("ha_ndbcluster::delete_table");
   DBUG_PRINT("enter", ("name: %s", name));
+
+  if (thd == injector_thd)
+  {
+    /*
+      Table was dropped remotely is already
+      dropped inside ndb.
+      Just drop local files.
+     */
+    DBUG_RETURN(handler::delete_table(name));
+  }
+
   set_dbname(name);
   set_tabname(name);
 

=== modified file 'sql/ha_ndbcluster_binlog.cc'
--- a/sql/ha_ndbcluster_binlog.cc	2008-06-25 13:06:41 +0000
+++ b/sql/ha_ndbcluster_binlog.cc	2008-08-07 13:17:36 +0000
@@ -71,7 +71,7 @@ my_bool ndb_binlog_is_ready= FALSE;
   Has one sole purpose, for setting the in_use table member variable
   in get_share(...)
 */
-THD *injector_thd= 0;
+extern THD * injector_thd; // Declared in ha_ndbcluster.cc
 
 /*
   Global reference to ndb injector thd object.
@@ -140,12 +140,6 @@ static void ndb_free_schema_object(NDB_S
                                    bool have_lock);
 
 /*
-  Global variables for holding the ndb_binlog_index table reference
-*/
-static TABLE *ndb_binlog_index= 0;
-static TABLE_LIST binlog_tables;
-
-/*
   Helper functions
 */
 
@@ -308,15 +302,6 @@ static void run_query(THD *thd, char *bu
   thd->transaction.all= save_thd_transaction_all;
   thd->transaction.stmt= save_thd_transaction_stmt;
   thd->net= save_thd_net;
-
-  if (thd == injector_thd)
-  {
-    /*
-      running the query will close all tables, including the ndb_binlog_index
-      used in injector_thd
-    */
-    ndb_binlog_index= 0;
-  }
 }
 
 static void
@@ -1307,8 +1292,9 @@ int ndbcluster_log_schema_op(THD *thd,
       DBUG_RETURN(0);
     /* redo the drop table query as is may contain several tables */
     query= tmp_buf2;
-    query_length= (uint) (strxmov(tmp_buf2, "drop table `",
-                                  table_name, "`", NullS) - tmp_buf2);
+    query_length= (uint) (strxmov(tmp_buf2, "drop table ",
+                                  "`", db, "`", ".",
+                                  "`", table_name, "`", NullS) - tmp_buf2);
     type_str= "drop table";
     break;
   case SOT_RENAME_TABLE_PREPARE:
@@ -1318,9 +1304,11 @@ int ndbcluster_log_schema_op(THD *thd,
   case SOT_RENAME_TABLE:
     /* redo the rename table query as is may contain several tables */
     query= tmp_buf2;
-    query_length= (uint) (strxmov(tmp_buf2, "rename table `",
-                                  db, ".", table_name, "` to `",
-                                  new_db, ".", new_table_name, "`", NullS) - tmp_buf2);
+    query_length= (uint) (strxmov(tmp_buf2, "rename table ",
+                                  "`", db, "`", ".",
+                                  "`", table_name, "` to ",
+                                  "`", new_db, "`", ".",
+                                  "`", new_table_name, "`", NullS) - tmp_buf2);
     type_str= "rename table";
     break;
   case SOT_CREATE_TABLE:
@@ -1863,17 +1851,53 @@ ndb_binlog_thread_handle_schema_event(TH
       if (schema->node_id != node_id)
       {
         int log_query= 0, post_epoch_unlock= 0;
+        char errmsg[MYSQL_ERRMSG_SIZE];
+
         switch (schema_type)
         {
-        case SOT_DROP_TABLE:
-          // fall through
         case SOT_RENAME_TABLE:
           // fall through
         case SOT_RENAME_TABLE_NEW:
-          post_epoch_log_list->push_back(schema, mem_root);
-          /* acknowledge this query _after_ epoch completion */
-          post_epoch_unlock= 1;
-          break;
+        {
+          uint end= snprintf(&errmsg[0], MYSQL_ERRMSG_SIZE,
+                             "NDB Binlog: Skipping renaming locally defined table '%s.%s' from binlog schema event '%s' from node %d. ",
+                             schema->db, schema->name, schema->query,
+                             schema->node_id);
+          
+          errmsg[end]= '\0';
+        }
+        // fall through
+        case SOT_DROP_TABLE:
+          if (schema_type == SOT_DROP_TABLE)
+          {
+            uint end= snprintf(&errmsg[0], MYSQL_ERRMSG_SIZE,
+                               "NDB Binlog: Skipping dropping locally defined table '%s.%s' from binlog schema event '%s' from node %d. ",
+                               schema->db, schema->name, schema->query,
+                               schema->node_id);
+            errmsg[end]= '\0';
+          }
+          if (! ndbcluster_check_if_local_table(schema->db, schema->name))
+          {
+            const int no_print_error[1]=
+              {ER_BAD_TABLE_ERROR}; /* ignore missing table */
+            run_query(thd, schema->query,
+                      schema->query + schema->query_length,
+                      no_print_error, //   /* don't print error */
+                      TRUE); //  /* don't binlog the query */
+
+            /* binlog dropping table after any table operations */
+            post_epoch_log_list->push_back(schema, mem_root);
+            /* acknowledge this query _after_ epoch completion */
+            post_epoch_unlock= 1;
+          }
+          else
+          {
+            /* Tables exists as a local table, leave it */
+            DBUG_PRINT("info", ((const char *) errmsg));
+            sql_print_error((const char *) errmsg);
+            log_query= 1;
+          }
+          // Fall through
 	case SOT_TRUNCATE_TABLE:
         {
           char key[FN_REFLEN];
@@ -1909,6 +1933,8 @@ ndb_binlog_thread_handle_schema_event(TH
             free_share(&share);
           }
         }
+        if (schema_type != SOT_TRUNCATE_TABLE)
+          break;
         // fall through
         case SOT_CREATE_TABLE:
           pthread_mutex_lock(&LOCK_open);
@@ -2531,8 +2557,8 @@ struct ndb_binlog_index_row {
 /*
   Open the ndb_binlog_index table
 */
-static int open_ndb_binlog_index(THD *thd, TABLE_LIST *tables,
-                                 TABLE **ndb_binlog_index)
+static int open_and_lock_ndb_binlog_index(THD *thd, TABLE_LIST *tables,
+                                          TABLE **ndb_binlog_index)
 {
   static char repdb[]= NDB_REP_DB;
   static char reptable[]= NDB_REP_TABLE;
@@ -2544,9 +2570,8 @@ static int open_ndb_binlog_index(THD *th
   tables->lock_type= TL_WRITE;
   thd->proc_info= "Opening " NDB_REP_DB "." NDB_REP_TABLE;
   tables->required_type= FRMTYPE_TABLE;
-  uint counter;
   thd->clear_error();
-  if (open_tables(thd, &tables, &counter, MYSQL_LOCK_IGNORE_FLUSH))
+  if (simple_open_n_lock_tables(thd, tables))
   {
     if (thd->killed)
       sql_print_error("NDB Binlog: Opening ndb_binlog_index: killed");
@@ -2563,32 +2588,6 @@ static int open_ndb_binlog_index(THD *th
   return 0;
 }
 
-
-/*
-  Check if ndb_binlog_index is lockable, if not close, to free for
-  other threads use
-*/
-
-static void
-ndb_check_ndb_binlog_index(THD *thd)
-{
-  if (!ndb_binlog_index)
-    return;
-
-  bool need_reopen;
-  if (lock_tables(thd, &binlog_tables, 1, &need_reopen))
-  {
-    ndb_binlog_index= 0;
-    close_thread_tables(thd);
-    ndb_binlog_index= 0;
-    return;
-  }
-
-  mysql_unlock_tables(thd, thd->lock);
-  thd->lock= 0;
-  return;
-}
-
 /*
   Insert one row in the ndb_binlog_index
 */
@@ -2597,8 +2596,10 @@ static int
 ndb_add_ndb_binlog_index(THD *thd, ndb_binlog_index_row *row)
 {
   int error= 0;
-  bool need_reopen;
   ndb_binlog_index_row *first= row;
+  TABLE *ndb_binlog_index= 0;
+  TABLE_LIST binlog_tables;
+
   /*
     Turn of binlogging to prevent the table changes to be written to
     the binary log.
@@ -2606,28 +2607,12 @@ ndb_add_ndb_binlog_index(THD *thd, ndb_b
   ulong saved_options= thd->options;
   thd->options&= ~(OPTION_BIN_LOG);
 
-  for ( ; ; ) /* loop for need_reopen */
-  {
-    if (!ndb_binlog_index && open_ndb_binlog_index(thd, &binlog_tables, &ndb_binlog_index))
-    {
-      error= -1;
-      goto add_ndb_binlog_index_err;
-    }
 
-    if (lock_tables(thd, &binlog_tables, 1, &need_reopen))
-    {
-      if (need_reopen)
-      {
-        TABLE_LIST *p_binlog_tables= &binlog_tables;
-        close_tables_for_reopen(thd, &p_binlog_tables);
-        ndb_binlog_index= 0;
-        continue;
-      }
-      sql_print_error("NDB Binlog: Unable to lock table ndb_binlog_index");
-      error= -1;
-      goto add_ndb_binlog_index_err;
-    }
-    break;
+  if (open_and_lock_ndb_binlog_index(thd, &binlog_tables, &ndb_binlog_index))
+  {
+    sql_print_error("NDB Binlog: Unable to lock table ndb_binlog_index");
+    error= -1;
+    goto add_ndb_binlog_index_err;
   }
 
   /*
@@ -2676,13 +2661,8 @@ ndb_add_ndb_binlog_index(THD *thd, ndb_b
     }
   } while (row);
 
-  mysql_unlock_tables(thd, thd->lock);
-  thd->lock= 0;
-  thd->options= saved_options;
-  return 0;
 add_ndb_binlog_index_err:
   close_thread_tables(thd);
-  ndb_binlog_index= 0;
   thd->options= saved_options;
   return error;
 }
@@ -5228,15 +5208,6 @@ restart:
          !ndb_binlog_running))
       break; /* Shutting down server */
 
-    if (ndb_binlog_index && ndb_binlog_index->s->version < refresh_version)
-    {
-      if (ndb_binlog_index->s->version < refresh_version)
-      {
-        close_thread_tables(thd);
-        ndb_binlog_index= 0;
-      }
-    }
-
     MEM_ROOT **root_ptr=
       my_pthread_getspecific_ptr(MEM_ROOT**, THR_MALLOC);
     MEM_ROOT *old_root= *root_ptr;
@@ -5294,17 +5265,8 @@ restart:
       }
     }
 
-    /*
-      For each epoch atleast one check should be made to see if ndb_binlog_index
-      table is lockable.  Variable 'do_check_ndb_binlog_index' is used as a flag
-      to signal if a check is needed for current epoch.
-    */
-    int do_check_ndb_binlog_index= 1;
-
     if (res > 0)
     {
-      do_check_ndb_binlog_index= 1;
-
       DBUG_PRINT("info", ("pollEvents res: %d", res));
       thd->proc_info= "Processing events";
       NdbEventOperation *pOp= i_ndb->nextEvent();
@@ -5577,19 +5539,12 @@ restart:
           if (ndb_update_ndb_binlog_index)
           {
             ndb_add_ndb_binlog_index(thd, rows);
-            do_check_ndb_binlog_index= 0;
           }
           ndb_latest_applied_binlog_epoch= gci;
           break;
         }
         ndb_latest_handled_binlog_epoch= gci;
 
-        if (do_check_ndb_binlog_index)
-        {
-          ndb_check_ndb_binlog_index(thd);
-          do_check_ndb_binlog_index= 0;
-        }
-
 #ifdef RUN_NDB_BINLOG_TIMER
         gci_timer.stop();
         sql_print_information("gci %ld event_count %d write time "
@@ -5622,25 +5577,16 @@ restart:
     free_root(&mem_root, MYF(0));
     *root_ptr= old_root;
     ndb_latest_handled_binlog_epoch= ndb_latest_received_binlog_epoch;
-
-    if (do_check_ndb_binlog_index)
-    {
-      ndb_check_ndb_binlog_index(thd);
-      do_check_ndb_binlog_index= 0;
-    }
   }
   if (do_ndbcluster_binlog_close_connection == BCCC_restart)
   {
     ndb_binlog_tables_inited= FALSE;
-    close_thread_tables(thd);
-    ndb_binlog_index= 0;
     goto restart;
   }
 err:
   sql_print_information("Stopping Cluster Binlog");
   DBUG_PRINT("info",("Shutting down cluster binlog thread"));
   thd->proc_info= "Shutting down";
-  close_thread_tables(thd);
   pthread_mutex_lock(&injector_mutex);
   /* don't mess with the injector_ndb anymore from other threads */
   injector_thd= 0;

=== modified file 'storage/ndb/include/kernel/signaldata/LqhKey.hpp'
--- a/storage/ndb/include/kernel/signaldata/LqhKey.hpp	2008-06-17 20:28:45 +0000
+++ b/storage/ndb/include/kernel/signaldata/LqhKey.hpp	2008-08-07 11:52:50 +0000
@@ -40,6 +40,10 @@ public:
   STATIC_CONST( MaxKeyInfo = 4 );
   STATIC_CONST( MaxAttrInfo = 5);
 
+  /* Long LQHKEYREQ definitions */
+  STATIC_CONST( KeyInfoSectionNum = 0 );
+  STATIC_CONST( AttrInfoSectionNum = 1 );
+
 private:
 
   /**
@@ -147,7 +151,8 @@ private:
 /**
  * Request Info
  *
- * k = Key len                - 10 Bits (0-9) max 1023
+ * k = Key len                - (Short LQHKEYREQ only) 
+ *                              10 Bits (0-9) max 1023
  * l = Last Replica No        - 2  Bits -> Max 3 (10-11)
 
  IF version < NDBD_ROWID_VERSION
@@ -158,7 +163,8 @@ private:
  * s = Simple indicator       - 1  Bit (18)
  * o = Operation              - 3  Bits (19-21)
  * r = Sequence replica       - 2  Bits (22-23)
- * a = Attr Info in LQHKEYREQ - 3  Bits (24-26)
+ * a = Attr Info in LQHKEYREQ - (Short LQHKEYREQ only)
+                                3  Bits (24-26)
  * c = Same client and tc     - 1  Bit (27)
  * u = Read Len Return Ind    - 1  Bit (28)
  * m = Commit ack marker      - 1  Bit (29)
@@ -167,10 +173,17 @@ private:
  * g = gci flag               - 1  Bit (12)
  * n = NR copy                - 1  Bit (13)
 
+ * Short LQHKEYREQ :
  *             1111111111222222222233
  *   01234567890123456789012345678901
  *   kkkkkkkkkklltttpdisooorraaacumxz
  *   kkkkkkkkkkllgn pdisooorraaacumxz
+ *
+ * Long LQHKEYREQ :
+ *             1111111111222222222233
+ *   01234567890123456789012345678901
+ *             llgn pdisooorr   cumxz
+ *
  */
 
 #define RI_KEYLEN_SHIFT      (0)
@@ -200,7 +213,8 @@ private:
 /**
  * Scan Info
  *
- * a = Attr Len                 - 16 Bits -> max 65535 (0-15)
+ * a = Attr Len                 - (Short LQHKEYREQ only)
+ *                                 16 Bits -> max 65535 (0-15)
  * p = Stored Procedure Ind     -  1 Bit (16)
  * d = Distribution key         -  8 Bit  -> max 255 (17-24)
  * t = Scan take over indicator -  1 Bit (25)
@@ -208,7 +222,8 @@ private:
  *
  *           1111111111222222222233
  * 01234567890123456789012345678901
- * aaaaaaaaaaaaaaaapddddddddtmm
+ * aaaaaaaaaaaaaaaapddddddddtmm       (Short LQHKEYREQ)
+ *                 pddddddddtmm       (Long LQHKEYREQ)
  */
 
 #define SI_ATTR_LEN_MASK     (65535)

=== modified file 'storage/ndb/include/kernel/signaldata/NextScan.hpp'
--- a/storage/ndb/include/kernel/signaldata/NextScan.hpp	2006-12-23 19:20:40 +0000
+++ b/storage/ndb/include/kernel/signaldata/NextScan.hpp	2008-08-07 11:52:50 +0000
@@ -47,6 +47,7 @@ class NextScanConf {
 public:
   // length is less if no keyinfo or no next result
   STATIC_CONST( SignalLength = 11 );
+  STATIC_CONST( SignalLengthNoKeyInfo = 7 );
 private:
   Uint32 scanPtr;               // scan record in LQH
   Uint32 accOperationPtr;

=== modified file 'storage/ndb/include/kernel/signaldata/SignalDroppedRep.hpp'
--- a/storage/ndb/include/kernel/signaldata/SignalDroppedRep.hpp	2008-06-17 20:28:45 +0000
+++ b/storage/ndb/include/kernel/signaldata/SignalDroppedRep.hpp	2008-08-07 11:52:50 +0000
@@ -21,10 +21,11 @@
 class SignalDroppedRep {
 
   /**
-   * Reciver(s)
+   * Receiver(s)
    */
   friend class SimulatedBlock;
   friend class Dbtc;
+  friend class Dblqh;
 
   /**
    * Sender (TransporterCallback.cpp)

=== modified file 'storage/ndb/include/kernel/signaldata/TupKey.hpp'
--- a/storage/ndb/include/kernel/signaldata/TupKey.hpp	2006-12-23 19:20:40 +0000
+++ b/storage/ndb/include/kernel/signaldata/TupKey.hpp	2008-08-07 11:52:50 +0000
@@ -35,7 +35,7 @@ class TupKeyReq {
   friend bool printTUPKEYREQ(FILE * output, const Uint32 * theData, Uint32 len, Uint16 receiverBlockNo);
 
 public:
-  STATIC_CONST( SignalLength = 18 );
+  STATIC_CONST( SignalLength = 19 );
 
 private:
 
@@ -60,6 +60,7 @@ private:
   Uint32 disk_page;
   Uint32 m_row_id_page_no;
   Uint32 m_row_id_page_idx;
+  Uint32 attrInfoIVal;
 };
 
 class TupKeyConf {

=== modified file 'storage/ndb/include/mgmapi/mgmapi.h'
--- a/storage/ndb/include/mgmapi/mgmapi.h	2008-04-07 10:26:34 +0000
+++ b/storage/ndb/include/mgmapi/mgmapi.h	2008-08-07 04:13:41 +0000
@@ -444,10 +444,6 @@ extern "C" {
   int ndb_mgm_number_of_mgmd_in_connect_string(NdbMgmHandle handle);
 
   int ndb_mgm_set_configuration_nodeid(NdbMgmHandle handle, int nodeid);
-  int ndb_mgm_get_configuration_nodeid(NdbMgmHandle handle);
-  int ndb_mgm_get_connected_port(NdbMgmHandle handle);
-  const char *ndb_mgm_get_connected_host(NdbMgmHandle handle);
-  const char *ndb_mgm_get_connectstring(NdbMgmHandle handle, char *buf, int buf_sz);
 
   /**
    * Set local bindaddress

=== modified file 'storage/ndb/include/ndb_version.h.in'
--- a/storage/ndb/include/ndb_version.h.in	2008-06-18 09:01:43 +0000
+++ b/storage/ndb/include/ndb_version.h.in	2008-08-07 11:52:50 +0000
@@ -99,6 +99,8 @@ Uint32 ndbGetOwnVersion();
 #define NDBD_MICRO_GCP_62 NDB_MAKE_VERSION(6,2,5)
 #define NDBD_MICRO_GCP_63 NDB_MAKE_VERSION(6,3,2)
 #define NDBD_RAW_LCP MAKE_VERSION(6,3,11)
+#define NDBD_LONG_LQHKEYREQ MAKE_VERSION(6,4,0)
+
 
 static
 inline

=== modified file 'storage/ndb/include/ndbapi/NdbIndexScanOperation.hpp'
--- a/storage/ndb/include/ndbapi/NdbIndexScanOperation.hpp	2008-02-19 15:00:29 +0000
+++ b/storage/ndb/include/ndbapi/NdbIndexScanOperation.hpp	2008-08-07 06:24:52 +0000
@@ -309,6 +309,7 @@ int
 NdbIndexScanOperation::setBound(const char* attr, int type, const void* value,
                                 Uint32 len)
 {
+  (void)len;  // unused
   return setBound(attr, type, value);
 }
 
@@ -317,6 +318,7 @@ int
 NdbIndexScanOperation::setBound(Uint32 anAttrId, int type, const void* value,
                                 Uint32 len)
 {
+  (void)len;  // unused
   return setBound(anAttrId, type, value);
 }
 

=== modified file 'storage/ndb/include/ndbapi/NdbOperation.hpp'
--- a/storage/ndb/include/ndbapi/NdbOperation.hpp	2008-06-17 20:28:45 +0000
+++ b/storage/ndb/include/ndbapi/NdbOperation.hpp	2008-08-07 13:17:36 +0000
@@ -1457,6 +1457,9 @@ inline
 int
 NdbOperation::checkMagicNumber(bool b)
 {
+#ifndef NDB_NO_DROPPED_SIGNAL
+  (void)b;  // unused param in this context
+#endif
   if (theMagicNumber != 0xABCDEF01){
 #ifdef NDB_NO_DROPPED_SIGNAL
     if(b) abort();
@@ -1576,8 +1579,10 @@ NdbOperation::NdbCon(NdbTransaction* aNd
 
 inline
 int
-NdbOperation::equal(const char* anAttrName, const char* aValue, Uint32 len)
+NdbOperation::equal(const char* anAttrName, const char* aValue,
+                    Uint32 len)
 {
+  (void)len;   // unused
   return equal(anAttrName, aValue);
 }
 
@@ -1611,8 +1616,10 @@ NdbOperation::equal(const char* anAttrNa
 
 inline
 int
-NdbOperation::equal(Uint32 anAttrId, const char* aValue, Uint32 len)
+NdbOperation::equal(Uint32 anAttrId, const char* aValue,
+                    Uint32 len)
 {
+  (void)len;   // unused
   return equal(anAttrId, aValue);
 }
 
@@ -1646,8 +1653,10 @@ NdbOperation::equal(Uint32 anAttrId, Uin
 
 inline
 int
-NdbOperation::setValue(const char* anAttrName, const char* aValue, Uint32 len)
+NdbOperation::setValue(const char* anAttrName, const char* aValue,
+                       Uint32 len)
 {
+  (void)len;   // unused
   return setValue(anAttrName, aValue);
 }
 
@@ -1695,8 +1704,10 @@ NdbOperation::setValue(const char* anAtt
 
 inline
 int
-NdbOperation::setValue(Uint32 anAttrId, const char* aValue, Uint32 len)
+NdbOperation::setValue(Uint32 anAttrId, const char* aValue,
+                       Uint32 len)
 {
+  (void)len;   // unused
   return setValue(anAttrId, aValue);
 }
 

=== modified file 'storage/ndb/include/ndbapi/NdbTransaction.hpp'
--- a/storage/ndb/include/ndbapi/NdbTransaction.hpp	2008-02-19 15:00:29 +0000
+++ b/storage/ndb/include/ndbapi/NdbTransaction.hpp	2008-08-07 06:24:52 +0000
@@ -1093,6 +1093,7 @@ inline
 void
 NdbTransaction::set_send_size(Uint32 send_size)
 {
+  (void)send_size; //unused
   return;
 }
 

=== modified file 'storage/ndb/src/common/debugger/signaldata/LqhKey.cpp'
--- a/storage/ndb/src/common/debugger/signaldata/LqhKey.cpp	2006-12-23 19:20:40 +0000
+++ b/storage/ndb/src/common/debugger/signaldata/LqhKey.cpp	2008-08-07 11:52:50 +0000
@@ -145,12 +145,16 @@ printLQHKEYREQ(FILE * output, const Uint
       fprintf(output, "H\'%.8x ", sig->variableData[nextPos]);
     fprintf(output, "\n");
   } else {
-    fprintf(output, " InitialReadSize: %d InterpretedSize: %d "
-            "FinalUpdateSize: %d FinalReadSize: %d SubroutineSize: %d\n",
-            sig->variableData[nextPos+0], sig->variableData[nextPos+1], 
-            sig->variableData[nextPos+2], sig->variableData[nextPos+3],
-            sig->variableData[nextPos+4]);
-    nextPos += 5;
+    /* Only have section sizes if it's a short LQHKEYREQ */
+    if (LqhKeyReq::getAIInLqhKeyReq(reqInfo) == LqhKeyReq::MaxAttrInfo)
+    {
+      fprintf(output, " InitialReadSize: %d InterpretedSize: %d "
+              "FinalUpdateSize: %d FinalReadSize: %d SubroutineSize: %d\n",
+              sig->variableData[nextPos+0], sig->variableData[nextPos+1], 
+              sig->variableData[nextPos+2], sig->variableData[nextPos+3],
+              sig->variableData[nextPos+4]);
+      nextPos += 5;
+    }
   }
   return true;
 }

=== modified file 'storage/ndb/src/kernel/blocks/dblqh/Dblqh.hpp'
--- a/storage/ndb/src/kernel/blocks/dblqh/Dblqh.hpp	2008-07-26 05:13:40 +0000
+++ b/storage/ndb/src/kernel/blocks/dblqh/Dblqh.hpp	2008-08-07 11:52:50 +0000
@@ -1920,9 +1920,9 @@ public:
       LOG_CONNECTED = 3
     };
     ConnectState connectState;
-    UintR copyCountWords;    
-    UintR firstAttrinfo[5];
-    UintR tupkeyData[4];
+    UintR copyCountWords;
+    Uint32 keyInfoIVal;
+    Uint32 attrInfoIVal;
     UintR transid[2];
     AbortState abortState;
     UintR accConnectrec;
@@ -1931,15 +1931,15 @@ public:
     UintR tcTimer;
     UintR currReclenAi;
     UintR currTupAiLen;
-    UintR firstAttrinbuf;
-    UintR firstTupkeybuf;
+    UintR firstAttrinbuf;  // TODO : Remove
+    UintR firstTupkeybuf;  // TODO : Remove
     UintR fragmentid;
     UintR fragmentptr;
     UintR gci_hi;
     UintR gci_lo;
     UintR hashValue;
-    UintR lastTupkeybuf;
-    UintR lastAttrinbuf;
+    UintR lastTupkeybuf;   // TODO : Remove
+    UintR lastAttrinbuf;   // TODO : Remove
     /**
      * Each operation (TcConnectrec) can be stored in max one out of many 
      * lists.
@@ -2013,6 +2013,10 @@ public:
     Uint8 m_disk_table;
     Uint8 m_use_rowid;
     Uint8 m_dealloc;
+    enum op_flags {
+      OP_ISLONGREQ         = 0x1,
+      OP_SAVEATTRINFO      = 0x2};
+    Uint32 m_flags;
     Uint32 m_log_part_ptr_i;
     Local_key m_row_id;
 
@@ -2079,6 +2083,7 @@ private:
   void execEXEC_SRREQ(Signal* signal);
   void execEXEC_SRCONF(Signal* signal);
   void execREAD_PSEUDO_REQ(Signal* signal);
+  void execSIGNAL_DROPPED_REP(Signal* signal);
 
   void execDUMP_STATE_ORD(Signal* signal);
   void execACC_ABORTCONF(Signal* signal);
@@ -2324,7 +2329,7 @@ private:
   void readExecSrNewMbyte(Signal* signal);
   void readExecSr(Signal* signal);
   void readKey(Signal* signal);
-  void readLogData(Signal* signal, Uint32 noOfWords, Uint32* dataPtr);
+  void readLogData(Signal* signal, Uint32 noOfWords, Uint32& sectionIVal);
   void readLogHeader(Signal* signal);
   Uint32 readLogword(Signal* signal);
   Uint32 readLogwordExec(Signal* signal);
@@ -2343,6 +2348,7 @@ private:
   void removePageRef(Signal* signal);
   Uint32 returnExecLog(Signal* signal);
   int saveTupattrbuf(Signal* signal, Uint32* dataPtr, Uint32 length);
+  int saveAttrInfoInSection(const Uint32* dataPtr, Uint32 len);
   void seizeAddfragrec(Signal* signal);
   void seizeAttrinbuf(Signal* signal);
   Uint32 seize_attrinbuf();
@@ -2370,6 +2376,7 @@ private:
   void writeKey(Signal* signal);
   void writeLogHeader(Signal* signal);
   void writeLogWord(Signal* signal, Uint32 data);
+  void writeLogWords(Signal* signal, const Uint32* data, Uint32 len);
   void writeNextLog(Signal* signal);
   void errorReport(Signal* signal, int place);
   void warningReport(Signal* signal, int place);
@@ -2465,7 +2472,7 @@ private:
   void nextScanConfScanLab(Signal* signal);
   void nextScanConfCopyLab(Signal* signal);
   void continueScanNextReqLab(Signal* signal);
-  void keyinfoLab(const Uint32 * src, const Uint32 * end);
+  bool keyinfoLab(const Uint32 * src, Uint32 len);
   void copySendTupkeyReqLab(Signal* signal);
   void storedProcConfScanLab(Signal* signal);
   void storedProcConfCopyLab(Signal* signal);
@@ -2576,6 +2583,8 @@ private:
   void exec_acckeyreq(Signal*, Ptr<TcConnectionrec>);
   int compare_key(const TcConnectionrec*, const Uint32 * ptr, Uint32 len);
   void nr_copy_delete_row(Signal*, Ptr<TcConnectionrec>, Local_key*, Uint32);
+  Uint32 getKeyInfoWordOrZero(const TcConnectionrec* regTcPtr, 
+                              Uint32 offset);
 public:
   struct Nr_op_info
   {

=== modified file 'storage/ndb/src/kernel/blocks/dblqh/DblqhInit.cpp'
--- a/storage/ndb/src/kernel/blocks/dblqh/DblqhInit.cpp	2008-07-26 05:13:40 +0000
+++ b/storage/ndb/src/kernel/blocks/dblqh/DblqhInit.cpp	2008-08-07 11:52:50 +0000
@@ -200,6 +200,8 @@ Dblqh::Dblqh(Block_context& ctx, Uint32 
 
   addRecSignal(GSN_ALTER_TAB_REQ, &Dblqh::execALTER_TAB_REQ);
 
+  addRecSignal(GSN_SIGNAL_DROPPED_REP, &Dblqh::execSIGNAL_DROPPED_REP, true);
+
   // Trigger signals, transit to from TUP
   addRecSignal(GSN_CREATE_TRIG_IMPL_REQ, &Dblqh::execCREATE_TRIG_IMPL_REQ);
   addRecSignal(GSN_CREATE_TRIG_IMPL_CONF, &Dblqh::execCREATE_TRIG_IMPL_CONF);

=== modified file 'storage/ndb/src/kernel/blocks/dblqh/DblqhMain.cpp'
--- a/storage/ndb/src/kernel/blocks/dblqh/DblqhMain.cpp	2008-07-23 11:36:53 +0000
+++ b/storage/ndb/src/kernel/blocks/dblqh/DblqhMain.cpp	2008-08-07 11:52:50 +0000
@@ -63,9 +63,12 @@
 #include <signaldata/RestoreImpl.hpp>
 #include <signaldata/KeyInfo.hpp>
 #include <signaldata/AttrInfo.hpp>
+#include <signaldata/TransIdAI.hpp>
 #include <KeyDescriptor.hpp>
 #include <signaldata/RouteOrd.hpp>
 #include <signaldata/FsRef.hpp>
+#include <SectionReader.hpp>
+#include <signaldata/SignalDroppedRep.hpp>
 
 #include "../suma/Suma.hpp"
 
@@ -214,11 +217,6 @@ void Dblqh::execCONTINUEB(Signal* signal
     ndbout << " tcNodeFailrec = " << tcConnectptr.p->tcNodeFailrec;
     ndbout << " activeCreat = " << tcConnectptr.p->activeCreat;
     ndbout << endl;
-    ndbout << "tupkeyData0 = " << tcConnectptr.p->tupkeyData[0];
-    ndbout << "tupkeyData1 = " << tcConnectptr.p->tupkeyData[1];
-    ndbout << "tupkeyData2 = " << tcConnectptr.p->tupkeyData[2];
-    ndbout << "tupkeyData3 = " << tcConnectptr.p->tupkeyData[3];
-    ndbout << endl;
     ndbout << "abortState = " << tcConnectptr.p->abortState;
     ndbout << "listState = " << tcConnectptr.p->listState;
     ndbout << endl;
@@ -3185,25 +3183,45 @@ Uint32 Dblqh::handleLongTupKey(Signal* s
 			       Uint32 len) 
 {
   TcConnectionrec * const regTcPtr = tcConnectptr.p;
-  Uint32 dataPos = 0;
+  TcConnectionrec::TransactionState state = regTcPtr->transactionState;
   Uint32 total = regTcPtr->save1 + len;
   Uint32 primKeyLen = regTcPtr->primKeyLen;
-  while (dataPos < len) {
-    if (cfirstfreeDatabuf == RNIL) {
+  if (state == TcConnectionrec::WAIT_TUPKEYINFO)
+  {
+    /* Different handling for TUPKEYREQ to SCANAI 
+     * TODO : Converge once long scan implemented
+     */
+    bool ok= appendToSection(regTcPtr->keyInfoIVal,
+                             dataPtr,
+                             len);
+    if (unlikely(!ok))
+    {
       jam();
       return ZGET_DATAREC_ERROR;
-    }//if
-    seizeTupkeybuf(signal);
-    Databuf * const regDataPtr = databufptr.p;
-    Uint32 data0 = dataPtr[dataPos];
-    Uint32 data1 = dataPtr[dataPos + 1];
-    Uint32 data2 = dataPtr[dataPos + 2];
-    Uint32 data3 = dataPtr[dataPos + 3];
-    regDataPtr->data[0] = data0;
-    regDataPtr->data[1] = data1;
-    regDataPtr->data[2] = data2;
-    regDataPtr->data[3] = data3;
-    dataPos += 4;
+    }
+  }
+  else
+  {
+    /* Scan_AI KeyInfo */
+    Uint32 dataPos = 0;
+    
+    while (dataPos < len) {
+      if (cfirstfreeDatabuf == RNIL) {
+        jam();
+        return ZGET_DATAREC_ERROR;
+      }//if
+      seizeTupkeybuf(signal);
+      Databuf * const regDataPtr = databufptr.p;
+      Uint32 data0 = dataPtr[dataPos];
+      Uint32 data1 = dataPtr[dataPos + 1];
+      Uint32 data2 = dataPtr[dataPos + 2];
+      Uint32 data3 = dataPtr[dataPos + 3];
+      regDataPtr->data[0] = data0;
+      regDataPtr->data[1] = data1;
+      regDataPtr->data[2] = data2;
+      regDataPtr->data[3] = data3;
+      dataPos += 4;
+    }
   }
 
   regTcPtr->save1 = total;
@@ -3329,23 +3347,42 @@ Dblqh::receive_attrinfo(Signal* signal, 
 void Dblqh::execTUP_ATTRINFO(Signal* signal) 
 {
   TcConnectionrec *regTcConnectionrec = tcConnectionrec;
-  Uint32 length = signal->length() - 3;
   Uint32 tcIndex = signal->theData[0];
   Uint32 ttcConnectrecFileSize = ctcConnectrecFileSize;
   jamEntry();
   tcConnectptr.i = tcIndex;
   ptrCheckGuard(tcConnectptr, ttcConnectrecFileSize, regTcConnectionrec);
-  ndbrequire(tcConnectptr.p->transactionState == TcConnectionrec::WAIT_TUP);
-  if (saveTupattrbuf(signal, &signal->theData[3], length) == ZOK) {
-    return;
-  } else {
-    jam();
-/* ------------------------------------------------------------------------- */
-/* WE ARE WAITING FOR RESPONSE FROM TUP HERE. THUS WE NEED TO                */
-/* GO THROUGH THE STATE MACHINE FOR THE OPERATION.                           */
-/* ------------------------------------------------------------------------- */
-    localAbortStateHandlerLab(signal);
-  }//if
+  TcConnectionrec * const regTcPtr = tcConnectptr.p;
+
+  ndbrequire(regTcPtr->transactionState == TcConnectionrec::WAIT_TUP);
+  
+  /* TUP_ATTRINFO signal is unrelated to ATTRINFO
+   * It just transports a section IVAL from TUP back to 
+   * LQH
+   */
+  ndbrequire(signal->header.theLength == 3);
+  Uint32 tupAttrInfoWords= signal->theData[1];
+  Uint32 tupAttrInfoIVal= signal->theData[2];
+
+  ndbassert(tupAttrInfoWords > 0);
+  ndbassert(tupAttrInfoIVal != RNIL);
+
+  /* If we have stored ATTRINFO that we sent to TUP, 
+   * free it now
+   */
+  if (regTcPtr->attrInfoIVal != RNIL)
+  {
+    /* We should be expecting to receive attrInfo back */
+    ndbassert( !(regTcPtr->m_flags & 
+                 TcConnectionrec::OP_SAVEATTRINFO) );
+    releaseSection( regTcPtr->attrInfoIVal );
+    regTcPtr->attrInfoIVal= RNIL;
+  }
+
+  /* Store reference to ATTRINFO from TUP */
+  regTcPtr->attrInfoIVal= tupAttrInfoIVal;
+  regTcPtr->currTupAiLen= tupAttrInfoWords;
+
 }//Dblqh::execTUP_ATTRINFO()
 
 /* ------------------------------------------------------------------------- */
@@ -3354,26 +3391,19 @@ void Dblqh::execTUP_ATTRINFO(Signal* sig
 /* ------------------------------------------------------------------------- */
 void Dblqh::lqhAttrinfoLab(Signal* signal, Uint32* dataPtr, Uint32 length) 
 {
-  TcConnectionrec * const regTcPtr = tcConnectptr.p;
-  if (regTcPtr->operation != ZREAD) {
-    if (regTcPtr->operation != ZDELETE)
-    {
-      if (regTcPtr->opExec != 1) {
-	if (saveTupattrbuf(signal, dataPtr, length) == ZOK) {
-	  ;
-	} else {
-	  jam();
+  /* Store received AttrInfo in a long section */
+  jam();
+  if (saveAttrInfoInSection(dataPtr, length) == ZOK) {
+    ;
+  } else {
+    jam();
 /* ------------------------------------------------------------------------- */
 /* WE MIGHT BE WAITING FOR RESPONSE FROM SOME BLOCK HERE. THUS WE NEED TO    */
 /* GO THROUGH THE STATE MACHINE FOR THE OPERATION.                           */
 /* ------------------------------------------------------------------------- */
-	  localAbortStateHandlerLab(signal);
-	  return;
-	}//if
-      }//if
-    }//if
-  }
-  c_tup->receive_attrinfo(signal, regTcPtr->tupConnectrec, dataPtr, length);
+    localAbortStateHandlerLab(signal);
+    return;
+  }//if
 }//Dblqh::lqhAttrinfoLab()
 
 /* ------------------------------------------------------------------------- */
@@ -3438,6 +3468,32 @@ int Dblqh::saveTupattrbuf(Signal* signal
   return ZOK;
 }//Dblqh::saveTupattrbuf()
 
+
+/* ------------------------------------------------------------------------- */
+/* -------           SAVE ATTRINFO INTO ATTR SECTION                 ------- */
+/*                                                                           */
+/* ------------------------------------------------------------------------- */
+int Dblqh::saveAttrInfoInSection(const Uint32* dataPtr, Uint32 len) 
+{
+  TcConnectionrec * const regTcPtr = tcConnectptr.p;
+
+  bool ok= appendToSection(regTcPtr->attrInfoIVal,
+                           dataPtr,
+                           len);
+
+  if (unlikely(!ok))
+  {
+    jam();
+    terrorCode = ZGET_ATTRINBUF_ERROR;
+    return ZGET_ATTRINBUF_ERROR;
+  }//if
+
+  regTcPtr->currTupAiLen+= len;
+  
+  return ZOK;
+} // saveAttrInfoInSection
+
+
 /* ==========================================================================
  * =======                       SEIZE ATTRIBUTE IN BUFFER            ======= 
  *
@@ -3467,6 +3523,7 @@ void Dblqh::seizeAttrinbuf(Signal* signa
   attrinbufptr = regAttrinbufptr;
 }//Dblqh::seizeAttrinbuf()
 
+
 /* ==========================================================================
  * =======                        SEIZE TC CONNECT RECORD             ======= 
  * 
@@ -3551,6 +3608,47 @@ bool Dblqh::checkTransporterOverloaded(S
   return !mask.isclear();
 }
 
+void Dblqh::execSIGNAL_DROPPED_REP(Signal* signal)
+{
+  /* An incoming signal was dropped, handle it
+   * Dropped signal really means that we ran out of 
+   * long signal buffering to store its sections
+   */
+  jamEntry();
+  
+  const SignalDroppedRep* rep = (SignalDroppedRep*) &signal->theData[0];
+  Uint32 originalGSN= rep->originalGsn;
+
+  DEBUG("SignalDroppedRep received for GSN " << originalGSN);
+
+  switch(originalGSN) {
+  case GSN_LQHKEYREQ:
+  {
+    jam(); 
+    /* Get original signal data - unfortunately it may
+     * have been truncated.  We must not read beyond
+     * word # 22
+     * We will notify the client that their LQHKEYREQ
+     * failed
+     */
+    const LqhKeyReq * const truncatedLqhKeyReq = 
+      (LqhKeyReq *) &rep->originalData[0];
+    
+    noFreeRecordLab(signal, truncatedLqhKeyReq, ZGET_DATAREC_ERROR);
+
+    break;
+  }
+  default:
+    jam();
+    /* Don't expect dropped signals for other GSNs,
+     * default handling
+     */
+    SimulatedBlock::execSIGNAL_DROPPED_REP(signal);
+  };
+  
+  return;
+}
+
 /* ------------------------------------------------------------------------- */
 /* -------                TAKE CARE OF LQHKEYREQ                     ------- */
 /* LQHKEYREQ IS THE SIGNAL THAT STARTS ALL OPERATIONS IN THE LQH BLOCK       */
@@ -3563,6 +3661,7 @@ void Dblqh::execLQHKEYREQ(Signal* signal
   Uint8 tfragDistKey;
 
   const LqhKeyReq * const lqhKeyReq = (LqhKeyReq *)signal->getDataPtr();
+  SectionHandle handle(this, signal);
 
   {
     const NodeBitmask& all = globalTransporterRegistry.get_status_overloaded();
@@ -3570,6 +3669,7 @@ void Dblqh::execLQHKEYREQ(Signal* signal
         checkTransporterOverloaded(signal, all, lqhKeyReq) ||
         ERROR_INSERTED_CLEAR(5047)) {
       jam();
+      releaseSections(handle);
       noFreeRecordLab(signal, lqhKeyReq, ZTRANSPORTER_OVERLOADED_ERROR);
       return;
     }
@@ -3586,6 +3686,7 @@ void Dblqh::execLQHKEYREQ(Signal* signal
     if (ERROR_INSERTED(5031)) {
       CLEAR_ERROR_INSERT_VALUE;
     }
+    releaseSections(handle);
     noFreeRecordLab(signal, lqhKeyReq, ZNO_TC_CONNECT_ERROR);
     return;
   }//if
@@ -3593,6 +3694,7 @@ void Dblqh::execLQHKEYREQ(Signal* signal
   if(ERROR_INSERTED(5038) && 
      refToNode(signal->getSendersBlockRef()) != getOwnNodeId()){
     jam();
+    releaseSections(handle);
     SET_ERROR_INSERT_VALUE(5039);
     return;
   }
@@ -3604,8 +3706,15 @@ void Dblqh::execLQHKEYREQ(Signal* signal
   regTcPtr->clientConnectrec = sig0;
   regTcPtr->tcOprec = sig0;
   regTcPtr->storedProcId = ZNIL;
+  regTcPtr->m_flags= 0;
+  bool isLongReq= false;
+  if (handle.m_cnt > 0)
+  {
+    isLongReq= true;
+    regTcPtr->m_flags|= TcConnectionrec::OP_ISLONGREQ;
+  }
 
-  UintR TtotReclenAi = lqhKeyReq->attrLen;
+  UintR attrLenFlags = lqhKeyReq->attrLen;
   sig1 = lqhKeyReq->savePointId;
   sig2 = lqhKeyReq->hashValue;
   UintR Treqinfo = lqhKeyReq->requestInfo;
@@ -3620,16 +3729,16 @@ void Dblqh::execLQHKEYREQ(Signal* signal
 
   const Uint8 op = LqhKeyReq::getOperation(Treqinfo);
   if ((op == ZREAD || op == ZREAD_EX) && !getAllowRead()){
+    releaseSections(handle);
     noFreeRecordLab(signal, lqhKeyReq, ZNODE_SHUTDOWN_IN_PROGESS);
     return;
   }
   
   Uint32 senderVersion = getNodeInfo(refToNode(senderRef)).m_version;
 
-  regTcPtr->totReclenAi = LqhKeyReq::getAttrLen(TtotReclenAi);
   regTcPtr->tcScanInfo  = lqhKeyReq->scanInfo;
-  regTcPtr->indTakeOver = LqhKeyReq::getScanTakeOverFlag(TtotReclenAi);
-  regTcPtr->m_reorg     = LqhKeyReq::getReorgFlag(TtotReclenAi);
+  regTcPtr->indTakeOver = LqhKeyReq::getScanTakeOverFlag(attrLenFlags);
+  regTcPtr->m_reorg     = LqhKeyReq::getReorgFlag(attrLenFlags);
 
   regTcPtr->readlenAi = 0;
   regTcPtr->currTupAiLen = 0;
@@ -3674,6 +3783,7 @@ void Dblqh::execLQHKEYREQ(Signal* signal
       if (markerPtr.i == RNIL)
       {
         noFreeRecordLab(signal, lqhKeyReq, ZNO_FREE_MARKER_RECORDS_ERROR);
+        releaseSections(handle);
         return;
       }
       markerPtr.p->transid1 = sig1;
@@ -3698,7 +3808,6 @@ void Dblqh::execLQHKEYREQ(Signal* signal
   regTcPtr->opExec        = LqhKeyReq::getInterpretedFlag(Treqinfo);
   regTcPtr->opSimple      = LqhKeyReq::getSimpleFlag(Treqinfo);
   regTcPtr->seqNoReplica  = LqhKeyReq::getSeqNoReplica(Treqinfo);
-  UintR TreclenAiLqhkey   = LqhKeyReq::getAIInLqhKeyReq(Treqinfo);
   regTcPtr->apiVersionNo  = 0; 
   regTcPtr->m_use_rowid   = LqhKeyReq::getRowidFlag(Treqinfo);
   regTcPtr->m_dealloc     = 0;
@@ -3722,11 +3831,7 @@ void Dblqh::execLQHKEYREQ(Signal* signal
   CRASH_INSERTION2(5041, (op == ZREAD && 
                           (regTcPtr->opSimple || regTcPtr->dirtyOp) &&
                           refToNode(signal->senderBlockRef()) != cownNodeid));
-  
-  regTcPtr->reclenAiLqhkey = TreclenAiLqhkey;
-  regTcPtr->currReclenAi = TreclenAiLqhkey;
-  UintR TitcKeyLen = LqhKeyReq::getKeyLen(Treqinfo);
-  regTcPtr->primKeyLen = TitcKeyLen;
+
   regTcPtr->noFiredTriggers = lqhKeyReq->noFiredTriggers;
 
   UintR TapplAddressInd = LqhKeyReq::getApplicationAddressFlag(Treqinfo);
@@ -3743,7 +3848,7 @@ void Dblqh::execLQHKEYREQ(Signal* signal
     regTcPtr->nodeAfterNext[1] = lqhKeyReq->variableData[nextPos] >> 16;
     nextPos++;
   }//if
-  UintR TstoredProcIndicator = LqhKeyReq::getStoredProcFlag(TtotReclenAi);
+  UintR TstoredProcIndicator = LqhKeyReq::getStoredProcFlag(attrLenFlags);
   if (TstoredProcIndicator == 1) {
     regTcPtr->storedProcId = lqhKeyReq->variableData[nextPos] & ZNIL;
     nextPos++;
@@ -3753,29 +3858,82 @@ void Dblqh::execLQHKEYREQ(Signal* signal
     regTcPtr->readlenAi = lqhKeyReq->variableData[nextPos] & ZNIL;
     nextPos++;
   }//if
-  sig0 = lqhKeyReq->variableData[nextPos + 0];
-  sig1 = lqhKeyReq->variableData[nextPos + 1];
-  sig2 = lqhKeyReq->variableData[nextPos + 2];
-  sig3 = lqhKeyReq->variableData[nextPos + 3];
 
-  regTcPtr->tupkeyData[0] = sig0;
-  regTcPtr->tupkeyData[1] = sig1;
-  regTcPtr->tupkeyData[2] = sig2;
-  regTcPtr->tupkeyData[3] = sig3;
-
-  if (TitcKeyLen > 0) {
-    if (TitcKeyLen < 4) {
-      nextPos += TitcKeyLen;
-    } else {
-      nextPos += 4;
-    }//if
-  } 
-  else if (! (LqhKeyReq::getNrCopyFlag(Treqinfo)))
+  UintR TitcKeyLen = 0;
+  Uint32 keyLenWithLQHReq = 0;
+  UintR TreclenAiLqhkey   = 0;
+
+  if (isLongReq)
+  {
+    /* Long LQHKEYREQ indicates Key and AttrInfo presence and
+     * size via section lengths
+     */
+    SegmentedSectionPtr keyInfoSection, attrInfoSection;
+    
+    handle.getSection(keyInfoSection,
+                      LqhKeyReq::KeyInfoSectionNum);
+
+    ndbassert(keyInfoSection.i != RNIL);
+
+    regTcPtr->keyInfoIVal= keyInfoSection.i;
+    TitcKeyLen= keyInfoSection.sz;
+    keyLenWithLQHReq= TitcKeyLen;
+
+    Uint32 totalAttrInfoLen= 0;
+    if (handle.getSection(attrInfoSection,
+                          LqhKeyReq::AttrInfoSectionNum))
+    {
+      regTcPtr->attrInfoIVal= attrInfoSection.i;
+      totalAttrInfoLen= attrInfoSection.sz;
+    }
+
+    regTcPtr->reclenAiLqhkey = 0;
+    regTcPtr->currReclenAi = totalAttrInfoLen;
+    regTcPtr->totReclenAi = totalAttrInfoLen;
+
+    /* Detach sections from the handle, we are now responsible
+     * for freeing them when appropriate
+     */
+    handle.clear();
+  }
+  else
+  {
+    /* Short LQHKEYREQ, Key and Attr sizes are in
+     * signal, along with some data
+     */
+    TreclenAiLqhkey= LqhKeyReq::getAIInLqhKeyReq(Treqinfo);
+    regTcPtr->reclenAiLqhkey = TreclenAiLqhkey;
+    regTcPtr->currReclenAi = TreclenAiLqhkey;
+    TitcKeyLen = LqhKeyReq::getKeyLen(Treqinfo);
+    regTcPtr->totReclenAi = LqhKeyReq::getAttrLen(attrLenFlags);
+
+    /* Note key can be length zero for NR when Rowid used */
+    keyLenWithLQHReq= MIN(TitcKeyLen, LqhKeyReq::MaxKeyInfo);
+
+    bool ok= appendToSection(regTcPtr->keyInfoIVal,
+                             &lqhKeyReq->variableData[ nextPos ],
+                             keyLenWithLQHReq);
+    if (unlikely(!ok))
+    {
+      jam();
+      terrorCode= ZGET_DATAREC_ERROR;
+      abortErrorLab(signal);
+      return;
+    }
+
+    nextPos+= keyLenWithLQHReq;
+  }
+  
+  regTcPtr->primKeyLen = TitcKeyLen;
+
+  /* Only node restart copy allowed to send no KeyInfo */
+  if (( keyLenWithLQHReq == 0 ) &&
+      (! (LqhKeyReq::getNrCopyFlag(Treqinfo))))
   {
     LQHKEY_error(signal, 3);
     return;
   }//if
-  
+
   sig0 = lqhKeyReq->variableData[nextPos + 0];
   sig1 = lqhKeyReq->variableData[nextPos + 1];
   regTcPtr->m_row_id.m_page_no = sig0;
@@ -3892,7 +4050,7 @@ void Dblqh::execLQHKEYREQ(Signal* signal
   regTcPtr->replicaType = TcopyType;
   regTcPtr->fragmentptr = fragptr.i;
   regTcPtr->m_log_part_ptr_i = logPart;
-  Uint8 TdistKey = LqhKeyReq::getDistributionKey(TtotReclenAi);
+  Uint8 TdistKey = LqhKeyReq::getDistributionKey(attrLenFlags);
   if ((tfragDistKey != TdistKey) &&
       (regTcPtr->seqNoReplica == 0) &&
       (regTcPtr->dirtyOp == ZFALSE)) 
@@ -3909,81 +4067,122 @@ void Dblqh::execLQHKEYREQ(Signal* signal
     tmp = (tmp < 0 ? - tmp : tmp);
     if ((tmp <= 1) || (tfragDistKey == 0)) {
       LQHKEY_abort(signal, 0);
-      return;
     }//if
     LQHKEY_error(signal, 1);
+    return;
   }//if
-  if (TreclenAiLqhkey != 0) {
-    if (regTcPtr->operation != ZREAD) {
-      if (regTcPtr->operation != ZDELETE) {
-        if (regTcPtr->opExec != 1) {
-          jam();
-/*---------------------------------------------------------------------------*/
-/*                                                                           */
-/* UPDATES, WRITES AND INSERTS THAT ARE NOT INTERPRETED WILL USE THE         */
-/* SAME ATTRINFO IN ALL REPLICAS. THUS WE SAVE THE ATTRINFO ALREADY          */
-/* TO SAVE A SIGNAL FROM TUP TO LQH. INTERPRETED EXECUTION IN TUP            */
-/* WILL CREATE NEW ATTRINFO FOR THE OTHER REPLICAS AND IT IS THUS NOT        */
-/* A GOOD IDEA TO SAVE THE INFORMATION HERE. READS WILL ALSO BE              */
-/* UNNECESSARY TO SAVE SINCE THAT ATTRINFO WILL NEVER BE SENT TO ANY         */
-/* MORE REPLICAS.                                                            */
-/*---------------------------------------------------------------------------*/
-/* READS AND DELETES CAN ONLY HAVE INFORMATION ABOUT WHAT IS TO BE READ.     */
-/* NO INFORMATION THAT NEEDS LOGGING.                                        */
-/*---------------------------------------------------------------------------*/
-          sig0 = lqhKeyReq->variableData[nextPos + 0];
-          sig1 = lqhKeyReq->variableData[nextPos + 1];
-          sig2 = lqhKeyReq->variableData[nextPos + 2];
-          sig3 = lqhKeyReq->variableData[nextPos + 3];
-          sig4 = lqhKeyReq->variableData[nextPos + 4];
-
-          regTcPtr->firstAttrinfo[0] = sig0;
-          regTcPtr->firstAttrinfo[1] = sig1;
-          regTcPtr->firstAttrinfo[2] = sig2;
-          regTcPtr->firstAttrinfo[3] = sig3;
-          regTcPtr->firstAttrinfo[4] = sig4;
-          regTcPtr->currTupAiLen = TreclenAiLqhkey;
-        } else {
-          jam();
-          regTcPtr->reclenAiLqhkey = 0;
-        }//if
-      } else {
+
+  /*
+   * Interpreted updates and deletes may require different AttrInfo in 
+   * different replicas, as only the primary executes the interpreted 
+   * program, and the effect of the program rather than the program
+   * should be logged.
+   * Non interpreted inserts, updates, writes and deletes use the same
+   * AttrInfo in all replicas.
+   * All reads only run on one replica, and are not logged.
+   * The AttrInfo section is passed to TUP attached to the TUPKEYREQ
+   * signal below.
+   *
+   * Normal processing : 
+   *   - LQH passes ATTRINFO section to TUP attached to direct TUPKEYREQ 
+   *     signal
+   *   - TUP processes request and sends direct TUPKEYCONF back to LQH
+   *   - LQH continues processing (logging, forwarding LQHKEYREQ to other
+   *     replicas as necessary)
+   *   - LQH frees ATTRINFO section
+   *   Note that TUP is not responsible for freeing the passed ATTRINFO
+   *   section, LQH is.
+   *
+   * Interpreted Update / Delete processing 
+   *   - LQH passes ATTRINFO section to TUP attached to direct TUPKEYREQ 
+   *     signal
+   *   - TUP processes request, generating new ATTRINFO data
+   *   - If new AttrInfo data is > 0 words, TUP sends it back to LQH as
+   *     a long section attached to a single ATTRINFO signal.
+   *     - LQH frees the original AttrInfo section and stores a ref to 
+   *       the new section
+   *   - TUP sends direct TUPKEYCONF back to LQH with new ATTRINFO length
+   *   - If the new ATTRINFO is > 0 words, 
+   *       - LQH continues processing with it (logging, forwarding 
+   *         LQHKEYREQ to other replicas as necessary)
+   *       - LQH frees the new ATTRINFO section
+   *   - If the new ATTRINFO is 0 words, LQH frees the original ATTRINFO
+   *     section and continues processing (logging, forwarding LQHKEYREQ
+   *     to other replicas as necessary)
+   *
+   */
+  bool attrInfoToPropagate= 
+    (regTcPtr->totReclenAi != 0) &&
+    (regTcPtr->operation != ZREAD) &&
+    (regTcPtr->operation != ZDELETE);
+  bool tupCanChangePropagatedAttrInfo= (regTcPtr->opExec == 1);
+  
+  bool saveAttrInfo= 
+    attrInfoToPropagate &&
+    (! tupCanChangePropagatedAttrInfo);
+  
+  if (saveAttrInfo)
+    regTcPtr->m_flags|= TcConnectionrec::OP_SAVEATTRINFO;
+  
+  
+  /* Handle any AttrInfo we received with the LQHKEYREQ */
+  if (regTcPtr->currReclenAi != 0)
+  {
+    jam();
+    if (isLongReq)
+    {
+      /* Long LQHKEYREQ */
+      jam();
+      
+      regTcPtr->currTupAiLen= saveAttrInfo ?
+        regTcPtr->totReclenAi :
+        0;
+    }
+    else
+    {
+      /* Short LQHKEYREQ */
+      jam();
+
+      /* Lets put the AttrInfo into a segmented section */
+      bool ok= appendToSection(regTcPtr->attrInfoIVal,
+                               lqhKeyReq->variableData + nextPos,
+                               TreclenAiLqhkey);
+      if (unlikely(!ok))
+      {
         jam();
-        regTcPtr->reclenAiLqhkey = 0;
-      }//if
-    }//if
-    sig0 = lqhKeyReq->variableData[nextPos + 0];
-    sig1 = lqhKeyReq->variableData[nextPos + 1];
-    sig2 = lqhKeyReq->variableData[nextPos + 2];
-    sig3 = lqhKeyReq->variableData[nextPos + 3];
-    sig4 = lqhKeyReq->variableData[nextPos + 4];
-    
-    c_tup->receive_attrinfo(signal, regTcPtr->tupConnectrec, 
-			    lqhKeyReq->variableData+nextPos, TreclenAiLqhkey);
-    
-    if (signal->theData[0] == (UintR)-1) {
-      LQHKEY_abort(signal, 2);
-      return;
-    }//if
+        terrorCode= ZGET_DATAREC_ERROR;
+        abortErrorLab(signal);
+        return;
+      }
+        
+      regTcPtr->currTupAiLen= TreclenAiLqhkey;
+    }
   }//if
-/* ------- TAKE CARE OF PRIM KEY DATA ------- */
-  if (regTcPtr->primKeyLen <= 4) {
+
+  /* If we've received all KeyInfo, proceed with processing,
+   * otherwise wait for discrete KeyInfo signals
+   */
+  if (regTcPtr->primKeyLen == keyLenWithLQHReq) {
     endgettupkeyLab(signal);
     return;
   } else {
     jam();
-/*--------------------------------------------------------------------*/
-/*       KEY LENGTH WAS MORE THAN 4 WORDS (WORD = 4 BYTE). THUS WE    */
-/*       HAVE TO ALLOCATE A DATA BUFFER TO STORE THE KEY DATA AND     */
-/*       WAIT FOR THE KEYINFO SIGNAL.                                 */
-/*--------------------------------------------------------------------*/
-    regTcPtr->save1 = 4;
+    ndbassert(!isLongReq);
+    /* Wait for remaining KeyInfo */
+    regTcPtr->save1 = keyLenWithLQHReq;
     regTcPtr->transactionState = TcConnectionrec::WAIT_TUPKEYINFO;
     return;
   }//if
   return;
 }//Dblqh::execLQHKEYREQ()
 
+
+
+/**
+ * endgettupkeyLab
+ * Invoked when all KeyInfo and/or all AttrInfo has been 
+ * received
+ */
 void Dblqh::endgettupkeyLab(Signal* signal) 
 {
   TcConnectionrec * const regTcPtr = tcConnectptr.p;
@@ -3991,7 +4190,10 @@ void Dblqh::endgettupkeyLab(Signal* sign
     ;
   } else {
     jam();
+    /* Wait for discrete AttrInfo signals */
     ndbrequire(regTcPtr->currReclenAi < regTcPtr->totReclenAi);
+    ndbassert( !(regTcPtr->m_flags & 
+                 TcConnectionrec::OP_ISLONGREQ) );
     regTcPtr->transactionState = TcConnectionrec::WAIT_ATTR;
     return;
   }//if
@@ -4119,7 +4321,7 @@ void Dblqh::prepareContinueAfterBlockedL
       TRACENR(" NrCopy");
     if (LqhKeyReq::getRowidFlag(regTcPtr->reqinfo))
       TRACENR(" rowid: " << regTcPtr->m_row_id);
-    TRACENR(" key: " << regTcPtr->tupkeyData[0]);
+    TRACENR(" key: " << getKeyInfoWordOrZero(regTcPtr, 0));
   }
   
   if (likely(activeCreat == Fragrecord::AC_NORMAL))
@@ -4180,21 +4382,13 @@ Dblqh::exec_acckeyreq(Signal* signal, Tc
   signal->theData[5] = sig4;
 
   sig0 = regTcPtr.p->transid[1];
-  sig1 = regTcPtr.p->tupkeyData[0];
-  sig2 = regTcPtr.p->tupkeyData[1];
-  sig3 = regTcPtr.p->tupkeyData[2];
-  sig4 = regTcPtr.p->tupkeyData[3];
   signal->theData[6] = sig0;
-  signal->theData[7] = sig1;
-  signal->theData[8] = sig2;
-  signal->theData[9] = sig3;
-  signal->theData[10] = sig4;
+
+  /* Copy KeyInfo to end of ACCKEYREQ signal, starting at offset 7 */
+  sendKeyinfoAcc(signal, 7);
 
   TRACE_OP(regTcPtr.p, "ACC");
   
-  if (regTcPtr.p->primKeyLen > 4) {
-    sendKeyinfoAcc(signal, 11);
-  }//if
   EXECUTE_DIRECT(refToBlock(regTcPtr.p->tcAccBlockref), GSN_ACCKEYREQ, 
 		 signal, 7 + regTcPtr.p->primKeyLen);
   if (signal->theData[0] < RNIL) {
@@ -4266,7 +4460,7 @@ Dblqh::handle_nr_copy(Signal* signal, Pt
        * Case 4
        *   Perform delete using rowid
        *     primKeyLen == 0
-       *     tupkeyData[0] == rowid
+       *     key[0] == rowid
        */
       jam();
       ndbassert(regTcPtr.p->primKeyLen == 0);
@@ -4380,6 +4574,10 @@ update_gci_ignore:
   packLqhkeyreqLab(signal);
 }
 
+/**
+ * Compare received key data with the data supplied
+ * returning 0 if they are the same, 1 otherwise
+ */
 int
 Dblqh::compare_key(const TcConnectionrec* regTcPtr, 
 		   const Uint32 * ptr, Uint32 len)
@@ -4387,31 +4585,29 @@ Dblqh::compare_key(const TcConnectionrec
   if (regTcPtr->primKeyLen != len)
     return 1;
   
-  if (len <= 4)
-    return memcmp(ptr, regTcPtr->tupkeyData, 4*len);
-  
-  if (memcmp(ptr, regTcPtr->tupkeyData, sizeof(regTcPtr->tupkeyData)))
-    return 1;
+  ndbassert( regTcPtr->keyInfoIVal != RNIL );
+
+  SectionReader keyInfoReader(regTcPtr->keyInfoIVal,
+                              getSectionSegmentPool());
   
-  len -= (sizeof(regTcPtr->tupkeyData) >> 2);
-  ptr += (sizeof(regTcPtr->tupkeyData) >> 2);
+  ndbassert(regTcPtr->primKeyLen == keyInfoReader.getSize());
 
-  DatabufPtr regDatabufptr;
-  regDatabufptr.i = tcConnectptr.p->firstTupkeybuf;
-  ptrCheckGuard(regDatabufptr, cdatabufFileSize, databuf);
-  while(len > 4)
+  while (len != 0)
   {
-    if (memcmp(ptr, regDatabufptr.p, 4*4))
-      return 1;
+    const Uint32* keyChunk= NULL;
+    Uint32 chunkSize= 0;
 
-    ptr += 4;
-    len -= 4;
-    regDatabufptr.i = regDatabufptr.p->nextDatabuf;
-    ptrCheckGuard(regDatabufptr, cdatabufFileSize, databuf);    
-  }
+    /* Get a ptr to a chunk of contiguous words to compare */
+    bool ok= keyInfoReader.getWordsPtr(len, keyChunk, chunkSize);
 
-  if (memcmp(ptr, regDatabufptr.p, 4*len))
-    return 1;
+    ndbrequire(ok);
+
+    if ( memcmp(ptr, keyChunk, chunkSize << 2))
+      return 1;
+    
+    ptr+= chunkSize;
+    len-= chunkSize;
+  }
 
   return 0;
 }
@@ -4454,12 +4650,11 @@ Dblqh::nr_copy_delete_row(Signal* signal
     keylen = regTcPtr.p->primKeyLen;
     signal->theData[3] = regTcPtr.p->hashValue;
     signal->theData[4] = keylen;
-    signal->theData[7] = regTcPtr.p->tupkeyData[0];
-    signal->theData[8] = regTcPtr.p->tupkeyData[1];
-    signal->theData[9] = regTcPtr.p->tupkeyData[2];
-    signal->theData[10] = regTcPtr.p->tupkeyData[3];
-    if (keylen > 4)
-      sendKeyinfoAcc(signal, 11);
+
+    /* Copy KeyInfo inline into the ACCKEYREQ signal, 
+     * starting at word 7 
+     */
+    sendKeyinfoAcc(signal, 7);
   }
   const Uint32 ref = refToBlock(regTcPtr.p->tcAccBlockref);
   EXECUTE_DIRECT(ref, GSN_ACCKEYREQ, signal, 7 + keylen);
@@ -4637,19 +4832,7 @@ Dblqh::readPrimaryKeys(Uint32 opPtrI, Ui
   regDatabufptr.i = regTcPtr.p->firstTupkeybuf;
   Uint32 * tmp = xfrm ? (Uint32*)Tmp : dst;
 
-  memcpy(tmp, regTcPtr.p->tupkeyData, sizeof(regTcPtr.p->tupkeyData));
-  if (keyLen > 4)
-  {
-    tmp += 4;
-    Uint32 pos = 4;
-    do {
-      ptrCheckGuard(regDatabufptr, cdatabufFileSize, databuf);
-      memcpy(tmp, regDatabufptr.p->data, sizeof(regDatabufptr.p->data));
-      regDatabufptr.i = regDatabufptr.p->nextDatabuf;
-      tmp += sizeof(regDatabufptr.p->data) >> 2;
-      pos += sizeof(regDatabufptr.p->data) >> 2;
-    } while(pos < keyLen);
-  }    
+  copy(tmp, regTcPtr.p->keyInfoIVal);
   
   if (xfrm)
   {
@@ -4661,29 +4844,42 @@ Dblqh::readPrimaryKeys(Uint32 opPtrI, Ui
   return keyLen;
 }
 
+/**
+ * getKeyInfoWordOrZero
+ * Get given word of KeyInfo, or zero if it's not available
+ * Used for tracing
+ */
+Uint32
+Dblqh::getKeyInfoWordOrZero(const TcConnectionrec* regTcPtr,
+                            Uint32 offset)
+{
+  if (regTcPtr->keyInfoIVal != RNIL)
+  {
+    SectionReader keyInfoReader(regTcPtr->keyInfoIVal,
+                                g_sectionSegmentPool);
+    
+    if (keyInfoReader.getSize() > offset)
+    {
+      if (offset)
+        keyInfoReader.step(offset);
+      
+      Uint32 word;
+      keyInfoReader.getWord(&word);
+      return word;
+    }
+  }
+  return 0;
+}
+
 /* =*======================================================================= */
 /* =======                 SEND KEYINFO TO ACC                       ======= */
 /*                                                                           */
 /* ========================================================================= */
 void Dblqh::sendKeyinfoAcc(Signal* signal, Uint32 Ti) 
 {
-  DatabufPtr regDatabufptr;
-  regDatabufptr.i = tcConnectptr.p->firstTupkeybuf;
-  
-  do {
-    jam();
-    ptrCheckGuard(regDatabufptr, cdatabufFileSize, databuf);
-    Uint32 sig0 = regDatabufptr.p->data[0];
-    Uint32 sig1 = regDatabufptr.p->data[1];
-    Uint32 sig2 = regDatabufptr.p->data[2];
-    Uint32 sig3 = regDatabufptr.p->data[3];
-    signal->theData[Ti] = sig0;
-    signal->theData[Ti + 1] = sig1;
-    signal->theData[Ti + 2] = sig2;
-    signal->theData[Ti + 3] = sig3;
-    regDatabufptr.i = regDatabufptr.p->nextDatabuf;
-    Ti += 4;
-  } while (regDatabufptr.i != RNIL);
+  /* Copy all KeyInfo into the signal at offset Ti */
+  copy(&signal->theData[Ti],
+       tcConnectptr.p->keyInfoIVal);
 }//Dblqh::sendKeyinfoAcc()
 
 void Dblqh::execLQH_ALLOCREQ(Signal* signal)
@@ -4895,6 +5091,18 @@ Dblqh::acckeyconf_tupkeyreq(Signal* sign
   regTcPtr->m_row_id.m_page_no = page_no;
   regTcPtr->m_row_id.m_page_idx = page_idx;
   
+  tupKeyReq->attrInfoIVal= RNIL;
+
+  /* Pass AttrInfo section if available in the TupKeyReq signal
+   * We are still responsible for releasing it, TUP is just
+   * borrowing it
+   */
+  if (tupKeyReq->attrBufLen > 0)
+  {
+    ndbassert( regTcPtr->attrInfoIVal != RNIL );
+    tupKeyReq->attrInfoIVal= regTcPtr->attrInfoIVal;
+  }
+
   EXECUTE_DIRECT(tup, GSN_TUPKEYREQ, signal, TupKeyReq::SignalLength);
 }//Dblqh::execACCKEYCONF()
 
@@ -5013,7 +5221,12 @@ void Dblqh::tupkeyConfLab(Signal* signal
     commitContinueAfterBlockedLab(signal);
     return;
   }//if
+
   regTcPtr->totSendlenAi = writeLen;
+  /* We will propagate / log writeLen words
+   * Check that that is how many we have available to 
+   * propagate
+   */
   ndbrequire(regTcPtr->totSendlenAi == regTcPtr->currTupAiLen);
   
   if (unlikely(activeCreat == Fragrecord::AC_NR_COPY))
@@ -5446,13 +5659,26 @@ void Dblqh::packLqhkeyreqLab(Signal* sig
 
   Uint32 nextNodeId = regTcPtr->nextReplica;
   Uint32 nextVersion = getNodeInfo(nextNodeId).m_version;
-  UintR TAiLen = regTcPtr->reclenAiLqhkey;
+
+  /* Send long LqhKeyReq to next replica if it can support it */
+  bool sendLongReq= ! ((nextVersion < NDBD_LONG_LQHKEYREQ) || 
+                       ERROR_INSERTED(5051));
+  
+  UintR TAiLen = sendLongReq ?
+    0 :
+    MIN(regTcPtr->totSendlenAi, LqhKeyReq::MaxAttrInfo);
+
+  /* Long LQHKeyReq uses section size for key length */
+  Uint32 lqhKeyLen= sendLongReq?
+    0 :
+    regTcPtr->primKeyLen;
 
   UintR TapplAddressIndicator = (regTcPtr->nextSeqNoReplica == 0 ? 0 : 1);
   LqhKeyReq::setApplicationAddressFlag(Treqinfo, TapplAddressIndicator);
   LqhKeyReq::setInterpretedFlag(Treqinfo, regTcPtr->opExec);
   LqhKeyReq::setSeqNoReplica(Treqinfo, regTcPtr->nextSeqNoReplica);
   LqhKeyReq::setAIInLqhKeyReq(Treqinfo, TAiLen);
+  LqhKeyReq::setKeyLen(Treqinfo,lqhKeyLen);
   
   if (unlikely(nextVersion < NDBD_ROWID_VERSION))
   {
@@ -5481,7 +5707,11 @@ void Dblqh::packLqhkeyreqLab(Signal* sig
   LqhKeyReq::setSameClientAndTcFlag(Treqinfo, TsameLqhAndClient);
   LqhKeyReq::setReturnedReadLenAIFlag(Treqinfo, TreadLenAiInd);
 
-  UintR TotReclenAi = regTcPtr->totSendlenAi;
+  /* Long LQHKeyReq uses section size for AttrInfo length */
+  UintR TotReclenAi = sendLongReq ? 
+    0 :
+    regTcPtr->totSendlenAi;
+
   LqhKeyReq::setReorgFlag(TotReclenAi, regTcPtr->m_reorg);
 
 /* ------------------------------------------------------------------------- */
@@ -5530,23 +5760,33 @@ void Dblqh::packLqhkeyreqLab(Signal* sig
     nextPos++;
   }//if
   sig0 = regTcPtr->readlenAi;
-  sig1 = regTcPtr->tupkeyData[0];
-  sig2 = regTcPtr->tupkeyData[1];
-  sig3 = regTcPtr->tupkeyData[2];
-  sig4 = regTcPtr->tupkeyData[3];
-
   lqhKeyReq->variableData[nextPos] = sig0;
   nextPos += TreadLenAiInd;
-  lqhKeyReq->variableData[nextPos] = sig1;
-  lqhKeyReq->variableData[nextPos + 1] = sig2;
-  lqhKeyReq->variableData[nextPos + 2] = sig3;
-  lqhKeyReq->variableData[nextPos + 3] = sig4;
-  UintR TkeyLen = LqhKeyReq::getKeyLen(Treqinfo);
-  if (TkeyLen < 4) {
-    nextPos += TkeyLen;
-  } else {
-    nextPos += 4;
-  }//if
+
+  if (!sendLongReq)
+  {
+    /* Short LQHKEYREQ to older LQH
+     * First few words of KeyInfo go into LQHKEYREQ
+     * Sometimes have no Keyinfo
+     */
+    if (regTcPtr->primKeyLen != 0)
+    {
+      SegmentedSectionPtr keyInfoSection;
+      
+      ndbassert(regTcPtr->keyInfoIVal != RNIL);
+
+      getSection(keyInfoSection, regTcPtr->keyInfoIVal);
+      SectionReader keyInfoReader(keyInfoSection, g_sectionSegmentPool);
+      
+      UintR keyLenInLqhKeyReq= MIN(LqhKeyReq::MaxKeyInfo, 
+                                   regTcPtr->primKeyLen);
+      
+      keyInfoReader.getWords(&lqhKeyReq->variableData[nextPos], 
+                             keyLenInLqhKeyReq);
+      
+      nextPos+= keyLenInLqhKeyReq;
+    }
+  }
 
   sig0 = regTcPtr->gci_hi;
   Local_key tmp = regTcPtr->m_row_id;
@@ -5560,77 +5800,147 @@ void Dblqh::packLqhkeyreqLab(Signal* sig
 
   BlockReference lqhRef = calcLqhBlockRef(regTcPtr->nextReplica);
   
-  if (likely(nextPos + TAiLen + LqhKeyReq::FixedSignalLength <= 25))
+  if (likely(sendLongReq))
   {
-    jam();
-    sig0 = regTcPtr->firstAttrinfo[0];
-    sig1 = regTcPtr->firstAttrinfo[1];
-    sig2 = regTcPtr->firstAttrinfo[2];
-    sig3 = regTcPtr->firstAttrinfo[3];
-    sig4 = regTcPtr->firstAttrinfo[4];
+    /* Long LQHKEYREQ, attach KeyInfo and AttrInfo
+     * sections to signal
+     */
+    SectionHandle handle(this);
+    handle.m_cnt= 0;
 
-    lqhKeyReq->variableData[nextPos] = sig0;
-    lqhKeyReq->variableData[nextPos + 1] = sig1;
-    lqhKeyReq->variableData[nextPos + 2] = sig2;
-    lqhKeyReq->variableData[nextPos + 3] = sig3;
-    lqhKeyReq->variableData[nextPos + 4] = sig4;
-    
-    nextPos += TAiLen;
-    TAiLen = 0;
+    if (regTcPtr->primKeyLen > 0)
+    {
+      SegmentedSectionPtr keyInfoSection;
+      
+      ndbassert(regTcPtr->keyInfoIVal != RNIL);
+      getSection(keyInfoSection, regTcPtr->keyInfoIVal);
+      
+      handle.m_ptr[ LqhKeyReq::KeyInfoSectionNum ]= keyInfoSection;
+      handle.m_cnt= 1;
+
+      if (regTcPtr->totSendlenAi > 0)
+      {
+        SegmentedSectionPtr attrInfoSection;
+        
+        ndbassert(regTcPtr->attrInfoIVal != RNIL);
+        getSection(attrInfoSection, regTcPtr->attrInfoIVal);
+        
+        handle.m_ptr[ LqhKeyReq::AttrInfoSectionNum ]= attrInfoSection;
+        handle.m_cnt= 2;
+      }
+      else
+      {
+        /* No AttrInfo to be sent on.  This can occur for delete
+         * or with an interpreted update when no actual update 
+         * is made
+         * In this case, we free any attrInfo section now.
+         */
+        if (regTcPtr->attrInfoIVal != RNIL)
+        {
+          ndbassert(!( regTcPtr->m_flags & 
+                       TcConnectionrec::OP_SAVEATTRINFO))
+          releaseSection(regTcPtr->attrInfoIVal);
+          regTcPtr->attrInfoIVal= RNIL;
+        }
+      }
+    }
+    else
+    {
+      /* Zero-length primary key, better not have any
+       * AttrInfo
+       */
+      ndbrequire(regTcPtr->totSendlenAi == 0);
+      ndbassert(regTcPtr->keyInfoIVal == RNIL);
+      ndbassert(regTcPtr->attrInfoIVal == RNIL);
+    }
+
+    sendSignal(lqhRef, GSN_LQHKEYREQ, signal,
+               LqhKeyReq::FixedSignalLength + nextPos,
+               JBB,
+               &handle);
+    
+    /* Long sections were freed as part of sendSignal */
+    ndbassert( handle.m_cnt == 0);
+    regTcPtr->keyInfoIVal= RNIL;
+    regTcPtr->attrInfoIVal= RNIL;
   }
   else
   {
-    Treqinfo &= ~(Uint32)(RI_AI_IN_THIS_MASK << RI_AI_IN_THIS_SHIFT);
-    lqhKeyReq->requestInfo = Treqinfo;
-  }
-  
-  sendSignal(lqhRef, GSN_LQHKEYREQ, signal, 
-             nextPos + LqhKeyReq::FixedSignalLength, JBB);
-  if (regTcPtr->primKeyLen > 4) {
-    jam();
-/* ------------------------------------------------------------------------- */
-/* MORE THAN 4 WORDS OF KEY DATA IS IN THE OPERATION. THEREFORE WE NEED TO   */
-/* PREPARE A KEYINFO SIGNAL. MORE THAN ONE KEYINFO SIGNAL CAN BE SENT.       */
-/* ------------------------------------------------------------------------- */
-    sendTupkey(signal);
-  }//if
-/* ------------------------------------------------------------------------- */
-/* NOW I AM PREPARED TO SEND ALL THE ATTRINFO SIGNALS. AT THE MOMENT A LOOP  */
-/* SENDS ALL AT ONCE. LATER WE HAVE TO ADDRESS THE PROBLEM THAT THESE COULD  */
-/* LEAD TO BUFFER EXPLOSION => NODE CRASH.                                   */
-/* ------------------------------------------------------------------------- */
-/*       NEW CODE TO SEND ATTRINFO IN PACK_LQHKEYREQ  */
-/*       THIS CODE USES A REAL-TIME BREAK AFTER       */
-/*       SENDING 16 SIGNALS.                          */
-/* -------------------------------------------------- */
-  sig0 = regTcPtr->tcOprec;
-  sig1 = regTcPtr->transid[0];
-  sig2 = regTcPtr->transid[1];
-  signal->theData[0] = sig0;
-  signal->theData[1] = sig1;
-  signal->theData[2] = sig2;
-  
-  if (unlikely(nextPos + TAiLen + LqhKeyReq::FixedSignalLength > 25))
-  {
-    jam();
-    /**
-     * 4 replicas...
+    /* Short LQHKEYREQ to older LQH
+     * First few words of ATTRINFO go into LQHKEYREQ
+     * (if they fit)
      */
-    memcpy(signal->theData+3, regTcPtr->firstAttrinfo, TAiLen << 2);
-    sendSignal(lqhRef, GSN_ATTRINFO, signal, 3 + TAiLen, JBB);    
+    if (TAiLen > 0)
+    {
+      if (likely(nextPos + TAiLen + LqhKeyReq::FixedSignalLength <= 25))
+      {
+        jam();
+        SegmentedSectionPtr attrInfoSection;
+        
+        ndbassert(regTcPtr->attrInfoIVal != RNIL);
+        
+        getSection(attrInfoSection, regTcPtr->attrInfoIVal);
+        SectionReader attrInfoReader(attrInfoSection, getSectionSegmentPool());
+        
+        attrInfoReader.getWords(&lqhKeyReq->variableData[nextPos], 
+                                TAiLen);
+        
+        nextPos+= TAiLen;
+      }
+      else
+      {
+        /* Not enough space in LQHKEYREQ, we'll send everything in
+         * separate ATTRINFO signals
+         */
+        Treqinfo &= ~(Uint32)(RI_AI_IN_THIS_MASK << RI_AI_IN_THIS_SHIFT);
+        lqhKeyReq->requestInfo = Treqinfo;
+        TAiLen= 0;
+      }
+    }
+  
+    sendSignal(lqhRef, GSN_LQHKEYREQ, signal, 
+               nextPos + LqhKeyReq::FixedSignalLength, JBB);
+
+    /* Send extra KeyInfo signals if necessary... */
+    if (regTcPtr->primKeyLen > LqhKeyReq::MaxKeyInfo) {
+      jam();
+      sendTupkey(signal);
+    }//if
+
+    /* Send extra AttrInfo signals if necessary... */
+    Uint32 remainingAiLen= regTcPtr->totSendlenAi - TAiLen;
+    
+    if (remainingAiLen != 0)
+    {
+      sig0 = regTcPtr->tcOprec;
+      sig1 = regTcPtr->transid[0];
+      sig2 = regTcPtr->transid[1];
+      signal->theData[0] = sig0;
+      signal->theData[1] = sig1;
+      signal->theData[2] = sig2;
+
+      SectionReader attrInfoReader(regTcPtr->attrInfoIVal,
+                                   g_sectionSegmentPool);
+
+      ndbassert(attrInfoReader.getSize() == regTcPtr->totSendlenAi);
+
+      /* Step over words already sent in LQHKEYREQ above */
+      attrInfoReader.step(TAiLen);
+
+      while (remainingAiLen != 0)
+      {
+        Uint32 dataInSignal= MIN(AttrInfo::DataLength, remainingAiLen);
+        attrInfoReader.getWords(&signal->theData[3],
+                                dataInSignal);
+        remainingAiLen-= dataInSignal;
+        sendSignal(lqhRef, GSN_ATTRINFO, signal, 
+                   AttrInfo::HeaderLength + dataInSignal, JBB);
+      }
+    }
   }
 
-  AttrbufPtr regAttrinbufptr;
-  regAttrinbufptr.i = regTcPtr->firstAttrinbuf;
-  while (regAttrinbufptr.i != RNIL) {
-    ptrCheckGuard(regAttrinbufptr, cattrinbufFileSize, attrbuf);
-    jam();
-    Uint32 dataLen = regAttrinbufptr.p->attrbuf[ZINBUF_DATA_LEN];
-    ndbrequire(dataLen != 0);
-    MEMCOPY_NO_WORDS(&signal->theData[3], &regAttrinbufptr.p->attrbuf[0], dataLen);
-    regAttrinbufptr.i = regAttrinbufptr.p->attrbuf[ZINBUF_NEXT];
-    sendSignal(lqhRef, GSN_ATTRINFO, signal, dataLen + 3, JBB);
-  }//while
+  /* LQHKEYREQ sent */
+
   regTcPtr->transactionState = TcConnectionrec::PREPARED;
   if (regTcPtr->dirtyOp == ZTRUE) {
     jam();
@@ -5758,49 +6068,24 @@ void Dblqh::writeLogHeader(Signal* signa
 void Dblqh::writeKey(Signal* signal) 
 {
   TcConnectionrec * const regTcPtr = tcConnectptr.p;
-  Uint32 logPos, endPos, dataLen;
-  Int32 remainingLen;
-  logPos = logPagePtr.p->logPageWord[ZCURR_PAGE_INDEX];
-  remainingLen = regTcPtr->primKeyLen;
-  dataLen = remainingLen;
-  if (remainingLen > 4)
-    dataLen = 4;
-  remainingLen -= dataLen;
-  endPos = logPos + dataLen;
-  if (endPos < ZPAGE_SIZE) {
-    MEMCOPY_NO_WORDS(&logPagePtr.p->logPageWord[logPos],
-                     &regTcPtr->tupkeyData[0],
-                     dataLen);
-  } else {
-    jam();
-    for (Uint32 i = 0; i < dataLen; i++)
-      writeLogWord(signal, regTcPtr->tupkeyData[i]);
-    endPos = logPagePtr.p->logPageWord[ZCURR_PAGE_INDEX];
-  }//if
-  DatabufPtr regDatabufptr;
-  regDatabufptr.i = regTcPtr->firstTupkeybuf;
-  while (remainingLen > 0) {
-    logPos = endPos;
-    ptrCheckGuard(regDatabufptr, cdatabufFileSize, databuf);
-    dataLen = remainingLen;
-    if (remainingLen > 4)
-      dataLen = 4;
-    remainingLen -= dataLen;
-    endPos += dataLen;
-    if (endPos < ZPAGE_SIZE) {
-      MEMCOPY_NO_WORDS(&logPagePtr.p->logPageWord[logPos],
-                       &regDatabufptr.p->data[0],
-                       dataLen);
-    } else {
-      logPagePtr.p->logPageWord[ZCURR_PAGE_INDEX] = logPos;
-      for (Uint32 i = 0; i < dataLen; i++)
-        writeLogWord(signal, regDatabufptr.p->data[i]);
-      endPos = logPagePtr.p->logPageWord[ZCURR_PAGE_INDEX];
-    }//if
-    regDatabufptr.i = regDatabufptr.p->nextDatabuf;
-  }//while
-  logPagePtr.p->logPageWord[ZCURR_PAGE_INDEX] = endPos;
-  ndbrequire(regDatabufptr.i == RNIL);
+  jam();
+  SectionReader keyInfoReader(regTcPtr->keyInfoIVal,
+                              g_sectionSegmentPool);
+  const Uint32* srcPtr;
+  Uint32 length;
+  Uint32 wordsWritten= 0;
+
+  /* Write contiguous chunks of words from the KeyInfo
+   * section to the log 
+   */
+  while (keyInfoReader.getWordsPtr(srcPtr,
+                                   length))
+  {
+    writeLogWords(signal, srcPtr, length);
+    wordsWritten+= length;
+  }
+
+  ndbassert( wordsWritten == regTcPtr->primKeyLen );
 }//Dblqh::writeKey()
 
 /* --------------------------------------------------------------------------
@@ -5814,44 +6099,26 @@ void Dblqh::writeAttrinfoLab(Signal* sig
   Uint32 totLen = regTcPtr->currTupAiLen;
   if (totLen == 0)
     return;
-  Uint32 logPos = logPagePtr.p->logPageWord[ZCURR_PAGE_INDEX];
-  Uint32 lqhLen = regTcPtr->reclenAiLqhkey;
-  ndbrequire(totLen >= lqhLen);
-  Uint32 endPos = logPos + lqhLen;
-  totLen -= lqhLen;
-  if (endPos < ZPAGE_SIZE) {
-    MEMCOPY_NO_WORDS(&logPagePtr.p->logPageWord[logPos],
-                     &regTcPtr->firstAttrinfo[0],
-                     lqhLen);
-  } else {
-    for (Uint32 i = 0; i < lqhLen; i++)
-      writeLogWord(signal, regTcPtr->firstAttrinfo[i]);
-    endPos = logPagePtr.p->logPageWord[ZCURR_PAGE_INDEX];
-  }//if
-  AttrbufPtr regAttrinbufptr;
-  regAttrinbufptr.i = regTcPtr->firstAttrinbuf;
-  while (totLen > 0) {
-    logPos = endPos;
-    ptrCheckGuard(regAttrinbufptr, cattrinbufFileSize, attrbuf);
-    Uint32 dataLen = regAttrinbufptr.p->attrbuf[ZINBUF_DATA_LEN];
-    ndbrequire(totLen >= dataLen);
-    ndbrequire(dataLen > 0);
-    totLen -= dataLen;
-    endPos += dataLen;
-    if (endPos < ZPAGE_SIZE) {
-      MEMCOPY_NO_WORDS(&logPagePtr.p->logPageWord[logPos],
-                      &regAttrinbufptr.p->attrbuf[0],
-                      dataLen);
-    } else {
-      logPagePtr.p->logPageWord[ZCURR_PAGE_INDEX] = logPos;
-      for (Uint32 i = 0; i < dataLen; i++)
-        writeLogWord(signal, regAttrinbufptr.p->attrbuf[i]);
-      endPos = logPagePtr.p->logPageWord[ZCURR_PAGE_INDEX];
-    }//if
-    regAttrinbufptr.i = regAttrinbufptr.p->attrbuf[ZINBUF_NEXT];
-  }//while
-  logPagePtr.p->logPageWord[ZCURR_PAGE_INDEX] = endPos;
-  ndbrequire(regAttrinbufptr.i == RNIL);
+
+  jam();
+  ndbassert( regTcPtr->attrInfoIVal != RNIL );
+  SectionReader attrInfoReader(regTcPtr->attrInfoIVal,
+                               g_sectionSegmentPool);
+  const Uint32* srcPtr;
+  Uint32 length;
+  Uint32 wordsWritten= 0;
+
+  /* Write contiguous chunks of words from the 
+   * AttrInfo section to the log 
+   */
+  while (attrInfoReader.getWordsPtr(srcPtr,
+                                    length))
+  {
+    writeLogWords(signal, srcPtr, length);
+    wordsWritten+= length;
+  }
+
+  ndbassert( wordsWritten == totLen );
 }//Dblqh::writeAttrinfoLab()
 
 /* ------------------------------------------------------------------------- */
@@ -5861,31 +6128,31 @@ void Dblqh::writeAttrinfoLab(Signal* sig
 /* ------------------------------------------------------------------------- */
 void Dblqh::sendTupkey(Signal* signal) 
 {
-  UintR TdataPos = 3;
   BlockReference lqhRef = calcLqhBlockRef(tcConnectptr.p->nextReplica);
   signal->theData[0] = tcConnectptr.p->tcOprec;
   signal->theData[1] = tcConnectptr.p->transid[0];
   signal->theData[2] = tcConnectptr.p->transid[1];
-  databufptr.i = tcConnectptr.p->firstTupkeybuf;
-  do {
-    ptrCheckGuard(databufptr, cdatabufFileSize, databuf);
-    signal->theData[TdataPos] = databufptr.p->data[0];
-    signal->theData[TdataPos + 1] = databufptr.p->data[1];
-    signal->theData[TdataPos + 2] = databufptr.p->data[2];
-    signal->theData[TdataPos + 3] = databufptr.p->data[3];
 
-    databufptr.i = databufptr.p->nextDatabuf;
-    TdataPos += 4;
-    if (databufptr.i == RNIL) {
-      jam();
-      sendSignal(lqhRef, GSN_KEYINFO, signal, TdataPos, JBB);
-      return;
-    } else if (TdataPos == 23) {
-      jam();
-      sendSignal(lqhRef, GSN_KEYINFO, signal, 23, JBB);
-      TdataPos = 3;
-    }
-  } while (1);
+  Uint32 remainingLen= tcConnectptr.p->primKeyLen - 
+    LqhKeyReq::MaxKeyInfo;
+
+  SectionReader keyInfoReader(tcConnectptr.p->keyInfoIVal,
+                              g_sectionSegmentPool);
+
+  ndbassert(keyInfoReader.getSize() > LqhKeyReq::MaxKeyInfo);
+
+  /* Step over the words already sent in LQHKEYREQ */
+  keyInfoReader.step(LqhKeyReq::MaxKeyInfo);
+
+  while (remainingLen != 0)
+  {
+    Uint32 dataInSignal= MIN(KeyInfo::DataLength, remainingLen);
+    keyInfoReader.getWords(&signal->theData[3],
+                           dataInSignal);
+    remainingLen-= dataInSignal;
+    sendSignal(lqhRef, GSN_KEYINFO, signal, 
+               KeyInfo::HeaderLength + dataInSignal, JBB);
+  }
 }//Dblqh::sendTupkey()
 
 void Dblqh::cleanUp(Signal* signal) 
@@ -5935,6 +6202,12 @@ void Dblqh::releaseOprec(Signal* signal)
   regTcPtr->firstTupkeybuf = RNIL;
   regTcPtr->lastTupkeybuf = RNIL;
 
+  /* Release long sections if present */
+  releaseSection(regTcPtr->keyInfoIVal);
+  regTcPtr->keyInfoIVal = RNIL;
+  releaseSection(regTcPtr->attrInfoIVal);
+  regTcPtr->attrInfoIVal = RNIL;
+
   if (regTcPtr->m_dealloc)
   {
     jam();
@@ -6689,7 +6962,7 @@ void Dblqh::commitContinueAfterBlockedLa
 	  TRACENR(" NrCopy");
 	if (LqhKeyReq::getRowidFlag(regTcPtr.p->reqinfo))
 	  TRACENR(" rowid: " << regTcPtr.p->m_row_id);
-	TRACENR(" key: " << regTcPtr.p->tupkeyData[0]);
+	TRACENR(" key: " << getKeyInfoWordOrZero(regTcPtr.p, 0));
 	TRACENR(endl);
       }
 
@@ -7183,7 +7456,7 @@ void Dblqh::execACCKEYREF(Signal* signal
       TRACENR(" NrCopy");
     if (LqhKeyReq::getRowidFlag(tcPtr->reqinfo))
       TRACENR(" rowid: " << tcPtr->m_row_id);
-    TRACENR(" key: " << tcPtr->tupkeyData[0]);
+    TRACENR(" key: " << getKeyInfoWordOrZero(tcPtr, 0));
     TRACENR(endl);
     
   }
@@ -9513,27 +9786,29 @@ Dblqh::next_scanconf_tupkeyreq(Signal* s
   tupKeyReq->tcOpIndex = regTcPtr->tcOprec;
   tupKeyReq->savePointId = regTcPtr->savePointId;
   tupKeyReq->disk_page= disk_page;
+  tupKeyReq->attrInfoIVal= RNIL;
   Uint32 blockNo = refToBlock(regTcPtr->tcTupBlockref);
   EXECUTE_DIRECT(blockNo, GSN_TUPKEYREQ, signal, 
 		 TupKeyReq::SignalLength);
 }
 
 /* -------------------------------------------------------------------------
- *       RECEPTION OF FURTHER KEY INFORMATION WHEN KEY SIZE > 16 BYTES.
+ *       STORE KEYINFO IN A LONG SECTION PRIOR TO SENDING
  * -------------------------------------------------------------------------
  *       PRECONDITION:   SCAN_STATE = WAIT_SCAN_KEYINFO
  * ------------------------------------------------------------------------- */
-void 
-Dblqh::keyinfoLab(const Uint32 * src, const Uint32 * end) 
+bool 
+Dblqh::keyinfoLab(const Uint32 * src, Uint32 len) 
 {
-  do {
-    jam();
-    seizeTupkeybuf(0);
-    databufptr.p->data[0] = * src ++;
-    databufptr.p->data[1] = * src ++;
-    databufptr.p->data[2] = * src ++;
-    databufptr.p->data[3] = * src ++;
-  } while (src < end);
+  ndbassert( tcConnectptr.p->keyInfoIVal == RNIL );
+  ndbassert( len > 0 );
+
+  if (ERROR_INSERTED(5052))
+    return false;
+
+  return(appendToSection(tcConnectptr.p->keyInfoIVal,
+                         src,
+                         len));
 }//Dblqh::keyinfoLab()
 
 Uint32
@@ -10754,7 +11029,7 @@ void Dblqh::nextScanConfCopyLab(Signal* 
 
   scanptr.p->m_curr_batch_size_rows++;
   
-  if (signal->getLength() == 7)
+  if (signal->getLength() == NextScanConf::SignalLengthNoKeyInfo)
   {
     jam();
     ndbrequire(nextScanConf->accOperationPtr == RNIL);
@@ -10843,27 +11118,22 @@ void Dblqh::nextScanConfCopyLab(Signal* 
 void Dblqh::execTRANSID_AI(Signal* signal) 
 {
   jamEntry();
+  /* TransID_AI received from local TUP, data is linear inline in 
+   * signal buff 
+   */
   tcConnectptr.i = signal->theData[0];
   ptrCheckGuard(tcConnectptr, ctcConnectrecFileSize, tcConnectionrec);
-  Uint32 length = signal->length() - 3;
+  Uint32 length = signal->length() - TransIdAI::HeaderLength;
   ndbrequire(tcConnectptr.p->transactionState == TcConnectionrec::COPY_TUPKEY);
-  Uint32 * src = &signal->theData[3];
-  while(length > 22){
-    if (saveTupattrbuf(signal, src, 22) == ZOK) {
-      ;
-    } else {
-      jam();
-      tcConnectptr.p->errorCode = ZGET_ATTRINBUF_ERROR;
-      return;
-    }//if
-    src += 22;
-    length -= 22;
-  }
-  if (saveTupattrbuf(signal, src, length) == ZOK) {
-    return;
+  Uint32 * src = &signal->theData[ TransIdAI::HeaderLength ];
+  bool ok= appendToSection(tcConnectptr.p->attrInfoIVal,
+                           src,
+                           length);
+  if (unlikely(! ok))
+  {
+    jam();
+    tcConnectptr.p->errorCode = ZGET_ATTRINBUF_ERROR;
   }
-  jam();
-  tcConnectptr.p->errorCode = ZGET_ATTRINBUF_ERROR;
 }//Dblqh::execTRANSID_AI()
 
 /*--------------------------------------------------------------------------*/
@@ -10907,13 +11177,15 @@ void Dblqh::copyTupkeyConfLab(Signal* si
   tcConnectptr.p->totSendlenAi = readLength;
   tcConnectptr.p->connectState = TcConnectionrec::COPY_CONNECTED;
 
-  // Read primary keys (used to get here via scan keyinfo)
+  /* Read primary keys from TUP into signal buffer space
+   * (used to get here via scan keyinfo)
+   */
   Uint32* tmp = signal->getDataPtrSend()+24;
   Uint32 len= tcConnectptr.p->primKeyLen = readPrimaryKeys(scanP, tcConP, tmp);
   
   tcConP->gci_hi = tmp[len];
   tcConP->gci_lo = 0;
-  // Calculate hash (no need to linearies key)
+  // Calculate hash (no need to linearise key)
   if (g_key_descriptor_pool.getPtr(tableId)->hasCharAttr)
   {
     tcConnectptr.p->hashValue = calculateHash(tableId, tmp);
@@ -10923,10 +11195,21 @@ void Dblqh::copyTupkeyConfLab(Signal* si
     tcConnectptr.p->hashValue = md5_hash((Uint64*)tmp, len);
   }
 
-  // Move into databuffer to make packLqhkeyreqLab happy
-  memcpy(tcConP->tupkeyData, tmp, 4*4);
-  if(len > 4)
-    keyinfoLab(tmp+4, tmp + len);
+  // Copy keyinfo into long section for LQHKEYREQ below
+  if (unlikely(!keyinfoLab(tmp, len)))
+  {
+    /* Failed to store keyInfo, fail copy 
+     * This will result in a COPY_FRAGREF being sent to
+     * the starting node, which will cause it to fail
+     */
+    scanptr.p->scanErrorCounter++;
+    tcConP->errorCode= ZGET_DATAREC_ERROR;
+    scanptr.p->scanCompletedStatus= ZTRUE;
+
+    closeCopyLab(signal);
+    return;
+  }
+
   LqhKeyReq::setKeyLen(tcConP->reqinfo, len);
 
 /*---------------------------------------------------------------------------*/
@@ -16571,7 +16854,7 @@ void Dblqh::aiStateErrorCheckLab(Signal*
 /*       ACTIVE CREATION TO FALSE. THIS WILL ENSURE THAT THE ABORT IS      */
 /*       COMPLETED.                                                        */
 /*************************************************************************>*/
-      if (saveTupattrbuf(signal, dataPtr, length) == ZOK) {
+      if (saveAttrInfoInSection(dataPtr, length) == ZOK) {
         jam();
         if (tcConnectptr.p->transactionState == 
             TcConnectionrec::WAIT_AI_AFTER_ABORT) {
@@ -17541,6 +17824,9 @@ void Dblqh::initialiseTcrec(Signal* sign
       tcConnectptr.p->lastAttrinbuf = RNIL;
       tcConnectptr.p->firstTupkeybuf = RNIL;
       tcConnectptr.p->lastTupkeybuf = RNIL;
+      tcConnectptr.p->keyInfoIVal = RNIL;
+      tcConnectptr.p->attrInfoIVal = RNIL;
+      tcConnectptr.p->m_flags= 0;
       tcConnectptr.p->tcTimer = 0;
       tcConnectptr.p->nextTcConnectrec = tcConnectptr.i + 1;
     }//for
@@ -18007,27 +18293,13 @@ MPR_LOOP:
 void Dblqh::readAttrinfo(Signal* signal) 
 {
   Uint32 remainingLen = tcConnectptr.p->totSendlenAi;
+  tcConnectptr.p->reclenAiLqhkey = 0;
   if (remainingLen == 0) {
     jam();
-    tcConnectptr.p->reclenAiLqhkey = 0;
     return;
   }//if
-  Uint32 dataLen = remainingLen;
-  if (remainingLen > 5)
-    dataLen = 5;
-  readLogData(signal, dataLen, &tcConnectptr.p->firstAttrinfo[0]);
-  tcConnectptr.p->reclenAiLqhkey = dataLen;
-  remainingLen -= dataLen;
-  while (remainingLen > 0) {
-    jam();
-    dataLen = remainingLen;
-    if (remainingLen > 22)
-      dataLen = 22;
-    seizeAttrinbuf(signal);
-    readLogData(signal, dataLen, &attrinbufptr.p->attrbuf[0]);
-    attrinbufptr.p->attrbuf[ZINBUF_DATA_LEN] = dataLen;
-    remainingLen -= dataLen;
-  }//while
+
+  readLogData(signal, remainingLen, tcConnectptr.p->attrInfoIVal);
 }//Dblqh::readAttrinfo()
 
 /* ------------------------------------------------------------------------- */
@@ -18201,20 +18473,8 @@ void Dblqh::readKey(Signal* signal) 
 {
   Uint32 remainingLen = tcConnectptr.p->primKeyLen;
   ndbrequire(remainingLen != 0);
-  Uint32 dataLen = remainingLen;
-  if (remainingLen > 4)
-    dataLen = 4;
-  readLogData(signal, dataLen, &tcConnectptr.p->tupkeyData[0]);
-  remainingLen -= dataLen;
-  while (remainingLen > 0) {
-    jam();
-    seizeTupkeybuf(signal);
-    dataLen = remainingLen;
-    if (dataLen > 4)
-      dataLen = 4;
-    readLogData(signal, dataLen, &databufptr.p->data[0]);
-    remainingLen -= dataLen;
-  }//while
+
+  readLogData(signal, remainingLen, tcConnectptr.p->keyInfoIVal);
 }//Dblqh::readKey()
 
 /* ------------------------------------------------------------------------- */
@@ -18222,15 +18482,25 @@ void Dblqh::readKey(Signal* signal) 
 /*                                                                           */
 /*       SUBROUTINE SHORT NAME = RLD                                         */
 /* --------------------------------------------------------------------------*/
-void Dblqh::readLogData(Signal* signal, Uint32 noOfWords, Uint32* dataPtr) 
+void Dblqh::readLogData(Signal* signal, Uint32 noOfWords, Uint32& sectionIVal) 
 {
-  ndbrequire(noOfWords < 32);
   Uint32 logPos = logPagePtr.p->logPageWord[ZCURR_PAGE_INDEX];
   if ((logPos + noOfWords) >= ZPAGE_SIZE) {
     for (Uint32 i = 0; i < noOfWords; i++)
-      dataPtr[i] = readLogwordExec(signal);
+    {
+      /* Todo : Consider reading > 1 word at a time */
+      Uint32 word= readLogwordExec(signal);
+      bool ok= appendToSection(sectionIVal,
+                               &word,
+                               1);
+      ndbrequire(ok);
+    }
   } else {
-    MEMCOPY_NO_WORDS(dataPtr, &logPagePtr.p->logPageWord[logPos], noOfWords);
+    /* In one bite */
+    bool ok= appendToSection(sectionIVal,
+                             &logPagePtr.p->logPageWord[logPos],
+                             noOfWords);
+    ndbrequire(ok);
     logPagePtr.p->logPageWord[ZCURR_PAGE_INDEX] = logPos + noOfWords;
   }//if
 }//Dblqh::readLogData()
@@ -18927,6 +19197,54 @@ void Dblqh::writeLogWord(Signal* signal,
 }//Dblqh::writeLogWord()
 
 /* --------------------------------------------------------------------------
+ * -------   WRITE MULTIPLE WORDS INTO THE LOG, CHECK FOR NEW PAGES   ------- 
+ * 
+ * ------------------------------------------------------------------------- */
+
+void Dblqh::writeLogWords(Signal* signal, const Uint32* data, Uint32 len)
+{
+  Uint32 logPos = logPagePtr.p->logPageWord[ZCURR_PAGE_INDEX];
+  ndbrequire(logPos < ZPAGE_SIZE);
+  Uint32 wordsThisPage= ZPAGE_SIZE - logPos;
+
+  while (len >= wordsThisPage)
+  {
+    /* Fill rest of the log page */
+    MEMCOPY_NO_WORDS(&logPagePtr.p->logPageWord[logPos],
+                     data,
+                     wordsThisPage);
+    logPagePtr.p->logPageWord[ZCURR_PAGE_INDEX] = ZPAGE_SIZE;
+    data+= wordsThisPage;
+    len-= wordsThisPage;
+    
+    /* Mark page completed and get a new one */
+    jam();
+    completedLogPage(signal, ZNORMAL, __LINE__);
+    seizeLogpage(signal);
+    initLogpage(signal);
+    logFilePtr.p->currentLogpage = logPagePtr.i;
+    logFilePtr.p->currentFilepage++;
+    
+    logPos = logPagePtr.p->logPageWord[ZCURR_PAGE_INDEX];
+    ndbrequire(logPos < ZPAGE_SIZE);
+    wordsThisPage= ZPAGE_SIZE - logPos;
+  }
+  
+  if (len > 0)
+  {
+    /* No need to worry about next page */
+    ndbassert( len < wordsThisPage );
+    /* Write partial log page */
+    MEMCOPY_NO_WORDS(&logPagePtr.p->logPageWord[logPos],
+                     data,
+                     len);
+    logPagePtr.p->logPageWord[ZCURR_PAGE_INDEX] = logPos + len;
+  }
+
+  ndbassert( logPagePtr.p->logPageWord[ZCURR_PAGE_INDEX] < ZPAGE_SIZE );
+}
+
+/* --------------------------------------------------------------------------
  * -------         WRITE A NEXT LOG RECORD AND CHANGE TO NEXT MBYTE   ------- 
  *
  *       SUBROUTINE SHORT NAME:  WNL
@@ -19686,11 +20004,11 @@ Dblqh::execDUMP_STATE_ORD(Signal* signal
 	   << endl;
     ndbout << " transid0 = " << hex << tcRec.p->transid[0]
 	   << " transid1 = " << hex << tcRec.p->transid[1]
-	   << " tupkeyData0 = " << tcRec.p->tupkeyData[0]
-	   << " tupkeyData1 = " << tcRec.p->tupkeyData[1]
+	   << " key[0] = " << getKeyInfoWordOrZero(tcRec.p, 0)
+	   << " key[1] = " << getKeyInfoWordOrZero(tcRec.p, 1)
 	   << endl;
-    ndbout << " tupkeyData2 = " << tcRec.p->tupkeyData[2]
-	   << " tupkeyData3 = " << tcRec.p->tupkeyData[3]
+    ndbout << " key[2] = " << getKeyInfoWordOrZero(tcRec.p, 2)
+	   << " key[3] = " << getKeyInfoWordOrZero(tcRec.p, 3)
 	   << " m_nr_delete.m_cnt = " << tcRec.p->m_nr_delete.m_cnt
 	   << endl;
     switch (tcRec.p->transactionState) {
@@ -19894,7 +20212,6 @@ Dblqh::execDUMP_STATE_ORD(Signal* signal
   if (arg == 2352 && signal->getLength() == 2)
   {
     jam();
-    Uint32 i;
     Uint32 opNo = signal->theData[1];
     TcConnectionrecPtr tcRec;
     if (opNo < ttcConnectrecFileSize)
@@ -19903,24 +20220,16 @@ Dblqh::execDUMP_STATE_ORD(Signal* signal
       tcRec.i = opNo;
       ptrCheckGuard(tcRec, ttcConnectrecFileSize, regTcConnectionrec);
 
-      Uint32 keyLen = tcRec.p->primKeyLen;
       BaseString key;
-      for(i = 0; i<keyLen && i < 4; i++)
+      if (tcRec.p->keyInfoIVal != RNIL)
       {
-	jam();
-	key.appfmt("0x%x ", tcRec.p->tupkeyData[i]);
-      }
-      
-      if (keyLen > 4)
-      {
-	jam();
-	tcConnectptr = tcRec;
-	sendKeyinfoAcc(signal, 4);
-	for (i = 4; i<keyLen; i++)
-	{
-	  jam();
-	  key.appfmt("0x%x ", signal->theData[i]);
-	}
+        jam();
+        SectionReader keyInfoReader(tcRec.p->keyInfoIVal,
+                                    g_sectionSegmentPool);
+        
+        Uint32 keyWord;
+        while (keyInfoReader.getWord(&keyWord))
+          key.appfmt("0x%x ", keyWord);
       }
       
       char buf[100];
@@ -20097,18 +20406,14 @@ Dblqh::TRACE_OP_DUMP(const Dblqh::TcConn
   
   {
     (* traceopout) << "key=[" << hex;
-    Uint32 i;
-    for(i = 0; i<regTcPtr->primKeyLen && i < 4; i++){
-      (* traceopout) << hex << regTcPtr->tupkeyData[i] << " ";
-    }
-    
-    DatabufPtr regDatabufptr;
-    regDatabufptr.i = regTcPtr->firstTupkeybuf;
-    while(i < regTcPtr->primKeyLen)
+    if (regTcPtr->keyInfoIVal != RNIL)
     {
-      ptrCheckGuard(regDatabufptr, cdatabufFileSize, databuf);
-      for(Uint32 j = 0; j<4 && i<regTcPtr->primKeyLen; j++, i++)
-	(* traceopout) << hex << regDatabufptr.p->data[j] << " ";
+      SectionReader keyInfoReader(regTcPtr->keyInfoIVal,
+                                  g_sectionSegmentPool);
+      
+      Uint32 keyWord;
+      while (keyInfoReader.getWord(&keyWord))
+        (* traceopout) << hex << keyWord << " ";
     }
     (* traceopout) << "] ";
   }

=== modified file 'storage/ndb/src/kernel/blocks/dbtc/DbtcMain.cpp'
--- a/storage/ndb/src/kernel/blocks/dbtc/DbtcMain.cpp	2008-07-01 12:35:34 +0000
+++ b/storage/ndb/src/kernel/blocks/dbtc/DbtcMain.cpp	2008-08-07 11:52:50 +0000
@@ -1975,7 +1975,6 @@ void Dbtc::tckeyreq020Lab(Signal* signal
                         &signal->theData[KeyInfo::HeaderLength],
                         wordsInSignal))
   {
-    // TODO : Consider error insert
     jam();
     appendToSectionErrorLab(signal);
     return;
@@ -2991,8 +2990,6 @@ void Dbtc::execTCKEYREQ(Signal* signal) 
     bool ok= appendToSection(regCachePtr->keyInfoSectionI,
                              &TOptionalDataPtr[TkeyIndex],
                              keyInfoInTCKeyReq);
-    
-    // TODO : Consider error insert
     if (!ok)
     {
       jam();
@@ -3007,8 +3004,6 @@ void Dbtc::execTCKEYREQ(Signal* signal) 
       ok= appendToSection(regCachePtr->attrInfoSectionI,
                           &TOptionalDataPtr[TAIDataIndex],
                           titcLenAiInTckeyreq);
-
-      // TODO : Consider error insert
       if (!ok)
       {
         jam();
@@ -3532,13 +3527,27 @@ void Dbtc::sendlqhkeyreq(Signal* signal,
     reorg = 2;
   }
 
-  /* Todo : Determine whether to use a long Lqh key req based on the
-   * version of the target Lqh node
+  Uint32 inlineKeyLen= 0;
+  Uint32 inlineAttrLen= 0;
+
+  /* We normally send long LQHKEYREQ unless the
+   * destination cannot handle it or we are 
+   * testing
    */
-  regCachePtr->useLongLqhKeyReq= 0;
+  if (unlikely((version < NDBD_LONG_LQHKEYREQ) ||
+               ERROR_INSERTED(8069)))
+  {
+    /* Short LQHKEYREQ, with some key/attr data inline */
+    regCachePtr->useLongLqhKeyReq= 0;
+    inlineKeyLen= regCachePtr->keylen;
+    inlineAttrLen= regCachePtr->attrlength;
+  }
+  else
+    /* Long LQHKEYREQ, with key/attr data in long sections */
+    regCachePtr->useLongLqhKeyReq= 1;
 
   tslrAttrLen = 0;
-  LqhKeyReq::setAttrLen(tslrAttrLen, regCachePtr->attrlength);
+  LqhKeyReq::setAttrLen(tslrAttrLen, inlineAttrLen);
   /* ---------------------------------------------------------------------- */
   // Bit16 == 0 since StoredProcedures are not yet supported.
   /* ---------------------------------------------------------------------- */
@@ -3551,7 +3560,7 @@ void Dbtc::sendlqhkeyreq(Signal* signal,
   sig1 = regTcPtr->operation;
   sig2 = regTcPtr->dirtyOp;
   bool dirtyRead = (sig1 == ZREAD && sig2 == ZTRUE);
-  LqhKeyReq::setKeyLen(Tdata10, regCachePtr->keylen);
+  LqhKeyReq::setKeyLen(Tdata10, inlineKeyLen);
   LqhKeyReq::setLastReplicaNo(Tdata10, regTcPtr->lastReplicaNo);
   if (unlikely(version < NDBD_ROWID_VERSION))
   {
@@ -3647,9 +3656,42 @@ void Dbtc::sendlqhkeyreq(Signal* signal,
     nextPos++;
   }//if
 
+  // Reset trigger count
+  regTcPtr->accumulatingTriggerData.i = RNIL;  
+  regTcPtr->accumulatingTriggerData.p = NULL;  
+  regTcPtr->noFiredTriggers = 0;
+  regTcPtr->triggerExecutionCount = 0;
+
   if (regCachePtr->useLongLqhKeyReq)
   {
-    ndbassert(false); // TODO
+    /* Build long LQHKeyReq using Key + AttrInfo sections */
+    SectionHandle handle(this);
+    SegmentedSectionPtr keyInfoSection;
+    
+    getSection(keyInfoSection, regCachePtr->keyInfoSectionI);
+
+    handle.m_ptr[ LqhKeyReq::KeyInfoSectionNum ]= keyInfoSection;
+    handle.m_cnt= 1;
+
+    if (regCachePtr->attrlength != 0)
+    {
+      SegmentedSectionPtr attrInfoSection;
+
+      ndbassert(regCachePtr->attrInfoSectionI != RNIL);
+      getSection(attrInfoSection, regCachePtr->attrInfoSectionI);
+      
+      handle.m_ptr[ LqhKeyReq::AttrInfoSectionNum ]= attrInfoSection;
+      handle.m_cnt= 2;
+    }
+    
+    sendSignal(TBRef, GSN_LQHKEYREQ, signal, 
+               nextPos + LqhKeyReq::FixedSignalLength, JBB, 
+               &handle);
+
+    /* Long sections were freed as part of sendSignal */
+    ndbassert( handle.m_cnt == 0 );
+    regCachePtr->keyInfoSectionI= RNIL;
+    regCachePtr->attrInfoSectionI= RNIL;
   }
   else
   {
@@ -3685,17 +3727,10 @@ void Dbtc::sendlqhkeyreq(Signal* signal,
       
       nextPos+= aiInLqhKeyReq;
     }
-  }
 
-  // Reset trigger count
-  regTcPtr->accumulatingTriggerData.i = RNIL;  
-  regTcPtr->accumulatingTriggerData.p = NULL;  
-  regTcPtr->noFiredTriggers = 0;
-  regTcPtr->triggerExecutionCount = 0;
-
-  // TODO : Long LqhKeyReq variant
-  sendSignal(TBRef, GSN_LQHKEYREQ, signal, 
-             nextPos + LqhKeyReq::FixedSignalLength, JBB);
+    sendSignal(TBRef, GSN_LQHKEYREQ, signal, 
+               nextPos + LqhKeyReq::FixedSignalLength, JBB);
+  }
 }//Dbtc::sendlqhkeyreq()
 
 void Dbtc::packLqhkeyreq040Lab(Signal* signal,
@@ -3996,7 +4031,7 @@ void Dbtc::execSIGNAL_DROPPED_REP(Signal
   DEBUG("SignalDroppedRep received for GSN " << originalGSN);
 
   // TODO : Add handling for long signal variants as they
-  //        are added here (TCINDXREQ, SCANTABREQ etc.)
+  //        are added here (SCANTABREQ etc.)
 
   switch(originalGSN) {
   case GSN_TCKEYREQ:

=== modified file 'storage/ndb/src/kernel/blocks/dbtup/Dbtup.hpp'
--- a/storage/ndb/src/kernel/blocks/dbtup/Dbtup.hpp	2008-07-26 05:13:40 +0000
+++ b/storage/ndb/src/kernel/blocks/dbtup/Dbtup.hpp	2008-08-07 11:52:50 +0000
@@ -784,15 +784,16 @@ struct Operationrec {
 
   Uint32 m_undo_buffer_space; // In words
   union {
-    Uint32 firstAttrinbufrec; //Used until copyAttrinfo
+    Uint32 firstAttrinbufrec; //Used until copyAttrinfo TODO Remove
   };
+
   Uint32 m_any_value;
   union {
-    Uint32 lastAttrinbufrec; //Used until copyAttrinfo
+    Uint32 lastAttrinbufrec; //Used until copyAttrinfo TODO Remove
     Uint32 nextPool;
   };
-  Uint32 attrinbufLen; //only used during STORED_PROCDEF phase
-  Uint32 storedProcPtr; //only used during STORED_PROCDEF phase
+  Uint32 attrinbufLen; //only used during STORED_PROCDEF phase TODO Remove
+  Uint32 storedProcPtr; //only used during STORED_PROCDEF phase TODO Remove
   
   /*
    * From fragment i-value we can find fragment and table record
@@ -2055,9 +2056,9 @@ private:
 
 //------------------------------------------------------------------
 //------------------------------------------------------------------
-  void sendLogAttrinfo(Signal* signal,
-                       Uint32 TlogSize,
-                       Operationrec * regOperPtr);
+  int sendLogAttrinfo(Signal* signal,
+                      Uint32 TlogSize,
+                      Operationrec * regOperPtr);
 
 //------------------------------------------------------------------
 //------------------------------------------------------------------
@@ -2485,7 +2486,8 @@ private:
 
 //------------------------------------------------------------------
 //------------------------------------------------------------------
-  void copyAttrinfo(Operationrec * regOperPtr, Uint32*  inBuffer);
+  void copyAttrinfo(Operationrec * regOperPtr, Uint32*  inBuffer, 
+                    Uint32 expectedLen, Uint32 attrInfoIVal);
 
 //------------------------------------------------------------------
 //------------------------------------------------------------------

=== modified file 'storage/ndb/src/kernel/blocks/dbtup/DbtupExecQuery.cpp'
--- a/storage/ndb/src/kernel/blocks/dbtup/DbtupExecQuery.cpp	2008-06-05 20:34:20 +0000
+++ b/storage/ndb/src/kernel/blocks/dbtup/DbtupExecQuery.cpp	2008-08-07 12:32:06 +0000
@@ -80,8 +80,37 @@ int Dbtup::initStoredOperationrec(Operat
 }
 
 void Dbtup::copyAttrinfo(Operationrec * regOperPtr,
-                         Uint32* inBuffer)
+                         Uint32* inBuffer,
+                         Uint32 expectedLen,
+                         Uint32 attrInfoIVal)
 {
+  if (attrInfoIVal != RNIL)
+  {
+    /* AttrInfo is in segmented section */
+    ndbassert(regOperPtr->firstAttrinbufrec == RNIL);
+    ndbassert(regOperPtr->lastAttrinbufrec == RNIL);
+
+    // TODO : Add support for Stored procedure attrinfo
+
+    /* Check length */
+    SegmentedSectionPtr sectionPtr;
+    getSection(sectionPtr, attrInfoIVal);
+
+    ndbrequire(sectionPtr.sz == expectedLen);
+    ndbrequire(sectionPtr.sz < ZATTR_BUFFER_SIZE);
+
+    /* Copy attrInfo data into linear buffer */
+    copy(inBuffer, attrInfoIVal);
+    
+    regOperPtr->storedProcedureId= RNIL;
+    regOperPtr->m_any_value= 0;
+    
+    return;
+  }
+
+  /* Todo : Unify with code above when scan processing
+   * goes long
+   */
   AttrbufrecPtr copyAttrBufPtr;
   Uint32 RnoOfAttrBufrec= cnoOfAttrbufrec;
   int RbufLen;
@@ -696,6 +725,17 @@ void Dbtup::execTUPKEYREQ(Signal* signal
 
    req_struct.m_row_id.m_page_no = sig1;
    req_struct.m_row_id.m_page_idx = sig2;
+
+   /* Get AttrInfo section if this is a long TUPKEYREQ */
+   Uint32 attrInfoIVal= tupKeyReq->attrInfoIVal;
+
+   /* If we have AttrInfo, check we expected it, and
+    * that we don't have AttrInfo by another means
+    */
+   ndbassert( (attrInfoIVal == RNIL) ||  
+              (tupKeyReq->attrBufLen > 0));
+   ndbassert( (attrInfoIVal == RNIL) || 
+              (regOperPtr->firstAttrinbufrec == RNIL ));
    
    Uint32 Roptype = regOperPtr->op_struct.op_type;
 
@@ -705,7 +745,10 @@ void Dbtup::execTUPKEYREQ(Signal* signal
 				       Rstoredid) == ZOK);
    }
 
-   copyAttrinfo(regOperPtr, &cinBuffer[0]);
+   copyAttrinfo(regOperPtr, 
+                &cinBuffer[0], 
+                req_struct.attrinfo_len,
+                attrInfoIVal);
    
    Uint32 localkey = (pageid << MAX_TUPLES_BITS) + pageidx;
    if (Roptype == ZINSERT && localkey == ~ (Uint32) 0)
@@ -1921,7 +1964,11 @@ int Dbtup::interpreterStartLab(Signal* s
 
   Uint32 RattroutCounter= 0;
   Uint32 RinstructionCounter= 5;
-  Uint32 RlogSize= 0;
+
+  /* All information to be logged/propagated to replicas
+   * is generated from here on so reset the log word count
+   */
+  Uint32 RlogSize= req_struct->log_size= 0;
   if (((RtotalLen + 5) == RattrinbufLen) &&
       (RattrinbufLen >= 5) &&
       (RattrinbufLen < ZATTR_BUFFER_SIZE)) {
@@ -2027,11 +2074,16 @@ int Dbtup::interpreterStartLab(Signal* s
 	return -1;
       }
     }
-    req_struct->log_size= RlogSize;
+    /* Add log words explicitly generated here to existing log size
+     *  - readAttributes can generate log for ANYVALUE column
+     *    It adds the words directly to req_struct->log_size
+     *    This is used for ANYVALUE and interpreted delete.
+     */
+    req_struct->log_size+= RlogSize;
     req_struct->read_length= RattroutCounter;
     sendReadAttrinfo(signal, req_struct, RattroutCounter, regOperPtr);
     if (RlogSize > 0) {
-      sendLogAttrinfo(signal, RlogSize, regOperPtr);
+      return sendLogAttrinfo(signal, RlogSize, regOperPtr);
     }
     return 0;
   } else {
@@ -2047,25 +2099,41 @@ int Dbtup::interpreterStartLab(Signal* s
 /*               TLOG_START              FIRST INDEX TO LOG         */
 /*               TLOG_END                LAST INDEX + 1 TO LOG      */
 /* ---------------------------------------------------------------- */
-void Dbtup::sendLogAttrinfo(Signal* signal,
-                            Uint32 TlogSize,
-                            Operationrec *  const regOperPtr)
+int Dbtup::sendLogAttrinfo(Signal* signal,
+                           Uint32 TlogSize,
+                           Operationrec *  const regOperPtr)
 
 {
-  Uint32 TbufferIndex= 0;
+  /* Copy from Log buffer to segmented section,
+   * then attach to ATTRINFO and execute direct
+   * to LQH
+   */
+  ndbrequire( TlogSize > 0 );
+  Uint32 longSectionIVal= RNIL;
+  bool ok= appendToSection(longSectionIVal, 
+                           &clogMemBuffer[0],
+                           TlogSize);
+  if (unlikely(!ok))
+  {
+    /* Resource error, abort transaction */
+    terrorCode = ZSEIZE_ATTRINBUFREC_ERROR;
+    tupkeyErrorLab(signal);
+    return -1;
+  }
+  
+  /* Send a TUP_ATTRINFO signal to LQH, which contains
+   * the relevant user pointer and the attrinfo section's
+   * IVAL
+   */
   signal->theData[0]= regOperPtr->userpointer;
-  while (TlogSize > 22) {
-    MEMCOPY_NO_WORDS(&signal->theData[3],
-                     &clogMemBuffer[TbufferIndex],
-                     22);
-    EXECUTE_DIRECT(DBLQH, GSN_TUP_ATTRINFO, signal, 25);
-    TbufferIndex += 22;
-    TlogSize -= 22;
-  }
-  MEMCOPY_NO_WORDS(&signal->theData[3],
-                   &clogMemBuffer[TbufferIndex],
-                   TlogSize);
-  EXECUTE_DIRECT(DBLQH, GSN_TUP_ATTRINFO, signal, 3 + TlogSize);
+  signal->theData[1]= TlogSize;
+  signal->theData[2]= longSectionIVal;
+
+  EXECUTE_DIRECT(DBLQH, 
+                 GSN_TUP_ATTRINFO, 
+                 signal, 
+                 3);
+  return 0;
 }
 
 inline

=== modified file 'storage/ndb/src/kernel/vm/LongSignal.hpp'
--- a/storage/ndb/src/kernel/vm/LongSignal.hpp	2008-06-17 20:28:45 +0000
+++ b/storage/ndb/src/kernel/vm/LongSignal.hpp	2008-08-07 11:52:50 +0000
@@ -52,6 +52,7 @@ extern SectionSegmentPool g_sectionSegme
 void print(SegmentedSectionPtr ptr, FILE* out);
 void copy(SegmentedSectionPtr dst, Uint32 * src, Uint32 len);
 void copy(Uint32 * dst, SegmentedSectionPtr src);
+void copy(Uint32 * dst, Uint32 srcFirstIVal);
 bool import(Ptr<SectionSegment> & first, const Uint32 * src, Uint32 len);
 /* appendToSection : If firstSegmentIVal == RNIL, import */
 bool appendToSection(Uint32& firstSegmentIVal, const Uint32* src, Uint32 len);

=== modified file 'storage/ndb/src/kernel/vm/SectionReader.cpp'
--- a/storage/ndb/src/kernel/vm/SectionReader.cpp	2008-06-17 20:28:45 +0000
+++ b/storage/ndb/src/kernel/vm/SectionReader.cpp	2008-08-07 11:52:50 +0000
@@ -41,6 +41,17 @@ SectionReader::SectionReader
   }
 }
 
+SectionReader::SectionReader
+(Uint32 firstSectionIVal, class SectionSegmentPool& pool)
+  : m_pool(pool)
+{
+  SectionSegment* firstSeg= m_pool.getPtr(firstSectionIVal);
+  
+  m_pos = 0;
+  m_len = firstSeg->m_sz;
+  m_head = m_currentSegment = firstSeg;
+}
+
 void
 SectionReader::reset(){
   m_pos = 0;
@@ -137,7 +148,63 @@ SectionReader::getWords(Uint32 * dst, Ui
   memcpy(dst, &p->theData[ind], 4 * len);
 
   m_pos += len;
-  m_currentSegment = p;
+
+  /* If we read everything in the current
+   * segment, and there's another segment,
+   * move onto it for next time
+   */
+  if (unlikely((len == left) &&
+               (p->m_nextSegment != RNIL)))
+    p= m_pool.getPtr(p->m_nextSegment);
+  else
+    m_currentSegment = p;
+  
   return true;
 }
 
+bool
+SectionReader::getWordsPtr(Uint32 maxLen,
+                           const Uint32*& readPtr,
+                           Uint32& actualLen)
+{
+  if(m_pos >= m_len)
+    return false;
+
+  /* We return a pointer to the current position,
+   * with length the minimum of
+   *  - significant words remaining in the whole section
+   *  - space remaining in the current segment
+   *  - maxLen from caller
+   */
+  const Uint32 sectionRemain= m_len - m_pos;
+  const Uint32 startInd = (m_pos % SectionSegment::DataLength);
+  const Uint32 segmentSpace = SectionSegment::DataLength - startInd;
+  SectionSegment * p = m_currentSegment;
+
+  const Uint32 remain= MIN(sectionRemain, segmentSpace);
+  actualLen= MIN(remain, maxLen);
+  readPtr= &p->theData[startInd];
+
+  /* If we've read everything in this segment, and
+   * there's another one, move onto it ready for 
+   * next time
+   */
+  if (((startInd + actualLen) == SectionSegment::DataLength) &&
+      (p->m_nextSegment != RNIL))
+    m_currentSegment= m_pool.getPtr(p->m_nextSegment);
+
+  m_pos += actualLen;
+  return true;
+};
+
+bool
+SectionReader::getWordsPtr(const Uint32*& readPtr,
+                           Uint32& actualLen)
+{
+  /* Cannot have more than SectionSegment::DataLength
+   * contiguous words
+   */
+  return getWordsPtr(SectionSegment::DataLength,
+                     readPtr,
+                     actualLen);
+}

=== modified file 'storage/ndb/src/kernel/vm/SectionReader.hpp'
--- a/storage/ndb/src/kernel/vm/SectionReader.hpp	2006-12-23 19:20:40 +0000
+++ b/storage/ndb/src/kernel/vm/SectionReader.hpp	2008-08-07 11:52:50 +0000
@@ -22,15 +22,37 @@ class SectionReader {
 public:
   SectionReader(struct SegmentedSectionPtr &,
 		class SectionSegmentPool &);
+  SectionReader(Uint32 firstSectionIVal,
+                class SectionSegmentPool &);
 
+  /* reset : Set SectionReader to start of section */
   void reset();
+  /* step : Step over given number of words */
   bool step(Uint32 len);
+  /* getWord : Copy one word to dst + move forward */
   bool getWord(Uint32 * dst);
+  /* peekWord : Copy one word to dst */
   bool peekWord(Uint32 * dst) const ;
+  /* peekWords : Copy len words to dst */
   bool peekWords(Uint32 * dst, Uint32 len) const;
+  /* getSize : Get total size of section */
   Uint32 getSize() const;
+  /* getWords : Copy len words to dst + move forward */
   bool getWords(Uint32 * dst, Uint32 len);
 
+  /* getWordsPtr : Get const ptr to next contiguous
+   *               block of words
+   * In success case will return at least 1 word
+   */
+  bool getWordsPtr(const Uint32*& readPtr,
+                   Uint32& actualLen);
+  /* getWordsPtr : Get const ptr to at most maxLen words
+   * In success case will return at least 1 word
+   */
+  bool getWordsPtr(Uint32 maxLen,
+                   const Uint32*& readPtr,
+                   Uint32& actualLen);
+
 private:
   Uint32 m_pos;
   Uint32 m_len;

=== modified file 'storage/ndb/src/kernel/vm/TransporterCallback.cpp'
--- a/storage/ndb/src/kernel/vm/TransporterCallback.cpp	2008-07-25 05:48:32 +0000
+++ b/storage/ndb/src/kernel/vm/TransporterCallback.cpp	2008-08-07 11:52:50 +0000
@@ -142,6 +142,16 @@ copy(Uint32 * dst, SegmentedSectionPtr s
   copy(dst, g_sectionSegmentPool, src);
 }
 
+/* Copy variant which takes an IVal */
+void
+copy(Uint32* dst, Uint32 srcFirstIVal)
+{
+  SegmentedSectionPtr p;
+  getSection(p, srcFirstIVal);
+
+  copy(dst, p);
+} 
+
 /* Calculate number of segments to release based on section size
  * Always release one segment, even if size is zero
  */

=== modified file 'storage/ndb/src/ndbapi/NdbOperationExec.cpp'
--- a/storage/ndb/src/ndbapi/NdbOperationExec.cpp	2008-06-17 20:28:45 +0000
+++ b/storage/ndb/src/ndbapi/NdbOperationExec.cpp	2008-08-05 10:52:53 +0000
@@ -942,6 +942,28 @@ NdbOperation::buildSignalsNdbRecord(Uint
     }
   }
 
+  if (m_use_any_value && 
+      (tOpType == DeleteRequest))
+  {
+    /* Special hack for delete and ANYVALUE pseudo-column
+     * We want to be able set the ANYVALUE pseudo-column as
+     * part of a delete, but deletes don't allow updates
+     * So we perform a 'read' of the column, passing a value.
+     * Code in TUP which handles this 'read' will set the
+     * value when the read is processed.
+     */
+    res= insertATTRINFOHdr_NdbRecord(aTC_ConnectPtr, aTransId,
+                                     AttributeHeader::ANY_VALUE, 4,
+                                     &attrInfoPtr, &remain);
+    if(res)
+      return res;
+    res= insertATTRINFOData_NdbRecord(aTC_ConnectPtr, aTransId,
+                                      (const char *)(&m_any_value), 4,
+                                      &attrInfoPtr, &remain);
+    if(res)
+      return res;
+  }
+
   /* Interpreted program main signal words */
   if (code)
   {
@@ -1126,10 +1148,9 @@ NdbOperation::buildSignalsNdbRecord(Uint
 
   if ((tOpType == InsertRequest) ||
       (tOpType == WriteRequest) ||
-      (tOpType == UpdateRequest) ||
-      (tOpType == DeleteRequest))
+      (tOpType == UpdateRequest))
   {
-    /* Handle any setAnyValue(). */
+    /* Handle setAnyValue() for all cases except delete */
     if (m_use_any_value)
     {
       res= insertATTRINFOHdr_NdbRecord(aTC_ConnectPtr, aTransId,

=== modified file 'storage/ndb/src/ndbapi/NdbScanFilter.cpp'
--- a/storage/ndb/src/ndbapi/NdbScanFilter.cpp	2008-05-29 15:06:11 +0000
+++ b/storage/ndb/src/ndbapi/NdbScanFilter.cpp	2008-08-06 15:46:41 +0000
@@ -75,7 +75,11 @@ public:
     m_code= code;
     m_associated_op= NULL;
     
-    m_error.code = 0;
+    if (code == NULL)
+      /* NdbInterpretedCode not supported for operation type */
+      m_error.code = 4539;
+    else
+      m_error.code = 0;
   };
 
   /* This method propagates an error code from NdbInterpretedCode
@@ -136,13 +140,22 @@ NdbScanFilter::NdbScanFilter(class NdbOp
 {
   DBUG_ENTER("NdbScanFilter::NdbScanFilter(NdbOperation)");
   
-  /* We ask the NdbScanOperation to allocate an InterpretedCode
-   * object for us.  It will look after freeing it when 
-   * necessary.  This allows the InterpretedCode object to 
-   * survive after the NdbScanFilter has gone out of scope
+  NdbInterpretedCode* code= NULL;
+  NdbOperation::Type opType= op->getType();
+
+  /* If the operation is not of the correct type then
+   * m_impl.init() will set an error on the scan filter
    */
-  NdbInterpretedCode* code= 
-    ((NdbScanOperation *)op)->allocInterpretedCodeOldApi();
+  if (likely((opType == NdbOperation::TableScan) || 
+             (opType == NdbOperation::OrderedIndexScan)))
+  {    
+    /* We ask the NdbScanOperation to allocate an InterpretedCode
+     * object for us.  It will look after freeing it when 
+     * necessary.  This allows the InterpretedCode object to 
+     * survive after the NdbScanFilter has gone out of scope
+     */
+    code= ((NdbScanOperation *)op)->allocInterpretedCodeOldApi();
+  }
 
   m_impl.init(code);
 
@@ -158,6 +171,8 @@ NdbScanFilter::~NdbScanFilter()
 
 int
 NdbScanFilter::begin(Group group){
+  if (m_impl.m_error.code != 0) return -1;
+
   if (m_impl.m_stack2.push_back(m_impl.m_negative))
   {
     /* Memory allocation problem */
@@ -244,6 +259,8 @@ NdbScanFilter::begin(Group group){
 
 int
 NdbScanFilter::end(){
+  if (m_impl.m_error.code != 0) return -1;
+
   if(m_impl.m_stack2.size() == 0){
     /* Invalid set of range scan bounds */
     m_impl.m_error.code= 4259;
@@ -353,6 +370,8 @@ NdbScanFilter::end(){
 
 int
 NdbScanFilter::istrue(){
+  if(m_impl.m_error.code != 0) return -1;
+
   if(m_impl.m_current.m_group < NdbScanFilter::AND || 
      m_impl.m_current.m_group > NdbScanFilter::NOR){
     /* Operator is not defined in NdbScanFilter::Group */
@@ -373,6 +392,7 @@ NdbScanFilter::istrue(){
 
 int
 NdbScanFilter::isfalse(){
+  if (m_impl.m_error.code != 0) return -1;
   if(m_impl.m_current.m_group < NdbScanFilter::AND || 
      m_impl.m_current.m_group > NdbScanFilter::NOR){
     /* Operator is not defined in NdbScanFilter::Group */
@@ -436,6 +456,8 @@ const int tab2_sz = sizeof(table2)/sizeo
 int
 NdbScanFilterImpl::cond_col(Interpreter::UnaryCondition op, Uint32 AttrId){
   
+  if (m_error.code != 0) return -1;
+
   if(op < 0 || op >= tab2_sz){
     /* Condition is out of bounds */
     m_error.code= 4262;
@@ -458,6 +480,8 @@ NdbScanFilterImpl::cond_col(Interpreter:
 
 int
 NdbScanFilter::isnull(int AttrId){
+  if (m_impl.m_error.code != 0) return -1;
+
   if(m_impl.m_negative == 1)
     return m_impl.cond_col(Interpreter::IS_NOT_NULL, AttrId);
   else
@@ -466,6 +490,8 @@ NdbScanFilter::isnull(int AttrId){
 
 int
 NdbScanFilter::isnotnull(int AttrId){
+  if (m_impl.m_error.code != 0) return -1;
+
   if(m_impl.m_negative == 1)
     return m_impl.cond_col(Interpreter::IS_NULL, AttrId);
   else
@@ -568,6 +594,8 @@ int
 NdbScanFilterImpl::cond_col_const(Interpreter::BinaryCondition op, 
 				  Uint32 AttrId, 
 				  const void * value, Uint32 len){
+  if (m_error.code != 0) return -1;
+
   if(op < 0 || op >= tab3_sz){
     /* Condition is out of bounds */
     m_error.code= 4262;

=== modified file 'storage/ndb/src/ndbapi/NdbScanOperation.cpp'
--- a/storage/ndb/src/ndbapi/NdbScanOperation.cpp	2008-06-17 20:28:45 +0000
+++ b/storage/ndb/src/ndbapi/NdbScanOperation.cpp	2008-08-06 15:46:41 +0000
@@ -53,7 +53,7 @@ NdbScanOperation::NdbScanOperation(Ndb* 
   theSCAN_TABREQ = 0;
   m_executed = false;
   m_scan_buffer= NULL;
-  m_scanUsingOldApi= false;
+  m_scanUsingOldApi= true;
   m_interpretedCodeOldApi= NULL;
 }
 
@@ -119,7 +119,7 @@ NdbScanOperation::init(const NdbTableImp
   m_descending= false;
   m_read_range_no = 0;
   m_executed = false;
-  m_scanUsingOldApi= false;
+  m_scanUsingOldApi= true;
   m_interpretedCodeOldApi= NULL;
 
   m_api_receivers_count = 0;
@@ -937,16 +937,14 @@ NdbScanOperation::readTuples(NdbScanOper
                              Uint32 parallel,
                              Uint32 batch)
 {
-  // It is only possible to call readTuples if 
-  //  1. the scan transaction doesn't already  contain another scan operation
-  //  2. We have not already defined an old Api scan operation.
-  if (theNdbCon->theScanningOp != NULL ||
-      m_scanUsingOldApi ){
+  // It is only possible to call readTuples if the scan transaction 
+  // doesn't already contain a scan operation
+  if (theNdbCon->theScanningOp != NULL)
+  {
     setErrorCode(4605);
     return -1;
   }
-
-  m_scanUsingOldApi= true;
+  
   /* Save parameters for later */
   m_savedLockModeOldApi= lm;
   m_savedScanFlagsOldApi= scan_flags;

=== modified file 'storage/ndb/src/ndbapi/NdbTransaction.cpp'
--- a/storage/ndb/src/ndbapi/NdbTransaction.cpp	2008-06-03 16:18:01 +0000
+++ b/storage/ndb/src/ndbapi/NdbTransaction.cpp	2008-08-06 15:39:54 +0000
@@ -2583,6 +2583,8 @@ NdbTransaction::scanTable(const NdbRecor
     DBUG_RETURN(NULL);
   }
 
+  op_idx->m_scanUsingOldApi= false;
+
   /* The real work is done in NdbScanOperation */
   if (op_idx->scanTableImpl(result_record,
                             lock_mode,
@@ -2622,6 +2624,8 @@ NdbTransaction::scanIndex(const NdbRecor
     return NULL;
   }
 
+  op->m_scanUsingOldApi= false;
+
   /* Defer the rest of the work to NdbIndexScanOperation */
   if (op->scanIndexImpl(key_record,
                         result_record,

=== modified file 'storage/ndb/test/ndbapi/testDict.cpp'
--- a/storage/ndb/test/ndbapi/testDict.cpp	2008-08-04 13:40:17 +0000
+++ b/storage/ndb/test/ndbapi/testDict.cpp	2008-08-07 12:32:06 +0000
@@ -57,6 +57,13 @@ int runCreateInvalidTables(NDBT_Context*
   int result = NDBT_OK;
 
   char failTabName[256];
+  
+  const int expectedDictErrors[6]= {720, 
+                                    4317, 
+                                    737, 
+                                    739, 
+                                    736, 
+                                    740 };
 
   for (int i = 0; i < 10; i++){
     BaseString::snprintf(failTabName, 256, "F%d", i);
@@ -71,6 +78,19 @@ int runCreateInvalidTables(NDBT_Context*
         result = NDBT_FAILED;
       }
 
+      // Ensure any error is roughly as expected
+      int errorCode=pNdb->getDictionary()->getNdbError().code;
+      bool errorOk= false;
+      for (int e=0; e < 6; e++)
+        errorOk |= (errorCode == expectedDictErrors[e]);
+
+      if (!errorOk)
+      {
+        ndbout << "Failure, got dict error : " << pNdb->getDictionary()->
+          getNdbError().code << endl;
+        return NDBT_FAILED;
+      }
+
       // Verify that table is not in db    
       const NdbDictionary::Table* pTab2 = 
 	NDBT_Table::discoverTableFromDb(pNdb, failTabName) ;

=== modified file 'storage/ndb/test/ndbapi/testScanFilter.cpp'
--- a/storage/ndb/test/ndbapi/testScanFilter.cpp	2008-08-04 13:40:17 +0000
+++ b/storage/ndb/test/ndbapi/testScanFilter.cpp	2008-08-06 15:46:41 +0000
@@ -894,6 +894,132 @@ int runMaxScanFilterSize(NDBT_Context* c
   return NDBT_OK;
 }
 
+
+int runScanFilterConstructorFail(NDBT_Context* ctx, NDBT_Step* step)
+{
+  /* We test that failures in the ScanFilter constructor can be
+   * detected by the various ScanFilter methods without
+   * issues
+   */
+  Ndb *myNdb = GETNDB(step);
+  const NdbDictionary::Dictionary* myDict= myNdb->getDictionary();
+  const NdbDictionary::Table *myTable= myDict->getTable(TABLE_NAME);
+  if(myTable == NULL) 
+    APIERROR(myDict->getNdbError());
+
+  NdbTransaction* trans=myNdb->startTransaction();
+  
+  if (trans == NULL)
+  {
+    APIERROR(trans->getNdbError());
+    return NDBT_FAILED;
+  }
+  
+  /* Create an NdbRecord scan operation */
+  const NdbScanOperation* tabScan=
+    trans->scanTable(myTable->getDefaultRecord());
+  
+  if (tabScan==NULL)
+  {
+    APIERROR(trans->getNdbError());
+    return NDBT_FAILED;
+  }
+
+  /* Now we hackily try to add a ScanFilter after the operation
+   * is defined.  This will cause a failure within the 
+   * constructor
+   */
+  NdbScanFilter brokenSf((NdbScanOperation*) tabScan);
+
+  /* Scan operation should have an error */
+  if (tabScan->getNdbError().code != 4536)
+  {
+    ndbout << "Expected error 4536, had error " << 
+      tabScan->getNdbError().code << " instead" << endl;
+    return NDBT_FAILED;
+  }
+
+  /* ScanFilter should have an error */
+  if (brokenSf.getNdbError().code != 4539)
+  {
+    ndbout  << "Expected error 4539, had error " << 
+      brokenSf.getNdbError().code << " instead" << endl;
+    return NDBT_FAILED;
+  }
+  
+  if (brokenSf.begin() != -1)
+  { ndbout << "Bad rc from begin" << endl; return NDBT_FAILED; }
+
+  if (brokenSf.istrue() != -1)
+  { ndbout << "Bad rc from istrue" << endl; return NDBT_FAILED; }
+
+  if (brokenSf.isfalse() != -1)
+  { ndbout << "Bad rc from isfalse" << endl; return NDBT_FAILED; }
+
+  if (brokenSf.isnull(0) != -1)
+  { ndbout << "Bad rc from isnull" << endl; return NDBT_FAILED; }
+
+  if (brokenSf.isnotnull(0) != -1)
+  { ndbout << "Bad rc from isnotnull" << endl; return NDBT_FAILED; }
+
+  if (brokenSf.cmp(NdbScanFilter::COND_EQ, 0, NULL, 0) != -1)
+  { ndbout << "Bad rc from cmp" << endl; return NDBT_FAILED; }
+
+  if (brokenSf.end() != -1)
+  { ndbout << "Bad rc from begin" << endl; return NDBT_FAILED; }
+
+  trans->close();
+
+  /* Now we check that we can define a ScanFilter before 
+   * calling readTuples() for a scan operation
+   */
+  trans= myNdb->startTransaction();
+  
+  if (trans == NULL)
+  {
+    APIERROR(trans->getNdbError());
+    return NDBT_FAILED;
+  }
+  
+  /* Get an old Api table scan operation */
+  NdbScanOperation* tabScanOp=
+    trans->getNdbScanOperation(myTable);
+
+  if (tabScanOp==NULL)
+  {
+    APIERROR(trans->getNdbError());
+    return NDBT_FAILED;
+  }
+
+  /* Attempt to define a ScanFilter before calling readTuples() */
+  NdbScanFilter sf(tabScanOp);
+
+  /* Should be no problem ... */
+  if (sf.getNdbError().code != 0) 
+  { APIERROR(sf.getNdbError()); return NDBT_FAILED; };
+  
+ 
+  /* Ok, now attempt to define a ScanFilter against a primary key op */
+  NdbOperation* pkOp= trans->getNdbOperation(myTable);
+
+  if (pkOp == NULL)
+  {
+    APIERROR(trans->getNdbError());
+    return NDBT_FAILED;
+  }
+
+  NdbScanFilter sf2(pkOp);
+  
+  if (sf2.getNdbError().code != 4539)
+  {
+    ndbout << "Error, expected 4539" << endl;
+    APIERROR(sf2.getNdbError());
+    return NDBT_FAILED;
+  }
+
+  return NDBT_OK;
+}
+
 NDBT_TESTSUITE(testScanFilter);
 TESTCASE(TEST_NAME, 
 	 "Scan table TABLE_NAME for the records which accord with \
@@ -903,6 +1029,7 @@ TESTCASE(TEST_NAME, 
   INITIALIZER(runPopulate);
   INITIALIZER(runScanRandomFilterTest);
   INITIALIZER(runMaxScanFilterSize);
+  INITIALIZER(runScanFilterConstructorFail);
   FINALIZER(runDropTables);
 }
 

=== modified file 'storage/ndb/test/ndbapi/test_event.cpp'
--- a/storage/ndb/test/ndbapi/test_event.cpp	2008-08-04 13:40:17 +0000
+++ b/storage/ndb/test/ndbapi/test_event.cpp	2008-08-05 10:52:53 +0000
@@ -23,14 +23,13 @@
 #include <NdbRestarts.hpp>
 #include <signaldata/DumpStateOrd.hpp>
 
-static int createEvent(Ndb *pNdb, 
+static int createEvent(Ndb *pNdb,
                        const NdbDictionary::Table &tab,
-                       NDBT_Context* ctx)
+                       bool merge_events,
+                       bool report)
 {
   char eventName[1024];
   sprintf(eventName,"%s_EVENT",tab.getName());
-  bool merge_events = ctx->getProperty("MergeEvents");
-  bool report = ctx->getProperty("ReportSubscribe");
 
   NdbDictionary::Dictionary *myDict = pNdb->getDictionary();
 
@@ -87,6 +86,16 @@ static int createEvent(Ndb *pNdb, 
   return NDBT_OK;
 }
 
+static int createEvent(Ndb *pNdb, 
+                       const NdbDictionary::Table &tab,
+                       NDBT_Context* ctx)
+{
+  bool merge_events = ctx->getProperty("MergeEvents");
+  bool report = ctx->getProperty("ReportSubscribe");
+
+  return createEvent(pNdb, tab, merge_events, report);
+}
+
 static int dropEvent(Ndb *pNdb, const NdbDictionary::Table &tab)
 {
   char eventName[1024];
@@ -2688,6 +2697,346 @@ runBug37442(NDBT_Context* ctx, NDBT_Step
   return NDBT_OK;
 }
 
+const NdbDictionary::Table* createBoringTable(const char* name, Ndb* pNdb)
+{
+  NdbDictionary::Table tab;
+
+  tab.setName(name);
+
+  NdbDictionary::Column pk;
+  pk.setName("Key");
+  pk.setType(NdbDictionary::Column::Unsigned);
+  pk.setLength(1); 
+  pk.setNullable(false);
+  pk.setPrimaryKey(true);
+  tab.addColumn(pk);
+
+  NdbDictionary::Column attr;
+  attr.setName("Attr");
+  attr.setType(NdbDictionary::Column::Unsigned);
+  attr.setLength(1);
+  attr.setNullable(true);
+  attr.setPrimaryKey(false);
+  tab.addColumn(attr);
+  
+  pNdb->getDictionary()->dropTable(tab.getName());
+  if(pNdb->getDictionary()->createTable(tab) == 0)
+  {
+    ndbout << (NDBT_Table&)tab << endl;
+    return pNdb->getDictionary()->getTable(tab.getName());
+  }
+  
+  ndbout << "Table create failed, err : " << 
+    pNdb->getDictionary()->getNdbError().code << endl;
+  
+  return NULL;
+}
+
+/* Types of operation which can be tagged via 'setAnyValue */
+enum OpTypes {Insert, Update, Write, Delete, EndOfOpTypes};
+
+/** 
+ * executeOps
+ * Generate a number of PK operations of the supplied type
+ * using the passed operation options and setting the
+ * anyValue tag
+ */
+int
+executeOps(Ndb* pNdb,
+           const NdbDictionary::Table* tab,
+           OpTypes op, 
+           Uint32 rowCount,
+           Uint32 keyOffset,
+           Uint32 anyValueOffset,
+           NdbOperation::OperationOptions opts)
+{
+  NdbTransaction* trans= pNdb->startTransaction();
+  const NdbRecord* record= tab->getDefaultRecord();
+
+  char RowBuf[16];
+  Uint32* keyPtr= (Uint32*) NdbDictionary::getValuePtr(record,
+                                                       RowBuf,
+                                                       0);
+  Uint32* attrPtr= (Uint32*) NdbDictionary::getValuePtr(record,
+                                                       RowBuf,
+                                                       1);
+
+  for (Uint32 i=keyOffset; i < (keyOffset + rowCount); i++)
+  {
+    *keyPtr= *attrPtr= i;
+    opts.optionsPresent |= NdbOperation::OperationOptions::OO_ANYVALUE;
+    opts.anyValue= anyValueOffset + i;
+    bool allowInterpreted= 
+      (op == Update) ||
+      (op == Delete);
+
+    if (!allowInterpreted)
+      opts.optionsPresent &= 
+        ~ (Uint64) NdbOperation::OperationOptions::OO_INTERPRETED;
+
+    switch (op) {
+    case Insert : 
+      if (trans->insertTuple(record, 
+                             RowBuf, 
+                             NULL,
+                             &opts, 
+                             sizeof(opts)) == NULL)
+      {
+        g_err << "Can't create operation : " <<
+          trans->getNdbError().code << endl;
+        return NDBT_FAILED;
+      }
+      break;
+    case Update :
+      if (trans->updateTuple(record,
+                             RowBuf,
+                             record,
+                             RowBuf,
+                             NULL,
+                             &opts,
+                             sizeof(opts)) == NULL)
+      {
+        g_err << "Can't create operation : " <<
+          trans->getNdbError().code << endl;
+        return NDBT_FAILED;
+      }
+      break;
+    case Write : 
+      if (trans->writeTuple(record,
+                            RowBuf,
+                            record,
+                            RowBuf,
+                            NULL,
+                            &opts,
+                            sizeof(opts)) == NULL)
+      {
+        g_err << "Can't create operation : " <<
+          trans->getNdbError().code << endl;
+        return NDBT_FAILED;
+      }
+      break;
+    case Delete : 
+      if (trans->deleteTuple(record,
+                             RowBuf,
+                             record,
+                             NULL,
+                             NULL,
+                             &opts,
+                             sizeof(opts)) == NULL)
+      {
+        g_err << "Can't create operation : " <<
+          trans->getNdbError().code << endl;
+        return NDBT_FAILED;
+      }
+      break;
+    default:
+      g_err << "Bad operation type : " << op << endl;
+      return NDBT_FAILED;
+    }
+  }
+
+  trans->execute(Commit);
+
+  if (trans->getNdbError().code != 0)
+  {
+    g_err << "Error executing operations :" << 
+      trans->getNdbError().code << endl;
+    return NDBT_FAILED;
+  }
+  
+  trans->close();
+
+  return NDBT_OK;
+}
+
+int
+checkAnyValueInEvent(Ndb* pNdb,
+                     NdbRecAttr* preKey,
+                     NdbRecAttr* postKey,
+                     NdbRecAttr* preAttr,
+                     NdbRecAttr* postAttr,
+                     Uint32 num,
+                     Uint32 anyValueOffset,
+                     bool checkPre)
+{
+  Uint32 received= 0;
+
+  while (received < num)
+  {
+    int pollRc;
+
+    if ((pollRc= pNdb->pollEvents(10000)) < 0)
+    {
+      g_err << "Error while polling for events : " <<
+        pNdb->getNdbError().code;
+      return NDBT_FAILED;
+    }
+
+    if (pollRc == 0)
+    {
+      printf("No event, waiting...\n");
+      continue;
+    }
+
+    NdbEventOperation* event;
+    while((event= pNdb->nextEvent()) != NULL)
+    {
+//       printf("Event is %p of type %u\n",
+//              event, event->getEventType());
+//       printf("Got event, prekey is %u predata is %u \n",
+//              preKey->u_32_value(),
+//              preAttr->u_32_value());
+//       printf("           postkey is %u postdata is %u anyvalue is %u\n",
+//              postKey->u_32_value(),
+//              postAttr->u_32_value(),
+//              event->getAnyValue());
+      
+      received ++;
+      Uint32 keyVal= (checkPre? 
+                      preKey->u_32_value() :
+                      postKey->u_32_value());
+      
+      if (event->getAnyValue() != 
+          (anyValueOffset + keyVal))
+      {
+        g_err << "Error : Got event, key is " <<
+          keyVal << " anyValue is " <<
+          event->getAnyValue() <<
+          " expected " << (anyValueOffset + keyVal) 
+              << endl;
+        return NDBT_FAILED;
+      }
+    }
+  }
+
+  return NDBT_OK;
+}
+                      
+                      
+
+int
+runBug37672(NDBT_Context* ctx, NDBT_Step* step)
+{
+  /* InterpretedDelete and setAnyValue failed */
+  /* Let's create a boring, known table for this since 
+   * we don't yet have Hugo tools for NdbRecord
+   */
+  BaseString name; 
+  name.assfmt("TAB_TESTEVENT%d", rand() & 65535);
+  Ndb* pNdb= GETNDB(step);
+  
+  const NdbDictionary::Table* tab= createBoringTable(name.c_str(), pNdb);
+  
+  if (tab == NULL)
+    return NDBT_FAILED;
+  
+  /* Create an event to listen to events on the table */
+  char eventName[1024];
+  sprintf(eventName,"%s_EVENT", tab->getName());
+
+  if (createEvent(pNdb, *tab, false, true) != 0)
+    return NDBT_FAILED;
+
+  /* Now create the event operation to retrieve the events */
+  NdbEventOperation* eventOp;
+  eventOp= pNdb->createEventOperation(eventName);
+
+  if (eventOp == NULL)
+  {
+    g_err << "Failed to create event operation :" << 
+      pNdb->getNdbError().code << endl;
+    return NDBT_FAILED;
+  }
+
+  NdbRecAttr* eventKeyData= eventOp->getValue("Key");
+  NdbRecAttr* eventOldKeyData= eventOp->getPreValue("Key");
+  NdbRecAttr* eventAttrData= eventOp->getValue("Attr");
+  NdbRecAttr* eventOldAttrData= eventOp->getPreValue("Attr");
+  
+  if ((eventKeyData == NULL) || (eventAttrData == NULL))
+  {
+    g_err << "Failed to get NdbRecAttrs for events" << endl;
+    return NDBT_FAILED;
+  };
+  
+  if (eventOp->execute() != 0)
+  {
+    g_err << "Failed to execute event operation :" <<
+      eventOp->getNdbError().code << endl;
+    return NDBT_FAILED;
+  }
+
+  /* Perform some operations on the table, and check
+   * that we get the correct AnyValues propagated
+   * through
+   */
+  NdbOperation::OperationOptions opts;
+  opts.optionsPresent= 0;
+
+  NdbInterpretedCode nonsenseProgram;
+
+  nonsenseProgram.load_const_u32(0, 0);
+  nonsenseProgram.interpret_exit_ok();
+
+  nonsenseProgram.finalise();
+
+  const Uint32 rowCount= 1500;
+  Uint32 keyOffset= 0;
+  Uint32 anyValueOffset= 100;
+
+  printf ("Testing AnyValue with no interpreted program\n");
+  for (int variants= 0; variants < 2; variants ++)
+  {
+    for (int op= Insert; op < EndOfOpTypes; op++)
+    {
+      printf("  Testing opType %d (ko=%d, ao=%d)...", 
+             op, keyOffset, anyValueOffset);
+      
+      if (executeOps(pNdb, 
+                     tab, 
+                     (OpTypes)op, 
+                     rowCount, 
+                     keyOffset, 
+                     anyValueOffset, 
+                     opts))
+        return NDBT_FAILED;
+      
+      if (checkAnyValueInEvent(pNdb, 
+                               eventOldKeyData, eventKeyData,
+                               eventOldAttrData, eventAttrData,
+                               rowCount,
+                               anyValueOffset,
+                               false // always use postKey data
+                               ) != NDBT_OK)
+        return NDBT_FAILED;
+      printf("ok\n");
+    };
+    
+    printf("Testing AnyValue with interpreted program\n");
+    opts.optionsPresent|= NdbOperation::OperationOptions::OO_INTERPRETED;
+    opts.interpretedCode= &nonsenseProgram;
+  }
+    
+  if (dropEventOperations(pNdb) != 0)
+  {
+    g_err << "Dropping event operations failed : " << 
+      pNdb->getNdbError().code << endl;
+    return NDBT_FAILED;
+  }
+  
+  if (dropEvent(pNdb, tab->getName()) != 0)
+  {
+    g_err << "Dropping event failed : " << 
+      pNdb->getDictionary()->getNdbError().code << endl;
+    return NDBT_FAILED;
+  }
+
+  pNdb->getDictionary()->dropTable(tab->getName());
+  
+  return NDBT_OK;
+}
+
+
 NDBT_TESTSUITE(test_event);
 TESTCASE("BasicEventOperation", 
 	 "Verify that we can listen to Events"
@@ -2879,6 +3228,10 @@ TESTCASE("Bug37442", "")
 {
   INITIALIZER(runBug37442);
 }
+TESTCASE("Bug37672", "NdbRecord option OO_ANYVALUE causes interpreted delete to abort.")
+{
+  INITIALIZER(runBug37672);
+}
 NDBT_TESTSUITE_END(test_event);
 
 int main(int argc, const char** argv){

=== modified file 'storage/ndb/test/run-test/daily-basic-tests.txt'
--- a/storage/ndb/test/run-test/daily-basic-tests.txt	2008-07-01 15:01:50 +0000
+++ b/storage/ndb/test/run-test/daily-basic-tests.txt	2008-08-07 12:32:06 +0000
@@ -629,7 +629,7 @@ args: -n CreateAndDropDuring T6 D1 D2
 
 max-time: 1500
 cmd: testDict
-args: -n CreateInvalidTables 
+args: -n CreateInvalidTables T1 
 
 max-time: 1500
 cmd: testDict
@@ -1197,6 +1197,16 @@ max-time: 500
 cmd: testBasic
 args: -n PkUpdate WIDE_MAXKEY_HUGO WIDE_MAXATTR_HUGO WIDE_MAXKEYMAXCOLS_HUGO WIDE_MINKEYMAXCOLS_HUGO
 
-
 # EOF 2008-06-30
-# EOF
+
+max-time: 500
+cmd: test_event
+args -n bug37672 T1
+
+#EOF 2008-07-04
+
+max-time: 500
+cmd: testScanFilter
+args: 
+
+#EOF 2008-07-09

=== modified file 'storage/ndb/tools/restore/Restore.cpp'
--- a/storage/ndb/tools/restore/Restore.cpp	2008-06-02 13:27:27 +0000
+++ b/storage/ndb/tools/restore/Restore.cpp	2008-08-05 14:34:39 +0000
@@ -1365,13 +1365,19 @@ bool RestoreDataIterator::readFragmentHe
     if (Header.SectionType == BackupFormat::EMPTY_ENTRY)
     {
       void *tmp;
-      buffer_get_ptr(&tmp, Header.SectionLength*4-8, 1);
+      if (Header.SectionLength < 2)
+      {
+        err << "getFragmentFooter:Error reading fragment footer" << endl;
+        return false;
+      }
+      if (Header.SectionLength > 2)
+        buffer_get_ptr(&tmp, Header.SectionLength*4-8, 1);
       continue;
     }
     break;
   }
   /* read rest of header */
-  if (buffer_read(((char*)&Header)+8, sizeof(Header)-8, 1) != 1)
+  if (buffer_read(((char*)&Header)+8, Header.SectionLength*4-8, 1) != 1)
   {
     ret = 0;
     return false;

Thread
bzr commit into mysql-5.1-telco-6.4 branch (msvensson:2700) Magnus Svensson8 Aug